diff --git a/swh/storage/__init__.py b/swh/storage/__init__.py
--- a/swh/storage/__init__.py
+++ b/swh/storage/__init__.py
@@ -20,6 +20,7 @@
     "retry": ".retry.RetryingProxyStorage",
     "cassandra": ".cassandra.CassandraStorage",
     "validate": ".validate.ValidatingProxyStorage",
+    "tenacious": ".tenacious.TenaciousProxyStorage",
 }
 
 
diff --git a/swh/storage/tenacious.py b/swh/storage/tenacious.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/tenacious.py
@@ -0,0 +1,185 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from collections import deque
+from functools import partial
+import logging
+from typing import Deque, Dict, Iterable, List
+
+from swh.model.model import BaseModel
+from swh.storage import get_storage
+
+logger = logging.getLogger(__name__)
+
+
+class RateQueue:
+    """Sliding window over the outcome of the last ``size`` insertions.
+
+    Each outcome is stored as a boolean (True for an error, False for a success);
+    only the ``size`` most recent outcomes are kept.
+
+    Args:
+        size: number of most recent outcomes to remember
+        max_errors: number of errors tolerated among the remembered outcomes;
+            must be strictly lower than ``size``
+    """
+
+    def __init__(self, size: int, max_errors: int):
+        assert size > max_errors, "size must be greater than max_errors"
+        self._size = size
+        self._max_errors = max_errors
+        # bounded deque: appending beyond ``size`` drops the oldest outcomes
+        self._errors: Deque[bool] = deque(maxlen=size)
+
+    def add_ok(self, n_ok: int = 1) -> None:
+        """Record ``n_ok`` successful insertions."""
+        self._errors.extend([False] * n_ok)
+
+    def add_error(self, n_error: int = 1) -> None:
+        """Record ``n_error`` failed insertions."""
+        self._errors.extend([True] * n_error)
+
+    def limit_reached(self) -> bool:
+        """Tell whether more than ``max_errors`` errors are remembered."""
+        return sum(self._errors) > self._max_errors
+
+    def reset(self) -> None:
+        """Forget all the recorded outcomes (mainly for testing purpose)."""
+        self._errors.clear()
+
+
+class TenaciousProxyStorage:
+    """Storage proxy that has a tenacious insertion behavior.
+
+    When an xxx_add method is called, it's first attempted as is against the backend
+    storage. If a failure occurs, split the list of inserted objects in pieces until
+    erroneous objects have been identified, so all the valid objects are guaranteed to
+    be inserted.
+
+    Also provides an error-rate limit feature: if more than n errors occurred during
+    the insertion of the last p objects, stop accepting any insertion.
+
+    Sample configuration use case for tenacious storage:
+
+    .. code-block:: yaml
+
+        storage:
+          cls: tenacious
+          args:
+            storage:
+              cls: remote
+              args: http://storage.internal.staging.swh.network:5002/
+            error_rate_limit:
+              errors: 10
+              total: 1000
+    """
+
+    tenacious_methods = (
+        "content_add",
+        "skipped_content_add",
+        "directory_add",
+        "revision_add",
+        "extid_add",
+        "release_add",
+        "snapshot_add",
+        "origin_add",
+    )
+
+    def __init__(self, storage, error_rate_limit=None):
+        """Instantiate the proxied storage and the error-rate window.
+
+        Args:
+            storage: configuration of the proxied storage, passed as keyword
+                arguments to ``get_storage``
+            error_rate_limit: dict with an ``errors`` key (number of errors
+                tolerated) and a ``total`` key (number of most recent insertions
+                taken into account); defaults to 10 errors out of 1000 insertions
+        """
+        self.storage = get_storage(**storage)
+        if error_rate_limit is None:
+            error_rate_limit = {"errors": 10, "total": 1000}
+        assert "errors" in error_rate_limit
+        assert "total" in error_rate_limit
+        self.rate_queue = RateQueue(
+            size=error_rate_limit["total"], max_errors=error_rate_limit["errors"],
+        )
+
+    def __getattr__(self, key):
+        """Wrap the tenacious methods; proxy any other attribute as is."""
+        if key == "storage":
+            # only reached when ``storage`` is not set on the instance; raise
+            # rather than recursing endlessly through getattr(self.storage, ...)
+            raise AttributeError(key)
+        if key in self.tenacious_methods:
+            return partial(self.tenacious_add, key)
+        return getattr(self.storage, key)
+
+    def tenacious_add(self, func_name, objects: Iterable[BaseModel]) -> Dict[str, int]:
+        """Insert objects in the proxied storage, isolating the faulty ones.
+
+        The whole batch is first sent as is to the ``func_name`` method of the
+        proxied storage. A failing batch is split in 2 parts which are retried in
+        order, until each failing object is alone in its batch; such an object is
+        excluded and counted as an error. As soon as the error-rate limit is
+        reached, the loop stops and the batches left to insert are dropped.
+
+        Args:
+            func_name: name of the xxx_add method of the proxied storage to call
+            objects: objects to insert
+
+        Returns:
+            the accumulated summaries returned by the proxied storage, plus an
+            ``<object_type>:add:errors`` entry counting the excluded objects (if
+            any)
+        """
+        add_function = getattr(self.storage, func_name)
+        object_type = func_name[:-4]  # remove the _add suffix
+        # batches left to insert, in insertion order
+        to_add: Deque[List[BaseModel]] = deque([list(objects)])
+        results: Dict[str, int] = {}
+
+        while to_add:
+            if self.rate_queue.limit_reached():
+                logger.error(
+                    "Too many insertion errors have been detected; "
+                    "disabling insertions"
+                )
+                break
+            objs = to_add.popleft()
+            try:
+                accumulate(results, add_function(objs))
+                self.rate_queue.add_ok(len(objs))
+            except Exception as exc:
+                if len(objs) > 1:
+                    logger.info(
+                        f"{func_name}: failed to insert a batch of "
+                        f"{len(objs)} {object_type} objects, splitting"
+                    )
+                    # reinsert objs split in 2 parts at the beginning of to_add
+                    # (second part first, so that the first part ends up in front)
+                    half = len(objs) // 2
+                    to_add.appendleft(objs[half:])
+                    to_add.appendleft(objs[:half])
+                else:
+                    logger.error(
+                        f"{func_name}: failed to insert an object, excluding {objs}"
+                    )
+                    logger.exception(f"Exception was: {exc}")
+                    accumulate(results, {f"{object_type}:add:errors": 1})
+                    self.rate_queue.add_error()
+        return results
+
+    def reset(self):
+        """Reset the error-rate window (mainly for testing purpose)."""
+        self.rate_queue.reset()
+
+
+def accumulate(d1, d2):
+    """Add each value of ``d2`` to the value of the same key in ``d1``, in place.
+
+    Keys missing from ``d1`` are considered as 0.
+    """
+    for k, v in d2.items():
+        d1[k] = d1.get(k, 0) + v
diff --git a/swh/storage/tests/test_tenacious.py b/swh/storage/tests/test_tenacious.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/tests/test_tenacious.py
@@ -0,0 +1,397 @@
+# Copyright (C) 2020-2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from contextlib import contextmanager
+from unittest.mock import patch
+
+import attr
+import pytest
+
+from swh.model import model
+from swh.model.tests.swh_model_data import TEST_OBJECTS
+from swh.storage import get_storage
+from swh.storage.in_memory import InMemoryStorage
+from swh.storage.tenacious import TenaciousProxyStorage
+from swh.storage.tests.storage_data import StorageData
+from swh.storage.tests.storage_tests import (
+    TestStorageGeneratedData as _TestStorageGeneratedData,
+)
+from swh.storage.tests.storage_tests import TestStorage as _TestStorage  # noqa
+
+data = StorageData()
+collections = {
+    "origin": data.origins,
+    "content": data.contents,
+    "skipped_content": data.skipped_contents,
+    "revision": data.revisions,
+    "directory": data.directories,
+    "release": data.releases,
+    "snapshot": data.snapshots,
+}
+# generic storage tests (using imported TestStorage* classes)
+
+
+@pytest.fixture
+def swh_storage_backend_config2():
+    """Yield the configuration of a memory storage with a memory journal writer."""
+    yield {
+        "cls": "memory",
+        "journal_writer": {"cls": "memory",},
+    }
+
+
+@pytest.fixture
+def swh_storage():
+    """Return a tenacious proxy on top of a memory storage.
+
+    The journal writer of the memory storage is exposed on the returned storage.
+    """
+    storage_config = {
+        "cls": "pipeline",
+        "steps": [
+            {"cls": "tenacious"},
+            {"cls": "memory", "journal_writer": {"cls": "memory",}},
+        ],
+    }
+
+    storage = get_storage(**storage_config)
+    storage.journal_writer = storage.storage.journal_writer
+    return storage
+
+
+class TestTenaciousStorage(_TestStorage):
+    """Generic storage tests, minus the ones skipped below."""
+
+    @pytest.mark.skip(
+        'The "person" table of the pgsql is a legacy thing, and not '
+        "supported by the cassandra/in-memory backend."
+    )
+    def test_person_fullname_unicity(self):
+        pass
+
+    @pytest.mark.skip(reason="No collision with the tenacious storage")
+    def test_content_add_collision(self, swh_storage, sample_data):
+        pass
+
+    @pytest.mark.skip("content_update is not implemented")
+    def test_content_update(self):
+        pass
+
+    @pytest.mark.skip("Not supported by Cassandra/InMemory storage")
+    def test_origin_count(self):
+        pass
+
+
+class TestTenaciousStorageGeneratedData(_TestStorageGeneratedData):
+    """Generic generated-data storage tests, minus the ones skipped below."""
+
+    @pytest.mark.skip("Not supported by Cassandra/InMemory")
+    def test_origin_count(self):
+        pass
+
+    @pytest.mark.skip("Not supported by Cassandra/InMemory")
+    def test_origin_count_with_visit_no_visits(self):
+        pass
+
+    @pytest.mark.skip("Not supported by Cassandra/InMemory")
+    def test_origin_count_with_visit_with_visits_and_snapshot(self):
+        pass
+
+    @pytest.mark.skip("Not supported by Cassandra/InMemory")
+    def test_origin_count_with_visit_with_visits_no_snapshot(self):
+        pass
+
+
+# specific tests for the tenacious behavior
+
+
+def get_tenacious_storage(**config):
+    """Return a validate/tenacious/memory pipeline storage.
+
+    Args:
+        config: extra configuration of the tenacious step
+    """
+    storage_config = {
+        "cls": "pipeline",
+        "steps": [
+            {"cls": "validate"},
+            {"cls": "tenacious", **config},
+            {"cls": "memory"},
+        ],
+    }
+
+    return get_storage(**storage_config)
+
+
+@contextmanager
+def disabled_validators():
+    """Disable the attrs validators while the context is active."""
+    attr.set_run_validators(False)
+    try:
+        yield
+    finally:
+        # re-enable the validators even if the body of the context failed
+        attr.set_run_validators(True)
+
+
+def popid(d):
+    """Remove the "id" key from d, in place, and return d."""
+    d.pop("id")
+    return d
+
+
+# each parameter set is (object_type, objects, bad1, bad2), where bad1 and bad2
+# are the objects rejected by LimitedInMemoryStorage
+testdata = [
+    pytest.param(
+        "content",
+        list(TEST_OBJECTS["content"]),
+        attr.evolve(model.Content.from_data(data=b"too big"), length=1000),
+        attr.evolve(model.Content.from_data(data=b"to fail"), length=1000),
+        id="content",
+    ),
+    pytest.param(
+        "skipped_content",
+        list(TEST_OBJECTS["skipped_content"]),
+        attr.evolve(
+            model.SkippedContent.from_data(data=b"too big", reason="too big"),
+            length=1000,
+        ),
+        attr.evolve(
+            model.SkippedContent.from_data(data=b"to fail", reason="to fail"),
+            length=1000,
+        ),
+        id="skipped_content",
+    ),
+    pytest.param(
+        "directory",
+        list(TEST_OBJECTS["directory"]),
+        data.directory,
+        data.directory2,
+        id="directory",
+    ),
+    pytest.param(
+        "revision",
+        list(TEST_OBJECTS["revision"]),
+        data.revision,
+        data.revision2,
+        id="revision",
+    ),
+    pytest.param(
+        "release",
+        list(TEST_OBJECTS["release"]),
+        data.release,
+        data.release2,
+        id="release",
+    ),
+    pytest.param(
+        "snapshot",
+        list(TEST_OBJECTS["snapshot"]),
+        data.snapshot,
+        data.complete_snapshot,
+        id="snapshot",
+    ),
+    pytest.param(
+        "origin", list(TEST_OBJECTS["origin"]), data.origin, data.origin2, id="origin",
+    ),
+]
+
+
+class LimitedInMemoryStorage(InMemoryStorage):
+    """In-memory storage refusing to insert the bad objects of `testdata`."""
+
+    # forbidden are 'bad1' and 'bad2' arguments of `testdata`
+    forbidden = [p.values[2] for p in testdata] + [p.values[3] for p in testdata]
+
+    def content_add(self, contents):
+        return self._maybe_add(super().content_add, "content", contents)
+
+    def skipped_content_add(self, skipped_contents):
+        return self._maybe_add(
+            super().skipped_content_add, "skipped_content", skipped_contents
+        )
+
+    def origin_add(self, origins):
+        return self._maybe_add(super().origin_add, "origin", origins)
+
+    def directory_add(self, directories):
+        return self._maybe_add(super().directory_add, "directory", directories)
+
+    def revision_add(self, revisions):
+        return self._maybe_add(super().revision_add, "revision", revisions)
+
+    def release_add(self, releases):
+        return self._maybe_add(super().release_add, "release", releases)
+
+    def snapshot_add(self, snapshots):
+        return self._maybe_add(super().snapshot_add, "snapshot", snapshots)
+
+    def _maybe_add(self, add_func, object_type, objects):
+        """Call add_func(objects), unless a forbidden object is in objects.
+
+        Raises:
+            ValueError: if at least one of the objects is forbidden
+        """
+        if any(c in self.forbidden for c in objects):
+            raise ValueError(
+                f"{object_type} is forbidden",
+                [c.id for c in objects if c in self.forbidden],
+            )
+        return add_func(objects)
+
+
+@patch("swh.storage.in_memory.InMemoryStorage", LimitedInMemoryStorage)
+@pytest.mark.parametrize("object_type, objects, bad1, bad2", testdata)
+def test_tenacious_proxy_storage(object_type, objects, bad1, bad2):
+    """Valid objects are inserted wherever the bad ones are in the batch."""
+    storage = get_tenacious_storage()
+    tenacious = storage.storage
+    in_memory = tenacious.storage
+    assert isinstance(tenacious, TenaciousProxyStorage)
+    assert isinstance(in_memory, LimitedInMemoryStorage)
+
+    size = len(objects)
+
+    add_func = getattr(storage, f"{object_type}_add")
+
+    s = add_func(objects)
+    assert s.get(f"{object_type}:add", 0) == size
+    assert s.get(f"{object_type}:add:errors", 0) == 0
+    in_memory.reset()
+    tenacious.reset()
+
+    # bad1 is the last element
+    s = add_func(objects + [bad1])
+    assert s.get(f"{object_type}:add", 0) == size
+    assert s.get(f"{object_type}:add:errors", 0) == 1
+    in_memory.reset()
+    tenacious.reset()
+
+    # bad1 and bad2 are the last elements
+    s = add_func(objects + [bad1, bad2])
+    assert s.get(f"{object_type}:add", 0) == size
+    assert s.get(f"{object_type}:add:errors", 0) == 2
+    in_memory.reset()
+    tenacious.reset()
+
+    # bad1 is the first element
+    s = add_func([bad1] + objects)
+    assert s.get(f"{object_type}:add", 0) == size
+    assert s.get(f"{object_type}:add:errors", 0) == 1
+    in_memory.reset()
+    tenacious.reset()
+
+    # bad1 and bad2 are the first elements
+    s = add_func([bad1, bad2] + objects)
+    assert s.get(f"{object_type}:add", 0) == size
+    assert s.get(f"{object_type}:add:errors", 0) == 2
+    in_memory.reset()
+    tenacious.reset()
+
+    # bad1 is in the middle of the list of inserted elements
+    s = add_func(objects[: size // 2] + [bad1] + objects[size // 2 :])
+    assert s.get(f"{object_type}:add", 0) == size
+    assert s.get(f"{object_type}:add:errors", 0) == 1
+    in_memory.reset()
+    tenacious.reset()
+
+    # bad1 and bad2 are together in the middle of the list of inserted elements
+    s = add_func(objects[: size // 2] + [bad1, bad2] + objects[size // 2 :])
+    assert s.get(f"{object_type}:add", 0) == size
+    assert s.get(f"{object_type}:add:errors", 0) == 2
+    in_memory.reset()
+    tenacious.reset()
+
+    # bad1 and bad2 are spread in the middle of the list of inserted elements
+    s = add_func(
+        objects[: size // 3]
+        + [bad1]
+        + objects[size // 3 : 2 * (size // 3)]
+        + [bad2]
+        + objects[2 * (size // 3) :]
+    )
+    assert s.get(f"{object_type}:add", 0) == size
+    assert s.get(f"{object_type}:add:errors", 0) == 2
+    in_memory.reset()
+    tenacious.reset()
+
+    # bad1 is the only element
+    s = add_func([bad1])
+    assert s.get(f"{object_type}:add", 0) == 0
+    assert s.get(f"{object_type}:add:errors", 0) == 1
+    in_memory.reset()
+    tenacious.reset()
+
+    # bad1 and bad2 are the only elements
+    s = add_func([bad1, bad2])
+    assert s.get(f"{object_type}:add", 0) == 0
+    assert s.get(f"{object_type}:add:errors", 0) == 2
+    in_memory.reset()
+    tenacious.reset()
+
+
+@patch("swh.storage.in_memory.InMemoryStorage", LimitedInMemoryStorage)
+@pytest.mark.parametrize("object_type, objects, bad1, bad2", testdata)
+def test_tenacious_proxy_storage_rate_limit(object_type, objects, bad1, bad2):
+    """Insertions stop as soon as the error-rate limit is reached."""
+    storage = get_tenacious_storage(error_rate_limit={"errors": 1, "total": 3})
+    tenacious = storage.storage
+    in_memory = tenacious.storage
+    assert isinstance(tenacious, TenaciousProxyStorage)
+    assert isinstance(in_memory, LimitedInMemoryStorage)
+
+    size = len(objects)
+
+    add_func = getattr(storage, f"{object_type}_add")
+
+    # with no insertion failure, no impact
+    s = add_func(objects)
+    assert s.get(f"{object_type}:add", 0) == size
+    assert s.get(f"{object_type}:add:errors", 0) == 0
+    in_memory.reset()
+    tenacious.reset()
+
+    # with one insertion failure, no impact
+    s = add_func([bad1] + objects)
+    assert s.get(f"{object_type}:add", 0) == size
+    assert s.get(f"{object_type}:add:errors", 0) == 1
+    in_memory.reset()
+    tenacious.reset()
+
+    s = add_func(objects[: size // 2] + [bad1] + objects[size // 2 :])
+    assert s.get(f"{object_type}:add", 0) == size
+    assert s.get(f"{object_type}:add:errors", 0) == 1
+    in_memory.reset()
+    tenacious.reset()
+
+    # with two consecutive insertion failures, no further insertions
+    s = add_func([bad1, bad2] + objects)
+    assert s.get(f"{object_type}:add", 0) == 0
+    assert s.get(f"{object_type}:add:errors", 0) == 2
+    in_memory.reset()
+    tenacious.reset()
+
+    if size > 2:
+        # with two consecutive insertion failures, no further insertions
+        # (errors not at the beginning)
+        s = add_func(objects[: size // 2] + [bad1, bad2] + objects[size // 2 :])
+        assert s.get(f"{object_type}:add", 0) == size // 2
+        assert s.get(f"{object_type}:add:errors", 0) == 2
+        in_memory.reset()
+        tenacious.reset()
+
+        # with two non-consecutive insertion failures, no impact
+        # (errors are far enough to not reach the rate limit)
+        s = add_func(
+            objects[: size // 3]
+            + [bad1]
+            + objects[size // 3 : 2 * (size // 3)]
+            + [bad2]
+            + objects[2 * (size // 3) :]
+        )
+        assert s.get(f"{object_type}:add", 0) == size
+        assert s.get(f"{object_type}:add:errors", 0) == 2
+        in_memory.reset()
+        tenacious.reset()