D5584.id20139.diff

diff --git a/swh/storage/cassandra/model.py b/swh/storage/cassandra/model.py
--- a/swh/storage/cassandra/model.py
+++ b/swh/storage/cassandra/model.py
@@ -68,7 +68,12 @@
@dataclasses.dataclass
class ContentRow(BaseRow):
TABLE = "content"
- PARTITION_KEY = ("sha1", "sha1_git", "sha256", "blake2s256")
+ PARTITION_KEY: ClassVar[Tuple[str, ...]] = (
+ "sha1",
+ "sha1_git",
+ "sha256",
+ "blake2s256",
+ )
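+ # ClassVar gives PARTITION_KEY an explicit type for checkers while keeping it a
+ # class-level constant: dataclasses exclude ClassVar attributes from the generated
+ # fields, and subclasses (e.g. the tests' ContentRowWithXorPK) may override it.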
sha1: bytes
sha1_git: bytes
diff --git a/swh/storage/tests/test_cassandra_migration.py b/swh/storage/tests/test_cassandra_migration.py
--- a/swh/storage/tests/test_cassandra_migration.py
+++ b/swh/storage/tests/test_cassandra_migration.py
@@ -34,6 +34,9 @@
swh_storage_backend_config,
)
+##############################
+# Common structures
+
def byte_xor_hash(data):
# Behold, a one-line hash function:
@@ -47,14 +50,22 @@
byte_xor = attr.ib(type=bytes, default=None)
+##############################
+# Test simple migrations
+
+
@dataclasses.dataclass
class ContentRowWithXor(ContentRow):
- """An hypothetical upgrade of ContentRow with an extra "hash"."""
+ """An hypothetical upgrade of ContentRow with an extra "hash",
+ but not in the primary key."""
byte_xor: bytes
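+ # Since byte_xor is not part of PARTITION_KEY, the existing "content" table keeps
+ # its primary key; contrast with ContentRowWithXorPK below, which needs a new table.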
class CqlRunnerWithXor(CqlRunner):
+ """An hypothetical upgrade of ContentRow with an extra "hash",
+ but not in the primary key."""
+
@_prepared_select_statement(
ContentRowWithXor,
f"WHERE {' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS))}",
@@ -74,9 +85,12 @@
return None
@_prepared_select_statement(
- ContentRowWithXor, f"WHERE token({', '.join(ContentRow.PARTITION_KEY)}) = ?"
+ ContentRowWithXor,
+ f"WHERE token({', '.join(ContentRowWithXor.PARTITION_KEY)}) = ?",
)
- def content_get_from_token(self, token, *, statement) -> Iterable[ContentRow]:
+ def content_get_from_token(
+ self, token, *, statement
+ ) -> Iterable[ContentRowWithXor]:
return map(
ContentRowWithXor.from_dict, self._execute_with_retries(statement, [token])
)
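+ # Note: ContentRowWithXor inherits PARTITION_KEY unchanged from ContentRow, so the
+ # token() clause is identical; naming the subclass above is simply more consistent.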
@@ -161,3 +175,167 @@
assert swh_storage.content_find({"byte_xor": content_xor_hash}) == [
attr.evolve(new_content, data=None)
]
+
+
+##############################
+# Test complex migrations
+
+
+@dataclasses.dataclass
+class ContentRowWithXorPK(ContentRow):
+ """An hypothetical upgrade of ContentRow with an extra "hash",
+ in the primary key."""
+
+ TABLE = "content_v2"
+ PARTITION_KEY = ("sha1", "sha1_git", "sha256", "blake2s256", "byte_xor")
+
+ byte_xor: bytes
+
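+ # Cassandra cannot alter a table's primary key in place, so adding byte_xor to the
+ # partition key means writing to a brand new table ("content_v2") and copying all
+ # existing rows over; that is what makes this migration "complex".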
+
+class CqlRunnerWithXorPK(CqlRunner):
+ """An hypothetical upgrade of ContentRow with an extra "hash",
+ but not in the primary key."""
+
+ @_prepared_select_statement(
+ ContentRowWithXorPK,
+ f"WHERE {' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS))} AND byte_xor=?",
+ )
+ def content_get_from_pk(
+ self, content_hashes: Dict[str, bytes], *, statement
+ ) -> Optional[ContentRowWithXorPK]:
+ rows = list(
+ self._execute_with_retries(
+ statement,
+ [content_hashes[algo] for algo in HASH_ALGORITHMS + ["byte_xor"]],
+ )
+ )
+ assert len(rows) <= 1
+ if rows:
+ return ContentRowWithXorPK(**rows[0])
+ else:
+ return None
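+ # With byte_xor in the primary key, a point lookup must bind all five hash columns;
+ # the prepared statement above renders roughly to
+ # "WHERE sha1 = ? AND sha1_git = ? AND sha256 = ? AND blake2s256 = ? AND byte_xor=?".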
+
+ @_prepared_select_statement(
+ ContentRowWithXorPK,
+ f"WHERE token({', '.join(ContentRowWithXorPK.PARTITION_KEY)}) = ?",
+ )
+ def content_get_from_token(
+ self, token, *, statement
+ ) -> Iterable[ContentRowWithXorPK]:
+ return map(
+ ContentRowWithXorPK.from_dict,
+ self._execute_with_retries(statement, [token]),
+ )
+
+ # Redecorate content_add_prepare with the new ContentRow class
+ content_add_prepare = _prepared_insert_statement(ContentRowWithXorPK)( # type: ignore # noqa
+ CqlRunner.content_add_prepare.__wrapped__ # type: ignore
+ )
+
+
+def test_change_content_pk(
+ swh_storage: CassandraStorage, swh_storage_backend_config, mocker # noqa
+) -> None:
+ """Adds a column to the 'content' table and a new matching index;
+ and make this new column part of the primary key
+ This is a complex migration, as it requires copying the whole table
+ """
+ content_xor_hash = byte_xor_hash(StorageData.content.data)
+
+ # First insert some existing data
+ swh_storage.content_add([StorageData.content, StorageData.content2])
+
+ # Then add a new table and a new index
+ swh_storage._cql_runner._session.execute(
+ """
+ CREATE TABLE IF NOT EXISTS content_v2 (
+ sha1 blob,
+ sha1_git blob,
+ sha256 blob,
+ blake2s256 blob,
+ byte_xor blob,
+ length bigint,
+ ctime timestamp,
+ -- creation time, i.e. time of (first) injection into the storage
+ status ascii,
+ PRIMARY KEY ((sha1, sha1_git, sha256, blake2s256, byte_xor))
+ );"""
+ )
+ for statement in CONTENT_INDEX_TEMPLATE.split("\n\n"):
+ swh_storage._cql_runner._session.execute(statement.format(main_algo="byte_xor"))
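+ # CONTENT_INDEX_TEMPLATE presumably holds one blank-line-separated statement per
+ # lookup table/index; formatting it with main_algo="byte_xor" creates the index
+ # that content_find({"byte_xor": ...}) relies on later in this test.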
+
+ # Should not affect the running code at all:
+ assert swh_storage.content_get([StorageData.content.sha1]) == [
+ attr.evolve(StorageData.content, data=None)
+ ]
+ with pytest.raises(StorageArgumentException):
+ swh_storage.content_find({"byte_xor": content_xor_hash})
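+ # (the running code does not know the byte_xor algorithm yet, hence the exception)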
+
+ # Then update the running code:
+ new_hash_algos = HASH_ALGORITHMS + ["byte_xor"]
+ mocker.patch("swh.storage.cassandra.storage.HASH_ALGORITHMS", new_hash_algos)
+ mocker.patch("swh.storage.cassandra.cql.HASH_ALGORITHMS", new_hash_algos)
+ mocker.patch("swh.model.model.DEFAULT_ALGORITHMS", new_hash_algos)
+ mocker.patch("swh.storage.cassandra.storage.Content", ContentWithXor)
+ mocker.patch("swh.storage.cassandra.storage.ContentRow", ContentRowWithXorPK)
+ mocker.patch("swh.storage.cassandra.model.ContentRow", ContentRowWithXorPK)
+ mocker.patch("swh.storage.cassandra.storage.CqlRunner", CqlRunnerWithXorPK)
+
+ # Forge new objects with this extra hash:
+ new_content = ContentWithXor.from_dict(
+ {
+ "byte_xor": byte_xor_hash(StorageData.content.data),
+ **StorageData.content.to_dict(),
+ }
+ )
+ new_content2 = ContentWithXor.from_dict(
+ {
+ "byte_xor": byte_xor_hash(StorageData.content2.data),
+ **StorageData.content2.to_dict(),
+ }
+ )
+
+ # Replay to the new table.
+ # In production this would be done with a replayer reading from the journal,
+ # while loaders would still write to the DB.
+ overwriting_swh_storage = get_storage(
+ allow_overwrite=True, **swh_storage_backend_config
+ )
+ overwriting_swh_storage.content_add([new_content, new_content2])
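+ # allow_overwrite=True presumably skips the "object already exists" check, so the
+ # same contents can be written again, this time with their byte_xor column set.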
+
+ # Old algos still work and return the new object type;
+ # but the byte_xor value is None because it is only available in the new
+ # table, which this storage is not yet configured to use
+ assert swh_storage.content_get([StorageData.content.sha1]) == [
+ attr.evolve(new_content, data=None, byte_xor=None)
+ ]
+
+ # When the replayer gets close to the end of the logs, loaders are stopped
+ # to allow the replayer to catch up with the end of the log.
+ # When it does, we can switch over to the new swh-storage's code.
+
+ # Simulates a restart:
+ swh_storage._set_cql_runner()
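+ # Re-instantiating the CqlRunner picks up the patched CqlRunnerWithXorPK, so reads
+ # and writes now go through the content_v2 table.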
+
+ # Now, the object can be found with the new hash:
+ assert swh_storage.content_find({"byte_xor": content_xor_hash}) == [
+ attr.evolve(new_content, data=None)
+ ]
+
+ # Remove the old table:
+ swh_storage._cql_runner._session.execute("DROP TABLE content")
+
+ # The object is still available, because the dropped "content" table is not used anymore
+ assert swh_storage.content_find({"byte_xor": content_xor_hash}) == [
+ attr.evolve(new_content, data=None)
+ ]
+
+ # THE END.
+
+ # Test teardown expects a table with this name to exist:
+ swh_storage._cql_runner._session.execute(
+ "CREATE TABLE content (foo blob PRIMARY KEY);"
+ )
+
+ # Clean up this table, test teardown does not know about it:
+ swh_storage._cql_runner._session.execute("DROP TABLE content_v2;")
