Page MenuHomeSoftware Heritage

test_scheduler.py
No OneTemporary

test_scheduler.py

# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import copy
import datetime
import math
import random
import uuid
from collections import defaultdict

import psycopg2
import pytest
from arrow import utcnow
# Task types registered by the tests, keyed by short loader name.  Both
# entries share identical scheduling parameters; only the type name, the
# description and the backend task differ.
TASK_TYPES = {
    'git': dict(
        type='update-git',
        description='Update a git repository',
        backend_name='swh.loader.git.tasks.UpdateGitRepository',
        default_interval=datetime.timedelta(days=64),
        min_interval=datetime.timedelta(hours=12),
        max_interval=datetime.timedelta(days=64),
        backoff_factor=2,
        max_queue_length=None,
        num_retries=7,
        retry_delay=datetime.timedelta(hours=2),
    ),
    'hg': dict(
        type='update-hg',
        description='Update a mercurial repository',
        backend_name='swh.loader.mercurial.tasks.UpdateHgRepository',
        default_interval=datetime.timedelta(days=64),
        min_interval=datetime.timedelta(hours=12),
        max_interval=datetime.timedelta(days=64),
        backoff_factor=2,
        max_queue_length=None,
        num_retries=7,
        retry_delay=datetime.timedelta(hours=2),
    ),
}
# Bare task templates, one per task type: empty arguments and no next_run
# yet (callers fill those in).  Only the mercurial template carries an
# explicit 'oneshot' policy.
TEMPLATES = {
    'git': dict(
        type='update-git',
        arguments=dict(args=[], kwargs={}),
        next_run=None,
    ),
    'hg': dict(
        type='update-hg',
        arguments=dict(args=[], kwargs={}),
        next_run=None,
        policy='oneshot',
    ),
}
def subdict(d, keys=None, excl=()):
    """Return a new dict holding ``d``'s entries for ``keys`` minus ``excl``.

    :param d: source mapping
    :param keys: keys to keep, in output order; defaults to every key of
        ``d``.  Each listed key must exist in ``d`` (KeyError otherwise).
    :param excl: keys to drop.  A single string is treated as one key, so
        ``excl='id'`` behaves like ``excl=('id',)`` rather than excluding
        every substring of ``'id'`` (a classic ``('id')`` typo).
    """
    if keys is None:
        keys = list(d)
    if isinstance(excl, str):
        # `k not in 'id'` would be a substring test, not a membership test
        excl = (excl,)
    return {k: d[k] for k in keys if k not in excl}
