diff --git a/swh/provenance/__init__.py b/swh/provenance/__init__.py index fc926c4..3c06d7e 100644 --- a/swh/provenance/__init__.py +++ b/swh/provenance/__init__.py @@ -1,97 +1,97 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations from typing import TYPE_CHECKING if TYPE_CHECKING: from .archive import ArchiveInterface from .interface import ProvenanceInterface, ProvenanceStorageInterface def get_archive(cls: str, **kwargs) -> ArchiveInterface: """Get an archive object of class ``cls`` with arguments ``args``. Args: cls: archive's class, either 'api' or 'direct' args: dictionary of arguments passed to the archive class constructor Returns: an instance of archive object (either using swh.storage API or direct queries to the archive's database) Raises: :cls:`ValueError` if passed an unknown archive class. """ if cls == "api": from swh.storage import get_storage from .storage.archive import ArchiveStorage return ArchiveStorage(get_storage(**kwargs["storage"])) elif cls == "direct": from swh.core.db import BaseDb from .postgresql.archive import ArchivePostgreSQL return ArchivePostgreSQL(BaseDb.connect(**kwargs["db"]).conn) else: raise ValueError def get_provenance(**kwargs) -> ProvenanceInterface: """Get an provenance object with arguments ``args``. Args: args: dictionary of arguments to retrieve a swh.provenance.storage class (see :func:`get_provenance_storage` for details) Returns: an instance of provenance object """ from .provenance import Provenance return Provenance(get_provenance_storage(**kwargs)) def get_provenance_storage(cls: str, **kwargs) -> ProvenanceStorageInterface: """Get an archive object of class ``cls`` with arguments ``args``. 
Args: cls: storage's class, only 'local' is currently supported args: dictionary of arguments passed to the storage class constructor Returns: an instance of storage object Raises: :cls:`ValueError` if passed an unknown archive class. """ if cls == "local": from swh.core.db import BaseDb from .postgresql.provenancedb_base import ProvenanceDBBase conn = BaseDb.connect(**kwargs["db"]).conn raise_on_commit = kwargs.get("raise_on_commit", False) - if ProvenanceDBBase(conn, raise_on_commit).flavor == "with-path": + if "with-path" in ProvenanceDBBase(conn, raise_on_commit).flavor: from .postgresql.provenancedb_with_path import ProvenanceWithPathDB return ProvenanceWithPathDB(conn, raise_on_commit) else: from .postgresql.provenancedb_without_path import ProvenanceWithoutPathDB return ProvenanceWithoutPathDB(conn, raise_on_commit) elif cls == "remote": from .api.client import RemoteProvenanceStorage storage = RemoteProvenanceStorage(**kwargs) assert isinstance(storage, ProvenanceStorageInterface) return storage else: raise ValueError diff --git a/swh/provenance/postgresql/provenancedb_base.py b/swh/provenance/postgresql/provenancedb_base.py index 6e1b6f3..42d6490 100644 --- a/swh/provenance/postgresql/provenancedb_base.py +++ b/swh/provenance/postgresql/provenancedb_base.py @@ -1,319 +1,357 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime import itertools import logging from typing import Dict, Generator, Iterable, Optional, Set, Tuple import psycopg2 import psycopg2.extras from typing_extensions import Literal from swh.core.db import BaseDb from swh.model.model import Sha1Git from ..interface import ( EntityType, ProvenanceResult, RelationData, RelationType, RevisionData, ) class ProvenanceDBBase: def __init__( self, conn: 
psycopg2.extensions.connection, raise_on_commit: bool = False ): BaseDb.adapt_conn(conn) conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) conn.set_session(autocommit=True) self.conn = conn self.cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) # XXX: not sure this is the best place to do it! sql = "SET timezone TO 'UTC'" self.cursor.execute(sql) self._flavor: Optional[str] = None self.raise_on_commit = raise_on_commit @property def flavor(self) -> str: if self._flavor is None: sql = "SELECT swh_get_dbflavor() AS flavor" self.cursor.execute(sql) self._flavor = self.cursor.fetchone()["flavor"] assert self._flavor is not None return self._flavor def with_path(self) -> bool: - return self.flavor == "with-path" + return "with-path" in self.flavor + + @property + def denormalized(self) -> bool: + return "denormalized" in self.flavor def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: ... def content_find_all( self, id: Sha1Git, limit: Optional[int] = None ) -> Generator[ProvenanceResult, None, None]: ... 
def content_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: return self._entity_set_date("content", dates) def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: return self._entity_get_date("content", ids) def directory_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: return self._entity_set_date("directory", dates) def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: return self._entity_get_date("directory", ids) def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]: sql = f"SELECT sha1 FROM {entity.value}" self.cursor.execute(sql) return {row["sha1"] for row in self.cursor.fetchall()} def location_get(self) -> Set[bytes]: sql = "SELECT encode(location.path::bytea, 'escape') AS path FROM location" self.cursor.execute(sql) return {row["path"] for row in self.cursor.fetchall()} def origin_set_url(self, urls: Dict[Sha1Git, str]) -> bool: try: if urls: sql = """ LOCK TABLE ONLY origin; INSERT INTO origin(sha1, url) VALUES %s ON CONFLICT DO NOTHING """ psycopg2.extras.execute_values(self.cursor, sql, urls.items()) return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") if self.raise_on_commit: raise return False def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]: urls: Dict[Sha1Git, str] = {} sha1s = tuple(ids) if sha1s: values = ", ".join(itertools.repeat("%s", len(sha1s))) sql = f""" SELECT sha1, url FROM origin WHERE sha1 IN ({values}) """ self.cursor.execute(sql, sha1s) urls.update((row["sha1"], row["url"]) for row in self.cursor.fetchall()) return urls def revision_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: return self._entity_set_date("revision", dates) def revision_set_origin(self, origins: Dict[Sha1Git, Sha1Git]) -> bool: try: if origins: sql = """ LOCK TABLE ONLY revision; INSERT INTO revision(sha1, origin) (SELECT V.rev AS sha1, O.id AS origin FROM (VALUES %s) AS V(rev, org) JOIN origin 
AS O ON (O.sha1=V.org)) ON CONFLICT (sha1) DO UPDATE SET origin=EXCLUDED.origin """ psycopg2.extras.execute_values(self.cursor, sql, origins.items()) return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") if self.raise_on_commit: raise return False def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]: result: Dict[Sha1Git, RevisionData] = {} sha1s = tuple(ids) if sha1s: values = ", ".join(itertools.repeat("%s", len(sha1s))) sql = f""" SELECT sha1, date, origin FROM revision WHERE sha1 IN ({values}) """ self.cursor.execute(sql, sha1s) result.update( (row["sha1"], RevisionData(date=row["date"], origin=row["origin"])) for row in self.cursor.fetchall() ) return result def relation_add( self, relation: RelationType, data: Iterable[RelationData] ) -> bool: try: rows = tuple((rel.src, rel.dst, rel.path) for rel in data) if rows: table = relation.value src, *_, dst = table.split("_") if src != "origin": # Origin entries should be inserted previously as they require extra # non-null information srcs = tuple(set((sha1,) for (sha1, _, _) in rows)) sql = f""" LOCK TABLE ONLY {src}; INSERT INTO {src}(sha1) VALUES %s ON CONFLICT DO NOTHING """ psycopg2.extras.execute_values(self.cursor, sql, srcs) if dst != "origin": # Origin entries should be inserted previously as they require extra # non-null information dsts = tuple(set((sha1,) for (_, sha1, _) in rows)) sql = f""" LOCK TABLE ONLY {dst}; INSERT INTO {dst}(sha1) VALUES %s ON CONFLICT DO NOTHING """ psycopg2.extras.execute_values(self.cursor, sql, dsts) joins = [ f"INNER JOIN {src} AS S ON (S.sha1=V.src)", f"INNER JOIN {dst} AS D ON (D.sha1=V.dst)", ] - selected = ["S.id", "D.id"] + nope = (RelationType.REV_BEFORE_REV, RelationType.REV_IN_ORG) + selected = ["S.id"] + if self.denormalized and relation not in nope: + selected.append("ARRAY_AGG(D.id)") + else: + selected.append("D.id") if 
self._relation_uses_location_table(relation): locations = tuple(set((path,) for (_, _, path) in rows)) sql = """ LOCK TABLE ONLY location; INSERT INTO location(path) VALUES %s ON CONFLICT (path) DO NOTHING """ psycopg2.extras.execute_values(self.cursor, sql, locations) joins.append("INNER JOIN location AS L ON (L.path=V.path)") - selected.append("L.id") + if self.denormalized: + selected.append("ARRAY_AGG(L.id)") + else: + selected.append("L.id") + sql_l = [ + f"INSERT INTO {table}", + f" SELECT {', '.join(selected)}", + " FROM (VALUES %s) AS V(src, dst, path)", + *joins, + ] - sql = f""" - INSERT INTO {table} - (SELECT {", ".join(selected)} - FROM (VALUES %s) AS V(src, dst, path) - {''' - '''.join(joins)}) - ON CONFLICT DO NOTHING + if self.denormalized and relation not in nope: + sql_l.append("GROUP BY S.id") + sql_l.append( + f"""ON CONFLICT ({src}) DO UPDATE + SET {dst}=ARRAY( + SELECT UNNEST({table}.{dst} || excluded.{dst})), + location=ARRAY( + SELECT UNNEST({relation.value}.location || excluded.location)) """ + ) + else: + sql_l.append("ON CONFLICT DO NOTHING") + sql = "\n".join(sql_l) psycopg2.extras.execute_values(self.cursor, sql, rows) return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") if self.raise_on_commit: raise return False def relation_get( self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False ) -> Set[RelationData]: return self._relation_get(relation, ids, reverse) def relation_get_all(self, relation: RelationType) -> Set[RelationData]: return self._relation_get(relation, None) def _entity_get_date( self, entity: Literal["content", "directory", "revision"], ids: Iterable[Sha1Git], ) -> Dict[Sha1Git, datetime]: dates: Dict[Sha1Git, datetime] = {} sha1s = tuple(ids) if sha1s: values = ", ".join(itertools.repeat("%s", len(sha1s))) sql = f""" SELECT sha1, date FROM {entity} WHERE sha1 IN ({values}) """ self.cursor.execute(sql, sha1s) 
dates.update((row["sha1"], row["date"]) for row in self.cursor.fetchall()) return dates def _entity_set_date( self, entity: Literal["content", "directory", "revision"], data: Dict[Sha1Git, datetime], ) -> bool: try: if data: sql = f""" LOCK TABLE ONLY {entity}; INSERT INTO {entity}(sha1, date) VALUES %s ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date) """ psycopg2.extras.execute_values(self.cursor, sql, data.items()) return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") if self.raise_on_commit: raise return False def _relation_get( self, relation: RelationType, ids: Optional[Iterable[Sha1Git]], reverse: bool = False, ) -> Set[RelationData]: result: Set[RelationData] = set() sha1s: Optional[Tuple[Tuple[Sha1Git, ...]]] if ids is not None: sha1s = (tuple(ids),) - where = f"WHERE {'S.sha1' if not reverse else 'D.sha1'} IN %s" + where = f"WHERE {'S' if not reverse else 'D'}.sha1 IN %s" else: sha1s = None where = "" + aggreg_dst = self.denormalized and relation in ( + RelationType.CNT_EARLY_IN_REV, + RelationType.CNT_IN_DIR, + RelationType.DIR_IN_REV, + ) if sha1s is None or sha1s[0]: table = relation.value src, *_, dst = table.split("_") # TODO: improve this! 
if src == "revision" and dst == "revision": src_field = "prev" dst_field = "next" else: src_field = src dst_field = dst - joins = [ - f"INNER JOIN {src} AS S ON (S.id=R.{src_field})", - f"INNER JOIN {dst} AS D ON (D.id=R.{dst_field})", - ] - selected = ["S.sha1 AS src", "D.sha1 AS dst"] + if aggreg_dst: + revloc = f"UNNEST(R.{dst_field}) AS dst" + if self._relation_uses_location_table(relation): + revloc += ", UNNEST(R.location) AS path" + else: + revloc = f"R.{dst_field} AS dst" + if self._relation_uses_location_table(relation): + revloc += ", R.location AS path" + + inner_sql = f""" + SELECT S.sha1 AS src, {revloc} + FROM {table} AS R + INNER JOIN {src} AS S ON (S.id=R.{src_field}) + {where} + """ if self._relation_uses_location_table(relation): - joins.append("INNER JOIN location AS L ON (L.id=R.location)") - selected.append("L.path AS path") + loc = "L.path AS path" else: - selected.append("NULL AS path") - + loc = "NULL AS path" sql = f""" - SELECT {", ".join(selected)} - FROM {table} AS R - {" ".join(joins)} - {where} - """ + SELECT CL.src, D.sha1 AS dst, {loc} + FROM ({inner_sql}) AS CL + INNER JOIN {dst} AS D ON (D.id=CL.dst) + """ + if self._relation_uses_location_table(relation): + sql += "INNER JOIN location AS L ON (L.id=CL.path)" + self.cursor.execute(sql, sha1s) result.update(RelationData(**row) for row in self.cursor.fetchall()) return result def _relation_uses_location_table(self, relation: RelationType) -> bool: ... 
diff --git a/swh/provenance/postgresql/provenancedb_with_path.py b/swh/provenance/postgresql/provenancedb_with_path.py index 149458d..6ec25a1 100644 --- a/swh/provenance/postgresql/provenancedb_with_path.py +++ b/swh/provenance/postgresql/provenancedb_with_path.py @@ -1,75 +1,149 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Generator, Optional from swh.model.model import Sha1Git from ..interface import ProvenanceResult, RelationType from .provenancedb_base import ProvenanceDBBase class ProvenanceWithPathDB(ProvenanceDBBase): def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: - sql = """ + if self.denormalized: + sql = """ + SELECT C_L.sha1 AS content, + R.sha1 AS revision, + R.date AS date, + O.url AS origin, + L.path AS path + FROM ( + SELECT C.sha1 AS sha1, + UNNEST(revision) AS revision, + UNNEST(location) AS location + FROM content_in_revision AS C_R + INNER JOIN content AS C ON (C.id=C_R.content) + WHERE C.sha1=%s) AS C_L + INNER JOIN revision AS R ON (R.id=C_L.revision) + INNER JOIN location AS L ON (L.id=C_L.location) + LEFT JOIN origin AS O ON (R.origin=O.id) + ORDER BY date, revision, origin, path ASC LIMIT 1 + """ + else: + sql = """ SELECT C.sha1 AS content, R.sha1 AS revision, R.date AS date, O.url AS origin, L.path AS path FROM content AS C INNER JOIN content_in_revision AS CR ON (CR.content=C.id) INNER JOIN location as L ON (CR.location=L.id) INNER JOIN revision as R ON (CR.revision=R.id) - LEFT JOIN origin as O ON (R.origin=O.id) + LEFT JOIN origin AS O ON (R.origin=O.id) WHERE C.sha1=%s ORDER BY date, revision, origin, path ASC LIMIT 1 """ self.cursor.execute(sql, (id,)) row = self.cursor.fetchone() return ProvenanceResult(**row) if row is not None else None def content_find_all( self, id: Sha1Git, limit: 
Optional[int] = None ) -> Generator[ProvenanceResult, None, None]: early_cut = f"LIMIT {limit}" if limit is not None else "" - sql = f""" + if self.denormalized: + sql = f""" + (SELECT C_L.sha1 AS content, + R.sha1 AS revision, + R.date AS date, + O.url AS origin, + L.path AS path + FROM ( + SELECT C.sha1 AS sha1, + unnest(revision) AS revision, + unnest(location) AS location + FROM content_in_revision AS C_R + INNER JOIN content AS C ON (C.id = C_R.content) + WHERE C.sha1=%s) AS C_L + INNER JOIN revision AS R ON (R.id = C_L.revision) + INNER JOIN location AS L ON (L.id = C_L.location) + LEFT JOIN origin AS O ON (R.origin=O.id) + ) + UNION + (WITH + C_D as ( + SELECT C.sha1 AS content_sha1, + unnest(CD.directory) AS directory, + unnest(CD.location) AS location + FROM content AS C + INNER JOIN content_in_directory AS CD ON (CD.content = C.id) + WHERE C.sha1=%s + ), + D_R as ( + SELECT C_D.content_sha1 AS content_sha1, + DL.path AS file_path, + unnest(DR.revision) AS revision, + unnest(DR.location) AS prefix_location + FROM C_D + INNER JOIN directory_in_revision AS DR ON (DR.directory = C_D.directory) + INNER JOIN location AS DL ON (DL.id = C_D.location) + ) + SELECT D_R.content_sha1 AS sha1, + R.sha1 AS revision, + R.date AS date, + O.url AS origin, + CASE DL.path + WHEN '' THEN D_R.file_path + WHEN '.' 
THEN D_R.file_path + ELSE (DL.path || '/' || D_R.file_path)::unix_path + END AS path + FROM D_R + INNER JOIN location AS DL ON (D_R.prefix_location = DL.id) + INNER JOIN revision AS R ON (D_R.revision = R.id) + LEFT JOIN origin AS O ON (R.origin=O.id) + ) + ORDER BY date, revision, origin, path {early_cut} + """ + else: + sql = f""" (SELECT C.sha1 AS content, R.sha1 AS revision, R.date AS date, O.url AS origin, L.path AS path FROM content AS C INNER JOIN content_in_revision AS CR ON (CR.content=C.id) INNER JOIN location AS L ON (CR.location=L.id) INNER JOIN revision AS R ON (CR.revision=R.id) - LEFT JOIN origin as O ON (R.origin=O.id) + LEFT JOIN origin AS O ON (R.origin=O.id) WHERE C.sha1=%s) UNION (SELECT C.sha1 AS content, R.sha1 AS revision, R.date AS date, O.url AS origin, CASE DL.path WHEN '' THEN CL.path WHEN '.' THEN CL.path ELSE (DL.path || '/' || CL.path)::unix_path END AS path FROM content AS C INNER JOIN content_in_directory AS CD ON (C.id=CD.content) INNER JOIN directory_in_revision AS DR ON (CD.directory=DR.directory) INNER JOIN revision AS R ON (DR.revision=R.id) INNER JOIN location AS CL ON (CD.location=CL.id) INNER JOIN location AS DL ON (DR.location=DL.id) LEFT JOIN origin AS O ON (R.origin=O.id) WHERE C.sha1=%s) ORDER BY date, revision, origin, path {early_cut} """ self.cursor.execute(sql, (id, id)) yield from (ProvenanceResult(**row) for row in self.cursor.fetchall()) def _relation_uses_location_table(self, relation: RelationType) -> bool: src, *_ = relation.value.split("_") return src in ("content", "directory") diff --git a/swh/provenance/postgresql/provenancedb_without_path.py b/swh/provenance/postgresql/provenancedb_without_path.py index 30ceda3..3d5f323 100644 --- a/swh/provenance/postgresql/provenancedb_without_path.py +++ b/swh/provenance/postgresql/provenancedb_without_path.py @@ -1,66 +1,127 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU 
General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Generator, Optional from swh.model.model import Sha1Git from ..provenance import ProvenanceResult, RelationType from .provenancedb_base import ProvenanceDBBase class ProvenanceWithoutPathDB(ProvenanceDBBase): def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: - sql = """ + if self.denormalized: + sql = """ + SELECT C_L.sha1 AS content, + R.sha1 AS revision, + R.date AS date, + O.url AS origin, + '\\x'::bytea AS path + FROM ( + SELECT C.sha1, UNNEST(revision) AS revision + FROM content_in_revision AS C_R + INNER JOIN content AS C ON (C.id=C_R.content) + WHERE C.sha1=%s + ) AS C_L + INNER JOIN revision AS R ON (R.id=C_L.revision) + LEFT JOIN origin AS O ON (R.origin=O.id) + ORDER BY date, revision, origin ASC LIMIT 1 + """ + else: + sql = """ SELECT C.sha1 AS content, R.sha1 AS revision, R.date AS date, O.url AS origin, - '\\x'::bytea as path + '\\x'::bytea AS path FROM content AS C - INNER JOIN content_in_revision AS CR ON (CR.content=C.id) - INNER JOIN revision as R ON (CR.revision=R.id) - LEFT JOIN origin as O ON (R.origin=O.id) + INNER JOIN content_in_revision AS CR ON (CR.content = C.id) + INNER JOIN revision AS R ON (CR.revision = R.id) + LEFT JOIN origin AS O ON (R.origin=O.id) WHERE C.sha1=%s ORDER BY date, revision, origin ASC LIMIT 1 """ + self.cursor.execute(sql, (id,)) row = self.cursor.fetchone() return ProvenanceResult(**row) if row is not None else None def content_find_all( self, id: Sha1Git, limit: Optional[int] = None ) -> Generator[ProvenanceResult, None, None]: early_cut = f"LIMIT {limit}" if limit is not None else "" - sql = f""" + if self.denormalized: + sql = f""" + (SELECT C_L.sha1 AS content, + R.sha1 AS revision, + R.date AS date, + O.url AS origin, + '\\x'::bytea as path + FROM ( + SELECT C.sha1, UNNEST(revision) AS revision + FROM content_in_revision AS C_R + INNER JOIN content AS C ON 
(C.id=C_R.content) + WHERE C.sha1=%s) AS C_L + INNER JOIN revision AS R ON (R.id=C_L.revision) + LEFT JOIN origin AS O ON (R.origin=O.id) + ) + UNION + (WITH + C_D AS ( + SELECT C.sha1 AS content_sha1, + unnest(CD.directory) AS directory + FROM content AS C + INNER JOIN content_in_directory AS CD ON (CD.content = C.id) + WHERE C.sha1=%s + ), + D_R AS ( + SELECT C_D.content_sha1 AS content_sha1, + UNNEST(DR.revision) AS revision + FROM C_D + INNER JOIN directory_in_revision AS DR ON (DR.directory = C_D.directory) + ) + SELECT D_R.content_sha1 AS content, + R.sha1 AS revision, + R.date AS date, + O.url AS origin, + '\x'::bytea AS path + FROM D_R + INNER JOIN revision AS R ON (D_R.revision = R.id) + LEFT JOIN origin AS O ON (R.origin=O.id) + ) + ORDER BY date, revision, origin {early_cut} + """ + else: + sql = f""" (SELECT C.sha1 AS content, R.sha1 AS revision, R.date AS date, O.url AS origin, '\x'::bytea as path FROM content AS C INNER JOIN content_in_revision AS CR ON (CR.content=C.id) INNER JOIN revision AS R ON (CR.revision=R.id) - LEFT JOIN origin as O ON (R.origin=O.id) + LEFT JOIN origin AS O ON (R.origin=O.id) WHERE C.sha1=%s) UNION (SELECT C.sha1 AS content, R.sha1 AS revision, R.date AS date, O.url AS origin, - '\x'::bytea as path + '\x'::bytea AS path FROM content AS C INNER JOIN content_in_directory AS CD ON (C.id=CD.content) INNER JOIN directory_in_revision AS DR ON (CD.directory=DR.directory) INNER JOIN revision AS R ON (DR.revision=R.id) - LEFT JOIN origin as O ON (R.origin=O.id) + LEFT JOIN origin AS O ON (R.origin=O.id) WHERE C.sha1=%s) ORDER BY date, revision, origin {early_cut} """ self.cursor.execute(sql, (id, id)) yield from (ProvenanceResult(**row) for row in self.cursor.fetchall()) def _relation_uses_location_table(self, relation: RelationType) -> bool: return False diff --git a/swh/provenance/sql/15-flavor.sql b/swh/provenance/sql/15-flavor.sql index b270d0b..6a75587 100644 --- a/swh/provenance/sql/15-flavor.sql +++ 
b/swh/provenance/sql/15-flavor.sql @@ -1,21 +1,23 @@ -- database flavor create type database_flavor as enum ( 'with-path', - 'without-path' + 'without-path', + 'with-path-denormalized', + 'without-path-denormalized' ); comment on type database_flavor is 'Flavor of the current database'; create table dbflavor ( flavor database_flavor, single_row char(1) primary key default 'x', check (single_row = 'x') ); comment on table dbflavor is 'Database flavor storage'; comment on column dbflavor.flavor is 'Database flavor currently deployed'; comment on column dbflavor.single_row is 'Bogus column to force the table to have a single row'; create or replace function swh_get_dbflavor() returns database_flavor language sql stable as $$ select coalesce((select flavor from dbflavor), 'with-path'); $$; comment on function swh_get_dbflavor is 'Get the flavor of the database currently deployed'; diff --git a/swh/provenance/sql/30-schema.sql b/swh/provenance/sql/30-schema.sql index 8bf09a5..976d473 100644 --- a/swh/provenance/sql/30-schema.sql +++ b/swh/provenance/sql/30-schema.sql @@ -1,135 +1,151 @@ -- psql variables to get the current database flavor +select position('denormalized' in swh_get_dbflavor()::text) = 0 as dbflavor_norm \gset create table dbversion ( version int primary key, release timestamptz, description text ); comment on table dbversion is 'Details of current db version'; comment on column dbversion.version is 'SQL schema version'; comment on column dbversion.release is 'Version deployment timestamp'; comment on column dbversion.description is 'Release description'; -- latest schema version insert into dbversion(version, release, description) values(1, now(), 'Work In Progress'); -- a Git object ID, i.e., a Git-style salted SHA1 checksum create domain sha1_git as bytea check (length(value) = 20); -- UNIX path (absolute, relative, individual path component, etc.) 
create domain unix_path as bytea; -- entity tables create table content ( id bigserial primary key, -- internal identifier of the content blob sha1 sha1_git unique not null, -- intrinsic identifier of the content blob date timestamptz -- timestamp of the revision where the blob appears early ); comment on column content.id is 'Content internal identifier'; comment on column content.sha1 is 'Content intrinsic identifier'; comment on column content.date is 'Earliest timestamp for the content (first seen time)'; create table directory ( id bigserial primary key, -- internal identifier of the directory appearing in an isochrone inner frontier sha1 sha1_git unique not null, -- intrinsic identifier of the directory date timestamptz -- max timestamp among those of the directory children's ); comment on column directory.id is 'Directory internal identifier'; comment on column directory.sha1 is 'Directory intrinsic identifier'; comment on column directory.date is 'Latest timestamp for the content in the directory'; create table revision ( id bigserial primary key, -- internal identifier of the revision sha1 sha1_git unique not null, -- intrinsic identifier of the revision date timestamptz, -- timestamp of the revision origin bigint -- id of the preferred origin -- foreign key (org) references origin (id) ); comment on column revision.id is 'Revision internal identifier'; comment on column revision.sha1 is 'Revision intrinsic identifier'; comment on column revision.date is 'Revision timestamp'; comment on column revision.origin is 'preferred origin for the revision'; create table location ( id bigserial primary key, -- internal identifier of the location path unix_path unique not null -- path to the location ); comment on column location.id is 'Location internal identifier'; comment on column location.path is 'Path to the location'; create table origin ( id bigserial primary key, -- internal identifier of the origin sha1 sha1_git unique not null, -- intrinsic identifier of 
the origin url unix_path unique not null -- url of the origin ); comment on column origin.id is 'Origin internal identifier'; comment on column origin.sha1 is 'Origin intrinsic identifier'; comment on column origin.url is 'URL of the origin'; -- relation tables create table content_in_revision ( content bigint not null, -- internal identifier of the content blob +\if :dbflavor_norm revision bigint not null, -- internal identifier of the revision where the blob appears for the first time location bigint -- location of the content relative to the revision root directory +\else + revision bigint[], -- internal identifier of the revision where the blob appears for the first time + location bigint[] -- location of the content relative to the revision root directory +\endif -- foreign key (blob) references content (id), -- foreign key (rev) references revision (id), -- foreign key (loc) references location (id) ); comment on column content_in_revision.content is 'Content internal identifier'; comment on column content_in_revision.revision is 'Revision internal identifier'; comment on column content_in_revision.location is 'Location of content in revision'; create table content_in_directory ( content bigint not null, -- internal identifier of the content blob +\if :dbflavor_norm directory bigint not null, -- internal identifier of the directory containing the blob location bigint -- location of the content relative to its parent directory in the isochrone frontier +\else + directory bigint[], + location bigint[] +\endif -- foreign key (blob) references content (id), -- foreign key (dir) references directory (id), -- foreign key (loc) references location (id) ); comment on column content_in_directory.content is 'Content internal identifier'; comment on column content_in_directory.directory is 'Directory internal identifier'; comment on column content_in_directory.location is 'Location of content in directory'; create table directory_in_revision ( directory bigint not null, 
-- internal identifier of the directory appearing in the revision +\if :dbflavor_norm revision bigint not null, -- internal identifier of the revision containing the directory location bigint -- location of the directory relative to the revision root directory +\else + revision bigint[], + location bigint[] +\endif -- foreign key (dir) references directory (id), -- foreign key (rev) references revision (id), -- foreign key (loc) references location (id) ); comment on column directory_in_revision.directory is 'Directory internal identifier'; comment on column directory_in_revision.revision is 'Revision internal identifier'; comment on column directory_in_revision.location is 'Location of directory in revision'; create table revision_in_origin ( revision bigint not null, -- internal identifier of the revision poined by the origin origin bigint not null -- internal identifier of the origin that points to the revision -- foreign key (rev) references revision (id), -- foreign key (org) references origin (id) ); comment on column revision_in_origin.revision is 'Revision internal identifier'; comment on column revision_in_origin.origin is 'Origin internal identifier'; create table revision_before_revision ( prev bigserial not null, -- internal identifier of the source revision next bigserial not null -- internal identifier of the destination revision -- foreign key (prev) references revision (id), -- foreign key (next) references revision (id) ); comment on column revision_before_revision.prev is 'Source revision internal identifier'; comment on column revision_before_revision.next is 'Destination revision internal identifier'; diff --git a/swh/provenance/sql/60-indexes.sql b/swh/provenance/sql/60-indexes.sql index 5a51468..6141077 100644 --- a/swh/provenance/sql/60-indexes.sql +++ b/swh/provenance/sql/60-indexes.sql @@ -1,9 +1,18 @@ -- psql variables to get the current database flavor +select position('denormalized' in swh_get_dbflavor()::text) = 0 as dbflavor_norm \gset 
-- create unique indexes (instead of pkey) because location might be null for -- the without-path flavor +\if :dbflavor_norm create unique index on content_in_revision(content, revision, location); create unique index on directory_in_revision(directory, revision, location); create unique index on content_in_directory(content, directory, location); +\else +create unique index on content_in_revision(content); +create unique index on directory_in_revision(directory); +create unique index on content_in_directory(content); +\endif + + alter table revision_in_origin add primary key (revision, origin); alter table revision_before_revision add primary key (prev, next); diff --git a/swh/provenance/tests/conftest.py b/swh/provenance/tests/conftest.py index b07f481..aa54503 100644 --- a/swh/provenance/tests/conftest.py +++ b/swh/provenance/tests/conftest.py @@ -1,273 +1,283 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from os import path import re from typing import Any, Dict, Iterable, Iterator, List, Optional import msgpack import psycopg2 import pytest from typing_extensions import TypedDict from swh.journal.serializers import msgpack_ext_hook from swh.model.hashutil import hash_to_bytes from swh.model.model import Sha1Git from swh.model.tests.swh_model_data import TEST_OBJECTS from swh.provenance import get_provenance, get_provenance_storage from swh.provenance.api.client import RemoteProvenanceStorage import swh.provenance.api.server as server from swh.provenance.archive import ArchiveInterface from swh.provenance.interface import ProvenanceInterface, ProvenanceStorageInterface from swh.provenance.postgresql.archive import ArchivePostgreSQL from swh.provenance.storage.archive import ArchiveStorage from swh.storage.postgresql.storage import Storage from swh.storage.replay 
import process_replay_objects -@pytest.fixture(params=["with-path", "without-path"]) +@pytest.fixture( + params=[ + "with-path", + "without-path", + "with-path-denormalized", + "without-path-denormalized", + ] +) def populated_db( request, # TODO: add proper type annotation postgresql: psycopg2.extensions.connection, ) -> Dict[str, str]: + """return a working and initialized provenance db""" from swh.core.cli.db import populate_database_for_package - flavor = "with-path" if request.param == "client-server" else request.param - populate_database_for_package("swh.provenance", postgresql.dsn, flavor=flavor) + # flavor = "with-path" if request.param == "client-server" else request.param + populate_database_for_package( + "swh.provenance", postgresql.dsn, flavor=request.param + ) return { - item.split("=")[0]: item.split("=")[1] - for item in postgresql.dsn.split() - if item.split("=")[0] != "options" + k: v + for (k, v) in (item.split("=") for item in postgresql.dsn.split()) + if k != "options" } # the Flask app used as server in these tests @pytest.fixture def app(populated_db: Dict[str, str]): assert hasattr(server, "storage") server.storage = get_provenance_storage(cls="local", db=populated_db) yield server.app # the RPCClient class used as client used in these tests @pytest.fixture def swh_rpc_client_class(): return RemoteProvenanceStorage @pytest.fixture(params=["local", "remote"]) def provenance( request, # TODO: add proper type annotation populated_db: Dict[str, str], swh_rpc_client: RemoteProvenanceStorage, ) -> ProvenanceInterface: """return a working and initialized provenance db""" if request.param == "remote": from swh.provenance.provenance import Provenance assert isinstance(swh_rpc_client, ProvenanceStorageInterface) return Provenance(swh_rpc_client) else: # in test sessions, we DO want to raise any exception occurring at commit time prov = get_provenance(cls=request.param, db=populated_db, raise_on_commit=True) return prov @pytest.fixture def 
swh_storage_with_objects(swh_storage: Storage) -> Storage: """return a Storage object (postgresql-based by default) with a few of each object type in it The inserted content comes from swh.model.tests.swh_model_data. """ for obj_type in ( "content", "skipped_content", "directory", "revision", "release", "snapshot", "origin", "origin_visit", "origin_visit_status", ): getattr(swh_storage, f"{obj_type}_add")(TEST_OBJECTS[obj_type]) return swh_storage @pytest.fixture def archive_direct(swh_storage_with_objects: Storage) -> ArchiveInterface: return ArchivePostgreSQL(swh_storage_with_objects.get_db().conn) @pytest.fixture def archive_api(swh_storage_with_objects: Storage) -> ArchiveInterface: return ArchiveStorage(swh_storage_with_objects) @pytest.fixture(params=["archive", "db"]) def archive(request, swh_storage_with_objects: Storage) -> Iterator[ArchiveInterface]: """Return a ArchivePostgreSQL based StorageInterface object""" # this is a workaround to prevent tests from hanging because of an unclosed # transaction. 
# TODO: refactor the ArchivePostgreSQL to properly deal with # transactions and get rid of this fixture if request.param == "db": archive = ArchivePostgreSQL(conn=swh_storage_with_objects.get_db().conn) yield archive archive.conn.rollback() else: yield ArchiveStorage(swh_storage_with_objects) def get_datafile(fname: str) -> str: return path.join(path.dirname(__file__), "data", fname) def load_repo_data(repo: str) -> Dict[str, Any]: data: Dict[str, Any] = {} with open(get_datafile(f"{repo}.msgpack"), "rb") as fobj: unpacker = msgpack.Unpacker( fobj, raw=False, ext_hook=msgpack_ext_hook, strict_map_key=False, timestamp=3, # convert Timestamp in datetime objects (tz UTC) ) for objtype, objd in unpacker: data.setdefault(objtype, []).append(objd) return data def filter_dict(d: Dict[Any, Any], keys: Iterable[Any]) -> Dict[Any, Any]: return {k: v for (k, v) in d.items() if k in keys} def fill_storage(storage: Storage, data: Dict[str, Any]) -> None: process_replay_objects(data, storage=storage) class SynthRelation(TypedDict): prefix: Optional[str] path: str src: Sha1Git dst: Sha1Git rel_ts: float class SynthRevision(TypedDict): sha1: Sha1Git date: float msg: str R_C: List[SynthRelation] R_D: List[SynthRelation] D_C: List[SynthRelation] def synthetic_result(filename: str) -> Iterator[SynthRevision]: """Generates dict representations of synthetic revisions found in the synthetic file (from the data/ directory) given as argument of the generator. 
Generated SynthRevision (typed dict) with the following elements: "sha1": (Sha1Git) sha1 of the revision, "date": (float) timestamp of the revision, "msg": (str) commit message of the revision, "R_C": (list) new R---C relations added by this revision "R_D": (list) new R-D relations added by this revision "D_C": (list) new D-C relations added by this revision Each relation above is a SynthRelation typed dict with: "path": (str) location "src": (Sha1Git) sha1 of the source of the relation "dst": (Sha1Git) sha1 of the destination of the relation "rel_ts": (float) timestamp of the target of the relation (related to the timestamp of the revision) """ with open(get_datafile(filename), "r") as fobj: yield from _parse_synthetic_file(fobj) def _parse_synthetic_file(fobj: Iterable[str]) -> Iterator[SynthRevision]: """Read a 'synthetic' file and generate a dict representation of the synthetic revision for each revision listed in the synthetic file. """ regs = [ "(?PR[0-9]{2,4})?", "(?P[^| ]*)", "([+] )?(?P[^| +]*?)[/]?", "(?P[RDC]) (?P[0-9a-z]{40})", "(?P-?[0-9]+(.[0-9]+)?)", ] regex = re.compile("^ *" + r" *[|] *".join(regs) + r" *(#.*)?$") current_rev: List[dict] = [] for m in (regex.match(line) for line in fobj): if m: d = m.groupdict() if d["revname"]: if current_rev: yield _mk_synth_rev(current_rev) current_rev.clear() current_rev.append(d) if current_rev: yield _mk_synth_rev(current_rev) def _mk_synth_rev(synth_rev: List[Dict[str, str]]) -> SynthRevision: assert synth_rev[0]["type"] == "R" rev = SynthRevision( sha1=hash_to_bytes(synth_rev[0]["sha1"]), date=float(synth_rev[0]["ts"]), msg=synth_rev[0]["revname"], R_C=[], R_D=[], D_C=[], ) current_path = None # path of the last R-D relation we parsed, used a prefix for next D-C # relations for row in synth_rev[1:]: if row["reltype"] == "R---C": assert row["type"] == "C" rev["R_C"].append( SynthRelation( prefix=None, path=row["path"], src=rev["sha1"], dst=hash_to_bytes(row["sha1"]), rel_ts=float(row["ts"]), ) ) current_path 
= None elif row["reltype"] == "R-D": assert row["type"] == "D" rev["R_D"].append( SynthRelation( prefix=None, path=row["path"], src=rev["sha1"], dst=hash_to_bytes(row["sha1"]), rel_ts=float(row["ts"]), ) ) current_path = row["path"] elif row["reltype"] == "D-C": assert row["type"] == "C" rev["D_C"].append( SynthRelation( prefix=current_path, path=row["path"], src=rev["R_D"][-1]["dst"], dst=hash_to_bytes(row["sha1"]), rel_ts=float(row["ts"]), ) ) return rev diff --git a/swh/provenance/tests/test_provenance_db.py b/swh/provenance/tests/test_provenance_db.py index 4b684a0..deeaf09 100644 --- a/swh/provenance/tests/test_provenance_db.py +++ b/swh/provenance/tests/test_provenance_db.py @@ -1,51 +1,56 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timedelta, timezone from typing import Type from swh.model.model import OriginVisitStatus from swh.model.tests.swh_model_data import TEST_OBJECTS from swh.provenance.interface import ProvenanceInterface, ProvenanceStorageInterface from swh.provenance.model import OriginEntry from swh.provenance.origin import origin_add from swh.provenance.postgresql.provenancedb_base import ProvenanceDBBase from swh.provenance.postgresql.provenancedb_with_path import ProvenanceWithPathDB from swh.provenance.postgresql.provenancedb_without_path import ProvenanceWithoutPathDB from swh.provenance.storage.archive import ArchiveStorage from swh.storage.postgresql.storage import Storage # TODO: remove this function in favour of TimestampWithTimezone.to_datetime # from swh.model.model def ts2dt(ts: dict) -> datetime: timestamp = datetime.fromtimestamp( ts["timestamp"]["seconds"], timezone(timedelta(minutes=ts["offset"])) ) return timestamp.replace(microsecond=ts["timestamp"]["microseconds"]) def test_provenance_origin_add( 
provenance: ProvenanceInterface, swh_storage_with_objects: Storage ) -> None: """Test the origin_add function""" archive = ArchiveStorage(swh_storage_with_objects) for status in TEST_OBJECTS["origin_visit_status"]: assert isinstance(status, OriginVisitStatus) if status.snapshot is not None: entry = OriginEntry(url=status.origin, snapshot=status.snapshot) origin_add(provenance, archive, [entry]) # TODO: check some facts here def test_provenance_flavor(provenance: ProvenanceInterface) -> None: if isinstance(provenance.storage, ProvenanceDBBase): - assert provenance.storage.flavor in ("with-path", "without-path") + assert provenance.storage.flavor in ( + "with-path", + "without-path", + "with-path-denormalized", + "without-path-denormalized", + ) backend_class: Type[ProvenanceStorageInterface] - if provenance.storage.flavor == "with-path": + if "with-path" in provenance.storage.flavor: backend_class = ProvenanceWithPathDB else: backend_class = ProvenanceWithoutPathDB assert isinstance(provenance.storage, backend_class)