Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/tests/test_scheduler.py
Show All 15 Lines | |||||
import pytest | import pytest | ||||
from swh.core.tests.db_testing import SingleDbTestFixture | from swh.core.tests.db_testing import SingleDbTestFixture | ||||
from swh.scheduler import get_scheduler | from swh.scheduler import get_scheduler | ||||
from . import SQL_DIR | from . import SQL_DIR | ||||
@pytest.mark.db | TASK_TYPES = { | ||||
class CommonSchedulerTest(SingleDbTestFixture): | 'git': { | ||||
TEST_DB_NAME = 'softwareheritage-scheduler-test' | |||||
TEST_DB_DUMP = os.path.join(SQL_DIR, '*.sql') | |||||
def setUp(self): | |||||
super().setUp() | |||||
tt = { | |||||
'type': 'update-git', | 'type': 'update-git', | ||||
'description': 'Update a git repository', | 'description': 'Update a git repository', | ||||
'backend_name': 'swh.loader.git.tasks.UpdateGitRepository', | 'backend_name': 'swh.loader.git.tasks.UpdateGitRepository', | ||||
'default_interval': datetime.timedelta(days=64), | 'default_interval': datetime.timedelta(days=64), | ||||
'min_interval': datetime.timedelta(hours=12), | 'min_interval': datetime.timedelta(hours=12), | ||||
'max_interval': datetime.timedelta(days=64), | 'max_interval': datetime.timedelta(days=64), | ||||
'backoff_factor': 2, | 'backoff_factor': 2, | ||||
'max_queue_length': None, | 'max_queue_length': None, | ||||
'num_retries': 7, | 'num_retries': 7, | ||||
'retry_delay': datetime.timedelta(hours=2), | 'retry_delay': datetime.timedelta(hours=2), | ||||
} | }, | ||||
tt2 = tt.copy() | 'hg': { | ||||
tt2['type'] = 'update-hg' | 'type': 'update-hg', | ||||
tt2['description'] = 'Update a mercurial repository' | 'description': 'Update a mercurial repository', | ||||
tt2['backend_name'] = 'swh.loader.mercurial.tasks.UpdateHgRepository' | 'backend_name': 'swh.loader.mercurial.tasks.UpdateHgRepository', | ||||
tt2['max_queue_length'] = 42 | 'default_interval': datetime.timedelta(days=64), | ||||
tt2['num_retries'] = None | 'min_interval': datetime.timedelta(hours=12), | ||||
tt2['retry_delay'] = None | 'max_interval': datetime.timedelta(days=64), | ||||
'backoff_factor': 2, | |||||
self.task_types = { | 'max_queue_length': None, | ||||
tt['type']: tt, | 'num_retries': 7, | ||||
tt2['type']: tt2, | 'retry_delay': datetime.timedelta(hours=2), | ||||
}, | |||||
} | } | ||||
self.task1_template = t1_template = { | TEMPLATES = { | ||||
'type': tt['type'], | 'git': { | ||||
'type': 'update-git', | |||||
'arguments': { | |||||
'args': [], | |||||
'kwargs': {}, | |||||
}, | |||||
'next_run': None, | |||||
}, | |||||
'hg': { | |||||
'type': 'update-hg', | |||||
'arguments': { | 'arguments': { | ||||
'args': [], | 'args': [], | ||||
'kwargs': {}, | 'kwargs': {}, | ||||
}, | }, | ||||
'next_run': None, | 'next_run': None, | ||||
'policy': 'oneshot', | |||||
} | } | ||||
self.task2_template = t2_template = copy.deepcopy(t1_template) | } | ||||
t2_template['type'] = tt2['type'] | |||||
t2_template['policy'] = 'oneshot' | |||||
@pytest.mark.db | |||||
class CommonSchedulerTest(SingleDbTestFixture): | |||||
TEST_DB_NAME = 'softwareheritage-scheduler-test' | |||||
TEST_DB_DUMP = os.path.join(SQL_DIR, '*.sql') | |||||
def tearDown(self): | def tearDown(self): | ||||
self.backend.close_connection() | |||||
vlorentz: Any idea why needed that? | |||||
Done Inline Actionsnope douardda: nope | |||||
self.empty_tables() | self.empty_tables() | ||||
super().tearDown() | super().tearDown() | ||||
def empty_tables(self, whitelist=["priority_ratio"]): | def empty_tables(self, whitelist=["priority_ratio"]): | ||||
query = """SELECT table_name FROM information_schema.tables | query = """SELECT table_name FROM information_schema.tables | ||||
WHERE table_schema = %%s and | WHERE table_schema = %%s and | ||||
table_name not in (%s) | table_name not in (%s) | ||||
""" % ','.join(map(lambda t: "'%s'" % t, whitelist)) | """ % ','.join(map(lambda t: "'%s'" % t, whitelist)) | ||||
self.cursor.execute(query, ('public', )) | self.cursor.execute(query, ('public', )) | ||||
tables = set(table for (table,) in self.cursor.fetchall()) | tables = set(table for (table,) in self.cursor.fetchall()) | ||||
for table in tables: | for table in tables: | ||||
self.cursor.execute('truncate table %s cascade' % table) | self.cursor.execute('truncate table %s cascade' % table) | ||||
self.conn.commit() | self.conn.commit() | ||||
def test_add_task_type(self): | def test_add_task_type(self): | ||||
tt, tt2 = self.task_types.values() | tt = TASK_TYPES['git'] | ||||
self.backend.create_task_type(tt) | self.backend.create_task_type(tt) | ||||
self.assertEqual(tt, self.backend.get_task_type(tt['type'])) | self.assertEqual(tt, self.backend.get_task_type(tt['type'])) | ||||
with self.assertRaisesRegex(psycopg2.IntegrityError, | with self.assertRaisesRegex(psycopg2.IntegrityError, | ||||
r'\(type\)=\(%s\)' % tt['type']): | r'\(type\)=\(%s\)' % tt['type']): | ||||
self.backend.create_task_type(tt) | self.backend.create_task_type(tt) | ||||
tt2 = TASK_TYPES['hg'] | |||||
self.backend.create_task_type(tt2) | self.backend.create_task_type(tt2) | ||||
self.assertEqual(tt, self.backend.get_task_type(tt['type'])) | self.assertEqual(tt, self.backend.get_task_type(tt['type'])) | ||||
self.assertEqual(tt2, self.backend.get_task_type(tt2['type'])) | self.assertEqual(tt2, self.backend.get_task_type(tt2['type'])) | ||||
def test_get_task_types(self): | def test_get_task_types(self): | ||||
tt, tt2 = self.task_types.values() | tt, tt2 = TASK_TYPES['git'], TASK_TYPES['hg'] | ||||
self.backend.create_task_type(tt) | self.backend.create_task_type(tt) | ||||
self.backend.create_task_type(tt2) | self.backend.create_task_type(tt2) | ||||
self.assertCountEqual([tt2, tt], self.backend.get_task_types()) | self.assertCountEqual([tt2, tt], self.backend.get_task_types()) | ||||
@staticmethod | @staticmethod | ||||
def _task_from_template(template, next_run, priority, *args, **kwargs): | def _task_from_template(template, next_run, priority, *args, **kwargs): | ||||
ret = copy.deepcopy(template) | ret = copy.deepcopy(template) | ||||
ret['next_run'] = next_run | ret['next_run'] = next_run | ||||
Show All 30 Lines | def _tasks_from_template(self, template, max_timestamp, num, | ||||
max_timestamp - datetime.timedelta(microseconds=i), | max_timestamp - datetime.timedelta(microseconds=i), | ||||
priority, | priority, | ||||
'argument-%03d' % i, | 'argument-%03d' % i, | ||||
**{'kwarg%03d' % i: 'bogus-kwarg'} | **{'kwarg%03d' % i: 'bogus-kwarg'} | ||||
)) | )) | ||||
return tasks | return tasks | ||||
def _create_task_types(self): | def _create_task_types(self): | ||||
for tt in self.task_types.values(): | for tt in TASK_TYPES.values(): | ||||
self.backend.create_task_type(tt) | self.backend.create_task_type(tt) | ||||
def test_create_tasks(self): | def test_create_tasks(self): | ||||
priority_ratio = self._priority_ratio() | priority_ratio = self._priority_ratio() | ||||
self._create_task_types() | self._create_task_types() | ||||
num_tasks_priority = 100 | num_tasks_priority = 100 | ||||
tasks_1 = self._tasks_from_template(self.task1_template, utcnow(), 100) | tasks_1 = self._tasks_from_template( | ||||
TEMPLATES['git'], utcnow(), 100) | |||||
tasks_2 = self._tasks_from_template( | tasks_2 = self._tasks_from_template( | ||||
self.task2_template, utcnow(), 100, | TEMPLATES['hg'], utcnow(), 100, | ||||
num_tasks_priority, priorities=priority_ratio) | num_tasks_priority, priorities=priority_ratio) | ||||
tasks = tasks_1 + tasks_2 | tasks = tasks_1 + tasks_2 | ||||
# tasks are returned only once with their ids | # tasks are returned only once with their ids | ||||
ret1 = self.backend.create_tasks(tasks + tasks_1 + tasks_2) | ret1 = self.backend.create_tasks(tasks + tasks_1 + tasks_2) | ||||
set_ret1 = set([t['id'] for t in ret1]) | set_ret1 = set([t['id'] for t in ret1]) | ||||
# creating the same set result in the same ids | # creating the same set result in the same ids | ||||
ret = self.backend.create_tasks(tasks) | ret = self.backend.create_tasks(tasks) | ||||
set_ret = set([t['id'] for t in ret]) | set_ret = set([t['id'] for t in ret]) | ||||
# Idempotence results | # Idempotence results | ||||
self.assertEqual(set_ret, set_ret1) | self.assertEqual(set_ret, set_ret1) | ||||
self.assertEqual(len(ret), len(ret1)) | self.assertEqual(len(ret), len(ret1)) | ||||
ids = set() | ids = set() | ||||
actual_priorities = defaultdict(int) | actual_priorities = defaultdict(int) | ||||
for task, orig_task in zip(ret, tasks): | for task, orig_task in zip(ret, tasks): | ||||
task = copy.deepcopy(task) | task = copy.deepcopy(task) | ||||
task_type = self.task_types[orig_task['type']] | task_type = TASK_TYPES[orig_task['type'].split('-')[-1]] | ||||
self.assertNotIn(task['id'], ids) | self.assertNotIn(task['id'], ids) | ||||
self.assertEqual(task['status'], 'next_run_not_scheduled') | self.assertEqual(task['status'], 'next_run_not_scheduled') | ||||
self.assertEqual(task['current_interval'], | self.assertEqual(task['current_interval'], | ||||
task_type['default_interval']) | task_type['default_interval']) | ||||
self.assertEqual(task['policy'], orig_task.get('policy', | self.assertEqual(task['policy'], orig_task.get('policy', | ||||
'recurring')) | 'recurring')) | ||||
priority = task.get('priority') | priority = task.get('priority') | ||||
if priority: | if priority: | ||||
Show All 15 Lines | def test_create_tasks(self): | ||||
self.assertEqual(dict(actual_priorities), { | self.assertEqual(dict(actual_priorities), { | ||||
priority: int(ratio * num_tasks_priority) | priority: int(ratio * num_tasks_priority) | ||||
for priority, ratio in priority_ratio.items() | for priority, ratio in priority_ratio.items() | ||||
}) | }) | ||||
def test_peek_ready_tasks_no_priority(self): | def test_peek_ready_tasks_no_priority(self): | ||||
self._create_task_types() | self._create_task_types() | ||||
t = utcnow() | t = utcnow() | ||||
task_type = self.task1_template['type'] | task_type = TEMPLATES['git']['type'] | ||||
tasks = self._tasks_from_template(self.task1_template, t, 100) | tasks = self._tasks_from_template(TEMPLATES['git'], t, 100) | ||||
random.shuffle(tasks) | random.shuffle(tasks) | ||||
self.backend.create_tasks(tasks) | self.backend.create_tasks(tasks) | ||||
ready_tasks = self.backend.peek_ready_tasks(task_type) | ready_tasks = self.backend.peek_ready_tasks(task_type) | ||||
self.assertEqual(len(ready_tasks), len(tasks)) | self.assertEqual(len(ready_tasks), len(tasks)) | ||||
for i in range(len(ready_tasks) - 1): | for i in range(len(ready_tasks) - 1): | ||||
self.assertLessEqual(ready_tasks[i]['next_run'], | self.assertLessEqual(ready_tasks[i]['next_run'], | ||||
ready_tasks[i+1]['next_run']) | ready_tasks[i+1]['next_run']) | ||||
Show All 34 Lines | def _priority_ratio(self): | ||||
for row in self.cursor.fetchall(): | for row in self.cursor.fetchall(): | ||||
priority_ratio[row[0]] = row[1] | priority_ratio[row[0]] = row[1] | ||||
return priority_ratio | return priority_ratio | ||||
def test_peek_ready_tasks_mixed_priorities(self): | def test_peek_ready_tasks_mixed_priorities(self): | ||||
priority_ratio = self._priority_ratio() | priority_ratio = self._priority_ratio() | ||||
self._create_task_types() | self._create_task_types() | ||||
t = utcnow() | t = utcnow() | ||||
task_type = self.task1_template['type'] | task_type = TEMPLATES['git']['type'] | ||||
num_tasks_priority = 100 | num_tasks_priority = 100 | ||||
num_tasks_no_priority = 100 | num_tasks_no_priority = 100 | ||||
# Create tasks with and without priorities | # Create tasks with and without priorities | ||||
tasks = self._tasks_from_template( | tasks = self._tasks_from_template( | ||||
self.task1_template, t, | TEMPLATES['git'], t, | ||||
num=num_tasks_no_priority, | num=num_tasks_no_priority, | ||||
num_priority=num_tasks_priority, | num_priority=num_tasks_priority, | ||||
priorities=priority_ratio) | priorities=priority_ratio) | ||||
random.shuffle(tasks) | random.shuffle(tasks) | ||||
self.backend.create_tasks(tasks) | self.backend.create_tasks(tasks) | ||||
# take all available tasks | # take all available tasks | ||||
Show All 36 Lines | def test_peek_ready_tasks_mixed_priorities(self): | ||||
actual_prio == expected_count + 1) | actual_prio == expected_count + 1) | ||||
self.assertEqual(count_tasks_per_priority[None], num_tasks) | self.assertEqual(count_tasks_per_priority[None], num_tasks) | ||||
def test_grab_ready_tasks(self): | def test_grab_ready_tasks(self): | ||||
priority_ratio = self._priority_ratio() | priority_ratio = self._priority_ratio() | ||||
self._create_task_types() | self._create_task_types() | ||||
t = utcnow() | t = utcnow() | ||||
task_type = self.task1_template['type'] | task_type = TEMPLATES['git']['type'] | ||||
num_tasks_priority = 100 | num_tasks_priority = 100 | ||||
num_tasks_no_priority = 100 | num_tasks_no_priority = 100 | ||||
# Create tasks with and without priorities | # Create tasks with and without priorities | ||||
tasks = self._tasks_from_template( | tasks = self._tasks_from_template( | ||||
self.task1_template, t, | TEMPLATES['git'], t, | ||||
num=num_tasks_no_priority, | num=num_tasks_no_priority, | ||||
num_priority=num_tasks_priority, | num_priority=num_tasks_priority, | ||||
priorities=priority_ratio) | priorities=priority_ratio) | ||||
random.shuffle(tasks) | random.shuffle(tasks) | ||||
self.backend.create_tasks(tasks) | self.backend.create_tasks(tasks) | ||||
first_ready_tasks = self.backend.peek_ready_tasks( | first_ready_tasks = self.backend.peek_ready_tasks( | ||||
task_type, num_tasks=10, num_tasks_priority=10) | task_type, num_tasks=10, num_tasks_priority=10) | ||||
grabbed_tasks = self.backend.grab_ready_tasks( | grabbed_tasks = self.backend.grab_ready_tasks( | ||||
task_type, num_tasks=10, num_tasks_priority=10) | task_type, num_tasks=10, num_tasks_priority=10) | ||||
for peeked, grabbed in zip(first_ready_tasks, grabbed_tasks): | for peeked, grabbed in zip(first_ready_tasks, grabbed_tasks): | ||||
self.assertEqual(peeked['status'], 'next_run_not_scheduled') | self.assertEqual(peeked['status'], 'next_run_not_scheduled') | ||||
del peeked['status'] | del peeked['status'] | ||||
self.assertEqual(grabbed['status'], 'next_run_scheduled') | self.assertEqual(grabbed['status'], 'next_run_scheduled') | ||||
del grabbed['status'] | del grabbed['status'] | ||||
self.assertEqual(peeked, grabbed) | self.assertEqual(peeked, grabbed) | ||||
self.assertEqual(peeked['priority'], grabbed['priority']) | self.assertEqual(peeked['priority'], grabbed['priority']) | ||||
def test_get_tasks(self): | def test_get_tasks(self): | ||||
self._create_task_types() | self._create_task_types() | ||||
t = utcnow() | t = utcnow() | ||||
tasks = self._tasks_from_template(self.task1_template, t, 100) | tasks = self._tasks_from_template(TEMPLATES['git'], t, 100) | ||||
tasks = self.backend.create_tasks(tasks) | tasks = self.backend.create_tasks(tasks) | ||||
random.shuffle(tasks) | random.shuffle(tasks) | ||||
while len(tasks) > 1: | while len(tasks) > 1: | ||||
length = random.randrange(1, len(tasks)) | length = random.randrange(1, len(tasks)) | ||||
cur_tasks = tasks[:length] | cur_tasks = tasks[:length] | ||||
tasks[:length] = [] | tasks[:length] = [] | ||||
ret = self.backend.get_tasks(task['id'] for task in cur_tasks) | ret = self.backend.get_tasks(task['id'] for task in cur_tasks) | ||||
self.assertCountEqual(ret, cur_tasks) | self.assertCountEqual(ret, cur_tasks) | ||||
def test_filter_task_to_archive(self): | def test_filter_task_to_archive(self): | ||||
"""Filtering only list disabled recurring or completed oneshot tasks | """Filtering only list disabled recurring or completed oneshot tasks | ||||
""" | """ | ||||
self._create_task_types() | self._create_task_types() | ||||
_time = utcnow() | _time = utcnow() | ||||
recurring = self._tasks_from_template(self.task1_template, _time, 12) | recurring = self._tasks_from_template(TEMPLATES['git'], _time, 12) | ||||
oneshots = self._tasks_from_template(self.task2_template, _time, 12) | oneshots = self._tasks_from_template(TEMPLATES['hg'], _time, 12) | ||||
total_tasks = len(recurring) + len(oneshots) | total_tasks = len(recurring) + len(oneshots) | ||||
# simulate scheduling tasks | # simulate scheduling tasks | ||||
pending_tasks = self.backend.create_tasks(recurring + oneshots) | pending_tasks = self.backend.create_tasks(recurring + oneshots) | ||||
backend_tasks = [{ | backend_tasks = [{ | ||||
'task': task['id'], | 'task': task['id'], | ||||
'backend_id': str(uuid.uuid4()), | 'backend_id': str(uuid.uuid4()), | ||||
'scheduled': utcnow(), | 'scheduled': utcnow(), | ||||
▲ Show 20 Lines • Show All 56 Lines • ▼ Show 20 Lines | def test_filter_task_to_archive(self): | ||||
actual_filtered_per_status[task['task_policy']] += 1 | actual_filtered_per_status[task['task_policy']] += 1 | ||||
self.assertEqual(actual_filtered_per_status, status_per_policy) | self.assertEqual(actual_filtered_per_status, status_per_policy) | ||||
def test_delete_archived_tasks(self): | def test_delete_archived_tasks(self): | ||||
self._create_task_types() | self._create_task_types() | ||||
_time = utcnow() | _time = utcnow() | ||||
recurring = self._tasks_from_template( | recurring = self._tasks_from_template( | ||||
self.task1_template, _time, 12) | TEMPLATES['git'], _time, 12) | ||||
oneshots = self._tasks_from_template( | oneshots = self._tasks_from_template( | ||||
self.task2_template, _time, 12) | TEMPLATES['hg'], _time, 12) | ||||
total_tasks = len(recurring) + len(oneshots) | total_tasks = len(recurring) + len(oneshots) | ||||
pending_tasks = self.backend.create_tasks(recurring + oneshots) | pending_tasks = self.backend.create_tasks(recurring + oneshots) | ||||
backend_tasks = [{ | backend_tasks = [{ | ||||
'task': task['id'], | 'task': task['id'], | ||||
'backend_id': str(uuid.uuid4()), | 'backend_id': str(uuid.uuid4()), | ||||
'scheduled': utcnow(), | 'scheduled': utcnow(), | ||||
} for task in pending_tasks] | } for task in pending_tasks] | ||||
self.backend.mass_schedule_task_runs(backend_tasks) | self.backend.mass_schedule_task_runs(backend_tasks) | ||||
Show All 17 Lines | def test_delete_archived_tasks(self): | ||||
self.assertEqual(tasks_count[0], total_tasks - len(_tasks)) | self.assertEqual(tasks_count[0], total_tasks - len(_tasks)) | ||||
self.assertEqual(tasks_run_count[0], total_tasks - len(_tasks)) | self.assertEqual(tasks_run_count[0], total_tasks - len(_tasks)) | ||||
class LocalSchedulerTest(CommonSchedulerTest, unittest.TestCase): | class LocalSchedulerTest(CommonSchedulerTest, unittest.TestCase): | ||||
def setUp(self): | def setUp(self): | ||||
super().setUp() | super().setUp() | ||||
self.config = {'scheduling_db': 'dbname=' + self.TEST_DB_NAME} | self.config = {'db': 'dbname=' + self.TEST_DB_NAME} | ||||
self.backend = get_scheduler('local', self.config) | self.backend = get_scheduler('local', self.config) |
Any idea why needed that?