Page Menu | Home | Software Heritage

D4033.id14288.diff
No One | Temporary

D4033.id14288.diff

diff --git a/swh/storage/buffer.py b/swh/storage/buffer.py
--- a/swh/storage/buffer.py
+++ b/swh/storage/buffer.py
@@ -4,20 +4,20 @@
# See top-level LICENSE file for more information
from functools import partial
-from typing import Collection, Dict, Iterable, List, Mapping, Optional, Tuple
+from typing import Dict, Iterable, Mapping, Sequence, Tuple
from swh.core.utils import grouper
from swh.model.model import BaseModel, Content, SkippedContent
from swh.storage import get_storage
from swh.storage.interface import StorageInterface
-OBJECT_TYPES: List[str] = [
+OBJECT_TYPES: Tuple[str, ...] = (
"content",
"skipped_content",
"directory",
"revision",
"release",
-]
+)
DEFAULT_BUFFER_THRESHOLDS: Dict[str, int] = {
"content": 10000,
@@ -74,7 +74,7 @@
raise AttributeError(key)
return getattr(self.storage, key)
- def content_add(self, contents: Collection[Content]) -> Dict:
+ def content_add(self, contents: Sequence[Content]) -> Dict:
"""Push contents to write to the storage in the buffer.
Following policies apply:
@@ -95,7 +95,7 @@
return stats
- def skipped_content_add(self, contents: Collection[SkippedContent]) -> Dict:
+ def skipped_content_add(self, contents: Sequence[SkippedContent]) -> Dict:
return self.object_add(
contents,
object_type="skipped_content",
@@ -103,7 +103,7 @@
)
def object_add(
- self, objects: Collection[BaseModel], *, object_type: str, keys: Iterable[str],
+ self, objects: Sequence[BaseModel], *, object_type: str, keys: Iterable[str],
) -> Dict[str, int]:
"""Push objects to write to the storage in the buffer. Flushes the
buffer to the storage if the threshold is hit.
@@ -118,11 +118,8 @@
return {}
- def flush(self, object_types: Optional[List[str]] = None) -> Dict[str, int]:
+ def flush(self, object_types: Sequence[str] = OBJECT_TYPES) -> Dict[str, int]:
summary: Dict[str, int] = self.storage.flush(object_types)
- if object_types is None:
- object_types = OBJECT_TYPES
-
for object_type in object_types:
buffer_ = self._objects[object_type]
batches = grouper(buffer_.values(), n=self._buffer_thresholds[object_type])
@@ -135,7 +132,7 @@
return summary
- def clear_buffers(self, object_types: Optional[List[str]] = None) -> None:
+ def clear_buffers(self, object_types: Sequence[str] = OBJECT_TYPES) -> None:
"""Clear objects from current buffer.
WARNING:
@@ -145,13 +142,10 @@
you want to continue your processing.
"""
- if object_types is None:
- object_types = OBJECT_TYPES
-
for object_type in object_types:
buffer_ = self._objects[object_type]
buffer_.clear()
if object_type == "content":
self._contents_size = 0
- return self.storage.clear_buffers(object_types)
+ self.storage.clear_buffers(object_types)
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -9,7 +9,18 @@
import json
import random
import re
-from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple, Union
+from typing import (
+ Any,
+ Callable,
+ Dict,
+ Iterable,
+ List,
+ Optional,
+ Sequence,
+ Set,
+ Tuple,
+ Union,
+)
import attr
@@ -1303,11 +1314,11 @@
else:
return None
- def clear_buffers(self, object_types: Optional[List[str]] = None) -> None:
+ def clear_buffers(self, object_types: Sequence[str]) -> None:
"""Do nothing
"""
return None
- def flush(self, object_types: Optional[List[str]] = None) -> Dict:
+ def flush(self, object_types: Sequence[str]) -> Dict[str, int]:
return {}
diff --git a/swh/storage/interface.py b/swh/storage/interface.py
--- a/swh/storage/interface.py
+++ b/swh/storage/interface.py
@@ -5,7 +5,7 @@
import datetime
from enum import Enum
-from typing import Any, Dict, Iterable, List, Optional, Tuple, TypeVar, Union
+from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, TypeVar, Union
from typing_extensions import Protocol, TypedDict, runtime_checkable
@@ -1185,7 +1185,7 @@
...
@remote_api_endpoint("clear/buffer")
- def clear_buffers(self, object_types: Optional[List[str]] = None) -> None:
+ def clear_buffers(self, object_types: Sequence[str]) -> None:
"""For backend storages (pg, storage, in-memory), this is a noop operation. For proxy
storages (especially filter, buffer), this is an operation which cleans internal
state.
@@ -1193,7 +1193,7 @@
"""
@remote_api_endpoint("flush")
- def flush(self, object_types: Optional[List[str]] = None) -> Dict:
+ def flush(self, object_types: Sequence[str]) -> Dict[str, int]:
"""For backend storages (pg, storage, in-memory), this is expected to be a noop
operation. For proxy storages (especially buffer), this is expected to trigger
actual writes to the backend.
diff --git a/swh/storage/postgresql/storage.py b/swh/storage/postgresql/storage.py
--- a/swh/storage/postgresql/storage.py
+++ b/swh/storage/postgresql/storage.py
@@ -9,7 +9,7 @@
from contextlib import contextmanager
import datetime
import itertools
-from typing import Any, Counter, Dict, Iterable, List, Optional, Tuple, Union
+from typing import Any, Counter, Dict, Iterable, List, Optional, Sequence, Tuple, Union
import attr
import psycopg2
@@ -1396,13 +1396,13 @@
return None
return MetadataAuthority.from_dict(dict(zip(db.metadata_authority_cols, row)))
- def clear_buffers(self, object_types: Optional[List[str]] = None) -> None:
+ def clear_buffers(self, object_types: Sequence[str]) -> None:
"""Do nothing
"""
return None
- def flush(self, object_types: Optional[List[str]] = None) -> Dict:
+ def flush(self, object_types: Sequence[str]) -> Dict[str, int]:
return {}
def _get_authority_id(self, authority: MetadataAuthority, db, cur):
diff --git a/swh/storage/retry.py b/swh/storage/retry.py
--- a/swh/storage/retry.py
+++ b/swh/storage/retry.py
@@ -5,7 +5,7 @@
import logging
import traceback
-from typing import Dict, Iterable, List, Optional
+from typing import Dict, Iterable, List
from tenacity import retry, stop_after_attempt, wait_random_exponential
@@ -129,12 +129,3 @@
@swh_retry
def snapshot_add(self, snapshots: List[Snapshot]) -> Dict:
return self.storage.snapshot_add(snapshots)
-
- def clear_buffers(self, object_types: Optional[List[str]] = None) -> None:
- return self.storage.clear_buffers(object_types)
-
- def flush(self, object_types: Optional[List[str]] = None) -> Dict:
- """Specific case for buffer proxy storage failing to flush data
-
- """
- return self.storage.flush(object_types)
diff --git a/swh/storage/tests/test_postgresql.py b/swh/storage/tests/test_postgresql.py
--- a/swh/storage/tests/test_postgresql.py
+++ b/swh/storage/tests/test_postgresql.py
@@ -248,13 +248,13 @@
"""Calling clear buffers on real storage does nothing
"""
- assert swh_storage.clear_buffers() is None
+ assert swh_storage.clear_buffers([]) is None
def test_flush(self, swh_storage):
"""Calling clear buffers on real storage does nothing
"""
- assert swh_storage.flush() == {}
+ assert swh_storage.flush([]) == {}
def test_dbversion(self, swh_storage):
with swh_storage.db() as db:

File Metadata

Mime Type
text/plain
Expires
Jul 3 2025, 10:42 AM (4 w, 3 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3220522

Event Timeline