diff --git a/swh/storage/buffer.py b/swh/storage/buffer.py --- a/swh/storage/buffer.py +++ b/swh/storage/buffer.py @@ -4,20 +4,20 @@ # See top-level LICENSE file for more information from functools import partial -from typing import Collection, Dict, Iterable, List, Mapping, Optional, Tuple +from typing import Dict, Iterable, Mapping, Sequence, Tuple from swh.core.utils import grouper from swh.model.model import BaseModel, Content, SkippedContent from swh.storage import get_storage from swh.storage.interface import StorageInterface -OBJECT_TYPES: List[str] = [ +OBJECT_TYPES: Tuple[str, ...] = ( "content", "skipped_content", "directory", "revision", "release", -] +) DEFAULT_BUFFER_THRESHOLDS: Dict[str, int] = { "content": 10000, @@ -74,7 +74,7 @@ raise AttributeError(key) return getattr(self.storage, key) - def content_add(self, contents: Collection[Content]) -> Dict: + def content_add(self, contents: Sequence[Content]) -> Dict: """Push contents to write to the storage in the buffer. Following policies apply: @@ -95,7 +95,7 @@ return stats - def skipped_content_add(self, contents: Collection[SkippedContent]) -> Dict: + def skipped_content_add(self, contents: Sequence[SkippedContent]) -> Dict: return self.object_add( contents, object_type="skipped_content", @@ -103,7 +103,7 @@ ) def object_add( - self, objects: Collection[BaseModel], *, object_type: str, keys: Iterable[str], + self, objects: Sequence[BaseModel], *, object_type: str, keys: Iterable[str], ) -> Dict[str, int]: """Push objects to write to the storage in the buffer. Flushes the buffer to the storage if the threshold is hit. @@ -118,11 +118,8 @@ return {} - def flush(self, object_types: Optional[List[str]] = None) -> Dict[str, int]: + def flush(self, object_types: Sequence[str] = OBJECT_TYPES) -> Dict[str, int]: summary: Dict[str, int] = self.storage.flush(object_types) - if object_types is None: - object_types = OBJECT_TYPES - for object_type in object_types: buffer_ = self._objects[object_type] batches = grouper(buffer_.values(), n=self._buffer_thresholds[object_type]) @@ -135,7 +132,7 @@ return summary - def clear_buffers(self, object_types: Optional[List[str]] = None) -> None: + def clear_buffers(self, object_types: Sequence[str] = OBJECT_TYPES) -> None: """Clear objects from current buffer. WARNING: @@ -145,13 +142,10 @@ you want to continue your processing. """ - if object_types is None: - object_types = OBJECT_TYPES - for object_type in object_types: buffer_ = self._objects[object_type] buffer_.clear() if object_type == "content": self._contents_size = 0 - return self.storage.clear_buffers(object_types) + self.storage.clear_buffers(object_types) diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py --- a/swh/storage/cassandra/storage.py +++ b/swh/storage/cassandra/storage.py @@ -9,7 +9,18 @@ import json import random import re -from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple, Union +from typing import ( + Any, + Callable, + Dict, + Iterable, + List, + Optional, + Sequence, + Set, + Tuple, + Union, +) import attr @@ -1303,11 +1314,11 @@ else: return None - def clear_buffers(self, object_types: Optional[List[str]] = None) -> None: + def clear_buffers(self, object_types: Sequence[str]) -> None: """Do nothing """ return None - def flush(self, object_types: Optional[List[str]] = None) -> Dict: + def flush(self, object_types: Sequence[str]) -> Dict[str, int]: return {} diff --git a/swh/storage/interface.py b/swh/storage/interface.py --- a/swh/storage/interface.py +++ b/swh/storage/interface.py @@ -5,7 +5,7 @@ import datetime from enum import Enum -from typing import Any, Dict, Iterable, List, Optional, Tuple, TypeVar, Union +from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, TypeVar, Union from typing_extensions import Protocol, TypedDict, runtime_checkable @@ -1185,7 +1185,7 @@ ... @remote_api_endpoint("clear/buffer") - def clear_buffers(self, object_types: Optional[List[str]] = None) -> None: + def clear_buffers(self, object_types: Sequence[str]) -> None: """For backend storages (pg, storage, in-memory), this is a noop operation. For proxy storages (especially filter, buffer), this is an operation which cleans internal state. @@ -1193,7 +1193,7 @@ """ @remote_api_endpoint("flush") - def flush(self, object_types: Optional[List[str]] = None) -> Dict: + def flush(self, object_types: Sequence[str]) -> Dict[str, int]: """For backend storages (pg, storage, in-memory), this is expected to be a noop operation. For proxy storages (especially buffer), this is expected to trigger actual writes to the backend. diff --git a/swh/storage/postgresql/storage.py b/swh/storage/postgresql/storage.py --- a/swh/storage/postgresql/storage.py +++ b/swh/storage/postgresql/storage.py @@ -9,7 +9,7 @@ from contextlib import contextmanager import datetime import itertools -from typing import Any, Counter, Dict, Iterable, List, Optional, Tuple, Union +from typing import Any, Counter, Dict, Iterable, List, Optional, Sequence, Tuple, Union import attr import psycopg2 @@ -1396,13 +1396,13 @@ return None return MetadataAuthority.from_dict(dict(zip(db.metadata_authority_cols, row))) - def clear_buffers(self, object_types: Optional[List[str]] = None) -> None: + def clear_buffers(self, object_types: Sequence[str]) -> None: """Do nothing """ return None - def flush(self, object_types: Optional[List[str]] = None) -> Dict: + def flush(self, object_types: Sequence[str]) -> Dict[str, int]: return {} def _get_authority_id(self, authority: MetadataAuthority, db, cur): diff --git a/swh/storage/retry.py b/swh/storage/retry.py --- a/swh/storage/retry.py +++ b/swh/storage/retry.py @@ -5,7 +5,7 @@ import logging import traceback -from typing import Dict, Iterable, List, Optional +from typing import Dict, Iterable, List from tenacity import retry, stop_after_attempt, wait_random_exponential @@ -129,12 +129,3 @@ @swh_retry def snapshot_add(self, snapshots: List[Snapshot]) -> Dict: return self.storage.snapshot_add(snapshots) - - def clear_buffers(self, object_types: Optional[List[str]] = None) -> None: - return self.storage.clear_buffers(object_types) - - def flush(self, object_types: Optional[List[str]] = None) -> Dict: - """Specific case for buffer proxy storage failing to flush data - - """ - return self.storage.flush(object_types) diff --git a/swh/storage/tests/test_postgresql.py b/swh/storage/tests/test_postgresql.py --- a/swh/storage/tests/test_postgresql.py +++ b/swh/storage/tests/test_postgresql.py @@ -248,13 +248,13 @@ """Calling clear buffers on real storage does nothing """ - assert swh_storage.clear_buffers() is None + assert swh_storage.clear_buffers([]) is None def test_flush(self, swh_storage): """Calling clear buffers on real storage does nothing """ - assert swh_storage.flush() == {} + assert swh_storage.flush([]) == {} def test_dbversion(self, swh_storage): with swh_storage.db() as db: