diff --git a/sql/updates/15.sql b/sql/updates/15.sql index 75dd44a..5dd088d 100644 --- a/sql/updates/15.sql +++ b/sql/updates/15.sql @@ -1,23 +1,24 @@ insert into dbversion (version, release, description) values (15, now(), 'Work In Progress'); create or replace function swh_scheduler_task_to_archive( ts_after timestamptz, ts_before timestamptz, last_id bigint default -1, lim bigint default 10) returns setof task_record language sql stable as $$ select t.id as task_id, t.policy as task_policy, t.status as task_status, tr.id as task_run_id, t.arguments, t.type, tr.backend_id, tr.metadata, tr.scheduled, tr.started, tr.ended, tr.status as task_run_status from task_run tr inner join task t on tr.task=t.id where ((t.policy = 'oneshot' and t.status in ('completed', 'disabled')) or (t.policy = 'recurring' and t.status = 'disabled')) and - ((ts_after <= tr.started and tr.started < ts_before) or tr.started is null) and + ((ts_after <= tr.started and tr.started < ts_before) or + (tr.started is null and (ts_after <= tr.scheduled and tr.scheduled < ts_before))) and t.id >= last_id order by tr.task, tr.started limit lim; $$; comment on function swh_scheduler_task_to_archive (timestamptz, timestamptz, bigint, bigint) is 'Read archivable tasks function'; diff --git a/swh/scheduler/sql/40-swh-func.sql b/swh/scheduler/sql/40-swh-func.sql index 770e667..684aebc 100644 --- a/swh/scheduler/sql/40-swh-func.sql +++ b/swh/scheduler/sql/40-swh-func.sql @@ -1,407 +1,408 @@ create or replace function swh_scheduler_mktemp_task () returns void language sql as $$ create temporary table tmp_task ( like task excluding indexes ) on commit drop; alter table tmp_task alter column retries_left drop not null, drop column id; $$; comment on function swh_scheduler_mktemp_task () is 'Create a temporary table for bulk task creation'; create or replace function swh_scheduler_create_tasks_from_temp () returns setof task language plpgsql as $$ begin -- update the default values in one go -- this is separated from the insert/select to avoid too much -- juggling update tmp_task t set current_interval = tt.default_interval, retries_left = coalesce(retries_left, tt.num_retries, 0) from task_type tt where tt.type=t.type; insert into task (type, arguments, next_run, status, current_interval, policy, retries_left, priority) select type, arguments, next_run, status, current_interval, policy, retries_left, priority from tmp_task t where not exists(select 1 from task where type = t.type and md5(arguments::text) = md5(t.arguments::text) and arguments = t.arguments and policy = t.policy and priority is not distinct from t.priority and status = t.status); return query select distinct t.* from tmp_task tt inner join task t on ( tt.type = t.type and md5(tt.arguments::text) = md5(t.arguments::text) and tt.arguments = t.arguments and tt.policy = t.policy and tt.priority is not distinct from t.priority and tt.status = t.status ); end; $$; comment on function swh_scheduler_create_tasks_from_temp () is 'Create tasks in bulk from the temporary table'; create or replace function swh_scheduler_peek_no_priority_tasks (task_type text, ts timestamptz default now(), num_tasks bigint default NULL) returns setof task language sql stable as $$ select * from task where next_run <= ts and type = task_type and status = 'next_run_not_scheduled' and priority is null order by next_run limit num_tasks for update skip locked; $$; comment on function swh_scheduler_peek_no_priority_tasks (text, timestamptz, bigint) is 'Retrieve tasks without priority'; create or replace function swh_scheduler_nb_priority_tasks(num_tasks_priority bigint, task_priority task_priority) returns numeric language sql stable as $$ select ceil(num_tasks_priority * (select ratio from priority_ratio where id = task_priority)) :: numeric $$; comment on function swh_scheduler_nb_priority_tasks (bigint, task_priority) is 'Given a priority task and a total number, compute the number of tasks to read'; create or replace function swh_scheduler_peek_tasks_with_priority (task_type text, ts timestamptz default now(), num_tasks_priority bigint default NULL, task_priority task_priority default 'normal') returns setof task language sql stable as $$ select * from task t where t.next_run <= ts and t.type = task_type and t.status = 'next_run_not_scheduled' and t.priority = task_priority order by t.next_run limit num_tasks_priority for update skip locked; $$; comment on function swh_scheduler_peek_tasks_with_priority(text, timestamptz, bigint, task_priority) is 'Retrieve tasks with a given priority'; create or replace function swh_scheduler_peek_priority_tasks (task_type text, ts timestamptz default now(), num_tasks_priority bigint default NULL) returns setof task language plpgsql as $$ declare r record; count_row bigint; nb_diff bigint; nb_high bigint; nb_normal bigint; nb_low bigint; begin -- expected values to fetch select swh_scheduler_nb_priority_tasks(num_tasks_priority, 'high') into nb_high; select swh_scheduler_nb_priority_tasks(num_tasks_priority, 'normal') into nb_normal; select swh_scheduler_nb_priority_tasks(num_tasks_priority, 'low') into nb_low; nb_diff := 0; count_row := 0; for r in select * from swh_scheduler_peek_tasks_with_priority(task_type, ts, nb_high, 'high') loop count_row := count_row + 1; return next r; end loop; if count_row < nb_high then nb_normal := nb_normal + nb_high - count_row; end if; count_row := 0; for r in select * from swh_scheduler_peek_tasks_with_priority(task_type, ts, nb_normal, 'normal') loop count_row := count_row + 1; return next r; end loop; if count_row < nb_normal then nb_low := nb_low + nb_normal - count_row; end if; return query select * from swh_scheduler_peek_tasks_with_priority(task_type, ts, nb_low, 'low'); end $$; comment on function swh_scheduler_peek_priority_tasks(text, timestamptz, bigint) is 'Retrieve priority tasks'; create or replace function swh_scheduler_peek_ready_tasks (task_type text, ts timestamptz default now(), num_tasks bigint default NULL, num_tasks_priority bigint default NULL) returns setof task language plpgsql as $$ declare r record; count_row bigint; nb_diff bigint; nb_tasks bigint; begin count_row := 0; for r in select * from swh_scheduler_peek_priority_tasks(task_type, ts, num_tasks_priority) order by priority, next_run loop count_row := count_row + 1; return next r; end loop; if count_row < num_tasks_priority then nb_tasks := num_tasks + num_tasks_priority - count_row; else nb_tasks := num_tasks; end if; for r in select * from swh_scheduler_peek_no_priority_tasks(task_type, ts, nb_tasks) order by priority, next_run loop return next r; end loop; return; end $$; comment on function swh_scheduler_peek_ready_tasks(text, timestamptz, bigint, bigint) is 'Retrieve tasks with/without priority in order'; create or replace function swh_scheduler_grab_ready_tasks (task_type text, ts timestamptz default now(), num_tasks bigint default NULL, num_tasks_priority bigint default NULL) returns setof task language sql as $$ update task set status='next_run_scheduled' from ( select id from swh_scheduler_peek_ready_tasks(task_type, ts, num_tasks, num_tasks_priority) ) next_tasks where task.id = next_tasks.id returning task.*; $$; comment on function swh_scheduler_grab_ready_tasks (text, timestamptz, bigint, bigint) is 'Grab tasks ready for scheduling and change their status'; create or replace function swh_scheduler_schedule_task_run (task_id bigint, backend_id text, metadata jsonb default '{}'::jsonb, ts timestamptz default now()) returns task_run language sql as $$ insert into task_run (task, backend_id, metadata, scheduled, status) values (task_id, backend_id, metadata, ts, 'scheduled') returning *; $$; create or replace function swh_scheduler_mktemp_task_run () returns void language sql as $$ create temporary table tmp_task_run ( like task_run excluding indexes ) on commit drop; alter table tmp_task_run drop column id, drop column status; $$; comment on function swh_scheduler_mktemp_task_run () is 'Create a temporary table for bulk task run scheduling'; create or replace function swh_scheduler_schedule_task_run_from_temp () returns void language plpgsql as $$ begin insert into task_run (task, backend_id, metadata, scheduled, status) select task, backend_id, metadata, scheduled, 'scheduled' from tmp_task_run; return; end; $$; create or replace function swh_scheduler_start_task_run (backend_id text, metadata jsonb default '{}'::jsonb, ts timestamptz default now()) returns task_run language sql as $$ update task_run set started = ts, status = 'started', metadata = coalesce(task_run.metadata, '{}'::jsonb) || swh_scheduler_start_task_run.metadata where task_run.backend_id = swh_scheduler_start_task_run.backend_id returning *; $$; create or replace function swh_scheduler_end_task_run (backend_id text, status task_run_status, metadata jsonb default '{}'::jsonb, ts timestamptz default now()) returns task_run language sql as $$ update task_run set ended = ts, status = swh_scheduler_end_task_run.status, metadata = coalesce(task_run.metadata, '{}'::jsonb) || swh_scheduler_end_task_run.metadata where task_run.backend_id = swh_scheduler_end_task_run.backend_id returning *; $$; create type task_record as ( task_id bigint, task_policy task_policy, task_status task_status, task_run_id bigint, arguments jsonb, type text, backend_id text, metadata jsonb, scheduled timestamptz, started timestamptz, ended timestamptz, task_run_status task_run_status ); create or replace function swh_scheduler_task_to_archive( ts_after timestamptz, ts_before timestamptz, last_id bigint default -1, lim bigint default 10) returns setof task_record language sql stable as $$ select t.id as task_id, t.policy as task_policy, t.status as task_status, tr.id as task_run_id, t.arguments, t.type, tr.backend_id, tr.metadata, tr.scheduled, tr.started, tr.ended, tr.status as task_run_status from task_run tr inner join task t on tr.task=t.id where ((t.policy = 'oneshot' and t.status in ('completed', 'disabled')) or (t.policy = 'recurring' and t.status = 'disabled')) and - ((ts_after <= tr.started and tr.started < ts_before) or tr.started is null) and + ((ts_after <= tr.started and tr.started < ts_before) or + (tr.started is null and (ts_after <= tr.scheduled and tr.scheduled < ts_before))) and t.id >= last_id order by tr.task, tr.started limit lim; $$; comment on function swh_scheduler_task_to_archive (timestamptz, timestamptz, bigint, bigint) is 'Read archivable tasks function'; create or replace function swh_scheduler_delete_archived_tasks( task_ids bigint[], task_run_ids bigint[]) returns void language sql as $$ -- clean up task_run_ids delete from task_run where id in (select * from unnest(task_run_ids)); -- clean up only tasks whose associated task_run are all cleaned up. -- Remaining tasks will stay there and will be cleaned up when -- remaining data have been indexed delete from task where id in (select t.id from task t left outer join task_run tr on t.id=tr.task where t.id in (select * from unnest(task_ids)) and tr.task is null); $$; comment on function swh_scheduler_delete_archived_tasks(bigint[], bigint[]) is 'Clean up archived tasks function'; create or replace function swh_scheduler_update_task_on_task_end () returns trigger language plpgsql as $$ declare cur_task task%rowtype; cur_task_type task_type%rowtype; adjustment_factor float; new_interval interval; begin select * from task where id = new.task into cur_task; select * from task_type where type = cur_task.type into cur_task_type; case when new.status = 'permfailed' then update task set status = 'disabled' where id = cur_task.id; when new.status in ('eventful', 'uneventful') then case when cur_task.policy = 'oneshot' then update task set status = 'completed' where id = cur_task.id; when cur_task.policy = 'recurring' then if new.status = 'uneventful' then adjustment_factor := 1/cur_task_type.backoff_factor; else adjustment_factor := 1/cur_task_type.backoff_factor; end if; new_interval := greatest( cur_task_type.min_interval, least( cur_task_type.max_interval, adjustment_factor * cur_task.current_interval)); update task set status = 'next_run_not_scheduled', next_run = now() + new_interval, current_interval = new_interval, retries_left = coalesce(cur_task_type.num_retries, 0) where id = cur_task.id; end case; else -- new.status in 'failed', 'lost' if cur_task.retries_left > 0 then update task set status = 'next_run_not_scheduled', next_run = now() + coalesce(cur_task_type.retry_delay, interval '1 hour'), retries_left = cur_task.retries_left - 1 where id = cur_task.id; else -- no retries left case when cur_task.policy = 'oneshot' then update task set status = 'disabled' where id = cur_task.id; when cur_task.policy = 'recurring' then update task set status = 'next_run_not_scheduled', next_run = now() + cur_task.current_interval, retries_left = coalesce(cur_task_type.num_retries, 0) where id = cur_task.id; end case; end if; -- retries end case; return null; end; $$; create trigger update_task_on_task_end after update of status on task_run for each row when (new.status NOT IN ('scheduled', 'started')) execute procedure swh_scheduler_update_task_on_task_end (); diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py index 771eb70..484009b 100644 --- a/swh/scheduler/tests/test_scheduler.py +++ b/swh/scheduler/tests/test_scheduler.py @@ -1,639 +1,639 @@ -# Copyright (C) 2017-2018 The Software Heritage developers +# Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy import datetime import random import uuid from collections import defaultdict import psycopg2 from arrow import utcnow import pytest TASK_TYPES = { 'git': { 'type': 'update-git', 'description': 'Update a git repository', 'backend_name': 'swh.loader.git.tasks.UpdateGitRepository', 'default_interval': datetime.timedelta(days=64), 'min_interval': datetime.timedelta(hours=12), 'max_interval': datetime.timedelta(days=64), 'backoff_factor': 2, 'max_queue_length': None, 'num_retries': 7, 'retry_delay': datetime.timedelta(hours=2), }, 'hg': { 'type': 'update-hg', 'description': 'Update a mercurial repository', 'backend_name': 'swh.loader.mercurial.tasks.UpdateHgRepository', 'default_interval': datetime.timedelta(days=64), 'min_interval': datetime.timedelta(hours=12), 'max_interval': datetime.timedelta(days=64), 'backoff_factor': 2, 'max_queue_length': None, 'num_retries': 7, 'retry_delay': datetime.timedelta(hours=2), }, } TEMPLATES = { 'git': { 'type': 'update-git', 'arguments': { 'args': [], 'kwargs': {}, }, 'next_run': None, }, 'hg': { 'type': 'update-hg', 'arguments': { 'args': [], 'kwargs': {}, }, 'next_run': None, 'policy': 'oneshot', } } def subdict(d, keys=None, excl=()): if keys is None: keys = [k for k in d.keys()] return {k: d[k] for k in keys if k not in excl} @pytest.mark.db class TestScheduler: def test_get_priority_ratios(self, swh_scheduler): assert swh_scheduler.get_priority_ratios() == { 'high': 0.5, 'normal': 0.3, 'low': 0.2, } def test_add_task_type(self, swh_scheduler): tt = TASK_TYPES['git'] swh_scheduler.create_task_type(tt) assert tt == swh_scheduler.get_task_type(tt['type']) with pytest.raises(psycopg2.IntegrityError, match=r'\(type\)=\(%s\)' % tt['type']): swh_scheduler.create_task_type(tt) tt2 = TASK_TYPES['hg'] swh_scheduler.create_task_type(tt2) assert tt == swh_scheduler.get_task_type(tt['type']) assert tt2 == swh_scheduler.get_task_type(tt2['type']) def test_get_task_types(self, swh_scheduler): tt, tt2 = TASK_TYPES['git'], TASK_TYPES['hg'] swh_scheduler.create_task_type(tt) swh_scheduler.create_task_type(tt2) actual_task_types = swh_scheduler.get_task_types() assert tt in actual_task_types assert tt2 in actual_task_types def test_create_tasks(self, swh_scheduler): priority_ratio = self._priority_ratio(swh_scheduler) self._create_task_types(swh_scheduler) num_tasks_priority = 100 tasks_1 = self._tasks_from_template( TEMPLATES['git'], utcnow(), 100) tasks_2 = self._tasks_from_template( TEMPLATES['hg'], utcnow(), 100, num_tasks_priority, priorities=priority_ratio) tasks = tasks_1 + tasks_2 # tasks are returned only once with their ids ret1 = swh_scheduler.create_tasks(tasks + tasks_1 + tasks_2) set_ret1 = set([t['id'] for t in ret1]) # creating the same set result in the same ids ret = swh_scheduler.create_tasks(tasks) set_ret = set([t['id'] for t in ret]) # Idempotence results assert set_ret == set_ret1 assert len(ret) == len(ret1) ids = set() actual_priorities = defaultdict(int) for task, orig_task in zip(ret, tasks): task = copy.deepcopy(task) task_type = TASK_TYPES[orig_task['type'].split('-')[-1]] assert task['id'] not in ids assert task['status'] == 'next_run_not_scheduled' assert task['current_interval'] == task_type['default_interval'] assert task['policy'] == orig_task.get('policy', 'recurring') priority = task.get('priority') if priority: actual_priorities[priority] += 1 assert task['retries_left'] == (task_type['num_retries'] or 0) ids.add(task['id']) del task['id'] del task['status'] del task['current_interval'] del task['retries_left'] if 'policy' not in orig_task: del task['policy'] if 'priority' not in orig_task: del task['priority'] assert task == orig_task assert dict(actual_priorities) == { priority: int(ratio * num_tasks_priority) for priority, ratio in priority_ratio.items() } def test_peek_ready_tasks_no_priority(self, swh_scheduler): self._create_task_types(swh_scheduler) t = utcnow() task_type = TEMPLATES['git']['type'] tasks = self._tasks_from_template(TEMPLATES['git'], t, 100) random.shuffle(tasks) swh_scheduler.create_tasks(tasks) ready_tasks = swh_scheduler.peek_ready_tasks(task_type) assert len(ready_tasks) == len(tasks) for i in range(len(ready_tasks) - 1): assert ready_tasks[i]['next_run'] <= ready_tasks[i+1]['next_run'] # Only get the first few ready tasks limit = random.randrange(5, 5 + len(tasks)//2) ready_tasks_limited = swh_scheduler.peek_ready_tasks( task_type, num_tasks=limit) assert len(ready_tasks_limited) == limit assert ready_tasks_limited == ready_tasks[:limit] # Limit by timestamp max_ts = tasks[limit-1]['next_run'] ready_tasks_timestamped = swh_scheduler.peek_ready_tasks( task_type, timestamp=max_ts) for ready_task in ready_tasks_timestamped: assert ready_task['next_run'] <= max_ts # Make sure we get proper behavior for the first ready tasks assert ready_tasks[:len(ready_tasks_timestamped)] \ == ready_tasks_timestamped # Limit by both ready_tasks_both = swh_scheduler.peek_ready_tasks( task_type, timestamp=max_ts, num_tasks=limit//3) assert len(ready_tasks_both) <= limit//3 for ready_task in ready_tasks_both: assert ready_task['next_run'] <= max_ts assert ready_task in ready_tasks[:limit//3] def _priority_ratio(self, swh_scheduler): return swh_scheduler.get_priority_ratios() def test_peek_ready_tasks_mixed_priorities(self, swh_scheduler): priority_ratio = self._priority_ratio(swh_scheduler) self._create_task_types(swh_scheduler) t = utcnow() task_type = TEMPLATES['git']['type'] num_tasks_priority = 100 num_tasks_no_priority = 100 # Create tasks with and without priorities tasks = self._tasks_from_template( TEMPLATES['git'], t, num=num_tasks_no_priority, num_priority=num_tasks_priority, priorities=priority_ratio) random.shuffle(tasks) swh_scheduler.create_tasks(tasks) # take all available tasks ready_tasks = swh_scheduler.peek_ready_tasks( task_type) assert len(ready_tasks) == len(tasks) assert num_tasks_priority + num_tasks_no_priority \ == len(ready_tasks) count_tasks_per_priority = defaultdict(int) for task in ready_tasks: priority = task.get('priority') if priority: count_tasks_per_priority[priority] += 1 assert dict(count_tasks_per_priority) == { priority: int(ratio * num_tasks_priority) for priority, ratio in priority_ratio.items() } # Only get some ready tasks num_tasks = random.randrange(5, 5 + num_tasks_no_priority//2) num_tasks_priority = random.randrange(5, num_tasks_priority//2) ready_tasks_limited = swh_scheduler.peek_ready_tasks( task_type, num_tasks=num_tasks, num_tasks_priority=num_tasks_priority) count_tasks_per_priority = defaultdict(int) for task in ready_tasks_limited: priority = task.get('priority') count_tasks_per_priority[priority] += 1 import math for priority, ratio in priority_ratio.items(): expected_count = math.ceil(ratio * num_tasks_priority) actual_prio = count_tasks_per_priority[priority] assert (actual_prio == expected_count or actual_prio == expected_count + 1) assert count_tasks_per_priority[None] == num_tasks def test_grab_ready_tasks(self, swh_scheduler): priority_ratio = self._priority_ratio(swh_scheduler) self._create_task_types(swh_scheduler) t = utcnow() task_type = TEMPLATES['git']['type'] num_tasks_priority = 100 num_tasks_no_priority = 100 # Create tasks with and without priorities tasks = self._tasks_from_template( TEMPLATES['git'], t, num=num_tasks_no_priority, num_priority=num_tasks_priority, priorities=priority_ratio) random.shuffle(tasks) swh_scheduler.create_tasks(tasks) first_ready_tasks = swh_scheduler.peek_ready_tasks( task_type, num_tasks=10, num_tasks_priority=10) grabbed_tasks = swh_scheduler.grab_ready_tasks( task_type, num_tasks=10, num_tasks_priority=10) for peeked, grabbed in zip(first_ready_tasks, grabbed_tasks): assert peeked['status'] == 'next_run_not_scheduled' del peeked['status'] assert grabbed['status'] == 'next_run_scheduled' del grabbed['status'] assert peeked == grabbed assert peeked['priority'] == grabbed['priority'] def test_get_tasks(self, swh_scheduler): self._create_task_types(swh_scheduler) t = utcnow() tasks = self._tasks_from_template(TEMPLATES['git'], t, 100) tasks = swh_scheduler.create_tasks(tasks) random.shuffle(tasks) while len(tasks) > 1: length = random.randrange(1, len(tasks)) cur_tasks = sorted(tasks[:length], key=lambda x: x['id']) tasks[:length] = [] ret = swh_scheduler.get_tasks(task['id'] for task in cur_tasks) # result is not guaranteed to be sorted ret.sort(key=lambda x: x['id']) assert ret == cur_tasks def test_search_tasks(self, swh_scheduler): def make_real_dicts(l): """RealDictRow is not a real dict.""" return [dict(d.items()) for d in l] self._create_task_types(swh_scheduler) t = utcnow() tasks = self._tasks_from_template(TEMPLATES['git'], t, 100) tasks = swh_scheduler.create_tasks(tasks) assert make_real_dicts(swh_scheduler.search_tasks()) \ == make_real_dicts(tasks) def test_filter_task_to_archive(self, swh_scheduler): """Filtering only list disabled recurring or completed oneshot tasks """ self._create_task_types(swh_scheduler) _time = utcnow() recurring = self._tasks_from_template(TEMPLATES['git'], _time, 12) oneshots = self._tasks_from_template(TEMPLATES['hg'], _time, 12) total_tasks = len(recurring) + len(oneshots) # simulate scheduling tasks pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) backend_tasks = [{ 'task': task['id'], 'backend_id': str(uuid.uuid4()), 'scheduled': utcnow(), } for task in pending_tasks] swh_scheduler.mass_schedule_task_runs(backend_tasks) # we simulate the task are being done _tasks = [] for task in backend_tasks: t = swh_scheduler.end_task_run( task['backend_id'], status='eventful') _tasks.append(t) # Randomly update task's status per policy status_per_policy = {'recurring': 0, 'oneshot': 0} status_choice = { # policy: [tuple (1-for-filtering, 'associated-status')] 'recurring': [(1, 'disabled'), (0, 'completed'), (0, 'next_run_not_scheduled')], 'oneshot': [(0, 'next_run_not_scheduled'), (1, 'disabled'), (1, 'completed')] } tasks_to_update = defaultdict(list) _task_ids = defaultdict(list) # randomize 'disabling' recurring task or 'complete' oneshot task for task in pending_tasks: policy = task['policy'] _task_ids[policy].append(task['id']) status = random.choice(status_choice[policy]) if status[0] != 1: continue # elected for filtering status_per_policy[policy] += status[0] tasks_to_update[policy].append(task['id']) swh_scheduler.disable_tasks(tasks_to_update['recurring']) # hack: change the status to something else than completed/disabled swh_scheduler.set_status_tasks( _task_ids['oneshot'], status='next_run_not_scheduled') # complete the tasks to update swh_scheduler.set_status_tasks( tasks_to_update['oneshot'], status='completed') total_tasks_filtered = (status_per_policy['recurring'] + status_per_policy['oneshot']) # no pagination scenario # retrieve tasks to archive after = _time.shift(days=-1) after_ts = after.format('YYYY-MM-DD') before = utcnow().shift(days=1) before_ts = before.format('YYYY-MM-DD') tasks_result = swh_scheduler.filter_task_to_archive( after_ts=after_ts, before_ts=before_ts, limit=total_tasks) tasks_to_archive = tasks_result['tasks'] assert len(tasks_to_archive) == total_tasks_filtered assert tasks_result.get('next_task_id') is None actual_filtered_per_status = {'recurring': 0, 'oneshot': 0} for task in tasks_to_archive: - if task['started'] is not None: - assert after <= task['started'] - assert task['started'] <= before + started = task['started'] + date = started if started is not None else task['scheduled'] + assert after <= date and date <= before if task['task_policy'] == 'oneshot': assert task['task_status'] in ['completed', 'disabled'] if task['task_policy'] == 'recurring': assert task['task_status'] in ['disabled'] actual_filtered_per_status[task['task_policy']] += 1 assert actual_filtered_per_status == status_per_policy # pagination scenario limit = 3 tasks_result = swh_scheduler.filter_task_to_archive( after_ts=after_ts, before_ts=before_ts, limit=limit) tasks_to_archive2 = tasks_result['tasks'] assert len(tasks_to_archive2) == limit next_id = tasks_result['next_task_id'] assert next_id is not None all_tasks = tasks_to_archive2 while next_id is not None: tasks_result = swh_scheduler.filter_task_to_archive( after_ts=after_ts, before_ts=before_ts, last_id=next_id, limit=limit) tasks_to_archive2 = tasks_result['tasks'] assert len(tasks_to_archive2) <= limit all_tasks.extend(tasks_to_archive2) next_id = tasks_result.get('next_task_id') actual_filtered_per_status = {'recurring': 0, 'oneshot': 0} for task in all_tasks: actual_filtered_per_status[task['task_policy']] += 1 assert actual_filtered_per_status == status_per_policy def test_delete_archived_tasks(self, swh_scheduler): self._create_task_types(swh_scheduler) _time = utcnow() recurring = self._tasks_from_template( TEMPLATES['git'], _time, 12) oneshots = self._tasks_from_template( TEMPLATES['hg'], _time, 12) total_tasks = len(recurring) + len(oneshots) pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) backend_tasks = [{ 'task': task['id'], 'backend_id': str(uuid.uuid4()), 'scheduled': utcnow(), } for task in pending_tasks] swh_scheduler.mass_schedule_task_runs(backend_tasks) _tasks = [] percent = random.randint(0, 100) # random election removal boundary for task in backend_tasks: t = swh_scheduler.end_task_run( task['backend_id'], status='eventful') c = random.randint(0, 100) if c <= percent: _tasks.append({'task_id': t['task'], 'task_run_id': t['id']}) swh_scheduler.delete_archived_tasks(_tasks) all_tasks = [task['id'] for task in swh_scheduler.search_tasks()] tasks_count = len(all_tasks) tasks_run_count = len(swh_scheduler.get_task_runs(all_tasks)) assert tasks_count == total_tasks - len(_tasks) assert tasks_run_count == total_tasks - len(_tasks) def test_get_task_runs_no_task(self, swh_scheduler): '''No task exist in the scheduler's db, get_task_runs() should always return an empty list. ''' assert not swh_scheduler.get_task_runs(task_ids=()) assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3)) assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3), limit=10) def test_get_task_runs_no_task_executed(self, swh_scheduler): '''No task has been executed yet, get_task_runs() should always return an empty list. ''' self._create_task_types(swh_scheduler) _time = utcnow() recurring = self._tasks_from_template( TEMPLATES['git'], _time, 12) oneshots = self._tasks_from_template( TEMPLATES['hg'], _time, 12) swh_scheduler.create_tasks(recurring + oneshots) assert not swh_scheduler.get_task_runs(task_ids=()) assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3)) assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3), limit=10) def test_get_task_runs_with_scheduled(self, swh_scheduler): '''Some tasks have been scheduled but not executed yet, get_task_runs() should not return an empty list. limit should behave as expected. ''' self._create_task_types(swh_scheduler) _time = utcnow() recurring = self._tasks_from_template( TEMPLATES['git'], _time, 12) oneshots = self._tasks_from_template( TEMPLATES['hg'], _time, 12) total_tasks = len(recurring) + len(oneshots) pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) backend_tasks = [{ 'task': task['id'], 'backend_id': str(uuid.uuid4()), 'scheduled': utcnow(), } for task in pending_tasks] swh_scheduler.mass_schedule_task_runs(backend_tasks) assert not swh_scheduler.get_task_runs( task_ids=[total_tasks + 1]) btask = backend_tasks[0] runs = swh_scheduler.get_task_runs( task_ids=[btask['task']]) assert len(runs) == 1 run = runs[0] assert subdict(run, excl=('id',)) == { 'task': btask['task'], 'backend_id': btask['backend_id'], 'scheduled': btask['scheduled'], 'started': None, 'ended': None, 'metadata': None, 'status': 'scheduled', } runs = swh_scheduler.get_task_runs( task_ids=[bt['task'] for bt in backend_tasks], limit=2) assert len(runs) == 2 runs = swh_scheduler.get_task_runs( task_ids=[bt['task'] for bt in backend_tasks]) assert len(runs) == total_tasks keys = ('task', 'backend_id', 'scheduled') assert sorted([subdict(x, keys) for x in runs], key=lambda x: x['task']) == backend_tasks def test_get_task_runs_with_executed(self, swh_scheduler): '''Some tasks have been executed, get_task_runs() should not return an empty list. limit should behave as expected. ''' self._create_task_types(swh_scheduler) _time = utcnow() recurring = self._tasks_from_template( TEMPLATES['git'], _time, 12) oneshots = self._tasks_from_template( TEMPLATES['hg'], _time, 12) pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) backend_tasks = [{ 'task': task['id'], 'backend_id': str(uuid.uuid4()), 'scheduled': utcnow(), } for task in pending_tasks] swh_scheduler.mass_schedule_task_runs(backend_tasks) btask = backend_tasks[0] ts = utcnow() swh_scheduler.start_task_run(btask['backend_id'], metadata={'something': 'stupid'}, timestamp=ts) runs = swh_scheduler.get_task_runs(task_ids=[btask['task']]) assert len(runs) == 1 assert subdict(runs[0], excl=('id')) == { 'task': btask['task'], 'backend_id': btask['backend_id'], 'scheduled': btask['scheduled'], 'started': ts, 'ended': None, 'metadata': {'something': 'stupid'}, 'status': 'started', } ts2 = utcnow() swh_scheduler.end_task_run(btask['backend_id'], metadata={'other': 'stuff'}, timestamp=ts2, status='eventful') runs = swh_scheduler.get_task_runs(task_ids=[btask['task']]) assert len(runs) == 1 assert subdict(runs[0], excl=('id')) == { 'task': btask['task'], 'backend_id': btask['backend_id'], 'scheduled': btask['scheduled'], 'started': ts, 'ended': ts2, 'metadata': {'something': 'stupid', 'other': 'stuff'}, 'status': 'eventful', } @staticmethod def _task_from_template(template, next_run, priority, *args, **kwargs): ret = copy.deepcopy(template) ret['next_run'] = next_run if priority: ret['priority'] = priority if args: ret['arguments']['args'] = list(args) if kwargs: ret['arguments']['kwargs'] = kwargs return ret def _pop_priority(self, priorities): if not priorities: return None for priority, remains in priorities.items(): if remains > 0: priorities[priority] = remains - 1 return priority return None def _tasks_from_template(self, template, max_timestamp, num, num_priority=0, priorities=None): if num_priority and priorities: priorities = { priority: ratio * num_priority for priority, ratio in priorities.items() } tasks = [] for i in range(num + num_priority): priority = self._pop_priority(priorities) tasks.append(self._task_from_template( template, max_timestamp - datetime.timedelta(microseconds=i), priority, 'argument-%03d' % i, **{'kwarg%03d' % i: 'bogus-kwarg'} )) return tasks def _create_task_types(self, scheduler): for tt in TASK_TYPES.values(): scheduler.create_task_type(tt)