#!/usr/bin/python3

# Copyright (C) 2017  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import datetime
from fnmatch import fnmatch
from operator import itemgetter
import os
import sys

import click


def list_remote_workers(inspect):
    """Ping the workers and describe every one that answered.

    Args:
        inspect: object whose ``ping()`` method returns a mapping keyed by
            worker name (or a falsy value when nobody answered).

    Returns:
        A dict mapping each supported worker name to a dict with the keys
        ``name``, ``host`` and ``type``. Names are expected to look like
        ``celery@<type>.<host>``; anything else is reported on stderr and
        skipped.
    """
    ping_replies = inspect.ping()
    if not ping_replies:
        return {}

    prefix = 'celery@'
    ret = {}
    for worker_name in sorted(ping_replies):
        if not worker_name.startswith(prefix):
            print('Unsupported worker: %s' % worker_name, file=sys.stderr)
            continue

        # partition() never raises, unlike unpacking the result of split()
        # when the name carries no '.' separator.
        worker_type, sep, host = worker_name[len(prefix):].partition('.')
        if not sep:
            print('Unsupported worker: %s' % worker_name, file=sys.stderr)
            continue

        ret[worker_name] = {
            'name': worker_name,
            'host': host,
            'type': worker_type,
        }

    return ret


def make_filters(filter_host, filter_type):
    """Parse the filters and create test functions

    Args:
        filter_host: iterable of glob patterns matched against the ``host``
            entry of a worker.
        filter_type: iterable of glob patterns matched against the ``type``
            entry of a worker.

    Returns:
        A list of one-argument callables, one per pattern, each taking a
        worker dict and returning a boolean. A pattern starting with ``-``
        yields a callable that is true when the rest of the pattern does
        *not* match.
    """

    def build(field, pattern):
        if pattern.startswith('-'):
            excluded = pattern[1:]
            return lambda worker: not fnmatch(worker[field], excluded)
        return lambda worker: fnmatch(worker[field], pattern)

    filters = [build('host', host) for host in filter_host]
    filters.extend(build('type', type_) for type_ in filter_type)
    return filters


def filter_workers(workers, filters):
    """Filter workers according to the set criteria

    Args:
        workers: dict mapping worker names to worker dicts.
        filters: iterable of callables as returned by make_filters.

    Returns:
        A new dict holding only the workers for which every filter is true
        (all of them when there are no filters).
    """
    return {name: worker
            for name, worker in workers.items()
            if all(check(worker) for check in filters)}


def get_clock_offsets(workers, inspect):
    """Add a clock_offset entry for each worker

    The offset is the current UTC time minus the ``monotonic`` value
    reported by the worker, so that adding a monotonic timestamp to it
    gives a wall clock time (see worker_to_wallclock).

    Args:
        workers: dict mapping worker names to worker dicts; updated in place.
        inspect: object whose ``_request('monotonic')`` returns a mapping of
            worker name to a dict holding a ``monotonic`` entry.

    Every worker in ``workers`` gets an offset: when no usable value was
    received for it, an error is printed and 0 is used instead. Replies from
    workers that are not in ``workers`` are ignored.
    """
    err_msg = 'Could not get monotonic clock for {worker}'

    now = datetime.datetime.now(tz=datetime.timezone.utc)
    # Like the other inspect calls in this file, the reply can be falsy when
    # no worker answered.
    replies = inspect._request('monotonic') or {}

    for name, worker in workers.items():
        monotonic = (replies.get(name) or {}).get('monotonic')
        if monotonic is None:
            monotonic = 0
            click.echo(err_msg.format(worker=name), err=True)

        worker['clock_offset'] = now - datetime.timedelta(seconds=monotonic)


def worker_to_wallclock(worker, monotonic):
    """Convert a monotonic timestamp from a worker to a wall clock time

    Args:
        worker: worker dict carrying a ``clock_offset`` datetime.
        monotonic: number of seconds on the worker's monotonic clock.

    Returns:
        ``worker['clock_offset']`` shifted by ``monotonic`` seconds.
    """
    return worker['clock_offset'] + datetime.timedelta(seconds=monotonic)


def _require_workers(ctx):
    """Return the selected workers, or exit with status 2 if there are none.

    NOTE(review): an empty ``destination`` list is presumably handled by
    celery as "every worker"; bailing out here keeps filters that match
    nothing from reaching workers the user did not select -- confirm against
    the celery remote control documentation.
    """
    workers = ctx.obj['workers']
    if not workers:
        click.echo('No reply from workers', err=True)
        sys.exit(2)
    return workers


# The parameter is called ``type`` because click derives it from ``--type``.
@click.group()
@click.option('--instance-config', metavar='CONFIG', default=None,
              help='Use this worker instance configuration')
@click.option('--host', metavar='HOSTNAME_FILTER', multiple=True,
              help='Filter by hostname')
@click.option('--type', metavar='WORKER_TYPE_FILTER', multiple=True,
              help='Filter by worker type')
@click.option('--timeout', metavar='TIMEOUT', type=float, default=1.0,
              help='Timeout for remote control communication')
