diff --git a/swh/storage/tenacious.py b/swh/storage/tenacious.py
index 59625295..78bc9bcd 100644
--- a/swh/storage/tenacious.py
+++ b/swh/storage/tenacious.py
@@ -1,148 +1,174 @@
 # Copyright (C) 2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from collections import Counter, deque
 from functools import partial
 import logging
 from typing import Counter as CounterT
-from typing import Deque, Dict, Iterable, List
+from typing import Deque, Dict, Iterable, List, Optional

 from swh.model.model import BaseModel
 from swh.storage import get_storage

 logger = logging.getLogger(__name__)


 class RateQueue:
     def __init__(self, size: int, max_errors: int):
         assert size > max_errors
         self._size = size
         self._max_errors = max_errors
         self._errors: Deque[bool] = deque(maxlen=size)

     def add_ok(self, n_ok: int = 1) -> None:
         self._errors.extend([False] * n_ok)

     def add_error(self, n_error: int = 1) -> None:
         self._errors.extend([True] * n_error)

     def limit_reached(self) -> bool:
         return sum(self._errors) > self._max_errors

     def reset(self):
         # mainly for testing purposes
         self._errors.clear()


 class TenaciousProxyStorage:
     """Storage proxy that has a tenacious insertion behavior.

     When an xxx_add method is called, it is first attempted as is against the
     backend storage. If a failure occurs, the list of objects is split into
     pieces until the erroneous objects have been identified, so that all
     valid objects are guaranteed to be inserted.

     Also provides an error-rate limit feature: if more than n errors occurred
     during the insertion of the last p (window_size) objects, stop accepting
     any insertion.

+    The number of insertion retries for a single object can be specified via
+    the 'retries' parameter.
+
     This proxy is mainly intended to be used in a replayer configuration (aka
     a mirror stack), where insertion errors are mostly unexpected (which
     explains the low default ratio of errors/window_size).

     Conversely, it should not be used in a loader configuration, as it may
     drop objects without stopping the loader, which leads to holes in the
     graph.

     Deployments using this proxy should carefully monitor their logs to check
     that any failure is expected (because the failed object is corrupted),
     not because of transient errors or issues with the storage backend.

     Sample configuration use case for tenacious storage:

     .. code-block:: yaml

       storage:
         cls: tenacious
         storage:
           cls: remote
           args: http://storage.internal.staging.swh.network:5002/
         error-rate-limit:
           errors: 10
           window_size: 1000

     """

     tenacious_methods = (
         "content_add",
         "skipped_content_add",
         "directory_add",
         "revision_add",
         "extid_add",
         "release_add",
         "snapshot_add",
         "origin_add",
     )

-    def __init__(self, storage, error_rate_limit=None):
+    def __init__(
+        self,
+        storage,
+        error_rate_limit: Optional[Dict[str, int]] = None,
+        retries: int = 3,
+    ):
         self.storage = get_storage(**storage)
         if error_rate_limit is None:
             error_rate_limit = {"errors": 10, "window_size": 1000}
         assert "errors" in error_rate_limit
         assert "window_size" in error_rate_limit
         self.rate_queue = RateQueue(
             size=error_rate_limit["window_size"],
             max_errors=error_rate_limit["errors"],
         )
+        self._single_object_retries: int = retries

     def __getattr__(self, key):
         if key in self.tenacious_methods:
             return partial(self._tenacious_add, key)
         return getattr(self.storage, key)

     def _tenacious_add(self, func_name, objects: Iterable[BaseModel]) -> Dict[str, int]:
         """Try to add the objects to the storage, splitting failing batches
         to isolate erroneous objects; abort if the error-rate limit is
         reached.

         """
         add_function = getattr(self.storage, func_name)
         object_type = func_name[:-4]  # remove the _add suffix

         # list of lists of objects; note this to_add list is consumed from the tail
         to_add: List[List[BaseModel]] = [list(objects)]
