diff --git a/.gitignore b/.gitignore index c5baade..67cfee3 100644 --- a/.gitignore +++ b/.gitignore @@ -1,9 +1,10 @@ *.pyc *.sw? *~ .coverage .eggs/ __pycache__ dist *.egg-info version.txt +swh/lister/_version.py diff --git a/PKG-INFO b/PKG-INFO index 1a11c7b..3bb8159 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.lister -Version: 0.0.5 +Version: 0.0.6 Summary: Software Heritage GitHub lister Home-page: https://forge.softwareheritage.org/diffusion/DLSGH/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/debian/clean b/debian/clean new file mode 100644 index 0000000..c9e1970 --- /dev/null +++ b/debian/clean @@ -0,0 +1 @@ +swh/lister/_version.py diff --git a/debian/control b/debian/control index 80c5c00..43920c5 100644 --- a/debian/control +++ b/debian/control @@ -1,26 +1,27 @@ Source: swh-lister Maintainer: Software Heritage developers Section: python Priority: optional Build-Depends: debhelper (>= 9), dh-python (>= 2), python3-all, python3-dateutil, python3-nose, python3-setuptools, + python3-sqlalchemy, python3-swh.core, python3-swh.scheduler (>= 0.0.14~), - python3-swh.storage (>= 0.0.76~), + python3-swh.storage, + python3-swh.storage.schemata, python3-vcversioner Standards-Version: 3.9.6 Homepage: https://forge.softwareheritage.org/source/swh-lister/ Package: python3-swh.lister Architecture: all Depends: python3-swh.scheduler (>= 0.0.14~), - python3-swh.storage (>= 0.0.76~), ${misc:Depends}, ${python3:Depends} Breaks: python3-swh.lister.github Replaces: python3-swh.lister.github Description: Software Heritage lister diff --git a/debian/rules b/debian/rules index 6ce1946..d0ebfa4 100755 --- a/debian/rules +++ b/debian/rules @@ -1,9 +1,7 @@ #!/usr/bin/make -f export PYBUILD_NAME=swh-lister +export PYBUILD_TEST_ARGS=--with-doctest -sv -a !db,!fs swh/lister %: dh $@ --with python3 --buildsystem=pybuild - - -override_dh_auto_test: diff --git a/docs/.gitignore b/docs/.gitignore new file mode 100644 index 0000000..58a761e --- /dev/null +++ b/docs/.gitignore @@ -0,0 +1,3 @@ +_build/ +apidoc/ +*-stamp diff --git a/docs/Makefile b/docs/Makefile new file mode 100644 index 0000000..c30c50a --- /dev/null +++ b/docs/Makefile @@ -0,0 +1 @@ +include ../../swh-docs/Makefile.sphinx diff --git a/docs/_static/.placeholder b/docs/_static/.placeholder new file mode 100644 index 0000000..e69de29 diff --git a/docs/_templates/.placeholder b/docs/_templates/.placeholder new file mode 100644 index 0000000..e69de29 diff --git a/docs/conf.py b/docs/conf.py new file mode 100644 index 0000000..190deb7 --- /dev/null +++ b/docs/conf.py @@ -0,0 +1 @@ +from swh.docs.sphinx.conf import * # NoQA diff --git a/docs/index.rst b/docs/index.rst new file mode 100644 index 0000000..8b64117 --- /dev/null +++ b/docs/index.rst @@ -0,0 +1,15 @@ +Software Heritage - Development Documentation +============================================= + +.. toctree:: + :maxdepth: 2 + :caption: Contents: + + + +Indices and tables +================== + +* :ref:`genindex` +* :ref:`modindex` +* :ref:`search` diff --git a/requirements-swh.txt b/requirements-swh.txt index a6b75b4..47175d4 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,3 +1,3 @@ swh.core swh.storage >= 0.0.76 -swh.scheduler >= 0.0.14 +swh.scheduler[schemata] >= 0.0.14 diff --git a/requirements.txt b/requirements.txt index 6dfeecc..9a284d8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,8 @@ +arrow nose requests requests_mock setuptools SQLAlchemy testing.postgresql xmltodict diff --git a/setup.py b/setup.py index 8fe511f..74ec27b 100644 --- a/setup.py +++ b/setup.py @@ -1,30 +1,30 @@ #!/usr/bin/env python3 from setuptools import setup def parse_requirements(): requirements = [] for reqf in ('requirements.txt', 'requirements-swh.txt'): with open(reqf) as f: for line in f.readlines(): line = line.strip() if not line or line.startswith('#'): continue requirements.append(line) return requirements setup( name='swh.lister', description='Software Heritage GitHub lister', author='Software Heritage developers', author_email='swh-devel@inria.fr', url='https://forge.softwareheritage.org/diffusion/DLSGH/', packages=['swh.lister'], scripts=['bin/ghlister'], install_requires=parse_requirements(), setup_requires=['vcversioner'], - vcversioner={}, + vcversioner={'version_module_paths': ['swh/lister/_version.py']}, include_package_data=True, ) diff --git a/swh.lister.egg-info/PKG-INFO b/swh.lister.egg-info/PKG-INFO index 1a11c7b..3bb8159 100644 --- a/swh.lister.egg-info/PKG-INFO +++ b/swh.lister.egg-info/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.lister -Version: 0.0.5 +Version: 0.0.6 Summary: Software Heritage GitHub lister Home-page: https://forge.softwareheritage.org/diffusion/DLSGH/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/swh.lister.egg-info/SOURCES.txt b/swh.lister.egg-info/SOURCES.txt index 5497187..78daf35 100644 --- a/swh.lister.egg-info/SOURCES.txt +++ b/swh.lister.egg-info/SOURCES.txt @@ -1,51 +1,61 @@ .gitignore ACKNOWLEDGEMENTS LICENSE MANIFEST.in Makefile README TODO requirements-swh.txt requirements.txt setup.py version.txt bin/batch bin/ghlister bin/reset.sh bin/status debian/changelog +debian/clean debian/compat debian/control debian/copyright debian/rules debian/source/format +docs/.gitignore +docs/Makefile +docs/conf.py +docs/index.rst +docs/_static/.placeholder +docs/_templates/.placeholder etc/crontab sql/crawler.sql sql/pimp_db.sql swh.lister.egg-info/PKG-INFO swh.lister.egg-info/SOURCES.txt swh.lister.egg-info/dependency_links.txt swh.lister.egg-info/requires.txt swh.lister.egg-info/top_level.txt +swh/lister/_version.py swh/lister/bitbucket/lister.py swh/lister/bitbucket/models.py swh/lister/bitbucket/tasks.py swh/lister/bitbucket/tests/api_empty_response.json swh/lister/bitbucket/tests/api_response.json swh/lister/bitbucket/tests/test_bb_lister.py swh/lister/core/abstractattribute.py swh/lister/core/db_utils.py swh/lister/core/indexing_lister.py swh/lister/core/lister_base.py swh/lister/core/lister_transports.py swh/lister/core/models.py swh/lister/core/tasks.py swh/lister/core/tests/test_abstractattribute.py swh/lister/core/tests/test_lister.py swh/lister/core/tests/test_model.py +swh/lister/debian/lister.py +swh/lister/debian/utils.py swh/lister/github/lister.py swh/lister/github/models.py swh/lister/github/tasks.py swh/lister/github/tests/api_empty_response.json swh/lister/github/tests/api_response.json swh/lister/github/tests/test_gh_lister.py \ No newline at end of file diff --git a/swh.lister.egg-info/requires.txt b/swh.lister.egg-info/requires.txt index 229685d..1dd5d45 100644 --- a/swh.lister.egg-info/requires.txt +++ b/swh.lister.egg-info/requires.txt @@ -1,10 +1,11 @@ SQLAlchemy +arrow nose requests requests_mock setuptools swh.core -swh.scheduler>=0.0.14 +swh.scheduler[schemata]>=0.0.14 swh.storage>=0.0.76 testing.postgresql xmltodict diff --git a/swh/lister/_version.py b/swh/lister/_version.py new file mode 100644 index 0000000..3a549e9 --- /dev/null +++ b/swh/lister/_version.py @@ -0,0 +1,5 @@ + +# This file is automatically generated by setup.py. +__version__ = '0.0.6' +__sha__ = 'g751b64b' +__revision__ = 'g751b64b' diff --git a/swh/lister/core/abstractattribute.py b/swh/lister/core/abstractattribute.py index afd244c..3adabaf 100644 --- a/swh/lister/core/abstractattribute.py +++ b/swh/lister/core/abstractattribute.py @@ -1,24 +1,26 @@ # Copyright (C) 2017 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information class AbstractAttribute: """AbstractAttributes in a base class must be overridden by the subclass. - It's like the @abc.abstractmethod decorator, but for things that are - explicitly attributes/properties, not methods, without the need for - empty method def boilerplate. Like abc.abstractmethod, the class - containing AbstractAttributes must inherit abc.ABC or use the - abc.ABCMeta metaclass. + It's like the :func:`abc.abstractmethod` decorator, but for things that + are explicitly attributes/properties, not methods, without the need for + empty method def boilerplate. Like abc.abstractmethod, the class containing + AbstractAttributes must inherit from :class:`abc.ABC` or use the + :class:`abc.ABCMeta` metaclass. + + Usage example:: - Usage Example: import abc class ClassContainingAnAbstractAttribute(abc.ABC): foo = AbstractAttribute('descriptive docstring for foo') + """ __isabstractmethod__ = True def __init__(self, docstring=None): if docstring is not None: self.__doc__ = 'AbstractAttribute: ' + docstring diff --git a/swh/lister/core/indexing_lister.py b/swh/lister/core/indexing_lister.py index e3c1b5d..8166900 100644 --- a/swh/lister/core/indexing_lister.py +++ b/swh/lister/core/indexing_lister.py @@ -1,210 +1,212 @@ # Copyright (C) 2015-2017 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import logging from sqlalchemy import func from .lister_transports import SWHListerHttpTransport from .lister_base import SWHListerBase class SWHIndexingLister(SWHListerBase): """Lister* intermediate class for any service that follows the pattern: - -- The service must report at least one stable unique identifier, - known herein as the UID value, for every listed repository. - -- If the service splits the list of repositories into sublists, - it must report at least one stable and sorted index identifier - for every listed repository, known herein as the indexable value, - which can be used as part of the service endpoint query to request - a sublist beginning from that index. This might be the UID if the - UID is monotonic. - -- Client sends a request to list repositories starting from a given - index. - -- Client receives structured (json/xml/etc) response with information - about a sequential series of repositories starting from that index - and, if necessary/available, some indication of the URL or index - for fetching the next series of repository data. - - * - See swh.lister.core.lister_base.SWHListerBase for more details. + + - The service must report at least one stable unique identifier, known + herein as the UID value, for every listed repository. + - If the service splits the list of repositories into sublists, it must + report at least one stable and sorted index identifier for every listed + repository, known herein as the indexable value, which can be used as + part of the service endpoint query to request a sublist beginning from + that index. This might be the UID if the UID is monotonic. + - Client sends a request to list repositories starting from a given + index. + - Client receives structured (json/xml/etc) response with information about + a sequential series of repositories starting from that index and, if + necessary/available, some indication of the URL or index for fetching the + next series of repository data. + + See :class:`swh.lister.core.lister_base.SWHListerBase` for more details. This class cannot be instantiated. To create a new Lister for a source code listing service that follows the model described above, you must subclass this class and provide the required overrides in addition to any unmet implementation/override requirements of this class's base. (see parent class and member docstrings for details) - Required Overrides: + Required Overrides:: + def get_next_target_from_response + """ @abc.abstractmethod def get_next_target_from_response(self, response): """Find the next server endpoint identifier given the entire response. Implementation of this method depends on the server API spec and the shape of the network response object returned by the transport_request method. Args: response (transport response): response page from the server Returns: index of next page, possibly extracted from a next href url """ pass # You probably don't need to override anything below this line. def filter_before_inject(self, models_list): """Overrides SWHListerBase.filter_before_inject Bounds query results by this Lister's set max_index. """ models_list = [ m for m in models_list if self.is_within_bounds(m['indexable'], None, self.max_index) ] return models_list def db_query_range(self, start, end): """Look in the db for a range of repositories with indexable values in the range [start, end] Args: start (model indexable type): start of desired indexable range end (model indexable type): end of desired indexable range Returns: a list of sqlalchemy.ext.declarative.declarative_base objects with indexable values within the given range """ retlist = self.db_session.query(self.MODEL) if start is not None: - retlist.filter(self.MODEL.indexable >= start) + retlist = retlist.filter(self.MODEL.indexable >= start) if end is not None: - retlist.filter(self.MODEL.indexable <= end) + retlist = retlist.filter(self.MODEL.indexable <= end) return retlist def db_partition_indices(self, partition_size): """Describe an index-space compartmentalization of the db table in equal sized chunks. This is used to describe min&max bounds for parallelizing fetch tasks. Args: partition_size (int): desired size to make each partition Returns: a list of tuples (begin, end) of indexable value that declare approximately equal-sized ranges of existing repos """ n = self.db_num_entries() partitions = [] partition_size = min(partition_size, n) prev_index = None for i in range(0, n-1, partition_size): # indexable column from the ith row index = self.db_session.query(self.MODEL.indexable) \ .order_by(self.MODEL.indexable).offset(i).first() if index is not None and prev_index is not None: partitions.append((prev_index, index)) prev_index = index partitions.append((prev_index, self.db_last_index())) return partitions def db_last_index(self): """Look in the db for the largest indexable value Returns: the largest indexable value of all repos in the db """ t = self.db_session.query(func.max(self.MODEL.indexable)).first() if t: return t[0] else: return None def disable_deleted_repo_tasks(self, start, end, keep_these): """Disable tasks for repos that no longer exist between start and end. Args: start: beginning of range to disable end: end of range to disable keep_these (uid list): do not disable repos with uids in this list """ if end is None: end = self.db_last_index() if not self.is_within_bounds(end, None, self.max_index): end = self.max_index deleted_repos = self.winnow_models( self.db_query_range(start, end), self.MODEL.uid, keep_these ) - tasks_to_disable = [repo for repo in deleted_repos + tasks_to_disable = [repo.task_id for repo in deleted_repos if repo.task_id is not None] if tasks_to_disable: self.scheduler.disable_tasks(tasks_to_disable) for repo in deleted_repos: repo.task_id = None def run(self, min_index=None, max_index=None): """Main entry function. Sequentially fetches repository data from the service according to the basic outline in the class docstring, continually fetching sublists until either there is no next index reference given or the given next index is greater than the desired max_index. Args: min_index (indexable type): optional index to start from max_index (indexable type): optional index to stop at Returns: nothing """ index = min_index or '' loop_count = 0 self.min_index = min_index self.max_index = max_index while self.is_within_bounds(index, self.min_index, self.max_index): logging.info('listing repos starting at %s' % index) response, injected_repos = self.ingest_data(index) next_index = self.get_next_target_from_response(response) # Determine if any repos were deleted, and disable their tasks. keep_these = [k for k in injected_repos.keys()] self.disable_deleted_repo_tasks(index, next_index, keep_these) # termination condition if (next_index is None) or (next_index == index): logging.info('stopping after index %s, no next link found' % index) break else: index = next_index loop_count += 1 if loop_count == 20: logging.info('flushing updates') loop_count = 0 self.db_session.commit() self.db_session = self.mk_session() self.db_session.commit() self.db_session = self.mk_session() class SWHIndexingHttpLister(SWHListerHttpTransport, SWHIndexingLister): """Convenience class for ensuring right lookup and init order when combining SWHIndexingLister and SWHListerHttpTransport.""" def __init__(self, lister_name=None, api_baseurl=None, override_config=None): SWHListerHttpTransport.__init__(self, api_baseurl=api_baseurl) SWHIndexingLister.__init__(self, lister_name=lister_name, override_config=override_config) diff --git a/swh/lister/core/lister_base.py b/swh/lister/core/lister_base.py index d4c7eed..39d2c70 100644 --- a/swh/lister/core/lister_base.py +++ b/swh/lister/core/lister_base.py @@ -1,492 +1,496 @@ # Copyright (C) 2015-2017 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc +import datetime import gzip import logging import os import re import time -from datetime import datetime from sqlalchemy import create_engine, func from sqlalchemy.orm import sessionmaker from swh.core import config from swh.scheduler.backend import SchedulerBackend from swh.storage import get_storage from .abstractattribute import AbstractAttribute +def utcnow(): + return datetime.datetime.now(tz=datetime.timezone.utc) + + class FetchError(RuntimeError): def __init__(self, response): self.response = response def __str__(self): return repr(self.response) class SWHListerBase(abc.ABC, config.SWHConfig): """Lister core base class. Generally a source code hosting service provides an API endpoint for listing the set of stored repositories. A Lister is the discovery service responsible for finding this list, all at once or sequentially by parts, and queueing local tasks to fetch and ingest the referenced repositories. The core method in this class is ingest_data. Any subclasses should be calling this method one or more times to fetch and ingest data from API endpoints. See swh.lister.core.lister_base.SWHIndexingLister for example usage. This class cannot be instantiated. Any instantiable Lister descending from SWHListerBase must provide at least the required overrides. (see member docstrings for details): Required Overrides: MODEL def transport_request def transport_response_to_string def transport_response_simplified def transport_quota_check Optional Overrides: def filter_before_inject def is_within_bounds """ MODEL = AbstractAttribute('Subclass type (not instance)' ' of swh.lister.core.models.ModelBase' ' customized for a specific service.') @abc.abstractmethod def transport_request(self, identifier): """Given a target endpoint identifier to query, try once to request it. Implementation of this method determines the network request protocol. Args: identifier (string): unique identifier for an endpoint query. e.g. If the service indexes lists of repositories by date and time of creation, this might be that as a formatted string. Or it might be an integer UID. Or it might be nothing. It depends on what the service needs. Returns: the entire request response Raises: Will catch internal transport-dependent connection exceptions and raise swh.lister.core.lister_base.FetchError instead. Other non-connection exceptions should propogate unchanged. """ pass @abc.abstractmethod def transport_response_to_string(self, response): """Convert the server response into a formatted string for logging. Implementation of this method depends on the shape of the network response object returned by the transport_request method. Args: response: the server response Returns: a pretty string of the response """ pass @abc.abstractmethod def transport_response_simplified(self, response): """Convert the server response into list of a dict for each repo in the response, mapping columns in the lister's MODEL class to repo data. Implementation of this method depends on the server API spec and the shape of the network response object returned by the transport_request method. Args: response: response object from the server. Returns: list of repo MODEL dicts ( eg. [{'uid': r['id'], etc.} for r in response.json()] ) """ pass @abc.abstractmethod def transport_quota_check(self, response): """Check server response to see if we're hitting request rate limits. Implementation of this method depends on the server communication protocol and API spec and the shape of the network response object returned by the transport_request method. Args: response (session response): complete API query response Returns: 1) must retry request? True/False 2) seconds to delay if True """ pass def filter_before_inject(self, models_list): """Function run after transport_response_simplified but before injection into the local db and creation of workers. Can be used to eliminate some of the results if necessary. MAY BE OVERRIDDEN if an intermediate Lister class needs to filter results before injection without requiring every child class to do so. Args: models_list: list of dicts returned by transport_response_simplified. Returns: models_list with entries changed according to custom logic. """ - pass + return models_list def is_within_bounds(self, inner, lower=None, upper=None): """See if a sortable value is inside the range [lower,upper]. MAY BE OVERRIDDEN, for example if the server indexable* key is technically sortable but not automatically so. * - ( see: swh.lister.core.indexing_lister.SWHIndexingLister ) Args: inner (sortable type): the value being checked lower (sortable type): optional lower bound upper (sortable type): optional upper bound Returns: whether inner is confined by the optional lower and upper bounds """ try: if lower is None and upper is None: return True elif lower is None: ret = inner <= upper elif upper is None: ret = inner >= lower else: ret = lower <= inner <= upper self.string_pattern_check(inner, lower, upper) except Exception as e: logging.error(str(e) + ': %s, %s, %s' % (('inner=%s%s' % (type(inner), inner)), ('lower=%s%s' % (type(lower), lower)), ('upper=%s%s' % (type(upper), upper))) ) raise return ret # You probably don't need to override anything below this line. DEFAULT_CONFIG = { 'storage': ('dict', { 'cls': 'remote', 'args': { 'url': 'http://localhost:5002/' }, }), - 'scheduling_db': ('str', 'dbname=softwareheritage-scheduler'), + 'scheduling_db': ('str', 'dbname=softwareheritage-scheduler-dev'), } @property def CONFIG_BASE_FILENAME(self): # noqa: N802 return 'lister-%s' % self.lister_name @property def ADDITIONAL_CONFIG(self): # noqa: N802 return { 'lister_db_url': ('str', 'postgresql:///lister-%s' % self.lister_name), 'credentials': ('list[dict]', []), 'cache_responses': ('bool', False), 'cache_dir': ('str', '~/.cache/swh/lister/%s' % self.lister_name), } INITIAL_BACKOFF = 10 MAX_RETRIES = 7 CONN_SLEEP = 10 def __init__(self, lister_name=None, override_config=None): self.backoff = self.INITIAL_BACKOFF if lister_name is None: raise NameError("Every lister must be assigned a lister_name.") self.lister_name = lister_name # 'github?', 'bitbucket?', 'foo.com?' self.config = self.parse_config_file( base_filename=self.CONFIG_BASE_FILENAME, additional_configs=[self.ADDITIONAL_CONFIG] ) self.config['cache_dir'] = os.path.expanduser(self.config['cache_dir']) if self.config['cache_responses']: - config.prepare_folders(self.config, ['cache_dir']) + config.prepare_folders(self.config, 'cache_dir') if override_config: self.config.update(override_config) self.storage = get_storage(**self.config['storage']) self.scheduler = SchedulerBackend( scheduling_db=self.config['scheduling_db'], ) self.db_engine = create_engine(self.config['lister_db_url']) self.mk_session = sessionmaker(bind=self.db_engine) self.db_session = self.mk_session() def reset_backoff(self): """Reset exponential backoff timeout to initial level.""" self.backoff = self.INITIAL_BACKOFF def back_off(self): """Get next exponential backoff timeout.""" ret = self.backoff self.backoff *= 10 return ret def safely_issue_request(self, identifier): """Make network request with retries, rate quotas, and response logs. Protocol is handled by the implementation of the transport_request method. Args: identifier: resource identifier Returns: server response """ retries_left = self.MAX_RETRIES + do_cache = self.config['cache_responses'] while retries_left > 0: try: r = self.transport_request(identifier) except FetchError: # network-level connection error, try again logging.warn('connection error on %s: sleep for %d seconds' % (identifier, self.CONN_SLEEP)) time.sleep(self.CONN_SLEEP) retries_left -= 1 continue + if do_cache: + self.save_response(r) + # detect throttling must_retry, delay = self.transport_quota_check(r) if must_retry: logging.warn('rate limited on %s: sleep for %f seconds' % (identifier, delay)) time.sleep(delay) else: # request ok break retries_left -= 1 if not retries_left: logging.warn('giving up on %s: max retries exceeded' % identifier) - do_cache = self.config['cache_responses'] - if do_cache: - self.save_response(r) - return r def db_query_equal(self, key, value): """Look in the db for a row with key == value Args: key: column key to look at value: value to look for in that column Returns: sqlalchemy.ext.declarative.declarative_base object with the given key == value """ if isinstance(key, str): key = self.MODEL.__dict__[key] return self.db_session.query(self.MODEL) \ .filter(key == value).first() def winnow_models(self, mlist, key, to_remove): """Given a list of models, remove any with matching some member of a list of values. Args: mlist (list of model rows): the initial list of models key (column): the column to filter on to_remove (list): if anything in mlist has column equal to one of the values in to_remove, it will be removed from the result Returns: A list of model rows starting from mlist minus any matching rows """ if isinstance(key, str): key = self.MODEL.__dict__[key] if to_remove: return mlist.filter(~key.in_(to_remove)).all() else: return mlist.all() def db_num_entries(self): """Return the known number of entries in the lister db""" return self.db_session.query(func.count('*')).select_from(self.MODEL) \ .scalar() def db_inject_repo(self, model_dict): """Add/update a new repo to the db and mark it last_seen now. Args: model_dict: dictionary mapping model keys to values Returns: new or updated sqlalchemy.ext.declarative.declarative_base object associated with the injection """ sql_repo = self.db_query_equal('uid', model_dict['uid']) if not sql_repo: sql_repo = self.MODEL(**model_dict) self.db_session.add(sql_repo) else: for k in model_dict: setattr(sql_repo, k, model_dict[k]) - sql_repo.last_seen = datetime.now() + sql_repo.last_seen = utcnow() return sql_repo def origin_dict(self, origin_type, origin_url): """Return special dict format for the origins list Args: origin_type (string) origin_url (string) Returns: the same information in a different form """ return { 'type': origin_type, 'url': origin_url, } def task_dict(self, origin_type, origin_url): """Return special dict format for the tasks list Args: origin_type (string) origin_url (string) Returns: the same information in a different form """ return { 'type': 'origin-update-%s' % origin_type, 'arguments': { 'args': [ origin_url, ], 'kwargs': {}, }, - 'next_run': datetime.now(), + 'next_run': utcnow(), } def string_pattern_check(self, a, b, c=None): """When comparing indexable types in is_within_bounds, complex strings may not be allowed to differ in basic structure. If they do, it could be a sign of not understanding the data well. For instance, an ISO 8601 time string cannot be compared against its urlencoded equivalent, but this is an easy mistake to accidentally make. This method acts as a friendly sanity check. Args: a (string): inner component of the is_within_bounds method b (string): lower component of the is_within_bounds method c (string): upper component of the is_within_bounds method Returns: nothing Raises: TypeError if strings a, b, and c don't conform to the same basic pattern. """ if isinstance(a, str): a_pattern = re.sub('[a-zA-Z0-9]', '[a-zA-Z0-9]', re.escape(a)) if (isinstance(b, str) and (re.match(a_pattern, b) is None) or isinstance(c, str) and (re.match(a_pattern, c) is None)): logging.debug(a_pattern) raise TypeError('incomparable string patterns detected') def inject_repo_data_into_db(self, models_list): """Inject data into the db. Args: models_list: list of dicts mapping keys from the db model for each repo to be injected Returns: dict of uid:sql_repo pairs """ injected_repos = {} for m in models_list: injected_repos[m['uid']] = self.db_inject_repo(m) return injected_repos def create_missing_origins_and_tasks(self, models_list, injected_repos): """Find any newly created db entries that don't yet have tasks or origin objects assigned. Args: models_list: a list of dicts mapping keys in the db model for each repo injected_repos: dict of uid:sql_repo pairs that have just been created Returns: Nothing. Modifies injected_repos. """ for m in models_list: ir = injected_repos[m['uid']] if not ir.origin_id: ir.origin_id = self.storage.origin_add_one( self.origin_dict(m['origin_type'], m['origin_url']) ) if not ir.task_id: ir.task_id = self.scheduler.create_tasks( [self.task_dict(m['origin_type'], m['origin_url'])] - )['id'] + )[0]['id'] def ingest_data(self, identifier): """The core data fetch sequence. Request server endpoint. Simplify and filter response list of repositories. Inject repo information into local db. Queue loader tasks for linked repositories. Args: identifier: Resource identifier. """ # Request (partial?) list of repositories info response = self.safely_issue_request(identifier) models_list = self.transport_response_simplified(response) models_list = self.filter_before_inject(models_list) # inject into local db injected = self.inject_repo_data_into_db(models_list) # queue workers self.create_missing_origins_and_tasks(models_list, injected) return response, injected def save_response(self, response): """Log the response from a server request to a cache dir. Args: response: full server response cache_dir: system path for cache dir Returns: nothing """ - def escape_url_path(p): - return p.replace('/', '__') + datepath = utcnow().isoformat() fname = os.path.join( - self.config['cache_dir'], - escape_url_path(response.request.path_url) + '.gz' - ) + self.config['cache_dir'], + datepath + '.gz', + ) + with gzip.open(fname, 'w') as f: f.write(bytes( self.transport_response_to_string(response), 'UTF-8' )) diff --git a/swh/lister/core/lister_transports.py b/swh/lister/core/lister_transports.py index 01ee9f2..58a4605 100644 --- a/swh/lister/core/lister_transports.py +++ b/swh/lister/core/lister_transports.py @@ -1,97 +1,132 @@ # Copyright (C) 2017 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import random from datetime import datetime from email.utils import parsedate from pprint import pformat import requests import xmltodict +try: + from swh.lister._version import __version__ +except ImportError: + __version__ = 'devel' + from .abstractattribute import AbstractAttribute from .lister_base import FetchError class SWHListerHttpTransport(abc.ABC): """Use the Requests library for making Lister endpoint requests. To be used in conjunction with SWHListerBase or a subclass of it. """ PATH_TEMPLATE = AbstractAttribute('string containing a python string' ' format pattern that produces the API' ' endpoint path for listing stored' ' repositories when given an index.' ' eg. "/repositories?after=%s".' 'To be implemented in the API-specific' ' class inheriting this.') + EXPECTED_STATUS_CODES = (200, 429, 403, 404) + def request_headers(self): """Returns dictionary of any request headers needed by the server. MAY BE OVERRIDDEN if request headers are needed. """ - return {} + return { + 'User-Agent': 'Software Heritage lister (%s)' % self.lister_version + } + + def request_uri(self, identifier): + """Get the full request URI given the transport_request identifier. + + MAY BE OVERRIDDEN if something more complex than the PATH_TEMPLATE is + required. + """ + path = self.PATH_TEMPLATE % identifier + return self.api_baseurl + path + + def request_params(self, identifier): + """Get the full parameters passed to requests given the transport_request + identifier. + + MAY BE OVERRIDDEN if something more complex than the request headers + ois needed. + """ + params = {} + params['headers'] = self.request_headers() or {} + creds = self.config['credentials'] + auth = random.choice(creds) if creds else None + if auth: + params['auth'] = (auth['username'], auth['password']) + return params def transport_quota_check(self, response): """Implements SWHListerBase.transport_quota_check with standard 429 code check for HTTP with Requests library. MAY BE OVERRIDDEN if the server notifies about rate limits in a non-standard way that doesn't use HTTP 429 and the Retry-After response header. ( https://tools.ietf.org/html/rfc6585#section-4 ) """ if response.status_code == 429: # HTTP too many requests retry_after = response.headers.get('Retry-After', self.back_off()) try: # might be seconds return True, float(retry_after) except: # might be http-date at_date = datetime(*parsedate(retry_after)[:6]) from_now = (at_date - datetime.today()).total_seconds() + 5 return True, max(0, from_now) else: # response ok self.reset_backoff() return False, 0 def __init__(self, api_baseurl=None): if not api_baseurl: raise NameError('HTTP Lister Transport requires api_baseurl.') self.api_baseurl = api_baseurl # eg. 'https://api.github.com' self.session = requests.Session() + self.lister_version = __version__ def transport_request(self, identifier): """Implements SWHListerBase.transport_request for HTTP using Requests. """ - path = self.PATH_TEMPLATE % identifier - params = {} - params['headers'] = self.request_headers() or {} - creds = self.config['credentials'] - auth = random.choice(creds) if creds else None - if auth: - params['auth'] = auth + path = self.request_uri(identifier) + params = self.request_params(identifier) + try: - return self.session.get(self.api_baseurl + path, **params) + response = self.session.get(path, **params) except requests.exceptions.ConnectionError as e: raise FetchError(e) + else: + if response.status_code not in self.EXPECTED_STATUS_CODES: + raise FetchError(response) + return response def transport_response_to_string(self, response): """Implements SWHListerBase.transport_response_to_string for HTTP given Requests responses. """ s = pformat(response.request.path_url) + s += '\n#\n' + pformat(response.request.headers) s += '\n#\n' + pformat(response.status_code) s += '\n#\n' + pformat(response.headers) s += '\n#\n' try: # json? s += pformat(response.json()) except: # not json try: # xml? s += pformat(xmltodict.parse(response.text)) except: # not xml s += pformat(response.text) return s diff --git a/swh/lister/core/tasks.py b/swh/lister/core/tasks.py index 4ab610a..5d4e0f6 100644 --- a/swh/lister/core/tasks.py +++ b/swh/lister/core/tasks.py @@ -1,71 +1,73 @@ # Copyright (C) 2017 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import random from celery import group from celery.app.task import TaskType from swh.scheduler.task import Task from .abstractattribute import AbstractAttribute class AbstractTaskMeta(abc.ABCMeta, TaskType): pass class ListerTaskBase(Task, metaclass=AbstractTaskMeta): """Lister Tasks define the process of periodically requesting batches of repository information from source code hosting services. They instantiate Listers to do batches of work at periodic intervals. There are two main kinds of lister tasks: - 1) Discovering new repositories. - 2) Refreshing the list of already discovered repositories. + 1. Discovering new repositories. + 2. Refreshing the list of already discovered repositories. If the hosting service is indexable (according to the requirements of - SWHIndexingLister), then we can optionally partition the set of known - repositories into sub-sets to distribute the work. + :class:`SWHIndexingLister`), then we can optionally partition the + set of known repositories into sub-sets to distribute the work. This means that there is a third possible Task type for Indexing Listers: - 3) Discover or refresh a specific range of indices. + + 3. Discover or refresh a specific range of indices. + """ task_queue = AbstractAttribute('Celery Task queue name') @abc.abstractmethod def new_lister(self): """Return a new lister of the appropriate type. """ pass @abc.abstractmethod def run_task(self): pass class IndexingDiscoveryListerTask(ListerTaskBase): def run_task(self): lister = self.new_lister() return lister.run(min_index=lister.db_last_index(), max_index=None) class IndexingRangeListerTask(ListerTaskBase): def run_task(self, start, end): lister = self.new_lister() return lister.run(min_index=start, max_index=end) class IndexingRefreshListerTask(ListerTaskBase): GROUP_SPLIT = 10000 def run_task(self): lister = self.new_lister() ranges = lister.db_partition_indices(self.GROUP_SPLIT) random.shuffle(ranges) range_task = IndexingRangeListerTask() group(range_task.s(minv, maxv) for minv, maxv in ranges)() diff --git a/swh/lister/debian/lister.py b/swh/lister/debian/lister.py new file mode 100644 index 0000000..db61aec --- /dev/null +++ b/swh/lister/debian/lister.py @@ -0,0 +1,234 @@ +# Copyright (C) 2017 the Software Heritage developers +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import bz2 +from collections import defaultdict +import datetime +import gzip +import lzma +import logging + +from debian.deb822 import Sources +from sqlalchemy.orm import joinedload, load_only +from sqlalchemy.schema import CreateTable, DropTable + +from swh.storage.schemata.distribution import ( + AreaSnapshot, Distribution, DistributionSnapshot, Package, + TempPackage, +) + +from swh.lister.core.lister_base import SWHListerBase, FetchError +from swh.lister.core.lister_transports import SWHListerHttpTransport + +decompressors = { + 'gz': lambda f: gzip.GzipFile(fileobj=f), + 'bz2': bz2.BZ2File, + 'xz': lzma.LZMAFile, +} + + +class DebianLister(SWHListerHttpTransport, SWHListerBase): + MODEL = Package + PATH_TEMPLATE = None + + def __init__(self, override_config=None): + SWHListerHttpTransport.__init__(self, api_baseurl="bogus") + SWHListerBase.__init__(self, lister_name='debian', + override_config=override_config) + + def transport_request(self, identifier): + """Subvert SWHListerHttpTransport.transport_request, to try several + index URIs in turn. + + The Debian repository format supports several compression algorithms + across the ages, so we try several URIs. + + Once we have found a working URI, we break and set `self.decompressor` + to the one that matched. + + Returns: + a requests Response object. + + Raises: + FetchError: when all the URIs failed to be retrieved. + """ + response = None + compression = None + + for uri, compression in self.area.index_uris(): + response = super().transport_request(uri) + if response.status_code == 200: + break + else: + raise FetchError( + "Could not retrieve index for %s" % self.area + ) + self.decompressor = decompressors.get(compression) + return response + + def request_uri(self, identifier): + # In the overridden transport_request, we pass + # SWHListerBase.transport_request() the full URI as identifier, so we + # need to return it here. + return identifier + + def request_params(self, identifier): + # Enable streaming to allow wrapping the response in the decompressor + # in transport_response_simplified. + params = super().request_params(identifier) + params['stream'] = True + return params + + def transport_response_simplified(self, response): + """Decompress and parse the package index fetched in `transport_request`. + + For each package, we "pivot" the file list entries (Files, + Checksums-Sha1, Checksums-Sha256), to return a files dict mapping + filenames to their checksums. + """ + if self.decompressor: + data = self.decompressor(response.raw) + else: + data = response.raw + + for src_pkg in Sources.iter_paragraphs(data.readlines()): + files = defaultdict(dict) + + for field in src_pkg._multivalued_fields: + if field.startswith('checksums-'): + sum_name = field[len('checksums-'):] + else: + sum_name = 'md5sum' + if field in src_pkg: + for entry in src_pkg[field]: + name = entry['name'] + files[name]['name'] = entry['name'] + files[name]['size'] = int(entry['size'], 10) + files[name][sum_name] = entry[sum_name] + + yield { + 'name': src_pkg['Package'], + 'version': src_pkg['Version'], + 'directory': src_pkg['Directory'], + 'files': files, + } + + def inject_repo_data_into_db(self, models_list): + """Generate the Package entries that didn't previously exist. + + Contrary to SWHListerBase, we don't actually insert the data in + database. `create_missing_origins_and_tasks` does it once we have the + origin and task identifiers. + """ + by_name_version = {} + temp_packages = [] + + area_id = self.area.id + + for model in models_list: + name = model['name'] + version = model['version'] + temp_packages.append({ + 'area_id': area_id, + 'name': name, + 'version': version, + }) + by_name_version[name, version] = model + + # Add all the listed packages to a temporary table + self.db_session.execute(CreateTable(TempPackage.__table__)) + self.db_session.bulk_insert_mappings(TempPackage, temp_packages) + + def exists_tmp_pkg(db_session, model): + return ( + db_session.query(model) + .filter(Package.area_id == TempPackage.area_id) + .filter(Package.name == TempPackage.name) + .filter(Package.version == TempPackage.version) + .exists() + ) + + # Filter out the packages that already exist in the main Package table + new_packages = self.db_session\ + .query(TempPackage)\ + .options(load_only('name', 'version'))\ + .filter(~exists_tmp_pkg(self.db_session, Package))\ + .all() + + self.old_area_packages = self.db_session.query(Package).filter( + exists_tmp_pkg(self.db_session, TempPackage) + ).all() + + self.db_session.execute(DropTable(TempPackage.__table__)) + + added_packages = [] + for package in new_packages: + model = by_name_version[package.name, package.version] + + added_packages.append(Package(area=self.area, + **model)) + + self.db_session.add_all(added_packages) + return added_packages + + def create_missing_origins_and_tasks(self, models_list, added_packages): + """We create tasks at the end of the full snapshot processing""" + return + + def create_tasks_for_snapshot(self, snapshot): + tasks = [ + snapshot.task_for_package(name, versions) + for name, versions in snapshot.get_packages().items() + ] + + return self.scheduler.create_tasks(tasks) + + def run(self, distribution, date=None): + """Run the lister for a given (distribution, area) tuple. + + Args: + distribution (str): name of the distribution (e.g. "Debian") + date (datetime.datetime): date the snapshot is taken (defaults to + now) + """ + distribution = self.db_session\ + .query(Distribution)\ + .options(joinedload(Distribution.areas))\ + .filter(Distribution.name == distribution)\ + .one_or_none() + + if not distribution: + raise ValueError("Distribution %s is not registered" % + distribution) + + if not distribution.type == 'deb': + raise ValueError("Distribution %s is not a Debian derivative" % + distribution) + + date = date or datetime.datetime.now(tz=datetime.timezone.utc) + + logging.debug('Creating snapshot for distribution %s on date %s' % + (distribution, date)) + + snapshot = DistributionSnapshot(date=date, distribution=distribution) + + self.db_session.add(snapshot) + + for area in distribution.areas: + if not area.active: + continue + + self.area = area + + logging.debug('Processing area %s' % area) + + _, new_area_packages = self.ingest_data(None) + area_snapshot = AreaSnapshot(snapshot=snapshot, area=area) + self.db_session.add(area_snapshot) + area_snapshot.packages.extend(new_area_packages) + area_snapshot.packages.extend(self.old_area_packages) + + self.create_tasks_for_snapshot(snapshot) + + self.db_session.commit() diff --git a/swh/lister/debian/utils.py b/swh/lister/debian/utils.py new file mode 100644 index 0000000..2ed3c54 --- /dev/null +++ b/swh/lister/debian/utils.py @@ -0,0 +1,81 @@ +# Copyright (C) 2017 the Software Heritage developers +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import logging + +import click + +from swh.storage.schemata.distribution import Distribution, Area, SQLBase +from swh.lister.debian.lister import DebianLister + + +@click.group() +@click.option('--verbose/--no-verbose', default=False) +@click.pass_context +def cli(ctx, verbose): + ctx.obj['lister'] = DebianLister() + if verbose: + loglevel = logging.DEBUG + logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO) + else: + loglevel = logging.INFO + + logging.basicConfig( + format='%(asctime)s %(process)d %(levelname)s %(message)s', + level=loglevel, + ) + + +@cli.command() +@click.pass_context +def create_schema(ctx): + """Create the schema from the models""" + SQLBase.metadata.create_all(ctx.obj['lister'].db_engine) + + +@cli.command() +@click.option('--name', help='The name of the distribution') +@click.option('--type', help='The type of distribution') +@click.option('--mirror-uri', help='The URL to the mirror of the distribution') +@click.option('--area', help='The areas for the distribution', + multiple=True) +@click.pass_context +def create_distribution(ctx, name, type, mirror_uri, area): + to_add = [] + db_session = ctx.obj['lister'].db_session + d = db_session.query(Distribution)\ + .filter(Distribution.name == name)\ + .filter(Distribution.type == type)\ + .one_or_none() + + if not d: + d = Distribution(name=name, type=type, mirror_uri=mirror_uri) + to_add.append(d) + + for area_name in area: + a = None + if d.id: + a = db_session.query(Area)\ + .filter(Area.distribution == d)\ + .filter(Area.name == area_name)\ + .one_or_none() + + if not a: + a = Area(name=area_name, distribution=d) + to_add.append(a) + + db_session.add_all(to_add) + db_session.commit() + + +@cli.command() +@click.option('--name', help='The name of the distribution') +@click.pass_context +def list_distribution(ctx, name): + """List the distribution""" + ctx.obj['lister'].run(name) + + +if __name__ == '__main__': + cli(obj={}) diff --git a/swh/lister/github/lister.py b/swh/lister/github/lister.py index 4b83bb5..af27899 100644 --- a/swh/lister/github/lister.py +++ b/swh/lister/github/lister.py @@ -1,50 +1,51 @@ # Copyright (C) 2017 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import re import time from swh.lister.core.indexing_lister import SWHIndexingHttpLister from swh.lister.github.models import GitHubModel class GitHubLister(SWHIndexingHttpLister): PATH_TEMPLATE = '/repositories?since=%d' MODEL = GitHubModel API_URL_INDEX_RE = re.compile(r'^.*/repositories\?since=(\d+)') def get_model_from_repo(self, repo): return { 'uid': repo['id'], 'indexable': repo['id'], 'name': repo['name'], 'full_name': repo['full_name'], 'html_url': repo['html_url'], 'origin_url': repo['html_url'], 'origin_type': 'git', - 'description': repo['description'] + 'description': repo['description'], + 'fork': repo['fork'], } def transport_quota_check(self, response): reqs_remaining = int(response.headers['X-RateLimit-Remaining']) if response.status_code == 403 and reqs_remaining == 0: reset_at = int(response.headers['X-RateLimit-Reset']) delay = min(reset_at - time.time(), 3600) return True, delay else: return False, 0 def get_next_target_from_response(self, response): if 'next' in response.links: next_url = response.links['next']['url'] return int(self.API_URL_INDEX_RE.match(next_url).group(1)) else: return None def transport_response_simplified(self, response): repos = response.json() return [self.get_model_from_repo(repo) for repo in repos] def request_headers(self): return {'Accept': 'application/vnd.github.v3+json'} diff --git a/swh/lister/github/models.py b/swh/lister/github/models.py index 094526d..32055a7 100644 --- a/swh/lister/github/models.py +++ b/swh/lister/github/models.py @@ -1,18 +1,20 @@ # Copyright (C) 2017 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from sqlalchemy import Column, Integer +from sqlalchemy import Column, Boolean, Integer from swh.lister.core.models import ModelBase class GitHubModel(ModelBase): """a GitHub repository""" __tablename__ = 'github_repos' uid = Column(Integer, primary_key=True) indexable = Column(Integer, index=True) + fork = Column(Boolean) def __init__(self, *args, **kwargs): + self.fork = kwargs.pop('fork', False) super().__init__(*args, **kwargs) diff --git a/swh/lister/github/tasks.py b/swh/lister/github/tasks.py index fdc5a68..ef88c5d 100644 --- a/swh/lister/github/tasks.py +++ b/swh/lister/github/tasks.py @@ -1,27 +1,27 @@ # Copyright (C) 2017 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.lister.core.tasks import (IndexingDiscoveryListerTask, IndexingRangeListerTask, IndexingRefreshListerTask, ListerTaskBase) from .lister import GitHubLister class GitHubListerTask(ListerTaskBase): def new_lister(self): return GitHubLister(lister_name='github.com', - api_baseurl='https://github.com') + api_baseurl='https://api.github.com') class IncrementalGitHubLister(GitHubListerTask, IndexingDiscoveryListerTask): task_queue = 'swh_lister_github_discover' class RangeGitHubLister(GitHubListerTask, IndexingRangeListerTask): task_queue = 'swh_lister_github_refresh' class FullGitHubRelister(GitHubListerTask, IndexingRefreshListerTask): task_queue = 'swh_lister_github_refresh' diff --git a/version.txt b/version.txt index 6116527..40261ee 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.5-0-g75ba12e \ No newline at end of file +v0.0.6-0-g751b64b \ No newline at end of file