# Copyright (C) 2015-2020  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import importlib
from typing import TYPE_CHECKING, Any, Dict, List
import warnings

if TYPE_CHECKING:
    from .interface import StorageInterface

# Maps the ``cls`` configuration key to the dotted path, relative to this
# package, of the class implementing that storage flavor.
STORAGE_IMPLEMENTATIONS = {
    "local": ".postgresql.storage.Storage",
    "remote": ".api.client.RemoteStorage",
    "memory": ".in_memory.InMemoryStorage",
    "filter": ".filter.FilteringProxyStorage",
    "buffer": ".buffer.BufferingProxyStorage",
    "retry": ".retry.RetryingProxyStorage",
    "cassandra": ".cassandra.CassandraStorage",
    "validate": ".validate.ValidatingProxyStorage",
    "tenacious": ".tenacious.TenaciousProxyStorage",
}


def get_storage(cls: str, **kwargs) -> "StorageInterface":
    """Instantiate a storage object of class ``cls`` configured by ``kwargs``.

    Args:
        cls: the storage's class, one of:

            - ``local`` to use a postgresql database
            - ``cassandra`` to use a cassandra database
            - ``remote`` to connect to a swh-storage server
            - ``memory`` for an in-memory storage, useful for fast tests
            - ``filter``, ``buffer``, ... to use specific storage "proxies",
              see their respective documentations
            - ``pipeline`` to stack several such proxies (see
              :func:`get_storage_pipeline`)
        kwargs: constructor arguments for the selected class.  The special
            ``check_config`` key, when truthy, triggers a
            ``storage.check_config(**check_config)`` call after instantiation.

    Returns:
        an instance of swh.storage.Storage or compatible class

    Raises:
        ValueError: if passed an unknown storage class.
        EnvironmentError: if the requested ``check_config`` verification fails.

    """
    # Legacy calling convention nested every argument under an "args" key.
    if "args" in kwargs:
        warnings.warn(
            'Explicit "args" key is deprecated, use keys directly instead.',
            DeprecationWarning,
        )
        kwargs = kwargs["args"]

    if cls == "pipeline":
        return get_storage_pipeline(**kwargs)

    dotted_path = STORAGE_IMPLEMENTATIONS.get(cls)
    if dotted_path is None:
        supported = ", ".join(STORAGE_IMPLEMENTATIONS)
        raise ValueError(f"Unknown storage class `{cls}`. Supported: {supported}")

    module_name, _, class_name = dotted_path.rpartition(".")
    # Relative import: dotted paths in STORAGE_IMPLEMENTATIONS start with ".".
    storage_class = getattr(
        importlib.import_module(module_name, package=__package__), class_name
    )

    check_config = kwargs.pop("check_config", {})
    storage = storage_class(**kwargs)
    if check_config and not storage.check_config(**check_config):
        raise EnvironmentError("storage check config failed")
    return storage


def get_storage_pipeline(
    steps: List[Dict[str, Any]], check_config=None
) -> "StorageInterface":
    """Recursively build a storage object stacking the proxies of ``steps``.

    Each step dict holds keyword arguments for :func:`get_storage`; every
    proxy wraps the next one through its ``storage`` argument, so the first
    step is the outermost object returned.

    Args:
        steps: list of dicts that may be used as kwargs for `get_storage`.
        check_config: forwarded to every step as its ``check_config`` key.

    Returns:
        an instance of swh.storage.Storage or compatible class

    Raises:
        ValueError: if ``steps`` is empty, or a step names an unknown
            storage class.

    """
    inner_config = None
    # Build the stack inside-out: the last step is the innermost backend.
    for step in reversed(steps):
        if "args" in step:
            warnings.warn(
                'Explicit "args" key is deprecated, use keys directly instead.',
                DeprecationWarning,
            )
            step = {"cls": step["cls"], **step["args"]}
        if inner_config:
            step["storage"] = inner_config
        step["check_config"] = check_config
        inner_config = step

    if inner_config is None:
        raise ValueError("'pipeline' has no steps.")
    return get_storage(**inner_config)
logger = logging.getLogger(__name__)


