diff --git a/swh/lister/bitbucket/tasks.py b/swh/lister/bitbucket/tasks.py index 4d18213..841d6cc 100644 --- a/swh/lister/bitbucket/tasks.py +++ b/swh/lister/bitbucket/tasks.py @@ -1,52 +1,57 @@ # Copyright (C) 2017-2018 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import random from celery import group from swh.scheduler.celery_backend.config import app from swh.scheduler.task import SWHTask from .lister import BitBucketLister GROUP_SPLIT = 10000 def new_lister(api_baseurl='https://api.bitbucket.org/2.0'): return BitBucketLister(api_baseurl=api_baseurl) @app.task(name='swh.lister.bitbucket.tasks.IncrementalBitBucketLister', base=SWHTask, bind=True) def incremental_bitbucket_lister(self, **lister_args): self.log.debug('%s, lister_args=%s' % ( self.name, lister_args)) lister = new_lister(**lister_args) lister.run(min_bound=lister.db_last_index(), max_bound=None) self.log.debug('%s OK' % (self.name)) @app.task(name='swh.lister.bitbucket.tasks.RangeBitBucketLister', base=SWHTask, bind=True) def range_bitbucket_lister(self, start, end, **lister_args): self.log.debug('%s(start=%s, end=%d), lister_args=%s' % ( self.name, start, end, lister_args)) lister = new_lister(**lister_args) lister.run(min_bound=start, max_bound=end) self.log.debug('%s OK' % (self.name)) @app.task(name='swh.lister.bitbucket.tasks.FullBitBucketRelister', base=SWHTask, bind=True) def full_bitbucket_relister(self, split=None, **lister_args): self.log.debug('%s, lister_args=%s' % ( self.name, lister_args)) lister = new_lister(**lister_args) ranges = lister.db_partition_indices(split or GROUP_SPLIT) random.shuffle(ranges) group(range_bitbucket_lister.s(minv, maxv, **lister_args) for minv, maxv in ranges)() self.log.debug('%s OK (spawned %s subtasks)' % (self.name, len(ranges))) +@app.task(name='swh.lister.bitbucket.tasks.ping', + base=SWHTask, bind=True) +def ping(self): + self.log.debug(self.name) + return 'OK' diff --git a/swh/lister/debian/tasks.py b/swh/lister/debian/tasks.py index a922195..875551f 100644 --- a/swh/lister/debian/tasks.py +++ b/swh/lister/debian/tasks.py @@ -1,17 +1,24 @@ # Copyright (C) 2017-2018 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.scheduler.celery_backend.config import app from swh.scheduler.task import SWHTask from .lister import DebianLister @app.task(name='swh.lister.debian.tasks.DebianListerTask', base=SWHTask, bind=True) def debian_lister(self, distribution, **lister_args): self.log.debug('%s, lister_args=%s' % ( self.name, lister_args)) DebianLister(**lister_args).run(distribution) self.log.debug('%s OK' % (self.name)) + + +@app.task(name='swh.lister.debian.tasks.ping', + base=SWHTask, bind=True) +def ping(self): + self.log.debug(self.name) + return 'OK' diff --git a/swh/lister/github/tasks.py b/swh/lister/github/tasks.py index fdc7c39..eda4f51 100644 --- a/swh/lister/github/tasks.py +++ b/swh/lister/github/tasks.py @@ -1,53 +1,58 @@ # Copyright (C) 2017-2018 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import random from celery import group from swh.scheduler.celery_backend.config import app from swh.scheduler.task import SWHTask from swh.lister.github.lister import GitHubLister GROUP_SPLIT = 10000 def new_lister(api_baseurl='https://api.github.com', **kw): return GitHubLister(api_baseurl=api_baseurl, **kw) @app.task(name='swh.lister.github.tasks.IncrementalGitHubLister', base=SWHTask, bind=True) def incremental_github_lister(self, **lister_args): self.log.debug('%s, lister_args=%s' % ( self.name, lister_args)) lister = new_lister(**lister_args) lister.run(min_bound=lister.db_last_index(), max_bound=None) self.log.debug('%s OK' % (self.name)) @app.task(name='swh.lister.github.tasks.RangeGitHubLister', base=SWHTask, bind=True) def range_github_lister(self, start, end, **lister_args): self.log.debug('%s(start=%s, end=%d), lister_args=%s' % ( self.name, start, end, lister_args)) lister = new_lister(**lister_args) lister.run(min_bound=start, max_bound=end) self.log.debug('%s OK' % (self.name)) @app.task(name='swh.lister.github.tasks.FullGitHubRelister', base=SWHTask, bind=True) def full_github_relister(self, split=None, **lister_args): self.log.debug('%s, lister_args=%s' % ( self.name, lister_args)) lister = new_lister(**lister_args) ranges = lister.db_partition_indices(split or GROUP_SPLIT) random.shuffle(ranges) group(range_github_lister.s(minv, maxv, **lister_args) for minv, maxv in ranges)() self.log.debug('%s OK (spawned %s subtasks)' % (self.name, len(ranges))) +@app.task(name='swh.lister.github.tasks.ping', + base=SWHTask, bind=True) +def ping(self): + self.log.debug(self.name) + return 'OK' diff --git a/swh/lister/gitlab/tasks.py b/swh/lister/gitlab/tasks.py index eacb82f..eb8a4cb 100644 --- a/swh/lister/gitlab/tasks.py +++ b/swh/lister/gitlab/tasks.py @@ -1,59 +1,66 @@ # Copyright (C) 2018 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import random from celery import group from swh.scheduler.celery_backend.config import app from swh.scheduler.task import SWHTask from .. import utils from .lister import GitLabLister NBPAGES = 10 def new_lister(api_baseurl='https://gitlab.com/api/v4', instance='gitlab', sort='asc', per_page=20): return GitLabLister( api_baseurl=api_baseurl, instance=instance, sort=sort) @app.task(name='swh.lister.gitlab.tasks.IncrementalGitLabLister', base=SWHTask, bind=True) def incremental_gitlab_lister(self, **lister_args): self.log.debug('%s, lister_args=%s' % ( self.name, lister_args)) lister_args['sort'] = 'desc' lister = new_lister(**lister_args) total_pages = lister.get_pages_information()[1] # stopping as soon as existing origins for that instance are detected lister.run(min_bound=1, max_bound=total_pages, check_existence=True) self.log.debug('%s OK' % (self.name)) @app.task(name='swh.lister.gitlab.tasks.RangeGitLabLister', base=SWHTask, bind=True) def range_gitlab_lister(self, start, end, **lister_args): self.log.debug('%s(start=%s, end=%d), lister_args=%s' % ( self.name, start, end, lister_args)) lister = new_lister(**lister_args) lister.run(min_bound=start, max_bound=end) self.log.debug('%s OK' % (self.name)) @app.task(name='swh.lister.gitlab.tasks.FullGitLabRelister', base=SWHTask, bind=True) def full_gitlab_relister(self, **lister_args): self.log.debug('%s, lister_args=%s' % ( self.name, lister_args)) lister = new_lister(**lister_args) _, total_pages, _ = lister.get_pages_information() ranges = list(utils.split_range(total_pages, NBPAGES)) random.shuffle(ranges) group(range_gitlab_lister.s(minv, maxv, **lister_args) for minv, maxv in ranges)() self.log.debug('%s OK (spawned %s subtasks)' % (self.name, len(ranges))) + + +@app.task(name='swh.lister.gitlab.tasks.ping', + base=SWHTask, bind=True) +def ping(self): + self.log.debug(self.name) + return 'OK' diff --git a/swh/lister/npm/tasks.py b/swh/lister/npm/tasks.py index d032061..a34290c 100644 --- a/swh/lister/npm/tasks.py +++ b/swh/lister/npm/tasks.py @@ -1,64 +1,71 @@ # Copyright (C) 2018 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime from contextlib import contextmanager from swh.scheduler.celery_backend.config import app from swh.scheduler.task import SWHTask from swh.lister.npm.lister import NpmLister, NpmIncrementalLister from swh.lister.npm.models import NpmVisitModel @contextmanager def save_registry_state(lister): params = {'headers': lister.request_headers()} registry_state = lister.session.get(lister.api_baseurl, **params) registry_state = registry_state.json() keys = ('doc_count', 'doc_del_count', 'update_seq', 'purge_seq', 'disk_size', 'data_size', 'committed_update_seq', 'compacted_seq') state = {key: registry_state[key] for key in keys} state['visit_date'] = datetime.now() yield npm_visit = NpmVisitModel(state) lister.db_session.add(npm_visit) lister.db_session.commit() def get_last_update_seq(lister): """Get latest ``update_seq`` value for listing only updated packages. """ query = lister.db_session.query(NpmVisitModel.update_seq) row = query.order_by(NpmVisitModel.uid.desc()).first() if not row: raise ValueError('No npm registry listing previously performed ! ' 'This is required prior to the execution of an ' 'incremental listing.') return row[0] @app.task(name='swh.lister.npm.tasks.NpmListerTask', base=SWHTask, bind=True) def npm_lister(self, **lister_args): self.log.debug('%s, lister_args=%s' % ( self.name, lister_args)) lister = NpmLister(**lister_args) with save_registry_state(lister): lister.run() self.log.debug('%s OK' % (self.name)) @app.task(name='swh.lister.npm.tasks.NpmIncrementalListerTask', base=SWHTask, bind=True) def npm_incremental_lister(self, **lister_args): self.log.debug('%s, lister_args=%s' % ( self.name, lister_args)) lister = NpmIncrementalLister(**lister_args) update_seq_start = get_last_update_seq(lister) with save_registry_state(lister): lister.run(min_bound=update_seq_start) self.log.debug('%s OK' % (self.name)) + + +@app.task(name='swh.lister.npm.tasks.ping', + base=SWHTask, bind=True) +def ping(self): + self.log.debug(self.name) + return 'OK' diff --git a/swh/lister/pypi/tasks.py b/swh/lister/pypi/tasks.py index 65e9348..5a1fc5f 100644 --- a/swh/lister/pypi/tasks.py +++ b/swh/lister/pypi/tasks.py @@ -1,19 +1,24 @@ # Copyright (C) 2018 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.scheduler.task import SWHTask from swh.scheduler.celery_backend.config import app from .lister import PyPILister @app.task(name='swh.lister.pypi.tasks.PyPIListerTask', base=SWHTask, bind=True) def pypi_lister(self, **lister_args): self.log.debug('%s(), lister_args=%s' % ( self.name, lister_args)) PyPILister(**lister_args).run() self.log.debug('%s OK' % (self.name)) +@app.task(name='swh.lister.pypi.tasks.ping', + base=SWHTask, bind=True) +def ping(self): + self.log.debug(self.name) + return 'OK'