-- SWH DB schema upgrade
-- from_version: 32
-- to_version: 33
-- description: Archive old task/task_run rows
-- Abort whatever transaction may still be open in this session, then start
-- a fresh one for the statements of this upgrade.
-- NOTE(review): no matching `commit` is visible in this script -- confirm it
-- is issued by whoever runs the upgrade.
rollback;
begin;

-- Record the schema version this script upgrades to.
insert into dbversion
    (version, release, description)
values
    (33, now(), 'Work In Progress');
-- Schema
-- Replacement for the `task` table. It is created under the temporary name
-- `new_task`, filled from the current `task` table, and renamed to `task`
-- later in this script. Catalog comments survive that rename, so their text
-- refers to "task" (the final name) rather than "new_task".
-- The constraints shown as trailing `--` remarks are not declared here; they
-- are added with `alter table` once the rows have been copied.
create table new_task (
    id               bigserial primary key,
    type             text,                  -- not null references task_type(type),
    arguments        jsonb not null,
    next_run         timestamptz not null,
    current_interval interval,
    status           task_status not null,
    policy           task_policy not null default 'recurring',
    retries_left     bigint not null default 0,
    priority         task_priority          -- references priority_ratio(id)
    -- check (policy <> 'recurring' or current_interval is not null)
);

comment on table new_task is 'Schedule of recurring tasks';
comment on column new_task.arguments is 'Arguments passed to the underlying job scheduler. '
    'Contains two keys, ''args'' (list) and ''kwargs'' (object).';
comment on column new_task.next_run is 'The next run of this task should be run on or after that time';
comment on column new_task.current_interval is 'The interval between two runs of this task, '
    'taking into account the backoff factor';
comment on column new_task.policy is 'Whether the task is one-shot or recurring';
comment on column new_task.retries_left is 'The number of "short delay" retries of the task in case of '
    'transient failure';
-- NOTE(review): the text says "Policy" although the column is `priority` --
-- confirm the intended wording before changing it.
comment on column new_task.priority is 'Policy of the given task';
comment on column new_task.id is 'Task Identifier';
comment on column new_task.type is 'References task_type table';
comment on column new_task.status is 'Task status (''next_run_not_scheduled'', ''next_run_scheduled'', ''completed'', ''disabled'')';
-- Replacement for the `task_run` table. It is created under the temporary
-- name `new_task_run`, filled from the current `task_run` table, and renamed
-- to `task_run` later in this script. Catalog comments survive that rename,
-- so their text refers to "task" (the final table name) rather than
-- "new_task".
-- The constraints shown as trailing `--` remarks are not declared here; they
-- are added with `alter table` once the rows have been copied.
create table new_task_run (
    id         bigserial primary key,
    task       bigint,           -- not null references task(id),
    backend_id text,
    scheduled  timestamptz,
    started    timestamptz,
    ended      timestamptz,
    metadata   jsonb,
    status     task_run_status   -- not null default 'scheduled'
);

comment on table new_task_run is 'History of task runs sent to the job-running backend';
comment on column new_task_run.backend_id is 'id of the task run in the job-running backend';
comment on column new_task_run.metadata is 'Useful metadata for the given task run. '
    'For instance, the worker that took on the job, '
    'or the logs for the run.';
comment on column new_task_run.id is 'Task run identifier';
comment on column new_task_run.task is 'References task table';
comment on column new_task_run.scheduled is 'Scheduled run time for task';
comment on column new_task_run.started is 'Task starting time';
comment on column new_task_run.ended is 'Task ending time';
--------------------------
-- Actual migration script
--------------------------
-- Copy the rows worth keeping into the new tables; every other row only
-- remains in the tables that are renamed to archive_* further down.
-- Kept tasks:
--   * recurring tasks whose type is 'load-nixguix', 'list-%' or 'index-%'
--   * oneshot tasks whose next_run is less than two months in the past
-- Kept task runs: those attached to a kept task.
-- Both statements run inside the transaction opened at the top of the
-- script, so now() yields the same cut-off for each of them.
insert into new_task_run
    (id, task, backend_id, scheduled, started, ended, metadata, status)
select id, task, backend_id, scheduled, started, ended, metadata, status
from task_run
where task in (
    select id
    from task
    where (
        policy = 'recurring'
        and (type = 'load-nixguix' or type like 'list-%' or type like 'index-%')
    ) or (
        policy = 'oneshot'
        and next_run > now() - interval '2 months'
    )
);

insert into new_task
    (id, type, arguments, next_run, current_interval, status, policy, retries_left, priority)
select distinct
    id, type, arguments, next_run, current_interval, status, policy, retries_left, priority
from task
where (
    policy = 'recurring'
    and (type = 'load-nixguix' or type like 'list-%' or type like 'index-%')
) or (
    policy = 'oneshot'
    and next_run > now() - interval '2 months'
);

-- The rows above were inserted with their original ids, which leaves the
-- sequences behind the bigserial columns untouched. Move each sequence past
-- the highest copied id so that later inserts relying on the column default
-- cannot collide with a migrated row. With is_called = false the next
-- nextval() returns exactly the value set here (1 when nothing was copied).
select setval(pg_get_serial_sequence('new_task', 'id'),
              coalesce(max(id), 0) + 1,
              false)
from new_task;

select setval(pg_get_serial_sequence('new_task_run', 'id'),
              coalesce(max(id), 0) + 1,
              false)
