Page MenuHomeSoftware Heritage

D2291.diff
No OneTemporary

D2291.diff

diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -3,3 +3,4 @@
pytest-postgresql >= 2.1.0
celery >= 4
hypothesis >= 3.11.0
+swh.lister
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -10,6 +10,7 @@
psycopg2
pyyaml
vcversioner
+setuptools
# test dependencies
# hypothesis
diff --git a/swh/scheduler/cli/task_type.py b/swh/scheduler/cli/task_type.py
--- a/swh/scheduler/cli/task_type.py
+++ b/swh/scheduler/cli/task_type.py
@@ -3,11 +3,42 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import celery.app.task
import click
+import logging
+from importlib import import_module
+from typing import Mapping
+
+from pkg_resources import iter_entry_points
from . import cli
+logger = logging.getLogger(__name__)
+
+
+# Default scheduling parameters for task types created by ensure_task_type(),
+# keyed by task-type name suffix (matched as '-<suffix>').
+DEFAULT_TASK_TYPE = {
+ 'full': { # for tasks like 'list_xxx_full()'
+ 'default_interval': '90 days',
+ 'min_interval': '90 days',
+ 'max_interval': '90 days',
+ 'backoff_factor': 1
+ },
+ '*': { # fallback values used if no suffix matches
+ 'default_interval': '1 day',
+ 'min_interval': '1 day',
+ 'max_interval': '1 day',
+ 'backoff_factor': 1
+ },
+}
+
+
+# Entry points declared in the 'swh.workers' group, indexed by entry point
+# name. Built once, when this module is imported.
+PLUGIN_WORKER_DESCRIPTIONS = {
+ entry_point.name: entry_point
+ for entry_point in iter_entry_points('swh.workers')
+}
+
+
@cli.group('task-type')
@click.pass_context
def task_type(ctx):
@@ -45,6 +76,91 @@
click.echo(tmpl.format(**tasktype))
+@task_type.command('register')
+@click.option('--plugins', '-p', 'plugins', multiple=True, default=('all', ),
+              type=click.Choice(['all'] + list(PLUGIN_WORKER_DESCRIPTIONS)),
+              help='Registers task-types for provided plugins. '
+                   'Defaults to all')
+@click.pass_context
+def register_task_types(ctx, plugins):
+    """Register missing task-type entries in the scheduler.
+
+    According to declared tasks in each loaded worker (e.g. lister, loader,
+    ...) plugins.
+
+    """
+    scheduler = ctx.obj['scheduler']
+
+    # 'all' is an accepted choice and the option is repeatable, so it may be
+    # mixed with explicit plugin names (e.g. '-p all -p lister.gnu'). Expand
+    # it whenever it is present; 'all' itself is not a key of
+    # PLUGIN_WORKER_DESCRIPTIONS and would raise a KeyError below.
+    if 'all' in plugins:
+        plugins = list(PLUGIN_WORKER_DESCRIPTIONS)
+
+    for plugin in plugins:
+        entrypoint = PLUGIN_WORKER_DESCRIPTIONS[plugin]
+        logger.info('Loading entrypoint for plugin %s', plugin)
+        # The entry point resolves to a callable returning the plugin's
+        # registry description (a mapping).
+        registry_entry = entrypoint.load()()
+        # Optional per-task-type overrides declared by the plugin.
+        task_types_cfg = registry_entry.get('task_types', {})
+
+        for task_module in registry_entry['task_modules']:
+            mod = import_module(task_module)
+            for task_name in (x for x in dir(mod) if not x.startswith('_')):
+                logger.debug('Loading task name %s', task_name)
+                taskobj = getattr(mod, task_name)
+                # Only celery tasks exposed by the module are registered.
+                if isinstance(taskobj, celery.app.task.Task):
+                    # Task-type names use dashes where the python attribute
+                    # name uses underscores.
+                    tt_name = task_name.replace('_', '-')
+                    task_cfg = task_types_cfg.get(tt_name, {})
+                    ensure_task_type(
+                        task_module, tt_name, taskobj, task_cfg, scheduler)
+
+
+def ensure_task_type(
+        task_module: str, task_type: str,
+        swhtask, task_config: Mapping, scheduler):
+    """Ensure a given task-type (for the task_module) exists in the scheduler.
+
+    When the scheduler does not know the task-type yet, it is created from
+    the DEFAULT_TASK_TYPE entry matching the task-type suffix (or the '*'
+    entry otherwise), completed with the task's name and description, then
+    overridden by task_config. When it already exists, nothing is written:
+    only a warning is logged if its backend name differs from the one the
+    current code provides.
+
+    Args:
+        task_module: task module we are currently checking for task type
+            consistency
+        task_type: the type of the task to check/insert (corresponds to
+            the 'type' field in the db)
+        swhtask (SWHTask): the SWHTask instance the task-type corresponds to
+        task_config: a dict with specific/overloaded values for the
+            task-type to be created
+        scheduler: the scheduler object used to access the scheduler db
+
+    """
+    for suffix, defaults in DEFAULT_TASK_TYPE.items():
+        if task_type.endswith('-' + suffix):
+            task_type_dict = defaults.copy()
+            break
+    else:
+        # No suffix matched: fall back to the generic defaults.
+        task_type_dict = DEFAULT_TASK_TYPE['*'].copy()
+
+    task_type_dict['type'] = task_type
+    task_type_dict['backend_name'] = swhtask.name
+    # Use the docstring summary line as description. Strip first so that a
+    # docstring starting with a newline does not yield an empty description.
+    doc = (swhtask.__doc__ or '').strip()
+    if doc:
+        task_type_dict['description'] = doc.splitlines()[0]
+
+    # Plugin-provided values take precedence over the computed ones.
+    task_type_dict.update(task_config)
+
+    current_task_type = scheduler.get_task_type(task_type)
+    if current_task_type:
+        # Ensure the existing task_type is consistent in the scheduler
+        if current_task_type['backend_name'] != task_type_dict['backend_name']:
+            logger.warning(
+                'Existing task type %s for module %s has a '
+                'different backend name than current '
+                'code version provides (%s vs. %s)',
+                task_type,
+                task_module,
+                current_task_type['backend_name'],
+                task_type_dict['backend_name'],
+            )
+    else:
+        logger.info('Create task type %s in scheduler', task_type)
+        logger.debug(' %s', task_type_dict)
+        scheduler.create_task_type(task_type_dict)
+
+
@task_type.command('add')
@click.argument('type', required=True)
@click.argument('task-name', required=True)
diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py
--- a/swh/scheduler/tests/conftest.py
+++ b/swh/scheduler/tests/conftest.py
@@ -72,10 +72,11 @@
@pytest.fixture
-def swh_scheduler(postgresql):
+def swh_scheduler_config(request, postgresql):
scheduler_config = {
'db': postgresql.dsn,
}
+
all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey)
cursor = postgresql.cursor()
@@ -84,7 +85,12 @@
cursor.execute(fobj.read())
postgresql.commit()
- scheduler = get_scheduler('local', scheduler_config)
+ return scheduler_config
+
+
+@pytest.fixture
+def swh_scheduler(swh_scheduler_config):
+ scheduler = get_scheduler('local', swh_scheduler_config)
for taskname in TASK_NAMES:
scheduler.create_task_type({
'type': 'swh-test-{}'.format(taskname),
diff --git a/swh/scheduler/tests/test_cli_task_type.py b/swh/scheduler/tests/test_cli_task_type.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/tests/test_cli_task_type.py
@@ -0,0 +1,120 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import pytest
+import traceback
+import yaml
+import pkg_resources
+
+from click.testing import CliRunner
+
+from swh.scheduler import get_scheduler
+from swh.scheduler.cli import cli
+
+
+# Entry point specifications ('name=module:attr') parsed and returned by the
+# fake iter_entry_points installed by the mock_pkg_resources fixture.
+FAKE_MODULE_ENTRY_POINTS = {
+ 'lister.gnu=swh.lister.gnu:register',
+ 'lister.pypi=swh.lister.pypi:register',
+}
+
+
+@pytest.fixture
+def mock_pkg_resources(monkeypatch):
+    """Monkey patch pkg_resources.iter_entry_points to return a fixed set
+    of entry points (built from FAKE_MODULE_ENTRY_POINTS)
+
+    """
+    def fake_iter_entry_points(*args, **kwargs):
+        """Substitute fake function to return a fixed set of entrypoints
+
+        """
+        dist = pkg_resources.Distribution()
+        # sorted() makes the returned order deterministic, whatever the
+        # container type of FAKE_MODULE_ENTRY_POINTS is.
+        return [pkg_resources.EntryPoint.parse(entry, dist=dist)
+                for entry in sorted(FAKE_MODULE_ENTRY_POINTS)]
+
+    # No manual reset is needed: the monkeypatch fixture undoes every
+    # setattr() itself when the test finishes.
+    monkeypatch.setattr(
+        pkg_resources, "iter_entry_points", fake_iter_entry_points)
+
+
+@pytest.fixture
+def local_sched_config(swh_scheduler_config):
+    """Wrap the scheduler db configuration into a 'local' scheduler
+    configuration mapping
+
+    """
+    scheduler_section = {'cls': 'local', 'args': swh_scheduler_config}
+    return {'scheduler': scheduler_section}
+
+
+@pytest.fixture
+def local_sched_configfile(local_sched_config, tmp_path):
+    """Dump the local scheduler configuration as yaml in a temporary file
+    and return that file's path (posix string)
+
+    """
+    target = tmp_path / 'config.yml'
+    serialized = yaml.dump(local_sched_config)
+    target.write_text(serialized)
+    return target.as_posix()
+
+
+def test_register_ttypes_all(
+        mock_pkg_resources, local_sched_config, local_sched_configfile):
+    """Registering all task types"""
+
+    # Three equivalent ways of asking for every plugin: implicit default,
+    # explicit 'all', and every plugin listed by name.
+    for command in [
+        ['--config-file', local_sched_configfile, 'task-type', 'register'],
+        ['--config-file', local_sched_configfile, 'task-type', 'register',
+         '-p', 'all'],
+        ['--config-file', local_sched_configfile, 'task-type', 'register',
+         '-p', 'lister.gnu',
+         '-p', 'lister.pypi'],
+    ]:
+        result = CliRunner().invoke(cli, command)
+
+        # format_exception() returns the traceback text, so it ends up in
+        # the assertion message (print_exception() returns None). The
+        # message is only evaluated when the assertion fails.
+        assert result.exit_code == 0, ''.join(
+            traceback.format_exception(*result.exc_info))
+
+        scheduler = get_scheduler(**local_sched_config['scheduler'])
+        all_tasks = [
+            'list-gnu-full',
+            'list-pypi',
+        ]
+        for task in all_tasks:
+            task_type_desc = scheduler.get_task_type(task)
+            assert task_type_desc
+            assert task_type_desc['type'] == task
+            assert task_type_desc['backoff_factor'] == 1
+
+
+def test_register_ttypes_filter(
+        mock_pkg_resources, local_sched_config, local_sched_configfile):
+    """Filtering on one worker should only register its associated task type
+
+    """
+    result = CliRunner().invoke(cli, [
+        '--config-file', local_sched_configfile,
+        'task-type', 'register', '--plugins', 'lister.gnu'
+    ])
+
+    # format_exception() returns the traceback text, so it ends up in the
+    # assertion message (print_exception() returns None). The message is
+    # only evaluated when the assertion fails.
+    assert result.exit_code == 0, ''.join(
+        traceback.format_exception(*result.exc_info))
+
+    scheduler = get_scheduler(**local_sched_config['scheduler'])
+    all_tasks = [
+        'list-gnu-full',
+    ]
+    for task in all_tasks:
+        task_type_desc = scheduler.get_task_type(task)
+        assert task_type_desc
+        assert task_type_desc['type'] == task
+        assert task_type_desc['backoff_factor'] == 1

File Metadata

Mime Type
text/plain
Expires
Thu, Jul 3, 12:19 PM (2 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3226894

Event Timeline