diff --git a/swh/storage/cassandra/converters.py b/swh/storage/cassandra/converters.py
--- a/swh/storage/cassandra/converters.py
+++ b/swh/storage/cassandra/converters.py
@@ -12,17 +12,29 @@
 from swh.model.hashutil import DEFAULT_ALGORITHMS
 from swh.model.model import (
+    CoreSWHID,
+    ExtendedSWHID,
+    MetadataAuthority,
+    MetadataAuthorityType,
+    MetadataFetcher,
     ObjectType,
     OriginVisit,
     OriginVisitStatus,
+    RawExtrinsicMetadata,
     Release,
     Revision,
     RevisionType,
     Sha1Git,
 )
 
-from ..utils import remove_keys
-from .model import OriginVisitRow, OriginVisitStatusRow, ReleaseRow, RevisionRow
+from ..utils import map_optional, remove_keys
+from .model import (
+    OriginVisitRow,
+    OriginVisitStatusRow,
+    RawExtrinsicMetadataRow,
+    ReleaseRow,
+    RevisionRow,
+)
 
 
 def revision_to_db(revision: Revision) -> RevisionRow:
@@ -111,3 +123,25 @@
 def visit_status_to_row(status: OriginVisitStatus) -> OriginVisitStatusRow:
     d = status.to_dict()
     return OriginVisitStatusRow.from_dict({**d, "metadata": json.dumps(d["metadata"])})
+
+
+def row_to_raw_extrinsic_metadata(row: RawExtrinsicMetadataRow) -> RawExtrinsicMetadata:
+    discovery_date = row.discovery_date.replace(tzinfo=datetime.timezone.utc)
+
+    return RawExtrinsicMetadata(
+        target=ExtendedSWHID.from_string(row.target),
+        authority=MetadataAuthority(
+            type=MetadataAuthorityType(row.authority_type), url=row.authority_url,
+        ),
+        fetcher=MetadataFetcher(name=row.fetcher_name, version=row.fetcher_version,),
+        discovery_date=discovery_date,
+        format=row.format,
+        metadata=row.metadata,
+        origin=row.origin,
+        visit=row.visit,
+        snapshot=map_optional(CoreSWHID.from_string, row.snapshot),
+        release=map_optional(CoreSWHID.from_string, row.release),
+        revision=map_optional(CoreSWHID.from_string, row.revision),
+        path=row.path,
+        directory=map_optional(CoreSWHID.from_string, row.directory),
+    )
diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py
--- a/swh/storage/cassandra/cql.py
+++ b/swh/storage/cassandra/cql.py
@@ -62,6 +62,7 @@
     OriginRow,
     OriginVisitRow,
     OriginVisitStatusRow,
+    RawExtrinsicMetadataByIdRow,
     RawExtrinsicMetadataRow,
     ReleaseRow,
     RevisionParentRow,
@@ -1031,6 +1032,23 @@
         else:
             return None
 
+    #########################
+    # 'raw_extrinsic_metadata_by_id' table
+    #########################
+
+    @_prepared_insert_statement(RawExtrinsicMetadataByIdRow)
+    def raw_extrinsic_metadata_by_id_add(self, row, *, statement):
+        self._add_one(statement, row)
+
+    @_prepared_select_statement(RawExtrinsicMetadataByIdRow, "WHERE id IN ?")
+    def raw_extrinsic_metadata_get_by_ids(
+        self, ids: List[Sha1Git], *, statement
+    ) -> Iterable[RawExtrinsicMetadataByIdRow]:
+        return map(
+            RawExtrinsicMetadataByIdRow.from_dict,
+            self._execute_with_retries(statement, [ids]),
+        )
+
     #########################
     # 'raw_extrinsic_metadata' table
     #########################
diff --git a/swh/storage/cassandra/model.py b/swh/storage/cassandra/model.py
--- a/swh/storage/cassandra/model.py
+++ b/swh/storage/cassandra/model.py
@@ -288,6 +288,18 @@
     directory: Optional[str]
 
 
+@dataclasses.dataclass
+class RawExtrinsicMetadataByIdRow(BaseRow):
+    TABLE = "raw_extrinsic_metadata_by_id"
+    PARTITION_KEY = ("id",)
+    CLUSTERING_KEY = ()
+
+    id: bytes
+    target: str
+    authority_type: str
+    authority_url: str
+
+
 @dataclasses.dataclass
 class ObjectCountRow(BaseRow):
     TABLE = "object_count"
diff --git a/swh/storage/cassandra/schema.py b/swh/storage/cassandra/schema.py
--- a/swh/storage/cassandra/schema.py
+++ b/swh/storage/cassandra/schema.py
@@ -259,6 +259,14 @@
     -- <=> (target1, ..., date1, id1) == (target2, ..., date2, id2)
 );""",
     """
+CREATE TABLE IF NOT EXISTS raw_extrinsic_metadata_by_id (
+    id blob,
+    target text,
+    authority_type text,
+    authority_url text,
+    PRIMARY KEY ((id))
+);""",
+    """
 CREATE TABLE IF NOT EXISTS object_count (
     partition_key smallint,  -- Constant, must always be 0
     object_type ascii,
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -27,7 +27,7 @@
 from swh.core.api.classes import stream_results
 from swh.core.api.serializers import msgpack_dumps, msgpack_loads
-from swh.model.hashutil import DEFAULT_ALGORITHMS
+from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_hex
 from swh.model.identifiers import CoreSWHID, ExtendedObjectType, ExtendedSWHID
 from swh.model.identifiers import ObjectType as SwhidObjectType
 from swh.model.model import (
@@ -77,6 +77,7 @@
     OriginRow,
     OriginVisitRow,
     OriginVisitStatusRow,
+    RawExtrinsicMetadataByIdRow,
     RawExtrinsicMetadataRow,
     RevisionParentRow,
     SkippedContentRow,
@@ -1303,10 +1304,22 @@
                     path=metadata_entry.path,
                     directory=map_optional(str, metadata_entry.directory),
                 )
-                self._cql_runner.raw_extrinsic_metadata_add(row)
-                counter[metadata_entry.target.object_type] += 1
             except TypeError as e:
                 raise StorageArgumentException(*e.args)
+
+            # Add to the index first
+            self._cql_runner.raw_extrinsic_metadata_by_id_add(
+                RawExtrinsicMetadataByIdRow(
+                    id=row.id,
+                    target=row.target,
+                    authority_type=row.authority_type,
+                    authority_url=row.authority_url,
+                )
+            )
+
+            # Then to the main table
+            self._cql_runner.raw_extrinsic_metadata_add(row)
+            counter[metadata_entry.target.object_type] += 1
         return {
             f"{type.value}_metadata:add": count for (type, count) in counter.items()
         }
@@ -1342,32 +1355,9 @@
         results = []
         for entry in entries:
-            discovery_date = entry.discovery_date.replace(tzinfo=datetime.timezone.utc)
-
             assert str(target) == entry.target
-            result = RawExtrinsicMetadata(
-                target=target,
-                authority=MetadataAuthority(
-                    type=MetadataAuthorityType(entry.authority_type),
-                    url=entry.authority_url,
-                ),
-                fetcher=MetadataFetcher(
-                    name=entry.fetcher_name, version=entry.fetcher_version,
-                ),
-                discovery_date=discovery_date,
-                format=entry.format,
-                metadata=entry.metadata,
-                origin=entry.origin,
-                visit=entry.visit,
-                snapshot=map_optional(CoreSWHID.from_string, entry.snapshot),
-                release=map_optional(CoreSWHID.from_string, entry.release),
-                revision=map_optional(CoreSWHID.from_string, entry.revision),
-                path=entry.path,
-                directory=map_optional(CoreSWHID.from_string, entry.directory),
-            )
-
-            results.append(result)
+            results.append(converters.row_to_raw_extrinsic_metadata(entry))
 
         if len(results) > limit:
             results.pop()
@@ -1381,6 +1371,28 @@
         return PagedResult(next_page_token=next_page_token, results=results,)
 
+    def raw_extrinsic_metadata_get_by_ids(
+        self, ids: List[Sha1Git]
+    ) -> List[RawExtrinsicMetadata]:
+        keys = self._cql_runner.raw_extrinsic_metadata_get_by_ids(ids)
+
+        results: Set[RawExtrinsicMetadata] = set()
+        for key in keys:
+            candidates = self._cql_runner.raw_extrinsic_metadata_get(
+                key.target, key.authority_type, key.authority_url
+            )
+            candidates = [
+                candidate for candidate in candidates if candidate.id == key.id
+            ]
+            if len(candidates) > 1:
+                raise Exception(
+                    "Found multiple RawExtrinsicMetadata objects with the same id: "
+                    + hash_to_hex(key.id)
+                )
+            results.update(map(converters.row_to_raw_extrinsic_metadata, candidates))
+
+        return list(results)
+
     def metadata_fetcher_add(self, fetchers: List[MetadataFetcher]) -> Dict[str, int]:
         self.journal_writer.metadata_fetcher_add(fetchers)
         for fetcher in fetchers:
diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py
--- a/swh/storage/in_memory.py
+++ b/swh/storage/in_memory.py
@@ -38,6 +38,7 @@
     OriginRow,
     OriginVisitRow,
     OriginVisitStatusRow,
+    RawExtrinsicMetadataByIdRow,
     RawExtrinsicMetadataRow,
     ReleaseRow,
     RevisionParentRow,
@@ -168,6 +169,7 @@
         self._metadata_authorities = Table(MetadataAuthorityRow)
         self._metadata_fetchers = Table(MetadataFetcherRow)
         self._raw_extrinsic_metadata = Table(RawExtrinsicMetadataRow)
+        self._raw_extrinsic_metadata_by_id = Table(RawExtrinsicMetadataByIdRow)
         self._extid = Table(ExtIDRow)
 
         self._stat_counters = defaultdict(int)
@@ -591,6 +593,25 @@
     def metadata_fetcher_get(self, name, version) -> Optional[MetadataAuthorityRow]:
         return self._metadata_fetchers.get_from_primary_key((name, version))
 
+    #########################
+    # 'raw_extrinsic_metadata_by_id' table
+    #########################
+
+    def raw_extrinsic_metadata_by_id_add(
+        self, row: RawExtrinsicMetadataByIdRow
+    ) -> None:
+        self._raw_extrinsic_metadata_by_id.insert(row)
+
+    def raw_extrinsic_metadata_get_by_ids(
+        self, ids
+    ) -> List[RawExtrinsicMetadataByIdRow]:
+        results = []
+        for id_ in ids:
+            result = self._raw_extrinsic_metadata_by_id.get_from_primary_key((id_,))
+            if result:
+                results.append(result)
+        return results
+
     #########################
     # 'raw_extrinsic_metadata' table
     #########################
diff --git a/swh/storage/interface.py b/swh/storage/interface.py
--- a/swh/storage/interface.py
+++ b/swh/storage/interface.py
@@ -1191,7 +1191,7 @@
         page_token: Optional[bytes] = None,
         limit: int = 1000,
     ) -> PagedResult[RawExtrinsicMetadata]:
-        """Retrieve list of all raw_extrinsic_metadata entries for the id
+        """Retrieve list of all raw_extrinsic_metadata entries targeting the id
 
         Args:
             target: the SWHID of the objects to find metadata on
@@ -1206,6 +1206,20 @@
         """
         ...
 
+    @remote_api_endpoint("raw_extrinsic_metadata/get_by_ids")
+    def raw_extrinsic_metadata_get_by_ids(
+        self, ids: List[Sha1Git]
+    ) -> List[RawExtrinsicMetadata]:
+        """Retrieve the raw_extrinsic_metadata entries with the given ids
+        (unlike raw_extrinsic_metadata_get, which returns metadata entries
+        **targeting** a given id)
+
+        Args:
+            ids: list of hashes of RawExtrinsicMetadata objects
+
+        """
+        ...
+
     @remote_api_endpoint("metadata_fetcher/add")
     def metadata_fetcher_add(self, fetchers: List[MetadataFetcher],) -> Dict[str, int]:
         """Add new metadata fetchers to the storage.
diff --git a/swh/storage/postgresql/db.py b/swh/storage/postgresql/db.py
--- a/swh/storage/postgresql/db.py
+++ b/swh/storage/postgresql/db.py
@@ -1266,7 +1266,6 @@
         INNER JOIN metadata_authority ON (metadata_authority.id=authority_id)
         INNER JOIN metadata_fetcher ON (metadata_fetcher.id=fetcher_id)
-        WHERE raw_extrinsic_metadata.target=%s AND authority_id=%s
     """
 
     def raw_extrinsic_metadata_add(
@@ -1322,6 +1321,7 @@
         cur,
     ):
         query_parts = [self._raw_extrinsic_metadata_select_query]
+        query_parts.append("WHERE raw_extrinsic_metadata.target=%s AND authority_id=%s")
         args = [target, authority_id]
 
         if after_fetcher is not None:
@@ -1341,6 +1341,15 @@
         cur.execute(" ".join(query_parts), args)
         yield from cur
 
+    def raw_extrinsic_metadata_get_by_ids(self, ids: List[Sha1Git], cur=None):
+        cur = self._cursor(cur)
+        yield from execute_values_generator(
+            cur,
+            self._raw_extrinsic_metadata_select_query
+            + "INNER JOIN (VALUES %s) AS t(id) ON t.id = raw_extrinsic_metadata.id",
+            [(id_,) for id_ in ids],
+        )
+
     metadata_fetcher_cols = ["name", "version"]
 
     def metadata_fetcher_add(self, name: str, version: str, cur=None) -> None:
diff --git a/swh/storage/postgresql/storage.py b/swh/storage/postgresql/storage.py
--- a/swh/storage/postgresql/storage.py
+++ b/swh/storage/postgresql/storage.py
@@ -1430,6 +1430,17 @@
         return PagedResult(next_page_token=next_page_token, results=results,)
 
+    @db_transaction()
+    def raw_extrinsic_metadata_get_by_ids(
+        self, ids: List[Sha1Git], db=None, cur=None,
+    ) -> List[RawExtrinsicMetadata]:
+        return [
+            converters.db_to_raw_extrinsic_metadata(
+                dict(zip(db.raw_extrinsic_metadata_get_cols, row))
+            )
+            for row in db.raw_extrinsic_metadata_get_by_ids(ids)
+        ]
+
     @timed
     @process_metrics
     @db_transaction()
diff --git a/swh/storage/tests/storage_tests.py b/swh/storage/tests/storage_tests.py
--- a/swh/storage/tests/storage_tests.py
+++ b/swh/storage/tests/storage_tests.py
@@ -4029,6 +4029,41 @@
         assert result.results[0].to_dict() == new_content_metadata2.to_dict()
         assert result.results == [new_content_metadata2]
 
+    def test_content_metadata_get_by_ids(self, swh_storage, sample_data):
+        content, content2 = sample_data.contents[:2]
+        fetcher, fetcher2 = sample_data.fetchers[:2]
+        authority, authority2 = sample_data.authorities[:2]
+        (
+            content1_metadata1,
+            content1_metadata2,
+            content1_metadata3,
+        ) = sample_data.content_metadata[:3]
+
+        content2_metadata = RawExtrinsicMetadata.from_dict(
+            {
+                **remove_keys(content1_metadata2.to_dict(), ("id",)),  # recompute id
+                "target": str(content2.swhid()),
+            }
+        )
+
+        swh_storage.metadata_authority_add([authority, authority2])
+        swh_storage.metadata_fetcher_add([fetcher, fetcher2])
+
+        swh_storage.raw_extrinsic_metadata_add(
+            [
+                content1_metadata1,
+                content1_metadata2,
+                content1_metadata3,
+                content2_metadata,
+            ]
+        )
+
+        assert set(
+            swh_storage.raw_extrinsic_metadata_get_by_ids(
+                [content1_metadata1.id, b"\x00" * 20, content2_metadata.id]
+            )
+        ) == {content1_metadata1, content2_metadata}
+
     def test_origin_metadata_add(self, swh_storage, sample_data):
         origin = sample_data.origin
         fetcher = sample_data.metadata_fetcher
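
Usage sketch (not part of the patch): the snippet below shows how the new raw_extrinsic_metadata_get_by_ids endpoint can be exercised end to end against the in-memory backend, which reuses the Cassandra code paths, including the raw_extrinsic_metadata_by_id index added above. It assumes swh.storage and swh.model at the versions this diff targets; the authority URL, fetcher name, target SWHID and metadata payload are made-up example values.

import datetime

from swh.model.model import (
    ExtendedSWHID,
    MetadataAuthority,
    MetadataAuthorityType,
    MetadataFetcher,
    RawExtrinsicMetadata,
)
from swh.storage import get_storage

# In-memory backend, mirroring the behaviour tested in storage_tests.py above.
storage = get_storage("memory")

# Authorities and fetchers must be registered before metadata referencing them.
authority = MetadataAuthority(
    type=MetadataAuthorityType.FORGE, url="https://example.org/"  # example value
)
fetcher = MetadataFetcher(name="example-fetcher", version="1.0.0")  # example value
storage.metadata_authority_add([authority])
storage.metadata_fetcher_add([fetcher])

metadata = RawExtrinsicMetadata(
    target=ExtendedSWHID.from_string("swh:1:ori:" + "00" * 20),  # example target
    discovery_date=datetime.datetime(2021, 3, 1, tzinfo=datetime.timezone.utc),
    authority=authority,
    fetcher=fetcher,
    format="json",
    metadata=b'{"url": "https://example.org/src.git"}',
)
storage.raw_extrinsic_metadata_add([metadata])

# Look objects up by their own intrinsic hashes; unknown ids are silently skipped,
# as in the test above that passes b"\x00" * 20.
retrieved = storage.raw_extrinsic_metadata_get_by_ids([metadata.id, b"\x00" * 20])
assert [m.id for m in retrieved] == [metadata.id]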