Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/storage.py
Show First 20 Lines • Show All 245 Lines • ▼ Show 20 Lines | class CassandraStorage: | ||||
def content_missing_per_sha1_git(self, contents): | def content_missing_per_sha1_git(self, contents): | ||||
return self.content_missing([{'sha1_git': c for c in contents}], | return self.content_missing([{'sha1_git': c for c in contents}], | ||||
key_hash='sha1_git') | key_hash='sha1_git') | ||||
def content_get_random(self): | def content_get_random(self): | ||||
return self._cql_runner.content_get_random().sha1_git | return self._cql_runner.content_get_random().sha1_git | ||||
def _skipped_content_get_from_hash(self, algo, hash_) -> Iterable: | |||||
"""From the name of a hash algorithm and a value of that hash, | |||||
looks up the "hash -> token" secondary table | |||||
(skipped_content_by_{algo}) to get tokens. | |||||
Then, looks up the main table (content) to get all contents with | |||||
that token, and filters out contents whose hash doesn't match.""" | |||||
found_tokens = \ | |||||
self._cql_runner.skipped_content_get_tokens_from_single_hash( | |||||
algo, hash_) | |||||
for token in found_tokens: | |||||
# Query the main table ('content'). | |||||
res = self._cql_runner.skipped_content_get_from_token(token) | |||||
for row in res: | |||||
# re-check the the hash (in case of murmur3 collision) | |||||
if getattr(row, algo) == hash_: | |||||
yield row | |||||
def _skipped_content_add(self, contents: Iterable[SkippedContent]) -> Dict: | def _skipped_content_add(self, contents: Iterable[SkippedContent]) -> Dict: | ||||
# Filter-out content already in the database. | # Filter-out content already in the database. | ||||
contents = [ | contents = [ | ||||
c for c in contents | c for c in contents | ||||
if not self._cql_runner.skipped_content_get_from_pk(c.to_dict())] | if not self._cql_runner.skipped_content_get_from_pk(c.to_dict())] | ||||
self.journal_writer.skipped_content_add(contents) | self.journal_writer.skipped_content_add(contents) | ||||
for content in contents: | for content in contents: | ||||
# Add to index tables | # Compute token of the row in the main table | ||||
(token, insertion_finalizer) = \ | |||||
self._cql_runner.skipped_content_add_prepare(content) | |||||
# Then add to index tables | |||||
for algo in HASH_ALGORITHMS: | for algo in HASH_ALGORITHMS: | ||||
if content.get_hash(algo) is not None: | |||||
self._cql_runner.skipped_content_index_add_one( | self._cql_runner.skipped_content_index_add_one( | ||||
algo, content) | algo, content, token) | ||||
# Then to the main table | # Then to the main table | ||||
self._cql_runner.skipped_content_add_one(content) | insertion_finalizer() | ||||
return { | return { | ||||
'skipped_content:add': len(contents) | 'skipped_content:add': len(contents) | ||||
} | } | ||||
def skipped_content_add(self, content: Iterable[SkippedContent]) -> Dict: | def skipped_content_add(self, content: Iterable[SkippedContent]) -> Dict: | ||||
now = datetime.datetime.now(tz=datetime.timezone.utc) | now = datetime.datetime.now(tz=datetime.timezone.utc) | ||||
contents = [attr.evolve(c, ctime=now) for c in content] | contents = [attr.evolve(c, ctime=now) for c in content] | ||||
▲ Show 20 Lines • Show All 683 Lines • Show Last 20 Lines |