diff --git a/swh/lister/bitbucket/tasks.py b/swh/lister/bitbucket/tasks.py
--- a/swh/lister/bitbucket/tasks.py
+++ b/swh/lister/bitbucket/tasks.py
@@ -2,26 +2,55 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-from swh.lister.core.tasks import (IndexingDiscoveryListerTask,
-                                   RangeListerTask,
-                                   IndexingRefreshListerTask, ListerTaskBase)
+import random
+from celery import group
+
+from swh.scheduler.celery_backend.config import app
+from swh.scheduler.task import SWHTask
 
 from .lister import BitBucketLister
 
+GROUP_SPLIT = 10000
+
+
+def new_lister(api_baseurl='https://api.bitbucket.org/2.0'):
+    """Build a BitBucketLister for the given API base URL."""
+    return BitBucketLister(api_baseurl=api_baseurl)
+
 
-class BitBucketListerTask(ListerTaskBase):
-    def new_lister(self, *, api_baseurl='https://api.bitbucket.org/2.0'):
-        return BitBucketLister(api_baseurl=api_baseurl)
+@app.task(name='swh.lister.bitbucket.tasks.IncrementalBitBucketLister',
+          base=SWHTask, bind=True)
+def incremental_bitbucket_lister(self, **lister_args):
+    """Run the lister from ``db_last_index()`` with no upper bound."""
+    self.log.debug('%s, lister_args=%s' % (
+        self.name, lister_args))
+    lister = new_lister(**lister_args)
+    lister.run(min_bound=lister.db_last_index(), max_bound=None)
+    self.log.debug('%s OK' % (self.name))
 
 
-class IncrementalBitBucketLister(BitBucketListerTask,
-                                 IndexingDiscoveryListerTask):
-    task_queue = 'swh_lister_bitbucket_discover'
+@app.task(name='swh.lister.bitbucket.tasks.RangeBitBucketLister',
+          base=SWHTask, bind=True)
+def range_bitbucket_lister(self, start, end, **lister_args):
+    """Run the lister with ``start`` and ``end`` as its bounds."""
+    self.log.debug('%s(start=%s, end=%s), lister_args=%s' % (
+        self.name, start, end, lister_args))
+    lister = new_lister(**lister_args)
+    lister.run(min_bound=start, max_bound=end)
+    self.log.debug('%s OK' % (self.name))
 
 
-class RangeBitBucketLister(BitBucketListerTask, RangeListerTask):
-    task_queue = 'swh_lister_bitbucket_refresh'
+@app.task(name='swh.lister.bitbucket.tasks.FullBitBucketRelister',
+          base=SWHTask, bind=True)
+def full_bitbucket_relister(self, split=None, **lister_args):
+    """Spawn one range task per index partition, in random order."""
+    self.log.debug('%s, lister_args=%s' % (
+        self.name, lister_args))
+    lister = new_lister(**lister_args)
+    ranges = lister.db_partition_indices(split or GROUP_SPLIT)
+    random.shuffle(ranges)
+    group(range_bitbucket_lister.s(minv, maxv, **lister_args)
+          for minv, maxv in ranges)()
+    self.log.debug('%s OK (spawned %s subtasks)' % (self.name, len(ranges)))
 
 
-class FullBitBucketRelister(BitBucketListerTask, IndexingRefreshListerTask):
-    task_queue = 'swh_lister_bitbucket_refresh'
diff --git a/swh/lister/core/tasks.py b/swh/lister/core/tasks.py
deleted file mode 100644
--- a/swh/lister/core/tasks.py
+++ /dev/null
@@ -1,95 +0,0 @@
-# Copyright (C) 2017-2018 the Software Heritage developers
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import abc
-import random
-
-from celery import group
-
-from swh.scheduler.task import Task
-
-from .abstractattribute import AbstractAttribute
-
-
-class AbstractTaskMeta(abc.ABCMeta):
-    pass
-
-
-class ListerTaskBase(Task, metaclass=AbstractTaskMeta):
-    """Lister Tasks define the process of periodically requesting batches of
-    repository information from source code hosting services. They
-    instantiate Listers to do batches of work at periodic intervals.
-
-    There are two main kinds of lister tasks:
-
-    1. Discovering new repositories.
-    2. Refreshing the list of already discovered repositories.
-
-    If the hosting service is indexable (according to the requirements of
-    :class:`SWHIndexingLister`), then we can optionally partition the
-    set of known repositories into sub-sets to distribute the work.
-
-    This means that there is a third possible Task type for Indexing
-    Listers:
-
-    3. Discover or refresh a specific range of indices.
-
-    """
-    task_queue = AbstractAttribute('Celery Task queue name')
-
-    @abc.abstractmethod
-    def new_lister(self, **lister_args):
-        """Return a new lister of the appropriate type.
-        """
-        pass
-
-    @abc.abstractmethod
-    def run_task(self, *, lister_args=None):
-        pass
-
-
-# Paging/Indexing lister tasks derivatives
-# (cf. {github/bitbucket/gitlab}/tasks)
-
-
-class RangeListerTask(ListerTaskBase):
-    """Range lister task.
-
-    """
-    def run_task(self, start, end, lister_args=None):
-        if lister_args is None:
-            lister_args = {}
-        lister = self.new_lister(**lister_args)
-        return lister.run(min_bound=start, max_bound=end)
-
-
-# Indexing Lister tasks derivatives (cf. {github/bitbucket}/tasks)
-
-
-class IndexingDiscoveryListerTask(ListerTaskBase):
-    """Incremental indexing lister task.
-
-    """
-    def run_task(self, *, lister_args=None):
-        if lister_args is None:
-            lister_args = {}
-        lister = self.new_lister(**lister_args)
-        return lister.run(min_bound=lister.db_last_index(), max_bound=None)
-
-
-class IndexingRefreshListerTask(ListerTaskBase):
-    """Full indexing lister task.
-
-    """
-    GROUP_SPLIT = 10000
-
-    def run_task(self, *, lister_args=None):
-        if lister_args is None:
-            lister_args = {}
-        lister = self.new_lister(**lister_args)
-        ranges = lister.db_partition_indices(self.GROUP_SPLIT)
-        random.shuffle(ranges)
-        range_task = RangeListerTask()
-        group(range_task.s(minv, maxv, lister_args)
-              for minv, maxv in ranges)()
diff --git a/swh/lister/debian/tasks.py b/swh/lister/debian/tasks.py
--- a/swh/lister/debian/tasks.py
+++ b/swh/lister/debian/tasks.py
@@ -2,17 +2,17 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-from swh.lister.core.tasks import ListerTaskBase
+from swh.scheduler.celery_backend.config import app
+from swh.scheduler.task import SWHTask
 
 from .lister import DebianLister
 
 
