diff --git a/bin/swh-worker-control b/bin/swh-worker-control
deleted file mode 100755
--- a/bin/swh-worker-control
+++ /dev/null
@@ -1,284 +0,0 @@
-#!/usr/bin/env python3
-
-# Copyright (C) 2017 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import datetime
-from fnmatch import fnmatch
-from operator import itemgetter
-import os
-import sys
-
-import click
-
-
-def list_remote_workers(inspect):
-    ping_replies = inspect.ping()
-    if not ping_replies:
-        return {}
-    workers = list(sorted(ping_replies))
-    ret = {}
-
-    for worker_name in workers:
-        if not worker_name.startswith("celery@"):
-            print("Unsupported worker: %s" % worker_name, file=sys.stderr)
-            continue
-        type, host = worker_name[len("celery@") :].split(".", 1)
-        worker = {
-            "name": worker_name,
-            "host": host,
-            "type": type,
-        }
-        ret[worker_name] = worker
-
-    return ret
-
-
-def make_filters(filter_host, filter_type):
-    """Parse the filters and create test functions"""
-
-    def include(field, value):
-        def filter(worker, field=field, value=value):
-            return fnmatch(worker[field], value)
-
-        return filter
-
-    def exclude(field, value):
-        def filter(worker, field=field, value=value):
-            return not fnmatch(worker[field], value)
-
-        return filter
-
-    filters = []
-    for host in filter_host:
-        if host.startswith("-"):
-            filters.append(exclude("host", host[1:]))
-        else:
-            filters.append(include("host", host))
-
-    for type_ in filter_type:
-        if type_.startswith("-"):
-            filters.append(exclude("type", type_[1:]))
-        else:
-            filters.append(include("type", type_))
-
-    return filters
-
-
-def filter_workers(workers, filters):
-    """Filter workers according to the set criteria"""
-    return {
-        name: worker
-        for name, worker in workers.items()
-        if all(check(worker) for check in filters)
-    }
-
-
-def get_clock_offsets(workers, inspect):
-    """Add a clock_offset entry for each worker"""
-    err_msg = "Could not get monotonic clock for {worker}"
-
-    t = datetime.datetime.now(tz=datetime.timezone.utc)
-    for worker, clock in inspect._request("monotonic").items():
-        monotonic = clock.get("monotonic")
-        if monotonic is None:
-            monotonic = 0
-            click.echo(err_msg.format(worker=worker), err=True)
-        dt = datetime.timedelta(seconds=monotonic)
-        workers[worker]["clock_offset"] = t - dt
-
-
-def worker_to_wallclock(worker, monotonic):
-    """Convert a monotonic timestamp from a worker to a wall clock time"""
-    dt = datetime.timedelta(seconds=monotonic)
-    return worker["clock_offset"] + dt
-
-
-@click.group()
-@click.option(
-    "--instance-config",
-    metavar="CONFIG",
-    default=None,
-    help="Use this worker instance configuration",
-)
-@click.option(
-    "--host", metavar="HOSTNAME_FILTER", multiple=True, help="Filter by hostname"
-)
-@click.option(
-    "--type", metavar="WORKER_TYPE_FILTER", multiple=True, help="Filter by worker type"
-)
-@click.option(
-    "--timeout",
-    metavar="TIMEOUT",
-    type=float,
-    default=1.0,
-    help="Timeout for remote control communication",
-)
-@click.option("--debug/--no-debug", default=False, help="Turn on debugging")
-@click.pass_context
-def cli(ctx, debug, timeout, instance_config, host, type):
-    """Manage the Software Heritage workers
-
-    Filters support globs; a filter starting with a "-" excludes the
-    corresponding values.
-
-    """
-    if instance_config:
-        os.environ["SWH_WORKER_INSTANCE"] = instance_config
-
-    from swh.scheduler.celery_backend.config import app
-
-    full_inspect = app.control.inspect(timeout=timeout)
-
-    workers = filter_workers(
-        list_remote_workers(full_inspect), make_filters(host, type)
-    )
-    ctx.obj["workers"] = workers
-
-    destination = list(workers)
-    inspect = app.control.inspect(destination=destination, timeout=timeout)
-    ctx.obj["inspect"] = inspect
-
-    get_clock_offsets(workers, inspect)
-
-    ctx.obj["control"] = app.control
-    ctx.obj["destination"] = destination
-    ctx.obj["timeout"] = timeout
-    ctx.obj["debug"] = debug
-
-
-@cli.command()
-@click.pass_context
-def list_workers(ctx):
-    """List the currently running workers"""
-    workers = ctx.obj["workers"]
-
-    for worker_name, worker in sorted(workers.items()):
-        click.echo("{type} alive on {host}".format(**worker))
-
-    if not workers:
-        sys.exit(2)
-
-
-@cli.command()
-@click.pass_context
-def list_tasks(ctx):
-    """List the tasks currently running on workers"""
-    task_template = (
-        "{worker} {name}"
-        "[{id} "
-        "started={started:%Y-%m-%mT%H:%M:%S} "
-        "pid={worker_pid}] {args} {kwargs}"
-    )
-    inspect = ctx.obj["inspect"]
-    workers = ctx.obj["workers"]
-    active = inspect.active()
-
-    if not active:
-        click.echo("No reply from workers", err=True)
-        sys.exit(2)
-
-    has_tasks = False
-    for worker_name, tasks in sorted(active.items()):
-        worker = workers[worker_name]
-        if not tasks:
-            click.echo("No active tasks on {name}".format(**worker), err=True)
-        for task in sorted(tasks, key=itemgetter("time_start")):
-            task["started"] = worker_to_wallclock(worker, task["time_start"])
-            click.echo(task_template.format(worker=worker_name, **task))
-            has_tasks = True
-
-    if not has_tasks:
-        sys.exit(2)
-
-
-@cli.command()
-@click.pass_context
-def list_queues(ctx):
-    """List all the queues currently enabled on the workers"""
-    inspect = ctx.obj["inspect"]
-    active = inspect.active_queues()
-
-    if not active:
-        click.echo("No reply from workers", err=True)
-        sys.exit(2)
-
-    has_queues = False
-    for worker_name, queues in sorted(active.items()):
-        queues = sorted(queue["name"] for queue in queues)
-        if queues:
-            click.echo(
-                "{worker} {queues}".format(worker=worker_name, queues=" ".join(queues))
-            )
-            has_queues = True
-        else:
-            click.echo("No queues for {worker}".format(worker=worker_name), err=True)
-
-    if not has_queues:
-        sys.exit(2)
-
-
-@cli.command()
-@click.option("--noop", is_flag=True, default=False, help="Do not proceed")
-@click.argument("queues", nargs=-1)
-@click.pass_context
-def remove_queues(ctx, noop, queues):
-    """Cancel the queue for the given workers"""
-    msg_template = "Canceling queue {queue} on worker {worker}{noop}"
-
-    inspect = ctx.obj["inspect"]
-    control = ctx.obj["control"]
-    timeout = ctx.obj["timeout"]
-    active = inspect.active_queues()
-
-    if not queues:
-        queues = ["*"]
-
-    if not active:
-        click.echo("No reply from workers", err=True)
-        sys.exit(2)
-
-    for worker, active_queues in sorted(active.items()):
-        for queue in sorted(active_queues, key=itemgetter("name")):
-            if any(fnmatch(queue["name"], name) for name in queues):
-                msg = msg_template.format(
-                    queue=queue["name"], worker=worker, noop=" (noop)" if noop else ""
-                )
-                click.echo(msg, err=True)
-                if not noop:
-                    control.cancel_consumer(
-                        queue["name"], destination=[worker], timeout=timeout
-                    )
-
-
-@cli.command()
-@click.option("--noop", is_flag=True, default=False, help="Do not proceed")
-@click.argument("queues", nargs=-1)
-@click.pass_context
-def add_queues(ctx, noop, queues):
-    """Start the queue for the given workers"""
-    msg_template = "Starting queue {queue} on worker {worker}{noop}"
-
-    control = ctx.obj["control"]
-    timeout = ctx.obj["timeout"]
-    workers = ctx.obj["workers"]
-
-    if not workers:
-        click.echo("No reply from workers", err=True)
-        sys.exit(2)
-
-    for worker in sorted(workers):
-        for queue in queues:
-            msg = msg_template.format(
-                queue=queue, worker=worker, noop=" (noop)" if noop else ""
-            )
-            click.echo(msg, err=True)
-            if not noop:
-                ret = control.add_consumer(queue, destination=[worker], timeout=timeout)
-                print(ret)
-
-
-if __name__ == "__main__":
-    cli(obj={})
diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -45,7 +45,6 @@
     author_email="swh-devel@inria.fr",
     url="https://forge.softwareheritage.org/diffusion/DSCH/",
     packages=find_packages(),
-    scripts=["bin/swh-worker-control"],
    setup_requires=["setuptools-scm"],
    use_scm_version=True,
    install_requires=parse_requirements() + parse_requirements("swh"),
diff --git a/swh/scheduler/cli/__init__.py b/swh/scheduler/cli/__init__.py
--- a/swh/scheduler/cli/__init__.py
+++ b/swh/scheduler/cli/__init__.py
@@ -37,19 +37,17 @@
     Use a local scheduler instance by default (plugged to the
     main scheduler db).
     """
+    try:
+        from psycopg2 import OperationalError
+    except ImportError:
+
+        class OperationalError(Exception):
+            pass
+
     from swh.core import config
-    from swh.scheduler.celery_backend.config import setup_log_handler
     from swh.scheduler import get_scheduler, DEFAULT_CONFIG

     ctx.ensure_object(dict)
-    log_level = ctx.obj.get("log_level", logging.INFO)
-
-    setup_log_handler(
-        loglevel=log_level,
-        colorize=False,
-        format="[%(levelname)s] %(name)s -- %(message)s",
-        log_console=not no_stdout,
-    )
     logger = logging.getLogger(__name__)

     scheduler = None
@@ -67,7 +65,7 @@
     try:
         logger.debug("Instantiating scheduler with %s" % (sched_conf))
         scheduler = get_scheduler(**sched_conf)
-    except ValueError:
+    except (ValueError, OperationalError):
         # it's the subcommand to decide whether not having a proper
         # scheduler instance is a problem.
         pass
@@ -76,7 +74,7 @@
     ctx.obj["config"] = conf


-from . import admin, task, task_type  # noqa
+from . import admin, celery_monitor, task, task_type  # noqa


 def main():
diff --git a/swh/scheduler/cli/celery_monitor.py b/swh/scheduler/cli/celery_monitor.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/cli/celery_monitor.py
@@ -0,0 +1,180 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from ast import literal_eval
+import csv
+import logging
+import sys
+import time
+from typing import Any, Dict, List, Optional
+
+from celery.utils.nodenames import gethostname
+import click
+from kombu.matcher import match
+
+from . import cli
+
+logger = logging.getLogger(__name__)
+
+
+def destination_from_host_type(
+    ctx: click.Context, host: Optional[str], type: Optional[str]
+):
+    """Get the celery destination pattern from host and type values"""
+    pattern: List[str] = ["celery@"]
+    if type:
+        pattern.append(type)
+    else:
+        pattern.append("*")
+    pattern.append(".")
+    if host is not None:
+        if host == "LOCAL":
+            host = gethostname()
+            assert host is not None
+        pattern.append(host)
+    else:
+        pattern.append("*")
+
+    rendered_pattern = "".join(pattern)
+
+    if host is None and type is None:
+        logger.debug("Matching all workers")
+    elif "*" in pattern:
+        ctx.obj["inspect"].pattern = rendered_pattern
+        ctx.obj["inspect"].matcher = "glob"
+        logger.debug("Using glob pattern %s", rendered_pattern)
+        ctx.obj["destination_filter"] = match("glob", rendered_pattern)
+    else:
+        ctx.obj["inspect"].destination = [rendered_pattern]
+        logger.debug("Using destination %s", rendered_pattern)
+
+
+@cli.group("celery-monitor")
+@click.option(
+    "--timeout", type=float, default=3.0, help="Timeout for celery remote control"
+)
+@click.option("--host", help="Filter by hostname", default=None)
+@click.option("--type", help="Filter by type", default=None)
+@click.pass_context
+def celery_monitor(
+    ctx: click.Context, timeout: float, host: Optional[str], type: Optional[str]
+) -> None:
+    """Monitoring of Celery."""
+    from swh.scheduler.celery_backend.config import app
+
+    ctx.obj["timeout"] = timeout
+    ctx.obj["inspect"] = app.control.inspect(timeout=timeout)
+
+    destination_from_host_type(ctx, host, type)
+
+
+@celery_monitor.command("ping-workers")
+@click.pass_context
+def ping_workers(ctx: click.Context) -> None:
+    """Check which workers respond to the celery remote control"""
+
+    response_times = {}
+
+    def ping_callback(response):
+        rtt = time.monotonic() - ping_time
+        for destination in response:
+            logger.debug("Got ping response from %s: %r", destination, response)
+            response_times[destination] = rtt
+
+    ctx.obj["inspect"].callback = ping_callback
+
+    ping_time = time.monotonic()
+    ret = ctx.obj["inspect"].ping()
+
+    if not ret:
+        logger.info("No response in %f seconds", time.monotonic() - ping_time)
+        ctx.exit(1)
+
+    for destination in ret:
+        logger.info(
+            "Got response from %s in %f seconds",
+            destination,
+            response_times[destination],
+        )
+
+    ctx.exit(0)
+
+
+@celery_monitor.command("list-running")
+@click.option(
+    "--format",
+    help="Output format",
+    default="pretty",
+    type=click.Choice(["pretty", "csv"]),
+)
+@click.pass_context
+def list_running(ctx: click.Context, format: str):
+    """List running tasks on the lister workers"""
+    response_times = {}
+
+    def active_callback(response):
+        rtt = time.monotonic() - active_time
+        for destination in response:
+            response_times[destination] = rtt
+
+    ctx.obj["inspect"].callback = active_callback
+
+    active_time = time.monotonic()
+    ret = ctx.obj["inspect"].active()
+
+    if not ret:
+        logger.info("No response in %f seconds", time.monotonic() - active_time)
+        ctx.exit(1)
+
+    def pretty_task_arguments(task: Dict[str, Any]) -> str:
+        arg_list = []
+        for arg in task["args"]:
+            arg_list.append(repr(arg))
+        for k, v in task["kwargs"].items():
+            arg_list.append(f"{k}={v!r}")
+
+        return f'{task["name"]}({", ".join(arg_list)})'
+
+    def get_task_data(worker: str, task: Dict[str, Any]) -> Dict[str, Any]:
+        duration = time.time() - task["time_start"]
+        return {
+            "worker": worker,
+            "name": task["name"],
+            "args": literal_eval(task["args"]),
+            "kwargs": literal_eval(task["kwargs"]),
+            "duration": duration,
+            "worker_pid": task["worker_pid"],
+        }
+
+    if format == "csv":
+        writer = csv.DictWriter(
+            sys.stdout, ["worker", "name", "args", "kwargs", "duration", "worker_pid"]
+        )
+        writer.writeheader()
+
+        def output(data: Dict[str, Any]):
+            writer.writerow(data)
+
+    elif format == "pretty":
+
+        def output(data: Dict[str, Any]):
+            print(
+                f"{data['worker']}: {pretty_task_arguments(data)} "
+                f"[for {data['duration']:f}s, pid={data['worker_pid']}]"
+            )
+
+    else:
+        logger.error("Unknown format %s", format)
+        ctx.exit(127)
+
+    for worker, active in sorted(ret.items()):
+        if not active:
+            logger.info("%s: no active tasks", worker)
+            continue
+
+        for task in sorted(active, key=lambda t: t["time_start"]):
+            output(get_task_data(worker, task))
+
+    ctx.exit(0)
diff --git a/swh/scheduler/tests/test_cli_celery_monitor.py b/swh/scheduler/tests/test_cli_celery_monitor.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/tests/test_cli_celery_monitor.py
@@ -0,0 +1,122 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import logging
+
+from click.testing import CliRunner
+import pytest
+
+from swh.scheduler.cli import cli
+
+
+def invoke(*args, catch_exceptions=False):
+    result = CliRunner(mix_stderr=False).invoke(
+        cli, ["celery-monitor", *args], catch_exceptions=catch_exceptions,
+    )
+
+    return result
+
+
+def test_celery_monitor():
+    """Check that celery-monitor returns its help text"""
+
+    result = invoke()
+
+    assert "Commands:" in result.stdout
+    assert "Options:" in result.stdout
+
+
+def test_celery_monitor_ping_all(caplog, swh_app, celery_session_worker):
+    caplog.set_level(logging.DEBUG, "swh.scheduler.cli.celery_monitor")
+
+    result = invoke("ping-workers")
+
+    assert result.exit_code == 0
+
+    got_response_message = False
+    got_all_workers_message = False
+
+    for record in caplog.records:
+        # Check that the filtering didn't do anything
+        if record.levelname == "DEBUG":
+            if "Matching all workers" in record.message:
+                got_all_workers_message = True
+        # Check that the worker responded
+        if record.levelname == "INFO":
+            if f"response from {celery_session_worker.hostname}" in record.message:
+                got_response_message = True
+
+    assert got_all_workers_message
+    assert got_response_message
+
+
+@pytest.mark.parametrize(
+    "filter_args,filter_message",
+    [
+        (("--host", "test-host"), "Using glob pattern celery@*.test-host"),
+        (("--type", "test-type"), "Using glob pattern celery@test-type.*"),
+        (
+            ("--host", "test-host", "--type", "test-type"),
+            "Using destination celery@test-type.test-host",
+        ),
+    ],
+)
+def test_celery_monitor_ping_filter(
+    caplog, swh_app, celery_session_worker, filter_args, filter_message
+):
+    caplog.set_level(logging.DEBUG, "swh.scheduler.cli.celery_monitor")
+
+    result = invoke("--timeout", "1.5", *filter_args, "ping-workers")
+
+    assert result.exit_code == 1, result.stdout
+
+    got_no_response_message = False
+    got_filter_message = False
+
+    for record in caplog.records:
+        # Check the proper filter has been generated
+        if record.levelname == "DEBUG":
+            if filter_message in record.message:
+                got_filter_message = True
+        # Check that no worker responded
+        if record.levelname == "INFO":
+            if "No response in" in record.message:
+                got_no_response_message = True
+
+    assert got_filter_message
+    assert got_no_response_message
+
+
+def test_celery_monitor_list_running(caplog, swh_app, celery_session_worker):
+    caplog.set_level(logging.DEBUG, "swh.scheduler.cli.celery_monitor")
+
+    result = invoke("--timeout", "1.5", "list-running")
+
+    assert result.exit_code == 0, result.stdout
+
+    for record in caplog.records:
+        if record.levelname != "INFO":
+            continue
+        assert f"{celery_session_worker.hostname}: no active tasks" in record.message
+
+
+@pytest.mark.parametrize("format", ["csv", "pretty"])
+def test_celery_monitor_list_running_format(
+    caplog, swh_app, celery_session_worker, format
+):
+    caplog.set_level(logging.DEBUG, "swh.scheduler.cli.celery_monitor")
+
+    result = invoke("--timeout", "1.5", "list-running", "--format", format)
+
+    assert result.exit_code == 0, result.stdout
+
+    for record in caplog.records:
+        if record.levelname != "INFO":
+            continue
+        assert f"{celery_session_worker.hostname}: no active tasks" in record.message
+
+    if format == "csv":
+        lines = result.stdout.splitlines()
+        assert lines == ["worker,name,args,kwargs,duration,worker_pid"]
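
Usage note: a minimal sketch of driving the new celery-monitor group programmatically, using the same click CliRunner pattern as the invoke() helper in the test module above. It assumes a Celery broker reachable through the usual swh.scheduler configuration; the "loader" worker type is a made-up example value, not something this diff defines.

from click.testing import CliRunner

from swh.scheduler.cli import cli

runner = CliRunner(mix_stderr=False)

# Ping workers whose node names match the glob celery@loader.*, waiting at
# most two seconds for remote-control replies ("loader" is a hypothetical
# worker type used for illustration).
result = runner.invoke(
    cli, ["celery-monitor", "--timeout", "2.0", "--type", "loader", "ping-workers"]
)
print(result.exit_code)  # 0 if at least one worker replied, 1 otherwise

# Dump the tasks currently running on all workers in CSV form.
result = runner.invoke(cli, ["celery-monitor", "list-running", "--format", "csv"])
print(result.stdout)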