diff --git a/swh/storage/buffer.py b/swh/storage/buffer.py
--- a/swh/storage/buffer.py
+++ b/swh/storage/buffer.py
@@ -4,20 +4,23 @@
 # See top-level LICENSE file for more information
 
 from functools import partial
-from typing import Collection, Dict, Iterable, List, Mapping, Optional, Tuple
+from typing import Collection, Dict, Iterable, Mapping, Tuple
+
+from typing_extensions import Literal
 
 from swh.core.utils import grouper
 from swh.model.model import BaseModel, Content, SkippedContent
 from swh.storage import get_storage
 from swh.storage.interface import StorageInterface
 
-OBJECT_TYPES: List[str] = [
+LObjectType = Literal["content", "skipped_content", "directory", "revision", "release"]
+OBJECT_TYPES: Tuple[LObjectType, ...] = (
     "content",
     "skipped_content",
     "directory",
     "revision",
     "release",
-]
+)
 
 DEFAULT_BUFFER_TRESHOLDS: Dict[str, int] = {
     "content": 10000,
@@ -64,7 +67,7 @@
         if min_batch_size is not DEFAULT_BUFFER_TRESHOLDS:
             self._buffer_tresholds = {**DEFAULT_BUFFER_TRESHOLDS, **min_batch_size}
 
-        self._objects: Dict[str, Dict[Tuple[str, ...], BaseModel]] = {
+        self._objects: Dict[LObjectType, Dict[Tuple[str, ...], BaseModel]] = {
             k: {} for k in OBJECT_TYPES
         }
         self._contents_size: int = 0
@@ -107,7 +110,11 @@
         )
 
     def object_add(
-        self, objects: Collection[BaseModel], *, object_type: str, keys: Iterable[str],
+        self,
+        objects: Collection[BaseModel],
+        *,
+        object_type: LObjectType,
+        keys: Iterable[str],
     ) -> Dict[str, int]:
         """Push objects to write to the storage in the buffer. Flushes the
         buffer to the storage if the threshold is hit.
@@ -123,11 +130,12 @@
 
         return {}
 
-    def flush(self, object_types: Optional[List[str]] = None) -> Dict[str, int]:
+    def flush(
+        self, object_types: Iterable[LObjectType] = OBJECT_TYPES
+    ) -> Dict[str, int]:
+        # may be a one-shot iterator: materialize it before using it twice
+        object_types = tuple(object_types)
         summary: Dict[str, int] = self.storage.flush(object_types)
-        if object_types is None:
-            object_types = OBJECT_TYPES
-
         for object_type in object_types:
             buffer_ = self._objects[object_type]
             batches = grouper(buffer_.values(), n=self._buffer_tresholds[object_type])
@@ -140,7 +148,7 @@
 
         return summary
 
-    def clear_buffers(self, object_types: Optional[List[str]] = None) -> None:
+    def clear_buffers(self, object_types: Iterable[LObjectType] = OBJECT_TYPES) -> None:
         """Clear objects from current buffer.
 
         WARNING:
@@ -150,13 +158,13 @@
             you want to continue your processing.
 
         """
-        if object_types is None:
-            object_types = OBJECT_TYPES
+        # may be a one-shot iterator: materialize it before using it twice
+        object_types = tuple(object_types)
 
         for object_type in object_types:
             buffer_ = self._objects[object_type]
             buffer_.clear()
             if object_type == "content":
                 self._contents_size = 0
 
-        return self.storage.clear_buffers(object_types)
+        self.storage.clear_buffers(object_types)
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -1303,11 +1303,11 @@
         else:
             return None
 
-    def clear_buffers(self, object_types: Optional[List[str]] = None) -> None:
+    def clear_buffers(self, object_types: Iterable[str]) -> None:
         """Do nothing
 
         """
         return None
 
-    def flush(self, object_types: Optional[List[str]] = None) -> Dict:
+    def flush(self, object_types: Iterable[str]) -> Dict[str, int]:
         return {}
diff --git a/swh/storage/filter.py b/swh/storage/filter.py
--- a/swh/storage/filter.py
+++ b/swh/storage/filter.py
@@ -63,7 +63,7 @@
         """Return only the content keys missing from swh
 
         Args:
-            content_hashes: List of sha256 to check for existence in swh
+            content_hashes: list of sha256 to check for existence in swh
                 storage
 
         """
@@ -79,7 +79,7 @@
         """Return only the content keys missing from swh
 
         Args:
-            content_hashes: List of sha1_git to check for existence in swh
+            content_hashes: list of sha1_git to check for existence in swh
                 storage
 
         """
@@ -97,7 +97,7 @@
 
         Args:
             object_type: object type to use {revision, directory}
-            ids: List of object_type ids
+            ids: list of object_type ids
 
         Returns:
             Missing ids from the storage for object_type
@@ -114,3 +114,9 @@
 
         fn = fn_by_object_type[object_type]
         return set(fn(missing_ids))
+
+    def clear_buffers(self, object_types: Iterable[str]) -> None:
+        self.storage.clear_buffers(object_types)
+
+    def flush(self, object_types: Iterable[str]) -> Dict[str, int]:
+        return self.storage.flush(object_types)
diff --git a/swh/storage/interface.py b/swh/storage/interface.py
--- a/swh/storage/interface.py
+++ b/swh/storage/interface.py
@@ -1047,7 +1047,7 @@
         ...
 
     @remote_api_endpoint("origin/add_multi")
-    def origin_add(self, origins: List[Origin]) -> Dict[str, int]:
+    def origin_add(self, origins: List[Origin]) -> Dict:
         """Add origins to the storage
 
         Args:
@@ -1185,7 +1185,7 @@
         ...
 
     @remote_api_endpoint("clear/buffer")
-    def clear_buffers(self, object_types: Optional[List[str]] = None) -> None:
+    def clear_buffers(self, object_types: Iterable[str]) -> None:
         """For backend storages (pg, storage, in-memory), this is a noop operation. For
         proxy storages (especially filter, buffer), this is an operation which cleans
         internal state.
@@ -1193,7 +1193,7 @@
         """
 
     @remote_api_endpoint("flush")
-    def flush(self, object_types: Optional[List[str]] = None) -> Dict:
+    def flush(self, object_types: Iterable[str]) -> Dict[str, int]:
         """For backend storages (pg, storage, in-memory), this is expected to be a noop
         operation. For proxy storages (especially buffer), this is expected to trigger
         actual writes to the backend.
diff --git a/swh/storage/postgresql/storage.py b/swh/storage/postgresql/storage.py
--- a/swh/storage/postgresql/storage.py
+++ b/swh/storage/postgresql/storage.py
@@ -1396,13 +1396,13 @@
             return None
         return MetadataAuthority.from_dict(dict(zip(db.metadata_authority_cols, row)))
 
-    def clear_buffers(self, object_types: Optional[List[str]] = None) -> None:
+    def clear_buffers(self, object_types: Iterable[str]) -> None:
         """Do nothing
 
         """
         return None
 
-    def flush(self, object_types: Optional[List[str]] = None) -> Dict:
+    def flush(self, object_types: Iterable[str]) -> Dict[str, int]:
         return {}
 
     def _get_authority_id(self, authority: MetadataAuthority, db, cur):
diff --git a/swh/storage/retry.py b/swh/storage/retry.py
--- a/swh/storage/retry.py
+++ b/swh/storage/retry.py
@@ -5,7 +5,7 @@
 
 import logging
 import traceback
-from typing import Dict, Iterable, List, Optional
+from typing import Dict, Iterable, List
 
 from tenacity import retry, stop_after_attempt, wait_random_exponential
 
@@ -130,10 +130,10 @@
     def snapshot_add(self, snapshots: List[Snapshot]) -> Dict:
         return self.storage.snapshot_add(snapshots)
 
-    def clear_buffers(self, object_types: Optional[List[str]] = None) -> None:
-        return self.storage.clear_buffers(object_types)
+    def clear_buffers(self, object_types: Iterable[str]) -> None:
+        self.storage.clear_buffers(object_types)
 
-    def flush(self, object_types: Optional[List[str]] = None) -> Dict:
+    def flush(self, object_types: Iterable[str]) -> Dict[str, int]:
         """Specific case for buffer proxy storage failing to flush data
 
         """
diff --git a/swh/storage/validate.py b/swh/storage/validate.py
--- a/swh/storage/validate.py
+++ b/swh/storage/validate.py
@@ -69,3 +69,9 @@
     def snapshot_add(self, snapshots: List[Snapshot]) -> Dict:
         self._check_hashes(snapshots)
         return self.storage.snapshot_add(snapshots)
+
+    def clear_buffers(self, object_types: Iterable[str]) -> None:
+        self.storage.clear_buffers(object_types)
+
+    def flush(self, object_types: Iterable[str]) -> Dict[str, int]:
+        return self.storage.flush(object_types)