diff --git a/swh/provenance/postgresql/provenancedb_base.py b/swh/provenance/postgresql/provenancedb_base.py index ca68b0e..01fd12a 100644 --- a/swh/provenance/postgresql/provenancedb_base.py +++ b/swh/provenance/postgresql/provenancedb_base.py @@ -1,234 +1,263 @@ from datetime import datetime import itertools import logging from typing import Any, Dict, Generator, List, Optional, Set, Tuple import psycopg2 import psycopg2.extras from swh.model.model import Sha1Git class ProvenanceDBBase: def __init__(self, conn: psycopg2.extensions.connection): conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) conn.set_session(autocommit=True) self.conn = conn self.cursor = self.conn.cursor() # XXX: not sure this is the best place to do it! self.cursor.execute("SET timezone TO 'UTC'") self._flavor: Optional[str] = None @property def flavor(self) -> str: if self._flavor is None: self.cursor.execute("select swh_get_dbflavor()") self._flavor = self.cursor.fetchone()[0] assert self._flavor is not None return self._flavor @property def with_path(self) -> bool: return self.flavor == "with-path" def commit(self, data: Dict[str, Any], raise_on_commit: bool = False) -> bool: try: # First insert entities for entity in ("content", "directory", "revision"): self.insert_entity( entity, { sha1: data[entity]["data"][sha1] for sha1 in data[entity]["added"] }, ) data[entity]["data"].clear() data[entity]["added"].clear() # Relations should come after ids for entities were resolved for relation in ( "content_in_revision", "content_in_directory", "directory_in_revision", ): self.insert_relation(relation, data[relation]) # Insert origins self.insert_origin( { sha1: data["origin"]["data"][sha1] for sha1 in data["origin"]["added"] }, ) data["origin"]["data"].clear() data["origin"]["added"].clear() # Insert relations from the origin-revision layer - self.insert_origin_head(data["revision_in_origin"]) self.insert_revision_history(data["revision_before_revision"]) + self.insert_origin_head(data["revision_in_origin"]) # Update preferred origins self.update_preferred_origin( { sha1: data["revision_origin"]["data"][sha1] for sha1 in data["revision_origin"]["added"] } ) data["revision_origin"]["data"].clear() data["revision_origin"]["added"].clear() return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") if raise_on_commit: raise return False def content_find_first( self, id: Sha1Git ) -> Optional[Tuple[Sha1Git, Sha1Git, datetime, bytes]]: ... def content_find_all( self, id: Sha1Git, limit: Optional[int] = None ) -> Generator[Tuple[Sha1Git, Sha1Git, datetime, bytes], None, None]: ... def get_dates(self, entity: str, ids: List[Sha1Git]) -> Dict[Sha1Git, datetime]: dates = {} if ids: values = ", ".join(itertools.repeat("%s", len(ids))) self.cursor.execute( f"""SELECT sha1, date FROM {entity} WHERE sha1 IN ({values})""", tuple(ids), ) dates.update(self.cursor.fetchall()) return dates def insert_entity(self, entity: str, data: Dict[Sha1Git, datetime]): if data: psycopg2.extras.execute_values( self.cursor, f""" LOCK TABLE ONLY {entity}; INSERT INTO {entity}(sha1, date) VALUES %s ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date) """, data.items(), ) # XXX: not sure if Python takes a reference or a copy. # This might be useless! data.clear() def insert_origin(self, data: Dict[Sha1Git, str]): if data: psycopg2.extras.execute_values( self.cursor, """ LOCK TABLE ONLY origin; INSERT INTO origin(sha1, url) VALUES %s ON CONFLICT DO NOTHING """, data.items(), ) # XXX: not sure if Python takes a reference or a copy. # This might be useless! data.clear() def insert_origin_head(self, data: Set[Tuple[Sha1Git, Sha1Git]]): if data: + # Insert revisions first, to ensure "foreign keys" exist + # Origins are assumed to be already inserted (they require knowing the url) + psycopg2.extras.execute_values( + self.cursor, + """ + LOCK TABLE ONLY revision; + INSERT INTO revision(sha1) VALUES %s + ON CONFLICT DO NOTHING + """, + {(rev,) for rev, _ in data}, + ) + psycopg2.extras.execute_values( self.cursor, # XXX: not clear how conflicts are handled here! """ LOCK TABLE ONLY revision_in_origin; INSERT INTO revision_in_origin - SELECT R.id, O.id - FROM (VALUES %s) AS V(rev, org) - INNER JOIN revision AS R on (R.sha1=V.rev) - INNER JOIN origin AS O on (O.sha1=V.org) + SELECT R.id, O.id + FROM (VALUES %s) AS V(rev, org) + INNER JOIN revision AS R on (R.sha1=V.rev) + INNER JOIN origin AS O on (O.sha1=V.org) + ON CONFLICT DO NOTHING """, data, ) data.clear() def insert_relation(self, relation: str, data: Set[Tuple[Sha1Git, Sha1Git, bytes]]): ... - def insert_revision_history(self, data: Dict[Sha1Git, Sha1Git]): + def insert_revision_history(self, data: Dict[Sha1Git, Set[Sha1Git]]): if data: + # print(f"Inserting histories: {data}") + # Insert revisions first, to ensure "foreign keys" exist + revisions = set(data) + for rev in data: + revisions.update(data[rev]) + psycopg2.extras.execute_values( + self.cursor, + """ + LOCK TABLE ONLY revision; + INSERT INTO revision(sha1) VALUES %s + ON CONFLICT DO NOTHING + """, + ((rev,) for rev in revisions), + ) + values = [[(prev, next) for next in data[prev]] for prev in data] psycopg2.extras.execute_values( self.cursor, # XXX: not clear how conflicts are handled here! """ LOCK TABLE ONLY revision_before_revision; INSERT INTO revision_before_revision - SELECT P.id, N.id - FROM (VALUES %s) AS V(prev, next) - INNER JOIN revision AS P on (P.sha1=V.prev) - INNER JOIN revision AS N on (N.sha1=V.next) + SELECT P.id, N.id + FROM (VALUES %s) AS V(prev, next) + INNER JOIN revision AS P on (P.sha1=V.prev) + INNER JOIN revision AS N on (N.sha1=V.next) + ON CONFLICT DO NOTHING """, - tuple(sum(values, [])), + sum(values, []), ) data.clear() def revision_get_preferred_origin(self, revision: Sha1Git) -> Optional[Sha1Git]: self.cursor.execute( """ SELECT O.sha1 FROM revision AS R JOIN origin as O ON R.origin=O.id WHERE R.sha1=%s""", (revision,), ) row = self.cursor.fetchone() return row[0] if row is not None else None def revision_in_history(self, revision: Sha1Git) -> bool: self.cursor.execute( """ SELECT 1 FROM revision_before_revision JOIN revision ON revision.id=revision_before_revision.prev WHERE revision.sha1=%s """, (revision,), ) return self.cursor.fetchone() is not None def revision_visited(self, revision: Sha1Git) -> bool: self.cursor.execute( """ SELECT 1 FROM revision_in_origin JOIN revision ON revision.id=revision_in_origin.revision WHERE revision.sha1=%s """, (revision,), ) return self.cursor.fetchone() is not None def update_preferred_origin(self, data: Dict[Sha1Git, Sha1Git]): if data: # XXX: this is assuming the revision already exists in the db! It should # be improved by allowing null dates in the revision table. psycopg2.extras.execute_values( self.cursor, """ UPDATE revision R SET origin=O.id FROM (VALUES %s) AS V(rev, org) INNER JOIN origin AS O on (O.sha1=V.org) WHERE R.sha1=V.rev """, data.items(), ) data.clear() diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py index 45fabc4..e4bd5ac 100644 --- a/swh/provenance/provenance.py +++ b/swh/provenance/provenance.py @@ -1,306 +1,309 @@ from datetime import datetime import logging import os from typing import Dict, Generator, Iterable, List, Optional, Set, Tuple import psycopg2 from typing_extensions import Literal, Protocol, TypedDict, runtime_checkable from swh.model.model import Sha1Git from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry # XXX: this protocol doesn't make much sense now that flavours have been delegated to # another class, lower in the callstack. @runtime_checkable class ProvenanceInterface(Protocol): raise_on_commit: bool = False def commit(self): """Commit currently ongoing transactions in the backend DB""" ... def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ) -> None: ... def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes ) -> None: ... def content_find_first( self, id: Sha1Git ) -> Optional[Tuple[Sha1Git, Sha1Git, datetime, bytes]]: ... def content_find_all( self, id: Sha1Git, limit: Optional[int] = None ) -> Generator[Tuple[Sha1Git, Sha1Git, datetime, bytes], None, None]: ... def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: ... def content_get_early_dates( self, blobs: Iterable[FileEntry] ) -> Dict[Sha1Git, datetime]: ... def content_set_early_date(self, blob: FileEntry, date: datetime) -> None: ... def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes ) -> None: ... def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: ... def directory_get_dates_in_isochrone_frontier( self, dirs: Iterable[DirectoryEntry] ) -> Dict[Sha1Git, datetime]: ... def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ) -> None: ... def origin_add(self, origin: OriginEntry) -> None: ... def revision_add(self, revision: RevisionEntry) -> None: ... def revision_add_before_revision( self, relative: RevisionEntry, revision: RevisionEntry ) -> None: ... def revision_add_to_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: ... def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: ... def revision_get_preferred_origin( self, revision: RevisionEntry ) -> Optional[Sha1Git]: ... def revision_in_history(self, revision: RevisionEntry) -> bool: ... def revision_set_preferred_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: ... def revision_visited(self, revision: RevisionEntry) -> bool: ... class DatetimeCache(TypedDict): - data: Dict[Sha1Git, datetime] + data: Dict[Sha1Git, Optional[datetime]] added: Set[Sha1Git] class OriginCache(TypedDict): data: Dict[Sha1Git, str] added: Set[Sha1Git] class RevisionCache(TypedDict): data: Dict[Sha1Git, Sha1Git] added: Set[Sha1Git] class ProvenanceCache(TypedDict): content: DatetimeCache directory: DatetimeCache revision: DatetimeCache # below are insertion caches only content_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]] content_in_directory: Set[Tuple[Sha1Git, Sha1Git, bytes]] directory_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]] # these two are for the origin layer origin: OriginCache revision_origin: RevisionCache revision_before_revision: Dict[Sha1Git, Set[Sha1Git]] revision_in_origin: Set[Tuple[Sha1Git, Sha1Git]] def new_cache(): return ProvenanceCache( content=DatetimeCache(data={}, added=set()), directory=DatetimeCache(data={}, added=set()), revision=DatetimeCache(data={}, added=set()), content_in_revision=set(), content_in_directory=set(), directory_in_revision=set(), origin=OriginCache(data={}, added=set()), revision_origin=RevisionCache(data={}, added=set()), revision_before_revision={}, revision_in_origin=set(), ) # TODO: maybe move this to a separate file class ProvenanceBackend: raise_on_commit: bool = False def __init__(self, conn: psycopg2.extensions.connection): from .postgresql.provenancedb_base import ProvenanceDBBase # TODO: this class should not know what the actual used DB is. self.storage: ProvenanceDBBase flavor = ProvenanceDBBase(conn).flavor if flavor == "with-path": from .postgresql.provenancedb_with_path import ProvenanceWithPathDB self.storage = ProvenanceWithPathDB(conn) else: from .postgresql.provenancedb_without_path import ProvenanceWithoutPathDB self.storage = ProvenanceWithoutPathDB(conn) self.cache: ProvenanceCache = new_cache() def clear_caches(self): self.cache = new_cache() def commit(self): # TODO: for now we just forward the cache. This should be improved! while not self.storage.commit(self.cache, raise_on_commit=self.raise_on_commit): logging.warning( f"Unable to commit cached information {self.write_cache}. Retrying..." ) self.clear_caches() def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ): self.cache["content_in_directory"].add( (blob.id, directory.id, normalize(os.path.join(prefix, blob.name))) ) def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes ): self.cache["content_in_revision"].add( (blob.id, revision.id, normalize(os.path.join(prefix, blob.name))) ) def content_find_first( self, id: Sha1Git ) -> Optional[Tuple[Sha1Git, Sha1Git, datetime, bytes]]: return self.storage.content_find_first(id) def content_find_all( self, id: Sha1Git, limit: Optional[int] = None ) -> Generator[Tuple[Sha1Git, Sha1Git, datetime, bytes], None, None]: yield from self.storage.content_find_all(id, limit=limit) def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: - return self.get_dates("content", [blob.id]).get(blob.id, None) + return self.get_dates("content", [blob.id]).get(blob.id) def content_get_early_dates( self, blobs: Iterable[FileEntry] ) -> Dict[Sha1Git, datetime]: return self.get_dates("content", [blob.id for blob in blobs]) def content_set_early_date(self, blob: FileEntry, date: datetime): self.cache["content"]["data"][blob.id] = date self.cache["content"]["added"].add(blob.id) def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes ): self.cache["directory_in_revision"].add( (directory.id, revision.id, normalize(path)) ) def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: - return self.get_dates("directory", [directory.id]).get(directory.id, None) + return self.get_dates("directory", [directory.id]).get(directory.id) def directory_get_dates_in_isochrone_frontier( self, dirs: Iterable[DirectoryEntry] ) -> Dict[Sha1Git, datetime]: return self.get_dates("directory", [directory.id for directory in dirs]) def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ): self.cache["directory"]["data"][directory.id] = date self.cache["directory"]["added"].add(directory.id) def get_dates( self, entity: Literal["content", "revision", "directory"], ids: List[Sha1Git] ) -> Dict[Sha1Git, datetime]: cache = self.cache[entity] missing_ids = set(id for id in ids if id not in cache) if missing_ids: cache["data"].update(self.storage.get_dates(entity, list(missing_ids))) - return {sha1: cache["data"][sha1] for sha1 in ids if sha1 in cache["data"]} + dates: Dict[Sha1Git, datetime] = {} + for sha1 in ids: + date = cache["data"].get(sha1) + if date is not None: + dates[sha1] = date + return dates def origin_add(self, origin: OriginEntry) -> None: self.cache["origin"]["data"][origin.id] = origin.url self.cache["origin"]["added"].add(origin.id) def revision_add(self, revision: RevisionEntry): - # Add current revision to the compact DB - assert revision.date is not None self.cache["revision"]["data"][revision.id] = revision.date self.cache["revision"]["added"].add(revision.id) def revision_add_before_revision( self, relative: RevisionEntry, revision: RevisionEntry ): self.cache["revision_before_revision"].setdefault(revision.id, set()).add( relative.id ) def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry): self.cache["revision_in_origin"].add((revision.id, origin.id)) def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: - return self.get_dates("revision", [revision.id]).get(revision.id, None) + return self.get_dates("revision", [revision.id]).get(revision.id) def revision_get_preferred_origin( self, revision: RevisionEntry ) -> Optional[Sha1Git]: cache = self.cache["revision_origin"] if revision.id not in cache: origin = self.storage.revision_get_preferred_origin(revision.id) if origin is not None: cache["data"][revision.id] = origin return cache["data"].get(revision.id) def revision_in_history(self, revision: RevisionEntry) -> bool: return revision.id in self.cache[ "revision_before_revision" ] or self.storage.revision_in_history(revision.id) def revision_set_preferred_origin( self, origin: OriginEntry, revision: RevisionEntry ): self.cache["revision_origin"]["data"][revision.id] = origin.id self.cache["revision_origin"]["added"].add(revision.id) def revision_visited(self, revision: RevisionEntry) -> bool: return revision.id in dict( self.cache["revision_in_origin"] ) or self.storage.revision_visited(revision.id) def normalize(path: bytes) -> bytes: return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path diff --git a/swh/provenance/sql/30-schema.sql b/swh/provenance/sql/30-schema.sql index 3deb51d..310e004 100644 --- a/swh/provenance/sql/30-schema.sql +++ b/swh/provenance/sql/30-schema.sql @@ -1,135 +1,135 @@ -- psql variables to get the current database flavor create table dbversion ( version int primary key, release timestamptz, description text ); comment on table dbversion is 'Details of current db version'; comment on column dbversion.version is 'SQL schema version'; comment on column dbversion.release is 'Version deployment timestamp'; comment on column dbversion.description is 'Release description'; -- latest schema version insert into dbversion(version, release, description) values(1, now(), 'Work In Progress'); -- a Git object ID, i.e., a Git-style salted SHA1 checksum create domain sha1_git as bytea check (length(value) = 20); -- UNIX path (absolute, relative, individual path component, etc.) create domain unix_path as bytea; -- entity tables create table content ( id bigserial primary key, -- internal identifier of the content blob sha1 sha1_git unique not null, -- intrinsic identifier of the content blob date timestamptz not null -- timestamp of the revision where the blob appears early ); comment on column content.id is 'Content internal identifier'; comment on column content.sha1 is 'Content intrinsic identifier'; comment on column content.date is 'Earliest timestamp for the content (first seen time)'; create table directory ( id bigserial primary key, -- internal identifier of the directory appearing in an isochrone inner frontier sha1 sha1_git unique not null, -- intrinsic identifier of the directory date timestamptz not null -- max timestamp among those of the directory children's ); comment on column directory.id is 'Directory internal identifier'; comment on column directory.sha1 is 'Directory intrinsic identifier'; comment on column directory.date is 'Latest timestamp for the content in the directory'; create table revision ( id bigserial primary key, -- internal identifier of the revision sha1 sha1_git unique not null, -- intrinsic identifier of the revision - date timestamptz not null, -- timestamp of the revision + date timestamptz, -- timestamp of the revision origin bigint -- id of the preferred origin -- foreign key (org) references origin (id) ); comment on column revision.id is 'Revision internal identifier'; comment on column revision.sha1 is 'Revision intrinsic identifier'; comment on column revision.date is 'Revision timestamp'; comment on column revision.origin is 'preferred origin for the revision'; create table location ( id bigserial primary key, -- internal identifier of the location path unix_path unique not null -- path to the location ); comment on column location.id is 'Location internal identifier'; comment on column location.path is 'Path to the location'; create table origin ( id bigserial primary key, -- internal identifier of the origin sha1 sha1_git unique not null, -- intrinsic identifier of the origin url unix_path unique not null -- url of the origin ); comment on column origin.id is 'Origin internal identifier'; comment on column origin.sha1 is 'Origin intrinsic identifier'; comment on column origin.url is 'URL of the origin'; -- relation tables create table content_in_revision ( content bigint not null, -- internal identifier of the content blob revision bigint not null, -- internal identifier of the revision where the blob appears for the first time location bigint -- location of the content relative to the revision root directory -- foreign key (blob) references content (id), -- foreign key (rev) references revision (id), -- foreign key (loc) references location (id) ); comment on column content_in_revision.content is 'Content internal identifier'; comment on column content_in_revision.revision is 'Revision internal identifier'; comment on column content_in_revision.location is 'Location of content in revision'; create table content_in_directory ( content bigint not null, -- internal identifier of the content blob directory bigint not null, -- internal identifier of the directory containing the blob location bigint -- location of the content relative to its parent directory in the isochrone frontier -- foreign key (blob) references content (id), -- foreign key (dir) references directory (id), -- foreign key (loc) references location (id) ); comment on column content_in_directory.content is 'Content internal identifier'; comment on column content_in_directory.directory is 'Directory internal identifier'; comment on column content_in_directory.location is 'Location of content in directory'; create table directory_in_revision ( directory bigint not null, -- internal identifier of the directory appearing in the revision revision bigint not null, -- internal identifier of the revision containing the directory location bigint -- location of the directory relative to the revision root directory -- foreign key (dir) references directory (id), -- foreign key (rev) references revision (id), -- foreign key (loc) references location (id) ); comment on column directory_in_revision.directory is 'Directory internal identifier'; comment on column directory_in_revision.revision is 'Revision internal identifier'; comment on column directory_in_revision.location is 'Location of directory in revision'; create table revision_in_origin ( revision bigint not null, -- internal identifier of the revision poined by the origin origin bigint not null -- internal identifier of the origin that points to the revision -- foreign key (rev) references revision (id), -- foreign key (org) references origin (id) ); comment on column revision_in_origin.revision is 'Revision internal identifier'; comment on column revision_in_origin.origin is 'Origin internal identifier'; create table revision_before_revision ( prev bigserial not null, -- internal identifier of the source revision next bigserial not null -- internal identifier of the destination revision -- foreign key (prev) references revision (id), -- foreign key (next) references revision (id) ); comment on column revision_before_revision.prev is 'Source revision internal identifier'; comment on column revision_before_revision.next is 'Destination revision internal identifier';