diff --git a/swh/storage/__init__.py b/swh/storage/__init__.py index 1e192790..b8a3be08 100644 --- a/swh/storage/__init__.py +++ b/swh/storage/__init__.py @@ -1,96 +1,104 @@ # Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import importlib from typing import Any, Dict, List import warnings from .interface import StorageInterface STORAGE_IMPLEMENTATIONS = { "local": ".postgresql.storage.Storage", "remote": ".api.client.RemoteStorage", "memory": ".in_memory.InMemoryStorage", "filter": ".filter.FilteringProxyStorage", "buffer": ".buffer.BufferingProxyStorage", "retry": ".retry.RetryingProxyStorage", "cassandra": ".cassandra.CassandraStorage", } def get_storage(cls: str, **kwargs) -> StorageInterface: """Get a storage object of class `storage_class` with arguments `storage_args`. Args: storage (dict): dictionary with keys: - cls (str): storage's class, either local, remote, memory, filter, buffer - args (dict): dictionary with keys Returns: an instance of swh.storage.Storage or compatible class Raises: ValueError if passed an unknown storage class. """ if "args" in kwargs: warnings.warn( 'Explicit "args" key is deprecated, use keys directly instead.', DeprecationWarning, ) kwargs = kwargs["args"] if cls == "pipeline": return get_storage_pipeline(**kwargs) class_path = STORAGE_IMPLEMENTATIONS.get(cls) if class_path is None: raise ValueError( "Unknown storage class `%s`. Supported: %s" % (cls, ", ".join(STORAGE_IMPLEMENTATIONS)) ) (module_path, class_name) = class_path.rsplit(".", 1) module = importlib.import_module(module_path, package=__package__) Storage = getattr(module, class_name) - return Storage(**kwargs) + check_config = kwargs.pop("check_config", {}) + storage = Storage(**kwargs) + if check_config: + if not storage.check_config(**check_config): + raise EnvironmentError("storage check config failed") + return storage -def get_storage_pipeline(steps: List[Dict[str, Any]]) -> StorageInterface: +def get_storage_pipeline( + steps: List[Dict[str, Any]], check_config=None +) -> StorageInterface: """Recursively get a storage object that may use other storage objects as backends. Args: steps (List[dict]): List of dicts that may be used as kwargs for `get_storage`. Returns: an instance of swh.storage.Storage or compatible class Raises: ValueError if passed an unknown storage class. """ storage_config = None for step in reversed(steps): if "args" in step: warnings.warn( 'Explicit "args" key is deprecated, use keys directly ' "instead.", DeprecationWarning, ) step = { "cls": step["cls"], **step["args"], } if storage_config: step["storage"] = storage_config + step["check_config"] = check_config storage_config = step if storage_config is None: raise ValueError("'pipeline' has no steps.") return get_storage(**storage_config) diff --git a/swh/storage/pytest_plugin.py b/swh/storage/pytest_plugin.py index 90a5dd3e..7e7fbfdb 100644 --- a/swh/storage/pytest_plugin.py +++ b/swh/storage/pytest_plugin.py @@ -1,200 +1,201 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import glob from os import path, environ from typing import Union import pytest import swh.storage from pytest_postgresql import factories from pytest_postgresql.janitor import DatabaseJanitor, psycopg2, Version from swh.core.utils import numfile_sortkey as sortkey from swh.storage import get_storage from swh.storage.tests.storage_data import StorageData SQL_DIR = path.join(path.dirname(swh.storage.__file__), "sql") environ["LC_ALL"] = "C.UTF-8" DUMP_FILES = path.join(SQL_DIR, "*.sql") @pytest.fixture def swh_storage_backend_config(postgresql_proc, swh_storage_postgresql): """Basic pg storage configuration with no journal collaborator (to avoid pulling optional dependency on clients of this fixture) """ yield { "cls": "local", "db": "postgresql://{user}@{host}:{port}/{dbname}".format( host=postgresql_proc.host, port=postgresql_proc.port, user="postgres", dbname="tests", ), "objstorage": {"cls": "memory", "args": {}}, + "check_config": {"check_write": True}, } @pytest.fixture def swh_storage(swh_storage_backend_config): return get_storage(**swh_storage_backend_config) # the postgres_fact factory fixture below is mostly a copy of the code # from pytest-postgresql. We need a custom version here to be able to # specify our version of the DBJanitor we use. def postgresql_fact(process_fixture_name, db_name=None, dump_files=DUMP_FILES): @pytest.fixture def postgresql_factory(request): """ Fixture factory for PostgreSQL. :param FixtureRequest request: fixture request object :rtype: psycopg2.connection :returns: postgresql client """ config = factories.get_config(request) if not psycopg2: raise ImportError("No module named psycopg2. Please install it.") proc_fixture = request.getfixturevalue(process_fixture_name) # _, config = try_import('psycopg2', request) pg_host = proc_fixture.host pg_port = proc_fixture.port pg_user = proc_fixture.user pg_options = proc_fixture.options pg_db = db_name or config["dbname"] with SwhDatabaseJanitor( pg_user, pg_host, pg_port, pg_db, proc_fixture.version, dump_files=dump_files, ): connection = psycopg2.connect( dbname=pg_db, user=pg_user, host=pg_host, port=pg_port, options=pg_options, ) yield connection connection.close() return postgresql_factory swh_storage_postgresql = postgresql_fact("postgresql_proc") # This version of the DatabaseJanitor implement a different setup/teardown # behavior than than the stock one: instead of dropping, creating and # initializing the database for each test, it create and initialize the db only # once, then it truncate the tables. This is needed to have acceptable test # performances. class SwhDatabaseJanitor(DatabaseJanitor): def __init__( self, user: str, host: str, port: str, db_name: str, version: Union[str, float, Version], dump_files: str = DUMP_FILES, ) -> None: super().__init__(user, host, port, db_name, version) self.dump_files = sorted(glob.glob(dump_files), key=sortkey) def db_setup(self): with psycopg2.connect( dbname=self.db_name, user=self.user, host=self.host, port=self.port, ) as cnx: with cnx.cursor() as cur: for fname in self.dump_files: with open(fname) as fobj: sql = fobj.read().replace("concurrently", "").strip() if sql: cur.execute(sql) cnx.commit() def db_reset(self): with psycopg2.connect( dbname=self.db_name, user=self.user, host=self.host, port=self.port, ) as cnx: with cnx.cursor() as cur: cur.execute( "SELECT table_name FROM information_schema.tables " "WHERE table_schema = %s", ("public",), ) tables = set(table for (table,) in cur.fetchall()) - {"dbversion"} for table in tables: cur.execute("truncate table %s cascade" % table) cur.execute( "SELECT sequence_name FROM information_schema.sequences " "WHERE sequence_schema = %s", ("public",), ) seqs = set(seq for (seq,) in cur.fetchall()) for seq in seqs: cur.execute("ALTER SEQUENCE %s RESTART;" % seq) cnx.commit() def init(self): with self.cursor() as cur: cur.execute( "SELECT COUNT(1) FROM pg_database WHERE datname=%s;", (self.db_name,) ) db_exists = cur.fetchone()[0] == 1 if db_exists: cur.execute( "UPDATE pg_database SET datallowconn=true " "WHERE datname = %s;", (self.db_name,), ) if db_exists: self.db_reset() else: with self.cursor() as cur: cur.execute('CREATE DATABASE "{}";'.format(self.db_name)) self.db_setup() def drop(self): pid_column = "pid" with self.cursor() as cur: cur.execute( "UPDATE pg_database SET datallowconn=false " "WHERE datname = %s;", (self.db_name,), ) cur.execute( "SELECT pg_terminate_backend(pg_stat_activity.{})" "FROM pg_stat_activity " "WHERE pg_stat_activity.datname = %s;".format(pid_column), (self.db_name,), ) @pytest.fixture def sample_data() -> StorageData: """Pre-defined sample storage object data to manipulate Returns: StorageData whose attribute keys are data model objects. Either multiple objects: contents, directories, revisions, releases, ... or simple ones: content, directory, revision, release, ... """ return StorageData() diff --git a/swh/storage/tests/test_init.py b/swh/storage/tests/test_init.py index de487261..1b6bdb28 100644 --- a/swh/storage/tests/test_init.py +++ b/swh/storage/tests/test_init.py @@ -1,107 +1,238 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from unittest.mock import patch +from swh.core.pytest_plugin import RPCTestAdapter from swh.storage import get_storage - -from swh.storage.api.client import RemoteStorage +from swh.storage.api import server +from swh.storage.api import client from swh.storage.postgresql.storage import Storage as DbStorage from swh.storage.in_memory import InMemoryStorage from swh.storage.buffer import BufferingProxyStorage from swh.storage.filter import FilteringProxyStorage from swh.storage.retry import RetryingProxyStorage -@patch("swh.storage.postgresql.storage.psycopg2.pool") -def test_get_storage(mock_pool): - """Instantiating an existing storage should be ok - - """ - mock_pool.ThreadedConnectionPool.return_value = None - for cls, real_class, dummy_args in [ - ("remote", RemoteStorage, {"url": "url"}), +STORAGES = [ + pytest.param(cls, real_class, kwargs, id=cls) + for (cls, real_class, kwargs) in [ + ("remote", client.RemoteStorage, {"url": "url"}), ("memory", InMemoryStorage, {}), ( "local", DbStorage, - {"db": "postgresql://db", "objstorage": {"cls": "memory", "args": {},},}, + {"db": "postgresql://db", "objstorage": {"cls": "memory", "args": {}}}, ), ("filter", FilteringProxyStorage, {"storage": {"cls": "memory"}}), ("buffer", BufferingProxyStorage, {"storage": {"cls": "memory"}}), ("retry", RetryingProxyStorage, {"storage": {"cls": "memory"}}), - ]: - actual_storage = get_storage(cls, **dummy_args) - assert actual_storage is not None - assert isinstance(actual_storage, real_class) + ] +] + + +@pytest.mark.parametrize("cls,real_class,args", STORAGES) +@patch("swh.storage.postgresql.storage.psycopg2.pool") +def test_get_storage(mock_pool, cls, real_class, args): + """Instantiating an existing storage should be ok + + """ + mock_pool.ThreadedConnectionPool.return_value = None + actual_storage = get_storage(cls, **args) + assert actual_storage is not None + assert isinstance(actual_storage, real_class) +@pytest.mark.parametrize("cls,real_class,args", STORAGES) @patch("swh.storage.postgresql.storage.psycopg2.pool") -def test_get_storage_legacy_args(mock_pool): +def test_get_storage_legacy_args(mock_pool, cls, real_class, args): """Instantiating an existing storage should be ok even with the legacy explicit 'args' keys """ mock_pool.ThreadedConnectionPool.return_value = None - for cls, real_class, dummy_args in [ - ("remote", RemoteStorage, {"url": "url"}), - ("memory", InMemoryStorage, {}), - ( - "local", - DbStorage, - {"db": "postgresql://db", "objstorage": {"cls": "memory", "args": {},},}, - ), - ("filter", FilteringProxyStorage, {"storage": {"cls": "memory", "args": {}}}), - ("buffer", BufferingProxyStorage, {"storage": {"cls": "memory", "args": {}}}), - ]: - with pytest.warns(DeprecationWarning): - actual_storage = get_storage(cls, args=dummy_args) + with pytest.warns(DeprecationWarning): + actual_storage = get_storage(cls, args=args) assert actual_storage is not None assert isinstance(actual_storage, real_class) def test_get_storage_failure(): """Instantiating an unknown storage should raise """ with pytest.raises(ValueError, match="Unknown storage class `unknown`"): get_storage("unknown", args=[]) def test_get_storage_pipeline(): config = { "cls": "pipeline", "steps": [ {"cls": "filter",}, {"cls": "buffer", "min_batch_size": {"content": 10,},}, {"cls": "memory",}, ], } storage = get_storage(**config) assert isinstance(storage, FilteringProxyStorage) assert isinstance(storage.storage, BufferingProxyStorage) assert isinstance(storage.storage.storage, InMemoryStorage) def test_get_storage_pipeline_legacy_args(): config = { "cls": "pipeline", "steps": [ {"cls": "filter",}, {"cls": "buffer", "args": {"min_batch_size": {"content": 10,},}}, {"cls": "memory",}, ], } with pytest.warns(DeprecationWarning): storage = get_storage(**config) assert isinstance(storage, FilteringProxyStorage) assert isinstance(storage.storage, BufferingProxyStorage) assert isinstance(storage.storage.storage, InMemoryStorage) + + +# get_storage's check_config argument tests + +# the "remote" and "pipeline" cases are tested in dedicated test functions below +@pytest.mark.parametrize( + "cls,real_class,kwargs", [x for x in STORAGES if x.id not in ("remote", "local")] +) +def test_get_storage_check_config(cls, real_class, kwargs, monkeypatch): + """Instantiating an existing storage with check_config should be ok + + """ + check_backend_check_config(monkeypatch, dict(cls=cls, **kwargs)) + + +@patch("swh.storage.postgresql.storage.psycopg2.pool") +def test_get_storage_local_check_config(mock_pool, monkeypatch): + """Instantiating a local storage with check_config should be ok + + """ + mock_pool.ThreadedConnectionPool.return_value = None + check_backend_check_config( + monkeypatch, + { + "cls": "local", + "db": "postgresql://db", + "objstorage": {"cls": "memory", "args": {}}, + }, + backend_storage_cls=DbStorage, + ) + + +def test_get_storage_pipeline_check_config(monkeypatch): + """Test that the check_config option works as intended for a pipelined storage""" + config = { + "cls": "pipeline", + "steps": [ + {"cls": "filter",}, + {"cls": "buffer", "min_batch_size": {"content": 10,},}, + {"cls": "memory",}, + ], + } + check_backend_check_config( + monkeypatch, config, + ) + + +def test_get_storage_remote_check_config(monkeypatch): + """Test that the check_config option works as intended for a remote storage""" + + monkeypatch.setattr( + server, "storage", get_storage(cls="memory", journal_writer={"cls": "memory"}) + ) + test_client = server.app.test_client() + + class MockedRemoteStorage(client.RemoteStorage): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.session.adapters.clear() + self.session.mount("mock://", RPCTestAdapter(test_client)) + + monkeypatch.setattr(client, "RemoteStorage", MockedRemoteStorage) + + config = { + "cls": "remote", + "url": "mock://example.com", + } + check_backend_check_config( + monkeypatch, config, + ) + + +def check_backend_check_config( + monkeypatch, config, backend_storage_cls=InMemoryStorage +): + """Check the staged/indirect storage (pipeline or remote) works + as desired with regard to the check_config option of the get_storage() + factory function. + + If set, the check_config argument is used to call the Storage.check_config() at + instantiation time in the get_storage() factory function. This is supposed to be + passed through each step of the Storage pipeline until it reached the actual + backend's (typically in memory or local) check_config() method which will perform + the verification for read/write access to the backend storage. + + monkeypatch is supposed to be the monkeypatch pytest fixture to be used from the + calling test_ function. + + config is the config dict passed to get_storage() + + backend_storage_cls is the class of the backend storage to be mocked to + simulate the check_config behavior; it should then be the class of the + actual backend storage defined in the `config`. + """ + access = None + + def mockcheck(self, check_write=False): + if access == "none": + return False + if access == "read": + return check_write is False + if access == "write": + return True + + monkeypatch.setattr(backend_storage_cls, "check_config", mockcheck) + + # simulate no read nor write access to the underlying (memory) storage + access = "none" + # by default, no check, so no complain + assert get_storage(**config) + # if asked to check, complain + with pytest.raises(EnvironmentError): + get_storage(check_config={"check_write": False}, **config) + with pytest.raises(EnvironmentError): + get_storage(check_config={"check_write": True}, **config) + + # simulate no write access to the underlying (memory) storage + access = "read" + # by default, no check so no complain + assert get_storage(**config) + # if asked to check for read access, no complain + get_storage(check_config={"check_write": False}, **config) + # if asked to check for write access, complain + with pytest.raises(EnvironmentError): + get_storage(check_config={"check_write": True}, **config) + + # simulate read & write access to the underlying (memory) storage + access = "write" + # by default, no check so no complain + assert get_storage(**config) + # if asked to check for read access, no complain + get_storage(check_config={"check_write": False}, **config) + # if asked to check for write access, no complain + get_storage(check_config={"check_write": True}, **config)