diff --git a/swh/scheduler/pytest_plugin.py b/swh/scheduler/pytest_plugin.py index cdeb0ff..723bc3a 100644 --- a/swh/scheduler/pytest_plugin.py +++ b/swh/scheduler/pytest_plugin.py @@ -1,63 +1,111 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import timedelta import glob import os +from celery.contrib.testing import worker +from celery.contrib.testing.app import TestApp, setup_default_app + +import pkg_resources import pytest from swh.core.utils import numfile_sortkey as sortkey import swh.scheduler from swh.scheduler import get_scheduler SQL_DIR = os.path.join(os.path.dirname(swh.scheduler.__file__), "sql") DUMP_FILES = os.path.join(SQL_DIR, "*.sql") # celery tasks for testing purpose; tasks themselves should be # in swh/scheduler/tests/tasks.py TASK_NAMES = ["ping", "multiping", "add", "error", "echo"] @pytest.fixture def swh_scheduler_config(request, postgresql): scheduler_config = { "db": postgresql.dsn, } all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey) cursor = postgresql.cursor() for fname in all_dump_files: with open(fname) as fobj: cursor.execute(fobj.read()) postgresql.commit() return scheduler_config @pytest.fixture def swh_scheduler(swh_scheduler_config): scheduler = get_scheduler("local", swh_scheduler_config) for taskname in TASK_NAMES: scheduler.create_task_type( { "type": "swh-test-{}".format(taskname), "description": "The {} testing task".format(taskname), "backend_name": "swh.scheduler.tests.tasks.{}".format(taskname), "default_interval": timedelta(days=1), "min_interval": timedelta(hours=6), "max_interval": timedelta(days=12), } ) return scheduler # this alias is used to be able to easily instantiate a db-backed Scheduler # eg. for the RPC client/server test suite. swh_db_scheduler = swh_scheduler + + +@pytest.fixture(scope="session") +def swh_scheduler_celery_app(): + """Set up a Celery app as swh.scheduler and swh worker tests would expect it""" + test_app = TestApp( + set_as_current=True, + enable_logging=True, + task_cls="swh.scheduler.task:SWHTask", + config={ + "accept_content": ["application/x-msgpack", "application/json"], + "task_serializer": "msgpack", + "result_serializer": "json", + }, + ) + with setup_default_app(test_app, use_trap=False): + from swh.scheduler.celery_backend import config + + config.app = test_app + test_app.set_default() + test_app.set_current() + yield test_app + + +@pytest.fixture(scope="session") +def swh_scheduler_celery_includes(): + """List of task modules that should be loaded by the swh_scheduler_celery_worker on +startup.""" + task_modules = ["swh.scheduler.tests.tasks"] + + for entrypoint in pkg_resources.iter_entry_points("swh.workers"): + task_modules.extend(entrypoint.load()().get("task_modules", [])) + return task_modules + + +@pytest.fixture(scope="session") +def swh_scheduler_celery_worker( + swh_scheduler_celery_app, swh_scheduler_celery_includes, +): + """Spawn a worker""" + for module in swh_scheduler_celery_includes: + swh_scheduler_celery_app.loader.import_task_module(module) + with worker.start_worker(swh_scheduler_celery_app, pool="solo") as w: + yield w diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py index 8acc02e..380692c 100644 --- a/swh/scheduler/tests/conftest.py +++ b/swh/scheduler/tests/conftest.py @@ -1,84 +1,46 @@ # Copyright (C) 2016-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os -import pytest from datetime import datetime, timezone -import pkg_resources from typing import List +import pytest + from swh.scheduler.model import ListedOrigin, Lister from swh.scheduler.tests.common import LISTERS +from swh.scheduler.pytest_plugin import ( # noqa: F401 (backwards-compat imports) + swh_scheduler_celery_app as swh_app, + swh_scheduler_celery_worker as celery_session_worker, +) # make sure we are not fooled by CELERY_ config environment vars for var in [x for x in os.environ.keys() if x.startswith("CELERY")]: os.environ.pop(var) # test_cli tests depends on a en/C locale, so ensure it os.environ["LC_ALL"] = "C.UTF-8" -@pytest.fixture(scope="session") -def celery_enable_logging(): - return True - - -@pytest.fixture(scope="session") -def celery_includes(): - task_modules = [ - "swh.scheduler.tests.tasks", - ] - for entrypoint in pkg_resources.iter_entry_points("swh.workers"): - task_modules.extend(entrypoint.load()().get("task_modules", [])) - return task_modules - - -@pytest.fixture(scope="session") -def celery_parameters(): - return { - "task_cls": "swh.scheduler.task:SWHTask", - } - - -@pytest.fixture(scope="session") -def celery_config(): - return { - "accept_content": ["application/x-msgpack", "application/json"], - "task_serializer": "msgpack", - "result_serializer": "json", - } - - -# use the celery_session_app fixture to monkeypatch the 'main' -# swh.scheduler.celery_backend.config.app Celery application -# with the test application -@pytest.fixture(scope="session") -def swh_app(celery_session_app): - from swh.scheduler.celery_backend import config - - config.app = celery_session_app - yield celery_session_app - - @pytest.fixture def stored_lister(swh_scheduler) -> Lister: """Store a lister in the scheduler and return its information""" return swh_scheduler.get_or_create_lister(**LISTERS[0]) @pytest.fixture def listed_origins(stored_lister) -> List[ListedOrigin]: """Return a (fixed) set of 1000 listed origins""" return [ ListedOrigin( lister_id=stored_lister.id, url=f"https://example.com/{i:04d}.git", visit_type="git", last_update=datetime(2020, 6, 15, 16, 0, 0, i, tzinfo=timezone.utc), ) for i in range(1000) ] diff --git a/swh/scheduler/tests/test_celery_tasks.py b/swh/scheduler/tests/test_celery_tasks.py index ed9539d..b61cea1 100644 --- a/swh/scheduler/tests/test_celery_tasks.py +++ b/swh/scheduler/tests/test_celery_tasks.py @@ -1,164 +1,176 @@ from time import sleep from itertools import count from celery.result import GroupResult from celery.result import AsyncResult import pytest from swh.scheduler.utils import create_task_dict from swh.scheduler.celery_backend.runner import run_ready_tasks -def test_ping(swh_app, celery_session_worker): - res = swh_app.send_task("swh.scheduler.tests.tasks.ping") +def test_ping(swh_scheduler_celery_app, swh_scheduler_celery_worker): + res = swh_scheduler_celery_app.send_task("swh.scheduler.tests.tasks.ping") assert res res.wait() assert res.successful() assert res.result == "OK" -def test_ping_with_kw(swh_app, celery_session_worker): - res = swh_app.send_task("swh.scheduler.tests.tasks.ping", kwargs={"a": 1}) +def test_ping_with_kw(swh_scheduler_celery_app, swh_scheduler_celery_worker): + res = swh_scheduler_celery_app.send_task( + "swh.scheduler.tests.tasks.ping", kwargs={"a": 1} + ) assert res res.wait() assert res.successful() assert res.result == "OK (kw={'a': 1})" -def test_multiping(swh_app, celery_session_worker): +def test_multiping(swh_scheduler_celery_app, swh_scheduler_celery_worker): "Test that a task that spawns subtasks (group) works" - res = swh_app.send_task("swh.scheduler.tests.tasks.multiping", kwargs={"n": 5}) + res = swh_scheduler_celery_app.send_task( + "swh.scheduler.tests.tasks.multiping", kwargs={"n": 5} + ) assert res res.wait() assert res.successful() # retrieve the GroupResult for this task and wait for all the subtasks # to complete promise_id = res.result assert promise_id - promise = GroupResult.restore(promise_id, app=swh_app) + promise = GroupResult.restore(promise_id, app=swh_scheduler_celery_app) for i in range(5): if promise.ready(): break sleep(1) results = [x.get() for x in promise.results] assert len(results) == 5 for i in range(5): assert ("OK (kw={'i': %s})" % i) in results -def test_scheduler_fixture(swh_app, celery_session_worker, swh_scheduler): +def test_scheduler_fixture( + swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_scheduler +): "Test that the scheduler fixture works properly" task_type = swh_scheduler.get_task_type("swh-test-ping") assert task_type assert task_type["backend_name"] == "swh.scheduler.tests.tasks.ping" swh_scheduler.create_tasks([create_task_dict("swh-test-ping", "oneshot")]) - backend_tasks = run_ready_tasks(swh_scheduler, swh_app) + backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app) assert backend_tasks for task in backend_tasks: # Make sure the task completed AsyncResult(id=task["backend_id"]).get() -def test_task_return_value(swh_app, celery_session_worker, swh_scheduler): +def test_task_return_value( + swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_scheduler +): task_type = swh_scheduler.get_task_type("swh-test-add") assert task_type assert task_type["backend_name"] == "swh.scheduler.tests.tasks.add" swh_scheduler.create_tasks([create_task_dict("swh-test-add", "oneshot", 12, 30)]) - backend_tasks = run_ready_tasks(swh_scheduler, swh_app) + backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app) assert len(backend_tasks) == 1 task = backend_tasks[0] value = AsyncResult(id=task["backend_id"]).get() assert value == 42 -def test_task_exception(swh_app, celery_session_worker, swh_scheduler): +def test_task_exception( + swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_scheduler +): task_type = swh_scheduler.get_task_type("swh-test-error") assert task_type assert task_type["backend_name"] == "swh.scheduler.tests.tasks.error" swh_scheduler.create_tasks([create_task_dict("swh-test-error", "oneshot")]) - backend_tasks = run_ready_tasks(swh_scheduler, swh_app) + backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app) assert len(backend_tasks) == 1 task = backend_tasks[0] result = AsyncResult(id=task["backend_id"]) with pytest.raises(NotImplementedError): result.get() -def test_statsd(swh_app, celery_session_worker, mocker): +def test_statsd(swh_scheduler_celery_app, swh_scheduler_celery_worker, mocker): m = mocker.patch("swh.scheduler.task.Statsd._send_to_server") mocker.patch("swh.scheduler.task.ts", side_effect=count()) mocker.patch("swh.core.statsd.monotonic", side_effect=count()) - res = swh_app.send_task("swh.scheduler.tests.tasks.echo") + res = swh_scheduler_celery_app.send_task("swh.scheduler.tests.tasks.echo") assert res res.wait() assert res.successful() assert res.result == {} m.assert_any_call( "swh_task_called_count:1|c|" "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker" ) m.assert_any_call( "swh_task_start_ts:0|g|" "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker" ) m.assert_any_call( "swh_task_end_ts:1|g|" "#status:uneventful,task:swh.scheduler.tests.tasks.echo," "worker:unknown worker" ) m.assert_any_call( "swh_task_duration_seconds:1000|ms|" "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker" ) m.assert_any_call( "swh_task_success_count:1|c|" "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker" ) -def test_statsd_with_status(swh_app, celery_session_worker, mocker): +def test_statsd_with_status( + swh_scheduler_celery_app, swh_scheduler_celery_worker, mocker +): m = mocker.patch("swh.scheduler.task.Statsd._send_to_server") mocker.patch("swh.scheduler.task.ts", side_effect=count()) mocker.patch("swh.core.statsd.monotonic", side_effect=count()) - res = swh_app.send_task( + res = swh_scheduler_celery_app.send_task( "swh.scheduler.tests.tasks.echo", kwargs={"status": "eventful"} ) assert res res.wait() assert res.successful() assert res.result == {"status": "eventful"} m.assert_any_call( "swh_task_called_count:1|c|" "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker" ) m.assert_any_call( "swh_task_start_ts:0|g|" "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker" ) m.assert_any_call( "swh_task_end_ts:1|g|" "#status:eventful,task:swh.scheduler.tests.tasks.echo," "worker:unknown worker" ) m.assert_any_call( "swh_task_duration_seconds:1000|ms|" "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker" ) m.assert_any_call( "swh_task_success_count:1|c|" "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker" ) diff --git a/swh/scheduler/tests/test_cli_celery_monitor.py b/swh/scheduler/tests/test_cli_celery_monitor.py index 5a0ed24..151eff1 100644 --- a/swh/scheduler/tests/test_cli_celery_monitor.py +++ b/swh/scheduler/tests/test_cli_celery_monitor.py @@ -1,130 +1,147 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from click.testing import CliRunner import pytest from swh.scheduler.cli import cli def invoke(*args, catch_exceptions=False): result = CliRunner(mix_stderr=False).invoke( cli, ["celery-monitor", *args], catch_exceptions=catch_exceptions, ) return result def test_celery_monitor(): """Check that celery-monitor returns its help text""" result = invoke() assert "Commands:" in result.stdout assert "Options:" in result.stdout -def test_celery_monitor_ping(caplog, swh_app, celery_session_worker): +def test_celery_monitor_ping( + caplog, swh_scheduler_celery_app, swh_scheduler_celery_worker +): caplog.set_level(logging.INFO, "swh.scheduler.cli.celery_monitor") - result = invoke("--pattern", celery_session_worker.hostname, "ping-workers") + result = invoke("--pattern", swh_scheduler_celery_worker.hostname, "ping-workers") assert result.exit_code == 0 assert len(caplog.records) == 1 (record,) = caplog.records assert record.levelname == "INFO" - assert f"response from {celery_session_worker.hostname}" in record.message + assert f"response from {swh_scheduler_celery_worker.hostname}" in record.message @pytest.mark.parametrize( "filter_args,filter_message,exit_code", [ ((), "Matching all workers", 0), ( ("--pattern", "celery@*.test-host"), "Using glob pattern celery@*.test-host", 1, ), ( ("--pattern", "celery@test-type.test-host"), "Using destinations celery@test-type.test-host", 1, ), ( ("--pattern", "celery@test-type.test-host,celery@test-type2.test-host"), ( "Using destinations " "celery@test-type.test-host, celery@test-type2.test-host" ), 1, ), ], ) def test_celery_monitor_ping_filter( - caplog, swh_app, celery_session_worker, filter_args, filter_message, exit_code + caplog, + swh_scheduler_celery_app, + swh_scheduler_celery_worker, + filter_args, + filter_message, + exit_code, ): caplog.set_level(logging.DEBUG, "swh.scheduler.cli.celery_monitor") result = invoke("--timeout", "1.5", *filter_args, "ping-workers") assert result.exit_code == exit_code, result.stdout got_no_response_message = False got_filter_message = False for record in caplog.records: # Check the proper filter has been generated if record.levelname == "DEBUG": if filter_message in record.message: got_filter_message = True # Check that no worker responded if record.levelname == "INFO": if "No response in" in record.message: got_no_response_message = True assert got_filter_message if filter_args: assert got_no_response_message -def test_celery_monitor_list_running(caplog, swh_app, celery_session_worker): +def test_celery_monitor_list_running( + caplog, swh_scheduler_celery_app, swh_scheduler_celery_worker +): caplog.set_level(logging.DEBUG, "swh.scheduler.cli.celery_monitor") - result = invoke("--pattern", celery_session_worker.hostname, "list-running") + result = invoke("--pattern", swh_scheduler_celery_worker.hostname, "list-running") assert result.exit_code == 0, result.stdout for record in caplog.records: if record.levelname != "INFO": continue - assert f"{celery_session_worker.hostname}: no active tasks" in record.message + assert ( + f"{swh_scheduler_celery_worker.hostname}: no active tasks" in record.message + ) @pytest.mark.parametrize("format", ["csv", "pretty"]) def test_celery_monitor_list_running_format( - caplog, swh_app, celery_session_worker, format + caplog, swh_scheduler_celery_app, swh_scheduler_celery_worker, format ): caplog.set_level(logging.DEBUG, "swh.scheduler.cli.celery_monitor") result = invoke( - "--pattern", celery_session_worker.hostname, "list-running", "--format", format + "--pattern", + swh_scheduler_celery_worker.hostname, + "list-running", + "--format", + format, ) assert result.exit_code == 0, result.stdout for record in caplog.records: if record.levelname != "INFO": continue - assert f"{celery_session_worker.hostname}: no active tasks" in record.message + assert ( + f"{swh_scheduler_celery_worker.hostname}: no active tasks" in record.message + ) if format == "csv": lines = result.stdout.splitlines() assert lines == ["worker,name,args,kwargs,duration,worker_pid"]