Page MenuHomeSoftware Heritage

D3334.diff
No OneTemporary

D3334.diff

diff --git a/swh/storage/__init__.py b/swh/storage/__init__.py
--- a/swh/storage/__init__.py
+++ b/swh/storage/__init__.py
@@ -20,6 +20,7 @@
"retry": ".retry.RetryingProxyStorage",
"cassandra": ".cassandra.CassandraStorage",
"validate": ".validate.ValidatingProxyStorage",
+ "tenacious": ".tenacious.TenaciousProxyStorage",
}
diff --git a/swh/storage/tenacious.py b/swh/storage/tenacious.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/tenacious.py
@@ -0,0 +1,141 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from collections import Counter, deque
+from functools import partial
+import logging
+from typing import Counter as CounterT
+from typing import Deque, Dict, Iterable, List
+
+from swh.model.model import BaseModel
+from swh.storage import get_storage
+
+logger = logging.getLogger(__name__)
+
+
+class RateQueue:
+    """Sliding window of the most recent insertion outcomes.
+
+    The window keeps at most ``size`` boolean flags (``True`` for a failed
+    insertion, ``False`` for a successful one); once it is full, recording a
+    new outcome drops the oldest one.
+
+    """
+
+    def __init__(self, size: int, max_errors: int):
+        assert size > max_errors
+        self._max_errors = max_errors
+        self._size = size
+        self._errors: Deque[bool] = deque(maxlen=size)
+
+    def _record(self, failed: bool, count: int) -> None:
+        """Append ``count`` outcomes, all flagged ``failed``, to the window."""
+        self._errors.extend(failed for _ in range(count))
+
+    def add_ok(self, n_ok: int = 1) -> None:
+        """Record ``n_ok`` successful insertions."""
+        self._record(False, n_ok)
+
+    def add_error(self, n_error: int = 1) -> None:
+        """Record ``n_error`` failed insertions."""
+        self._record(True, n_error)
+
+    def limit_reached(self) -> bool:
+        """Return True when more than ``max_errors`` failures are in the window."""
+        return self._errors.count(True) > self._max_errors
+
+    def reset(self):
+        """Forget every recorded outcome (mainly for testing purpose)."""
+        self._errors.clear()
+
+
+class TenaciousProxyStorage:
+    """Storage proxy that has a tenacious insertion behavior.
+
+    When an xxx_add method is called, it's first attempted as is against the backend
+    storage. If a failure occurs, split the list of inserted objects in pieces until
+    erroneous objects have been identified, so all the valid objects are guaranteed to
+    be inserted.
+
+    Also provides an error-rate limit feature: if more than n errors occurred during
+    the insertion of the last p (window_size) objects, stop accepting any insertion.
+
+    This proxy is mainly intended to be used in a replayer configuration (aka a
+    mirror stack), where insertion errors are mostly unexpected (which explains
+    the low default ratio errors/window_size).
+
+    Sample configuration use case for tenacious storage:
+
+    .. code-block:: yaml
+
+      storage:
+        cls: tenacious
+        storage:
+          cls: remote
+          args: http://storage.internal.staging.swh.network:5002/
+        error_rate_limit:
+          errors: 10
+          window_size: 1000
+
+    """
+
+    tenacious_methods = (
+        "content_add",
+        "skipped_content_add",
+        "directory_add",
+        "revision_add",
+        "extid_add",
+        "release_add",
+        "snapshot_add",
+        "origin_add",
+    )
+
+    def __init__(self, storage, error_rate_limit=None):
+        """Instantiate the proxied backend and the error-rate window.
+
+        Args:
+            storage: configuration of the proxied storage, expanded as keyword
+              arguments to ``get_storage``
+            error_rate_limit: dict with the mandatory keys ``errors`` and
+              ``window_size``; defaults to 10 errors over a window of 1000 objects
+
+        """
+        self.storage = get_storage(**storage)
+        if error_rate_limit is None:
+            error_rate_limit = {"errors": 10, "window_size": 1000}
+        assert "errors" in error_rate_limit
+        assert "window_size" in error_rate_limit
+        self.rate_queue = RateQueue(
+            size=error_rate_limit["window_size"], max_errors=error_rate_limit["errors"],
+        )
+
+    def __getattr__(self, key):
+        """Route ``xxx_add`` calls through the tenacious wrapper.
+
+        Only called for attributes not found the normal way: names listed in
+        ``tenacious_methods`` get the splitting/retrying wrapper, anything else is
+        delegated untouched to the proxied storage.
+
+        """
+        if key == "storage":
+            # self.storage is not set (e.g. __init__ failed or was bypassed);
+            # bail out instead of recursing forever through __getattr__
+            raise AttributeError(key)
+        if key in self.tenacious_methods:
+            return partial(self._tenacious_add, key)
+        return getattr(self.storage, key)
+
+    def _tenacious_add(self, func_name, objects: Iterable[BaseModel]) -> Dict[str, int]:
+        """Insert objects in the backend storage, isolating the ones that fail.
+
+        The whole batch is first sent as is to the backend ``func_name`` method.
+        Any batch whose insertion raises is split in two halves which are retried
+        separately (first half first), until the failing objects are alone in
+        their batch; these are logged, counted as errors and skipped.
+
+        Args:
+            func_name: name of the backend method to call (one of
+              ``tenacious_methods``)
+            objects: objects to insert
+
+        Returns:
+            the sum of the summaries returned by the backend, plus a
+            ``<object_type>:add:errors`` entry counting the skipped objects
+
+        Raises:
+            RuntimeError: when the error-rate limit is exceeded; the objects still
+              pending at that point are not inserted
+
+        """
+        add_function = getattr(self.storage, func_name)
+        object_type = func_name[:-4]  # remove the _add suffix
+
+        # list of lists of objects; note this to_add list is consumed from the tail
+        to_add: List[List[BaseModel]] = [list(objects)]
+        results: CounterT[str] = Counter()
+
+        while to_add:
+            if self.rate_queue.limit_reached():
+                message = (
+                    "Too many insertion errors have been detected; "
+                    "disabling insertions"
+                )
+                # report through the module logger, not the root logger
+                logger.error(message)
+                raise RuntimeError(message)
+            objs = to_add.pop()
+            try:
+                results.update(add_function(objs))
+                self.rate_queue.add_ok(len(objs))
+            except Exception as exc:
+                if len(objs) > 1:
+                    logger.info(
+                        f"{func_name}: failed to insert a batch of "
+                        f"{len(objs)} {object_type} objects, splitting"
+                    )
+                    # reinsert objs split in 2 parts at the end of to_add; the
+                    # first half is pushed last so it is retried first
+                    middle = len(objs) // 2
+                    to_add.append(objs[middle:])
+                    to_add.append(objs[:middle])
+                else:
+                    logger.error(
+                        f"{func_name}: failed to insert an object, excluding {objs}"
+                    )
+                    logger.exception(f"Exception was: {exc}")
+                    results.update({f"{object_type}:add:errors": 1})
+                    self.rate_queue.add_error()
+        return dict(results)
+
+    def reset(self):
+        """Clear the error-rate window, forgetting past insertion outcomes."""
+        self.rate_queue.reset()
diff --git a/swh/storage/tests/test_tenacious.py b/swh/storage/tests/test_tenacious.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/tests/test_tenacious.py
@@ -0,0 +1,366 @@
+# Copyright (C) 2020-2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from contextlib import contextmanager
+from unittest.mock import patch
+
+import attr
+import pytest
+
+from swh.model import model
+from swh.model.tests.swh_model_data import TEST_OBJECTS
+from swh.storage import get_storage
+from swh.storage.in_memory import InMemoryStorage
+from swh.storage.tenacious import TenaciousProxyStorage
+from swh.storage.tests.storage_data import StorageData
+from swh.storage.tests.storage_tests import (
+ TestStorageGeneratedData as _TestStorageGeneratedData,
+)
+from swh.storage.tests.storage_tests import TestStorage as _TestStorage # noqa
+
+# sample objects shared by the tests of this module
+data = StorageData()
+# sample objects grouped by object type
+# NOTE(review): not referenced by any live code in this view -- confirm it is
+# still needed
+collections = {
+ "origin": data.origins,
+ "content": data.contents,
+ "skipped_content": data.skipped_contents,
+ "revision": data.revisions,
+ "directory": data.directories,
+ "release": data.releases,
+ "snapshot": data.snapshots,
+}
+# generic storage tests (using imported TestStorage* classes)
+
+
+@pytest.fixture
+def swh_storage_backend_config2():
+    """Yield the configuration of an in-memory storage with an in-memory journal."""
+    backend_config = {
+        "cls": "memory",
+        "journal_writer": {"cls": "memory"},
+    }
+    yield backend_config
+
+
+@pytest.fixture
+def swh_storage():
+    """Build a pipeline storage made of a tenacious step and an in-memory step.
+
+    The journal writer of the proxied storage is also exposed on the returned
+    object itself, as ``journal_writer``.
+
+    """
+    memory_step = {"cls": "memory", "journal_writer": {"cls": "memory"}}
+    proxy = get_storage(cls="pipeline", steps=[{"cls": "tenacious"}, memory_step])
+    proxy.journal_writer = proxy.storage.journal_writer
+    return proxy
+
+
+class TestTenaciousStorage(_TestStorage):
+ """Tests inherited from the imported TestStorage class; the tests overridden
+ below are skipped."""
+
+ @pytest.mark.skip(
+ 'The "person" table of the pgsql is a legacy thing, and not '
+ "supported by the cassandra/in-memory backend."
+ )
+ def test_person_fullname_unicity(self):
+ pass
+
+ @pytest.mark.skip(reason="No collision with the tenacious storage")
+ def test_content_add_collision(self, swh_storage, sample_data):
+ pass
+
+ @pytest.mark.skip("content_update is not implemented")
+ def test_content_update(self):
+ pass
+
+ @pytest.mark.skip("Not supported by Cassandra/InMemory storage")
+ def test_origin_count(self):
+ pass
+
+
+class TestTenaciousStorageGeneratedData(_TestStorageGeneratedData):
+ """Tests inherited from the imported TestStorageGeneratedData class; the
+ origin_count tests overridden below are skipped."""
+
+ @pytest.mark.skip("Not supported by Cassandra/InMemory")
+ def test_origin_count(self):
+ pass
+
+ @pytest.mark.skip("Not supported by Cassandra/InMemory")
+ def test_origin_count_with_visit_no_visits(self):
+ pass
+
+ @pytest.mark.skip("Not supported by Cassandra/InMemory")
+ def test_origin_count_with_visit_with_visits_and_snapshot(self):
+ pass
+
+ @pytest.mark.skip("Not supported by Cassandra/InMemory")
+ def test_origin_count_with_visit_with_visits_no_snapshot(self):
+ pass
+
+
+# specific tests for the tenacious behavior
+
+
+def get_tenacious_storage(**config):
+    """Return a pipeline storage chaining validate, tenacious and memory steps.
+
+    ``config`` is merged into the configuration of the tenacious step.
+
+    """
+    steps = [
+        {"cls": "validate"},
+        {"cls": "tenacious", **config},
+        {"cls": "memory"},
+    ]
+    return get_storage(cls="pipeline", steps=steps)
+
+
+@contextmanager
+def disabled_validators():
+    """Turn off attrs validators for the duration of the ``with`` block.
+
+    Validators are switched back on when leaving the block, including when the
+    block raises.
+
+    """
+    attr.set_run_validators(False)
+    try:
+        yield
+    finally:
+        # without the finally clause an exception raised in the with block would
+        # leave validators disabled for everything that runs afterwards
+        attr.set_run_validators(True)
+
+
+def popid(d):
+    """Drop the ``id`` key from ``d`` in place and return ``d`` itself.
+
+    Raises KeyError when ``d`` has no ``id`` key.
+
+    """
+    del d["id"]
+    return d
+
+
+# parametrization of the tenacious tests below; each entry holds, in order: the
+# object type, a list of objects to insert, and two more objects (bad1, bad2)
+# that LimitedInMemoryStorage is set up to reject
+testdata = [
+ pytest.param(
+ "content",
+ list(TEST_OBJECTS["content"]),
+ attr.evolve(model.Content.from_data(data=b"too big"), length=1000),
+ attr.evolve(model.Content.from_data(data=b"to fail"), length=1000),
+ id="content",
+ ),
+ pytest.param(
+ "skipped_content",
+ list(TEST_OBJECTS["skipped_content"]),
+ attr.evolve(
+ model.SkippedContent.from_data(data=b"too big", reason="too big"),
+ length=1000,
+ ),
+ attr.evolve(
+ model.SkippedContent.from_data(data=b"to fail", reason="to fail"),
+ length=1000,
+ ),
+ id="skipped_content",
+ ),
+ pytest.param(
+ "directory",
+ list(TEST_OBJECTS["directory"]),
+ data.directory,
+ data.directory2,
+ id="directory",
+ ),
+ pytest.param(
+ "revision",
+ list(TEST_OBJECTS["revision"]),
+ data.revision,
+ data.revision2,
+ id="revision",
+ ),
+ pytest.param(
+ "release",
+ list(TEST_OBJECTS["release"]),
+ data.release,
+ data.release2,
+ id="release",
+ ),
+ pytest.param(
+ "snapshot",
+ list(TEST_OBJECTS["snapshot"]),
+ data.snapshot,
+ data.complete_snapshot,
+ id="snapshot",
+ ),
+ pytest.param(
+ "origin", list(TEST_OBJECTS["origin"]), data.origin, data.origin2, id="origin",
+ ),
+]
+
+
+class LimitedInMemoryStorage(InMemoryStorage):
+    """In-memory storage whose ``xxx_add`` methods refuse the forbidden objects.
+
+    A batch holding at least one forbidden object is refused as a whole, with a
+    ValueError.
+
+    """
+
+    # forbidden are 'bad1' and 'bad2' arguments of `testdata`
+    forbidden = [param[0][position] for position in (2, 3) for param in testdata]
+
+    def _maybe_add(self, add_func, object_type, objects):
+        """Delegate to ``add_func`` unless ``objects`` holds a forbidden object."""
+        rejected = [obj for obj in objects if obj in self.forbidden]
+        if rejected:
+            raise ValueError(
+                f"{object_type} is forbidden", [obj.unique_key() for obj in rejected],
+            )
+        return add_func(objects)
+
+    def content_add(self, contents):
+        return self._maybe_add(super().content_add, "content", contents)
+
+    def skipped_content_add(self, skipped_contents):
+        return self._maybe_add(
+            super().skipped_content_add, "skipped_content", skipped_contents
+        )
+
+    def origin_add(self, origins):
+        return self._maybe_add(super().origin_add, "origin", origins)
+
+    def directory_add(self, directories):
+        return self._maybe_add(super().directory_add, "directory", directories)
+
+    def revision_add(self, revisions):
+        return self._maybe_add(super().revision_add, "revision", revisions)
+
+    def release_add(self, releases):
+        return self._maybe_add(super().release_add, "release", releases)
+
+    def snapshot_add(self, snapshots):
+        return self._maybe_add(super().snapshot_add, "snapshot", snapshots)
+
+
+@patch("swh.storage.in_memory.InMemoryStorage", LimitedInMemoryStorage)
+@pytest.mark.parametrize("object_type, objects, bad1, bad2", testdata)
+def test_tenacious_proxy_storage(object_type, objects, bad1, bad2):
+    """Check add/error counts for several placements of the rejected objects."""
+    storage = get_tenacious_storage()
+    tenacious = storage.storage
+    in_memory = tenacious.storage
+    assert isinstance(tenacious, TenaciousProxyStorage)
+    assert isinstance(in_memory, LimitedInMemoryStorage)
+
+    size = len(objects)
+    half = size // 2
+    third = size // 3
+
+    add_func = getattr(storage, f"{object_type}_add")
+
+    # (batch to insert, expected number of added objects, expected number of errors)
+    scenarios = [
+        # no rejected object at all
+        (objects, size, 0),
+        # bad1 is the last element
+        (objects + [bad1], size, 1),
+        # bad1 and bad2 are the last elements
+        (objects + [bad1, bad2], size, 2),
+        # bad1 is the first element
+        ([bad1] + objects, size, 1),
+        # bad1 and bad2 are the first elements
+        ([bad1, bad2] + objects, size, 2),
+        # bad1 is in the middle of the list of inserted elements
+        (objects[:half] + [bad1] + objects[half:], size, 1),
+        # bad1 and bad2 are together in the middle of the list of inserted elements
+        (objects[:half] + [bad1, bad2] + objects[half:], size, 2),
+        # bad1 and bad2 are spread in the middle of the list of inserted elements
+        (
+            objects[:third]
+            + [bad1]
+            + objects[third : 2 * third]
+            + [bad2]
+            + objects[2 * third :],
+            size,
+            2,
+        ),
+        # bad1 is the only element
+        ([bad1], 0, 1),
+        # bad1 and bad2 are the only elements
+        ([bad1, bad2], 0, 2),
+    ]
+
+    for batch, expected_added, expected_errors in scenarios:
+        s = add_func(batch)
+        assert s.get(f"{object_type}:add", 0) == expected_added
+        assert s.get(f"{object_type}:add:errors", 0) == expected_errors
+        in_memory.reset()
+        tenacious.reset()
+
+
+@patch("swh.storage.in_memory.InMemoryStorage", LimitedInMemoryStorage)
+@pytest.mark.parametrize("object_type, objects, bad1, bad2", testdata)
+def test_tenacious_proxy_storage_rate_limit(object_type, objects, bad1, bad2):
+ """Exercise the proxy with error_rate_limit set to 1 error per window of 3."""
+ storage = get_tenacious_storage(error_rate_limit={"errors": 1, "window_size": 3})
+ tenacious = storage.storage
+ in_memory = tenacious.storage
+ assert isinstance(tenacious, TenaciousProxyStorage)
+ assert isinstance(in_memory, LimitedInMemoryStorage)
+
+ size = len(objects)
+
+ add_func = getattr(storage, f"{object_type}_add")
+
+ # with no insertion failure, no impact
+ s = add_func(objects)
+ assert s.get(f"{object_type}:add", 0) == size
+ assert s.get(f"{object_type}:add:errors", 0) == 0
+ in_memory.reset()
+ tenacious.reset()
+
+ # with one insertion failure, no impact
+ s = add_func([bad1] + objects)
+ assert s.get(f"{object_type}:add", 0) == size
+ assert s.get(f"{object_type}:add:errors", 0) == 1
+ in_memory.reset()
+ tenacious.reset()
+
+ s = add_func(objects[: size // 2] + [bad1] + objects[size // 2 :])
+ assert s.get(f"{object_type}:add", 0) == size
+ assert s.get(f"{object_type}:add:errors", 0) == 1
+ in_memory.reset()
+ tenacious.reset()
+
+ # with two consecutive insertion failures, exception is raised
+ with pytest.raises(RuntimeError, match="Too many insertion errors"):
+ add_func([bad1, bad2] + objects)
+ in_memory.reset()
+ tenacious.reset()
+
+ # NOTE(review): indentation was lost in this dump, so which of the following
+ # statements belong to the ``if size > 2:`` body (and to the ``with`` bodies)
+ # cannot be told from here -- check against the original patch
+ if size > 2:
+ # with two consecutive insertion failures, exception is raised
+ # (errors not at the beginning)
+ with pytest.raises(RuntimeError, match="Too many insertion errors"):
+ add_func(objects[: size // 2] + [bad1, bad2] + objects[size // 2 :])
+ in_memory.reset()
+ tenacious.reset()
+
+ # with two non-consecutive insertion failures, no impact
+ # (errors are far enough to not reach the rate limit)
+ s = add_func(
+ objects[: size // 3]
+ + [bad1]
+ + objects[size // 3 : 2 * (size // 3)]
+ + [bad2]
+ + objects[2 * (size // 3) :]
+ )
+ assert s.get(f"{object_type}:add", 0) == size
+ assert s.get(f"{object_type}:add:errors", 0) == 2
+ in_memory.reset()
+ tenacious.reset()

File Metadata

Mime Type
text/plain
Expires
Tue, Dec 17, 12:49 AM (2 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3218020

Event Timeline