diff --git a/pytest.ini b/pytest.ini --- a/pytest.ini +++ b/pytest.ini @@ -1,2 +1,4 @@ [pytest] norecursedirs = docs .* +filterwarnings = + ignore:.*uses the 'db_with_data' fixture diff --git a/swh/core/db/tests/db_testing.py b/swh/core/db/tests/db_testing.py deleted file mode 100644 --- a/swh/core/db/tests/db_testing.py +++ /dev/null @@ -1,305 +0,0 @@ -# Copyright (C) 2015-2018 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import glob -import os -import subprocess -from typing import Dict, Iterable, Optional, Tuple, Union - -import psycopg2 - -from swh.core.utils import numfile_sortkey as sortkey - -DB_DUMP_TYPES = {".sql": "psql", ".dump": "pg_dump"} # type: Dict[str, str] - - -def pg_restore(dbname, dumpfile, dumptype="pg_dump"): - """ - Args: - dbname: name of the DB to restore into - dumpfile: path of the dump file - dumptype: one of 'pg_dump' (for binary dumps), 'psql' (for SQL dumps) - """ - assert dumptype in ["pg_dump", "psql"] - if dumptype == "pg_dump": - subprocess.check_call( - [ - "pg_restore", - "--no-owner", - "--no-privileges", - "--dbname", - dbname, - dumpfile, - ] - ) - elif dumptype == "psql": - subprocess.check_call( - [ - "psql", - "--quiet", - "--no-psqlrc", - "-v", - "ON_ERROR_STOP=1", - "-f", - dumpfile, - dbname, - ] - ) - - -def pg_dump(dbname, dumpfile): - subprocess.check_call( - ["pg_dump", "--no-owner", "--no-privileges", "-Fc", "-f", dumpfile, dbname] - ) - - -def pg_dropdb(dbname): - subprocess.check_call(["dropdb", dbname]) - - -def pg_createdb(dbname, check=True): - """Create a db. If check is True and the db already exists, this will - raise an exception (original behavior). If check is False and - the db already exists, this will fail silently. If the db does - not exist, the db will be created. - - """ - subprocess.run(["createdb", dbname], check=check) - - -def db_create(dbname, dumps=None): - """create the test DB and load the test data dumps into it - - dumps is an iterable of couples (dump_file, dump_type). - - context: setUpClass - - """ - try: - pg_createdb(dbname) - except subprocess.CalledProcessError: # try recovering once, in case - pg_dropdb(dbname) # the db already existed - pg_createdb(dbname) - for dump, dtype in dumps: - pg_restore(dbname, dump, dtype) - return dbname - - -def db_destroy(dbname): - """destroy the test DB - - context: tearDownClass - - """ - pg_dropdb(dbname) - - -def db_connect(dbname): - """connect to the test DB and open a cursor - - context: setUp - - """ - conn = psycopg2.connect("dbname=" + dbname) - return {"conn": conn, "cursor": conn.cursor()} - - -def db_close(conn): - """rollback current transaction and disconnect from the test DB - - context: tearDown - - """ - if not conn.closed: - conn.rollback() - conn.close() - - -class DbTestConn: - def __init__(self, dbname): - self.dbname = dbname - - def __enter__(self): - self.db_setup = db_connect(self.dbname) - self.conn = self.db_setup["conn"] - self.cursor = self.db_setup["cursor"] - return self - - def __exit__(self, *_): - db_close(self.conn) - - -class DbTestContext: - def __init__(self, name="softwareheritage-test", dumps=None): - self.dbname = name - self.dumps = dumps - - def __enter__(self): - db_create(dbname=self.dbname, dumps=self.dumps) - return self - - def __exit__(self, *_): - db_destroy(self.dbname) - - -class DbTestFixture: - """Mix this in a test subject class to get DB testing support. - - Use the class method add_db() to add a new database to be tested. - Using this will create a DbTestConn entry in the `test_db` dictionary for - all the tests, indexed by the name of the database. - - Example: - - class TestDb(DbTestFixture, unittest.TestCase): - @classmethod - def setUpClass(cls): - cls.add_db('db_name', DUMP) - super().setUpClass() - - def setUp(self): - db = self.test_db['db_name'] - print('conn: {}, cursor: {}'.format(db.conn, db.cursor)) - - To ensure test isolation, each test method of the test case class will - execute in its own connection, cursor, and transaction. - - Note that if you want to define setup/teardown methods, you need to - explicitly call super() to ensure that the fixture setup/teardown methods - are invoked. Here is an example where all setup/teardown methods are - defined in a test case: - - class TestDb(DbTestFixture, unittest.TestCase): - @classmethod - def setUpClass(cls): - # your add_db() calls here - super().setUpClass() - # your class setup code here - - def setUp(self): - super().setUp() - # your instance setup code here - - def tearDown(self): - # your instance teardown code here - super().tearDown() - - @classmethod - def tearDownClass(cls): - # your class teardown code here - super().tearDownClass() - - """ - - _DB_DUMP_LIST = {} # type: Dict[str, Iterable[Tuple[str, str]]] - _DB_LIST = {} # type: Dict[str, DbTestContext] - DB_TEST_FIXTURE_IMPORTED = True - - @classmethod - def add_db(cls, name="softwareheritage-test", dumps=None): - cls._DB_DUMP_LIST[name] = dumps - - @classmethod - def setUpClass(cls): - for name, dumps in cls._DB_DUMP_LIST.items(): - cls._DB_LIST[name] = DbTestContext(name, dumps) - cls._DB_LIST[name].__enter__() - super().setUpClass() - - @classmethod - def tearDownClass(cls): - super().tearDownClass() - for name, context in cls._DB_LIST.items(): - context.__exit__() - - def setUp(self, *args, **kwargs): - self.test_db = {} - for name in self._DB_LIST.keys(): - self.test_db[name] = DbTestConn(name) - self.test_db[name].__enter__() - super().setUp(*args, **kwargs) - - def tearDown(self): - super().tearDown() - for name in self._DB_LIST.keys(): - self.test_db[name].__exit__() - - def reset_db_tables(self, name, excluded=None): - db = self.test_db[name] - conn = db.conn - cursor = db.cursor - - cursor.execute( - """SELECT table_name FROM information_schema.tables - WHERE table_schema = %s""", - ("public",), - ) - - tables = set(table for (table,) in cursor.fetchall()) - if excluded is not None: - tables -= set(excluded) - - for table in tables: - cursor.execute("truncate table %s cascade" % table) - - conn.commit() - - -class SingleDbTestFixture(DbTestFixture): - """Simplified fixture like DbTest but that can only handle a single DB. - - Gives access to shortcuts like self.cursor and self.conn. - - DO NOT use this with other fixtures that need to access databases, like - StorageTestFixture. - - The class can override the following class attributes: - TEST_DB_NAME: name of the DB used for testing - TEST_DB_DUMP: DB dump to be restored before running test methods; can - be set to None if no restore from dump is required. - If the dump file name endswith" - - '.sql' it will be loaded via psql, - - '.dump' it will be loaded via pg_restore. - Other file extensions will be ignored. - Can be a string or a list of strings; each path will be expanded - using glob pattern matching. - - The test case class will then have the following attributes, accessible via - self: - - dbname: name of the test database - conn: psycopg2 connection object - cursor: open psycopg2 cursor to the DB - """ - - TEST_DB_NAME = "softwareheritage-test" - TEST_DB_DUMP = None # type: Optional[Union[str, Iterable[str]]] - - @classmethod - def setUpClass(cls): - cls.dbname = cls.TEST_DB_NAME # XXX to kill? - - dump_files = cls.TEST_DB_DUMP - if dump_files is None: - dump_files = [] - elif isinstance(dump_files, str): - dump_files = [dump_files] - all_dump_files = [] - for files in dump_files: - all_dump_files.extend(sorted(glob.glob(files), key=sortkey)) - - all_dump_files = [ - (x, DB_DUMP_TYPES[os.path.splitext(x)[1]]) for x in all_dump_files - ] - - cls.add_db(name=cls.TEST_DB_NAME, dumps=all_dump_files) - super().setUpClass() - - def setUp(self, *args, **kwargs): - super().setUp(*args, **kwargs) - - db = self.test_db[self.TEST_DB_NAME] - self.conn = db.conn - self.cursor = db.cursor diff --git a/swh/core/db/tests/test_db.py b/swh/core/db/tests/test_db.py --- a/swh/core/db/tests/test_db.py +++ b/swh/core/db/tests/test_db.py @@ -1,4 +1,4 @@ -# Copyright (C) 2019 The Software Heritage developers +# Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -7,11 +7,8 @@ import datetime from enum import IntEnum import inspect -import os.path from string import printable -import tempfile from typing import Any -import unittest from unittest.mock import MagicMock, Mock import uuid @@ -23,8 +20,7 @@ from swh.core.db import BaseDb from swh.core.db.common import db_transaction, db_transaction_generator - -from .db_testing import SingleDbTestFixture, db_close, db_create, db_destroy +from swh.core.db.pytest_plugin import postgresql_fact # workaround mypy bug https://github.com/python/mypy/issues/5485 @@ -238,44 +234,26 @@ ] -@pytest.mark.db -def test_connect(): - db_name = db_create("test-db2", dumps=[]) - try: - db = BaseDb.connect("dbname=%s" % db_name) - with db.cursor() as cur: - psycopg2.extras.register_default_jsonb(cur) - cur.execute(INIT_SQL) - cur.execute(INSERT_SQL, STATIC_ROW_IN) - cur.execute("select * from test_table;") - output = convert_lines(cur) - assert len(output) == 1 - assert EXPECTED_ROW_OUT == output[0] - finally: - db_close(db.conn) - db_destroy(db_name) - - -@pytest.mark.db -class TestDb(SingleDbTestFixture, unittest.TestCase): - TEST_DB_NAME = "test-db" +test_db = postgresql_fact("postgresql_proc", db_name="test-db2") - @classmethod - def setUpClass(cls): - with tempfile.TemporaryDirectory() as td: - with open(os.path.join(td, "init.sql"), "a") as fd: - fd.write(INIT_SQL) - cls.TEST_DB_DUMP = os.path.join(td, "*.sql") +@pytest.fixture +def db_with_data(test_db, request): + """Fixture to initialize a db with some data out of the "INIT_SQL above - super().setUpClass() + """ + db = BaseDb.connect(test_db.dsn) + with db.cursor() as cur: + psycopg2.extras.register_default_jsonb(cur) + cur.execute(INIT_SQL) + yield db + db.conn.rollback() + db.conn.close() - def setUp(self): - super().setUp() - self.db = BaseDb(self.conn) - def test_initialized(self): - cur = self.db.cursor() +@pytest.mark.db +def test_db_connect(db_with_data): + with db_with_data.cursor() as cur: psycopg2.extras.register_default_jsonb(cur) cur.execute(INSERT_SQL, STATIC_ROW_IN) cur.execute("select * from test_table;") @@ -283,44 +261,47 @@ assert len(output) == 1 assert EXPECTED_ROW_OUT == output[0] - def test_reset_tables(self): - cur = self.db.cursor() + +def test_db_initialized(db_with_data): + with db_with_data.cursor() as cur: + psycopg2.extras.register_default_jsonb(cur) cur.execute(INSERT_SQL, STATIC_ROW_IN) - self.reset_db_tables("test-db") cur.execute("select * from test_table;") - assert convert_lines(cur) == [] + output = convert_lines(cur) + assert len(output) == 1 + assert EXPECTED_ROW_OUT == output[0] - def test_copy_to_static(self): - items = [{field.name: field.example for field in FIELDS}] - self.db.copy_to(items, "test_table", COLUMNS) - cur = self.db.cursor() +def test_db_copy_to_static(db_with_data): + items = [{field.name: field.example for field in FIELDS}] + db_with_data.copy_to(items, "test_table", COLUMNS) + with db_with_data.cursor() as cur: cur.execute("select * from test_table;") output = convert_lines(cur) assert len(output) == 1 assert EXPECTED_ROW_OUT == output[0] - @given(db_rows) - def test_copy_to(self, data): - try: - # the table is not reset between runs by hypothesis - self.reset_db_tables("test-db") - items = [dict(zip(COLUMNS, item)) for item in data] - self.db.copy_to(items, "test_table", COLUMNS) +@given(db_rows) +def test_db_copy_to(db_with_data, data): + items = [dict(zip(COLUMNS, item)) for item in data] + with db_with_data.cursor() as cur: + cur.execute("TRUNCATE TABLE test_table CASCADE") + + db_with_data.copy_to(items, "test_table", COLUMNS) + + with db_with_data.cursor() as cur: + cur.execute("select * from test_table;") + converted_lines = convert_lines(cur) + assert converted_lines == data - cur = self.db.cursor() - cur.execute("select * from test_table;") - assert convert_lines(cur) == data - finally: - self.db.conn.rollback() - def test_copy_to_thread_exception(self): - data = [(2 ** 65, "foo", b"bar")] +def test_db_copy_to_thread_exception(db_with_data): + data = [(2 ** 65, "foo", b"bar")] - items = [dict(zip(COLUMNS, item)) for item in data] - with self.assertRaises(psycopg2.errors.NumericValueOutOfRange): - self.db.copy_to(items, "test_table", COLUMNS) + items = [dict(zip(COLUMNS, item)) for item in data] + with pytest.raises(psycopg2.errors.NumericValueOutOfRange): + db_with_data.copy_to(items, "test_table", COLUMNS) def test_db_transaction(mocker): diff --git a/tox.ini b/tox.ini --- a/tox.ini +++ b/tox.ini @@ -9,11 +9,9 @@ db: db, testing-db server: http deps = - db: pifpaf cover: pytest-cov commands = - db: pifpaf run postgresql -- \ - pytest --doctest-modules \ + pytest --doctest-modules \ slow: --hypothesis-profile=slow \ cover: --cov={envsitepackagesdir}/swh/core --cov-branch \ core: {envsitepackagesdir}/swh/core/tests \