-class DebianListerTask(ListerTaskBase):
-    task_queue = 'swh_lister_debian'
-
-    def new_lister(self):
-        return DebianLister()
-
-    def run_task(self, distribution):
-        lister = self.new_lister()
-        return lister.run(distribution)
+@app.task(name='swh.lister.debian.tasks.DebianListerTask',
+          base=SWHTask, bind=True)
+def debian_lister(self, distribution, **lister_args):
+    """Run the Debian lister on the given distribution."""
+    self.log.debug('%s, lister_args=%s' % (
+        self.name, lister_args))
+    DebianLister(**lister_args).run(distribution)
+    self.log.debug('%s OK' % (self.name))
diff --git a/swh/lister/github/tasks.py b/swh/lister/github/tasks.py
--- a/swh/lister/github/tasks.py
+++ b/swh/lister/github/tasks.py
@@ -2,25 +2,56 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-from swh.lister.core.tasks import (IndexingDiscoveryListerTask,
-                                   RangeListerTask,
-                                   IndexingRefreshListerTask, ListerTaskBase)
+import random
 
-from .lister import GitHubLister
+from celery import group
 
+from swh.scheduler.celery_backend.config import app
+from swh.scheduler.task import SWHTask
 
-class GitHubListerTask(ListerTaskBase):
-    def new_lister(self, *, api_baseurl='https://api.github.com'):
-        return GitHubLister(api_baseurl=api_baseurl)
+from swh.lister.github.lister import GitHubLister
 
+GROUP_SPLIT = 10000
 
-class IncrementalGitHubLister(GitHubListerTask, IndexingDiscoveryListerTask):
-    task_queue = 'swh_lister_github_discover'
 
+def new_lister(api_baseurl='https://api.github.com', **kw):
+    """Build a GitHubLister; extra keyword args are passed through."""
+    return GitHubLister(api_baseurl=api_baseurl, **kw)
 
