Page MenuHomeSoftware Heritage
Paste P1006

tryout fixture but this hung so meh
ActivePublic

Authored by ardumont on Apr 15 2021, 1:29 PM.
Staged
modified swh/scheduler/pytest_plugin.py
@@ -1,13 +1,15 @@
-# Copyright (C) 2020 The Software Heritage developers
+# Copyright (C) 2020-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import timedelta
import os
+from typing import List
from celery.contrib.testing import worker
from celery.contrib.testing.app import TestApp, setup_default_app
+from kombu import Exchange, Queue
import pkg_resources
import pytest
@@ -60,26 +62,45 @@ def swh_scheduler(swh_scheduler_config):
swh_db_scheduler = swh_scheduler
-@pytest.fixture(scope="session")
-def swh_scheduler_celery_app():
- """Set up a Celery app as swh.scheduler and swh worker tests would expect it"""
- test_app = TestApp(
- set_as_current=True,
- enable_logging=True,
- task_cls="swh.scheduler.task:SWHTask",
- config={
- "accept_content": ["application/x-msgpack", "application/json"],
- "task_serializer": "msgpack",
- "result_serializer": "json",
- },
- )
- with setup_default_app(test_app, use_trap=False):
- from swh.scheduler.celery_backend import config
-
- config.app = test_app
- test_app.set_default()
- test_app.set_current()
- yield test_app
+def swh_scheduler_celery_app_fact(queues: List[str] = [],):
+ """Celery app fixture factory to create celery app the swh.scheduler and swh worker
+ tests would expect it.
+
+ """
+
+ @pytest.fixture(scope="session")
+ def swh_scheduler_celery_app_fn():
+ """Set up a Celery app as swh.scheduler and swh worker tests would expect it"""
+ kwargs = dict(
+ set_as_current=True,
+ enable_logging=True,
+ task_cls="swh.scheduler.task:SWHTask",
+ config={
+ "accept_content": ["application/x-msgpack", "application/json"],
+ "task_serializer": "msgpack",
+ "result_serializer": "json",
+ },
+ )
+ if queues:
+ kwargs["task_queues"] = [
+ Queue(queue, Exchange(queue), routing_key=queue) for queue in queues
+ ]
+
+ print("##### kwargs: ", kwargs)
+
+ test_app = TestApp(**kwargs)
+ with setup_default_app(test_app, use_trap=False):
+ from swh.scheduler.celery_backend import config
+
+ config.app = test_app
+ test_app.set_default()
+ test_app.set_current()
+ yield test_app
+
+ return swh_scheduler_celery_app_fn
+
+
+swh_scheduler_celery_app = swh_scheduler_celery_app_fact()
@pytest.fixture(scope="session")