Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/tests/test_cli_celery_monitor.py
- This file was added.
# Copyright (C) 2020 The Software Heritage developers | |||||
# See the AUTHORS file at the top-level directory of this distribution | |||||
# License: GNU General Public License version 3, or any later version | |||||
# See top-level LICENSE file for more information | |||||
import logging | |||||
from click.testing import CliRunner | |||||
import pytest | |||||
from swh.scheduler.cli import cli | |||||
def invoke(*args, catch_exceptions=False): | |||||
result = CliRunner(mix_stderr=False).invoke( | |||||
cli, ["celery-monitor", *args], catch_exceptions=catch_exceptions, | |||||
) | |||||
return result | |||||
def test_celery_monitor(): | |||||
"""Check that celery-monitor returns its help text""" | |||||
result = invoke() | |||||
assert "Commands:" in result.stdout | |||||
assert "Options:" in result.stdout | |||||
def test_celery_monitor_ping(caplog, swh_app, celery_session_worker): | |||||
caplog.set_level(logging.INFO, "swh.scheduler.cli.celery_monitor") | |||||
result = invoke("--pattern", celery_session_worker.hostname, "ping-workers") | |||||
assert result.exit_code == 0 | |||||
assert len(caplog.records) == 1 | |||||
(record,) = caplog.records | |||||
assert record.levelname == "INFO" | |||||
assert f"response from {celery_session_worker.hostname}" in record.message | |||||
@pytest.mark.parametrize( | |||||
"filter_args,filter_message,exit_code", | |||||
[ | |||||
((), "Matching all workers", 0), | |||||
( | |||||
("--pattern", "celery@*.test-host"), | |||||
"Using glob pattern celery@*.test-host", | |||||
1, | |||||
), | |||||
( | |||||
("--pattern", "celery@test-type.test-host"), | |||||
"Using destinations celery@test-type.test-host", | |||||
1, | |||||
), | |||||
( | |||||
("--pattern", "celery@test-type.test-host,celery@test-type2.test-host"), | |||||
( | |||||
"Using destinations " | |||||
"celery@test-type.test-host, celery@test-type2.test-host" | |||||
), | |||||
1, | |||||
), | |||||
], | |||||
) | |||||
def test_celery_monitor_ping_filter( | |||||
caplog, swh_app, celery_session_worker, filter_args, filter_message, exit_code | |||||
): | |||||
caplog.set_level(logging.DEBUG, "swh.scheduler.cli.celery_monitor") | |||||
result = invoke("--timeout", "1.5", *filter_args, "ping-workers") | |||||
assert result.exit_code == exit_code, result.stdout | |||||
got_no_response_message = False | |||||
got_filter_message = False | |||||
for record in caplog.records: | |||||
# Check the proper filter has been generated | |||||
if record.levelname == "DEBUG": | |||||
if filter_message in record.message: | |||||
got_filter_message = True | |||||
# Check that no worker responded | |||||
if record.levelname == "INFO": | |||||
if "No response in" in record.message: | |||||
got_no_response_message = True | |||||
assert got_filter_message | |||||
if filter_args: | |||||
assert got_no_response_message | |||||
def test_celery_monitor_list_running(caplog, swh_app, celery_session_worker): | |||||
caplog.set_level(logging.DEBUG, "swh.scheduler.cli.celery_monitor") | |||||
result = invoke("--pattern", celery_session_worker.hostname, "list-running") | |||||
assert result.exit_code == 0, result.stdout | |||||
for record in caplog.records: | |||||
if record.levelname != "INFO": | |||||
continue | |||||
assert f"{celery_session_worker.hostname}: no active tasks" in record.message | |||||
@pytest.mark.parametrize("format", ["csv", "pretty"]) | |||||
def test_celery_monitor_list_running_format( | |||||
caplog, swh_app, celery_session_worker, format | |||||
): | |||||
caplog.set_level(logging.DEBUG, "swh.scheduler.cli.celery_monitor") | |||||
result = invoke( | |||||
"--pattern", celery_session_worker.hostname, "list-running", "--format", format | |||||
) | |||||
assert result.exit_code == 0, result.stdout | |||||
for record in caplog.records: | |||||
if record.levelname != "INFO": | |||||
continue | |||||
assert f"{celery_session_worker.hostname}: no active tasks" in record.message | |||||
if format == "csv": | |||||
lines = result.stdout.splitlines() | |||||
assert lines == ["worker,name,args,kwargs,duration,worker_pid"] |