Page MenuHomeSoftware Heritage
Paste P1253

Clean up scheduler db script -- ran on dump extracted from swh-scheduler db (staging) and mounted back on swh-scheduler-dev db (docker)

Authored by ardumont on Jan 10 2022, 3:52 PM.
-- SWH DB schema upgrade
-- from_version: 32
-- to_version: 33
-- description: Move task/task_run to archive_task/archive_task_run and keep
--              only a filtered subset of them in fresh task/task_run tables

-- Record the schema version bump for this upgrade.
insert into dbversion (
  version,
  release,
  description
)
values (
  33,
  now(),
  'Work In Progress'
);
-- Schema
-- Fresh copy of the task table, filled by the migration below and then renamed
-- to "task". The foreign keys on type/priority and the recurring-policy check
-- are only installed after the bulk copy (see the "pk and fks" section).
-- Comments therefore describe the table under its final name, "task".
create table new_task (
  id bigserial primary key,
  type text, -- not null references task_type(type): added after the copy
  arguments jsonb not null,
  next_run timestamptz not null,
  current_interval interval,
  status task_status not null,
  policy task_policy not null default 'recurring',
  retries_left bigint not null default 0,
  priority task_priority -- references priority_ratio(id): added after the copy
  -- check (policy <> 'recurring' or current_interval is not null): added after the copy
);

comment on table new_task is 'Schedule of recurring tasks';
comment on column new_task.id is 'Task Identifier';
comment on column new_task.type is 'References task_type table';
comment on column new_task.arguments is 'Arguments passed to the underlying job scheduler. '
                                        'Contains two keys, ''args'' (list) and ''kwargs'' (object).';
comment on column new_task.next_run is 'The next run of this task should be run on or after that time';
comment on column new_task.current_interval is 'The interval between two runs of this task, '
                                               'taking into account the backoff factor';
comment on column new_task.status is 'Task status (''next_run_not_scheduled'', ''next_run_scheduled'', ''completed'', ''disabled'')';
comment on column new_task.policy is 'Whether the task is one-shot or recurring';
comment on column new_task.retries_left is 'The number of "short delay" retries of the task in case of '
                                           'transient failure';
comment on column new_task.priority is 'Priority of the given task';
-- Fresh copy of the task_run table, filled by the migration below and then
-- renamed to "task_run". The not-null + foreign key on "task" and the
-- not-null + default on "status" are only installed after the bulk copy.
-- Comments therefore describe the table under its final name, "task_run".
create table new_task_run (
  id bigserial primary key,
  task bigint, -- not null references task(id): added after the copy
  backend_id text,
  scheduled timestamptz,
  started timestamptz,
  ended timestamptz,
  metadata jsonb,
  status task_run_status -- not null default 'scheduled': added after the copy
);

comment on table new_task_run is 'History of task runs sent to the job-running backend';
comment on column new_task_run.id is 'Task run identifier';
comment on column new_task_run.task is 'References task table';
comment on column new_task_run.backend_id is 'id of the task run in the job-running backend';
comment on column new_task_run.metadata is 'Useful metadata for the given task run. '
                                          'For instance, the worker that took on the job, '
                                          'or the logs for the run.';
comment on column new_task_run.scheduled is 'Scheduled run time for task';
comment on column new_task_run.started is 'Task starting time';
comment on column new_task_run.ended is 'Task ending time';
-- Actual migration script
-- Tasks kept: recurring 'load-nixguix', lister ('list-%') and indexer
-- ('index-%') tasks, plus oneshot tasks whose next_run is less than two months
-- old (or in the future). "id" is the primary key of task, so the selected
-- rows are already unique and no "distinct" is needed.
insert into new_task (
  id, type, arguments, next_run, current_interval,
  status, policy, retries_left, priority
)
select
  id, type, arguments, next_run, current_interval,
  status, policy, retries_left, priority
from task
where (
    policy = 'recurring'
    and (type = 'load-nixguix' or type like 'list-%' or type like 'index-%')
  ) or (
    policy = 'oneshot' and next_run > now() - interval '2 months'
  );

-- Keep the runs of exactly the tasks copied above. Filtering against new_task
-- (instead of re-evaluating the predicate on the large task table) avoids a
-- second pass over task. It also guarantees that every copied run references a
-- copied task: when this script runs without an enclosing transaction, each
-- statement sees its own now(), so re-evaluating the 2-month cutoff could keep
-- runs of tasks that were not copied and break the task_run -> task foreign
-- key added later.
insert into new_task_run (
  id, task, backend_id, scheduled, started, ended, metadata, status
)
select
  tr.id, tr.task, tr.backend_id, tr.scheduled, tr.started, tr.ended, tr.metadata, tr.status
