-- SWH DB schema upgrade
-- from_version: 32
-- to_version: 33
-- description: Archive old task/task_run rows
--
-- Outline:
--   1. create empty new_task / new_task_run tables
--   2. copy over only the rows worth keeping
--   3. rename task / task_run to archive_task / archive_task_run, then rename
--      the new tables to task / task_run
--   4. add the deferred constraints, the indexes and the task_run trigger

-- NOTE(review): a transaction is opened here but no `commit` is visible in
-- this script; presumably the operator commits by hand after checking the
-- result -- confirm.
rollback;
begin;

insert into dbversion (version, release, description)
values (33, now(), 'Work In Progress');

--------------------------
-- Schema
--------------------------

-- The constraints shown in trailing comments below are not declared at
-- creation time; they are added after the data copy (see "pk and fks").
create table new_task (
  id bigserial primary key,
  type text,               -- not null references task_type(type),
  arguments jsonb not null,
  next_run timestamptz not null,
  current_interval interval,
  status task_status not null,
  policy task_policy not null default 'recurring',
  retries_left bigint not null default 0,
  priority task_priority   -- references priority_ratio(id)
  -- check (policy <> 'recurring' or current_interval is not null)
);

comment on table new_task is 'Schedule of recurring tasks';
comment on column new_task.arguments is 'Arguments passed to the underlying job scheduler. '
  'Contains two keys, ''args'' (list) and ''kwargs'' (object).';
comment on column new_task.next_run is 'The next run of this new_task should be run on or after that time';
comment on column new_task.current_interval is 'The interval between two runs of this new_task, '
  'taking into account the backoff factor';
comment on column new_task.policy is 'Whether the new_task is one-shot or recurring';
comment on column new_task.retries_left is 'The number of "short delay" retries of the new_task in case of '
  'transient failure';
comment on column new_task.priority is 'Policy of the given new_task';
comment on column new_task.id is 'Task Identifier';
comment on column new_task.type is 'References task_type table';
comment on column new_task.status is 'Task status (''next_run_not_scheduled'', ''next_run_scheduled'', ''completed'', ''disabled'')';

create table new_task_run (
  id bigserial primary key,
  task bigint,             -- not null references task(id),
  backend_id text,
  scheduled timestamptz,
  started timestamptz,
  ended timestamptz,
  metadata jsonb,
  status task_run_status   -- not null default 'scheduled'
);

comment on table new_task_run is 'History of new_task runs sent to the '
  'job-running backend';
comment on column new_task_run.backend_id is 'id of the new_task run in the job-running backend';
comment on column new_task_run.metadata is 'Useful metadata for the given new_task run. '
  'For instance, the worker that took on the job, '
  'or the logs for the run.';
comment on column new_task_run.id is 'Task run identifier';
comment on column new_task_run.task is 'References new_task table';
comment on column new_task_run.scheduled is 'Scheduled run time for new_task';
comment on column new_task_run.started is 'Task starting time';
comment on column new_task_run.ended is 'Task ending time';

--------------------------
-- Actual migration script
--------------------------

-- Keep only:
--   * recurring tasks of type load-nixguix, list-* or index-*
--   * oneshot tasks whose next_run is less than two months in the past
-- NOTE(review): `distinct` is presumably redundant if task.id is unique in
-- the old table -- confirm before dropping it.
insert into new_task
  (id, type, arguments, next_run, current_interval, status, policy,
   retries_left, priority)
select distinct
  id,
  type,
  arguments,
  next_run,
  current_interval,
  status,
  policy,
  retries_left,
  priority
from task
where (
    policy = 'recurring'
    and (type = 'load-nixguix' or type like 'list-%' or type like 'index-%')
  )
  or (
    policy = 'oneshot'
    and next_run > now() - interval '2 months'
  );

-- Keep only the runs of the tasks copied above. Selecting against new_task
-- (rather than repeating the filter on the old table) guarantees that every
-- kept run has its task present when task_id_fk is added below.
insert into new_task_run
  (id, task, backend_id, scheduled, started, ended, metadata, status)
select
  id,
  task,
  backend_id,
  scheduled,
  started,
  ended,
  metadata,
  status
from task_run
where task in (select id from new_task);

-- The rows above were inserted with explicit ids, which does not advance the
-- sequences backing the bigserial columns. Move each sequence past the
-- highest copied id so later inserts relying on the default id do not collide
-- with copied rows. With is_called = false the next nextval() returns exactly
-- the value set here; coalesce keeps this correct for an empty table.
select setval(
  pg_get_serial_sequence('new_task', 'id'),
  coalesce(max(id), 0) + 1,
  false
)
from new_task;

