D6150: cassandra: Remove stat_counters.
D6150.id22256.diff (28 KB)

diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,3 +1,4 @@
 swh.core[db,http] >= 0.14.0
+swh.counters >= 0.8.0
 swh.model >= 2.1.0
 swh.objstorage >= 0.2.2
diff --git a/swh/storage/__init__.py b/swh/storage/__init__.py
--- a/swh/storage/__init__.py
+++ b/swh/storage/__init__.py
@@ -19,11 +19,12 @@
     # deprecated
     "local": ".postgresql.storage.Storage",
     # proxy storages
-    "filter": ".proxies.filter.FilteringProxyStorage",
     "buffer": ".proxies.buffer.BufferingProxyStorage",
+    "counter": ".proxies.counter.CountingProxyStorage",
+    "filter": ".proxies.filter.FilteringProxyStorage",
    "retry": ".proxies.retry.RetryingProxyStorage",
-    "validate": ".proxies.validate.ValidatingProxyStorage",
    "tenacious": ".proxies.tenacious.TenaciousProxyStorage",
+    "validate": ".proxies.validate.ValidatingProxyStorage",
 }
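
Note: `get_storage` resolves the `cls` key against the registry above, so
proxies can be stacked in front of a backend. A minimal sketch of composing
the new "counter" proxy with in-memory backends (mirroring the test fixture
added later in this diff):

    from swh.storage import get_storage

    storage = get_storage(
        cls="pipeline",
        steps=[
            # each step wraps the next one; "counter" records unique keys
            # in swh-counters, then forwards the call to the backend
            {"cls": "counter", "counters": {"cls": "memory"}},
            {"cls": "memory"},
        ],
    )
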
diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py
--- a/swh/storage/cassandra/cql.py
+++ b/swh/storage/cassandra/cql.py
@@ -7,6 +7,7 @@
 import dataclasses
 import datetime
 import functools
+import itertools
 import logging
 import random
 from typing import (
@@ -17,6 +18,7 @@
     Iterator,
     List,
     Optional,
+    Sequence,
     Tuple,
     Type,
     TypeVar,
@@ -25,6 +27,7 @@
 
 from cassandra import ConsistencyLevel, CoordinationFailure
 from cassandra.cluster import EXEC_PROFILE_DEFAULT, Cluster, ExecutionProfile, ResultSet
+from cassandra.concurrent import execute_concurrent_with_args
 from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy
 from cassandra.query import BoundStatement, PreparedStatement, dict_factory
 from mypy_extensions import NamedArg
@@ -59,7 +62,6 @@
     ExtIDRow,
     MetadataAuthorityRow,
     MetadataFetcherRow,
-    ObjectCountRow,
     OriginRow,
     OriginVisitRow,
     OriginVisitStatusRow,
@@ -85,6 +87,8 @@
 See <https://github.com/scylladb/scylla/pull/4797> for details and rationale.
 """
 
+BATCH_INSERT_MAX_SIZE = 1000
+
 logger = logging.getLogger(__name__)
@@ -166,6 +170,14 @@
 TSelf = TypeVar("TSelf")
 
 
+def _insert_query(row_class):
+    columns = row_class.cols()
+    return (
+        f"INSERT INTO {row_class.TABLE} ({', '.join(columns)}) "
+        f"VALUES ({', '.join('?' for _ in columns)})"
+    )
+
+
 def _prepared_insert_statement(
     row_class: Type[BaseRow],
 ) -> Callable[
@@ -174,11 +186,7 @@
 ]:
     """Shorthand for using `_prepared_statement` for `INSERT INTO`
     statements."""
-    columns = row_class.cols()
-    return _prepared_statement(
-        "INSERT INTO %s (%s) VALUES (%s)"
-        % (row_class.TABLE, ", ".join(columns), ", ".join("?" for _ in columns),)
-    )
+    return _prepared_statement(_insert_query(row_class))
 
 
 def _prepared_exists_statement(
@@ -280,22 +288,28 @@
         stop=stop_after_attempt(MAX_RETRIES),
         retry=retry_if_exception_type(CoordinationFailure),
     )
-    def _execute_with_retries(self, statement, args) -> ResultSet:
+    def _execute_with_retries(self, statement, args: Optional[Sequence]) -> ResultSet:
         return self._session.execute(statement, args, timeout=1000.0)
 
-    @_prepared_statement(
-        "UPDATE object_count SET count = count + ? "
-        "WHERE partition_key = 0 AND object_type = ?"
+    @retry(
+        wait=wait_random_exponential(multiplier=1, max=10),
+        stop=stop_after_attempt(MAX_RETRIES),
+        retry=retry_if_exception_type(CoordinationFailure),
     )
-    def _increment_counter(
-        self, object_type: str, nb: int, *, statement: PreparedStatement
-    ) -> None:
-        self._execute_with_retries(statement, [nb, object_type])
+    def _execute_many_with_retries(
+        self, statement, args_list: List[Tuple]
+    ) -> ResultSet:
+        return execute_concurrent_with_args(self._session, statement, args_list)
 
     def _add_one(self, statement, obj: BaseRow) -> None:
-        self._increment_counter(obj.TABLE, 1)
         self._execute_with_retries(statement, dataclasses.astuple(obj))
 
+    def _add_many(self, statement, objs: Sequence[BaseRow]) -> None:
+        tables = {obj.TABLE for obj in objs}
+        assert len(tables) == 1, f"Cannot insert to multiple tables: {tables}"
+        (table,) = tables
+        self._execute_many_with_retries(statement, list(map(dataclasses.astuple, objs)))
+
     _T = TypeVar("_T", bound=BaseRow)
 
     def _get_random_row(self, row_class: Type[_T], statement) -> Optional[_T]:  # noqa
@@ -328,7 +342,6 @@
         """Returned currified by content_add_prepare, to be called when the
         content row should be added to the primary table."""
         self._execute_with_retries(statement, None)
-        self._increment_counter("content", 1)
 
     @_prepared_insert_statement(ContentRow)
     def content_add_prepare(
@@ -482,7 +495,6 @@
         """Returned currified by skipped_content_add_prepare, to be called
         when the content row should be added to the primary table."""
         self._execute_with_retries(statement, None)
-        self._increment_counter("skipped_content", 1)
 
     @_prepared_insert_statement(SkippedContentRow)
     def skipped_content_add_prepare(
@@ -677,6 +689,56 @@
     def directory_entry_add_one(self, entry: DirectoryEntryRow, *, statement) -> None:
         self._add_one(statement, entry)
 
+    @_prepared_insert_statement(DirectoryEntryRow)
+    def directory_entry_add_concurrent(
+        self, entries: List[DirectoryEntryRow], *, statement
+    ) -> None:
+        if len(entries) == 0:
+            # nothing to do
+            return
+        assert (
+            len({entry.directory_id for entry in entries}) == 1
+        ), "directory_entry_add_concurrent must be called with entries for a single dir"
+        self._add_many(statement, entries)
+
+    def directory_entry_add_batch(self, entries: List[DirectoryEntryRow],) -> None:
+        if len(entries) == 0:
+            # nothing to do
+            return
+        assert (
+            len({entry.directory_id for entry in entries}) == 1
+        ), "directory_entry_add_batch must be called with entries for a single dir"
+
+        # query to INSERT one row
+        insert_query = _insert_query(DirectoryEntryRow) + ";\n"
+
+        # In "steady state", we insert batches of the maximum allowed size.
+        # Then, the last one has however many entries remain.
+        last_batch_size = len(entries) % BATCH_INSERT_MAX_SIZE
+        if len(entries) >= BATCH_INSERT_MAX_SIZE:
+            # TODO: the main_statement's size is statically known, so we could avoid
+            # re-preparing it on every call
+            main_statement = self._session.prepare(
+                "BEGIN UNLOGGED BATCH\n"
+                + insert_query * BATCH_INSERT_MAX_SIZE
+                + "APPLY BATCH"
+            )
+        last_statement = self._session.prepare(
+            "BEGIN UNLOGGED BATCH\n" + insert_query * last_batch_size + "APPLY BATCH"
+        )
+
+        for entry_group in grouper(entries, BATCH_INSERT_MAX_SIZE):
+            entry_group = list(map(dataclasses.astuple, entry_group))
+            if len(entry_group) == BATCH_INSERT_MAX_SIZE:
+                self._execute_with_retries(
+                    main_statement, list(itertools.chain.from_iterable(entry_group))
+                )
+            else:
+                assert len(entry_group) == last_batch_size
+                self._execute_with_retries(
+                    last_statement, list(itertools.chain.from_iterable(entry_group))
+                )
+
     @_prepared_select_statement(DirectoryEntryRow, "WHERE directory_id IN ?")
     def directory_entry_get(
         self, directory_ids, *, statement
@@ -1219,7 +1281,6 @@
         """Returned currified by extid_add_prepare, to be called when the
         extid row should be added to the primary table."""
         self._execute_with_retries(statement, None)
-        self._increment_counter("extid", 1)
 
     @_prepared_insert_statement(ExtIDRow)
     def extid_add_prepare(
@@ -1331,7 +1392,3 @@
     @_prepared_statement("SELECT uuid() FROM revision LIMIT 1;")
     def check_read(self, *, statement):
         self._execute_with_retries(statement, [])
-
-    @_prepared_select_statement(ObjectCountRow, "WHERE partition_key=0")
-    def stat_counters(self, *, statement) -> Iterable[ObjectCountRow]:
-        return map(ObjectCountRow.from_dict, self._execute_with_retries(statement, []))
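
The "batch" code path above prepares one statement for full-size batches and
one for the remainder, then feeds `grouper`-produced chunks to them. A minimal
sketch of the sizing arithmetic (`batch_sizes` is a hypothetical helper, not
part of the diff):

    BATCH_INSERT_MAX_SIZE = 1000

    def batch_sizes(n_entries: int) -> list:
        # full-size batches first, then one short batch for any remainder
        full, last = divmod(n_entries, BATCH_INSERT_MAX_SIZE)
        return [BATCH_INSERT_MAX_SIZE] * full + ([last] if last else [])

    assert batch_sizes(2500) == [1000, 1000, 500]
    assert batch_sizes(42) == [42]
    assert batch_sizes(2000) == [1000, 1000]  # exact multiple: no short batch
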
diff --git a/swh/storage/cassandra/model.py b/swh/storage/cassandra/model.py
--- a/swh/storage/cassandra/model.py
+++ b/swh/storage/cassandra/model.py
@@ -300,17 +300,6 @@
     authority_url: str
 
 
-@dataclasses.dataclass
-class ObjectCountRow(BaseRow):
-    TABLE = "object_count"
-    PARTITION_KEY = ("partition_key",)
-    CLUSTERING_KEY = ("object_type",)
-
-    partition_key: int
-    object_type: str
-    count: int
-
-
 @dataclasses.dataclass
 class ExtIDRow(BaseRow):
     TABLE = "extid"
diff --git a/swh/storage/cassandra/schema.py b/swh/storage/cassandra/schema.py
--- a/swh/storage/cassandra/schema.py
+++ b/swh/storage/cassandra/schema.py
@@ -267,13 +267,6 @@
     PRIMARY KEY ((id))
 );""",
     """
-CREATE TABLE IF NOT EXISTS object_count (
-    partition_key smallint,  -- Constant, must always be 0
-    object_type ascii,
-    count counter,
-    PRIMARY KEY ((partition_key), object_type)
-);""",
-    """
 CREATE TABLE IF NOT EXISTS extid (
     extid_type ascii,
     extid blob,
@@ -319,7 +312,6 @@
     "origin_visit",
     "origin",
     "raw_extrinsic_metadata",
-    "object_count",
     "origin_visit_status",
     "metadata_authority",
     "metadata_fetcher",
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -100,6 +100,7 @@
         journal_writer=None,
         allow_overwrite=False,
         consistency_level="ONE",
+        directory_entries_insert_algo="one-by-one",
     ):
         """
         A backend of swh-storage backed by Cassandra
@@ -120,6 +121,10 @@
                 Note that a ``False`` value does not guarantee there won't be
                 any overwrite.
             consistency_level: The default read/write consistency to use
+            directory_entries_insert_algo: Must be one of:
+                * one-by-one: naive, one INSERT per directory entry, serialized
+                * concurrent: one INSERT per directory entry, concurrent
+                * batch: using UNLOGGED BATCH to insert many entries in a few statements
         """
         self._hosts = hosts
         self._keyspace = keyspace
@@ -129,6 +134,7 @@
         self.journal_writer: JournalWriter = JournalWriter(journal_writer)
         self.objstorage: ObjStorage = ObjStorage(objstorage)
         self._allow_overwrite = allow_overwrite
+        self._directory_entries_insert_algo = directory_entries_insert_algo
 
     def _set_cql_runner(self):
         """Used by tests when they need to reset the CqlRunner"""
@@ -438,9 +444,21 @@
 
         for directory in directories:
             # Add directory entries to the 'directory_entry' table
-            for entry in directory.entries:
-                self._cql_runner.directory_entry_add_one(
-                    DirectoryEntryRow(directory_id=directory.id, **entry.to_dict())
+            rows = [
+                DirectoryEntryRow(directory_id=directory.id, **entry.to_dict())
+                for entry in directory.entries
+            ]
+            if self._directory_entries_insert_algo == "one-by-one":
+                for row in rows:
+                    self._cql_runner.directory_entry_add_one(row)
+            elif self._directory_entries_insert_algo == "concurrent":
+                self._cql_runner.directory_entry_add_concurrent(rows)
+            elif self._directory_entries_insert_algo == "batch":
+                self._cql_runner.directory_entry_add_batch(rows)
+            else:
+                raise ValueError(
+                    f"Unexpected value for directory_entries_insert_algo: "
+                    f"{self._directory_entries_insert_algo}"
                 )
 
             # Add the directory *after* adding all the entries, so someone
@@ -1272,20 +1290,7 @@
         return None
 
     def stat_counters(self):
-        rows = self._cql_runner.stat_counters()
-        keys = (
-            "content",
-            "directory",
-            "origin",
-            "origin_visit",
-            "release",
-            "revision",
-            "skipped_content",
-            "snapshot",
-        )
-        stats = {key: 0 for key in keys}
-        stats.update({row.object_type: row.count for row in rows})
-        return stats
+        raise NotImplementedError()
 
     def refresh_stat_counters(self):
         pass
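
A hypothetical configuration sketch for the new option, instantiating the
Cassandra backend through `get_storage` (host and keyspace names are
placeholders):

    from swh.storage import get_storage

    storage = get_storage(
        cls="cassandra",
        hosts=["cassandra-seed.example.org"],
        keyspace="swh",
        objstorage={"cls": "memory"},
        directory_entries_insert_algo="batch",  # or "one-by-one" / "concurrent"
    )
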
diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py
--- a/swh/storage/in_memory.py
+++ b/swh/storage/in_memory.py
@@ -34,7 +34,6 @@
     ExtIDRow,
     MetadataAuthorityRow,
     MetadataFetcherRow,
-    ObjectCountRow,
     OriginRow,
     OriginVisitRow,
     OriginVisitStatusRow,
@@ -171,14 +170,6 @@
         self._raw_extrinsic_metadata = Table(RawExtrinsicMetadataRow)
         self._raw_extrinsic_metadata_by_id = Table(RawExtrinsicMetadataByIdRow)
         self._extid = Table(ExtIDRow)
-        self._stat_counters = defaultdict(int)
-
-    def increment_counter(self, object_type: str, nb: int):
-        self._stat_counters[object_type] += nb
-
-    def stat_counters(self) -> Iterable[ObjectCountRow]:
-        for (object_type, count) in self._stat_counters.items():
-            yield ObjectCountRow(partition_key=0, object_type=object_type, count=count)
 
     ##########################
     # 'content' table
@@ -186,7 +177,6 @@
 
     def _content_add_finalize(self, content: ContentRow) -> None:
         self._contents.insert(content)
-        self.increment_counter("content", 1)
 
     def content_add_prepare(self, content: ContentRow):
         finalizer = functools.partial(self._content_add_finalize, content)
@@ -248,7 +238,6 @@
 
     def _skipped_content_add_finalize(self, content: SkippedContentRow) -> None:
         self._skipped_contents.insert(content)
-        self.increment_counter("skipped_content", 1)
 
     def skipped_content_add_prepare(self, content: SkippedContentRow):
         finalizer = functools.partial(self._skipped_content_add_finalize, content)
@@ -293,7 +282,6 @@
 
     def directory_add_one(self, directory: DirectoryRow) -> None:
         self._directories.insert(directory)
-        self.increment_counter("directory", 1)
 
     def directory_get_random(self) -> Optional[DirectoryRow]:
         return self._directories.get_random()
@@ -334,7 +322,6 @@
 
     def revision_add_one(self, revision: RevisionRow) -> None:
         self._revisions.insert(revision)
-        self.increment_counter("revision", 1)
 
     def revision_get_ids(self, revision_ids) -> Iterable[int]:
         for id_ in revision_ids:
@@ -374,7 +361,6 @@
 
     def release_add_one(self, release: ReleaseRow) -> None:
         self._releases.insert(release)
-        self.increment_counter("release", 1)
 
     def release_get(self, release_ids: List[str]) -> Iterable[ReleaseRow]:
         for id_ in release_ids:
@@ -398,7 +384,6 @@
 
     def snapshot_add_one(self, snapshot: SnapshotRow) -> None:
         self._snapshots.insert(snapshot)
-        self.increment_counter("snapshot", 1)
 
     def snapshot_get_random(self) -> Optional[SnapshotRow]:
         return self._snapshots.get_random()
@@ -452,7 +437,6 @@
 
     def origin_add_one(self, origin: OriginRow) -> None:
         self._origins.insert(origin)
-        self.increment_counter("origin", 1)
 
     def origin_get_by_sha1(self, sha1: bytes) -> Iterable[OriginRow]:
         return self._origins.get_from_partition_key((sha1,))
@@ -513,7 +497,6 @@
 
     def origin_visit_add_one(self, visit: OriginVisitRow) -> None:
         self._origin_visits.insert(visit)
-        self.increment_counter("origin_visit", 1)
 
     def origin_visit_get_one(
         self, origin_url: str, visit_id: int
@@ -558,7 +541,6 @@
 
     def origin_visit_status_add_one(self, visit_update: OriginVisitStatusRow) -> None:
         self._origin_visit_statuses.insert(visit_update)
-        self.increment_counter("origin_visit_status", 1)
 
     def origin_visit_status_get_latest(
         self, origin: str, visit: int,
@@ -588,7 +570,6 @@
 
     def metadata_authority_add(self, authority: MetadataAuthorityRow):
         self._metadata_authorities.insert(authority)
-        self.increment_counter("metadata_authority", 1)
 
     def metadata_authority_get(self, type, url) -> Optional[MetadataAuthorityRow]:
         return self._metadata_authorities.get_from_primary_key((url, type))
@@ -599,7 +580,6 @@
 
     def metadata_fetcher_add(self, fetcher: MetadataFetcherRow):
         self._metadata_fetchers.insert(fetcher)
-        self.increment_counter("metadata_fetcher", 1)
 
     def metadata_fetcher_get(self, name, version) -> Optional[MetadataAuthorityRow]:
         return self._metadata_fetchers.get_from_primary_key((name, version))
@@ -629,7 +609,6 @@
 
     def raw_extrinsic_metadata_add(self, raw_extrinsic_metadata):
         self._raw_extrinsic_metadata.insert(raw_extrinsic_metadata)
-        self.increment_counter("raw_extrinsic_metadata", 1)
 
     def raw_extrinsic_metadata_get_after_date(
         self,
@@ -682,7 +661,6 @@
     #########################
 
     def _extid_add_finalize(self, extid: ExtIDRow) -> None:
         self._extid.insert(extid)
-        self.increment_counter("extid", 1)
 
     def extid_add_prepare(self, extid: ExtIDRow):
         finalizer = functools.partial(self._extid_add_finalize, extid)
@@ -729,6 +707,7 @@
         self.reset()
         self.journal_writer = JournalWriter(journal_writer)
         self._allow_overwrite = False
+        self._directory_entries_insert_algo = "one-by-one"
 
     def reset(self):
         self._cql_runner = InMemoryCqlRunner()
diff --git a/swh/storage/proxies/counter.py b/swh/storage/proxies/counter.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/proxies/counter.py
@@ -0,0 +1,66 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+from typing import Callable
+
+from swh.counters import get_counters
+from swh.counters.interface import CountersInterface
+from swh.storage import get_storage
+from swh.storage.interface import StorageInterface
+
+OBJECT_TYPES = [
+    "content",
+    "directory",
+    "snapshot",
+    "origin_visit_status",
+    "origin_visit",
+    "origin",
+]
+
+
+class CountingProxyStorage:
+    """Counting Storage Proxy.
+
+    This is in charge of adding objects directly to swh-counters, without
+    going through Kafka/swh-journal.
+    This is meant as a simple way to set up counters for experiments;
+    production should use swh-journal to reduce load/latency of the storage server.
+
+    Additionally, unlike the journal-based counting, it does not count persons
+    or the number of origins per netloc.
+
+    Sample configuration use case for counting storage:
+
+    .. code-block:: yaml
+
+        storage:
+          cls: counter
+          counters:
+            cls: remote
+            url: http://counters.internal.staging.swh.network:5011/
+          storage:
+            cls: remote
+            url: http://storage.internal.staging.swh.network:5002/
+
+    """
+
+    def __init__(self, counters, storage):
+        self.counters: CountersInterface = get_counters(**counters)
+        self.storage: StorageInterface = get_storage(**storage)
+
+    def __getattr__(self, key):
+        if key == "storage":
+            raise AttributeError(key)
+        if key.endswith("_add"):
+            return self._adder(key[0:-4], getattr(self.storage, key))
+        return getattr(self.storage, key)
+
+    def _adder(self, collection: str, backend_function: Callable):
+        def f(objs):
+            self.counters.add(collection, [obj.unique_key() for obj in objs])
+            return backend_function(objs)
+
+        return f
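
Rough usage sketch for the proxy above, assuming the in-memory backends of
swh.storage and swh.counters: every `*_add` method is intercepted by
`__getattr__` and wrapped by `_adder`, so unique keys are registered with
swh-counters before the call is forwarded.

    proxy = CountingProxyStorage(
        counters={"cls": "memory"}, storage={"cls": "memory"}
    )
    # proxy.content_add(contents) is then equivalent to:
    #   proxy.counters.add("content", [c.unique_key() for c in contents])
    #   proxy.storage.content_add(contents)
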
diff --git a/swh/storage/tests/storage_tests.py b/swh/storage/tests/storage_tests.py
--- a/swh/storage/tests/storage_tests.py
+++ b/swh/storage/tests/storage_tests.py
@@ -38,6 +38,8 @@
     TargetType,
 )
 from swh.storage import get_storage
+from swh.storage.api.client import RemoteStorage
+from swh.storage.cassandra.storage import CassandraStorage
 from swh.storage.common import origin_url_to_sha1 as sha1
 from swh.storage.exc import HashCollision, StorageArgumentException
 from swh.storage.interface import ListOrder, PagedResult, StorageInterface
@@ -187,8 +189,9 @@
             assert obj.ctime <= insertion_end_time
             assert obj == expected_cont
 
-        swh_storage.refresh_stat_counters()
-        assert swh_storage.stat_counters()["content"] == 1
+        if not isinstance(swh_storage, (CassandraStorage, RemoteStorage)):
+            swh_storage.refresh_stat_counters()
+            assert swh_storage.stat_counters()["content"] == 1
 
     def test_content_add_from_lazy_content(self, swh_storage, sample_data):
         cont = sample_data.content
@@ -221,8 +224,9 @@
             assert obj.ctime <= insertion_end_time
             assert attr.evolve(obj, ctime=None).to_dict() == expected_cont.to_dict()
 
-        swh_storage.refresh_stat_counters()
-        assert swh_storage.stat_counters()["content"] == 1
+        if not isinstance(swh_storage, (CassandraStorage, RemoteStorage)):
+            swh_storage.refresh_stat_counters()
+            assert swh_storage.stat_counters()["content"] == 1
 
     def test_content_get_data_missing(self, swh_storage, sample_data):
         cont, cont2 = sample_data.contents[:2]
@@ -705,8 +709,9 @@
         after_missing = list(swh_storage.directory_missing([directory.id]))
         assert after_missing == []
 
-        swh_storage.refresh_stat_counters()
-        assert swh_storage.stat_counters()["directory"] == 1
+        if not isinstance(swh_storage, (CassandraStorage, RemoteStorage)):
+            swh_storage.refresh_stat_counters()
+            assert swh_storage.stat_counters()["directory"] == 1
 
     def test_directory_add_twice(self, swh_storage, sample_data):
         directory = sample_data.directories[1]
@@ -975,8 +980,9 @@
         actual_result = swh_storage.revision_add([revision])
         assert actual_result == {"revision:add": 0}
 
-        swh_storage.refresh_stat_counters()
-        assert swh_storage.stat_counters()["revision"] == 1
+        if not isinstance(swh_storage, (CassandraStorage, RemoteStorage)):
+            swh_storage.refresh_stat_counters()
+            assert swh_storage.stat_counters()["revision"] == 1
 
     def test_revision_add_twice(self, swh_storage, sample_data):
         revision, revision2 = sample_data.revisions[:2]
@@ -1376,8 +1382,9 @@
         actual_result = swh_storage.release_add([release, release2])
         assert actual_result == {"release:add": 0}
 
-        swh_storage.refresh_stat_counters()
-        assert swh_storage.stat_counters()["release"] == 2
+        if not isinstance(swh_storage, (CassandraStorage, RemoteStorage)):
+            swh_storage.refresh_stat_counters()
+            assert swh_storage.stat_counters()["release"] == 2
 
     def test_release_add_no_author_date(self, swh_storage, sample_data):
         full_release = sample_data.release
@@ -1482,8 +1489,9 @@
             [("origin", origin) for origin in origins]
         )
 
-        swh_storage.refresh_stat_counters()
-        assert swh_storage.stat_counters()["origin"] == len(origins)
+        if not isinstance(swh_storage, (CassandraStorage, RemoteStorage)):
+            swh_storage.refresh_stat_counters()
+            assert swh_storage.stat_counters()["origin"] == len(origins)
 
     def test_origin_add_twice(self, swh_storage, sample_data):
         origin, origin2 = sample_data.origins[:2]
@@ -1921,11 +1929,11 @@
             ]
         )
 
-        swh_storage.refresh_stat_counters()
-
-        stats = swh_storage.stat_counters()
-        assert stats["origin"] == len(origins)
-        assert stats["origin_visit"] == len(origins) * len(visits)
+        if not isinstance(swh_storage, (CassandraStorage, RemoteStorage)):
+            swh_storage.refresh_stat_counters()
+            stats = swh_storage.stat_counters()
+            assert stats["origin"] == len(origins)
+            assert stats["origin_visit"] == len(origins) * len(visits)
 
         random_ovs = swh_storage.origin_visit_status_get_random(visit_type)
         assert random_ovs
@@ -3122,8 +3130,9 @@
             "next_branch": None,
         }
 
-        swh_storage.refresh_stat_counters()
-        assert swh_storage.stat_counters()["snapshot"] == 2
+        if not isinstance(swh_storage, (CassandraStorage, RemoteStorage)):
+            swh_storage.refresh_stat_counters()
+            assert swh_storage.stat_counters()["snapshot"] == 2
 
     def test_snapshot_add_many_incremental(self, swh_storage, sample_data):
         snapshot, _, complete_snapshot = sample_data.snapshots[:3]
@@ -3623,6 +3632,8 @@
         assert list(missing_snapshots) == [missing_snapshot.id]
 
     def test_stat_counters(self, swh_storage, sample_data):
+        if isinstance(swh_storage, (CassandraStorage, RemoteStorage)):
+            pytest.skip("Cassandra backend does not support stat counters")
         origin = sample_data.origin
         snapshot = sample_data.snapshot
         revision = sample_data.revision
diff --git a/swh/storage/tests/test_cassandra.py b/swh/storage/tests/test_cassandra.py
--- a/swh/storage/tests/test_cassandra.py
+++ b/swh/storage/tests/test_cassandra.py
@@ -494,21 +494,18 @@
     # early for this test to be relevant
     swh_storage.journal_writer.journal = None
 
-    class MyException(Exception):
-        pass
-
     class CrashyEntry(DirectoryEntry):
         def __init__(self):
            pass
 
         def to_dict(self):
-            raise MyException()
+            return {**directory.entries[0].to_dict(), "perms": "abcde"}
 
     directory = sample_data.directory3
     entries = directory.entries
     directory = attr.evolve(directory, entries=entries + (CrashyEntry(),))
-    with pytest.raises(MyException):
+    with pytest.raises(TypeError):
         swh_storage.directory_add([directory])
 
     # This should have written some of the entries to the database:
diff --git a/swh/storage/tests/test_counter.py b/swh/storage/tests/test_counter.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/tests/test_counter.py
@@ -0,0 +1,63 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import attr
+import pytest
+
+from swh.storage import get_storage
+
+
+@pytest.fixture
+def swh_storage():
+    storage_config = {
+        "cls": "pipeline",
+        "steps": [
+            {"cls": "counter", "counters": {"cls": "memory"}},
+            {"cls": "memory"},
+        ],
+    }
+
+    return get_storage(**storage_config)
+
+
+def test_counting_proxy_storage_content(swh_storage, sample_data):
+    assert swh_storage.counters.counters["content"] == set()
+
+    swh_storage.content_add([sample_data.content])
+
+    assert swh_storage.counters.counters["content"] == {sample_data.content.sha1}
+
+    swh_storage.content_add([sample_data.content2, sample_data.content3])
+
+    assert swh_storage.counters.counters["content"] == {
+        sample_data.content.sha1,
+        sample_data.content2.sha1,
+        sample_data.content3.sha1,
+    }
+
+    assert [
+        attr.evolve(cnt, ctime=None)
+        for cnt in swh_storage.content_find({"sha256": sample_data.content2.sha256})
+    ] == [attr.evolve(sample_data.content2, data=None)]
+
+
+def test_counting_proxy_storage_revision(swh_storage, sample_data):
+    assert swh_storage.counters.counters["revision"] == set()
+
+    swh_storage.revision_add([sample_data.revision])
+
+    assert swh_storage.counters.counters["revision"] == {sample_data.revision.id}
+
+    swh_storage.revision_add([sample_data.revision2, sample_data.revision3])
+
+    assert swh_storage.counters.counters["revision"] == {
+        sample_data.revision.id,
+        sample_data.revision2.id,
+        sample_data.revision3.id,
+    }
+
+    assert swh_storage.revision_get([sample_data.revision2.id]) == [
+        sample_data.revision2
+    ]