diff --git a/swh/storage/filter.py b/swh/storage/filter.py
index 813c8cf8..9531233d 100644
--- a/swh/storage/filter.py
+++ b/swh/storage/filter.py
@@ -1,116 +1,115 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from typing import Dict, Generator, Sequence, Set

 from swh.storage import get_storage


 class FilteringProxyStorage:
     """Filtering Storage implementation. This is in charge of transparently
     filtering out known objects prior to adding them to storage.

     Sample configuration use case for filtering storage:

     .. code-block:: yaml

         storage:
           cls: filter
-          args:
-            storage:
-              cls: remote
-              args: http://storage.internal.staging.swh.network:5002/
+          storage:
+            cls: remote
+            url: http://storage.internal.staging.swh.network:5002/

     """
     def __init__(self, storage):
         self.storage = get_storage(**storage)
         self.objects_seen = {
             'content': set(),  # set of content hashes (sha256) seen
             'directory': set(),
             'revision': set(),
         }

     def __getattr__(self, key):
         return getattr(self.storage, key)

     def content_add(self, content: Sequence[Dict]) -> Dict:
         contents = list(content)
         contents_to_add = self._filter_missing_contents(contents)
         return self.storage.content_add(
             x for x in contents if x['sha256'] in contents_to_add
         )

     def directory_add(self, directories: Sequence[Dict]) -> Dict:
         directories = list(directories)
         missing_ids = self._filter_missing_ids(
             'directory',
             (d['id'] for d in directories)
         )
         return self.storage.directory_add(
             d for d in directories if d['id'] in missing_ids
         )

     def revision_add(self, revisions):
         revisions = list(revisions)
         missing_ids = self._filter_missing_ids(
             'revision',
             (d['id'] for d in revisions)
         )
         return self.storage.revision_add(
             r for r in revisions if r['id'] in missing_ids
         )

     def _filter_missing_contents(
             self, content_hashes: Sequence[Dict]) -> Set[bytes]:
         """Return only the content keys missing from swh

         Args:
             content_hashes: List of sha256 to check for existence in swh
                 storage

         """
         objects_seen = self.objects_seen['content']
         missing_hashes = []
         for hashes in content_hashes:
             if hashes['sha256'] in objects_seen:
                 continue
             objects_seen.add(hashes['sha256'])
             missing_hashes.append(hashes)

         return set(self.storage.content_missing(
             missing_hashes,
             key_hash='sha256',
         ))

     def _filter_missing_ids(
             self,
             object_type: str,
             ids: Generator[bytes, None, None]) -> Set[bytes]:
         """Filter missing ids from the storage for a given object type.

         Args:
             object_type: object type to use {revision, directory}
             ids: Sequence of object_type ids

         Returns:
             Missing ids from the storage for object_type

         """
         objects_seen = self.objects_seen[object_type]
         missing_ids = []
         for id in ids:
             if id in objects_seen:
                 continue
             objects_seen.add(id)
             missing_ids.append(id)

         fn_by_object_type = {
             'revision': self.storage.revision_missing,
             'directory': self.storage.directory_missing,
         }

         fn = fn_by_object_type[object_type]
         return set(fn(missing_ids))
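The hunk above drops the intermediate `args` mapping from the proxy configuration: the wrapped storage's parameters now sit directly beside its `cls` key. As a minimal sketch of the new flattened form, assuming the in-memory backend and a made-up directory id (the expected stats mirror the assertions in `test_filter.py` further down in this patch):

    from swh.storage.filter import FilteringProxyStorage

    # Flattened config: no intermediate 'args' mapping around the
    # wrapped storage's parameters.
    storage = FilteringProxyStorage(storage={'cls': 'memory'})

    directory = {'id': b'\x01' * 20, 'entries': []}  # hypothetical 20-byte id

    storage.directory_add([directory])  # -> {'directory:add': 1}
    storage.directory_add([directory])  # filtered out -> {'directory:add': 0}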
diff --git a/swh/storage/tests/conftest.py b/swh/storage/tests/conftest.py
index 49b979e2..0777f170 100644
--- a/swh/storage/tests/conftest.py
+++ b/swh/storage/tests/conftest.py
@@ -1,267 +1,265 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import glob
 import pytest

 from typing import Union

 from pytest_postgresql import factories
 from pytest_postgresql.janitor import DatabaseJanitor, psycopg2, Version

 from os import path, environ
 from hypothesis import settings
 from typing import Dict

 import swh.storage

 from swh.core.utils import numfile_sortkey as sortkey
 from swh.model.tests.generate_testdata import gen_contents, gen_origins

 SQL_DIR = path.join(path.dirname(swh.storage.__file__), 'sql')

 environ['LC_ALL'] = 'C.UTF-8'

 DUMP_FILES = path.join(SQL_DIR, '*.sql')

 # define tests profile. Full documentation is at:
 # https://hypothesis.readthedocs.io/en/latest/settings.html#settings-profiles
 settings.register_profile("fast", max_examples=5, deadline=5000)
 settings.register_profile("slow", max_examples=20, deadline=5000)


 @pytest.fixture
 def swh_storage(postgresql_proc, swh_storage_postgresql):
     storage_config = {
         'cls': 'local',
-        'args': {
-            'db': 'postgresql://{user}@{host}:{port}/{dbname}'.format(
-                host=postgresql_proc.host,
-                port=postgresql_proc.port,
-                user='postgres',
-                dbname='tests'),
-            'objstorage': {
-                'cls': 'memory',
-                'args': {}
-            },
-            'journal_writer': {
-                'cls': 'memory',
-            },
+        'db': 'postgresql://{user}@{host}:{port}/{dbname}'.format(
+            host=postgresql_proc.host,
+            port=postgresql_proc.port,
+            user='postgres',
+            dbname='tests'),
+        'objstorage': {
+            'cls': 'memory',
+            'args': {}
+        },
+        'journal_writer': {
+            'cls': 'memory',
         },
     }
     storage = swh.storage.get_storage(**storage_config)
     return storage


 @pytest.fixture
 def swh_contents(swh_storage):
     contents = gen_contents(n=20)
     swh_storage.content_add(contents)
     return contents


 @pytest.fixture
 def swh_origins(swh_storage):
     origins = gen_origins(n=100)
     swh_storage.origin_add(origins)
     return origins


 # the postgresql_fact factory fixture below is mostly a copy of the code
 # from pytest-postgresql. We need a custom version here to be able to
 # specify our version of the DBJanitor we use.
 def postgresql_fact(process_fixture_name, db_name=None,
                     dump_files=DUMP_FILES):
     @pytest.fixture
     def postgresql_factory(request):
         """Fixture factory for PostgreSQL.

         :param FixtureRequest request: fixture request object
         :rtype: psycopg2.connection
         :returns: postgresql client
         """
         config = factories.get_config(request)
         if not psycopg2:
             raise ImportError(
                 'No module named psycopg2. Please install it.'
             )
         proc_fixture = request.getfixturevalue(process_fixture_name)

         # _, config = try_import('psycopg2', request)
         pg_host = proc_fixture.host
         pg_port = proc_fixture.port
         pg_user = proc_fixture.user
         pg_options = proc_fixture.options
         pg_db = db_name or config['dbname']

         with SwhDatabaseJanitor(
                 pg_user, pg_host, pg_port, pg_db,
                 proc_fixture.version,
                 dump_files=dump_files
         ):
             connection = psycopg2.connect(
                 dbname=pg_db,
                 user=pg_user,
                 host=pg_host,
                 port=pg_port,
                 options=pg_options
             )
             yield connection
             connection.close()

     return postgresql_factory


 swh_storage_postgresql = postgresql_fact('postgresql_proc')
 # This version of the DatabaseJanitor implements a different setup/teardown
 # behavior from the stock one: instead of dropping, creating and
 # initializing the database for each test, it creates and initializes the
 # db only once, then truncates the tables between tests. This is needed to
 # keep test performance acceptable.
 class SwhDatabaseJanitor(DatabaseJanitor):
     def __init__(
             self,
             user: str,
             host: str,
             port: str,
             db_name: str,
             version: Union[str, float, Version],
             dump_files: str = DUMP_FILES
     ) -> None:
         super().__init__(user, host, port, db_name, version)
         self.dump_files = sorted(
             glob.glob(dump_files), key=sortkey)

     def db_setup(self):
         with psycopg2.connect(
             dbname=self.db_name,
             user=self.user,
             host=self.host,
             port=self.port,
         ) as cnx:
             with cnx.cursor() as cur:
                 for fname in self.dump_files:
                     with open(fname) as fobj:
                         sql = fobj.read().replace('concurrently', '').strip()
                         if sql:
                             cur.execute(sql)
             cnx.commit()

     def db_reset(self):
         with psycopg2.connect(
             dbname=self.db_name,
             user=self.user,
             host=self.host,
             port=self.port,
         ) as cnx:
             with cnx.cursor() as cur:
                 cur.execute(
                     "SELECT table_name FROM information_schema.tables "
                     "WHERE table_schema = %s", ('public',))
                 tables = set(table for (table,) in cur.fetchall())
                 for table in tables:
                     cur.execute('truncate table %s cascade' % table)

                 cur.execute(
                     "SELECT sequence_name FROM information_schema.sequences "
                     "WHERE sequence_schema = %s", ('public',))
                 seqs = set(seq for (seq,) in cur.fetchall())
                 for seq in seqs:
                     cur.execute('ALTER SEQUENCE %s RESTART;' % seq)
             cnx.commit()

     def init(self):
         with self.cursor() as cur:
             cur.execute(
                 "SELECT COUNT(1) FROM pg_database WHERE datname=%s;",
                 (self.db_name,))
             db_exists = cur.fetchone()[0] == 1
             if db_exists:
                 cur.execute(
                     'UPDATE pg_database SET datallowconn=true '
                     'WHERE datname = %s;',
                     (self.db_name,))

         if db_exists:
             self.db_reset()
         else:
             with self.cursor() as cur:
                 cur.execute('CREATE DATABASE "{}";'.format(self.db_name))
             self.db_setup()

     def drop(self):
         pid_column = 'pid'
         with self.cursor() as cur:
             cur.execute(
                 'UPDATE pg_database SET datallowconn=false '
                 'WHERE datname = %s;', (self.db_name,))
             cur.execute(
                 'SELECT pg_terminate_backend(pg_stat_activity.{})'
                 'FROM pg_stat_activity '
                 'WHERE pg_stat_activity.datname = %s;'.format(pid_column),
                 (self.db_name,))


 @pytest.fixture
 def sample_data() -> Dict:
     """Pre-defined sample storage object data to manipulate

     Returns:
         Dict of data (keys: content, directory, revision, person)

     """
     sample_content = {
         'blake2s256': b'\xbf?\x05\xed\xc1U\xd2\xc5\x168Xm\x93\xde}f(HO@\xd0\xacn\x04\x1e\x9a\xb9\xfa\xbf\xcc\x08\xc7',  # noqa
         'sha1': b'g\x15y+\xcb][\\\n\xf28\xb2\x0c_P[\xc8\x89Hk',
         'sha1_git': b'\xf2\xae\xfa\xba\xfa\xa6B\x9b^\xf9Z\xf5\x14\x0cna\xb0\xef\x8b',  # noqa
         'sha256': b"\x87\x022\xedZN\x84\xe8za\xf8'(oA\xc9k\xb1\x80c\x80\xe7J\x06\xea\xd2\xd5\xbeB\x19\xb8\xce",  # noqa
         'length': 48,
         'data': b'temp file for testing content storage conversion',
         'status': 'visible',
     }

     sample_content2 = {
         'blake2s256': b'\xbf?\x05\xed\xc1U\xd2\xc5\x168Xm\x93\xde}f(HO@\xd0\xacn\x04\x1e\x9a\xb9\xfa\xbf\xcc\x08\xc7',  # noqa
         'sha1': b'f\x15y+\xcb][\\\n\xf28\xb2\x0c_P[\xc8\x89Hk',
         'sha1_git': b'\xc2\xae\xfa\xba\xfa\xa6B\x9b^\xf9Z\xf5\x14\x0cna\xb0\xef\x8b',  # noqa
         'sha256': b"\x77\x022\xedZN\x84\xe8za\xf8'(oA\xc9k\xb1\x80c\x80\xe7J\x06\xea\xd2\xd5\xbeB\x19\xb8\xce",  # noqa
         'length': 50,
         'data': b'temp file for testing content storage conversion 2',
         'status': 'visible',
     }

     sample_directory = {
         'id': b'f\x15y+\xcb][\\\n\xf28\xb2\x0c_P[\xc8\x89Hk',
         'entries': []
     }

     sample_person = {
         'name': b'John Doe',
         'email': b'john.doe@institute.org',
         'fullname': b'John Doe <john.doe@institute.org>',
     }

     sample_revision = {
         'id': b'f\x15y+\xcb][\\\n\xf28\xb2\x0c_P[\xc8\x89Hk',
         'message': b'something',
         'author': sample_person,
         'committer': sample_person,
         'date': 1567591673,
         'committer_date': 1567591673,
         'type': 'tar',
         'directory': b'\xc2\xae\xfa\xba\xfa\xa6B\x9b^\xf9Z\xf5\x14\x0cna\xb0\xef\x8b',  # noqa
         'synthetic': False,
         'metadata': {},
         'parents': [],
     }

     return {
         'content': [sample_content, sample_content2],
         'person': [sample_person],
         'directory': [sample_directory],
         'revision': [sample_revision],
     }
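To show how the fixtures above are meant to compose, here is a hypothetical test sketch (not part of this patch); it assumes `content_add` accepts the `sample_data` content dicts as-is and reuses the `content_missing(..., key_hash='sha256')` call already present in `filter.py`:

    # Hypothetical sketch, for illustration only.
    def test_content_roundtrip(swh_storage, sample_data):
        contents = sample_data['content']
        swh_storage.content_add(contents)
        # Once added, none of the sample hashes should be reported missing.
        missing = list(swh_storage.content_missing(contents, key_hash='sha256'))
        assert missing == []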
diff --git a/swh/storage/tests/test_api_client.py b/swh/storage/tests/test_api_client.py
index 5b2d6648..1ec8b463 100644
--- a/swh/storage/tests/test_api_client.py
+++ b/swh/storage/tests/test_api_client.py
@@ -1,58 +1,56 @@
 # Copyright (C) 2015-2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import pytest

 from swh.storage.api.client import RemoteStorage
 import swh.storage.api.server as server
 import swh.storage.storage
 from swh.storage.tests.test_storage import (  # noqa
     TestStorage, TestStorageGeneratedData)

 # tests are executed using the imported classes (TestStorage and
 # TestStorageGeneratedData) with the overloaded swh_storage fixture
 # below


 @pytest.fixture
 def app():
     storage_config = {
         'cls': 'memory',
-        'args': {
-            'journal_writer': {
-                'cls': 'memory',
-            },
+        'journal_writer': {
+            'cls': 'memory',
         },
     }
     server.storage = swh.storage.get_storage(**storage_config)
     # hack hack hack!
     # We attach the journal storage to the app here to make it accessible to
     # the test (as swh_storage.journal_writer); see swh_storage below.
     server.app.journal_writer = server.storage.journal_writer
     yield server.app
     del server.app.journal_writer


 @pytest.fixture
 def swh_rpc_client_class():
     return RemoteStorage


 @pytest.fixture
 def swh_storage(swh_rpc_client, app):
     # This version of the swh_storage fixture uses the swh_rpc_client fixture
     # to instantiate a RemoteStorage (see swh_rpc_client_class above) that
     # proxies, via the swh.core RPC mechanism, the local (in memory) storage
     # configured in the app fixture above.
     #
     # Also note that, to make tests easier to write, the in-memory journal
     # writer of the in-memory backend storage is attached to the RemoteStorage
     # as its journal_writer attribute.
     storage = swh_rpc_client
     journal_writer = getattr(storage, 'journal_writer', None)
     storage.journal_writer = app.journal_writer
     yield storage
     storage.journal_writer = journal_writer
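The save/graft/restore dance around `journal_writer` in the fixture above is a general pattern for temporarily borrowing an attribute from another fixture. A standalone sketch of the same idea, with hypothetical names:

    from contextlib import contextmanager

    @contextmanager
    def grafted_journal_writer(storage, journal_writer):
        # Remember the original attribute, graft the replacement for the
        # duration of the block, then restore it on the way out, exactly
        # as the swh_storage fixture above does around its yield.
        original = getattr(storage, 'journal_writer', None)
        storage.journal_writer = journal_writer
        try:
            yield storage
        finally:
            storage.journal_writer = original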
diff --git a/swh/storage/tests/test_buffer.py b/swh/storage/tests/test_buffer.py
index d9a3fa06..1f35f999 100644
--- a/swh/storage/tests/test_buffer.py
+++ b/swh/storage/tests/test_buffer.py
@@ -1,179 +1,179 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from swh.storage.buffer import BufferingProxyStorage


 def test_buffering_proxy_storage_content_threshold_not_hit(sample_data):
     contents = sample_data['content']
     storage = BufferingProxyStorage(
-        storage={'cls': 'memory', 'args': {}},
+        storage={'cls': 'memory'},
         min_batch_size={
             'content': 10,
         }
     )
     s = storage.content_add([contents[0], contents[1]])
     assert s == {}

     # contents have not been written to storage
     missing_contents = storage.content_missing(
         [contents[0], contents[1]])
     assert set(missing_contents) == set(
         [contents[0]['sha1'], contents[1]['sha1']])

     s = storage.flush()
     assert s == {
         'content:add': 1 + 1,
         'content:add:bytes': contents[0]['length'] + contents[1]['length'],
         'skipped_content:add': 0
     }

     missing_contents = storage.content_missing(
         [contents[0], contents[1]])
     assert list(missing_contents) == []


 def test_buffering_proxy_storage_content_threshold_nb_hit(sample_data):
     contents = sample_data['content']
     storage = BufferingProxyStorage(
-        storage={'cls': 'memory', 'args': {}},
+        storage={'cls': 'memory'},
         min_batch_size={
             'content': 1,
         }
     )

     s = storage.content_add([contents[0]])
     assert s == {
         'content:add': 1,
         'content:add:bytes': contents[0]['length'],
         'skipped_content:add': 0
     }

     missing_contents = storage.content_missing([contents[0]])
     assert list(missing_contents) == []

     s = storage.flush()
     assert s == {}


 def test_buffering_proxy_storage_content_threshold_bytes_hit(sample_data):
     contents = sample_data['content']
     content_bytes_min_batch_size = 20
     storage = BufferingProxyStorage(
-        storage={'cls': 'memory', 'args': {}},
+        storage={'cls': 'memory'},
         min_batch_size={
             'content': 10,
             'content_bytes': content_bytes_min_batch_size,
         }
     )

     assert contents[0]['length'] > content_bytes_min_batch_size

     s = storage.content_add([contents[0]])
     assert s == {
         'content:add': 1,
         'content:add:bytes': contents[0]['length'],
         'skipped_content:add': 0
     }

     missing_contents = storage.content_missing([contents[0]])
     assert list(missing_contents) == []

     s = storage.flush()
     assert s == {}


 def test_buffering_proxy_storage_directory_threshold_not_hit(sample_data):
     directories = sample_data['directory']
     storage = BufferingProxyStorage(
-        storage={'cls': 'memory', 'args': {}},
+        storage={'cls': 'memory'},
         min_batch_size={
             'directory': 10,
         }
     )
     s = storage.directory_add([directories[0]])
     assert s == {}

     directory_id = directories[0]['id']
     missing_directories = storage.directory_missing(
         [directory_id])
     assert list(missing_directories) == [directory_id]

     s = storage.flush()
     assert s == {
         'directory:add': 1,
     }

     missing_directories = storage.directory_missing(
         [directory_id])
     assert list(missing_directories) == []


 def test_buffering_proxy_storage_directory_threshold_hit(sample_data):
     directories = sample_data['directory']
     storage = BufferingProxyStorage(
-        storage={'cls': 'memory', 'args': {}},
+        storage={'cls': 'memory'},
         min_batch_size={
             'directory': 1,
         }
     )
     s = storage.directory_add([directories[0]])
     assert s == {
         'directory:add': 1,
     }

     missing_directories = storage.directory_missing(
         [directories[0]['id']])
     assert list(missing_directories) == []

     s = storage.flush()
     assert s == {}


 def test_buffering_proxy_storage_revision_threshold_not_hit(sample_data):
     revisions = sample_data['revision']
     storage = BufferingProxyStorage(
-        storage={'cls': 'memory', 'args': {}},
+        storage={'cls': 'memory'},
         min_batch_size={
             'revision': 10,
         }
     )
     s = storage.revision_add([revisions[0]])
     assert s == {}

     revision_id = revisions[0]['id']
     missing_revisions = storage.revision_missing(
         [revision_id])
     assert list(missing_revisions) == [revision_id]

     s = storage.flush()
     assert s == {
         'revision:add': 1,
     }

     missing_revisions = storage.revision_missing(
         [revision_id])
     assert list(missing_revisions) == []


 def test_buffering_proxy_storage_revision_threshold_hit(sample_data):
     revisions = sample_data['revision']
     storage = BufferingProxyStorage(
-        storage={'cls': 'memory', 'args': {}},
+        storage={'cls': 'memory'},
         min_batch_size={
             'revision': 1,
         }
     )
     s = storage.revision_add([revisions[0]])
     assert s == {
         'revision:add': 1,
     }

     missing_revisions = storage.revision_missing(
         [revisions[0]['id']])
     assert list(missing_revisions) == []

     s = storage.flush()
     assert s == {}
diff --git a/swh/storage/tests/test_filter.py b/swh/storage/tests/test_filter.py
index 80f3a3e9..83fc5862 100644
--- a/swh/storage/tests/test_filter.py
+++ b/swh/storage/tests/test_filter.py
@@ -1,74 +1,74 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from swh.storage.filter import FilteringProxyStorage


 def test_filtering_proxy_storage_content(sample_data):
     sample_content = sample_data['content'][0]
-    storage = FilteringProxyStorage(storage={'cls': 'memory', 'args': {}})
+    storage = FilteringProxyStorage(storage={'cls': 'memory'})

     content = next(storage.content_get([sample_content['sha1']]))
     assert not content

     s = storage.content_add([sample_content])
     assert s == {
         'content:add': 1,
         'content:add:bytes': 48,
         'skipped_content:add': 0
     }

     content = next(storage.content_get([sample_content['sha1']]))
     assert content is not None

     s = storage.content_add([sample_content])
     assert s == {
         'content:add': 0,
         'content:add:bytes': 0,
         'skipped_content:add': 0
     }


 def test_filtering_proxy_storage_revision(sample_data):
     sample_revision = sample_data['revision'][0]
-    storage = FilteringProxyStorage(storage={'cls': 'memory', 'args': {}})
+    storage = FilteringProxyStorage(storage={'cls': 'memory'})

     revision = next(storage.revision_get([sample_revision['id']]))
     assert not revision

     s = storage.revision_add([sample_revision])
     assert s == {
         'revision:add': 1,
     }

     revision = next(storage.revision_get([sample_revision['id']]))
     assert revision is not None

     s = storage.revision_add([sample_revision])
     assert s == {
         'revision:add': 0,
     }


 def test_filtering_proxy_storage_directory(sample_data):
     sample_directory = sample_data['directory'][0]
-    storage = FilteringProxyStorage(storage={'cls': 'memory', 'args': {}})
+    storage = FilteringProxyStorage(storage={'cls': 'memory'})

     directory = next(storage.directory_missing([sample_directory['id']]))
     assert directory

     s = storage.directory_add([sample_directory])
     assert s == {
         'directory:add': 1,
     }

     directory = list(storage.directory_missing([sample_directory['id']]))
     assert not directory

     s = storage.directory_add([sample_directory])
     assert s == {
         'directory:add': 0,
     }
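The `test_buffer.py` and `test_filter.py` hunks above are pure configuration changes: `BufferingProxyStorage` and `FilteringProxyStorage` now receive the wrapped storage's description without the `args` wrapper, while the threshold and filtering semantics are untouched. A condensed sketch of the buffer/flush cycle those tests exercise, with a hypothetical directory id:

    from swh.storage.buffer import BufferingProxyStorage

    storage = BufferingProxyStorage(
        storage={'cls': 'memory'},         # new flattened form
        min_batch_size={'directory': 10},
    )
    d = {'id': b'\x02' * 20, 'entries': []}  # hypothetical id

    assert storage.directory_add([d]) == {}         # buffered, below threshold
    assert storage.flush() == {'directory:add': 1}  # explicit flush writes it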
diff --git a/swh/storage/tests/test_in_memory.py b/swh/storage/tests/test_in_memory.py
index 0fd7c947..ce6ecad2 100644
--- a/swh/storage/tests/test_in_memory.py
+++ b/swh/storage/tests/test_in_memory.py
@@ -1,28 +1,26 @@
 # Copyright (C) 2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import pytest

 from swh.storage import get_storage
 from swh.storage.tests.test_storage import (  # noqa
     TestStorage, TestStorageGeneratedData)

 # tests are executed using the imported classes (TestStorage and
 # TestStorageGeneratedData) with the overloaded swh_storage fixture
 # below


 @pytest.fixture
 def swh_storage():
     storage_config = {
         'cls': 'memory',
-        'args': {
-            'journal_writer': {
-                'cls': 'memory',
-            },
+        'journal_writer': {
+            'cls': 'memory',
         },
     }
     storage = get_storage(**storage_config)
     return storage
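Taken together, the fixture updates rely on `get_storage` accepting the backend's parameters as direct keyword arguments next to `cls`. Under that reading of the patch, the removed and added spellings compare as follows:

    from swh.storage import get_storage

    # Old form, removed by this patch:
    #   get_storage(cls='memory', args={'journal_writer': {'cls': 'memory'}})

    # New flattened form, as used in the updated fixtures:
    storage = get_storage(cls='memory', journal_writer={'cls': 'memory'})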