diff --git a/swh/storage/cassandra/model.py b/swh/storage/cassandra/model.py
--- a/swh/storage/cassandra/model.py
+++ b/swh/storage/cassandra/model.py
@@ -68,7 +68,12 @@
 @dataclasses.dataclass
 class ContentRow(BaseRow):
     TABLE = "content"
-    PARTITION_KEY = ("sha1", "sha1_git", "sha256", "blake2s256")
+    PARTITION_KEY: ClassVar[Tuple[str, ...]] = (
+        "sha1",
+        "sha1_git",
+        "sha256",
+        "blake2s256",
+    )
 
     sha1: bytes
     sha1_git: bytes
diff --git a/swh/storage/tests/test_cassandra_migration.py b/swh/storage/tests/test_cassandra_migration.py
--- a/swh/storage/tests/test_cassandra_migration.py
+++ b/swh/storage/tests/test_cassandra_migration.py
@@ -34,6 +34,9 @@
     swh_storage_backend_config,
 )
 
+##############################
+# Common structures
+
 
 def byte_xor_hash(data):
     # Behold, a one-line hash function:
@@ -47,14 +50,22 @@
     byte_xor = attr.ib(type=bytes, default=None)
 
 
+##############################
+# Test simple migrations
+
+
 @dataclasses.dataclass
 class ContentRowWithXor(ContentRow):
-    """An hypothetical upgrade of ContentRow with an extra "hash"."""
+    """A hypothetical upgrade of ContentRow with an extra "hash",
+    but not in the primary key."""
 
     byte_xor: bytes
 
 
 class CqlRunnerWithXor(CqlRunner):
+    """A hypothetical upgrade of CqlRunner, adapted to an extra "hash"
+    that is not in the primary key."""
+
     @_prepared_select_statement(
         ContentRowWithXor,
         f"WHERE {' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS))}",
@@ -74,9 +85,12 @@
         return None
 
     @_prepared_select_statement(
-        ContentRowWithXor, f"WHERE token({', '.join(ContentRow.PARTITION_KEY)}) = ?"
+        ContentRowWithXor,
+        f"WHERE token({', '.join(ContentRowWithXor.PARTITION_KEY)}) = ?",
     )
-    def content_get_from_token(self, token, *, statement) -> Iterable[ContentRow]:
+    def content_get_from_token(
+        self, token, *, statement
+    ) -> Iterable[ContentRowWithXor]:
         return map(
             ContentRowWithXor.from_dict, self._execute_with_retries(statement, [token])
         )
@@ -161,3 +175,167 @@
     assert swh_storage.content_find({"byte_xor": content_xor_hash}) == [
         attr.evolve(new_content, data=None)
     ]
+
+
+##############################
+# Test complex migrations
+
+
+@dataclasses.dataclass
+class ContentRowWithXorPK(ContentRow):
+    """A hypothetical upgrade of ContentRow with an extra "hash",
+    in the primary key."""
+
+    TABLE = "content_v2"
+    PARTITION_KEY = ("sha1", "sha1_git", "sha256", "blake2s256", "byte_xor")
+
+    byte_xor: bytes
+
+
+class CqlRunnerWithXorPK(CqlRunner):
+    """A hypothetical upgrade of CqlRunner, adapted to an extra "hash"
+    that is part of the primary key."""
+
+    @_prepared_select_statement(
+        ContentRowWithXorPK,
+        f"WHERE {' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS))} AND byte_xor=?",
+    )
+    def content_get_from_pk(
+        self, content_hashes: Dict[str, bytes], *, statement
+    ) -> Optional[ContentRow]:
+        rows = list(
+            self._execute_with_retries(
+                statement,
+                [content_hashes[algo] for algo in HASH_ALGORITHMS + ["byte_xor"]],
+            )
+        )
+        assert len(rows) <= 1
+        if rows:
+            return ContentRowWithXorPK(**rows[0])
+        else:
+            return None
+
+    @_prepared_select_statement(
+        ContentRowWithXorPK,
+        f"WHERE token({', '.join(ContentRowWithXorPK.PARTITION_KEY)}) = ?",
+    )
+    def content_get_from_token(
+        self, token, *, statement
+    ) -> Iterable[ContentRowWithXorPK]:
+        return map(
+            ContentRowWithXorPK.from_dict,
+            self._execute_with_retries(statement, [token]),
+        )
+
+    # Redecorate content_add_prepare with the new ContentRow class
+    content_add_prepare = _prepared_insert_statement(ContentRowWithXorPK)(  # type: ignore  # noqa
+        CqlRunner.content_add_prepare.__wrapped__  # type: ignore
+    )
+
+
+def test_change_content_pk(
+    swh_storage: CassandraStorage, swh_storage_backend_config, mocker  # noqa
+) -> None:
+    """Adds a column to the 'content' table and a new matching index,
+    and makes this new column part of the primary key.
+    This is a complex migration, as it requires copying the whole table.
+    """
+    content_xor_hash = byte_xor_hash(StorageData.content.data)
+
+    # First insert some existing data
+    swh_storage.content_add([StorageData.content, StorageData.content2])
+
+    # Then add a new table and a new index
+    swh_storage._cql_runner._session.execute(
+        """
+        CREATE TABLE IF NOT EXISTS content_v2 (
+            sha1          blob,
+            sha1_git      blob,
+            sha256        blob,
+            blake2s256    blob,
+            byte_xor      blob,
+            length        bigint,
+            ctime         timestamp,
+                -- creation time, i.e. time of (first) injection into the storage
+            status        ascii,
+            PRIMARY KEY ((sha1, sha1_git, sha256, blake2s256, byte_xor))
+        );"""
+    )
+    for statement in CONTENT_INDEX_TEMPLATE.split("\n\n"):
+        swh_storage._cql_runner._session.execute(statement.format(main_algo="byte_xor"))
+
+    # Should not affect the running code at all:
+    assert swh_storage.content_get([StorageData.content.sha1]) == [
+        attr.evolve(StorageData.content, data=None)
+    ]
+    with pytest.raises(StorageArgumentException):
+        swh_storage.content_find({"byte_xor": content_xor_hash})
+
+    # Then update the running code:
+    new_hash_algos = HASH_ALGORITHMS + ["byte_xor"]
+    mocker.patch("swh.storage.cassandra.storage.HASH_ALGORITHMS", new_hash_algos)
+    mocker.patch("swh.storage.cassandra.cql.HASH_ALGORITHMS", new_hash_algos)
+    mocker.patch("swh.model.model.DEFAULT_ALGORITHMS", new_hash_algos)
+    mocker.patch("swh.storage.cassandra.storage.Content", ContentWithXor)
+    mocker.patch("swh.storage.cassandra.storage.ContentRow", ContentRowWithXorPK)
+    mocker.patch("swh.storage.cassandra.model.ContentRow", ContentRowWithXorPK)
+    mocker.patch("swh.storage.cassandra.storage.CqlRunner", CqlRunnerWithXorPK)
+
+    # Forge new objects with this extra hash:
+    new_content = ContentWithXor.from_dict(
+        {
+            "byte_xor": byte_xor_hash(StorageData.content.data),
+            **StorageData.content.to_dict(),
+        }
+    )
+    new_content2 = ContentWithXor.from_dict(
+        {
+            "byte_xor": byte_xor_hash(StorageData.content2.data),
+            **StorageData.content2.to_dict(),
+        }
+    )
+
+    # Replay to the new table.
+    # In production this would be done with a replayer reading from the journal,
+    # while loaders would still write to the DB.
+    overwriting_swh_storage = get_storage(
+        allow_overwrite=True, **swh_storage_backend_config
+    )
+    overwriting_swh_storage.content_add([new_content, new_content2])
+
+    # Old algos still work, and return the new object type;
+    # but the byte_xor value is None because it is only available in the new
+    # table, which this storage is not yet configured to use.
+    assert swh_storage.content_get([StorageData.content.sha1]) == [
+        attr.evolve(new_content, data=None, byte_xor=None)
+    ]
+
+    # When the replayer gets close to the end of the logs, loaders are stopped
+    # to allow the replayer to catch up with the end of the log.
+    # When it does, we can switch over to the new swh-storage's code.
+
+    # Simulates a restart:
+    swh_storage._set_cql_runner()
+
+    # Now, the object can be found with the new hash:
+    assert swh_storage.content_find({"byte_xor": content_xor_hash}) == [
+        attr.evolve(new_content, data=None)
+    ]
+
+    # Remove the old table:
+    swh_storage._cql_runner._session.execute("DROP TABLE content")
+
+    # The object is still available, because we do not use the old table anymore
+    assert swh_storage.content_find({"byte_xor": content_xor_hash}) == [
+        attr.evolve(new_content, data=None)
+    ]
+
+    # THE END.
+
+    # Test teardown expects a table with this name to exist:
+    swh_storage._cql_runner._session.execute(
+        "CREATE TABLE content (foo blob PRIMARY KEY);"
+    )
+
+    # Clean up this table; test teardown does not know about it:
+    swh_storage._cql_runner._session.execute("DROP TABLE content_v2;")
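
For illustration, the "replay to the new table" step that the comments in test_change_content_pk describe could look roughly like the sketch below in production code. This is a hedged sketch, not part of the patch: backfill_content_v2 and iter_replayed_content_batches are hypothetical placeholders standing in for the journal replayer, while get_storage(allow_overwrite=True, ...), ContentWithXor, and byte_xor_hash are the same names the test above uses. It also assumes each replayed content dict carries the raw data needed to compute the extra hash.

from swh.storage import get_storage


def backfill_content_v2(new_storage_config, iter_replayed_content_batches):
    """Hypothetical production-side replay: rewrite journal contents into the
    storage configured with the new content_v2 schema, adding the new hash.

    ContentWithXor and byte_xor_hash are the helpers defined in the test
    module above."""
    # allow_overwrite=True lets the replayer (re)write rows while loaders are
    # still writing to the database, as in the test above.
    storage = get_storage(allow_overwrite=True, **new_storage_config)
    for batch in iter_replayed_content_batches:  # batches of content dicts
        storage.content_add(
            [
                ContentWithXor.from_dict(
                    {"byte_xor": byte_xor_hash(d["data"]), **d}
                )
                for d in batch
            ]
        )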