Use celery's `shared_task` decorator instead of the swh.scheduler celery
`app.task` decorator in every lister `tasks.py` module, which removes the
import of `swh.scheduler.celery_backend.config` from these modules.

Also fix the debug log call in `list_bitbucket_full`: the two values were
passed as a single tuple to a format string with two `%s` placeholders, so
the logging module could not format the record. They are now passed as
separate lazy logging arguments.

diff --git a/swh/lister/bitbucket/tasks.py b/swh/lister/bitbucket/tasks.py
index b8fa316..28b29d6 100644
--- a/swh/lister/bitbucket/tasks.py
+++ b/swh/lister/bitbucket/tasks.py
@@ -1,54 +1,52 @@
 # Copyright (C) 2017-2019 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import random
 
-from celery import group
-
-from swh.scheduler.celery_backend.config import app
+from celery import group, shared_task
 
 from .lister import BitBucketLister
 
 GROUP_SPLIT = 10000
 
 
-@app.task(name=__name__ + '.IncrementalBitBucketLister')
+@shared_task(name=__name__ + '.IncrementalBitBucketLister')
 def list_bitbucket_incremental(**lister_args):
     '''Incremental update of the BitBucket forge'''
     lister = BitBucketLister(**lister_args)
     lister.run(min_bound=lister.db_last_index(), max_bound=None)
 
 
-@app.task(name=__name__ + '.RangeBitBucketLister')
+@shared_task(name=__name__ + '.RangeBitBucketLister')
 def _range_bitbucket_lister(start, end, **lister_args):
     lister = BitBucketLister(**lister_args)
     lister.run(min_bound=start, max_bound=end)
 
 
-@app.task(name=__name__ + '.FullBitBucketRelister', bind=True)
+@shared_task(name=__name__ + '.FullBitBucketRelister', bind=True)
 def list_bitbucket_full(self, split=None, **lister_args):
     """Full update of the BitBucket forge
 
     It's not to be called for an initial listing.
 
     """
     lister = BitBucketLister(**lister_args)
     ranges = lister.db_partition_indices(split or GROUP_SPLIT)
     if not ranges:
         self.log.info('Nothing to list')
         return
     random.shuffle(ranges)
     promise = group(_range_bitbucket_lister.s(minv, maxv, **lister_args)
                     for minv, maxv in ranges)()
-    self.log.debug('%s OK (spawned %s subtasks)', (self.name, len(ranges)))
+    self.log.debug('%s OK (spawned %s subtasks)', self.name, len(ranges))
     try:
         promise.save()  # so that we can restore the GroupResult in tests
     except (NotImplementedError, AttributeError):
         self.log.info('Unable to call save_group with current result backend.')
     return promise.id
 
 
-@app.task(name=__name__ + '.ping')
+@shared_task(name=__name__ + '.ping')
 def _ping():
     return 'OK'
diff --git a/swh/lister/cgit/tasks.py b/swh/lister/cgit/tasks.py
index 0066cf7..1295248 100644
--- a/swh/lister/cgit/tasks.py
+++ b/swh/lister/cgit/tasks.py
@@ -1,18 +1,18 @@
 # Copyright (C) 2019 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-from swh.scheduler.celery_backend.config import app
+from celery import shared_task
 
 from .lister import CGitLister
 
 
-@app.task(name=__name__ + '.CGitListerTask')
+@shared_task(name=__name__ + '.CGitListerTask')
 def list_cgit(**lister_args):
     '''Lister task for CGit instances'''
     CGitLister(**lister_args).run()
 
 
-@app.task(name=__name__ + '.ping')
+@shared_task(name=__name__ + '.ping')
 def _ping():
     return 'OK'
diff --git a/swh/lister/cran/tasks.py b/swh/lister/cran/tasks.py
index 3b449de..6802619 100644
--- a/swh/lister/cran/tasks.py
+++ b/swh/lister/cran/tasks.py
@@ -1,18 +1,18 @@
 # Copyright (C) 2019 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-from swh.scheduler.celery_backend.config import app
