# Maps a dump file extension to the tool used to load it (see pg_restore).
DB_DUMP_TYPES = {'.sql': 'psql', '.dump': 'pg_dump'}


def swh_db_version(dbname_or_service):
    """Retrieve the swh version if any. In case of the db not initialized,
    this returns None. Otherwise, this returns the db's version.

    Args:
        dbname_or_service (str): The db's name or service

    Returns:
        Optional[Int]: Either the db's version or None

    """
    query = 'select version from dbversion order by dbversion desc limit 1'
    cmd = [
        'psql', '--tuples-only', '--no-psqlrc', '--quiet',
        '-v', 'ON_ERROR_STOP=1', "--command=%s" % query,
        dbname_or_service
    ]

    try:
        r = subprocess.run(cmd, check=True, stdout=subprocess.PIPE,
                           universal_newlines=True)
        # --tuples-only makes psql print the bare value; an empty output
        # makes int() fail, which is handled like any other failure below.
        result = int(r.stdout.strip())
    except Exception:  # db not initialized
        # Deliberately best-effort: any failure (psql error, unparsable
        # output, ...) is reported as "no version".
        result = None
    return result


def pg_restore(dbname, dumpfile, dumptype='pg_dump'):
    """Load a dump file into an existing database.

    Args:
        dbname: name of the DB to restore into
        dumpfile: path of the dump file
        dumptype: one of 'pg_dump' (for binary dumps), 'psql' (for SQL dumps)

    Raises:
        AssertionError: if dumptype is not one of the supported values
        ValueError: same condition, when assertions are disabled (python -O)
        subprocess.CalledProcessError: if the restore command fails

    """
    assert dumptype in ['pg_dump', 'psql']
    if dumptype == 'pg_dump':
        subprocess.check_call(['pg_restore', '--no-owner', '--no-privileges',
                               '--dbname', dbname, dumpfile])
    elif dumptype == 'psql':
        subprocess.check_call(['psql', '--quiet',
                               '--no-psqlrc',
                               '-v', 'ON_ERROR_STOP=1',
                               '-f', dumpfile,
                               dbname])
    else:
        # Only reachable when the assert above is stripped: fail loudly
        # instead of silently restoring nothing.
        raise ValueError('unsupported dump type: %r' % (dumptype,))


def pg_dump(dbname, dumpfile):
    """Dump database `dbname` to `dumpfile` in pg_dump custom format (-Fc).

    Raises:
        subprocess.CalledProcessError: if pg_dump exits with an error

    """
    subprocess.check_call(['pg_dump', '--no-owner', '--no-privileges', '-Fc',
                           '-f', dumpfile, dbname])


def pg_dropdb(dbname):
    """Drop database `dbname` using the dropdb command.

    Raises:
        subprocess.CalledProcessError: if dropdb exits with an error

    """
    subprocess.check_call(['dropdb', dbname])
def pg_createdb(dbname, check=True):
    """Create a db. If check is True and the db already exists, this will
    raise an exception (original behavior). If check is False and the db
    already exists, this will fail silently. If the db does not exist, the
    db will be created.

    """
    subprocess.run(['createdb', dbname], check=check)


def db_create(dbname, dumps=None):
    """create the test DB and load the test data dumps into it

    dumps is an iterable of couples (dump_file, dump_type); None (the
    default) means that no dump is loaded.

    context: setUpClass

    Returns:
        the name of the created database

    """
    try:
        pg_createdb(dbname)
    except subprocess.CalledProcessError:  # try recovering once, in case
        pg_dropdb(dbname)                  # the db already existed
        pg_createdb(dbname)
    # `dumps` defaults to None here and in every caller that forwards it,
    # so treat None as "nothing to restore" instead of iterating over it.
    for dump, dtype in dumps or ():
        pg_restore(dbname, dump, dtype)
    return dbname


def db_destroy(dbname):
    """destroy the test DB

    context: tearDownClass

    """
    pg_dropdb(dbname)


def db_connect(dbname):
    """connect to the test DB and open a cursor

    context: setUp

    Returns:
        a dict with keys 'conn' (the psycopg2 connection) and 'cursor'
        (a cursor opened on that connection)

    """
    conn = psycopg2.connect('dbname=' + dbname)
    return {
        'conn': conn,
        'cursor': conn.cursor()
    }