-class RangeGitHubLister(GitHubListerTask, RangeListerTask):
-    task_queue = 'swh_lister_github_refresh'
+
+@app.task(name='swh.lister.github.tasks.IncrementalGitHubLister',
+          base=SWHTask, bind=True)
+def incremental_github_lister(self, **lister_args):
+    """Run the lister from ``db_last_index()`` with no upper bound."""
+    self.log.debug('%s, lister_args=%s' % (
+        self.name, lister_args))
+    lister = new_lister(**lister_args)
+    lister.run(min_bound=lister.db_last_index(), max_bound=None)
+    self.log.debug('%s OK' % (self.name))
+
+
+@app.task(name='swh.lister.github.tasks.RangeGitHubLister',
+          base=SWHTask, bind=True)
+def range_github_lister(self, start, end, **lister_args):
+    """Run the lister with ``start`` and ``end`` as its bounds."""
+    self.log.debug('%s(start=%s, end=%s), lister_args=%s' % (
+        self.name, start, end, lister_args))
+    lister = new_lister(**lister_args)
+    lister.run(min_bound=start, max_bound=end)
+    self.log.debug('%s OK' % (self.name))
+
+
+@app.task(name='swh.lister.github.tasks.FullGitHubRelister',
+          base=SWHTask, bind=True)
+def full_github_relister(self, split=None, **lister_args):
+    """Spawn one range task per index partition, in random order."""
+    self.log.debug('%s, lister_args=%s' % (
+        self.name, lister_args))
+    lister = new_lister(**lister_args)
+    ranges = lister.db_partition_indices(split or GROUP_SPLIT)
+    random.shuffle(ranges)
+    group(range_github_lister.s(minv, maxv, **lister_args)
+          for minv, maxv in ranges)()
+    self.log.debug('%s OK (spawned %s subtasks)' % (self.name, len(ranges)))
 
 
-class FullGitHubRelister(GitHubListerTask, IndexingRefreshListerTask):
-    task_queue = 'swh_lister_github_refresh'
diff --git a/swh/lister/gitlab/tasks.py b/swh/lister/gitlab/tasks.py
--- a/swh/lister/gitlab/tasks.py
+++ b/swh/lister/gitlab/tasks.py
@@ -6,58 +6,58 @@
 
 from celery import group
 
+from swh.scheduler.celery_backend.config import app
+from swh.scheduler.task import SWHTask
+
 from .. import utils
-from ..core.tasks import ListerTaskBase, RangeListerTask
 from .lister import GitLabLister
 
 
