diff --git a/swh/provenance/postgresql/provenancedb_base.py b/swh/provenance/postgresql/provenancedb_base.py --- a/swh/provenance/postgresql/provenancedb_base.py +++ b/swh/provenance/postgresql/provenancedb_base.py @@ -64,8 +64,8 @@ data["origin"]["added"].clear() # Insert relations from the origin-revision layer - self.insert_origin_head(data["revision_in_origin"]) self.insert_revision_history(data["revision_before_revision"]) + self.insert_origin_head(data["revision_in_origin"]) # Update preferred origins self.update_preferred_origin( @@ -141,16 +141,29 @@ def insert_origin_head(self, data: Set[Tuple[Sha1Git, Sha1Git]]): if data: + # Insert revisions first, to ensure "foreign keys" exist + # Origins are assumed to be already inserted (they require knowing the url) + psycopg2.extras.execute_values( + self.cursor, + """ + LOCK TABLE ONLY revision; + INSERT INTO revision(sha1) VALUES %s + ON CONFLICT DO NOTHING + """, + {(rev,) for rev, _ in data}, + ) + psycopg2.extras.execute_values( self.cursor, # XXX: not clear how conflicts are handled here! """ LOCK TABLE ONLY revision_in_origin; INSERT INTO revision_in_origin - SELECT R.id, O.id - FROM (VALUES %s) AS V(rev, org) - INNER JOIN revision AS R on (R.sha1=V.rev) - INNER JOIN origin AS O on (O.sha1=V.org) + SELECT R.id, O.id + FROM (VALUES %s) AS V(rev, org) + INNER JOIN revision AS R on (R.sha1=V.rev) + INNER JOIN origin AS O on (O.sha1=V.org) + ON CONFLICT DO NOTHING """, data, ) @@ -159,8 +172,23 @@ def insert_relation(self, relation: str, data: Set[Tuple[Sha1Git, Sha1Git, bytes]]): ... - def insert_revision_history(self, data: Dict[Sha1Git, Sha1Git]): + def insert_revision_history(self, data: Dict[Sha1Git, Set[Sha1Git]]): if data: + # print(f"Inserting histories: {data}") + # Insert revisions first, to ensure "foreign keys" exist + revisions = set(data) + for rev in data: + revisions.update(data[rev]) + psycopg2.extras.execute_values( + self.cursor, + """ + LOCK TABLE ONLY revision; + INSERT INTO revision(sha1) VALUES %s + ON CONFLICT DO NOTHING + """, + ((rev,) for rev in revisions), + ) + values = [[(prev, next) for next in data[prev]] for prev in data] psycopg2.extras.execute_values( self.cursor, @@ -168,12 +196,13 @@ """ LOCK TABLE ONLY revision_before_revision; INSERT INTO revision_before_revision - SELECT P.id, N.id - FROM (VALUES %s) AS V(prev, next) - INNER JOIN revision AS P on (P.sha1=V.prev) - INNER JOIN revision AS N on (N.sha1=V.next) + SELECT P.id, N.id + FROM (VALUES %s) AS V(prev, next) + INNER JOIN revision AS P on (P.sha1=V.prev) + INNER JOIN revision AS N on (N.sha1=V.next) + ON CONFLICT DO NOTHING """, - tuple(sum(values, [])), + sum(values, []), ) data.clear() diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py --- a/swh/provenance/provenance.py +++ b/swh/provenance/provenance.py @@ -109,7 +109,7 @@ class DatetimeCache(TypedDict): - data: Dict[Sha1Git, datetime] + data: Dict[Sha1Git, Optional[datetime]] added: Set[Sha1Git] @@ -209,7 +209,7 @@ yield from self.storage.content_find_all(id, limit=limit) def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: - return self.get_dates("content", [blob.id]).get(blob.id, None) + return self.get_dates("content", [blob.id]).get(blob.id) def content_get_early_dates( self, blobs: Iterable[FileEntry] @@ -230,7 +230,7 @@ def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: - return self.get_dates("directory", [directory.id]).get(directory.id, None) + return self.get_dates("directory", [directory.id]).get(directory.id) def directory_get_dates_in_isochrone_frontier( self, dirs: Iterable[DirectoryEntry] @@ -250,15 +250,18 @@ missing_ids = set(id for id in ids if id not in cache) if missing_ids: cache["data"].update(self.storage.get_dates(entity, list(missing_ids))) - return {sha1: cache["data"][sha1] for sha1 in ids if sha1 in cache["data"]} + dates: Dict[Sha1Git, datetime] = {} + for sha1 in ids: + date = cache["data"].get(sha1) + if date is not None: + dates[sha1] = date + return dates def origin_add(self, origin: OriginEntry) -> None: self.cache["origin"]["data"][origin.id] = origin.url self.cache["origin"]["added"].add(origin.id) def revision_add(self, revision: RevisionEntry): - # Add current revision to the compact DB - assert revision.date is not None self.cache["revision"]["data"][revision.id] = revision.date self.cache["revision"]["added"].add(revision.id) @@ -273,7 +276,7 @@ self.cache["revision_in_origin"].add((revision.id, origin.id)) def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: - return self.get_dates("revision", [revision.id]).get(revision.id, None) + return self.get_dates("revision", [revision.id]).get(revision.id) def revision_get_preferred_origin( self, revision: RevisionEntry diff --git a/swh/provenance/sql/30-schema.sql b/swh/provenance/sql/30-schema.sql --- a/swh/provenance/sql/30-schema.sql +++ b/swh/provenance/sql/30-schema.sql @@ -47,7 +47,7 @@ ( id bigserial primary key, -- internal identifier of the revision sha1 sha1_git unique not null, -- intrinsic identifier of the revision - date timestamptz not null, -- timestamp of the revision + date timestamptz, -- timestamp of the revision origin bigint -- id of the preferred origin -- foreign key (org) references origin (id) );