Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/core/loader.py
# Copyright (C) 2015-2022 The Software Heritage developers | # Copyright (C) 2015-2022 The Software Heritage developers | ||||||||||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||||||||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||||||||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||||||||||
import base64 | |||||||||||||
import datetime | import datetime | ||||||||||||
import hashlib | import hashlib | ||||||||||||
import logging | import logging | ||||||||||||
import os | import os | ||||||||||||
import time | import time | ||||||||||||
from typing import Any, ContextManager, Dict, Iterable, List, Optional, Union | from typing import Any, ContextManager, Dict, Iterable, List, Optional, Union | ||||||||||||
from urllib.parse import urlparse | |||||||||||||
import sentry_sdk | import sentry_sdk | ||||||||||||
from swh.core.config import load_from_envvar | from swh.core.config import load_from_envvar | ||||||||||||
from swh.core.statsd import Statsd | from swh.core.statsd import Statsd | ||||||||||||
from swh.loader.core.metadata_fetchers import CredentialsType, get_fetchers_for_lister | from swh.loader.core.metadata_fetchers import CredentialsType, get_fetchers_for_lister | ||||||||||||
from swh.loader.exception import NotFound | from swh.loader.exception import NotFound | ||||||||||||
from swh.loader.package.utils import get_url_body | |||||||||||||
from swh.model.model import ( | from swh.model.model import ( | ||||||||||||
BaseContent, | BaseContent, | ||||||||||||
Content, | Content, | ||||||||||||
Directory, | Directory, | ||||||||||||
Origin, | Origin, | ||||||||||||
OriginVisit, | OriginVisit, | ||||||||||||
OriginVisitStatus, | OriginVisitStatus, | ||||||||||||
RawExtrinsicMetadata, | RawExtrinsicMetadata, | ||||||||||||
Release, | Release, | ||||||||||||
Revision, | Revision, | ||||||||||||
Sha1Git, | Sha1Git, | ||||||||||||
SkippedContent, | SkippedContent, | ||||||||||||
Snapshot, | Snapshot, | ||||||||||||
SnapshotBranch, | |||||||||||||
TargetType, | |||||||||||||
) | ) | ||||||||||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||||||||||
from swh.storage.algos.snapshot import snapshot_get_latest | |||||||||||||
from swh.storage.interface import StorageInterface | from swh.storage.interface import StorageInterface | ||||||||||||
from swh.storage.utils import now | from swh.storage.utils import now | ||||||||||||
DEFAULT_CONFIG: Dict[str, Any] = { | DEFAULT_CONFIG: Dict[str, Any] = { | ||||||||||||
"max_content_size": 100 * 1024 * 1024, | "max_content_size": 100 * 1024 * 1024, | ||||||||||||
} | } | ||||||||||||
SENTRY_ORIGIN_URL_TAG_NAME = "swh.loader.origin_url" | SENTRY_ORIGIN_URL_TAG_NAME = "swh.loader.origin_url" | ||||||||||||
▲ Show 20 Lines • Show All 226 Lines • ▼ Show 20 Lines | def process_data(self) -> bool: | ||||||||||||
Returns: | Returns: | ||||||||||||
a value that is interpreted as a boolean. If True, fetch_data needs | a value that is interpreted as a boolean. If True, fetch_data needs | ||||||||||||
to be called again to complete loading. | to be called again to complete loading. | ||||||||||||
Ignored if ``fetch_data`` already returned :const:`False`. | Ignored if ``fetch_data`` already returned :const:`False`. | ||||||||||||
""" | """ | ||||||||||||
return True | return True | ||||||||||||
def store_data(self): | def store_data(self) -> None: | ||||||||||||
"""Store fetched data in the database. | """Store fetched data in the database. | ||||||||||||
Should call the :func:`maybe_load_xyz` methods, which handle the | Should call the :func:`maybe_load_xyz` methods, which handle the | ||||||||||||
bundles sent to storage, rather than send directly. | bundles sent to storage, rather than send directly. | ||||||||||||
""" | """ | ||||||||||||
raise NotImplementedError | raise NotImplementedError | ||||||||||||
def load_status(self) -> Dict[str, str]: | def load_status(self) -> Dict[str, str]: | ||||||||||||
▲ Show 20 Lines • Show All 350 Lines • ▼ Show 20 Lines | def store_data(self) -> None: | ||||||||||||
self.storage.revision_add([revision]) | self.storage.revision_add([revision]) | ||||||||||||
if self.has_releases(): | if self.has_releases(): | ||||||||||||
for release in self.get_releases(): | for release in self.get_releases(): | ||||||||||||
self.storage.release_add([release]) | self.storage.release_add([release]) | ||||||||||||
snapshot = self.get_snapshot() | snapshot = self.get_snapshot() | ||||||||||||
self.storage.snapshot_add([snapshot]) | self.storage.snapshot_add([snapshot]) | ||||||||||||
self.flush() | self.flush() | ||||||||||||
self.loaded_snapshot_id = snapshot.id | self.loaded_snapshot_id = snapshot.id | ||||||||||||
class ContentLoader(BaseLoader): | |||||||||||||
"""Basic loader for edge case content ingestion. | |||||||||||||
The "integrity" field is a normalized information about the checksum used and the | |||||||||||||
corresponding base64 hash encoded value of the content. | |||||||||||||
The multiple "fallback" urls received for the same content are mirror urls so no | |||||||||||||
need to keep those. We only use them to fetch the actual content if the main origin | |||||||||||||
is no longer available. | |||||||||||||
The output snapshot is of the form: | |||||||||||||
vlorentz: why not like this? | |||||||||||||
Done Inline Actions+1 ardumont: +1 | |||||||||||||
Done Inline Actions+1 ardumont: +1 | |||||||||||||
.. code:: | |||||||||||||
id: <bytes> | |||||||||||||
branches: | |||||||||||||
HEAD: | |||||||||||||
target_type: content | |||||||||||||
target: <content-id> | |||||||||||||
""" | |||||||||||||
Done Inline Actions
might work vlorentz: might work | |||||||||||||
Done Inline Actionsit works! ardumont: it works! | |||||||||||||
visit_type = "content" | |||||||||||||
def __init__( | |||||||||||||
self, *args, integrity: str, fallback_urls: List[str] = None, **kwargs | |||||||||||||
): | |||||||||||||
super().__init__(*args, **kwargs) | |||||||||||||
self.fallback_urls = fallback_urls or [] | |||||||||||||
self.integrity: str = integrity | |||||||||||||
self.content: Optional[Content] = None | |||||||||||||
self.snapshot: Optional[Snapshot] = None | |||||||||||||
self.last_snapshot: Optional[Snapshot] = None | |||||||||||||
def prepare(self) -> None: | |||||||||||||
self.last_snapshot = snapshot_get_latest(self.storage, self.origin.url) | |||||||||||||
def fetch_data(self) -> bool: | |||||||||||||
Done Inline Actions
And why .lower()? The grammar in the spec is: hash-algo = "sha256" / "sha384" / "sha512" vlorentz: And why `.lower()`? The grammar in the spec is:
```
hash-algo = "sha256" / "sha384" /… | |||||||||||||
Done Inline Actionssure, i recalled having seen some in upper case. simpler if no need for it. ardumont: sure, i recalled having seen some in upper case. simpler if no need for it. | |||||||||||||
"""Retrieve the content file as a Content Object""" | |||||||||||||
urls = {self.origin.url, *self.fallback_urls} | |||||||||||||
# Determine the content checksum stored in the integrity field | |||||||||||||
# hash-<b64-encoded-checksum> | |||||||||||||
# https://w3c.github.io/webappsec-subresource-integrity/#grammardef-hash-algo | |||||||||||||
hash_algo, hash_value_b64 = self.integrity.split("-") | |||||||||||||
expected_checksum = base64.decodebytes(hash_value_b64.encode()) | |||||||||||||
data: Optional[bytes] = None | |||||||||||||
for url in urls: | |||||||||||||
url_ = urlparse(url) | |||||||||||||
self.log.debug( | |||||||||||||
"prepare; origin_url=%s fallback=%s scheme=%s path=%s", | |||||||||||||
Done Inline ActionsI don't understand what this means vlorentz: I don't understand what this means | |||||||||||||
self.origin.url, | |||||||||||||
url, | |||||||||||||
url_.scheme, | |||||||||||||
url_.path, | |||||||||||||
) | |||||||||||||
try: | |||||||||||||
data = get_url_body(url) | |||||||||||||
self.content = Content.from_data(data) | |||||||||||||
Done Inline ActionsDoesn't work with trailing slashes: >>> os.path.basename("https://sh.rustup.rs/") '' vlorentz: Doesn't work with trailing slashes:
```
>>> os.path.basename("https://sh.rustup.rs/")
''
``` | |||||||||||||
Done Inline ActionsThis makes me think that we should just keep the full URI as origin, and just have the snapshot HEAD point directly to the content object. olasd: This makes me think that we should just keep the full URI as origin, and just have the snapshot… | |||||||||||||
Done Inline Actionsdone as olasd suggested here and dropped it. ardumont: done as olasd suggested here and dropped it. | |||||||||||||
# Ensure content received matched the integrity field received | |||||||||||||
actual_checksum = self.content.get_hash(hash_algo) | |||||||||||||
if actual_checksum == expected_checksum: | |||||||||||||
# match, we have found our content to ingest, exit loop | |||||||||||||
break | |||||||||||||
# otherwise continue | |||||||||||||
Done Inline Actions
vlorentz: | |||||||||||||
except NotFound: | |||||||||||||
continue | |||||||||||||
Done Inline Actionsthat's the right one vlorentz: that's the right one | |||||||||||||
if not self.content: | |||||||||||||
raise NotFound(f"Unknown origin {self.origin.url}.") | |||||||||||||
return False # no more data to fetch | |||||||||||||
Done Inline Actions
vlorentz: | |||||||||||||
def process_data(self) -> bool: | |||||||||||||
"""Build the snapshot out of the Content retrieved.""" | |||||||||||||
assert self.content is not None | |||||||||||||
self.snapshot = Snapshot( | |||||||||||||
branches={ | |||||||||||||
b"HEAD": SnapshotBranch( | |||||||||||||
target=self.content.sha1_git, | |||||||||||||
target_type=TargetType.CONTENT, | |||||||||||||
), | |||||||||||||
} | |||||||||||||
) | |||||||||||||
return False # no more data to process | |||||||||||||
def store_data(self) -> None: | |||||||||||||
"""Store newly retrieved Content and Snapshot.""" | |||||||||||||
assert self.content is not None | |||||||||||||
self.storage.content_add([self.content]) | |||||||||||||
assert self.snapshot is not None | |||||||||||||
self.storage.snapshot_add([self.snapshot]) | |||||||||||||
self.loaded_snapshot_id = self.snapshot.id | |||||||||||||
def visit_status(self): | |||||||||||||
return "full" if self.content and self.snapshot is not None else "partial" | |||||||||||||
def load_status(self) -> Dict[str, Any]: | |||||||||||||
return { | |||||||||||||
"status": "uneventful" | |||||||||||||
if self.last_snapshot == self.snapshot | |||||||||||||
else "eventful" | |||||||||||||
} | |||||||||||||
def cleanup(self) -> None: | |||||||||||||
self.log.debug("cleanup") |
why not like this?