class RateQueue:
    """Sliding window over the outcome of the last ``size`` insertions.

    Used to implement an error-rate limit: the limit is reached when more
    than ``max_errors`` of the ``size`` most recent insertions failed.
    """

    def __init__(self, size: int, max_errors: int):
        assert size > max_errors
        self._size = size
        self._max_errors = max_errors
        # bounded deque: the oldest outcomes fall off automatically
        self._errors: Deque[bool] = deque(maxlen=size)

    def add_ok(self, n_ok: int = 1) -> None:
        """Record ``n_ok`` successful insertions in the window."""
        self._errors.extend([False] * n_ok)

    def add_error(self, n_error: int = 1) -> None:
        """Record ``n_error`` failed insertions in the window."""
        self._errors.extend([True] * n_error)

    def limit_reached(self) -> bool:
        """Return True when the error-rate limit is currently exceeded."""
        return sum(self._errors) > self._max_errors

    def reset(self):
        # mainly for testing purpose
        self._errors.clear()


class TenaciousProxyStorage:
    """Storage proxy that has a tenacious insertion behavior.

    When an xxx_add method is called, it's first attempted as is against the backend
    storage. If a failure occurs, split the list of inserted objects in pieces until
    erroneous objects have been identified, so all the valid objects are guaranteed to
    be inserted.

    Also provides an error-rate limit feature: if more than n errors occurred during the
    insertion of the last p (window_size) objects, stop accepting any insertion.

    This proxy is mainly intended to be used in a replayer configuration (aka a
    mirror stack), where insertion errors are mostly unexpected (which explains
    the low default ratio errors/window_size).

    Sample configuration use case for tenacious storage:

    .. code-block:: yaml

        storage:
          cls: tenacious
          storage:
            cls: remote
            args: http://storage.internal.staging.swh.network:5002/
          error-rate-limit:
            errors: 10
            window_size: 1000

    """

    # xxx_add methods wrapped with the split-on-failure behavior; every
    # other attribute is delegated untouched to the backend storage.
    tenacious_methods = (
        "content_add",
        "skipped_content_add",
        "directory_add",
        "revision_add",
        "extid_add",
        "release_add",
        "snapshot_add",
        "origin_add",
    )

    def __init__(self, storage, error_rate_limit=None):
        # Imported lazily: swh.storage itself loads this module on demand,
        # so a module-level import would create an import-cycle hazard.
        from swh.storage import get_storage

        self.storage = get_storage(**storage)
        if error_rate_limit is None:
            error_rate_limit = {"errors": 10, "window_size": 1000}
        assert "errors" in error_rate_limit
        assert "window_size" in error_rate_limit
        self.rate_queue = RateQueue(
            size=error_rate_limit["window_size"],
            max_errors=error_rate_limit["errors"],
        )

    def __getattr__(self, key):
        # Intercept only the whitelisted xxx_add methods; anything else is
        # forwarded as-is to the backend storage.
        if key in self.tenacious_methods:
            return partial(self._tenacious_add, key)
        return getattr(self.storage, key)

    def _tenacious_add(
        self, func_name, objects: Iterable["BaseModel"]
    ) -> Dict[str, int]:
        """Insert ``objects`` through ``func_name``, isolating failures.

        The whole batch is attempted first; on failure it is split in halves
        (dichotomy) until the erroneous objects are isolated and excluded,
        so every valid object ends up inserted.  Each excluded object is
        counted under ``<object_type>:add:errors`` in the returned stats.

        Raises:
            RuntimeError: when the error-rate limit is reached.
        """
        add_function = getattr(self.storage, func_name)
        object_type = func_name[:-4]  # remove the _add suffix

        # list of lists of objects; note this to_add list is consumed from the tail
        to_add: List[List["BaseModel"]] = [list(objects)]
        results: CounterT[str] = Counter()

        while to_add:
            if self.rate_queue.limit_reached():
                msg = (
                    "Too many insertion errors have been detected; "
                    "disabling insertions"
                )
                # fixed: was logging.error (root logger) instead of the
                # module-level logger
                logger.error(msg)
                raise RuntimeError(msg)
            objs = to_add.pop()
            try:
                results.update(add_function(objs))
                self.rate_queue.add_ok(len(objs))
            except Exception:
                if len(objs) > 1:
                    logger.info(
                        "%s: failed to insert a batch of %d %s objects, splitting",
                        func_name,
                        len(objs),
                        object_type,
                    )
                    # reinsert objs split in 2 parts at the end of to_add;
                    # the first half is appended last, so it is retried first
                    mid = len(objs) // 2
                    to_add.append(objs[mid:])
                    to_add.append(objs[:mid])
                else:
                    logger.error(
                        "%s: failed to insert an object, excluding %s",
                        func_name,
                        objs,
                    )
                    # logger.exception appends the active traceback (and the
                    # exception itself), so the message needs no interpolation
                    logger.exception("Exception was:")
                    results.update({f"{object_type}:add:errors": 1})
                    self.rate_queue.add_error()
        return dict(results)

    def reset(self):
        self.rate_queue.reset()
