Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/core/loader.py
# Copyright (C) 2015-2022 The Software Heritage developers | # Copyright (C) 2015-2022 The Software Heritage developers | ||||||||||||||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||||||||||||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||||||||||||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||||||||||||||
import contextlib | |||||||||||||||||
import datetime | import datetime | ||||||||||||||||
import hashlib | import hashlib | ||||||||||||||||
import logging | import logging | ||||||||||||||||
import os | import os | ||||||||||||||||
import time | import time | ||||||||||||||||
from typing import Any, Dict, Iterable, List, Optional | from typing import Any, ContextManager, Dict, Iterable, List, Optional, Union | ||||||||||||||||
import sentry_sdk | import sentry_sdk | ||||||||||||||||
from swh.core.config import load_from_envvar | from swh.core.config import load_from_envvar | ||||||||||||||||
from swh.core.statsd import statsd | from swh.core.statsd import Statsd | ||||||||||||||||
from swh.loader.core.metadata_fetchers import CredentialsType, get_fetchers_for_lister | from swh.loader.core.metadata_fetchers import CredentialsType, get_fetchers_for_lister | ||||||||||||||||
from swh.loader.exception import NotFound | from swh.loader.exception import NotFound | ||||||||||||||||
from swh.model.model import ( | from swh.model.model import ( | ||||||||||||||||
BaseContent, | BaseContent, | ||||||||||||||||
Content, | Content, | ||||||||||||||||
Directory, | Directory, | ||||||||||||||||
Origin, | Origin, | ||||||||||||||||
OriginVisit, | OriginVisit, | ||||||||||||||||
OriginVisitStatus, | OriginVisitStatus, | ||||||||||||||||
RawExtrinsicMetadata, | RawExtrinsicMetadata, | ||||||||||||||||
Release, | Release, | ||||||||||||||||
Revision, | Revision, | ||||||||||||||||
Sha1Git, | Sha1Git, | ||||||||||||||||
SkippedContent, | SkippedContent, | ||||||||||||||||
Snapshot, | Snapshot, | ||||||||||||||||
) | ) | ||||||||||||||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||||||||||||||
from swh.storage.interface import StorageInterface | from swh.storage.interface import StorageInterface | ||||||||||||||||
from swh.storage.utils import now | from swh.storage.utils import now | ||||||||||||||||
DEFAULT_CONFIG: Dict[str, Any] = { | DEFAULT_CONFIG: Dict[str, Any] = { | ||||||||||||||||
"max_content_size": 100 * 1024 * 1024, | "max_content_size": 100 * 1024 * 1024, | ||||||||||||||||
} | } | ||||||||||||||||
STATSD_PREFIX = "swh_loader" | |||||||||||||||||
class BaseLoader: | class BaseLoader: | ||||||||||||||||
"""Base class for (D)VCS loaders (e.g Svn, Git, Mercurial, ...) or PackageLoader (e.g | """Base class for (D)VCS loaders (e.g Svn, Git, Mercurial, ...) or PackageLoader (e.g | ||||||||||||||||
PyPI, Npm, CRAN, ...) | PyPI, Npm, CRAN, ...) | ||||||||||||||||
A loader retrieves origin information (git/mercurial/svn repositories, pypi/npm/... | A loader retrieves origin information (git/mercurial/svn repositories, pypi/npm/... | ||||||||||||||||
package artifacts), ingests the contents/directories/revisions/releases/snapshot | package artifacts), ingests the contents/directories/revisions/releases/snapshot | ||||||||||||||||
read from those artifacts and send them to the archive through the storage backend. | read from those artifacts and send them to the archive through the storage backend. | ||||||||||||||||
▲ Show 20 Lines • Show All 77 Lines • ▼ Show 20 Lines | ): | ||||||||||||||||
os.stat(path) | os.stat(path) | ||||||||||||||||
if not os.access(path, os.R_OK | os.W_OK): | if not os.access(path, os.R_OK | os.W_OK): | ||||||||||||||||
raise PermissionError("Permission denied: %r" % path) | raise PermissionError("Permission denied: %r" % path) | ||||||||||||||||
self.save_data_path = save_data_path | self.save_data_path = save_data_path | ||||||||||||||||
self.parent_origins = None | self.parent_origins = None | ||||||||||||||||
self.statsd = Statsd( | |||||||||||||||||
namespace="swh_loader", constant_tags={"visit_type": self.visit_type} | |||||||||||||||||
) | |||||||||||||||||
@classmethod | @classmethod | ||||||||||||||||
def from_config(cls, storage: Dict[str, Any], **config: Any): | def from_config(cls, storage: Dict[str, Any], **config: Any): | ||||||||||||||||
"""Instantiate a loader from a configuration dict. | """Instantiate a loader from a configuration dict. | ||||||||||||||||
This is basically a backwards-compatibility shim for the CLI. | This is basically a backwards-compatibility shim for the CLI. | ||||||||||||||||
Args: | Args: | ||||||||||||||||
storage: instantiation config for the storage | storage: instantiation config for the storage | ||||||||||||||||
▲ Show 20 Lines • Show All 343 Lines • ▼ Show 20 Lines | def build_extrinsic_origin_metadata(self) -> List[RawExtrinsicMetadata]: | ||||||||||||||||
self.statsd_average( | self.statsd_average( | ||||||||||||||||
"metadata_parent_origins", | "metadata_parent_origins", | ||||||||||||||||
len(self.parent_origins), | len(self.parent_origins), | ||||||||||||||||
tags={"fetcher": cls.FETCHER_NAME}, | tags={"fetcher": cls.FETCHER_NAME}, | ||||||||||||||||
) | ) | ||||||||||||||||
self.statsd_average("metadata_objects", len(metadata)) | self.statsd_average("metadata_objects", len(metadata)) | ||||||||||||||||
return metadata | return metadata | ||||||||||||||||
@contextlib.contextmanager | def statsd_timed(self, name: str, tags: Dict[str, Any] = {}) -> ContextManager: | ||||||||||||||||
olasdUnsubmitted Done Inline Actions
olasd: | |||||||||||||||||
def statsd_timed(self, name, tags={}): | """ | ||||||||||||||||
with statsd.timed( | Wrapper for :meth:`swh.core.statsd.Statsd.timed`, which uses the standard | ||||||||||||||||
f"{STATSD_PREFIX}_operation_duration_seconds", | metric name and tags for loaders. | ||||||||||||||||
tags={"visit_type": self.visit_type, "operation": name, **tags}, | """ | ||||||||||||||||
): | return self.statsd.timed( | ||||||||||||||||
yield | "operation_duration_seconds", tags={"operation": name, **tags} | ||||||||||||||||
) | |||||||||||||||||
def statsd_timing(self, name, value, tags={}): | def statsd_timing(self, name: str, value: float, tags: Dict[str, Any] = {}) -> None: | ||||||||||||||||
statsd.timing( | """ | ||||||||||||||||
f"{STATSD_PREFIX}_operation_duration_seconds", | Wrapper for :meth:`swh.core.statsd.Statsd.timing`, which uses the standard | ||||||||||||||||
value, | metric name and tags for loaders. | ||||||||||||||||
tags={"visit_type": self.visit_type, "operation": name, **tags}, | """ | ||||||||||||||||
) | self.statsd.timing( | ||||||||||||||||
"operation_duration_seconds", value, tags={"operation": name, **tags} | |||||||||||||||||
def statsd_average(self, name, value, tags={}): | |||||||||||||||||
statsd.increment( | |||||||||||||||||
f"{STATSD_PREFIX}_{name}_sum", | |||||||||||||||||
value, | |||||||||||||||||
tags={"visit_type": self.visit_type, **tags}, | |||||||||||||||||
) | |||||||||||||||||
statsd.increment( | |||||||||||||||||
f"{STATSD_PREFIX}_{name}_count", | |||||||||||||||||
tags={"visit_type": self.visit_type, **tags}, | |||||||||||||||||
) | ) | ||||||||||||||||
def statsd_average( | |||||||||||||||||
self, name: str, value: Union[int, float], tags: Dict[str, Any] = {} | |||||||||||||||||
) -> None: | |||||||||||||||||
"""Increments both ``{name}_sum`` (by the ``value``) and ``{name}_count`` | |||||||||||||||||
(by ``1``), allowing to prometheus to compute the average ``value`` over | |||||||||||||||||
time.""" | |||||||||||||||||
self.statsd.increment(f"{name}_sum", value, tags=tags) | |||||||||||||||||
self.statsd.increment(f"{name}_count", tags=tags) | |||||||||||||||||
class DVCSLoader(BaseLoader): | class DVCSLoader(BaseLoader): | ||||||||||||||||
"""This base class is a pattern for dvcs loaders (e.g. git, mercurial). | """This base class is a pattern for dvcs loaders (e.g. git, mercurial). | ||||||||||||||||
Those loaders are able to load all the data in one go. For example, the | Those loaders are able to load all the data in one go. For example, the | ||||||||||||||||
loader defined in swh-loader-git :class:`BulkUpdater`. | loader defined in swh-loader-git :class:`BulkUpdater`. | ||||||||||||||||
For other loaders (stateful one, (e.g :class:`SWHSvnLoader`), | For other loaders (stateful one, (e.g :class:`SWHSvnLoader`), | ||||||||||||||||
▲ Show 20 Lines • Show All 74 Lines • Show Last 20 Lines |