diff --git a/swh/core/statsd.py b/swh/core/statsd.py
index 30dcff4..30be881 100644
--- a/swh/core/statsd.py
+++ b/swh/core/statsd.py
@@ -1,425 +1,430 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

# Initially imported from https://github.com/DataDog/datadogpy/
# at revision 62b3a3e89988dc18d78c282fe3ff5d1813917436
#
# Copyright (c) 2015, Datadog
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# * Redistributions of source code must retain the above copyright
#   notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
#   notice, this list of conditions and the following disclaimer in the
#   documentation and/or other materials provided with the distribution.
# * Neither the name of Datadog nor the names of its contributors may be
#   used to endorse or promote products derived from this software without
#   specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
#
# Vastly adapted for integration in swh.core:
#
# - Removed python < 3.5 compat code
# - trimmed the imports down to be a single module
# - adjust some options:
#   - drop unix socket connection option
#   - add environment variable support for setting the statsd host and
#     port (pulled the idea from the main python statsd module)
#   - only send timer metrics in milliseconds (that's what
#     prometheus-statsd-exporter expects)
#   - drop DataDog-specific metric types (that are unsupported in
#     prometheus-statsd-exporter)
#   - made the tags a dict instead of a list (prometheus-statsd-exporter only
#     supports tags with a value, mirroring prometheus)
# - switch from time.time to time.monotonic
# - improve unit test coverage
# - documentation cleanup

from asyncio import iscoroutinefunction
from functools import wraps
from random import random
from time import monotonic
import itertools
import logging
import os
import socket
+import threading
import warnings

log = logging.getLogger('swh.core.statsd')


class TimedContextManagerDecorator(object):
    """
    A context manager and a decorator which will report the elapsed time in
    the context OR in a function call.

    Attributes:
        elapsed (float): the elapsed time at the point of completion
    """
    def __init__(self, statsd, metric=None, error_metric=None,
                 tags=None, sample_rate=1):
        self.statsd = statsd
        self.metric = metric
        self.error_metric = error_metric
        self.tags = tags
        self.sample_rate = sample_rate
        self.elapsed = None  # this is for testing purpose

    def __call__(self, func):
        """
        Decorator which returns the elapsed time of the function call.

        Default to the function name if metric was not provided.
        """
        if not self.metric:
            self.metric = '%s.%s' % (func.__module__, func.__name__)

        # Coroutines
        if iscoroutinefunction(func):
            @wraps(func)
            async def wrapped_co(*args, **kwargs):
                start = monotonic()
                try:
                    result = await func(*args, **kwargs)
                except:  # noqa
                    self._send_error()
                    raise
                self._send(start)
                return result
            return wrapped_co

        # Others
        @wraps(func)
        def wrapped(*args, **kwargs):
            start = monotonic()
            try:
                result = func(*args, **kwargs)
            except:  # noqa
                self._send_error()
                raise
            self._send(start)
            return result
        return wrapped

    def __enter__(self):
        if not self.metric:
            raise TypeError("Cannot used timed without a metric!")
        self._start = monotonic()
        return self

    def __exit__(self, type, value, traceback):
        # Report the elapsed time of the context manager if no error.
        if type is None:
            self._send(self._start)
        else:
            self._send_error()

    def _send(self, start):
        elapsed = (monotonic() - start) * 1000
        self.statsd.timing(self.metric, elapsed,
                           tags=self.tags, sample_rate=self.sample_rate)
        self.elapsed = elapsed

    def _send_error(self):
        if self.error_metric is None:
            self.error_metric = self.metric + '_error_count'
        self.statsd.increment(self.error_metric, tags=self.tags)

    def start(self):
        """Start the timer"""
        self.__enter__()

    def stop(self):
        """Stop the timer, send the metric value"""
        self.__exit__(None, None, None)


class Statsd(object):
    """Initialize a client to send metrics to a StatsD server.

    Arguments:
      host (str): the host of the StatsD server. Defaults to localhost.
      port (int): the port of the StatsD server. Defaults to 8125.
      max_buffer_size (int): Maximum number of metrics to buffer before
        sending to the server if sending metrics in batch
      namespace (str): Namespace to prefix all metric names
      constant_tags (Dict[str, str]): Tags to attach to all metrics

    Note:
      This class also supports the following environment variables:

      STATSD_HOST
        Override the default host of the statsd server
      STATSD_PORT
        Override the default port of the statsd server
      STATSD_TAGS
        Tags to attach to every metric reported.
        Example value: "label:value,other_label:other_value"
    """
    def __init__(self, host=None, port=None, max_buffer_size=50,
                 namespace=None, constant_tags=None):
        # Connection
        if host is None:
            host = os.environ.get('STATSD_HOST') or 'localhost'
        self.host = host

        if port is None:
            port = os.environ.get('STATSD_PORT') or 8125
        self.port = int(port)

        # Socket
-        self.socket = None
+        self._socket = None
+        self.lock = threading.Lock()
        self.max_buffer_size = max_buffer_size
        self._send = self._send_to_server
        self.encoding = 'utf-8'

        # Tags
        self.constant_tags = {}
        tags_envvar = os.environ.get('STATSD_TAGS', '')
        for tag in tags_envvar.split(','):
            if not tag:
                continue
            if ':' not in tag:
                warnings.warn(
                    'STATSD_TAGS needs to be in key:value format, '
                    '%s invalid' % tag,
                    UserWarning,
                )
                continue
            k, v = tag.split(':', 1)
            self.constant_tags[k] = v
        if constant_tags:
            self.constant_tags.update({
                str(k): str(v)
                for k, v in constant_tags.items()
            })

        # Namespace
        if namespace is not None:
            namespace = str(namespace)
        self.namespace = namespace

    def __enter__(self):
        self.open_buffer(self.max_buffer_size)
        return self

    def __exit__(self, type, value, traceback):
        self.close_buffer()

    def gauge(self, metric, value, tags=None, sample_rate=1):
        """
        Record the value of a gauge, optionally setting a list of
        tags and a sample rate.

        >>> statsd.gauge('users.online', 123)
        >>> statsd.gauge('active.connections', 1001, tags={"protocol": "http"})
        """
        return self._report(metric, 'g', value, tags, sample_rate)

    def increment(self, metric, value=1, tags=None, sample_rate=1):
        """
        Increment a counter, optionally setting a value, tags and a sample
        rate.

        >>> statsd.increment('page.views')
        >>> statsd.increment('files.transferred', 124)
        """
        self._report(metric, 'c', value, tags, sample_rate)

    def decrement(self, metric, value=1, tags=None, sample_rate=1):
        """
        Decrement a counter, optionally setting a value, tags and a sample
        rate.

        >>> statsd.decrement('files.remaining')
        >>> statsd.decrement('active.connections', 2)
        """
        metric_value = -value if value else value
        self._report(metric, 'c', metric_value, tags, sample_rate)

    def histogram(self, metric, value, tags=None, sample_rate=1):
        """
        Sample a histogram value, optionally setting tags and a sample rate.

        >>> statsd.histogram('uploaded.file.size', 1445)
        >>> statsd.histogram('file.count', 26, tags={"filetype": "python"})
        """
        self._report(metric, 'h', value, tags, sample_rate)

    def timing(self, metric, value, tags=None, sample_rate=1):
        """
        Record a timing, optionally setting tags and a sample rate.

        >>> statsd.timing("query.response.time", 1234)
        """
        self._report(metric, 'ms', value, tags, sample_rate)

    def timed(self, metric=None, error_metric=None, tags=None, sample_rate=1):
        """
        A decorator or context manager that will measure the distribution of
        a function's/context's run time. Optionally specify a list of tags
        or a sample rate. If the metric is not defined as a decorator, the
        module name and function name will be used. The metric is required
        as a context manager.
        ::

            @statsd.timed('user.query.time', sample_rate=0.5)
            def get_user(user_id):
                # Do what you need to ...
                pass

            # Is equivalent to ...
            with statsd.timed('user.query.time', sample_rate=0.5):
                # Do what you need to ...
                pass

            # Is equivalent to ...
            start = time.monotonic()
            try:
                get_user(user_id)
            finally:
                statsd.timing('user.query.time', time.monotonic() - start)
        """
        return TimedContextManagerDecorator(
            statsd=self,
            metric=metric,
            error_metric=error_metric,
            tags=tags,
            sample_rate=sample_rate)

    def set(self, metric, value, tags=None, sample_rate=1):
        """
        Sample a set value.

        >>> statsd.set('visitors.uniques', 999)
        """
        self._report(metric, 's', value, tags, sample_rate)

-    def get_socket(self):
+    @property
+    def socket(self):
        """
        Return a connected socket.

        Note: connect the socket before assigning it to the class instance to
        avoid bad thread race conditions.
        """
-        if not self.socket:
-            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
-            sock.connect((self.host, self.port))
-            self.socket = sock
+        with self.lock:
+            if not self._socket:
+                sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+                sock.connect((self.host, self.port))
+                self._socket = sock

-        return self.socket
+            return self._socket

    def open_buffer(self, max_buffer_size=50):
        """
        Open a buffer to send a batch of metrics in one packet.

        You can also use this as a context manager.

        >>> with Statsd() as batch:
        ...     batch.gauge('users.online', 123)
        ...     batch.gauge('active.connections', 1001)
        """
        self.max_buffer_size = max_buffer_size
        self.buffer = []
        self._send = self._send_to_buffer

    def close_buffer(self):
        """
        Flush the buffer and switch back to single metric packets.
        """
        self._send = self._send_to_server

        if self.buffer:
            # Only send packets if there are packets to send
            self._flush_buffer()

    def close_socket(self):
        """
        Closes connected socket if connected.
        """
-        if self.socket:
-            self.socket.close()
-            self.socket = None
+        with self.lock:
+            if self._socket:
+                self._socket.close()
+                self._socket = None

    def _report(self, metric, metric_type, value, tags, sample_rate):
        """
        Create a metric packet and send it.
        """
        if value is None:
            return

        if sample_rate != 1 and random() > sample_rate:
            return

        # Resolve the full tag list
        tags = self._add_constant_tags(tags)

        # Create/format the metric packet
        payload = "%s%s:%s|%s%s%s" % (
            (self.namespace + ".") if self.namespace else "",
            metric,
            value,
            metric_type,
            ("|@" + str(sample_rate)) if sample_rate != 1 else "",
            ("|#" + ",".join(
                "%s:%s" % (k, v)
                for (k, v) in sorted(tags.items())
            )) if tags else "",
        )

        # Send it
        self._send(payload)

    def _send_to_server(self, packet):
        try:
            # If set, use socket directly
-            (self.socket or self.get_socket()).send(packet.encode('utf-8'))
+            self.socket.send(packet.encode('utf-8'))
        except socket.timeout:
            return
        except socket.error:
            log.debug(
                "Error submitting statsd packet."
                " Dropping the packet and closing the socket."
            )
            self.close_socket()

    def _send_to_buffer(self, packet):
        self.buffer.append(packet)
        if len(self.buffer) >= self.max_buffer_size:
            self._flush_buffer()

    def _flush_buffer(self):
        self._send_to_server("\n".join(self.buffer))
        self.buffer = []

    def _add_constant_tags(self, tags):
        return {
            str(k): str(v)
            for k, v in itertools.chain(
                self.constant_tags.items(),
                (tags if tags else {}).items(),
            )
        }


statsd = Statsd()
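Note on the statsd.py hunk above: `socket` is now a lazily-connecting property guarded by `self.lock`, so instantiating `Statsd` never opens a connection and concurrent threads share a single UDP socket. The following is a minimal usage sketch of the reworked client, not part of the patch; the host, namespace and tag values are purely illustrative.

    from swh.core.statsd import Statsd

    # Instantiating does not connect (see test_instantiating_does_not_connect).
    client = Statsd(host='localhost', port=8125, namespace='swh')

    # The first send reads the `socket` property, which connects the UDP
    # socket under `self.lock`; later sends and other threads reuse it.
    client.increment('objects.loaded', tags={'origin_type': 'git'})

    # Closing is still explicit; the next send reconnects lazily.
    client.close_socket()
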
""" def __init__(self): self.payloads = deque() def send(self, payload): assert type(payload) == bytes self.payloads.append(payload) def recv(self): try: return self.payloads.popleft().decode('utf-8') except IndexError: return None def close(self): pass def __repr__(self): return str(self.payloads) class BrokenSocket(FakeSocket): def send(self, payload): raise socket.error("Socket error") class SlowSocket(FakeSocket): def send(self, payload): raise socket.timeout("Socket timeout") class TestStatsd(unittest.TestCase): def setUp(self): """ Set up a default Statsd instance and mock the socket. """ # self.statsd = Statsd() - self.statsd.socket = FakeSocket() + self.statsd._socket = FakeSocket() def recv(self): return self.statsd.socket.recv() def test_set(self): self.statsd.set('set', 123) assert self.recv() == 'set:123|s' def test_gauge(self): self.statsd.gauge('gauge', 123.4) assert self.recv() == 'gauge:123.4|g' def test_counter(self): self.statsd.increment('page.views') self.assertEqual('page.views:1|c', self.recv()) self.statsd.increment('page.views', 11) self.assertEqual('page.views:11|c', self.recv()) self.statsd.decrement('page.views') self.assertEqual('page.views:-1|c', self.recv()) self.statsd.decrement('page.views', 12) self.assertEqual('page.views:-12|c', self.recv()) def test_histogram(self): self.statsd.histogram('histo', 123.4) self.assertEqual('histo:123.4|h', self.recv()) def test_tagged_gauge(self): self.statsd.gauge('gt', 123.4, tags={'country': 'china', 'age': 45}) self.assertEqual('gt:123.4|g|#age:45,country:china', self.recv()) def test_tagged_counter(self): self.statsd.increment('ct', tags={'country': 'españa'}) self.assertEqual('ct:1|c|#country:españa', self.recv()) def test_tagged_histogram(self): self.statsd.histogram('h', 1, tags={'test_tag': 'tag_value'}) self.assertEqual('h:1|h|#test_tag:tag_value', self.recv()) def test_sample_rate(self): self.statsd.increment('c', sample_rate=0) assert not self.recv() for i in range(10000): self.statsd.increment('sampled_counter', sample_rate=0.3) self.assert_almost_equal(3000, len(self.statsd.socket.payloads), 150) self.assertEqual('sampled_counter:1|c|@0.3', self.recv()) def test_tags_and_samples(self): for i in range(100): self.statsd.gauge('gst', 23, tags={"sampled": True}, sample_rate=0.9) self.assert_almost_equal(90, len(self.statsd.socket.payloads), 10) self.assertEqual('gst:23|g|@0.9|#sampled:True', self.recv()) def test_timing(self): self.statsd.timing('t', 123) self.assertEqual('t:123|ms', self.recv()) def test_metric_namespace(self): """ Namespace prefixes all metric names. 
""" self.statsd.namespace = "foo" self.statsd.gauge('gauge', 123.4) self.assertEqual('foo.gauge:123.4|g', self.recv()) # Test Client level constant tags def test_gauge_constant_tags(self): self.statsd.constant_tags = { 'bar': 'baz', } self.statsd.gauge('gauge', 123.4) assert self.recv() == 'gauge:123.4|g|#bar:baz' def test_counter_constant_tag_with_metric_level_tags(self): self.statsd.constant_tags = { 'bar': 'baz', 'foo': True, } self.statsd.increment('page.views', tags={'extra': 'extra'}) self.assertEqual( 'page.views:1|c|#bar:baz,extra:extra,foo:True', self.recv(), ) def test_gauge_constant_tags_with_metric_level_tags_twice(self): metric_level_tag = {'foo': 'bar'} self.statsd.constant_tags = {'bar': 'baz'} self.statsd.gauge('gauge', 123.4, tags=metric_level_tag) assert self.recv() == 'gauge:123.4|g|#bar:baz,foo:bar' # sending metrics multiple times with same metric-level tags # should not duplicate the tags being sent self.statsd.gauge('gauge', 123.4, tags=metric_level_tag) assert self.recv() == 'gauge:123.4|g|#bar:baz,foo:bar' def assert_almost_equal(self, a, b, delta): self.assertTrue( 0 <= abs(a - b) <= delta, "%s - %s not within %s" % (a, b, delta) ) def test_socket_error(self): - self.statsd.socket = BrokenSocket() + self.statsd._socket = BrokenSocket() self.statsd.gauge('no error', 1) assert True, 'success' def test_socket_timeout(self): - self.statsd.socket = SlowSocket() + self.statsd._socket = SlowSocket() self.statsd.gauge('no error', 1) assert True, 'success' def test_timed(self): """ Measure the distribution of a function's run time. """ @self.statsd.timed('timed.test') def func(a, b, c=1, d=1): """docstring""" time.sleep(0.5) return (a, b, c, d) self.assertEqual('func', func.__name__) self.assertEqual('docstring', func.__doc__) result = func(1, 2, d=3) # Assert it handles args and kwargs correctly. self.assertEqual(result, (1, 2, 1, 3)) packet = self.recv() name_value, type_ = packet.split('|') name, value = name_value.split(':') self.assertEqual('ms', type_) self.assertEqual('timed.test', name) self.assert_almost_equal(500, float(value), 100) def test_timed_exception(self): """ Exception bubble out of the decorator and is reported to statsd as a dedicated counter. """ @self.statsd.timed('timed.test') def func(a, b, c=1, d=1): """docstring""" time.sleep(0.5) return (a / b, c, d) self.assertEqual('func', func.__name__) self.assertEqual('docstring', func.__doc__) with self.assertRaises(ZeroDivisionError): func(1, 0) packet = self.recv() name_value, type_ = packet.split('|') name, value = name_value.split(':') self.assertEqual('c', type_) self.assertEqual('timed.test_error_count', name) self.assertEqual(int(value), 1) def test_timed_no_metric(self, ): """ Test using a decorator without providing a metric. """ @self.statsd.timed() def func(a, b, c=1, d=1): """docstring""" time.sleep(0.5) return (a, b, c, d) self.assertEqual('func', func.__name__) self.assertEqual('docstring', func.__doc__) result = func(1, 2, d=3) # Assert it handles args and kwargs correctly. self.assertEqual(result, (1, 2, 1, 3)) packet = self.recv() name_value, type_ = packet.split('|') name, value = name_value.split(':') self.assertEqual('ms', type_) self.assertEqual('swh.core.tests.test_statsd.func', name) self.assert_almost_equal(500, float(value), 100) def test_timed_coroutine(self): """ Measure the distribution of a coroutine function's run time. Warning: Python >= 3.5 only. 
""" import asyncio @self.statsd.timed('timed.test') @asyncio.coroutine def print_foo(): """docstring""" time.sleep(0.5) print("foo") loop = asyncio.new_event_loop() loop.run_until_complete(print_foo()) loop.close() # Assert packet = self.recv() name_value, type_ = packet.split('|') name, value = name_value.split(':') self.assertEqual('ms', type_) self.assertEqual('timed.test', name) self.assert_almost_equal(500, float(value), 100) def test_timed_context(self): """ Measure the distribution of a context's run time. """ # In milliseconds with self.statsd.timed('timed_context.test') as timer: self.assertIsInstance(timer, TimedContextManagerDecorator) time.sleep(0.5) packet = self.recv() name_value, type_ = packet.split('|') name, value = name_value.split(':') self.assertEqual('ms', type_) self.assertEqual('timed_context.test', name) self.assert_almost_equal(500, float(value), 100) self.assert_almost_equal(500, timer.elapsed, 100) def test_timed_context_exception(self): """ Exception bubbles out of the `timed` context manager and is reported to statsd as a dedicated counter. """ class ContextException(Exception): pass def func(self): with self.statsd.timed('timed_context.test'): time.sleep(0.5) raise ContextException() # Ensure the exception was raised. self.assertRaises(ContextException, func, self) # Ensure the timing was recorded. packet = self.recv() name_value, type_ = packet.split('|') name, value = name_value.split(':') self.assertEqual('c', type_) self.assertEqual('timed_context.test_error_count', name) self.assertEqual(int(value), 1) def test_timed_context_no_metric_name_exception(self): """Test that an exception occurs if using a context manager without a metric name. """ def func(self): with self.statsd.timed(): time.sleep(0.5) # Ensure the exception was raised. self.assertRaises(TypeError, func, self) # Ensure the timing was recorded. 
packet = self.recv() self.assertEqual(packet, None) def test_timed_start_stop_calls(self): timer = self.statsd.timed('timed_context.test') timer.start() time.sleep(0.5) timer.stop() packet = self.recv() name_value, type_ = packet.split('|') name, value = name_value.split(':') self.assertEqual('ms', type_) self.assertEqual('timed_context.test', name) self.assert_almost_equal(500, float(value), 100) def test_batched(self): self.statsd.open_buffer() self.statsd.gauge('page.views', 123) self.statsd.timing('timer', 123) self.statsd.close_buffer() self.assertEqual('page.views:123|g\ntimer:123|ms', self.recv()) def test_context_manager(self): fake_socket = FakeSocket() with Statsd() as statsd: - statsd.socket = fake_socket + statsd._socket = fake_socket statsd.gauge('page.views', 123) statsd.timing('timer', 123) self.assertEqual('page.views:123|g\ntimer:123|ms', fake_socket.recv()) def test_batched_buffer_autoflush(self): fake_socket = FakeSocket() with Statsd() as statsd: - statsd.socket = fake_socket + statsd._socket = fake_socket for i in range(51): statsd.increment('mycounter') self.assertEqual( '\n'.join(['mycounter:1|c' for i in range(50)]), fake_socket.recv(), ) self.assertEqual('mycounter:1|c', fake_socket.recv()) def test_module_level_instance(self): from swh.core.statsd import statsd self.assertTrue(isinstance(statsd, Statsd)) def test_instantiating_does_not_connect(self): local_statsd = Statsd() - self.assertEqual(None, local_statsd.socket) + self.assertEqual(None, local_statsd._socket) def test_accessing_socket_opens_socket(self): local_statsd = Statsd() try: - self.assertIsNotNone(local_statsd.get_socket()) + self.assertIsNotNone(local_statsd.socket) finally: - local_statsd.socket.close() + local_statsd.close_socket() def test_accessing_socket_multiple_times_returns_same_socket(self): local_statsd = Statsd() fresh_socket = FakeSocket() - local_statsd.socket = fresh_socket - self.assertEqual(fresh_socket, local_statsd.get_socket()) - self.assertNotEqual(FakeSocket(), local_statsd.get_socket()) + local_statsd._socket = fresh_socket + self.assertEqual(fresh_socket, local_statsd.socket) + self.assertNotEqual(FakeSocket(), local_statsd.socket) def test_tags_from_environment(self): with preserve_envvars('STATSD_TAGS'): os.environ['STATSD_TAGS'] = 'country:china,age:45' statsd = Statsd() - statsd.socket = FakeSocket() + statsd._socket = FakeSocket() statsd.gauge('gt', 123.4) self.assertEqual('gt:123.4|g|#age:45,country:china', statsd.socket.recv()) def test_tags_from_environment_and_constant(self): with preserve_envvars('STATSD_TAGS'): os.environ['STATSD_TAGS'] = 'country:china,age:45' statsd = Statsd(constant_tags={'country': 'canada'}) - statsd.socket = FakeSocket() + statsd._socket = FakeSocket() statsd.gauge('gt', 123.4) self.assertEqual('gt:123.4|g|#age:45,country:canada', statsd.socket.recv()) def test_tags_from_environment_warning(self): with preserve_envvars('STATSD_TAGS'): os.environ['STATSD_TAGS'] = 'valid:tag,invalid_tag' with pytest.warns(UserWarning) as record: statsd = Statsd() assert len(record) == 1 assert 'invalid_tag' in record[0].message.args[0] assert 'valid:tag' not in record[0].message.args[0] assert statsd.constant_tags == {'valid': 'tag'} def test_gauge_doesnt_send_none(self): self.statsd.gauge('metric', None) assert self.recv() is None def test_increment_doesnt_send_none(self): self.statsd.increment('metric', None) assert self.recv() is None def test_decrement_doesnt_send_none(self): self.statsd.decrement('metric', None) assert self.recv() is None def 
test_timing_doesnt_send_none(self): self.statsd.timing('metric', None) assert self.recv() is None def test_histogram_doesnt_send_none(self): self.statsd.histogram('metric', None) assert self.recv() is None def test_param_host(self): with preserve_envvars('STATSD_HOST', 'STATSD_PORT'): os.environ['STATSD_HOST'] = 'test-value' os.environ['STATSD_PORT'] = '' local_statsd = Statsd(host='actual-test-value') self.assertEqual(local_statsd.host, 'actual-test-value') self.assertEqual(local_statsd.port, 8125) def test_param_port(self): with preserve_envvars('STATSD_HOST', 'STATSD_PORT'): os.environ['STATSD_HOST'] = '' os.environ['STATSD_PORT'] = '12345' local_statsd = Statsd(port=4321) self.assertEqual(local_statsd.host, 'localhost') self.assertEqual(local_statsd.port, 4321) def test_envvar_host(self): with preserve_envvars('STATSD_HOST', 'STATSD_PORT'): os.environ['STATSD_HOST'] = 'test-value' os.environ['STATSD_PORT'] = '' local_statsd = Statsd() self.assertEqual(local_statsd.host, 'test-value') self.assertEqual(local_statsd.port, 8125) def test_envvar_port(self): with preserve_envvars('STATSD_HOST', 'STATSD_PORT'): os.environ['STATSD_HOST'] = '' os.environ['STATSD_PORT'] = '12345' local_statsd = Statsd() self.assertEqual(local_statsd.host, 'localhost') self.assertEqual(local_statsd.port, 12345) def test_namespace_added(self): local_statsd = Statsd(namespace='test-namespace') - local_statsd.socket = FakeSocket() + local_statsd._socket = FakeSocket() local_statsd.gauge('gauge', 123.4) assert local_statsd.socket.recv() == 'test-namespace.gauge:123.4|g' def test_contextmanager_empty(self): with self.statsd: assert True, 'success' def test_contextmanager_buffering(self): with self.statsd as s: s.gauge('gauge', 123.4) s.gauge('gauge_other', 456.78) self.assertIsNone(s.socket.recv()) self.assertEqual(self.recv(), 'gauge:123.4|g\ngauge_other:456.78|g') def test_timed_elapsed(self): with self.statsd.timed('test_timer') as t: pass self.assertGreaterEqual(t.elapsed, 0) self.assertEqual(self.recv(), 'test_timer:%s|ms' % t.elapsed)
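Because `socket` is now a read-only property, the tests above stub the private `_socket` attribute directly instead of assigning to `socket`. A self-contained sketch of that pattern, assuming a hypothetical stand-in class `DummySocket` (any object with `send`/`close` works):

    from collections import deque

    from swh.core.statsd import Statsd


    class DummySocket:
        """Collects encoded packets instead of sending them (illustrative)."""
        def __init__(self):
            self.payloads = deque()

        def send(self, payload):
            self.payloads.append(payload)

        def close(self):
            pass


    client = Statsd(namespace='test-namespace')
    client._socket = DummySocket()  # pre-seed so the property never connects
    client.gauge('gauge', 123.4)
    assert client._socket.payloads.popleft() == b'test-namespace.gauge:123.4|g'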