diff --git a/PKG-INFO b/PKG-INFO
index 6637905..eddb527 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,10 +1,10 @@
 Metadata-Version: 1.0
 Name: swh.scheduler
-Version: 0.0.14
+Version: 0.0.15
 Summary: Software Heritage Scheduler
 Home-page: https://forge.softwareheritage.org/diffusion/DSCH/
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
 Description: UNKNOWN
 Platform: UNKNOWN
diff --git a/bin/swh-worker-control b/bin/swh-worker-control
index 67409da..9785b37 100755
--- a/bin/swh-worker-control
+++ b/bin/swh-worker-control
@@ -1,269 +1,268 @@
 #!/usr/bin/python3

 # Copyright (C) 2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import datetime
 from fnmatch import fnmatch
 from operator import itemgetter
 import os
 import sys

 import click


 def list_remote_workers(inspect):
     ping_replies = inspect.ping()
     if not ping_replies:
         return {}
     workers = list(sorted(ping_replies))
     ret = {}
     for worker_name in workers:
         if not worker_name.startswith('celery@'):
             print('Unsupported worker: %s' % worker_name, file=sys.stderr)
             continue
         type, host = worker_name[len('celery@'):].split('.', 1)
         worker = {
             'name': worker_name,
             'host': host,
             'type': type,
         }
         ret[worker_name] = worker
     return ret


 def make_filters(filter_host, filter_type):
     """Parse the filters and create test functions"""

     def include(field, value):
         def filter(worker, field=field, value=value):
             return fnmatch(worker[field], value)
         return filter

     def exclude(field, value):
         def filter(worker, field=field, value=value):
             return not fnmatch(worker[field], value)
         return filter

     filters = []
     for host in filter_host:
         if host.startswith('-'):
             filters.append(exclude('host', host[1:]))
         else:
             filters.append(include('host', host))

     for type_ in filter_type:
         if type_.startswith('-'):
             filters.append(exclude('type', type_[1:]))
         else:
             filters.append(include('type', type_))

     return filters


 def filter_workers(workers, filters):
     """Filter workers according to the set criteria"""
     return {name: worker
             for name, worker in workers.items()
             if all(check(worker) for check in filters)}


 def get_clock_offsets(workers, inspect):
     """Add a clock_offset entry for each worker"""
     err_msg = 'Could not get monotonic clock for {worker}'
     t = datetime.datetime.now(tz=datetime.timezone.utc)
     for worker, clock in inspect._request('monotonic').items():
         monotonic = clock.get('monotonic')
         if monotonic is None:
             monotonic = 0
             click.echo(err_msg.format(worker=worker), err=True)
         dt = datetime.timedelta(seconds=monotonic)
         workers[worker]['clock_offset'] = t - dt


 def worker_to_wallclock(worker, monotonic):
     """Convert a monotonic timestamp from a worker to a wall clock time"""
     dt = datetime.timedelta(seconds=monotonic)
     return worker['clock_offset'] + dt


 @click.group()
 @click.option('--instance-config', metavar='CONFIG', default=None,
               help='Use this worker instance configuration')
 @click.option('--host', metavar='HOSTNAME_FILTER', multiple=True,
               help='Filter by hostname')
 @click.option('--type', metavar='WORKER_TYPE_FILTER', multiple=True,
               help='Filter by worker type')
 @click.option('--timeout', metavar='TIMEOUT', type=float, default=1.0,
               help='Timeout for remote control communication')
 @click.option('--debug/--no-debug', default=False,
               help='Turn on debugging')
 @click.pass_context
 def cli(ctx, debug, timeout, instance_config, host, type):
     """Manage the Software Heritage workers

     Filters support globs; a filter starting with a "-" excludes the
     corresponding values.
     """
     if instance_config:
         os.environ['SWH_WORKER_INSTANCE'] = instance_config

     from swh.scheduler.celery_backend.config import app

     full_inspect = app.control.inspect(timeout=timeout)

     workers = filter_workers(
         list_remote_workers(full_inspect),
         make_filters(host, type)
     )
     ctx.obj['workers'] = workers

     destination = list(workers)
     inspect = app.control.inspect(destination=destination, timeout=timeout)
     ctx.obj['inspect'] = inspect

     get_clock_offsets(workers, inspect)

     ctx.obj['control'] = app.control
     ctx.obj['destination'] = destination
     ctx.obj['timeout'] = timeout
     ctx.obj['debug'] = debug


 @cli.command()
 @click.pass_context
 def list_workers(ctx):
     """List the currently running workers"""
     workers = ctx.obj['workers']
     for worker_name, worker in sorted(workers.items()):
         click.echo("{type} alive on {host}".format(**worker))

     if not workers:
         sys.exit(2)


 @cli.command()
 @click.pass_context
 def list_tasks(ctx):
     """List the tasks currently running on workers"""
     task_template = ('{worker} {name}'
                      '[{id} '
                      'started={started:%Y-%m-%dT%H:%M:%S} '
                      'pid={worker_pid}] {args} {kwargs}')
     inspect = ctx.obj['inspect']
     workers = ctx.obj['workers']
     active = inspect.active()
     if not active:
         click.echo('No reply from workers', err=True)
         sys.exit(2)
     has_tasks = False
     for worker_name, tasks in sorted(active.items()):
         worker = workers[worker_name]
         if not tasks:
             click.echo("No active tasks on {name}".format(**worker),
                        err=True)
-        print(tasks)
         for task in sorted(tasks, key=itemgetter('time_start')):
             task['started'] = worker_to_wallclock(worker, task['time_start'])
             click.echo(task_template.format(worker=worker_name, **task))
             has_tasks = True
     if not has_tasks:
         sys.exit(2)


 @cli.command()
 @click.pass_context
 def list_queues(ctx):
     """List all the queues currently enabled on the workers"""
     inspect = ctx.obj['inspect']
     active = inspect.active_queues()
     if not active:
         click.echo('No reply from workers', err=True)
         sys.exit(2)
     has_queues = False
     for worker_name, queues in sorted(active.items()):
         queues = sorted(queue['name'] for queue in queues)
         if queues:
             click.echo('{worker} {queues}'.format(worker=worker_name,
                                                   queues=' '.join(queues)))
             has_queues = True
         else:
             click.echo('No queues for {worker}'.format(worker=worker_name),
                        err=True)
     if not has_queues:
         sys.exit(2)


 @cli.command()
 @click.option('--noop', is_flag=True, default=False, help='Do not proceed')
 @click.argument('queues', nargs=-1)
 @click.pass_context
 def remove_queues(ctx, noop, queues):
     """Cancel the queue for the given workers"""
     msg_template = 'Canceling queue {queue} on worker {worker}{noop}'
     inspect = ctx.obj['inspect']
     control = ctx.obj['control']
     timeout = ctx.obj['timeout']
     active = inspect.active_queues()
     if not queues:
         queues = ['*']
     if not active:
         click.echo('No reply from workers', err=True)
         sys.exit(2)
     for worker, active_queues in sorted(active.items()):
         for queue in sorted(active_queues, key=itemgetter('name')):
             if any(fnmatch(queue['name'], name) for name in queues):
                 msg = msg_template.format(queue=queue['name'],
                                           worker=worker,
                                           noop=' (noop)' if noop else '')
                 click.echo(msg, err=True)
                 if not noop:
                     control.cancel_consumer(queue['name'],
                                             destination=[worker],
                                             timeout=timeout)


 @cli.command()
 @click.option('--noop', is_flag=True, default=False, help='Do not proceed')
 @click.argument('queues', nargs=-1)
 @click.pass_context
 def add_queues(ctx, noop, queues):
     """Start the queue for the given workers"""
     msg_template = 'Starting queue {queue} on worker {worker}{noop}'
     control = ctx.obj['control']
     timeout = ctx.obj['timeout']
     workers = ctx.obj['workers']
     if not workers:
         click.echo('No reply from workers', err=True)
         sys.exit(2)
     for worker in sorted(workers):
         for queue in queues:
             msg = msg_template.format(queue=queue,
                                       worker=worker,
                                       noop=' (noop)' if noop else '')
             click.echo(msg, err=True)
             if not noop:
                 ret = control.add_consumer(queue,
                                            destination=[worker],
                                            timeout=timeout)
                 print(ret)


 if __name__ == '__main__':
     cli(obj={})
diff --git a/swh.scheduler.egg-info/PKG-INFO b/swh.scheduler.egg-info/PKG-INFO
index 6637905..eddb527 100644
--- a/swh.scheduler.egg-info/PKG-INFO
+++ b/swh.scheduler.egg-info/PKG-INFO
@@ -1,10 +1,10 @@
 Metadata-Version: 1.0
 Name: swh.scheduler
-Version: 0.0.14
+Version: 0.0.15
 Summary: Software Heritage Scheduler
 Home-page: https://forge.softwareheritage.org/diffusion/DSCH/
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
 Description: UNKNOWN
 Platform: UNKNOWN
diff --git a/swh/scheduler/celery_backend/config.py b/swh/scheduler/celery_backend/config.py
index 16a399c..b412618 100644
--- a/swh/scheduler/celery_backend/config.py
+++ b/swh/scheduler/celery_backend/config.py
@@ -1,149 +1,197 @@
 # Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import logging
 import os
+import urllib.parse

 from celery import Celery
 from celery.signals import setup_logging
+from celery.utils.log import ColorFormatter
 from celery.worker.control import Panel

 from kombu import Exchange, Queue
 from kombu.five import monotonic as _monotonic

+import requests
+
 from swh.core.config import load_named_config
 from swh.core.logger import JournalHandler

 DEFAULT_CONFIG_NAME = 'worker'
 CONFIG_NAME_ENVVAR = 'SWH_WORKER_INSTANCE'
 CONFIG_NAME_TEMPLATE = 'worker/%s'

 DEFAULT_CONFIG = {
     'task_broker': ('str', 'amqp://guest@localhost//'),
     'task_modules': ('list[str]', []),
     'task_queues': ('list[str]', []),
     'task_soft_time_limit': ('int', 0),
 }


 @setup_logging.connect
 def setup_log_handler(loglevel=None, logfile=None, format=None,
                       colorize=None, **kwargs):
     """Setup logging according to Software Heritage preferences.

     We use the command-line loglevel for tasks only, as we never
     really care about the debug messages from celery.
""" if loglevel is None: loglevel = logging.DEBUG formatter = logging.Formatter(format) root_logger = logging.getLogger('') root_logger.setLevel(logging.INFO) + if loglevel == logging.DEBUG: + color_formatter = ColorFormatter(format) if colorize else formatter + console = logging.StreamHandler() + console.setLevel(logging.DEBUG) + console.setFormatter(color_formatter) + root_logger.addHandler(console) + systemd_journal = JournalHandler() systemd_journal.setLevel(logging.DEBUG) systemd_journal.setFormatter(formatter) root_logger.addHandler(systemd_journal) celery_logger = logging.getLogger('celery') celery_logger.setLevel(logging.INFO) # Silence useless "Starting new HTTP connection" messages urllib3_logger = logging.getLogger('urllib3') urllib3_logger.setLevel(logging.WARNING) swh_logger = logging.getLogger('swh') swh_logger.setLevel(loglevel) # get_task_logger makes the swh tasks loggers children of celery.task celery_task_logger = logging.getLogger('celery.task') celery_task_logger.setLevel(loglevel) @Panel.register def monotonic(state): """Get the current value for the monotonic clock""" return {'monotonic': _monotonic()} class TaskRouter: """Route tasks according to the task_queue attribute in the task class""" def route_for_task(self, task, args=None, kwargs=None): task_class = app.tasks[task] if hasattr(task_class, 'task_queue'): return {'queue': task_class.task_queue} return None +class CustomCelery(Celery): + def get_queue_stats(self, queue_name): + """Get the statistics regarding a queue on the broker. + + Arguments: + queue_name: name of the queue to check + + Returns a dictionary raw from the RabbitMQ management API. + + Interesting keys: + - consumers (number of consumers for the queue) + - messages (number of messages in queue) + - messages_unacknowledged (number of messages currently being + processed) + + Documentation: https://www.rabbitmq.com/management.html#http-api + """ + + conn_info = self.connection().info() + url = 'http://{hostname}:{port}/api/queues/{vhost}/{queue}'.format( + hostname=conn_info['hostname'], + port=conn_info['port'] + 10000, + vhost=urllib.parse.quote(conn_info['virtual_host'], safe=''), + queue=urllib.parse.quote(queue_name, safe=''), + ) + credentials = (conn_info['userid'], conn_info['password']) + r = requests.get(url, auth=credentials) + if r.status_code != 200: + raise ValueError('Got error %s when reading queue stats: %s' % ( + r.status_code, r.json())) + return r.json() + + def get_queue_length(self, queue_name): + """Shortcut to get a queue's length""" + return self.get_queue_stats(queue_name)['messages'] + + INSTANCE_NAME = os.environ.get(CONFIG_NAME_ENVVAR) if INSTANCE_NAME: CONFIG_NAME = CONFIG_NAME_TEMPLATE % INSTANCE_NAME else: CONFIG_NAME = DEFAULT_CONFIG_NAME # Load the Celery config CONFIG = load_named_config(CONFIG_NAME, DEFAULT_CONFIG) # Celery Queues CELERY_QUEUES = [Queue('celery', Exchange('celery'), routing_key='celery')] for queue in CONFIG['task_queues']: CELERY_QUEUES.append(Queue(queue, Exchange(queue), routing_key=queue)) # Instantiate the Celery app -app = Celery() +app = CustomCelery() app.conf.update( # The broker BROKER_URL=CONFIG['task_broker'], # Timezone configuration: all in UTC CELERY_ENABLE_UTC=True, CELERY_TIMEZONE='UTC', # Imported modules CELERY_IMPORTS=CONFIG['task_modules'], # Time (in seconds, or a timedelta object) for when after stored task # tombstones will be deleted. None means to never expire results. 
     CELERY_TASK_RESULT_EXPIRES=None,
     # Late ack means the task messages will be acknowledged after the task
     # has been executed, not just before, which is the default behavior.
     CELERY_ACKS_LATE=True,
     # A string identifying the default serialization method to use. Can be
     # pickle (default), json, yaml, msgpack or any custom serialization
     # methods that have been registered with kombu.serialization.registry
     CELERY_ACCEPT_CONTENT=['msgpack', 'json', 'pickle'],
     # If True the task will report its status as 'started'
     # when the task is executed by a worker.
     CELERY_TRACK_STARTED=True,
     # Default compression used for task messages. Can be gzip, bzip2
     # (if available), or any custom compression schemes registered
     # in the Kombu compression registry.
     # CELERY_MESSAGE_COMPRESSION='bzip2',
     # Disable all rate limits, even if tasks have explicit rate limits set.
     # (Disabling rate limits altogether is recommended if you don't have any
     # tasks using them.)
     CELERY_DISABLE_RATE_LIMITS=True,
     # Task hard time limit in seconds. The worker processing the task will
     # be killed and replaced with a new one when this is exceeded.
     # CELERYD_TASK_TIME_LIMIT=3600,
     # Task soft time limit in seconds.
     # The SoftTimeLimitExceeded exception will be raised when this is
     # exceeded. The task can catch this to e.g. clean up before the hard
     # time limit comes.
     CELERYD_TASK_SOFT_TIME_LIMIT=CONFIG['task_soft_time_limit'],
     # Task routing
     CELERY_ROUTES=TaskRouter(),
     # Task queues this worker will consume from
     CELERY_QUEUES=CELERY_QUEUES,
     # Allow pool restarts from remote
     CELERYD_POOL_RESTARTS=True,
     # Do not prefetch tasks
     CELERYD_PREFETCH_MULTIPLIER=1,
     # Send events
     CELERY_SEND_EVENTS=True,
     # Do not send useless task_sent events
     CELERY_SEND_TASK_SENT_EVENT=False,
 )
diff --git a/version.txt b/version.txt
index eb36473..66131fc 100644
--- a/version.txt
+++ b/version.txt
@@ -1 +1 @@
-v0.0.14-0-g7c1c041
\ No newline at end of file
+v0.0.15-0-g0e14eff
\ No newline at end of file
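
Usage sketch for the new queue introspection API (illustration only, not
part of the diff above): it assumes a reachable RabbitMQ broker with the
management plugin listening on the AMQP port + 10000, as hard-coded in
get_queue_stats, and a hypothetical queue named 'swh_loader_git'.

    # Sketch: poll broker-side queue statistics through the API added
    # in 0.0.15. 'swh_loader_git' is a hypothetical queue name.
    from swh.scheduler.celery_backend.config import app

    stats = app.get_queue_stats('swh_loader_git')  # raw management API dict
    print(stats['consumers'], stats['messages'],
          stats['messages_unacknowledged'])
    # Shortcut for stats['messages']:
    print(app.get_queue_length('swh_loader_git'))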
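Filter semantics sketch (illustration only, not part of the diff above):
the --host/--type filters of swh-worker-control combine as a conjunction of
fnmatch globs, and a leading "-" turns a pattern into an exclusion, as in
make_filters + filter_workers. The worker names below are made up.

    from fnmatch import fnmatch

    workers = {
        'celery@loader_git.worker01': {'host': 'worker01',
                                       'type': 'loader_git'},
        'celery@lister.worker02': {'host': 'worker02', 'type': 'lister'},
    }
    # Equivalent of: --host 'worker*' --type '-lister'
    filters = [lambda w: fnmatch(w['host'], 'worker*'),     # include glob
               lambda w: not fnmatch(w['type'], 'lister')]  # "-" exclusion
    kept = {name: w for name, w in workers.items()
            if all(check(w) for check in filters)}
    print(sorted(kept))  # ['celery@loader_git.worker01']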