diff --git a/PKG-INFO b/PKG-INFO index 8e342b99..0df6abfb 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,223 +1,223 @@ Metadata-Version: 2.1 Name: swh.storage -Version: 0.29.1 +Version: 0.30.0 Summary: Software Heritage storage manager Home-page: https://forge.softwareheritage.org/diffusion/DSTO/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-storage Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-storage/ Description: swh-storage =========== Abstraction layer over the archive, allowing to access all stored source code artifacts as well as their metadata. See the [documentation](https://docs.softwareheritage.org/devel/swh-storage/index.html) for more details. ## Quick start ### Dependencies Python tests for this module include tests that cannot be run without a local Postgresql database, so you need the Postgresql server executable on your machine (no need to have a running Postgresql server). They also expect a cassandra server. #### Debian-like host ``` $ sudo apt install libpq-dev postgresql-11 cassandra ``` #### Non Debian-like host The tests expects the path to `cassandra` to either be unspecified, it is then looked up at `/usr/sbin/cassandra`, either specified through the environment variable `SWH_CASSANDRA_BIN`. Optionally, you can avoid running the cassandra tests. ``` (swh) :~/swh-storage$ tox -- -m 'not cassandra' ``` ### Installation It is strongly recommended to use a virtualenv. In the following, we consider you work in a virtualenv named `swh`. See the [developer setup guide](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup) for a more details on how to setup a working environment. You can install the package directly from [pypi](https://pypi.org/p/swh.storage): ``` (swh) :~$ pip install swh.storage [...] ``` Or from sources: ``` (swh) :~$ git clone https://forge.softwareheritage.org/source/swh-storage.git [...] (swh) :~$ cd swh-storage (swh) :~/swh-storage$ pip install . [...] ``` Then you can check it's properly installed: ``` (swh) :~$ swh storage --help Usage: swh storage [OPTIONS] COMMAND [ARGS]... Software Heritage Storage tools. Options: -h, --help Show this message and exit. Commands: rpc-serve Software Heritage Storage RPC server. ``` ## Tests The best way of running Python tests for this module is to use [tox](https://tox.readthedocs.io/). ``` (swh) :~$ pip install tox ``` ### tox From the sources directory, simply use tox: ``` (swh) :~/swh-storage$ tox [...] ========= 315 passed, 6 skipped, 15 warnings in 40.86 seconds ========== _______________________________ summary ________________________________ flake8: commands succeeded py3: commands succeeded congratulations :) ``` Note: it is possible to set the `JAVA_HOME` environment variable to specify the version of the JVM to be used by Cassandra. For example, at the time of writing this, Cassandra does not support java 14, so one may want to use for example java 11: ``` (swh) :~/swh-storage$ export JAVA_HOME=/usr/lib/jvm/java-14-openjdk-amd64/bin/java (swh) :~/swh-storage$ tox [...] ``` ## Development The storage server can be locally started. It requires a configuration file and a running Postgresql database. ### Sample configuration A typical configuration `storage.yml` file is: ``` storage: cls: local db: "dbname=softwareheritage-dev user= password=" objstorage: cls: pathslicing root: /tmp/swh-storage/ slicing: 0:2/2:4/4:6 ``` which means, this uses: - a local storage instance whose db connection is to `softwareheritage-dev` local instance, - the objstorage uses a local objstorage instance whose: - `root` path is /tmp/swh-storage, - slicing scheme is `0:2/2:4/4:6`. This means that the identifier of the content (sha1) which will be stored on disk at first level with the first 2 hex characters, the second level with the next 2 hex characters and the third level with the next 2 hex characters. And finally the complete hash file holding the raw content. For example: 00062f8bd330715c4f819373653d97b3cd34394c will be stored at 00/06/2f/00062f8bd330715c4f819373653d97b3cd34394c Note that the `root` path should exist on disk before starting the server. ### Starting the storage server If the python package has been properly installed (e.g. in a virtual env), you should be able to use the command: ``` (swh) :~/swh-storage$ swh storage rpc-serve storage.yml ``` This runs a local swh-storage api at 5002 port. ``` (swh) :~/swh-storage$ curl http://127.0.0.1:5002 Software Heritage storage server

