diff --git a/swh/provenance/postgresql/provenancedb_base.py b/swh/provenance/postgresql/provenancedb_base.py index 17f8392..b1b0af4 100644 --- a/swh/provenance/postgresql/provenancedb_base.py +++ b/swh/provenance/postgresql/provenancedb_base.py @@ -1,173 +1,165 @@ from datetime import datetime import itertools import logging from typing import Any, Dict, Generator, List, Optional, Set, Tuple import psycopg2 import psycopg2.extras class ProvenanceDBBase: def __init__(self, conn: psycopg2.extensions.connection): conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) conn.set_session(autocommit=True) self.conn = conn self.cursor = self.conn.cursor() # XXX: not sure this is the best place to do it! self.cursor.execute("SET timezone TO 'UTC'") def commit(self, data: Dict[str, Any], raise_on_commit: bool = False) -> bool: try: # First insert entities for entity in ("content", "directory", "revision"): self.insert_entity( entity, { sha1: data[entity]["data"][sha1] for sha1 in data[entity]["added"] }, ) # Relations should come after ids for entities were resolved - self.insert_relation( - "content", - "revision", - "content_early_in_rev", - data["content_early_in_rev"], - ) - self.insert_relation( - "content", "directory", "content_in_dir", data["content_in_dir"] - ) - self.insert_relation( - "directory", "revision", "directory_in_rev", data["directory_in_rev"] - ) + for rel_table in ( + "content_in_revision", + "content_in_directory", + "directory_in_revision", + ): + self.insert_relation(rel_table, data[rel_table]) # TODO: this should be updated when origin-revision layer gets properly # updated. - # if data["revision_before_rev"]: + # if data["revision_before_revision"]: # psycopg2.extras.execute_values( # self.cursor, # """ - # LOCK TABLE ONLY revision_before_rev; - # INSERT INTO revision_before_rev VALUES %s + # LOCK TABLE ONLY revision_before_revision; + # INSERT INTO revision_before_revision VALUES %s # ON CONFLICT DO NOTHING # """, - # data["revision_before_rev"], + # data["revision_before_revision"], # ) - # data["revision_before_rev"].clear() + # data["revision_before_revision"].clear() # - # if data["revision_in_org"]: + # if data["revision_in_origin"]: # psycopg2.extras.execute_values( # self.cursor, # """ - # LOCK TABLE ONLY revision_in_org; - # INSERT INTO revision_in_org VALUES %s + # LOCK TABLE ONLY revision_in_origin; + # INSERT INTO revision_in_origin VALUES %s # ON CONFLICT DO NOTHING # """, - # data["revision_in_org"], + # data["revision_in_origin"], # ) - # data["revision_in_org"].clear() + # data["revision_in_origin"].clear() return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") if raise_on_commit: raise return False def get_dates(self, entity: str, ids: List[bytes]) -> Dict[bytes, datetime]: dates = {} if ids: values = ", ".join(itertools.repeat("%s", len(ids))) self.cursor.execute( f"""SELECT sha1, date FROM {entity} WHERE sha1 IN ({values})""", tuple(ids), ) dates.update(self.cursor.fetchall()) return dates def insert_entity(self, entity: str, data: Dict[bytes, datetime]): if data: psycopg2.extras.execute_values( self.cursor, f""" LOCK TABLE ONLY {entity}; INSERT INTO {entity}(sha1, date) VALUES %s ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date) """, data.items(), ) # XXX: not sure if Python takes a reference or a copy. # This might be useless! data.clear() - def insert_relation( - self, src: str, dst: str, relation: str, data: Set[Tuple[bytes, bytes, bytes]] - ): + def insert_relation(self, relation: str, data: Set[Tuple[bytes, bytes, bytes]]): ... def content_find_first( self, blob: bytes ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: ... def content_find_all( self, blob: bytes, limit: Optional[int] = None ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: ... def origin_get_id(self, url: str) -> int: # Insert origin in the DB and return the assigned id self.cursor.execute( """ LOCK TABLE ONLY origin; INSERT INTO origin(url) VALUES (%s) ON CONFLICT DO NOTHING RETURNING id """, (url,), ) return self.cursor.fetchone()[0] def revision_get_preferred_origin(self, revision: bytes) -> int: self.cursor.execute( - """SELECT COALESCE(org,0) FROM revision WHERE sha1=%s""", (revision,) + """SELECT COALESCE(origin, 0) FROM revision WHERE sha1=%s""", (revision,) ) row = self.cursor.fetchone() # None means revision is not in database; # 0 means revision has no preferred origin return row[0] if row is not None and row[0] != 0 else None def revision_in_history(self, revision: bytes) -> bool: self.cursor.execute( """ SELECT 1 - FROM revision_before_rev + FROM revision_before_revision JOIN revision - ON revision.id=revision_before_rev.prev + ON revision.id=revision_before_revision.prev WHERE revision.sha1=%s """, (revision,), ) return self.cursor.fetchone() is not None def revision_set_preferred_origin(self, origin: int, revision: bytes): self.cursor.execute( - """UPDATE revision SET org=%s WHERE sha1=%s""", (origin, revision) + """UPDATE revision SET origin=%s WHERE sha1=%s""", (origin, revision) ) def revision_visited(self, revision: bytes) -> bool: self.cursor.execute( """ SELECT 1 - FROM revision_in_org + FROM revision_in_origin JOIN revision - ON revision.id=revision_in_org.rev + ON revision.id=revision_in_origin.revision WHERE revision.sha1=%s """, (revision,), ) return self.cursor.fetchone() is not None diff --git a/swh/provenance/postgresql/provenancedb_with_path.py b/swh/provenance/postgresql/provenancedb_with_path.py index 4dcc1f4..10520ab 100644 --- a/swh/provenance/postgresql/provenancedb_with_path.py +++ b/swh/provenance/postgresql/provenancedb_with_path.py @@ -1,98 +1,103 @@ from datetime import datetime from typing import Generator, Optional, Set, Tuple import psycopg2 import psycopg2.extras from .provenancedb_base import ProvenanceDBBase class ProvenanceWithPathDB(ProvenanceDBBase): def content_find_first( self, blob: bytes ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: self.cursor.execute( """ SELECT C.sha1 AS blob, R.sha1 AS rev, R.date AS date, L.path AS path FROM content AS C - INNER JOIN content_early_in_rev AS CR ON (CR.blob = C.id) - INNER JOIN location as L ON (CR.loc = L.id) - INNER JOIN revision as R ON (CR.rev = R.id) + INNER JOIN content_in_revision AS CR ON (CR.content = C.id) + INNER JOIN location as L ON (CR.location = L.id) + INNER JOIN revision as R ON (CR.revision = R.id) WHERE C.sha1=%s ORDER BY date, rev, path ASC LIMIT 1 """, (blob,), ) return self.cursor.fetchone() def content_find_all( self, blob: bytes, limit: Optional[int] = None ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: early_cut = f"LIMIT {limit}" if limit is not None else "" self.cursor.execute( f""" (SELECT C.sha1 AS blob, R.sha1 AS rev, R.date AS date, L.path AS path FROM content AS C - INNER JOIN content_early_in_rev AS CR ON (CR.blob = C.id) - INNER JOIN location AS L ON (CR.loc = L.id) - INNER JOIN revision AS R ON (CR.rev = R.id) + INNER JOIN content_in_revision AS CR ON (CR.content = C.id) + INNER JOIN location AS L ON (CR.location = L.id) + INNER JOIN revision AS R ON (CR.revision = R.id) WHERE C.sha1=%s) UNION - (SELECT C.sha1 AS blob, - R.sha1 AS rev, + (SELECT C.sha1 AS content, + R.sha1 AS revision, R.date AS date, CASE DL.path WHEN '' THEN CL.path WHEN '.' THEN CL.path ELSE (DL.path || '/' || CL.path)::unix_path END AS path FROM content AS C - INNER JOIN content_in_dir AS CD ON (C.id = CD.blob) - INNER JOIN directory_in_rev AS DR ON (CD.dir = DR.dir) - INNER JOIN revision AS R ON (DR.rev = R.id) - INNER JOIN location AS CL ON (CD.loc = CL.id) - INNER JOIN location AS DL ON (DR.loc = DL.id) + INNER JOIN content_in_directory AS CD ON (C.id = CD.content) + INNER JOIN directory_in_revision AS DR ON (CD.directory = DR.directory) + INNER JOIN revision AS R ON (DR.revision = R.id) + INNER JOIN location AS CL ON (CD.location = CL.id) + INNER JOIN location AS DL ON (DR.location = DL.id) WHERE C.sha1=%s) ORDER BY date, rev, path {early_cut} """, (blob, blob), ) - # TODO: use POSTGRESQL EXPLAIN looking for query optimizations. yield from self.cursor.fetchall() - def insert_relation( - self, src: str, dst: str, relation: str, data: Set[Tuple[bytes, bytes, bytes]] - ): + def insert_relation(self, relation: str, data: Set[Tuple[bytes, bytes, bytes]]): """Insert entries in `relation` from `data` Also insert missing location entries in the 'location' table. """ if data: + assert relation in ( + "content_in_revision", + "content_in_directory", + "directory_in_revision", + ) + # insert missing locations + src, dst = relation.split("_in_") + # insert missing locations locations = tuple(set((loc,) for (_, _, loc) in data)) psycopg2.extras.execute_values( self.cursor, """ LOCK TABLE ONLY location; INSERT INTO location(path) VALUES %s ON CONFLICT (path) DO NOTHING """, locations, ) sql = f""" LOCK TABLE ONLY {relation}; INSERT INTO {relation} SELECT {src}.id, {dst}.id, location.id FROM (VALUES %s) AS V(src, dst, path) INNER JOIN {src} on ({src}.sha1=V.src) INNER JOIN {dst} on ({dst}.sha1=V.dst) INNER JOIN location on (location.path=V.path) """ psycopg2.extras.execute_values(self.cursor, sql, data) data.clear() diff --git a/swh/provenance/postgresql/provenancedb_without_path.py b/swh/provenance/postgresql/provenancedb_without_path.py index 5e4237a..581bb81 100644 --- a/swh/provenance/postgresql/provenancedb_without_path.py +++ b/swh/provenance/postgresql/provenancedb_without_path.py @@ -1,97 +1,83 @@ from datetime import datetime from typing import Generator, Optional, Set, Tuple import psycopg2 import psycopg2.extras from .provenancedb_base import ProvenanceDBBase ######################################################################################## ######################################################################################## ######################################################################################## class ProvenanceWithoutPathDB(ProvenanceDBBase): def content_find_first( self, blob: bytes ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: self.cursor.execute( """ - SELECT revision.sha1 AS rev, - revision.date AS date - FROM (SELECT content_early_in_rev.rev - FROM content_early_in_rev - JOIN content - ON content.id=content_early_in_rev.blob - WHERE content.sha1=%s - ) AS content_in_rev - JOIN revision - ON revision.id=content_in_rev.rev - ORDER BY date, rev ASC LIMIT 1 + SELECT C.sha1 AS blob, + R.sha1 AS rev, + R.date AS date, + '\\x'::bytea as path + FROM content AS C + INNER JOIN content_in_revision AS CR ON (CR.content = C.id) + INNER JOIN revision as R ON (CR.revision = R.id) + WHERE C.sha1=%s + ORDER BY date, rev ASC LIMIT 1 """, (blob,), ) - row = self.cursor.fetchone() - if row is not None: - # TODO: query revision from the archive and look for blob into a - # recursive directory_ls of the revision's root. - return blob, row[0], row[1], b"" - return None + return self.cursor.fetchone() def content_find_all( self, blob: bytes, limit: Optional[int] = None ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: early_cut = f"LIMIT {limit}" if limit is not None else "" self.cursor.execute( f""" - (SELECT revision.sha1 AS rev, - revision.date AS date - FROM (SELECT content_early_in_rev.rev - FROM content_early_in_rev - JOIN content - ON content.id=content_early_in_rev.blob - WHERE content.sha1=%s - ) AS content_in_rev - JOIN revision - ON revision.id=content_in_rev.rev - ) + (SELECT C.sha1 AS blob, + R.sha1 AS rev, + R.date AS date, + '\\x'::bytea as path + FROM content AS C + INNER JOIN content_in_revision AS CR ON (CR.content = C.id) + INNER JOIN revision AS R ON (CR.revision = R.id) + WHERE C.sha1=%s) UNION - (SELECT revision.sha1 AS rev, - revision.date AS date - FROM (SELECT directory_in_rev.rev - FROM (SELECT content_in_dir.dir - FROM content_in_dir - JOIN content - ON content_in_dir.blob=content.id - WHERE content.sha1=%s - ) AS content_dir - JOIN directory_in_rev - ON directory_in_rev.dir=content_dir.dir - ) AS content_in_rev - JOIN revision - ON revision.id=content_in_rev.rev - ) - ORDER BY date, rev {early_cut} + (SELECT C.sha1 AS content, + R.sha1 AS revision, + R.date AS date, + '\\x'::bytea as path + FROM content AS C + INNER JOIN content_in_directory AS CD ON (C.id = CD.content) + INNER JOIN directory_in_revision AS DR ON (CD.directory = DR.directory) + INNER JOIN revision AS R ON (DR.revision = R.id) + WHERE C.sha1=%s) + ORDER BY date, rev, path {early_cut} """, (blob, blob), ) - # TODO: use POSTGRESQL EXPLAIN looking for query optimizations. - for row in self.cursor.fetchall(): - # TODO: query revision from the archive and look for blob into a - # recursive directory_ls of the revision's root. - yield blob, row[0], row[1], b"" + yield from self.cursor.fetchall() - def insert_relation( - self, src: str, dst: str, relation: str, data: Set[Tuple[bytes, bytes, bytes]] - ): + def insert_relation(self, relation: str, data: Set[Tuple[bytes, bytes, bytes]]): if data: + assert relation in ( + "content_in_revision", + "content_in_directory", + "directory_in_revision", + ) + # insert missing locations + src, dst = relation.split("_in_") + sql = f""" LOCK TABLE ONLY {relation}; INSERT INTO {relation} SELECT {src}.id, {dst}.id FROM (VALUES %s) AS V(src, dst) INNER JOIN {src} on ({src}.sha1=V.src) INNER JOIN {dst} on ({dst}.sha1=V.dst) """ psycopg2.extras.execute_values(self.cursor, sql, data) data.clear() diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py index 8c869a9..9adf67d 100644 --- a/swh/provenance/provenance.py +++ b/swh/provenance/provenance.py @@ -1,280 +1,282 @@ from datetime import datetime import logging import os from typing import Dict, Generator, Iterable, List, Optional, Set, Tuple import psycopg2 from typing_extensions import Literal, Protocol, TypedDict, runtime_checkable from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry # XXX: this protocol doesn't make much sense now that flavours have been delegated to # another class, lower in the callstack. @runtime_checkable class ProvenanceInterface(Protocol): raise_on_commit: bool = False def commit(self): """Commit currently ongoing transactions in the backend DB""" ... def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ) -> None: ... def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes ) -> None: ... def content_find_first( self, blob: bytes ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: ... def content_find_all( self, blob: bytes, limit: Optional[int] = None ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: ... def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: ... def content_get_early_dates( self, blobs: Iterable[FileEntry] ) -> Dict[bytes, datetime]: ... def content_set_early_date(self, blob: FileEntry, date: datetime) -> None: ... def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes ) -> None: ... def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: ... def directory_get_dates_in_isochrone_frontier( self, dirs: Iterable[DirectoryEntry] ) -> Dict[bytes, datetime]: ... def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ) -> None: ... def origin_get_id(self, origin: OriginEntry) -> int: ... def revision_add(self, revision: RevisionEntry) -> None: ... def revision_add_before_revision( self, relative: RevisionEntry, revision: RevisionEntry ) -> None: ... def revision_add_to_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: ... def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: ... def revision_get_preferred_origin(self, revision: RevisionEntry) -> int: ... def revision_in_history(self, revision: RevisionEntry) -> bool: ... def revision_set_preferred_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: ... def revision_visited(self, revision: RevisionEntry) -> bool: ... class Cache(TypedDict): data: Dict[bytes, datetime] added: Set[bytes] class ProvenanceCache(TypedDict): content: Cache directory: Cache revision: Cache # below are insertion caches only - content_early_in_rev: Set[Tuple[bytes, bytes, bytes]] - content_in_dir: Set[Tuple[bytes, bytes, bytes]] - directory_in_rev: Set[Tuple[bytes, bytes, bytes]] + content_in_revision: Set[Tuple[bytes, bytes, bytes]] + content_in_directory: Set[Tuple[bytes, bytes, bytes]] + directory_in_revision: Set[Tuple[bytes, bytes, bytes]] # these two are for the origin layer - revision_before_rev: List[Tuple[bytes, bytes]] - revision_in_org: List[Tuple[bytes, int]] + revision_before_revision: List[Tuple[bytes, bytes]] + revision_in_origin: List[Tuple[bytes, int]] def new_cache(): return ProvenanceCache( content=Cache(data={}, added=set()), directory=Cache(data={}, added=set()), revision=Cache(data={}, added=set()), - content_early_in_rev=set(), - content_in_dir=set(), - directory_in_rev=set(), - revision_before_rev=[], - revision_in_org=[], + content_in_revision=set(), + content_in_directory=set(), + directory_in_revision=set(), + revision_before_revision=[], + revision_in_origin=[], ) # TODO: maybe move this to a separate file class ProvenanceBackend: raise_on_commit: bool = False def __init__(self, conn: psycopg2.extensions.connection, with_path: bool = True): from .postgresql.provenancedb_base import ProvenanceDBBase # TODO: this class should not know what the actual used DB is. self.storage: ProvenanceDBBase if with_path: from .postgresql.provenancedb_with_path import ProvenanceWithPathDB self.storage = ProvenanceWithPathDB(conn) else: from .postgresql.provenancedb_without_path import ProvenanceWithoutPathDB self.storage = ProvenanceWithoutPathDB(conn) self.cache: ProvenanceCache = new_cache() def clear_caches(self): self.cache = new_cache() def commit(self): # TODO: for now we just forward the write_cache. This should be improved! while not self.storage.commit(self.cache, raise_on_commit=self.raise_on_commit): logging.warning( f"Unable to commit cached information {self.write_cache}. Retrying..." ) self.clear_caches() def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ): - self.cache["content_in_dir"].add( + self.cache["content_in_directory"].add( (blob.id, directory.id, normalize(os.path.join(prefix, blob.name))) ) def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes ): - self.cache["content_early_in_rev"].add( + self.cache["content_in_revision"].add( (blob.id, revision.id, normalize(os.path.join(prefix, blob.name))) ) def content_find_first( self, blob: bytes ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: return self.storage.content_find_first(blob) def content_find_all( self, blob: bytes, limit: Optional[int] = None ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: yield from self.storage.content_find_all(blob, limit=limit) def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: return self.get_dates("content", [blob.id]).get(blob.id, None) def content_get_early_dates( self, blobs: Iterable[FileEntry] ) -> Dict[bytes, datetime]: return self.get_dates("content", [blob.id for blob in blobs]) def content_set_early_date(self, blob: FileEntry, date: datetime): self.cache["content"]["data"][blob.id] = date self.cache["content"]["added"].add(blob.id) def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes ): - self.cache["directory_in_rev"].add((directory.id, revision.id, normalize(path))) + self.cache["directory_in_revision"].add( + (directory.id, revision.id, normalize(path)) + ) def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: return self.get_dates("directory", [directory.id]).get(directory.id, None) def directory_get_dates_in_isochrone_frontier( self, dirs: Iterable[DirectoryEntry] ) -> Dict[bytes, datetime]: return self.get_dates("directory", [directory.id for directory in dirs]) def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ): self.cache["directory"]["data"][directory.id] = date self.cache["directory"]["added"].add(directory.id) def get_dates( self, entity: Literal["content", "revision", "directory"], ids: List[bytes] ) -> Dict[bytes, datetime]: cache = self.cache[entity] missing_ids = set(id for id in ids if id not in cache) if missing_ids: cache["data"].update(self.storage.get_dates(entity, list(missing_ids))) return {sha1: cache["data"][sha1] for sha1 in ids if sha1 in cache["data"]} def origin_get_id(self, origin: OriginEntry) -> int: if origin.id is None: return self.storage.origin_get_id(origin.url) else: return origin.id def revision_add(self, revision: RevisionEntry): # Add current revision to the compact DB assert revision.date is not None self.cache["revision"]["data"][revision.id] = revision.date self.cache["revision"]["added"].add(revision.id) def revision_add_before_revision( self, relative: RevisionEntry, revision: RevisionEntry ): - self.cache["revision_before_rev"].append((revision.id, relative.id)) + self.cache["revision_before_revision"].append((revision.id, relative.id)) def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry): assert origin.id is not None - self.cache["revision_in_org"].append((revision.id, origin.id)) + self.cache["revision_in_origin"].append((revision.id, origin.id)) def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: return self.get_dates("revision", [revision.id]).get(revision.id, None) def revision_get_preferred_origin(self, revision: RevisionEntry) -> int: # TODO: adapt this method to consider cached values return self.storage.revision_get_preferred_origin(revision.id) def revision_in_history(self, revision: RevisionEntry) -> bool: # TODO: adapt this method to consider cached values return self.storage.revision_in_history(revision.id) def revision_set_preferred_origin( self, origin: OriginEntry, revision: RevisionEntry ): assert origin.id is not None # TODO: adapt this method to consider cached values self.storage.revision_set_preferred_origin(origin.id, revision.id) def revision_visited(self, revision: RevisionEntry) -> bool: # TODO: adapt this method to consider cached values return self.storage.revision_visited(revision.id) def normalize(path: bytes) -> bytes: return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path diff --git a/swh/provenance/sql/30-schema.sql b/swh/provenance/sql/30-schema.sql index f4a3591..55fabce 100644 --- a/swh/provenance/sql/30-schema.sql +++ b/swh/provenance/sql/30-schema.sql @@ -1,152 +1,133 @@ -- psql variables to get the current database flavor -select swh_get_dbflavor() = 'with-path' as dbflavor_with_path \gset - create table dbversion ( version int primary key, release timestamptz, description text ); comment on table dbversion is 'Details of current db version'; comment on column dbversion.version is 'SQL schema version'; comment on column dbversion.release is 'Version deployment timestamp'; comment on column dbversion.description is 'Release description'; -- latest schema version insert into dbversion(version, release, description) values(1, now(), 'Work In Progress'); -- a Git object ID, i.e., a Git-style salted SHA1 checksum create domain sha1_git as bytea check (length(value) = 20); -- UNIX path (absolute, relative, individual path component, etc.) create domain unix_path as bytea; +-- entity tables create table content ( id bigserial primary key, -- internal identifier of the content blob sha1 sha1_git unique not null, -- intrinsic identifier of the content blob date timestamptz not null -- timestamp of the revision where the blob appears early ); comment on column content.id is 'Content internal identifier'; comment on column content.sha1 is 'Content intrinsic identifier'; comment on column content.date is 'Earliest timestamp for the content (first seen time)'; -create table content_early_in_rev -( - blob bigint not null, -- internal identifier of the content blob - rev bigint not null -- internal identifier of the revision where the blob appears for the first time -\if :dbflavor_with_path - , - loc bigint not null -- location of the content relative to the revision root directory -\endif - -- foreign key (blob) references content (id), - -- foreign key (rev) references revision (id), - -- foreign key (loc) references location (id) -); -comment on column content_early_in_rev.blob is 'Content internal identifier'; -comment on column content_early_in_rev.rev is 'Revision internal identifier'; -\if :dbflavor_with_path -comment on column content_early_in_rev.loc is 'Location of content in revision'; -\endif - -create table content_in_dir -( - blob bigint not null, -- internal identifier of the content blob - dir bigint not null -- internal identifier of the directory containing the blob -\if :dbflavor_with_path - , - loc bigint not null -- location of the content relative to its parent directory in the isochrone frontier -\endif - -- foreign key (blob) references content (id), - -- foreign key (dir) references directory (id), - -- foreign key (loc) references location (id) -); -comment on column content_in_dir.blob is 'Content internal identifier'; -comment on column content_in_dir.dir is 'Directory internal identifier'; -\if :dbflavor_with_path -comment on column content_in_dir.loc is 'Location of content in directory'; -\endif - create table directory ( id bigserial primary key, -- internal identifier of the directory appearing in an isochrone inner frontier sha1 sha1_git unique not null, -- intrinsic identifier of the directory date timestamptz not null -- max timestamp among those of the directory children's ); comment on column directory.id is 'Directory internal identifier'; comment on column directory.sha1 is 'Directory intrinsic identifier'; comment on column directory.date is 'Latest timestamp for the content in the directory'; -create table directory_in_rev +create table revision ( - dir bigint not null, -- internal identifier of the directory appearing in the revision - rev bigint not null -- internal identifier of the revision containing the directory -\if :dbflavor_with_path - , - loc bigint not null -- location of the directory relative to the revision root directory -\endif - -- foreign key (dir) references directory (id), - -- foreign key (rev) references revision (id), - -- foreign key (loc) references location (id) + id bigserial primary key, -- internal identifier of the revision + sha1 sha1_git unique not null, -- intrinsic identifier of the revision + date timestamptz not null, -- timestamp of the revision + origin bigint -- id of the preferred origin + -- foreign key (org) references origin (id) +); +comment on column revision.id is 'Revision internal identifier'; +comment on column revision.sha1 is 'Revision intrinsic identifier'; +comment on column revision.date is 'Revision timestamp'; +comment on column revision.origin is 'preferred origin for the revision'; + +create table location +( + id bigserial primary key, -- internal identifier of the location + path unix_path unique not null -- path to the location ); -comment on column directory_in_rev.dir is 'Directory internal identifier'; -comment on column directory_in_rev.rev is 'Revision internal identifier'; -\if :dbflavor_with_path -comment on column directory_in_rev.loc is 'Location of directory in revision'; -\endif +comment on column location.id is 'Location internal identifier'; +comment on column location.path is 'Path to the location'; create table origin ( id bigserial primary key, -- internal identifier of the origin url unix_path unique not null -- url of the origin ); comment on column origin.id is 'Origin internal identifier'; comment on column origin.url is 'URL of the origin'; -create table revision +-- relation tables +create table content_in_revision ( - id bigserial primary key, -- internal identifier of the revision - sha1 sha1_git unique not null, -- intrinsic identifier of the revision - date timestamptz not null, -- timestamp of the revision - org bigint -- id of the preferred origin - -- foreign key (org) references origin (id) + content bigint not null, -- internal identifier of the content blob + revision bigint not null, -- internal identifier of the revision where the blob appears for the first time + location bigint -- location of the content relative to the revision root directory + -- foreign key (blob) references content (id), + -- foreign key (rev) references revision (id), + -- foreign key (loc) references location (id) ); -comment on column revision.id is 'Revision internal identifier'; -comment on column revision.sha1 is 'Revision intrinsic identifier'; -comment on column revision.date is 'Revision timestamp'; -comment on column revision.org is 'preferred origin for the revision'; +comment on column content_in_revision.content is 'Content internal identifier'; +comment on column content_in_revision.revision is 'Revision internal identifier'; +comment on column content_in_revision.location is 'Location of content in revision'; -create table revision_before_rev +create table content_in_directory ( - prev bigserial not null, -- internal identifier of the source revision - next bigserial not null, -- internal identifier of the destination revision - primary key (prev, next) - -- foreign key (prev) references revision (id), - -- foreign key (next) references revision (id) + content bigint not null, -- internal identifier of the content blob + directory bigint not null, -- internal identifier of the directory containing the blob + location bigint -- location of the content relative to its parent directory in the isochrone frontier + -- foreign key (blob) references content (id), + -- foreign key (dir) references directory (id), + -- foreign key (loc) references location (id) +); +comment on column content_in_directory.content is 'Content internal identifier'; +comment on column content_in_directory.directory is 'Directory internal identifier'; +comment on column content_in_directory.location is 'Location of content in directory'; + +create table directory_in_revision +( + directory bigint not null, -- internal identifier of the directory appearing in the revision + revision bigint not null, -- internal identifier of the revision containing the directory + location bigint -- location of the directory relative to the revision root directory + -- foreign key (dir) references directory (id), + -- foreign key (rev) references revision (id), + -- foreign key (loc) references location (id) ); -comment on column revision_before_rev.prev is 'Source revision internal identifier'; -comment on column revision_before_rev.next is 'Destination revision internal identifier'; +comment on column directory_in_revision.directory is 'Directory internal identifier'; +comment on column directory_in_revision.revision is 'Revision internal identifier'; +comment on column directory_in_revision.location is 'Location of directory in revision'; -create table revision_in_org +create table revision_in_origin ( - rev bigint not null, -- internal identifier of the revision poined by the origin - org bigint not null, -- internal identifier of the origin that points to the revision - primary key (rev, org) + revision bigint not null, -- internal identifier of the revision poined by the origin + origin bigint not null -- internal identifier of the origin that points to the revision -- foreign key (rev) references revision (id), -- foreign key (org) references origin (id) ); -comment on column revision_in_org.rev is 'Revision internal identifier'; -comment on column revision_in_org.org is 'Origin internal identifier'; +comment on column revision_in_origin.revision is 'Revision internal identifier'; +comment on column revision_in_origin.origin is 'Origin internal identifier'; -\if :dbflavor_with_path -create table location +create table revision_before_revision ( - id bigserial primary key, -- internal identifier of the location - path unix_path unique not null -- path to the location + prev bigserial not null, -- internal identifier of the source revision + next bigserial not null -- internal identifier of the destination revision + -- foreign key (prev) references revision (id), + -- foreign key (next) references revision (id) ); -comment on column location.id is 'Location internal identifier'; -comment on column location.path is 'Path to the location'; -\endif +comment on column revision_before_revision.prev is 'Source revision internal identifier'; +comment on column revision_before_revision.next is 'Destination revision internal identifier'; diff --git a/swh/provenance/sql/60-indexes.sql b/swh/provenance/sql/60-indexes.sql index a3dd4e0..5a51468 100644 --- a/swh/provenance/sql/60-indexes.sql +++ b/swh/provenance/sql/60-indexes.sql @@ -1,12 +1,9 @@ -- psql variables to get the current database flavor -select swh_get_dbflavor() = 'with-path' as dbflavor_with_path \gset -\if :dbflavor_with_path -alter table content_early_in_rev add primary key (blob, rev, loc); -alter table content_in_dir add primary key (blob, dir, loc); -alter table directory_in_rev add primary key (dir, rev, loc); -\else -alter table content_early_in_rev add primary key (blob, rev); -alter table content_in_dir add primary key (blob, dir); -alter table directory_in_rev add primary key (dir, rev); -\endif +-- create unique indexes (instead of pkey) because location might be null for +-- the without-path flavor +create unique index on content_in_revision(content, revision, location); +create unique index on directory_in_revision(directory, revision, location); +create unique index on content_in_directory(content, directory, location); +alter table revision_in_origin add primary key (revision, origin); +alter table revision_before_revision add primary key (prev, next); diff --git a/swh/provenance/tests/test_cli.py b/swh/provenance/tests/test_cli.py index 25cb586..0a94547 100644 --- a/swh/provenance/tests/test_cli.py +++ b/swh/provenance/tests/test_cli.py @@ -1,96 +1,97 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from click.testing import CliRunner import psycopg2 import pytest from swh.core.cli import swh as swhmain import swh.core.cli.db # noqa ; ensure cli is loaded import swh.provenance.cli # noqa ; ensure cli is loaded def test_cli_swh_db_help(): # swhmain.add_command(provenance_cli) result = CliRunner().invoke(swhmain, ["provenance", "-h"]) assert result.exit_code == 0 assert "Commands:" in result.output commands = result.output.split("Commands:")[1] for command in ( "find-all", "find-first", "iter-origins", "iter-revisions", ): assert f" {command} " in commands TABLES = { "dbflavor", "dbversion", "content", - "content_early_in_rev", - "content_in_dir", + "content_in_revision", + "content_in_directory", "directory", - "directory_in_rev", + "directory_in_revision", + "location", "origin", "revision", - "revision_before_rev", - "revision_in_org", + "revision_before_revision", + "revision_in_origin", } @pytest.mark.parametrize( "flavor, dbtables", (("with-path", TABLES | {"location"}), ("without-path", TABLES)) ) def test_cli_db_create_and_init_db_with_flavor( monkeypatch, postgresql, flavor, dbtables ): """Test that 'swh db init provenance' works with flavors for both with-path and without-path flavors""" dbname = f"{flavor}-db" # DB creation using 'swh db create' db_params = postgresql.get_dsn_parameters() monkeypatch.setenv("PGHOST", db_params["host"]) monkeypatch.setenv("PGUSER", db_params["user"]) monkeypatch.setenv("PGPORT", db_params["port"]) result = CliRunner().invoke(swhmain, ["db", "create", "-d", dbname, "provenance"]) assert result.exit_code == 0, result.output # DB init using 'swh db init' result = CliRunner().invoke( swhmain, ["db", "init", "-d", dbname, "--flavor", flavor, "provenance"] ) assert result.exit_code == 0, result.output assert f"(flavor {flavor})" in result.output db_params["dbname"] = dbname cnx = psycopg2.connect(**db_params) # check the DB looks OK (check for db_flavor and expected tables) with cnx.cursor() as cur: cur.execute("select swh_get_dbflavor()") assert cur.fetchone() == (flavor,) cur.execute( "select table_name from information_schema.tables " "where table_schema = 'public' " f"and table_catalog = '{dbname}'" ) tables = set(x for (x,) in cur.fetchall()) assert tables == dbtables def test_cli_init_db_default_flavor(provenance_db): "Test that 'swh db init provenance' defaults to a with-path flavored DB" dbname = provenance_db.dsn result = CliRunner().invoke(swhmain, ["db", "init", "-d", dbname, "provenance"]) assert result.exit_code == 0, result.output with provenance_db.cursor() as cur: cur.execute("select swh_get_dbflavor()") assert cur.fetchone() == ("with-path",) diff --git a/swh/provenance/tests/test_provenance_heuristics.py b/swh/provenance/tests/test_provenance_heuristics.py index fe9d62c..6843c78 100644 --- a/swh/provenance/tests/test_provenance_heuristics.py +++ b/swh/provenance/tests/test_provenance_heuristics.py @@ -1,325 +1,311 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Dict, List, Tuple import pytest from swh.provenance.model import RevisionEntry from swh.provenance.revision import revision_add from swh.provenance.tests.conftest import ( fill_storage, get_datafile, load_repo_data, synthetic_result, ) from swh.provenance.tests.test_provenance_db import ts2dt def sha1s(cur, table): """return the 'sha1' column from the DB 'table' (as hex) 'cur' is a cursor to the provenance index DB. """ cur.execute(f"SELECT sha1 FROM {table}") return set(sha1.hex() for (sha1,) in cur.fetchall()) def locations(cur): """return the 'path' column from the DB location table 'cur' is a cursor to the provenance index DB. """ cur.execute("SELECT encode(location.path::bytea, 'escape') FROM location") return set(x for (x,) in cur.fetchall()) def relations(cur, src, dst): """return the triplets ('sha1', 'sha1', 'path') from the DB for the relation between 'src' table and 'dst' table (i.e. for C-R, C-D and D-R relations). 'cur' is a cursor to the provenance index DB. """ - relation = { - ("content", "revision"): "content_early_in_rev", - ("content", "directory"): "content_in_dir", - ("directory", "revision"): "directory_in_rev", - }[(src, dst)] - - srccol = {"content": "blob", "directory": "dir"}[src] - dstcol = {"directory": "dir", "revision": "rev"}[dst] - + relation = f"{src}_in_{dst}" + # note that the columns have the same name as the relations they refer to, + # so we can write things like "rel.{dst}=src.id" in the query below cur.execute( f"SELECT encode(src.sha1::bytea, 'hex')," f" encode(dst.sha1::bytea, 'hex')," f" encode(location.path::bytea, 'escape') " f"FROM {relation} as rel, " f" {src} as src, {dst} as dst, location " - f"WHERE rel.{srccol}=src.id AND rel.{dstcol}=dst.id AND rel.loc=location.id" + f"WHERE rel.{src}=src.id " + f" AND rel.{dst}=dst.id " + f" AND rel.location=location.id" ) return set(cur.fetchall()) def get_timestamp(cur, table, sha1): """return the date for the 'sha1' from the DB 'table' (as hex) 'cur' is a cursor to the provenance index DB. """ if isinstance(sha1, str): sha1 = bytes.fromhex(sha1) cur.execute(f"SELECT date FROM {table} WHERE sha1=%s", (sha1,)) return [date.timestamp() for (date,) in cur.fetchall()] @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) def test_provenance_heuristics(provenance, swh_storage, archive, repo, lower, mindepth): # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(swh_storage, data) syntheticfile = get_datafile( f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt" ) revisions = {rev["id"]: rev for rev in data["revision"]} rows = { "content": set(), - "content_in_dir": set(), - "content_early_in_rev": set(), + "content_in_directory": set(), + "content_in_revision": set(), "directory": set(), - "directory_in_rev": set(), + "directory_in_revision": set(), "location": set(), "revision": set(), } + cursor = provenance.storage.cursor for synth_rev in synthetic_result(syntheticfile): revision = revisions[synth_rev["sha1"]] entry = RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) revision_add(provenance, archive, [entry], lower=lower, mindepth=mindepth) # each "entry" in the synth file is one new revision rows["revision"].add(synth_rev["sha1"].hex()) - assert rows["revision"] == sha1s( - provenance.storage.cursor, "revision" - ), synth_rev["msg"] + assert rows["revision"] == sha1s(cursor, "revision"), synth_rev["msg"] # check the timestamp of the revision rev_ts = synth_rev["date"] - assert get_timestamp( - provenance.storage.cursor, "revision", synth_rev["sha1"].hex() - ) == [rev_ts], synth_rev["msg"] + assert get_timestamp(cursor, "revision", synth_rev["sha1"].hex()) == [ + rev_ts + ], synth_rev["msg"] # this revision might have added new content objects rows["content"] |= set(x["dst"].hex() for x in synth_rev["R_C"]) rows["content"] |= set(x["dst"].hex() for x in synth_rev["D_C"]) - assert rows["content"] == sha1s( - provenance.storage.cursor, "content" - ), synth_rev["msg"] + assert rows["content"] == sha1s(cursor, "content"), synth_rev["msg"] # check for R-C (direct) entries # these are added directly in the content_early_in_rev table - rows["content_early_in_rev"] |= set( + rows["content_in_revision"] |= set( (x["dst"].hex(), x["src"].hex(), x["path"]) for x in synth_rev["R_C"] ) - assert rows["content_early_in_rev"] == relations( - provenance.storage.cursor, "content", "revision" + assert rows["content_in_revision"] == relations( + cursor, "content", "revision" ), synth_rev["msg"] # check timestamps for rc in synth_rev["R_C"]: - assert get_timestamp(provenance.storage.cursor, "content", rc["dst"]) == [ + assert get_timestamp(cursor, "content", rc["dst"]) == [ rev_ts + rc["rel_ts"] ], synth_rev["msg"] # check directories # each directory stored in the provenance index is an entry # in the "directory" table... rows["directory"] |= set(x["dst"].hex() for x in synth_rev["R_D"]) - assert rows["directory"] == sha1s( - provenance.storage.cursor, "directory" - ), synth_rev["msg"] + assert rows["directory"] == sha1s(cursor, "directory"), synth_rev["msg"] # ... + a number of rows in the "directory_in_rev" table... # check for R-D entries - rows["directory_in_rev"] |= set( + rows["directory_in_revision"] |= set( (x["dst"].hex(), x["src"].hex(), x["path"]) for x in synth_rev["R_D"] ) - assert rows["directory_in_rev"] == relations( - provenance.storage.cursor, "directory", "revision" + assert rows["directory_in_revision"] == relations( + cursor, "directory", "revision" ), synth_rev["msg"] # check timestamps for rd in synth_rev["R_D"]: - assert get_timestamp(provenance.storage.cursor, "directory", rd["dst"]) == [ + assert get_timestamp(cursor, "directory", rd["dst"]) == [ rev_ts + rd["rel_ts"] ], synth_rev["msg"] # ... + a number of rows in the "content_in_dir" table # for content of the directory. # check for D-C entries - rows["content_in_dir"] |= set( + rows["content_in_directory"] |= set( (x["dst"].hex(), x["src"].hex(), x["path"]) for x in synth_rev["D_C"] ) - assert rows["content_in_dir"] == relations( - provenance.storage.cursor, "content", "directory" + assert rows["content_in_directory"] == relations( + cursor, "content", "directory" ), synth_rev["msg"] # check timestamps for dc in synth_rev["D_C"]: - assert get_timestamp(provenance.storage.cursor, "content", dc["dst"]) == [ + assert get_timestamp(cursor, "content", dc["dst"]) == [ rev_ts + dc["rel_ts"] ], synth_rev["msg"] # check for location entries rows["location"] |= set(x["path"] for x in synth_rev["R_C"]) rows["location"] |= set(x["path"] for x in synth_rev["D_C"]) rows["location"] |= set(x["path"] for x in synth_rev["R_D"]) - assert rows["location"] == locations(provenance.storage.cursor), synth_rev[ - "msg" - ] + assert rows["location"] == locations(cursor), synth_rev["msg"] @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) -@pytest.mark.parametrize("batch", (True, False)) def test_provenance_heuristics_content_find_all( - provenance, swh_storage, archive, repo, lower, mindepth, batch + provenance, swh_storage, archive, repo, lower, mindepth ): # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(swh_storage, data) revisions = [ RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) for revision in data["revision"] ] - if batch: - revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth) - else: - for revision in revisions: - revision_add( - provenance, archive, [revision], lower=lower, mindepth=mindepth - ) + # XXX adding all revisions at once should be working just fine, but it does not... + # revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth) + # ...so add revisions one at a time for now + for revision in revisions: + revision_add(provenance, archive, [revision], lower=lower, mindepth=mindepth) syntheticfile = get_datafile( f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt" ) expected_occurrences = {} for synth_rev in synthetic_result(syntheticfile): rev_id = synth_rev["sha1"].hex() rev_ts = synth_rev["date"] for rc in synth_rev["R_C"]: expected_occurrences.setdefault(rc["dst"].hex(), []).append( (rev_id, rev_ts, rc["path"]) ) for dc in synth_rev["D_C"]: assert dc["prefix"] is not None # to please mypy expected_occurrences.setdefault(dc["dst"].hex(), []).append( (rev_id, rev_ts, dc["prefix"] + "/" + dc["path"]) ) for content_id, results in expected_occurrences.items(): expected = [(content_id, *result) for result in results] db_occurrences = [ (blob.hex(), rev.hex(), date.timestamp(), path.decode()) for blob, rev, date, path in provenance.content_find_all( bytes.fromhex(content_id) ) ] assert len(db_occurrences) == len(expected) assert set(db_occurrences) == set(expected) @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) def test_provenance_heuristics_content_find_first( provenance, swh_storage, archive, repo, lower, mindepth ): # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(swh_storage, data) revisions = [ RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) for revision in data["revision"] ] # XXX adding all revisions at once should be working just fine, but it does not... # revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth) # ...so add revisions one at a time for now for revision in revisions: revision_add(provenance, archive, [revision], lower=lower, mindepth=mindepth) syntheticfile = get_datafile( f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt" ) expected_first: Dict[str, Tuple[str, str, List[str]]] = {} # dict of tuples (blob_id, rev_id, [path, ...]) the third element for path # is a list because a content can be added at several places in a single # revision, in which case the result of content_find_first() is one of # those path, but we have no guarantee which one it will return. for synth_rev in synthetic_result(syntheticfile): rev_id = synth_rev["sha1"].hex() rev_ts = synth_rev["date"] for rc in synth_rev["R_C"]: sha1 = rc["dst"].hex() if sha1 not in expected_first: assert rc["rel_ts"] == 0 expected_first[sha1] = (rev_id, rev_ts, [rc["path"]]) else: if rev_ts == expected_first[sha1][1]: expected_first[sha1][2].append(rc["path"]) elif rev_ts < expected_first[sha1][1]: expected_first[sha1] = (rev_id, rev_ts, rc["path"]) for dc in synth_rev["D_C"]: sha1 = rc["dst"].hex() assert sha1 in expected_first # nothing to do there, this content cannot be a "first seen file" for content_id, (rev_id, ts, paths) in expected_first.items(): (r_sha1, r_rev_id, r_ts, r_path) = provenance.content_find_first( bytes.fromhex(content_id) ) assert r_sha1.hex() == content_id assert r_rev_id.hex() == rev_id assert r_ts.timestamp() == ts assert r_path.decode() in paths