from new_task_run;
-- -----------
-- -- renaming
-- -----------
-- -- Rename current tables to archive_ prefixed names tables
-- Step 1: move the current tables out of the way under an archive_ prefix.
alter table task
    rename to archive_task;
alter table task_run
    rename to archive_task_run;

-- Step 2: the freshly populated tables take over the standard names.
alter table new_task
    rename to task;
alter table new_task_run
    rename to task_run;
-- -- -------------
-- -- -- pk and fks
-- -- -------------
-- Constraints left out of the `create table` statements so the bulk copy
-- could run without them; they are applied now that `task` and `task_run`
-- designate the new tables.

-- task: mandatory type, foreign keys to task_type and priority_ratio, and
-- the policy check (declared `not valid` here, validated just below).
alter table task
    alter column type set not null,
    add constraint task_type_fk
        foreign key (type) references task_type (type),
    add constraint task_priority_fk
        foreign key (priority) references priority_ratio (id),
    add constraint task_check_policy
        check (policy <> 'recurring' or current_interval is not null)
        not valid;

alter table task
    validate constraint task_check_policy;

-- task_run: mandatory link to its task, mandatory status defaulting to
-- 'scheduled'.
alter table task_run
    alter column task set not null,
    alter column status set not null,
    alter column status set default 'scheduled',
    add constraint task_id_fk
        foreign key (task) references task (id);
--------
-- index
--------
-- Indexes on the new tables. Index names are left for the server to
-- generate; btree and ascending order are the defaults and are therefore
-- not spelled out.
create index on task (type);
create index on task (next_run);
-- expression index over the task type and a hash of its arguments
create index on task (type, md5(arguments::text));
create index on task (priority);
create index on task_run (task);
create index on task_run (backend_id);
create index on task_run (task, started);
-- partial index restricted to tasks in status 'next_run_not_scheduled'
create index on task (type, next_run)
    where status = 'next_run_not_scheduled'::task_status;
----------
-- trigger
----------
-- Trigger function: propagate the outcome of a task run to its parent task.
--
-- Reads the row of `task` referenced by new.task and the matching row of
-- `task_type`, then updates the task depending on new.status:
--   * 'permfailed'            -> task is disabled
--   * 'eventful'/'uneventful' -> oneshot task is completed; recurring task
--                                is rescheduled with an adjusted interval
--   * anything else           -> retried after a delay while retries are
--                                left; otherwise a oneshot task is disabled
--                                and a recurring task is rescheduled with
--                                its current interval
-- Always returns null.
create or replace function swh_scheduler_update_task_on_task_end ()
    returns trigger
    language plpgsql
as $$
declare
    cur_task task%rowtype;
    cur_task_type task_type%rowtype;
    adjustment_factor float;
    new_interval interval;
begin
    select * from task where id = new.task into cur_task;
    select * from task_type where type = cur_task.type into cur_task_type;

    case
        when new.status = 'permfailed' then
            update task
                set status = 'disabled'
                where id = cur_task.id;
        when new.status in ('eventful', 'uneventful') then
            case
                when cur_task.policy = 'oneshot' then
                    update task
                        set status = 'completed'
                        where id = cur_task.id;
                when cur_task.policy = 'recurring' then
                    -- An uneventful run stretches the interval by the
                    -- backoff factor, an eventful one shrinks it by the same
                    -- factor. (Both branches used to shrink it, which made
                    -- the distinction dead code and prevented any backoff.)
                    if new.status = 'uneventful' then
                        adjustment_factor := cur_task_type.backoff_factor;
                    else
                        adjustment_factor := 1/cur_task_type.backoff_factor;
                    end if;
                    -- Keep the adjusted interval within
                    -- [min_interval, max_interval] of the task type.
                    new_interval := greatest(
                        cur_task_type.min_interval,
                        least(
                            cur_task_type.max_interval,
                            adjustment_factor * cur_task.current_interval));
                    update task
                        set status = 'next_run_not_scheduled',
                            next_run = new.ended + new_interval,
                            current_interval = new_interval,
                            retries_left = coalesce(cur_task_type.num_retries, 0)
                        where id = cur_task.id;
            end case;
        else -- new.status in 'failed', 'lost'
            if cur_task.retries_left > 0 then
                -- Retry after the type's retry delay (one hour when unset)
                -- and consume one retry.
                update task
                    set status = 'next_run_not_scheduled',
                        next_run = new.ended + coalesce(cur_task_type.retry_delay, interval '1 hour'),
                        retries_left = cur_task.retries_left - 1
                    where id = cur_task.id;
            else -- no retries left
                case
                    when cur_task.policy = 'oneshot' then
                        update task
                            set status = 'disabled'
                            where id = cur_task.id;
                    when cur_task.policy = 'recurring' then
                        -- Interval unchanged; the retry budget is reset.
                        update task
                            set status = 'next_run_not_scheduled',
                                next_run = new.ended + cur_task.current_interval,
                                retries_left = coalesce(cur_task_type.num_retries, 0)
                            where id = cur_task.id;
                end case;
            end if; -- retries
    end case;
    return null;
end;
$$;
-- Run swh_scheduler_update_task_on_task_end() for each task_run row whose
-- status column is updated to anything other than 'scheduled' or 'started'.
create trigger update_task_on_task_end
    after update of status on task_run
    for each row
    when (new.status not in ('scheduled', 'started'))
    execute procedure swh_scheduler_update_task_on_task_end ();