swh/loader/core/loader.py
# Copyright (C) 2015-2022  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import base64
import datetime
import hashlib
import logging
import os
import tempfile
import time
from typing import Any, ContextManager, Dict, Iterable, List, Optional, Union
from urllib.parse import urlparse

from requests.exceptions import HTTPError
import sentry_sdk

from swh.core.config import load_from_envvar
from swh.core.statsd import Statsd
from swh.core.tarball import uncompress
from swh.loader.core.metadata_fetchers import CredentialsType, get_fetchers_for_lister
from swh.loader.exception import NotFound
-from swh.loader.package.utils import download, get_url_body
+from swh.loader.package.utils import download
from swh.model import from_disk
from swh.model.model import (
    BaseContent,
    Content,
    Directory,
    Origin,
    OriginVisit,
    OriginVisitStatus,
[… 615 lines elided …]

    def store_data(self) -> None:
        self.storage.release_add([release])
        snapshot = self.get_snapshot()
        self.storage.snapshot_add([snapshot])
        self.flush()
        self.loaded_snapshot_id = snapshot.id


class NodeLoader(BaseLoader):
"""Common class for Content and Directory loaders. | """Common class for :class:`ContentLoader` and :class:`Directoryloader`. | ||||
The "integrity" field is a normalized information about the checksum used and the | The "checksums" field is a dictionary of hex hashes on the object retrieved (content | ||||
corresponding base64 hash encoded value of the object retrieved (content or | or directory). | ||||
directory). | |||||
The multiple "fallback" urls received are mirror urls so no need to keep those. We | The multiple "fallback" urls received are mirror urls only used to fetch the object | ||||
only use them to fetch the actual object if the main origin is no longer available. | if the main origin is no longer available. Those are not stored. | ||||
Ingestion is considered eventful on the first ingestion. Subsequent load of the same | |||||
object should end up being an uneventful visit (matching snapshot). | |||||
""" | """ | ||||
    def __init__(
-        self, *args, integrity: str, fallback_urls: List[str] = None, **kwargs
+        self,
+        *args,
+        checksums: Dict[str, str],
+        fallback_urls: List[str] = None,
+        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.snapshot: Optional[Snapshot] = None
-        # Determine the content checksum stored in the integrity field
-        # hash-<b64-encoded-checksum>
-        # https://w3c.github.io/webappsec-subresource-integrity/#grammardef-hash-algo
-        self.checksum_algo, checksum_value_b64 = integrity.split("-")
-        self.expected_checksum: bytes = base64.decodebytes(checksum_value_b64.encode())
+        self.checksums = checksums
        fallback_urls_ = fallback_urls or []
        self.mirror_urls: List[str] = [self.origin.url, *fallback_urls_]
    def prepare(self) -> None:
        self.last_snapshot = snapshot_get_latest(self.storage, self.origin.url)

    def load_status(self) -> Dict[str, Any]:
        return {

[… 23 lines elided …]

class ContentLoader(NodeLoader):
visit_type = "content" | visit_type = "content" | ||||
def __init__(self, *args, **kwargs): | def __init__(self, *args, **kwargs): | ||||
super().__init__(*args, **kwargs) | super().__init__(*args, **kwargs) | ||||
self.content: Optional[Content] = None | self.content: Optional[Content] = None | ||||
def fetch_data(self) -> bool: | def fetch_data(self) -> bool: | ||||
"""Retrieve the content file as a Content Object""" | """Retrieve the content file as a Content Object""" | ||||
[inline] vlorentz: why is self.expected_checksum hexadecimal bytes?

[inline, done] ardumont: good question... That's coming from the instruction line 677,
which was the only way i saw that matched types:
self.expected_checksum: bytes = base64.decodebytes(checksum_value_b64.encode())

[inline, done] ardumont: I must have made a mistake during my hash computation switch
when i changed to a simpler content file (than the original asdf one).
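For context on this thread: the removed "integrity" field carried an SRI-style
base64-encoded digest (per the spec URL deleted above), while the new "checksums" field
carries hex digests. A minimal sketch of the two encodings (payload made up for the
example):

```
import base64
import hashlib

digest = hashlib.sha256(b"hello world\n").digest()

# Old scheme (removed in this diff): SRI-style "hash-<b64-digest>" string,
# as in the code this comment refers to.
integrity = "sha256-" + base64.b64encode(digest).decode()
algo, b64_value = integrity.split("-")
assert base64.decodebytes(b64_value.encode()) == digest

# New scheme: hex digests keyed by algorithm name.
checksums = {"sha256": digest.hex()}
```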
-        data: Optional[bytes] = None
        for url in self.mirror_urls:
            url_ = urlparse(url)
            self.log.debug(
                "prepare; origin_url=%s fallback=%s scheme=%s path=%s",
                self.origin.url,
                url,
                url_.scheme,
                url_.path,
            )
            try:
-                data = get_url_body(url)
-                self.content = Content.from_data(data)
-
-                # Ensure content received matched the integrity field received
-                actual_checksum = self.content.get_hash(self.checksum_algo)
-                if actual_checksum == self.expected_checksum:
-                    # match, we have found our content to ingest, exit loop
-                    break
-                # otherwise continue
-            except NotFound:
-                continue
-
-        if not self.content:
+                with tempfile.TemporaryDirectory() as tmpdir:
+                    file_path, _ = download(url, dest=tmpdir, hashes=self.checksums)
+                    with open(file_path, "rb") as file:
+                        self.content = Content.from_data(file.read())
+            except HTTPError as http_error:
+                if http_error.response.status_code == 404:
+                    self.log.debug(
+                        "Not found '%s', continue on next mirror url if any", url
+                    )
+                    continue
+            else:
+                return False  # no more data to fetch
+
+        # If we reach this point, we did not find any proper content, consider the
[inline] anlambert: I think you could reuse the download function (whose documentation
is incorrect, need to fix this) instead of using get_url_body. This way you do not have
to reimplement checking of hash values.

[inline, done] ardumont: I tried that but iirc that failed (i don't recall why). Will
give it another go (as i really don't remember what the issue was ¯\_(ツ)_/¯). As we
are downloading raw file here…

[inline] anlambert: For the record, this seems to work:

```
diff --git a/swh/loader/core/loader.py b/swh/loader/core/loader.py
index b55a85f..9af55cf 100644
--- a/swh/loader/core/loader.py
+++ b/swh/loader/core/loader.py
@@ -716,7 +716,6 @@ class ContentLoader(NodeLoader):

     def fetch_data(self) -> bool:
         """Retrieve the content file as a Content Object"""
-        data: Optional[bytes] = None
         for url in self.mirror_urls:
             url_ = urlparse(url)
             self.log.debug(
@@ -727,36 +726,19 @@ class ContentLoader(NodeLoader):
                 url_.path,
             )
             try:
-                data = get_url_body(url)
+                with tempfile.TemporaryDirectory() as tmpdir:
+                    file_path, _ = download(url, dest=tmpdir, hashes=self.checksums)
+                    with open(file_path, "rb") as file:
+                        self.content = Content.from_data(file.read())
-            # otherwise continue
-            except NotFound:
-                self.log.debug("Not found %s, continue on next mirror url if any", url)
-                continue
-
-            checksum_algos = {c for c in self.checksums.keys()}
-            content_d = MultiHash.from_data(
-                data, hash_names=DEFAULT_ALGORITHMS | checksum_algos
-            ).digest()
-
-            # We now need to determine if we found a matching content (matching the
-            # checksums provided). As an implementation detail, we must drop the
-            # checksums that are unsupported by the Content Model (e.g. sha512)
-            found: bool = False
-            for checksum_algo in checksum_algos:
-                if checksum_algo not in DEFAULT_ALGORITHMS:
-                    # We must drop this unssuported checksum algo from the Content model
-                    actual_checksum = content_d.pop(checksum_algo)
-                else:
-                    actual_checksum = content_d[checksum_algo]
-
-                # and check whether we found a correct content matching the checksum
-                found = actual_checksum == hash_to_bytes(self.checksums[checksum_algo])
-
-            if found:
-                # We have a match, we have our content to ingest
-                content_d["data"] = data
-                content_d["length"] = len(data)
-                self.content = Content.from_dict(content_d)
+            except HTTPError as http_error:
+                if http_error.response.status_code == 404:
+                    self.log.debug(
+                        "Not found %s, continue on next mirror url if any", url
+                    )
+                    continue
+                raise
+            else:  # we are done, no more data to fetch
+                return False
```

[inline, done] ardumont: Awesome, i was rebasing my stuff and getting back to it. You
gain me some time, thx.

[inline, done] anlambert: set(checksum.keys()) is more readable imho

[inline, done] ardumont: sure (idk why i did it that way, must have been more
convoluted at some point).
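What delegating to download(..., hashes=...) buys here, roughly: every expected digest
must match or the download is rejected. A simplified sketch of that verification,
assuming hex digests and hashlib-supported algorithm names (check_hashes itself is
hypothetical, not the library function):

```
import hashlib


def check_hashes(path: str, hashes: dict) -> None:
    # Read the downloaded file once and compare each expected digest;
    # mismatches surface as ValueError, as the callers here expect.
    with open(path, "rb") as f:
        data = f.read()
    for algo, expected in hashes.items():
        actual = hashlib.new(algo, data).hexdigest()
        if actual != expected:
            raise ValueError(f"{algo} mismatch: {actual} != {expected}")
```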
        # origin not found
        raise NotFound(f"Unknown origin {self.origin.url}.")
-
-        return False  # no more data to fetch
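The resulting control flow leans on try/except/else: the else branch runs only when the
try body raised no exception, and falling out of the loop means every mirror failed. A
standalone sketch of the same pattern (names hypothetical):

```
from typing import Callable, List


def fetch_first(urls: List[str], fetch: Callable[[str], bytes]) -> bytes:
    for url in urls:
        try:
            payload = fetch(url)
        except IOError:
            continue  # this mirror failed, try the next one
        else:
            return payload  # no exception: we are done
    # Loop exhausted without returning: no mirror served the object.
    raise LookupError(f"none of {urls} served the object")
```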
    def process_data(self) -> bool:
        """Build the snapshot out of the Content retrieved."""
        assert self.content is not None
        self.snapshot = Snapshot(
            branches={
                b"HEAD": SnapshotBranch(
                    target=self.content.sha1_git,

[… 41 lines elided …]

    def __init__(self, *args, **kwargs):
        self.dirs: List[Directory] = None
    def fetch_data(self) -> bool:
        """Fetch directory as a tarball amongst the self.mirror_urls.

        Raises NotFound if no tarball is found
        """
-        expected_checksum_hashhex = self.expected_checksum.decode("utf-8")
        for url in self.mirror_urls:
            url_ = urlparse(url)
            self.log.debug(
                "prepare; origin_url=%s fallback=%s scheme=%s path=%s",
                self.origin.url,
                url,
                url_.scheme,
                url_.path,
            )
            with tempfile.TemporaryDirectory() as tmpdir:
                try:
                    tarball_path, extrinsic_metadata = download(
                        url,
                        tmpdir,
-                        # Ensure content received matched the integrity field received
-                        hashes={self.checksum_algo: expected_checksum_hashhex},
+                        # Ensure content received matched the checksums received
+                        hashes=self.checksums,
                        extra_request_headers={"Accept-Encoding": "identity"},
                    )
                except ValueError as e:
                    # Checksum mismatch
                    self.log.debug("Error: %s", e)
                    continue
-                except HTTPError:
-                    self.log.debug(
-                        "Not found %s, continue on next mirror url if any", url
-                    )
-                    # mirror url not found, continue on the next mirror url if any
-                    continue
+                except HTTPError as http_error:
+                    if http_error.response.status_code == 404:
+                        self.log.debug(
+                            "Not found '%s', continue on next mirror url if any", url
+                        )
+                        continue
                directory_path = os.path.join(tmpdir, "src")
                os.makedirs(directory_path, exist_ok=True)
                uncompress(tarball_path, dest=directory_path)
                self.log.debug("uncompressed path to directory: %s", directory_path)
[… 46 lines elided …]