diff --git a/swh/core/statsd.py b/swh/core/statsd.py index 29b9240..b366d59 100644 --- a/swh/core/statsd.py +++ b/swh/core/statsd.py @@ -1,496 +1,500 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # Initially imported from https://github.com/DataDog/datadogpy/ # at revision 62b3a3e89988dc18d78c282fe3ff5d1813917436 # # Copyright (c) 2015, Datadog # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # * Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # * Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # * Neither the name of Datadog nor the names of its contributors may be # used to endorse or promote products derived from this software without # specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY # DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES # (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND # ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # # # Vastly adapted for integration in swh.core: # # - Removed python < 3.5 compat code # - trimmed the imports down to be a single module # - adjust some options: # - drop unix socket connection option # - add environment variable support for setting the statsd host and # port (pulled the idea from the main python statsd module) # - only send timer metrics in milliseconds (that's what # prometheus-statsd-exporter expects) # - drop DataDog-specific metric types (that are unsupported in # prometheus-statsd-exporter) # - made the tags a dict instead of a list (prometheus-statsd-exporter only # supports tags with a value, mirroring prometheus) # - switch from time.time to time.monotonic # - improve unit test coverage # - documentation cleanup from asyncio import iscoroutinefunction from contextlib import contextmanager from functools import wraps import itertools import logging import os from random import random import re import socket import threading from time import monotonic from typing import Collection, Dict, Optional import warnings log = logging.getLogger("swh.core.statsd") class TimedContextManagerDecorator(object): """ A context manager and a decorator which will report the elapsed time in the context OR in a function call. Attributes: elapsed (float): the elapsed time at the point of completion """ def __init__( self, statsd, metric=None, error_metric=None, tags=None, sample_rate=1 ): self.statsd = statsd self.metric = metric self.error_metric = error_metric - self.tags = tags + self.tags = tags or {} self.sample_rate = sample_rate self.elapsed = None # this is for testing purpose def __call__(self, func): """ Decorator which returns the elapsed time of the function call. Default to the function name if metric was not provided. """ if not self.metric: self.metric = "%s.%s" % (func.__module__, func.__name__) # Coroutines if iscoroutinefunction(func): @wraps(func) async def wrapped_co(*args, **kwargs): start = monotonic() try: result = await func(*args, **kwargs) - except: # noqa - self._send_error() + except BaseException as e: + self._send_error(error_type=type(e).__name__) raise self._send(start) return result return wrapped_co # Others @wraps(func) def wrapped(*args, **kwargs): start = monotonic() try: result = func(*args, **kwargs) - except: # noqa - self._send_error() + except BaseException as e: + self._send_error(error_type=type(e).__name__) raise self._send(start) return result return wrapped def __enter__(self): if not self.metric: raise TypeError("Cannot used timed without a metric!") self._start = monotonic() return self def __exit__(self, type, value, traceback): # Report the elapsed time of the context manager if no error. if type is None: self._send(self._start) else: - self._send_error() + self._send_error(error_type=type.__name__) def _send(self, start): elapsed = (monotonic() - start) * 1000 self.statsd.timing( self.metric, elapsed, tags=self.tags, sample_rate=self.sample_rate ) self.elapsed = elapsed - def _send_error(self): + def _send_error(self, error_type=None): if self.error_metric is None: self.error_metric = self.metric + "_error_count" - self.statsd.increment(self.error_metric, tags=self.tags) + if error_type is not None: + tags = {**self.tags, "error_type": error_type} + else: + tags = self.tags + self.statsd.increment(self.error_metric, tags=tags) def start(self): """Start the timer""" self.__enter__() def stop(self): """Stop the timer, send the metric value""" self.__exit__(None, None, None) class Statsd(object): """Initialize a client to send metrics to a StatsD server. Arguments: host (str): the host of the StatsD server. Defaults to localhost. port (int): the port of the StatsD server. Defaults to 8125. max_buffer_size (int): Maximum number of metrics to buffer before sending to the server if sending metrics in batch namespace (str): Namespace to prefix all metric names constant_tags (Dict[str, str]): Tags to attach to all metrics Note: This class also supports the following environment variables: STATSD_HOST Override the default host of the statsd server STATSD_PORT Override the default port of the statsd server STATSD_TAGS Tags to attach to every metric reported. Example value: "label:value,other_label:other_value" """ def __init__( self, host=None, port=None, max_buffer_size=50, namespace=None, constant_tags=None, ): # Connection if host is None: host = os.environ.get("STATSD_HOST") or "localhost" self.host = host if port is None: port = os.environ.get("STATSD_PORT") or 8125 self.port = int(port) # Socket self._socket = None self.lock = threading.Lock() self.max_buffer_size = max_buffer_size self._send = self._send_to_server self.encoding = "utf-8" # Tags self.constant_tags = {} tags_envvar = os.environ.get("STATSD_TAGS", "") for tag in tags_envvar.split(","): if not tag: continue if ":" not in tag: warnings.warn( "STATSD_TAGS needs to be in key:value format, " "%s invalid" % tag, UserWarning, ) continue k, v = tag.split(":", 1) # look for a possible env var substitution, using $NAME or ${NAME} format m = re.match(r"^[$]([{])?(?P\w+)(?(1)[}]|)$", v) if m: envvar = m.group("envvar") if envvar in os.environ: v = os.environ[envvar] self.constant_tags[k] = v if constant_tags: self.constant_tags.update( {str(k): str(v) for k, v in constant_tags.items()} ) # Namespace if namespace is not None: namespace = str(namespace) self.namespace = namespace def __enter__(self): self.open_buffer(self.max_buffer_size) return self def __exit__(self, type, value, traceback): self.close_buffer() def gauge(self, metric, value, tags=None, sample_rate=1): """ Record the value of a gauge, optionally setting a list of tags and a sample rate. >>> statsd.gauge('users.online', 123) >>> statsd.gauge('active.connections', 1001, tags={"protocol": "http"}) """ return self._report(metric, "g", value, tags, sample_rate) def increment(self, metric, value=1, tags=None, sample_rate=1): """ Increment a counter, optionally setting a value, tags and a sample rate. >>> statsd.increment('page.views') >>> statsd.increment('files.transferred', 124) """ self._report(metric, "c", value, tags, sample_rate) def decrement(self, metric, value=1, tags=None, sample_rate=1): """ Decrement a counter, optionally setting a value, tags and a sample rate. >>> statsd.decrement('files.remaining') >>> statsd.decrement('active.connections', 2) """ metric_value = -value if value else value self._report(metric, "c", metric_value, tags, sample_rate) def histogram(self, metric, value, tags=None, sample_rate=1): """ Sample a histogram value, optionally setting tags and a sample rate. >>> statsd.histogram('uploaded.file.size', 1445) >>> statsd.histogram('file.count', 26, tags={"filetype": "python"}) """ self._report(metric, "h", value, tags, sample_rate) def timing(self, metric, value, tags=None, sample_rate=1): """ Record a timing, optionally setting tags and a sample rate. >>> statsd.timing("query.response.time", 1234) """ self._report(metric, "ms", value, tags, sample_rate) def timed(self, metric=None, error_metric=None, tags=None, sample_rate=1): """ A decorator or context manager that will measure the distribution of a function's/context's run time. Optionally specify a list of tags or a sample rate. If the metric is not defined as a decorator, the module name and function name will be used. The metric is required as a context manager. :: @statsd.timed('user.query.time', sample_rate=0.5) def get_user(user_id): # Do what you need to ... pass # Is equivalent to ... with statsd.timed('user.query.time', sample_rate=0.5): # Do what you need to ... pass # Is equivalent to ... start = time.monotonic() try: get_user(user_id) finally: statsd.timing('user.query.time', time.monotonic() - start) """ return TimedContextManagerDecorator( statsd=self, metric=metric, error_metric=error_metric, tags=tags, sample_rate=sample_rate, ) def set(self, metric, value, tags=None, sample_rate=1): """ Sample a set value. >>> statsd.set('visitors.uniques', 999) """ self._report(metric, "s", value, tags, sample_rate) @property def socket(self): """ Return a connected socket. Note: connect the socket before assigning it to the class instance to avoid bad thread race conditions. """ with self.lock: if not self._socket: sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.connect((self.host, self.port)) self._socket = sock return self._socket def open_buffer(self, max_buffer_size=50): """ Open a buffer to send a batch of metrics in one packet. You can also use this as a context manager. >>> with Statsd() as batch: ... batch.gauge('users.online', 123) ... batch.gauge('active.connections', 1001) """ self.max_buffer_size = max_buffer_size self.buffer = [] self._send = self._send_to_buffer def close_buffer(self): """ Flush the buffer and switch back to single metric packets. """ self._send = self._send_to_server if self.buffer: # Only send packets if there are packets to send self._flush_buffer() def close_socket(self): """ Closes connected socket if connected. """ with self.lock: if self._socket: self._socket.close() self._socket = None def _report(self, metric, metric_type, value, tags, sample_rate): """ Create a metric packet and send it. """ if value is None: return if sample_rate != 1 and random() > sample_rate: return # Resolve the full tag list tags = self._add_constant_tags(tags) # Create/format the metric packet payload = "%s%s:%s|%s%s%s" % ( (self.namespace + ".") if self.namespace else "", metric, value, metric_type, ("|@" + str(sample_rate)) if sample_rate != 1 else "", ("|#" + ",".join("%s:%s" % (k, v) for (k, v) in sorted(tags.items()))) if tags else "", ) # Send it self._send(payload) def _send_to_server(self, packet): try: # If set, use socket directly self.socket.send(packet.encode("utf-8")) except socket.timeout: return except socket.error: log.debug( "Error submitting statsd packet." " Dropping the packet and closing the socket." ) self.close_socket() def _send_to_buffer(self, packet): self.buffer.append(packet) if len(self.buffer) >= self.max_buffer_size: self._flush_buffer() def _flush_buffer(self): self._send_to_server("\n".join(self.buffer)) self.buffer = [] def _add_constant_tags(self, tags): return { str(k): str(v) for k, v in itertools.chain( self.constant_tags.items(), (tags if tags else {}).items(), ) } @contextmanager def status_gauge( self, metric_name: str, statuses: Collection[str], tags: Optional[Dict[str, str]] = None, ): """Context manager to keep track of status changes as a gauge In addition to the `metric_name` and `tags` arguments, it expects a list of `statuses` to declare which statuses are possible, and returns a callable as context manager. This callable takes ones of the possible statuses as argument. Typical usage would be: >>> with statsd.status_gauge( "metric_name", ["starting", "processing", "waiting"]) as set_status: set_status("starting") # ... set_status("waiting") # ... """ if tags is None: tags = {} current_status: Optional[str] = None # reset status gauges to make sure they do not "leak" for status in statuses: self.gauge(metric_name, 0, {**tags, "status": status}) def set_status(new_status: str): nonlocal current_status assert isinstance(tags, dict) if new_status not in statuses: raise ValueError(f"{new_status} not in {statuses}") if current_status and new_status != current_status: self.gauge(metric_name, 0, {**tags, "status": current_status}) current_status = new_status self.gauge(metric_name, 1, {**tags, "status": current_status}) yield set_status # reset gauges on exit for status in statuses: self.gauge(metric_name, 0, {**tags, "status": status}) statsd = Statsd() diff --git a/swh/core/tests/test_statsd.py b/swh/core/tests/test_statsd.py index 760a618..13d1bc6 100644 --- a/swh/core/tests/test_statsd.py +++ b/swh/core/tests/test_statsd.py @@ -1,603 +1,605 @@ # Copyright (C) 2018-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # Initially imported from https://github.com/DataDog/datadogpy/ # at revision 62b3a3e89988dc18d78c282fe3ff5d1813917436 # # Copyright (c) 2015, Datadog # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # * Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # * Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # * Neither the name of Datadog nor the names of its contributors may be # used to endorse or promote products derived from this software without # specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY # DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES # (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND # ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # import socket import time import pytest from swh.core.pytest_plugin import FakeSocket from swh.core.statsd import Statsd, TimedContextManagerDecorator class BrokenSocket(FakeSocket): def send(self, payload): raise socket.error("Socket error") class SlowSocket(FakeSocket): def send(self, payload): raise socket.timeout("Socket timeout") def assert_almost_equal(a, b, delta): assert 0 <= abs(a - b) <= delta, f"|{a} - {b}| not within {delta}" def test_set(statsd): statsd.set("set", 123) assert statsd.socket.recv() == "set:123|s" def test_gauge(statsd): statsd.gauge("gauge", 123.4) assert statsd.socket.recv() == "gauge:123.4|g" def test_counter(statsd): statsd.increment("page.views") assert statsd.socket.recv() == "page.views:1|c" statsd.increment("page.views", 11) assert statsd.socket.recv() == "page.views:11|c" statsd.decrement("page.views") assert statsd.socket.recv() == "page.views:-1|c" statsd.decrement("page.views", 12) assert statsd.socket.recv() == "page.views:-12|c" def test_histogram(statsd): statsd.histogram("histo", 123.4) assert statsd.socket.recv() == "histo:123.4|h" def test_tagged_gauge(statsd): statsd.gauge("gt", 123.4, tags={"country": "china", "age": 45}) assert statsd.socket.recv() == "gt:123.4|g|#age:45,country:china" def test_tagged_counter(statsd): statsd.increment("ct", tags={"country": "españa"}) assert statsd.socket.recv() == "ct:1|c|#country:españa" def test_tagged_histogram(statsd): statsd.histogram("h", 1, tags={"test_tag": "tag_value"}) assert statsd.socket.recv() == "h:1|h|#test_tag:tag_value" def test_sample_rate(statsd): statsd.increment("c", sample_rate=0) assert not statsd.socket.recv() for i in range(10000): statsd.increment("sampled_counter", sample_rate=0.3) assert_almost_equal(3000, len(statsd.socket.payloads), 150) assert statsd.socket.recv() == "sampled_counter:1|c|@0.3" def test_tags_and_samples(statsd): for i in range(100): statsd.gauge("gst", 23, tags={"sampled": True}, sample_rate=0.9) assert_almost_equal(90, len(statsd.socket.payloads), 10) assert statsd.socket.recv() == "gst:23|g|@0.9|#sampled:True" def test_timing(statsd): statsd.timing("t", 123) assert statsd.socket.recv() == "t:123|ms" def test_metric_namespace(statsd): """ Namespace prefixes all metric names. """ statsd.namespace = "foo" statsd.gauge("gauge", 123.4) assert statsd.socket.recv() == "foo.gauge:123.4|g" # Test Client level constant tags def test_gauge_constant_tags(statsd): statsd.constant_tags = { "bar": "baz", } statsd.gauge("gauge", 123.4) assert statsd.socket.recv() == "gauge:123.4|g|#bar:baz" def test_counter_constant_tag_with_metric_level_tags(statsd): statsd.constant_tags = { "bar": "baz", "foo": True, } statsd.increment("page.views", tags={"extra": "extra"}) assert statsd.socket.recv() == "page.views:1|c|#bar:baz,extra:extra,foo:True" def test_gauge_constant_tags_with_metric_level_tags_twice(statsd): metric_level_tag = {"foo": "bar"} statsd.constant_tags = {"bar": "baz"} statsd.gauge("gauge", 123.4, tags=metric_level_tag) assert statsd.socket.recv() == "gauge:123.4|g|#bar:baz,foo:bar" # sending metrics multiple times with same metric-level tags # should not duplicate the tags being sent statsd.gauge("gauge", 123.4, tags=metric_level_tag) assert statsd.socket.recv() == "gauge:123.4|g|#bar:baz,foo:bar" def test_socket_error(statsd): statsd._socket = BrokenSocket() statsd.gauge("no error", 1) assert True, "success" def test_socket_timeout(statsd): statsd._socket = SlowSocket() statsd.gauge("no error", 1) assert True, "success" def test_timed(statsd): """ Measure the distribution of a function's run time. """ @statsd.timed("timed.test") def func(a, b, c=1, d=1): """docstring""" time.sleep(0.5) return (a, b, c, d) assert func.__name__ == "func" assert func.__doc__ == "docstring" result = func(1, 2, d=3) # Assert it handles args and kwargs correctly. assert result, (1, 2, 1 == 3) packet = statsd.socket.recv() name_value, type_ = packet.split("|") name, value = name_value.split(":") assert type_ == "ms" assert name == "timed.test" assert_almost_equal(500, float(value), 100) def test_timed_exception(statsd): """ Exception bubble out of the decorator and is reported to statsd as a dedicated counter. """ @statsd.timed("timed.test") def func(a, b, c=1, d=1): """docstring""" time.sleep(0.5) return (a / b, c, d) assert func.__name__ == "func" assert func.__doc__ == "docstring" with pytest.raises(ZeroDivisionError): func(1, 0) packet = statsd.socket.recv() - name_value, type_ = packet.split("|") + name_value, type_, tags = packet.split("|") name, value = name_value.split(":") assert type_ == "c" assert name == "timed.test_error_count" assert int(value) == 1 + assert tags == "#error_type:ZeroDivisionError" def test_timed_no_metric(statsd): """ Test using a decorator without providing a metric. """ @statsd.timed() def func(a, b, c=1, d=1): """docstring""" time.sleep(0.5) return (a, b, c, d) assert func.__name__ == "func" assert func.__doc__ == "docstring" result = func(1, 2, d=3) # Assert it handles args and kwargs correctly. assert result, (1, 2, 1 == 3) packet = statsd.socket.recv() name_value, type_ = packet.split("|") name, value = name_value.split(":") assert type_ == "ms" assert name == "swh.core.tests.test_statsd.func" assert_almost_equal(500, float(value), 100) def test_timed_coroutine(statsd): """ Measure the distribution of a coroutine function's run time. Warning: Python >= 3.5 only. """ import asyncio @statsd.timed("timed.test") @asyncio.coroutine def print_foo(): """docstring""" time.sleep(0.5) print("foo") loop = asyncio.new_event_loop() loop.run_until_complete(print_foo()) loop.close() # Assert packet = statsd.socket.recv() name_value, type_ = packet.split("|") name, value = name_value.split(":") assert type_ == "ms" assert name == "timed.test" assert_almost_equal(500, float(value), 100) def test_timed_context(statsd): """ Measure the distribution of a context's run time. """ # In milliseconds with statsd.timed("timed_context.test") as timer: assert isinstance(timer, TimedContextManagerDecorator) time.sleep(0.5) packet = statsd.socket.recv() name_value, type_ = packet.split("|") name, value = name_value.split(":") assert type_ == "ms" assert name == "timed_context.test" assert_almost_equal(500, float(value), 100) assert_almost_equal(500, timer.elapsed, 100) def test_timed_context_exception(statsd): """ Exception bubbles out of the `timed` context manager and is reported to statsd as a dedicated counter. """ class ContextException(Exception): pass def func(statsd): with statsd.timed("timed_context.test"): time.sleep(0.5) raise ContextException() # Ensure the exception was raised. with pytest.raises(ContextException): func(statsd) # Ensure the timing was recorded. packet = statsd.socket.recv() - name_value, type_ = packet.split("|") + name_value, type_, tags = packet.split("|") name, value = name_value.split(":") assert type_ == "c" assert name == "timed_context.test_error_count" assert int(value) == 1 + assert tags == "#error_type:ContextException" def test_timed_context_no_metric_name_exception(statsd): """Test that an exception occurs if using a context manager without a metric name. """ def func(statsd): with statsd.timed(): time.sleep(0.5) # Ensure the exception was raised. with pytest.raises(TypeError): func(statsd) # Ensure the timing was recorded. packet = statsd.socket.recv() assert packet is None def test_timed_start_stop_calls(statsd): timer = statsd.timed("timed_context.test") timer.start() time.sleep(0.5) timer.stop() packet = statsd.socket.recv() name_value, type_ = packet.split("|") name, value = name_value.split(":") assert type_ == "ms" assert name == "timed_context.test" assert_almost_equal(500, float(value), 100) def test_batched(statsd): statsd.open_buffer() statsd.gauge("page.views", 123) statsd.timing("timer", 123) statsd.close_buffer() assert statsd.socket.recv() == "page.views:123|g\ntimer:123|ms" def test_context_manager(): fake_socket = FakeSocket() with Statsd() as statsd: statsd._socket = fake_socket statsd.gauge("page.views", 123) statsd.timing("timer", 123) assert fake_socket.recv() == "page.views:123|g\ntimer:123|ms" def test_batched_buffer_autoflush(): fake_socket = FakeSocket() with Statsd() as statsd: statsd._socket = fake_socket for i in range(51): statsd.increment("mycounter") assert "\n".join(["mycounter:1|c" for i in range(50)]) == fake_socket.recv() assert fake_socket.recv() == "mycounter:1|c" def test_module_level_instance(statsd): from swh.core.statsd import statsd assert isinstance(statsd, Statsd) def test_instantiating_does_not_connect(): local_statsd = Statsd() assert local_statsd._socket is None def test_accessing_socket_opens_socket(): local_statsd = Statsd() try: assert local_statsd.socket is not None finally: local_statsd.close_socket() def test_accessing_socket_multiple_times_returns_same_socket(): local_statsd = Statsd() fresh_socket = FakeSocket() local_statsd._socket = fresh_socket assert fresh_socket == local_statsd.socket assert FakeSocket() != local_statsd.socket def test_tags_from_environment(monkeypatch): monkeypatch.setenv("STATSD_TAGS", "country:china,age:45") statsd = Statsd() statsd._socket = FakeSocket() statsd.gauge("gt", 123.4) assert statsd.socket.recv() == "gt:123.4|g|#age:45,country:china" def test_tags_from_environment_with_substitution(monkeypatch): monkeypatch.setenv("HOSTNAME", "sweethome") monkeypatch.setenv("PORT", "42") monkeypatch.setenv( "STATSD_TAGS", "country:china,age:45,host:$HOSTNAME,port:${PORT}" ) statsd = Statsd() statsd._socket = FakeSocket() statsd.gauge("gt", 123.4) assert ( statsd.socket.recv() == "gt:123.4|g|#age:45,country:china,host:sweethome,port:42" ) def test_tags_from_environment_and_constant(monkeypatch): monkeypatch.setenv("STATSD_TAGS", "country:china,age:45") statsd = Statsd(constant_tags={"country": "canada"}) statsd._socket = FakeSocket() statsd.gauge("gt", 123.4) assert statsd.socket.recv() == "gt:123.4|g|#age:45,country:canada" def test_tags_from_environment_warning(monkeypatch): monkeypatch.setenv("STATSD_TAGS", "valid:tag,invalid_tag") with pytest.warns(UserWarning) as record: statsd = Statsd() assert len(record) == 1 assert "invalid_tag" in record[0].message.args[0] assert "valid:tag" not in record[0].message.args[0] assert statsd.constant_tags == {"valid": "tag"} def test_gauge_doesnt_send_none(statsd): statsd.gauge("metric", None) assert statsd.socket.recv() is None def test_increment_doesnt_send_none(statsd): statsd.increment("metric", None) assert statsd.socket.recv() is None def test_decrement_doesnt_send_none(statsd): statsd.decrement("metric", None) assert statsd.socket.recv() is None def test_timing_doesnt_send_none(statsd): statsd.timing("metric", None) assert statsd.socket.recv() is None def test_histogram_doesnt_send_none(statsd): statsd.histogram("metric", None) assert statsd.socket.recv() is None def test_param_host(monkeypatch): monkeypatch.setenv("STATSD_HOST", "test-value") monkeypatch.setenv("STATSD_PORT", "") local_statsd = Statsd(host="actual-test-value") assert local_statsd.host == "actual-test-value" assert local_statsd.port == 8125 def test_param_port(monkeypatch): monkeypatch.setenv("STATSD_HOST", "") monkeypatch.setenv("STATSD_PORT", "12345") local_statsd = Statsd(port=4321) assert local_statsd.host == "localhost" assert local_statsd.port == 4321 def test_envvar_host(monkeypatch): monkeypatch.setenv("STATSD_HOST", "test-value") monkeypatch.setenv("STATSD_PORT", "") local_statsd = Statsd() assert local_statsd.host == "test-value" assert local_statsd.port == 8125 def test_envvar_port(monkeypatch): monkeypatch.setenv("STATSD_HOST", "") monkeypatch.setenv("STATSD_PORT", "12345") local_statsd = Statsd() assert local_statsd.host == "localhost" assert local_statsd.port == 12345 def test_namespace_added(): local_statsd = Statsd(namespace="test-namespace") local_statsd._socket = FakeSocket() local_statsd.gauge("gauge", 123.4) assert local_statsd.socket.recv() == "test-namespace.gauge:123.4|g" def test_contextmanager_empty(statsd): with statsd: assert True, "success" def test_contextmanager_buffering(statsd): with statsd as s: s.gauge("gauge", 123.4) s.gauge("gauge_other", 456.78) assert s.socket.recv() is None assert statsd.socket.recv() == "gauge:123.4|g\ngauge_other:456.78|g" def test_timed_elapsed(statsd): with statsd.timed("test_timer") as t: pass assert t.elapsed >= 0 assert statsd.socket.recv() == "test_timer:%s|ms" % t.elapsed def test_status_gauge(statsd): with statsd.status_gauge("test_status_gauge", ["s1", "s2", "s3"]) as set_status: set_status("s1") set_status("s2") set_status("s3") # enter the context manager: initialisation of gauges for listed statuses assert statsd.socket.recv() == "test_status_gauge:0|g|#status:s1" assert statsd.socket.recv() == "test_status_gauge:0|g|#status:s2" assert statsd.socket.recv() == "test_status_gauge:0|g|#status:s3" # set_status("s1") assert statsd.socket.recv() == "test_status_gauge:1|g|#status:s1" # set_status("s2") assert statsd.socket.recv() == "test_status_gauge:0|g|#status:s1" assert statsd.socket.recv() == "test_status_gauge:1|g|#status:s2" # set_status("s3") assert statsd.socket.recv() == "test_status_gauge:0|g|#status:s2" assert statsd.socket.recv() == "test_status_gauge:1|g|#status:s3" # exit the context manager: cleanup gauges assert statsd.socket.recv() == "test_status_gauge:0|g|#status:s1" assert statsd.socket.recv() == "test_status_gauge:0|g|#status:s2" assert statsd.socket.recv() == "test_status_gauge:0|g|#status:s3" def test_status_gauge_error(statsd): with statsd.status_gauge("test_status_gauge", ["s1", "s2", "s3"]) as set_status: with pytest.raises(ValueError): set_status("s4") def test_status_gauge_repeated(statsd): with statsd.status_gauge("test_status_gauge", ["s1", "s2", "s3"]) as set_status: set_status("s1") set_status("s1") set_status("s1") # enter the context manager: initialisation of gauges for listed statuses assert statsd.socket.recv() == "test_status_gauge:0|g|#status:s1" assert statsd.socket.recv() == "test_status_gauge:0|g|#status:s2" assert statsd.socket.recv() == "test_status_gauge:0|g|#status:s3" # set_status("s1") assert statsd.socket.recv() == "test_status_gauge:1|g|#status:s1" # set_status("s1") assert statsd.socket.recv() == "test_status_gauge:1|g|#status:s1" # set_status("s1") assert statsd.socket.recv() == "test_status_gauge:1|g|#status:s1" # exit the context manager: cleanup gauges assert statsd.socket.recv() == "test_status_gauge:0|g|#status:s1" assert statsd.socket.recv() == "test_status_gauge:0|g|#status:s2" assert statsd.socket.recv() == "test_status_gauge:0|g|#status:s3"