Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/retry.py
# Copyright (C) 2019-2020 The Software Heritage developers | # Copyright (C) 2019-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import logging | import logging | ||||
import traceback | import traceback | ||||
from typing import Dict, Iterable, Optional | from typing import Dict, Iterable, List, Optional | ||||
from tenacity import ( | from tenacity import ( | ||||
retry, | retry, | ||||
stop_after_attempt, | stop_after_attempt, | ||||
wait_random_exponential, | wait_random_exponential, | ||||
) | ) | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Content, | Content, | ||||
SkippedContent, | SkippedContent, | ||||
Directory, | Directory, | ||||
Revision, | Revision, | ||||
Release, | Release, | ||||
Snapshot, | Snapshot, | ||||
OriginVisit, | OriginVisit, | ||||
MetadataAuthority, | MetadataAuthority, | ||||
MetadataFetcher, | MetadataFetcher, | ||||
RawExtrinsicMetadata, | RawExtrinsicMetadata, | ||||
) | ) | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.storage.exc import StorageArgumentException | from swh.storage.exc import StorageArgumentException | ||||
from swh.storage.interface import StorageInterface | |||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
def should_retry_adding(retry_state) -> bool: | def should_retry_adding(retry_state) -> bool: | ||||
"""Retry if the error/exception is (probably) not about a caller error | """Retry if the error/exception is (probably) not about a caller error | ||||
Show All 40 Lines | |||||
class RetryingProxyStorage: | class RetryingProxyStorage: | ||||
"""Storage implementation which retries adding objects when it specifically | """Storage implementation which retries adding objects when it specifically | ||||
fails (hash collision, integrity error). | fails (hash collision, integrity error). | ||||
""" | """ | ||||
def __init__(self, storage): | def __init__(self, storage): | ||||
self.storage = get_storage(**storage) | self.storage: StorageInterface = get_storage(**storage) | ||||
def __getattr__(self, key): | def __getattr__(self, key): | ||||
if key == "storage": | if key == "storage": | ||||
raise AttributeError(key) | raise AttributeError(key) | ||||
return getattr(self.storage, key) | return getattr(self.storage, key) | ||||
@swh_retry | @swh_retry | ||||
def content_add(self, content: Iterable[Content]) -> Dict: | def content_add(self, content: List[Content]) -> Dict: | ||||
return self.storage.content_add(content) | return self.storage.content_add(content) | ||||
@swh_retry | @swh_retry | ||||
def content_add_metadata(self, content: Iterable[Content]) -> Dict: | def content_add_metadata(self, content: List[Content]) -> Dict: | ||||
return self.storage.content_add_metadata(content) | return self.storage.content_add_metadata(content) | ||||
@swh_retry | @swh_retry | ||||
def skipped_content_add(self, content: Iterable[SkippedContent]) -> Dict: | def skipped_content_add(self, content: List[SkippedContent]) -> Dict: | ||||
return self.storage.skipped_content_add(content) | return self.storage.skipped_content_add(content) | ||||
@swh_retry | @swh_retry | ||||
def origin_visit_add(self, visits: Iterable[OriginVisit]) -> Iterable[OriginVisit]: | def origin_visit_add(self, visits: List[OriginVisit]) -> Iterable[OriginVisit]: | ||||
return self.storage.origin_visit_add(visits) | return self.storage.origin_visit_add(visits) | ||||
@swh_retry | @swh_retry | ||||
def metadata_fetcher_add(self, fetchers: Iterable[MetadataFetcher],) -> None: | def metadata_fetcher_add(self, fetchers: List[MetadataFetcher],) -> None: | ||||
return self.storage.metadata_fetcher_add(fetchers) | return self.storage.metadata_fetcher_add(fetchers) | ||||
@swh_retry | @swh_retry | ||||
def metadata_authority_add(self, authorities: Iterable[MetadataAuthority]) -> None: | def metadata_authority_add(self, authorities: List[MetadataAuthority]) -> None: | ||||
return self.storage.metadata_authority_add(authorities) | return self.storage.metadata_authority_add(authorities) | ||||
@swh_retry | @swh_retry | ||||
def raw_extrinsic_metadata_add( | def raw_extrinsic_metadata_add(self, metadata: List[RawExtrinsicMetadata],) -> None: | ||||
self, metadata: Iterable[RawExtrinsicMetadata], | |||||
) -> None: | |||||
return self.storage.raw_extrinsic_metadata_add(metadata) | return self.storage.raw_extrinsic_metadata_add(metadata) | ||||
@swh_retry | @swh_retry | ||||
def directory_add(self, directories: Iterable[Directory]) -> Dict: | def directory_add(self, directories: List[Directory]) -> Dict: | ||||
return self.storage.directory_add(directories) | return self.storage.directory_add(directories) | ||||
@swh_retry | @swh_retry | ||||
def revision_add(self, revisions: Iterable[Revision]) -> Dict: | def revision_add(self, revisions: List[Revision]) -> Dict: | ||||
return self.storage.revision_add(revisions) | return self.storage.revision_add(revisions) | ||||
@swh_retry | @swh_retry | ||||
def release_add(self, releases: Iterable[Release]) -> Dict: | def release_add(self, releases: List[Release]) -> Dict: | ||||
return self.storage.release_add(releases) | return self.storage.release_add(releases) | ||||
@swh_retry | @swh_retry | ||||
def snapshot_add(self, snapshots: Iterable[Snapshot]) -> Dict: | def snapshot_add(self, snapshots: List[Snapshot]) -> Dict: | ||||
return self.storage.snapshot_add(snapshots) | return self.storage.snapshot_add(snapshots) | ||||
def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None: | def clear_buffers(self, object_types: Optional[List[str]] = None) -> None: | ||||
return self.storage.clear_buffers(object_types) | return self.storage.clear_buffers(object_types) | ||||
def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict: | def flush(self, object_types: Optional[List[str]] = None) -> Dict: | ||||
"""Specific case for buffer proxy storage failing to flush data | """Specific case for buffer proxy storage failing to flush data | ||||
""" | """ | ||||
return self.storage.flush(object_types) | return self.storage.flush(object_types) |