Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7122793
D3334.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
17 KB
Subscribers
None
D3334.diff
View Options
diff --git a/swh/storage/__init__.py b/swh/storage/__init__.py
--- a/swh/storage/__init__.py
+++ b/swh/storage/__init__.py
@@ -20,6 +20,7 @@
"retry": ".retry.RetryingProxyStorage",
"cassandra": ".cassandra.CassandraStorage",
"validate": ".validate.ValidatingProxyStorage",
+ "tenacious": ".tenacious.TenaciousProxyStorage",
}
diff --git a/swh/storage/tenacious.py b/swh/storage/tenacious.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/tenacious.py
@@ -0,0 +1,141 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from collections import Counter, deque
+from functools import partial
+import logging
+from typing import Counter as CounterT
+from typing import Deque, Dict, Iterable, List
+
+from swh.model.model import BaseModel
+from swh.storage import get_storage
+
+logger = logging.getLogger(__name__)
+
+
class RateQueue:
    """Sliding window over the outcome of the last ``size`` insertions.

    Each inserted object records a boolean outcome (error or not); the queue
    reports when the number of errors among the most recent ``size`` outcomes
    exceeds ``max_errors``.
    """

    def __init__(self, size: int, max_errors: int):
        assert size > max_errors
        self._size = size
        self._max_errors = max_errors
        # bounded deque: old outcomes fall off automatically once full
        self._errors: Deque[bool] = deque(maxlen=size)

    def add_ok(self, n_ok: int = 1) -> None:
        """Record ``n_ok`` successful insertions."""
        self._errors.extend(False for _ in range(n_ok))

    def add_error(self, n_error: int = 1) -> None:
        """Record ``n_error`` failed insertions."""
        self._errors.extend(True for _ in range(n_error))

    def limit_reached(self) -> bool:
        """True when the error count in the window exceeds the threshold."""
        error_count = sum(1 for failed in self._errors if failed)
        return error_count > self._max_errors

    def reset(self):
        # mainly for testing purpose
        self._errors = deque(maxlen=self._size)