from task_run as tr
where exists (
  select 1
  from new_task as nt
  where nt.id = tr.task
);

-- Rows were copied with their original ids, but the bigserial columns of the
-- new tables come with fresh sequences starting at 1: without this, future
-- inserts would collide with copied ids (duplicate primary key). Continue
-- from where the old sequences stopped so archived ids are never reused.
select setval('new_task_id_seq', last_value, is_called) from task_id_seq;
select setval('new_task_run_id_seq', last_value, is_called) from task_run_id_seq;
-- -----------
-- -- renaming
-- -----------
-- -- Rename current tables to archive_ prefixed names tables
alter table task rename to archive_task;
alter table task_run rename to archive_task_run;

-- Renaming a table does not rename its primary key, indexes or sequences.
-- Move the archived ones out of the way, so that the new tables can take over
-- the canonical names and the indexes created below get their default names
-- instead of a "1" suffix. Renaming an index that backs a primary key also
-- renames the constraint.
-- NOTE(review): old index names were observed on the staging database;
-- "if exists" keeps these statements harmless if production names differ.
alter index if exists task_pkey rename to archive_task_pkey;
alter index if exists task_next_run_idx rename to archive_task_next_run_idx;
alter index if exists task_priority_idx rename to archive_task_priority_idx;
alter index if exists task_type_idx rename to archive_task_type_idx;
alter index if exists task_type_md5_idx rename to archive_task_type_md5_idx;
alter index if exists task_type_next_run_idx rename to archive_task_type_next_run_idx;
alter index if exists task_run_pkey rename to archive_task_run_pkey;
alter index if exists task_run_backend_id_idx rename to archive_task_run_backend_id_idx;
alter index if exists task_run_id_asc_idx rename to archive_task_run_id_asc_idx;
alter index if exists task_run_task_idx rename to archive_task_run_task_idx;
alter sequence if exists task_id_seq rename to archive_task_id_seq;
alter sequence if exists task_run_id_seq rename to archive_task_run_id_seq;

-- -- Rename new tables to standard tables
alter table new_task rename to task;
alter table new_task_run rename to task_run;

-- Give the new tables' primary keys and id sequences their canonical names.
alter index if exists new_task_pkey rename to task_pkey;
alter index if exists new_task_run_pkey rename to task_run_pkey;
alter sequence if exists new_task_id_seq rename to task_id_seq;
alter sequence if exists new_task_run_id_seq rename to task_run_id_seq;
-- -- -------------
-- -- -- pk and fks
-- -- -------------
-- Install the constraints that were left out of the create table statements,
-- now that the data is copied and the tables carry their final names.
-- The policy check is added "not valid" and validated in a second statement.
alter table task
  alter column type set not null,
  add constraint task_type_fk
    foreign key (type) references task_type (type),
  add constraint task_priority_fk
    foreign key (priority) references priority_ratio (id),
  add constraint task_check_policy
    check (policy <> 'recurring' or current_interval is not null)
    not valid;

alter table task
  validate constraint task_check_policy;

alter table task_run
  alter column task set not null,
  alter column status set not null,
  alter column status set default 'scheduled',
  add constraint task_id_fk
    foreign key (task) references task (id);
-- index
-- Secondary indexes, built once the data has been copied.

-- task: type, next_run, (type, md5 of the arguments) and priority
create index on task (type);
create index on task (next_run);
create index on task using btree (type, md5(arguments::text));
create index on task (priority);
-- task: (type, next_run), restricted to tasks whose next run is not scheduled yet
create index on task (type, next_run)
  where status = 'next_run_not_scheduled'::task_status;

-- task_run: task, backend_id, and (task, started)
create index on task_run (task);
create index on task_run (backend_id);
create index on task_run (task asc, started asc);
-- trigger
-- (Re)define the task_run status trigger function. When a run of a task
-- reaches a final status, update the corresponding row of "task" (status,
-- next_run, current_interval, retries_left) according to the run status and
-- the task policy, using the settings of the task's task_type.
create or replace function swh_scheduler_update_task_on_task_end ()
  returns trigger
  language plpgsql
as $$
declare
  cur_task task%rowtype;
  cur_task_type task_type%rowtype;
  adjustment_factor float;
  new_interval interval;