data = StorageData()
# sample collections per object type, indexed by the `object_type` strings
# used in the tenacious proxy's insertion statistics
collections = {
    "origin": data.origins,
    "content": data.contents,
    "skipped_content": data.skipped_contents,
    "revision": data.revisions,
    "directory": data.directories,
    "release": data.releases,
    "snapshot": data.snapshots,
}
# generic storage tests (using imported TestStorage* classes)


@pytest.fixture
def swh_storage_backend_config2():
    yield {
        "cls": "memory",
        "journal_writer": {"cls": "memory",},
    }


@pytest.fixture
def swh_storage():
    # tenacious proxy stacked on an in-memory backend with an in-memory
    # journal writer, built through the "pipeline" configuration
    storage_config = {
        "cls": "pipeline",
        "steps": [
            {"cls": "tenacious"},
            {"cls": "memory", "journal_writer": {"cls": "memory",}},
        ],
    }

    storage = get_storage(**storage_config)
    # expose the backend's journal writer on the proxy so the generic test
    # suite can inspect what was written
    storage.journal_writer = storage.storage.journal_writer
    return storage


class TestTenaciousStorage(_TestStorage):
    # run the whole generic storage test suite against the tenacious stack,
    # minus the tests that the in-memory backend cannot support
    @pytest.mark.skip(
        'The "person" table of the pgsql is a legacy thing, and not '
        "supported by the cassandra/in-memory backend."
    )
    def test_person_fullname_unicity(self):
        pass

    @pytest.mark.skip(reason="No collision with the tenacious storage")
    def test_content_add_collision(self, swh_storage, sample_data):
        pass

    @pytest.mark.skip("content_update is not implemented")
    def test_content_update(self):
        pass

    @pytest.mark.skip("Not supported by Cassandra/InMemory storage")
    def test_origin_count(self):
        pass


class TestTenaciousStorageGeneratedData(_TestStorageGeneratedData):
    # same as above for the generated-data flavor of the generic suite
    @pytest.mark.skip("Not supported by Cassandra/InMemory")
    def test_origin_count(self):
        pass

    @pytest.mark.skip("Not supported by Cassandra/InMemory")
    def test_origin_count_with_visit_no_visits(self):
        pass

    @pytest.mark.skip("Not supported by Cassandra/InMemory")
    def test_origin_count_with_visit_with_visits_and_snapshot(self):
        pass

    @pytest.mark.skip("Not supported by Cassandra/InMemory")
    def test_origin_count_with_visit_with_visits_no_snapshot(self):
        pass


# specific tests for the tenacious behavior


def get_tenacious_storage(**config):
    # validate -> tenacious(**config) -> memory pipeline used by the
    # behavior-specific tests below
    storage_config = {
        "cls": "pipeline",
        "steps": [
            {"cls": "validate"},
            {"cls": "tenacious", **config},
            {"cls": "memory"},
        ],
    }

    return get_storage(**storage_config)


@contextmanager
def disabled_validators():
    # temporarily turn off attrs validators (lets us build invalid models)
    attr.set_run_validators(False)
    yield
    attr.set_run_validators(True)


def popid(d):
    d.pop("id")
    return d


