diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py
--- a/swh/storage/in_memory.py
+++ b/swh/storage/in_memory.py
@@ -3,7 +3,6 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-import base64
 import bisect
 import collections
 import datetime
@@ -17,7 +16,6 @@
     Callable,
     Dict,
     Generic,
-    Hashable,
     Iterable,
     Iterator,
     List,
@@ -28,18 +26,9 @@
     Union,
 )
 
-import attr
-
-from swh.core.api.serializers import msgpack_loads, msgpack_dumps
-from swh.model.identifiers import SWHID
 from swh.model.model import (
     Content,
     SkippedContent,
-    MetadataAuthority,
-    MetadataAuthorityType,
-    MetadataFetcher,
-    MetadataTargetType,
-    RawExtrinsicMetadata,
     Sha1Git,
 )
 
@@ -49,10 +38,13 @@
     ContentRow,
     DirectoryRow,
     DirectoryEntryRow,
+    MetadataAuthorityRow,
+    MetadataFetcherRow,
     ObjectCountRow,
     OriginRow,
     OriginVisitRow,
     OriginVisitStatusRow,
+    RawExtrinsicMetadataRow,
     ReleaseRow,
     RevisionRow,
     RevisionParentRow,
@@ -60,14 +52,10 @@
     SnapshotRow,
     SnapshotBranchRow,
 )
-from swh.storage.interface import (
-    ListOrder,
-    PagedResult,
-)
+from swh.storage.interface import ListOrder
 from swh.storage.objstorage import ObjStorage
 
 from .converters import origin_url_to_sha1
-from .exc import StorageArgumentException
 from .writer import JournalWriter
 
 # Max block size of contents to return
@@ -243,6 +231,9 @@
         self._origins = Table(OriginRow)
         self._origin_visits = Table(OriginVisitRow)
         self._origin_visit_statuses = Table(OriginVisitStatusRow)
+        self._metadata_authorities = Table(MetadataAuthorityRow)
+        self._metadata_fetchers = Table(MetadataFetcherRow)
+        self._raw_extrinsic_metadata = Table(RawExtrinsicMetadataRow)
         self._stat_counters = defaultdict(int)
 
     def increment_counter(self, object_type: str, nb: int):
@@ -617,6 +608,88 @@
         statuses.sort(key=lambda s: s.date, reverse=True)
         return iter(statuses)
 
+    ##########################
+    # 'metadata_authority' table
+    ##########################
+
+    def metadata_authority_add(self, authority: MetadataAuthorityRow):
+        """Store ``authority`` and bump the "metadata_authority" counter."""
+        self._metadata_authorities.insert(authority)
+        self.increment_counter("metadata_authority", 1)
+
+    def metadata_authority_get(self, type, url) -> Optional[MetadataAuthorityRow]:
+        """Look up an authority row by its ``(url, type)`` primary key."""
+        return self._metadata_authorities.get_from_primary_key((url, type))
+
+    ##########################
+    # 'metadata_fetcher' table
+    ##########################
+
+    def metadata_fetcher_add(self, fetcher: MetadataFetcherRow):
+        """Store ``fetcher`` and bump the "metadata_fetcher" counter."""
+        self._metadata_fetchers.insert(fetcher)
+        self.increment_counter("metadata_fetcher", 1)
+
+    def metadata_fetcher_get(self, name, version) -> Optional[MetadataFetcherRow]:
+        """Look up a fetcher row by its ``(name, version)`` primary key."""
+        return self._metadata_fetchers.get_from_primary_key((name, version))
+
+    #########################
+    # 'raw_extrinsic_metadata' table
+    #########################
+
+    def raw_extrinsic_metadata_add(self, raw_extrinsic_metadata):
+        """Store the row and bump the "raw_extrinsic_metadata" counter."""
+        self._raw_extrinsic_metadata.insert(raw_extrinsic_metadata)
+        self.increment_counter("raw_extrinsic_metadata", 1)
+
+    def raw_extrinsic_metadata_get_after_date(
+        self,
+        id: str,
+        authority_type: str,
+        authority_url: str,
+        after: datetime.datetime,
+    ) -> Iterable[RawExtrinsicMetadataRow]:
+        """Return rows from the authority discovered strictly after ``after``."""
+        metadata = self.raw_extrinsic_metadata_get(id, authority_type, authority_url)
+        return (m for m in metadata if m.discovery_date > after)
+
+    def raw_extrinsic_metadata_get_after_date_and_fetcher(
+        self,
+        id: str,
+        authority_type: str,
+        authority_url: str,
+        after_date: datetime.datetime,
+        after_fetcher_name: str,
+        after_fetcher_version: str,
+    ) -> Iterable[RawExtrinsicMetadataRow]:
+        """Return rows for ``id`` from one authority that sort after a given key.
+
+        A row is kept when its ``(discovery_date, fetcher_name,
+        fetcher_version)`` tuple compares strictly greater than the
+        ``after_*`` arguments.
+        """
+        metadata = self._raw_extrinsic_metadata.get_from_partition_key((id,))
+        after_tuple = (after_date, after_fetcher_name, after_fetcher_version)
+        return (
+            m
+            for m in metadata
+            if m.authority_type == authority_type
+            and m.authority_url == authority_url
+            and (m.discovery_date, m.fetcher_name, m.fetcher_version) > after_tuple
+        )
+
+    def raw_extrinsic_metadata_get(
+        self, id: str, authority_type: str, authority_url: str
+    ) -> Iterable[RawExtrinsicMetadataRow]:
+        """Return the rows for ``id`` that come from the given authority."""
+        metadata = self._raw_extrinsic_metadata.get_from_partition_key((id,))
+        return (
+            m
+            for m in metadata
+            if m.authority_type == authority_type and m.authority_url == authority_url
+        )
+
 
 class InMemoryStorage(CassandraStorage):
     _cql_runner: InMemoryCqlRunner  # type: ignore
