diff --git a/swh/search/api/server.py b/swh/search/api/server.py
--- a/swh/search/api/server.py
+++ b/swh/search/api/server.py
@@ -10,6 +10,7 @@
 from swh.core.api import RPCServerApp
 from swh.core.api import encode_data_server as encode_data
 from swh.core.api import error_handler
+from swh.search.metrics import timed
 
 from .. import get_search
 from ..interface import SearchInterface
@@ -34,6 +35,7 @@
 
 
 @app.route("/")
+@timed
 def index():
     return "SWH Search API server"
 
diff --git a/swh/search/elasticsearch.py b/swh/search/elasticsearch.py
--- a/swh/search/elasticsearch.py
+++ b/swh/search/elasticsearch.py
@@ -6,14 +6,14 @@
 import base64
 from typing import Any, Dict, Iterable, Iterator, List, Optional
 
-from elasticsearch import Elasticsearch
-from elasticsearch.helpers import bulk, scan
+from elasticsearch import Elasticsearch, helpers
 import msgpack
 
 from swh.indexer import codemeta
 from swh.model import model
 from swh.model.identifiers import origin_identifier
 from swh.search.interface import PagedResult
+from swh.search.metrics import send_metric, timed
 
 
 def _sanitize_origin(origin):
@@ -68,6 +68,7 @@
         if index_prefix:
             self.origin_index = index_prefix + "_" + self.origin_index
 
+    @timed
     def check(self):
         return self._backend.ping()
 
@@ -117,9 +118,11 @@
             },
         )
 
+    @timed
     def flush(self) -> None:
         self._backend.indices.refresh(index=self.origin_index)
 
+    @timed
     def origin_update(self, documents: Iterable[Dict]) -> None:
         documents = map(_sanitize_origin, documents)
         documents_with_sha1 = (
@@ -135,15 +138,24 @@
             }
             for (sha1, document) in documents_with_sha1
         ]
-        bulk(self._backend, actions, index=self.origin_index)
+
+        indexed_count, errors = helpers.bulk(
+            self._backend, actions, index=self.origin_index
+        )
+
+        send_metric("document:index", count=indexed_count, method_name="origin_update")
+        send_metric(
+            "document:index_error", count=len(errors), method_name="origin_update"
+        )
 
     def origin_dump(self) -> Iterator[model.Origin]:
-        results = scan(self._backend, index=self.origin_index)
+        results = helpers.scan(self._backend, index=self.origin_index)
         for hit in results:
             yield self._backend.termvectors(
                 index=self.origin_index, id=hit["_id"], fields=["*"]
             )
 
+    @timed
     def origin_search(
         self,
         *,
diff --git a/swh/search/metrics.py b/swh/search/metrics.py
new file mode 100644
--- /dev/null
+++ b/swh/search/metrics.py
@@ -0,0 +1,63 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from functools import wraps
+import logging
+
+from swh.core.statsd import statsd
+
+OPERATIONS_METRIC = "swh_search_operations_total"
+DURATION_METRIC = "swh_search_request_duration_seconds"
+
+
+def timed(f):
+    """Decorator timing each call to ``f`` through statsd under
+    DURATION_METRIC, tagged with ``endpoint`` set to the function's name.
+    """
+
+    @wraps(f)
+    def d(*a, **kw):
+        with statsd.timed(DURATION_METRIC, tags={"endpoint": f.__name__}):
+            return f(*a, **kw)
+
+    return d
+
+
+def send_metric(metric: str, count: int, method_name: str) -> bool:
+    """Send statsd metric with count for method `method_name`
+
+    If count is 0, the metric is discarded. If the metric is not
+    parseable, the metric is discarded with a log message.
+
+    Args:
+        metric: Metric's name as ``object_type:operation`` (e.g. document:index)
+        count: Associated value for the metric
+        method_name: Method's name
+
+    Returns:
+        True if the metric was sent, False if it was discarded
+    """
+    if count == 0:
+        return False
+
+    metric_type = metric.split(":")
+    _length = len(metric_type)
+    if _length == 2:
+        object_type, operation = metric_type
+        metric_name = OPERATIONS_METRIC
+    else:
+        logging.warning("Skipping unknown metric {%s: %s}", metric, count)
+        return False
+
+    statsd.increment(
+        metric_name,
+        count,
+        tags={
+            "endpoint": method_name,
+            "object_type": object_type,
+            "operation": operation,
+        },
+    )
+    return True
diff --git a/swh/search/tests/test_elasticsearch.py b/swh/search/tests/test_elasticsearch.py
--- a/swh/search/tests/test_elasticsearch.py
+++ b/swh/search/tests/test_elasticsearch.py
@@ -7,14 +7,17 @@
 
 import pytest
 
+from swh.search.metrics import OPERATIONS_METRIC
+
 from .test_search import CommonSearchTest
 
 
 class BaseElasticsearchTest(unittest.TestCase):
     @pytest.fixture(autouse=True)
-    def _instantiate_search(self, swh_search, elasticsearch_host):
+    def _instantiate_search(self, swh_search, elasticsearch_host, mocker):
         self._elasticsearch_host = elasticsearch_host
         self.search = swh_search
+        self.mocker = mocker
 
     def reset(self):
         self.search.deinitialize()
@@ -22,4 +25,47 @@
 
 
 class TestElasticsearchSearch(CommonSearchTest, BaseElasticsearchTest):
-    pass
+    def test_metrics_update_duration(self):
+        mock = self.mocker.patch("swh.search.metrics.statsd.timing")
+
+        for url in ["http://foobar.bar", "http://foobar.baz"]:
+            self.search.origin_update([{"url": url}])
+
+        assert mock.call_count == 2
+
+    def test_metrics_search_duration(self):
+        mock = self.mocker.patch("swh.search.metrics.statsd.timing")
+
+        for url_pattern in ["foobar", "foobaz"]:
+            self.search.origin_search(url_pattern=url_pattern, with_visit=True)
+
+        assert mock.call_count == 2
+
+    def test_metrics_indexation_counters(self):
+        mock_es = self.mocker.patch("elasticsearch.helpers.bulk")
+        mock_es.return_value = 2, ["error"]
+
+        mock_metrics = self.mocker.patch("swh.search.metrics.statsd.increment")
+
+        self.search.origin_update([{"url": "http://foobar.baz"}])
+
+        assert mock_metrics.call_count == 2
+
+        mock_metrics.assert_any_call(
+            OPERATIONS_METRIC,
+            2,
+            tags={
+                "endpoint": "origin_update",
+                "object_type": "document",
+                "operation": "index",
+            },
+        )
+        mock_metrics.assert_any_call(
+            OPERATIONS_METRIC,
+            1,
+            tags={
+                "endpoint": "origin_update",
+                "object_type": "document",
+                "operation": "index_error",
+            },
+        )