diff --git a/PKG-INFO b/PKG-INFO index 49ebe47..6637905 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.scheduler -Version: 0.0.13 +Version: 0.0.14 Summary: Software Heritage Scheduler Home-page: https://forge.softwareheritage.org/diffusion/DSCH/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/bin/swh-worker-control b/bin/swh-worker-control new file mode 100755 index 0000000..67409da --- /dev/null +++ b/bin/swh-worker-control @@ -0,0 +1,269 @@ +#!/usr/bin/python3 + +# Copyright (C) 2017 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import datetime +from fnmatch import fnmatch +from operator import itemgetter +import os +import sys + +import click + + +def list_remote_workers(inspect): + ping_replies = inspect.ping() + if not ping_replies: + return {} + workers = list(sorted(ping_replies)) + ret = {} + + for worker_name in workers: + if not worker_name.startswith('celery@'): + print('Unsupported worker: %s' % worker_name, file=sys.stderr) + continue + type, host = worker_name[len('celery@'):].split('.', 1) + worker = { + 'name': worker_name, + 'host': host, + 'type': type, + } + ret[worker_name] = worker + + return ret + + +def make_filters(filter_host, filter_type): + """Parse the filters and create test functions""" + + def include(field, value): + def filter(worker, field=field, value=value): + return fnmatch(worker[field], value) + return filter + + def exclude(field, value): + def filter(worker, field=field, value=value): + return not fnmatch(worker[field], value) + return filter + + filters = [] + for host in filter_host: + if host.startswith('-'): + filters.append(exclude('host', host[1:])) + else: + filters.append(include('host', host)) + + for type_ in filter_type: + if type_.startswith('-'): + filters.append(exclude('type', type_[1:])) + else: + filters.append(include('type', type_)) + + return filters + + +def filter_workers(workers, filters): + """Filter workers according to the set criteria""" + return {name: worker + for name, worker in workers.items() + if all(check(worker) for check in filters)} + + +def get_clock_offsets(workers, inspect): + """Add a clock_offset entry for each worker""" + err_msg = 'Could not get monotonic clock for {worker}' + + t = datetime.datetime.now(tz=datetime.timezone.utc) + for worker, clock in inspect._request('monotonic').items(): + monotonic = clock.get('monotonic') + if monotonic is None: + monotonic = 0 + click.echo(err_msg.format(worker=worker), err=True) + dt = datetime.timedelta(seconds=monotonic) + workers[worker]['clock_offset'] = t - dt + + +def worker_to_wallclock(worker, monotonic): + """Convert a monotonic timestamp from a worker to a wall clock time""" + dt = datetime.timedelta(seconds=monotonic) + return worker['clock_offset'] + dt + + +@click.group() +@click.option('--instance-config', metavar='CONFIG', default=None, + help='Use this worker instance configuration') +@click.option('--host', metavar='HOSTNAME_FILTER', multiple=True, + help='Filter by hostname') +@click.option('--type', metavar='WORKER_TYPE_FILTER', multiple=True, + help='Filter by worker type') +@click.option('--timeout', metavar='TIMEOUT', type=float, default=1.0, + help='Timeout for remote control communication') +@click.option('--debug/--no-debug', 
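# Illustrative sketch, not part of the patch: how the --host/--type filters of
# swh-worker-control behave.  A filter value is a glob tested with fnmatch();
# a leading "-" turns it into an exclusion (see make_filters/filter_workers
# above).  The worker names and filter values below are made-up examples.
from fnmatch import fnmatch

workers = {
    'celery@loader.worker01': {'host': 'worker01', 'type': 'loader'},
    'celery@lister.worker02': {'host': 'worker02', 'type': 'lister'},
}

def matches(worker, host_filters=(), type_filters=()):
    # Mirrors make_filters()/filter_workers(): every filter must pass.
    checks = []
    for field, values in (('host', host_filters), ('type', type_filters)):
        for value in values:
            if value.startswith('-'):
                checks.append(lambda w, f=field, v=value[1:]: not fnmatch(w[f], v))
            else:
                checks.append(lambda w, f=field, v=value: fnmatch(w[f], v))
    return all(check(worker) for check in checks)

# Keep every worker on a worker0* host, except the listers.
selected = {name: w for name, w in workers.items()
            if matches(w, host_filters=['worker0*'], type_filters=['-lister'])}
# -> only 'celery@loader.worker01' remains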
default=False, help='Turn on debugging') +@click.pass_context +def cli(ctx, debug, timeout, instance_config, host, type): + """Manage the Software Heritage workers + + Filters support globs; a filter starting with a "-" excludes the + corresponding values. + + """ + if instance_config: + os.environ['SWH_WORKER_INSTANCE'] = instance_config + + from swh.scheduler.celery_backend.config import app + full_inspect = app.control.inspect(timeout=timeout) + + workers = filter_workers( + list_remote_workers(full_inspect), + make_filters(host, type) + ) + ctx.obj['workers'] = workers + + destination = list(workers) + inspect = app.control.inspect(destination=destination, + timeout=timeout) + ctx.obj['inspect'] = inspect + + get_clock_offsets(workers, inspect) + + ctx.obj['control'] = app.control + ctx.obj['destination'] = destination + ctx.obj['timeout'] = timeout + ctx.obj['debug'] = debug + + +@cli.command() +@click.pass_context +def list_workers(ctx): + """List the currently running workers""" + workers = ctx.obj['workers'] + + for worker_name, worker in sorted(workers.items()): + click.echo("{type} alive on {host}".format(**worker)) + + if not workers: + sys.exit(2) + + +@cli.command() +@click.pass_context +def list_tasks(ctx): + """List the tasks currently running on workers""" + task_template = ('{worker} {name}' + '[{id} ' + 'started={started:%Y-%m-%mT%H:%M:%S} ' + 'pid={worker_pid}] {args} {kwargs}') + inspect = ctx.obj['inspect'] + workers = ctx.obj['workers'] + active = inspect.active() + + if not active: + click.echo('No reply from workers', err=True) + sys.exit(2) + + has_tasks = False + for worker_name, tasks in sorted(active.items()): + worker = workers[worker_name] + if not tasks: + click.echo("No active tasks on {name}".format(**worker), err=True) + print(tasks) + for task in sorted(tasks, key=itemgetter('time_start')): + task['started'] = worker_to_wallclock(worker, task['time_start']) + click.echo(task_template.format(worker=worker_name, **task)) + has_tasks = True + + if not has_tasks: + sys.exit(2) + + +@cli.command() +@click.pass_context +def list_queues(ctx): + """List all the queues currently enabled on the workers""" + inspect = ctx.obj['inspect'] + active = inspect.active_queues() + + if not active: + click.echo('No reply from workers', err=True) + sys.exit(2) + + has_queues = False + for worker_name, queues in sorted(active.items()): + queues = sorted(queue['name'] for queue in queues) + if queues: + click.echo('{worker} {queues}'.format(worker=worker_name, + queues=' '.join(queues))) + has_queues = True + else: + click.echo('No queues for {worker}'.format(worker=worker_name), + err=True) + + if not has_queues: + sys.exit(2) + + +@cli.command() +@click.option('--noop', is_flag=True, default=False, help='Do not proceed') +@click.argument('queues', nargs=-1) +@click.pass_context +def remove_queues(ctx, noop, queues): + """Cancel the queue for the given workers""" + msg_template = 'Canceling queue {queue} on worker {worker}{noop}' + + inspect = ctx.obj['inspect'] + control = ctx.obj['control'] + timeout = ctx.obj['timeout'] + active = inspect.active_queues() + + if not queues: + queues = ['*'] + + if not active: + click.echo('No reply from workers', err=True) + sys.exit(2) + + for worker, active_queues in sorted(active.items()): + for queue in sorted(active_queues, key=itemgetter('name')): + if any(fnmatch(queue['name'], name) for name in queues): + msg = msg_template.format(queue=queue['name'], worker=worker, + noop=' (noop)' if noop else '') + click.echo(msg, err=True) + if 
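# Illustrative sketch, not part of the patch: how list_tasks turns a worker's
# monotonic 'time_start' into a wall-clock timestamp.  get_clock_offsets()
# records clock_offset = now() - timedelta(seconds=monotonic) once per worker,
# and worker_to_wallclock() adds each task's monotonic start time back onto
# that offset.  The numbers below are made up; the timestamp is rendered with
# %Y-%m-%dT%H:%M:%S (year-month-day).
import datetime

now = datetime.datetime.now(tz=datetime.timezone.utc)
worker = {'clock_offset': now - datetime.timedelta(seconds=12345.6)}

def to_wallclock(worker, monotonic_ts):
    # Same arithmetic as worker_to_wallclock() above.
    return worker['clock_offset'] + datetime.timedelta(seconds=monotonic_ts)

started = to_wallclock(worker, 12000.0)   # about 345.6 seconds before now
print('started={:%Y-%m-%dT%H:%M:%S}'.format(started))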
not noop: + control.cancel_consumer(queue['name'], + destination=[worker], + timeout=timeout) + + +@cli.command() +@click.option('--noop', is_flag=True, default=False, help='Do not proceed') +@click.argument('queues', nargs=-1) +@click.pass_context +def add_queues(ctx, noop, queues): + """Start the queue for the given workers""" + msg_template = 'Starting queue {queue} on worker {worker}{noop}' + + control = ctx.obj['control'] + timeout = ctx.obj['timeout'] + workers = ctx.obj['workers'] + + if not workers: + click.echo('No reply from workers', err=True) + sys.exit(2) + + for worker in sorted(workers): + for queue in queues: + msg = msg_template.format(queue=queue, worker=worker, + noop=' (noop)' if noop else '') + click.echo(msg, err=True) + if not noop: + ret = control.add_consumer(queue, + destination=[worker], + timeout=timeout) + print(ret) + + +if __name__ == '__main__': + cli(obj={}) diff --git a/setup.py b/setup.py index b1250ae..2f383a4 100644 --- a/setup.py +++ b/setup.py @@ -1,34 +1,34 @@ from setuptools import setup def parse_requirements(): requirements = [] for reqf in ('requirements.txt', 'requirements-swh.txt'): with open(reqf) as f: for line in f.readlines(): line = line.strip() if not line or line.startswith('#'): continue requirements.append(line) return requirements setup( name='swh.scheduler', description='Software Heritage Scheduler', author='Software Heritage developers', author_email='swh-devel@inria.fr', url='https://forge.softwareheritage.org/diffusion/DSCH/', packages=[ 'swh.scheduler', 'swh.scheduler.celery_backend', 'swh.scheduler.tests' ], - scripts=[], # scripts to package + scripts=['bin/swh-worker-control'], # scripts to package install_requires=parse_requirements(), entry_points=''' [console_scripts] swh-scheduler=swh.scheduler.cli:cli ''', setup_requires=['vcversioner'], vcversioner={}, include_package_data=True, ) diff --git a/swh.scheduler.egg-info/PKG-INFO b/swh.scheduler.egg-info/PKG-INFO index 49ebe47..6637905 100644 --- a/swh.scheduler.egg-info/PKG-INFO +++ b/swh.scheduler.egg-info/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.scheduler -Version: 0.0.13 +Version: 0.0.14 Summary: Software Heritage Scheduler Home-page: https://forge.softwareheritage.org/diffusion/DSCH/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/swh.scheduler.egg-info/SOURCES.txt b/swh.scheduler.egg-info/SOURCES.txt index 635a93d..f342d47 100644 --- a/swh.scheduler.egg-info/SOURCES.txt +++ b/swh.scheduler.egg-info/SOURCES.txt @@ -1,35 +1,36 @@ .gitignore AUTHORS LICENSE LICENSE.Celery MANIFEST.in Makefile requirements-swh.txt requirements.txt setup.py version.txt +bin/swh-worker-control debian/changelog debian/compat debian/control debian/copyright debian/rules debian/source/format sql/swh-scheduler-schema.sql sql/swh-scheduler-testdata.sql sql/updates/02.sql sql/updates/03.sql swh.scheduler.egg-info/PKG-INFO swh.scheduler.egg-info/SOURCES.txt swh.scheduler.egg-info/dependency_links.txt swh.scheduler.egg-info/entry_points.txt swh.scheduler.egg-info/requires.txt swh.scheduler.egg-info/top_level.txt swh/scheduler/backend.py swh/scheduler/cli.py swh/scheduler/task.py swh/scheduler/utils.py swh/scheduler/celery_backend/__init__.py swh/scheduler/celery_backend/config.py swh/scheduler/celery_backend/listener.py swh/scheduler/celery_backend/runner.py swh/scheduler/tests/test_task.py \ No newline at end of file diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py index 
b1bda92..b7f0cb4 100644 --- a/swh/scheduler/backend.py +++ b/swh/scheduler/backend.py @@ -1,385 +1,385 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import binascii import datetime from functools import wraps import json import tempfile from arrow import Arrow, utcnow import psycopg2 import psycopg2.extras from psycopg2.extensions import AsIs from swh.core.config import SWHConfig def adapt_arrow(arrow): return AsIs("'%s'::timestamptz" % arrow.isoformat()) psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json) psycopg2.extensions.register_adapter(Arrow, adapt_arrow) def autocommit(fn): @wraps(fn) def wrapped(self, *args, **kwargs): autocommit = False if 'cursor' not in kwargs or not kwargs['cursor']: autocommit = True kwargs['cursor'] = self.cursor() try: ret = fn(self, *args, **kwargs) except: if autocommit: self.rollback() raise if autocommit: self.commit() return ret return wrapped class SchedulerBackend(SWHConfig): """ Backend for the Software Heritage scheduling database. """ CONFIG_BASE_FILENAME = 'scheduler.ini' DEFAULT_CONFIG = { 'scheduling_db': ('str', 'dbname=swh-scheduler'), } def __init__(self, **override_config): self.config = self.parse_config_file(global_config=False) self.config.update(override_config) self.db = None self.reconnect() def reconnect(self): if not self.db or self.db.closed: self.db = psycopg2.connect( dsn=self.config['scheduling_db'], cursor_factory=psycopg2.extras.RealDictCursor, ) def cursor(self): """Return a fresh cursor on the database, with auto-reconnection in case of failure""" cur = None # Get a fresh cursor and reconnect at most three times tries = 0 while True: tries += 1 try: cur = self.db.cursor() cur.execute('select 1') break except psycopg2.OperationalError: if tries < 3: self.reconnect() else: raise return cur def commit(self): """Commit a transaction""" self.db.commit() def rollback(self): """Rollback a transaction""" self.db.rollback() def copy_to(self, items, tblname, columns, cursor=None, item_cb=None): def escape(data): if data is None: return '' if isinstance(data, bytes): return '\\x%s' % binascii.hexlify(data).decode('ascii') elif isinstance(data, str): return '"%s"' % data.replace('"', '""') elif isinstance(data, (datetime.datetime, Arrow)): # We escape twice to make sure the string generated by # isoformat gets escaped return escape(data.isoformat()) elif isinstance(data, dict): return escape(json.dumps(data)) elif isinstance(data, list): return escape("{%s}" % ','.join(escape(d) for d in data)) elif isinstance(data, psycopg2.extras.Range): # We escape twice here too, so that we make sure # everything gets passed to copy properly return escape( '%s%s,%s%s' % ( '[' if data.lower_inc else '(', '-infinity' if data.lower_inf else escape(data.lower), 'infinity' if data.upper_inf else escape(data.upper), ']' if data.upper_inc else ')', ) ) else: # We don't escape here to make sure we pass literals properly return str(data) with tempfile.TemporaryFile('w+') as f: for d in items: if item_cb is not None: item_cb(d) line = [escape(d.get(k)) for k in columns] f.write(','.join(line)) f.write('\n') f.seek(0) cursor.copy_expert('COPY %s (%s) FROM STDIN CSV' % ( tblname, ', '.join(columns)), f) task_type_keys = [ 'type', 'description', 'backend_name', 'default_interval', 'min_interval', 'max_interval', 'backoff_factor', ] def 
_format_query(self, query, keys): """Format a query with the given keys""" query_keys = ', '.join(keys) placeholders = ', '.join(['%s'] * len(keys)) return query.format(keys=query_keys, placeholders=placeholders) def _format_multiquery(self, query, keys, values): """Format a query with placeholders generated for multiple values""" query_keys = ', '.join(keys) placeholders = '), ('.join( [', '.join(['%s'] * len(keys))] * len(values) ) ret_values = sum([[value[key] for key in keys] for value in values], []) return ( query.format(keys=query_keys, placeholders=placeholders), ret_values, ) @autocommit def create_task_type(self, task_type, cursor=None): """Create a new task type ready for scheduling. A task type is a dictionary with the following keys: type (str): an identifier for the task type description (str): a human-readable description of what the task does backend_name (str): the name of the task in the job-scheduling backend default_interval (datetime.timedelta): the default interval between two task runs min_interval (datetime.timedelta): the minimum interval between two task runs max_interval (datetime.timedelta): the maximum interval between two task runs backoff_factor (float): the factor by which the interval changes at each run """ query = self._format_query( """insert into task_type ({keys}) values ({placeholders})""", self.task_type_keys, ) cursor.execute(query, [task_type[key] for key in self.task_type_keys]) @autocommit def get_task_type(self, task_type_name, cursor=None): """Retrieve the task type with id task_type_name""" query = self._format_query( "select {keys} from task_type where type=%s", self.task_type_keys, ) cursor.execute(query, (task_type_name,)) ret = cursor.fetchone() return ret task_keys = ['id', 'type', 'arguments', 'next_run', 'current_interval', 'status'] task_create_keys = ['type', 'arguments', 'next_run'] @autocommit def create_tasks(self, tasks, cursor=None): """Create new tasks. A task is a dictionary with the following keys: type (str): the task type arguments (dict): the arguments for the task runner args (list of str): arguments kwargs (dict str -> str): keyword arguments next_run (datetime.datetime): the next scheduled run for the task This returns a list of created task ids. 
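# Illustrative sketch, not part of the patch: registering a task type and
# creating a task through SchedulerBackend.  The type name, backend_name,
# intervals and URL are made-up examples; a reachable scheduling_db (and a
# scheduler.ini configuration) is assumed.
import datetime
from arrow import utcnow
from swh.scheduler.backend import SchedulerBackend

backend = SchedulerBackend()
backend.create_task_type({
    'type': 'example-load-git',                    # hypothetical type name
    'description': 'Load an example git repository',
    'backend_name': 'swh.example.tasks.LoadGit',   # hypothetical celery task
    'default_interval': datetime.timedelta(days=64),
    'min_interval': datetime.timedelta(hours=12),
    'max_interval': datetime.timedelta(days=64),
    'backoff_factor': 2.0,
})
created = backend.create_tasks([{
    'type': 'example-load-git',
    'arguments': {'args': ['https://example.org/repo.git'], 'kwargs': {}},
    'next_run': utcnow(),
}])
# create_tasks() returns the full rows, including the generated 'id',
# 'current_interval' and 'status' columns.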
""" cursor.execute('select swh_scheduler_mktemp_task()') self.copy_to(tasks, 'tmp_task', self.task_create_keys, cursor) query = self._format_query( 'select {keys} from swh_scheduler_create_tasks_from_temp()', self.task_keys, ) cursor.execute(query) return cursor.fetchall() @autocommit def disable_tasks(self, task_ids, cursor=None): """Disable the tasks whose ids are listed.""" query = "UPDATE task SET status = 'disabled' WHERE id IN %s" cursor.execute(query, (tuple(task_ids),)) return None @autocommit def peek_ready_tasks(self, timestamp=None, num_tasks=None, cursor=None): """Fetch the list of ready tasks Args: timestamp (datetime.datetime): peek tasks that need to be executed before that timestamp num_tasks (int): only peek at num_tasks tasks Returns: a list of tasks """ if timestamp is None: timestamp = utcnow() cursor.execute('select * from swh_scheduler_peek_ready_tasks(%s, %s)', (timestamp, num_tasks)) return cursor.fetchall() @autocommit def grab_ready_tasks(self, timestamp=None, num_tasks=None, cursor=None): """Fetch the list of ready tasks, and mark them as scheduled Args: timestamp (datetime.datetime): grab tasks that need to be executed before that timestamp num_tasks (int): only grab num_tasks tasks Returns: a list of tasks """ if timestamp is None: timestamp = utcnow() cursor.execute('select * from swh_scheduler_grab_ready_tasks(%s, %s)', (timestamp, num_tasks)) return cursor.fetchall() task_run_create_keys = ['task', 'backend_id', 'scheduled', 'metadata'] @autocommit def schedule_task_run(self, task_id, backend_id, metadata=None, timestamp=None, cursor=None): """Mark a given task as scheduled, adding a task_run entry in the database. Args: task_id (int): the identifier for the task being scheduled backend_id (str): the identifier of the job in the backend metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: a fresh task_run entry """ if metadata is None: metadata = {} if timestamp is None: timestamp = utcnow() cursor.execute( 'select * from swh_scheduler_schedule_task_run(%s, %s, %s, %s)', (task_id, backend_id, metadata, timestamp) ) return cursor.fetchone() @autocommit def mass_schedule_task_runs(self, task_runs, cursor=None): """Schedule a bunch of task runs. Args: task_runs: a list of dicts with keys: task (int): the identifier for the task being scheduled backend_id (str): the identifier of the job in the backend metadata (dict): metadata to add to the task_run entry scheduled (datetime.datetime): the instant the event occurred Returns: None """ cursor.execute('select swh_scheduler_mktemp_task_run()') self.copy_to(task_runs, 'tmp_task_run', self.task_run_create_keys, cursor) cursor.execute('select swh_scheduler_schedule_task_run_from_temp()') @autocommit def start_task_run(self, backend_id, metadata=None, timestamp=None, cursor=None): """Mark a given task as started, updating the corresponding task_run entry in the database. 
Args: backend_id (str): the identifier of the job in the backend metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: the updated task_run entry """ if metadata is None: metadata = {} if timestamp is None: timestamp = utcnow() cursor.execute( 'select * from swh_scheduler_start_task_run(%s, %s, %s)', (backend_id, metadata, timestamp) ) return cursor.fetchone() @autocommit def end_task_run(self, backend_id, status, metadata=None, timestamp=None, - cursor=None): + result=None, cursor=None): """Mark a given task as ended, updating the corresponding task_run entry in the database. Args: backend_id (str): the identifier of the job in the backend status ('eventful', 'uneventful', 'failed'): how the task ended metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: the updated task_run entry """ if metadata is None: metadata = {} if timestamp is None: timestamp = utcnow() cursor.execute( 'select * from swh_scheduler_end_task_run(%s, %s, %s, %s)', (backend_id, status, metadata, timestamp) ) return cursor.fetchone() diff --git a/swh/scheduler/celery_backend/config.py b/swh/scheduler/celery_backend/config.py index 64c29de..16a399c 100644 --- a/swh/scheduler/celery_backend/config.py +++ b/swh/scheduler/celery_backend/config.py @@ -1,140 +1,149 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import os from celery import Celery from celery.signals import setup_logging +from celery.worker.control import Panel + from kombu import Exchange, Queue +from kombu.five import monotonic as _monotonic from swh.core.config import load_named_config from swh.core.logger import JournalHandler DEFAULT_CONFIG_NAME = 'worker' CONFIG_NAME_ENVVAR = 'SWH_WORKER_INSTANCE' CONFIG_NAME_TEMPLATE = 'worker/%s' DEFAULT_CONFIG = { 'task_broker': ('str', 'amqp://guest@localhost//'), 'task_modules': ('list[str]', []), 'task_queues': ('list[str]', []), 'task_soft_time_limit': ('int', 0), } @setup_logging.connect def setup_log_handler(loglevel=None, logfile=None, format=None, colorize=None, **kwargs): """Setup logging according to Software Heritage preferences. We use the command-line loglevel for tasks only, as we never really care about the debug messages from celery. 
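# Illustrative sketch, not part of the patch: what the named worker
# configuration resolves to.  SWH_WORKER_INSTANCE selects 'worker/<instance>'
# instead of the default 'worker' configuration; the values below are the
# built-in defaults plus a made-up module and queue.
import os

os.environ['SWH_WORKER_INSTANCE'] = 'loader-git'   # hypothetical instance
# load_named_config('worker/loader-git', DEFAULT_CONFIG) then returns
# something shaped like:
CONFIG = {
    'task_broker': 'amqp://guest@localhost//',
    'task_modules': ['swh.loader.git.tasks'],       # hypothetical module
    'task_queues': ['swh_loader_git'],               # hypothetical queue
    'task_soft_time_limit': 0,
}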
""" if loglevel is None: loglevel = logging.DEBUG formatter = logging.Formatter(format) root_logger = logging.getLogger('') root_logger.setLevel(logging.INFO) systemd_journal = JournalHandler() systemd_journal.setLevel(logging.DEBUG) systemd_journal.setFormatter(formatter) root_logger.addHandler(systemd_journal) celery_logger = logging.getLogger('celery') celery_logger.setLevel(logging.INFO) # Silence useless "Starting new HTTP connection" messages urllib3_logger = logging.getLogger('urllib3') urllib3_logger.setLevel(logging.WARNING) swh_logger = logging.getLogger('swh') swh_logger.setLevel(loglevel) # get_task_logger makes the swh tasks loggers children of celery.task celery_task_logger = logging.getLogger('celery.task') celery_task_logger.setLevel(loglevel) +@Panel.register +def monotonic(state): + """Get the current value for the monotonic clock""" + return {'monotonic': _monotonic()} + + class TaskRouter: """Route tasks according to the task_queue attribute in the task class""" def route_for_task(self, task, args=None, kwargs=None): task_class = app.tasks[task] if hasattr(task_class, 'task_queue'): return {'queue': task_class.task_queue} return None INSTANCE_NAME = os.environ.get(CONFIG_NAME_ENVVAR) if INSTANCE_NAME: CONFIG_NAME = CONFIG_NAME_TEMPLATE % INSTANCE_NAME else: CONFIG_NAME = DEFAULT_CONFIG_NAME # Load the Celery config CONFIG = load_named_config(CONFIG_NAME, DEFAULT_CONFIG) # Celery Queues CELERY_QUEUES = [Queue('celery', Exchange('celery'), routing_key='celery')] for queue in CONFIG['task_queues']: CELERY_QUEUES.append(Queue(queue, Exchange(queue), routing_key=queue)) # Instantiate the Celery app app = Celery() app.conf.update( # The broker BROKER_URL=CONFIG['task_broker'], # Timezone configuration: all in UTC CELERY_ENABLE_UTC=True, CELERY_TIMEZONE='UTC', # Imported modules CELERY_IMPORTS=CONFIG['task_modules'], # Time (in seconds, or a timedelta object) for when after stored task # tombstones will be deleted. None means to never expire results. CELERY_TASK_RESULT_EXPIRES=None, # Late ack means the task messages will be acknowledged after the task has # been executed, not just before, which is the default behavior. CELERY_ACKS_LATE=True, # A string identifying the default serialization method to use. # Can be pickle (default), json, yaml, msgpack or any custom serialization # methods that have been registered with kombu.serialization.registry CELERY_ACCEPT_CONTENT=['msgpack', 'json', 'pickle'], # If True the task will report its status as “started” # when the task is executed by a worker. CELERY_TRACK_STARTED=True, # Default compression used for task messages. Can be gzip, bzip2 # (if available), or any custom compression schemes registered # in the Kombu compression registry. # CELERY_MESSAGE_COMPRESSION='bzip2', # Disable all rate limits, even if tasks has explicit rate limits set. # (Disabling rate limits altogether is recommended if you don’t have any # tasks using them.) CELERY_DISABLE_RATE_LIMITS=True, # Task hard time limit in seconds. The worker processing the task will be # killed and replaced with a new one when this is exceeded. # CELERYD_TASK_TIME_LIMIT=3600, # Task soft time limit in seconds. # The SoftTimeLimitExceeded exception will be raised when this is exceeded. # The task can catch this to e.g. clean up before the hard time limit # comes. 
CELERYD_TASK_SOFT_TIME_LIMIT=CONFIG['task_soft_time_limit'], # Task routing CELERY_ROUTES=TaskRouter(), # Task queues this worker will consume from CELERY_QUEUES=CELERY_QUEUES, # Allow pool restarts from remote CELERYD_POOL_RESTARTS=True, # Do not prefetch tasks CELERYD_PREFETCH_MULTIPLIER=1, # Send events CELERY_SEND_EVENTS=True, # Do not send useless task_sent events CELERY_SEND_TASK_SENT_EVENT=False, ) diff --git a/swh/scheduler/celery_backend/listener.py b/swh/scheduler/celery_backend/listener.py index a31d6e1..0430d67 100644 --- a/swh/scheduler/celery_backend/listener.py +++ b/swh/scheduler/celery_backend/listener.py @@ -1,178 +1,156 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import socket from arrow import utcnow from kombu import Queue -from kombu.mixins import ConsumerMixin -from celery.events import get_exchange +from celery.events import EventReceiver from .config import app as main_app from ..backend import SchedulerBackend -# This is a simplified version of celery.events.Receiver, with a persistent -# queue and acked messages, with most of the options stripped down -# -# The original celery.events.Receiver code is available under the following -# license: -# -# Copyright (c) 2015-2016 Ask Solem & contributors. All rights reserved. -# Copyright (c) 2012-2014 GoPivotal, Inc. All rights reserved. -# Copyright (c) 2009, 2010, 2011, 2012 Ask Solem, and individual contributors. -# All rights reserved. -# -# Celery is licensed under The BSD License (3 Clause, also known as -# the new BSD license), whose full-text is available in the top-level -# LICENSE.Celery file. - -class ReliableEventsReceiver(ConsumerMixin): - def __init__(self, app, handlers, queue_id): - self.app = app - self.connection = self.app.connection().connection.client - self.handlers = handlers - self.queue_id = queue_id - self.exchange = get_exchange(self.connection) - self.queue = Queue(queue_id, exchange=self.exchange, routing_key='#', - auto_delete=False, durable=True) - self.accept = set([self.app.conf.CELERY_EVENT_SERIALIZER, 'json']) +class ReliableEventReceiver(EventReceiver): + def __init__(self, channel, handlers=None, routing_key='#', + node_id=None, app=None, queue_prefix='celeryev', + accept=None): + super(ReliableEventReceiver, self).__init__( + channel, handlers, routing_key, node_id, app, queue_prefix, accept) - def process(self, type, event, message): - """Process the received event by dispatching it to the appropriate - handler.""" - handler = self.handlers.get(type) or self.handlers.get('*') - handler and handler(event, message) + self.queue = Queue('.'.join([self.queue_prefix, self.node_id]), + exchange=self.exchange, + routing_key=self.routing_key, + auto_delete=False, + durable=True, + queue_arguments=self._get_queue_arguments()) def get_consumers(self, consumer, channel): return [consumer(queues=[self.queue], - callbacks=[self.receive], no_ack=False, + callbacks=[self._receive], no_ack=False, accept=self.accept)] - def on_consume_ready(self, connection, channel, consumers, **kwargs): - # When starting to consume, wakeup the workers - self.app.control.broadcast('heartbeat', - connection=self.connection, - channel=channel) - - def capture(self, limit=None, timeout=None, wakeup=True): - """Open up a consumer capturing events. 
- - This has to run in the main process, and it will never - stop unless forced via :exc:`KeyboardInterrupt` or :exc:`SystemExit`. + def _receive(self, body, message): + type, body = self.event_from_message(body) + self.process(type, body, message) - """ - for _ in self.consume(limit=limit, timeout=timeout, wakeup=wakeup): - pass - - def receive(self, body, message): - body['local_received'] = utcnow() - self.process(body['type'], body, message=message) + def process(self, type, event, message): + """Process the received event by dispatching it to the appropriate + handler.""" + handler = self.handlers.get(type) or self.handlers.get('*') + handler and handler(event, message) ACTION_SEND_DELAY = datetime.timedelta(seconds=1.0) ACTION_QUEUE_MAX_LENGTH = 1000 def event_monitor(app, backend): actions = { 'last_send': utcnow() - 2*ACTION_SEND_DELAY, 'queue': [], } def try_perform_actions(actions=actions): if not actions['queue']: return if utcnow() - actions['last_send'] > ACTION_SEND_DELAY or \ len(actions['queue']) > ACTION_QUEUE_MAX_LENGTH: perform_actions(actions) def perform_actions(actions, backend=backend): action_map = { 'start_task_run': backend.start_task_run, 'end_task_run': backend.end_task_run, } messages = [] cursor = backend.cursor() for action in actions['queue']: messages.append(action['message']) function = action_map[action['action']] args = action.get('args', ()) kwargs = action.get('kwargs', {}) kwargs['cursor'] = cursor function(*args, **kwargs) backend.commit() for message in messages: message.ack() actions['queue'] = [] actions['last_send'] = utcnow() def queue_action(action, actions=actions): actions['queue'].append(action) try_perform_actions() def catchall_event(event, message): message.ack() try_perform_actions() def task_started(event, message): queue_action({ 'action': 'start_task_run', 'args': [event['uuid']], 'kwargs': { 'timestamp': utcnow(), 'metadata': { 'worker': event['hostname'], }, }, 'message': message, }) def task_succeeded(event, message): - status = 'uneventful' - if 'True' in event['result']: - status = 'eventful' + result = event['result'] + + try: + status = result.get('status') + if status == 'success': + status = 'eventful' if result.get('eventful') else 'uneventful' + except Exception: + status = 'eventful' if result else 'uneventful' queue_action({ 'action': 'end_task_run', 'args': [event['uuid']], 'kwargs': { 'timestamp': utcnow(), 'status': status, + 'result': result, }, 'message': message, }) def task_failed(event, message): queue_action({ 'action': 'end_task_run', 'args': [event['uuid']], 'kwargs': { 'timestamp': utcnow(), 'status': 'failed', }, 'message': message, }) - recv = ReliableEventsReceiver( - main_app, + recv = ReliableEventReceiver( + main_app.connection(), + app=main_app, handlers={ 'task-started': task_started, - 'task-succeeded': task_succeeded, + 'task-result': task_succeeded, 'task-failed': task_failed, '*': catchall_event, }, - queue_id='celeryev.listener-%s' % socket.gethostname(), + node_id='listener-%s' % socket.gethostname(), ) recv.capture(limit=None, timeout=None, wakeup=True) if __name__ == '__main__': main_backend = SchedulerBackend() event_monitor(main_app, main_backend) diff --git a/swh/scheduler/task.py b/swh/scheduler/task.py index a7aedde..053d3e3 100644 --- a/swh/scheduler/task.py +++ b/swh/scheduler/task.py @@ -1,34 +1,53 @@ -# Copyright (C) 2015 The Software Heritage developers +# Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # 
License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import celery from celery.utils.log import get_task_logger class Task(celery.Task): """a schedulable task (abstract class) Sub-classes must implement the run() method. Sub-classes that want their tasks to get routed to a non-default task queue must override the task_queue attribute. Current implementation is based on Celery. See http://docs.celeryproject.org/en/latest/reference/celery.app.task.html for how to use tasks once instantiated """ abstract = True task_queue = 'celery' def run(self, *args, **kwargs): - raise NotImplementedError('tasks must implement the run() method') + """This method is called by the celery worker when a task is received. + + Should not be overridden as we need our special events to be sent for + the recurrent scheduler. Override run_task instead.""" + try: + result = self.run_task(*args, **kwargs) + except Exception as e: + self.send_event('task-result-exception') + raise e from None + else: + self.send_event('task-result', result=result) + return result + + def run_task(self, *args, **kwargs): + """Perform the task. + + Must return a json-serializable value as it is passed back to the task + scheduler using a celery event. + """ + raise NotImplementedError('tasks must implement the run_task() method') @property def log(self): if not hasattr(self, '__log'): self.__log = get_task_logger('%s.%s' % (__name__, self.__class__.__name__)) return self.__log diff --git a/version.txt b/version.txt index 2155564..eb36473 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.13-0-gf758291 \ No newline at end of file +v0.0.14-0-g7c1c041 \ No newline at end of file
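# Illustrative sketch, not part of the patch: a task written against the new
# run_task() hook, and how the listener's task_succeeded handler would map its
# result.  The class, queue and variable names are made up.
from swh.scheduler.task import Task

class ExampleLoadTask(Task):
    task_queue = 'example_queue'   # hypothetical queue

    def run_task(self, origin_url):
        new_contents = 0  # pretend this run loaded nothing new
        # Must be json-serializable: it travels back in the 'task-result' event.
        return {'status': 'success', 'eventful': new_contents > 0}

# On the scheduler side, task_succeeded() in the listener derives the task_run
# status from that dict: {'status': 'success', 'eventful': False} is recorded
# as 'uneventful', {'status': 'success', 'eventful': True} as 'eventful', and
# a non-dict result falls back to its truthiness.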