diff --git a/swh/storage/buffer.py b/swh/storage/buffer.py
index 7c73d3a9..829f7546 100644
--- a/swh/storage/buffer.py
+++ b/swh/storage/buffer.py
@@ -1,112 +1,114 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from collections import deque
 from functools import partial
 from typing import Optional, Iterable, Dict

 from swh.core.utils import grouper
 from swh.storage import get_storage


 class BufferingProxyStorage:
     """Storage implementation in charge of accumulating objects prior to
        discussing with the "main" storage.

     Sample configuration use case for buffering storage:

     .. code-block:: yaml

         storage:
           cls: buffer
           args:
             storage:
               cls: remote
               args: http://storage.internal.staging.swh.network:5002/
             min_batch_size:
               content: 10000
               content_bytes: 100000000
               skipped_content: 10000
               directory: 5000
               revision: 1000
               release: 10000

     """
     def __init__(self, storage, min_batch_size=None):
         self.storage = get_storage(**storage)

         if min_batch_size is None:
             min_batch_size = {}

         self.min_batch_size = {
             'content': min_batch_size.get('content', 10000),
             'content_bytes': min_batch_size.get('content_bytes',
                                                 100*1024*1024),
             'skipped_content': min_batch_size.get('skipped_content', 10000),
             'directory': min_batch_size.get('directory', 25000),
             'revision': min_batch_size.get('revision', 100000),
             'release': min_batch_size.get('release', 100000),
         }
         self.object_types = [
             'content', 'skipped_content', 'directory', 'revision', 'release']
         self._objects = {k: deque() for k in self.object_types}

     def __getattr__(self, key):
         if key.endswith('_add'):
             object_type = key.rsplit('_', 1)[0]
             if object_type in self.object_types:
                 return partial(
                     self.object_add, object_type=object_type
                 )
+        if key == 'storage':
+            raise AttributeError(key)
         return getattr(self.storage, key)

     def content_add(self, content: Iterable[Dict]) -> Dict:
         """Enqueue contents to write to the storage.

         Following policies apply:
         - First, check if the queue's threshold is hit. If it is flush content
           to the storage.

         - If not, check if the total size of enqueued contents's threshold is
           hit. If it is flush content to the storage.

         """
         s = self.object_add(content, object_type='content')
         if not s:
             q = self._objects['content']
             total_size = sum(c['length'] for c in q)
             if total_size >= self.min_batch_size['content_bytes']:
                 return self.flush(['content'])

         return s

     def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict:
         if object_types is None:
             object_types = self.object_types
         summary = {}  # type: Dict[str, Dict]
         for object_type in object_types:
             q = self._objects[object_type]
             for objs in grouper(q, n=self.min_batch_size[object_type]):
                 add_fn = getattr(self.storage, '%s_add' % object_type)
                 s = add_fn(objs)
                 summary = {k: v + summary.get(k, 0)
                            for k, v in s.items()}
             q.clear()

         return summary

     def object_add(
             self, objects: Iterable[Dict], *, object_type: str) -> Dict:
         """Enqueue objects to write to the storage. This checks if the
            queue's threshold is hit. If it is actually write those to the
            storage.

         """
         q = self._objects[object_type]
         threshold = self.min_batch_size[object_type]
         q.extend(objects)
         if len(q) >= threshold:
             return self.flush()

         return {}
diff --git a/swh/storage/filter.py b/swh/storage/filter.py
index 15893103..7fa1bb99 100644
--- a/swh/storage/filter.py
+++ b/swh/storage/filter.py
@@ -1,143 +1,145 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information


 from typing import Dict, Generator, Iterable, Set

 from swh.storage import get_storage


 class FilteringProxyStorage:
     """Filtering Storage implementation. This is in charge of transparently
        filtering out known objects prior to adding them to storage.

     Sample configuration use case for filtering storage:

     .. code-block: yaml

         storage:
           cls: filter
           storage:
             cls: remote
             url: http://storage.internal.staging.swh.network:5002/

     """
     def __init__(self, storage):
         self.storage = get_storage(**storage)
         self.objects_seen = {
             'content': set(),  # sha256
             'skipped_content': set(),  # sha1_git
             'directory': set(),  # sha1_git
             'revision': set(),  # sha1_git
         }

     def __getattr__(self, key):
+        if key == 'storage':
+            raise AttributeError(key)
         return getattr(self.storage, key)

     def content_add(self, content: Iterable[Dict]) -> Dict:
         contents = list(content)
         contents_to_add = self._filter_missing_contents(contents)
         return self.storage.content_add(
             x for x in contents if x['sha256'] in contents_to_add
         )

     def skipped_content_add(self, content: Iterable[Dict]) -> Dict:
         contents = list(content)
         contents_to_add = self._filter_missing_skipped_contents(contents)
         return self.storage.skipped_content_add(
             x for x in contents if x['sha1_git'] in contents_to_add
         )

     def directory_add(self, directories: Iterable[Dict]) -> Dict:
         directories = list(directories)
         missing_ids = self._filter_missing_ids(
             'directory',
             (d['id'] for d in directories)
         )
         return self.storage.directory_add(
             d for d in directories if d['id'] in missing_ids
         )

     def revision_add(self, revisions):
         revisions = list(revisions)
         missing_ids = self._filter_missing_ids(
             'revision',
             (d['id'] for d in revisions)
         )
         return self.storage.revision_add(
             r for r in revisions if r['id'] in missing_ids
         )

     def _filter_missing_contents(
             self, content_hashes: Iterable[Dict]) -> Set[bytes]:
         """Return only the content keys missing from swh

         Args:
             content_hashes: List of sha256 to check for existence in swh
                 storage

         """
         objects_seen = self.objects_seen['content']
         missing_hashes = []
         for hashes in content_hashes:
             if hashes['sha256'] in objects_seen:
                 continue
             objects_seen.add(hashes['sha256'])
             missing_hashes.append(hashes)

         return set(self.storage.content_missing(
             missing_hashes,
             key_hash='sha256',
         ))

     def _filter_missing_skipped_contents(
             self, content_hashes: Iterable[Dict]) -> Set[bytes]:
         """Return only the content keys missing from swh

         Args:
             content_hashes: List of sha1_git to check for existence in swh
                 storage

         """
         objects_seen = self.objects_seen['skipped_content']
         missing_hashes = []
         for hashes in content_hashes:
             if hashes['sha1_git'] in objects_seen:
                 continue
             objects_seen.add(hashes['sha1_git'])
             missing_hashes.append(hashes)

         return {c['sha1_git']
                 for c in self.storage.skipped_content_missing(missing_hashes)}

     def _filter_missing_ids(
             self,
             object_type: str,
             ids: Generator[bytes, None, None]) -> Set[bytes]:
         """Filter missing ids from the storage for a given object type.

         Args:
             object_type: object type to use {revision, directory}
             ids: Iterable of object_type ids

         Returns:
             Missing ids from the storage for object_type

         """
         objects_seen = self.objects_seen[object_type]
         missing_ids = []
         for id in ids:
             if id in objects_seen:
                 continue
             objects_seen.add(id)
             missing_ids.append(id)

         fn_by_object_type = {
             'revision': self.storage.revision_missing,
             'directory': self.storage.directory_missing,
         }
         fn = fn_by_object_type[object_type]
         return set(fn(missing_ids))
diff --git a/swh/storage/objstorage.py b/swh/storage/objstorage.py
index 5f30bce1..67764146 100644
--- a/swh/storage/objstorage.py
+++ b/swh/storage/objstorage.py
@@ -1,63 +1,65 @@
 # Copyright (C) 2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from typing import Dict, Generator, Iterable

 from swh.model.model import Content
 from swh.objstorage import get_objstorage
 from swh.objstorage.exc import ObjNotFoundError

 from .exc import StorageArgumentException


 class ObjStorage:
     """Objstorage collaborator in charge of adding objects to
        the objstorage.

     """
     def __init__(self, objstorage_config: Dict):
         self.objstorage = get_objstorage(**objstorage_config)

     def __getattr__(self, key):
+        if key == 'objstorage':
+            raise AttributeError(key)
         return getattr(self.objstorage, key)

     def content_get(self, contents: Iterable[bytes]) -> Generator:
         """Retrieve content data from the objstorage

         Args:
             contents: List of contents to retrieve data from

         """
         for obj_id in contents:
             try:
                 data = self.objstorage.get(obj_id)
             except ObjNotFoundError:
                 yield None
                 continue

             yield {'sha1': obj_id, 'data': data}

     def content_add(self, contents: Iterable[Content]) -> Dict:
         """Add contents to the objstorage.

         Args:
             contents: List of contents to add

         Returns:
             The summary dict of content and content bytes added to the
             objstorage.

         """
         contents = list(contents)
         if any(cont.data is None for cont in contents):
             raise StorageArgumentException('Missing data')
         summary = self.objstorage.add_batch({
             cont.sha1: cont.data for cont in contents
         })
         return {
             'content:add': summary['object:add'],
             'content:add:bytes': summary['object:add:bytes']
         }
diff --git a/swh/storage/retry.py b/swh/storage/retry.py
index bf7b0acd..109e63c3 100644
--- a/swh/storage/retry.py
+++ b/swh/storage/retry.py
@@ -1,145 +1,147 @@
 # Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import logging
 import traceback

 from datetime import datetime
 from typing import Dict, Iterable, List, Optional, Union

 from tenacity import (
     retry, stop_after_attempt, wait_random_exponential,
 )

 from swh.storage import get_storage
 from swh.storage.exc import StorageArgumentException

 logger = logging.getLogger(__name__)


 def should_retry_adding(retry_state) -> bool:
     """Retry if the error/exception is (probably) not about a caller error

     """
     if retry_state.outcome.failed:
         error = retry_state.outcome.exception()
         if isinstance(error, StorageArgumentException):
             # Exception is due to an invalid argument
             return False
         else:
             # Other exception
             module = getattr(error, '__module__', None)
             if module:
                 error_name = error.__module__ + '.' + error.__class__.__name__
             else:
                 error_name = error.__class__.__name__
             logger.warning('Retry adding a batch', exc_info=False, extra={
                 'swh_type': 'storage_retry',
                 'swh_exception_type': error_name,
                 'swh_exception': traceback.format_exc(),
             })
             return True
     else:
         # No exception
         return False


 swh_retry = retry(retry=should_retry_adding,
                   wait=wait_random_exponential(multiplier=1, max=10),
                   stop=stop_after_attempt(3))


 class RetryingProxyStorage:
     """Storage implementation which retries adding objects when it specifically
        fails (hash collision, integrity error).

     """
     def __init__(self, storage):
         self.storage = get_storage(**storage)

     def __getattr__(self, key):
+        if key == 'storage':
+            raise AttributeError(key)
         return getattr(self.storage, key)

     @swh_retry
     def content_add(self, content: Iterable[Dict]) -> Dict:
         contents = list(content)
         return self.storage.content_add(contents)

     @swh_retry
     def content_add_metadata(self, content: Iterable[Dict]) -> Dict:
         contents = list(content)
         return self.storage.content_add_metadata(contents)

     @swh_retry
     def skipped_content_add(self, content: Iterable[Dict]) -> Dict:
         contents = list(content)
         return self.storage.skipped_content_add(contents)

     @swh_retry
     def origin_add_one(self, origin: Dict) -> str:
         return self.storage.origin_add_one(origin)

     @swh_retry
     def origin_visit_add(self, origin: Dict,
                          date: Union[datetime, str], type: str) -> Dict:
         return self.storage.origin_visit_add(origin, date, type)

     @swh_retry
     def origin_visit_update(
             self, origin: str, visit_id: int,
             status: Optional[str] = None,
             metadata: Optional[Dict] = None,
             snapshot: Optional[Dict] = None) -> Dict:
         return self.storage.origin_visit_update(
             origin, visit_id, status=status,
             metadata=metadata, snapshot=snapshot)

     @swh_retry
     def tool_add(self, tools: Iterable[Dict]) -> List[Dict]:
         tools = list(tools)
         return self.storage.tool_add(tools)

     @swh_retry
     def metadata_provider_add(
             self, provider_name: str, provider_type: str, provider_url: str,
             metadata: Dict) -> Union[str, int]:
         return self.storage.metadata_provider_add(
             provider_name, provider_type, provider_url, metadata)

     @swh_retry
     def origin_metadata_add(
             self, origin_url: str, ts: Union[str, datetime],
             provider_id: int, tool_id: int, metadata: Dict) -> None:
         return self.storage.origin_metadata_add(
             origin_url, ts, provider_id, tool_id, metadata)

     @swh_retry
     def directory_add(self, directories: Iterable[Dict]) -> Dict:
         directories = list(directories)
         return self.storage.directory_add(directories)

     @swh_retry
     def revision_add(self, revisions: Iterable[Dict]) -> Dict:
         revisions = list(revisions)
         return self.storage.revision_add(revisions)

     @swh_retry
     def release_add(self, releases: Iterable[Dict]) -> Dict:
         releases = list(releases)
         return self.storage.release_add(releases)

     @swh_retry
     def snapshot_add(self, snapshot: Iterable[Dict]) -> Dict:
         snapshots = list(snapshot)
         return self.storage.snapshot_add(snapshots)

     @swh_retry
     def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict:
         """Specific case for buffer proxy storage failing to flush data

         """
         if hasattr(self.storage, 'flush'):
             return self.storage.flush(object_types)
         return {}
diff --git a/swh/storage/validate.py b/swh/storage/validate.py
index 654cf06f..975a3040 100644
--- a/swh/storage/validate.py
+++ b/swh/storage/validate.py
@@ -1,105 +1,107 @@
 # Copyright (C) 2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import contextlib
 import datetime
 from typing import Dict, Iterable, List, Union

 from swh.model.model import (
     SkippedContent, Content, Directory, Revision, Release, Snapshot,
     OriginVisit, Origin
 )

 from . import get_storage
 from .exc import StorageArgumentException


 VALIDATION_EXCEPTIONS = (
     KeyError,
     TypeError,
     ValueError,
 )


 @contextlib.contextmanager
 def convert_validation_exceptions():
     """Catches validation errors arguments, and re-raises a
     StorageArgumentException."""
     try:
         yield
     except VALIDATION_EXCEPTIONS as e:
         raise StorageArgumentException(*e.args)


 def now():
     return datetime.datetime.now(tz=datetime.timezone.utc)


 class ValidatingProxyStorage:
     """Storage implementation converts dictionaries to swh-model objects
     before calling its backend, and back to dicts before returning results

     """
     def __init__(self, storage):
         self.storage = get_storage(**storage)

     def __getattr__(self, key):
+        if key == 'storage':
+            raise AttributeError(key)
         return getattr(self.storage, key)

     def content_add(self, content: Iterable[Dict]) -> Dict:
         with convert_validation_exceptions():
             contents = [Content.from_dict({**c, 'ctime': now()})
                         for c in content]
         return self.storage.content_add(contents)

     def content_add_metadata(self, content: Iterable[Dict]) -> Dict:
         with convert_validation_exceptions():
             contents = [Content.from_dict(c) for c in content]
         return self.storage.content_add_metadata(contents)

     def skipped_content_add(self, content: Iterable[Dict]) -> Dict:
         with convert_validation_exceptions():
             contents = [SkippedContent.from_dict({**c, 'ctime': now()})
                         for c in content]
         return self.storage.skipped_content_add(contents)

     def directory_add(self, directories: Iterable[Dict]) -> Dict:
         with convert_validation_exceptions():
             directories = [Directory.from_dict(d) for d in directories]
         return self.storage.directory_add(directories)

     def revision_add(self, revisions: Iterable[Dict]) -> Dict:
         with convert_validation_exceptions():
             revisions = [Revision.from_dict(r) for r in revisions]
         return self.storage.revision_add(revisions)

     def release_add(self, releases: Iterable[Dict]) -> Dict:
         with convert_validation_exceptions():
             releases = [Release.from_dict(r) for r in releases]
         return self.storage.release_add(releases)

     def snapshot_add(self, snapshots: Iterable[Dict]) -> Dict:
         with convert_validation_exceptions():
             snapshots = [Snapshot.from_dict(s) for s in snapshots]
         return self.storage.snapshot_add(snapshots)

     def origin_visit_add(
             self, origin, date, type) -> Dict[str, Union[str, int]]:
         with convert_validation_exceptions():
             visit = OriginVisit(origin=origin, date=date, type=type,
                                 status='ongoing', snapshot=None)
         return self.storage.origin_visit_add(
             visit.origin, visit.date, visit.type)

     def origin_add(self, origins: Iterable[Dict]) -> List[Dict]:
         with convert_validation_exceptions():
             origins = [Origin.from_dict(o) for o in origins]
         return self.storage.origin_add(origins)

     def origin_add_one(self, origin: Dict) -> int:
         with convert_validation_exceptions():
             origin = Origin.from_dict(origin)
         return self.storage.origin_add_one(origin)
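Note on what the repeated two-line addition buys: Python only invokes `__getattr__` after normal attribute lookup fails, so if the wrapped attribute (`storage`, or `objstorage` in `ObjStorage`) is not yet set on the instance — for example because the backend constructor raised inside `__init__`, or because `copy`/`pickle` rebuilt the object without calling `__init__` — the old unconditional `return getattr(self.storage, key)` re-enters `__getattr__('storage')` and recurses until `RecursionError`. The guard turns that failure mode into a plain `AttributeError`. A minimal sketch of the difference follows; the class names `BrokenProxy`/`FixedProxy` and the attribute name `backend` are illustrative stand-ins, not part of the patch:

```python
import copy


class BrokenProxy:
    """Delegates unconditionally, like the proxies before this patch."""
    def __init__(self, backend):
        self.backend = backend

    def __getattr__(self, key):
        # Only called when normal lookup fails; if 'backend' itself is
        # missing from __dict__, this line re-triggers __getattr__ forever.
        return getattr(self.backend, key)


class FixedProxy:
    """Delegates, but reports the wrapped attribute itself as missing."""
    def __init__(self, backend):
        self.backend = backend

    def __getattr__(self, key):
        if key == 'backend':
            # Break the loop: the attribute really is absent.
            raise AttributeError(key)
        return getattr(self.backend, key)


# copy.copy rebuilds the instance without calling __init__, then probes
# attributes such as __setstate__ on the empty object, which goes through
# __getattr__.
copy.copy(FixedProxy(object()))      # fine: the probe sees AttributeError
# copy.copy(BrokenProxy(object()))   # would raise RecursionError
```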