Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/storage.py
Show All 21 Lines | from typing import ( | ||||
Tuple, | Tuple, | ||||
Union, | Union, | ||||
) | ) | ||||
import attr | import attr | ||||
from swh.core.api.classes import stream_results | from swh.core.api.classes import stream_results | ||||
from swh.core.api.serializers import msgpack_dumps, msgpack_loads | from swh.core.api.serializers import msgpack_dumps, msgpack_loads | ||||
from swh.model.hashutil import DEFAULT_ALGORITHMS | from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_hex | ||||
from swh.model.identifiers import CoreSWHID, ExtendedObjectType, ExtendedSWHID | from swh.model.identifiers import CoreSWHID, ExtendedObjectType, ExtendedSWHID | ||||
from swh.model.identifiers import ObjectType as SwhidObjectType | from swh.model.identifiers import ObjectType as SwhidObjectType | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Content, | Content, | ||||
Directory, | Directory, | ||||
DirectoryEntry, | DirectoryEntry, | ||||
ExtID, | ExtID, | ||||
MetadataAuthority, | MetadataAuthority, | ||||
Show All 33 Lines | from .model import ( | ||||
DirectoryRow, | DirectoryRow, | ||||
ExtIDByTargetRow, | ExtIDByTargetRow, | ||||
ExtIDRow, | ExtIDRow, | ||||
MetadataAuthorityRow, | MetadataAuthorityRow, | ||||
MetadataFetcherRow, | MetadataFetcherRow, | ||||
OriginRow, | OriginRow, | ||||
OriginVisitRow, | OriginVisitRow, | ||||
OriginVisitStatusRow, | OriginVisitStatusRow, | ||||
RawExtrinsicMetadataByIdRow, | |||||
RawExtrinsicMetadataRow, | RawExtrinsicMetadataRow, | ||||
RevisionParentRow, | RevisionParentRow, | ||||
SkippedContentRow, | SkippedContentRow, | ||||
SnapshotBranchRow, | SnapshotBranchRow, | ||||
SnapshotRow, | SnapshotRow, | ||||
) | ) | ||||
from .schema import HASH_ALGORITHMS | from .schema import HASH_ALGORITHMS | ||||
▲ Show 20 Lines • Show All 1,210 Lines • ▼ Show 20 Lines | ) -> Dict[str, int]: | ||||
origin=metadata_entry.origin, | origin=metadata_entry.origin, | ||||
visit=metadata_entry.visit, | visit=metadata_entry.visit, | ||||
snapshot=map_optional(str, metadata_entry.snapshot), | snapshot=map_optional(str, metadata_entry.snapshot), | ||||
release=map_optional(str, metadata_entry.release), | release=map_optional(str, metadata_entry.release), | ||||
revision=map_optional(str, metadata_entry.revision), | revision=map_optional(str, metadata_entry.revision), | ||||
path=metadata_entry.path, | path=metadata_entry.path, | ||||
directory=map_optional(str, metadata_entry.directory), | directory=map_optional(str, metadata_entry.directory), | ||||
) | ) | ||||
self._cql_runner.raw_extrinsic_metadata_add(row) | |||||
counter[metadata_entry.target.object_type] += 1 | |||||
except TypeError as e: | except TypeError as e: | ||||
raise StorageArgumentException(*e.args) | raise StorageArgumentException(*e.args) | ||||
# Add to the index first | |||||
self._cql_runner.raw_extrinsic_metadata_by_id_add( | |||||
RawExtrinsicMetadataByIdRow( | |||||
id=row.id, | |||||
target=row.target, | |||||
authority_type=row.authority_type, | |||||
authority_url=row.authority_url, | |||||
) | |||||
) | |||||
# Then to the main table | |||||
self._cql_runner.raw_extrinsic_metadata_add(row) | |||||
counter[metadata_entry.target.object_type] += 1 | |||||
return { | return { | ||||
f"{type.value}_metadata:add": count for (type, count) in counter.items() | f"{type.value}_metadata:add": count for (type, count) in counter.items() | ||||
} | } | ||||
def raw_extrinsic_metadata_get( | def raw_extrinsic_metadata_get( | ||||
self, | self, | ||||
target: ExtendedSWHID, | target: ExtendedSWHID, | ||||
authority: MetadataAuthority, | authority: MetadataAuthority, | ||||
Show All 19 Lines | ) -> PagedResult[RawExtrinsicMetadata]: | ||||
str(target), authority.type.value, authority.url | str(target), authority.type.value, authority.url | ||||
) | ) | ||||
if limit: | if limit: | ||||
entries = itertools.islice(entries, 0, limit + 1) | entries = itertools.islice(entries, 0, limit + 1) | ||||
results = [] | results = [] | ||||
for entry in entries: | for entry in entries: | ||||
discovery_date = entry.discovery_date.replace(tzinfo=datetime.timezone.utc) | |||||
assert str(target) == entry.target | assert str(target) == entry.target | ||||
result = RawExtrinsicMetadata( | results.append(converters.row_to_raw_extrinsic_metadata(entry)) | ||||
target=target, | |||||
authority=MetadataAuthority( | |||||
type=MetadataAuthorityType(entry.authority_type), | |||||
url=entry.authority_url, | |||||
), | |||||
fetcher=MetadataFetcher( | |||||
name=entry.fetcher_name, version=entry.fetcher_version, | |||||
), | |||||
discovery_date=discovery_date, | |||||
format=entry.format, | |||||
metadata=entry.metadata, | |||||
origin=entry.origin, | |||||
visit=entry.visit, | |||||
snapshot=map_optional(CoreSWHID.from_string, entry.snapshot), | |||||
release=map_optional(CoreSWHID.from_string, entry.release), | |||||
revision=map_optional(CoreSWHID.from_string, entry.revision), | |||||
path=entry.path, | |||||
directory=map_optional(CoreSWHID.from_string, entry.directory), | |||||
) | |||||
results.append(result) | |||||
if len(results) > limit: | if len(results) > limit: | ||||
results.pop() | results.pop() | ||||
assert len(results) == limit | assert len(results) == limit | ||||
last_result = results[-1] | last_result = results[-1] | ||||
next_page_token: Optional[str] = base64.b64encode( | next_page_token: Optional[str] = base64.b64encode( | ||||
msgpack_dumps((last_result.discovery_date, last_result.id,)) | msgpack_dumps((last_result.discovery_date, last_result.id,)) | ||||
).decode() | ).decode() | ||||
else: | else: | ||||
next_page_token = None | next_page_token = None | ||||
return PagedResult(next_page_token=next_page_token, results=results,) | return PagedResult(next_page_token=next_page_token, results=results,) | ||||
def raw_extrinsic_metadata_get_by_ids( | |||||
self, ids: List[Sha1Git] | |||||
) -> List[RawExtrinsicMetadata]: | |||||
keys = self._cql_runner.raw_extrinsic_metadata_get_by_ids(ids) | |||||
results: Set[RawExtrinsicMetadata] = set() | |||||
for key in keys: | |||||
candidates = self._cql_runner.raw_extrinsic_metadata_get( | |||||
key.target, key.authority_type, key.authority_url | |||||
) | |||||
ardumont: one print to remove, one! | |||||
candidates = [ | |||||
candidate for candidate in candidates if candidate.id == key.id | |||||
] | |||||
if len(candidates) > 1: | |||||
raise Exception( | |||||
"Found multiple RawExtrinsicMetadata objects with the same id: " | |||||
+ hash_to_hex(key.id) | |||||
) | |||||
results.update(map(converters.row_to_raw_extrinsic_metadata, candidates)) | |||||
return list(results) | |||||
def metadata_fetcher_add(self, fetchers: List[MetadataFetcher]) -> Dict[str, int]: | def metadata_fetcher_add(self, fetchers: List[MetadataFetcher]) -> Dict[str, int]: | ||||
self.journal_writer.metadata_fetcher_add(fetchers) | self.journal_writer.metadata_fetcher_add(fetchers) | ||||
for fetcher in fetchers: | for fetcher in fetchers: | ||||
self._cql_runner.metadata_fetcher_add( | self._cql_runner.metadata_fetcher_add( | ||||
MetadataFetcherRow(name=fetcher.name, version=fetcher.version,) | MetadataFetcherRow(name=fetcher.name, version=fetcher.version,) | ||||
) | ) | ||||
return {"metadata_fetcher:add": len(fetchers)} | return {"metadata_fetcher:add": len(fetchers)} | ||||
▲ Show 20 Lines • Show All 110 Lines • Show Last 20 Lines |
one print to remove, one!