Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9343433
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
20 KB
Subscribers
None
View Options
diff --git a/swh/storage/cassandra/model.py b/swh/storage/cassandra/model.py
index a7803ae7..00eb1057 100644
--- a/swh/storage/cassandra/model.py
+++ b/swh/storage/cassandra/model.py
@@ -1,318 +1,323 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""Classes representing tables in the Cassandra database.
They are very close to classes found in swh.model.model, but most of
them are subtly different:
* Large objects are split into other classes (eg. RevisionRow has no
'parents' field, because parents are stored in a different table,
represented by RevisionParentRow)
* They have a "cols" field, which returns the list of column names
of the table
* They only use types that map directly to Cassandra's schema (ie. no enums)
Therefore, this model doesn't reuse swh.model.model, except for types
that can be mapped to UDTs (Person and TimestampWithTimezone).
"""
import dataclasses
import datetime
from typing import Any, ClassVar, Dict, List, Optional, Tuple, Type, TypeVar
from swh.model.model import Person, TimestampWithTimezone
MAGIC_NULL_PK = b"<null>"
"""
NULLs (or all-empty blobs) are not allowed in primary keys; instead we use a
special value that can't possibly be a valid hash.
"""
T = TypeVar("T", bound="BaseRow")
def content_index_table_name(algo: str, skipped_content: bool) -> str:
"""Given an algorithm name, returns the name of one of the 'content_by_*'
and 'skipped_content_by_*' tables that serve as index for the 'content'
and 'skipped_content' tables based on this algorithm's hashes.
For now it is a simple substitution, but future versions may append a version
number to it, if needed for schema updates."""
if skipped_content:
return f"skipped_content_by_{algo}"
else:
return f"content_by_{algo}"
class BaseRow:
TABLE: ClassVar[str]
PARTITION_KEY: ClassVar[Tuple[str, ...]]
CLUSTERING_KEY: ClassVar[Tuple[str, ...]] = ()
@classmethod
def from_dict(cls: Type[T], d: Dict[str, Any]) -> T:
return cls(**d) # type: ignore
@classmethod
def cols(cls) -> List[str]:
return [field.name for field in dataclasses.fields(cls)]
def to_dict(self) -> Dict[str, Any]:
return dataclasses.asdict(self)
@dataclasses.dataclass
class ContentRow(BaseRow):
TABLE = "content"
- PARTITION_KEY = ("sha1", "sha1_git", "sha256", "blake2s256")
+ PARTITION_KEY: ClassVar[Tuple[str, ...]] = (
+ "sha1",
+ "sha1_git",
+ "sha256",
+ "blake2s256",
+ )
sha1: bytes
sha1_git: bytes
sha256: bytes
blake2s256: bytes
length: int
ctime: datetime.datetime
status: str
@dataclasses.dataclass
class SkippedContentRow(BaseRow):
TABLE = "skipped_content"
PARTITION_KEY = ("sha1", "sha1_git", "sha256", "blake2s256")
sha1: Optional[bytes]
sha1_git: Optional[bytes]
sha256: Optional[bytes]
blake2s256: Optional[bytes]
length: Optional[int]
ctime: Optional[datetime.datetime]
status: str
reason: str
origin: str
@classmethod
def from_dict(cls, d: Dict[str, Any]) -> "SkippedContentRow":
d = d.copy()
for k in ("sha1", "sha1_git", "sha256", "blake2s256"):
if d[k] == MAGIC_NULL_PK:
d[k] = None
return super().from_dict(d)
@dataclasses.dataclass
class DirectoryRow(BaseRow):
TABLE = "directory"
PARTITION_KEY = ("id",)
id: bytes
@dataclasses.dataclass
class DirectoryEntryRow(BaseRow):
TABLE = "directory_entry"
PARTITION_KEY = ("directory_id",)
CLUSTERING_KEY = ("name",)
directory_id: bytes
name: bytes
target: bytes
perms: int
type: str
@dataclasses.dataclass
class RevisionRow(BaseRow):
TABLE = "revision"
PARTITION_KEY = ("id",)
id: bytes
date: Optional[TimestampWithTimezone]
committer_date: Optional[TimestampWithTimezone]
type: str
directory: bytes
message: bytes
author: Person
committer: Person
synthetic: bool
metadata: str
extra_headers: dict
@dataclasses.dataclass
class RevisionParentRow(BaseRow):
TABLE = "revision_parent"
PARTITION_KEY = ("id",)
CLUSTERING_KEY = ("parent_rank",)
id: bytes
parent_rank: int
parent_id: bytes
@dataclasses.dataclass
class ReleaseRow(BaseRow):
TABLE = "release"
PARTITION_KEY = ("id",)
id: bytes
target_type: str
target: bytes
date: TimestampWithTimezone
name: bytes
message: bytes
author: Person
synthetic: bool
@dataclasses.dataclass
class SnapshotRow(BaseRow):
TABLE = "snapshot"
PARTITION_KEY = ("id",)
id: bytes
@dataclasses.dataclass
class SnapshotBranchRow(BaseRow):
TABLE = "snapshot_branch"
PARTITION_KEY = ("snapshot_id",)
CLUSTERING_KEY = ("name",)
snapshot_id: bytes
name: bytes
target_type: Optional[str]
target: Optional[bytes]
@dataclasses.dataclass
class OriginVisitRow(BaseRow):
TABLE = "origin_visit"
PARTITION_KEY = ("origin",)
CLUSTERING_KEY = ("visit",)
origin: str
visit: int
date: datetime.datetime
type: str
@dataclasses.dataclass
class OriginVisitStatusRow(BaseRow):
TABLE = "origin_visit_status"
PARTITION_KEY = ("origin",)
CLUSTERING_KEY = ("visit", "date")
origin: str
visit: int
date: datetime.datetime
type: str
status: str
metadata: str
snapshot: bytes
@classmethod
def from_dict(cls: Type[T], d: Dict[str, Any]) -> T:
return cls(**d) # type: ignore
@dataclasses.dataclass
class OriginRow(BaseRow):
TABLE = "origin"
PARTITION_KEY = ("sha1",)
sha1: bytes
url: str
next_visit_id: int
@dataclasses.dataclass
class MetadataAuthorityRow(BaseRow):
TABLE = "metadata_authority"
PARTITION_KEY = ("url",)
CLUSTERING_KEY = ("type",)
url: str
type: str
metadata: str
@dataclasses.dataclass
class MetadataFetcherRow(BaseRow):
TABLE = "metadata_fetcher"
PARTITION_KEY = ("name",)
CLUSTERING_KEY = ("version",)
name: str
version: str
metadata: str
@dataclasses.dataclass
class RawExtrinsicMetadataRow(BaseRow):
TABLE = "raw_extrinsic_metadata"
PARTITION_KEY = ("target",)
CLUSTERING_KEY = (
"authority_type",
"authority_url",
"discovery_date",
"id",
)
id: bytes
type: str
target: str
authority_type: str
authority_url: str
discovery_date: datetime.datetime
fetcher_name: str
fetcher_version: str
format: str
metadata: bytes
origin: Optional[str]
visit: Optional[int]
snapshot: Optional[str]
release: Optional[str]
revision: Optional[str]
path: Optional[bytes]
directory: Optional[str]
@dataclasses.dataclass
class ObjectCountRow(BaseRow):
TABLE = "object_count"
PARTITION_KEY = ("partition_key",)
CLUSTERING_KEY = ("object_type",)
partition_key: int
object_type: str
count: int
@dataclasses.dataclass
class ExtIDRow(BaseRow):
TABLE = "extid"
PARTITION_KEY = ("target", "target_type", "extid", "extid_type")
extid_type: str
extid: bytes
target_type: str
target: bytes
@dataclasses.dataclass
class ExtIDByTargetRow(BaseRow):
TABLE = "extid_by_target"
PARTITION_KEY = ("target_type", "target")
CLUSTERING_KEY = ("target_token",)
target_type: str
target: bytes
target_token: int
diff --git a/swh/storage/tests/test_cassandra_migration.py b/swh/storage/tests/test_cassandra_migration.py
index 40c1d262..1e46f52e 100644
--- a/swh/storage/tests/test_cassandra_migration.py
+++ b/swh/storage/tests/test_cassandra_migration.py
@@ -1,163 +1,341 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""This module tests the migration capabilities of the Cassandra backend,
by sending CQL commands (eg. 'ALTER TABLE'), and
by monkey-patching large parts of the implementations to simulate code updates,."""
import dataclasses
import functools
import operator
from typing import Dict, Iterable, Optional
import attr
import pytest
from swh.model.model import Content
from swh.storage import get_storage
from swh.storage.cassandra.cql import (
CqlRunner,
_prepared_insert_statement,
_prepared_select_statement,
)
from swh.storage.cassandra.model import ContentRow
from swh.storage.cassandra.schema import CONTENT_INDEX_TEMPLATE, HASH_ALGORITHMS
from swh.storage.cassandra.storage import CassandraStorage
from swh.storage.exc import StorageArgumentException
from .storage_data import StorageData
from .test_cassandra import ( # noqa, needed for swh_storage fixture
cassandra_cluster,
keyspace,
swh_storage_backend_config,
)
+##############################
+# Common structures
+
def byte_xor_hash(data):
# Behold, a one-line hash function:
return bytes([functools.reduce(operator.xor, data)])
@attr.s
class ContentWithXor(Content):
"""An hypothetical upgrade of Content with an extra "hash"."""
byte_xor = attr.ib(type=bytes, default=None)
+##############################
+# Test simple migrations
+
+
@dataclasses.dataclass
class ContentRowWithXor(ContentRow):
- """An hypothetical upgrade of ContentRow with an extra "hash"."""
+ """An hypothetical upgrade of ContentRow with an extra "hash",
+ but not in the primary key."""
byte_xor: bytes
class CqlRunnerWithXor(CqlRunner):
+ """An hypothetical upgrade of ContentRow with an extra "hash",
+ but not in the primary key."""
+
@_prepared_select_statement(
ContentRowWithXor,
f"WHERE {' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS))}",
)
def content_get_from_pk(
self, content_hashes: Dict[str, bytes], *, statement
) -> Optional[ContentRow]:
rows = list(
self._execute_with_retries(
statement, [content_hashes[algo] for algo in HASH_ALGORITHMS]
)
)
assert len(rows) <= 1
if rows:
return ContentRowWithXor(**rows[0])
else:
return None
@_prepared_select_statement(
- ContentRowWithXor, f"WHERE token({', '.join(ContentRow.PARTITION_KEY)}) = ?"
+ ContentRowWithXor,
+ f"WHERE token({', '.join(ContentRowWithXor.PARTITION_KEY)}) = ?",
)
- def content_get_from_token(self, token, *, statement) -> Iterable[ContentRow]:
+ def content_get_from_token(
+ self, token, *, statement
+ ) -> Iterable[ContentRowWithXor]:
return map(
ContentRowWithXor.from_dict, self._execute_with_retries(statement, [token])
)
# Redecorate content_add_prepare with the new ContentRow class
content_add_prepare = _prepared_insert_statement(ContentRowWithXor)( # type: ignore
CqlRunner.content_add_prepare.__wrapped__ # type: ignore
)
def test_add_content_column(
swh_storage: CassandraStorage, swh_storage_backend_config, mocker # noqa
) -> None:
"""Adds a column to the 'content' table and a new matching index.
This is a simple migration, as it does not require an update to the primary key.
"""
content_xor_hash = byte_xor_hash(StorageData.content.data)
# First insert some existing data
swh_storage.content_add([StorageData.content, StorageData.content2])
# Then update the schema
swh_storage._cql_runner._session.execute("ALTER TABLE content ADD byte_xor blob")
for statement in CONTENT_INDEX_TEMPLATE.split("\n\n"):
swh_storage._cql_runner._session.execute(statement.format(main_algo="byte_xor"))
# Should not affect the running code at all:
assert swh_storage.content_get([StorageData.content.sha1]) == [
attr.evolve(StorageData.content, data=None)
]
with pytest.raises(StorageArgumentException):
swh_storage.content_find({"byte_xor": content_xor_hash})
# Then update the running code:
new_hash_algos = HASH_ALGORITHMS + ["byte_xor"]
mocker.patch("swh.storage.cassandra.storage.HASH_ALGORITHMS", new_hash_algos)
mocker.patch("swh.storage.cassandra.cql.HASH_ALGORITHMS", new_hash_algos)
mocker.patch("swh.model.model.DEFAULT_ALGORITHMS", new_hash_algos)
mocker.patch("swh.storage.cassandra.storage.Content", ContentWithXor)
mocker.patch("swh.storage.cassandra.storage.ContentRow", ContentRowWithXor)
mocker.patch("swh.storage.cassandra.model.ContentRow", ContentRowWithXor)
mocker.patch("swh.storage.cassandra.storage.CqlRunner", CqlRunnerWithXor)
# Forge new objects with this extra hash:
new_content = ContentWithXor.from_dict(
{
"byte_xor": byte_xor_hash(StorageData.content.data),
**StorageData.content.to_dict(),
}
)
new_content2 = ContentWithXor.from_dict(
{
"byte_xor": byte_xor_hash(StorageData.content2.data),
**StorageData.content2.to_dict(),
}
)
# Simulates a restart:
swh_storage._set_cql_runner()
# Old algos still works, and return the new object type:
assert swh_storage.content_get([StorageData.content.sha1]) == [
attr.evolve(new_content, data=None, byte_xor=None)
]
# The new algo does not work, we did not backfill it yet:
assert swh_storage.content_find({"byte_xor": content_xor_hash}) == []
# A normal storage would not overwrite, because the object already exists,
# as it is not aware it is missing a field:
swh_storage.content_add([new_content, new_content2])
assert swh_storage.content_find({"byte_xor": content_xor_hash}) == []
# Backfill (in production this would be done with a replayer reading from
# the journal):
overwriting_swh_storage = get_storage(
allow_overwrite=True, **swh_storage_backend_config
)
overwriting_swh_storage.content_add([new_content, new_content2])
# Now, the object can be found:
assert swh_storage.content_find({"byte_xor": content_xor_hash}) == [
attr.evolve(new_content, data=None)
]
+
+
+##############################
+# Test complex migrations
+
+
+@dataclasses.dataclass
+class ContentRowWithXorPK(ContentRow):
+ """An hypothetical upgrade of ContentRow with an extra "hash",
+ in the primary key."""
+
+ TABLE = "content_v2"
+ PARTITION_KEY = ("sha1", "sha1_git", "sha256", "blake2s256", "byte_xor")
+
+ byte_xor: bytes
+
+
+class CqlRunnerWithXorPK(CqlRunner):
+ """An hypothetical upgrade of ContentRow with an extra "hash",
+ but not in the primary key."""
+
+ @_prepared_select_statement(
+ ContentRowWithXorPK,
+ f"WHERE {' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS))} AND byte_xor=?",
+ )
+ def content_get_from_pk(
+ self, content_hashes: Dict[str, bytes], *, statement
+ ) -> Optional[ContentRow]:
+ rows = list(
+ self._execute_with_retries(
+ statement,
+ [content_hashes[algo] for algo in HASH_ALGORITHMS + ["byte_xor"]],
+ )
+ )
+ assert len(rows) <= 1
+ if rows:
+ return ContentRowWithXorPK(**rows[0])
+ else:
+ return None
+
+ @_prepared_select_statement(
+ ContentRowWithXorPK,
+ f"WHERE token({', '.join(ContentRowWithXorPK.PARTITION_KEY)}) = ?",
+ )
+ def content_get_from_token(
+ self, token, *, statement
+ ) -> Iterable[ContentRowWithXorPK]:
+ return map(
+ ContentRowWithXorPK.from_dict,
+ self._execute_with_retries(statement, [token]),
+ )
+
+ # Redecorate content_add_prepare with the new ContentRow class
+ content_add_prepare = _prepared_insert_statement(ContentRowWithXorPK)( # type: ignore # noqa
+ CqlRunner.content_add_prepare.__wrapped__ # type: ignore
+ )
+
+
+def test_change_content_pk(
+ swh_storage: CassandraStorage, swh_storage_backend_config, mocker # noqa
+) -> None:
+ """Adds a column to the 'content' table and a new matching index;
+ and make this new column part of the primary key
+ This is a complex migration, as it requires copying the whole table
+ """
+ content_xor_hash = byte_xor_hash(StorageData.content.data)
+
+ # First insert some existing data
+ swh_storage.content_add([StorageData.content, StorageData.content2])
+
+ # Then add a new table and a new index
+ swh_storage._cql_runner._session.execute(
+ """
+ CREATE TABLE IF NOT EXISTS content_v2 (
+ sha1 blob,
+ sha1_git blob,
+ sha256 blob,
+ blake2s256 blob,
+ byte_xor blob,
+ length bigint,
+ ctime timestamp,
+ -- creation time, i.e. time of (first) injection into the storage
+ status ascii,
+ PRIMARY KEY ((sha1, sha1_git, sha256, blake2s256, byte_xor))
+ );"""
+ )
+ for statement in CONTENT_INDEX_TEMPLATE.split("\n\n"):
+ swh_storage._cql_runner._session.execute(statement.format(main_algo="byte_xor"))
+
+ # Should not affect the running code at all:
+ assert swh_storage.content_get([StorageData.content.sha1]) == [
+ attr.evolve(StorageData.content, data=None)
+ ]
+ with pytest.raises(StorageArgumentException):
+ swh_storage.content_find({"byte_xor": content_xor_hash})
+
+ # Then update the running code:
+ new_hash_algos = HASH_ALGORITHMS + ["byte_xor"]
+ mocker.patch("swh.storage.cassandra.storage.HASH_ALGORITHMS", new_hash_algos)
+ mocker.patch("swh.storage.cassandra.cql.HASH_ALGORITHMS", new_hash_algos)
+ mocker.patch("swh.model.model.DEFAULT_ALGORITHMS", new_hash_algos)
+ mocker.patch("swh.storage.cassandra.storage.Content", ContentWithXor)
+ mocker.patch("swh.storage.cassandra.storage.ContentRow", ContentRowWithXorPK)
+ mocker.patch("swh.storage.cassandra.model.ContentRow", ContentRowWithXorPK)
+ mocker.patch("swh.storage.cassandra.storage.CqlRunner", CqlRunnerWithXorPK)
+
+ # Forge new objects with this extra hash:
+ new_content = ContentWithXor.from_dict(
+ {
+ "byte_xor": byte_xor_hash(StorageData.content.data),
+ **StorageData.content.to_dict(),
+ }
+ )
+ new_content2 = ContentWithXor.from_dict(
+ {
+ "byte_xor": byte_xor_hash(StorageData.content2.data),
+ **StorageData.content2.to_dict(),
+ }
+ )
+
+ # Replay to the new table.
+ # In production this would be done with a replayer reading from the journal,
+ # while loaders would still write to the DB.
+ overwriting_swh_storage = get_storage(
+ allow_overwrite=True, **swh_storage_backend_config
+ )
+ overwriting_swh_storage.content_add([new_content, new_content2])
+
+ # Old algos still works, and return the new object type;
+ # but the byte_xor value is None because it is only available in the new
+ # table, which this storage is not yet configured to use
+ assert swh_storage.content_get([StorageData.content.sha1]) == [
+ attr.evolve(new_content, data=None, byte_xor=None)
+ ]
+
+ # When the replayer gets close to the end of the logs, loaders are stopped
+ # to allow the replayer to catch up with the end of the log.
+ # When it does, we can switch over to the new swh-storage's code.
+
+ # Simulates a restart:
+ swh_storage._set_cql_runner()
+
+ # Now, the object can be found with the new hash:
+ assert swh_storage.content_find({"byte_xor": content_xor_hash}) == [
+ attr.evolve(new_content, data=None)
+ ]
+
+ # Remove the old table:
+ swh_storage._cql_runner._session.execute("DROP TABLE content")
+
+ # Object is still available, because we don't use it anymore
+ assert swh_storage.content_find({"byte_xor": content_xor_hash}) == [
+ attr.evolve(new_content, data=None)
+ ]
+
+ # THE END.
+
+ # Test teardown expects a table with this name to exist:
+ swh_storage._cql_runner._session.execute(
+ "CREATE TABLE content (foo blob PRIMARY KEY);"
+ )
+
+ # Clean up this table, test teardown does not know about it:
+ swh_storage._cql_runner._session.execute("DROP TABLE content_v2;")
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Fri, Jul 4, 1:31 PM (5 d, 5 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3276520
Attached To
rDSTO Storage manager
Event Timeline
Log In to Comment