Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/cli/celery_monitor.py
- This file was added.
# Copyright (C) 2020 The Software Heritage developers | |||||
# See the AUTHORS file at the top-level directory of this distribution | |||||
# License: GNU General Public License version 3, or any later version | |||||
# See top-level LICENSE file for more information | |||||
from ast import literal_eval | |||||
import csv | |||||
import logging | |||||
import sys | |||||
import time | |||||
from typing import Any, Dict, List, Optional | |||||
from celery.utils.nodenames import gethostname | |||||
import click | |||||
from kombu.matcher import match | |||||
from . import cli | |||||
logger = logging.getLogger(__name__) | |||||
def destination_from_host_type(
    ctx: click.Context, host: Optional[str], type: Optional[str]
):
    """Get the celery destination pattern from host and type values

    The pattern is rendered as ``celery@<type>.<host>``. A falsy ``type`` or a
    ``host`` of ``None`` is replaced by the ``*`` wildcard, and the special
    host value ``LOCAL`` is replaced by the result of ``gethostname()``.

    The result is stored on ``ctx.obj["inspect"]``: as a glob ``pattern`` (with
    ``matcher`` set to ``"glob"``) when the rendered pattern contains a
    wildcard, or as a single-element ``destination`` list otherwise.

    Args:
        ctx: click context; ``ctx.obj["inspect"]`` must already be set.
        host: hostname to filter on, ``"LOCAL"`` for the current host, or
            ``None`` for any host.
        type: worker type to filter on, or ``None``/empty for any type.
    """
    pattern: List[str] = ["celery@"]

    pattern.append(type if type else "*")
    pattern.append(".")

    if host is not None:
        if host == "LOCAL":
            host = gethostname()
        # Presumably here to keep `host` narrowed to `str` for the type
        # checker after the reassignment above.
        assert host is not None
        pattern.append(host)
    else:
        pattern.append("*")

    rendered_pattern = "".join(pattern)

    # Look for the wildcard in the rendered string rather than in the list of
    # parts: a user-supplied value such as "loader*" is a single list element
    # that never equals "*", yet it still has to be treated as a glob.
    if "*" in rendered_pattern:
        ctx.obj["inspect"].pattern = rendered_pattern
        ctx.obj["inspect"].matcher = "glob"
        logger.debug("Using glob pattern %s", rendered_pattern)
        # NOTE(review): kombu's `match` appears to take (data, pattern,
        # matcher), in which case this tests the literal string "glob" against
        # the pattern and stores a boolean, not a reusable filter. Confirm
        # against the kombu.matcher documentation and the consumers of
        # "destination_filter" before relying on this value.
        ctx.obj["destination_filter"] = match("glob", rendered_pattern)
    else:
        ctx.obj["inspect"].destination = [rendered_pattern]
        logger.debug("Using destination %s", rendered_pattern)
@cli.group("celery-monitor")
@click.option(
    "--timeout", type=float, default=3.0, help="Timeout for celery remote control"
)
@click.option("--host", default=None, help="Filter by hostname")
@click.option("--type", default=None, help="Filter by type")
@click.pass_context
def celery_monitor(
    ctx: click.Context, timeout: float, host: Optional[str], type: Optional[str]
) -> None:
    """Monitoring of Celery."""
    # The celery application is imported inside the command body rather than
    # at module level, so it is only loaded when this group is invoked.
    from swh.scheduler.celery_backend.config import app

    # Shared state for the subcommands: the remote-control timeout and the
    # inspect handle built with that same timeout.
    shared_state = ctx.obj
    shared_state["timeout"] = timeout
    inspector = app.control.inspect(timeout=timeout)
    shared_state["inspect"] = inspector

    # Restrict the inspect handle to the workers selected by --host / --type.
    destination_from_host_type(ctx, host, type)
@celery_monitor.command("ping-workers")
@click.pass_context
def ping_workers(ctx: click.Context) -> None:
    """Check which workers respond to the celery remote control"""
    inspector = ctx.obj["inspect"]
    # Round-trip time per worker, filled in as the replies arrive.
    round_trips = {}

    def record_round_trip(reply):
        # Every destination named in one reply shares that reply's arrival
        # time, measured from the moment the ping was sent.
        elapsed = time.monotonic() - sent_at
        round_trips.update(dict.fromkeys(reply, elapsed))

    inspector.callback = record_round_trip

    sent_at = time.monotonic()
    replies = inspector.ping()

    if not replies:
        logger.info("No response in %f seconds", time.monotonic() - sent_at)
        # The code below relies on ctx.exit() not returning.
        ctx.exit(1)

    for worker in replies:
        logger.info(
            "Got response from %s in %f seconds", worker, round_trips[worker],
        )

    ctx.exit(0)
@celery_monitor.command("list-running")
@click.option(
    "--format",
    help="Output format",
    default="pretty",
    type=click.Choice(["pretty", "csv"]),
)
@click.pass_context
def list_running(ctx: click.Context, format: str):
    """List running tasks on the lister workers"""
    # Queries the workers selected on the parent group for their active tasks
    # and prints one line (or one CSV row) per task, ordered by worker name
    # and then by task start time.
    #
    # Exit codes: 0 on success, 1 when no worker replied, 127 on an unknown
    # output format.
    request_time = time.monotonic()
    ret = ctx.obj["inspect"].active()

    if not ret:
        logger.info("No response in %f seconds", time.monotonic() - request_time)
        # The code below relies on ctx.exit() not returning.
        ctx.exit(1)

    def parse_task_arguments(value: Any) -> Any:
        # The reply may carry args/kwargs either as the repr() string of a
        # Python literal or as an already-deserialized list/dict; only the
        # string form needs parsing (literal_eval rejects non-string input).
        if isinstance(value, str):
            return literal_eval(value)
        return value

    def pretty_task_arguments(task: Dict[str, Any]) -> str:
        # Render the task as a call expression: name(arg, ..., key=value, ...)
        arg_list = [repr(arg) for arg in task["args"]]
        arg_list.extend(f"{k}={v!r}" for k, v in task["kwargs"].items())
        return f'{task["name"]}({", ".join(arg_list)})'

    def get_task_data(worker: str, task: Dict[str, Any]) -> Dict[str, Any]:
        # "time_start" is compared against time.time(), so the duration is
        # expressed in seconds of wall-clock time.
        duration = time.time() - task["time_start"]
        return {
            "worker": worker,
            "name": task["name"],
            "args": parse_task_arguments(task["args"]),
            "kwargs": parse_task_arguments(task["kwargs"]),
            "duration": duration,
            "worker_pid": task["worker_pid"],
        }

    if format == "csv":
        writer = csv.DictWriter(
            sys.stdout, ["worker", "name", "args", "kwargs", "duration", "worker_pid"]
        )
        writer.writeheader()

        def output(data: Dict[str, Any]):
            writer.writerow(data)

    elif format == "pretty":

        def output(data: Dict[str, Any]):
            print(
                f"{data['worker']}: {pretty_task_arguments(data)} "
                f"[for {data['duration']:f}s, pid={data['worker_pid']}]"
            )

    else:
        # Defensive only: click.Choice already restricts --format.
        logger.error("Unknown format %s", format)
        ctx.exit(127)

    for worker, active in sorted(ret.items()):
        if not active:
            logger.info("%s: no active tasks", worker)
            continue

        for task in sorted(active, key=lambda t: t["time_start"]):
            output(get_task_data(worker, task))

    ctx.exit(0)