diff --git a/swh/provenance/postgresql/provenance.py b/swh/provenance/postgresql/provenance.py index 108beef..32e9320 100644 --- a/swh/provenance/postgresql/provenance.py +++ b/swh/provenance/postgresql/provenance.py @@ -1,374 +1,331 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime import itertools import logging -from typing import Dict, Generator, Iterable, Optional, Set, Tuple +from typing import Dict, Generator, Iterable, List, Optional, Set import psycopg2.extensions import psycopg2.extras from typing_extensions import Literal from swh.core.db import BaseDb from swh.model.model import Sha1Git from ..interface import ( EntityType, ProvenanceResult, RelationData, RelationType, RevisionData, ) class ProvenanceStoragePostgreSql: def __init__( self, conn: psycopg2.extensions.connection, raise_on_commit: bool = False ) -> None: BaseDb.adapt_conn(conn) conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) conn.set_session(autocommit=True) self.conn = conn self.cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) # XXX: not sure this is the best place to do it! sql = "SET timezone TO 'UTC'" self.cursor.execute(sql) self._flavor: Optional[str] = None self.raise_on_commit = raise_on_commit @property def flavor(self) -> str: if self._flavor is None: sql = "SELECT swh_get_dbflavor() AS flavor" self.cursor.execute(sql) self._flavor = self.cursor.fetchone()["flavor"] assert self._flavor is not None return self._flavor @property def denormalized(self) -> bool: return "denormalized" in self.flavor def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: sql = "SELECT * FROM swh_provenance_content_find_first(%s)" self.cursor.execute(sql, (id,)) row = self.cursor.fetchone() return ProvenanceResult(**row) if row is not None else None def content_find_all( self, id: Sha1Git, limit: Optional[int] = None ) -> Generator[ProvenanceResult, None, None]: sql = "SELECT * FROM swh_provenance_content_find_all(%s, %s)" self.cursor.execute(sql, (id, limit)) yield from (ProvenanceResult(**row) for row in self.cursor.fetchall()) def content_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: return self._entity_set_date("content", dates) def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: return self._entity_get_date("content", ids) def directory_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: return self._entity_set_date("directory", dates) def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: return self._entity_get_date("directory", ids) def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]: sql = f"SELECT sha1 FROM {entity.value}" self.cursor.execute(sql) return {row["sha1"] for row in self.cursor.fetchall()} def location_get(self) -> Set[bytes]: sql = "SELECT location.path AS path FROM location" self.cursor.execute(sql) return {row["path"] for row in self.cursor.fetchall()} def origin_set_url(self, urls: Dict[Sha1Git, str]) -> bool: try: if urls: sql = """ LOCK TABLE ONLY origin; INSERT INTO origin(sha1, url) VALUES %s ON CONFLICT DO NOTHING """ psycopg2.extras.execute_values(self.cursor, sql, urls.items()) return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") if self.raise_on_commit: raise return False def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]: urls: Dict[Sha1Git, str] = {} sha1s = tuple(ids) if sha1s: # TODO: consider splitting this query in several ones if sha1s is too big! values = ", ".join(itertools.repeat("%s", len(sha1s))) sql = f""" SELECT sha1, url FROM origin WHERE sha1 IN ({values}) """ self.cursor.execute(sql, sha1s) urls.update((row["sha1"], row["url"]) for row in self.cursor.fetchall()) return urls def revision_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: return self._entity_set_date("revision", dates) def revision_set_origin(self, origins: Dict[Sha1Git, Sha1Git]) -> bool: try: if origins: sql = """ LOCK TABLE ONLY revision; INSERT INTO revision(sha1, origin) (SELECT V.rev AS sha1, O.id AS origin FROM (VALUES %s) AS V(rev, org) JOIN origin AS O ON (O.sha1=V.org)) ON CONFLICT (sha1) DO UPDATE SET origin=EXCLUDED.origin """ psycopg2.extras.execute_values(self.cursor, sql, origins.items()) return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") if self.raise_on_commit: raise return False def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]: result: Dict[Sha1Git, RevisionData] = {} sha1s = tuple(ids) if sha1s: # TODO: consider splitting this query in several ones if sha1s is too big! values = ", ".join(itertools.repeat("%s", len(sha1s))) sql = f""" SELECT R.sha1, R.date, O.sha1 AS origin FROM revision AS R LEFT JOIN origin AS O ON (O.id=R.origin) WHERE R.sha1 IN ({values}) """ self.cursor.execute(sql, sha1s) result.update( (row["sha1"], RevisionData(date=row["date"], origin=row["origin"])) for row in self.cursor.fetchall() ) return result def relation_add( self, relation: RelationType, data: Iterable[RelationData] ) -> bool: try: rows = tuple((rel.src, rel.dst, rel.path) for rel in data) if rows: table = relation.value src, *_, dst = table.split("_") if src != "origin": # Origin entries should be inserted previously as they require extra # non-null information srcs = tuple(set((sha1,) for (sha1, _, _) in rows)) sql = f""" LOCK TABLE ONLY {src}; INSERT INTO {src}(sha1) VALUES %s ON CONFLICT DO NOTHING """ psycopg2.extras.execute_values(self.cursor, sql, srcs) if dst != "origin": # Origin entries should be inserted previously as they require extra # non-null information dsts = tuple(set((sha1,) for (_, sha1, _) in rows)) sql = f""" LOCK TABLE ONLY {dst}; INSERT INTO {dst}(sha1) VALUES %s ON CONFLICT DO NOTHING """ psycopg2.extras.execute_values(self.cursor, sql, dsts) joins = [ f"INNER JOIN {src} AS S ON (S.sha1=V.src)", f"INNER JOIN {dst} AS D ON (D.sha1=V.dst)", ] nope = (RelationType.REV_BEFORE_REV, RelationType.REV_IN_ORG) selected = ["S.id"] if self.denormalized and relation not in nope: selected.append("ARRAY_AGG(D.id)") else: selected.append("D.id") if self._relation_uses_location_table(relation): locations = tuple(set((path,) for (_, _, path) in rows)) sql = """ LOCK TABLE ONLY location; INSERT INTO location(path) VALUES %s ON CONFLICT (path) DO NOTHING """ psycopg2.extras.execute_values(self.cursor, sql, locations) joins.append("INNER JOIN location AS L ON (L.path=V.path)") if self.denormalized: selected.append("ARRAY_AGG(L.id)") else: selected.append("L.id") sql_l = [ f"INSERT INTO {table}", f" SELECT {', '.join(selected)}", " FROM (VALUES %s) AS V(src, dst, path)", *joins, ] if self.denormalized and relation not in nope: sql_l.append("GROUP BY S.id") sql_l.append( f"""ON CONFLICT ({src}) DO UPDATE SET {dst}=ARRAY( SELECT UNNEST({table}.{dst} || EXCLUDED.{dst}) ), location=ARRAY( SELECT UNNEST({relation.value}.location || EXCLUDED.location) ) """ ) else: sql_l.append("ON CONFLICT DO NOTHING") sql = "\n".join(sql_l) psycopg2.extras.execute_values(self.cursor, sql, rows) return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") if self.raise_on_commit: raise return False def relation_get( self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False ) -> Set[RelationData]: return self._relation_get(relation, ids, reverse) def relation_get_all(self, relation: RelationType) -> Set[RelationData]: return self._relation_get(relation, None) def _entity_get_date( self, entity: Literal["content", "directory", "revision"], ids: Iterable[Sha1Git], ) -> Dict[Sha1Git, datetime]: dates: Dict[Sha1Git, datetime] = {} sha1s = tuple(ids) if sha1s: # TODO: consider splitting this query in several ones if sha1s is too big! values = ", ".join(itertools.repeat("%s", len(sha1s))) sql = f""" SELECT sha1, date FROM {entity} WHERE sha1 IN ({values}) AND date IS NOT NULL """ self.cursor.execute(sql, sha1s) dates.update((row["sha1"], row["date"]) for row in self.cursor.fetchall()) return dates def _entity_set_date( self, entity: Literal["content", "directory", "revision"], data: Dict[Sha1Git, datetime], ) -> bool: try: if data: sql = f""" LOCK TABLE ONLY {entity}; INSERT INTO {entity}(sha1, date) VALUES %s ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date) """ psycopg2.extras.execute_values(self.cursor, sql, data.items()) return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") if self.raise_on_commit: raise return False def _relation_get( self, relation: RelationType, ids: Optional[Iterable[Sha1Git]], reverse: bool = False, ) -> Set[RelationData]: result: Set[RelationData] = set() - sha1s: Optional[Tuple[Tuple[Sha1Git, ...]]] + sha1s: List[Sha1Git] if ids is not None: - sha1s = (tuple(ids),) - where = f"WHERE {'S' if not reverse else 'D'}.sha1 IN %s" + sha1s = list(ids) + filter = 1 if not reverse else 2 else: - sha1s = None - where = "" - - aggreg_dst = self.denormalized and relation in ( - RelationType.CNT_EARLY_IN_REV, - RelationType.CNT_IN_DIR, - RelationType.DIR_IN_REV, - ) - if sha1s is None or sha1s[0]: - table = relation.value - src, *_, dst = table.split("_") - - # TODO: improve this! - if src == "revision" and dst == "revision": - src_field = "prev" - dst_field = "next" - else: - src_field = src - dst_field = dst - - if aggreg_dst: - revloc = f"UNNEST(R.{dst_field}) AS dst" - if self._relation_uses_location_table(relation): - revloc += ", UNNEST(R.location) AS path" - else: - revloc = f"R.{dst_field} AS dst" - if self._relation_uses_location_table(relation): - revloc += ", R.location AS path" - - inner_sql = f""" - SELECT S.sha1 AS src, {revloc} - FROM {table} AS R - INNER JOIN {src} AS S ON (S.id=R.{src_field}) - """ - if where != "" and not reverse: - inner_sql += where - - if self._relation_uses_location_table(relation): - loc = "L.path AS path" - else: - loc = "NULL AS path" - sql = f""" - SELECT CL.src, D.sha1 AS dst, {loc} - FROM ({inner_sql}) AS CL - INNER JOIN {dst} AS D ON (D.id=CL.dst) - """ - if self._relation_uses_location_table(relation): - sql += "INNER JOIN location AS L ON (L.id=CL.path)" - if where != "" and reverse: - sql += where + sha1s = [] + filter = 0 - self.cursor.execute(sql, sha1s) + if filter == 0 or sha1s: + rel_table = relation.value + src_table, *_, dst_table = rel_table.split("_") + + sql = "SELECT * FROM swh_provenance_relation_get(%s, %s, %s, %s, %s)" + self.cursor.execute(sql, (rel_table, src_table, dst_table, filter, sha1s)) result.update(RelationData(**row) for row in self.cursor.fetchall()) return result def _relation_uses_location_table(self, relation: RelationType) -> bool: if self.with_path(): src = relation.value.split("_")[0] return src in ("content", "directory") return False def with_path(self) -> bool: return "with-path" in self.flavor diff --git a/swh/provenance/sql/40-funcs.sql b/swh/provenance/sql/40-funcs.sql index 84516a5..b130730 100644 --- a/swh/provenance/sql/40-funcs.sql +++ b/swh/provenance/sql/40-funcs.sql @@ -1,338 +1,551 @@ select position('denormalized' in swh_get_dbflavor()::text) = 0 as dbflavor_norm \gset select position('with-path' in swh_get_dbflavor()::text) != 0 as dbflavor_with_path \gset -create type relation_row as (src sha1_git, dst sha1_git, loc unix_path); - \if :dbflavor_norm \if :dbflavor_with_path -- -- with path and normalized -- create or replace function swh_provenance_content_find_first(content_id sha1_git) returns table ( content sha1_git, revision sha1_git, date timestamptz, origin text, path unix_path ) language sql stable as $$ select C.sha1 as content, R.sha1 as revision, R.date as date, O.url as origin, L.path as path from content as C inner join content_in_revision as CR on (CR.content = C.id) inner join location as L on (L.id = CR.location) inner join revision as R on (R.id = CR.revision) left join origin as O on (O.id = R.origin) where C.sha1 = content_id order by date, revision, origin, path asc limit 1 $$; create or replace function swh_provenance_content_find_all(content_id sha1_git, early_cut int) returns table ( content sha1_git, revision sha1_git, date timestamptz, origin text, path unix_path ) language sql stable as $$ - (select C.sha1 as content, - R.sha1 as revision, - R.date as date, - O.url as origin, - L.path as path - from content as C - inner join content_in_revision as CR on (CR.content = C.id) - inner join location as L on (L.id = CR.location) - inner join revision as R on (R.id = CR.revision) - left join origin as O on (O.id = R.origin) - where C.sha1 = content_id) - union - (select C.sha1 as content, - R.sha1 as revision, - R.date as date, - O.url as origin, - case DL.path - when '' then CL.path - when '.' then CL.path - else (DL.path || '/' || CL.path)::unix_path - end as path - from content as C - inner join content_in_directory as CD on (CD.content = C.id) - inner join directory_in_revision as DR on (DR.directory = CD.directory) - inner join revision as R on (R.id = DR.revision) - inner join location as CL on (CL.id = CD.location) - inner join location as DL on (DL.id = DR.location) - left join origin as O on (O.id = R.origin) - where C.sha1 = content_id) - order by date, revision, origin, path limit early_cut + (select C.sha1 as content, + R.sha1 as revision, + R.date as date, + O.url as origin, + L.path as path + from content as C + inner join content_in_revision as CR on (CR.content = C.id) + inner join location as L on (L.id = CR.location) + inner join revision as R on (R.id = CR.revision) + left join origin as O on (O.id = R.origin) + where C.sha1 = content_id) + union + (select C.sha1 as content, + R.sha1 as revision, + R.date as date, + O.url as origin, + case DL.path + when '' then CL.path + when '.' then CL.path + else (DL.path || '/' || CL.path)::unix_path + end as path + from content as C + inner join content_in_directory as CD on (CD.content = C.id) + inner join directory_in_revision as DR on (DR.directory = CD.directory) + inner join revision as R on (R.id = DR.revision) + inner join location as CL on (CL.id = CD.location) + inner join location as DL on (DL.id = DR.location) + left join origin as O on (O.id = R.origin) + where C.sha1 = content_id) + order by date, revision, origin, path limit early_cut +$$; + +create or replace function swh_provenance_relation_get( + rel_table regclass, src_table regclass, dst_table regclass, filter integer, sha1s sha1_git[] +) + returns table ( + src sha1_git, + dst sha1_git, + path unix_path + ) + language plpgsql + stable +as $$ + declare + src_field text; + dst_field text; + join_location text; + proj_location text; + filter_result text; + begin + if rel_table = 'revision_before_revision'::regclass then + src_field := 'prev'; + dst_field := 'next'; + else + src_field := src_table::text; + dst_field := dst_table::text; + end if; + + if src_table in ('content'::regclass, 'directory'::regclass) then + join_location := 'inner join location as L on (L.id = R.location)'; + proj_location := 'L.path'; + else + join_location := ''; + proj_location := 'NULL::unix_path'; + end if; + + case filter + when 1 then + filter_result := 'where S.sha1 = any($1)'; + when 2 then + filter_result := 'where D.sha1 = any($1)'; + else + filter_result := ''; + end case; + + return query execute format( + 'select S.sha1 as src, D.sha1 as dst, ' || proj_location || ' as path + from %s as R + inner join %s as S on (S.id = R.' || src_field || ') + inner join %s as D on (D.id = R.' || dst_field || ') + ' || join_location || ' + ' || filter_result, + rel_table, src_table, dst_table + ) using sha1s; + end; $$; \else -- -- without path and normalized -- create or replace function swh_provenance_content_find_first(content_id sha1_git) returns table ( content sha1_git, revision sha1_git, date timestamptz, origin text, path unix_path ) language sql stable as $$ - select C.sha1 as content, + select C.sha1 as content, + R.sha1 as revision, + R.date as date, + O.url as origin, + '\x'::unix_path as path + from content as C + inner join content_in_revision as CR on (CR.content = C.id) + inner join revision as R on (R.id = CR.revision) + left join origin as O on (O.id = R.origin) + where C.sha1 = content_id + order by date, revision, origin asc limit 1 +$$; + +create or replace function swh_provenance_content_find_all(content_id sha1_git, early_cut int) + returns table ( + content sha1_git, + revision sha1_git, + date timestamptz, + origin text, + path unix_path + ) + language sql + stable +as $$ + (select C.sha1 as content, R.sha1 as revision, R.date as date, O.url as origin, '\x'::unix_path as path from content as C inner join content_in_revision as CR on (CR.content = C.id) inner join revision as R on (R.id = CR.revision) left join origin as O on (O.id = R.origin) - where C.sha1 = content_id - order by date, revision, origin asc limit 1 + where C.sha1 = content_id) + union + (select C.sha1 as content, + R.sha1 as revision, + R.date as date, + O.url as origin, + '\x'::unix_path as path + from content as C + inner join content_in_directory as CD on (CD.content = C.id) + inner join directory_in_revision as DR on (DR.directory = CD.directory) + inner join revision as R on (R.id = DR.revision) + left join origin as O on (O.id = R.origin) + where C.sha1 = content_id) + order by date, revision, origin, path limit early_cut $$; -create or replace function swh_provenance_content_find_all(content_id sha1_git, early_cut int) +create or replace function swh_provenance_relation_get( + rel_table regclass, src_table regclass, dst_table regclass, filter integer, sha1s sha1_git[] +) returns table ( - content sha1_git, - revision sha1_git, - date timestamptz, - origin text, + src sha1_git, + dst sha1_git, path unix_path ) - language sql + language plpgsql stable as $$ - (select C.sha1 as content, - R.sha1 as revision, - R.date as date, - O.url as origin, - '\x'::unix_path as path - from content as C - inner join content_in_revision as CR on (CR.content = C.id) - inner join revision as R on (R.id = CR.revision) - left join origin as O on (O.id = R.origin) - where C.sha1 = content_id) - union - (select C.sha1 as content, - R.sha1 as revision, - R.date as date, - O.url as origin, - '\x'::unix_path as path - from content as C - inner join content_in_directory as CD on (CD.content = C.id) - inner join directory_in_revision as DR on (DR.directory = CD.directory) - inner join revision as R on (R.id = DR.revision) - left join origin as O on (O.id = R.origin) - where C.sha1 = content_id) - order by date, revision, origin, path limit early_cut + declare + src_field text; + dst_field text; + filter_result text; + begin + if rel_table = 'revision_before_revision'::regclass then + src_field := 'prev'; + dst_field := 'next'; + else + src_field := src_table::text; + dst_field := dst_table::text; + end if; + + case filter + when 1 then + filter_result := 'where S.sha1 = any($1)'; + when 2 then + filter_result := 'where D.sha1 = any($1)'; + else + filter_result := ''; + end case; + + return query execute format( + 'select S.sha1 as src, D.sha1 as dst, NULL::unix_path as path + from %s as R + inner join %s as S on (S.id = R.' || src_field || ') + inner join %s as D on (D.id = R.' || dst_field || ') + ' || filter_result, + rel_table, src_table, dst_table + ) using sha1s; + end; $$; -- :dbflavor_with_path \endif -- :dbflavor_norm \else \if :dbflavor_with_path -- -- with path and denormalized -- create or replace function swh_provenance_content_find_first(content_id sha1_git) returns table ( content sha1_git, revision sha1_git, date timestamptz, origin text, path unix_path ) language sql stable as $$ select CL.sha1 as content, R.sha1 as revision, R.date as date, O.url as origin, L.path as path from ( select C.sha1 as sha1, unnest(CR.revision) as revision, unnest(CR.location) as location from content_in_revision as CR inner join content as C on (C.id = CR.content) where C.sha1 = content_id ) as CL inner join revision as R on (R.id = CL.revision) inner join location as L on (L.id = CL.location) left join origin as O on (O.id = R.origin) order by date, revision, origin, path asc limit 1 $$; create or replace function swh_provenance_content_find_all(content_id sha1_git, early_cut int) returns table ( content sha1_git, revision sha1_git, date timestamptz, origin text, path unix_path ) language sql stable as $$ - (with cntrev as ( + (with + cntrev as ( select C.sha1 as sha1, unnest(CR.revision) as revision, unnest(CR.location) as location from content_in_revision as CR inner join content as C on (C.id = CR.content) - where C.sha1 = content_id - ) - select CR.sha1 as content, - R.sha1 as revision, - R.date as date, - O.url as origin, - L.path as path - from cntrev as CR - inner join revision as R on (R.id = CR.revision) - inner join location as L on (L.id = CR.location) - left join origin as O on (O.id = R.origin) - ) - union - (with cntdir as ( + where C.sha1 = content_id) + select CR.sha1 as content, + R.sha1 as revision, + R.date as date, + O.url as origin, + L.path as path + from cntrev as CR + inner join revision as R on (R.id = CR.revision) + inner join location as L on (L.id = CR.location) + left join origin as O on (O.id = R.origin)) + union + (with + cntdir as ( select C.sha1 as sha1, unnest(CD.directory) as directory, unnest(CD.location) as location from content as C inner join content_in_directory as CD on (CD.content = C.id) - where C.sha1 = content_id - ), - cntrev as ( - select CD.sha1 as sha1, - L.path as path, - unnest(DR.revision) as revision, - unnest(DR.location) as prefix - from cntdir as CD - inner join directory_in_revision as DR on (DR.directory = CD.directory) - inner join location as L on (L.id = CD.location) - ) - select CR.sha1 as content, - R.sha1 as revision, - R.date as date, - O.url as origin, - case DL.path - when '' then CR.path - when '.' then CR.path - else (DL.path || '/' || CR.path)::unix_path - end as path - from cntrev as CR - inner join revision as R on (R.id = CR.revision) - inner join location as DL on (DL.id = CR.prefix) - left join origin as O on (O.id = R.origin) - ) - order by date, revision, origin, path limit early_cut + where C.sha1 = content_id), + cntrev as ( + select CD.sha1 as sha1, + L.path as path, + unnest(DR.revision) as revision, + unnest(DR.location) as prefix + from cntdir as CD + inner join directory_in_revision as DR on (DR.directory = CD.directory) + inner join location as L on (L.id = CD.location)) + select CR.sha1 as content, + R.sha1 as revision, + R.date as date, + O.url as origin, + case DL.path + when '' then CR.path + when '.' then CR.path + else (DL.path || '/' || CR.path)::unix_path + end as path + from cntrev as CR + inner join revision as R on (R.id = CR.revision) + inner join location as DL on (DL.id = CR.prefix) + left join origin as O on (O.id = R.origin)) + order by date, revision, origin, path limit early_cut +$$; + +create or replace function swh_provenance_relation_get( + rel_table regclass, src_table regclass, dst_table regclass, filter integer, sha1s sha1_git[] +) + returns table ( + src sha1_git, + dst sha1_git, + path unix_path + ) + language plpgsql + stable +as $$ + declare + src_field text; + dst_field text; + proj_unnested text; + proj_location text; + join_location text; + filter_inner_result text; + filter_outer_result text; + begin + if rel_table = 'revision_before_revision'::regclass then + src_field := 'prev'; + dst_field := 'next'; + else + src_field := src_table::text; + dst_field := dst_table::text; + end if; + + if src_table in ('content'::regclass, 'directory'::regclass) then + proj_unnested := 'unnest(R.' || dst_field || ') as dst, unnest(R.location) as loc'; + join_location := 'inner join location as L on (L.id = CL.loc)'; + proj_location := 'L.path'; + else + proj_unnested := 'R.' || dst_field || ' as dst'; + join_location := ''; + proj_location := 'NULL::unix_path'; + end if; + + case filter + when 1 then + filter_inner_result := 'where S.sha1 = any($1)'; + filter_outer_result := ''; + when 2 then + filter_inner_result := ''; + filter_outer_result := 'where D.sha1 = any($1)'; + else + filter_inner_result := ''; + filter_outer_result := ''; + end case; + + return query execute format( + 'select CL.src, D.sha1 as dst, ' || proj_location || ' as path + from (select S.sha1 as src, ' || proj_unnested || ' + from %s as R + inner join %s as S on (S.id = R.' || src_field || ') + ' || filter_inner_result || ') as CL + inner join %s as D on (D.id = CL.dst) + ' || join_location || ' + ' || filter_outer_result, + rel_table, src_table, dst_table + ) using sha1s; + end; $$; \else -- -- without path and denormalized -- create or replace function swh_provenance_content_find_first(content_id sha1_git) returns table ( content sha1_git, revision sha1_git, date timestamptz, origin text, path unix_path ) language sql stable as $$ select CL.sha1 as content, R.sha1 as revision, R.date as date, O.url as origin, '\x'::unix_path as path from ( - select C.sha1, unnest(revision) as revision - from content_in_revision as CR - inner join content as C on (C.id = CR.content) - where C.sha1=content_id + select C.sha1, unnest(revision) as revision + from content_in_revision as CR + inner join content as C on (C.id = CR.content) + where C.sha1=content_id ) as CL inner join revision as R on (R.id = CL.revision) left join origin as O on (O.id = R.origin) order by date, revision, origin, path asc limit 1 $$; - create or replace function swh_provenance_content_find_all(content_id sha1_git, early_cut int) returns table ( content sha1_git, revision sha1_git, date timestamptz, origin text, path unix_path ) language sql stable as $$ - (with cntrev as ( + (with + cntrev as ( select C.sha1 as sha1, unnest(CR.revision) as revision from content_in_revision as CR inner join content as C on (C.id = CR.content) - where C.sha1 = content_id - ) - select CR.sha1 as content, - R.sha1 as revision, - R.date as date, - O.url as origin, - '\x'::unix_path as path - from cntrev as CR - inner join revision as R on (R.id = CR.revision) - left join origin as O on (O.id = R.origin) + where C.sha1 = content_id) + select CR.sha1 as content, + R.sha1 as revision, + R.date as date, + O.url as origin, + '\x'::unix_path as path + from cntrev as CR + inner join revision as R on (R.id = CR.revision) + left join origin as O on (O.id = R.origin)) + union + (with + cntdir as ( + select C.sha1 as sha1, + unnest(CD.directory) as directory + from content as C + inner join content_in_directory as CD on (CD.content = C.id) + where C.sha1 = content_id), + cntrev as ( + select CD.sha1 as sha1, + unnest(DR.revision) as revision + from cntdir as CD + inner join directory_in_revision as DR on (DR.directory = CD.directory)) + select CR.sha1 as content, + R.sha1 as revision, + R.date as date, + O.url as origin, + '\x'::unix_path as path + from cntrev as CR + inner join revision as R on (R.id = CR.revision) + left join origin as O on (O.id = R.origin)) + order by date, revision, origin, path limit early_cut +$$; + +create or replace function swh_provenance_relation_get( + rel_table regclass, src_table regclass, dst_table regclass, filter integer, sha1s sha1_git[] +) + returns table ( + src sha1_git, + dst sha1_git, + path unix_path ) - union - (with cntdir as ( - select C.sha1 as sha1, - unnest(CD.directory) as directory - from content as C - inner join content_in_directory as CD on (CD.content = C.id) - where C.sha1 = content_id - ), - cntrev as ( - select CD.sha1 as sha1, - unnest(DR.revision) as revision - from cntdir as CD - inner join directory_in_revision as DR on (DR.directory = CD.directory) - ) - select CR.sha1 as content, - R.sha1 as revision, - R.date as date, - O.url as origin, - '\x'::unix_path as path - from cntrev as CR - inner join revision as R on (R.id = CR.revision) - left join origin as O on (O.id = R.origin) - ) - order by date, revision, origin, path limit early_cut + language plpgsql + stable +as $$ + declare + src_field text; + dst_field text; + proj_unnested text; + proj_location text; + join_location text; + filter_inner_result text; + filter_outer_result text; + begin + if rel_table = 'revision_before_revision'::regclass then + src_field := 'prev'; + dst_field := 'next'; + else + src_field := src_table::text; + dst_field := dst_table::text; + end if; + + if src_table in ('content'::regclass, 'directory'::regclass) then + proj_unnested := 'unnest(R.' || dst_field || ') as dst'; + else + proj_unnested := 'R.' || dst_field || ' as dst'; + end if; + + case filter + when 1 then + filter_inner_result := 'where S.sha1 = any($1)'; + filter_outer_result := ''; + when 2 then + filter_inner_result := ''; + filter_outer_result := 'where D.sha1 = any($1)'; + else + filter_inner_result := ''; + filter_outer_result := ''; + end case; + + return query execute format( + 'select CL.src, D.sha1 as dst, NULL::unix_path as path + from (select S.sha1 as src, ' || proj_unnested || ' + from %s as R + inner join %s as S on (S.id = R.' || src_field || ') + ' || filter_inner_result || ') as CL + inner join %s as D on (D.id = CL.dst) + ' || filter_outer_result, + rel_table, src_table, dst_table + ) using sha1s; + end; $$; \endif -- :dbflavor_with_path \endif -- :dbflavor_norm diff --git a/swh/provenance/tests/conftest.py b/swh/provenance/tests/conftest.py index e9cb748..67d98c5 100644 --- a/swh/provenance/tests/conftest.py +++ b/swh/provenance/tests/conftest.py @@ -1,153 +1,153 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timedelta, timezone from os import path from typing import Any, Dict, Iterable, Iterator from _pytest.fixtures import SubRequest import msgpack import psycopg2.extensions import pymongo.database import pytest from pytest_postgresql.factories import postgresql from swh.journal.serializers import msgpack_ext_hook from swh.provenance import get_provenance, get_provenance_storage from swh.provenance.api.client import RemoteProvenanceStorage import swh.provenance.api.server as server from swh.provenance.archive import ArchiveInterface from swh.provenance.interface import ProvenanceInterface, ProvenanceStorageInterface from swh.provenance.storage.archive import ArchiveStorage from swh.storage.interface import StorageInterface from swh.storage.replay import process_replay_objects @pytest.fixture( params=[ "with-path", "without-path", "with-path-denormalized", "without-path-denormalized", ] ) def provenance_postgresqldb( request: SubRequest, postgresql: psycopg2.extensions.connection, ) -> Dict[str, str]: """return a working and initialized provenance db""" from swh.core.cli.db import populate_database_for_package populate_database_for_package( "swh.provenance", postgresql.dsn, flavor=request.param ) return postgresql.get_dsn_parameters() # the Flask app used as server in these tests @pytest.fixture def app( provenance_postgresqldb: Dict[str, str] ) -> Iterator[server.ProvenanceStorageServerApp]: assert hasattr(server, "storage") server.storage = get_provenance_storage( cls="postgresql", db=provenance_postgresqldb ) yield server.app # the RPCClient class used as client used in these tests @pytest.fixture def swh_rpc_client_class() -> type: return RemoteProvenanceStorage -@pytest.fixture(params=["mongodb"]) +@pytest.fixture(params=["mongodb", "postgresql", "remote"]) def provenance_storage( request: SubRequest, provenance_postgresqldb: Dict[str, str], mongodb: pymongo.database.Database, swh_rpc_client: RemoteProvenanceStorage, ) -> ProvenanceStorageInterface: """Return a working and initialized ProvenanceStorageInterface object""" if request.param == "remote": assert isinstance(swh_rpc_client, ProvenanceStorageInterface) return swh_rpc_client elif request.param == "mongodb": from swh.provenance.mongo.backend import ProvenanceStorageMongoDb return ProvenanceStorageMongoDb(mongodb) else: # in test sessions, we DO want to raise any exception occurring at commit time return get_provenance_storage( cls=request.param, db=provenance_postgresqldb, raise_on_commit=True ) provenance_postgresql = postgresql("postgresql_proc", dbname="provenance_tests") @pytest.fixture def provenance( provenance_postgresql: psycopg2.extensions.connection, ) -> ProvenanceInterface: """Return a working and initialized ProvenanceInterface object""" from swh.core.cli.db import populate_database_for_package populate_database_for_package( "swh.provenance", provenance_postgresql.dsn, flavor="with-path" ) # in test sessions, we DO want to raise any exception occurring at commit time return get_provenance( cls="postgresql", db=provenance_postgresql.get_dsn_parameters(), raise_on_commit=True, ) @pytest.fixture def archive(swh_storage: StorageInterface) -> ArchiveInterface: """Return an ArchiveStorage-based ArchiveInterface object""" return ArchiveStorage(swh_storage) def get_datafile(fname: str) -> str: return path.join(path.dirname(__file__), "data", fname) def load_repo_data(repo: str) -> Dict[str, Any]: data: Dict[str, Any] = {} with open(get_datafile(f"{repo}.msgpack"), "rb") as fobj: unpacker = msgpack.Unpacker( fobj, raw=False, ext_hook=msgpack_ext_hook, strict_map_key=False, timestamp=3, # convert Timestamp in datetime objects (tz UTC) ) for objtype, objd in unpacker: data.setdefault(objtype, []).append(objd) return data def filter_dict(d: Dict[Any, Any], keys: Iterable[Any]) -> Dict[Any, Any]: return {k: v for (k, v) in d.items() if k in keys} def fill_storage(storage: StorageInterface, data: Dict[str, Any]) -> None: process_replay_objects(data, storage=storage) # TODO: remove this function in favour of TimestampWithTimezone.to_datetime # from swh.model.model def ts2dt(ts: Dict[str, Any]) -> datetime: timestamp = datetime.fromtimestamp( ts["timestamp"]["seconds"], timezone(timedelta(minutes=ts["offset"])) ) return timestamp.replace(microsecond=ts["timestamp"]["microseconds"])