Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_cassandra_migration.py
Show All 28 Lines | |||||
from .storage_data import StorageData | from .storage_data import StorageData | ||||
from .test_cassandra import ( # noqa, needed for swh_storage fixture | from .test_cassandra import ( # noqa, needed for swh_storage fixture | ||||
cassandra_cluster, | cassandra_cluster, | ||||
keyspace, | keyspace, | ||||
swh_storage_backend_config, | swh_storage_backend_config, | ||||
) | ) | ||||
############################## | |||||
# Common structures | |||||
def byte_xor_hash(data):
    """Return a one-byte "hash" of ``data``: the XOR of all of its items.

    Args:
        data: an iterable of integers in ``range(256)``, typically a
            ``bytes`` object.

    Returns:
        A ``bytes`` object of length 1. Empty input hashes to ``b"\\x00"``.
    """
    # Behold, a one-line hash function.
    # The fold is seeded with 0 (the identity element of XOR) so that empty
    # input is accepted instead of making reduce() raise TypeError.
    return bytes([functools.reduce(operator.xor, data, 0)])
@attr.s
class ContentWithXor(Content):
    """A hypothetical upgrade of Content with an extra "hash"."""

    # The default is None, so the declared type has to admit it.
    byte_xor = attr.ib(type=Optional[bytes], default=None)
############################## | |||||
# Test simple migrations | |||||
@dataclasses.dataclass
class ContentRowWithXor(ContentRow):
    """ContentRow extended with one more "hash" column, ``byte_xor``.

    The extra column is a regular column: it is not part of the primary key.
    """

    # Additional field on top of the ones inherited from ContentRow
    byte_xor: bytes
class CqlRunnerWithXor(CqlRunner):
    """A hypothetical upgrade of CqlRunner whose content statements are
    prepared for ContentRowWithXor, i.e. content rows with an extra "hash"
    that is not in the primary key."""

    @_prepared_select_statement(
        ContentRowWithXor,
        f"WHERE {' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS))}",
    )
    def content_get_from_pk(
        self, content_hashes: Dict[str, bytes], *, statement
    ) -> Optional[ContentRowWithXor]:
        """Return the row matching every hash of HASH_ALGORITHMS found in
        ``content_hashes``, or None if there is no such row."""
        rows = list(
            self._execute_with_retries(
                statement, [content_hashes[algo] for algo in HASH_ALGORITHMS]
            )
        )
        # The WHERE clause constrains every hash, so at most one row matches
        assert len(rows) <= 1
        if rows:
            return ContentRowWithXor(**rows[0])
        else:
            return None

    @_prepared_select_statement(
        ContentRowWithXor,
        f"WHERE token({', '.join(ContentRowWithXor.PARTITION_KEY)}) = ?",
    )
    def content_get_from_token(
        self, token, *, statement
    ) -> Iterable[ContentRowWithXor]:
        """Return the rows whose partition key token equals ``token``."""
        return map(
            ContentRowWithXor.from_dict, self._execute_with_retries(statement, [token])
        )

    # Redecorate content_add_prepare with the new ContentRow class
    content_add_prepare = _prepared_insert_statement(ContentRowWithXor)(  # type: ignore
        CqlRunner.content_add_prepare.__wrapped__  # type: ignore
    )
▲ Show 20 Lines • Show All 68 Lines • ▼ Show 20 Lines | overwriting_swh_storage = get_storage( | ||||
allow_overwrite=True, **swh_storage_backend_config | allow_overwrite=True, **swh_storage_backend_config | ||||
) | ) | ||||
overwriting_swh_storage.content_add([new_content, new_content2]) | overwriting_swh_storage.content_add([new_content, new_content2]) | ||||
# Now, the object can be found: | # Now, the object can be found: | ||||
assert swh_storage.content_find({"byte_xor": content_xor_hash}) == [ | assert swh_storage.content_find({"byte_xor": content_xor_hash}) == [ | ||||
attr.evolve(new_content, data=None) | attr.evolve(new_content, data=None) | ||||
] | ] | ||||
############################## | |||||
# Test complex migrations | |||||
@dataclasses.dataclass
class ContentRowWithXorPK(ContentRow):
    """ContentRow extended with one more "hash" column, ``byte_xor``,
    which is also a component of the primary key.

    Rows of this class are stored in a table of their own, ``content_v2``.
    """

    # Un-annotated, hence plain class attributes rather than dataclass fields
    TABLE = "content_v2"
    PARTITION_KEY = ("sha1", "sha1_git", "sha256", "blake2s256", "byte_xor")

    # Additional field on top of the ones inherited from ContentRow
    byte_xor: bytes
