D3248.id11529.diff

diff --git a/bin/swh-worker-control b/bin/swh-worker-control
deleted file mode 100755
--- a/bin/swh-worker-control
+++ /dev/null
@@ -1,284 +0,0 @@
-#!/usr/bin/env python3
-
-# Copyright (C) 2017  The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import datetime
-from fnmatch import fnmatch
-from operator import itemgetter
-import os
-import sys
-
-import click
-
-
-def list_remote_workers(inspect):
-    ping_replies = inspect.ping()
-    if not ping_replies:
-        return {}
-    workers = list(sorted(ping_replies))
-    ret = {}
-
-    for worker_name in workers:
-        if not worker_name.startswith("celery@"):
-            print("Unsupported worker: %s" % worker_name, file=sys.stderr)
-            continue
-        type, host = worker_name[len("celery@") :].split(".", 1)
-        worker = {
-            "name": worker_name,
-            "host": host,
-            "type": type,
-        }
-        ret[worker_name] = worker
-
-    return ret
-
-
-def make_filters(filter_host, filter_type):
-    """Parse the filters and create test functions"""
-
-    def include(field, value):
-        def filter(worker, field=field, value=value):
-            return fnmatch(worker[field], value)
-
-        return filter
-
-    def exclude(field, value):
-        def filter(worker, field=field, value=value):
-            return not fnmatch(worker[field], value)
-
-        return filter
-
-    filters = []
-    for host in filter_host:
-        if host.startswith("-"):
-            filters.append(exclude("host", host[1:]))
-        else:
-            filters.append(include("host", host))
-
-    for type_ in filter_type:
-        if type_.startswith("-"):
-            filters.append(exclude("type", type_[1:]))
-        else:
-            filters.append(include("type", type_))
-
-    return filters
-
-
-def filter_workers(workers, filters):
-    """Filter workers according to the set criteria"""
-    return {
-        name: worker
-        for name, worker in workers.items()
-        if all(check(worker) for check in filters)
-    }
-
-
-def get_clock_offsets(workers, inspect):
-    """Add a clock_offset entry for each worker"""
-    err_msg = "Could not get monotonic clock for {worker}"
-
-    t = datetime.datetime.now(tz=datetime.timezone.utc)
-    for worker, clock in inspect._request("monotonic").items():
-        monotonic = clock.get("monotonic")
-        if monotonic is None:
-            monotonic = 0
-            click.echo(err_msg.format(worker=worker), err=True)
-        dt = datetime.timedelta(seconds=monotonic)
-        workers[worker]["clock_offset"] = t - dt
-
-
-def worker_to_wallclock(worker, monotonic):
-    """Convert a monotonic timestamp from a worker to a wall clock time"""
-    dt = datetime.timedelta(seconds=monotonic)
-    return worker["clock_offset"] + dt
-
-
-@click.group()
-@click.option(
-    "--instance-config",
-    metavar="CONFIG",
-    default=None,
-    help="Use this worker instance configuration",
-)
-@click.option(
-    "--host", metavar="HOSTNAME_FILTER", multiple=True, help="Filter by hostname"
-)
-@click.option(
-    "--type", metavar="WORKER_TYPE_FILTER", multiple=True, help="Filter by worker type"
-)
-@click.option(
-    "--timeout",
-    metavar="TIMEOUT",
-    type=float,
-    default=1.0,
-    help="Timeout for remote control communication",
-)
-@click.option("--debug/--no-debug", default=False, help="Turn on debugging")
-@click.pass_context
-def cli(ctx, debug, timeout, instance_config, host, type):
-    """Manage the Software Heritage workers
-
-    Filters support globs; a filter starting with a "-" excludes the
-    corresponding values.
-
-    """
-    if instance_config:
-        os.environ["SWH_WORKER_INSTANCE"] = instance_config
-
-    from swh.scheduler.celery_backend.config import app
-
-    full_inspect = app.control.inspect(timeout=timeout)
-
-    workers = filter_workers(
-        list_remote_workers(full_inspect), make_filters(host, type)
-    )
-    ctx.obj["workers"] = workers
-
-    destination = list(workers)
-    inspect = app.control.inspect(destination=destination, timeout=timeout)
-    ctx.obj["inspect"] = inspect
-
-    get_clock_offsets(workers, inspect)
-
-    ctx.obj["control"] = app.control
-    ctx.obj["destination"] = destination
-    ctx.obj["timeout"] = timeout
-    ctx.obj["debug"] = debug
-
-
-@cli.command()
-@click.pass_context
-def list_workers(ctx):
-    """List the currently running workers"""
-    workers = ctx.obj["workers"]
-
-    for worker_name, worker in sorted(workers.items()):
-        click.echo("{type} alive on {host}".format(**worker))
-
-    if not workers:
-        sys.exit(2)
-
-
-@cli.command()
-@click.pass_context
-def list_tasks(ctx):
-    """List the tasks currently running on workers"""
-    task_template = (
-        "{worker} {name}"
-        "[{id} "
- "started={started:%Y-%m-%mT%H:%M:%S} "
-        "pid={worker_pid}] {args} {kwargs}"
-    )
-    inspect = ctx.obj["inspect"]
-    workers = ctx.obj["workers"]
-    active = inspect.active()
-
-    if not active:
-        click.echo("No reply from workers", err=True)
-        sys.exit(2)
-
-    has_tasks = False
-    for worker_name, tasks in sorted(active.items()):
-        worker = workers[worker_name]
-        if not tasks:
-            click.echo("No active tasks on {name}".format(**worker), err=True)
-        for task in sorted(tasks, key=itemgetter("time_start")):
-            task["started"] = worker_to_wallclock(worker, task["time_start"])
-            click.echo(task_template.format(worker=worker_name, **task))
-            has_tasks = True
-
-    if not has_tasks:
-        sys.exit(2)
-
-
-@cli.command()
-@click.pass_context
-def list_queues(ctx):
-    """List all the queues currently enabled on the workers"""
-    inspect = ctx.obj["inspect"]
-    active = inspect.active_queues()
-
-    if not active:
-        click.echo("No reply from workers", err=True)
-        sys.exit(2)
-
-    has_queues = False
-    for worker_name, queues in sorted(active.items()):
-        queues = sorted(queue["name"] for queue in queues)
-        if queues:
-            click.echo(
-                "{worker} {queues}".format(worker=worker_name, queues=" ".join(queues))
-            )
-            has_queues = True
-        else:
-            click.echo("No queues for {worker}".format(worker=worker_name), err=True)
-
-    if not has_queues:
-        sys.exit(2)
-
-
-@cli.command()
-@click.option("--noop", is_flag=True, default=False, help="Do not proceed")
-@click.argument("queues", nargs=-1)
-@click.pass_context
-def remove_queues(ctx, noop, queues):
-    """Cancel the queue for the given workers"""
-    msg_template = "Canceling queue {queue} on worker {worker}{noop}"
-
-    inspect = ctx.obj["inspect"]
-    control = ctx.obj["control"]
-    timeout = ctx.obj["timeout"]
-    active = inspect.active_queues()
-
-    if not queues:
-        queues = ["*"]
-
-    if not active:
-        click.echo("No reply from workers", err=True)
-        sys.exit(2)
-
-    for worker, active_queues in sorted(active.items()):
-        for queue in sorted(active_queues, key=itemgetter("name")):
-            if any(fnmatch(queue["name"], name) for name in queues):
-                msg = msg_template.format(
-                    queue=queue["name"], worker=worker, noop=" (noop)" if noop else ""
-                )
-                click.echo(msg, err=True)
-                if not noop:
-                    control.cancel_consumer(
-                        queue["name"], destination=[worker], timeout=timeout
-                    )
-
-
-@cli.command()
-@click.option("--noop", is_flag=True, default=False, help="Do not proceed")
-@click.argument("queues", nargs=-1)
-@click.pass_context
-def add_queues(ctx, noop, queues):
-    """Start the queue for the given workers"""
-    msg_template = "Starting queue {queue} on worker {worker}{noop}"
-
-    control = ctx.obj["control"]
-    timeout = ctx.obj["timeout"]
-    workers = ctx.obj["workers"]
-
-    if not workers:
-        click.echo("No reply from workers", err=True)
-        sys.exit(2)
-
-    for worker in sorted(workers):
-        for queue in queues:
-            msg = msg_template.format(
-                queue=queue, worker=worker, noop=" (noop)" if noop else ""
-            )
-            click.echo(msg, err=True)
-            if not noop:
-                ret = control.add_consumer(queue, destination=[worker], timeout=timeout)
-                print(ret)
-
-
-if __name__ == "__main__":
-    cli(obj={})
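
For context, the filtering the removed script implemented boils down to glob
matching with a leading "-" for negation, applied to the "host" and "type"
fields parsed out of the Celery node names. A minimal standalone sketch of
those semantics (the worker names are illustrative):

    from fnmatch import fnmatch

    def make_filters(patterns, field):
        # A leading "-" turns a glob pattern into an exclusion.
        filters = []
        for pattern in patterns:
            if pattern.startswith("-"):
                filters.append(lambda w, p=pattern[1:]: not fnmatch(w[field], p))
            else:
                filters.append(lambda w, p=pattern: fnmatch(w[field], p))
        return filters

    workers = {
        "celery@loader.worker01": {"host": "worker01", "type": "loader"},
        "celery@lister.worker02": {"host": "worker02", "type": "lister"},
    }
    checks = make_filters(["worker*", "-worker02"], "host")
    kept = {name: w for name, w in workers.items() if all(c(w) for c in checks)}
    assert list(kept) == ["celery@loader.worker01"]
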
diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -45,7 +45,6 @@
     author_email="swh-devel@inria.fr",
     url="https://forge.softwareheritage.org/diffusion/DSCH/",
     packages=find_packages(),
-    scripts=["bin/swh-worker-control"],
     setup_requires=["setuptools-scm"],
     use_scm_version=True,
     install_requires=parse_requirements() + parse_requirements("swh"),
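
With the standalone script dropped from setup.py, the same functionality is
reachable through the new celery-monitor group added below. A minimal sketch
of driving it programmatically, mirroring the test helper at the end of this
diff (the timeout value is arbitrary):

    from click.testing import CliRunner

    from swh.scheduler.cli import cli

    result = CliRunner().invoke(
        cli, ["celery-monitor", "--timeout", "1.5", "ping-workers"]
    )
    # Exit code 0 means at least one worker answered the ping, 1 means none did.
    print(result.exit_code)
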
diff --git a/swh/scheduler/cli/__init__.py b/swh/scheduler/cli/__init__.py
--- a/swh/scheduler/cli/__init__.py
+++ b/swh/scheduler/cli/__init__.py
@@ -37,19 +37,17 @@
     Use a local scheduler instance by default (plugged to the
     main scheduler db).
     """
+    try:
+        from psycopg2 import OperationalError
+    except ImportError:
+
+        class OperationalError(Exception):
+            pass
+
     from swh.core import config
-    from swh.scheduler.celery_backend.config import setup_log_handler
     from swh.scheduler import get_scheduler, DEFAULT_CONFIG
 
     ctx.ensure_object(dict)
-    log_level = ctx.obj.get("log_level", logging.INFO)
-
-    setup_log_handler(
-        loglevel=log_level,
-        colorize=False,
-        format="[%(levelname)s] %(name)s -- %(message)s",
-        log_console=not no_stdout,
-    )
 
     logger = logging.getLogger(__name__)
     scheduler = None
@@ -67,7 +65,7 @@
     try:
         logger.debug("Instantiating scheduler with %s" % (sched_conf))
         scheduler = get_scheduler(**sched_conf)
-    except ValueError:
+    except (ValueError, OperationalError):
         # it's the subcommand to decide whether not having a proper
         # scheduler instance is a problem.
         pass
@@ -76,7 +74,7 @@
     ctx.obj["config"] = conf
 
 
-from . import admin, task, task_type  # noqa
+from . import admin, celery_monitor, task, task_type  # noqa
 
 
 def main():
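
The try/except around the psycopg2 import above is an optional-dependency
guard: when the driver is not installed, a stand-in exception class keeps the
later "except (ValueError, OperationalError)" clause valid without importing
any database code. A condensed sketch of the pattern (the build_scheduler
callable is hypothetical):

    try:
        from psycopg2 import OperationalError
    except ImportError:

        class OperationalError(Exception):
            """Stand-in so except clauses referencing it keep working."""

    def instantiate(build_scheduler):
        try:
            return build_scheduler()
        except (ValueError, OperationalError):
            # Each subcommand decides whether a missing scheduler is fatal.
            return None
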
diff --git a/swh/scheduler/cli/celery_monitor.py b/swh/scheduler/cli/celery_monitor.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/cli/celery_monitor.py
@@ -0,0 +1,180 @@
+# Copyright (C) 2020  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from ast import literal_eval
+import csv
+import logging
+import sys
+import time
+from typing import Any, Dict, List, Optional
+
+from celery.utils.nodenames import gethostname
+import click
+from kombu.matcher import match
+
+from . import cli
+
+logger = logging.getLogger(__name__)
+
+
+def destination_from_host_type(
+    ctx: click.Context, host: Optional[str], type: Optional[str]
+):
+    """Get the celery destination pattern from host and type values"""
+    pattern: List[str] = ["celery@"]
+    if type:
+        pattern.append(type)
+    else:
+        pattern.append("*")
+    pattern.append(".")
+    if host is not None:
+        if host == "LOCAL":
+            host = gethostname()
+        assert host is not None
+        pattern.append(host)
+    else:
+        pattern.append("*")
+
+    rendered_pattern = "".join(pattern)
+
+    if host is None and type is None:
+        logger.debug("Matching all workers")
+    elif "*" in pattern:
+        ctx.obj["inspect"].pattern = rendered_pattern
+        ctx.obj["inspect"].matcher = "glob"
+        logger.debug("Using glob pattern %s", rendered_pattern)
+        ctx.obj["destination_filter"] = match("glob", rendered_pattern)
+    else:
+        ctx.obj["inspect"].destination = [rendered_pattern]
+        logger.debug("Using destination %s", rendered_pattern)
+
+
+@cli.group("celery-monitor")
+@click.option(
+    "--timeout", type=float, default=3.0, help="Timeout for celery remote control"
+)
+@click.option("--host", help="Filter by hostname", default=None)
+@click.option("--type", help="Filter by type", default=None)
+@click.pass_context
+def celery_monitor(
+    ctx: click.Context, timeout: float, host: Optional[str], type: Optional[str]
+) -> None:
+    """Monitoring of Celery."""
+    from swh.scheduler.celery_backend.config import app
+
+    ctx.obj["timeout"] = timeout
+    ctx.obj["inspect"] = app.control.inspect(timeout=timeout)
+
+    destination_from_host_type(ctx, host, type)
+
+
+@celery_monitor.command("ping-workers")
+@click.pass_context
+def ping_workers(ctx: click.Context) -> None:
+    """Check which workers respond to the celery remote control"""
+
+    response_times = {}
+
+    def ping_callback(response):
+        rtt = time.monotonic() - ping_time
+        for destination in response:
+            logger.debug("Got ping response from %s: %r", destination, response)
+            response_times[destination] = rtt
+
+    ctx.obj["inspect"].callback = ping_callback
+
+    ping_time = time.monotonic()
+    ret = ctx.obj["inspect"].ping()
+
+    if not ret:
+        logger.info("No response in %f seconds", time.monotonic() - ping_time)
+        ctx.exit(1)
+
+    for destination in ret:
+        logger.info(
+            "Got response from %s in %f seconds",
+            destination,
+            response_times[destination],
+        )
+
+    ctx.exit(0)
+
+
+@celery_monitor.command("list-running")
+@click.option(
+    "--format",
+    help="Output format",
+    default="pretty",
+    type=click.Choice(["pretty", "csv"]),
+)
+@click.pass_context
+def list_running(ctx: click.Context, format: str):
+    """List running tasks on the lister workers"""
+    response_times = {}
+
+    def active_callback(response):
+        rtt = time.monotonic() - active_time
+        for destination in response:
+            response_times[destination] = rtt
+
+    ctx.obj["inspect"].callback = active_callback
+
+    active_time = time.monotonic()
+    ret = ctx.obj["inspect"].active()
+
+    if not ret:
+        logger.info("No response in %f seconds", time.monotonic() - active_time)
+        ctx.exit(1)
+
+    def pretty_task_arguments(task: Dict[str, Any]) -> str:
+        arg_list = []
+        for arg in task["args"]:
+            arg_list.append(repr(arg))
+        for k, v in task["kwargs"].items():
+            arg_list.append(f"{k}={v!r}")
+
+        return f'{task["name"]}({", ".join(arg_list)})'
+
+    def get_task_data(worker: str, task: Dict[str, Any]) -> Dict[str, Any]:
+        duration = time.time() - task["time_start"]
+        return {
+            "worker": worker,
+            "name": task["name"],
+            "args": literal_eval(task["args"]),
+            "kwargs": literal_eval(task["kwargs"]),
+            "duration": duration,
+            "worker_pid": task["worker_pid"],
+        }
+
+    if format == "csv":
+        writer = csv.DictWriter(
+            sys.stdout, ["worker", "name", "args", "kwargs", "duration", "worker_pid"]
+        )
+        writer.writeheader()
+
+        def output(data: Dict[str, Any]):
+            writer.writerow(data)
+
+    elif format == "pretty":
+
+        def output(data: Dict[str, Any]):
+            print(
+                f"{data['worker']}: {pretty_task_arguments(data)} "
+                f"[for {data['duration']:f}s, pid={data['worker_pid']}]"
+            )
+
+    else:
+        logger.error("Unknown format %s", format)
+        ctx.exit(127)
+
+    for worker, active in sorted(ret.items()):
+        if not active:
+            logger.info("%s: no active tasks", worker)
+            continue
+
+        for task in sorted(active, key=lambda t: t["time_start"]):
+            output(get_task_data(worker, task))
+
+    ctx.exit(0)
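
The destination logic above renders Celery node names of the form
"celery@<type>.<host>", substituting "*" for whichever part was not given, and
only switches to glob matching when a wildcard ends up in the pattern. A
standalone sketch of that matching (the node names are illustrative):

    from fnmatch import fnmatch

    def render_pattern(host=None, type=None):
        return "celery@{}.{}".format(type or "*", host or "*")

    nodes = ["celery@loader.worker01", "celery@lister.worker02"]
    assert render_pattern(host="worker01", type="loader") == "celery@loader.worker01"
    assert [n for n in nodes if fnmatch(n, render_pattern(host="worker01"))] == [
        "celery@loader.worker01"
    ]
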
diff --git a/swh/scheduler/tests/test_cli_celery_monitor.py b/swh/scheduler/tests/test_cli_celery_monitor.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/tests/test_cli_celery_monitor.py
@@ -0,0 +1,122 @@
+# Copyright (C) 2020  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import logging
+
+from click.testing import CliRunner
+import pytest
+
+from swh.scheduler.cli import cli
+
+
+def invoke(*args, catch_exceptions=False):
+    result = CliRunner(mix_stderr=False).invoke(
+        cli, ["celery-monitor", *args], catch_exceptions=catch_exceptions,
+    )
+
+    return result
+
+
+def test_celery_monitor():
+    """Check that celery-monitor returns its help text"""
+
+    result = invoke()
+
+    assert "Commands:" in result.stdout
+    assert "Options:" in result.stdout
+
+
+def test_celery_monitor_ping_all(caplog, swh_app, celery_session_worker):
+    caplog.set_level(logging.DEBUG, "swh.scheduler.cli.celery_monitor")
+
+    result = invoke("ping-workers")
+
+    assert result.exit_code == 0
+
+    got_response_message = False
+    got_all_workers_message = False
+
+    for record in caplog.records:
+        # Check that the filtering didn't do anything
+        if record.levelname == "DEBUG":
+            if "Matching all workers" in record.message:
+                got_all_workers_message = True
+        # Check that the worker responded
+        if record.levelname == "INFO":
+            if f"response from {celery_session_worker.hostname}" in record.message:
+                got_response_message = True
+
+    assert got_all_workers_message
+    assert got_response_message
+
+
+@pytest.mark.parametrize(
+    "filter_args,filter_message",
+    [
+        (("--host", "test-host"), "Using glob pattern celery@*.test-host"),
+        (("--type", "test-type"), "Using glob pattern celery@test-type.*"),
+        (
+            ("--host", "test-host", "--type", "test-type"),
+            "Using destination celery@test-type.test-host",
+        ),
+    ],
+)
+def test_celery_monitor_ping_filter(
+    caplog, swh_app, celery_session_worker, filter_args, filter_message
+):
+    caplog.set_level(logging.DEBUG, "swh.scheduler.cli.celery_monitor")
+
+    result = invoke("--timeout", "1.5", *filter_args, "ping-workers")
+
+    assert result.exit_code == 1, result.stdout
+
+    got_no_response_message = False
+    got_filter_message = False
+
+    for record in caplog.records:
+        # Check the proper filter has been generated
+        if record.levelname == "DEBUG":
+            if filter_message in record.message:
+                got_filter_message = True
+        # Check that no worker responded
+        if record.levelname == "INFO":
+            if "No response in" in record.message:
+                got_no_response_message = True
+
+    assert got_filter_message
+    assert got_no_response_message
+
+
+def test_celery_monitor_list_running(caplog, swh_app, celery_session_worker):
+    caplog.set_level(logging.DEBUG, "swh.scheduler.cli.celery_monitor")
+
+    result = invoke("--timeout", "1.5", "list-running")
+
+    assert result.exit_code == 0, result.stdout
+
+    for record in caplog.records:
+        if record.levelname != "INFO":
+            continue
+        assert f"{celery_session_worker.hostname}: no active tasks" in record.message
+
+
+@pytest.mark.parametrize("format", ["csv", "pretty"])
+def test_celery_monitor_list_running_format(
+    caplog, swh_app, celery_session_worker, format
+):
+    caplog.set_level(logging.DEBUG, "swh.scheduler.cli.celery_monitor")
+
+    result = invoke("--timeout", "1.5", "list-running", "--format", format)
+
+    assert result.exit_code == 0, result.stdout
+
+    for record in caplog.records:
+        if record.levelname != "INFO":
+            continue
+        assert f"{celery_session_worker.hostname}: no active tasks" in record.message
+
+    if format == "csv":
+        lines = result.stdout.splitlines()
+        assert lines == ["worker,name,args,kwargs,duration,worker_pid"]
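
As the final assertion shows, an idle worker produces only the CSV header row.
The CSV output lends itself to machine consumption; a short sketch of reading
it downstream (the report.csv file name is hypothetical, e.g. redirected from
"swh scheduler celery-monitor list-running --format csv"):

    import csv

    with open("report.csv") as f:
        for row in csv.DictReader(f):
            print(row["worker"], row["name"], float(row["duration"]))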
