diff --git a/requirements-swh.txt b/requirements-swh.txt
index 489c6ec6..1c2878ea 100644
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,3 +1,4 @@
 swh.core[db,http] >= 0.14.0
+swh.counters >= 0.8.0
 swh.model >= 2.1.0
 swh.objstorage >= 0.2.2
diff --git a/swh/storage/__init__.py b/swh/storage/__init__.py
index 0a76a202..cd342de8 100644
--- a/swh/storage/__init__.py
+++ b/swh/storage/__init__.py
@@ -1,121 +1,122 @@
 # Copyright (C) 2015-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import importlib
 from typing import TYPE_CHECKING, Any, Dict, List
 import warnings
 
 if TYPE_CHECKING:
     from .interface import StorageInterface
 
 STORAGE_IMPLEMENTATIONS = {
     "remote": ".api.client.RemoteStorage",
     "memory": ".in_memory.InMemoryStorage",
     "cassandra": ".cassandra.CassandraStorage",
     "postgresql": ".postgresql.storage.Storage",
     # deprecated
     "local": ".postgresql.storage.Storage",
     # proxy storages
-    "filter": ".proxies.filter.FilteringProxyStorage",
     "buffer": ".proxies.buffer.BufferingProxyStorage",
+    "counter": ".proxies.counter.CountingProxyStorage",
+    "filter": ".proxies.filter.FilteringProxyStorage",
     "retry": ".proxies.retry.RetryingProxyStorage",
-    "validate": ".proxies.validate.ValidatingProxyStorage",
     "tenacious": ".proxies.tenacious.TenaciousProxyStorage",
+    "validate": ".proxies.validate.ValidatingProxyStorage",
 }
 
 
 def get_storage(cls: str, **kwargs) -> "StorageInterface":
     """Get a storage object of class `storage_class` with arguments
     `storage_args`.
 
     Args:
         cls (str):
             storage's class, can be:
               - ``local`` to use a postgresql database
               - ``cassandra`` to use a cassandra database
               - ``remote`` to connect to a swh-storage server
               - ``memory`` for an in-memory storage, useful for fast tests
               - ``filter``, ``buffer``, ... to use specific storage "proxies", see their
                 respective documentations
         args (dict): dictionary with keys
 
     Returns:
         an instance of swh.storage.Storage or compatible class
 
     Raises:
         ValueError if passed an unknown storage class.
 
     """
     if "args" in kwargs:
         warnings.warn(
             'Explicit "args" key is deprecated, use keys directly instead.',
             DeprecationWarning,
         )
         kwargs = kwargs["args"]
 
     if cls == "pipeline":
         return get_storage_pipeline(**kwargs)
 
     if cls == "local":
         warnings.warn(
             'The "local" storage class is deprecated, use "postgresql" instead.',
             DeprecationWarning,
         )
 
     class_path = STORAGE_IMPLEMENTATIONS.get(cls)
     if class_path is None:
         raise ValueError(
             "Unknown storage class `%s`. Supported: %s"
             % (cls, ", ".join(STORAGE_IMPLEMENTATIONS))
         )
 
     (module_path, class_name) = class_path.rsplit(".", 1)
     module = importlib.import_module(module_path, package=__package__)
     Storage = getattr(module, class_name)
     check_config = kwargs.pop("check_config", {})
     storage = Storage(**kwargs)
     if check_config:
         if not storage.check_config(**check_config):
             raise EnvironmentError("storage check config failed")
     return storage
 
 
 def get_storage_pipeline(
     steps: List[Dict[str, Any]], check_config=None
 ) -> "StorageInterface":
     """Recursively get a storage object that may use other storage objects
     as backends.
 
     Args:
         steps (List[dict]): List of dicts that may be used as kwargs for
             `get_storage`.
 
     Returns:
         an instance of swh.storage.Storage or compatible class
 
     Raises:
         ValueError if passed an unknown storage class.
 
     """
     storage_config = None
     for step in reversed(steps):
         if "args" in step:
             warnings.warn(
                 'Explicit "args" key is deprecated, use keys directly ' "instead.",
                 DeprecationWarning,
             )
             step = {
                 "cls": step["cls"],
                 **step["args"],
             }
         if storage_config:
             step["storage"] = storage_config
         step["check_config"] = check_config
         storage_config = step
 
     if storage_config is None:
         raise ValueError("'pipeline' has no steps.")
 
     return get_storage(**storage_config)
