F9340453: D4033.id14288.diff
Attached to D4033: Improve typing of the buffering interface
diff --git a/swh/storage/buffer.py b/swh/storage/buffer.py
--- a/swh/storage/buffer.py
+++ b/swh/storage/buffer.py
@@ -4,20 +4,20 @@
# See top-level LICENSE file for more information
from functools import partial
-from typing import Collection, Dict, Iterable, List, Mapping, Optional, Tuple
+from typing import Dict, Iterable, Mapping, Sequence, Tuple
from swh.core.utils import grouper
from swh.model.model import BaseModel, Content, SkippedContent
from swh.storage import get_storage
from swh.storage.interface import StorageInterface
-OBJECT_TYPES: List[str] = [
+OBJECT_TYPES: Tuple[str, ...] = (
"content",
"skipped_content",
"directory",
"revision",
"release",
-]
+)
DEFAULT_BUFFER_THRESHOLDS: Dict[str, int] = {
"content": 10000,
@@ -74,7 +74,7 @@
raise AttributeError(key)
return getattr(self.storage, key)
- def content_add(self, contents: Collection[Content]) -> Dict:
+ def content_add(self, contents: Sequence[Content]) -> Dict:
"""Push contents to write to the storage in the buffer.
Following policies apply:
@@ -95,7 +95,7 @@
return stats
- def skipped_content_add(self, contents: Collection[SkippedContent]) -> Dict:
+ def skipped_content_add(self, contents: Sequence[SkippedContent]) -> Dict:
return self.object_add(
contents,
object_type="skipped_content",
@@ -103,7 +103,7 @@
)
def object_add(
- self, objects: Collection[BaseModel], *, object_type: str, keys: Iterable[str],
+ self, objects: Sequence[BaseModel], *, object_type: str, keys: Iterable[str],
) -> Dict[str, int]:
"""Push objects to write to the storage in the buffer. Flushes the
buffer to the storage if the threshold is hit.
@@ -118,11 +118,8 @@
return {}
- def flush(self, object_types: Optional[List[str]] = None) -> Dict[str, int]:
+ def flush(self, object_types: Sequence[str] = OBJECT_TYPES) -> Dict[str, int]:
summary: Dict[str, int] = self.storage.flush(object_types)
- if object_types is None:
- object_types = OBJECT_TYPES
-
for object_type in object_types:
buffer_ = self._objects[object_type]
batches = grouper(buffer_.values(), n=self._buffer_thresholds[object_type])
@@ -135,7 +132,7 @@
return summary
- def clear_buffers(self, object_types: Optional[List[str]] = None) -> None:
+ def clear_buffers(self, object_types: Sequence[str] = OBJECT_TYPES) -> None:
"""Clear objects from current buffer.
WARNING:
@@ -145,13 +142,10 @@
you want to continue your processing.
"""
- if object_types is None:
- object_types = OBJECT_TYPES
-
for object_type in object_types:
buffer_ = self._objects[object_type]
buffer_.clear()
if object_type == "content":
self._contents_size = 0
- return self.storage.clear_buffers(object_types)
+ self.storage.clear_buffers(object_types)
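
For context on how the reworked buffer proxy is meant to be driven after this change, here is a minimal sketch. The in-memory backend and the `min_batch_size` configuration key are assumptions for illustration only and are not part of this patch:

    from swh.model.model import Content
    from swh.storage import get_storage

    # Hypothetical buffer proxy over an in-memory backend; the exact
    # configuration keys are an assumption for this sketch.
    storage = get_storage(
        "buffer",
        storage={"cls": "memory"},
        min_batch_size={"content": 2},
    )

    storage.content_add([Content.from_data(b"hello")])

    # flush() and clear_buffers() now default to OBJECT_TYPES instead of
    # accepting None; an explicit subset is still allowed.
    summary = storage.flush(["content"])
    storage.clear_buffers(["content"])
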
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -9,7 +9,18 @@
import json
import random
import re
-from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple, Union
+from typing import (
+ Any,
+ Callable,
+ Dict,
+ Iterable,
+ List,
+ Optional,
+ Sequence,
+ Set,
+ Tuple,
+ Union,
+)
import attr
@@ -1303,11 +1314,11 @@
else:
return None
- def clear_buffers(self, object_types: Optional[List[str]] = None) -> None:
+ def clear_buffers(self, object_types: Sequence[str]) -> None:
"""Do nothing
"""
return None
- def flush(self, object_types: Optional[List[str]] = None) -> Dict:
+ def flush(self, object_types: Sequence[str]) -> Dict[str, int]:
return {}
diff --git a/swh/storage/interface.py b/swh/storage/interface.py
--- a/swh/storage/interface.py
+++ b/swh/storage/interface.py
@@ -5,7 +5,7 @@
import datetime
from enum import Enum
-from typing import Any, Dict, Iterable, List, Optional, Tuple, TypeVar, Union
+from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, TypeVar, Union
from typing_extensions import Protocol, TypedDict, runtime_checkable
@@ -1185,7 +1185,7 @@
...
@remote_api_endpoint("clear/buffer")
- def clear_buffers(self, object_types: Optional[List[str]] = None) -> None:
+ def clear_buffers(self, object_types: Sequence[str]) -> None:
"""For backend storages (pg, storage, in-memory), this is a noop operation. For proxy
storages (especially filter, buffer), this is an operation which cleans internal
state.
@@ -1193,7 +1193,7 @@
"""
@remote_api_endpoint("flush")
- def flush(self, object_types: Optional[List[str]] = None) -> Dict:
+ def flush(self, object_types: Sequence[str]) -> Dict[str, int]:
"""For backend storages (pg, storage, in-memory), this is expected to be a noop
operation. For proxy storages (especially buffer), this is expected to trigger
actual writes to the backend.
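
Taken together, the interface now promises that `flush` accepts a `Sequence[str]` and returns a `Dict[str, int]`, whether the storage is a plain backend (no-op, empty dict) or a proxy. Below is a hedged sketch of caller code written against that contract; the `finalize` helper is hypothetical and not part of the patch:

    from typing import Sequence

    from swh.storage.interface import StorageInterface

    def finalize(
        storage: StorageInterface,
        object_types: Sequence[str] = ("content", "directory"),
    ) -> int:
        # Flush any buffered objects (a no-op on backend storages) and
        # return the total count reported by the storage.
        summary = storage.flush(object_types)
        storage.clear_buffers(object_types)
        return sum(summary.values())
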
diff --git a/swh/storage/postgresql/storage.py b/swh/storage/postgresql/storage.py
--- a/swh/storage/postgresql/storage.py
+++ b/swh/storage/postgresql/storage.py
@@ -9,7 +9,7 @@
from contextlib import contextmanager
import datetime
import itertools
-from typing import Any, Counter, Dict, Iterable, List, Optional, Tuple, Union
+from typing import Any, Counter, Dict, Iterable, List, Optional, Sequence, Tuple, Union
import attr
import psycopg2
@@ -1396,13 +1396,13 @@
return None
return MetadataAuthority.from_dict(dict(zip(db.metadata_authority_cols, row)))
- def clear_buffers(self, object_types: Optional[List[str]] = None) -> None:
+ def clear_buffers(self, object_types: Sequence[str]) -> None:
"""Do nothing
"""
return None
- def flush(self, object_types: Optional[List[str]] = None) -> Dict:
+ def flush(self, object_types: Sequence[str]) -> Dict[str, int]:
return {}
def _get_authority_id(self, authority: MetadataAuthority, db, cur):
diff --git a/swh/storage/retry.py b/swh/storage/retry.py
--- a/swh/storage/retry.py
+++ b/swh/storage/retry.py
@@ -5,7 +5,7 @@
import logging
import traceback
-from typing import Dict, Iterable, List, Optional
+from typing import Dict, Iterable, List
from tenacity import retry, stop_after_attempt, wait_random_exponential
@@ -129,12 +129,3 @@
@swh_retry
def snapshot_add(self, snapshots: List[Snapshot]) -> Dict:
return self.storage.snapshot_add(snapshots)
-
- def clear_buffers(self, object_types: Optional[List[str]] = None) -> None:
- return self.storage.clear_buffers(object_types)
-
- def flush(self, object_types: Optional[List[str]] = None) -> Dict:
- """Specific case for buffer proxy storage failing to flush data
-
- """
- return self.storage.flush(object_types)
diff --git a/swh/storage/tests/test_postgresql.py b/swh/storage/tests/test_postgresql.py
--- a/swh/storage/tests/test_postgresql.py
+++ b/swh/storage/tests/test_postgresql.py
@@ -248,13 +248,13 @@
"""Calling clear buffers on real storage does nothing
"""
- assert swh_storage.clear_buffers() is None
+ assert swh_storage.clear_buffers([]) is None
def test_flush(self, swh_storage):
"""Calling clear buffers on real storage does nothing
"""
- assert swh_storage.flush() == {}
+ assert swh_storage.flush([]) == {}
def test_dbversion(self, swh_storage):
with swh_storage.db() as db:
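
The test updates above follow from the signature change: backend storages no longer take an implicit `None`, so callers pass an explicit (possibly empty) sequence of object types. A minimal pytest-style sketch of the same assertions, using the in-memory backend as a stand-in for the postgresql fixture used in the patch:

    from swh.storage import get_storage

    def test_flush_and_clear_buffers_are_noops():
        # In-memory backend stands in for the postgresql fixture.
        storage = get_storage("memory")
        assert storage.clear_buffers([]) is None
        assert storage.flush([]) == {}
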