diff --git a/docs/index.rst b/docs/index.rst
index 89000c1..280cf60 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -1,143 +1,131 @@
.. _swh-provenance:

Software Heritage Provenance
============================

A provenance index database based on the Software Heritage Archive. This is
an implementation of the paper `Software Provenance Tracking at the Scale of
Public Source Code`_, published in `Empirical Software Engineering`_.

This provenance index database is a tool to help answer the question "where
does this source code artifact come from?", a question the main Software
Heritage Archive cannot easily answer.

Quick Start
-----------

Database creation
~~~~~~~~~~~~~~~~~

Create a provenance index database (in this example we use pifpaf_ to set up
a test PostgreSQL database; adapt the commands below to your PostgreSQL
setup):

.. code-block:: shell

   eval $(pifpaf run postgresql)
   swh db create -d provdb provenance
   swh db init-admin -d provdb provenance
   swh db init -d provdb provenance

-The provenance index DB comes in 2 feature flags, so there are 4 possible flavors. Feature flags are:
-
-- ``with-path`` / ``without-path``: whether the provenance index database will store file path,
-- ``normalized`` / ``denormalized``: whether or not the main relation tables are normalized (see below).
-
-So the possible flavors are:
-
-- ``with-path``
-- ``without-path``
-- ``with-path-denormalized``
-- ``without-path-denormalized``
-
Filling the provenance index database
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

This step requires access to the Software Heritage Archive to retrieve the
actual data from the Archive. It currently also needs an input CSV file of
revisions and origins to insert into the provenance database. Examples of
such files are available in the `provenance public dataset`_.

.. _`provenance public dataset`: https://annex.softwareheritage.org/public/dataset/provenance

.. code-block:: shell

   wget https://annex.softwareheritage.org/public/dataset/provenance/sample_10k.csv.bz2
   bunzip2 sample_10k.csv.bz2

You need a configuration file, like:

.. code-block:: yaml

   # config.yaml
   provenance:
     storage:
       cls: postgresql
       db: host=/tmp/tmpifn2ov_j port=9824 dbname=provdb
     archive:
       cls: api
       storage:
         cls: remote
         url: http://storage:5002/

Note that you need access to the internal API of a :ref:`swh-storage
<swh-storage>` instance (here the machine named ``storage``) for this.

Then you can feed the provenance index database using:

.. code-block:: shell

   swh provenance -C config.yaml iter-revisions sample_10k.csv

This may take a while to complete.

Querying the provenance index database
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Using the same config file, you may look for the first known occurrence of a
given file content:

.. code-block:: shell

   swh provenance -C config.yaml find-first 8a54694c92c944fcb06d73c17743ac72444a5b72
   swh:1:cnt:8a54694c92c944fcb06d73c17743ac72444a5b72, swh:1:rev:6193fae0668d082d90207f6c9f33d6e8c98dd04a, 2008-10-06 18:32:23+00:00, None, lua/effects/bloodstream/init.lua

Or all the known occurrences:

.. code-block:: shell

   swh provenance -C config.yaml find-all 8a54694c92c944fcb06d73c17743ac72444a5b72
   swh:1:cnt:8a54694c92c944fcb06d73c17743ac72444a5b72, swh:1:rev:6193fae0668d082d90207f6c9f33d6e8c98dd04a, 2008-10-06 18:32:23+00:00, None, lua/effects/bloodstream/init.lua
   swh:1:cnt:8a54694c92c944fcb06d73c17743ac72444a5b72, swh:1:rev:f0a5078eed8808323b93ed09cddb003dbe2a85e4, 2008-10-06 18:32:23+00:00, None, trunk/lua/effects/bloodstream/init.lua
   [...]
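The same lookups can also be done programmatically. Here is a minimal sketch
using the PostgreSQL storage backend directly (it assumes the DSN from the
configuration above; per the storage code in this patch,
``content_find_first`` returns ``None`` when the blob is unknown):

.. code-block:: python

   from swh.model.hashutil import hash_to_bytes
   from swh.provenance.storage.postgresql import ProvenanceStoragePostgreSql

   # Assumed DSN: reuse the value of the "db" entry from config.yaml.
   dsn = "host=/tmp/tmpifn2ov_j port=9824 dbname=provdb"

   with ProvenanceStoragePostgreSql(db=dsn) as storage:
       cnt = hash_to_bytes("8a54694c92c944fcb06d73c17743ac72444a5b72")

       # First known occurrence of the content blob, or None if unknown.
       first = storage.content_find_first(cnt)
       if first is not None:
           print(first.revision.hex(), first.date, first.path)

       # All known occurrences, optionally capped with limit=.
       for occurrence in storage.content_find_all(cnt, limit=10):
           print(occurrence.revision.hex(), occurrence.date, occurrence.path)
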
(De)normalized database
-----------------------

For some relation tables (like ``content_in_revision``, which stores, for
each content object, the revisions in which it has been found), the default
schema stores one row per relation. For a big database, this can have a
significant storage cost. It is therefore possible to store these relations
using an array as the destination column (the ``revision`` column in the case
of the ``content_in_revision`` table). This can drastically reduce the
database storage size, possibly at the price of a slight performance hit.

Warning: the denormalized version of the database is still under test and
validation. Do not use it for serious work.

.. _`Empirical Software Engineering`: http://link.springer.com/journal/10664
.. _`Software Provenance Tracking at the Scale of Public Source Code`: http://dx.doi.org/10.1007/s10664-020-09828-5
.. _pifpaf: https://github.com/jd/pifpaf

.. toctree::
   :maxdepth: 2
   :caption: Contents:

Indices and tables
==================

* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`
diff --git a/swh/provenance/sql/15-flavor.sql b/swh/provenance/sql/15-flavor.sql
deleted file mode 100644
index e0ced6c..0000000
--- a/swh/provenance/sql/15-flavor.sql
+++ /dev/null
@@ -1,21 +0,0 @@
--- database flavor
-create type database_flavor as enum (
-    'normalized',
-    'denormalized'
-);
-comment on type database_flavor is 'Flavor of the current database';
-
-create table dbflavor (
-    flavor database_flavor,
-    single_row char(1) primary key default 'x',
-    check (single_row = 'x')
-);
-comment on table dbflavor is 'Database flavor storage';
-comment on column dbflavor.flavor is 'Database flavor currently deployed';
-comment on column dbflavor.single_row is 'Bogus column to force the table to have a single row';
-
-create or replace function swh_get_dbflavor() returns database_flavor language sql stable as $$
-    select coalesce((select flavor from dbflavor), 'normalized');
-$$;
-
-comment on function swh_get_dbflavor is 'Get the flavor of the database currently deployed';
diff --git a/swh/provenance/sql/30-schema.sql b/swh/provenance/sql/30-schema.sql
index b0cbab3..949b6f6 100644
--- a/swh/provenance/sql/30-schema.sql
+++ b/swh/provenance/sql/30-schema.sql
@@ -1,156 +1,126 @@
--- psql variables to get the current database flavor
-select position('denormalized' in swh_get_dbflavor()::text) = 0 as dbflavor_norm \gset
-
-- a Git object ID, i.e., a Git-style salted SHA1 checksum
create domain sha1_git as bytea check (length(value) = 20);

-- UNIX path (absolute, relative, individual path component, etc.)
create domain unix_path as bytea;

-- relation filter options for querying
create type rel_flt as enum (
    'filter-src',
    'filter-dst',
    'no-filter'
);
comment on type rel_flt is 'Relation get filter types';

-- entity tables
create table content (
    id bigserial primary key,      -- internal identifier of the content blob
    sha1 sha1_git unique not null, -- intrinsic identifier of the content blob
    date timestamptz               -- timestamp of the earliest revision where the blob appears
);
comment on column content.id is 'Content internal identifier';
comment on column content.sha1 is 'Content intrinsic identifier';
comment on column content.date is 'Earliest timestamp for the content (first seen time)';

create table directory (
    id bigserial primary key,      -- internal identifier of the directory appearing in an isochrone inner frontier
    sha1 sha1_git unique not null, -- intrinsic identifier of the directory
    date timestamptz,              -- max timestamp among those of the directory's children
    flat boolean not null default false -- flag indicating whether the directory is flattened in the model
);
comment on column directory.id is 'Directory internal identifier';
comment on column directory.sha1 is 'Directory intrinsic identifier';
comment on column directory.date is 'Latest timestamp for the content in the directory';

create table revision (
    id bigserial primary key,      -- internal identifier of the revision
    sha1 sha1_git unique not null, -- intrinsic identifier of the revision
    date timestamptz,              -- timestamp of the revision
    origin bigint                  -- id of the preferred origin
    -- foreign key (origin) references origin (id)
);
comment on column revision.id is 'Revision internal identifier';
comment on column revision.sha1 is 'Revision intrinsic identifier';
comment on column revision.date is 'Revision timestamp';
comment on column revision.origin is 'Preferred origin for the revision';

create table location (
    id bigserial primary key, -- internal identifier of the location
    path unix_path            -- path to the location
);
comment on column location.id is 'Location internal identifier';
comment on column location.path is 'Path to the location';

create table origin (
    id bigserial primary key,      -- internal identifier of the origin
    sha1 sha1_git unique not null, -- intrinsic identifier of the origin
    url text                       -- URL of the origin
);
comment on column origin.id is 'Origin internal identifier';
comment on column origin.sha1 is 'Origin intrinsic identifier';
comment on column origin.url is 'URL of the origin';

-- relation tables
create table content_in_revision (
    content bigint not null,  -- internal identifier of the content blob
-\if :dbflavor_norm
    revision bigint not null, -- internal identifier of the revision where the blob appears for the first time
    location bigint           -- location of the content relative to the revision's root directory
-\else
-    revision bigint[], -- internal identifiers of the revisions where the blob appears for the first time
-    location bigint[]  -- locations of the content relative to the revisions' root directory
-\endif
    -- foreign key (content) references content (id),
    -- foreign key (revision) references revision (id),
    -- foreign key (location) references location (id)
);
comment on column content_in_revision.content is 'Content internal identifier';
-\if :dbflavor_norm
comment on column content_in_revision.revision is 'Revision internal identifier';
comment on column content_in_revision.location is 'Location of content in revision';
-\else
-comment on column content_in_revision.revision is 'Revision/location internal identifiers';
-\endif
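
-- Illustration (not part of the schema): under the removed "denormalized"
-- flavor, the destination and location columns above were parallel arrays,
-- so a single row such as
--     (content=42, revision='{7,9}', location='{1,3}')
-- stood for the two normalized rows (42, 7, 1) and (42, 9, 3), trading row
-- count for array storage.
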
create table content_in_directory (
    content bigint not null,   -- internal identifier of the content blob
-\if :dbflavor_norm
    directory bigint not null, -- internal identifier of the directory containing the blob
    location bigint            -- location of the content relative to its parent directory in the isochrone frontier
-\else
-    directory bigint[], -- internal reference of the directories containing the blob
-    location bigint[]   -- locations of the content relative to its parent directories in the isochrone frontier
-\endif
    -- foreign key (content) references content (id),
    -- foreign key (directory) references directory (id),
    -- foreign key (location) references location (id)
);
comment on column content_in_directory.content is 'Content internal identifier';
-\if :dbflavor_norm
comment on column content_in_directory.directory is 'Directory internal identifier';
comment on column content_in_directory.location is 'Location of content in directory';
-\else
-comment on column content_in_directory.directory is 'Directory/location internal identifiers';
-\endif

create table directory_in_revision (
    directory bigint not null, -- internal identifier of the directory appearing in the revision
-\if :dbflavor_norm
    revision bigint not null,  -- internal identifier of the revision containing the directory
    location bigint            -- location of the directory relative to the revision's root directory
-\else
-    revision bigint[], -- internal identifiers of the revisions containing the directory
-    location bigint[]  -- locations of the directory relative to the revisions' root directory
-\endif
    -- foreign key (directory) references directory (id),
    -- foreign key (revision) references revision (id),
    -- foreign key (location) references location (id)
);
comment on column directory_in_revision.directory is 'Directory internal identifier';
-\if :dbflavor_norm
comment on column directory_in_revision.revision is 'Revision internal identifier';
comment on column directory_in_revision.location is 'Location of directory in revision';
-\else
-comment on column directory_in_revision.revision is 'Revision/location internal identifiers';
-\endif

create table revision_in_origin (
    revision bigint not null, -- internal identifier of the revision pointed to by the origin
    origin bigint not null    -- internal identifier of the origin that points to the revision
    -- foreign key (revision) references revision (id),
    -- foreign key (origin) references origin (id)
);
comment on column revision_in_origin.revision is 'Revision internal identifier';
comment on column revision_in_origin.origin is 'Origin internal identifier';

create table revision_before_revision (
    prev bigserial not null, -- internal identifier of the source revision
    next bigserial not null  -- internal identifier of the destination revision
    -- foreign key (prev) references revision (id),
    -- foreign key (next) references revision (id)
);
comment on column revision_before_revision.prev is 'Source revision internal identifier';
comment on column revision_before_revision.next is 'Destination revision internal identifier';
diff --git a/swh/provenance/sql/40-funcs.sql b/swh/provenance/sql/40-funcs.sql
index 2bdd6d5..30bd997 100644
--- a/swh/provenance/sql/40-funcs.sql
+++ b/swh/provenance/sql/40-funcs.sql
@@ -1,445 +1,191 @@
--- psql variables to get the current database flavor
-select position('denormalized' in swh_get_dbflavor()::text) = 0 as dbflavor_norm \gset
-
create or replace function swh_mktemp_relation_add() returns void
    language sql
as $$
    create temp table tmp_relation_add (
        src sha1_git not
null, dst sha1_git not null, path unix_path ) on commit drop $$; -\if :dbflavor_norm - --- --- normalized --- - create or replace function swh_provenance_content_find_first(content_id sha1_git) returns table ( content sha1_git, revision sha1_git, date timestamptz, origin text, path unix_path ) language sql stable as $$ select C.sha1 as content, R.sha1 as revision, R.date as date, O.url as origin, L.path as path from content as C inner join content_in_revision as CR on (CR.content = C.id) inner join location as L on (L.id = CR.location) inner join revision as R on (R.id = CR.revision) left join origin as O on (O.id = R.origin) where C.sha1 = content_id order by date, revision, origin, path asc limit 1 $$; create or replace function swh_provenance_content_find_all(content_id sha1_git, early_cut int) returns table ( content sha1_git, revision sha1_git, date timestamptz, origin text, path unix_path ) language sql stable as $$ (select C.sha1 as content, R.sha1 as revision, R.date as date, O.url as origin, L.path as path from content as C inner join content_in_revision as CR on (CR.content = C.id) inner join location as L on (L.id = CR.location) inner join revision as R on (R.id = CR.revision) left join origin as O on (O.id = R.origin) where C.sha1 = content_id) union (select C.sha1 as content, R.sha1 as revision, R.date as date, O.url as origin, case DL.path when '' then CL.path when '.' then CL.path else (DL.path || '/' || CL.path)::unix_path end as path from content as C inner join content_in_directory as CD on (CD.content = C.id) inner join directory_in_revision as DR on (DR.directory = CD.directory) inner join revision as R on (R.id = DR.revision) inner join location as CL on (CL.id = CD.location) inner join location as DL on (DL.id = DR.location) left join origin as O on (O.id = R.origin) where C.sha1 = content_id) order by date, revision, origin, path limit early_cut $$; create or replace function swh_provenance_relation_add_from_temp( rel_table regclass, src_table regclass, dst_table regclass ) returns void language plpgsql volatile as $$ declare select_fields text; join_location text; begin if src_table in ('content'::regclass, 'directory'::regclass) then select_fields := 'D.id, L.id'; join_location := 'inner join location as L on (digest(L.path,''sha1'') = digest(V.path,''sha1''))'; else select_fields := 'D.id'; join_location := ''; end if; execute format( 'insert into %s (sha1) select distinct src from tmp_relation_add where not exists (select 1 from %s where %s.sha1=tmp_relation_add.src) on conflict do nothing', src_table, src_table, src_table); execute format( 'insert into %s (sha1) select distinct dst from tmp_relation_add where not exists (select 1 from %s where %s.sha1=tmp_relation_add.dst) on conflict do nothing', dst_table, dst_table, dst_table); if src_table in ('content'::regclass, 'directory'::regclass) then insert into location(path) select distinct path from tmp_relation_add where not exists (select 1 from location where digest(location.path,'sha1')=digest(tmp_relation_add.path,'sha1') ) on conflict do nothing; end if; execute format( 'insert into %s select S.id, ' || select_fields || ' from tmp_relation_add as V inner join %s as S on (S.sha1 = V.src) inner join %s as D on (D.sha1 = V.dst) ' || join_location || ' on conflict do nothing', rel_table, src_table, dst_table ); end; $$; create or replace function swh_provenance_relation_get( rel_table regclass, src_table regclass, dst_table regclass, filter rel_flt, sha1s sha1_git[] ) returns table ( src sha1_git, dst sha1_git, 
path unix_path ) language plpgsql stable as $$ declare src_field text; dst_field text; join_location text; proj_location text; filter_result text; begin if rel_table = 'revision_before_revision'::regclass then src_field := 'prev'; dst_field := 'next'; else src_field := src_table::text; dst_field := dst_table::text; end if; if src_table in ('content'::regclass, 'directory'::regclass) then join_location := 'inner join location as L on (L.id = R.location)'; proj_location := 'L.path'; else join_location := ''; proj_location := 'NULL::unix_path'; end if; case filter when 'filter-src'::rel_flt then filter_result := 'where S.sha1 = any($1)'; when 'filter-dst'::rel_flt then filter_result := 'where D.sha1 = any($1)'; else filter_result := ''; end case; return query execute format( 'select S.sha1 as src, D.sha1 as dst, ' || proj_location || ' as path from %s as R inner join %s as S on (S.id = R.' || src_field || ') inner join %s as D on (D.id = R.' || dst_field || ') ' || join_location || ' ' || filter_result, rel_table, src_table, dst_table ) using sha1s; end; $$; --- :dbflavor_norm -\else - --- --- denormalized --- - -create or replace function swh_provenance_content_find_first(content_id sha1_git) - returns table ( - content sha1_git, - revision sha1_git, - date timestamptz, - origin text, - path unix_path - ) - language sql - stable -as $$ - select CL.sha1 as content, - R.sha1 as revision, - R.date as date, - O.url as origin, - L.path as path - from ( - select C.sha1 as sha1, - unnest(CR.revision) as revision, - unnest(CR.location) as location - from content_in_revision as CR - inner join content as C on (C.id = CR.content) - where C.sha1 = content_id - ) as CL - inner join revision as R on (R.id = CL.revision) - inner join location as L on (L.id = CL.location) - left join origin as O on (O.id = R.origin) - order by date, revision, origin, path asc limit 1 -$$; - -create or replace function swh_provenance_content_find_all(content_id sha1_git, early_cut int) - returns table ( - content sha1_git, - revision sha1_git, - date timestamptz, - origin text, - path unix_path - ) - language sql - stable -as $$ - (with - cntrev as ( - select C.sha1 as sha1, - unnest(CR.revision) as revision, - unnest(CR.location) as location - from content_in_revision as CR - inner join content as C on (C.id = CR.content) - where C.sha1 = content_id) - select CR.sha1 as content, - R.sha1 as revision, - R.date as date, - O.url as origin, - L.path as path - from cntrev as CR - inner join revision as R on (R.id = CR.revision) - inner join location as L on (L.id = CR.location) - left join origin as O on (O.id = R.origin)) - union - (with - cntdir as ( - select C.sha1 as sha1, - unnest(CD.directory) as directory, - unnest(CD.location) as location - from content as C - inner join content_in_directory as CD on (CD.content = C.id) - where C.sha1 = content_id), - cntrev as ( - select CD.sha1 as sha1, - L.path as path, - unnest(DR.revision) as revision, - unnest(DR.location) as location - from cntdir as CD - inner join directory_in_revision as DR on (DR.directory = CD.directory) - inner join location as L on (L.id = CD.location)) - select CR.sha1 as content, - R.sha1 as revision, - R.date as date, - O.url as origin, - case DL.path - when '' then CR.path - when '.' 
then CR.path - else (DL.path || '/' || CR.path)::unix_path - end as path - from cntrev as CR - inner join revision as R on (R.id = CR.revision) - inner join location as DL on (DL.id = CR.location) - left join origin as O on (O.id = R.origin)) - order by date, revision, origin, path limit early_cut -$$; - -create or replace function swh_provenance_relation_add_from_temp( - rel_table regclass, src_table regclass, dst_table regclass -) - returns void - language plpgsql - volatile -as $$ - declare - select_fields text; - join_location text; - group_entries text; - on_conflict text; - begin - - execute format( - 'insert into %s (sha1) - select distinct src - from tmp_relation_add - where not exists (select 1 from %s where %s.sha1=tmp_relation_add.src) - on conflict do nothing', - src_table, src_table, src_table); - - execute format( - 'insert into %s (sha1) - select distinct dst - from tmp_relation_add - where not exists (select 1 from %s where %s.sha1=tmp_relation_add.dst) - on conflict do nothing', - dst_table, dst_table, dst_table); - - if src_table in ('content'::regclass, 'directory'::regclass) then - insert into location(path) - select distinct path - from tmp_relation_add - where not exists (select 1 from location - where digest(location.path,'sha1')=digest(tmp_relation_add.path,'sha1') - ) - on conflict do nothing; - end if; - - if src_table in ('content'::regclass, 'directory'::regclass) then - select_fields := 'array_agg(D.id), array_agg(L.id)'; - join_location := 'inner join location as L on (digest(L.path,''sha1'') = digest(V.path,''sha1''))'; - group_entries := 'group by S.id'; - on_conflict := format(' - (%s) do update - set (%s, location) = ( - with pairs as ( - select distinct * from unnest( - %s.' || dst_table::text || ' || excluded.' || dst_table::text || ', - %s.location || excluded.location - ) as pair(dst, loc) - ) - select array(select pairs.dst from pairs), array(select pairs.loc from pairs) - )', - src_table, dst_table, rel_table, rel_table, rel_table, rel_table - ); - else - select_fields := 'D.id'; - join_location := ''; - group_entries := ''; - on_conflict := 'do nothing'; - end if; - - execute format( - 'insert into %s - select S.id, ' || select_fields || ' - from tmp_relation_add as V - inner join %s as S on (S.sha1 = V.src) - inner join %s as D on (D.sha1 = V.dst) - ' || join_location || ' - ' || group_entries || ' - on conflict ' || on_conflict, - rel_table, src_table, dst_table - ); - end; -$$; - -create or replace function swh_provenance_relation_get( - rel_table regclass, src_table regclass, dst_table regclass, filter rel_flt, sha1s sha1_git[] -) - returns table ( - src sha1_git, - dst sha1_git, - path unix_path - ) - language plpgsql - stable -as $$ - declare - src_field text; - dst_field text; - proj_dst_id text; - proj_unnested text; - proj_location text; - join_location text; - filter_inner_result text; - filter_outer_result text; - begin - if rel_table = 'revision_before_revision'::regclass then - src_field := 'prev'; - dst_field := 'next'; - else - src_field := src_table::text; - dst_field := dst_table::text; - end if; - - if src_table in ('content'::regclass, 'directory'::regclass) then - proj_unnested := 'unnest(R.' || dst_field || ') as dst, unnest(R.location) as loc'; - proj_dst_id := 'CL.dst'; - join_location := 'inner join location as L on (L.id = CL.loc)'; - proj_location := 'L.path'; - else - proj_unnested := 'R.' 
|| dst_field || ' as dst'; - proj_dst_id := 'CL.dst'; - join_location := ''; - proj_location := 'NULL::unix_path'; - end if; - - case filter - when 'filter-src'::rel_flt then - filter_inner_result := 'where S.sha1 = any($1)'; - filter_outer_result := ''; - when 'filter-dst'::rel_flt then - filter_inner_result := ''; - filter_outer_result := 'where D.sha1 = any($1)'; - else - filter_inner_result := ''; - filter_outer_result := ''; - end case; - - return query execute format( - 'select CL.src, D.sha1 as dst, ' || proj_location || ' as path - from (select S.sha1 as src, ' || proj_unnested || ' - from %s as R - inner join %s as S on (S.id = R.' || src_field || ') - ' || filter_inner_result || ') as CL - inner join %s as D on (D.id = ' || proj_dst_id || ') - ' || join_location || ' - ' || filter_outer_result, - rel_table, src_table, dst_table - ) using sha1s; - end; -$$; - -\endif --- :dbflavor_norm diff --git a/swh/provenance/sql/60-indexes.sql b/swh/provenance/sql/60-indexes.sql index bf2c70d..ecbfa1a 100644 --- a/swh/provenance/sql/60-indexes.sql +++ b/swh/provenance/sql/60-indexes.sql @@ -1,20 +1,11 @@ --- psql variables to get the current database flavor -select position('denormalized' in swh_get_dbflavor()::text) = 0 as dbflavor_norm \gset - -- create unique indexes (instead of pkey) because location might be null for -- the without-path flavor -\if :dbflavor_norm create unique index on content_in_revision(content, revision, location); create unique index on directory_in_revision(directory, revision, location); create unique index on content_in_directory(content, directory, location); -\else -create unique index on content_in_revision(content); -create unique index on directory_in_revision(directory); -create unique index on content_in_directory(content); -\endif create unique index on location(digest(path, 'sha1')); create index on directory(sha1) where flat=false; alter table revision_in_origin add primary key (revision, origin); alter table revision_before_revision add primary key (prev, next); diff --git a/swh/provenance/sql/upgrades/005.sql b/swh/provenance/sql/upgrades/005.sql new file mode 100644 index 0000000..0f9f497 --- /dev/null +++ b/swh/provenance/sql/upgrades/005.sql @@ -0,0 +1,5 @@ +drop function if exists swh_get_dbflavor(); + +drop table if exists dbflavor; + +drop type if exists database_flavor; diff --git a/swh/provenance/storage/postgresql.py b/swh/provenance/storage/postgresql.py index 9d61efa..8448a72 100644 --- a/swh/provenance/storage/postgresql.py +++ b/swh/provenance/storage/postgresql.py @@ -1,396 +1,392 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations from contextlib import contextmanager from datetime import datetime from functools import wraps from hashlib import sha1 import itertools import logging from types import TracebackType from typing import Dict, Generator, Iterable, List, Optional, Set, Type import psycopg2.extensions import psycopg2.extras from swh.core.db import BaseDb from swh.core.statsd import statsd from swh.model.model import Sha1Git from swh.provenance.storage.interface import ( DirectoryData, EntityType, ProvenanceResult, ProvenanceStorageInterface, RelationData, RelationType, RevisionData, ) LOGGER = logging.getLogger(__name__) STORAGE_DURATION_METRIC = "swh_provenance_storage_postgresql_duration_seconds" def 
handle_raise_on_commit(f): @wraps(f) def handle(self, *args, **kwargs): try: return f(self, *args, **kwargs) except BaseException as ex: # Unexpected error occurred, rollback all changes and log message LOGGER.exception("Unexpected error") if self.raise_on_commit: raise ex return False return handle class ProvenanceStoragePostgreSql: - current_version = 4 + current_version = 5 def __init__( self, page_size: Optional[int] = None, raise_on_commit: bool = False, db: str = "", ) -> None: self.conn: Optional[psycopg2.extensions.connection] = None self.dsn = db self._flavor: Optional[str] = None self.page_size = page_size self.raise_on_commit = raise_on_commit def __enter__(self) -> ProvenanceStorageInterface: self.open() return self def __exit__( self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType], ) -> None: self.close() @contextmanager def transaction( self, readonly: bool = False ) -> Generator[psycopg2.extras.RealDictCursor, None, None]: if self.conn is None: raise RuntimeError( "Tried to access ProvenanceStoragePostgreSQL transaction() without opening it" ) self.conn.set_session(readonly=readonly) with self.conn: with self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: yield cur @property def flavor(self) -> str: if self._flavor is None: with self.transaction(readonly=True) as cursor: cursor.execute("SELECT swh_get_dbflavor() AS flavor") flavor = cursor.fetchone() assert flavor # please mypy self._flavor = flavor["flavor"] assert self._flavor is not None return self._flavor - @property - def denormalized(self) -> bool: - return "denormalized" in self.flavor - @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "close"}) def close(self) -> None: assert self.conn is not None self.conn.close() @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_add"}) @handle_raise_on_commit def content_add(self, cnts: Dict[Sha1Git, datetime]) -> bool: if cnts: sql = """ INSERT INTO content(sha1, date) VALUES %s ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date,content.date) """ page_size = self.page_size or len(cnts) with self.transaction() as cursor: psycopg2.extras.execute_values( cursor, sql, argslist=cnts.items(), page_size=page_size ) return True @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_find_first"}) def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: sql = "SELECT * FROM swh_provenance_content_find_first(%s)" with self.transaction(readonly=True) as cursor: cursor.execute(query=sql, vars=(id,)) row = cursor.fetchone() return ProvenanceResult(**row) if row is not None else None @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_find_all"}) def content_find_all( self, id: Sha1Git, limit: Optional[int] = None ) -> Generator[ProvenanceResult, None, None]: sql = "SELECT * FROM swh_provenance_content_find_all(%s, %s)" with self.transaction(readonly=True) as cursor: cursor.execute(query=sql, vars=(id, limit)) yield from (ProvenanceResult(**row) for row in cursor) @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_get"}) def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: dates: Dict[Sha1Git, datetime] = {} sha1s = tuple(ids) if sha1s: # TODO: consider splitting this query in several ones if sha1s is too big! 
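            # (A possible approach, not implemented here: iterate over
            # fixed-size chunks of sha1s, run the query once per chunk, and
            # merge the partial rows into `dates`.)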
values = ", ".join(itertools.repeat("%s", len(sha1s))) sql = f""" SELECT sha1, date FROM content WHERE sha1 IN ({values}) AND date IS NOT NULL """ with self.transaction(readonly=True) as cursor: cursor.execute(query=sql, vars=sha1s) dates.update((row["sha1"], row["date"]) for row in cursor) return dates @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "directory_add"}) @handle_raise_on_commit def directory_add(self, dirs: Dict[Sha1Git, DirectoryData]) -> bool: data = [(sha1, rev.date, rev.flat) for sha1, rev in dirs.items()] if data: sql = """ INSERT INTO directory(sha1, date, flat) VALUES %s ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date, directory.date), flat=(EXCLUDED.flat OR directory.flat) """ page_size = self.page_size or len(data) with self.transaction() as cursor: psycopg2.extras.execute_values( cur=cursor, sql=sql, argslist=data, page_size=page_size ) return True @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "directory_get"}) def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, DirectoryData]: result: Dict[Sha1Git, DirectoryData] = {} sha1s = tuple(ids) if sha1s: # TODO: consider splitting this query in several ones if sha1s is too big! values = ", ".join(itertools.repeat("%s", len(sha1s))) sql = f""" SELECT sha1, date, flat FROM directory WHERE sha1 IN ({values}) AND date IS NOT NULL """ with self.transaction(readonly=True) as cursor: cursor.execute(query=sql, vars=sha1s) result.update( (row["sha1"], DirectoryData(date=row["date"], flat=row["flat"])) for row in cursor ) return result @statsd.timed( metric=STORAGE_DURATION_METRIC, tags={"method": "directory_iter_not_flattened"} ) def directory_iter_not_flattened( self, limit: int, start_id: Sha1Git ) -> List[Sha1Git]: sql = """ SELECT sha1 FROM directory WHERE flat=false AND sha1>%s ORDER BY sha1 LIMIT %s """ with self.transaction(readonly=True) as cursor: cursor.execute(query=sql, vars=(start_id, limit)) return [row["sha1"] for row in cursor] @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "entity_get_all"}) def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]: with self.transaction(readonly=True) as cursor: cursor.execute(f"SELECT sha1 FROM {entity.value}") return {row["sha1"] for row in cursor} @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "location_add"}) @handle_raise_on_commit def location_add(self, paths: Dict[Sha1Git, bytes]) -> bool: values = [(path,) for path in paths.values()] if values: sql = """ INSERT INTO location(path) VALUES %s ON CONFLICT DO NOTHING """ page_size = self.page_size or len(values) with self.transaction() as cursor: psycopg2.extras.execute_values( cursor, sql, argslist=values, page_size=page_size ) return True @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "location_get_all"}) def location_get_all(self) -> Dict[Sha1Git, bytes]: with self.transaction(readonly=True) as cursor: cursor.execute("SELECT location.path AS path FROM location") return {sha1(row["path"]).digest(): row["path"] for row in cursor} @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "origin_add"}) @handle_raise_on_commit def origin_add(self, orgs: Dict[Sha1Git, str]) -> bool: if orgs: sql = """ INSERT INTO origin(sha1, url) VALUES %s ON CONFLICT DO NOTHING """ page_size = self.page_size or len(orgs) with self.transaction() as cursor: psycopg2.extras.execute_values( cur=cursor, sql=sql, argslist=orgs.items(), page_size=page_size, ) return True @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "open"}) def 
open(self) -> None: self.conn = BaseDb.connect(self.dsn).conn BaseDb.adapt_conn(self.conn) with self.transaction() as cursor: cursor.execute("SET timezone TO 'UTC'") @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "origin_get"}) def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]: urls: Dict[Sha1Git, str] = {} sha1s = tuple(ids) if sha1s: # TODO: consider splitting this query in several ones if sha1s is too big! values = ", ".join(itertools.repeat("%s", len(sha1s))) sql = f""" SELECT sha1, url FROM origin WHERE sha1 IN ({values}) """ with self.transaction(readonly=True) as cursor: cursor.execute(query=sql, vars=sha1s) urls.update((row["sha1"], row["url"]) for row in cursor) return urls @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "revision_add"}) @handle_raise_on_commit def revision_add(self, revs: Dict[Sha1Git, RevisionData]) -> bool: if revs: data = [(sha1, rev.date, rev.origin) for sha1, rev in revs.items()] sql = """ INSERT INTO revision(sha1, date, origin) (SELECT V.rev AS sha1, V.date::timestamptz AS date, O.id AS origin FROM (VALUES %s) AS V(rev, date, org) LEFT JOIN origin AS O ON (O.sha1=V.org::sha1_git)) ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date, revision.date), origin=COALESCE(EXCLUDED.origin, revision.origin) """ page_size = self.page_size or len(data) with self.transaction() as cursor: psycopg2.extras.execute_values( cur=cursor, sql=sql, argslist=data, page_size=page_size ) return True @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "revision_get"}) def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]: result: Dict[Sha1Git, RevisionData] = {} sha1s = tuple(ids) if sha1s: # TODO: consider splitting this query in several ones if sha1s is too big! 
values = ", ".join(itertools.repeat("%s", len(sha1s))) sql = f""" SELECT R.sha1, R.date, O.sha1 AS origin FROM revision AS R LEFT JOIN origin AS O ON (O.id=R.origin) WHERE R.sha1 IN ({values}) AND (R.date is not NULL OR O.sha1 is not NULL) """ with self.transaction(readonly=True) as cursor: cursor.execute(query=sql, vars=sha1s) result.update( (row["sha1"], RevisionData(date=row["date"], origin=row["origin"])) for row in cursor ) return result @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "relation_add"}) @handle_raise_on_commit def relation_add( self, relation: RelationType, data: Dict[Sha1Git, Set[RelationData]] ) -> bool: rows = [(src, rel.dst, rel.path) for src, dsts in data.items() for rel in dsts] if rows: rel_table = relation.value src_table, *_, dst_table = rel_table.split("_") page_size = self.page_size or len(rows) # Put the next three queries in a manual single transaction: # they use the same temp table with self.transaction() as cursor: cursor.execute("SELECT swh_mktemp_relation_add()") psycopg2.extras.execute_values( cur=cursor, sql="INSERT INTO tmp_relation_add(src, dst, path) VALUES %s", argslist=rows, page_size=page_size, ) sql = "SELECT swh_provenance_relation_add_from_temp(%s, %s, %s)" cursor.execute(query=sql, vars=(rel_table, src_table, dst_table)) return True @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "relation_get"}) def relation_get( self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False ) -> Dict[Sha1Git, Set[RelationData]]: return self._relation_get(relation, ids, reverse) @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "relation_get_all"}) def relation_get_all( self, relation: RelationType ) -> Dict[Sha1Git, Set[RelationData]]: return self._relation_get(relation, None) def _relation_get( self, relation: RelationType, ids: Optional[Iterable[Sha1Git]], reverse: bool = False, ) -> Dict[Sha1Git, Set[RelationData]]: result: Dict[Sha1Git, Set[RelationData]] = {} sha1s: List[Sha1Git] if ids is not None: sha1s = list(ids) filter = "filter-src" if not reverse else "filter-dst" else: sha1s = [] filter = "no-filter" if filter == "no-filter" or sha1s: rel_table = relation.value src_table, *_, dst_table = rel_table.split("_") sql = "SELECT * FROM swh_provenance_relation_get(%s, %s, %s, %s, %s)" with self.transaction(readonly=True) as cursor: cursor.execute( query=sql, vars=(rel_table, src_table, dst_table, filter, sha1s) ) for row in cursor: src = row.pop("src") result.setdefault(src, set()).add(RelationData(**row)) return result diff --git a/swh/provenance/tests/conftest.py b/swh/provenance/tests/conftest.py index 74f453c..1d725e8 100644 --- a/swh/provenance/tests/conftest.py +++ b/swh/provenance/tests/conftest.py @@ -1,84 +1,84 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from functools import partial from typing import Generator from _pytest.fixtures import SubRequest import psycopg2.extensions import pytest from pytest_postgresql import factories from swh.core.db.db_utils import initialize_database_for_module from swh.provenance import get_provenance from swh.provenance.archive.interface import ArchiveInterface from swh.provenance.archive.storage import ArchiveStorage from swh.provenance.interface import ProvenanceInterface from swh.provenance.storage import get_provenance_storage from 
swh.provenance.storage.interface import ProvenanceStorageInterface from swh.provenance.storage.postgresql import ProvenanceStoragePostgreSql from swh.storage.interface import StorageInterface provenance_postgresql_proc = factories.postgresql_proc( load=[ partial( initialize_database_for_module, modname="provenance", - flavor="normalized", version=ProvenanceStoragePostgreSql.current_version, ) ], ) postgres_provenance = factories.postgresql("provenance_postgresql_proc") @pytest.fixture() def provenance_postgresqldb(request, postgres_provenance) -> str: return postgres_provenance.dsn @pytest.fixture() def provenance_storage( request: SubRequest, provenance_postgresqldb: str, ) -> Generator[ProvenanceStorageInterface, None, None]: """Return a working and initialized ProvenanceStorageInterface object""" # in test sessions, we DO want to raise any exception occurring at commit time with get_provenance_storage( cls="postgresql", db=provenance_postgresqldb, raise_on_commit=True ) as storage: yield storage @pytest.fixture def provenance( postgres_provenance: psycopg2.extensions.connection, ) -> Generator[ProvenanceInterface, None, None]: """Return a working and initialized ProvenanceInterface object""" from swh.core.db.db_utils import ( init_admin_extensions, populate_database_for_package, ) init_admin_extensions("swh.provenance", postgres_provenance.dsn) populate_database_for_package( - "swh.provenance", postgres_provenance.dsn, flavor="normalized" + "swh.provenance", + postgres_provenance.dsn, ) # in test sessions, we DO want to raise any exception occurring at commit time with get_provenance( cls="postgresql", db=postgres_provenance.dsn, raise_on_commit=True, ) as provenance: yield provenance @pytest.fixture def archive(swh_storage: StorageInterface) -> ArchiveInterface: """Return an ArchiveStorage-based ArchiveInterface object""" return ArchiveStorage(swh_storage) diff --git a/swh/provenance/tests/test_cli.py b/swh/provenance/tests/test_cli.py index 2c49e3b..26ec93e 100644 --- a/swh/provenance/tests/test_cli.py +++ b/swh/provenance/tests/test_cli.py @@ -1,259 +1,191 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timezone import logging import re from typing import Dict, List -from _pytest.monkeypatch import MonkeyPatch from click.testing import CliRunner from confluent_kafka import Producer import psycopg2.extensions import pytest from swh.core.cli import swh as swhmain import swh.core.cli.db # noqa ; ensure cli is loaded -from swh.core.db import BaseDb from swh.core.db.db_utils import init_admin_extensions from swh.journal.serializers import key_to_kafka, value_to_kafka from swh.model.hashutil import MultiHash import swh.provenance.cli # noqa ; ensure cli is loaded from swh.provenance.storage.interface import EntityType, RelationType from swh.storage.interface import StorageInterface from .utils import fill_storage, get_datafile, invoke, load_repo_data logger = logging.getLogger(__name__) -def now(): - return datetime.now(timezone.utc) - - def test_cli_swh_db_help() -> None: # swhmain.add_command(provenance_cli) result = CliRunner().invoke(swhmain, ["provenance", "-h"]) assert result.exit_code == 0 assert "Commands:" in result.output commands = result.output.split("Commands:")[1] for command in ( "find-all", "find-first", "iter-frontiers", "iter-origins", 
"iter-revisions", ): assert f" {command} " in commands -TABLES = { - "dbflavor", - "dbmodule", - "dbversion", - "content", - "content_in_revision", - "content_in_directory", - "directory", - "directory_in_revision", - "location", - "origin", - "revision", - "revision_before_revision", - "revision_in_origin", -} - - -@pytest.mark.parametrize("flavor", ("normalized", "denormalized")) -def test_cli_db_create_and_init_db_with_flavor( - monkeypatch: MonkeyPatch, - postgresql: psycopg2.extensions.connection, - flavor: str, -) -> None: - """Test that 'swh db init provenance' works with flavors""" - - dbname = f"{flavor}-db" - - # DB creation using 'swh db create' - db_params = postgresql.get_dsn_parameters() - monkeypatch.setenv("PGHOST", db_params["host"]) - monkeypatch.setenv("PGUSER", db_params["user"]) - monkeypatch.setenv("PGPORT", db_params["port"]) - result = CliRunner().invoke(swhmain, ["db", "create", "-d", dbname, "provenance"]) - assert result.exit_code == 0, result.output - - # DB init using 'swh db init' - result = CliRunner().invoke( - swhmain, ["db", "init", "-d", dbname, "--flavor", flavor, "provenance"] - ) - assert result.exit_code == 0, result.output - assert f"(flavor {flavor})" in result.output - - db_params["dbname"] = dbname - cnx = BaseDb.connect(**db_params).conn - # check the DB looks OK (check for db_flavor and expected tables) - with cnx.cursor() as cur: - cur.execute("select swh_get_dbflavor()") - assert cur.fetchone() == (flavor,) - - cur.execute( - "select table_name from information_schema.tables " - "where table_schema = 'public' " - f"and table_catalog = '{dbname}'" - ) - tables = set(x for (x,) in cur.fetchall()) - assert tables == TABLES - - def test_cli_init_db_default_flavor(postgresql: psycopg2.extensions.connection) -> None: "Test that 'swh db init provenance' defaults to a normalized flavored DB" dbname = postgresql.dsn init_admin_extensions("swh.provenance", dbname) result = CliRunner().invoke(swhmain, ["db", "init", "-d", dbname, "provenance"]) assert result.exit_code == 0, result.output - with postgresql.cursor() as cur: - cur.execute("select swh_get_dbflavor()") - assert cur.fetchone() == ("normalized",) - @pytest.mark.origin_layer @pytest.mark.parametrize( "subcommand", (["origin", "from-csv"], ["iter-origins"]), ) def test_cli_origin_from_csv( swh_storage: StorageInterface, subcommand: List[str], swh_storage_backend_config: Dict, provenance, tmp_path, ): repo = "cmdbts2" origin_url = f"https://{repo}" data = load_repo_data(repo) fill_storage(swh_storage, data) assert len(data["origin"]) >= 1 assert origin_url in [o["url"] for o in data["origin"]] cfg = { "provenance": { "archive": { "cls": "api", "storage": swh_storage_backend_config, }, "storage": { "cls": "postgresql", "db": provenance.storage.conn.dsn, }, }, } csv_filepath = get_datafile("origins.csv") subcommand = subcommand + [csv_filepath] result = invoke(subcommand, config=cfg) assert result.exit_code == 0, f"Unexpected result: {result.output}" origin_sha1 = MultiHash.from_data( origin_url.encode(), hash_names=["sha1"] ).digest()["sha1"] actual_result = provenance.storage.origin_get([origin_sha1]) assert actual_result == {origin_sha1: origin_url} @pytest.mark.kafka def test_replay( provenance_storage, provenance_postgresqldb: str, kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, ): kafka_prefix += ".swh.journal.provenance" producer = Producer( { "bootstrap.servers": kafka_server, "client.id": "test-producer", "acks": "all", } ) for i in range(10): date = 
datetime.fromtimestamp(i, tz=timezone.utc) cntkey = (b"cnt:" + bytes([i])).ljust(20, b"\x00") dirkey = (b"dir:" + bytes([i])).ljust(20, b"\x00") revkey = (b"rev:" + bytes([i])).ljust(20, b"\x00") loc = f"dir/{i}".encode() producer.produce( topic=kafka_prefix + ".content_in_revision", key=key_to_kafka(cntkey), value=value_to_kafka({"src": cntkey, "dst": revkey, "path": loc}), ) producer.produce( topic=kafka_prefix + ".content_in_directory", key=key_to_kafka(cntkey), value=value_to_kafka({"src": cntkey, "dst": dirkey, "path": loc}), ) producer.produce( topic=kafka_prefix + ".directory_in_revision", key=key_to_kafka(dirkey), value=value_to_kafka({"src": dirkey, "dst": revkey, "path": loc}), ) # now add dates to entities producer.produce( topic=kafka_prefix + ".content", key=key_to_kafka(cntkey), value=value_to_kafka({"id": cntkey, "value": date}), ) producer.produce( topic=kafka_prefix + ".directory", key=key_to_kafka(dirkey), value=value_to_kafka({"id": dirkey, "value": date}), ) producer.produce( topic=kafka_prefix + ".revision", key=key_to_kafka(revkey), value=value_to_kafka({"id": revkey, "value": date}), ) producer.flush() logger.debug("Flushed producer") config = { "provenance": { "storage": { "cls": "postgresql", "db": provenance_postgresqldb, }, "journal_client": { "cls": "kafka", "brokers": [kafka_server], "group_id": kafka_consumer_group, "prefix": kafka_prefix, "stop_on_eof": True, }, } } result = invoke(["replay"], config=config) expected = r"Done. processed 60 messages\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output assert len(provenance_storage.entity_get_all(EntityType.CONTENT)) == 10 assert len(provenance_storage.entity_get_all(EntityType.REVISION)) == 10 assert len(provenance_storage.entity_get_all(EntityType.DIRECTORY)) == 10 assert len(provenance_storage.location_get_all()) == 10 assert len(provenance_storage.relation_get_all(RelationType.CNT_EARLY_IN_REV)) == 10 assert len(provenance_storage.relation_get_all(RelationType.DIR_IN_REV)) == 10 assert len(provenance_storage.relation_get_all(RelationType.CNT_IN_DIR)) == 10 diff --git a/swh/provenance/tests/test_provenance_db.py b/swh/provenance/tests/test_provenance_db.py deleted file mode 100644 index a98be40..0000000 --- a/swh/provenance/tests/test_provenance_db.py +++ /dev/null @@ -1,15 +0,0 @@ -# Copyright (C) 2021-2022 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -from swh.provenance.interface import ProvenanceInterface -from swh.provenance.storage.postgresql import ProvenanceStoragePostgreSql - - -def test_provenance_flavor(provenance: ProvenanceInterface) -> None: - if isinstance(provenance.storage, ProvenanceStoragePostgreSql): - assert provenance.storage.flavor in ( - "normalized", - "denormalized", - ) diff --git a/swh/provenance/tests/test_provenance_storage_denormalized.py b/swh/provenance/tests/test_provenance_storage_denormalized.py deleted file mode 100644 index 8fc9062..0000000 --- a/swh/provenance/tests/test_provenance_storage_denormalized.py +++ /dev/null @@ -1,24 +0,0 @@ -# Copyright (C) 2021-2022 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -from functools import partial - 
-from pytest_postgresql import factories - -from swh.core.db.db_utils import initialize_database_for_module -from swh.provenance.storage.postgresql import ProvenanceStoragePostgreSql - -from .test_provenance_storage import TestProvenanceStorage # noqa: F401 - -provenance_postgresql_proc = factories.postgresql_proc( - load=[ - partial( - initialize_database_for_module, - modname="provenance", - flavor="denormalized", - version=ProvenanceStoragePostgreSql.current_version, - ) - ], -)