Changeset View
Changeset View
Standalone View
Standalone View
swh/lister/bitbucket/tasks.py
# Copyright (C) 2017-2019 the Software Heritage developers | # Copyright (C) 2017-2021 the Software Heritage developers | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import random | from typing import Optional | ||||
from celery import group, shared_task | from celery import shared_task | ||||
from .lister import BitBucketLister | from .lister import BitbucketLister | ||||
GROUP_SPLIT = 10000 | |||||
@shared_task(name=__name__ + ".IncrementalBitBucketLister") | @shared_task(name=__name__ + ".IncrementalBitBucketLister") | ||||
def list_bitbucket_incremental(**lister_args): | def list_bitbucket_incremental( | ||||
"""Incremental update of the BitBucket forge""" | page_size: Optional[int] = None, | ||||
lister = BitBucketLister(**lister_args) | username: Optional[str] = None, | ||||
return lister.run(min_bound=lister.db_last_index(), max_bound=None) | password: Optional[str] = None, | ||||
): | |||||
"""Incremental listing of the public Bitbucket repositories.""" | |||||
@shared_task(name=__name__ + ".RangeBitBucketLister") | lister = BitbucketLister.from_configfile( | ||||
def _range_bitbucket_lister(start, end, **lister_args): | page_size=page_size, credentials=[(username, password)], | ||||
lister = BitBucketLister(**lister_args) | ) | ||||
return lister.run(min_bound=start, max_bound=end) | return lister.run().dict() | ||||
@shared_task(name=__name__ + ".FullBitBucketRelister", bind=True) | @shared_task(name=__name__ + ".FullBitBucketRelister") | ||||
def list_bitbucket_full(self, split=None, **lister_args): | def list_bitbucket_full( | ||||
"""Full update of the BitBucket forge | page_size: Optional[int] = None, | ||||
username: Optional[str] = None, | |||||
It's not to be called for an initial listing. | password: Optional[str] = None, | ||||
): | |||||
""" | """Full listing of the public Bitbucket repositories.""" | ||||
lister = BitBucketLister(**lister_args) | lister = BitbucketLister.from_configfile( | ||||
ranges = lister.db_partition_indices(split or GROUP_SPLIT) | page_size=page_size, incremental=False, credentials=[(username, password)], | ||||
if not ranges: | ) | ||||
self.log.info("Nothing to list") | return lister.run().dict() | ||||
return | |||||
random.shuffle(ranges) | |||||
promise = group( | |||||
_range_bitbucket_lister.s(minv, maxv, **lister_args) for minv, maxv in ranges | |||||
)() | |||||
self.log.debug("%s OK (spawned %s subtasks)", (self.name, len(ranges))) | |||||
try: | |||||
promise.save() # so that we can restore the GroupResult in tests | |||||
except (NotImplementedError, AttributeError): | |||||
self.log.info("Unable to call save_group with current result backend.") | |||||
# FIXME: what to do in terms of return here? | |||||
return promise.id | |||||
@shared_task(name=__name__ + ".ping") | @shared_task(name=__name__ + ".ping") | ||||
def _ping(): | def _ping(): | ||||
return "OK" | return "OK" |