diff --git a/swh/storage/__init__.py b/swh/storage/__init__.py
index b96fea7d..14320eb3 100644
--- a/swh/storage/__init__.py
+++ b/swh/storage/__init__.py
@@ -1,112 +1,113 @@
# Copyright (C) 2015-2020  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import importlib
from typing import TYPE_CHECKING, Any, Dict, List
import warnings

if TYPE_CHECKING:
    from .interface import StorageInterface

STORAGE_IMPLEMENTATIONS = {
    "local": ".postgresql.storage.Storage",
    "remote": ".api.client.RemoteStorage",
    "memory": ".in_memory.InMemoryStorage",
-    "filter": ".filter.FilteringProxyStorage",
-    "buffer": ".buffer.BufferingProxyStorage",
-    "retry": ".retry.RetryingProxyStorage",
    "cassandra": ".cassandra.CassandraStorage",
-    "validate": ".validate.ValidatingProxyStorage",
-    "tenacious": ".tenacious.TenaciousProxyStorage",
+    # proxy storages
+    "filter": ".proxies.filter.FilteringProxyStorage",
+    "buffer": ".proxies.buffer.BufferingProxyStorage",
+    "retry": ".proxies.retry.RetryingProxyStorage",
+    "validate": ".proxies.validate.ValidatingProxyStorage",
+    "tenacious": ".proxies.tenacious.TenaciousProxyStorage",
}


def get_storage(cls: str, **kwargs) -> "StorageInterface":
    """Get a storage object of class `cls` instantiated with arguments `kwargs`.

    Args:
        cls (str): storage's class, can be:
            - ``local`` to use a postgresql database
            - ``cassandra`` to use a cassandra database
            - ``remote`` to connect to a swh-storage server
            - ``memory`` for an in-memory storage, useful for fast tests
            - ``filter``, ``buffer``, ... to use specific storage "proxies",
              see their respective documentation
        kwargs (dict): arguments passed to the storage class constructor

    Returns:
        an instance of swh.storage.Storage or compatible class

    Raises:
        ValueError if passed an unknown storage class.

    """
    if "args" in kwargs:
        warnings.warn(
            'Explicit "args" key is deprecated, use keys directly instead.',
            DeprecationWarning,
        )
        kwargs = kwargs["args"]

    if cls == "pipeline":
        return get_storage_pipeline(**kwargs)

    class_path = STORAGE_IMPLEMENTATIONS.get(cls)
    if class_path is None:
        raise ValueError(
            "Unknown storage class `%s`. Supported: %s"
            % (cls, ", ".join(STORAGE_IMPLEMENTATIONS))
        )

    (module_path, class_name) = class_path.rsplit(".", 1)
    module = importlib.import_module(module_path, package=__package__)
    Storage = getattr(module, class_name)
    check_config = kwargs.pop("check_config", {})
    storage = Storage(**kwargs)
    if check_config:
        if not storage.check_config(**check_config):
            raise EnvironmentError("storage check config failed")
    return storage


def get_storage_pipeline(
    steps: List[Dict[str, Any]], check_config=None
) -> "StorageInterface":
    """Recursively get a storage object that may use other storage objects
    as backends.

    Args:
        steps (List[dict]): List of dicts that may be used as kwargs for
            `get_storage`.

    Returns:
        an instance of swh.storage.Storage or compatible class

    Raises:
        ValueError if passed an unknown storage class.
""" storage_config = None for step in reversed(steps): if "args" in step: warnings.warn( 'Explicit "args" key is deprecated, use keys directly ' "instead.", DeprecationWarning, ) step = { "cls": step["cls"], **step["args"], } if storage_config: step["storage"] = storage_config step["check_config"] = check_config storage_config = step if storage_config is None: raise ValueError("'pipeline' has no steps.") return get_storage(**storage_config) diff --git a/swh/storage/buffer.py b/swh/storage/proxies/buffer.py similarity index 100% rename from swh/storage/buffer.py rename to swh/storage/proxies/buffer.py diff --git a/swh/storage/filter.py b/swh/storage/proxies/filter.py similarity index 100% rename from swh/storage/filter.py rename to swh/storage/proxies/filter.py diff --git a/swh/storage/retry.py b/swh/storage/proxies/retry.py similarity index 100% rename from swh/storage/retry.py rename to swh/storage/proxies/retry.py diff --git a/swh/storage/tenacious.py b/swh/storage/proxies/tenacious.py similarity index 100% rename from swh/storage/tenacious.py rename to swh/storage/proxies/tenacious.py diff --git a/swh/storage/validate.py b/swh/storage/proxies/validate.py similarity index 100% rename from swh/storage/validate.py rename to swh/storage/proxies/validate.py diff --git a/swh/storage/tests/test_buffer.py b/swh/storage/tests/test_buffer.py index 090237f8..0335c060 100644 --- a/swh/storage/tests/test_buffer.py +++ b/swh/storage/tests/test_buffer.py @@ -1,622 +1,622 @@ # Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Optional from unittest.mock import Mock from swh.storage import get_storage -from swh.storage.buffer import BufferingProxyStorage +from swh.storage.proxies.buffer import BufferingProxyStorage def get_storage_with_buffer_config(**buffer_config) -> BufferingProxyStorage: steps = [ {"cls": "buffer", **buffer_config}, {"cls": "memory"}, ] ret = get_storage("pipeline", steps=steps) assert isinstance(ret, BufferingProxyStorage) return ret def test_buffering_proxy_storage_content_threshold_not_hit(sample_data) -> None: contents = sample_data.contents[:2] contents_dict = [c.to_dict() for c in contents] storage = get_storage_with_buffer_config(min_batch_size={"content": 10,}) s = storage.content_add(contents) assert s == {} # contents have not been written to storage missing_contents = storage.content_missing(contents_dict) assert set(missing_contents) == set([contents[0].sha1, contents[1].sha1]) s = storage.flush() assert s == { "content:add": 1 + 1, "content:add:bytes": contents[0].length + contents[1].length, } missing_contents = storage.content_missing(contents_dict) assert list(missing_contents) == [] def test_buffering_proxy_storage_content_threshold_nb_hit(sample_data) -> None: content = sample_data.content content_dict = content.to_dict() storage = get_storage_with_buffer_config(min_batch_size={"content": 1,}) s = storage.content_add([content]) assert s == { "content:add": 1, "content:add:bytes": content.length, } missing_contents = storage.content_missing([content_dict]) assert list(missing_contents) == [] s = storage.flush() assert s == {} def test_buffering_proxy_storage_content_deduplicate(sample_data) -> None: contents = sample_data.contents[:2] storage = get_storage_with_buffer_config(min_batch_size={"content": 2,}) s = storage.content_add([contents[0], 
diff --git a/swh/storage/tests/test_buffer.py b/swh/storage/tests/test_buffer.py
index 090237f8..0335c060 100644
--- a/swh/storage/tests/test_buffer.py
+++ b/swh/storage/tests/test_buffer.py
@@ -1,622 +1,622 @@
# Copyright (C) 2019-2021  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from typing import Optional
from unittest.mock import Mock

from swh.storage import get_storage
-from swh.storage.buffer import BufferingProxyStorage
+from swh.storage.proxies.buffer import BufferingProxyStorage


def get_storage_with_buffer_config(**buffer_config) -> BufferingProxyStorage:
    steps = [
        {"cls": "buffer", **buffer_config},
        {"cls": "memory"},
    ]

    ret = get_storage("pipeline", steps=steps)
    assert isinstance(ret, BufferingProxyStorage)
    return ret


def test_buffering_proxy_storage_content_threshold_not_hit(sample_data) -> None:
    contents = sample_data.contents[:2]
    contents_dict = [c.to_dict() for c in contents]

    storage = get_storage_with_buffer_config(min_batch_size={"content": 10,})
    s = storage.content_add(contents)
    assert s == {}

    # contents have not been written to storage
    missing_contents = storage.content_missing(contents_dict)
    assert set(missing_contents) == set([contents[0].sha1, contents[1].sha1])

    s = storage.flush()
    assert s == {
        "content:add": 1 + 1,
        "content:add:bytes": contents[0].length + contents[1].length,
    }

    missing_contents = storage.content_missing(contents_dict)
    assert list(missing_contents) == []


def test_buffering_proxy_storage_content_threshold_nb_hit(sample_data) -> None:
    content = sample_data.content
    content_dict = content.to_dict()

    storage = get_storage_with_buffer_config(min_batch_size={"content": 1,})

    s = storage.content_add([content])
    assert s == {
        "content:add": 1,
        "content:add:bytes": content.length,
    }

    missing_contents = storage.content_missing([content_dict])
    assert list(missing_contents) == []

    s = storage.flush()
    assert s == {}


def test_buffering_proxy_storage_content_deduplicate(sample_data) -> None:
    contents = sample_data.contents[:2]

    storage = get_storage_with_buffer_config(min_batch_size={"content": 2,})

    s = storage.content_add([contents[0], contents[0]])
    assert s == {}

    s = storage.content_add([contents[0]])
    assert s == {}

    s = storage.content_add([contents[1]])
    assert s == {
        "content:add": 1 + 1,
        "content:add:bytes": contents[0].length + contents[1].length,
    }

    missing_contents = storage.content_missing([c.to_dict() for c in contents])
    assert list(missing_contents) == []

    s = storage.flush()
    assert s == {}


def test_buffering_proxy_storage_content_threshold_bytes_hit(sample_data) -> None:
    contents = sample_data.contents[:2]
    content_bytes_min_batch_size = 2

    storage = get_storage_with_buffer_config(
        min_batch_size={"content": 10, "content_bytes": content_bytes_min_batch_size,}
    )

    assert contents[0].length > content_bytes_min_batch_size

    s = storage.content_add([contents[0]])
    assert s == {
        "content:add": 1,
        "content:add:bytes": contents[0].length,
    }

    missing_contents = storage.content_missing([contents[0].to_dict()])
    assert list(missing_contents) == []

    s = storage.flush()
    assert s == {}


def test_buffering_proxy_storage_skipped_content_threshold_not_hit(sample_data) -> None:
    contents = sample_data.skipped_contents
    contents_dict = [c.to_dict() for c in contents]
    storage = get_storage_with_buffer_config(min_batch_size={"skipped_content": 10,})
    s = storage.skipped_content_add([contents[0], contents[1]])
    assert s == {}

    # contents have not been written to storage
    missing_contents = storage.skipped_content_missing(contents_dict)
    assert {c["sha1"] for c in missing_contents} == {c.sha1 for c in contents}

    s = storage.flush()
    assert s == {"skipped_content:add": 1 + 1}

    missing_contents = storage.skipped_content_missing(contents_dict)
    assert list(missing_contents) == []


def test_buffering_proxy_storage_skipped_content_threshold_nb_hit(sample_data) -> None:
    contents = sample_data.skipped_contents

    storage = get_storage_with_buffer_config(min_batch_size={"skipped_content": 1,})

    s = storage.skipped_content_add([contents[0]])
    assert s == {"skipped_content:add": 1}

    missing_contents = storage.skipped_content_missing([contents[0].to_dict()])
    assert list(missing_contents) == []

    s = storage.flush()
    assert s == {}


def test_buffering_proxy_storage_skipped_content_deduplicate(sample_data):
    contents = sample_data.skipped_contents[:2]

    storage = get_storage_with_buffer_config(min_batch_size={"skipped_content": 2,})

    s = storage.skipped_content_add([contents[0], contents[0]])
    assert s == {}

    s = storage.skipped_content_add([contents[0]])
    assert s == {}

    s = storage.skipped_content_add([contents[1]])
    assert s == {
        "skipped_content:add": 1 + 1,
    }

    missing_contents = storage.skipped_content_missing([c.to_dict() for c in contents])
    assert list(missing_contents) == []

    s = storage.flush()
    assert s == {}


def test_buffering_proxy_storage_extid_threshold_not_hit(sample_data) -> None:
    extid = sample_data.extid1

    storage = get_storage_with_buffer_config(min_batch_size={"extid": 10,})
    s = storage.extid_add([extid])
    assert s == {}

    present_extids = storage.extid_get_from_target(
        extid.target.object_type, [extid.target.object_id]
    )
    assert list(present_extids) == []

    s = storage.flush()
    assert s == {
        "extid:add": 1,
    }

    present_extids = storage.extid_get_from_target(
        extid.target.object_type, [extid.target.object_id]
    )
    assert list(present_extids) == [extid]


def test_buffering_proxy_storage_extid_threshold_hit(sample_data) -> None:
    extid = sample_data.extid1

    storage = get_storage_with_buffer_config(min_batch_size={"extid": 1,})

    s = storage.extid_add([extid])
    assert s == {
        "extid:add": 1,
    }

    present_extids = storage.extid_get_from_target(
        extid.target.object_type, [extid.target.object_id]
    )
    assert list(present_extids) == [extid]

    s = storage.flush()
    assert s == {}


def test_buffering_proxy_storage_extid_deduplicate(sample_data) -> None:
    extids = sample_data.extids[:2]

    storage = get_storage_with_buffer_config(min_batch_size={"extid": 2,})

    s = storage.extid_add([extids[0], extids[0]])
    assert s == {}

    s = storage.extid_add([extids[0]])
    assert s == {}

    s = storage.extid_add([extids[1]])
    assert s == {
        "extid:add": 1 + 1,
    }

    for extid in extids:
        present_extids = storage.extid_get_from_target(
            extid.target.object_type, [extid.target.object_id]
        )
        assert list(present_extids) == [extid]

    s = storage.flush()
    assert s == {}


def test_buffering_proxy_storage_directory_threshold_not_hit(sample_data) -> None:
    directory = sample_data.directory

    storage = get_storage_with_buffer_config(min_batch_size={"directory": 10,})
    s = storage.directory_add([directory])
    assert s == {}

    missing_directories = storage.directory_missing([directory.id])
    assert list(missing_directories) == [directory.id]

    s = storage.flush()
    assert s == {
        "directory:add": 1,
    }

    missing_directories = storage.directory_missing([directory.id])
    assert list(missing_directories) == []


def test_buffering_proxy_storage_directory_threshold_hit(sample_data) -> None:
    directory = sample_data.directory

    storage = get_storage_with_buffer_config(min_batch_size={"directory": 1,})

    s = storage.directory_add([directory])
    assert s == {
        "directory:add": 1,
    }

    missing_directories = storage.directory_missing([directory.id])
    assert list(missing_directories) == []

    s = storage.flush()
    assert s == {}


def test_buffering_proxy_storage_directory_deduplicate(sample_data) -> None:
    directories = sample_data.directories[:2]

    storage = get_storage_with_buffer_config(min_batch_size={"directory": 2,})

    s = storage.directory_add([directories[0], directories[0]])
    assert s == {}

    s = storage.directory_add([directories[0]])
    assert s == {}

    s = storage.directory_add([directories[1]])
    assert s == {
        "directory:add": 1 + 1,
    }

    missing_directories = storage.directory_missing([d.id for d in directories])
    assert list(missing_directories) == []

    s = storage.flush()
    assert s == {}


def test_buffering_proxy_storage_revision_threshold_not_hit(sample_data) -> None:
    revision = sample_data.revision

    storage = get_storage_with_buffer_config(min_batch_size={"revision": 10,})
    s = storage.revision_add([revision])
    assert s == {}

    missing_revisions = storage.revision_missing([revision.id])
    assert list(missing_revisions) == [revision.id]

    s = storage.flush()
    assert s == {
        "revision:add": 1,
    }

    missing_revisions = storage.revision_missing([revision.id])
    assert list(missing_revisions) == []


def test_buffering_proxy_storage_revision_threshold_hit(sample_data) -> None:
    revision = sample_data.revision

    storage = get_storage_with_buffer_config(min_batch_size={"revision": 1,})

    s = storage.revision_add([revision])
    assert s == {
        "revision:add": 1,
    }

    missing_revisions = storage.revision_missing([revision.id])
    assert list(missing_revisions) == []

    s = storage.flush()
    assert s == {}


def test_buffering_proxy_storage_revision_deduplicate(sample_data) -> None:
    revisions = sample_data.revisions[:2]

    storage = get_storage_with_buffer_config(min_batch_size={"revision": 2,})

    s = storage.revision_add([revisions[0], revisions[0]])
    assert s == {}

    s = storage.revision_add([revisions[0]])
    assert s == {}

    s = storage.revision_add([revisions[1]])
    assert s == {
        "revision:add": 1 + 1,
    }

    missing_revisions = storage.revision_missing([r.id for r in revisions])
    assert list(missing_revisions) == []

    s = storage.flush()
    assert s == {}


def test_buffering_proxy_storage_release_threshold_not_hit(sample_data) -> None:
    releases = sample_data.releases
    threshold = 10

    assert len(releases) < threshold
    storage = get_storage_with_buffer_config(
        min_batch_size={"release": threshold,}  # configuration set
    )

    s = storage.release_add(releases)
    assert s == {}

    release_ids = [r.id for r in releases]
    missing_releases = storage.release_missing(release_ids)
    assert list(missing_releases) == release_ids

    s = storage.flush()
    assert s == {
        "release:add": len(releases),
    }

    missing_releases = storage.release_missing(release_ids)
    assert list(missing_releases) == []


def test_buffering_proxy_storage_release_threshold_hit(sample_data) -> None:
    releases = sample_data.releases
    threshold = 2
    assert len(releases) > threshold

    storage = get_storage_with_buffer_config(
        min_batch_size={"release": threshold,}  # configuration set
    )

    s = storage.release_add(releases)
    assert s == {
        "release:add": len(releases),
    }

    release_ids = [r.id for r in releases]
    missing_releases = storage.release_missing(release_ids)
    assert list(missing_releases) == []

    s = storage.flush()
    assert s == {}


def test_buffering_proxy_storage_release_deduplicate(sample_data) -> None:
    releases = sample_data.releases[:2]

    storage = get_storage_with_buffer_config(min_batch_size={"release": 2,})

    s = storage.release_add([releases[0], releases[0]])
    assert s == {}

    s = storage.release_add([releases[0]])
    assert s == {}

    s = storage.release_add([releases[1]])
    assert s == {
        "release:add": 1 + 1,
    }

    missing_releases = storage.release_missing([r.id for r in releases])
    assert list(missing_releases) == []

    s = storage.flush()
    assert s == {}


def test_buffering_proxy_storage_snapshot_threshold_not_hit(sample_data) -> None:
    snapshots = sample_data.snapshots
    threshold = 10

    assert len(snapshots) < threshold
    storage = get_storage_with_buffer_config(
        min_batch_size={"snapshot": threshold,}  # configuration set
    )

    s = storage.snapshot_add(snapshots)
    assert s == {}

    snapshot_ids = [r.id for r in snapshots]
    missing_snapshots = storage.snapshot_missing(snapshot_ids)
    assert list(missing_snapshots) == snapshot_ids

    s = storage.flush()
    assert s == {
        "snapshot:add": len(snapshots),
    }

    missing_snapshots = storage.snapshot_missing(snapshot_ids)
    assert list(missing_snapshots) == []


def test_buffering_proxy_storage_snapshot_threshold_hit(sample_data) -> None:
    snapshots = sample_data.snapshots
    threshold = 2
    assert len(snapshots) > threshold

    storage = get_storage_with_buffer_config(
        min_batch_size={"snapshot": threshold,}  # configuration set
    )

    s = storage.snapshot_add(snapshots)
    assert s == {
        "snapshot:add": len(snapshots),
    }

    snapshot_ids = [r.id for r in snapshots]
    missing_snapshots = storage.snapshot_missing(snapshot_ids)
    assert list(missing_snapshots) == []

    s = storage.flush()
    assert s == {}


def test_buffering_proxy_storage_snapshot_deduplicate(sample_data) -> None:
    snapshots = sample_data.snapshots[:2]

    storage = get_storage_with_buffer_config(min_batch_size={"snapshot": 2,})

    s = storage.snapshot_add([snapshots[0], snapshots[0]])
    assert s == {}

    s = storage.snapshot_add([snapshots[0]])
    assert s == {}

    s = storage.snapshot_add([snapshots[1]])
    assert s == {
        "snapshot:add": 1 + 1,
    }

    missing_snapshots = storage.snapshot_missing([r.id for r in snapshots])
    assert list(missing_snapshots) == []

    s = storage.flush()
    assert s == {}


def test_buffering_proxy_storage_clear(sample_data) -> None:
    """Clear operation on buffer"""
    threshold = 10
    contents = sample_data.contents
    assert 0 < len(contents) < threshold
    skipped_contents = sample_data.skipped_contents
    assert 0 < len(skipped_contents) < threshold
    directories = sample_data.directories
    assert 0 < len(directories) < threshold
    revisions = sample_data.revisions
    assert 0 < len(revisions) < threshold
    releases = sample_data.releases
    assert 0 < len(releases) < threshold
    snapshots = sample_data.snapshots
    assert 0 < len(snapshots) < threshold

    storage = get_storage_with_buffer_config(
        min_batch_size={
            "content": threshold,
            "skipped_content": threshold,
            "directory": threshold,
            "revision": threshold,
            "release": threshold,
        }
    )

    s = storage.content_add(contents)
    assert s == {}
    s = storage.skipped_content_add(skipped_contents)
    assert s == {}
    s = storage.directory_add(directories)
    assert s == {}
    s = storage.revision_add(revisions)
    assert s == {}
    s = storage.release_add(releases)
    assert s == {}
    s = storage.snapshot_add(snapshots)
    assert s == {}

    assert len(storage._objects["content"]) == len(contents)
    assert len(storage._objects["skipped_content"]) == len(skipped_contents)
    assert len(storage._objects["directory"]) == len(directories)
    assert len(storage._objects["revision"]) == len(revisions)
    assert len(storage._objects["release"]) == len(releases)
    assert len(storage._objects["snapshot"]) == len(snapshots)

    # clear only content from the buffer
    s = storage.clear_buffers(["content"])  # type: ignore
    assert s is None

    # the specific clear operation touched only the "content" object type
    assert len(storage._objects["content"]) == 0
    assert len(storage._objects["skipped_content"]) == len(skipped_contents)
    assert len(storage._objects["directory"]) == len(directories)
    assert len(storage._objects["revision"]) == len(revisions)
    assert len(storage._objects["release"]) == len(releases)
    assert len(storage._objects["snapshot"]) == len(snapshots)

    # clear current buffer from all object types
    s = storage.clear_buffers()  # type: ignore
    assert s is None

    assert len(storage._objects["content"]) == 0
    assert len(storage._objects["skipped_content"]) == 0
    assert len(storage._objects["directory"]) == 0
    assert len(storage._objects["revision"]) == 0
    assert len(storage._objects["release"]) == 0
    assert len(storage._objects["snapshot"]) == 0


def test_buffer_proxy_with_default_args() -> None:
    storage = get_storage_with_buffer_config()
    assert storage is not None


def test_buffer_flush_stats(sample_data) -> None:
    storage = get_storage_with_buffer_config()

    s = storage.content_add(sample_data.contents)
    assert s == {}
    s = storage.skipped_content_add(sample_data.skipped_contents)
    assert s == {}
    s = storage.directory_add(sample_data.directories)
    assert s == {}
    s = storage.revision_add(sample_data.revisions)
    assert s == {}
    s = storage.release_add(sample_data.releases)
    assert s == {}
    s = storage.snapshot_add(sample_data.snapshots)
    assert s == {}

    # Flush all the things
    s = storage.flush()
    assert s["content:add"] > 0
    assert s["content:add:bytes"] > 0
    assert s["skipped_content:add"] > 0
    assert s["directory:add"] > 0
    assert s["revision:add"] > 0
    assert s["release:add"] > 0
    assert s["snapshot:add"] > 0


def test_buffer_operation_order(sample_data) -> None:
    storage = get_storage_with_buffer_config()

    # Wrap the inner storage in a mock to track all method calls.
    storage.storage = mocked_storage = Mock(wraps=storage.storage)

    # Simulate a loader: add contents, directories, revisions, releases, then
    # snapshots.
    storage.content_add(sample_data.contents)
    storage.skipped_content_add(sample_data.skipped_contents)
    storage.directory_add(sample_data.directories)
    storage.revision_add(sample_data.revisions)
    storage.release_add(sample_data.releases)
    storage.snapshot_add(sample_data.snapshots)

    # Check that nothing has been flushed yet
    assert mocked_storage.method_calls == []

    # Flush all the things
    storage.flush()

    methods_called = [c[0] for c in mocked_storage.method_calls]
    prev = -1
    for method in [
        "content_add",
        "skipped_content_add",
        "directory_add",
        "revision_add",
        "release_add",
        "snapshot_add",
        "flush",
    ]:
        try:
            cur: Optional[int] = methods_called.index(method)
        except ValueError:
            cur = None
        assert cur is not None, "Method %s not called" % method
        assert cur > prev, "Method %s called out of order; all calls were: %s" % (
            method,
            methods_called,
        )
        prev = cur
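The behavior these tests exercise, condensed into a standalone sketch (assuming an in-memory backend; Content.from_data comes from swh.model.model):

    from swh.model.model import Content
    from swh.storage import get_storage

    storage = get_storage(
        "pipeline",
        steps=[{"cls": "buffer", "min_batch_size": {"content": 10}}, {"cls": "memory"}],
    )
    # Below the threshold, objects are only buffered and the summary is empty...
    assert storage.content_add([Content.from_data(b"hello")]) == {}
    # ...while flush() pushes them to the backend and returns the real stats.
    stats = storage.flush()
    assert stats["content:add"] == 1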
"filter",}, {"cls": "buffer", "min_batch_size": {"content": 10,},}, {"cls": "memory",}, ], } storage = get_storage(**config) assert isinstance(storage, FilteringProxyStorage) assert isinstance(storage.storage, BufferingProxyStorage) assert isinstance(storage.storage.storage, InMemoryStorage) def test_get_storage_pipeline_legacy_args(): config = { "cls": "pipeline", "steps": [ {"cls": "filter",}, {"cls": "buffer", "args": {"min_batch_size": {"content": 10,},}}, {"cls": "memory",}, ], } with pytest.warns(DeprecationWarning): storage = get_storage(**config) assert isinstance(storage, FilteringProxyStorage) assert isinstance(storage.storage, BufferingProxyStorage) assert isinstance(storage.storage.storage, InMemoryStorage) # get_storage's check_config argument tests # the "remote" and "pipeline" cases are tested in dedicated test functions below @pytest.mark.parametrize( "cls,real_class,kwargs", [x for x in STORAGES if x.id not in ("remote", "local")] ) def test_get_storage_check_config(cls, real_class, kwargs, monkeypatch): """Instantiating an existing storage with check_config should be ok """ check_backend_check_config(monkeypatch, dict(cls=cls, **kwargs)) @patch("swh.storage.postgresql.storage.psycopg2.pool") def test_get_storage_local_check_config(mock_pool, monkeypatch): """Instantiating a local storage with check_config should be ok """ mock_pool.ThreadedConnectionPool.return_value = None check_backend_check_config( monkeypatch, {"cls": "local", "db": "postgresql://db", "objstorage": {"cls": "memory"}}, backend_storage_cls=DbStorage, ) def test_get_storage_pipeline_check_config(monkeypatch): """Test that the check_config option works as intended for a pipelined storage""" config = { "cls": "pipeline", "steps": [ {"cls": "filter",}, {"cls": "buffer", "min_batch_size": {"content": 10,},}, {"cls": "memory",}, ], } check_backend_check_config( monkeypatch, config, ) def test_get_storage_remote_check_config(monkeypatch): """Test that the check_config option works as intended for a remote storage""" monkeypatch.setattr( server, "storage", get_storage(cls="memory", journal_writer={"cls": "memory"}) ) test_client = server.app.test_client() class MockedRemoteStorage(client.RemoteStorage): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.session.adapters.clear() self.session.mount("mock://", RPCTestAdapter(test_client)) monkeypatch.setattr(client, "RemoteStorage", MockedRemoteStorage) config = { "cls": "remote", "url": "mock://example.com", } check_backend_check_config( monkeypatch, config, ) def check_backend_check_config( monkeypatch, config, backend_storage_cls=InMemoryStorage ): """Check the staged/indirect storage (pipeline or remote) works as desired with regard to the check_config option of the get_storage() factory function. If set, the check_config argument is used to call the Storage.check_config() at instantiation time in the get_storage() factory function. This is supposed to be passed through each step of the Storage pipeline until it reached the actual backend's (typically in memory or local) check_config() method which will perform the verification for read/write access to the backend storage. monkeypatch is supposed to be the monkeypatch pytest fixture to be used from the calling test_ function. config is the config dict passed to get_storage() backend_storage_cls is the class of the backend storage to be mocked to simulate the check_config behavior; it should then be the class of the actual backend storage defined in the `config`. 
""" access = None def mockcheck(self, check_write=False): if access == "none": return False if access == "read": return check_write is False if access == "write": return True monkeypatch.setattr(backend_storage_cls, "check_config", mockcheck) # simulate no read nor write access to the underlying (memory) storage access = "none" # by default, no check, so no complain assert get_storage(**config) # if asked to check, complain with pytest.raises(EnvironmentError): get_storage(check_config={"check_write": False}, **config) with pytest.raises(EnvironmentError): get_storage(check_config={"check_write": True}, **config) # simulate no write access to the underlying (memory) storage access = "read" # by default, no check so no complain assert get_storage(**config) # if asked to check for read access, no complain get_storage(check_config={"check_write": False}, **config) # if asked to check for write access, complain with pytest.raises(EnvironmentError): get_storage(check_config={"check_write": True}, **config) # simulate read & write access to the underlying (memory) storage access = "write" # by default, no check so no complain assert get_storage(**config) # if asked to check for read access, no complain get_storage(check_config={"check_write": False}, **config) # if asked to check for write access, no complain get_storage(check_config={"check_write": True}, **config) diff --git a/swh/storage/tests/test_retry.py b/swh/storage/tests/test_retry.py index 132acdad..4b0b75fc 100644 --- a/swh/storage/tests/test_retry.py +++ b/swh/storage/tests/test_retry.py @@ -1,197 +1,197 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from unittest.mock import call import attr import psycopg2 import pytest from swh.storage.exc import HashCollision, StorageArgumentException from swh.storage.utils import now @pytest.fixture def monkeypatch_sleep(monkeypatch, swh_storage): """In test context, we don't want to wait, make test faster """ - from swh.storage.retry import RetryingProxyStorage + from swh.storage.proxies.retry import RetryingProxyStorage for method_name, method in RetryingProxyStorage.__dict__.items(): if "_add" in method_name or "_update" in method_name: monkeypatch.setattr(method.retry, "sleep", lambda x: None) return monkeypatch @pytest.fixture def fake_hash_collision(sample_data): return HashCollision("sha1", "38762cf7f55934b34d179ae6a4c80cadccbb7f0a", []) @pytest.fixture def swh_storage_backend_config(): yield { "cls": "pipeline", "steps": [{"cls": "retry"}, {"cls": "memory"},], } def test_retrying_proxy_storage_content_add(swh_storage, sample_data): """Standard content_add works as before """ sample_content = sample_data.content content = swh_storage.content_get_data(sample_content.sha1) assert content is None s = swh_storage.content_add([sample_content]) assert s == { "content:add": 1, "content:add:bytes": sample_content.length, } content = swh_storage.content_get_data(sample_content.sha1) assert content == sample_content.data def test_retrying_proxy_storage_content_add_with_retry( monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision, ): """Multiple retries for hash collision and psycopg2 error but finally ok """ mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.content_add") mock_memory.side_effect = [ # first try goes ko fake_hash_collision, # second try goes ko 
psycopg2.IntegrityError("content already inserted"), # ok then! {"content:add": 1}, ] sample_content = sample_data.content content = swh_storage.content_get_data(sample_content.sha1) assert content is None s = swh_storage.content_add([sample_content]) assert s == {"content:add": 1} mock_memory.assert_has_calls( [call([sample_content]), call([sample_content]), call([sample_content]),] ) def test_retrying_proxy_swh_storage_content_add_failure( swh_storage, sample_data, mocker ): """Unfiltered errors are raising without retry """ mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.content_add") mock_memory.side_effect = StorageArgumentException("Refuse to add content always!") sample_content = sample_data.content content = swh_storage.content_get_data(sample_content.sha1) assert content is None with pytest.raises(StorageArgumentException, match="Refuse to add"): swh_storage.content_add([sample_content]) assert mock_memory.call_count == 1 def test_retrying_proxy_storage_content_add_metadata(swh_storage, sample_data): """Standard content_add_metadata works as before """ sample_content = sample_data.content content = attr.evolve(sample_content, data=None) pk = content.sha1 content_metadata = swh_storage.content_get([pk]) assert content_metadata == [None] s = swh_storage.content_add_metadata([attr.evolve(content, ctime=now())]) assert s == { "content:add": 1, } content_metadata = swh_storage.content_get([pk]) assert len(content_metadata) == 1 assert content_metadata[0].sha1 == pk def test_retrying_proxy_storage_content_add_metadata_with_retry( monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision ): """Multiple retries for hash collision and psycopg2 error but finally ok """ mock_memory = mocker.patch( "swh.storage.in_memory.InMemoryStorage.content_add_metadata" ) mock_memory.side_effect = [ # first try goes ko fake_hash_collision, # second try goes ko psycopg2.IntegrityError("content_metadata already inserted"), # ok then! {"content:add": 1}, ] sample_content = sample_data.content content = attr.evolve(sample_content, data=None) s = swh_storage.content_add_metadata([content]) assert s == {"content:add": 1} mock_memory.assert_has_calls( [call([content]), call([content]), call([content]),] ) def test_retrying_proxy_swh_storage_content_add_metadata_failure( swh_storage, sample_data, mocker ): """Unfiltered errors are raising without retry """ mock_memory = mocker.patch( "swh.storage.in_memory.InMemoryStorage.content_add_metadata" ) mock_memory.side_effect = StorageArgumentException( "Refuse to add content_metadata!" 
    )

    sample_content = sample_data.content
    content = attr.evolve(sample_content, data=None)

    with pytest.raises(StorageArgumentException, match="Refuse to add"):
        swh_storage.content_add_metadata([content])

    assert mock_memory.call_count == 1


def test_retrying_proxy_swh_storage_keyboardinterrupt(swh_storage, sample_data, mocker):
    """Unfiltered errors are raised without retry"""
    mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.content_add")
    mock_memory.side_effect = KeyboardInterrupt()

    sample_content = sample_data.content

    content = swh_storage.content_get_data(sample_content.sha1)
    assert content is None

    with pytest.raises(KeyboardInterrupt):
        swh_storage.content_add([sample_content])

    assert mock_memory.call_count == 1
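The pipeline the retry tests above run against, spelled out as a standalone sketch (the configuration is verbatim from the swh_storage_backend_config fixture; the comments summarize the behavior the tests assert):

    from swh.storage import get_storage

    # Retry proxy in front of an in-memory backend: *_add/*_update calls
    # failing with retryable errors (HashCollision, psycopg2.IntegrityError)
    # are re-issued a few times, while StorageArgumentException and
    # KeyboardInterrupt propagate on the first call.
    swh_storage = get_storage("pipeline", steps=[{"cls": "retry"}, {"cls": "memory"}])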
diff --git a/swh/storage/tests/test_tenacious.py b/swh/storage/tests/test_tenacious.py
index 960eaa78..cddacf06 100644
--- a/swh/storage/tests/test_tenacious.py
+++ b/swh/storage/tests/test_tenacious.py
@@ -1,394 +1,394 @@
# Copyright (C) 2020-2021  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from collections import Counter
from contextlib import contextmanager
from unittest.mock import patch

import attr
import pytest

from swh.model import model
from swh.model.tests.swh_model_data import TEST_OBJECTS
from swh.storage import get_storage
from swh.storage.in_memory import InMemoryStorage
-from swh.storage.tenacious import TenaciousProxyStorage
+from swh.storage.proxies.tenacious import TenaciousProxyStorage
from swh.storage.tests.storage_data import StorageData
from swh.storage.tests.storage_tests import (
    TestStorageGeneratedData as _TestStorageGeneratedData,
)
from swh.storage.tests.storage_tests import TestStorage as _TestStorage  # noqa

data = StorageData()
collections = {
    "origin": data.origins,
    "content": data.contents,
    "skipped_content": data.skipped_contents,
    "revision": data.revisions,
    "directory": data.directories,
    "release": data.releases,
    "snapshot": data.snapshots,
}

# generic storage tests (using imported TestStorage* classes)


@pytest.fixture
def swh_storage_backend_config2():
    yield {
        "cls": "memory",
        "journal_writer": {"cls": "memory",},
    }


@pytest.fixture
def swh_storage():
    storage_config = {
        "cls": "pipeline",
        "steps": [
            {"cls": "tenacious"},
            {"cls": "memory", "journal_writer": {"cls": "memory",}},
        ],
    }

    storage = get_storage(**storage_config)
    storage.journal_writer = storage.storage.journal_writer
    return storage


class TestTenaciousStorage(_TestStorage):
    @pytest.mark.skip(
        'The "person" table of the pgsql is a legacy thing, and not '
        "supported by the cassandra/in-memory backend."
    )
    def test_person_fullname_unicity(self):
        pass

    @pytest.mark.skip(reason="No collision with the tenacious storage")
    def test_content_add_collision(self, swh_storage, sample_data):
        pass

    @pytest.mark.skip("content_update is not implemented")
    def test_content_update(self):
        pass

    @pytest.mark.skip("Not supported by Cassandra/InMemory storage")
    def test_origin_count(self):
        pass


class TestTenaciousStorageGeneratedData(_TestStorageGeneratedData):
    @pytest.mark.skip("Not supported by Cassandra/InMemory")
    def test_origin_count(self):
        pass

    @pytest.mark.skip("Not supported by Cassandra/InMemory")
    def test_origin_count_with_visit_no_visits(self):
        pass

    @pytest.mark.skip("Not supported by Cassandra/InMemory")
    def test_origin_count_with_visit_with_visits_and_snapshot(self):
        pass

    @pytest.mark.skip("Not supported by Cassandra/InMemory")
    def test_origin_count_with_visit_with_visits_no_snapshot(self):
        pass


# specific tests for the tenacious behavior


def get_tenacious_storage(**config):
    storage_config = {
        "cls": "pipeline",
        "steps": [
            {"cls": "validate"},
            {"cls": "tenacious", **config},
            {"cls": "memory"},
        ],
    }

    return get_storage(**storage_config)


@contextmanager
def disabled_validators():
    attr.set_run_validators(False)
    yield
    attr.set_run_validators(True)


def popid(d):
    d.pop("id")
    return d


testdata = [
    pytest.param(
        "content",
        list(TEST_OBJECTS["content"]),
        attr.evolve(model.Content.from_data(data=b"too big"), length=1000),
        attr.evolve(model.Content.from_data(data=b"to fail"), length=1000),
        id="content",
    ),
    pytest.param(
        "skipped_content",
        list(TEST_OBJECTS["skipped_content"]),
        attr.evolve(
            model.SkippedContent.from_data(data=b"too big", reason="too big"),
            length=1000,
        ),
        attr.evolve(
            model.SkippedContent.from_data(data=b"to fail", reason="to fail"),
            length=1000,
        ),
        id="skipped_content",
    ),
    pytest.param(
        "directory",
        list(TEST_OBJECTS["directory"]),
        data.directory,
        data.directory2,
        id="directory",
    ),
    pytest.param(
        "revision",
        list(TEST_OBJECTS["revision"]),
        data.revision,
        data.revision2,
        id="revision",
    ),
    pytest.param(
        "release",
        list(TEST_OBJECTS["release"]),
        data.release,
        data.release2,
        id="release",
    ),
    pytest.param(
        "snapshot",
        list(TEST_OBJECTS["snapshot"]),
        data.snapshot,
        data.complete_snapshot,
        id="snapshot",
    ),
    pytest.param(
        "origin",
        list(TEST_OBJECTS["origin"]),
        data.origin,
        data.origin2,
        id="origin",
    ),
]


class LimitedInMemoryStorage(InMemoryStorage):
    # forbidden are 'bad1' and 'bad2' arguments of `testdata`
    forbidden = [x[0][2] for x in testdata] + [x[0][3] for x in testdata]

    def __init__(self, *args, **kw):
        self.add_calls = Counter()
        super().__init__(*args, **kw)

    def reset(self):
        super().reset()
        self.add_calls.clear()

    def content_add(self, contents):
        return self._maybe_add(super().content_add, "content", contents)

    def skipped_content_add(self, skipped_contents):
        return self._maybe_add(
            super().skipped_content_add, "skipped_content", skipped_contents
        )

    def origin_add(self, origins):
        return self._maybe_add(super().origin_add, "origin", origins)

    def directory_add(self, directories):
        return self._maybe_add(super().directory_add, "directory", directories)

    def revision_add(self, revisions):
        return self._maybe_add(super().revision_add, "revision", revisions)

    def release_add(self, releases):
        return self._maybe_add(super().release_add, "release", releases)

    def snapshot_add(self, snapshots):
        return self._maybe_add(super().snapshot_add, "snapshot", snapshots)

    def _maybe_add(self, add_func, object_type, objects):
        self.add_calls[object_type] += 1
        if any(c in self.forbidden for c in objects):
            raise ValueError(
f"{object_type} is forbidden", [c.unique_key() for c in objects if c in self.forbidden], ) return add_func(objects) @patch("swh.storage.in_memory.InMemoryStorage", LimitedInMemoryStorage) @pytest.mark.parametrize("object_type, objects, bad1, bad2", testdata) def test_tenacious_proxy_storage(object_type, objects, bad1, bad2): storage = get_tenacious_storage() tenacious = storage.storage in_memory = tenacious.storage assert isinstance(tenacious, TenaciousProxyStorage) assert isinstance(in_memory, LimitedInMemoryStorage) size = len(objects) add_func = getattr(storage, f"{object_type}_add") # Note: when checking the LimitedInMemoryStorage.add_calls counter, it's # hard to guess the exact number of calls in the end (depends on the size # of batch and the position of bad objects in this batch). So we will only # check a lower limit of the form (n + m), where n is the minimum expected # number of additions (due to the batch begin split), and m is the fact # that bad objects are tried (individually) several (3) times before giving # up. So for one bad object, m is 3; for 2 bad objects, m is 6, etc. s = add_func(objects) assert s.get(f"{object_type}:add", 0) == size assert s.get(f"{object_type}:add:errors", 0) == 0 assert storage.add_calls[object_type] == (1 + 0) in_memory.reset() tenacious.reset() # bad1 is the last element s = add_func(objects + [bad1]) assert s.get(f"{object_type}:add", 0) == size assert s.get(f"{object_type}:add:errors", 0) == 1 assert storage.add_calls[object_type] >= (2 + 3) in_memory.reset() tenacious.reset() # bad1 and bad2 are the last elements s = add_func(objects + [bad1, bad2]) assert s.get(f"{object_type}:add", 0) == size assert s.get(f"{object_type}:add:errors", 0) == 2 assert storage.add_calls[object_type] >= (3 + 6) in_memory.reset() tenacious.reset() # bad1 is the first element s = add_func([bad1] + objects) assert s.get(f"{object_type}:add", 0) == size assert s.get(f"{object_type}:add:errors", 0) == 1 assert storage.add_calls[object_type] >= (2 + 3) in_memory.reset() tenacious.reset() # bad1 and bad2 are the first elements s = add_func([bad1, bad2] + objects) assert s.get(f"{object_type}:add", 0) == size assert s.get(f"{object_type}:add:errors", 0) == 2 assert storage.add_calls[object_type] >= (3 + 6) in_memory.reset() tenacious.reset() # bad1 is in the middle of the list of inserted elements s = add_func(objects[: size // 2] + [bad1] + objects[size // 2 :]) assert s.get(f"{object_type}:add", 0) == size assert s.get(f"{object_type}:add:errors", 0) == 1 assert storage.add_calls[object_type] >= (3 + 3) in_memory.reset() tenacious.reset() # bad1 and bad2 are together in the middle of the list of inserted elements s = add_func(objects[: size // 2] + [bad1, bad2] + objects[size // 2 :]) assert s.get(f"{object_type}:add", 0) == size assert s.get(f"{object_type}:add:errors", 0) == 2 assert storage.add_calls[object_type] >= (3 + 6) in_memory.reset() tenacious.reset() # bad1 and bad2 are spread in the middle of the list of inserted elements s = add_func( objects[: size // 3] + [bad1] + objects[size // 3 : 2 * (size // 3)] + [bad2] + objects[2 * (size // 3) :] ) assert s.get(f"{object_type}:add", 0) == size assert s.get(f"{object_type}:add:errors", 0) == 2 assert storage.add_calls[object_type] >= (3 + 6) in_memory.reset() tenacious.reset() # bad1 is the only element s = add_func([bad1]) assert s.get(f"{object_type}:add", 0) == 0 assert s.get(f"{object_type}:add:errors", 0) == 1 assert storage.add_calls[object_type] == (0 + 3) in_memory.reset() tenacious.reset() # bad1 and 
    # bad1 and bad2 are the only elements
    s = add_func([bad1, bad2])
    assert s.get(f"{object_type}:add", 0) == 0
    assert s.get(f"{object_type}:add:errors", 0) == 2
    assert storage.add_calls[object_type] == (1 + 6)
    in_memory.reset()
    tenacious.reset()


@patch("swh.storage.in_memory.InMemoryStorage", LimitedInMemoryStorage)
@pytest.mark.parametrize("object_type, objects, bad1, bad2", testdata)
def test_tenacious_proxy_storage_rate_limit(object_type, objects, bad1, bad2):
    storage = get_tenacious_storage(error_rate_limit={"errors": 1, "window_size": 3})
    tenacious = storage.storage
    in_memory = tenacious.storage
    assert isinstance(tenacious, TenaciousProxyStorage)
    assert isinstance(in_memory, LimitedInMemoryStorage)

    size = len(objects)

    add_func = getattr(storage, f"{object_type}_add")

    # with no insertion failure, no impact
    s = add_func(objects)
    assert s.get(f"{object_type}:add", 0) == size
    assert s.get(f"{object_type}:add:errors", 0) == 0
    in_memory.reset()
    tenacious.reset()

    # with one insertion failure, no impact
    s = add_func([bad1] + objects)
    assert s.get(f"{object_type}:add", 0) == size
    assert s.get(f"{object_type}:add:errors", 0) == 1
    in_memory.reset()
    tenacious.reset()

    s = add_func(objects[: size // 2] + [bad1] + objects[size // 2 :])
    assert s.get(f"{object_type}:add", 0) == size
    assert s.get(f"{object_type}:add:errors", 0) == 1
    in_memory.reset()
    tenacious.reset()

    # with two consecutive insertion failures, exception is raised
    with pytest.raises(RuntimeError, match="Too many insertion errors"):
        add_func([bad1, bad2] + objects)
    in_memory.reset()
    tenacious.reset()

    if size > 2:
        # with two consecutive insertion failures, exception is raised
        # (errors not at the beginning)
        with pytest.raises(RuntimeError, match="Too many insertion errors"):
            add_func(objects[: size // 2] + [bad1, bad2] + objects[size // 2 :])
        in_memory.reset()
        tenacious.reset()

        # with two non-consecutive insertion failures, no impact
        # (errors are far enough apart to not reach the rate limit)
        s = add_func(
            objects[: size // 3]
            + [bad1]
            + objects[size // 3 : 2 * (size // 3)]
            + [bad2]
            + objects[2 * (size // 3) :]
        )
        assert s.get(f"{object_type}:add", 0) == size
        assert s.get(f"{object_type}:add:errors", 0) == 2
        in_memory.reset()
        tenacious.reset()
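Finally, the rate-limited tenacious pipeline from the last test, as a standalone sketch (parameters verbatim from the get_tenacious_storage(error_rate_limit=...) call above):

    from swh.storage import get_storage

    # Tenacious proxy: bad objects are dropped and counted in
    # "<object_type>:add:errors" while the rest of the batch is inserted, but
    # two errors within a sliding window of three additions trip the limit
    # and raise RuntimeError("Too many insertion errors ...").
    storage = get_storage(
        "pipeline",
        steps=[
            {"cls": "tenacious", "error_rate_limit": {"errors": 1, "window_size": 3}},
            {"cls": "memory"},
        ],
    )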