Changeset View
Standalone View
swh/lister/cli.py
# Copyright (C) 2018-2019 The Software Heritage developers | # Copyright (C) 2018-2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import os | |||||
import logging | import logging | ||||
import pkg_resources | import pkg_resources | ||||
from copy import deepcopy | from copy import deepcopy | ||||
from importlib import import_module | |||||
import click | import click | ||||
from sqlalchemy import create_engine | from sqlalchemy import create_engine | ||||
from swh.core.cli import CONTEXT_SETTINGS | from swh.core.cli import CONTEXT_SETTINGS | ||||
from swh.scheduler import get_scheduler | |||||
from swh.scheduler.task import SWHTask | |||||
from swh.lister.core.models import initialize | from swh.lister.core.models import initialize | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
# Registry of available listers: every 'swh.workers' entry point whose name
# has the form 'lister.<name>' is indexed under '<name>'. Only the entry point
# objects are stored here; they are loaded later by the commands that need
# them (via entrypoint.load()).
LISTERS = {entry_point.name.split('.', 1)[1]: entry_point | LISTERS = {entry_point.name.split('.', 1)[1]: entry_point | ||||
for entry_point in pkg_resources.iter_entry_points('swh.workers') | for entry_point in pkg_resources.iter_entry_points('swh.workers') | ||||
if entry_point.name.split('.', 1)[0] == 'lister'} | if entry_point.name.split('.', 1)[0] == 'lister'} | ||||
# Lister names offered as choices by the '--lister' command-line options.
SUPPORTED_LISTERS = list(LISTERS) | SUPPORTED_LISTERS = list(LISTERS) | ||||
# Default scheduler settings used when a new task-type is inserted, keyed by | |||||
# the suffix of the task-type name. Task-type names are the task function | |||||
# names with '_' replaced by '-', and a key matches when the name ends with | |||||
# '-<key>'. For example, the task function 'list_gitlab_full' gives the | |||||
# task-type 'list-gitlab-full', which gets the values under the 'full' key | |||||
# below. The '*' entry is the fallback used when no suffix matches. | |||||
DEFAULT_TASK_TYPE = { | |||||
'full': { # for tasks like 'list_xxx_full()' | |||||
'default_interval': '90 days', | |||||
'min_interval': '90 days', | |||||
'max_interval': '90 days', | |||||
'backoff_factor': 1 | |||||
}, | |||||
'*': { # value if no suffix matches | |||||
'default_interval': '1 day', | |||||
'min_interval': '1 day', | |||||
'max_interval': '1 day', | |||||
'backoff_factor': 1 | |||||
}, | |||||
} | |||||
def get_lister(lister_name, db_url=None, **conf): | def get_lister(lister_name, db_url=None, **conf): | ||||
"""Instantiate a lister given its name. | """Instantiate a lister given its name. | ||||
Args: | Args: | ||||
lister_name (str): Lister's name | lister_name (str): Lister's name | ||||
conf (dict): Configuration dict (lister db cnx, policy, priority...) | conf (dict): Configuration dict (lister db cnx, policy, priority...) | ||||
Show All 29 Lines | def lister(ctx, config_file, db_url): | ||||
ctx.ensure_object(dict) | ctx.ensure_object(dict) | ||||
override_conf = {} | override_conf = {} | ||||
if db_url: | if db_url: | ||||
override_conf['lister'] = { | override_conf['lister'] = { | ||||
'cls': 'local', | 'cls': 'local', | ||||
'args': {'db': db_url} | 'args': {'db': db_url} | ||||
} | } | ||||
if not config_file: | |||||
config_file = os.environ.get('SWH_CONFIG_FILENAME') | |||||
conf = config.read(config_file, override_conf) | conf = config.read(config_file, override_conf) | ||||
ctx.obj['config'] = conf | ctx.obj['config'] = conf | ||||
ctx.obj['override_conf'] = override_conf | ctx.obj['override_conf'] = override_conf | ||||
@lister.command(name='db-init', context_settings=CONTEXT_SETTINGS)
@click.option('--drop-tables', '-D', is_flag=True, default=False,
              help='Drop tables before creating the database schema')