+
+
class TenaciousProxyStorage:
    """Storage proxy that has a tenacious insertion behavior.

    When an xxx_add method is called, it's first attempted as is against the backend
    storage. If a failure occurs, split the list of inserted objects in pieces until
    erroneous objects have been identified, so all the valid objects are guaranteed to
    be inserted.

    Also provides an error-rate limit feature: if more than n errors occurred during the
    insertion of the last p (window_size) objects, stop accepting any insertion.

    This proxy is mainly intended to be used in a replayer configuration (aka a
    mirror stack), where insertion errors are mostly unexpected (which explains
    the low default ratio errors/window_size).

    Sample configuration use case for tenacious storage:

    .. code-block:: yaml

        storage:
          cls: tenacious
          storage:
            cls: remote
            args: http://storage.internal.staging.swh.network:5002/
          error-rate-limit:
            errors: 10
            window_size: 1000

    """

    # xxx_add methods that get the split-and-retry treatment; any other
    # attribute access is delegated untouched to the backend storage.
    tenacious_methods = (
        "content_add",
        "skipped_content_add",
        "directory_add",
        "revision_add",
        "extid_add",
        "release_add",
        "snapshot_add",
        "origin_add",
    )

    def __init__(self, storage, error_rate_limit=None):
        """
        Args:
            storage: configuration dict of the backend storage, passed as-is
                to :func:`swh.storage.get_storage`.
            error_rate_limit: dict with ``errors`` and ``window_size`` keys;
                defaults to 10 errors per 1000 inserted objects.
        """
        self.storage = get_storage(**storage)
        if error_rate_limit is None:
            error_rate_limit = {"errors": 10, "window_size": 1000}
        assert "errors" in error_rate_limit
        assert "window_size" in error_rate_limit
        self.rate_queue = RateQueue(
            size=error_rate_limit["window_size"],
            max_errors=error_rate_limit["errors"],
        )

    def __getattr__(self, key):
        # Intercept only the xxx_add methods; everything else goes straight
        # to the backend storage.
        if key in self.tenacious_methods:
            return partial(self._tenacious_add, key)
        return getattr(self.storage, key)

    def _tenacious_add(self, func_name, objects: Iterable[BaseModel]) -> Dict[str, int]:
        """Insert ``objects`` in the backend storage, bisecting on failure.

        A batch that fails to insert is split in two halves and retried, until
        the erroneous objects are isolated; those are counted under the
        ``{object_type}:add:errors`` key of the returned counters.

        Raises:
            RuntimeError: when the error-rate limit has been reached.
        """
        add_function = getattr(self.storage, func_name)
        object_type = func_name[:-4]  # remove the _add suffix

        # list of lists of objects; note this to_add list is consumed from the tail
        to_add: List[List[BaseModel]] = [list(objects)]
        results: CounterT[str] = Counter()

        while to_add:
            if self.rate_queue.limit_reached():
                msg = (
                    "Too many insertion errors have been detected; "
                    "disabling insertions"
                )
                # module logger (not the root logger) so the record is
                # attributed to swh.storage.tenacious
                logger.error(msg)
                raise RuntimeError(msg)
            objs = to_add.pop()
            try:
                results.update(add_function(objs))
                self.rate_queue.add_ok(len(objs))
            except Exception as exc:
                if len(objs) > 1:
                    logger.info(
                        "%s: failed to insert a batch of %d %s objects, splitting",
                        func_name,
                        len(objs),
                        object_type,
                    )
                    # reinsert objs split in 2 parts at the end of to_add
                    half = len(objs) // 2
                    to_add.append(objs[half:])
                    to_add.append(objs[:half])
                else:
                    logger.error(
                        "%s: failed to insert an object, excluding %s",
                        func_name,
                        objs,
                    )
                    # logger.exception attaches the traceback of exc
                    logger.exception("Exception was: %s", exc)
                    results.update({f"{object_type}:add:errors": 1})
                    self.rate_queue.add_error()
        return dict(results)

    def reset(self):
        # reset the error-rate sliding window (mainly for testing purposes)
        self.rate_queue.reset()
diff --git a/swh/storage/tests/test_tenacious.py b/swh/storage/tests/test_tenacious.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/tests/test_tenacious.py
@@ -0,0 +1,366 @@
+# Copyright (C) 2020-2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from contextlib import contextmanager
+from unittest.mock import patch
+
+import attr
+import pytest
+
+from swh.model import model
+from swh.model.tests.swh_model_data import TEST_OBJECTS
+from swh.storage import get_storage
+from swh.storage.in_memory import InMemoryStorage
+from swh.storage.tenacious import TenaciousProxyStorage
+from swh.storage.tests.storage_data import StorageData
+from swh.storage.tests.storage_tests import (
+ TestStorageGeneratedData as _TestStorageGeneratedData,
+)
+from swh.storage.tests.storage_tests import TestStorage as _TestStorage # noqa
+
+data = StorageData()
+collections = {
+ "origin": data.origins,
+ "content": data.contents,
+ "skipped_content": data.skipped_contents,
+ "revision": data.revisions,
+ "directory": data.directories,
+ "release": data.releases,
+ "snapshot": data.snapshots,
+}
+# generic storage tests (using imported TestStorage* classes)
+
+
@pytest.fixture
def swh_storage_backend_config2():
    """In-memory storage backend configuration with an in-memory journal."""
    config = {
        "cls": "memory",
        "journal_writer": {"cls": "memory"},
    }
    yield config
+
+
@pytest.fixture
def swh_storage():
    """Tenacious proxy wrapping an in-memory storage, built as a pipeline."""
    config = {
        "cls": "pipeline",
        "steps": [
            {"cls": "tenacious"},
            {"cls": "memory", "journal_writer": {"cls": "memory"}},
        ],
    }
    storage = get_storage(**config)
    # expose the inner journal writer so the generic storage tests can
    # inspect what has been written
    storage.journal_writer = storage.storage.journal_writer
    return storage
