diff --git a/swh/storage/buffer.py b/swh/storage/buffer.py index 5ebe657b..04ac3b13 100644 --- a/swh/storage/buffer.py +++ b/swh/storage/buffer.py @@ -1,117 +1,129 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from collections import deque from functools import partial -from typing import Optional, Iterable, Dict +from typing import Dict, Iterable, List, Optional from swh.core.utils import grouper from swh.model.model import Content, BaseModel from swh.storage import get_storage class BufferingProxyStorage: """Storage implementation in charge of accumulating objects prior to sending them to the "main" storage. Sample configuration use case for buffering storage: .. code-block:: yaml storage: cls: buffer args: storage: cls: remote args: http://storage.internal.staging.swh.network:5002/ min_batch_size: content: 10000 content_bytes: 100000000 skipped_content: 10000 directory: 5000 revision: 1000 release: 10000 """ def __init__(self, storage, min_batch_size=None): self.storage = get_storage(**storage) if min_batch_size is None: min_batch_size = {} self.min_batch_size = { 'content': min_batch_size.get('content', 10000), 'content_bytes': min_batch_size.get('content_bytes', 100*1024*1024), 'skipped_content': min_batch_size.get('skipped_content', 10000), 'directory': min_batch_size.get('directory', 25000), 'revision': min_batch_size.get('revision', 100000), 'release': min_batch_size.get('release', 100000), } self.object_types = [ 'content', 'skipped_content', 'directory', 'revision', 'release'] - self._objects = {k: deque() for k in self.object_types} + self._objects = {k: {} for k in self.object_types} def __getattr__(self, key): if key.endswith('_add'): object_type = key.rsplit('_', 1)[0] if object_type in self.object_types: return partial( - self.object_add, object_type=object_type + self.object_add, object_type=object_type, + keys=['id'], ) if key == 'storage': raise AttributeError(key) return getattr(self.storage, key) def content_add(self, content: Iterable[Content]) -> Dict: """Enqueue contents to write to the storage. The following policies apply: - First, check if the queue's threshold is hit. If it is, flush contents to the storage. - If not, check if the total size of the enqueued contents hits the size threshold. If it does, flush contents to the storage. 
""" content = list(content) - s = self.object_add(content, object_type='content') + s = self.object_add( + content, object_type='content', + keys=['sha1', 'sha1_git', 'sha256', 'blake2s256']) if not s: - q = self._objects['content'] - total_size = sum(c.length for c in q) + buffer_ = self._objects['content'].values() + total_size = sum(c.length for c in buffer_) if total_size >= self.min_batch_size['content_bytes']: return self.flush(['content']) return s + def skipped_content_add(self, content: Iterable[Content]) -> Dict: + return self.object_add( + content, object_type='skipped_content', + keys=['sha1', 'sha1_git', 'sha256', 'blake2s256']) + def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict: if object_types is None: object_types = self.object_types summary = {} # type: Dict[str, Dict] for object_type in object_types: - q = self._objects[object_type] - for objs in grouper(q, n=self.min_batch_size[object_type]): + buffer_ = self._objects[object_type] + batches = grouper( + buffer_.values(), n=self.min_batch_size[object_type]) + for batch in batches: add_fn = getattr(self.storage, '%s_add' % object_type) - s = add_fn(objs) + s = add_fn(batch) summary = {k: v + summary.get(k, 0) for k, v in s.items()} - q.clear() + buffer_.clear() return summary def object_add( - self, objects: Iterable[BaseModel], *, object_type: str) -> Dict: + self, objects: Iterable[BaseModel], *, + object_type: str, keys: List[str]) -> Dict: """Enqueue objects to write to the storage. This checks if the queue's threshold is hit. If it is actually write those to the storage. """ - q = self._objects[object_type] + buffer_ = self._objects[object_type] threshold = self.min_batch_size[object_type] - q.extend(objects) - if len(q) >= threshold: + for obj in objects: + obj_key = tuple(getattr(obj, key) for key in keys) + buffer_[obj_key] = obj + if len(buffer_) >= threshold: return self.flush() return {} diff --git a/swh/storage/tests/conftest.py b/swh/storage/tests/conftest.py index 5bdcfa8e..5449d01d 100644 --- a/swh/storage/tests/conftest.py +++ b/swh/storage/tests/conftest.py @@ -1,243 +1,243 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import glob import pytest from typing import Union from pytest_postgresql import factories from pytest_postgresql.janitor import DatabaseJanitor, psycopg2, Version from os import path, environ from hypothesis import settings from typing import Dict import swh.storage from swh.core.utils import numfile_sortkey as sortkey from swh.model.tests.generate_testdata import gen_contents, gen_origins SQL_DIR = path.join(path.dirname(swh.storage.__file__), 'sql') environ['LC_ALL'] = 'C.UTF-8' DUMP_FILES = path.join(SQL_DIR, '*.sql') # define tests profile. 
Full documentation is at: # https://hypothesis.readthedocs.io/en/latest/settings.html#settings-profiles settings.register_profile("fast", max_examples=5, deadline=5000) settings.register_profile("slow", max_examples=20, deadline=5000) @pytest.fixture def swh_storage_backend_config(postgresql_proc, swh_storage_postgresql): yield { 'cls': 'local', 'db': 'postgresql://{user}@{host}:{port}/{dbname}'.format( host=postgresql_proc.host, port=postgresql_proc.port, user='postgres', dbname='tests'), 'objstorage': { 'cls': 'memory', 'args': {} }, 'journal_writer': { 'cls': 'memory', }, } @pytest.fixture def swh_storage(swh_storage_backend_config): storage_config = { 'cls': 'validate', 'storage': swh_storage_backend_config } storage = swh.storage.get_storage(**storage_config) return storage @pytest.fixture def swh_contents(swh_storage): contents = gen_contents(n=20) swh_storage.content_add( [c for c in contents if c['status'] != 'absent']) swh_storage.skipped_content_add( [c for c in contents if c['status'] == 'absent']) return contents @pytest.fixture def swh_origins(swh_storage): origins = gen_origins(n=100) swh_storage.origin_add(origins) return origins # the postgresql_fact factory fixture below is mostly a copy of the code # from pytest-postgresql. We need a custom version here to be able to # specify our version of the DBJanitor we use. def postgresql_fact(process_fixture_name, db_name=None, dump_files=DUMP_FILES): @pytest.fixture def postgresql_factory(request): """ Fixture factory for PostgreSQL. :param FixtureRequest request: fixture request object :rtype: psycopg2.connection :returns: postgresql client """ config = factories.get_config(request) if not psycopg2: raise ImportError( 'No module named psycopg2. Please install it.' ) proc_fixture = request.getfixturevalue(process_fixture_name) # _, config = try_import('psycopg2', request) pg_host = proc_fixture.host pg_port = proc_fixture.port pg_user = proc_fixture.user pg_options = proc_fixture.options pg_db = db_name or config['dbname'] with SwhDatabaseJanitor( pg_user, pg_host, pg_port, pg_db, proc_fixture.version, dump_files=dump_files ): connection = psycopg2.connect( dbname=pg_db, user=pg_user, host=pg_host, port=pg_port, options=pg_options ) yield connection connection.close() return postgresql_factory swh_storage_postgresql = postgresql_fact('postgresql_proc') # This version of the DatabaseJanitor implements a different setup/teardown # behavior than the stock one: instead of dropping, creating and # initializing the database for each test, it creates and initializes the db only # once, then truncates the tables. This is needed to keep test # performance acceptable. 
class SwhDatabaseJanitor(DatabaseJanitor): def __init__( self, user: str, host: str, port: str, db_name: str, version: Union[str, float, Version], dump_files: str = DUMP_FILES ) -> None: super().__init__(user, host, port, db_name, version) self.dump_files = sorted( glob.glob(dump_files), key=sortkey) def db_setup(self): with psycopg2.connect( dbname=self.db_name, user=self.user, host=self.host, port=self.port, ) as cnx: with cnx.cursor() as cur: for fname in self.dump_files: with open(fname) as fobj: sql = fobj.read().replace('concurrently', '').strip() if sql: cur.execute(sql) cnx.commit() def db_reset(self): with psycopg2.connect( dbname=self.db_name, user=self.user, host=self.host, port=self.port, ) as cnx: with cnx.cursor() as cur: cur.execute( "SELECT table_name FROM information_schema.tables " "WHERE table_schema = %s", ('public',)) tables = set(table for (table,) in cur.fetchall()) for table in tables: cur.execute('truncate table %s cascade' % table) cur.execute( "SELECT sequence_name FROM information_schema.sequences " "WHERE sequence_schema = %s", ('public',)) seqs = set(seq for (seq,) in cur.fetchall()) for seq in seqs: cur.execute('ALTER SEQUENCE %s RESTART;' % seq) cnx.commit() def init(self): with self.cursor() as cur: cur.execute( "SELECT COUNT(1) FROM pg_database WHERE datname=%s;", (self.db_name,)) db_exists = cur.fetchone()[0] == 1 if db_exists: cur.execute( 'UPDATE pg_database SET datallowconn=true ' 'WHERE datname = %s;', (self.db_name,)) if db_exists: self.db_reset() else: with self.cursor() as cur: cur.execute('CREATE DATABASE "{}";'.format(self.db_name)) self.db_setup() def drop(self): pid_column = 'pid' with self.cursor() as cur: cur.execute( 'UPDATE pg_database SET datallowconn=false ' 'WHERE datname = %s;', (self.db_name,)) cur.execute( 'SELECT pg_terminate_backend(pg_stat_activity.{})' 'FROM pg_stat_activity ' 'WHERE pg_stat_activity.datname = %s;'.format(pid_column), (self.db_name,)) @pytest.fixture def sample_data() -> Dict: """Pre-defined sample storage object data to manipulate Returns: Dict of data (keys: content, directory, revision, release, person, origin) """ from .storage_data import data return { 'content': [data.cont, data.cont2], 'content_metadata': [data.cont3], 'skipped_content': [data.skipped_cont, data.skipped_cont2], 'person': [data.person], 'directory': [data.dir2, data.dir], - 'revision': [data.revision], + 'revision': [data.revision, data.revision2, data.revision3], 'release': [data.release, data.release2, data.release3], 'snapshot': [data.snapshot], 'origin': [data.origin, data.origin2], 'tool': [data.metadata_tool], 'provider': [data.provider], 'origin_metadata': [data.origin_metadata, data.origin_metadata2], } diff --git a/swh/storage/tests/test_buffer.py b/swh/storage/tests/test_buffer.py index 41f4bbad..aceab756 100644 --- a/swh/storage/tests/test_buffer.py +++ b/swh/storage/tests/test_buffer.py @@ -1,279 +1,415 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.storage import get_storage def get_storage_with_buffer_config(**buffer_config): storage_config = { 'cls': 'pipeline', 'steps': [ {'cls': 'validate'}, {'cls': 'buffer', **buffer_config}, {'cls': 'memory'}, ] } return get_storage(**storage_config) def test_buffering_proxy_storage_content_threshold_not_hit(sample_data): contents = sample_data['content'] storage = 
get_storage_with_buffer_config( min_batch_size={ 'content': 10, } ) s = storage.content_add([contents[0], contents[1]]) assert s == {} # contents have not been written to storage missing_contents = storage.content_missing( [contents[0], contents[1]]) assert set(missing_contents) == set( [contents[0]['sha1'], contents[1]['sha1']]) s = storage.flush() assert s == { 'content:add': 1 + 1, 'content:add:bytes': contents[0]['length'] + contents[1]['length'], } missing_contents = storage.content_missing( [contents[0], contents[1]]) assert list(missing_contents) == [] def test_buffering_proxy_storage_content_threshold_nb_hit(sample_data): contents = sample_data['content'] storage = get_storage_with_buffer_config( min_batch_size={ 'content': 1, } ) s = storage.content_add([contents[0]]) assert s == { 'content:add': 1, 'content:add:bytes': contents[0]['length'], } missing_contents = storage.content_missing([contents[0]]) assert list(missing_contents) == [] s = storage.flush() assert s == {} +def test_buffering_proxy_storage_content_deduplicate(sample_data): + contents = sample_data['content'] + storage = get_storage_with_buffer_config( + min_batch_size={ + 'content': 2, + } + ) + + s = storage.content_add([contents[0], contents[0]]) + assert s == {} + + s = storage.content_add([contents[0]]) + assert s == {} + + s = storage.content_add([contents[1]]) + assert s == { + 'content:add': 1 + 1, + 'content:add:bytes': contents[0]['length'] + contents[1]['length'], + } + + missing_contents = storage.content_missing( + [contents[0], contents[1]]) + assert list(missing_contents) == [] + + s = storage.flush() + assert s == {} + + def test_buffering_proxy_storage_content_threshold_bytes_hit(sample_data): contents = sample_data['content'] content_bytes_min_batch_size = 2 storage = get_storage_with_buffer_config( min_batch_size={ 'content': 10, 'content_bytes': content_bytes_min_batch_size, } ) assert contents[0]['length'] > content_bytes_min_batch_size s = storage.content_add([contents[0]]) assert s == { 'content:add': 1, 'content:add:bytes': contents[0]['length'], } missing_contents = storage.content_missing([contents[0]]) assert list(missing_contents) == [] s = storage.flush() assert s == {} def test_buffering_proxy_storage_skipped_content_threshold_not_hit( sample_data): contents = sample_data['skipped_content'] storage = get_storage_with_buffer_config( min_batch_size={ 'skipped_content': 10, } ) s = storage.skipped_content_add([contents[0], contents[1]]) assert s == {} # contents have not been written to storage missing_contents = storage.skipped_content_missing( [contents[0], contents[1]]) assert {c['sha1'] for c in missing_contents} \ == {c['sha1'] for c in contents} s = storage.flush() assert s == { 'skipped_content:add': 1 + 1 } missing_contents = storage.skipped_content_missing( [contents[0], contents[1]]) assert list(missing_contents) == [] def test_buffering_proxy_storage_skipped_content_threshold_nb_hit(sample_data): contents = sample_data['skipped_content'] storage = get_storage_with_buffer_config( min_batch_size={ 'skipped_content': 1, } ) s = storage.skipped_content_add([contents[0]]) assert s == { 'skipped_content:add': 1 } missing_contents = storage.skipped_content_missing([contents[0]]) assert list(missing_contents) == [] s = storage.flush() assert s == {} +def test_buffering_proxy_storage_skipped_content_deduplicate(sample_data): + contents = sample_data['skipped_content'] + storage = get_storage_with_buffer_config( + min_batch_size={ + 'skipped_content': 2, + } + ) + + s = 
storage.skipped_content_add([contents[0], contents[0]]) + assert s == {} + + s = storage.skipped_content_add([contents[0]]) + assert s == {} + + s = storage.skipped_content_add([contents[1]]) + assert s == { + 'skipped_content:add': 1 + 1, + } + + missing_contents = storage.skipped_content_missing( + [contents[0], contents[1]]) + assert list(missing_contents) == [] + + s = storage.flush() + assert s == {} + + def test_buffering_proxy_storage_directory_threshold_not_hit(sample_data): directories = sample_data['directory'] storage = get_storage_with_buffer_config( min_batch_size={ 'directory': 10, } ) s = storage.directory_add([directories[0]]) assert s == {} directory_id = directories[0]['id'] missing_directories = storage.directory_missing( [directory_id]) assert list(missing_directories) == [directory_id] s = storage.flush() assert s == { 'directory:add': 1, } missing_directories = storage.directory_missing( [directory_id]) assert list(missing_directories) == [] def test_buffering_proxy_storage_directory_threshold_hit(sample_data): directories = sample_data['directory'] storage = get_storage_with_buffer_config( min_batch_size={ 'directory': 1, } ) s = storage.directory_add([directories[0]]) assert s == { 'directory:add': 1, } missing_directories = storage.directory_missing( [directories[0]['id']]) assert list(missing_directories) == [] s = storage.flush() assert s == {} +def test_buffering_proxy_storage_directory_deduplicate(sample_data): + directories = sample_data['directory'] + storage = get_storage_with_buffer_config( + min_batch_size={ + 'directory': 2, + } + ) + + s = storage.directory_add([directories[0], directories[0]]) + assert s == {} + + s = storage.directory_add([directories[0]]) + assert s == {} + + s = storage.directory_add([directories[1]]) + assert s == { + 'directory:add': 1 + 1, + } + + missing_directories = storage.directory_missing( + [directories[0]['id'], directories[1]['id']]) + assert list(missing_directories) == [] + + s = storage.flush() + assert s == {} + + def test_buffering_proxy_storage_revision_threshold_not_hit(sample_data): revisions = sample_data['revision'] storage = get_storage_with_buffer_config( min_batch_size={ 'revision': 10, } ) s = storage.revision_add([revisions[0]]) assert s == {} revision_id = revisions[0]['id'] missing_revisions = storage.revision_missing( [revision_id]) assert list(missing_revisions) == [revision_id] s = storage.flush() assert s == { 'revision:add': 1, } missing_revisions = storage.revision_missing( [revision_id]) assert list(missing_revisions) == [] def test_buffering_proxy_storage_revision_threshold_hit(sample_data): revisions = sample_data['revision'] storage = get_storage_with_buffer_config( min_batch_size={ 'revision': 1, } ) s = storage.revision_add([revisions[0]]) assert s == { 'revision:add': 1, } missing_revisions = storage.revision_missing( [revisions[0]['id']]) assert list(missing_revisions) == [] s = storage.flush() assert s == {} +def test_buffering_proxy_storage_revision_deduplicate(sample_data): + revisions = sample_data['revision'] + storage = get_storage_with_buffer_config( + min_batch_size={ + 'revision': 2, + } + ) + + s = storage.revision_add([revisions[0], revisions[0]]) + assert s == {} + + s = storage.revision_add([revisions[0]]) + assert s == {} + + s = storage.revision_add([revisions[1]]) + assert s == { + 'revision:add': 1 + 1, + } + + missing_revisions = storage.revision_missing( + [revisions[0]['id'], revisions[1]['id']]) + assert list(missing_revisions) == [] + + s = storage.flush() + assert s == 
{} + + def test_buffering_proxy_storage_release_threshold_not_hit(sample_data): releases = sample_data['release'] threshold = 10 assert len(releases) < threshold storage = get_storage_with_buffer_config( min_batch_size={ 'release': threshold, # configuration set } ) s = storage.release_add(releases) assert s == {} release_ids = [r['id'] for r in releases] missing_releases = storage.release_missing(release_ids) assert list(missing_releases) == release_ids s = storage.flush() assert s == { 'release:add': len(releases), } missing_releases = storage.release_missing(release_ids) assert list(missing_releases) == [] def test_buffering_proxy_storage_release_threshold_hit(sample_data): releases = sample_data['release'] threshold = 2 assert len(releases) > threshold storage = get_storage_with_buffer_config( min_batch_size={ 'release': threshold, # configuration set } ) s = storage.release_add(releases) assert s == { 'release:add': len(releases), } release_ids = [r['id'] for r in releases] missing_releases = storage.release_missing(release_ids) assert list(missing_releases) == [] s = storage.flush() assert s == {} + + +def test_buffering_proxy_storage_release_deduplicate(sample_data): + releases = sample_data['release'] + storage = get_storage_with_buffer_config( + min_batch_size={ + 'release': 2, + } + ) + + s = storage.release_add([releases[0], releases[0]]) + assert s == {} + + s = storage.release_add([releases[0]]) + assert s == {} + + s = storage.release_add([releases[1]]) + assert s == { + 'release:add': 1 + 1, + } + + missing_releases = storage.release_missing( + [releases[0]['id'], releases[1]['id']]) + assert list(missing_releases) == [] + + s = storage.flush() + assert s == {}
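
Illustration (not part of the patch): the central change in buffer.py is replacing the per-type deque with a dict keyed on a tuple of identifying attributes ('id' for most object types, the four hashes for content and skipped_content), so re-adding an already buffered object is a no-op until the next flush. The sketch below is a minimal, self-contained rendering of that idea with a count threshold and a byte-size threshold; KeyedBuffer and its plain-dict objects are illustrative stand-ins, not the BufferingProxyStorage API.

from typing import Dict, Iterable, List, Tuple


class KeyedBuffer:
    """Illustrative stand-in for the buffering proxy: objects are keyed on a
    tuple of identifying attributes, so duplicates collapse before flushing."""

    def __init__(self, threshold: int = 2, byte_threshold: int = 1000) -> None:
        self.threshold = threshold            # max number of distinct buffered objects
        self.byte_threshold = byte_threshold  # max total 'length' for contents
        self._buffer: Dict[Tuple, dict] = {}  # key tuple -> buffered object
        self.written: List[dict] = []         # stands in for the backend storage

    def add(self, objects: Iterable[dict], keys: List[str]) -> Dict:
        for obj in objects:
            # Re-adding an object with the same key overwrites the previous
            # entry, so each distinct object is written at most once per flush.
            self._buffer[tuple(obj[k] for k in keys)] = obj
        if len(self._buffer) >= self.threshold:
            return self.flush()
        # Secondary, size-based threshold, as content_add does with .length.
        if sum(o.get('length', 0) for o in self._buffer.values()) >= self.byte_threshold:
            return self.flush()
        return {}

    def flush(self) -> Dict:
        batch = list(self._buffer.values())   # the real code calls storage.<type>_add here
        self.written.extend(batch)
        self._buffer.clear()
        return {'object:add': len(batch)} if batch else {}


buf = KeyedBuffer(threshold=2, byte_threshold=1000)
print(buf.add([{'sha1': 'a', 'length': 10}, {'sha1': 'a', 'length': 10}], keys=['sha1']))  # {} (deduplicated, below threshold)
print(buf.add([{'sha1': 'b', 'length': 10}], keys=['sha1']))                               # {'object:add': 2}

Keeping the buffer as a dict rather than a deque makes deduplication a side effect of insertion and keeps the threshold check a simple length test, which is what the new *_deduplicate tests above exercise.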
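
Illustration (not part of the patch): the tests build the buffered storage through the 'pipeline' configuration, and since writes below the thresholds stay buffered, a loading run has to end with an explicit flush(). A usage sketch, assuming the same pipeline factory the tests use; the threshold values are purely illustrative.

from swh.storage import get_storage

# Same shape as get_storage_with_buffer_config() in the tests; 'memory'
# stands in here for whatever backend ('remote', 'local', ...) is deployed.
storage = get_storage(**{
    'cls': 'pipeline',
    'steps': [
        {'cls': 'validate'},
        {'cls': 'buffer', 'min_batch_size': {
            'content': 10000,
            'content_bytes': 100 * 1024 * 1024,
            'directory': 1000,
            'revision': 1000,
        }},
        {'cls': 'memory'},
    ],
})

# A loader then calls storage.content_add(), storage.directory_add(), etc.
# Writes below the thresholds stay buffered (and deduplicated), so the run
# must end with an explicit flush to persist whatever is still pending:
summary = storage.flush()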
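
Illustration (not part of the patch): SwhDatabaseJanitor in conftest.py initializes the database once and then only truncates tables and restarts sequences between tests, which is much cheaper than dropping and recreating the database each time. The standalone helper below restates that reset strategy for reuse elsewhere; reset_test_database is a hypothetical name, and it assumes, like the original db_reset, that everything listed in information_schema.tables for the public schema can be truncated.

import psycopg2


def reset_test_database(dbname: str, user: str, host: str, port: int) -> None:
    """Hypothetical helper: empty all tables of an existing test database
    instead of dropping and re-creating it (the db_reset strategy above)."""
    with psycopg2.connect(dbname=dbname, user=user, host=host, port=port) as cnx:
        with cnx.cursor() as cur:
            # Truncate every table in the public schema.
            cur.execute(
                "SELECT table_name FROM information_schema.tables "
                "WHERE table_schema = %s", ('public',))
            for (table,) in cur.fetchall():
                cur.execute('TRUNCATE TABLE %s CASCADE' % table)
            # Restart sequences so generated ids start from 1 again.
            cur.execute(
                "SELECT sequence_name FROM information_schema.sequences "
                "WHERE sequence_schema = %s", ('public',))
            for (seq,) in cur.fetchall():
                cur.execute('ALTER SEQUENCE %s RESTART' % seq)
        cnx.commit()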