F9313811: raw diff (text/x-diff, 26 KB), attached to rDSCH (Scheduling utilities)
diff --git a/sql/updates/23.sql b/sql/updates/23.sql
new file mode 100644
index 0000000..392f818
--- /dev/null
+++ b/sql/updates/23.sql
@@ -0,0 +1,71 @@
+insert into dbversion (version, release, description)
+ values (23, now(), 'Work In Progress');
+
+create or replace function swh_scheduler_update_task_on_task_end ()
+ returns trigger
+ language plpgsql
+as $$
+declare
+ cur_task task%rowtype;
+ cur_task_type task_type%rowtype;
+ adjustment_factor float;
+ new_interval interval;
+begin
+ select * from task where id = new.task into cur_task;
+ select * from task_type where type = cur_task.type into cur_task_type;
+
+ case
+ when new.status = 'permfailed' then
+ update task
+ set status = 'disabled'
+ where id = cur_task.id;
+ when new.status in ('eventful', 'uneventful') then
+ case
+ when cur_task.policy = 'oneshot' then
+ update task
+ set status = 'completed'
+ where id = cur_task.id;
+ when cur_task.policy = 'recurring' then
+ if new.status = 'uneventful' then
+ adjustment_factor := 1/cur_task_type.backoff_factor;
+ else
+ adjustment_factor := 1/cur_task_type.backoff_factor;
+ end if;
+ new_interval := greatest(
+ cur_task_type.min_interval,
+ least(
+ cur_task_type.max_interval,
+ adjustment_factor * cur_task.current_interval));
+ update task
+ set status = 'next_run_not_scheduled',
+ next_run = new.ended + new_interval,
+ current_interval = new_interval,
+ retries_left = coalesce(cur_task_type.num_retries, 0)
+ where id = cur_task.id;
+ end case;
+ else -- new.status in 'failed', 'lost'
+ if cur_task.retries_left > 0 then
+ update task
+ set status = 'next_run_not_scheduled',
+ next_run = new.ended + coalesce(cur_task_type.retry_delay, interval '1 hour'),
+ retries_left = cur_task.retries_left - 1
+ where id = cur_task.id;
+ else -- no retries left
+ case
+ when cur_task.policy = 'oneshot' then
+ update task
+ set status = 'disabled'
+ where id = cur_task.id;
+ when cur_task.policy = 'recurring' then
+ update task
+ set status = 'next_run_not_scheduled',
+ next_run = new.ended + cur_task.current_interval,
+ retries_left = coalesce(cur_task_type.num_retries, 0)
+ where id = cur_task.id;
+ end case;
+ end if; -- retries
+ end case;
+ return null;
+end;
+$$;
+
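Note: the hunk above is the entirety of migration 23. It bumps dbversion and redefines swh_scheduler_update_task_on_task_end so that rescheduling is computed from new.ended, the recorded end time of the task run, instead of now(), the time at which the trigger happens to fire. The interval adjustment itself is unchanged: the current interval is multiplied by the adjustment factor, then clamped between the task type's min_interval and max_interval. A minimal worked example of that clamping, using illustrative values (backoff_factor of 2, current interval of 4 days, bounds of 12 hours and 64 days; none of these come from the diff):

-- illustrative values only; mirrors the greatest(least(...)) expression in the trigger
select greatest(interval '12 hours',                 -- task_type.min_interval
                least(interval '64 days',            -- task_type.max_interval
                      0.5 * interval '4 days'));     -- (1/backoff_factor) * current_interval
-- returns '2 days': with this update, the task is rescheduled 2 days after the run ended.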
diff --git a/swh/scheduler/sql/30-schema.sql b/swh/scheduler/sql/30-schema.sql
index 466e270..b9e84d2 100644
--- a/swh/scheduler/sql/30-schema.sql
+++ b/swh/scheduler/sql/30-schema.sql
@@ -1,186 +1,186 @@
create table dbversion
(
version int primary key,
release timestamptz not null,
description text not null
);
comment on table dbversion is 'Schema update tracking';
comment on column dbversion.version is 'SQL schema version';
comment on column dbversion.release is 'Version deployment timestamp';
comment on column dbversion.description is 'Version description';
insert into dbversion (version, release, description)
- values (22, now(), 'Work In Progress');
+ values (23, now(), 'Work In Progress');
create table task_type (
type text primary key,
description text not null,
backend_name text not null,
default_interval interval,
min_interval interval,
max_interval interval,
backoff_factor float,
max_queue_length bigint,
num_retries bigint,
retry_delay interval
);
comment on table task_type is 'Types of schedulable tasks';
comment on column task_type.type is 'Short identifier for the task type';
comment on column task_type.description is 'Human-readable task description';
comment on column task_type.backend_name is 'Name of the task in the job-running backend';
comment on column task_type.default_interval is 'Default interval for newly scheduled tasks';
comment on column task_type.min_interval is 'Minimum interval between two runs of a task';
comment on column task_type.max_interval is 'Maximum interval between two runs of a task';
comment on column task_type.backoff_factor is 'Adjustment factor for the backoff between two task runs';
comment on column task_type.max_queue_length is 'Maximum length of the queue for this type of tasks';
comment on column task_type.num_retries is 'Default number of retries on transient failures';
comment on column task_type.retry_delay is 'Retry delay for the task';
create type task_status as enum ('next_run_not_scheduled', 'next_run_scheduled', 'completed', 'disabled');
comment on type task_status is 'Status of a given task';
create type task_policy as enum ('recurring', 'oneshot');
comment on type task_policy is 'Recurrence policy of the given task';
create type task_priority as enum('high', 'normal', 'low');
comment on type task_priority is 'Priority of the given task';
create table priority_ratio(
id task_priority primary key,
ratio float not null
);
comment on table priority_ratio is 'Oneshot task''s reading ratio per priority';
comment on column priority_ratio.id is 'Task priority id';
comment on column priority_ratio.ratio is 'Percentage of tasks to read per priority';
insert into priority_ratio (id, ratio) values ('high', 0.5);
insert into priority_ratio (id, ratio) values ('normal', 0.3);
insert into priority_ratio (id, ratio) values ('low', 0.2);
create table task (
id bigserial primary key,
type text not null references task_type(type),
arguments jsonb not null,
next_run timestamptz not null,
current_interval interval,
status task_status not null,
policy task_policy not null default 'recurring',
retries_left bigint not null default 0,
priority task_priority references priority_ratio(id),
check (policy <> 'recurring' or current_interval is not null)
);
comment on table task is 'Schedule of recurring tasks';
comment on column task.arguments is 'Arguments passed to the underlying job scheduler. '
'Contains two keys, ''args'' (list) and ''kwargs'' (object).';
comment on column task.next_run is 'The next run of this task should be run on or after that time';
comment on column task.current_interval is 'The interval between two runs of this task, '
'taking into account the backoff factor';
comment on column task.policy is 'Whether the task is one-shot or recurring';
comment on column task.retries_left is 'The number of "short delay" retries of the task in case of '
'transient failure';
comment on column task.priority is 'Policy of the given task';
comment on column task.id is 'Task Identifier';
comment on column task.type is 'References task_type table';
comment on column task.status is 'Task status (''next_run_not_scheduled'', ''next_run_scheduled'', ''completed'', ''disabled'')';
create type task_run_status as enum ('scheduled', 'started', 'eventful', 'uneventful', 'failed', 'permfailed', 'lost');
comment on type task_run_status is 'Status of a given task run';
create table task_run (
id bigserial primary key,
task bigint not null references task(id),
backend_id text,
scheduled timestamptz,
started timestamptz,
ended timestamptz,
metadata jsonb,
status task_run_status not null default 'scheduled'
);
comment on table task_run is 'History of task runs sent to the job-running backend';
comment on column task_run.backend_id is 'id of the task run in the job-running backend';
comment on column task_run.metadata is 'Useful metadata for the given task run. '
'For instance, the worker that took on the job, '
'or the logs for the run.';
comment on column task_run.id is 'Task run identifier';
comment on column task_run.task is 'References task table';
comment on column task_run.scheduled is 'Scheduled run time for task';
comment on column task_run.started is 'Task starting time';
comment on column task_run.ended is 'Task ending time';
create table if not exists listers (
id uuid primary key default uuid_generate_v4(),
name text not null,
instance_name text not null,
created timestamptz not null default now(), -- auto_now_add in the model
current_state jsonb not null,
updated timestamptz not null
);
comment on table listers is 'Lister instances known to the origin visit scheduler';
comment on column listers.name is 'Name of the lister (e.g. github, gitlab, debian, ...)';
comment on column listers.instance_name is 'Name of the current instance of this lister (e.g. framagit, bitbucket, ...)';
comment on column listers.created is 'Timestamp at which the lister was originally created';
comment on column listers.current_state is 'Known current state of this lister';
comment on column listers.updated is 'Timestamp at which the lister state was last updated';
create table if not exists listed_origins (
-- Basic information
lister_id uuid not null references listers(id),
url text not null,
visit_type text not null,
extra_loader_arguments jsonb not null,
-- Whether this origin still exists or not
enabled boolean not null,
-- time-based information
first_seen timestamptz not null default now(),
last_seen timestamptz not null,
-- potentially provided by the lister
last_update timestamptz,
-- visit scheduling information
last_scheduled timestamptz,
primary key (lister_id, url, visit_type)
);
comment on table listed_origins is 'Origins known to the origin visit scheduler';
comment on column listed_origins.lister_id is 'Lister instance which owns this origin';
comment on column listed_origins.url is 'URL of the origin listed';
comment on column listed_origins.visit_type is 'Type of the visit which should be scheduled for the given url';
comment on column listed_origins.extra_loader_arguments is 'Extra arguments that should be passed to the loader for this origin';
comment on column listed_origins.enabled is 'Whether this origin has been seen during the last listing, and visits should be scheduled.';
comment on column listed_origins.first_seen is 'Time at which the origin was first seen by a lister';
comment on column listed_origins.last_seen is 'Time at which the origin was last seen by the lister';
comment on column listed_origins.last_update is 'Time of the last update to the origin recorded by the remote';
comment on column listed_origins.last_scheduled is 'Time when this origin was scheduled to be visited last';
create table origin_visit_stats (
url text not null,
visit_type text not null,
last_eventful timestamptz,
last_uneventful timestamptz,
last_failed timestamptz,
last_notfound timestamptz,
last_snapshot bytea,
primary key (url, visit_type)
);
comment on column origin_visit_stats.url is 'Origin URL';
comment on column origin_visit_stats.visit_type is 'Type of the visit for the given url';
comment on column origin_visit_stats.last_eventful is 'Date of the last eventful event';
comment on column origin_visit_stats.last_uneventful is 'Date of the last uneventful event';
comment on column origin_visit_stats.last_failed is 'Date of the last failed event';
comment on column origin_visit_stats.last_notfound is 'Date of the last notfound event';
comment on column origin_visit_stats.last_snapshot is 'sha1_git of the last visit snapshot';
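Note: the only change to the schema file is the dbversion bump; the tables and types above are unchanged context. As a reading aid for the task table, here is a minimal sketch of a row that satisfies it, assuming a task_type named 'load-git' has already been registered (the type name and URL are illustrative, not part of this diff). The check constraint requires recurring tasks to carry a current_interval:

-- hypothetical example row; requires a pre-existing task_type 'load-git'
insert into task (type, arguments, next_run, current_interval, status, policy)
values ('load-git',
        '{"args": [], "kwargs": {"url": "https://example.org/repo.git"}}'::jsonb,
        now(),
        interval '64 days',
        'next_run_not_scheduled',
        'recurring');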
diff --git a/swh/scheduler/sql/40-func.sql b/swh/scheduler/sql/40-func.sql
index 684aebc..11ef2cb 100644
--- a/swh/scheduler/sql/40-func.sql
+++ b/swh/scheduler/sql/40-func.sql
@@ -1,408 +1,408 @@
create or replace function swh_scheduler_mktemp_task ()
returns void
language sql
as $$
create temporary table tmp_task (
like task excluding indexes
) on commit drop;
alter table tmp_task
alter column retries_left drop not null,
drop column id;
$$;
comment on function swh_scheduler_mktemp_task () is 'Create a temporary table for bulk task creation';
create or replace function swh_scheduler_create_tasks_from_temp ()
returns setof task
language plpgsql
as $$
begin
-- update the default values in one go
-- this is separated from the insert/select to avoid too much
-- juggling
update tmp_task t
set current_interval = tt.default_interval,
retries_left = coalesce(retries_left, tt.num_retries, 0)
from task_type tt
where tt.type=t.type;
insert into task (type, arguments, next_run, status, current_interval, policy,
retries_left, priority)
select type, arguments, next_run, status, current_interval, policy,
retries_left, priority
from tmp_task t
where not exists(select 1
from task
where type = t.type and
md5(arguments::text) = md5(t.arguments::text) and
arguments = t.arguments and
policy = t.policy and
priority is not distinct from t.priority and
status = t.status);
return query
select distinct t.*
from tmp_task tt inner join task t on (
tt.type = t.type and
md5(tt.arguments::text) = md5(t.arguments::text) and
tt.arguments = t.arguments and
tt.policy = t.policy and
tt.priority is not distinct from t.priority and
tt.status = t.status
);
end;
$$;
comment on function swh_scheduler_create_tasks_from_temp () is 'Create tasks in bulk from the temporary table';
create or replace function swh_scheduler_peek_no_priority_tasks (task_type text, ts timestamptz default now(),
num_tasks bigint default NULL)
returns setof task
language sql
stable
as $$
select * from task
where next_run <= ts
and type = task_type
and status = 'next_run_not_scheduled'
and priority is null
order by next_run
limit num_tasks
for update skip locked;
$$;
comment on function swh_scheduler_peek_no_priority_tasks (text, timestamptz, bigint)
is 'Retrieve tasks without priority';
create or replace function swh_scheduler_nb_priority_tasks(num_tasks_priority bigint, task_priority task_priority)
returns numeric
language sql stable
as $$
select ceil(num_tasks_priority * (select ratio from priority_ratio where id = task_priority)) :: numeric
$$;
comment on function swh_scheduler_nb_priority_tasks (bigint, task_priority)
is 'Given a priority task and a total number, compute the number of tasks to read';
create or replace function swh_scheduler_peek_tasks_with_priority (task_type text, ts timestamptz default now(),
num_tasks_priority bigint default NULL,
task_priority task_priority default 'normal')
returns setof task
language sql
stable
as $$
select *
from task t
where t.next_run <= ts
and t.type = task_type
and t.status = 'next_run_not_scheduled'
and t.priority = task_priority
order by t.next_run
limit num_tasks_priority
for update skip locked;
$$;
comment on function swh_scheduler_peek_tasks_with_priority(text, timestamptz, bigint, task_priority)
is 'Retrieve tasks with a given priority';
create or replace function swh_scheduler_peek_priority_tasks (task_type text, ts timestamptz default now(),
num_tasks_priority bigint default NULL)
returns setof task
language plpgsql
as $$
declare
r record;
count_row bigint;
nb_diff bigint;
nb_high bigint;
nb_normal bigint;
nb_low bigint;
begin
-- expected values to fetch
select swh_scheduler_nb_priority_tasks(num_tasks_priority, 'high') into nb_high;
select swh_scheduler_nb_priority_tasks(num_tasks_priority, 'normal') into nb_normal;
select swh_scheduler_nb_priority_tasks(num_tasks_priority, 'low') into nb_low;
nb_diff := 0;
count_row := 0;
for r in select * from swh_scheduler_peek_tasks_with_priority(task_type, ts, nb_high, 'high')
loop
count_row := count_row + 1;
return next r;
end loop;
if count_row < nb_high then
nb_normal := nb_normal + nb_high - count_row;
end if;
count_row := 0;
for r in select * from swh_scheduler_peek_tasks_with_priority(task_type, ts, nb_normal, 'normal')
loop
count_row := count_row + 1;
return next r;
end loop;
if count_row < nb_normal then
nb_low := nb_low + nb_normal - count_row;
end if;
return query select * from swh_scheduler_peek_tasks_with_priority(task_type, ts, nb_low, 'low');
end
$$;
comment on function swh_scheduler_peek_priority_tasks(text, timestamptz, bigint)
is 'Retrieve priority tasks';
create or replace function swh_scheduler_peek_ready_tasks (task_type text, ts timestamptz default now(),
num_tasks bigint default NULL, num_tasks_priority bigint default NULL)
returns setof task
language plpgsql
as $$
declare
r record;
count_row bigint;
nb_diff bigint;
nb_tasks bigint;
begin
count_row := 0;
for r in select * from swh_scheduler_peek_priority_tasks(task_type, ts, num_tasks_priority)
order by priority, next_run
loop
count_row := count_row + 1;
return next r;
end loop;
if count_row < num_tasks_priority then
nb_tasks := num_tasks + num_tasks_priority - count_row;
else
nb_tasks := num_tasks;
end if;
for r in select * from swh_scheduler_peek_no_priority_tasks(task_type, ts, nb_tasks)
order by priority, next_run
loop
return next r;
end loop;
return;
end
$$;
comment on function swh_scheduler_peek_ready_tasks(text, timestamptz, bigint, bigint)
is 'Retrieve tasks with/without priority in order';
create or replace function swh_scheduler_grab_ready_tasks (task_type text, ts timestamptz default now(),
num_tasks bigint default NULL,
num_tasks_priority bigint default NULL)
returns setof task
language sql
as $$
update task
set status='next_run_scheduled'
from (
select id from swh_scheduler_peek_ready_tasks(task_type, ts, num_tasks, num_tasks_priority)
) next_tasks
where task.id = next_tasks.id
returning task.*;
$$;
comment on function swh_scheduler_grab_ready_tasks (text, timestamptz, bigint, bigint)
is 'Grab tasks ready for scheduling and change their status';
create or replace function swh_scheduler_schedule_task_run (task_id bigint,
backend_id text,
metadata jsonb default '{}'::jsonb,
ts timestamptz default now())
returns task_run
language sql
as $$
insert into task_run (task, backend_id, metadata, scheduled, status)
values (task_id, backend_id, metadata, ts, 'scheduled')
returning *;
$$;
create or replace function swh_scheduler_mktemp_task_run ()
returns void
language sql
as $$
create temporary table tmp_task_run (
like task_run excluding indexes
) on commit drop;
alter table tmp_task_run
drop column id,
drop column status;
$$;
comment on function swh_scheduler_mktemp_task_run () is 'Create a temporary table for bulk task run scheduling';
create or replace function swh_scheduler_schedule_task_run_from_temp ()
returns void
language plpgsql
as $$
begin
insert into task_run (task, backend_id, metadata, scheduled, status)
select task, backend_id, metadata, scheduled, 'scheduled'
from tmp_task_run;
return;
end;
$$;
create or replace function swh_scheduler_start_task_run (backend_id text,
metadata jsonb default '{}'::jsonb,
ts timestamptz default now())
returns task_run
language sql
as $$
update task_run
set started = ts,
status = 'started',
metadata = coalesce(task_run.metadata, '{}'::jsonb) || swh_scheduler_start_task_run.metadata
where task_run.backend_id = swh_scheduler_start_task_run.backend_id
returning *;
$$;
create or replace function swh_scheduler_end_task_run (backend_id text,
status task_run_status,
metadata jsonb default '{}'::jsonb,
ts timestamptz default now())
returns task_run
language sql
as $$
update task_run
set ended = ts,
status = swh_scheduler_end_task_run.status,
metadata = coalesce(task_run.metadata, '{}'::jsonb) || swh_scheduler_end_task_run.metadata
where task_run.backend_id = swh_scheduler_end_task_run.backend_id
returning *;
$$;
create type task_record as (
task_id bigint,
task_policy task_policy,
task_status task_status,
task_run_id bigint,
arguments jsonb,
type text,
backend_id text,
metadata jsonb,
scheduled timestamptz,
started timestamptz,
ended timestamptz,
task_run_status task_run_status
);
create or replace function swh_scheduler_task_to_archive(
ts_after timestamptz, ts_before timestamptz, last_id bigint default -1,
lim bigint default 10)
returns setof task_record
language sql stable
as $$
select t.id as task_id, t.policy as task_policy,
t.status as task_status, tr.id as task_run_id,
t.arguments, t.type, tr.backend_id, tr.metadata,
tr.scheduled, tr.started, tr.ended, tr.status as task_run_status
from task_run tr inner join task t on tr.task=t.id
where ((t.policy = 'oneshot' and t.status in ('completed', 'disabled')) or
(t.policy = 'recurring' and t.status = 'disabled')) and
((ts_after <= tr.started and tr.started < ts_before) or
(tr.started is null and (ts_after <= tr.scheduled and tr.scheduled < ts_before))) and
t.id >= last_id
order by tr.task, tr.started
limit lim;
$$;
comment on function swh_scheduler_task_to_archive (timestamptz, timestamptz, bigint, bigint) is 'Read archivable tasks function';
create or replace function swh_scheduler_delete_archived_tasks(
task_ids bigint[], task_run_ids bigint[])
returns void
language sql
as $$
-- clean up task_run_ids
delete from task_run where id in (select * from unnest(task_run_ids));
-- clean up only tasks whose associated task_run are all cleaned up.
-- Remaining tasks will stay there and will be cleaned up when
-- remaining data have been indexed
delete from task
where id in (select t.id
from task t left outer join task_run tr on t.id=tr.task
where t.id in (select * from unnest(task_ids))
and tr.task is null);
$$;
comment on function swh_scheduler_delete_archived_tasks(bigint[], bigint[]) is 'Clean up archived tasks function';
create or replace function swh_scheduler_update_task_on_task_end ()
returns trigger
language plpgsql
as $$
declare
cur_task task%rowtype;
cur_task_type task_type%rowtype;
adjustment_factor float;
new_interval interval;
begin
select * from task where id = new.task into cur_task;
select * from task_type where type = cur_task.type into cur_task_type;
case
when new.status = 'permfailed' then
update task
set status = 'disabled'
where id = cur_task.id;
when new.status in ('eventful', 'uneventful') then
case
when cur_task.policy = 'oneshot' then
update task
set status = 'completed'
where id = cur_task.id;
when cur_task.policy = 'recurring' then
if new.status = 'uneventful' then
adjustment_factor := 1/cur_task_type.backoff_factor;
else
adjustment_factor := 1/cur_task_type.backoff_factor;
end if;
new_interval := greatest(
cur_task_type.min_interval,
least(
cur_task_type.max_interval,
adjustment_factor * cur_task.current_interval));
update task
set status = 'next_run_not_scheduled',
- next_run = now() + new_interval,
+ next_run = new.ended + new_interval,
current_interval = new_interval,
retries_left = coalesce(cur_task_type.num_retries, 0)
where id = cur_task.id;
end case;
else -- new.status in 'failed', 'lost'
if cur_task.retries_left > 0 then
update task
set status = 'next_run_not_scheduled',
- next_run = now() + coalesce(cur_task_type.retry_delay, interval '1 hour'),
+ next_run = new.ended + coalesce(cur_task_type.retry_delay, interval '1 hour'),
retries_left = cur_task.retries_left - 1
where id = cur_task.id;
else -- no retries left
case
when cur_task.policy = 'oneshot' then
update task
set status = 'disabled'
where id = cur_task.id;
when cur_task.policy = 'recurring' then
update task
set status = 'next_run_not_scheduled',
- next_run = now() + cur_task.current_interval,
+ next_run = new.ended + cur_task.current_interval,
retries_left = coalesce(cur_task_type.num_retries, 0)
where id = cur_task.id;
end case;
end if; -- retries
end case;
return null;
end;
$$;
create trigger update_task_on_task_end
after update of status on task_run
for each row
when (new.status NOT IN ('scheduled', 'started'))
execute procedure swh_scheduler_update_task_on_task_end ();
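Note: since the only behavioural change in this diff is the switch from now() to new.ended in the trigger function, a minimal end-to-end sketch of that code path may help. It assumes a recurring task with id 1 already exists and uses a made-up backend id (neither appears in the diff). Ending the run with an explicit, earlier timestamp shows that the task is rescheduled relative to when the run ended, not to when the trigger fires:

-- hypothetical run lifecycle for an existing recurring task (id 1 assumed)
select swh_scheduler_schedule_task_run(1, 'backend-run-0001');
select swh_scheduler_start_task_run('backend-run-0001');
select swh_scheduler_end_task_run('backend-run-0001', 'eventful',
                                  '{}'::jsonb, now() - interval '10 minutes');
-- the update_task_on_task_end trigger has now set
--   next_run = (now() - interval '10 minutes') + new_interval
-- whereas the previous version of the function would have used now() instead.
select id, status, next_run, current_interval from task where id = 1;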