Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/tests/test_cli_celery_monitor.py
- This file was added.
# Copyright (C) 2020 The Software Heritage developers | |||||
# See the AUTHORS file at the top-level directory of this distribution | |||||
# License: GNU General Public License version 3, or any later version | |||||
# See top-level LICENSE file for more information | |||||
import logging | |||||
from click.testing import CliRunner | |||||
import pytest | |||||
from swh.scheduler.cli import cli | |||||
def invoke(*args, catch_exceptions=False): | |||||
result = CliRunner(mix_stderr=False).invoke( | |||||
cli, ["celery-monitor", *args], catch_exceptions=catch_exceptions, | |||||
) | |||||
return result | |||||
def test_celery_monitor(): | |||||
"""Check that celery-monitor returns its help text""" | |||||
result = invoke() | |||||
assert "Commands:" in result.stdout | |||||
assert "Options:" in result.stdout | |||||
def test_celery_monitor_ping_all(caplog, swh_app, celery_session_worker): | |||||
caplog.set_level(logging.DEBUG, "swh.scheduler.cli.celery_monitor") | |||||
result = invoke("ping-workers") | |||||
assert result.exit_code == 0 | |||||
got_response_message = False | |||||
got_all_workers_message = False | |||||
for record in caplog.records: | |||||
# Check that the filtering didn't do anything | |||||
if record.levelname == "DEBUG": | |||||
if "Matching all workers" in record.message: | |||||
got_all_workers_message = True | |||||
# Check that the worker responded | |||||
if record.levelname == "INFO": | |||||
if f"response from {celery_session_worker.hostname}" in record.message: | |||||
got_response_message = True | |||||
assert got_all_workers_message | |||||
assert got_response_message | |||||
@pytest.mark.parametrize( | |||||
"filter_args,filter_message", | |||||
[ | |||||
(("--host", "test-host"), "Using glob pattern celery@*.test-host"), | |||||
(("--type", "test-type"), "Using glob pattern celery@test-type.*"), | |||||
( | |||||
("--host", "test-host", "--type", "test-type"), | |||||
"Using destination celery@test-type.test-host", | |||||
), | |||||
], | |||||
) | |||||
def test_celery_monitor_ping_filter( | |||||
caplog, swh_app, celery_session_worker, filter_args, filter_message | |||||
): | |||||
caplog.set_level(logging.DEBUG, "swh.scheduler.cli.celery_monitor") | |||||
result = invoke("--timeout", "1.5", *filter_args, "ping-workers") | |||||
assert result.exit_code == 1, result.stdout | |||||
got_no_response_message = False | |||||
got_filter_message = False | |||||
for record in caplog.records: | |||||
# Check the proper filter has been generated | |||||
if record.levelname == "DEBUG": | |||||
if filter_message in record.message: | |||||
got_filter_message = True | |||||
# Check that no worker responded | |||||
if record.levelname == "INFO": | |||||
if "No response in" in record.message: | |||||
got_no_response_message = True | |||||
assert got_filter_message | |||||
assert got_no_response_message | |||||
def test_celery_monitor_list_running(caplog, swh_app, celery_session_worker): | |||||
caplog.set_level(logging.DEBUG, "swh.scheduler.cli.celery_monitor") | |||||
result = invoke("--timeout", "1.5", "list-running") | |||||
assert result.exit_code == 0, result.stdout | |||||
for record in caplog.records: | |||||
if record.levelname != "INFO": | |||||
continue | |||||
assert f"{celery_session_worker.hostname}: no active tasks" in record.message | |||||
@pytest.mark.parametrize("format", ["csv", "pretty"]) | |||||
def test_celery_monitor_list_running_format( | |||||
caplog, swh_app, celery_session_worker, format | |||||
): | |||||
caplog.set_level(logging.DEBUG, "swh.scheduler.cli.celery_monitor") | |||||
result = invoke("--timeout", "1.5", "list-running", "--format", format) | |||||
assert result.exit_code == 0, result.stdout | |||||
for record in caplog.records: | |||||
if record.levelname != "INFO": | |||||
continue | |||||
assert f"{celery_session_worker.hostname}: no active tasks" in record.message | |||||
if format == "csv": | |||||
lines = result.stdout.splitlines() | |||||
assert lines == ["worker,name,args,kwargs,duration,worker_pid"] |