@click.pass_context
def db_init(ctx, drop_tables):
    """Initialize the database model for given listers.

    Every registered lister plugin is loaded, the database schema is then
    created through ``initialize`` and finally each plugin's optional 'init'
    hook is called with the database engine.

    Args:
        ctx (click.Context): click context; ``ctx.obj['config']`` must hold
            the configuration, including a 'lister' section whose 'cls' is
            'local' and whose 'args' provide the 'db' url
        drop_tables (bool): drop tables before creating the database schema

    """
    cfg = ctx.obj['config']
    lister_cfg = cfg['lister']
    if lister_cfg['cls'] != 'local':
        click.echo('A local lister configuration is required')
        ctx.exit(1)

    db_url = lister_cfg['args']['db']
    db_engine = create_engine(db_url)

    # Load every plugin once and keep its registry entry, so the init hooks
    # below reuse it instead of loading the entry point a second time.
    # NOTE(review): loading happens before initialize(), presumably so that
    # the listers' models are declared when the schema is created -- confirm.
    registry = {}
    for lister_name, entrypoint in LISTERS.items():
        logger.info('Loading lister %s', lister_name)
        registry[lister_name] = entrypoint.load()()

    logger.info('Initializing database')
    initialize(db_engine, drop_tables)

    for lister_name, registry_entry in registry.items():
        init_hook = registry_entry.get('init')
        if callable(init_hook):
            logger.info('Calling init hook for %s', lister_name)
            init_hook(db_engine)
