diff --git a/swh/provenance/postgresql/provenancedb_base.py b/swh/provenance/postgresql/provenancedb_base.py --- a/swh/provenance/postgresql/provenancedb_base.py +++ b/swh/provenance/postgresql/provenancedb_base.py @@ -33,7 +33,6 @@ try: # First insert entities for entity in ("content", "directory", "revision"): - self.insert_entity( entity, { @@ -41,47 +40,82 @@ for sha1 in data[entity]["added"] }, ) + data[entity]["data"].clear() + data[entity]["added"].clear() # Relations should come after ids for entities were resolved - for rel_table in ( + for relation in ( "content_in_revision", "content_in_directory", "directory_in_revision", ): - self.insert_relation(rel_table, data[rel_table]) + self.insert_relation(relation, data[relation]) # TODO: this should be updated when origin-revision layer gets properly # updated. - # if data["revision_before_revision"]: - # psycopg2.extras.execute_values( - # self.cursor, - # """ - # LOCK TABLE ONLY revision_before_revision; - # INSERT INTO revision_before_revision VALUES %s - # ON CONFLICT DO NOTHING - # """, - # data["revision_before_revision"], - # ) - # data["revision_before_revision"].clear() - # - # if data["revision_in_origin"]: - # psycopg2.extras.execute_values( - # self.cursor, - # """ - # LOCK TABLE ONLY revision_in_origin; - # INSERT INTO revision_in_origin VALUES %s - # ON CONFLICT DO NOTHING - # """, - # data["revision_in_origin"], - # ) - # data["revision_in_origin"].clear() + if data["revision_before_revision"]: + values = [ + [(prev, next) for next in data["revision_before_revision"][prev]] + for prev in data["revision_before_revision"] + ] + psycopg2.extras.execute_values( + self.cursor, + # XXX: not clear how conflicts are handled here! + """ + LOCK TABLE ONLY revision_before_revision; + INSERT INTO revision_before_revision + SELECT P.id, N.id + FROM (VALUES %s) AS V(prev, next) + INNER JOIN revision AS P on (P.sha1=V.prev) + INNER JOIN revision AS N on (N.sha1=V.next) + """, + tuple(sum(values, [])), + ) + data["revision_before_revision"].clear() + + if data["revision_in_origin"]: + psycopg2.extras.execute_values( + self.cursor, + # XXX: not clear how conflicts are handled here! + """ + LOCK TABLE ONLY revision_in_origin; + INSERT INTO revision_in_origin + SELECT R.id, V.org + FROM (VALUES %s) AS V(rev, org) + INNER JOIN revision AS R on (R.sha1=V.rev) + """, + data["revision_in_origin"], + ) + data["revision_in_origin"].clear() + + if data["revision_preferred_origin"]["added"]: + # XXX: this is assuming the revision already exists in the db! It should + # be improved by allowing null dates in the revision table. + tuples = [ + (sha1, data["revision_preferred_origin"]["data"][sha1]) + for sha1 in data["revision_preferred_origin"]["added"] + ] + psycopg2.extras.execute_values( + self.cursor, + """ + UPDATE revision + SET origin=V.org + FROM (VALUES %s) AS V(rev, org) + WHERE sha1=V.rev + """, + tuples, + ) + data["revision_preferred_origin"]["data"].clear() + data["revision_preferred_origin"]["added"].clear() return True + except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") if raise_on_commit: raise + return False def get_dates(self, entity: str, ids: List[bytes]) -> Dict[bytes, datetime]: @@ -102,8 +136,8 @@ f""" LOCK TABLE ONLY {entity}; INSERT INTO {entity}(sha1, date) VALUES %s - ON CONFLICT (sha1) DO - UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date) + ON CONFLICT (sha1) DO + UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date) """, data.items(), ) @@ -126,18 +160,19 @@ def origin_get_id(self, url: str) -> int: # Insert origin in the DB and return the assigned id + # XXX: not sure this works as expected if url is already in the db! self.cursor.execute( """ LOCK TABLE ONLY origin; INSERT INTO origin(url) VALUES (%s) - ON CONFLICT DO NOTHING - RETURNING id + ON CONFLICT DO NOTHING + RETURNING id """, (url,), ) return self.cursor.fetchone()[0] - def revision_get_preferred_origin(self, revision: bytes) -> int: + def revision_get_preferred_origin(self, revision: bytes) -> Optional[int]: self.cursor.execute( """SELECT COALESCE(origin, 0) FROM revision WHERE sha1=%s""", (revision,) ) @@ -159,11 +194,6 @@ ) return self.cursor.fetchone() is not None - def revision_set_preferred_origin(self, origin: int, revision: bytes): - self.cursor.execute( - """UPDATE revision SET origin=%s WHERE sha1=%s""", (origin, revision) - ) - def revision_visited(self, revision: bytes) -> bool: self.cursor.execute( """ diff --git a/swh/provenance/postgresql/provenancedb_with_path.py b/swh/provenance/postgresql/provenancedb_with_path.py --- a/swh/provenance/postgresql/provenancedb_with_path.py +++ b/swh/provenance/postgresql/provenancedb_with_path.py @@ -76,7 +76,6 @@ "content_in_directory", "directory_in_revision", ) - # insert missing locations src, dst = relation.split("_in_") # insert missing locations @@ -86,18 +85,21 @@ """ LOCK TABLE ONLY location; INSERT INTO location(path) VALUES %s - ON CONFLICT (path) DO NOTHING + ON CONFLICT (path) DO NOTHING """, locations, ) - sql = f""" - LOCK TABLE ONLY {relation}; - INSERT INTO {relation} - SELECT {src}.id, {dst}.id, location.id - FROM (VALUES %s) AS V(src, dst, path) - INNER JOIN {src} on ({src}.sha1=V.src) - INNER JOIN {dst} on ({dst}.sha1=V.dst) - INNER JOIN location on (location.path=V.path) - """ - psycopg2.extras.execute_values(self.cursor, sql, data) + psycopg2.extras.execute_values( + self.cursor, + f""" + LOCK TABLE ONLY {relation}; + INSERT INTO {relation} + SELECT {src}.id, {dst}.id, location.id + FROM (VALUES %s) AS V(src, dst, path) + INNER JOIN {src} on ({src}.sha1=V.src) + INNER JOIN {dst} on ({dst}.sha1=V.dst) + INNER JOIN location on (location.path=V.path) + """, + data, + ) data.clear() diff --git a/swh/provenance/postgresql/provenancedb_without_path.py b/swh/provenance/postgresql/provenancedb_without_path.py --- a/swh/provenance/postgresql/provenancedb_without_path.py +++ b/swh/provenance/postgresql/provenancedb_without_path.py @@ -68,16 +68,18 @@ "content_in_directory", "directory_in_revision", ) - # insert missing locations src, dst = relation.split("_in_") - sql = f""" - LOCK TABLE ONLY {relation}; - INSERT INTO {relation} - SELECT {src}.id, {dst}.id - FROM (VALUES %s) AS V(src, dst) - INNER JOIN {src} on ({src}.sha1=V.src) - INNER JOIN {dst} on ({dst}.sha1=V.dst) - """ - psycopg2.extras.execute_values(self.cursor, sql, data) + psycopg2.extras.execute_values( + self.cursor, + f""" + LOCK TABLE ONLY {relation}; + INSERT INTO {relation} + SELECT {src}.id, {dst}.id + FROM (VALUES %s) AS V(src, dst) + INNER JOIN {src} on ({src}.sha1=V.src) + INNER JOIN {dst} on ({dst}.sha1=V.dst) + """, + data, + ) data.clear() diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py --- a/swh/provenance/provenance.py +++ b/swh/provenance/provenance.py @@ -1,7 +1,7 @@ from datetime import datetime import logging import os -from typing import Dict, Generator, Iterable, List, Optional, Set, Tuple +from typing import Any, Dict, Generator, Iterable, List, Optional, Set, Tuple import psycopg2 from typing_extensions import Literal, Protocol, TypedDict, runtime_checkable @@ -89,7 +89,7 @@ def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: ... - def revision_get_preferred_origin(self, revision: RevisionEntry) -> int: + def revision_get_preferred_origin(self, revision: RevisionEntry) -> Optional[int]: ... def revision_in_history(self, revision: RevisionEntry) -> bool: @@ -105,7 +105,7 @@ class Cache(TypedDict): - data: Dict[bytes, datetime] + data: Dict[bytes, Any] added: Set[bytes] @@ -118,8 +118,9 @@ content_in_directory: Set[Tuple[bytes, bytes, bytes]] directory_in_revision: Set[Tuple[bytes, bytes, bytes]] # these two are for the origin layer - revision_before_revision: List[Tuple[bytes, bytes]] + revision_before_revision: Dict[bytes, List[bytes]] revision_in_origin: List[Tuple[bytes, int]] + revision_preferred_origin: Cache def new_cache(): @@ -130,8 +131,9 @@ content_in_revision=set(), content_in_directory=set(), directory_in_revision=set(), - revision_before_revision=[], + revision_before_revision={}, revision_in_origin=[], + revision_preferred_origin=Cache(data={}, added=set()), ) @@ -159,7 +161,7 @@ self.cache = new_cache() def commit(self): - # TODO: for now we just forward the write_cache. This should be improved! + # TODO: for now we just forward the cache. This should be improved! while not self.storage.commit(self.cache, raise_on_commit=self.raise_on_commit): logging.warning( f"Unable to commit cached information {self.write_cache}. Retrying..." @@ -249,7 +251,9 @@ def revision_add_before_revision( self, relative: RevisionEntry, revision: RevisionEntry ): - self.cache["revision_before_revision"].append((revision.id, relative.id)) + self.cache["revision_before_revision"][revision.id] = self.cache[ + "revision_before_revision" + ].get(revision.id, []) + [relative.id] def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry): assert origin.id is not None @@ -258,24 +262,30 @@ def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: return self.get_dates("revision", [revision.id]).get(revision.id, None) - def revision_get_preferred_origin(self, revision: RevisionEntry) -> int: - # TODO: adapt this method to consider cached values - return self.storage.revision_get_preferred_origin(revision.id) + def revision_get_preferred_origin(self, revision: RevisionEntry) -> Optional[int]: + origin = self.cache["revision_preferred_origin"]["data"].get(revision.id, None) + if origin is None: + origin = self.storage.revision_get_preferred_origin(revision.id) + if origin is not None: + self.cache["revision_preferred_origin"]["data"][revision.id] = origin + return origin def revision_in_history(self, revision: RevisionEntry) -> bool: - # TODO: adapt this method to consider cached values - return self.storage.revision_in_history(revision.id) + return revision.id in self.cache[ + "revision_before_revision" + ] or self.storage.revision_in_history(revision.id) def revision_set_preferred_origin( self, origin: OriginEntry, revision: RevisionEntry ): assert origin.id is not None - # TODO: adapt this method to consider cached values - self.storage.revision_set_preferred_origin(origin.id, revision.id) + self.cache["revision_preferred_origin"]["data"][revision.id] = origin.id + self.cache["revision_preferred_origin"]["added"].add(revision.id) def revision_visited(self, revision: RevisionEntry) -> bool: - # TODO: adapt this method to consider cached values - return self.storage.revision_visited(revision.id) + return revision.id in dict( + self.cache["revision_in_origin"] + ) or self.storage.revision_visited(revision.id) def normalize(path: bytes) -> bytes: diff --git a/swh/provenance/sql/30-schema.sql b/swh/provenance/sql/30-schema.sql --- a/swh/provenance/sql/30-schema.sql +++ b/swh/provenance/sql/30-schema.sql @@ -2,9 +2,9 @@ create table dbversion ( - version int primary key, - release timestamptz, - description text + version int primary key, + release timestamptz, + description text ); comment on table dbversion is 'Details of current db version'; @@ -14,7 +14,7 @@ -- latest schema version insert into dbversion(version, release, description) - values(1, now(), 'Work In Progress'); + values(1, now(), 'Work In Progress'); -- a Git object ID, i.e., a Git-style salted SHA1 checksum create domain sha1_git as bytea check (length(value) = 20); @@ -77,7 +77,7 @@ ( content bigint not null, -- internal identifier of the content blob revision bigint not null, -- internal identifier of the revision where the blob appears for the first time - location bigint -- location of the content relative to the revision root directory + location bigint -- location of the content relative to the revision root directory -- foreign key (blob) references content (id), -- foreign key (rev) references revision (id), -- foreign key (loc) references location (id)