diff --git a/requirements-swh.txt b/requirements-swh.txt index 2ad8f8c..6d30cae 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,2 +1,2 @@ -swh.core[db,http] >= 0.0.60 +swh.core[db,http] >= 0.0.61 swh.storage >= 0.0.129 diff --git a/swh/scheduler/tests/test_api_client.py b/swh/scheduler/tests/test_api_client.py index e0ecf5a..e3c194b 100644 --- a/swh/scheduler/tests/test_api_client.py +++ b/swh/scheduler/tests/test_api_client.py @@ -1,53 +1,54 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest import requests from swh.core.api.tests.server_testing import ServerTestFixture + from swh.scheduler import get_scheduler from swh.scheduler.api.server import app from swh.scheduler.tests.test_scheduler import CommonSchedulerTest class RemoteSchedulerTest(CommonSchedulerTest, ServerTestFixture, unittest.TestCase): """Test the remote scheduler API. This class doesn't define any tests as we want identical functionality between local and remote scheduler. All the tests are therefore defined in CommonSchedulerTest. """ def setUp(self): self.config = { 'scheduler': { 'cls': 'local', 'args': { 'db': 'dbname=%s' % self.TEST_DB_NAME, } } } self.app = app # this will setup the local scheduler... super().setUp() # accessible through a remote scheduler accessible on the # given port self.backend = get_scheduler('remote', {'url': self.url()}) def test_site_map(self): sitemap = requests.get(self.url() + 'site-map') assert sitemap.headers['Content-Type'] == 'application/json' sitemap = sitemap.json() rules = set(x['rule'] for x in sitemap) # we expect at least these rules expected_rules = set('/'+rule for rule in ( 'set_status_tasks', 'create_task_type', 'get_task_type', 'get_task_types', 'create_tasks', 'disable_tasks', 'get_tasks', 'search_tasks', 'peek_ready_tasks', 'grab_ready_tasks', 'schedule_task_run', 'mass_schedule_task_runs', 'start_task_run', 'end_task_run', 'filter_task_to_archive', 'delete_archived_tasks')) assert rules.issuperset(expected_rules), expected_rules - rules diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py index c553782..712bf5d 100644 --- a/swh/scheduler/tests/test_scheduler.py +++ b/swh/scheduler/tests/test_scheduler.py @@ -1,637 +1,637 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy import datetime import os import random import unittest import uuid from collections import defaultdict import psycopg2 from arrow import utcnow import pytest -from swh.core.tests.db_testing import SingleDbTestFixture +from swh.core.db.tests.db_testing import SingleDbTestFixture from swh.scheduler import get_scheduler from . import SQL_DIR TASK_TYPES = { 'git': { 'type': 'update-git', 'description': 'Update a git repository', 'backend_name': 'swh.loader.git.tasks.UpdateGitRepository', 'default_interval': datetime.timedelta(days=64), 'min_interval': datetime.timedelta(hours=12), 'max_interval': datetime.timedelta(days=64), 'backoff_factor': 2, 'max_queue_length': None, 'num_retries': 7, 'retry_delay': datetime.timedelta(hours=2), }, 'hg': { 'type': 'update-hg', 'description': 'Update a mercurial repository', 'backend_name': 'swh.loader.mercurial.tasks.UpdateHgRepository', 'default_interval': datetime.timedelta(days=64), 'min_interval': datetime.timedelta(hours=12), 'max_interval': datetime.timedelta(days=64), 'backoff_factor': 2, 'max_queue_length': None, 'num_retries': 7, 'retry_delay': datetime.timedelta(hours=2), }, } TEMPLATES = { 'git': { 'type': 'update-git', 'arguments': { 'args': [], 'kwargs': {}, }, 'next_run': None, }, 'hg': { 'type': 'update-hg', 'arguments': { 'args': [], 'kwargs': {}, }, 'next_run': None, 'policy': 'oneshot', } } def subdict(d, keys=None, excl=()): if keys is None: keys = [k for k in d.keys()] return {k: d[k] for k in keys if k not in excl} @pytest.mark.db class CommonSchedulerTest(SingleDbTestFixture): TEST_DB_NAME = 'softwareheritage-scheduler-test' TEST_DB_DUMP = os.path.join(SQL_DIR, '*.sql') def tearDown(self): self.empty_tables() super().tearDown() def empty_tables(self, whitelist=["priority_ratio"]): query = """SELECT table_name FROM information_schema.tables WHERE table_schema = %%s and table_name not in (%s) """ % ','.join(map(lambda t: "'%s'" % t, whitelist)) self.cursor.execute(query, ('public', )) tables = set(table for (table,) in self.cursor.fetchall()) for table in tables: self.cursor.execute('truncate table %s cascade' % table) self.conn.commit() def test_add_task_type(self): tt = TASK_TYPES['git'] self.backend.create_task_type(tt) self.assertEqual(tt, self.backend.get_task_type(tt['type'])) with self.assertRaisesRegex(psycopg2.IntegrityError, r'\(type\)=\(%s\)' % tt['type']): self.backend.create_task_type(tt) tt2 = TASK_TYPES['hg'] self.backend.create_task_type(tt2) self.assertEqual(tt, self.backend.get_task_type(tt['type'])) self.assertEqual(tt2, self.backend.get_task_type(tt2['type'])) def test_get_task_types(self): tt, tt2 = TASK_TYPES['git'], TASK_TYPES['hg'] self.backend.create_task_type(tt) self.backend.create_task_type(tt2) self.assertCountEqual([tt2, tt], self.backend.get_task_types()) @staticmethod def _task_from_template(template, next_run, priority, *args, **kwargs): ret = copy.deepcopy(template) ret['next_run'] = next_run if priority: ret['priority'] = priority if args: ret['arguments']['args'] = list(args) if kwargs: ret['arguments']['kwargs'] = kwargs return ret def _pop_priority(self, priorities): if not priorities: return None for priority, remains in priorities.items(): if remains > 0: priorities[priority] = remains - 1 return priority return None def _tasks_from_template(self, template, max_timestamp, num, num_priority=0, priorities=None): if num_priority and priorities: priorities = { priority: ratio * num_priority for priority, ratio in priorities.items() } tasks = [] for i in range(num + num_priority): priority = self._pop_priority(priorities) tasks.append(self._task_from_template( template, max_timestamp - datetime.timedelta(microseconds=i), priority, 'argument-%03d' % i, **{'kwarg%03d' % i: 'bogus-kwarg'} )) return tasks def _create_task_types(self): for tt in TASK_TYPES.values(): self.backend.create_task_type(tt) def test_create_tasks(self): priority_ratio = self._priority_ratio() self._create_task_types() num_tasks_priority = 100 tasks_1 = self._tasks_from_template( TEMPLATES['git'], utcnow(), 100) tasks_2 = self._tasks_from_template( TEMPLATES['hg'], utcnow(), 100, num_tasks_priority, priorities=priority_ratio) tasks = tasks_1 + tasks_2 # tasks are returned only once with their ids ret1 = self.backend.create_tasks(tasks + tasks_1 + tasks_2) set_ret1 = set([t['id'] for t in ret1]) # creating the same set result in the same ids ret = self.backend.create_tasks(tasks) set_ret = set([t['id'] for t in ret]) # Idempotence results self.assertEqual(set_ret, set_ret1) self.assertEqual(len(ret), len(ret1)) ids = set() actual_priorities = defaultdict(int) for task, orig_task in zip(ret, tasks): task = copy.deepcopy(task) task_type = TASK_TYPES[orig_task['type'].split('-')[-1]] self.assertNotIn(task['id'], ids) self.assertEqual(task['status'], 'next_run_not_scheduled') self.assertEqual(task['current_interval'], task_type['default_interval']) self.assertEqual(task['policy'], orig_task.get('policy', 'recurring')) priority = task.get('priority') if priority: actual_priorities[priority] += 1 self.assertEqual(task['retries_left'], task_type['num_retries'] or 0) ids.add(task['id']) del task['id'] del task['status'] del task['current_interval'] del task['retries_left'] if 'policy' not in orig_task: del task['policy'] if 'priority' not in orig_task: del task['priority'] self.assertEqual(task, orig_task) self.assertEqual(dict(actual_priorities), { priority: int(ratio * num_tasks_priority) for priority, ratio in priority_ratio.items() }) def test_peek_ready_tasks_no_priority(self): self._create_task_types() t = utcnow() task_type = TEMPLATES['git']['type'] tasks = self._tasks_from_template(TEMPLATES['git'], t, 100) random.shuffle(tasks) self.backend.create_tasks(tasks) ready_tasks = self.backend.peek_ready_tasks(task_type) self.assertEqual(len(ready_tasks), len(tasks)) for i in range(len(ready_tasks) - 1): self.assertLessEqual(ready_tasks[i]['next_run'], ready_tasks[i+1]['next_run']) # Only get the first few ready tasks limit = random.randrange(5, 5 + len(tasks)//2) ready_tasks_limited = self.backend.peek_ready_tasks( task_type, num_tasks=limit) self.assertEqual(len(ready_tasks_limited), limit) self.assertCountEqual(ready_tasks_limited, ready_tasks[:limit]) # Limit by timestamp max_ts = tasks[limit-1]['next_run'] ready_tasks_timestamped = self.backend.peek_ready_tasks( task_type, timestamp=max_ts) for ready_task in ready_tasks_timestamped: self.assertLessEqual(ready_task['next_run'], max_ts) # Make sure we get proper behavior for the first ready tasks self.assertCountEqual( ready_tasks[:len(ready_tasks_timestamped)], ready_tasks_timestamped, ) # Limit by both ready_tasks_both = self.backend.peek_ready_tasks( task_type, timestamp=max_ts, num_tasks=limit//3) self.assertLessEqual(len(ready_tasks_both), limit//3) for ready_task in ready_tasks_both: self.assertLessEqual(ready_task['next_run'], max_ts) self.assertIn(ready_task, ready_tasks[:limit//3]) def _priority_ratio(self): self.cursor.execute('select id, ratio from priority_ratio') priority_ratio = {} for row in self.cursor.fetchall(): priority_ratio[row[0]] = row[1] return priority_ratio def test_peek_ready_tasks_mixed_priorities(self): priority_ratio = self._priority_ratio() self._create_task_types() t = utcnow() task_type = TEMPLATES['git']['type'] num_tasks_priority = 100 num_tasks_no_priority = 100 # Create tasks with and without priorities tasks = self._tasks_from_template( TEMPLATES['git'], t, num=num_tasks_no_priority, num_priority=num_tasks_priority, priorities=priority_ratio) random.shuffle(tasks) self.backend.create_tasks(tasks) # take all available tasks ready_tasks = self.backend.peek_ready_tasks( task_type) self.assertEqual(len(ready_tasks), len(tasks)) self.assertEqual(num_tasks_priority + num_tasks_no_priority, len(ready_tasks)) count_tasks_per_priority = defaultdict(int) for task in ready_tasks: priority = task.get('priority') if priority: count_tasks_per_priority[priority] += 1 self.assertEqual(dict(count_tasks_per_priority), { priority: int(ratio * num_tasks_priority) for priority, ratio in priority_ratio.items() }) # Only get some ready tasks num_tasks = random.randrange(5, 5 + num_tasks_no_priority//2) num_tasks_priority = random.randrange(5, num_tasks_priority//2) ready_tasks_limited = self.backend.peek_ready_tasks( task_type, num_tasks=num_tasks, num_tasks_priority=num_tasks_priority) count_tasks_per_priority = defaultdict(int) for task in ready_tasks_limited: priority = task.get('priority') count_tasks_per_priority[priority] += 1 import math for priority, ratio in priority_ratio.items(): expected_count = math.ceil(ratio * num_tasks_priority) actual_prio = count_tasks_per_priority[priority] self.assertTrue( actual_prio == expected_count or actual_prio == expected_count + 1) self.assertEqual(count_tasks_per_priority[None], num_tasks) def test_grab_ready_tasks(self): priority_ratio = self._priority_ratio() self._create_task_types() t = utcnow() task_type = TEMPLATES['git']['type'] num_tasks_priority = 100 num_tasks_no_priority = 100 # Create tasks with and without priorities tasks = self._tasks_from_template( TEMPLATES['git'], t, num=num_tasks_no_priority, num_priority=num_tasks_priority, priorities=priority_ratio) random.shuffle(tasks) self.backend.create_tasks(tasks) first_ready_tasks = self.backend.peek_ready_tasks( task_type, num_tasks=10, num_tasks_priority=10) grabbed_tasks = self.backend.grab_ready_tasks( task_type, num_tasks=10, num_tasks_priority=10) for peeked, grabbed in zip(first_ready_tasks, grabbed_tasks): self.assertEqual(peeked['status'], 'next_run_not_scheduled') del peeked['status'] self.assertEqual(grabbed['status'], 'next_run_scheduled') del grabbed['status'] self.assertEqual(peeked, grabbed) self.assertEqual(peeked['priority'], grabbed['priority']) def test_get_tasks(self): self._create_task_types() t = utcnow() tasks = self._tasks_from_template(TEMPLATES['git'], t, 100) tasks = self.backend.create_tasks(tasks) random.shuffle(tasks) while len(tasks) > 1: length = random.randrange(1, len(tasks)) cur_tasks = tasks[:length] tasks[:length] = [] ret = self.backend.get_tasks(task['id'] for task in cur_tasks) self.assertCountEqual(ret, cur_tasks) def test_search_tasks(self): def make_real_dicts(l): """RealDictRow is not a real dict.""" return [dict(d.items()) for d in l] self._create_task_types() t = utcnow() tasks = self._tasks_from_template(TEMPLATES['git'], t, 100) tasks = self.backend.create_tasks(tasks) self.assertCountEqual( make_real_dicts(self.backend.search_tasks()), make_real_dicts(tasks)) def test_filter_task_to_archive(self): """Filtering only list disabled recurring or completed oneshot tasks """ self._create_task_types() _time = utcnow() recurring = self._tasks_from_template(TEMPLATES['git'], _time, 12) oneshots = self._tasks_from_template(TEMPLATES['hg'], _time, 12) total_tasks = len(recurring) + len(oneshots) # simulate scheduling tasks pending_tasks = self.backend.create_tasks(recurring + oneshots) backend_tasks = [{ 'task': task['id'], 'backend_id': str(uuid.uuid4()), 'scheduled': utcnow(), } for task in pending_tasks] self.backend.mass_schedule_task_runs(backend_tasks) # we simulate the task are being done _tasks = [] for task in backend_tasks: t = self.backend.end_task_run( task['backend_id'], status='eventful') _tasks.append(t) # Randomly update task's status per policy status_per_policy = {'recurring': 0, 'oneshot': 0} status_choice = { # policy: [tuple (1-for-filtering, 'associated-status')] 'recurring': [(1, 'disabled'), (0, 'completed'), (0, 'next_run_not_scheduled')], 'oneshot': [(0, 'next_run_not_scheduled'), (1, 'disabled'), (1, 'completed')] } tasks_to_update = defaultdict(list) _task_ids = defaultdict(list) # randomize 'disabling' recurring task or 'complete' oneshot task for task in pending_tasks: policy = task['policy'] _task_ids[policy].append(task['id']) status = random.choice(status_choice[policy]) if status[0] != 1: continue # elected for filtering status_per_policy[policy] += status[0] tasks_to_update[policy].append(task['id']) self.backend.disable_tasks(tasks_to_update['recurring']) # hack: change the status to something else than completed/disabled self.backend.set_status_tasks( _task_ids['oneshot'], status='next_run_not_scheduled') # complete the tasks to update self.backend.set_status_tasks( tasks_to_update['oneshot'], status='completed') total_tasks_filtered = (status_per_policy['recurring'] + status_per_policy['oneshot']) # retrieve tasks to archive after = _time.shift(days=-1).format('YYYY-MM-DD') before = utcnow().shift(days=1).format('YYYY-MM-DD') tasks_to_archive = list(self.backend.filter_task_to_archive( after_ts=after, before_ts=before, limit=total_tasks)) self.assertEqual(len(tasks_to_archive), total_tasks_filtered) actual_filtered_per_status = {'recurring': 0, 'oneshot': 0} for task in tasks_to_archive: actual_filtered_per_status[task['task_policy']] += 1 self.assertEqual(actual_filtered_per_status, status_per_policy) def test_delete_archived_tasks(self): self._create_task_types() _time = utcnow() recurring = self._tasks_from_template( TEMPLATES['git'], _time, 12) oneshots = self._tasks_from_template( TEMPLATES['hg'], _time, 12) total_tasks = len(recurring) + len(oneshots) pending_tasks = self.backend.create_tasks(recurring + oneshots) backend_tasks = [{ 'task': task['id'], 'backend_id': str(uuid.uuid4()), 'scheduled': utcnow(), } for task in pending_tasks] self.backend.mass_schedule_task_runs(backend_tasks) _tasks = [] percent = random.randint(0, 100) # random election removal boundary for task in backend_tasks: t = self.backend.end_task_run( task['backend_id'], status='eventful') c = random.randint(0, 100) if c <= percent: _tasks.append({'task_id': t['task'], 'task_run_id': t['id']}) self.backend.delete_archived_tasks(_tasks) self.cursor.execute('select count(*) from task') tasks_count = self.cursor.fetchone() self.cursor.execute('select count(*) from task_run') tasks_run_count = self.cursor.fetchone() self.assertEqual(tasks_count[0], total_tasks - len(_tasks)) self.assertEqual(tasks_run_count[0], total_tasks - len(_tasks)) def test_get_task_runs_no_task(self): '''No task exist in the scheduler's db, get_task_runs() should always return an empty list. ''' self.assertFalse(self.backend.get_task_runs(task_ids=())) self.assertFalse(self.backend.get_task_runs(task_ids=(1, 2, 3))) self.assertFalse(self.backend.get_task_runs(task_ids=(1, 2, 3), limit=10)) def test_get_task_runs_no_task_executed(self): '''No task has been executed yet, get_task_runs() should always return an empty list. ''' self._create_task_types() _time = utcnow() recurring = self._tasks_from_template( TEMPLATES['git'], _time, 12) oneshots = self._tasks_from_template( TEMPLATES['hg'], _time, 12) self.backend.create_tasks(recurring + oneshots) self.assertFalse(self.backend.get_task_runs( task_ids=())) self.assertFalse(self.backend.get_task_runs( task_ids=(1, 2, 3))) self.assertFalse(self.backend.get_task_runs( task_ids=(1, 2, 3), limit=10)) def test_get_task_runs_with_scheduled(self): '''Some tasks have been scheduled but not executed yet, get_task_runs() should not return an empty list. limit should behave as expected. ''' self._create_task_types() _time = utcnow() recurring = self._tasks_from_template( TEMPLATES['git'], _time, 12) oneshots = self._tasks_from_template( TEMPLATES['hg'], _time, 12) total_tasks = len(recurring) + len(oneshots) pending_tasks = self.backend.create_tasks(recurring + oneshots) backend_tasks = [{ 'task': task['id'], 'backend_id': str(uuid.uuid4()), 'scheduled': utcnow(), } for task in pending_tasks] self.backend.mass_schedule_task_runs(backend_tasks) self.assertFalse(self.backend.get_task_runs( task_ids=[total_tasks + 1])) btask = backend_tasks[0] runs = self.backend.get_task_runs( task_ids=[btask['task']]) self.assertEqual(len(runs), 1) run = runs[0] self.assertEqual(subdict(run, excl=('id',)), {'task': btask['task'], 'backend_id': btask['backend_id'], 'scheduled': btask['scheduled'], 'started': None, 'ended': None, 'metadata': None, 'status': 'scheduled', }) runs = self.backend.get_task_runs( task_ids=[bt['task'] for bt in backend_tasks], limit=2) self.assertEqual(len(runs), 2) runs = self.backend.get_task_runs( task_ids=[bt['task'] for bt in backend_tasks]) self.assertEqual(len(runs), total_tasks) keys = ('task', 'backend_id', 'scheduled') self.assertEqual(sorted([subdict(x, keys) for x in runs], key=lambda x: x['task']), backend_tasks) def test_get_task_runs_with_executed(self): '''Some tasks have been executed, get_task_runs() should not return an empty list. limit should behave as expected. ''' self._create_task_types() _time = utcnow() recurring = self._tasks_from_template( TEMPLATES['git'], _time, 12) oneshots = self._tasks_from_template( TEMPLATES['hg'], _time, 12) pending_tasks = self.backend.create_tasks(recurring + oneshots) backend_tasks = [{ 'task': task['id'], 'backend_id': str(uuid.uuid4()), 'scheduled': utcnow(), } for task in pending_tasks] self.backend.mass_schedule_task_runs(backend_tasks) btask = backend_tasks[0] ts = utcnow() self.backend.start_task_run(btask['backend_id'], metadata={'something': 'stupid'}, timestamp=ts) runs = self.backend.get_task_runs(task_ids=[btask['task']]) self.assertEqual(len(runs), 1) self.assertEqual(subdict(runs[0], excl=('id')), { 'task': btask['task'], 'backend_id': btask['backend_id'], 'scheduled': btask['scheduled'], 'started': ts, 'ended': None, 'metadata': {'something': 'stupid'}, 'status': 'started', }) ts2 = utcnow() self.backend.end_task_run(btask['backend_id'], metadata={'other': 'stuff'}, timestamp=ts2, status='eventful') runs = self.backend.get_task_runs(task_ids=[btask['task']]) self.assertEqual(len(runs), 1) self.assertEqual(subdict(runs[0], excl=('id')), { 'task': btask['task'], 'backend_id': btask['backend_id'], 'scheduled': btask['scheduled'], 'started': ts, 'ended': ts2, 'metadata': {'something': 'stupid', 'other': 'stuff'}, 'status': 'eventful', }) class LocalSchedulerTest(CommonSchedulerTest, unittest.TestCase): def setUp(self): super().setUp() self.config = {'db': 'dbname=' + self.TEST_DB_NAME} self.backend = get_scheduler('local', self.config) diff --git a/swh/scheduler/tests/updater/test_writer.py b/swh/scheduler/tests/updater/test_writer.py index 77dee54..73d71e1 100644 --- a/swh/scheduler/tests/updater/test_writer.py +++ b/swh/scheduler/tests/updater/test_writer.py @@ -1,158 +1,158 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import unittest from glob import glob import pytest from swh.core.utils import numfile_sortkey as sortkey -from swh.core.tests.db_testing import DbTestFixture +from swh.core.db.tests.db_testing import DbTestFixture from swh.scheduler.tests import SQL_DIR from swh.scheduler.updater.events import LISTENED_EVENTS, SWHEvent from swh.scheduler.updater.writer import UpdaterWriter from . import UpdaterTestUtil @pytest.mark.db class CommonSchedulerTest(DbTestFixture): TEST_SCHED_DB = 'softwareheritage-scheduler-test' TEST_SCHED_DUMP = os.path.join(SQL_DIR, '*.sql') TEST_SCHED_UPDATER_DB = 'softwareheritage-scheduler-updater-test' TEST_SCHED_UPDATER_DUMP = os.path.join(SQL_DIR, 'updater', '*.sql') @classmethod def setUpClass(cls): cls.add_db(cls.TEST_SCHED_DB, [(sqlfn, 'psql') for sqlfn in sorted(glob(cls.TEST_SCHED_DUMP), key=sortkey)]) cls.add_db(cls.TEST_SCHED_UPDATER_DB, [(sqlfn, 'psql') for sqlfn in sorted(glob(cls.TEST_SCHED_UPDATER_DUMP), key=sortkey)]) super().setUpClass() def tearDown(self): self.reset_db_tables(self.TEST_SCHED_UPDATER_DB) self.reset_db_tables(self.TEST_SCHED_DB, excluded=['task_type', 'priority_ratio']) super().tearDown() class UpdaterWriterTest(UpdaterTestUtil, CommonSchedulerTest, unittest.TestCase): def setUp(self): super().setUp() config = { 'scheduler': { 'cls': 'local', 'args': { 'db': 'dbname=softwareheritage-scheduler-test', }, }, 'scheduler_updater': { 'cls': 'local', 'args': { 'db': 'dbname=softwareheritage-scheduler-updater-test', 'cache_read_limit': 5, }, }, 'updater_writer': { 'pause': 0.1, 'verbose': False, }, } self.writer = UpdaterWriter(**config) self.scheduler_backend = self.writer.scheduler_backend self.scheduler_updater_backend = self.writer.scheduler_updater_backend def test_run_ko(self): """Only git tasks are supported for now, other types are dismissed. """ ready_events = [ SWHEvent( self._make_simple_event(event_type, 'origin-%s' % i, 'svn')) for i, event_type in enumerate(LISTENED_EVENTS) ] expected_length = len(ready_events) self.scheduler_updater_backend.cache_put(ready_events) data = list(self.scheduler_updater_backend.cache_read()) self.assertEqual(len(data), expected_length) r = self.scheduler_backend.peek_ready_tasks( 'origin-update-git') # first read on an empty scheduling db results with nothing in it self.assertEqual(len(r), 0) # Read from cache to scheduler db self.writer.run() r = self.scheduler_backend.peek_ready_tasks( 'origin-update-git') # other reads after writes are still empty since it's not supported self.assertEqual(len(r), 0) def test_run_ok(self): """Only git origin are supported for now """ ready_events = [ SWHEvent( self._make_simple_event(event_type, 'origin-%s' % i, 'git')) for i, event_type in enumerate(LISTENED_EVENTS) ] expected_length = len(ready_events) self.scheduler_updater_backend.cache_put(ready_events) data = list(self.scheduler_updater_backend.cache_read()) self.assertEqual(len(data), expected_length) r = self.scheduler_backend.peek_ready_tasks( 'origin-update-git') # first read on an empty scheduling db results with nothing in it self.assertEqual(len(r), 0) # Read from cache to scheduler db self.writer.run() # now, we should have scheduling task ready r = self.scheduler_backend.peek_ready_tasks( 'origin-update-git') self.assertEqual(len(r), expected_length) # Check the task has been scheduled for t in r: self.assertEqual(t['type'], 'origin-update-git') self.assertEqual(t['priority'], 'normal') self.assertEqual(t['policy'], 'oneshot') self.assertEqual(t['status'], 'next_run_not_scheduled') # writer has nothing to do now self.writer.run() # so no more data in cache data = list(self.scheduler_updater_backend.cache_read()) self.assertEqual(len(data), 0) # provided, no runner is ran, still the same amount of scheduling tasks r = self.scheduler_backend.peek_ready_tasks( 'origin-update-git') self.assertEqual(len(r), expected_length)