@@ -629,33 +702,6 @@
         self._cql_runner = InMemoryCqlRunner()
         self._persons = {}
 
-        # {object_type: {id: {authority: [metadata]}}}
-        self._raw_extrinsic_metadata: Dict[
-            MetadataTargetType,
-            Dict[
-                Union[str, SWHID],
-                Dict[
-                    Hashable,
-                    SortedList[
-                        Tuple[datetime.datetime, FetcherKey], RawExtrinsicMetadata
-                    ],
-                ],
-            ],
-        ] = defaultdict(
-            lambda: defaultdict(
-                lambda: defaultdict(
-                    lambda: SortedList(
-                        key=lambda x: (
-                            x.discovery_date,
-                            self._metadata_fetcher_key(x.fetcher),
-                        )
-                    )
-                )
-            )
-        )  # noqa
-
-        self._metadata_fetchers: Dict[FetcherKey, MetadataFetcher] = {}
-        self._metadata_authorities: Dict[Hashable, MetadataAuthority] = {}
         self._objects = defaultdict(list)
         self._sorted_sha1s = SortedList[bytes, bytes]()
 
@@ -664,160 +710,6 @@
     def check_config(self, *, check_write: bool) -> bool:
         return True
 
-    def raw_extrinsic_metadata_add(self, metadata: List[RawExtrinsicMetadata],) -> None:
-        self.journal_writer.raw_extrinsic_metadata_add(metadata)
-        for metadata_entry in metadata:
-            authority_key = self._metadata_authority_key(metadata_entry.authority)
-            if authority_key not in self._metadata_authorities:
-                raise StorageArgumentException(
-                    f"Unknown authority {metadata_entry.authority}"
-                )
-            fetcher_key = self._metadata_fetcher_key(metadata_entry.fetcher)
-            if fetcher_key not in self._metadata_fetchers:
-                raise StorageArgumentException(
-                    f"Unknown fetcher {metadata_entry.fetcher}"
-                )
-
-            raw_extrinsic_metadata_list = self._raw_extrinsic_metadata[
-                metadata_entry.type
-            ][metadata_entry.id][authority_key]
-
-            for existing_raw_extrinsic_metadata in raw_extrinsic_metadata_list:
-                if (
-                    self._metadata_fetcher_key(existing_raw_extrinsic_metadata.fetcher)
-                    == fetcher_key
-                    and existing_raw_extrinsic_metadata.discovery_date
-                    == metadata_entry.discovery_date
-                ):
-                    # Duplicate of an existing one; ignore it.
-                    break
-            else:
-                raw_extrinsic_metadata_list.add(metadata_entry)
-
-    def raw_extrinsic_metadata_get(
-        self,
-        type: MetadataTargetType,
-        id: Union[str, SWHID],
-        authority: MetadataAuthority,
-        after: Optional[datetime.datetime] = None,
-        page_token: Optional[bytes] = None,
-        limit: int = 1000,
-    ) -> PagedResult[RawExtrinsicMetadata]:
-        authority_key = self._metadata_authority_key(authority)
-
-        if type == MetadataTargetType.ORIGIN:
-            if isinstance(id, SWHID):
-                raise StorageArgumentException(
-                    f"raw_extrinsic_metadata_get called with type='origin', "
-                    f"but provided id is an SWHID: {id!r}"
-                )
-        else:
-            if not isinstance(id, SWHID):
-                raise StorageArgumentException(
-                    f"raw_extrinsic_metadata_get called with type!='origin', "
-                    f"but provided id is not an SWHID: {id!r}"
-                )
-
-        if page_token is not None:
-            (after_time, after_fetcher) = msgpack_loads(base64.b64decode(page_token))
-            after_fetcher = tuple(after_fetcher)
-            if after is not None and after > after_time:
-                raise StorageArgumentException(
-                    "page_token is inconsistent with the value of 'after'."
-                )
-            entries = self._raw_extrinsic_metadata[type][id][authority_key].iter_after(
-                (after_time, after_fetcher)
-            )
-        elif after is not None:
-            entries = self._raw_extrinsic_metadata[type][id][authority_key].iter_from(
-                (after,)
-            )
-            entries = (entry for entry in entries if entry.discovery_date > after)
-        else:
-            entries = iter(self._raw_extrinsic_metadata[type][id][authority_key])
-
-        if limit:
-            entries = itertools.islice(entries, 0, limit + 1)
-
-        results = []
-        for entry in entries:
-            entry_authority = self._metadata_authorities[
-                self._metadata_authority_key(entry.authority)
-            ]
-            entry_fetcher = self._metadata_fetchers[
-                self._metadata_fetcher_key(entry.fetcher)
-            ]
-            if after:
-                assert entry.discovery_date > after
-            results.append(
-                attr.evolve(
-                    entry,
-                    authority=attr.evolve(entry_authority, metadata=None),
-                    fetcher=attr.evolve(entry_fetcher, metadata=None),
-                )
-            )
-
-        if len(results) > limit:
-            results.pop()
-            assert len(results) == limit
-            last_result = results[-1]
-            next_page_token: Optional[str] = base64.b64encode(
-                msgpack_dumps(
-                    (
-                        last_result.discovery_date,
-                        self._metadata_fetcher_key(last_result.fetcher),
-                    )
-                )
-            ).decode()
-        else:
-            next_page_token = None
-
-        return PagedResult(next_page_token=next_page_token, results=results,)
-
-    def metadata_fetcher_add(self, fetchers: List[MetadataFetcher]) -> None:
-        self.journal_writer.metadata_fetcher_add(fetchers)
-        for fetcher in fetchers:
-            if fetcher.metadata is None:
-                raise StorageArgumentException(
-                    "MetadataFetcher.metadata may not be None in metadata_fetcher_add."
-                )
-            key = self._metadata_fetcher_key(fetcher)
-            if key not in self._metadata_fetchers:
-                self._metadata_fetchers[key] = fetcher
-
-    def metadata_fetcher_get(
-        self, name: str, version: str
-    ) -> Optional[MetadataFetcher]:
-        return self._metadata_fetchers.get(
-            self._metadata_fetcher_key(MetadataFetcher(name=name, version=version))
-        )
-
-    def metadata_authority_add(self, authorities: List[MetadataAuthority]) -> None:
-        self.journal_writer.metadata_authority_add(authorities)
-        for authority in authorities:
-            if authority.metadata is None:
-                raise StorageArgumentException(
-                    "MetadataAuthority.metadata may not be None in "
-                    "metadata_authority_add."
-                )
-            key = self._metadata_authority_key(authority)
-            self._metadata_authorities[key] = authority
-
-    def metadata_authority_get(
-        self, type: MetadataAuthorityType, url: str
-    ) -> Optional[MetadataAuthority]:
-        return self._metadata_authorities.get(
-            self._metadata_authority_key(MetadataAuthority(type=type, url=url))
-        )
-
-    @staticmethod
-    def _metadata_fetcher_key(fetcher: MetadataFetcher) -> FetcherKey:
-        return (fetcher.name, fetcher.version)
-
-    @staticmethod
-    def _metadata_authority_key(authority: MetadataAuthority) -> Hashable:
-        return (authority.type, authority.url)
-
     def diff_directories(self, from_dir, to_dir, track_renaming=False):
         raise NotImplementedError("InMemoryStorage.diff_directories")
 