Changeset View
swh/storage/cassandra/storage.py
[... 151 unchanged lines not shown (context: def _set_cql_runner) ...]
         )

     @timed
     def check_config(self, *, check_write: bool) -> bool:
         self._cql_runner.check_read()
         return True

-    def _content_get_from_hash(self, algo, hash_) -> Iterable:
+    def _content_get_from_hashes(self, algo, hashes: List[bytes]) -> Iterable:
         """From the name of a hash algorithm and a value of that hash,
         looks up the "hash -> token" secondary table (content_by_{algo})
         to get tokens.
         Then, looks up the main table (content) to get all contents with
         that token, and filters out contents whose hash doesn't match."""
-        found_tokens = self._cql_runner.content_get_tokens_from_single_hash(algo, hash_)
+        found_tokens = self._cql_runner.content_get_tokens_from_single_algo(
+            algo, hashes
+        )
         for token in found_tokens:
             assert isinstance(token, int), found_tokens
             # Query the main table ('content').
             res = self._cql_runner.content_get_from_token(token)
             for row in res:
                 # re-check the hash (in case of murmur3 collision)
-                if getattr(row, algo) == hash_:
+                if getattr(row, algo) in hashes:
                     yield row
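For readers unfamiliar with the two-table layout, here is a minimal, self-contained sketch of the hash -> token -> row resolution that _content_get_from_hashes performs, including the re-check that guards against murmur3 token collisions. Plain dicts stand in for the content_by_{algo} index table, the main content table and the CQL runner; all names below are hypothetical, not the real swh.storage API:

    # Minimal sketch, not the real CqlRunner: plain dicts stand in for the
    # content_by_{algo} index table and the main content table.
    from typing import Dict, Iterable, List

    index_table: Dict[bytes, List[int]] = {
        b"\x01" * 20: [42],            # hash -> tokens of matching partitions
        b"\x02" * 20: [43],
    }
    main_table: Dict[int, List[dict]] = {
        42: [{"sha1": b"\x01" * 20}],  # token -> rows stored under that token
        43: [{"sha1": b"\x02" * 20}],
    }

    def content_get_from_hashes(algo: str, hashes: List[bytes]) -> Iterable[dict]:
        for hash_ in hashes:
            for token in index_table.get(hash_, []):    # index-table lookup
                for row in main_table.get(token, []):   # main-table lookup
                    # Re-check the hash: two different contents may share a
                    # murmur3 token (the partition key) without sharing the hash.
                    if row[algo] in hashes:
                        yield row

    print(list(content_get_from_hashes("sha1", [b"\x01" * 20, b"\x02" * 20])))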
     def _content_add(self, contents: List[Content], with_data: bool) -> Dict[str, int]:
         # Filter-out content already in the database.
         if not self._allow_overwrite:
             contents = [
                 c
                 for c in contents
[... 27 unchanged lines not shown (within def _content_add) ...]
             # The proper way to do it would probably be a BATCH, but this
             # would be inefficient because of the number of partitions we
             # need to affect (len(HASH_ALGORITHMS)+1, which is currently 5)
             if not self._allow_overwrite:
                 for algo in {"sha1", "sha1_git"}:
                     collisions = []
                     # Get tokens of 'content' rows with the same value for
                     # sha1/sha1_git
-                    rows = self._content_get_from_hash(algo, content.get_hash(algo))
+                    # TODO: batch these requests, instead of sending them one by one
+                    rows = self._content_get_from_hashes(algo, [content.get_hash(algo)])
                     for row in rows:
                         if getattr(row, algo) != content.get_hash(algo):
                             # collision of token(partition key), ignore this
                             # row
                             continue

                         for other_algo in HASH_ALGORITHMS:
                             if getattr(row, other_algo) != content.get_hash(other_algo):
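As context for the collision check above, the sketch below (toy dicts instead of Content objects and storage rows; not swh code, helper name hypothetical) shows the rule being applied: a row only counts as a collision when it shares the filtered hash (sha1 or sha1_git) but differs on some other algorithm, while rows that merely share the partition token are skipped.

    # Toy version of the per-content collision test (hypothetical helper).
    HASH_ALGORITHMS = ["sha1", "sha1_git", "sha256", "blake2s256"]

    def find_collisions(new_content: dict, candidate_rows: list, algo: str) -> list:
        collisions = []
        for row in candidate_rows:
            if row[algo] != new_content[algo]:
                # Same murmur3 token but a different hash: not a collision.
                continue
            if any(row[a] != new_content[a] for a in HASH_ALGORITHMS):
                # Same sha1/sha1_git, but at least one other hash differs.
                collisions.append(row)
        return collisions

    new = {"sha1": b"A", "sha1_git": b"B", "sha256": b"C", "blake2s256": b"D"}
    rows = [
        {"sha1": b"A", "sha1_git": b"B", "sha256": b"C", "blake2s256": b"D"},  # same content
        {"sha1": b"A", "sha1_git": b"X", "sha256": b"Y", "blake2s256": b"Z"},  # sha1 collision
        {"sha1": b"Q", "sha1_git": b"R", "sha256": b"S", "blake2s256": b"T"},  # token-only match
    ]
    assert find_collisions(new, rows, "sha1") == [rows[1]]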
[... 101 unchanged lines not shown (within class CassandraStorage) ...]
     ) -> List[Optional[Content]]:
         if algo not in DEFAULT_ALGORITHMS:
             raise StorageArgumentException(
                 "algo should be one of {','.join(DEFAULT_ALGORITHMS)}"
             )

         key = operator.attrgetter(algo)
         contents_by_hash: Dict[Sha1, Optional[Content]] = {}
-        for hash_ in contents:
+        for row in self._content_get_from_hashes(algo, contents):
             # Get all (sha1, sha1_git, sha256, blake2s256) whose sha1/sha1_git
             # matches the argument, from the index table ('content_by_*')
-            for row in self._content_get_from_hash(algo, hash_):
             row_d = row.to_dict()
             row_d.pop("ctime")
             content = Content(**row_d)
             contents_by_hash[key(content)] = content

         return [contents_by_hash.get(hash_) for hash_ in contents]
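A note on the ordering contract: content_get must return one slot per requested hash, in request order, with None for misses, even though the batched lookup may yield rows in any order. The dict-then-comprehension pattern above guarantees this; a standalone toy illustration (not swh code):

    requested = [b"aaa", b"bbb", b"ccc"]
    rows_found = [{"sha1": b"ccc"}, {"sha1": b"aaa"}]  # arbitrary yield order, b"bbb" missing

    contents_by_hash = {row["sha1"]: row for row in rows_found}
    results = [contents_by_hash.get(h) for h in requested]
    assert results == [{"sha1": b"aaa"}, None, {"sha1": b"ccc"}]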
     @timed
     def content_find(self, content: Dict[str, Any]) -> List[Content]:
+        return self._content_find_many([content])
+
+    def _content_find_many(self, contents: List[Dict[str, Any]]) -> List[Content]:
         # Find an algorithm that is common to all the requested contents.
         # It will be used to do an initial filtering efficiently.
-        filter_algos = list(set(content).intersection(HASH_ALGORITHMS))
+        filter_algos = set(HASH_ALGORITHMS)
+        for content in contents:
+            filter_algos &= set(content)
         if not filter_algos:
             raise StorageArgumentException(
                 "content keys must contain at least one "
                 f"of: {', '.join(sorted(HASH_ALGORITHMS))}"
             )
-        common_algo = filter_algos[0]
+        common_algo = list(filter_algos)[0]
         results = []
-        rows = self._content_get_from_hash(common_algo, content[common_algo])
+        rows = self._content_get_from_hashes(
+            common_algo, [content[common_algo] for content in contents]
+        )
         for row in rows:
             # Re-check all the hashes, in case of collisions (either of the
             # hash of the partition key, or the hashes in it)
+            for content in contents:
                 for algo in HASH_ALGORITHMS:
                     if content.get(algo) and getattr(row, algo) != content[algo]:
                         # This hash didn't match; discard the row.
                         break
                 else:
                     # All hashes match, keep this row.
                     row_d = row.to_dict()
                     row_d["ctime"] = row.ctime.replace(tzinfo=datetime.timezone.utc)
                     results.append(Content(**row_d))
+                    break
+            else:
+                # No content matched; skip it
+                pass
         return results
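The nested for/else added above is easy to misread, so here is a small standalone illustration of the control flow (toy dicts, not swh code): the inner else only runs when the loop finishes without a break, i.e. every hash provided by the query matched the row, and the outer break stops after the first matching query.

    row = {"sha1": b"x", "sha256": b"y"}
    queries = [{"sha1": b"x", "sha256": b"z"}, {"sha1": b"x", "sha256": b"y"}]

    results = []
    for query in queries:
        for algo, value in query.items():
            if row[algo] != value:
                break              # mismatch: this query does not match the row
        else:
            results.append(row)    # no break: every provided hash matched
            break                  # one matching query is enough to keep the row

    assert results == [row]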
     @timed
     def content_missing(
         self, contents: List[Dict[str, Any]], key_hash: str = "sha1"
     ) -> Iterable[bytes]:
         if key_hash not in DEFAULT_ALGORITHMS:
             raise StorageArgumentException(
[... 1,282 unchanged lines not shown ...]