+
+
class TestTenaciousStorage(_TestStorage):
    """Run the generic storage test suite against the tenacious proxy.

    Tests that do not apply to the in-memory backend used here are skipped.
    """

    @pytest.mark.skip(
        'The "person" table of the pgsql is a legacy thing, and not '
        "supported by the cassandra/in-memory backend."
    )
    def test_person_fullname_unicity(self):
        pass

    @pytest.mark.skip(reason="No collision with the tenacious storage")
    def test_content_add_collision(self, swh_storage, sample_data):
        pass

    @pytest.mark.skip("content_update is not implemented")
    def test_content_update(self):
        pass

    @pytest.mark.skip("Not supported by Cassandra/InMemory storage")
    def test_origin_count(self):
        pass
+
+
class TestTenaciousStorageGeneratedData(_TestStorageGeneratedData):
    """Run the generated-data storage test suite against the tenacious proxy.

    The origin_count tests only work on the pgsql backend, so they are
    skipped here.
    """

    @pytest.mark.skip("Not supported by Cassandra/InMemory")
    def test_origin_count(self):
        pass

    @pytest.mark.skip("Not supported by Cassandra/InMemory")
    def test_origin_count_with_visit_no_visits(self):
        pass

    @pytest.mark.skip("Not supported by Cassandra/InMemory")
    def test_origin_count_with_visit_with_visits_and_snapshot(self):
        pass

    @pytest.mark.skip("Not supported by Cassandra/InMemory")
    def test_origin_count_with_visit_with_visits_no_snapshot(self):
        pass
+
+
+# specific tests for the tenacious behavior
+
+
def get_tenacious_storage(**config):
    """Build a validate/tenacious/memory storage pipeline for the tests.

    Keyword arguments are forwarded to the tenacious step's configuration.
    """
    steps = [
        {"cls": "validate"},
        {"cls": "tenacious", **config},
        {"cls": "memory"},
    ]
    return get_storage(cls="pipeline", steps=steps)
+
+
@contextmanager
def disabled_validators():
    """Context manager temporarily disabling attrs validators.

    The re-enabling is done in a ``finally`` clause so validators are
    restored even when the managed block raises (the previous version left
    them disabled in that case).
    """
    attr.set_run_validators(False)
    try:
        yield
    finally:
        attr.set_run_validators(True)
+
+
def popid(d):
    """Remove the "id" key from *d* in place and return *d*."""
    del d["id"]
    return d
+
+
# Parametrization entries of the form (object_type, objects, bad1, bad2):
# `objects` is a list of valid model objects for that type, while bad1/bad2
# are two extra objects of the same type whose insertion will be refused by
# LimitedInMemoryStorage below (they are listed in its `forbidden` set).
testdata = [
    pytest.param(
        "content",
        list(TEST_OBJECTS["content"]),
        attr.evolve(model.Content.from_data(data=b"too big"), length=1000),
        attr.evolve(model.Content.from_data(data=b"to fail"), length=1000),
        id="content",
    ),
    pytest.param(
        "skipped_content",
        list(TEST_OBJECTS["skipped_content"]),
        attr.evolve(
            model.SkippedContent.from_data(data=b"too big", reason="too big"),
            length=1000,
        ),
        attr.evolve(
            model.SkippedContent.from_data(data=b"to fail", reason="to fail"),
            length=1000,
        ),
        id="skipped_content",
    ),
    pytest.param(
        "directory",
        list(TEST_OBJECTS["directory"]),
        data.directory,
        data.directory2,
        id="directory",
    ),
    pytest.param(
        "revision",
        list(TEST_OBJECTS["revision"]),
        data.revision,
        data.revision2,
        id="revision",
    ),
    pytest.param(
        "release",
        list(TEST_OBJECTS["release"]),
        data.release,
        data.release2,
        id="release",
    ),
    pytest.param(
        "snapshot",
        list(TEST_OBJECTS["snapshot"]),
        data.snapshot,
        data.complete_snapshot,
        id="snapshot",
    ),
    pytest.param(
        "origin", list(TEST_OBJECTS["origin"]), data.origin, data.origin2, id="origin",
    ),
]
+
+
class LimitedInMemoryStorage(InMemoryStorage):
    """In-memory storage that refuses to insert a known set of objects.

    Used to simulate backend insertion failures: any xxx_add call whose batch
    contains one of the `forbidden` objects raises ValueError.
    """

    # forbidden are 'bad1' and 'bad2' arguments of `testdata`
    forbidden = [x[0][2] for x in testdata] + [x[0][3] for x in testdata]

    def content_add(self, contents):
        return self._maybe_add(super().content_add, "content", contents)

    def skipped_content_add(self, skipped_contents):
        return self._maybe_add(
            super().skipped_content_add, "skipped_content", skipped_contents
        )

    def origin_add(self, origins):
        return self._maybe_add(super().origin_add, "origin", origins)

    def directory_add(self, directories):
        return self._maybe_add(super().directory_add, "directory", directories)

    def revision_add(self, revisions):
        return self._maybe_add(super().revision_add, "revision", revisions)

    def release_add(self, releases):
        return self._maybe_add(super().release_add, "release", releases)

    def snapshot_add(self, snapshots):
        return self._maybe_add(super().snapshot_add, "snapshot", snapshots)

    def _maybe_add(self, add_func, object_type, objects):
        """Forward to add_func, unless the batch contains a forbidden object."""
        if any(c in self.forbidden for c in objects):
            raise ValueError(
                f"{object_type} is forbidden",
                [c.unique_key() for c in objects if c in self.forbidden],
            )
        return add_func(objects)
