diff --git a/conftest.py b/conftest.py index eb6de3d3..32d44967 100644 --- a/conftest.py +++ b/conftest.py @@ -1,6 +1,76 @@ +# Copyright (C) 2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import pytest + from hypothesis import settings +from typing import Dict + # define tests profile. Full documentation is at: # https://hypothesis.readthedocs.io/en/latest/settings.html#settings-profiles settings.register_profile("fast", max_examples=5, deadline=5000) settings.register_profile("slow", max_examples=20, deadline=5000) + + +@pytest.fixture +def sample_data() -> Dict: + """Pre-defined sample storage object data to manipulate + + Returns: + Dict of data (keys: content, directory, revision, person) + + """ + sample_content = { + 'blake2s256': b'\xbf?\x05\xed\xc1U\xd2\xc5\x168Xm\x93\xde}f(HO@\xd0\xacn\x04\x1e\x9a\xb9\xfa\xbf\xcc\x08\xc7', # noqa + 'sha1': b'g\x15y+\xcb][\\\n\xf28\xb2\x0c_P[\xc8\x89Hk', + 'sha1_git': b'\xf2\xae\xfa\xba\xfa\xa6B\x9b^\xf9Z\xf5\x14\x0cna\xb0\xef\x8b', # noqa + 'sha256': b"\x87\x022\xedZN\x84\xe8za\xf8'(oA\xc9k\xb1\x80c\x80\xe7J\x06\xea\xd2\xd5\xbeB\x19\xb8\xce", # noqa + 'length': 48, + 'data': b'temp file for testing content storage conversion', + 'status': 'visible', + } + + sample_content2 = { + 'blake2s256': b'\xbf?\x05\xed\xc1U\xd2\xc5\x168Xm\x93\xde}f(HO@\xd0\xacn\x04\x1e\x9a\xb9\xfa\xbf\xcc\x08\xc7', # noqa + 'sha1': b'f\x15y+\xcb][\\\n\xf28\xb2\x0c_P[\xc8\x89Hk', + 'sha1_git': b'\xc2\xae\xfa\xba\xfa\xa6B\x9b^\xf9Z\xf5\x14\x0cna\xb0\xef\x8b', # noqa + 'sha256': b"\x77\x022\xedZN\x84\xe8za\xf8'(oA\xc9k\xb1\x80c\x80\xe7J\x06\xea\xd2\xd5\xbeB\x19\xb8\xce", # noqa + 'length': 50, + 'data': b'temp file for testing content storage conversion 2', + 'status': 'visible', + } + + sample_directory = { + 'id': b'f\x15y+\xcb][\\\n\xf28\xb2\x0c_P[\xc8\x89Hk', + 'entries': [] + } + + sample_person = { + 'name': b'John Doe', + 'email': b'john.doe@institute.org', + 'fullname': b'John Doe ' + } + + sample_revision = { + 'id': b'f\x15y+\xcb][\\\n\xf28\xb2\x0c_P[\xc8\x89Hk', + 'message': b'something', + 'author': sample_person, + 'committer': sample_person, + 'date': 1567591673, + 'committer_date': 1567591673, + 'type': 'tar', + 'directory': b'\xc2\xae\xfa\xba\xfa\xa6B\x9b^\xf9Z\xf5\x14\x0cna\xb0\xef\x8b', # noqa + 'synthetic': False, + 'metadata': {}, + 'parents': [], + } + + return { + 'content': [sample_content, sample_content2], + 'person': [sample_person], + 'directory': [sample_directory], + 'revision': [sample_revision], + } diff --git a/swh/storage/__init__.py b/swh/storage/__init__.py index 25685fc8..aef1d384 100644 --- a/swh/storage/__init__.py +++ b/swh/storage/__init__.py @@ -1,46 +1,48 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from . import storage Storage = storage.Storage class HashCollision(Exception): pass -STORAGE_IMPLEMENTATION = {'local', 'remote', 'memory'} +STORAGE_IMPLEMENTATION = {'local', 'remote', 'memory', 'filter'} def get_storage(cls, args): """Get a storage object of class `storage_class` with arguments `storage_args`. Args: storage (dict): dictionary with keys: - - cls (str): storage's class, either local, remote, memory + - cls (str): storage's class, either local, remote, memory, filter - args (dict): dictionary with keys Returns: an instance of swh.storage.Storage (either local or remote) Raises: ValueError if passed an unknown storage class. """ if cls not in STORAGE_IMPLEMENTATION: raise ValueError('Unknown storage class `%s`. Supported: %s' % ( cls, ', '.join(STORAGE_IMPLEMENTATION))) if cls == 'remote': from .api.client import RemoteStorage as Storage elif cls == 'local': from .storage import Storage elif cls == 'memory': from .in_memory import Storage + elif cls == 'filter': + from .filter import FilteringProxyStorage as Storage return Storage(**args) diff --git a/swh/storage/filter.py b/swh/storage/filter.py new file mode 100644 index 00000000..813c8cf8 --- /dev/null +++ b/swh/storage/filter.py @@ -0,0 +1,116 @@ +# Copyright (C) 2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + + +from typing import Dict, Generator, Sequence, Set + +from swh.storage import get_storage + + +class FilteringProxyStorage: + """Filtering Storage implementation. This is in charge of transparently + filtering out known objects prior to adding them to storage. + + Sample configuration use case for filtering storage: + + .. code-block: yaml + + storage: + cls: filter + args: + storage: + cls: remote + args: http://storage.internal.staging.swh.network:5002/ + + """ + def __init__(self, storage): + self.storage = get_storage(**storage) + self.objects_seen = { + 'content': set(), # set of content hashes (sha256) seen + 'directory': set(), + 'revision': set(), + } + + def __getattr__(self, key): + return getattr(self.storage, key) + + def content_add(self, content: Sequence[Dict]) -> Dict: + contents = list(content) + contents_to_add = self._filter_missing_contents(contents) + return self.storage.content_add( + x for x in contents if x['sha256'] in contents_to_add + ) + + def directory_add(self, directories: Sequence[Dict]) -> Dict: + directories = list(directories) + missing_ids = self._filter_missing_ids( + 'directory', + (d['id'] for d in directories) + ) + return self.storage.directory_add( + d for d in directories if d['id'] in missing_ids + ) + + def revision_add(self, revisions): + revisions = list(revisions) + missing_ids = self._filter_missing_ids( + 'revision', + (d['id'] for d in revisions) + ) + return self.storage.revision_add( + r for r in revisions if r['id'] in missing_ids + ) + + def _filter_missing_contents( + self, content_hashes: Sequence[Dict]) -> Set[bytes]: + """Return only the content keys missing from swh + + Args: + content_hashes: List of sha256 to check for existence in swh + storage + + """ + objects_seen = self.objects_seen['content'] + missing_hashes = [] + for hashes in content_hashes: + if hashes['sha256'] in objects_seen: + continue + objects_seen.add(hashes['sha256']) + missing_hashes.append(hashes) + + return set(self.storage.content_missing( + missing_hashes, + key_hash='sha256', + )) + + def _filter_missing_ids( + self, + object_type: str, + ids: Generator[bytes, None, None]) -> Set[bytes]: + """Filter missing ids from the storage for a given object type. + + Args: + object_type: object type to use {revision, directory} + ids: Sequence of object_type ids + + Returns: + Missing ids from the storage for object_type + + """ + objects_seen = self.objects_seen[object_type] + missing_ids = [] + for id in ids: + if id in objects_seen: + continue + objects_seen.add(id) + missing_ids.append(id) + + fn_by_object_type = { + 'revision': self.storage.revision_missing, + 'directory': self.storage.directory_missing, + } + + fn = fn_by_object_type[object_type] + return set(fn(missing_ids)) diff --git a/swh/storage/tests/test_filter.py b/swh/storage/tests/test_filter.py new file mode 100644 index 00000000..80f3a3e9 --- /dev/null +++ b/swh/storage/tests/test_filter.py @@ -0,0 +1,74 @@ +# Copyright (C) 2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + + +from swh.storage.filter import FilteringProxyStorage + + +def test_filtering_proxy_storage_content(sample_data): + sample_content = sample_data['content'][0] + storage = FilteringProxyStorage(storage={'cls': 'memory', 'args': {}}) + + content = next(storage.content_get([sample_content['sha1']])) + assert not content + + s = storage.content_add([sample_content]) + assert s == { + 'content:add': 1, + 'content:add:bytes': 48, + 'skipped_content:add': 0 + } + + content = next(storage.content_get([sample_content['sha1']])) + assert content is not None + + s = storage.content_add([sample_content]) + assert s == { + 'content:add': 0, + 'content:add:bytes': 0, + 'skipped_content:add': 0 + } + + +def test_filtering_proxy_storage_revision(sample_data): + sample_revision = sample_data['revision'][0] + storage = FilteringProxyStorage(storage={'cls': 'memory', 'args': {}}) + + revision = next(storage.revision_get([sample_revision['id']])) + assert not revision + + s = storage.revision_add([sample_revision]) + assert s == { + 'revision:add': 1, + } + + revision = next(storage.revision_get([sample_revision['id']])) + assert revision is not None + + s = storage.revision_add([sample_revision]) + assert s == { + 'revision:add': 0, + } + + +def test_filtering_proxy_storage_directory(sample_data): + sample_directory = sample_data['directory'][0] + storage = FilteringProxyStorage(storage={'cls': 'memory', 'args': {}}) + + directory = next(storage.directory_missing([sample_directory['id']])) + assert directory + + s = storage.directory_add([sample_directory]) + assert s == { + 'directory:add': 1, + } + + directory = list(storage.directory_missing([sample_directory['id']])) + assert not directory + + s = storage.directory_add([sample_directory]) + assert s == { + 'directory:add': 0, + } diff --git a/swh/storage/tests/test_init.py b/swh/storage/tests/test_init.py index a61a7ef9..924c3830 100644 --- a/swh/storage/tests/test_init.py +++ b/swh/storage/tests/test_init.py @@ -1,42 +1,46 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from unittest.mock import patch from swh.storage import get_storage from swh.storage.api.client import RemoteStorage from swh.storage.storage import Storage as DbStorage from swh.storage.in_memory import Storage as MemoryStorage +from swh.storage.filter import FilteringProxyStorage @patch('swh.storage.storage.psycopg2.pool') def test_get_storage(mock_pool): """Instantiating an existing storage should be ok """ mock_pool.ThreadedConnectionPool.return_value = None for cls, real_class, dummy_args in [ ('remote', RemoteStorage, {'url': 'url'}), ('memory', MemoryStorage, {}), ('local', DbStorage, { 'db': 'postgresql://db', 'objstorage': { 'cls': 'memory', 'args': {}, }, }), + ('filter', FilteringProxyStorage, {'storage': { + 'cls': 'memory', 'args': {}} + }) ]: actual_storage = get_storage(cls, args=dummy_args) assert actual_storage is not None assert isinstance(actual_storage, real_class) def test_get_storage_failure(): """Instantiating an unknown storage should raise """ with pytest.raises(ValueError, match='Unknown storage class `unknown`'): get_storage('unknown', args=[])