diff --git a/swh/scheduler/pytest_plugin.py b/swh/scheduler/pytest_plugin.py --- a/swh/scheduler/pytest_plugin.py +++ b/swh/scheduler/pytest_plugin.py @@ -7,6 +7,10 @@ import glob import os +from celery.contrib.testing import worker +from celery.contrib.testing.app import TestApp, setup_default_app + +import pkg_resources import pytest from swh.core.utils import numfile_sortkey as sortkey @@ -61,3 +65,47 @@ # this alias is used to be able to easily instantiate a db-backed Scheduler # eg. for the RPC client/server test suite. swh_db_scheduler = swh_scheduler + + +@pytest.fixture(scope="session") +def swh_scheduler_celery_app(): + """Session-wide Celery TestApp (SWHTask, msgpack tasks) installed as config.app""" + test_app = TestApp( + set_as_current=True, + enable_logging=True, + task_cls="swh.scheduler.task:SWHTask", + config={ + "accept_content": ["application/x-msgpack", "application/json"], + "task_serializer": "msgpack", + "result_serializer": "json", + }, + ) + with setup_default_app(test_app, use_trap=False): + from swh.scheduler.celery_backend import config + + config.app = test_app + test_app.set_default() + test_app.set_current() + yield test_app + + +@pytest.fixture(scope="session") +def swh_scheduler_celery_includes(): + """Task modules the worker fixture imports at startup: the scheduler test tasks + plus the "task_modules" listed by every "swh.workers" entry point.""" + task_modules = ["swh.scheduler.tests.tasks"] + + for entrypoint in pkg_resources.iter_entry_points("swh.workers"): + task_modules.extend(entrypoint.load()().get("task_modules", [])) + return task_modules + + +@pytest.fixture(scope="session") +def swh_scheduler_celery_worker( + swh_scheduler_celery_app, swh_scheduler_celery_includes, +): + """Import every listed task module, then run a "solo"-pool test worker""" + for module in swh_scheduler_celery_includes: + swh_scheduler_celery_app.loader.import_task_module(module) + with worker.start_worker(swh_scheduler_celery_app, pool="solo") as w: + yield w diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py --- 
a/swh/scheduler/tests/conftest.py +++ b/swh/scheduler/tests/conftest.py @@ -4,14 +4,18 @@ # See top-level LICENSE file for more information import os -import pytest from datetime import datetime, timezone -import pkg_resources from typing import List +import pytest + from swh.scheduler.model import ListedOrigin, Lister from swh.scheduler.tests.common import LISTERS +from swh.scheduler.pytest_plugin import ( # noqa: F401 (backwards-compat imports) + swh_scheduler_celery_app as swh_app, + swh_scheduler_celery_worker as celery_session_worker, +) # make sure we are not fooled by CELERY_ config environment vars for var in [x for x in os.environ.keys() if x.startswith("CELERY")]: @@ -22,48 +26,6 @@ os.environ["LC_ALL"] = "C.UTF-8" -@pytest.fixture(scope="session") -def celery_enable_logging(): - return True - - -@pytest.fixture(scope="session") -def celery_includes(): - task_modules = [ - "swh.scheduler.tests.tasks", - ] - for entrypoint in pkg_resources.iter_entry_points("swh.workers"): - task_modules.extend(entrypoint.load()().get("task_modules", [])) - return task_modules - - -@pytest.fixture(scope="session") -def celery_parameters(): - return { - "task_cls": "swh.scheduler.task:SWHTask", - } - - -@pytest.fixture(scope="session") -def celery_config(): - return { - "accept_content": ["application/x-msgpack", "application/json"], - "task_serializer": "msgpack", - "result_serializer": "json", - } - - -# use the celery_session_app fixture to monkeypatch the 'main' -# swh.scheduler.celery_backend.config.app Celery application -# with the test application -@pytest.fixture(scope="session") -def swh_app(celery_session_app): - from swh.scheduler.celery_backend import config - - config.app = celery_session_app - yield celery_session_app - - @pytest.fixture def stored_lister(swh_scheduler) -> Lister: """Store a lister in the scheduler and return its information""" diff --git a/swh/scheduler/tests/test_celery_tasks.py b/swh/scheduler/tests/test_celery_tasks.py --- 
a/swh/scheduler/tests/test_celery_tasks.py +++ b/swh/scheduler/tests/test_celery_tasks.py @@ -10,25 +10,29 @@ from swh.scheduler.celery_backend.runner import run_ready_tasks -def test_ping(swh_app, celery_session_worker): - res = swh_app.send_task("swh.scheduler.tests.tasks.ping") +def test_ping(swh_scheduler_celery_app, swh_scheduler_celery_worker): + res = swh_scheduler_celery_app.send_task("swh.scheduler.tests.tasks.ping") assert res res.wait() assert res.successful() assert res.result == "OK" -def test_ping_with_kw(swh_app, celery_session_worker): - res = swh_app.send_task("swh.scheduler.tests.tasks.ping", kwargs={"a": 1}) +def test_ping_with_kw(swh_scheduler_celery_app, swh_scheduler_celery_worker): + res = swh_scheduler_celery_app.send_task( + "swh.scheduler.tests.tasks.ping", kwargs={"a": 1} + ) assert res res.wait() assert res.successful() assert res.result == "OK (kw={'a': 1})" -def test_multiping(swh_app, celery_session_worker): +def test_multiping(swh_scheduler_celery_app, swh_scheduler_celery_worker): "Test that a task that spawns subtasks (group) works" - res = swh_app.send_task("swh.scheduler.tests.tasks.multiping", kwargs={"n": 5}) + res = swh_scheduler_celery_app.send_task( + "swh.scheduler.tests.tasks.multiping", kwargs={"n": 5} + ) assert res res.wait() @@ -38,7 +42,7 @@ # to complete promise_id = res.result assert promise_id - promise = GroupResult.restore(promise_id, app=swh_app) + promise = GroupResult.restore(promise_id, app=swh_scheduler_celery_app) for i in range(5): if promise.ready(): break @@ -50,7 +54,9 @@ assert ("OK (kw={'i': %s})" % i) in results -def test_scheduler_fixture(swh_app, celery_session_worker, swh_scheduler): +def test_scheduler_fixture( + swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_scheduler +): "Test that the scheduler fixture works properly" task_type = swh_scheduler.get_task_type("swh-test-ping") @@ -59,35 +65,39 @@ swh_scheduler.create_tasks([create_task_dict("swh-test-ping", "oneshot")]) - 
backend_tasks = run_ready_tasks(swh_scheduler, swh_app) + backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app) assert backend_tasks for task in backend_tasks: # Make sure the task completed AsyncResult(id=task["backend_id"]).get() -def test_task_return_value(swh_app, celery_session_worker, swh_scheduler): +def test_task_return_value( + swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_scheduler +): task_type = swh_scheduler.get_task_type("swh-test-add") assert task_type assert task_type["backend_name"] == "swh.scheduler.tests.tasks.add" swh_scheduler.create_tasks([create_task_dict("swh-test-add", "oneshot", 12, 30)]) - backend_tasks = run_ready_tasks(swh_scheduler, swh_app) + backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app) assert len(backend_tasks) == 1 task = backend_tasks[0] value = AsyncResult(id=task["backend_id"]).get() assert value == 42 -def test_task_exception(swh_app, celery_session_worker, swh_scheduler): +def test_task_exception( + swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_scheduler +): task_type = swh_scheduler.get_task_type("swh-test-error") assert task_type assert task_type["backend_name"] == "swh.scheduler.tests.tasks.error" swh_scheduler.create_tasks([create_task_dict("swh-test-error", "oneshot")]) - backend_tasks = run_ready_tasks(swh_scheduler, swh_app) + backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app) assert len(backend_tasks) == 1 task = backend_tasks[0] @@ -96,11 +106,11 @@ result.get() -def test_statsd(swh_app, celery_session_worker, mocker): +def test_statsd(swh_scheduler_celery_app, swh_scheduler_celery_worker, mocker): m = mocker.patch("swh.scheduler.task.Statsd._send_to_server") mocker.patch("swh.scheduler.task.ts", side_effect=count()) mocker.patch("swh.core.statsd.monotonic", side_effect=count()) - res = swh_app.send_task("swh.scheduler.tests.tasks.echo") + res = swh_scheduler_celery_app.send_task("swh.scheduler.tests.tasks.echo") assert res 
res.wait() assert res.successful() @@ -129,11 +139,13 @@ ) -def test_statsd_with_status(swh_app, celery_session_worker, mocker): +def test_statsd_with_status( + swh_scheduler_celery_app, swh_scheduler_celery_worker, mocker +): m = mocker.patch("swh.scheduler.task.Statsd._send_to_server") mocker.patch("swh.scheduler.task.ts", side_effect=count()) mocker.patch("swh.core.statsd.monotonic", side_effect=count()) - res = swh_app.send_task( + res = swh_scheduler_celery_app.send_task( "swh.scheduler.tests.tasks.echo", kwargs={"status": "eventful"} ) assert res diff --git a/swh/scheduler/tests/test_cli_celery_monitor.py b/swh/scheduler/tests/test_cli_celery_monitor.py --- a/swh/scheduler/tests/test_cli_celery_monitor.py +++ b/swh/scheduler/tests/test_cli_celery_monitor.py @@ -28,10 +28,12 @@ assert "Options:" in result.stdout -def test_celery_monitor_ping(caplog, swh_app, celery_session_worker): +def test_celery_monitor_ping( + caplog, swh_scheduler_celery_app, swh_scheduler_celery_worker +): caplog.set_level(logging.INFO, "swh.scheduler.cli.celery_monitor") - result = invoke("--pattern", celery_session_worker.hostname, "ping-workers") + result = invoke("--pattern", swh_scheduler_celery_worker.hostname, "ping-workers") assert result.exit_code == 0 @@ -40,7 +42,7 @@ (record,) = caplog.records assert record.levelname == "INFO" - assert f"response from {celery_session_worker.hostname}" in record.message + assert f"response from {swh_scheduler_celery_worker.hostname}" in record.message @pytest.mark.parametrize( @@ -68,7 +70,12 @@ ], ) def test_celery_monitor_ping_filter( - caplog, swh_app, celery_session_worker, filter_args, filter_message, exit_code + caplog, + swh_scheduler_celery_app, + swh_scheduler_celery_worker, + filter_args, + filter_message, + exit_code, ): caplog.set_level(logging.DEBUG, "swh.scheduler.cli.celery_monitor") @@ -95,27 +102,35 @@ assert got_no_response_message -def test_celery_monitor_list_running(caplog, swh_app, celery_session_worker): +def 
test_celery_monitor_list_running( + caplog, swh_scheduler_celery_app, swh_scheduler_celery_worker +): caplog.set_level(logging.DEBUG, "swh.scheduler.cli.celery_monitor") - result = invoke("--pattern", celery_session_worker.hostname, "list-running") + result = invoke("--pattern", swh_scheduler_celery_worker.hostname, "list-running") assert result.exit_code == 0, result.stdout for record in caplog.records: if record.levelname != "INFO": continue - assert f"{celery_session_worker.hostname}: no active tasks" in record.message + assert ( + f"{swh_scheduler_celery_worker.hostname}: no active tasks" in record.message + ) @pytest.mark.parametrize("format", ["csv", "pretty"]) def test_celery_monitor_list_running_format( - caplog, swh_app, celery_session_worker, format + caplog, swh_scheduler_celery_app, swh_scheduler_celery_worker, format ): caplog.set_level(logging.DEBUG, "swh.scheduler.cli.celery_monitor") result = invoke( - "--pattern", celery_session_worker.hostname, "list-running", "--format", format + "--pattern", + swh_scheduler_celery_worker.hostname, + "list-running", + "--format", + format, ) assert result.exit_code == 0, result.stdout @@ -123,7 +138,9 @@ for record in caplog.records: if record.levelname != "INFO": continue - assert f"{celery_session_worker.hostname}: no active tasks" in record.message + assert ( + f"{swh_scheduler_celery_worker.hostname}: no active tasks" in record.message + ) if format == "csv": lines = result.stdout.splitlines()