diff --git a/swh/indexer/storage/__init__.py b/swh/indexer/storage/__init__.py
--- a/swh/indexer/storage/__init__.py
+++ b/swh/indexer/storage/__init__.py
@@ -13,6 +13,7 @@
 
 from swh.storage.common import db_transaction_generator, db_transaction
 from swh.storage.exc import StorageDBError
+from swh.indexer.storage.metrics import timed, process_metrics
 
 from . import converters
 from .db import Db
@@ -166,6 +167,8 @@
                                        indexer_configuration_id, limit=limit,
                                        db=db, cur=cur)
 
+    @timed
+    @process_metrics
     @db_transaction()
     def content_mimetype_add(
             self, mimetypes: List[Dict], conflict_update: bool = False,
@@ -205,6 +208,8 @@
             yield converters.db_to_language(
                 dict(zip(db.content_language_cols, c)))
 
+    @timed
+    @process_metrics
     @db_transaction()
     def content_language_add(
             self, languages: List[Dict],
@@ -237,6 +242,8 @@
         for c in db.content_ctags_get_from_list(ids, cur):
             yield converters.db_to_ctags(dict(zip(db.content_ctags_cols, c)))
 
+    @timed
+    @process_metrics
     @db_transaction()
     def content_ctags_add(
             self, ctags: List[Dict], conflict_update: bool = False,
@@ -282,6 +289,8 @@
         for id_, facts in d.items():
             yield {id_: facts}
 
+    @timed
+    @process_metrics
     @db_transaction()
     def content_fossology_license_add(
             self, licenses: List[Dict], conflict_update: bool = False,
@@ -324,6 +333,8 @@
             yield converters.db_to_metadata(
                 dict(zip(db.content_metadata_cols, c)))
 
+    @timed
+    @process_metrics
     @db_transaction()
     def content_metadata_add(
             self, metadata: List[Dict], conflict_update: bool = False,
@@ -353,6 +364,8 @@
             yield converters.db_to_metadata(
                 dict(zip(db.revision_intrinsic_metadata_cols, c)))
 
+    @timed
+    @process_metrics
     @db_transaction()
     def revision_intrinsic_metadata_add(
             self, metadata: List[Dict], conflict_update: bool = False,
@@ -372,6 +385,8 @@
             'revision_intrinsic_metadata:add': count,
         }
 
+    @timed
+    @process_metrics
     @db_transaction()
     def revision_intrinsic_metadata_delete(
             self, entries: List[Dict], db=None, cur=None) -> Dict:
@@ -386,6 +401,8 @@
             yield converters.db_to_metadata(
                 dict(zip(db.origin_intrinsic_metadata_cols, c)))
 
+    @timed
+    @process_metrics
     @db_transaction()
     def origin_intrinsic_metadata_add(
             self, metadata: List[Dict], conflict_update: bool = False,
@@ -406,6 +423,8 @@
             'origin_intrinsic_metadata:add': count,
         }
 
+    @timed
+    @process_metrics
     @db_transaction()
     def origin_intrinsic_metadata_delete(
             self, entries: List[Dict], db=None, cur=None) -> Dict:
diff --git a/swh/indexer/storage/metrics.py b/swh/indexer/storage/metrics.py
new file mode 100644
--- /dev/null
+++ b/swh/indexer/storage/metrics.py
@@ -0,0 +1,88 @@
+# Copyright (C) 2019-2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from functools import wraps
+import logging
+
+from swh.core.statsd import statsd
+
+OPERATIONS_METRIC = 'swh_indexer_storage_operations_total'
+OPERATIONS_UNIT_METRIC = 'swh_indexer_storage_operations_{unit}_total'
+DURATION_METRIC = 'swh_indexer_storage_request_duration_seconds'
+
+logger = logging.getLogger(__name__)
+
+
+def timed(f):
+    """Decorator timing every call to `f` and reporting it to statsd.
+
+    The timing is sent as DURATION_METRIC, tagged with the decorated
+    function's name as `endpoint`.
+
+    """
+    @wraps(f)
+    def d(*a, **kw):
+        with statsd.timed(DURATION_METRIC, tags={'endpoint': f.__name__}):
+            return f(*a, **kw)
+
+    return d
+
+
+def send_metric(metric, count, method_name):
+    """Send statsd metric with count for method `method_name`
+
+    If count is 0, the metric is discarded. If the metric is not
+    parseable, the metric is discarded with a log message.
+
+    Args:
+        metric (str): Metric's name (e.g content:add, content:add:bytes)
+        count (int): Associated value for the metric
+        method_name (str): Method's name
+
+    Returns:
+        bool: True if the metric was sent, False if it was discarded
+
+    """
+    if count == 0:
+        return False
+
+    metric_type = metric.split(':')
+    _length = len(metric_type)
+    if _length == 2:
+        object_type, operation = metric_type
+        metric_name = OPERATIONS_METRIC
+    elif _length == 3:
+        object_type, operation, unit = metric_type
+        metric_name = OPERATIONS_UNIT_METRIC.format(unit=unit)
+    else:
+        logger.warning(
+            'Skipping unknown metric {%s: %s}', metric, count)
+        return False
+
+    statsd.increment(
+        metric_name, count, tags={
+            'endpoint': method_name,
+            'object_type': object_type,
+            'operation': operation,
+        })
+    return True
+
+
+def process_metrics(f):
+    """Decorator sending one counter per entry of `f`'s returned summary.
+
+    `f` must return a dict mapping metric names (as understood by
+    `send_metric`) to counts; the dict is returned to the caller unchanged.
+
+    """
+    @wraps(f)
+    def d(*a, **kw):
+        r = f(*a, **kw)
+        for metric, count in r.items():
+            send_metric(metric=metric, count=count, method_name=f.__name__)
+
+        return r
+
+    return d
diff --git a/swh/indexer/tests/storage/test_metrics.py b/swh/indexer/tests/storage/test_metrics.py
new file mode 100644
--- /dev/null
+++ b/swh/indexer/tests/storage/test_metrics.py
@@ -0,0 +1,53 @@
+# Copyright (C) 2019-2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from unittest.mock import patch
+
+from swh.indexer.storage.metrics import (
+    send_metric, OPERATIONS_METRIC, OPERATIONS_UNIT_METRIC
+)
+
+
+def test_send_metric_unknown_unit():
+    r = send_metric('content', count=10, method_name='content_add')
+    assert r is False
+    r = send_metric('sthg:add:bytes:extra', count=10, method_name='sthg_add')
+    assert r is False
+
+
+def test_send_metric_no_value():
+    r = send_metric('content_mimetype:add', count=0,
+                    method_name='content_mimetype_add')
+    assert r is False
+
+
+@patch('swh.indexer.storage.metrics.statsd.increment')
+def test_send_metric_no_unit(mock_statsd):
+    r = send_metric('content_mimetype:add', count=10,
+                    method_name='content_mimetype_add')
+
+    mock_statsd.assert_called_with(OPERATIONS_METRIC, 10, tags={
+        'endpoint': 'content_mimetype_add',
+        'object_type': 'content_mimetype',
+        'operation': 'add',
+    })
+
+    assert r
+
+
+@patch('swh.indexer.storage.metrics.statsd.increment')
+def test_send_metric_unit(mock_statsd):
+    unit_ = 'bytes'
+    r = send_metric('c:add:%s' % unit_, count=100, method_name='c_add')
+
+    expected_metric = OPERATIONS_UNIT_METRIC.format(unit=unit_)
+    mock_statsd.assert_called_with(
+        expected_metric, 100, tags={
+            'endpoint': 'c_add',
+            'object_type': 'c',
+            'operation': 'add',
+        })
+
+    assert r