@lister.command(name='register-task-types', context_settings=CONTEXT_SETTINGS) | |||||
@click.option('--lister', '-l', 'listers', multiple=True, | |||||
default=('all', ), show_default=True, | |||||
help='Only registers task-types for these listers', | |||||
type=click.Choice(['all'] + SUPPORTED_LISTERS)) | |||||
@click.pass_context | |||||
def register_task_types(ctx, listers): | |||||
ardumont: I like this but should this be called in the `db-init` subcommand?
That will work for the… | |||||
Done Inline Actions: I am not yet sure where I want this new feature/command to live. It might make sense to make it a dedicated command indeed (possibly called from db-init). For the production system, I guess one would prefer this 'upgrade' task to be done by hand, so I won't have it executed automagically. douardda: I am not yet sure where I want this new feature/command to live.
It might make sense to… | |||||
Not Done Inline Actions
Yes (i thought of that \m/ ;) subcommand: register-task or something?
yes, i'd prefer that the call be manual (at least at first ;) (Also, you made it idempotent already so that should be fine to use) ardumont: > It might make sense to make it a dedicated command indeed (possibly called from db-init). | |||||
"""Insert missing task-type entries in the scheduler | |||||
Not Done Inline ActionsAccording to ardumont: According to | |||||
According to declared tasks in each loaded lister plugin. | |||||
""" | |||||
Not Done Inline Actions: key does not convey much ;) ardumont: `key` does not convey much ;)
I had to scratch my head a bit to understand it was the lister's… | |||||
Done Inline Actions: agreed, could use something like pattern (but it's not a regex) or suffix maybe? douardda: agreed, could use something like `pattern` (but it's not a regex) or `suffix` maybe? | |||||
Not Done Inline Actions: yes, suffix sounds good. Maybe add a comment to make explicit that the suffix lives in a fixed set of {'full', 'incremental'} (as of now ;) ardumont: yes, suffix sounds good.
Maybe add a comment to make explicit that the suffix lives in a fixed set… | |||||
cfg = ctx.obj['config'] | |||||
scheduler = get_scheduler(**cfg['scheduler']) | |||||
for lister, entrypoint in LISTERS.items(): | |||||
if 'all' not in listers and lister not in listers: | |||||
continue | |||||
logger.info('Loading lister %s', lister) | |||||
registry_entry = entrypoint.load()() | |||||
for task_module in registry_entry['task_modules']: | |||||
mod = import_module(task_module) | |||||
for task_name in (x for x in dir(mod) if not x.startswith('_')): | |||||
taskobj = getattr(mod, task_name) | |||||
if isinstance(taskobj, SWHTask): | |||||
task_type = task_name.replace('_', '-') | |||||
task_cfg = registry_entry.get('task_types', {}).get( | |||||
task_type, {}) | |||||
ensure_task_type(task_type, taskobj, task_cfg, scheduler) | |||||
Not Done Inline Actions: you could make that a """ docstring """ ;) ardumont: you could make that a """ docstring """ ;) | |||||
def ensure_task_type(task_type, swhtask, task_config, scheduler):
    """Ensure a task-type is known by the scheduler

    When the scheduler has no entry for ``task_type``, one is created from
    the matching DEFAULT_TASK_TYPE entry, completed with the task's backend
    name and the first line of its docstring, then overridden by
    ``task_config``. When an entry already exists it is left untouched; a
    warning is logged if its backend name differs from the one the current
    code provides.

    Args:
        task_type (str): the type of the task to check/insert (corresponds
            to the 'type' field in the db)
        swhtask (SWHTask): the SWHTask instance the task-type corresponds to
        task_config (dict): a dict with specific/overloaded values for the
            task-type to be created
        scheduler: the scheduler object used to access the scheduler db

    """
    # Pick the defaults whose key is the suffix of the task-type name; the
    # for/else falls back to the '*' entry when no suffix matches.
    for suffix, defaults in DEFAULT_TASK_TYPE.items():
        if task_type.endswith('-' + suffix):
            task_type_dict = defaults.copy()
            break
    else:
        task_type_dict = DEFAULT_TASK_TYPE['*'].copy()

    task_type_dict['type'] = task_type
    task_type_dict['backend_name'] = swhtask.name
    if swhtask.__doc__:
        task_type_dict['description'] = swhtask.__doc__.splitlines()[0]
    # Values declared by the lister plugin take precedence over the defaults.
    task_type_dict.update(task_config)

    current_task_type = scheduler.get_task_type(task_type)
    if current_task_type:
        # An existing entry is only checked, never updated. The message no
        # longer interpolates `lister`: that name is not defined in this
        # function and resolved to the module-level click group.
        if current_task_type['backend_name'] != task_type_dict['backend_name']:
            logger.warning('Existing task type %s has a different backend '
                           'name than current code version provides '
                           '(%s vs. %s)',
                           task_type,
                           current_task_type['backend_name'],
                           task_type_dict['backend_name'],
                           )
    else:
        logger.info('Create task type %s in scheduler', task_type)
        logger.debug('  %s', task_type_dict)
        scheduler.create_task_type(task_type_dict)
@lister.command(name='run', context_settings=CONTEXT_SETTINGS, | @lister.command(name='run', context_settings=CONTEXT_SETTINGS, | ||||
help='Trigger a full listing run for a particular forge ' | help='Trigger a full listing run for a particular forge ' | ||||
'instance. The output of this listing results in ' | 'instance. The output of this listing results in ' | ||||
'"oneshot" tasks in the scheduler db with a priority ' | '"oneshot" tasks in the scheduler db with a priority ' | ||||
'defined by the user') | 'defined by the user') | ||||
@click.option('--lister', '-l', help='Lister to run', | @click.option('--lister', '-l', help='Lister to run', | ||||
type=click.Choice(SUPPORTED_LISTERS)) | type=click.Choice(SUPPORTED_LISTERS)) | ||||
@click.option('--priority', '-p', default='high', | @click.option('--priority', '-p', default='high', | ||||
Show All 20 Lines |
I like this but should this be called in the db-init subcommand?
That will work for the docker-dev environment but what about production?