diff --git a/swh/loader/package/deposit/tests/test_deposit.py b/swh/loader/package/deposit/tests/test_deposit.py index 181415c..ed540f9 100644 --- a/swh/loader/package/deposit/tests/test_deposit.py +++ b/swh/loader/package/deposit/tests/test_deposit.py @@ -1,565 +1,565 @@ # Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import json import re import pytest from swh.core.pytest_plugin import requests_mock_datadir_factory from swh.loader.package.deposit.loader import ApiClient, DepositLoader from swh.loader.package.loader import now from swh.loader.tests import assert_last_visit_matches, check_snapshot, get_stats from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.model.model import ( Origin, Person, RawExtrinsicMetadata, Release, Snapshot, SnapshotBranch, TargetType, TimestampWithTimezone, ) from swh.model.model import MetadataAuthority, MetadataAuthorityType, MetadataFetcher from swh.model.model import ObjectType as ModelObjectType from swh.model.swhids import CoreSWHID, ExtendedObjectType, ExtendedSWHID, ObjectType DEPOSIT_URL = "https://deposit.softwareheritage.org/1/private" @pytest.fixture def requests_mock_datadir(requests_mock_datadir): """Enhance default mock data to mock put requests as the loader does some internal update queries there. 
""" requests_mock_datadir.put(re.compile("https")) return requests_mock_datadir def test_deposit_init_ok(swh_storage, deposit_client, swh_loader_config): url = "some-url" deposit_id = 999 loader = DepositLoader( swh_storage, url, deposit_id, deposit_client, default_filename="archive.zip" ) # Something that does not exist assert loader.origin.url == url assert loader.client is not None assert loader.client.base_url == swh_loader_config["deposit"]["url"] def test_deposit_from_configfile(swh_config): """Ensure the deposit instantiation is ok""" loader = DepositLoader.from_configfile( url="some-url", deposit_id="666", default_filename="archive.zip" ) assert isinstance(loader.client, ApiClient) def test_deposit_loading_unknown_deposit( swh_storage, deposit_client, requests_mock_datadir ): """Loading an unknown deposit should fail no origin, no visit, no snapshot """ # private api url form: 'https://deposit.s.o/1/private/hal/666/raw/' url = "some-url" unknown_deposit_id = 667 loader = DepositLoader( swh_storage, url, unknown_deposit_id, deposit_client, default_filename="archive.zip", ) # does not exist actual_load_status = loader.load() assert actual_load_status == {"status": "failed"} stats = get_stats(loader.storage) assert { "content": 0, "directory": 0, "origin": 0, "origin_visit": 0, "release": 0, "revision": 0, "skipped_content": 0, "snapshot": 0, } == stats requests_mock_datadir_missing_one = requests_mock_datadir_factory( ignore_urls=[ f"{DEPOSIT_URL}/666/raw/", ] ) def test_deposit_loading_failure_to_retrieve_1_artifact( swh_storage, deposit_client, requests_mock_datadir_missing_one ): """Deposit with missing artifact ends up with an uneventful/partial visit""" # private api url form: 'https://deposit.s.o/1/private/hal/666/raw/' url = "some-url-2" deposit_id = 666 requests_mock_datadir_missing_one.put(re.compile("https")) loader = DepositLoader( swh_storage, url, deposit_id, deposit_client, default_filename="archive.zip" ) actual_load_status = loader.load() 
assert actual_load_status["status"] == "uneventful" assert actual_load_status["snapshot_id"] is not None assert_last_visit_matches(loader.storage, url, status="partial", type="deposit") stats = get_stats(loader.storage) assert { "content": 0, "directory": 0, "origin": 1, "origin_visit": 1, "release": 0, "revision": 0, "skipped_content": 0, "snapshot": 1, } == stats # Retrieve the information for deposit status update query to the deposit urls = [ m for m in requests_mock_datadir_missing_one.request_history if m.url == f"{DEPOSIT_URL}/{deposit_id}/update/" ] assert len(urls) == 1 update_query = urls[0] body = update_query.json() expected_body = { "status": "failed", "status_detail": { "loading": [ - "Failed to load branch HEAD for some-url-2: Fail to query " - "'https://deposit.softwareheritage.org/1/private/666/raw/'. Reason: 404" + "Failed to load branch HEAD for some-url-2: 404 Client Error: None " + "for url: https://deposit.softwareheritage.org/1/private/666/raw/" ] }, } assert body == expected_body def test_deposit_loading_ok(swh_storage, deposit_client, requests_mock_datadir): url = "https://hal-test.archives-ouvertes.fr/some-external-id" deposit_id = 666 loader = DepositLoader( swh_storage, url, deposit_id, deposit_client, default_filename="archive.zip" ) actual_load_status = loader.load() expected_snapshot_id = "338b45d87e02fb5cbf324694bc4a898623d6a30f" assert actual_load_status == { "status": "eventful", "snapshot_id": expected_snapshot_id, } assert_last_visit_matches( loader.storage, url, status="full", type="deposit", snapshot=hash_to_bytes(expected_snapshot_id), ) release_id_hex = "2566a64a27bc00362e265be9666d7606750530a1" release_id = hash_to_bytes(release_id_hex) expected_snapshot = Snapshot( id=hash_to_bytes(expected_snapshot_id), branches={ b"HEAD": SnapshotBranch( target=release_id, target_type=TargetType.RELEASE, ), }, ) check_snapshot(expected_snapshot, storage=loader.storage) release = loader.storage.release_get([release_id])[0] date = 
TimestampWithTimezone.from_datetime( datetime.datetime(2017, 10, 7, 15, 17, 8, tzinfo=datetime.timezone.utc) ) person = Person( fullname=b"Software Heritage", name=b"Software Heritage", email=b"robot@softwareheritage.org", ) assert release == Release( id=release_id, name=b"HEAD", message=b"hal: Deposit 666 in collection hal\n", author=person, date=date, target_type=ModelObjectType.DIRECTORY, target=b"\xfd-\xf1-\xc5SL\x1d\xa1\xe9\x18\x0b\x91Q\x02\xfbo`\x1d\x19", synthetic=True, metadata=None, ) # check metadata fetcher = MetadataFetcher( name="swh-deposit", version="0.0.1", ) authority = MetadataAuthority( type=MetadataAuthorityType.DEPOSIT_CLIENT, url="https://hal-test.archives-ouvertes.fr/", ) # Check origin metadata orig_meta = loader.storage.raw_extrinsic_metadata_get( Origin(url).swhid(), authority ) assert orig_meta.next_page_token is None raw_meta = loader.client.metadata_get(deposit_id) raw_metadata: str = raw_meta["raw_metadata"] # 2 raw metadata xml + 1 json dict assert len(orig_meta.results) == 2 orig_meta0 = orig_meta.results[0] assert orig_meta0.authority == authority assert orig_meta0.fetcher == fetcher # Check directory metadata assert release.target_type == ModelObjectType.DIRECTORY directory_swhid = CoreSWHID( object_type=ObjectType.DIRECTORY, object_id=release.target ) actual_dir_meta = loader.storage.raw_extrinsic_metadata_get( directory_swhid, authority ) assert actual_dir_meta.next_page_token is None assert len(actual_dir_meta.results) == 1 dir_meta = actual_dir_meta.results[0] assert dir_meta.authority == authority assert dir_meta.fetcher == fetcher assert dir_meta.metadata.decode() == raw_metadata # Retrieve the information for deposit status update query to the deposit urls = [ m for m in requests_mock_datadir.request_history if m.url == f"{DEPOSIT_URL}/{deposit_id}/update/" ] assert len(urls) == 1 update_query = urls[0] body = update_query.json() expected_body = { "status": "done", "release_id": release_id_hex, "directory_id": 
hash_to_hex(release.target), "snapshot_id": expected_snapshot_id, "origin_url": url, } assert body == expected_body stats = get_stats(loader.storage) assert { "content": 303, "directory": 12, "origin": 1, "origin_visit": 1, "release": 1, "revision": 0, "skipped_content": 0, "snapshot": 1, } == stats def test_deposit_loading_ok_2(swh_storage, deposit_client, requests_mock_datadir): """Field dates should be set appropriately""" external_id = "some-external-id" url = f"https://hal-test.archives-ouvertes.fr/{external_id}" deposit_id = 777 loader = DepositLoader( swh_storage, url, deposit_id, deposit_client, default_filename="archive.zip" ) actual_load_status = loader.load() expected_snapshot_id = "3449b8ff31abeacefd33cca60e3074c1649dc3a1" assert actual_load_status == { "status": "eventful", "snapshot_id": expected_snapshot_id, } assert_last_visit_matches( loader.storage, url, status="full", type="deposit", snapshot=hash_to_bytes(expected_snapshot_id), ) release_id = "ba6c9a59ae3256e765d32b211cc183dc2380aed7" expected_snapshot = Snapshot( id=hash_to_bytes(expected_snapshot_id), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes(release_id), target_type=TargetType.RELEASE ) }, ) check_snapshot(expected_snapshot, storage=loader.storage) raw_meta = loader.client.metadata_get(deposit_id) # Ensure the date fields are set appropriately in the release # Retrieve the release release = loader.storage.release_get([hash_to_bytes(release_id)])[0] assert release # swh-deposit uses the numeric 'offset_minutes' instead of the bytes offset # attribute, because its dates are always well-formed, and it can only send # JSON-serializable data. 
release_date_dict = { "timestamp": release.date.timestamp.to_dict(), "offset": release.date.offset_minutes(), } assert release_date_dict == raw_meta["deposit"]["author_date"] assert not release.metadata provider = { "provider_name": "hal", "provider_type": "deposit_client", "provider_url": "https://hal-test.archives-ouvertes.fr/", "metadata": None, } tool = { "name": "swh-deposit", "version": "0.0.1", "configuration": {"sword_version": "2"}, } fetcher = MetadataFetcher( name="swh-deposit", version="0.0.1", ) authority = MetadataAuthority( type=MetadataAuthorityType.DEPOSIT_CLIENT, url="https://hal-test.archives-ouvertes.fr/", ) # Check the origin metadata swh side origin_extrinsic_metadata = loader.storage.raw_extrinsic_metadata_get( Origin(url).swhid(), authority ) assert origin_extrinsic_metadata.next_page_token is None raw_metadata: str = raw_meta["raw_metadata"] # 1 raw metadata xml + 1 json dict assert len(origin_extrinsic_metadata.results) == 2 origin_swhid = Origin(url).swhid() expected_metadata = [] origin_meta = origin_extrinsic_metadata.results[0] expected_metadata.append( RawExtrinsicMetadata( target=origin_swhid, discovery_date=origin_meta.discovery_date, metadata=raw_metadata.encode(), format="sword-v2-atom-codemeta-v2", authority=authority, fetcher=fetcher, ) ) origin_metadata = { "metadata": [raw_metadata], "provider": provider, "tool": tool, } expected_metadata.append( RawExtrinsicMetadata( target=origin_swhid, discovery_date=origin_extrinsic_metadata.results[-1].discovery_date, metadata=json.dumps(origin_metadata).encode(), format="original-artifacts-json", authority=authority, fetcher=fetcher, ) ) assert sorted(origin_extrinsic_metadata.results) == sorted(expected_metadata) # Check the release metadata swh side assert release.target_type == ModelObjectType.DIRECTORY directory_swhid = ExtendedSWHID( object_type=ExtendedObjectType.DIRECTORY, object_id=release.target ) actual_directory_metadata = loader.storage.raw_extrinsic_metadata_get( 
directory_swhid, authority ) assert actual_directory_metadata.next_page_token is None assert len(actual_directory_metadata.results) == 1 release_swhid = CoreSWHID( object_type=ObjectType.RELEASE, object_id=hash_to_bytes(release_id) ) dir_metadata_template = RawExtrinsicMetadata( target=directory_swhid, format="sword-v2-atom-codemeta-v2", authority=authority, fetcher=fetcher, origin=url, release=release_swhid, # to satisfy the constructor discovery_date=now(), metadata=b"", ) expected_directory_metadata = [] dir_metadata = actual_directory_metadata.results[0] expected_directory_metadata.append( RawExtrinsicMetadata.from_dict( { **{ k: v for (k, v) in dir_metadata_template.to_dict().items() if k != "id" }, "discovery_date": dir_metadata.discovery_date, "metadata": raw_metadata.encode(), } ) ) assert sorted(actual_directory_metadata.results) == sorted( expected_directory_metadata ) # Retrieve the information for deposit status update query to the deposit urls = [ m for m in requests_mock_datadir.request_history if m.url == f"{DEPOSIT_URL}/{deposit_id}/update/" ] assert len(urls) == 1 update_query = urls[0] body = update_query.json() expected_body = { "status": "done", "release_id": release_id, "directory_id": hash_to_hex(release.target), "snapshot_id": expected_snapshot_id, "origin_url": url, } assert body == expected_body def test_deposit_loading_ok_3(swh_storage, deposit_client, requests_mock_datadir): """Deposit loading can happen on tarball artifacts as well The latest deposit changes introduce the internal change. 
""" external_id = "hal-123456" url = f"https://hal-test.archives-ouvertes.fr/{external_id}" deposit_id = 888 loader = DepositLoader(swh_storage, url, deposit_id, deposit_client) actual_load_status = loader.load() expected_snapshot_id = "4677843de89e398f1d6bfedc9ca9b89c451c55c8" assert actual_load_status == { "status": "eventful", "snapshot_id": expected_snapshot_id, } assert_last_visit_matches( loader.storage, url, status="full", type="deposit", snapshot=hash_to_bytes(expected_snapshot_id), ) def test_deposit_loading_ok_release_notes( swh_storage, deposit_client, requests_mock_datadir ): url = "https://hal-test.archives-ouvertes.fr/some-external-id" deposit_id = 999 loader = DepositLoader( swh_storage, url, deposit_id, deposit_client, default_filename="archive.zip" ) actual_load_status = loader.load() expected_snapshot_id = "a307acffb7c29bebb3daf1bcb680bb3f452890a8" assert actual_load_status == { "status": "eventful", "snapshot_id": expected_snapshot_id, } assert_last_visit_matches( loader.storage, url, status="full", type="deposit", snapshot=hash_to_bytes(expected_snapshot_id), ) release_id_hex = "f5e8ec02ede57edbe061afa7fc2a07bb7d14a700" release_id = hash_to_bytes(release_id_hex) expected_snapshot = Snapshot( id=hash_to_bytes(expected_snapshot_id), branches={ b"HEAD": SnapshotBranch( target=release_id, target_type=TargetType.RELEASE, ), }, ) check_snapshot(expected_snapshot, storage=loader.storage) release = loader.storage.release_get([release_id])[0] date = TimestampWithTimezone.from_datetime( datetime.datetime(2017, 10, 7, 15, 17, 8, tzinfo=datetime.timezone.utc) ) person = Person( fullname=b"Software Heritage", name=b"Software Heritage", email=b"robot@softwareheritage.org", ) assert release == Release( id=release_id, name=b"HEAD", message=( b"hal: Deposit 999 in collection hal\n\nThis release adds this and that.\n" ), author=person, date=date, target_type=ModelObjectType.DIRECTORY, target=b"\xfd-\xf1-\xc5SL\x1d\xa1\xe9\x18\x0b\x91Q\x02\xfbo`\x1d\x19", 
synthetic=True, metadata=None, ) diff --git a/swh/loader/package/tests/test_utils.py b/swh/loader/package/tests/test_utils.py index 75373e7..bf1f4da 100644 --- a/swh/loader/package/tests/test_utils.py +++ b/swh/loader/package/tests/test_utils.py @@ -1,236 +1,273 @@ # Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import os from unittest.mock import MagicMock from urllib.error import URLError from urllib.parse import quote import pytest +from requests.exceptions import HTTPError from swh.loader.exception import NotFound import swh.loader.package from swh.loader.package.utils import api_info, download, release_name def test_version_generation(): assert ( swh.loader.package.__version__ != "devel" ), "Make sure swh.loader.core is installed (e.g. pip install -e .)" @pytest.mark.fs def test_download_fail_to_download(tmp_path, requests_mock): url = "https://pypi.org/pypi/arrow/json" status_code = 404 requests_mock.get(url, status_code=status_code) - with pytest.raises(ValueError) as e: + with pytest.raises( + HTTPError, match=f"{status_code} Client Error: None for url: {url}" + ): download(url, tmp_path) - assert e.value.args[0] == "Fail to query '%s'. 
Reason: %s" % (url, status_code) - _filename = "requests-0.0.1.tar.gz" _data = "this is something" def _check_download_ok(url, dest, filename=_filename, hashes={}): actual_filepath, actual_hashes = download(url, dest, hashes=hashes) actual_filename = os.path.basename(actual_filepath) assert actual_filename == filename assert actual_hashes["length"] == len(_data) assert ( actual_hashes["checksums"]["sha1"] == "fdd1ce606a904b08c816ba84f3125f2af44d92b2" ) assert ( actual_hashes["checksums"]["sha256"] == "1d9224378d77925d612c9f926eb9fb92850e6551def8328011b6a972323298d5" ) @pytest.mark.fs def test_download_ok(tmp_path, requests_mock): """Download without issue should provide filename and hashes""" url = f"https://pypi.org/pypi/requests/{_filename}" requests_mock.get(url, text=_data, headers={"content-length": str(len(_data))}) _check_download_ok(url, dest=str(tmp_path)) @pytest.mark.fs def test_download_ok_no_header(tmp_path, requests_mock): """Download without issue should provide filename and hashes""" url = f"https://pypi.org/pypi/requests/{_filename}" requests_mock.get(url, text=_data) # no header information _check_download_ok(url, dest=str(tmp_path)) @pytest.mark.fs def test_download_ok_with_hashes(tmp_path, requests_mock): """Download without issue should provide filename and hashes""" url = f"https://pypi.org/pypi/requests/{_filename}" requests_mock.get(url, text=_data, headers={"content-length": str(len(_data))}) # good hashes for such file good = { "sha1": "fdd1ce606a904b08c816ba84f3125f2af44d92b2", "sha256": "1d9224378d77925d612c9f926eb9fb92850e6551def8328011b6a972323298d5", # noqa } _check_download_ok(url, dest=str(tmp_path), hashes=good) @pytest.mark.fs def test_download_fail_hashes_mismatch(tmp_path, requests_mock): """Mismatch hash after download should raise""" url = f"https://pypi.org/pypi/requests/{_filename}" requests_mock.get(url, text=_data, headers={"content-length": str(len(_data))}) # good hashes for such file good = { "sha1": 
"fdd1ce606a904b08c816ba84f3125f2af44d92b2", "sha256": "1d9224378d77925d612c9f926eb9fb92850e6551def8328011b6a972323298d5", # noqa } for hash_algo in good.keys(): wrong_hash = good[hash_algo].replace("1", "0") expected_hashes = good.copy() expected_hashes[hash_algo] = wrong_hash # set the wrong hash expected_msg = "Failure when fetching %s. " "Checksum mismatched: %s != %s" % ( url, wrong_hash, good[hash_algo], ) with pytest.raises(ValueError, match=expected_msg): download(url, dest=str(tmp_path), hashes=expected_hashes) @pytest.mark.fs def test_ftp_download_ok(tmp_path, mocker): """Download without issue should provide filename and hashes""" url = f"ftp://pypi.org/pypi/requests/{_filename}" cm = MagicMock() cm.getstatus.return_value = 200 cm.read.side_effect = [_data.encode(), b""] cm.__enter__.return_value = cm mocker.patch("swh.loader.package.utils.urlopen").return_value = cm _check_download_ok(url, dest=str(tmp_path)) @pytest.mark.fs def test_ftp_download_ko(tmp_path, mocker): """Download without issue should provide filename and hashes""" filename = "requests-0.0.1.tar.gz" url = "ftp://pypi.org/pypi/requests/%s" % filename mocker.patch("swh.loader.package.utils.urlopen").side_effect = URLError("FTP error") with pytest.raises(URLError): download(url, dest=str(tmp_path)) @pytest.mark.fs def test_download_with_redirection(tmp_path, requests_mock): """Download with redirection should use the targeted URL to extract filename""" url = "https://example.org/project/requests/download" redirection_url = f"https://example.org/project/requests/files/{_filename}" requests_mock.get(url, status_code=302, headers={"location": redirection_url}) requests_mock.get( redirection_url, text=_data, headers={"content-length": str(len(_data))} ) _check_download_ok(url, dest=str(tmp_path)) def test_download_extracting_filename_from_url(tmp_path, requests_mock): """Extracting filename from url must sanitize the filename first""" url = 
"https://example.org/project/requests-0.0.1.tar.gz?a=b&c=d&foo=bar" requests_mock.get( url, status_code=200, text=_data, headers={"content-length": str(len(_data))} ) _check_download_ok(url, dest=str(tmp_path)) @pytest.mark.fs @pytest.mark.parametrize( "filename", [f'"{_filename}"', _filename, '"filename with spaces.tar.gz"'] ) def test_download_filename_from_content_disposition(tmp_path, requests_mock, filename): """Filename should be extracted from content-disposition request header when available.""" url = "https://example.org/download/requests/tar.gz/v0.0.1" requests_mock.get( url, text=_data, headers={ "content-length": str(len(_data)), "content-disposition": f"attachment; filename={filename}", }, ) _check_download_ok(url, dest=str(tmp_path), filename=filename.strip('"')) @pytest.mark.fs @pytest.mark.parametrize("filename", ['"archive école.tar.gz"', "archive_école.tgz"]) def test_download_utf8_filename_from_content_disposition( tmp_path, requests_mock, filename ): """Filename should be extracted from content-disposition request header when available.""" url = "https://example.org/download/requests/tar.gz/v0.0.1" data = "this is something" requests_mock.get( url, text=data, headers={ "content-length": str(len(data)), "content-disposition": f"attachment; filename*=utf-8''{quote(filename)}", }, ) _check_download_ok(url, dest=str(tmp_path), filename=filename.strip('"')) def test_api_info_failure(requests_mock): """Failure to fetch info/release information should raise""" url = "https://pypi.org/pypi/requests/json" status_code = 400 requests_mock.get(url, status_code=status_code) with pytest.raises(NotFound) as e0: api_info(url) assert e0.value.args[0] == "Fail to query '%s'. 
Reason: %s" % (url, status_code) def test_api_info(requests_mock): """Fetching json info from pypi project should be ok""" url = "https://pypi.org/pypi/requests/json" requests_mock.get(url, text='{"version": "0.0.1"}') actual_info = json.loads(api_info(url)) assert actual_info == { "version": "0.0.1", } def test_release_name(): for version, filename, expected_release in [ ("0.0.1", None, "releases/0.0.1"), ("0.0.2", "something", "releases/0.0.2/something"), ]: assert release_name(version, filename) == expected_release + + +@pytest.fixture(autouse=True) +def mock_download_retry_sleep(mocker): + mocker.patch.object(download.retry, "sleep") + + +def test_download_retry(mocker, requests_mock, tmp_path): + url = f"https://example.org/project/requests/files/{_filename}" + + requests_mock.get( + url, + [ + {"status_code": 429}, + {"status_code": 429}, + { + "text": _data, + "headers": {"content-length": str(len(_data))}, + "status_code": 200, + }, + ], + ) + + _check_download_ok(url, dest=str(tmp_path)) + + +def test_download_retry_reraise(mocker, requests_mock, tmp_path): + url = f"https://example.org/project/requests/files/{_filename}" + + requests_mock.get( + url, + [{"status_code": 429}] * 5, + ) + + with pytest.raises(HTTPError): + _check_download_ok(url, dest=str(tmp_path)) diff --git a/swh/loader/package/utils.py b/swh/loader/package/utils.py index 0656eca..df3127c 100644 --- a/swh/loader/package/utils.py +++ b/swh/loader/package/utils.py @@ -1,185 +1,207 @@ # Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy import functools import itertools import logging import os import re from typing import Callable, Dict, Optional, Tuple, TypeVar from urllib.parse import unquote, urlsplit from urllib.request import urlopen import requests +from requests.exceptions 
import HTTPError +from tenacity import retry +from tenacity.before_sleep import before_sleep_log +from tenacity.stop import stop_after_attempt +from tenacity.wait import wait_exponential from swh.loader.exception import NotFound from swh.loader.package import DEFAULT_PARAMS from swh.model.hashutil import HASH_BLOCK_SIZE, MultiHash from swh.model.model import Person logger = logging.getLogger(__name__) DOWNLOAD_HASHES = set(["sha1", "sha256", "length"]) EMPTY_AUTHOR = Person.from_fullname(b"") def api_info(url: str, **extra_params) -> bytes: """Basic api client to retrieve information on project. This deals with fetching json metadata about pypi projects. Args: url (str): The api url (e.g PyPI, npm, etc...) Raises: NotFound in case of query failures (for some reasons: 404, ...) Returns: The associated response's information """ response = requests.get(url, **{**DEFAULT_PARAMS, **extra_params}) if response.status_code != 200: raise NotFound(f"Fail to query '{url}'. Reason: {response.status_code}") return response.content def _content_disposition_filename(header: str) -> Optional[str]: fname = None fnames = re.findall(r"filename[\*]?=([^;]+)", header) if fnames and "utf-8''" in fnames[0].lower(): # RFC 5987 fname = re.sub("utf-8''", "", fnames[0], flags=re.IGNORECASE) fname = unquote(fname) elif fnames: fname = fnames[0] if fname: fname = os.path.basename(fname.strip().strip('"')) return fname +def _retry_if_throttling(retry_state) -> bool: + """Custom tenacity retry predicate for handling HTTP responses with + status code 429 (too many requests). 
+ """ + attempt = retry_state.outcome + if attempt.failed: + exception = attempt.exception() + return ( + isinstance(exception, HTTPError) and exception.response.status_code == 429 + ) + return False + + +@retry( + retry=_retry_if_throttling, + wait=wait_exponential(exp_base=10), + stop=stop_after_attempt(max_attempt_number=5), + before_sleep=before_sleep_log(logger, logging.WARNING), + reraise=True, +) def download( url: str, dest: str, hashes: Dict = {}, filename: Optional[str] = None, auth: Optional[Tuple[str, str]] = None, extra_request_headers: Optional[Dict[str, str]] = None, ) -> Tuple[str, Dict]: """Download a remote tarball from url, uncompresses and computes swh hashes on it. Args: url: Artifact uri to fetch, uncompress and hash dest: Directory to write the archive to hashes: Dict of expected hashes (key is the hash algo) for the artifact to download (those hashes are expected to be hex string) auth: Optional tuple of login/password (for http authentication service, e.g. deposit) Raises: ValueError in case of any error when fetching/computing (length, checksums mismatched...) Returns: Tuple of local (filepath, hashes of filepath) """ params = copy.deepcopy(DEFAULT_PARAMS) if auth is not None: params["auth"] = auth if extra_request_headers is not None: params["headers"].update(extra_request_headers) # so the connection does not hang indefinitely (read/connection timeout) timeout = params.get("timeout", 60) if url.startswith("ftp://"): response = urlopen(url, timeout=timeout) chunks = (response.read(HASH_BLOCK_SIZE) for _ in itertools.count()) response_data = itertools.takewhile(bool, chunks) else: response = requests.get(url, **params, timeout=timeout, stream=True) - if response.status_code != 200: - raise ValueError( - "Fail to query '%s'. 
Reason: %s" % (url, response.status_code) - ) + response.raise_for_status() # update URL to response one as requests follow redirection by default # on GET requests url = response.url # try to extract filename from content-disposition header if available if filename is None and "content-disposition" in response.headers: filename = _content_disposition_filename( response.headers["content-disposition"] ) response_data = response.iter_content(chunk_size=HASH_BLOCK_SIZE) filename = filename if filename else os.path.basename(urlsplit(url).path) logger.debug("filename: %s", filename) filepath = os.path.join(dest, filename) logger.debug("filepath: %s", filepath) h = MultiHash(hash_names=DOWNLOAD_HASHES | set(hashes.keys())) with open(filepath, "wb") as f: for chunk in response_data: h.update(chunk) f.write(chunk) response.close() # Also check the expected hashes if provided if hashes: actual_hashes = h.hexdigest() for algo_hash in hashes.keys(): actual_digest = actual_hashes[algo_hash] expected_digest = hashes[algo_hash] if actual_digest != expected_digest: raise ValueError( "Failure when fetching %s. " "Checksum mismatched: %s != %s" % (url, expected_digest, actual_digest) ) computed_hashes = h.hexdigest() length = computed_hashes.pop("length") extrinsic_metadata = { "length": length, "filename": filename, "checksums": computed_hashes, "url": url, } logger.debug("extrinsic_metadata", extrinsic_metadata) return filepath, extrinsic_metadata def release_name(version: str, filename: Optional[str] = None) -> str: if filename: return "releases/%s/%s" % (version, filename) return "releases/%s" % version TReturn = TypeVar("TReturn") TSelf = TypeVar("TSelf") _UNDEFINED = object() def cached_method(f: Callable[[TSelf], TReturn]) -> Callable[[TSelf], TReturn]: cache_name = f"_cached_{f.__name__}" @functools.wraps(f) def newf(self): value = getattr(self, cache_name, _UNDEFINED) if value is _UNDEFINED: value = f(self) setattr(self, cache_name, value) return value return newf