diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py index 9791ec6..fbfb5d2 100644 --- a/swh/scheduler/tests/conftest.py +++ b/swh/scheduler/tests/conftest.py @@ -1,100 +1,105 @@ # Copyright (C) 2016-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import pytest import glob from datetime import timedelta import pkg_resources from swh.core.utils import numfile_sortkey as sortkey from swh.scheduler import get_scheduler from swh.scheduler.tests import SQL_DIR from swh.scheduler.tests.tasks import register_test_tasks # make sure we are not fooled by CELERY_ config environment vars for var in [x for x in os.environ.keys() if x.startswith('CELERY')]: os.environ.pop(var) # test_cli tests depends on a en/C locale, so ensure it os.environ['LC_ALL'] = 'C.UTF-8' DUMP_FILES = os.path.join(SQL_DIR, '*.sql') # celery tasks for testing purpose; tasks themselves should be # in swh/scheduler/tests/tasks.py TASK_NAMES = ['ping', 'multiping', 'add', 'error'] @pytest.fixture(scope='session') def celery_enable_logging(): return True @pytest.fixture(scope='session') def celery_includes(): task_modules = [ 'swh.scheduler.tests.tasks', ] for entrypoint in pkg_resources.iter_entry_points('swh.workers'): task_modules.extend(entrypoint.load()().get('task_modules', [])) return task_modules @pytest.fixture(scope='session') def celery_parameters(): return { 'task_cls': 'swh.scheduler.task:SWHTask', } @pytest.fixture(scope='session') def celery_config(): return { 'accept_content': ['application/x-msgpack', 'application/json'], 'task_serializer': 'msgpack', 'result_serializer': 'json', } # override the celery_session_app fixture to monkeypatch the 'main' # swh.scheduler.celery_backend.config.app Celery application # with the test application (and also register test tasks) @pytest.fixture(scope='session') def swh_app(celery_session_app): from swh.scheduler.celery_backend.config import app register_test_tasks(celery_session_app) app = celery_session_app # noqa yield app @pytest.fixture def swh_scheduler(postgresql): scheduler_config = { 'db': postgresql.dsn, } all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey) cursor = postgresql.cursor() for fname in all_dump_files: with open(fname) as fobj: cursor.execute(fobj.read()) postgresql.commit() scheduler = get_scheduler('local', scheduler_config) for taskname in TASK_NAMES: scheduler.create_task_type({ 'type': 'swh-test-{}'.format(taskname), 'description': 'The {} testing task'.format(taskname), 'backend_name': 'swh.scheduler.tests.tasks.{}'.format(taskname), 'default_interval': timedelta(days=1), 'min_interval': timedelta(hours=6), 'max_interval': timedelta(days=12), }) return scheduler + + +# this alias is used to be able to easily instantiate a db-backed Scheduler +# eg. for the RPC client/server test suite. +swh_db_scheduler = swh_scheduler diff --git a/swh/scheduler/tests/test_api_client.py b/swh/scheduler/tests/test_api_client.py index e3c194b..92ccbff 100644 --- a/swh/scheduler/tests/test_api_client.py +++ b/swh/scheduler/tests/test_api_client.py @@ -1,54 +1,48 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import unittest -import requests - -from swh.core.api.tests.server_testing import ServerTestFixture - -from swh.scheduler import get_scheduler -from swh.scheduler.api.server import app -from swh.scheduler.tests.test_scheduler import CommonSchedulerTest - - -class RemoteSchedulerTest(CommonSchedulerTest, ServerTestFixture, - unittest.TestCase): - """Test the remote scheduler API. - - This class doesn't define any tests as we want identical - functionality between local and remote scheduler. All the tests are - therefore defined in CommonSchedulerTest. - """ - - def setUp(self): - self.config = { - 'scheduler': { - 'cls': 'local', - 'args': { - 'db': 'dbname=%s' % self.TEST_DB_NAME, - } - } - } - self.app = app # this will setup the local scheduler... - super().setUp() - # accessible through a remote scheduler accessible on the - # given port - self.backend = get_scheduler('remote', {'url': self.url()}) - - def test_site_map(self): - sitemap = requests.get(self.url() + 'site-map') - assert sitemap.headers['Content-Type'] == 'application/json' - sitemap = sitemap.json() - - rules = set(x['rule'] for x in sitemap) - # we expect at least these rules - expected_rules = set('/'+rule for rule in ( - 'set_status_tasks', 'create_task_type', - 'get_task_type', 'get_task_types', 'create_tasks', 'disable_tasks', - 'get_tasks', 'search_tasks', 'peek_ready_tasks', - 'grab_ready_tasks', 'schedule_task_run', 'mass_schedule_task_runs', - 'start_task_run', 'end_task_run', 'filter_task_to_archive', - 'delete_archived_tasks')) - assert rules.issuperset(expected_rules), expected_rules - rules +import pytest +from flask import url_for + +import swh.scheduler.api.server as server +from swh.scheduler.api.client import RemoteScheduler +from swh.scheduler.tests.test_scheduler import TestScheduler # noqa + +# tests are executed using imported class (TestScheduler) using overloaded +# swh_scheduler fixture below + + +# the Flask app used as server in these tests +@pytest.fixture +def app(swh_db_scheduler): + server.scheduler = swh_db_scheduler + yield server.app + + +# the RPCClient class used as client used in these tests +@pytest.fixture +def swh_rpc_client_class(): + return RemoteScheduler + + +@pytest.fixture +def swh_scheduler(swh_rpc_client, app): + yield swh_rpc_client + + +def test_site_map(flask_app_client): + sitemap = flask_app_client.get(url_for('site_map')) + assert sitemap.headers['Content-Type'] == 'application/json' + + rules = set(x['rule'] for x in sitemap.json) + # we expect at least these rules + expected_rules = set('/'+rule for rule in ( + 'set_status_tasks', 'create_task_type', + 'get_task_type', 'get_task_types', 'create_tasks', 'disable_tasks', + 'get_tasks', 'search_tasks', 'get_task_runs', 'peek_ready_tasks', + 'grab_ready_tasks', 'schedule_task_run', 'mass_schedule_task_runs', + 'start_task_run', 'end_task_run', 'filter_task_to_archive', + 'delete_archived_tasks', 'get_priority_ratios')) + assert rules == expected_rules diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py index 712bf5d..3172dcb 100644 --- a/swh/scheduler/tests/test_scheduler.py +++ b/swh/scheduler/tests/test_scheduler.py @@ -1,637 +1,596 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy import datetime -import os import random -import unittest import uuid from collections import defaultdict import psycopg2 from arrow import utcnow import pytest -from swh.core.db.tests.db_testing import SingleDbTestFixture -from swh.scheduler import get_scheduler - -from . import SQL_DIR - TASK_TYPES = { 'git': { 'type': 'update-git', 'description': 'Update a git repository', 'backend_name': 'swh.loader.git.tasks.UpdateGitRepository', 'default_interval': datetime.timedelta(days=64), 'min_interval': datetime.timedelta(hours=12), 'max_interval': datetime.timedelta(days=64), 'backoff_factor': 2, 'max_queue_length': None, 'num_retries': 7, 'retry_delay': datetime.timedelta(hours=2), }, 'hg': { 'type': 'update-hg', 'description': 'Update a mercurial repository', 'backend_name': 'swh.loader.mercurial.tasks.UpdateHgRepository', 'default_interval': datetime.timedelta(days=64), 'min_interval': datetime.timedelta(hours=12), 'max_interval': datetime.timedelta(days=64), 'backoff_factor': 2, 'max_queue_length': None, 'num_retries': 7, 'retry_delay': datetime.timedelta(hours=2), }, } TEMPLATES = { 'git': { 'type': 'update-git', 'arguments': { 'args': [], 'kwargs': {}, }, 'next_run': None, }, 'hg': { 'type': 'update-hg', 'arguments': { 'args': [], 'kwargs': {}, }, 'next_run': None, 'policy': 'oneshot', } } def subdict(d, keys=None, excl=()): if keys is None: keys = [k for k in d.keys()] return {k: d[k] for k in keys if k not in excl} @pytest.mark.db -class CommonSchedulerTest(SingleDbTestFixture): - TEST_DB_NAME = 'softwareheritage-scheduler-test' - TEST_DB_DUMP = os.path.join(SQL_DIR, '*.sql') - - def tearDown(self): - self.empty_tables() - super().tearDown() - - def empty_tables(self, whitelist=["priority_ratio"]): - query = """SELECT table_name FROM information_schema.tables - WHERE table_schema = %%s and - table_name not in (%s) - """ % ','.join(map(lambda t: "'%s'" % t, whitelist)) - self.cursor.execute(query, ('public', )) - - tables = set(table for (table,) in self.cursor.fetchall()) - - for table in tables: - self.cursor.execute('truncate table %s cascade' % table) - self.conn.commit() +class TestScheduler: + def test_get_priority_ratios(self, swh_scheduler): + assert swh_scheduler.get_priority_ratios() == { + 'high': 0.5, + 'normal': 0.3, + 'low': 0.2, + } - def test_add_task_type(self): + def test_add_task_type(self, swh_scheduler): tt = TASK_TYPES['git'] - self.backend.create_task_type(tt) - self.assertEqual(tt, self.backend.get_task_type(tt['type'])) - with self.assertRaisesRegex(psycopg2.IntegrityError, - r'\(type\)=\(%s\)' % tt['type']): - self.backend.create_task_type(tt) + swh_scheduler.create_task_type(tt) + assert tt == swh_scheduler.get_task_type(tt['type']) + with pytest.raises(psycopg2.IntegrityError, + match=r'\(type\)=\(%s\)' % tt['type']): + swh_scheduler.create_task_type(tt) tt2 = TASK_TYPES['hg'] - self.backend.create_task_type(tt2) - self.assertEqual(tt, self.backend.get_task_type(tt['type'])) - self.assertEqual(tt2, self.backend.get_task_type(tt2['type'])) + swh_scheduler.create_task_type(tt2) + assert tt == swh_scheduler.get_task_type(tt['type']) + assert tt2 == swh_scheduler.get_task_type(tt2['type']) - def test_get_task_types(self): + def test_get_task_types(self, swh_scheduler): tt, tt2 = TASK_TYPES['git'], TASK_TYPES['hg'] - self.backend.create_task_type(tt) - self.backend.create_task_type(tt2) - self.assertCountEqual([tt2, tt], self.backend.get_task_types()) - - @staticmethod - def _task_from_template(template, next_run, priority, *args, **kwargs): - ret = copy.deepcopy(template) - ret['next_run'] = next_run - if priority: - ret['priority'] = priority - if args: - ret['arguments']['args'] = list(args) - if kwargs: - ret['arguments']['kwargs'] = kwargs - return ret - - def _pop_priority(self, priorities): - if not priorities: - return None - for priority, remains in priorities.items(): - if remains > 0: - priorities[priority] = remains - 1 - return priority - return None - - def _tasks_from_template(self, template, max_timestamp, num, - num_priority=0, priorities=None): - if num_priority and priorities: - priorities = { - priority: ratio * num_priority - for priority, ratio in priorities.items() - } - - tasks = [] - for i in range(num + num_priority): - priority = self._pop_priority(priorities) - tasks.append(self._task_from_template( - template, - max_timestamp - datetime.timedelta(microseconds=i), - priority, - 'argument-%03d' % i, - **{'kwarg%03d' % i: 'bogus-kwarg'} - )) - return tasks - - def _create_task_types(self): - for tt in TASK_TYPES.values(): - self.backend.create_task_type(tt) - - def test_create_tasks(self): - priority_ratio = self._priority_ratio() - self._create_task_types() + swh_scheduler.create_task_type(tt) + swh_scheduler.create_task_type(tt2) + actual_task_types = swh_scheduler.get_task_types() + assert tt in actual_task_types + assert tt2 in actual_task_types + + def test_create_tasks(self, swh_scheduler): + priority_ratio = self._priority_ratio(swh_scheduler) + self._create_task_types(swh_scheduler) num_tasks_priority = 100 tasks_1 = self._tasks_from_template( TEMPLATES['git'], utcnow(), 100) tasks_2 = self._tasks_from_template( TEMPLATES['hg'], utcnow(), 100, num_tasks_priority, priorities=priority_ratio) tasks = tasks_1 + tasks_2 # tasks are returned only once with their ids - ret1 = self.backend.create_tasks(tasks + tasks_1 + tasks_2) + ret1 = swh_scheduler.create_tasks(tasks + tasks_1 + tasks_2) set_ret1 = set([t['id'] for t in ret1]) # creating the same set result in the same ids - ret = self.backend.create_tasks(tasks) + ret = swh_scheduler.create_tasks(tasks) set_ret = set([t['id'] for t in ret]) # Idempotence results - self.assertEqual(set_ret, set_ret1) - self.assertEqual(len(ret), len(ret1)) + assert set_ret == set_ret1 + assert len(ret) == len(ret1) ids = set() actual_priorities = defaultdict(int) for task, orig_task in zip(ret, tasks): task = copy.deepcopy(task) task_type = TASK_TYPES[orig_task['type'].split('-')[-1]] - self.assertNotIn(task['id'], ids) - self.assertEqual(task['status'], 'next_run_not_scheduled') - self.assertEqual(task['current_interval'], - task_type['default_interval']) - self.assertEqual(task['policy'], orig_task.get('policy', - 'recurring')) + assert task['id'] not in ids + assert task['status'] == 'next_run_not_scheduled' + assert task['current_interval'] == task_type['default_interval'] + assert task['policy'] == orig_task.get('policy', 'recurring') priority = task.get('priority') if priority: actual_priorities[priority] += 1 - self.assertEqual(task['retries_left'], - task_type['num_retries'] or 0) + assert task['retries_left'] == (task_type['num_retries'] or 0) ids.add(task['id']) del task['id'] del task['status'] del task['current_interval'] del task['retries_left'] if 'policy' not in orig_task: del task['policy'] if 'priority' not in orig_task: del task['priority'] - self.assertEqual(task, orig_task) + assert task == orig_task - self.assertEqual(dict(actual_priorities), { + assert dict(actual_priorities) == { priority: int(ratio * num_tasks_priority) for priority, ratio in priority_ratio.items() - }) + } - def test_peek_ready_tasks_no_priority(self): - self._create_task_types() + def test_peek_ready_tasks_no_priority(self, swh_scheduler): + self._create_task_types(swh_scheduler) t = utcnow() task_type = TEMPLATES['git']['type'] tasks = self._tasks_from_template(TEMPLATES['git'], t, 100) random.shuffle(tasks) - self.backend.create_tasks(tasks) + swh_scheduler.create_tasks(tasks) - ready_tasks = self.backend.peek_ready_tasks(task_type) - self.assertEqual(len(ready_tasks), len(tasks)) + ready_tasks = swh_scheduler.peek_ready_tasks(task_type) + assert len(ready_tasks) == len(tasks) for i in range(len(ready_tasks) - 1): - self.assertLessEqual(ready_tasks[i]['next_run'], - ready_tasks[i+1]['next_run']) + assert ready_tasks[i]['next_run'] <= ready_tasks[i+1]['next_run'] # Only get the first few ready tasks limit = random.randrange(5, 5 + len(tasks)//2) - ready_tasks_limited = self.backend.peek_ready_tasks( + ready_tasks_limited = swh_scheduler.peek_ready_tasks( task_type, num_tasks=limit) - self.assertEqual(len(ready_tasks_limited), limit) - self.assertCountEqual(ready_tasks_limited, ready_tasks[:limit]) + assert len(ready_tasks_limited) == limit + assert ready_tasks_limited == ready_tasks[:limit] # Limit by timestamp max_ts = tasks[limit-1]['next_run'] - ready_tasks_timestamped = self.backend.peek_ready_tasks( + ready_tasks_timestamped = swh_scheduler.peek_ready_tasks( task_type, timestamp=max_ts) for ready_task in ready_tasks_timestamped: - self.assertLessEqual(ready_task['next_run'], max_ts) + assert ready_task['next_run'] <= max_ts # Make sure we get proper behavior for the first ready tasks - self.assertCountEqual( - ready_tasks[:len(ready_tasks_timestamped)], - ready_tasks_timestamped, - ) + assert ready_tasks[:len(ready_tasks_timestamped)] \ + == ready_tasks_timestamped # Limit by both - ready_tasks_both = self.backend.peek_ready_tasks( + ready_tasks_both = swh_scheduler.peek_ready_tasks( task_type, timestamp=max_ts, num_tasks=limit//3) - self.assertLessEqual(len(ready_tasks_both), limit//3) + assert len(ready_tasks_both) <= limit//3 for ready_task in ready_tasks_both: - self.assertLessEqual(ready_task['next_run'], max_ts) - self.assertIn(ready_task, ready_tasks[:limit//3]) - - def _priority_ratio(self): - self.cursor.execute('select id, ratio from priority_ratio') - priority_ratio = {} - for row in self.cursor.fetchall(): - priority_ratio[row[0]] = row[1] - return priority_ratio - - def test_peek_ready_tasks_mixed_priorities(self): - priority_ratio = self._priority_ratio() - self._create_task_types() + assert ready_task['next_run'] <= max_ts + assert ready_task in ready_tasks[:limit//3] + + def _priority_ratio(self, swh_scheduler): + return swh_scheduler.get_priority_ratios() + + def test_peek_ready_tasks_mixed_priorities(self, swh_scheduler): + priority_ratio = self._priority_ratio(swh_scheduler) + self._create_task_types(swh_scheduler) t = utcnow() task_type = TEMPLATES['git']['type'] num_tasks_priority = 100 num_tasks_no_priority = 100 # Create tasks with and without priorities tasks = self._tasks_from_template( TEMPLATES['git'], t, num=num_tasks_no_priority, num_priority=num_tasks_priority, priorities=priority_ratio) random.shuffle(tasks) - self.backend.create_tasks(tasks) + swh_scheduler.create_tasks(tasks) # take all available tasks - ready_tasks = self.backend.peek_ready_tasks( + ready_tasks = swh_scheduler.peek_ready_tasks( task_type) - self.assertEqual(len(ready_tasks), len(tasks)) - self.assertEqual(num_tasks_priority + num_tasks_no_priority, - len(ready_tasks)) + assert len(ready_tasks) == len(tasks) + assert num_tasks_priority + num_tasks_no_priority \ + == len(ready_tasks) count_tasks_per_priority = defaultdict(int) for task in ready_tasks: priority = task.get('priority') if priority: count_tasks_per_priority[priority] += 1 - self.assertEqual(dict(count_tasks_per_priority), { + assert dict(count_tasks_per_priority) == { priority: int(ratio * num_tasks_priority) for priority, ratio in priority_ratio.items() - }) + } # Only get some ready tasks num_tasks = random.randrange(5, 5 + num_tasks_no_priority//2) num_tasks_priority = random.randrange(5, num_tasks_priority//2) - ready_tasks_limited = self.backend.peek_ready_tasks( + ready_tasks_limited = swh_scheduler.peek_ready_tasks( task_type, num_tasks=num_tasks, num_tasks_priority=num_tasks_priority) count_tasks_per_priority = defaultdict(int) for task in ready_tasks_limited: priority = task.get('priority') count_tasks_per_priority[priority] += 1 import math for priority, ratio in priority_ratio.items(): expected_count = math.ceil(ratio * num_tasks_priority) actual_prio = count_tasks_per_priority[priority] - self.assertTrue( - actual_prio == expected_count or - actual_prio == expected_count + 1) + assert (actual_prio == expected_count or + actual_prio == expected_count + 1) - self.assertEqual(count_tasks_per_priority[None], num_tasks) + assert count_tasks_per_priority[None] == num_tasks - def test_grab_ready_tasks(self): - priority_ratio = self._priority_ratio() - self._create_task_types() + def test_grab_ready_tasks(self, swh_scheduler): + priority_ratio = self._priority_ratio(swh_scheduler) + self._create_task_types(swh_scheduler) t = utcnow() task_type = TEMPLATES['git']['type'] num_tasks_priority = 100 num_tasks_no_priority = 100 # Create tasks with and without priorities tasks = self._tasks_from_template( TEMPLATES['git'], t, num=num_tasks_no_priority, num_priority=num_tasks_priority, priorities=priority_ratio) random.shuffle(tasks) - self.backend.create_tasks(tasks) + swh_scheduler.create_tasks(tasks) - first_ready_tasks = self.backend.peek_ready_tasks( + first_ready_tasks = swh_scheduler.peek_ready_tasks( task_type, num_tasks=10, num_tasks_priority=10) - grabbed_tasks = self.backend.grab_ready_tasks( + grabbed_tasks = swh_scheduler.grab_ready_tasks( task_type, num_tasks=10, num_tasks_priority=10) for peeked, grabbed in zip(first_ready_tasks, grabbed_tasks): - self.assertEqual(peeked['status'], 'next_run_not_scheduled') + assert peeked['status'] == 'next_run_not_scheduled' del peeked['status'] - self.assertEqual(grabbed['status'], 'next_run_scheduled') + assert grabbed['status'] == 'next_run_scheduled' del grabbed['status'] - self.assertEqual(peeked, grabbed) - self.assertEqual(peeked['priority'], grabbed['priority']) + assert peeked == grabbed + assert peeked['priority'] == grabbed['priority'] - def test_get_tasks(self): - self._create_task_types() + def test_get_tasks(self, swh_scheduler): + self._create_task_types(swh_scheduler) t = utcnow() tasks = self._tasks_from_template(TEMPLATES['git'], t, 100) - tasks = self.backend.create_tasks(tasks) + tasks = swh_scheduler.create_tasks(tasks) random.shuffle(tasks) while len(tasks) > 1: length = random.randrange(1, len(tasks)) - cur_tasks = tasks[:length] + cur_tasks = sorted(tasks[:length], key=lambda x: x['id']) tasks[:length] = [] - ret = self.backend.get_tasks(task['id'] for task in cur_tasks) - self.assertCountEqual(ret, cur_tasks) + ret = swh_scheduler.get_tasks(task['id'] for task in cur_tasks) + # result is not guaranteed to be sorted + ret.sort(key=lambda x: x['id']) + assert ret == cur_tasks - def test_search_tasks(self): + def test_search_tasks(self, swh_scheduler): def make_real_dicts(l): """RealDictRow is not a real dict.""" return [dict(d.items()) for d in l] - self._create_task_types() + self._create_task_types(swh_scheduler) t = utcnow() tasks = self._tasks_from_template(TEMPLATES['git'], t, 100) - tasks = self.backend.create_tasks(tasks) - self.assertCountEqual( - make_real_dicts(self.backend.search_tasks()), - make_real_dicts(tasks)) + tasks = swh_scheduler.create_tasks(tasks) + assert make_real_dicts(swh_scheduler.search_tasks()) \ + == make_real_dicts(tasks) - def test_filter_task_to_archive(self): + def test_filter_task_to_archive(self, swh_scheduler): """Filtering only list disabled recurring or completed oneshot tasks """ - self._create_task_types() + self._create_task_types(swh_scheduler) _time = utcnow() recurring = self._tasks_from_template(TEMPLATES['git'], _time, 12) oneshots = self._tasks_from_template(TEMPLATES['hg'], _time, 12) total_tasks = len(recurring) + len(oneshots) # simulate scheduling tasks - pending_tasks = self.backend.create_tasks(recurring + oneshots) + pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) backend_tasks = [{ 'task': task['id'], 'backend_id': str(uuid.uuid4()), 'scheduled': utcnow(), } for task in pending_tasks] - self.backend.mass_schedule_task_runs(backend_tasks) + swh_scheduler.mass_schedule_task_runs(backend_tasks) # we simulate the task are being done _tasks = [] for task in backend_tasks: - t = self.backend.end_task_run( + t = swh_scheduler.end_task_run( task['backend_id'], status='eventful') _tasks.append(t) # Randomly update task's status per policy status_per_policy = {'recurring': 0, 'oneshot': 0} status_choice = { # policy: [tuple (1-for-filtering, 'associated-status')] 'recurring': [(1, 'disabled'), (0, 'completed'), (0, 'next_run_not_scheduled')], 'oneshot': [(0, 'next_run_not_scheduled'), (1, 'disabled'), (1, 'completed')] } tasks_to_update = defaultdict(list) _task_ids = defaultdict(list) # randomize 'disabling' recurring task or 'complete' oneshot task for task in pending_tasks: policy = task['policy'] _task_ids[policy].append(task['id']) status = random.choice(status_choice[policy]) if status[0] != 1: continue # elected for filtering status_per_policy[policy] += status[0] tasks_to_update[policy].append(task['id']) - self.backend.disable_tasks(tasks_to_update['recurring']) + swh_scheduler.disable_tasks(tasks_to_update['recurring']) # hack: change the status to something else than completed/disabled - self.backend.set_status_tasks( + swh_scheduler.set_status_tasks( _task_ids['oneshot'], status='next_run_not_scheduled') # complete the tasks to update - self.backend.set_status_tasks( + swh_scheduler.set_status_tasks( tasks_to_update['oneshot'], status='completed') total_tasks_filtered = (status_per_policy['recurring'] + status_per_policy['oneshot']) # retrieve tasks to archive after = _time.shift(days=-1).format('YYYY-MM-DD') before = utcnow().shift(days=1).format('YYYY-MM-DD') - tasks_to_archive = list(self.backend.filter_task_to_archive( + tasks_to_archive = list(swh_scheduler.filter_task_to_archive( after_ts=after, before_ts=before, limit=total_tasks)) - self.assertEqual(len(tasks_to_archive), total_tasks_filtered) + assert len(tasks_to_archive) == total_tasks_filtered actual_filtered_per_status = {'recurring': 0, 'oneshot': 0} for task in tasks_to_archive: actual_filtered_per_status[task['task_policy']] += 1 - self.assertEqual(actual_filtered_per_status, status_per_policy) + assert actual_filtered_per_status == status_per_policy - def test_delete_archived_tasks(self): - self._create_task_types() + def test_delete_archived_tasks(self, swh_scheduler): + self._create_task_types(swh_scheduler) _time = utcnow() recurring = self._tasks_from_template( TEMPLATES['git'], _time, 12) oneshots = self._tasks_from_template( TEMPLATES['hg'], _time, 12) total_tasks = len(recurring) + len(oneshots) - pending_tasks = self.backend.create_tasks(recurring + oneshots) + pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) backend_tasks = [{ 'task': task['id'], 'backend_id': str(uuid.uuid4()), 'scheduled': utcnow(), } for task in pending_tasks] - self.backend.mass_schedule_task_runs(backend_tasks) + swh_scheduler.mass_schedule_task_runs(backend_tasks) _tasks = [] percent = random.randint(0, 100) # random election removal boundary for task in backend_tasks: - t = self.backend.end_task_run( + t = swh_scheduler.end_task_run( task['backend_id'], status='eventful') c = random.randint(0, 100) if c <= percent: _tasks.append({'task_id': t['task'], 'task_run_id': t['id']}) - self.backend.delete_archived_tasks(_tasks) + swh_scheduler.delete_archived_tasks(_tasks) - self.cursor.execute('select count(*) from task') - tasks_count = self.cursor.fetchone() + all_tasks = [task['id'] for task in swh_scheduler.search_tasks()] + tasks_count = len(all_tasks) + tasks_run_count = len(swh_scheduler.get_task_runs(all_tasks)) - self.cursor.execute('select count(*) from task_run') - tasks_run_count = self.cursor.fetchone() + assert tasks_count == total_tasks - len(_tasks) + assert tasks_run_count == total_tasks - len(_tasks) - self.assertEqual(tasks_count[0], total_tasks - len(_tasks)) - self.assertEqual(tasks_run_count[0], total_tasks - len(_tasks)) - - def test_get_task_runs_no_task(self): + def test_get_task_runs_no_task(self, swh_scheduler): '''No task exist in the scheduler's db, get_task_runs() should always return an empty list. ''' - self.assertFalse(self.backend.get_task_runs(task_ids=())) - self.assertFalse(self.backend.get_task_runs(task_ids=(1, 2, 3))) - self.assertFalse(self.backend.get_task_runs(task_ids=(1, 2, 3), - limit=10)) + assert not swh_scheduler.get_task_runs(task_ids=()) + assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3)) + assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3), + limit=10) - def test_get_task_runs_no_task_executed(self): + def test_get_task_runs_no_task_executed(self, swh_scheduler): '''No task has been executed yet, get_task_runs() should always return an empty list. ''' - self._create_task_types() + self._create_task_types(swh_scheduler) _time = utcnow() recurring = self._tasks_from_template( TEMPLATES['git'], _time, 12) oneshots = self._tasks_from_template( TEMPLATES['hg'], _time, 12) - self.backend.create_tasks(recurring + oneshots) + swh_scheduler.create_tasks(recurring + oneshots) - self.assertFalse(self.backend.get_task_runs( - task_ids=())) - self.assertFalse(self.backend.get_task_runs( - task_ids=(1, 2, 3))) - self.assertFalse(self.backend.get_task_runs( - task_ids=(1, 2, 3), limit=10)) + assert not swh_scheduler.get_task_runs(task_ids=()) + assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3)) + assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3), limit=10) - def test_get_task_runs_with_scheduled(self): + def test_get_task_runs_with_scheduled(self, swh_scheduler): '''Some tasks have been scheduled but not executed yet, get_task_runs() should not return an empty list. limit should behave as expected. ''' - self._create_task_types() + self._create_task_types(swh_scheduler) _time = utcnow() recurring = self._tasks_from_template( TEMPLATES['git'], _time, 12) oneshots = self._tasks_from_template( TEMPLATES['hg'], _time, 12) total_tasks = len(recurring) + len(oneshots) - pending_tasks = self.backend.create_tasks(recurring + oneshots) + pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) backend_tasks = [{ 'task': task['id'], 'backend_id': str(uuid.uuid4()), 'scheduled': utcnow(), } for task in pending_tasks] - self.backend.mass_schedule_task_runs(backend_tasks) + swh_scheduler.mass_schedule_task_runs(backend_tasks) - self.assertFalse(self.backend.get_task_runs( - task_ids=[total_tasks + 1])) + assert not swh_scheduler.get_task_runs( + task_ids=[total_tasks + 1]) btask = backend_tasks[0] - runs = self.backend.get_task_runs( + runs = swh_scheduler.get_task_runs( task_ids=[btask['task']]) - self.assertEqual(len(runs), 1) + assert len(runs) == 1 run = runs[0] - self.assertEqual(subdict(run, excl=('id',)), - {'task': btask['task'], - 'backend_id': btask['backend_id'], - 'scheduled': btask['scheduled'], - 'started': None, - 'ended': None, - 'metadata': None, - 'status': 'scheduled', - }) - - runs = self.backend.get_task_runs( + assert subdict(run, excl=('id',)) == { + 'task': btask['task'], + 'backend_id': btask['backend_id'], + 'scheduled': btask['scheduled'], + 'started': None, + 'ended': None, + 'metadata': None, + 'status': 'scheduled', + } + + runs = swh_scheduler.get_task_runs( task_ids=[bt['task'] for bt in backend_tasks], limit=2) - self.assertEqual(len(runs), 2) + assert len(runs) == 2 - runs = self.backend.get_task_runs( + runs = swh_scheduler.get_task_runs( task_ids=[bt['task'] for bt in backend_tasks]) - self.assertEqual(len(runs), total_tasks) + assert len(runs) == total_tasks keys = ('task', 'backend_id', 'scheduled') - self.assertEqual(sorted([subdict(x, keys) for x in runs], - key=lambda x: x['task']), - backend_tasks) + assert sorted([subdict(x, keys) for x in runs], + key=lambda x: x['task']) == backend_tasks - def test_get_task_runs_with_executed(self): + def test_get_task_runs_with_executed(self, swh_scheduler): '''Some tasks have been executed, get_task_runs() should not return an empty list. limit should behave as expected. ''' - self._create_task_types() + self._create_task_types(swh_scheduler) _time = utcnow() recurring = self._tasks_from_template( TEMPLATES['git'], _time, 12) oneshots = self._tasks_from_template( TEMPLATES['hg'], _time, 12) - pending_tasks = self.backend.create_tasks(recurring + oneshots) + pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) backend_tasks = [{ 'task': task['id'], 'backend_id': str(uuid.uuid4()), 'scheduled': utcnow(), } for task in pending_tasks] - self.backend.mass_schedule_task_runs(backend_tasks) + swh_scheduler.mass_schedule_task_runs(backend_tasks) btask = backend_tasks[0] ts = utcnow() - self.backend.start_task_run(btask['backend_id'], - metadata={'something': 'stupid'}, - timestamp=ts) - runs = self.backend.get_task_runs(task_ids=[btask['task']]) - self.assertEqual(len(runs), 1) - self.assertEqual(subdict(runs[0], excl=('id')), { + swh_scheduler.start_task_run(btask['backend_id'], + metadata={'something': 'stupid'}, + timestamp=ts) + runs = swh_scheduler.get_task_runs(task_ids=[btask['task']]) + assert len(runs) == 1 + assert subdict(runs[0], excl=('id')) == { 'task': btask['task'], 'backend_id': btask['backend_id'], 'scheduled': btask['scheduled'], 'started': ts, 'ended': None, 'metadata': {'something': 'stupid'}, 'status': 'started', - }) + } ts2 = utcnow() - self.backend.end_task_run(btask['backend_id'], - metadata={'other': 'stuff'}, - timestamp=ts2, - status='eventful') - runs = self.backend.get_task_runs(task_ids=[btask['task']]) - self.assertEqual(len(runs), 1) - self.assertEqual(subdict(runs[0], excl=('id')), { + swh_scheduler.end_task_run(btask['backend_id'], + metadata={'other': 'stuff'}, + timestamp=ts2, + status='eventful') + runs = swh_scheduler.get_task_runs(task_ids=[btask['task']]) + assert len(runs) == 1 + assert subdict(runs[0], excl=('id')) == { 'task': btask['task'], 'backend_id': btask['backend_id'], 'scheduled': btask['scheduled'], 'started': ts, 'ended': ts2, 'metadata': {'something': 'stupid', 'other': 'stuff'}, 'status': 'eventful', - }) + } + + @staticmethod + def _task_from_template(template, next_run, priority, *args, **kwargs): + ret = copy.deepcopy(template) + ret['next_run'] = next_run + if priority: + ret['priority'] = priority + if args: + ret['arguments']['args'] = list(args) + if kwargs: + ret['arguments']['kwargs'] = kwargs + return ret + + def _pop_priority(self, priorities): + if not priorities: + return None + for priority, remains in priorities.items(): + if remains > 0: + priorities[priority] = remains - 1 + return priority + return None + + def _tasks_from_template(self, template, max_timestamp, num, + num_priority=0, priorities=None): + if num_priority and priorities: + priorities = { + priority: ratio * num_priority + for priority, ratio in priorities.items() + } + tasks = [] + for i in range(num + num_priority): + priority = self._pop_priority(priorities) + tasks.append(self._task_from_template( + template, + max_timestamp - datetime.timedelta(microseconds=i), + priority, + 'argument-%03d' % i, + **{'kwarg%03d' % i: 'bogus-kwarg'} + )) + return tasks -class LocalSchedulerTest(CommonSchedulerTest, unittest.TestCase): - def setUp(self): - super().setUp() - self.config = {'db': 'dbname=' + self.TEST_DB_NAME} - self.backend = get_scheduler('local', self.config) + def _create_task_types(self, scheduler): + for tt in TASK_TYPES.values(): + scheduler.create_task_type(tt)