diff --git a/sql/swh-scheduler-schema.sql b/sql/swh-scheduler-schema.sql index 416df75..2863ea6 100644 --- a/sql/swh-scheduler-schema.sql +++ b/sql/swh-scheduler-schema.sql @@ -1,182 +1,222 @@ create table dbversion ( version int primary key, release timestamptz not null, description text not null ); +comment on table dbversion is 'Schema update tracking'; + insert into dbversion (version, release, description) values (1, now(), 'Work In Progress'); create table task_type ( type text primary key, description text not null, - task_name text not null, + backend_name text not null, default_interval interval not null, min_interval interval not null, max_interval interval not null, backoff_factor float not null ); -create type task_status as enum ('pending', 'scheduled', 'disabled'); +comment on table task_type is 'Types of schedulable tasks'; +comment on column task_type.type is 'Short identifier for the task type'; +comment on column task_type.description is 'Human-readable task description'; +comment on column task_type.backend_name is 'Name of the task in the job-running backend'; +comment on column task_type.default_interval is 'Default interval for newly scheduled tasks'; +comment on column task_type.min_interval is 'Minimum interval between two runs of a task'; +comment on column task_type.max_interval is 'Maximum interval between two runs of a task'; +comment on column task_type.backoff_factor is 'Adjustment factor for the backoff between two task runs'; + +create type task_status as enum ('next_run_not_scheduled', 'next_run_scheduled', 'disabled'); +comment on type task_status is 'Status of a given task'; create table task ( id bigserial primary key, type text not null references task_type(type), arguments jsonb not null, next_run timestamptz not null, current_interval interval not null, status task_status not null ); +comment on table task is 'Schedule of recurring tasks'; +comment on column task.arguments is 'Arguments passed to the underlying job scheduler. ' + 'Contains two keys, ''args'' (list) and ''kwargs'' (object).'; +comment on column task.next_run is 'The next run of this task should be run on or after that time'; +comment on column task.current_interval is 'The interval between two runs of this task, ' + 'taking into account the backoff factor'; create index on task(type); create index on task(next_run); -create index on task using btree ((arguments -> 'args')); -create index on task using gin ((arguments -> 'kwargs')); +create index task_args on task using btree ((arguments -> 'args')); +create index task_kwargs on task using gin ((arguments -> 'kwargs')); create table task_run ( id bigserial primary key, task bigint not null references task(id), backend_id text, scheduled timestamptz, started timestamptz, ended timestamptz, metadata jsonb, eventful boolean ); +comment on table task_run is 'History of task runs sent to the job-running backend'; +comment on column task_run.backend_id is 'id of the task run in the job-running backend'; +comment on column task_run.metadata is 'Useful metadata for the given task run. ' + 'For instance, the worker that took on the job, ' + 'or the logs for the run.'; +comment on column task_run.eventful is 'Whether the task run has seen an update (and the ' + 'task running interval should be reduced)'; create index on task_run(task); create index on task_run(backend_id); create or replace function swh_scheduler_mktemp_task () returns void language sql as $$ create temporary table tmp_task ( like task excluding indexes ) on commit drop; alter table tmp_task drop column id, drop column current_interval, drop column status; $$; +comment on function swh_scheduler_mktemp_task () is 'Create a temporary table for bulk task creation'; + + create or replace function swh_scheduler_create_tasks_from_temp () returns setof task language plpgsql as $$ begin return query - insert into task (type, arguments, next_run, current_interval, status) - select type, arguments, next_run, (select default_interval from task_type tt where tt.type = type), 'pending' + insert into task (type, arguments, next_run, status, current_interval) + select type, arguments, next_run, 'next_run_not_scheduled', + (select default_interval from task_type tt where tt.type = type) from tmp_task returning task.*; end; $$; -create or replace function swh_scheduler_peek_pending_tasks (ts timestamptz default now(), num_tasks bigint default NULL) +comment on function swh_scheduler_create_tasks_from_temp () is 'Create tasks in bulk from the temporary table'; + +create or replace function swh_scheduler_peek_ready_tasks (ts timestamptz default now(), + num_tasks bigint default NULL) returns setof task language sql stable as $$ select * from task where next_run <= ts - and status='pending' + and status = 'next_run_not_scheduled' order by next_run limit num_tasks; $$; -create or replace function swh_scheduler_grab_pending_tasks (ts timestamptz default now(), num_tasks bigint default NULL) +create or replace function swh_scheduler_grab_ready_tasks (ts timestamptz default now(), + num_tasks bigint default NULL) returns setof task language sql as $$ update task - set status='scheduled' + set status='next_run_scheduled' from ( select id from task where next_run <= ts - and status='pending' + and status='next_run_not_scheduled' order by next_run limit num_tasks for update skip locked ) next_tasks where task.id = next_tasks.id returning task.*; $$; -create or replace function swh_scheduler_schedule_task_run (task_id bigint, backend_id text, metadata jsonb default '{}'::jsonb) +create or replace function swh_scheduler_schedule_task_run (task_id bigint, + backend_id text, + metadata jsonb default '{}'::jsonb) returns task_run language sql as $$ insert into task_run (task, backend_id, metadata, scheduled) values (task_id, backend_id, metadata, now()) returning *; $$; -create or replace function swh_scheduler_start_task_run (backend_id text, metadata jsonb default '{}'::jsonb) +create or replace function swh_scheduler_start_task_run (backend_id text, + metadata jsonb default '{}'::jsonb) returns task_run language sql as $$ update task_run set started = now (), metadata = task_run.metadata || swh_scheduler_start_task_run.metadata where task_run.backend_id = swh_scheduler_start_task_run.backend_id returning *; $$; -create or replace function swh_scheduler_end_task_run (backend_id text, eventful boolean, metadata jsonb default '{}'::jsonb) +create or replace function swh_scheduler_end_task_run (backend_id text, + eventful boolean, + metadata jsonb default '{}'::jsonb) returns task_run language sql as $$ update task_run set ended = now (), eventful = swh_scheduler_end_task_run.eventful, metadata = task_run.metadata || swh_scheduler_end_task_run.metadata where task_run.backend_id = swh_scheduler_end_task_run.backend_id returning *; $$; -create or replace function swh_scheduler_compute_new_task_interval (task_type text, current_interval interval, eventful boolean) +create or replace function swh_scheduler_compute_new_task_interval (task_type text, + current_interval interval, + eventful boolean) returns interval language plpgsql stable as $$ declare task_type_row task_type%rowtype; adjustment_factor float; begin select * from task_type where type = swh_scheduler_compute_new_task_interval.task_type into task_type_row; if eventful then adjustment_factor := 1/task_type_row.backoff_factor; else adjustment_factor := task_type_row.backoff_factor; end if; return greatest(task_type_row.min_interval, least(task_type_row.max_interval, adjustment_factor * current_interval)); end; $$; create or replace function swh_scheduler_update_task_interval () returns trigger language plpgsql as $$ begin update task set status = 'pending', - current_interval = swh_scheduler_compute_new_task_interval(type, current_interval, NEW.eventful), - next_run = now () + swh_scheduler_compute_new_task_interval(type, current_interval, NEW.eventful) - where id = NEW.task; - return NULL; + current_interval = swh_scheduler_compute_new_task_interval(type, current_interval, new.eventful), + next_run = now () + swh_scheduler_compute_new_task_interval(type, current_interval, new.eventful) + where id = new.task; + return null; end; $$; create trigger update_interval_on_task_end after update of ended, eventful on task_run for each row execute procedure swh_scheduler_update_task_interval ();