swh_storage = <swh.storage.cassandra.storage.CassandraStorage object at 0x7f00197aff60>
swh_storage_backend_config = {'cls': 'cassandra', 'hosts': ['127.0.0.1'], 'journal_writer': {'cls': 'memory'}, 'keyspace': 'a85b774fe9ab80a8f274', ...}
mocker = <pytest_mock.plugin.MockerFixture object at 0x7f0011bad8d0>
def test_change_content_pk(
swh_storage: CassandraStorage, swh_storage_backend_config, mocker # noqa
) -> None:
"""Adds a column to the 'content' table and a new matching index;
and make this new column part of the primary key
This is a complex migration, as it requires copying the whole table
"""
content_xor_hash = byte_xor_hash(StorageData.content.data)
# First insert some existing data
swh_storage.content_add([StorageData.content, StorageData.content2])
# Then add a new table and a new index
swh_storage._cql_runner._session.execute(
"""
CREATE TABLE IF NOT EXISTS content_v2 (
sha1 blob,
sha1_git blob,
sha256 blob,
blake2s256 blob,
byte_xor blob,
length bigint,
ctime timestamp,
-- creation time, i.e. time of (first) injection into the storage
status ascii,
PRIMARY KEY ((sha1, sha1_git, sha256, blake2s256, byte_xor))
);"""
)
for statement in CONTENT_INDEX_TEMPLATE.split("\n\n"):
swh_storage._cql_runner._session.execute(statement.format(main_algo="byte_xor"))
# Should not affect the running code at all:
assert swh_storage.content_get([StorageData.content.sha1]) == [
attr.evolve(StorageData.content, data=None)
]
with pytest.raises(StorageArgumentException):
swh_storage.content_find({"byte_xor": content_xor_hash})
# Then update the running code:
new_hash_algos = HASH_ALGORITHMS + ["byte_xor"]
mocker.patch("swh.storage.cassandra.storage.HASH_ALGORITHMS", new_hash_algos)
mocker.patch("swh.storage.cassandra.cql.HASH_ALGORITHMS", new_hash_algos)
mocker.patch("swh.model.model.DEFAULT_ALGORITHMS", new_hash_algos)
mocker.patch("swh.storage.cassandra.storage.Content", ContentWithXor)
mocker.patch("swh.storage.cassandra.storage.ContentRow", ContentRowWithXorPK)
mocker.patch("swh.storage.cassandra.model.ContentRow", ContentRowWithXorPK)
mocker.patch("swh.storage.cassandra.storage.CqlRunner", CqlRunnerWithXorPK)
# Forge new objects with this extra hash:
new_content = ContentWithXor.from_dict(
{
"byte_xor": byte_xor_hash(StorageData.content.data),
**StorageData.content.to_dict(),
}
)
new_content2 = ContentWithXor.from_dict(
{
"byte_xor": byte_xor_hash(StorageData.content2.data),
**StorageData.content2.to_dict(),
}
)
# Replay to the new table.
# In production this would be done with a replayer reading from the journal,
# while loaders would still write to the DB.
overwriting_swh_storage = get_storage(
allow_overwrite=True, **swh_storage_backend_config
)
overwriting_swh_storage.content_add([new_content, new_content2])
# Old algos still work, and return the new object type;
# but the byte_xor value is None because it is only available in the new
# table, which this storage is not yet configured to use
assert swh_storage.content_get([StorageData.content.sha1]) == [
attr.evolve(new_content, data=None, byte_xor=None)
]
# When the replayer gets close to the end of the logs, loaders are stopped
# to allow the replayer to catch up with the end of the log.
# When it does, we can switch over to the new swh-storage's code.
# Simulates a restart:
swh_storage._set_cql_runner()
# Now, the object can be found with the new hash:
> assert swh_storage.content_find({"byte_xor": content_xor_hash}) == [
attr.evolve(new_content, data=None)
]
.tox/py3/lib/python3.7/site-packages/swh/storage/tests/test_cassandra_migration.py:321:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.tox/py3/lib/python3.7/site-packages/swh/storage/metrics.py:24: in d
return f(*a, **kw)
.tox/py3/lib/python3.7/site-packages/swh/storage/cassandra/storage.py:349: in content_find
return self._content_find_many([content])
.tox/py3/lib/python3.7/site-packages/swh/storage/cassandra/storage.py:368: in _content_find_many
for row in rows:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <swh.storage.cassandra.storage.CassandraStorage object at 0x7f00197aff60>
algo = 'byte_xor', hashes = [b'\x0c']
def _content_get_from_hashes(self, algo, hashes: List[bytes]) -> Iterable:
"""From the name of a hash algorithm and a value of that hash,
looks up the "hash -> token" secondary table (content_by_{algo})
to get tokens.
Then, looks up the main table (content) to get all contents with
that token, and filters out contents whose hash doesn't match."""
found_tokens = list(
self._cql_runner.content_get_tokens_from_single_algo(algo, hashes)
)
assert all(isinstance(token, int) for token in found_tokens)
# Query the main table ('content').
rows = self._cql_runner.content_get_from_tokens(found_tokens)
for row in rows:
# re-check the hash (in case of murmur3 collision)
> if getattr(row, algo) in hashes:
E AttributeError: 'ContentRow' object has no attribute 'byte_xor'
.tox/py3/lib/python3.7/site-packages/swh/storage/cassandra/storage.py:175: AttributeError
TEST RESULT
TEST RESULT
- Run At
- Sep 15 2021, 3:20 PM