Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/pytest_plugin.py
Show First 20 Lines • Show All 63 Lines • ▼ Show 20 Lines | |||||
def swh_scheduler_celery_app(): | def swh_scheduler_celery_app(): | ||||
"""Set up a Celery app as swh.scheduler and swh worker tests would expect it""" | """Set up a Celery app as swh.scheduler and swh worker tests would expect it""" | ||||
test_app = TestApp( | test_app = TestApp( | ||||
set_as_current=True, | set_as_current=True, | ||||
enable_logging=True, | enable_logging=True, | ||||
task_cls="swh.scheduler.task:SWHTask", | task_cls="swh.scheduler.task:SWHTask", | ||||
config={ | config={ | ||||
"accept_content": ["application/x-msgpack", "application/json"], | "accept_content": ["application/x-msgpack", "application/json"], | ||||
"broker_url": "memory://guest@localhost//", | |||||
"task_serializer": "msgpack", | "task_serializer": "msgpack", | ||||
"result_serializer": "json", | "result_serializer": "json", | ||||
}, | }, | ||||
) | ) | ||||
with setup_default_app(test_app, use_trap=False): | with setup_default_app(test_app, use_trap=False): | ||||
from swh.scheduler.celery_backend import config | from swh.scheduler.celery_backend import config | ||||
config.app = test_app | config.app = test_app | ||||
Show All 25 Lines |