diff --git a/swh/storage/tenacious.py b/swh/storage/tenacious.py
--- a/swh/storage/tenacious.py
+++ b/swh/storage/tenacious.py
@@ -7,7 +7,7 @@
 from functools import partial
 import logging
 from typing import Counter as CounterT
-from typing import Deque, Dict, Iterable, List
+from typing import Deque, Dict, Iterable, List, Optional
 
 from swh.model.model import BaseModel
 from swh.storage import get_storage
@@ -47,6 +47,9 @@
     Also provides a error-rate limit feature: if more than n errors occurred during the
     insertion of the last p (window_size) objects, stop accepting any insertion.
 
+    The maximum number of insertion attempts for a single object can be set
+    via the 'retries' parameter (any value lower than 1 means one attempt).
+
     This proxy is mainly intended to be used in a replayer configuration (aka a
     mirror stack), where insertion errors are mostly unexpected (which explains
     the low default ratio errors/window_size).
@@ -84,7 +87,12 @@
         "origin_add",
     )
 
-    def __init__(self, storage, error_rate_limit=None):
+    def __init__(
+        self,
+        storage,
+        error_rate_limit: Optional[Dict[str, int]] = None,
+        retries: int = 3,
+    ):
         self.storage = get_storage(**storage)
         if error_rate_limit is None:
             error_rate_limit = {"errors": 10, "window_size": 1000}
@@ -93,6 +101,7 @@
         self.rate_queue = RateQueue(
             size=error_rate_limit["window_size"], max_errors=error_rate_limit["errors"],
         )
+        self._single_object_retries: int = retries
 
     def __getattr__(self, key):
         if key in self.tenacious_methods:
@@ -109,7 +118,10 @@
 
         # list of lists of objects; note this to_add list is consumed from the tail
         to_add: List[List[BaseModel]] = [list(objects)]
+        n_objs: int = len(to_add[0])
+
         results: CounterT[str] = Counter()
+        retries: int = self._single_object_retries
 
         while to_add:
             if self.rate_queue.limit_reached():
@@ -125,6 +137,8 @@
             try:
                 results.update(add_function(objs))
                 self.rate_queue.add_ok(len(objs))
