Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/tests/scheduler_testing.py
import glob | import glob | ||||
import os.path | import os.path | ||||
import datetime | import datetime | ||||
from celery.result import AsyncResult | from celery.result import AsyncResult | ||||
from celery.contrib.testing.worker import start_worker | from celery.contrib.testing.worker import start_worker | ||||
import celery.contrib.testing.tasks # noqa | import celery.contrib.testing.tasks # noqa | ||||
import pytest | import pytest | ||||
from swh.core.tests.db_testing import DbTestFixture, DB_DUMP_TYPES | from swh.core.tests.db_testing import DbTestFixture, DB_DUMP_TYPES | ||||
from swh.core.utils import numfile_sortkey as sortkey | from swh.core.utils import numfile_sortkey as sortkey | ||||
from swh.scheduler import get_scheduler | from swh.scheduler import get_scheduler | ||||
from swh.scheduler.celery_backend.runner import run_ready_tasks | from swh.scheduler.celery_backend.runner import run_ready_tasks | ||||
from swh.scheduler.celery_backend.config import app | from swh.scheduler.celery_backend.config import app, register_task_class | ||||
from swh.scheduler.tests.celery_testing import CeleryTestFixture | from swh.scheduler.tests.celery_testing import CeleryTestFixture | ||||
from . import SQL_DIR | from . import SQL_DIR | ||||
DUMP_FILES = os.path.join(SQL_DIR, '*.sql') | DUMP_FILES = os.path.join(SQL_DIR, '*.sql') | ||||
@pytest.mark.db | @pytest.mark.db | ||||
Show All 13 Lines | def add_scheduler_task_type(self, task_type, backend_name, | ||||
'max_interval': datetime.timedelta(days=64), | 'max_interval': datetime.timedelta(days=64), | ||||
'backoff_factor': 2, | 'backoff_factor': 2, | ||||
'max_queue_length': None, | 'max_queue_length': None, | ||||
'num_retries': 7, | 'num_retries': 7, | ||||
'retry_delay': datetime.timedelta(hours=2), | 'retry_delay': datetime.timedelta(hours=2), | ||||
} | } | ||||
self.scheduler.create_task_type(task_type) | self.scheduler.create_task_type(task_type) | ||||
if task_class: | if task_class: | ||||
app.register_task_class(backend_name, task_class) | register_task_class(app, backend_name, task_class) | ||||
def run_ready_tasks(self): | def run_ready_tasks(self): | ||||
"""Runs the scheduler and a Celery worker, then blocks until | """Runs the scheduler and a Celery worker, then blocks until | ||||
all tasks are completed.""" | all tasks are completed.""" | ||||
# Make sure the worker is listening to all task-specific queues | # Make sure the worker is listening to all task-specific queues | ||||
for task in self.scheduler.get_task_types(): | for task in self.scheduler.get_task_types(): | ||||
app.amqp.queues.select_add(task['backend_name']) | app.amqp.queues.select_add(task['backend_name']) | ||||
Show All 27 Lines |