Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/package/utils.py
# Copyright (C) 2019-2021 The Software Heritage developers | # Copyright (C) 2019-2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import copy | import copy | ||||
import functools | import functools | ||||
import itertools | |||||
import logging | import logging | ||||
import os | import os | ||||
from typing import Callable, Dict, Optional, Tuple, TypeVar | from typing import Callable, Dict, Optional, Tuple, TypeVar | ||||
from urllib.request import urlopen | |||||
import requests | import requests | ||||
from swh.loader.exception import NotFound | from swh.loader.exception import NotFound | ||||
from swh.loader.package import DEFAULT_PARAMS | from swh.loader.package import DEFAULT_PARAMS | ||||
from swh.model.hashutil import HASH_BLOCK_SIZE, MultiHash | from swh.model.hashutil import HASH_BLOCK_SIZE, MultiHash | ||||
from swh.model.model import Person | from swh.model.model import Person | ||||
▲ Show 20 Lines • Show All 55 Lines • ▼ Show 20 Lines | ) -> Tuple[str, Dict]: | ||||
""" | """ | ||||
params = copy.deepcopy(DEFAULT_PARAMS) | params = copy.deepcopy(DEFAULT_PARAMS) | ||||
if auth is not None: | if auth is not None: | ||||
params["auth"] = auth | params["auth"] = auth | ||||
if extra_request_headers is not None: | if extra_request_headers is not None: | ||||
params["headers"].update(extra_request_headers) | params["headers"].update(extra_request_headers) | ||||
# so the connection does not hang indefinitely (read/connection timeout) | # so the connection does not hang indefinitely (read/connection timeout) | ||||
timeout = params.get("timeout", 60) | timeout = params.get("timeout", 60) | ||||
if url.startswith("ftp://"): | |||||
response = urlopen(url, timeout=timeout) | |||||
vlorentz: What about something like this to keep it streaming? (the `response` will be closed when decrefed anyway) | |||||
Done Inline ActionsNice, just tested in docker and it works great, will update the diff then. anlambert: Nice, just tested in docker and it works great, will update the diff then. | |||||
Not Done Inline ActionsWe should call response.close() in both cases so the connection gets properly returned to the pool. We can do that after we're done streaming it (before checking the hashes). olasd: We should call response.close() in both cases so the connection gets properly returned to the… | |||||
chunks = (response.read(HASH_BLOCK_SIZE) for _ in itertools.count()) | |||||
response_data = itertools.takewhile(bool, chunks) | |||||
else: | |||||
response = requests.get(url, **params, timeout=timeout, stream=True) | response = requests.get(url, **params, timeout=timeout, stream=True) | ||||
if response.status_code != 200: | if response.status_code != 200: | ||||
raise ValueError("Fail to query '%s'. Reason: %s" % (url, response.status_code)) | raise ValueError( | ||||
"Fail to query '%s'. Reason: %s" % (url, response.status_code) | |||||
) | |||||
response_data = response.iter_content(chunk_size=HASH_BLOCK_SIZE) | |||||
filename = filename if filename else os.path.basename(url) | filename = filename if filename else os.path.basename(url) | ||||
logger.debug("filename: %s", filename) | logger.debug("filename: %s", filename) | ||||
filepath = os.path.join(dest, filename) | filepath = os.path.join(dest, filename) | ||||
logger.debug("filepath: %s", filepath) | logger.debug("filepath: %s", filepath) | ||||
h = MultiHash(hash_names=DOWNLOAD_HASHES) | h = MultiHash(hash_names=DOWNLOAD_HASHES) | ||||
with open(filepath, "wb") as f: | with open(filepath, "wb") as f: | ||||
for chunk in response.iter_content(chunk_size=HASH_BLOCK_SIZE): | for chunk in response_data: | ||||
h.update(chunk) | h.update(chunk) | ||||
f.write(chunk) | f.write(chunk) | ||||
response.close() | |||||
Not Done Inline Actions(add response.close() here) olasd: (add `response.close()` here) | |||||
# Also check the expected hashes if provided | # Also check the expected hashes if provided | ||||
if hashes: | if hashes: | ||||
actual_hashes = h.hexdigest() | actual_hashes = h.hexdigest() | ||||
for algo_hash in hashes.keys(): | for algo_hash in hashes.keys(): | ||||
actual_digest = actual_hashes[algo_hash] | actual_digest = actual_hashes[algo_hash] | ||||
expected_digest = hashes[algo_hash] | expected_digest = hashes[algo_hash] | ||||
if actual_digest != expected_digest: | if actual_digest != expected_digest: | ||||
raise ValueError( | raise ValueError( | ||||
▲ Show 20 Lines • Show All 43 Lines • Show Last 20 Lines |
What about something like this to keep it streaming? (the response will be closed when decrefed anyway)