diff --git a/requirements-swh.txt b/requirements-swh.txt --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,3 +1,3 @@ swh.core[db,http] >= 0.5 -swh.model >= 1.0.0 +swh.model >= 2.1.0 swh.objstorage >= 0.2.2 diff --git a/sql/upgrades/168.sql b/sql/upgrades/168.sql new file mode 100644 --- /dev/null +++ b/sql/upgrades/168.sql @@ -0,0 +1,43 @@ +-- SWH DB schema upgrade +-- from_version: 167 +-- to_version: 168 +-- description: Add ExtID related tables + +insert into dbversion(version, release, description) + values(168, now(), 'Work In Progress'); + + +-- The ExtID (typ. original VCS) <-> swhid relation table +create table extid +( + extid_type text not null, + extid bytea not null, + target_type object_type not null, + target sha1_git not null +); + +comment on table extid is 'Correspondance SWH object (SWHID) <-> original revision id (vcs id)'; +comment on column extid.extid_type is 'ExtID type'; +comment on column extid.extid is 'Intrinsic identifier of the object (e.g. hg revision)'; +comment on column extid.target_type is 'Type of SWHID of the referenced SWH object'; +comment on column extid.target is 'Value (hash) of SWHID of the refenced SWH object'; + +-- Create entries in extid from tmp_extid +-- operates in bulk: 0. swh_mktemp(extid), 1. COPY to tmp_extid, +-- 2. call this function +create or replace function swh_extid_add() + returns void + language plpgsql +as $$ +begin + insert into extid (extid_type, extid, target_type, target) + select distinct t.extid_type, t.extid, t.target_type, t.target + from tmp_extid t + on conflict do nothing; + return; +end +$$; + +-- extid indexes +create unique index concurrently on extid(extid_type, extid); +create unique index concurrently on extid(target_type, target); diff --git a/swh/storage/api/serializers.py b/swh/storage/api/serializers.py --- a/swh/storage/api/serializers.py +++ b/swh/storage/api/serializers.py @@ -7,7 +7,8 @@ from typing import Callable, Dict, List, Tuple -from swh.model.identifiers import CoreSWHID, ExtendedSWHID, QualifiedSWHID +import swh.model.identifiers as identifiers +from swh.model.identifiers import CoreSWHID, ExtendedSWHID, ObjectType, QualifiedSWHID import swh.model.model as model from swh.storage import interface @@ -29,6 +30,10 @@ return getattr(model, d.pop("__type__"))(d["value"]) +def _decode_identifiers_enum(d): + return getattr(identifiers, d.pop("__type__"))(d["value"]) + + def _decode_storage_enum(d): return getattr(interface, d.pop("__type__"))(d["value"]) @@ -38,6 +43,7 @@ (CoreSWHID, "core_swhid", str), (ExtendedSWHID, "extended_swhid", str), (QualifiedSWHID, "qualified_swhid", str), + (ObjectType, "identifiers_enum", _encode_enum), (model.MetadataAuthorityType, "model_enum", _encode_enum), (interface.ListOrder, "storage_enum", _encode_enum), ] @@ -48,6 +54,7 @@ "extended_swhid": ExtendedSWHID.from_string, "qualified_swhid": QualifiedSWHID.from_string, "model": lambda d: getattr(model, d.pop("__type__")).from_dict(d), + "identifiers_enum": _decode_identifiers_enum, "model_enum": _decode_model_enum, "storage_enum": _decode_storage_enum, } diff --git a/swh/storage/backfill.py b/swh/storage/backfill.py --- a/swh/storage/backfill.py +++ b/swh/storage/backfill.py @@ -24,6 +24,7 @@ BaseModel, Directory, DirectoryEntry, + ExtID, RawExtrinsicMetadata, Release, Revision, @@ -32,6 +33,7 @@ TargetType, ) from swh.storage.postgresql.converters import ( + db_to_extid, db_to_raw_extrinsic_metadata, db_to_release, db_to_revision, @@ -45,6 +47,7 @@ "content": "sha1", "skipped_content": "sha1", "directory": "id", + "extid": "target", "metadata_authority": "type, url", "metadata_fetcher": "name, version", "raw_extrinsic_metadata": "target", @@ -77,8 +80,20 @@ "reason", ], "directory": ["id", "dir_entries", "file_entries", "rev_entries"], + "extid": ["extid_type", "extid", "target_type", "target"], "metadata_authority": ["type", "url", "metadata",], "metadata_fetcher": ["name", "version", "metadata",], + "origin": ["url"], + "origin_visit": ["visit", "type", ("origin.url", "origin"), "date",], + "origin_visit_status": [ + ("origin_visit_status.visit", "visit"), + ("origin.url", "origin"), + ("origin_visit_status.date", "date"), + "type", + "snapshot", + "status", + "metadata", + ], "raw_extrinsic_metadata": [ "raw_extrinsic_metadata.type", "raw_extrinsic_metadata.target", @@ -141,17 +156,6 @@ ("a.fullname", "author_fullname"), ], "snapshot": ["id", "object_id"], - "origin": ["url"], - "origin_visit": ["visit", "type", ("origin.url", "origin"), "date",], - "origin_visit_status": [ - ("origin_visit_status.visit", "visit"), - ("origin.url", "origin"), - ("origin_visit_status.date", "date"), - "type", - "snapshot", - "status", - "metadata", - ], } @@ -212,13 +216,21 @@ def raw_extrinsic_metadata_converter( db: BaseDb, metadata: Dict[str, Any] ) -> RawExtrinsicMetadata: - """Convert revision from the flat representation to swh model + """Convert a raw extrinsic metadata from the flat representation to swh model compatible objects. """ return db_to_raw_extrinsic_metadata(metadata) +def extid_converter(db: BaseDb, extid: Dict[str, Any]) -> ExtID: + """Convert an extid from the flat representation to swh model + compatible objects. + + """ + return db_to_extid(extid) + + def revision_converter(db: BaseDb, revision_d: Dict[str, Any]) -> Revision: """Convert revision from the flat representation to swh model compatible objects. @@ -272,6 +284,7 @@ CONVERTERS: Dict[str, Callable[[BaseDb, Dict[str, Any]], BaseModel]] = { "directory": directory_converter, + "extid": extid_converter, "raw_extrinsic_metadata": raw_extrinsic_metadata_converter, "revision": revision_converter, "release": release_converter, @@ -433,6 +446,7 @@ "content": lambda start, end: byte_ranges(24, start, end), "skipped_content": lambda start, end: [(None, None)], "directory": lambda start, end: byte_ranges(24, start, end), + "extid": lambda start, end: byte_ranges(24, start, end), "revision": lambda start, end: byte_ranges(24, start, end), "release": lambda start, end: byte_ranges(16, start, end), "raw_extrinsic_metadata": raw_extrinsic_metadata_target_ranges, @@ -579,7 +593,7 @@ raise ValueError( "Object type %s is not supported. " "The only possible values are %s" - % (object_type, ", ".join(COLUMNS.keys())) + % (object_type, ", ".join(sorted(COLUMNS.keys()))) ) if object_type in ["origin", "origin_visit", "origin_visit_status"]: diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py --- a/swh/storage/cassandra/cql.py +++ b/swh/storage/cassandra/cql.py @@ -34,6 +34,7 @@ wait_random_exponential, ) +from swh.model.identifiers import CoreSWHID from swh.model.model import ( Content, Person, @@ -52,6 +53,7 @@ ContentRow, DirectoryEntryRow, DirectoryRow, + ExtIDRow, MetadataAuthorityRow, MetadataFetcherRow, ObjectCountRow, @@ -964,6 +966,105 @@ ), ) + ########################## + # 'extid' table + ########################## + def _extid_add_finalize(self, statement: BoundStatement) -> None: + """Returned currified by extid_add_prepare, to be called when the + extid row should be added to the primary table.""" + self._execute_with_retries(statement, None) + self._increment_counter("extid", 1) + + @_prepared_insert_statement(ExtIDRow) + def extid_add_prepare( + self, extid: ExtIDRow, *, statement + ) -> Tuple[int, Callable[[], None]]: + statement = statement.bind(dataclasses.astuple(extid)) + token_class = self._cluster.metadata.token_map.token_class + token = token_class.from_key(statement.routing_key).value + assert TOKEN_BEGIN <= token <= TOKEN_END + + # Function to be called after the indexes contain their respective + # row + finalizer = functools.partial(self._extid_add_finalize, statement) + + return (token, finalizer) + + @_prepared_select_statement( + ExtIDRow, "WHERE extid_type=? AND extid=? AND target_type=? AND target=?", + ) + def extid_get_from_pk( + self, extid_type: str, extid: bytes, target: CoreSWHID, *, statement, + ) -> Optional[ExtIDRow]: + rows = list( + self._execute_with_retries( + statement, + [extid_type, extid, target.object_type.value, target.object_id], + ), + ) + assert len(rows) <= 1 + if rows: + return ExtIDRow(**rows[0]) + else: + return None + + @_prepared_select_statement( + ExtIDRow, "WHERE token(extid_type, extid) = ?", + ) + def extid_get_from_token(self, token: int, *, statement) -> Iterable[ExtIDRow]: + return map(ExtIDRow.from_dict, self._execute_with_retries(statement, [token]),) + + @_prepared_select_statement( + ExtIDRow, "WHERE extid_type=? AND extid=?", + ) + def extid_get_from_extid( + self, extid_type: str, extid: bytes, *, statement + ) -> Iterable[ExtIDRow]: + return map( + ExtIDRow.from_dict, + self._execute_with_retries(statement, [extid_type, extid]), + ) + + def extid_get_from_target( + self, target_type: str, target: bytes + ) -> Iterable[ExtIDRow]: + for token in self._extid_get_tokens_from_target(target_type, target): + if token is not None: + for extid in self.extid_get_from_token(token): + # re-check the extid against target (in case of murmur3 collision) + if ( + extid is not None + and extid.target_type == target_type + and extid.target == target + ): + yield extid + + ########################## + # 'extid_by_*' tables + ########################## + + def extid_index_add_one(self, extid: ExtIDRow, token: int) -> None: + """Adds a row mapping extid[target_type, target] to the token of the ExtID in + the main 'extid' table.""" + query = ( + "INSERT INTO extid_by_target (target_type, target, target_token) " + "VALUES (%s, %s, %s)" + ) + self._execute_with_retries(query, [extid.target_type, extid.target, token]) + + def _extid_get_tokens_from_target( + self, target_type: str, target: bytes + ) -> Iterable[int]: + query = ( + "SELECT target_token " + "FROM extid_by_target " + "WHERE target_type = %s AND target = %s" + ) + return ( + row["target_token"] + for row in self._execute_with_retries(query, [target_type, target]) + ) + ########################## # Miscellaneous ########################## diff --git a/swh/storage/cassandra/model.py b/swh/storage/cassandra/model.py --- a/swh/storage/cassandra/model.py +++ b/swh/storage/cassandra/model.py @@ -280,3 +280,14 @@ partition_key: int object_type: str count: int + + +@dataclasses.dataclass +class ExtIDRow(BaseRow): + TABLE = "extid" + PARTITION_KEY = ("target", "target_type", "extid", "extid_type") + + extid_type: str + extid: bytes + target_type: str + target: bytes diff --git a/swh/storage/cassandra/schema.py b/swh/storage/cassandra/schema.py --- a/swh/storage/cassandra/schema.py +++ b/swh/storage/cassandra/schema.py @@ -220,6 +220,21 @@ object_type ascii, count counter, PRIMARY KEY ((partition_key), object_type) +);""", + """ +CREATE TABLE IF NOT EXISTS extid ( + extid_type ascii, + extid blob, + target_type ascii, + target blob, + PRIMARY KEY ((extid_type, extid), target_type, target) +);""", + """ +CREATE TABLE IF NOT EXISTS extid_by_target ( + target_type ascii, + target blob, + target_token bigint, -- value of token(pk) on the "primary" table + PRIMARY KEY ((target_type, target), target_token) );""", ] @@ -255,6 +270,8 @@ "origin_visit_status", "metadata_authority", "metadata_fetcher", + "extid", + "extid_by_target", ] HASH_ALGORITHMS = ["sha1", "sha1_git", "sha256", "blake2s256"] diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py --- a/swh/storage/cassandra/storage.py +++ b/swh/storage/cassandra/storage.py @@ -28,10 +28,12 @@ from swh.core.api.serializers import msgpack_dumps, msgpack_loads from swh.model.hashutil import DEFAULT_ALGORITHMS from swh.model.identifiers import CoreSWHID, ExtendedSWHID +from swh.model.identifiers import ObjectType as SwhidObjectType from swh.model.model import ( Content, Directory, DirectoryEntry, + ExtID, MetadataAuthority, MetadataAuthorityType, MetadataFetcher, @@ -67,6 +69,7 @@ ContentRow, DirectoryEntryRow, DirectoryRow, + ExtIDRow, MetadataAuthorityRow, MetadataFetcherRow, OriginRow, @@ -1323,6 +1326,87 @@ else: return None + # ExtID tables + def extid_add(self, ids: List[ExtID]) -> Dict[str, int]: + extids = [ + extid + for extid in ids + if not self._cql_runner.extid_get_from_pk( + extid_type=extid.extid_type, extid=extid.extid, target=extid.target, + ) + ] + + self.journal_writer.extid_add(extids) + + inserted = 0 + for extid in extids: + extidrow = ExtIDRow( + extid_type=extid.extid_type, + extid=extid.extid, + target_type=extid.target.object_type.value, + target=extid.target.object_id, + ) + (token, insertion_finalizer) = self._cql_runner.extid_add_prepare(extidrow) + if ( + self.extid_get_from_extid(extid.extid_type, [extid.extid])[0] + or self.extid_get_from_target( + extid.target.object_type, [extid.target.object_id] + )[0] + ): + # on conflict do nothing... + continue + self._cql_runner.extid_index_add_one(extidrow, token) + insertion_finalizer() + inserted += 1 + return {"extid:add": inserted} + + def extid_get_from_extid( + self, id_type: str, ids: List[bytes] + ) -> List[Optional[ExtID]]: + result: List[Optional[ExtID]] = [] + for extid in ids: + extidrows = list(self._cql_runner.extid_get_from_extid(id_type, extid)) + assert len(extidrows) <= 1 + if extidrows: + result.append( + ExtID( + extid_type=extidrows[0].extid_type, + extid=extidrows[0].extid, + target=CoreSWHID( + object_type=extidrows[0].target_type, + object_id=extidrows[0].target, + ), + ) + ) + else: + result.append(None) + return result + + def extid_get_from_target( + self, target_type: SwhidObjectType, ids: List[Sha1Git] + ) -> List[Optional[ExtID]]: + result: List[Optional[ExtID]] = [] + for target in ids: + extidrows = list( + self._cql_runner.extid_get_from_target(target_type.value, target) + ) + assert len(extidrows) <= 1 + if extidrows: + result.append( + ExtID( + extid_type=extidrows[0].extid_type, + extid=extidrows[0].extid, + target=CoreSWHID( + object_type=SwhidObjectType(extidrows[0].target_type), + object_id=extidrows[0].target, + ), + ) + ) + else: + result.append(None) + return result + + # Misc def clear_buffers(self, object_types: Sequence[str] = ()) -> None: """Do nothing diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py --- a/swh/storage/in_memory.py +++ b/swh/storage/in_memory.py @@ -21,6 +21,7 @@ Union, ) +from swh.model.identifiers import ExtendedSWHID from swh.model.model import Content, Sha1Git, SkippedContent from swh.storage.cassandra import CassandraStorage from swh.storage.cassandra.model import ( @@ -28,6 +29,7 @@ ContentRow, DirectoryEntryRow, DirectoryRow, + ExtIDRow, MetadataAuthorityRow, MetadataFetcherRow, ObjectCountRow, @@ -164,6 +166,7 @@ self._metadata_authorities = Table(MetadataAuthorityRow) self._metadata_fetchers = Table(MetadataFetcherRow) self._raw_extrinsic_metadata = Table(RawExtrinsicMetadataRow) + self._extid = Table(ExtIDRow) self._stat_counters = defaultdict(int) def increment_counter(self, object_type: str, nb: int): @@ -612,6 +615,49 @@ if m.authority_type == authority_type and m.authority_url == authority_url ) + ######################### + # 'extid' table + ######################### + def _extid_add_finalize(self, extid: ExtIDRow) -> None: + self._extid.insert(extid) + self.increment_counter("extid", 1) + + def extid_add_prepare(self, extid: ExtIDRow): + finalizer = functools.partial(self._extid_add_finalize, extid) + return (self._extid.token(self._extid.partition_key(extid)), finalizer) + + def extid_index_add_one(self, extid: ExtIDRow, token: int) -> None: + pass + + def extid_get_from_pk( + self, extid_type: str, extid: bytes, target: ExtendedSWHID, + ) -> Optional[ExtIDRow]: + primary_key = self._extid.primary_key_from_dict( + dict( + extid_type=extid_type, + extid=extid, + target_type=target.object_type.value, + target=target.object_id, + ) + ) + return self._extid.get_from_primary_key(primary_key) + + def extid_get_from_extid(self, extid_type: str, extid: bytes) -> Iterable[ExtIDRow]: + return ( + row + for pk, row in self._extid.iter_all() + if row.extid_type == extid_type and row.extid == extid + ) + + def extid_get_from_target( + self, target_type: str, target: bytes + ) -> Iterable[ExtIDRow]: + return ( + row + for pk, row in self._extid.iter_all() + if row.target_type == target_type and row.target == target + ) + class InMemoryStorage(CassandraStorage): _cql_runner: InMemoryCqlRunner # type: ignore diff --git a/swh/storage/interface.py b/swh/storage/interface.py --- a/swh/storage/interface.py +++ b/swh/storage/interface.py @@ -11,10 +11,11 @@ from swh.core.api import remote_api_endpoint from swh.core.api.classes import PagedResult as CorePagedResult -from swh.model.identifiers import ExtendedSWHID +from swh.model.identifiers import ExtendedSWHID, ObjectType from swh.model.model import ( Content, Directory, + ExtID, MetadataAuthority, MetadataAuthorityType, MetadataFetcher, @@ -500,6 +501,52 @@ """ ... + @remote_api_endpoint("extid/from_extid") + def extid_get_from_extid( + self, id_type: str, ids: List[bytes] + ) -> List[Optional[ExtID]]: + """Get ExtID objects from external IDs + + Args: + id_type: type of the given external identifiers (e.g. 'mercurial') + ids: list of external IDs + + Returns: + list of ExtID objects (if the ext ID is known, None otherwise) + + """ + ... + + @remote_api_endpoint("extid/from_target") + def extid_get_from_target( + self, target_type: ObjectType, ids: List[Sha1Git] + ) -> List[Optional[ExtID]]: + """Get ExtID objects from target IDs and target_type + + Args: + target_type: type the SWH object + ids: list of target IDs + + Returns: + list of ExtID objects (if the SWH ID is known, None otherwise) + + """ + ... + + @remote_api_endpoint("extid/add") + def extid_add(self, ids: List[ExtID]) -> Dict[str, int]: + """Add a series of ExtID objects + + Args: + ids: list of ExtID objects + + Returns: + Summary dict of keys with associated count as values + + extid:add: New ExtID objects actually stored in db + """ + ... + @remote_api_endpoint("revision/log") def revision_log( self, revisions: List[Sha1Git], limit: Optional[int] = None diff --git a/swh/storage/postgresql/converters.py b/swh/storage/postgresql/converters.py --- a/swh/storage/postgresql/converters.py +++ b/swh/storage/postgresql/converters.py @@ -8,8 +8,11 @@ import warnings from swh.core.utils import encode_with_unescape -from swh.model.identifiers import CoreSWHID, ExtendedSWHID, origin_identifier +from swh.model.identifiers import CoreSWHID, ExtendedSWHID +from swh.model.identifiers import ObjectType as SwhidObjectType +from swh.model.identifiers import origin_identifier from swh.model.model import ( + ExtID, MetadataAuthority, MetadataAuthorityType, MetadataFetcher, @@ -322,3 +325,14 @@ path=row["path"], directory=map_optional(CoreSWHID.from_string, row["directory"]), ) + + +def db_to_extid(row) -> ExtID: + return ExtID( + extid=row["extid"], + extid_type=row["extid_type"], + target=CoreSWHID( + object_id=row["target"], + object_type=SwhidObjectType[row["target_type"].upper()], + ), + ) diff --git a/swh/storage/postgresql/db.py b/swh/storage/postgresql/db.py --- a/swh/storage/postgresql/db.py +++ b/swh/storage/postgresql/db.py @@ -13,6 +13,7 @@ from swh.core.db.db_utils import execute_values_generator from swh.core.db.db_utils import jsonize as _jsonize from swh.core.db.db_utils import stored_procedure +from swh.model.identifiers import ObjectType from swh.model.model import SHA1_SIZE, OriginVisit, OriginVisitStatus from swh.storage.interface import ListOrder @@ -28,7 +29,7 @@ """ - current_version = 167 + current_version = 168 def mktemp_dir_entry(self, entry_type, cur=None): self._cursor(cur).execute( @@ -76,6 +77,10 @@ def revision_add_from_temp(self, cur=None): pass + @stored_procedure("swh_extid_add") + def extid_add_from_temp(self, cur=None): + pass + @stored_procedure("swh_release_add") def release_add_from_temp(self, cur=None): pass @@ -812,6 +817,51 @@ ((sortkey, id) for sortkey, id in enumerate(revisions)), ) + extid_cols = ["extid", "extid_type", "target", "target_type"] + + def extid_get_from_extid_list(self, extid_type, ids, cur=None): + cur = self._cursor(cur) + query_keys = ", ".join( + self.mangle_query_key(k, "extid") for k in self.extid_cols + ) + sql = """ + SELECT %s + FROM (VALUES %%s) as t(sortkey, extid, extid_type) + LEFT JOIN extid USING (extid, extid_type) + ORDER BY sortkey + """ % ( + query_keys, + ) + + yield from execute_values_generator( + cur, + sql, + (((sortkey, extid, extid_type) for sortkey, extid in enumerate(ids))), + ) + + def extid_get_from_swhid_list(self, target_type, ids, cur=None): + cur = self._cursor(cur) + target_type = ObjectType( + target_type + ).name.lower() # aka "rev" -> "revision", ... + query_keys = ", ".join( + self.mangle_query_key(k, "extid") for k in self.extid_cols + ) + sql = """ + SELECT %s + FROM (VALUES %%s) as t(sortkey, target, target_type) + LEFT JOIN extid USING (target, target_type) + ORDER BY sortkey + """ % ( + query_keys, + ) + yield from execute_values_generator( + cur, + sql, + (((sortkey, target, target_type) for sortkey, target in enumerate(ids))), + template=b"(%s,%s,%s::object_type)", + ) + def revision_log(self, root_revisions, limit=None, cur=None): cur = self._cursor(cur) diff --git a/swh/storage/postgresql/storage.py b/swh/storage/postgresql/storage.py --- a/swh/storage/postgresql/storage.py +++ b/swh/storage/postgresql/storage.py @@ -1,4 +1,4 @@ -# Copyright (C) 2015-2020 The Software Heritage developers +# Copyright (C) 2015-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -19,11 +19,12 @@ from swh.core.api.serializers import msgpack_dumps, msgpack_loads from swh.core.db.common import db_transaction, db_transaction_generator from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex -from swh.model.identifiers import ExtendedObjectType, ExtendedSWHID +from swh.model.identifiers import ExtendedObjectType, ExtendedSWHID, ObjectType from swh.model.model import ( SHA1_SIZE, Content, Directory, + ExtID, MetadataAuthority, MetadataAuthorityType, MetadataFetcher, @@ -633,6 +634,55 @@ def revision_get_random(self, db=None, cur=None) -> Sha1Git: return db.revision_get_random(cur) + @timed + @db_transaction() + def extid_get_from_extid( + self, id_type: str, ids: List[bytes], db=None, cur=None + ) -> List[Optional[ExtID]]: + extids = [] + for row in db.extid_get_from_extid_list(id_type, ids, cur): + extids.append( + converters.db_to_extid(dict(zip(db.extid_cols, row))) + if row[0] is not None + else None + ) + return extids + + @timed + @db_transaction() + def extid_get_from_target( + self, target_type: ObjectType, ids: List[Sha1Git], db=None, cur=None + ) -> List[Optional[ExtID]]: + extids = [] + for row in db.extid_get_from_swhid_list(target_type.value, ids, cur): + extids.append( + converters.db_to_extid(dict(zip(db.extid_cols, row))) + if row[0] is not None + else None + ) + return extids + + @timed + @db_transaction() + def extid_add(self, ids: List[ExtID], db=None, cur=None) -> Dict[str, int]: + extid = [ + { + "extid": extid.extid, + "extid_type": extid.extid_type, + "target": extid.target.object_id, + "target_type": extid.target.object_type.name.lower(), # arghh + } + for extid in ids + ] + db.mktemp("extid", cur) + + db.copy_to(extid, "tmp_extid", db.extid_cols, cur) + + # move metadata in place + db.extid_add_from_temp(cur) + + return {"extid:add": len(extid)} + @timed @process_metrics @db_transaction() diff --git a/swh/storage/replay.py b/swh/storage/replay.py --- a/swh/storage/replay.py +++ b/swh/storage/replay.py @@ -17,6 +17,7 @@ BaseModel, Content, Directory, + ExtID, MetadataAuthority, MetadataFetcher, Origin, @@ -50,6 +51,7 @@ "metadata_authority": MetadataAuthority.from_dict, "metadata_fetcher": MetadataFetcher.from_dict, "raw_extrinsic_metadata": RawExtrinsicMetadata.from_dict, + "extid": ExtID.from_dict, } @@ -146,6 +148,7 @@ storage.raw_extrinsic_metadata_add(converted) elif object_type in ( "directory", + "extid", "revision", "release", "snapshot", diff --git a/swh/storage/sql/30-schema.sql b/swh/storage/sql/30-schema.sql --- a/swh/storage/sql/30-schema.sql +++ b/swh/storage/sql/30-schema.sql @@ -17,7 +17,7 @@ -- latest schema version insert into dbversion(version, release, description) - values(167, now(), 'Work In Progress'); + values(168, now(), 'Work In Progress'); -- a SHA1 checksum create domain sha1 as bytea check (length(value) = 20); @@ -499,3 +499,19 @@ comment on column object_counts_bucketed.bucket_end is 'Upper bound (exclusive) for the bucket'; comment on column object_counts_bucketed.value is 'Count of objects in the bucket'; comment on column object_counts_bucketed.last_update is 'Last update for the object count in this bucket'; + + +-- The ExtID (typ. original VCS) <-> swhid relation table +create table extid +( + extid_type text not null, + extid bytea not null, + target_type object_type not null, + target sha1_git not null +); + +comment on table extid is 'Correspondance SWH object (SWHID) <-> original revision id (vcs id)'; +comment on column extid.extid_type is 'ExtID type'; +comment on column extid.extid is 'Intrinsic identifier of the object (e.g. hg revision)'; +comment on column extid.target_type is 'Type of SWHID of the referenced SWH object'; +comment on column extid.target is 'Value (hash) of SWHID of the refenced SWH object'; diff --git a/swh/storage/sql/40-funcs.sql b/swh/storage/sql/40-funcs.sql --- a/swh/storage/sql/40-funcs.sql +++ b/swh/storage/sql/40-funcs.sql @@ -549,6 +549,23 @@ $$; +-- Create entries in extid from tmp_extid +-- operates in bulk: 0. swh_mktemp(extid), 1. COPY to tmp_extid, +-- 2. call this function +create or replace function swh_extid_add() + returns void + language plpgsql +as $$ +begin + insert into extid (extid_type, extid, target_type, target) + select distinct t.extid_type, t.extid, t.target_type, t.target + from tmp_extid t + on conflict do nothing; + return; +end +$$; + + -- Create entries in person from tmp_release create or replace function swh_person_add_from_release() returns void diff --git a/swh/storage/sql/60-indexes.sql b/swh/storage/sql/60-indexes.sql --- a/swh/storage/sql/60-indexes.sql +++ b/swh/storage/sql/60-indexes.sql @@ -281,3 +281,7 @@ -- object_counts_bucketed create unique index concurrently object_counts_bucketed_pkey on object_counts_bucketed(line); alter table object_counts_bucketed add primary key using index object_counts_bucketed_pkey; + +-- extid +create unique index concurrently on extid(extid_type, extid); +create unique index concurrently on extid(target_type, target); diff --git a/swh/storage/tests/storage_data.py b/swh/storage/tests/storage_data.py --- a/swh/storage/tests/storage_data.py +++ b/swh/storage/tests/storage_data.py @@ -1,4 +1,4 @@ -# Copyright (C) 2015-2020 The Software Heritage developers +# Copyright (C) 2015-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -10,11 +10,13 @@ from swh.model import from_disk from swh.model.hashutil import hash_to_bytes -from swh.model.identifiers import ExtendedObjectType, ExtendedSWHID +from swh.model.identifiers import CoreSWHID, ExtendedObjectType, ExtendedSWHID +from swh.model.identifiers import ObjectType as SwhidObjectType from swh.model.model import ( Content, Directory, DirectoryEntry, + ExtID, MetadataAuthority, MetadataAuthorityType, MetadataFetcher, @@ -311,7 +313,139 @@ extra_headers=(), synthetic=False, ) - revisions: Tuple[Revision, ...] = (revision, revision2, revision3, revision4) + git_revisions: Tuple[Revision, ...] = (revision, revision2, revision3, revision4) + + hg_revision = Revision( + id=hash_to_bytes("951c9503541e7beaf002d7aebf2abd1629084c68"), + message=b"hello", + author=Person( + name=b"Nicolas Dandrimont", + email=b"nicolas@example.com", + fullname=b"Nicolas Dandrimont ", + ), + date=TimestampWithTimezone( + timestamp=Timestamp(seconds=1234567890, microseconds=0), + offset=120, + negative_utc=False, + ), + committer=Person( + name=b"St\xc3fano Zacchiroli", + email=b"stefano@example.com", + fullname=b"St\xc3fano Zacchiroli ", + ), + committer_date=TimestampWithTimezone( + timestamp=Timestamp(seconds=1123456789, microseconds=0), + offset=120, + negative_utc=False, + ), + parents=(), + type=RevisionType.MERCURIAL, + directory=directory.id, + metadata={ + "checksums": {"sha1": "tarball-sha1", "sha256": "tarball-sha256",}, + "signed-off-by": "some-dude", + "node": "a316dfb434af2b451c1f393496b7eaeda343f543", + }, + extra_headers=(), + synthetic=True, + ) + hg_revision2 = Revision( + id=hash_to_bytes("df4afb063236300eb13b96a0d7fff03f7b7cbbaf"), + message=b"hello again", + author=Person( + name=b"Roberto Dicosmo", + email=b"roberto@example.com", + fullname=b"Roberto Dicosmo ", + ), + date=TimestampWithTimezone( + timestamp=Timestamp(seconds=1234567843, microseconds=220000,), + offset=-720, + negative_utc=False, + ), + committer=Person( + name=b"tony", email=b"ar@dumont.fr", fullname=b"tony ", + ), + committer_date=TimestampWithTimezone( + timestamp=Timestamp(seconds=1123456789, microseconds=220000,), + offset=0, + negative_utc=False, + ), + parents=tuple([hg_revision.id]), + type=RevisionType.MERCURIAL, + directory=directory2.id, + metadata=None, + extra_headers=( + (b"node", hash_to_bytes("fa1b7c84a9b40605b67653700f268349a6d6aca1")), + ), + synthetic=False, + ) + hg_revision3 = Revision( + id=hash_to_bytes("84d8e7081b47ebb88cad9fa1f25de5f330872a37"), + message=b"a simple revision with no parents this time", + author=Person( + name=b"Roberto Dicosmo", + email=b"roberto@example.com", + fullname=b"Roberto Dicosmo ", + ), + date=TimestampWithTimezone( + timestamp=Timestamp(seconds=1234567843, microseconds=220000,), + offset=-720, + negative_utc=False, + ), + committer=Person( + name=b"tony", email=b"ar@dumont.fr", fullname=b"tony ", + ), + committer_date=TimestampWithTimezone( + timestamp=Timestamp(seconds=1127351742, microseconds=220000,), + offset=0, + negative_utc=False, + ), + parents=tuple([hg_revision.id, hg_revision2.id]), + type=RevisionType.MERCURIAL, + directory=directory2.id, + metadata=None, + extra_headers=( + (b"node", hash_to_bytes("7f294a01c49065a90b3fe8b4ad49f08ce9656ef6")), + ), + synthetic=True, + ) + hg_revision4 = Revision( + id=hash_to_bytes("42070a39e5387e9b99bb3d83674e3a4a1ff39b69"), + message=b"parent of self.revision2", + author=Person( + name=b"me", email=b"me@soft.heri", fullname=b"me ", + ), + date=TimestampWithTimezone( + timestamp=Timestamp(seconds=1234567843, microseconds=220000,), + offset=-720, + negative_utc=False, + ), + committer=Person( + name=b"committer-dude", + email=b"committer@dude.com", + fullname=b"committer-dude ", + ), + committer_date=TimestampWithTimezone( + timestamp=Timestamp(seconds=1244567843, microseconds=220000,), + offset=-720, + negative_utc=False, + ), + parents=tuple([hg_revision3.id]), + type=RevisionType.MERCURIAL, + directory=directory.id, + metadata=None, + extra_headers=( + (b"node", hash_to_bytes("f4160af0485c85823d9e829bae2c00b00a2e6297")), + ), + synthetic=False, + ) + hg_revisions: Tuple[Revision, ...] = ( + hg_revision, + hg_revision2, + hg_revision3, + hg_revision4, + ) + revisions: Tuple[Revision, ...] = git_revisions + hg_revisions origins: Tuple[Origin, ...] = ( Origin(url="https://github.com/user1/repo1"), @@ -549,3 +683,29 @@ origin_metadata2, origin_metadata3, ) + + extid1 = ExtID( + target=CoreSWHID(object_type=SwhidObjectType.REVISION, object_id=revision.id), + extid_type="git", + extid=revision.id, + ) + + extid2 = ExtID( + target=CoreSWHID( + object_type=SwhidObjectType.REVISION, object_id=hg_revision.id + ), + extid_type="mercurial", + extid=hash_to_bytes("a316dfb434af2b451c1f393496b7eaeda343f543"), + ) + + extid3 = ExtID( + target=CoreSWHID(object_type=SwhidObjectType.DIRECTORY, object_id=directory.id), + extid_type="directory", + extid=b"something", + ) + + extids: Tuple[ExtID, ...] = ( + extid1, + extid2, + extid3, + ) diff --git a/swh/storage/tests/storage_tests.py b/swh/storage/tests/storage_tests.py --- a/swh/storage/tests/storage_tests.py +++ b/swh/storage/tests/storage_tests.py @@ -1,4 +1,4 @@ -# Copyright (C) 2015-2020 The Software Heritage developers +# Copyright (C) 2015-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -19,9 +19,11 @@ from swh.model import from_disk from swh.model.hashutil import hash_to_bytes from swh.model.hypothesis_strategies import objects +from swh.model.identifiers import CoreSWHID, ObjectType from swh.model.model import ( Content, Directory, + ExtID, Origin, OriginVisit, OriginVisitStatus, @@ -1060,6 +1062,171 @@ revision3.id, } + def test_extid_add_git(self, swh_storage, sample_data): + + gitids = [ + revision.id + for revision in sample_data.revisions + if revision.type.value == "git" + ] + nullids = [None] * len(gitids) + extids = [ + ExtID( + extid=gitid, + extid_type="git", + target=CoreSWHID(object_id=gitid, object_type=ObjectType.REVISION,), + ) + for gitid in gitids + ] + + assert swh_storage.extid_get_from_extid("git", gitids) == nullids + assert swh_storage.extid_get_from_target(ObjectType.REVISION, gitids) == nullids + + summary = swh_storage.extid_add(extids) + assert summary == {"extid:add": len(gitids)} + + assert swh_storage.extid_get_from_extid("git", gitids) == extids + assert swh_storage.extid_get_from_target(ObjectType.REVISION, gitids) == extids + + assert swh_storage.extid_get_from_extid("hg", gitids) == nullids + assert swh_storage.extid_get_from_target(ObjectType.RELEASE, gitids) == nullids + + def test_extid_add_hg(self, swh_storage, sample_data): + def get_node(revision): + node = None + if revision.extra_headers: + node = dict(revision.extra_headers).get(b"node") + if node is None and revision.metadata: + node = hash_to_bytes(revision.metadata.get("node")) + return node + + swhids = [ + revision.id + for revision in sample_data.revisions + if revision.type.value == "hg" + ] + extids = [ + get_node(revision) + for revision in sample_data.revisions + if revision.type.value == "hg" + ] + nullids = [None] * len(swhids) + + assert swh_storage.extid_get_from_extid("hg", extids) == nullids + assert swh_storage.extid_get_from_target(ObjectType.REVISION, swhids) == nullids + + extid_objs = [ + ExtID( + extid=hgid, + extid_type="hg", + target=CoreSWHID(object_id=swhid, object_type=ObjectType.REVISION,), + ) + for hgid, swhid in zip(extids, swhids) + ] + summary = swh_storage.extid_add(extid_objs) + assert summary == {"extid:add": len(swhids)} + + assert swh_storage.extid_get_from_extid("hg", extids) == extid_objs + assert ( + swh_storage.extid_get_from_target(ObjectType.REVISION, swhids) == extid_objs + ) + + assert swh_storage.extid_get_from_extid("git", extids) == nullids + assert swh_storage.extid_get_from_target(ObjectType.RELEASE, swhids) == nullids + + def test_extid_add_twice(self, swh_storage, sample_data): + + gitids = [ + revision.id + for revision in sample_data.revisions + if revision.type.value == "git" + ] + + extids = [ + ExtID( + extid=gitid, + extid_type="git", + target=CoreSWHID(object_id=gitid, object_type=ObjectType.REVISION,), + ) + for gitid in gitids + ] + summary = swh_storage.extid_add(extids) + assert summary == {"extid:add": len(gitids)} + + # add them again, should be noop + summary = swh_storage.extid_add(extids) + # assert summary == {"extid:add": 0} + assert swh_storage.extid_get_from_extid("git", gitids) == extids + assert swh_storage.extid_get_from_target(ObjectType.REVISION, gitids) == extids + + def test_extid_add_extid_unicity(self, swh_storage, sample_data): + + ids = [ + revision.id + for revision in sample_data.revisions + if revision.type.value == "git" + ] + nullids = [None] * len(ids) + + extids = [ + ExtID( + extid=extid, + extid_type="git", + target=CoreSWHID(object_id=extid, object_type=ObjectType.REVISION,), + ) + for extid in ids + ] + swh_storage.extid_add(extids) + + # try to add "modified-extid" versions, should be noops + extids2 = [ + ExtID( + extid=extid, + extid_type="hg", + target=CoreSWHID(object_id=extid, object_type=ObjectType.REVISION,), + ) + for extid in ids + ] + swh_storage.extid_add(extids2) + + assert swh_storage.extid_get_from_extid("git", ids) == extids + assert swh_storage.extid_get_from_extid("hg", ids) == nullids + assert swh_storage.extid_get_from_target(ObjectType.REVISION, ids) == extids + + def test_extid_add_target_unicity(self, swh_storage, sample_data): + + ids = [ + revision.id + for revision in sample_data.revisions + if revision.type.value == "git" + ] + nullids = [None] * len(ids) + + extids = [ + ExtID( + extid=extid, + extid_type="git", + target=CoreSWHID(object_id=extid, object_type=ObjectType.REVISION,), + ) + for extid in ids + ] + swh_storage.extid_add(extids) + + # try to add "modified" versions, should be noops + extids2 = [ + ExtID( + extid=extid, + extid_type="git", + target=CoreSWHID(object_id=extid, object_type=ObjectType.RELEASE,), + ) + for extid in ids + ] + swh_storage.extid_add(extids2) + + assert swh_storage.extid_get_from_extid("git", ids) == extids + assert swh_storage.extid_get_from_target(ObjectType.REVISION, ids) == extids + assert swh_storage.extid_get_from_target(ObjectType.RELEASE, ids) == nullids + def test_release_add(self, swh_storage, sample_data): release, release2 = sample_data.releases[:2] diff --git a/swh/storage/tests/test_backfill.py b/swh/storage/tests/test_backfill.py --- a/swh/storage/tests/test_backfill.py +++ b/swh/storage/tests/test_backfill.py @@ -10,7 +10,7 @@ import pytest from swh.journal.client import JournalClient -from swh.journal.tests.journal_data import TEST_OBJECTS +from swh.model.tests.swh_model_data import TEST_OBJECTS from swh.storage import get_storage from swh.storage.backfill import ( PARTITION_KEY, @@ -59,7 +59,7 @@ error = ( "Object type unknown-object-type is not supported. " - "The only possible values are %s" % (", ".join(PARTITION_KEY)) + "The only possible values are %s" % (", ".join(sorted(PARTITION_KEY))) ) assert e.value.args[0] == error @@ -206,6 +206,7 @@ "content": lambda start, end: [(None, None)], "skipped_content": lambda start, end: [(None, None)], "directory": lambda start, end: [(None, None)], + "extid": lambda start, end: [(None, None)], "metadata_authority": lambda start, end: [(None, None)], "metadata_fetcher": lambda start, end: [(None, None)], "revision": lambda start, end: [(None, None)], diff --git a/swh/storage/tests/test_cassandra.py b/swh/storage/tests/test_cassandra.py --- a/swh/storage/tests/test_cassandra.py +++ b/swh/storage/tests/test_cassandra.py @@ -17,7 +17,7 @@ from swh.core.api.classes import stream_results from swh.storage import get_storage from swh.storage.cassandra import create_keyspace -from swh.storage.cassandra.model import ContentRow +from swh.storage.cassandra.model import ContentRow, ExtIDRow from swh.storage.cassandra.schema import HASH_ALGORITHMS, TABLES from swh.storage.tests.storage_tests import ( TestStorageGeneratedData as _TestStorageGeneratedData, @@ -381,6 +381,37 @@ def test_content_update(self): pass + def test_extid_murmur3_collision(self, swh_storage, mocker, sample_data): + """The Murmur3 token is used as link from index table to the main + table; and non-matching extid with colliding murmur3-hash + are filtered-out when reading the main table. + This test checks the extid methods do filter out these collision. + """ + swh_storage.extid_add(sample_data.extids) + + # For any token, always return all extids, i.e. make as if all tokens + # for all extid entries collide + def mock_egft(token): + return [ + ExtIDRow( + extid_type=extid.extid_type, + extid=extid.extid, + target_type=extid.target.object_type.value, + target=extid.target.object_id, + ) + for extid in sample_data.extids + ] + + mocker.patch.object( + swh_storage._cql_runner, "extid_get_from_token", mock_egft, + ) + + for extid in sample_data.extids: + extids = swh_storage.extid_get_from_target( + target_type=extid.target.object_type, ids=[extid.target.object_id] + ) + assert extids == [extid] + @pytest.mark.skip( 'The "person" table of the pgsql is a legacy thing, and not ' "supported by the cassandra backend." diff --git a/swh/storage/tests/test_kafka_writer.py b/swh/storage/tests/test_kafka_writer.py --- a/swh/storage/tests/test_kafka_writer.py +++ b/swh/storage/tests/test_kafka_writer.py @@ -11,9 +11,9 @@ from hypothesis.strategies import lists from swh.journal.pytest_plugin import assert_all_objects_consumed, consume_messages -from swh.journal.tests.journal_data import TEST_OBJECTS from swh.model.hypothesis_strategies import objects from swh.model.model import Origin, OriginVisit, Person +from swh.model.tests.swh_model_data import TEST_OBJECTS from swh.storage import get_storage @@ -41,6 +41,7 @@ "content", "skipped_content", "directory", + "extid", "metadata_authority", "metadata_fetcher", "revision", @@ -71,6 +72,7 @@ for obj_type in ( "content", "directory", + "extid", "metadata_authority", "metadata_fetcher", "origin", @@ -127,6 +129,7 @@ for obj_type in ( "content", "directory", + "extid", "metadata_authority", "metadata_fetcher", "origin", diff --git a/swh/storage/tests/test_replay.py b/swh/storage/tests/test_replay.py --- a/swh/storage/tests/test_replay.py +++ b/swh/storage/tests/test_replay.py @@ -14,8 +14,8 @@ from swh.journal.client import JournalClient from swh.journal.serializers import key_to_kafka, value_to_kafka -from swh.journal.tests.journal_data import DUPLICATE_CONTENTS, TEST_OBJECTS from swh.model.hashutil import DEFAULT_ALGORITHMS, MultiHash, hash_to_hex +from swh.model.tests.swh_model_data import DUPLICATE_CONTENTS, TEST_OBJECTS from swh.storage import get_storage from swh.storage.cassandra.model import ContentRow, SkippedContentRow from swh.storage.in_memory import InMemoryStorage @@ -198,6 +198,7 @@ "contents", "skipped_contents", "directories", + "extid", "revisions", "releases", "snapshots", diff --git a/swh/storage/writer.py b/swh/storage/writer.py --- a/swh/storage/writer.py +++ b/swh/storage/writer.py @@ -10,6 +10,7 @@ from swh.model.model import ( Content, Directory, + ExtID, MetadataAuthority, MetadataFetcher, Origin, @@ -115,3 +116,6 @@ def metadata_authority_add(self, authorities: Iterable[MetadataAuthority]) -> None: self.write_additions("metadata_authority", authorities) + + def extid_add(self, extids: Iterable[ExtID]) -> None: + self.write_additions("extid", extids)