select setval(
  pg_get_serial_sequence('new_task_run', 'id'),
  coalesce(max(id), 0) + 1,
  false
)
from new_task_run;

-- -----------
-- -- renaming
-- -----------

-- Rename current tables to archive_ prefixed names
alter table task rename to archive_task;
alter table task_run rename to archive_task_run;

-- Rename new tables to standard tables
alter table new_task rename to task;
alter table new_task_run rename to task_run;

-- -------------
-- -- pk and fks
-- -------------

alter table task
  alter column type set not null;

alter table task
  add constraint task_type_fk
  foreign key (type) references task_type (type);

alter table task
  add constraint task_priority_fk
  foreign key (priority) references priority_ratio (id);

-- Added as "not valid" first, then validated in a separate statement.
alter table task
  add constraint task_check_policy
  check (policy <> 'recurring' or current_interval is not null)
  not valid;

alter table task
  validate constraint task_check_policy;

alter table task_run
  alter column task set not null,
  add constraint task_id_fk
  foreign key (task) references task (id);

alter table task_run
  alter column status set not null,
  alter column status set default 'scheduled';

--------
-- index
--------

create index on task(type);
create index on task(next_run);
create index on task using btree(type, md5(arguments::text));
create index on task(priority);

create index on task_run(task);
create index on task_run(backend_id);
create index on task_run(task asc, started asc);

create index on task(type, next_run)
  where status = 'next_run_not_scheduled'::task_status;

----------
-- trigger
----------

-- Row trigger function for task_run: when a run reaches a final status,
-- update the matching row in task.
--
--   permfailed             -> task is disabled
--   eventful / uneventful  -> oneshot task is completed; recurring task gets a
--                             new interval (clamped between the task type's
--                             min_interval and max_interval) and is
--                             rescheduled from the run's end time
--   anything else          -> while retries are left, reschedule after the
--                             task type's retry_delay (1 hour if null) and
--                             use up one retry; otherwise disable a oneshot
--                             task or reschedule a recurring one with its
--                             current interval and a reset retry count
--
-- Always returns null; it is attached below as an "after" trigger.
create or replace function swh_scheduler_update_task_on_task_end ()
  returns trigger
  language plpgsql
as $$
declare
  cur_task task%rowtype;
  cur_task_type task_type%rowtype;
  adjustment_factor float;
  new_interval interval;
begin
  select * from task where id = new.task into cur_task;
  select * from task_type where type = cur_task.type into cur_task_type;

  case
    when new.status = 'permfailed' then
      update task
        set status = 'disabled'
        where id = cur_task.id;
    when new.status in ('eventful', 'uneventful') then
      case
        when cur_task.policy = 'oneshot' then
          update task
            set status = 'completed'
            where id = cur_task.id;
        when cur_task.policy = 'recurring' then
          -- NOTE(review): both branches assign the same factor, so eventful
          -- and uneventful runs shrink the interval identically. Confirm
          -- whether this is intended; kept unchanged here.
          if new.status = 'uneventful' then
            adjustment_factor := 1/cur_task_type.backoff_factor;
          else
            adjustment_factor := 1/cur_task_type.backoff_factor;
          end if;
          new_interval := greatest(
            cur_task_type.min_interval,
            least(
              cur_task_type.max_interval,
              adjustment_factor * cur_task.current_interval));
          update task
            set status = 'next_run_not_scheduled',
                next_run = new.ended + new_interval,
                current_interval = new_interval,
                retries_left = coalesce(cur_task_type.num_retries, 0)
            where id = cur_task.id;
      end case;
    else -- new.status in 'failed', 'lost'
      if cur_task.retries_left > 0 then
        update task
          set status = 'next_run_not_scheduled',
              next_run = new.ended + coalesce(cur_task_type.retry_delay, interval '1 hour'),
              retries_left = cur_task.retries_left - 1
          where id = cur_task.id;
      else -- no retries left
        case
          when cur_task.policy = 'oneshot' then
            update task
              set status = 'disabled'
              where id = cur_task.id;
          when cur_task.policy = 'recurring' then
            update task
              set status = 'next_run_not_scheduled',
                  next_run = new.ended + cur_task.current_interval,
                  retries_left = coalesce(cur_task_type.num_retries, 0)
              where id = cur_task.id;
        end case;
      end if; -- retries
  end case;
  return null;
end;
$$;

-- Fire only when the status column is updated to something other than
-- 'scheduled' or 'started'.
create trigger update_task_on_task_end
  after update of status on task_run
  for each row
  when (new.status not in ('scheduled', 'started'))
  execute procedure swh_scheduler_update_task_on_task_end ();