Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/tests/scheduler_testing.py
Show All 38 Lines | def add_scheduler_task_type(self, task_type, backend_name): | ||||
'num_retries': 7, | 'num_retries': 7, | ||||
'retry_delay': datetime.timedelta(hours=2), | 'retry_delay': datetime.timedelta(hours=2), | ||||
} | } | ||||
self.scheduler.create_task_type(task_type) | self.scheduler.create_task_type(task_type) | ||||
def run_ready_tasks(self): | def run_ready_tasks(self): | ||||
"""Runs the scheduler and a Celery worker, then blocks until | """Runs the scheduler and a Celery worker, then blocks until | ||||
all tasks are completed.""" | all tasks are completed.""" | ||||
# Make sure the worker is listening to all task-specific queues | |||||
for task in self.scheduler.get_task_types(): | |||||
app.amqp.queues.select_add(task['backend_name']) | |||||
with start_worker(app): | with start_worker(app): | ||||
backend_tasks = run_ready_tasks(self.scheduler, app) | backend_tasks = run_ready_tasks(self.scheduler, app) | ||||
for task in backend_tasks: | for task in backend_tasks: | ||||
AsyncResult(id=task['backend_id']).get() | AsyncResult(id=task['backend_id']).get() | ||||
@classmethod | @classmethod | ||||
def setUpClass(cls): | def setUpClass(cls): | ||||
all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey) | all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey) | ||||
Show All 17 Lines |