Software Heritage forge, file F9697718: D3248.id11529.diff (19 KB)
Attached to D3248: Replace swh-worker-control with a swh scheduler celery-monitor subcommand
diff --git a/bin/swh-worker-control b/bin/swh-worker-control
deleted file mode 100755
--- a/bin/swh-worker-control
+++ /dev/null
@@ -1,284 +0,0 @@
-#!/usr/bin/env python3
-
-# Copyright (C) 2017 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import datetime
-from fnmatch import fnmatch
-from operator import itemgetter
-import os
-import sys
-
-import click
-
-
-def list_remote_workers(inspect):
- ping_replies = inspect.ping()
- if not ping_replies:
- return {}
- workers = list(sorted(ping_replies))
- ret = {}
-
- for worker_name in workers:
- if not worker_name.startswith("celery@"):
- print("Unsupported worker: %s" % worker_name, file=sys.stderr)
- continue
- type, host = worker_name[len("celery@") :].split(".", 1)
- worker = {
- "name": worker_name,
- "host": host,
- "type": type,
- }
- ret[worker_name] = worker
-
- return ret
-
-
-def make_filters(filter_host, filter_type):
- """Parse the filters and create test functions"""
-
- def include(field, value):
- def filter(worker, field=field, value=value):
- return fnmatch(worker[field], value)
-
- return filter
-
- def exclude(field, value):
- def filter(worker, field=field, value=value):
- return not fnmatch(worker[field], value)
-
- return filter
-
- filters = []
- for host in filter_host:
- if host.startswith("-"):
- filters.append(exclude("host", host[1:]))
- else:
- filters.append(include("host", host))
-
- for type_ in filter_type:
- if type_.startswith("-"):
- filters.append(exclude("type", type_[1:]))
- else:
- filters.append(include("type", type_))
-
- return filters
-
-
-def filter_workers(workers, filters):
- """Filter workers according to the set criteria"""
- return {
- name: worker
- for name, worker in workers.items()
- if all(check(worker) for check in filters)
- }
-
-
-def get_clock_offsets(workers, inspect):
- """Add a clock_offset entry for each worker"""
- err_msg = "Could not get monotonic clock for {worker}"
-
- t = datetime.datetime.now(tz=datetime.timezone.utc)
- for worker, clock in inspect._request("monotonic").items():
- monotonic = clock.get("monotonic")
- if monotonic is None:
- monotonic = 0
- click.echo(err_msg.format(worker=worker), err=True)
- dt = datetime.timedelta(seconds=monotonic)
- workers[worker]["clock_offset"] = t - dt
-
-
-def worker_to_wallclock(worker, monotonic):
- """Convert a monotonic timestamp from a worker to a wall clock time"""
- dt = datetime.timedelta(seconds=monotonic)
- return worker["clock_offset"] + dt
-
-
-@click.group()
-@click.option(
- "--instance-config",
- metavar="CONFIG",
- default=None,
- help="Use this worker instance configuration",
-)
-@click.option(
- "--host", metavar="HOSTNAME_FILTER", multiple=True, help="Filter by hostname"
-)
-@click.option(
- "--type", metavar="WORKER_TYPE_FILTER", multiple=True, help="Filter by worker type"
-)
-@click.option(
- "--timeout",
- metavar="TIMEOUT",
- type=float,
- default=1.0,
- help="Timeout for remote control communication",
-)
-@click.option("--debug/--no-debug", default=False, help="Turn on debugging")
-@click.pass_context
-def cli(ctx, debug, timeout, instance_config, host, type):
- """Manage the Software Heritage workers
-
- Filters support globs; a filter starting with a "-" excludes the
- corresponding values.
-
- """
- if instance_config:
- os.environ["SWH_WORKER_INSTANCE"] = instance_config
-
- from swh.scheduler.celery_backend.config import app
-
- full_inspect = app.control.inspect(timeout=timeout)
-
- workers = filter_workers(
- list_remote_workers(full_inspect), make_filters(host, type)
- )
- ctx.obj["workers"] = workers
-
- destination = list(workers)
- inspect = app.control.inspect(destination=destination, timeout=timeout)
- ctx.obj["inspect"] = inspect
-
- get_clock_offsets(workers, inspect)
-
- ctx.obj["control"] = app.control
- ctx.obj["destination"] = destination
- ctx.obj["timeout"] = timeout
- ctx.obj["debug"] = debug
-
-
-@cli.command()
-@click.pass_context
-def list_workers(ctx):
- """List the currently running workers"""
- workers = ctx.obj["workers"]
-
- for worker_name, worker in sorted(workers.items()):
- click.echo("{type} alive on {host}".format(**worker))
-
- if not workers:
- sys.exit(2)
-
-
-@cli.command()
-@click.pass_context
-def list_tasks(ctx):
- """List the tasks currently running on workers"""
- task_template = (
- "{worker} {name}"
- "[{id} "
- "started={started:%Y-%m-%mT%H:%M:%S} "
- "pid={worker_pid}] {args} {kwargs}"
- )
- inspect = ctx.obj["inspect"]
- workers = ctx.obj["workers"]
- active = inspect.active()
-
- if not active:
- click.echo("No reply from workers", err=True)
- sys.exit(2)
-
- has_tasks = False
- for worker_name, tasks in sorted(active.items()):
- worker = workers[worker_name]
- if not tasks:
- click.echo("No active tasks on {name}".format(**worker), err=True)
- for task in sorted(tasks, key=itemgetter("time_start")):
- task["started"] = worker_to_wallclock(worker, task["time_start"])
- click.echo(task_template.format(worker=worker_name, **task))
- has_tasks = True
-
- if not has_tasks:
- sys.exit(2)
-
-
-@cli.command()
-@click.pass_context
-def list_queues(ctx):
- """List all the queues currently enabled on the workers"""
- inspect = ctx.obj["inspect"]
- active = inspect.active_queues()
-
- if not active:
- click.echo("No reply from workers", err=True)
- sys.exit(2)
-
- has_queues = False
- for worker_name, queues in sorted(active.items()):
- queues = sorted(queue["name"] for queue in queues)
- if queues:
- click.echo(
- "{worker} {queues}".format(worker=worker_name, queues=" ".join(queues))
- )
- has_queues = True
- else:
- click.echo("No queues for {worker}".format(worker=worker_name), err=True)
-
- if not has_queues:
- sys.exit(2)
-
-
-@cli.command()
-@click.option("--noop", is_flag=True, default=False, help="Do not proceed")
-@click.argument("queues", nargs=-1)
-@click.pass_context
-def remove_queues(ctx, noop, queues):
- """Cancel the queue for the given workers"""
- msg_template = "Canceling queue {queue} on worker {worker}{noop}"
-
- inspect = ctx.obj["inspect"]
- control = ctx.obj["control"]
- timeout = ctx.obj["timeout"]
- active = inspect.active_queues()
-
- if not queues:
- queues = ["*"]
-
- if not active:
- click.echo("No reply from workers", err=True)
- sys.exit(2)
-
- for worker, active_queues in sorted(active.items()):
- for queue in sorted(active_queues, key=itemgetter("name")):
- if any(fnmatch(queue["name"], name) for name in queues):
- msg = msg_template.format(
- queue=queue["name"], worker=worker, noop=" (noop)" if noop else ""
- )
- click.echo(msg, err=True)
- if not noop:
- control.cancel_consumer(
- queue["name"], destination=[worker], timeout=timeout
- )
-
-
-@cli.command()
-@click.option("--noop", is_flag=True, default=False, help="Do not proceed")
-@click.argument("queues", nargs=-1)
-@click.pass_context
-def add_queues(ctx, noop, queues):
- """Start the queue for the given workers"""
- msg_template = "Starting queue {queue} on worker {worker}{noop}"
-
- control = ctx.obj["control"]
- timeout = ctx.obj["timeout"]
- workers = ctx.obj["workers"]
-
- if not workers:
- click.echo("No reply from workers", err=True)
- sys.exit(2)
-
- for worker in sorted(workers):
- for queue in queues:
- msg = msg_template.format(
- queue=queue, worker=worker, noop=" (noop)" if noop else ""
- )
- click.echo(msg, err=True)
- if not noop:
- ret = control.add_consumer(queue, destination=[worker], timeout=timeout)
- print(ret)
-
-
-if __name__ == "__main__":
- cli(obj={})
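
Note: the deleted script above is a thin wrapper around Celery's remote-control API. For orientation, a minimal sketch of the underlying primitives it used (the queue and worker names are hypothetical):

    from swh.scheduler.celery_backend.config import app

    # Inspect running workers; each call returns None when nobody replies.
    inspect = app.control.inspect(timeout=1.0)
    print(inspect.ping())           # e.g. {"celery@loader.host01": {"ok": "pong"}}
    print(inspect.active())         # tasks currently executing, per worker
    print(inspect.active_queues())  # queues each worker consumes from

    # Start/stop consuming a queue on one specific worker node.
    app.control.add_consumer("some-queue", destination=["celery@loader.host01"])
    app.control.cancel_consumer("some-queue", destination=["celery@loader.host01"])
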
diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -45,7 +45,6 @@
author_email="swh-devel@inria.fr",
url="https://forge.softwareheritage.org/diffusion/DSCH/",
packages=find_packages(),
- scripts=["bin/swh-worker-control"],
setup_requires=["setuptools-scm"],
use_scm_version=True,
install_requires=parse_requirements() + parse_requirements("swh"),
diff --git a/swh/scheduler/cli/__init__.py b/swh/scheduler/cli/__init__.py
--- a/swh/scheduler/cli/__init__.py
+++ b/swh/scheduler/cli/__init__.py
@@ -37,19 +37,17 @@
Use a local scheduler instance by default (plugged to the
main scheduler db).
"""
+ try:
+ from psycopg2 import OperationalError
+ except ImportError:
+
+ class OperationalError(Exception):
+ pass
+
from swh.core import config
- from swh.scheduler.celery_backend.config import setup_log_handler
from swh.scheduler import get_scheduler, DEFAULT_CONFIG
ctx.ensure_object(dict)
- log_level = ctx.obj.get("log_level", logging.INFO)
-
- setup_log_handler(
- loglevel=log_level,
- colorize=False,
- format="[%(levelname)s] %(name)s -- %(message)s",
- log_console=not no_stdout,
- )
logger = logging.getLogger(__name__)
scheduler = None
@@ -67,7 +65,7 @@
try:
logger.debug("Instantiating scheduler with %s" % (sched_conf))
scheduler = get_scheduler(**sched_conf)
- except ValueError:
+ except (ValueError, OperationalError):
# it's the subcommand to decide whether not having a proper
# scheduler instance is a problem.
pass
@@ -76,7 +74,7 @@
ctx.obj["config"] = conf
-from . import admin, task, task_type # noqa
+from . import admin, celery_monitor, task, task_type # noqa
def main():
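
Note: the hunk above makes psycopg2 optional at import time. A standalone sketch of the fallback pattern, assuming (as the diff does) that get_scheduler raises ValueError on bad configuration and psycopg2.OperationalError when the database is unreachable; get_scheduler_or_none is a hypothetical helper name:

    try:
        from psycopg2 import OperationalError
    except ImportError:
        # Stand-in class so the except clause below still has something
        # to catch when psycopg2 is not installed.
        class OperationalError(Exception):
            pass

    def get_scheduler_or_none(**sched_conf):
        from swh.scheduler import get_scheduler

        try:
            return get_scheduler(**sched_conf)
        except (ValueError, OperationalError):
            # Each subcommand decides whether a missing scheduler is fatal.
            return None
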
diff --git a/swh/scheduler/cli/celery_monitor.py b/swh/scheduler/cli/celery_monitor.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/cli/celery_monitor.py
@@ -0,0 +1,180 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from ast import literal_eval
+import csv
+import logging
+import sys
+import time
+from typing import Any, Dict, List, Optional
+
+from celery.utils.nodenames import gethostname
+import click
+from kombu.matcher import match
+
+from . import cli
+
+logger = logging.getLogger(__name__)
+
+
+def destination_from_host_type(
+ ctx: click.Context, host: Optional[str], type: Optional[str]
+):
+ """Get the celery destination pattern from host and type values"""
+ pattern: List[str] = ["celery@"]
+ if type:
+ pattern.append(type)
+ else:
+ pattern.append("*")
+ pattern.append(".")
+ if host is not None:
+ if host == "LOCAL":
+ host = gethostname()
+ assert host is not None
+ pattern.append(host)
+ else:
+ pattern.append("*")
+
+ rendered_pattern = "".join(pattern)
+
+ if host is None and type is None:
+ logger.debug("Matching all workers")
+ elif "*" in pattern:
+ ctx.obj["inspect"].pattern = rendered_pattern
+ ctx.obj["inspect"].matcher = "glob"
+ logger.debug("Using glob pattern %s", rendered_pattern)
+ ctx.obj["destination_filter"] = match("glob", rendered_pattern)
+ else:
+ ctx.obj["inspect"].destination = [rendered_pattern]
+ logger.debug("Using destination %s", rendered_pattern)
+
+
+@cli.group("celery-monitor")
+@click.option(
+ "--timeout", type=float, default=3.0, help="Timeout for celery remote control"
+)
+@click.option("--host", help="Filter by hostname", default=None)
+@click.option("--type", help="Filter by type", default=None)
+@click.pass_context
+def celery_monitor(
+ ctx: click.Context, timeout: float, host: Optional[str], type: Optional[str]
+) -> None:
+ """Monitoring of Celery."""
+ from swh.scheduler.celery_backend.config import app
+
+ ctx.obj["timeout"] = timeout
+ ctx.obj["inspect"] = app.control.inspect(timeout=timeout)
+
+ destination_from_host_type(ctx, host, type)
+
+
+@celery_monitor.command("ping-workers")
+@click.pass_context
+def ping_workers(ctx: click.Context) -> None:
+ """Check which workers respond to the celery remote control"""
+
+ response_times = {}
+
+ def ping_callback(response):
+ rtt = time.monotonic() - ping_time
+ for destination in response:
+ logger.debug("Got ping response from %s: %r", destination, response)
+ response_times[destination] = rtt
+
+ ctx.obj["inspect"].callback = ping_callback
+
+ ping_time = time.monotonic()
+ ret = ctx.obj["inspect"].ping()
+
+ if not ret:
+ logger.info("No response in %f seconds", time.monotonic() - ping_time)
+ ctx.exit(1)
+
+ for destination in ret:
+ logger.info(
+ "Got response from %s in %f seconds",
+ destination,
+ response_times[destination],
+ )
+
+ ctx.exit(0)
+
+
+@celery_monitor.command("list-running")
+@click.option(
+ "--format",
+ help="Output format",
+ default="pretty",
+ type=click.Choice(["pretty", "csv"]),
+)
+@click.pass_context
+def list_running(ctx: click.Context, format: str):
+ """List running tasks on the lister workers"""
+ response_times = {}
+
+ def active_callback(response):
+ rtt = time.monotonic() - active_time
+ for destination in response:
+ response_times[destination] = rtt
+
+ ctx.obj["inspect"].callback = active_callback
+
+ active_time = time.monotonic()
+ ret = ctx.obj["inspect"].active()
+
+ if not ret:
+ logger.info("No response in %f seconds", time.monotonic() - active_time)
+ ctx.exit(1)
+
+ def pretty_task_arguments(task: Dict[str, Any]) -> str:
+ arg_list = []
+ for arg in task["args"]:
+ arg_list.append(repr(arg))
+ for k, v in task["kwargs"].items():
+ arg_list.append(f"{k}={v!r}")
+
+ return f'{task["name"]}({", ".join(arg_list)})'
+
+ def get_task_data(worker: str, task: Dict[str, Any]) -> Dict[str, Any]:
+ duration = time.time() - task["time_start"]
+ return {
+ "worker": worker,
+ "name": task["name"],
+ "args": literal_eval(task["args"]),
+ "kwargs": literal_eval(task["kwargs"]),
+ "duration": duration,
+ "worker_pid": task["worker_pid"],
+ }
+
+ if format == "csv":
+ writer = csv.DictWriter(
+ sys.stdout, ["worker", "name", "args", "kwargs", "duration", "worker_pid"]
+ )
+ writer.writeheader()
+
+ def output(data: Dict[str, Any]):
+ writer.writerow(data)
+
+ elif format == "pretty":
+
+ def output(data: Dict[str, Any]):
+ print(
+ f"{data['worker']}: {pretty_task_arguments(data)} "
+ f"[for {data['duration']:f}s, pid={data['worker_pid']}]"
+ )
+
+ else:
+ logger.error("Unknown format %s", format)
+ ctx.exit(127)
+
+ for worker, active in sorted(ret.items()):
+ if not active:
+ logger.info("%s: no active tasks", worker)
+ continue
+
+ for task in sorted(active, key=lambda t: t["time_start"]):
+ output(get_task_data(worker, task))
+
+ ctx.exit(0)
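
Note: destination_from_host_type above encodes the worker naming scheme celery@<type>.<host>. A dependency-free sketch of how the pattern is assembled, with fnmatch standing in for kombu.matcher's "glob" matcher ("worker01" and "loader" are hypothetical host and worker-type names):

    from fnmatch import fnmatch

    def render_pattern(host=None, type=None):
        return f"celery@{type or '*'}.{host or '*'}"

    assert render_pattern() == "celery@*.*"
    assert render_pattern(host="worker01") == "celery@*.worker01"
    assert render_pattern(type="loader") == "celery@loader.*"
    # No wildcard left: usable directly as an explicit inspect destination.
    assert render_pattern(host="worker01", type="loader") == "celery@loader.worker01"
    # With a wildcard, worker names are filtered by glob matching instead:
    assert fnmatch("celery@loader.worker01", render_pattern(type="loader"))
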
diff --git a/swh/scheduler/tests/test_cli_celery_monitor.py b/swh/scheduler/tests/test_cli_celery_monitor.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/tests/test_cli_celery_monitor.py
@@ -0,0 +1,122 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import logging
+
+from click.testing import CliRunner
+import pytest
+
+from swh.scheduler.cli import cli
+
+
+def invoke(*args, catch_exceptions=False):
+ result = CliRunner(mix_stderr=False).invoke(
+ cli, ["celery-monitor", *args], catch_exceptions=catch_exceptions,
+ )
+
+ return result
+
+
+def test_celery_monitor():
+ """Check that celery-monitor returns its help text"""
+
+ result = invoke()
+
+ assert "Commands:" in result.stdout
+ assert "Options:" in result.stdout
+
+
+def test_celery_monitor_ping_all(caplog, swh_app, celery_session_worker):
+ caplog.set_level(logging.DEBUG, "swh.scheduler.cli.celery_monitor")
+
+ result = invoke("ping-workers")
+
+ assert result.exit_code == 0
+
+ got_response_message = False
+ got_all_workers_message = False
+
+ for record in caplog.records:
+ # Check that the filtering didn't do anything
+ if record.levelname == "DEBUG":
+ if "Matching all workers" in record.message:
+ got_all_workers_message = True
+ # Check that the worker responded
+ if record.levelname == "INFO":
+ if f"response from {celery_session_worker.hostname}" in record.message:
+ got_response_message = True
+
+ assert got_all_workers_message
+ assert got_response_message
+
+
+@pytest.mark.parametrize(
+ "filter_args,filter_message",
+ [
+ (("--host", "test-host"), "Using glob pattern celery@*.test-host"),
+ (("--type", "test-type"), "Using glob pattern celery@test-type.*"),
+ (
+ ("--host", "test-host", "--type", "test-type"),
+ "Using destination celery@test-type.test-host",
+ ),
+ ],
+)
+def test_celery_monitor_ping_filter(
+ caplog, swh_app, celery_session_worker, filter_args, filter_message
+):
+ caplog.set_level(logging.DEBUG, "swh.scheduler.cli.celery_monitor")
+
+ result = invoke("--timeout", "1.5", *filter_args, "ping-workers")
+
+ assert result.exit_code == 1, result.stdout
+
+ got_no_response_message = False
+ got_filter_message = False
+
+ for record in caplog.records:
+ # Check the proper filter has been generated
+ if record.levelname == "DEBUG":
+ if filter_message in record.message:
+ got_filter_message = True
+ # Check that no worker responded
+ if record.levelname == "INFO":
+ if "No response in" in record.message:
+ got_no_response_message = True
+
+ assert got_filter_message
+ assert got_no_response_message
+
+
+def test_celery_monitor_list_running(caplog, swh_app, celery_session_worker):
+ caplog.set_level(logging.DEBUG, "swh.scheduler.cli.celery_monitor")
+
+ result = invoke("--timeout", "1.5", "list-running")
+
+ assert result.exit_code == 0, result.stdout
+
+ for record in caplog.records:
+ if record.levelname != "INFO":
+ continue
+ assert f"{celery_session_worker.hostname}: no active tasks" in record.message
+
+
+@pytest.mark.parametrize("format", ["csv", "pretty"])
+def test_celery_monitor_list_running_format(
+ caplog, swh_app, celery_session_worker, format
+):
+ caplog.set_level(logging.DEBUG, "swh.scheduler.cli.celery_monitor")
+
+ result = invoke("--timeout", "1.5", "list-running", "--format", format)
+
+ assert result.exit_code == 0, result.stdout
+
+ for record in caplog.records:
+ if record.levelname != "INFO":
+ continue
+ assert f"{celery_session_worker.hostname}: no active tasks" in record.message
+
+ if format == "csv":
+ lines = result.stdout.splitlines()
+ assert lines == ["worker,name,args,kwargs,duration,worker_pid"]
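
Note: a usage sketch of the new subcommand, driven through click's CliRunner exactly as the tests above do; "loader" is a hypothetical worker type, and a reachable Celery broker is assumed. Per the revision title, the same commands are available from a shell as swh scheduler celery-monitor ...:

    from click.testing import CliRunner

    from swh.scheduler.cli import cli

    runner = CliRunner(mix_stderr=False)

    # Ping all workers; exits 1 when none answers within the timeout.
    result = runner.invoke(cli, ["celery-monitor", "--timeout", "1.5", "ping-workers"])
    print(result.exit_code)

    # List tasks running on loader-type workers only, in CSV form.
    result = runner.invoke(
        cli,
        ["celery-monitor", "--type", "loader", "list-running", "--format", "csv"],
    )
    print(result.stdout)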