You have reached the Software Heritage storage server.
See its documentation and API for more information

``` ### And then what? In your upper layer ([loader-git](https://forge.softwareheritage.org/source/swh-loader-git/), [loader-svn](https://forge.softwareheritage.org/source/swh-loader-svn/), etc...), you can define a remote storage with this snippet of yaml configuration. ``` storage: cls: remote url: http://localhost:5002/ ``` You could directly define a local storage with the following snippet: ``` storage: cls: local db: service=swh-dev objstorage: cls: pathslicing root: /home/storage/swh-storage/ slicing: 0:2/2:4/4:6 ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing Provides-Extra: journal diff --git a/swh.storage.egg-info/PKG-INFO b/swh.storage.egg-info/PKG-INFO index 8e342b99..0df6abfb 100644 --- a/swh.storage.egg-info/PKG-INFO +++ b/swh.storage.egg-info/PKG-INFO @@ -1,223 +1,223 @@ Metadata-Version: 2.1 Name: swh.storage -Version: 0.29.1 +Version: 0.30.0 Summary: Software Heritage storage manager Home-page: https://forge.softwareheritage.org/diffusion/DSTO/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-storage Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-storage/ Description: swh-storage =========== Abstraction layer over the archive, allowing to access all stored source code artifacts as well as their metadata. See the [documentation](https://docs.softwareheritage.org/devel/swh-storage/index.html) for more details. ## Quick start ### Dependencies Python tests for this module include tests that cannot be run without a local Postgresql database, so you need the Postgresql server executable on your machine (no need to have a running Postgresql server). They also expect a cassandra server. #### Debian-like host ``` $ sudo apt install libpq-dev postgresql-11 cassandra ``` #### Non Debian-like host The tests expects the path to `cassandra` to either be unspecified, it is then looked up at `/usr/sbin/cassandra`, either specified through the environment variable `SWH_CASSANDRA_BIN`. Optionally, you can avoid running the cassandra tests. ``` (swh) :~/swh-storage$ tox -- -m 'not cassandra' ``` ### Installation It is strongly recommended to use a virtualenv. In the following, we consider you work in a virtualenv named `swh`. See the [developer setup guide](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup) for a more details on how to setup a working environment. You can install the package directly from [pypi](https://pypi.org/p/swh.storage): ``` (swh) :~$ pip install swh.storage [...] ``` Or from sources: ``` (swh) :~$ git clone https://forge.softwareheritage.org/source/swh-storage.git [...] (swh) :~$ cd swh-storage (swh) :~/swh-storage$ pip install . [...] ``` Then you can check it's properly installed: ``` (swh) :~$ swh storage --help Usage: swh storage [OPTIONS] COMMAND [ARGS]... Software Heritage Storage tools. Options: -h, --help Show this message and exit. Commands: rpc-serve Software Heritage Storage RPC server. ``` ## Tests The best way of running Python tests for this module is to use [tox](https://tox.readthedocs.io/). ``` (swh) :~$ pip install tox ``` ### tox From the sources directory, simply use tox: ``` (swh) :~/swh-storage$ tox [...] ========= 315 passed, 6 skipped, 15 warnings in 40.86 seconds ========== _______________________________ summary ________________________________ flake8: commands succeeded py3: commands succeeded congratulations :) ``` Note: it is possible to set the `JAVA_HOME` environment variable to specify the version of the JVM to be used by Cassandra. For example, at the time of writing this, Cassandra does not support java 14, so one may want to use for example java 11: ``` (swh) :~/swh-storage$ export JAVA_HOME=/usr/lib/jvm/java-14-openjdk-amd64/bin/java (swh) :~/swh-storage$ tox [...] ``` ## Development The storage server can be locally started. It requires a configuration file and a running Postgresql database. ### Sample configuration A typical configuration `storage.yml` file is: ``` storage: cls: local db: "dbname=softwareheritage-dev user= password=" objstorage: cls: pathslicing root: /tmp/swh-storage/ slicing: 0:2/2:4/4:6 ``` which means, this uses: - a local storage instance whose db connection is to `softwareheritage-dev` local instance, - the objstorage uses a local objstorage instance whose: - `root` path is /tmp/swh-storage, - slicing scheme is `0:2/2:4/4:6`. This means that the identifier of the content (sha1) which will be stored on disk at first level with the first 2 hex characters, the second level with the next 2 hex characters and the third level with the next 2 hex characters. And finally the complete hash file holding the raw content. For example: 00062f8bd330715c4f819373653d97b3cd34394c will be stored at 00/06/2f/00062f8bd330715c4f819373653d97b3cd34394c Note that the `root` path should exist on disk before starting the server. ### Starting the storage server If the python package has been properly installed (e.g. in a virtual env), you should be able to use the command: ``` (swh) :~/swh-storage$ swh storage rpc-serve storage.yml ``` This runs a local swh-storage api at 5002 port. ``` (swh) :~/swh-storage$ curl http://127.0.0.1:5002 Software Heritage storage server

