Page MenuHomeSoftware Heritage

D1940.id6583.diff
No OneTemporary

D1940.id6583.diff

diff --git a/swh/lister/bitbucket/tasks.py b/swh/lister/bitbucket/tasks.py
--- a/swh/lister/bitbucket/tasks.py
+++ b/swh/lister/bitbucket/tasks.py
@@ -17,20 +17,21 @@
@app.task(name=__name__ + '.IncrementalBitBucketLister')
-def incremental_bitbucket_lister(**lister_args):
+def list_bitbucket_incremental(**lister_args):
+ '''Incremental update of the BitBucket forge'''
lister = new_lister(**lister_args)
lister.run(min_bound=lister.db_last_index(), max_bound=None)
@app.task(name=__name__ + '.RangeBitBucketLister')
-def range_bitbucket_lister(start, end, **lister_args):
+def _range_bitbucket_lister(start, end, **lister_args):
lister = new_lister(**lister_args)
lister.run(min_bound=start, max_bound=end)
@app.task(name=__name__ + '.FullBitBucketRelister', bind=True)
-def full_bitbucket_relister(self, split=None, **lister_args):
- """Relist from the beginning of what's already been listed.
+def list_bitbucket_full(self, split=None, **lister_args):
+ """Full update of the BitBucket forge
It's not to be called for an initial listing.
@@ -42,7 +43,7 @@
return
random.shuffle(ranges)
- promise = group(range_bitbucket_lister.s(minv, maxv, **lister_args)
+ promise = group(_range_bitbucket_lister.s(minv, maxv, **lister_args)
for minv, maxv in ranges)()
self.log.debug('%s OK (spawned %s subtasks)', (self.name, len(ranges)))
try:
@@ -53,5 +54,5 @@
@app.task(name=__name__ + '.ping')
-def ping():
+def _ping():
return 'OK'
diff --git a/swh/lister/cgit/tasks.py b/swh/lister/cgit/tasks.py
--- a/swh/lister/cgit/tasks.py
+++ b/swh/lister/cgit/tasks.py
@@ -8,10 +8,11 @@
@app.task(name=__name__ + '.CGitListerTask')
-def cgit_lister(**lister_args):
+def list_cgit(**lister_args):
+ '''Lister task for CGit instances'''
CGitLister(**lister_args).run()
@app.task(name=__name__ + '.ping')
-def ping():
+def _ping():
return 'OK'
diff --git a/swh/lister/cli.py b/swh/lister/cli.py
--- a/swh/lister/cli.py
+++ b/swh/lister/cli.py
@@ -3,14 +3,18 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import os
import logging
import pkg_resources
from copy import deepcopy
+from importlib import import_module
import click
from sqlalchemy import create_engine
from swh.core.cli import CONTEXT_SETTINGS
+from swh.scheduler import get_scheduler
+from swh.scheduler.task import SWHTask
from swh.lister.core.models import initialize
@@ -21,6 +25,25 @@
if entry_point.name.split('.', 1)[0] == 'lister'}
SUPPORTED_LISTERS = list(LISTERS)
+# The keys of this dict are the suffixes used to match new task-types to be
+# added. For example, for a task whose function name is 'list_gitlab_full',
+# the default values used when inserting a new task-type in the scheduler db
+# are the ones under the 'full' key below (because it matches xxx_full).
+DEFAULT_TASK_TYPE = {
+ 'full': { # for tasks like 'list_xxx_full()'
+ 'default_interval': '90 days',
+ 'min_interval': '90 days',
+ 'max_interval': '90 days',
+ 'backoff_factor': 1
+ },
+ '*': { # fallback values used if no suffix matches
+ 'default_interval': '1 day',
+ 'min_interval': '1 day',
+ 'max_interval': '1 day',
+ 'backoff_factor': 1
+ },
+ }
+
def get_lister(lister_name, db_url=None, **conf):
"""Instantiate a lister given its name.
@@ -66,6 +89,8 @@
'cls': 'local',
'args': {'db': db_url}
}
+ if not config_file:
+ config_file = os.environ.get('SWH_CONFIG_FILENAME')
conf = config.read(config_file, override_conf)
ctx.obj['config'] = conf
ctx.obj['override_conf'] = override_conf
@@ -89,20 +114,97 @@
db_url = lister_cfg['args']['db']
db_engine = create_engine(db_url)
+ registry = {}
for lister, entrypoint in LISTERS.items():
logger.info('Loading lister %s', lister)
- registry_entry = entrypoint.load()()
+ registry[lister] = entrypoint.load()()
logger.info('Initializing database')
initialize(db_engine, drop_tables)
for lister, entrypoint in LISTERS.items():
+ registry_entry = registry[lister]
init_hook = registry_entry.get('init')
if callable(init_hook):
logger.info('Calling init hook for %s', lister)
init_hook(db_engine)
+@lister.command(name='register-task-types', context_settings=CONTEXT_SETTINGS)
+@click.option('--lister', '-l', 'listers', multiple=True,
+ default=('all', ), show_default=True,
+ help='Only registers task-types for these listers',
+ type=click.Choice(['all'] + SUPPORTED_LISTERS))
+@click.pass_context
+def register_task_types(ctx, listers):
+ """Insert missing task-type entries in the scheduler
+
+ According to declared tasks in each loaded lister plugin.
+ """
+
+ cfg = ctx.obj['config']
+ scheduler = get_scheduler(**cfg['scheduler'])
+
+ for lister, entrypoint in LISTERS.items():
+ if 'all' not in listers and lister not in listers:
+ continue
+ logger.info('Loading lister %s', lister)
+
+ registry_entry = entrypoint.load()()
+ for task_module in registry_entry['task_modules']:
+ mod = import_module(task_module)
+ for task_name in (x for x in dir(mod) if not x.startswith('_')):
+ taskobj = getattr(mod, task_name)
+ if isinstance(taskobj, SWHTask):
+ task_type = task_name.replace('_', '-')
+ task_cfg = registry_entry.get('task_types', {}).get(
+ task_type, {})
+ ensure_task_type(task_type, taskobj, task_cfg, scheduler)
+
+
+def ensure_task_type(task_type, swhtask, task_config, scheduler):
+ """Ensure a task-type is known by the scheduler
+
+ Args:
+ task_type (str): the type of the task to check/insert (correspond to
+ the 'type' field in the db)
+ swhtask (SWHTask): the SWHTask instance the task-type correspond to
+ task_config (dict): a dict with specific/overloaded values for the
+ task-type to be created
+ scheduler: the scheduler object used to access the scheduler db
+ """
+ for suffix, defaults in DEFAULT_TASK_TYPE.items():
+ if task_type.endswith('-' + suffix):
+ task_type_dict = defaults.copy()
+ break
+ else:
+ task_type_dict = DEFAULT_TASK_TYPE['*'].copy()
+
+ task_type_dict['type'] = task_type
+ task_type_dict['backend_name'] = swhtask.name
+ if swhtask.__doc__:
+ task_type_dict['description'] = swhtask.__doc__.splitlines()[0]
+
+ task_type_dict.update(task_config)
+
+ current_task_type = scheduler.get_task_type(task_type)
+ if current_task_type:
+ # check some stuff
+ if current_task_type['backend_name'] != task_type_dict['backend_name']:
+ logger.warning('Existing task type %s for lister %s has a '
+ 'different backend name than current '
+ 'code version provides (%s vs. %s)',
+ task_type,
+ lister,
+ current_task_type['backend_name'],
+ task_type_dict['backend_name'],
+ )
+ else:
+ logger.info('Create task type %s in scheduler', task_type)
+ logger.debug(' %s', task_type_dict)
+ scheduler.create_task_type(task_type_dict)
+
+
@lister.command(name='run', context_settings=CONTEXT_SETTINGS,
help='Trigger a full listing run for a particular forge '
'instance. The output of this listing results in '
diff --git a/swh/lister/cran/tasks.py b/swh/lister/cran/tasks.py
--- a/swh/lister/cran/tasks.py
+++ b/swh/lister/cran/tasks.py
@@ -8,10 +8,11 @@
@app.task(name=__name__ + '.CRANListerTask')
-def cran_lister(**lister_args):
+def list_cran(**lister_args):
+ '''Lister task for the CRAN registry'''
CRANLister(**lister_args).run()
@app.task(name=__name__ + '.ping')
-def ping():
+def _ping():
return 'OK'
diff --git a/swh/lister/debian/tasks.py b/swh/lister/debian/tasks.py
--- a/swh/lister/debian/tasks.py
+++ b/swh/lister/debian/tasks.py
@@ -8,10 +8,11 @@
@app.task(name=__name__ + '.DebianListerTask')
-def debian_lister(distribution, **lister_args):
+def list_debian_distribution(distribution, **lister_args):
+ '''List a Debian distribution'''
DebianLister(**lister_args).run(distribution)
@app.task(name=__name__ + '.ping')
-def ping():
+def _ping():
return 'OK'
diff --git a/swh/lister/github/tasks.py b/swh/lister/github/tasks.py
--- a/swh/lister/github/tasks.py
+++ b/swh/lister/github/tasks.py
@@ -17,20 +17,21 @@
@app.task(name=__name__ + '.IncrementalGitHubLister')
-def incremental_github_lister(**lister_args):
+def list_github_incremental(**lister_args):
+ 'Incremental update of GitHub'
lister = new_lister(**lister_args)
lister.run(min_bound=lister.db_last_index(), max_bound=None)
@app.task(name=__name__ + '.RangeGitHubLister')
-def range_github_lister(start, end, **lister_args):
+def _range_github_lister(start, end, **lister_args):
lister = new_lister(**lister_args)
lister.run(min_bound=start, max_bound=end)
@app.task(name=__name__ + '.FullGitHubRelister', bind=True)
-def full_github_relister(self, split=None, **lister_args):
- """Relist from the beginning of what's already been listed.
+def list_github_full(self, split=None, **lister_args):
+ """Full update of GitHub
It's not to be called for an initial listing.
@@ -41,7 +42,7 @@
self.log.info('Nothing to list')
return
random.shuffle(ranges)
- promise = group(range_github_lister.s(minv, maxv, **lister_args)
+ promise = group(_range_github_lister.s(minv, maxv, **lister_args)
for minv, maxv in ranges)()
self.log.debug('%s OK (spawned %s subtasks)' % (self.name, len(ranges)))
try:
@@ -52,5 +53,5 @@
@app.task(name=__name__ + '.ping')
-def ping():
+def _ping():
return 'OK'
diff --git a/swh/lister/gitlab/tasks.py b/swh/lister/gitlab/tasks.py
--- a/swh/lister/gitlab/tasks.py
+++ b/swh/lister/gitlab/tasks.py
@@ -22,7 +22,8 @@
@app.task(name=__name__ + '.IncrementalGitLabLister')
-def incremental_gitlab_lister(**lister_args):
+def list_gitlab_incremental(**lister_args):
+ """Incremental update of a GitLab instance"""
lister_args['sort'] = 'desc'
lister = new_lister(**lister_args)
total_pages = lister.get_pages_information()[1]
@@ -31,23 +32,19 @@
@app.task(name=__name__ + '.RangeGitLabLister')
-def range_gitlab_lister(start, end, **lister_args):
+def _range_gitlab_lister(start, end, **lister_args):
lister = new_lister(**lister_args)
lister.run(min_bound=start, max_bound=end)
@app.task(name=__name__ + '.FullGitLabRelister', bind=True)
-def full_gitlab_relister(self, **lister_args):
- """Full lister
-
- This should be renamed as such.
-
- """
+def list_gitlab_full(self, **lister_args):
+ """Full update of a GitLab instance"""
lister = new_lister(**lister_args)
_, total_pages, _ = lister.get_pages_information()
ranges = list(utils.split_range(total_pages, NBPAGES))
random.shuffle(ranges)
- promise = group(range_gitlab_lister.s(minv, maxv, **lister_args)
+ promise = group(_range_gitlab_lister.s(minv, maxv, **lister_args)
for minv, maxv in ranges)()
self.log.debug('%s OK (spawned %s subtasks)' % (self.name, len(ranges)))
try:
@@ -58,5 +55,5 @@
@app.task(name=__name__ + '.ping')
-def ping():
+def _ping():
return 'OK'
diff --git a/swh/lister/gnu/tasks.py b/swh/lister/gnu/tasks.py
--- a/swh/lister/gnu/tasks.py
+++ b/swh/lister/gnu/tasks.py
@@ -8,10 +8,11 @@
@app.task(name=__name__ + '.GNUListerTask')
-def gnu_lister(**lister_args):
+def list_gnu_full(**lister_args):
+ 'List lister for the GNU source code archive'
GNULister(**lister_args).run()
@app.task(name=__name__ + '.ping')
-def ping():
+def _ping():
return 'OK'
diff --git a/swh/lister/npm/__init__.py b/swh/lister/npm/__init__.py
--- a/swh/lister/npm/__init__.py
+++ b/swh/lister/npm/__init__.py
@@ -10,4 +10,11 @@
return {'models': [NpmVisitModel, NpmModel],
'lister': NpmLister,
'task_modules': ['%s.tasks' % __name__],
+ 'task_types': {
+ 'list-npm-full': {
+ 'default_interval': '7 days',
+ 'min_interval': '7 days',
+ 'max_interval': '7 days',
+ },
+ },
}
diff --git a/swh/lister/npm/tasks.py b/swh/lister/npm/tasks.py
--- a/swh/lister/npm/tasks.py
+++ b/swh/lister/npm/tasks.py
@@ -41,14 +41,16 @@
@app.task(name=__name__ + '.NpmListerTask')
-def npm_lister(**lister_args):
+def list_npm_full(**lister_args):
+ 'Full lister for the npm (javascript) registry'
lister = NpmLister(**lister_args)
with save_registry_state(lister):
lister.run()
@app.task(name=__name__ + '.NpmIncrementalListerTask')
-def npm_incremental_lister(**lister_args):
+def list_npm_incremental(**lister_args):
+ 'Incremental lister for the npm (javascript) registry'
lister = NpmIncrementalLister(**lister_args)
update_seq_start = get_last_update_seq(lister)
with save_registry_state(lister):
@@ -56,5 +58,5 @@
@app.task(name=__name__ + '.ping')
-def ping():
+def _ping():
return 'OK'
diff --git a/swh/lister/packagist/tasks.py b/swh/lister/packagist/tasks.py
--- a/swh/lister/packagist/tasks.py
+++ b/swh/lister/packagist/tasks.py
@@ -8,10 +8,11 @@
@app.task(name=__name__ + '.PackagistListerTask')
-def packagist_lister(**lister_args):
+def list_packagist(**lister_args):
+ 'List the packagist (php) registry'
PackagistLister(**lister_args).run()
@app.task(name=__name__ + '.ping')
-def ping():
+def _ping():
return 'OK'
diff --git a/swh/lister/phabricator/tasks.py b/swh/lister/phabricator/tasks.py
--- a/swh/lister/phabricator/tasks.py
+++ b/swh/lister/phabricator/tasks.py
@@ -7,10 +7,11 @@
@app.task(name=__name__ + '.FullPhabricatorLister')
-def full_phabricator_lister(**lister_args):
+def list_phabricator_full(**lister_args):
+ 'Full update of a Phabricator instance'
PhabricatorLister(**lister_args).run()
@app.task(name=__name__ + '.ping')
-def ping():
+def _ping():
return 'OK'
diff --git a/swh/lister/pypi/tasks.py b/swh/lister/pypi/tasks.py
--- a/swh/lister/pypi/tasks.py
+++ b/swh/lister/pypi/tasks.py
@@ -8,10 +8,11 @@
@app.task(name=__name__ + '.PyPIListerTask')
-def pypi_lister(**lister_args):
+def list_pypi(**lister_args):
+ 'Full update of the PyPI (python) registry'
PyPILister(**lister_args).run()
@app.task(name=__name__ + '.ping')
-def ping():
+def _ping():
return 'OK'
diff --git a/swh/lister/tests/test_cli.py b/swh/lister/tests/test_cli.py
--- a/swh/lister/tests/test_cli.py
+++ b/swh/lister/tests/test_cli.py
@@ -3,12 +3,43 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import glob
import pytest
+import traceback
+from datetime import timedelta
+
+import yaml
+
+from swh.core.utils import numfile_sortkey as sortkey
+from swh.scheduler import get_scheduler
+from swh.scheduler.tests.conftest import DUMP_FILES
from swh.lister.core.lister_base import ListerBase
-from swh.lister.cli import get_lister, SUPPORTED_LISTERS
+from swh.lister.cli import lister as cli, get_lister, SUPPORTED_LISTERS
from .test_utils import init_db
+from click.testing import CliRunner
+
+
+@pytest.fixture
+def swh_scheduler_config(request, postgresql_proc, postgresql):
+ """Return a scheduler config dict pointing at the test postgresql db.
+
+ The SQL files matched by DUMP_FILES are executed against the
+ `postgresql` connection (in numfile_sortkey order) before the config is
+ returned. NOTE(review): `postgresql_proc` / `postgresql` are presumably
+ the pytest-postgresql fixtures -- confirm. `request` is not used here.
+ """
+ scheduler_config = {
+ 'db': 'postgresql://{user}@{host}:{port}/{dbname}'.format(
+ host=postgresql_proc.host,
+ port=postgresql_proc.port,
+ user='postgres',
+ dbname='tests')
+ }
+
+ # sort the dump files so they are loaded in a deterministic order
+ all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey)
+
+ cursor = postgresql.cursor()
+ for fname in all_dump_files:
+ with open(fname) as fobj:
+ cursor.execute(fobj.read())
+ postgresql.commit()
+
+ return scheduler_config
def test_get_lister_wrong_input():
@@ -64,3 +95,47 @@
assert url_key not in lst.config
assert 'priority' not in lst.config
assert 'oneshot' not in lst.config
+
+
+def test_task_types(swh_scheduler_config, tmp_path):
+ db_url = init_db().url()
+
+ configfile = tmp_path / 'config.yml'
+ configfile.write_text(yaml.dump({'scheduler': {
+ 'cls': 'local',
+ 'args': swh_scheduler_config}}))
+ runner = CliRunner()
+ result = runner.invoke(cli, [
+ '--db-url', db_url,
+ '--config-file', configfile.as_posix(),
+ 'register-task-types'])
+
+ assert result.exit_code == 0, traceback.print_exception(*result.exc_info)
+
+ scheduler = get_scheduler(cls='local', args=swh_scheduler_config)
+ all_tasks = [
+ 'list-bitbucket-full', 'list-bitbucket-incremental',
+ 'list-cran',
+ 'list-cgit',
+ 'list-debian-distribution',
+ 'list-gitlab-full', 'list-gitlab-incremental',
+ 'list-github-full', 'list-github-incremental',
+ 'list-gnu-full',
+ 'list-npm-full', 'list-npm-incremental',
+ 'list-phabricator-full',
+ 'list-packagist',
+ 'list-pypi',
+ ]
+ for task in all_tasks:
+ task_type_desc = scheduler.get_task_type(task)
+ assert task_type_desc
+ assert task_type_desc['type'] == task
+ assert task_type_desc['backoff_factor'] == 1
+
+ if task == 'list-npm-full':
+ delay = timedelta(days=7) # overloaded in the plugin registry
+ elif task.endswith('-full'):
+ delay = timedelta(days=90) # default value for 'full' lister tasks
+ else:
+ delay = timedelta(days=1) # default value for other lister tasks
+ assert task_type_desc['default_interval'] == delay, task

File Metadata

Mime Type
text/plain
Expires
Dec 19 2024, 6:04 PM (11 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3216698

Event Timeline