Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/tests/test_scheduler.py
Show First 20 Lines • Show All 365 Lines • ▼ Show 20 Lines | def test_get_tasks(self): | ||||
length = random.randrange(1, len(tasks)) | length = random.randrange(1, len(tasks)) | ||||
cur_tasks = tasks[:length] | cur_tasks = tasks[:length] | ||||
tasks[:length] = [] | tasks[:length] = [] | ||||
ret = self.backend.get_tasks(task['id'] for task in cur_tasks) | ret = self.backend.get_tasks(task['id'] for task in cur_tasks) | ||||
self.assertCountEqual(ret, cur_tasks) | self.assertCountEqual(ret, cur_tasks) | ||||
def test_search_tasks(self): | def test_search_tasks(self): | ||||
def make_real_dicts(l): | |||||
"""RealDictRow is not a real dict.""" | |||||
return [dict(d.items()) for d in l] | |||||
self._create_task_types() | self._create_task_types() | ||||
t = utcnow() | t = utcnow() | ||||
tasks = self._tasks_from_template(TEMPLATES['git'], t, 100) | tasks = self._tasks_from_template(TEMPLATES['git'], t, 100) | ||||
tasks = self.backend.create_tasks(tasks) | tasks = self.backend.create_tasks(tasks) | ||||
self.assertCountEqual(self.backend.search_tasks(), tasks) | self.assertCountEqual( | ||||
make_real_dicts(self.backend.search_tasks()), | |||||
make_real_dicts(tasks)) | |||||
def test_filter_task_to_archive(self): | def test_filter_task_to_archive(self): | ||||
"""Filtering only list disabled recurring or completed oneshot tasks | """Filtering only list disabled recurring or completed oneshot tasks | ||||
""" | """ | ||||
self._create_task_types() | self._create_task_types() | ||||
_time = utcnow() | _time = utcnow() | ||||
recurring = self._tasks_from_template(TEMPLATES['git'], _time, 12) | recurring = self._tasks_from_template(TEMPLATES['git'], _time, 12) | ||||
▲ Show 20 Lines • Show All 246 Lines • Show Last 20 Lines |