Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/tests/test_scheduler.py
Show All 10 Lines | |||||
from collections import defaultdict | from collections import defaultdict | ||||
from typing import Any, Dict | from typing import Any, Dict | ||||
from arrow import utcnow | from arrow import utcnow | ||||
import psycopg2 | import psycopg2 | ||||
import pytest | import pytest | ||||
from .common import tasks_from_template, TEMPLATES, TASK_TYPES | |||||
TASK_TYPES = { | |||||
'git': { | |||||
'type': 'update-git', | |||||
'description': 'Update a git repository', | |||||
'backend_name': 'swh.loader.git.tasks.UpdateGitRepository', | |||||
'default_interval': datetime.timedelta(days=64), | |||||
'min_interval': datetime.timedelta(hours=12), | |||||
'max_interval': datetime.timedelta(days=64), | |||||
'backoff_factor': 2, | |||||
'max_queue_length': None, | |||||
'num_retries': 7, | |||||
'retry_delay': datetime.timedelta(hours=2), | |||||
}, | |||||
'hg': { | |||||
'type': 'update-hg', | |||||
'description': 'Update a mercurial repository', | |||||
'backend_name': 'swh.loader.mercurial.tasks.UpdateHgRepository', | |||||
'default_interval': datetime.timedelta(days=64), | |||||
'min_interval': datetime.timedelta(hours=12), | |||||
'max_interval': datetime.timedelta(days=64), | |||||
'backoff_factor': 2, | |||||
'max_queue_length': None, | |||||
'num_retries': 7, | |||||
'retry_delay': datetime.timedelta(hours=2), | |||||
}, | |||||
} | |||||
TEMPLATES = { | |||||
'git': { | |||||
'type': 'update-git', | |||||
'arguments': { | |||||
'args': [], | |||||
'kwargs': {}, | |||||
}, | |||||
'next_run': None, | |||||
}, | |||||
'hg': { | |||||
'type': 'update-hg', | |||||
'arguments': { | |||||
'args': [], | |||||
'kwargs': {}, | |||||
}, | |||||
'next_run': None, | |||||
'policy': 'oneshot', | |||||
} | |||||
} | |||||
def subdict(d, keys=None, excl=()): | def subdict(d, keys=None, excl=()): | ||||
if keys is None: | if keys is None: | ||||
keys = [k for k in d.keys()] | keys = [k for k in d.keys()] | ||||
return {k: d[k] for k in keys if k not in excl} | return {k: d[k] for k in keys if k not in excl} | ||||
Show All 26 Lines | def test_get_task_types(self, swh_scheduler): | ||||
actual_task_types = swh_scheduler.get_task_types() | actual_task_types = swh_scheduler.get_task_types() | ||||
assert tt in actual_task_types | assert tt in actual_task_types | ||||
assert tt2 in actual_task_types | assert tt2 in actual_task_types | ||||
def test_create_tasks(self, swh_scheduler): | def test_create_tasks(self, swh_scheduler): | ||||
priority_ratio = self._priority_ratio(swh_scheduler) | priority_ratio = self._priority_ratio(swh_scheduler) | ||||
self._create_task_types(swh_scheduler) | self._create_task_types(swh_scheduler) | ||||
num_tasks_priority = 100 | num_tasks_priority = 100 | ||||
tasks_1 = self._tasks_from_template( | tasks_1 = tasks_from_template( | ||||
TEMPLATES['git'], utcnow(), 100) | TEMPLATES['git'], utcnow(), 100) | ||||
tasks_2 = self._tasks_from_template( | tasks_2 = tasks_from_template( | ||||
TEMPLATES['hg'], utcnow(), 100, | TEMPLATES['hg'], utcnow(), 100, | ||||
num_tasks_priority, priorities=priority_ratio) | num_tasks_priority, priorities=priority_ratio) | ||||
tasks = tasks_1 + tasks_2 | tasks = tasks_1 + tasks_2 | ||||
# tasks are returned only once with their ids | # tasks are returned only once with their ids | ||||
ret1 = swh_scheduler.create_tasks(tasks + tasks_1 + tasks_2) | ret1 = swh_scheduler.create_tasks(tasks + tasks_1 + tasks_2) | ||||
set_ret1 = set([t['id'] for t in ret1]) | set_ret1 = set([t['id'] for t in ret1]) | ||||
Show All 35 Lines | def test_create_tasks(self, swh_scheduler): | ||||
priority: int(ratio * num_tasks_priority) | priority: int(ratio * num_tasks_priority) | ||||
for priority, ratio in priority_ratio.items() | for priority, ratio in priority_ratio.items() | ||||
} | } | ||||
def test_peek_ready_tasks_no_priority(self, swh_scheduler): | def test_peek_ready_tasks_no_priority(self, swh_scheduler): | ||||
self._create_task_types(swh_scheduler) | self._create_task_types(swh_scheduler) | ||||
t = utcnow() | t = utcnow() | ||||
task_type = TEMPLATES['git']['type'] | task_type = TEMPLATES['git']['type'] | ||||
tasks = self._tasks_from_template(TEMPLATES['git'], t, 100) | tasks = tasks_from_template(TEMPLATES['git'], t, 100) | ||||
random.shuffle(tasks) | random.shuffle(tasks) | ||||
swh_scheduler.create_tasks(tasks) | swh_scheduler.create_tasks(tasks) | ||||
ready_tasks = swh_scheduler.peek_ready_tasks(task_type) | ready_tasks = swh_scheduler.peek_ready_tasks(task_type) | ||||
assert len(ready_tasks) == len(tasks) | assert len(ready_tasks) == len(tasks) | ||||
for i in range(len(ready_tasks) - 1): | for i in range(len(ready_tasks) - 1): | ||||
assert ready_tasks[i]['next_run'] <= ready_tasks[i+1]['next_run'] | assert ready_tasks[i]['next_run'] <= ready_tasks[i+1]['next_run'] | ||||
Show All 31 Lines | class TestScheduler: | ||||
def test_peek_ready_tasks_mixed_priorities(self, swh_scheduler): | def test_peek_ready_tasks_mixed_priorities(self, swh_scheduler): | ||||
priority_ratio = self._priority_ratio(swh_scheduler) | priority_ratio = self._priority_ratio(swh_scheduler) | ||||
self._create_task_types(swh_scheduler) | self._create_task_types(swh_scheduler) | ||||
t = utcnow() | t = utcnow() | ||||
task_type = TEMPLATES['git']['type'] | task_type = TEMPLATES['git']['type'] | ||||
num_tasks_priority = 100 | num_tasks_priority = 100 | ||||
num_tasks_no_priority = 100 | num_tasks_no_priority = 100 | ||||
# Create tasks with and without priorities | # Create tasks with and without priorities | ||||
tasks = self._tasks_from_template( | tasks = tasks_from_template( | ||||
TEMPLATES['git'], t, | TEMPLATES['git'], t, | ||||
num=num_tasks_no_priority, | num=num_tasks_no_priority, | ||||
num_priority=num_tasks_priority, | num_priority=num_tasks_priority, | ||||
priorities=priority_ratio) | priorities=priority_ratio) | ||||
random.shuffle(tasks) | random.shuffle(tasks) | ||||
swh_scheduler.create_tasks(tasks) | swh_scheduler.create_tasks(tasks) | ||||
Show All 40 Lines | class TestScheduler: | ||||
def test_grab_ready_tasks(self, swh_scheduler): | def test_grab_ready_tasks(self, swh_scheduler): | ||||
priority_ratio = self._priority_ratio(swh_scheduler) | priority_ratio = self._priority_ratio(swh_scheduler) | ||||
self._create_task_types(swh_scheduler) | self._create_task_types(swh_scheduler) | ||||
t = utcnow() | t = utcnow() | ||||
task_type = TEMPLATES['git']['type'] | task_type = TEMPLATES['git']['type'] | ||||
num_tasks_priority = 100 | num_tasks_priority = 100 | ||||
num_tasks_no_priority = 100 | num_tasks_no_priority = 100 | ||||
# Create tasks with and without priorities | # Create tasks with and without priorities | ||||
tasks = self._tasks_from_template( | tasks = tasks_from_template( | ||||
TEMPLATES['git'], t, | TEMPLATES['git'], t, | ||||
num=num_tasks_no_priority, | num=num_tasks_no_priority, | ||||
num_priority=num_tasks_priority, | num_priority=num_tasks_priority, | ||||
priorities=priority_ratio) | priorities=priority_ratio) | ||||
random.shuffle(tasks) | random.shuffle(tasks) | ||||
swh_scheduler.create_tasks(tasks) | swh_scheduler.create_tasks(tasks) | ||||
first_ready_tasks = swh_scheduler.peek_ready_tasks( | first_ready_tasks = swh_scheduler.peek_ready_tasks( | ||||
task_type, num_tasks=10, num_tasks_priority=10) | task_type, num_tasks=10, num_tasks_priority=10) | ||||
grabbed_tasks = swh_scheduler.grab_ready_tasks( | grabbed_tasks = swh_scheduler.grab_ready_tasks( | ||||
task_type, num_tasks=10, num_tasks_priority=10) | task_type, num_tasks=10, num_tasks_priority=10) | ||||
for peeked, grabbed in zip(first_ready_tasks, grabbed_tasks): | for peeked, grabbed in zip(first_ready_tasks, grabbed_tasks): | ||||
assert peeked['status'] == 'next_run_not_scheduled' | assert peeked['status'] == 'next_run_not_scheduled' | ||||
del peeked['status'] | del peeked['status'] | ||||
assert grabbed['status'] == 'next_run_scheduled' | assert grabbed['status'] == 'next_run_scheduled' | ||||
del grabbed['status'] | del grabbed['status'] | ||||
assert peeked == grabbed | assert peeked == grabbed | ||||
assert peeked['priority'] == grabbed['priority'] | assert peeked['priority'] == grabbed['priority'] | ||||
def test_get_tasks(self, swh_scheduler): | def test_get_tasks(self, swh_scheduler): | ||||
self._create_task_types(swh_scheduler) | self._create_task_types(swh_scheduler) | ||||
t = utcnow() | t = utcnow() | ||||
tasks = self._tasks_from_template(TEMPLATES['git'], t, 100) | tasks = tasks_from_template(TEMPLATES['git'], t, 100) | ||||
tasks = swh_scheduler.create_tasks(tasks) | tasks = swh_scheduler.create_tasks(tasks) | ||||
random.shuffle(tasks) | random.shuffle(tasks) | ||||
while len(tasks) > 1: | while len(tasks) > 1: | ||||
length = random.randrange(1, len(tasks)) | length = random.randrange(1, len(tasks)) | ||||
cur_tasks = sorted(tasks[:length], key=lambda x: x['id']) | cur_tasks = sorted(tasks[:length], key=lambda x: x['id']) | ||||
tasks[:length] = [] | tasks[:length] = [] | ||||
ret = swh_scheduler.get_tasks(task['id'] for task in cur_tasks) | ret = swh_scheduler.get_tasks(task['id'] for task in cur_tasks) | ||||
# result is not guaranteed to be sorted | # result is not guaranteed to be sorted | ||||
ret.sort(key=lambda x: x['id']) | ret.sort(key=lambda x: x['id']) | ||||
assert ret == cur_tasks | assert ret == cur_tasks | ||||
def test_search_tasks(self, swh_scheduler): | def test_search_tasks(self, swh_scheduler): | ||||
def make_real_dicts(l): | def make_real_dicts(l): | ||||
"""RealDictRow is not a real dict.""" | """RealDictRow is not a real dict.""" | ||||
return [dict(d.items()) for d in l] | return [dict(d.items()) for d in l] | ||||
self._create_task_types(swh_scheduler) | self._create_task_types(swh_scheduler) | ||||
t = utcnow() | t = utcnow() | ||||
tasks = self._tasks_from_template(TEMPLATES['git'], t, 100) | tasks = tasks_from_template(TEMPLATES['git'], t, 100) | ||||
tasks = swh_scheduler.create_tasks(tasks) | tasks = swh_scheduler.create_tasks(tasks) | ||||
assert make_real_dicts(swh_scheduler.search_tasks()) \ | assert make_real_dicts(swh_scheduler.search_tasks()) \ | ||||
== make_real_dicts(tasks) | == make_real_dicts(tasks) | ||||
def assert_filtered_task_ok( | def assert_filtered_task_ok( | ||||
self, task: Dict[str, Any], | self, task: Dict[str, Any], | ||||
after: datetime.datetime, | after: datetime.datetime, | ||||
before: datetime.datetime) -> None: | before: datetime.datetime) -> None: | ||||
Show All 10 Lines | def assert_filtered_task_ok( | ||||
assert task['task_status'] in ['disabled'] | assert task['task_status'] in ['disabled'] | ||||
def test_filter_task_to_archive(self, swh_scheduler): | def test_filter_task_to_archive(self, swh_scheduler): | ||||
"""Filtering only list disabled recurring or completed oneshot tasks | """Filtering only list disabled recurring or completed oneshot tasks | ||||
""" | """ | ||||
self._create_task_types(swh_scheduler) | self._create_task_types(swh_scheduler) | ||||
_time = utcnow() | _time = utcnow() | ||||
recurring = self._tasks_from_template(TEMPLATES['git'], _time, 12) | recurring = tasks_from_template(TEMPLATES['git'], _time, 12) | ||||
oneshots = self._tasks_from_template(TEMPLATES['hg'], _time, 12) | oneshots = tasks_from_template(TEMPLATES['hg'], _time, 12) | ||||
total_tasks = len(recurring) + len(oneshots) | total_tasks = len(recurring) + len(oneshots) | ||||
# simulate scheduling tasks | # simulate scheduling tasks | ||||
pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) | pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) | ||||
backend_tasks = [{ | backend_tasks = [{ | ||||
'task': task['id'], | 'task': task['id'], | ||||
'backend_id': str(uuid.uuid4()), | 'backend_id': str(uuid.uuid4()), | ||||
'scheduled': utcnow(), | 'scheduled': utcnow(), | ||||
▲ Show 20 Lines • Show All 92 Lines • ▼ Show 20 Lines | def test_filter_task_to_archive(self, swh_scheduler): | ||||
self.assert_filtered_task_ok(task, after, before) | self.assert_filtered_task_ok(task, after, before) | ||||
actual_filtered_per_status[task['task_policy']] += 1 | actual_filtered_per_status[task['task_policy']] += 1 | ||||
assert actual_filtered_per_status == status_per_policy | assert actual_filtered_per_status == status_per_policy | ||||
def test_delete_archived_tasks(self, swh_scheduler): | def test_delete_archived_tasks(self, swh_scheduler): | ||||
self._create_task_types(swh_scheduler) | self._create_task_types(swh_scheduler) | ||||
_time = utcnow() | _time = utcnow() | ||||
recurring = self._tasks_from_template( | recurring = tasks_from_template( | ||||
TEMPLATES['git'], _time, 12) | TEMPLATES['git'], _time, 12) | ||||
oneshots = self._tasks_from_template( | oneshots = tasks_from_template( | ||||
TEMPLATES['hg'], _time, 12) | TEMPLATES['hg'], _time, 12) | ||||
total_tasks = len(recurring) + len(oneshots) | total_tasks = len(recurring) + len(oneshots) | ||||
pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) | pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) | ||||
backend_tasks = [{ | backend_tasks = [{ | ||||
'task': task['id'], | 'task': task['id'], | ||||
'backend_id': str(uuid.uuid4()), | 'backend_id': str(uuid.uuid4()), | ||||
'scheduled': utcnow(), | 'scheduled': utcnow(), | ||||
} for task in pending_tasks] | } for task in pending_tasks] | ||||
Show All 29 Lines | class TestScheduler: | ||||
def test_get_task_runs_no_task_executed(self, swh_scheduler): | def test_get_task_runs_no_task_executed(self, swh_scheduler): | ||||
'''No task has been executed yet, get_task_runs() should always return an empty | '''No task has been executed yet, get_task_runs() should always return an empty | ||||
list. | list. | ||||
''' | ''' | ||||
self._create_task_types(swh_scheduler) | self._create_task_types(swh_scheduler) | ||||
_time = utcnow() | _time = utcnow() | ||||
recurring = self._tasks_from_template( | recurring = tasks_from_template( | ||||
TEMPLATES['git'], _time, 12) | TEMPLATES['git'], _time, 12) | ||||
oneshots = self._tasks_from_template( | oneshots = tasks_from_template( | ||||
TEMPLATES['hg'], _time, 12) | TEMPLATES['hg'], _time, 12) | ||||
swh_scheduler.create_tasks(recurring + oneshots) | swh_scheduler.create_tasks(recurring + oneshots) | ||||
assert not swh_scheduler.get_task_runs(task_ids=()) | assert not swh_scheduler.get_task_runs(task_ids=()) | ||||
assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3)) | assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3)) | ||||
assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3), limit=10) | assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3), limit=10) | ||||
def test_get_task_runs_with_scheduled(self, swh_scheduler): | def test_get_task_runs_with_scheduled(self, swh_scheduler): | ||||
'''Some tasks have been scheduled but not executed yet, get_task_runs() should | '''Some tasks have been scheduled but not executed yet, get_task_runs() should | ||||
not return an empty list. limit should behave as expected. | not return an empty list. limit should behave as expected. | ||||
''' | ''' | ||||
self._create_task_types(swh_scheduler) | self._create_task_types(swh_scheduler) | ||||
_time = utcnow() | _time = utcnow() | ||||
recurring = self._tasks_from_template( | recurring = tasks_from_template( | ||||
TEMPLATES['git'], _time, 12) | TEMPLATES['git'], _time, 12) | ||||
oneshots = self._tasks_from_template( | oneshots = tasks_from_template( | ||||
TEMPLATES['hg'], _time, 12) | TEMPLATES['hg'], _time, 12) | ||||
total_tasks = len(recurring) + len(oneshots) | total_tasks = len(recurring) + len(oneshots) | ||||
pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) | pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) | ||||
backend_tasks = [{ | backend_tasks = [{ | ||||
'task': task['id'], | 'task': task['id'], | ||||
'backend_id': str(uuid.uuid4()), | 'backend_id': str(uuid.uuid4()), | ||||
'scheduled': utcnow(), | 'scheduled': utcnow(), | ||||
} for task in pending_tasks] | } for task in pending_tasks] | ||||
Show All 32 Lines | class TestScheduler: | ||||
def test_get_task_runs_with_executed(self, swh_scheduler): | def test_get_task_runs_with_executed(self, swh_scheduler): | ||||
'''Some tasks have been executed, get_task_runs() should | '''Some tasks have been executed, get_task_runs() should | ||||
not return an empty list. limit should behave as expected. | not return an empty list. limit should behave as expected. | ||||
''' | ''' | ||||
self._create_task_types(swh_scheduler) | self._create_task_types(swh_scheduler) | ||||
_time = utcnow() | _time = utcnow() | ||||
recurring = self._tasks_from_template( | recurring = tasks_from_template( | ||||
TEMPLATES['git'], _time, 12) | TEMPLATES['git'], _time, 12) | ||||
oneshots = self._tasks_from_template( | oneshots = tasks_from_template( | ||||
TEMPLATES['hg'], _time, 12) | TEMPLATES['hg'], _time, 12) | ||||
pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) | pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) | ||||
backend_tasks = [{ | backend_tasks = [{ | ||||
'task': task['id'], | 'task': task['id'], | ||||
'backend_id': str(uuid.uuid4()), | 'backend_id': str(uuid.uuid4()), | ||||
'scheduled': utcnow(), | 'scheduled': utcnow(), | ||||
} for task in pending_tasks] | } for task in pending_tasks] | ||||
swh_scheduler.mass_schedule_task_runs(backend_tasks) | swh_scheduler.mass_schedule_task_runs(backend_tasks) | ||||
Show All 27 Lines | def test_get_task_runs_with_executed(self, swh_scheduler): | ||||
'backend_id': btask['backend_id'], | 'backend_id': btask['backend_id'], | ||||
'scheduled': btask['scheduled'], | 'scheduled': btask['scheduled'], | ||||
'started': ts, | 'started': ts, | ||||
'ended': ts2, | 'ended': ts2, | ||||
'metadata': {'something': 'stupid', 'other': 'stuff'}, | 'metadata': {'something': 'stupid', 'other': 'stuff'}, | ||||
'status': 'eventful', | 'status': 'eventful', | ||||
} | } | ||||
@staticmethod | |||||
def _task_from_template(template, next_run, priority, *args, **kwargs): | |||||
ret = copy.deepcopy(template) | |||||
ret['next_run'] = next_run | |||||
if priority: | |||||
ret['priority'] = priority | |||||
if args: | |||||
ret['arguments']['args'] = list(args) | |||||
if kwargs: | |||||
ret['arguments']['kwargs'] = kwargs | |||||
return ret | |||||
def _pop_priority(self, priorities): | |||||
if not priorities: | |||||
return None | |||||
for priority, remains in priorities.items(): | |||||
if remains > 0: | |||||
priorities[priority] = remains - 1 | |||||
return priority | |||||
return None | |||||
def _tasks_from_template(self, template, max_timestamp, num, | |||||
num_priority=0, priorities=None): | |||||
if num_priority and priorities: | |||||
priorities = { | |||||
priority: ratio * num_priority | |||||
for priority, ratio in priorities.items() | |||||
} | |||||
tasks = [] | |||||
for i in range(num + num_priority): | |||||
priority = self._pop_priority(priorities) | |||||
tasks.append(self._task_from_template( | |||||
template, | |||||
max_timestamp - datetime.timedelta(microseconds=i), | |||||
priority, | |||||
'argument-%03d' % i, | |||||
**{'kwarg%03d' % i: 'bogus-kwarg'} | |||||
)) | |||||
return tasks | |||||
def _create_task_types(self, scheduler): | def _create_task_types(self, scheduler): | ||||
for tt in TASK_TYPES.values(): | for tt in TASK_TYPES.values(): | ||||
scheduler.create_task_type(tt) | scheduler.create_task_type(tt) |