-class GitLabListerTask(ListerTaskBase):
-    def new_lister(self, *, api_baseurl='https://gitlab.com/api/v4',
-                   instance='gitlab', sort='asc', per_page=20):
-        return GitLabLister(
-            api_baseurl=api_baseurl, instance=instance, sort=sort)
-
-
-class RangeGitLabLister(GitLabListerTask, RangeListerTask):
-    """Range GitLab lister (list available origins on specified range)
-
-    """
-    task_queue = 'swh_lister_gitlab_refresh'
-
-
-class FullGitLabRelister(GitLabListerTask):
-    """Full GitLab lister (list all available origins from the api).
-
-    """
-    task_queue = 'swh_lister_gitlab_refresh'
-
-    # nb pages
-    nb_pages = 10
-
-    def run_task(self, lister_args=None):
-        if lister_args is None:
-            lister_args = {}
-        lister = self.new_lister(**lister_args)
-        _, total_pages, _ = lister.get_pages_information()
-        ranges = list(utils.split_range(total_pages, self.nb_pages))
-        random.shuffle(ranges)
-        range_task = RangeGitLabLister()
-        group(range_task.s(minv, maxv, lister_args=lister_args)
-              for minv, maxv in ranges)()
-
-
-class IncrementalGitLabLister(GitLabListerTask):
-    """Incremental GitLab lister (list only new available origins).
-
-    """
-    task_queue = 'swh_lister_gitlab_discover'
-
-    def run_task(self, lister_args=None):
-        if lister_args is None:
-            lister_args = {}
-        lister_args['sort'] = 'desc'
-        lister = self.new_lister(**lister_args)
-        _, total_pages, _ = lister.get_pages_information()
-        # stopping as soon as existing origins for that instance are detected
-        return lister.run(min_bound=1, max_bound=total_pages,
-                          check_existence=True)
+NBPAGES = 10
+
+
+def new_lister(api_baseurl='https://gitlab.com/api/v4',
+               instance='gitlab', sort='asc', per_page=20):
+    """Build a GitLabLister; ``per_page`` is accepted but not forwarded."""
+    return GitLabLister(
+        api_baseurl=api_baseurl, instance=instance, sort=sort)
+
+
+@app.task(name='swh.lister.gitlab.tasks.IncrementalGitLabLister',
+          base=SWHTask, bind=True)
+def incremental_gitlab_lister(self, **lister_args):
+    """List pages in descending order, stopping at known origins."""
+    self.log.debug('%s, lister_args=%s' % (
+        self.name, lister_args))
+    lister_args['sort'] = 'desc'
+    lister = new_lister(**lister_args)
+    total_pages = lister.get_pages_information()[1]
+    # stopping as soon as existing origins for that instance are detected
+    lister.run(min_bound=1, max_bound=total_pages, check_existence=True)
+    self.log.debug('%s OK' % (self.name))
+
+
+@app.task(name='swh.lister.gitlab.tasks.RangeGitLabLister',
+          base=SWHTask, bind=True)
+def range_gitlab_lister(self, start, end, **lister_args):
+    """Run the lister with ``start`` and ``end`` as its bounds."""
+    self.log.debug('%s(start=%s, end=%s), lister_args=%s' % (
+        self.name, start, end, lister_args))
+    lister = new_lister(**lister_args)
+    lister.run(min_bound=start, max_bound=end)
+    self.log.debug('%s OK' % (self.name))
+
+
+@app.task(name='swh.lister.gitlab.tasks.FullGitLabRelister',
+          base=SWHTask, bind=True)
+def full_gitlab_relister(self, **lister_args):
+    """Split all pages into ranges and spawn one range task each."""
+    self.log.debug('%s, lister_args=%s' % (
+        self.name, lister_args))
+    lister = new_lister(**lister_args)
+    _, total_pages, _ = lister.get_pages_information()
+    ranges = list(utils.split_range(total_pages, NBPAGES))
+    random.shuffle(ranges)
+    group(range_gitlab_lister.s(minv, maxv, **lister_args)
+          for minv, maxv in ranges)()
+    self.log.debug('%s OK (spawned %s subtasks)' % (self.name, len(ranges)))
diff --git a/swh/lister/npm/tasks.py b/swh/lister/npm/tasks.py
--- a/swh/lister/npm/tasks.py
+++ b/swh/lister/npm/tasks.py
@@ -3,75 +3,65 @@
 # See top-level LICENSE file for more information
 
 from datetime import datetime
+from contextlib import contextmanager
+
+from swh.scheduler.celery_backend.config import app
+from swh.scheduler.task import SWHTask
 
-from swh.lister.core.tasks import ListerTaskBase
 from swh.lister.npm.lister import NpmLister, NpmIncrementalLister
 from swh.lister.npm.models import NpmVisitModel
 
 
-class _NpmListerTaskBase(ListerTaskBase):
-
-    task_queue = 'swh_lister_npm_refresh'
-
-    def _save_registry_state(self):
-        """Query the root endpoint from the npm registry and
-        backup values of interest for future listing
-        """
-        params = {'headers': self.lister.request_headers()}
-        registry_state = \
-            self.lister.session.get(self.lister.api_baseurl, **params)
-        registry_state = registry_state.json()
-        self.registry_state = {
-            'visit_date': datetime.now(),
-        }
-        for key in ('doc_count', 'doc_del_count', 'update_seq', 'purge_seq',
-                    'disk_size', 'data_size', 'committed_update_seq',
-                    'compacted_seq'):
-            self.registry_state[key] = registry_state[key]
+@contextmanager
+def save_registry_state(lister):
+    """Fetch registry state on entry; store it after the body succeeds."""
+    params = {'headers': lister.request_headers()}
+    registry_state = lister.session.get(lister.api_baseurl, **params)
+    registry_state = registry_state.json()
+    keys = ('doc_count', 'doc_del_count', 'update_seq', 'purge_seq',
+            'disk_size', 'data_size', 'committed_update_seq',
+            'compacted_seq')
 
