Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/storage.py
Show First 20 Lines • Show All 84 Lines • ▼ Show 20 Lines | |||||
) | ) | ||||
from .schema import HASH_ALGORITHMS | from .schema import HASH_ALGORITHMS | ||||
# Max block size of contents to return | # Max block size of contents to return | ||||
BULK_BLOCK_CONTENT_LEN_MAX = 10000 | BULK_BLOCK_CONTENT_LEN_MAX = 10000 | ||||
class CassandraStorage: | class CassandraStorage: | ||||
def __init__(self, hosts, keyspace, objstorage, port=9042, journal_writer=None): | def __init__( | ||||
self._cql_runner: CqlRunner = CqlRunner(hosts, keyspace, port) | self, | ||||
hosts, | |||||
keyspace, | |||||
objstorage, | |||||
port=9042, | |||||
journal_writer=None, | |||||
allow_overwrite=False, | |||||
): | |||||
""" | |||||
A backend of swh-storage backed by Cassandra | |||||
Args: | |||||
hosts: Seed Cassandra nodes, to start connecting to the cluster | |||||
keyspace: Name of the Cassandra database to use | |||||
objstorage: Passed as argument to :class:`ObjStorage` | |||||
port: Cassandra port | |||||
journal_writer: Passed as argument to :class:`JournalWriter` | |||||
allow_overwrite: If ``False``, ``*_add`` functions check whether an object | |||||
already exists in the database before sending it in an INSERT; if ``True``, this check is skipped. | |||||
``False`` is the default as it is more efficient when there is | |||||
a moderately high probability the object is already known, | |||||
but ``True`` can be useful to overwrite existing objects | |||||
(eg. when applying a schema update), | |||||
or when the database is known to be mostly empty. | |||||
Note that a ``False`` value does not guarantee there won't be | |||||
any overwrite. | |||||
""" | |||||
self._hosts = hosts | |||||
self._keyspace = keyspace | |||||
self._port = port | |||||
self._set_cql_runner() | |||||
self.journal_writer: JournalWriter = JournalWriter(journal_writer) | self.journal_writer: JournalWriter = JournalWriter(journal_writer) | ||||
self.objstorage: ObjStorage = ObjStorage(objstorage) | self.objstorage: ObjStorage = ObjStorage(objstorage) | ||||
self._allow_overwrite = allow_overwrite | |||||
def _set_cql_runner(self): | |||||
"""Used by tests when they need to reset the CqlRunner""" | |||||
self._cql_runner: CqlRunner = CqlRunner(self._hosts, self._keyspace, self._port) | |||||
def check_config(self, *, check_write: bool) -> bool: | def check_config(self, *, check_write: bool) -> bool: | ||||
self._cql_runner.check_read() | self._cql_runner.check_read() | ||||
return True | return True | ||||
def _content_get_from_hash(self, algo, hash_) -> Iterable: | def _content_get_from_hash(self, algo, hash_) -> Iterable: | ||||
"""From the name of a hash algorithm and a value of that hash, | """From the name of a hash algorithm and a value of that hash, | ||||
Show All 10 Lines | def _content_get_from_hash(self, algo, hash_) -> Iterable: | ||||
for row in res: | for row in res: | ||||
# re-check the hash (in case of murmur3 collision) | # re-check the hash (in case of murmur3 collision) | ||||
if getattr(row, algo) == hash_: | if getattr(row, algo) == hash_: | ||||
yield row | yield row | ||||
def _content_add(self, contents: List[Content], with_data: bool) -> Dict[str, int]: | def _content_add(self, contents: List[Content], with_data: bool) -> Dict[str, int]: | ||||
# Filter-out content already in the database. | # Filter-out content already in the database. | ||||
if not self._allow_overwrite: | |||||
contents = [ | contents = [ | ||||
c for c in contents if not self._cql_runner.content_get_from_pk(c.to_dict()) | c | ||||
for c in contents | |||||
if not self._cql_runner.content_get_from_pk(c.to_dict()) | |||||
] | ] | ||||
if with_data: | if with_data: | ||||
# First insert to the objstorage, if the endpoint is | # First insert to the objstorage, if the endpoint is | ||||
# `content_add` (as opposed to `content_add_metadata`). | # `content_add` (as opposed to `content_add_metadata`). | ||||
# Must add to the objstorage before the DB and journal. Otherwise: | # Must add to the objstorage before the DB and journal. Otherwise: | ||||
# 1. in case of a crash the DB may "believe" we have the content, but | # 1. in case of a crash the DB may "believe" we have the content, but | ||||
# we didn't have time to write to the objstorage before the crash | # we didn't have time to write to the objstorage before the crash | ||||
Show All 12 Lines | def _content_add(self, contents: List[Content], with_data: bool) -> Dict[str, int]: | ||||
# Check for sha1 or sha1_git collisions. This test is not atomic | # Check for sha1 or sha1_git collisions. This test is not atomic | ||||
# with the insertion, so it won't detect a collision if both | # with the insertion, so it won't detect a collision if both | ||||
# contents are inserted at the same time, but it's good enough. | # contents are inserted at the same time, but it's good enough. | ||||
# | # | ||||
# The proper way to do it would probably be a BATCH, but this | # The proper way to do it would probably be a BATCH, but this | ||||
# would be inefficient because of the number of partitions we | # would be inefficient because of the number of partitions we | ||||
# need to affect (len(HASH_ALGORITHMS)+1, which is currently 5) | # need to affect (len(HASH_ALGORITHMS)+1, which is currently 5) | ||||
if not self._allow_overwrite: | |||||
for algo in {"sha1", "sha1_git"}: | for algo in {"sha1", "sha1_git"}: | ||||
collisions = [] | collisions = [] | ||||
# Get tokens of 'content' rows with the same value for | # Get tokens of 'content' rows with the same value for | ||||
# sha1/sha1_git | # sha1/sha1_git | ||||
rows = self._content_get_from_hash(algo, content.get_hash(algo)) | rows = self._content_get_from_hash(algo, content.get_hash(algo)) | ||||
for row in rows: | for row in rows: | ||||
if getattr(row, algo) != content.get_hash(algo): | if getattr(row, algo) != content.get_hash(algo): | ||||
# collision of token(partition key), ignore this | # collision of token(partition key), ignore this | ||||
# row | # row | ||||
continue | continue | ||||
for other_algo in HASH_ALGORITHMS: | for other_algo in HASH_ALGORITHMS: | ||||
if getattr(row, other_algo) != content.get_hash(other_algo): | if getattr(row, other_algo) != content.get_hash(other_algo): | ||||
# This hash didn't match; discard the row. | # This hash didn't match; discard the row. | ||||
collisions.append( | collisions.append( | ||||
{k: getattr(row, k) for k in HASH_ALGORITHMS} | {k: getattr(row, k) for k in HASH_ALGORITHMS} | ||||
) | ) | ||||
if collisions: | if collisions: | ||||
collisions.append(content.hashes()) | collisions.append(content.hashes()) | ||||
raise HashCollision(algo, content.get_hash(algo), collisions) | raise HashCollision(algo, content.get_hash(algo), collisions) | ||||
douardda: why is the whole collision detection block now run only if check_missing is True? | |||||
Done Inline Actions: Because, if an object already exists with the exact same hashes and it was not filtered out initially, then self._content_get_from_hash(algo, content.get_hash(algo)) will return it and it would be detected as a collision. I could filter the results of self._content_get_from_hash, but I don't think it's worth it, as _check_missing is pretty niche anyway. vlorentz: Because, if an object already exists with the exact same hashes and it was not filtered out… | |||||
Done Inline Actions: And collision detection in Cassandra isn't guaranteed anyway, because two colliding objects may be inserted at the same time and Cassandra is only eventually consistent. vlorentz: And collision detection in Cassandra isn't guaranteed anyway, because two colliding objects may… | |||||
(token, insertion_finalizer) = self._cql_runner.content_add_prepare( | (token, insertion_finalizer) = self._cql_runner.content_add_prepare( | ||||
ContentRow(**remove_keys(content.to_dict(), ("data",))) | ContentRow(**remove_keys(content.to_dict(), ("data",))) | ||||
) | ) | ||||
# Then add to index tables | # Then add to index tables | ||||
for algo in HASH_ALGORITHMS: | for algo in HASH_ALGORITHMS: | ||||
self._cql_runner.content_index_add_one(algo, content, token) | self._cql_runner.content_index_add_one(algo, content, token) | ||||
▲ Show 20 Lines • Show All 136 Lines • ▼ Show 20 Lines | class CassandraStorage: | ||||
def content_get_random(self) -> Sha1Git: | def content_get_random(self) -> Sha1Git: | ||||
content = self._cql_runner.content_get_random() | content = self._cql_runner.content_get_random() | ||||
assert content, "Could not find any content" | assert content, "Could not find any content" | ||||
return content.sha1_git | return content.sha1_git | ||||
def _skipped_content_add(self, contents: List[SkippedContent]) -> Dict[str, int]: | def _skipped_content_add(self, contents: List[SkippedContent]) -> Dict[str, int]: | ||||
# Filter-out content already in the database. | # Filter-out content already in the database. | ||||
if not self._allow_overwrite: | |||||
contents = [ | contents = [ | ||||
c | c | ||||
for c in contents | for c in contents | ||||
if not self._cql_runner.skipped_content_get_from_pk(c.to_dict()) | if not self._cql_runner.skipped_content_get_from_pk(c.to_dict()) | ||||
] | ] | ||||
self.journal_writer.skipped_content_add(contents) | self.journal_writer.skipped_content_add(contents) | ||||
for content in contents: | for content in contents: | ||||
# Compute token of the row in the main table | # Compute token of the row in the main table | ||||
(token, insertion_finalizer) = self._cql_runner.skipped_content_add_prepare( | (token, insertion_finalizer) = self._cql_runner.skipped_content_add_prepare( | ||||
SkippedContentRow.from_dict({"origin": None, **content.to_dict()}) | SkippedContentRow.from_dict({"origin": None, **content.to_dict()}) | ||||
) | ) | ||||
Show All 15 Lines | def skipped_content_missing( | ||||
self, contents: List[Dict[str, Any]] | self, contents: List[Dict[str, Any]] | ||||
) -> Iterable[Dict[str, Any]]: | ) -> Iterable[Dict[str, Any]]: | ||||
for content in contents: | for content in contents: | ||||
if not self._cql_runner.skipped_content_get_from_pk(content): | if not self._cql_runner.skipped_content_get_from_pk(content): | ||||
yield {algo: content[algo] for algo in DEFAULT_ALGORITHMS} | yield {algo: content[algo] for algo in DEFAULT_ALGORITHMS} | ||||
def directory_add(self, directories: List[Directory]) -> Dict[str, int]: | def directory_add(self, directories: List[Directory]) -> Dict[str, int]: | ||||
to_add = {d.id: d for d in directories}.values() | to_add = {d.id: d for d in directories}.values() | ||||
if not self._allow_overwrite: | |||||
# Filter out directories that are already inserted. | # Filter out directories that are already inserted. | ||||
missing = self.directory_missing([dir_.id for dir_ in to_add]) | missing = self.directory_missing([dir_.id for dir_ in to_add]) | ||||
directories = [dir_ for dir_ in directories if dir_.id in missing] | directories = [dir_ for dir_ in directories if dir_.id in missing] | ||||
self.journal_writer.directory_add(directories) | self.journal_writer.directory_add(directories) | ||||
for directory in directories: | for directory in directories: | ||||
# Add directory entries to the 'directory_entry' table | # Add directory entries to the 'directory_entry' table | ||||
for entry in directory.entries: | for entry in directory.entries: | ||||
self._cql_runner.directory_entry_add_one( | self._cql_runner.directory_entry_add_one( | ||||
DirectoryEntryRow(directory_id=directory.id, **entry.to_dict()) | DirectoryEntryRow(directory_id=directory.id, **entry.to_dict()) | ||||
▲ Show 20 Lines • Show All 106 Lines • ▼ Show 20 Lines | class CassandraStorage: | ||||
def directory_get_random(self) -> Sha1Git: | def directory_get_random(self) -> Sha1Git: | ||||
directory = self._cql_runner.directory_get_random() | directory = self._cql_runner.directory_get_random() | ||||
assert directory, "Could not find any directory" | assert directory, "Could not find any directory" | ||||
return directory.id | return directory.id | ||||
def revision_add(self, revisions: List[Revision]) -> Dict[str, int]: | def revision_add(self, revisions: List[Revision]) -> Dict[str, int]: | ||||
# Filter-out revisions already in the database | # Filter-out revisions already in the database | ||||
if not self._allow_overwrite: | |||||
to_add = {r.id: r for r in revisions}.values() | to_add = {r.id: r for r in revisions}.values() | ||||
missing = self.revision_missing([rev.id for rev in to_add]) | missing = self.revision_missing([rev.id for rev in to_add]) | ||||
revisions = [rev for rev in revisions if rev.id in missing] | revisions = [rev for rev in revisions if rev.id in missing] | ||||
self.journal_writer.revision_add(revisions) | self.journal_writer.revision_add(revisions) | ||||
for revision in revisions: | for revision in revisions: | ||||
revobject = converters.revision_to_db(revision) | revobject = converters.revision_to_db(revision) | ||||
if revobject: | if revobject: | ||||
# Add parents first | # Add parents first | ||||
for (rank, parent) in enumerate(revision.parents): | for (rank, parent) in enumerate(revision.parents): | ||||
self._cql_runner.revision_parent_add_one( | self._cql_runner.revision_parent_add_one( | ||||
▲ Show 20 Lines • Show All 92 Lines • ▼ Show 20 Lines | ) -> Iterable[Optional[Tuple[Sha1Git, Tuple[Sha1Git, ...]]]]: | ||||
yield from self._get_parent_revs(revisions, seen, limit, True) | yield from self._get_parent_revs(revisions, seen, limit, True) | ||||
def revision_get_random(self) -> Sha1Git: | def revision_get_random(self) -> Sha1Git: | ||||
revision = self._cql_runner.revision_get_random() | revision = self._cql_runner.revision_get_random() | ||||
assert revision, "Could not find any revision" | assert revision, "Could not find any revision" | ||||
return revision.id | return revision.id | ||||
def release_add(self, releases: List[Release]) -> Dict[str, int]: | def release_add(self, releases: List[Release]) -> Dict[str, int]: | ||||
if not self._allow_overwrite: | |||||
to_add = {r.id: r for r in releases}.values() | to_add = {r.id: r for r in releases}.values() | ||||
missing = set(self.release_missing([rel.id for rel in to_add])) | missing = set(self.release_missing([rel.id for rel in to_add])) | ||||
releases = [rel for rel in to_add if rel.id in missing] | releases = [rel for rel in to_add if rel.id in missing] | ||||
self.journal_writer.release_add(releases) | self.journal_writer.release_add(releases) | ||||
for release in releases: | for release in releases: | ||||
if release: | if release: | ||||
self._cql_runner.release_add_one(converters.release_to_db(release)) | self._cql_runner.release_add_one(converters.release_to_db(release)) | ||||
return {"release:add": len(releases)} | return {"release:add": len(releases)} | ||||
Show All 10 Lines | def release_get(self, releases: List[Sha1Git]) -> List[Optional[Release]]: | ||||
return [rels.get(rel_id) for rel_id in releases] | return [rels.get(rel_id) for rel_id in releases] | ||||
def release_get_random(self) -> Sha1Git: | def release_get_random(self) -> Sha1Git: | ||||
release = self._cql_runner.release_get_random() | release = self._cql_runner.release_get_random() | ||||
assert release, "Could not find any release" | assert release, "Could not find any release" | ||||
return release.id | return release.id | ||||
def snapshot_add(self, snapshots: List[Snapshot]) -> Dict[str, int]: | def snapshot_add(self, snapshots: List[Snapshot]) -> Dict[str, int]: | ||||
if not self._allow_overwrite: | |||||
to_add = {s.id: s for s in snapshots}.values() | to_add = {s.id: s for s in snapshots}.values() | ||||
missing = self._cql_runner.snapshot_missing([snp.id for snp in to_add]) | missing = self._cql_runner.snapshot_missing([snp.id for snp in to_add]) | ||||
snapshots = [snp for snp in snapshots if snp.id in missing] | snapshots = [snp for snp in snapshots if snp.id in missing] | ||||
for snapshot in snapshots: | for snapshot in snapshots: | ||||
self.journal_writer.snapshot_add([snapshot]) | self.journal_writer.snapshot_add([snapshot]) | ||||
# Add branches | # Add branches | ||||
for (branch_name, branch) in snapshot.branches.items(): | for (branch_name, branch) in snapshot.branches.items(): | ||||
if branch is None: | if branch is None: | ||||
target_type: Optional[str] = None | target_type: Optional[str] = None | ||||
▲ Show 20 Lines • Show All 252 Lines • ▼ Show 20 Lines | class CassandraStorage: | ||||
def origin_count( | def origin_count( | ||||
self, url_pattern: str, regexp: bool = False, with_visit: bool = False | self, url_pattern: str, regexp: bool = False, with_visit: bool = False | ||||
) -> int: | ) -> int: | ||||
raise NotImplementedError( | raise NotImplementedError( | ||||
"The Cassandra backend does not implement origin_count" | "The Cassandra backend does not implement origin_count" | ||||
) | ) | ||||
def origin_add(self, origins: List[Origin]) -> Dict[str, int]: | def origin_add(self, origins: List[Origin]) -> Dict[str, int]: | ||||
if not self._allow_overwrite: | |||||
to_add = {o.url: o for o in origins}.values() | to_add = {o.url: o for o in origins}.values() | ||||
origins = [ori for ori in to_add if self.origin_get_one(ori.url) is None] | origins = [ori for ori in to_add if self.origin_get_one(ori.url) is None] | ||||
self.journal_writer.origin_add(origins) | self.journal_writer.origin_add(origins) | ||||
for origin in origins: | for origin in origins: | ||||
self._cql_runner.origin_add_one( | self._cql_runner.origin_add_one( | ||||
OriginRow(sha1=hash_url(origin.url), url=origin.url, next_visit_id=1) | OriginRow(sha1=hash_url(origin.url), url=origin.url, next_visit_id=1) | ||||
) | ) | ||||
return {"origin:add": len(origins)} | return {"origin:add": len(origins)} | ||||
▲ Show 20 Lines • Show All 444 Lines • ▼ Show 20 Lines | ) -> Optional[MetadataAuthority]: | ||||
url=authority.url, | url=authority.url, | ||||
metadata=json.loads(authority.metadata), | metadata=json.loads(authority.metadata), | ||||
) | ) | ||||
else: | else: | ||||
return None | return None | ||||
# ExtID tables | # ExtID tables | ||||
def extid_add(self, ids: List[ExtID]) -> Dict[str, int]: | def extid_add(self, ids: List[ExtID]) -> Dict[str, int]: | ||||
if not self._allow_overwrite: | |||||
extids = [ | extids = [ | ||||
extid | extid | ||||
for extid in ids | for extid in ids | ||||
if not self._cql_runner.extid_get_from_pk( | if not self._cql_runner.extid_get_from_pk( | ||||
extid_type=extid.extid_type, extid=extid.extid, target=extid.target, | extid_type=extid.extid_type, extid=extid.extid, target=extid.target, | ||||
) | ) | ||||
] | ] | ||||
else: | |||||
extids = list(ids) | |||||
self.journal_writer.extid_add(extids) | self.journal_writer.extid_add(extids) | ||||
inserted = 0 | inserted = 0 | ||||
for extid in extids: | for extid in extids: | ||||
target_type = extid.target.object_type.value | target_type = extid.target.object_type.value | ||||
target = extid.target.object_id | target = extid.target.object_id | ||||
extidrow = ExtIDRow( | extidrow = ExtIDRow( | ||||
▲ Show 20 Lines • Show All 60 Lines • Show Last 20 Lines |
Why is the whole collision detection block now run only if check_missing is True?