Changeset View
Changeset View
Standalone View
Standalone View
swh/lister/cli.py
# Copyright (C) 2018-2019 The Software Heritage developers | # Copyright (C) 2018-2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import os | import os | ||||
import logging | import logging | ||||
from copy import deepcopy | from copy import deepcopy | ||||
from importlib import import_module | |||||
import celery.app.task | |||||
import click | import click | ||||
from sqlalchemy import create_engine | from sqlalchemy import create_engine | ||||
from swh.core.cli import CONTEXT_SETTINGS | from swh.core.cli import CONTEXT_SETTINGS | ||||
from swh.scheduler import get_scheduler | |||||
from swh.lister import get_lister, SUPPORTED_LISTERS, LISTERS | from swh.lister import get_lister, SUPPORTED_LISTERS, LISTERS | ||||
from swh.lister.core.models import initialize | from swh.lister.core.models import initialize | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
# the key in this dict is the suffix used to match new task-type to be added. | # the key in this dict is the suffix used to match new task-type to be added. | ||||
▲ Show 20 Lines • Show All 69 Lines • ▼ Show 20 Lines | def db_init(ctx, drop_tables): | ||||
for lister, entrypoint in LISTERS.items(): | for lister, entrypoint in LISTERS.items(): | ||||
registry_entry = registry[lister] | registry_entry = registry[lister] | ||||
init_hook = registry_entry.get('init') | init_hook = registry_entry.get('init') | ||||
if callable(init_hook): | if callable(init_hook): | ||||
logger.info('Calling init hook for %s', lister) | logger.info('Calling init hook for %s', lister) | ||||
init_hook(db_engine) | init_hook(db_engine) | ||||
@lister.command(name='register-task-types', context_settings=CONTEXT_SETTINGS) | |||||
@click.option('--lister', '-l', 'listers', multiple=True, | |||||
default=('all', ), show_default=True, | |||||
help='Only registers task-types for these listers', | |||||
type=click.Choice(['all'] + SUPPORTED_LISTERS)) | |||||
@click.pass_context | |||||
def register_task_types(ctx, listers): | |||||
"""Insert missing task-type entries in the scheduler | |||||
According to declared tasks in each loaded lister plugin. | |||||
""" | |||||
cfg = ctx.obj['config'] | |||||
scheduler = get_scheduler(**cfg['scheduler']) | |||||
for lister, entrypoint in LISTERS.items(): | |||||
if 'all' not in listers and lister not in listers: | |||||
continue | |||||
logger.info('Loading lister %s', lister) | |||||
registry_entry = entrypoint.load()() | |||||
for task_module in registry_entry['task_modules']: | |||||
mod = import_module(task_module) | |||||
for task_name in (x for x in dir(mod) if not x.startswith('_')): | |||||
taskobj = getattr(mod, task_name) | |||||
if isinstance(taskobj, celery.app.task.Task): | |||||
task_type = task_name.replace('_', '-') | |||||
task_cfg = registry_entry.get('task_types', {}).get( | |||||
task_type, {}) | |||||
ensure_task_type(task_type, taskobj, task_cfg, scheduler) | |||||
def ensure_task_type(task_type, swhtask, task_config, scheduler): | |||||
"""Ensure a task-type is known by the scheduler | |||||
Args: | |||||
task_type (str): the type of the task to check/insert (correspond to | |||||
the 'type' field in the db) | |||||
swhtask (SWHTask): the SWHTask instance the task-type correspond to | |||||
task_config (dict): a dict with specific/overloaded values for the | |||||
task-type to be created | |||||
scheduler: the scheduler object used to access the scheduler db | |||||
""" | |||||
for suffix, defaults in DEFAULT_TASK_TYPE.items(): | |||||
if task_type.endswith('-' + suffix): | |||||
task_type_dict = defaults.copy() | |||||
break | |||||
else: | |||||
task_type_dict = DEFAULT_TASK_TYPE['*'].copy() | |||||
task_type_dict['type'] = task_type | |||||
task_type_dict['backend_name'] = swhtask.name | |||||
if swhtask.__doc__: | |||||
task_type_dict['description'] = swhtask.__doc__.splitlines()[0] | |||||
task_type_dict.update(task_config) | |||||
current_task_type = scheduler.get_task_type(task_type) | |||||
if current_task_type: | |||||
# check some stuff | |||||
if current_task_type['backend_name'] != task_type_dict['backend_name']: | |||||
logger.warning('Existing task type %s for lister %s has a ' | |||||
'different backend name than current ' | |||||
'code version provides (%s vs. %s)', | |||||
task_type, | |||||
lister, | |||||
current_task_type['backend_name'], | |||||
task_type_dict['backend_name'], | |||||
) | |||||
else: | |||||
logger.info('Create task type %s in scheduler', task_type) | |||||
logger.debug(' %s', task_type_dict) | |||||
scheduler.create_task_type(task_type_dict) | |||||
@lister.command(name='run', context_settings=CONTEXT_SETTINGS, | @lister.command(name='run', context_settings=CONTEXT_SETTINGS, | ||||
help='Trigger a full listing run for a particular forge ' | help='Trigger a full listing run for a particular forge ' | ||||
'instance. The output of this listing results in ' | 'instance. The output of this listing results in ' | ||||
'"oneshot" tasks in the scheduler db with a priority ' | '"oneshot" tasks in the scheduler db with a priority ' | ||||
'defined by the user') | 'defined by the user') | ||||
@click.option('--lister', '-l', help='Lister to run', | @click.option('--lister', '-l', help='Lister to run', | ||||
type=click.Choice(SUPPORTED_LISTERS)) | type=click.Choice(SUPPORTED_LISTERS)) | ||||
@click.option('--priority', '-p', default='high', | @click.option('--priority', '-p', default='high', | ||||
Show All 20 Lines |