def db_close(conn):
    """rollback current transaction and disconnect from the test DB

    context: tearDown

    """
    if not conn.closed:
        conn.rollback()
    conn.close()


class DbTestConn:
    """Context manager opening a connection and a cursor on a test DB.

    On entry, `conn` and `cursor` attributes are set on the instance; on
    exit the connection is rolled back and closed (see db_close).

    """
    def __init__(self, dbname):
        self.dbname = dbname

    def __enter__(self):
        self.db_setup = db_connect(self.dbname)
        self.conn = self.db_setup['conn']
        self.cursor = self.db_setup['cursor']
        return self

    def __exit__(self, *_):
        db_close(self.conn)


class DbTestContext:
    """Context manager creating a test DB on entry and dropping it on exit.

    Args:
        name: name of the database to create
        dumps: iterable of (dump_file, dump_type) couples to load, or None

    """
    def __init__(self, name='softwareheritage-test', dumps=None):
        self.dbname = name
        self.dumps = dumps

    def __enter__(self):
        db_create(dbname=self.dbname, dumps=self.dumps)
        return self

    def __exit__(self, *_):
        db_destroy(self.dbname)
class DbTestFixture:
    """Mix this in a test subject class to get DB testing support.

    Use the class method add_db() to add a new database to be tested.
    Using this will create a DbTestConn entry in the `test_db` dictionary for
    all the tests, indexed by the name of the database.

    Example:

    class TestDb(DbTestFixture, unittest.TestCase):
        @classmethod
        def setUpClass(cls):
            cls.add_db('db_name', DUMP)
            super().setUpClass()

        def setUp(self):
            db = self.test_db['db_name']
            print('conn: {}, cursor: {}'.format(db.conn, db.cursor))

    To ensure test isolation, each test method of the test case class will
    execute in its own connection, cursor, and transaction.

    Note that if you want to define setup/teardown methods, you need to
    explicitly call super() to ensure that the fixture setup/teardown methods
    are invoked. Here is an example where all setup/teardown methods are
    defined in a test case:

    class TestDb(DbTestFixture, unittest.TestCase):
        @classmethod
        def setUpClass(cls):
            # your add_db() calls here
            super().setUpClass()
            # your class setup code here

        def setUp(self):
            super().setUp()
            # your instance setup code here

        def tearDown(self):
            # your instance teardown code here
            super().tearDown()

        @classmethod
        def tearDownClass(cls):
            # your class teardown code here
            super().tearDownClass()

    """

    # NOTE: both registries are class-level dicts that are mutated in place,
    # so they are shared by every subclass of this fixture.
    _DB_DUMP_LIST = {}  # database name -> dumps to load at creation
    _DB_LIST = {}       # database name -> entered DbTestContext
    DB_TEST_FIXTURE_IMPORTED = True

    @classmethod
    def add_db(cls, name='softwareheritage-test', dumps=None):
        """Register a database to be created by setUpClass().

        Args:
            name: name of the test database
            dumps: iterable of (dump_file, dump_type) couples, or None

        """
        cls._DB_DUMP_LIST[name] = dumps

    @classmethod
    def setUpClass(cls):
        """Create every registered database, then run the parent setup."""
        for name, dumps in cls._DB_DUMP_LIST.items():
            cls._DB_LIST[name] = DbTestContext(name, dumps)
            cls._DB_LIST[name].__enter__()
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Run the parent teardown, then destroy the created databases."""
        super().tearDownClass()
        for name, context in cls._DB_LIST.items():
            context.__exit__()

    def setUp(self, *args, **kwargs):
        """Open a fresh connection to each database in `self.test_db`."""
        self.test_db = {}
        for name in self._DB_LIST.keys():
            self.test_db[name] = DbTestConn(name)
            self.test_db[name].__enter__()
        super().setUp(*args, **kwargs)

    def tearDown(self):
        """Run the parent teardown, then close the per-test connections."""
        super().tearDown()
        for name in self._DB_LIST.keys():
            self.test_db[name].__exit__()

    def reset_db_tables(self, name, excluded=None):
        """Truncate all tables of the 'public' schema of a test database.

        Args:
            name: name of the database, as registered through add_db()
            excluded: optional iterable of table names to leave untouched

        """
        db = self.test_db[name]
        conn = db.conn
        cursor = db.cursor

        cursor.execute("""SELECT table_name FROM information_schema.tables
                          WHERE table_schema = %s""", ('public',))

        tables = {table for (table,) in cursor.fetchall()}
        if excluded is not None:
            tables -= set(excluded)

        for table in tables:
            # Quote the identifier: the catalog returns the exact name, and
            # an unquoted mixed-case or reserved-word name would not resolve.
            quoted = '"%s"' % table.replace('"', '""')
            cursor.execute('truncate table %s cascade' % quoted)

        conn.commit()
class SingleDbTestFixture(DbTestFixture):
    """Simplified fixture like DbTest but that can only handle a single DB.

    Gives access to shortcuts like self.cursor and self.conn.

    DO NOT use this with other fixtures that need to access databases, like
    StorageTestFixture.

    The class can override the following class attributes:
        TEST_DB_NAME: name of the DB used for testing
        TEST_DB_DUMP: DB dump to be restored before running test methods; can
            be set to None if no restore from dump is required.
            If the dump file name ends with:
            - '.sql' it will be loaded via psql,
            - '.dump' it will be loaded via pg_restore.
            Other file extensions will be ignored.
            Can be a string or a list of strings; each path will be expanded
            using glob pattern matching.

    The test case class will then have the following attributes, accessible via
    self:

        dbname: name of the test database
        conn: psycopg2 connection object
        cursor: open psycopg2 cursor to the DB

    """

    TEST_DB_NAME = 'softwareheritage-test'
    TEST_DB_DUMP = None

    @classmethod
    def setUpClass(cls):
        """Register the single test DB and its dumps, then create it."""
        cls.dbname = cls.TEST_DB_NAME  # XXX to kill?

        dump_files = cls.TEST_DB_DUMP
        if dump_files is None:
            # documented default: no dump to restore
            dump_files = []
        elif isinstance(dump_files, str):
            dump_files = [dump_files]

        all_dump_files = []
        for pattern in dump_files:
            for path in sorted(glob.glob(pattern), key=sortkey):
                dump_type = DB_DUMP_TYPES.get(os.path.splitext(path)[1])
                if dump_type is None:
                    # documented behavior: unknown extensions are ignored
                    continue
                all_dump_files.append((path, dump_type))

        cls.add_db(name=cls.TEST_DB_NAME,
                   dumps=all_dump_files)
        super().setUpClass()

    def setUp(self, *args, **kwargs):
        """Expose the connection and cursor of the test DB on the instance."""
        super().setUp(*args, **kwargs)

        db = self.test_db[self.TEST_DB_NAME]
        self.conn = db.conn
        self.cursor = db.cursor
@contextmanager
def preserve_envvars(*envvars):
    """Context manager preserving the value of environment variables

    The listed variables are restored to their initial value (or removed if
    they were initially unset) when the block exits, including when it exits
    with an exception.

    Args:
        *envvars (str): names of the environment variables to preserve

    """
    # Sentinel marking variables that were not set on entry.
    to_delete = object()
    preserved = {var: os.environ.get(var, to_delete) for var in envvars}

    try:
        yield
    finally:
        for var, old in preserved.items():
            if old is not to_delete:
                os.environ[var] = old
            else:
                # The block may never have set the variable: pop() instead
                # of del so that restoring cannot raise KeyError.
                os.environ.pop(var, None)


class FakeSocket(object):
    """ A fake socket for testing.

    Payloads passed to send() are queued and handed back, oldest first and
    decoded as UTF-8, by recv().
    """

    def __init__(self):
        self.payloads = deque()

    def send(self, payload):
        """Queue a payload; it must be a bytes object."""
        assert isinstance(payload, bytes)
        self.payloads.append(payload)

    def recv(self):
        """Return the oldest queued payload as str, or None if empty."""
        try:
            return self.payloads.popleft().decode('utf-8')
        except IndexError:
            return None

    def close(self):
        """Nothing to release."""
        pass

    def __repr__(self):
        return str(self.payloads)


class BrokenSocket(FakeSocket):
    """Fake socket whose send() always raises socket.error."""

    def send(self, payload):
        raise socket.error("Socket error")


class SlowSocket(FakeSocket):
    """Fake socket whose send() always raises socket.timeout."""

    def send(self, payload):
        raise socket.timeout("Socket timeout")
class TestStatsd(unittest.TestCase):
    """Tests of the Statsd client, run against an in-memory FakeSocket.

    Each test gets a fresh Statsd instance (see setUp) whose socket is
    replaced by a FakeSocket, so the emitted packets can be read back with
    self.recv() without any network access.
    """

    def setUp(self):
        """
        Set up a default Statsd instance and mock the socket.
        """
        self.statsd = Statsd()
        self.statsd.socket = FakeSocket()

    def recv(self):
        """Return the oldest packet sent by self.statsd, or None."""
        return self.statsd.socket.recv()

    def test_set(self):
        self.statsd.set('set', 123)
        assert self.recv() == 'set:123|s'

    def test_gauge(self):
        self.statsd.gauge('gauge', 123.4)
        assert self.recv() == 'gauge:123.4|g'

    def test_counter(self):
        self.statsd.increment('page.views')
        self.assertEqual('page.views:1|c', self.recv())

        self.statsd.increment('page.views', 11)
        self.assertEqual('page.views:11|c', self.recv())

        self.statsd.decrement('page.views')
        self.assertEqual('page.views:-1|c', self.recv())

        self.statsd.decrement('page.views', 12)
        self.assertEqual('page.views:-12|c', self.recv())

    def test_histogram(self):
        self.statsd.histogram('histo', 123.4)
        self.assertEqual('histo:123.4|h', self.recv())

    def test_tagged_gauge(self):
        self.statsd.gauge('gt', 123.4, tags={'country': 'china', 'age': 45})
        self.assertEqual('gt:123.4|g|#age:45,country:china', self.recv())

    def test_tagged_counter(self):
        self.statsd.increment('ct', tags={'country': 'españa'})
        self.assertEqual('ct:1|c|#country:españa', self.recv())

    def test_tagged_histogram(self):
        self.statsd.histogram('h', 1, tags={'test_tag': 'tag_value'})
        self.assertEqual('h:1|h|#test_tag:tag_value', self.recv())

    def test_sample_rate(self):
        self.statsd.increment('c', sample_rate=0)
        assert not self.recv()
        for i in range(10000):
            self.statsd.increment('sampled_counter', sample_rate=0.3)
        # Sampling is random: only check the count is close to 30%.
        self.assert_almost_equal(3000, len(self.statsd.socket.payloads), 150)
        self.assertEqual('sampled_counter:1|c|@0.3', self.recv())

    def test_tags_and_samples(self):
        for i in range(100):
            self.statsd.gauge('gst', 23, tags={"sampled": True},
                              sample_rate=0.9)

        self.assert_almost_equal(90, len(self.statsd.socket.payloads), 10)
        self.assertEqual('gst:23|g|@0.9|#sampled:True', self.recv())

    def test_timing(self):
        self.statsd.timing('t', 123)
        self.assertEqual('t:123|ms', self.recv())

    def test_metric_namespace(self):
        """
        Namespace prefixes all metric names.
        """
        self.statsd.namespace = "foo"
        self.statsd.gauge('gauge', 123.4)
        self.assertEqual('foo.gauge:123.4|g', self.recv())

    # Test Client level constant tags
    def test_gauge_constant_tags(self):
        self.statsd.constant_tags = {
            'bar': 'baz',
        }
        self.statsd.gauge('gauge', 123.4)
        assert self.recv() == 'gauge:123.4|g|#bar:baz'

    def test_counter_constant_tag_with_metric_level_tags(self):
        self.statsd.constant_tags = {
            'bar': 'baz',
            'foo': True,
        }
        self.statsd.increment('page.views', tags={'extra': 'extra'})
        self.assertEqual(
            'page.views:1|c|#bar:baz,extra:extra,foo:True',
            self.recv(),
        )

    def test_gauge_constant_tags_with_metric_level_tags_twice(self):
        metric_level_tag = {'foo': 'bar'}
        self.statsd.constant_tags = {'bar': 'baz'}
        self.statsd.gauge('gauge', 123.4, tags=metric_level_tag)
        assert self.recv() == 'gauge:123.4|g|#bar:baz,foo:bar'

        # sending metrics multiple times with same metric-level tags
        # should not duplicate the tags being sent
        self.statsd.gauge('gauge', 123.4, tags=metric_level_tag)
        assert self.recv() == 'gauge:123.4|g|#bar:baz,foo:bar'

    def assert_almost_equal(self, a, b, delta):
        """Assert that a and b differ by at most delta."""
        self.assertTrue(
            abs(a - b) <= delta,
            "%s - %s not within %s" % (a, b, delta)
        )

    def test_socket_error(self):
        # Passes as long as the socket error does not propagate.
        self.statsd.socket = BrokenSocket()
        self.statsd.gauge('no error', 1)
        assert True, 'success'

    def test_socket_timeout(self):
        # Passes as long as the socket timeout does not propagate.
        self.statsd.socket = SlowSocket()
        self.statsd.gauge('no error', 1)
        assert True, 'success'

    def test_timed(self):
        """
        Measure the distribution of a function's run time.
        """

        @self.statsd.timed('timed.test')
        def func(a, b, c=1, d=1):
            """docstring"""
            time.sleep(0.5)
            return (a, b, c, d)

        self.assertEqual('func', func.__name__)
        self.assertEqual('docstring', func.__doc__)

        result = func(1, 2, d=3)
        # Assert it handles args and kwargs correctly.
        self.assertEqual(result, (1, 2, 1, 3))

        packet = self.recv()
        name_value, type_ = packet.split('|')
        name, value = name_value.split(':')

        self.assertEqual('ms', type_)
        self.assertEqual('timed.test', name)
        self.assert_almost_equal(500, float(value), 100)

    def test_timed_exception(self):
        """
        Exceptions bubble out of the decorator and are reported
        to statsd as a dedicated counter.
        """

        @self.statsd.timed('timed.test')
        def func(a, b, c=1, d=1):
            """docstring"""
            time.sleep(0.5)
            return (a / b, c, d)

        self.assertEqual('func', func.__name__)
        self.assertEqual('docstring', func.__doc__)

        with self.assertRaises(ZeroDivisionError):
            func(1, 0)

        packet = self.recv()
        name_value, type_ = packet.split('|')
        name, value = name_value.split(':')

        self.assertEqual('c', type_)
        self.assertEqual('timed.test_error_count', name)
        self.assertEqual(int(value), 1)

    def test_timed_no_metric(self):
        """
        Test using a decorator without providing a metric.
        """

        @self.statsd.timed()
        def func(a, b, c=1, d=1):
            """docstring"""
            time.sleep(0.5)
            return (a, b, c, d)

        self.assertEqual('func', func.__name__)
        self.assertEqual('docstring', func.__doc__)

        result = func(1, 2, d=3)
        # Assert it handles args and kwargs correctly.
        self.assertEqual(result, (1, 2, 1, 3))

        packet = self.recv()
        name_value, type_ = packet.split('|')
        name, value = name_value.split(':')

        self.assertEqual('ms', type_)
        self.assertEqual('swh.core.tests.test_statsd.func', name)
        self.assert_almost_equal(500, float(value), 100)

    def test_timed_coroutine(self):
        """
        Measure the distribution of a coroutine function's run time.

        Warning: Python >= 3.5 only.
        """
        import asyncio

        # `async def` rather than the generator-based @asyncio.coroutine
        # decorator, which no longer exists in recent Python versions.
        @self.statsd.timed('timed.test')
        async def print_foo():
            """docstring"""
            time.sleep(0.5)
            print("foo")

        loop = asyncio.new_event_loop()
        try:
            loop.run_until_complete(print_foo())
        finally:
            loop.close()

        # Assert
        packet = self.recv()
        name_value, type_ = packet.split('|')
        name, value = name_value.split(':')

        self.assertEqual('ms', type_)
        self.assertEqual('timed.test', name)
        self.assert_almost_equal(500, float(value), 100)

    def test_timed_context(self):
        """
        Measure the distribution of a context's run time.
        """
        # In milliseconds
        with self.statsd.timed('timed_context.test') as timer:
            self.assertIsInstance(timer, TimedContextManagerDecorator)
            time.sleep(0.5)

        packet = self.recv()
        name_value, type_ = packet.split('|')
        name, value = name_value.split(':')

        self.assertEqual('ms', type_)
        self.assertEqual('timed_context.test', name)
        self.assert_almost_equal(500, float(value), 100)
        self.assert_almost_equal(500, timer.elapsed, 100)

    def test_timed_context_exception(self):
        """
        Exception bubbles out of the `timed` context manager and is
        reported to statsd as a dedicated counter.
        """
        class ContextException(Exception):
            pass

        def func(self):
            with self.statsd.timed('timed_context.test'):
                time.sleep(0.5)
                raise ContextException()

        # Ensure the exception was raised.
        self.assertRaises(ContextException, func, self)

        # Ensure the error counter was recorded.
        packet = self.recv()
        name_value, type_ = packet.split('|')
        name, value = name_value.split(':')

        self.assertEqual('c', type_)
        self.assertEqual('timed_context.test_error_count', name)
        self.assertEqual(int(value), 1)

    def test_timed_context_no_metric_name_exception(self):
        """Test that an exception occurs if using a context manager without a
        metric name.
        """

        def func(self):
            with self.statsd.timed():
                time.sleep(0.5)

        # Ensure the exception was raised.
        self.assertRaises(TypeError, func, self)

        # Ensure nothing was sent.
        packet = self.recv()
        self.assertEqual(packet, None)

    def test_timed_start_stop_calls(self):
        timer = self.statsd.timed('timed_context.test')
        timer.start()
        time.sleep(0.5)
        timer.stop()

        packet = self.recv()
        name_value, type_ = packet.split('|')
        name, value = name_value.split(':')

        self.assertEqual('ms', type_)
        self.assertEqual('timed_context.test', name)
        self.assert_almost_equal(500, float(value), 100)

    def test_batched(self):
        self.statsd.open_buffer()
        self.statsd.gauge('page.views', 123)
        self.statsd.timing('timer', 123)
        self.statsd.close_buffer()

        self.assertEqual('page.views:123|g\ntimer:123|ms', self.recv())

    def test_context_manager(self):
        fake_socket = FakeSocket()
        with Statsd() as statsd:
            statsd.socket = fake_socket
            statsd.gauge('page.views', 123)
            statsd.timing('timer', 123)

        self.assertEqual('page.views:123|g\ntimer:123|ms', fake_socket.recv())

    def test_batched_buffer_autoflush(self):
        fake_socket = FakeSocket()
        with Statsd() as statsd:
            statsd.socket = fake_socket
            for i in range(51):
                statsd.increment('mycounter')
            self.assertEqual(
                '\n'.join(['mycounter:1|c' for i in range(50)]),
                fake_socket.recv(),
            )

        self.assertEqual('mycounter:1|c', fake_socket.recv())

    def test_module_level_instance(self):
        from swh.core.statsd import statsd
        self.assertIsInstance(statsd, Statsd)

    def test_instantiating_does_not_connect(self):
        local_statsd = Statsd()
        self.assertEqual(None, local_statsd.socket)

    def test_accessing_socket_opens_socket(self):
        local_statsd = Statsd()
        try:
            self.assertIsNotNone(local_statsd.get_socket())
        finally:
            # Guarded so that a failure to open the socket is not masked
            # by an AttributeError raised from the cleanup.
            if local_statsd.socket is not None:
                local_statsd.socket.close()

    def test_accessing_socket_multiple_times_returns_same_socket(self):
        local_statsd = Statsd()
        fresh_socket = FakeSocket()
        local_statsd.socket = fresh_socket
        self.assertEqual(fresh_socket, local_statsd.get_socket())
        self.assertNotEqual(FakeSocket(), local_statsd.get_socket())

    def test_tags_from_environment(self):
        with preserve_envvars('STATSD_TAGS'):
            os.environ['STATSD_TAGS'] = 'country:china,age:45'
            statsd = Statsd()
            statsd.socket = FakeSocket()
            statsd.gauge('gt', 123.4)
            self.assertEqual('gt:123.4|g|#age:45,country:china',
                             statsd.socket.recv())

    def test_tags_from_environment_and_constant(self):
        with preserve_envvars('STATSD_TAGS'):
            os.environ['STATSD_TAGS'] = 'country:china,age:45'
            statsd = Statsd(constant_tags={'country': 'canada'})
            statsd.socket = FakeSocket()
            statsd.gauge('gt', 123.4)
            self.assertEqual('gt:123.4|g|#age:45,country:canada',
                             statsd.socket.recv())

    def test_tags_from_environment_warning(self):
        with preserve_envvars('STATSD_TAGS'):
            os.environ['STATSD_TAGS'] = 'valid:tag,invalid_tag'
            with pytest.warns(UserWarning) as record:
                statsd = Statsd()

        assert len(record) == 1
        assert 'invalid_tag' in record[0].message.args[0]
        assert 'valid:tag' not in record[0].message.args[0]
        assert statsd.constant_tags == {'valid': 'tag'}

    def test_gauge_doesnt_send_none(self):
        self.statsd.gauge('metric', None)
        assert self.recv() is None

    def test_increment_doesnt_send_none(self):
        self.statsd.increment('metric', None)
        assert self.recv() is None

    def test_decrement_doesnt_send_none(self):
        self.statsd.decrement('metric', None)
        assert self.recv() is None

    def test_timing_doesnt_send_none(self):
        self.statsd.timing('metric', None)
        assert self.recv() is None

    def test_histogram_doesnt_send_none(self):
        self.statsd.histogram('metric', None)
        assert self.recv() is None

    def test_param_host(self):
        with preserve_envvars('STATSD_HOST', 'STATSD_PORT'):
            os.environ['STATSD_HOST'] = 'test-value'
            os.environ['STATSD_PORT'] = ''
            local_statsd = Statsd(host='actual-test-value')

        self.assertEqual(local_statsd.host, 'actual-test-value')
        self.assertEqual(local_statsd.port, 8125)

    def test_param_port(self):
        with preserve_envvars('STATSD_HOST', 'STATSD_PORT'):
            os.environ['STATSD_HOST'] = ''
            os.environ['STATSD_PORT'] = '12345'
            local_statsd = Statsd(port=4321)

        self.assertEqual(local_statsd.host, 'localhost')
        self.assertEqual(local_statsd.port, 4321)

    def test_envvar_host(self):
        with preserve_envvars('STATSD_HOST', 'STATSD_PORT'):
            os.environ['STATSD_HOST'] = 'test-value'
            os.environ['STATSD_PORT'] = ''
            local_statsd = Statsd()

        self.assertEqual(local_statsd.host, 'test-value')
        self.assertEqual(local_statsd.port, 8125)

    def test_envvar_port(self):
        with preserve_envvars('STATSD_HOST', 'STATSD_PORT'):
            os.environ['STATSD_HOST'] = ''
            os.environ['STATSD_PORT'] = '12345'
            local_statsd = Statsd()

        self.assertEqual(local_statsd.host, 'localhost')
        self.assertEqual(local_statsd.port, 12345)

    def test_namespace_added(self):
        local_statsd = Statsd(namespace='test-namespace')
        local_statsd.socket = FakeSocket()

        local_statsd.gauge('gauge', 123.4)
        assert local_statsd.socket.recv() == 'test-namespace.gauge:123.4|g'

    def test_contextmanager_empty(self):
        with self.statsd:
            assert True, 'success'

    def test_contextmanager_buffering(self):
        with self.statsd as s:
            s.gauge('gauge', 123.4)
            s.gauge('gauge_other', 456.78)
            # Nothing is sent while the buffer is open.
            self.assertIsNone(s.socket.recv())

        self.assertEqual(self.recv(), 'gauge:123.4|g\ngauge_other:456.78|g')

    def test_timed_elapsed(self):
        with self.statsd.timed('test_timer') as t:
            pass

        self.assertGreaterEqual(t.elapsed, 0)
        self.assertEqual(self.recv(), 'test_timer:%s|ms' % t.elapsed)