Changeset View
Changeset View
Standalone View
Standalone View
swh/core/statsd.py
Show First 20 Lines • Show All 46 Lines • ▼ Show 20 Lines | |||||
# - made the tags a dict instead of a list (prometheus-statsd-exporter only | # - made the tags a dict instead of a list (prometheus-statsd-exporter only | ||||
# supports tags with a value, mirroring prometheus) | # supports tags with a value, mirroring prometheus) | ||||
# - switch from time.time to time.monotonic | # - switch from time.time to time.monotonic | ||||
# - improve unit test coverage | # - improve unit test coverage | ||||
# - documentation cleanup | # - documentation cleanup | ||||
from asyncio import iscoroutinefunction | from asyncio import iscoroutinefunction | ||||
from contextlib import contextmanager | |||||
from functools import wraps | from functools import wraps | ||||
import itertools | import itertools | ||||
import logging | import logging | ||||
import os | import os | ||||
from random import random | from random import random | ||||
import re | import re | ||||
import socket | import socket | ||||
import threading | import threading | ||||
from time import monotonic | from time import monotonic | ||||
from typing import Collection, Dict, Optional | |||||
import warnings | import warnings | ||||
log = logging.getLogger("swh.core.statsd") | log = logging.getLogger("swh.core.statsd") | ||||
class TimedContextManagerDecorator(object): | class TimedContextManagerDecorator(object): | ||||
""" | """ | ||||
A context manager and a decorator which will report the elapsed time in | A context manager and a decorator which will report the elapsed time in | ||||
▲ Show 20 Lines • Show All 368 Lines • ▼ Show 20 Lines | class Statsd(object): | ||||
def _add_constant_tags(self, tags): | def _add_constant_tags(self, tags): | ||||
return { | return { | ||||
str(k): str(v) | str(k): str(v) | ||||
for k, v in itertools.chain( | for k, v in itertools.chain( | ||||
self.constant_tags.items(), (tags if tags else {}).items(), | self.constant_tags.items(), (tags if tags else {}).items(), | ||||
) | ) | ||||
} | } | ||||
@contextmanager | |||||
def status_gauge( | |||||
self, | |||||
metric_name: str, | |||||
statuses: Collection[str], | |||||
tags: Optional[Dict[str, str]] = None, | |||||
): | |||||
"""Context manager to keep track of status changes as a gauge | |||||
In addition to the `metric_name` and `tags` arguments, it expects a | |||||
list of `statuses` to declare which statuses are possible, and returns | |||||
a callable as context manager. This callable takes ones of the possible | |||||
statuses as argument. Typical usage would be: | |||||
>>> with statsd.status_gauge( | |||||
"metric_name", ["starting", "processing", "waiting"]) as set_status: | |||||
set_status("starting") | |||||
# ... | |||||
set_status("waiting") | |||||
# ... | |||||
""" | |||||
if tags is None: | |||||
tags = {} | |||||
current_status: Optional[str] = None | |||||
# reset status gauges to make sure they do not "leak" | |||||
for status in statuses: | |||||
self.gauge(metric_name, 0, {**tags, "status": status}) | |||||
def set_status(new_status: str): | |||||
nonlocal current_status | |||||
assert isinstance(tags, dict) | |||||
if new_status not in statuses: | |||||
raise ValueError(f"{new_status} not in {statuses}") | |||||
if current_status and new_status != current_status: | |||||
self.gauge(metric_name, 0, {**tags, "status": current_status}) | |||||
current_status = new_status | |||||
self.gauge(metric_name, 1, {**tags, "status": current_status}) | |||||
yield set_status | |||||
# reset gauges on exit | |||||
for status in statuses: | |||||
self.gauge(metric_name, 0, {**tags, "status": status}) | |||||
statsd = Statsd() | statsd = Statsd() |