diff --git a/mypy.ini b/mypy.ini --- a/mypy.ini +++ b/mypy.ini @@ -25,3 +25,6 @@ [mypy-pytest.*] ignore_missing_imports = True + +[mypy-pytest_postgresql.*] +ignore_missing_imports = True diff --git a/requirements-test.txt b/requirements-test.txt --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,5 +1,5 @@ # pytest<4 because of https://github.com/pytest-dev/pytest/issues/4641 pytest < 4 -pytest-postgresql +pytest-postgresql >= 2.1.0 celery >= 4 hypothesis >= 3.11.0 diff --git a/swh/scheduler/api/client.py b/swh/scheduler/api/client.py --- a/swh/scheduler/api/client.py +++ b/swh/scheduler/api/client.py @@ -104,3 +104,6 @@ def delete_archived_tasks(self, task_ids): return self.post('delete_archived_tasks', {'task_ids': task_ids}) + + def get_priority_ratios(self): + return self.get('get_priority_ratios') diff --git a/swh/scheduler/api/server.py b/swh/scheduler/api/server.py --- a/swh/scheduler/api/server.py +++ b/swh/scheduler/api/server.py @@ -170,6 +170,13 @@ return get_sched().delete_archived_tasks(**decode_request(request)) +@app.route('/get_priority_ratios', methods=['GET', 'POST']) +@negotiate(MsgpackFormatter) +@negotiate(JSONFormatter) +def get_priority_ratios(): + return get_sched().get_priority_ratios(**decode_request(request)) + + @app.route("/site-map") @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py --- a/swh/scheduler/backend.py +++ b/swh/scheduler/backend.py @@ -486,3 +486,8 @@ args.append(limit) cur.execute(query, args) return cur.fetchall() + + @db_transaction() + def get_priority_ratios(self, db=None, cur=None): + cur.execute('select id, ratio from priority_ratio') + return {row['id']: row['ratio'] for row in cur.fetchall()} diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py --- a/swh/scheduler/tests/conftest.py +++ b/swh/scheduler/tests/conftest.py @@ -74,15 +74,10 @@ @pytest.fixture -def swh_scheduler(request, postgresql_proc, 
postgresql): +def swh_scheduler(postgresql): scheduler_config = { - 'db': 'postgresql://{user}@{host}:{port}/{dbname}'.format( - host=postgresql_proc.host, - port=postgresql_proc.port, - user='postgres', - dbname='tests') + 'db': postgresql.dsn, } - all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey) cursor = postgresql.cursor() @@ -103,3 +98,8 @@ }) return scheduler + + +# this alias is used to be able to easily instantiate a db-backed Scheduler +# eg. for the RPC client/server test suite. +swh_db_scheduler = swh_scheduler diff --git a/swh/scheduler/tests/test_api_client.py b/swh/scheduler/tests/test_api_client.py --- a/swh/scheduler/tests/test_api_client.py +++ b/swh/scheduler/tests/test_api_client.py @@ -3,52 +3,46 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import unittest -import requests - -from swh.core.api.tests.server_testing import ServerTestFixture - -from swh.scheduler import get_scheduler -from swh.scheduler.api.server import app -from swh.scheduler.tests.test_scheduler import CommonSchedulerTest - - -class RemoteSchedulerTest(CommonSchedulerTest, ServerTestFixture, - unittest.TestCase): - """Test the remote scheduler API. - - This class doesn't define any tests as we want identical - functionality between local and remote scheduler. All the tests are - therefore defined in CommonSchedulerTest. - """ - - def setUp(self): - self.config = { - 'scheduler': { - 'cls': 'local', - 'args': { - 'db': 'dbname=%s' % self.TEST_DB_NAME, - } - } - } - self.app = app # this will setup the local scheduler... 
- super().setUp() - # accessible through a remote scheduler accessible on the - # given port - self.backend = get_scheduler('remote', {'url': self.url()}) - - def test_site_map(self): - sitemap = requests.get(self.url() + 'site-map') - assert sitemap.headers['Content-Type'] == 'application/json' - sitemap = sitemap.json() - - rules = set(x['rule'] for x in sitemap) - # we expect at least these rules - expected_rules = set('/'+rule for rule in ( - 'set_status_tasks', 'create_task_type', - 'get_task_type', 'get_task_types', 'create_tasks', 'disable_tasks', - 'get_tasks', 'search_tasks', 'peek_ready_tasks', - 'grab_ready_tasks', 'schedule_task_run', 'mass_schedule_task_runs', - 'start_task_run', 'end_task_run', 'filter_task_to_archive', - 'delete_archived_tasks')) - assert rules.issuperset(expected_rules), expected_rules - rules +import pytest +from flask import url_for + +import swh.scheduler.api.server as server +from swh.scheduler.api.client import RemoteScheduler +from swh.scheduler.tests.test_scheduler import TestScheduler # noqa + +# tests are executed using imported class (TestScheduler) using overloaded +# swh_scheduler fixture below + + +# the Flask app used as server in these tests +@pytest.fixture +def app(swh_db_scheduler): + server.scheduler = swh_db_scheduler + yield server.app + + +# the RPCClient class used as client used in these tests +@pytest.fixture +def swh_rpc_client_class(): + return RemoteScheduler + + +@pytest.fixture +def swh_scheduler(swh_rpc_client, app): + yield swh_rpc_client + + +def test_site_map(flask_app_client): + sitemap = flask_app_client.get(url_for('site_map')) + assert sitemap.headers['Content-Type'] == 'application/json' + + rules = set(x['rule'] for x in sitemap.json) + # we expect at least these rules + expected_rules = set('/'+rule for rule in ( + 'set_status_tasks', 'create_task_type', + 'get_task_type', 'get_task_types', 'create_tasks', 'disable_tasks', + 'get_tasks', 'search_tasks', 'get_task_runs', 'peek_ready_tasks', + 
'grab_ready_tasks', 'schedule_task_run', 'mass_schedule_task_runs', + 'start_task_run', 'end_task_run', 'filter_task_to_archive', + 'delete_archived_tasks', 'get_priority_ratios')) + assert rules == expected_rules diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py --- a/swh/scheduler/tests/test_scheduler.py +++ b/swh/scheduler/tests/test_scheduler.py @@ -5,9 +5,7 @@ import copy import datetime -import os import random -import unittest import uuid from collections import defaultdict @@ -15,11 +13,6 @@ from arrow import utcnow import pytest -from swh.core.db.tests.db_testing import SingleDbTestFixture -from swh.scheduler import get_scheduler - -from . import SQL_DIR - TASK_TYPES = { 'git': { @@ -76,94 +69,38 @@ @pytest.mark.db -class CommonSchedulerTest(SingleDbTestFixture): - TEST_DB_NAME = 'softwareheritage-scheduler-test' - TEST_DB_DUMP = os.path.join(SQL_DIR, '*.sql') - - def tearDown(self): - self.empty_tables() - super().tearDown() - - def empty_tables(self, whitelist=["priority_ratio"]): - query = """SELECT table_name FROM information_schema.tables - WHERE table_schema = %%s and - table_name not in (%s) - """ % ','.join(map(lambda t: "'%s'" % t, whitelist)) - self.cursor.execute(query, ('public', )) - - tables = set(table for (table,) in self.cursor.fetchall()) - - for table in tables: - self.cursor.execute('truncate table %s cascade' % table) - self.conn.commit() +class TestScheduler: + def test_get_priority_ratios(self, swh_scheduler): + assert swh_scheduler.get_priority_ratios() == { + 'high': 0.5, + 'normal': 0.3, + 'low': 0.2, + } - def test_add_task_type(self): + def test_add_task_type(self, swh_scheduler): tt = TASK_TYPES['git'] - self.backend.create_task_type(tt) - self.assertEqual(tt, self.backend.get_task_type(tt['type'])) - with self.assertRaisesRegex(psycopg2.IntegrityError, - r'\(type\)=\(%s\)' % tt['type']): - self.backend.create_task_type(tt) + swh_scheduler.create_task_type(tt) + assert tt == 
swh_scheduler.get_task_type(tt['type']) + with pytest.raises(psycopg2.IntegrityError, + match=r'\(type\)=\(%s\)' % tt['type']): + swh_scheduler.create_task_type(tt) tt2 = TASK_TYPES['hg'] - self.backend.create_task_type(tt2) - self.assertEqual(tt, self.backend.get_task_type(tt['type'])) - self.assertEqual(tt2, self.backend.get_task_type(tt2['type'])) + swh_scheduler.create_task_type(tt2) + assert tt == swh_scheduler.get_task_type(tt['type']) + assert tt2 == swh_scheduler.get_task_type(tt2['type']) - def test_get_task_types(self): + def test_get_task_types(self, swh_scheduler): tt, tt2 = TASK_TYPES['git'], TASK_TYPES['hg'] - self.backend.create_task_type(tt) - self.backend.create_task_type(tt2) - self.assertCountEqual([tt2, tt], self.backend.get_task_types()) - - @staticmethod - def _task_from_template(template, next_run, priority, *args, **kwargs): - ret = copy.deepcopy(template) - ret['next_run'] = next_run - if priority: - ret['priority'] = priority - if args: - ret['arguments']['args'] = list(args) - if kwargs: - ret['arguments']['kwargs'] = kwargs - return ret - - def _pop_priority(self, priorities): - if not priorities: - return None - for priority, remains in priorities.items(): - if remains > 0: - priorities[priority] = remains - 1 - return priority - return None - - def _tasks_from_template(self, template, max_timestamp, num, - num_priority=0, priorities=None): - if num_priority and priorities: - priorities = { - priority: ratio * num_priority - for priority, ratio in priorities.items() - } - - tasks = [] - for i in range(num + num_priority): - priority = self._pop_priority(priorities) - tasks.append(self._task_from_template( - template, - max_timestamp - datetime.timedelta(microseconds=i), - priority, - 'argument-%03d' % i, - **{'kwarg%03d' % i: 'bogus-kwarg'} - )) - return tasks - - def _create_task_types(self): - for tt in TASK_TYPES.values(): - self.backend.create_task_type(tt) - - def test_create_tasks(self): - priority_ratio = self._priority_ratio() - 
self._create_task_types() + swh_scheduler.create_task_type(tt) + swh_scheduler.create_task_type(tt2) + actual_task_types = swh_scheduler.get_task_types() + assert tt in actual_task_types + assert tt2 in actual_task_types + + def test_create_tasks(self, swh_scheduler): + priority_ratio = self._priority_ratio(swh_scheduler) + self._create_task_types(swh_scheduler) num_tasks_priority = 100 tasks_1 = self._tasks_from_template( TEMPLATES['git'], utcnow(), 100) @@ -173,16 +110,16 @@ tasks = tasks_1 + tasks_2 # tasks are returned only once with their ids - ret1 = self.backend.create_tasks(tasks + tasks_1 + tasks_2) + ret1 = swh_scheduler.create_tasks(tasks + tasks_1 + tasks_2) set_ret1 = set([t['id'] for t in ret1]) # creating the same set result in the same ids - ret = self.backend.create_tasks(tasks) + ret = swh_scheduler.create_tasks(tasks) set_ret = set([t['id'] for t in ret]) # Idempotence results - self.assertEqual(set_ret, set_ret1) - self.assertEqual(len(ret), len(ret1)) + assert set_ret == set_ret1 + assert len(ret) == len(ret1) ids = set() actual_priorities = defaultdict(int) @@ -190,18 +127,15 @@ for task, orig_task in zip(ret, tasks): task = copy.deepcopy(task) task_type = TASK_TYPES[orig_task['type'].split('-')[-1]] - self.assertNotIn(task['id'], ids) - self.assertEqual(task['status'], 'next_run_not_scheduled') - self.assertEqual(task['current_interval'], - task_type['default_interval']) - self.assertEqual(task['policy'], orig_task.get('policy', - 'recurring')) + assert task['id'] not in ids + assert task['status'] == 'next_run_not_scheduled' + assert task['current_interval'] == task_type['default_interval'] + assert task['policy'] == orig_task.get('policy', 'recurring') priority = task.get('priority') if priority: actual_priorities[priority] += 1 - self.assertEqual(task['retries_left'], - task_type['num_retries'] or 0) + assert task['retries_left'] == (task_type['num_retries'] or 0) ids.add(task['id']) del task['id'] del task['status'] @@ -211,67 +145,60 @@ 
del task['policy'] if 'priority' not in orig_task: del task['priority'] - self.assertEqual(task, orig_task) + assert task == orig_task - self.assertEqual(dict(actual_priorities), { + assert dict(actual_priorities) == { priority: int(ratio * num_tasks_priority) for priority, ratio in priority_ratio.items() - }) + } - def test_peek_ready_tasks_no_priority(self): - self._create_task_types() + def test_peek_ready_tasks_no_priority(self, swh_scheduler): + self._create_task_types(swh_scheduler) t = utcnow() task_type = TEMPLATES['git']['type'] tasks = self._tasks_from_template(TEMPLATES['git'], t, 100) random.shuffle(tasks) - self.backend.create_tasks(tasks) + swh_scheduler.create_tasks(tasks) - ready_tasks = self.backend.peek_ready_tasks(task_type) - self.assertEqual(len(ready_tasks), len(tasks)) + ready_tasks = swh_scheduler.peek_ready_tasks(task_type) + assert len(ready_tasks) == len(tasks) for i in range(len(ready_tasks) - 1): - self.assertLessEqual(ready_tasks[i]['next_run'], - ready_tasks[i+1]['next_run']) + assert ready_tasks[i]['next_run'] <= ready_tasks[i+1]['next_run'] # Only get the first few ready tasks limit = random.randrange(5, 5 + len(tasks)//2) - ready_tasks_limited = self.backend.peek_ready_tasks( + ready_tasks_limited = swh_scheduler.peek_ready_tasks( task_type, num_tasks=limit) - self.assertEqual(len(ready_tasks_limited), limit) - self.assertCountEqual(ready_tasks_limited, ready_tasks[:limit]) + assert len(ready_tasks_limited) == limit + assert ready_tasks_limited == ready_tasks[:limit] # Limit by timestamp max_ts = tasks[limit-1]['next_run'] - ready_tasks_timestamped = self.backend.peek_ready_tasks( + ready_tasks_timestamped = swh_scheduler.peek_ready_tasks( task_type, timestamp=max_ts) for ready_task in ready_tasks_timestamped: - self.assertLessEqual(ready_task['next_run'], max_ts) + assert ready_task['next_run'] <= max_ts # Make sure we get proper behavior for the first ready tasks - self.assertCountEqual( - 
ready_tasks[:len(ready_tasks_timestamped)], - ready_tasks_timestamped, - ) + assert ready_tasks[:len(ready_tasks_timestamped)] \ + == ready_tasks_timestamped # Limit by both - ready_tasks_both = self.backend.peek_ready_tasks( + ready_tasks_both = swh_scheduler.peek_ready_tasks( task_type, timestamp=max_ts, num_tasks=limit//3) - self.assertLessEqual(len(ready_tasks_both), limit//3) + assert len(ready_tasks_both) <= limit//3 for ready_task in ready_tasks_both: - self.assertLessEqual(ready_task['next_run'], max_ts) - self.assertIn(ready_task, ready_tasks[:limit//3]) - - def _priority_ratio(self): - self.cursor.execute('select id, ratio from priority_ratio') - priority_ratio = {} - for row in self.cursor.fetchall(): - priority_ratio[row[0]] = row[1] - return priority_ratio - - def test_peek_ready_tasks_mixed_priorities(self): - priority_ratio = self._priority_ratio() - self._create_task_types() + assert ready_task['next_run'] <= max_ts + assert ready_task in ready_tasks[:limit//3] + + def _priority_ratio(self, swh_scheduler): + return swh_scheduler.get_priority_ratios() + + def test_peek_ready_tasks_mixed_priorities(self, swh_scheduler): + priority_ratio = self._priority_ratio(swh_scheduler) + self._create_task_types(swh_scheduler) t = utcnow() task_type = TEMPLATES['git']['type'] num_tasks_priority = 100 @@ -284,15 +211,15 @@ priorities=priority_ratio) random.shuffle(tasks) - self.backend.create_tasks(tasks) + swh_scheduler.create_tasks(tasks) # take all available tasks - ready_tasks = self.backend.peek_ready_tasks( + ready_tasks = swh_scheduler.peek_ready_tasks( task_type) - self.assertEqual(len(ready_tasks), len(tasks)) - self.assertEqual(num_tasks_priority + num_tasks_no_priority, - len(ready_tasks)) + assert len(ready_tasks) == len(tasks) + assert num_tasks_priority + num_tasks_no_priority \ + == len(ready_tasks) count_tasks_per_priority = defaultdict(int) for task in ready_tasks: @@ -300,15 +227,15 @@ if priority: count_tasks_per_priority[priority] += 1 - 
self.assertEqual(dict(count_tasks_per_priority), { + assert dict(count_tasks_per_priority) == { priority: int(ratio * num_tasks_priority) for priority, ratio in priority_ratio.items() - }) + } # Only get some ready tasks num_tasks = random.randrange(5, 5 + num_tasks_no_priority//2) num_tasks_priority = random.randrange(5, num_tasks_priority//2) - ready_tasks_limited = self.backend.peek_ready_tasks( + ready_tasks_limited = swh_scheduler.peek_ready_tasks( task_type, num_tasks=num_tasks, num_tasks_priority=num_tasks_priority) @@ -321,15 +248,14 @@ for priority, ratio in priority_ratio.items(): expected_count = math.ceil(ratio * num_tasks_priority) actual_prio = count_tasks_per_priority[priority] - self.assertTrue( - actual_prio == expected_count or - actual_prio == expected_count + 1) + assert (actual_prio == expected_count or + actual_prio == expected_count + 1) - self.assertEqual(count_tasks_per_priority[None], num_tasks) + assert count_tasks_per_priority[None] == num_tasks - def test_grab_ready_tasks(self): - priority_ratio = self._priority_ratio() - self._create_task_types() + def test_grab_ready_tasks(self, swh_scheduler): + priority_ratio = self._priority_ratio(swh_scheduler) + self._create_task_types(swh_scheduler) t = utcnow() task_type = TEMPLATES['git']['type'] num_tasks_priority = 100 @@ -341,70 +267,71 @@ num_priority=num_tasks_priority, priorities=priority_ratio) random.shuffle(tasks) - self.backend.create_tasks(tasks) + swh_scheduler.create_tasks(tasks) - first_ready_tasks = self.backend.peek_ready_tasks( + first_ready_tasks = swh_scheduler.peek_ready_tasks( task_type, num_tasks=10, num_tasks_priority=10) - grabbed_tasks = self.backend.grab_ready_tasks( + grabbed_tasks = swh_scheduler.grab_ready_tasks( task_type, num_tasks=10, num_tasks_priority=10) for peeked, grabbed in zip(first_ready_tasks, grabbed_tasks): - self.assertEqual(peeked['status'], 'next_run_not_scheduled') + assert peeked['status'] == 'next_run_not_scheduled' del peeked['status'] - 
self.assertEqual(grabbed['status'], 'next_run_scheduled') + assert grabbed['status'] == 'next_run_scheduled' del grabbed['status'] - self.assertEqual(peeked, grabbed) - self.assertEqual(peeked['priority'], grabbed['priority']) + assert peeked == grabbed + assert peeked['priority'] == grabbed['priority'] - def test_get_tasks(self): - self._create_task_types() + def test_get_tasks(self, swh_scheduler): + self._create_task_types(swh_scheduler) t = utcnow() tasks = self._tasks_from_template(TEMPLATES['git'], t, 100) - tasks = self.backend.create_tasks(tasks) + tasks = swh_scheduler.create_tasks(tasks) random.shuffle(tasks) while len(tasks) > 1: length = random.randrange(1, len(tasks)) - cur_tasks = tasks[:length] + cur_tasks = sorted(tasks[:length], key=lambda x: x['id']) tasks[:length] = [] - ret = self.backend.get_tasks(task['id'] for task in cur_tasks) - self.assertCountEqual(ret, cur_tasks) + ret = swh_scheduler.get_tasks(task['id'] for task in cur_tasks) + # result is not guaranteed to be sorted + ret.sort(key=lambda x: x['id']) + assert ret == cur_tasks - def test_search_tasks(self): + def test_search_tasks(self, swh_scheduler): def make_real_dicts(l): """RealDictRow is not a real dict.""" return [dict(d.items()) for d in l] - self._create_task_types() + self._create_task_types(swh_scheduler) t = utcnow() tasks = self._tasks_from_template(TEMPLATES['git'], t, 100) - tasks = self.backend.create_tasks(tasks) - self.assertCountEqual( - make_real_dicts(self.backend.search_tasks()), - make_real_dicts(tasks)) + tasks = swh_scheduler.create_tasks(tasks) + assert make_real_dicts(swh_scheduler.search_tasks()) \ + == make_real_dicts(tasks) - def test_filter_task_to_archive(self): + def test_filter_task_to_archive(self, swh_scheduler): """Filtering only list disabled recurring or completed oneshot tasks """ - self._create_task_types() + self._create_task_types(swh_scheduler) _time = utcnow() recurring = self._tasks_from_template(TEMPLATES['git'], _time, 12) oneshots = 
self._tasks_from_template(TEMPLATES['hg'], _time, 12) total_tasks = len(recurring) + len(oneshots) # simulate scheduling tasks - pending_tasks = self.backend.create_tasks(recurring + oneshots) + pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) backend_tasks = [{ 'task': task['id'], 'backend_id': str(uuid.uuid4()), 'scheduled': utcnow(), } for task in pending_tasks] - self.backend.mass_schedule_task_runs(backend_tasks) + swh_scheduler.mass_schedule_task_runs(backend_tasks) # we simulate the task are being done _tasks = [] for task in backend_tasks: - t = self.backend.end_task_run( + t = swh_scheduler.end_task_run( task['backend_id'], status='eventful') _tasks.append(t) @@ -433,12 +360,12 @@ status_per_policy[policy] += status[0] tasks_to_update[policy].append(task['id']) - self.backend.disable_tasks(tasks_to_update['recurring']) + swh_scheduler.disable_tasks(tasks_to_update['recurring']) # hack: change the status to something else than completed/disabled - self.backend.set_status_tasks( + swh_scheduler.set_status_tasks( _task_ids['oneshot'], status='next_run_not_scheduled') # complete the tasks to update - self.backend.set_status_tasks( + swh_scheduler.set_status_tasks( tasks_to_update['oneshot'], status='completed') total_tasks_filtered = (status_per_policy['recurring'] + @@ -447,162 +374,156 @@ # retrieve tasks to archive after = _time.shift(days=-1).format('YYYY-MM-DD') before = utcnow().shift(days=1).format('YYYY-MM-DD') - tasks_to_archive = list(self.backend.filter_task_to_archive( + tasks_to_archive = list(swh_scheduler.filter_task_to_archive( after_ts=after, before_ts=before, limit=total_tasks)) - self.assertEqual(len(tasks_to_archive), total_tasks_filtered) + assert len(tasks_to_archive) == total_tasks_filtered actual_filtered_per_status = {'recurring': 0, 'oneshot': 0} for task in tasks_to_archive: actual_filtered_per_status[task['task_policy']] += 1 - self.assertEqual(actual_filtered_per_status, status_per_policy) + assert 
actual_filtered_per_status == status_per_policy - def test_delete_archived_tasks(self): - self._create_task_types() + def test_delete_archived_tasks(self, swh_scheduler): + self._create_task_types(swh_scheduler) _time = utcnow() recurring = self._tasks_from_template( TEMPLATES['git'], _time, 12) oneshots = self._tasks_from_template( TEMPLATES['hg'], _time, 12) total_tasks = len(recurring) + len(oneshots) - pending_tasks = self.backend.create_tasks(recurring + oneshots) + pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) backend_tasks = [{ 'task': task['id'], 'backend_id': str(uuid.uuid4()), 'scheduled': utcnow(), } for task in pending_tasks] - self.backend.mass_schedule_task_runs(backend_tasks) + swh_scheduler.mass_schedule_task_runs(backend_tasks) _tasks = [] percent = random.randint(0, 100) # random election removal boundary for task in backend_tasks: - t = self.backend.end_task_run( + t = swh_scheduler.end_task_run( task['backend_id'], status='eventful') c = random.randint(0, 100) if c <= percent: _tasks.append({'task_id': t['task'], 'task_run_id': t['id']}) - self.backend.delete_archived_tasks(_tasks) + swh_scheduler.delete_archived_tasks(_tasks) - self.cursor.execute('select count(*) from task') - tasks_count = self.cursor.fetchone() + all_tasks = [task['id'] for task in swh_scheduler.search_tasks()] + tasks_count = len(all_tasks) + tasks_run_count = len(swh_scheduler.get_task_runs(all_tasks)) - self.cursor.execute('select count(*) from task_run') - tasks_run_count = self.cursor.fetchone() + assert tasks_count == total_tasks - len(_tasks) + assert tasks_run_count == total_tasks - len(_tasks) - self.assertEqual(tasks_count[0], total_tasks - len(_tasks)) - self.assertEqual(tasks_run_count[0], total_tasks - len(_tasks)) - - def test_get_task_runs_no_task(self): + def test_get_task_runs_no_task(self, swh_scheduler): '''No task exist in the scheduler's db, get_task_runs() should always return an empty list. 
''' - self.assertFalse(self.backend.get_task_runs(task_ids=())) - self.assertFalse(self.backend.get_task_runs(task_ids=(1, 2, 3))) - self.assertFalse(self.backend.get_task_runs(task_ids=(1, 2, 3), - limit=10)) + assert not swh_scheduler.get_task_runs(task_ids=()) + assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3)) + assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3), + limit=10) - def test_get_task_runs_no_task_executed(self): + def test_get_task_runs_no_task_executed(self, swh_scheduler): '''No task has been executed yet, get_task_runs() should always return an empty list. ''' - self._create_task_types() + self._create_task_types(swh_scheduler) _time = utcnow() recurring = self._tasks_from_template( TEMPLATES['git'], _time, 12) oneshots = self._tasks_from_template( TEMPLATES['hg'], _time, 12) - self.backend.create_tasks(recurring + oneshots) + swh_scheduler.create_tasks(recurring + oneshots) - self.assertFalse(self.backend.get_task_runs( - task_ids=())) - self.assertFalse(self.backend.get_task_runs( - task_ids=(1, 2, 3))) - self.assertFalse(self.backend.get_task_runs( - task_ids=(1, 2, 3), limit=10)) + assert not swh_scheduler.get_task_runs(task_ids=()) + assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3)) + assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3), limit=10) - def test_get_task_runs_with_scheduled(self): + def test_get_task_runs_with_scheduled(self, swh_scheduler): '''Some tasks have been scheduled but not executed yet, get_task_runs() should not return an empty list. limit should behave as expected. 
''' - self._create_task_types() + self._create_task_types(swh_scheduler) _time = utcnow() recurring = self._tasks_from_template( TEMPLATES['git'], _time, 12) oneshots = self._tasks_from_template( TEMPLATES['hg'], _time, 12) total_tasks = len(recurring) + len(oneshots) - pending_tasks = self.backend.create_tasks(recurring + oneshots) + pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) backend_tasks = [{ 'task': task['id'], 'backend_id': str(uuid.uuid4()), 'scheduled': utcnow(), } for task in pending_tasks] - self.backend.mass_schedule_task_runs(backend_tasks) + swh_scheduler.mass_schedule_task_runs(backend_tasks) - self.assertFalse(self.backend.get_task_runs( - task_ids=[total_tasks + 1])) + assert not swh_scheduler.get_task_runs( + task_ids=[total_tasks + 1]) btask = backend_tasks[0] - runs = self.backend.get_task_runs( + runs = swh_scheduler.get_task_runs( task_ids=[btask['task']]) - self.assertEqual(len(runs), 1) + assert len(runs) == 1 run = runs[0] - self.assertEqual(subdict(run, excl=('id',)), - {'task': btask['task'], - 'backend_id': btask['backend_id'], - 'scheduled': btask['scheduled'], - 'started': None, - 'ended': None, - 'metadata': None, - 'status': 'scheduled', - }) - - runs = self.backend.get_task_runs( + assert subdict(run, excl=('id',)) == { + 'task': btask['task'], + 'backend_id': btask['backend_id'], + 'scheduled': btask['scheduled'], + 'started': None, + 'ended': None, + 'metadata': None, + 'status': 'scheduled', + } + + runs = swh_scheduler.get_task_runs( task_ids=[bt['task'] for bt in backend_tasks], limit=2) - self.assertEqual(len(runs), 2) + assert len(runs) == 2 - runs = self.backend.get_task_runs( + runs = swh_scheduler.get_task_runs( task_ids=[bt['task'] for bt in backend_tasks]) - self.assertEqual(len(runs), total_tasks) + assert len(runs) == total_tasks keys = ('task', 'backend_id', 'scheduled') - self.assertEqual(sorted([subdict(x, keys) for x in runs], - key=lambda x: x['task']), - backend_tasks) + assert 
sorted([subdict(x, keys) for x in runs], + key=lambda x: x['task']) == backend_tasks - def test_get_task_runs_with_executed(self): + def test_get_task_runs_with_executed(self, swh_scheduler): '''Some tasks have been executed, get_task_runs() should not return an empty list. limit should behave as expected. ''' - self._create_task_types() + self._create_task_types(swh_scheduler) _time = utcnow() recurring = self._tasks_from_template( TEMPLATES['git'], _time, 12) oneshots = self._tasks_from_template( TEMPLATES['hg'], _time, 12) - pending_tasks = self.backend.create_tasks(recurring + oneshots) + pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) backend_tasks = [{ 'task': task['id'], 'backend_id': str(uuid.uuid4()), 'scheduled': utcnow(), } for task in pending_tasks] - self.backend.mass_schedule_task_runs(backend_tasks) + swh_scheduler.mass_schedule_task_runs(backend_tasks) btask = backend_tasks[0] ts = utcnow() - self.backend.start_task_run(btask['backend_id'], - metadata={'something': 'stupid'}, - timestamp=ts) - runs = self.backend.get_task_runs(task_ids=[btask['task']]) - self.assertEqual(len(runs), 1) - self.assertEqual(subdict(runs[0], excl=('id')), { + swh_scheduler.start_task_run(btask['backend_id'], + metadata={'something': 'stupid'}, + timestamp=ts) + runs = swh_scheduler.get_task_runs(task_ids=[btask['task']]) + assert len(runs) == 1 + assert subdict(runs[0], excl=('id')) == { 'task': btask['task'], 'backend_id': btask['backend_id'], 'scheduled': btask['scheduled'], @@ -610,16 +531,16 @@ 'ended': None, 'metadata': {'something': 'stupid'}, 'status': 'started', - }) + } ts2 = utcnow() - self.backend.end_task_run(btask['backend_id'], - metadata={'other': 'stuff'}, - timestamp=ts2, - status='eventful') - runs = self.backend.get_task_runs(task_ids=[btask['task']]) - self.assertEqual(len(runs), 1) - self.assertEqual(subdict(runs[0], excl=('id')), { + swh_scheduler.end_task_run(btask['backend_id'], + metadata={'other': 'stuff'}, + timestamp=ts2, + 
status='eventful') + runs = swh_scheduler.get_task_runs(task_ids=[btask['task']]) + assert len(runs) == 1 + assert subdict(runs[0], excl=('id')) == { 'task': btask['task'], 'backend_id': btask['backend_id'], 'scheduled': btask['scheduled'], @@ -627,11 +548,49 @@ 'ended': ts2, 'metadata': {'something': 'stupid', 'other': 'stuff'}, 'status': 'eventful', - }) + } + + @staticmethod + def _task_from_template(template, next_run, priority, *args, **kwargs): + ret = copy.deepcopy(template) + ret['next_run'] = next_run + if priority: + ret['priority'] = priority + if args: + ret['arguments']['args'] = list(args) + if kwargs: + ret['arguments']['kwargs'] = kwargs + return ret + + def _pop_priority(self, priorities): + if not priorities: + return None + for priority, remains in priorities.items(): + if remains > 0: + priorities[priority] = remains - 1 + return priority + return None + + def _tasks_from_template(self, template, max_timestamp, num, + num_priority=0, priorities=None): + if num_priority and priorities: + priorities = { + priority: ratio * num_priority + for priority, ratio in priorities.items() + } + tasks = [] + for i in range(num + num_priority): + priority = self._pop_priority(priorities) + tasks.append(self._task_from_template( + template, + max_timestamp - datetime.timedelta(microseconds=i), + priority, + 'argument-%03d' % i, + **{'kwarg%03d' % i: 'bogus-kwarg'} + )) + return tasks -class LocalSchedulerTest(CommonSchedulerTest, unittest.TestCase): - def setUp(self): - super().setUp() - self.config = {'db': 'dbname=' + self.TEST_DB_NAME} - self.backend = get_scheduler('local', self.config) + def _create_task_types(self, scheduler): + for tt in TASK_TYPES.values(): + scheduler.create_task_type(tt) diff --git a/swh/scheduler/tests/updater/conftest.py b/swh/scheduler/tests/updater/conftest.py --- a/swh/scheduler/tests/updater/conftest.py +++ b/swh/scheduler/tests/updater/conftest.py @@ -1,6 +1,7 @@ import pytest import glob import os +from arrow import utcnow # 
XXX from swh.core.utils import numfile_sortkey as sortkey from swh.scheduler.updater.backend import SchedulerUpdaterBackend @@ -12,13 +13,9 @@ @pytest.fixture -def swh_scheduler_updater(request, postgresql_proc, postgresql): +def swh_scheduler_updater(postgresql): config = { - 'db': 'postgresql://{user}@{host}:{port}/{dbname}'.format( - host=postgresql_proc.host, - port=postgresql_proc.port, - user='postgres', - dbname='tests') + 'db': postgresql.dsn, } all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey) @@ -31,3 +28,41 @@ backend = SchedulerUpdaterBackend(**config) return backend + + +def make_event(event_type, name, origin_type): + return { + 'type': event_type, + 'repo': { + 'name': name, + }, + 'created_at': utcnow(), + 'origin_type': origin_type, + } + + +def make_simple_event(event_type, name, origin_type): + return { + 'type': event_type, + 'url': 'https://fakeurl/%s' % name, + 'origin_type': origin_type, + 'created_at': utcnow(), + } + + +def make_events(events): + for event_type, repo_name, origin_type in events: + yield make_event(event_type, repo_name, origin_type) + + +def make_incomplete_event(event_type, name, origin_type, + missing_data_key): + event = make_event(event_type, name, origin_type) + del event[missing_data_key] + return event + + +def make_incomplete_events(events): + for event_type, repo_name, origin_type, missing_data_key in events: + yield make_incomplete_event(event_type, repo_name, + origin_type, missing_data_key) diff --git a/swh/scheduler/tests/updater/test_writer.py b/swh/scheduler/tests/updater/test_writer.py --- a/swh/scheduler/tests/updater/test_writer.py +++ b/swh/scheduler/tests/updater/test_writer.py @@ -4,150 +4,149 @@ # See top-level LICENSE file for more information import os -import unittest from glob import glob import pytest +from pytest_postgresql.factories import postgresql as pg_fixture_factory +from os.path import join from swh.core.utils import numfile_sortkey as sortkey -from swh.core.db.tests.db_testing 
import os
from glob import glob
from os.path import join

import pytest
from pytest_postgresql.factories import postgresql as pg_fixture_factory

from swh.core.utils import numfile_sortkey as sortkey

from swh.scheduler.tests import SQL_DIR
from swh.scheduler.updater.events import LISTENED_EVENTS, SWHEvent
from swh.scheduler.updater.writer import UpdaterWriter

from .conftest import make_simple_event


# Two distinct databases spawned from the same postgresql process: one
# for the scheduler proper, one for the scheduler-updater.
pg_scheduler = pg_fixture_factory('postgresql_proc', 'scheduler')
pg_updater = pg_fixture_factory('postgresql_proc', 'updater')


def pg_sched_fact(dbname, sqldir):
    """Build a fixture that loads every *.sql dump found in `sqldir`
    into the database provided by the `pg_<dbname>` fixture, then yields
    the live connection.
    """
    @pytest.fixture
    def pg_scheduler_db(request):
        pg = request.getfixturevalue('pg_%s' % dbname)
        dump_files = sorted(glob(os.path.join(sqldir, '*.sql')),
                            key=sortkey)
        with pg.cursor() as cur:
            for fname in dump_files:
                with open(fname) as fobj:
                    # "create index concurrently" cannot run inside a
                    # transaction block, so strip the keyword
                    sql = fobj.read().replace('concurrently', '')
                    cur.execute(sql)
        pg.commit()
        yield pg

    return pg_scheduler_db
scheduler_db = pg_sched_fact('scheduler', SQL_DIR)
updater_db = pg_sched_fact('updater', join(SQL_DIR, 'updater'))


@pytest.fixture
def swh_updater_writer(scheduler_db, updater_db):
    """Instantiate an UpdaterWriter wired to the two db-backed fixtures."""
    config = {
        'scheduler': {
            'cls': 'local',
            'args': {
                'db': scheduler_db.dsn,
            },
        },
        'scheduler_updater': {
            'cls': 'local',
            'args': {
                'db': updater_db.dsn,
                'cache_read_limit': 5,
            },
        },
        'updater_writer': {
            'pause': 0.1,
            'verbose': False,
        },
    }
    return UpdaterWriter(**config)


def test_run_ko(swh_updater_writer):
    """Only git tasks are supported for now, other types are dismissed.

    """
    scheduler = swh_updater_writer.scheduler_backend
    updater = swh_updater_writer.scheduler_updater_backend

    # events with a non-git origin type must be dropped by the writer
    ready_events = [
        SWHEvent(
            make_simple_event(event_type, 'origin-%s' % i,
                              'svn'))
        for i, event_type in enumerate(LISTENED_EVENTS)
    ]

    updater.cache_put(ready_events)
    list(updater.cache_read())

    r = scheduler.peek_ready_tasks('load-git')

    # first read on an empty scheduling db results with nothing in it
    assert not r

    # Read from cache to scheduler db
    swh_updater_writer.run()

    r = scheduler.peek_ready_tasks('load-git')

    # other reads after writes are still empty since it's not supported
    assert not r


def test_run_ok(swh_updater_writer):
    """Only git origins are supported for now.

    """
    scheduler = swh_updater_writer.scheduler_backend
    updater = swh_updater_writer.scheduler_updater_backend

    ready_events = [
        SWHEvent(
            make_simple_event(event_type, 'origin-%s' % i, 'git'))
        for i, event_type in enumerate(LISTENED_EVENTS)
    ]

    expected_length = len(ready_events)

    updater.cache_put(ready_events)

    data = list(updater.cache_read())
    assert len(data) == expected_length

    r = scheduler.peek_ready_tasks('load-git')

    # first read on an empty scheduling db results with nothing in it
    assert not r

    # Read from cache to scheduler db
    swh_updater_writer.run()

    # now, we should have scheduling task ready
    r = scheduler.peek_ready_tasks('load-git')

    assert len(r) == expected_length

    # Check the task has been scheduled
    for t in r:
        assert t['type'] == 'load-git'
        assert t['priority'] == 'normal'
        assert t['policy'] == 'oneshot'
        assert t['status'] == 'next_run_not_scheduled'

    # writer has nothing to do now
    swh_updater_writer.run()

    # so no more data in cache
    data = list(updater.cache_read())

    assert not data

    # provided no runner has run, still the same number of scheduling tasks
    r = scheduler.peek_ready_tasks('load-git')

    assert len(r) == expected_length