diff --git a/swh/storage/buffer.py b/swh/storage/buffer.py --- a/swh/storage/buffer.py +++ b/swh/storage/buffer.py @@ -8,6 +8,7 @@ from typing import Optional, Iterable, Dict from swh.core.utils import grouper +from swh.model.model import Content, BaseModel from swh.storage import get_storage @@ -64,7 +65,7 @@ raise AttributeError(key) return getattr(self.storage, key) - def content_add(self, content: Iterable[Dict]) -> Dict: + def content_add(self, content: Iterable[Content]) -> Dict: """Enqueue contents to write to the storage. Following policies apply: @@ -76,10 +77,11 @@ threshold is hit. If it is flush content to the storage. """ + content = list(content) s = self.object_add(content, object_type='content') if not s: q = self._objects['content'] - total_size = sum(c['length'] for c in q) + total_size = sum(c.length for c in q) if total_size >= self.min_batch_size['content_bytes']: return self.flush(['content']) @@ -100,7 +102,8 @@ return summary - def object_add(self, objects: Iterable[Dict], *, object_type: str) -> Dict: + def object_add( + self, objects: Iterable[BaseModel], *, object_type: str) -> Dict: """Enqueue objects to write to the storage. This checks if the queue's threshold is hit. If it is actually write those to the storage. diff --git a/swh/storage/filter.py b/swh/storage/filter.py --- a/swh/storage/filter.py +++ b/swh/storage/filter.py @@ -4,7 +4,11 @@ # See top-level LICENSE file for more information -from typing import Dict, Generator, Iterable, Set +from typing import Dict, Iterable, Set + +from swh.model.model import ( + Content, SkippedContent, Directory, Revision, +) from swh.storage import get_storage @@ -38,43 +42,43 @@ raise AttributeError(key) return getattr(self.storage, key) - def content_add(self, content: Iterable[Dict]) -> Dict: + def content_add(self, content: Iterable[Content]) -> Dict: contents = list(content) contents_to_add = self._filter_missing_contents(contents) return self.storage.content_add( - x for x in contents if x['sha256'] in contents_to_add + x for x in contents if x.sha256 in contents_to_add ) - def skipped_content_add(self, content: Iterable[Dict]) -> Dict: + def skipped_content_add(self, content: Iterable[SkippedContent]) -> Dict: contents = list(content) contents_to_add = self._filter_missing_skipped_contents(contents) return self.storage.skipped_content_add( x for x in contents - if x.get('sha1_git') is None or x['sha1_git'] in contents_to_add + if x.sha1_git is None or x.sha1_git in contents_to_add ) - def directory_add(self, directories: Iterable[Dict]) -> Dict: + def directory_add(self, directories: Iterable[Directory]) -> Dict: directories = list(directories) missing_ids = self._filter_missing_ids( 'directory', - (d['id'] for d in directories) + (d.id for d in directories) ) return self.storage.directory_add( - d for d in directories if d['id'] in missing_ids + d for d in directories if d.id in missing_ids ) - def revision_add(self, revisions): + def revision_add(self, revisions: Iterable[Revision]) -> Dict: revisions = list(revisions) missing_ids = self._filter_missing_ids( 'revision', - (d['id'] for d in revisions) + (r.id for r in revisions) ) return self.storage.revision_add( - r for r in revisions if r['id'] in missing_ids + r for r in revisions if r.id in missing_ids ) def _filter_missing_contents( - self, content_hashes: Iterable[Dict]) -> Set[bytes]: + self, contents: Iterable[Content]) -> Set[bytes]: """Return only the content keys missing from swh Args: @@ -83,20 +87,20 @@ """ objects_seen = self.objects_seen['content'] - missing_hashes = [] - for hashes in content_hashes: - if hashes['sha256'] in objects_seen: + missing_contents = [] + for content in contents: + if content.sha256 in objects_seen: continue - objects_seen.add(hashes['sha256']) - missing_hashes.append(hashes) + objects_seen.add(content.sha256) + missing_contents.append(content.to_dict()) return set(self.storage.content_missing( - missing_hashes, + missing_contents, key_hash='sha256', )) def _filter_missing_skipped_contents( - self, content_hashes: Iterable[Dict]) -> Set[bytes]: + self, contents: Iterable[SkippedContent]) -> Set[bytes]: """Return only the content keys missing from swh Args: @@ -105,21 +109,21 @@ """ objects_seen = self.objects_seen['skipped_content'] - missing_hashes = [] - for hashes in content_hashes: - if hashes.get('sha1_git') is None \ - or hashes['sha1_git'] in objects_seen: + missing_contents = [] + for content in contents: + if content.sha1_git is None or content.sha1_git in objects_seen: continue - objects_seen.add(hashes['sha1_git']) - missing_hashes.append(hashes) + objects_seen.add(content.sha1_git) + missing_contents.append(content.to_dict()) - return {c['sha1_git'] - for c in self.storage.skipped_content_missing(missing_hashes)} + return { + c.get('sha1_git') + for c in self.storage.skipped_content_missing(missing_contents)} def _filter_missing_ids( self, object_type: str, - ids: Generator[bytes, None, None]) -> Set[bytes]: + ids: Iterable[bytes]) -> Set[bytes]: """Filter missing ids from the storage for a given object type. Args: diff --git a/swh/storage/retry.py b/swh/storage/retry.py --- a/swh/storage/retry.py +++ b/swh/storage/retry.py @@ -13,6 +13,11 @@ retry, stop_after_attempt, wait_random_exponential, ) +from swh.model.model import ( + Content, SkippedContent, Directory, Revision, Release, Snapshot, + Origin, +) + from swh.storage import get_storage from swh.storage.exc import StorageArgumentException @@ -66,22 +71,19 @@ return getattr(self.storage, key) @swh_retry - def content_add(self, content: Iterable[Dict]) -> Dict: - contents = list(content) - return self.storage.content_add(contents) + def content_add(self, content: Iterable[Content]) -> Dict: + return self.storage.content_add(content) @swh_retry - def content_add_metadata(self, content: Iterable[Dict]) -> Dict: - contents = list(content) - return self.storage.content_add_metadata(contents) + def content_add_metadata(self, content: Iterable[Content]) -> Dict: + return self.storage.content_add_metadata(content) @swh_retry - def skipped_content_add(self, content: Iterable[Dict]) -> Dict: - contents = list(content) - return self.storage.skipped_content_add(contents) + def skipped_content_add(self, content: Iterable[SkippedContent]) -> Dict: + return self.storage.skipped_content_add(content) @swh_retry - def origin_add_one(self, origin: Dict) -> str: + def origin_add_one(self, origin: Origin) -> str: return self.storage.origin_add_one(origin) @swh_retry @@ -118,23 +120,19 @@ origin_url, ts, provider_id, tool_id, metadata) @swh_retry - def directory_add(self, directories: Iterable[Dict]) -> Dict: - directories = list(directories) + def directory_add(self, directories: Iterable[Directory]) -> Dict: return self.storage.directory_add(directories) @swh_retry - def revision_add(self, revisions: Iterable[Dict]) -> Dict: - revisions = list(revisions) + def revision_add(self, revisions: Iterable[Revision]) -> Dict: return self.storage.revision_add(revisions) @swh_retry - def release_add(self, releases: Iterable[Dict]) -> Dict: - releases = list(releases) + def release_add(self, releases: Iterable[Release]) -> Dict: return self.storage.release_add(releases) @swh_retry - def snapshot_add(self, snapshot: Iterable[Dict]) -> Dict: - snapshots = list(snapshot) + def snapshot_add(self, snapshots: Iterable[Snapshot]) -> Dict: return self.storage.snapshot_add(snapshots) @swh_retry diff --git a/swh/storage/tests/test_buffer.py b/swh/storage/tests/test_buffer.py --- a/swh/storage/tests/test_buffer.py +++ b/swh/storage/tests/test_buffer.py @@ -3,21 +3,25 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from swh.storage.buffer import BufferingProxyStorage +from swh.storage import get_storage -storage_config = { - 'cls': 'validate', - 'storage': { - 'cls': 'memory' +def get_storage_with_buffer_config(**buffer_config): + storage_config = { + 'cls': 'pipeline', + 'steps': [ + {'cls': 'validate'}, + {'cls': 'buffer', **buffer_config}, + {'cls': 'memory'}, + ] } -} + + return get_storage(**storage_config) def test_buffering_proxy_storage_content_threshold_not_hit(sample_data): contents = sample_data['content'] - storage = BufferingProxyStorage( - storage=storage_config, + storage = get_storage_with_buffer_config( min_batch_size={ 'content': 10, } @@ -44,8 +48,7 @@ def test_buffering_proxy_storage_content_threshold_nb_hit(sample_data): contents = sample_data['content'] - storage = BufferingProxyStorage( - storage=storage_config, + storage = get_storage_with_buffer_config( min_batch_size={ 'content': 1, } @@ -67,8 +70,7 @@ def test_buffering_proxy_storage_content_threshold_bytes_hit(sample_data): contents = sample_data['content'] content_bytes_min_batch_size = 2 - storage = BufferingProxyStorage( - storage=storage_config, + storage = get_storage_with_buffer_config( min_batch_size={ 'content': 10, 'content_bytes': content_bytes_min_batch_size, @@ -93,8 +95,7 @@ def test_buffering_proxy_storage_skipped_content_threshold_not_hit( sample_data): contents = sample_data['skipped_content'] - storage = BufferingProxyStorage( - storage=storage_config, + storage = get_storage_with_buffer_config( min_batch_size={ 'skipped_content': 10, } @@ -120,8 +121,7 @@ def test_buffering_proxy_storage_skipped_content_threshold_nb_hit(sample_data): contents = sample_data['skipped_content'] - storage = BufferingProxyStorage( - storage=storage_config, + storage = get_storage_with_buffer_config( min_batch_size={ 'skipped_content': 1, } @@ -141,8 +141,7 @@ def test_buffering_proxy_storage_directory_threshold_not_hit(sample_data): directories = sample_data['directory'] - storage = BufferingProxyStorage( - storage=storage_config, + storage = get_storage_with_buffer_config( min_batch_size={ 'directory': 10, } @@ -167,8 +166,7 @@ def test_buffering_proxy_storage_directory_threshold_hit(sample_data): directories = sample_data['directory'] - storage = BufferingProxyStorage( - storage=storage_config, + storage = get_storage_with_buffer_config( min_batch_size={ 'directory': 1, } @@ -188,8 +186,7 @@ def test_buffering_proxy_storage_revision_threshold_not_hit(sample_data): revisions = sample_data['revision'] - storage = BufferingProxyStorage( - storage=storage_config, + storage = get_storage_with_buffer_config( min_batch_size={ 'revision': 10, } @@ -214,8 +211,7 @@ def test_buffering_proxy_storage_revision_threshold_hit(sample_data): revisions = sample_data['revision'] - storage = BufferingProxyStorage( - storage=storage_config, + storage = get_storage_with_buffer_config( min_batch_size={ 'revision': 1, } @@ -238,8 +234,7 @@ threshold = 10 assert len(releases) < threshold - storage = BufferingProxyStorage( - storage=storage_config, + storage = get_storage_with_buffer_config( min_batch_size={ 'release': threshold, # configuration set } @@ -265,8 +260,7 @@ threshold = 2 assert len(releases) > threshold - storage = BufferingProxyStorage( - storage=storage_config, + storage = get_storage_with_buffer_config( min_batch_size={ 'release': threshold, # configuration set } diff --git a/swh/storage/tests/test_filter.py b/swh/storage/tests/test_filter.py --- a/swh/storage/tests/test_filter.py +++ b/swh/storage/tests/test_filter.py @@ -4,20 +4,22 @@ # See top-level LICENSE file for more information -from swh.storage.filter import FilteringProxyStorage +from swh.storage import get_storage storage_config = { - 'cls': 'validate', - 'storage': { - 'cls': 'memory' - } + 'cls': 'pipeline', + 'steps': [ + {'cls': 'validate'}, + {'cls': 'filter'}, + {'cls': 'memory'}, + ] } def test_filtering_proxy_storage_content(sample_data): sample_content = sample_data['content'][0] - storage = FilteringProxyStorage(storage=storage_config) + storage = get_storage(**storage_config) content = next(storage.content_get([sample_content['sha1']])) assert not content @@ -40,7 +42,7 @@ def test_filtering_proxy_storage_skipped_content(sample_data): sample_content = sample_data['skipped_content'][0] - storage = FilteringProxyStorage(storage=storage_config) + storage = get_storage(**storage_config) content = next(storage.skipped_content_missing([sample_content])) assert content['sha1'] == sample_content['sha1'] @@ -62,7 +64,7 @@ def test_filtering_proxy_storage_skipped_content_missing_sha1_git(sample_data): sample_content = sample_data['skipped_content'][0] sample_content2 = sample_data['skipped_content'][1] - storage = FilteringProxyStorage(storage=storage_config) + storage = get_storage(**storage_config) sample_content['sha1_git'] = sample_content2['sha1_git'] = None content = next(storage.skipped_content_missing([sample_content])) @@ -87,7 +89,7 @@ def test_filtering_proxy_storage_revision(sample_data): sample_revision = sample_data['revision'][0] - storage = FilteringProxyStorage(storage=storage_config) + storage = get_storage(**storage_config) revision = next(storage.revision_get([sample_revision['id']])) assert not revision @@ -108,7 +110,7 @@ def test_filtering_proxy_storage_directory(sample_data): sample_directory = sample_data['directory'][0] - storage = FilteringProxyStorage(storage=storage_config) + storage = get_storage(**storage_config) directory = next(storage.directory_missing([sample_directory['id']])) assert directory diff --git a/swh/storage/tests/test_retry.py b/swh/storage/tests/test_retry.py --- a/swh/storage/tests/test_retry.py +++ b/swh/storage/tests/test_retry.py @@ -10,19 +10,21 @@ from unittest.mock import call -from swh.storage import HashCollision +from swh.storage import HashCollision, get_storage from swh.storage.exc import StorageArgumentException -from swh.storage.retry import RetryingProxyStorage @pytest.fixture def swh_storage(): - return RetryingProxyStorage(storage={ - 'cls': 'validate', - 'storage': { - 'cls': 'memory' - } - }) + storage_config = { + 'cls': 'pipeline', + 'steps': [ + {'cls': 'validate'}, + {'cls': 'retry'}, + {'cls': 'memory'}, + ] + } + return get_storage(**storage_config) def test_retrying_proxy_storage_content_add(swh_storage, sample_data):