diff --git a/swh/loader/core/tests/test_storage.py b/swh/loader/core/tests/test_storage.py
new file mode 100644
index 0000000..cc42b42
--- /dev/null
+++ b/swh/loader/core/tests/test_storage.py
@@ -0,0 +1,241 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+from swh.loader.package.storage import (
+    BufferingProxyStorage, FilteringProxyStorage
+)
+
+
+sample_content = {
+    'blake2s256': b'\xbf?\x05\xed\xc1U\xd2\xc5\x168Xm\x93\xde}f(HO@\xd0\xacn\x04\x1e\x9a\xb9\xfa\xbf\xcc\x08\xc7',  # noqa
+    'sha1': b'g\x15y+\xcb][\\\n\xf28\xb2\x0c_P[\xc8\x89Hk',
+    'sha1_git': b'\xf2\xae\xfa\xba\xfa\xa6B\x9b^\xf9Z\xf5\x14\x0cna\xb0\xef\x8b',  # noqa
+    'sha256': b"\x87\x022\xedZN\x84\xe8za\xf8'(oA\xc9k\xb1\x80c\x80\xe7J\x06\xea\xd2\xd5\xbeB\x19\xb8\xce",  # noqa
+    'length': 48,
+    'data': b'temp file for testing content storage conversion',
+    'status': 'visible',
+}
+
+sample_content2 = {
+    'blake2s256': b'\xbf?\x05\xed\xc1U\xd2\xc5\x168Xm\x93\xde}f(HO@\xd0\xacn\x04\x1e\x9a\xb9\xfa\xbf\xcc\x08\xc7',  # noqa
+    'sha1': b'f\x15y+\xcb][\\\n\xf28\xb2\x0c_P[\xc8\x89Hk',
+    'sha1_git': b'\xc2\xae\xfa\xba\xfa\xa6B\x9b^\xf9Z\xf5\x14\x0cna\xb0\xef\x8b',  # noqa
+    'sha256': b"\x77\x022\xedZN\x84\xe8za\xf8'(oA\xc9k\xb1\x80c\x80\xe7J\x06\xea\xd2\xd5\xbeB\x19\xb8\xce",  # noqa
+    'length': 50,
+    'data': b'temp file for testing content storage conversion 2',
+    'status': 'visible',
+}
+
+
+sample_directory = {
+    'id': b'f\x15y+\xcb][\\\n\xf28\xb2\x0c_P[\xc8\x89Hk',
+    'entries': []
+}
+
+
+sample_person = {
+    'name': b'John Doe',
+    'email': b'john.doe@institute.org',
+    'fullname': b'John Doe <john.doe@institute.org>'
+}
+
+
+sample_revision = {
+    'id': b'f\x15y+\xcb][\\\n\xf28\xb2\x0c_P[\xc8\x89Hk',
+    'message': b'something',
+    'author': sample_person,
+    'committer': sample_person,
+    'date': 1567591673,
+    'committer_date': 1567591673,
+    'type': 'tar',
+    'directory': b'\xc2\xae\xfa\xba\xfa\xa6B\x9b^\xf9Z\xf5\x14\x0cna\xb0\xef\x8b',  # noqa
+    'synthetic': False,
+    'metadata': {},
+    'parents': [],
+}
+
+
+def test_buffering_proxy_storage_content_threshold_not_hit():
+    storage = BufferingProxyStorage(
+        storage={'cls': 'memory', 'args': {}},
+        thresholds={
+            'content': 10,
+        }
+    )
+    s = storage.content_add([sample_content, sample_content2])
+    assert s == {}
+
+    s = storage.flush()
+    assert s == {
+        'content:add': 1 + 1,
+        'content:add:bytes': 48 + 50,
+        'skipped_content:add': 0
+    }
+
+
+def test_buffering_proxy_storage_content_threshold_nb_hit():
+    storage = BufferingProxyStorage(
+        storage={'cls': 'memory', 'args': {}},
+        thresholds={
+            'content': 1,
+        }
+    )
+
+    s = storage.content_add([sample_content])
+    assert s == {
+        'content:add': 1,
+        'content:add:bytes': 48,
+        'skipped_content:add': 0
+    }
+    s = storage.flush()
+    assert s == {}
+
+
+def test_buffering_proxy_storage_content_threshold_bytes_hit():
+    storage = BufferingProxyStorage(
+        storage={'cls': 'memory', 'args': {}},
+        thresholds={
+            'content': 10,
+            'content_bytes': 20,
+        }
+    )
+
+    s = storage.content_add([sample_content])
+    assert s == {
+        'content:add': 1,
+        'content:add:bytes': 48,
+        'skipped_content:add': 0
+    }
+    s = storage.flush()
+    assert s == {}
+
+
+def test_buffering_proxy_storage_directory_threshold_not_hit():
+    storage = BufferingProxyStorage(
+        storage={'cls': 'memory', 'args': {}},
+        thresholds={
+            'directory': 10,
+        }
+    )
+    s = storage.directory_add([sample_directory])
+    assert s == {}
+
+    s = storage.flush()
+    assert s == {
+        'directory:add': 1,
+    }
+
+
+def test_buffering_proxy_storage_directory_threshold_hit():
+    storage = BufferingProxyStorage(
+        storage={'cls': 'memory', 'args': {}},
+        thresholds={
+            'directory': 1,
+        }
+    )
+    s = storage.directory_add([sample_directory])
+    assert s == {
+        'directory:add': 1,
+    }
+
+    s = storage.flush()
+    assert s == {}
+
+
+def test_buffering_proxy_storage_revision_threshold_not_hit():
+    storage = BufferingProxyStorage(
+        storage={'cls': 'memory', 'args': {}},
+        thresholds={
+            'revision': 10,
+        }
+    )
+    s = storage.revision_add([sample_revision])
+    assert s == {}
+
+    s = storage.flush()
+    assert s == {
+        'revision:add': 1,
+    }
+
+
+def test_buffering_proxy_storage_revision_threshold_hit():
+    storage = BufferingProxyStorage(
+        storage={'cls': 'memory', 'args': {}},
+        thresholds={
+            'revision': 1,
+        }
+    )
+    s = storage.revision_add([sample_revision])
+    assert s == {
+        'revision:add': 1,
+    }
+
+    s = storage.flush()
+    assert s == {}
+
+
+def test_filtering_proxy_storage_content():
+    storage = FilteringProxyStorage(storage={'cls': 'memory', 'args': {}})
+
+    content = next(storage.content_get([sample_content['sha1']]))
+    assert not content
+
+    s = storage.content_add([sample_content])
+    assert s == {
+        'content:add': 1,
+        'content:add:bytes': 48,
+        'skipped_content:add': 0
+    }
+
+    content = next(storage.content_get([sample_content['sha1']]))
+    assert content is not None
+
+    s = storage.content_add([sample_content])
+    assert s == {
+        'content:add': 0,
+        'content:add:bytes': 0,
+        'skipped_content:add': 0
+    }
+
+
+def test_filtering_proxy_storage_revision():
+    storage = FilteringProxyStorage(storage={'cls': 'memory', 'args': {}})
+
+    revision = next(storage.revision_get([sample_revision['id']]))
+    assert not revision
+
+    s = storage.revision_add([sample_revision])
+    assert s == {
+        'revision:add': 1,
+    }
+
+    revision = next(storage.revision_get([sample_revision['id']]))
+    assert revision is not None
+
+    s = storage.revision_add([sample_revision])
+    assert s == {
+        'revision:add': 0,
+    }
+
+
+def test_filtering_proxy_storage_directory():
+    storage = FilteringProxyStorage(storage={'cls': 'memory', 'args': {}})
+
+    directory = next(storage.directory_missing([sample_directory['id']]))
+    assert directory
+
+    s = storage.directory_add([sample_directory])
+    assert s == {
+        'directory:add': 1,
+    }
+
+    directory = list(storage.directory_missing([sample_directory['id']]))
+    assert not directory
+
+    s = storage.directory_add([sample_directory])
+    assert s == {
+        'directory:add': 0,
+    }
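
The checksum fixtures above are raw digest bytes. As a minimal sketch (not part of the patch) of how such values can be regenerated with the standard library only, assuming Git's blob convention for sha1_git; the helper name digests_for is illustrative:

import hashlib


def digests_for(data: bytes) -> dict:
    # sha1_git follows Git's blob convention: sha1 over a
    # "blob <length>\0" header followed by the raw data.
    git_header = b'blob %d\x00' % len(data)
    return {
        'blake2s256': hashlib.blake2s(data, digest_size=32).digest(),
        'sha1': hashlib.sha1(data).digest(),
        'sha1_git': hashlib.sha1(git_header + data).digest(),
        'sha256': hashlib.sha256(data).digest(),
        'length': len(data),
    }


# e.g. digests_for(b'temp file for testing content storage conversion')
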
diff --git a/swh/loader/package/storage.py b/swh/loader/package/storage.py
index 3cbb438..535b7b3 100644
--- a/swh/loader/package/storage.py
+++ b/swh/loader/package/storage.py
@@ -1,184 +1,185 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from typing import Optional, Sequence, Dict, Set
 from functools import partial
 from collections import deque
 
 from swh.core.utils import grouper
 from swh.storage import get_storage
 
 
 class BufferingProxyStorage:
     """Storage implementation in charge of accumulating objects prior to
     writing them to the "main" storage.
 
""" def __init__(self, storage, thresholds=None): self.storage = get_storage(**storage) if thresholds is None: thresholds = {} self.thresholds = { 'content': thresholds.get('content', 10000), 'content_bytes': thresholds.get('content_bytes', 100*1024*1024), 'directory': thresholds.get('directory', 25000), 'revision': thresholds.get('revision', 100000), } self.object_types = ['content', 'directory', 'revision'] self._objects = {k: deque() for k in self.object_types} def __getattr__(self, key): if key.endswith('_add'): object_type = key.split('_')[0] if object_type in self.object_types: return partial( self.object_add, object_type=object_type ) return getattr(self.storage, key) def content_add(self, content: Sequence[Dict]) -> Dict: """Enqueue contents to write to the storage. Following policies apply: - First, check if the queue's threshold is hit. If it is flush content to the storage. - If not, check if the total size of enqueued contents's threshold is hit. If it is flush content to the storage. """ s = self.object_add(content, object_type='content') if not s: q = self._objects['content'] total_size = sum(c['length'] for c in q) - if total_size > self.thresholds['content_bytes']: + if total_size >= self.thresholds['content_bytes']: return self.flush(['content']) return s def flush(self, object_types: Optional[Sequence[str]] = None) -> Dict: if object_types is None: object_types = self.object_types summary = {} for object_type in object_types: q = self._objects[object_type] for objs in grouper(q, n=self.thresholds[object_type]): add_fn = getattr(self.storage, '%s_add' % object_type) s = add_fn(objs) summary = {k: v + summary.get(k, 0) for k, v in s.items()} + q.clear() return summary def object_add(self, objects: Sequence[Dict], *, object_type: str) -> Dict: """Enqueue objects to write to the storage. This checks if the queue's threshold is hit. If it is actually write those to the storage. """ q = self._objects[object_type] threshold = self.thresholds[object_type] q.extend(objects) - if len(q) > threshold: + if len(q) >= threshold: return self.flush() return {} class FilteringProxyStorage: """Storage implementation in charge of filtering existing objects prior to calling the storage api for ingestion. 
""" def __init__(self, storage): self.storage = get_storage(**storage) self.objects_seen = { 'content': set(), # set of content hashes (sha256) seen 'directory': set(), 'revision': set(), } def __getattr__(self, key): return getattr(self.storage, key) def content_add(self, content: Sequence[Dict]) -> Dict: contents = list(content) contents_to_add = self._filter_missing_contents(contents) return self.storage.content_add( x for x in contents if x['sha256'] in contents_to_add ) def directory_add(self, directories: Sequence[Dict]) -> Dict: directories = list(directories) missing_ids = self._filter_missing_ids( 'directory', (d['id'] for d in directories) ) return self.storage.directory_add( d for d in directories if d['id'] in missing_ids ) def revision_add(self, revisions): revisions = list(revisions) missing_ids = self._filter_missing_ids( 'revision', (d['id'] for d in revisions) ) return self.storage.revision_add( r for r in revisions if r['id'] in missing_ids ) def _filter_missing_contents( self, content_hashes: Sequence[Dict]) -> Set[bytes]: """Return only the content keys missing from swh Args: content_hashes: List of sha256 to check for existence in swh storage """ objects_seen = self.objects_seen['content'] missing_hashes = [] for hashes in content_hashes: if hashes['sha256'] in objects_seen: continue objects_seen.add(hashes['sha256']) missing_hashes.append(hashes) return set(self.storage.content_missing( missing_hashes, key_hash='sha256', )) def _filter_missing_ids( self, object_type: str, ids: Sequence[bytes]) -> Set[bytes]: """Filter missing ids from the storage for a given object type. Args: object_type: object type to use {revision, directory} ids: List of object_type ids Returns: Missing ids from the storage for object_type """ objects_seen = self.objects_seen[object_type] missing_ids = [] for id in ids: if id in objects_seen: continue objects_seen.add(id) missing_ids.append(id) fn_by_object_type = { 'revision': self.storage.revision_missing, 'directory': self.storage.directory_missing, } fn = fn_by_object_type[object_type] return set(fn(missing_ids))