@pytest.mark.db
class TestScheduler:
    """Database-backed tests of the scheduler backend API.

    Every test receives a ``swh_scheduler`` fixture, which is not defined in
    this file (presumably a fresh backend provided by a conftest -- verify).
    Tests register the task types from ``TASK_TYPES``, create tasks from
    ``TEMPLATES`` and check what the backend returns.
    """

    def test_get_priority_ratios(self, swh_scheduler):
        assert swh_scheduler.get_priority_ratios() == {
            'high': 0.5,
            'normal': 0.3,
            'low': 0.2,
        }

    def test_add_task_type(self, swh_scheduler):
        tt = TASK_TYPES['git']
        swh_scheduler.create_task_type(tt)
        assert tt == swh_scheduler.get_task_type(tt['type'])
        # Registering the same type twice must violate the unique constraint
        with pytest.raises(psycopg2.IntegrityError,
                           match=r'\(type\)=\(%s\)' % tt['type']):
            swh_scheduler.create_task_type(tt)
        tt2 = TASK_TYPES['hg']
        swh_scheduler.create_task_type(tt2)
        assert tt == swh_scheduler.get_task_type(tt['type'])
        assert tt2 == swh_scheduler.get_task_type(tt2['type'])

    def test_get_task_types(self, swh_scheduler):
        tt, tt2 = TASK_TYPES['git'], TASK_TYPES['hg']
        swh_scheduler.create_task_type(tt)
        swh_scheduler.create_task_type(tt2)
        actual_task_types = swh_scheduler.get_task_types()
        assert tt in actual_task_types
        assert tt2 in actual_task_types

    def test_create_tasks(self, swh_scheduler):
        priority_ratio = self._priority_ratio(swh_scheduler)
        self._create_task_types(swh_scheduler)
        num_tasks_priority = 100
        tasks_1 = self._tasks_from_template(
            TEMPLATES['git'], utcnow(), 100)
        tasks_2 = self._tasks_from_template(
            TEMPLATES['hg'], utcnow(), 100,
            num_tasks_priority, priorities=priority_ratio)
        tasks = tasks_1 + tasks_2
        # tasks are returned only once with their ids, even when the input
        # contains duplicates
        ret1 = swh_scheduler.create_tasks(tasks + tasks_1 + tasks_2)
        set_ret1 = {t['id'] for t in ret1}
        # creating the same set results in the same ids
        ret = swh_scheduler.create_tasks(tasks)
        set_ret = {t['id'] for t in ret}
        # Idempotence results
        assert set_ret == set_ret1
        assert len(ret) == len(ret1)
        # zip() below would silently skip tasks missing from ret
        assert len(ret) == len(tasks)
        ids = set()
        actual_priorities = defaultdict(int)
        for task, orig_task in zip(ret, tasks):
            task = copy.deepcopy(task)
            # 'update-git' -> 'git', 'update-hg' -> 'hg'
            task_type = TASK_TYPES[orig_task['type'].split('-')[-1]]
            assert task['id'] not in ids
            assert task['status'] == 'next_run_not_scheduled'
            assert task['current_interval'] == task_type['default_interval']
            assert task['policy'] == orig_task.get('policy', 'recurring')
            priority = task.get('priority')
            if priority:
                actual_priorities[priority] += 1
            assert task['retries_left'] == (task_type['num_retries'] or 0)
            ids.add(task['id'])
            # Strip backend-computed fields before comparing with the input
            del task['id']
            del task['status']
            del task['current_interval']
            del task['retries_left']
            if 'policy' not in orig_task:
                del task['policy']
            if 'priority' not in orig_task:
                del task['priority']
            assert task == orig_task
        assert dict(actual_priorities) == {
            priority: int(ratio * num_tasks_priority)
            for priority, ratio in priority_ratio.items()
        }

    def test_peek_ready_tasks_no_priority(self, swh_scheduler):
        self._create_task_types(swh_scheduler)
        t = utcnow()
        task_type = TEMPLATES['git']['type']
        tasks = self._tasks_from_template(TEMPLATES['git'], t, 100)
        random.shuffle(tasks)
        swh_scheduler.create_tasks(tasks)
        ready_tasks = swh_scheduler.peek_ready_tasks(task_type)
        assert len(ready_tasks) == len(tasks)
        # ready tasks come back ordered by next_run
        for current, following in zip(ready_tasks, ready_tasks[1:]):
            assert current['next_run'] <= following['next_run']
        # Only get the first few ready tasks
        limit = random.randrange(5, 5 + len(tasks)//2)
        ready_tasks_limited = swh_scheduler.peek_ready_tasks(
            task_type, num_tasks=limit)
        assert len(ready_tasks_limited) == limit
        assert ready_tasks_limited == ready_tasks[:limit]
        # Limit by timestamp (tasks is shuffled: this is a random cutoff)
        max_ts = tasks[limit-1]['next_run']
        ready_tasks_timestamped = swh_scheduler.peek_ready_tasks(
            task_type, timestamp=max_ts)
        for ready_task in ready_tasks_timestamped:
            assert ready_task['next_run'] <= max_ts
        # Make sure we get proper behavior for the first ready tasks
        assert ready_tasks[:len(ready_tasks_timestamped)] \
            == ready_tasks_timestamped
        # Limit by both
        ready_tasks_both = swh_scheduler.peek_ready_tasks(
            task_type, timestamp=max_ts, num_tasks=limit//3)
        assert len(ready_tasks_both) <= limit//3
        for ready_task in ready_tasks_both:
            assert ready_task['next_run'] <= max_ts
            assert ready_task in ready_tasks[:limit//3]

    def _priority_ratio(self, swh_scheduler):
        """Return the backend's priority ratios (priority name -> ratio)."""
        return swh_scheduler.get_priority_ratios()

    def test_peek_ready_tasks_mixed_priorities(self, swh_scheduler):
        priority_ratio = self._priority_ratio(swh_scheduler)
        self._create_task_types(swh_scheduler)
        t = utcnow()
        task_type = TEMPLATES['git']['type']
        num_tasks_priority = 100
        num_tasks_no_priority = 100
        # Create tasks with and without priorities
        tasks = self._tasks_from_template(
            TEMPLATES['git'], t,
            num=num_tasks_no_priority,
            num_priority=num_tasks_priority,
            priorities=priority_ratio)
        random.shuffle(tasks)
        swh_scheduler.create_tasks(tasks)
        # take all available tasks
        ready_tasks = swh_scheduler.peek_ready_tasks(
            task_type)
        assert len(ready_tasks) == len(tasks)
        assert num_tasks_priority + num_tasks_no_priority \
            == len(ready_tasks)
        count_tasks_per_priority = defaultdict(int)
        for task in ready_tasks:
            priority = task.get('priority')
            if priority:
                count_tasks_per_priority[priority] += 1
        assert dict(count_tasks_per_priority) == {
            priority: int(ratio * num_tasks_priority)
            for priority, ratio in priority_ratio.items()
        }
        # Only get some ready tasks
        num_tasks = random.randrange(5, 5 + num_tasks_no_priority//2)
        priority_limit = random.randrange(5, num_tasks_priority//2)
        ready_tasks_limited = swh_scheduler.peek_ready_tasks(
            task_type, num_tasks=num_tasks,
            num_tasks_priority=priority_limit)
        count_tasks_per_priority = defaultdict(int)
        for task in ready_tasks_limited:
            priority = task.get('priority')
            count_tasks_per_priority[priority] += 1
        for priority, ratio in priority_ratio.items():
            expected_count = math.ceil(ratio * priority_limit)
            actual_prio = count_tasks_per_priority[priority]
            # tolerate one extra task per priority (presumably rounding in
            # the backend's split of priority_limit -- verify)
            assert actual_prio in (expected_count, expected_count + 1)
        assert count_tasks_per_priority[None] == num_tasks

    def test_grab_ready_tasks(self, swh_scheduler):
        priority_ratio = self._priority_ratio(swh_scheduler)
        self._create_task_types(swh_scheduler)
        t = utcnow()
        task_type = TEMPLATES['git']['type']
        num_tasks_priority = 100
        num_tasks_no_priority = 100
        # Create tasks with and without priorities
        tasks = self._tasks_from_template(
            TEMPLATES['git'], t,
            num=num_tasks_no_priority,
            num_priority=num_tasks_priority,
            priorities=priority_ratio)
        random.shuffle(tasks)
        swh_scheduler.create_tasks(tasks)
        first_ready_tasks = swh_scheduler.peek_ready_tasks(
            task_type, num_tasks=10, num_tasks_priority=10)
        grabbed_tasks = swh_scheduler.grab_ready_tasks(
            task_type, num_tasks=10, num_tasks_priority=10)
        # zip() would silently ignore extra tasks on either side
        assert len(first_ready_tasks) == len(grabbed_tasks)
        for peeked, grabbed in zip(first_ready_tasks, grabbed_tasks):
            assert peeked['status'] == 'next_run_not_scheduled'
            del peeked['status']
            assert grabbed['status'] == 'next_run_scheduled'
            del grabbed['status']
            # whole-dict equality also covers the 'priority' field
            assert peeked == grabbed

    def test_get_tasks(self, swh_scheduler):
        self._create_task_types(swh_scheduler)
        t = utcnow()
        tasks = self._tasks_from_template(TEMPLATES['git'], t, 100)
        tasks = swh_scheduler.create_tasks(tasks)
        random.shuffle(tasks)
        # Fetch the tasks back in random-sized batches until none remain
        while tasks:
            length = random.randint(1, len(tasks))
            cur_tasks = sorted(tasks[:length], key=lambda x: x['id'])
            del tasks[:length]
            ret = swh_scheduler.get_tasks(task['id'] for task in cur_tasks)
            # result is not guaranteed to be sorted
            ret.sort(key=lambda x: x['id'])
            assert ret == cur_tasks

    def test_search_tasks(self, swh_scheduler):
        def make_real_dicts(rows):
            """RealDictRow is not a real dict."""
            return [dict(row.items()) for row in rows]
        self._create_task_types(swh_scheduler)
        t = utcnow()
        tasks = self._tasks_from_template(TEMPLATES['git'], t, 100)
        tasks = swh_scheduler.create_tasks(tasks)
        assert make_real_dicts(swh_scheduler.search_tasks()) \
            == make_real_dicts(tasks)

    def test_filter_task_to_archive(self, swh_scheduler):
        """Only disabled recurring tasks and completed oneshot tasks are
        listed for archival.
        """
        self._create_task_types(swh_scheduler)
        _time = utcnow()
        recurring = self._tasks_from_template(TEMPLATES['git'], _time, 12)
        oneshots = self._tasks_from_template(TEMPLATES['hg'], _time, 12)
        total_tasks = len(recurring) + len(oneshots)
        # simulate scheduling tasks
        pending_tasks = swh_scheduler.create_tasks(recurring + oneshots)
        backend_tasks = [{
            'task': task['id'],
            'backend_id': str(uuid.uuid4()),
            'scheduled': utcnow(),
        } for task in pending_tasks]
        swh_scheduler.mass_schedule_task_runs(backend_tasks)
        # simulate the tasks being done
        for task in backend_tasks:
            swh_scheduler.end_task_run(
                task['backend_id'], status='eventful')
        # Randomly update task's status per policy
        status_per_policy = {'recurring': 0, 'oneshot': 0}
        status_choice = {
            # policy: [tuple (1-for-filtering, 'associated-status')]
            # Only the flag is used below: elected recurring tasks get
            # disabled, elected oneshot tasks get completed.
            'recurring': [(1, 'disabled'),
                          (0, 'completed'),
                          (0, 'next_run_not_scheduled')],
            'oneshot': [(0, 'next_run_not_scheduled'),
                        (1, 'disabled'),
                        (1, 'completed')]
        }
        tasks_to_update = defaultdict(list)
        _task_ids = defaultdict(list)
        # randomize 'disabling' recurring task or 'complete' oneshot task
        for task in pending_tasks:
            policy = task['policy']
            _task_ids[policy].append(task['id'])
            status = random.choice(status_choice[policy])
            if status[0] != 1:
                continue
            # elected for filtering
            status_per_policy[policy] += status[0]
            tasks_to_update[policy].append(task['id'])
        swh_scheduler.disable_tasks(tasks_to_update['recurring'])
        # hack: change the status to something else than completed/disabled
        swh_scheduler.set_status_tasks(
            _task_ids['oneshot'], status='next_run_not_scheduled')
        # complete the tasks to update
        swh_scheduler.set_status_tasks(
            tasks_to_update['oneshot'], status='completed')
        total_tasks_filtered = (status_per_policy['recurring'] +
                                status_per_policy['oneshot'])
        # retrieve tasks to archive
        after = _time.shift(days=-1).format('YYYY-MM-DD')
        before = utcnow().shift(days=1).format('YYYY-MM-DD')
        tasks_to_archive = list(swh_scheduler.filter_task_to_archive(
            after_ts=after, before_ts=before, limit=total_tasks))
        assert len(tasks_to_archive) == total_tasks_filtered
        actual_filtered_per_status = {'recurring': 0, 'oneshot': 0}
        for task in tasks_to_archive:
            actual_filtered_per_status[task['task_policy']] += 1
        assert actual_filtered_per_status == status_per_policy

    def test_delete_archived_tasks(self, swh_scheduler):
        self._create_task_types(swh_scheduler)
        _time = utcnow()
        recurring = self._tasks_from_template(
            TEMPLATES['git'], _time, 12)
        oneshots = self._tasks_from_template(
            TEMPLATES['hg'], _time, 12)
        total_tasks = len(recurring) + len(oneshots)
        pending_tasks = swh_scheduler.create_tasks(recurring + oneshots)
        backend_tasks = [{
            'task': task['id'],
            'backend_id': str(uuid.uuid4()),
            'scheduled': utcnow(),
        } for task in pending_tasks]
        swh_scheduler.mass_schedule_task_runs(backend_tasks)
        _tasks = []
        # random boundary: each ended run is elected for deletion when its
        # own draw falls at or below it
        percent = random.randint(0, 100)
        for task in backend_tasks:
            t = swh_scheduler.end_task_run(
                task['backend_id'], status='eventful')
            draw = random.randint(0, 100)
            if draw <= percent:
                _tasks.append({'task_id': t['task'], 'task_run_id': t['id']})
        swh_scheduler.delete_archived_tasks(_tasks)
        all_tasks = [task['id'] for task in swh_scheduler.search_tasks()]
        tasks_count = len(all_tasks)
        tasks_run_count = len(swh_scheduler.get_task_runs(all_tasks))
        assert tasks_count == total_tasks - len(_tasks)
        assert tasks_run_count == total_tasks - len(_tasks)

    def test_get_task_runs_no_task(self, swh_scheduler):
        '''No task exists in the scheduler's db, get_task_runs() should always
        return an empty list.
        '''
        assert not swh_scheduler.get_task_runs(task_ids=())
        assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3))
        assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3),
                                               limit=10)

    def test_get_task_runs_no_task_executed(self, swh_scheduler):
        '''No task has been executed yet, get_task_runs() should always return
        an empty list.
        '''
        self._create_task_types(swh_scheduler)
        _time = utcnow()
        recurring = self._tasks_from_template(
            TEMPLATES['git'], _time, 12)
        oneshots = self._tasks_from_template(
            TEMPLATES['hg'], _time, 12)
        swh_scheduler.create_tasks(recurring + oneshots)
        assert not swh_scheduler.get_task_runs(task_ids=())
        assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3))
        assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3), limit=10)

    def test_get_task_runs_with_scheduled(self, swh_scheduler):
        '''Some tasks have been scheduled but not executed yet,
        get_task_runs() should not return an empty list. limit should behave
        as expected.
        '''
        self._create_task_types(swh_scheduler)
        _time = utcnow()
        recurring = self._tasks_from_template(
            TEMPLATES['git'], _time, 12)
        oneshots = self._tasks_from_template(
            TEMPLATES['hg'], _time, 12)
        total_tasks = len(recurring) + len(oneshots)
        pending_tasks = swh_scheduler.create_tasks(recurring + oneshots)
        backend_tasks = [{
            'task': task['id'],
            'backend_id': str(uuid.uuid4()),
            'scheduled': utcnow(),
        } for task in pending_tasks]
        swh_scheduler.mass_schedule_task_runs(backend_tasks)
        # NOTE(review): assumes task ids are allocated 1..total_tasks
        assert not swh_scheduler.get_task_runs(
            task_ids=[total_tasks + 1])
        btask = backend_tasks[0]
        runs = swh_scheduler.get_task_runs(
            task_ids=[btask['task']])
        assert len(runs) == 1
        run = runs[0]
        assert subdict(run, excl=('id',)) == {
            'task': btask['task'],
            'backend_id': btask['backend_id'],
            'scheduled': btask['scheduled'],
            'started': None,
            'ended': None,
            'metadata': None,
            'status': 'scheduled',
        }
        runs = swh_scheduler.get_task_runs(
            task_ids=[bt['task'] for bt in backend_tasks], limit=2)
        assert len(runs) == 2
        runs = swh_scheduler.get_task_runs(
            task_ids=[bt['task'] for bt in backend_tasks])
        assert len(runs) == total_tasks
        keys = ('task', 'backend_id', 'scheduled')
        assert sorted([subdict(x, keys) for x in runs],
                      key=lambda x: x['task']) == backend_tasks

    def test_get_task_runs_with_executed(self, swh_scheduler):
        '''Some tasks have been executed, get_task_runs() should
        not return an empty list. limit should behave as expected.
        '''
        self._create_task_types(swh_scheduler)
        _time = utcnow()
        recurring = self._tasks_from_template(
            TEMPLATES['git'], _time, 12)
        oneshots = self._tasks_from_template(
            TEMPLATES['hg'], _time, 12)
        pending_tasks = swh_scheduler.create_tasks(recurring + oneshots)
        backend_tasks = [{
            'task': task['id'],
            'backend_id': str(uuid.uuid4()),
            'scheduled': utcnow(),
        } for task in pending_tasks]
        swh_scheduler.mass_schedule_task_runs(backend_tasks)
        btask = backend_tasks[0]
        ts = utcnow()
        swh_scheduler.start_task_run(btask['backend_id'],
                                     metadata={'something': 'stupid'},
                                     timestamp=ts)
        runs = swh_scheduler.get_task_runs(task_ids=[btask['task']])
        assert len(runs) == 1
        # ('id',) must be a tuple: ('id') is just the string 'id'
        assert subdict(runs[0], excl=('id',)) == {
            'task': btask['task'],
            'backend_id': btask['backend_id'],
            'scheduled': btask['scheduled'],
            'started': ts,
            'ended': None,
            'metadata': {'something': 'stupid'},
            'status': 'started',
        }
        ts2 = utcnow()
        swh_scheduler.end_task_run(btask['backend_id'],
                                   metadata={'other': 'stuff'},
                                   timestamp=ts2,
                                   status='eventful')
        runs = swh_scheduler.get_task_runs(task_ids=[btask['task']])
        assert len(runs) == 1
        assert subdict(runs[0], excl=('id',)) == {
            'task': btask['task'],
            'backend_id': btask['backend_id'],
            'scheduled': btask['scheduled'],
            'started': ts,
            'ended': ts2,
            'metadata': {'something': 'stupid', 'other': 'stuff'},
            'status': 'eventful',
        }

    @staticmethod
    def _task_from_template(template, next_run, priority, *args, **kwargs):
        """Return a deep copy of ``template`` turned into one concrete task.

        ``priority`` is only stored when truthy; positional and keyword
        arguments, when given, replace the template's task arguments.
        """
        ret = copy.deepcopy(template)
        ret['next_run'] = next_run
        if priority:
            ret['priority'] = priority
        if args:
            ret['arguments']['args'] = list(args)
        if kwargs:
            ret['arguments']['kwargs'] = kwargs
        return ret

    def _pop_priority(self, priorities):
        """Consume one slot from the first priority that still has some.

        ``priorities`` maps a priority name to its remaining slot count and
        is decremented in place.  Return the chosen priority, or None when
        ``priorities`` is empty/None or every count is exhausted.
        """
        if not priorities:
            return None
        for priority, remains in priorities.items():
            if remains > 0:
                priorities[priority] = remains - 1
                return priority
        return None

    def _tasks_from_template(self, template, max_timestamp, num,
                             num_priority=0, priorities=None):
        """Build ``num + num_priority`` distinct tasks from ``template``.

        Task ``i`` gets ``next_run = max_timestamp - i`` microseconds and
        unique arguments.  When both ``num_priority`` and ``priorities`` (a
        mapping of priority name to ratio) are set, the first tasks are
        tagged with each priority in the mapping's iteration order,
        ``int(ratio * num_priority)`` tasks per priority; every other task
        has no priority.  ``priorities`` itself is never modified.
        """
        if num_priority and priorities:
            # Truncate exactly like the tests' expected counts do, so a
            # float product slightly above an integer cannot add a task.
            remaining = {
                priority: int(ratio * num_priority)
                for priority, ratio in priorities.items()
            }
        else:
            # No priority slots requested: no task gets a priority.
            remaining = None
        tasks = []
        for i in range(num + num_priority):
            priority = self._pop_priority(remaining)
            tasks.append(self._task_from_template(
                template,
                max_timestamp - datetime.timedelta(microseconds=i),
                priority,
                'argument-%03d' % i,
                **{'kwarg%03d' % i: 'bogus-kwarg'}
            ))
        return tasks

    def _create_task_types(self, scheduler):
        """Register every task type from ``TASK_TYPES`` on ``scheduler``."""
        for tt in TASK_TYPES.values():
            scheduler.create_task_type(tt)

File Metadata

Mime Type
text/x-python
Expires
Mon, Apr 14, 11:52 PM (1 w, 17 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3286452

Event Timeline