Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9341829
D2291.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
9 KB
Subscribers
None
D2291.diff
View Options
diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -3,3 +3,4 @@
pytest-postgresql >= 2.1.0
celery >= 4
hypothesis >= 3.11.0
+swh.lister
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -10,6 +10,7 @@
psycopg2
pyyaml
vcversioner
+setuptools
# test dependencies
# hypothesis
diff --git a/swh/scheduler/cli/task_type.py b/swh/scheduler/cli/task_type.py
--- a/swh/scheduler/cli/task_type.py
+++ b/swh/scheduler/cli/task_type.py
@@ -3,11 +3,42 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import celery.app.task
import click
+import logging
+from importlib import import_module
+from typing import Mapping
+
+from pkg_resources import iter_entry_points
from . import cli
+logger = logging.getLogger(__name__)
+
+
+DEFAULT_TASK_TYPE = {  # task-type defaults, keyed by task type name suffix
+ 'full': { # task types ending in '-full', e.g. for 'list_xxx_full()' tasks
+ 'default_interval': '90 days',
+ 'min_interval': '90 days',
+ 'max_interval': '90 days',
+ 'backoff_factor': 1
+ },
+ '*': { # fallback used when no other suffix matches
+ 'default_interval': '1 day',
+ 'min_interval': '1 day',
+ 'max_interval': '1 day',
+ 'backoff_factor': 1
+ },
+}
+
+
+PLUGIN_WORKER_DESCRIPTIONS = {  # name -> entry point, built at import time
+ entry_point.name: entry_point
+ for entry_point in iter_entry_points('swh.workers')
+}
+
+
@cli.group('task-type')
@click.pass_context
def task_type(ctx):
@@ -45,6 +76,107 @@
         click.echo(tmpl.format(**tasktype))
 
 
+@task_type.command('register')
+@click.option('--plugins', '-p', 'plugins', multiple=True, default=('all', ),
+              type=click.Choice(['all'] + list(PLUGIN_WORKER_DESCRIPTIONS)),
+              help='Registers task-types for provided plugins. '
+              'Defaults to all')
+@click.pass_context
+def register_task_types(ctx, plugins):
+    """Register missing task-type entries in the scheduler.
+
+    Task types are derived from the celery tasks found in the task modules
+    declared by each selected worker plugin (e.g. lister, loader, ...).
+    Selecting 'all', alone or alongside explicit plugin names, registers
+    the task types of every known plugin.
+
+    """
+    scheduler = ctx.obj['scheduler']
+
+    if 'all' in plugins:
+        # 'all' may be combined with explicit plugin names (e.g.
+        # '-p all -p lister.gnu'); it then supersedes them
+        plugins = list(PLUGIN_WORKER_DESCRIPTIONS)
+
+    for plugin in plugins:
+        entrypoint = PLUGIN_WORKER_DESCRIPTIONS[plugin]
+        logger.info('Loading entrypoint for plugin %s', plugin)
+        registry_entry = entrypoint.load()()
+        # optional per-task-type overrides declared by the plugin
+        task_types_cfg = registry_entry.get('task_types', {})
+
+        for task_module in registry_entry['task_modules']:
+            mod = import_module(task_module)
+            for task_name in (x for x in dir(mod) if not x.startswith('_')):
+                logger.debug('Loading task name %s', task_name)
+                taskobj = getattr(mod, task_name)
+                if isinstance(taskobj, celery.app.task.Task):
+                    tt_name = task_name.replace('_', '-')
+                    task_cfg = task_types_cfg.get(tt_name, {})
+                    ensure_task_type(
+                        task_module, tt_name, taskobj, task_cfg, scheduler)
+
+
+def ensure_task_type(
+        task_module: str, task_type: str,
+        swhtask, task_config: Mapping, scheduler):
+    """Ensure a given task-type (for the task_module) exists in the scheduler.
+
+    The task-type definition is built from the DEFAULT_TASK_TYPE entry
+    matching the task type name suffix (e.g. '-full'), completed with the
+    celery task name and the first docstring line, then overridden by
+    task_config. A missing task type is created; an existing one is left
+    untouched, a warning being logged if its backend name differs.
+
+    Args:
+        task_module: task module we are currently checking for task type
+            consistency
+        task_type: the type of the task to check/insert (corresponds to
+            the 'type' field in the db)
+        swhtask (SWHTask): the SWHTask instance the task-type corresponds to
+        task_config: a dict with specific/overloaded values for the
+            task-type to be created
+        scheduler: the scheduler object used to access the scheduler db
+
+    """
+    for suffix, defaults in DEFAULT_TASK_TYPE.items():
+        if task_type.endswith('-' + suffix):
+            task_type_dict = defaults.copy()
+            break
+    else:
+        task_type_dict = DEFAULT_TASK_TYPE['*'].copy()
+
+    task_type_dict['type'] = task_type
+    task_type_dict['backend_name'] = swhtask.name
+    if swhtask.__doc__:
+        # use the first non-blank line: docstrings often start with a
+        # newline, which would otherwise give an empty description
+        doc_lines = [line.strip() for line in swhtask.__doc__.splitlines()
+                     if line.strip()]
+        if doc_lines:
+            task_type_dict['description'] = doc_lines[0]
+
+    task_type_dict.update(task_config)
+
+    current_task_type = scheduler.get_task_type(task_type)
+    if current_task_type:
+        # Ensure the existing task_type is consistent in the scheduler
+        if current_task_type['backend_name'] != task_type_dict['backend_name']:
+            logger.warning(
+                'Existing task type %s for module %s has a '
+                'different backend name than current '
+                'code version provides (%s vs. %s)',
+                task_type,
+                task_module,
+                current_task_type['backend_name'],
+                task_type_dict['backend_name'],
+            )
+    else:
+        logger.info('Create task type %s in scheduler', task_type)
+        logger.debug(' %s', task_type_dict)
+        scheduler.create_task_type(task_type_dict)
+
+
 @task_type.command('add')
 @click.argument('type', required=True)
 @click.argument('task-name', required=True)
diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py
--- a/swh/scheduler/tests/conftest.py
+++ b/swh/scheduler/tests/conftest.py
@@ -72,10 +72,12 @@
 
 
 @pytest.fixture
-def swh_scheduler(postgresql):
+def swh_scheduler_config(postgresql):
+    """Load DUMP_FILES into the test database; return a scheduler config"""
     scheduler_config = {
         'db': postgresql.dsn,
     }
+
     all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey)
 
     cursor = postgresql.cursor()
@@ -84,7 +86,13 @@
             cursor.execute(fobj.read())
     postgresql.commit()
 
-    scheduler = get_scheduler('local', scheduler_config)
+    return scheduler_config
+
+
+@pytest.fixture
+def swh_scheduler(swh_scheduler_config):
+    """Local scheduler on the test database, with swh-test-* task types"""
+    scheduler = get_scheduler('local', swh_scheduler_config)
     for taskname in TASK_NAMES:
         scheduler.create_task_type({
             'type': 'swh-test-{}'.format(taskname),
diff --git a/swh/scheduler/tests/test_cli_task_type.py b/swh/scheduler/tests/test_cli_task_type.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/tests/test_cli_task_type.py
@@ -0,0 +1,130 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import importlib
+import traceback
+
+import pytest
+import yaml
+from click.testing import CliRunner
+from pkg_resources import Distribution, EntryPoint
+
+from swh.scheduler import get_scheduler
+from swh.scheduler.cli import cli
+
+
+# 'swh.workers' entry points of the fake plugins; a tuple rather than a
+# set, so that plugins are always processed in the same order
+FAKE_MODULE_ENTRY_POINTS = (
+    'lister.gnu=swh.lister.gnu:register',
+    'lister.pypi=swh.lister.pypi:register',
+)
+
+
+@pytest.fixture
+def mock_pkg_resources(monkeypatch):
+    """Restrict the worker plugins known to the task-type cli module
+
+    PLUGIN_WORKER_DESCRIPTIONS is built from the 'swh.workers' entry points
+    once, when the cli module is imported, so patching
+    pkg_resources.iter_entry_points from a fixture does not affect it: the
+    mapping itself is replaced instead (monkeypatch restores it at
+    teardown). The '--plugins' choices are frozen at import time too, so
+    the fake plugin names must also be those of installed plugins.
+
+    """
+    dist = Distribution()
+    fake_plugins = {}
+    for entry in FAKE_MODULE_ENTRY_POINTS:
+        entry_point = EntryPoint.parse(entry, dist=dist)
+        fake_plugins[entry_point.name] = entry_point
+
+    task_type_module = importlib.import_module('swh.scheduler.cli.task_type')
+    monkeypatch.setattr(
+        task_type_module, 'PLUGIN_WORKER_DESCRIPTIONS', fake_plugins)
+
+
+@pytest.fixture
+def local_sched_config(swh_scheduler_config):
+    """Expose the local scheduler configuration
+
+    """
+    return {
+        'scheduler': {
+            'cls': 'local',
+            'args': swh_scheduler_config
+        }
+    }
+
+
+@pytest.fixture
+def local_sched_configfile(local_sched_config, tmp_path):
+    """Write in temporary location the local scheduler configuration
+
+    """
+    configfile = tmp_path / 'config.yml'
+    configfile.write_text(yaml.dump(local_sched_config))
+    return configfile.as_posix()
+
+
+def _assert_cli_success(result):
+    """Assert that a CliRunner invocation exited with status 0
+
+    On failure, the assertion message holds the command output and its
+    formatted traceback (traceback.print_exception() would print to stderr
+    and return None, leaving the assertion without a message).
+
+    """
+    details = result.output
+    if result.exc_info is not None:
+        details += ''.join(traceback.format_exception(*result.exc_info))
+    assert result.exit_code == 0, details
+
+
+def test_register_ttypes_all(
+        mock_pkg_resources, local_sched_config, local_sched_configfile):
+    """Registering all task types"""
+    base_command = ['--config-file', local_sched_configfile,
+                    'task-type', 'register']
+    for plugin_args in [
+            [],  # no --plugins option: defaults to all
+            ['-p', 'all'],
+            ['-p', 'lister.gnu', '-p', 'lister.pypi'],
+    ]:
+        result = CliRunner().invoke(cli, base_command + plugin_args)
+        _assert_cli_success(result)
+
+    scheduler = get_scheduler(**local_sched_config['scheduler'])
+    all_tasks = [
+        'list-gnu-full',
+        'list-pypi',
+    ]
+    for task in all_tasks:
+        task_type_desc = scheduler.get_task_type(task)
+        assert task_type_desc
+        assert task_type_desc['type'] == task
+        assert task_type_desc['backoff_factor'] == 1
+
+
+def test_register_ttypes_filter(
+        mock_pkg_resources, local_sched_config, local_sched_configfile):
+    """Filtering on one worker should only register its associated task type
+
+    """
+    result = CliRunner().invoke(cli, [
+        '--config-file', local_sched_configfile,
+        'task-type', 'register', '--plugins', 'lister.gnu'
+    ])
+    _assert_cli_success(result)
+
+    scheduler = get_scheduler(**local_sched_config['scheduler'])
+    all_tasks = [
+        'list-gnu-full',
+    ]
+    for task in all_tasks:
+        task_type_desc = scheduler.get_task_type(task)
+        assert task_type_desc
+        assert task_type_desc['type'] == task
+        assert task_type_desc['backoff_factor'] == 1
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Thu, Jul 3, 12:19 PM (2 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3226894
Attached To
D2291: swh.scheduler.cli: Add `swh scheduler task-type register` cli
Event Timeline
Log In to Comment