-    def _store_registry_state(self):
-        """Store the backup npm registry state to database.
-        """
-        npm_visit = NpmVisitModel(**self.registry_state)
-        self.lister.db_session.add(npm_visit)
-        self.lister.db_session.commit()
+    state = {key: registry_state[key] for key in keys}
+    state['visit_date'] = datetime.now()
+    yield
+    npm_visit = NpmVisitModel(**state)
+    lister.db_session.add(npm_visit)
+    lister.db_session.commit()
 
 
-class NpmListerTask(_NpmListerTaskBase):
-    """Full npm lister (list all available packages from the npm registry)
+def get_last_update_seq(lister):
+    """Get latest ``update_seq`` value for listing only updated packages.
     """
-
-    def new_lister(self):
-        return NpmLister()
-
-    def run_task(self):
-        self.lister = self.new_lister()
-        self._save_registry_state()
-        self.lister.run()
-        self._store_registry_state()
-
-
-class NpmIncrementalListerTask(_NpmListerTaskBase):
-    """Incremental npm lister (list all updated packages since the last listing)
-    """
-
-    def new_lister(self):
-        return NpmIncrementalLister()
-
-    def run_task(self):
-        self.lister = self.new_lister()
-        update_seq_start = self._get_last_update_seq()
-        self._save_registry_state()
-        self.lister.run(min_bound=update_seq_start)
-        self._store_registry_state()
-
-    def _get_last_update_seq(self):
-        """Get latest ``update_seq`` value for listing only updated packages.
-        """
-        query = self.lister.db_session.query(NpmVisitModel.update_seq)
-        row = query.order_by(NpmVisitModel.uid.desc()).first()
-        if not row:
-            raise ValueError('No npm registry listing previously performed ! '
-                             'This is required prior to the execution of an '
-                             'incremental listing.')
-        return row[0]
+    query = lister.db_session.query(NpmVisitModel.update_seq)
+    row = query.order_by(NpmVisitModel.uid.desc()).first()
+    if not row:
+        raise ValueError('No npm registry listing previously performed ! '
+                         'This is required prior to the execution of an '
+                         'incremental listing.')
+    return row[0]
+
+
+@app.task(name='swh.lister.npm.tasks.NpmListerTask',
+          base=SWHTask, bind=True)
+def npm_lister(self, **lister_args):
+    """Run the full npm lister and record the registry state."""
+    self.log.debug('%s, lister_args=%s' % (
+        self.name, lister_args))
+    lister = NpmLister(**lister_args)
+    with save_registry_state(lister):
+        lister.run()
+    self.log.debug('%s OK' % (self.name))
+
+
+@app.task(name='swh.lister.npm.tasks.NpmIncrementalListerTask',
+          base=SWHTask, bind=True)
+def npm_incremental_lister(self, **lister_args):
+    """Run the npm lister from the last stored ``update_seq``."""
+    self.log.debug('%s, lister_args=%s' % (
+        self.name, lister_args))
+    lister = NpmIncrementalLister(**lister_args)
+    update_seq_start = get_last_update_seq(lister)
+    with save_registry_state(lister):
+        lister.run(min_bound=update_seq_start)
+    self.log.debug('%s OK' % (self.name))
diff --git a/swh/lister/pypi/tasks.py b/swh/lister/pypi/tasks.py
--- a/swh/lister/pypi/tasks.py
+++ b/swh/lister/pypi/tasks.py
@@ -2,19 +2,19 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-from ..core.tasks import ListerTaskBase
-from .lister import PyPILister
+from swh.scheduler.task import SWHTask
+from swh.scheduler.celery_backend.config import app
 
+from .lister import PyPILister
 
-class PyPIListerTask(ListerTaskBase):
-    """Full PyPI lister (list all available origins from the api).
 
-    """
-    task_queue = 'swh_lister_pypi_refresh'
+@app.task(name='swh.lister.pypi.tasks.PyPIListerTask',
+          base=SWHTask, bind=True)
+def pypi_lister(self, **lister_args):
+    """Run the PyPI lister."""
+    self.log.debug('%s(), lister_args=%s' % (
+        self.name, lister_args))
+    PyPILister(**lister_args).run()
+    self.log.debug('%s OK' % (self.name))
 
-    def new_lister(self):
-        return PyPILister()
 
-    def run_task(self):
-        lister = self.new_lister()
-        lister.run()