diff --git a/swh/loader/package/tests/test_utils.py b/swh/loader/package/tests/test_utils.py --- a/swh/loader/package/tests/test_utils.py +++ b/swh/loader/package/tests/test_utils.py @@ -8,6 +8,7 @@ import os from unittest.mock import MagicMock from urllib.error import URLError +from urllib.parse import quote import pytest @@ -171,6 +172,99 @@ download(url, dest=str(tmp_path)) +@pytest.mark.fs +def test_download_with_redirection(tmp_path, requests_mock): + """Download with redirection should use the targeted URL to extract filename""" + url = "https://example.org/project/requests/download" + filename = "requests-0.0.1.tar.gz" + redirection_url = f"https://example.org/project/requests/files/{filename}" + data = "this is something" + + requests_mock.get(url, status_code=302, headers={"location": redirection_url}) + requests_mock.get( + redirection_url, text=data, headers={"content-length": str(len(data))} + ) + + actual_filepath, actual_hashes = download(url, dest=str(tmp_path)) + + actual_filename = os.path.basename(actual_filepath) + assert actual_filename == filename + assert actual_hashes["length"] == len(data) + assert ( + actual_hashes["checksums"]["sha1"] == "fdd1ce606a904b08c816ba84f3125f2af44d92b2" + ) # noqa + assert ( + actual_hashes["checksums"]["sha256"] + == "1d9224378d77925d612c9f926eb9fb92850e6551def8328011b6a972323298d5" + ) + + +@pytest.mark.fs +def test_download_filename_from_content_disposition(tmp_path, requests_mock): + """Filename should be extracted from content-disposition request header + when available.""" + url = "https://example.org/download/requests/tar.gz/v0.0.1" + filename = "requests-0.0.1.tar.gz" + + data = "this is something" + + for fname in (f'"{filename}"', filename, '"filename with spaces.tar.gz"'): + requests_mock.get( + url, + text=data, + headers={ + "content-length": str(len(data)), + "content-disposition": f"attachment; filename={fname}", + }, + ) + + actual_filepath, actual_hashes = download(url, dest=str(tmp_path)) + + actual_filename = os.path.basename(actual_filepath) + assert actual_filename == fname.strip('"') + assert actual_hashes["length"] == len(data) + assert ( + actual_hashes["checksums"]["sha1"] + == "fdd1ce606a904b08c816ba84f3125f2af44d92b2" + ) # noqa + assert ( + actual_hashes["checksums"]["sha256"] + == "1d9224378d77925d612c9f926eb9fb92850e6551def8328011b6a972323298d5" + ) + + +@pytest.mark.fs +def test_download_utf8_filename_from_content_disposition(tmp_path, requests_mock): + """Filename should be extracted from content-disposition request header + when available.""" + url = "https://example.org/download/requests/tar.gz/v0.0.1" + data = "this is something" + + for fname in ('"archive école.tar.gz"', "archive_école.tgz"): + requests_mock.get( + url, + text=data, + headers={ + "content-length": str(len(data)), + "content-disposition": f"attachment; filename*=utf-8''{quote(fname)}", + }, + ) + + actual_filepath, actual_hashes = download(url, dest=str(tmp_path)) + + actual_filename = os.path.basename(actual_filepath) + assert actual_filename == fname.strip('"') + assert actual_hashes["length"] == len(data) + assert ( + actual_hashes["checksums"]["sha1"] + == "fdd1ce606a904b08c816ba84f3125f2af44d92b2" + ) # noqa + assert ( + actual_hashes["checksums"]["sha256"] + == "1d9224378d77925d612c9f926eb9fb92850e6551def8328011b6a972323298d5" + ) + + def test_api_info_failure(requests_mock): """Failure to fetch info/release information should raise""" url = "https://pypi.org/pypi/requests/json" diff --git a/swh/loader/package/utils.py b/swh/loader/package/utils.py --- a/swh/loader/package/utils.py +++ b/swh/loader/package/utils.py @@ -8,7 +8,9 @@ import itertools import logging import os +import re from typing import Callable, Dict, Optional, Tuple, TypeVar +from urllib.parse import unquote from urllib.request import urlopen import requests @@ -47,6 +49,20 @@ return response.content +def _content_disposition_filename(header: str) -> Optional[str]: + fname = None + fnames = re.findall(r"filename[\*]?=([^;]+)", header) + if fnames and "utf-8''" in fnames[0].lower(): + # RFC 5987 + fname = re.sub("utf-8''", "", fnames[0], flags=re.IGNORECASE) + fname = unquote(fname) + elif fnames: + fname = fnames[0] + if fname: + fname = fname.strip().strip('"') + return fname + + def download( url: str, dest: str, @@ -92,6 +108,14 @@ raise ValueError( "Fail to query '%s'. Reason: %s" % (url, response.status_code) ) + # update URL to response one as requests follow redirection by default + # on GET requests + url = response.url + # try to extract filename from content-disposition header if available + if filename is None and "content-disposition" in response.headers: + filename = _content_disposition_filename( + response.headers["content-disposition"] + ) response_data = response.iter_content(chunk_size=HASH_BLOCK_SIZE) filename = filename if filename else os.path.basename(url)