+from celery import shared_task
 
 from swh.lister.cran.lister import CRANLister
 
 
-@app.task(name=__name__ + '.CRANListerTask')
+@shared_task(name=__name__ + '.CRANListerTask')
 def list_cran(**lister_args):
     '''Lister task for the CRAN registry'''
     CRANLister(**lister_args).run()
 
 
-@app.task(name=__name__ + '.ping')
+@shared_task(name=__name__ + '.ping')
 def _ping():
     return 'OK'
diff --git a/swh/lister/debian/tasks.py b/swh/lister/debian/tasks.py
index 356b192..ad45f4c 100644
--- a/swh/lister/debian/tasks.py
+++ b/swh/lister/debian/tasks.py
@@ -1,18 +1,18 @@
 # Copyright (C) 2017-2018 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-from swh.scheduler.celery_backend.config import app
+from celery import shared_task
 
 from .lister import DebianLister
 
 
-@app.task(name=__name__ + '.DebianListerTask')
+@shared_task(name=__name__ + '.DebianListerTask')
 def list_debian_distribution(distribution, **lister_args):
     '''List a Debian distribution'''
     DebianLister(**lister_args).run(distribution)
 
 
-@app.task(name=__name__ + '.ping')
+@shared_task(name=__name__ + '.ping')
 def _ping():
     return 'OK'
diff --git a/swh/lister/github/tasks.py b/swh/lister/github/tasks.py
index 555dc0a..207ddf9 100644
--- a/swh/lister/github/tasks.py
+++ b/swh/lister/github/tasks.py
@@ -1,53 +1,52 @@
 # Copyright (C) 2017-2019 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import random
 
-from celery import group
-from swh.scheduler.celery_backend.config import app
+from celery import group, shared_task
 
 from swh.lister.github.lister import GitHubLister
 
 GROUP_SPLIT = 10000
 
 
-@app.task(name=__name__ + '.IncrementalGitHubLister')
+@shared_task(name=__name__ + '.IncrementalGitHubLister')
 def list_github_incremental(**lister_args):
     'Incremental update of GitHub'
     lister = GitHubLister(**lister_args)
     lister.run(min_bound=lister.db_last_index(), max_bound=None)
 
 
-@app.task(name=__name__ + '.RangeGitHubLister')
+@shared_task(name=__name__ + '.RangeGitHubLister')
 def _range_github_lister(start, end, **lister_args):
     lister = GitHubLister(**lister_args)
     lister.run(min_bound=start, max_bound=end)
 
 
-@app.task(name=__name__ + '.FullGitHubRelister', bind=True)
+@shared_task(name=__name__ + '.FullGitHubRelister', bind=True)
 def list_github_full(self, split=None, **lister_args):
     """Full update of GitHub
 
     It's not to be called for an initial listing.
 
     """
     lister = GitHubLister(**lister_args)
     ranges = lister.db_partition_indices(split or GROUP_SPLIT)
     if not ranges:
         self.log.info('Nothing to list')
         return
     random.shuffle(ranges)
     promise = group(_range_github_lister.s(minv, maxv, **lister_args)
                     for minv, maxv in ranges)()
     self.log.debug('%s OK (spawned %s subtasks)' % (self.name, len(ranges)))
     try:
         promise.save()  # so that we can restore the GroupResult in tests
     except (NotImplementedError, AttributeError):
         self.log.info('Unable to call save_group with current result backend.')
     return promise.id
 
 
-@app.task(name=__name__ + '.ping')
+@shared_task(name=__name__ + '.ping')
 def _ping():
     return 'OK'
diff --git a/swh/lister/gitlab/tasks.py b/swh/lister/gitlab/tasks.py
index 30cab41..bdb23cd 100644
--- a/swh/lister/gitlab/tasks.py
+++ b/swh/lister/gitlab/tasks.py
@@ -1,52 +1,51 @@
 # Copyright (C) 2018 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import random
 
