diff --git a/swh/loader/core/loader.py b/swh/loader/core/loader.py
--- a/swh/loader/core/loader.py
+++ b/swh/loader/core/loader.py
@@ -3,15 +3,18 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import contextlib
 import datetime
 import hashlib
 import logging
 import os
+import time
 from typing import Any, Dict, Iterable, List, Optional
 
 import sentry_sdk
 
 from swh.core.config import load_from_envvar
+from swh.core.statsd import statsd
 from swh.loader.core.metadata_fetchers import CredentialsType, get_fetchers_for_lister
 from swh.loader.exception import NotFound
 from swh.model.model import (
@@ -36,6 +39,8 @@
     "max_content_size": 100 * 1024 * 1024,
 }
 
+STATSD_PREFIX = "swh.loader.core"
+
 
 class BaseLoader:
     """Base class for (D)VCS loaders (e.g Svn, Git, Mercurial, ...) or PackageLoader (e.g
@@ -317,7 +322,8 @@
         """
 
         try:
-            self.pre_cleanup()
+            with self.statsd_timed("pre_cleanup"):
+                self.pre_cleanup()
         except Exception:
             msg = "Cleaning up dangling data failed! Continue loading."
             self.log.warning(msg)
@@ -333,7 +339,8 @@
         )
 
         try:
-            metadata = self.build_extrinsic_origin_metadata()
+            with self.statsd_timed("build_extrinsic_origin_metadata"):
+                metadata = self.build_extrinsic_origin_metadata()
             self.load_metadata_objects(metadata)
         except Exception as e:
             sentry_sdk.capture_exception(e)
@@ -350,25 +357,47 @@
                 },
             )
 
+        total_time_fetch_data = 0.0
+        total_time_store_data = 0.0
+        # Bound up front so the ``finally`` clause can always build its metric
+        # tags, even if a BaseException escapes before the outcome is known.
+        success = False
+        status = "failed"
+
         try:
-            self.prepare()
+            with self.statsd_timed("prepare"):
+                self.prepare()
 
             while True:
+                t1 = time.monotonic()
                 more_data_to_fetch = self.fetch_data()
+                t2 = time.monotonic()
+                total_time_fetch_data += t2 - t1
                 self.store_data()
+                t3 = time.monotonic()
+                total_time_store_data += t3 - t2
                 if not more_data_to_fetch:
                     break
 
+            # Report the time spent in each phase, summed over all iterations.
+            self.statsd_timing("fetch_data", total_time_fetch_data)
+            self.statsd_timing("store_data", total_time_store_data)
+
+            status = self.visit_status()
             visit_status = OriginVisitStatus(
                 origin=self.origin.url,
                 visit=self.visit.visit,
                 type=self.visit_type,
                 date=now(),
-                status=self.visit_status(),
+                status=status,
                 snapshot=self.loaded_snapshot_id,
             )
             self.storage.origin_visit_status_add([visit_status])
-            self.post_load()
+            with self.statsd_timed(
+                "post_load", tags={"success": True, "status": status}
+            ):
+                self.post_load()
+            success = True
         except Exception as e:
             if isinstance(e, NotFound):
                 status = "not_found"
@@ -399,11 +428,17 @@
                 snapshot=self.loaded_snapshot_id,
             )
             self.storage.origin_visit_status_add([visit_status])
-            self.post_load(success=False)
+            with self.statsd_timed(
+                "post_load", tags={"success": False, "status": status}
+            ):
+                self.post_load(success=False)
             return {"status": task_status}
         finally:
-            self.flush()
-            self.cleanup()
+            outcome_tags = {"success": success, "status": status}
+            with self.statsd_timed("flush", tags=outcome_tags):
+                self.flush()
+            with self.statsd_timed("cleanup", tags=outcome_tags):
+                self.cleanup()
 
         return self.load_status()
 
@@ -440,12 +475,42 @@
                 lister_instance_name=self.lister_instance_name,
                 credentials=self.metadata_fetcher_credentials,
             )
-            metadata.extend(metadata_fetcher.get_origin_metadata())
+            with self.statsd_timed("fetch_one_metadata"):
+                metadata.extend(metadata_fetcher.get_origin_metadata())
             if self.parent_origins is None:
                 self.parent_origins = metadata_fetcher.get_parent_origins()
 
         return metadata
 
+    @contextlib.contextmanager
+    def statsd_timed(self, name: str, tags: Optional[Dict[str, Any]] = None):
+        """Time the enclosed block and report the duration to statsd.
+
+        The metric is ``{STATSD_PREFIX}.duration_seconds.<name>``, tagged with
+        this loader's ``visit_type`` plus any extra ``tags``.
+        """
+        with statsd.timed(
+            f"{STATSD_PREFIX}.duration_seconds.{name}",
+            tags={"visit_type": self.visit_type, **(tags or {})},
+        ):
+            yield
+
+    def statsd_timing(
+        self, name: str, seconds: float, tags: Optional[Dict[str, Any]] = None
+    ) -> None:
+        """Report an already-measured duration, with the same metric name and
+        tags as :meth:`statsd_timed`.
+
+        ``seconds`` is sent multiplied by 1000, i.e. in milliseconds.
+        NOTE(review): this assumes ``statsd.timed`` also reports milliseconds;
+        confirm against ``swh.core.statsd`` so both helpers agree on units.
+        """
+        statsd.timing(
+            f"{STATSD_PREFIX}.duration_seconds.{name}",
+            seconds * 1000,
+            tags={"visit_type": self.visit_type, **(tags or {})},
+        )
+
 
 class DVCSLoader(BaseLoader):
     """This base class is a pattern for dvcs loaders (e.g. git, mercurial).