from typing import Any, Dict, Mapping


def list_plugin_worker_descriptions(
        group: str = 'swh.workers') -> Mapping[str, Any]:
    """List plugin worker descriptions (declared in /setup.py)

    Args:
        group: name of the entry point group to scan. Defaults to
            'swh.workers', the group used for worker plugins.

    Returns:
        A dict mapping each entry point name to the entry point object
        itself (not yet loaded; callers invoke its ``load()``). When
        several entry points share a name, the last one yielded wins.

    """
    # Imported at call time, on purpose: the name is resolved on the
    # pkg_resources module at each call, so a replaced
    # pkg_resources.iter_entry_points (e.g. monkeypatched in tests) is
    # picked up.
    from pkg_resources import iter_entry_points
    return {
        entry_point.name: entry_point
        for entry_point in iter_entry_points(group)
    }
import celery.app.task
import click
import logging

from importlib import import_module
from typing import Mapping

from swh.scheduler import list_plugin_worker_descriptions

from . import cli


logger = logging.getLogger(__name__)


# Default scheduling values applied to a newly registered task type, keyed by
# the suffix of the task-type name (see ensure_task_type).
DEFAULT_TASK_TYPE = {
    'full': {  # for tasks like 'list_xxx_full()'
        'default_interval': '90 days',
        'min_interval': '90 days',
        'max_interval': '90 days',
        'backoff_factor': 1
    },
    '*': {  # fallback values, used when no suffix matches
        'default_interval': '1 day',
        'min_interval': '1 day',
        'max_interval': '1 day',
        'backoff_factor': 1
    },
}


# Resolved once, at import time: the plugin names feed the click.Choice of the
# 'register' command below, which is built when its decorator runs.
PLUGIN_WORKER_DESCRIPTIONS = list_plugin_worker_descriptions()


@task_type.command('register')
@click.option('--plugins', '-p', 'plugins', multiple=True, default=('all', ),
              type=click.Choice(['all'] + list(PLUGIN_WORKER_DESCRIPTIONS)),
              help='Only registers task-types for provided plugins')
@click.pass_context
def register_task_types(ctx, plugins):
    """Register missing task-type entries in the scheduler.

    According to declared tasks in each loaded worker (e.g. lister, loader,
    ...) plugins.

    """
    # NOTE: the docstring above is also the command's --help text.
    scheduler = ctx.obj['scheduler']

    supported_modules = list(PLUGIN_WORKER_DESCRIPTIONS)

    logger.debug('known modules: %s', supported_modules)
    logger.debug('plugins: %s', plugins)

    # 'all' may be mixed with explicit plugin names (-p all -p lister.gnu);
    # it is not a key of PLUGIN_WORKER_DESCRIPTIONS, so it must be expanded
    # whenever it is present, not only when it is the sole value.
    if 'all' in plugins:
        plugins = supported_modules

    for plugin_module in plugins:
        entrypoint = PLUGIN_WORKER_DESCRIPTIONS[plugin_module]
        logger.info('Loading module %s', plugin_module)
        # The entry point resolves to a registry function; calling it gives
        # the plugin's registry dict.
        registry_entry = entrypoint.load()()

        task_modules = registry_entry['task_modules']
        if isinstance(task_modules, str):
            # A single module name: wrap it, otherwise the loop below would
            # iterate over its characters.
            task_modules = [task_modules]
        declared_task_types = registry_entry.get('task_types', {})

        for task_module in task_modules:
            mod = import_module(task_module)
            for task_name in (x for x in dir(mod) if not x.startswith('_')):
                logger.debug('Loading task name %s', task_name)
                taskobj = getattr(mod, task_name)
                if not isinstance(taskobj, celery.app.task.Task):
                    continue
                # Task-type names use dashes where the python attribute
                # names use underscores.
                tt_name = task_name.replace('_', '-')
                task_cfg = declared_task_types.get(tt_name, {})
                ensure_task_type(
                    task_module, tt_name, taskobj, task_cfg, scheduler)


def _first_doc_line(doc):
    """Return the first non-blank line of `doc`, stripped ('' if none)."""
    for line in (doc or '').splitlines():
        line = line.strip()
        if line:
            return line
    return ''