-from celery import group
-from swh.scheduler.celery_backend.config import app
+from celery import group, shared_task
 
 from .. import utils
 from .lister import GitLabLister
 
 
 NBPAGES = 10
 
 
-@app.task(name=__name__ + '.IncrementalGitLabLister')
+@shared_task(name=__name__ + '.IncrementalGitLabLister')
 def list_gitlab_incremental(**lister_args):
     """Incremental update of a GitLab instance"""
     lister_args['sort'] = 'desc'
     lister = GitLabLister(**lister_args)
     total_pages = lister.get_pages_information()[1]
     # stopping as soon as existing origins for that instance are detected
     lister.run(min_bound=1, max_bound=total_pages, check_existence=True)
 
 
-@app.task(name=__name__ + '.RangeGitLabLister')
+@shared_task(name=__name__ + '.RangeGitLabLister')
 def _range_gitlab_lister(start, end, **lister_args):
     lister = GitLabLister(**lister_args)
     lister.run(min_bound=start, max_bound=end)
 
 
-@app.task(name=__name__ + '.FullGitLabRelister', bind=True)
+@shared_task(name=__name__ + '.FullGitLabRelister', bind=True)
 def list_gitlab_full(self, **lister_args):
     """Full update of a GitLab instance"""
     lister = GitLabLister(**lister_args)
     _, total_pages, _ = lister.get_pages_information()
     ranges = list(utils.split_range(total_pages, NBPAGES))
     random.shuffle(ranges)
     promise = group(_range_gitlab_lister.s(minv, maxv, **lister_args)
                     for minv, maxv in ranges)()
     self.log.debug('%s OK (spawned %s subtasks)' % (self.name, len(ranges)))
     try:
         promise.save()
     except (NotImplementedError, AttributeError):
         self.log.info('Unable to call save_group with current result backend.')
     return promise.id
 
 
-@app.task(name=__name__ + '.ping')
+@shared_task(name=__name__ + '.ping')
 def _ping():
     return 'OK'
diff --git a/swh/lister/gnu/tasks.py b/swh/lister/gnu/tasks.py
index 7191453..1eb7061 100644
--- a/swh/lister/gnu/tasks.py
+++ b/swh/lister/gnu/tasks.py
@@ -1,18 +1,18 @@
 # Copyright (C) 2019 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-from swh.scheduler.celery_backend.config import app
+from celery import shared_task
 
 from .lister import GNULister
 
 
-@app.task(name=__name__ + '.GNUListerTask')
+@shared_task(name=__name__ + '.GNUListerTask')
 def list_gnu_full(**lister_args):
     'List lister for the GNU source code archive'
     GNULister(**lister_args).run()
 
 
-@app.task(name=__name__ + '.ping')
+@shared_task(name=__name__ + '.ping')
 def _ping():
     return 'OK'
diff --git a/swh/lister/npm/tasks.py b/swh/lister/npm/tasks.py
index 8d1c369..94b991b 100644
--- a/swh/lister/npm/tasks.py
+++ b/swh/lister/npm/tasks.py
@@ -1,62 +1,62 @@
 # Copyright (C) 2018 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from datetime import datetime
 from contextlib import contextmanager
 
-from swh.scheduler.celery_backend.config import app
+from celery import shared_task
 
 from swh.lister.npm.lister import NpmLister, NpmIncrementalLister
 from swh.lister.npm.models import NpmVisitModel
 
 
 @contextmanager
 def save_registry_state(lister):
     params = {'headers': lister.request_headers()}
     registry_state = lister.session.get(lister.url, **params)
     registry_state = registry_state.json()
     keys = ('doc_count', 'doc_del_count', 'update_seq', 'purge_seq',
             'disk_size', 'data_size', 'committed_update_seq',
             'compacted_seq')
 
     state = {key: registry_state[key] for key in keys}
     state['visit_date'] = datetime.now()
     yield
     npm_visit = NpmVisitModel(**state)
     lister.db_session.add(npm_visit)
     lister.db_session.commit()
 
 
 def get_last_update_seq(lister):
     """Get latest ``update_seq`` value for listing only updated packages.
     """
     query = lister.db_session.query(NpmVisitModel.update_seq)
     row = query.order_by(NpmVisitModel.uid.desc()).first()
     if not row:
         raise ValueError('No npm registry listing previously performed ! '
                          'This is required prior to the execution of an '
                          'incremental listing.')
     return row[0]
 
 