begin
  select * from task where id = new.task into cur_task;
  select * from task_type where type = cur_task.type into cur_task_type;

  case
    when new.status = 'permfailed' then
      -- permanent failure: disable the task
      update task
        set status = 'disabled'
        where id = cur_task.id;

    when new.status in ('eventful', 'uneventful') then
      case
        when cur_task.policy = 'oneshot' then
          update task
            set status = 'completed'
            where id = cur_task.id;
        when cur_task.policy = 'recurring' then
          -- NOTE(review): both branches assign the same factor, so eventful
          -- and uneventful runs currently get the same interval adjustment;
          -- confirm this is intended.
          if new.status = 'uneventful' then
            adjustment_factor := 1/cur_task_type.backoff_factor;
          else
            adjustment_factor := 1/cur_task_type.backoff_factor;
          end if;
          -- clamp the adjusted interval between the task type's bounds
          new_interval := greatest(
            cur_task_type.min_interval,
            least(
              cur_task_type.max_interval,
              adjustment_factor * cur_task.current_interval));
          update task
            set status = 'next_run_not_scheduled',
                next_run = new.ended + new_interval,
                current_interval = new_interval,
                retries_left = coalesce(cur_task_type.num_retries, 0)
            where id = cur_task.id;
      end case;

    else -- new.status in 'failed', 'lost'
      if cur_task.retries_left > 0 then
        -- short-delay retry: one hour unless the task type sets retry_delay
        update task
          set status = 'next_run_not_scheduled',
              next_run = new.ended + coalesce(cur_task_type.retry_delay, interval '1 hour'),
              retries_left = cur_task.retries_left - 1
          where id = cur_task.id;
      else -- no retries left
        case
          when cur_task.policy = 'oneshot' then
            update task
              set status = 'disabled'
              where id = cur_task.id;
          when cur_task.policy = 'recurring' then
            -- reschedule at the regular interval and reset the retry budget
            update task
              set status = 'next_run_not_scheduled',
                  next_run = new.ended + cur_task.current_interval,
                  retries_left = coalesce(cur_task_type.num_retries, 0)
              where id = cur_task.id;
        end case;
      end if; -- retries
  end case;

  -- AFTER trigger: the return value is ignored
  return null;
end;
$$;
-- The original trigger followed the old table when it was renamed to
-- archive_task_run. Its function updates "task" by name, which is now the new
-- live table, so a status update on an archived run would modify a live task.
-- Drop it from the archive table.
drop trigger if exists update_task_on_task_end on archive_task_run;

-- Fire on each task_run status update that leaves the run in a final state.
create trigger update_task_on_task_end
  after update of status on task_run
  for each row
  when (new.status not in ('scheduled', 'started'))
  execute procedure swh_scheduler_update_task_on_task_end ();

Event Timeline

Successful run on docker swh-scheduler-db with a staging dump:

17:58:48 !swh-scheduler@localhost:5433=# \i ../33.sql
Time: 0.302 ms
Time: 0.216 ms
insert into dbversion (version, release, description)
       values (33, now(), 'Work In Progress');
