diff --git a/bin/swh-worker-control b/bin/swh-worker-control
new file mode 100755
index 0000000..4e24436
--- /dev/null
+++ b/bin/swh-worker-control
@@ -0,0 +1,240 @@
+#!/usr/bin/python3
+
+# Copyright (C) 2017 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from fnmatch import fnmatch
+from operator import itemgetter
+import os
+import sys
+
+import click
+
+
+def list_remote_workers(inspect):
+    ping_replies = inspect.ping()
+    if not ping_replies:
+        return {}
+    workers = list(sorted(ping_replies))
+    ret = {}
+
+    for worker_name in workers:
+        if not worker_name.startswith('celery@'):
+            print('Unsupported worker: %s' % worker_name, file=sys.stderr)
+            continue
+        type, host = worker_name[len('celery@'):].split('.', 1)
+        worker = {
+            'name': worker_name,
+            'host': host,
+            'type': type,
+        }
+        ret[worker_name] = worker
+
+    return ret
+
+
+def make_filters(filter_host, filter_type):
+    """Parse the filters and create test functions"""
+
+    def include(field, value):
+        def filter(worker, field=field, value=value):
+            return fnmatch(worker[field], value)
+        return filter
+
+    def exclude(field, value):
+        def filter(worker, field=field, value=value):
+            return not fnmatch(worker[field], value)
+        return filter
+
+    filters = []
+    for host in filter_host:
+        if host.startswith('-'):
+            filters.append(exclude('host', host[1:]))
+        else:
+            filters.append(include('host', host))
+
+    for type_ in filter_type:
+        if type_.startswith('-'):
+            filters.append(exclude('type', type_[1:]))
+        else:
+            filters.append(include('type', type_))
+
+    return filters
+
+
+def filter_workers(workers, filters):
+    """Filter workers according to the set criteria"""
+    return {name: worker
+            for name, worker in workers.items()
+            if all(check(worker) for check in filters)}
+
+
+@click.group()
+@click.option('--instance-config', metavar='CONFIG', default=None,
+              help='Use this worker instance configuration')
+@click.option('--host', metavar='HOSTNAME_FILTER', multiple=True,
+              help='Filter by hostname')
+@click.option('--type', metavar='WORKER_TYPE_FILTER', multiple=True,
+              help='Filter by worker type')
+@click.option('--timeout', metavar='TIMEOUT', type=float, default=1.0,
+              help='Timeout for remote control communication')
+@click.option('--debug/--no-debug', default=False, help='Turn on debugging')
+@click.pass_context
+def cli(ctx, debug, timeout, instance_config, host, type):
+    """Manage the Software Heritage workers
+
+    Filters support globs; a filter starting with a "-" excludes the
+    corresponding values.
+
+    """
+    if instance_config:
+        os.environ['SWH_WORKER_INSTANCE'] = instance_config
+
+    from swh.scheduler.celery_backend.config import app
+    full_inspect = app.control.inspect(timeout=timeout)
+
+    workers = filter_workers(
+        list_remote_workers(full_inspect),
+        make_filters(host, type)
+    )
+    ctx.obj['workers'] = workers
+
+    destination = [worker['name'] for worker in workers.values()]
+
+    ctx.obj['inspect'] = app.control.inspect(destination=destination,
+                                             timeout=timeout)
+    ctx.obj['control'] = app.control
+    ctx.obj['destination'] = destination
+    ctx.obj['timeout'] = timeout
+    ctx.obj['debug'] = debug
+
+
+@cli.command()
+@click.pass_context
+def list_workers(ctx):
+    """List the currently running workers"""
+    workers = ctx.obj['workers']
+
+    for worker_name, worker in sorted(workers.items()):
+        click.echo("{type} alive on {host}".format(**worker))
+
+    if not workers:
+        sys.exit(2)
+
+
+@cli.command()
+@click.pass_context
+def list_tasks(ctx):
+    """List the tasks currently running on workers"""
+    task_template = '{worker} {name}[{id}, pid={worker_pid}] {args} {kwargs}'
+    inspect = ctx.obj['inspect']
+    workers = ctx.obj['workers']
+    active = inspect.active()
+
+    if not active:
+        click.echo('No reply from workers', err=True)
+        sys.exit(2)
+
+    has_tasks = False
+    for worker_name, tasks in sorted(active.items()):
+        worker = workers[worker_name]
+        if not tasks:
+            click.echo("No active tasks on {name}".format(**worker), err=True)
+        for task in sorted(tasks, key=itemgetter('worker_pid')):
+            click.echo(task_template.format(worker=worker_name, **task))
+            has_tasks = True
+
+    if not has_tasks:
+        sys.exit(2)
+
+
+@cli.command()
+@click.pass_context
+def list_queues(ctx):
+    """List all the queues currently enabled on the workers"""
+    inspect = ctx.obj['inspect']
+    active = inspect.active_queues()
+
+    if not active:
+        click.echo('No reply from workers', err=True)
+        sys.exit(2)
+
+    has_queues = False
+    for worker_name, queues in sorted(active.items()):
+        queues = sorted(queue['name'] for queue in queues)
+        if queues:
+            click.echo('{worker} {queues}'.format(worker=worker_name,
+                                                  queues=' '.join(queues)))
+            has_queues = True
+        else:
+            click.echo('No queues for {worker}'.format(worker=worker_name),
+                       err=True)
+
+    if not has_queues:
+        sys.exit(2)
+
+
+@cli.command()
+@click.option('--noop', is_flag=True, default=False, help='Do not proceed')
+@click.argument('queues', nargs=-1)
+@click.pass_context
+def remove_queues(ctx, noop, queues):
+    """Cancel the queue for the given workers"""
+    msg_template = 'Canceling queue {queue} on worker {worker}{noop}'
+
+    inspect = ctx.obj['inspect']
+    control = ctx.obj['control']
+    timeout = ctx.obj['timeout']
+    active = inspect.active_queues()
+
+    if not queues:
+        queues = ['*']
+
+    if not active:
+        click.echo('No reply from workers', err=True)
+        sys.exit(2)
+
+    for worker, active_queues in sorted(active.items()):
+        for queue in sorted(active_queues, key=itemgetter('name')):
+            if any(fnmatch(queue['name'], name) for name in queues):
+                msg = msg_template.format(queue=queue['name'], worker=worker,
+                                          noop=' (noop)' if noop else '')
+                click.echo(msg, err=True)
+                if not noop:
+                    control.cancel_consumer(queue['name'],
+                                            destination=[worker],
+                                            timeout=timeout)
+
+
+@cli.command()
+@click.option('--noop', is_flag=True, default=False, help='Do not proceed')
+@click.argument('queues', nargs=-1)
+@click.pass_context
+def add_queues(ctx, noop, queues):
+    """Start the queue for the given workers"""
+    msg_template = 'Starting queue {queue} on worker {worker}{noop}'
+
+    control = ctx.obj['control']
+    timeout = ctx.obj['timeout']
+    workers = ctx.obj['workers']
+
+    if not workers:
+        click.echo('No reply from workers', err=True)
+        sys.exit(2)
+
+    for worker in sorted(workers):
+        for queue in queues:
+            msg = msg_template.format(queue=queue, worker=worker,
+                                      noop=' (noop)' if noop else '')
+            click.echo(msg, err=True)
+            if not noop:
+                ret = control.add_consumer(queue,
+                                           destination=[worker],
+                                           timeout=timeout)
+                print(ret)
+
+
+if __name__ == '__main__':
+    cli(obj={})
diff --git a/setup.py b/setup.py
index b1250ae..2f383a4 100644
--- a/setup.py
+++ b/setup.py
@@ -1,34 +1,34 @@
 from setuptools import setup
 
 
 def parse_requirements():
     requirements = []
     for reqf in ('requirements.txt', 'requirements-swh.txt'):
         with open(reqf) as f:
             for line in f.readlines():
                 line = line.strip()
                 if not line or line.startswith('#'):
                     continue
                 requirements.append(line)
     return requirements
 
 
 setup(
     name='swh.scheduler',
     description='Software Heritage Scheduler',
     author='Software Heritage developers',
     author_email='swh-devel@inria.fr',
     url='https://forge.softwareheritage.org/diffusion/DSCH/',
     packages=[
         'swh.scheduler',
         'swh.scheduler.celery_backend',
         'swh.scheduler.tests'
     ],
-    scripts=[],  # scripts to package
+    scripts=['bin/swh-worker-control'],  # scripts to package
     install_requires=parse_requirements(),
     entry_points='''
         [console_scripts]
         swh-scheduler=swh.scheduler.cli:cli
     ''',
     setup_requires=['vcversioner'],
     vcversioner={},
     include_package_data=True,
 )