diff --git a/conftest.py b/conftest.py new file mode 100644 index 0000000..eb6de3d --- /dev/null +++ b/conftest.py @@ -0,0 +1,6 @@ +from hypothesis import settings + +# define tests profile. Full documentation is at: +# https://hypothesis.readthedocs.io/en/latest/settings.html#settings-profiles +settings.register_profile("fast", max_examples=5, deadline=5000) +settings.register_profile("slow", max_examples=20, deadline=5000) diff --git a/requirements-test.txt b/requirements-test.txt index 641a731..1f29afe 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,3 +1,4 @@ pytest < 4 pytest-postgresql requests-mock +hypothesis >= 3.11.0 diff --git a/swh/core/db/__init__.py b/swh/core/db/__init__.py index cab7ddb..57506e7 100644 --- a/swh/core/db/__init__.py +++ b/swh/core/db/__init__.py @@ -1,193 +1,192 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import binascii import datetime import enum import json import os import threading from contextlib import contextmanager import psycopg2 import psycopg2.extras psycopg2.extras.register_uuid() def escape(data): if data is None: return '' if isinstance(data, bytes): return '\\x%s' % binascii.hexlify(data).decode('ascii') elif isinstance(data, str): return '"%s"' % data.replace('"', '""') elif isinstance(data, datetime.datetime): # We escape twice to make sure the string generated by # isoformat gets escaped return escape(data.isoformat()) elif isinstance(data, dict): return escape(json.dumps(data)) elif isinstance(data, list): return escape("{%s}" % ','.join(escape(d) for d in data)) elif isinstance(data, psycopg2.extras.Range): # We escape twice here too, so that we make sure # everything gets passed to copy properly return escape( '%s%s,%s%s' % ( '[' if data.lower_inc else '(', '-infinity' if data.lower_inf else escape(data.lower), 'infinity' if data.upper_inf else escape(data.upper), ']' if data.upper_inc else ')', ) ) elif isinstance(data, enum.IntEnum): return escape(int(data)) else: # We don't escape here to make sure we pass literals properly return str(data) def typecast_bytea(value, cur): if value is not None: data = psycopg2.BINARY(value, cur) return data.tobytes() class BaseDb: """Base class for swh.*.*Db. cf. swh.storage.db.Db, swh.archiver.db.ArchiverDb """ @classmethod def adapt_conn(cls, conn): """Makes psycopg2 use 'bytes' to decode bytea instead of 'memoryview', for this connection.""" cur = conn.cursor() cur.execute("SELECT null::bytea, null::bytea[]") bytea_oid = cur.description[0][1] bytea_array_oid = cur.description[1][1] t_bytes = psycopg2.extensions.new_type( (bytea_oid,), "bytea", typecast_bytea) psycopg2.extensions.register_type(t_bytes, conn) t_bytes_array = psycopg2.extensions.new_array_type( (bytea_array_oid,), "bytea[]", t_bytes) psycopg2.extensions.register_type(t_bytes_array, conn) @classmethod def connect(cls, *args, **kwargs): """factory method to create a DB proxy Accepts all arguments of psycopg2.connect; only some specific possibilities are reported below. Args: connstring: libpq2 connection string """ conn = psycopg2.connect(*args, **kwargs) - cls.adapt_conn(conn) return cls(conn) @classmethod def from_pool(cls, pool): conn = pool.getconn() - cls.adapt_conn(conn) return cls(conn, pool=pool) def __init__(self, conn, pool=None): """create a DB proxy Args: conn: psycopg2 connection to the SWH DB pool: psycopg2 pool of connections """ + self.adapt_conn(conn) self.conn = conn self.pool = pool def __del__(self): if self.pool: self.pool.putconn(self.conn) def cursor(self, cur_arg=None): """get a cursor: from cur_arg if given, or a fresh one otherwise meant to avoid boilerplate if/then/else in methods that proxy stored procedures """ if cur_arg is not None: return cur_arg else: return self.conn.cursor() _cursor = cursor # for bw compat @contextmanager def transaction(self): """context manager to execute within a DB transaction Yields: a psycopg2 cursor """ with self.conn.cursor() as cur: try: yield cur self.conn.commit() except Exception: if not self.conn.closed: self.conn.rollback() raise def copy_to(self, items, tblname, columns, cur=None, item_cb=None, default_values={}): """Copy items' entries to table tblname with columns information. Args: - items (dict): dictionary of data to copy over tblname. + items (List[dict]): dictionaries of data to copy over tblname. tblname (str): destination table's name. columns ([str]): keys to access data in items and also the column names in the destination table. default_values (dict): dictionnary of default values to use when inserting entried int the tblname table. cur: a db cursor; if not given, a new cursor will be created. item_cb (fn): optional function to apply to items's entry. """ read_file, write_file = os.pipe() def writer(): cursor = self.cursor(cur) with open(read_file, 'r') as f: cursor.copy_expert('COPY %s (%s) FROM STDIN CSV' % ( tblname, ', '.join(columns)), f) write_thread = threading.Thread(target=writer) write_thread.start() try: with open(write_file, 'w') as f: for d in items: if item_cb is not None: item_cb(d) line = [escape(d.get(k, default_values.get(k))) for k in columns] f.write(','.join(line)) f.write('\n') finally: # No problem bubbling up exceptions, but we still need to make sure # we finish copying, even though we're probably going to cancel the # transaction. write_thread.join() def mktemp(self, tblname, cur=None): self.cursor(cur).execute('SELECT swh_mktemp(%s)', (tblname,)) diff --git a/swh/core/tests/test_db.py b/swh/core/tests/test_db.py new file mode 100644 index 0000000..939ae21 --- /dev/null +++ b/swh/core/tests/test_db.py @@ -0,0 +1,84 @@ +# Copyright (C) 2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import os.path +import tempfile +import unittest + +from hypothesis import strategies, given +import pytest + +from swh.core.db import BaseDb +from swh.core.tests.db_testing import SingleDbTestFixture + + +INIT_SQL = ''' +create table test_table +( + i int, + txt text, + bytes bytea +); +''' + +db_rows = strategies.lists(strategies.tuples( + strategies.integers(-2147483648, +2147483647), + strategies.text( + alphabet=strategies.characters( + blacklist_categories=['Cs'], # surrogates + blacklist_characters=[ + '\x00', # pgsql does not support the null codepoint + '\r', # pgsql normalizes those + ] + ), + ), + strategies.binary(), +)) + + +@pytest.mark.db +class TestDb(SingleDbTestFixture, unittest.TestCase): + TEST_DB_NAME = 'test-db' + + @classmethod + def setUpClass(cls): + with tempfile.TemporaryDirectory() as td: + with open(os.path.join(td, 'init.sql'), 'a') as fd: + fd.write(INIT_SQL) + + cls.TEST_DB_DUMP = os.path.join(td, '*.sql') + + super().setUpClass() + + def setUp(self): + super().setUp() + self.db = BaseDb(self.conn) + + def test_initialized(self): + cur = self.db.cursor() + cur.execute("insert into test_table values (1, %s, %s);", + ('foo', b'bar')) + cur.execute("select * from test_table;") + self.assertEqual(list(cur), [(1, 'foo', b'bar')]) + + def test_reset_tables(self): + cur = self.db.cursor() + cur.execute("insert into test_table values (1, %s, %s);", + ('foo', b'bar')) + self.reset_db_tables('test-db') + cur.execute("select * from test_table;") + self.assertEqual(list(cur), []) + + @given(db_rows) + def test_copy_to(self, data): + # the table is not reset between runs by hypothesis + self.reset_db_tables('test-db') + + items = [dict(zip(['i', 'txt', 'bytes'], item)) for item in data] + self.db.copy_to(items, 'test_table', ['i', 'txt', 'bytes']) + + cur = self.db.cursor() + cur.execute('select * from test_table;') + self.assertCountEqual(list(cur), data) diff --git a/tox.ini b/tox.ini index 0fb07c6..2a3f1fa 100644 --- a/tox.ini +++ b/tox.ini @@ -1,16 +1,26 @@ [tox] envlist=flake8,py3 [testenv:py3] deps = .[testing] pytest-cov + pifpaf commands = - pytest --cov=swh --cov-branch {posargs} + pifpaf run postgresql -- pytest --hypothesis-profile=fast --cov=swh --cov-branch {posargs} + +[testenv:py3-slow] +deps = + .[testing] + pytest-cov + pifpaf +commands = + pifpaf run postgresql -- pytest --hypothesis-profile=slow --cov=swh --cov-branch {posargs} + [testenv:flake8] skip_install = true deps = flake8 commands = {envpython} -m flake8