Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/core/loader.py
# Copyright (C) 2015-2022 The Software Heritage developers | # Copyright (C) 2015-2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import datetime | import datetime | ||||
import hashlib | import hashlib | ||||
import logging | import logging | ||||
import os | import os | ||||
from pathlib import Path | from pathlib import Path | ||||
import tempfile | import tempfile | ||||
import time | import time | ||||
from typing import Any, ContextManager, Dict, Iterable, List, Optional, Union | from typing import Any, ContextManager, Dict, List, Optional, Union | ||||
from urllib.parse import urlparse | from urllib.parse import urlparse | ||||
from requests.exceptions import HTTPError | from requests.exceptions import HTTPError | ||||
import sentry_sdk | import sentry_sdk | ||||
from swh.core.config import load_from_envvar | from swh.core.config import load_from_envvar | ||||
from swh.core.statsd import Statsd | from swh.core.statsd import Statsd | ||||
from swh.core.tarball import uncompress | from swh.core.tarball import uncompress | ||||
from swh.loader.core.metadata_fetchers import CredentialsType, get_fetchers_for_lister | from swh.loader.core.metadata_fetchers import CredentialsType, get_fetchers_for_lister | ||||
from swh.loader.core.utils import nix_hashes | from swh.loader.core.utils import nix_hashes | ||||
from swh.loader.exception import NotFound, UnsupportedChecksumComputation | from swh.loader.exception import NotFound, UnsupportedChecksumComputation | ||||
from swh.loader.package.utils import download | from swh.loader.package.utils import download | ||||
from swh.model import from_disk | from swh.model import from_disk | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
BaseContent, | |||||
Content, | Content, | ||||
Directory, | Directory, | ||||
Origin, | Origin, | ||||
OriginVisit, | OriginVisit, | ||||
OriginVisitStatus, | OriginVisitStatus, | ||||
RawExtrinsicMetadata, | RawExtrinsicMetadata, | ||||
Release, | |||||
Revision, | |||||
Sha1Git, | Sha1Git, | ||||
SkippedContent, | SkippedContent, | ||||
Snapshot, | Snapshot, | ||||
SnapshotBranch, | SnapshotBranch, | ||||
TargetType, | TargetType, | ||||
) | ) | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.storage.algos.snapshot import snapshot_get_latest | from swh.storage.algos.snapshot import snapshot_get_latest | ||||
▲ Show 20 Lines • Show All 520 Lines • ▼ Show 20 Lines | class BaseLoader: | ||||
) -> None: | ) -> None: | ||||
"""Increments both ``{name}_sum`` (by the ``value``) and ``{name}_count`` | """Increments both ``{name}_sum`` (by the ``value``) and ``{name}_count`` | ||||
(by ``1``), allowing to prometheus to compute the average ``value`` over | (by ``1``), allowing to prometheus to compute the average ``value`` over | ||||
time.""" | time.""" | ||||
self.statsd.increment(f"{name}_sum", value, tags=tags) | self.statsd.increment(f"{name}_sum", value, tags=tags) | ||||
self.statsd.increment(f"{name}_count", tags=tags) | self.statsd.increment(f"{name}_count", tags=tags) | ||||
class DVCSLoader(BaseLoader): | |||||
"""This base class is a pattern for dvcs loaders (e.g. git, mercurial). | |||||
Those loaders are able to load all the data in one go. For example, the | |||||
loader defined in swh-loader-git :class:`BulkUpdater`. | |||||
For other loaders (stateful one, (e.g :class:`SWHSvnLoader`), | |||||
inherit directly from :class:`BaseLoader`. | |||||
""" | |||||
def cleanup(self) -> None: | |||||
"""Clean up an eventual state installed for computations.""" | |||||
pass | |||||
def has_contents(self) -> bool: | |||||
"""Checks whether we need to load contents""" | |||||
return True | |||||
def get_contents(self) -> Iterable[BaseContent]: | |||||
"""Get the contents that need to be loaded""" | |||||
raise NotImplementedError | |||||
def has_directories(self) -> bool: | |||||
"""Checks whether we need to load directories""" | |||||
return True | |||||
def get_directories(self) -> Iterable[Directory]: | |||||
"""Get the directories that need to be loaded""" | |||||
raise NotImplementedError | |||||
def has_revisions(self) -> bool: | |||||
"""Checks whether we need to load revisions""" | |||||
return True | |||||
def get_revisions(self) -> Iterable[Revision]: | |||||
"""Get the revisions that need to be loaded""" | |||||
raise NotImplementedError | |||||
def has_releases(self) -> bool: | |||||
"""Checks whether we need to load releases""" | |||||
return True | |||||
def get_releases(self) -> Iterable[Release]: | |||||
"""Get the releases that need to be loaded""" | |||||
raise NotImplementedError | |||||
def get_snapshot(self) -> Snapshot: | |||||
"""Get the snapshot that needs to be loaded""" | |||||
raise NotImplementedError | |||||
def eventful(self) -> bool: | |||||
"""Whether the load was eventful""" | |||||
raise NotImplementedError | |||||
def store_data(self) -> None: | |||||
assert self.origin | |||||
if self.save_data_path: | |||||
self.save_data() | |||||
if self.has_contents(): | |||||
for obj in self.get_contents(): | |||||
if isinstance(obj, Content): | |||||
self.storage.content_add([obj]) | |||||
elif isinstance(obj, SkippedContent): | |||||
self.storage.skipped_content_add([obj]) | |||||
else: | |||||
raise TypeError(f"Unexpected content type: {obj}") | |||||
if self.has_directories(): | |||||
for directory in self.get_directories(): | |||||
self.storage.directory_add([directory]) | |||||
if self.has_revisions(): | |||||
for revision in self.get_revisions(): | |||||
self.storage.revision_add([revision]) | |||||
if self.has_releases(): | |||||
for release in self.get_releases(): | |||||
self.storage.release_add([release]) | |||||
snapshot = self.get_snapshot() | |||||
self.storage.snapshot_add([snapshot]) | |||||
self.flush() | |||||
self.loaded_snapshot_id = snapshot.id | |||||
class NodeLoader(BaseLoader): | class NodeLoader(BaseLoader): | ||||
"""Common class for :class:`ContentLoader` and :class:`Directoryloader`. | """Common class for :class:`ContentLoader` and :class:`Directoryloader`. | ||||
The "checksums" field is a dictionary of hex hashes on the object retrieved (content | The "checksums" field is a dictionary of hex hashes on the object retrieved (content | ||||
or directory). When "checksums_computation" is "standard", that means the checksums | or directory). When "checksums_computation" is "standard", that means the checksums | ||||
are computed on the content of the remote file to retrieve itself (as unix cli | are computed on the content of the remote file to retrieve itself (as unix cli | ||||
allows, "sha1sum", "sha256sum", ...). When "checksums_computation" is "nar", the | allows, "sha1sum", "sha256sum", ...). When "checksums_computation" is "nar", the | ||||
checks is delegated to the `nix-store --dump` command, it's actually checksums on | checks is delegated to the `nix-store --dump` command, it's actually checksums on | ||||
▲ Show 20 Lines • Show All 304 Lines • Show Last 20 Lines |