diff --git a/swh/lister/bitbucket/tasks.py b/swh/lister/bitbucket/tasks.py
index 28b29d6..3b64de0 100644
--- a/swh/lister/bitbucket/tasks.py
+++ b/swh/lister/bitbucket/tasks.py
@@ -1,52 +1,53 @@
 # Copyright (C) 2017-2019 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import random

 from celery import group, shared_task

 from .lister import BitBucketLister

 GROUP_SPLIT = 10000


 @shared_task(name=__name__ + '.IncrementalBitBucketLister')
 def list_bitbucket_incremental(**lister_args):
     '''Incremental update of the BitBucket forge'''
     lister = BitBucketLister(**lister_args)
-    lister.run(min_bound=lister.db_last_index(), max_bound=None)
+    return lister.run(min_bound=lister.db_last_index(), max_bound=None)


 @shared_task(name=__name__ + '.RangeBitBucketLister')
 def _range_bitbucket_lister(start, end, **lister_args):
     lister = BitBucketLister(**lister_args)
-    lister.run(min_bound=start, max_bound=end)
+    return lister.run(min_bound=start, max_bound=end)


 @shared_task(name=__name__ + '.FullBitBucketRelister', bind=True)
 def list_bitbucket_full(self, split=None, **lister_args):
     """Full update of the BitBucket forge

     It's not to be called for an initial listing.

     """
     lister = BitBucketLister(**lister_args)
     ranges = lister.db_partition_indices(split or GROUP_SPLIT)
     if not ranges:
         self.log.info('Nothing to list')
         return
     random.shuffle(ranges)
     promise = group(_range_bitbucket_lister.s(minv, maxv, **lister_args)
                     for minv, maxv in ranges)()
-    self.log.debug('%s OK (spawned %s subtasks)', (self.name, len(ranges)))
+    self.log.debug('%s OK (spawned %s subtasks)', self.name, len(ranges))
     try:
         promise.save()  # so that we can restore the GroupResult in tests
     except (NotImplementedError, AttributeError):
         self.log.info('Unable to call save_group with current result backend.')
+    # FIXME: what to do in terms of return here?
     return promise.id


 @shared_task(name=__name__ + '.ping')
 def _ping():
     return 'OK'
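
With the task now forwarding the lister's return value, the run outcome
becomes observable through the celery result backend. A minimal test sketch
of that contract follows; the celery_worker fixture and the surrounding
setup are illustrative assumptions, not the project's actual test harness:

    from swh.lister.bitbucket.tasks import list_bitbucket_incremental

    def test_incremental_task_forwards_status(celery_worker):
        # assumes a configured broker and a reachable lister db
        res = list_bitbucket_incremental.delay()
        assert res.get() in ({'status': 'eventful'},
                             {'status': 'uneventful'})
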
diff --git a/swh/lister/cgit/lister.py b/swh/lister/cgit/lister.py
index 37464c0..a8da6c6 100644
--- a/swh/lister/cgit/lister.py
+++ b/swh/lister/cgit/lister.py
@@ -1,142 +1,146 @@
 # Copyright (C) 2019 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import re
 import logging
 from urllib.parse import urlparse, urljoin

 from bs4 import BeautifulSoup
 from requests import Session
 from requests.adapters import HTTPAdapter

 from .models import CGitModel

 from swh.core.utils import grouper
 from swh.lister import USER_AGENT
 from swh.lister.core.lister_base import ListerBase

 logger = logging.getLogger(__name__)


 class CGitLister(ListerBase):
     """Lister class for CGit repositories.

     This lister will retrieve the list of published git repositories by
     parsing the HTML page(s) of the index retrieved at `url`.

     For each found git repository, a query is made at the given url found
     in this index to gather published "Clone" URLs to be used as origin
     URL for that git repo.

     If several "Clone" urls are provided, prefer the http/https one, if
     any, otherwise fall back to the first one.

     A loader task is created for each git repository:

         Task:
             Type: load-git
             Policy: recurring
             Args:
                 <git_clonable_url>

         Example:
             Type: load-git
             Policy: recurring
             Args:
                 'https://git.savannah.gnu.org/git/elisp-es.git'

     """
     MODEL = CGitModel
     DEFAULT_URL = 'https://git.savannah.gnu.org/cgit/'
     LISTER_NAME = 'cgit'
     url_prefix_present = True

     def __init__(self, url=None, instance=None, override_config=None):
         """Lister class for CGit repositories.

         Args:
             url (str): main URL of the CGit instance, i.e. url of the index
                 of published git repositories on this instance.
             instance (str): Name of cgit instance. Defaults to url's
                 hostname if unset.

         """
         super().__init__(override_config=override_config)
         if url is None:
             url = self.config.get('url', self.DEFAULT_URL)
         self.url = url

         if not instance:
             instance = urlparse(url).hostname
         self.instance = instance
         self.session = Session()
         self.session.mount(self.url, HTTPAdapter(max_retries=3))
         self.session.headers = {
             'User-Agent': USER_AGENT,
         }

     def run(self):
+        status = 'uneventful'
         total = 0
         for repos in grouper(self.get_repos(), 10):
             models = list(filter(None, (self.build_model(repo)
                                         for repo in repos)))
             injected_repos = self.inject_repo_data_into_db(models)
             self.schedule_missing_tasks(models, injected_repos)
             self.db_session.commit()
             total += len(injected_repos)
             logger.debug('Scheduled %s tasks for %s', total, self.url)
+            status = 'eventful'
+
+        return {'status': status}

     def get_repos(self):
         """Generate git 'project' URLs found on the current CGit server

         """
         next_page = self.url
         while next_page:
             bs_idx = self.get_and_parse(next_page)
             for tr in bs_idx.find(
                     'div', {"class": "content"}).find_all(
                         "tr", {"class": ""}):
                 yield urljoin(self.url, tr.find('a')['href'])

             try:
                 pager = bs_idx.find('ul', {'class': 'pager'})
                 current_page = pager.find('a', {'class': 'current'})
                 if current_page:
                     next_page = current_page.parent.next_sibling.a['href']
                     next_page = urljoin(self.url, next_page)
             except (AttributeError, KeyError):
                 # no pager, or no next page
                 next_page = None

     def build_model(self, repo_url):
         """Given the URL of a git repo project page on a CGit server,
         return the repo description (dict) suitable for insertion in the
         db.
         """
         bs = self.get_and_parse(repo_url)
         urls = [x['href'] for x in bs.find_all('a', {'rel': 'vcs-git'})]

         if not urls:
             return

         # look for the http/https url, if any, and use it as origin_url
         for url in urls:
             if urlparse(url).scheme in ('http', 'https'):
                 origin_url = url
                 break
         else:
             # otherwise, choose the first one
             origin_url = urls[0]

         return {'uid': repo_url,
                 'name': bs.find('a', title=re.compile('.+'))['title'],
                 'origin_type': 'git',
                 'instance': self.instance,
                 'origin_url': origin_url,
                 }

     def get_and_parse(self, url):
         "Get the given url and parse the retrieved HTML using BeautifulSoup"
         return BeautifulSoup(self.session.get(url).text,
                              features='html.parser')
diff --git a/swh/lister/cgit/tasks.py b/swh/lister/cgit/tasks.py
index 1295248..2d60e36 100644
--- a/swh/lister/cgit/tasks.py
+++ b/swh/lister/cgit/tasks.py
@@ -1,18 +1,18 @@
 # Copyright (C) 2019 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from celery import shared_task

 from .lister import CGitLister


 @shared_task(name=__name__ + '.CGitListerTask')
 def list_cgit(**lister_args):
     '''Lister task for CGit instances'''
-    CGitLister(**lister_args).run()
+    return CGitLister(**lister_args).run()


 @shared_task(name=__name__ + '.ping')
 def _ping():
     return 'OK'
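
CGitLister.run() above commits its work in batches of 10 repositories via
swh.core.utils.grouper. For readers unfamiliar with that helper, here is a
rough, self-contained approximation of the batching behavior it is used for
(a sketch, not swh.core's actual implementation):

    from itertools import islice

    def grouper_sketch(iterable, n):
        # yield successive chunks of at most n items (approximation of
        # how swh.core.utils.grouper is used in CGitLister.run above)
        it = iter(iterable)
        while True:
            chunk = list(islice(it, n))
            if not chunk:
                return
            yield chunk

    # 25 repo urls get committed in three batches: 10, 10 and 5
    assert [len(b) for b in grouper_sketch(range(25), 10)] == [10, 10, 5]
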
diff --git a/swh/lister/core/indexing_lister.py b/swh/lister/core/indexing_lister.py
index 48be968..2e7f300 100644
--- a/swh/lister/core/indexing_lister.py
+++ b/swh/lister/core/indexing_lister.py
@@ -1,256 +1,259 @@
 # Copyright (C) 2015-2019 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import abc
 import logging
 from itertools import count

 import dateutil
 from sqlalchemy import func

 from .lister_transports import ListerHttpTransport
 from .lister_base import ListerBase

 logger = logging.getLogger(__name__)


 class IndexingLister(ListerBase):
     """Lister* intermediate class for any service that follows the pattern:

     - The service must report at least one stable unique identifier, known
       herein as the UID value, for every listed repository.
     - If the service splits the list of repositories into sublists, it must
       report at least one stable and sorted index identifier for every
       listed repository, known herein as the indexable value, which can be
       used as part of the service endpoint query to request a sublist
       beginning from that index. This might be the UID if the UID is
       monotonic.
     - Client sends a request to list repositories starting from a given
       index.
     - Client receives structured (json/xml/etc) response with information
       about a sequential series of repositories starting from that index
       and, if necessary/available, some indication of the URL or index for
       fetching the next series of repository data.

     See :class:`swh.lister.core.lister_base.ListerBase` for more details.

     This class cannot be instantiated. To create a new Lister for a source
     code listing service that follows the model described above, you must
     subclass this class and provide the required overrides in addition to
     any unmet implementation/override requirements of this class's base.
     (see parent class and member docstrings for details)

     Required Overrides::

         def get_next_target_from_response

     """
     flush_packet_db = 20
     """Number of iterations in-between write flushes of lister repositories
        to db (see fn:`run`).
     """
     default_min_bound = ''
     """Default initialization value for the minimum boundary index to use
        when undefined (see fn:`run`).
     """

     @abc.abstractmethod
     def get_next_target_from_response(self, response):
         """Find the next server endpoint identifier given the entire
            response.

         Implementation of this method depends on the server API spec and
         the shape of the network response object returned by the
         transport_request method.

         Args:
             response (transport response): response page from the server

         Returns:
             index of next page, possibly extracted from a next href url

         """
         pass

     # You probably don't need to override anything below this line.

     def filter_before_inject(self, models_list):
         """Overrides ListerBase.filter_before_inject

         Bounds query results by this Lister's set max_index.

         """
         models_list = [
             m for m in models_list
             if self.is_within_bounds(m['indexable'], None, self.max_index)
         ]
         return models_list

     def db_query_range(self, start, end):
         """Look in the db for a range of repositories with indexable
            values in the range [start, end]

         Args:
             start (model indexable type): start of desired indexable range
             end (model indexable type): end of desired indexable range

         Returns:
             a list of sqlalchemy.ext.declarative.declarative_base objects
             with indexable values within the given range

         """
         retlist = self.db_session.query(self.MODEL)
         if start is not None:
             retlist = retlist.filter(self.MODEL.indexable >= start)
         if end is not None:
             retlist = retlist.filter(self.MODEL.indexable <= end)
         return retlist

     def db_partition_indices(self, partition_size):
         """Describe an index-space compartmentalization of the db table
            in equal sized chunks. This is used to describe min & max bounds
            for parallelizing fetch tasks.

         Args:
             partition_size (int): desired size to make each partition

         Returns:
             a list of tuples (begin, end) of indexable value that declare
             approximately equal-sized ranges of existing repos

         """
         n = max(self.db_num_entries(), 10)
         partition_size = min(partition_size, n)
         n_partitions = n // partition_size

         min_index = self.db_first_index()
         max_index = self.db_last_index()

         if min_index is None or max_index is None:
             # Nothing to list
             return []

         if isinstance(min_index, str):
             def format_bound(bound):
                 return bound.isoformat()
             min_index = dateutil.parser.parse(min_index)
             max_index = dateutil.parser.parse(max_index)
         elif isinstance(max_index - min_index, int):
             def format_bound(bound):
                 return int(bound)
         else:
             def format_bound(bound):
                 return bound

         partition_width = (max_index - min_index) / n_partitions

         # Generate n_partitions + 1 bounds for n_partitions partitions
         bounds = [
             format_bound(min_index + i * partition_width)
             for i in range(n_partitions + 1)
         ]

         # Trim duplicate bounds
         bounds.append(None)
         bounds = [cur for cur, next in zip(bounds[:-1], bounds[1:])
                   if cur != next]

         # Remove bounds for lowest and highest partition
         bounds[0] = bounds[-1] = None

         return list(zip(bounds[:-1], bounds[1:]))

     def db_first_index(self):
         """Look in the db for the smallest indexable value

         Returns:
             the smallest indexable value of all repos in the db

         """
         t = self.db_session.query(func.min(self.MODEL.indexable)).first()
         if t:
             return t[0]

     def db_last_index(self):
         """Look in the db for the largest indexable value

         Returns:
             the largest indexable value of all repos in the db

         """
         t = self.db_session.query(func.max(self.MODEL.indexable)).first()
         if t:
             return t[0]

     def disable_deleted_repo_tasks(self, start, end, keep_these):
         """Disable tasks for repos that no longer exist between start and
            end.

         Args:
             start: beginning of range to disable
             end: end of range to disable
             keep_these (uid list): do not disable repos with uids in this
                 list

         """
         if end is None:
             end = self.db_last_index()

         if not self.is_within_bounds(end, None, self.max_index):
             end = self.max_index

         deleted_repos = self.winnow_models(
             self.db_query_range(start, end), self.MODEL.uid, keep_these
         )
         tasks_to_disable = [repo.task_id for repo in deleted_repos
                             if repo.task_id is not None]
         if tasks_to_disable:
             self.scheduler.disable_tasks(tasks_to_disable)
         for repo in deleted_repos:
             repo.task_id = None

     def run(self, min_bound=None, max_bound=None):
         """Main entry function. Sequentially fetches repository data from
            the service according to the basic outline in the class
            docstring, continually fetching sublists until either there is
            no next index reference given or the given next index is greater
            than the desired max_bound.

         Args:
             min_bound (indexable type): optional index to start from
             max_bound (indexable type): optional index to stop at

         Returns:
-            nothing
+            a dict with the run status ('eventful' or 'uneventful')

         """
+        status = 'uneventful'
         self.min_index = min_bound
         self.max_index = max_bound

         def ingest_indexes():
             index = min_bound or self.default_min_bound
             for i in count(1):
                 response, injected_repos = self.ingest_data(index)
                 if not response and not injected_repos:
                     logger.info('No response from api server, stopping')
                     return

                 next_index = self.get_next_target_from_response(response)
                 # Determine if any repos were deleted, and disable their
                 # tasks.
                 keep_these = list(injected_repos.keys())
                 self.disable_deleted_repo_tasks(index, next_index, keep_these)

                 # termination condition
                 if next_index is None or next_index == index:
                     logger.info('stopping after index %s, no next link found',
                                 index)
                     return
                 index = next_index
                 logger.debug('Index: %s', index)
                 yield i

         for i in ingest_indexes():
             if (i % self.flush_packet_db) == 0:
                 logger.debug('Flushing updates at index %s', i)
                 self.db_session.commit()
                 self.db_session = self.mk_session()
+                status = 'eventful'

         self.db_session.commit()
         self.db_session = self.mk_session()
+        return {'status': status}


 class IndexingHttpLister(ListerHttpTransport, IndexingLister):
     """Convenience class for ensuring right lookup and init order when
        combining IndexingLister and ListerHttpTransport."""

     def __init__(self, url=None, override_config=None):
         IndexingLister.__init__(self, override_config=override_config)
         ListerHttpTransport.__init__(self, url=url)
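
The bound computation in db_partition_indices() is easiest to follow on
integers. Below is a standalone re-enactment of its logic, simplified to
integer indices and with the db lookups replaced by explicit parameters
(a sketch, not the class's actual code path):

    def partition_indices_sketch(num_entries, partition_size,
                                 min_index, max_index):
        n = max(num_entries, 10)
        partition_size = min(partition_size, n)
        n_partitions = n // partition_size
        partition_width = (max_index - min_index) / n_partitions
        # n_partitions + 1 bounds for n_partitions partitions
        bounds = [int(min_index + i * partition_width)
                  for i in range(n_partitions + 1)]
        # trim duplicates, then open up the lowest/highest partitions
        bounds.append(None)
        bounds = [cur for cur, nxt in zip(bounds[:-1], bounds[1:])
                  if cur != nxt]
        bounds[0] = bounds[-1] = None
        return list(zip(bounds[:-1], bounds[1:]))

    # 100 repos indexed 0..1000, chunks of ~25 entries each:
    # [(None, 250), (250, 500), (500, 750), (750, None)]
    print(partition_indices_sketch(100, 25, 0, 1000))
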
diff --git a/swh/lister/core/page_by_page_lister.py b/swh/lister/core/page_by_page_lister.py
index 3d6d9c7..8bcce45 100644
--- a/swh/lister/core/page_by_page_lister.py
+++ b/swh/lister/core/page_by_page_lister.py
@@ -1,160 +1,164 @@
 # Copyright (C) 2015-2018 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import abc
 import logging

 from .lister_transports import ListerHttpTransport
 from .lister_base import ListerBase


 class PageByPageLister(ListerBase):
     """Lister* intermediate class for any service that follows the simple
        pagination page pattern.

     - Client sends a request to list repositories starting from a given
       page identifier.
     - Client receives structured (json/xml/etc) response with information
       about a sequential series of repositories (per page) starting from a
       given index. And, if available, some indication of the next page
       index for fetching the remaining repository data.

     See :class:`swh.lister.core.lister_base.ListerBase` for more details.

     This class cannot be instantiated. To create a new Lister for a source
     code listing service that follows the model described above, you must
     subclass this class. Then provide the required overrides in addition
     to any unmet implementation/override requirements of this class's base
     (see parent class and member docstrings for details).

     Required Overrides::

         def get_next_target_from_response

     """
     @abc.abstractmethod
     def get_next_target_from_response(self, response):
         """Find the next server endpoint page given the entire response.

         Implementation of this method depends on the server API spec and
         the shape of the network response object returned by the
         transport_request method.

         For example, some api can use the headers links to provide the
         next page.

         Args:
             response (transport response): response page from the server

         Returns:
             index of next page, possibly extracted from a next href url

         """
         pass

     @abc.abstractmethod
     def get_pages_information(self):
         """Find the total number of pages.

         Implementation of this method depends on the server API spec and
         the shape of the network response object returned by the
         transport_request method.

         For example, some api can use dedicated headers:
         - x-total-pages to provide the total number of pages
         - x-total to provide the total number of repositories
         - x-per-page to provide the number of elements per page

         Returns:
             tuple (total number of repositories, total number of pages,
             per_page)

         """
         pass

     # You probably don't need to override anything below this line.

     def do_additional_checks(self, models_list):
         """Potentially check for existence of repositories in models_list.

         This will be called only if check_existence is flipped on in the
         run method below.

         """
         for m in models_list:
             sql_repo = self.db_query_equal('uid', m['uid'])
             if sql_repo:
                 return False
         return models_list

     def run(self, min_bound=None, max_bound=None, check_existence=False):
         """Main entry function. Sequentially fetches repository data from
            the service according to the basic outline in the class
            docstring, continually fetching sublists until either there is
            no next page reference given or the given next page is greater
            than the desired max_page.

         Args:
             min_bound: optional page to start from
             max_bound: optional page to stop at
             check_existence (bool): optional existence check (for
                 incremental lister whose sort order is inverted)

         Returns:
-            nothing
+            a dict with the run status ('eventful' or 'uneventful')

         """
+        status = 'uneventful'
         page = min_bound or 0
         loop_count = 0

         self.min_page = min_bound
         self.max_page = max_bound

         while self.is_within_bounds(page, self.min_page, self.max_page):
             logging.info('listing repos starting at %s' % page)

             response, injected_repos = self.ingest_data(
                 page, checks=check_existence)
             if not response and not injected_repos:
                 logging.info('No response from api server, stopping')
                 break
             elif not injected_repos:
                 logging.info('Repositories already seen, stopping')
                 break
+            status = 'eventful'

             next_page = self.get_next_target_from_response(response)

             # termination condition
             if (next_page is None) or (next_page == page):
                 logging.info('stopping after page %s, no next link found' %
                              page)
                 break
             else:
                 page = next_page

             loop_count += 1
             if loop_count == 20:
                 logging.info('flushing updates')
                 loop_count = 0
                 self.db_session.commit()
                 self.db_session = self.mk_session()

         self.db_session.commit()
         self.db_session = self.mk_session()
+        return {'status': status}


 class PageByPageHttpLister(ListerHttpTransport, PageByPageLister):
     """Convenience class for ensuring right lookup and init order when
        combining PageByPageLister and ListerHttpTransport.
     """
     def __init__(self, url=None, override_config=None):
         PageByPageLister.__init__(self, override_config=override_config)
         ListerHttpTransport.__init__(self, url=url)
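
To spell out the new contract: a PageByPageLister run is 'eventful' as soon
as one listed page injects repositories, and 'uneventful' when the very
first page already yields nothing new. A condensed restatement of the
loop's status logic (illustrative only, not the method itself):

    def run_status(injected_per_page):
        # injected_per_page: counts of newly injected repos, one per page
        status = 'uneventful'
        for injected in injected_per_page:
            if not injected:          # empty response or already seen
                break
            status = 'eventful'
        return {'status': status}

    assert run_status([]) == {'status': 'uneventful'}
    assert run_status([0]) == {'status': 'uneventful'}
    assert run_status([12, 3]) == {'status': 'eventful'}
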
diff --git a/swh/lister/core/simple_lister.py b/swh/lister/core/simple_lister.py
index a355d9d..fa09ed8 100644
--- a/swh/lister/core/simple_lister.py
+++ b/swh/lister/core/simple_lister.py
@@ -1,91 +1,96 @@
 # Copyright (C) 2018-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import logging

 from typing import Any, List

 from swh.core import utils
 from .lister_base import ListerBase

 logger = logging.getLogger(__name__)


 class SimpleLister(ListerBase):
     """Lister* intermediate class for any service that follows the simple,
        'list in oneshot information' pattern.

     - Client sends a request to list repositories in oneshot

     - Client receives structured (json/xml/etc) response with information
       and stores those in db

     """
     flush_packet_db = 2
     """Number of iterations in-between write flushes of lister repositories
        to db (see fn:`ingest_data`).
     """

     def list_packages(self, response: Any) -> List[Any]:
         """Listing packages method.

         """
         pass

     def ingest_data(self, identifier, checks=False):
         """Rework the base ingest_data.

         Request server endpoint which gives all in one go.

         Simplify and filter response list of repositories.

         Inject repo information into local db. Queue loader tasks for
         linked repositories.

         Args:
             identifier: Resource identifier (unused)
             checks (bool): Additional checks required (unused)

         """
         response = self.safely_issue_request(identifier)
         response = self.list_packages(response)
         if not response:
             return response, []
         models_list = self.transport_response_simplified(response)
         models_list = self.filter_before_inject(models_list)
         all_injected = []
         for i, models in enumerate(utils.grouper(models_list, n=100),
                                    start=1):
             models = list(models)
             logging.debug('models: %s' % len(models))
             # inject into local db
             injected = self.inject_repo_data_into_db(models)
             # queue workers
             self.schedule_missing_tasks(models, injected)
             all_injected.append(injected)
             if (i % self.flush_packet_db) == 0:
                 logger.debug('Flushing updates at index %s', i)
                 self.db_session.commit()
                 self.db_session = self.mk_session()

         return response, all_injected

     def transport_response_simplified(self, response):
         """Transform response to list for model manipulation

         """
         return [self.get_model_from_repo(repo_name)
                 for repo_name in response]

     def run(self):
         """Query the server which answers in one query.

         Stores the information, dropping actual redundant information we
         already have.

         Returns:
-            nothing
+            a dict with the run status ('eventful' or 'uneventful')

         """
         dump_not_used_identifier = 0
         response, injected_repos = self.ingest_data(dump_not_used_identifier)
         if not response and not injected_repos:
             logging.info('No response from api server, stopping')
+            status = 'uneventful'
+        else:
+            status = 'eventful'
+
+        return {'status': status}
diff --git a/swh/lister/cran/tasks.py b/swh/lister/cran/tasks.py
index 6802619..74eef74 100644
--- a/swh/lister/cran/tasks.py
+++ b/swh/lister/cran/tasks.py
@@ -1,18 +1,18 @@
 # Copyright (C) 2019 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from celery import shared_task

 from swh.lister.cran.lister import CRANLister


 @shared_task(name=__name__ + '.CRANListerTask')
 def list_cran(**lister_args):
     '''Lister task for the CRAN registry'''
-    CRANLister(**lister_args).run()
+    return CRANLister(**lister_args).run()


 @shared_task(name=__name__ + '.ping')
 def _ping():
     return 'OK'
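
For context on what the SimpleLister diff above expects from subclasses:
beyond a model, a concrete lister mainly provides list_packages() and the
get_model_from_repo() hook consumed by transport_response_simplified(). A
hypothetical minimal subclass; ExampleModel, the 'example' name and the
example.org URLs are all invented for illustration:

    from swh.lister.core.simple_lister import SimpleLister
    from .models import ExampleModel  # hypothetical sqlalchemy model

    class ExampleLister(SimpleLister):
        MODEL = ExampleModel
        LISTER_NAME = 'example'

        def list_packages(self, response):
            # assuming the endpoint answers with a JSON list of names
            return response.json()

        def get_model_from_repo(self, repo_name):
            return {
                'uid': repo_name,
                'name': repo_name,
                'origin_url': 'https://example.org/%s' % repo_name,
                'origin_type': 'tar',
            }
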
diff --git a/swh/lister/debian/lister.py b/swh/lister/debian/lister.py
index cc9ecd3..ef9853b 100644
--- a/swh/lister/debian/lister.py
+++ b/swh/lister/debian/lister.py
@@ -1,253 +1,255 @@
 # Copyright (C) 2017-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import bz2
 from collections import defaultdict
 import datetime
 import gzip
 import lzma
 import logging

 from debian.deb822 import Sources
 from sqlalchemy.orm import joinedload, load_only
 from sqlalchemy.schema import CreateTable, DropTable

 from typing import Mapping, Optional

 from swh.lister.debian.models import (
     AreaSnapshot, Distribution, DistributionSnapshot, Package,
     TempPackage,
 )
 from swh.lister.core.lister_base import ListerBase, FetchError
 from swh.lister.core.lister_transports import ListerHttpTransport

 decompressors = {
     'gz': lambda f: gzip.GzipFile(fileobj=f),
     'bz2': bz2.BZ2File,
     'xz': lzma.LZMAFile,
 }

 logger = logging.getLogger(__name__)


 class DebianLister(ListerHttpTransport, ListerBase):
     MODEL = Package
     PATH_TEMPLATE = None
     LISTER_NAME = 'debian'
     instance = 'debian'

     def __init__(self, distribution: str = 'Debian',
                  date: Optional[datetime.datetime] = None,
                  override_config: Mapping = {}):
         """Initialize the debian lister for a given distribution at a
            given date.

         Args:
             distribution: name of the distribution (e.g. "Debian")
             date: date the snapshot is taken (defaults to now if empty)
             override_config: Override configuration (which takes precedence
                 over the parameters if provided)

         """
         ListerHttpTransport.__init__(self, url="notused")
         ListerBase.__init__(self, override_config=override_config)
         self.distribution = override_config.get('distribution',
                                                 distribution)
         self.date = override_config.get('date', date) or \
             datetime.datetime.now(tz=datetime.timezone.utc)

     def transport_request(self, identifier):
         """Subvert ListerHttpTransport.transport_request, to try several
            index URIs in turn.

         The Debian repository format supports several compression
         algorithms across the ages, so we try several URIs.

         Once we have found a working URI, we break and set
         `self.decompressor` to the one that matched.

         Returns:
             a requests Response object.

         Raises:
             FetchError: when all the URIs failed to be retrieved.

         """
         response = None
         compression = None

         for uri, compression in self.area.index_uris():
             response = super().transport_request(uri)
             if response.status_code == 200:
                 break
         else:
             raise FetchError(
                 "Could not retrieve index for %s" % self.area
             )
         self.decompressor = decompressors.get(compression)
         return response

     def request_uri(self, identifier):
         # In the overridden transport_request, we pass
         # ListerBase.transport_request() the full URI as identifier, so we
         # need to return it here.
         return identifier

     def request_params(self, identifier):
         # Enable streaming to allow wrapping the response in the
         # decompressor in transport_response_simplified.
         params = super().request_params(identifier)
         params['stream'] = True
         return params

     def transport_response_simplified(self, response):
         """Decompress and parse the package index fetched in
            `transport_request`.

         For each package, we "pivot" the file list entries (Files,
         Checksums-Sha1, Checksums-Sha256), to return a files dict mapping
         filenames to their checksums.

         """
         if self.decompressor:
             data = self.decompressor(response.raw)
         else:
             data = response.raw

         for src_pkg in Sources.iter_paragraphs(data.readlines()):
             files = defaultdict(dict)

             for field in src_pkg._multivalued_fields:
                 if field.startswith('checksums-'):
                     sum_name = field[len('checksums-'):]
                 else:
                     sum_name = 'md5sum'
                 if field in src_pkg:
                     for entry in src_pkg[field]:
                         name = entry['name']
                         files[name]['name'] = entry['name']
                         files[name]['size'] = int(entry['size'], 10)
                         files[name][sum_name] = entry[sum_name]

             yield {
                 'name': src_pkg['Package'],
                 'version': src_pkg['Version'],
                 'directory': src_pkg['Directory'],
                 'files': files,
             }

     def inject_repo_data_into_db(self, models_list):
         """Generate the Package entries that didn't previously exist.

         Contrary to ListerBase, we don't actually insert the data in the
         database. `schedule_missing_tasks` does it once we have the origin
         and task identifiers.

         """
         by_name_version = {}
         temp_packages = []

         area_id = self.area.id

         for model in models_list:
             name = model['name']
             version = model['version']
             temp_packages.append({
                 'area_id': area_id,
                 'name': name,
                 'version': version,
             })
             by_name_version[name, version] = model

         # Add all the listed packages to a temporary table
         self.db_session.execute(CreateTable(TempPackage.__table__))
         self.db_session.bulk_insert_mappings(TempPackage, temp_packages)

         def exists_tmp_pkg(db_session, model):
             return (
                 db_session.query(model)
                 .filter(Package.area_id == TempPackage.area_id)
                 .filter(Package.name == TempPackage.name)
                 .filter(Package.version == TempPackage.version)
                 .exists()
             )

         # Filter out the packages that already exist in the main Package
         # table
         new_packages = self.db_session\
             .query(TempPackage)\
             .options(load_only('name', 'version'))\
             .filter(~exists_tmp_pkg(self.db_session, Package))\
             .all()

         self.old_area_packages = self.db_session.query(Package).filter(
             exists_tmp_pkg(self.db_session, TempPackage)
         ).all()

         self.db_session.execute(DropTable(TempPackage.__table__))

         added_packages = []
         for package in new_packages:
             model = by_name_version[package.name, package.version]
             added_packages.append(Package(area=self.area, **model))

         self.db_session.add_all(added_packages)
         return added_packages

     def schedule_missing_tasks(self, models_list, added_packages):
         """We create tasks at the end of the full snapshot processing"""
         return

     def create_tasks_for_snapshot(self, snapshot):
         tasks = [
             snapshot.task_for_package(name, versions)
             for name, versions in snapshot.get_packages().items()
         ]

         return self.scheduler.create_tasks(tasks)

     def run(self):
         """Run the lister for a given (distribution, area) tuple.

         """
         distribution = self.db_session\
             .query(Distribution)\
             .options(joinedload(Distribution.areas))\
             .filter(Distribution.name == self.distribution)\
             .one_or_none()

         if not distribution:
-            raise ValueError("Distribution %s is not registered" %
-                             self.distribution)
+            logger.error("Distribution %s is not registered" %
+                         self.distribution)
+            return {'status': 'failed'}

         if not distribution.type == 'deb':
-            raise ValueError("Distribution %s is not a Debian derivative" %
-                             distribution)
+            logger.error("Distribution %s is not a Debian derivative" %
+                         distribution)
+            return {'status': 'failed'}

         date = self.date

         logger.debug('Creating snapshot for distribution %s on date %s' %
                      (distribution, date))

         snapshot = DistributionSnapshot(date=date, distribution=distribution)
         self.db_session.add(snapshot)

         for area in distribution.areas:
             if not area.active:
                 continue

             self.area = area

             logger.debug('Processing area %s' % area)

             _, new_area_packages = self.ingest_data(None)
             area_snapshot = AreaSnapshot(snapshot=snapshot, area=area)
             self.db_session.add(area_snapshot)
             area_snapshot.packages.extend(new_area_packages)
             area_snapshot.packages.extend(self.old_area_packages)

         self.create_tasks_for_snapshot(snapshot)

         self.db_session.commit()

-        return True
+        return {'status': 'eventful'}
diff --git a/swh/lister/debian/tasks.py b/swh/lister/debian/tasks.py
index d542d04..04d1297 100644
--- a/swh/lister/debian/tasks.py
+++ b/swh/lister/debian/tasks.py
@@ -1,18 +1,18 @@
 # Copyright (C) 2017-2018 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from celery import shared_task

 from .lister import DebianLister


 @shared_task(name=__name__ + '.DebianListerTask')
 def list_debian_distribution(distribution, **lister_args):
     '''List a Debian distribution'''
-    DebianLister(distribution=distribution, **lister_args).run()
+    return DebianLister(distribution=distribution, **lister_args).run()


 @shared_task(name=__name__ + '.ping')
 def _ping():
     return 'OK'
diff --git a/swh/lister/github/tasks.py b/swh/lister/github/tasks.py
index 207ddf9..1b9f37e 100644
--- a/swh/lister/github/tasks.py
+++ b/swh/lister/github/tasks.py
@@ -1,52 +1,53 @@
 # Copyright (C) 2017-2019 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import random

 from celery import group, shared_task

 from swh.lister.github.lister import GitHubLister

 GROUP_SPLIT = 10000


 @shared_task(name=__name__ + '.IncrementalGitHubLister')
 def list_github_incremental(**lister_args):
     'Incremental update of GitHub'
     lister = GitHubLister(**lister_args)
-    lister.run(min_bound=lister.db_last_index(), max_bound=None)
+    return lister.run(min_bound=lister.db_last_index(), max_bound=None)


 @shared_task(name=__name__ + '.RangeGitHubLister')
 def _range_github_lister(start, end, **lister_args):
     lister = GitHubLister(**lister_args)
-    lister.run(min_bound=start, max_bound=end)
+    return lister.run(min_bound=start, max_bound=end)


 @shared_task(name=__name__ + '.FullGitHubRelister', bind=True)
 def list_github_full(self, split=None, **lister_args):
     """Full update of GitHub

     It's not to be called for an initial listing.

     """
     lister = GitHubLister(**lister_args)
     ranges = lister.db_partition_indices(split or GROUP_SPLIT)
     if not ranges:
         self.log.info('Nothing to list')
         return
     random.shuffle(ranges)
     promise = group(_range_github_lister.s(minv, maxv, **lister_args)
                     for minv, maxv in ranges)()
     self.log.debug('%s OK (spawned %s subtasks)' % (self.name, len(ranges)))
     try:
         promise.save()  # so that we can restore the GroupResult in tests
     except (NotImplementedError, AttributeError):
         self.log.info('Unable to call save_group with current result backend.')
+    # FIXME: what to do in terms of return here?
     return promise.id


 @shared_task(name=__name__ + '.ping')
 def _ping():
     return 'OK'
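
Regarding the FIXME above: returning promise.id at least lets a caller
rehydrate the spawned group later, provided the result backend supports
save/restore. A consumer-side sketch using celery's standard GroupResult
API (promise_id stands for the value returned by the task):

    from celery.result import GroupResult

    group_result = GroupResult.restore(promise_id)
    if group_result is not None:
        # one {'status': ...} dict per spawned range subtask
        statuses = group_result.get()
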
diff --git a/swh/lister/gitlab/tasks.py b/swh/lister/gitlab/tasks.py
index bdb23cd..e6a1755 100644
--- a/swh/lister/gitlab/tasks.py
+++ b/swh/lister/gitlab/tasks.py
@@ -1,51 +1,52 @@
 # Copyright (C) 2018 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import random

 from celery import group, shared_task

 from .. import utils
 from .lister import GitLabLister

 NBPAGES = 10


 @shared_task(name=__name__ + '.IncrementalGitLabLister')
 def list_gitlab_incremental(**lister_args):
     """Incremental update of a GitLab instance"""
     lister_args['sort'] = 'desc'
     lister = GitLabLister(**lister_args)
     total_pages = lister.get_pages_information()[1]
     # stopping as soon as existing origins for that instance are detected
-    lister.run(min_bound=1, max_bound=total_pages, check_existence=True)
+    return lister.run(min_bound=1, max_bound=total_pages, check_existence=True)


 @shared_task(name=__name__ + '.RangeGitLabLister')
 def _range_gitlab_lister(start, end, **lister_args):
     lister = GitLabLister(**lister_args)
-    lister.run(min_bound=start, max_bound=end)
+    return lister.run(min_bound=start, max_bound=end)


 @shared_task(name=__name__ + '.FullGitLabRelister', bind=True)
 def list_gitlab_full(self, **lister_args):
     """Full update of a GitLab instance"""
     lister = GitLabLister(**lister_args)
     _, total_pages, _ = lister.get_pages_information()
     ranges = list(utils.split_range(total_pages, NBPAGES))
     random.shuffle(ranges)
     promise = group(_range_gitlab_lister.s(minv, maxv, **lister_args)
                     for minv, maxv in ranges)()
     self.log.debug('%s OK (spawned %s subtasks)' % (self.name, len(ranges)))
     try:
         promise.save()
     except (NotImplementedError, AttributeError):
         self.log.info('Unable to call save_group with current result backend.')
+    # FIXME: what to do in terms of return here?
     return promise.id


 @shared_task(name=__name__ + '.ping')
 def _ping():
     return 'OK'
diff --git a/swh/lister/gnu/tasks.py b/swh/lister/gnu/tasks.py
index 1eb7061..edcde7e 100644
--- a/swh/lister/gnu/tasks.py
+++ b/swh/lister/gnu/tasks.py
@@ -1,18 +1,18 @@
 # Copyright (C) 2019 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from celery import shared_task

 from .lister import GNULister


 @shared_task(name=__name__ + '.GNUListerTask')
 def list_gnu_full(**lister_args):
-    'List lister for the GNU source code archive'
-    GNULister(**lister_args).run()
+    """Full lister for the GNU source code archive"""
+    return GNULister(**lister_args).run()


 @shared_task(name=__name__ + '.ping')
 def _ping():
     return 'OK'
""" query = lister.db_session.query(NpmVisitModel.update_seq) row = query.order_by(NpmVisitModel.uid.desc()).first() if not row: raise ValueError('No npm registry listing previously performed ! ' 'This is required prior to the execution of an ' 'incremental listing.') return row[0] @shared_task(name=__name__ + '.NpmListerTask') def list_npm_full(**lister_args): 'Full lister for the npm (javascript) registry' lister = NpmLister(**lister_args) with save_registry_state(lister): - lister.run() + return lister.run() @shared_task(name=__name__ + '.NpmIncrementalListerTask') def list_npm_incremental(**lister_args): 'Incremental lister for the npm (javascript) registry' lister = NpmIncrementalLister(**lister_args) update_seq_start = get_last_update_seq(lister) with save_registry_state(lister): - lister.run(min_bound=update_seq_start) + return lister.run(min_bound=update_seq_start) @shared_task(name=__name__ + '.ping') def _ping(): return 'OK' diff --git a/swh/lister/phabricator/tasks.py b/swh/lister/phabricator/tasks.py index 361fedd..614f4f2 100644 --- a/swh/lister/phabricator/tasks.py +++ b/swh/lister/phabricator/tasks.py @@ -1,18 +1,18 @@ # Copyright (C) 2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from celery import shared_task from swh.lister.phabricator.lister import PhabricatorLister @shared_task(name=__name__ + '.FullPhabricatorLister') def list_phabricator_full(**lister_args): - 'Full update of a Phabricator instance' - PhabricatorLister(**lister_args).run() + """Full update of a Phabricator instance""" + return PhabricatorLister(**lister_args).run() @shared_task(name=__name__ + '.ping') def _ping(): return 'OK' diff --git a/swh/lister/pypi/tasks.py b/swh/lister/pypi/tasks.py index 2412f26..b59e6b0 100644 --- a/swh/lister/pypi/tasks.py +++ b/swh/lister/pypi/tasks.py @@ -1,18 +1,18 @@ # Copyright (C) 2018 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from celery import shared_task from .lister import PyPILister @shared_task(name=__name__ + '.PyPIListerTask') def list_pypi(**lister_args): 'Full update of the PyPI (python) registry' - PyPILister(**lister_args).run() + return PyPILister(**lister_args).run() @shared_task(name=__name__ + '.ping') def _ping(): return 'OK'