Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/tests/test_scheduler.py
Show First 20 Lines • Show All 63 Lines • ▼ Show 20 Lines | 'hg': { | ||||
'kwargs': {}, | 'kwargs': {}, | ||||
}, | }, | ||||
'next_run': None, | 'next_run': None, | ||||
'policy': 'oneshot', | 'policy': 'oneshot', | ||||
} | } | ||||
} | } | ||||
def subdict(d, keys=None, excl=()): | |||||
if keys is None: | |||||
keys = [k for k in d.keys()] | |||||
return {k: d[k] for k in keys if k not in excl} | |||||
@pytest.mark.db | @pytest.mark.db | ||||
class CommonSchedulerTest(SingleDbTestFixture): | class CommonSchedulerTest(SingleDbTestFixture): | ||||
TEST_DB_NAME = 'softwareheritage-scheduler-test' | TEST_DB_NAME = 'softwareheritage-scheduler-test' | ||||
TEST_DB_DUMP = os.path.join(SQL_DIR, '*.sql') | TEST_DB_DUMP = os.path.join(SQL_DIR, '*.sql') | ||||
def tearDown(self): | def tearDown(self): | ||||
self.empty_tables() | self.empty_tables() | ||||
super().tearDown() | super().tearDown() | ||||
▲ Show 20 Lines • Show All 391 Lines • ▼ Show 20 Lines | def test_delete_archived_tasks(self): | ||||
tasks_count = self.cursor.fetchone() | tasks_count = self.cursor.fetchone() | ||||
self.cursor.execute('select count(*) from task_run') | self.cursor.execute('select count(*) from task_run') | ||||
tasks_run_count = self.cursor.fetchone() | tasks_run_count = self.cursor.fetchone() | ||||
self.assertEqual(tasks_count[0], total_tasks - len(_tasks)) | self.assertEqual(tasks_count[0], total_tasks - len(_tasks)) | ||||
self.assertEqual(tasks_run_count[0], total_tasks - len(_tasks)) | self.assertEqual(tasks_run_count[0], total_tasks - len(_tasks)) | ||||
def test_get_task_runs_no_task(self): | |||||
'''No task exist in the scheduler's db, get_task_runs() should always return an | |||||
empty list. | |||||
''' | |||||
self.assertFalse(self.backend.get_task_runs(task_ids=())) | |||||
self.assertFalse(self.backend.get_task_runs(task_ids=(1, 2, 3))) | |||||
self.assertFalse(self.backend.get_task_runs(task_ids=(1, 2, 3), | |||||
limit=10)) | |||||
def test_get_task_runs_no_task_executed(self): | |||||
'''No task has been executed yet, get_task_runs() should always return an empty | |||||
list. | |||||
''' | |||||
self._create_task_types() | |||||
_time = utcnow() | |||||
recurring = self._tasks_from_template( | |||||
TEMPLATES['git'], _time, 12) | |||||
oneshots = self._tasks_from_template( | |||||
TEMPLATES['hg'], _time, 12) | |||||
self.backend.create_tasks(recurring + oneshots) | |||||
self.assertFalse(self.backend.get_task_runs( | |||||
task_ids=())) | |||||
self.assertFalse(self.backend.get_task_runs( | |||||
task_ids=(1, 2, 3))) | |||||
self.assertFalse(self.backend.get_task_runs( | |||||
task_ids=(1, 2, 3), limit=10)) | |||||
def test_get_task_runs_with_scheduled(self): | |||||
'''Some tasks have been scheduled but not executed yet, get_task_runs() should | |||||
not return an empty list. limit should behave as expected. | |||||
''' | |||||
self._create_task_types() | |||||
_time = utcnow() | |||||
recurring = self._tasks_from_template( | |||||
TEMPLATES['git'], _time, 12) | |||||
oneshots = self._tasks_from_template( | |||||
TEMPLATES['hg'], _time, 12) | |||||
total_tasks = len(recurring) + len(oneshots) | |||||
pending_tasks = self.backend.create_tasks(recurring + oneshots) | |||||
backend_tasks = [{ | |||||
'task': task['id'], | |||||
'backend_id': str(uuid.uuid4()), | |||||
'scheduled': utcnow(), | |||||
} for task in pending_tasks] | |||||
self.backend.mass_schedule_task_runs(backend_tasks) | |||||
self.assertFalse(self.backend.get_task_runs( | |||||
task_ids=[total_tasks + 1])) | |||||
btask = backend_tasks[0] | |||||
runs = self.backend.get_task_runs( | |||||
task_ids=[btask['task']]) | |||||
self.assertEqual(len(runs), 1) | |||||
run = runs[0] | |||||
self.assertEqual(subdict(run, excl=('id',)), | |||||
{'task': btask['task'], | |||||
'backend_id': btask['backend_id'], | |||||
'scheduled': btask['scheduled'], | |||||
'started': None, | |||||
'ended': None, | |||||
'metadata': None, | |||||
'status': 'scheduled', | |||||
}) | |||||
runs = self.backend.get_task_runs( | |||||
task_ids=[bt['task'] for bt in backend_tasks], limit=2) | |||||
self.assertEqual(len(runs), 2) | |||||
runs = self.backend.get_task_runs( | |||||
task_ids=[bt['task'] for bt in backend_tasks]) | |||||
self.assertEqual(len(runs), total_tasks) | |||||
keys = ('task', 'backend_id', 'scheduled') | |||||
self.assertEqual(sorted([subdict(x, keys) for x in runs], | |||||
key=lambda x: x['task']), | |||||
backend_tasks) | |||||
def test_get_task_runs_with_executed(self): | |||||
'''Some tasks have been executed, get_task_runs() should | |||||
not return an empty list. limit should behave as expected. | |||||
''' | |||||
self._create_task_types() | |||||
_time = utcnow() | |||||
recurring = self._tasks_from_template( | |||||
TEMPLATES['git'], _time, 12) | |||||
oneshots = self._tasks_from_template( | |||||
TEMPLATES['hg'], _time, 12) | |||||
pending_tasks = self.backend.create_tasks(recurring + oneshots) | |||||
backend_tasks = [{ | |||||
'task': task['id'], | |||||
'backend_id': str(uuid.uuid4()), | |||||
'scheduled': utcnow(), | |||||
} for task in pending_tasks] | |||||
self.backend.mass_schedule_task_runs(backend_tasks) | |||||
btask = backend_tasks[0] | |||||
ts = utcnow() | |||||
self.backend.start_task_run(btask['backend_id'], | |||||
metadata={'something': 'stupid'}, | |||||
timestamp=ts) | |||||
runs = self.backend.get_task_runs(task_ids=[btask['task']]) | |||||
self.assertEqual(len(runs), 1) | |||||
self.assertEqual(subdict(runs[0], excl=('id')), { | |||||
'task': btask['task'], | |||||
'backend_id': btask['backend_id'], | |||||
'scheduled': btask['scheduled'], | |||||
'started': ts, | |||||
'ended': None, | |||||
'metadata': {'something': 'stupid'}, | |||||
'status': 'started', | |||||
}) | |||||
ts2 = utcnow() | |||||
self.backend.end_task_run(btask['backend_id'], | |||||
metadata={'other': 'stuff'}, | |||||
timestamp=ts2, | |||||
status='eventful') | |||||
runs = self.backend.get_task_runs(task_ids=[btask['task']]) | |||||
self.assertEqual(len(runs), 1) | |||||
self.assertEqual(subdict(runs[0], excl=('id')), { | |||||
'task': btask['task'], | |||||
'backend_id': btask['backend_id'], | |||||
'scheduled': btask['scheduled'], | |||||
'started': ts, | |||||
'ended': ts2, | |||||
'metadata': {'something': 'stupid', 'other': 'stuff'}, | |||||
'status': 'eventful', | |||||
}) | |||||
class LocalSchedulerTest(CommonSchedulerTest, unittest.TestCase): | class LocalSchedulerTest(CommonSchedulerTest, unittest.TestCase): | ||||
def setUp(self): | def setUp(self): | ||||
super().setUp() | super().setUp() | ||||
self.config = {'db': 'dbname=' + self.TEST_DB_NAME} | self.config = {'db': 'dbname=' + self.TEST_DB_NAME} | ||||
self.backend = get_scheduler('local', self.config) | self.backend = get_scheduler('local', self.config) |