diff --git a/swh/core/statsd.py b/swh/core/statsd.py index 1747328..8d002c2 100644 --- a/swh/core/statsd.py +++ b/swh/core/statsd.py @@ -1,440 +1,449 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # Initially imported from https://github.com/DataDog/datadogpy/ # at revision 62b3a3e89988dc18d78c282fe3ff5d1813917436 # # Copyright (c) 2015, Datadog # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # * Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # * Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # * Neither the name of Datadog nor the names of its contributors may be # used to endorse or promote products derived from this software without # specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY # DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES # (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND # ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # # # Vastly adapted for integration in swh.core: # # - Removed python < 3.5 compat code # - trimmed the imports down to be a single module # - adjust some options: # - drop unix socket connection option # - add environment variable support for setting the statsd host and # port (pulled the idea from the main python statsd module) # - only send timer metrics in milliseconds (that's what # prometheus-statsd-exporter expects) # - drop DataDog-specific metric types (that are unsupported in # prometheus-statsd-exporter) # - made the tags a dict instead of a list (prometheus-statsd-exporter only # supports tags with a value, mirroring prometheus) # - switch from time.time to time.monotonic # - improve unit test coverage # - documentation cleanup from asyncio import iscoroutinefunction from functools import wraps import itertools import logging import os from random import random +import re import socket import threading from time import monotonic import warnings log = logging.getLogger("swh.core.statsd") class TimedContextManagerDecorator(object): """ A context manager and a decorator which will report the elapsed time in the context OR in a function call. Attributes: elapsed (float): the elapsed time at the point of completion """ def __init__( self, statsd, metric=None, error_metric=None, tags=None, sample_rate=1 ): self.statsd = statsd self.metric = metric self.error_metric = error_metric self.tags = tags self.sample_rate = sample_rate self.elapsed = None # this is for testing purpose def __call__(self, func): """ Decorator which returns the elapsed time of the function call. Default to the function name if metric was not provided. """ if not self.metric: self.metric = "%s.%s" % (func.__module__, func.__name__) # Coroutines if iscoroutinefunction(func): @wraps(func) async def wrapped_co(*args, **kwargs): start = monotonic() try: result = await func(*args, **kwargs) except: # noqa self._send_error() raise self._send(start) return result return wrapped_co # Others @wraps(func) def wrapped(*args, **kwargs): start = monotonic() try: result = func(*args, **kwargs) except: # noqa self._send_error() raise self._send(start) return result return wrapped def __enter__(self): if not self.metric: raise TypeError("Cannot used timed without a metric!") self._start = monotonic() return self def __exit__(self, type, value, traceback): # Report the elapsed time of the context manager if no error. if type is None: self._send(self._start) else: self._send_error() def _send(self, start): elapsed = (monotonic() - start) * 1000 self.statsd.timing( self.metric, elapsed, tags=self.tags, sample_rate=self.sample_rate ) self.elapsed = elapsed def _send_error(self): if self.error_metric is None: self.error_metric = self.metric + "_error_count" self.statsd.increment(self.error_metric, tags=self.tags) def start(self): """Start the timer""" self.__enter__() def stop(self): """Stop the timer, send the metric value""" self.__exit__(None, None, None) class Statsd(object): """Initialize a client to send metrics to a StatsD server. Arguments: host (str): the host of the StatsD server. Defaults to localhost. port (int): the port of the StatsD server. Defaults to 8125. max_buffer_size (int): Maximum number of metrics to buffer before sending to the server if sending metrics in batch namespace (str): Namespace to prefix all metric names constant_tags (Dict[str, str]): Tags to attach to all metrics Note: This class also supports the following environment variables: STATSD_HOST Override the default host of the statsd server STATSD_PORT Override the default port of the statsd server STATSD_TAGS Tags to attach to every metric reported. Example value: "label:value,other_label:other_value" """ def __init__( self, host=None, port=None, max_buffer_size=50, namespace=None, constant_tags=None, ): # Connection if host is None: host = os.environ.get("STATSD_HOST") or "localhost" self.host = host if port is None: port = os.environ.get("STATSD_PORT") or 8125 self.port = int(port) # Socket self._socket = None self.lock = threading.Lock() self.max_buffer_size = max_buffer_size self._send = self._send_to_server self.encoding = "utf-8" # Tags self.constant_tags = {} tags_envvar = os.environ.get("STATSD_TAGS", "") for tag in tags_envvar.split(","): if not tag: continue if ":" not in tag: warnings.warn( "STATSD_TAGS needs to be in key:value format, " "%s invalid" % tag, UserWarning, ) continue k, v = tag.split(":", 1) + + # look for a possible env var substitution, using $NAME or ${NAME} format + m = re.match(r"^[$]([{])?(?P\w+)(?(1)[}]|)$", v) + if m: + envvar = m.group("envvar") + if envvar in os.environ: + v = os.environ[envvar] + self.constant_tags[k] = v if constant_tags: self.constant_tags.update( {str(k): str(v) for k, v in constant_tags.items()} ) # Namespace if namespace is not None: namespace = str(namespace) self.namespace = namespace def __enter__(self): self.open_buffer(self.max_buffer_size) return self def __exit__(self, type, value, traceback): self.close_buffer() def gauge(self, metric, value, tags=None, sample_rate=1): """ Record the value of a gauge, optionally setting a list of tags and a sample rate. >>> statsd.gauge('users.online', 123) >>> statsd.gauge('active.connections', 1001, tags={"protocol": "http"}) """ return self._report(metric, "g", value, tags, sample_rate) def increment(self, metric, value=1, tags=None, sample_rate=1): """ Increment a counter, optionally setting a value, tags and a sample rate. >>> statsd.increment('page.views') >>> statsd.increment('files.transferred', 124) """ self._report(metric, "c", value, tags, sample_rate) def decrement(self, metric, value=1, tags=None, sample_rate=1): """ Decrement a counter, optionally setting a value, tags and a sample rate. >>> statsd.decrement('files.remaining') >>> statsd.decrement('active.connections', 2) """ metric_value = -value if value else value self._report(metric, "c", metric_value, tags, sample_rate) def histogram(self, metric, value, tags=None, sample_rate=1): """ Sample a histogram value, optionally setting tags and a sample rate. >>> statsd.histogram('uploaded.file.size', 1445) >>> statsd.histogram('file.count', 26, tags={"filetype": "python"}) """ self._report(metric, "h", value, tags, sample_rate) def timing(self, metric, value, tags=None, sample_rate=1): """ Record a timing, optionally setting tags and a sample rate. >>> statsd.timing("query.response.time", 1234) """ self._report(metric, "ms", value, tags, sample_rate) def timed(self, metric=None, error_metric=None, tags=None, sample_rate=1): """ A decorator or context manager that will measure the distribution of a function's/context's run time. Optionally specify a list of tags or a sample rate. If the metric is not defined as a decorator, the module name and function name will be used. The metric is required as a context manager. :: @statsd.timed('user.query.time', sample_rate=0.5) def get_user(user_id): # Do what you need to ... pass # Is equivalent to ... with statsd.timed('user.query.time', sample_rate=0.5): # Do what you need to ... pass # Is equivalent to ... start = time.monotonic() try: get_user(user_id) finally: statsd.timing('user.query.time', time.monotonic() - start) """ return TimedContextManagerDecorator( statsd=self, metric=metric, error_metric=error_metric, tags=tags, sample_rate=sample_rate, ) def set(self, metric, value, tags=None, sample_rate=1): """ Sample a set value. >>> statsd.set('visitors.uniques', 999) """ self._report(metric, "s", value, tags, sample_rate) @property def socket(self): """ Return a connected socket. Note: connect the socket before assigning it to the class instance to avoid bad thread race conditions. """ with self.lock: if not self._socket: sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.connect((self.host, self.port)) self._socket = sock return self._socket def open_buffer(self, max_buffer_size=50): """ Open a buffer to send a batch of metrics in one packet. You can also use this as a context manager. >>> with Statsd() as batch: ... batch.gauge('users.online', 123) ... batch.gauge('active.connections', 1001) """ self.max_buffer_size = max_buffer_size self.buffer = [] self._send = self._send_to_buffer def close_buffer(self): """ Flush the buffer and switch back to single metric packets. """ self._send = self._send_to_server if self.buffer: # Only send packets if there are packets to send self._flush_buffer() def close_socket(self): """ Closes connected socket if connected. """ with self.lock: if self._socket: self._socket.close() self._socket = None def _report(self, metric, metric_type, value, tags, sample_rate): """ Create a metric packet and send it. """ if value is None: return if sample_rate != 1 and random() > sample_rate: return # Resolve the full tag list tags = self._add_constant_tags(tags) # Create/format the metric packet payload = "%s%s:%s|%s%s%s" % ( (self.namespace + ".") if self.namespace else "", metric, value, metric_type, ("|@" + str(sample_rate)) if sample_rate != 1 else "", ("|#" + ",".join("%s:%s" % (k, v) for (k, v) in sorted(tags.items()))) if tags else "", ) # Send it self._send(payload) def _send_to_server(self, packet): try: # If set, use socket directly self.socket.send(packet.encode("utf-8")) except socket.timeout: return except socket.error: log.debug( "Error submitting statsd packet." " Dropping the packet and closing the socket." ) self.close_socket() def _send_to_buffer(self, packet): self.buffer.append(packet) if len(self.buffer) >= self.max_buffer_size: self._flush_buffer() def _flush_buffer(self): self._send_to_server("\n".join(self.buffer)) self.buffer = [] def _add_constant_tags(self, tags): return { str(k): str(v) for k, v in itertools.chain( self.constant_tags.items(), (tags if tags else {}).items(), ) } statsd = Statsd() diff --git a/swh/core/tests/test_statsd.py b/swh/core/tests/test_statsd.py index 1f41483..bea9b34 100644 --- a/swh/core/tests/test_statsd.py +++ b/swh/core/tests/test_statsd.py @@ -1,536 +1,551 @@ # Copyright (C) 2018-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # Initially imported from https://github.com/DataDog/datadogpy/ # at revision 62b3a3e89988dc18d78c282fe3ff5d1813917436 # # Copyright (c) 2015, Datadog # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # * Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # * Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # * Neither the name of Datadog nor the names of its contributors may be # used to endorse or promote products derived from this software without # specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY # DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES # (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND # ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # import socket import time import pytest from swh.core.pytest_plugin import FakeSocket from swh.core.statsd import Statsd, TimedContextManagerDecorator class BrokenSocket(FakeSocket): def send(self, payload): raise socket.error("Socket error") class SlowSocket(FakeSocket): def send(self, payload): raise socket.timeout("Socket timeout") def assert_almost_equal(a, b, delta): assert 0 <= abs(a - b) <= delta, f"|{a} - {b}| not within {delta}" def test_set(statsd): statsd.set("set", 123) assert statsd.socket.recv() == "set:123|s" def test_gauge(statsd): statsd.gauge("gauge", 123.4) assert statsd.socket.recv() == "gauge:123.4|g" def test_counter(statsd): statsd.increment("page.views") assert statsd.socket.recv() == "page.views:1|c" statsd.increment("page.views", 11) assert statsd.socket.recv() == "page.views:11|c" statsd.decrement("page.views") assert statsd.socket.recv() == "page.views:-1|c" statsd.decrement("page.views", 12) assert statsd.socket.recv() == "page.views:-12|c" def test_histogram(statsd): statsd.histogram("histo", 123.4) assert statsd.socket.recv() == "histo:123.4|h" def test_tagged_gauge(statsd): statsd.gauge("gt", 123.4, tags={"country": "china", "age": 45}) assert statsd.socket.recv() == "gt:123.4|g|#age:45,country:china" def test_tagged_counter(statsd): statsd.increment("ct", tags={"country": "españa"}) assert statsd.socket.recv() == "ct:1|c|#country:españa" def test_tagged_histogram(statsd): statsd.histogram("h", 1, tags={"test_tag": "tag_value"}) assert statsd.socket.recv() == "h:1|h|#test_tag:tag_value" def test_sample_rate(statsd): statsd.increment("c", sample_rate=0) assert not statsd.socket.recv() for i in range(10000): statsd.increment("sampled_counter", sample_rate=0.3) assert_almost_equal(3000, len(statsd.socket.payloads), 150) assert statsd.socket.recv() == "sampled_counter:1|c|@0.3" def test_tags_and_samples(statsd): for i in range(100): statsd.gauge("gst", 23, tags={"sampled": True}, sample_rate=0.9) assert_almost_equal(90, len(statsd.socket.payloads), 10) assert statsd.socket.recv() == "gst:23|g|@0.9|#sampled:True" def test_timing(statsd): statsd.timing("t", 123) assert statsd.socket.recv() == "t:123|ms" def test_metric_namespace(statsd): """ Namespace prefixes all metric names. """ statsd.namespace = "foo" statsd.gauge("gauge", 123.4) assert statsd.socket.recv() == "foo.gauge:123.4|g" # Test Client level constant tags def test_gauge_constant_tags(statsd): statsd.constant_tags = { "bar": "baz", } statsd.gauge("gauge", 123.4) assert statsd.socket.recv() == "gauge:123.4|g|#bar:baz" def test_counter_constant_tag_with_metric_level_tags(statsd): statsd.constant_tags = { "bar": "baz", "foo": True, } statsd.increment("page.views", tags={"extra": "extra"}) assert statsd.socket.recv() == "page.views:1|c|#bar:baz,extra:extra,foo:True" def test_gauge_constant_tags_with_metric_level_tags_twice(statsd): metric_level_tag = {"foo": "bar"} statsd.constant_tags = {"bar": "baz"} statsd.gauge("gauge", 123.4, tags=metric_level_tag) assert statsd.socket.recv() == "gauge:123.4|g|#bar:baz,foo:bar" # sending metrics multiple times with same metric-level tags # should not duplicate the tags being sent statsd.gauge("gauge", 123.4, tags=metric_level_tag) assert statsd.socket.recv() == "gauge:123.4|g|#bar:baz,foo:bar" def test_socket_error(statsd): statsd._socket = BrokenSocket() statsd.gauge("no error", 1) assert True, "success" def test_socket_timeout(statsd): statsd._socket = SlowSocket() statsd.gauge("no error", 1) assert True, "success" def test_timed(statsd): """ Measure the distribution of a function's run time. """ @statsd.timed("timed.test") def func(a, b, c=1, d=1): """docstring""" time.sleep(0.5) return (a, b, c, d) assert func.__name__ == "func" assert func.__doc__ == "docstring" result = func(1, 2, d=3) # Assert it handles args and kwargs correctly. assert result, (1, 2, 1 == 3) packet = statsd.socket.recv() name_value, type_ = packet.split("|") name, value = name_value.split(":") assert type_ == "ms" assert name == "timed.test" assert_almost_equal(500, float(value), 100) def test_timed_exception(statsd): """ Exception bubble out of the decorator and is reported to statsd as a dedicated counter. """ @statsd.timed("timed.test") def func(a, b, c=1, d=1): """docstring""" time.sleep(0.5) return (a / b, c, d) assert func.__name__ == "func" assert func.__doc__ == "docstring" with pytest.raises(ZeroDivisionError): func(1, 0) packet = statsd.socket.recv() name_value, type_ = packet.split("|") name, value = name_value.split(":") assert type_ == "c" assert name == "timed.test_error_count" assert int(value) == 1 def test_timed_no_metric(statsd): """ Test using a decorator without providing a metric. """ @statsd.timed() def func(a, b, c=1, d=1): """docstring""" time.sleep(0.5) return (a, b, c, d) assert func.__name__ == "func" assert func.__doc__ == "docstring" result = func(1, 2, d=3) # Assert it handles args and kwargs correctly. assert result, (1, 2, 1 == 3) packet = statsd.socket.recv() name_value, type_ = packet.split("|") name, value = name_value.split(":") assert type_ == "ms" assert name == "swh.core.tests.test_statsd.func" assert_almost_equal(500, float(value), 100) def test_timed_coroutine(statsd): """ Measure the distribution of a coroutine function's run time. Warning: Python >= 3.5 only. """ import asyncio @statsd.timed("timed.test") @asyncio.coroutine def print_foo(): """docstring""" time.sleep(0.5) print("foo") loop = asyncio.new_event_loop() loop.run_until_complete(print_foo()) loop.close() # Assert packet = statsd.socket.recv() name_value, type_ = packet.split("|") name, value = name_value.split(":") assert type_ == "ms" assert name == "timed.test" assert_almost_equal(500, float(value), 100) def test_timed_context(statsd): """ Measure the distribution of a context's run time. """ # In milliseconds with statsd.timed("timed_context.test") as timer: assert isinstance(timer, TimedContextManagerDecorator) time.sleep(0.5) packet = statsd.socket.recv() name_value, type_ = packet.split("|") name, value = name_value.split(":") assert type_ == "ms" assert name == "timed_context.test" assert_almost_equal(500, float(value), 100) assert_almost_equal(500, timer.elapsed, 100) def test_timed_context_exception(statsd): """ Exception bubbles out of the `timed` context manager and is reported to statsd as a dedicated counter. """ class ContextException(Exception): pass def func(statsd): with statsd.timed("timed_context.test"): time.sleep(0.5) raise ContextException() # Ensure the exception was raised. with pytest.raises(ContextException): func(statsd) # Ensure the timing was recorded. packet = statsd.socket.recv() name_value, type_ = packet.split("|") name, value = name_value.split(":") assert type_ == "c" assert name == "timed_context.test_error_count" assert int(value) == 1 def test_timed_context_no_metric_name_exception(statsd): """Test that an exception occurs if using a context manager without a metric name. """ def func(statsd): with statsd.timed(): time.sleep(0.5) # Ensure the exception was raised. with pytest.raises(TypeError): func(statsd) # Ensure the timing was recorded. packet = statsd.socket.recv() assert packet is None def test_timed_start_stop_calls(statsd): timer = statsd.timed("timed_context.test") timer.start() time.sleep(0.5) timer.stop() packet = statsd.socket.recv() name_value, type_ = packet.split("|") name, value = name_value.split(":") assert type_ == "ms" assert name == "timed_context.test" assert_almost_equal(500, float(value), 100) def test_batched(statsd): statsd.open_buffer() statsd.gauge("page.views", 123) statsd.timing("timer", 123) statsd.close_buffer() assert statsd.socket.recv() == "page.views:123|g\ntimer:123|ms" def test_context_manager(): fake_socket = FakeSocket() with Statsd() as statsd: statsd._socket = fake_socket statsd.gauge("page.views", 123) statsd.timing("timer", 123) assert fake_socket.recv() == "page.views:123|g\ntimer:123|ms" def test_batched_buffer_autoflush(): fake_socket = FakeSocket() with Statsd() as statsd: statsd._socket = fake_socket for i in range(51): statsd.increment("mycounter") assert "\n".join(["mycounter:1|c" for i in range(50)]) == fake_socket.recv() assert fake_socket.recv() == "mycounter:1|c" def test_module_level_instance(statsd): from swh.core.statsd import statsd assert isinstance(statsd, Statsd) def test_instantiating_does_not_connect(): local_statsd = Statsd() assert local_statsd._socket is None def test_accessing_socket_opens_socket(): local_statsd = Statsd() try: assert local_statsd.socket is not None finally: local_statsd.close_socket() def test_accessing_socket_multiple_times_returns_same_socket(): local_statsd = Statsd() fresh_socket = FakeSocket() local_statsd._socket = fresh_socket assert fresh_socket == local_statsd.socket assert FakeSocket() != local_statsd.socket def test_tags_from_environment(monkeypatch): monkeypatch.setenv("STATSD_TAGS", "country:china,age:45") statsd = Statsd() statsd._socket = FakeSocket() statsd.gauge("gt", 123.4) assert statsd.socket.recv() == "gt:123.4|g|#age:45,country:china" +def test_tags_from_environment_with_substitution(monkeypatch): + monkeypatch.setenv("HOSTNAME", "sweethome") + monkeypatch.setenv("PORT", "42") + monkeypatch.setenv( + "STATSD_TAGS", "country:china,age:45,host:$HOSTNAME,port:${PORT}" + ) + statsd = Statsd() + statsd._socket = FakeSocket() + statsd.gauge("gt", 123.4) + assert ( + statsd.socket.recv() + == "gt:123.4|g|#age:45,country:china,host:sweethome,port:42" + ) + + def test_tags_from_environment_and_constant(monkeypatch): monkeypatch.setenv("STATSD_TAGS", "country:china,age:45") statsd = Statsd(constant_tags={"country": "canada"}) statsd._socket = FakeSocket() statsd.gauge("gt", 123.4) assert statsd.socket.recv() == "gt:123.4|g|#age:45,country:canada" def test_tags_from_environment_warning(monkeypatch): monkeypatch.setenv("STATSD_TAGS", "valid:tag,invalid_tag") with pytest.warns(UserWarning) as record: statsd = Statsd() assert len(record) == 1 assert "invalid_tag" in record[0].message.args[0] assert "valid:tag" not in record[0].message.args[0] assert statsd.constant_tags == {"valid": "tag"} def test_gauge_doesnt_send_none(statsd): statsd.gauge("metric", None) assert statsd.socket.recv() is None def test_increment_doesnt_send_none(statsd): statsd.increment("metric", None) assert statsd.socket.recv() is None def test_decrement_doesnt_send_none(statsd): statsd.decrement("metric", None) assert statsd.socket.recv() is None def test_timing_doesnt_send_none(statsd): statsd.timing("metric", None) assert statsd.socket.recv() is None def test_histogram_doesnt_send_none(statsd): statsd.histogram("metric", None) assert statsd.socket.recv() is None def test_param_host(monkeypatch): monkeypatch.setenv("STATSD_HOST", "test-value") monkeypatch.setenv("STATSD_PORT", "") local_statsd = Statsd(host="actual-test-value") assert local_statsd.host == "actual-test-value" assert local_statsd.port == 8125 def test_param_port(monkeypatch): monkeypatch.setenv("STATSD_HOST", "") monkeypatch.setenv("STATSD_PORT", "12345") local_statsd = Statsd(port=4321) assert local_statsd.host == "localhost" assert local_statsd.port == 4321 def test_envvar_host(monkeypatch): monkeypatch.setenv("STATSD_HOST", "test-value") monkeypatch.setenv("STATSD_PORT", "") local_statsd = Statsd() assert local_statsd.host == "test-value" assert local_statsd.port == 8125 def test_envvar_port(monkeypatch): monkeypatch.setenv("STATSD_HOST", "") monkeypatch.setenv("STATSD_PORT", "12345") local_statsd = Statsd() assert local_statsd.host == "localhost" assert local_statsd.port == 12345 def test_namespace_added(): local_statsd = Statsd(namespace="test-namespace") local_statsd._socket = FakeSocket() local_statsd.gauge("gauge", 123.4) assert local_statsd.socket.recv() == "test-namespace.gauge:123.4|g" def test_contextmanager_empty(statsd): with statsd: assert True, "success" def test_contextmanager_buffering(statsd): with statsd as s: s.gauge("gauge", 123.4) s.gauge("gauge_other", 456.78) assert s.socket.recv() is None assert statsd.socket.recv() == "gauge:123.4|g\ngauge_other:456.78|g" def test_timed_elapsed(statsd): with statsd.timed("test_timer") as t: pass assert t.elapsed >= 0 assert statsd.socket.recv() == "test_timer:%s|ms" % t.elapsed