diff --git a/swh/loader/core/loader.py b/swh/loader/core/loader.py
--- a/swh/loader/core/loader.py
+++ b/swh/loader/core/loader.py
@@ -8,17 +8,21 @@
 import hashlib
 import logging
 import os
+import tempfile
 import time
 from typing import Any, ContextManager, Dict, Iterable, List, Optional, Union
 from urllib.parse import urlparse
 
+from requests.exceptions import HTTPError
 import sentry_sdk
 
 from swh.core.config import load_from_envvar
 from swh.core.statsd import Statsd
+from swh.core.tarball import uncompress
 from swh.loader.core.metadata_fetchers import CredentialsType, get_fetchers_for_lister
 from swh.loader.exception import NotFound
-from swh.loader.package.utils import get_url_body
+from swh.loader.package.utils import download, get_url_body
+from swh.model import from_disk
 from swh.model.model import (
     BaseContent,
     Content,
@@ -648,15 +652,47 @@
         self.loaded_snapshot_id = snapshot.id
 
 
-class ContentLoader(BaseLoader):
-    """Basic loader for edge case content ingestion.
+class NodeLoader(BaseLoader):
+    """Common class for Content and Directory loaders.
 
     The "integrity" field is a normalized information about the checksum used and the
-    corresponding base64 hash encoded value of the content.
+    corresponding base64-encoded hash value of the object retrieved (content or
+    directory).
+
+    The "fallback" urls are mirrors of the origin url and are not recorded; they are
+    only used to fetch the actual object if the main origin is no longer available.
+
+    """
+
+    def __init__(
+        self, *args, integrity: str, fallback_urls: Optional[List[str]] = None, **kwargs
+    ):
+        super().__init__(*args, **kwargs)
+        self.snapshot: Optional[Snapshot] = None
+        # Determine the content checksum stored in the integrity field
+        # hash-<b64-encoded-checksum>
+        # https://w3c.github.io/webappsec-subresource-integrity/#grammardef-hash-algo
+        self.checksum_algo, checksum_value_b64 = integrity.split("-")
+        self.expected_checksum: bytes = base64.decodebytes(checksum_value_b64.encode())
+        fallback_urls_ = fallback_urls or []
+        self.mirror_urls: List[str] = [self.origin.url, *fallback_urls_]
+
+    def prepare(self) -> None:
+        self.last_snapshot = snapshot_get_latest(self.storage, self.origin.url)
+
+    def load_status(self) -> Dict[str, Any]:
+        return {
+            "status": "uneventful"
+            if self.last_snapshot == self.snapshot
+            else "eventful"
+        }
+
+    def cleanup(self) -> None:
+        self.log.debug("cleanup")
+
 
-    The multiple "fallback" urls received for the same content are mirror urls so no
-    need to keep those. We only use them to fetch the actual content if the main origin
-    is no longer available.
+class ContentLoader(NodeLoader):
+    """Basic loader for edge case content ingestion.
 
     The output snapshot is of the form:
 
@@ -672,29 +708,14 @@
 
     visit_type = "content"
 
-    def __init__(
-        self, *args, integrity: str, fallback_urls: List[str] = None, **kwargs
-    ):
+    def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
-        self.fallback_urls = fallback_urls or []
-        self.integrity: str = integrity
         self.content: Optional[Content] = None
-        self.snapshot: Optional[Snapshot] = None
-        self.last_snapshot: Optional[Snapshot] = None
-
-    def prepare(self) -> None:
-        self.last_snapshot = snapshot_get_latest(self.storage, self.origin.url)
 
     def fetch_data(self) -> bool:
         """Retrieve the content file as a Content Object"""
-        urls = {self.origin.url, *self.fallback_urls}
-        # Determine the content checksum stored in the integrity field
-        # hash-<b64-encoded-checksum>
-        # https://w3c.github.io/webappsec-subresource-integrity/#grammardef-hash-algo
-        hash_algo, hash_value_b64 = self.integrity.split("-")
-        expected_checksum = base64.decodebytes(hash_value_b64.encode())
         data: Optional[bytes] = None
-        for url in urls:
+        for url in self.mirror_urls:
             url_ = urlparse(url)
             self.log.debug(
                 "prepare; origin_url=%s fallback=%s scheme=%s path=%s",
@@ -708,8 +729,8 @@
             self.content = Content.from_data(data)
 
             # Ensure content received matched the integrity field received
-            actual_checksum = self.content.get_hash(hash_algo)
-            if actual_checksum == expected_checksum:
+            actual_checksum = self.content.get_hash(self.checksum_algo)
+            if actual_checksum == self.expected_checksum:
                 # match, we have found our content to ingest, exit loop
                 break
             # otherwise continue
@@ -747,12 +768,116 @@
     def visit_status(self):
         return "full" if self.content and self.snapshot is not None else "partial"
 
-    def load_status(self) -> Dict[str, Any]:
-        return {
-            "status": "uneventful"
-            if self.last_snapshot == self.snapshot
-            else "eventful"
-        }
 
-    def cleanup(self) -> None:
-        self.log.debug("cleanup")
+class DirectoryLoader(NodeLoader):
+    """Basic loader for edge case directory ingestion (through one tarball).
+
+    The output snapshot is of the form:
+
+    .. code::
+
+       id: <bytes>
+       branches:
+         HEAD:
+           target_type: directory
+           target: <directory-id>
+
+    """
+
+    visit_type = "directory"
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.directory: Optional[from_disk.Directory] = None
+        self.cnts: Optional[List[Content]] = None
+        self.skipped_cnts: Optional[List[SkippedContent]] = None
+        self.dirs: Optional[List[Directory]] = None
+
+    def fetch_data(self) -> bool:
+        """Fetch a directory as a tarball from one of the self.mirror_urls.
+
+        Raises NotFound if no tarball is found.
+
+        """
+        expected_checksum_hashhex = self.expected_checksum.hex()  # download() compares hex digests
+        for url in self.mirror_urls:
+            url_ = urlparse(url)
+            self.log.debug(
+                "prepare; origin_url=%s fallback=%s scheme=%s path=%s",
+                self.origin.url,
+                url,
+                url_.scheme,
+                url_.path,
+            )
+            with tempfile.TemporaryDirectory() as tmpdir:
+                try:
+                    tarball_path, extrinsic_metadata = download(
+                        url,
+                        tmpdir,
+                        # Ensure content received matches the integrity field
+                        hashes={self.checksum_algo: expected_checksum_hashhex},
+                        extra_request_headers={"Accept-Encoding": "identity"},
+                    )
+                except ValueError as e:
+                    # Checksum mismatch
+                    self.log.debug("Error: %s", e)
+                    continue
+                except HTTPError:
+                    self.log.debug(
+                        "Not found %s, continue on next mirror url if any", url
+                    )
+                    # mirror url not found, continue on the next mirror url if any
+                    continue
+
+                directory_path = os.path.join(tmpdir, "src")
+                os.makedirs(directory_path, exist_ok=True)
+                uncompress(tarball_path, dest=directory_path)
+
+                self.log.debug("uncompressed path to directory: %s", directory_path)
+
+                self.directory = from_disk.Directory.from_disk(
+                    path=directory_path.encode("utf-8"),
+                    max_content_length=self.max_content_size,
+                )
+                # Compute the merkle dag from the top-level directory
+                self.cnts, self.skipped_cnts, self.dirs = from_disk.iter_directory(
+                    self.directory
+                )
+
+            if self.directory is not None:
+                return False  # no more data to fetch
+
+        # if we reach here, we did not find any proper tarball, so consider the origin
+        # not found
+        raise NotFound(f"Unknown origin {self.origin.url}.")
+
+    def process_data(self) -> bool:
+        """Build the snapshot out of the Directory retrieved."""
+
+        assert self.directory is not None
+        # Build the snapshot
+        self.snapshot = Snapshot(
+            branches={
+                b"HEAD": SnapshotBranch(
+                    target=self.directory.hash,
+                    target_type=TargetType.DIRECTORY,
+                ),
+            }
+        )
+
+        return False  # no more data to process
+
+    def store_data(self) -> None:
+        """Store the newly retrieved contents, directories and snapshot."""
+        self.log.debug("Number of skipped contents: %s", len(self.skipped_cnts))
+        self.storage.skipped_content_add(self.skipped_cnts)
+        self.log.debug("Number of contents: %s", len(self.cnts))
+        self.storage.content_add(self.cnts)
+        self.log.debug("Number of directories: %s", len(self.dirs))
+        self.storage.directory_add(self.dirs)
+        assert self.snapshot is not None
+        self.storage.snapshot_add([self.snapshot])
+        self.loaded_snapshot_id = self.snapshot.id
+
+    def visit_status(self):
+        return "full" if self.directory and self.snapshot is not None else "partial"
diff --git a/swh/loader/core/tests/data/https_example.org/archives_dummy-hello.tar.gz b/swh/loader/core/tests/data/https_example.org/archives_dummy-hello.tar.gz
new file mode 100644
index 0000000000000000000000000000000000000000..0000000000000000000000000000000000000000
GIT binary patch
literal 0
Hc$@<O00001
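
Note on the integrity handling in ``NodeLoader.__init__`` above: the field follows
the W3C Subresource Integrity grammar, i.e. ``<hash-algo>-<base64 of the raw
digest>``. A minimal standalone sketch of that parse/verify round-trip (plain
stdlib; the names ``parse_integrity`` and ``matches`` are illustrative, not part
of the patch):

.. code:: python

   import base64
   import hashlib

   def parse_integrity(integrity: str):
       """Split an SRI value like 'sha256-<b64-digest>' into (algo, raw digest)."""
       algo, b64_digest = integrity.split("-", 1)
       return algo, base64.b64decode(b64_digest)

   def matches(data: bytes, integrity: str) -> bool:
       """True if data hashes to the digest carried by the integrity value."""
       algo, expected = parse_integrity(integrity)
       return hashlib.new(algo, data).digest() == expected

   # Round-trip: build an integrity value for a payload, then verify it.
   payload = b"hello"
   digest = hashlib.sha256(payload).digest()
   integrity = "sha256-" + base64.b64encode(digest).decode("ascii")
   assert matches(payload, integrity)

This is also why ``DirectoryLoader.fetch_data`` converts ``self.expected_checksum``
with ``.hex()``: the raw digest bytes decoded from base64 must be re-encoded as a
hex digest before being compared against ``hashlib`` hexdigests.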
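
The mirror loop in ``DirectoryLoader.fetch_data`` leans on two failure modes of
``swh.loader.package.utils.download``: an ``HTTPError`` when a mirror does not
serve the tarball, and a ``ValueError`` when the downloaded bytes do not match the
expected digest. A rough standalone sketch of that contract, assuming only
``requests`` and ``hashlib`` (the helper ``fetch_with_checksum`` is hypothetical;
the real ``download`` also returns extrinsic metadata):

.. code:: python

   import hashlib
   import os

   import requests

   def fetch_with_checksum(url: str, dest_dir: str, algo: str, hexdigest: str) -> str:
       """Download url into dest_dir; raise ValueError on digest mismatch.

       Hypothetical sketch of the behaviour DirectoryLoader relies on, not the
       actual swh.loader.package.utils.download implementation.
       """
       path = os.path.join(dest_dir, os.path.basename(url))
       h = hashlib.new(algo)
       with requests.get(url, stream=True, timeout=60) as resp:
           resp.raise_for_status()  # -> requests.exceptions.HTTPError on 404/5xx
           with open(path, "wb") as f:
               for chunk in resp.iter_content(chunk_size=8192):
                   h.update(chunk)
                   f.write(chunk)
       if h.hexdigest() != hexdigest:
           raise ValueError(f"{url}: got {h.hexdigest()}, expected {hexdigest}")
       return path

Catching both exceptions and falling through to the next mirror url is what makes
the final ``raise NotFound(...)`` the only remaining outcome once every mirror has
failed.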
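
Finally, the ``from_disk`` calls at the end of ``fetch_data`` are what turn the
uncompressed tree into storage-ready model objects. A small usage sketch, assuming
a ``swh.model`` version that, like the one this patch targets, exposes
``from_disk.iter_directory``:

.. code:: python

   import tempfile

   from swh.model import from_disk

   # Build a one-file directory on disk, then load it the way DirectoryLoader does.
   with tempfile.TemporaryDirectory() as tmpdir:
       with open(f"{tmpdir}/hello.txt", "w") as f:
           f.write("hello\n")

       directory = from_disk.Directory.from_disk(
           path=tmpdir.encode("utf-8"),
           max_content_length=None,  # no size cutoff for this example
       )
       # Flatten the merkle DAG into Content/SkippedContent/Directory objects
       contents, skipped_contents, directories = from_disk.iter_directory(directory)

       print(directory.hash.hex())  # sha1_git id of the root directory
       print(len(contents), len(skipped_contents), len(directories))

Snapshotting then only needs ``directory.hash`` as the ``HEAD`` branch target,
which is exactly what ``process_data`` above does.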