diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,3 +1,4 @@
 swh.core[db,http] >= 0.14.0
+swh.counters >= v0.8.0
 swh.model >= 2.1.0
 swh.objstorage >= 0.2.2
diff --git a/swh/storage/__init__.py b/swh/storage/__init__.py
--- a/swh/storage/__init__.py
+++ b/swh/storage/__init__.py
@@ -19,11 +19,12 @@
     # deprecated
     "local": ".postgresql.storage.Storage",
     # proxy storages
-    "filter": ".proxies.filter.FilteringProxyStorage",
     "buffer": ".proxies.buffer.BufferingProxyStorage",
+    "counter": ".proxies.counter.CountingProxyStorage",
+    "filter": ".proxies.filter.FilteringProxyStorage",
     "retry": ".proxies.retry.RetryingProxyStorage",
-    "validate": ".proxies.validate.ValidatingProxyStorage",
     "tenacious": ".proxies.tenacious.TenaciousProxyStorage",
+    "validate": ".proxies.validate.ValidatingProxyStorage",
 }
 
 
diff --git a/swh/storage/proxies/counter.py b/swh/storage/proxies/counter.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/proxies/counter.py
@@ -0,0 +1,90 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+import functools
+from typing import Callable
+
+from swh.counters import get_counters
+from swh.counters.interface import CountersInterface
+from swh.storage import get_storage
+from swh.storage.interface import StorageInterface
+
+OBJECT_TYPES = [
+    "content",
+    "directory",
+    "snapshot",
+    "origin_visit_status",
+    "origin_visit",
+    "origin",
+]
+
+
+class CountingProxyStorage:
+    """Counting Storage Proxy.
+
+    This is in charge of adding objects directly to swh-counters, without
+    going through Kafka/swh-journal.
+    This is meant as a simple way to set up counters for experiments; production
+    should use swh-journal to reduce load/latency of the storage server.
+
+    Every ``*_add`` endpoint of the backend storage is wrapped so that the
+    ``unique_key()`` of each added object is also sent to swh-counters, under
+    the collection named after the endpoint (without its ``_add`` suffix).
+    All other attributes are forwarded to the backend storage unchanged.
+
+    Additionally, unlike the journal-based counting, it does not count persons
+    or the number of origins per netloc.
+
+    Sample configuration use case for counting storage:
+
+    .. code-block:: yaml
+
+        storage:
+          cls: counter
+          counters:
+            cls: remote
+            url: http://counters.internal.staging.swh.network:5011/
+          storage:
+            cls: remote
+            url: http://storage.internal.staging.swh.network:5002/
+
+    """
+
+    def __init__(self, counters, storage):
+        self.counters: CountersInterface = get_counters(**counters)
+        self.storage: StorageInterface = get_storage(**storage)
+
+    def __getattr__(self, key):
+        # Only called for attributes not found on the proxy itself. Refuse
+        # "storage" explicitly: if it is not set (e.g. __init__ did not run),
+        # falling back to ``self.storage`` below would recurse forever.
+        if key == "storage":
+            raise AttributeError(key)
+        if key.endswith("_add"):
+            return self._adder(key[: -len("_add")], getattr(self.storage, key))
+        return getattr(self.storage, key)
+
+    def _adder(self, collection: str, backend_function: Callable) -> Callable:
+        """Wrap a backend ``*_add`` endpoint so added objects are also counted.
+
+        Args:
+            collection: name of the swh-counters collection to update
+            backend_function: the backend storage endpoint to call
+        """
+
+        @functools.wraps(backend_function)
+        def f(objs, *args, **kwargs):
+            # ``objs`` is iterated twice (by the backend, then to extract
+            # keys): materialize it so that a one-shot iterator is not
+            # exhausted by the first pass and silently skipped by the second.
+            objs = list(objs)
+            result = backend_function(objs, *args, **kwargs)
+            # Count only after the backend accepted the objects, so a failed
+            # insertion does not inflate the counters.
+            self.counters.add(collection, [obj.unique_key() for obj in objs])
+            return result
+
+        return f
diff --git a/swh/storage/tests/test_counter.py b/swh/storage/tests/test_counter.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/tests/test_counter.py
@@ -0,0 +1,73 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import attr
+import pytest
+
+from swh.storage import get_storage
+
+
+@pytest.fixture
+def swh_storage():
+    storage_config = {
+        "cls": "pipeline",
+        "steps": [
+            {"cls": "counter", "counters": {"cls": "memory"}},
+            {"cls": "memory"},
+        ],
+    }
+
+    return get_storage(**storage_config)
+
+
+def test_counting_proxy_storage_content(swh_storage, sample_data):
+    assert swh_storage.counters.counters["content"] == set()
+
+    swh_storage.content_add([sample_data.content])
+
+    assert swh_storage.counters.counters["content"] == {sample_data.content.sha1}
+
+    swh_storage.content_add([sample_data.content2, sample_data.content3])
+
+    assert swh_storage.counters.counters["content"] == {
+        sample_data.content.sha1,
+        sample_data.content2.sha1,
+        sample_data.content3.sha1,
+    }
+
+    assert [
+        attr.evolve(cnt, ctime=None)
+        for cnt in swh_storage.content_find({"sha256": sample_data.content2.sha256})
+    ] == [attr.evolve(sample_data.content2, data=None)]
+
+
+def test_counting_proxy_storage_revision(swh_storage, sample_data):
+    assert swh_storage.counters.counters["revision"] == set()
+
+    swh_storage.revision_add([sample_data.revision])
+
+    assert swh_storage.counters.counters["revision"] == {sample_data.revision.id}
+
+    swh_storage.revision_add([sample_data.revision2, sample_data.revision3])
+
+    assert swh_storage.counters.counters["revision"] == {
+        sample_data.revision.id,
+        sample_data.revision2.id,
+        sample_data.revision3.id,
+    }
+
+    assert swh_storage.revision_get([sample_data.revision2.id]) == [
+        sample_data.revision2
+    ]
+
+
+def test_counting_proxy_storage_iterator(swh_storage, sample_data):
+    # Objects given as a one-shot iterator must be both stored and counted.
+    swh_storage.revision_add(iter([sample_data.revision]))
+
+    assert swh_storage.counters.counters["revision"] == {sample_data.revision.id}
+    assert swh_storage.revision_get([sample_data.revision.id]) == [
+        sample_data.revision
+    ]