Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/core/loader.py
| # Copyright (C) 2015-2022 The Software Heritage developers | # Copyright (C) 2015-2022 The Software Heritage developers | ||||
| # See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
| # License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
| # See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
| import datetime | import datetime | ||||
| import hashlib | import hashlib | ||||
| import logging | import logging | ||||
| import os | import os | ||||
| from pathlib import Path | from pathlib import Path | ||||
| import tempfile | import tempfile | ||||
| import time | import time | ||||
| from typing import Any, ContextManager, Dict, Iterable, List, Optional, Union | from typing import Any, ContextManager, Dict, List, Optional, Union | ||||
| from urllib.parse import urlparse | from urllib.parse import urlparse | ||||
| from requests.exceptions import HTTPError | from requests.exceptions import HTTPError | ||||
| import sentry_sdk | import sentry_sdk | ||||
| from swh.core.config import load_from_envvar | from swh.core.config import load_from_envvar | ||||
| from swh.core.statsd import Statsd | from swh.core.statsd import Statsd | ||||
| from swh.core.tarball import uncompress | from swh.core.tarball import uncompress | ||||
| from swh.loader.core.metadata_fetchers import CredentialsType, get_fetchers_for_lister | from swh.loader.core.metadata_fetchers import CredentialsType, get_fetchers_for_lister | ||||
| from swh.loader.core.utils import nix_hashes | from swh.loader.core.utils import nix_hashes | ||||
| from swh.loader.exception import NotFound, UnsupportedChecksumComputation | from swh.loader.exception import NotFound, UnsupportedChecksumComputation | ||||
| from swh.loader.package.utils import download | from swh.loader.package.utils import download | ||||
| from swh.model import from_disk | from swh.model import from_disk | ||||
| from swh.model.model import ( | from swh.model.model import ( | ||||
| BaseContent, | |||||
| Content, | Content, | ||||
| Directory, | Directory, | ||||
| Origin, | Origin, | ||||
| OriginVisit, | OriginVisit, | ||||
| OriginVisitStatus, | OriginVisitStatus, | ||||
| RawExtrinsicMetadata, | RawExtrinsicMetadata, | ||||
| Release, | |||||
| Revision, | |||||
| Sha1Git, | Sha1Git, | ||||
| SkippedContent, | SkippedContent, | ||||
| Snapshot, | Snapshot, | ||||
| SnapshotBranch, | SnapshotBranch, | ||||
| TargetType, | TargetType, | ||||
| ) | ) | ||||
| from swh.storage import get_storage | from swh.storage import get_storage | ||||
| from swh.storage.algos.snapshot import snapshot_get_latest | from swh.storage.algos.snapshot import snapshot_get_latest | ||||
| ▲ Show 20 Lines • Show All 520 Lines • ▼ Show 20 Lines | class BaseLoader: | ||||
| ) -> None: | ) -> None: | ||||
| """Increments both ``{name}_sum`` (by the ``value``) and ``{name}_count`` | """Increments both ``{name}_sum`` (by the ``value``) and ``{name}_count`` | ||||
| (by ``1``), allowing to prometheus to compute the average ``value`` over | (by ``1``), allowing to prometheus to compute the average ``value`` over | ||||
| time.""" | time.""" | ||||
| self.statsd.increment(f"{name}_sum", value, tags=tags) | self.statsd.increment(f"{name}_sum", value, tags=tags) | ||||
| self.statsd.increment(f"{name}_count", tags=tags) | self.statsd.increment(f"{name}_count", tags=tags) | ||||
| class DVCSLoader(BaseLoader): | |||||
| """This base class is a pattern for dvcs loaders (e.g. git, mercurial). | |||||
| Those loaders are able to load all the data in one go. For example, the | |||||
| loader defined in swh-loader-git :class:`BulkUpdater`. | |||||
| For other loaders (stateful one, (e.g :class:`SWHSvnLoader`), | |||||
| inherit directly from :class:`BaseLoader`. | |||||
| """ | |||||
| def cleanup(self) -> None: | |||||
| """Clean up an eventual state installed for computations.""" | |||||
| pass | |||||
| def has_contents(self) -> bool: | |||||
| """Checks whether we need to load contents""" | |||||
| return True | |||||
| def get_contents(self) -> Iterable[BaseContent]: | |||||
| """Get the contents that need to be loaded""" | |||||
| raise NotImplementedError | |||||
| def has_directories(self) -> bool: | |||||
| """Checks whether we need to load directories""" | |||||
| return True | |||||
| def get_directories(self) -> Iterable[Directory]: | |||||
| """Get the directories that need to be loaded""" | |||||
| raise NotImplementedError | |||||
| def has_revisions(self) -> bool: | |||||
| """Checks whether we need to load revisions""" | |||||
| return True | |||||
| def get_revisions(self) -> Iterable[Revision]: | |||||
| """Get the revisions that need to be loaded""" | |||||
| raise NotImplementedError | |||||
| def has_releases(self) -> bool: | |||||
| """Checks whether we need to load releases""" | |||||
| return True | |||||
| def get_releases(self) -> Iterable[Release]: | |||||
| """Get the releases that need to be loaded""" | |||||
| raise NotImplementedError | |||||
| def get_snapshot(self) -> Snapshot: | |||||
| """Get the snapshot that needs to be loaded""" | |||||
| raise NotImplementedError | |||||
| def eventful(self) -> bool: | |||||
| """Whether the load was eventful""" | |||||
| raise NotImplementedError | |||||
| def store_data(self) -> None: | |||||
| assert self.origin | |||||
| if self.save_data_path: | |||||
| self.save_data() | |||||
| if self.has_contents(): | |||||
| for obj in self.get_contents(): | |||||
| if isinstance(obj, Content): | |||||
| self.storage.content_add([obj]) | |||||
| elif isinstance(obj, SkippedContent): | |||||
| self.storage.skipped_content_add([obj]) | |||||
| else: | |||||
| raise TypeError(f"Unexpected content type: {obj}") | |||||
| if self.has_directories(): | |||||
| for directory in self.get_directories(): | |||||
| self.storage.directory_add([directory]) | |||||
| if self.has_revisions(): | |||||
| for revision in self.get_revisions(): | |||||
| self.storage.revision_add([revision]) | |||||
| if self.has_releases(): | |||||
| for release in self.get_releases(): | |||||
| self.storage.release_add([release]) | |||||
| snapshot = self.get_snapshot() | |||||
| self.storage.snapshot_add([snapshot]) | |||||
| self.flush() | |||||
| self.loaded_snapshot_id = snapshot.id | |||||
| class NodeLoader(BaseLoader): | class NodeLoader(BaseLoader): | ||||
| """Common class for :class:`ContentLoader` and :class:`Directoryloader`. | """Common class for :class:`ContentLoader` and :class:`Directoryloader`. | ||||
| The "checksums" field is a dictionary of hex hashes on the object retrieved (content | The "checksums" field is a dictionary of hex hashes on the object retrieved (content | ||||
| or directory). When "checksums_computation" is "standard", that means the checksums | or directory). When "checksums_computation" is "standard", that means the checksums | ||||
| are computed on the content of the remote file to retrieve itself (as unix cli | are computed on the content of the remote file to retrieve itself (as unix cli | ||||
| allows, "sha1sum", "sha256sum", ...). When "checksums_computation" is "nar", the | allows, "sha1sum", "sha256sum", ...). When "checksums_computation" is "nar", the | ||||
| checks is delegated to the `nix-store --dump` command, it's actually checksums on | checks is delegated to the `nix-store --dump` command, it's actually checksums on | ||||
| ▲ Show 20 Lines • Show All 304 Lines • Show Last 20 Lines | |||||