+        n_objs: int = len(to_add[0])

         results: CounterT[str] = Counter()
+        retries: int = self._single_object_retries

         while to_add:
             if self.rate_queue.limit_reached():
                 logging.error(
                     "Too many insertion errors have been detected; "
                     "disabling insertions"
                 )
                 raise RuntimeError(
                     "Too many insertion errors have been detected; "
                     "disabling insertions"
                 )
             objs = to_add.pop()
             try:
                 results.update(add_function(objs))
                 self.rate_queue.add_ok(len(objs))
             except Exception as exc:
                 if len(objs) > 1:
                     logger.info(
                         f"{func_name}: failed to insert a batch of "
                         f"{len(objs)} {object_type} objects, splitting"
                     )
                     # reinsert objs split in 2 parts at the end of to_add
                     to_add.append(objs[(len(objs) // 2) :])
                     to_add.append(objs[: (len(objs) // 2)])
+                    # each time we append a batch to the to_add bag, reset the
+                    # one-object-batch retries counter
+                    retries = self._single_object_retries
                 else:
-                    logger.error(
-                        f"{func_name}: failed to insert an object, excluding {objs}"
-                    )
-                    # logger.error(f"Exception: {exc}")
-                    logger.exception(f"Exception was: {exc}")
-                    results.update({f"{object_type}:add:errors": 1})
-                    self.rate_queue.add_error()
+                    retries -= 1
+                    if retries:
+                        logger.info(
+                            f"{func_name}: failed to insert an {object_type}, retrying"
+                        )
+                        # give it another chance
+                        to_add.append(objs)
+                    else:
+                        logger.error(
+                            f"{func_name}: failed to insert an object, "
+                            f"excluding {objs} (from a batch of {n_objs})"
+                        )
+                        logger.exception(f"Exception was: {exc}")
+                        results.update({f"{object_type}:add:errors": 1})
+                        self.rate_queue.add_error()
+                        # reset the retries counter (needed in case the next
+                        # batch is also 1 element only)
+                        retries = self._single_object_retries

         return dict(results)

     def reset(self):
         self.rate_queue.reset()
diff --git a/swh/storage/tests/test_tenacious.py b/swh/storage/tests/test_tenacious.py
index e85ec2b3..960eaa78 100644
--- a/swh/storage/tests/test_tenacious.py
+++ b/swh/storage/tests/test_tenacious.py
@@ -1,366 +1,394 @@
 # Copyright (C) 2020-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

+from collections import Counter
 from contextlib import contextmanager
 from unittest.mock import patch

 import attr
 import pytest

 from swh.model import model
 from swh.model.tests.swh_model_data import TEST_OBJECTS
 from swh.storage import get_storage
 from swh.storage.in_memory import InMemoryStorage
 from swh.storage.tenacious import TenaciousProxyStorage
 from swh.storage.tests.storage_data import StorageData
 from swh.storage.tests.storage_tests import (
     TestStorageGeneratedData as _TestStorageGeneratedData,
 )
 from swh.storage.tests.storage_tests import TestStorage as _TestStorage  # noqa

 data = StorageData()
 collections = {
     "origin": data.origins,
     "content": data.contents,
     "skipped_content": data.skipped_contents,
     "revision": data.revisions,
     "directory": data.directories,
     "release": data.releases,
     "snapshot": data.snapshots,
 }

 # generic storage tests (using imported TestStorage* classes)


 @pytest.fixture
 def swh_storage_backend_config2():
     yield {
         "cls": "memory",
         "journal_writer": {"cls": "memory",},
     }


 @pytest.fixture
 def swh_storage():
     storage_config = {
         "cls": "pipeline",
         "steps": [
             {"cls": "tenacious"},
             {"cls": "memory", "journal_writer": {"cls": "memory",}},
         ],
     }

     storage = get_storage(**storage_config)
     storage.journal_writer = storage.storage.journal_writer
     return storage


 class TestTenaciousStorage(_TestStorage):
     @pytest.mark.skip(
         'The "person" table of the pgsql is a legacy thing, and not '
         "supported by the cassandra/in-memory backend."
     )
     def test_person_fullname_unicity(self):
         pass

     @pytest.mark.skip(reason="No collision with the tenacious storage")
     def test_content_add_collision(self, swh_storage, sample_data):
         pass

     @pytest.mark.skip("content_update is not implemented")
     def test_content_update(self):
         pass

     @pytest.mark.skip("Not supported by Cassandra/InMemory storage")
     def test_origin_count(self):
         pass


 class TestTenaciousStorageGeneratedData(_TestStorageGeneratedData):
     @pytest.mark.skip("Not supported by Cassandra/InMemory")
     def test_origin_count(self):
         pass

     @pytest.mark.skip("Not supported by Cassandra/InMemory")
     def test_origin_count_with_visit_no_visits(self):
         pass

     @pytest.mark.skip("Not supported by Cassandra/InMemory")
     def test_origin_count_with_visit_with_visits_and_snapshot(self):
         pass

     @pytest.mark.skip("Not supported by Cassandra/InMemory")
     def test_origin_count_with_visit_with_visits_no_snapshot(self):
         pass


 # specific tests for the tenacious behavior


 def get_tenacious_storage(**config):
     storage_config = {
         "cls": "pipeline",
         "steps": [
             {"cls": "validate"},
             {"cls": "tenacious", **config},
             {"cls": "memory"},
         ],
     }

     return get_storage(**storage_config)


 @contextmanager
 def disabled_validators():
     attr.set_run_validators(False)
     yield
     attr.set_run_validators(True)


 def popid(d):
     d.pop("id")
     return d


 testdata = [
     pytest.param(
         "content",
         list(TEST_OBJECTS["content"]),
         attr.evolve(model.Content.from_data(data=b"too big"), length=1000),
         attr.evolve(model.Content.from_data(data=b"to fail"), length=1000),
         id="content",
     ),
     pytest.param(
         "skipped_content",
         list(TEST_OBJECTS["skipped_content"]),
         attr.evolve(
             model.SkippedContent.from_data(data=b"too big", reason="too big"),
             length=1000,
         ),
         attr.evolve(
             model.SkippedContent.from_data(data=b"to fail", reason="to fail"),
             length=1000,
         ),
         id="skipped_content",
     ),
     pytest.param(
         "directory",
         list(TEST_OBJECTS["directory"]),
         data.directory,
         data.directory2,
         id="directory",
     ),
     pytest.param(
         "revision",
         list(TEST_OBJECTS["revision"]),
         data.revision,
         data.revision2,
         id="revision",
     ),
     pytest.param(
         "release",
         list(TEST_OBJECTS["release"]),
         data.release,
         data.release2,
         id="release",
     ),
     pytest.param(
         "snapshot",
         list(TEST_OBJECTS["snapshot"]),
         data.snapshot,
         data.complete_snapshot,
         id="snapshot",
     ),
     pytest.param(
         "origin",
         list(TEST_OBJECTS["origin"]),
         data.origin,
         data.origin2,
         id="origin",
     ),
 ]


 class LimitedInMemoryStorage(InMemoryStorage):
     # the forbidden objects are the 'bad1' and 'bad2' arguments of `testdata`
     forbidden = [x[0][2] for x in testdata] + [x[0][3] for x in testdata]

+    def __init__(self, *args, **kw):
+        self.add_calls = Counter()
+        super().__init__(*args, **kw)
+
+    def reset(self):
+        super().reset()
+        self.add_calls.clear()
+
     def content_add(self, contents):
         return self._maybe_add(super().content_add, "content", contents)

     def skipped_content_add(self, skipped_contents):
         return self._maybe_add(
             super().skipped_content_add, "skipped_content", skipped_contents
         )

     def origin_add(self, origins):
         return self._maybe_add(super().origin_add, "origin", origins)

     def directory_add(self, directories):
         return self._maybe_add(super().directory_add, "directory", directories)

     def revision_add(self, revisions):
         return self._maybe_add(super().revision_add, "revision", revisions)

     def release_add(self, releases):
         return self._maybe_add(super().release_add, "release", releases)

     def snapshot_add(self, snapshots):
         return self._maybe_add(super().snapshot_add, "snapshot", snapshots)

     def _maybe_add(self, add_func, object_type, objects):
-        # forbidden = [c.id for c in collections[object_type]]
+        self.add_calls[object_type] += 1
         if any(c in self.forbidden for c in objects):
             raise ValueError(
                 f"{object_type} is forbidden",
                 [c.unique_key() for c in objects if c in self.forbidden],
             )
         return add_func(objects)


 @patch("swh.storage.in_memory.InMemoryStorage", LimitedInMemoryStorage)
 @pytest.mark.parametrize("object_type, objects, bad1, bad2", testdata)
 def test_tenacious_proxy_storage(object_type, objects, bad1, bad2):
     storage = get_tenacious_storage()
     tenacious = storage.storage
     in_memory = tenacious.storage
     assert isinstance(tenacious, TenaciousProxyStorage)
     assert isinstance(in_memory, LimitedInMemoryStorage)

     size = len(objects)

     add_func = getattr(storage, f"{object_type}_add")

+    # Note: when checking the LimitedInMemoryStorage.add_calls counter, it is
+    # hard to guess the exact number of calls in the end (it depends on the
+    # batch size and the position of the bad objects in the batch), so we only
+    # check a lower bound of the form (n + m), where n is the minimum expected
+    # number of additions (due to the batch being split), and m accounts for
+    # the fact that bad objects are tried (individually) several (3) times
+    # before giving up. So for one bad object, m is 3; for 2 bad objects, m is
+    # 6, etc.
+
     s = add_func(objects)
     assert s.get(f"{object_type}:add", 0) == size
     assert s.get(f"{object_type}:add:errors", 0) == 0
+    assert storage.add_calls[object_type] == (1 + 0)
     in_memory.reset()
     tenacious.reset()

     # bad1 is the last element
     s = add_func(objects + [bad1])
     assert s.get(f"{object_type}:add", 0) == size
     assert s.get(f"{object_type}:add:errors", 0) == 1
+    assert storage.add_calls[object_type] >= (2 + 3)
     in_memory.reset()
     tenacious.reset()

     # bad1 and bad2 are the last elements
     s = add_func(objects + [bad1, bad2])
     assert s.get(f"{object_type}:add", 0) == size
     assert s.get(f"{object_type}:add:errors", 0) == 2
+    assert storage.add_calls[object_type] >= (3 + 6)
     in_memory.reset()
     tenacious.reset()

     # bad1 is the first element
     s = add_func([bad1] + objects)
     assert s.get(f"{object_type}:add", 0) == size
     assert s.get(f"{object_type}:add:errors", 0) == 1
+    assert storage.add_calls[object_type] >= (2 + 3)
     in_memory.reset()
     tenacious.reset()

     # bad1 and bad2 are the first elements
     s = add_func([bad1, bad2] + objects)
     assert s.get(f"{object_type}:add", 0) == size
     assert s.get(f"{object_type}:add:errors", 0) == 2
+    assert storage.add_calls[object_type] >= (3 + 6)
     in_memory.reset()
     tenacious.reset()

     # bad1 is in the middle of the list of inserted elements
     s = add_func(objects[: size // 2] + [bad1] + objects[size // 2 :])
     assert s.get(f"{object_type}:add", 0) == size
     assert s.get(f"{object_type}:add:errors", 0) == 1
+    assert storage.add_calls[object_type] >= (3 + 3)
     in_memory.reset()
     tenacious.reset()

     # bad1 and bad2 are together in the middle of the list of inserted elements
     s = add_func(objects[: size // 2] + [bad1, bad2] + objects[size // 2 :])
     assert s.get(f"{object_type}:add", 0) == size
     assert s.get(f"{object_type}:add:errors", 0) == 2
+    assert storage.add_calls[object_type] >= (3 + 6)
     in_memory.reset()
     tenacious.reset()

     # bad1 and bad2 are spread in the middle of the list of inserted elements
     s = add_func(
         objects[: size // 3]
         + [bad1]
         + objects[size // 3 : 2 * (size // 3)]
         + [bad2]
         + objects[2 * (size // 3) :]
     )
     assert s.get(f"{object_type}:add", 0) == size
     assert s.get(f"{object_type}:add:errors", 0) == 2
+    assert storage.add_calls[object_type] >= (3 + 6)
     in_memory.reset()
     tenacious.reset()

     # bad1 is the only element
     s = add_func([bad1])
     assert s.get(f"{object_type}:add", 0) == 0
     assert s.get(f"{object_type}:add:errors", 0) == 1
+    assert storage.add_calls[object_type] == (0 + 3)
     in_memory.reset()
     tenacious.reset()

     # bad1 and bad2 are the only elements
     s = add_func([bad1, bad2])
     assert s.get(f"{object_type}:add", 0) == 0
     assert s.get(f"{object_type}:add:errors", 0) == 2
+    assert storage.add_calls[object_type] == (1 + 6)
     in_memory.reset()
     tenacious.reset()


 @patch("swh.storage.in_memory.InMemoryStorage", LimitedInMemoryStorage)
 @pytest.mark.parametrize("object_type, objects, bad1, bad2", testdata)
 def test_tenacious_proxy_storage_rate_limit(object_type, objects, bad1, bad2):
     storage = get_tenacious_storage(error_rate_limit={"errors": 1, "window_size": 3})
     tenacious = storage.storage
     in_memory = tenacious.storage
     assert isinstance(tenacious, TenaciousProxyStorage)
     assert isinstance(in_memory, LimitedInMemoryStorage)

     size = len(objects)

     add_func = getattr(storage, f"{object_type}_add")

     # with no insertion failure, no impact
     s = add_func(objects)
     assert s.get(f"{object_type}:add", 0) == size
     assert s.get(f"{object_type}:add:errors", 0) == 0
     in_memory.reset()
     tenacious.reset()

     # with one insertion failure, no impact
     s = add_func([bad1] + objects)
     assert s.get(f"{object_type}:add", 0) == size
s.get(f"{object_type}:add:errors", 0) == 1 in_memory.reset() tenacious.reset() s = add_func(objects[: size // 2] + [bad1] + objects[size // 2 :]) assert s.get(f"{object_type}:add", 0) == size assert s.get(f"{object_type}:add:errors", 0) == 1 in_memory.reset() tenacious.reset() # with two consecutive insertion failures, exception is raised with pytest.raises(RuntimeError, match="Too many insertion errors"): add_func([bad1, bad2] + objects) in_memory.reset() tenacious.reset() if size > 2: # with two consecutive insertion failures, exception is raised # (errors not at the beginning) with pytest.raises(RuntimeError, match="Too many insertion errors"): add_func(objects[: size // 2] + [bad1, bad2] + objects[size // 2 :]) in_memory.reset() tenacious.reset() # with two non-consecutive insertion failures, no impact # (errors are far enough to not reach the rate limit) s = add_func( objects[: size // 3] + [bad1] + objects[size // 3 : 2 * (size // 3)] + [bad2] + objects[2 * (size // 3) :] ) assert s.get(f"{object_type}:add", 0) == size assert s.get(f"{object_type}:add:errors", 0) == 2 in_memory.reset() tenacious.reset()