Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/core/loader.py
# Copyright (C) 2015-2022 The Software Heritage developers | # Copyright (C) 2015-2022 The Software Heritage developers | |||||||||||||||||||||||||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | |||||||||||||||||||||||||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | |||||||||||||||||||||||||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | |||||||||||||||||||||||||||
import contextlib | ||||||||||||||||||||||||||||
import datetime | import datetime | |||||||||||||||||||||||||||
import hashlib | import hashlib | |||||||||||||||||||||||||||
import logging | import logging | |||||||||||||||||||||||||||
import os | import os | |||||||||||||||||||||||||||
import time | ||||||||||||||||||||||||||||
from typing import Any, Dict, Iterable, List, Optional | from typing import Any, Dict, Iterable, List, Optional | |||||||||||||||||||||||||||
import sentry_sdk | import sentry_sdk | |||||||||||||||||||||||||||
from swh.core.config import load_from_envvar | from swh.core.config import load_from_envvar | |||||||||||||||||||||||||||
from swh.core.statsd import statsd | ||||||||||||||||||||||||||||
from swh.loader.core.metadata_fetchers import CredentialsType, get_fetchers_for_lister | from swh.loader.core.metadata_fetchers import CredentialsType, get_fetchers_for_lister | |||||||||||||||||||||||||||
from swh.loader.exception import NotFound | from swh.loader.exception import NotFound | |||||||||||||||||||||||||||
from swh.model.model import ( | from swh.model.model import ( | |||||||||||||||||||||||||||
BaseContent, | BaseContent, | |||||||||||||||||||||||||||
Content, | Content, | |||||||||||||||||||||||||||
Directory, | Directory, | |||||||||||||||||||||||||||
Origin, | Origin, | |||||||||||||||||||||||||||
OriginVisit, | OriginVisit, | |||||||||||||||||||||||||||
OriginVisitStatus, | OriginVisitStatus, | |||||||||||||||||||||||||||
RawExtrinsicMetadata, | RawExtrinsicMetadata, | |||||||||||||||||||||||||||
Release, | Release, | |||||||||||||||||||||||||||
Revision, | Revision, | |||||||||||||||||||||||||||
Sha1Git, | Sha1Git, | |||||||||||||||||||||||||||
SkippedContent, | SkippedContent, | |||||||||||||||||||||||||||
Snapshot, | Snapshot, | |||||||||||||||||||||||||||
) | ) | |||||||||||||||||||||||||||
from swh.storage import get_storage | from swh.storage import get_storage | |||||||||||||||||||||||||||
from swh.storage.interface import StorageInterface | from swh.storage.interface import StorageInterface | |||||||||||||||||||||||||||
from swh.storage.utils import now | from swh.storage.utils import now | |||||||||||||||||||||||||||
DEFAULT_CONFIG: Dict[str, Any] = { | DEFAULT_CONFIG: Dict[str, Any] = { | |||||||||||||||||||||||||||
"max_content_size": 100 * 1024 * 1024, | "max_content_size": 100 * 1024 * 1024, | |||||||||||||||||||||||||||
} | } | |||||||||||||||||||||||||||
olasd: Could probably just be `swh_loader` as it's fully generic. | ||||||||||||||||||||||||||||
STATSD_PREFIX = "swh_loader" | ||||||||||||||||||||||||||||
class BaseLoader: | class BaseLoader: | |||||||||||||||||||||||||||
"""Base class for (D)VCS loaders (e.g Svn, Git, Mercurial, ...) or PackageLoader (e.g | """Base class for (D)VCS loaders (e.g Svn, Git, Mercurial, ...) or PackageLoader (e.g | |||||||||||||||||||||||||||
PyPI, Npm, CRAN, ...) | PyPI, Npm, CRAN, ...) | |||||||||||||||||||||||||||
A loader retrieves origin information (git/mercurial/svn repositories, pypi/npm/... | A loader retrieves origin information (git/mercurial/svn repositories, pypi/npm/... | |||||||||||||||||||||||||||
package artifacts), ingests the contents/directories/revisions/releases/snapshot | package artifacts), ingests the contents/directories/revisions/releases/snapshot | |||||||||||||||||||||||||||
read from those artifacts and send them to the archive through the storage backend. | read from those artifacts and send them to the archive through the storage backend. | |||||||||||||||||||||||||||
▲ Show 20 Lines • Show All 265 Lines • ▼ Show 20 Lines | def load(self) -> Dict[str, str]: | |||||||||||||||||||||||||||
- Call :meth:`fetch_data` to fetch the data to store | - Call :meth:`fetch_data` to fetch the data to store | |||||||||||||||||||||||||||
- Call :meth:`store_data` to store the data | - Call :meth:`store_data` to store the data | |||||||||||||||||||||||||||
- Call :meth:`cleanup` to clean up any eventual state put in place | - Call :meth:`cleanup` to clean up any eventual state put in place | |||||||||||||||||||||||||||
in :meth:`prepare` method. | in :meth:`prepare` method. | |||||||||||||||||||||||||||
""" | """ | |||||||||||||||||||||||||||
try: | try: | |||||||||||||||||||||||||||
with self.statsd_timed("pre_cleanup"): | ||||||||||||||||||||||||||||
self.pre_cleanup() | self.pre_cleanup() | |||||||||||||||||||||||||||
except Exception: | except Exception: | |||||||||||||||||||||||||||
msg = "Cleaning up dangling data failed! Continue loading." | msg = "Cleaning up dangling data failed! Continue loading." | |||||||||||||||||||||||||||
self.log.warning(msg) | self.log.warning(msg) | |||||||||||||||||||||||||||
sentry_sdk.capture_exception() | sentry_sdk.capture_exception() | |||||||||||||||||||||||||||
self._store_origin_visit() | self._store_origin_visit() | |||||||||||||||||||||||||||
assert ( | assert ( | |||||||||||||||||||||||||||
self.visit.visit | self.visit.visit | |||||||||||||||||||||||||||
), "The method `_store_origin_visit` should set the visit (OriginVisit)" | ), "The method `_store_origin_visit` should set the visit (OriginVisit)" | |||||||||||||||||||||||||||
self.log.info( | self.log.info( | |||||||||||||||||||||||||||
"Load origin '%s' with type '%s'", self.origin.url, self.visit.type | "Load origin '%s' with type '%s'", self.origin.url, self.visit.type | |||||||||||||||||||||||||||
) | ) | |||||||||||||||||||||||||||
try: | try: | |||||||||||||||||||||||||||
with self.statsd_timed("build_extrinsic_origin_metadata"): | ||||||||||||||||||||||||||||
metadata = self.build_extrinsic_origin_metadata() | metadata = self.build_extrinsic_origin_metadata() | |||||||||||||||||||||||||||
self.load_metadata_objects(metadata) | self.load_metadata_objects(metadata) | |||||||||||||||||||||||||||
except Exception as e: | except Exception as e: | |||||||||||||||||||||||||||
sentry_sdk.capture_exception(e) | sentry_sdk.capture_exception(e) | |||||||||||||||||||||||||||
# Do not fail the whole task if this is the only failure | # Do not fail the whole task if this is the only failure | |||||||||||||||||||||||||||
self.log.exception( | self.log.exception( | |||||||||||||||||||||||||||
"Failure while loading extrinsic origin metadata.", | "Failure while loading extrinsic origin metadata.", | |||||||||||||||||||||||||||
extra={ | extra={ | |||||||||||||||||||||||||||
"swh_task_args": [], | "swh_task_args": [], | |||||||||||||||||||||||||||
"swh_task_kwargs": { | "swh_task_kwargs": { | |||||||||||||||||||||||||||
"origin": self.origin.url, | "origin": self.origin.url, | |||||||||||||||||||||||||||
"lister_name": self.lister_name, | "lister_name": self.lister_name, | |||||||||||||||||||||||||||
"lister_instance_name": self.lister_instance_name, | "lister_instance_name": self.lister_instance_name, | |||||||||||||||||||||||||||
}, | }, | |||||||||||||||||||||||||||
}, | }, | |||||||||||||||||||||||||||
) | ) | |||||||||||||||||||||||||||
total_time_fetch_data = 0.0 | ||||||||||||||||||||||||||||
total_time_store_data = 0.0 | ||||||||||||||||||||||||||||
try: | try: | |||||||||||||||||||||||||||
with self.statsd_timed("prepare"): | ||||||||||||||||||||||||||||
self.prepare() | self.prepare() | |||||||||||||||||||||||||||
while True: | while True: | |||||||||||||||||||||||||||
t1 = time.monotonic() | ||||||||||||||||||||||||||||
more_data_to_fetch = self.fetch_data() | more_data_to_fetch = self.fetch_data() | |||||||||||||||||||||||||||
t2 = time.monotonic() | ||||||||||||||||||||||||||||
total_time_fetch_data += t2 - t1 | ||||||||||||||||||||||||||||
self.store_data() | self.store_data() | |||||||||||||||||||||||||||
t3 = time.monotonic() | ||||||||||||||||||||||||||||
total_time_store_data += t3 - t2 | ||||||||||||||||||||||||||||
if not more_data_to_fetch: | if not more_data_to_fetch: | |||||||||||||||||||||||||||
break | break | |||||||||||||||||||||||||||
self.statsd_timing("fetch_data", total_time_fetch_data * 1000.0) | ||||||||||||||||||||||||||||
Not Done Inline ActionsAre you sure the * 1000 is needed? I'm pretty sure our statsd client does this conversion already and expects seconds as arguments. olasd: Are you sure the `* 1000` is needed? I'm pretty sure our statsd client does this conversion… | ||||||||||||||||||||||||||||
Done Inline ActionsIt's done in TimedContextManagerDecorator, which is used by statsd_timed (a decoractor and context manager), not by statsd_timing (a one-off function). You can see in the test that in the end, everything is multiplied by 1000. vlorentz: It's done in `TimedContextManagerDecorator`, which is used by `statsd_timed` (a decoractor and… | ||||||||||||||||||||||||||||
Not Done Inline ActionsBarf. Okay. olasd: Barf. Okay. | ||||||||||||||||||||||||||||
self.statsd_timing("store_data", total_time_store_data * 1000.0) | ||||||||||||||||||||||||||||
status = self.visit_status() | ||||||||||||||||||||||||||||
visit_status = OriginVisitStatus( | visit_status = OriginVisitStatus( | |||||||||||||||||||||||||||
origin=self.origin.url, | origin=self.origin.url, | |||||||||||||||||||||||||||
visit=self.visit.visit, | visit=self.visit.visit, | |||||||||||||||||||||||||||
type=self.visit_type, | type=self.visit_type, | |||||||||||||||||||||||||||
date=now(), | date=now(), | |||||||||||||||||||||||||||
status=self.visit_status(), | status=status, | |||||||||||||||||||||||||||
snapshot=self.loaded_snapshot_id, | snapshot=self.loaded_snapshot_id, | |||||||||||||||||||||||||||
) | ) | |||||||||||||||||||||||||||
self.storage.origin_visit_status_add([visit_status]) | self.storage.origin_visit_status_add([visit_status]) | |||||||||||||||||||||||||||
success = True | ||||||||||||||||||||||||||||
with self.statsd_timed( | ||||||||||||||||||||||||||||
"post_load", tags={"success": success, "status": status} | ||||||||||||||||||||||||||||
): | ||||||||||||||||||||||||||||
self.post_load() | self.post_load() | |||||||||||||||||||||||||||
except Exception as e: | except Exception as e: | |||||||||||||||||||||||||||
success = False | ||||||||||||||||||||||||||||
if isinstance(e, NotFound): | if isinstance(e, NotFound): | |||||||||||||||||||||||||||
status = "not_found" | status = "not_found" | |||||||||||||||||||||||||||
task_status = "uneventful" | task_status = "uneventful" | |||||||||||||||||||||||||||
else: | else: | |||||||||||||||||||||||||||
status = "partial" if self.loaded_snapshot_id else "failed" | status = "partial" if self.loaded_snapshot_id else "failed" | |||||||||||||||||||||||||||
task_status = "failed" | task_status = "failed" | |||||||||||||||||||||||||||
self.log.exception( | self.log.exception( | |||||||||||||||||||||||||||
Show All 13 Lines | def load(self) -> Dict[str, str]: | |||||||||||||||||||||||||||
origin=self.origin.url, | origin=self.origin.url, | |||||||||||||||||||||||||||
visit=self.visit.visit, | visit=self.visit.visit, | |||||||||||||||||||||||||||
type=self.visit_type, | type=self.visit_type, | |||||||||||||||||||||||||||
date=now(), | date=now(), | |||||||||||||||||||||||||||
status=status, | status=status, | |||||||||||||||||||||||||||
snapshot=self.loaded_snapshot_id, | snapshot=self.loaded_snapshot_id, | |||||||||||||||||||||||||||
) | ) | |||||||||||||||||||||||||||
self.storage.origin_visit_status_add([visit_status]) | self.storage.origin_visit_status_add([visit_status]) | |||||||||||||||||||||||||||
self.post_load(success=False) | with self.statsd_timed( | |||||||||||||||||||||||||||
"post_load", tags={"success": success, "status": status} | ||||||||||||||||||||||||||||
): | ||||||||||||||||||||||||||||
self.post_load(success=success) | ||||||||||||||||||||||||||||
return {"status": task_status} | return {"status": task_status} | |||||||||||||||||||||||||||
finally: | finally: | |||||||||||||||||||||||||||
with self.statsd_timed( | ||||||||||||||||||||||||||||
"flush", tags={"success": success, "status": status} | ||||||||||||||||||||||||||||
): | ||||||||||||||||||||||||||||
Done Inline Actionssame here anlambert: same here | ||||||||||||||||||||||||||||
self.flush() | self.flush() | |||||||||||||||||||||||||||
with self.statsd_timed( | ||||||||||||||||||||||||||||
"cleanup", tags={"success": success, "status": status} | ||||||||||||||||||||||||||||
): | ||||||||||||||||||||||||||||
self.cleanup() | self.cleanup() | |||||||||||||||||||||||||||
Done Inline Actionssuccess my be True here if we did not enter the except block. anlambert: `success` my be `True` here if we did not enter the except block. | ||||||||||||||||||||||||||||
return self.load_status() | return self.load_status() | |||||||||||||||||||||||||||
def load_metadata_objects( | def load_metadata_objects( | |||||||||||||||||||||||||||
self, metadata_objects: List[RawExtrinsicMetadata] | self, metadata_objects: List[RawExtrinsicMetadata] | |||||||||||||||||||||||||||
) -> None: | ) -> None: | |||||||||||||||||||||||||||
if not metadata_objects: | if not metadata_objects: | |||||||||||||||||||||||||||
return | return | |||||||||||||||||||||||||||
Show All 20 Lines | def build_extrinsic_origin_metadata(self) -> List[RawExtrinsicMetadata]: | |||||||||||||||||||||||||||
metadata = [] | metadata = [] | |||||||||||||||||||||||||||
for cls in get_fetchers_for_lister(self.lister_name): | for cls in get_fetchers_for_lister(self.lister_name): | |||||||||||||||||||||||||||
metadata_fetcher = cls( | metadata_fetcher = cls( | |||||||||||||||||||||||||||
origin=self.origin, | origin=self.origin, | |||||||||||||||||||||||||||
lister_name=self.lister_name, | lister_name=self.lister_name, | |||||||||||||||||||||||||||
lister_instance_name=self.lister_instance_name, | lister_instance_name=self.lister_instance_name, | |||||||||||||||||||||||||||
credentials=self.metadata_fetcher_credentials, | credentials=self.metadata_fetcher_credentials, | |||||||||||||||||||||||||||
) | ) | |||||||||||||||||||||||||||
with self.statsd_timed("fetch_one_metadata"): | ||||||||||||||||||||||||||||
metadata.extend(metadata_fetcher.get_origin_metadata()) | metadata.extend(metadata_fetcher.get_origin_metadata()) | |||||||||||||||||||||||||||
if self.parent_origins is None: | if self.parent_origins is None: | |||||||||||||||||||||||||||
self.parent_origins = metadata_fetcher.get_parent_origins() | self.parent_origins = metadata_fetcher.get_parent_origins() | |||||||||||||||||||||||||||
return metadata | return metadata | |||||||||||||||||||||||||||
@contextlib.contextmanager | ||||||||||||||||||||||||||||
def statsd_timed(self, name, tags={}): | ||||||||||||||||||||||||||||
with statsd.timed( | ||||||||||||||||||||||||||||
f"{STATSD_PREFIX}_operation_duration_seconds", | ||||||||||||||||||||||||||||
tags={"visit_type": self.visit_type, "operation": name, **tags}, | ||||||||||||||||||||||||||||
): | ||||||||||||||||||||||||||||
yield | ||||||||||||||||||||||||||||
def statsd_timing(self, name, value, tags={}): | ||||||||||||||||||||||||||||
statsd.timing( | ||||||||||||||||||||||||||||
f"{STATSD_PREFIX}_operation_duration_seconds", | ||||||||||||||||||||||||||||
value, | ||||||||||||||||||||||||||||
tags={"visit_type": self.visit_type, "operation": name, **tags}, | ||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||
Not Done Inline Actions
olasd: - We want the metric name to finish with the unit to match the [[ https://prometheus. | ||||||||||||||||||||||||||||
Done Inline Actionsgood point, will do. vlorentz: good point, will do. | ||||||||||||||||||||||||||||
Done Inline ActionsIs it fine to call it _seconds even though it's in millisecond; or should I change the suffix to _ms? vlorentz: Is it fine to call it `_seconds` even though it's in millisecond; or should I change the suffix… | ||||||||||||||||||||||||||||
Not Done Inline ActionsThe prometheus statsd exporter converts it to sensible units, so _seconds is correct olasd: The prometheus statsd exporter converts it to sensible units, so _seconds is correct | ||||||||||||||||||||||||||||
class DVCSLoader(BaseLoader): | class DVCSLoader(BaseLoader): | |||||||||||||||||||||||||||
"""This base class is a pattern for dvcs loaders (e.g. git, mercurial). | """This base class is a pattern for dvcs loaders (e.g. git, mercurial). | |||||||||||||||||||||||||||
Those loaders are able to load all the data in one go. For example, the | Those loaders are able to load all the data in one go. For example, the | |||||||||||||||||||||||||||
loader defined in swh-loader-git :class:`BulkUpdater`. | loader defined in swh-loader-git :class:`BulkUpdater`. | |||||||||||||||||||||||||||
For other loaders (stateful one, (e.g :class:`SWHSvnLoader`), | For other loaders (stateful one, (e.g :class:`SWHSvnLoader`), | |||||||||||||||||||||||||||
▲ Show 20 Lines • Show All 74 Lines • Show Last 20 Lines |
Could probably just be swh_loader as it's fully generic.