Staged modified swh/scheduler/pytest_plugin.py
@@ -1,13 +1,15 @@
-# Copyright (C) 2020 The Software Heritage developers
+# Copyright (C) 2020-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from datetime import timedelta
 import os
+from typing import List, Optional
 
 from celery.contrib.testing import worker
 from celery.contrib.testing.app import TestApp, setup_default_app
+from kombu import Exchange, Queue
 import pkg_resources
 import pytest
 
@@ -60,26 +62,58 @@ def swh_scheduler(swh_scheduler_config):
 swh_db_scheduler = swh_scheduler
 
 
-@pytest.fixture(scope="session")
-def swh_scheduler_celery_app():
-    """Set up a Celery app as swh.scheduler and swh worker tests would expect it"""
-    test_app = TestApp(
-        set_as_current=True,
-        enable_logging=True,
-        task_cls="swh.scheduler.task:SWHTask",
-        config={
-            "accept_content": ["application/x-msgpack", "application/json"],
-            "task_serializer": "msgpack",
-            "result_serializer": "json",
-        },
-    )
-    with setup_default_app(test_app, use_trap=False):
-        from swh.scheduler.celery_backend import config
-
-        config.app = test_app
-        test_app.set_default()
-        test_app.set_current()
-        yield test_app
+def swh_scheduler_celery_app_fact(queues: Optional[List[str]] = None):
+    """Create a session-scoped pytest fixture providing a Celery test app.
+
+    The returned fixture sets up the Celery app as swh.scheduler and swh worker
+    tests would expect it.
+
+    Args:
+        queues: names of the task queues to declare on the app; each name is
+            used as queue name, exchange name and routing key. When None or
+            empty, no queue is explicitly declared.
+
+    Returns:
+        The fixture function, to be bound to a module-level name so that
+        pytest registers it.
+
+    """
+    # Snapshot the names so later changes to the caller's list cannot leak in
+    queue_names = list(queues or [])
+
+    @pytest.fixture(scope="session")
+    def swh_scheduler_celery_app_fn():
+        """Set up a Celery app as swh.scheduler and swh worker tests would expect it"""
+        app_config = {
+            "accept_content": ["application/x-msgpack", "application/json"],
+            "task_serializer": "msgpack",
+            "result_serializer": "json",
+        }
+        if queue_names:
+            # task_queues is a Celery configuration setting, so it goes through
+            # the `config` mapping rather than a TestApp keyword argument
+            app_config["task_queues"] = [
+                Queue(name, Exchange(name), routing_key=name) for name in queue_names
+            ]
+
+        test_app = TestApp(
+            set_as_current=True,
+            enable_logging=True,
+            task_cls="swh.scheduler.task:SWHTask",
+            config=app_config,
+        )
+        with setup_default_app(test_app, use_trap=False):
+            from swh.scheduler.celery_backend import config
+
+            config.app = test_app
+            test_app.set_default()
+            test_app.set_current()
+            yield test_app
+
+    return swh_scheduler_celery_app_fn
+
+
+swh_scheduler_celery_app = swh_scheduler_celery_app_fact()
 
 
 @pytest.fixture(scope="session")