diff --git a/swh/provenance/postgresql/provenance.py b/swh/provenance/postgresql/provenance.py index 624ccae..ecc61f2 100644 --- a/swh/provenance/postgresql/provenance.py +++ b/swh/provenance/postgresql/provenance.py @@ -1,391 +1,286 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime import itertools import logging from typing import Dict, Generator, Iterable, List, Optional, Set import psycopg2.extensions import psycopg2.extras from typing_extensions import Literal from swh.core.db import BaseDb from swh.model.model import Sha1Git from ..interface import ( EntityType, ProvenanceResult, RelationData, RelationType, RevisionData, ) class ProvenanceStoragePostgreSql: def __init__( self, conn: psycopg2.extensions.connection, raise_on_commit: bool = False ) -> None: BaseDb.adapt_conn(conn) conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) conn.set_session(autocommit=True) self.conn = conn self.cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) # XXX: not sure this is the best place to do it! sql = "SET timezone TO 'UTC'" self.cursor.execute(sql) self._flavor: Optional[str] = None self.raise_on_commit = raise_on_commit @property def flavor(self) -> str: if self._flavor is None: sql = "SELECT swh_get_dbflavor() AS flavor" self.cursor.execute(sql) self._flavor = self.cursor.fetchone()["flavor"] assert self._flavor is not None return self._flavor @property def denormalized(self) -> bool: return "denormalized" in self.flavor def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: sql = "SELECT * FROM swh_provenance_content_find_first(%s)" self.cursor.execute(sql, (id,)) row = self.cursor.fetchone() return ProvenanceResult(**row) if row is not None else None def content_find_all( self, id: Sha1Git, limit: Optional[int] = None ) -> Generator[ProvenanceResult, None, None]: sql = "SELECT * FROM swh_provenance_content_find_all(%s, %s)" self.cursor.execute(sql, (id, limit)) yield from (ProvenanceResult(**row) for row in self.cursor.fetchall()) def content_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: return self._entity_set_date("content", dates) def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: return self._entity_get_date("content", ids) def directory_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: return self._entity_set_date("directory", dates) def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: return self._entity_get_date("directory", ids) def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]: sql = f"SELECT sha1 FROM {entity.value}" self.cursor.execute(sql) return {row["sha1"] for row in self.cursor.fetchall()} def location_get(self) -> Set[bytes]: sql = "SELECT location.path AS path FROM location" self.cursor.execute(sql) return {row["path"] for row in self.cursor.fetchall()} def origin_set_url(self, urls: Dict[Sha1Git, str]) -> bool: try: if urls: sql = """ LOCK TABLE ONLY origin; INSERT INTO origin(sha1, url) VALUES %s ON CONFLICT DO NOTHING """ psycopg2.extras.execute_values(self.cursor, sql, urls.items()) return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") if self.raise_on_commit: raise return False def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]: urls: Dict[Sha1Git, str] = {} sha1s = tuple(ids) if sha1s: # TODO: consider splitting this query in several ones if sha1s is too big! values = ", ".join(itertools.repeat("%s", len(sha1s))) sql = f""" SELECT sha1, url FROM origin WHERE sha1 IN ({values}) """ self.cursor.execute(sql, sha1s) urls.update((row["sha1"], row["url"]) for row in self.cursor.fetchall()) return urls def revision_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: return self._entity_set_date("revision", dates) def revision_set_origin(self, origins: Dict[Sha1Git, Sha1Git]) -> bool: try: if origins: sql = """ LOCK TABLE ONLY revision; INSERT INTO revision(sha1, origin) (SELECT V.rev AS sha1, O.id AS origin FROM (VALUES %s) AS V(rev, org) JOIN origin AS O ON (O.sha1=V.org)) ON CONFLICT (sha1) DO UPDATE SET origin=EXCLUDED.origin """ psycopg2.extras.execute_values(self.cursor, sql, origins.items()) return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") if self.raise_on_commit: raise return False def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]: result: Dict[Sha1Git, RevisionData] = {} sha1s = tuple(ids) if sha1s: # TODO: consider splitting this query in several ones if sha1s is too big! values = ", ".join(itertools.repeat("%s", len(sha1s))) sql = f""" SELECT R.sha1, R.date, O.sha1 AS origin FROM revision AS R LEFT JOIN origin AS O ON (O.id=R.origin) WHERE R.sha1 IN ({values}) """ self.cursor.execute(sql, sha1s) result.update( (row["sha1"], RevisionData(date=row["date"], origin=row["origin"])) for row in self.cursor.fetchall() ) return result def relation_add( self, relation: RelationType, data: Iterable[RelationData] ) -> bool: try: - rows = tuple((rel.src, rel.dst, rel.path) for rel in data) + rows = [(rel.src, rel.dst, rel.path) for rel in data] if rows: - table = relation.value - src, *_, dst = table.split("_") + rel_table = relation.value + src_table, *_, dst_table = rel_table.split("_") - if src != "origin": + if src_table != "origin": # Origin entries should be inserted previously as they require extra # non-null information srcs = tuple(set((sha1,) for (sha1, _, _) in rows)) sql = f""" - LOCK TABLE ONLY {src}; - INSERT INTO {src}(sha1) VALUES %s + LOCK TABLE ONLY {src_table}; + INSERT INTO {src_table}(sha1) VALUES %s ON CONFLICT DO NOTHING """ psycopg2.extras.execute_values(self.cursor, sql, srcs) - if dst != "origin": + if dst_table != "origin": # Origin entries should be inserted previously as they require extra # non-null information dsts = tuple(set((sha1,) for (_, sha1, _) in rows)) sql = f""" - LOCK TABLE ONLY {dst}; - INSERT INTO {dst}(sha1) VALUES %s + LOCK TABLE ONLY {dst_table}; + INSERT INTO {dst_table}(sha1) VALUES %s ON CONFLICT DO NOTHING """ psycopg2.extras.execute_values(self.cursor, sql, dsts) - ######################################################################## - if self.denormalized: - if self.with_path(): - if src in ("content", "directory"): - locations = tuple(set((path,) for (_, _, path) in rows)) - sql = """ - LOCK TABLE ONLY location; - INSERT INTO location(path) VALUES %s - ON CONFLICT (path) DO NOTHING - """ - psycopg2.extras.execute_values(self.cursor, sql, locations) - - selected = "ARRAY_AGG(D.id), ARRAY_AGG(L.id)" - join_location = ( - "INNER JOIN location AS L ON (L.path=V.path)" - ) - grouped_by = "GROUP BY S.id" - on_conflict = f""" - ({src}) DO UPDATE - SET {dst}=ARRAY( - SELECT UNNEST({table}.{dst} || EXCLUDED.{dst}) - ), location=ARRAY( - SELECT UNNEST({table}.location || EXCLUDED.location) - ) - """ - else: - selected = "D.id" - join_location = "" - grouped_by = "" - on_conflict = "DO NOTHING" - - sql_l = [ - f"INSERT INTO {table}", - f" SELECT S.id, {selected}", - " FROM (VALUES %s) AS V(src, dst, path)", - f" INNER JOIN {src} AS S ON (S.sha1=V.src)", - f" INNER JOIN {dst} AS D ON (D.sha1=V.dst)", - join_location, - grouped_by, - f" ON CONFLICT {on_conflict}", - ] - - else: - if src in ("content", "directory"): - select_dst = "ARRAY_AGG(D.id)" - grouped_by = "GROUP BY S.id" - on_conflict = f""" - ({src}) DO UPDATE - SET {dst}=ARRAY( - SELECT UNNEST({table}.{dst} || EXCLUDED.{dst}) - ), location=ARRAY( - SELECT UNNEST({table}.location || EXCLUDED.location) - ) - """ - else: - select_dst = "D.id" - grouped_by = "" - on_conflict = "DO NOTHING" - - sql_l = [ - f"INSERT INTO {table}", - f" SELECT S.id, {select_dst}", - " FROM (VALUES %s) AS V(src, dst, path)", - f" INNER JOIN {src} AS S ON (S.sha1=V.src)", - f" INNER JOIN {dst} AS D ON (D.sha1=V.dst)", - grouped_by, - f" ON CONFLICT {on_conflict}", - ] - - else: - if self.with_path(): - if src in ("content", "directory"): - locations = tuple(set((path,) for (_, _, path) in rows)) - sql = """ - LOCK TABLE ONLY location; - INSERT INTO location(path) VALUES %s - ON CONFLICT (path) DO NOTHING - """ - psycopg2.extras.execute_values(self.cursor, sql, locations) - - select_location = ", L.id" - join_location = ( - "INNER JOIN location AS L ON (L.path=V.path)" - ) - else: - select_location = "" - join_location = "" - - sql_l = [ - f"INSERT INTO {table}", - f" SELECT S.id, D.id{select_location}", - " FROM (VALUES %s) AS V(src, dst, path)", - f" INNER JOIN {src} AS S ON (S.sha1=V.src)", - f" INNER JOIN {dst} AS D ON (D.sha1=V.dst)", - join_location, - " ON CONFLICT DO NOTHING", - ] - - else: - sql_l = [ - f"INSERT INTO {table}", - " SELECT S.id, D.id", - " FROM (VALUES %s) AS V(src, dst, path)", - f" INNER JOIN {src} AS S ON (S.sha1=V.src)", - f" INNER JOIN {dst} AS D ON (D.sha1=V.dst)", - " ON CONFLICT DO NOTHING", - ] - ######################################################################## - - sql = "\n".join(sql_l) - psycopg2.extras.execute_values(self.cursor, sql, rows) + sql = """ + SELECT * FROM swh_provenance_relation_add( + %s, %s, %s, %s::rel_row[] + ) + """ + self.cursor.execute(sql, (rel_table, src_table, dst_table, rows)) return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") if self.raise_on_commit: raise return False def relation_get( self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False ) -> Set[RelationData]: return self._relation_get(relation, ids, reverse) def relation_get_all(self, relation: RelationType) -> Set[RelationData]: return self._relation_get(relation, None) def _entity_get_date( self, entity: Literal["content", "directory", "revision"], ids: Iterable[Sha1Git], ) -> Dict[Sha1Git, datetime]: dates: Dict[Sha1Git, datetime] = {} sha1s = tuple(ids) if sha1s: # TODO: consider splitting this query in several ones if sha1s is too big! values = ", ".join(itertools.repeat("%s", len(sha1s))) sql = f""" SELECT sha1, date FROM {entity} WHERE sha1 IN ({values}) AND date IS NOT NULL """ self.cursor.execute(sql, sha1s) dates.update((row["sha1"], row["date"]) for row in self.cursor.fetchall()) return dates def _entity_set_date( self, entity: Literal["content", "directory", "revision"], data: Dict[Sha1Git, datetime], ) -> bool: try: if data: sql = f""" LOCK TABLE ONLY {entity}; INSERT INTO {entity}(sha1, date) VALUES %s ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date) """ psycopg2.extras.execute_values(self.cursor, sql, data.items()) return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") if self.raise_on_commit: raise return False def _relation_get( self, relation: RelationType, ids: Optional[Iterable[Sha1Git]], reverse: bool = False, ) -> Set[RelationData]: result: Set[RelationData] = set() sha1s: List[Sha1Git] if ids is not None: sha1s = list(ids) filter = 1 if not reverse else 2 else: sha1s = [] filter = 0 if filter == 0 or sha1s: rel_table = relation.value src_table, *_, dst_table = rel_table.split("_") sql = "SELECT * FROM swh_provenance_relation_get(%s, %s, %s, %s, %s)" self.cursor.execute(sql, (rel_table, src_table, dst_table, filter, sha1s)) result.update(RelationData(**row) for row in self.cursor.fetchall()) return result def with_path(self) -> bool: return "with-path" in self.flavor diff --git a/swh/provenance/sql/30-schema.sql b/swh/provenance/sql/30-schema.sql index 4940367..72939d6 100644 --- a/swh/provenance/sql/30-schema.sql +++ b/swh/provenance/sql/30-schema.sql @@ -1,151 +1,155 @@ -- psql variables to get the current database flavor select position('denormalized' in swh_get_dbflavor()::text) = 0 as dbflavor_norm \gset create table dbversion ( version int primary key, release timestamptz, description text ); comment on table dbversion is 'Details of current db version'; comment on column dbversion.version is 'SQL schema version'; comment on column dbversion.release is 'Version deployment timestamp'; comment on column dbversion.description is 'Release description'; -- latest schema version insert into dbversion(version, release, description) values(1, now(), 'Work In Progress'); -- a Git object ID, i.e., a Git-style salted SHA1 checksum create domain sha1_git as bytea check (length(value) = 20); -- UNIX path (absolute, relative, individual path component, etc.) create domain unix_path as bytea; +-- a relation entry row, i.e. sr/dst Git object ID and optional UNIX path +create type rel_row as (src sha1_git, dst sha1_git, path unix_path); + + -- entity tables create table content ( id bigserial primary key, -- internal identifier of the content blob sha1 sha1_git unique not null, -- intrinsic identifier of the content blob date timestamptz -- timestamp of the revision where the blob appears early ); comment on column content.id is 'Content internal identifier'; comment on column content.sha1 is 'Content intrinsic identifier'; comment on column content.date is 'Earliest timestamp for the content (first seen time)'; create table directory ( id bigserial primary key, -- internal identifier of the directory appearing in an isochrone inner frontier sha1 sha1_git unique not null, -- intrinsic identifier of the directory date timestamptz -- max timestamp among those of the directory children's ); comment on column directory.id is 'Directory internal identifier'; comment on column directory.sha1 is 'Directory intrinsic identifier'; comment on column directory.date is 'Latest timestamp for the content in the directory'; create table revision ( id bigserial primary key, -- internal identifier of the revision sha1 sha1_git unique not null, -- intrinsic identifier of the revision date timestamptz, -- timestamp of the revision origin bigint -- id of the preferred origin - -- foreign key (org) references origin (id) + -- foreign key (origin) references origin (id) ); comment on column revision.id is 'Revision internal identifier'; comment on column revision.sha1 is 'Revision intrinsic identifier'; comment on column revision.date is 'Revision timestamp'; comment on column revision.origin is 'preferred origin for the revision'; create table location ( id bigserial primary key, -- internal identifier of the location path unix_path unique not null -- path to the location ); comment on column location.id is 'Location internal identifier'; comment on column location.path is 'Path to the location'; create table origin ( id bigserial primary key, -- internal identifier of the origin sha1 sha1_git unique not null, -- intrinsic identifier of the origin url text unique not null -- url of the origin ); comment on column origin.id is 'Origin internal identifier'; comment on column origin.sha1 is 'Origin intrinsic identifier'; comment on column origin.url is 'URL of the origin'; -- relation tables create table content_in_revision ( content bigint not null, -- internal identifier of the content blob \if :dbflavor_norm revision bigint not null, -- internal identifier of the revision where the blob appears for the first time location bigint -- location of the content relative to the revision root directory \else revision bigint[], -- internal identifier of the revision where the blob appears for the first time location bigint[] -- location of the content relative to the revision root directory \endif - -- foreign key (blob) references content (id), - -- foreign key (rev) references revision (id), - -- foreign key (loc) references location (id) + -- foreign key (content) references content (id), + -- foreign key (revision) references revision (id), + -- foreign key (location) references location (id) ); comment on column content_in_revision.content is 'Content internal identifier'; comment on column content_in_revision.revision is 'Revision internal identifier'; comment on column content_in_revision.location is 'Location of content in revision'; create table content_in_directory ( content bigint not null, -- internal identifier of the content blob \if :dbflavor_norm directory bigint not null, -- internal identifier of the directory containing the blob location bigint -- location of the content relative to its parent directory in the isochrone frontier \else - directory bigint[], - location bigint[] + directory bigint[], -- internal identifier of the directory containing the blob + location bigint[] -- location of the content relative to its parent directory in the isochrone frontier \endif - -- foreign key (blob) references content (id), - -- foreign key (dir) references directory (id), - -- foreign key (loc) references location (id) + -- foreign key (content) references content (id), + -- foreign key (directory) references directory (id), + -- foreign key (location) references location (id) ); comment on column content_in_directory.content is 'Content internal identifier'; comment on column content_in_directory.directory is 'Directory internal identifier'; comment on column content_in_directory.location is 'Location of content in directory'; create table directory_in_revision ( directory bigint not null, -- internal identifier of the directory appearing in the revision \if :dbflavor_norm revision bigint not null, -- internal identifier of the revision containing the directory location bigint -- location of the directory relative to the revision root directory \else - revision bigint[], - location bigint[] + revision bigint[], -- internal identifier of the revision containing the directory + location bigint[] -- location of the directory relative to the revision root directory \endif - -- foreign key (dir) references directory (id), - -- foreign key (rev) references revision (id), - -- foreign key (loc) references location (id) + -- foreign key (directory) references directory (id), + -- foreign key (revision) references revision (id), + -- foreign key (location) references location (id) ); comment on column directory_in_revision.directory is 'Directory internal identifier'; comment on column directory_in_revision.revision is 'Revision internal identifier'; comment on column directory_in_revision.location is 'Location of directory in revision'; create table revision_in_origin ( revision bigint not null, -- internal identifier of the revision poined by the origin origin bigint not null -- internal identifier of the origin that points to the revision - -- foreign key (rev) references revision (id), - -- foreign key (org) references origin (id) + -- foreign key (revision) references revision (id), + -- foreign key (origin) references origin (id) ); comment on column revision_in_origin.revision is 'Revision internal identifier'; comment on column revision_in_origin.origin is 'Origin internal identifier'; create table revision_before_revision ( prev bigserial not null, -- internal identifier of the source revision next bigserial not null -- internal identifier of the destination revision -- foreign key (prev) references revision (id), -- foreign key (next) references revision (id) ); comment on column revision_before_revision.prev is 'Source revision internal identifier'; comment on column revision_before_revision.next is 'Destination revision internal identifier'; diff --git a/swh/provenance/sql/40-funcs.sql b/swh/provenance/sql/40-funcs.sql index 50449b4..076ee25 100644 --- a/swh/provenance/sql/40-funcs.sql +++ b/swh/provenance/sql/40-funcs.sql @@ -1,552 +1,714 @@ -- psql variables to get the current database flavor select position('denormalized' in swh_get_dbflavor()::text) = 0 as dbflavor_norm \gset select position('with-path' in swh_get_dbflavor()::text) != 0 as dbflavor_with_path \gset \if :dbflavor_norm \if :dbflavor_with_path -- -- with path and normalized -- create or replace function swh_provenance_content_find_first(content_id sha1_git) returns table ( content sha1_git, revision sha1_git, date timestamptz, origin text, path unix_path ) language sql stable as $$ select C.sha1 as content, R.sha1 as revision, R.date as date, O.url as origin, L.path as path from content as C inner join content_in_revision as CR on (CR.content = C.id) inner join location as L on (L.id = CR.location) inner join revision as R on (R.id = CR.revision) left join origin as O on (O.id = R.origin) where C.sha1 = content_id order by date, revision, origin, path asc limit 1 $$; create or replace function swh_provenance_content_find_all(content_id sha1_git, early_cut int) returns table ( content sha1_git, revision sha1_git, date timestamptz, origin text, path unix_path ) language sql stable as $$ (select C.sha1 as content, R.sha1 as revision, R.date as date, O.url as origin, L.path as path from content as C inner join content_in_revision as CR on (CR.content = C.id) inner join location as L on (L.id = CR.location) inner join revision as R on (R.id = CR.revision) left join origin as O on (O.id = R.origin) where C.sha1 = content_id) union (select C.sha1 as content, R.sha1 as revision, R.date as date, O.url as origin, case DL.path when '' then CL.path when '.' then CL.path else (DL.path || '/' || CL.path)::unix_path end as path from content as C inner join content_in_directory as CD on (CD.content = C.id) inner join directory_in_revision as DR on (DR.directory = CD.directory) inner join revision as R on (R.id = DR.revision) inner join location as CL on (CL.id = CD.location) inner join location as DL on (DL.id = DR.location) left join origin as O on (O.id = R.origin) where C.sha1 = content_id) order by date, revision, origin, path limit early_cut $$; +create or replace function swh_provenance_relation_add( + rel_table regclass, src_table regclass, dst_table regclass, rel_data rel_row[] +) + returns void + language plpgsql + volatile +as $$ + declare + select_fields text; + join_location text; + begin + if src_table in ('content'::regclass, 'directory'::regclass) then + lock table only location; + insert into location(path) + select V.path + from unnest(rel_data) as V + on conflict (path) do nothing; + + select_fields := 'D.id, L.id'; + join_location := 'inner join location as L on (L.path = V.path)'; + else + select_fields := 'D.id'; + join_location := ''; + end if; + + execute format( + 'lock table only %s; + insert into %s + select S.id, ' || select_fields || ' + from unnest($1) as V + inner join %s as S on (S.sha1 = V.src) + inner join %s as D on (D.sha1 = V.dst) + ' || join_location || ' + on conflict do nothing', + rel_table, rel_table, src_table, dst_table + ) using rel_data; + end; +$$; + create or replace function swh_provenance_relation_get( rel_table regclass, src_table regclass, dst_table regclass, filter integer, sha1s sha1_git[] ) returns table ( src sha1_git, dst sha1_git, path unix_path ) language plpgsql stable as $$ declare src_field text; dst_field text; join_location text; proj_location text; filter_result text; begin if rel_table = 'revision_before_revision'::regclass then src_field := 'prev'; dst_field := 'next'; else src_field := src_table::text; dst_field := dst_table::text; end if; if src_table in ('content'::regclass, 'directory'::regclass) then join_location := 'inner join location as L on (L.id = R.location)'; proj_location := 'L.path'; else join_location := ''; proj_location := 'NULL::unix_path'; end if; case filter when 1 then filter_result := 'where S.sha1 = any($1)'; when 2 then filter_result := 'where D.sha1 = any($1)'; else filter_result := ''; end case; return query execute format( 'select S.sha1 as src, D.sha1 as dst, ' || proj_location || ' as path from %s as R inner join %s as S on (S.id = R.' || src_field || ') inner join %s as D on (D.id = R.' || dst_field || ') ' || join_location || ' ' || filter_result, rel_table, src_table, dst_table ) using sha1s; end; $$; \else -- -- without path and normalized -- create or replace function swh_provenance_content_find_first(content_id sha1_git) returns table ( content sha1_git, revision sha1_git, date timestamptz, origin text, path unix_path ) language sql stable as $$ select C.sha1 as content, R.sha1 as revision, R.date as date, O.url as origin, '\x'::unix_path as path from content as C inner join content_in_revision as CR on (CR.content = C.id) inner join revision as R on (R.id = CR.revision) left join origin as O on (O.id = R.origin) where C.sha1 = content_id order by date, revision, origin asc limit 1 $$; create or replace function swh_provenance_content_find_all(content_id sha1_git, early_cut int) returns table ( content sha1_git, revision sha1_git, date timestamptz, origin text, path unix_path ) language sql stable as $$ (select C.sha1 as content, R.sha1 as revision, R.date as date, O.url as origin, '\x'::unix_path as path from content as C inner join content_in_revision as CR on (CR.content = C.id) inner join revision as R on (R.id = CR.revision) left join origin as O on (O.id = R.origin) where C.sha1 = content_id) union (select C.sha1 as content, R.sha1 as revision, R.date as date, O.url as origin, '\x'::unix_path as path from content as C inner join content_in_directory as CD on (CD.content = C.id) inner join directory_in_revision as DR on (DR.directory = CD.directory) inner join revision as R on (R.id = DR.revision) left join origin as O on (O.id = R.origin) where C.sha1 = content_id) order by date, revision, origin, path limit early_cut $$; +create or replace function swh_provenance_relation_add( + rel_table regclass, src_table regclass, dst_table regclass, rel_data rel_row[] +) + returns void + language plpgsql + volatile +as $$ + begin + execute format( + 'lock table only %s; + insert into %s + select S.id, D.id + from unnest($1) as V + inner join %s as S on (S.sha1 = V.src) + inner join %s as D on (D.sha1 = V.dst) + on conflict do nothing', + rel_table, rel_table, src_table, dst_table + ) using rel_data; + end; +$$; + create or replace function swh_provenance_relation_get( rel_table regclass, src_table regclass, dst_table regclass, filter integer, sha1s sha1_git[] ) returns table ( src sha1_git, dst sha1_git, path unix_path ) language plpgsql stable as $$ declare src_field text; dst_field text; filter_result text; begin if rel_table = 'revision_before_revision'::regclass then src_field := 'prev'; dst_field := 'next'; else src_field := src_table::text; dst_field := dst_table::text; end if; case filter when 1 then filter_result := 'where S.sha1 = any($1)'; when 2 then filter_result := 'where D.sha1 = any($1)'; else filter_result := ''; end case; return query execute format( 'select S.sha1 as src, D.sha1 as dst, NULL::unix_path as path from %s as R inner join %s as S on (S.id = R.' || src_field || ') inner join %s as D on (D.id = R.' || dst_field || ') ' || filter_result, rel_table, src_table, dst_table ) using sha1s; end; $$; -- :dbflavor_with_path \endif -- :dbflavor_norm \else \if :dbflavor_with_path -- -- with path and denormalized -- create or replace function swh_provenance_content_find_first(content_id sha1_git) returns table ( content sha1_git, revision sha1_git, date timestamptz, origin text, path unix_path ) language sql stable as $$ select CL.sha1 as content, R.sha1 as revision, R.date as date, O.url as origin, L.path as path from ( select C.sha1 as sha1, unnest(CR.revision) as revision, unnest(CR.location) as location from content_in_revision as CR inner join content as C on (C.id = CR.content) where C.sha1 = content_id ) as CL inner join revision as R on (R.id = CL.revision) inner join location as L on (L.id = CL.location) left join origin as O on (O.id = R.origin) order by date, revision, origin, path asc limit 1 $$; create or replace function swh_provenance_content_find_all(content_id sha1_git, early_cut int) returns table ( content sha1_git, revision sha1_git, date timestamptz, origin text, path unix_path ) language sql stable as $$ (with cntrev as ( select C.sha1 as sha1, unnest(CR.revision) as revision, unnest(CR.location) as location from content_in_revision as CR inner join content as C on (C.id = CR.content) where C.sha1 = content_id) select CR.sha1 as content, R.sha1 as revision, R.date as date, O.url as origin, L.path as path from cntrev as CR inner join revision as R on (R.id = CR.revision) inner join location as L on (L.id = CR.location) left join origin as O on (O.id = R.origin)) union (with cntdir as ( select C.sha1 as sha1, unnest(CD.directory) as directory, unnest(CD.location) as location from content as C inner join content_in_directory as CD on (CD.content = C.id) where C.sha1 = content_id), cntrev as ( select CD.sha1 as sha1, L.path as path, unnest(DR.revision) as revision, unnest(DR.location) as prefix from cntdir as CD inner join directory_in_revision as DR on (DR.directory = CD.directory) inner join location as L on (L.id = CD.location)) select CR.sha1 as content, R.sha1 as revision, R.date as date, O.url as origin, case DL.path when '' then CR.path when '.' then CR.path else (DL.path || '/' || CR.path)::unix_path end as path from cntrev as CR inner join revision as R on (R.id = CR.revision) inner join location as DL on (DL.id = CR.prefix) left join origin as O on (O.id = R.origin)) order by date, revision, origin, path limit early_cut $$; +create or replace function swh_provenance_relation_add( + rel_table regclass, src_table regclass, dst_table regclass, rel_data rel_row[] +) + returns void + language plpgsql + volatile +as $$ + declare + select_fields text; + join_location text; + group_entries text; + on_conflict text; + begin + if src_table in ('content'::regclass, 'directory'::regclass) then + lock table only location; + insert into location(path) + select V.path + from unnest(rel_data) as V + on conflict (path) do nothing; + + select_fields := 'array_agg(D.id), array_agg(L.id)'; + join_location := 'inner join location as L on (L.path = V.path)'; + group_entries := 'group by S.id'; + on_conflict := format(' + (%s) do update + set %s=array( + select unnest( + %s.' || dst_table::text || ' || excluded.' || dst_table::text || ' + ) + ), location=array( + select unnest( + %s.location || excluded.location + ) + )', + src_table, dst_table, rel_table, rel_table, rel_table + ); + else + select_fields := 'D.id'; + join_location := ''; + group_entries := ''; + on_conflict := 'do nothing'; + end if; + + execute format( + 'lock table only %s; + insert into %s + select S.id, ' || select_fields || ' + from unnest($1) as V + inner join %s as S on (S.sha1 = V.src) + inner join %s as D on (D.sha1 = V.dst) + ' || join_location || ' + ' || group_entries || ' + on conflict ' || on_conflict, + rel_table, rel_table, src_table, dst_table + ) using rel_data; + end; +$$; + create or replace function swh_provenance_relation_get( rel_table regclass, src_table regclass, dst_table regclass, filter integer, sha1s sha1_git[] ) returns table ( src sha1_git, dst sha1_git, path unix_path ) language plpgsql stable as $$ declare src_field text; dst_field text; proj_unnested text; proj_location text; join_location text; filter_inner_result text; filter_outer_result text; begin if rel_table = 'revision_before_revision'::regclass then src_field := 'prev'; dst_field := 'next'; else src_field := src_table::text; dst_field := dst_table::text; end if; if src_table in ('content'::regclass, 'directory'::regclass) then proj_unnested := 'unnest(R.' || dst_field || ') as dst, unnest(R.location) as loc'; join_location := 'inner join location as L on (L.id = CL.loc)'; proj_location := 'L.path'; else proj_unnested := 'R.' || dst_field || ' as dst'; join_location := ''; proj_location := 'NULL::unix_path'; end if; case filter when 1 then filter_inner_result := 'where S.sha1 = any($1)'; filter_outer_result := ''; when 2 then filter_inner_result := ''; filter_outer_result := 'where D.sha1 = any($1)'; else filter_inner_result := ''; filter_outer_result := ''; end case; return query execute format( 'select CL.src, D.sha1 as dst, ' || proj_location || ' as path from (select S.sha1 as src, ' || proj_unnested || ' from %s as R inner join %s as S on (S.id = R.' || src_field || ') ' || filter_inner_result || ') as CL inner join %s as D on (D.id = CL.dst) ' || join_location || ' ' || filter_outer_result, rel_table, src_table, dst_table ) using sha1s; end; $$; \else -- -- without path and denormalized -- create or replace function swh_provenance_content_find_first(content_id sha1_git) returns table ( content sha1_git, revision sha1_git, date timestamptz, origin text, path unix_path ) language sql stable as $$ select CL.sha1 as content, R.sha1 as revision, R.date as date, O.url as origin, '\x'::unix_path as path from ( select C.sha1, unnest(revision) as revision from content_in_revision as CR inner join content as C on (C.id = CR.content) where C.sha1=content_id ) as CL inner join revision as R on (R.id = CL.revision) left join origin as O on (O.id = R.origin) order by date, revision, origin, path asc limit 1 $$; create or replace function swh_provenance_content_find_all(content_id sha1_git, early_cut int) returns table ( content sha1_git, revision sha1_git, date timestamptz, origin text, path unix_path ) language sql stable as $$ (with cntrev as ( select C.sha1 as sha1, unnest(CR.revision) as revision from content_in_revision as CR inner join content as C on (C.id = CR.content) where C.sha1 = content_id) select CR.sha1 as content, R.sha1 as revision, R.date as date, O.url as origin, '\x'::unix_path as path from cntrev as CR inner join revision as R on (R.id = CR.revision) left join origin as O on (O.id = R.origin)) union (with cntdir as ( select C.sha1 as sha1, unnest(CD.directory) as directory from content as C inner join content_in_directory as CD on (CD.content = C.id) where C.sha1 = content_id), cntrev as ( select CD.sha1 as sha1, unnest(DR.revision) as revision from cntdir as CD inner join directory_in_revision as DR on (DR.directory = CD.directory)) select CR.sha1 as content, R.sha1 as revision, R.date as date, O.url as origin, '\x'::unix_path as path from cntrev as CR inner join revision as R on (R.id = CR.revision) left join origin as O on (O.id = R.origin)) order by date, revision, origin, path limit early_cut $$; +create or replace function swh_provenance_relation_add( + rel_table regclass, src_table regclass, dst_table regclass, rel_data rel_row[] +) + returns void + language plpgsql + volatile +as $$ + declare + select_fields text; + group_entries text; + on_conflict text; + begin + if src_table in ('content'::regclass, 'directory'::regclass) then + select_fields := 'array_agg(D.id)'; + group_entries := 'group by S.id'; + on_conflict := format(' + (%s) do update + set %s=array( + select unnest( + %s.' || dst_table::text || ' || excluded.' || dst_table::text || ' + ) + )', + src_table, dst_table, rel_table, rel_table + ); + else + select_fields := 'D.id'; + group_entries := ''; + on_conflict := 'do nothing'; + end if; + + execute format( + 'lock table only %s; + insert into %s + select S.id, ' || select_fields || ' + from unnest($1) as V + inner join %s as S on (S.sha1 = V.src) + inner join %s as D on (D.sha1 = V.dst) + ' || group_entries || ' + on conflict ' || on_conflict, + rel_table, rel_table, src_table, dst_table + ) using rel_data; + end; +$$; + create or replace function swh_provenance_relation_get( rel_table regclass, src_table regclass, dst_table regclass, filter integer, sha1s sha1_git[] ) returns table ( src sha1_git, dst sha1_git, path unix_path ) language plpgsql stable as $$ declare src_field text; dst_field text; proj_unnested text; proj_location text; join_location text; filter_inner_result text; filter_outer_result text; begin if rel_table = 'revision_before_revision'::regclass then src_field := 'prev'; dst_field := 'next'; else src_field := src_table::text; dst_field := dst_table::text; end if; if src_table in ('content'::regclass, 'directory'::regclass) then proj_unnested := 'unnest(R.' || dst_field || ') as dst'; else proj_unnested := 'R.' || dst_field || ' as dst'; end if; case filter when 1 then filter_inner_result := 'where S.sha1 = any($1)'; filter_outer_result := ''; when 2 then filter_inner_result := ''; filter_outer_result := 'where D.sha1 = any($1)'; else filter_inner_result := ''; filter_outer_result := ''; end case; return query execute format( 'select CL.src, D.sha1 as dst, NULL::unix_path as path from (select S.sha1 as src, ' || proj_unnested || ' from %s as R inner join %s as S on (S.id = R.' || src_field || ') ' || filter_inner_result || ') as CL inner join %s as D on (D.id = CL.dst) ' || filter_outer_result, rel_table, src_table, dst_table ) using sha1s; end; $$; \endif -- :dbflavor_with_path \endif -- :dbflavor_norm