diff --git a/requirements-swh.txt b/requirements-swh.txt
index 800086b..65ce072 100644
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,4 +1,3 @@
 swh.core >= 0.0.75
 swh.model >= 0.0.18
 swh.storage >= 0.0.153
-swh.deposit
diff --git a/swh/loader/package/__init__.py b/swh/loader/package/__init__.py
index 5427a77..f77990c 100644
--- a/swh/loader/package/__init__.py
+++ b/swh/loader/package/__init__.py
@@ -1,27 +1,27 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-from typing import Any, Mapping
+from typing import Any, Dict, Mapping
 
 try:
     from swh.loader.core._version import __version__  # type: ignore
 except ImportError:
     __version__ = 'devel'
 
 
-DEFAULT_PARAMS = {
+DEFAULT_PARAMS: Dict[str, Any] = {
     'headers': {
         'User-Agent': 'Software Heritage Loader (%s)' % (
             __version__
         )
     }
 }
 
 
 def register() -> Mapping[str, Any]:
     return {
         'task_modules': ['%s.tasks' % __name__],
     }
diff --git a/swh/loader/package/deposit.py b/swh/loader/package/deposit.py
index 1fe1752..b4ebc32 100644
--- a/swh/loader/package/deposit.py
+++ b/swh/loader/package/deposit.py
@@ -1,154 +1,226 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import logging
+import requests
 
-from typing import Any, Dict, Generator, Mapping, Sequence, Tuple
+from typing import (
+    Any, Dict, Generator, List, Mapping, Optional, Sequence, Tuple, Union
+)
 
 from swh.model.hashutil import hash_to_hex, hash_to_bytes
 from swh.loader.package.loader import PackageLoader
-from swh.deposit.client import PrivateApiDepositClient as ApiClient
+from swh.loader.package.utils import download
 
 logger = logging.getLogger(__name__)
 
 
 class DepositLoader(PackageLoader):
     """Load a deposited artifact into the swh archive.
 
     """
     visit_type = 'deposit'
 
     def __init__(self, url: str, deposit_id: str):
         """Constructor
 
         Args:
             url: Origin url to associate the artifacts/metadata to
             deposit_id: Deposit identity
 
         """
         super().__init__(url=url)
 
-        # For now build back existing api urls
-        # archive_url: Private api url to retrieve archive artifact
-        self.archive_url = '/%s/raw/' % deposit_id
-        # metadata_url: Private api url to retrieve the deposit metadata
-        self.metadata_url = '/%s/meta/' % deposit_id
-        # deposit_update_url: Private api to push pids and status update on
-        #     the deposit id
-        self.deposit_update_url = '/%s/update/' % deposit_id
-        self.client = ApiClient()
+        config_deposit = self.config['deposit']
+        self.deposit_id = deposit_id
+        self.client = ApiClient(url=config_deposit['url'],
+                                auth=config_deposit['auth'])
         self._metadata = None
 
     @property
     def metadata(self):
         if self._metadata is None:
-            self._metadata = self.client.metadata_get(self.metadata_url)
+            self._metadata = self.client.metadata_get(self.deposit_id)
         return self._metadata
 
     def get_versions(self) -> Sequence[str]:
         # only 1 branch 'HEAD' with no alias since we only have 1 snapshot
         # branch
         return ['HEAD']
 
     def get_package_info(self, version: str) -> Generator[
             Tuple[str, Mapping[str, Any]], None, None]:
         p_info = {
-            'url': self.client.base_url + self.archive_url,
             'filename': 'archive.zip',
             'raw': self.metadata,
         }
         yield 'HEAD', p_info
 
+    def download_package(self, p_info: Mapping[str, Any],
+                         tmpdir: str) -> List[Tuple[str, Mapping]]:
+        """Override to allow use of the dedicated deposit client
+
+        """
+        return [self.client.archive_get(
+            self.deposit_id, tmpdir, p_info['filename'])]
+
     def build_revision(
             self, a_metadata: Dict, uncompressed_path: str) -> Dict:
         revision = a_metadata.pop('revision')
         metadata = {
             'extrinsic': {
-                'provider': '%s/%s' % (
-                    self.client.base_url, self.metadata_url),
+                'provider': self.client.metadata_url(self.deposit_id),
                 'when': self.visit_date.isoformat(),
                 'raw': a_metadata,
             },
         }
 
         # FIXME: the deposit no longer needs to build the revision
         revision['metadata'].update(metadata)
         revision['author'] = parse_author(revision['author'])
         revision['committer'] = parse_author(revision['committer'])
         revision['message'] = revision['message'].encode('utf-8')
         revision['type'] = 'tar'
         return revision
 
     def load(self) -> Dict:
         # Usual loading
         r = super().load()
         success = r['status'] != 'failed'
 
         if success:
             # Update archive with metadata information
             origin_metadata = self.metadata['origin_metadata']
 
             logger.debug('origin_metadata: %s', origin_metadata)
             tools = self.storage.tool_add([origin_metadata['tool']])
             logger.debug('tools: %s', tools)
             tool_id = tools[0]['id']
 
             provider = origin_metadata['provider']
             # FIXME: Shall we delete this info?
             provider_id = self.storage.metadata_provider_add(
                 provider['provider_name'],
                 provider['provider_type'],
                 provider['provider_url'],
                 metadata=None)
 
             metadata = origin_metadata['metadata']
             self.storage.origin_metadata_add(
                 self.url, self.visit_date, provider_id, tool_id, metadata)
 
         # Update deposit status
         try:
             if not success:
-                self.client.status_update(
-                    self.deposit_update_url, status='failed')
+                self.client.status_update(self.deposit_id, status='failed')
                 return r
 
             snapshot_id = hash_to_bytes(r['snapshot_id'])
             branches = self.storage.snapshot_get(snapshot_id)['branches']
             logger.debug('branches: %s', branches)
             if not branches:
                 return r
             rev_id = branches[b'HEAD']['target']
 
             revision = next(self.storage.revision_get([rev_id]))
 
             # Retrieve the revision identifier
             dir_id = revision['directory']
 
             # update the deposit's status to success with its
             # revision-id and directory-id
             self.client.status_update(
-                self.deposit_update_url,
+                self.deposit_id,
                 status='done',
                 revision_id=hash_to_hex(rev_id),
                 directory_id=hash_to_hex(dir_id),
                 origin_url=self.url)
         except Exception:
             logger.exception(
                 'Problem when trying to update the deposit\'s status')
             return {'status': 'failed'}
 
         return r
 
 
 def parse_author(author):
     """See prior fixme
 
     """
     return {
         'fullname': author['fullname'].encode('utf-8'),
         'name': author['name'].encode('utf-8'),
         'email': author['email'].encode('utf-8'),
     }
+
+
+class ApiClient:
+    """Private Deposit Api client
+
+    """
+    def __init__(self, url, auth: Optional[Mapping[str, str]]):
+        self.base_url = url.rstrip('/')
+        self.auth = None if not auth else (auth['username'], auth['password'])
+
+    def do(self, method: str, url: str, *args, **kwargs):
+        """Internal method to deal with requests, possibly with basic http
+           authentication.
+
+        Args:
+            method (str): supported http methods as in get/post/put
+
+        Returns:
+            The request's execution output
+
+        """
+        method_fn = getattr(requests, method)
+        if self.auth:
+            kwargs['auth'] = self.auth
+        return method_fn(url, *args, **kwargs)
+
+    def archive_get(
+            self, deposit_id: Union[int, str], tmpdir: str,
+            filename: str) -> Tuple[str, Dict]:
+        """Retrieve deposit's archive artifact locally
+
+        """
+        url = f'{self.base_url}/{deposit_id}/raw/'
+        return download(url, dest=tmpdir, filename=filename, auth=self.auth)
+
+    def metadata_url(self, deposit_id: Union[int, str]) -> str:
+        return f'{self.base_url}/{deposit_id}/meta/'
+
+    def metadata_get(self, deposit_id: Union[int, str]) -> Dict[str, Any]:
+        """Retrieve deposit's metadata artifact as json
+
+        """
+        url = self.metadata_url(deposit_id)
+        r = self.do('get', url)
+        if r.ok:
+            return r.json()
+
+        msg = f'Problem when retrieving deposit metadata at {url}'
+        logger.error(msg)
+        raise ValueError(msg)
+
+    def status_update(self, deposit_id: Union[int, str], status: str,
+                      revision_id: Optional[str] = None,
+                      directory_id: Optional[str] = None,
+                      origin_url: Optional[str] = None):
+        """Update the deposit's status and the persistent identifiers
+           resulting from the loading.
+
+        """
+        url = f'{self.base_url}/{deposit_id}/update/'
+        payload = {'status': status}
+        if revision_id:
+            payload['revision_id'] = revision_id
+        if directory_id:
+            payload['directory_id'] = directory_id
+        if origin_url:
+            payload['origin_url'] = origin_url
+
+        self.do('put', url, json=payload)
""" os.environ['http_proxy'] = 'http://localhost:999' os.environ['https_proxy'] = 'http://localhost:999' @pytest.fixture(scope='session') # type: ignore # expected redefinition def celery_includes(): return [ 'swh.loader.package.tasks', ] diff --git a/swh/loader/package/tests/test_deposit.py b/swh/loader/package/tests/test_deposit.py index 9b157cd..9f00c41 100644 --- a/swh/loader/package/tests/test_deposit.py +++ b/swh/loader/package/tests/test_deposit.py @@ -1,213 +1,208 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import re from swh.model.hashutil import hash_to_bytes from swh.loader.package.deposit import DepositLoader from swh.loader.package.tests.common import ( check_snapshot, check_metadata_paths, get_stats ) from swh.core.pytest_plugin import requests_mock_datadir_factory -def test_deposit_init_ok(swh_config): +def test_deposit_init_ok(swh_config, swh_loader_config): url = 'some-url' deposit_id = 999 loader = DepositLoader(url, deposit_id) # Something that does not exist assert loader.url == url - assert loader.archive_url == '/%s/raw/' % deposit_id - assert loader.metadata_url == '/%s/meta/' % deposit_id - assert loader.deposit_update_url == '/%s/update/' % deposit_id assert loader.client is not None + assert loader.client.base_url == swh_loader_config['deposit']['url'] def test_deposit_loading_failure_to_fetch_metadata(swh_config): """Error during fetching artifact ends us with failed/partial visit """ # private api url form: 'https://deposit.s.o/1/private/hal/666/raw/' url = 'some-url' unknown_deposit_id = 666 loader = DepositLoader(url, unknown_deposit_id) # does not exist actual_load_status = loader.load() assert actual_load_status == {'status': 'failed'} stats = get_stats(loader.storage) assert { 'content': 0, 'directory': 0, 'origin': 1, 'origin_visit': 1, 'person': 0, 'release': 0, 'revision': 0, 'skipped_content': 0, 'snapshot': 0, } == stats origin_visit = next(loader.storage.origin_visit_get(url)) assert origin_visit['status'] == 'partial' assert origin_visit['type'] == 'deposit' requests_mock_datadir_missing_one = requests_mock_datadir_factory(ignore_urls=[ 'https://deposit.softwareheritage.org/1/private/666/raw/', ]) def test_deposit_loading_failure_to_retrieve_1_artifact( swh_config, requests_mock_datadir_missing_one): """Deposit with missing artifact ends up with an uneventful/partial visit """ # private api url form: 'https://deposit.s.o/1/private/hal/666/raw/' url = 'some-url-2' deposit_id = 666 loader = DepositLoader(url, deposit_id) - assert loader.archive_url actual_load_status = loader.load() assert actual_load_status['status'] == 'uneventful' assert actual_load_status['snapshot_id'] is not None stats = get_stats(loader.storage) assert { 'content': 0, 'directory': 0, 'origin': 1, 'origin_visit': 1, 'person': 0, 'release': 0, 'revision': 0, 'skipped_content': 0, 'snapshot': 1, } == stats origin_visit = next(loader.storage.origin_visit_get(url)) assert origin_visit['status'] == 'partial' assert origin_visit['type'] == 'deposit' def test_revision_metadata_structure(swh_config, requests_mock_datadir): # do not care for deposit update query requests_mock_datadir.put(re.compile('https')) url = 'https://hal-test.archives-ouvertes.fr/some-external-id' deposit_id = 666 loader = DepositLoader(url, deposit_id) - assert loader.archive_url actual_load_status = loader.load() 
diff --git a/swh/loader/package/tests/test_deposit.py b/swh/loader/package/tests/test_deposit.py
index 9b157cd..9f00c41 100644
--- a/swh/loader/package/tests/test_deposit.py
+++ b/swh/loader/package/tests/test_deposit.py
@@ -1,213 +1,208 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import re
 
 from swh.model.hashutil import hash_to_bytes
 from swh.loader.package.deposit import DepositLoader
 from swh.loader.package.tests.common import (
     check_snapshot, check_metadata_paths, get_stats
 )
 
 from swh.core.pytest_plugin import requests_mock_datadir_factory
 
 
-def test_deposit_init_ok(swh_config):
+def test_deposit_init_ok(swh_config, swh_loader_config):
     url = 'some-url'
     deposit_id = 999
     loader = DepositLoader(url, deposit_id)  # Something that does not exist
 
     assert loader.url == url
-    assert loader.archive_url == '/%s/raw/' % deposit_id
-    assert loader.metadata_url == '/%s/meta/' % deposit_id
-    assert loader.deposit_update_url == '/%s/update/' % deposit_id
     assert loader.client is not None
+    assert loader.client.base_url == swh_loader_config['deposit']['url']
 
 
 def test_deposit_loading_failure_to_fetch_metadata(swh_config):
     """Error during metadata fetching ends up with a failed/partial visit
 
     """
     # private api url form: 'https://deposit.s.o/1/private/hal/666/raw/'
     url = 'some-url'
     unknown_deposit_id = 666
     loader = DepositLoader(url, unknown_deposit_id)  # does not exist
 
     actual_load_status = loader.load()
     assert actual_load_status == {'status': 'failed'}
 
     stats = get_stats(loader.storage)
 
     assert {
         'content': 0,
         'directory': 0,
         'origin': 1,
         'origin_visit': 1,
         'person': 0,
         'release': 0,
         'revision': 0,
         'skipped_content': 0,
         'snapshot': 0,
     } == stats
 
     origin_visit = next(loader.storage.origin_visit_get(url))
     assert origin_visit['status'] == 'partial'
     assert origin_visit['type'] == 'deposit'
 
 
 requests_mock_datadir_missing_one = requests_mock_datadir_factory(ignore_urls=[
     'https://deposit.softwareheritage.org/1/private/666/raw/',
 ])
 
 
 def test_deposit_loading_failure_to_retrieve_1_artifact(
         swh_config, requests_mock_datadir_missing_one):
     """Deposit with missing artifact ends up with an uneventful/partial visit
 
     """
     # private api url form: 'https://deposit.s.o/1/private/hal/666/raw/'
     url = 'some-url-2'
     deposit_id = 666
     loader = DepositLoader(url, deposit_id)
 
     actual_load_status = loader.load()
     assert actual_load_status['status'] == 'uneventful'
     assert actual_load_status['snapshot_id'] is not None
 
     stats = get_stats(loader.storage)
     assert {
         'content': 0,
         'directory': 0,
         'origin': 1,
         'origin_visit': 1,
         'person': 0,
         'release': 0,
         'revision': 0,
         'skipped_content': 0,
         'snapshot': 1,
     } == stats
 
     origin_visit = next(loader.storage.origin_visit_get(url))
     assert origin_visit['status'] == 'partial'
     assert origin_visit['type'] == 'deposit'
 
 
 def test_revision_metadata_structure(swh_config, requests_mock_datadir):
     # do not care for deposit update query
     requests_mock_datadir.put(re.compile('https'))
 
     url = 'https://hal-test.archives-ouvertes.fr/some-external-id'
     deposit_id = 666
     loader = DepositLoader(url, deposit_id)
 
     actual_load_status = loader.load()
     assert actual_load_status['status'] == 'eventful'
     assert actual_load_status['snapshot_id'] is not None
 
     expected_revision_id = hash_to_bytes(
         '9471c606239bccb1f269564c9ea114e1eeab9eb4')
     revision = list(loader.storage.revision_get([expected_revision_id]))[0]
 
     assert revision is not None
 
     check_metadata_paths(revision['metadata'], paths=[
         ('extrinsic.provider', str),
         ('extrinsic.when', str),
         ('extrinsic.raw', dict),
         ('original_artifact', list),
     ])
 
     for original_artifact in revision['metadata']['original_artifact']:
         check_metadata_paths(original_artifact, paths=[
             ('filename', str),
             ('length', int),
             ('checksums', dict),
         ])
 
 
 def test_deposit_loading_ok(swh_config, requests_mock_datadir):
     requests_mock_datadir.put(re.compile('https'))  # do not care for put
 
     url = 'https://hal-test.archives-ouvertes.fr/some-external-id'
     deposit_id = 666
     loader = DepositLoader(url, deposit_id)
 
     actual_load_status = loader.load()
     expected_snapshot_id = '453f455d0efb69586143cd6b6e5897f9906b53a7'
     assert actual_load_status == {
         'status': 'eventful',
         'snapshot_id': expected_snapshot_id,
     }
 
     stats = get_stats(loader.storage)
     assert {
         'content': 303,
         'directory': 12,
         'origin': 1,
         'origin_visit': 1,
         'person': 1,
         'release': 0,
         'revision': 1,
         'skipped_content': 0,
         'snapshot': 1,
     } == stats
 
     origin_visit = next(loader.storage.origin_visit_get(url))
     assert origin_visit['status'] == 'full'
     assert origin_visit['type'] == 'deposit'
 
     expected_branches = {
         'HEAD': {
             'target': '9471c606239bccb1f269564c9ea114e1eeab9eb4',
             'target_type': 'revision',
         },
     }
 
     expected_snapshot = {
         'id': expected_snapshot_id,
         'branches': expected_branches,
     }
     check_snapshot(expected_snapshot, storage=loader.storage)
 
     # check metadata
 
     tool = {
         "name": "swh-deposit",
         "version": "0.0.1",
         "configuration": {
             "sword_version": "2",
         }
     }
 
     tool = loader.storage.tool_get(tool)
     assert tool is not None
     assert tool['id'] is not None
 
     provider = {
         "provider_name": "hal",
         "provider_type": "deposit_client",
         "provider_url": "https://hal-test.archives-ouvertes.fr/",
         "metadata": None,
     }
 
     provider = loader.storage.metadata_provider_get_by(provider)
     assert provider is not None
     assert provider['id'] is not None
 
     metadata = list(loader.storage.origin_metadata_get_by(
         url, provider_type='deposit_client'))
     assert metadata is not None
     assert isinstance(metadata, list)
     assert len(metadata) == 1
     metadata0 = metadata[0]
 
     assert metadata0['provider_id'] == provider['id']
     assert metadata0['provider_type'] == 'deposit_client'
     assert metadata0['tool_id'] == tool['id']
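The tests above rely on the `requests_mock_datadir` fixtures to stub http responses, but none of them asserts that the client actually sends the basic-auth credentials configured in `swh_loader_config`. A hypothetical extra test (not part of this diff) could check that with a bare `requests_mock.Mocker`:

```python
import requests_mock

from swh.loader.package.deposit import ApiClient


def test_client_sends_basic_auth():
    client = ApiClient(url='https://deposit.test/1/private',
                       auth={'username': 'user', 'password': 'pass'})
    with requests_mock.Mocker() as m:
        m.get('https://deposit.test/1/private/42/meta/', json={})
        client.metadata_get(42)
    # requests encodes the (username, password) tuple as a Basic auth header
    assert m.last_request.headers['Authorization'].startswith('Basic ')
```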
diff --git a/swh/loader/package/utils.py b/swh/loader/package/utils.py
index 3dedc2b..1a95fb8 100644
--- a/swh/loader/package/utils.py
+++ b/swh/loader/package/utils.py
@@ -1,111 +1,117 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import copy
 import logging
 import os
 import requests
 
 from typing import Dict, Optional, Tuple
 
 from swh.model.hashutil import MultiHash, HASH_BLOCK_SIZE
 from swh.loader.package import DEFAULT_PARAMS
 
 logger = logging.getLogger(__name__)
 
 
 def api_info(url: str) -> Dict:
     """Basic api client to retrieve information on project. This deals with
        fetching json metadata about pypi projects.
 
     Args:
         url (str): The api url (e.g PyPI, npm, etc...)
 
     Raises:
         ValueError in case of query failures (for some reasons: 404, ...)
 
     Returns:
         The associated response's information dict
 
     """
     response = requests.get(url, **DEFAULT_PARAMS)
     if response.status_code != 200:
         raise ValueError("Fail to query '%s'. Reason: %s" % (
             url, response.status_code))
     return response.json()
 
 
 def download(url: str, dest: str, hashes: Dict = {},
-             filename: Optional[str] = None) -> Tuple[str, Dict]:
+             filename: Optional[str] = None,
+             auth: Optional[Tuple[str, str]] = None) -> Tuple[str, Dict]:
     """Download a remote tarball from url, uncompress it and compute swh
        hashes on it.
 
     Args:
         url: Artifact uri to fetch, uncompress and hash
         dest: Directory to write the archive to
-
         hashes: Dict of expected hashes (key is the hash algo) for the artifact
             to download (those hashes are expected to be hex string)
+        auth: Optional tuple of login/password (for http authentication
+            service, e.g. deposit)
 
     Raises:
         ValueError in case of any error when fetching/computing (length,
         checksums mismatched...)
 
     Returns:
         Tuple of local (filepath, hashes of filepath)
 
     """
-    response = requests.get(url, **DEFAULT_PARAMS, stream=True)
+    params = copy.deepcopy(DEFAULT_PARAMS)
+    if auth is not None:
+        params['auth'] = auth
+    response = requests.get(url, **params, stream=True)
     logger.debug('headers: %s', response.headers)
     if response.status_code != 200:
         raise ValueError("Fail to query '%s'. Reason: %s" % (
             url, response.status_code))
     length = int(response.headers['content-length'])
 
     filename = filename if filename else os.path.basename(url)
     logger.debug('filename: %s', filename)
     filepath = os.path.join(dest, filename)
     logger.debug('filepath: %s', filepath)
 
     h = MultiHash(length=length)
     with open(filepath, 'wb') as f:
         for chunk in response.iter_content(chunk_size=HASH_BLOCK_SIZE):
             h.update(chunk)
             f.write(chunk)
 
     actual_length = os.path.getsize(filepath)
     if length != actual_length:
         raise ValueError('Error when checking size: %s != %s' % (
             length, actual_length))
 
     # Also check the expected hashes if provided
     if hashes:
         actual_hashes = h.hexdigest()
         for algo_hash in hashes.keys():
             actual_digest = actual_hashes[algo_hash]
             expected_digest = hashes[algo_hash]
             if actual_digest != expected_digest:
                 raise ValueError(
                     'Failure when fetching %s. '
                     'Checksum mismatched: %s != %s' % (
                         url, expected_digest, actual_digest))
 
     extrinsic_metadata = {
         'length': length,
         'filename': filename,
         'checksums': {
             **h.hexdigest()
         },
     }
     logger.debug('extrinsic_metadata: %s', extrinsic_metadata)
 
     return filepath, extrinsic_metadata
 
 
 def release_name(version: str, filename: Optional[str] = None) -> str:
     if filename:
         return 'releases/%s/%s' % (version, filename)
     return 'releases/%s' % version
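With the new `auth` parameter, `download` can fetch access-protected artifacts while still enforcing the length and checksum checks; note that `DEFAULT_PARAMS` is deep-copied before the credentials are injected, so the module-level dict is never mutated. A short usage sketch (url and credentials are placeholders):

```python
from swh.loader.package.utils import download

filepath, info = download(
    'https://deposit.softwareheritage.org/1/private/666/raw/',
    dest='/tmp',
    filename='archive.zip',
    auth=('user', 'pass'),  # forwarded to requests.get as basic http auth
)
# info holds 'length', 'filename' and a 'checksums' dict (hex digests)
```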