diff --git a/swh/storage/cassandra/model.py b/swh/storage/cassandra/model.py index a7803ae7..00eb1057 100644 --- a/swh/storage/cassandra/model.py +++ b/swh/storage/cassandra/model.py @@ -1,318 +1,323 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Classes representing tables in the Cassandra database. They are very close to classes found in swh.model.model, but most of them are subtly different: * Large objects are split into other classes (eg. RevisionRow has no 'parents' field, because parents are stored in a different table, represented by RevisionParentRow) * They have a "cols" field, which returns the list of column names of the table * They only use types that map directly to Cassandra's schema (ie. no enums) Therefore, this model doesn't reuse swh.model.model, except for types that can be mapped to UDTs (Person and TimestampWithTimezone). """ import dataclasses import datetime from typing import Any, ClassVar, Dict, List, Optional, Tuple, Type, TypeVar from swh.model.model import Person, TimestampWithTimezone MAGIC_NULL_PK = b"<null>" """ NULLs (or all-empty blobs) are not allowed in primary keys; instead we use a special value that can't possibly be a valid hash. """ T = TypeVar("T", bound="BaseRow") def content_index_table_name(algo: str, skipped_content: bool) -> str: """Given an algorithm name, returns the name of one of the 'content_by_*' and 'skipped_content_by_*' tables that serve as index for the 'content' and 'skipped_content' tables based on this algorithm's hashes. For now it is a simple substitution, but future versions may append a version number to it, if needed for schema updates.""" if skipped_content: return f"skipped_content_by_{algo}" else: return f"content_by_{algo}" class BaseRow: TABLE: ClassVar[str] PARTITION_KEY: ClassVar[Tuple[str, ...]] CLUSTERING_KEY: ClassVar[Tuple[str, ...]] = () @classmethod def from_dict(cls: Type[T], d: Dict[str, Any]) -> T: return cls(**d) # type: ignore @classmethod def cols(cls) -> List[str]: return [field.name for field in dataclasses.fields(cls)] def to_dict(self) -> Dict[str, Any]: return dataclasses.asdict(self) @dataclasses.dataclass class ContentRow(BaseRow): TABLE = "content" - PARTITION_KEY = ("sha1", "sha1_git", "sha256", "blake2s256") + PARTITION_KEY: ClassVar[Tuple[str, ...]] = ( + "sha1", + "sha1_git", + "sha256", + "blake2s256", + ) sha1: bytes sha1_git: bytes sha256: bytes blake2s256: bytes length: int ctime: datetime.datetime status: str @dataclasses.dataclass class SkippedContentRow(BaseRow): TABLE = "skipped_content" PARTITION_KEY = ("sha1", "sha1_git", "sha256", "blake2s256") sha1: Optional[bytes] sha1_git: Optional[bytes] sha256: Optional[bytes] blake2s256: Optional[bytes] length: Optional[int] ctime: Optional[datetime.datetime] status: str reason: str origin: str @classmethod def from_dict(cls, d: Dict[str, Any]) -> "SkippedContentRow": d = d.copy() for k in ("sha1", "sha1_git", "sha256", "blake2s256"): if d[k] == MAGIC_NULL_PK: d[k] = None return super().from_dict(d) @dataclasses.dataclass class DirectoryRow(BaseRow): TABLE = "directory" PARTITION_KEY = ("id",) id: bytes @dataclasses.dataclass class DirectoryEntryRow(BaseRow): TABLE = "directory_entry" PARTITION_KEY = ("directory_id",) CLUSTERING_KEY = ("name",) directory_id: bytes name: bytes target: bytes perms: int type: str @dataclasses.dataclass class RevisionRow(BaseRow): TABLE = "revision" PARTITION_KEY = ("id",) id: bytes date: Optional[TimestampWithTimezone] committer_date: Optional[TimestampWithTimezone] type: str directory: bytes message: bytes author: Person committer: Person synthetic: bool metadata: str extra_headers: dict @dataclasses.dataclass class RevisionParentRow(BaseRow): TABLE = "revision_parent" PARTITION_KEY = ("id",) CLUSTERING_KEY = ("parent_rank",) id: bytes parent_rank: int parent_id: bytes @dataclasses.dataclass class ReleaseRow(BaseRow): TABLE = "release" PARTITION_KEY = ("id",) id: bytes target_type: str target: bytes date: TimestampWithTimezone name: bytes message: bytes author: Person synthetic: bool @dataclasses.dataclass class SnapshotRow(BaseRow): TABLE = "snapshot" PARTITION_KEY = ("id",) id: bytes @dataclasses.dataclass class SnapshotBranchRow(BaseRow): TABLE = "snapshot_branch" PARTITION_KEY = ("snapshot_id",) CLUSTERING_KEY = ("name",) snapshot_id: bytes name: bytes target_type: Optional[str] target: Optional[bytes] @dataclasses.dataclass class OriginVisitRow(BaseRow): TABLE = "origin_visit" PARTITION_KEY = ("origin",) CLUSTERING_KEY = ("visit",) origin: str visit: int date: datetime.datetime type: str @dataclasses.dataclass class OriginVisitStatusRow(BaseRow): TABLE = "origin_visit_status" PARTITION_KEY = ("origin",) CLUSTERING_KEY = ("visit", "date") origin: str visit: int date: datetime.datetime type: str status: str metadata: str snapshot: bytes @classmethod def from_dict(cls: Type[T], d: Dict[str, Any]) -> T: return cls(**d) # type: ignore @dataclasses.dataclass class OriginRow(BaseRow): TABLE = "origin" PARTITION_KEY = ("sha1",) sha1: bytes url: str next_visit_id: int @dataclasses.dataclass class MetadataAuthorityRow(BaseRow): TABLE = "metadata_authority" PARTITION_KEY = ("url",) CLUSTERING_KEY = ("type",) url: str type: str metadata: str @dataclasses.dataclass class MetadataFetcherRow(BaseRow): TABLE = "metadata_fetcher" PARTITION_KEY = ("name",) CLUSTERING_KEY = ("version",) name: str version: str metadata: str @dataclasses.dataclass class RawExtrinsicMetadataRow(BaseRow): TABLE = "raw_extrinsic_metadata" PARTITION_KEY = ("target",) CLUSTERING_KEY = ( "authority_type", "authority_url", "discovery_date", "id", ) id: bytes type: str target: str authority_type: str authority_url: str discovery_date: datetime.datetime fetcher_name: str fetcher_version: str format: str metadata: bytes origin: Optional[str] visit: Optional[int] snapshot: Optional[str] release: Optional[str] revision: Optional[str] path: Optional[bytes] directory: Optional[str] @dataclasses.dataclass class ObjectCountRow(BaseRow): TABLE = "object_count" PARTITION_KEY = ("partition_key",) CLUSTERING_KEY = ("object_type",) partition_key: int object_type: str count: int @dataclasses.dataclass class ExtIDRow(BaseRow): TABLE = "extid" PARTITION_KEY = ("target", "target_type", "extid", "extid_type") extid_type: str extid: bytes target_type: str target: bytes @dataclasses.dataclass class ExtIDByTargetRow(BaseRow): TABLE = "extid_by_target" PARTITION_KEY = ("target_type", "target") CLUSTERING_KEY = ("target_token",) target_type: str target: bytes target_token: int diff --git a/swh/storage/tests/test_cassandra_migration.py b/swh/storage/tests/test_cassandra_migration.py index 40c1d262..1e46f52e 100644 --- a/swh/storage/tests/test_cassandra_migration.py +++ b/swh/storage/tests/test_cassandra_migration.py @@ -1,163 +1,341 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """This module tests the migration capabilities of the Cassandra backend, by sending CQL commands (eg. 'ALTER TABLE'), and by monkey-patching large parts of the implementations to simulate code updates,.""" import dataclasses import functools import operator from typing import Dict, Iterable, Optional import attr import pytest from swh.model.model import Content from swh.storage import get_storage from swh.storage.cassandra.cql import ( CqlRunner, _prepared_insert_statement, _prepared_select_statement, ) from swh.storage.cassandra.model import ContentRow from swh.storage.cassandra.schema import CONTENT_INDEX_TEMPLATE, HASH_ALGORITHMS from swh.storage.cassandra.storage import CassandraStorage from swh.storage.exc import StorageArgumentException from .storage_data import StorageData from .test_cassandra import ( # noqa, needed for swh_storage fixture cassandra_cluster, keyspace, swh_storage_backend_config, ) +############################## +# Common structures + def byte_xor_hash(data): # Behold, a one-line hash function: return bytes([functools.reduce(operator.xor, data)]) @attr.s class ContentWithXor(Content): """An hypothetical upgrade of Content with an extra "hash".""" byte_xor = attr.ib(type=bytes, default=None) +############################## +# Test simple migrations + + @dataclasses.dataclass class ContentRowWithXor(ContentRow): - """An hypothetical upgrade of ContentRow with an extra "hash".""" + """An hypothetical upgrade of ContentRow with an extra "hash", + but not in the primary key.""" byte_xor: bytes class CqlRunnerWithXor(CqlRunner): + """An hypothetical upgrade of ContentRow with an extra "hash", + but not in the primary key.""" + @_prepared_select_statement( ContentRowWithXor, f"WHERE {' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS))}", ) def content_get_from_pk( self, content_hashes: Dict[str, bytes], *, statement ) -> Optional[ContentRow]: rows = list( self._execute_with_retries( statement, [content_hashes[algo] for algo in HASH_ALGORITHMS] ) ) assert len(rows) <= 1 if rows: return ContentRowWithXor(**rows[0]) else: return None @_prepared_select_statement( - ContentRowWithXor, f"WHERE token({', '.join(ContentRow.PARTITION_KEY)}) = ?" + ContentRowWithXor, + f"WHERE token({', '.join(ContentRowWithXor.PARTITION_KEY)}) = ?", ) - def content_get_from_token(self, token, *, statement) -> Iterable[ContentRow]: + def content_get_from_token( + self, token, *, statement + ) -> Iterable[ContentRowWithXor]: return map( ContentRowWithXor.from_dict, self._execute_with_retries(statement, [token]) ) # Redecorate content_add_prepare with the new ContentRow class content_add_prepare = _prepared_insert_statement(ContentRowWithXor)( # type: ignore CqlRunner.content_add_prepare.__wrapped__ # type: ignore ) def test_add_content_column( swh_storage: CassandraStorage, swh_storage_backend_config, mocker # noqa ) -> None: """Adds a column to the 'content' table and a new matching index. This is a simple migration, as it does not require an update to the primary key. """ content_xor_hash = byte_xor_hash(StorageData.content.data) # First insert some existing data swh_storage.content_add([StorageData.content, StorageData.content2]) # Then update the schema swh_storage._cql_runner._session.execute("ALTER TABLE content ADD byte_xor blob") for statement in CONTENT_INDEX_TEMPLATE.split("\n\n"): swh_storage._cql_runner._session.execute(statement.format(main_algo="byte_xor")) # Should not affect the running code at all: assert swh_storage.content_get([StorageData.content.sha1]) == [ attr.evolve(StorageData.content, data=None) ] with pytest.raises(StorageArgumentException): swh_storage.content_find({"byte_xor": content_xor_hash}) # Then update the running code: new_hash_algos = HASH_ALGORITHMS + ["byte_xor"] mocker.patch("swh.storage.cassandra.storage.HASH_ALGORITHMS", new_hash_algos) mocker.patch("swh.storage.cassandra.cql.HASH_ALGORITHMS", new_hash_algos) mocker.patch("swh.model.model.DEFAULT_ALGORITHMS", new_hash_algos) mocker.patch("swh.storage.cassandra.storage.Content", ContentWithXor) mocker.patch("swh.storage.cassandra.storage.ContentRow", ContentRowWithXor) mocker.patch("swh.storage.cassandra.model.ContentRow", ContentRowWithXor) mocker.patch("swh.storage.cassandra.storage.CqlRunner", CqlRunnerWithXor) # Forge new objects with this extra hash: new_content = ContentWithXor.from_dict( { "byte_xor": byte_xor_hash(StorageData.content.data), **StorageData.content.to_dict(), } ) new_content2 = ContentWithXor.from_dict( { "byte_xor": byte_xor_hash(StorageData.content2.data), **StorageData.content2.to_dict(), } ) # Simulates a restart: swh_storage._set_cql_runner() # Old algos still works, and return the new object type: assert swh_storage.content_get([StorageData.content.sha1]) == [ attr.evolve(new_content, data=None, byte_xor=None) ] # The new algo does not work, we did not backfill it yet: assert swh_storage.content_find({"byte_xor": content_xor_hash}) == [] # A normal storage would not overwrite, because the object already exists, # as it is not aware it is missing a field: swh_storage.content_add([new_content, new_content2]) assert swh_storage.content_find({"byte_xor": content_xor_hash}) == [] # Backfill (in production this would be done with a replayer reading from # the journal): overwriting_swh_storage = get_storage( allow_overwrite=True, **swh_storage_backend_config ) overwriting_swh_storage.content_add([new_content, new_content2]) # Now, the object can be found: assert swh_storage.content_find({"byte_xor": content_xor_hash}) == [ attr.evolve(new_content, data=None) ] + + +############################## +# Test complex migrations + + +@dataclasses.dataclass +class ContentRowWithXorPK(ContentRow): + """An hypothetical upgrade of ContentRow with an extra "hash", + in the primary key.""" + + TABLE = "content_v2" + PARTITION_KEY = ("sha1", "sha1_git", "sha256", "blake2s256", "byte_xor") + + byte_xor: bytes + + +class CqlRunnerWithXorPK(CqlRunner): + """An hypothetical upgrade of ContentRow with an extra "hash", + but not in the primary key.""" + + @_prepared_select_statement( + ContentRowWithXorPK, + f"WHERE {' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS))} AND byte_xor=?", + ) + def content_get_from_pk( + self, content_hashes: Dict[str, bytes], *, statement + ) -> Optional[ContentRow]: + rows = list( + self._execute_with_retries( + statement, + [content_hashes[algo] for algo in HASH_ALGORITHMS + ["byte_xor"]], + ) + ) + assert len(rows) <= 1 + if rows: + return ContentRowWithXorPK(**rows[0]) + else: + return None + + @_prepared_select_statement( + ContentRowWithXorPK, + f"WHERE token({', '.join(ContentRowWithXorPK.PARTITION_KEY)}) = ?", + ) + def content_get_from_token( + self, token, *, statement + ) -> Iterable[ContentRowWithXorPK]: + return map( + ContentRowWithXorPK.from_dict, + self._execute_with_retries(statement, [token]), + ) + + # Redecorate content_add_prepare with the new ContentRow class + content_add_prepare = _prepared_insert_statement(ContentRowWithXorPK)( # type: ignore # noqa + CqlRunner.content_add_prepare.__wrapped__ # type: ignore + ) + + +def test_change_content_pk( + swh_storage: CassandraStorage, swh_storage_backend_config, mocker # noqa +) -> None: + """Adds a column to the 'content' table and a new matching index; + and make this new column part of the primary key + This is a complex migration, as it requires copying the whole table + """ + content_xor_hash = byte_xor_hash(StorageData.content.data) + + # First insert some existing data + swh_storage.content_add([StorageData.content, StorageData.content2]) + + # Then add a new table and a new index + swh_storage._cql_runner._session.execute( + """ + CREATE TABLE IF NOT EXISTS content_v2 ( + sha1 blob, + sha1_git blob, + sha256 blob, + blake2s256 blob, + byte_xor blob, + length bigint, + ctime timestamp, + -- creation time, i.e. time of (first) injection into the storage + status ascii, + PRIMARY KEY ((sha1, sha1_git, sha256, blake2s256, byte_xor)) + );""" + ) + for statement in CONTENT_INDEX_TEMPLATE.split("\n\n"): + swh_storage._cql_runner._session.execute(statement.format(main_algo="byte_xor")) + + # Should not affect the running code at all: + assert swh_storage.content_get([StorageData.content.sha1]) == [ + attr.evolve(StorageData.content, data=None) + ] + with pytest.raises(StorageArgumentException): + swh_storage.content_find({"byte_xor": content_xor_hash}) + + # Then update the running code: + new_hash_algos = HASH_ALGORITHMS + ["byte_xor"] + mocker.patch("swh.storage.cassandra.storage.HASH_ALGORITHMS", new_hash_algos) + mocker.patch("swh.storage.cassandra.cql.HASH_ALGORITHMS", new_hash_algos) + mocker.patch("swh.model.model.DEFAULT_ALGORITHMS", new_hash_algos) + mocker.patch("swh.storage.cassandra.storage.Content", ContentWithXor) + mocker.patch("swh.storage.cassandra.storage.ContentRow", ContentRowWithXorPK) + mocker.patch("swh.storage.cassandra.model.ContentRow", ContentRowWithXorPK) + mocker.patch("swh.storage.cassandra.storage.CqlRunner", CqlRunnerWithXorPK) + + # Forge new objects with this extra hash: + new_content = ContentWithXor.from_dict( + { + "byte_xor": byte_xor_hash(StorageData.content.data), + **StorageData.content.to_dict(), + } + ) + new_content2 = ContentWithXor.from_dict( + { + "byte_xor": byte_xor_hash(StorageData.content2.data), + **StorageData.content2.to_dict(), + } + ) + + # Replay to the new table. + # In production this would be done with a replayer reading from the journal, + # while loaders would still write to the DB. + overwriting_swh_storage = get_storage( + allow_overwrite=True, **swh_storage_backend_config + ) + overwriting_swh_storage.content_add([new_content, new_content2]) + + # Old algos still works, and return the new object type; + # but the byte_xor value is None because it is only available in the new + # table, which this storage is not yet configured to use + assert swh_storage.content_get([StorageData.content.sha1]) == [ + attr.evolve(new_content, data=None, byte_xor=None) + ] + + # When the replayer gets close to the end of the logs, loaders are stopped + # to allow the replayer to catch up with the end of the log. + # When it does, we can switch over to the new swh-storage's code. + + # Simulates a restart: + swh_storage._set_cql_runner() + + # Now, the object can be found with the new hash: + assert swh_storage.content_find({"byte_xor": content_xor_hash}) == [ + attr.evolve(new_content, data=None) + ] + + # Remove the old table: + swh_storage._cql_runner._session.execute("DROP TABLE content") + + # Object is still available, because we don't use it anymore + assert swh_storage.content_find({"byte_xor": content_xor_hash}) == [ + attr.evolve(new_content, data=None) + ] + + # THE END. + + # Test teardown expects a table with this name to exist: + swh_storage._cql_runner._session.execute( + "CREATE TABLE content (foo blob PRIMARY KEY);" + ) + + # Clean up this table, test teardown does not know about it: + swh_storage._cql_runner._session.execute("DROP TABLE content_v2;")