diff --git a/swh/lister/bitbucket/lister.py b/swh/lister/bitbucket/lister.py
index d7dfe51..a5cf91b 100644
--- a/swh/lister/bitbucket/lister.py
+++ b/swh/lister/bitbucket/lister.py
@@ -1,98 +1,85 @@
 # Copyright (C) 2017-2019 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
-import datetime
 import logging

 import iso8601

+from datetime import datetime
+
 from urllib import parse

 from swh.lister.bitbucket.models import BitBucketModel
 from swh.lister.core.indexing_lister import IndexingHttpLister

 logger = logging.getLogger(__name__)

 DEFAULT_BITBUCKET_PAGE = 10


 class BitBucketLister(IndexingHttpLister):
     PATH_TEMPLATE = '/repositories?after=%s'
     MODEL = BitBucketModel
     LISTER_NAME = 'bitbucket'
     instance = 'bitbucket'
+    default_min_bound = datetime.utcfromtimestamp(0).isoformat()

     def __init__(self, api_baseurl, override_config=None, per_page=100):
         super().__init__(
             api_baseurl=api_baseurl, override_config=override_config)
         if per_page != DEFAULT_BITBUCKET_PAGE:
             self.PATH_TEMPLATE = '%s&pagelen=%s' % (
                 self.PATH_TEMPLATE, per_page)
             # to stay consistent with prior behavior (20 * 10 repositories then)
             self.flush_packet_db = int(
                 (self.flush_packet_db * DEFAULT_BITBUCKET_PAGE) / per_page)

     def get_model_from_repo(self, repo):
         return {
             'uid': repo['uuid'],
             'indexable': repo['created_on'],
             'name': repo['name'],
             'full_name': repo['full_name'],
             'html_url': repo['links']['html']['href'],
             'origin_url': repo['links']['clone'][0]['href'],
             'origin_type': repo['scm'],
         }

     def get_next_target_from_response(self, response):
         body = response.json()
         if 'next' in body:
             return parse.unquote(body['next'].split('after=')[1])

     def transport_response_simplified(self, response):
         repos = response.json()['values']
         return [self.get_model_from_repo(repo) for repo in repos]

-    def db_first_index(self):
-        """For the first time listing, there is no data in db, so fallback to the
-        bitbucket starting year.
-
-        """
-        return super().db_first_index() or '2008-01-01T00:00:00Z'
-
-    def db_last_index(self):
-        """For the first time listing, there is no data in db, so fallback to the time
-        of the first run as max date.
-
-        """
-        return super().db_last_index() or datetime.datetime.now(
-            tz=datetime.timezone.utc).isoformat()
-
     def request_uri(self, identifier):
         return super().request_uri(identifier or '1970-01-01')

     def is_within_bounds(self, inner, lower=None, upper=None):
         # values are expected to be str dates
         try:
             inner = iso8601.parse_date(inner)
             if lower:
                 lower = iso8601.parse_date(lower)
             if upper:
                 upper = iso8601.parse_date(upper)
             if lower is None and upper is None:
                 return True
             elif lower is None:
                 ret = inner <= upper
             elif upper is None:
                 ret = inner >= lower
             else:
                 ret = lower <= inner <= upper
         except Exception as e:
             logger.error(str(e) + ': %s, %s, %s',
                          ('inner=%s%s' % (type(inner), inner)),
                          ('lower=%s%s' % (type(lower), lower)),
                          ('upper=%s%s' % (type(upper), upper)))
             raise
         return ret
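A quick aside on the new `default_min_bound`: instead of patching `db_first_index`/`db_last_index` with hard-coded fallbacks, the lister now advertises the Unix epoch as its open lower bound. A minimal standalone sketch of why that is safe (the `created_on` value below is made up, shaped like BitBucket API output; it only needs the `iso8601` package the lister already depends on):

    from datetime import datetime

    import iso8601  # same parser is_within_bounds() uses

    default_min_bound = datetime.utcfromtimestamp(0).isoformat()
    print(default_min_bound)  # 1970-01-01T00:00:00

    # Any real repository creation date sorts after the epoch default;
    # iso8601 assumes UTC for naive timestamps, so the comparison is valid.
    created_on = '2008-05-05T10:14:57.570034+00:00'  # hypothetical value
    assert iso8601.parse_date(default_min_bound) <= \
        iso8601.parse_date(created_on)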
- - """ - return super().db_last_index() or datetime.datetime.now( - tz=datetime.timezone.utc).isoformat() - def request_uri(self, identifier): return super().request_uri(identifier or '1970-01-01') def is_within_bounds(self, inner, lower=None, upper=None): # values are expected to be str dates try: inner = iso8601.parse_date(inner) if lower: lower = iso8601.parse_date(lower) if upper: upper = iso8601.parse_date(upper) if lower is None and upper is None: return True elif lower is None: ret = inner <= upper elif upper is None: ret = inner >= lower else: ret = lower <= inner <= upper except Exception as e: logger.error(str(e) + ': %s, %s, %s', ('inner=%s%s' % (type(inner), inner)), ('lower=%s%s' % (type(lower), lower)), ('upper=%s%s' % (type(upper), upper))) raise return ret diff --git a/swh/lister/bitbucket/tasks.py b/swh/lister/bitbucket/tasks.py index 1bb00c9..a084846 100644 --- a/swh/lister/bitbucket/tasks.py +++ b/swh/lister/bitbucket/tasks.py @@ -1,48 +1,57 @@ # Copyright (C) 2017-2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import random from celery import group from swh.scheduler.celery_backend.config import app from .lister import BitBucketLister GROUP_SPLIT = 10000 def new_lister(api_baseurl='https://api.bitbucket.org/2.0', per_page=100): return BitBucketLister(api_baseurl=api_baseurl, per_page=per_page) @app.task(name=__name__ + '.IncrementalBitBucketLister') def incremental_bitbucket_lister(**lister_args): lister = new_lister(**lister_args) lister.run(min_bound=lister.db_last_index(), max_bound=None) @app.task(name=__name__ + '.RangeBitBucketLister') def range_bitbucket_lister(start, end, **lister_args): lister = new_lister(**lister_args) lister.run(min_bound=start, max_bound=end) @app.task(name=__name__ + '.FullBitBucketRelister', bind=True) def full_bitbucket_relister(self, split=None, **lister_args): + """Relist from the beginning of what's already been listed. + + It's not to be called for an initial listing. 
+ + """ lister = new_lister(**lister_args) ranges = lister.db_partition_indices(split or GROUP_SPLIT) + if not ranges: + self.log.info('Nothing to list') + return + random.shuffle(ranges) promise = group(range_bitbucket_lister.s(minv, maxv, **lister_args) for minv, maxv in ranges)() self.log.debug('%s OK (spawned %s subtasks)', (self.name, len(ranges))) try: promise.save() # so that we can restore the GroupResult in tests except (NotImplementedError, AttributeError): self.log.info('Unable to call save_group with current result backend.') return promise.id @app.task(name=__name__ + '.ping') def ping(): return 'OK' diff --git a/swh/lister/core/indexing_lister.py b/swh/lister/core/indexing_lister.py index aa913e4..5436da1 100644 --- a/swh/lister/core/indexing_lister.py +++ b/swh/lister/core/indexing_lister.py @@ -1,240 +1,243 @@ # Copyright (C) 2015-2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import logging from itertools import count import dateutil from sqlalchemy import func from .lister_transports import ListerHttpTransport from .lister_base import ListerBase logger = logging.getLogger(__name__) class IndexingLister(ListerBase): flush_packet_db = 20 + default_min_bound = '' """Lister* intermediate class for any service that follows the pattern: - The service must report at least one stable unique identifier, known herein as the UID value, for every listed repository. - If the service splits the list of repositories into sublists, it must report at least one stable and sorted index identifier for every listed repository, known herein as the indexable value, which can be used as part of the service endpoint query to request a sublist beginning from that index. This might be the UID if the UID is monotonic. - Client sends a request to list repositories starting from a given index. - Client receives structured (json/xml/etc) response with information about a sequential series of repositories starting from that index and, if necessary/available, some indication of the URL or index for fetching the next series of repository data. See :class:`swh.lister.core.lister_base.ListerBase` for more details. This class cannot be instantiated. To create a new Lister for a source code listing service that follows the model described above, you must subclass this class and provide the required overrides in addition to any unmet implementation/override requirements of this class's base. (see parent class and member docstrings for details) Required Overrides:: def get_next_target_from_response """ @abc.abstractmethod def get_next_target_from_response(self, response): """Find the next server endpoint identifier given the entire response. Implementation of this method depends on the server API spec and the shape of the network response object returned by the transport_request method. Args: response (transport response): response page from the server Returns: index of next page, possibly extracted from a next href url """ pass # You probably don't need to override anything below this line. def filter_before_inject(self, models_list): """Overrides ListerBase.filter_before_inject Bounds query results by this Lister's set max_index. 
""" models_list = [ m for m in models_list if self.is_within_bounds(m['indexable'], None, self.max_index) ] return models_list def db_query_range(self, start, end): """Look in the db for a range of repositories with indexable values in the range [start, end] Args: start (model indexable type): start of desired indexable range end (model indexable type): end of desired indexable range Returns: a list of sqlalchemy.ext.declarative.declarative_base objects with indexable values within the given range """ retlist = self.db_session.query(self.MODEL) if start is not None: retlist = retlist.filter(self.MODEL.indexable >= start) if end is not None: retlist = retlist.filter(self.MODEL.indexable <= end) return retlist def db_partition_indices(self, partition_size): """Describe an index-space compartmentalization of the db table - in equal sized chunks. This is used to describe min&max bounds for - parallelizing fetch tasks. + in equal sized chunks. This is used to describe min&max bounds for + parallelizing fetch tasks. Args: partition_size (int): desired size to make each partition + Returns: a list of tuples (begin, end) of indexable value that - declare approximately equal-sized ranges of existing - repos - """ + declare approximately equal-sized ranges of existing + repos + """ n = max(self.db_num_entries(), 10) partition_size = min(partition_size, n) n_partitions = n // partition_size min_index = self.db_first_index() max_index = self.db_last_index() if not min_index or not max_index: - raise ValueError("Can't partition an empty range") + # Nothing to list + return [] if isinstance(min_index, str): def format_bound(bound): return bound.isoformat() min_index = dateutil.parser.parse(min_index) max_index = dateutil.parser.parse(max_index) else: def format_bound(bound): return bound partition_width = (max_index - min_index) / n_partitions partitions = [ [ format_bound(min_index + i * partition_width), format_bound(min_index + (i+1) * partition_width), ] for i in range(n_partitions) ] # Remove bounds for lowest and highest partition partitions[0][0] = None partitions[-1][1] = None return [tuple(partition) for partition in partitions] def db_first_index(self): """Look in the db for the smallest indexable value Returns: the smallest indexable value of all repos in the db """ t = self.db_session.query(func.min(self.MODEL.indexable)).first() if t: return t[0] def db_last_index(self): """Look in the db for the largest indexable value Returns: the largest indexable value of all repos in the db """ t = self.db_session.query(func.max(self.MODEL.indexable)).first() if t: return t[0] def disable_deleted_repo_tasks(self, start, end, keep_these): """Disable tasks for repos that no longer exist between start and end. Args: start: beginning of range to disable end: end of range to disable keep_these (uid list): do not disable repos with uids in this list """ if end is None: end = self.db_last_index() if not self.is_within_bounds(end, None, self.max_index): end = self.max_index deleted_repos = self.winnow_models( self.db_query_range(start, end), self.MODEL.uid, keep_these ) tasks_to_disable = [repo.task_id for repo in deleted_repos if repo.task_id is not None] if tasks_to_disable: self.scheduler.disable_tasks(tasks_to_disable) for repo in deleted_repos: repo.task_id = None def run(self, min_bound=None, max_bound=None): """Main entry function. 
diff --git a/swh/lister/gitlab/tasks.py b/swh/lister/gitlab/tasks.py
index aee2c19..eff3114 100644
--- a/swh/lister/gitlab/tasks.py
+++ b/swh/lister/gitlab/tasks.py
@@ -1,57 +1,62 @@
 # Copyright (C) 2018 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import random

 from celery import group

 from swh.scheduler.celery_backend.config import app

 from .. import utils
 from .lister import GitLabLister

 NBPAGES = 10


 def new_lister(api_baseurl='https://gitlab.com/api/v4', instance=None,
                sort='asc', per_page=20):
     return GitLabLister(
         api_baseurl=api_baseurl, instance=instance, sort=sort,
         per_page=per_page)


 @app.task(name=__name__ + '.IncrementalGitLabLister')
 def incremental_gitlab_lister(**lister_args):
     lister_args['sort'] = 'desc'
     lister = new_lister(**lister_args)
     total_pages = lister.get_pages_information()[1]
     # stopping as soon as existing origins for that instance are detected
     lister.run(min_bound=1, max_bound=total_pages, check_existence=True)


 @app.task(name=__name__ + '.RangeGitLabLister')
 def range_gitlab_lister(start, end, **lister_args):
     lister = new_lister(**lister_args)
     lister.run(min_bound=start, max_bound=end)


 @app.task(name=__name__ + '.FullGitLabRelister', bind=True)
 def full_gitlab_relister(self, **lister_args):
+    """Full lister.
+
+    This task is actually a full lister, not a relister; it should be
+    renamed accordingly.
+
+    """
     lister = new_lister(**lister_args)
     _, total_pages, _ = lister.get_pages_information()
     ranges = list(utils.split_range(total_pages, NBPAGES))
     random.shuffle(ranges)
     promise = group(range_gitlab_lister.s(minv, maxv, **lister_args)
                     for minv, maxv in ranges)()
     self.log.debug('%s OK (spawned %s subtasks)' % (self.name, len(ranges)))
     try:
         promise.save()
     except (NotImplementedError, AttributeError):
         self.log.info('Unable to call save_group with current result backend.')
     return promise.id


 @app.task(name=__name__ + '.ping')
 def ping():
     return 'OK'
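Unlike the index-based listers above, the GitLab lister partitions by page number. A plausible reimplementation of `swh.lister.utils.split_range`, shown for illustration only (the real helper's exact boundary behavior may differ):

    def split_range(total_pages, nb_pages):
        # yield (start, end) windows of at most nb_pages pages each
        for start in range(1, total_pages + 1, nb_pages):
            yield start, min(start + nb_pages - 1, total_pages)

    assert list(split_range(25, 10)) == [(1, 10), (11, 20), (21, 25)]

Because this task derives its ranges from the instance itself (via `get_pages_information()`) rather than from the local db, it needs no db-emptiness guard like the BitBucket and GitHub relisters above.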
+ + """ lister = new_lister(**lister_args) ranges = lister.db_partition_indices(split or GROUP_SPLIT) + if not ranges: + self.log.info('Nothing to list') + return random.shuffle(ranges) promise = group(range_github_lister.s(minv, maxv, **lister_args) for minv, maxv in ranges)() self.log.debug('%s OK (spawned %s subtasks)' % (self.name, len(ranges))) try: promise.save() # so that we can restore the GroupResult in tests except (NotImplementedError, AttributeError): self.log.info('Unable to call save_group with current result backend.') return promise.id @app.task(name=__name__ + '.ping') def ping(): return 'OK' diff --git a/swh/lister/gitlab/tasks.py b/swh/lister/gitlab/tasks.py index aee2c19..eff3114 100644 --- a/swh/lister/gitlab/tasks.py +++ b/swh/lister/gitlab/tasks.py @@ -1,57 +1,62 @@ # Copyright (C) 2018 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import random from celery import group from swh.scheduler.celery_backend.config import app from .. import utils from .lister import GitLabLister NBPAGES = 10 def new_lister(api_baseurl='https://gitlab.com/api/v4', instance=None, sort='asc', per_page=20): return GitLabLister( api_baseurl=api_baseurl, instance=instance, sort=sort, per_page=per_page) @app.task(name=__name__ + '.IncrementalGitLabLister') def incremental_gitlab_lister(**lister_args): lister_args['sort'] = 'desc' lister = new_lister(**lister_args) total_pages = lister.get_pages_information()[1] # stopping as soon as existing origins for that instance are detected lister.run(min_bound=1, max_bound=total_pages, check_existence=True) @app.task(name=__name__ + '.RangeGitLabLister') def range_gitlab_lister(start, end, **lister_args): lister = new_lister(**lister_args) lister.run(min_bound=start, max_bound=end) @app.task(name=__name__ + '.FullGitLabRelister', bind=True) def full_gitlab_relister(self, **lister_args): + """Full lister + + This should be renamed as such. 
+ + """ lister = new_lister(**lister_args) _, total_pages, _ = lister.get_pages_information() ranges = list(utils.split_range(total_pages, NBPAGES)) random.shuffle(ranges) promise = group(range_gitlab_lister.s(minv, maxv, **lister_args) for minv, maxv in ranges)() self.log.debug('%s OK (spawned %s subtasks)' % (self.name, len(ranges))) try: promise.save() except (NotImplementedError, AttributeError): self.log.info('Unable to call save_group with current result backend.') return promise.id @app.task(name=__name__ + '.ping') def ping(): return 'OK' diff --git a/swh/lister/phabricator/lister.py b/swh/lister/phabricator/lister.py index 579bd31..9b8059e 100644 --- a/swh/lister/phabricator/lister.py +++ b/swh/lister/phabricator/lister.py @@ -1,177 +1,170 @@ # Copyright (C) 2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import urllib.parse from swh.lister.core.indexing_lister import IndexingHttpLister from swh.lister.phabricator.models import PhabricatorModel from collections import defaultdict logger = logging.getLogger(__name__) class PhabricatorLister(IndexingHttpLister): PATH_TEMPLATE = '?order=oldest&attachments[uris]=1&after=%s' MODEL = PhabricatorModel LISTER_NAME = 'phabricator' def __init__(self, forge_url, instance=None, api_token=None, override_config=None): if forge_url.endswith("/"): forge_url = forge_url[:-1] self.forge_url = forge_url api_baseurl = '%s/api/diffusion.repository.search' % forge_url self.api_token = api_token if not instance: instance = urllib.parse.urlparse(forge_url).hostname self.instance = instance super().__init__(api_baseurl=api_baseurl, override_config=override_config) + @property + def default_min_bound(self): + """Starting boundary when `min_bound` is not defined (db empty). This + is used within the fn:`run` call. + + """ + return self._bootstrap_repositories_listing() + def _build_query_params(self, params, api_token): """Build query params to include the forge's api token Returns: updated params dict with 'params' entry. 
""" params.update({'params': {'api.token': api_token}}) return params def request_params(self, identifier): """Override the default params behavior to retrieve the api token Credentials are stored as: credentials: phabricator: : - username: password: """ params = {} params['headers'] = self.request_headers() or {} if self.api_token: return self._build_query_params(params, self.api_token) instance_creds = self.request_instance_credentials() if not instance_creds: raise ValueError( 'Phabricator forge needs authentication credential to list.') api_token = instance_creds[0]['password'] return self._build_query_params(params, api_token) def request_headers(self): """ (Override) Set requests headers to send when querying the Phabricator API """ return {'User-Agent': 'Software Heritage phabricator lister', 'Accept': 'application/json'} def get_model_from_repo(self, repo): url = get_repo_url(repo['attachments']['uris']['uris']) if url is None: return None return { 'uid': self.forge_url + str(repo['id']), 'indexable': repo['id'], 'name': repo['fields']['shortName'], 'full_name': repo['fields']['name'], 'html_url': url, 'origin_url': url, 'origin_type': repo['fields']['vcs'], 'instance': self.instance, } def get_next_target_from_response(self, response): body = response.json()['result']['cursor'] if body['after'] != 'null': return body['after'] return None def transport_response_simplified(self, response): repos = response.json() if repos['result'] is None: raise ValueError( 'Problem during information fetch: %s' % repos['error_code']) repos = repos['result']['data'] return [self.get_model_from_repo(repo) for repo in repos] def filter_before_inject(self, models_list): """ (Overrides) IndexingLister.filter_before_inject Bounds query results by this Lister's set max_index. """ models_list = [m for m in models_list if m is not None] return super().filter_before_inject(models_list) def _bootstrap_repositories_listing(self): """ Method called when no min_bound value has been provided to the lister. Its purpose is to: 1. get the first repository data hosted on the Phabricator instance 2. inject them into the lister database 3. 
return the first repository index to start the listing after that value Returns: int: The first repository index """ params = '&order=oldest&limit=1' response = self.safely_issue_request(params) models_list = self.transport_response_simplified(response) self.max_index = models_list[0]['indexable'] models_list = self.filter_before_inject(models_list) injected = self.inject_repo_data_into_db(models_list) self.schedule_missing_tasks(models_list, injected) return self.max_index - def run(self, min_bound=None, max_bound=None): - """ - (Override) Run the lister on the specified Phabricator instance - - Args: - min_bound (int): Optional repository index to start the listing - after it - max_bound (int): Optional repository index to stop the listing - after it - """ - # initial call to the lister, we need to bootstrap it in that case - if min_bound is None: - min_bound = self._bootstrap_repositories_listing() - super().run(min_bound, max_bound) - def get_repo_url(attachments): """ Return url for a hosted repository from its uris attachments according to the following priority lists: * protocol: https > http * identifier: shortname > callsign > id """ processed_urls = defaultdict(dict) for uri in attachments: protocol = uri['fields']['builtin']['protocol'] url = uri['fields']['uri']['effective'] identifier = uri['fields']['builtin']['identifier'] if protocol in ('http', 'https'): processed_urls[protocol][identifier] = url elif protocol is None: for protocol in ('https', 'http'): if url.startswith(protocol): processed_urls[protocol]['undefined'] = url break for protocol in ['https', 'http']: for identifier in ['shortname', 'callsign', 'id', 'undefined']: if (protocol in processed_urls and identifier in processed_urls[protocol]): return processed_urls[protocol][identifier] return None
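The removed `run` override is exactly what the new `default_min_bound` property replaces: the base `IndexingLister.run` only reads `default_min_bound` when no `min_bound` is passed, so a subclass can turn the plain class attribute into a property with side effects and get lazy bootstrapping for free. A standalone sketch of the pattern (class names are illustrative):

    class BaseLister:
        default_min_bound = ''  # plain attribute on the base class

        def run(self, min_bound=None):
            index = min_bound or self.default_min_bound  # only read here
            return index

    class BootstrappingLister(BaseLister):
        @property
        def default_min_bound(self):
            # stands in for PhabricatorLister._bootstrap_repositories_listing()
            print('bootstrapping...')
            return 1

    assert BaseLister().run() == ''
    assert BootstrappingLister().run(min_bound=5) == 5  # property never read
    assert BootstrappingLister().run() == 1             # bootstrap on demand

One caveat of the `or` in `run`: a falsy but legitimate bound (0, empty string) would also trigger the default. The listers touched here only ever pass None or non-falsy indexes, so this seems harmless in practice.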