diff --git a/swh/scheduler/celery_backend/config.py b/swh/scheduler/celery_backend/config.py
index f8b4c2f..614f369 100644
--- a/swh/scheduler/celery_backend/config.py
+++ b/swh/scheduler/celery_backend/config.py
@@ -1,256 +1,257 @@
 # Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import itertools
 import importlib
 import logging
 import os
 import urllib.parse

 from celery import Celery
 from celery.signals import setup_logging, celeryd_after_setup
 from celery.utils.log import ColorFormatter
 from celery.worker.control import Panel

 from kombu import Exchange, Queue
 from kombu.five import monotonic as _monotonic

 import requests

 from swh.scheduler.task import Task

 from swh.core.config import load_named_config
 from swh.core.logger import JournalHandler

 DEFAULT_CONFIG_NAME = 'worker'
 CONFIG_NAME_ENVVAR = 'SWH_WORKER_INSTANCE'
 CONFIG_NAME_TEMPLATE = 'worker/%s'

 DEFAULT_CONFIG = {
     'task_broker': ('str', 'amqp://guest@localhost//'),
     'task_modules': ('list[str]', []),
     'task_queues': ('list[str]', []),
     'task_soft_time_limit': ('int', 0),
 }


 @setup_logging.connect
 def setup_log_handler(loglevel=None, logfile=None, format=None,
                       colorize=None, **kwargs):
     """Setup logging according to Software Heritage preferences.

     We use the command-line loglevel for tasks only, as we never
     really care about the debug messages from celery.
     """
     if loglevel is None:
         loglevel = logging.DEBUG
     if isinstance(loglevel, str):
         loglevel = logging._nameToLevel[loglevel]

     formatter = logging.Formatter(format)

     root_logger = logging.getLogger('')
     root_logger.setLevel(logging.INFO)

     if loglevel == logging.DEBUG:
         color_formatter = ColorFormatter(format) if colorize else formatter
         console = logging.StreamHandler()
         console.setLevel(logging.DEBUG)
         console.setFormatter(color_formatter)
         root_logger.addHandler(console)

     systemd_journal = JournalHandler()
     systemd_journal.setLevel(logging.DEBUG)
     systemd_journal.setFormatter(formatter)
     root_logger.addHandler(systemd_journal)

     logging.getLogger('celery').setLevel(logging.INFO)
     # Silence amqp heartbeat_tick messages
     logger = logging.getLogger('amqp')
     logger.addFilter(lambda record: not record.msg.startswith(
         'heartbeat_tick'))
     logger.setLevel(logging.DEBUG)
     # Silence useless "Starting new HTTP connection" messages
     logging.getLogger('urllib3').setLevel(logging.WARNING)

     logging.getLogger('swh').setLevel(loglevel)
     # get_task_logger makes the swh tasks loggers children of celery.task
     logging.getLogger('celery.task').setLevel(loglevel)
+    return loglevel


 @celeryd_after_setup.connect
 def setup_queues_and_tasks(sender, instance, **kwargs):
     """Signal called on worker start.

     This automatically registers swh.scheduler.task.Task subclasses as
     available celery tasks.

     This also subscribes the worker to the "implicit" per-task queues defined
     for these task classes.
""" for module_name in itertools.chain( # celery worker -I flag instance.app.conf['include'], # set from the celery / swh worker instance configuration file instance.app.conf['imports'], ): module = importlib.import_module(module_name) for name in dir(module): obj = getattr(module, name) if ( isinstance(obj, type) and issubclass(obj, Task) and obj != Task # Don't register the abstract class itself ): class_name = '%s.%s' % (module_name, name) instance.app.register_task_class(class_name, obj) for task_name in instance.app.tasks: if task_name.startswith('swh.'): instance.app.amqp.queues.select_add(task_name) @Panel.register def monotonic(state): """Get the current value for the monotonic clock""" return {'monotonic': _monotonic()} def route_for_task(name, args, kwargs, options, task=None, **kw): """Route tasks according to the task_queue attribute in the task class""" if name is not None and name.startswith('swh.'): return {'queue': name} class CustomCelery(Celery): def get_queue_stats(self, queue_name): """Get the statistics regarding a queue on the broker. Arguments: queue_name: name of the queue to check Returns a dictionary raw from the RabbitMQ management API; or `None` if the current configuration does not use RabbitMQ. Interesting keys: - consumers (number of consumers for the queue) - messages (number of messages in queue) - messages_unacknowledged (number of messages currently being processed) Documentation: https://www.rabbitmq.com/management.html#http-api """ conn_info = self.connection().info() if conn_info['transport'] == 'memory': # We're running in a test environment, without RabbitMQ. return None url = 'http://{hostname}:{port}/api/queues/{vhost}/{queue}'.format( hostname=conn_info['hostname'], port=conn_info['port'] + 10000, vhost=urllib.parse.quote(conn_info['virtual_host'], safe=''), queue=urllib.parse.quote(queue_name, safe=''), ) credentials = (conn_info['userid'], conn_info['password']) r = requests.get(url, auth=credentials) if r.status_code == 404: return {} if r.status_code != 200: raise ValueError('Got error %s when reading queue stats: %s' % ( r.status_code, r.json())) return r.json() def get_queue_length(self, queue_name): """Shortcut to get a queue's length""" stats = self.get_queue_stats(queue_name) if stats: return stats.get('messages') def register_task_class(self, name, cls): """Register a class-based task under the given name""" if name in self.tasks: return task_instance = cls() task_instance.name = name self.register_task(task_instance) INSTANCE_NAME = os.environ.get(CONFIG_NAME_ENVVAR) if INSTANCE_NAME: CONFIG_NAME = CONFIG_NAME_TEMPLATE % INSTANCE_NAME else: CONFIG_NAME = DEFAULT_CONFIG_NAME # Load the Celery config CONFIG = load_named_config(CONFIG_NAME, DEFAULT_CONFIG) # Celery Queues CELERY_QUEUES = [Queue('celery', Exchange('celery'), routing_key='celery')] for queue in CONFIG['task_queues']: CELERY_QUEUES.append(Queue(queue, Exchange(queue), routing_key=queue)) # Instantiate the Celery app app = CustomCelery() app.conf.update( # The broker broker_url=CONFIG['task_broker'], # Timezone configuration: all in UTC enable_utc=True, timezone='UTC', # Imported modules imports=CONFIG['task_modules'], # Time (in seconds, or a timedelta object) for when after stored task # tombstones will be deleted. None means to never expire results. result_expires=None, # A string identifying the default serialization method to use. 
     # Can be json (default), pickle, yaml, msgpack, or any custom
     # serialization methods that have been registered with
     # kombu.serialization.registry
     task_serializer='msgpack',
     # Result serialization format
     result_serializer='msgpack',
     # Late ack means the task messages will be acknowledged after the task has
     # been executed, not just before, which is the default behavior.
     task_acks_late=True,
     # A string identifying the default serialization method to use.
     # Can be pickle (default), json, yaml, msgpack or any custom serialization
     # methods that have been registered with kombu.serialization.registry
     accept_content=['msgpack', 'json'],
     # If True the task will report its status as “started”
     # when the task is executed by a worker.
     task_track_started=True,
     # Default compression used for task messages. Can be gzip, bzip2
     # (if available), or any custom compression schemes registered
     # in the Kombu compression registry.
     # result_compression='bzip2',
     # task_compression='bzip2',
     # Disable all rate limits, even if tasks have explicit rate limits set.
     # (Disabling rate limits altogether is recommended if you don’t have any
     # tasks using them.)
     worker_disable_rate_limits=True,
     # Task hard time limit in seconds. The worker processing the task will be
     # killed and replaced with a new one when this is exceeded.
     # task_time_limit=3600,
     # Task soft time limit in seconds.
     # The SoftTimeLimitExceeded exception will be raised when this is exceeded.
     # The task can catch this to e.g. clean up before the hard time limit
     # comes.
     task_soft_time_limit=CONFIG['task_soft_time_limit'],
     # Task routing
     task_routes=route_for_task,
     # Task queues this worker will consume from
     task_queues=CELERY_QUEUES,
     # Allow pool restarts from remote
     worker_pool_restarts=True,
     # Do not prefetch tasks
     worker_prefetch_multiplier=1,
     # Send events
     worker_send_task_events=True,
     # Do not send useless task_sent events
     task_send_sent_event=False,
 )
diff --git a/swh/scheduler/cli.py b/swh/scheduler/cli.py
index d1f8423..7b38bfa 100644
--- a/swh/scheduler/cli.py
+++ b/swh/scheduler/cli.py
@@ -1,441 +1,458 @@
 # Copyright (C) 2016-2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import arrow
 import click
 import csv
 import itertools
 import json
 import locale
 import logging
 import time

 from swh.core import utils, config
 from . import compute_nb_tasks_from
 from .backend_es import SWHElasticSearchClient
+from . import get_scheduler

 locale.setlocale(locale.LC_ALL, '')
 ARROW_LOCALE = locale.getlocale(locale.LC_TIME)[0]


 class DateTimeType(click.ParamType):
     name = 'time and date'

     def convert(self, value, param, ctx):
         if not isinstance(value, arrow.Arrow):
             value = arrow.get(value)

         return value


 DATETIME = DateTimeType()

 CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])


 def pretty_print_list(list, indent):
     """Pretty-print a list"""
     return ''.join('%s%s\n' % (' ' * indent, item) for item in list)


 def pretty_print_dict(dict, indent):
     """Pretty-print a dict"""
     return ''.join('%s%s: %s\n' % (' ' * indent,
                                    click.style(key, bold=True), value)
                    for key, value in dict.items())


 def pretty_print_task(task):
     """Pretty-print a task"""
     next_run = arrow.get(task['next_run'])
     lines = [
         '%s %s\n' % (click.style('Task', bold=True), task['id']),
         click.style(' Next run: ', bold=True),
         "%s (%s)" % (next_run.humanize(locale=ARROW_LOCALE),
                      next_run.format()),
         '\n',
         click.style(' Interval: ', bold=True),
         str(task['current_interval']), '\n',
         click.style(' Type: ', bold=True), task['type'], '\n',
         click.style(' Policy: ', bold=True), task['policy'], '\n',
         click.style(' Args:\n', bold=True),
         pretty_print_list(task['arguments']['args'], indent=4),
         click.style(' Keyword args:\n', bold=True),
         pretty_print_dict(task['arguments']['kwargs'], indent=4),
     ]

     return ''.join(lines)


 def list_task_types(ctx, param, value):
     if not value or ctx.resilient_parsing:
         return
     click.echo("Known task types:")
     for tasktype in ctx.obj['scheduler'].get_task_types():
         click.echo('{type}:\n {description}'.format(**tasktype))
     ctx.exit()


 @click.group(context_settings=CONTEXT_SETTINGS)
 @click.option('--cls', '-c', default='local',
               type=click.Choice(['local', 'remote']),
               help="Scheduler's class, default to 'local'")
 @click.option('--database', '-d',
               help="Scheduling database DSN (if cls is 'local')")
 @click.option('--url', '-u',
               help="Scheduler's url access (if cls is 'remote')")
+@click.option('--log-level', '-l', default='INFO',
+              type=click.Choice(logging._nameToLevel.keys()),
+              help="Log level (default to INFO)")
 @click.pass_context
-def cli(ctx, cls, database, url):
+def cli(ctx, cls, database, url, log_level):
     """Software Heritage Scheduler CLI interface

     Defaults to using the local scheduler instance (plugged into the main
     scheduler db).
     """
+    from swh.scheduler.celery_backend.config import setup_log_handler
+    log_level = setup_log_handler(
+        loglevel=log_level, colorize=False,
+        format='[%(levelname)s] %(name)s -- %(message)s')
+
     ctx.ensure_object(dict)

     scheduler = None
     override_config = {}
     try:
         if cls == 'local' and database:
             override_config = {'scheduling_db': database}
         elif cls == 'remote' and url:
             override_config = {'url': url}
         scheduler = get_scheduler(cls, args=override_config)
     except Exception:
         # it's up to the subcommand to decide whether not having a proper
         # scheduler instance is a problem.
         pass
     ctx.obj['scheduler'] = scheduler
     ctx.obj['config'] = {'cls': cls, 'args': override_config}
+    ctx.obj['loglevel'] = log_level


 @cli.group('task')
 @click.option('--list-types', '-l', is_flag=True, default=False,
               is_eager=True, expose_value=False, callback=list_task_types)
 @click.pass_context
 def task(ctx):
     """Manipulate tasks."""
     pass


 @task.command('schedule')
 @click.option('--columns', '-c', multiple=True,
               default=['type', 'args', 'kwargs', 'next_run'],
               type=click.Choice([
                   'type', 'args', 'kwargs', 'policy', 'next_run']),
               help='columns present in the CSV file')
 @click.option('--delimiter', '-d', default=',')
 @click.argument('file', type=click.File(encoding='utf-8'))
 @click.pass_context
 def schedule_tasks(ctx, columns, delimiter, file):
     """Schedule tasks from a CSV input file.

     The following columns are expected, and can be set through the -c option:

     - type: the type of the task to be scheduled (mandatory)

     - args: the arguments passed to the task (JSON list, defaults to an
       empty list)

     - kwargs: the keyword arguments passed to the task (JSON object,
       defaults to an empty dict)

     - next_run: the date at which the task should run (datetime, defaults to
       now)

     The CSV can be read either from a named file, or from stdin (use - as
     filename).

     Usage sample:

     cat scheduling-task.txt | \
         python3 -m swh.scheduler.cli \
             --database 'service=swh-scheduler-dev' \
             task schedule \
                 --columns type --columns kwargs --columns policy \
                 --delimiter ';' -

     """
     tasks = []
     now = arrow.utcnow()
     scheduler = ctx.obj['scheduler']
     if not scheduler:
         raise ValueError('Scheduler class (local/remote) must be instantiated')

     reader = csv.reader(file, delimiter=delimiter)
     for line in reader:
         task = dict(zip(columns, line))
         args = json.loads(task.pop('args', '[]'))
         kwargs = json.loads(task.pop('kwargs', '{}'))
         task['arguments'] = {
             'args': args,
             'kwargs': kwargs,
         }
         task['next_run'] = DATETIME.convert(task.get('next_run', now),
                                             None, None)
         tasks.append(task)

     created = scheduler.create_tasks(tasks)

     output = [
         'Created %d tasks\n' % len(created),
     ]
     for task in created:
         output.append(pretty_print_task(task))

     click.echo_via_pager('\n'.join(output))


 @task.command('add')
 @click.argument('type', nargs=1, required=True)
 @click.argument('options', nargs=-1)
 @click.option('--policy', '-p', default='recurring',
               type=click.Choice(['recurring', 'oneshot']))
 @click.option('--next-run', '-n', default=None)
 @click.pass_context
 def schedule_task(ctx, type, options, policy, next_run):
     """Schedule one task from arguments.

     Usage sample:

     swh-scheduler --database 'service=swh-scheduler' \
         task add swh-lister-pypi

     swh-scheduler --database 'service=swh-scheduler' \
         task add swh-lister-debian --policy=oneshot distribution=stretch

     """
     scheduler = ctx.obj['scheduler']
     if not scheduler:
         raise ValueError('Scheduler class (local/remote) must be instantiated')
     now = arrow.utcnow()

     args = [x for x in options if '=' not in x]
     kw = dict(x.split('=', 1) for x in options if '=' in x)
     task = {'type': type,
             'policy': policy,
             'arguments': {
                 'args': args,
                 'kwargs': kw,
             },
             'next_run': DATETIME.convert(next_run or now, None, None),
             }
     created = scheduler.create_tasks([task])

     output = [
         'Created %d tasks\n' % len(created),
     ]
     for task in created:
         output.append(pretty_print_task(task))

     click.echo('\n'.join(output))


 @task.command('list-pending')
 @click.argument('task-types', required=True, nargs=-1)
 @click.option('--limit', '-l', required=False, type=click.INT,
               help='The maximum number of tasks to fetch')
 @click.option('--before', '-b', required=False, type=DATETIME,
               help='List all jobs supposed to run before the given date')
 @click.pass_context
 def list_pending_tasks(ctx, task_types, limit, before):
     """List the tasks that are going to be run.

     You can override the number of tasks to fetch.
     """
     scheduler = ctx.obj['scheduler']
     if not scheduler:
         raise ValueError('Scheduler class (local/remote) must be instantiated')
     num_tasks, num_tasks_priority = compute_nb_tasks_from(limit)

     output = []
     for task_type in task_types:
         pending = scheduler.peek_ready_tasks(
             task_type, timestamp=before,
             num_tasks=num_tasks, num_tasks_priority=num_tasks_priority)
         output.append('Found %d %s tasks\n' % (
             len(pending), task_type))

         for task in pending:
             output.append(pretty_print_task(task))

     click.echo('\n'.join(output))


 @task.command('archive')
 @click.option('--before', '-b', default=None,
               help='''Tasks whose ended date is anterior to this date will be
                       archived. Defaults to the current month's first day.''')
 @click.option('--after', '-a', default=None,
               help='''Tasks whose ended date is after the specified date will
                       be archived. Defaults to the prior month's first day.''')
 @click.option('--batch-index', default=1000, type=click.INT,
               help='Batch size of tasks to read from db to archive')
 @click.option('--bulk-index', default=200, type=click.INT,
               help='Batch size of tasks to bulk index')
 @click.option('--batch-clean', default=1000, type=click.INT,
               help='Batch size of tasks to clean after archival')
 @click.option('--dry-run/--no-dry-run', is_flag=True, default=False,
               help='Default to list only what would be archived.')
 @click.option('--verbose', is_flag=True, default=False,
               help='Verbose mode')
 @click.option('--cleanup/--no-cleanup', is_flag=True, default=True,
               help='Clean up archived tasks (default)')
 @click.option('--start-from', type=click.INT, default=-1,
               help='(Optional) default task id to start from. Default is -1.')
 @click.pass_context
 def archive_tasks(ctx, before, after, batch_index, bulk_index, batch_clean,
                   dry_run, verbose, cleanup, start_from):
     """Archive task/task_run whose (task_type is 'oneshot' and task_status
        is 'completed') or (task_type is 'recurring' and task_status is
        'disabled').

     With the --dry-run flag set, only list those.
""" scheduler = ctx.obj['scheduler'] if not scheduler: raise ValueError('Scheduler class (local/remote) must be instantiated') es_client = SWHElasticSearchClient() logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO) log = logging.getLogger('swh.scheduler.cli.archive') logging.getLogger('urllib3').setLevel(logging.WARN) logging.getLogger('elasticsearch').setLevel(logging.WARN) if dry_run: log.info('**DRY-RUN** (only reading db)') if not cleanup: log.info('**NO CLEANUP**') now = arrow.utcnow() # Default to archive tasks from a rolling month starting the week # prior to the current one if not before: before = now.shift(weeks=-1).format('YYYY-MM-DD') if not after: after = now.shift(weeks=-1).shift(months=-1).format('YYYY-MM-DD') log.debug('index: %s; cleanup: %s; period: [%s ; %s]' % ( not dry_run, not dry_run and cleanup, after, before)) def group_by_index_name(data, es_client=es_client): """Given a data record, determine the index's name through its ending date. This varies greatly depending on the task_run's status. """ date = data.get('started') if not date: date = data['scheduled'] return es_client.compute_index_name(date.year, date.month) def index_data(before, last_id, batch_index): tasks_in = scheduler.filter_task_to_archive( after, before, last_id=last_id, limit=batch_index) for index_name, tasks_group in itertools.groupby( tasks_in, key=group_by_index_name): log.debug('Index tasks to %s' % index_name) if dry_run: for task in tasks_group: yield task continue yield from es_client.streaming_bulk( index_name, tasks_group, source=['task_id', 'task_run_id'], chunk_size=bulk_index, log=log) gen = index_data(before, last_id=start_from, batch_index=batch_index) if cleanup: for task_ids in utils.grouper(gen, n=batch_clean): task_ids = list(task_ids) log.info('Clean up %s tasks: [%s, ...]' % ( len(task_ids), task_ids[0])) if dry_run: # no clean up continue ctx.obj['scheduler'].delete_archived_tasks(task_ids) else: for task_ids in utils.grouper(gen, n=batch_index): task_ids = list(task_ids) log.info('Indexed %s tasks: [%s, ...]' % ( len(task_ids), task_ids[0])) @cli.command('runner') @click.option('--period', '-p', default=0, help=('Period (in s) at witch pending tasks are checked and ' 'executed. Set to 0 (default) for a one shot.')) @click.pass_context def runner(ctx, period): """Starts a swh-scheduler runner service. This process is responsible for checking for ready-to-run tasks and schedule them.""" - from swh.scheduler.celery_backend.runner import main + from swh.scheduler.celery_backend.runner import run_ready_tasks + from swh.scheduler.celery_backend.config import app + + scheduler = ctx.obj['scheduler'] try: while True: - main() + try: + run_ready_tasks(scheduler, app) + except Exception: + scheduler.rollback() + raise if not period: break time.sleep(period) except KeyboardInterrupt: ctx.exit(0) @cli.command('listener') -@click.option('--log-level', '-l', default='INFO', - type=click.Choice(logging._nameToLevel.keys()), - help='Log level (default to INFO)') @click.pass_context -def listener(ctx, log_level): +def listener(ctx): """Starts a swh-scheduler listener service. 

     This service is responsible for listening to task lifecycle events and
     handling their workflow status in the database."""
     scheduler = ctx.obj['scheduler']
     if not scheduler:
         raise ValueError('Scheduler class (local/remote) must be instantiated')

     from swh.scheduler.celery_backend.listener import (
-        event_monitor, main_app, setup_log_handler)
-    setup_log_handler(loglevel=log_level, colorize=False,
-                      format='[%(levelname)s] %(name)s -- %(message)s')
+        event_monitor, main_app)
     event_monitor(main_app, backend=scheduler)


 @cli.command('api-server')
 @click.argument('config-path', required=1)
 @click.option('--host', default='0.0.0.0',
               help="Host to run the scheduler server api")
 @click.option('--port', default=5008, type=click.INT,
               help="Binding port of the server")
-@click.option('--debug/--nodebug', default=True,
-              help="Indicates if the server should run in debug mode")
+@click.option('--debug/--nodebug', default=None,
+              help=("Indicates if the server should run in debug mode. "
+                    "Defaults to True if log-level is DEBUG, False otherwise.")
+              )
 @click.pass_context
 def api_server(ctx, config_path, host, port, debug):
     """Starts a swh-scheduler API HTTP server.
     """
     if ctx.obj['config']['cls'] == 'remote':
         click.echo("The API server can only be started with a 'local' "
                    "configuration", err=True)
         ctx.exit(1)

     from swh.scheduler.api.server import app, DEFAULT_CONFIG
     conf = config.read(config_path, DEFAULT_CONFIG)
     if ctx.obj['config']['args']:
         conf['scheduler']['args'].update(ctx.obj['config']['args'])
     app.config.update(conf)
+    if debug is None:
+        debug = ctx.obj['loglevel'] <= logging.DEBUG
+
     app.run(host, port=port, debug=bool(debug))


 if __name__ == '__main__':
     cli()
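
Note: a usage sketch of the new global --log-level option introduced by this change. The swh-scheduler entry point and the --database DSN follow the samples already present in the command docstrings; the DSN value and the configuration file path below are illustrative, not prescribed by the patch:

    # run the runner with debug logging, checking for ready tasks every 10 seconds
    swh-scheduler --log-level DEBUG --database 'service=swh-scheduler-dev' \
        runner --period 10

    # the api-server now derives its default --debug setting from the same log level
    swh-scheduler --log-level DEBUG api-server path/to/scheduler-config.yml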