class CqlRunnerWithXorPK(CqlRunner):
    """A hypothetical upgrade of CqlRunner whose content statements are
    prepared for ContentRowWithXorPK, i.e. content rows with an extra "hash"
    that is part of the primary key."""

    @_prepared_select_statement(
        ContentRowWithXorPK,
        f"WHERE {' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS))} AND byte_xor=?",
    )
    def content_get_from_pk(
        self, content_hashes: Dict[str, bytes], *, statement
    ) -> Optional[ContentRowWithXorPK]:
        """Return the row matching every hash of HASH_ALGORITHMS plus
        ``byte_xor`` found in ``content_hashes``, or None if there is no
        such row."""
        rows = list(
            self._execute_with_retries(
                statement,
                # Same order as the placeholders of the WHERE clause above
                [content_hashes[algo] for algo in HASH_ALGORITHMS + ["byte_xor"]],
            )
        )
        # The WHERE clause constrains every hash, so at most one row matches
        assert len(rows) <= 1
        if rows:
            return ContentRowWithXorPK(**rows[0])
        else:
            return None

    @_prepared_select_statement(
        ContentRowWithXorPK,
        f"WHERE token({', '.join(ContentRowWithXorPK.PARTITION_KEY)}) = ?",
    )
    def content_get_from_token(
        self, token, *, statement
    ) -> Iterable[ContentRowWithXorPK]:
        """Return the rows whose partition key token equals ``token``."""
        return map(
            ContentRowWithXorPK.from_dict,
            self._execute_with_retries(statement, [token]),
        )

    # Redecorate content_add_prepare with the new ContentRow class
    content_add_prepare = _prepared_insert_statement(ContentRowWithXorPK)(  # type: ignore # noqa
        CqlRunner.content_add_prepare.__wrapped__  # type: ignore
    )
def test_change_content_pk(
    swh_storage: CassandraStorage, swh_storage_backend_config, mocker  # noqa
) -> None:
    """Adds a column to the 'content' table and a new matching index;
    and make this new column part of the primary key

    This is a complex migration, as it requires copying the whole table
    """
    content_xor_hash = byte_xor_hash(StorageData.content.data)

    # First insert some existing data
    swh_storage.content_add([StorageData.content, StorageData.content2])

    # Everything below runs under try/finally so that the tables are put back
    # in the state the test teardown expects even when an assertion fails.
    try:
        # Then add a new table and a new index
        swh_storage._cql_runner._session.execute(
            """
            CREATE TABLE IF NOT EXISTS content_v2 (
                sha1          blob,
                sha1_git      blob,
                sha256        blob,
                blake2s256    blob,
                byte_xor      blob,
                length        bigint,
                ctime         timestamp,
                    -- creation time, i.e. time of (first) injection into the storage
                status        ascii,
                PRIMARY KEY ((sha1, sha1_git, sha256, blake2s256, byte_xor))
            );"""
        )
        for statement in CONTENT_INDEX_TEMPLATE.split("\n\n"):
            swh_storage._cql_runner._session.execute(
                statement.format(main_algo="byte_xor")
            )

        # Should not affect the running code at all:
        assert swh_storage.content_get([StorageData.content.sha1]) == [
            attr.evolve(StorageData.content, data=None)
        ]
        with pytest.raises(StorageArgumentException):
            swh_storage.content_find({"byte_xor": content_xor_hash})

        # Then update the running code:
        new_hash_algos = HASH_ALGORITHMS + ["byte_xor"]
        mocker.patch("swh.storage.cassandra.storage.HASH_ALGORITHMS", new_hash_algos)
        mocker.patch("swh.storage.cassandra.cql.HASH_ALGORITHMS", new_hash_algos)
        mocker.patch("swh.model.model.DEFAULT_ALGORITHMS", new_hash_algos)
        mocker.patch("swh.storage.cassandra.storage.Content", ContentWithXor)
        mocker.patch("swh.storage.cassandra.storage.ContentRow", ContentRowWithXorPK)
        mocker.patch("swh.storage.cassandra.model.ContentRow", ContentRowWithXorPK)
        mocker.patch("swh.storage.cassandra.storage.CqlRunner", CqlRunnerWithXorPK)

        # Forge new objects with this extra hash:
        new_content = ContentWithXor.from_dict(
            {
                "byte_xor": byte_xor_hash(StorageData.content.data),
                **StorageData.content.to_dict(),
            }
        )
        new_content2 = ContentWithXor.from_dict(
            {
                "byte_xor": byte_xor_hash(StorageData.content2.data),
                **StorageData.content2.to_dict(),
            }
        )

        # Replay to the new table.
        # In production this would be done with a replayer reading from the
        # journal, while loaders would still write to the DB.
        overwriting_swh_storage = get_storage(
            allow_overwrite=True, **swh_storage_backend_config
        )
        overwriting_swh_storage.content_add([new_content, new_content2])

        # Old algos still work, and return the new object type;
        # but the byte_xor value is None because it is only available in the
        # new table, which this storage is not yet configured to use
        assert swh_storage.content_get([StorageData.content.sha1]) == [
            attr.evolve(new_content, data=None, byte_xor=None)
        ]

        # When the replayer gets close to the end of the logs, loaders are
        # stopped to allow the replayer to catch up with the end of the log.
        # When it does, we can switch over to the new swh-storage's code.

        # Simulates a restart:
        swh_storage._set_cql_runner()

        # Now, the object can be found with the new hash:
        assert swh_storage.content_find({"byte_xor": content_xor_hash}) == [
            attr.evolve(new_content, data=None)
        ]

        # Remove the old table:
        swh_storage._cql_runner._session.execute("DROP TABLE content")

        # Object is still available, because the old table is not used anymore
        assert swh_storage.content_find({"byte_xor": content_xor_hash}) == [
            attr.evolve(new_content, data=None)
        ]

        # THE END.
    finally:
        # Test teardown expects a table with this name to exist.
        # IF NOT EXISTS: when the test failed before the DROP above, the
        # original table is still there and must be left alone.
        swh_storage._cql_runner._session.execute(
            "CREATE TABLE IF NOT EXISTS content (foo blob PRIMARY KEY);"
        )

        # Clean up this table, test teardown does not know about it:
        swh_storage._cql_runner._session.execute("DROP TABLE IF EXISTS content_v2;")