Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/cli/task_type.py
# Copyright (C) 2016-2019 The Software Heritage developers | # Copyright (C) 2016-2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import celery.app.task | |||||
import click | import click | ||||
import logging | |||||
from importlib import import_module | |||||
from typing import Mapping | |||||
from pkg_resources import iter_entry_points | |||||
from . import cli | from . import cli | ||||
logger = logging.getLogger(__name__) | |||||
DEFAULT_TASK_TYPE = { | |||||
'full': { # for tasks like 'list_xxx_full()' | |||||
'default_interval': '90 days', | |||||
'min_interval': '90 days', | |||||
'max_interval': '90 days', | |||||
'backoff_factor': 1 | |||||
}, | |||||
'*': { # value if not suffix matches | |||||
'default_interval': '1 day', | |||||
'min_interval': '1 day', | |||||
'max_interval': '1 day', | |||||
'backoff_factor': 1 | |||||
}, | |||||
} | |||||
douardda: naming again: these are not worker modules, and not even plugin (python) modules, but worker… | |||||
PLUGIN_WORKER_DESCRIPTIONS = { | |||||
entry_point.name: entry_point | |||||
for entry_point in iter_entry_points('swh.workers') | |||||
} | |||||
@cli.group('task-type') | @cli.group('task-type') | ||||
@click.pass_context | @click.pass_context | ||||
def task_type(ctx): | def task_type(ctx): | ||||
"""Manipulate task types.""" | """Manipulate task types.""" | ||||
pass | pass | ||||
@task_type.command('list') | @task_type.command('list') | ||||
Show All 21 Lines | for tasktype in sorted(ctx.obj['scheduler'].get_task_types(), | ||||
key=lambda x: x['type']): | key=lambda x: x['type']): | ||||
if task_type and tasktype['type'] not in task_type: | if task_type and tasktype['type'] not in task_type: | ||||
continue | continue | ||||
if task_name and tasktype['backend_name'] not in task_name: | if task_name and tasktype['backend_name'] not in task_name: | ||||
continue | continue | ||||
click.echo(tmpl.format(**tasktype)) | click.echo(tmpl.format(**tasktype)) | ||||
@task_type.command('register') | |||||
@click.option('--plugins', '-p', 'plugins', multiple=True, default=('all', ), | |||||
Done Inline ActionsI did not find a way to make that actually a "choice". ardumont: I did not find a way to make that actually a "choice".
So the current code loads all that is… | |||||
Done Inline ActionsIt's now a real "choice". ardumont: It's now a real "choice". | |||||
type=click.Choice(['all'] + list(PLUGIN_WORKER_DESCRIPTIONS)), | |||||
help='Registers task-types for provided plugins. ' | |||||
'Defaults to all') | |||||
@click.pass_context | |||||
def register_task_types(ctx, plugins): | |||||
"""Register missing task-type entries in the scheduler. | |||||
According to declared tasks in each loaded worker (e.g. lister, loader, | |||||
...) plugins. | |||||
""" | |||||
scheduler = ctx.obj['scheduler'] | |||||
Not Done Inline Actions/nitpicky/ this worker_modules local variable is useless IMHO. douardda: /nitpicky/ this worker_modules local variable is useless IMHO. | |||||
if plugins == ('all', ): | |||||
plugins = list(PLUGIN_WORKER_DESCRIPTIONS) | |||||
for plugin in plugins: | |||||
entrypoint = PLUGIN_WORKER_DESCRIPTIONS[plugin] | |||||
logger.info('Loading entrypoint for plugin %s', plugin) | |||||
registry_entry = entrypoint.load()() | |||||
for task_module in registry_entry['task_modules']: | |||||
mod = import_module(task_module) | |||||
for task_name in (x for x in dir(mod) if not x.startswith('_')): | |||||
logger.debug('Loading task name %s', task_name) | |||||
taskobj = getattr(mod, task_name) | |||||
if isinstance(taskobj, celery.app.task.Task): | |||||
tt_name = task_name.replace('_', '-') | |||||
task_cfg = registry_entry.get('task_types', {}).get( | |||||
tt_name, {}) | |||||
ensure_task_type( | |||||
task_module, tt_name, taskobj, task_cfg, scheduler) | |||||
def ensure_task_type( | |||||
task_module: str, task_type: str, | |||||
swhtask, task_config: Mapping, scheduler): | |||||
"""Ensure a given task-type (for the task_module) exists in the scheduler. | |||||
Args: | |||||
task_module: task module we are currently checking for task type | |||||
consistency | |||||
task_type: the type of the task to check/insert (correspond to | |||||
the 'type' field in the db) | |||||
swhtask (SWHTask): the SWHTask instance the task-type correspond to | |||||
task_config: a dict with specific/overloaded values for the | |||||
task-type to be created | |||||
scheduler: the scheduler object used to access the scheduler db | |||||
""" | |||||
for suffix, defaults in DEFAULT_TASK_TYPE.items(): | |||||
if task_type.endswith('-' + suffix): | |||||
task_type_dict = defaults.copy() | |||||
break | |||||
else: | |||||
task_type_dict = DEFAULT_TASK_TYPE['*'].copy() | |||||
task_type_dict['type'] = task_type | |||||
task_type_dict['backend_name'] = swhtask.name | |||||
if swhtask.__doc__: | |||||
task_type_dict['description'] = swhtask.__doc__.splitlines()[0] | |||||
task_type_dict.update(task_config) | |||||
current_task_type = scheduler.get_task_type(task_type) | |||||
if current_task_type: | |||||
# Ensure the existing task_type is consistent in the scheduler | |||||
if current_task_type['backend_name'] != task_type_dict['backend_name']: | |||||
logger.warning( | |||||
'Existing task type %s for module %s has a ' | |||||
'different backend name than current ' | |||||
'code version provides (%s vs. %s)', | |||||
task_type, | |||||
task_module, | |||||
current_task_type['backend_name'], | |||||
task_type_dict['backend_name'], | |||||
) | |||||
else: | |||||
logger.info('Create task type %s in scheduler', task_type) | |||||
logger.debug(' %s', task_type_dict) | |||||
scheduler.create_task_type(task_type_dict) | |||||
@task_type.command('add') | @task_type.command('add') | ||||
@click.argument('type', required=True) | @click.argument('type', required=True) | ||||
@click.argument('task-name', required=True) | @click.argument('task-name', required=True) | ||||
@click.argument('description', required=True) | @click.argument('description', required=True) | ||||
@click.option('--default-interval', '-i', default='90 days', | @click.option('--default-interval', '-i', default='90 days', | ||||
help='Default interval ("90 days" by default)') | help='Default interval ("90 days" by default)') | ||||
@click.option('--min-interval', default=None, | @click.option('--min-interval', default=None, | ||||
help='Minimum interval (default interval if not set)') | help='Minimum interval (default interval if not set)') | ||||
Show All 27 Lines |
naming again: these are not worker modules, and not even plugin (python) modules, but worker plugin descriptions.
The name should be more explicit.