You have reached the Software Heritage storage server.
See its documentation and API for more information

``` ### And then what? In your upper layer ([loader-git](https://forge.softwareheritage.org/source/swh-loader-git/), [loader-svn](https://forge.softwareheritage.org/source/swh-loader-svn/), etc...), you can define a remote storage with this snippet of yaml configuration. ``` storage: cls: remote url: http://localhost:5002/ ``` You could directly define a local storage with the following snippet: ``` storage: cls: local db: service=swh-dev objstorage: cls: pathslicing root: /home/storage/swh-storage/ slicing: 0:2/2:4/4:6 ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing Provides-Extra: journal diff --git a/swh/storage/proxies/tenacious.py b/swh/storage/proxies/tenacious.py index 78bc9bcd..1c47c032 100644 --- a/swh/storage/proxies/tenacious.py +++ b/swh/storage/proxies/tenacious.py @@ -1,174 +1,175 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import Counter, deque from functools import partial import logging from typing import Counter as CounterT from typing import Deque, Dict, Iterable, List, Optional from swh.model.model import BaseModel from swh.storage import get_storage logger = logging.getLogger(__name__) class RateQueue: def __init__(self, size: int, max_errors: int): assert size > max_errors self._size = size self._max_errors = max_errors self._errors: Deque[bool] = deque(maxlen=size) def add_ok(self, n_ok: int = 1) -> None: self._errors.extend([False] * n_ok) def add_error(self, n_error: int = 1) -> None: self._errors.extend([True] * n_error) def limit_reached(self) -> bool: return sum(self._errors) > self._max_errors def reset(self): # mainly for testing purpose self._errors.clear() class TenaciousProxyStorage: """Storage proxy that have a tenacious insertion behavior. When an xxx_add method is called, it's first attempted as is against the backend storage. If a failure occurs, split the list of inserted objects in pieces until erroneous objects have been identified, so all the valid objects are guaranteed to be inserted. Also provides a error-rate limit feature: if more than n errors occurred during the insertion of the last p (window_size) objects, stop accepting any insertion. The number of insertion retries for a single object can be specified via the 'retries' parameter. This proxy is mainly intended to be used in a replayer configuration (aka a mirror stack), where insertion errors are mostly unexpected (which explains the low default ratio errors/window_size). Conversely, it should not be used in a loader configuration, as it may drop objects without stopping the loader, which leads to holes in the graph. Deployments using this proxy should carefully monitor their logs to check any failure is expected (because the failed object is corrupted), not because of transient errors or issues with the storage backend. Sample configuration use case for tenacious storage: .. code-block:: yaml storage: cls: tenacious storage: cls: remote args: http://storage.internal.staging.swh.network:5002/ error-rate-limit: errors: 10 window_size: 1000 """ - tenacious_methods = ( - "content_add", - "skipped_content_add", - "directory_add", - "revision_add", - "extid_add", - "release_add", - "snapshot_add", - "origin_add", - ) + tenacious_methods: Dict[str, str] = { + "content_add": "content", + "content_add_metadata": "content", + "skipped_content_add": "skipped_content", + "directory_add": "directory", + "revision_add": "revision", + "extid_add": "extid", + "release_add": "release", + "snapshot_add": "snapshot", + "origin_add": "origin", + } def __init__( self, storage, error_rate_limit: Optional[Dict[str, int]] = None, retries: int = 3, ): self.storage = get_storage(**storage) if error_rate_limit is None: error_rate_limit = {"errors": 10, "window_size": 1000} assert "errors" in error_rate_limit assert "window_size" in error_rate_limit self.rate_queue = RateQueue( size=error_rate_limit["window_size"], max_errors=error_rate_limit["errors"], ) self._single_object_retries: int = retries def __getattr__(self, key): if key in self.tenacious_methods: return partial(self._tenacious_add, key) return getattr(self.storage, key) def _tenacious_add(self, func_name, objects: Iterable[BaseModel]) -> Dict[str, int]: """Enqueue objects to write to the storage. This checks if the queue's threshold is hit. If it is actually write those to the storage. """ add_function = getattr(self.storage, func_name) - object_type = func_name[:-4] # remove the _add suffix + object_type = self.tenacious_methods[func_name] # list of lists of objects; note this to_add list is consumed from the tail to_add: List[List[BaseModel]] = [list(objects)] n_objs: int = len(to_add[0]) results: CounterT[str] = Counter() retries: int = self._single_object_retries while to_add: if self.rate_queue.limit_reached(): logging.error( "Too many insertion errors have been detected; " "disabling insertions" ) raise RuntimeError( "Too many insertion errors have been detected; " "disabling insertions" ) objs = to_add.pop() try: results.update(add_function(objs)) self.rate_queue.add_ok(len(objs)) except Exception as exc: if len(objs) > 1: logger.info( f"{func_name}: failed to insert a batch of " f"{len(objs)} {object_type} objects, splitting" ) # reinsert objs split in 2 parts at the end of to_add to_add.append(objs[(len(objs) // 2) :]) to_add.append(objs[: (len(objs) // 2)]) # each time we append a batch in the to_add bag, reset the # one-object-batch retries counter retries = self._single_object_retries else: retries -= 1 if retries: logger.info( f"{func_name}: failed to insert an {object_type}, retrying" ) # give it another chance to_add.append(objs) else: logger.error( f"{func_name}: failed to insert an object, " f"excluding {objs} (from a batch of {n_objs})" ) logger.exception(f"Exception was: {exc}") results.update({f"{object_type}:add:errors": 1}) self.rate_queue.add_error() # reset the retries counter (needed in case the next # batch is also 1 element only) retries = self._single_object_retries return dict(results) def reset(self): self.rate_queue.reset() diff --git a/swh/storage/tests/test_tenacious.py b/swh/storage/tests/test_tenacious.py index cddacf06..9fb2fcef 100644 --- a/swh/storage/tests/test_tenacious.py +++ b/swh/storage/tests/test_tenacious.py @@ -1,394 +1,423 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import Counter from contextlib import contextmanager from unittest.mock import patch import attr import pytest from swh.model import model from swh.model.tests.swh_model_data import TEST_OBJECTS from swh.storage import get_storage from swh.storage.in_memory import InMemoryStorage from swh.storage.proxies.tenacious import TenaciousProxyStorage from swh.storage.tests.storage_data import StorageData from swh.storage.tests.storage_tests import ( TestStorageGeneratedData as _TestStorageGeneratedData, ) from swh.storage.tests.storage_tests import TestStorage as _TestStorage # noqa +from swh.storage.utils import now data = StorageData() collections = { "origin": data.origins, "content": data.contents, "skipped_content": data.skipped_contents, "revision": data.revisions, "directory": data.directories, "release": data.releases, "snapshot": data.snapshots, } # generic storage tests (using imported TestStorage* classes) @pytest.fixture def swh_storage_backend_config2(): yield { "cls": "memory", "journal_writer": {"cls": "memory",}, } @pytest.fixture def swh_storage(): storage_config = { "cls": "pipeline", "steps": [ {"cls": "tenacious"}, {"cls": "memory", "journal_writer": {"cls": "memory",}}, ], } storage = get_storage(**storage_config) storage.journal_writer = storage.storage.journal_writer return storage class TestTenaciousStorage(_TestStorage): @pytest.mark.skip( 'The "person" table of the pgsql is a legacy thing, and not ' "supported by the cassandra/in-memory backend." ) def test_person_fullname_unicity(self): pass @pytest.mark.skip(reason="No collision with the tenacious storage") def test_content_add_collision(self, swh_storage, sample_data): pass + @pytest.mark.skip(reason="No collision with the tenacious storage") + def test_content_add_metadata_collision(self, swh_storage, sample_data): + pass + @pytest.mark.skip("content_update is not implemented") def test_content_update(self): pass @pytest.mark.skip("Not supported by Cassandra/InMemory storage") def test_origin_count(self): pass class TestTenaciousStorageGeneratedData(_TestStorageGeneratedData): @pytest.mark.skip("Not supported by Cassandra/InMemory") def test_origin_count(self): pass @pytest.mark.skip("Not supported by Cassandra/InMemory") def test_origin_count_with_visit_no_visits(self): pass @pytest.mark.skip("Not supported by Cassandra/InMemory") def test_origin_count_with_visit_with_visits_and_snapshot(self): pass @pytest.mark.skip("Not supported by Cassandra/InMemory") def test_origin_count_with_visit_with_visits_no_snapshot(self): pass # specific tests for the tenacious behavior def get_tenacious_storage(**config): storage_config = { "cls": "pipeline", "steps": [ {"cls": "validate"}, {"cls": "tenacious", **config}, {"cls": "memory"}, ], } return get_storage(**storage_config) @contextmanager def disabled_validators(): attr.set_run_validators(False) yield attr.set_run_validators(True) def popid(d): d.pop("id") return d testdata = [ pytest.param( "content", + "content_add", list(TEST_OBJECTS["content"]), attr.evolve(model.Content.from_data(data=b"too big"), length=1000), attr.evolve(model.Content.from_data(data=b"to fail"), length=1000), id="content", ), + pytest.param( + "content", + "content_add_metadata", + [attr.evolve(cnt, ctime=now()) for cnt in TEST_OBJECTS["content"]], + attr.evolve(model.Content.from_data(data=b"too big"), length=1000, ctime=now()), + attr.evolve(model.Content.from_data(data=b"to fail"), length=1000, ctime=now()), + id="content_metadata", + ), pytest.param( "skipped_content", + "skipped_content_add", list(TEST_OBJECTS["skipped_content"]), attr.evolve( model.SkippedContent.from_data(data=b"too big", reason="too big"), length=1000, ), attr.evolve( model.SkippedContent.from_data(data=b"to fail", reason="to fail"), length=1000, ), id="skipped_content", ), pytest.param( "directory", + "directory_add", list(TEST_OBJECTS["directory"]), data.directory, data.directory2, id="directory", ), pytest.param( "revision", + "revision_add", list(TEST_OBJECTS["revision"]), data.revision, data.revision2, id="revision", ), pytest.param( "release", + "release_add", list(TEST_OBJECTS["release"]), data.release, data.release2, id="release", ), pytest.param( "snapshot", + "snapshot_add", list(TEST_OBJECTS["snapshot"]), data.snapshot, data.complete_snapshot, id="snapshot", ), pytest.param( - "origin", list(TEST_OBJECTS["origin"]), data.origin, data.origin2, id="origin", + "origin", + "origin_add", + list(TEST_OBJECTS["origin"]), + data.origin, + data.origin2, + id="origin", ), ] class LimitedInMemoryStorage(InMemoryStorage): # forbidden are 'bad1' and 'bad2' arguments of `testdata` - forbidden = [x[0][2] for x in testdata] + [x[0][3] for x in testdata] + forbidden = [x[0][3] for x in testdata] + [x[0][4] for x in testdata] def __init__(self, *args, **kw): self.add_calls = Counter() super().__init__(*args, **kw) def reset(self): super().reset() self.add_calls.clear() def content_add(self, contents): return self._maybe_add(super().content_add, "content", contents) + def content_add_metadata(self, contents): + return self._maybe_add(super().content_add_metadata, "content", contents) + def skipped_content_add(self, skipped_contents): return self._maybe_add( super().skipped_content_add, "skipped_content", skipped_contents ) def origin_add(self, origins): return self._maybe_add(super().origin_add, "origin", origins) def directory_add(self, directories): return self._maybe_add(super().directory_add, "directory", directories) def revision_add(self, revisions): return self._maybe_add(super().revision_add, "revision", revisions) def release_add(self, releases): return self._maybe_add(super().release_add, "release", releases) def snapshot_add(self, snapshots): return self._maybe_add(super().snapshot_add, "snapshot", snapshots) def _maybe_add(self, add_func, object_type, objects): self.add_calls[object_type] += 1 if any(c in self.forbidden for c in objects): raise ValueError( f"{object_type} is forbidden", [c.unique_key() for c in objects if c in self.forbidden], ) return add_func(objects) @patch("swh.storage.in_memory.InMemoryStorage", LimitedInMemoryStorage) -@pytest.mark.parametrize("object_type, objects, bad1, bad2", testdata) -def test_tenacious_proxy_storage(object_type, objects, bad1, bad2): +@pytest.mark.parametrize("object_type, add_func_name, objects, bad1, bad2", testdata) +def test_tenacious_proxy_storage(object_type, add_func_name, objects, bad1, bad2): storage = get_tenacious_storage() tenacious = storage.storage in_memory = tenacious.storage assert isinstance(tenacious, TenaciousProxyStorage) assert isinstance(in_memory, LimitedInMemoryStorage) size = len(objects) - add_func = getattr(storage, f"{object_type}_add") + add_func = getattr(storage, add_func_name) # Note: when checking the LimitedInMemoryStorage.add_calls counter, it's # hard to guess the exact number of calls in the end (depends on the size # of batch and the position of bad objects in this batch). So we will only # check a lower limit of the form (n + m), where n is the minimum expected # number of additions (due to the batch begin split), and m is the fact # that bad objects are tried (individually) several (3) times before giving # up. So for one bad object, m is 3; for 2 bad objects, m is 6, etc. s = add_func(objects) assert s.get(f"{object_type}:add", 0) == size assert s.get(f"{object_type}:add:errors", 0) == 0 assert storage.add_calls[object_type] == (1 + 0) in_memory.reset() tenacious.reset() # bad1 is the last element s = add_func(objects + [bad1]) assert s.get(f"{object_type}:add", 0) == size assert s.get(f"{object_type}:add:errors", 0) == 1 assert storage.add_calls[object_type] >= (2 + 3) in_memory.reset() tenacious.reset() # bad1 and bad2 are the last elements s = add_func(objects + [bad1, bad2]) assert s.get(f"{object_type}:add", 0) == size assert s.get(f"{object_type}:add:errors", 0) == 2 assert storage.add_calls[object_type] >= (3 + 6) in_memory.reset() tenacious.reset() # bad1 is the first element s = add_func([bad1] + objects) assert s.get(f"{object_type}:add", 0) == size assert s.get(f"{object_type}:add:errors", 0) == 1 assert storage.add_calls[object_type] >= (2 + 3) in_memory.reset() tenacious.reset() # bad1 and bad2 are the first elements s = add_func([bad1, bad2] + objects) assert s.get(f"{object_type}:add", 0) == size assert s.get(f"{object_type}:add:errors", 0) == 2 assert storage.add_calls[object_type] >= (3 + 6) in_memory.reset() tenacious.reset() # bad1 is in the middle of the list of inserted elements s = add_func(objects[: size // 2] + [bad1] + objects[size // 2 :]) assert s.get(f"{object_type}:add", 0) == size assert s.get(f"{object_type}:add:errors", 0) == 1 assert storage.add_calls[object_type] >= (3 + 3) in_memory.reset() tenacious.reset() # bad1 and bad2 are together in the middle of the list of inserted elements s = add_func(objects[: size // 2] + [bad1, bad2] + objects[size // 2 :]) assert s.get(f"{object_type}:add", 0) == size assert s.get(f"{object_type}:add:errors", 0) == 2 assert storage.add_calls[object_type] >= (3 + 6) in_memory.reset() tenacious.reset() # bad1 and bad2 are spread in the middle of the list of inserted elements s = add_func( objects[: size // 3] + [bad1] + objects[size // 3 : 2 * (size // 3)] + [bad2] + objects[2 * (size // 3) :] ) assert s.get(f"{object_type}:add", 0) == size assert s.get(f"{object_type}:add:errors", 0) == 2 assert storage.add_calls[object_type] >= (3 + 6) in_memory.reset() tenacious.reset() # bad1 is the only element s = add_func([bad1]) assert s.get(f"{object_type}:add", 0) == 0 assert s.get(f"{object_type}:add:errors", 0) == 1 assert storage.add_calls[object_type] == (0 + 3) in_memory.reset() tenacious.reset() # bad1 and bad2 are the only elements s = add_func([bad1, bad2]) assert s.get(f"{object_type}:add", 0) == 0 assert s.get(f"{object_type}:add:errors", 0) == 2 assert storage.add_calls[object_type] == (1 + 6) in_memory.reset() tenacious.reset() @patch("swh.storage.in_memory.InMemoryStorage", LimitedInMemoryStorage) -@pytest.mark.parametrize("object_type, objects, bad1, bad2", testdata) -def test_tenacious_proxy_storage_rate_limit(object_type, objects, bad1, bad2): +@pytest.mark.parametrize("object_type, add_func_name, objects, bad1, bad2", testdata) +def test_tenacious_proxy_storage_rate_limit( + object_type, add_func_name, objects, bad1, bad2 +): storage = get_tenacious_storage(error_rate_limit={"errors": 1, "window_size": 3}) tenacious = storage.storage in_memory = tenacious.storage assert isinstance(tenacious, TenaciousProxyStorage) assert isinstance(in_memory, LimitedInMemoryStorage) size = len(objects) - add_func = getattr(storage, f"{object_type}_add") + add_func = getattr(storage, add_func_name) # with no insertion failure, no impact s = add_func(objects) assert s.get(f"{object_type}:add", 0) == size assert s.get(f"{object_type}:add:errors", 0) == 0 in_memory.reset() tenacious.reset() # with one insertion failure, no impact s = add_func([bad1] + objects) assert s.get(f"{object_type}:add", 0) == size assert s.get(f"{object_type}:add:errors", 0) == 1 in_memory.reset() tenacious.reset() s = add_func(objects[: size // 2] + [bad1] + objects[size // 2 :]) assert s.get(f"{object_type}:add", 0) == size assert s.get(f"{object_type}:add:errors", 0) == 1 in_memory.reset() tenacious.reset() # with two consecutive insertion failures, exception is raised with pytest.raises(RuntimeError, match="Too many insertion errors"): add_func([bad1, bad2] + objects) in_memory.reset() tenacious.reset() if size > 2: # with two consecutive insertion failures, exception is raised # (errors not at the beginning) with pytest.raises(RuntimeError, match="Too many insertion errors"): add_func(objects[: size // 2] + [bad1, bad2] + objects[size // 2 :]) in_memory.reset() tenacious.reset() # with two non-consecutive insertion failures, no impact # (errors are far enough to not reach the rate limit) s = add_func( objects[: size // 3] + [bad1] + objects[size // 3 : 2 * (size // 3)] + [bad2] + objects[2 * (size // 3) :] ) assert s.get(f"{object_type}:add", 0) == size assert s.get(f"{object_type}:add:errors", 0) == 2 in_memory.reset() tenacious.reset()