def ensure_task_type(
        task_module: str, task_type: str,
        swhtask, task_config: Mapping, scheduler):
    """Ensure a given task-type (for the task_module) exists in the scheduler.

    If the task type is unknown to the scheduler, it is created from the
    suffix-based defaults of DEFAULT_TASK_TYPE, overridden by `task_config`.
    If it already exists, it is left untouched; a warning is logged when its
    backend name differs from the one the code provides.

    Args:
        task_module: task module we are currently checking for task type
          consistency (only used in the warning message)
        task_type: the type of the task to check/insert (correspond to
          the 'type' field in the db)
        swhtask (SWHTask): the SWHTask instance the task-type correspond to;
          its `name` is used as backend name and the first non-blank line
          of its docstring, if any, as description
        task_config: a dict with specific/overloaded values for the
          task-type to be created
        scheduler: the scheduler object used to access the scheduler db

    """
    for suffix, defaults in DEFAULT_TASK_TYPE.items():
        # '*' is the fallback entry, not a real suffix
        if suffix != '*' and task_type.endswith('-' + suffix):
            task_type_dict = defaults.copy()
            break
    else:
        task_type_dict = DEFAULT_TASK_TYPE['*'].copy()

    task_type_dict['type'] = task_type
    task_type_dict['backend_name'] = swhtask.name
    # Docstrings often start with a newline or indentation; take the first
    # line that actually carries text.
    description = _first_doc_line(swhtask.__doc__)
    if description:
        task_type_dict['description'] = description

    # Plugin-provided values take precedence over everything computed above
    task_type_dict.update(task_config)

    current_task_type = scheduler.get_task_type(task_type)
    if current_task_type:
        # Ensure the existing task_type is consistent in the scheduler
        if current_task_type['backend_name'] != task_type_dict['backend_name']:
            logger.warning(
                'Existing task type %s for module %s has a '
                'different backend name than current '
                'code version provides (%s vs. %s)',
                task_type,
                task_module,
                current_task_type['backend_name'],
                task_type_dict['backend_name'],
            )
    else:
        logger.info('Create task type %s in scheduler', task_type)
        logger.debug('  %s', task_type_dict)
        scheduler.create_task_type(task_type_dict)
@pytest.fixture
def mock_pkg_resources(monkeypatch):
    """Monkey patch pkg_resources.iter_entry_points so that it returns the
    fixed set of entry points declared in FAKE_MODULE_ENTRY_POINTS

    """
    def fake_iter_entry_points(*args, **kwargs):
        """Substitute fake function to return a fixed set of entrypoints

        """
        from pkg_resources import EntryPoint, Distribution
        dist = Distribution()
        return [EntryPoint.parse(entry, dist=dist)
                for entry in FAKE_MODULE_ENTRY_POINTS]

    # No manual restore is needed: the monkeypatch fixture undoes its
    # modifications when the requesting test finishes.
    monkeypatch.setattr(
        pkg_resources, "iter_entry_points", fake_iter_entry_points)

    # Now making sure the monkeypatching step is working
    from swh.scheduler.cli.task_type import list_plugin_worker_descriptions
    assert len(list(list_plugin_worker_descriptions())) == len(
        FAKE_MODULE_ENTRY_POINTS)


@pytest.fixture
def local_sched_config(swh_scheduler_config):
    """Expose the local scheduler configuration

    """
    return {
        'scheduler': {
            'cls': 'local',
            'args': swh_scheduler_config
        }
    }


@pytest.fixture
def local_sched_configfile(local_sched_config, tmp_path):
    """Write in temporary location the local scheduler configuration

    Returns:
        the path (posix string) of the written yaml file

    """
    configfile = tmp_path / 'config.yml'
    configfile.write_text(yaml.dump(local_sched_config))
    return configfile.as_posix()


def _assert_success(result):
    """Assert a CLI invocation exited with code 0, reporting its output and
    traceback (if any) on failure

    """
    details = result.output
    if result.exc_info is not None:
        details += ''.join(traceback.format_exception(*result.exc_info))
    assert result.exit_code == 0, details


def _assert_task_types_registered(sched_config, task_types):
    """Assert each of the given task types exists in the scheduler described
    by sched_config, with the expected type name and a backoff factor of 1

    """
    scheduler = get_scheduler(**sched_config['scheduler'])
    for task in task_types:
        task_type_desc = scheduler.get_task_type(task)
        assert task_type_desc
        assert task_type_desc['type'] == task
        assert task_type_desc['backoff_factor'] == 1


def test_register_ttypes_all(
        mock_pkg_resources, local_sched_config, local_sched_configfile):
    """Registering all task types"""

    # Three equivalent ways of asking for every plugin: the default, an
    # explicit 'all', and the exhaustive list of plugin names.
    for command in [
        ['--config-file', local_sched_configfile, 'task-type', 'register'],
        ['--config-file', local_sched_configfile, 'task-type', 'register',
         '-p', 'all'],
        ['--config-file', local_sched_configfile, 'task-type', 'register',
         '-p', 'lister.gnu',
         '-p', 'lister.pypi'],
    ]:
        result = CliRunner().invoke(cli, command)
        _assert_success(result)

        _assert_task_types_registered(local_sched_config, [
            'list-gnu-full',
            'list-pypi',
        ])


def test_register_ttypes_filter(
        mock_pkg_resources, local_sched_config, local_sched_configfile):
    """Filtering on one worker should only register its associated task type

    """
    result = CliRunner().invoke(cli, [
        '--config-file', local_sched_configfile,
        'task-type', 'register', '--plugins', 'lister.gnu'
    ])
    _assert_success(result)

    _assert_task_types_registered(local_sched_config, [
        'list-gnu-full',
    ])