diff --git a/swh/core/statsd.py b/swh/core/statsd.py --- a/swh/core/statsd.py +++ b/swh/core/statsd.py @@ -52,6 +52,7 @@ from asyncio import iscoroutinefunction +from contextlib import contextmanager from functools import wraps import itertools import logging @@ -61,6 +62,7 @@ import socket import threading from time import monotonic +from typing import Collection, Dict, Optional import warnings log = logging.getLogger("swh.core.statsd") @@ -445,5 +447,49 @@ ) } + @contextmanager + def status_gauge( + self, + metric_name: str, + statuses: Collection[str], + tags: Optional[Dict[str, str]] = None, + ): + """Context manager to keep track of status changes as a gauge + + In addition to the `metric_name` and `tags` arguments, it expects a + list of `statuses` to declare which statuses are possible, and returns + a callable as context manager. This callable takes ones of the possible + statuses as argument. Typical usage would be: + + >>> with statsd.status_gauge( + "metric_name", ["starting", "processing", "waiting"]) as set_status: + set_status("starting") + # ... + set_status("waiting") + # ... + """ + if tags is None: + tags = {} + current_status: Optional[str] = None + # reset status gauges to make sure they do not "leak" + for status in statuses: + self.gauge(metric_name, 0, {**tags, "status": status}) + + def set_status(new_status: str): + nonlocal current_status + assert isinstance(tags, dict) + if new_status not in statuses: + raise ValueError(f"{new_status} not in {statuses}") + if current_status and new_status != current_status: + self.gauge(metric_name, 0, {**tags, "status": current_status}) + current_status = new_status + self.gauge(metric_name, 1, {**tags, "status": current_status}) + + yield set_status + + # reset gauges on exit + for status in statuses: + self.gauge(metric_name, 0, {**tags, "status": status}) + statsd = Statsd() diff --git a/swh/core/tests/test_statsd.py b/swh/core/tests/test_statsd.py --- a/swh/core/tests/test_statsd.py +++ b/swh/core/tests/test_statsd.py @@ -549,3 +549,55 @@ assert t.elapsed >= 0 assert statsd.socket.recv() == "test_timer:%s|ms" % t.elapsed + + +def test_status_gauge(statsd): + with statsd.status_gauge("test_status_gauge", ["s1", "s2", "s3"]) as set_status: + set_status("s1") + set_status("s2") + set_status("s3") + + # enter the context manager: initialisation of gauges for listed statuses + assert statsd.socket.recv() == "test_status_gauge:0|g|#status:s1" + assert statsd.socket.recv() == "test_status_gauge:0|g|#status:s2" + assert statsd.socket.recv() == "test_status_gauge:0|g|#status:s3" + # set_status("s1") + assert statsd.socket.recv() == "test_status_gauge:1|g|#status:s1" + # set_status("s2") + assert statsd.socket.recv() == "test_status_gauge:0|g|#status:s1" + assert statsd.socket.recv() == "test_status_gauge:1|g|#status:s2" + # set_status("s3") + assert statsd.socket.recv() == "test_status_gauge:0|g|#status:s2" + assert statsd.socket.recv() == "test_status_gauge:1|g|#status:s3" + # exit the context manager: cleanup gauges + assert statsd.socket.recv() == "test_status_gauge:0|g|#status:s1" + assert statsd.socket.recv() == "test_status_gauge:0|g|#status:s2" + assert statsd.socket.recv() == "test_status_gauge:0|g|#status:s3" + + +def test_status_gauge_error(statsd): + with statsd.status_gauge("test_status_gauge", ["s1", "s2", "s3"]) as set_status: + with pytest.raises(ValueError): + set_status("s4") + + +def test_status_gauge_repeated(statsd): + with statsd.status_gauge("test_status_gauge", ["s1", "s2", "s3"]) as set_status: + set_status("s1") + set_status("s1") + set_status("s1") + + # enter the context manager: initialisation of gauges for listed statuses + assert statsd.socket.recv() == "test_status_gauge:0|g|#status:s1" + assert statsd.socket.recv() == "test_status_gauge:0|g|#status:s2" + assert statsd.socket.recv() == "test_status_gauge:0|g|#status:s3" + # set_status("s1") + assert statsd.socket.recv() == "test_status_gauge:1|g|#status:s1" + # set_status("s1") + assert statsd.socket.recv() == "test_status_gauge:1|g|#status:s1" + # set_status("s1") + assert statsd.socket.recv() == "test_status_gauge:1|g|#status:s1" + # exit the context manager: cleanup gauges + assert statsd.socket.recv() == "test_status_gauge:0|g|#status:s1" + assert statsd.socket.recv() == "test_status_gauge:0|g|#status:s2" + assert statsd.socket.recv() == "test_status_gauge:0|g|#status:s3"