# each param is (object_type, valid objects, bad1, bad2); bad1/bad2 are
# objects the LimitedInMemoryStorage below refuses to insert
testdata = [
    pytest.param(
        "content",
        list(TEST_OBJECTS["content"]),
        attr.evolve(model.Content.from_data(data=b"too big"), length=1000),
        attr.evolve(model.Content.from_data(data=b"to fail"), length=1000),
        id="content",
    ),
    pytest.param(
        "skipped_content",
        list(TEST_OBJECTS["skipped_content"]),
        attr.evolve(
            model.SkippedContent.from_data(data=b"too big", reason="too big"),
            length=1000,
        ),
        attr.evolve(
            model.SkippedContent.from_data(data=b"to fail", reason="to fail"),
            length=1000,
        ),
        id="skipped_content",
    ),
    pytest.param(
        "directory",
        list(TEST_OBJECTS["directory"]),
        data.directory,
        data.directory2,
        id="directory",
    ),
    pytest.param(
        "revision",
        list(TEST_OBJECTS["revision"]),
        data.revision,
        data.revision2,
        id="revision",
    ),
    pytest.param(
        "release",
        list(TEST_OBJECTS["release"]),
        data.release,
        data.release2,
        id="release",
    ),
    pytest.param(
        "snapshot",
        list(TEST_OBJECTS["snapshot"]),
        data.snapshot,
        data.complete_snapshot,
        id="snapshot",
    ),
    pytest.param(
        "origin", list(TEST_OBJECTS["origin"]), data.origin, data.origin2, id="origin",
    ),
]


class LimitedInMemoryStorage(InMemoryStorage):
    # in-memory backend that raises on a fixed set of objects, to exercise
    # the tenacious proxy's split-and-retry logic
    # forbidden are 'bad1' and 'bad2' arguments of `testdata`
    # (pytest.param is indexable: x[0] is the values tuple)
    forbidden = [x[0][2] for x in testdata] + [x[0][3] for x in testdata]

    def content_add(self, contents):
        return self._maybe_add(super().content_add, "content", contents)

    def skipped_content_add(self, skipped_contents):
        return self._maybe_add(
            super().skipped_content_add, "skipped_content", skipped_contents
        )

    def origin_add(self, origins):
        return self._maybe_add(super().origin_add, "origin", origins)

    def directory_add(self, directories):
        return self._maybe_add(super().directory_add, "directory", directories)

    def revision_add(self, revisions):
        return self._maybe_add(super().revision_add, "revision", revisions)

    def release_add(self, releases):
        return self._maybe_add(super().release_add, "release", releases)

    def snapshot_add(self, snapshots):
        return self._maybe_add(super().snapshot_add, "snapshot", snapshots)

    def _maybe_add(self, add_func, object_type, objects):
        # reject the whole batch if it contains any forbidden object
        # forbidden = [c.id for c in collections[object_type]]
        if any(c in self.forbidden for c in objects):
            raise ValueError(
                f"{object_type} is forbidden",
                [c.unique_key() for c in objects if c in self.forbidden],
            )
        return add_func(objects)
