diff --git a/swh/storage/__init__.py b/swh/storage/__init__.py --- a/swh/storage/__init__.py +++ b/swh/storage/__init__.py @@ -14,6 +14,7 @@ "filter", "buffer", "retry", + "tenacious", "validate", "cassandra", } @@ -66,6 +67,8 @@ from .buffer import BufferingProxyStorage as Storage elif cls == "retry": from .retry import RetryingProxyStorage as Storage + elif cls == "tenacious": + from .tenacious import TenaciousProxyStorage as Storage elif cls == "validate": from .validate import ValidatingProxyStorage as Storage diff --git a/swh/storage/tenacious.py b/swh/storage/tenacious.py new file mode 100644 --- /dev/null +++ b/swh/storage/tenacious.py @@ -0,0 +1,124 @@ +# Copyright (C) 2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import logging + +from collections import deque +from functools import partial +from typing import Deque, Dict, Iterable + +from swh.model.model import BaseModel +from swh.storage import get_storage + + +logger = logging.getLogger(__name__) + + +class RateQueue: + def __init__(self, size: int, max_errors: int): + assert size > max_errors + self._size = size + self._max_errors = max_errors + self._errors: Deque[bool] = deque(maxlen=size) + + def add_ok(self, n_ok: int = 1) -> None: + self._errors.extend([False] * n_ok) + + def add_error(self, n_error: int = 1) -> None: + self._errors.extend([True] * n_error) + + def limit_reached(self) -> bool: + return sum(self._errors) > self._max_errors + + def reset(self): + # mainly for testing purpose + self._errors.clear() + + +class TenaciousProxyStorage: + """Storage proxy that has a tenacious insertion behavior. + + When an xxx_add method is called, it's first attempted as is against the backend + storage. 
If a failure occurs, split the list of inserted objects in pieces until + erroneous objects have been identified, so all the valid objects are guaranteed to + be inserted. + + Also provides an error-rate limit feature: if more than n errors occurred during the + insertion of the last p objects, stop accepting any insertion. + + Sample configuration use case for tenacious storage: + + .. code-block:: yaml + + storage: + cls: tenacious + args: + storage: + cls: remote + args: http://storage.internal.staging.swh.network:5002/ + error-rate-limit: + errors: 10 + total: 1000 + """ + + def __init__(self, storage, error_rate_limit=None): + self.storage = get_storage(**storage) + if error_rate_limit is None: + error_rate_limit = {"errors": 10, "total": 1000} + assert "errors" in error_rate_limit + assert "total" in error_rate_limit + self.rate_queue = RateQueue( + size=error_rate_limit["total"], max_errors=error_rate_limit["errors"], + ) + + def __getattr__(self, key): + if key.endswith("_add"): + return partial(self.tenacious_add, key) + return getattr(self.storage, key) + + def tenacious_add(self, func_name, objects: Iterable[BaseModel]) -> Dict[str, int]: + """Enqueue objects to write to the storage. This checks if the queue's + threshold is hit. If it is, actually write those to the storage. 
+ + """ + add_function = getattr(self.storage, func_name) + object_type = func_name[:-4] # remove the _add suffix + to_add = [list(objects)] + results: Dict[str, int] = {} + + while to_add: + if self.rate_queue.limit_reached(): + logging.error( + "Too many insertion errors have been detected; " + "disabling insertions" + ) + break + objs = to_add.pop(0) + try: + accumulate(results, add_function(objs)) + self.rate_queue.add_ok(len(objs)) + except Exception as exc: + if len(objs) > 1: + logger.info( + f"{func_name}: failed to insert an objects batch, splitting" + ) + # reinsert objs split in 2 parts at the beginning of to_add + to_add[:0] = [objs[: len(objs) // 2], objs[len(objs) // 2 :]] + else: + logger.error(f"{func_name}: failed to insert an object, exclude it") + # logger.error(f"Exception: {exc}") + logger.exception(f"Exception: {exc}") + accumulate(results, {f"{object_type}:add:errors": 1}) + self.rate_queue.add_error() + return results + + def reset(self): + self.rate_queue.reset() + + +def accumulate(d1, d2): + for k, v in d2.items(): + v += d1.get(k, 0) + d1[k] = v diff --git a/swh/storage/tests/test_tenacious.py b/swh/storage/tests/test_tenacious.py new file mode 100644 --- /dev/null +++ b/swh/storage/tests/test_tenacious.py @@ -0,0 +1,293 @@ +# Copyright (C) 2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import attr +from contextlib import contextmanager +from unittest.mock import patch + +from swh.storage import get_storage +from swh.journal.tests.journal_data import TEST_OBJECTS +from swh.storage.tests.storage_data import data +from swh.model import model +from swh.storage.in_memory import InMemoryStorage +from swh.storage.tenacious import TenaciousProxyStorage +from swh.storage.tests.test_storage import TestStorage, TestStorageGeneratedData # noqa + +import pytest + + 
+# generic storage tests (using imported TestStorage* classes) + + +@pytest.fixture +def swh_storage_backend_config(): + yield { + "cls": "memory", + "journal_writer": {"cls": "memory",}, + } + + +# specific tests for the tenacious behavior + + +class LimitedInMemoryStorage(InMemoryStorage): + def content_add(self, content): + if any(c.length > 100 for c in content): + raise ValueError("Content too large") + return super().content_add(content) + + def skipped_content_add(self, content): + if any(c.length > 100 for c in content): + raise ValueError("SkippedContent too large(!)") + return super().skipped_content_add(content) + + def origin_add(self, origins): + forbidden = [c["url"] for c in data.objects["origin"]] + if any(c.url in forbidden for c in origins): + raise ValueError( + "origin is forbidden", [c.url for c in origins if c.url in forbidden] + ) + return super().origin_add(origins) + + def directory_add(self, directories): + return self._maybe_add(super().directory_add, "directory", directories) + + def revision_add(self, revisions): + return self._maybe_add(super().revision_add, "revision", revisions) + + def release_add(self, releases): + return self._maybe_add(super().release_add, "release", releases) + + def snapshot_add(self, snapshots): + return self._maybe_add(super().snapshot_add, "snapshot", snapshots) + + def _maybe_add(self, add_func, object_type, objects): + forbidden = [c["id"] for c in data.objects[object_type]] + if any(c.id in forbidden for c in objects): + raise ValueError( + f"{object_type} is forbidden", + [c.id for c in objects if c.id in forbidden], + ) + return add_func(objects) + + +def get_tenacious_storage(**config): + storage_config = { + "cls": "pipeline", + "steps": [ + {"cls": "validate"}, + {"cls": "tenacious", **config}, + {"cls": "memory"}, + ], + } + + return get_storage(**storage_config) + + +@contextmanager +def disabled_validators(): + attr.set_run_validators(False) + yield + attr.set_run_validators(True) + + +def 
popid(d): + d.pop("id") + return d + + +testdata = [ + pytest.param( + "content", + list(TEST_OBJECTS["content"]), + attr.evolve(model.Content.from_data(data=b"too big"), length=1000), + attr.evolve(model.Content.from_data(data=b"to fail"), length=1000), + id="content", + ), + pytest.param( + "skipped_content", + list(TEST_OBJECTS["skipped_content"]), + attr.evolve( + model.SkippedContent.from_data(data=b"too big", reason="too big"), + length=1000, + ), + attr.evolve( + model.SkippedContent.from_data(data=b"to fail", reason="to fail"), + length=1000, + ), + id="skipped_content", + ), + pytest.param( + "directory", + list(TEST_OBJECTS["directory"]), + model.Directory.from_dict(data.dir), + model.Directory.from_dict(data.dir2), + id="directory", + ), + pytest.param( + "revision", + list(TEST_OBJECTS["revision"]), + model.Revision.from_dict(data.revision), + model.Revision.from_dict(data.revision2), + id="revision", + ), + pytest.param( + "release", + list(TEST_OBJECTS["release"]), + model.Release.from_dict(data.release), + model.Release.from_dict(data.release2), + id="release", + ), + pytest.param( + "snapshot", + list(TEST_OBJECTS["snapshot"]), + model.Snapshot.from_dict(data.snapshot), + model.Snapshot.from_dict(data.complete_snapshot), + id="snapshot", + ), + pytest.param( + "origin", + list(TEST_OBJECTS["origin"]), + model.Origin.from_dict(data.origin), + model.Origin.from_dict(data.origin2), + id="origin", + ), +] + + +def subdict(d): + keys = ("add", "errors") + return {k: v for k, v in d.items() if k.split(":")[-1] in keys} + + +@patch("swh.storage.in_memory.InMemoryStorage", LimitedInMemoryStorage) +@pytest.mark.parametrize("object_type, objects, bad1, bad2", testdata) +def test_tenacious_proxy_storage(object_type, objects, bad1, bad2): + storage = get_tenacious_storage() + tenacious = storage.storage + in_memory = tenacious.storage + assert isinstance(tenacious, TenaciousProxyStorage) + assert isinstance(in_memory, LimitedInMemoryStorage) + size = 
len(objects) + + add_func = getattr(storage, f"{object_type}_add") + s = add_func(objects) + assert subdict(s) == {f"{object_type}:add": size} + + in_memory.reset() + tenacious.reset() + s = add_func(objects + [bad1]) + # c has not been inserted + assert subdict(s) == {f"{object_type}:add": size, f"{object_type}:add:errors": 1} + in_memory.reset() + tenacious.reset() + s = add_func(objects + [bad1, bad2]) + # c has not been inserted + assert subdict(s) == {f"{object_type}:add": size, f"{object_type}:add:errors": 2} + + in_memory.reset() + tenacious.reset() + s = add_func([bad1] + objects) + # c has not been inserted + assert subdict(s) == {f"{object_type}:add": size, f"{object_type}:add:errors": 1} + in_memory.reset() + tenacious.reset() + s = add_func([bad1, bad2] + objects) + # c has not been inserted + assert subdict(s) == {f"{object_type}:add": size, f"{object_type}:add:errors": 2} + + in_memory.reset() + tenacious.reset() + s = add_func(objects[: size // 2] + [bad1] + objects[size // 2 :]) + # c has not been inserted + assert subdict(s) == {f"{object_type}:add": size, f"{object_type}:add:errors": 1} + in_memory.reset() + tenacious.reset() + s = add_func(objects[: size // 2] + [bad1, bad2] + objects[size // 2 :]) + # c has not been inserted + assert subdict(s) == {f"{object_type}:add": size, f"{object_type}:add:errors": 2} + + in_memory.reset() + tenacious.reset() + s = add_func( + objects[: size // 3] + + [bad1] + + objects[size // 3 : 2 * (size // 3)] + + [bad2] + + objects[2 * (size // 3) :] + ) + # c has not been inserted + assert subdict(s) == {f"{object_type}:add": size, f"{object_type}:add:errors": 2} + + in_memory.reset() + tenacious.reset() + s = add_func([bad1]) + assert subdict(s) == {f"{object_type}:add:errors": 1} + + in_memory.reset() + tenacious.reset() + s = add_func([bad1, bad2]) + assert subdict(s) == {f"{object_type}:add:errors": 2} + + +@patch("swh.storage.in_memory.InMemoryStorage", LimitedInMemoryStorage) 
+@pytest.mark.parametrize("object_type, objects, bad1, bad2", testdata) +def test_tenacious_proxy_storage_rate_limit(object_type, objects, bad1, bad2): + storage = get_tenacious_storage(error_rate_limit={"errors": 1, "total": 3}) + tenacious = storage.storage + in_memory = tenacious.storage + assert isinstance(tenacious, TenaciousProxyStorage) + assert isinstance(in_memory, LimitedInMemoryStorage) + size = len(objects) + add_func = getattr(storage, f"{object_type}_add") + + # with no insertion failure, no impact + s = add_func(objects) + assert subdict(s) == {f"{object_type}:add": size} + in_memory.reset() + tenacious.reset() + + # with one insertion failure, no impact + s = add_func([bad1] + objects) + assert subdict(s) == {f"{object_type}:add": size, f"{object_type}:add:errors": 1} + in_memory.reset() + tenacious.reset() + s = add_func(objects[: size // 2] + [bad1] + objects[size // 2 :]) + assert subdict(s) == {f"{object_type}:add": size, f"{object_type}:add:errors": 1} + in_memory.reset() + tenacious.reset() + + # with two consecutive insertion failures, no further insertions + s = add_func([bad1, bad2] + objects) + assert subdict(s) == {f"{object_type}:add:errors": 2} + in_memory.reset() + tenacious.reset() + + if size > 2: + # with two consecutive insertion failures, no further insertions + # (errors not at the beginning) + s = add_func(objects[: size // 2] + [bad1, bad2] + objects[size // 2 :]) + assert subdict(s) == { + f"{object_type}:add": size // 2, + f"{object_type}:add:errors": 2, + } + in_memory.reset() + tenacious.reset() + + # with two non-consecutive insertion failures, no impact + # (errors are far enough to not reach the rate limit) + s = add_func( + objects[: size // 3] + + [bad1] + + objects[size // 3 : 2 * (size // 3)] + + [bad2] + + objects[2 * (size // 3) :] + ) + assert subdict(s) == { + f"{object_type}:add": size, + f"{object_type}:add:errors": 2, + } + in_memory.reset() + tenacious.reset()