diff --git a/swh/storage/proxies/counter.py b/swh/storage/proxies/counter.py
new file mode 100644
index 00000000..40633b27
--- /dev/null
+++ b/swh/storage/proxies/counter.py
@@ -0,0 +1,88 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+from typing import Callable
+
+from swh.counters import get_counters
+from swh.counters.interface import CountersInterface
+from swh.storage import get_storage
+from swh.storage.interface import StorageInterface
+
+OBJECT_TYPES = [
+    "content",
+    "directory",
+    "snapshot",
+    "origin_visit_status",
+    "origin_visit",
+    "origin",
+]
+
+
+class CountingProxyStorage:
+    """Counting Storage Proxy.
+
+    This is in charge of adding objects directly to swh-counters, without
+    going through Kafka/swh-journal.
+    This is meant as a simple way to setup counters for experiments; production
+    should use swh-journal to reduce load/latency of the storage server.
+
+    Additionally, unlike the journal-based counting, it does not count persons
+    or the number of origins per netloc.
+
+    Sample configuration use case for counting storage:
+
+    .. code-block:: yaml
+
+        storage:
+          cls: counter
+          counters:
+            cls: remote
+            url: http://counters.internal.staging.swh.network:5011/
+          storage:
+            cls: remote
+            url: http://storage.internal.staging.swh.network:5002/
+
+    """
+
+    def __init__(self, counters, storage):
+        """Instantiate the counters client and the proxied storage.
+
+        Args:
+            counters: keyword arguments passed to
+                :func:`swh.counters.get_counters`
+            storage: keyword arguments passed to
+                :func:`swh.storage.get_storage`
+
+        """
+        self.counters: CountersInterface = get_counters(**counters)
+        self.storage: StorageInterface = get_storage(**storage)
+
+    def __getattr__(self, key):
+        """Forward attribute lookups to the proxied storage.
+
+        Attributes whose name ends with ``_add`` are wrapped so that the
+        added objects are also sent to the counters.
+
+        """
+        if key == "storage":
+            # ``__getattr__`` only runs when regular lookup failed; without
+            # this guard a missing ``storage`` attribute would recurse forever.
+            raise AttributeError(key)
+        if key.endswith("_add"):
+            return self._adder(key[0:-4], getattr(self.storage, key))
+        return getattr(self.storage, key)
+
+    def _adder(self, collection: str, backend_function: Callable):
+        """Wrap ``backend_function`` to also count objects in ``collection``."""
+
+        def f(objs):
+            # ``objs`` is traversed twice (counters, then backend): materialize
+            # it so that one-shot iterators are not exhausted by the counting.
+            objs = list(objs)
+            self.counters.add(collection, [obj.unique_key() for obj in objs])
+            return backend_function(objs)
+
+        return f
diff --git a/swh/storage/tests/test_counter.py b/swh/storage/tests/test_counter.py
new file mode 100644
index 00000000..c44adee9
--- /dev/null
+++ b/swh/storage/tests/test_counter.py
@@ -0,0 +1,74 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import attr
+import pytest
+
+from swh.storage import get_storage
+
+
+@pytest.fixture
+def swh_storage():
+    """Counting proxy (in-memory counters) on top of an in-memory storage."""
+    storage_config = {
+        "cls": "pipeline",
+        "steps": [
+            {"cls": "counter", "counters": {"cls": "memory"}},
+            {"cls": "memory"},
+        ],
+    }
+
+    return get_storage(**storage_config)
+
+
+def test_counting_proxy_storage_content(swh_storage, sample_data):
+    """Added contents are counted by sha1 and still reach the backend."""
+    assert swh_storage.counters.counters["content"] == set()
+
+    swh_storage.content_add([sample_data.content])
+
+    assert swh_storage.counters.counters["content"] == {sample_data.content.sha1}
+
+    swh_storage.content_add([sample_data.content2, sample_data.content3])
+
+    assert swh_storage.counters.counters["content"] == {
+        sample_data.content.sha1,
+        sample_data.content2.sha1,
+        sample_data.content3.sha1,
+    }
+
+    assert [
+        attr.evolve(cnt, ctime=None)
+        for cnt in swh_storage.content_find({"sha256": sample_data.content2.sha256})
+    ] == [attr.evolve(sample_data.content2, data=None)]
+
+
+def test_counting_proxy_storage_revision(swh_storage, sample_data):
+    """Added revisions are counted by id and still reach the backend."""
+    assert swh_storage.counters.counters["revision"] == set()
+
+    swh_storage.revision_add([sample_data.revision])
+
+    assert swh_storage.counters.counters["revision"] == {sample_data.revision.id}
+
+    swh_storage.revision_add([sample_data.revision2, sample_data.revision3])
+
+    assert swh_storage.counters.counters["revision"] == {
+        sample_data.revision.id,
+        sample_data.revision2.id,
+        sample_data.revision3.id,
+    }
+
+    assert swh_storage.revision_get([sample_data.revision2.id]) == [
+        sample_data.revision2
+    ]
+
+
+def test_counting_proxy_storage_iterator(swh_storage, sample_data):
+    """A one-shot iterator is both counted and forwarded to the backend."""
+    swh_storage.content_add(iter([sample_data.content]))
+
+    assert swh_storage.counters.counters["content"] == {sample_data.content.sha1}
+    assert swh_storage.content_find({"sha256": sample_data.content.sha256})