Page MenuHomeSoftware Heritage

D6149.id22271.diff
No OneTemporary

D6149.id22271.diff

diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,3 +1,4 @@
swh.core[db,http] >= 0.14.0
+swh.counters >= v0.8.0
swh.model >= 2.1.0
swh.objstorage >= 0.2.2
diff --git a/swh/storage/__init__.py b/swh/storage/__init__.py
--- a/swh/storage/__init__.py
+++ b/swh/storage/__init__.py
@@ -19,11 +19,12 @@
# deprecated
"local": ".postgresql.storage.Storage",
# proxy storages
- "filter": ".proxies.filter.FilteringProxyStorage",
"buffer": ".proxies.buffer.BufferingProxyStorage",
+ "counter": ".proxies.counter.CountingProxyStorage",
+ "filter": ".proxies.filter.FilteringProxyStorage",
"retry": ".proxies.retry.RetryingProxyStorage",
- "validate": ".proxies.validate.ValidatingProxyStorage",
"tenacious": ".proxies.tenacious.TenaciousProxyStorage",
+ "validate": ".proxies.validate.ValidatingProxyStorage",
}
diff --git a/swh/storage/proxies/counter.py b/swh/storage/proxies/counter.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/proxies/counter.py
@@ -0,0 +1,66 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+from typing import Callable
+
+from swh.counters import get_counters
+from swh.counters.interface import CountersInterface
+from swh.storage import get_storage
+from swh.storage.interface import StorageInterface
+
# Names of storage object collections, with the origin-related names ordered
# from the longest to the shortest.
# NOTE(review): nothing in the visible code reads this constant -- the proxy
# derives the collection name from the called method's name instead. Confirm
# whether it is still needed or imported from elsewhere.
OBJECT_TYPES = [
    "content", "directory", "snapshot",
    "origin_visit_status", "origin_visit", "origin",
]
+
+
class CountingProxyStorage:
    """Counting Storage Proxy.

    This is in charge of adding objects directly to swh-counters, without
    going through Kafka/swh-journal.
    This is meant as a simple way to setup counters for experiments; production
    should use swh-journal to reduce load/latency of the storage server.

    Additionally, unlike the journal-based counting, it does not count persons
    or the number of origins per netloc.

    Every ``<collection>_add`` method of the wrapped storage is intercepted:
    the unique keys of the objects being added are first sent to the counters
    under the name ``<collection>``, then the call is forwarded to the wrapped
    storage. Any other attribute is delegated to the wrapped storage as is.

    Sample configuration use case for counting storage:

    .. code-block:: yaml

        storage:
          cls: counter
          counters:
            cls: remote
            url: http://counters.internal.staging.swh.network:5011/
          storage:
            cls: remote
            url: http://storage.internal.staging.swh.network:5002/

    """

    def __init__(self, counters, storage):
        """Instantiate the counters client and the wrapped storage.

        Args:
            counters: keyword arguments forwarded to ``get_counters``
            storage: keyword arguments forwarded to ``get_storage``
        """
        self.counters: CountersInterface = get_counters(**counters)
        self.storage: StorageInterface = get_storage(**storage)

    def __getattr__(self, key):
        """Resolve attributes which are not set on the proxy itself.

        Names ending in ``_add`` are returned wrapped by :meth:`_adder`, using
        the name without its ``_add`` suffix as the counters collection; all
        other names are looked up on the wrapped storage.

        Raises:
            AttributeError: if ``key`` is ``"storage"`` (see below), or if the
                wrapped storage does not have such an attribute.
        """
        if key == "storage":
            # Only reached when ``self.storage`` is not set on the instance;
            # failing right away prevents the ``self.storage`` lookups below
            # from re-entering this method endlessly.
            raise AttributeError(key)
        if key.endswith("_add"):
            # key[0:-4] strips the trailing "_add", e.g. "content_add" ->
            # "content".
            return self._adder(key[0:-4], getattr(self.storage, key))
        return getattr(self.storage, key)

    def _adder(self, collection: str, backend_function: Callable):
        """Wrap a backend ``*_add`` function so that it also feeds the counters.

        Args:
            collection: name under which the objects are counted
            backend_function: the wrapped storage's ``*_add`` callable

        Returns:
            a function taking the iterable of objects to add; it returns
            whatever ``backend_function`` returns.
        """

        def f(objs):
            # The objects are traversed twice (once to collect their keys, once
            # by the backend): materialize them so that a one-shot iterable is
            # not already exhausted when it reaches the backend.
            objs = list(objs)
            self.counters.add(collection, [obj.unique_key() for obj in objs])
            return backend_function(objs)

        return f
diff --git a/swh/storage/tests/test_counter.py b/swh/storage/tests/test_counter.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/tests/test_counter.py
@@ -0,0 +1,63 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import attr
+import pytest
+
+from swh.storage import get_storage
+
+
@pytest.fixture
def swh_storage():
    """Return a pipeline storage made of a counting proxy step, configured
    with "memory" counters, followed by a "memory" storage step."""
    counting_step = {"cls": "counter", "counters": {"cls": "memory"}}
    backend_step = {"cls": "memory"}
    return get_storage(cls="pipeline", steps=[counting_step, backend_step])
+
+
def test_counting_proxy_storage_content(swh_storage, sample_data):
    """Adding contents records their sha1 in the "content" counter, and reads
    still go through to the underlying storage."""

    def counted_keys():
        # Looked up afresh on every call, exactly like a direct access would.
        return swh_storage.counters.counters["content"]

    assert counted_keys() == set()

    swh_storage.content_add([sample_data.content])
    assert counted_keys() == {sample_data.content.sha1}

    swh_storage.content_add([sample_data.content2, sample_data.content3])
    expected_keys = {
        sample_data.content.sha1,
        sample_data.content2.sha1,
        sample_data.content3.sha1,
    }
    assert counted_keys() == expected_keys

    # A non-"_add" method must be delegated to the backend unchanged.
    found = swh_storage.content_find({"sha256": sample_data.content2.sha256})
    without_ctime = [attr.evolve(cnt, ctime=None) for cnt in found]
    assert without_ctime == [attr.evolve(sample_data.content2, data=None)]
+
+
def test_counting_proxy_storage_revision(swh_storage, sample_data):
    """Adding revisions records their id in the "revision" counter, and reads
    still go through to the underlying storage."""

    def counted_ids():
        # Looked up afresh on every call, exactly like a direct access would.
        return swh_storage.counters.counters["revision"]

    assert counted_ids() == set()

    swh_storage.revision_add([sample_data.revision])
    assert counted_ids() == {sample_data.revision.id}

    swh_storage.revision_add([sample_data.revision2, sample_data.revision3])
    expected_ids = {
        sample_data.revision.id,
        sample_data.revision2.id,
        sample_data.revision3.id,
    }
    assert counted_ids() == expected_ids

    # A non-"_add" method must be delegated to the backend unchanged.
    fetched = swh_storage.revision_get([sample_data.revision2.id])
    assert fetched == [sample_data.revision2]

File Metadata

Mime Type
text/plain
Expires
Thu, Jul 3, 4:02 PM (2 w, 3 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3215632

Event Timeline