diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 2ab2d5e..46b4702 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,47 +1,41 @@ repos: - repo: https://github.com/pre-commit/pre-commit-hooks rev: v2.4.0 hooks: - id: trailing-whitespace - id: flake8 - id: check-json - id: check-yaml - repo: https://github.com/codespell-project/codespell rev: v1.16.0 hooks: - id: codespell exclude: ^(swh/lister/.*/tests/data/.*)$ - repo: local hooks: - id: mypy name: mypy entry: mypy args: [swh] pass_filenames: false language: system types: [python] +- repo: https://github.com/python/black + rev: 19.10b0 + hooks: + - id: black + # unfortunately, we are far from being able to enable this... # - repo: https://github.com/PyCQA/pydocstyle.git # rev: 4.0.0 # hooks: # - id: pydocstyle # name: pydocstyle # description: pydocstyle is a static analysis tool for checking compliance with Python docstring conventions. # entry: pydocstyle --convention=google # language: python # types: [python] -# black requires py3.6+ -#- repo: https://github.com/python/black -# rev: 19.3b0 -# hooks: -# - id: black -# language_version: python3 -#- repo: https://github.com/asottile/blacken-docs -# rev: v1.0.0-1 -# hooks: -# - id: blacken-docs -# additional_dependencies: [black==19.3b0] diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..8d79b7e --- /dev/null +++ b/setup.cfg @@ -0,0 +1,6 @@ +[flake8] +# E203: whitespaces before ':' +# E231: missing whitespace after ',' +# W503: line break before binary operator +ignore = E203,E231,W503 +max-line-length = 88 diff --git a/setup.py b/setup.py index 905d1f4..f3a07da 100755 --- a/setup.py +++ b/setup.py @@ -1,81 +1,81 @@ #!/usr/bin/env python3 # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from setuptools import setup, find_packages from os import path from io import open here = path.abspath(path.dirname(__file__)) # Get the long description from the README file -with open(path.join(here, 'README.md'), encoding='utf-8') as f: +with open(path.join(here, "README.md"), encoding="utf-8") as f: long_description = f.read() def parse_requirements(name=None): if name: - reqf = 'requirements-%s.txt' % name + reqf = "requirements-%s.txt" % name else: - reqf = 'requirements.txt' + reqf = "requirements.txt" requirements = [] if not path.exists(reqf): return requirements with open(reqf) as f: for line in f.readlines(): line = line.strip() - if not line or line.startswith('#'): + if not line or line.startswith("#"): continue requirements.append(line) return requirements setup( - name='swh.lister', - description='Software Heritage lister', + name="swh.lister", + description="Software Heritage lister", long_description=long_description, - long_description_content_type='text/markdown', - author='Software Heritage developers', - author_email='swh-devel@inria.fr', - url='https://forge.softwareheritage.org/diffusion/DLSGH/', + long_description_content_type="text/markdown", + author="Software Heritage developers", + author_email="swh-devel@inria.fr", + url="https://forge.softwareheritage.org/diffusion/DLSGH/", packages=find_packages(), - install_requires=parse_requirements() + parse_requirements('swh'), - tests_require=parse_requirements('test'), - setup_requires=['vcversioner'], - extras_require={'testing': parse_requirements('test')}, + 
install_requires=parse_requirements() + parse_requirements("swh"), + tests_require=parse_requirements("test"), + setup_requires=["vcversioner"], + extras_require={"testing": parse_requirements("test")}, vcversioner={}, include_package_data=True, - entry_points=''' + entry_points=""" [swh.cli.subcommands] lister=swh.lister.cli:lister [swh.workers] lister.bitbucket=swh.lister.bitbucket:register lister.cgit=swh.lister.cgit:register lister.cran=swh.lister.cran:register lister.debian=swh.lister.debian:register lister.github=swh.lister.github:register lister.gitlab=swh.lister.gitlab:register lister.gnu=swh.lister.gnu:register lister.npm=swh.lister.npm:register lister.packagist=swh.lister.packagist:register lister.phabricator=swh.lister.phabricator:register lister.pypi=swh.lister.pypi:register - ''', + """, classifiers=[ "Programming Language :: Python :: 3", "Intended Audience :: Developers", "License :: OSI Approved :: GNU General Public License v3 (GPLv3)", "Operating System :: OS Independent", "Development Status :: 5 - Production/Stable", ], project_urls={ - 'Bug Reports': 'https://forge.softwareheritage.org/maniphest', - 'Funding': 'https://www.softwareheritage.org/donate', - 'Source': 'https://forge.softwareheritage.org/source/swh-lister', + "Bug Reports": "https://forge.softwareheritage.org/maniphest", + "Funding": "https://www.softwareheritage.org/donate", + "Source": "https://forge.softwareheritage.org/source/swh-lister", }, ) diff --git a/swh/lister/__init__.py b/swh/lister/__init__.py index 840ceca..b37e946 100644 --- a/swh/lister/__init__.py +++ b/swh/lister/__init__.py @@ -1,52 +1,55 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import pkg_resources logger = logging.getLogger(__name__) try: - __version__ = pkg_resources.get_distribution('swh.lister').version + __version__ = pkg_resources.get_distribution("swh.lister").version except pkg_resources.DistributionNotFound: - __version__ = 'devel' + __version__ = "devel" -USER_AGENT_TEMPLATE = 'Software Heritage Lister (%s)' +USER_AGENT_TEMPLATE = "Software Heritage Lister (%s)" USER_AGENT = USER_AGENT_TEMPLATE % __version__ -LISTERS = {entry_point.name.split('.', 1)[1]: entry_point - for entry_point in pkg_resources.iter_entry_points('swh.workers') - if entry_point.name.split('.', 1)[0] == 'lister'} +LISTERS = { + entry_point.name.split(".", 1)[1]: entry_point + for entry_point in pkg_resources.iter_entry_points("swh.workers") + if entry_point.name.split(".", 1)[0] == "lister" +} SUPPORTED_LISTERS = list(LISTERS) def get_lister(lister_name, db_url=None, **conf): """Instantiate a lister given its name. Args: lister_name (str): Lister's name conf (dict): Configuration dict (lister db cnx, policy, priority...) 
Returns: the instantiated lister """ if lister_name not in LISTERS: raise ValueError( - 'Invalid lister %s: only supported listers are %s' % - (lister_name, SUPPORTED_LISTERS)) + "Invalid lister %s: only supported listers are %s" + % (lister_name, SUPPORTED_LISTERS) + ) if db_url: - conf['lister'] = {'cls': 'local', 'args': {'db': db_url}} + conf["lister"] = {"cls": "local", "args": {"db": db_url}} registry_entry = LISTERS[lister_name].load()() - lister_cls = registry_entry['lister'] + lister_cls = registry_entry["lister"] lister = lister_cls(override_config=conf) return lister diff --git a/swh/lister/bitbucket/__init__.py b/swh/lister/bitbucket/__init__.py index 7a524e2..917c7bd 100644 --- a/swh/lister/bitbucket/__init__.py +++ b/swh/lister/bitbucket/__init__.py @@ -1,13 +1,14 @@ # Copyright (C) 2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information def register(): from .models import BitBucketModel from .lister import BitBucketLister - return {'models': [BitBucketModel], - 'lister': BitBucketLister, - 'task_modules': ['%s.tasks' % __name__], - } + return { + "models": [BitBucketModel], + "lister": BitBucketLister, + "task_modules": ["%s.tasks" % __name__], + } diff --git a/swh/lister/bitbucket/lister.py b/swh/lister/bitbucket/lister.py index e067148..d64281c 100644 --- a/swh/lister/bitbucket/lister.py +++ b/swh/lister/bitbucket/lister.py @@ -1,87 +1,86 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import iso8601 from datetime import datetime, timezone from typing import Any, Dict, List, Optional from urllib import parse from requests import Response from swh.lister.bitbucket.models import BitBucketModel from swh.lister.core.indexing_lister import IndexingHttpLister logger = logging.getLogger(__name__) class BitBucketLister(IndexingHttpLister): - PATH_TEMPLATE = '/repositories?after=%s' + PATH_TEMPLATE = "/repositories?after=%s" MODEL = BitBucketModel - LISTER_NAME = 'bitbucket' - DEFAULT_URL = 'https://api.bitbucket.org/2.0' - instance = 'bitbucket' + LISTER_NAME = "bitbucket" + DEFAULT_URL = "https://api.bitbucket.org/2.0" + instance = "bitbucket" default_min_bound = datetime.fromtimestamp(0, timezone.utc) # type: Any - def __init__(self, url: str = None, - override_config=None, per_page: int = 100) -> None: + def __init__( + self, url: str = None, override_config=None, per_page: int = 100 + ) -> None: super().__init__(url=url, override_config=override_config) - per_page = self.config.get('per_page', per_page) + per_page = self.config.get("per_page", per_page) - self.PATH_TEMPLATE = '%s&pagelen=%s' % ( - self.PATH_TEMPLATE, per_page) + self.PATH_TEMPLATE = "%s&pagelen=%s" % (self.PATH_TEMPLATE, per_page) def get_model_from_repo(self, repo: Dict) -> Dict[str, Any]: return { - 'uid': repo['uuid'], - 'indexable': iso8601.parse_date(repo['created_on']), - 'name': repo['name'], - 'full_name': repo['full_name'], - 'html_url': repo['links']['html']['href'], - 'origin_url': repo['links']['clone'][0]['href'], - 'origin_type': repo['scm'], + "uid": repo["uuid"], + "indexable": iso8601.parse_date(repo["created_on"]), + "name": repo["name"], + "full_name": repo["full_name"], + "html_url":
repo["links"]["html"]["href"], + "origin_url": repo["links"]["clone"][0]["href"], + "origin_type": repo["scm"], } - def get_next_target_from_response(self, response: Response - ) -> Optional[datetime]: + def get_next_target_from_response(self, response: Response) -> Optional[datetime]: """This will read the 'next' link from the api response if any and return it as a datetime. Args: response (Response): requests' response from api call Returns: next date as a datetime """ body = response.json() - next_ = body.get('next') + next_ = body.get("next") if next_ is not None: next_ = parse.urlparse(next_) - return iso8601.parse_date(parse.parse_qs(next_.query)['after'][0]) + return iso8601.parse_date(parse.parse_qs(next_.query)["after"][0]) return None - def transport_response_simplified(self, response: Response - ) -> List[Dict[str, Any]]: - repos = response.json()['values'] + def transport_response_simplified(self, response: Response) -> List[Dict[str, Any]]: + repos = response.json()["values"] return [self.get_model_from_repo(repo) for repo in repos] def request_uri(self, identifier: datetime) -> str: # type: ignore identifier_str = parse.quote(identifier.isoformat()) - return super().request_uri(identifier_str or '1970-01-01') + return super().request_uri(identifier_str or "1970-01-01") - def is_within_bounds(self, inner: int, lower: Optional[int] = None, - upper: Optional[int] = None) -> bool: + def is_within_bounds( + self, inner: int, lower: Optional[int] = None, upper: Optional[int] = None + ) -> bool: # values are expected to be datetimes if lower is None and upper is None: ret = True elif lower is None: ret = inner <= upper # type: ignore elif upper is None: ret = inner >= lower else: ret = lower <= inner <= upper return ret diff --git a/swh/lister/bitbucket/models.py b/swh/lister/bitbucket/models.py index d299b5b..dca32f7 100644 --- a/swh/lister/bitbucket/models.py +++ b/swh/lister/bitbucket/models.py @@ -1,15 +1,16 @@ # Copyright (C) 2017-2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from sqlalchemy import Column, String, DateTime from swh.lister.core.models import IndexingModelBase class BitBucketModel(IndexingModelBase): """a BitBucket repository""" - __tablename__ = 'bitbucket_repo' + + __tablename__ = "bitbucket_repo" uid = Column(String, primary_key=True) indexable = Column(DateTime(timezone=True), index=True) diff --git a/swh/lister/bitbucket/tasks.py b/swh/lister/bitbucket/tasks.py index 3b64de0..68cae21 100644 --- a/swh/lister/bitbucket/tasks.py +++ b/swh/lister/bitbucket/tasks.py @@ -1,53 +1,54 @@ # Copyright (C) 2017-2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import random from celery import group, shared_task from .lister import BitBucketLister GROUP_SPLIT = 10000 -@shared_task(name=__name__ + '.IncrementalBitBucketLister') +@shared_task(name=__name__ + ".IncrementalBitBucketLister") def list_bitbucket_incremental(**lister_args): - '''Incremental update of the BitBucket forge''' + """Incremental update of the BitBucket forge""" lister = BitBucketLister(**lister_args) return lister.run(min_bound=lister.db_last_index(), max_bound=None) -@shared_task(name=__name__ + '.RangeBitBucketLister') +@shared_task(name=__name__ + ".RangeBitBucketLister") def _range_bitbucket_lister(start, end, **lister_args): lister = BitBucketLister(**lister_args) return 
lister.run(min_bound=start, max_bound=end) -@shared_task(name=__name__ + '.FullBitBucketRelister', bind=True) +@shared_task(name=__name__ + ".FullBitBucketRelister", bind=True) def list_bitbucket_full(self, split=None, **lister_args): """Full update of the BitBucket forge It's not to be called for an initial listing. """ lister = BitBucketLister(**lister_args) ranges = lister.db_partition_indices(split or GROUP_SPLIT) if not ranges: - self.log.info('Nothing to list') + self.log.info("Nothing to list") return random.shuffle(ranges) - promise = group(_range_bitbucket_lister.s(minv, maxv, **lister_args) - for minv, maxv in ranges)() - self.log.debug('%s OK (spawned %s subtasks)', (self.name, len(ranges))) + promise = group( + _range_bitbucket_lister.s(minv, maxv, **lister_args) for minv, maxv in ranges + )() + self.log.debug("%s OK (spawned %s subtasks)", (self.name, len(ranges))) try: promise.save() # so that we can restore the GroupResult in tests except (NotImplementedError, AttributeError): - self.log.info('Unable to call save_group with current result backend.') + self.log.info("Unable to call save_group with current result backend.") # FIXME: what to do in terms of return here? return promise.id -@shared_task(name=__name__ + '.ping') +@shared_task(name=__name__ + ".ping") def _ping(): - return 'OK' + return "OK" diff --git a/swh/lister/bitbucket/tests/test_lister.py b/swh/lister/bitbucket/tests/test_lister.py index eda17d6..191b7c5 100644 --- a/swh/lister/bitbucket/tests/test_lister.py +++ b/swh/lister/bitbucket/tests/test_lister.py @@ -1,113 +1,120 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import re import unittest from datetime import timedelta from urllib.parse import unquote import iso8601 import requests_mock from swh.lister.bitbucket.lister import BitBucketLister from swh.lister.core.tests.test_lister import HttpListerTester def _convert_type(req_index): """Convert the req_index to its right type according to the model's "indexable" column. 
""" return iso8601.parse_date(unquote(req_index)) class BitBucketListerTester(HttpListerTester, unittest.TestCase): Lister = BitBucketLister - test_re = re.compile(r'/repositories\?after=([^?&]+)') - lister_subdir = 'bitbucket' - good_api_response_file = 'data/https_api.bitbucket.org/response.json' - bad_api_response_file = 'data/https_api.bitbucket.org/empty_response.json' - first_index = _convert_type('2008-07-12T07:44:01.476818+00:00') - last_index = _convert_type('2008-07-19T06:16:43.044743+00:00') + test_re = re.compile(r"/repositories\?after=([^?&]+)") + lister_subdir = "bitbucket" + good_api_response_file = "data/https_api.bitbucket.org/response.json" + bad_api_response_file = "data/https_api.bitbucket.org/empty_response.json" + first_index = _convert_type("2008-07-12T07:44:01.476818+00:00") + last_index = _convert_type("2008-07-19T06:16:43.044743+00:00") entries_per_page = 10 convert_type = _convert_type def request_index(self, request): """(Override) This is needed to emulate the listing bootstrap when no min_bound is provided to run """ m = self.test_re.search(request.path_url) idx = _convert_type(m.group(1)) if idx == self.Lister.default_min_bound: idx = self.first_index return idx @requests_mock.Mocker() def test_fetch_none_nodb(self, http_mocker): """Overridden because index is not an integer nor a string """ http_mocker.get(self.test_re, text=self.mock_response) fl = self.get_fl() self.disable_scheduler(fl) self.disable_db(fl) # stores no results - fl.run(min_bound=self.first_index - timedelta(days=3), - max_bound=self.first_index) + fl.run( + min_bound=self.first_index - timedelta(days=3), max_bound=self.first_index + ) def test_is_within_bounds(self): fl = self.get_fl() - self.assertTrue(fl.is_within_bounds( - iso8601.parse_date('2008-07-15'), - self.first_index, self.last_index)) - self.assertFalse(fl.is_within_bounds( - iso8601.parse_date('2008-07-20'), - self.first_index, self.last_index)) - self.assertFalse(fl.is_within_bounds( - iso8601.parse_date('2008-07-11'), - self.first_index, self.last_index)) + self.assertTrue( + fl.is_within_bounds( + iso8601.parse_date("2008-07-15"), self.first_index, self.last_index + ) + ) + self.assertFalse( + fl.is_within_bounds( + iso8601.parse_date("2008-07-20"), self.first_index, self.last_index + ) + ) + self.assertFalse( + fl.is_within_bounds( + iso8601.parse_date("2008-07-11"), self.first_index, self.last_index + ) + ) def test_lister_bitbucket(swh_listers, requests_mock_datadir): """Simple bitbucket listing should create scheduled tasks (git, hg) """ - lister = swh_listers['bitbucket'] + lister = swh_listers["bitbucket"] lister.run() - r = lister.scheduler.search_tasks(task_type='load-hg') + r = lister.scheduler.search_tasks(task_type="load-hg") assert len(r) == 9 for row in r: - args = row['arguments']['args'] - kwargs = row['arguments']['kwargs'] + args = row["arguments"]["args"] + kwargs = row["arguments"]["kwargs"] assert len(args) == 0 assert len(kwargs) == 1 - url = kwargs['url'] + url = kwargs["url"] - assert url.startswith('https://bitbucket.org') + assert url.startswith("https://bitbucket.org") - assert row['policy'] == 'recurring' - assert row['priority'] is None + assert row["policy"] == "recurring" + assert row["priority"] is None - r = lister.scheduler.search_tasks(task_type='load-git') + r = lister.scheduler.search_tasks(task_type="load-git") assert len(r) == 1 for row in r: - args = row['arguments']['args'] - kwargs = row['arguments']['kwargs'] + args = row["arguments"]["args"] + kwargs = row["arguments"]["kwargs"] 
assert len(args) == 0 assert len(kwargs) == 1 - url = kwargs['url'] + url = kwargs["url"] - assert url.startswith('https://bitbucket.org') + assert url.startswith("https://bitbucket.org") - assert row['policy'] == 'recurring' - assert row['priority'] is None + assert row["policy"] == "recurring" + assert row["priority"] is None diff --git a/swh/lister/bitbucket/tests/test_tasks.py b/swh/lister/bitbucket/tests/test_tasks.py index bd881ab..9441f09 100644 --- a/swh/lister/bitbucket/tests/test_tasks.py +++ b/swh/lister/bitbucket/tests/test_tasks.py @@ -1,89 +1,86 @@ from time import sleep from celery.result import GroupResult from unittest.mock import patch def test_ping(swh_app, celery_session_worker): - res = swh_app.send_task( - 'swh.lister.bitbucket.tasks.ping') + res = swh_app.send_task("swh.lister.bitbucket.tasks.ping") assert res res.wait() assert res.successful() - assert res.result == 'OK' + assert res.result == "OK" -@patch('swh.lister.bitbucket.tasks.BitBucketLister') +@patch("swh.lister.bitbucket.tasks.BitBucketLister") def test_incremental(lister, swh_app, celery_session_worker): # setup the mocked BitbucketLister lister.return_value = lister lister.db_last_index.return_value = 42 lister.run.return_value = None - res = swh_app.send_task( - 'swh.lister.bitbucket.tasks.IncrementalBitBucketLister') + res = swh_app.send_task("swh.lister.bitbucket.tasks.IncrementalBitBucketLister") assert res res.wait() assert res.successful() lister.assert_called_once_with() lister.db_last_index.assert_called_once_with() lister.run.assert_called_once_with(min_bound=42, max_bound=None) -@patch('swh.lister.bitbucket.tasks.BitBucketLister') +@patch("swh.lister.bitbucket.tasks.BitBucketLister") def test_range(lister, swh_app, celery_session_worker): # setup the mocked BitbucketLister lister.return_value = lister lister.run.return_value = None res = swh_app.send_task( - 'swh.lister.bitbucket.tasks.RangeBitBucketLister', - kwargs=dict(start=12, end=42)) + "swh.lister.bitbucket.tasks.RangeBitBucketLister", kwargs=dict(start=12, end=42) + ) assert res res.wait() assert res.successful() lister.assert_called_once_with() lister.db_last_index.assert_not_called() lister.run.assert_called_once_with(min_bound=12, max_bound=42) -@patch('swh.lister.bitbucket.tasks.BitBucketLister') +@patch("swh.lister.bitbucket.tasks.BitBucketLister") def test_relister(lister, swh_app, celery_session_worker): # setup the mocked BitbucketLister lister.return_value = lister lister.run.return_value = None - lister.db_partition_indices.return_value = [ - (i, i+9) for i in range(0, 50, 10)] + lister.db_partition_indices.return_value = [(i, i + 9) for i in range(0, 50, 10)] - res = swh_app.send_task( - 'swh.lister.bitbucket.tasks.FullBitBucketRelister') + res = swh_app.send_task("swh.lister.bitbucket.tasks.FullBitBucketRelister") assert res res.wait() assert res.successful() # retrieve the GroupResult for this task and wait for all the subtasks # to complete promise_id = res.result assert promise_id promise = GroupResult.restore(promise_id, app=swh_app) for i in range(5): if promise.ready(): break sleep(1) lister.assert_called_with() # one by the FullBitbucketRelister task # + 5 for the RangeBitbucketLister subtasks assert lister.call_count == 6 lister.db_last_index.assert_not_called() lister.db_partition_indices.assert_called_once_with(10000) # lister.run should have been called once per partition interval for i in range(5): - assert (dict(min_bound=10*i, max_bound=10*i + 9),) \ - in lister.run.call_args_list + assert ( + dict(min_bound=10 
* i, max_bound=10 * i + 9), + ) in lister.run.call_args_list diff --git a/swh/lister/cgit/__init__.py b/swh/lister/cgit/__init__.py index 00d5788..f5f9cf6 100644 --- a/swh/lister/cgit/__init__.py +++ b/swh/lister/cgit/__init__.py @@ -1,13 +1,14 @@ # Copyright (C) 2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information def register(): from .models import CGitModel from .lister import CGitLister - return {'models': [CGitModel], - 'lister': CGitLister, - 'task_modules': ['%s.tasks' % __name__], - } + return { + "models": [CGitModel], + "lister": CGitLister, + "task_modules": ["%s.tasks" % __name__], + } diff --git a/swh/lister/cgit/lister.py b/swh/lister/cgit/lister.py index bf37ea3..5d88edc 100644 --- a/swh/lister/cgit/lister.py +++ b/swh/lister/cgit/lister.py @@ -1,150 +1,149 @@ # Copyright (C) 2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import re import logging from urllib.parse import urlparse, urljoin from bs4 import BeautifulSoup from requests import Session from requests.adapters import HTTPAdapter from typing import Any, Dict, Generator, Optional from .models import CGitModel from swh.core.utils import grouper from swh.lister import USER_AGENT from swh.lister.core.lister_base import ListerBase logger = logging.getLogger(__name__) class CGitLister(ListerBase): """Lister class for CGit repositories. This lister will retrieve the list of published git repositories by parsing the HTML page(s) of the index retrieved at `url`. For each found git repository, a query is made at the given url found in this index to gather published "Clone" URLs to be used as origin URL for that git repo. If several "Clone" urls are provided, prefer the http/https one, if any, otherwise fall back to the first one. A loader task is created for each git repository:: Task: Type: load-git Policy: recurring Args: <url> Example:: Task: Type: load-git Policy: recurring Args: 'https://git.savannah.gnu.org/git/elisp-es.git' """ + MODEL = CGitModel - DEFAULT_URL = 'https://git.savannah.gnu.org/cgit/' - LISTER_NAME = 'cgit' + DEFAULT_URL = "https://git.savannah.gnu.org/cgit/" + LISTER_NAME = "cgit" url_prefix_present = True - def __init__(self, url=None, instance=None, - override_config=None): + def __init__(self, url=None, instance=None, override_config=None): """Lister class for CGit repositories. Args: url (str): main URL of the CGit instance, i.e. url of the index of published git repositories on this instance. instance (str): Name of cgit instance. Defaults to url's hostname if unset.
""" super().__init__(override_config=override_config) if url is None: - url = self.config.get('url', self.DEFAULT_URL) + url = self.config.get("url", self.DEFAULT_URL) self.url = url if not instance: instance = urlparse(url).hostname self.instance = instance self.session = Session() self.session.mount(self.url, HTTPAdapter(max_retries=3)) self.session.headers = { - 'User-Agent': USER_AGENT, + "User-Agent": USER_AGENT, } def run(self) -> Dict[str, str]: - status = 'uneventful' + status = "uneventful" total = 0 for repos in grouper(self.get_repos(), 10): - models = list(filter(None, (self.build_model(repo) - for repo in repos))) + models = list(filter(None, (self.build_model(repo) for repo in repos))) injected_repos = self.inject_repo_data_into_db(models) self.schedule_missing_tasks(models, injected_repos) self.db_session.commit() total += len(injected_repos) - logger.debug('Scheduled %s tasks for %s', total, self.url) - status = 'eventful' + logger.debug("Scheduled %s tasks for %s", total, self.url) + status = "eventful" - return {'status': status} + return {"status": status} def get_repos(self) -> Generator[str, None, None]: """Generate git 'project' URLs found on the current CGit server """ next_page = self.url while next_page: bs_idx = self.get_and_parse(next_page) - for tr in bs_idx.find( - 'div', {"class": "content"}).find_all( - "tr", {"class": ""}): - yield urljoin(self.url, tr.find('a')['href']) + for tr in bs_idx.find("div", {"class": "content"}).find_all( + "tr", {"class": ""} + ): + yield urljoin(self.url, tr.find("a")["href"]) try: - pager = bs_idx.find('ul', {'class': 'pager'}) - current_page = pager.find('a', {'class': 'current'}) + pager = bs_idx.find("ul", {"class": "pager"}) + current_page = pager.find("a", {"class": "current"}) if current_page: - next_page = current_page.parent.next_sibling.a['href'] + next_page = current_page.parent.next_sibling.a["href"] next_page = urljoin(self.url, next_page) except (AttributeError, KeyError): # no pager, or no next page next_page = None def build_model(self, repo_url: str) -> Optional[Dict[str, Any]]: """Given the URL of a git repo project page on a CGit server, return the repo description (dict) suitable for insertion in the db. 
""" bs = self.get_and_parse(repo_url) - urls = [x['href'] for x in bs.find_all('a', {'rel': 'vcs-git'})] + urls = [x["href"] for x in bs.find_all("a", {"rel": "vcs-git"})] if not urls: return None # look for the http/https url, if any, and use it as origin_url for url in urls: - if urlparse(url).scheme in ('http', 'https'): + if urlparse(url).scheme in ("http", "https"): origin_url = url break else: # otherwise, choose the first one origin_url = urls[0] - return {'uid': repo_url, - 'name': bs.find('a', title=re.compile('.+'))['title'], - 'origin_type': 'git', - 'instance': self.instance, - 'origin_url': origin_url, - } + return { + "uid": repo_url, + "name": bs.find("a", title=re.compile(".+"))["title"], + "origin_type": "git", + "instance": self.instance, + "origin_url": origin_url, + } def get_and_parse(self, url: str) -> BeautifulSoup: "Get the given url and parse the retrieved HTML using BeautifulSoup" - return BeautifulSoup(self.session.get(url).text, - features='html.parser') + return BeautifulSoup(self.session.get(url).text, features="html.parser") diff --git a/swh/lister/cgit/models.py b/swh/lister/cgit/models.py index be10161..61bc545 100644 --- a/swh/lister/cgit/models.py +++ b/swh/lister/cgit/models.py @@ -1,17 +1,18 @@ # Copyright (C) 2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from sqlalchemy import Column, String from ..core.models import ModelBase class CGitModel(ModelBase): """a CGit repository representation """ - __tablename__ = 'cgit_repo' + + __tablename__ = "cgit_repo" uid = Column(String, primary_key=True) instance = Column(String, index=True) diff --git a/swh/lister/cgit/tasks.py b/swh/lister/cgit/tasks.py index 2d60e36..2e41133 100644 --- a/swh/lister/cgit/tasks.py +++ b/swh/lister/cgit/tasks.py @@ -1,18 +1,18 @@ # Copyright (C) 2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from celery import shared_task from .lister import CGitLister -@shared_task(name=__name__ + '.CGitListerTask') +@shared_task(name=__name__ + ".CGitListerTask") def list_cgit(**lister_args): - '''Lister task for CGit instances''' + """Lister task for CGit instances""" return CGitLister(**lister_args).run() -@shared_task(name=__name__ + '.ping') +@shared_task(name=__name__ + ".ping") def _ping(): - return 'OK' + return "OK" diff --git a/swh/lister/cgit/tests/test_lister.py b/swh/lister/cgit/tests/test_lister.py index ca8ddd5..bc71e15 100644 --- a/swh/lister/cgit/tests/test_lister.py +++ b/swh/lister/cgit/tests/test_lister.py @@ -1,82 +1,82 @@ # Copyright (C) 2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.lister import __version__ def test_lister_no_page(requests_mock_datadir, swh_listers): - lister = swh_listers['cgit'] + lister = swh_listers["cgit"] - assert lister.url == 'https://git.savannah.gnu.org/cgit/' + assert lister.url == "https://git.savannah.gnu.org/cgit/" repos = list(lister.get_repos()) assert len(repos) == 977 - assert repos[0] == 'https://git.savannah.gnu.org/cgit/elisp-es.git/' + assert repos[0] == "https://git.savannah.gnu.org/cgit/elisp-es.git/" # note the url below is NOT a subpath of /cgit/ - assert repos[-1] == 'https://git.savannah.gnu.org/path/to/yetris.git/' # noqa + assert repos[-1] == 
"https://git.savannah.gnu.org/path/to/yetris.git/" # noqa # note the url below is NOT on the same server - assert repos[-2] == 'http://example.org/cgit/xstarcastle.git/' + assert repos[-2] == "http://example.org/cgit/xstarcastle.git/" def test_lister_model(requests_mock_datadir, swh_listers): - lister = swh_listers['cgit'] + lister = swh_listers["cgit"] repo = next(lister.get_repos()) model = lister.build_model(repo) assert model == { - 'uid': 'https://git.savannah.gnu.org/cgit/elisp-es.git/', - 'name': 'elisp-es.git', - 'origin_type': 'git', - 'instance': 'git.savannah.gnu.org', - 'origin_url': 'https://git.savannah.gnu.org/git/elisp-es.git' - } + "uid": "https://git.savannah.gnu.org/cgit/elisp-es.git/", + "name": "elisp-es.git", + "origin_type": "git", + "instance": "git.savannah.gnu.org", + "origin_url": "https://git.savannah.gnu.org/git/elisp-es.git", + } def test_lister_with_pages(requests_mock_datadir, swh_listers): - lister = swh_listers['cgit'] - lister.url = 'https://git.tizen/cgit/' + lister = swh_listers["cgit"] + lister.url = "https://git.tizen/cgit/" repos = list(lister.get_repos()) # we should have 16 repos (listed on 3 pages) assert len(repos) == 16 def test_lister_run(requests_mock_datadir, swh_listers): - lister = swh_listers['cgit'] - lister.url = 'https://git.tizen/cgit/' + lister = swh_listers["cgit"] + lister.url = "https://git.tizen/cgit/" lister.run() - r = lister.scheduler.search_tasks(task_type='load-git') + r = lister.scheduler.search_tasks(task_type="load-git") assert len(r) == 16 for row in r: - assert row['type'] == 'load-git' + assert row["type"] == "load-git" # arguments check - args = row['arguments']['args'] + args = row["arguments"]["args"] assert len(args) == 0 # kwargs - kwargs = row['arguments']['kwargs'] + kwargs = row["arguments"]["kwargs"] assert len(kwargs) == 1 - url = kwargs['url'] - assert url.startswith('https://git.tizen') + url = kwargs["url"] + assert url.startswith("https://git.tizen") - assert row['policy'] == 'recurring' - assert row['priority'] is None + assert row["policy"] == "recurring" + assert row["priority"] is None def test_lister_requests(requests_mock_datadir, swh_listers): - lister = swh_listers['cgit'] - lister.url = 'https://git.tizen/cgit/' + lister = swh_listers["cgit"] + lister.url = "https://git.tizen/cgit/" lister.run() assert len(requests_mock_datadir.request_history) != 0 for request in requests_mock_datadir.request_history: - assert 'User-Agent' in request.headers - user_agent = request.headers['User-Agent'] - assert 'Software Heritage Lister' in user_agent + assert "User-Agent" in request.headers + user_agent = request.headers["User-Agent"] + assert "Software Heritage Lister" in user_agent assert __version__ in user_agent diff --git a/swh/lister/cgit/tests/test_tasks.py b/swh/lister/cgit/tests/test_tasks.py index 38bf7b7..866bfde 100644 --- a/swh/lister/cgit/tests/test_tasks.py +++ b/swh/lister/cgit/tests/test_tasks.py @@ -1,30 +1,28 @@ from unittest.mock import patch def test_ping(swh_app, celery_session_worker): - res = swh_app.send_task( - 'swh.lister.cgit.tasks.ping') + res = swh_app.send_task("swh.lister.cgit.tasks.ping") assert res res.wait() assert res.successful() - assert res.result == 'OK' + assert res.result == "OK" -@patch('swh.lister.cgit.tasks.CGitLister') +@patch("swh.lister.cgit.tasks.CGitLister") def test_lister(lister, swh_app, celery_session_worker): # setup the mocked CGitLister lister.return_value = lister lister.run.return_value = None res = swh_app.send_task( - 
'swh.lister.cgit.tasks.CGitListerTask', - kwargs=dict(url='https://git.kernel.org/', instance='kernel')) + "swh.lister.cgit.tasks.CGitListerTask", + kwargs=dict(url="https://git.kernel.org/", instance="kernel"), + ) assert res res.wait() assert res.successful() - lister.assert_called_once_with( - url='https://git.kernel.org/', - instance='kernel') + lister.assert_called_once_with(url="https://git.kernel.org/", instance="kernel") lister.db_last_index.assert_not_called() lister.run.assert_called_once_with() diff --git a/swh/lister/cli.py b/swh/lister/cli.py index 365c36a..c725213 100644 --- a/swh/lister/cli.py +++ b/swh/lister/cli.py @@ -1,126 +1,145 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import logging from copy import deepcopy import click from sqlalchemy import create_engine from swh.core.cli import CONTEXT_SETTINGS from swh.lister import get_lister, SUPPORTED_LISTERS, LISTERS from swh.lister.core.models import initialize logger = logging.getLogger(__name__) # the key in this dict is the suffix used to match new task-type to be added. # For example, for a task whose function name is "list_gitlab_full", the default # value used when inserting a new task-type in the scheduler db will be the one # under the 'full' key below (because it matches xxx_full). DEFAULT_TASK_TYPE = { - 'full': { # for tasks like 'list_xxx_full()' - 'default_interval': '90 days', - 'min_interval': '90 days', - 'max_interval': '90 days', - 'backoff_factor': 1 - }, - '*': { # value if no suffix matches - 'default_interval': '1 day', - 'min_interval': '1 day', - 'max_interval': '1 day', - 'backoff_factor': 1 - }, - } - - -@click.group(name='lister', context_settings=CONTEXT_SETTINGS) -@click.option('--config-file', '-C', default=None, - type=click.Path(exists=True, dir_okay=False,), - help="Configuration file.") -@click.option('--db-url', '-d', default=None, - help='SQLAlchemy DB URL; see ' - '<http://docs.sqlalchemy.org/en/latest/core/engines.html#database-urls>') # noqa + "full": { # for tasks like 'list_xxx_full()' + "default_interval": "90 days", + "min_interval": "90 days", + "max_interval": "90 days", + "backoff_factor": 1, + }, + "*": { # value if no suffix matches + "default_interval": "1 day", + "min_interval": "1 day", + "max_interval": "1 day", + "backoff_factor": 1, + }, +} + + +@click.group(name="lister", context_settings=CONTEXT_SETTINGS) +@click.option( + "--config-file", + "-C", + default=None, + type=click.Path(exists=True, dir_okay=False,), + help="Configuration file.", +) +@click.option( + "--db-url", + "-d", + default=None, + help="SQLAlchemy DB URL; see " + "<http://docs.sqlalchemy.org/en/latest/core/engines.html#database-urls>", +) # noqa @click.pass_context def lister(ctx, config_file, db_url): - '''Software Heritage Lister tools.''' + """Software Heritage Lister tools.""" from swh.core import config + ctx.ensure_object(dict) if not config_file: - config_file = os.environ.get('SWH_CONFIG_FILENAME') + config_file = os.environ.get("SWH_CONFIG_FILENAME") conf = config.read(config_file) if db_url: - conf['lister'] = { - 'cls': 'local', - 'args': {'db': db_url} - } - ctx.obj['config'] = conf - - -@lister.command(name='db-init', context_settings=CONTEXT_SETTINGS) -@click.option('--drop-tables', '-D', is_flag=True, default=False, - help='Drop tables before creating the database schema') + conf["lister"] = {"cls": "local", "args": {"db": db_url}} + ctx.obj["config"] = conf + + +@lister.command(name="db-init",
context_settings=CONTEXT_SETTINGS) +@click.option( + "--drop-tables", + "-D", + is_flag=True, + default=False, + help="Drop tables before creating the database schema", +) @click.pass_context def db_init(ctx, drop_tables): """Initialize the database model for given listers. """ - cfg = ctx.obj['config'] - lister_cfg = cfg['lister'] - if lister_cfg['cls'] != 'local': - click.echo('A local lister configuration is required') + cfg = ctx.obj["config"] + lister_cfg = cfg["lister"] + if lister_cfg["cls"] != "local": + click.echo("A local lister configuration is required") ctx.exit(1) - db_url = lister_cfg['args']['db'] + db_url = lister_cfg["args"]["db"] db_engine = create_engine(db_url) registry = {} for lister, entrypoint in LISTERS.items(): - logger.info('Loading lister %s', lister) + logger.info("Loading lister %s", lister) registry[lister] = entrypoint.load()() - logger.info('Initializing database') + logger.info("Initializing database") initialize(db_engine, drop_tables) for lister, entrypoint in LISTERS.items(): registry_entry = registry[lister] - init_hook = registry_entry.get('init') + init_hook = registry_entry.get("init") if callable(init_hook): - logger.info('Calling init hook for %s', lister) + logger.info("Calling init hook for %s", lister) init_hook(db_engine) -@lister.command(name='run', context_settings=CONTEXT_SETTINGS, - help='Trigger a full listing run for a particular forge ' - 'instance. The output of this listing results in ' - '"oneshot" tasks in the scheduler db with a priority ' - 'defined by the user') -@click.option('--lister', '-l', help='Lister to run', - type=click.Choice(SUPPORTED_LISTERS)) -@click.option('--priority', '-p', default='high', - type=click.Choice(['high', 'medium', 'low']), - help='Task priority for the listed repositories to ingest') -@click.argument('options', nargs=-1) +@lister.command( + name="run", + context_settings=CONTEXT_SETTINGS, + help="Trigger a full listing run for a particular forge " + "instance. The output of this listing results in " + '"oneshot" tasks in the scheduler db with a priority ' + "defined by the user", +) +@click.option( + "--lister", "-l", help="Lister to run", type=click.Choice(SUPPORTED_LISTERS) +) +@click.option( + "--priority", + "-p", + default="high", + type=click.Choice(["high", "medium", "low"]), + help="Task priority for the listed repositories to ingest", +) +@click.argument("options", nargs=-1) @click.pass_context def run(ctx, lister, priority, options): from swh.scheduler.cli.utils import parse_options - config = deepcopy(ctx.obj['config']) + config = deepcopy(ctx.obj["config"]) if options: config.update(parse_options(options)[1]) - config['priority'] = priority - config['policy'] = 'oneshot' + config["priority"] = priority + config["policy"] = "oneshot" get_lister(lister, **config).run() -if __name__ == '__main__': +if __name__ == "__main__": lister() diff --git a/swh/lister/core/abstractattribute.py b/swh/lister/core/abstractattribute.py index fdb4219..01eb84a 100644 --- a/swh/lister/core/abstractattribute.py +++ b/swh/lister/core/abstractattribute.py @@ -1,27 +1,28 @@ # Copyright (C) 2017 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information class AbstractAttribute: """AbstractAttributes in a base class must be overridden by the subclass. It's like the :func:`abc.abstractmethod` decorator, but for things that are explicitly attributes/properties, not methods, without the need for empty method def boilerplate. 
Like abc.abstractmethod, the class containing AbstractAttributes must inherit from :class:`abc.ABC` or use the :class:`abc.ABCMeta` metaclass. Usage example:: import abc class ClassContainingAnAbstractAttribute(abc.ABC): foo: Union[AbstractAttribute, Any] = \ AbstractAttribute('docstring for foo') """ + __isabstractmethod__ = True def __init__(self, docstring=None): if docstring is not None: - self.__doc__ = 'AbstractAttribute: ' + docstring + self.__doc__ = "AbstractAttribute: " + docstring diff --git a/swh/lister/core/indexing_lister.py b/swh/lister/core/indexing_lister.py index d13933d..f7c6aa4 100644 --- a/swh/lister/core/indexing_lister.py +++ b/swh/lister/core/indexing_lister.py @@ -1,271 +1,277 @@ # Copyright (C) 2015-2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import logging from itertools import count import dateutil from sqlalchemy import func from .lister_transports import ListerHttpTransport from .lister_base import ListerBase from requests import Response from typing import Any, Dict, List, Tuple, Optional, Union from datetime import datetime logger = logging.getLogger(__name__) class IndexingLister(ListerBase): """Lister* intermediate class for any service that follows the pattern: - The service must report at least one stable unique identifier, known herein as the UID value, for every listed repository. - If the service splits the list of repositories into sublists, it must report at least one stable and sorted index identifier for every listed repository, known herein as the indexable value, which can be used as part of the service endpoint query to request a sublist beginning from that index. This might be the UID if the UID is monotonic. - Client sends a request to list repositories starting from a given index. - Client receives structured (json/xml/etc) response with information about a sequential series of repositories starting from that index and, if necessary/available, some indication of the URL or index for fetching the next series of repository data. See :class:`swh.lister.core.lister_base.ListerBase` for more details. This class cannot be instantiated. To create a new Lister for a source code listing service that follows the model described above, you must subclass this class and provide the required overrides in addition to any unmet implementation/override requirements of this class's base. (see parent class and member docstrings for details) Required Overrides:: def get_next_target_from_response """ + flush_packet_db = 20 """Number of iterations in-between write flushes of lister repositories to db (see fn:`run`). """ - default_min_bound = '' + default_min_bound = "" """Default initialization value for the minimum boundary index to use when undefined (see fn:`run`). """ @abc.abstractmethod def get_next_target_from_response( - self, response: Response + self, response: Response ) -> Union[Optional[datetime], Optional[str], Optional[int]]: """Find the next server endpoint identifier given the entire response. Implementation of this method depends on the server API spec and the shape of the network response object returned by the transport_request method. Args: response (transport response): response page from the server Returns: index of next page, possibly extracted from a next href url """ pass # You probably don't need to override anything below this line. 
def filter_before_inject( - self, models_list: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + self, models_list: List[Dict[str, Any]] + ) -> List[Dict[str, Any]]: """Overrides ListerBase.filter_before_inject Bounds query results by this Lister's set max_index. """ models_list = [ - m for m in models_list - if self.is_within_bounds(m['indexable'], None, self.max_index) + m + for m in models_list + if self.is_within_bounds(m["indexable"], None, self.max_index) ] return models_list def db_query_range(self, start, end): """Look in the db for a range of repositories with indexable values in the range [start, end] Args: start (model indexable type): start of desired indexable range end (model indexable type): end of desired indexable range Returns: a list of sqlalchemy.ext.declarative.declarative_base objects with indexable values within the given range """ retlist = self.db_session.query(self.MODEL) if start is not None: retlist = retlist.filter(self.MODEL.indexable >= start) if end is not None: retlist = retlist.filter(self.MODEL.indexable <= end) return retlist def db_partition_indices( - self, partition_size: int + self, partition_size: int ) -> List[Tuple[Optional[int], Optional[int]]]: """Describe an index-space compartmentalization of the db table in equal sized chunks. This is used to describe min&max bounds for parallelizing fetch tasks. Args: partition_size (int): desired size to make each partition Returns: a list of tuples (begin, end) of indexable value that declare approximately equal-sized ranges of existing repos """ n = max(self.db_num_entries(), 10) partition_size = min(partition_size, n) n_partitions = n // partition_size min_index = self.db_first_index() max_index = self.db_last_index() if min_index is None or max_index is None: # Nothing to list return [] if isinstance(min_index, str): + def format_bound(bound): return bound.isoformat() + min_index = dateutil.parser.parse(min_index) max_index = dateutil.parser.parse(max_index) elif isinstance(max_index - min_index, int): + def format_bound(bound): return int(bound) + else: + def format_bound(bound): return bound partition_width = (max_index - min_index) / n_partitions # Generate n_partitions + 1 bounds for n_partitions partitions bounds = [ format_bound(min_index + i * partition_width) for i in range(n_partitions + 1) ] # Trim duplicate bounds bounds.append(None) - bounds = [cur - for cur, next in zip(bounds[:-1], bounds[1:]) - if cur != next] + bounds = [cur for cur, next in zip(bounds[:-1], bounds[1:]) if cur != next] # Remove bounds for lowest and highest partition bounds[0] = bounds[-1] = None return list(zip(bounds[:-1], bounds[1:])) def db_first_index(self): """Look in the db for the smallest indexable value Returns: the smallest indexable value of all repos in the db """ t = self.db_session.query(func.min(self.MODEL.indexable)).first() if t: return t[0] return None def db_last_index(self): """Look in the db for the largest indexable value Returns: the largest indexable value of all repos in the db """ t = self.db_session.query(func.max(self.MODEL.indexable)).first() if t: return t[0] return None def disable_deleted_repo_tasks(self, start, end, keep_these): """Disable tasks for repos that no longer exist between start and end.
Args: start: beginning of range to disable end: end of range to disable keep_these (uid list): do not disable repos with uids in this list """ if end is None: end = self.db_last_index() if not self.is_within_bounds(end, None, self.max_index): end = self.max_index deleted_repos = self.winnow_models( self.db_query_range(start, end), self.MODEL.uid, keep_these ) - tasks_to_disable = [repo.task_id for repo in deleted_repos - if repo.task_id is not None] + tasks_to_disable = [ + repo.task_id for repo in deleted_repos if repo.task_id is not None + ] if tasks_to_disable: self.scheduler.disable_tasks(tasks_to_disable) for repo in deleted_repos: repo.task_id = None def run(self, min_bound=None, max_bound=None): """Main entry function. Sequentially fetches repository data from the service according to the basic outline in the class docstring, continually fetching sublists until either there is no next index reference given or the given next index is greater than the desired max_bound. Args: min_bound (indexable type): optional index to start from max_bound (indexable type): optional index to stop at Returns: nothing """ - status = 'uneventful' + status = "uneventful" self.min_index = min_bound self.max_index = max_bound def ingest_indexes(): index = min_bound or self.default_min_bound for i in count(1): response, injected_repos = self.ingest_data(index) if not response and not injected_repos: - logger.info('No response from api server, stopping') + logger.info("No response from api server, stopping") return next_index = self.get_next_target_from_response(response) # Determine if any repos were deleted, and disable their tasks. keep_these = list(injected_repos.keys()) self.disable_deleted_repo_tasks(index, next_index, keep_these) # termination condition if next_index is None or next_index == index: - logger.info('stopping after index %s, no next link found', - index) + logger.info("stopping after index %s, no next link found", index) return index = next_index - logger.debug('Index: %s', index) + logger.debug("Index: %s", index) yield i for i in ingest_indexes(): if (i % self.flush_packet_db) == 0: - logger.debug('Flushing updates at index %s', i) + logger.debug("Flushing updates at index %s", i) self.db_session.commit() self.db_session = self.mk_session() - status = 'eventful' + status = "eventful" self.db_session.commit() self.db_session = self.mk_session() - return {'status': status} + return {"status": status} class IndexingHttpLister(ListerHttpTransport, IndexingLister): """Convenience class for ensuring right lookup and init order when combining IndexingLister and ListerHttpTransport.""" def __init__(self, url=None, override_config=None): IndexingLister.__init__(self, override_config=override_config) ListerHttpTransport.__init__(self, url=url) diff --git a/swh/lister/core/lister_base.py b/swh/lister/core/lister_base.py index 5c02642..ff6e086 100644 --- a/swh/lister/core/lister_base.py +++ b/swh/lister/core/lister_base.py @@ -1,535 +1,521 @@ # Copyright (C) 2015-2020 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import datetime import gzip import json import logging import os import re import time from sqlalchemy import create_engine, func from sqlalchemy.orm import sessionmaker from typing import Any, Dict, List, Type, Union, Optional from swh.core import config from swh.core.utils import grouper from swh.scheduler import get_scheduler, utils from .abstractattribute import 
AbstractAttribute from requests import Response logger = logging.getLogger(__name__) def utcnow(): return datetime.datetime.now(tz=datetime.timezone.utc) class FetchError(RuntimeError): def __init__(self, response): self.response = response def __str__(self): return repr(self.response) class ListerBase(abc.ABC, config.SWHConfig): """Lister core base class. Generally a source code hosting service provides an API endpoint for listing the set of stored repositories. A Lister is the discovery service responsible for finding this list, all at once or sequentially by parts, and queueing local tasks to fetch and ingest the referenced repositories. The core method in this class is ingest_data. Any subclasses should be calling this method one or more times to fetch and ingest data from API endpoints. See swh.lister.core.lister_base.IndexingLister for example usage. This class cannot be instantiated. Any instantiable Lister descending from ListerBase must provide at least the required overrides. (see member docstrings for details): Required Overrides: MODEL def transport_request def transport_response_to_string def transport_response_simplified def transport_quota_check Optional Overrides: def filter_before_inject def is_within_bounds """ MODEL = AbstractAttribute( - 'Subclass type (not instance) of swh.lister.core.models.ModelBase ' - 'customized for a specific service.' + "Subclass type (not instance) of swh.lister.core.models.ModelBase " + "customized for a specific service." ) # type: Union[AbstractAttribute, Type[Any]] LISTER_NAME = AbstractAttribute( - "Lister's name") # type: Union[AbstractAttribute, str] + "Lister's name" + ) # type: Union[AbstractAttribute, str] def transport_request(self, identifier): """Given a target endpoint identifier to query, try once to request it. Implementation of this method determines the network request protocol. Args: identifier (string): unique identifier for an endpoint query. e.g. If the service indexes lists of repositories by date and time of creation, this might be that as a formatted string. Or it might be an integer UID. Or it might be nothing. It depends on what the service needs. Returns: the entire request response Raises: Will catch internal transport-dependent connection exceptions and raise swh.lister.core.lister_base.FetchError instead. Other non-connection exceptions should propagate unchanged. """ pass def transport_response_to_string(self, response): """Convert the server response into a formatted string for logging. Implementation of this method depends on the shape of the network response object returned by the transport_request method. Args: response: the server response Returns: a pretty string of the response """ pass def transport_response_simplified(self, response): """Convert the server response into list of a dict for each repo in the response, mapping columns in the lister's MODEL class to repo data. Implementation of this method depends on the server API spec and the shape of the network response object returned by the transport_request method. Args: response: response object from the server. Returns: list of repo MODEL dicts ( eg. [{'uid': r['id'], etc.} for r in response.json()] ) """ pass def transport_quota_check(self, response): """Check server response to see if we're hitting request rate limits. Implementation of this method depends on the server communication protocol and API spec and the shape of the network response object returned by the transport_request method. 
Args: response (session response): complete API query response Returns: 1) must retry request? True/False 2) seconds to delay if True """ pass - def filter_before_inject( - self, models_list: List[Dict]) -> List[Dict]: + def filter_before_inject(self, models_list: List[Dict]) -> List[Dict]: """Filter models_list entries prior to injection in the db. This is run directly after `transport_response_simplified`. Default implementation is to have no filtering. Args: models_list: list of dicts returned by transport_response_simplified. Returns: models_list with entries changed according to custom logic. """ return models_list - def do_additional_checks( - self, models_list: List[Dict]) -> List[Dict]: + def do_additional_checks(self, models_list: List[Dict]) -> List[Dict]: """Execute some additional checks on the model list (after the filtering). Default implementation is to run no check at all and to return the input as is. Args: models_list: list of dicts returned by transport_response_simplified. Returns: models_list if the checks pass, False otherwise """ return models_list def is_within_bounds( - self, inner: int, - lower: Optional[int] = None, upper: Optional[int] = None) -> bool: + self, inner: int, lower: Optional[int] = None, upper: Optional[int] = None + ) -> bool: """See if a sortable value is inside the range [lower,upper]. MAY BE OVERRIDDEN, for example if the server indexable* key is technically sortable but not automatically so. * - ( see: swh.lister.core.indexing_lister.IndexingLister ) Args: inner (sortable type): the value being checked lower (sortable type): optional lower bound upper (sortable type): optional upper bound Returns: whether inner is confined by the optional lower and upper bounds """ try: if lower is None and upper is None: return True elif lower is None: ret = inner <= upper # type: ignore elif upper is None: ret = inner >= lower else: ret = lower <= inner <= upper self.string_pattern_check(inner, lower, upper) except Exception as e: - logger.error(str(e) + ': %s, %s, %s' % - (('inner=%s%s' % (type(inner), inner)), - ('lower=%s%s' % (type(lower), lower)), - ('upper=%s%s' % (type(upper), upper))) - ) + logger.error( + str(e) + + ": %s, %s, %s" + % ( + ("inner=%s%s" % (type(inner), inner)), + ("lower=%s%s" % (type(lower), lower)), + ("upper=%s%s" % (type(upper), upper)), + ) + ) raise return ret # You probably don't need to override anything below this line.
DEFAULT_CONFIG = { - 'scheduler': ('dict', { - 'cls': 'remote', - 'args': { - 'url': 'http://localhost:5008/' - }, - }), - 'lister': ('dict', { - 'cls': 'local', - 'args': { - 'db': 'postgresql:///lister', - }, - }), + "scheduler": ( + "dict", + {"cls": "remote", "args": {"url": "http://localhost:5008/"},}, + ), + "lister": ("dict", {"cls": "local", "args": {"db": "postgresql:///lister",},}), } @property def CONFIG_BASE_FILENAME(self): # noqa: N802 - return 'lister_%s' % self.LISTER_NAME + return "lister_%s" % self.LISTER_NAME @property def ADDITIONAL_CONFIG(self): # noqa: N802 return { - 'credentials': ('dict', {}), - 'cache_responses': ('bool', False), - 'cache_dir': ('str', '~/.cache/swh/lister/%s' % self.LISTER_NAME), + "credentials": ("dict", {}), + "cache_responses": ("bool", False), + "cache_dir": ("str", "~/.cache/swh/lister/%s" % self.LISTER_NAME), } INITIAL_BACKOFF = 10 MAX_RETRIES = 7 CONN_SLEEP = 10 def __init__(self, override_config=None): self.backoff = self.INITIAL_BACKOFF - logger.debug('Loading config from %s' % self.CONFIG_BASE_FILENAME) + logger.debug("Loading config from %s" % self.CONFIG_BASE_FILENAME) self.config = self.parse_config_file( base_filename=self.CONFIG_BASE_FILENAME, - additional_configs=[self.ADDITIONAL_CONFIG] + additional_configs=[self.ADDITIONAL_CONFIG], ) - self.config['cache_dir'] = os.path.expanduser(self.config['cache_dir']) - if self.config['cache_responses']: - config.prepare_folders(self.config, 'cache_dir') + self.config["cache_dir"] = os.path.expanduser(self.config["cache_dir"]) + if self.config["cache_responses"]: + config.prepare_folders(self.config, "cache_dir") if override_config: self.config.update(override_config) - logger.debug('%s CONFIG=%s' % (self, self.config)) - self.scheduler = get_scheduler(**self.config['scheduler']) - self.db_engine = create_engine(self.config['lister']['args']['db']) + logger.debug("%s CONFIG=%s" % (self, self.config)) + self.scheduler = get_scheduler(**self.config["scheduler"]) + self.db_engine = create_engine(self.config["lister"]["args"]["db"]) self.mk_session = sessionmaker(bind=self.db_engine) self.db_session = self.mk_session() def reset_backoff(self): """Reset exponential backoff timeout to initial level.""" self.backoff = self.INITIAL_BACKOFF def back_off(self) -> int: """Get next exponential backoff timeout.""" ret = self.backoff self.backoff *= 10 return ret def safely_issue_request(self, identifier: int) -> Optional[Response]: """Make network request with retries, rate quotas, and response logs. Protocol is handled by the implementation of the transport_request method. 
Args: identifier: resource identifier Returns: server response """ retries_left = self.MAX_RETRIES - do_cache = self.config['cache_responses'] + do_cache = self.config["cache_responses"] r = None while retries_left > 0: try: r = self.transport_request(identifier) except FetchError: # network-level connection error, try again logger.warning( - 'connection error on %s: sleep for %d seconds' % - (identifier, self.CONN_SLEEP)) + "connection error on %s: sleep for %d seconds" + % (identifier, self.CONN_SLEEP) + ) time.sleep(self.CONN_SLEEP) retries_left -= 1 continue if do_cache: self.save_response(r) # detect throttling must_retry, delay = self.transport_quota_check(r) if must_retry: logger.warning( - 'rate limited on %s: sleep for %f seconds' % - (identifier, delay)) + "rate limited on %s: sleep for %f seconds" % (identifier, delay) + ) time.sleep(delay) else: # request ok break retries_left -= 1 if not retries_left: - logger.warning( - 'giving up on %s: max retries exceeded' % identifier) + logger.warning("giving up on %s: max retries exceeded" % identifier) return r def db_query_equal(self, key: Any, value: Any): """Look in the db for a row with key == value Args: key: column key to look at value: value to look for in that column Returns: sqlalchemy.ext.declarative.declarative_base object with the given key == value """ if isinstance(key, str): key = self.MODEL.__dict__[key] - return self.db_session.query(self.MODEL) \ - .filter(key == value).first() + return self.db_session.query(self.MODEL).filter(key == value).first() def winnow_models(self, mlist, key, to_remove): """Given a list of models, remove any whose `key` column matches some member of a list of values. Args: mlist (list of model rows): the initial list of models key (column): the column to filter on to_remove (list): if anything in mlist has column equal to one of the values in to_remove, it will be removed from the result Returns: A list of model rows starting from mlist minus any matching rows """ if isinstance(key, str): key = self.MODEL.__dict__[key] if to_remove: return mlist.filter(~key.in_(to_remove)).all() else: return mlist.all() def db_num_entries(self): """Return the known number of entries in the lister db""" - return self.db_session.query(func.count('*')).select_from(self.MODEL) \ - .scalar() + return self.db_session.query(func.count("*")).select_from(self.MODEL).scalar() def db_inject_repo(self, model_dict): """Add/update a new repo to the db and mark it last_seen now. 
Args: model_dict: dictionary mapping model keys to values Returns: new or updated sqlalchemy.ext.declarative.declarative_base object associated with the injection """ - sql_repo = self.db_query_equal('uid', model_dict['uid']) + sql_repo = self.db_query_equal("uid", model_dict["uid"]) if not sql_repo: sql_repo = self.MODEL(**model_dict) self.db_session.add(sql_repo) else: for k in model_dict: setattr(sql_repo, k, model_dict[k]) sql_repo.last_seen = utcnow() return sql_repo - def task_dict(self, origin_type: str, - origin_url: str, **kwargs) -> Dict[str, Any]: + def task_dict(self, origin_type: str, origin_url: str, **kwargs) -> Dict[str, Any]: """Return special dict format for the tasks list Args: origin_type (string) origin_url (string) Returns: the same information in a different form """ - logger.debug('origin-url: %s, type: %s', origin_url, origin_type) - _type = 'load-%s' % origin_type - _policy = kwargs.get('policy', 'recurring') - priority = kwargs.get('priority') - kw = {'priority': priority} if priority else {} + logger.debug("origin-url: %s, type: %s", origin_url, origin_type) + _type = "load-%s" % origin_type + _policy = kwargs.get("policy", "recurring") + priority = kwargs.get("priority") + kw = {"priority": priority} if priority else {} return utils.create_task_dict(_type, _policy, url=origin_url, **kw) def string_pattern_check(self, a, b, c=None): """When comparing indexable types in is_within_bounds, complex strings may not be allowed to differ in basic structure. If they do, it could be a sign of not understanding the data well. For instance, an ISO 8601 time string cannot be compared against its urlencoded equivalent, but this is an easy mistake to accidentally make. This method acts as a friendly sanity check. Args: a (string): inner component of the is_within_bounds method b (string): lower component of the is_within_bounds method c (string): upper component of the is_within_bounds method Returns: nothing Raises: TypeError if strings a, b, and c don't conform to the same basic pattern. """ if isinstance(a, str): - a_pattern = re.sub('[a-zA-Z0-9]', - '[a-zA-Z0-9]', - re.escape(a)) - if (isinstance(b, str) and (re.match(a_pattern, b) is None) - or isinstance(c, str) and - (re.match(a_pattern, c) is None)): + a_pattern = re.sub("[a-zA-Z0-9]", "[a-zA-Z0-9]", re.escape(a)) + if ( + isinstance(b, str) + and (re.match(a_pattern, b) is None) + or isinstance(c, str) + and (re.match(a_pattern, c) is None) + ): logger.debug(a_pattern) - raise TypeError('incomparable string patterns detected') + raise TypeError("incomparable string patterns detected") def inject_repo_data_into_db(self, models_list: List[Dict]) -> Dict: """Inject data into the db. Args: models_list: list of dicts mapping keys from the db model for each repo to be injected Returns: dict of uid:sql_repo pairs """ injected_repos = {} for m in models_list: - injected_repos[m['uid']] = self.db_inject_repo(m) + injected_repos[m["uid"]] = self.db_inject_repo(m) return injected_repos def schedule_missing_tasks( - self, models_list: List[Dict], injected_repos: Dict) -> None: + self, models_list: List[Dict], injected_repos: Dict + ) -> None: """Schedule any newly created db entries that have not been scheduled yet. Args: models_list: List of dicts mapping keys in the db model for each repo injected_repos: Dict of uid:sql_repo pairs that have just been created Returns: Nothing. (Note that it modifies injected_repos to set the new task_id.) 
""" tasks = {} def _task_key(m): - return '%s-%s' % ( - m['type'], - json.dumps(m['arguments'], sort_keys=True) - ) + return "%s-%s" % (m["type"], json.dumps(m["arguments"], sort_keys=True)) for m in models_list: - ir = injected_repos[m['uid']] + ir = injected_repos[m["uid"]] if not ir.task_id: # Patching the model instance to add the policy/priority task # scheduling - if 'policy' in self.config: - m['policy'] = self.config['policy'] - if 'priority' in self.config: - m['priority'] = self.config['priority'] + if "policy" in self.config: + m["policy"] = self.config["policy"] + if "priority" in self.config: + m["priority"] = self.config["priority"] task_dict = self.task_dict(**m) tasks[_task_key(task_dict)] = (ir, m, task_dict) gen_tasks = (task_dicts for (_, _, task_dicts) in tasks.values()) for grouped_tasks in grouper(gen_tasks, n=1000): new_tasks = self.scheduler.create_tasks(list(grouped_tasks)) for task in new_tasks: ir, m, _ = tasks[_task_key(task)] - ir.task_id = task['id'] + ir.task_id = task["id"] def ingest_data(self, identifier: int, checks: bool = False): """The core data fetch sequence. Request server endpoint. Simplify and filter response list of repositories. Inject repo information into local db. Queue loader tasks for linked repositories. Args: identifier: Resource identifier. checks (bool): Additional checks required """ # Request (partial?) list of repositories info response = self.safely_issue_request(identifier) if not response: return response, [] models_list = self.transport_response_simplified(response) models_list = self.filter_before_inject(models_list) if checks: models_list = self.do_additional_checks(models_list) if not models_list: return response, [] # inject into local db injected = self.inject_repo_data_into_db(models_list) # queue workers self.schedule_missing_tasks(models_list, injected) return response, injected def save_response(self, response): """Log the response from a server request to a cache dir. Args: response: full server response cache_dir: system path for cache dir Returns: nothing """ datepath = utcnow().isoformat() - fname = os.path.join( - self.config['cache_dir'], - datepath + '.gz', - ) + fname = os.path.join(self.config["cache_dir"], datepath + ".gz",) - with gzip.open(fname, 'w') as f: - f.write(bytes( - self.transport_response_to_string(response), - 'UTF-8' - )) + with gzip.open(fname, "w") as f: + f.write(bytes(self.transport_response_to_string(response), "UTF-8")) diff --git a/swh/lister/core/lister_transports.py b/swh/lister/core/lister_transports.py index f4d6920..a1027ac 100644 --- a/swh/lister/core/lister_transports.py +++ b/swh/lister/core/lister_transports.py @@ -1,235 +1,235 @@ # Copyright (C) 2017-2018 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import random from datetime import datetime from email.utils import parsedate from pprint import pformat import logging import requests import xmltodict from typing import Optional, Union, Dict, Any, List from requests import Response from swh.lister import USER_AGENT_TEMPLATE, __version__ from .abstractattribute import AbstractAttribute from .lister_base import FetchError logger = logging.getLogger(__name__) class ListerHttpTransport(abc.ABC): """Use the Requests library for making Lister endpoint requests. To be used in conjunction with ListerBase or a subclass of it. 
""" + DEFAULT_URL = None # type: Optional[str] - PATH_TEMPLATE = \ - AbstractAttribute( - 'string containing a python string format pattern that produces' - ' the API endpoint path for listing stored repositories when given' - ' an index, e.g., "/repositories?after=%s". To be implemented in' - ' the API-specific class inheriting this.' - ) # type: Union[AbstractAttribute, Optional[str]] + PATH_TEMPLATE = AbstractAttribute( + "string containing a python string format pattern that produces" + " the API endpoint path for listing stored repositories when given" + ' an index, e.g., "/repositories?after=%s". To be implemented in' + " the API-specific class inheriting this." + ) # type: Union[AbstractAttribute, Optional[str]] EXPECTED_STATUS_CODES = (200, 429, 403, 404) def request_headers(self) -> Dict[str, Any]: """Returns dictionary of any request headers needed by the server. MAY BE OVERRIDDEN if request headers are needed. """ - return { - 'User-Agent': USER_AGENT_TEMPLATE % self.lister_version - } + return {"User-Agent": USER_AGENT_TEMPLATE % self.lister_version} def request_instance_credentials(self) -> List[Dict[str, Any]]: """Returns dictionary of any credentials configuration needed by the forge instance to list. The 'credentials' configuration is expected to be a dict of multiple levels. The first level is the lister's name, the second is the lister's instance name, which value is expected to be a list of credential structures (typically a couple username/password). For example:: credentials: github: # github lister github: # has only one instance (so far) - username: some password: somekey - username: one password: onekey - ... gitlab: # gitlab lister riseup: # has many instances - username: someone password: ... - ... gitlab: - username: someone password: ... - ... Returns: list of credential dicts for the current lister. """ - all_creds = self.config.get('credentials') # type: ignore + all_creds = self.config.get("credentials") # type: ignore if not all_creds: return [] lister_creds = all_creds.get(self.LISTER_NAME, {}) # type: ignore creds = lister_creds.get(self.instance, []) # type: ignore return creds def request_uri(self, identifier: str) -> str: """Get the full request URI given the transport_request identifier. MAY BE OVERRIDDEN if something more complex than the PATH_TEMPLATE is required. """ path = self.PATH_TEMPLATE % identifier # type: ignore return self.url + path def request_params(self, identifier: str) -> Dict[str, Any]: """Get the full parameters passed to requests given the transport_request identifier. This uses credentials if any are provided (see request_instance_credentials). MAY BE OVERRIDDEN if something more complex than the request headers is needed. """ params = {} - params['headers'] = self.request_headers() or {} + params["headers"] = self.request_headers() or {} creds = self.request_instance_credentials() if not creds: return params auth = random.choice(creds) if creds else None if auth: - params['auth'] = (auth['username'], # type: ignore - auth['password']) + params["auth"] = ( + auth["username"], # type: ignore + auth["password"], + ) return params def transport_quota_check(self, response): """Implements ListerBase.transport_quota_check with standard 429 code check for HTTP with Requests library. MAY BE OVERRIDDEN if the server notifies about rate limits in a non-standard way that doesn't use HTTP 429 and the Retry-After response header. 
( https://tools.ietf.org/html/rfc6585#section-4 ) """ if response.status_code == 429: # HTTP too many requests - retry_after = response.headers.get('Retry-After', self.back_off()) + retry_after = response.headers.get("Retry-After", self.back_off()) try: # might be seconds return True, float(retry_after) except Exception: # might be http-date at_date = datetime(*parsedate(retry_after)[:6]) from_now = (at_date - datetime.today()).total_seconds() + 5 return True, max(0, from_now) else: # response ok self.reset_backoff() return False, 0 def __init__(self, url=None): if not url: - url = self.config.get('url') + url = self.config.get("url") if not url: url = self.DEFAULT_URL if not url: - raise NameError('HTTP Lister Transport requires an url.') + raise NameError("HTTP Lister Transport requires an url.") self.url = url # eg. 'https://api.github.com' self.session = requests.Session() self.lister_version = __version__ - def _transport_action( - self, identifier: str, method: str = 'get') -> Response: + def _transport_action(self, identifier: str, method: str = "get") -> Response: """Permit asking the API for information prior to actually executing the query. """ path = self.request_uri(identifier) params = self.request_params(identifier) - logger.debug('path: %s', path) - logger.debug('params: %s', params) - logger.debug('method: %s', method) + logger.debug("path: %s", path) + logger.debug("params: %s", params) + logger.debug("method: %s", method) try: - if method == 'head': + if method == "head": response = self.session.head(path, **params) else: response = self.session.get(path, **params) except requests.exceptions.ConnectionError as e: - logger.warning('Failed to fetch %s: %s', path, e) + logger.warning("Failed to fetch %s: %s", path, e) raise FetchError(e) else: if response.status_code not in self.EXPECTED_STATUS_CODES: raise FetchError(response) return response def transport_head(self, identifier: str) -> Response: """Retrieve HEAD information from the API. """ - return self._transport_action(identifier, method='head') + return self._transport_action(identifier, method="head") def transport_request(self, identifier: str) -> Response: """Implements ListerBase.transport_request for HTTP using Requests. Retrieve GET information from the API. """ return self._transport_action(identifier) def transport_response_to_string(self, response: Response) -> str: """Implements ListerBase.transport_response_to_string for HTTP given Requests responses. """ s = pformat(response.request.path_url) - s += '\n#\n' + pformat(response.request.headers) - s += '\n#\n' + pformat(response.status_code) - s += '\n#\n' + pformat(response.headers) - s += '\n#\n' + s += "\n#\n" + pformat(response.request.headers) + s += "\n#\n" + pformat(response.status_code) + s += "\n#\n" + pformat(response.headers) + s += "\n#\n" try: # json? s += pformat(response.json()) except Exception: # not json try: # xml? s += pformat(xmltodict.parse(response.text)) except Exception: # not xml s += pformat(response.text) return s class ListerOnePageApiTransport(ListerHttpTransport): """Leverage requests library to retrieve basic html page and parse result. To be used in conjunction with ListerBase or a subclass of it. 
""" + PAGE = AbstractAttribute( - "URL of the API's unique page to retrieve and parse " - "for information") # type: Union[AbstractAttribute, str] + "URL of the API's unique page to retrieve and parse " "for information" + ) # type: Union[AbstractAttribute, str] PATH_TEMPLATE = None # we do not use it def __init__(self, url=None): self.session = requests.Session() self.lister_version = __version__ def request_uri(self, _): """Get the full request URI given the transport_request identifier. """ return self.PAGE diff --git a/swh/lister/core/models.py b/swh/lister/core/models.py index 27eb080..7e87d78 100644 --- a/swh/lister/core/models.py +++ b/swh/lister/core/models.py @@ -1,79 +1,78 @@ # Copyright (C) 2015-2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc from datetime import datetime import logging from sqlalchemy import Column, DateTime, Integer, String from sqlalchemy.ext.declarative import DeclarativeMeta, declarative_base from typing import Type, Union from .abstractattribute import AbstractAttribute SQLBase = declarative_base() logger = logging.getLogger(__name__) class ABCSQLMeta(abc.ABCMeta, DeclarativeMeta): pass class ModelBase(SQLBase, metaclass=ABCSQLMeta): """a common repository""" + __abstract__ = True - __tablename__ = \ - AbstractAttribute # type: Union[Type[AbstractAttribute], str] + __tablename__ = AbstractAttribute # type: Union[Type[AbstractAttribute], str] uid = AbstractAttribute( - 'Column(, primary_key=True)' + "Column(, primary_key=True)" ) # type: Union[AbstractAttribute, Column] name = Column(String, index=True) full_name = Column(String, index=True) html_url = Column(String) origin_url = Column(String) origin_type = Column(String) last_seen = Column(DateTime, nullable=False) task_id = Column(Integer) def __init__(self, **kw): - kw['last_seen'] = datetime.now() + kw["last_seen"] = datetime.now() super().__init__(**kw) class IndexingModelBase(ModelBase, metaclass=ABCSQLMeta): __abstract__ = True - __tablename__ = \ - AbstractAttribute # type: Union[Type[AbstractAttribute], str] + __tablename__ = AbstractAttribute # type: Union[Type[AbstractAttribute], str] # The value used for sorting, segmenting, or api query paging, # because uids aren't always sequential. indexable = AbstractAttribute( - 'Column(, index=True)' + "Column(, index=True)" ) # type: Union[AbstractAttribute, Column] def initialize(db_engine, drop_tables=False, **kwargs): """Default database initialization function for a lister. Typically called from the lister's initialization hook. Args: models (list): list of SQLAlchemy tables/models to drop/create. db_engine (): the SQLAlchemy DB engine. drop_tables (bool): if True, tables will be dropped before (re)creating them. 
""" if drop_tables: - logger.info('Dropping tables') + logger.info("Dropping tables") SQLBase.metadata.drop_all(db_engine, checkfirst=True) - logger.info('Creating tables') + logger.info("Creating tables") SQLBase.metadata.create_all(db_engine, checkfirst=True) diff --git a/swh/lister/core/page_by_page_lister.py b/swh/lister/core/page_by_page_lister.py index 8bcce45..1f38a2a 100644 --- a/swh/lister/core/page_by_page_lister.py +++ b/swh/lister/core/page_by_page_lister.py @@ -1,164 +1,164 @@ # Copyright (C) 2015-2018 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import logging from .lister_transports import ListerHttpTransport from .lister_base import ListerBase class PageByPageLister(ListerBase): """Lister* intermediate class for any service that follows the simple pagination page pattern. - Client sends a request to list repositories starting from a given page identifier. - Client receives structured (json/xml/etc) response with information about a sequential series of repositories (per page) starting from a given index. And, if available, some indication of the next page index for fetching the remaining repository data. See :class:`swh.lister.core.lister_base.ListerBase` for more details. This class cannot be instantiated. To create a new Lister for a source code listing service that follows the model described above, you must subclass this class. Then provide the required overrides in addition to any unmet implementation/override requirements of this class's base (see parent class and member docstrings for details). Required Overrides:: def get_next_target_from_response """ + @abc.abstractmethod def get_next_target_from_response(self, response): """Find the next server endpoint page given the entire response. Implementation of this method depends on the server API spec and the shape of the network response object returned by the transport_request method. For example, some api can use the headers links to provide the next page. Args: response (transport response): response page from the server Returns: index of next page, possibly extracted from a next href url """ pass @abc.abstractmethod def get_pages_information(self): """Find the total number of pages. Implementation of this method depends on the server API spec and the shape of the network response object returned by the transport_request method. For example, some api can use dedicated headers: - x-total-pages to provide the total number of pages - x-total to provide the total number of repositories - x-per-page to provide the number of elements per page Returns: tuple (total number of repositories, total number of pages, per_page) """ pass # You probably don't need to override anything below this line. def do_additional_checks(self, models_list): """Potentially check for existence of repositories in models_list. This will be called only if check_existence is flipped on in the run method below. """ for m in models_list: - sql_repo = self.db_query_equal('uid', m['uid']) + sql_repo = self.db_query_equal("uid", m["uid"]) if sql_repo: return False return models_list def run(self, min_bound=None, max_bound=None, check_existence=False): """Main entry function. Sequentially fetches repository data from the service according to the basic outline in the class docstring. Continually fetching sublists until either there is no next page reference given or the given next page is greater than the desired max_page. 
Args: min_bound: optional page to start from max_bound: optional page to stop at check_existence (bool): optional existence check (for incremental lister whose sort order is inverted) Returns: nothing """ - status = 'uneventful' + status = "uneventful" page = min_bound or 0 loop_count = 0 self.min_page = min_bound self.max_page = max_bound while self.is_within_bounds(page, self.min_page, self.max_page): - logging.info('listing repos starting at %s' % page) + logging.info("listing repos starting at %s" % page) - response, injected_repos = self.ingest_data(page, - checks=check_existence) + response, injected_repos = self.ingest_data(page, checks=check_existence) if not response and not injected_repos: - logging.info('No response from api server, stopping') + logging.info("No response from api server, stopping") break elif not injected_repos: - logging.info('Repositories already seen, stopping') + logging.info("Repositories already seen, stopping") break - status = 'eventful' + status = "eventful" next_page = self.get_next_target_from_response(response) # termination condition if (next_page is None) or (next_page == page): - logging.info('stopping after page %s, no next link found' % - page) + logging.info("stopping after page %s, no next link found" % page) break else: page = next_page loop_count += 1 if loop_count == 20: - logging.info('flushing updates') + logging.info("flushing updates") loop_count = 0 self.db_session.commit() self.db_session = self.mk_session() self.db_session.commit() self.db_session = self.mk_session() - return {'status': status} + return {"status": status} class PageByPageHttpLister(ListerHttpTransport, PageByPageLister): """Convenience class for ensuring right lookup and init order when combining PageByPageLister and ListerHttpTransport. """ + def __init__(self, url=None, override_config=None): PageByPageLister.__init__(self, override_config=override_config) ListerHttpTransport.__init__(self, url=url) diff --git a/swh/lister/core/simple_lister.py b/swh/lister/core/simple_lister.py index fa09ed8..9612986 100644 --- a/swh/lister/core/simple_lister.py +++ b/swh/lister/core/simple_lister.py @@ -1,96 +1,97 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from typing import Any, List from swh.core import utils from .lister_base import ListerBase logger = logging.getLogger(__name__) class SimpleLister(ListerBase): """Lister* intermediate class for any service that follows the simple, 'list in oneshot information' pattern. - Client sends a request to list repositories in oneshot - Client receives structured (json/xml/etc) response with information and stores those in db """ + flush_packet_db = 2 """Number of iterations in-between write flushes of lister repositories to db (see fn:`ingest_data`). """ def list_packages(self, response: Any) -> List[Any]: """Listing packages method. """ pass def ingest_data(self, identifier, checks=False): """Rework the base ingest_data. Request server endpoint which gives all in one go. Simplify and filter response list of repositories. Inject repo information into local db. Queue loader tasks for linked repositories. 
Args: identifier: Resource identifier (unused) checks (bool): Additional checks required (unused) """ response = self.safely_issue_request(identifier) response = self.list_packages(response) if not response: return response, [] models_list = self.transport_response_simplified(response) models_list = self.filter_before_inject(models_list) all_injected = [] for i, models in enumerate(utils.grouper(models_list, n=100), start=1): models = list(models) - logging.debug('models: %s' % len(models)) + logging.debug("models: %s" % len(models)) # inject into local db injected = self.inject_repo_data_into_db(models) # queue workers self.schedule_missing_tasks(models, injected) all_injected.append(injected) if (i % self.flush_packet_db) == 0: - logger.debug('Flushing updates at index %s', i) + logger.debug("Flushing updates at index %s", i) self.db_session.commit() self.db_session = self.mk_session() return response, all_injected def transport_response_simplified(self, response): """Transform response to list for model manipulation """ return [self.get_model_from_repo(repo_name) for repo_name in response] def run(self): """Query the server which answers in one query. Stores the information, dropping actual redundant information we already have. Returns: nothing """ dump_not_used_identifier = 0 response, injected_repos = self.ingest_data(dump_not_used_identifier) if not response and not injected_repos: - logging.info('No response from api server, stopping') - status = 'uneventful' + logging.info("No response from api server, stopping") + status = "uneventful" else: - status = 'eventful' + status = "eventful" - return {'status': status} + return {"status": status} diff --git a/swh/lister/core/tests/conftest.py b/swh/lister/core/tests/conftest.py index b7093d8..9d3491b 100644 --- a/swh/lister/core/tests/conftest.py +++ b/swh/lister/core/tests/conftest.py @@ -1,47 +1,50 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.scheduler.tests.conftest import * # noqa import logging import pytest from sqlalchemy import create_engine from swh.lister import get_lister, SUPPORTED_LISTERS from swh.lister.core.models import initialize logger = logging.getLogger(__name__) @pytest.fixture def swh_listers(request, postgresql_proc, postgresql, swh_scheduler): - db_url = 'postgresql://{user}@{host}:{port}/{dbname}'.format( - host=postgresql_proc.host, - port=postgresql_proc.port, - user='postgres', - dbname='tests') + db_url = "postgresql://{user}@{host}:{port}/{dbname}".format( + host=postgresql_proc.host, + port=postgresql_proc.port, + user="postgres", + dbname="tests", + ) - logger.debug('lister db_url: %s', db_url) + logger.debug("lister db_url: %s", db_url) listers = {} # Prepare schema for all listers for lister_name in SUPPORTED_LISTERS: lister = get_lister(lister_name, db_url=db_url) lister.scheduler = swh_scheduler # inject scheduler fixture listers[lister_name] = lister initialize(create_engine(db_url), drop_tables=True) # Add the load-archive-files expected by some listers (gnu, cran, ...) 
- swh_scheduler.create_task_type({ - 'type': 'load-archive-files', - 'description': 'Load archive files.', - 'backend_name': 'swh.loader.package.tasks.LoadArchive', - 'default_interval': '1 day', - }) + swh_scheduler.create_task_type( + { + "type": "load-archive-files", + "description": "Load archive files.", + "backend_name": "swh.loader.package.tasks.LoadArchive", + "default_interval": "1 day", + } + ) return listers diff --git a/swh/lister/core/tests/test_abstractattribute.py b/swh/lister/core/tests/test_abstractattribute.py index 8190d01..113ee0a 100644 --- a/swh/lister/core/tests/test_abstractattribute.py +++ b/swh/lister/core/tests/test_abstractattribute.py @@ -1,66 +1,65 @@ # Copyright (C) 2017 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import unittest from typing import Any from swh.lister.core.abstractattribute import AbstractAttribute class BaseClass(abc.ABC): v1 = AbstractAttribute # type: Any v2 = AbstractAttribute() # type: Any - v3 = AbstractAttribute('changed docstring') # type: Any - v4 = 'qux' + v3 = AbstractAttribute("changed docstring") # type: Any + v4 = "qux" class BadSubclass1(BaseClass): pass class BadSubclass2(BaseClass): - v1 = 'foo' - v2 = 'bar' + v1 = "foo" + v2 = "bar" class BadSubclass3(BaseClass): - v2 = 'bar' - v3 = 'baz' + v2 = "bar" + v3 = "baz" class GoodSubclass(BaseClass): - v1 = 'foo' - v2 = 'bar' - v3 = 'baz' + v1 = "foo" + v2 = "bar" + v3 = "baz" class TestAbstractAttributes(unittest.TestCase): def test_aa(self): with self.assertRaises(TypeError): BaseClass() with self.assertRaises(TypeError): BadSubclass1() with self.assertRaises(TypeError): BadSubclass2() with self.assertRaises(TypeError): BadSubclass3() self.assertIsInstance(GoodSubclass(), GoodSubclass) gsc = GoodSubclass() - self.assertEqual(gsc.v1, 'foo') - self.assertEqual(gsc.v2, 'bar') - self.assertEqual(gsc.v3, 'baz') - self.assertEqual(gsc.v4, 'qux') + self.assertEqual(gsc.v1, "foo") + self.assertEqual(gsc.v2, "bar") + self.assertEqual(gsc.v3, "baz") + self.assertEqual(gsc.v4, "qux") def test_aa_docstrings(self): self.assertEqual(BaseClass.v1.__doc__, AbstractAttribute.__doc__) self.assertEqual(BaseClass.v2.__doc__, AbstractAttribute.__doc__) - self.assertEqual(BaseClass.v3.__doc__, - 'AbstractAttribute: changed docstring') + self.assertEqual(BaseClass.v3.__doc__, "AbstractAttribute: changed docstring") diff --git a/swh/lister/core/tests/test_indexing_lister.py b/swh/lister/core/tests/test_indexing_lister.py index 7e20bf1..3d29ab7 100644 --- a/swh/lister/core/tests/test_indexing_lister.py +++ b/swh/lister/core/tests/test_indexing_lister.py @@ -1,135 +1,125 @@ # Copyright (C) 2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from swh.lister.core.indexing_lister import IndexingLister class MockedIndexingListerDbPartitionIndices(IndexingLister): # Abstract Attribute boilerplate - LISTER_NAME = 'DbPartitionIndices' + LISTER_NAME = "DbPartitionIndices" MODEL = type(None) # ABC boilerplate def get_next_target_from_response(self, *args, **kwargs): pass def __init__(self, num_entries, first_index, last_index): self.num_entries = num_entries self.first_index = first_index self.last_index = last_index def db_num_entries(self): return self.num_entries def db_first_index(self): return self.first_index def db_last_index(self): return self.last_index def 
test_db_partition_indices(): m = MockedIndexingListerDbPartitionIndices( - num_entries=1000, - first_index=1, - last_index=10001, + num_entries=1000, first_index=1, last_index=10001, ) assert m partitions = m.db_partition_indices(100) # 1000 entries with indices 1 - 10001, partitions of 100 entries assert len(partitions) == 10 assert partitions[0] == (None, 1001) assert partitions[-1] == (9001, None) def test_db_partition_indices_zero_first(): m = MockedIndexingListerDbPartitionIndices( - num_entries=1000, - first_index=0, - last_index=10000, + num_entries=1000, first_index=0, last_index=10000, ) assert m partitions = m.db_partition_indices(100) # 1000 entries with indices 0 - 10000, partitions of 100 entries assert len(partitions) == 10 assert partitions[0] == (None, 1000) assert partitions[-1] == (9000, None) def test_db_partition_indices_small_index_range(): m = MockedIndexingListerDbPartitionIndices( - num_entries=5000, - first_index=0, - last_index=5, + num_entries=5000, first_index=0, last_index=5, ) assert m partitions = m.db_partition_indices(100) assert partitions == [(None, 1), (1, 2), (2, 3), (3, 4), (4, None)] def test_db_partition_indices_date_indices(): # 24 hour delta - first = datetime.datetime.fromisoformat('2019-11-01T00:00:00+00:00') - last = datetime.datetime.fromisoformat('2019-11-02T00:00:00+00:00') + first = datetime.datetime.fromisoformat("2019-11-01T00:00:00+00:00") + last = datetime.datetime.fromisoformat("2019-11-02T00:00:00+00:00") m = MockedIndexingListerDbPartitionIndices( # one entry per second num_entries=24 * 3600, first_index=first, last_index=last, ) assert m # 3600 entries per partition => 1 partition per hour partitions = m.db_partition_indices(3600) assert len(partitions) == 24 expected_bounds = [first + datetime.timedelta(hours=i) for i in range(25)] expected_bounds[0] = expected_bounds[-1] = None assert partitions == list(zip(expected_bounds[:-1], expected_bounds[1:])) def test_db_partition_indices_float_index_range(): m = MockedIndexingListerDbPartitionIndices( - num_entries=10000, - first_index=0.0, - last_index=1.0, + num_entries=10000, first_index=0.0, last_index=1.0, ) assert m partitions = m.db_partition_indices(1000) assert len(partitions) == 10 expected_bounds = [0.1 * i for i in range(11)] expected_bounds[0] = expected_bounds[-1] = None assert partitions == list(zip(expected_bounds[:-1], expected_bounds[1:])) def test_db_partition_indices_uneven_int_index_range(): m = MockedIndexingListerDbPartitionIndices( - num_entries=5641, - first_index=0, - last_index=10000, + num_entries=5641, first_index=0, last_index=10000, ) assert m partitions = m.db_partition_indices(500) assert len(partitions) == 5641 // 500 for i, (start, end) in enumerate(partitions): assert isinstance(start, int) or (i == 0 and start is None) assert isinstance(end, int) or (i == 10 and end is None) diff --git a/swh/lister/core/tests/test_lister.py b/swh/lister/core/tests/test_lister.py index 908fd9c..a835b1e 100644 --- a/swh/lister/core/tests/test_lister.py +++ b/swh/lister/core/tests/test_lister.py @@ -1,444 +1,453 @@ # Copyright (C) 2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import datetime import time from unittest import TestCase from unittest.mock import Mock, patch import requests_mock from sqlalchemy import create_engine from typing import Any, Callable, Optional, Pattern, Type, Union import swh.lister from swh.lister.core.abstractattribute 
import AbstractAttribute from swh.lister.tests.test_utils import init_db def noop(*args, **kwargs): pass def test_version_generation(): - assert swh.lister.__version__ != 'devel', \ - "Make sure swh.lister is installed (e.g. pip install -e .)" + assert ( + swh.lister.__version__ != "devel" + ), "Make sure swh.lister is installed (e.g. pip install -e .)" class HttpListerTesterBase(abc.ABC): """Testing base class for listers. This contains methods for both :class:`HttpSimpleListerTester` and :class:`HttpListerTester`. See :class:`swh.lister.gitlab.tests.test_lister` for an example of how to customize for a specific listing service. """ + Lister = AbstractAttribute( - 'Lister class to test') # type: Union[AbstractAttribute, Type[Any]] + "Lister class to test" + ) # type: Union[AbstractAttribute, Type[Any]] lister_subdir = AbstractAttribute( - 'bitbucket, github, etc.') # type: Union[AbstractAttribute, str] + "bitbucket, github, etc." + ) # type: Union[AbstractAttribute, str] good_api_response_file = AbstractAttribute( - 'Example good response body') # type: Union[AbstractAttribute, str] - LISTER_NAME = 'fake-lister' + "Example good response body" + ) # type: Union[AbstractAttribute, str] + LISTER_NAME = "fake-lister" # May need to override this if the headers are used for something def response_headers(self, request): return {} # May need to override this if the server uses non-standard rate limiting # method. # Please keep the requested retry delay reasonably low. def mock_rate_quota(self, n, request, context): self.rate_limit += 1 context.status_code = 429 - context.headers['Retry-After'] = '1' + context.headers["Retry-After"] = "1" return '{"error":"dummy"}' def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.rate_limit = 1 self.response = None self.fl = None self.helper = None self.scheduler_tasks = [] if self.__class__ != HttpListerTesterBase: self.run = TestCase.run.__get__(self, self.__class__) else: self.run = noop def mock_limit_n_response(self, n, request, context): self.fl.reset_backoff() if self.rate_limit <= n: return self.mock_rate_quota(n, request, context) else: return self.mock_response(request, context) def mock_limit_twice_response(self, request, context): return self.mock_limit_n_response(2, request, context) def get_api_response(self, identifier): fl = self.get_fl() if self.response is None: self.response = fl.safely_issue_request(identifier) return self.response def get_fl(self, override_config=None): """Retrieve an instance of fake lister (fl). 
""" if override_config or self.fl is None: - self.fl = self.Lister(url='https://fakeurl', - override_config=override_config) + self.fl = self.Lister( + url="https://fakeurl", override_config=override_config + ) self.fl.INITIAL_BACKOFF = 1 self.fl.reset_backoff() self.scheduler_tasks = [] return self.fl def disable_scheduler(self, fl): fl.schedule_missing_tasks = Mock(return_value=None) def mock_scheduler(self, fl): def _create_tasks(tasks): task_id = 0 current_nb_tasks = len(self.scheduler_tasks) if current_nb_tasks > 0: - task_id = self.scheduler_tasks[-1]['id'] + 1 + task_id = self.scheduler_tasks[-1]["id"] + 1 for task in tasks: scheduler_task = dict(task) - scheduler_task.update({ - 'status': 'next_run_not_scheduled', - 'retries_left': 0, - 'priority': None, - 'id': task_id, - 'current_interval': datetime.timedelta(days=64) - }) + scheduler_task.update( + { + "status": "next_run_not_scheduled", + "retries_left": 0, + "priority": None, + "id": task_id, + "current_interval": datetime.timedelta(days=64), + } + ) self.scheduler_tasks.append(scheduler_task) task_id = task_id + 1 return self.scheduler_tasks[current_nb_tasks:] def _disable_tasks(task_ids): for task_id in task_ids: - self.scheduler_tasks[task_id]['status'] = 'disabled' + self.scheduler_tasks[task_id]["status"] = "disabled" fl.scheduler.create_tasks = Mock(wraps=_create_tasks) fl.scheduler.disable_tasks = Mock(wraps=_disable_tasks) def disable_db(self, fl): fl.winnow_models = Mock(return_value=[]) fl.db_inject_repo = Mock(return_value=fl.MODEL()) fl.disable_deleted_repo_tasks = Mock(return_value=None) def init_db(self, db, model): engine = create_engine(db.url()) model.metadata.create_all(engine) @requests_mock.Mocker() def test_is_within_bounds(self, http_mocker): fl = self.get_fl() self.assertFalse(fl.is_within_bounds(1, 2, 3)) self.assertTrue(fl.is_within_bounds(2, 1, 3)) self.assertTrue(fl.is_within_bounds(1, 1, 1)) self.assertTrue(fl.is_within_bounds(1, None, None)) self.assertTrue(fl.is_within_bounds(1, None, 2)) self.assertTrue(fl.is_within_bounds(1, 0, None)) self.assertTrue(fl.is_within_bounds("b", "a", "c")) self.assertFalse(fl.is_within_bounds("a", "b", "c")) self.assertTrue(fl.is_within_bounds("a", None, "c")) self.assertTrue(fl.is_within_bounds("a", None, None)) self.assertTrue(fl.is_within_bounds("b", "a", None)) self.assertFalse(fl.is_within_bounds("a", "b", None)) self.assertTrue(fl.is_within_bounds("aa:02", "aa:01", "aa:03")) self.assertFalse(fl.is_within_bounds("aa:12", None, "aa:03")) with self.assertRaises(TypeError): fl.is_within_bounds(1.0, "b", None) with self.assertRaises(TypeError): fl.is_within_bounds("A:B", "A::B", None) class HttpListerTester(HttpListerTesterBase, abc.ABC): """Base testing class for subclass of :class:`swh.lister.core.indexing_lister.IndexingHttpLister` See :class:`swh.lister.github.tests.test_gh_lister` for an example of how to customize for a specific listing service. 
""" + last_index = AbstractAttribute( - 'Last index ' - 'in good_api_response') # type: Union[AbstractAttribute, int] + "Last index " "in good_api_response" + ) # type: Union[AbstractAttribute, int] first_index = AbstractAttribute( - 'First index in ' - ' good_api_response') # type: Union[AbstractAttribute, Optional[int]] + "First index in " " good_api_response" + ) # type: Union[AbstractAttribute, Optional[int]] bad_api_response_file = AbstractAttribute( - 'Example bad response body') # type: Union[AbstractAttribute, str] + "Example bad response body" + ) # type: Union[AbstractAttribute, str] entries_per_page = AbstractAttribute( - 'Number of results in ' - 'good response') # type: Union[AbstractAttribute, int] + "Number of results in " "good response" + ) # type: Union[AbstractAttribute, int] test_re = AbstractAttribute( - 'Compiled regex matching the server url. Must capture the ' - 'index value.') # type: Union[AbstractAttribute, Pattern] + "Compiled regex matching the server url. Must capture the " "index value." + ) # type: Union[AbstractAttribute, Pattern] convert_type = str # type: Callable[..., Any] """static method used to convert the "request_index" to its right type (for indexing listers for example, this is in accordance with the model's "indexable" column). """ + def mock_response(self, request, context): self.fl.reset_backoff() self.rate_limit = 1 context.status_code = 200 custom_headers = self.response_headers(request) context.headers.update(custom_headers) req_index = self.request_index(request) if req_index == self.first_index: response_file = self.good_api_response_file else: response_file = self.bad_api_response_file - with open('swh/lister/%s/tests/%s' % (self.lister_subdir, - response_file), - 'r', encoding='utf-8') as r: + with open( + "swh/lister/%s/tests/%s" % (self.lister_subdir, response_file), + "r", + encoding="utf-8", + ) as r: return r.read() def request_index(self, request): m = self.test_re.search(request.path_url) if m and (len(m.groups()) > 0): return self.convert_type(m.group(1)) def create_fl_with_db(self, http_mocker): http_mocker.get(self.test_re, text=self.mock_response) db = init_db() - fl = self.get_fl(override_config={ - 'lister': { - 'cls': 'local', - 'args': {'db': db.url()} - } - }) + fl = self.get_fl( + override_config={"lister": {"cls": "local", "args": {"db": db.url()}}} + ) fl.db = db self.init_db(db, fl.MODEL) self.mock_scheduler(fl) return fl @requests_mock.Mocker() def test_fetch_no_bounds_yesdb(self, http_mocker): fl = self.create_fl_with_db(http_mocker) fl.run() self.assertEqual(fl.db_last_index(), self.last_index) - ingested_repos = list(fl.db_query_range(self.first_index, - self.last_index)) + ingested_repos = list(fl.db_query_range(self.first_index, self.last_index)) self.assertEqual(len(ingested_repos), self.entries_per_page) @requests_mock.Mocker() def test_fetch_multiple_pages_yesdb(self, http_mocker): fl = self.create_fl_with_db(http_mocker) fl.run(min_bound=self.first_index) self.assertEqual(fl.db_last_index(), self.last_index) partitions = fl.db_partition_indices(5) self.assertGreater(len(partitions), 0) for k in partitions: self.assertLessEqual(len(k), 5) self.assertGreater(len(k), 0) @requests_mock.Mocker() def test_fetch_none_nodb(self, http_mocker): http_mocker.get(self.test_re, text=self.mock_response) fl = self.get_fl() self.disable_scheduler(fl) self.disable_db(fl) fl.run(min_bound=1, max_bound=1) # stores no results # FIXME: Determine what this method tries to test and add checks to # actually test 
@requests_mock.Mocker() def test_fetch_one_nodb(self, http_mocker): http_mocker.get(self.test_re, text=self.mock_response) fl = self.get_fl() self.disable_scheduler(fl) self.disable_db(fl) fl.run(min_bound=self.first_index, max_bound=self.first_index) # FIXME: Determine what this method tries to test and add checks to # actually test @requests_mock.Mocker() def test_fetch_multiple_pages_nodb(self, http_mocker): http_mocker.get(self.test_re, text=self.mock_response) fl = self.get_fl() self.disable_scheduler(fl) self.disable_db(fl) fl.run(min_bound=self.first_index) # FIXME: Determine what this method tries to test and add checks to # actually test @requests_mock.Mocker() def test_repos_list(self, http_mocker): """Test the number of repos listed by the lister """ http_mocker.get(self.test_re, text=self.mock_response) li = self.get_fl().transport_response_simplified( self.get_api_response(self.first_index) ) self.assertIsInstance(li, list) self.assertEqual(len(li), self.entries_per_page) @requests_mock.Mocker() def test_model_map(self, http_mocker): """Check if all the keys of model are present in the model created by the `transport_response_simplified` """ http_mocker.get(self.test_re, text=self.mock_response) fl = self.get_fl() - li = fl.transport_response_simplified( - self.get_api_response(self.first_index)) + li = fl.transport_response_simplified(self.get_api_response(self.first_index)) di = li[0] self.assertIsInstance(di, dict) - pubs = [k for k in vars(fl.MODEL).keys() if not k.startswith('_')] + pubs = [k for k in vars(fl.MODEL).keys() if not k.startswith("_")] for k in pubs: - if k not in ['last_seen', 'task_id', 'id']: + if k not in ["last_seen", "task_id", "id"]: self.assertIn(k, di) @requests_mock.Mocker() def test_api_request(self, http_mocker): """Test API request for rate limit handling """ http_mocker.get(self.test_re, text=self.mock_limit_twice_response) - with patch.object(time, 'sleep', wraps=time.sleep) as sleepmock: + with patch.object(time, "sleep", wraps=time.sleep) as sleepmock: self.get_api_response(self.first_index) self.assertEqual(sleepmock.call_count, 2) @requests_mock.Mocker() def test_request_headers(self, http_mocker): fl = self.create_fl_with_db(http_mocker) fl.run() self.assertNotEqual(len(http_mocker.request_history), 0) for request in http_mocker.request_history: - assert 'User-Agent' in request.headers - user_agent = request.headers['User-Agent'] - assert 'Software Heritage Lister' in user_agent + assert "User-Agent" in request.headers + user_agent = request.headers["User-Agent"] + assert "Software Heritage Lister" in user_agent assert swh.lister.__version__ in user_agent - def scheduled_tasks_test(self, next_api_response_file, next_last_index, - http_mocker): + def scheduled_tasks_test( + self, next_api_response_file, next_last_index, http_mocker + ): """Check that no loading tasks get disabled when processing a new page of repositories returned by a forge API """ fl = self.create_fl_with_db(http_mocker) # process first page of repositories listing fl.run() # process second page of repositories listing prev_last_index = self.last_index self.first_index = self.last_index self.last_index = next_last_index self.good_api_response_file = next_api_response_file fl.run(min_bound=prev_last_index) # check expected number of ingested repos and loading tasks ingested_repos = list(fl.db_query_range(0, self.last_index)) self.assertEqual(len(ingested_repos), len(self.scheduler_tasks)) self.assertEqual(len(ingested_repos), 2 * self.entries_per_page) # check tasks are 
not disabled for task in self.scheduler_tasks: - self.assertTrue(task['status'] != 'disabled') + self.assertTrue(task["status"] != "disabled") class HttpSimpleListerTester(HttpListerTesterBase, abc.ABC): """Base testing class for subclass of :class:`swh.lister.core.simple_lister.SimpleLister` See :class:`swh.lister.pypi.tests.test_lister` for an example of how to customize for a specific listing service. """ + entries = AbstractAttribute( - 'Number of results ' - 'in good response') # type: Union[AbstractAttribute, int] + "Number of results " "in good response" + ) # type: Union[AbstractAttribute, int] PAGE = AbstractAttribute( - "URL of the server api's unique page to retrieve and " - "parse for information") # type: Union[AbstractAttribute, str] + "URL of the server api's unique page to retrieve and " "parse for information" + ) # type: Union[AbstractAttribute, str] def get_fl(self, override_config=None): """Retrieve an instance of fake lister (fl). """ if override_config or self.fl is None: - self.fl = self.Lister( - override_config=override_config) + self.fl = self.Lister(override_config=override_config) self.fl.INITIAL_BACKOFF = 1 self.fl.reset_backoff() return self.fl def mock_response(self, request, context): self.fl.reset_backoff() self.rate_limit = 1 context.status_code = 200 custom_headers = self.response_headers(request) context.headers.update(custom_headers) response_file = self.good_api_response_file - with open('swh/lister/%s/tests/%s' % (self.lister_subdir, - response_file), - 'r', encoding='utf-8') as r: + with open( + "swh/lister/%s/tests/%s" % (self.lister_subdir, response_file), + "r", + encoding="utf-8", + ) as r: return r.read() @requests_mock.Mocker() def test_api_request(self, http_mocker): """Test API request for rate limit handling """ http_mocker.get(self.PAGE, text=self.mock_limit_twice_response) - with patch.object(time, 'sleep', wraps=time.sleep) as sleepmock: + with patch.object(time, "sleep", wraps=time.sleep) as sleepmock: self.get_api_response(0) self.assertEqual(sleepmock.call_count, 2) @requests_mock.Mocker() def test_model_map(self, http_mocker): """Check if all the keys of model are present in the model created by the `transport_response_simplified` """ http_mocker.get(self.PAGE, text=self.mock_response) fl = self.get_fl() li = fl.list_packages(self.get_api_response(0)) li = fl.transport_response_simplified(li) di = li[0] self.assertIsInstance(di, dict) - pubs = [k for k in vars(fl.MODEL).keys() if not k.startswith('_')] + pubs = [k for k in vars(fl.MODEL).keys() if not k.startswith("_")] for k in pubs: - if k not in ['last_seen', 'task_id', 'id']: + if k not in ["last_seen", "task_id", "id"]: self.assertIn(k, di) @requests_mock.Mocker() def test_repos_list(self, http_mocker): """Test the number of packages listed by the lister """ http_mocker.get(self.PAGE, text=self.mock_response) - li = self.get_fl().list_packages( - self.get_api_response(0) - ) + li = self.get_fl().list_packages(self.get_api_response(0)) self.assertIsInstance(li, list) self.assertEqual(len(li), self.entries) diff --git a/swh/lister/core/tests/test_model.py b/swh/lister/core/tests/test_model.py index 9f07223..f85bbdf 100644 --- a/swh/lister/core/tests/test_model.py +++ b/swh/lister/core/tests/test_model.py @@ -1,91 +1,91 @@ # Copyright (C) 2017 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from sqlalchemy import Column, Integer from swh.lister.core.models import 
IndexingModelBase, ModelBase class BadSubclass1(ModelBase): __abstract__ = True pass class BadSubclass2(ModelBase): __abstract__ = True - __tablename__ = 'foo' + __tablename__ = "foo" class BadSubclass3(BadSubclass2): __abstract__ = True pass class GoodSubclass(BadSubclass2): uid = Column(Integer, primary_key=True) indexable = Column(Integer, index=True) class IndexingBadSubclass(IndexingModelBase): __abstract__ = True pass class IndexingBadSubclass2(IndexingModelBase): __abstract__ = True - __tablename__ = 'foo' + __tablename__ = "foo" class IndexingBadSubclass3(IndexingBadSubclass2): __abstract__ = True pass class IndexingGoodSubclass(IndexingModelBase): uid = Column(Integer, primary_key=True) indexable = Column(Integer, index=True) - __tablename__ = 'bar' + __tablename__ = "bar" class TestModel(unittest.TestCase): def test_model_instancing(self): with self.assertRaises(TypeError): ModelBase() with self.assertRaises(TypeError): BadSubclass1() with self.assertRaises(TypeError): BadSubclass2() with self.assertRaises(TypeError): BadSubclass3() self.assertIsInstance(GoodSubclass(), GoodSubclass) - gsc = GoodSubclass(uid='uid') + gsc = GoodSubclass(uid="uid") - self.assertEqual(gsc.__tablename__, 'foo') - self.assertEqual(gsc.uid, 'uid') + self.assertEqual(gsc.__tablename__, "foo") + self.assertEqual(gsc.uid, "uid") def test_indexing_model_instancing(self): with self.assertRaises(TypeError): IndexingModelBase() with self.assertRaises(TypeError): IndexingBadSubclass() with self.assertRaises(TypeError): IndexingBadSubclass2() with self.assertRaises(TypeError): IndexingBadSubclass3() self.assertIsInstance(IndexingGoodSubclass(), IndexingGoodSubclass) - gsc = IndexingGoodSubclass(uid='uid', indexable='indexable') + gsc = IndexingGoodSubclass(uid="uid", indexable="indexable") - self.assertEqual(gsc.__tablename__, 'bar') - self.assertEqual(gsc.uid, 'uid') - self.assertEqual(gsc.indexable, 'indexable') + self.assertEqual(gsc.__tablename__, "bar") + self.assertEqual(gsc.uid, "uid") + self.assertEqual(gsc.indexable, "indexable") diff --git a/swh/lister/cran/__init__.py b/swh/lister/cran/__init__.py index 6abfa5b..3fa6586 100644 --- a/swh/lister/cran/__init__.py +++ b/swh/lister/cran/__init__.py @@ -1,13 +1,14 @@ # Copyright (C) 2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information def register(): from .models import CRANModel from .lister import CRANLister - return {'models': [CRANModel], - 'lister': CRANLister, - 'task_modules': ['%s.tasks' % __name__], - } + return { + "models": [CRANModel], + "lister": CRANLister, + "task_modules": ["%s.tasks" % __name__], + } diff --git a/swh/lister/cran/lister.py b/swh/lister/cran/lister.py index a818dd0..6f9e738 100644 --- a/swh/lister/cran/lister.py +++ b/swh/lister/cran/lister.py @@ -1,139 +1,144 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import logging import pkg_resources import subprocess from typing import List, Mapping, Tuple from swh.lister.cran.models import CRANModel from swh.lister.core.simple_lister import SimpleLister from swh.scheduler.utils import create_task_dict logger = logging.getLogger(__name__) -CRAN_MIRROR = 'https://cran.r-project.org' +CRAN_MIRROR = "https://cran.r-project.org" class CRANLister(SimpleLister): MODEL = 
CRANModel - LISTER_NAME = 'cran' - instance = 'cran' - - def task_dict(self, origin_type, origin_url, version=None, html_url=None, - policy=None, **kwargs): + LISTER_NAME = "cran" + instance = "cran" + + def task_dict( + self, + origin_type, + origin_url, + version=None, + html_url=None, + policy=None, + **kwargs, + ): """Return task format dict. This creates tasks with args and kwargs set, for example:: args: [] kwargs: { 'url': 'https://cran.r-project.org/Packages/...', 'artifacts': [{ 'url': 'https://cran.r-project.org/...', 'version': '0.0.1', }] } """ if not policy: - policy = 'oneshot' + policy = "oneshot" artifact_url = html_url - assert origin_type == 'tar' + assert origin_type == "tar" return create_task_dict( - 'load-cran', policy, - url=origin_url, artifacts=[{ - 'url': artifact_url, - 'version': version - }], retries_left=3 + "load-cran", + policy, + url=origin_url, + artifacts=[{"url": artifact_url, "version": version}], + retries_left=3, ) def safely_issue_request(self, identifier): """Bypass this implementation; it is now `list_packages` that returns the data. As an implementation detail, we cannot simply change the base SimpleLister yet, as other implementations still use it. This shall be part of another refactoring pass. """ return None def list_packages(self, response) -> List[Mapping[str, str]]: """Runs an R script which uses the built-in CRAN API to return a JSON response containing data about the R packages. Returns: List of dicts about R packages. For example:: [ { 'Package': 'A3', 'Version': '1.0.0', 'Title': 'A3 package', 'Description': ... }, { 'Package': 'abbyyR', 'Version': '0.5.4', 'Title': 'Access to Abbyy OCR (OCR) API', 'Description': ... }, ... ] """ return read_cran_data() - def get_model_from_repo( - self, repo: Mapping[str, str]) -> Mapping[str, str]: + def get_model_from_repo(self, repo: Mapping[str, str]) -> Mapping[str, str]: """Transform from repository representation to model """ - logger.debug('repo: %s', repo) + logger.debug("repo: %s", repo) origin_url, artifact_url = compute_origin_urls(repo) - package = repo['Package'] - version = repo['Version'] + package = repo["Package"] + version = repo["Version"] return { - 'uid': f'{package}-{version}', - 'name': package, - 'full_name': repo['Title'], - 'version': version, - 'html_url': artifact_url, - 'origin_url': origin_url, - 'origin_type': 'tar', + "uid": f"{package}-{version}", + "name": package, + "full_name": repo["Title"], + "version": version, + "html_url": artifact_url, + "origin_url": origin_url, + "origin_type": "tar", } def read_cran_data() -> List[Mapping[str, str]]: """Execute the R script to read the CRAN listing. """ - filepath = pkg_resources.resource_filename('swh.lister.cran', - 'list_all_packages.R') - logger.debug('script list_all_packages.R path: %s', filepath) + filepath = pkg_resources.resource_filename("swh.lister.cran", "list_all_packages.R") + logger.debug("script list_all_packages.R path: %s", filepath) response = subprocess.run(filepath, stdout=subprocess.PIPE, shell=False) - return json.loads(response.stdout.decode('utf-8')) + return json.loads(response.stdout.decode("utf-8")) def compute_origin_urls(repo: Mapping[str, str]) -> Tuple[str, str]: """Compute the package urls from the repo dict.
Args: repo: dict with keys 'Package' and 'Version' Returns: the tuple (project url, artifact url) """ - package = repo['Package'] - version = repo['Version'] - origin_url = f'{CRAN_MIRROR}/package={package}' - artifact_url = f'{CRAN_MIRROR}/src/contrib/{package}_{version}.tar.gz' + package = repo["Package"] + version = repo["Version"] + origin_url = f"{CRAN_MIRROR}/package={package}" + artifact_url = f"{CRAN_MIRROR}/src/contrib/{package}_{version}.tar.gz" return origin_url, artifact_url diff --git a/swh/lister/cran/models.py b/swh/lister/cran/models.py index 3fe94d9..5c8dd5c 100644 --- a/swh/lister/cran/models.py +++ b/swh/lister/cran/models.py @@ -1,17 +1,18 @@ # Copyright (C) 2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from sqlalchemy import Column, String from swh.lister.core.models import ModelBase class CRANModel(ModelBase): """a CRAN repository representation """ - __tablename__ = 'cran_repo' + + __tablename__ = "cran_repo" uid = Column(String, primary_key=True) version = Column(String) diff --git a/swh/lister/cran/tasks.py b/swh/lister/cran/tasks.py index 74eef74..b541541 100644 --- a/swh/lister/cran/tasks.py +++ b/swh/lister/cran/tasks.py @@ -1,18 +1,18 @@ # Copyright (C) 2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from celery import shared_task from swh.lister.cran.lister import CRANLister -@shared_task(name=__name__ + '.CRANListerTask') +@shared_task(name=__name__ + ".CRANListerTask") def list_cran(**lister_args): - '''Lister task for the CRAN registry''' + """Lister task for the CRAN registry""" return CRANLister(**lister_args).run() -@shared_task(name=__name__ + '.ping') +@shared_task(name=__name__ + ".ping") def _ping(): - return 'OK' + return "OK" diff --git a/swh/lister/cran/tests/conftest.py b/swh/lister/cran/tests/conftest.py index cce18ab..30d88c3 100644 --- a/swh/lister/cran/tests/conftest.py +++ b/swh/lister/cran/tests/conftest.py @@ -1,23 +1,25 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from swh.lister.core.tests.conftest import * # noqa @pytest.fixture def lister_cran(swh_listers): - lister = swh_listers['cran'] + lister = swh_listers["cran"] # Add the load-cran task type in the scheduler backend - lister.scheduler.create_task_type({ - 'type': 'load-cran', - 'description': 'Load a CRAN package', - 'backend_name': 'swh.loader.package.cran.tasks.LoaderCRAN', - 'default_interval': '1 day', - }) + lister.scheduler.create_task_type( + { + "type": "load-cran", + "description": "Load a CRAN package", + "backend_name": "swh.loader.package.cran.tasks.LoaderCRAN", + "default_interval": "1 day", + } + ) return lister diff --git a/swh/lister/cran/tests/test_lister.py b/swh/lister/cran/tests/test_lister.py index 3b6847c..d7ce6a4 100644 --- a/swh/lister/cran/tests/test_lister.py +++ b/swh/lister/cran/tests/test_lister.py @@ -1,72 +1,71 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import pytest from os import path from unittest.mock
import patch from swh.lister.cran.lister import compute_origin_urls, CRAN_MIRROR def test_cran_compute_origin_urls(): - pack = 'something' - vers = '0.0.1' - origin_url, artifact_url = compute_origin_urls({ - 'Package': pack, - 'Version': vers, - }) + pack = "something" + vers = "0.0.1" + origin_url, artifact_url = compute_origin_urls({"Package": pack, "Version": vers,}) - assert origin_url == f'{CRAN_MIRROR}/package={pack}' - assert artifact_url == f'{CRAN_MIRROR}/src/contrib/{pack}_{vers}.tar.gz' + assert origin_url == f"{CRAN_MIRROR}/package={pack}" + assert artifact_url == f"{CRAN_MIRROR}/src/contrib/{pack}_{vers}.tar.gz" def test_cran_compute_origin_urls_failure(): - for incomplete_repo in [{'Version': '0.0.1'}, {'Package': 'package'}, {}]: + for incomplete_repo in [{"Version": "0.0.1"}, {"Package": "package"}, {}]: with pytest.raises(KeyError): compute_origin_urls(incomplete_repo) -@patch('swh.lister.cran.lister.read_cran_data') +@patch("swh.lister.cran.lister.read_cran_data") def test_cran_lister_cran(mock_cran, datadir, lister_cran): lister = lister_cran - with open(path.join(datadir, 'list-r-packages.json')) as f: + with open(path.join(datadir, "list-r-packages.json")) as f: data = json.loads(f.read()) mock_cran.return_value = data assert len(data) == 6 lister.run() - r = lister.scheduler.search_tasks(task_type='load-cran') + r = lister.scheduler.search_tasks(task_type="load-cran") assert len(r) == 6 for row in r: - assert row['type'] == 'load-cran' + assert row["type"] == "load-cran" # arguments check - args = row['arguments']['args'] + args = row["arguments"]["args"] assert len(args) == 0 # kwargs - kwargs = row['arguments']['kwargs'] + kwargs = row["arguments"]["kwargs"] assert len(kwargs) == 2 - assert set(kwargs.keys()) == {'url', 'artifacts'} + assert set(kwargs.keys()) == {"url", "artifacts"} - artifacts = kwargs['artifacts'] + artifacts = kwargs["artifacts"] assert len(artifacts) == 1 - assert set(artifacts[0].keys()) == {'url', 'version'} + assert set(artifacts[0].keys()) == {"url", "version"} - assert row['policy'] == 'oneshot' - assert row['retries_left'] == 3 + assert row["policy"] == "oneshot" + assert row["retries_left"] == 3 - origin_url = kwargs['url'] - record = lister.db_session \ - .query(lister.MODEL) \ - .filter(lister.MODEL.origin_url == origin_url).first() + origin_url = kwargs["url"] + record = ( + lister.db_session.query(lister.MODEL) + .filter(lister.MODEL.origin_url == origin_url) + .first() + ) assert record - assert record.uid == f'{record.name}-{record.version}' + assert record.uid == f"{record.name}-{record.version}" diff --git a/swh/lister/cran/tests/test_tasks.py b/swh/lister/cran/tests/test_tasks.py index 1a0b95a..9ff3d9f 100644 --- a/swh/lister/cran/tests/test_tasks.py +++ b/swh/lister/cran/tests/test_tasks.py @@ -1,27 +1,25 @@ from unittest.mock import patch def test_ping(swh_app, celery_session_worker): - res = swh_app.send_task( - 'swh.lister.cran.tasks.ping') + res = swh_app.send_task("swh.lister.cran.tasks.ping") assert res res.wait() assert res.successful() - assert res.result == 'OK' + assert res.result == "OK" -@patch('swh.lister.cran.tasks.CRANLister') +@patch("swh.lister.cran.tasks.CRANLister") def test_lister(lister, swh_app, celery_session_worker): # setup the mocked CRANLister lister.return_value = lister lister.run.return_value = None - res = swh_app.send_task( - 'swh.lister.cran.tasks.CRANListerTask') + res = swh_app.send_task("swh.lister.cran.tasks.CRANListerTask") assert res res.wait() assert res.successful() lister.assert_called_once_with()
lister.db_last_index.assert_not_called() lister.run.assert_called_once_with() diff --git a/swh/lister/debian/__init__.py b/swh/lister/debian/__init__.py index e07a179..9e201b0 100644 --- a/swh/lister/debian/__init__.py +++ b/swh/lister/debian/__init__.py @@ -1,70 +1,77 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from typing import Any, List, Mapping logger = logging.getLogger(__name__) -def debian_init(db_engine, - override_conf: Mapping[str, Any] = {}, - distribution_name: str = 'Debian', - suites: List[str] = ['stretch', 'buster', 'bullseye'], - components: List[str] = ['main', 'contrib', 'non-free']): +def debian_init( + db_engine, + override_conf: Mapping[str, Any] = {}, + distribution_name: str = "Debian", + suites: List[str] = ["stretch", "buster", "bullseye"], + components: List[str] = ["main", "contrib", "non-free"], +): """Initialize the debian data model. Args: db_engine: SQLAlchemy manipulation database object override_conf: Override conf to pass to instantiate a lister distribution_name: Distribution to initialize suites: Default suites to register with the lister components: Default components to register per suite """ from swh.lister.debian.models import Distribution, Area from sqlalchemy.orm import sessionmaker + db_session = sessionmaker(bind=db_engine)() - distrib = db_session.query(Distribution) \ - .filter(Distribution.name == distribution_name) \ + distrib = ( + db_session.query(Distribution) + .filter(Distribution.name == distribution_name) .one_or_none() + ) if distrib is None: distrib = Distribution( - name=distribution_name, type='deb', - mirror_uri='http://deb.debian.org/debian/' + name=distribution_name, + type="deb", + mirror_uri="http://deb.debian.org/debian/", ) db_session.add(distrib) # Check the existing - existing_area = db_session.query(Area) \ - .filter(Area.distribution == distrib) \ - .all() + existing_area = db_session.query(Area).filter(Area.distribution == distrib).all() existing_area = set([a.name for a in existing_area]) - logger.debug('Area already known: %s', ', '.join(existing_area)) + logger.debug("Area already known: %s", ", ".join(existing_area)) # Create only the new ones for suite in suites: for component in components: - area_name = f'{suite}/{component}' + area_name = f"{suite}/{component}" if area_name in existing_area: logger.debug("Area '%s' already set, skipping", area_name) continue area = Area(name=area_name, distribution=distrib) db_session.add(area) db_session.commit() db_session.close() def register() -> Mapping[str, Any]: from .lister import DebianLister - return {'models': [DebianLister.MODEL], - 'lister': DebianLister, - 'task_modules': ['%s.tasks' % __name__], - 'init': debian_init} + + return { + "models": [DebianLister.MODEL], + "lister": DebianLister, + "task_modules": ["%s.tasks" % __name__], + "init": debian_init, + } diff --git a/swh/lister/debian/lister.py b/swh/lister/debian/lister.py index b5c4c50..7355fc7 100644 --- a/swh/lister/debian/lister.py +++ b/swh/lister/debian/lister.py @@ -1,256 +1,261 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import bz2 from collections import defaultdict import 
datetime import gzip import lzma import logging from debian.deb822 import Sources from sqlalchemy.orm import joinedload, load_only from sqlalchemy.schema import CreateTable, DropTable from typing import Mapping, Optional, Dict, Any from requests import Response from swh.lister.debian.models import ( - AreaSnapshot, Distribution, DistributionSnapshot, Package, + AreaSnapshot, + Distribution, + DistributionSnapshot, + Package, TempPackage, ) from swh.lister.core.lister_base import ListerBase, FetchError from swh.lister.core.lister_transports import ListerHttpTransport decompressors = { - 'gz': lambda f: gzip.GzipFile(fileobj=f), - 'bz2': bz2.BZ2File, - 'xz': lzma.LZMAFile, + "gz": lambda f: gzip.GzipFile(fileobj=f), + "bz2": bz2.BZ2File, + "xz": lzma.LZMAFile, } logger = logging.getLogger(__name__) class DebianLister(ListerHttpTransport, ListerBase): MODEL = Package PATH_TEMPLATE = None - LISTER_NAME = 'debian' - instance = 'debian' - - def __init__(self, distribution: str = 'Debian', - date: Optional[datetime.datetime] = None, - override_config: Mapping = {}): + LISTER_NAME = "debian" + instance = "debian" + + def __init__( + self, + distribution: str = "Debian", + date: Optional[datetime.datetime] = None, + override_config: Mapping = {}, + ): """Initialize the debian lister for a given distribution at a given date. Args: distribution: name of the distribution (e.g. "Debian") date: date the snapshot is taken (defaults to now if empty) override_config: Override configuration (which takes precedence over the parameters if provided) """ ListerHttpTransport.__init__(self, url="notused") ListerBase.__init__(self, override_config=override_config) - self.distribution = override_config.get('distribution', distribution) - self.date = override_config.get('date', date) or datetime.datetime.now( - tz=datetime.timezone.utc) + self.distribution = override_config.get("distribution", distribution) + self.date = override_config.get("date", date) or datetime.datetime.now( + tz=datetime.timezone.utc + ) def transport_request(self, identifier) -> Response: """Subvert ListerHttpTransport.transport_request, to try several index URIs in turn. The Debian repository format supports several compression algorithms across the ages, so we try several URIs. Once we have found a working URI, we break and set `self.decompressor` to the one that matched. Returns: a requests Response object. Raises: FetchError: when all the URIs failed to be retrieved. """ response = None compression = None for uri, compression in self.area.index_uris(): response = super().transport_request(uri) if response.status_code == 200: break else: - raise FetchError( - "Could not retrieve index for %s" % self.area - ) + raise FetchError("Could not retrieve index for %s" % self.area) self.decompressor = decompressors.get(compression) return response def request_uri(self, identifier): # In the overridden transport_request, we pass # ListerBase.transport_request() the full URI as identifier, so we # need to return it here. return identifier def request_params(self, identifier) -> Dict[str, Any]: # Enable streaming to allow wrapping the response in the decompressor # in transport_response_simplified. params = super().request_params(identifier) - params['stream'] = True + params["stream"] = True return params def transport_response_simplified(self, response): """Decompress and parse the package index fetched in `transport_request`. 
For each package, we "pivot" the file list entries (Files, Checksums-Sha1, Checksums-Sha256), to return a files dict mapping filenames to their checksums. """ if self.decompressor: data = self.decompressor(response.raw) else: data = response.raw for src_pkg in Sources.iter_paragraphs(data.readlines()): files = defaultdict(dict) for field in src_pkg._multivalued_fields: - if field.startswith('checksums-'): - sum_name = field[len('checksums-'):] + if field.startswith("checksums-"): + sum_name = field[len("checksums-") :] else: - sum_name = 'md5sum' + sum_name = "md5sum" if field in src_pkg: for entry in src_pkg[field]: - name = entry['name'] - files[name]['name'] = entry['name'] - files[name]['size'] = int(entry['size'], 10) + name = entry["name"] + files[name]["name"] = entry["name"] + files[name]["size"] = int(entry["size"], 10) files[name][sum_name] = entry[sum_name] yield { - 'name': src_pkg['Package'], - 'version': src_pkg['Version'], - 'directory': src_pkg['Directory'], - 'files': files, + "name": src_pkg["Package"], + "version": src_pkg["Version"], + "directory": src_pkg["Directory"], + "files": files, } def inject_repo_data_into_db(self, models_list): """Generate the Package entries that didn't previously exist. Contrary to ListerBase, we don't actually insert the data in database. `schedule_missing_tasks` does it once we have the origin and task identifiers. """ by_name_version = {} temp_packages = [] area_id = self.area.id for model in models_list: - name = model['name'] - version = model['version'] - temp_packages.append({ - 'area_id': area_id, - 'name': name, - 'version': version, - }) + name = model["name"] + version = model["version"] + temp_packages.append( + {"area_id": area_id, "name": name, "version": version,} + ) by_name_version[name, version] = model # Add all the listed packages to a temporary table self.db_session.execute(CreateTable(TempPackage.__table__)) self.db_session.bulk_insert_mappings(TempPackage, temp_packages) def exists_tmp_pkg(db_session, model): return ( db_session.query(model) .filter(Package.area_id == TempPackage.area_id) .filter(Package.name == TempPackage.name) .filter(Package.version == TempPackage.version) .exists() ) # Filter out the packages that already exist in the main Package table - new_packages = self.db_session\ - .query(TempPackage)\ - .options(load_only('name', 'version'))\ - .filter(~exists_tmp_pkg(self.db_session, Package))\ - .all() - - self.old_area_packages = self.db_session.query(Package).filter( - exists_tmp_pkg(self.db_session, TempPackage) - ).all() + new_packages = ( + self.db_session.query(TempPackage) + .options(load_only("name", "version")) + .filter(~exists_tmp_pkg(self.db_session, Package)) + .all() + ) + + self.old_area_packages = ( + self.db_session.query(Package) + .filter(exists_tmp_pkg(self.db_session, TempPackage)) + .all() + ) self.db_session.execute(DropTable(TempPackage.__table__)) added_packages = [] for package in new_packages: model = by_name_version[package.name, package.version] - added_packages.append(Package(area=self.area, - **model)) + added_packages.append(Package(area=self.area, **model)) self.db_session.add_all(added_packages) return added_packages def schedule_missing_tasks(self, models_list, added_packages): """We create tasks at the end of the full snapshot processing""" return def create_tasks_for_snapshot(self, snapshot): tasks = [ snapshot.task_for_package(name, versions) for name, versions in snapshot.get_packages().items() ] return self.scheduler.create_tasks(tasks) def run(self): """Run the 
lister for a given (distribution, area) tuple. """ - distribution = self.db_session\ - .query(Distribution)\ - .options(joinedload(Distribution.areas))\ - .filter(Distribution.name == self.distribution)\ - .one_or_none() + distribution = ( + self.db_session.query(Distribution) + .options(joinedload(Distribution.areas)) + .filter(Distribution.name == self.distribution) + .one_or_none() + ) if not distribution: - logger.error("Distribution %s is not registered" % - self.distribution) - return {'status': 'failed'} + logger.error("Distribution %s is not registered" % self.distribution) + return {"status": "failed"} - if not distribution.type == 'deb': - logger.error("Distribution %s is not a Debian derivative" % - distribution) - return {'status': 'failed'} + if not distribution.type == "deb": + logger.error("Distribution %s is not a Debian derivative" % distribution) + return {"status": "failed"} date = self.date - logger.debug('Creating snapshot for distribution %s on date %s' % - (distribution, date)) + logger.debug( + "Creating snapshot for distribution %s on date %s" % (distribution, date) + ) snapshot = DistributionSnapshot(date=date, distribution=distribution) self.db_session.add(snapshot) for area in distribution.areas: if not area.active: continue self.area = area - logger.debug('Processing area %s' % area) + logger.debug("Processing area %s" % area) _, new_area_packages = self.ingest_data(None) area_snapshot = AreaSnapshot(snapshot=snapshot, area=area) self.db_session.add(area_snapshot) area_snapshot.packages.extend(new_area_packages) area_snapshot.packages.extend(self.old_area_packages) self.create_tasks_for_snapshot(snapshot) self.db_session.commit() - return {'status': 'eventful'} + return {"status": "eventful"} diff --git a/swh/lister/debian/models.py b/swh/lister/debian/models.py index 1a7058f..9335b82 100644 --- a/swh/lister/debian/models.py +++ b/swh/lister/debian/models.py @@ -1,250 +1,230 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import binascii from collections import defaultdict import datetime from sqlalchemy import ( Boolean, Column, DateTime, Enum, ForeignKey, Integer, LargeBinary, String, Table, UniqueConstraint, ) from typing import Any, Mapping try: from sqlalchemy import JSON except ImportError: # SQLAlchemy < 1.1 from sqlalchemy.dialects.postgresql import JSONB as JSON from sqlalchemy.orm import relationship from swh.lister.core.models import SQLBase class Distribution(SQLBase): """A distribution (e.g. 
Debian, Ubuntu, Fedora, ...)""" - __tablename__ = 'distribution' + + __tablename__ = "distribution" id = Column(Integer, primary_key=True) name = Column(String, unique=True, nullable=False) - type = Column(Enum('deb', 'rpm', name='distribution_types'), - nullable=False) + type = Column(Enum("deb", "rpm", name="distribution_types"), nullable=False) mirror_uri = Column(String, nullable=False) - areas = relationship('Area', back_populates='distribution') + areas = relationship("Area", back_populates="distribution") def origin_for_package(self, package_name: str) -> str: """Return the origin url for the given package """ - return '%s://%s/packages/%s' % (self.type, self.name, package_name) + return "%s://%s/packages/%s" % (self.type, self.name, package_name) def __repr__(self): - return 'Distribution(%s (%s) on %s)' % ( - self.name, - self.type, - self.mirror_uri, - ) + return "Distribution(%s (%s) on %s)" % (self.name, self.type, self.mirror_uri,) class Area(SQLBase): - __tablename__ = 'area' - __table_args__ = ( - UniqueConstraint('distribution_id', 'name'), - ) + __tablename__ = "area" + __table_args__ = (UniqueConstraint("distribution_id", "name"),) id = Column(Integer, primary_key=True) - distribution_id = Column(Integer, ForeignKey('distribution.id'), - nullable=False) + distribution_id = Column(Integer, ForeignKey("distribution.id"), nullable=False) name = Column(String, nullable=False) active = Column(Boolean, nullable=False, default=True) - distribution = relationship('Distribution', back_populates='areas') + distribution = relationship("Distribution", back_populates="areas") def index_uris(self): """Get possible URIs for this component's package index""" - if self.distribution.type == 'deb': - compression_exts = ('xz', 'bz2', 'gz', None) - base_uri = '%s/dists/%s/source/Sources' % ( + if self.distribution.type == "deb": + compression_exts = ("xz", "bz2", "gz", None) + base_uri = "%s/dists/%s/source/Sources" % ( self.distribution.mirror_uri, self.name, ) for ext in compression_exts: if ext: - yield (base_uri + '.' + ext, ext) + yield (base_uri + "." 
+ ext, ext) else: yield (base_uri, None) else: raise NotImplementedError( - 'Do not know how to build index URI for Distribution type %s' % - self.distribution.type + "Do not know how to build index URI for Distribution type %s" + % self.distribution.type ) def __repr__(self): - return 'Area(%s of %s)' % ( - self.name, - self.distribution.name, - ) + return "Area(%s of %s)" % (self.name, self.distribution.name,) class Package(SQLBase): - __tablename__ = 'package' - __table_args__ = ( - UniqueConstraint('area_id', 'name', 'version'), - ) + __tablename__ = "package" + __table_args__ = (UniqueConstraint("area_id", "name", "version"),) id = Column(Integer, primary_key=True) - area_id = Column(Integer, ForeignKey('area.id'), nullable=False) + area_id = Column(Integer, ForeignKey("area.id"), nullable=False) name = Column(String, nullable=False) version = Column(String, nullable=False) directory = Column(String, nullable=False) files = Column(JSON, nullable=False) origin_id = Column(Integer) task_id = Column(Integer) revision_id = Column(LargeBinary(20)) - area = relationship('Area') + area = relationship("Area") @property def distribution(self): return self.area.distribution def fetch_uri(self, filename): """Get the URI to fetch the `filename` file associated with the package""" - if self.distribution.type == 'deb': - return '%s/%s/%s' % ( + if self.distribution.type == "deb": + return "%s/%s/%s" % ( self.distribution.mirror_uri, self.directory, filename, ) else: raise NotImplementedError( - 'Do not know how to build fetch URI for Distribution type %s' % - self.distribution.type + "Do not know how to build fetch URI for Distribution type %s" + % self.distribution.type ) def loader_dict(self): ret = { - 'id': self.id, - 'name': self.name, - 'version': self.version, + "id": self.id, + "name": self.name, + "version": self.version, } if self.revision_id: - ret['revision_id'] = binascii.hexlify(self.revision_id).decode() + ret["revision_id"] = binascii.hexlify(self.revision_id).decode() else: - files = { - name: checksums.copy() - for name, checksums in self.files.items() - } + files = {name: checksums.copy() for name, checksums in self.files.items()} for name in files: - files[name]['uri'] = self.fetch_uri(name) + files[name]["uri"] = self.fetch_uri(name) - ret.update({ - 'revision_id': None, - 'files': files, - }) + ret.update( + {"revision_id": None, "files": files,} + ) return ret def __repr__(self): - return 'Package(%s_%s of %s %s)' % ( + return "Package(%s_%s of %s %s)" % ( self.name, self.version, self.distribution.name, self.area.name, ) class DistributionSnapshot(SQLBase): - __tablename__ = 'distribution_snapshot' + __tablename__ = "distribution_snapshot" id = Column(Integer, primary_key=True) date = Column(DateTime, nullable=False, index=True) - distribution_id = Column(Integer, - ForeignKey('distribution.id'), - nullable=False) + distribution_id = Column(Integer, ForeignKey("distribution.id"), nullable=False) - distribution = relationship('Distribution') - areas = relationship('AreaSnapshot', back_populates='snapshot') + distribution = relationship("Distribution") + areas = relationship("AreaSnapshot", back_populates="snapshot") - def task_for_package(self, package_name: str, - package_versions: Mapping) -> Mapping[str, Any]: + def task_for_package( + self, package_name: str, package_versions: Mapping + ) -> Mapping[str, Any]: """Return the task dictionary for the given list of package versions """ origin_url = self.distribution.origin_for_package(package_name) return { - 'policy': 
'oneshot', - 'type': 'load-%s-package' % self.distribution.type, - 'next_run': datetime.datetime.now(tz=datetime.timezone.utc), - 'arguments': { - 'args': [], - 'kwargs': { - 'url': origin_url, - 'date': self.date.isoformat(), - 'packages': package_versions, + "policy": "oneshot", + "type": "load-%s-package" % self.distribution.type, + "next_run": datetime.datetime.now(tz=datetime.timezone.utc), + "arguments": { + "args": [], + "kwargs": { + "url": origin_url, + "date": self.date.isoformat(), + "packages": package_versions, }, }, - 'retries_left': 3, + "retries_left": 3, } def get_packages(self): packages = defaultdict(dict) for area_snapshot in self.areas: area_name = area_snapshot.area.name for package in area_snapshot.packages: - ref_name = '%s/%s' % (area_name, package.version) + ref_name = "%s/%s" % (area_name, package.version) packages[package.name][ref_name] = package.loader_dict() return packages area_snapshot_package_assoc = Table( - 'area_snapshot_package', SQLBase.metadata, - Column('area_snapshot_id', Integer, ForeignKey('area_snapshot.id'), - nullable=False), - Column('package_id', Integer, ForeignKey('package.id'), - nullable=False), + "area_snapshot_package", + SQLBase.metadata, + Column("area_snapshot_id", Integer, ForeignKey("area_snapshot.id"), nullable=False), + Column("package_id", Integer, ForeignKey("package.id"), nullable=False), ) class AreaSnapshot(SQLBase): - __tablename__ = 'area_snapshot' + __tablename__ = "area_snapshot" id = Column(Integer, primary_key=True) - snapshot_id = Column(Integer, - ForeignKey('distribution_snapshot.id'), - nullable=False) - area_id = Column(Integer, - ForeignKey('area.id'), - nullable=False) + snapshot_id = Column( + Integer, ForeignKey("distribution_snapshot.id"), nullable=False + ) + area_id = Column(Integer, ForeignKey("area.id"), nullable=False) - snapshot = relationship('DistributionSnapshot', back_populates='areas') - area = relationship('Area') - packages = relationship('Package', secondary=area_snapshot_package_assoc) + snapshot = relationship("DistributionSnapshot", back_populates="areas") + area = relationship("Area") + packages = relationship("Package", secondary=area_snapshot_package_assoc) class TempPackage(SQLBase): - __tablename__ = 'temp_package' + __tablename__ = "temp_package" __table_args__ = { - 'prefixes': ['TEMPORARY'], + "prefixes": ["TEMPORARY"], } id = Column(Integer, primary_key=True) area_id = Column(Integer) name = Column(String) version = Column(String) diff --git a/swh/lister/debian/tasks.py b/swh/lister/debian/tasks.py index 04d1297..3099e61 100644 --- a/swh/lister/debian/tasks.py +++ b/swh/lister/debian/tasks.py @@ -1,18 +1,18 @@ # Copyright (C) 2017-2018 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from celery import shared_task from .lister import DebianLister -@shared_task(name=__name__ + '.DebianListerTask') +@shared_task(name=__name__ + ".DebianListerTask") def list_debian_distribution(distribution, **lister_args): - '''List a Debian distribution''' + """List a Debian distribution""" return DebianLister(distribution=distribution, **lister_args).run() -@shared_task(name=__name__ + '.ping') +@shared_task(name=__name__ + ".ping") def _ping(): - return 'OK' + return "OK" diff --git a/swh/lister/debian/tests/conftest.py b/swh/lister/debian/tests/conftest.py index 8bbc443..4b2ab4c 100644 --- a/swh/lister/debian/tests/conftest.py +++ b/swh/lister/debian/tests/conftest.py @@ -1,60 +1,58 @@ # 
Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from pytest_postgresql.janitor import DatabaseJanitor from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from swh.lister.core.tests.conftest import * # noqa from swh.lister.core.models import SQLBase from swh.lister.debian import debian_init @pytest.fixture def lister_debian(swh_listers): - lister = swh_listers['debian'] + lister = swh_listers["debian"] # Initialize the debian data model - debian_init( - lister.db_engine, suites=['stretch'], components=['main', 'contrib'] - ) + debian_init(lister.db_engine, suites=["stretch"], components=["main", "contrib"]) # Add the load-deb-package in the scheduler backend - lister.scheduler.create_task_type({ - 'type': 'load-deb-package', - 'description': 'Load a Debian package', - 'backend_name': 'swh.loader.debian.tasks.LoaderDebianPackage', - 'default_interval': '1 day', - }) + lister.scheduler.create_task_type( + { + "type": "load-deb-package", + "description": "Load a Debian package", + "backend_name": "swh.loader.debian.tasks.LoaderDebianPackage", + "default_interval": "1 day", + } + ) return lister @pytest.fixture def sqlalchemy_engine(postgresql_proc): pg_host = postgresql_proc.host pg_port = postgresql_proc.port pg_user = postgresql_proc.user - pg_db = 'sqlalchemy-tests' + pg_db = "sqlalchemy-tests" - url = f'postgresql://{pg_user}@{pg_host}:{pg_port}/{pg_db}' - with DatabaseJanitor( - pg_user, pg_host, pg_port, pg_db, postgresql_proc.version - ): + url = f"postgresql://{pg_user}@{pg_host}:{pg_port}/{pg_db}" + with DatabaseJanitor(pg_user, pg_host, pg_port, pg_db, postgresql_proc.version): engine = create_engine(url) yield engine engine.dispose() @pytest.fixture def session(sqlalchemy_engine): SQLBase.metadata.create_all(sqlalchemy_engine) Session = sessionmaker(bind=sqlalchemy_engine) session = Session() yield session session.close() diff --git a/swh/lister/debian/tests/test_init.py b/swh/lister/debian/tests/test_init.py index 928cfa6..860c4ec 100644 --- a/swh/lister/debian/tests/test_init.py +++ b/swh/lister/debian/tests/test_init.py @@ -1,77 +1,93 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from swh.lister.debian import debian_init from swh.lister.debian.models import Distribution, Area @pytest.fixture def engine(session): session.autoflush = False return session.bind def test_debian_init_step(engine, session): - distribution_name = 'KaliLinux' + distribution_name = "KaliLinux" - distrib = session.query(Distribution) \ - .filter(Distribution.name == distribution_name) \ + distrib = ( + session.query(Distribution) + .filter(Distribution.name == distribution_name) .one_or_none() + ) assert distrib is None all_area = session.query(Area).all() assert all_area == [] - suites = ['wheezy', 'jessie'] - components = ['main', 'contrib'] - - debian_init(engine, distribution_name=distribution_name, - suites=suites, components=components) - distrib = session.query(Distribution) \ - .filter(Distribution.name == distribution_name) \ + suites = ["wheezy", "jessie"] + components = ["main", "contrib"] + + debian_init( + engine, + distribution_name=distribution_name, 
+ suites=suites, + components=components, + ) + distrib = ( + session.query(Distribution) + .filter(Distribution.name == distribution_name) .one_or_none() + ) assert distrib is not None assert distrib.name == distribution_name - assert distrib.type == 'deb' - assert distrib.mirror_uri == 'http://deb.debian.org/debian/' + assert distrib.type == "deb" + assert distrib.mirror_uri == "http://deb.debian.org/debian/" all_area = session.query(Area).all() assert len(all_area) == 2 * 2, "2 suites * 2 components per suite" expected_area_names = [] for suite in suites: for component in components: - expected_area_names.append(f'{suite}/{component}') + expected_area_names.append(f"{suite}/{component}") for area in all_area: area.id = None assert area.distribution == distrib assert area.name in expected_area_names # check idempotency (on exact same call) - debian_init(engine, distribution_name=distribution_name, - suites=suites, components=components) + debian_init( + engine, + distribution_name=distribution_name, + suites=suites, + components=components, + ) - distribs = session.query(Distribution) \ - .filter(Distribution.name == distribution_name) \ - .all() + distribs = ( + session.query(Distribution).filter(Distribution.name == distribution_name).all() + ) assert len(distribs) == 1 distrib = distribs[0] all_area = session.query(Area).all() assert len(all_area) == 2 * 2, "2 suites * 2 components per suite" # Add a new suite - debian_init(engine, distribution_name=distribution_name, - suites=['lenny'], components=components) + debian_init( + engine, + distribution_name=distribution_name, + suites=["lenny"], + components=components, + ) all_area = [a.name for a in session.query(Area).all()] assert len(all_area) == (2 + 1) * 2, "3 suites * 2 components per suite" diff --git a/swh/lister/debian/tests/test_lister.py b/swh/lister/debian/tests/test_lister.py index 773289e..8694d8d 100644 --- a/swh/lister/debian/tests/test_lister.py +++ b/swh/lister/debian/tests/test_lister.py @@ -1,36 +1,36 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging logger = logging.getLogger(__name__) def test_lister_debian(lister_debian, datadir, requests_mock_datadir): """Simple debian listing should create scheduled tasks """ # Run the lister lister_debian.run() - r = lister_debian.scheduler.search_tasks(task_type='load-deb-package') + r = lister_debian.scheduler.search_tasks(task_type="load-deb-package") assert len(r) == 151 for row in r: - assert row['type'] == 'load-deb-package' + assert row["type"] == "load-deb-package" # arguments check - args = row['arguments']['args'] + args = row["arguments"]["args"] assert len(args) == 0 # kwargs - kwargs = row['arguments']['kwargs'] - assert set(kwargs.keys()) == {'url', 'date', 'packages'} + kwargs = row["arguments"]["kwargs"] + assert set(kwargs.keys()) == {"url", "date", "packages"} - logger.debug('kwargs: %s', kwargs) - assert isinstance(kwargs['url'], str) + logger.debug("kwargs: %s", kwargs) + assert isinstance(kwargs["url"], str) - assert row['policy'] == 'oneshot' - assert row['priority'] is None + assert row["policy"] == "oneshot" + assert row["priority"] is None diff --git a/swh/lister/debian/tests/test_models.py b/swh/lister/debian/tests/test_models.py index 701d573..43d1555 100644 --- a/swh/lister/debian/tests/test_models.py +++ 
b/swh/lister/debian/tests/test_models.py @@ -1,41 +1,32 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from swh.lister.debian.models import Distribution, Area def test_area_index_uris_deb(session): d = Distribution( - name='Debian', type='deb', mirror_uri='http://deb.debian.org/debian' - ) - a = Area( - distribution=d, - name='unstable/main', - active=True, + name="Debian", type="deb", mirror_uri="http://deb.debian.org/debian" ) + a = Area(distribution=d, name="unstable/main", active=True,) session.add_all([d, a]) session.commit() uris = list(a.index_uris()) assert uris def test_area_index_uris_rpm(session): d = Distribution( - name='CentOS', type='rpm', - mirror_uri='http://centos.mirrors.proxad.net/' - ) - a = Area( - distribution=d, - name='8', - active=True, + name="CentOS", type="rpm", mirror_uri="http://centos.mirrors.proxad.net/" ) + a = Area(distribution=d, name="8", active=True,) session.add_all([d, a]) session.commit() with pytest.raises(NotImplementedError): list(a.index_uris()) diff --git a/swh/lister/debian/tests/test_tasks.py b/swh/lister/debian/tests/test_tasks.py index 7a6e97f..4fb08bd 100644 --- a/swh/lister/debian/tests/test_tasks.py +++ b/swh/lister/debian/tests/test_tasks.py @@ -1,31 +1,29 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from unittest.mock import patch def test_ping(swh_app, celery_session_worker): - res = swh_app.send_task( - 'swh.lister.debian.tasks.ping') + res = swh_app.send_task("swh.lister.debian.tasks.ping") assert res res.wait() assert res.successful() - assert res.result == 'OK' + assert res.result == "OK" -@patch('swh.lister.debian.tasks.DebianLister') +@patch("swh.lister.debian.tasks.DebianLister") def test_lister(lister, swh_app, celery_session_worker): # setup the mocked DebianLister lister.return_value = lister lister.run.return_value = None - res = swh_app.send_task( - 'swh.lister.debian.tasks.DebianListerTask', ('stretch',)) + res = swh_app.send_task("swh.lister.debian.tasks.DebianListerTask", ("stretch",)) assert res res.wait() assert res.successful() - lister.assert_called_once_with(distribution='stretch') + lister.assert_called_once_with(distribution="stretch") lister.run.assert_called_once_with() diff --git a/swh/lister/debian/utils.py b/swh/lister/debian/utils.py index 19a3e97..f6c4ca8 100644 --- a/swh/lister/debian/utils.py +++ b/swh/lister/debian/utils.py @@ -1,81 +1,83 @@ # Copyright (C) 2017-2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import click from swh.lister.debian.models import Distribution, Area, SQLBase from swh.lister.debian.lister import DebianLister @click.group() -@click.option('--verbose/--no-verbose', default=False) +@click.option("--verbose/--no-verbose", default=False) @click.pass_context def cli(ctx, verbose): - ctx.obj['lister'] = DebianLister() + ctx.obj["lister"] = DebianLister() if verbose: loglevel = logging.DEBUG - logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO) + logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO) else: loglevel = logging.INFO 
logging.basicConfig( - format='%(asctime)s %(process)d %(levelname)s %(message)s', - level=loglevel, + format="%(asctime)s %(process)d %(levelname)s %(message)s", level=loglevel, ) @cli.command() @click.pass_context def create_schema(ctx): """Create the schema from the models""" - SQLBase.metadata.create_all(ctx.obj['lister'].db_engine) + SQLBase.metadata.create_all(ctx.obj["lister"].db_engine) @cli.command() -@click.option('--name', help='The name of the distribution') -@click.option('--type', help='The type of distribution') -@click.option('--mirror-uri', help='The URL to the mirror of the distribution') -@click.option('--area', help='The areas for the distribution', - multiple=True) +@click.option("--name", help="The name of the distribution") +@click.option("--type", help="The type of distribution") +@click.option("--mirror-uri", help="The URL to the mirror of the distribution") +@click.option("--area", help="The areas for the distribution", multiple=True) @click.pass_context def create_distribution(ctx, name, type, mirror_uri, area): to_add = [] - db_session = ctx.obj['lister'].db_session - d = db_session.query(Distribution)\ - .filter(Distribution.name == name)\ - .filter(Distribution.type == type)\ - .one_or_none() + db_session = ctx.obj["lister"].db_session + d = ( + db_session.query(Distribution) + .filter(Distribution.name == name) + .filter(Distribution.type == type) + .one_or_none() + ) if not d: d = Distribution(name=name, type=type, mirror_uri=mirror_uri) to_add.append(d) for area_name in area: a = None if d.id: - a = db_session.query(Area)\ - .filter(Area.distribution == d)\ - .filter(Area.name == area_name)\ - .one_or_none() + a = ( + db_session.query(Area) + .filter(Area.distribution == d) + .filter(Area.name == area_name) + .one_or_none() + ) if not a: a = Area(name=area_name, distribution=d) to_add.append(a) db_session.add_all(to_add) db_session.commit() @cli.command() -@click.option('--name', help='The name of the distribution') +@click.option("--name", help="The name of the distribution") @click.pass_context def list_distribution(ctx, name): """List the distribution""" - ctx.obj['lister'].run(name) + ctx.obj["lister"].run(name) -if __name__ == '__main__': +if __name__ == "__main__": cli(obj={}) diff --git a/swh/lister/github/__init__.py b/swh/lister/github/__init__.py index 13f4688..1704bc8 100644 --- a/swh/lister/github/__init__.py +++ b/swh/lister/github/__init__.py @@ -1,13 +1,14 @@ # Copyright (C) 2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information def register(): from .models import GitHubModel from .lister import GitHubLister - return {'models': [GitHubModel], - 'lister': GitHubLister, - 'task_modules': ['%s.tasks' % __name__], - } + return { + "models": [GitHubModel], + "lister": GitHubLister, + "task_modules": ["%s.tasks" % __name__], + } diff --git a/swh/lister/github/lister.py b/swh/lister/github/lister.py index 066b884..0f3b71d 100644 --- a/swh/lister/github/lister.py +++ b/swh/lister/github/lister.py @@ -1,79 +1,75 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import re from typing import Any, Dict, List, Tuple, Optional from swh.lister.core.indexing_lister import IndexingHttpLister from swh.lister.github.models import GitHubModel from requests 
import Response class GitHubLister(IndexingHttpLister): - PATH_TEMPLATE = '/repositories?since=%d' + PATH_TEMPLATE = "/repositories?since=%d" MODEL = GitHubModel - DEFAULT_URL = 'https://api.github.com' - API_URL_INDEX_RE = re.compile(r'^.*/repositories\?since=(\d+)') - LISTER_NAME = 'github' - instance = 'github' # There is only 1 instance of such lister + DEFAULT_URL = "https://api.github.com" + API_URL_INDEX_RE = re.compile(r"^.*/repositories\?since=(\d+)") + LISTER_NAME = "github" + instance = "github" # There is only 1 instance of such lister default_min_bound = 0 # type: Any def get_model_from_repo(self, repo: Dict[str, Any]) -> Dict[str, Any]: return { - 'uid': repo['id'], - 'indexable': repo['id'], - 'name': repo['name'], - 'full_name': repo['full_name'], - 'html_url': repo['html_url'], - 'origin_url': repo['html_url'], - 'origin_type': 'git', - 'fork': repo['fork'], + "uid": repo["id"], + "indexable": repo["id"], + "name": repo["name"], + "full_name": repo["full_name"], + "html_url": repo["html_url"], + "origin_url": repo["html_url"], + "origin_type": "git", + "fork": repo["fork"], } def transport_quota_check(self, response: Response) -> Tuple[bool, int]: - x_rate_limit_remaining = response.headers.get('X-RateLimit-Remaining') + x_rate_limit_remaining = response.headers.get("X-RateLimit-Remaining") if not x_rate_limit_remaining: return False, 0 reqs_remaining = int(x_rate_limit_remaining) if response.status_code == 403 and reqs_remaining == 0: - delay = int(response.headers['Retry-After']) + delay = int(response.headers["Retry-After"]) return True, delay return False, 0 - def get_next_target_from_response(self, - response: Response) -> Optional[int]: - if 'next' in response.links: - next_url = response.links['next']['url'] - return int( - self.API_URL_INDEX_RE.match(next_url).group(1)) # type: ignore + def get_next_target_from_response(self, response: Response) -> Optional[int]: + if "next" in response.links: + next_url = response.links["next"]["url"] + return int(self.API_URL_INDEX_RE.match(next_url).group(1)) # type: ignore return None - def transport_response_simplified(self, response: Response - ) -> List[Dict[str, Any]]: + def transport_response_simplified(self, response: Response) -> List[Dict[str, Any]]: repos = response.json() - return [self.get_model_from_repo(repo) - for repo in repos if repo and 'id' in repo] + return [ + self.get_model_from_repo(repo) for repo in repos if repo and "id" in repo + ] def request_headers(self) -> Dict[str, Any]: """(Override) Set requests headers to send when querying the GitHub API """ headers = super().request_headers() - headers['Accept'] = 'application/vnd.github.v3+json' + headers["Accept"] = "application/vnd.github.v3+json" return headers - def disable_deleted_repo_tasks(self, index: int, - next_index: int, keep_these: int): + def disable_deleted_repo_tasks(self, index: int, next_index: int, keep_these: int): """ (Overrides) Fix provided index value to avoid erroneously disabling some scheduler tasks """ # Next listed repository ids are strictly greater than the 'since' # parameter, so increment the index to avoid disabling the latest # created task when processing a new repositories page returned by # the Github API - return super().disable_deleted_repo_tasks(index + 1, next_index, - keep_these) + return super().disable_deleted_repo_tasks(index + 1, next_index, keep_these) diff --git a/swh/lister/github/models.py b/swh/lister/github/models.py index 47df1a3..58de011 100644 --- a/swh/lister/github/models.py +++ 
b/swh/lister/github/models.py @@ -1,16 +1,17 @@ # Copyright (C) 2017-2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from sqlalchemy import Column, Boolean, Integer from swh.lister.core.models import IndexingModelBase class GitHubModel(IndexingModelBase): """a GitHub repository""" - __tablename__ = 'github_repo' + + __tablename__ = "github_repo" uid = Column(Integer, primary_key=True) indexable = Column(Integer, index=True) fork = Column(Boolean, default=False) diff --git a/swh/lister/github/tasks.py b/swh/lister/github/tasks.py index 1b9f37e..e9d82a4 100644 --- a/swh/lister/github/tasks.py +++ b/swh/lister/github/tasks.py @@ -1,53 +1,54 @@ # Copyright (C) 2017-2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import random from celery import group, shared_task from swh.lister.github.lister import GitHubLister GROUP_SPLIT = 10000 -@shared_task(name=__name__ + '.IncrementalGitHubLister') +@shared_task(name=__name__ + ".IncrementalGitHubLister") def list_github_incremental(**lister_args): - 'Incremental update of GitHub' + "Incremental update of GitHub" lister = GitHubLister(**lister_args) return lister.run(min_bound=lister.db_last_index(), max_bound=None) -@shared_task(name=__name__ + '.RangeGitHubLister') +@shared_task(name=__name__ + ".RangeGitHubLister") def _range_github_lister(start, end, **lister_args): lister = GitHubLister(**lister_args) return lister.run(min_bound=start, max_bound=end) -@shared_task(name=__name__ + '.FullGitHubRelister', bind=True) +@shared_task(name=__name__ + ".FullGitHubRelister", bind=True) def list_github_full(self, split=None, **lister_args): """Full update of GitHub It's not to be called for an initial listing. """ lister = GitHubLister(**lister_args) ranges = lister.db_partition_indices(split or GROUP_SPLIT) if not ranges: - self.log.info('Nothing to list') + self.log.info("Nothing to list") return random.shuffle(ranges) - promise = group(_range_github_lister.s(minv, maxv, **lister_args) - for minv, maxv in ranges)() - self.log.debug('%s OK (spawned %s subtasks)' % (self.name, len(ranges))) + promise = group( + _range_github_lister.s(minv, maxv, **lister_args) for minv, maxv in ranges + )() + self.log.debug("%s OK (spawned %s subtasks)" % (self.name, len(ranges))) try: promise.save() # so that we can restore the GroupResult in tests except (NotImplementedError, AttributeError): - self.log.info('Unable to call save_group with current result backend.') + self.log.info("Unable to call save_group with current result backend.") # FIXME: what to do in terms of return here? 
return promise.id -@shared_task(name=__name__ + '.ping') +@shared_task(name=__name__ + ".ping") def _ping(): - return 'OK' + return "OK" diff --git a/swh/lister/github/tests/test_lister.py b/swh/lister/github/tests/test_lister.py index c0b2711..f33d721 100644 --- a/swh/lister/github/tests/test_lister.py +++ b/swh/lister/github/tests/test_lister.py @@ -1,78 +1,83 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import re import unittest import requests_mock from swh.lister.core.tests.test_lister import HttpListerTester from swh.lister.github.lister import GitHubLister class GitHubListerTester(HttpListerTester, unittest.TestCase): Lister = GitHubLister - test_re = re.compile(r'/repositories\?since=([^?&]+)') - lister_subdir = 'github' - good_api_response_file = 'data/https_api.github.com/first_response.json' - bad_api_response_file = 'data/https_api.github.com/empty_response.json' + test_re = re.compile(r"/repositories\?since=([^?&]+)") + lister_subdir = "github" + good_api_response_file = "data/https_api.github.com/first_response.json" + bad_api_response_file = "data/https_api.github.com/empty_response.json" first_index = 0 last_index = 369 entries_per_page = 100 convert_type = int def response_headers(self, request): - headers = {'X-RateLimit-Remaining': '1'} + headers = {"X-RateLimit-Remaining": "1"} if self.request_index(request) == self.first_index: - headers.update({ - 'Link': '<https://api.github.com/repositories?since=%s>;' - ' rel="next",' - '<https://api.github.com/repositories{?since}>;' - ' rel="first"' % self.last_index - }) + headers.update( + { + "Link": "<https://api.github.com/repositories?since=%s>;" + ' rel="next",' + "<https://api.github.com/repositories{?since}>;" + ' rel="first"' % self.last_index + } + ) else: - headers.update({ - 'Link': '<https://api.github.com/repositories{?since}>;' - ' rel="first"' - }) + headers.update( + { + "Link": "<https://api.github.com/repositories{?since}>;" + ' rel="first"' + } + ) return headers def mock_rate_quota(self, n, request, context): self.rate_limit += 1 context.status_code = 403 - context.headers['X-RateLimit-Remaining'] = '0' - context.headers['Retry-After'] = '1' # 1 second + context.headers["X-RateLimit-Remaining"] = "0" + context.headers["Retry-After"] = "1" # 1 second return '{"error":"dummy"}' @requests_mock.Mocker() def test_scheduled_tasks(self, http_mocker): self.scheduled_tasks_test( - 'data/https_api.github.com/next_response.json', 876, http_mocker) + "data/https_api.github.com/next_response.json", 876, http_mocker + ) def test_lister_github(swh_listers, requests_mock_datadir): """Simple github listing should create scheduled tasks """ - lister = swh_listers['github'] + lister = swh_listers["github"] lister.run() - r = lister.scheduler.search_tasks(task_type='load-git') + r = lister.scheduler.search_tasks(task_type="load-git") assert len(r) == 100 for row in r: - assert row['type'] == 'load-git' + assert row["type"] == "load-git" # arguments check - args = row['arguments']['args'] + args = row["arguments"]["args"] assert len(args) == 0 # kwargs - kwargs = row['arguments']['kwargs'] - url = kwargs['url'] - assert url.startswith('https://github.com') + kwargs = row["arguments"]["kwargs"] + url = kwargs["url"] + assert url.startswith("https://github.com") - assert row['policy'] == 'recurring' - assert row['priority'] is None + assert row["policy"] == "recurring" + assert row["priority"] is None diff --git a/swh/lister/github/tests/test_tasks.py b/swh/lister/github/tests/test_tasks.py index c652404..721d88d 100644 --- a/swh/lister/github/tests/test_tasks.py +++
b/swh/lister/github/tests/test_tasks.py @@ -1,90 +1,87 @@ from time import sleep from celery.result import GroupResult from unittest.mock import patch def test_ping(swh_app, celery_session_worker): - res = swh_app.send_task( - 'swh.lister.github.tasks.ping') + res = swh_app.send_task("swh.lister.github.tasks.ping") assert res res.wait() assert res.successful() - assert res.result == 'OK' + assert res.result == "OK" -@patch('swh.lister.github.tasks.GitHubLister') +@patch("swh.lister.github.tasks.GitHubLister") def test_incremental(lister, swh_app, celery_session_worker): # setup the mocked GitHubLister lister.return_value = lister lister.db_last_index.return_value = 42 lister.run.return_value = None - res = swh_app.send_task( - 'swh.lister.github.tasks.IncrementalGitHubLister') + res = swh_app.send_task("swh.lister.github.tasks.IncrementalGitHubLister") assert res res.wait() assert res.successful() lister.assert_called_once_with() lister.db_last_index.assert_called_once_with() lister.run.assert_called_once_with(min_bound=42, max_bound=None) -@patch('swh.lister.github.tasks.GitHubLister') +@patch("swh.lister.github.tasks.GitHubLister") def test_range(lister, swh_app, celery_session_worker): # setup the mocked GitHubLister lister.return_value = lister lister.run.return_value = None res = swh_app.send_task( - 'swh.lister.github.tasks.RangeGitHubLister', - kwargs=dict(start=12, end=42)) + "swh.lister.github.tasks.RangeGitHubLister", kwargs=dict(start=12, end=42) + ) assert res res.wait() assert res.successful() lister.assert_called_once_with() lister.db_last_index.assert_not_called() lister.run.assert_called_once_with(min_bound=12, max_bound=42) -@patch('swh.lister.github.tasks.GitHubLister') +@patch("swh.lister.github.tasks.GitHubLister") def test_relister(lister, swh_app, celery_session_worker): # setup the mocked GitHubLister lister.return_value = lister lister.run.return_value = None - lister.db_partition_indices.return_value = [ - (i, i+9) for i in range(0, 50, 10)] + lister.db_partition_indices.return_value = [(i, i + 9) for i in range(0, 50, 10)] - res = swh_app.send_task( - 'swh.lister.github.tasks.FullGitHubRelister') + res = swh_app.send_task("swh.lister.github.tasks.FullGitHubRelister") assert res res.wait() assert res.successful() # retrieve the GroupResult for this task and wait for all the subtasks # to complete promise_id = res.result assert promise_id promise = GroupResult.restore(promise_id, app=swh_app) for i in range(5): if promise.ready(): break sleep(1) lister.assert_called_with() # one by the FullGitHubRelister task # + 5 for the RangeGitHubLister subtasks assert lister.call_count == 6 lister.db_last_index.assert_not_called() lister.db_partition_indices.assert_called_once_with(10000) # lister.run should have been called once per partition interval for i in range(5): # XXX inconsistent behavior: max_bound is INCLUDED here - assert (dict(min_bound=10*i, max_bound=10*i + 9),) \ - in lister.run.call_args_list + assert ( + dict(min_bound=10 * i, max_bound=10 * i + 9), + ) in lister.run.call_args_list diff --git a/swh/lister/gitlab/__init__.py b/swh/lister/gitlab/__init__.py index ca2b89b..5ddf416 100644 --- a/swh/lister/gitlab/__init__.py +++ b/swh/lister/gitlab/__init__.py @@ -1,13 +1,14 @@ # Copyright (C) 2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information def register(): from .models import GitLabModel from .lister import GitLabLister - return {'models': [GitLabModel], - 
'lister': GitLabLister, - 'task_modules': ['%s.tasks' % __name__], - } + return { + "models": [GitLabModel], + "lister": GitLabLister, + "task_modules": ["%s.tasks" % __name__], + } diff --git a/swh/lister/gitlab/lister.py b/swh/lister/gitlab/lister.py index d3e45bf..ca70bf4 100644 --- a/swh/lister/gitlab/lister.py +++ b/swh/lister/gitlab/lister.py @@ -1,91 +1,97 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import time from urllib3.util import parse_url from ..core.page_by_page_lister import PageByPageHttpLister from .models import GitLabModel from typing import Any, Dict, List, Tuple, Union, MutableMapping, Optional from requests import Response class GitLabLister(PageByPageHttpLister): # Template path expecting an integer that represents the page id - PATH_TEMPLATE = '/projects?page=%d&order_by=id' - DEFAULT_URL = 'https://gitlab.com/api/v4/' + PATH_TEMPLATE = "/projects?page=%d&order_by=id" + DEFAULT_URL = "https://gitlab.com/api/v4/" MODEL = GitLabModel - LISTER_NAME = 'gitlab' + LISTER_NAME = "gitlab" - def __init__(self, url=None, instance=None, - override_config=None, sort='asc', per_page=20): + def __init__( + self, url=None, instance=None, override_config=None, sort="asc", per_page=20 + ): super().__init__(url=url, override_config=override_config) if instance is None: instance = parse_url(self.url).host self.instance = instance - self.PATH_TEMPLATE = '%s&sort=%s&per_page=%s' % ( - self.PATH_TEMPLATE, sort, per_page) + self.PATH_TEMPLATE = "%s&sort=%s&per_page=%s" % ( + self.PATH_TEMPLATE, + sort, + per_page, + ) def uid(self, repo: Dict[str, Any]) -> str: - return '%s/%s' % (self.instance, repo['path_with_namespace']) + return "%s/%s" % (self.instance, repo["path_with_namespace"]) def get_model_from_repo(self, repo: Dict[str, Any]) -> Dict[str, Any]: return { - 'instance': self.instance, - 'uid': self.uid(repo), - 'name': repo['name'], - 'full_name': repo['path_with_namespace'], - 'html_url': repo['web_url'], - 'origin_url': repo['http_url_to_repo'], - 'origin_type': 'git', + "instance": self.instance, + "uid": self.uid(repo), + "name": repo["name"], + "full_name": repo["path_with_namespace"], + "html_url": repo["web_url"], + "origin_url": repo["http_url_to_repo"], + "origin_type": "git", } - def transport_quota_check(self, response: Response - ) -> Tuple[bool, Union[int, float]]: + def transport_quota_check( + self, response: Response + ) -> Tuple[bool, Union[int, float]]: """Deal with rate limit if any. 
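Returns a (throttled, delay) tuple: (True, seconds to wait) when the instance's RateLimit headers report an exhausted quota, (False, 0) otherwise.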
""" # not all gitlab instance have rate limit - if 'RateLimit-Remaining' in response.headers: - reqs_remaining = int(response.headers['RateLimit-Remaining']) + if "RateLimit-Remaining" in response.headers: + reqs_remaining = int(response.headers["RateLimit-Remaining"]) if response.status_code == 403 and reqs_remaining == 0: - reset_at = int(response.headers['RateLimit-Reset']) + reset_at = int(response.headers["RateLimit-Reset"]) delay = min(reset_at - time.time(), 3600) return True, delay return False, 0 - def _get_int(self, headers: MutableMapping[str, Any], - key: str) -> Optional[int]: + def _get_int(self, headers: MutableMapping[str, Any], key: str) -> Optional[int]: _val = headers.get(key) if _val: return int(_val) return None - def get_next_target_from_response( - self, response: Response) -> Optional[int]: + def get_next_target_from_response(self, response: Response) -> Optional[int]: """Determine the next page identifier. """ - return self._get_int(response.headers, 'x-next-page') + return self._get_int(response.headers, "x-next-page") - def get_pages_information(self) -> Tuple[Optional[int], - Optional[int], Optional[int]]: + def get_pages_information( + self, + ) -> Tuple[Optional[int], Optional[int], Optional[int]]: """Determine pages information. """ response = self.transport_head(identifier=1) # type: ignore if not response.ok: raise ValueError( - 'Problem during information fetch: %s' % response.status_code) + "Problem during information fetch: %s" % response.status_code + ) h = response.headers - return (self._get_int(h, 'x-total'), - self._get_int(h, 'x-total-pages'), - self._get_int(h, 'x-per-page')) + return ( + self._get_int(h, "x-total"), + self._get_int(h, "x-total-pages"), + self._get_int(h, "x-per-page"), + ) - def transport_response_simplified(self, response: Response - ) -> List[Dict[str, Any]]: + def transport_response_simplified(self, response: Response) -> List[Dict[str, Any]]: repos = response.json() return [self.get_model_from_repo(repo) for repo in repos] diff --git a/swh/lister/gitlab/models.py b/swh/lister/gitlab/models.py index 1302e67..d1907f2 100644 --- a/swh/lister/gitlab/models.py +++ b/swh/lister/gitlab/models.py @@ -1,17 +1,18 @@ # Copyright (C) 2018 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from sqlalchemy import Column, String from ..core.models import ModelBase class GitLabModel(ModelBase): """a Gitlab repository from a gitlab instance """ - __tablename__ = 'gitlab_repo' + + __tablename__ = "gitlab_repo" uid = Column(String, primary_key=True) instance = Column(String, index=True) diff --git a/swh/lister/gitlab/tasks.py b/swh/lister/gitlab/tasks.py index e6a1755..85866bd 100644 --- a/swh/lister/gitlab/tasks.py +++ b/swh/lister/gitlab/tasks.py @@ -1,52 +1,53 @@ # Copyright (C) 2018 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import random from celery import group, shared_task from .. 
import utils from .lister import GitLabLister NBPAGES = 10 -@shared_task(name=__name__ + '.IncrementalGitLabLister') +@shared_task(name=__name__ + ".IncrementalGitLabLister") def list_gitlab_incremental(**lister_args): """Incremental update of a GitLab instance""" - lister_args['sort'] = 'desc' + lister_args["sort"] = "desc" lister = GitLabLister(**lister_args) total_pages = lister.get_pages_information()[1] # stopping as soon as existing origins for that instance are detected return lister.run(min_bound=1, max_bound=total_pages, check_existence=True) -@shared_task(name=__name__ + '.RangeGitLabLister') +@shared_task(name=__name__ + ".RangeGitLabLister") def _range_gitlab_lister(start, end, **lister_args): lister = GitLabLister(**lister_args) return lister.run(min_bound=start, max_bound=end) -@shared_task(name=__name__ + '.FullGitLabRelister', bind=True) +@shared_task(name=__name__ + ".FullGitLabRelister", bind=True) def list_gitlab_full(self, **lister_args): """Full update of a GitLab instance""" lister = GitLabLister(**lister_args) _, total_pages, _ = lister.get_pages_information() ranges = list(utils.split_range(total_pages, NBPAGES)) random.shuffle(ranges) - promise = group(_range_gitlab_lister.s(minv, maxv, **lister_args) - for minv, maxv in ranges)() - self.log.debug('%s OK (spawned %s subtasks)' % (self.name, len(ranges))) + promise = group( + _range_gitlab_lister.s(minv, maxv, **lister_args) for minv, maxv in ranges + )() + self.log.debug("%s OK (spawned %s subtasks)" % (self.name, len(ranges))) try: promise.save() except (NotImplementedError, AttributeError): - self.log.info('Unable to call save_group with current result backend.') + self.log.info("Unable to call save_group with current result backend.") # FIXME: what to do in terms of return here? 
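# Returning the saved GroupResult id lets callers restore the group later with # GroupResult.restore(promise_id, app=...) (as the task tests do) and poll the # spawned subtasks for completion; backends lacking save_group support raise, # hence the try/except above.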
return promise.id -@shared_task(name=__name__ + '.ping') +@shared_task(name=__name__ + ".ping") def _ping(): - return 'OK' + return "OK" diff --git a/swh/lister/gitlab/tests/test_lister.py b/swh/lister/gitlab/tests/test_lister.py index 0d02423..041e969 100644 --- a/swh/lister/gitlab/tests/test_lister.py +++ b/swh/lister/gitlab/tests/test_lister.py @@ -1,66 +1,66 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import re import unittest from datetime import datetime, timedelta from swh.lister.core.tests.test_lister import HttpListerTesterBase from swh.lister.gitlab.lister import GitLabLister logger = logging.getLogger(__name__) class GitLabListerTester(HttpListerTesterBase, unittest.TestCase): Lister = GitLabLister - test_re = re.compile(r'^.*/projects.*page=(\d+).*') - lister_subdir = 'gitlab' - good_api_response_file = 'data/gitlab.com/api_response.json' - bad_api_response_file = 'data/gitlab.com/api_empty_response.json' + test_re = re.compile(r"^.*/projects.*page=(\d+).*") + lister_subdir = "gitlab" + good_api_response_file = "data/gitlab.com/api_response.json" + bad_api_response_file = "data/gitlab.com/api_empty_response.json" first_index = 1 entries_per_page = 10 convert_type = int def response_headers(self, request): - headers = {'RateLimit-Remaining': '1'} + headers = {"RateLimit-Remaining": "1"} if self.request_index(request) == self.first_index: - headers.update({ - 'x-next-page': '3', - }) + headers.update( + {"x-next-page": "3",} + ) return headers def mock_rate_quota(self, n, request, context): self.rate_limit += 1 context.status_code = 403 - context.headers['RateLimit-Remaining'] = '0' + context.headers["RateLimit-Remaining"] = "0" one_second = int((datetime.now() + timedelta(seconds=1.5)).timestamp()) - context.headers['RateLimit-Reset'] = str(one_second) + context.headers["RateLimit-Reset"] = str(one_second) return '{"error":"dummy"}' def test_lister_gitlab(swh_listers, requests_mock_datadir): - lister = swh_listers['gitlab'] + lister = swh_listers["gitlab"] lister.run() - r = lister.scheduler.search_tasks(task_type='load-git') + r = lister.scheduler.search_tasks(task_type="load-git") assert len(r) == 10 for row in r: - assert row['type'] == 'load-git' + assert row["type"] == "load-git" # arguments check - args = row['arguments']['args'] + args = row["arguments"]["args"] assert len(args) == 0 # kwargs - kwargs = row['arguments']['kwargs'] - url = kwargs['url'] - assert url.startswith('https://gitlab.com') + kwargs = row["arguments"]["kwargs"] + url = kwargs["url"] + assert url.startswith("https://gitlab.com") - assert row['policy'] == 'recurring' - assert row['priority'] is None + assert row["policy"] == "recurring" + assert row["priority"] is None diff --git a/swh/lister/gitlab/tests/test_tasks.py b/swh/lister/gitlab/tests/test_tasks.py index 56332a1..1959989 100644 --- a/swh/lister/gitlab/tests/test_tasks.py +++ b/swh/lister/gitlab/tests/test_tasks.py @@ -1,142 +1,141 @@ from time import sleep from celery.result import GroupResult from unittest.mock import patch def test_ping(swh_app, celery_session_worker): - res = swh_app.send_task( - 'swh.lister.gitlab.tasks.ping') + res = swh_app.send_task("swh.lister.gitlab.tasks.ping") assert res res.wait() assert res.successful() - assert res.result == 'OK' + assert res.result == "OK" 
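# Note on the mocking pattern used below: binding lister.return_value = lister # makes the patched class return itself when instantiated, so a single mock # records both the constructor call and the method calls asserted on afterwards.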
-@patch('swh.lister.gitlab.tasks.GitLabLister') +@patch("swh.lister.gitlab.tasks.GitLabLister") def test_incremental(lister, swh_app, celery_session_worker): # setup the mocked GitlabLister lister.return_value = lister lister.run.return_value = None lister.get_pages_information.return_value = (None, 10, None) - res = swh_app.send_task( - 'swh.lister.gitlab.tasks.IncrementalGitLabLister') + res = swh_app.send_task("swh.lister.gitlab.tasks.IncrementalGitLabLister") assert res res.wait() assert res.successful() - lister.assert_called_once_with(sort='desc') + lister.assert_called_once_with(sort="desc") lister.db_last_index.assert_not_called() lister.get_pages_information.assert_called_once_with() - lister.run.assert_called_once_with( - min_bound=1, max_bound=10, check_existence=True) + lister.run.assert_called_once_with(min_bound=1, max_bound=10, check_existence=True) -@patch('swh.lister.gitlab.tasks.GitLabLister') +@patch("swh.lister.gitlab.tasks.GitLabLister") def test_range(lister, swh_app, celery_session_worker): # setup the mocked GitlabLister lister.return_value = lister lister.run.return_value = None res = swh_app.send_task( - 'swh.lister.gitlab.tasks.RangeGitLabLister', - kwargs=dict(start=12, end=42)) + "swh.lister.gitlab.tasks.RangeGitLabLister", kwargs=dict(start=12, end=42) + ) assert res res.wait() assert res.successful() lister.assert_called_once_with() lister.db_last_index.assert_not_called() lister.run.assert_called_once_with(min_bound=12, max_bound=42) -@patch('swh.lister.gitlab.tasks.GitLabLister') +@patch("swh.lister.gitlab.tasks.GitLabLister") def test_relister(lister, swh_app, celery_session_worker): # setup the mocked GitlabLister lister.return_value = lister lister.run.return_value = None lister.get_pages_information.return_value = (None, 85, None) lister.db_partition_indices.return_value = [ - (i, i+9) for i in range(0, 80, 10)] + [(80, 85)] + (i, i + 9) for i in range(0, 80, 10) + ] + [(80, 85)] - res = swh_app.send_task( - 'swh.lister.gitlab.tasks.FullGitLabRelister') + res = swh_app.send_task("swh.lister.gitlab.tasks.FullGitLabRelister") assert res res.wait() assert res.successful() # retrieve the GroupResult for this task and wait for all the subtasks # to complete promise_id = res.result assert promise_id promise = GroupResult.restore(promise_id, app=swh_app) for i in range(5): if promise.ready(): break sleep(1) lister.assert_called_with() # one by the FullGitlabRelister task # + 9 for the RangeGitlabLister subtasks assert lister.call_count == 10 lister.db_last_index.assert_not_called() lister.db_partition_indices.assert_not_called() lister.get_pages_information.assert_called_once_with() # lister.run should have been called once per partition interval for i in range(8): # XXX inconsistent behavior: max_bound is EXCLUDED here - assert (dict(min_bound=10*i, max_bound=10*i + 10),) \ - in lister.run.call_args_list - assert (dict(min_bound=80, max_bound=85),) \ - in lister.run.call_args_list + assert ( + dict(min_bound=10 * i, max_bound=10 * i + 10), + ) in lister.run.call_args_list + assert (dict(min_bound=80, max_bound=85),) in lister.run.call_args_list -@patch('swh.lister.gitlab.tasks.GitLabLister') +@patch("swh.lister.gitlab.tasks.GitLabLister") def test_relister_instance(lister, swh_app, celery_session_worker): # setup the mocked GitlabLister lister.return_value = lister lister.run.return_value = None lister.get_pages_information.return_value = (None, 85, None) lister.db_partition_indices.return_value = [ - (i, i+9) for i in range(0, 80, 10)] + [(80, 85)] + (i, i + 
9) for i in range(0, 80, 10) + ] + [(80, 85)] res = swh_app.send_task( - 'swh.lister.gitlab.tasks.FullGitLabRelister', - kwargs=dict(url='https://0xacab.org/api/v4')) + "swh.lister.gitlab.tasks.FullGitLabRelister", + kwargs=dict(url="https://0xacab.org/api/v4"), + ) assert res res.wait() assert res.successful() # retrieve the GroupResult for this task and wait for all the subtasks # to complete promise_id = res.result assert promise_id promise = GroupResult.restore(promise_id, app=swh_app) for i in range(5): if promise.ready(): break sleep(1) - lister.assert_called_with(url='https://0xacab.org/api/v4') + lister.assert_called_with(url="https://0xacab.org/api/v4") # one by the FullGitlabRelister task # + 9 for the RangeGitlabLister subtasks assert lister.call_count == 10 lister.db_last_index.assert_not_called() lister.db_partition_indices.assert_not_called() lister.get_pages_information.assert_called_once_with() # lister.run should have been called once per partition interval for i in range(8): # XXX inconsistent behavior: max_bound is EXCLUDED here - assert (dict(min_bound=10*i, max_bound=10*i + 10),) \ - in lister.run.call_args_list - assert (dict(min_bound=80, max_bound=85),) \ - in lister.run.call_args_list + assert ( + dict(min_bound=10 * i, max_bound=10 * i + 10), + ) in lister.run.call_args_list + assert (dict(min_bound=80, max_bound=85),) in lister.run.call_args_list diff --git a/swh/lister/gnu/__init__.py b/swh/lister/gnu/__init__.py index 7787464..8ff20bb 100644 --- a/swh/lister/gnu/__init__.py +++ b/swh/lister/gnu/__init__.py @@ -1,13 +1,14 @@ # Copyright (C) 2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information def register(): from .models import GNUModel from .lister import GNULister - return {'models': [GNUModel], - 'lister': GNULister, - 'task_modules': ['%s.tasks' % __name__], - } + return { + "models": [GNUModel], + "lister": GNULister, + "task_modules": ["%s.tasks" % __name__], + } diff --git a/swh/lister/gnu/lister.py b/swh/lister/gnu/lister.py index 1f41b0a..3390078 100644 --- a/swh/lister/gnu/lister.py +++ b/swh/lister/gnu/lister.py @@ -1,113 +1,113 @@ # Copyright (C) 2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from swh.scheduler import utils from swh.lister.core.simple_lister import SimpleLister from swh.lister.gnu.models import GNUModel from swh.lister.gnu.tree import GNUTree from typing import Any, Dict, List from requests import Response logger = logging.getLogger(__name__) class GNULister(SimpleLister): MODEL = GNUModel - LISTER_NAME = 'gnu' - instance = 'gnu' + LISTER_NAME = "gnu" + instance = "gnu" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.gnu_tree = GNUTree('https://ftp.gnu.org/tree.json.gz') + self.gnu_tree = GNUTree("https://ftp.gnu.org/tree.json.gz") def task_dict(self, origin_type, origin_url, **kwargs): """Return task format dict This is overridden from the lister_base as more information is needed for the ingestion task creation. This creates tasks with args and kwargs set, for example: .. code-block:: python args: kwargs: { 'url': 'https://ftp.gnu.org/gnu/3dldf/', 'artifacts': [{ 'url': 'https://...', 'time': '2003-12-09T21:43:20+00:00', 'length': 128, 'version': '1.0.1', 'filename': 'something-1.0.1.tar.gz', }, ... 
] } """ artifacts = self.gnu_tree.artifacts[origin_url] - assert origin_type == 'tar' + assert origin_type == "tar" return utils.create_task_dict( - 'load-archive-files', - kwargs.get('policy', 'oneshot'), + "load-archive-files", + kwargs.get("policy", "oneshot"), url=origin_url, artifacts=artifacts, retries_left=3, ) def safely_issue_request(self, identifier: int) -> None: """Bypass the implementation. It's now the GNUTree which deals with querying the gnu mirror. As an implementation detail, we cannot change simply the base SimpleLister as other implementation still uses it. This shall be part of another refactoring pass. """ return None def list_packages(self, response: Response) -> List[Dict[str, Any]]: """List the actual gnu origins (package name) with their name, url and associated tarballs. Args: response: Unused Returns: List of packages name, url, last modification time:: [ { 'name': '3dldf', 'url': 'https://ftp.gnu.org/gnu/3dldf/', 'time_modified': '2003-12-09T20:43:20+00:00' }, { 'name': '8sync', 'url': 'https://ftp.gnu.org/gnu/8sync/', 'time_modified': '2016-12-06T02:37:10+00:00' }, ... ] """ return list(self.gnu_tree.projects.values()) def get_model_from_repo(self, repo: Dict[str, Any]) -> Dict[str, Any]: """Transform from repository representation to model """ return { - 'uid': repo['url'], - 'name': repo['name'], - 'full_name': repo['name'], - 'html_url': repo['url'], - 'origin_url': repo['url'], - 'time_last_updated': repo['time_modified'], - 'origin_type': 'tar', + "uid": repo["url"], + "name": repo["name"], + "full_name": repo["name"], + "html_url": repo["url"], + "origin_url": repo["url"], + "time_last_updated": repo["time_modified"], + "origin_type": "tar", } diff --git a/swh/lister/gnu/models.py b/swh/lister/gnu/models.py index 38c47ae..db024f7 100644 --- a/swh/lister/gnu/models.py +++ b/swh/lister/gnu/models.py @@ -1,17 +1,18 @@ # Copyright (C) 2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from sqlalchemy import Column, DateTime, String from ..core.models import ModelBase class GNUModel(ModelBase): """a GNU repository representation """ - __tablename__ = 'gnu_repo' + + __tablename__ = "gnu_repo" uid = Column(String, primary_key=True) time_last_updated = Column(DateTime) diff --git a/swh/lister/gnu/tasks.py b/swh/lister/gnu/tasks.py index edcde7e..3134582 100644 --- a/swh/lister/gnu/tasks.py +++ b/swh/lister/gnu/tasks.py @@ -1,18 +1,18 @@ # Copyright (C) 2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from celery import shared_task from .lister import GNULister -@shared_task(name=__name__ + '.GNUListerTask') +@shared_task(name=__name__ + ".GNUListerTask") def list_gnu_full(**lister_args): """List lister for the GNU source code archive""" return GNULister(**lister_args).run() -@shared_task(name=__name__ + '.ping') +@shared_task(name=__name__ + ".ping") def _ping(): - return 'OK' + return "OK" diff --git a/swh/lister/gnu/tests/test_lister.py b/swh/lister/gnu/tests/test_lister.py index a1c9c09..92a9bc4 100644 --- a/swh/lister/gnu/tests/test_lister.py +++ b/swh/lister/gnu/tests/test_lister.py @@ -1,52 +1,50 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information 
import logging logger = logging.getLogger(__name__) def test_gnu_lister(swh_listers, requests_mock_datadir): - lister = swh_listers['gnu'] + lister = swh_listers["gnu"] lister.run() - r = lister.scheduler.search_tasks(task_type='load-archive-files') + r = lister.scheduler.search_tasks(task_type="load-archive-files") assert len(r) == 383 for row in r: - assert row['type'] == 'load-archive-files' + assert row["type"] == "load-archive-files" # arguments check - args = row['arguments']['args'] + args = row["arguments"]["args"] assert len(args) == 0 # kwargs - kwargs = row['arguments']['kwargs'] - assert set(kwargs.keys()) == {'url', 'artifacts'} + kwargs = row["arguments"]["kwargs"] + assert set(kwargs.keys()) == {"url", "artifacts"} - url = kwargs['url'] - assert url.startswith('https://ftp.gnu.org') + url = kwargs["url"] + assert url.startswith("https://ftp.gnu.org") - url_suffix = url.split('https://ftp.gnu.org')[1] - assert 'gnu' in url_suffix or 'old-gnu' in url_suffix + url_suffix = url.split("https://ftp.gnu.org")[1] + assert "gnu" in url_suffix or "old-gnu" in url_suffix - artifacts = kwargs['artifacts'] + artifacts = kwargs["artifacts"] # check the artifact's structure artifact = artifacts[0] - assert set(artifact.keys()) == { - 'url', 'length', 'time', 'filename', 'version' - } + assert set(artifact.keys()) == {"url", "length", "time", "filename", "version"} for artifact in artifacts: logger.debug(artifact) # 'time' is an isoformat string now - for key in ['url', 'time', 'filename', 'version']: + for key in ["url", "time", "filename", "version"]: assert isinstance(artifact[key], str) - assert isinstance(artifact['length'], int) + assert isinstance(artifact["length"], int) - assert row['policy'] == 'oneshot' - assert row['priority'] is None - assert row['retries_left'] == 3 + assert row["policy"] == "oneshot" + assert row["priority"] is None + assert row["retries_left"] == 3 diff --git a/swh/lister/gnu/tests/test_tasks.py b/swh/lister/gnu/tests/test_tasks.py index 4c82f77..d496798 100644 --- a/swh/lister/gnu/tests/test_tasks.py +++ b/swh/lister/gnu/tests/test_tasks.py @@ -1,27 +1,25 @@ from unittest.mock import patch def test_ping(swh_app, celery_session_worker): - res = swh_app.send_task( - 'swh.lister.gnu.tasks.ping') + res = swh_app.send_task("swh.lister.gnu.tasks.ping") assert res res.wait() assert res.successful() - assert res.result == 'OK' + assert res.result == "OK" -@patch('swh.lister.gnu.tasks.GNULister') +@patch("swh.lister.gnu.tasks.GNULister") def test_lister(lister, swh_app, celery_session_worker): # setup the mocked GNULister lister.return_value = lister lister.run.return_value = None - res = swh_app.send_task( - 'swh.lister.gnu.tasks.GNUListerTask') + res = swh_app.send_task("swh.lister.gnu.tasks.GNUListerTask") assert res res.wait() assert res.successful() lister.assert_called_once_with() lister.db_last_index.assert_not_called() lister.run.assert_called_once_with() diff --git a/swh/lister/gnu/tests/test_tree.py b/swh/lister/gnu/tests/test_tree.py index ea25515..f09fe9e 100644 --- a/swh/lister/gnu/tests/test_tree.py +++ b/swh/lister/gnu/tests/test_tree.py @@ -1,226 +1,238 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import pytest from os import path from swh.lister.gnu.tree import ( - GNUTree, find_artifacts, check_filename_is_archive, load_raw_data, - 
get_version, format_date + GNUTree, + find_artifacts, + check_filename_is_archive, + load_raw_data, + get_version, + format_date, ) def test_load_raw_data_from_query(requests_mock_datadir): - actual_json = load_raw_data('https://ftp.gnu.org/tree.json.gz') + actual_json = load_raw_data("https://ftp.gnu.org/tree.json.gz") assert actual_json is not None assert isinstance(actual_json, list) assert len(actual_json) == 2 def test_load_raw_data_from_query_failure(requests_mock_datadir): - inexistant_url = 'https://ftp2.gnu.org/tree.unknown.gz' - with pytest.raises(ValueError, match='Error during query'): + inexistant_url = "https://ftp2.gnu.org/tree.unknown.gz" + with pytest.raises(ValueError, match="Error during query"): load_raw_data(inexistant_url) def test_load_raw_data_from_file(datadir): - filepath = path.join(datadir, 'https_ftp.gnu.org', 'tree.json.gz') + filepath = path.join(datadir, "https_ftp.gnu.org", "tree.json.gz") actual_json = load_raw_data(filepath) assert actual_json is not None assert isinstance(actual_json, list) assert len(actual_json) == 2 def test_load_raw_data_from_file_failure(datadir): - unknown_path = path.join(datadir, 'ftp.gnu.org2', 'tree.json.gz') + unknown_path = path.join(datadir, "ftp.gnu.org2", "tree.json.gz") with pytest.raises(FileNotFoundError): load_raw_data(unknown_path) def test_tree_json(requests_mock_datadir): - tree_json = GNUTree('https://ftp.gnu.org/tree.json.gz') + tree_json = GNUTree("https://ftp.gnu.org/tree.json.gz") - assert tree_json.projects['https://ftp.gnu.org/gnu/8sync/'] == { - 'name': '8sync', - 'time_modified': '2017-03-18T06:10:08+00:00', - 'url': 'https://ftp.gnu.org/gnu/8sync/' + assert tree_json.projects["https://ftp.gnu.org/gnu/8sync/"] == { + "name": "8sync", + "time_modified": "2017-03-18T06:10:08+00:00", + "url": "https://ftp.gnu.org/gnu/8sync/", } - assert tree_json.projects['https://ftp.gnu.org/gnu/3dldf/'] == { - 'name': '3dldf', - 'time_modified': '2013-12-13T19:00:36+00:00', - 'url': 'https://ftp.gnu.org/gnu/3dldf/' + assert tree_json.projects["https://ftp.gnu.org/gnu/3dldf/"] == { + "name": "3dldf", + "time_modified": "2013-12-13T19:00:36+00:00", + "url": "https://ftp.gnu.org/gnu/3dldf/", } - assert tree_json.projects['https://ftp.gnu.org/gnu/a2ps/'] == { - 'name': 'a2ps', - 'time_modified': '2007-12-29T03:55:05+00:00', - 'url': 'https://ftp.gnu.org/gnu/a2ps/' + assert tree_json.projects["https://ftp.gnu.org/gnu/a2ps/"] == { + "name": "a2ps", + "time_modified": "2007-12-29T03:55:05+00:00", + "url": "https://ftp.gnu.org/gnu/a2ps/", } - assert tree_json.projects['https://ftp.gnu.org/old-gnu/xshogi/'] == { - 'name': 'xshogi', - 'time_modified': '2003-08-02T11:15:22+00:00', - 'url': 'https://ftp.gnu.org/old-gnu/xshogi/' + assert tree_json.projects["https://ftp.gnu.org/old-gnu/xshogi/"] == { + "name": "xshogi", + "time_modified": "2003-08-02T11:15:22+00:00", + "url": "https://ftp.gnu.org/old-gnu/xshogi/", } - assert tree_json.artifacts['https://ftp.gnu.org/old-gnu/zlibc/'] == [ + assert tree_json.artifacts["https://ftp.gnu.org/old-gnu/zlibc/"] == [ { - 'url': 'https://ftp.gnu.org/old-gnu/zlibc/zlibc-0.9b.tar.gz', # noqa - 'length': 90106, - 'time': '1997-03-10T08:00:00+00:00', - 'filename': 'zlibc-0.9b.tar.gz', - 'version': '0.9b', + "url": "https://ftp.gnu.org/old-gnu/zlibc/zlibc-0.9b.tar.gz", # noqa + "length": 90106, + "time": "1997-03-10T08:00:00+00:00", + "filename": "zlibc-0.9b.tar.gz", + "version": "0.9b", }, { - 'url': 'https://ftp.gnu.org/old-gnu/zlibc/zlibc-0.9e.tar.gz', # noqa - 'length': 89625, - 'time': 
'1997-04-07T07:00:00+00:00', - 'filename': 'zlibc-0.9e.tar.gz', - 'version': '0.9e', - } + "url": "https://ftp.gnu.org/old-gnu/zlibc/zlibc-0.9e.tar.gz", # noqa + "length": 89625, + "time": "1997-04-07T07:00:00+00:00", + "filename": "zlibc-0.9e.tar.gz", + "version": "0.9e", + }, ] def test_tree_json_failures(requests_mock_datadir): - url = 'https://unknown/tree.json.gz' + url = "https://unknown/tree.json.gz" tree_json = GNUTree(url) - with pytest.raises(ValueError, match='Error during query to %s' % url): - tree_json.artifacts['https://ftp.gnu.org/gnu/3dldf/'] + with pytest.raises(ValueError, match="Error during query to %s" % url): + tree_json.artifacts["https://ftp.gnu.org/gnu/3dldf/"] - with pytest.raises(ValueError, match='Error during query to %s' % url): - tree_json.projects['https://ftp.gnu.org/old-gnu/xshogi/'] + with pytest.raises(ValueError, match="Error during query to %s" % url): + tree_json.projects["https://ftp.gnu.org/old-gnu/xshogi/"] def test_find_artifacts_small_sample(datadir): expected_artifacts = [ { - 'url': '/root/artanis/artanis-0.2.1.tar.bz2', - 'time': '2017-05-19T14:59:39+00:00', - 'length': 424081, - 'version': '0.2.1', - 'filename': 'artanis-0.2.1.tar.bz2', + "url": "/root/artanis/artanis-0.2.1.tar.bz2", + "time": "2017-05-19T14:59:39+00:00", + "length": 424081, + "version": "0.2.1", + "filename": "artanis-0.2.1.tar.bz2", }, { - 'url': '/root/xboard/winboard/winboard-4_0_0-src.zip', # noqa - 'time': '1998-06-21T09:55:00+00:00', - 'length': 1514448, - 'version': '4_0_0-src', - 'filename': 'winboard-4_0_0-src.zip', + "url": "/root/xboard/winboard/winboard-4_0_0-src.zip", # noqa + "time": "1998-06-21T09:55:00+00:00", + "length": 1514448, + "version": "4_0_0-src", + "filename": "winboard-4_0_0-src.zip", }, { - 'url': '/root/xboard/xboard-3.6.2.tar.gz', # noqa - 'time': '1997-07-25T07:00:00+00:00', - 'length': 450164, - 'version': '3.6.2', - 'filename': 'xboard-3.6.2.tar.gz', + "url": "/root/xboard/xboard-3.6.2.tar.gz", # noqa + "time": "1997-07-25T07:00:00+00:00", + "length": 450164, + "version": "3.6.2", + "filename": "xboard-3.6.2.tar.gz", }, { - 'url': '/root/xboard/xboard-4.0.0.tar.gz', # noqa - 'time': '1998-06-21T09:55:00+00:00', - 'length': 514951, - 'version': '4.0.0', - 'filename': 'xboard-4.0.0.tar.gz', + "url": "/root/xboard/xboard-4.0.0.tar.gz", # noqa + "time": "1998-06-21T09:55:00+00:00", + "length": 514951, + "version": "4.0.0", + "filename": "xboard-4.0.0.tar.gz", }, ] - file_structure = json.load(open(path.join(datadir, 'tree.min.json'))) - actual_artifacts = find_artifacts(file_structure, '/root/') + file_structure = json.load(open(path.join(datadir, "tree.min.json"))) + actual_artifacts = find_artifacts(file_structure, "/root/") assert actual_artifacts == expected_artifacts def test_find_artifacts(datadir): - file_structure = json.load(open(path.join(datadir, 'tree.json'))) - actual_artifacts = find_artifacts(file_structure, '/root/') + file_structure = json.load(open(path.join(datadir, "tree.json"))) + actual_artifacts = find_artifacts(file_structure, "/root/") assert len(actual_artifacts) == 42 + 3 # tar + zip def test_check_filename_is_archive(): - for ext in ['abc.xy.zip', 'cvb.zip', 'abc.tar.bz2', 'something.tar']: + for ext in ["abc.xy.zip", "cvb.zip", "abc.tar.bz2", "something.tar"]: assert check_filename_is_archive(ext) is True - for ext in ['abc.tar.gz.sig', 'abc', 'something.zip2', 'foo.tar.']: + for ext in ["abc.tar.gz.sig", "abc", "something.zip2", "foo.tar."]: assert check_filename_is_archive(ext) is False def test_get_version(): 
"""Parsing version from url should yield some form of "sensible" version Given the dataset, it's not a simple task to extract correctly the version. """ for url, expected_branchname in [ - ('https://gnu.org/sthg/info-2.1.0.tar.gz', '2.1.0'), - ('https://gnu.org/sthg/info-2.1.2.zip', '2.1.2'), - ('https://sthg.org/gnu/sthg.tar.gz', 'sthg'), - ('https://sthg.org/gnu/DLDF-1.1.4.tar.gz', '1.1.4'), - ('https://sthg.org/gnu/anubis-latest.tar.bz2', 'latest'), - ('https://ftp.org/gnu/aris-w32.zip', 'w32'), - ('https://ftp.org/gnu/aris-w32-2.2.zip', 'w32-2.2'), - ('https://ftp.org/gnu/autogen.info.tar.gz', 'autogen.info'), - ('https://ftp.org/gnu/crypto-build-demo.tar.gz', - 'crypto-build-demo'), - ('https://ftp.org/gnu/clue+clio+xit.clisp.tar.gz', - 'clue+clio+xit.clisp'), - ('https://ftp.org/gnu/clue+clio.for-pcl.tar.gz', - 'clue+clio.for-pcl'), - ('https://ftp.org/gnu/clisp-hppa2.0-hp-hpux10.20.tar.gz', - 'hppa2.0-hp-hpux10.20'), - ('clisp-i386-solaris2.6.tar.gz', 'i386-solaris2.6'), - ('clisp-mips-sgi-irix6.5.tar.gz', 'mips-sgi-irix6.5'), - ('clisp-powerpc-apple-macos.tar.gz', 'powerpc-apple-macos'), - ('clisp-powerpc-unknown-linuxlibc6.tar.gz', - 'powerpc-unknown-linuxlibc6'), - - ('clisp-rs6000-ibm-aix3.2.5.tar.gz', 'rs6000-ibm-aix3.2.5'), - ('clisp-sparc-redhat51-linux.tar.gz', 'sparc-redhat51-linux'), - ('clisp-sparc-sun-solaris2.4.tar.gz', 'sparc-sun-solaris2.4'), - ('clisp-sparc-sun-sunos4.1.3_U1.tar.gz', - 'sparc-sun-sunos4.1.3_U1'), - ('clisp-2.25.1-powerpc-apple-MacOSX.tar.gz', - '2.25.1-powerpc-apple-MacOSX'), - ('clisp-2.27-PowerMacintosh-powerpc-Darwin-1.3.7.tar.gz', - '2.27-PowerMacintosh-powerpc-Darwin-1.3.7'), - ('clisp-2.27-i686-unknown-Linux-2.2.19.tar.gz', - '2.27-i686-unknown-Linux-2.2.19'), - ('clisp-2.28-i386-i386-freebsd-4.3-RELEASE.tar.gz', - '2.28-i386-i386-freebsd-4.3-RELEASE'), - ('clisp-2.28-i686-unknown-cygwin_me-4.90-1.3.10.tar.gz', - '2.28-i686-unknown-cygwin_me-4.90-1.3.10'), - ('clisp-2.29-i386-i386-freebsd-4.6-STABLE.tar.gz', - '2.29-i386-i386-freebsd-4.6-STABLE'), - ('clisp-2.29-i686-unknown-cygwin_nt-5.0-1.3.12.tar.gz', - '2.29-i686-unknown-cygwin_nt-5.0-1.3.12'), - ('gcl-2.5.3-ansi-japi-xdr.20030701_mingw32.zip', - '2.5.3-ansi-japi-xdr.20030701_mingw32'), - ('gettext-runtime-0.13.1.bin.woe32.zip', '0.13.1.bin.woe32'), - ('sather-logo_images.tar.gz', 'sather-logo_images'), - ('sather-specification-000328.html.tar.gz', '000328.html'), - ('something-10.1.0.7z', '10.1.0'), - + ("https://gnu.org/sthg/info-2.1.0.tar.gz", "2.1.0"), + ("https://gnu.org/sthg/info-2.1.2.zip", "2.1.2"), + ("https://sthg.org/gnu/sthg.tar.gz", "sthg"), + ("https://sthg.org/gnu/DLDF-1.1.4.tar.gz", "1.1.4"), + ("https://sthg.org/gnu/anubis-latest.tar.bz2", "latest"), + ("https://ftp.org/gnu/aris-w32.zip", "w32"), + ("https://ftp.org/gnu/aris-w32-2.2.zip", "w32-2.2"), + ("https://ftp.org/gnu/autogen.info.tar.gz", "autogen.info"), + ("https://ftp.org/gnu/crypto-build-demo.tar.gz", "crypto-build-demo"), + ("https://ftp.org/gnu/clue+clio+xit.clisp.tar.gz", "clue+clio+xit.clisp"), + ("https://ftp.org/gnu/clue+clio.for-pcl.tar.gz", "clue+clio.for-pcl"), + ( + "https://ftp.org/gnu/clisp-hppa2.0-hp-hpux10.20.tar.gz", + "hppa2.0-hp-hpux10.20", + ), + ("clisp-i386-solaris2.6.tar.gz", "i386-solaris2.6"), + ("clisp-mips-sgi-irix6.5.tar.gz", "mips-sgi-irix6.5"), + ("clisp-powerpc-apple-macos.tar.gz", "powerpc-apple-macos"), + ("clisp-powerpc-unknown-linuxlibc6.tar.gz", "powerpc-unknown-linuxlibc6"), + ("clisp-rs6000-ibm-aix3.2.5.tar.gz", "rs6000-ibm-aix3.2.5"), + ("clisp-sparc-redhat51-linux.tar.gz", 
"sparc-redhat51-linux"), + ("clisp-sparc-sun-solaris2.4.tar.gz", "sparc-sun-solaris2.4"), + ("clisp-sparc-sun-sunos4.1.3_U1.tar.gz", "sparc-sun-sunos4.1.3_U1"), + ("clisp-2.25.1-powerpc-apple-MacOSX.tar.gz", "2.25.1-powerpc-apple-MacOSX"), + ( + "clisp-2.27-PowerMacintosh-powerpc-Darwin-1.3.7.tar.gz", + "2.27-PowerMacintosh-powerpc-Darwin-1.3.7", + ), + ( + "clisp-2.27-i686-unknown-Linux-2.2.19.tar.gz", + "2.27-i686-unknown-Linux-2.2.19", + ), + ( + "clisp-2.28-i386-i386-freebsd-4.3-RELEASE.tar.gz", + "2.28-i386-i386-freebsd-4.3-RELEASE", + ), + ( + "clisp-2.28-i686-unknown-cygwin_me-4.90-1.3.10.tar.gz", + "2.28-i686-unknown-cygwin_me-4.90-1.3.10", + ), + ( + "clisp-2.29-i386-i386-freebsd-4.6-STABLE.tar.gz", + "2.29-i386-i386-freebsd-4.6-STABLE", + ), + ( + "clisp-2.29-i686-unknown-cygwin_nt-5.0-1.3.12.tar.gz", + "2.29-i686-unknown-cygwin_nt-5.0-1.3.12", + ), + ( + "gcl-2.5.3-ansi-japi-xdr.20030701_mingw32.zip", + "2.5.3-ansi-japi-xdr.20030701_mingw32", + ), + ("gettext-runtime-0.13.1.bin.woe32.zip", "0.13.1.bin.woe32"), + ("sather-logo_images.tar.gz", "sather-logo_images"), + ("sather-specification-000328.html.tar.gz", "000328.html"), + ("something-10.1.0.7z", "10.1.0"), ]: actual_branchname = get_version(url) assert actual_branchname == expected_branchname def test_format_date(): for timestamp, expected_isoformat_date in [ - (1489817408, '2017-03-18T06:10:08+00:00'), - (1386961236, '2013-12-13T19:00:36+00:00'), - ('1198900505', '2007-12-29T03:55:05+00:00'), - (1059822922, '2003-08-02T11:15:22+00:00'), - ('1489817408', '2017-03-18T06:10:08+00:00'), + (1489817408, "2017-03-18T06:10:08+00:00"), + (1386961236, "2013-12-13T19:00:36+00:00"), + ("1198900505", "2007-12-29T03:55:05+00:00"), + (1059822922, "2003-08-02T11:15:22+00:00"), + ("1489817408", "2017-03-18T06:10:08+00:00"), ]: actual_date = format_date(timestamp) assert actual_date == expected_isoformat_date with pytest.raises(ValueError): - format_date('') + format_date("") with pytest.raises(TypeError): format_date(None) diff --git a/swh/lister/gnu/tree.py b/swh/lister/gnu/tree.py index 8ef6bd6..f01c0a7 100644 --- a/swh/lister/gnu/tree.py +++ b/swh/lister/gnu/tree.py @@ -1,308 +1,340 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import gzip import json import logging import requests import re from datetime import datetime from os import path from pathlib import Path from pytz import utc from typing import Any, List, Mapping, Sequence, Tuple from urllib.parse import urlparse logger = logging.getLogger(__name__) class GNUTree: """Gnu Tree's representation """ + def __init__(self, url: str): self.url = url # filepath or uri u = urlparse(url) - self.base_url = '%s://%s' % (u.scheme, u.netloc) + self.base_url = "%s://%s" % (u.scheme, u.netloc) # Interesting top level directories - self.top_level_directories = ['gnu', 'old-gnu'] + self.top_level_directories = ["gnu", "old-gnu"] # internal state self._artifacts = {} # type: Mapping[str, Any] self._projects = {} # type: Mapping[str, Any] @property def projects(self) -> Mapping[str, Any]: if not self._projects: self._projects, self._artifacts = self._load() return self._projects @property def artifacts(self) -> Mapping[str, Any]: if not self._artifacts: self._projects, self._artifacts = self._load() return self._artifacts def _load(self) -> Tuple[Mapping[str, Any], Mapping[str, Any]]: """Compute 
projects and artifacts per project Returns: Tuple of dict projects (key project url, value the associated information) and a dict artifacts (key project url, value the info_file list) """ projects = {} artifacts = {} raw_data = load_raw_data(self.url)[0] - for directory in raw_data['contents']: - if directory['name'] not in self.top_level_directories: + for directory in raw_data["contents"]: + if directory["name"] not in self.top_level_directories: continue - infos = directory['contents'] + infos = directory["contents"] for info in infos: - if info['type'] == 'directory': - package_url = '%s/%s/%s/' % ( - self.base_url, directory['name'], info['name']) - package_artifacts = find_artifacts( - info['contents'], package_url) + if info["type"] == "directory": + package_url = "%s/%s/%s/" % ( + self.base_url, + directory["name"], + info["name"], + ) + package_artifacts = find_artifacts(info["contents"], package_url) if package_artifacts != []: repo_details = { - 'name': info['name'], - 'url': package_url, - 'time_modified': format_date(info['time']) + "name": info["name"], + "url": package_url, + "time_modified": format_date(info["time"]), } artifacts[package_url] = package_artifacts projects[package_url] = repo_details return projects, artifacts -def find_artifacts(filesystem: List[Mapping[str, Any]], - url: str) -> List[Mapping[str, Any]]: +def find_artifacts( + filesystem: List[Mapping[str, Any]], url: str +) -> List[Mapping[str, Any]]: """Recursively list artifacts present in the folder and subfolders for a particular package url. Args: filesystem: File structure of the package root directory. This is a list of Dict representing either file or directory information as dict (keys: name, size, time, type). url: URL of the corresponding package Returns: List of tarball urls and their associated metadata (time, length, etc...). For example: .. code-block:: python [ { 'url': 'https://ftp.gnu.org/gnu/3dldf/3DLDF-1.1.3.tar.gz', 'time': 1071002600, 'filename': '3DLDF-1.1.3.tar.gz', 'version': '1.1.3', 'length': 543 }, { 'url': 'https://ftp.gnu.org/gnu/3dldf/3DLDF-1.1.4.tar.gz', 'time': 1071078759, 'filename': '3DLDF-1.1.4.tar.gz', 'version': '1.1.4', 'length': 456 }, { 'url': 'https://ftp.gnu.org/gnu/3dldf/3DLDF-1.1.5.tar.gz', 'time': 1074278633, 'filename': '3DLDF-1.1.5.tar.gz', 'version': '1.1.5', 'length': 251 }, ... ] """ artifacts = [] # type: List[Mapping[str, Any]] for info_file in filesystem: - filetype = info_file['type'] - filename = info_file['name'] - if filetype == 'file': + filetype = info_file["type"] + filename = info_file["name"] + if filetype == "file": if check_filename_is_archive(filename): uri = url + filename - artifacts.append({ - 'url': uri, - 'filename': filename, - 'time': format_date(info_file['time']), - 'length': int(info_file['size']), - 'version': get_version(filename), - }) + artifacts.append( + { + "url": uri, + "filename": filename, + "time": format_date(info_file["time"]), + "length": int(info_file["size"]), + "version": get_version(filename), + } + ) # It will recursively check for artifacts in all sub-folders - elif filetype == 'directory': + elif filetype == "directory": tarballs_in_dir = find_artifacts( - info_file['contents'], - url + filename + '/') + info_file["contents"], url + filename + "/" + ) artifacts.extend(tarballs_in_dir) return artifacts def check_filename_is_archive(filename: str) -> bool: """ Check the file's extension: if the file is a zip archive or a .tar.x archive, where x can be anything, return True.
Args: filename: name of the file whose extension needs to be checked. Returns: Whether filename is an archive or not Example: >>> check_filename_is_archive('abc.zip') True >>> check_filename_is_archive('abc.tar.gz') True >>> check_filename_is_archive('bac.tar') True >>> check_filename_is_archive('abc.tar.gz.sig') False >>> check_filename_is_archive('foobar.tar.') False """ file_suffixes = Path(filename).suffixes - if len(file_suffixes) == 1 and file_suffixes[-1] in ('.zip', '.tar'): + if len(file_suffixes) == 1 and file_suffixes[-1] in (".zip", ".tar"): return True elif len(file_suffixes) > 1: - if file_suffixes[-1] == '.zip' or file_suffixes[-2] == '.tar': + if file_suffixes[-1] == ".zip" or file_suffixes[-2] == ".tar": return True return False # to recognize existing naming pattern EXTENSIONS = [ - 'zip', - 'tar', - 'gz', 'tgz', - 'bz2', 'bzip2', - 'lzma', 'lz', - 'xz', - 'Z', '7z', + "zip", + "tar", + "gz", + "tgz", + "bz2", + "bzip2", + "lzma", + "lz", + "xz", + "Z", + "7z", ] VERSION_KEYWORDS = [ - 'cygwin_me', - 'w32', 'win32', 'nt', 'cygwin', 'mingw', - 'latest', 'alpha', 'beta', - 'release', 'stable', - 'hppa', - 'solaris', 'sunos', 'sun4u', 'sparc', 'sun', - 'aix', 'ibm', 'rs6000', - 'i386', 'i686', - 'linux', 'redhat', 'linuxlibc', - 'mips', - 'powerpc', 'macos', 'apple', 'darwin', 'macosx', 'powermacintosh', - 'unknown', - 'netbsd', 'freebsd', - 'sgi', 'irix', + "cygwin_me", + "w32", + "win32", + "nt", + "cygwin", + "mingw", + "latest", + "alpha", + "beta", + "release", + "stable", + "hppa", + "solaris", + "sunos", + "sun4u", + "sparc", + "sun", + "aix", + "ibm", + "rs6000", + "i386", + "i686", + "linux", + "redhat", + "linuxlibc", + "mips", + "powerpc", + "macos", + "apple", + "darwin", + "macosx", + "powermacintosh", + "unknown", + "netbsd", + "freebsd", + "sgi", + "irix", ] # Match a filename into components. # # We use Debian's release number heuristic: A release number starts # with a digit, and is followed by alphanumeric characters or any of # ., +, :, ~ and - # # We hardcode a list of possible extensions, as this release number # scheme would match them too... We match on any combination of those. # # Greedy matching is done right to left (we only match the extension # greedily with +, software_name and release_number are matched lazily # with +? and *?). -PATTERN = r''' +PATTERN = r""" ^ (?: # We have a software name and a release number, separated with a # -, _ or dot. (?P<software_name1>.+?[-_.]) (?P<release_number>({vkeywords}|[0-9][0-9a-zA-Z_.+:~-]*?)+) | # We couldn't match a release number, put everything in the # software name. (?P<software_name2>.+?) ) (?P<extension>(?:\.(?:{extensions}))+) $ -'''.format( - extensions='|'.join(EXTENSIONS), - vkeywords='|'.join('%s[-]?' % k for k in VERSION_KEYWORDS), +""".format( + extensions="|".join(EXTENSIONS), + vkeywords="|".join("%s[-]?"
% k for k in VERSION_KEYWORDS), ) def get_version(uri: str) -> str: """Extract branch name from tarball uri Args: uri (str): Tarball URI Returns: Version detected Example: For uri = https://ftp.gnu.org/gnu/8sync/8sync-0.2.0.tar.gz >>> get_version(uri) '0.2.0' For uri = 8sync-0.3.0.tar.gz >>> get_version(uri) '0.3.0' """ filename = path.split(uri)[-1] - m = re.match(PATTERN, filename, - flags=re.VERBOSE | re.IGNORECASE) + m = re.match(PATTERN, filename, flags=re.VERBOSE | re.IGNORECASE) if m: d = m.groupdict() - if d['software_name1'] and d['release_number']: - return d['release_number'] - if d['software_name2']: - return d['software_name2'] + if d["software_name1"] and d["release_number"]: + return d["release_number"] + if d["software_name2"]: + return d["software_name2"] - return '' + return "" def load_raw_data(url: str) -> Sequence[Mapping]: """Load the raw json from the tree.json.gz Args: url: Tree.json.gz url or path Returns: The raw json list """ - if url.startswith('http://') or url.startswith('https://'): + if url.startswith("http://") or url.startswith("https://"): response = requests.get(url, allow_redirects=True) if not response.ok: - raise ValueError('Error during query to %s' % url) + raise ValueError("Error during query to %s" % url) raw = gzip.decompress(response.content) else: - with gzip.open(url, 'r') as f: + with gzip.open(url, "r") as f: raw = f.read() - raw_data = json.loads(raw.decode('utf-8')) + raw_data = json.loads(raw.decode("utf-8")) return raw_data def format_date(timestamp: str) -> str: """Format a string timestamp to an isoformat string """ return datetime.fromtimestamp(int(timestamp), tz=utc).isoformat() diff --git a/swh/lister/npm/__init__.py b/swh/lister/npm/__init__.py index 77c3d38..0d10210 100644 --- a/swh/lister/npm/__init__.py +++ b/swh/lister/npm/__init__.py @@ -1,20 +1,21 @@ # Copyright (C) 2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information def register(): from .models import NpmVisitModel, NpmModel from .lister import NpmLister - return {'models': [NpmVisitModel, NpmModel], - 'lister': NpmLister, - 'task_modules': ['%s.tasks' % __name__], - 'task_types': { - 'list-npm-full': { - 'default_interval': '7 days', - 'min_interval': '7 days', - 'max_interval': '7 days', - }, - }, - } + return { + "models": [NpmVisitModel, NpmModel], + "lister": NpmLister, + "task_modules": ["%s.tasks" % __name__], + "task_types": { + "list-npm-full": { + "default_interval": "7 days", + "min_interval": "7 days", + "max_interval": "7 days", + }, + }, + } diff --git a/swh/lister/npm/lister.py b/swh/lister/npm/lister.py index 5214032..15c2556 100644 --- a/swh/lister/npm/lister.py +++ b/swh/lister/npm/lister.py @@ -1,157 +1,156 @@ # Copyright (C) 2018-2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.lister.core.indexing_lister import IndexingHttpLister from swh.lister.npm.models import NpmModel from swh.scheduler.utils import create_task_dict from typing import Any, Dict, Optional, List from requests import Response class NpmListerBase(IndexingHttpLister): """List packages available in the npm registry in a paginated way """ + MODEL = NpmModel - LISTER_NAME = 'npm' - instance = 'npm' + LISTER_NAME = "npm" + instance = "npm" - def __init__(self, url='https://replicate.npmjs.com', - per_page=1000, override_config=None): + def __init__( + self, 
url="https://replicate.npmjs.com", per_page=1000, override_config=None + ): super().__init__(url=url, override_config=override_config) self.per_page = per_page + 1 - self.PATH_TEMPLATE += '&limit=%s' % self.per_page + self.PATH_TEMPLATE += "&limit=%s" % self.per_page @property def ADDITIONAL_CONFIG(self) -> Dict[str, Any]: """(Override) Add extra configuration """ default_config = super().ADDITIONAL_CONFIG - default_config['loading_task_policy'] = ('str', 'recurring') + default_config["loading_task_policy"] = ("str", "recurring") return default_config def get_model_from_repo(self, repo_name: str) -> Dict[str, str]: """(Override) Transform from npm package name to model """ - package_url = 'https://www.npmjs.com/package/%s' % repo_name + package_url = "https://www.npmjs.com/package/%s" % repo_name return { - 'uid': repo_name, - 'indexable': repo_name, - 'name': repo_name, - 'full_name': repo_name, - 'html_url': package_url, - 'origin_url': package_url, - 'origin_type': 'npm', + "uid": repo_name, + "indexable": repo_name, + "name": repo_name, + "full_name": repo_name, + "html_url": package_url, + "origin_url": package_url, + "origin_type": "npm", } def task_dict(self, origin_type: str, origin_url: str, **kwargs): """(Override) Return task dict for loading a npm package into the archive. This is overridden from the lister_base as more information is needed for the ingestion task creation. """ - task_type = 'load-%s' % origin_type - task_policy = self.config['loading_task_policy'] - return create_task_dict(task_type, task_policy, - url=origin_url) + task_type = "load-%s" % origin_type + task_policy = self.config["loading_task_policy"] + return create_task_dict(task_type, task_policy, url=origin_url) def request_headers(self) -> Dict[str, Any]: """(Override) Set requests headers to send when querying the npm registry. """ headers = super().request_headers() - headers['Accept'] = 'application/json' + headers["Accept"] = "application/json" return headers def string_pattern_check(self, inner: int, lower: int, upper: int = None): """ (Override) Inhibit the effect of that method as packages indices correspond to package names and thus do not respect any kind of fixed length string pattern """ pass class NpmLister(NpmListerBase): """List all packages available in the npm registry in a paginated way """ + PATH_TEMPLATE = '/_all_docs?startkey="%s"' - def get_next_target_from_response( - self, response: Response) -> Optional[str]: + def get_next_target_from_response(self, response: Response) -> Optional[str]: """(Override) Get next npm package name to continue the listing """ - repos = response.json()['rows'] - return repos[-1]['id'] if len(repos) == self.per_page else None + repos = response.json()["rows"] + return repos[-1]["id"] if len(repos) == self.per_page else None - def transport_response_simplified( - self, response: Response) -> List[Dict[str, str]]: + def transport_response_simplified(self, response: Response) -> List[Dict[str, str]]: """(Override) Transform npm registry response to list for model manipulation """ - repos = response.json()['rows'] + repos = response.json()["rows"] if len(repos) == self.per_page: repos = repos[:-1] - return [self.get_model_from_repo(repo['id']) for repo in repos] + return [self.get_model_from_repo(repo["id"]) for repo in repos] class NpmIncrementalLister(NpmListerBase): """List packages in the npm registry, updated since a specific update_seq value of the underlying CouchDB database, in a paginated way. 
""" - PATH_TEMPLATE = '/_changes?since=%s' + + PATH_TEMPLATE = "/_changes?since=%s" @property def CONFIG_BASE_FILENAME(self): # noqa: N802 - return 'lister_npm_incremental' + return "lister_npm_incremental" - def get_next_target_from_response( - self, response: Response) -> Optional[str]: + def get_next_target_from_response(self, response: Response) -> Optional[str]: """(Override) Get next npm package name to continue the listing. """ - repos = response.json()['results'] - return repos[-1]['seq'] if len(repos) == self.per_page else None + repos = response.json()["results"] + return repos[-1]["seq"] if len(repos) == self.per_page else None - def transport_response_simplified( - self, response: Response) -> List[Dict[str, str]]: + def transport_response_simplified(self, response: Response) -> List[Dict[str, str]]: """(Override) Transform npm registry response to list for model manipulation. """ - repos = response.json()['results'] + repos = response.json()["results"] if len(repos) == self.per_page: repos = repos[:-1] - return [self.get_model_from_repo(repo['id']) for repo in repos] + return [self.get_model_from_repo(repo["id"]) for repo in repos] def filter_before_inject(self, models_list: List[Dict[str, Any]]): """(Override) Filter out documents in the CouchDB database not related to a npm package. """ models_filtered = [] for model in models_list: - package_name = model['name'] + package_name = model["name"] # document related to CouchDB internals - if package_name.startswith('_design/'): + if package_name.startswith("_design/"): continue models_filtered.append(model) return models_filtered def disable_deleted_repo_tasks(self, start, end, keep_these): """(Override) Disable the processing performed by that method as it is not relevant in this incremental lister context. It also raises an exception due to a different index type (int instead of str). 
""" pass diff --git a/swh/lister/npm/models.py b/swh/lister/npm/models.py index 5eb8d0d..08f7d6e 100644 --- a/swh/lister/npm/models.py +++ b/swh/lister/npm/models.py @@ -1,35 +1,37 @@ # Copyright (C) 2018 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from sqlalchemy import Column, String, DateTime, Integer, BigInteger, Sequence from swh.lister.core.models import IndexingModelBase, SQLBase, ABCSQLMeta class NpmVisitModel(SQLBase, metaclass=ABCSQLMeta): """Table to store the npm registry state at the time of a content listing by Software Heritage """ - __tablename__ = 'npm_visit' - uid = Column(Integer, Sequence('npm_visit_id_seq'), primary_key=True) + __tablename__ = "npm_visit" + + uid = Column(Integer, Sequence("npm_visit_id_seq"), primary_key=True) visit_date = Column(DateTime, nullable=False) doc_count = Column(BigInteger) doc_del_count = Column(BigInteger) update_seq = Column(BigInteger) purge_seq = Column(BigInteger) disk_size = Column(BigInteger) data_size = Column(BigInteger) committed_update_seq = Column(BigInteger) compacted_seq = Column(BigInteger) class NpmModel(IndexingModelBase): """A npm package representation """ - __tablename__ = 'npm_repo' + + __tablename__ = "npm_repo" uid = Column(String, primary_key=True) indexable = Column(String, index=True) diff --git a/swh/lister/npm/tasks.py b/swh/lister/npm/tasks.py index 1e4a51c..b0d06d4 100644 --- a/swh/lister/npm/tasks.py +++ b/swh/lister/npm/tasks.py @@ -1,62 +1,71 @@ # Copyright (C) 2018 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime from contextlib import contextmanager from celery import shared_task from swh.lister.npm.lister import NpmLister, NpmIncrementalLister from swh.lister.npm.models import NpmVisitModel @contextmanager def save_registry_state(lister): - params = {'headers': lister.request_headers()} + params = {"headers": lister.request_headers()} registry_state = lister.session.get(lister.url, **params) registry_state = registry_state.json() - keys = ('doc_count', 'doc_del_count', 'update_seq', 'purge_seq', - 'disk_size', 'data_size', 'committed_update_seq', - 'compacted_seq') + keys = ( + "doc_count", + "doc_del_count", + "update_seq", + "purge_seq", + "disk_size", + "data_size", + "committed_update_seq", + "compacted_seq", + ) state = {key: registry_state[key] for key in keys} - state['visit_date'] = datetime.now() + state["visit_date"] = datetime.now() yield npm_visit = NpmVisitModel(**state) lister.db_session.add(npm_visit) lister.db_session.commit() def get_last_update_seq(lister): """Get latest ``update_seq`` value for listing only updated packages. """ query = lister.db_session.query(NpmVisitModel.update_seq) row = query.order_by(NpmVisitModel.uid.desc()).first() if not row: - raise ValueError('No npm registry listing previously performed ! ' - 'This is required prior to the execution of an ' - 'incremental listing.') + raise ValueError( + "No npm registry listing previously performed ! " + "This is required prior to the execution of an " + "incremental listing." 
diff --git a/swh/lister/npm/models.py b/swh/lister/npm/models.py
index 5eb8d0d..08f7d6e 100644
--- a/swh/lister/npm/models.py
+++ b/swh/lister/npm/models.py
@@ -1,35 +1,37 @@
 # Copyright (C) 2018 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from sqlalchemy import Column, String, DateTime, Integer, BigInteger, Sequence
 
 from swh.lister.core.models import IndexingModelBase, SQLBase, ABCSQLMeta
 
 
 class NpmVisitModel(SQLBase, metaclass=ABCSQLMeta):
     """Table to store the npm registry state at the time of a
     content listing by Software Heritage
 
     """
-    __tablename__ = 'npm_visit'
-    uid = Column(Integer, Sequence('npm_visit_id_seq'), primary_key=True)
+    __tablename__ = "npm_visit"
+
+    uid = Column(Integer, Sequence("npm_visit_id_seq"), primary_key=True)
     visit_date = Column(DateTime, nullable=False)
     doc_count = Column(BigInteger)
     doc_del_count = Column(BigInteger)
     update_seq = Column(BigInteger)
     purge_seq = Column(BigInteger)
     disk_size = Column(BigInteger)
     data_size = Column(BigInteger)
     committed_update_seq = Column(BigInteger)
     compacted_seq = Column(BigInteger)
 
 
 class NpmModel(IndexingModelBase):
     """An npm package representation
 
     """
-    __tablename__ = 'npm_repo'
+
+    __tablename__ = "npm_repo"
 
     uid = Column(String, primary_key=True)
     indexable = Column(String, index=True)
diff --git a/swh/lister/npm/tasks.py b/swh/lister/npm/tasks.py
index 1e4a51c..b0d06d4 100644
--- a/swh/lister/npm/tasks.py
+++ b/swh/lister/npm/tasks.py
@@ -1,62 +1,71 @@
 # Copyright (C) 2018 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from datetime import datetime
 from contextlib import contextmanager
 
 from celery import shared_task
 
 from swh.lister.npm.lister import NpmLister, NpmIncrementalLister
 from swh.lister.npm.models import NpmVisitModel
 
 
 @contextmanager
 def save_registry_state(lister):
-    params = {'headers': lister.request_headers()}
+    params = {"headers": lister.request_headers()}
     registry_state = lister.session.get(lister.url, **params)
     registry_state = registry_state.json()
-    keys = ('doc_count', 'doc_del_count', 'update_seq', 'purge_seq',
-            'disk_size', 'data_size', 'committed_update_seq',
-            'compacted_seq')
+    keys = (
+        "doc_count",
+        "doc_del_count",
+        "update_seq",
+        "purge_seq",
+        "disk_size",
+        "data_size",
+        "committed_update_seq",
+        "compacted_seq",
+    )
 
     state = {key: registry_state[key] for key in keys}
-    state['visit_date'] = datetime.now()
+    state["visit_date"] = datetime.now()
     yield
     npm_visit = NpmVisitModel(**state)
     lister.db_session.add(npm_visit)
     lister.db_session.commit()
 
 
 def get_last_update_seq(lister):
     """Get latest ``update_seq`` value for listing only updated packages.
 
     """
     query = lister.db_session.query(NpmVisitModel.update_seq)
     row = query.order_by(NpmVisitModel.uid.desc()).first()
     if not row:
-        raise ValueError('No npm registry listing previously performed ! '
-                         'This is required prior to the execution of an '
-                         'incremental listing.')
+        raise ValueError(
+            "No npm registry listing previously performed! "
+            "This is required prior to the execution of an "
+            "incremental listing."
+        )
     return row[0]
 
 
-@shared_task(name=__name__ + '.NpmListerTask')
+@shared_task(name=__name__ + ".NpmListerTask")
 def list_npm_full(**lister_args):
-    'Full lister for the npm (javascript) registry'
+    "Full lister for the npm (JavaScript) registry"
    lister = NpmLister(**lister_args)
     with save_registry_state(lister):
         return lister.run()
 
 
-@shared_task(name=__name__ + '.NpmIncrementalListerTask')
+@shared_task(name=__name__ + ".NpmIncrementalListerTask")
 def list_npm_incremental(**lister_args):
-    'Incremental lister for the npm (javascript) registry'
+    "Incremental lister for the npm (JavaScript) registry"
     lister = NpmIncrementalLister(**lister_args)
     update_seq_start = get_last_update_seq(lister)
     with save_registry_state(lister):
         return lister.run(min_bound=update_seq_start)
 
 
-@shared_task(name=__name__ + '.ping')
+@shared_task(name=__name__ + ".ping")
 def _ping():
-    return 'OK'
+    return "OK"
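The `save_registry_state` helper above is worth a second look: because it is a
`@contextmanager` with the persistence code placed after the `yield`, the
registry snapshot taken before the listing is only committed once the `with`
block (the actual `lister.run()`) finishes without raising. A minimal sketch of
the same pattern, with hypothetical `snapshot()` and `persist()` helpers:

    from contextlib import contextmanager

    @contextmanager
    def save_state(lister):
        state = snapshot(lister)  # read the registry state up front
        yield                     # run the listing inside the `with` block
        persist(state)            # only reached if the listing succeeded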
diff --git a/swh/lister/npm/tests/conftest.py b/swh/lister/npm/tests/conftest.py
index a7f2433..bfa555f 100644
--- a/swh/lister/npm/tests/conftest.py
+++ b/swh/lister/npm/tests/conftest.py
@@ -1,23 +1,25 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import pytest
 
 from swh.lister.core.tests.conftest import *  # noqa
 
 
 @pytest.fixture
 def lister_npm(swh_listers):
-    lister = swh_listers['npm']
+    lister = swh_listers["npm"]
 
     # Add the load-npm task type in the scheduler backend
-    lister.scheduler.create_task_type({
-        'type': 'load-npm',
-        'description': 'Load npm package',
-        'backend_name': 'swh.loader.package.tasks.LoadNpm',
-        'default_interval': '1 day',
-    })
+    lister.scheduler.create_task_type(
+        {
+            "type": "load-npm",
+            "description": "Load npm package",
+            "backend_name": "swh.loader.package.tasks.LoadNpm",
+            "default_interval": "1 day",
+        }
+    )
 
     return lister
diff --git a/swh/lister/npm/tests/test_lister.py b/swh/lister/npm/tests/test_lister.py
index 2a7ed8d..5a28a6d 100644
--- a/swh/lister/npm/tests/test_lister.py
+++ b/swh/lister/npm/tests/test_lister.py
@@ -1,98 +1,99 @@
 # Copyright (C) 2018-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import logging
 import re
 import requests_mock
 import unittest
 
 from swh.lister.core.tests.test_lister import HttpListerTesterBase
 from swh.lister.npm.lister import NpmLister, NpmIncrementalLister
 
 from typing import Any, List
 
 logger = logging.getLogger(__name__)
 
 
 class NpmListerTester(HttpListerTesterBase, unittest.TestCase):
     Lister = NpmLister
     test_re = re.compile(r'^.*/_all_docs\?startkey="(.+)".*')
-    lister_subdir = 'npm'
-    good_api_response_file = 'data/replicate.npmjs.com/api_response.json'
-    bad_api_response_file = 'data/api_empty_response.json'
-    first_index = 'jquery'
+    lister_subdir = "npm"
+    good_api_response_file = "data/replicate.npmjs.com/api_response.json"
+    bad_api_response_file = "data/api_empty_response.json"
+    first_index = "jquery"
     entries_per_page = 100
 
     @requests_mock.Mocker()
     def test_is_within_bounds(self, http_mocker):
         # disable this test from HttpListerTesterBase as
         # it cannot succeed for the npm lister due to the
         # overriding of the string_pattern_check method
         pass
 
 
 class NpmIncrementalListerTester(HttpListerTesterBase, unittest.TestCase):
     Lister = NpmIncrementalLister
-    test_re = re.compile(r'^.*/_changes\?since=([0-9]+).*')
-    lister_subdir = 'npm'
-    good_api_response_file = 'data/api_inc_response.json'
-    bad_api_response_file = 'data/api_inc_empty_response.json'
-    first_index = '6920642'
+    test_re = re.compile(r"^.*/_changes\?since=([0-9]+).*")
+    lister_subdir = "npm"
+    good_api_response_file = "data/api_inc_response.json"
+    bad_api_response_file = "data/api_inc_empty_response.json"
+    first_index = "6920642"
     entries_per_page = 100
 
     @requests_mock.Mocker()
     def test_is_within_bounds(self, http_mocker):
         # disable this test from HttpListerTesterBase as
         # it cannot succeed for the npm lister due to the
         # overriding of the string_pattern_check method
         pass
 
 
 def check_tasks(tasks: List[Any]):
     """Ensure scheduled tasks are in the expected format.
 
     """
     for row in tasks:
-        logger.debug('row: %s', row)
-        assert row['type'] == 'load-npm'
+        logger.debug("row: %s", row)
+        assert row["type"] == "load-npm"
         # arguments check
-        args = row['arguments']['args']
+        args = row["arguments"]["args"]
         assert len(args) == 0
 
         # kwargs
-        kwargs = row['arguments']['kwargs']
+        kwargs = row["arguments"]["kwargs"]
         assert len(kwargs) == 1
-        package_url = kwargs['url']
-        package_name = package_url.split('/')[-1]
-        assert package_url == f'https://www.npmjs.com/package/{package_name}'
+        package_url = kwargs["url"]
+        package_name = package_url.split("/")[-1]
+        assert package_url == f"https://www.npmjs.com/package/{package_name}"
 
-        assert row['policy'] == 'recurring'
-        assert row['priority'] is None
+        assert row["policy"] == "recurring"
+        assert row["priority"] is None
 
 
 def test_lister_npm_basic_listing(lister_npm, requests_mock_datadir):
     lister_npm.run()
 
-    tasks = lister_npm.scheduler.search_tasks(task_type='load-npm')
+    tasks = lister_npm.scheduler.search_tasks(task_type="load-npm")
     assert len(tasks) == 100
 
     check_tasks(tasks)
 
 
 def test_lister_npm_listing_pagination(lister_npm, requests_mock_datadir):
     lister = lister_npm
     # Patch per page pagination
     lister.per_page = 10 + 1
     lister.PATH_TEMPLATE = lister.PATH_TEMPLATE.replace(
-        '&limit=1001', '&limit=%s' % lister.per_page)
+        "&limit=1001", "&limit=%s" % lister.per_page
+    )
     lister.run()
 
-    tasks = lister.scheduler.search_tasks(task_type='load-npm')
+    tasks = lister.scheduler.search_tasks(task_type="load-npm")
     assert len(tasks) == 2 * 10  # only 2 files with 10 results each
 
     check_tasks(tasks)
diff --git a/swh/lister/npm/tests/test_tasks.py b/swh/lister/npm/tests/test_tasks.py
index 491374f..382e557 100644
--- a/swh/lister/npm/tests/test_tasks.py
+++ b/swh/lister/npm/tests/test_tasks.py
@@ -1,54 +1,52 @@
 from contextlib import contextmanager
 from unittest.mock import patch
 
 
 @contextmanager
 def mock_save(lister):
     yield
 
 
 def test_ping(swh_app, celery_session_worker):
-    res = swh_app.send_task(
-        'swh.lister.npm.tasks.ping')
+    res = swh_app.send_task("swh.lister.npm.tasks.ping")
     assert res
     res.wait()
     assert res.successful()
-    assert res.result == 'OK'
+    assert res.result == "OK"
 
 
-@patch('swh.lister.npm.tasks.save_registry_state')
-@patch('swh.lister.npm.tasks.NpmLister')
+@patch("swh.lister.npm.tasks.save_registry_state")
+@patch("swh.lister.npm.tasks.NpmLister")
 def test_lister(lister, save, swh_app, celery_session_worker):
     # setup the mocked NpmLister
     lister.return_value = lister
     lister.run.return_value = None
     save.side_effect = mock_save
 
-    res = swh_app.send_task('swh.lister.npm.tasks.NpmListerTask')
+    res = swh_app.send_task("swh.lister.npm.tasks.NpmListerTask")
     assert res
     res.wait()
     assert res.successful()
 
     lister.assert_called_once_with()
     lister.run.assert_called_once_with()
 
 
-@patch('swh.lister.npm.tasks.save_registry_state')
-@patch('swh.lister.npm.tasks.get_last_update_seq')
-@patch('swh.lister.npm.tasks.NpmIncrementalLister')
+@patch("swh.lister.npm.tasks.save_registry_state")
+@patch("swh.lister.npm.tasks.get_last_update_seq")
+@patch("swh.lister.npm.tasks.NpmIncrementalLister")
 def test_incremental(lister, seq, save, swh_app, celery_session_worker):
     # setup the mocked NpmIncrementalLister
     lister.return_value = lister
     lister.run.return_value = None
     seq.return_value = 42
     save.side_effect = mock_save
 
-    res = swh_app.send_task(
-        'swh.lister.npm.tasks.NpmIncrementalListerTask')
+    res = swh_app.send_task("swh.lister.npm.tasks.NpmIncrementalListerTask")
     assert res
     res.wait()
     assert res.successful()
 
     lister.assert_called_once_with()
     seq.assert_called_once_with(lister)
     lister.run.assert_called_once_with(min_bound=42)
diff --git a/swh/lister/packagist/__init__.py b/swh/lister/packagist/__init__.py
index 4060cf2..a97ede1 100644
--- a/swh/lister/packagist/__init__.py
+++ b/swh/lister/packagist/__init__.py
@@ -1,13 +1,14 @@
 # Copyright (C) 2019 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 
 def register():
     from .models import PackagistModel
     from .lister import PackagistLister
 
-    return {'models': [PackagistModel],
-            'lister': PackagistLister,
-            'task_modules': ['%s.tasks' % __name__],
-            }
+    return {
+        "models": [PackagistModel],
+        "lister": PackagistLister,
+        "task_modules": ["%s.tasks" % __name__],
+    }
diff --git a/swh/lister/packagist/lister.py b/swh/lister/packagist/lister.py
index 98e72f3..e7b9709 100644
--- a/swh/lister/packagist/lister.py
+++ b/swh/lister/packagist/lister.py
@@ -1,100 +1,104 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import json
 import logging
 import random
 
 from typing import Any, Dict, List, Mapping
 
 from swh.scheduler import utils
 from swh.lister.core.simple_lister import SimpleLister
 from swh.lister.core.lister_transports import ListerOnePageApiTransport
 
 from .models import PackagistModel
 
 logger = logging.getLogger(__name__)
 
 
 def compute_package_url(repo_name: str) -> str:
     """Compute the Packagist package url from the repo name.
 
     """
-    return 'https://repo.packagist.org/p/%s.json' % repo_name
+    return "https://repo.packagist.org/p/%s.json" % repo_name
 
 
 class PackagistLister(ListerOnePageApiTransport, SimpleLister):
     """List packages available in the Packagist package manager.
 
     The lister sends the request to the url present in the class
     variable `PAGE`, to receive a list of all the package names
     present in the Packagist package manager.
 
     Iterates over all the packages, constructs the metadata url of
     each package from its name, and creates a loading task::
 
         Task:
             Type: load-packagist
            Policy: recurring
             Args:
                 <package_name>
                 <package_metadata_url>
 
         Example::
 
             Task:
                 Type: load-packagist
                 Policy: recurring
                 Args:
                     'hypejunction/hypegamemechanics'
                     'https://repo.packagist.org/p/hypejunction/hypegamemechanics.json'
 
     """
+
     MODEL = PackagistModel
-    LISTER_NAME = 'packagist'
-    PAGE = 'https://packagist.org/packages/list.json'
-    instance = 'packagist'
+    LISTER_NAME = "packagist"
+    PAGE = "https://packagist.org/packages/list.json"
+    instance = "packagist"
 
     def __init__(self, override_config=None):
-        ListerOnePageApiTransport .__init__(self)
+        ListerOnePageApiTransport.__init__(self)
         SimpleLister.__init__(self, override_config=override_config)
 
-    def task_dict(self, origin_type: str, origin_url: str,
-                  **kwargs: Mapping[str, str]) -> Dict[str, Any]:
+    def task_dict(
+        self, origin_type: str, origin_url: str, **kwargs: Mapping[str, str]
+    ) -> Dict[str, Any]:
         """Return task format dict
 
         This is overridden from the lister_base as more information is
         needed for the ingestion task creation.
 
         """
         return utils.create_task_dict(
-            'load-%s' % origin_type,
-            kwargs.get('policy', 'recurring'),
-            kwargs.get('name'), origin_url,
-            retries_left=3)
+            "load-%s" % origin_type,
+            kwargs.get("policy", "recurring"),
+            kwargs.get("name"),
+            origin_url,
+            retries_left=3,
+        )
 
     def list_packages(self, response: Any) -> List[str]:
         """List the actual Packagist origins from the response.
 
         """
         response = json.loads(response.text)
-        packages = [name for name in response['packageNames']]
-        logger.debug('Number of packages: %s', len(packages))
+        packages = [name for name in response["packageNames"]]
+        logger.debug("Number of packages: %s", len(packages))
         random.shuffle(packages)
         return packages
 
     def get_model_from_repo(self, repo_name: str) -> Mapping[str, str]:
         """Transform from repository representation to model
 
         """
         url = compute_package_url(repo_name)
         return {
-            'uid': repo_name,
-            'name': repo_name,
-            'full_name': repo_name,
-            'html_url': url,
-            'origin_url': url,
-            'origin_type': 'packagist',
+            "uid": repo_name,
+            "name": repo_name,
+            "full_name": repo_name,
+            "html_url": url,
+            "origin_url": url,
+            "origin_type": "packagist",
         }
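To make the flow above concrete: for one package name, the lister derives the
metadata URL and turns the pair into a single 'load-packagist' task. The values
follow the docstring's own example; the scheduler fields around them come from
`create_task_dict`:

    name = "hypejunction/hypegamemechanics"
    url = compute_package_url(name)
    # url == "https://repo.packagist.org/p/hypejunction/hypegamemechanics.json"
    task = lister.task_dict(origin_type="packagist", origin_url=url, name=name)
    # a recurring 'load-packagist' task with args (name, url) and retries_left=3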
 PackagistLister(**lister_args).run()
 
 
-@shared_task(name=__name__ + '.ping')
+@shared_task(name=__name__ + ".ping")
 def _ping():
-    return 'OK'
+    return "OK"
diff --git a/swh/lister/packagist/models.py b/swh/lister/packagist/models.py
index 36a6333..268f884 100644
--- a/swh/lister/packagist/models.py
+++ b/swh/lister/packagist/models.py
@@ -1,16 +1,17 @@
 # Copyright (C) 2019 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from sqlalchemy import Column, String
 
 from ..core.models import ModelBase
 
 
 class PackagistModel(ModelBase):
     """a Packagist repository representation
 
     """
-    __tablename__ = 'packagist_repo'
+
+    __tablename__ = "packagist_repo"
 
     uid = Column(String, primary_key=True)
diff --git a/swh/lister/packagist/tests/conftest.py b/swh/lister/packagist/tests/conftest.py
index fe31517..1eafc36 100644
--- a/swh/lister/packagist/tests/conftest.py
+++ b/swh/lister/packagist/tests/conftest.py
@@ -1,23 +1,25 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import pytest
 
 from swh.lister.core.tests.conftest import *  # noqa
 
 
 @pytest.fixture
 def lister_packagist(swh_listers):
-    lister = swh_listers['packagist']
+    lister = swh_listers["packagist"]
 
     # Amend the scheduler with an as-yet-unknown load-packagist task type
-    lister.scheduler.create_task_type({
-        'type': 'load-packagist',
-        'description': 'Load packagist origin',
-        'backend_name': 'swh.loader.package.tasks.LoaderPackagist',
-        'default_interval': '1 day',
-    })
+    lister.scheduler.create_task_type(
+        {
+            "type": "load-packagist",
+            "description": "Load packagist origin",
+            "backend_name": "swh.loader.package.tasks.LoaderPackagist",
+            "default_interval": "1 day",
+        }
+    )
 
     return lister
diff --git a/swh/lister/packagist/tests/test_lister.py b/swh/lister/packagist/tests/test_lister.py
index 869e6c2..3bfff49 100644
--- a/swh/lister/packagist/tests/test_lister.py
+++ b/swh/lister/packagist/tests/test_lister.py
@@ -1,102 +1,105 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import unittest
 import requests_mock
 
 from unittest.mock import patch
 
 from swh.lister.packagist.lister import PackagistLister, compute_package_url
 from swh.lister.core.tests.test_lister import HttpSimpleListerTester
 
-expected_packages = ['0.0.0/composer-include-files', '0.0.0/laravel-env-shim',
-                     '0.0.1/try-make-package', '0099ff/dialogflowphp',
-                     '00f100/array_dot']
+expected_packages = [
+    "0.0.0/composer-include-files",
+    "0.0.0/laravel-env-shim",
+    "0.0.1/try-make-package",
+    "0099ff/dialogflowphp",
+    "00f100/array_dot",
+]
 
 expected_model = {
-    'uid': '0099ff/dialogflowphp',
-    'name': '0099ff/dialogflowphp',
-    'full_name': '0099ff/dialogflowphp',
-    'html_url':
-        'https://repo.packagist.org/p/0099ff/dialogflowphp.json',
-    'origin_url':
-        'https://repo.packagist.org/p/0099ff/dialogflowphp.json',
-    'origin_type': 'packagist',
-    }
+    "uid": "0099ff/dialogflowphp",
+    "name": "0099ff/dialogflowphp",
+    "full_name": "0099ff/dialogflowphp",
+    "html_url": "https://repo.packagist.org/p/0099ff/dialogflowphp.json",
+    "origin_url": "https://repo.packagist.org/p/0099ff/dialogflowphp.json",
+    "origin_type": "packagist",
+}
 
 
 class PackagistListerTester(HttpSimpleListerTester, unittest.TestCase):
     Lister = PackagistLister
-    PAGE = 'https://packagist.org/packages/list.json'
-    lister_subdir = 'packagist'
-    good_api_response_file = 'data/https_packagist.org/packages_list.json'
+    PAGE = "https://packagist.org/packages/list.json"
+    lister_subdir = "packagist"
+    good_api_response_file = "data/https_packagist.org/packages_list.json"
     entries = 5
 
     @requests_mock.Mocker()
     def test_list_packages(self, http_mocker):
         """Listing packages from the simple api page should retrieve all
         packages within
 
         """
         http_mocker.get(self.PAGE, text=self.mock_response)
         fl = self.get_fl()
         packages = fl.list_packages(self.get_api_response(0))
 
         for package in expected_packages:
             assert package in packages
 
     def test_transport_response_simplified(self):
         """Test model created by the lister
 
         """
         fl = self.get_fl()
-        model = fl.transport_response_simplified(['0099ff/dialogflowphp'])
+        model = fl.transport_response_simplified(["0099ff/dialogflowphp"])
         assert len(model) == 1
         for key, values in model[0].items():
             assert values == expected_model[key]
 
-    @patch('swh.lister.packagist.lister.utils.create_task_dict')
+    @patch("swh.lister.packagist.lister.utils.create_task_dict")
     def test_task_dict(self, mock_create_tasks):
         """Test the task creation of lister
 
         """
         fl = self.get_fl()
-        fl.task_dict(origin_type='packagist', origin_url='https://abc',
-                     name='test_pack')
+        fl.task_dict(
+            origin_type="packagist", origin_url="https://abc", name="test_pack"
+        )
         mock_create_tasks.assert_called_once_with(
-            'load-packagist', 'recurring', 'test_pack', 'https://abc',
-            retries_left=3)
+            "load-packagist", "recurring", "test_pack", "https://abc", retries_left=3
+        )
 
 
 def test_compute_package_url():
-    expected_url = 'https://repo.packagist.org/p/hello.json'
-    actual_url = compute_package_url('hello')
+    expected_url = "https://repo.packagist.org/p/hello.json"
+    actual_url = compute_package_url("hello")
     assert actual_url == expected_url
 
 
 def test_packagist_lister(lister_packagist, requests_mock_datadir):
     lister_packagist.run()
 
-    r = lister_packagist.scheduler.search_tasks(task_type='load-packagist')
+    r = lister_packagist.scheduler.search_tasks(task_type="load-packagist")
     assert len(r) == 5
 
     for row in r:
-        assert row['type'] == 'load-packagist'
+        assert row["type"] == "load-packagist"
         # arguments check
-        args = row['arguments']['args']
+        args = row["arguments"]["args"]
         assert len(args) == 2
 
         package = args[0]
         url = args[1]
 
         expected_url = compute_package_url(package)
         assert url == expected_url
 
         # kwargs
-        kwargs = row['arguments']['kwargs']
+        kwargs = row["arguments"]["kwargs"]
         assert kwargs == {}
 
-        assert row['policy'] == 'recurring'
-        assert row['priority'] is None
+        assert row["policy"] == "recurring"
+        assert row["priority"] is None
diff --git a/swh/lister/packagist/tests/test_tasks.py b/swh/lister/packagist/tests/test_tasks.py
index cbe807d..7c89b5b 100644
--- a/swh/lister/packagist/tests/test_tasks.py
+++ b/swh/lister/packagist/tests/test_tasks.py
@@ -1,31 +1,29 @@
 # Copyright (C) 2019 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from unittest.mock import patch
 
 
 def test_ping(swh_app, celery_session_worker):
-    res = swh_app.send_task(
-        'swh.lister.packagist.tasks.ping')
+    res = swh_app.send_task("swh.lister.packagist.tasks.ping")
     assert res
     res.wait()
     assert res.successful()
-    assert res.result == 'OK'
+    assert res.result == "OK"
 
 
-@patch('swh.lister.packagist.tasks.PackagistLister')
+@patch("swh.lister.packagist.tasks.PackagistLister")
 def test_lister(lister, swh_app, celery_session_worker):
     # setup the mocked PackagistLister
     lister.return_value = lister
     lister.run.return_value = None
 
-    res = swh_app.send_task(
-        'swh.lister.packagist.tasks.PackagistListerTask')
+    res = swh_app.send_task("swh.lister.packagist.tasks.PackagistListerTask")
     assert res
     res.wait()
     assert res.successful()
 
     lister.assert_called_once_with()
     lister.db_last_index.assert_not_called()
     lister.run.assert_called_once_with()
diff --git a/swh/lister/phabricator/__init__.py b/swh/lister/phabricator/__init__.py
index aeaee0a..3f5ff29 100644
--- a/swh/lister/phabricator/__init__.py
+++ b/swh/lister/phabricator/__init__.py
@@ -1,13 +1,14 @@
 # Copyright (C) 2019 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 
 def register():
     from .models import PhabricatorModel
     from .lister import PhabricatorLister
 
-    return {'models': [PhabricatorModel],
-            'lister': PhabricatorLister,
-            'task_modules': ['%s.tasks' % __name__],
-            }
+    return {
+        "models": [PhabricatorModel],
+        "lister": PhabricatorLister,
+        "task_modules": ["%s.tasks" % __name__],
+    }
diff --git a/swh/lister/phabricator/lister.py b/swh/lister/phabricator/lister.py
index bfb4a95..89487ae 100644
--- a/swh/lister/phabricator/lister.py
+++ b/swh/lister/phabricator/lister.py
@@ -1,190 +1,189 @@
 # Copyright (C) 2019 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import logging
 import random
 import urllib.parse
 
 from collections import defaultdict
 
 from sqlalchemy import func
 
 from swh.lister.core.indexing_lister import IndexingHttpLister
 from swh.lister.phabricator.models import PhabricatorModel
 
 from typing import Any, Dict, List, Optional
 from requests import Response
 
 logger = logging.getLogger(__name__)
 
 
 class PhabricatorLister(IndexingHttpLister):
-    PATH_TEMPLATE = '?order=oldest&attachments[uris]=1&after=%s'
-    DEFAULT_URL = \
-        'https://forge.softwareheritage.org/api/diffusion.repository.search'
+    PATH_TEMPLATE = "?order=oldest&attachments[uris]=1&after=%s"
+    DEFAULT_URL = "https://forge.softwareheritage.org/api/diffusion.repository.search"
     MODEL = PhabricatorModel
-    LISTER_NAME = 'phabricator'
+    LISTER_NAME = "phabricator"
 
     def __init__(self, url=None, instance=None, override_config=None):
         super().__init__(url=url, override_config=override_config)
         if not instance:
             instance = urllib.parse.urlparse(self.url).hostname
         self.instance = instance
 
     def request_params(self, identifier: str) -> Dict[str, Any]:
         """Override the default params behavior to retrieve the api token
 
         Credentials are stored as:
 
         credentials:
           phabricator:
             <instance>:
               - username: <account>
                 password: <api token>
 
         """
         creds = self.request_instance_credentials()
         if not creds:
             raise ValueError(
-                'Phabricator forge needs authentication credential to list.')
-        api_token = random.choice(creds)['password']
+                "Phabricator forge needs authentication credential to list."
+            )
+        api_token = random.choice(creds)["password"]
 
-        return {'headers': self.request_headers() or {},
-                'params': {'api.token': api_token}}
+        return {
+            "headers": self.request_headers() or {},
+            "params": {"api.token": api_token},
+        }
 
     def request_headers(self):
         """
         (Override) Set requests headers to send when querying the
         Phabricator API
 
         """
         headers = super().request_headers()
-        headers['Accept'] = 'application/json'
+        headers["Accept"] = "application/json"
         return headers
 
-    def get_model_from_repo(
-            self, repo: Dict[str, Any]) -> Optional[Dict[str, Any]]:
-        url = get_repo_url(repo['attachments']['uris']['uris'])
+    def get_model_from_repo(self, repo: Dict[str, Any]) -> Optional[Dict[str, Any]]:
+        url = get_repo_url(repo["attachments"]["uris"]["uris"])
         if url is None:
             return None
         return {
-            'uid': url,
-            'indexable': repo['id'],
-            'name': repo['fields']['shortName'],
-            'full_name': repo['fields']['name'],
-            'html_url': url,
-            'origin_url': url,
-            'origin_type': repo['fields']['vcs'],
-            'instance': self.instance,
+            "uid": url,
+            "indexable": repo["id"],
+            "name": repo["fields"]["shortName"],
+            "full_name": repo["fields"]["name"],
+            "html_url": url,
+            "origin_url": url,
+            "origin_type": repo["fields"]["vcs"],
+            "instance": self.instance,
         }
 
-    def get_next_target_from_response(
-            self, response: Response) -> Optional[int]:
-        body = response.json()['result']['cursor']
-        if body['after'] and body['after'] != 'null':
-            return int(body['after'])
+    def get_next_target_from_response(self, response: Response) -> Optional[int]:
+        body = response.json()["result"]["cursor"]
+        if body["after"] and body["after"] != "null":
+            return int(body["after"])
         return None
 
     def transport_response_simplified(
-            self, response: Response) -> List[Optional[Dict[str, Any]]]:
+        self, response: Response
+    ) -> List[Optional[Dict[str, Any]]]:
         repos = response.json()
-        if repos['result'] is None:
+        if repos["result"] is None:
             raise ValueError(
-                'Problem during information fetch: %s' % repos['error_code'])
-        repos = repos['result']['data']
+                "Problem during information fetch: %s" % repos["error_code"]
+            )
+        repos = repos["result"]["data"]
         return [self.get_model_from_repo(repo) for repo in repos]
 
     def filter_before_inject(self, models_list):
         """
         (Overrides) IndexingLister.filter_before_inject
         Bounds query results by this Lister's set max_index.
 
         """
         models_list = [m for m in models_list if m is not None]
         return super().filter_before_inject(models_list)
 
-    def disable_deleted_repo_tasks(
-            self, index: int, next_index: int, keep_these: str):
+    def disable_deleted_repo_tasks(self, index: int, next_index: int, keep_these: str):
         """
         (Overrides) Fix provided index value to avoid:
 
         - database query error
         - erroneously disabling some scheduler tasks
 
         """
         # First call to the Phabricator API uses an empty 'after' parameter,
         # so set the index to 0 to avoid a database query error
-        if index == '':
+        if index == "":
             index = 0
         # Next listed repository ids are strictly greater than the 'after'
         # parameter, so increment the index to avoid disabling the latest
         # created task when processing a new repositories page returned by
         # the Phabricator API
         else:
             index = index + 1
-        return super().disable_deleted_repo_tasks(index, next_index,
-                                                  keep_these)
+        return super().disable_deleted_repo_tasks(index, next_index, keep_these)
 
     def db_first_index(self) -> Optional[int]:
         """
         (Overrides) Filter results by Phabricator instance
 
         Returns:
             the smallest indexable value of all repos in the db
 
         """
         t = self.db_session.query(func.min(self.MODEL.indexable))
         t = t.filter(self.MODEL.instance == self.instance).first()
         if t:
             return t[0]
         return None
 
     def db_last_index(self):
         """
         (Overrides) Filter results by Phabricator instance
 
         Returns:
             the largest indexable value of all instance repos in the db
 
         """
         t = self.db_session.query(func.max(self.MODEL.indexable))
         t = t.filter(self.MODEL.instance == self.instance).first()
         if t:
             return t[0]
 
     def db_query_range(self, start: int, end: int):
         """
         (Overrides) Filter the results by the Phabricator instance to
         avoid disabling loading tasks for repositories hosted on a
         different instance.
 
         Returns:
             a list of sqlalchemy.ext.declarative.declarative_base objects
             with indexable values within the given range for the instance
 
         """
         retlist = super().db_query_range(start, end)
         return retlist.filter(self.MODEL.instance == self.instance)
 
 
 def get_repo_url(attachments: List[Dict[str, Any]]) -> Optional[str]:
     """
     Return the url for a hosted repository from its uris attachments,
     according to the following priority lists:
     * protocol: https > http
     * identifier: shortname > callsign > id
 
     """
     processed_urls = defaultdict(dict)  # type: Dict[str, Any]
     for uri in attachments:
-        protocol = uri['fields']['builtin']['protocol']
-        url = uri['fields']['uri']['effective']
-        identifier = uri['fields']['builtin']['identifier']
-        if protocol in ('http', 'https'):
+        protocol = uri["fields"]["builtin"]["protocol"]
+        url = uri["fields"]["uri"]["effective"]
+        identifier = uri["fields"]["builtin"]["identifier"]
+        if protocol in ("http", "https"):
             processed_urls[protocol][identifier] = url
         elif protocol is None:
-            for protocol in ('https', 'http'):
+            for protocol in ("https", "http"):
                 if url.startswith(protocol):
-                    processed_urls[protocol]['undefined'] = url
+                    processed_urls[protocol]["undefined"] = url
                     break
-    for protocol in ['https', 'http']:
-        for identifier in ['shortname', 'callsign', 'id', 'undefined']:
-            if (protocol in processed_urls and
-                    identifier in processed_urls[protocol]):
+    for protocol in ["https", "http"]:
+        for identifier in ["shortname", "callsign", "id", "undefined"]:
+            if protocol in processed_urls and identifier in processed_urls[protocol]:
                 return processed_urls[protocol][identifier]
     return None
diff --git a/swh/lister/phabricator/models.py b/swh/lister/phabricator/models.py
index 96cc497..676be83 100644
--- a/swh/lister/phabricator/models.py
+++ b/swh/lister/phabricator/models.py
@@ -1,16 +1,17 @@
 # Copyright (C) 2019 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from sqlalchemy import Column, String, Integer
 
 from swh.lister.core.models import IndexingModelBase
 
 
 class PhabricatorModel(IndexingModelBase):
     """a Phabricator repository"""
-    __tablename__ = 'phabricator_repo'
+
+    __tablename__ = "phabricator_repo"
 
     uid = Column(String, primary_key=True)
     indexable = Column(Integer, index=True)
     instance = Column(String, index=True)
diff --git a/swh/lister/phabricator/tasks.py b/swh/lister/phabricator/tasks.py
index 614f4f2..69e562c 100644
--- a/swh/lister/phabricator/tasks.py
+++ b/swh/lister/phabricator/tasks.py
@@ -1,18 +1,18 @@
 # Copyright (C) 2019 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from celery import shared_task
 
 from swh.lister.phabricator.lister import PhabricatorLister
 
 
-@shared_task(name=__name__ + '.FullPhabricatorLister')
+@shared_task(name=__name__ + ".FullPhabricatorLister")
 def list_phabricator_full(**lister_args):
     """Full update of a Phabricator instance"""
     return PhabricatorLister(**lister_args).run()
 
 
-@shared_task(name=__name__ + '.ping')
+@shared_task(name=__name__ + ".ping")
 def _ping():
-    return 'OK'
+    return "OK"
diff --git a/swh/lister/phabricator/tests/conftest.py b/swh/lister/phabricator/tests/conftest.py
index 22de766..2713ce9 100644
--- a/swh/lister/phabricator/tests/conftest.py
+++ b/swh/lister/phabricator/tests/conftest.py
@@ -1,26 +1,21 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import pytest
 
 from swh.lister.core.tests.conftest import *  # noqa
 
 
 @pytest.fixture
 def lister_phabricator(swh_listers):
-    lister = swh_listers['phabricator']
+    lister = swh_listers["phabricator"]
 
     # Amend the credentials
     lister.config = {
-        'cache_responses': False,
-        'credentials': {
-            'phabricator': {
-                lister.instance: [{
-                    'password': 'foo'
-                }]
-            }}
+        "cache_responses": False,
+        "credentials": {"phabricator": {lister.instance: [{"password": "foo"}]}},
     }
     return lister
diff --git a/swh/lister/phabricator/tests/test_lister.py b/swh/lister/phabricator/tests/test_lister.py
index dcf76c0..6b95af0 100644
--- a/swh/lister/phabricator/tests/test_lister.py
+++ b/swh/lister/phabricator/tests/test_lister.py
@@ -1,140 +1,135 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 import re
 import json
 import logging
 import unittest
 import requests_mock
 
 from swh.lister.core.tests.test_lister import HttpListerTester
 from swh.lister.phabricator.lister import PhabricatorLister
 from swh.lister.phabricator.lister import get_repo_url
 
 logger = logging.getLogger(__name__)
 
 
 class PhabricatorListerTester(HttpListerTester, unittest.TestCase):
     Lister = PhabricatorLister
     # first request will have the after parameter empty
-    test_re = re.compile(r'\&after=([^?&]*)')
-    lister_subdir = 'phabricator'
-    good_api_response_file = 'data/api_first_response.json'
-    good_api_response_undefined_protocol = \
-        'data/api_response_undefined_protocol.json'
-    bad_api_response_file = 'data/api_empty_response.json'
+    test_re = re.compile(r"\&after=([^?&]*)")
+    lister_subdir = "phabricator"
+    good_api_response_file = "data/api_first_response.json"
+    good_api_response_undefined_protocol = "data/api_response_undefined_protocol.json"
+    bad_api_response_file = "data/api_empty_response.json"
     # first_index must be retrieved through a bootstrap process for
     # Phabricator
     first_index = None
     last_index = 12
     entries_per_page = 10
     convert_type = int
 
     def request_index(self, request):
         """(Override) This is needed to emulate the listing bootstrap
         when no min_bound is provided to run
         """
         m = self.test_re.search(request.path_url)
         idx = m.group(1)
-        if idx not in ('', 'None'):
+        if idx not in ("", "None"):
             return int(idx)
 
     def get_fl(self, override_config=None):
         """(Override) Retrieve an instance of fake lister (fl).
 
         """
         if override_config or self.fl is None:
-            credentials = {'phabricator': {'fake': [
-                {'password': 'toto'}
-            ]}}
-            override_config = dict(credentials=credentials,
-                                   **(override_config or {}))
-            self.fl = self.Lister(url='https://fakeurl', instance='fake',
-                                  override_config=override_config)
+            credentials = {"phabricator": {"fake": [{"password": "toto"}]}}
+            override_config = dict(credentials=credentials, **(override_config or {}))
+            self.fl = self.Lister(
+                url="https://fakeurl", instance="fake", override_config=override_config
+            )
             self.fl.INITIAL_BACKOFF = 1
 
         self.fl.reset_backoff()
         return self.fl
 
     def test_get_repo_url(self):
-        f = open('swh/lister/%s/tests/%s' % (self.lister_subdir,
-                                             self.good_api_response_file))
+        f = open(
+            "swh/lister/%s/tests/%s" % (self.lister_subdir, self.good_api_response_file)
+        )
         api_response = json.load(f)
-        repos = api_response['result']['data']
+        repos = api_response["result"]["data"]
         for repo in repos:
             self.assertEqual(
-                'https://forge.softwareheritage.org/source/%s.git' %
-                (repo['fields']['shortName']),
-                get_repo_url(repo['attachments']['uris']['uris']))
-
-        f = open('swh/lister/%s/tests/%s' %
-                 (self.lister_subdir,
-                  self.good_api_response_undefined_protocol))
+                "https://forge.softwareheritage.org/source/%s.git"
+                % (repo["fields"]["shortName"]),
+                get_repo_url(repo["attachments"]["uris"]["uris"]),
+            )
+
+        f = open(
+            "swh/lister/%s/tests/%s"
+            % (self.lister_subdir, self.good_api_response_undefined_protocol)
+        )
         repo = json.load(f)
         self.assertEqual(
-            'https://svn.blender.org/svnroot/bf-blender/',
-            get_repo_url(repo['attachments']['uris']['uris']))
+            "https://svn.blender.org/svnroot/bf-blender/",
+            get_repo_url(repo["attachments"]["uris"]["uris"]),
+        )
 
     @requests_mock.Mocker()
     def test_scheduled_tasks(self, http_mocker):
-        self.scheduled_tasks_test('data/api_next_response.json', 23,
-                                  http_mocker)
+        self.scheduled_tasks_test("data/api_next_response.json", 23, http_mocker)
 
     @requests_mock.Mocker()
     def test_scheduled_tasks_multiple_instances(self, http_mocker):
         fl = self.create_fl_with_db(http_mocker)
 
         # list first Phabricator instance
         fl.run()
 
-        fl.instance = 'other_fake'
-        fl.config['credentials'] = {
-            'phabricator': {
-                'other_fake': [{
-                    'password': 'foo'
-                }]
-            }
+        fl.instance = "other_fake"
+        fl.config["credentials"] = {
+            "phabricator": {"other_fake": [{"password": "foo"}]}
         }
 
         # list second Phabricator instance hosting repositories having
         # same ids as those listed from the first instance
-        self.good_api_response_file = \
-            'data/api_first_response_other_instance.json'
+        self.good_api_response_file = "data/api_first_response_other_instance.json"
         self.last_index = 13
         fl.run()
 
         # check expected number of loading tasks
         self.assertEqual(len(self.scheduler_tasks), 2 * self.entries_per_page)
 
         # check tasks are not disabled
         for task in self.scheduler_tasks:
-            self.assertTrue(task['status'] != 'disabled')
+            self.assertTrue(task["status"] != "disabled")
 
 
 def test_phabricator_lister(lister_phabricator, requests_mock_datadir):
     lister = lister_phabricator
     assert lister.url == lister.DEFAULT_URL
-    assert lister.instance == 'forge.softwareheritage.org'
+    assert lister.instance == "forge.softwareheritage.org"
     lister.run()
 
-    r = lister.scheduler.search_tasks(task_type='load-git')
+    r = lister.scheduler.search_tasks(task_type="load-git")
     assert len(r) == 10
 
     for row in r:
-        assert row['type'] == 'load-git'
+        assert row["type"] == "load-git"
         # arguments check
-        args = row['arguments']['args']
+        args = row["arguments"]["args"]
         assert len(args) == 0
 
         # kwargs
-        kwargs = row['arguments']['kwargs']
-        url = kwargs['url']
+        kwargs = row["arguments"]["kwargs"]
+        url = kwargs["url"]
         assert lister.instance in url
 
-        assert row['policy'] == 'recurring'
-        assert row['priority'] is None
+        assert row["policy"] == "recurring"
+        assert row["priority"] is None
diff --git a/swh/lister/phabricator/tests/test_tasks.py b/swh/lister/phabricator/tests/test_tasks.py
index bf8f307..38e1686 100644
--- a/swh/lister/phabricator/tests/test_tasks.py
+++ b/swh/lister/phabricator/tests/test_tasks.py
@@ -1,12 +1,11 @@
 # Copyright (C) 2019 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 
 def test_ping(swh_app, celery_session_worker):
-    res = swh_app.send_task(
-        'swh.lister.phabricator.tasks.ping')
+    res = swh_app.send_task("swh.lister.phabricator.tasks.ping")
     assert res
     res.wait()
     assert res.successful()
-    assert res.result == 'OK'
+    assert res.result == "OK"
diff --git a/swh/lister/pypi/__init__.py b/swh/lister/pypi/__init__.py
index 0f845c3..6266e58 100644
--- a/swh/lister/pypi/__init__.py
+++ b/swh/lister/pypi/__init__.py
@@ -1,13 +1,14 @@
 # Copyright (C) 2019 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 
 def register():
     from .models import PyPIModel
     from .lister import PyPILister
 
-    return {'models': [PyPIModel],
-            'lister': PyPILister,
-            'task_modules': ['%s.tasks' % __name__],
-            }
+    return {
+        "models": [PyPIModel],
+        "lister": PyPILister,
+        "task_modules": ["%s.tasks" % __name__],
+    }
diff --git a/swh/lister/pypi/lister.py b/swh/lister/pypi/lister.py
index 0f22ae0..e7223e7 100644
--- a/swh/lister/pypi/lister.py
+++ b/swh/lister/pypi/lister.py
@@ -1,68 +1,67 @@
 # Copyright (C) 2018-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import random
 import xmltodict
 
 from .models import PyPIModel
 
 from swh.scheduler import utils
 from swh.lister.core.simple_lister import SimpleLister
 from swh.lister.core.lister_transports import ListerOnePageApiTransport
 
 from typing import Any, Dict
 from requests import Response
 
 
 class PyPILister(ListerOnePageApiTransport, SimpleLister):
     MODEL = PyPIModel
-    LISTER_NAME = 'pypi'
-    PAGE = 'https://pypi.org/simple/'
-    instance = 'pypi'  # As of today only the main pypi.org is used
+    LISTER_NAME = "pypi"
+    PAGE = "https://pypi.org/simple/"
+    instance = "pypi"  # As of today only the main pypi.org is used
 
     def __init__(self, override_config=None):
-        ListerOnePageApiTransport .__init__(self)
+        ListerOnePageApiTransport.__init__(self)
         SimpleLister.__init__(self, override_config=override_config)
 
     def task_dict(self, origin_type: str, origin_url: str, **kwargs):
         """(Override) Return task format dict
 
         This is overridden from the lister_base as more information is
         needed for the ingestion task creation.
 
         """
-        _type = 'load-%s' % origin_type
-        _policy = kwargs.get('policy', 'recurring')
-        return utils.create_task_dict(
-            _type, _policy, url=origin_url)
+        _type = "load-%s" % origin_type
+        _policy = kwargs.get("policy", "recurring")
+        return utils.create_task_dict(_type, _policy, url=origin_url)
 
     def list_packages(self, response: Response) -> list:
         """(Override) List the actual pypi origins from the response.
 
         """
         result = xmltodict.parse(response.content)
-        _packages = [p['#text'] for p in result['html']['body']['a']]
+        _packages = [p["#text"] for p in result["html"]["body"]["a"]]
         random.shuffle(_packages)
         return _packages
 
     def origin_url(self, repo_name: str) -> str:
         """Returns origin_url
 
         """
-        return 'https://pypi.org/project/%s/' % repo_name
+        return "https://pypi.org/project/%s/" % repo_name
 
     def get_model_from_repo(self, repo_name: str) -> Dict[str, Any]:
         """(Override) Transform from repository representation to model
 
         """
         origin_url = self.origin_url(repo_name)
         return {
-            'uid': origin_url,
-            'name': repo_name,
-            'full_name': repo_name,
-            'html_url': origin_url,
-            'origin_url': origin_url,
-            'origin_type': 'pypi',
+            "uid": origin_url,
+            "name": repo_name,
+            "full_name": repo_name,
+            "html_url": origin_url,
+            "origin_url": origin_url,
+            "origin_type": "pypi",
         }
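The PyPI lister above scrapes the simple index, which is just an HTML page of
anchor elements; `xmltodict` turns it into nested dicts so the anchor texts can
be read off. A self-contained sketch of that parsing step (toy page content):

    import xmltodict

    page = b"<html><body><a href='/simple/requests/'>requests</a>" \
           b"<a href='/simple/flask/'>flask</a></body></html>"
    result = xmltodict.parse(page)
    names = [a["#text"] for a in result["html"]["body"]["a"]]
    assert names == ["requests", "flask"]

Note that with a single anchor xmltodict would return a dict rather than a
list; the real index's many anchors are what make the comprehension work.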
diff --git a/swh/lister/pypi/models.py b/swh/lister/pypi/models.py
index f34eef9..a6ec5ff 100644
--- a/swh/lister/pypi/models.py
+++ b/swh/lister/pypi/models.py
@@ -1,16 +1,17 @@
 # Copyright (C) 2018 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from sqlalchemy import Column, String
 
 from ..core.models import ModelBase
 
 
 class PyPIModel(ModelBase):
     """a PyPI repository representation
 
     """
-    __tablename__ = 'pypi_repo'
+
+    __tablename__ = "pypi_repo"
 
     uid = Column(String, primary_key=True)
diff --git a/swh/lister/pypi/tasks.py b/swh/lister/pypi/tasks.py
index b59e6b0..a6ef7f3 100644
--- a/swh/lister/pypi/tasks.py
+++ b/swh/lister/pypi/tasks.py
@@ -1,18 +1,18 @@
 # Copyright (C) 2018 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from celery import shared_task
 
 from .lister import PyPILister
 
 
-@shared_task(name=__name__ + '.PyPIListerTask')
+@shared_task(name=__name__ + ".PyPIListerTask")
 def list_pypi(**lister_args):
-    'Full update of the PyPI (python) registry'
+    "Full update of the PyPI (Python) registry"
     return PyPILister(**lister_args).run()
 
 
-@shared_task(name=__name__ + '.ping')
+@shared_task(name=__name__ + ".ping")
 def _ping():
-    return 'OK'
+    return "OK"
diff --git a/swh/lister/pypi/tests/conftest.py b/swh/lister/pypi/tests/conftest.py
index 50a4239..658fdcb 100644
--- a/swh/lister/pypi/tests/conftest.py
+++ b/swh/lister/pypi/tests/conftest.py
@@ -1,23 +1,25 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import pytest
 
 from swh.lister.core.tests.conftest import *  # noqa
 
 
 @pytest.fixture
 def lister_pypi(swh_listers):
-    lister = swh_listers['pypi']
+    lister = swh_listers["pypi"]
 
     # Add the load-pypi task type in the scheduler backend
-    lister.scheduler.create_task_type({
-        'type': 'load-pypi',
-        'description': 'Load PyPI package',
-        'backend_name': 'swh.loader.package.tasks.LoadPyPI',
-        'default_interval': '1 day',
-    })
+    lister.scheduler.create_task_type(
+        {
+            "type": "load-pypi",
+            "description": "Load PyPI package",
+            "backend_name": "swh.loader.package.tasks.LoadPyPI",
+            "default_interval": "1 day",
+        }
+    )
 
     return lister
diff --git a/swh/lister/pypi/tests/test_lister.py b/swh/lister/pypi/tests/test_lister.py
index 6f7fc4d..6338130 100644
--- a/swh/lister/pypi/tests/test_lister.py
+++ b/swh/lister/pypi/tests/test_lister.py
@@ -1,27 +1,27 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 
 def test_pypi_lister(lister_pypi, requests_mock_datadir):
     lister_pypi.run()
 
-    r = lister_pypi.scheduler.search_tasks(task_type='load-pypi')
+    r = lister_pypi.scheduler.search_tasks(task_type="load-pypi")
     assert len(r) == 4
 
     for row in r:
-        assert row['type'] == 'load-pypi'
+        assert row["type"] == "load-pypi"
         # arguments check
-        args = row['arguments']['args']
+        args = row["arguments"]["args"]
         assert len(args) == 0
 
         # kwargs
-        kwargs = row['arguments']['kwargs']
+        kwargs = row["arguments"]["kwargs"]
         assert len(kwargs) == 1
-        origin_url = kwargs['url']
-        assert 'https://pypi.org/project' in origin_url
+        origin_url = kwargs["url"]
+        assert "https://pypi.org/project" in origin_url
 
-        assert row['policy'] == 'recurring'
-        assert row['priority'] is None
+        assert row["policy"] == "recurring"
+        assert row["priority"] is None
diff --git a/swh/lister/pypi/tests/test_tasks.py b/swh/lister/pypi/tests/test_tasks.py
index ab7032b..89ffeac 100644
--- a/swh/lister/pypi/tests/test_tasks.py
+++ b/swh/lister/pypi/tests/test_tasks.py
@@ -1,27 +1,25 @@
 from unittest.mock import patch
 
 
 def test_ping(swh_app, celery_session_worker):
-    res = swh_app.send_task(
-        'swh.lister.pypi.tasks.ping')
+    res = swh_app.send_task("swh.lister.pypi.tasks.ping")
     assert res
     res.wait()
     assert res.successful()
-    assert res.result == 'OK'
+    assert res.result == "OK"
 
 
-@patch('swh.lister.pypi.tasks.PyPILister')
+@patch("swh.lister.pypi.tasks.PyPILister")
 def test_lister(lister, swh_app, celery_session_worker):
     # setup the mocked PyPILister
     lister.return_value = lister
     lister.run.return_value = None
 
-    res = swh_app.send_task(
-        'swh.lister.pypi.tasks.PyPIListerTask')
+    res = swh_app.send_task("swh.lister.pypi.tasks.PyPIListerTask")
     assert res
     res.wait()
     assert res.successful()
 
     lister.assert_called_once_with()
     lister.db_last_index.assert_not_called()
     lister.run.assert_called_once_with()
diff --git a/swh/lister/tests/test_cli.py b/swh/lister/tests/test_cli.py
index 3224c81..bc59895 100644
--- a/swh/lister/tests/test_cli.py
+++ b/swh/lister/tests/test_cli.py
@@ -1,67 +1,66 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import pytest
 
 from swh.lister.core.lister_base import ListerBase
 from swh.lister.cli import get_lister, SUPPORTED_LISTERS
 
 from .test_utils import init_db
 
 
 def test_get_lister_wrong_input():
     """Unsupported lister should raise"""
     with pytest.raises(ValueError) as e:
-        get_lister('unknown', 'db-url')
+        get_lister("unknown", "db-url")
 
     assert "Invalid lister" in str(e.value)
 
 
 def test_get_lister():
     """Instantiating a supported lister should be ok
 
     """
     db_url = init_db().url()
     for lister_name in SUPPORTED_LISTERS:
         lst = get_lister(lister_name, db_url)
         assert isinstance(lst, ListerBase)
 
 
 def test_get_lister_override():
     """Overriding the lister configuration should populate its config
 
     """
     db_url = init_db().url()
 
     listers = {
-        'gitlab': 'https://other.gitlab.uni/api/v4/',
-        'phabricator': 'https://somewhere.org/api/diffusion.repository.search',
-        'cgit': 'https://some.where/cgit',
+        "gitlab": "https://other.gitlab.uni/api/v4/",
+        "phabricator": "https://somewhere.org/api/diffusion.repository.search",
+        "cgit": "https://some.where/cgit",
     }
 
     # check the override ends up defined in the lister
     for lister_name, url in listers.items():
         lst = get_lister(
-            lister_name, db_url, **{
-                'url': url,
-                'priority': 'high',
-                'policy': 'oneshot',
-            })
+            lister_name,
+            db_url,
+            **{"url": url, "priority": "high", "policy": "oneshot",}
+        )
 
         assert lst.url == url
-        assert lst.config['priority'] == 'high'
-        assert lst.config['policy'] == 'oneshot'
+        assert lst.config["priority"] == "high"
+        assert lst.config["policy"] == "oneshot"
 
     # check the default urls are used and not the override (since it's not
     # passed)
     for lister_name, url in listers.items():
         lst = get_lister(lister_name, db_url)
 
         # no override so this does not end up in lister's configuration
-        assert 'url' not in lst.config
-        assert 'priority' not in lst.config
-        assert 'oneshot' not in lst.config
+        assert "url" not in lst.config
+        assert "priority" not in lst.config
+        assert "oneshot" not in lst.config
         assert lst.url == lst.DEFAULT_URL
diff --git a/swh/lister/tests/test_utils.py b/swh/lister/tests/test_utils.py
index 1fe7e7a..05966ce 100644
--- a/swh/lister/tests/test_utils.py
+++ b/swh/lister/tests/test_utils.py
@@ -1,38 +1,37 @@
 # Copyright (C) 2018-2019 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import unittest
 
 from testing.postgresql import Postgresql
 
 from swh.lister import utils
 
 
 class UtilsTest(unittest.TestCase):
-
     def test_split_range(self):
         actual_ranges = list(utils.split_range(14, 5))
         self.assertEqual(actual_ranges, [(0, 5), (5, 10), (10, 14)])
 
         actual_ranges = list(utils.split_range(19, 10))
         self.assertEqual(actual_ranges, [(0, 10), (10, 19)])
 
     def test_split_range_errors(self):
         with self.assertRaises(TypeError):
             list(utils.split_range(None, 1))
 
         with self.assertRaises(TypeError):
             list(utils.split_range(100, None))
 
 
 def init_db():
     """Factorize the db_url instantiation
 
     Returns:
         db object to ease db manipulation
 
     """
-    initdb_args = Postgresql.DEFAULT_SETTINGS['initdb_args']
-    initdb_args = ' '.join([initdb_args, '-E UTF-8'])
+    initdb_args = Postgresql.DEFAULT_SETTINGS["initdb_args"]
+    initdb_args = " ".join([initdb_args, "-E UTF-8"])
 
     return Postgresql(initdb_args=initdb_args)
diff --git a/tox.ini b/tox.ini
index 7e3f601..ef77275 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,29 +1,36 @@
 [tox]
-envlist=flake8,mypy,py3
+envlist=black,flake8,mypy,py3
 
 [testenv]
 extras =
   testing
 deps =
   swh.core[http] >= 0.0.61
   pytest-cov
   dev: ipdb
 commands =
   pytest \
 !dev:   --cov={envsitepackagesdir}/swh/lister/ --cov-branch \
         {envsitepackagesdir}/swh/lister/ {posargs}
 
+[testenv:black]
+skip_install = true
+deps =
+  black
+commands =
+  {envpython} -m black --check swh
+
 [testenv:flake8]
 skip_install = true
 deps =
   flake8
 commands =
   {envpython} -m flake8
 
 [testenv:mypy]
 extras =
   testing
 deps =
   mypy
 commands =
   mypy swh
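With the [testenv:black] environment above in place, the formatting enforced by
this diff can be checked or reproduced locally; a minimal sketch, assuming tox
and black 19.10b0 are installed:

    tox -e black    # runs `python -m black --check swh`, per the environment above
    black swh       # rewrites the sources in place, yielding changes like this diff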