Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/tests/conftest.py
import os | import os | ||||
import pytest | import pytest | ||||
import glob | import glob | ||||
from datetime import timedelta | from datetime import timedelta | ||||
import swh.scheduler.celery_backend.config | from swh.core.utils import numfile_sortkey as sortkey | ||||
from swh.scheduler import get_scheduler | |||||
from swh.scheduler.tests import SQL_DIR | |||||
# make sure we are not fooled by CELERY_ config environment vars | |||||
for var in [x for x in os.environ.keys() if x.startswith('CELERY')]: | |||||
os.environ.pop(var) | |||||
import swh.scheduler.celery_backend.config # noqa | |||||
# this import is needed here to enforce creation of the celery current app | # this import is needed here to enforce creation of the celery current app | ||||
# BEFORE the swh_app fixture is called, otherwise the Celery app instance from | # BEFORE the swh_app fixture is called, otherwise the Celery app instance from | ||||
# celery_backend.config becomes the celery.current_app | # celery_backend.config becomes the celery.current_app | ||||
from swh.core.utils import numfile_sortkey as sortkey | |||||
from swh.scheduler import get_scheduler | |||||
from swh.scheduler.tests import SQL_DIR | |||||
DUMP_FILES = os.path.join(SQL_DIR, '*.sql') | DUMP_FILES = os.path.join(SQL_DIR, '*.sql') | ||||
# celery tasks for testing purpose; tasks themselves should be | # celery tasks for testing purpose; tasks themselves should be | ||||
# in swh/scheduler/tests/celery_tasks.py | # in swh/scheduler/tests/celery_tasks.py | ||||
TASK_NAMES = ['ping', 'multiping', 'add', 'error'] | TASK_NAMES = ['ping', 'multiping', 'add', 'error'] | ||||
@pytest.fixture(scope='session') | @pytest.fixture(scope='session') | ||||
▲ Show 20 Lines • Show All 66 Lines • Show Last 20 Lines |