diff --git a/sql/swh-scheduler-schema.sql b/sql/swh-scheduler-schema.sql index 2863ea6..f463598 100644 --- a/sql/swh-scheduler-schema.sql +++ b/sql/swh-scheduler-schema.sql @@ -1,222 +1,222 @@ create table dbversion ( version int primary key, release timestamptz not null, description text not null ); comment on table dbversion is 'Schema update tracking'; insert into dbversion (version, release, description) values (1, now(), 'Work In Progress'); create table task_type ( type text primary key, description text not null, backend_name text not null, default_interval interval not null, min_interval interval not null, max_interval interval not null, backoff_factor float not null ); comment on table task_type is 'Types of schedulable tasks'; comment on column task_type.type is 'Short identifier for the task type'; comment on column task_type.description is 'Human-readable task description'; comment on column task_type.backend_name is 'Name of the task in the job-running backend'; comment on column task_type.default_interval is 'Default interval for newly scheduled tasks'; comment on column task_type.min_interval is 'Minimum interval between two runs of a task'; comment on column task_type.max_interval is 'Maximum interval between two runs of a task'; comment on column task_type.backoff_factor is 'Adjustment factor for the backoff between two task runs'; create type task_status as enum ('next_run_not_scheduled', 'next_run_scheduled', 'disabled'); comment on type task_status is 'Status of a given task'; create table task ( id bigserial primary key, type text not null references task_type(type), arguments jsonb not null, next_run timestamptz not null, current_interval interval not null, status task_status not null ); comment on table task is 'Schedule of recurring tasks'; comment on column task.arguments is 'Arguments passed to the underlying job scheduler. ' 'Contains two keys, ''args'' (list) and ''kwargs'' (object).'; comment on column task.next_run is 'The next run of this task should be run on or after that time'; comment on column task.current_interval is 'The interval between two runs of this task, ' 'taking into account the backoff factor'; create index on task(type); create index on task(next_run); create index task_args on task using btree ((arguments -> 'args')); create index task_kwargs on task using gin ((arguments -> 'kwargs')); create table task_run ( id bigserial primary key, task bigint not null references task(id), backend_id text, scheduled timestamptz, started timestamptz, ended timestamptz, metadata jsonb, eventful boolean ); comment on table task_run is 'History of task runs sent to the job-running backend'; comment on column task_run.backend_id is 'id of the task run in the job-running backend'; comment on column task_run.metadata is 'Useful metadata for the given task run. ' 'For instance, the worker that took on the job, ' 'or the logs for the run.'; comment on column task_run.eventful is 'Whether the task run has seen an update (and the ' 'task running interval should be reduced)'; create index on task_run(task); create index on task_run(backend_id); create or replace function swh_scheduler_mktemp_task () returns void language sql as $$ create temporary table tmp_task ( like task excluding indexes ) on commit drop; alter table tmp_task drop column id, drop column current_interval, drop column status; $$; comment on function swh_scheduler_mktemp_task () is 'Create a temporary table for bulk task creation'; create or replace function swh_scheduler_create_tasks_from_temp () returns setof task language plpgsql as $$ begin return query insert into task (type, arguments, next_run, status, current_interval) select type, arguments, next_run, 'next_run_not_scheduled', (select default_interval from task_type tt where tt.type = type) from tmp_task returning task.*; end; $$; comment on function swh_scheduler_create_tasks_from_temp () is 'Create tasks in bulk from the temporary table'; create or replace function swh_scheduler_peek_ready_tasks (ts timestamptz default now(), num_tasks bigint default NULL) returns setof task language sql stable as $$ select * from task where next_run <= ts and status = 'next_run_not_scheduled' order by next_run limit num_tasks; $$; create or replace function swh_scheduler_grab_ready_tasks (ts timestamptz default now(), num_tasks bigint default NULL) returns setof task language sql as $$ update task set status='next_run_scheduled' from ( select id from task where next_run <= ts and status='next_run_not_scheduled' order by next_run limit num_tasks for update skip locked ) next_tasks where task.id = next_tasks.id returning task.*; $$; create or replace function swh_scheduler_schedule_task_run (task_id bigint, backend_id text, metadata jsonb default '{}'::jsonb) returns task_run language sql as $$ insert into task_run (task, backend_id, metadata, scheduled) values (task_id, backend_id, metadata, now()) returning *; $$; create or replace function swh_scheduler_start_task_run (backend_id text, metadata jsonb default '{}'::jsonb) returns task_run language sql as $$ update task_run set started = now (), metadata = task_run.metadata || swh_scheduler_start_task_run.metadata where task_run.backend_id = swh_scheduler_start_task_run.backend_id returning *; $$; create or replace function swh_scheduler_end_task_run (backend_id text, eventful boolean, metadata jsonb default '{}'::jsonb) returns task_run language sql as $$ update task_run set ended = now (), eventful = swh_scheduler_end_task_run.eventful, metadata = task_run.metadata || swh_scheduler_end_task_run.metadata where task_run.backend_id = swh_scheduler_end_task_run.backend_id returning *; $$; create or replace function swh_scheduler_compute_new_task_interval (task_type text, current_interval interval, eventful boolean) returns interval language plpgsql stable as $$ declare task_type_row task_type%rowtype; adjustment_factor float; begin select * from task_type where type = swh_scheduler_compute_new_task_interval.task_type into task_type_row; if eventful then adjustment_factor := 1/task_type_row.backoff_factor; else adjustment_factor := task_type_row.backoff_factor; end if; return greatest(task_type_row.min_interval, least(task_type_row.max_interval, adjustment_factor * current_interval)); end; $$; create or replace function swh_scheduler_update_task_interval () returns trigger language plpgsql as $$ begin update task - set status = 'pending', + set status = 'next_run_not_scheduled', current_interval = swh_scheduler_compute_new_task_interval(type, current_interval, new.eventful), next_run = now () + swh_scheduler_compute_new_task_interval(type, current_interval, new.eventful) where id = new.task; return null; end; $$; create trigger update_interval_on_task_end after update of ended, eventful on task_run for each row execute procedure swh_scheduler_update_task_interval (); diff --git a/sql/swh-scheduler-testdata.sql b/sql/swh-scheduler-testdata.sql index d32d837..df1da98 100644 --- a/sql/swh-scheduler-testdata.sql +++ b/sql/swh-scheduler-testdata.sql @@ -1,30 +1,30 @@ begin; insert into task_type - (type, task_name, description, min_interval, max_interval, default_interval, backoff_factor) + (type, backend_name, description, min_interval, max_interval, default_interval, backoff_factor) values ('git-updater', 'swh.loader.git.tasks.UpdateGitRepository', 'Git repository updater', '12 hours'::interval, '30 days'::interval, '1 day'::interval, 2); select swh_scheduler_mktemp_task (); insert into tmp_task (type, arguments, next_run) values ('git-updater', '{"args":["git://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git"], "kwargs":{}}'::jsonb, now()); select * from swh_scheduler_create_tasks_from_temp (); commit; -select * from swh_scheduler_grab_pending_tasks (); +select * from swh_scheduler_grab_ready_tasks (); select * from swh_scheduler_schedule_task_run(1, 'foo'); select * from swh_scheduler_start_task_run('foo', '{"worker": "worker01"}'); select * from swh_scheduler_end_task_run('foo', true, '{"logs": "foobarbaz"}'); select * from task; select * from task_run;