Page Menu · Home · Software Heritage

D5103.id.diff
No One · Temporary

D5103.id.diff

diff --git a/swh/search/api/server.py b/swh/search/api/server.py
--- a/swh/search/api/server.py
+++ b/swh/search/api/server.py
@@ -10,6 +10,7 @@
from swh.core.api import RPCServerApp
from swh.core.api import encode_data_server as encode_data
from swh.core.api import error_handler
+from swh.search.metrics import timed
from .. import get_search
from ..interface import SearchInterface
@@ -34,6 +35,7 @@
@app.route("/")
+@timed
def index():
return "SWH Search API server"
diff --git a/swh/search/elasticsearch.py b/swh/search/elasticsearch.py
--- a/swh/search/elasticsearch.py
+++ b/swh/search/elasticsearch.py
@@ -6,14 +6,14 @@
import base64
from typing import Any, Dict, Iterable, Iterator, List, Optional
-from elasticsearch import Elasticsearch
-from elasticsearch.helpers import bulk, scan
+from elasticsearch import Elasticsearch, helpers
import msgpack
from swh.indexer import codemeta
from swh.model import model
from swh.model.identifiers import origin_identifier
from swh.search.interface import PagedResult
+from swh.search.metrics import send_metric, timed
def _sanitize_origin(origin):
@@ -68,6 +68,7 @@
if index_prefix:
self.origin_index = index_prefix + "_" + self.origin_index
+ @timed
def check(self):
return self._backend.ping()
@@ -117,9 +118,11 @@
},
)
+ @timed
def flush(self) -> None:
self._backend.indices.refresh(index=self.origin_index)
+ @timed
def origin_update(self, documents: Iterable[Dict]) -> None:
documents = map(_sanitize_origin, documents)
documents_with_sha1 = (
@@ -135,15 +138,25 @@
}
for (sha1, document) in documents_with_sha1
]
- bulk(self._backend, actions, index=self.origin_index)
+
+ indexed_count, errors = helpers.bulk(
+ self._backend, actions, index=self.origin_index
+ )
+ assert isinstance(errors, List) # Make mypy happy
+
+ send_metric("document:index", count=indexed_count, method_name="origin_update")
+ send_metric(
+ "document:index_error", count=len(errors), method_name="origin_update"
+ )
def origin_dump(self) -> Iterator[model.Origin]:
- results = scan(self._backend, index=self.origin_index)
+ results = helpers.scan(self._backend, index=self.origin_index)
for hit in results:
yield self._backend.termvectors(
index=self.origin_index, id=hit["_id"], fields=["*"]
)
+ @timed
def origin_search(
self,
*,
diff --git a/swh/search/metrics.py b/swh/search/metrics.py
new file mode 100644
--- /dev/null
+++ b/swh/search/metrics.py
@@ -0,0 +1,63 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from functools import wraps
+import logging
+
+from swh.core.statsd import statsd
+
+OPERATIONS_METRIC = "swh_search_operations_total"
+DURATION_METRIC = "swh_search_request_duration_seconds"
+
+
+def timed(f):
+ """Time that function!
+
+ """
+
+ @wraps(f)
+ def d(*a, **kw):
+ with statsd.timed(DURATION_METRIC, tags={"endpoint": f.__name__}):
+ return f(*a, **kw)
+
+ return d
+
+
+def send_metric(metric: str, count: int, method_name: str) -> bool:
+ """Send statsd metric with count for method `method_name`
+
+ If count is 0, the metric is discarded. If the metric is not
+ parseable, the metric is discarded with a log message.
+
+ Args:
+ metric: Metric's name (e.g content:add, content:add:bytes)
+ count: Associated value for the metric
+ method_name: Method's name
+
+ Returns:
+ Bool to explicit if metric has been set or not
+ """
+ if count == 0:
+ return False
+
+ metric_type = metric.split(":")
+ _length = len(metric_type)
+ if _length == 2:
+ object_type, operation = metric_type
+ metric_name = OPERATIONS_METRIC
+ else:
+ logging.warning("Skipping unknown metric {%s: %s}" % (metric, count))
+ return False
+
+ statsd.increment(
+ metric_name,
+ count,
+ tags={
+ "endpoint": method_name,
+ "object_type": object_type,
+ "operation": operation,
+ },
+ )
+ return True
diff --git a/swh/search/tests/test_elasticsearch.py b/swh/search/tests/test_elasticsearch.py
--- a/swh/search/tests/test_elasticsearch.py
+++ b/swh/search/tests/test_elasticsearch.py
@@ -7,14 +7,17 @@
import pytest
+from swh.search.metrics import OPERATIONS_METRIC
+
from .test_search import CommonSearchTest
class BaseElasticsearchTest(unittest.TestCase):
@pytest.fixture(autouse=True)
- def _instantiate_search(self, swh_search, elasticsearch_host):
+ def _instantiate_search(self, swh_search, elasticsearch_host, mocker):
self._elasticsearch_host = elasticsearch_host
self.search = swh_search
+ self.mocker = mocker
def reset(self):
self.search.deinitialize()
@@ -22,4 +25,47 @@
class TestElasticsearchSearch(CommonSearchTest, BaseElasticsearchTest):
- pass
+ def test_metrics_update_duration(self):
+ mock = self.mocker.patch("swh.search.metrics.statsd.timing")
+
+ for url in ["http://foobar.bar", "http://foobar.baz"]:
+ self.search.origin_update([{"url": url}])
+
+ assert mock.call_count == 2
+
+ def test_metrics_search_duration(self):
+ mock = self.mocker.patch("swh.search.metrics.statsd.timing")
+
+ for url_pattern in ["foobar", "foobaz"]:
+ self.search.origin_search(url_pattern=url_pattern, with_visit=True)
+
+ assert mock.call_count == 2
+
+ def test_metrics_indexation_counters(self):
+ mock_es = self.mocker.patch("elasticsearch.helpers.bulk")
+ mock_es.return_value = 2, ["error"]
+
+ mock_metrics = self.mocker.patch("swh.search.metrics.statsd.increment")
+
+ self.search.origin_update([{"url": "http://foobar.baz"}])
+
+ assert mock_metrics.call_count == 2
+
+ mock_metrics.assert_any_call(
+ OPERATIONS_METRIC,
+ 2,
+ tags={
+ "endpoint": "origin_update",
+ "object_type": "document",
+ "operation": "index",
+ },
+ )
+ mock_metrics.assert_any_call(
+ OPERATIONS_METRIC,
+ 1,
+ tags={
+ "endpoint": "origin_update",
+ "object_type": "document",
+ "operation": "index_error",
+ },
+ )

File Metadata

Mime Type
text/plain
Expires
Mar 17 2025, 6:59 PM (7 w, 3 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3216318

Event Timeline