diff --git a/PKG-INFO b/PKG-INFO index 3906c60..5a684de 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,271 +1,271 @@ Metadata-Version: 2.1 Name: swh.lister -Version: 0.0.33 +Version: 0.0.34 Summary: Software Heritage lister Home-page: https://forge.softwareheritage.org/diffusion/DLSGH/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Funding, https://www.softwareheritage.org/donate -Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Source, https://forge.softwareheritage.org/source/swh-lister +Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Description: swh-lister ========== This component from the Software Heritage stack aims to produce listings of software origins and their URLs hosted on various public developer platforms or package managers. As these operations are quite similar, it provides a set of Python modules abstracting common software origins listing behaviors. It also provides several lister implementations, contained in the following Python modules: - `swh.lister.bitbucket` - `swh.lister.debian` - `swh.lister.github` - `swh.lister.gitlab` - `swh.lister.gnu` - `swh.lister.pypi` - `swh.lister.npm` - `swh.lister.phabricator` - `swh.lister.cran` - `swh.lister.cgit` - `swh.lister.packagist` Dependencies ------------ All required dependencies can be found in the `requirements*.txt` files located at the root of the repository. Local deployment ---------------- ## lister configuration Each lister implemented so far by Software Heritage (`github`, `gitlab`, `debian`, `pypi`, `npm`) must be configured by following the instructions below (please note that you have to replace `<lister_name>` by one of the lister names introduced above). ### Preparation steps 1. `mkdir ~/.config/swh/ ~/.cache/swh/lister/<lister_name>/` 2. Create configuration file `~/.config/swh/lister_<lister_name>.yml` 3. Bootstrap the db instance schema ```lang=bash $ createdb lister-<lister_name> $ python3 -m swh.lister.cli --db-url postgres:///lister-<lister_name> ``` Note: This bootstraps a minimum data set needed for the lister to run. ### Configuration file sample Minimalistic configuration shared by all listers to add in file `~/.config/swh/lister_<lister_name>.yml`: ```lang=yml storage: cls: 'remote' args: url: 'http://localhost:5002/' scheduler: cls: 'remote' args: url: 'http://localhost:5008/' lister: cls: 'local' args: # see http://docs.sqlalchemy.org/en/latest/core/engines.html#database-urls db: 'postgresql:///lister-<lister_name>' credentials: [] cache_responses: True cache_dir: /home/user/.cache/swh/lister/<lister_name>/ ``` Note: This expects storage (5002) and scheduler (5008) services to run locally. ## lister-github Once configured, you can execute a GitHub lister using the following instructions in a `python3` script: ```lang=python import logging from swh.lister.github.tasks import range_github_lister logging.basicConfig(level=logging.DEBUG) range_github_lister(364, 365) ... 
``` ## lister-gitlab Once configured, you can execute a GitLab lister using the instructions detailed in the `python3` scripts below: ```lang=python import logging from swh.lister.gitlab.tasks import range_gitlab_lister logging.basicConfig(level=logging.DEBUG) range_gitlab_lister(1, 2, { 'instance': 'debian', 'api_baseurl': 'https://salsa.debian.org/api/v4', 'sort': 'asc', 'per_page': 20 }) ``` ```lang=python import logging from swh.lister.gitlab.tasks import full_gitlab_relister logging.basicConfig(level=logging.DEBUG) full_gitlab_relister({ 'instance': '0xacab', 'api_baseurl': 'https://0xacab.org/api/v4', 'sort': 'asc', 'per_page': 20 }) ``` ```lang=python import logging from swh.lister.gitlab.tasks import incremental_gitlab_lister logging.basicConfig(level=logging.DEBUG) incremental_gitlab_lister({ 'instance': 'freedesktop.org', 'api_baseurl': 'https://gitlab.freedesktop.org/api/v4', 'sort': 'asc', 'per_page': 20 }) ``` ## lister-debian Once configured, you can execute a Debian lister using the following instructions in a `python3` script: ```lang=python import logging from swh.lister.debian.tasks import debian_lister logging.basicConfig(level=logging.DEBUG) debian_lister('Debian') ``` ## lister-pypi Once configured, you can execute a PyPI lister using the following instructions in a `python3` script: ```lang=python import logging from swh.lister.pypi.tasks import pypi_lister logging.basicConfig(level=logging.DEBUG) pypi_lister() ``` ## lister-npm Once configured, you can execute an npm lister using the following instructions in a `python3` REPL: ```lang=python import logging from swh.lister.npm.tasks import npm_lister logging.basicConfig(level=logging.DEBUG) npm_lister() ``` ## lister-phabricator Once configured, you can execute a Phabricator lister using the following instructions in a `python3` script: ```lang=python import logging from swh.lister.phabricator.tasks import incremental_phabricator_lister logging.basicConfig(level=logging.DEBUG) incremental_phabricator_lister(forge_url='https://forge.softwareheritage.org', api_token='XXXX') ``` ## lister-gnu Once configured, you can execute a GNU lister using the following instructions in a `python3` script: ```lang=python import logging from swh.lister.gnu.tasks import gnu_lister logging.basicConfig(level=logging.DEBUG) gnu_lister() ``` ## lister-cran Once configured, you can execute a CRAN lister using the following instructions in a `python3` script: ```lang=python import logging from swh.lister.cran.tasks import cran_lister logging.basicConfig(level=logging.DEBUG) cran_lister() ``` ## lister-cgit Once configured, you can execute a cgit lister using the following instructions in a `python3` script: ```lang=python import logging from swh.lister.cgit.tasks import cgit_lister logging.basicConfig(level=logging.DEBUG) # simple cgit instance cgit_lister(url='https://git.kernel.org/') # cgit instance whose listed repositories differ from the base url cgit_lister(url='https://cgit.kde.org/', url_prefix='https://anongit.kde.org/') ``` ## lister-packagist Once configured, you can execute a Packagist lister using the following instructions in a `python3` script: ```lang=python import logging from swh.lister.packagist.tasks import packagist_lister logging.basicConfig(level=logging.DEBUG) packagist_lister() ``` Licensing --------- This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your 
option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. See top-level LICENSE file for the full text of the GNU General Public License along with this program. Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/requirements-swh.txt b/requirements-swh.txt index ac9a199..7fa659d 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,4 +1,3 @@ swh.core -swh.storage >= 0.0.122 -swh.storage[schemata] -swh.scheduler >= 0.0.39 +swh.storage[schemata] >= 0.0.122 +swh.scheduler >= 0.0.58 diff --git a/requirements-test.txt b/requirements-test.txt index 4a7da94..71f2b3d 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,4 +1,4 @@ -pytest<4 +pytest pytest-postgresql requests_mock testing.postgresql diff --git a/setup.py b/setup.py index 99e74c2..e868b1c 100755 --- a/setup.py +++ b/setup.py @@ -1,71 +1,83 @@ #!/usr/bin/env python3 # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from setuptools import setup, find_packages from os import path from io import open here = path.abspath(path.dirname(__file__)) # Get the long description from the README file with open(path.join(here, 'README.md'), encoding='utf-8') as f: long_description = f.read() def parse_requirements(name=None): if name: reqf = 'requirements-%s.txt' % name else: reqf = 'requirements.txt' requirements = [] if not path.exists(reqf): return requirements with open(reqf) as f: for line in f.readlines(): line = line.strip() if not line or line.startswith('#'): continue requirements.append(line) return requirements setup( name='swh.lister', description='Software Heritage lister', long_description=long_description, long_description_content_type='text/markdown', author='Software Heritage developers', author_email='swh-devel@inria.fr', url='https://forge.softwareheritage.org/diffusion/DLSGH/', packages=find_packages(), install_requires=parse_requirements() + parse_requirements('swh'), tests_require=parse_requirements('test'), setup_requires=['vcversioner'], extras_require={'testing': parse_requirements('test')}, vcversioner={'version_module_paths': ['swh/lister/_version.py']}, include_package_data=True, entry_points=''' [console_scripts] swh-lister=swh.lister.cli:cli [swh.cli.subcommands] lister=swh.lister.cli:lister + [swh.workers] + lister.bitbucket=swh.lister.bitbucket:register + lister.cgit=swh.lister.cgit:register + lister.cran=swh.lister.cran:register + lister.debian=swh.lister.debian:register + lister.github=swh.lister.github:register + lister.gitlab=swh.lister.gitlab:register + lister.gnu=swh.lister.gnu:register + lister.npm=swh.lister.npm:register + lister.packagist=swh.lister.packagist:register + lister.phabricator=swh.lister.phabricator:register + lister.pypi=swh.lister.pypi:register ''', classifiers=[ "Programming Language :: Python :: 3", "Intended Audience :: Developers", "License :: OSI Approved :: 
GNU General Public License v3 (GPLv3)", "Operating System :: OS Independent", "Development Status :: 5 - Production/Stable", ], project_urls={ 'Bug Reports': 'https://forge.softwareheritage.org/maniphest', 'Funding': 'https://www.softwareheritage.org/donate', 'Source': 'https://forge.softwareheritage.org/source/swh-lister', }, ) diff --git a/swh.lister.egg-info/PKG-INFO b/swh.lister.egg-info/PKG-INFO index 3906c60..5a684de 100644 --- a/swh.lister.egg-info/PKG-INFO +++ b/swh.lister.egg-info/PKG-INFO @@ -1,271 +1,271 @@ Metadata-Version: 2.1 Name: swh.lister -Version: 0.0.33 +Version: 0.0.34 Summary: Software Heritage lister Home-page: https://forge.softwareheritage.org/diffusion/DLSGH/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Funding, https://www.softwareheritage.org/donate -Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Source, https://forge.softwareheritage.org/source/swh-lister +Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Description: swh-lister ========== This component from the Software Heritage stack aims to produce listings of software origins and their URLs hosted on various public developer platforms or package managers. As these operations are quite similar, it provides a set of Python modules abstracting common software origins listing behaviors. It also provides several lister implementations, contained in the following Python modules: - `swh.lister.bitbucket` - `swh.lister.debian` - `swh.lister.github` - `swh.lister.gitlab` - `swh.lister.gnu` - `swh.lister.pypi` - `swh.lister.npm` - `swh.lister.phabricator` - `swh.lister.cran` - `swh.lister.cgit` - `swh.lister.packagist` Dependencies ------------ All required dependencies can be found in the `requirements*.txt` files located at the root of the repository. Local deployment ---------------- ## lister configuration Each lister implemented so far by Software Heritage (`github`, `gitlab`, `debian`, `pypi`, `npm`) must be configured by following the instructions below (please note that you have to replace `<lister_name>` by one of the lister names introduced above). ### Preparation steps 1. `mkdir ~/.config/swh/ ~/.cache/swh/lister/<lister_name>/` 2. Create configuration file `~/.config/swh/lister_<lister_name>.yml` 3. Bootstrap the db instance schema ```lang=bash $ createdb lister-<lister_name> $ python3 -m swh.lister.cli --db-url postgres:///lister-<lister_name> ``` Note: This bootstraps a minimum data set needed for the lister to run. ### Configuration file sample Minimalistic configuration shared by all listers to add in file `~/.config/swh/lister_<lister_name>.yml`: ```lang=yml storage: cls: 'remote' args: url: 'http://localhost:5002/' scheduler: cls: 'remote' args: url: 'http://localhost:5008/' lister: cls: 'local' args: # see http://docs.sqlalchemy.org/en/latest/core/engines.html#database-urls db: 'postgresql:///lister-<lister_name>' credentials: [] cache_responses: True cache_dir: /home/user/.cache/swh/lister/<lister_name>/ ``` Note: This expects storage (5002) and scheduler (5008) services to run locally. ## lister-github Once configured, you can execute a GitHub lister using the following instructions in a `python3` script: ```lang=python import logging from swh.lister.github.tasks import range_github_lister logging.basicConfig(level=logging.DEBUG) range_github_lister(364, 365) ... 
``` ## lister-gitlab Once configured, you can execute a GitLab lister using the instructions detailed in the `python3` scripts below: ```lang=python import logging from swh.lister.gitlab.tasks import range_gitlab_lister logging.basicConfig(level=logging.DEBUG) range_gitlab_lister(1, 2, { 'instance': 'debian', 'api_baseurl': 'https://salsa.debian.org/api/v4', 'sort': 'asc', 'per_page': 20 }) ``` ```lang=python import logging from swh.lister.gitlab.tasks import full_gitlab_relister logging.basicConfig(level=logging.DEBUG) full_gitlab_relister({ 'instance': '0xacab', 'api_baseurl': 'https://0xacab.org/api/v4', 'sort': 'asc', 'per_page': 20 }) ``` ```lang=python import logging from swh.lister.gitlab.tasks import incremental_gitlab_lister logging.basicConfig(level=logging.DEBUG) incremental_gitlab_lister({ 'instance': 'freedesktop.org', 'api_baseurl': 'https://gitlab.freedesktop.org/api/v4', 'sort': 'asc', 'per_page': 20 }) ``` ## lister-debian Once configured, you can execute a Debian lister using the following instructions in a `python3` script: ```lang=python import logging from swh.lister.debian.tasks import debian_lister logging.basicConfig(level=logging.DEBUG) debian_lister('Debian') ``` ## lister-pypi Once configured, you can execute a PyPI lister using the following instructions in a `python3` script: ```lang=python import logging from swh.lister.pypi.tasks import pypi_lister logging.basicConfig(level=logging.DEBUG) pypi_lister() ``` ## lister-npm Once configured, you can execute an npm lister using the following instructions in a `python3` REPL: ```lang=python import logging from swh.lister.npm.tasks import npm_lister logging.basicConfig(level=logging.DEBUG) npm_lister() ``` ## lister-phabricator Once configured, you can execute a Phabricator lister using the following instructions in a `python3` script: ```lang=python import logging from swh.lister.phabricator.tasks import incremental_phabricator_lister logging.basicConfig(level=logging.DEBUG) incremental_phabricator_lister(forge_url='https://forge.softwareheritage.org', api_token='XXXX') ``` ## lister-gnu Once configured, you can execute a GNU lister using the following instructions in a `python3` script: ```lang=python import logging from swh.lister.gnu.tasks import gnu_lister logging.basicConfig(level=logging.DEBUG) gnu_lister() ``` ## lister-cran Once configured, you can execute a CRAN lister using the following instructions in a `python3` script: ```lang=python import logging from swh.lister.cran.tasks import cran_lister logging.basicConfig(level=logging.DEBUG) cran_lister() ``` ## lister-cgit Once configured, you can execute a cgit lister using the following instructions in a `python3` script: ```lang=python import logging from swh.lister.cgit.tasks import cgit_lister logging.basicConfig(level=logging.DEBUG) # simple cgit instance cgit_lister(url='https://git.kernel.org/') # cgit instance whose listed repositories differ from the base url cgit_lister(url='https://cgit.kde.org/', url_prefix='https://anongit.kde.org/') ``` ## lister-packagist Once configured, you can execute a Packagist lister using the following instructions in a `python3` script: ```lang=python import logging from swh.lister.packagist.tasks import packagist_lister logging.basicConfig(level=logging.DEBUG) packagist_lister() ``` Licensing --------- This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your 
option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. See top-level LICENSE file for the full text of the GNU General Public License along with this program. Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/swh.lister.egg-info/SOURCES.txt b/swh.lister.egg-info/SOURCES.txt index 3d67d6d..f11dfb3 100644 --- a/swh.lister.egg-info/SOURCES.txt +++ b/swh.lister.egg-info/SOURCES.txt @@ -1,140 +1,139 @@ MANIFEST.in Makefile README.md requirements-swh.txt requirements-test.txt requirements.txt setup.py version.txt swh/__init__.py swh.lister.egg-info/PKG-INFO swh.lister.egg-info/SOURCES.txt swh.lister.egg-info/dependency_links.txt swh.lister.egg-info/entry_points.txt swh.lister.egg-info/requires.txt swh.lister.egg-info/top_level.txt swh/lister/__init__.py swh/lister/_version.py swh/lister/cli.py swh/lister/utils.py swh/lister/bitbucket/__init__.py swh/lister/bitbucket/lister.py swh/lister/bitbucket/models.py swh/lister/bitbucket/tasks.py swh/lister/bitbucket/tests/__init__.py swh/lister/bitbucket/tests/api_empty_response.json swh/lister/bitbucket/tests/api_response.json swh/lister/bitbucket/tests/conftest.py swh/lister/bitbucket/tests/test_bb_lister.py swh/lister/bitbucket/tests/test_tasks.py swh/lister/cgit/__init__.py swh/lister/cgit/lister.py swh/lister/cgit/models.py swh/lister/cgit/tasks.py swh/lister/cgit/tests/__init__.py swh/lister/cgit/tests/conftest.py swh/lister/cgit/tests/repo_list.txt -swh/lister/cgit/tests/response.html swh/lister/cgit/tests/test_lister.py swh/lister/cgit/tests/test_tasks.py swh/lister/core/__init__.py swh/lister/core/abstractattribute.py swh/lister/core/db_utils.py swh/lister/core/indexing_lister.py swh/lister/core/lister_base.py swh/lister/core/lister_transports.py swh/lister/core/models.py swh/lister/core/page_by_page_lister.py swh/lister/core/simple_lister.py swh/lister/core/tests/__init__.py swh/lister/core/tests/conftest.py swh/lister/core/tests/test_abstractattribute.py swh/lister/core/tests/test_lister.py swh/lister/core/tests/test_model.py swh/lister/cran/__init__.py swh/lister/cran/list_all_packages.R swh/lister/cran/lister.py swh/lister/cran/models.py swh/lister/cran/tasks.py swh/lister/cran/tests/__init__.py swh/lister/cran/tests/conftest.py swh/lister/cran/tests/test_lister.py swh/lister/cran/tests/test_tasks.py swh/lister/debian/__init__.py swh/lister/debian/lister.py swh/lister/debian/tasks.py swh/lister/debian/utils.py swh/lister/debian/tests/__init__.py swh/lister/debian/tests/conftest.py swh/lister/debian/tests/test_tasks.py swh/lister/github/__init__.py swh/lister/github/lister.py swh/lister/github/models.py swh/lister/github/tasks.py swh/lister/github/tests/__init__.py swh/lister/github/tests/api_empty_response.json swh/lister/github/tests/api_response.json swh/lister/github/tests/conftest.py swh/lister/github/tests/test_gh_lister.py swh/lister/github/tests/test_tasks.py swh/lister/gitlab/__init__.py swh/lister/gitlab/lister.py swh/lister/gitlab/models.py swh/lister/gitlab/tasks.py swh/lister/gitlab/tests/__init__.py 
swh/lister/gitlab/tests/api_empty_response.json swh/lister/gitlab/tests/api_response.json swh/lister/gitlab/tests/conftest.py swh/lister/gitlab/tests/test_gitlab_lister.py swh/lister/gitlab/tests/test_tasks.py swh/lister/gnu/__init__.py swh/lister/gnu/lister.py swh/lister/gnu/models.py swh/lister/gnu/tasks.py swh/lister/gnu/tests/__init__.py swh/lister/gnu/tests/api_response.json swh/lister/gnu/tests/conftest.py swh/lister/gnu/tests/file_structure.json swh/lister/gnu/tests/find_tarballs_output.json swh/lister/gnu/tests/test_lister.py swh/lister/gnu/tests/test_tasks.py swh/lister/npm/__init__.py swh/lister/npm/lister.py swh/lister/npm/models.py swh/lister/npm/tasks.py swh/lister/npm/tests/api_empty_response.json swh/lister/npm/tests/api_inc_empty_response.json swh/lister/npm/tests/api_inc_response.json swh/lister/npm/tests/api_response.json swh/lister/packagist/__init__.py swh/lister/packagist/lister.py swh/lister/packagist/models.py swh/lister/packagist/tasks.py swh/lister/packagist/tests/__init__.py swh/lister/packagist/tests/api_response.json swh/lister/packagist/tests/conftest.py swh/lister/packagist/tests/test_lister.py swh/lister/packagist/tests/test_tasks.py swh/lister/phabricator/__init__.py swh/lister/phabricator/lister.py swh/lister/phabricator/models.py swh/lister/phabricator/tasks.py swh/lister/phabricator/tests/__init__.py swh/lister/phabricator/tests/api_empty_response.json swh/lister/phabricator/tests/api_response.json swh/lister/phabricator/tests/api_response_undefined_protocol.json swh/lister/phabricator/tests/conftest.py swh/lister/phabricator/tests/test_lister.py swh/lister/phabricator/tests/test_tasks.py swh/lister/pypi/__init__.py swh/lister/pypi/lister.py swh/lister/pypi/models.py swh/lister/pypi/tasks.py swh/lister/pypi/tests/__init__.py swh/lister/pypi/tests/api_response.html swh/lister/pypi/tests/conftest.py swh/lister/pypi/tests/test_lister.py swh/lister/pypi/tests/test_tasks.py swh/lister/tests/__init__.py swh/lister/tests/test_cli.py swh/lister/tests/test_utils.py \ No newline at end of file diff --git a/swh.lister.egg-info/entry_points.txt b/swh.lister.egg-info/entry_points.txt index cebbda5..da30a4d 100644 --- a/swh.lister.egg-info/entry_points.txt +++ b/swh.lister.egg-info/entry_points.txt @@ -1,6 +1,18 @@ [console_scripts] swh-lister=swh.lister.cli:cli [swh.cli.subcommands] lister=swh.lister.cli:lister + [swh.workers] + lister.bitbucket=swh.lister.bitbucket:register + lister.cgit=swh.lister.cgit:register + lister.cran=swh.lister.cran:register + lister.debian=swh.lister.debian:register + lister.github=swh.lister.github:register + lister.gitlab=swh.lister.gitlab:register + lister.gnu=swh.lister.gnu:register + lister.npm=swh.lister.npm:register + lister.packagist=swh.lister.packagist:register + lister.phabricator=swh.lister.phabricator:register + lister.pypi=swh.lister.pypi:register \ No newline at end of file diff --git a/swh.lister.egg-info/requires.txt b/swh.lister.egg-info/requires.txt index 9e82f51..aa88592 100644 --- a/swh.lister.egg-info/requires.txt +++ b/swh.lister.egg-info/requires.txt @@ -1,18 +1,17 @@ SQLAlchemy arrow python_debian requests setuptools xmltodict iso8601 beautifulsoup4 swh.core -swh.storage>=0.0.122 -swh.storage[schemata] -swh.scheduler>=0.0.39 +swh.storage[schemata]>=0.0.122 +swh.scheduler>=0.0.58 [testing] -pytest<4 +pytest pytest-postgresql requests_mock testing.postgresql diff --git a/swh/lister/_version.py b/swh/lister/_version.py index c9aff30..1daf266 100644 --- a/swh/lister/_version.py +++ b/swh/lister/_version.py @@ -1,5 +1,5 
@@ # This file is automatically generated by setup.py. -__version__ = '0.0.33' -__sha__ = 'g09f3605' -__revision__ = 'g09f3605' +__version__ = '0.0.34' +__sha__ = 'g481b30c' +__revision__ = 'g481b30c' diff --git a/swh/lister/bitbucket/__init__.py b/swh/lister/bitbucket/__init__.py index e69de29..7a524e2 100644 --- a/swh/lister/bitbucket/__init__.py +++ b/swh/lister/bitbucket/__init__.py @@ -0,0 +1,13 @@ +# Copyright (C) 2019 the Software Heritage developers +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + + +def register(): + from .models import BitBucketModel + from .lister import BitBucketLister + + return {'models': [BitBucketModel], + 'lister': BitBucketLister, + 'task_modules': ['%s.tasks' % __name__], + } diff --git a/swh/lister/bitbucket/lister.py b/swh/lister/bitbucket/lister.py index 30787b1..45b573c 100644 --- a/swh/lister/bitbucket/lister.py +++ b/swh/lister/bitbucket/lister.py @@ -1,83 +1,79 @@ # Copyright (C) 2017-2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import iso8601 from datetime import datetime from urllib import parse from swh.lister.bitbucket.models import BitBucketModel from swh.lister.core.indexing_lister import IndexingHttpLister logger = logging.getLogger(__name__) -DEFAULT_BITBUCKET_PAGE = 10 - class BitBucketLister(IndexingHttpLister): PATH_TEMPLATE = '/repositories?after=%s' MODEL = BitBucketModel LISTER_NAME = 'bitbucket' + DEFAULT_URL = 'https://api.bitbucket.org/2.0' instance = 'bitbucket' default_min_bound = datetime.utcfromtimestamp(0) - def __init__(self, api_baseurl, override_config=None, per_page=100): - super().__init__( - api_baseurl=api_baseurl, override_config=override_config) - if per_page != DEFAULT_BITBUCKET_PAGE: - self.PATH_TEMPLATE = '%s&pagelen=%s' % ( - self.PATH_TEMPLATE, per_page) - # to stay consistent with prior behavior (20 * 10 repositories then) - self.flush_packet_db = int( - (self.flush_packet_db * DEFAULT_BITBUCKET_PAGE) / per_page) + def __init__(self, url=None, override_config=None, per_page=100): + super().__init__(url=url, override_config=override_config) + per_page = self.config.get('per_page', per_page) + + self.PATH_TEMPLATE = '%s&pagelen=%s' % ( + self.PATH_TEMPLATE, per_page) def get_model_from_repo(self, repo): return { 'uid': repo['uuid'], 'indexable': iso8601.parse_date(repo['created_on']), 'name': repo['name'], 'full_name': repo['full_name'], 'html_url': repo['links']['html']['href'], 'origin_url': repo['links']['clone'][0]['href'], 'origin_type': repo['scm'], } def get_next_target_from_response(self, response): """This will read the 'next' link from the api response if any and return it as a datetime. 
Args: response (Response): requests' response from api call Returns: next date as a datetime """ body = response.json() next_ = body.get('next') if next_ is not None: next_ = parse.urlparse(next_) return iso8601.parse_date(parse.parse_qs(next_.query)['after'][0]) def transport_response_simplified(self, response): repos = response.json()['values'] return [self.get_model_from_repo(repo) for repo in repos] def request_uri(self, identifier): identifier = parse.quote(identifier.isoformat()) return super().request_uri(identifier or '1970-01-01') def is_within_bounds(self, inner, lower=None, upper=None): # values are expected to be datetimes if lower is None and upper is None: ret = True elif lower is None: ret = inner <= upper elif upper is None: ret = inner >= lower else: ret = lower <= inner <= upper return ret diff --git a/swh/lister/bitbucket/tasks.py b/swh/lister/bitbucket/tasks.py index a084846..b8fa316 100644 --- a/swh/lister/bitbucket/tasks.py +++ b/swh/lister/bitbucket/tasks.py @@ -1,57 +1,54 @@ # Copyright (C) 2017-2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import random from celery import group from swh.scheduler.celery_backend.config import app from .lister import BitBucketLister GROUP_SPLIT = 10000 -def new_lister(api_baseurl='https://api.bitbucket.org/2.0', per_page=100): - return BitBucketLister(api_baseurl=api_baseurl, per_page=per_page) - - @app.task(name=__name__ + '.IncrementalBitBucketLister') -def incremental_bitbucket_lister(**lister_args): - lister = new_lister(**lister_args) +def list_bitbucket_incremental(**lister_args): + '''Incremental update of the BitBucket forge''' + lister = BitBucketLister(**lister_args) lister.run(min_bound=lister.db_last_index(), max_bound=None) @app.task(name=__name__ + '.RangeBitBucketLister') -def range_bitbucket_lister(start, end, **lister_args): - lister = new_lister(**lister_args) +def _range_bitbucket_lister(start, end, **lister_args): + lister = BitBucketLister(**lister_args) lister.run(min_bound=start, max_bound=end) @app.task(name=__name__ + '.FullBitBucketRelister', bind=True) -def full_bitbucket_relister(self, split=None, **lister_args): - """Relist from the beginning of what's already been listed. +def list_bitbucket_full(self, split=None, **lister_args): + """Full update of the BitBucket forge It's not to be called for an initial listing. 
""" - lister = new_lister(**lister_args) + lister = BitBucketLister(**lister_args) ranges = lister.db_partition_indices(split or GROUP_SPLIT) if not ranges: self.log.info('Nothing to list') return random.shuffle(ranges) - promise = group(range_bitbucket_lister.s(minv, maxv, **lister_args) + promise = group(_range_bitbucket_lister.s(minv, maxv, **lister_args) for minv, maxv in ranges)() self.log.debug('%s OK (spawned %s subtasks)', (self.name, len(ranges))) try: promise.save() # so that we can restore the GroupResult in tests except (NotImplementedError, AttributeError): self.log.info('Unable to call save_group with current result backend.') return promise.id @app.task(name=__name__ + '.ping') -def ping(): +def _ping(): return 'OK' diff --git a/swh/lister/bitbucket/tests/test_tasks.py b/swh/lister/bitbucket/tests/test_tasks.py index 1e02b6f..bd881ab 100644 --- a/swh/lister/bitbucket/tests/test_tasks.py +++ b/swh/lister/bitbucket/tests/test_tasks.py @@ -1,92 +1,89 @@ from time import sleep from celery.result import GroupResult from unittest.mock import patch def test_ping(swh_app, celery_session_worker): res = swh_app.send_task( 'swh.lister.bitbucket.tasks.ping') assert res res.wait() assert res.successful() assert res.result == 'OK' @patch('swh.lister.bitbucket.tasks.BitBucketLister') def test_incremental(lister, swh_app, celery_session_worker): # setup the mocked BitbucketLister lister.return_value = lister lister.db_last_index.return_value = 42 lister.run.return_value = None res = swh_app.send_task( 'swh.lister.bitbucket.tasks.IncrementalBitBucketLister') assert res res.wait() assert res.successful() - lister.assert_called_once_with( - api_baseurl='https://api.bitbucket.org/2.0', per_page=100) + lister.assert_called_once_with() lister.db_last_index.assert_called_once_with() lister.run.assert_called_once_with(min_bound=42, max_bound=None) @patch('swh.lister.bitbucket.tasks.BitBucketLister') def test_range(lister, swh_app, celery_session_worker): # setup the mocked BitbucketLister lister.return_value = lister lister.run.return_value = None res = swh_app.send_task( 'swh.lister.bitbucket.tasks.RangeBitBucketLister', kwargs=dict(start=12, end=42)) assert res res.wait() assert res.successful() - lister.assert_called_once_with( - api_baseurl='https://api.bitbucket.org/2.0', per_page=100) + lister.assert_called_once_with() lister.db_last_index.assert_not_called() lister.run.assert_called_once_with(min_bound=12, max_bound=42) @patch('swh.lister.bitbucket.tasks.BitBucketLister') def test_relister(lister, swh_app, celery_session_worker): # setup the mocked BitbucketLister lister.return_value = lister lister.run.return_value = None lister.db_partition_indices.return_value = [ (i, i+9) for i in range(0, 50, 10)] res = swh_app.send_task( 'swh.lister.bitbucket.tasks.FullBitBucketRelister') assert res res.wait() assert res.successful() # retrieve the GroupResult for this task and wait for all the subtasks # to complete promise_id = res.result assert promise_id promise = GroupResult.restore(promise_id, app=swh_app) for i in range(5): if promise.ready(): break sleep(1) - lister.assert_called_with( - api_baseurl='https://api.bitbucket.org/2.0', per_page=100) + lister.assert_called_with() # one by the FullBitbucketRelister task # + 5 for the RangeBitbucketLister subtasks assert lister.call_count == 6 lister.db_last_index.assert_not_called() lister.db_partition_indices.assert_called_once_with(10000) # lister.run should have been called once per partition interval for i in range(5): assert 
(dict(min_bound=10*i, max_bound=10*i + 9),) \ in lister.run.call_args_list diff --git a/swh/lister/cgit/__init__.py b/swh/lister/cgit/__init__.py index e69de29..00d5788 100644 --- a/swh/lister/cgit/__init__.py +++ b/swh/lister/cgit/__init__.py @@ -0,0 +1,13 @@ +# Copyright (C) 2019 the Software Heritage developers +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + + +def register(): + from .models import CGitModel + from .lister import CGitLister + + return {'models': [CGitModel], + 'lister': CGitLister, + 'task_modules': ['%s.tasks' % __name__], + } diff --git a/swh/lister/cgit/lister.py b/swh/lister/cgit/lister.py index 3897adb..c459eb5 100644 --- a/swh/lister/cgit/lister.py +++ b/swh/lister/cgit/lister.py @@ -1,251 +1,138 @@ # Copyright (C) 2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import random +import re import logging +from urllib.parse import urlparse, urljoin + from bs4 import BeautifulSoup -import requests -from urllib.parse import urlparse +from requests import Session +from requests.adapters import HTTPAdapter from .models import CGitModel -from swh.lister.core.simple_lister import SimpleLister -from swh.lister.core.lister_transports import ListerOnePageApiTransport - - -class CGitLister(ListerOnePageApiTransport, SimpleLister): - MODEL = CGitModel - LISTER_NAME = 'cgit' - PAGE = None - url_prefix_present = True - - def __init__(self, url, instance=None, url_prefix=None, - override_config=None): - """Inits Class with PAGE url and origin url prefix. - - Args: - url (str): URL of the CGit instance. - instance (str): Name of cgit instance. - url_prefix (str): Prefix of the origin_url. Origin link of the - repos of some special instances do not match - the url of the repository page, they have origin - url in the format /. - - """ - self.PAGE = url - if url_prefix is None: - self.url_prefix = url - self.url_prefix_present = False - else: - self.url_prefix = url_prefix - - if not self.url_prefix.endswith('/'): - self.url_prefix += '/' - url = urlparse(self.PAGE) - self.url_netloc = find_netloc(url) - - if not instance: - instance = url.hostname - self.instance = instance - - ListerOnePageApiTransport .__init__(self) - SimpleLister.__init__(self, override_config=override_config) - - def list_packages(self, response): - """List the actual cgit instance origins from the response. - - Find repositories metadata by parsing the html page (response's raw - content). If there are links in the html page, retrieve those - repositories metadata from those pages as well. Return the - repositories as list of dictionaries. +from swh.core.utils import grouper +from swh.lister.core.lister_base import ListerBase - Args: - response (Response): http api request response. - - Returns: - List of repository origin urls (as dict) included in the response. - - """ - repos_details = [] - - for repo in self.yield_repo_from_responses(response): - repo_name = repo.a.text - origin_url = self.find_origin_url(repo, repo_name) - try: - time = repo.span['title'] - except Exception: - time = None - - if origin_url is not None: - repos_details.append({ - 'name': repo_name, - 'time': time, - 'origin_url': origin_url, - }) +logger = logging.getLogger(__name__) - random.shuffle(repos_details) - return repos_details - def yield_repo_from_responses(self, response): - """Yield repositories from all pages of the cgit instance. 
+class CGitLister(ListerBase): + """Lister class for CGit repositories. - Finds the number of pages present and yields the list of - repositories present. + This lister will retrieve the list of published git repositories by + parsing the HTML page(s) of the index retrieved at `url`. - Args: - response (Response): server response. + For each found git repository, a query is made at the given url found + in this index to gather published "Clone" URLs to be used as origin + URL for that git repo. - Yields: - List of beautifulsoup object of repository rows. - - """ - html = response.text - yield from get_repo_list(html) - pages = self.get_pages(make_soup(html)) - if len(pages) > 1: - yield from self.get_repos_from_pages(pages[1:]) + If several "Clone" urls are provided, prefer the http/https one, if + any, otherwise fall back to the first one. - def find_origin_url(self, repo, repo_name): - """Finds the origin url for a repository + A loader task is created for each git repository: + Task: + Type: load-git + Policy: recurring Args: + <url> - repo (Beautifulsoup): Beautifulsoup object of the repository - row present in base url. - repo_name (str): Repository name. - - Returns: - string: origin url. - - """ - if self.url_prefix_present: - return self.url_prefix + repo_name - - return self.get_url(repo) - - def get_pages(self, url_soup): - """Find URL of all pages. - - Finds URL of pages that are present by parsing over the HTML of - pagination present at the end of the page. + + Example: + Type: load-git + Policy: recurring Args: - url_soup (Beautifulsoup): a beautifulsoup object of base URL - - Returns: - list: URL of pages present for a cgit instance - - """ - pages = url_soup.find('div', {"class": "content"}).find_all('li') - - if not pages: - return [self.PAGE] - - return [self.get_url(page) for page in pages] - - def get_repos_from_pages(self, pages): - """Find repos from all pages. + 'https://git.savannah.gnu.org/git/elisp-es.git' + """ + MODEL = CGitModel + DEFAULT_URL = 'http://git.savannah.gnu.org/cgit/' + LISTER_NAME = 'cgit' + url_prefix_present = True - Request the available repos from the pages. This yields - the available repositories found as beautiful object representation. + def __init__(self, url=None, instance=None, override_config=None): + """Lister class for CGit repositories. Args: - pages ([str]): list of urls of all pages present for a - particular cgit instance. - - Yields: - List of beautifulsoup object of repository (url) rows - present in pages(except first). + url (str): main URL of the CGit instance, i.e. url of the index + of published git repositories on this instance. + instance (str): Name of cgit instance. Defaults to url's hostname + if unset. """ - for page in pages: - response = requests.get(page) - if not response.ok: - logging.warning('Failed to retrieve repositories from page %s', - page) - continue - - yield from get_repo_list(response.text) + super().__init__(override_config=override_config) - def get_url(self, repo): - """Finds url of a repo page. + if url is None: + url = self.config.get('url', self.DEFAULT_URL) + self.url = url - Finds the url of a repo page by parsing over the html of the row of - that repo present in the base url. - - Args: - repo (Beautifulsoup): a beautifulsoup object of the repository - row present in base url. - - Returns: - string: The url of a repo. - - """ - suffix = repo.a['href'] - return self.url_netloc + suffix - - def get_model_from_repo(self, repo): - """Transform from repository representation to model. 
+ if not instance: + instance = urlparse(url).hostname + self.instance = instance + self.session = Session() + self.session.mount(self.url, HTTPAdapter(max_retries=3)) + + def run(self): + total = 0 + for repos in grouper(self.get_repos(), 10): + models = list(filter(None, (self.build_model(repo) + for repo in repos))) + injected_repos = self.inject_repo_data_into_db(models) + self.schedule_missing_tasks(models, injected_repos) + self.db_session.commit() + total += len(injected_repos) + logger.debug('Scheduled %s tasks for %s', total, self.url) + + def get_repos(self): + """Generate git 'project' URLs found on the current CGit server """ - return { - 'uid': self.PAGE + repo['name'], - 'name': repo['name'], - 'full_name': repo['name'], - 'html_url': repo['origin_url'], - 'origin_url': repo['origin_url'], - 'origin_type': 'git', - 'time_updated': repo['time'], - 'instance': self.instance, - } - - def transport_response_simplified(self, repos_details): - """Transform response to list for model manipulation. + next_page = self.url + while next_page: + bs_idx = self.get_and_parse(next_page) + for tr in bs_idx.find( + 'div', {"class": "content"}).find_all( + "tr", {"class": ""}): + yield urljoin(self.url, tr.find('a')['href']) + try: + pager = bs_idx.find('ul', {'class': 'pager'}) + current_page = pager.find('a', {'class': 'current'}) + if current_page: + next_page = current_page.parent.next_sibling.a['href'] + next_page = urljoin(self.url, next_page) + except (AttributeError, KeyError): + # no pager, or no next page + next_page = None + + def build_model(self, repo_url): + """Given the URL of a git repo project page on a CGit server, + return the repo description (dict) suitable for insertion in the db. """ - return [self.get_model_from_repo(repo) for repo in repos_details] + bs = self.get_and_parse(repo_url) + urls = [x['href'] for x in bs.find_all('a', {'rel': 'vcs-git'})] + if not urls: + return -def find_netloc(url): - """Finds the network location from then url. - - URL in the repo are relative to the network location part of base - URL, so we need to compute it to reconstruct URLs. - - Args: - url (urllib): urllib object of url. - - Returns: - string: Scheme and Network location part in the base URL. - - Example: - For url = https://git.kernel.org/pub/scm/ - >>> find_netloc(url) - 'https://git.kernel.org' - - """ - return '%s://%s' % (url.scheme, url.netloc) - - -def get_repo_list(response): - """Find repositories (as beautifulsoup object) available within the server - response. - - Args: - response (Response): server response - - Returns: - List all repositories as beautifulsoup object within the response. - - """ - repo_soup = make_soup(response) - return repo_soup \ - .find('div', {"class": "content"}).find_all("tr", {"class": ""}) - - -def make_soup(response): - """Instantiates a beautiful soup object from the response object. 
- - """ - return BeautifulSoup(response, features="html.parser") + # look for the http/https url, if any, and use it as origin_url + for url in urls: + if urlparse(url).scheme in ('http', 'https'): + origin_url = url + break + else: + # otherwise, choose the first one + origin_url = urls[0] + + return {'uid': repo_url, + 'name': bs.find('a', title=re.compile('.+'))['title'], + 'origin_type': 'git', + 'instance': self.instance, + 'origin_url': origin_url, + } + + def get_and_parse(self, url): + "Get the given url and parse the retrieved HTML using BeautifulSoup" + return BeautifulSoup(self.session.get(url).text, + features='html.parser') diff --git a/swh/lister/cgit/models.py b/swh/lister/cgit/models.py index 4e16798..be10161 100644 --- a/swh/lister/cgit/models.py +++ b/swh/lister/cgit/models.py @@ -1,18 +1,17 @@ # Copyright (C) 2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from sqlalchemy import Column, String from ..core.models import ModelBase class CGitModel(ModelBase): """a CGit repository representation """ __tablename__ = 'cgit_repo' uid = Column(String, primary_key=True) - time_updated = Column(String) instance = Column(String, index=True) diff --git a/swh/lister/cgit/tasks.py b/swh/lister/cgit/tasks.py index 31148dd..0066cf7 100644 --- a/swh/lister/cgit/tasks.py +++ b/swh/lister/cgit/tasks.py @@ -1,25 +1,18 @@ # Copyright (C) 2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.scheduler.celery_backend.config import app from .lister import CGitLister -def new_lister(url='https://git.kernel.org/', - url_prefix=None, - instance='kernal', **kw): - return CGitLister(url=url, instance=instance, url_prefix=url_prefix, - **kw) - - @app.task(name=__name__ + '.CGitListerTask') -def cgit_lister(**lister_args): - lister = new_lister(**lister_args) - lister.run() +def list_cgit(**lister_args): + '''Lister task for CGit instances''' + CGitLister(**lister_args).run() @app.task(name=__name__ + '.ping') -def ping(): +def _ping(): return 'OK' diff --git a/swh/lister/cgit/tests/response.html b/swh/lister/cgit/tests/response.html deleted file mode 100644 index cd95ccb..0000000 --- a/swh/lister/cgit/tests/response.html +++ /dev/null @@ -1,41 +0,0 @@ - - - -OpenEmbedded Git Repository Browser - - - - - - - - - diff --git a/swh/lister/cgit/tests/test_lister.py b/swh/lister/cgit/tests/test_lister.py index 049893e..8524c9b 100644 --- a/swh/lister/cgit/tests/test_lister.py +++ b/swh/lister/cgit/tests/test_lister.py @@ -1,27 +1,86 @@ # Copyright (C) 2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information - +from os.path import join, dirname +import re from urllib.parse import urlparse +from unittest.mock import Mock + +import requests_mock +from sqlalchemy import create_engine + +from swh.lister.cgit.lister import CGitLister +from swh.lister.tests.test_utils import init_db + + +DATADIR = join(dirname(__file__), 'data') + + +def get_response_cb(request, context): + url = urlparse(request.url) + dirname = url.hostname + filename = url.path[1:-1].replace('/', '_') + if url.query: + filename += ',' + url.query + resp = open(join(DATADIR, dirname, filename), 'rb').read() + return resp.decode('ascii', 'ignore') + + +def test_lister_no_page(): + with requests_mock.Mocker() as m: + 
m.get(re.compile('http://git.savannah.gnu.org'), text=get_response_cb) + lister = CGitLister() + + assert lister.url == 'http://git.savannah.gnu.org/cgit/' + + repos = list(lister.get_repos()) + assert len(repos) == 977 + + assert repos[0] == 'http://git.savannah.gnu.org/cgit/elisp-es.git/' + # note the url below is NOT a subpath of /cgit/ + assert repos[-1] == 'http://git.savannah.gnu.org/path/to/yetris.git/' # noqa + # note the url below is NOT on the same server + assert repos[-2] == 'http://example.org/cgit/xstarcastle.git/' + + +def test_lister_model(): + with requests_mock.Mocker() as m: + m.get(re.compile('http://git.savannah.gnu.org'), text=get_response_cb) + lister = CGitLister() + + repo = next(lister.get_repos()) + + model = lister.build_model(repo) + assert model == { + 'uid': 'http://git.savannah.gnu.org/cgit/elisp-es.git/', + 'name': 'elisp-es.git', + 'origin_type': 'git', + 'instance': 'git.savannah.gnu.org', + 'origin_url': 'https://git.savannah.gnu.org/git/elisp-es.git' + } -from swh.lister.cgit.lister import find_netloc, get_repo_list +def test_lister_with_pages(): + with requests_mock.Mocker() as m: + m.get(re.compile('http://git.tizen/cgit/'), text=get_response_cb) + lister = CGitLister(url='http://git.tizen/cgit/') -def test_get_repo_list(): - f = open('swh/lister/cgit/tests/response.html') - repos = get_repo_list(f.read()) - f = open('swh/lister/cgit/tests/repo_list.txt') - expected_repos = f.readlines() - expected_repos = list(map((lambda repo: repo[:-1]), expected_repos)) - assert len(repos) == len(expected_repos) - for i in range(len(repos)): - assert str(repos[i]) == expected_repos[i] + assert lister.url == 'http://git.tizen/cgit/' + repos = list(lister.get_repos()) + # we should have 16 repos (listed on 3 pages) + assert len(repos) == 16 -def test_find_netloc(): - first_url = urlparse('http://git.savannah.gnu.org/cgit/') - second_url = urlparse('https://cgit.kde.org/') - assert find_netloc(first_url) == 'http://git.savannah.gnu.org' - assert find_netloc(second_url) == 'https://cgit.kde.org' +def test_lister_run(): + with requests_mock.Mocker() as m: + m.get(re.compile('http://git.tizen/cgit/'), text=get_response_cb) + db = init_db() + conf = {'lister': {'cls': 'local', 'args': {'db': db.url()}}} + lister = CGitLister(url='http://git.tizen/cgit/', + override_config=conf) + engine = create_engine(db.url()) + lister.MODEL.metadata.create_all(engine) + lister.schedule_missing_tasks = Mock(return_value=None) + lister.run() diff --git a/swh/lister/cgit/tests/test_tasks.py b/swh/lister/cgit/tests/test_tasks.py index 4a36a05..38bf7b7 100644 --- a/swh/lister/cgit/tests/test_tasks.py +++ b/swh/lister/cgit/tests/test_tasks.py @@ -1,53 +1,30 @@ from unittest.mock import patch def test_ping(swh_app, celery_session_worker): res = swh_app.send_task( 'swh.lister.cgit.tasks.ping') assert res res.wait() assert res.successful() assert res.result == 'OK' @patch('swh.lister.cgit.tasks.CGitLister') -def test_lister_no_url_prefix(lister, swh_app, celery_session_worker): +def test_lister(lister, swh_app, celery_session_worker): # setup the mocked CGitLister lister.return_value = lister lister.run.return_value = None res = swh_app.send_task( 'swh.lister.cgit.tasks.CGitListerTask', kwargs=dict(url='https://git.kernel.org/', instance='kernel')) assert res res.wait() assert res.successful() lister.assert_called_once_with( url='https://git.kernel.org/', - url_prefix=None, instance='kernel') lister.db_last_index.assert_not_called() lister.run.assert_called_once_with() - - 
-@patch('swh.lister.cgit.tasks.CGitLister') -def test_lister_with_url_prefix(lister, swh_app, celery_session_worker): - # setup the mocked CGitLister - lister.return_value = lister - lister.run.return_value = None - - res = swh_app.send_task( - 'swh.lister.cgit.tasks.CGitListerTask', - kwargs=dict(url='https://cgit.kde.org/', - url_prefix='https://anongit.kde.org/', instance='kde')) - assert res - res.wait() - assert res.successful() - - lister.assert_called_once_with( - url='https://cgit.kde.org/', - url_prefix='https://anongit.kde.org/', - instance='kde') - lister.db_last_index.assert_not_called() - lister.run.assert_called_once_with() diff --git a/swh/lister/cli.py b/swh/lister/cli.py index 2445140..9e1dad4 100644 --- a/swh/lister/cli.py +++ b/swh/lister/cli.py @@ -1,237 +1,235 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import os import logging +import pkg_resources +from copy import deepcopy +from importlib import import_module + import click +from sqlalchemy import create_engine from swh.core.cli import CONTEXT_SETTINGS +from swh.scheduler import get_scheduler +from swh.scheduler.task import SWHTask +from swh.lister.core.models import initialize logger = logging.getLogger(__name__) -SUPPORTED_LISTERS = ['github', 'gitlab', 'bitbucket', 'debian', 'pypi', - 'npm', 'phabricator', 'gnu', 'cran', 'cgit', 'packagist'] - - -# Base urls for most listers -DEFAULT_BASEURLS = { - 'gitlab': 'https://gitlab.com/api/v4/', - 'phabricator': 'https://forge.softwareheritage.org', - 'cgit': ( - 'http://git.savannah.gnu.org/cgit/', - 'http://git.savannah.gnu.org/git/' - ), -} +LISTERS = {entry_point.name.split('.', 1)[1]: entry_point + for entry_point in pkg_resources.iter_entry_points('swh.workers') + if entry_point.name.split('.', 1)[0] == 'lister'} +SUPPORTED_LISTERS = list(LISTERS) + +# the key in this dict is the suffix used to match new task-type to be added. +# For example, for a task whose function name is 'list_gitlab_full', the default +# value used when inserting a new task-type in the scheduler db will be the one +# under the 'full' key below (because it matches xxx_full). +DEFAULT_TASK_TYPE = { + 'full': { # for tasks like 'list_xxx_full()' + 'default_interval': '90 days', + 'min_interval': '90 days', + 'max_interval': '90 days', + 'backoff_factor': 1 + }, + '*': { # value if no suffix matches + 'default_interval': '1 day', + 'min_interval': '1 day', + 'max_interval': '1 day', + 'backoff_factor': 1 + }, + } -def get_lister(lister_name, db_url, drop_tables=False, **conf): +def get_lister(lister_name, db_url=None, **conf): """Instantiate a lister given its name. Args: lister_name (str): Lister's name - db_url (str): Db's service url access - conf (dict): Extra configuration (policy, priority for example) + conf (dict): Configuration dict (lister db cnx, policy, priority...) 
Returns: Tuple (instantiated lister, drop_tables function, init schema function, insert minimum data function) """ - override_conf = { - 'lister': { - 'cls': 'local', - 'args': {'db': db_url} - }, - **conf, - } - - # To allow api_baseurl override per lister - if 'api_baseurl' in override_conf: - api_baseurl = override_conf.pop('api_baseurl') - else: - api_baseurl = DEFAULT_BASEURLS.get(lister_name) - - insert_minimum_data_fn = None - if lister_name == 'github': - from .github.models import IndexingModelBase as ModelBase - from .github.lister import GitHubLister - - _lister = GitHubLister(api_baseurl='https://api.github.com', - override_config=override_conf) - elif lister_name == 'bitbucket': - from .bitbucket.models import IndexingModelBase as ModelBase - from .bitbucket.lister import BitBucketLister - _lister = BitBucketLister(api_baseurl='https://api.bitbucket.org/2.0', - override_config=override_conf) - - elif lister_name == 'gitlab': - from .gitlab.models import ModelBase - from .gitlab.lister import GitLabLister - _lister = GitLabLister(api_baseurl=api_baseurl, - override_config=override_conf) - elif lister_name == 'debian': - from .debian.lister import DebianLister - ModelBase = DebianLister.MODEL # noqa - _lister = DebianLister(override_config=override_conf) - - def insert_minimum_data_fn(lister_name, lister): - logger.info('Inserting minimal data for %s', lister_name) - from swh.storage.schemata.distribution import ( - Distribution, Area) - d = Distribution( - name='Debian', - type='deb', - mirror_uri='http://deb.debian.org/debian/') - lister.db_session.add(d) - - areas = [] - for distribution_name in ['stretch']: - for area_name in ['main', 'contrib', 'non-free']: - areas.append(Area( - name='%s/%s' % (distribution_name, area_name), - distribution=d, - )) - lister.db_session.add_all(areas) - lister.db_session.commit() - - elif lister_name == 'pypi': - from .pypi.models import ModelBase - from .pypi.lister import PyPILister - _lister = PyPILister(override_config=override_conf) - - elif lister_name == 'npm': - from .npm.models import IndexingModelBase as ModelBase - from .npm.models import NpmVisitModel - from .npm.lister import NpmLister - _lister = NpmLister(override_config=override_conf) - - def insert_minimum_data_fn(lister_name, lister): - logger.info('Inserting minimal data for %s', lister_name) - if drop_tables: - NpmVisitModel.metadata.drop_all(lister.db_engine) - NpmVisitModel.metadata.create_all(lister.db_engine) - - elif lister_name == 'phabricator': - from .phabricator.models import IndexingModelBase as ModelBase - from .phabricator.lister import PhabricatorLister - _lister = PhabricatorLister(forge_url=api_baseurl, - override_config=override_conf) - - elif lister_name == 'gnu': - from .gnu.models import ModelBase - from .gnu.lister import GNULister - _lister = GNULister(override_config=override_conf) - - elif lister_name == 'cran': - from .cran.models import ModelBase - from .cran.lister import CRANLister - _lister = CRANLister(override_config=override_conf) - - elif lister_name == 'cgit': - from .cgit.models import ModelBase - from .cgit.lister import CGitLister - if isinstance(api_baseurl, str): - _lister = CGitLister(url=api_baseurl, - override_config=override_conf) - else: # tuple - _lister = CGitLister(url=api_baseurl[0], - url_prefix=api_baseurl[1], - override_config=override_conf) - - elif lister_name == 'packagist': - from .packagist.models import ModelBase # noqa - from .packagist.lister import PackagistLister - _lister = 
PackagistLister(override_config=override_conf) - - else: + if lister_name not in LISTERS: raise ValueError( 'Invalid lister %s: only supported listers are %s' % (lister_name, SUPPORTED_LISTERS)) + if db_url: + conf['lister'] = {'cls': 'local', 'args': {'db': db_url}} - drop_table_fn = None - if drop_tables: - def drop_table_fn(lister_name, lister): - logger.info('Dropping tables for %s', lister_name) - ModelBase.metadata.drop_all(lister.db_engine) - - def init_schema_fn(lister_name, lister): - logger.info('Creating tables for %s', lister_name) - ModelBase.metadata.create_all(lister.db_engine) - - return _lister, drop_table_fn, init_schema_fn, insert_minimum_data_fn + registry_entry = LISTERS[lister_name].load()() + lister_cls = registry_entry['lister'] + lister = lister_cls(override_config=conf) + return lister @click.group(name='lister', context_settings=CONTEXT_SETTINGS) +@click.option('--config-file', '-C', default=None, + type=click.Path(exists=True, dir_okay=False,), + help="Configuration file.") +@click.option('--db-url', '-d', default=None, + help='SQLAlchemy DB URL; see ' + '<http://docs.sqlalchemy.org/en/latest/core/engines.html#database-urls>') # noqa @click.pass_context -def lister(ctx): +def lister(ctx, config_file, db_url): '''Software Heritage Lister tools.''' - pass + from swh.core import config + ctx.ensure_object(dict) + + override_conf = {} + if db_url: + override_conf['lister'] = { + 'cls': 'local', + 'args': {'db': db_url} + } + if not config_file: + config_file = os.environ.get('SWH_CONFIG_FILENAME') + conf = config.read(config_file, override_conf) + ctx.obj['config'] = conf + ctx.obj['override_conf'] = override_conf @lister.command(name='db-init', context_settings=CONTEXT_SETTINGS) -@click.option('--db-url', '-d', default='postgres:///lister', - help='SQLAlchemy DB URL; see ' - '<http://docs.sqlalchemy.org/en/latest/core/engines.html#database-urls>') # noqa -@click.argument('listers', required=1, nargs=-1, - type=click.Choice(SUPPORTED_LISTERS + ['all'])) @click.option('--drop-tables', '-D', is_flag=True, default=False, help='Drop tables before creating the database schema') @click.pass_context -def cli(ctx, db_url, listers, drop_tables): +def db_init(ctx, drop_tables): """Initialize the database model for given listers. 
""" - if 'all' in listers: - listers = SUPPORTED_LISTERS - for lister_name in listers: - logger.info('Initializing lister %s', lister_name) - lister, drop_schema_fn, init_schema_fn, insert_minimum_data_fn = \ - get_lister(lister_name, db_url, drop_tables=drop_tables) + cfg = ctx.obj['config'] + lister_cfg = cfg['lister'] + if lister_cfg['cls'] != 'local': + click.echo('A local lister configuration is required') + ctx.exit(1) + + db_url = lister_cfg['args']['db'] + db_engine = create_engine(db_url) + + registry = {} + for lister, entrypoint in LISTERS.items(): + logger.info('Loading lister %s', lister) + registry[lister] = entrypoint.load()() + + logger.info('Initializing database') + initialize(db_engine, drop_tables) + + for lister, entrypoint in LISTERS.items(): + registry_entry = registry[lister] + init_hook = registry_entry.get('init') + if callable(init_hook): + logger.info('Calling init hook for %s', lister) + init_hook(db_engine) + + +@lister.command(name='register-task-types', context_settings=CONTEXT_SETTINGS) +@click.option('--lister', '-l', 'listers', multiple=True, + default=('all', ), show_default=True, + help='Only registers task-types for these listers', + type=click.Choice(['all'] + SUPPORTED_LISTERS)) +@click.pass_context +def register_task_types(ctx, listers): + """Insert missing task-type entries in the scheduler + + According to declared tasks in each loaded lister plugin. + """ + + cfg = ctx.obj['config'] + scheduler = get_scheduler(**cfg['scheduler']) - if drop_schema_fn: - drop_schema_fn(lister_name, lister) + for lister, entrypoint in LISTERS.items(): + if 'all' not in listers and lister not in listers: + continue + logger.info('Loading lister %s', lister) - init_schema_fn(lister_name, lister) + registry_entry = entrypoint.load()() + for task_module in registry_entry['task_modules']: + mod = import_module(task_module) + for task_name in (x for x in dir(mod) if not x.startswith('_')): + taskobj = getattr(mod, task_name) + if isinstance(taskobj, SWHTask): + task_type = task_name.replace('_', '-') + task_cfg = registry_entry.get('task_types', {}).get( + task_type, {}) + ensure_task_type(task_type, taskobj, task_cfg, scheduler) - if insert_minimum_data_fn: - insert_minimum_data_fn(lister_name, lister) + +def ensure_task_type(task_type, swhtask, task_config, scheduler): + """Ensure a task-type is known by the scheduler + + Args: + task_type (str): the type of the task to check/insert (correspond to + the 'type' field in the db) + swhtask (SWHTask): the SWHTask instance the task-type correspond to + task_config (dict): a dict with specific/overloaded values for the + task-type to be created + scheduler: the scheduler object used to access the scheduler db + """ + for suffix, defaults in DEFAULT_TASK_TYPE.items(): + if task_type.endswith('-' + suffix): + task_type_dict = defaults.copy() + break + else: + task_type_dict = DEFAULT_TASK_TYPE['*'].copy() + + task_type_dict['type'] = task_type + task_type_dict['backend_name'] = swhtask.name + if swhtask.__doc__: + task_type_dict['description'] = swhtask.__doc__.splitlines()[0] + + task_type_dict.update(task_config) + + current_task_type = scheduler.get_task_type(task_type) + if current_task_type: + # check some stuff + if current_task_type['backend_name'] != task_type_dict['backend_name']: + logger.warning('Existing task type %s for lister %s has a ' + 'different backend name than current ' + 'code version provides (%s vs. 
%s)', + task_type, + current_task_type['backend_name'], + task_type_dict['backend_name'], + ) + else: + logger.info('Create task type %s in scheduler', task_type) + logger.debug(' %s', task_type_dict) + scheduler.create_task_type(task_type_dict) @lister.command(name='run', context_settings=CONTEXT_SETTINGS, help='Trigger a full listing run for a particular forge ' 'instance. The output of this listing results in ' '"oneshot" tasks in the scheduler db with a priority ' 'defined by the user') -@click.option('--db-url', '-d', default='postgres:///lister', - help='SQLAlchemy DB URL; see ' - '<http://docs.sqlalchemy.org/en/latest/core/engines.html#database-urls>') # noqa @click.option('--lister', '-l', help='Lister to run', type=click.Choice(SUPPORTED_LISTERS)) @click.option('--priority', '-p', default='high', type=click.Choice(['high', 'medium', 'low']), help='Task priority for the listed repositories to ingest') @click.argument('options', nargs=-1) @click.pass_context -def run(ctx, db_url, lister, priority, options): +def run(ctx, lister, priority, options): from swh.scheduler.cli.utils import parse_options + config = deepcopy(ctx.obj['config']) + if options: - _, kwargs = parse_options(options) - else: - kwargs = {} + config.update(parse_options(options)[1]) - override_config = { - 'priority': priority, - 'policy': 'oneshot', - **kwargs, - } + config['priority'] = priority + config['policy'] = 'oneshot' - lister, _, _, _ = get_lister(lister, db_url, **override_config) - lister.run() + get_lister(lister, **config).run() if __name__ == '__main__': - cli() + lister() diff --git a/swh/lister/core/indexing_lister.py b/swh/lister/core/indexing_lister.py index 21d4a82..7d4a38a 100644 --- a/swh/lister/core/indexing_lister.py +++ b/swh/lister/core/indexing_lister.py @@ -1,249 +1,249 @@ # Copyright (C) 2015-2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import logging from itertools import count import dateutil from sqlalchemy import func from .lister_transports import ListerHttpTransport from .lister_base import ListerBase logger = logging.getLogger(__name__) class IndexingLister(ListerBase): """Lister* intermediate class for any service that follows the pattern: - The service must report at least one stable unique identifier, known herein as the UID value, for every listed repository. - If the service splits the list of repositories into sublists, it must report at least one stable and sorted index identifier for every listed repository, known herein as the indexable value, which can be used as part of the service endpoint query to request a sublist beginning from that index. This might be the UID if the UID is monotonic. - Client sends a request to list repositories starting from a given index. - Client receives structured (json/xml/etc) response with information about a sequential series of repositories starting from that index and, if necessary/available, some indication of the URL or index for fetching the next series of repository data. See :class:`swh.lister.core.lister_base.ListerBase` for more details. This class cannot be instantiated. To create a new Lister for a source code listing service that follows the model described above, you must subclass this class and provide the required overrides in addition to any unmet implementation/override requirements of this class's base. 
(see parent class and member docstrings for details) Required Overrides:: def get_next_target_from_response """ flush_packet_db = 20 """Number of iterations in-between write flushes of lister repositories to db (see fn:`run`). """ default_min_bound = '' """Default initialization value for the minimum boundary index to use when undefined (see fn:`run`). """ @abc.abstractmethod def get_next_target_from_response(self, response): """Find the next server endpoint identifier given the entire response. Implementation of this method depends on the server API spec and the shape of the network response object returned by the transport_request method. Args: response (transport response): response page from the server Returns: index of next page, possibly extracted from a next href url """ pass # You probably don't need to override anything below this line. def filter_before_inject(self, models_list): """Overrides ListerBase.filter_before_inject Bounds query results by this Lister's set max_index. """ models_list = [ m for m in models_list if self.is_within_bounds(m['indexable'], None, self.max_index) ] return models_list def db_query_range(self, start, end): """Look in the db for a range of repositories with indexable values in the range [start, end] Args: start (model indexable type): start of desired indexable range end (model indexable type): end of desired indexable range Returns: a list of sqlalchemy.ext.declarative.declarative_base objects with indexable values within the given range """ retlist = self.db_session.query(self.MODEL) if start is not None: retlist = retlist.filter(self.MODEL.indexable >= start) if end is not None: retlist = retlist.filter(self.MODEL.indexable <= end) return retlist def db_partition_indices(self, partition_size): """Describe an index-space compartmentalization of the db table in equal sized chunks. This is used to describe min&max bounds for parallelizing fetch tasks. Args: partition_size (int): desired size to make each partition Returns: a list of tuples (begin, end) of indexable value that declare approximately equal-sized ranges of existing repos """ n = max(self.db_num_entries(), 10) partition_size = min(partition_size, n) n_partitions = n // partition_size min_index = self.db_first_index() max_index = self.db_last_index() if not min_index or not max_index: # Nothing to list return [] if isinstance(min_index, str): def format_bound(bound): return bound.isoformat() min_index = dateutil.parser.parse(min_index) max_index = dateutil.parser.parse(max_index) else: def format_bound(bound): return bound partition_width = (max_index - min_index) / n_partitions partitions = [ [ format_bound(min_index + i * partition_width), format_bound(min_index + (i+1) * partition_width), ] for i in range(n_partitions) ] # Remove bounds for lowest and highest partition partitions[0][0] = None partitions[-1][1] = None return [tuple(partition) for partition in partitions] def db_first_index(self): """Look in the db for the smallest indexable value Returns: the smallest indexable value of all repos in the db """ t = self.db_session.query(func.min(self.MODEL.indexable)).first() if t: return t[0] def db_last_index(self): """Look in the db for the largest indexable value Returns: the largest indexable value of all repos in the db """ t = self.db_session.query(func.max(self.MODEL.indexable)).first() if t: return t[0] def disable_deleted_repo_tasks(self, start, end, keep_these): """Disable tasks for repos that no longer exist between start and end. 
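A standalone sketch of the arithmetic performed by `db_partition_indices` above, with plain integers standing in for the database-backed indexable values (all numbers illustrative):

```lang=python
n = 100                                   # stands in for db_num_entries()
partition_size = 25
n_partitions = n // partition_size        # 4
min_index, max_index = 0, 100             # stand in for db_first/last_index()

width = (max_index - min_index) / n_partitions
partitions = [[min_index + i * width, min_index + (i + 1) * width]
              for i in range(n_partitions)]
partitions[0][0] = None                   # lowest partition: unbounded below
partitions[-1][1] = None                  # highest partition: unbounded above

print([tuple(p) for p in partitions])
# [(None, 25.0), (25.0, 50.0), (50.0, 75.0), (75.0, None)]
```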
Args: start: beginning of range to disable end: end of range to disable keep_these (uid list): do not disable repos with uids in this list """ if end is None: end = self.db_last_index() if not self.is_within_bounds(end, None, self.max_index): end = self.max_index deleted_repos = self.winnow_models( self.db_query_range(start, end), self.MODEL.uid, keep_these ) tasks_to_disable = [repo.task_id for repo in deleted_repos if repo.task_id is not None] if tasks_to_disable: self.scheduler.disable_tasks(tasks_to_disable) for repo in deleted_repos: repo.task_id = None def run(self, min_bound=None, max_bound=None): """Main entry function. Sequentially fetches repository data from the service according to the basic outline in the class docstring, continually fetching sublists until either there is no next index reference given or the given next index is greater than the desired max_bound. Args: min_bound (indexable type): optional index to start from max_bound (indexable type): optional index to stop at Returns: nothing """ self.min_index = min_bound self.max_index = max_bound def ingest_indexes(): index = min_bound or self.default_min_bound for i in count(1): response, injected_repos = self.ingest_data(index) if not response and not injected_repos: logger.info('No response from api server, stopping') return next_index = self.get_next_target_from_response(response) # Determine if any repos were deleted, and disable their tasks. keep_these = list(injected_repos.keys()) self.disable_deleted_repo_tasks(index, next_index, keep_these) # termination condition if next_index is None or next_index == index: logger.info('stopping after index %s, no next link found', index) return index = next_index logger.debug('Index: %s', index) yield i for i in ingest_indexes(): if (i % self.flush_packet_db) == 0: logger.debug('Flushing updates at index %s', i) self.db_session.commit() self.db_session = self.mk_session() self.db_session.commit() self.db_session = self.mk_session() class IndexingHttpLister(ListerHttpTransport, IndexingLister): """Convenience class for ensuring right lookup and init order when combining IndexingLister and ListerHttpTransport.""" - def __init__(self, api_baseurl=None, override_config=None): - ListerHttpTransport.__init__(self, api_baseurl=api_baseurl) + def __init__(self, url=None, override_config=None): IndexingLister.__init__(self, override_config=override_config) + ListerHttpTransport.__init__(self, url=url) diff --git a/swh/lister/core/lister_base.py b/swh/lister/core/lister_base.py index fe23c5a..501b774 100644 --- a/swh/lister/core/lister_base.py +++ b/swh/lister/core/lister_base.py @@ -1,525 +1,517 @@ # Copyright (C) 2015-2018 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import datetime import gzip import json import logging import os import re import time from sqlalchemy import create_engine, func from sqlalchemy.orm import sessionmaker from swh.core import config from swh.scheduler import get_scheduler, utils -from swh.storage import get_storage from .abstractattribute import AbstractAttribute logger = logging.getLogger(__name__) def utcnow(): return datetime.datetime.now(tz=datetime.timezone.utc) class FetchError(RuntimeError): def __init__(self, response): self.response = response def __str__(self): return repr(self.response) class ListerBase(abc.ABC, config.SWHConfig): """Lister core base class. 
Generally a source code hosting service provides an API endpoint for listing the set of stored repositories. A Lister is the discovery service responsible for finding this list, all at once or sequentially by parts, and queueing local tasks to fetch and ingest the referenced repositories. The core method in this class is ingest_data. Any subclass should call this method one or more times to fetch and ingest data from API endpoints. See swh.lister.core.indexing_lister.IndexingLister for example usage. This class cannot be instantiated. Any instantiable Lister descending from ListerBase must provide at least the required overrides. (see member docstrings for details): Required Overrides: MODEL def transport_request def transport_response_to_string def transport_response_simplified def transport_quota_check Optional Overrides: def filter_before_inject def is_within_bounds """ MODEL = AbstractAttribute('Subclass type (not instance)' ' of swh.lister.core.models.ModelBase' ' customized for a specific service.') LISTER_NAME = AbstractAttribute("Lister's name") def transport_request(self, identifier): """Given a target endpoint identifier to query, try once to request it. Implementation of this method determines the network request protocol. Args: identifier (string): unique identifier for an endpoint query. e.g. If the service indexes lists of repositories by date and time of creation, this might be that as a formatted string. Or it might be an integer UID. Or it might be nothing. It depends on what the service needs. Returns: the entire request response Raises: Will catch internal transport-dependent connection exceptions and raise swh.lister.core.lister_base.FetchError instead. Other non-connection exceptions should propagate unchanged. """ pass def transport_response_to_string(self, response): """Convert the server response into a formatted string for logging. Implementation of this method depends on the shape of the network response object returned by the transport_request method. Args: response: the server response Returns: a pretty string of the response """ pass def transport_response_simplified(self, response): """Convert the server response into a list of dicts, one for each repo in the response, mapping columns in the lister's MODEL class to repo data. Implementation of this method depends on the server API spec and the shape of the network response object returned by the transport_request method. Args: response: response object from the server. Returns: list of repo MODEL dicts ( eg. [{'uid': r['id'], etc.} for r in response.json()] ) """ pass def transport_quota_check(self, response): """Check server response to see if we're hitting request rate limits. Implementation of this method depends on the server communication protocol and API spec and the shape of the network response object returned by the transport_request method. Args: response (session response): complete API query response Returns: 1) must retry request? True/False 2) seconds to delay if True """ pass def filter_before_inject(self, models_list): """Filter models_list entries prior to injection in the db. This is run directly after `transport_response_simplified`. Default implementation is to have no filtering. Args: models_list: list of dicts returned by transport_response_simplified. Returns: models_list with entries changed according to custom logic. """ return models_list def do_additional_checks(self, models_list): """Execute some additional checks on the model list (after the filtering). 
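Following the `eg.` hint in the `transport_response_simplified` docstring above, a minimal override for a JSON API might look like the sketch below; the field names are illustrative (compare the GitHub lister's `get_model_from_repo` later in this patch):

```lang=python
def transport_response_simplified(self, response):
    # one MODEL dict per repository in the JSON payload
    return [{
        'uid': r['id'],
        'indexable': r['id'],
        'name': r['name'],
        'full_name': r['full_name'],
        'html_url': r['html_url'],
        'origin_url': r['html_url'],
        'origin_type': 'git',
    } for r in response.json()]
```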
Default implementation is to run no check at all and to return the input as is. Args: models_list: list of dicts returned by transport_response_simplified. Returns: models_list with entries if checks ok, False otherwise """ return models_list def is_within_bounds(self, inner, lower=None, upper=None): """See if a sortable value is inside the range [lower,upper]. MAY BE OVERRIDDEN, for example if the server indexable* key is technically sortable but not automatically so. * - ( see: swh.lister.core.indexing_lister.IndexingLister ) Args: inner (sortable type): the value being checked lower (sortable type): optional lower bound upper (sortable type): optional upper bound Returns: whether inner is confined by the optional lower and upper bounds """ try: if lower is None and upper is None: return True elif lower is None: ret = inner <= upper elif upper is None: ret = inner >= lower else: ret = lower <= inner <= upper self.string_pattern_check(inner, lower, upper) except Exception as e: logger.error(str(e) + ': %s, %s, %s' % (('inner=%s%s' % (type(inner), inner)), ('lower=%s%s' % (type(lower), lower)), ('upper=%s%s' % (type(upper), upper))) ) raise return ret # You probably don't need to override anything below this line. DEFAULT_CONFIG = { - 'storage': ('dict', { - 'cls': 'remote', - 'args': { - 'url': 'http://localhost:5002/' - }, - }), 'scheduler': ('dict', { 'cls': 'remote', 'args': { 'url': 'http://localhost:5008/' }, }), 'lister': ('dict', { 'cls': 'local', 'args': { 'db': 'postgresql:///lister', }, }), } @property def CONFIG_BASE_FILENAME(self): # noqa: N802 return 'lister_%s' % self.LISTER_NAME @property def ADDITIONAL_CONFIG(self): # noqa: N802 return { 'credentials': ('dict', {}), 'cache_responses': ('bool', False), 'cache_dir': ('str', '~/.cache/swh/lister/%s' % self.LISTER_NAME), } INITIAL_BACKOFF = 10 MAX_RETRIES = 7 CONN_SLEEP = 10 def __init__(self, override_config=None): self.backoff = self.INITIAL_BACKOFF logger.debug('Loading config from %s' % self.CONFIG_BASE_FILENAME) self.config = self.parse_config_file( base_filename=self.CONFIG_BASE_FILENAME, additional_configs=[self.ADDITIONAL_CONFIG] ) self.config['cache_dir'] = os.path.expanduser(self.config['cache_dir']) if self.config['cache_responses']: config.prepare_folders(self.config, 'cache_dir') if override_config: self.config.update(override_config) logger.debug('%s CONFIG=%s' % (self, self.config)) - self.storage = get_storage(**self.config['storage']) self.scheduler = get_scheduler(**self.config['scheduler']) self.db_engine = create_engine(self.config['lister']['args']['db']) self.mk_session = sessionmaker(bind=self.db_engine) self.db_session = self.mk_session() def reset_backoff(self): """Reset exponential backoff timeout to initial level.""" self.backoff = self.INITIAL_BACKOFF def back_off(self): """Get next exponential backoff timeout.""" ret = self.backoff self.backoff *= 10 return ret def safely_issue_request(self, identifier): """Make network request with retries, rate quotas, and response logs. Protocol is handled by the implementation of the transport_request method. 
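The retry loop below leans on the exponential backoff implemented by `back_off()` above; a toy illustration of the delays it produces, trimmed to the two relevant members of the class:

```lang=python
class Backoff:
    INITIAL_BACKOFF = 10

    def __init__(self):
        self.backoff = self.INITIAL_BACKOFF

    def back_off(self):
        # return the current delay and multiply it by 10 for next time
        ret = self.backoff
        self.backoff *= 10
        return ret


b = Backoff()
print(b.back_off(), b.back_off(), b.back_off())  # 10 100 1000
```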
Args: identifier: resource identifier Returns: server response """ retries_left = self.MAX_RETRIES do_cache = self.config['cache_responses'] r = None while retries_left > 0: try: r = self.transport_request(identifier) except FetchError: # network-level connection error, try again logger.warning( 'connection error on %s: sleep for %d seconds' % (identifier, self.CONN_SLEEP)) time.sleep(self.CONN_SLEEP) retries_left -= 1 continue if do_cache: self.save_response(r) # detect throttling must_retry, delay = self.transport_quota_check(r) if must_retry: logger.warning( 'rate limited on %s: sleep for %f seconds' % (identifier, delay)) time.sleep(delay) else: # request ok break retries_left -= 1 if not retries_left: logger.warning( 'giving up on %s: max retries exceeded' % identifier) return r def db_query_equal(self, key, value): """Look in the db for a row with key == value Args: key: column key to look at value: value to look for in that column Returns: sqlalchemy.ext.declarative.declarative_base object with the given key == value """ if isinstance(key, str): key = self.MODEL.__dict__[key] return self.db_session.query(self.MODEL) \ .filter(key == value).first() def winnow_models(self, mlist, key, to_remove): """Given a list of models, remove any whose key column matches some member of a list of values. Args: mlist (list of model rows): the initial list of models key (column): the column to filter on to_remove (list): if anything in mlist has column equal to one of the values in to_remove, it will be removed from the result Returns: A list of model rows starting from mlist minus any matching rows """ if isinstance(key, str): key = self.MODEL.__dict__[key] if to_remove: return mlist.filter(~key.in_(to_remove)).all() else: return mlist.all() def db_num_entries(self): """Return the known number of entries in the lister db""" return self.db_session.query(func.count('*')).select_from(self.MODEL) \ .scalar() def db_inject_repo(self, model_dict): """Add/update a new repo to the db and mark it last_seen now. Args: model_dict: dictionary mapping model keys to values Returns: new or updated sqlalchemy.ext.declarative.declarative_base object associated with the injection """ sql_repo = self.db_query_equal('uid', model_dict['uid']) if not sql_repo: sql_repo = self.MODEL(**model_dict) self.db_session.add(sql_repo) else: for k in model_dict: setattr(sql_repo, k, model_dict[k]) sql_repo.last_seen = utcnow() return sql_repo def task_dict(self, origin_type, origin_url, **kwargs): """Return special dict format for the tasks list Args: origin_type (string) origin_url (string) Returns: the same information in a different form """ _type = 'load-%s' % origin_type _policy = kwargs.get('policy', 'recurring') priority = kwargs.get('priority') kw = {'priority': priority} if priority else {} return utils.create_task_dict(_type, _policy, origin_url, **kw) def string_pattern_check(self, a, b, c=None): """When comparing indexable types in is_within_bounds, complex strings may not be allowed to differ in basic structure. If they do, it could be a sign of not understanding the data well. For instance, an ISO 8601 time string cannot be compared against its urlencoded equivalent, but this is an easy mistake to accidentally make. This method acts as a friendly sanity check. 
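A self-contained demonstration of the structural comparison `string_pattern_check` performs: every alphanumeric character is generalized to the class `[a-zA-Z0-9]`, so two strings are only comparable when their punctuation skeletons line up (the date strings are illustrative):

```lang=python
import re

a = '2019-01-01T00:00:00'
pattern = re.sub('[a-zA-Z0-9]', '[a-zA-Z0-9]', re.escape(a))

print(bool(re.match(pattern, '2020-06-15T12:30:45')))      # True: same shape
print(bool(re.match(pattern, '2019-01-01T00%3A00%3A00')))  # False: urlencoded
```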
Args: a (string): inner component of the is_within_bounds method b (string): lower component of the is_within_bounds method c (string): upper component of the is_within_bounds method Returns: nothing Raises: TypeError if strings a, b, and c don't conform to the same basic pattern. """ if isinstance(a, str): a_pattern = re.sub('[a-zA-Z0-9]', '[a-zA-Z0-9]', re.escape(a)) if (isinstance(b, str) and (re.match(a_pattern, b) is None) or isinstance(c, str) and (re.match(a_pattern, c) is None)): logger.debug(a_pattern) raise TypeError('incomparable string patterns detected') def inject_repo_data_into_db(self, models_list): """Inject data into the db. Args: models_list: list of dicts mapping keys from the db model for each repo to be injected Returns: dict of uid:sql_repo pairs """ injected_repos = {} for m in models_list: injected_repos[m['uid']] = self.db_inject_repo(m) return injected_repos def schedule_missing_tasks(self, models_list, injected_repos): """Find any newly created db entries that have not been scheduled yet. Args: models_list ([Model]): List of dicts mapping keys in the db model for each repo injected_repos ([dict]): Dict of uid:sql_repo pairs that have just been created Returns: Nothing. Modifies injected_repos. """ tasks = {} def _task_key(m): return '%s-%s' % ( m['type'], json.dumps(m['arguments'], sort_keys=True) ) for m in models_list: ir = injected_repos[m['uid']] if not ir.task_id: # Patching the model instance to add the policy/priority task # scheduling if 'policy' in self.config: m['policy'] = self.config['policy'] if 'priority' in self.config: m['priority'] = self.config['priority'] task_dict = self.task_dict(**m) tasks[_task_key(task_dict)] = (ir, m, task_dict) new_tasks = self.scheduler.create_tasks( (task_dicts for (_, _, task_dicts) in tasks.values())) for task in new_tasks: ir, m, _ = tasks[_task_key(task)] ir.task_id = task['id'] def ingest_data(self, identifier, checks=False): """The core data fetch sequence. Request server endpoint. Simplify and filter response list of repositories. Inject repo information into local db. Queue loader tasks for linked repositories. Args: identifier: Resource identifier. checks (bool): Additional checks required """ # Request (partial?) list of repositories info response = self.safely_issue_request(identifier) if not response: return response, [] models_list = self.transport_response_simplified(response) models_list = self.filter_before_inject(models_list) if checks: models_list = self.do_additional_checks(models_list) if not models_list: return response, [] # inject into local db injected = self.inject_repo_data_into_db(models_list) # queue workers self.schedule_missing_tasks(models_list, injected) return response, injected def save_response(self, response): """Log the response from a server request to a cache dir. 
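The `_task_key` helper in `schedule_missing_tasks` above de-duplicates tasks by type plus canonicalized JSON arguments; a standalone sketch with an illustrative task dict:

```lang=python
import json


def _task_key(m):
    # type plus sorted-key JSON makes equivalent tasks compare equal
    return '%s-%s' % (m['type'], json.dumps(m['arguments'], sort_keys=True))


t = {'type': 'load-git',
     'arguments': {'args': ['https://example.org/repo.git'], 'kwargs': {}}}
print(_task_key(t))
# load-git-{"args": ["https://example.org/repo.git"], "kwargs": {}}
```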
Args: response: full server response Returns: nothing """ datepath = utcnow().isoformat() fname = os.path.join( self.config['cache_dir'], datepath + '.gz', ) with gzip.open(fname, 'w') as f: f.write(bytes( self.transport_response_to_string(response), 'UTF-8' )) diff --git a/swh/lister/core/lister_transports.py b/swh/lister/core/lister_transports.py index fac3b7e..55188fd 100644 --- a/swh/lister/core/lister_transports.py +++ b/swh/lister/core/lister_transports.py @@ -1,222 +1,229 @@ # Copyright (C) 2017-2018 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import random from datetime import datetime from email.utils import parsedate from pprint import pformat import logging import requests import xmltodict try: from swh.lister._version import __version__ except ImportError: __version__ = 'devel' from .abstractattribute import AbstractAttribute from .lister_base import FetchError logger = logging.getLogger(__name__) class ListerHttpTransport(abc.ABC): """Use the Requests library for making Lister endpoint requests. To be used in conjunction with ListerBase or a subclass of it. """ - + DEFAULT_URL = None PATH_TEMPLATE = AbstractAttribute('string containing a python string' ' format pattern that produces the API' ' endpoint path for listing stored' ' repositories when given an index.' ' eg. "/repositories?after=%s".' 'To be implemented in the API-specific' ' class inheriting this.') EXPECTED_STATUS_CODES = (200, 429, 403, 404) def request_headers(self): """Returns dictionary of any request headers needed by the server. MAY BE OVERRIDDEN if request headers are needed. """ return { 'User-Agent': 'Software Heritage lister (%s)' % self.lister_version } def request_instance_credentials(self): """Returns dictionary of any credentials configuration needed by the forge instance to list. - Returns: - dict of credentials per instance lister or {} if none. - - """ - all_creds = self.config.get('credentials') - if not all_creds: - return {} - lister_creds = all_creds.get(self.LISTER_NAME, {}) - creds = lister_creds.get(self.instance, {}) - return creds - - def request_uri(self, identifier): - """Get the full request URI given the transport_request identifier. - - MAY BE OVERRIDDEN if something more complex than the PATH_TEMPLATE is - required. - """ - path = self.PATH_TEMPLATE % identifier - return self.api_baseurl + path - - def request_params(self, identifier): - """Get the full parameters passed to requests given the - transport_request identifier. - - This uses credentials if any are provided. The 'credentials' - configuration is expected to be a dict of multiple levels. The first - level is the lister's name, the second is the lister's instance name. + The 'credentials' configuration is expected to be a dict of multiple + levels. The first level is the lister's name, the second is the + lister's instance name, whose value is expected to be a list of + credential structures (typically a username/password pair). For example: credentials: github: # github lister github: # has only one instance (so far) - username: some password: somekey - username: one password: onekey - ... gitlab: # gitlab lister riseup: # has many instances - username: someone password: ... - ... gitlab: - username: someone password: ... - ... + Returns: + list of credential dicts for the current lister. 
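A sketch of how `request_params` (shown next) consumes this credential list, rotating randomly between the configured username/password pairs (values taken from the docstring example above):

```lang=python
import random

creds = [
    {'username': 'some', 'password': 'somekey'},
    {'username': 'one', 'password': 'onekey'},
]

params = {'headers': {}}
auth = random.choice(creds) if creds else None
if auth:
    # requests-style basic-auth tuple
    params['auth'] = (auth['username'], auth['password'])
```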
+ + """ + all_creds = self.config.get('credentials') + if not all_creds: + return [] + lister_creds = all_creds.get(self.LISTER_NAME, {}) + creds = lister_creds.get(self.instance, []) + return creds + + def request_uri(self, identifier): + """Get the full request URI given the transport_request identifier. + + MAY BE OVERRIDDEN if something more complex than the PATH_TEMPLATE is + required. + """ + path = self.PATH_TEMPLATE % identifier + return self.url + path + + def request_params(self, identifier): + """Get the full parameters passed to requests given the + transport_request identifier. + + This uses credentials if any are provided (see + request_instance_credentials). MAY BE OVERRIDDEN if something more complex than the request headers is needed. """ params = {} params['headers'] = self.request_headers() or {} creds = self.request_instance_credentials() if not creds: return params auth = random.choice(creds) if creds else None if auth: params['auth'] = (auth['username'], auth['password']) return params def transport_quota_check(self, response): """Implements ListerBase.transport_quota_check with standard 429 code check for HTTP with Requests library. MAY BE OVERRIDDEN if the server notifies about rate limits in a non-standard way that doesn't use HTTP 429 and the Retry-After response header. ( https://tools.ietf.org/html/rfc6585#section-4 ) """ if response.status_code == 429: # HTTP too many requests retry_after = response.headers.get('Retry-After', self.back_off()) try: # might be seconds return True, float(retry_after) except Exception: # might be http-date at_date = datetime(*parsedate(retry_after)[:6]) from_now = (at_date - datetime.today()).total_seconds() + 5 return True, max(0, from_now) else: # response ok self.reset_backoff() return False, 0 - def __init__(self, api_baseurl=None): - if not api_baseurl: - raise NameError('HTTP Lister Transport requires api_baseurl.') - self.api_baseurl = api_baseurl # eg. 'https://api.github.com' + def __init__(self, url=None): + if not url: + url = self.config.get('url') + if not url: + url = self.DEFAULT_URL + if not url: + raise NameError('HTTP Lister Transport requires an url.') + self.url = url # eg. 'https://api.github.com' self.session = requests.Session() self.lister_version = __version__ def _transport_action(self, identifier, method='get'): """Permit to ask information to the api prior to actually executing query. """ path = self.request_uri(identifier) params = self.request_params(identifier) try: if method == 'head': response = self.session.head(path, **params) else: response = self.session.get(path, **params) except requests.exceptions.ConnectionError as e: logger.warning('Failed to fetch %s: %s', path, e) raise FetchError(e) else: if response.status_code not in self.EXPECTED_STATUS_CODES: raise FetchError(response) return response def transport_head(self, identifier): """Retrieve head information on api. """ return self._transport_action(identifier, method='head') def transport_request(self, identifier): """Implements ListerBase.transport_request for HTTP using Requests. Retrieve get information on api. """ return self._transport_action(identifier) def transport_response_to_string(self, response): """Implements ListerBase.transport_response_to_string for HTTP given Requests responses. """ s = pformat(response.request.path_url) s += '\n#\n' + pformat(response.request.headers) s += '\n#\n' + pformat(response.status_code) s += '\n#\n' + pformat(response.headers) s += '\n#\n' try: # json? 
s += pformat(response.json()) except Exception: # not json try: # xml? s += pformat(xmltodict.parse(response.text)) except Exception: # not xml s += pformat(response.text) return s class ListerOnePageApiTransport(ListerHttpTransport): """Leverage requests library to retrieve basic html page and parse result. To be used in conjunction with ListerBase or a subclass of it. """ PAGE = AbstractAttribute("The server api's unique page to retrieve and " "parse for information") PATH_TEMPLATE = None # we do not use it - def __init__(self, api_baseurl=None): + def __init__(self, url=None): self.session = requests.Session() self.lister_version = __version__ def request_uri(self, _): """Get the full request URI given the transport_request identifier. """ return self.PAGE diff --git a/swh/lister/core/models.py b/swh/lister/core/models.py index d0c61c5..ce996b1 100644 --- a/swh/lister/core/models.py +++ b/swh/lister/core/models.py @@ -1,48 +1,72 @@ # Copyright (C) 2015-2017 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc from datetime import datetime +import logging from sqlalchemy import Column, DateTime, Integer, String -from sqlalchemy.ext.declarative import declarative_base, DeclarativeMeta +from sqlalchemy.ext.declarative import DeclarativeMeta from .abstractattribute import AbstractAttribute -SQLBase = declarative_base() +from swh.storage.schemata.distribution import SQLBase + + +logger = logging.getLogger(__name__) class ABCSQLMeta(abc.ABCMeta, DeclarativeMeta): pass class ModelBase(SQLBase, metaclass=ABCSQLMeta): """a common repository""" __abstract__ = True __tablename__ = AbstractAttribute uid = AbstractAttribute('Column(, primary_key=True)') name = Column(String, index=True) full_name = Column(String, index=True) html_url = Column(String) origin_url = Column(String) origin_type = Column(String) last_seen = Column(DateTime, nullable=False) task_id = Column(Integer) def __init__(self, **kw): kw['last_seen'] = datetime.now() super().__init__(**kw) class IndexingModelBase(ModelBase, metaclass=ABCSQLMeta): __abstract__ = True __tablename__ = AbstractAttribute # The value used for sorting, segmenting, or api query paging, # because uids aren't always sequential. indexable = AbstractAttribute('Column(, index=True)') + + +def initialize(db_engine, drop_tables=False, **kwargs): + """Default database initialization function for a lister. + + Typically called from the lister's initialization hook. + + Args: + db_engine: the SQLAlchemy DB engine. + drop_tables (bool): if True, tables will be dropped before + (re)creating them. 
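Hypothetical usage of the new `initialize()` helper defined here, mirroring what the `db-init` command does (the database URL is illustrative):

```lang=python
from sqlalchemy import create_engine

from swh.lister.core.models import initialize

engine = create_engine('postgresql:///lister')  # illustrative DB URL
initialize(engine, drop_tables=True)  # drop, then recreate, all known tables
```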
+ """ + + if drop_tables: + logger.info('Dropping tables') + SQLBase.metadata.drop_all(db_engine, checkfirst=True) + + logger.info('Creating tables') + SQLBase.metadata.create_all(db_engine, checkfirst=True) diff --git a/swh/lister/core/page_by_page_lister.py b/swh/lister/core/page_by_page_lister.py index f05b3a5..3d6d9c7 100644 --- a/swh/lister/core/page_by_page_lister.py +++ b/swh/lister/core/page_by_page_lister.py @@ -1,160 +1,160 @@ # Copyright (C) 2015-2018 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import logging from .lister_transports import ListerHttpTransport from .lister_base import ListerBase class PageByPageLister(ListerBase): """Lister* intermediate class for any service that follows the simple pagination page pattern. - Client sends a request to list repositories starting from a given page identifier. - Client receives structured (json/xml/etc) response with information about a sequential series of repositories (per page) starting from a given index. And, if available, some indication of the next page index for fetching the remaining repository data. See :class:`swh.lister.core.lister_base.ListerBase` for more details. This class cannot be instantiated. To create a new Lister for a source code listing service that follows the model described above, you must subclass this class. Then provide the required overrides in addition to any unmet implementation/override requirements of this class's base (see parent class and member docstrings for details). Required Overrides:: def get_next_target_from_response """ @abc.abstractmethod def get_next_target_from_response(self, response): """Find the next server endpoint page given the entire response. Implementation of this method depends on the server API spec and the shape of the network response object returned by the transport_request method. For example, some api can use the headers links to provide the next page. Args: response (transport response): response page from the server Returns: index of next page, possibly extracted from a next href url """ pass @abc.abstractmethod def get_pages_information(self): """Find the total number of pages. Implementation of this method depends on the server API spec and the shape of the network response object returned by the transport_request method. For example, some api can use dedicated headers: - x-total-pages to provide the total number of pages - x-total to provide the total number of repositories - x-per-page to provide the number of elements per page Returns: tuple (total number of repositories, total number of pages, per_page) """ pass # You probably don't need to override anything below this line. def do_additional_checks(self, models_list): """Potentially check for existence of repositories in models_list. This will be called only if check_existence is flipped on in the run method below. """ for m in models_list: sql_repo = self.db_query_equal('uid', m['uid']) if sql_repo: return False return models_list def run(self, min_bound=None, max_bound=None, check_existence=False): """Main entry function. Sequentially fetches repository data from the service according to the basic outline in the class docstring. Continually fetching sublists until either there is no next page reference given or the given next page is greater than the desired max_page. 
Args: min_bound: optional page to start from max_bound: optional page to stop at check_existence (bool): optional existence check (for incremental lister whose sort order is inverted) Returns: nothing """ page = min_bound or 0 loop_count = 0 self.min_page = min_bound self.max_page = max_bound while self.is_within_bounds(page, self.min_page, self.max_page): logging.info('listing repos starting at %s' % page) response, injected_repos = self.ingest_data(page, checks=check_existence) if not response and not injected_repos: logging.info('No response from api server, stopping') break elif not injected_repos: logging.info('Repositories already seen, stopping') break next_page = self.get_next_target_from_response(response) # termination condition if (next_page is None) or (next_page == page): logging.info('stopping after page %s, no next link found' % page) break else: page = next_page loop_count += 1 if loop_count == 20: logging.info('flushing updates') loop_count = 0 self.db_session.commit() self.db_session = self.mk_session() self.db_session.commit() self.db_session = self.mk_session() class PageByPageHttpLister(ListerHttpTransport, PageByPageLister): """Convenience class for ensuring right lookup and init order when combining PageByPageLister and ListerHttpTransport. """ - def __init__(self, api_baseurl=None, override_config=None): - ListerHttpTransport.__init__(self, api_baseurl=api_baseurl) + def __init__(self, url=None, override_config=None): PageByPageLister.__init__(self, override_config=override_config) + ListerHttpTransport.__init__(self, url=url) diff --git a/swh/lister/core/tests/conftest.py b/swh/lister/core/tests/conftest.py index a1f9346..32d03d4 100644 --- a/swh/lister/core/tests/conftest.py +++ b/swh/lister/core/tests/conftest.py @@ -1,19 +1 @@ -import pytest from swh.scheduler.tests.conftest import * # noqa - - -@pytest.fixture(scope='session') -def celery_includes(): - return [ - 'swh.lister.bitbucket.tasks', - 'swh.lister.cgit.tasks', - 'swh.lister.cran.tasks', - 'swh.lister.debian.tasks', - 'swh.lister.github.tasks', - 'swh.lister.gitlab.tasks', - 'swh.lister.gnu.tasks', - 'swh.lister.npm.tasks', - 'swh.lister.packagist.tasks', - 'swh.lister.phabricator.tasks', - 'swh.lister.pypi.tasks', - ] diff --git a/swh/lister/core/tests/test_lister.py b/swh/lister/core/tests/test_lister.py index b7ae9e5..dec68c6 100644 --- a/swh/lister/core/tests/test_lister.py +++ b/swh/lister/core/tests/test_lister.py @@ -1,340 +1,340 @@ # Copyright (C) 2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import time from unittest import TestCase from unittest.mock import Mock, patch import requests_mock from sqlalchemy import create_engine from swh.lister.core.abstractattribute import AbstractAttribute from swh.lister.tests.test_utils import init_db def noop(*args, **kwargs): pass class HttpListerTesterBase(abc.ABC): """Testing base class for listers. This contains methods for both :class:`HttpSimpleListerTester` and :class:`HttpListerTester`. See :class:`swh.lister.gitlab.tests.test_lister` for an example of how to customize for a specific listing service. 
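A hypothetical concrete tester built on the harness below, in the spirit of the GitHub test module the docstring points to; every attribute value here is illustrative, not taken from the real test suite:

```lang=python
import re

from swh.lister.core.tests.test_lister import HttpListerTester
from swh.lister.github.lister import GitHubLister


class GitHubListerTester(HttpListerTester):
    Lister = GitHubLister
    lister_subdir = 'github'
    good_api_response_file = 'api_response.json'       # illustrative fixture
    bad_api_response_file = 'api_empty_response.json'  # illustrative fixture
    first_index = 26                                   # illustrative indices
    last_index = 368
    entries_per_page = 100
    test_re = re.compile(r'/repositories\?since=(\d+)')
    convert_type = int
```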
""" Lister = AbstractAttribute('The lister class to test') lister_subdir = AbstractAttribute('bitbucket, github, etc.') good_api_response_file = AbstractAttribute('Example good response body') LISTER_NAME = 'fake-lister' # May need to override this if the headers are used for something def response_headers(self, request): return {} # May need to override this if the server uses non-standard rate limiting # method. # Please keep the requested retry delay reasonably low. def mock_rate_quota(self, n, request, context): self.rate_limit += 1 context.status_code = 429 context.headers['Retry-After'] = '1' return '{"error":"dummy"}' def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.rate_limit = 1 self.response = None self.fl = None self.helper = None if self.__class__ != HttpListerTesterBase: self.run = TestCase.run.__get__(self, self.__class__) else: self.run = noop def mock_limit_n_response(self, n, request, context): self.fl.reset_backoff() if self.rate_limit <= n: return self.mock_rate_quota(n, request, context) else: return self.mock_response(request, context) def mock_limit_twice_response(self, request, context): return self.mock_limit_n_response(2, request, context) def get_api_response(self, identifier): fl = self.get_fl() if self.response is None: self.response = fl.safely_issue_request(identifier) return self.response def get_fl(self, override_config=None): """Retrieve an instance of fake lister (fl). """ if override_config or self.fl is None: - self.fl = self.Lister(api_baseurl='https://fakeurl', + self.fl = self.Lister(url='https://fakeurl', override_config=override_config) self.fl.INITIAL_BACKOFF = 1 self.fl.reset_backoff() return self.fl def disable_scheduler(self, fl): fl.schedule_missing_tasks = Mock(return_value=None) def disable_db(self, fl): fl.winnow_models = Mock(return_value=[]) fl.db_inject_repo = Mock(return_value=fl.MODEL()) fl.disable_deleted_repo_tasks = Mock(return_value=None) def init_db(self, db, model): engine = create_engine(db.url()) model.metadata.create_all(engine) @requests_mock.Mocker() def test_is_within_bounds(self, http_mocker): fl = self.get_fl() self.assertFalse(fl.is_within_bounds(1, 2, 3)) self.assertTrue(fl.is_within_bounds(2, 1, 3)) self.assertTrue(fl.is_within_bounds(1, 1, 1)) self.assertTrue(fl.is_within_bounds(1, None, None)) self.assertTrue(fl.is_within_bounds(1, None, 2)) self.assertTrue(fl.is_within_bounds(1, 0, None)) self.assertTrue(fl.is_within_bounds("b", "a", "c")) self.assertFalse(fl.is_within_bounds("a", "b", "c")) self.assertTrue(fl.is_within_bounds("a", None, "c")) self.assertTrue(fl.is_within_bounds("a", None, None)) self.assertTrue(fl.is_within_bounds("b", "a", None)) self.assertFalse(fl.is_within_bounds("a", "b", None)) self.assertTrue(fl.is_within_bounds("aa:02", "aa:01", "aa:03")) self.assertFalse(fl.is_within_bounds("aa:12", None, "aa:03")) with self.assertRaises(TypeError): fl.is_within_bounds(1.0, "b", None) with self.assertRaises(TypeError): fl.is_within_bounds("A:B", "A::B", None) class HttpListerTester(HttpListerTesterBase, abc.ABC): """Base testing class for subclass of :class:`swh.lister.core.indexing_lister.IndexingHttpLister` See :class:`swh.lister.github.tests.test_gh_lister` for an example of how to customize for a specific listing service. 
""" last_index = AbstractAttribute('Last index in good_api_response') first_index = AbstractAttribute('First index in good_api_response') bad_api_response_file = AbstractAttribute('Example bad response body') entries_per_page = AbstractAttribute('Number of results in good response') test_re = AbstractAttribute('Compiled regex matching the server url. Must' ' capture the index value.') convert_type = str """static method used to convert the "request_index" to its right type (for indexing listers for example, this is in accordance with the model's "indexable" column). """ def mock_response(self, request, context): self.fl.reset_backoff() self.rate_limit = 1 context.status_code = 200 custom_headers = self.response_headers(request) context.headers.update(custom_headers) req_index = self.request_index(request) if req_index == self.first_index: response_file = self.good_api_response_file else: response_file = self.bad_api_response_file with open('swh/lister/%s/tests/%s' % (self.lister_subdir, response_file), 'r', encoding='utf-8') as r: return r.read() def request_index(self, request): m = self.test_re.search(request.path_url) if m and (len(m.groups()) > 0): return self.convert_type(m.group(1)) @requests_mock.Mocker() def test_fetch_multiple_pages_yesdb(self, http_mocker): http_mocker.get(self.test_re, text=self.mock_response) db = init_db() fl = self.get_fl(override_config={ 'lister': { 'cls': 'local', 'args': {'db': db.url()} } }) self.init_db(db, fl.MODEL) self.disable_scheduler(fl) fl.run(min_bound=self.first_index) self.assertEqual(fl.db_last_index(), self.last_index) partitions = fl.db_partition_indices(5) self.assertGreater(len(partitions), 0) for k in partitions: self.assertLessEqual(len(k), 5) self.assertGreater(len(k), 0) @requests_mock.Mocker() def test_fetch_none_nodb(self, http_mocker): http_mocker.get(self.test_re, text=self.mock_response) fl = self.get_fl() self.disable_scheduler(fl) self.disable_db(fl) fl.run(min_bound=1, max_bound=1) # stores no results # FIXME: Determine what this method tries to test and add checks to # actually test @requests_mock.Mocker() def test_fetch_one_nodb(self, http_mocker): http_mocker.get(self.test_re, text=self.mock_response) fl = self.get_fl() self.disable_scheduler(fl) self.disable_db(fl) fl.run(min_bound=self.first_index, max_bound=self.first_index) # FIXME: Determine what this method tries to test and add checks to # actually test @requests_mock.Mocker() def test_fetch_multiple_pages_nodb(self, http_mocker): http_mocker.get(self.test_re, text=self.mock_response) fl = self.get_fl() self.disable_scheduler(fl) self.disable_db(fl) fl.run(min_bound=self.first_index) # FIXME: Determine what this method tries to test and add checks to # actually test @requests_mock.Mocker() def test_repos_list(self, http_mocker): """Test the number of repos listed by the lister """ http_mocker.get(self.test_re, text=self.mock_response) li = self.get_fl().transport_response_simplified( self.get_api_response(self.first_index) ) self.assertIsInstance(li, list) self.assertEqual(len(li), self.entries_per_page) @requests_mock.Mocker() def test_model_map(self, http_mocker): """Check if all the keys of model are present in the model created by the `transport_response_simplified` """ http_mocker.get(self.test_re, text=self.mock_response) fl = self.get_fl() li = fl.transport_response_simplified( self.get_api_response(self.first_index)) di = li[0] self.assertIsInstance(di, dict) pubs = [k for k in vars(fl.MODEL).keys() if not k.startswith('_')] for k in pubs: if k not in 
['last_seen', 'task_id', 'id']: self.assertIn(k, di) @requests_mock.Mocker() def test_api_request(self, http_mocker): """Test API request for rate limit handling """ http_mocker.get(self.test_re, text=self.mock_limit_twice_response) with patch.object(time, 'sleep', wraps=time.sleep) as sleepmock: self.get_api_response(self.first_index) self.assertEqual(sleepmock.call_count, 2) class HttpSimpleListerTester(HttpListerTesterBase, abc.ABC): """Base testing class for subclass of :class:`swh.lister.core.simple_lister.SimpleLister` See :class:`swh.lister.pypi.tests.test_lister` for an example of how to customize for a specific listing service. """ entries = AbstractAttribute('Number of results in good response') PAGE = AbstractAttribute("The server api's unique page to retrieve and " "parse for information") def get_fl(self, override_config=None): """Retrieve an instance of fake lister (fl). """ if override_config or self.fl is None: self.fl = self.Lister( override_config=override_config) self.fl.INITIAL_BACKOFF = 1 self.fl.reset_backoff() return self.fl def mock_response(self, request, context): self.fl.reset_backoff() self.rate_limit = 1 context.status_code = 200 custom_headers = self.response_headers(request) context.headers.update(custom_headers) response_file = self.good_api_response_file with open('swh/lister/%s/tests/%s' % (self.lister_subdir, response_file), 'r', encoding='utf-8') as r: return r.read() @requests_mock.Mocker() def test_api_request(self, http_mocker): """Test API request for rate limit handling """ http_mocker.get(self.PAGE, text=self.mock_limit_twice_response) with patch.object(time, 'sleep', wraps=time.sleep) as sleepmock: self.get_api_response(0) self.assertEqual(sleepmock.call_count, 2) @requests_mock.Mocker() def test_model_map(self, http_mocker): """Check if all the keys of model are present in the model created by the `transport_response_simplified` """ http_mocker.get(self.PAGE, text=self.mock_response) fl = self.get_fl() li = fl.list_packages(self.get_api_response(0)) li = fl.transport_response_simplified(li) di = li[0] self.assertIsInstance(di, dict) pubs = [k for k in vars(fl.MODEL).keys() if not k.startswith('_')] for k in pubs: if k not in ['last_seen', 'task_id', 'id']: self.assertIn(k, di) @requests_mock.Mocker() def test_repos_list(self, http_mocker): """Test the number of packages listed by the lister """ http_mocker.get(self.PAGE, text=self.mock_response) li = self.get_fl().list_packages( self.get_api_response(0) ) self.assertIsInstance(li, list) self.assertEqual(len(li), self.entries) diff --git a/swh/lister/cran/__init__.py b/swh/lister/cran/__init__.py index e69de29..6abfa5b 100644 --- a/swh/lister/cran/__init__.py +++ b/swh/lister/cran/__init__.py @@ -0,0 +1,13 @@ +# Copyright (C) 2019 the Software Heritage developers +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + + +def register(): + from .models import CRANModel + from .lister import CRANLister + + return {'models': [CRANModel], + 'lister': CRANLister, + 'task_modules': ['%s.tasks' % __name__], + } diff --git a/swh/lister/cran/tasks.py b/swh/lister/cran/tasks.py index f7098a1..3b449de 100644 --- a/swh/lister/cran/tasks.py +++ b/swh/lister/cran/tasks.py @@ -1,17 +1,18 @@ # Copyright (C) 2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.scheduler.celery_backend.config import app from swh.lister.cran.lister 
import CRANLister @app.task(name=__name__ + '.CRANListerTask') -def cran_lister(**lister_args): +def list_cran(**lister_args): + '''Lister task for the CRAN registry''' CRANLister(**lister_args).run() @app.task(name=__name__ + '.ping') -def ping(): +def _ping(): return 'OK' diff --git a/swh/lister/debian/__init__.py b/swh/lister/debian/__init__.py index e69de29..b578fe7 100644 --- a/swh/lister/debian/__init__.py +++ b/swh/lister/debian/__init__.py @@ -0,0 +1,40 @@ +# Copyright (C) 2019 the Software Heritage developers +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + + +def debian_init(db_engine, override_conf=None): + from swh.storage.schemata.distribution import ( + Distribution, Area) + from .lister import DebianLister + + lister = DebianLister(override_config=override_conf) + + if not lister.db_session\ + .query(Distribution)\ + .filter(Distribution.name == 'Debian')\ + .one_or_none(): + + d = Distribution( + name='Debian', + type='deb', + mirror_uri='http://deb.debian.org/debian/') + lister.db_session.add(d) + + areas = [] + for distribution_name in ['stretch', 'buster']: + for area_name in ['main', 'contrib', 'non-free']: + areas.append(Area( + name='%s/%s' % (distribution_name, area_name), + distribution=d, + )) + lister.db_session.add_all(areas) + lister.db_session.commit() + + +def register(): + from .lister import DebianLister + return {'models': [DebianLister.MODEL], + 'lister': DebianLister, + 'task_modules': ['%s.tasks' % __name__], + 'init': debian_init} diff --git a/swh/lister/debian/lister.py b/swh/lister/debian/lister.py index 44b766e..8837b17 100644 --- a/swh/lister/debian/lister.py +++ b/swh/lister/debian/lister.py @@ -1,237 +1,237 @@ # Copyright (C) 2017 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import bz2 from collections import defaultdict import datetime import gzip import lzma import logging from debian.deb822 import Sources from sqlalchemy.orm import joinedload, load_only from sqlalchemy.schema import CreateTable, DropTable from swh.storage.schemata.distribution import ( AreaSnapshot, Distribution, DistributionSnapshot, Package, TempPackage, ) from swh.lister.core.lister_base import ListerBase, FetchError from swh.lister.core.lister_transports import ListerHttpTransport decompressors = { 'gz': lambda f: gzip.GzipFile(fileobj=f), 'bz2': bz2.BZ2File, 'xz': lzma.LZMAFile, } class DebianLister(ListerHttpTransport, ListerBase): MODEL = Package PATH_TEMPLATE = None LISTER_NAME = 'debian' instance = 'debian' def __init__(self, override_config=None): - ListerHttpTransport.__init__(self, api_baseurl="bogus") + ListerHttpTransport.__init__(self, url="notused") ListerBase.__init__(self, override_config=override_config) def transport_request(self, identifier): """Subvert ListerHttpTransport.transport_request, to try several index URIs in turn. The Debian repository format supports several compression algorithms across the ages, so we try several URIs. Once we have found a working URI, we break and set `self.decompressor` to the one that matched. Returns: a requests Response object. Raises: FetchError: when all the URIs failed to be retrieved. 
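The `decompressors` table above picks the right wrapper for each index compression scheme the Debian repository format has used; a small usage sketch against a local file (the file name is hypothetical):

```lang=python
import bz2
import gzip
import lzma

decompressors = {
    'gz': lambda f: gzip.GzipFile(fileobj=f),
    'bz2': bz2.BZ2File,
    'xz': lzma.LZMAFile,
}

with open('Sources.xz', 'rb') as f:    # hypothetical index file
    data = decompressors['xz'](f)      # transparent decompression
    first_line = data.readline()
```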
""" response = None compression = None for uri, compression in self.area.index_uris(): response = super().transport_request(uri) if response.status_code == 200: break else: raise FetchError( "Could not retrieve index for %s" % self.area ) self.decompressor = decompressors.get(compression) return response def request_uri(self, identifier): # In the overridden transport_request, we pass # ListerBase.transport_request() the full URI as identifier, so we # need to return it here. return identifier def request_params(self, identifier): # Enable streaming to allow wrapping the response in the decompressor # in transport_response_simplified. params = super().request_params(identifier) params['stream'] = True return params def transport_response_simplified(self, response): """Decompress and parse the package index fetched in `transport_request`. For each package, we "pivot" the file list entries (Files, Checksums-Sha1, Checksums-Sha256), to return a files dict mapping filenames to their checksums. """ if self.decompressor: data = self.decompressor(response.raw) else: data = response.raw for src_pkg in Sources.iter_paragraphs(data.readlines()): files = defaultdict(dict) for field in src_pkg._multivalued_fields: if field.startswith('checksums-'): sum_name = field[len('checksums-'):] else: sum_name = 'md5sum' if field in src_pkg: for entry in src_pkg[field]: name = entry['name'] files[name]['name'] = entry['name'] files[name]['size'] = int(entry['size'], 10) files[name][sum_name] = entry[sum_name] yield { 'name': src_pkg['Package'], 'version': src_pkg['Version'], 'directory': src_pkg['Directory'], 'files': files, } def inject_repo_data_into_db(self, models_list): """Generate the Package entries that didn't previously exist. Contrary to ListerBase, we don't actually insert the data in database. `schedule_missing_tasks` does it once we have the origin and task identifiers. 
""" by_name_version = {} temp_packages = [] area_id = self.area.id for model in models_list: name = model['name'] version = model['version'] temp_packages.append({ 'area_id': area_id, 'name': name, 'version': version, }) by_name_version[name, version] = model # Add all the listed packages to a temporary table self.db_session.execute(CreateTable(TempPackage.__table__)) self.db_session.bulk_insert_mappings(TempPackage, temp_packages) def exists_tmp_pkg(db_session, model): return ( db_session.query(model) .filter(Package.area_id == TempPackage.area_id) .filter(Package.name == TempPackage.name) .filter(Package.version == TempPackage.version) .exists() ) # Filter out the packages that already exist in the main Package table new_packages = self.db_session\ .query(TempPackage)\ .options(load_only('name', 'version'))\ .filter(~exists_tmp_pkg(self.db_session, Package))\ .all() self.old_area_packages = self.db_session.query(Package).filter( exists_tmp_pkg(self.db_session, TempPackage) ).all() self.db_session.execute(DropTable(TempPackage.__table__)) added_packages = [] for package in new_packages: model = by_name_version[package.name, package.version] added_packages.append(Package(area=self.area, **model)) self.db_session.add_all(added_packages) return added_packages def schedule_missing_tasks(self, models_list, added_packages): """We create tasks at the end of the full snapshot processing""" return def create_tasks_for_snapshot(self, snapshot): tasks = [ snapshot.task_for_package(name, versions) for name, versions in snapshot.get_packages().items() ] return self.scheduler.create_tasks(tasks) def run(self, distribution, date=None): """Run the lister for a given (distribution, area) tuple. Args: distribution (str): name of the distribution (e.g. "Debian") date (datetime.datetime): date the snapshot is taken (defaults to now) """ distribution = self.db_session\ .query(Distribution)\ .options(joinedload(Distribution.areas))\ .filter(Distribution.name == distribution)\ .one_or_none() if not distribution: raise ValueError("Distribution %s is not registered" % distribution) if not distribution.type == 'deb': raise ValueError("Distribution %s is not a Debian derivative" % distribution) date = date or datetime.datetime.now(tz=datetime.timezone.utc) logging.debug('Creating snapshot for distribution %s on date %s' % (distribution, date)) snapshot = DistributionSnapshot(date=date, distribution=distribution) self.db_session.add(snapshot) for area in distribution.areas: if not area.active: continue self.area = area logging.debug('Processing area %s' % area) _, new_area_packages = self.ingest_data(None) area_snapshot = AreaSnapshot(snapshot=snapshot, area=area) self.db_session.add(area_snapshot) area_snapshot.packages.extend(new_area_packages) area_snapshot.packages.extend(self.old_area_packages) self.create_tasks_for_snapshot(snapshot) self.db_session.commit() return True diff --git a/swh/lister/debian/tasks.py b/swh/lister/debian/tasks.py index 9f5af90..356b192 100644 --- a/swh/lister/debian/tasks.py +++ b/swh/lister/debian/tasks.py @@ -1,17 +1,18 @@ # Copyright (C) 2017-2018 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.scheduler.celery_backend.config import app from .lister import DebianLister @app.task(name=__name__ + '.DebianListerTask') -def debian_lister(distribution, **lister_args): +def list_debian_distribution(distribution, **lister_args): + '''List a Debian distribution''' 
DebianLister(**lister_args).run(distribution) @app.task(name=__name__ + '.ping') -def ping(): +def _ping(): return 'OK' diff --git a/swh/lister/github/__init__.py b/swh/lister/github/__init__.py index e69de29..13f4688 100644 --- a/swh/lister/github/__init__.py +++ b/swh/lister/github/__init__.py @@ -0,0 +1,13 @@ +# Copyright (C) 2019 the Software Heritage developers +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + + +def register(): + from .models import GitHubModel + from .lister import GitHubLister + + return {'models': [GitHubModel], + 'lister': GitHubLister, + 'task_modules': ['%s.tasks' % __name__], + } diff --git a/swh/lister/github/lister.py b/swh/lister/github/lister.py index 7d05579..173eb9d 100644 --- a/swh/lister/github/lister.py +++ b/swh/lister/github/lister.py @@ -1,49 +1,50 @@ # Copyright (C) 2017-2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import re import time from swh.lister.core.indexing_lister import IndexingHttpLister from swh.lister.github.models import GitHubModel class GitHubLister(IndexingHttpLister): PATH_TEMPLATE = '/repositories?since=%d' MODEL = GitHubModel + DEFAULT_URL = 'https://api.github.com' API_URL_INDEX_RE = re.compile(r'^.*/repositories\?since=(\d+)') LISTER_NAME = 'github' instance = 'github' # There is only 1 instance of such lister def get_model_from_repo(self, repo): return { 'uid': repo['id'], 'indexable': repo['id'], 'name': repo['name'], 'full_name': repo['full_name'], 'html_url': repo['html_url'], 'origin_url': repo['html_url'], 'origin_type': 'git', 'fork': repo['fork'], } def transport_quota_check(self, response): reqs_remaining = int(response.headers['X-RateLimit-Remaining']) if response.status_code == 403 and reqs_remaining == 0: reset_at = int(response.headers['X-RateLimit-Reset']) delay = min(reset_at - time.time(), 3600) return True, delay return False, 0 def get_next_target_from_response(self, response): if 'next' in response.links: next_url = response.links['next']['url'] return int(self.API_URL_INDEX_RE.match(next_url).group(1)) def transport_response_simplified(self, response): repos = response.json() return [self.get_model_from_repo(repo) for repo in repos] def request_headers(self): return {'Accept': 'application/vnd.github.v3+json'} diff --git a/swh/lister/github/tasks.py b/swh/lister/github/tasks.py index bc3f8c2..555dc0a 100644 --- a/swh/lister/github/tasks.py +++ b/swh/lister/github/tasks.py @@ -1,56 +1,53 @@ # Copyright (C) 2017-2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import random from celery import group from swh.scheduler.celery_backend.config import app from swh.lister.github.lister import GitHubLister GROUP_SPLIT = 10000 -def new_lister(api_baseurl='https://api.github.com', **kw): - return GitHubLister(api_baseurl=api_baseurl, **kw) - - @app.task(name=__name__ + '.IncrementalGitHubLister') -def incremental_github_lister(**lister_args): - lister = new_lister(**lister_args) +def list_github_incremental(**lister_args): + 'Incremental update of GitHub' + lister = GitHubLister(**lister_args) lister.run(min_bound=lister.db_last_index(), max_bound=None) @app.task(name=__name__ + '.RangeGitHubLister') -def range_github_lister(start, end, **lister_args): - lister = new_lister(**lister_args) +def _range_github_lister(start, end, 
**lister_args): + lister = GitHubLister(**lister_args) lister.run(min_bound=start, max_bound=end) @app.task(name=__name__ + '.FullGitHubRelister', bind=True) -def full_github_relister(self, split=None, **lister_args): - """Relist from the beginning of what's already been listed. +def list_github_full(self, split=None, **lister_args): + """Full update of GitHub It's not to be called for an initial listing. """ - lister = new_lister(**lister_args) + lister = GitHubLister(**lister_args) ranges = lister.db_partition_indices(split or GROUP_SPLIT) if not ranges: self.log.info('Nothing to list') return random.shuffle(ranges) - promise = group(range_github_lister.s(minv, maxv, **lister_args) + promise = group(_range_github_lister.s(minv, maxv, **lister_args) for minv, maxv in ranges)() self.log.debug('%s OK (spawned %s subtasks)' % (self.name, len(ranges))) try: promise.save() # so that we can restore the GroupResult in tests except (NotImplementedError, AttributeError): self.log.info('Unable to call save_group with current result backend.') return promise.id @app.task(name=__name__ + '.ping') -def ping(): +def _ping(): return 'OK' diff --git a/swh/lister/github/tests/test_tasks.py b/swh/lister/github/tests/test_tasks.py index 9bd30c1..c652404 100644 --- a/swh/lister/github/tests/test_tasks.py +++ b/swh/lister/github/tests/test_tasks.py @@ -1,90 +1,90 @@ from time import sleep from celery.result import GroupResult from unittest.mock import patch def test_ping(swh_app, celery_session_worker): res = swh_app.send_task( 'swh.lister.github.tasks.ping') assert res res.wait() assert res.successful() assert res.result == 'OK' @patch('swh.lister.github.tasks.GitHubLister') def test_incremental(lister, swh_app, celery_session_worker): # setup the mocked GitHubLister lister.return_value = lister lister.db_last_index.return_value = 42 lister.run.return_value = None res = swh_app.send_task( 'swh.lister.github.tasks.IncrementalGitHubLister') assert res res.wait() assert res.successful() - lister.assert_called_once_with(api_baseurl='https://api.github.com') + lister.assert_called_once_with() lister.db_last_index.assert_called_once_with() lister.run.assert_called_once_with(min_bound=42, max_bound=None) @patch('swh.lister.github.tasks.GitHubLister') def test_range(lister, swh_app, celery_session_worker): # setup the mocked GitHubLister lister.return_value = lister lister.run.return_value = None res = swh_app.send_task( 'swh.lister.github.tasks.RangeGitHubLister', kwargs=dict(start=12, end=42)) assert res res.wait() assert res.successful() - lister.assert_called_once_with(api_baseurl='https://api.github.com') + lister.assert_called_once_with() lister.db_last_index.assert_not_called() lister.run.assert_called_once_with(min_bound=12, max_bound=42) @patch('swh.lister.github.tasks.GitHubLister') def test_relister(lister, swh_app, celery_session_worker): # setup the mocked GitHubLister lister.return_value = lister lister.run.return_value = None lister.db_partition_indices.return_value = [ (i, i+9) for i in range(0, 50, 10)] res = swh_app.send_task( 'swh.lister.github.tasks.FullGitHubRelister') assert res res.wait() assert res.successful() # retrieve the GroupResult for this task and wait for all the subtasks # to complete promise_id = res.result assert promise_id promise = GroupResult.restore(promise_id, app=swh_app) for i in range(5): if promise.ready(): break sleep(1) - lister.assert_called_with(api_baseurl='https://api.github.com') + lister.assert_called_with() # one by the FullGitHubRelister task # + 5 for the 
RangeGitHubLister subtasks assert lister.call_count == 6 lister.db_last_index.assert_not_called() lister.db_partition_indices.assert_called_once_with(10000) # lister.run should have been called once per partition interval for i in range(5): # XXX inconsistent behavior: max_bound is INCLUDED here assert (dict(min_bound=10*i, max_bound=10*i + 9),) \ in lister.run.call_args_list diff --git a/swh/lister/gitlab/__init__.py b/swh/lister/gitlab/__init__.py index e69de29..ca2b89b 100644 --- a/swh/lister/gitlab/__init__.py +++ b/swh/lister/gitlab/__init__.py @@ -0,0 +1,13 @@ +# Copyright (C) 2019 the Software Heritage developers +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + + +def register(): + from .models import GitLabModel + from .lister import GitLabLister + + return {'models': [GitLabModel], + 'lister': GitLabLister, + 'task_modules': ['%s.tasks' % __name__], + } diff --git a/swh/lister/gitlab/lister.py b/swh/lister/gitlab/lister.py index f8e7ead..60b5320 100644 --- a/swh/lister/gitlab/lister.py +++ b/swh/lister/gitlab/lister.py @@ -1,83 +1,81 @@ # Copyright (C) 2018-2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import time from urllib3.util import parse_url from ..core.page_by_page_lister import PageByPageHttpLister from .models import GitLabModel class GitLabLister(PageByPageHttpLister): # Template path expecting an integer that represents the page id PATH_TEMPLATE = '/projects?page=%d&order_by=id' + DEFAULT_URL = 'https://gitlab.com/api/v4/' MODEL = GitLabModel LISTER_NAME = 'gitlab' - def __init__(self, api_baseurl, instance=None, + def __init__(self, url=None, instance=None, override_config=None, sort='asc', per_page=20): - super().__init__(api_baseurl=api_baseurl, - override_config=override_config) + super().__init__(url=url, override_config=override_config) if instance is None: - instance = parse_url(api_baseurl).host + instance = parse_url(self.url).host self.instance = instance - self.PATH_TEMPLATE = '%s&sort=%s' % (self.PATH_TEMPLATE, sort) - if per_page != 20: - self.PATH_TEMPLATE = '%s&per_page=%s' % ( - self.PATH_TEMPLATE, per_page) + self.PATH_TEMPLATE = '%s&sort=%s&per_page=%s' % ( + self.PATH_TEMPLATE, sort, per_page) def uid(self, repo): return '%s/%s' % (self.instance, repo['path_with_namespace']) def get_model_from_repo(self, repo): return { 'instance': self.instance, 'uid': self.uid(repo), 'name': repo['name'], 'full_name': repo['path_with_namespace'], 'html_url': repo['web_url'], 'origin_url': repo['http_url_to_repo'], 'origin_type': 'git', } def transport_quota_check(self, response): """Deal with rate limit if any. """ # not all gitlab instance have rate limit if 'RateLimit-Remaining' in response.headers: reqs_remaining = int(response.headers['RateLimit-Remaining']) if response.status_code == 403 and reqs_remaining == 0: reset_at = int(response.headers['RateLimit-Reset']) delay = min(reset_at - time.time(), 3600) return True, delay return False, 0 def _get_int(self, headers, key): _val = headers.get(key) if _val: return int(_val) def get_next_target_from_response(self, response): """Determine the next page identifier. """ return self._get_int(response.headers, 'x-next-page') def get_pages_information(self): """Determine pages information. 
""" response = self.transport_head(identifier=1) if not response.ok: raise ValueError( 'Problem during information fetch: %s' % response.status_code) h = response.headers return (self._get_int(h, 'x-total'), self._get_int(h, 'x-total-pages'), self._get_int(h, 'x-per-page')) def transport_response_simplified(self, response): repos = response.json() return [self.get_model_from_repo(repo) for repo in repos] diff --git a/swh/lister/gitlab/tasks.py b/swh/lister/gitlab/tasks.py index eff3114..30cab41 100644 --- a/swh/lister/gitlab/tasks.py +++ b/swh/lister/gitlab/tasks.py @@ -1,62 +1,52 @@ # Copyright (C) 2018 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import random from celery import group from swh.scheduler.celery_backend.config import app from .. import utils from .lister import GitLabLister NBPAGES = 10 -def new_lister(api_baseurl='https://gitlab.com/api/v4', - instance=None, sort='asc', per_page=20): - return GitLabLister( - api_baseurl=api_baseurl, instance=instance, sort=sort, - per_page=per_page) - - @app.task(name=__name__ + '.IncrementalGitLabLister') -def incremental_gitlab_lister(**lister_args): +def list_gitlab_incremental(**lister_args): + """Incremental update of a GitLab instance""" lister_args['sort'] = 'desc' - lister = new_lister(**lister_args) + lister = GitLabLister(**lister_args) total_pages = lister.get_pages_information()[1] # stopping as soon as existing origins for that instance are detected lister.run(min_bound=1, max_bound=total_pages, check_existence=True) @app.task(name=__name__ + '.RangeGitLabLister') -def range_gitlab_lister(start, end, **lister_args): - lister = new_lister(**lister_args) +def _range_gitlab_lister(start, end, **lister_args): + lister = GitLabLister(**lister_args) lister.run(min_bound=start, max_bound=end) @app.task(name=__name__ + '.FullGitLabRelister', bind=True) -def full_gitlab_relister(self, **lister_args): - """Full lister - - This should be renamed as such. 
- - """ - lister = new_lister(**lister_args) +def list_gitlab_full(self, **lister_args): + """Full update of a GitLab instance""" + lister = GitLabLister(**lister_args) _, total_pages, _ = lister.get_pages_information() ranges = list(utils.split_range(total_pages, NBPAGES)) random.shuffle(ranges) - promise = group(range_gitlab_lister.s(minv, maxv, **lister_args) + promise = group(_range_gitlab_lister.s(minv, maxv, **lister_args) for minv, maxv in ranges)() self.log.debug('%s OK (spawned %s subtasks)' % (self.name, len(ranges))) try: promise.save() except (NotImplementedError, AttributeError): self.log.info('Unable to call save_group with current result backend.') return promise.id @app.task(name=__name__ + '.ping') -def ping(): +def _ping(): return 'OK' diff --git a/swh/lister/gitlab/tests/test_tasks.py b/swh/lister/gitlab/tests/test_tasks.py index f8d0a81..56332a1 100644 --- a/swh/lister/gitlab/tests/test_tasks.py +++ b/swh/lister/gitlab/tests/test_tasks.py @@ -1,150 +1,142 @@ from time import sleep from celery.result import GroupResult from unittest.mock import patch def test_ping(swh_app, celery_session_worker): res = swh_app.send_task( 'swh.lister.gitlab.tasks.ping') assert res res.wait() assert res.successful() assert res.result == 'OK' @patch('swh.lister.gitlab.tasks.GitLabLister') def test_incremental(lister, swh_app, celery_session_worker): # setup the mocked GitlabLister lister.return_value = lister lister.run.return_value = None lister.get_pages_information.return_value = (None, 10, None) res = swh_app.send_task( 'swh.lister.gitlab.tasks.IncrementalGitLabLister') assert res res.wait() assert res.successful() - lister.assert_called_once_with( - api_baseurl='https://gitlab.com/api/v4', - instance=None, sort='desc', per_page=20) + lister.assert_called_once_with(sort='desc') lister.db_last_index.assert_not_called() lister.get_pages_information.assert_called_once_with() lister.run.assert_called_once_with( min_bound=1, max_bound=10, check_existence=True) @patch('swh.lister.gitlab.tasks.GitLabLister') def test_range(lister, swh_app, celery_session_worker): # setup the mocked GitlabLister lister.return_value = lister lister.run.return_value = None res = swh_app.send_task( 'swh.lister.gitlab.tasks.RangeGitLabLister', kwargs=dict(start=12, end=42)) assert res res.wait() assert res.successful() - lister.assert_called_once_with( - api_baseurl='https://gitlab.com/api/v4', - instance=None, sort='asc', per_page=20) + lister.assert_called_once_with() lister.db_last_index.assert_not_called() lister.run.assert_called_once_with(min_bound=12, max_bound=42) @patch('swh.lister.gitlab.tasks.GitLabLister') def test_relister(lister, swh_app, celery_session_worker): # setup the mocked GitlabLister lister.return_value = lister lister.run.return_value = None lister.get_pages_information.return_value = (None, 85, None) lister.db_partition_indices.return_value = [ (i, i+9) for i in range(0, 80, 10)] + [(80, 85)] res = swh_app.send_task( 'swh.lister.gitlab.tasks.FullGitLabRelister') assert res res.wait() assert res.successful() # retrieve the GroupResult for this task and wait for all the subtasks # to complete promise_id = res.result assert promise_id promise = GroupResult.restore(promise_id, app=swh_app) for i in range(5): if promise.ready(): break sleep(1) - lister.assert_called_with( - api_baseurl='https://gitlab.com/api/v4', - instance=None, sort='asc', per_page=20) + lister.assert_called_with() # one by the FullGitlabRelister task # + 9 for the RangeGitlabLister subtasks assert lister.call_count == 10 
    lister.db_last_index.assert_not_called()
    lister.db_partition_indices.assert_not_called()
    lister.get_pages_information.assert_called_once_with()
    # lister.run should have been called once per partition interval
    for i in range(8):
        # XXX inconsistent behavior: max_bound is EXCLUDED here
        assert (dict(min_bound=10*i, max_bound=10*i + 10),) \
            in lister.run.call_args_list
    assert (dict(min_bound=80, max_bound=85),) \
        in lister.run.call_args_list


@patch('swh.lister.gitlab.tasks.GitLabLister')
def test_relister_instance(lister, swh_app, celery_session_worker):
    # setup the mocked GitlabLister
    lister.return_value = lister
    lister.run.return_value = None
    lister.get_pages_information.return_value = (None, 85, None)
    lister.db_partition_indices.return_value = [
        (i, i+9) for i in range(0, 80, 10)] + [(80, 85)]

    res = swh_app.send_task(
        'swh.lister.gitlab.tasks.FullGitLabRelister',
-        kwargs=dict(api_baseurl='https://0xacab.org/api/v4'))
+        kwargs=dict(url='https://0xacab.org/api/v4'))
    assert res
    res.wait()
    assert res.successful()

    # retrieve the GroupResult for this task and wait for all the subtasks
    # to complete
    promise_id = res.result
    assert promise_id
    promise = GroupResult.restore(promise_id, app=swh_app)
    for i in range(5):
        if promise.ready():
            break
        sleep(1)

-    lister.assert_called_with(
-        api_baseurl='https://0xacab.org/api/v4',
-        instance=None, sort='asc', per_page=20)
+    lister.assert_called_with(url='https://0xacab.org/api/v4')

    # one by the FullGitlabRelister task
    # + 9 for the RangeGitlabLister subtasks
    assert lister.call_count == 10

    lister.db_last_index.assert_not_called()
    lister.db_partition_indices.assert_not_called()
    lister.get_pages_information.assert_called_once_with()
    # lister.run should have been called once per partition interval
    for i in range(8):
        # XXX inconsistent behavior: max_bound is EXCLUDED here
        assert (dict(min_bound=10*i, max_bound=10*i + 10),) \
            in lister.run.call_args_list
    assert (dict(min_bound=80, max_bound=85),) \
        in lister.run.call_args_list
diff --git a/swh/lister/gnu/__init__.py b/swh/lister/gnu/__init__.py
index e69de29..7787464 100644
--- a/swh/lister/gnu/__init__.py
+++ b/swh/lister/gnu/__init__.py
@@ -0,0 +1,13 @@
+# Copyright (C) 2019 the Software Heritage developers
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+def register():
+    from .models import GNUModel
+    from .lister import GNULister
+
+    return {'models': [GNUModel],
+            'lister': GNULister,
+            'task_modules': ['%s.tasks' % __name__],
+            }
diff --git a/swh/lister/gnu/tasks.py b/swh/lister/gnu/tasks.py
index 251eccf..7191453 100644
--- a/swh/lister/gnu/tasks.py
+++ b/swh/lister/gnu/tasks.py
@@ -1,17 +1,18 @@
# Copyright (C) 2019 the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from swh.scheduler.celery_backend.config import app

from .lister import GNULister


@app.task(name=__name__ + '.GNUListerTask')
-def gnu_lister(**lister_args):
+def list_gnu_full(**lister_args):
+    'Full lister for the GNU source code archive'
    GNULister(**lister_args).run()


@app.task(name=__name__ + '.ping')
-def ping():
+def _ping():
    return 'OK'
diff --git a/swh/lister/npm/__init__.py b/swh/lister/npm/__init__.py
index e69de29..77c3d38 100644
--- a/swh/lister/npm/__init__.py
+++ b/swh/lister/npm/__init__.py
@@ -0,0 +1,20 @@
+# Copyright (C) 2019 the Software Heritage developers
+# License: GNU General Public License version 3, or any later version
+# See top-level
LICENSE file for more information + + +def register(): + from .models import NpmVisitModel, NpmModel + from .lister import NpmLister + + return {'models': [NpmVisitModel, NpmModel], + 'lister': NpmLister, + 'task_modules': ['%s.tasks' % __name__], + 'task_types': { + 'list-npm-full': { + 'default_interval': '7 days', + 'min_interval': '7 days', + 'max_interval': '7 days', + }, + }, + } diff --git a/swh/lister/npm/lister.py b/swh/lister/npm/lister.py index c7e9d29..0672f7c 100644 --- a/swh/lister/npm/lister.py +++ b/swh/lister/npm/lister.py @@ -1,157 +1,156 @@ # Copyright (C) 2018-2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from urllib.parse import quote from swh.lister.core.indexing_lister import IndexingHttpLister from swh.lister.npm.models import NpmModel from swh.scheduler.utils import create_task_dict class NpmListerBase(IndexingHttpLister): """List packages available in the npm registry in a paginated way """ MODEL = NpmModel LISTER_NAME = 'npm' instance = 'npm' - def __init__(self, api_baseurl='https://replicate.npmjs.com', + def __init__(self, url='https://replicate.npmjs.com', per_page=1000, override_config=None): - super().__init__(api_baseurl=api_baseurl, - override_config=override_config) + super().__init__(url=url, override_config=override_config) self.per_page = per_page + 1 self.PATH_TEMPLATE += '&limit=%s' % self.per_page @property def ADDITIONAL_CONFIG(self): """(Override) Add extra configuration """ default_config = super().ADDITIONAL_CONFIG default_config['loading_task_policy'] = ('str', 'recurring') return default_config def get_model_from_repo(self, repo_name): """(Override) Transform from npm package name to model """ package_url, package_metadata_url = self._compute_urls(repo_name) return { 'uid': repo_name, 'indexable': repo_name, 'name': repo_name, 'full_name': repo_name, 'html_url': package_metadata_url, 'origin_url': package_url, 'origin_type': 'npm', } def task_dict(self, origin_type, origin_url, **kwargs): """(Override) Return task dict for loading a npm package into the archive This is overridden from the lister_base as more information is needed for the ingestion task creation. 
""" task_type = 'load-%s' % origin_type task_policy = self.config['loading_task_policy'] package_name = kwargs.get('name') package_metadata_url = kwargs.get('html_url') return create_task_dict(task_type, task_policy, package_name, origin_url, package_metadata_url=package_metadata_url) def request_headers(self): """(Override) Set requests headers to send when querying the npm registry """ return {'User-Agent': 'Software Heritage npm lister', 'Accept': 'application/json'} def _compute_urls(self, repo_name): """Return a tuple (package_url, package_metadata_url) """ return ( 'https://www.npmjs.com/package/%s' % repo_name, # package metadata url needs to be escaped otherwise some requests # may fail (for instance when a package name contains '/') - '%s/%s' % (self.api_baseurl, quote(repo_name, safe='')) + '%s/%s' % (self.url, quote(repo_name, safe='')) ) def string_pattern_check(self, inner, lower, upper=None): """ (Override) Inhibit the effect of that method as packages indices correspond to package names and thus do not respect any kind of fixed length string pattern """ pass class NpmLister(NpmListerBase): """List all packages available in the npm registry in a paginated way """ PATH_TEMPLATE = '/_all_docs?startkey="%s"' def get_next_target_from_response(self, response): """(Override) Get next npm package name to continue the listing """ repos = response.json()['rows'] return repos[-1]['id'] if len(repos) == self.per_page else None def transport_response_simplified(self, response): """(Override) Transform npm registry response to list for model manipulation """ repos = response.json()['rows'] if len(repos) == self.per_page: repos = repos[:-1] return [self.get_model_from_repo(repo['id']) for repo in repos] class NpmIncrementalLister(NpmListerBase): """List packages in the npm registry, updated since a specific update_seq value of the underlying CouchDB database, in a paginated way """ PATH_TEMPLATE = '/_changes?since=%s' @property def CONFIG_BASE_FILENAME(self): # noqa: N802 return 'lister_npm_incremental' def get_next_target_from_response(self, response): """(Override) Get next npm package name to continue the listing """ repos = response.json()['results'] return repos[-1]['seq'] if len(repos) == self.per_page else None def transport_response_simplified(self, response): """(Override) Transform npm registry response to list for model manipulation """ repos = response.json()['results'] if len(repos) == self.per_page: repos = repos[:-1] return [self.get_model_from_repo(repo['id']) for repo in repos] def filter_before_inject(self, models_list): """(Override) Filter out documents in the CouchDB database not related to a npm package """ models_filtered = [] for model in models_list: package_name = model['name'] # document related to CouchDB internals if package_name.startswith('_design/'): continue models_filtered.append(model) return models_filtered def disable_deleted_repo_tasks(self, start, end, keep_these): """(Override) Disable the processing performed by that method as it is not relevant in this incremental lister context and it raises and exception due to a different index type (int instead of str) """ pass diff --git a/swh/lister/npm/models.py b/swh/lister/npm/models.py index 206ae36..5eb8d0d 100644 --- a/swh/lister/npm/models.py +++ b/swh/lister/npm/models.py @@ -1,38 +1,35 @@ # Copyright (C) 2018 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from sqlalchemy import Column, String, 
DateTime, Integer, BigInteger, Sequence -from sqlalchemy.ext.declarative import declarative_base -from swh.lister.core.models import IndexingModelBase, ABCSQLMeta - -SQLBase = declarative_base() +from swh.lister.core.models import IndexingModelBase, SQLBase, ABCSQLMeta class NpmVisitModel(SQLBase, metaclass=ABCSQLMeta): """Table to store the npm registry state at the time of a content listing by Software Heritage """ __tablename__ = 'npm_visit' uid = Column(Integer, Sequence('npm_visit_id_seq'), primary_key=True) visit_date = Column(DateTime, nullable=False) doc_count = Column(BigInteger) doc_del_count = Column(BigInteger) update_seq = Column(BigInteger) purge_seq = Column(BigInteger) disk_size = Column(BigInteger) data_size = Column(BigInteger) committed_update_seq = Column(BigInteger) compacted_seq = Column(BigInteger) class NpmModel(IndexingModelBase): """A npm package representation """ __tablename__ = 'npm_repo' uid = Column(String, primary_key=True) indexable = Column(String, index=True) diff --git a/swh/lister/npm/tasks.py b/swh/lister/npm/tasks.py index 26a243b..8d1c369 100644 --- a/swh/lister/npm/tasks.py +++ b/swh/lister/npm/tasks.py @@ -1,60 +1,62 @@ # Copyright (C) 2018 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime from contextlib import contextmanager from swh.scheduler.celery_backend.config import app from swh.lister.npm.lister import NpmLister, NpmIncrementalLister from swh.lister.npm.models import NpmVisitModel @contextmanager def save_registry_state(lister): params = {'headers': lister.request_headers()} - registry_state = lister.session.get(lister.api_baseurl, **params) + registry_state = lister.session.get(lister.url, **params) registry_state = registry_state.json() keys = ('doc_count', 'doc_del_count', 'update_seq', 'purge_seq', 'disk_size', 'data_size', 'committed_update_seq', 'compacted_seq') state = {key: registry_state[key] for key in keys} state['visit_date'] = datetime.now() yield npm_visit = NpmVisitModel(**state) lister.db_session.add(npm_visit) lister.db_session.commit() def get_last_update_seq(lister): """Get latest ``update_seq`` value for listing only updated packages. """ query = lister.db_session.query(NpmVisitModel.update_seq) row = query.order_by(NpmVisitModel.uid.desc()).first() if not row: raise ValueError('No npm registry listing previously performed ! 
' 'This is required prior to the execution of an ' 'incremental listing.') return row[0] @app.task(name=__name__ + '.NpmListerTask') -def npm_lister(**lister_args): +def list_npm_full(**lister_args): + 'Full lister for the npm (javascript) registry' lister = NpmLister(**lister_args) with save_registry_state(lister): lister.run() @app.task(name=__name__ + '.NpmIncrementalListerTask') -def npm_incremental_lister(**lister_args): +def list_npm_incremental(**lister_args): + 'Incremental lister for the npm (javascript) registry' lister = NpmIncrementalLister(**lister_args) update_seq_start = get_last_update_seq(lister) with save_registry_state(lister): lister.run(min_bound=update_seq_start) @app.task(name=__name__ + '.ping') -def ping(): +def _ping(): return 'OK' diff --git a/swh/lister/packagist/__init__.py b/swh/lister/packagist/__init__.py index e69de29..4060cf2 100644 --- a/swh/lister/packagist/__init__.py +++ b/swh/lister/packagist/__init__.py @@ -0,0 +1,13 @@ +# Copyright (C) 2019 the Software Heritage developers +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + + +def register(): + from .models import PackagistModel + from .lister import PackagistLister + + return {'models': [PackagistModel], + 'lister': PackagistLister, + 'task_modules': ['%s.tasks' % __name__], + } diff --git a/swh/lister/packagist/tasks.py b/swh/lister/packagist/tasks.py index e17e892..6db17dc 100644 --- a/swh/lister/packagist/tasks.py +++ b/swh/lister/packagist/tasks.py @@ -1,17 +1,18 @@ # Copyright (C) 2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.scheduler.celery_backend.config import app from .lister import PackagistLister @app.task(name=__name__ + '.PackagistListerTask') -def packagist_lister(**lister_args): +def list_packagist(**lister_args): + 'List the packagist (php) registry' PackagistLister(**lister_args).run() @app.task(name=__name__ + '.ping') -def ping(): +def _ping(): return 'OK' diff --git a/swh/lister/phabricator/__init__.py b/swh/lister/phabricator/__init__.py index e69de29..aeaee0a 100644 --- a/swh/lister/phabricator/__init__.py +++ b/swh/lister/phabricator/__init__.py @@ -0,0 +1,13 @@ +# Copyright (C) 2019 the Software Heritage developers +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + + +def register(): + from .models import PhabricatorModel + from .lister import PhabricatorLister + + return {'models': [PhabricatorModel], + 'lister': PhabricatorLister, + 'task_modules': ['%s.tasks' % __name__], + } diff --git a/swh/lister/phabricator/lister.py b/swh/lister/phabricator/lister.py index 9b8059e..7f60c14 100644 --- a/swh/lister/phabricator/lister.py +++ b/swh/lister/phabricator/lister.py @@ -1,170 +1,153 @@ # Copyright (C) 2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging +import random import urllib.parse from swh.lister.core.indexing_lister import IndexingHttpLister from swh.lister.phabricator.models import PhabricatorModel from collections import defaultdict logger = logging.getLogger(__name__) class PhabricatorLister(IndexingHttpLister): PATH_TEMPLATE = '?order=oldest&attachments[uris]=1&after=%s' + DEFAULT_URL = 'https://forge.softwareheritage.org/api/diffusion.repository.search' # noqa MODEL = PhabricatorModel 
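(With the `api_token` constructor argument gone, the Phabricator lister now draws its token from the standard credentials configuration documented in the `request_params` override below, picking one entry at random when several are configured. A minimal sketch of instantiating it with inline credentials, mirroring the shape used in `test_lister.py`; the username and token values are placeholders:)

```lang=python
from swh.lister.phabricator.lister import PhabricatorLister

# with no url argument, DEFAULT_URL is used and the instance name is
# derived from its hostname: 'forge.softwareheritage.org'
lister = PhabricatorLister(override_config={
    'credentials': {
        'phabricator': {
            'forge.softwareheritage.org': [
                {'username': 'bot', 'password': 'XXXX'},
            ],
        },
    },
})
lister.run()
```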
LISTER_NAME = 'phabricator' - def __init__(self, forge_url, instance=None, api_token=None, - override_config=None): - if forge_url.endswith("/"): - forge_url = forge_url[:-1] - self.forge_url = forge_url - api_baseurl = '%s/api/diffusion.repository.search' % forge_url - self.api_token = api_token + def __init__(self, url=None, instance=None, override_config=None): + super().__init__(url=url, override_config=override_config) if not instance: - instance = urllib.parse.urlparse(forge_url).hostname + instance = urllib.parse.urlparse(self.url).hostname self.instance = instance - super().__init__(api_baseurl=api_baseurl, - override_config=override_config) @property def default_min_bound(self): """Starting boundary when `min_bound` is not defined (db empty). This is used within the fn:`run` call. """ return self._bootstrap_repositories_listing() - def _build_query_params(self, params, api_token): - """Build query params to include the forge's api token - - Returns: - updated params dict with 'params' entry. - - """ - params.update({'params': {'api.token': api_token}}) - return params - def request_params(self, identifier): """Override the default params behavior to retrieve the api token Credentials are stored as: credentials: phabricator: : - username: password: """ - params = {} - params['headers'] = self.request_headers() or {} - if self.api_token: - return self._build_query_params(params, self.api_token) - instance_creds = self.request_instance_credentials() - if not instance_creds: + creds = self.request_instance_credentials() + if not creds: raise ValueError( 'Phabricator forge needs authentication credential to list.') - api_token = instance_creds[0]['password'] - return self._build_query_params(params, api_token) + api_token = random.choice(creds)['password'] + + return {'headers': self.request_headers() or {}, + 'params': {'api.token': api_token}} def request_headers(self): """ (Override) Set requests headers to send when querying the Phabricator API """ return {'User-Agent': 'Software Heritage phabricator lister', 'Accept': 'application/json'} def get_model_from_repo(self, repo): url = get_repo_url(repo['attachments']['uris']['uris']) if url is None: return None return { - 'uid': self.forge_url + str(repo['id']), + 'uid': url, 'indexable': repo['id'], 'name': repo['fields']['shortName'], 'full_name': repo['fields']['name'], 'html_url': url, 'origin_url': url, 'origin_type': repo['fields']['vcs'], 'instance': self.instance, } def get_next_target_from_response(self, response): body = response.json()['result']['cursor'] if body['after'] != 'null': return body['after'] return None def transport_response_simplified(self, response): repos = response.json() if repos['result'] is None: raise ValueError( 'Problem during information fetch: %s' % repos['error_code']) repos = repos['result']['data'] return [self.get_model_from_repo(repo) for repo in repos] def filter_before_inject(self, models_list): """ (Overrides) IndexingLister.filter_before_inject Bounds query results by this Lister's set max_index. """ models_list = [m for m in models_list if m is not None] return super().filter_before_inject(models_list) def _bootstrap_repositories_listing(self): """ Method called when no min_bound value has been provided to the lister. Its purpose is to: 1. get the first repository data hosted on the Phabricator instance 2. inject them into the lister database 3. 
return the first repository index to start the listing after that value Returns: int: The first repository index """ params = '&order=oldest&limit=1' response = self.safely_issue_request(params) models_list = self.transport_response_simplified(response) self.max_index = models_list[0]['indexable'] models_list = self.filter_before_inject(models_list) injected = self.inject_repo_data_into_db(models_list) self.schedule_missing_tasks(models_list, injected) return self.max_index def get_repo_url(attachments): """ Return url for a hosted repository from its uris attachments according to the following priority lists: * protocol: https > http * identifier: shortname > callsign > id """ processed_urls = defaultdict(dict) for uri in attachments: protocol = uri['fields']['builtin']['protocol'] url = uri['fields']['uri']['effective'] identifier = uri['fields']['builtin']['identifier'] if protocol in ('http', 'https'): processed_urls[protocol][identifier] = url elif protocol is None: for protocol in ('https', 'http'): if url.startswith(protocol): processed_urls[protocol]['undefined'] = url break for protocol in ['https', 'http']: for identifier in ['shortname', 'callsign', 'id', 'undefined']: if (protocol in processed_urls and identifier in processed_urls[protocol]): return processed_urls[protocol][identifier] return None diff --git a/swh/lister/phabricator/tasks.py b/swh/lister/phabricator/tasks.py index 5ec794d..0b8b77d 100644 --- a/swh/lister/phabricator/tasks.py +++ b/swh/lister/phabricator/tasks.py @@ -1,23 +1,17 @@ # Copyright (C) 2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.scheduler.celery_backend.config import app from swh.lister.phabricator.lister import PhabricatorLister -def new_lister(forge_url='https://forge.softwareheritage.org', instance='swh', - api_token=None, **kw): - return PhabricatorLister( - forge_url=forge_url, instance=instance, api_token=api_token, **kw) - - @app.task(name=__name__ + '.FullPhabricatorLister') -def full_phabricator_lister(**lister_args): - lister = new_lister(**lister_args) - lister.run() +def list_phabricator_full(**lister_args): + 'Full update of a Phabricator instance' + PhabricatorLister(**lister_args).run() @app.task(name=__name__ + '.ping') -def ping(): +def _ping(): return 'OK' diff --git a/swh/lister/phabricator/tests/test_lister.py b/swh/lister/phabricator/tests/test_lister.py index e4d6cce..f52e560 100644 --- a/swh/lister/phabricator/tests/test_lister.py +++ b/swh/lister/phabricator/tests/test_lister.py @@ -1,56 +1,60 @@ # Copyright (C) 2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import re import json import unittest from swh.lister.core.tests.test_lister import HttpListerTester from swh.lister.phabricator.lister import PhabricatorLister from swh.lister.phabricator.lister import get_repo_url class PhabricatorListerTester(HttpListerTester, unittest.TestCase): Lister = PhabricatorLister test_re = re.compile(r'\&after=([^?&]+)') lister_subdir = 'phabricator' good_api_response_file = 'api_response.json' good_api_response_undefined_protocol = 'api_response_undefined_'\ 'protocol.json' bad_api_response_file = 'api_empty_response.json' first_index = 1 last_index = 12 entries_per_page = 10 convert_type = int def get_fl(self, override_config=None): """(Override) Retrieve an instance of fake lister (fl). 
""" if override_config or self.fl is None: - self.fl = self.Lister(forge_url='https://fakeurl', instance='fake', - api_token='a-1', + credentials = {'phabricator': {'fake': [ + {'password': 'toto'} + ]}} + override_config = dict(credentials=credentials, + **(override_config or {})) + self.fl = self.Lister(url='https://fakeurl', instance='fake', override_config=override_config) self.fl.INITIAL_BACKOFF = 1 self.fl.reset_backoff() return self.fl def test_get_repo_url(self): f = open('swh/lister/%s/tests/%s' % (self.lister_subdir, self.good_api_response_file)) api_response = json.load(f) repos = api_response['result']['data'] for repo in repos: self.assertEqual( 'https://forge.softwareheritage.org/source/%s.git' % (repo['fields']['shortName']), get_repo_url(repo['attachments']['uris']['uris'])) f = open('swh/lister/%s/tests/%s' % (self.lister_subdir, self.good_api_response_undefined_protocol)) repo = json.load(f) self.assertEqual( 'https://svn.blender.org/svnroot/bf-blender/', get_repo_url(repo['attachments']['uris']['uris'])) diff --git a/swh/lister/pypi/__init__.py b/swh/lister/pypi/__init__.py index e69de29..0f845c3 100644 --- a/swh/lister/pypi/__init__.py +++ b/swh/lister/pypi/__init__.py @@ -0,0 +1,13 @@ +# Copyright (C) 2019 the Software Heritage developers +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + + +def register(): + from .models import PyPIModel + from .lister import PyPILister + + return {'models': [PyPIModel], + 'lister': PyPILister, + 'task_modules': ['%s.tasks' % __name__], + } diff --git a/swh/lister/pypi/tasks.py b/swh/lister/pypi/tasks.py index bf210ab..d6206a4 100644 --- a/swh/lister/pypi/tasks.py +++ b/swh/lister/pypi/tasks.py @@ -1,17 +1,18 @@ # Copyright (C) 2018 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.scheduler.celery_backend.config import app from .lister import PyPILister @app.task(name=__name__ + '.PyPIListerTask') -def pypi_lister(**lister_args): +def list_pypi(**lister_args): + 'Full update of the PyPI (python) registry' PyPILister(**lister_args).run() @app.task(name=__name__ + '.ping') -def ping(): +def _ping(): return 'OK' diff --git a/swh/lister/tests/test_cli.py b/swh/lister/tests/test_cli.py index 57ea7a3..6039ea4 100644 --- a/swh/lister/tests/test_cli.py +++ b/swh/lister/tests/test_cli.py @@ -1,95 +1,141 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import glob import pytest +import traceback +from datetime import timedelta + +import yaml + +from swh.core.utils import numfile_sortkey as sortkey +from swh.scheduler import get_scheduler +from swh.scheduler.tests.conftest import DUMP_FILES from swh.lister.core.lister_base import ListerBase -from swh.lister.cli import get_lister, SUPPORTED_LISTERS, DEFAULT_BASEURLS +from swh.lister.cli import lister as cli, get_lister, SUPPORTED_LISTERS from .test_utils import init_db +from click.testing import CliRunner + + +@pytest.fixture +def swh_scheduler_config(request, postgresql_proc, postgresql): + scheduler_config = { + 'db': 'postgresql://{user}@{host}:{port}/{dbname}'.format( + host=postgresql_proc.host, + port=postgresql_proc.port, + user='postgres', + dbname='tests') + } + + all_dump_files = 
sorted(glob.glob(DUMP_FILES), key=sortkey) + + cursor = postgresql.cursor() + for fname in all_dump_files: + with open(fname) as fobj: + cursor.execute(fobj.read()) + postgresql.commit() + + return scheduler_config def test_get_lister_wrong_input(): """Unsupported lister should raise""" with pytest.raises(ValueError) as e: get_lister('unknown', 'db-url') assert "Invalid lister" in str(e.value) def test_get_lister(): """Instantiating a supported lister should be ok """ db_url = init_db().url() - supported_listers_with_init = {'npm', 'debian'} - supported_listers = set(SUPPORTED_LISTERS) - supported_listers_with_init - for lister_name in supported_listers: - lst, drop_fn, init_fn, insert_data_fn = get_lister(lister_name, db_url) - + for lister_name in SUPPORTED_LISTERS: + lst = get_lister(lister_name, db_url) assert isinstance(lst, ListerBase) - assert drop_fn is None - assert init_fn is not None - assert insert_data_fn is None - - for lister_name in supported_listers_with_init: - lst, drop_fn, init_fn, insert_data_fn = get_lister(lister_name, db_url) - - assert isinstance(lst, ListerBase) - assert drop_fn is None - assert init_fn is not None - assert insert_data_fn is not None - - for lister_name in supported_listers_with_init: - lst, drop_fn, init_fn, insert_data_fn = get_lister(lister_name, db_url, - drop_tables=True) - - assert isinstance(lst, ListerBase) - assert drop_fn is not None - assert init_fn is not None - assert insert_data_fn is not None def test_get_lister_override(): """Overriding the lister configuration should populate its config """ db_url = init_db().url() listers = { - 'gitlab': ('api_baseurl', 'https://gitlab.uni/api/v4/'), - 'phabricator': ('forge_url', 'https://somewhere.org'), - 'cgit': ('url_prefix', 'https://some-cgit.eu/'), + 'gitlab': 'https://other.gitlab.uni/api/v4/', + 'phabricator': 'https://somewhere.org/api/diffusion.repository.search', + 'cgit': 'https://some.where/cgit', } # check the override ends up defined in the lister - for lister_name, (url_key, url_value) in listers.items(): - lst, drop_fn, init_fn, insert_data_fn = get_lister( + for lister_name, url in listers.items(): + lst = get_lister( lister_name, db_url, **{ - 'api_baseurl': url_value, + 'url': url, 'priority': 'high', 'policy': 'oneshot', }) - assert getattr(lst, url_key) == url_value + assert lst.url == url assert lst.config['priority'] == 'high' assert lst.config['policy'] == 'oneshot' # check the default urls are used and not the override (since it's not # passed) - for lister_name, (url_key, url_value) in listers.items(): - lst, drop_fn, init_fn, insert_data_fn = get_lister(lister_name, db_url) + for lister_name, url in listers.items(): + lst = get_lister(lister_name, db_url) # no override so this does not end up in lister's configuration - assert url_key not in lst.config - - # then the default base url is used - default_url = DEFAULT_BASEURLS[lister_name] - if isinstance(default_url, tuple): # cgit implementation detail... 
- default_url = default_url[1] - - assert getattr(lst, url_key) == default_url + assert 'url' not in lst.config assert 'priority' not in lst.config assert 'oneshot' not in lst.config + assert lst.url == lst.DEFAULT_URL + + +def test_task_types(swh_scheduler_config, tmp_path): + db_url = init_db().url() + + configfile = tmp_path / 'config.yml' + configfile.write_text(yaml.dump({'scheduler': { + 'cls': 'local', + 'args': swh_scheduler_config}})) + runner = CliRunner() + result = runner.invoke(cli, [ + '--db-url', db_url, + '--config-file', configfile.as_posix(), + 'register-task-types']) + + assert result.exit_code == 0, traceback.print_exception(*result.exc_info) + + scheduler = get_scheduler(cls='local', args=swh_scheduler_config) + all_tasks = [ + 'list-bitbucket-full', 'list-bitbucket-incremental', + 'list-cran', + 'list-cgit', + 'list-debian-distribution', + 'list-gitlab-full', 'list-gitlab-incremental', + 'list-github-full', 'list-github-incremental', + 'list-gnu-full', + 'list-npm-full', 'list-npm-incremental', + 'list-phabricator-full', + 'list-packagist', + 'list-pypi', + ] + for task in all_tasks: + task_type_desc = scheduler.get_task_type(task) + assert task_type_desc + assert task_type_desc['type'] == task + assert task_type_desc['backoff_factor'] == 1 + + if task == 'list-npm-full': + delay = timedelta(days=7) # overloaded in the plugin registry + elif task.endswith('-full'): + delay = timedelta(days=90) # default value for 'full' lister tasks + else: + delay = timedelta(days=1) # default value for other lister tasks + assert task_type_desc['default_interval'] == delay, task diff --git a/version.txt b/version.txt index f20dce7..daabb1f 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.33-0-g09f3605 \ No newline at end of file +v0.0.34-0-g481b30c \ No newline at end of file
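The `register-task-types` sub-command exercised in `test_task_types` ties the new plugin registry together: each plugin's `register()` entry point contributes its task modules, and the command records the corresponding task types in the scheduler. A minimal sketch of running it locally — the database URL and configuration path are placeholders, while the sub-command name and options are those used by the test:

```lang=bash
$ python3 -m swh.lister.cli --db-url postgres:///lister \
    --config-file ~/.config/swh/lister.yml register-task-types
```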