swh/storage/cassandra/storage.py

[...]
 from swh.storage.objstorage import ObjStorage
 from swh.storage.writer import JournalWriter

 from .. import HashCollision
 from ..exc import StorageArgumentException
 from .common import TOKEN_BEGIN, TOKEN_END
 from .converters import (
     revision_to_db, revision_from_db, release_to_db, release_from_db,
-    row_to_content_hashes,
 )
 from .cql import CqlRunner
 from .schema import HASH_ALGORITHMS


 # Max block size of contents to return
 BULK_BLOCK_CONTENT_LEN_MAX = 10000
[...]
     def __init__(self, hosts, keyspace, objstorage,
[...]
         self.journal_writer = JournalWriter(journal_writer)
         self.objstorage = ObjStorage(objstorage)

     def check_config(self, *, check_write):
         self._cql_runner.check_read()
         return True

+    def _content_get_from_hash(self, algo, hash_) -> Iterable:
+        """From the name of a hash algorithm and a value of that hash,
+        looks up the "hash -> token" secondary table (content_by_{algo})
+        to get tokens.
+        Then, looks up the main table (content) to get all contents with
+        that token, and filters out contents whose hash doesn't match."""
+        found_tokens = self._cql_runner.content_get_tokens_from_single_hash(
+            algo, hash_)
+
+        for token in found_tokens:
+            # Query the main table ('content').
+            res = self._cql_runner.content_get_from_token(token)
+
+            for row in res:
+                # Re-check the hash (in case of a murmur3 token collision).
+                if getattr(row, algo) == hash_:
+                    yield row
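
As an aside, the "hash -> token -> rows" dance in _content_get_from_hash can be sketched with plain dicts standing in for the content_by_{algo} index and the content table. The Row layout and table shapes below are illustrative stand-ins, not the real schema or CqlRunner API:

    from collections import namedtuple

    Row = namedtuple('Row', ['sha1', 'sha1_git'])

    # (algo, hash) -> tokens; the index maps a hash to murmur3 tokens.
    index_table = {('sha1', b'\x01'): [42]}
    # token -> rows; a token bucket may hold unrelated contents,
    # because the token function is not injective.
    main_table = {42: [Row(sha1=b'\x01', sha1_git=b'\xaa'),
                       Row(sha1=b'\x02', sha1_git=b'\xbb')]}

    def content_get_from_hash(algo, hash_):
        for token in index_table.get((algo, hash_), []):
            for row in main_table.get(token, []):
                # Re-check the hash: rows sharing the token may belong
                # to a different content.
                if getattr(row, algo) == hash_:
                    yield row

    assert list(content_get_from_hash('sha1', b'\x01')) == \
        [Row(sha1=b'\x01', sha1_git=b'\xaa')]
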
     def _content_add(self, contents: List[Content], with_data: bool) -> Dict:
         # Filter out content already in the database.
         contents = [c for c in contents
                     if not self._cql_runner.content_get_from_pk(c.to_dict())]

         self.journal_writer.content_add(contents)

         if with_data:
             # First insert to the objstorage, if the endpoint is
             # `content_add` (as opposed to `content_add_metadata`).
             # TODO: this should probably be done concurrently with inserting
             # in index tables (but still before the main table, so an entry
             # is only added to the main table after everything else was
             # successfully inserted).
             summary = self.objstorage.content_add(
                 c for c in contents if c.status != 'absent')
             content_add_bytes = summary['content:add:bytes']

         content_add = 0
         for content in contents:
             content_add += 1

-            # Then add to index tables
-            for algo in HASH_ALGORITHMS:
-                self._cql_runner.content_index_add_one(algo, content)
-
-            # Then to the main table
-            self._cql_runner.content_add_one(content)
-
-            # Note that we check for collisions *after* inserting. This
-            # differs significantly from the pgsql storage, but checking
-            # before insertion does not provide any guarantee in case
-            # another thread inserts the colliding hash at the same time.
+            # Check for sha1 or sha1_git collisions. This test is not atomic
+            # with the insertion, so it won't detect a collision if both
+            # contents are inserted at the same time, but it's good enough.
             #
             # The proper way to do it would probably be a BATCH, but this
             # would be inefficient because of the number of partitions we
             # need to affect (len(HASH_ALGORITHMS)+1, which is currently 5)
             for algo in {'sha1', 'sha1_git'}:
-                pks = self._cql_runner.content_get_pks_from_single_hash(
-                    algo, content.get_hash(algo))
-                if len(pks) > 1:
-                    # There are more than the one we just inserted.
-                    colliding_content_hashes = [
-                        row_to_content_hashes(pk) for pk in pks
-                    ]

  > ardumont (inline): Drop row_to_content_hashes if it's no longer used ;)

-                    raise HashCollision(
-                        algo, content.get_hash(algo), colliding_content_hashes)
+                collisions = []
+                # Get tokens of 'content' rows with the same value for
+                # sha1/sha1_git.
+                rows = self._content_get_from_hash(
+                    algo, content.get_hash(algo))
+                for row in rows:
+                    if getattr(row, algo) != content.get_hash(algo):
+                        # Collision of the token (partition key); ignore
+                        # this row.
+                        continue
+                    for other_algo in HASH_ALGORITHMS:
+                        if getattr(row, other_algo) \
+                                != content.get_hash(other_algo):
+                            # This hash didn't match: record the
+                            # colliding row's hashes.
+                            collisions.append({
+                                algo: getattr(row, algo)
+                                for algo in HASH_ALGORITHMS})
+                if collisions:
+                    collisions.append(content.hashes())
+                    raise HashCollision(
+                        algo, content.get_hash(algo), collisions)
+
+            (token, insertion_finalizer) = \
+                self._cql_runner.content_add_prepare(content)
+
+            # Then add to index tables
+            for algo in HASH_ALGORITHMS:
+                self._cql_runner.content_index_add_one(algo, content, token)
+
+            # Then to the main table
+            insertion_finalizer()

         summary = {
             'content:add': content_add,
         }

         if with_data:
             summary['content:add:bytes'] = content_add_bytes
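
The content_add_prepare()/finalizer split is what lets the main-table write happen last. A minimal sketch of that ordering, assuming only that prepare returns a token plus a deferred write; the toy token function and in-memory tables below stand in for the real CqlRunner:

    main_table, index_table = {}, {}

    def content_add_prepare(content):
        token = hash(content['sha1'])  # stand-in for the murmur3 token

        def finalizer():
            # Runs last: the row becomes visible in the main table only
            # once every index entry pointing at it already exists.
            main_table.setdefault(token, []).append(content)

        return token, finalizer

    content = {'sha1': b'\x01', 'sha1_git': b'\xaa'}
    token, insertion_finalizer = content_add_prepare(content)
    for algo in ('sha1', 'sha1_git'):  # index tables first...
        index_table.setdefault((algo, content[algo]), []).append(token)
    insertion_finalizer()              # ...main table last
    assert main_table[token] == [content]
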
[...]
     def content_get_partition(
[...]
         }

     def content_get_metadata(
             self, contents: List[bytes]) -> Dict[bytes, List[Dict]]:
         result: Dict[bytes, List[Dict]] = {sha1: [] for sha1 in contents}
         for sha1 in contents:
             # Get all (sha1, sha1_git, sha256, blake2s256) whose sha1
             # matches the argument, from the index table ('content_by_sha1')
-            pks = self._cql_runner.content_get_pks_from_single_hash(
-                'sha1', sha1)
-            if pks:
-                # TODO: what to do if there are more than one?
-                pk = pks[0]
-                # Query the main table ('content')
-                res = self._cql_runner.content_get_from_pk(pk._asdict())
-                # Rows in 'content' are inserted after corresponding
-                # rows in 'content_by_*', so we might be missing it
-                if res:
-                    content_metadata = res._asdict()
-                    content_metadata.pop('ctime')
-                    result[content_metadata['sha1']].append(content_metadata)
+            for row in self._content_get_from_hash('sha1', sha1):
+                content_metadata = row._asdict()
+                content_metadata.pop('ctime')
+                result[content_metadata['sha1']].append(content_metadata)
         return result
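
For what it's worth, the rewrite keeps content_get_metadata's return shape: every requested sha1 stays a key, mapped to zero or more metadata dicts with ctime stripped. A stand-in sketch of that invariant (group_rows and all values here are made up for illustration):

    from typing import Dict, List

    def group_rows(requested: List[bytes],
                   rows: List[dict]) -> Dict[bytes, List[dict]]:
        result: Dict[bytes, List[dict]] = {sha1: [] for sha1 in requested}
        for row in rows:
            metadata = dict(row)
            metadata.pop('ctime')  # content_get_metadata drops ctime too
            result[metadata['sha1']].append(metadata)
        return result

    rows = [{'sha1': b'\x01', 'sha256': b'\xaa', 'ctime': 'sometime'}]
    out = group_rows([b'\x01', b'\x02'], rows)
    assert out[b'\x02'] == []          # nothing found: empty list, no KeyError
    assert 'ctime' not in out[b'\x01'][0]
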
     def content_find(self, content):
         # Find an algorithm that is common to all the requested contents.
         # It will be used to do an initial filtering efficiently.
         filter_algos = list(set(content).intersection(HASH_ALGORITHMS))
         if not filter_algos:
             raise StorageArgumentException(
                 'content keys must contain at least one of: '
                 '%s' % ', '.join(sorted(HASH_ALGORITHMS)))
         common_algo = filter_algos[0]

-        # Find all contents whose common_algo matches at least one
-        # of the requests.
-        found_pks = self._cql_runner.content_get_pks_from_single_hash(
-            common_algo, content[common_algo])
-        found_pks = [pk._asdict() for pk in found_pks]
-
-        # Filter with the other hash algorithms.
-        for algo in filter_algos[1:]:
-            found_pks = [pk for pk in found_pks if pk[algo] == content[algo]]
-
         results = []
-        for pk in found_pks:
-            # Query the main table ('content').
-            res = self._cql_runner.content_get_from_pk(pk)
-            # Rows in 'content' are inserted after corresponding
-            # rows in 'content_by_*', so we might be missing it
-            if res:
-                results.append({
-                    **res._asdict(),
-                    'ctime': res.ctime.replace(tzinfo=datetime.timezone.utc)
-                })
+        rows = self._content_get_from_hash(
+            common_algo, content[common_algo])
+        for row in rows:
+            # Re-check all the hashes, in case of collisions (either of the
+            # hash of the partition key, or the hashes in it)
+            for algo in HASH_ALGORITHMS:
+                if content.get(algo) and getattr(row, algo) != content[algo]:
+                    # This hash didn't match; discard the row.
+                    break
+            else:
+                # All hashes match, keep this row.
+                results.append({
+                    **row._asdict(),
+                    'ctime': row.ctime.replace(tzinfo=datetime.timezone.utc)
+                })
         return results
     def content_missing(self, content, key_hash='sha1'):
         for cont in content:
             res = self.content_find(cont)
             if not res:
                 yield cont[key_hash]

[...]
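
One more note for readers of content_find: it leans on Python's for/else, where the else branch runs only when the loop finished without break, i.e. when every requested hash matched. A standalone illustration with made-up values:

    row = {'sha1': b'\x01', 'sha256': b'\xff'}
    query = {'sha1': b'\x01'}  # caller only provided one hash

    matched = False
    for algo in ('sha1', 'sha256'):
        if query.get(algo) and row[algo] != query[algo]:
            break  # a requested hash differs: discard the row
    else:
        matched = True  # no break: every requested hash matched

    assert matched
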