@click.option('--debug/--no-debug', default=False, help='Turn on debugging')
@click.pass_context
def cli(ctx, debug, timeout, instance_config, host, type):
    """Manage the Software Heritage workers

    Filters support globs; a filter starting with a "-" excludes the
    corresponding values.

    """
    if instance_config:
        # Must be set before the celery configuration module is imported.
        os.environ['SWH_WORKER_INSTANCE'] = instance_config

    from swh.scheduler.celery_backend.config import app

    # First ask every worker, then narrow down to the ones passing filters.
    full_inspect = app.control.inspect(timeout=timeout)
    workers = filter_workers(
        list_remote_workers(full_inspect),
        make_filters(host, type)
    )
    ctx.obj['workers'] = workers

    destination = list(workers)
    inspect = app.control.inspect(destination=destination,
                                  timeout=timeout)
    ctx.obj['inspect'] = inspect

    get_clock_offsets(workers, inspect)

    ctx.obj['control'] = app.control
    ctx.obj['destination'] = destination
    ctx.obj['timeout'] = timeout
    ctx.obj['debug'] = debug


@cli.command()
@click.pass_context
def list_workers(ctx):
    """List the currently running workers"""
    workers = ctx.obj['workers']

    for worker_name in sorted(workers):
        click.echo("{type} alive on {host}".format(**workers[worker_name]))

    # Exit status 2 signals that no worker matched.
    if not workers:
        sys.exit(2)


@cli.command()
@click.pass_context
def list_tasks(ctx):
    """List the tasks currently running on workers"""
    task_template = ('{worker} {name}'
                     '[{id} '
                     'started={started:%Y-%m-%dT%H:%M:%S} '
                     'pid={worker_pid}] {args} {kwargs}')
    workers = _require_workers(ctx)
    inspect = ctx.obj['inspect']

    active = inspect.active()

    if not active:
        click.echo('No reply from workers', err=True)
        sys.exit(2)

    has_tasks = False
    for worker_name, tasks in sorted(active.items()):
        worker = workers.get(worker_name)
        if worker is None:
            # Reply from a worker that was not selected by the filters.
            continue
        if not tasks:
            click.echo("No active tasks on {name}".format(**worker), err=True)
        for task in sorted(tasks, key=itemgetter('time_start')):
            # time_start is on the worker's monotonic clock.
            task['started'] = worker_to_wallclock(worker, task['time_start'])
            click.echo(task_template.format(worker=worker_name, **task))
            has_tasks = True

    if not has_tasks:
        sys.exit(2)


@cli.command()
@click.pass_context
def list_queues(ctx):
    """List all the queues currently enabled on the workers"""
    workers = _require_workers(ctx)
    inspect = ctx.obj['inspect']

    active = inspect.active_queues()

    if not active:
        click.echo('No reply from workers', err=True)
        sys.exit(2)

    has_queues = False
    for worker_name, queues in sorted(active.items()):
        if worker_name not in workers:
            # Reply from a worker that was not selected by the filters.
            continue
        queues = sorted(queue['name'] for queue in queues)
        if queues:
            click.echo('{worker} {queues}'.format(worker=worker_name,
                                                  queues=' '.join(queues)))
            has_queues = True
        else:
            click.echo('No queues for {worker}'.format(worker=worker_name),
                       err=True)

    if not has_queues:
        sys.exit(2)


@cli.command()
@click.option('--noop', is_flag=True, default=False, help='Do not proceed')
@click.argument('queues', nargs=-1)
@click.pass_context
def remove_queues(ctx, noop, queues):
    """Cancel the queue for the given workers"""
    msg_template = 'Canceling queue {queue} on worker {worker}{noop}'

    workers = _require_workers(ctx)
    inspect = ctx.obj['inspect']
    control = ctx.obj['control']
    timeout = ctx.obj['timeout']
    active = inspect.active_queues()

    # Without explicit queue patterns, every active queue is cancelled.
    if not queues:
        queues = ['*']

    if not active:
        click.echo('No reply from workers', err=True)
        sys.exit(2)

    for worker, active_queues in sorted(active.items()):
        if worker not in workers:
            # Never touch a worker that was not selected by the filters.
            continue
        for queue in sorted(active_queues, key=itemgetter('name')):
            if any(fnmatch(queue['name'], name) for name in queues):
                msg = msg_template.format(queue=queue['name'], worker=worker,
                                          noop=' (noop)' if noop else '')
                click.echo(msg, err=True)
                if not noop:
                    control.cancel_consumer(queue['name'],
                                            destination=[worker],
                                            timeout=timeout)


@cli.command()
@click.option('--noop', is_flag=True, default=False, help='Do not proceed')
@click.argument('queues', nargs=-1)
@click.pass_context
def add_queues(ctx, noop, queues):
    """Start the queue for the given workers"""
    msg_template = 'Starting queue {queue} on worker {worker}{noop}'

    control = ctx.obj['control']
    timeout = ctx.obj['timeout']
    workers = _require_workers(ctx)

    for worker in sorted(workers):
        for queue in queues:
            msg = msg_template.format(queue=queue, worker=worker,
                                      noop=' (noop)' if noop else '')
            click.echo(msg, err=True)
            if not noop:
                ret = control.add_consumer(queue,
                                           destination=[worker],
                                           timeout=timeout)
                # Show the reply of the worker on stdout.
                click.echo(ret)


if __name__ == '__main__':
    cli(obj={})