diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py index c688eeb..67195ab 100644 --- a/swh/provenance/provenance.py +++ b/swh/provenance/provenance.py @@ -1,518 +1,502 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime import hashlib import logging import os from types import TracebackType from typing import Dict, Generator, Iterable, Optional, Set, Tuple, Type from typing_extensions import Literal, TypedDict from swh.core.statsd import statsd from swh.model.model import Sha1Git from .interface import ProvenanceInterface from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry from .storage.interface import ( DirectoryData, ProvenanceResult, ProvenanceStorageInterface, RelationData, RelationType, RevisionData, ) from .util import path_normalize LOGGER = logging.getLogger(__name__) BACKEND_DURATION_METRIC = "swh_provenance_backend_duration_seconds" BACKEND_OPERATIONS_METRIC = "swh_provenance_backend_operations_total" class DatetimeCache(TypedDict): data: Dict[Sha1Git, Optional[datetime]] # None means unknown added: Set[Sha1Git] class OriginCache(TypedDict): data: Dict[Sha1Git, str] added: Set[Sha1Git] class RevisionCache(TypedDict): data: Dict[Sha1Git, Sha1Git] added: Set[Sha1Git] class ProvenanceCache(TypedDict): content: DatetimeCache directory: DatetimeCache directory_flatten: Dict[Sha1Git, Optional[bool]] # None means unknown revision: DatetimeCache # below are insertion caches only content_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]] content_in_directory: Set[Tuple[Sha1Git, Sha1Git, bytes]] directory_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]] # these two are for the origin layer origin: OriginCache revision_origin: RevisionCache revision_before_revision: Dict[Sha1Git, Set[Sha1Git]] revision_in_origin: Set[Tuple[Sha1Git, Sha1Git]] def new_cache() -> ProvenanceCache: return ProvenanceCache( content=DatetimeCache(data={}, added=set()), directory=DatetimeCache(data={}, added=set()), directory_flatten={}, revision=DatetimeCache(data={}, added=set()), content_in_revision=set(), content_in_directory=set(), directory_in_revision=set(), origin=OriginCache(data={}, added=set()), revision_origin=RevisionCache(data={}, added=set()), revision_before_revision={}, revision_in_origin=set(), ) class Provenance: MAX_CACHE_ELEMENTS = 40000 def __init__(self, storage: ProvenanceStorageInterface) -> None: self.storage = storage self.cache = new_cache() def __enter__(self) -> ProvenanceInterface: self.open() return self def __exit__( self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType], ) -> None: self.close() def _flush_limit_reached(self) -> bool: return sum(self._get_cache_stats().values()) > self.MAX_CACHE_ELEMENTS def _get_cache_stats(self) -> Dict[str, int]: return { k: len(v["data"]) if (isinstance(v, dict) and v.get("data") is not None) else len(v) # type: ignore for (k, v) in self.cache.items() } def clear_caches(self) -> None: self.cache = new_cache() def close(self) -> None: self.storage.close() @statsd.timed(metric=BACKEND_DURATION_METRIC, tags={"method": "flush"}) def flush(self) -> None: self.flush_revision_content_layer() self.flush_origin_revision_layer() self.clear_caches() def flush_if_necessary(self) -> bool: """Flush if the number of cached information reached a limit.""" LOGGER.debug("Cache stats: %s", self._get_cache_stats()) if self._flush_limit_reached(): self.flush() return True else: return False @statsd.timed( metric=BACKEND_DURATION_METRIC, tags={"method": "flush_origin_revision"} ) def flush_origin_revision_layer(self) -> None: # Origins and revisions should be inserted first so that internal ids' # resolution works properly. urls = { sha1: url for sha1, url in self.cache["origin"]["data"].items() if sha1 in self.cache["origin"]["added"] } if urls: while not self.storage.origin_add(urls): statsd.increment( metric=BACKEND_OPERATIONS_METRIC, tags={"method": "flush_origin_revision_retry_origin"}, ) LOGGER.warning( "Unable to write origins urls to the storage. Retrying..." ) rev_orgs = { # Destinations in this relation should match origins in the next one **{ src: RevisionData(date=None, origin=None) for src in self.cache["revision_before_revision"] }, **{ # This relation comes second so that non-None origins take precedence src: RevisionData(date=None, origin=org) for src, org in self.cache["revision_in_origin"] }, } if rev_orgs: while not self.storage.revision_add(rev_orgs): statsd.increment( metric=BACKEND_OPERATIONS_METRIC, tags={"method": "flush_origin_revision_retry_revision"}, ) LOGGER.warning( "Unable to write revision entities to the storage. Retrying..." ) # Second, flat models for revisions' histories (ie. revision-before-revision). if self.cache["revision_before_revision"]: rev_before_rev = { src: {RelationData(dst=dst, path=None) for dst in dsts} for src, dsts in self.cache["revision_before_revision"].items() } while not self.storage.relation_add( RelationType.REV_BEFORE_REV, rev_before_rev ): statsd.increment( metric=BACKEND_OPERATIONS_METRIC, tags={ "method": "flush_origin_revision_retry_revision_before_revision" }, ) LOGGER.warning( "Unable to write %s rows to the storage. Retrying...", RelationType.REV_BEFORE_REV, ) # Heads (ie. revision-in-origin entries) should be inserted once flat models for # their histories were already added. This is to guarantee consistent results if # something needs to be reprocessed due to a failure: already inserted heads # won't get reprocessed in such a case. if self.cache["revision_in_origin"]: rev_in_org: Dict[Sha1Git, Set[RelationData]] = {} for src, dst in self.cache["revision_in_origin"]: rev_in_org.setdefault(src, set()).add(RelationData(dst=dst, path=None)) while not self.storage.relation_add(RelationType.REV_IN_ORG, rev_in_org): statsd.increment( metric=BACKEND_OPERATIONS_METRIC, tags={"method": "flush_origin_revision_retry_revision_in_origin"}, ) LOGGER.warning( "Unable to write %s rows to the storage. Retrying...", RelationType.REV_IN_ORG, ) @statsd.timed( metric=BACKEND_DURATION_METRIC, tags={"method": "flush_revision_content"} ) def flush_revision_content_layer(self) -> None: # Register in the storage all entities, to ensure the coming relations can # properly resolve any internal reference if needed. Content and directory # entries may safely be registered with their associated dates. In contrast, # revision entries should be registered without date, as it is used to # acknowledge that the flushing was successful. Also, directories are # registered with their flatten flag not set. cnt_dates = { sha1: date for sha1, date in self.cache["content"]["data"].items() if sha1 in self.cache["content"]["added"] and date is not None } if cnt_dates: while not self.storage.content_add(cnt_dates): statsd.increment( metric=BACKEND_OPERATIONS_METRIC, tags={"method": "flush_revision_content_retry_content_date"}, ) LOGGER.warning( "Unable to write content dates to the storage. Retrying..." ) dir_dates = { sha1: DirectoryData(date=date, flat=False) for sha1, date in self.cache["directory"]["data"].items() if sha1 in self.cache["directory"]["added"] and date is not None } if dir_dates: while not self.storage.directory_add(dir_dates): statsd.increment( metric=BACKEND_OPERATIONS_METRIC, tags={"method": "flush_revision_content_retry_directory_date"}, ) LOGGER.warning( "Unable to write directory dates to the storage. Retrying..." ) - - revs = { - sha1: RevisionData(date=None, origin=None) - for sha1, date in self.cache["revision"]["data"].items() - if sha1 in self.cache["revision"]["added"] and date is not None - } - if revs: - while not self.storage.revision_add(revs): - statsd.increment( - metric=BACKEND_OPERATIONS_METRIC, - tags={"method": "flush_revision_content_retry_revision_none"}, - ) - LOGGER.warning( - "Unable to write revision entities to the storage. Retrying..." - ) - paths = { hashlib.sha1(path).digest(): path for _, _, path in self.cache["content_in_revision"] | self.cache["content_in_directory"] | self.cache["directory_in_revision"] } if paths: while not self.storage.location_add(paths): statsd.increment( metric=BACKEND_OPERATIONS_METRIC, tags={"method": "flush_revision_content_retry_location"}, ) LOGGER.warning( "Unable to write locations entities to the storage. Retrying..." ) # For this layer, relations need to be inserted first so that, in case of # failure, reprocessing the input does not generated an inconsistent database. if self.cache["content_in_revision"]: cnt_in_rev: Dict[Sha1Git, Set[RelationData]] = {} for src, dst, path in self.cache["content_in_revision"]: cnt_in_rev.setdefault(src, set()).add(RelationData(dst=dst, path=path)) while not self.storage.relation_add( RelationType.CNT_EARLY_IN_REV, cnt_in_rev ): statsd.increment( metric=BACKEND_OPERATIONS_METRIC, tags={"method": "flush_revision_content_retry_content_in_revision"}, ) LOGGER.warning( "Unable to write %s rows to the storage. Retrying...", RelationType.CNT_EARLY_IN_REV, ) if self.cache["content_in_directory"]: cnt_in_dir: Dict[Sha1Git, Set[RelationData]] = {} for src, dst, path in self.cache["content_in_directory"]: cnt_in_dir.setdefault(src, set()).add(RelationData(dst=dst, path=path)) while not self.storage.relation_add(RelationType.CNT_IN_DIR, cnt_in_dir): statsd.increment( metric=BACKEND_OPERATIONS_METRIC, tags={ "method": "flush_revision_content_retry_content_in_directory" }, ) LOGGER.warning( "Unable to write %s rows to the storage. Retrying...", RelationType.CNT_IN_DIR, ) if self.cache["directory_in_revision"]: dir_in_rev: Dict[Sha1Git, Set[RelationData]] = {} for src, dst, path in self.cache["directory_in_revision"]: dir_in_rev.setdefault(src, set()).add(RelationData(dst=dst, path=path)) while not self.storage.relation_add(RelationType.DIR_IN_REV, dir_in_rev): statsd.increment( metric=BACKEND_OPERATIONS_METRIC, tags={ "method": "flush_revision_content_retry_directory_in_revision" }, ) LOGGER.warning( "Unable to write %s rows to the storage. Retrying...", RelationType.DIR_IN_REV, ) # After relations, flatten flags for directories can be safely set (if # applicable) acknowledging those directories that have already be flattened. # Similarly, dates for the revisions are set to acknowledge that these revisions # won't need to be reprocessed in case of failure. dir_acks = { sha1: DirectoryData( date=date, flat=self.cache["directory_flatten"].get(sha1) or False ) for sha1, date in self.cache["directory"]["data"].items() if self.cache["directory_flatten"].get(sha1) and date is not None } if dir_acks: while not self.storage.directory_add(dir_acks): statsd.increment( metric=BACKEND_OPERATIONS_METRIC, tags={"method": "flush_revision_content_retry_directory_ack"}, ) LOGGER.warning( "Unable to write directory dates to the storage. Retrying..." ) rev_dates = { sha1: RevisionData(date=date, origin=None) for sha1, date in self.cache["revision"]["data"].items() if sha1 in self.cache["revision"]["added"] and date is not None } if rev_dates: while not self.storage.revision_add(rev_dates): statsd.increment( metric=BACKEND_OPERATIONS_METRIC, tags={"method": "flush_revision_content_retry_revision_date"}, ) LOGGER.warning( "Unable to write revision dates to the storage. Retrying..." ) def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ) -> None: self.cache["content_in_directory"].add( (blob.id, directory.id, path_normalize(os.path.join(prefix, blob.name))) ) def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes ) -> None: self.cache["content_in_revision"].add( (blob.id, revision.id, path_normalize(os.path.join(prefix, blob.name))) ) def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: return self.storage.content_find_first(id) def content_find_all( self, id: Sha1Git, limit: Optional[int] = None ) -> Generator[ProvenanceResult, None, None]: yield from self.storage.content_find_all(id, limit=limit) def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: return self.get_dates("content", [blob.id]).get(blob.id) def content_get_early_dates( self, blobs: Iterable[FileEntry] ) -> Dict[Sha1Git, datetime]: return self.get_dates("content", [blob.id for blob in blobs]) def content_set_early_date(self, blob: FileEntry, date: datetime) -> None: self.cache["content"]["data"][blob.id] = date self.cache["content"]["added"].add(blob.id) def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes ) -> None: self.cache["directory_in_revision"].add( (directory.id, revision.id, path_normalize(path)) ) def directory_already_flattened(self, directory: DirectoryEntry) -> Optional[bool]: cache = self.cache["directory_flatten"] if directory.id not in cache: cache.setdefault(directory.id, None) ret = self.storage.directory_get([directory.id]) if directory.id in ret: dir = ret[directory.id] cache[directory.id] = dir.flat # date is kept to ensure we have it available when flushing self.cache["directory"]["data"][directory.id] = dir.date return cache.get(directory.id) def directory_flag_as_flattened(self, directory: DirectoryEntry) -> None: self.cache["directory_flatten"][directory.id] = True def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: return self.get_dates("directory", [directory.id]).get(directory.id) def directory_get_dates_in_isochrone_frontier( self, dirs: Iterable[DirectoryEntry] ) -> Dict[Sha1Git, datetime]: return self.get_dates("directory", [directory.id for directory in dirs]) def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ) -> None: self.cache["directory"]["data"][directory.id] = date self.cache["directory"]["added"].add(directory.id) def get_dates( self, entity: Literal["content", "directory", "revision"], ids: Iterable[Sha1Git], ) -> Dict[Sha1Git, datetime]: cache = self.cache[entity] missing_ids = set(id for id in ids if id not in cache) if missing_ids: if entity == "content": cache["data"].update(self.storage.content_get(missing_ids)) elif entity == "directory": cache["data"].update( { id: dir.date for id, dir in self.storage.directory_get(missing_ids).items() } ) elif entity == "revision": cache["data"].update( { id: rev.date for id, rev in self.storage.revision_get(missing_ids).items() } ) dates: Dict[Sha1Git, datetime] = {} for sha1 in ids: date = cache["data"].setdefault(sha1, None) if date is not None: dates[sha1] = date return dates def open(self) -> None: self.storage.open() def origin_add(self, origin: OriginEntry) -> None: self.cache["origin"]["data"][origin.id] = origin.url self.cache["origin"]["added"].add(origin.id) def revision_add(self, revision: RevisionEntry) -> None: self.cache["revision"]["data"][revision.id] = revision.date self.cache["revision"]["added"].add(revision.id) def revision_add_before_revision( self, head_id: Sha1Git, revision_id: Sha1Git ) -> None: self.cache["revision_before_revision"].setdefault(revision_id, set()).add( head_id ) def revision_add_to_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: self.cache["revision_in_origin"].add((revision.id, origin.id)) def revision_is_head(self, revision: RevisionEntry) -> bool: return bool(self.storage.relation_get(RelationType.REV_IN_ORG, [revision.id])) def revision_get_date(self, revision: RevisionEntry) -> Optional[datetime]: return self.get_dates("revision", [revision.id]).get(revision.id) def revision_get_preferred_origin(self, revision_id: Sha1Git) -> Optional[Sha1Git]: cache = self.cache["revision_origin"]["data"] if revision_id not in cache: ret = self.storage.revision_get([revision_id]) if revision_id in ret: origin = ret[revision_id].origin if origin is not None: cache[revision_id] = origin return cache.get(revision_id) def revision_set_preferred_origin( self, origin: OriginEntry, revision_id: Sha1Git ) -> None: self.cache["revision_origin"]["data"][revision_id] = origin.id self.cache["revision_origin"]["added"].add(revision_id) diff --git a/swh/provenance/sql/30-schema.sql b/swh/provenance/sql/30-schema.sql index 3e3a4fc..b0cbab3 100644 --- a/swh/provenance/sql/30-schema.sql +++ b/swh/provenance/sql/30-schema.sql @@ -1,156 +1,156 @@ -- psql variables to get the current database flavor select position('denormalized' in swh_get_dbflavor()::text) = 0 as dbflavor_norm \gset -- a Git object ID, i.e., a Git-style salted SHA1 checksum create domain sha1_git as bytea check (length(value) = 20); -- UNIX path (absolute, relative, individual path component, etc.) create domain unix_path as bytea; -- relation filter options for querying create type rel_flt as enum ( 'filter-src', 'filter-dst', 'no-filter' ); comment on type rel_flt is 'Relation get filter types'; -- entity tables create table content ( id bigserial primary key, -- internal identifier of the content blob sha1 sha1_git unique not null, -- intrinsic identifier of the content blob - date timestamptz not null -- timestamp of the revision where the blob appears early + date timestamptz -- timestamp of the revision where the blob appears early ); comment on column content.id is 'Content internal identifier'; comment on column content.sha1 is 'Content intrinsic identifier'; comment on column content.date is 'Earliest timestamp for the content (first seen time)'; create table directory ( id bigserial primary key, -- internal identifier of the directory appearing in an isochrone inner frontier sha1 sha1_git unique not null, -- intrinsic identifier of the directory - date timestamptz not null, -- max timestamp among those of the directory children's + date timestamptz, -- max timestamp among those of the directory children's flat boolean not null default false -- flag acknowledging if the directory is flattenned in the model ); comment on column directory.id is 'Directory internal identifier'; comment on column directory.sha1 is 'Directory intrinsic identifier'; comment on column directory.date is 'Latest timestamp for the content in the directory'; create table revision ( id bigserial primary key, -- internal identifier of the revision sha1 sha1_git unique not null, -- intrinsic identifier of the revision date timestamptz, -- timestamp of the revision origin bigint -- id of the preferred origin -- foreign key (origin) references origin (id) ); comment on column revision.id is 'Revision internal identifier'; comment on column revision.sha1 is 'Revision intrinsic identifier'; comment on column revision.date is 'Revision timestamp'; comment on column revision.origin is 'preferred origin for the revision'; create table location ( id bigserial primary key, -- internal identifier of the location path unix_path -- path to the location ); comment on column location.id is 'Location internal identifier'; comment on column location.path is 'Path to the location'; create table origin ( id bigserial primary key, -- internal identifier of the origin sha1 sha1_git unique not null, -- intrinsic identifier of the origin url text -- url of the origin ); comment on column origin.id is 'Origin internal identifier'; comment on column origin.sha1 is 'Origin intrinsic identifier'; comment on column origin.url is 'URL of the origin'; -- relation tables create table content_in_revision ( content bigint not null, -- internal identifier of the content blob \if :dbflavor_norm revision bigint not null, -- internal identifier of the revision where the blob appears for the first time location bigint -- location of the content relative to the revision's root directory \else revision bigint[], -- internal identifiers of the revisions where the blob appears for the first time location bigint[] -- locations of the content relative to the revisions' root directory \endif -- foreign key (content) references content (id), -- foreign key (revision) references revision (id), -- foreign key (location) references location (id) ); comment on column content_in_revision.content is 'Content internal identifier'; \if :dbflavor_norm comment on column content_in_revision.revision is 'Revision internal identifier'; comment on column content_in_revision.location is 'Location of content in revision'; \else comment on column content_in_revision.revision is 'Revision/location internal identifiers'; \endif create table content_in_directory ( content bigint not null, -- internal identifier of the content blob \if :dbflavor_norm directory bigint not null, -- internal identifier of the directory containing the blob location bigint -- location of the content relative to its parent directory in the isochrone frontier \else directory bigint[], -- internal reference of the directories containing the blob location bigint[] -- locations of the content relative to its parent directories in the isochrone frontier \endif -- foreign key (content) references content (id), -- foreign key (directory) references directory (id), -- foreign key (location) references location (id) ); comment on column content_in_directory.content is 'Content internal identifier'; \if :dbflavor_norm comment on column content_in_directory.directory is 'Directory internal identifier'; comment on column content_in_directory.location is 'Location of content in directory'; \else comment on column content_in_directory.directory is 'Directory/location internal identifiers'; \endif create table directory_in_revision ( directory bigint not null, -- internal identifier of the directory appearing in the revision \if :dbflavor_norm revision bigint not null, -- internal identifier of the revision containing the directory location bigint -- location of the directory relative to the revision's root directory \else revision bigint[], -- internal identifiers of the revisions containing the directory location bigint[] -- locations of the directory relative to the revisions' root directory \endif -- foreign key (directory) references directory (id), -- foreign key (revision) references revision (id), -- foreign key (location) references location (id) ); comment on column directory_in_revision.directory is 'Directory internal identifier'; \if :dbflavor_norm comment on column directory_in_revision.revision is 'Revision internal identifier'; comment on column directory_in_revision.location is 'Location of content in revision'; \else comment on column directory_in_revision.revision is 'Revision/location internal identifiers'; \endif create table revision_in_origin ( revision bigint not null, -- internal identifier of the revision poined by the origin origin bigint not null -- internal identifier of the origin that points to the revision -- foreign key (revision) references revision (id), -- foreign key (origin) references origin (id) ); comment on column revision_in_origin.revision is 'Revision internal identifier'; comment on column revision_in_origin.origin is 'Origin internal identifier'; create table revision_before_revision ( prev bigserial not null, -- internal identifier of the source revision next bigserial not null -- internal identifier of the destination revision -- foreign key (prev) references revision (id), -- foreign key (next) references revision (id) ); comment on column revision_before_revision.prev is 'Source revision internal identifier'; comment on column revision_before_revision.next is 'Destination revision internal identifier'; diff --git a/swh/provenance/sql/40-funcs.sql b/swh/provenance/sql/40-funcs.sql index d23eefa..a401416 100644 --- a/swh/provenance/sql/40-funcs.sql +++ b/swh/provenance/sql/40-funcs.sql @@ -1,392 +1,436 @@ -- psql variables to get the current database flavor select position('denormalized' in swh_get_dbflavor()::text) = 0 as dbflavor_norm \gset create or replace function swh_mktemp_relation_add() returns void language sql as $$ create temp table tmp_relation_add ( src sha1_git not null, dst sha1_git not null, path unix_path ) on commit drop $$; \if :dbflavor_norm -- -- normalized -- create or replace function swh_provenance_content_find_first(content_id sha1_git) returns table ( content sha1_git, revision sha1_git, date timestamptz, origin text, path unix_path ) language sql stable as $$ select C.sha1 as content, R.sha1 as revision, R.date as date, O.url as origin, L.path as path from content as C inner join content_in_revision as CR on (CR.content = C.id) inner join location as L on (L.id = CR.location) inner join revision as R on (R.id = CR.revision) left join origin as O on (O.id = R.origin) where C.sha1 = content_id order by date, revision, origin, path asc limit 1 $$; create or replace function swh_provenance_content_find_all(content_id sha1_git, early_cut int) returns table ( content sha1_git, revision sha1_git, date timestamptz, origin text, path unix_path ) language sql stable as $$ (select C.sha1 as content, R.sha1 as revision, R.date as date, O.url as origin, L.path as path from content as C inner join content_in_revision as CR on (CR.content = C.id) inner join location as L on (L.id = CR.location) inner join revision as R on (R.id = CR.revision) left join origin as O on (O.id = R.origin) where C.sha1 = content_id) union (select C.sha1 as content, R.sha1 as revision, R.date as date, O.url as origin, case DL.path when '' then CL.path when '.' then CL.path else (DL.path || '/' || CL.path)::unix_path end as path from content as C inner join content_in_directory as CD on (CD.content = C.id) inner join directory_in_revision as DR on (DR.directory = CD.directory) inner join revision as R on (R.id = DR.revision) inner join location as CL on (CL.id = CD.location) inner join location as DL on (DL.id = DR.location) left join origin as O on (O.id = R.origin) where C.sha1 = content_id) order by date, revision, origin, path limit early_cut $$; create or replace function swh_provenance_relation_add_from_temp( rel_table regclass, src_table regclass, dst_table regclass ) returns void language plpgsql volatile as $$ declare select_fields text; join_location text; begin if src_table in ('content'::regclass, 'directory'::regclass) then select_fields := 'D.id, L.id'; join_location := 'inner join location as L on (digest(L.path,''sha1'') = digest(V.path,''sha1''))'; else select_fields := 'D.id'; join_location := ''; end if; + execute format( + 'insert into %s (sha1) + select src + from tmp_relation_add + on conflict do nothing', + src_table); + + execute format( + 'insert into %s (sha1) + select dst + from tmp_relation_add + on conflict do nothing', + dst_table); + + if src_table in ('content'::regclass, 'directory'::regclass) then + insert into location(path) + select distinct path + from tmp_relation_add + on conflict do nothing; + end if; + execute format( 'insert into %s select S.id, ' || select_fields || ' from tmp_relation_add as V inner join %s as S on (S.sha1 = V.src) inner join %s as D on (D.sha1 = V.dst) ' || join_location || ' on conflict do nothing', rel_table, src_table, dst_table ); end; $$; create or replace function swh_provenance_relation_get( rel_table regclass, src_table regclass, dst_table regclass, filter rel_flt, sha1s sha1_git[] ) returns table ( src sha1_git, dst sha1_git, path unix_path ) language plpgsql stable as $$ declare src_field text; dst_field text; join_location text; proj_location text; filter_result text; begin if rel_table = 'revision_before_revision'::regclass then src_field := 'prev'; dst_field := 'next'; else src_field := src_table::text; dst_field := dst_table::text; end if; if src_table in ('content'::regclass, 'directory'::regclass) then join_location := 'inner join location as L on (L.id = R.location)'; proj_location := 'L.path'; else join_location := ''; proj_location := 'NULL::unix_path'; end if; case filter when 'filter-src'::rel_flt then filter_result := 'where S.sha1 = any($1)'; when 'filter-dst'::rel_flt then filter_result := 'where D.sha1 = any($1)'; else filter_result := ''; end case; return query execute format( 'select S.sha1 as src, D.sha1 as dst, ' || proj_location || ' as path from %s as R inner join %s as S on (S.id = R.' || src_field || ') inner join %s as D on (D.id = R.' || dst_field || ') ' || join_location || ' ' || filter_result, rel_table, src_table, dst_table ) using sha1s; end; $$; -- :dbflavor_norm \else -- -- denormalized -- create or replace function swh_provenance_content_find_first(content_id sha1_git) returns table ( content sha1_git, revision sha1_git, date timestamptz, origin text, path unix_path ) language sql stable as $$ select CL.sha1 as content, R.sha1 as revision, R.date as date, O.url as origin, L.path as path from ( select C.sha1 as sha1, unnest(CR.revision) as revision, unnest(CR.location) as location from content_in_revision as CR inner join content as C on (C.id = CR.content) where C.sha1 = content_id ) as CL inner join revision as R on (R.id = CL.revision) inner join location as L on (L.id = CL.location) left join origin as O on (O.id = R.origin) order by date, revision, origin, path asc limit 1 $$; create or replace function swh_provenance_content_find_all(content_id sha1_git, early_cut int) returns table ( content sha1_git, revision sha1_git, date timestamptz, origin text, path unix_path ) language sql stable as $$ (with cntrev as ( select C.sha1 as sha1, unnest(CR.revision) as revision, unnest(CR.location) as location from content_in_revision as CR inner join content as C on (C.id = CR.content) where C.sha1 = content_id) select CR.sha1 as content, R.sha1 as revision, R.date as date, O.url as origin, L.path as path from cntrev as CR inner join revision as R on (R.id = CR.revision) inner join location as L on (L.id = CR.location) left join origin as O on (O.id = R.origin)) union (with cntdir as ( select C.sha1 as sha1, unnest(CD.directory) as directory, unnest(CD.location) as location from content as C inner join content_in_directory as CD on (CD.content = C.id) where C.sha1 = content_id), cntrev as ( select CD.sha1 as sha1, L.path as path, unnest(DR.revision) as revision, unnest(DR.location) as location from cntdir as CD inner join directory_in_revision as DR on (DR.directory = CD.directory) inner join location as L on (L.id = CD.location)) select CR.sha1 as content, R.sha1 as revision, R.date as date, O.url as origin, case DL.path when '' then CR.path when '.' then CR.path else (DL.path || '/' || CR.path)::unix_path end as path from cntrev as CR inner join revision as R on (R.id = CR.revision) inner join location as DL on (DL.id = CR.location) left join origin as O on (O.id = R.origin)) order by date, revision, origin, path limit early_cut $$; create or replace function swh_provenance_relation_add_from_temp( rel_table regclass, src_table regclass, dst_table regclass ) returns void language plpgsql volatile as $$ declare select_fields text; join_location text; group_entries text; on_conflict text; begin + + execute format( + 'insert into %s (sha1) + select src + from tmp_relation_add + on conflict do nothing', + src_table); + + execute format( + 'insert into %s (sha1) + select dst + from tmp_relation_add + on conflict do nothing', + dst_table); + + if src_table in ('content'::regclass, 'directory'::regclass) then + insert into location(path) + select distinct path + from tmp_relation_add + on conflict do nothing; + end if; + + if src_table in ('content'::regclass, 'directory'::regclass) then select_fields := 'array_agg(D.id), array_agg(L.id)'; join_location := 'inner join location as L on (digest(L.path,''sha1'') = digest(V.path,''sha1''))'; group_entries := 'group by S.id'; on_conflict := format(' (%s) do update set (%s, location) = ( with pairs as ( select distinct * from unnest( %s.' || dst_table::text || ' || excluded.' || dst_table::text || ', %s.location || excluded.location ) as pair(dst, loc) ) select array(select pairs.dst from pairs), array(select pairs.loc from pairs) )', src_table, dst_table, rel_table, rel_table, rel_table, rel_table ); else select_fields := 'D.id'; join_location := ''; group_entries := ''; on_conflict := 'do nothing'; end if; execute format( 'insert into %s select S.id, ' || select_fields || ' from tmp_relation_add as V inner join %s as S on (S.sha1 = V.src) inner join %s as D on (D.sha1 = V.dst) ' || join_location || ' ' || group_entries || ' on conflict ' || on_conflict, rel_table, src_table, dst_table ); end; $$; create or replace function swh_provenance_relation_get( rel_table regclass, src_table regclass, dst_table regclass, filter rel_flt, sha1s sha1_git[] ) returns table ( src sha1_git, dst sha1_git, path unix_path ) language plpgsql stable as $$ declare src_field text; dst_field text; proj_dst_id text; proj_unnested text; proj_location text; join_location text; filter_inner_result text; filter_outer_result text; begin if rel_table = 'revision_before_revision'::regclass then src_field := 'prev'; dst_field := 'next'; else src_field := src_table::text; dst_field := dst_table::text; end if; if src_table in ('content'::regclass, 'directory'::regclass) then proj_unnested := 'unnest(R.' || dst_field || ') as dst, unnest(R.location) as loc'; proj_dst_id := 'CL.dst'; join_location := 'inner join location as L on (L.id = CL.loc)'; proj_location := 'L.path'; else proj_unnested := 'R.' || dst_field || ' as dst'; proj_dst_id := 'CL.dst'; join_location := ''; proj_location := 'NULL::unix_path'; end if; case filter when 'filter-src'::rel_flt then filter_inner_result := 'where S.sha1 = any($1)'; filter_outer_result := ''; when 'filter-dst'::rel_flt then filter_inner_result := ''; filter_outer_result := 'where D.sha1 = any($1)'; else filter_inner_result := ''; filter_outer_result := ''; end case; return query execute format( 'select CL.src, D.sha1 as dst, ' || proj_location || ' as path from (select S.sha1 as src, ' || proj_unnested || ' from %s as R inner join %s as S on (S.id = R.' || src_field || ') ' || filter_inner_result || ') as CL inner join %s as D on (D.id = ' || proj_dst_id || ') ' || join_location || ' ' || filter_outer_result, rel_table, src_table, dst_table ) using sha1s; end; $$; \endif -- :dbflavor_norm diff --git a/swh/provenance/sql/upgrades/004.sql b/swh/provenance/sql/upgrades/004.sql index bf1b62f..fc12c78 100644 --- a/swh/provenance/sql/upgrades/004.sql +++ b/swh/provenance/sql/upgrades/004.sql @@ -1,26 +1,29 @@ -- SWH provenance DB schema upgrade -- from_version: 3 -- to_version: 4 -- description: rename db flavor (without-path heving been removed) -- will fail if the db is using a without-path flavor. alter type database_flavor rename value 'with-path' to 'normalized'; alter type database_flavor rename value 'with-path-denormalized' to 'denormalized'; alter type database_flavor rename to database_flavor_old; create type database_flavor as enum ( 'normalized', 'denormalized' ); comment on type database_flavor is 'Flavor of the current database'; drop function swh_get_dbflavor; alter table dbflavor alter column flavor type database_flavor using flavor::text::database_flavor; create function swh_get_dbflavor() returns database_flavor language sql stable as $$ select coalesce((select flavor from dbflavor), 'normalized'); $$; drop type database_flavor_old; + +alter table content alter column date drop not null; +alter table directory alter column date drop not null;