diff --git a/PKG-INFO b/PKG-INFO index 84b01ba..05541a9 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,28 +1,28 @@ Metadata-Version: 2.1 Name: swh.scheduler -Version: 0.0.62 +Version: 0.0.63 Summary: Software Heritage Scheduler Home-page: https://forge.softwareheritage.org/diffusion/DSCH/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN -Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate +Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler Description: swh-scheduler ============= Job scheduler for the Software Heritage project. Task manager for asynchronous/delayed tasks, used for both recurrent (e.g., listing a forge, loading new stuff from a Git repository) and one-off activities (e.g., loading a specific version of a source package). Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/requirements.txt b/requirements.txt index 657dc3f..2829d44 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,15 +1,16 @@ # Add here external Python modules dependencies, one per line. Module names # should match https://pypi.python.org/pypi names. For the full spec or # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html arrow celery >= 4 Click elasticsearch > 5.4 flask psycopg2 pyyaml vcversioner +setuptools # test dependencies # hypothesis diff --git a/swh.scheduler.egg-info/PKG-INFO b/swh.scheduler.egg-info/PKG-INFO index 84b01ba..05541a9 100644 --- a/swh.scheduler.egg-info/PKG-INFO +++ b/swh.scheduler.egg-info/PKG-INFO @@ -1,28 +1,28 @@ Metadata-Version: 2.1 Name: swh.scheduler -Version: 0.0.62 +Version: 0.0.63 Summary: Software Heritage Scheduler Home-page: https://forge.softwareheritage.org/diffusion/DSCH/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN -Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate +Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler Description: swh-scheduler ============= Job scheduler for the Software Heritage project. Task manager for asynchronous/delayed tasks, used for both recurrent (e.g., listing a forge, loading new stuff from a Git repository) and one-off activities (e.g., loading a specific version of a source package). Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/swh.scheduler.egg-info/SOURCES.txt b/swh.scheduler.egg-info/SOURCES.txt index 20837c6..9afe090 100644 --- a/swh.scheduler.egg-info/SOURCES.txt +++ b/swh.scheduler.egg-info/SOURCES.txt @@ -1,64 +1,65 @@ MANIFEST.in Makefile README.md requirements-swh.txt requirements.txt setup.py version.txt bin/swh-worker-control swh/__init__.py swh.scheduler.egg-info/PKG-INFO swh.scheduler.egg-info/SOURCES.txt swh.scheduler.egg-info/dependency_links.txt swh.scheduler.egg-info/entry_points.txt swh.scheduler.egg-info/requires.txt swh.scheduler.egg-info/top_level.txt swh/scheduler/__init__.py swh/scheduler/backend.py swh/scheduler/backend_es.py swh/scheduler/cli_utils.py swh/scheduler/py.typed swh/scheduler/task.py swh/scheduler/utils.py swh/scheduler/api/__init__.py swh/scheduler/api/client.py swh/scheduler/api/server.py swh/scheduler/celery_backend/__init__.py swh/scheduler/celery_backend/config.py swh/scheduler/celery_backend/listener.py swh/scheduler/celery_backend/runner.py swh/scheduler/cli/__init__.py swh/scheduler/cli/admin.py swh/scheduler/cli/task.py swh/scheduler/cli/task_type.py swh/scheduler/cli/utils.py swh/scheduler/sql/30-swh-schema.sql swh/scheduler/sql/40-swh-func.sql swh/scheduler/sql/50-swh-data.sql swh/scheduler/sql/60-swh-indexes.sql swh/scheduler/sql/updater/10-swh-init.sql swh/scheduler/sql/updater/30-swh-schema.sql swh/scheduler/sql/updater/40-swh-func.sql swh/scheduler/tests/__init__.py swh/scheduler/tests/conftest.py swh/scheduler/tests/tasks.py swh/scheduler/tests/test_api_client.py swh/scheduler/tests/test_celery_tasks.py swh/scheduler/tests/test_cli.py +swh/scheduler/tests/test_cli_task_type.py swh/scheduler/tests/test_scheduler.py swh/scheduler/tests/test_server.py swh/scheduler/tests/test_utils.py swh/scheduler/tests/updater/__init__.py swh/scheduler/tests/updater/conftest.py swh/scheduler/tests/updater/test_backend.py swh/scheduler/tests/updater/test_consumer.py swh/scheduler/tests/updater/test_events.py swh/scheduler/tests/updater/test_ghtorrent.py swh/scheduler/tests/updater/test_writer.py swh/scheduler/updater/__init__.py swh/scheduler/updater/backend.py swh/scheduler/updater/consumer.py swh/scheduler/updater/events.py swh/scheduler/updater/writer.py swh/scheduler/updater/ghtorrent/__init__.py swh/scheduler/updater/ghtorrent/cli.py \ No newline at end of file diff --git a/swh.scheduler.egg-info/requires.txt b/swh.scheduler.egg-info/requires.txt index 857239a..774c8c1 100644 --- a/swh.scheduler.egg-info/requires.txt +++ b/swh.scheduler.egg-info/requires.txt @@ -1,16 +1,18 @@ arrow celery>=4 Click elasticsearch>5.4 flask psycopg2 pyyaml vcversioner +setuptools swh.core[db,http]>=0.0.65 swh.storage>=0.0.129 [testing] pytest<4 pytest-postgresql>=2.1.0 celery>=4 hypothesis>=3.11.0 +swh.lister diff --git a/swh/scheduler/cli/task_type.py b/swh/scheduler/cli/task_type.py index 2a84782..5da5ee2 100644 --- a/swh/scheduler/cli/task_type.py +++ b/swh/scheduler/cli/task_type.py @@ -1,82 +1,198 @@ # Copyright (C) 2016-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import celery.app.task import click +import logging +from importlib import import_module +from typing import Mapping + +from pkg_resources import iter_entry_points from . import cli +logger = logging.getLogger(__name__) + + +DEFAULT_TASK_TYPE = { + 'full': { # for tasks like 'list_xxx_full()' + 'default_interval': '90 days', + 'min_interval': '90 days', + 'max_interval': '90 days', + 'backoff_factor': 1 + }, + '*': { # value if not suffix matches + 'default_interval': '1 day', + 'min_interval': '1 day', + 'max_interval': '1 day', + 'backoff_factor': 1 + }, +} + + +PLUGIN_WORKER_DESCRIPTIONS = { + entry_point.name: entry_point + for entry_point in iter_entry_points('swh.workers') +} + + @cli.group('task-type') @click.pass_context def task_type(ctx): """Manipulate task types.""" pass @task_type.command('list') @click.option('--verbose', '-v', is_flag=True, default=False, help='Verbose mode') @click.option('--task_type', '-t', multiple=True, default=None, help='List task types of given type') @click.option('--task_name', '-n', multiple=True, default=None, help='List task types of given backend task name') @click.pass_context def list_task_types(ctx, verbose, task_type, task_name): click.echo("Known task types:") if verbose: tmpl = click.style('{type}: ', bold=True) + '''{backend_name} {description} interval: {default_interval} [{min_interval}, {max_interval}] backoff_factor: {backoff_factor} max_queue_length: {max_queue_length} num_retries: {num_retries} retry_delay: {retry_delay} ''' else: tmpl = '{type}:\n {description}' for tasktype in sorted(ctx.obj['scheduler'].get_task_types(), key=lambda x: x['type']): if task_type and tasktype['type'] not in task_type: continue if task_name and tasktype['backend_name'] not in task_name: continue click.echo(tmpl.format(**tasktype)) +@task_type.command('register') +@click.option('--plugins', '-p', 'plugins', multiple=True, default=('all', ), + type=click.Choice(['all'] + list(PLUGIN_WORKER_DESCRIPTIONS)), + help='Registers task-types for provided plugins. ' + 'Defaults to all') +@click.pass_context +def register_task_types(ctx, plugins): + """Register missing task-type entries in the scheduler. + + According to declared tasks in each loaded worker (e.g. lister, loader, + ...) plugins. + + """ + scheduler = ctx.obj['scheduler'] + + if plugins == ('all', ): + plugins = list(PLUGIN_WORKER_DESCRIPTIONS) + + for plugin in plugins: + entrypoint = PLUGIN_WORKER_DESCRIPTIONS[plugin] + logger.info('Loading entrypoint for plugin %s', plugin) + registry_entry = entrypoint.load()() + + for task_module in registry_entry['task_modules']: + mod = import_module(task_module) + for task_name in (x for x in dir(mod) if not x.startswith('_')): + logger.debug('Loading task name %s', task_name) + taskobj = getattr(mod, task_name) + if isinstance(taskobj, celery.app.task.Task): + tt_name = task_name.replace('_', '-') + task_cfg = registry_entry.get('task_types', {}).get( + tt_name, {}) + ensure_task_type( + task_module, tt_name, taskobj, task_cfg, scheduler) + + +def ensure_task_type( + task_module: str, task_type: str, + swhtask, task_config: Mapping, scheduler): + """Ensure a given task-type (for the task_module) exists in the scheduler. + + Args: + task_module: task module we are currently checking for task type + consistency + task_type: the type of the task to check/insert (correspond to + the 'type' field in the db) + swhtask (SWHTask): the SWHTask instance the task-type correspond to + task_config: a dict with specific/overloaded values for the + task-type to be created + scheduler: the scheduler object used to access the scheduler db + + """ + for suffix, defaults in DEFAULT_TASK_TYPE.items(): + if task_type.endswith('-' + suffix): + task_type_dict = defaults.copy() + break + else: + task_type_dict = DEFAULT_TASK_TYPE['*'].copy() + + task_type_dict['type'] = task_type + task_type_dict['backend_name'] = swhtask.name + if swhtask.__doc__: + task_type_dict['description'] = swhtask.__doc__.splitlines()[0] + + task_type_dict.update(task_config) + + current_task_type = scheduler.get_task_type(task_type) + if current_task_type: + # Ensure the existing task_type is consistent in the scheduler + if current_task_type['backend_name'] != task_type_dict['backend_name']: + logger.warning( + 'Existing task type %s for module %s has a ' + 'different backend name than current ' + 'code version provides (%s vs. %s)', + task_type, + task_module, + current_task_type['backend_name'], + task_type_dict['backend_name'], + ) + else: + logger.info('Create task type %s in scheduler', task_type) + logger.debug(' %s', task_type_dict) + scheduler.create_task_type(task_type_dict) + + @task_type.command('add') @click.argument('type', required=True) @click.argument('task-name', required=True) @click.argument('description', required=True) @click.option('--default-interval', '-i', default='90 days', help='Default interval ("90 days" by default)') @click.option('--min-interval', default=None, help='Minimum interval (default interval if not set)') @click.option('--max-interval', '-i', default=None, help='Maximal interval (default interval if not set)') @click.option('--backoff-factor', '-f', type=float, default=1, help='Backoff factor') @click.pass_context def add_task_type(ctx, type, task_name, description, default_interval, min_interval, max_interval, backoff_factor): """Create a new task type """ scheduler = ctx.obj['scheduler'] if not scheduler: raise ValueError('Scheduler class (local/remote) must be instantiated') task_type = dict( type=type, backend_name=task_name, description=description, default_interval=default_interval, min_interval=min_interval, max_interval=max_interval, backoff_factor=backoff_factor, max_queue_length=None, num_retries=None, retry_delay=None, ) scheduler.create_task_type(task_type) click.echo('OK') diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py index fbfb5d2..e96f15f 100644 --- a/swh/scheduler/tests/conftest.py +++ b/swh/scheduler/tests/conftest.py @@ -1,105 +1,109 @@ # Copyright (C) 2016-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import pytest import glob from datetime import timedelta import pkg_resources from swh.core.utils import numfile_sortkey as sortkey from swh.scheduler import get_scheduler from swh.scheduler.tests import SQL_DIR -from swh.scheduler.tests.tasks import register_test_tasks # make sure we are not fooled by CELERY_ config environment vars for var in [x for x in os.environ.keys() if x.startswith('CELERY')]: os.environ.pop(var) # test_cli tests depends on a en/C locale, so ensure it os.environ['LC_ALL'] = 'C.UTF-8' DUMP_FILES = os.path.join(SQL_DIR, '*.sql') # celery tasks for testing purpose; tasks themselves should be # in swh/scheduler/tests/tasks.py TASK_NAMES = ['ping', 'multiping', 'add', 'error'] @pytest.fixture(scope='session') def celery_enable_logging(): return True @pytest.fixture(scope='session') def celery_includes(): task_modules = [ 'swh.scheduler.tests.tasks', ] for entrypoint in pkg_resources.iter_entry_points('swh.workers'): task_modules.extend(entrypoint.load()().get('task_modules', [])) return task_modules @pytest.fixture(scope='session') def celery_parameters(): return { 'task_cls': 'swh.scheduler.task:SWHTask', } @pytest.fixture(scope='session') def celery_config(): return { 'accept_content': ['application/x-msgpack', 'application/json'], 'task_serializer': 'msgpack', 'result_serializer': 'json', } -# override the celery_session_app fixture to monkeypatch the 'main' +# use the celery_session_app fixture to monkeypatch the 'main' # swh.scheduler.celery_backend.config.app Celery application -# with the test application (and also register test tasks) +# with the test application @pytest.fixture(scope='session') def swh_app(celery_session_app): - from swh.scheduler.celery_backend.config import app - register_test_tasks(celery_session_app) - app = celery_session_app # noqa - yield app + from swh.scheduler.celery_backend import config + config.app = celery_session_app + yield celery_session_app @pytest.fixture -def swh_scheduler(postgresql): +def swh_scheduler_config(request, postgresql): scheduler_config = { 'db': postgresql.dsn, } + all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey) cursor = postgresql.cursor() for fname in all_dump_files: with open(fname) as fobj: cursor.execute(fobj.read()) postgresql.commit() - scheduler = get_scheduler('local', scheduler_config) + return scheduler_config + + +@pytest.fixture +def swh_scheduler(swh_scheduler_config): + scheduler = get_scheduler('local', swh_scheduler_config) for taskname in TASK_NAMES: scheduler.create_task_type({ 'type': 'swh-test-{}'.format(taskname), 'description': 'The {} testing task'.format(taskname), 'backend_name': 'swh.scheduler.tests.tasks.{}'.format(taskname), 'default_interval': timedelta(days=1), 'min_interval': timedelta(hours=6), 'max_interval': timedelta(days=12), }) return scheduler # this alias is used to be able to easily instantiate a db-backed Scheduler # eg. for the RPC client/server test suite. swh_db_scheduler = swh_scheduler diff --git a/swh/scheduler/tests/tasks.py b/swh/scheduler/tests/tasks.py index 89f7476..be7628d 100644 --- a/swh/scheduler/tests/tasks.py +++ b/swh/scheduler/tests/tasks.py @@ -1,43 +1,36 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from celery import group - - -def register_test_tasks(app): - """Register test tasks for the specific app passed as parameter. - - In the test context, app is the swh_app and not the runtime one. - - Args: - app: Celery app. Expects the tests application - (swh.scheduler.tests.conftest.swh_app) - - """ - @app.task(name='swh.scheduler.tests.tasks.ping', bind=True) - def ping(self, **kw): - # check this is a SWHTask - assert hasattr(self, 'log') - assert not hasattr(self, 'run_task') - assert 'SWHTask' in [x.__name__ for x in self.__class__.__mro__] - self.log.debug(self.name) - if kw: - return 'OK (kw=%s)' % kw - return 'OK' - - @app.task(name='swh.scheduler.tests.tasks.multiping', bind=True) - def multiping(self, n=10): - promise = group(ping.s(i=i) for i in range(n))() - self.log.debug('%s OK (spawned %s subtasks)' % (self.name, n)) - promise.save() - return promise.id - - @app.task(name='swh.scheduler.tests.tasks.error') - def not_implemented(): - raise NotImplementedError('Nope') - - @app.task(name='swh.scheduler.tests.tasks.add') - def add(x, y): - return x + y +from celery import group, shared_task + + +@shared_task(name='swh.scheduler.tests.tasks.ping', bind=True) +def ping(self, **kw): + # check this is a SWHTask + assert hasattr(self, 'log') + assert not hasattr(self, 'run_task') + assert 'SWHTask' in [x.__name__ for x in self.__class__.__mro__] + self.log.debug(self.name) + if kw: + return 'OK (kw=%s)' % kw + return 'OK' + + +@shared_task(name='swh.scheduler.tests.tasks.multiping', bind=True) +def multiping(self, n=10): + promise = group(ping.s(i=i) for i in range(n))() + self.log.debug('%s OK (spawned %s subtasks)' % (self.name, n)) + promise.save() + return promise.id + + +@shared_task(name='swh.scheduler.tests.tasks.error') +def not_implemented(): + raise NotImplementedError('Nope') + + +@shared_task(name='swh.scheduler.tests.tasks.add') +def add(x, y): + return x + y diff --git a/swh/scheduler/tests/test_cli_task_type.py b/swh/scheduler/tests/test_cli_task_type.py new file mode 100644 index 0000000..0a762b5 --- /dev/null +++ b/swh/scheduler/tests/test_cli_task_type.py @@ -0,0 +1,120 @@ +# Copyright (C) 2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import pytest +import traceback +import yaml +import pkg_resources + +from click.testing import CliRunner + +from swh.scheduler import get_scheduler +from swh.scheduler.cli import cli + + +FAKE_MODULE_ENTRY_POINTS = { + 'lister.gnu=swh.lister.gnu:register', + 'lister.pypi=swh.lister.pypi:register', +} + + +@pytest.fixture +def mock_pkg_resources(monkeypatch): + """Monkey patch swh.scheduler's mock_pkg_resources.iter_entry_point call + + """ + def fake_iter_entry_points(*args, **kwargs): + """Substitute fake function to return a fixed set of entrypoints + + """ + from pkg_resources import EntryPoint, Distribution + d = Distribution() + return [EntryPoint.parse(entry, dist=d) + for entry in FAKE_MODULE_ENTRY_POINTS] + + original_method = pkg_resources.iter_entry_points + monkeypatch.setattr( + pkg_resources, "iter_entry_points", fake_iter_entry_points) + + yield + + # reset monkeypatch: is that needed? + monkeypatch.setattr( + pkg_resources, "iter_entry_points", original_method) + + +@pytest.fixture +def local_sched_config(swh_scheduler_config): + """Expose the local scheduler configuration + + """ + return { + 'scheduler': { + 'cls': 'local', + 'args': swh_scheduler_config + } + } + + +@pytest.fixture +def local_sched_configfile(local_sched_config, tmp_path): + """Write in temporary location the local scheduler configuration + + """ + configfile = tmp_path / 'config.yml' + configfile.write_text(yaml.dump(local_sched_config)) + return configfile.as_posix() + + +def test_register_ttypes_all( + mock_pkg_resources, local_sched_config, local_sched_configfile): + """Registering all task types""" + + for command in [ + ['--config-file', local_sched_configfile, 'task-type', 'register'], + ['--config-file', local_sched_configfile, 'task-type', 'register', + '-p', 'all'], + ['--config-file', local_sched_configfile, 'task-type', 'register', + '-p', 'lister.gnu', + '-p', 'lister.pypi'], + ]: + result = CliRunner().invoke(cli, command) + + assert result.exit_code == 0, traceback.print_exception( + *result.exc_info) + + scheduler = get_scheduler(**local_sched_config['scheduler']) + all_tasks = [ + 'list-gnu-full', + 'list-pypi', + ] + for task in all_tasks: + task_type_desc = scheduler.get_task_type(task) + assert task_type_desc + assert task_type_desc['type'] == task + assert task_type_desc['backoff_factor'] == 1 + + +def test_register_ttypes_filter( + mock_pkg_resources, local_sched_config, local_sched_configfile): + """Filtering on one worker should only register its associated task type + + """ + result = CliRunner().invoke(cli, [ + '--config-file', local_sched_configfile, + 'task-type', 'register', '--plugins', 'lister.gnu' + ]) + + assert result.exit_code == 0, traceback.print_exception(*result.exc_info) + + scheduler = get_scheduler(**local_sched_config['scheduler']) + all_tasks = [ + 'list-gnu-full', + ] + for task in all_tasks: + task_type_desc = scheduler.get_task_type(task) + assert task_type_desc + assert task_type_desc['type'] == task + assert task_type_desc['backoff_factor'] == 1 diff --git a/version.txt b/version.txt index ec85cf3..2a89b44 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.62-0-g787c7a9 \ No newline at end of file +v0.0.63-0-g9358572 \ No newline at end of file