Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/package/utils.py
# Copyright (C) 2019-2021 The Software Heritage developers | # Copyright (C) 2019-2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import copy | import copy | ||||
import functools | import functools | ||||
import itertools | import itertools | ||||
import logging | import logging | ||||
import os | import os | ||||
import re | |||||
from typing import Callable, Dict, Optional, Tuple, TypeVar | from typing import Callable, Dict, Optional, Tuple, TypeVar | ||||
from urllib.parse import unquote | |||||
from urllib.request import urlopen | from urllib.request import urlopen | ||||
import requests | import requests | ||||
from swh.loader.exception import NotFound | from swh.loader.exception import NotFound | ||||
from swh.loader.package import DEFAULT_PARAMS | from swh.loader.package import DEFAULT_PARAMS | ||||
from swh.model.hashutil import HASH_BLOCK_SIZE, MultiHash | from swh.model.hashutil import HASH_BLOCK_SIZE, MultiHash | ||||
from swh.model.model import Person | from swh.model.model import Person | ||||
Show All 22 Lines | def api_info(url: str, **extra_params) -> bytes: | ||||
""" | """ | ||||
response = requests.get(url, **{**DEFAULT_PARAMS, **extra_params}) | response = requests.get(url, **{**DEFAULT_PARAMS, **extra_params}) | ||||
if response.status_code != 200: | if response.status_code != 200: | ||||
raise NotFound(f"Fail to query '{url}'. Reason: {response.status_code}") | raise NotFound(f"Fail to query '{url}'. Reason: {response.status_code}") | ||||
return response.content | return response.content | ||||
def _content_disposition_filename(header: str) -> Optional[str]: | |||||
fname = None | |||||
fnames = re.findall(r"filename[\*]?=([^;]+)", header) | |||||
if fnames and "utf-8''" in fnames[0].lower(): | |||||
# RFC 5987 | |||||
fname = re.sub("utf-8''", "", fnames[0], flags=re.IGNORECASE) | |||||
fname = unquote(fname) | |||||
elif fnames: | |||||
fname = fnames[0] | |||||
if fname: | |||||
fname = os.path.basename(fname.strip().strip('"')) | |||||
return fname | |||||
def download( | def download( | ||||
url: str, | url: str, | ||||
dest: str, | dest: str, | ||||
hashes: Dict = {}, | hashes: Dict = {}, | ||||
filename: Optional[str] = None, | filename: Optional[str] = None, | ||||
auth: Optional[Tuple[str, str]] = None, | auth: Optional[Tuple[str, str]] = None, | ||||
extra_request_headers: Optional[Dict[str, str]] = None, | extra_request_headers: Optional[Dict[str, str]] = None, | ||||
) -> Tuple[str, Dict]: | ) -> Tuple[str, Dict]: | ||||
Show All 29 Lines | if url.startswith("ftp://"): | ||||
chunks = (response.read(HASH_BLOCK_SIZE) for _ in itertools.count()) | chunks = (response.read(HASH_BLOCK_SIZE) for _ in itertools.count()) | ||||
response_data = itertools.takewhile(bool, chunks) | response_data = itertools.takewhile(bool, chunks) | ||||
else: | else: | ||||
response = requests.get(url, **params, timeout=timeout, stream=True) | response = requests.get(url, **params, timeout=timeout, stream=True) | ||||
if response.status_code != 200: | if response.status_code != 200: | ||||
raise ValueError( | raise ValueError( | ||||
"Fail to query '%s'. Reason: %s" % (url, response.status_code) | "Fail to query '%s'. Reason: %s" % (url, response.status_code) | ||||
) | ) | ||||
# update URL to response one as requests follow redirection by default | |||||
# on GET requests | |||||
url = response.url | |||||
# try to extract filename from content-disposition header if available | |||||
if filename is None and "content-disposition" in response.headers: | |||||
filename = _content_disposition_filename( | |||||
olasd: The first one will match until the next double quote. The second one will stop at the first… | |||||
response.headers["content-disposition"] | |||||
) | |||||
response_data = response.iter_content(chunk_size=HASH_BLOCK_SIZE) | response_data = response.iter_content(chunk_size=HASH_BLOCK_SIZE) | ||||
filename = filename if filename else os.path.basename(url) | filename = filename if filename else os.path.basename(url) | ||||
logger.debug("filename: %s", filename) | logger.debug("filename: %s", filename) | ||||
filepath = os.path.join(dest, filename) | filepath = os.path.join(dest, filename) | ||||
logger.debug("filepath: %s", filepath) | logger.debug("filepath: %s", filepath) | ||||
h = MultiHash(hash_names=DOWNLOAD_HASHES) | h = MultiHash(hash_names=DOWNLOAD_HASHES) | ||||
▲ Show 20 Lines • Show All 58 Lines • Show Last 20 Lines |
The first one will match until the next double quote. The second one will stop at the first space in the header (which may happen, depending on how broken it is).