Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9343665
D5680.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
8 KB
Subscribers
None
D5680.diff
View Options
diff --git a/swh/storage/tenacious.py b/swh/storage/tenacious.py
--- a/swh/storage/tenacious.py
+++ b/swh/storage/tenacious.py
@@ -7,7 +7,7 @@
from functools import partial
import logging
from typing import Counter as CounterT
-from typing import Deque, Dict, Iterable, List
+from typing import Deque, Dict, Iterable, List, Optional
from swh.model.model import BaseModel
from swh.storage import get_storage
@@ -47,6 +47,9 @@
Also provides a error-rate limit feature: if more than n errors occurred during the
insertion of the last p (window_size) objects, stop accepting any insertion.
+ The number of insertion retries for a single object can be specified via
+ the 'retries' parameter.
+
This proxy is mainly intended to be used in a replayer configuration (aka a
mirror stack), where insertion errors are mostly unexpected (which explains
the low default ratio errors/window_size).
@@ -84,7 +87,12 @@
"origin_add",
)
- def __init__(self, storage, error_rate_limit=None):
+ def __init__(
+ self,
+ storage,
+ error_rate_limit: Optional[Dict[str, int]] = None,
+ retries: int = 3,
+ ):
self.storage = get_storage(**storage)
if error_rate_limit is None:
error_rate_limit = {"errors": 10, "window_size": 1000}
@@ -93,6 +101,7 @@
self.rate_queue = RateQueue(
size=error_rate_limit["window_size"], max_errors=error_rate_limit["errors"],
)
+ self._single_object_retries: int = retries
def __getattr__(self, key):
if key in self.tenacious_methods:
@@ -109,7 +118,10 @@
# list of lists of objects; note this to_add list is consumed from the tail
to_add: List[List[BaseModel]] = [list(objects)]
+ n_objs: int = len(to_add[0])
+
results: CounterT[str] = Counter()
+ retries: int = self._single_object_retries
while to_add:
if self.rate_queue.limit_reached():
@@ -134,14 +146,28 @@
# reinsert objs split in 2 parts at the end of to_add
to_add.append(objs[(len(objs) // 2) :])
to_add.append(objs[: (len(objs) // 2)])
+ # each time we append a batch in the to_add bag, reset the
+ # one-object-batch retries counter
+ retries = self._single_object_retries
else:
- logger.error(
- f"{func_name}: failed to insert an object, excluding {objs}"
- )
- # logger.error(f"Exception: {exc}")
- logger.exception(f"Exception was: {exc}")
- results.update({f"{object_type}:add:errors": 1})
- self.rate_queue.add_error()
+ retries -= 1
+ if retries:
+ logger.info(
+ f"{func_name}: failed to insert an {object_type}, retrying"
+ )
+ # give it another chance
+ to_add.append(objs)
+ else:
+ logger.error(
+ f"{func_name}: failed to insert an object, "
+ f"excluding {objs} (from a batch of {n_objs})"
+ )
+ logger.exception(f"Exception was: {exc}")
+ results.update({f"{object_type}:add:errors": 1})
+ self.rate_queue.add_error()
+ # reset the retries counter (needed in case the next
+ # batch is also 1 element only)
+ retries = self._single_object_retries
return dict(results)
def reset(self):
diff --git a/swh/storage/tests/test_tenacious.py b/swh/storage/tests/test_tenacious.py
--- a/swh/storage/tests/test_tenacious.py
+++ b/swh/storage/tests/test_tenacious.py
@@ -3,6 +3,7 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+from collections import Counter
from contextlib import contextmanager
from unittest.mock import patch
@@ -182,6 +183,14 @@
# forbidden are 'bad1' and 'bad2' arguments of `testdata`
forbidden = [x[0][2] for x in testdata] + [x[0][3] for x in testdata]
+ def __init__(self, *args, **kw):
+ self.add_calls = Counter()
+ super().__init__(*args, **kw)
+
+ def reset(self):
+ super().reset()
+ self.add_calls.clear()
+
def content_add(self, contents):
return self._maybe_add(super().content_add, "content", contents)
@@ -206,7 +215,7 @@
return self._maybe_add(super().snapshot_add, "snapshot", snapshots)
def _maybe_add(self, add_func, object_type, objects):
- # forbidden = [c.id for c in collections[object_type]]
+ self.add_calls[object_type] += 1
if any(c in self.forbidden for c in objects):
raise ValueError(
f"{object_type} is forbidden",
@@ -228,9 +237,18 @@
add_func = getattr(storage, f"{object_type}_add")
+ # Note: when checking the LimitedInMemoryStorage.add_calls counter, it's
+ # hard to guess the exact number of calls in the end (depends on the size
+ # of batch and the position of bad objects in this batch). So we will only
+ # check a lower limit of the form (n + m), where n is the minimum expected
+ # number of additions (due to the batch being split), and m accounts for
+ # bad objects being tried (individually) several (3) times before giving
+ # up. So for one bad object, m is 3; for 2 bad objects, m is 6, etc.
+
s = add_func(objects)
assert s.get(f"{object_type}:add", 0) == size
assert s.get(f"{object_type}:add:errors", 0) == 0
+ assert storage.add_calls[object_type] == (1 + 0)
in_memory.reset()
tenacious.reset()
@@ -238,6 +256,8 @@
s = add_func(objects + [bad1])
assert s.get(f"{object_type}:add", 0) == size
assert s.get(f"{object_type}:add:errors", 0) == 1
+
+ assert storage.add_calls[object_type] >= (2 + 3)
in_memory.reset()
tenacious.reset()
@@ -245,6 +265,7 @@
s = add_func(objects + [bad1, bad2])
assert s.get(f"{object_type}:add", 0) == size
assert s.get(f"{object_type}:add:errors", 0) == 2
+ assert storage.add_calls[object_type] >= (3 + 6)
in_memory.reset()
tenacious.reset()
@@ -252,6 +273,7 @@
s = add_func([bad1] + objects)
assert s.get(f"{object_type}:add", 0) == size
assert s.get(f"{object_type}:add:errors", 0) == 1
+ assert storage.add_calls[object_type] >= (2 + 3)
in_memory.reset()
tenacious.reset()
@@ -259,6 +281,7 @@
s = add_func([bad1, bad2] + objects)
assert s.get(f"{object_type}:add", 0) == size
assert s.get(f"{object_type}:add:errors", 0) == 2
+ assert storage.add_calls[object_type] >= (3 + 6)
in_memory.reset()
tenacious.reset()
@@ -266,6 +289,7 @@
s = add_func(objects[: size // 2] + [bad1] + objects[size // 2 :])
assert s.get(f"{object_type}:add", 0) == size
assert s.get(f"{object_type}:add:errors", 0) == 1
+ assert storage.add_calls[object_type] >= (3 + 3)
in_memory.reset()
tenacious.reset()
@@ -273,6 +297,7 @@
s = add_func(objects[: size // 2] + [bad1, bad2] + objects[size // 2 :])
assert s.get(f"{object_type}:add", 0) == size
assert s.get(f"{object_type}:add:errors", 0) == 2
+ assert storage.add_calls[object_type] >= (3 + 6)
in_memory.reset()
tenacious.reset()
@@ -286,6 +311,7 @@
)
assert s.get(f"{object_type}:add", 0) == size
assert s.get(f"{object_type}:add:errors", 0) == 2
+ assert storage.add_calls[object_type] >= (3 + 6)
in_memory.reset()
tenacious.reset()
@@ -293,6 +319,7 @@
s = add_func([bad1])
assert s.get(f"{object_type}:add", 0) == 0
assert s.get(f"{object_type}:add:errors", 0) == 1
+ assert storage.add_calls[object_type] == (0 + 3)
in_memory.reset()
tenacious.reset()
@@ -300,6 +327,7 @@
s = add_func([bad1, bad2])
assert s.get(f"{object_type}:add", 0) == 0
assert s.get(f"{object_type}:add:errors", 0) == 2
+ assert storage.add_calls[object_type] == (1 + 6)
in_memory.reset()
tenacious.reset()
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Thu, Jul 3, 1:43 PM (5 d, 17 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3219892
Attached To
D5680: Make the TenaciousProxyStorage retry when a single object add fails
Event Timeline
Log In to Comment