Time: 0.213 ms
create table new_task (
  id bigserial primary key,
  type text,
  arguments jsonb not null,
  next_run timestamptz not null,
  current_interval interval,
  status task_status not null,
  policy task_policy not null default 'recurring',
  retries_left bigint not null default 0,
  priority task_priority

Time: 5.941 ms
comment on table new_task is 'Schedule of recurring tasks';
Time: 0.180 ms
comment on column new_task.arguments is 'Arguments passed to the underlying job scheduler. '
                                    'Contains two keys, ''args'' (list) and ''kwargs'' (object).';
Time: 0.187 ms
comment on column new_task.next_run is 'The next run of this new_task should be run on or after that time';
Time: 0.091 ms
comment on column new_task.current_interval is 'The interval between two runs of this new_task, '
                                           'taking into account the backoff factor';
Time: 0.083 ms
comment on column new_task.policy is 'Whether the new_task is one-shot or recurring';
Time: 0.082 ms
comment on column new_task.retries_left is 'The number of "short delay" retries of the new_task in case of '
                                       'transient failure';
Time: 0.076 ms
comment on column new_task.priority is 'Policy of the given new_task';
Time: 0.071 ms
comment on column is 'Task Identifier';
Time: 0.066 ms
comment on column new_task.type is 'References task_type table';
Time: 0.069 ms
comment on column new_task.status is 'Task status (''next_run_not_scheduled'', ''next_run_scheduled'', ''completed'', ''disabled'')';
Time: 0.073 ms
create table new_task_run (
  id bigserial primary key,
  task bigint,
  backend_id text,
  scheduled timestamptz,
  started timestamptz,
  ended timestamptz,
  metadata jsonb,
  status task_run_status
Time: 3.788 ms
comment on table new_task_run is 'History of new_task runs sent to the job-running backend';
Time: 0.075 ms
comment on column new_task_run.backend_id is 'id of the new_task run in the job-running backend';
Time: 0.118 ms
comment on column new_task_run.metadata is 'Useful metadata for the given new_task run. '
                                       'For instance, the worker that took on the job, '
                                       'or the logs for the run.';
Time: 0.116 ms
comment on column is 'Task run identifier';
Time: 0.099 ms
comment on column new_task_run.task is 'References new_task table';
Time: 0.075 ms
comment on column new_task_run.scheduled is 'Scheduled run time for new_task';
Time: 0.073 ms
comment on column new_task_run.started is 'Task starting time';
Time: 0.070 ms
comment on column new_task_run.ended is 'Task ending time';
Time: 0.066 ms
insert into new_task_run
select id, task, backend_id, scheduled, started, ended, metadata, status
from task_run where task in (
  select distinct id from task
  where (
    policy='recurring' and (type = 'load-nixguix' or type like 'list-%' or type like 'index-%')
  ) or (
    policy = 'oneshot' and next_run > now() - interval '2 months'
INSERT 0 977586
Time: 57143.264 ms (00:57.143)
insert into new_task(id, type, arguments, next_run, current_interval, status, policy, retries_left, priority)
select distinct id, type, arguments, next_run, current_interval, status, policy, retries_left, priority
from task
where (policy='recurring' and (type = 'load-nixguix' or type like 'list-%' or type like 'index-%'))
    or (policy = 'oneshot' and next_run > now() - interval '2 months')
INSERT 0 642804
Time: 9329.771 ms (00:09.330)
alter table task rename to archive_task;
Time: 1.277 ms
alter table task_run rename to archive_task_run;
Time: 0.248 ms
alter table new_task rename to task;
Time: 0.561 ms
alter table new_task_run rename to task_run;
Time: 0.223 ms
alter table task
alter column type set not null;
Time: 81.739 ms
alter table task
add constraint task_type_fk
foreign key (type) references task_type (type);
Time: 148.260 ms
alter table task
add constraint task_priority_fk
foreign key (priority) references priority_ratio (id);
Time: 69.977 ms
alter table task
add constraint task_check_policy
check (policy <> 'recurring' or current_interval is not null)
not valid;
Time: 0.409 ms
alter table task
  validate constraint task_check_policy;
Time: 63.936 ms
alter table task_run
alter column task set not null,
add constraint task_id_fk
foreign key (task) references task (id);
Time: 653.348 ms
alter table task_run
alter column status set not null,
alter column status set default 'scheduled';
Time: 105.229 ms
create index on task(type);
Time: 407.991 ms
create index on task(next_run);
Time: 215.362 ms
create index on task using btree(type, md5(arguments::text));
Time: 1975.847 ms (00:01.976)
create index on task(priority);
Time: 239.980 ms
create index on task_run(task);
Time: 346.608 ms
create index on task_run(backend_id);
Time: 1982.309 ms (00:01.982)
create index on task_run(task asc, started asc);
Time: 406.017 ms
create index on task(type, next_run)
where status = 'next_run_not_scheduled'::task_status;
Time: 64.303 ms
ardumont changed the title of this paste from wip: Clean up scheduler db to Clean up scheduler db. (Jan 11 2022, 6:07 PM)


18:16:27 *swh-scheduler@localhost:5433=# \d+
                                                                                        List of relations
| Schema |              Name              |   Type   |  Owner   |    Size    |                                                    Description                                                    |
| public | archive_task                   | table    | postgres | 9424 MB    | Schedule of recurring tasks                                                                                       |
| public | archive_task_run               | table    | postgres | 12 GB      | History of task runs sent to the job-running backend                                                              |
| public | dbversion                      | table    | postgres | 16 kB      | Schema update tracking                                                                                            |
| public | listed_origins                 | table    | postgres | 8192 bytes | Origins known to the origin visit scheduler                                                                       |
| public | listers                        | table    | postgres | 8192 bytes | Lister instances known to the origin visit scheduler                                                              |
| public | new_task_id_seq                | sequence | postgres | 8192 bytes |                                                                                                                   |
| public | new_task_run_id_seq            | sequence | postgres | 8192 bytes |                                                                                                                   |
| public | origin_visit_stats             | table    | postgres | 8192 bytes | Aggregated information on visits for each origin in the archive                                                   |
| public | priority_ratio                 | table    | postgres | 8192 bytes | Oneshot task's reading ratio per priority                                                                         |
| public | scheduler_metrics              | table    | postgres | 8192 bytes | Cache of per-lister metrics for the scheduler, collated between the listed_origins and origin_visit_stats tables. |
| public | task                           | table    | postgres | 134 MB     | Schedule of recurring tasks                                                                                       |
| public | task_id_seq                    | sequence | postgres | 8192 bytes |                                                                                                                   |
| public | task_run                       | table    | postgres | 191 MB     | History of new_task runs sent to the job-running backend                                                          |
| public | task_run_id_seq                | sequence | postgres | 8192 bytes |                                                                                                                   |
| public | task_type                      | table    | postgres | 48 kB      | Types of schedulable tasks                                                                                        |
| public | visit_scheduler_queue_position | table    | postgres | 8192 bytes | Current queue position for the recurrent visit scheduler                                                          |
(16 rows)
18:16:45 *swh-scheduler@localhost:5433=# \d task
                                             Table "public.task"
|      Column      |           Type           | Collation | Nullable |               Default                |
| id               | bigint                   |           | not null | nextval('new_task_id_seq'::regclass) |
| type             | text                     |           | not null |                                      |
| arguments        | jsonb                    |           | not null |                                      |
| next_run         | timestamp with time zone |           | not null |                                      |
| current_interval | interval                 |           |          |                                      |
| status           | task_status              |           | not null |                                      |
| policy           | task_policy              |           | not null | 'recurring'::task_policy             |
| retries_left     | bigint                   |           | not null | 0                                    |
| priority         | task_priority            |           |          |                                      |
    "new_task_pkey" PRIMARY KEY, btree (id)
    "task_next_run_idx1" btree (next_run)
    "task_priority_idx1" btree (priority)
    "task_type_idx1" btree (type)
    "task_type_md5_idx1" btree (type, md5(arguments::text))
    "task_type_next_run_idx1" btree (type, next_run) WHERE status = 'next_run_not_scheduled'::task_status
Check constraints:
    "task_check_policy" CHECK (policy <> 'recurring'::task_policy OR current_interval IS NOT NULL)
Foreign-key constraints:
    "task_priority_fk" FOREIGN KEY (priority) REFERENCES priority_ratio(id)
    "task_type_fk" FOREIGN KEY (type) REFERENCES task_type(type)
Referenced by:
    TABLE "task_run" CONSTRAINT "task_id_fk" FOREIGN KEY (task) REFERENCES task(id)

18:17:17 *swh-scheduler@localhost:5433=# \d archive_task
                                       Table "public.archive_task"
|      Column      |           Type           | Collation | Nullable |             Default              |
| id               | bigint                   |           | not null | nextval('task_id_seq'::regclass) |
| type             | text                     |           | not null |                                  |
| arguments        | jsonb                    |           | not null |                                  |
| next_run         | timestamp with time zone |           | not null |                                  |
| current_interval | interval                 |           |          |                                  |
| status           | task_status              |           | not null |                                  |
| policy           | task_policy              |           | not null | 'recurring'::task_policy         |
| retries_left     | bigint                   |           | not null | 0                                |
| priority         | task_priority            |           |          |                                  |
    "task_pkey" PRIMARY KEY, btree (id)
    "task_next_run_idx" btree (next_run)
    "task_priority_idx" btree (priority)
    "task_type_idx" btree (type)
    "task_type_md5_idx" btree (type, md5(arguments::text))
    "task_type_next_run_idx" btree (type, next_run) WHERE status = 'next_run_not_scheduled'::task_status
Check constraints:
    "task_check" CHECK (policy <> 'recurring'::task_policy OR current_interval IS NOT NULL)
Foreign-key constraints:
    "task_priority_fkey" FOREIGN KEY (priority) REFERENCES priority_ratio(id)
    "task_type_fkey" FOREIGN KEY (type) REFERENCES task_type(type)
Referenced by:
    TABLE "archive_task_run" CONSTRAINT "task_run_task_fkey" FOREIGN KEY (task) REFERENCES archive_task(id)

18:17:22 *swh-scheduler@localhost:5433=# \d task_run
                                          Table "public.task_run"
|   Column   |           Type           | Collation | Nullable |                 Default                  |
| id         | bigint                   |           | not null | nextval('new_task_run_id_seq'::regclass) |
| task       | bigint                   |           | not null |                                          |
| backend_id | text                     |           |          |                                          |
| scheduled  | timestamp with time zone |           |          |                                          |
| started    | timestamp with time zone |           |          |                                          |
| ended      | timestamp with time zone |           |          |                                          |
| metadata   | jsonb                    |           |          |                                          |
| status     | task_run_status          |           | not null | 'scheduled'::task_run_status             |
    "new_task_run_pkey" PRIMARY KEY, btree (id)
    "task_run_backend_id_idx1" btree (backend_id)
    "task_run_task_idx1" btree (task)
    "task_run_task_started_idx" btree (task, started)
Foreign-key constraints:
    "task_id_fk" FOREIGN KEY (task) REFERENCES task(id)
    update_task_on_task_end AFTER UPDATE OF status ON task_run FOR EACH ROW WHEN (new.status <> ALL (ARRAY['scheduled'::task_run_status, 'started'::task_run_status])) EXECUTE FUNCTION swh_scheduler_update_task_on_task_end()

18:17:24 *swh-scheduler@localhost:5433=# \d archive_task_run
                                    Table "public.archive_task_run"
|   Column   |           Type           | Collation | Nullable |               Default                |
| id         | bigint                   |           | not null | nextval('task_run_id_seq'::regclass) |
| task       | bigint                   |           | not null |                                      |
| backend_id | text                     |           |          |                                      |
| scheduled  | timestamp with time zone |           |          |                                      |
| started    | timestamp with time zone |           |          |                                      |
| ended      | timestamp with time zone |           |          |                                      |
| metadata   | jsonb                    |           |          |                                      |
| status     | task_run_status          |           | not null | 'scheduled'::task_run_status         |
    "task_run_pkey" PRIMARY KEY, btree (id)
    "task_run_backend_id_idx" btree (backend_id)
    "task_run_id_asc_idx" btree (task, started)
    "task_run_task_idx" btree (task)
Foreign-key constraints:
    "task_run_task_fkey" FOREIGN KEY (task) REFERENCES archive_task(id)
    update_task_on_task_end AFTER UPDATE OF status ON archive_task_run FOR EACH ROW WHEN (new.status <> ALL (ARRAY['scheduled'::task_run_status, 'started'::task_run_status])) EXECUTE FUNCTION swh_scheduler_update_task_on_task_end()

To create the tables, you could use

create table new_task
like task
including all
excluding indexes;

This duplicates the schema, comments, constraints, etc. without having to duplicate the definitions.

You'll need to use the original sequences for the default value of the id columns for the new tables so that new entries properly get new ids. If the sequence overlaps with existing ids, some object insertions will fail with a duplicate primary key, which will be very confusing.

I also suggest doing the copy of the task table entries first, then using the contents of that new table to filter the task_run rows. That'll avoid having to do the large select/filter on the original table twice.

In P1253#8448, @olasd wrote:

To create the tables, you could use

create table new_task
like task
including all
excluding indexes;

This duplicates the schema, comments, constraints, etc. without having to duplicate the definitions.

cool stuff, thanks!

You'll need to use the original sequences for the default value of the id columns for the new tables so that new entries properly get new ids. If the sequence overlaps with existing ids, some object insertions will fail with a duplicate primary key, which will be very confusing.

Right, I'll look up how to set the new sequences' values to match the old tables' sequences.

I also suggest doing the copy of the task table entries first, then using the contents of that new table to filter the task_run rows. That'll avoid having to do the large select/filter on the original table twice.

I ordered the inserts following the original foreign-key dependencies, but since those constraints are not installed at that point, there is indeed no reason to keep that order.

Thanks for those good suggestions.
That should definitely help the production migration go faster!

ardumont changed the title of this paste from Clean up scheduler db to Clean up scheduler db script -- ran on dump extracted from swh-scheduler db (staging) and mounted back on swh-scheduler-dev db (docker). (Jan 12 2022, 9:45 AM)

I'm currently merging all of this advice into the script and checking that everything still works.
A diff integrating it into the scheduler upgrade scripts is on its way.