D5584.id20139.diff
diff --git a/swh/storage/cassandra/model.py b/swh/storage/cassandra/model.py
--- a/swh/storage/cassandra/model.py
+++ b/swh/storage/cassandra/model.py
@@ -68,7 +68,12 @@
@dataclasses.dataclass
class ContentRow(BaseRow):
TABLE = "content"
- PARTITION_KEY = ("sha1", "sha1_git", "sha256", "blake2s256")
+ PARTITION_KEY: ClassVar[Tuple[str, ...]] = (
+ "sha1",
+ "sha1_git",
+ "sha256",
+ "blake2s256",
+ )
sha1: bytes
sha1_git: bytes
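
The model.py hunk above turns the bare PARTITION_KEY assignment into an annotated ClassVar. A minimal, self-contained sketch of why the ClassVar annotation matters for a dataclass (toy class names, not taken from this diff): it lets the attribute carry a type annotation and be overridden by subclasses without dataclasses treating it as an instance field.

import dataclasses
from typing import ClassVar, Tuple

@dataclasses.dataclass
class Base:
    # Class-level constant; ClassVar keeps it out of the generated fields.
    PARTITION_KEY: ClassVar[Tuple[str, ...]] = ("sha1",)
    sha1: bytes

@dataclasses.dataclass
class Extended(Base):
    # A subclass can override the constant and add new instance fields.
    PARTITION_KEY: ClassVar[Tuple[str, ...]] = ("sha1", "byte_xor")
    byte_xor: bytes

# Only the annotated non-ClassVar attributes become dataclass fields:
assert [f.name for f in dataclasses.fields(Extended)] == ["sha1", "byte_xor"]
assert Extended.PARTITION_KEY == ("sha1", "byte_xor")
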
diff --git a/swh/storage/tests/test_cassandra_migration.py b/swh/storage/tests/test_cassandra_migration.py
--- a/swh/storage/tests/test_cassandra_migration.py
+++ b/swh/storage/tests/test_cassandra_migration.py
@@ -34,6 +34,9 @@
swh_storage_backend_config,
)
+##############################
+# Common structures
+
def byte_xor_hash(data):
# Behold, a one-line hash function:
@@ -47,14 +50,22 @@
byte_xor = attr.ib(type=bytes, default=None)
+##############################
+# Test simple migrations
+
+
@dataclasses.dataclass
class ContentRowWithXor(ContentRow):
- """An hypothetical upgrade of ContentRow with an extra "hash"."""
+    """A hypothetical upgrade of ContentRow with an extra "hash",
+    but not in the primary key."""
byte_xor: bytes
class CqlRunnerWithXor(CqlRunner):
+    """A hypothetical upgrade of CqlRunner to support ContentRowWithXor,
+    whose extra "hash" is not part of the primary key."""
+
@_prepared_select_statement(
ContentRowWithXor,
f"WHERE {' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS))}",
@@ -74,9 +85,12 @@
return None
@_prepared_select_statement(
- ContentRowWithXor, f"WHERE token({', '.join(ContentRow.PARTITION_KEY)}) = ?"
+ ContentRowWithXor,
+ f"WHERE token({', '.join(ContentRowWithXor.PARTITION_KEY)}) = ?",
)
- def content_get_from_token(self, token, *, statement) -> Iterable[ContentRow]:
+ def content_get_from_token(
+ self, token, *, statement
+ ) -> Iterable[ContentRowWithXor]:
return map(
ContentRowWithXor.from_dict, self._execute_with_retries(statement, [token])
)
@@ -161,3 +175,167 @@
assert swh_storage.content_find({"byte_xor": content_xor_hash}) == [
attr.evolve(new_content, data=None)
]
+
+
+##############################
+# Test complex migrations
+
+
+@dataclasses.dataclass
+class ContentRowWithXorPK(ContentRow):
+    """A hypothetical upgrade of ContentRow with an extra "hash",
+    which is part of the primary key."""
+
+ TABLE = "content_v2"
+ PARTITION_KEY = ("sha1", "sha1_git", "sha256", "blake2s256", "byte_xor")
+
+ byte_xor: bytes
+
+
+class CqlRunnerWithXorPK(CqlRunner):
+    """A hypothetical upgrade of CqlRunner to support ContentRowWithXorPK,
+    whose extra "hash" is part of the primary key."""
+
+ @_prepared_select_statement(
+ ContentRowWithXorPK,
+ f"WHERE {' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS))} AND byte_xor=?",
+ )
+ def content_get_from_pk(
+ self, content_hashes: Dict[str, bytes], *, statement
+ ) -> Optional[ContentRow]:
+ rows = list(
+ self._execute_with_retries(
+ statement,
+ [content_hashes[algo] for algo in HASH_ALGORITHMS + ["byte_xor"]],
+ )
+ )
+ assert len(rows) <= 1
+ if rows:
+ return ContentRowWithXorPK(**rows[0])
+ else:
+ return None
+
+ @_prepared_select_statement(
+ ContentRowWithXorPK,
+ f"WHERE token({', '.join(ContentRowWithXorPK.PARTITION_KEY)}) = ?",
+ )
+ def content_get_from_token(
+ self, token, *, statement
+ ) -> Iterable[ContentRowWithXorPK]:
+ return map(
+ ContentRowWithXorPK.from_dict,
+ self._execute_with_retries(statement, [token]),
+ )
+
+ # Redecorate content_add_prepare with the new ContentRow class
+ content_add_prepare = _prepared_insert_statement(ContentRowWithXorPK)( # type: ignore # noqa
+ CqlRunner.content_add_prepare.__wrapped__ # type: ignore
+ )
+
+
+def test_change_content_pk(
+ swh_storage: CassandraStorage, swh_storage_backend_config, mocker # noqa
+) -> None:
+    """Adds a column to the 'content' table and a new matching index,
+    and makes this new column part of the primary key.
+    This is a complex migration, as it requires copying the whole table.
+    """
+ content_xor_hash = byte_xor_hash(StorageData.content.data)
+
+ # First insert some existing data
+ swh_storage.content_add([StorageData.content, StorageData.content2])
+
+ # Then add a new table and a new index
+ swh_storage._cql_runner._session.execute(
+ """
+ CREATE TABLE IF NOT EXISTS content_v2 (
+ sha1 blob,
+ sha1_git blob,
+ sha256 blob,
+ blake2s256 blob,
+ byte_xor blob,
+ length bigint,
+ ctime timestamp,
+ -- creation time, i.e. time of (first) injection into the storage
+ status ascii,
+ PRIMARY KEY ((sha1, sha1_git, sha256, blake2s256, byte_xor))
+ );"""
+ )
+ for statement in CONTENT_INDEX_TEMPLATE.split("\n\n"):
+ swh_storage._cql_runner._session.execute(statement.format(main_algo="byte_xor"))
+
+ # Should not affect the running code at all:
+ assert swh_storage.content_get([StorageData.content.sha1]) == [
+ attr.evolve(StorageData.content, data=None)
+ ]
+ with pytest.raises(StorageArgumentException):
+ swh_storage.content_find({"byte_xor": content_xor_hash})
+
+ # Then update the running code:
+ new_hash_algos = HASH_ALGORITHMS + ["byte_xor"]
+ mocker.patch("swh.storage.cassandra.storage.HASH_ALGORITHMS", new_hash_algos)
+ mocker.patch("swh.storage.cassandra.cql.HASH_ALGORITHMS", new_hash_algos)
+ mocker.patch("swh.model.model.DEFAULT_ALGORITHMS", new_hash_algos)
+ mocker.patch("swh.storage.cassandra.storage.Content", ContentWithXor)
+ mocker.patch("swh.storage.cassandra.storage.ContentRow", ContentRowWithXorPK)
+ mocker.patch("swh.storage.cassandra.model.ContentRow", ContentRowWithXorPK)
+ mocker.patch("swh.storage.cassandra.storage.CqlRunner", CqlRunnerWithXorPK)
+
+ # Forge new objects with this extra hash:
+ new_content = ContentWithXor.from_dict(
+ {
+ "byte_xor": byte_xor_hash(StorageData.content.data),
+ **StorageData.content.to_dict(),
+ }
+ )
+ new_content2 = ContentWithXor.from_dict(
+ {
+ "byte_xor": byte_xor_hash(StorageData.content2.data),
+ **StorageData.content2.to_dict(),
+ }
+ )
+
+ # Replay to the new table.
+ # In production this would be done with a replayer reading from the journal,
+ # while loaders would still write to the DB.
+ overwriting_swh_storage = get_storage(
+ allow_overwrite=True, **swh_storage_backend_config
+ )
+ overwriting_swh_storage.content_add([new_content, new_content2])
+
+    # Old algos still work, and return the new object type;
+    # but the byte_xor value is None, because it is only available in the new
+    # table, which this storage is not yet configured to use.
+ assert swh_storage.content_get([StorageData.content.sha1]) == [
+ attr.evolve(new_content, data=None, byte_xor=None)
+ ]
+
+ # When the replayer gets close to the end of the logs, loaders are stopped
+ # to allow the replayer to catch up with the end of the log.
+ # When it does, we can switch over to the new swh-storage's code.
+
+ # Simulates a restart:
+ swh_storage._set_cql_runner()
+
+ # Now, the object can be found with the new hash:
+ assert swh_storage.content_find({"byte_xor": content_xor_hash}) == [
+ attr.evolve(new_content, data=None)
+ ]
+
+ # Remove the old table:
+ swh_storage._cql_runner._session.execute("DROP TABLE content")
+
+    # The object is still available, because the old table is not used anymore:
+ assert swh_storage.content_find({"byte_xor": content_xor_hash}) == [
+ attr.evolve(new_content, data=None)
+ ]
+
+ # THE END.
+
+ # Test teardown expects a table with this name to exist:
+ swh_storage._cql_runner._session.execute(
+ "CREATE TABLE content (foo blob PRIMARY KEY);"
+ )
+
+ # Clean up this table, test teardown does not know about it:
+ swh_storage._cql_runner._session.execute("DROP TABLE content_v2;")
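
The CqlRunnerWithXorPK class above also redecorates content_add_prepare: the undecorated method is reachable as __wrapped__ (typically set by functools.wraps), so the subclass can re-apply the decorator with the new row class. A minimal, self-contained sketch of the same trick with a toy decorator (none of these names come from swh-storage):

import functools

def tagged(tag):
    # Toy stand-in for a statement-preparing decorator.
    def decorator(f):
        @functools.wraps(f)  # exposes f as wrapper.__wrapped__
        def wrapper(*args, **kwargs):
            return (tag, f(*args, **kwargs))
        return wrapper
    return decorator

class Old:
    @tagged("content")
    def add(self, x):
        return x

class New(Old):
    # Re-decorate the original, unwrapped method with a different argument,
    # mirroring the content_add_prepare redecoration in the diff.
    add = tagged("content_v2")(Old.add.__wrapped__)

assert Old().add(1) == ("content", 1)
assert New().add(1) == ("content_v2", 1)
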
Attached To: D5584: cassandra: Add a test of a 'complex' migration, with a PK update