Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7123677
D1940.id6583.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
17 KB
Subscribers
None
D1940.id6583.diff
View Options
diff --git a/swh/lister/bitbucket/tasks.py b/swh/lister/bitbucket/tasks.py
--- a/swh/lister/bitbucket/tasks.py
+++ b/swh/lister/bitbucket/tasks.py
@@ -17,20 +17,21 @@
@app.task(name=__name__ + '.IncrementalBitBucketLister')
-def incremental_bitbucket_lister(**lister_args):
+def list_bitbucket_incremental(**lister_args):
+ '''Incremental update of the BitBucket forge'''
lister = new_lister(**lister_args)
lister.run(min_bound=lister.db_last_index(), max_bound=None)
@app.task(name=__name__ + '.RangeBitBucketLister')
-def range_bitbucket_lister(start, end, **lister_args):
+def _range_bitbucket_lister(start, end, **lister_args):
lister = new_lister(**lister_args)
lister.run(min_bound=start, max_bound=end)
@app.task(name=__name__ + '.FullBitBucketRelister', bind=True)
-def full_bitbucket_relister(self, split=None, **lister_args):
- """Relist from the beginning of what's already been listed.
+def list_bitbucket_full(self, split=None, **lister_args):
+ """Full update of the BitBucket forge
It's not to be called for an initial listing.
@@ -42,7 +43,7 @@
return
random.shuffle(ranges)
- promise = group(range_bitbucket_lister.s(minv, maxv, **lister_args)
+ promise = group(_range_bitbucket_lister.s(minv, maxv, **lister_args)
for minv, maxv in ranges)()
self.log.debug('%s OK (spawned %s subtasks)', (self.name, len(ranges)))
try:
@@ -53,5 +54,5 @@
@app.task(name=__name__ + '.ping')
-def ping():
+def _ping():
return 'OK'
diff --git a/swh/lister/cgit/tasks.py b/swh/lister/cgit/tasks.py
--- a/swh/lister/cgit/tasks.py
+++ b/swh/lister/cgit/tasks.py
@@ -8,10 +8,11 @@
@app.task(name=__name__ + '.CGitListerTask')
-def cgit_lister(**lister_args):
+def list_cgit(**lister_args):
+ '''Lister task for CGit instances'''
CGitLister(**lister_args).run()
@app.task(name=__name__ + '.ping')
-def ping():
+def _ping():
return 'OK'
diff --git a/swh/lister/cli.py b/swh/lister/cli.py
--- a/swh/lister/cli.py
+++ b/swh/lister/cli.py
@@ -3,14 +3,18 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import os
import logging
import pkg_resources
from copy import deepcopy
+from importlib import import_module
import click
from sqlalchemy import create_engine
from swh.core.cli import CONTEXT_SETTINGS
+from swh.scheduler import get_scheduler
+from swh.scheduler.task import SWHTask
from swh.lister.core.models import initialize
@@ -21,6 +25,25 @@
if entry_point.name.split('.', 1)[0] == 'lister'}
SUPPORTED_LISTERS = list(LISTERS)
+# The key in this dict is the suffix used to match a new task-type to be added.
+# For example for a task whose function name is 'list_gitlab_full', the default
+# value used when inserting a new task-type in the scheduler db will be the one
+# under the 'full' key below (because it matches xxx_full).
+DEFAULT_TASK_TYPE = {
+ 'full': { # for tasks like 'list_xxx_full()'
+ 'default_interval': '90 days',
+ 'min_interval': '90 days',
+ 'max_interval': '90 days',
+ 'backoff_factor': 1
+ },
+ '*': { # value if no suffix matches
+ 'default_interval': '1 day',
+ 'min_interval': '1 day',
+ 'max_interval': '1 day',
+ 'backoff_factor': 1
+ },
+ }
+
def get_lister(lister_name, db_url=None, **conf):
"""Instantiate a lister given its name.
@@ -66,6 +89,8 @@
'cls': 'local',
'args': {'db': db_url}
}
+ if not config_file:
+ config_file = os.environ.get('SWH_CONFIG_FILENAME')
conf = config.read(config_file, override_conf)
ctx.obj['config'] = conf
ctx.obj['override_conf'] = override_conf
@@ -89,20 +114,97 @@
db_url = lister_cfg['args']['db']
db_engine = create_engine(db_url)
+ registry = {}
for lister, entrypoint in LISTERS.items():
logger.info('Loading lister %s', lister)
- registry_entry = entrypoint.load()()
+ registry[lister] = entrypoint.load()()
logger.info('Initializing database')
initialize(db_engine, drop_tables)
for lister, entrypoint in LISTERS.items():
+ registry_entry = registry[lister]
init_hook = registry_entry.get('init')
if callable(init_hook):
logger.info('Calling init hook for %s', lister)
init_hook(db_engine)
+@lister.command(name='register-task-types', context_settings=CONTEXT_SETTINGS)
+@click.option('--lister', '-l', 'listers', multiple=True,
+ default=('all', ), show_default=True,
+ help='Only registers task-types for these listers',
+ type=click.Choice(['all'] + SUPPORTED_LISTERS))
+@click.pass_context
+def register_task_types(ctx, listers):
+ """Insert missing task-type entries in the scheduler
+
+ According to declared tasks in each loaded lister plugin.
+ """
+
+ cfg = ctx.obj['config']
+ scheduler = get_scheduler(**cfg['scheduler'])
+
+ for lister, entrypoint in LISTERS.items():
+ if 'all' not in listers and lister not in listers:
+ continue
+ logger.info('Loading lister %s', lister)
+
+ registry_entry = entrypoint.load()()
+ for task_module in registry_entry['task_modules']:
+ mod = import_module(task_module)
+ for task_name in (x for x in dir(mod) if not x.startswith('_')):
+ taskobj = getattr(mod, task_name)
+ if isinstance(taskobj, SWHTask):
+ task_type = task_name.replace('_', '-')
+ task_cfg = registry_entry.get('task_types', {}).get(
+ task_type, {})
+ ensure_task_type(task_type, taskobj, task_cfg, scheduler)
+
+
+def ensure_task_type(task_type, swhtask, task_config, scheduler):
+ """Ensure a task-type is known by the scheduler
+
+ Args:
+ task_type (str): the type of the task to check/insert (corresponds to
+ the 'type' field in the db)
+ swhtask (SWHTask): the SWHTask instance the task-type corresponds to
+ task_config (dict): a dict with specific/overloaded values for the
+ task-type to be created
+ scheduler: the scheduler object used to access the scheduler db
+ """
+ for suffix, defaults in DEFAULT_TASK_TYPE.items():
+ if task_type.endswith('-' + suffix):
+ task_type_dict = defaults.copy()
+ break
+ else:
+ task_type_dict = DEFAULT_TASK_TYPE['*'].copy()
+
+ task_type_dict['type'] = task_type
+ task_type_dict['backend_name'] = swhtask.name
+ if swhtask.__doc__:
+ task_type_dict['description'] = swhtask.__doc__.splitlines()[0]
+
+ task_type_dict.update(task_config)
+
+ current_task_type = scheduler.get_task_type(task_type)
+ if current_task_type:
+ # check some stuff
+ if current_task_type['backend_name'] != task_type_dict['backend_name']:
+ logger.warning('Existing task type %s for lister %s has a '
+ 'different backend name than current '
+ 'code version provides (%s vs. %s)',
+ task_type,
+ lister,
+ current_task_type['backend_name'],
+ task_type_dict['backend_name'],
+ )
+ else:
+ logger.info('Create task type %s in scheduler', task_type)
+ logger.debug(' %s', task_type_dict)
+ scheduler.create_task_type(task_type_dict)
+
+
@lister.command(name='run', context_settings=CONTEXT_SETTINGS,
help='Trigger a full listing run for a particular forge '
'instance. The output of this listing results in '
diff --git a/swh/lister/cran/tasks.py b/swh/lister/cran/tasks.py
--- a/swh/lister/cran/tasks.py
+++ b/swh/lister/cran/tasks.py
@@ -8,10 +8,11 @@
@app.task(name=__name__ + '.CRANListerTask')
-def cran_lister(**lister_args):
+def list_cran(**lister_args):
+ '''Lister task for the CRAN registry'''
CRANLister(**lister_args).run()
@app.task(name=__name__ + '.ping')
-def ping():
+def _ping():
return 'OK'
diff --git a/swh/lister/debian/tasks.py b/swh/lister/debian/tasks.py
--- a/swh/lister/debian/tasks.py
+++ b/swh/lister/debian/tasks.py
@@ -8,10 +8,11 @@
@app.task(name=__name__ + '.DebianListerTask')
-def debian_lister(distribution, **lister_args):
+def list_debian_distribution(distribution, **lister_args):
+ '''List a Debian distribution'''
DebianLister(**lister_args).run(distribution)
@app.task(name=__name__ + '.ping')
-def ping():
+def _ping():
return 'OK'
diff --git a/swh/lister/github/tasks.py b/swh/lister/github/tasks.py
--- a/swh/lister/github/tasks.py
+++ b/swh/lister/github/tasks.py
@@ -17,20 +17,21 @@
@app.task(name=__name__ + '.IncrementalGitHubLister')
-def incremental_github_lister(**lister_args):
+def list_github_incremental(**lister_args):
+ 'Incremental update of GitHub'
lister = new_lister(**lister_args)
lister.run(min_bound=lister.db_last_index(), max_bound=None)
@app.task(name=__name__ + '.RangeGitHubLister')
-def range_github_lister(start, end, **lister_args):
+def _range_github_lister(start, end, **lister_args):
lister = new_lister(**lister_args)
lister.run(min_bound=start, max_bound=end)
@app.task(name=__name__ + '.FullGitHubRelister', bind=True)
-def full_github_relister(self, split=None, **lister_args):
- """Relist from the beginning of what's already been listed.
+def list_github_full(self, split=None, **lister_args):
+ """Full update of GitHub
It's not to be called for an initial listing.
@@ -41,7 +42,7 @@
self.log.info('Nothing to list')
return
random.shuffle(ranges)
- promise = group(range_github_lister.s(minv, maxv, **lister_args)
+ promise = group(_range_github_lister.s(minv, maxv, **lister_args)
for minv, maxv in ranges)()
self.log.debug('%s OK (spawned %s subtasks)' % (self.name, len(ranges)))
try:
@@ -52,5 +53,5 @@
@app.task(name=__name__ + '.ping')
-def ping():
+def _ping():
return 'OK'
diff --git a/swh/lister/gitlab/tasks.py b/swh/lister/gitlab/tasks.py
--- a/swh/lister/gitlab/tasks.py
+++ b/swh/lister/gitlab/tasks.py
@@ -22,7 +22,8 @@
@app.task(name=__name__ + '.IncrementalGitLabLister')
-def incremental_gitlab_lister(**lister_args):
+def list_gitlab_incremental(**lister_args):
+ """Incremental update of a GitLab instance"""
lister_args['sort'] = 'desc'
lister = new_lister(**lister_args)
total_pages = lister.get_pages_information()[1]
@@ -31,23 +32,19 @@
@app.task(name=__name__ + '.RangeGitLabLister')
-def range_gitlab_lister(start, end, **lister_args):
+def _range_gitlab_lister(start, end, **lister_args):
lister = new_lister(**lister_args)
lister.run(min_bound=start, max_bound=end)
@app.task(name=__name__ + '.FullGitLabRelister', bind=True)
-def full_gitlab_relister(self, **lister_args):
- """Full lister
-
- This should be renamed as such.
-
- """
+def list_gitlab_full(self, **lister_args):
+ """Full update of a GitLab instance"""
lister = new_lister(**lister_args)
_, total_pages, _ = lister.get_pages_information()
ranges = list(utils.split_range(total_pages, NBPAGES))
random.shuffle(ranges)
- promise = group(range_gitlab_lister.s(minv, maxv, **lister_args)
+ promise = group(_range_gitlab_lister.s(minv, maxv, **lister_args)
for minv, maxv in ranges)()
self.log.debug('%s OK (spawned %s subtasks)' % (self.name, len(ranges)))
try:
@@ -58,5 +55,5 @@
@app.task(name=__name__ + '.ping')
-def ping():
+def _ping():
return 'OK'
diff --git a/swh/lister/gnu/tasks.py b/swh/lister/gnu/tasks.py
--- a/swh/lister/gnu/tasks.py
+++ b/swh/lister/gnu/tasks.py
@@ -8,10 +8,11 @@
@app.task(name=__name__ + '.GNUListerTask')
-def gnu_lister(**lister_args):
+def list_gnu_full(**lister_args):
+ 'List lister for the GNU source code archive'
GNULister(**lister_args).run()
@app.task(name=__name__ + '.ping')
-def ping():
+def _ping():
return 'OK'
diff --git a/swh/lister/npm/__init__.py b/swh/lister/npm/__init__.py
--- a/swh/lister/npm/__init__.py
+++ b/swh/lister/npm/__init__.py
@@ -10,4 +10,11 @@
return {'models': [NpmVisitModel, NpmModel],
'lister': NpmLister,
'task_modules': ['%s.tasks' % __name__],
+ 'task_types': {
+ 'list-npm-full': {
+ 'default_interval': '7 days',
+ 'min_interval': '7 days',
+ 'max_interval': '7 days',
+ },
+ },
}
diff --git a/swh/lister/npm/tasks.py b/swh/lister/npm/tasks.py
--- a/swh/lister/npm/tasks.py
+++ b/swh/lister/npm/tasks.py
@@ -41,14 +41,16 @@
@app.task(name=__name__ + '.NpmListerTask')
-def npm_lister(**lister_args):
+def list_npm_full(**lister_args):
+ 'Full lister for the npm (javascript) registry'
lister = NpmLister(**lister_args)
with save_registry_state(lister):
lister.run()
@app.task(name=__name__ + '.NpmIncrementalListerTask')
-def npm_incremental_lister(**lister_args):
+def list_npm_incremental(**lister_args):
+ 'Incremental lister for the npm (javascript) registry'
lister = NpmIncrementalLister(**lister_args)
update_seq_start = get_last_update_seq(lister)
with save_registry_state(lister):
@@ -56,5 +58,5 @@
@app.task(name=__name__ + '.ping')
-def ping():
+def _ping():
return 'OK'
diff --git a/swh/lister/packagist/tasks.py b/swh/lister/packagist/tasks.py
--- a/swh/lister/packagist/tasks.py
+++ b/swh/lister/packagist/tasks.py
@@ -8,10 +8,11 @@
@app.task(name=__name__ + '.PackagistListerTask')
-def packagist_lister(**lister_args):
+def list_packagist(**lister_args):
+ 'List the packagist (php) registry'
PackagistLister(**lister_args).run()
@app.task(name=__name__ + '.ping')
-def ping():
+def _ping():
return 'OK'
diff --git a/swh/lister/phabricator/tasks.py b/swh/lister/phabricator/tasks.py
--- a/swh/lister/phabricator/tasks.py
+++ b/swh/lister/phabricator/tasks.py
@@ -7,10 +7,11 @@
@app.task(name=__name__ + '.FullPhabricatorLister')
-def full_phabricator_lister(**lister_args):
+def list_phabricator_full(**lister_args):
+ 'Full update of a Phabricator instance'
PhabricatorLister(**lister_args).run()
@app.task(name=__name__ + '.ping')
-def ping():
+def _ping():
return 'OK'
diff --git a/swh/lister/pypi/tasks.py b/swh/lister/pypi/tasks.py
--- a/swh/lister/pypi/tasks.py
+++ b/swh/lister/pypi/tasks.py
@@ -8,10 +8,11 @@
@app.task(name=__name__ + '.PyPIListerTask')
-def pypi_lister(**lister_args):
+def list_pypi(**lister_args):
+ 'Full update of the PyPI (python) registry'
PyPILister(**lister_args).run()
@app.task(name=__name__ + '.ping')
-def ping():
+def _ping():
return 'OK'
diff --git a/swh/lister/tests/test_cli.py b/swh/lister/tests/test_cli.py
--- a/swh/lister/tests/test_cli.py
+++ b/swh/lister/tests/test_cli.py
@@ -3,12 +3,43 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import glob
import pytest
+import traceback
+from datetime import timedelta
+
+import yaml
+
+from swh.core.utils import numfile_sortkey as sortkey
+from swh.scheduler import get_scheduler
+from swh.scheduler.tests.conftest import DUMP_FILES
from swh.lister.core.lister_base import ListerBase
-from swh.lister.cli import get_lister, SUPPORTED_LISTERS
+from swh.lister.cli import lister as cli, get_lister, SUPPORTED_LISTERS
from .test_utils import init_db
+from click.testing import CliRunner
+
+
+@pytest.fixture
+def swh_scheduler_config(request, postgresql_proc, postgresql):
+ scheduler_config = {
+ 'db': 'postgresql://{user}@{host}:{port}/{dbname}'.format(
+ host=postgresql_proc.host,
+ port=postgresql_proc.port,
+ user='postgres',
+ dbname='tests')
+ }
+
+ all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey)
+
+ cursor = postgresql.cursor()
+ for fname in all_dump_files:
+ with open(fname) as fobj:
+ cursor.execute(fobj.read())
+ postgresql.commit()
+
+ return scheduler_config
def test_get_lister_wrong_input():
@@ -64,3 +95,47 @@
assert url_key not in lst.config
assert 'priority' not in lst.config
assert 'oneshot' not in lst.config
+
+
+def test_task_types(swh_scheduler_config, tmp_path):
+ db_url = init_db().url()
+
+ configfile = tmp_path / 'config.yml'
+ configfile.write_text(yaml.dump({'scheduler': {
+ 'cls': 'local',
+ 'args': swh_scheduler_config}}))
+ runner = CliRunner()
+ result = runner.invoke(cli, [
+ '--db-url', db_url,
+ '--config-file', configfile.as_posix(),
+ 'register-task-types'])
+
+ assert result.exit_code == 0, traceback.print_exception(*result.exc_info)
+
+ scheduler = get_scheduler(cls='local', args=swh_scheduler_config)
+ all_tasks = [
+ 'list-bitbucket-full', 'list-bitbucket-incremental',
+ 'list-cran',
+ 'list-cgit',
+ 'list-debian-distribution',
+ 'list-gitlab-full', 'list-gitlab-incremental',
+ 'list-github-full', 'list-github-incremental',
+ 'list-gnu-full',
+ 'list-npm-full', 'list-npm-incremental',
+ 'list-phabricator-full',
+ 'list-packagist',
+ 'list-pypi',
+ ]
+ for task in all_tasks:
+ task_type_desc = scheduler.get_task_type(task)
+ assert task_type_desc
+ assert task_type_desc['type'] == task
+ assert task_type_desc['backoff_factor'] == 1
+
+ if task == 'list-npm-full':
+ delay = timedelta(days=7) # overloaded in the plugin registry
+ elif task.endswith('-full'):
+ delay = timedelta(days=90) # default value for 'full' lister tasks
+ else:
+ delay = timedelta(days=1) # default value for other lister tasks
+ assert task_type_desc['default_interval'] == delay, task
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Dec 19 2024, 6:04 PM (11 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3216698
Attached To
D1940: plugins: add support for scheduler's task-type declaration
Event Timeline
Log In to Comment