diff --git a/swh/loader/core/loader.py b/swh/loader/core/loader.py --- a/swh/loader/core/loader.py +++ b/swh/loader/core/loader.py @@ -3,15 +3,18 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import contextlib import datetime import hashlib import logging import os +import time from typing import Any, Dict, Iterable, List, Optional import sentry_sdk from swh.core.config import load_from_envvar +from swh.core.statsd import statsd from swh.loader.core.metadata_fetchers import CredentialsType, get_fetchers_for_lister from swh.loader.exception import NotFound from swh.model.model import ( @@ -36,6 +39,8 @@ "max_content_size": 100 * 1024 * 1024, } +STATSD_PREFIX = "swh_loader" + class BaseLoader: """Base class for (D)VCS loaders (e.g Svn, Git, Mercurial, ...) or PackageLoader (e.g @@ -317,7 +322,8 @@ """ try: - self.pre_cleanup() + with self.statsd_timed("pre_cleanup"): + self.pre_cleanup() except Exception: msg = "Cleaning up dangling data failed! Continue loading." self.log.warning(msg) @@ -333,7 +339,8 @@ ) try: - metadata = self.build_extrinsic_origin_metadata() + with self.statsd_timed("build_extrinsic_origin_metadata"): + metadata = self.build_extrinsic_origin_metadata() self.load_metadata_objects(metadata) except Exception as e: sentry_sdk.capture_exception(e) @@ -350,26 +357,44 @@ }, ) + total_time_fetch_data = 0.0 + total_time_store_data = 0.0 + try: - self.prepare() + with self.statsd_timed("prepare"): + self.prepare() while True: + t1 = time.monotonic() more_data_to_fetch = self.fetch_data() + t2 = time.monotonic() + total_time_fetch_data += t2 - t1 self.store_data() + t3 = time.monotonic() + total_time_store_data += t3 - t2 if not more_data_to_fetch: break + self.statsd_timing("fetch_data", total_time_fetch_data * 1000.0) + self.statsd_timing("store_data", total_time_store_data * 1000.0) + + status = self.visit_status() visit_status = OriginVisitStatus( origin=self.origin.url, visit=self.visit.visit, type=self.visit_type, date=now(), - status=self.visit_status(), + status=status, snapshot=self.loaded_snapshot_id, ) self.storage.origin_visit_status_add([visit_status]) - self.post_load() + success = True + with self.statsd_timed( + "post_load", tags={"success": success, "status": status} + ): + self.post_load() except Exception as e: + success = False if isinstance(e, NotFound): status = "not_found" task_status = "uneventful" @@ -399,11 +424,20 @@ snapshot=self.loaded_snapshot_id, ) self.storage.origin_visit_status_add([visit_status]) - self.post_load(success=False) + with self.statsd_timed( + "post_load", tags={"success": success, "status": status} + ): + self.post_load(success=success) return {"status": task_status} finally: - self.flush() - self.cleanup() + with self.statsd_timed( + "flush", tags={"success": success, "status": status} + ): + self.flush() + with self.statsd_timed( + "cleanup", tags={"success": success, "status": status} + ): + self.cleanup() return self.load_status() @@ -440,12 +474,28 @@ lister_instance_name=self.lister_instance_name, credentials=self.metadata_fetcher_credentials, ) - metadata.extend(metadata_fetcher.get_origin_metadata()) + with self.statsd_timed("fetch_one_metadata"): + metadata.extend(metadata_fetcher.get_origin_metadata()) if self.parent_origins is None: self.parent_origins = metadata_fetcher.get_parent_origins() return metadata + @contextlib.contextmanager + def statsd_timed(self, name, tags={}): + with statsd.timed( + f"{STATSD_PREFIX}_operation_duration_seconds", + tags={"visit_type": self.visit_type, "operation": name, **tags}, + ): + yield + + def statsd_timing(self, name, value, tags={}): + statsd.timing( + f"{STATSD_PREFIX}_operation_duration_seconds", + value, + tags={"visit_type": self.visit_type, "operation": name, **tags}, + ) + class DVCSLoader(BaseLoader): """This base class is a pattern for dvcs loaders (e.g. git, mercurial). diff --git a/swh/loader/core/tests/test_loader.py b/swh/loader/core/tests/test_loader.py --- a/swh/loader/core/tests/test_loader.py +++ b/swh/loader/core/tests/test_loader.py @@ -6,7 +6,8 @@ import datetime import hashlib import logging -from unittest.mock import MagicMock +import time +from unittest.mock import MagicMock, call import pytest @@ -277,6 +278,80 @@ assert loader.loaded_snapshot_id is None +@pytest.mark.parametrize("success", [True, False]) +def test_loader_timings(swh_storage, mocker, success): + current_time = time.time() + mocker.patch("time.monotonic", side_effect=lambda: current_time) + mocker.patch("swh.core.statsd.monotonic", side_effect=lambda: current_time) + + runtimes = { + "pre_cleanup": 2.0, + "build_extrinsic_origin_metadata": 3.0, + "prepare": 5.0, + "fetch_data": 7.0, + "store_data": 11.0, + "post_load": 13.0, + "flush": 17.0, + "cleanup": 23.0, + } + + class TimedLoader(BaseLoader): + visit_type = "my-visit-type" + + def __getattribute__(self, method_name): + if method_name == "visit_status" and not success: + + def crashy(): + raise Exception("oh no") + + return crashy + + if method_name not in runtimes: + return super().__getattribute__(method_name) + + def meth(*args, **kwargs): + nonlocal current_time + current_time += runtimes[method_name] + + return meth + + statsd_report = mocker.patch("swh.core.statsd.statsd._report") + + loader = TimedLoader(swh_storage, origin_url="http://example.org/hello.git") + loader.load() + + if success: + expected_tags = { + "post_load": {"success": True, "status": "full"}, + "flush": {"success": True, "status": "full"}, + "cleanup": {"success": True, "status": "full"}, + } + else: + expected_tags = { + "post_load": {"success": False, "status": "failed"}, + "flush": {"success": False, "status": "failed"}, + "cleanup": {"success": False, "status": "failed"}, + } + + # note that this is a list equality, so order of entries in 'runtimes' matters. + # This is not perfect, but call() objects are not hashable so it's simpler this way, + # even if not perfect. + assert statsd_report.mock_calls == [ + call( + "swh_loader_operation_duration_seconds", + "ms", + value * 1000, + { + "visit_type": "my-visit-type", + "operation": key, + **expected_tags.get(key, {}), + }, + 1, + ) + for (key, value) in runtimes.items() + ] + + class DummyDVCSLoaderExc(DummyDVCSLoader): """A loader which raises an exception when loading some contents"""