+
+
@patch("swh.storage.in_memory.InMemoryStorage", LimitedInMemoryStorage)
@pytest.mark.parametrize("object_type, objects, bad1, bad2", testdata)
def test_tenacious_proxy_storage(object_type, objects, bad1, bad2):
    """Check that all valid objects are inserted and all bad objects are
    counted as errors, wherever they sit in the batch (front, middle, end,
    alone, adjacent or spread out).
    """
    storage = get_tenacious_storage()
    # peel the pipeline: validate -> tenacious -> (patched) in-memory
    tenacious = storage.storage
    in_memory = tenacious.storage
    assert isinstance(tenacious, TenaciousProxyStorage)
    assert isinstance(in_memory, LimitedInMemoryStorage)

    size = len(objects)

    add_func = getattr(storage, f"{object_type}_add")

    # no bad object: everything is inserted, no error reported
    s = add_func(objects)
    assert s.get(f"{object_type}:add", 0) == size
    assert s.get(f"{object_type}:add:errors", 0) == 0
    in_memory.reset()
    tenacious.reset()

    # bad1 is the last element
    s = add_func(objects + [bad1])
    assert s.get(f"{object_type}:add", 0) == size
    assert s.get(f"{object_type}:add:errors", 0) == 1
    in_memory.reset()
    tenacious.reset()

    # bad1 and bad2 are the last elements
    s = add_func(objects + [bad1, bad2])
    assert s.get(f"{object_type}:add", 0) == size
    assert s.get(f"{object_type}:add:errors", 0) == 2
    in_memory.reset()
    tenacious.reset()

    # bad1 is the first element
    s = add_func([bad1] + objects)
    assert s.get(f"{object_type}:add", 0) == size
    assert s.get(f"{object_type}:add:errors", 0) == 1
    in_memory.reset()
    tenacious.reset()

    # bad1 and bad2 are the first elements
    s = add_func([bad1, bad2] + objects)
    assert s.get(f"{object_type}:add", 0) == size
    assert s.get(f"{object_type}:add:errors", 0) == 2
    in_memory.reset()
    tenacious.reset()

    # bad1 is in the middle of the list of inserted elements
    s = add_func(objects[: size // 2] + [bad1] + objects[size // 2 :])
    assert s.get(f"{object_type}:add", 0) == size
    assert s.get(f"{object_type}:add:errors", 0) == 1
    in_memory.reset()
    tenacious.reset()

    # bad1 and bad2 are together in the middle of the list of inserted elements
    s = add_func(objects[: size // 2] + [bad1, bad2] + objects[size // 2 :])
    assert s.get(f"{object_type}:add", 0) == size
    assert s.get(f"{object_type}:add:errors", 0) == 2
    in_memory.reset()
    tenacious.reset()

    # bad1 and bad2 are spread in the middle of the list of inserted elements
    s = add_func(
        objects[: size // 3]
        + [bad1]
        + objects[size // 3 : 2 * (size // 3)]
        + [bad2]
        + objects[2 * (size // 3) :]
    )
    assert s.get(f"{object_type}:add", 0) == size
    assert s.get(f"{object_type}:add:errors", 0) == 2
    in_memory.reset()
    tenacious.reset()

    # bad1 is the only element
    s = add_func([bad1])
    assert s.get(f"{object_type}:add", 0) == 0
    assert s.get(f"{object_type}:add:errors", 0) == 1
    in_memory.reset()
    tenacious.reset()

    # bad1 and bad2 are the only elements
    s = add_func([bad1, bad2])
    assert s.get(f"{object_type}:add", 0) == 0
    assert s.get(f"{object_type}:add:errors", 0) == 2
    in_memory.reset()
    tenacious.reset()
+
+
@patch("swh.storage.in_memory.InMemoryStorage", LimitedInMemoryStorage)
@pytest.mark.parametrize("object_type, objects, bad1, bad2", testdata)
def test_tenacious_proxy_storage_rate_limit(object_type, objects, bad1, bad2):
    """Check the error-rate limit: with a window of 3 and a threshold of 1
    error, two errors close together disable insertions (RuntimeError), while
    isolated or sufficiently-spread errors do not.
    """
    storage = get_tenacious_storage(error_rate_limit={"errors": 1, "window_size": 3})
    # peel the pipeline: validate -> tenacious -> (patched) in-memory
    tenacious = storage.storage
    in_memory = tenacious.storage
    assert isinstance(tenacious, TenaciousProxyStorage)
    assert isinstance(in_memory, LimitedInMemoryStorage)

    size = len(objects)

    add_func = getattr(storage, f"{object_type}_add")

    # with no insertion failure, no impact
    s = add_func(objects)
    assert s.get(f"{object_type}:add", 0) == size
    assert s.get(f"{object_type}:add:errors", 0) == 0
    in_memory.reset()
    tenacious.reset()

    # with one insertion failure, no impact
    s = add_func([bad1] + objects)
    assert s.get(f"{object_type}:add", 0) == size
    assert s.get(f"{object_type}:add:errors", 0) == 1
    in_memory.reset()
    tenacious.reset()

    # single failure in the middle: still under the limit
    s = add_func(objects[: size // 2] + [bad1] + objects[size // 2 :])
    assert s.get(f"{object_type}:add", 0) == size
    assert s.get(f"{object_type}:add:errors", 0) == 1
    in_memory.reset()
    tenacious.reset()

    # with two consecutive insertion failures, exception is raised
    with pytest.raises(RuntimeError, match="Too many insertion errors"):
        add_func([bad1, bad2] + objects)
    in_memory.reset()
    tenacious.reset()

    if size > 2:
        # with two consecutive insertion failures, exception is raised
        # (errors not at the beginning)
        with pytest.raises(RuntimeError, match="Too many insertion errors"):
            add_func(objects[: size // 2] + [bad1, bad2] + objects[size // 2 :])
        in_memory.reset()
        tenacious.reset()

        # with two non-consecutive insertion failures, no impact
        # (errors are far enough to not reach the rate limit)
        s = add_func(
            objects[: size // 3]
            + [bad1]
            + objects[size // 3 : 2 * (size // 3)]
            + [bad2]
            + objects[2 * (size // 3) :]
        )
        assert s.get(f"{object_type}:add", 0) == size
        assert s.get(f"{object_type}:add:errors", 0) == 2
        in_memory.reset()
        tenacious.reset()
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Tue, Dec 17, 12:49 AM (2 w, 1 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3218020
Attached To
D3334: Add a new TenaciousProxyStorage
Event Timeline
Log In to Comment