Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/core/loader.py
| # Copyright (C) 2015-2022 The Software Heritage developers | # Copyright (C) 2015-2022 The Software Heritage developers | ||||||||||||||||
| # See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||||||||||||||
| # License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||||||||||||||
| # See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||||||||||||||
| import contextlib | |||||||||||||||||
| import datetime | import datetime | ||||||||||||||||
| import hashlib | import hashlib | ||||||||||||||||
| import logging | import logging | ||||||||||||||||
| import os | import os | ||||||||||||||||
| import time | import time | ||||||||||||||||
| from typing import Any, Dict, Iterable, List, Optional | from typing import Any, ContextManager, Dict, Iterable, List, Optional, Union | ||||||||||||||||
| import sentry_sdk | import sentry_sdk | ||||||||||||||||
| from swh.core.config import load_from_envvar | from swh.core.config import load_from_envvar | ||||||||||||||||
| from swh.core.statsd import statsd | from swh.core.statsd import Statsd | ||||||||||||||||
| from swh.loader.core.metadata_fetchers import CredentialsType, get_fetchers_for_lister | from swh.loader.core.metadata_fetchers import CredentialsType, get_fetchers_for_lister | ||||||||||||||||
| from swh.loader.exception import NotFound | from swh.loader.exception import NotFound | ||||||||||||||||
| from swh.model.model import ( | from swh.model.model import ( | ||||||||||||||||
| BaseContent, | BaseContent, | ||||||||||||||||
| Content, | Content, | ||||||||||||||||
| Directory, | Directory, | ||||||||||||||||
| Origin, | Origin, | ||||||||||||||||
| OriginVisit, | OriginVisit, | ||||||||||||||||
| OriginVisitStatus, | OriginVisitStatus, | ||||||||||||||||
| RawExtrinsicMetadata, | RawExtrinsicMetadata, | ||||||||||||||||
| Release, | Release, | ||||||||||||||||
| Revision, | Revision, | ||||||||||||||||
| Sha1Git, | Sha1Git, | ||||||||||||||||
| SkippedContent, | SkippedContent, | ||||||||||||||||
| Snapshot, | Snapshot, | ||||||||||||||||
| ) | ) | ||||||||||||||||
| from swh.storage import get_storage | from swh.storage import get_storage | ||||||||||||||||
| from swh.storage.interface import StorageInterface | from swh.storage.interface import StorageInterface | ||||||||||||||||
| from swh.storage.utils import now | from swh.storage.utils import now | ||||||||||||||||
| DEFAULT_CONFIG: Dict[str, Any] = { | DEFAULT_CONFIG: Dict[str, Any] = { | ||||||||||||||||
| "max_content_size": 100 * 1024 * 1024, | "max_content_size": 100 * 1024 * 1024, | ||||||||||||||||
| } | } | ||||||||||||||||
| STATSD_PREFIX = "swh_loader" | |||||||||||||||||
| class BaseLoader: | class BaseLoader: | ||||||||||||||||
| """Base class for (D)VCS loaders (e.g Svn, Git, Mercurial, ...) or PackageLoader (e.g | """Base class for (D)VCS loaders (e.g Svn, Git, Mercurial, ...) or PackageLoader (e.g | ||||||||||||||||
| PyPI, Npm, CRAN, ...) | PyPI, Npm, CRAN, ...) | ||||||||||||||||
| A loader retrieves origin information (git/mercurial/svn repositories, pypi/npm/... | A loader retrieves origin information (git/mercurial/svn repositories, pypi/npm/... | ||||||||||||||||
| package artifacts), ingests the contents/directories/revisions/releases/snapshot | package artifacts), ingests the contents/directories/revisions/releases/snapshot | ||||||||||||||||
| read from those artifacts and send them to the archive through the storage backend. | read from those artifacts and send them to the archive through the storage backend. | ||||||||||||||||
| ▲ Show 20 Lines • Show All 77 Lines • ▼ Show 20 Lines | ): | ||||||||||||||||
| os.stat(path) | os.stat(path) | ||||||||||||||||
| if not os.access(path, os.R_OK | os.W_OK): | if not os.access(path, os.R_OK | os.W_OK): | ||||||||||||||||
| raise PermissionError("Permission denied: %r" % path) | raise PermissionError("Permission denied: %r" % path) | ||||||||||||||||
| self.save_data_path = save_data_path | self.save_data_path = save_data_path | ||||||||||||||||
| self.parent_origins = None | self.parent_origins = None | ||||||||||||||||
| self.statsd = Statsd( | |||||||||||||||||
| namespace="swh_loader", constant_tags={"visit_type": self.visit_type} | |||||||||||||||||
| ) | |||||||||||||||||
| @classmethod | @classmethod | ||||||||||||||||
| def from_config(cls, storage: Dict[str, Any], **config: Any): | def from_config(cls, storage: Dict[str, Any], **config: Any): | ||||||||||||||||
| """Instantiate a loader from a configuration dict. | """Instantiate a loader from a configuration dict. | ||||||||||||||||
| This is basically a backwards-compatibility shim for the CLI. | This is basically a backwards-compatibility shim for the CLI. | ||||||||||||||||
| Args: | Args: | ||||||||||||||||
| storage: instantiation config for the storage | storage: instantiation config for the storage | ||||||||||||||||
| ▲ Show 20 Lines • Show All 343 Lines • ▼ Show 20 Lines | def build_extrinsic_origin_metadata(self) -> List[RawExtrinsicMetadata]: | ||||||||||||||||
| self.statsd_average( | self.statsd_average( | ||||||||||||||||
| "metadata_parent_origins", | "metadata_parent_origins", | ||||||||||||||||
| len(self.parent_origins), | len(self.parent_origins), | ||||||||||||||||
| tags={"fetcher": cls.FETCHER_NAME}, | tags={"fetcher": cls.FETCHER_NAME}, | ||||||||||||||||
| ) | ) | ||||||||||||||||
| self.statsd_average("metadata_objects", len(metadata)) | self.statsd_average("metadata_objects", len(metadata)) | ||||||||||||||||
| return metadata | return metadata | ||||||||||||||||
| @contextlib.contextmanager | def statsd_timed(self, name: str, tags: Dict[str, Any] = {}) -> ContextManager: | ||||||||||||||||
olasdUnsubmitted Done Inline Actions
olasd: | |||||||||||||||||
| def statsd_timed(self, name, tags={}): | """ | ||||||||||||||||
| with statsd.timed( | Wrapper for :meth:`swh.core.statsd.Statsd.timed`, which uses the standard | ||||||||||||||||
| f"{STATSD_PREFIX}_operation_duration_seconds", | metric name and tags for loaders. | ||||||||||||||||
| tags={"visit_type": self.visit_type, "operation": name, **tags}, | """ | ||||||||||||||||
| ): | return self.statsd.timed( | ||||||||||||||||
| yield | "operation_duration_seconds", tags={"operation": name, **tags} | ||||||||||||||||
| ) | |||||||||||||||||
| def statsd_timing(self, name, value, tags={}): | def statsd_timing(self, name: str, value: float, tags: Dict[str, Any] = {}) -> None: | ||||||||||||||||
| statsd.timing( | """ | ||||||||||||||||
| f"{STATSD_PREFIX}_operation_duration_seconds", | Wrapper for :meth:`swh.core.statsd.Statsd.timing`, which uses the standard | ||||||||||||||||
| value, | metric name and tags for loaders. | ||||||||||||||||
| tags={"visit_type": self.visit_type, "operation": name, **tags}, | """ | ||||||||||||||||
| ) | self.statsd.timing( | ||||||||||||||||
| "operation_duration_seconds", value, tags={"operation": name, **tags} | |||||||||||||||||
| def statsd_average(self, name, value, tags={}): | |||||||||||||||||
| statsd.increment( | |||||||||||||||||
| f"{STATSD_PREFIX}_{name}_sum", | |||||||||||||||||
| value, | |||||||||||||||||
| tags={"visit_type": self.visit_type, **tags}, | |||||||||||||||||
| ) | |||||||||||||||||
| statsd.increment( | |||||||||||||||||
| f"{STATSD_PREFIX}_{name}_count", | |||||||||||||||||
| tags={"visit_type": self.visit_type, **tags}, | |||||||||||||||||
| ) | ) | ||||||||||||||||
| def statsd_average( | |||||||||||||||||
| self, name: str, value: Union[int, float], tags: Dict[str, Any] = {} | |||||||||||||||||
| ) -> None: | |||||||||||||||||
| """Increments both ``{name}_sum`` (by the ``value``) and ``{name}_count`` | |||||||||||||||||
| (by ``1``), allowing to prometheus to compute the average ``value`` over | |||||||||||||||||
| time.""" | |||||||||||||||||
| self.statsd.increment(f"{name}_sum", value, tags=tags) | |||||||||||||||||
| self.statsd.increment(f"{name}_count", tags=tags) | |||||||||||||||||
| class DVCSLoader(BaseLoader): | class DVCSLoader(BaseLoader): | ||||||||||||||||
| """This base class is a pattern for dvcs loaders (e.g. git, mercurial). | """This base class is a pattern for dvcs loaders (e.g. git, mercurial). | ||||||||||||||||
| Those loaders are able to load all the data in one go. For example, the | Those loaders are able to load all the data in one go. For example, the | ||||||||||||||||
| loader defined in swh-loader-git :class:`BulkUpdater`. | loader defined in swh-loader-git :class:`BulkUpdater`. | ||||||||||||||||
| For other loaders (stateful one, (e.g :class:`SWHSvnLoader`), | For other loaders (stateful one, (e.g :class:`SWHSvnLoader`), | ||||||||||||||||
| ▲ Show 20 Lines • Show All 74 Lines • Show Last 20 Lines | |||||||||||||||||