diff --git a/swh/storage/buffer.py b/swh/storage/buffer.py
index 829f7546..5ebe657b 100644
--- a/swh/storage/buffer.py
+++ b/swh/storage/buffer.py
@@ -1,114 +1,117 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from collections import deque
 from functools import partial
 from typing import Optional, Iterable, Dict

 from swh.core.utils import grouper
+from swh.model.model import Content, BaseModel
 from swh.storage import get_storage


 class BufferingProxyStorage:
     """Storage implementation in charge of accumulating objects prior to
     sending them to the "main" storage.

     Sample configuration use case for buffering storage:

     .. code-block:: yaml

         storage:
           cls: buffer
           args:
             storage:
               cls: remote
               args: http://storage.internal.staging.swh.network:5002/
             min_batch_size:
               content: 10000
               content_bytes: 100000000
               skipped_content: 10000
               directory: 5000
               revision: 1000
               release: 10000

     """
     def __init__(self, storage, min_batch_size=None):
         self.storage = get_storage(**storage)

         if min_batch_size is None:
             min_batch_size = {}

         self.min_batch_size = {
             'content': min_batch_size.get('content', 10000),
             'content_bytes': min_batch_size.get('content_bytes',
                                                 100*1024*1024),
             'skipped_content': min_batch_size.get('skipped_content', 10000),
             'directory': min_batch_size.get('directory', 25000),
             'revision': min_batch_size.get('revision', 100000),
             'release': min_batch_size.get('release', 100000),
         }
         self.object_types = [
             'content', 'skipped_content', 'directory', 'revision', 'release']
         self._objects = {k: deque() for k in self.object_types}

     def __getattr__(self, key):
         if key.endswith('_add'):
             object_type = key.rsplit('_', 1)[0]
             if object_type in self.object_types:
                 return partial(
                     self.object_add, object_type=object_type
                 )
         if key == 'storage':
             raise AttributeError(key)
         return getattr(self.storage, key)

-    def content_add(self, content: Iterable[Dict]) -> Dict:
+    def content_add(self, content: Iterable[Content]) -> Dict:
         """Enqueue contents to write to the storage.

         The following policies apply:

         - First, check if the queue's count threshold is hit. If it is,
           flush contents to the storage.

         - If not, check if the total size of the enqueued contents hits the
           size threshold. If it does, flush contents to the storage.

         """
+        content = list(content)
         s = self.object_add(content, object_type='content')
         if not s:
             q = self._objects['content']
-            total_size = sum(c['length'] for c in q)
+            total_size = sum(c.length for c in q)
             if total_size >= self.min_batch_size['content_bytes']:
                 return self.flush(['content'])

         return s

     def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict:
         if object_types is None:
             object_types = self.object_types
         summary = {}  # type: Dict[str, Dict]
         for object_type in object_types:
             q = self._objects[object_type]
             for objs in grouper(q, n=self.min_batch_size[object_type]):
                 add_fn = getattr(self.storage, '%s_add' % object_type)
                 s = add_fn(objs)
                 summary = {k: v + summary.get(k, 0)
                            for k, v in s.items()}
             q.clear()

         return summary

-    def object_add(self, objects: Iterable[Dict], *, object_type: str) -> Dict:
+    def object_add(
+            self, objects: Iterable[BaseModel], *, object_type: str) -> Dict:
         """Enqueue objects to write to the storage. This checks if the
         queue's threshold is hit. If it is, actually write those to the
         storage.

         """
         q = self._objects[object_type]
         threshold = self.min_batch_size[object_type]
         q.extend(objects)
         if len(q) >= threshold:
             return self.flush()

         return {}
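The buffering proxy defers writes until a per-type object-count threshold is hit (and, for contents, a cumulative byte-size threshold); anything still queued has to be pushed out with ``flush()``. A minimal usage sketch, assuming the in-memory backend, the ``pipeline`` configuration introduced by the tests below, and ``Content.from_data`` as a convenience constructor for a sample object::

    from swh.model.model import Content
    from swh.storage import get_storage

    storage = get_storage(**{
        'cls': 'pipeline',
        'steps': [
            {'cls': 'buffer', 'min_batch_size': {'content': 2}},
            {'cls': 'memory'},
        ],
    })

    content = Content.from_data(b'some file data')  # hypothetical sample object

    # One object is below the count threshold of 2: it is only enqueued,
    # nothing reaches the backend and the summary is empty.
    assert storage.content_add([content]) == {}

    # An explicit flush drains the queue to the in-memory backend.
    summary = storage.flush(['content'])
    assert summary['content:add'] == 1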
""" q = self._objects[object_type] threshold = self.min_batch_size[object_type] q.extend(objects) if len(q) >= threshold: return self.flush() return {} diff --git a/swh/storage/filter.py b/swh/storage/filter.py index c890d6a6..68e9a6f1 100644 --- a/swh/storage/filter.py +++ b/swh/storage/filter.py @@ -1,147 +1,151 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from typing import Dict, Generator, Iterable, Set +from typing import Dict, Iterable, Set + +from swh.model.model import ( + Content, SkippedContent, Directory, Revision, +) from swh.storage import get_storage class FilteringProxyStorage: """Filtering Storage implementation. This is in charge of transparently filtering out known objects prior to adding them to storage. Sample configuration use case for filtering storage: .. code-block: yaml storage: cls: filter storage: cls: remote url: http://storage.internal.staging.swh.network:5002/ """ def __init__(self, storage): self.storage = get_storage(**storage) self.objects_seen = { 'content': set(), # sha256 'skipped_content': set(), # sha1_git 'directory': set(), # sha1_git 'revision': set(), # sha1_git } def __getattr__(self, key): if key == 'storage': raise AttributeError(key) return getattr(self.storage, key) - def content_add(self, content: Iterable[Dict]) -> Dict: + def content_add(self, content: Iterable[Content]) -> Dict: contents = list(content) contents_to_add = self._filter_missing_contents(contents) return self.storage.content_add( - x for x in contents if x['sha256'] in contents_to_add + x for x in contents if x.sha256 in contents_to_add ) - def skipped_content_add(self, content: Iterable[Dict]) -> Dict: + def skipped_content_add(self, content: Iterable[SkippedContent]) -> Dict: contents = list(content) contents_to_add = self._filter_missing_skipped_contents(contents) return self.storage.skipped_content_add( x for x in contents - if x.get('sha1_git') is None or x['sha1_git'] in contents_to_add + if x.sha1_git is None or x.sha1_git in contents_to_add ) - def directory_add(self, directories: Iterable[Dict]) -> Dict: + def directory_add(self, directories: Iterable[Directory]) -> Dict: directories = list(directories) missing_ids = self._filter_missing_ids( 'directory', - (d['id'] for d in directories) + (d.id for d in directories) ) return self.storage.directory_add( - d for d in directories if d['id'] in missing_ids + d for d in directories if d.id in missing_ids ) - def revision_add(self, revisions): + def revision_add(self, revisions: Iterable[Revision]) -> Dict: revisions = list(revisions) missing_ids = self._filter_missing_ids( 'revision', - (d['id'] for d in revisions) + (r.id for r in revisions) ) return self.storage.revision_add( - r for r in revisions if r['id'] in missing_ids + r for r in revisions if r.id in missing_ids ) def _filter_missing_contents( - self, content_hashes: Iterable[Dict]) -> Set[bytes]: + self, contents: Iterable[Content]) -> Set[bytes]: """Return only the content keys missing from swh Args: content_hashes: List of sha256 to check for existence in swh storage """ objects_seen = self.objects_seen['content'] - missing_hashes = [] - for hashes in content_hashes: - if hashes['sha256'] in objects_seen: + missing_contents = [] + for content in contents: + if content.sha256 in objects_seen: continue - objects_seen.add(hashes['sha256']) - 
-            missing_hashes.append(hashes)
+            objects_seen.add(content.sha256)
+            missing_contents.append(content.to_dict())

         return set(self.storage.content_missing(
-            missing_hashes,
+            missing_contents,
             key_hash='sha256',
         ))

     def _filter_missing_skipped_contents(
-            self, content_hashes: Iterable[Dict]) -> Set[bytes]:
+            self, contents: Iterable[SkippedContent]) -> Set[bytes]:
         """Return only the content keys missing from swh

         Args:
             contents: Skipped contents to check for existence in swh
                 storage, keyed by sha1_git

         """
         objects_seen = self.objects_seen['skipped_content']
-        missing_hashes = []
-        for hashes in content_hashes:
-            if hashes.get('sha1_git') is None \
-                    or hashes['sha1_git'] in objects_seen:
+        missing_contents = []
+        for content in contents:
+            if content.sha1_git is None or content.sha1_git in objects_seen:
                 continue
-            objects_seen.add(hashes['sha1_git'])
-            missing_hashes.append(hashes)
+            objects_seen.add(content.sha1_git)
+            missing_contents.append(content.to_dict())

-        return {c['sha1_git']
-                for c in self.storage.skipped_content_missing(missing_hashes)}
+        return {
+            c.get('sha1_git')
+            for c in self.storage.skipped_content_missing(missing_contents)}

     def _filter_missing_ids(
             self,
             object_type: str,
-            ids: Generator[bytes, None, None]) -> Set[bytes]:
+            ids: Iterable[bytes]) -> Set[bytes]:
         """Filter missing ids from the storage for a given object type.

         Args:
             object_type: object type to use {revision, directory}
             ids: Iterable of object_type ids

         Returns:
             Missing ids from the storage for object_type

         """
         objects_seen = self.objects_seen[object_type]
         missing_ids = []
         for id in ids:
             if id in objects_seen:
                 continue
             objects_seen.add(id)
             missing_ids.append(id)

         fn_by_object_type = {
             'revision': self.storage.revision_missing,
             'directory': self.storage.directory_missing,
         }
         fn = fn_by_object_type[object_type]
         return set(fn(missing_ids))
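The filtering proxy's contract is easiest to see from the write summaries: a second add of an already-known object is dropped before it reaches the backend. A short sketch under the same assumptions as above (in-memory backend behind a ``pipeline`` configuration, ``Content.from_data`` as a hypothetical sample-object constructor)::

    from swh.model.model import Content
    from swh.storage import get_storage

    storage = get_storage(**{
        'cls': 'pipeline',
        'steps': [
            {'cls': 'filter'},
            {'cls': 'memory'},
        ],
    })

    content = Content.from_data(b'hello')  # hypothetical sample object

    first = storage.content_add([content])   # unknown: forwarded to the backend
    again = storage.content_add([content])   # known: filtered out before writing
    assert first['content:add'] == 1
    assert again['content:add'] == 0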
diff --git a/swh/storage/retry.py b/swh/storage/retry.py
index 109e63c3..5da95eb7 100644
--- a/swh/storage/retry.py
+++ b/swh/storage/retry.py
@@ -1,147 +1,145 @@
 # Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import logging
 import traceback

 from datetime import datetime
 from typing import Dict, Iterable, List, Optional, Union

 from tenacity import (
     retry, stop_after_attempt, wait_random_exponential,
 )

+from swh.model.model import (
+    Content, SkippedContent, Directory, Revision, Release, Snapshot,
+    Origin,
+)
+
 from swh.storage import get_storage
 from swh.storage.exc import StorageArgumentException


 logger = logging.getLogger(__name__)


 def should_retry_adding(retry_state) -> bool:
     """Retry if the error/exception is (probably) not about a caller error

     """
     if retry_state.outcome.failed:
         error = retry_state.outcome.exception()
         if isinstance(error, StorageArgumentException):
             # Exception is due to an invalid argument
             return False
         else:
             # Other exception
             module = getattr(error, '__module__', None)
             if module:
                 error_name = error.__module__ + '.' + error.__class__.__name__
             else:
                 error_name = error.__class__.__name__
             logger.warning('Retry adding a batch', exc_info=False, extra={
                 'swh_type': 'storage_retry',
                 'swh_exception_type': error_name,
                 'swh_exception': traceback.format_exc(),
             })
             return True
     else:
         # No exception
         return False


 swh_retry = retry(retry=should_retry_adding,
                   wait=wait_random_exponential(multiplier=1, max=10),
                   stop=stop_after_attempt(3))


 class RetryingProxyStorage:
     """Storage implementation which retries adding objects when it specifically
     fails (hash collision, integrity error).

     """
     def __init__(self, storage):
         self.storage = get_storage(**storage)

     def __getattr__(self, key):
         if key == 'storage':
             raise AttributeError(key)
         return getattr(self.storage, key)

     @swh_retry
-    def content_add(self, content: Iterable[Dict]) -> Dict:
-        contents = list(content)
-        return self.storage.content_add(contents)
+    def content_add(self, content: Iterable[Content]) -> Dict:
+        return self.storage.content_add(content)

     @swh_retry
-    def content_add_metadata(self, content: Iterable[Dict]) -> Dict:
-        contents = list(content)
-        return self.storage.content_add_metadata(contents)
+    def content_add_metadata(self, content: Iterable[Content]) -> Dict:
+        return self.storage.content_add_metadata(content)

     @swh_retry
-    def skipped_content_add(self, content: Iterable[Dict]) -> Dict:
-        contents = list(content)
-        return self.storage.skipped_content_add(contents)
+    def skipped_content_add(self, content: Iterable[SkippedContent]) -> Dict:
+        return self.storage.skipped_content_add(content)

     @swh_retry
-    def origin_add_one(self, origin: Dict) -> str:
+    def origin_add_one(self, origin: Origin) -> str:
         return self.storage.origin_add_one(origin)

     @swh_retry
     def origin_visit_add(self, origin: Dict,
                          date: Union[datetime, str], type: str) -> Dict:
         return self.storage.origin_visit_add(origin, date, type)

     @swh_retry
     def origin_visit_update(
             self, origin: str, visit_id: int,
             status: Optional[str] = None,
             metadata: Optional[Dict] = None,
             snapshot: Optional[Dict] = None) -> Dict:
         return self.storage.origin_visit_update(
             origin, visit_id, status=status,
             metadata=metadata, snapshot=snapshot)

     @swh_retry
     def tool_add(self, tools: Iterable[Dict]) -> List[Dict]:
         tools = list(tools)
         return self.storage.tool_add(tools)

     @swh_retry
     def metadata_provider_add(
             self, provider_name: str, provider_type: str, provider_url: str,
             metadata: Dict) -> Union[str, int]:
         return self.storage.metadata_provider_add(
             provider_name, provider_type, provider_url, metadata)

     @swh_retry
     def origin_metadata_add(
             self, origin_url: str, ts: Union[str, datetime], provider_id: int,
             tool_id: int, metadata: Dict) -> None:
         return self.storage.origin_metadata_add(
             origin_url, ts, provider_id, tool_id, metadata)

     @swh_retry
-    def directory_add(self, directories: Iterable[Dict]) -> Dict:
-        directories = list(directories)
+    def directory_add(self, directories: Iterable[Directory]) -> Dict:
         return self.storage.directory_add(directories)

     @swh_retry
-    def revision_add(self, revisions: Iterable[Dict]) -> Dict:
-        revisions = list(revisions)
+    def revision_add(self, revisions: Iterable[Revision]) -> Dict:
         return self.storage.revision_add(revisions)

     @swh_retry
-    def release_add(self, releases: Iterable[Dict]) -> Dict:
-        releases = list(releases)
+    def release_add(self, releases: Iterable[Release]) -> Dict:
         return self.storage.release_add(releases)

     @swh_retry
-    def snapshot_add(self, snapshot: Iterable[Dict]) -> Dict:
-        snapshots = list(snapshot)
+    def snapshot_add(self, snapshots: Iterable[Snapshot]) -> Dict:
         return self.storage.snapshot_add(snapshots)

     @swh_retry
     def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict:
         """Specific case for buffer proxy storage failing to flush data

         """
         if hasattr(self.storage, 'flush'):
             return self.storage.flush(object_types)
         return {}
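The retry policy itself is plain tenacity: ``should_retry_adding`` inspects the attempt's outcome, refuses to retry caller errors (``StorageArgumentException``), and lets everything else be retried with random exponential backoff, at most three attempts. A self-contained sketch of the same pattern (``flaky_add`` is a hypothetical stand-in, not part of the diff)::

    from tenacity import retry, stop_after_attempt, wait_random_exponential

    from swh.storage.exc import StorageArgumentException


    def _should_retry(retry_state) -> bool:
        # Retry only on failure, and never on caller errors.
        if not retry_state.outcome.failed:
            return False
        error = retry_state.outcome.exception()
        return not isinstance(error, StorageArgumentException)


    attempts = []


    @retry(retry=_should_retry,
           wait=wait_random_exponential(multiplier=1, max=10),
           stop=stop_after_attempt(3))
    def flaky_add(batch):
        attempts.append(1)
        if len(attempts) < 3:
            raise RuntimeError('transient failure')  # absorbed and retried
        return {'content:add': len(batch)}


    assert flaky_add(['obj']) == {'content:add': 1}
    assert len(attempts) == 3  # two failures retried, third call succeeded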
diff --git a/swh/storage/tests/test_buffer.py b/swh/storage/tests/test_buffer.py
index d7981dbb..41f4bbad 100644
--- a/swh/storage/tests/test_buffer.py
+++ b/swh/storage/tests/test_buffer.py
@@ -1,285 +1,279 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

-from swh.storage.buffer import BufferingProxyStorage
+from swh.storage import get_storage

-storage_config = {
-    'cls': 'validate',
-    'storage': {
-        'cls': 'memory'
+
+def get_storage_with_buffer_config(**buffer_config):
+    storage_config = {
+        'cls': 'pipeline',
+        'steps': [
+            {'cls': 'validate'},
+            {'cls': 'buffer', **buffer_config},
+            {'cls': 'memory'},
+        ]
     }
-}
+
+    return get_storage(**storage_config)


 def test_buffering_proxy_storage_content_threshold_not_hit(sample_data):
     contents = sample_data['content']
-    storage = BufferingProxyStorage(
-        storage=storage_config,
+    storage = get_storage_with_buffer_config(
         min_batch_size={
             'content': 10,
         }
     )
     s = storage.content_add([contents[0], contents[1]])
     assert s == {}

     # contents have not been written to storage
     missing_contents = storage.content_missing(
         [contents[0], contents[1]])
     assert set(missing_contents) == set(
         [contents[0]['sha1'], contents[1]['sha1']])

     s = storage.flush()
     assert s == {
         'content:add': 1 + 1,
         'content:add:bytes': contents[0]['length'] + contents[1]['length'],
     }

     missing_contents = storage.content_missing(
         [contents[0], contents[1]])
     assert list(missing_contents) == []


 def test_buffering_proxy_storage_content_threshold_nb_hit(sample_data):
     contents = sample_data['content']
-    storage = BufferingProxyStorage(
-        storage=storage_config,
+    storage = get_storage_with_buffer_config(
         min_batch_size={
             'content': 1,
         }
     )

     s = storage.content_add([contents[0]])
     assert s == {
         'content:add': 1,
         'content:add:bytes': contents[0]['length'],
     }

     missing_contents = storage.content_missing([contents[0]])
     assert list(missing_contents) == []

     s = storage.flush()
     assert s == {}


 def test_buffering_proxy_storage_content_threshold_bytes_hit(sample_data):
     contents = sample_data['content']
     content_bytes_min_batch_size = 2
-    storage = BufferingProxyStorage(
-        storage=storage_config,
+    storage = get_storage_with_buffer_config(
         min_batch_size={
             'content': 10,
             'content_bytes': content_bytes_min_batch_size,
         }
     )

     assert contents[0]['length'] > content_bytes_min_batch_size

     s = storage.content_add([contents[0]])
     assert s == {
         'content:add': 1,
         'content:add:bytes': contents[0]['length'],
     }

     missing_contents = storage.content_missing([contents[0]])
     assert list(missing_contents) == []

     s = storage.flush()
     assert s == {}


 def test_buffering_proxy_storage_skipped_content_threshold_not_hit(
         sample_data):
     contents = sample_data['skipped_content']
-    storage = BufferingProxyStorage(
-        storage=storage_config,
+    storage = get_storage_with_buffer_config(
         min_batch_size={
             'skipped_content': 10,
         }
     )
     s = storage.skipped_content_add([contents[0], contents[1]])
     assert s == {}

     # contents have not been written to storage
     missing_contents = storage.skipped_content_missing(
         [contents[0], contents[1]])
     assert {c['sha1'] for c in missing_contents} \
         == {c['sha1'] for c in contents}

     s = storage.flush()
     assert s == {
         'skipped_content:add': 1 + 1
     }

     missing_contents = storage.skipped_content_missing(
         [contents[0], contents[1]])
     assert list(missing_contents) == []


 def test_buffering_proxy_storage_skipped_content_threshold_nb_hit(sample_data):
     contents = sample_data['skipped_content']
-    storage = BufferingProxyStorage(
-        storage=storage_config,
+    storage = get_storage_with_buffer_config(
         min_batch_size={
             'skipped_content': 1,
         }
     )

     s = storage.skipped_content_add([contents[0]])
     assert s == {
         'skipped_content:add': 1
     }

     missing_contents = storage.skipped_content_missing([contents[0]])
     assert list(missing_contents) == []

     s = storage.flush()
     assert s == {}


 def test_buffering_proxy_storage_directory_threshold_not_hit(sample_data):
     directories = sample_data['directory']
-    storage = BufferingProxyStorage(
-        storage=storage_config,
+    storage = get_storage_with_buffer_config(
         min_batch_size={
             'directory': 10,
         }
     )
     s = storage.directory_add([directories[0]])
     assert s == {}

     directory_id = directories[0]['id']
     missing_directories = storage.directory_missing(
         [directory_id])
     assert list(missing_directories) == [directory_id]

     s = storage.flush()
     assert s == {
         'directory:add': 1,
     }

     missing_directories = storage.directory_missing(
         [directory_id])
     assert list(missing_directories) == []


 def test_buffering_proxy_storage_directory_threshold_hit(sample_data):
     directories = sample_data['directory']
-    storage = BufferingProxyStorage(
-        storage=storage_config,
+    storage = get_storage_with_buffer_config(
         min_batch_size={
             'directory': 1,
         }
     )
     s = storage.directory_add([directories[0]])
     assert s == {
         'directory:add': 1,
     }

     missing_directories = storage.directory_missing(
         [directories[0]['id']])
     assert list(missing_directories) == []

     s = storage.flush()
     assert s == {}


 def test_buffering_proxy_storage_revision_threshold_not_hit(sample_data):
     revisions = sample_data['revision']
-    storage = BufferingProxyStorage(
-        storage=storage_config,
+    storage = get_storage_with_buffer_config(
         min_batch_size={
             'revision': 10,
         }
     )
     s = storage.revision_add([revisions[0]])
     assert s == {}

     revision_id = revisions[0]['id']
     missing_revisions = storage.revision_missing(
         [revision_id])
     assert list(missing_revisions) == [revision_id]

     s = storage.flush()
     assert s == {
         'revision:add': 1,
     }

     missing_revisions = storage.revision_missing(
         [revision_id])
     assert list(missing_revisions) == []


 def test_buffering_proxy_storage_revision_threshold_hit(sample_data):
     revisions = sample_data['revision']
-    storage = BufferingProxyStorage(
-        storage=storage_config,
+    storage = get_storage_with_buffer_config(
         min_batch_size={
             'revision': 1,
         }
     )
     s = storage.revision_add([revisions[0]])
     assert s == {
         'revision:add': 1,
     }

     missing_revisions = storage.revision_missing(
         [revisions[0]['id']])
     assert list(missing_revisions) == []

     s = storage.flush()
     assert s == {}


 def test_buffering_proxy_storage_release_threshold_not_hit(sample_data):
     releases = sample_data['release']
     threshold = 10

     assert len(releases) < threshold
-    storage = BufferingProxyStorage(
-        storage=storage_config,
+    storage = get_storage_with_buffer_config(
         min_batch_size={
             'release': threshold,  # configuration set
         }
     )

     s = storage.release_add(releases)
     assert s == {}

     release_ids = [r['id'] for r in releases]
     missing_releases = storage.release_missing(release_ids)
     assert list(missing_releases) == release_ids

     s = storage.flush()
     assert s == {
         'release:add': len(releases),
     }

     missing_releases = storage.release_missing(release_ids)
     assert list(missing_releases) == []
 def test_buffering_proxy_storage_release_threshold_hit(sample_data):
     releases = sample_data['release']
     threshold = 2

     assert len(releases) > threshold
-    storage = BufferingProxyStorage(
-        storage=storage_config,
+    storage = get_storage_with_buffer_config(
         min_batch_size={
             'release': threshold,  # configuration set
         }
     )

     s = storage.release_add(releases)
     assert s == {
         'release:add': len(releases),
     }

     release_ids = [r['id'] for r in releases]
     missing_releases = storage.release_missing(release_ids)
     assert list(missing_releases) == []

     s = storage.flush()
     assert s == {}
diff --git a/swh/storage/tests/test_filter.py b/swh/storage/tests/test_filter.py
index 8c9080e9..fa992da4 100644
--- a/swh/storage/tests/test_filter.py
+++ b/swh/storage/tests/test_filter.py
@@ -1,127 +1,129 @@
 # Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

-from swh.storage.filter import FilteringProxyStorage
+from swh.storage import get_storage


 storage_config = {
-    'cls': 'validate',
-    'storage': {
-        'cls': 'memory'
-    }
+    'cls': 'pipeline',
+    'steps': [
+        {'cls': 'validate'},
+        {'cls': 'filter'},
+        {'cls': 'memory'},
+    ]
 }


 def test_filtering_proxy_storage_content(sample_data):
     sample_content = sample_data['content'][0]
-    storage = FilteringProxyStorage(storage=storage_config)
+    storage = get_storage(**storage_config)

     content = next(storage.content_get([sample_content['sha1']]))
     assert not content

     s = storage.content_add([sample_content])
     assert s == {
         'content:add': 1,
         'content:add:bytes': sample_content['length'],
     }

     content = next(storage.content_get([sample_content['sha1']]))
     assert content is not None

     s = storage.content_add([sample_content])
     assert s == {
         'content:add': 0,
         'content:add:bytes': 0,
     }


 def test_filtering_proxy_storage_skipped_content(sample_data):
     sample_content = sample_data['skipped_content'][0]
-    storage = FilteringProxyStorage(storage=storage_config)
+    storage = get_storage(**storage_config)

     content = next(storage.skipped_content_missing([sample_content]))
     assert content['sha1'] == sample_content['sha1']

     s = storage.skipped_content_add([sample_content])
     assert s == {
         'skipped_content:add': 1,
     }

     content = list(storage.skipped_content_missing([sample_content]))
     assert content == []

     s = storage.skipped_content_add([sample_content])
     assert s == {
         'skipped_content:add': 0,
     }


 def test_filtering_proxy_storage_skipped_content_missing_sha1_git(sample_data):
     sample_content = sample_data['skipped_content'][0]
     sample_content2 = sample_data['skipped_content'][1]
-    storage = FilteringProxyStorage(storage=storage_config)
+    storage = get_storage(**storage_config)

     sample_content['sha1_git'] = sample_content2['sha1_git'] = None

     content = next(storage.skipped_content_missing([sample_content]))
     assert content['sha1'] == sample_content['sha1']

     s = storage.skipped_content_add([sample_content])
     assert s == {
         'skipped_content:add': 1,
     }

     content = list(storage.skipped_content_missing([sample_content]))
     assert content == []

     s = storage.skipped_content_add([sample_content2])
     assert s == {
         'skipped_content:add': 1,
     }

     content = list(storage.skipped_content_missing([sample_content2]))
     assert content == []


 def test_filtering_proxy_storage_revision(sample_data):
     sample_revision = sample_data['revision'][0]
-    storage = FilteringProxyStorage(storage=storage_config)
+    storage = get_storage(**storage_config)
     revision = next(storage.revision_get([sample_revision['id']]))
     assert not revision

     s = storage.revision_add([sample_revision])
     assert s == {
         'revision:add': 1,
     }

     revision = next(storage.revision_get([sample_revision['id']]))
     assert revision is not None

     s = storage.revision_add([sample_revision])
     assert s == {
         'revision:add': 0,
     }


 def test_filtering_proxy_storage_directory(sample_data):
     sample_directory = sample_data['directory'][0]
-    storage = FilteringProxyStorage(storage=storage_config)
+    storage = get_storage(**storage_config)

     directory = next(storage.directory_missing([sample_directory['id']]))
     assert directory

     s = storage.directory_add([sample_directory])
     assert s == {
         'directory:add': 1,
     }

     directory = list(storage.directory_missing([sample_directory['id']]))
     assert not directory

     s = storage.directory_add([sample_directory])
     assert s == {
         'directory:add': 0,
     }
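All the ``*_with_retry`` tests below share one pattern, shown distilled here: patch the in-memory backend method with a ``side_effect`` list so the first two calls raise and the third succeeds, then check that the retry proxy absorbed the transient failures. A hypothetical condensed test, relying on the same imports and fixtures (``swh_storage``, ``sample_data``, pytest-mock's ``mocker``) as the file below::

    def test_retry_pattern_sketch(swh_storage, sample_data, mocker):
        mock_memory = mocker.patch(
            'swh.storage.in_memory.InMemoryStorage.content_add')
        mock_memory.side_effect = [
            HashCollision('content hash collision'),      # retried
            psycopg2.IntegrityError('already inserted'),  # retried
            {'content:add': 1},                           # succeeds
        ]
        s = swh_storage.content_add([sample_data['content'][0]])
        assert s == {'content:add': 1}
        assert mock_memory.call_count == 3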
diff --git a/swh/storage/tests/test_retry.py b/swh/storage/tests/test_retry.py
index 1a443039..0551c3ff 100644
--- a/swh/storage/tests/test_retry.py
+++ b/swh/storage/tests/test_retry.py
@@ -1,915 +1,917 @@
 # Copyright (C) 2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import psycopg2
 import pytest

 from typing import Dict
 from unittest.mock import call

-from swh.storage import HashCollision
+from swh.storage import HashCollision, get_storage
 from swh.storage.exc import StorageArgumentException
-from swh.storage.retry import RetryingProxyStorage


 @pytest.fixture
 def swh_storage():
-    return RetryingProxyStorage(storage={
-        'cls': 'validate',
-        'storage': {
-            'cls': 'memory'
-        }
-    })
+    storage_config = {
+        'cls': 'pipeline',
+        'steps': [
+            {'cls': 'validate'},
+            {'cls': 'retry'},
+            {'cls': 'memory'},
+        ]
+    }
+    return get_storage(**storage_config)


 def test_retrying_proxy_storage_content_add(swh_storage, sample_data):
     """Standard content_add works as before

     """
     sample_content = sample_data['content'][0]

     content = next(swh_storage.content_get([sample_content['sha1']]))
     assert not content

     s = swh_storage.content_add([sample_content])
     assert s == {
         'content:add': 1,
         'content:add:bytes': sample_content['length'],
     }

     content = next(swh_storage.content_get([sample_content['sha1']]))
     assert content['sha1'] == sample_content['sha1']


 def test_retrying_proxy_storage_content_add_with_retry(
         swh_storage, sample_data, mocker):
     """Multiple retries for hash collision and psycopg2 error but finally ok

     """
     mock_memory = mocker.patch(
         'swh.storage.in_memory.InMemoryStorage.content_add')
     mock_memory.side_effect = [
         # first try goes ko
         HashCollision('content hash collision'),
         # second try goes ko
         psycopg2.IntegrityError('content already inserted'),
         # ok then!
         {'content:add': 1}
     ]

     sample_content = sample_data['content'][0]

     content = next(swh_storage.content_get([sample_content['sha1']]))
     assert not content

     s = swh_storage.content_add([sample_content])
     assert s == {'content:add': 1}

     assert mock_memory.call_count == 3


 def test_retrying_proxy_swh_storage_content_add_failure(
         swh_storage, sample_data, mocker):
     """Unfiltered errors are raised without retry

     """
     mock_memory = mocker.patch(
         'swh.storage.in_memory.InMemoryStorage.content_add')
     mock_memory.side_effect = StorageArgumentException(
         'Refuse to add content always!')

     sample_content = sample_data['content'][0]

     content = next(swh_storage.content_get([sample_content['sha1']]))
     assert not content

     with pytest.raises(StorageArgumentException, match='Refuse to add'):
         swh_storage.content_add([sample_content])

     assert mock_memory.call_count == 1


 def test_retrying_proxy_storage_content_add_metadata(swh_storage, sample_data):
     """Standard content_add_metadata works as before

     """
     sample_content = sample_data['content_metadata'][0]

     pk = sample_content['sha1']
     content_metadata = swh_storage.content_get_metadata([pk])
     assert not content_metadata[pk]

     s = swh_storage.content_add_metadata([sample_content])
     assert s == {
         'content:add': 1,
     }

     content_metadata = swh_storage.content_get_metadata([pk])
     assert len(content_metadata[pk]) == 1
     assert content_metadata[pk][0]['sha1'] == pk


 def test_retrying_proxy_storage_content_add_metadata_with_retry(
         swh_storage, sample_data, mocker):
     """Multiple retries for hash collision and psycopg2 error but finally ok

     """
     mock_memory = mocker.patch(
         'swh.storage.in_memory.InMemoryStorage.content_add_metadata')
     mock_memory.side_effect = [
         # first try goes ko
         HashCollision('content_metadata hash collision'),
         # second try goes ko
         psycopg2.IntegrityError('content_metadata already inserted'),
         # ok then!
         {'content:add': 1}
     ]

     sample_content = sample_data['content_metadata'][0]

     s = swh_storage.content_add_metadata([sample_content])
     assert s == {'content:add': 1}

     assert mock_memory.call_count == 3


 def test_retrying_proxy_swh_storage_content_add_metadata_failure(
         swh_storage, sample_data, mocker):
     """Unfiltered errors are raised without retry

     """
     mock_memory = mocker.patch(
         'swh.storage.in_memory.InMemoryStorage.content_add_metadata')
     mock_memory.side_effect = StorageArgumentException(
         'Refuse to add content_metadata!')

     sample_content = sample_data['content_metadata'][0]
     pk = sample_content['sha1']

     content_metadata = swh_storage.content_get_metadata([pk])
     assert not content_metadata[pk]

     with pytest.raises(StorageArgumentException, match='Refuse to add'):
         swh_storage.content_add_metadata([sample_content])

     assert mock_memory.call_count == 1


 def test_retrying_proxy_swh_storage_origin_add_one(swh_storage, sample_data):
     """Standard origin_add_one works as before

     """
     sample_origin = sample_data['origin'][0]

     origin = swh_storage.origin_get(sample_origin)
     assert not origin

     r = swh_storage.origin_add_one(sample_origin)
     assert r == sample_origin['url']

     origin = swh_storage.origin_get(sample_origin)
     assert origin['url'] == sample_origin['url']


 def test_retrying_proxy_swh_storage_origin_add_one_retry(
         swh_storage, sample_data, mocker):
     """Multiple retries for hash collision and psycopg2 error but finally ok

     """
     sample_origin = sample_data['origin'][1]
     mock_memory = mocker.patch(
         'swh.storage.in_memory.InMemoryStorage.origin_add_one')
     mock_memory.side_effect = [
         # first try goes ko
         HashCollision('origin hash collision'),
         # second try goes ko
         psycopg2.IntegrityError('origin already inserted'),
         # ok then!
         sample_origin['url']
     ]

     origin = swh_storage.origin_get(sample_origin)
     assert not origin

     r = swh_storage.origin_add_one(sample_origin)
     assert r == sample_origin['url']

     assert mock_memory.call_count == 3


 def test_retrying_proxy_swh_storage_origin_add_one_failure(
         swh_storage, sample_data, mocker):
     """Unfiltered errors are raised without retry

     """
     mock_memory = mocker.patch(
         'swh.storage.in_memory.InMemoryStorage.origin_add_one')
     mock_memory.side_effect = StorageArgumentException(
         'Refuse to add origin always!')

     sample_origin = sample_data['origin'][0]

     origin = swh_storage.origin_get(sample_origin)
     assert not origin

     with pytest.raises(StorageArgumentException, match='Refuse to add'):
         swh_storage.origin_add_one(sample_origin)

     assert mock_memory.call_count == 1


 def test_retrying_proxy_swh_storage_origin_visit_add(swh_storage, sample_data):
     """Standard origin_visit_add works as before

     """
     sample_origin = sample_data['origin'][0]
     swh_storage.origin_add_one(sample_origin)
     origin_url = sample_origin['url']

     origin = list(swh_storage.origin_visit_get(origin_url))
     assert not origin

     origin_visit = swh_storage.origin_visit_add(
         origin_url, '2020-01-01', 'hg')
     assert origin_visit['origin'] == origin_url
     assert isinstance(origin_visit['visit'], int)

     origin_visit = next(swh_storage.origin_visit_get(origin_url))
     assert origin_visit['origin'] == origin_url
     assert isinstance(origin_visit['visit'], int)


 def test_retrying_proxy_swh_storage_origin_visit_add_retry(
         swh_storage, sample_data, mocker):
     """Multiple retries for hash collision and psycopg2 error but finally ok

     """
     sample_origin = sample_data['origin'][1]
     swh_storage.origin_add_one(sample_origin)
     origin_url = sample_origin['url']

     mock_memory = mocker.patch(
         'swh.storage.in_memory.InMemoryStorage.origin_visit_add')
     mock_memory.side_effect = [
         # first try goes ko
         HashCollision('origin hash collision'),
         # second try goes ko
         psycopg2.IntegrityError('origin already inserted'),
         # ok then!
         {'origin': origin_url, 'visit': 1}
     ]

     origin = list(swh_storage.origin_visit_get(origin_url))
     assert not origin

     r = swh_storage.origin_visit_add(sample_origin, '2020-01-01', 'git')
     assert r == {'origin': origin_url, 'visit': 1}

     assert mock_memory.call_count == 3


 def test_retrying_proxy_swh_storage_origin_visit_add_failure(
         swh_storage, sample_data, mocker):
     """Unfiltered errors are raised without retry

     """
     mock_memory = mocker.patch(
         'swh.storage.in_memory.InMemoryStorage.origin_visit_add')
     mock_memory.side_effect = StorageArgumentException(
         'Refuse to add origin always!')

     origin_url = sample_data['origin'][0]['url']

     origin = list(swh_storage.origin_visit_get(origin_url))
     assert not origin

     with pytest.raises(StorageArgumentException, match='Refuse to add'):
         swh_storage.origin_visit_add(origin_url, '2020-01-31', 'svn')

     assert mock_memory.call_count == 1


 def test_retrying_proxy_storage_tool_add(swh_storage, sample_data):
     """Standard tool_add works as before

     """
     sample_tool = sample_data['tool'][0]

     tool = swh_storage.tool_get(sample_tool)
     assert not tool

     tools = swh_storage.tool_add([sample_tool])
     assert tools
     tool = tools[0]
     tool.pop('id')
     assert tool == sample_tool

     tool = swh_storage.tool_get(sample_tool)
     tool.pop('id')
     assert tool == sample_tool


 def test_retrying_proxy_storage_tool_add_with_retry(
         swh_storage, sample_data, mocker):
     """Multiple retries for hash collision and psycopg2 error but finally ok

     """
     sample_tool = sample_data['tool'][0]
     mock_memory = mocker.patch(
         'swh.storage.in_memory.InMemoryStorage.tool_add')
     mock_memory.side_effect = [
         # first try goes ko
         HashCollision('tool hash collision'),
         # second try goes ko
         psycopg2.IntegrityError('tool already inserted'),
         # ok then!
         [sample_tool]
     ]

     tool = swh_storage.tool_get(sample_tool)
     assert not tool

     tools = swh_storage.tool_add([sample_tool])
     assert tools == [sample_tool]

     assert mock_memory.call_count == 3


 def test_retrying_proxy_swh_storage_tool_add_failure(
         swh_storage, sample_data, mocker):
     """Unfiltered errors are raised without retry

     """
     mock_memory = mocker.patch(
         'swh.storage.in_memory.InMemoryStorage.tool_add')
     mock_memory.side_effect = StorageArgumentException(
         'Refuse to add tool always!')

     sample_tool = sample_data['tool'][0]

     tool = swh_storage.tool_get(sample_tool)
     assert not tool

     with pytest.raises(StorageArgumentException, match='Refuse to add'):
         swh_storage.tool_add([sample_tool])

     assert mock_memory.call_count == 1


 def to_provider(provider: Dict) -> Dict:
     return {
         'provider_name': provider['name'],
         'provider_url': provider['url'],
         'provider_type': provider['type'],
         'metadata': provider['metadata'],
     }


 def test_retrying_proxy_storage_metadata_provider_add(
         swh_storage, sample_data):
     """Standard metadata_provider_add works as before

     """
     provider = sample_data['provider'][0]
     provider_get = to_provider(provider)

     provider = swh_storage.metadata_provider_get_by(provider_get)
     assert not provider

     provider_id = swh_storage.metadata_provider_add(**provider_get)
     assert provider_id

     actual_provider = swh_storage.metadata_provider_get(provider_id)
     assert actual_provider
     actual_provider_id = actual_provider.pop('id')
     assert actual_provider_id == provider_id
     assert actual_provider == provider_get


 def test_retrying_proxy_storage_metadata_provider_add_with_retry(
         swh_storage, sample_data, mocker):
     """Multiple retries for hash collision and psycopg2 error but finally ok

     """
     provider = sample_data['provider'][0]
     provider_get = to_provider(provider)

     mock_memory = mocker.patch(
         'swh.storage.in_memory.InMemoryStorage.metadata_provider_add')
     mock_memory.side_effect = [
         # first try goes ko
         HashCollision('provider_id hash collision'),
         # second try goes ko
         psycopg2.IntegrityError('provider_id already inserted'),
         # ok then!
         'provider_id',
     ]

     provider = swh_storage.metadata_provider_get_by(provider_get)
     assert not provider

     provider_id = swh_storage.metadata_provider_add(**provider_get)
     assert provider_id == 'provider_id'

     assert mock_memory.call_count == 3


 def test_retrying_proxy_swh_storage_metadata_provider_add_failure(
         swh_storage, sample_data, mocker):
     """Unfiltered errors are raised without retry

     """
     mock_memory = mocker.patch(
         'swh.storage.in_memory.InMemoryStorage.metadata_provider_add')
     mock_memory.side_effect = StorageArgumentException(
         'Refuse to add provider_id always!')

     provider = sample_data['provider'][0]
     provider_get = to_provider(provider)

     provider_id = swh_storage.metadata_provider_get_by(provider_get)
     assert not provider_id

     with pytest.raises(StorageArgumentException, match='Refuse to add'):
         swh_storage.metadata_provider_add(**provider_get)

     assert mock_memory.call_count == 1


 def test_retrying_proxy_storage_origin_metadata_add(
         swh_storage, sample_data):
     """Standard origin_metadata_add works as before

     """
     ori_meta = sample_data['origin_metadata'][0]
     origin = ori_meta['origin']
     swh_storage.origin_add_one(origin)
     provider_get = to_provider(ori_meta['provider'])
     provider_id = swh_storage.metadata_provider_add(**provider_get)

     origin_metadata = swh_storage.origin_metadata_get_by(origin['url'])
     assert not origin_metadata

     swh_storage.origin_metadata_add(
         origin['url'], ori_meta['discovery_date'],
         provider_id, ori_meta['tool'], ori_meta['metadata'])

     origin_metadata = swh_storage.origin_metadata_get_by(
         origin['url'])
     assert origin_metadata


 def test_retrying_proxy_storage_origin_metadata_add_with_retry(
         swh_storage, sample_data, mocker):
     """Multiple retries for hash collision and psycopg2 error but finally ok

     """
     ori_meta = sample_data['origin_metadata'][0]
     origin = ori_meta['origin']
     swh_storage.origin_add_one(origin)
     provider_get = to_provider(ori_meta['provider'])
     provider_id = swh_storage.metadata_provider_add(**provider_get)
     mock_memory = mocker.patch(
         'swh.storage.in_memory.InMemoryStorage.origin_metadata_add')

     mock_memory.side_effect = [
         # first try goes ko
         HashCollision('provider_id hash collision'),
         # second try goes ko
         psycopg2.IntegrityError('provider_id already inserted'),
         # ok then!
         None
     ]

     url = origin['url']
     ts = ori_meta['discovery_date']
     tool_id = ori_meta['tool']
     metadata = ori_meta['metadata']

     # No exception raised as insertion finally came through
     swh_storage.origin_metadata_add(url, ts, provider_id, tool_id, metadata)

     mock_memory.assert_has_calls([
         # 3 calls: the first two raised, the third succeeded
         call(url, ts, provider_id, tool_id, metadata),
         call(url, ts, provider_id, tool_id, metadata),
         call(url, ts, provider_id, tool_id, metadata)
     ])


 def test_retrying_proxy_swh_storage_origin_metadata_add_failure(
         swh_storage, sample_data, mocker):
     """Unfiltered errors are raised without retry

     """
     mock_memory = mocker.patch(
         'swh.storage.in_memory.InMemoryStorage.origin_metadata_add')
     mock_memory.side_effect = StorageArgumentException(
         'Refuse to add always!')

     ori_meta = sample_data['origin_metadata'][0]
     origin = ori_meta['origin']
     swh_storage.origin_add_one(origin)

     url = origin['url']
     ts = ori_meta['discovery_date']
     provider_id = 'provider_id'
     tool_id = ori_meta['tool']
     metadata = ori_meta['metadata']

     with pytest.raises(StorageArgumentException, match='Refuse to add'):
         swh_storage.origin_metadata_add(url, ts, provider_id, tool_id,
                                         metadata)

     assert mock_memory.call_count == 1


 def test_retrying_proxy_swh_storage_origin_visit_update(
         swh_storage, sample_data):
     """Standard origin_visit_update works as before

     """
     sample_origin = sample_data['origin'][0]
     swh_storage.origin_add_one(sample_origin)
     origin_url = sample_origin['url']
     origin_visit = swh_storage.origin_visit_add(
         origin_url, '2020-01-01', 'hg')

     ov = next(swh_storage.origin_visit_get(origin_url))
     assert ov['origin'] == origin_url
     assert ov['visit'] == origin_visit['visit']
     assert ov['status'] == 'ongoing'
     assert ov['snapshot'] is None
     assert ov['metadata'] is None

     swh_storage.origin_visit_update(origin_url, ov['visit'], status='full')

     ov = next(swh_storage.origin_visit_get(origin_url))
     assert ov['origin'] == origin_url
     assert ov['visit'] == origin_visit['visit']
     assert ov['status'] == 'full'
     assert ov['snapshot'] is None
     assert ov['metadata'] is None


 def test_retrying_proxy_swh_storage_origin_visit_update_retry(
         swh_storage, sample_data, mocker):
     """Multiple retries for hash collision and psycopg2 error but finally ok

     """
     sample_origin = sample_data['origin'][1]
     origin_url = sample_origin['url']

     mock_memory = mocker.patch(
         'swh.storage.in_memory.InMemoryStorage.origin_visit_update')
     mock_memory.side_effect = [
         # first try goes ko
         HashCollision('origin hash collision'),
         # second try goes ko
         psycopg2.IntegrityError('origin already inserted'),
         # ok then!
         {'origin': origin_url, 'visit': 1}
     ]

     visit_id = 1
     swh_storage.origin_visit_update(origin_url, visit_id, status='full')

     assert mock_memory.call_count == 3


 def test_retrying_proxy_swh_storage_origin_visit_update_failure(
         swh_storage, sample_data, mocker):
     """Unfiltered errors are raised without retry

     """
     mock_memory = mocker.patch(
         'swh.storage.in_memory.InMemoryStorage.origin_visit_update')
     mock_memory.side_effect = StorageArgumentException(
         'Refuse to add origin always!')
     origin_url = sample_data['origin'][0]['url']
     visit_id = 9

     with pytest.raises(StorageArgumentException, match='Refuse to add'):
         swh_storage.origin_visit_update(origin_url, visit_id, 'partial')

     assert mock_memory.call_count == 1


 def test_retrying_proxy_storage_directory_add(swh_storage, sample_data):
     """Standard directory_add works as before

     """
     sample_dir = sample_data['directory'][0]

     directory = swh_storage.directory_get_random()  # no directory
     assert not directory

     s = swh_storage.directory_add([sample_dir])
     assert s == {
         'directory:add': 1,
     }

     directory_id = swh_storage.directory_get_random()  # only 1
     assert directory_id == sample_dir['id']


 def test_retrying_proxy_storage_directory_add_with_retry(
         swh_storage, sample_data, mocker):
     """Multiple retries for hash collision and psycopg2 error but finally ok

     """
     mock_memory = mocker.patch(
         'swh.storage.in_memory.InMemoryStorage.directory_add')
     mock_memory.side_effect = [
         # first try goes ko
         HashCollision('directory hash collision'),
         # second try goes ko
         psycopg2.IntegrityError('directory already inserted'),
         # ok then!
         {'directory:add': 1}
     ]

     sample_dir = sample_data['directory'][1]

     directory_id = swh_storage.directory_get_random()  # no directory
     assert not directory_id

     s = swh_storage.directory_add([sample_dir])
     assert s == {
         'directory:add': 1,
     }

     assert mock_memory.call_count == 3


 def test_retrying_proxy_swh_storage_directory_add_failure(
         swh_storage, sample_data, mocker):
     """Unfiltered errors are raised without retry

     """
     mock_memory = mocker.patch(
         'swh.storage.in_memory.InMemoryStorage.directory_add')
     mock_memory.side_effect = StorageArgumentException(
         'Refuse to add directory always!')

     sample_dir = sample_data['directory'][0]

     directory_id = swh_storage.directory_get_random()  # no directory
     assert not directory_id

     with pytest.raises(StorageArgumentException, match='Refuse to add'):
         swh_storage.directory_add([sample_dir])

     assert mock_memory.call_count == 1


 def test_retrying_proxy_storage_revision_add(swh_storage, sample_data):
     """Standard revision_add works as before

     """
     sample_rev = sample_data['revision'][0]

     revision = next(swh_storage.revision_get([sample_rev['id']]))
     assert not revision

     s = swh_storage.revision_add([sample_rev])
     assert s == {
         'revision:add': 1,
     }

     revision = next(swh_storage.revision_get([sample_rev['id']]))
     assert revision['id'] == sample_rev['id']


 def test_retrying_proxy_storage_revision_add_with_retry(
         swh_storage, sample_data, mocker):
     """Multiple retries for hash collision and psycopg2 error but finally ok

     """
     mock_memory = mocker.patch(
         'swh.storage.in_memory.InMemoryStorage.revision_add')
     mock_memory.side_effect = [
         # first try goes ko
         HashCollision('revision hash collision'),
         # second try goes ko
         psycopg2.IntegrityError('revision already inserted'),
         # ok then!
         {'revision:add': 1}
     ]

     sample_rev = sample_data['revision'][0]

     revision = next(swh_storage.revision_get([sample_rev['id']]))
     assert not revision

     s = swh_storage.revision_add([sample_rev])
     assert s == {
         'revision:add': 1,
     }

     assert mock_memory.call_count == 3


 def test_retrying_proxy_swh_storage_revision_add_failure(
         swh_storage, sample_data, mocker):
     """Unfiltered errors are raised without retry

     """
     mock_memory = mocker.patch(
         'swh.storage.in_memory.InMemoryStorage.revision_add')
     mock_memory.side_effect = StorageArgumentException(
         'Refuse to add revision always!')

     sample_rev = sample_data['revision'][0]

     revision = next(swh_storage.revision_get([sample_rev['id']]))
     assert not revision

     with pytest.raises(StorageArgumentException, match='Refuse to add'):
         swh_storage.revision_add([sample_rev])

     assert mock_memory.call_count == 1


 def test_retrying_proxy_storage_release_add(swh_storage, sample_data):
     """Standard release_add works as before

     """
     sample_rel = sample_data['release'][0]

     release = next(swh_storage.release_get([sample_rel['id']]))
     assert not release

     s = swh_storage.release_add([sample_rel])
     assert s == {
         'release:add': 1,
     }

     release = next(swh_storage.release_get([sample_rel['id']]))
     assert release['id'] == sample_rel['id']


 def test_retrying_proxy_storage_release_add_with_retry(
         swh_storage, sample_data, mocker):
     """Multiple retries for hash collision and psycopg2 error but finally ok

     """
     mock_memory = mocker.patch(
         'swh.storage.in_memory.InMemoryStorage.release_add')
     mock_memory.side_effect = [
         # first try goes ko
         HashCollision('release hash collision'),
         # second try goes ko
         psycopg2.IntegrityError('release already inserted'),
         # ok then!
         {'release:add': 1}
     ]

     sample_rel = sample_data['release'][0]

     release = next(swh_storage.release_get([sample_rel['id']]))
     assert not release

     s = swh_storage.release_add([sample_rel])
     assert s == {
         'release:add': 1,
     }

     assert mock_memory.call_count == 3


 def test_retrying_proxy_swh_storage_release_add_failure(
         swh_storage, sample_data, mocker):
     """Unfiltered errors are raised without retry

     """
     mock_memory = mocker.patch(
         'swh.storage.in_memory.InMemoryStorage.release_add')
     mock_memory.side_effect = StorageArgumentException(
         'Refuse to add release always!')

     sample_rel = sample_data['release'][0]

     release = next(swh_storage.release_get([sample_rel['id']]))
     assert not release

     with pytest.raises(StorageArgumentException, match='Refuse to add'):
         swh_storage.release_add([sample_rel])

     assert mock_memory.call_count == 1


 def test_retrying_proxy_storage_snapshot_add(swh_storage, sample_data):
     """Standard snapshot_add works as before

     """
     sample_snap = sample_data['snapshot'][0]

     snapshot = swh_storage.snapshot_get(sample_snap['id'])
     assert not snapshot

     s = swh_storage.snapshot_add([sample_snap])
     assert s == {
         'snapshot:add': 1,
     }

     snapshot = swh_storage.snapshot_get(sample_snap['id'])
     assert snapshot['id'] == sample_snap['id']


 def test_retrying_proxy_storage_snapshot_add_with_retry(
         swh_storage, sample_data, mocker):
     """Multiple retries for hash collision and psycopg2 error but finally ok

     """
     mock_memory = mocker.patch(
         'swh.storage.in_memory.InMemoryStorage.snapshot_add')
     mock_memory.side_effect = [
         # first try goes ko
         HashCollision('snapshot hash collision'),
         # second try goes ko
         psycopg2.IntegrityError('snapshot already inserted'),
         # ok then!
         {'snapshot:add': 1}
     ]

     sample_snap = sample_data['snapshot'][0]

     snapshot = swh_storage.snapshot_get(sample_snap['id'])
     assert not snapshot

     s = swh_storage.snapshot_add([sample_snap])
     assert s == {
         'snapshot:add': 1,
     }

     assert mock_memory.call_count == 3


 def test_retrying_proxy_swh_storage_snapshot_add_failure(
         swh_storage, sample_data, mocker):
     """Unfiltered errors are raised without retry

     """
     mock_memory = mocker.patch(
         'swh.storage.in_memory.InMemoryStorage.snapshot_add')
     mock_memory.side_effect = StorageArgumentException(
         'Refuse to add snapshot always!')

     sample_snap = sample_data['snapshot'][0]

     snapshot = swh_storage.snapshot_get(sample_snap['id'])
     assert not snapshot

     with pytest.raises(StorageArgumentException, match='Refuse to add'):
         swh_storage.snapshot_add([sample_snap])

     assert mock_memory.call_count == 1
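The three proxies are designed to compose. A plausible production-style pipeline (a sketch, not prescribed by this diff: the step order and the remote endpoint URL are assumptions) stacks them so known objects are filtered out before batching, and the final writes are retried::

    storage_config = {
        'cls': 'pipeline',
        'steps': [
            {'cls': 'validate'},   # convert dicts to model objects
            {'cls': 'filter'},     # drop objects already in storage
            {'cls': 'buffer',
             'min_batch_size': {'content': 10000,
                                'content_bytes': 100 * 1024 * 1024}},
            {'cls': 'retry'},      # absorb transient write errors
            {'cls': 'remote',
             'url': 'http://storage.example.org:5002/'},  # hypothetical endpoint
        ],
    }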