Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/retry.py
Show First 20 Lines • Show All 57 Lines • ▼ Show 20 Lines | class RetryingProxyStorage: | ||||
""" | """ | ||||
def __init__(self, storage): | def __init__(self, storage): | ||||
self.storage = get_storage(**storage) | self.storage = get_storage(**storage) | ||||
def __getattr__(self, key): | def __getattr__(self, key): | ||||
return getattr(self.storage, key) | return getattr(self.storage, key) | ||||
@swh_retry | @swh_retry | ||||
def content_add(self, content: List[Dict]) -> Dict: | def content_add(self, content: Iterable[Dict]) -> Dict: | ||||
contents = list(content) | contents = list(content) | ||||
return self.storage.content_add(contents) | return self.storage.content_add(contents) | ||||
@swh_retry | @swh_retry | ||||
def content_add_metadata(self, content: List[Dict]) -> Dict: | def content_add_metadata(self, content: Iterable[Dict]) -> Dict: | ||||
contents = list(content) | contents = list(content) | ||||
return self.storage.content_add_metadata(contents) | return self.storage.content_add_metadata(contents) | ||||
@swh_retry | @swh_retry | ||||
def skipped_content_add(self, content: Iterable[Dict]) -> Dict: | |||||
contents = list(content) | |||||
return self.storage.skipped_content_add(contents) | |||||
@swh_retry | |||||
def origin_add_one(self, origin: Dict) -> str: | def origin_add_one(self, origin: Dict) -> str: | ||||
return self.storage.origin_add_one(origin) | return self.storage.origin_add_one(origin) | ||||
@swh_retry | @swh_retry | ||||
def origin_visit_add(self, origin: Dict, | def origin_visit_add(self, origin: Dict, | ||||
date: Union[datetime, str], type: str) -> Dict: | date: Union[datetime, str], type: str) -> Dict: | ||||
return self.storage.origin_visit_add(origin, date, type) | return self.storage.origin_visit_add(origin, date, type) | ||||
@swh_retry | @swh_retry | ||||
def origin_visit_update( | def origin_visit_update( | ||||
self, origin: str, visit_id: int, status: Optional[str] = None, | self, origin: str, visit_id: int, status: Optional[str] = None, | ||||
metadata: Optional[Dict] = None, | metadata: Optional[Dict] = None, | ||||
snapshot: Optional[Dict] = None) -> Dict: | snapshot: Optional[Dict] = None) -> Dict: | ||||
return self.storage.origin_visit_update( | return self.storage.origin_visit_update( | ||||
origin, visit_id, status=status, | origin, visit_id, status=status, | ||||
metadata=metadata, snapshot=snapshot) | metadata=metadata, snapshot=snapshot) | ||||
@swh_retry | @swh_retry | ||||
def tool_add(self, tools: List[Dict]) -> List[Dict]: | def tool_add(self, tools: Iterable[Dict]) -> List[Dict]: | ||||
tools = list(tools) | tools = list(tools) | ||||
return self.storage.tool_add(tools) | return self.storage.tool_add(tools) | ||||
@swh_retry | @swh_retry | ||||
def metadata_provider_add( | def metadata_provider_add( | ||||
self, provider_name: str, provider_type: str, provider_url: str, | self, provider_name: str, provider_type: str, provider_url: str, | ||||
metadata: Dict) -> Union[str, int]: | metadata: Dict) -> Union[str, int]: | ||||
return self.storage.metadata_provider_add( | return self.storage.metadata_provider_add( | ||||
provider_name, provider_type, provider_url, metadata) | provider_name, provider_type, provider_url, metadata) | ||||
@swh_retry | @swh_retry | ||||
def origin_metadata_add( | def origin_metadata_add( | ||||
self, origin_url: str, ts: Union[str, datetime], | self, origin_url: str, ts: Union[str, datetime], | ||||
provider_id: int, tool_id: int, metadata: Dict) -> None: | provider_id: int, tool_id: int, metadata: Dict) -> None: | ||||
return self.storage.origin_metadata_add( | return self.storage.origin_metadata_add( | ||||
origin_url, ts, provider_id, tool_id, metadata) | origin_url, ts, provider_id, tool_id, metadata) | ||||
@swh_retry | @swh_retry | ||||
def directory_add(self, directories: List[Dict]) -> Dict: | def directory_add(self, directories: Iterable[Dict]) -> Dict: | ||||
directories = list(directories) | directories = list(directories) | ||||
return self.storage.directory_add(directories) | return self.storage.directory_add(directories) | ||||
@swh_retry | @swh_retry | ||||
def revision_add(self, revisions: List[Dict]) -> Dict: | def revision_add(self, revisions: Iterable[Dict]) -> Dict: | ||||
revisions = list(revisions) | revisions = list(revisions) | ||||
return self.storage.revision_add(revisions) | return self.storage.revision_add(revisions) | ||||
@swh_retry | @swh_retry | ||||
def release_add(self, releases: List[Dict]) -> Dict: | def release_add(self, releases: Iterable[Dict]) -> Dict: | ||||
releases = list(releases) | releases = list(releases) | ||||
return self.storage.release_add(releases) | return self.storage.release_add(releases) | ||||
@swh_retry | @swh_retry | ||||
def snapshot_add(self, snapshot: List[Dict]) -> Dict: | def snapshot_add(self, snapshot: Iterable[Dict]) -> Dict: | ||||
snapshots = list(snapshot) | snapshots = list(snapshot) | ||||
return self.storage.snapshot_add(snapshots) | return self.storage.snapshot_add(snapshots) | ||||
@swh_retry | @swh_retry | ||||
def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict: | def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict: | ||||
"""Specific case for buffer proxy storage failing to flush data | """Specific case for buffer proxy storage failing to flush data | ||||
""" | """ | ||||
if hasattr(self.storage, 'flush'): | if hasattr(self.storage, 'flush'): | ||||
return self.storage.flush(object_types) | return self.storage.flush(object_types) | ||||
return {} | return {} |