diff --git a/swh/core/pytest_plugin.py b/swh/core/pytest_plugin.py index 1cce566..a1e689e 100644 --- a/swh/core/pytest_plugin.py +++ b/swh/core/pytest_plugin.py @@ -1,329 +1,368 @@ -# Copyright (C) 2019 The Software Heritage developers +# Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from collections import deque from functools import partial import logging from os import path import re from typing import Dict, List, Optional from urllib.parse import unquote, urlparse from _pytest.fixtures import FixtureRequest import pytest import requests from requests.adapters import BaseAdapter from requests.structures import CaseInsensitiveDict from requests.utils import get_encoding_from_headers logger = logging.getLogger(__name__) # Check get_local_factory function # Maximum number of iteration checks to generate requests responses MAX_VISIT_FILES = 10 def get_response_cb( request: requests.Request, context, datadir, ignore_urls: List[str] = [], visits: Optional[Dict] = None, ): """Mount point callback to fetch on disk the request's content. The request urls provided are url decoded first to resolve the associated file on disk. This is meant to be used as 'body' argument of the requests_mock.get() method. It will look for files on the local filesystem based on the requested URL, using the following rules: - files are searched in the datadir/ directory - the local file name is the path part of the URL with path hierarchy markers (aka '/') replaced by '_' Eg. if you use the requests_mock fixture in your test file as: requests_mock.get('https?://nowhere.com', body=get_response_cb) # or even requests_mock.get(re.compile('https?://'), body=get_response_cb) then a call requests.get like: requests.get('https://nowhere.com/path/to/resource?a=b&c=d') will look the content of the response in: datadir/https_nowhere.com/path_to_resource,a=b,c=d or a call requests.get like: requests.get('http://nowhere.com/path/to/resource?a=b&c=d') will look the content of the response in: datadir/http_nowhere.com/path_to_resource,a=b,c=d Args: request: Object requests context (requests.Context): Object holding response metadata information (status_code, headers, etc...) datadir: Data files path ignore_urls: urls whose status response should be 404 even if the local file exists visits: Dict of url, number of visits. If None, disable multi visit support (default) Returns: Optional[FileDescriptor] on disk file to read from the test context """ logger.debug("get_response_cb(%s, %s)", request, context) logger.debug("url: %s", request.url) logger.debug("ignore_urls: %s", ignore_urls) unquoted_url = unquote(request.url) if unquoted_url in ignore_urls: context.status_code = 404 return None url = urlparse(unquoted_url) # http://pypi.org ~> http_pypi.org # https://files.pythonhosted.org ~> https_files.pythonhosted.org dirname = "%s_%s" % (url.scheme, url.hostname) # url.path: pypi//json -> local file: pypi__json filename = url.path[1:] if filename.endswith("/"): filename = filename[:-1] filename = filename.replace("/", "_") if url.query: filename += "," + url.query.replace("&", ",") filepath = path.join(datadir, dirname, filename) if visits is not None: visit = visits.get(url, 0) visits[url] = visit + 1 if visit: filepath = filepath + "_visit%s" % visit if not path.isfile(filepath): logger.debug("not found filepath: %s", filepath) context.status_code = 404 return None fd = open(filepath, "rb") context.headers["content-length"] = str(path.getsize(filepath)) return fd @pytest.fixture def datadir(request: FixtureRequest) -> str: """By default, returns the test directory's data directory. This can be overridden on a per file tree basis. Add an override definition in the local conftest, for example:: import pytest from os import path @pytest.fixture def datadir(): return path.join(path.abspath(path.dirname(__file__)), 'resources') """ return path.join(path.dirname(str(request.fspath)), "data") def requests_mock_datadir_factory( ignore_urls: List[str] = [], has_multi_visit: bool = False ): """This factory generates fixtures which allow to look for files on the local filesystem based on the requested URL, using the following rules: - files are searched in the data/ directory - the local file name is the path part of the URL with path hierarchy markers (aka '/') replaced by '_' Multiple implementations are possible, for example: ``requests_mock_datadir_factory([])`` This computes the file name from the query and always returns the same result. ``requests_mock_datadir_factory(has_multi_visit=True)`` This computes the file name from the query and returns the content of the filename the first time, the next call returning the content of files suffixed with _visit1 and so on and so forth. If the file is not found, returns a 404. ``requests_mock_datadir_factory(ignore_urls=['url1', 'url2'])`` This will ignore any files corresponding to url1 and url2, always returning 404. Args: ignore_urls: List of urls to always returns 404 (whether file exists or not) has_multi_visit: Activate or not the multiple visits behavior """ @pytest.fixture def requests_mock_datadir(requests_mock, datadir): if not has_multi_visit: cb = partial(get_response_cb, ignore_urls=ignore_urls, datadir=datadir) requests_mock.get(re.compile("https?://"), body=cb) else: visits = {} requests_mock.get( re.compile("https?://"), body=partial( get_response_cb, ignore_urls=ignore_urls, visits=visits, datadir=datadir, ), ) return requests_mock return requests_mock_datadir # Default `requests_mock_datadir` implementation requests_mock_datadir = requests_mock_datadir_factory() """ Instance of :py:func:`requests_mock_datadir_factory`, with the default arguments. """ # Implementation for multiple visits behavior: # - first time, it checks for a file named `filename` # - second time, it checks for a file named `filename`_visit1 # etc... requests_mock_datadir_visits = requests_mock_datadir_factory(has_multi_visit=True) """ Instance of :py:func:`requests_mock_datadir_factory`, with the default arguments, but `has_multi_visit=True`. """ @pytest.fixture def swh_rpc_client(swh_rpc_client_class, swh_rpc_adapter): """This fixture generates an RPCClient instance that uses the class generated by the rpc_client_class fixture as backend. Since it uses the swh_rpc_adapter, HTTP queries will be intercepted and routed directly to the current Flask app (as provided by the `app` fixture). So this stack of fixtures allows to test the RPCClient -> RPCServerApp communication path using a real RPCClient instance and a real Flask (RPCServerApp) app instance. To use this fixture: - ensure an `app` fixture exists and generate a Flask application, - implement an `swh_rpc_client_class` fixtures that returns the RPCClient-based class to use as client side for the tests, - implement your tests using this `swh_rpc_client` fixture. See swh/core/api/tests/test_rpc_client_server.py for an example of usage. """ url = "mock://example.com" cli = swh_rpc_client_class(url=url) # we need to clear the list of existing adapters here so we ensure we # have one and only one adapter which is then used for all the requests. cli.session.adapters.clear() cli.session.mount("mock://", swh_rpc_adapter) return cli @pytest.fixture def swh_rpc_adapter(app): """Fixture that generates a requests.Adapter instance that can be used to test client/servers code based on swh.core.api classes. See swh/core/api/tests/test_rpc_client_server.py for an example of usage. """ with app.test_client() as client: yield RPCTestAdapter(client) class RPCTestAdapter(BaseAdapter): def __init__(self, client): self._client = client def build_response(self, req, resp): response = requests.Response() # Fallback to None if there's no status_code, for whatever reason. response.status_code = resp.status_code # Make headers case-insensitive. response.headers = CaseInsensitiveDict(getattr(resp, "headers", {})) # Set encoding. response.encoding = get_encoding_from_headers(response.headers) response.raw = resp response.reason = response.raw.status if isinstance(req.url, bytes): response.url = req.url.decode("utf-8") else: response.url = req.url # Give the Response some context. response.request = req response.connection = self response._content = resp.data return response def send(self, request, **kw): """ Overrides ``requests.adapters.BaseAdapter.send`` """ resp = self._client.open( request.url, method=request.method, headers=request.headers.items(), data=request.body, ) return self.build_response(request, resp) @pytest.fixture def flask_app_client(app): with app.test_client() as client: yield client # stolen from pytest-flask, required to have url_for() working within tests # using flask_app_client fixture. @pytest.fixture(autouse=True) def _push_request_context(request: FixtureRequest): """During tests execution request context has been pushed, e.g. `url_for`, `session`, etc. can be used in tests as is:: def test_app(app, client): assert client.get(url_for('myview')).status_code == 200 """ if "app" not in request.fixturenames: return app = request.getfixturevalue("app") ctx = app.test_request_context() ctx.push() def teardown(): ctx.pop() request.addfinalizer(teardown) + + +class FakeSocket(object): + """ A fake socket for testing. """ + + def __init__(self): + self.payloads = deque() + + def send(self, payload): + assert type(payload) == bytes + self.payloads.append(payload) + + def recv(self): + try: + return self.payloads.popleft().decode("utf-8") + except IndexError: + return None + + def close(self): + pass + + def __repr__(self): + return str(self.payloads) + + +@pytest.fixture +def statsd(): + """Simple fixture giving a Statsd instance suitable for tests + + The Statsd instance uses a FakeSocket as `.socket` attribute in which one + can get the accumulated statsd messages in a deque in `.socket.payloads`. + """ + + from swh.core.statsd import Statsd + + statsd = Statsd() + statsd._socket = FakeSocket() + yield statsd diff --git a/swh/core/tests/test_statsd.py b/swh/core/tests/test_statsd.py index c0fa1ff..1f41483 100644 --- a/swh/core/tests/test_statsd.py +++ b/swh/core/tests/test_statsd.py @@ -1,560 +1,536 @@ -# Copyright (C) 2018-2019 The Software Heritage developers +# Copyright (C) 2018-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # Initially imported from https://github.com/DataDog/datadogpy/ # at revision 62b3a3e89988dc18d78c282fe3ff5d1813917436 # # Copyright (c) 2015, Datadog # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # * Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # * Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # * Neither the name of Datadog nor the names of its contributors may be # used to endorse or promote products derived from this software without # specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY # DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES # (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND # ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # -from collections import deque -from contextlib import contextmanager -import os import socket import time -import unittest import pytest +from swh.core.pytest_plugin import FakeSocket from swh.core.statsd import Statsd, TimedContextManagerDecorator -@contextmanager -def preserve_envvars(*envvars): - """Context manager preserving the value of environment variables""" - preserved = {} - to_delete = object() +class BrokenSocket(FakeSocket): + def send(self, payload): + raise socket.error("Socket error") - for var in envvars: - preserved[var] = os.environ.get(var, to_delete) - yield +class SlowSocket(FakeSocket): + def send(self, payload): + raise socket.timeout("Socket timeout") - for var in envvars: - old = preserved[var] - if old is not to_delete: - os.environ[var] = old - else: - del os.environ[var] +def assert_almost_equal(a, b, delta): + assert 0 <= abs(a - b) <= delta, f"|{a} - {b}| not within {delta}" -class FakeSocket(object): - """ A fake socket for testing. """ - def __init__(self): - self.payloads = deque() +def test_set(statsd): + statsd.set("set", 123) + assert statsd.socket.recv() == "set:123|s" - def send(self, payload): - assert type(payload) == bytes - self.payloads.append(payload) - def recv(self): - try: - return self.payloads.popleft().decode("utf-8") - except IndexError: - return None +def test_gauge(statsd): + statsd.gauge("gauge", 123.4) + assert statsd.socket.recv() == "gauge:123.4|g" - def close(self): - pass - def __repr__(self): - return str(self.payloads) +def test_counter(statsd): + statsd.increment("page.views") + assert statsd.socket.recv() == "page.views:1|c" + statsd.increment("page.views", 11) + assert statsd.socket.recv() == "page.views:11|c" -class BrokenSocket(FakeSocket): - def send(self, payload): - raise socket.error("Socket error") + statsd.decrement("page.views") + assert statsd.socket.recv() == "page.views:-1|c" + statsd.decrement("page.views", 12) + assert statsd.socket.recv() == "page.views:-12|c" -class SlowSocket(FakeSocket): - def send(self, payload): - raise socket.timeout("Socket timeout") +def test_histogram(statsd): + statsd.histogram("histo", 123.4) + assert statsd.socket.recv() == "histo:123.4|h" -class TestStatsd(unittest.TestCase): - def setUp(self): - """ - Set up a default Statsd instance and mock the socket. - """ - # - self.statsd = Statsd() - self.statsd._socket = FakeSocket() - - def recv(self): - return self.statsd.socket.recv() - - def test_set(self): - self.statsd.set("set", 123) - assert self.recv() == "set:123|s" - - def test_gauge(self): - self.statsd.gauge("gauge", 123.4) - assert self.recv() == "gauge:123.4|g" - - def test_counter(self): - self.statsd.increment("page.views") - self.assertEqual("page.views:1|c", self.recv()) - - self.statsd.increment("page.views", 11) - self.assertEqual("page.views:11|c", self.recv()) - - self.statsd.decrement("page.views") - self.assertEqual("page.views:-1|c", self.recv()) - - self.statsd.decrement("page.views", 12) - self.assertEqual("page.views:-12|c", self.recv()) - - def test_histogram(self): - self.statsd.histogram("histo", 123.4) - self.assertEqual("histo:123.4|h", self.recv()) - - def test_tagged_gauge(self): - self.statsd.gauge("gt", 123.4, tags={"country": "china", "age": 45}) - self.assertEqual("gt:123.4|g|#age:45,country:china", self.recv()) - - def test_tagged_counter(self): - self.statsd.increment("ct", tags={"country": "españa"}) - self.assertEqual("ct:1|c|#country:españa", self.recv()) - - def test_tagged_histogram(self): - self.statsd.histogram("h", 1, tags={"test_tag": "tag_value"}) - self.assertEqual("h:1|h|#test_tag:tag_value", self.recv()) - - def test_sample_rate(self): - self.statsd.increment("c", sample_rate=0) - assert not self.recv() - for i in range(10000): - self.statsd.increment("sampled_counter", sample_rate=0.3) - self.assert_almost_equal(3000, len(self.statsd.socket.payloads), 150) - self.assertEqual("sampled_counter:1|c|@0.3", self.recv()) - - def test_tags_and_samples(self): - for i in range(100): - self.statsd.gauge("gst", 23, tags={"sampled": True}, sample_rate=0.9) - - self.assert_almost_equal(90, len(self.statsd.socket.payloads), 10) - self.assertEqual("gst:23|g|@0.9|#sampled:True", self.recv()) - - def test_timing(self): - self.statsd.timing("t", 123) - self.assertEqual("t:123|ms", self.recv()) - - def test_metric_namespace(self): - """ - Namespace prefixes all metric names. - """ - self.statsd.namespace = "foo" - self.statsd.gauge("gauge", 123.4) - self.assertEqual("foo.gauge:123.4|g", self.recv()) - - # Test Client level constant tags - def test_gauge_constant_tags(self): - self.statsd.constant_tags = { - "bar": "baz", - } - self.statsd.gauge("gauge", 123.4) - assert self.recv() == "gauge:123.4|g|#bar:baz" - - def test_counter_constant_tag_with_metric_level_tags(self): - self.statsd.constant_tags = { - "bar": "baz", - "foo": True, - } - self.statsd.increment("page.views", tags={"extra": "extra"}) - self.assertEqual( - "page.views:1|c|#bar:baz,extra:extra,foo:True", self.recv(), - ) - - def test_gauge_constant_tags_with_metric_level_tags_twice(self): - metric_level_tag = {"foo": "bar"} - self.statsd.constant_tags = {"bar": "baz"} - self.statsd.gauge("gauge", 123.4, tags=metric_level_tag) - assert self.recv() == "gauge:123.4|g|#bar:baz,foo:bar" - - # sending metrics multiple times with same metric-level tags - # should not duplicate the tags being sent - self.statsd.gauge("gauge", 123.4, tags=metric_level_tag) - assert self.recv() == "gauge:123.4|g|#bar:baz,foo:bar" - - def assert_almost_equal(self, a, b, delta): - self.assertTrue( - 0 <= abs(a - b) <= delta, "%s - %s not within %s" % (a, b, delta) - ) - - def test_socket_error(self): - self.statsd._socket = BrokenSocket() - self.statsd.gauge("no error", 1) - assert True, "success" - def test_socket_timeout(self): - self.statsd._socket = SlowSocket() - self.statsd.gauge("no error", 1) - assert True, "success" +def test_tagged_gauge(statsd): + statsd.gauge("gt", 123.4, tags={"country": "china", "age": 45}) + assert statsd.socket.recv() == "gt:123.4|g|#age:45,country:china" - def test_timed(self): - """ - Measure the distribution of a function's run time. - """ - @self.statsd.timed("timed.test") - def func(a, b, c=1, d=1): - """docstring""" - time.sleep(0.5) - return (a, b, c, d) +def test_tagged_counter(statsd): + statsd.increment("ct", tags={"country": "españa"}) + assert statsd.socket.recv() == "ct:1|c|#country:españa" - self.assertEqual("func", func.__name__) - self.assertEqual("docstring", func.__doc__) - result = func(1, 2, d=3) - # Assert it handles args and kwargs correctly. - self.assertEqual(result, (1, 2, 1, 3)) +def test_tagged_histogram(statsd): + statsd.histogram("h", 1, tags={"test_tag": "tag_value"}) + assert statsd.socket.recv() == "h:1|h|#test_tag:tag_value" - packet = self.recv() - name_value, type_ = packet.split("|") - name, value = name_value.split(":") - self.assertEqual("ms", type_) - self.assertEqual("timed.test", name) - self.assert_almost_equal(500, float(value), 100) +def test_sample_rate(statsd): + statsd.increment("c", sample_rate=0) + assert not statsd.socket.recv() + for i in range(10000): + statsd.increment("sampled_counter", sample_rate=0.3) + assert_almost_equal(3000, len(statsd.socket.payloads), 150) + assert statsd.socket.recv() == "sampled_counter:1|c|@0.3" - def test_timed_exception(self): - """ - Exception bubble out of the decorator and is reported - to statsd as a dedicated counter. - """ - @self.statsd.timed("timed.test") - def func(a, b, c=1, d=1): - """docstring""" - time.sleep(0.5) - return (a / b, c, d) +def test_tags_and_samples(statsd): + for i in range(100): + statsd.gauge("gst", 23, tags={"sampled": True}, sample_rate=0.9) - self.assertEqual("func", func.__name__) - self.assertEqual("docstring", func.__doc__) + assert_almost_equal(90, len(statsd.socket.payloads), 10) + assert statsd.socket.recv() == "gst:23|g|@0.9|#sampled:True" - with self.assertRaises(ZeroDivisionError): - func(1, 0) - packet = self.recv() - name_value, type_ = packet.split("|") - name, value = name_value.split(":") +def test_timing(statsd): + statsd.timing("t", 123) + assert statsd.socket.recv() == "t:123|ms" - self.assertEqual("c", type_) - self.assertEqual("timed.test_error_count", name) - self.assertEqual(int(value), 1) - def test_timed_no_metric(self,): - """ - Test using a decorator without providing a metric. - """ +def test_metric_namespace(statsd): + """ + Namespace prefixes all metric names. + """ + statsd.namespace = "foo" + statsd.gauge("gauge", 123.4) + assert statsd.socket.recv() == "foo.gauge:123.4|g" - @self.statsd.timed() - def func(a, b, c=1, d=1): - """docstring""" - time.sleep(0.5) - return (a, b, c, d) - self.assertEqual("func", func.__name__) - self.assertEqual("docstring", func.__doc__) +# Test Client level constant tags +def test_gauge_constant_tags(statsd): + statsd.constant_tags = { + "bar": "baz", + } + statsd.gauge("gauge", 123.4) + assert statsd.socket.recv() == "gauge:123.4|g|#bar:baz" + + +def test_counter_constant_tag_with_metric_level_tags(statsd): + statsd.constant_tags = { + "bar": "baz", + "foo": True, + } + statsd.increment("page.views", tags={"extra": "extra"}) + assert statsd.socket.recv() == "page.views:1|c|#bar:baz,extra:extra,foo:True" + + +def test_gauge_constant_tags_with_metric_level_tags_twice(statsd): + metric_level_tag = {"foo": "bar"} + statsd.constant_tags = {"bar": "baz"} + statsd.gauge("gauge", 123.4, tags=metric_level_tag) + assert statsd.socket.recv() == "gauge:123.4|g|#bar:baz,foo:bar" + + # sending metrics multiple times with same metric-level tags + # should not duplicate the tags being sent + statsd.gauge("gauge", 123.4, tags=metric_level_tag) + assert statsd.socket.recv() == "gauge:123.4|g|#bar:baz,foo:bar" + + +def test_socket_error(statsd): + statsd._socket = BrokenSocket() + statsd.gauge("no error", 1) + assert True, "success" + + +def test_socket_timeout(statsd): + statsd._socket = SlowSocket() + statsd.gauge("no error", 1) + assert True, "success" + + +def test_timed(statsd): + """ + Measure the distribution of a function's run time. + """ + + @statsd.timed("timed.test") + def func(a, b, c=1, d=1): + """docstring""" + time.sleep(0.5) + return (a, b, c, d) + + assert func.__name__ == "func" + assert func.__doc__ == "docstring" + + result = func(1, 2, d=3) + # Assert it handles args and kwargs correctly. + assert result, (1, 2, 1 == 3) + + packet = statsd.socket.recv() + name_value, type_ = packet.split("|") + name, value = name_value.split(":") + + assert type_ == "ms" + assert name == "timed.test" + assert_almost_equal(500, float(value), 100) + + +def test_timed_exception(statsd): + """ + Exception bubble out of the decorator and is reported + to statsd as a dedicated counter. + """ + + @statsd.timed("timed.test") + def func(a, b, c=1, d=1): + """docstring""" + time.sleep(0.5) + return (a / b, c, d) + + assert func.__name__ == "func" + assert func.__doc__ == "docstring" + + with pytest.raises(ZeroDivisionError): + func(1, 0) + + packet = statsd.socket.recv() + name_value, type_ = packet.split("|") + name, value = name_value.split(":") + + assert type_ == "c" + assert name == "timed.test_error_count" + assert int(value) == 1 + + +def test_timed_no_metric(statsd): + """ + Test using a decorator without providing a metric. + """ + + @statsd.timed() + def func(a, b, c=1, d=1): + """docstring""" + time.sleep(0.5) + return (a, b, c, d) + + assert func.__name__ == "func" + assert func.__doc__ == "docstring" + + result = func(1, 2, d=3) + # Assert it handles args and kwargs correctly. + assert result, (1, 2, 1 == 3) + + packet = statsd.socket.recv() + name_value, type_ = packet.split("|") + name, value = name_value.split(":") + + assert type_ == "ms" + assert name == "swh.core.tests.test_statsd.func" + assert_almost_equal(500, float(value), 100) + + +def test_timed_coroutine(statsd): + """ + Measure the distribution of a coroutine function's run time. + + Warning: Python >= 3.5 only. + """ + import asyncio + + @statsd.timed("timed.test") + @asyncio.coroutine + def print_foo(): + """docstring""" + time.sleep(0.5) + print("foo") + + loop = asyncio.new_event_loop() + loop.run_until_complete(print_foo()) + loop.close() + + # Assert + packet = statsd.socket.recv() + name_value, type_ = packet.split("|") + name, value = name_value.split(":") + + assert type_ == "ms" + assert name == "timed.test" + assert_almost_equal(500, float(value), 100) + - result = func(1, 2, d=3) - # Assert it handles args and kwargs correctly. - self.assertEqual(result, (1, 2, 1, 3)) +def test_timed_context(statsd): + """ + Measure the distribution of a context's run time. + """ + # In milliseconds + with statsd.timed("timed_context.test") as timer: + assert isinstance(timer, TimedContextManagerDecorator) + time.sleep(0.5) + + packet = statsd.socket.recv() + name_value, type_ = packet.split("|") + name, value = name_value.split(":") - packet = self.recv() - name_value, type_ = packet.split("|") - name, value = name_value.split(":") + assert type_ == "ms" + assert name == "timed_context.test" + assert_almost_equal(500, float(value), 100) + assert_almost_equal(500, timer.elapsed, 100) - self.assertEqual("ms", type_) - self.assertEqual("swh.core.tests.test_statsd.func", name) - self.assert_almost_equal(500, float(value), 100) - def test_timed_coroutine(self): - """ - Measure the distribution of a coroutine function's run time. +def test_timed_context_exception(statsd): + """ + Exception bubbles out of the `timed` context manager and is + reported to statsd as a dedicated counter. + """ - Warning: Python >= 3.5 only. - """ - import asyncio + class ContextException(Exception): + pass - @self.statsd.timed("timed.test") - @asyncio.coroutine - def print_foo(): - """docstring""" + def func(statsd): + with statsd.timed("timed_context.test"): time.sleep(0.5) - print("foo") - - loop = asyncio.new_event_loop() - loop.run_until_complete(print_foo()) - loop.close() - - # Assert - packet = self.recv() - name_value, type_ = packet.split("|") - name, value = name_value.split(":") - - self.assertEqual("ms", type_) - self.assertEqual("timed.test", name) - self.assert_almost_equal(500, float(value), 100) - - def test_timed_context(self): - """ - Measure the distribution of a context's run time. - """ - # In milliseconds - with self.statsd.timed("timed_context.test") as timer: - self.assertIsInstance(timer, TimedContextManagerDecorator) + raise ContextException() + + # Ensure the exception was raised. + with pytest.raises(ContextException): + func(statsd) + + # Ensure the timing was recorded. + packet = statsd.socket.recv() + name_value, type_ = packet.split("|") + name, value = name_value.split(":") + + assert type_ == "c" + assert name == "timed_context.test_error_count" + assert int(value) == 1 + + +def test_timed_context_no_metric_name_exception(statsd): + """Test that an exception occurs if using a context manager without a + metric name. + """ + + def func(statsd): + with statsd.timed(): time.sleep(0.5) - packet = self.recv() - name_value, type_ = packet.split("|") - name, value = name_value.split(":") + # Ensure the exception was raised. + with pytest.raises(TypeError): + func(statsd) - self.assertEqual("ms", type_) - self.assertEqual("timed_context.test", name) - self.assert_almost_equal(500, float(value), 100) - self.assert_almost_equal(500, timer.elapsed, 100) + # Ensure the timing was recorded. + packet = statsd.socket.recv() + assert packet is None - def test_timed_context_exception(self): - """ - Exception bubbles out of the `timed` context manager and is - reported to statsd as a dedicated counter. - """ - class ContextException(Exception): - pass +def test_timed_start_stop_calls(statsd): + timer = statsd.timed("timed_context.test") + timer.start() + time.sleep(0.5) + timer.stop() - def func(self): - with self.statsd.timed("timed_context.test"): - time.sleep(0.5) - raise ContextException() + packet = statsd.socket.recv() + name_value, type_ = packet.split("|") + name, value = name_value.split(":") - # Ensure the exception was raised. - self.assertRaises(ContextException, func, self) + assert type_ == "ms" + assert name == "timed_context.test" + assert_almost_equal(500, float(value), 100) - # Ensure the timing was recorded. - packet = self.recv() - name_value, type_ = packet.split("|") - name, value = name_value.split(":") - self.assertEqual("c", type_) - self.assertEqual("timed_context.test_error_count", name) - self.assertEqual(int(value), 1) +def test_batched(statsd): + statsd.open_buffer() + statsd.gauge("page.views", 123) + statsd.timing("timer", 123) + statsd.close_buffer() - def test_timed_context_no_metric_name_exception(self): - """Test that an exception occurs if using a context manager without a - metric name. - """ + assert statsd.socket.recv() == "page.views:123|g\ntimer:123|ms" - def func(self): - with self.statsd.timed(): - time.sleep(0.5) - # Ensure the exception was raised. - self.assertRaises(TypeError, func, self) +def test_context_manager(): + fake_socket = FakeSocket() + with Statsd() as statsd: + statsd._socket = fake_socket + statsd.gauge("page.views", 123) + statsd.timing("timer", 123) - # Ensure the timing was recorded. - packet = self.recv() - self.assertEqual(packet, None) + assert fake_socket.recv() == "page.views:123|g\ntimer:123|ms" - def test_timed_start_stop_calls(self): - timer = self.statsd.timed("timed_context.test") - timer.start() - time.sleep(0.5) - timer.stop() - - packet = self.recv() - name_value, type_ = packet.split("|") - name, value = name_value.split(":") - - self.assertEqual("ms", type_) - self.assertEqual("timed_context.test", name) - self.assert_almost_equal(500, float(value), 100) - - def test_batched(self): - self.statsd.open_buffer() - self.statsd.gauge("page.views", 123) - self.statsd.timing("timer", 123) - self.statsd.close_buffer() - - self.assertEqual("page.views:123|g\ntimer:123|ms", self.recv()) - - def test_context_manager(self): - fake_socket = FakeSocket() - with Statsd() as statsd: - statsd._socket = fake_socket - statsd.gauge("page.views", 123) - statsd.timing("timer", 123) - - self.assertEqual("page.views:123|g\ntimer:123|ms", fake_socket.recv()) - - def test_batched_buffer_autoflush(self): - fake_socket = FakeSocket() - with Statsd() as statsd: - statsd._socket = fake_socket - for i in range(51): - statsd.increment("mycounter") - self.assertEqual( - "\n".join(["mycounter:1|c" for i in range(50)]), fake_socket.recv(), - ) - - self.assertEqual("mycounter:1|c", fake_socket.recv()) - - def test_module_level_instance(self): - from swh.core.statsd import statsd - - self.assertTrue(isinstance(statsd, Statsd)) - - def test_instantiating_does_not_connect(self): - local_statsd = Statsd() - self.assertEqual(None, local_statsd._socket) - - def test_accessing_socket_opens_socket(self): - local_statsd = Statsd() - try: - self.assertIsNotNone(local_statsd.socket) - finally: - local_statsd.close_socket() - - def test_accessing_socket_multiple_times_returns_same_socket(self): - local_statsd = Statsd() - fresh_socket = FakeSocket() - local_statsd._socket = fresh_socket - self.assertEqual(fresh_socket, local_statsd.socket) - self.assertNotEqual(FakeSocket(), local_statsd.socket) - - def test_tags_from_environment(self): - with preserve_envvars("STATSD_TAGS"): - os.environ["STATSD_TAGS"] = "country:china,age:45" - statsd = Statsd() - - statsd._socket = FakeSocket() - statsd.gauge("gt", 123.4) - self.assertEqual("gt:123.4|g|#age:45,country:china", statsd.socket.recv()) - - def test_tags_from_environment_and_constant(self): - with preserve_envvars("STATSD_TAGS"): - os.environ["STATSD_TAGS"] = "country:china,age:45" - statsd = Statsd(constant_tags={"country": "canada"}) - statsd._socket = FakeSocket() - statsd.gauge("gt", 123.4) - self.assertEqual("gt:123.4|g|#age:45,country:canada", statsd.socket.recv()) - - def test_tags_from_environment_warning(self): - with preserve_envvars("STATSD_TAGS"): - os.environ["STATSD_TAGS"] = "valid:tag,invalid_tag" - with pytest.warns(UserWarning) as record: - statsd = Statsd() - - assert len(record) == 1 - assert "invalid_tag" in record[0].message.args[0] - assert "valid:tag" not in record[0].message.args[0] - assert statsd.constant_tags == {"valid": "tag"} - - def test_gauge_doesnt_send_none(self): - self.statsd.gauge("metric", None) - assert self.recv() is None - - def test_increment_doesnt_send_none(self): - self.statsd.increment("metric", None) - assert self.recv() is None - - def test_decrement_doesnt_send_none(self): - self.statsd.decrement("metric", None) - assert self.recv() is None - - def test_timing_doesnt_send_none(self): - self.statsd.timing("metric", None) - assert self.recv() is None - - def test_histogram_doesnt_send_none(self): - self.statsd.histogram("metric", None) - assert self.recv() is None - - def test_param_host(self): - with preserve_envvars("STATSD_HOST", "STATSD_PORT"): - os.environ["STATSD_HOST"] = "test-value" - os.environ["STATSD_PORT"] = "" - local_statsd = Statsd(host="actual-test-value") - - self.assertEqual(local_statsd.host, "actual-test-value") - self.assertEqual(local_statsd.port, 8125) - - def test_param_port(self): - with preserve_envvars("STATSD_HOST", "STATSD_PORT"): - os.environ["STATSD_HOST"] = "" - os.environ["STATSD_PORT"] = "12345" - local_statsd = Statsd(port=4321) - - self.assertEqual(local_statsd.host, "localhost") - self.assertEqual(local_statsd.port, 4321) - - def test_envvar_host(self): - with preserve_envvars("STATSD_HOST", "STATSD_PORT"): - os.environ["STATSD_HOST"] = "test-value" - os.environ["STATSD_PORT"] = "" - local_statsd = Statsd() - - self.assertEqual(local_statsd.host, "test-value") - self.assertEqual(local_statsd.port, 8125) - - def test_envvar_port(self): - with preserve_envvars("STATSD_HOST", "STATSD_PORT"): - os.environ["STATSD_HOST"] = "" - os.environ["STATSD_PORT"] = "12345" - local_statsd = Statsd() - - self.assertEqual(local_statsd.host, "localhost") - self.assertEqual(local_statsd.port, 12345) - - def test_namespace_added(self): - local_statsd = Statsd(namespace="test-namespace") - local_statsd._socket = FakeSocket() - - local_statsd.gauge("gauge", 123.4) - assert local_statsd.socket.recv() == "test-namespace.gauge:123.4|g" - - def test_contextmanager_empty(self): - with self.statsd: - assert True, "success" - - def test_contextmanager_buffering(self): - with self.statsd as s: - s.gauge("gauge", 123.4) - s.gauge("gauge_other", 456.78) - self.assertIsNone(s.socket.recv()) - - self.assertEqual(self.recv(), "gauge:123.4|g\ngauge_other:456.78|g") - - def test_timed_elapsed(self): - with self.statsd.timed("test_timer") as t: - pass - - self.assertGreaterEqual(t.elapsed, 0) - self.assertEqual(self.recv(), "test_timer:%s|ms" % t.elapsed) + +def test_batched_buffer_autoflush(): + fake_socket = FakeSocket() + with Statsd() as statsd: + statsd._socket = fake_socket + for i in range(51): + statsd.increment("mycounter") + assert "\n".join(["mycounter:1|c" for i in range(50)]) == fake_socket.recv() + + assert fake_socket.recv() == "mycounter:1|c" + + +def test_module_level_instance(statsd): + from swh.core.statsd import statsd + + assert isinstance(statsd, Statsd) + + +def test_instantiating_does_not_connect(): + local_statsd = Statsd() + assert local_statsd._socket is None + + +def test_accessing_socket_opens_socket(): + local_statsd = Statsd() + try: + assert local_statsd.socket is not None + finally: + local_statsd.close_socket() + + +def test_accessing_socket_multiple_times_returns_same_socket(): + local_statsd = Statsd() + fresh_socket = FakeSocket() + local_statsd._socket = fresh_socket + assert fresh_socket == local_statsd.socket + assert FakeSocket() != local_statsd.socket + + +def test_tags_from_environment(monkeypatch): + monkeypatch.setenv("STATSD_TAGS", "country:china,age:45") + statsd = Statsd() + statsd._socket = FakeSocket() + statsd.gauge("gt", 123.4) + assert statsd.socket.recv() == "gt:123.4|g|#age:45,country:china" + + +def test_tags_from_environment_and_constant(monkeypatch): + monkeypatch.setenv("STATSD_TAGS", "country:china,age:45") + statsd = Statsd(constant_tags={"country": "canada"}) + statsd._socket = FakeSocket() + statsd.gauge("gt", 123.4) + assert statsd.socket.recv() == "gt:123.4|g|#age:45,country:canada" + + +def test_tags_from_environment_warning(monkeypatch): + monkeypatch.setenv("STATSD_TAGS", "valid:tag,invalid_tag") + with pytest.warns(UserWarning) as record: + statsd = Statsd() + + assert len(record) == 1 + assert "invalid_tag" in record[0].message.args[0] + assert "valid:tag" not in record[0].message.args[0] + assert statsd.constant_tags == {"valid": "tag"} + + +def test_gauge_doesnt_send_none(statsd): + statsd.gauge("metric", None) + assert statsd.socket.recv() is None + + +def test_increment_doesnt_send_none(statsd): + statsd.increment("metric", None) + assert statsd.socket.recv() is None + + +def test_decrement_doesnt_send_none(statsd): + statsd.decrement("metric", None) + assert statsd.socket.recv() is None + + +def test_timing_doesnt_send_none(statsd): + statsd.timing("metric", None) + assert statsd.socket.recv() is None + + +def test_histogram_doesnt_send_none(statsd): + statsd.histogram("metric", None) + assert statsd.socket.recv() is None + + +def test_param_host(monkeypatch): + monkeypatch.setenv("STATSD_HOST", "test-value") + monkeypatch.setenv("STATSD_PORT", "") + local_statsd = Statsd(host="actual-test-value") + + assert local_statsd.host == "actual-test-value" + assert local_statsd.port == 8125 + + +def test_param_port(monkeypatch): + monkeypatch.setenv("STATSD_HOST", "") + monkeypatch.setenv("STATSD_PORT", "12345") + local_statsd = Statsd(port=4321) + assert local_statsd.host == "localhost" + assert local_statsd.port == 4321 + + +def test_envvar_host(monkeypatch): + monkeypatch.setenv("STATSD_HOST", "test-value") + monkeypatch.setenv("STATSD_PORT", "") + local_statsd = Statsd() + assert local_statsd.host == "test-value" + assert local_statsd.port == 8125 + + +def test_envvar_port(monkeypatch): + monkeypatch.setenv("STATSD_HOST", "") + monkeypatch.setenv("STATSD_PORT", "12345") + local_statsd = Statsd() + + assert local_statsd.host == "localhost" + assert local_statsd.port == 12345 + + +def test_namespace_added(): + local_statsd = Statsd(namespace="test-namespace") + local_statsd._socket = FakeSocket() + + local_statsd.gauge("gauge", 123.4) + assert local_statsd.socket.recv() == "test-namespace.gauge:123.4|g" + + +def test_contextmanager_empty(statsd): + with statsd: + assert True, "success" + + +def test_contextmanager_buffering(statsd): + with statsd as s: + s.gauge("gauge", 123.4) + s.gauge("gauge_other", 456.78) + assert s.socket.recv() is None + + assert statsd.socket.recv() == "gauge:123.4|g\ngauge_other:456.78|g" + + +def test_timed_elapsed(statsd): + with statsd.timed("test_timer") as t: + pass + + assert t.elapsed >= 0 + assert statsd.socket.recv() == "test_timer:%s|ms" % t.elapsed