Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/storage.py
Show All 16 Lines | from typing import ( | ||||
Dict, | Dict, | ||||
Iterable, | Iterable, | ||||
List, | List, | ||||
Optional, | Optional, | ||||
Sequence, | Sequence, | ||||
Set, | Set, | ||||
Tuple, | Tuple, | ||||
Union, | Union, | ||||
cast, | |||||
) | ) | ||||
import attr | import attr | ||||
from swh.core.api.classes import stream_results | from swh.core.api.classes import stream_results | ||||
from swh.core.api.serializers import msgpack_dumps, msgpack_loads | from swh.core.api.serializers import msgpack_dumps, msgpack_loads | ||||
from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_hex | from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_hex | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Show All 15 Lines | from swh.model.model import ( | ||||
Snapshot, | Snapshot, | ||||
SnapshotBranch, | SnapshotBranch, | ||||
TargetType, | TargetType, | ||||
) | ) | ||||
from swh.model.swhids import CoreSWHID, ExtendedObjectType, ExtendedSWHID | from swh.model.swhids import CoreSWHID, ExtendedObjectType, ExtendedSWHID | ||||
from swh.model.swhids import ObjectType as SwhidObjectType | from swh.model.swhids import ObjectType as SwhidObjectType | ||||
from swh.storage.interface import ( | from swh.storage.interface import ( | ||||
VISIT_STATUSES, | VISIT_STATUSES, | ||||
HashDict, | |||||
ListOrder, | ListOrder, | ||||
OriginVisitWithStatuses, | OriginVisitWithStatuses, | ||||
PagedResult, | PagedResult, | ||||
PartialBranches, | PartialBranches, | ||||
Sha1, | Sha1, | ||||
TotalHashDict, | |||||
) | ) | ||||
from swh.storage.objstorage import ObjStorage | from swh.storage.objstorage import ObjStorage | ||||
from swh.storage.utils import map_optional, now | from swh.storage.utils import map_optional, now | ||||
from swh.storage.writer import JournalWriter | from swh.storage.writer import JournalWriter | ||||
from . import converters | from . import converters | ||||
from ..exc import HashCollision, StorageArgumentException | from ..exc import HashCollision, StorageArgumentException | ||||
from ..utils import remove_keys | from ..utils import remove_keys | ||||
▲ Show 20 Lines • Show All 262 Lines • ▼ Show 20 Lines | ) -> List[Optional[Content]]: | ||||
# Get all (sha1, sha1_git, sha256, blake2s256) whose sha1/sha1_git | # Get all (sha1, sha1_git, sha256, blake2s256) whose sha1/sha1_git | ||||
# matches the argument, from the index table ('content_by_*') | # matches the argument, from the index table ('content_by_*') | ||||
row_d = row.to_dict() | row_d = row.to_dict() | ||||
row_d.pop("ctime") | row_d.pop("ctime") | ||||
content = Content(**row_d) | content = Content(**row_d) | ||||
contents_by_hash[key(content)] = content | contents_by_hash[key(content)] = content | ||||
return [contents_by_hash.get(hash_) for hash_ in contents] | return [contents_by_hash.get(hash_) for hash_ in contents] | ||||
def content_find(self, content: HashDict) -> List[Content]:
    """Return every stored content whose hashes all match ``content``.

    Thin wrapper delegating to the batched lookup with a single query.
    """
    results = self._content_find_many([content])
    return results
def _content_find_many(self, contents: List[HashDict]) -> List[Content]:
    """Return all stored contents matching any of the given hash dicts.

    Each entry of ``contents`` maps hash algorithm names to hash values;
    a stored content matches an entry when, for every algorithm present in
    the entry, the stored content's hash equals the entry's value.

    Raises:
        StorageArgumentException: if the entries share no common algorithm.
    """
    # Find an algorithm that is common to all the requested contents.
    # It will be used to do an initial filtering efficiently; every other
    # hash is re-checked afterward, so which common algorithm is picked
    # does not affect the result.
    # TODO: prioritize sha256, we can do more efficient lookups from this hash.
    filter_algos = set(HASH_ALGORITHMS)
    for content in contents:
        filter_algos &= set(content)
    if not filter_algos:
        raise StorageArgumentException(
            "content keys must contain at least one "
            f"of: {', '.join(sorted(HASH_ALGORITHMS))}"
        )
    # sorted() rather than arbitrary set iteration order: makes the choice
    # deterministic across runs and interpreter hash seeds, so query
    # patterns are reproducible.
    common_algo = sorted(filter_algos)[0]

    results = []
    rows = self._content_get_from_hashes(
        common_algo,
        [content[common_algo] for content in cast(List[dict], contents)],
    )
    for row in rows:
        # Re-check all the hashes, in case of collisions (either of the
        # hash of the partition key, or the hashes in it)
        for content in contents:
            for algo in HASH_ALGORITHMS:
                hash_ = content.get(algo)
                if hash_ and getattr(row, algo) != hash_:
                    # This hash didn't match; discard the row.
                    break
            else:
                # All hashes match, keep this row.
                row_d = row.to_dict()
                # Stored ctimes are naive; reattach UTC before building
                # the model object — presumably they were written as UTC
                # (TODO confirm against the writer side).
                row_d["ctime"] = row.ctime.replace(tzinfo=datetime.timezone.utc)
                results.append(Content(**row_d))
                break
        else:
            # No content matched; skip this row.
            pass
    return results