@patch("swh.storage.in_memory.InMemoryStorage", LimitedInMemoryStorage)
@pytest.mark.parametrize("object_type, objects, bad1, bad2", testdata)
def test_tenacious_proxy_storage(object_type, objects, bad1, bad2):
    """All valid objects get inserted whatever the position of the bad ones.

    Runs the same batch with bad objects placed at the end, at the start,
    in the middle, alone, and spread out; each bad object is counted under
    ``<object_type>:add:errors`` and never blocks the valid ones.
    """
    storage = get_tenacious_storage()
    tenacious = storage.storage
    in_memory = tenacious.storage
    assert isinstance(tenacious, TenaciousProxyStorage)
    assert isinstance(in_memory, LimitedInMemoryStorage)

    size = len(objects)

    add_func = getattr(storage, f"{object_type}_add")

    # no bad object: everything is inserted in one go
    s = add_func(objects)
    assert s.get(f"{object_type}:add", 0) == size
    assert s.get(f"{object_type}:add:errors", 0) == 0
    in_memory.reset()
    tenacious.reset()

    # bad1 is the last element
    s = add_func(objects + [bad1])
    assert s.get(f"{object_type}:add", 0) == size
    assert s.get(f"{object_type}:add:errors", 0) == 1
    in_memory.reset()
    tenacious.reset()

    # bad1 and bad2 are the last elements
    s = add_func(objects + [bad1, bad2])
    assert s.get(f"{object_type}:add", 0) == size
    assert s.get(f"{object_type}:add:errors", 0) == 2
    in_memory.reset()
    tenacious.reset()

    # bad1 is the first element
    s = add_func([bad1] + objects)
    assert s.get(f"{object_type}:add", 0) == size
    assert s.get(f"{object_type}:add:errors", 0) == 1
    in_memory.reset()
    tenacious.reset()

    # bad1 and bad2 are the first elements
    s = add_func([bad1, bad2] + objects)
    assert s.get(f"{object_type}:add", 0) == size
    assert s.get(f"{object_type}:add:errors", 0) == 2
    in_memory.reset()
    tenacious.reset()

    # bad1 is in the middle of the list of inserted elements
    s = add_func(objects[: size // 2] + [bad1] + objects[size // 2 :])
    assert s.get(f"{object_type}:add", 0) == size
    assert s.get(f"{object_type}:add:errors", 0) == 1
    in_memory.reset()
    tenacious.reset()

    # bad1 and bad2 are together in the middle of the list of inserted elements
    s = add_func(objects[: size // 2] + [bad1, bad2] + objects[size // 2 :])
    assert s.get(f"{object_type}:add", 0) == size
    assert s.get(f"{object_type}:add:errors", 0) == 2
    in_memory.reset()
    tenacious.reset()

    # bad1 and bad2 are spread in the middle of the list of inserted elements
    s = add_func(
        objects[: size // 3]
        + [bad1]
        + objects[size // 3 : 2 * (size // 3)]
        + [bad2]
        + objects[2 * (size // 3) :]
    )
    assert s.get(f"{object_type}:add", 0) == size
    assert s.get(f"{object_type}:add:errors", 0) == 2
    in_memory.reset()
    tenacious.reset()

    # bad1 is the only element
    s = add_func([bad1])
    assert s.get(f"{object_type}:add", 0) == 0
    assert s.get(f"{object_type}:add:errors", 0) == 1
    in_memory.reset()
    tenacious.reset()

    # bad1 and bad2 are the only elements
    s = add_func([bad1, bad2])
    assert s.get(f"{object_type}:add", 0) == 0
    assert s.get(f"{object_type}:add:errors", 0) == 2
    in_memory.reset()
    tenacious.reset()


@patch("swh.storage.in_memory.InMemoryStorage", LimitedInMemoryStorage)
@pytest.mark.parametrize("object_type, objects, bad1, bad2", testdata)
def test_tenacious_proxy_storage_rate_limit(object_type, objects, bad1, bad2):
    """The error-rate limit stops insertions when errors come too close.

    With a window of 3 and a 1-error budget, two errors landing within the
    window raise RuntimeError, while isolated or well-spread errors do not.
    """
    storage = get_tenacious_storage(error_rate_limit={"errors": 1, "window_size": 3})
    tenacious = storage.storage
    in_memory = tenacious.storage
    assert isinstance(tenacious, TenaciousProxyStorage)
    assert isinstance(in_memory, LimitedInMemoryStorage)

    size = len(objects)

    add_func = getattr(storage, f"{object_type}_add")

    # with no insertion failure, no impact
    s = add_func(objects)
    assert s.get(f"{object_type}:add", 0) == size
    assert s.get(f"{object_type}:add:errors", 0) == 0
    in_memory.reset()
    tenacious.reset()

    # with one insertion failure, no impact
    s = add_func([bad1] + objects)
    assert s.get(f"{object_type}:add", 0) == size
    assert s.get(f"{object_type}:add:errors", 0) == 1
    in_memory.reset()
    tenacious.reset()

    s = add_func(objects[: size // 2] + [bad1] + objects[size // 2 :])
    assert s.get(f"{object_type}:add", 0) == size
    assert s.get(f"{object_type}:add:errors", 0) == 1
    in_memory.reset()
    tenacious.reset()

    # with two consecutive insertion failures, exception is raised
    with pytest.raises(RuntimeError, match="Too many insertion errors"):
        add_func([bad1, bad2] + objects)
    in_memory.reset()
    tenacious.reset()

    if size > 2:
        # with two consecutive insertion failures, exception is raised
        # (errors not at the beginning)
        with pytest.raises(RuntimeError, match="Too many insertion errors"):
            add_func(objects[: size // 2] + [bad1, bad2] + objects[size // 2 :])
        in_memory.reset()
        tenacious.reset()

        # with two non-consecutive insertion failures, no impact
        # (errors are far enough to not reach the rate limit)
        s = add_func(
            objects[: size // 3]
            + [bad1]
            + objects[size // 3 : 2 * (size // 3)]
            + [bad2]
            + objects[2 * (size // 3) :]
        )
        assert s.get(f"{object_type}:add", 0) == size
        assert s.get(f"{object_type}:add:errors", 0) == 2
        in_memory.reset()
        tenacious.reset()