diff --git a/requirements-test.txt b/requirements-test.txt
index 539ca68..2df755a 100644
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,5 +1,6 @@
 # pytest<4 because of https://github.com/pytest-dev/pytest/issues/4641
 pytest < 4
 pytest-postgresql >= 2.1.0
 celery >= 4
 hypothesis >= 3.11.0
+swh.lister
diff --git a/requirements.txt b/requirements.txt
index 657dc3f..2829d44 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,15 +1,16 @@
 # Add here external Python modules dependencies, one per line. Module names
 # should match https://pypi.python.org/pypi names. For the full spec or
 # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
 arrow
 celery >= 4
 Click
 elasticsearch > 5.4
 flask
 psycopg2
 pyyaml
 vcversioner
+setuptools

 # test dependencies
 # hypothesis
diff --git a/swh/scheduler/cli/task_type.py b/swh/scheduler/cli/task_type.py
index 2a84782..5da5ee2 100644
--- a/swh/scheduler/cli/task_type.py
+++ b/swh/scheduler/cli/task_type.py
@@ -1,82 +1,198 @@
 # Copyright (C) 2016-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

+import celery.app.task
 import click
+import logging
+
+from importlib import import_module
+from typing import Mapping
+
+from pkg_resources import iter_entry_points

 from . import cli

+logger = logging.getLogger(__name__)
+
+
+DEFAULT_TASK_TYPE = {
+    'full': {  # for tasks like 'list_xxx_full()'
+        'default_interval': '90 days',
+        'min_interval': '90 days',
+        'max_interval': '90 days',
+        'backoff_factor': 1
+    },
+    '*': {  # value if no suffix matches
+        'default_interval': '1 day',
+        'min_interval': '1 day',
+        'max_interval': '1 day',
+        'backoff_factor': 1
+    },
+}
+
+
+PLUGIN_WORKER_DESCRIPTIONS = {
+    entry_point.name: entry_point
+    for entry_point in iter_entry_points('swh.workers')
+}
+
+
 @cli.group('task-type')
 @click.pass_context
 def task_type(ctx):
     """Manipulate task types."""
     pass


 @task_type.command('list')
 @click.option('--verbose', '-v', is_flag=True, default=False,
               help='Verbose mode')
 @click.option('--task_type', '-t', multiple=True, default=None,
               help='List task types of given type')
 @click.option('--task_name', '-n', multiple=True, default=None,
               help='List task types of given backend task name')
 @click.pass_context
 def list_task_types(ctx, verbose, task_type, task_name):
     click.echo("Known task types:")
     if verbose:
         tmpl = click.style('{type}: ', bold=True) + '''{backend_name}
   {description}
   interval: {default_interval} [{min_interval}, {max_interval}]
   backoff_factor: {backoff_factor}
   max_queue_length: {max_queue_length}
   num_retries: {num_retries}
   retry_delay: {retry_delay}
 '''
     else:
         tmpl = '{type}:\n  {description}'
     for tasktype in sorted(ctx.obj['scheduler'].get_task_types(),
                            key=lambda x: x['type']):
         if task_type and tasktype['type'] not in task_type:
             continue
         if task_name and tasktype['backend_name'] not in task_name:
             continue
         click.echo(tmpl.format(**tasktype))


+@task_type.command('register')
+@click.option('--plugins', '-p', 'plugins', multiple=True, default=('all', ),
+              type=click.Choice(['all'] + list(PLUGIN_WORKER_DESCRIPTIONS)),
+              help='Registers task-types for provided plugins. '
+                   'Defaults to all')
+@click.pass_context
+def register_task_types(ctx, plugins):
+    """Register missing task-type entries in the scheduler.
+
+    Task types are inferred from the tasks declared by each loaded worker
+    plugin (e.g. lister, loader, ...).
+ + """ + scheduler = ctx.obj['scheduler'] + + if plugins == ('all', ): + plugins = list(PLUGIN_WORKER_DESCRIPTIONS) + + for plugin in plugins: + entrypoint = PLUGIN_WORKER_DESCRIPTIONS[plugin] + logger.info('Loading entrypoint for plugin %s', plugin) + registry_entry = entrypoint.load()() + + for task_module in registry_entry['task_modules']: + mod = import_module(task_module) + for task_name in (x for x in dir(mod) if not x.startswith('_')): + logger.debug('Loading task name %s', task_name) + taskobj = getattr(mod, task_name) + if isinstance(taskobj, celery.app.task.Task): + tt_name = task_name.replace('_', '-') + task_cfg = registry_entry.get('task_types', {}).get( + tt_name, {}) + ensure_task_type( + task_module, tt_name, taskobj, task_cfg, scheduler) + + +def ensure_task_type( + task_module: str, task_type: str, + swhtask, task_config: Mapping, scheduler): + """Ensure a given task-type (for the task_module) exists in the scheduler. + + Args: + task_module: task module we are currently checking for task type + consistency + task_type: the type of the task to check/insert (correspond to + the 'type' field in the db) + swhtask (SWHTask): the SWHTask instance the task-type correspond to + task_config: a dict with specific/overloaded values for the + task-type to be created + scheduler: the scheduler object used to access the scheduler db + + """ + for suffix, defaults in DEFAULT_TASK_TYPE.items(): + if task_type.endswith('-' + suffix): + task_type_dict = defaults.copy() + break + else: + task_type_dict = DEFAULT_TASK_TYPE['*'].copy() + + task_type_dict['type'] = task_type + task_type_dict['backend_name'] = swhtask.name + if swhtask.__doc__: + task_type_dict['description'] = swhtask.__doc__.splitlines()[0] + + task_type_dict.update(task_config) + + current_task_type = scheduler.get_task_type(task_type) + if current_task_type: + # Ensure the existing task_type is consistent in the scheduler + if current_task_type['backend_name'] != task_type_dict['backend_name']: + logger.warning( + 'Existing task type %s for module %s has a ' + 'different backend name than current ' + 'code version provides (%s vs. 
+                'code version provides (%s vs. %s)',
+                task_type,
+                task_module,
+                current_task_type['backend_name'],
+                task_type_dict['backend_name'],
+            )
+    else:
+        logger.info('Create task type %s in scheduler', task_type)
+        logger.debug(' %s', task_type_dict)
+        scheduler.create_task_type(task_type_dict)
+
+
 @task_type.command('add')
 @click.argument('type', required=True)
 @click.argument('task-name', required=True)
 @click.argument('description', required=True)
 @click.option('--default-interval', '-i', default='90 days',
               help='Default interval ("90 days" by default)')
 @click.option('--min-interval', default=None,
               help='Minimum interval (default interval if not set)')
 @click.option('--max-interval', '-i', default=None,
               help='Maximal interval (default interval if not set)')
 @click.option('--backoff-factor', '-f', type=float, default=1,
               help='Backoff factor')
 @click.pass_context
 def add_task_type(ctx, type, task_name, description,
                   default_interval, min_interval, max_interval,
                   backoff_factor):
     """Create a new task type
     """
     scheduler = ctx.obj['scheduler']
     if not scheduler:
         raise ValueError('Scheduler class (local/remote) must be instantiated')
     task_type = dict(
         type=type,
         backend_name=task_name,
         description=description,
         default_interval=default_interval,
         min_interval=min_interval,
         max_interval=max_interval,
         backoff_factor=backoff_factor,
         max_queue_length=None,
         num_retries=None,
         retry_delay=None,
         )
     scheduler.create_task_type(task_type)
     click.echo('OK')
diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py
index 118c543..e96f15f 100644
--- a/swh/scheduler/tests/conftest.py
+++ b/swh/scheduler/tests/conftest.py
@@ -1,103 +1,109 @@
 # Copyright (C) 2016-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import os
 import pytest
 import glob

 from datetime import timedelta

 import pkg_resources

 from swh.core.utils import numfile_sortkey as sortkey
 from swh.scheduler import get_scheduler
 from swh.scheduler.tests import SQL_DIR

 # make sure we are not fooled by CELERY_ config environment vars
 for var in [x for x in os.environ.keys() if x.startswith('CELERY')]:
     os.environ.pop(var)


 # test_cli tests depends on a en/C locale, so ensure it
 os.environ['LC_ALL'] = 'C.UTF-8'

 DUMP_FILES = os.path.join(SQL_DIR, '*.sql')

 # celery tasks for testing purpose; tasks themselves should be
 # in swh/scheduler/tests/tasks.py
 TASK_NAMES = ['ping', 'multiping', 'add', 'error']


 @pytest.fixture(scope='session')
 def celery_enable_logging():
     return True


 @pytest.fixture(scope='session')
 def celery_includes():
     task_modules = [
         'swh.scheduler.tests.tasks',
     ]
     for entrypoint in pkg_resources.iter_entry_points('swh.workers'):
         task_modules.extend(entrypoint.load()().get('task_modules', []))
     return task_modules


 @pytest.fixture(scope='session')
 def celery_parameters():
     return {
         'task_cls': 'swh.scheduler.task:SWHTask',
     }


 @pytest.fixture(scope='session')
 def celery_config():
     return {
         'accept_content': ['application/x-msgpack', 'application/json'],
         'task_serializer': 'msgpack',
         'result_serializer': 'json',
     }


 # use the celery_session_app fixture to monkeypatch the 'main'
 # swh.scheduler.celery_backend.config.app Celery application
 # with the test application
 @pytest.fixture(scope='session')
 def swh_app(celery_session_app):
     from swh.scheduler.celery_backend import config
     config.app = celery_session_app
     yield celery_session_app


 @pytest.fixture
-def swh_scheduler(request, postgresql):
+def swh_scheduler_config(request, postgresql):
     scheduler_config = {
         'db': postgresql.dsn,
     }
+
     all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey)

     cursor = postgresql.cursor()
     for fname in all_dump_files:
         with open(fname) as fobj:
             cursor.execute(fobj.read())
     postgresql.commit()

-    scheduler = get_scheduler('local', scheduler_config)
+    return scheduler_config
+
+
+@pytest.fixture
+def swh_scheduler(swh_scheduler_config):
+    scheduler = get_scheduler('local', swh_scheduler_config)
     for taskname in TASK_NAMES:
         scheduler.create_task_type({
             'type': 'swh-test-{}'.format(taskname),
             'description': 'The {} testing task'.format(taskname),
             'backend_name': 'swh.scheduler.tests.tasks.{}'.format(taskname),
             'default_interval': timedelta(days=1),
             'min_interval': timedelta(hours=6),
             'max_interval': timedelta(days=12),
         })

     return scheduler


 # this alias is used to be able to easily instantiate a db-backed Scheduler
 # eg. for the RPC client/server test suite.
 swh_db_scheduler = swh_scheduler
diff --git a/swh/scheduler/tests/test_cli_task_type.py b/swh/scheduler/tests/test_cli_task_type.py
new file mode 100644
index 0000000..0a762b5
--- /dev/null
+++ b/swh/scheduler/tests/test_cli_task_type.py
@@ -0,0 +1,120 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import pytest
+import traceback
+import yaml
+import pkg_resources
+
+from click.testing import CliRunner
+
+from swh.scheduler import get_scheduler
+from swh.scheduler.cli import cli
+
+
+FAKE_MODULE_ENTRY_POINTS = {
+    'lister.gnu=swh.lister.gnu:register',
+    'lister.pypi=swh.lister.pypi:register',
+}
+
+
+@pytest.fixture
+def mock_pkg_resources(monkeypatch):
+    """Monkey patch pkg_resources.iter_entry_points so swh.scheduler sees a
+    fixed set of fake 'swh.workers' entry points
+
+    """
+    def fake_iter_entry_points(*args, **kwargs):
+        """Substitute fake function to return a fixed set of entrypoints
+
+        """
+        from pkg_resources import EntryPoint, Distribution
+        d = Distribution()
+        return [EntryPoint.parse(entry, dist=d)
+                for entry in FAKE_MODULE_ENTRY_POINTS]
+
+    original_method = pkg_resources.iter_entry_points
+    monkeypatch.setattr(
+        pkg_resources, "iter_entry_points", fake_iter_entry_points)
+
+    yield
+
+    # reset the monkeypatch explicitly (the monkeypatch fixture would also
+    # undo it automatically at teardown)
+    monkeypatch.setattr(
+        pkg_resources, "iter_entry_points", original_method)
+
+
+@pytest.fixture
+def local_sched_config(swh_scheduler_config):
+    """Expose the local scheduler configuration
+
+    """
+    return {
+        'scheduler': {
+            'cls': 'local',
+            'args': swh_scheduler_config
+        }
+    }
+
+
+@pytest.fixture
+def local_sched_configfile(local_sched_config, tmp_path):
+    """Write the local scheduler configuration to a temporary location
+
+    """
+    configfile = tmp_path / 'config.yml'
+    configfile.write_text(yaml.dump(local_sched_config))
+    return configfile.as_posix()
+
+
+def test_register_ttypes_all(
+        mock_pkg_resources, local_sched_config, local_sched_configfile):
+    """Registering all task types"""
+
+    for command in [
+        ['--config-file', local_sched_configfile, 'task-type', 'register'],
+        ['--config-file', local_sched_configfile, 'task-type', 'register',
+         '-p', 'all'],
+        ['--config-file', local_sched_configfile, 'task-type', 'register',
+         '-p', 'lister.gnu',
+         '-p', 'lister.pypi'],
+    ]:
+        result = CliRunner().invoke(cli, command)
+
+        assert result.exit_code == 0, traceback.print_exception(
+            *result.exc_info)
+
+        scheduler = get_scheduler(**local_sched_config['scheduler'])
+        all_tasks = [
+            'list-gnu-full',
+            'list-pypi',
+        ]
+        for task in all_tasks:
+            task_type_desc = scheduler.get_task_type(task)
+            assert task_type_desc
+            assert task_type_desc['type'] == task
+            assert task_type_desc['backoff_factor'] == 1
+
+
+def test_register_ttypes_filter(
+        mock_pkg_resources, local_sched_config, local_sched_configfile):
+    """Filtering on one worker should only register its associated task type
+
+    """
+    result = CliRunner().invoke(cli, [
+        '--config-file', local_sched_configfile,
+        'task-type', 'register', '--plugins', 'lister.gnu'
+    ])

+    assert result.exit_code == 0, traceback.print_exception(*result.exc_info)
+
+    scheduler = get_scheduler(**local_sched_config['scheduler'])
+    all_tasks = [
+        'list-gnu-full',
+    ]
+    for task in all_tasks:
+        task_type_desc = scheduler.get_task_type(task)
+        assert task_type_desc
+        assert task_type_desc['type'] == task
+        assert task_type_desc['backoff_factor'] == 1
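
Note: the new 'register' subcommand consumes 'swh.workers' entry points that resolve
to a callable returning a registry dict with a 'task_modules' list and optional
per-task-type overrides under 'task_types'. A minimal sketch of such a callable,
assuming a hypothetical task module path and illustrative override values (they are
not taken from an actual lister):

    # Hypothetical register() exposed through the 'swh.workers' entry point
    # group, e.g. "lister.gnu = swh.lister.gnu:register" in a worker's setup.py.
    def register():
        return {
            # celery task modules scanned by 'swh scheduler task-type register'
            'task_modules': ['swh.lister.gnu.tasks'],
            # optional per-task-type overrides applied on top of DEFAULT_TASK_TYPE
            'task_types': {
                'list-gnu-full': {
                    'default_interval': '90 days',
                    'max_queue_length': 100,
                },
            },
        }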