-@app.task(name=__name__ + '.NpmListerTask')
+@shared_task(name=__name__ + '.NpmListerTask')
 def list_npm_full(**lister_args):
     'Full lister for the npm (javascript) registry'
     lister = NpmLister(**lister_args)
     with save_registry_state(lister):
         lister.run()
 
 
-@app.task(name=__name__ + '.NpmIncrementalListerTask')
+@shared_task(name=__name__ + '.NpmIncrementalListerTask')
 def list_npm_incremental(**lister_args):
     'Incremental lister for the npm (javascript) registry'
     lister = NpmIncrementalLister(**lister_args)
     update_seq_start = get_last_update_seq(lister)
     with save_registry_state(lister):
         lister.run(min_bound=update_seq_start)
 
 
-@app.task(name=__name__ + '.ping')
+@shared_task(name=__name__ + '.ping')
 def _ping():
     return 'OK'
diff --git a/swh/lister/packagist/tasks.py b/swh/lister/packagist/tasks.py
index 6db17dc..146ebe2 100644
--- a/swh/lister/packagist/tasks.py
+++ b/swh/lister/packagist/tasks.py
@@ -1,18 +1,18 @@
 # Copyright (C) 2019 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-from swh.scheduler.celery_backend.config import app
+from celery import shared_task
 
 from .lister import PackagistLister
 
 
-@app.task(name=__name__ + '.PackagistListerTask')
+@shared_task(name=__name__ + '.PackagistListerTask')
 def list_packagist(**lister_args):
     'List the packagist (php) registry'
     PackagistLister(**lister_args).run()
 
 
-@app.task(name=__name__ + '.ping')
+@shared_task(name=__name__ + '.ping')
 def _ping():
     return 'OK'
diff --git a/swh/lister/phabricator/tasks.py b/swh/lister/phabricator/tasks.py
index 0b8b77d..361fedd 100644
--- a/swh/lister/phabricator/tasks.py
+++ b/swh/lister/phabricator/tasks.py
@@ -1,17 +1,18 @@
 # Copyright (C) 2019 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-from swh.scheduler.celery_backend.config import app
+from celery import shared_task
+
 from swh.lister.phabricator.lister import PhabricatorLister
 
 
-@app.task(name=__name__ + '.FullPhabricatorLister')
+@shared_task(name=__name__ + '.FullPhabricatorLister')
 def list_phabricator_full(**lister_args):
     'Full update of a Phabricator instance'
     PhabricatorLister(**lister_args).run()
 
 
-@app.task(name=__name__ + '.ping')
+@shared_task(name=__name__ + '.ping')
 def _ping():
     return 'OK'
diff --git a/swh/lister/pypi/tasks.py b/swh/lister/pypi/tasks.py
index d6206a4..2412f26 100644
--- a/swh/lister/pypi/tasks.py
+++ b/swh/lister/pypi/tasks.py
@@ -1,18 +1,18 @@
 # Copyright (C) 2018 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-from swh.scheduler.celery_backend.config import app
+from celery import shared_task
 
 from .lister import PyPILister
 
 
-@app.task(name=__name__ + '.PyPIListerTask')
+@shared_task(name=__name__ + '.PyPIListerTask')
 def list_pypi(**lister_args):
     'Full update of the PyPI (python) registry'
     PyPILister(**lister_args).run()
 
 
-@app.task(name=__name__ + '.ping')
+@shared_task(name=__name__ + '.ping')
 def _ping():
     return 'OK'