Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/storage.py
Show All 21 Lines | from typing import ( | ||||
Union, | Union, | ||||
) | ) | ||||
import attr | import attr | ||||
from swh.core.api.classes import stream_results | from swh.core.api.classes import stream_results | ||||
from swh.core.api.serializers import msgpack_dumps, msgpack_loads | from swh.core.api.serializers import msgpack_dumps, msgpack_loads | ||||
from swh.model.hashutil import DEFAULT_ALGORITHMS | from swh.model.hashutil import DEFAULT_ALGORITHMS | ||||
from swh.model.identifiers import CoreSWHID, ExtendedSWHID | from swh.model.identifiers import CoreSWHID, ExtendedSWHID, ObjectType | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Content, | Content, | ||||
Directory, | Directory, | ||||
DirectoryEntry, | DirectoryEntry, | ||||
ExtID, | |||||
MetadataAuthority, | MetadataAuthority, | ||||
MetadataAuthorityType, | MetadataAuthorityType, | ||||
MetadataFetcher, | MetadataFetcher, | ||||
Origin, | Origin, | ||||
OriginVisit, | OriginVisit, | ||||
OriginVisitStatus, | OriginVisitStatus, | ||||
RawExtrinsicMetadata, | RawExtrinsicMetadata, | ||||
Release, | Release, | ||||
Show All 19 Lines | |||||
from ..exc import HashCollision, StorageArgumentException | from ..exc import HashCollision, StorageArgumentException | ||||
from ..utils import remove_keys | from ..utils import remove_keys | ||||
from .common import TOKEN_BEGIN, TOKEN_END, hash_url | from .common import TOKEN_BEGIN, TOKEN_END, hash_url | ||||
from .cql import CqlRunner | from .cql import CqlRunner | ||||
from .model import ( | from .model import ( | ||||
ContentRow, | ContentRow, | ||||
DirectoryEntryRow, | DirectoryEntryRow, | ||||
DirectoryRow, | DirectoryRow, | ||||
ExtIDRow, | |||||
MetadataAuthorityRow, | MetadataAuthorityRow, | ||||
MetadataFetcherRow, | MetadataFetcherRow, | ||||
OriginRow, | OriginRow, | ||||
OriginVisitRow, | OriginVisitRow, | ||||
OriginVisitStatusRow, | OriginVisitStatusRow, | ||||
RawExtrinsicMetadataRow, | RawExtrinsicMetadataRow, | ||||
RevisionParentRow, | RevisionParentRow, | ||||
SkippedContentRow, | SkippedContentRow, | ||||
▲ Show 20 Lines • Show All 1,240 Lines • ▼ Show 20 Lines | ) -> Optional[MetadataAuthority]: | ||||
return MetadataAuthority( | return MetadataAuthority( | ||||
type=MetadataAuthorityType(authority.type), | type=MetadataAuthorityType(authority.type), | ||||
url=authority.url, | url=authority.url, | ||||
metadata=json.loads(authority.metadata), | metadata=json.loads(authority.metadata), | ||||
) | ) | ||||
else: | else: | ||||
return None | return None | ||||
# ExtID tables | |||||
def _extid_get_from_extid( | |||||
self, extid_type: str, extid: bytes | |||||
) -> Iterable[ExtIDRow]: | |||||
"""XXX """ | |||||
yield from self._extid_get_from_tokens( | |||||
self._cql_runner.extid_get_tokens_from_extid( | |||||
extid_type=extid_type, extid=extid | |||||
) | |||||
) | |||||
def _extid_get_from_target( | |||||
self, target_type: str, target: bytes | |||||
) -> Iterable[ExtIDRow]: | |||||
"""XXX """ | |||||
yield from self._extid_get_from_tokens( | |||||
self._cql_runner.extid_get_tokens_from_target( | |||||
target_type=target_type, target=target | |||||
) | |||||
) | |||||
def _extid_get_from_tokens( | |||||
self, tokens: Iterable[Optional[int]] | |||||
) -> Iterable[ExtIDRow]: | |||||
for token in tokens: | |||||
# Query the main table ('content'). | |||||
if token is not None: | |||||
yield from self._cql_runner.extid_get_from_token(token=token) | |||||
def extid_add(self, ids: List[ExtID]) -> Dict[str, int]: | |||||
extids = [ | |||||
extid | |||||
for extid in ids | |||||
if not self._cql_runner.extid_get_from_pk( | |||||
extid_type=extid.extid_type, extid=extid.extid, target=extid.target, | |||||
) | |||||
] | |||||
self.journal_writer.extid_add(extids) | |||||
inserted = 0 | |||||
for extid in extids: | |||||
extidrow = ExtIDRow( | |||||
extid_type=extid.extid_type, | |||||
extid=extid.extid, | |||||
target_type=extid.target.object_type.value, | |||||
target=extid.target.object_id, | |||||
) | |||||
(token, insertion_finalizer) = self._cql_runner.extid_add_prepare(extidrow) | |||||
if ( | |||||
self.extid_get_from_extid(extid.extid_type, [extid.extid])[0] | |||||
or self.extid_get_from_target( | |||||
extid.target.object_type, [extid.target.object_id] | |||||
)[0] | |||||
): | |||||
# on conflict do nothing... | |||||
continue | |||||
self._cql_runner.extid_extid_index_add_one(extidrow, token) | |||||
self._cql_runner.extid_target_index_add_one(extidrow, token) | |||||
insertion_finalizer() | |||||
inserted += 1 | |||||
return {"extid:add": inserted} | |||||
vlorentz: you must re-filter the rows here, because `extid_get_from_target` will return unrelated rows in… | |||||
def extid_get_from_extid( | |||||
self, id_type: str, ids: List[bytes] | |||||
) -> List[Optional[ExtID]]: | |||||
result: List[Optional[ExtID]] = [] | |||||
for extid in ids: | |||||
extidrows = list(self._extid_get_from_extid(id_type, extid)) | |||||
assert len(extidrows) <= 1 | |||||
if extidrows: | |||||
result.append( | |||||
ExtID( | |||||
extid_type=extidrows[0].extid_type, | |||||
extid=extidrows[0].extid, | |||||
target=CoreSWHID( | |||||
object_type=extidrows[0].target_type, | |||||
object_id=extidrows[0].target, | |||||
), | |||||
) | |||||
) | |||||
else: | |||||
result.append(None) | |||||
return result | |||||
def extid_get_from_target( | |||||
self, target_type: ObjectType, ids: List[Sha1Git] | |||||
) -> List[Optional[ExtID]]: | |||||
result: List[Optional[ExtID]] = [] | |||||
for target in ids: | |||||
extidrows = list(self._extid_get_from_target(target_type.value, target)) | |||||
assert len(extidrows) <= 1 | |||||
if extidrows: | |||||
result.append( | |||||
ExtID( | |||||
extid_type=extidrows[0].extid_type, | |||||
extid=extidrows[0].extid, | |||||
target=CoreSWHID( | |||||
object_type=extidrows[0].target_type, | |||||
object_id=extidrows[0].target, | |||||
), | |||||
) | |||||
) | |||||
else: | |||||
result.append(None) | |||||
return result | |||||
# Misc | |||||
def clear_buffers(self, object_types: Sequence[str] = ()) -> None: | def clear_buffers(self, object_types: Sequence[str] = ()) -> None: | ||||
"""Do nothing | """Do nothing | ||||
""" | """ | ||||
return None | return None | ||||
def flush(self, object_types: Sequence[str] = ()) -> Dict[str, int]: | def flush(self, object_types: Sequence[str] = ()) -> Dict[str, int]: | ||||
return {} | return {} |
you must re-filter the rows here, because extid_get_from_target will return unrelated rows in case of murmur3 collisions (which are likely to happen, the hash space is 32 bits). See _content_get_from_hash.
See also the *_murmur3_collision tests to test it.