Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_cassandra_migration.py
- This file was added.
# Copyright (C) 2021 The Software Heritage developers | |||||
# See the AUTHORS file at the top-level directory of this distribution | |||||
# License: GNU General Public License version 3, or any later version | |||||
# See top-level LICENSE file for more information | |||||
"""This module tests the migration capabilities of the Cassandra backend, | |||||
by sending CQL commands (eg. 'ALTER TABLE'), and | |||||
by monkey-patching large parts of the implementations to simulate code updates,.""" | |||||
import dataclasses | |||||
import functools | |||||
import operator | |||||
from typing import Dict, Iterable, Optional | |||||
import attr | |||||
import pytest | |||||
from swh.model.model import Content | |||||
from swh.storage import get_storage | |||||
from swh.storage.cassandra.cql import ( | |||||
CqlRunner, | |||||
_prepared_insert_statement, | |||||
_prepared_select_statement, | |||||
) | |||||
from swh.storage.cassandra.model import ContentRow | |||||
from swh.storage.cassandra.schema import CONTENT_INDEX_TEMPLATE, HASH_ALGORITHMS | |||||
from swh.storage.cassandra.storage import CassandraStorage | |||||
from swh.storage.exc import StorageArgumentException | |||||
from .storage_data import StorageData | |||||
from .test_cassandra import ( # noqa, needed for swh_storage fixture | |||||
cassandra_cluster, | |||||
keyspace, | |||||
swh_storage_backend_config, | |||||
) | |||||
def byte_xor_hash(data): | |||||
# Behold, a one-line hash function: | |||||
return bytes([functools.reduce(operator.xor, data)]) | |||||
@attr.s | |||||
class ContentWithXor(Content): | |||||
"""An hypothetical upgrade of Content with an extra "hash".""" | |||||
byte_xor = attr.ib(type=bytes, default=None) | |||||
@dataclasses.dataclass | |||||
class ContentRowWithXor(ContentRow): | |||||
"""An hypothetical upgrade of ContentRow with an extra "hash".""" | |||||
byte_xor: bytes | |||||
class CqlRunnerWithXor(CqlRunner): | |||||
@_prepared_select_statement( | |||||
ContentRowWithXor, | |||||
f"WHERE {' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS))}", | |||||
) | |||||
def content_get_from_pk( | |||||
self, content_hashes: Dict[str, bytes], *, statement | |||||
) -> Optional[ContentRow]: | |||||
rows = list( | |||||
self._execute_with_retries( | |||||
statement, [content_hashes[algo] for algo in HASH_ALGORITHMS] | |||||
) | |||||
) | |||||
assert len(rows) <= 1 | |||||
if rows: | |||||
return ContentRowWithXor(**rows[0]) | |||||
else: | |||||
return None | |||||
@_prepared_select_statement( | |||||
ContentRowWithXor, f"WHERE token({', '.join(ContentRow.PARTITION_KEY)}) = ?" | |||||
) | |||||
def content_get_from_token(self, token, *, statement) -> Iterable[ContentRow]: | |||||
return map( | |||||
ContentRowWithXor.from_dict, self._execute_with_retries(statement, [token]) | |||||
) | |||||
# Redecorate content_add_prepare with the new ContentRow class | |||||
content_add_prepare = _prepared_insert_statement(ContentRowWithXor)( # type: ignore | |||||
CqlRunner.content_add_prepare.__wrapped__ # type: ignore | |||||
) | |||||
def test_add_content_column( | |||||
swh_storage: CassandraStorage, swh_storage_backend_config, mocker # noqa | |||||
) -> None: | |||||
"""Adds a column to the 'content' table and a new matching index. | |||||
This is a simple migration, as it does not require an update to the primary key. | |||||
""" | |||||
content_xor_hash = byte_xor_hash(StorageData.content.data) | |||||
# First insert some existing data | |||||
swh_storage.content_add([StorageData.content, StorageData.content2]) | |||||
# Then update the schema | |||||
swh_storage._cql_runner._session.execute("ALTER TABLE content ADD byte_xor blob") | |||||
for statement in CONTENT_INDEX_TEMPLATE.split("\n\n"): | |||||
swh_storage._cql_runner._session.execute(statement.format(main_algo="byte_xor")) | |||||
# Should not affect the running code at all: | |||||
assert swh_storage.content_get([StorageData.content.sha1]) == [ | |||||
attr.evolve(StorageData.content, data=None) | |||||
] | |||||
with pytest.raises(StorageArgumentException): | |||||
swh_storage.content_find({"byte_xor": content_xor_hash}) | |||||
# Then update the running code: | |||||
new_hash_algos = HASH_ALGORITHMS + ["byte_xor"] | |||||
mocker.patch("swh.storage.cassandra.storage.HASH_ALGORITHMS", new_hash_algos) | |||||
mocker.patch("swh.storage.cassandra.cql.HASH_ALGORITHMS", new_hash_algos) | |||||
mocker.patch("swh.model.model.DEFAULT_ALGORITHMS", new_hash_algos) | |||||
mocker.patch("swh.storage.cassandra.storage.Content", ContentWithXor) | |||||
mocker.patch("swh.storage.cassandra.storage.ContentRow", ContentRowWithXor) | |||||
mocker.patch("swh.storage.cassandra.model.ContentRow", ContentRowWithXor) | |||||
mocker.patch("swh.storage.cassandra.storage.CqlRunner", CqlRunnerWithXor) | |||||
# Forge new objects with this extra hash: | |||||
new_content = ContentWithXor.from_dict( | |||||
{ | |||||
"byte_xor": byte_xor_hash(StorageData.content.data), | |||||
**StorageData.content.to_dict(), | |||||
} | |||||
) | |||||
new_content2 = ContentWithXor.from_dict( | |||||
{ | |||||
"byte_xor": byte_xor_hash(StorageData.content2.data), | |||||
**StorageData.content2.to_dict(), | |||||
} | |||||
) | |||||
# Simulates a restart: | |||||
swh_storage._set_cql_runner() | |||||
# Old algos still works, and return the new object type: | |||||
assert swh_storage.content_get([StorageData.content.sha1]) == [ | |||||
attr.evolve(new_content, data=None, byte_xor=None) | |||||
] | |||||
# The new algo does not work, we did not backfill it yet: | |||||
assert swh_storage.content_find({"byte_xor": content_xor_hash}) == [] | |||||
# A normal storage would not overwrite, because the object already exists, | |||||
# as it is not aware it is missing a field: | |||||
swh_storage.content_add([new_content, new_content2]) | |||||
assert swh_storage.content_find({"byte_xor": content_xor_hash}) == [] | |||||
# Backfill (in production this would be done with a replayer reading from | |||||
# the journal): | |||||
overwriting_swh_storage = get_storage( | |||||
allow_overwrite=True, **swh_storage_backend_config | |||||
) | |||||
overwriting_swh_storage.content_add([new_content, new_content2]) | |||||
# Now, the object can be found: | |||||
assert swh_storage.content_find({"byte_xor": content_xor_hash}) == [ | |||||
attr.evolve(new_content, data=None) | |||||
] |