Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/in_memory.py
Show First 20 Lines • Show All 1,015 Lines • ▼ Show 20 Lines | def stat_counters(self): | ||||
return stats | return stats | ||||
def refresh_stat_counters(self): | def refresh_stat_counters(self): | ||||
pass | pass | ||||
def raw_extrinsic_metadata_add( | def raw_extrinsic_metadata_add( | ||||
self, metadata: Iterable[RawExtrinsicMetadata], | self, metadata: Iterable[RawExtrinsicMetadata], | ||||
) -> None: | ) -> None: | ||||
metadata = list(metadata) | |||||
self.journal_writer.raw_extrinsic_metadata_add(metadata) | |||||
for metadata_entry in metadata: | for metadata_entry in metadata: | ||||
authority_key = self._metadata_authority_key(metadata_entry.authority) | authority_key = self._metadata_authority_key(metadata_entry.authority) | ||||
if authority_key not in self._metadata_authorities: | if authority_key not in self._metadata_authorities: | ||||
raise StorageArgumentException( | raise StorageArgumentException( | ||||
f"Unknown authority {metadata_entry.authority}" | f"Unknown authority {metadata_entry.authority}" | ||||
) | ) | ||||
fetcher_key = self._metadata_fetcher_key(metadata_entry.fetcher) | fetcher_key = self._metadata_fetcher_key(metadata_entry.fetcher) | ||||
if fetcher_key not in self._metadata_fetchers: | if fetcher_key not in self._metadata_fetchers: | ||||
▲ Show 20 Lines • Show All 94 Lines • ▼ Show 20 Lines | ) -> Dict[str, Union[Optional[bytes], List[RawExtrinsicMetadata]]]: | ||||
next_page_token = None | next_page_token = None | ||||
return { | return { | ||||
"next_page_token": next_page_token, | "next_page_token": next_page_token, | ||||
"results": results, | "results": results, | ||||
} | } | ||||
def metadata_fetcher_add(self, fetchers: Iterable[MetadataFetcher]) -> None: | def metadata_fetcher_add(self, fetchers: Iterable[MetadataFetcher]) -> None: | ||||
fetchers = list(fetchers) | |||||
self.journal_writer.metadata_fetcher_add(fetchers) | |||||
for fetcher in fetchers: | for fetcher in fetchers: | ||||
if fetcher.metadata is None: | if fetcher.metadata is None: | ||||
raise StorageArgumentException( | raise StorageArgumentException( | ||||
"MetadataFetcher.metadata may not be None in metadata_fetcher_add." | "MetadataFetcher.metadata may not be None in metadata_fetcher_add." | ||||
) | ) | ||||
key = self._metadata_fetcher_key(fetcher) | key = self._metadata_fetcher_key(fetcher) | ||||
if key not in self._metadata_fetchers: | if key not in self._metadata_fetchers: | ||||
self._metadata_fetchers[key] = fetcher | self._metadata_fetchers[key] = fetcher | ||||
def metadata_fetcher_get( | def metadata_fetcher_get( | ||||
self, name: str, version: str | self, name: str, version: str | ||||
) -> Optional[MetadataFetcher]: | ) -> Optional[MetadataFetcher]: | ||||
return self._metadata_fetchers.get( | return self._metadata_fetchers.get( | ||||
self._metadata_fetcher_key(MetadataFetcher(name=name, version=version)) | self._metadata_fetcher_key(MetadataFetcher(name=name, version=version)) | ||||
) | ) | ||||
def metadata_authority_add(self, authorities: Iterable[MetadataAuthority]) -> None: | def metadata_authority_add(self, authorities: Iterable[MetadataAuthority]) -> None: | ||||
authorities = list(authorities) | |||||
self.journal_writer.metadata_authority_add(authorities) | |||||
for authority in authorities: | for authority in authorities: | ||||
if authority.metadata is None: | if authority.metadata is None: | ||||
raise StorageArgumentException( | raise StorageArgumentException( | ||||
"MetadataAuthority.metadata may not be None in " | "MetadataAuthority.metadata may not be None in " | ||||
"metadata_authority_add." | "metadata_authority_add." | ||||
) | ) | ||||
key = self._metadata_authority_key(authority) | key = self._metadata_authority_key(authority) | ||||
self._metadata_authorities[key] = authority | self._metadata_authorities[key] = authority | ||||
▲ Show 20 Lines • Show All 54 Lines • Show Last 20 Lines |