+                # a successful insertion ends any pending retry sequence
+                retries = self._single_object_retries
             except Exception as exc:
                 if len(objs) > 1:
                     logger.info(
@@ -134,14 +148,28 @@
                     # reinsert objs split in 2 parts at the end of to_add
                     to_add.append(objs[(len(objs) // 2) :])
                     to_add.append(objs[: (len(objs) // 2)])
+                    # each time we append a batch in the to_add bag, reset the
+                    # one-object-batch retries counter
+                    retries = self._single_object_retries
                 else:
-                    logger.error(
-                        f"{func_name}: failed to insert an object, excluding {objs}"
-                    )
-                    # logger.error(f"Exception: {exc}")
-                    logger.exception(f"Exception was: {exc}")
-                    results.update({f"{object_type}:add:errors": 1})
-                    self.rate_queue.add_error()
+                    retries -= 1
+                    if retries > 0:  # guards against retries <= 0 looping forever
+                        logger.info(
+                            f"{func_name}: failed to insert an {object_type}, retrying"
+                        )
+                        # give it another chance
+                        to_add.append(objs)
+                    else:
+                        logger.error(
+                            f"{func_name}: failed to insert an object, "
+                            f"excluding {objs} (from a batch of {n_objs})"
+                        )
+                        logger.exception(f"Exception was: {exc}")
+                        results.update({f"{object_type}:add:errors": 1})
+                        self.rate_queue.add_error()
+                        # reset the retries counter (needed in case the next
+                        # batch is also 1 element only)
+                        retries = self._single_object_retries
         return dict(results)
 
     def reset(self):
diff --git a/swh/storage/tests/test_tenacious.py b/swh/storage/tests/test_tenacious.py
--- a/swh/storage/tests/test_tenacious.py
+++ b/swh/storage/tests/test_tenacious.py
@@ -3,6 +3,7 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+from collections import Counter
 from contextlib import contextmanager
 from unittest.mock import patch
 
@@ -182,6 +183,14 @@
     # forbidden are 'bad1' and 'bad2' arguments of `testdata`
     forbidden = [x[0][2] for x in testdata] + [x[0][3] for x in testdata]
 
+    def __init__(self, *args, **kw):
+        self.add_calls = Counter()
+        super().__init__(*args, **kw)
+
+    def reset(self):
+        super().reset()
+        self.add_calls.clear()
+
     def content_add(self, contents):
         return self._maybe_add(super().content_add, "content", contents)
 
@@ -206,7 +215,7 @@
         return self._maybe_add(super().snapshot_add, "snapshot", snapshots)
 
     def _maybe_add(self, add_func, object_type, objects):
-        # forbidden = [c.id for c in collections[object_type]]
+        self.add_calls[object_type] += 1
         if any(c in self.forbidden for c in objects):
             raise ValueError(
                 f"{object_type} is forbidden",
@@ -228,9 +237,18 @@
 
     add_func = getattr(storage, f"{object_type}_add")
 
+    # Note: when checking the LimitedInMemoryStorage.add_calls counter, it's
+    # hard to guess the exact number of calls in the end (depends on the size
+    # of the batch and the position of bad objects in this batch). So we will
+    # only check a lower bound of the form (n + m), where n is the minimum
+    # expected number of additions (due to the batch being split), and m
+    # accounts for bad objects being tried (individually) several (3) times
+    # before giving up. So for one bad object, m is 3; for 2 bad objects, m is 6.
+
     s = add_func(objects)
     assert s.get(f"{object_type}:add", 0) == size
     assert s.get(f"{object_type}:add:errors", 0) == 0
+    assert storage.add_calls[object_type] == (1 + 0)
     in_memory.reset()
     tenacious.reset()
 
@@ -238,6 +256,7 @@
     s = add_func(objects + [bad1])
     assert s.get(f"{object_type}:add", 0) == size
     assert s.get(f"{object_type}:add:errors", 0) == 1
+    assert storage.add_calls[object_type] >= (2 + 3)
     in_memory.reset()
     tenacious.reset()
 
@@ -245,6 +264,7 @@
     s = add_func(objects + [bad1, bad2])
     assert s.get(f"{object_type}:add", 0) == size
     assert s.get(f"{object_type}:add:errors", 0) == 2
+    assert storage.add_calls[object_type] >= (3 + 6)
     in_memory.reset()
     tenacious.reset()
 
@@ -252,6 +272,7 @@
     s = add_func([bad1] + objects)
     assert s.get(f"{object_type}:add", 0) == size
     assert s.get(f"{object_type}:add:errors", 0) == 1
+    assert storage.add_calls[object_type] >= (2 + 3)
     in_memory.reset()
     tenacious.reset()
 
@@ -259,6 +280,7 @@
     s = add_func([bad1, bad2] + objects)
     assert s.get(f"{object_type}:add", 0) == size
     assert s.get(f"{object_type}:add:errors", 0) == 2
+    assert storage.add_calls[object_type] >= (3 + 6)
     in_memory.reset()
     tenacious.reset()
 
@@ -266,6 +288,7 @@
     s = add_func(objects[: size // 2] + [bad1] + objects[size // 2 :])
     assert s.get(f"{object_type}:add", 0) == size
     assert s.get(f"{object_type}:add:errors", 0) == 1
+    assert storage.add_calls[object_type] >= (3 + 3)
     in_memory.reset()
     tenacious.reset()
 
@@ -273,6 +296,7 @@
     s = add_func(objects[: size // 2] + [bad1, bad2] + objects[size // 2 :])
     assert s.get(f"{object_type}:add", 0) == size
     assert s.get(f"{object_type}:add:errors", 0) == 2
+    assert storage.add_calls[object_type] >= (3 + 6)
     in_memory.reset()
     tenacious.reset()
 
@@ -286,6 +310,7 @@
     )
     assert s.get(f"{object_type}:add", 0) == size
     assert s.get(f"{object_type}:add:errors", 0) == 2
+    assert storage.add_calls[object_type] >= (3 + 6)
     in_memory.reset()
     tenacious.reset()
 
@@ -293,6 +318,7 @@
     s = add_func([bad1])
     assert s.get(f"{object_type}:add", 0) == 0
     assert s.get(f"{object_type}:add:errors", 0) == 1
+    assert storage.add_calls[object_type] == (0 + 3)
     in_memory.reset()
     tenacious.reset()
 
@@ -300,6 +326,7 @@
     s = add_func([bad1, bad2])
     assert s.get(f"{object_type}:add", 0) == 0
     assert s.get(f"{object_type}:add:errors", 0) == 2
+    assert storage.add_calls[object_type] == (1 + 6)
     in_memory.reset()
     tenacious.reset()
 