def content_missing( | def content_missing( | ||||
self, contents: List[Dict[str, Any]], key_hash: str = "sha1" | self, contents: List[HashDict], key_hash: str = "sha1" | ||||
) -> Iterable[bytes]: | ) -> Iterable[bytes]: | ||||
if key_hash not in DEFAULT_ALGORITHMS: | if key_hash not in DEFAULT_ALGORITHMS: | ||||
raise StorageArgumentException( | raise StorageArgumentException( | ||||
"key_hash should be one of {','.join(DEFAULT_ALGORITHMS)}" | "key_hash should be one of {','.join(DEFAULT_ALGORITHMS)}" | ||||
) | ) | ||||
contents_with_all_hashes = [] | contents_with_all_hashes: List[TotalHashDict] = [] | ||||
contents_with_missing_hashes = [] | contents_with_missing_hashes: List[HashDict] = [] | ||||
for content in contents: | for content in contents: | ||||
if DEFAULT_ALGORITHMS <= set(content): | if DEFAULT_ALGORITHMS <= set(content): | ||||
contents_with_all_hashes.append(content) | contents_with_all_hashes.append(content) | ||||
else: | else: | ||||
contents_with_missing_hashes.append(content) | contents_with_missing_hashes.append(content) | ||||
# These contents can be queried efficiently directly in the main table | # These contents can be queried efficiently directly in the main table | ||||
for content in self._cql_runner.content_missing_from_all_hashes( | for content in self._cql_runner.content_missing_from_all_hashes( | ||||
contents_with_all_hashes | contents_with_all_hashes | ||||
): | ): | ||||
yield content[key_hash] | yield content[key_hash] # type: ignore | ||||
if contents_with_missing_hashes: | if contents_with_missing_hashes: | ||||
# For these, we need the expensive index lookups + main table. | # For these, we need the expensive index lookups + main table. | ||||
# Get all contents in the database that match (at least) one of the | # Get all contents in the database that match (at least) one of the | ||||
# requested contents, concurrently. | # requested contents, concurrently. | ||||
found_contents = self._content_find_many(contents_with_missing_hashes) | found_contents = self._content_find_many(contents_with_missing_hashes) | ||||
Show All 13 Lines | ) -> Iterable[bytes]: | ||||
for missing_content in contents_with_missing_hashes: | for missing_content in contents_with_missing_hashes: | ||||
# Pick any of the algorithms provided in missing_content | # Pick any of the algorithms provided in missing_content | ||||
algo = next(algo for (algo, hash_) in missing_content.items() if hash_) | algo = next(algo for (algo, hash_) in missing_content.items() if hash_) | ||||
# Get the list of found_contents that match this hash in the | # Get the list of found_contents that match this hash in the | ||||
# missing_content. (its length is at most 1, unless there is a | # missing_content. (its length is at most 1, unless there is a | ||||
# collision) | # collision) | ||||
found_contents_with_same_hash = found_contents_by_hash[algo][ | found_contents_with_same_hash = found_contents_by_hash[algo][ | ||||
missing_content[algo] | missing_content[algo] # type: ignore | ||||
] | ] | ||||
# Check if there is a found_content that matches all hashes in the | # Check if there is a found_content that matches all hashes in the | ||||
# missing_content. | # missing_content. | ||||
# This is functionally equivalent to 'for found_content in | # This is functionally equivalent to 'for found_content in | ||||
# found_contents', but runs almost in constant time (it is linear | # found_contents', but runs almost in constant time (it is linear | ||||
# in the number of hash collisions) instead of linear. | # in the number of hash collisions) instead of linear. | ||||
# This allows this function to run in linear time overall instead of | # This allows this function to run in linear time overall instead of | ||||
# quadratic. | # quadratic. | ||||
for found_content in found_contents_with_same_hash: | for found_content in found_contents_with_same_hash: | ||||
# check if the found_content.hashes() dictionary contains a superset | # check if the found_content.hashes() dictionary contains a superset | ||||
# of the (key, value) pairs in missing_content | # of the (key, value) pairs in missing_content | ||||
if missing_content.items() <= found_content.hashes().items(): | if missing_content.items() <= found_content.hashes().items(): | ||||
# Found! | # Found! | ||||
break | break | ||||
else: | else: | ||||
# Not found | # Not found | ||||
yield missing_content[key_hash] | yield missing_content[key_hash] # type: ignore | ||||
def content_missing_per_sha1(self, contents: List[bytes]) -> Iterable[bytes]:
    """Return the sha1 hashes, among ``contents``, of contents not stored."""
    queries = []
    for sha1 in contents:
        queries.append({"sha1": sha1})
    return self.content_missing(queries)
def content_missing_per_sha1_git( | def content_missing_per_sha1_git( | ||||
self, contents: List[Sha1Git] | self, contents: List[Sha1Git] | ||||
) -> Iterable[Sha1Git]: | ) -> Iterable[Sha1Git]: | ||||
return self.content_missing( | return self.content_missing( | ||||
▲ Show 20 Lines • Show All 1,303 Lines • Show Last 20 Lines |