diff --git a/conftest.py b/conftest.py new file mode 100644 index 0000000..62f1279 --- /dev/null +++ b/conftest.py @@ -0,0 +1 @@ +pytest_plugins = ["swh.core.db.pytest_plugin", "swh.storage.pytest_plugin"] diff --git a/requirements-swh.txt b/requirements-swh.txt index e4b2e83..0b76cb6 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,4 +1,4 @@ # Add here internal Software Heritage dependencies, one per line. -swh.core +swh.core >= 0.12 swh.model -swh.storage \ No newline at end of file +swh.storage diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py index 070d55d..bc0fd39 100644 --- a/swh/provenance/cli.py +++ b/swh/provenance/cli.py @@ -1,220 +1,200 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control import os from typing import Any, Dict, Optional import click import yaml -from psycopg2.extensions import parse_dsn from swh.core import config from swh.core.cli import CONTEXT_SETTINGS from swh.core.cli import swh as swh_cli_group from swh.model.hashutil import hash_to_bytes, hash_to_hex # All generic config code should reside in swh.core.config CONFIG_ENVVAR = "SWH_CONFIG_FILE" DEFAULT_CONFIG_PATH = os.path.join(click.get_app_dir("swh"), "global.yml") DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, DEFAULT_CONFIG_PATH) DEFAULT_CONFIG: Dict[str, Any] = { "archive": { "cls": "api", "storage": { "cls": "remote", "url": "http://uffizi.internal.softwareheritage.org:5002", } # "cls": "ps", # "db": { # "host": "db.internal.softwareheritage.org", # "dbname": "softwareheritage", # "user": "guest" # } }, "provenance": {"cls": "ps", "db": {"host": "localhost", "dbname": "provenance"}}, } CONFIG_FILE_HELP = f"""Configuration file: \b The CLI option or the environment variable will fail if invalid. CLI option is checked first. Then, environment variable {CONFIG_ENVVAR} is checked. Then, if cannot load the default path, a set of default values are used. Default config path is {DEFAULT_CONFIG_PATH}. Default config values are: \b {yaml.dump(DEFAULT_CONFIG)}""" PROVENANCE_HELP = f"""Software Heritage Scanner tools. {CONFIG_FILE_HELP}""" @swh_cli_group.group( name="provenance", context_settings=CONTEXT_SETTINGS, help=PROVENANCE_HELP ) @click.option( "-C", "--config-file", default=None, type=click.Path(exists=False, dir_okay=False, path_type=str), help="""YAML configuration file.""", ) @click.option( "-P", "--profile", default=None, type=click.Path(exists=False, dir_okay=False, path_type=str), help="""Enable profiling to specified file.""", ) @click.pass_context def cli(ctx, config_file: Optional[str], profile: str): if config_file is None and config.config_exists(DEFAULT_PATH): config_file = DEFAULT_PATH if config_file is None: conf = DEFAULT_CONFIG else: # read_raw_config do not fail on ENOENT if not config.config_exists(config_file): raise FileNotFoundError(config_file) conf = config.read_raw_config(config.config_basepath(config_file)) conf = config.merge_configs(DEFAULT_CONFIG, conf) ctx.ensure_object(dict) ctx.obj["config"] = conf if profile: import atexit import cProfile print("Profiling...") pr = cProfile.Profile() pr.enable() def exit(): pr.disable() pr.dump_stats(profile) atexit.register(exit) -@cli.command(name="create") +@cli.command(name="create", deprecated=True) @click.option("--maintenance-db", default=None) @click.option("--drop/--no-drop", "drop_db", default=False) @click.pass_context def create(ctx, maintenance_db, drop_db): - """Create new provenance database.""" - from .postgresql.db_utils import connect - - if ctx.obj["config"]["provenance"]["cls"] != "local": - raise ValueError( - "Unsupported provenance db cls: %s" - % (ctx.obj["config"]["provenance"]["cls"]) - ) - - # Connect to server without selecting a database - dsn = ctx.obj["config"]["provenance"]["db"] - if isinstance(dsn, str): - dsn = parse_dsn(dsn) - dbname = dsn.pop("dbname") - if maintenance_db: - dsn["dbname"] = maintenance_db - - conn = connect(dsn) - - if ctx.obj["config"]["provenance"].get("with_path"): - from .postgresql.provenance import create_database - else: - from .postgresql_nopath.provenance import create_database - - create_database(conn, dbname, drop_db) + """Deprecated, please use: + swh db create provenance + and + swh db init provenance + instead. + """ @cli.command(name="iter-revisions") @click.argument("filename") @click.option("-l", "--limit", type=int) @click.pass_context def iter_revisions(ctx, filename, limit): # TODO: add file size filtering """Process a provided list of revisions.""" from . import get_archive, get_provenance from .provenance import revision_add from .revision import FileRevisionIterator archive = get_archive(**ctx.obj["config"]["archive"]) provenance = get_provenance(**ctx.obj["config"]["provenance"]) revisions = FileRevisionIterator(filename, archive, limit=limit) while True: revision = revisions.next() if revision is None: break revision_add(provenance, archive, revision) @cli.command(name="iter-origins") @click.argument("filename") @click.option("-l", "--limit", type=int) @click.pass_context def iter_origins(ctx, filename, limit): """Process a provided list of origins.""" from . import get_archive, get_provenance from .origin import FileOriginIterator from .provenance import origin_add archive = get_archive(**ctx.obj["config"]["archive"]) provenance = get_provenance(**ctx.obj["config"]["provenance"]) for origin in FileOriginIterator(filename, archive, limit=limit): origin_add(provenance, origin) @cli.command(name="find-first") @click.argument("swhid") @click.pass_context def find_first(ctx, swhid): """Find first occurrence of the requested blob.""" from . import get_provenance provenance = get_provenance(**ctx.obj["config"]["provenance"]) # TODO: return a dictionary with proper keys for each field row = provenance.content_find_first(hash_to_bytes(swhid)) if row is not None: print( "{blob}, {rev}, {date}, {path}".format( blob=hash_to_hex(row[0]), rev=hash_to_hex(row[1]), date=row[2], path=os.fsdecode(row[3]), ) ) else: print(f"Cannot find a content with the id {swhid}") @cli.command(name="find-all") @click.argument("swhid") @click.pass_context def find_all(ctx, swhid): """Find all occurrences of the requested blob.""" from swh.provenance import get_provenance provenance = get_provenance(**ctx.obj["config"]["provenance"]) # TODO: return a dictionary with proper keys for each field for row in provenance.content_find_all(hash_to_bytes(swhid)): print( "{blob}, {rev}, {date}, {path}".format( blob=hash_to_hex(row[0]), rev=hash_to_hex(row[1]), date=row[2], path=os.fsdecode(row[3]), ) ) diff --git a/swh/provenance/postgresql/provenance.py b/swh/provenance/postgresql/provenance.py index 6990289..bdaa580 100644 --- a/swh/provenance/postgresql/provenance.py +++ b/swh/provenance/postgresql/provenance.py @@ -1,509 +1,484 @@ import itertools import logging import operator import os from datetime import datetime from typing import Any, Dict, Generator, List, Optional, Tuple import psycopg2 import psycopg2.extras from ..model import DirectoryEntry, FileEntry from ..origin import OriginEntry from ..provenance import ProvenanceInterface from ..revision import RevisionEntry from .db_utils import connect, execute_sql def normalize(path: bytes) -> bytes: return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path - -def create_database( - conn: psycopg2.extensions.connection, name: str, drop_db: bool = True -): - conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) - - # Normalize dbname to avoid issues when reconnecting below - name = name.casefold() - - # Create new database dropping previous one if exists - cursor = conn.cursor() - if drop_db: - cursor.execute(f"""DROP DATABASE IF EXISTS {name}""") - cursor.execute(f"""CREATE DATABASE {name}""") - conn.close() - - # Reconnect to server selecting newly created database to add tables - conninfo = psycopg2.extensions.parse_dsn(conn.dsn) - conninfo["dbname"] = name - conn = connect(conninfo) - - sqldir = os.path.dirname(os.path.realpath(__file__)) - execute_sql(conn, os.path.join(sqldir, "provenance.sql")) - - ######################################################################################## ######################################################################################## ######################################################################################## class ProvenancePostgreSQL(ProvenanceInterface): def __init__(self, conn: psycopg2.extensions.connection): # TODO: consider adding a mutex for thread safety conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) self.conn = conn self.cursor = self.conn.cursor() self.insert_cache: Dict[str, Any] = {} self.remove_cache: Dict[str, Any] = {} self.select_cache: Dict[str, Any] = {} self.clear_caches() def clear_caches(self): self.insert_cache = { "content": dict(), "content_early_in_rev": list(), "content_in_dir": list(), "directory": dict(), "directory_in_rev": list(), "revision": dict(), "revision_before_rev": list(), "revision_in_org": list(), } self.remove_cache = {"directory": dict()} self.select_cache = {"content": dict(), "directory": dict(), "revision": dict()} def commit(self): result = False try: self.insert_all() self.clear_caches() result = True except Exception as error: # Unexpected error occurred, rollback all changes and log message logging.error(f"Unexpected error: {error}") return result def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ): self.insert_cache["content_in_dir"].append( (blob.id, directory.id, normalize(os.path.join(prefix, blob.name))) ) def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes ): self.insert_cache["content_early_in_rev"].append( (blob.id, revision.id, normalize(os.path.join(prefix, blob.name))) ) def content_find_first( self, blobid: bytes ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: self.cursor.execute( """SELECT content_location.sha1 AS blob, revision.sha1 AS rev, revision.date AS date, content_location.path AS path FROM (SELECT content_hex.sha1, content_hex.rev, location.path FROM (SELECT content.sha1, content_early_in_rev.rev, content_early_in_rev.loc FROM content_early_in_rev JOIN content ON content.id=content_early_in_rev.blob WHERE content.sha1=%s ) AS content_hex JOIN location ON location.id=content_hex.loc ) AS content_location JOIN revision ON revision.id=content_location.rev ORDER BY date, rev, path ASC LIMIT 1""", (blobid,), ) return self.cursor.fetchone() def content_find_all( self, blobid: bytes ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: self.cursor.execute( """(SELECT content_location.sha1 AS blob, revision.sha1 AS rev, revision.date AS date, content_location.path AS path FROM (SELECT content_hex.sha1, content_hex.rev, location.path FROM (SELECT content.sha1, content_early_in_rev.rev, content_early_in_rev.loc FROM content_early_in_rev JOIN content ON content.id=content_early_in_rev.blob WHERE content.sha1=%s ) AS content_hex JOIN location ON location.id=content_hex.loc ) AS content_location JOIN revision ON revision.id=content_location.rev ) UNION (SELECT content_prefix.sha1 AS blob, revision.sha1 AS rev, revision.date AS date, content_prefix.path AS path FROM (SELECT content_in_rev.sha1, content_in_rev.rev, CASE location.path WHEN '' THEN content_in_rev.suffix WHEN '.' THEN content_in_rev.suffix ELSE (location.path || '/' || content_in_rev.suffix)::unix_path END AS path FROM (SELECT content_suffix.sha1, directory_in_rev.rev, directory_in_rev.loc, content_suffix.path AS suffix FROM (SELECT content_hex.sha1, content_hex.dir, location.path FROM (SELECT content.sha1, content_in_dir.dir, content_in_dir.loc FROM content_in_dir JOIN content ON content_in_dir.blob=content.id WHERE content.sha1=%s ) AS content_hex JOIN location ON location.id=content_hex.loc ) AS content_suffix JOIN directory_in_rev ON directory_in_rev.dir=content_suffix.dir ) AS content_in_rev JOIN location ON location.id=content_in_rev.loc ) AS content_prefix JOIN revision ON revision.id=content_prefix.rev ) ORDER BY date, rev, path""", (blobid, blobid), ) # TODO: use POSTGRESQL EXPLAIN looking for query optimizations. yield from self.cursor.fetchall() def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: # First check if the date is being modified by current transection. date = self.insert_cache["content"].get(blob.id, None) if date is None: # If not, check whether it's been query before date = self.select_cache["content"].get(blob.id, None) if date is None: # Otherwise, query the database and cache the value self.cursor.execute( """SELECT date FROM content WHERE sha1=%s""", (blob.id,) ) row = self.cursor.fetchone() date = row[0] if row is not None else None self.select_cache["content"][blob.id] = date return date def content_get_early_dates(self, blobs: List[FileEntry]) -> Dict[bytes, datetime]: dates = {} pending = [] for blob in blobs: # First check if the date is being modified by current transection. date = self.insert_cache["content"].get(blob.id, None) if date is not None: dates[blob.id] = date else: # If not, check whether it's been query before date = self.select_cache["content"].get(blob.id, None) if date is not None: dates[blob.id] = date else: pending.append(blob.id) if pending: # Otherwise, query the database and cache the values values = ", ".join(itertools.repeat("%s", len(pending))) self.cursor.execute( f"""SELECT sha1, date FROM content WHERE sha1 IN ({values})""", tuple(pending), ) for row in self.cursor.fetchall(): dates[row[0]] = row[1] self.select_cache["content"][row[0]] = row[1] return dates def content_set_early_date(self, blob: FileEntry, date: datetime): self.insert_cache["content"][blob.id] = date def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes ): self.insert_cache["directory_in_rev"].append( (directory.id, revision.id, normalize(path)) ) def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: # First check if the date is being modified by current transection. date = self.insert_cache["directory"].get(directory.id, None) if date is None and directory.id not in self.remove_cache["directory"]: # If not, check whether it's been query before date = self.select_cache["directory"].get(directory.id, None) if date is None: # Otherwise, query the database and cache the value self.cursor.execute( """SELECT date FROM directory WHERE sha1=%s""", (directory.id,) ) row = self.cursor.fetchone() date = row[0] if row is not None else None self.select_cache["directory"][directory.id] = date return date def directory_get_dates_in_isochrone_frontier( self, dirs: List[DirectoryEntry] ) -> Dict[bytes, datetime]: dates = {} pending = [] for directory in dirs: # First check if the date is being modified by current transection. date = self.insert_cache["directory"].get(directory.id, None) if date is not None: dates[directory.id] = date elif directory.id not in self.remove_cache["directory"]: # If not, check whether it's been query before date = self.select_cache["directory"].get(directory.id, None) if date is not None: dates[directory.id] = date else: pending.append(directory.id) if pending: # Otherwise, query the database and cache the values values = ", ".join(itertools.repeat("%s", len(pending))) self.cursor.execute( f"""SELECT sha1, date FROM directory WHERE sha1 IN ({values})""", tuple(pending), ) for row in self.cursor.fetchall(): dates[row[0]] = row[1] self.select_cache["directory"][row[0]] = row[1] return dates def directory_invalidate_in_isochrone_frontier(self, directory: DirectoryEntry): self.remove_cache["directory"][directory.id] = None self.insert_cache["directory"].pop(directory.id, None) def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ): self.insert_cache["directory"][directory.id] = date self.remove_cache["directory"].pop(directory.id, None) def insert_all(self): # Performe insertions with cached information if self.insert_cache["content"]: psycopg2.extras.execute_values( self.cursor, """LOCK TABLE ONLY content; INSERT INTO content(sha1, date) VALUES %s ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date,content.date)""", self.insert_cache["content"].items(), ) self.insert_cache["content"].clear() if self.insert_cache["directory"]: psycopg2.extras.execute_values( self.cursor, """LOCK TABLE ONLY directory; INSERT INTO directory(sha1, date) VALUES %s ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date,directory.date)""", self.insert_cache["directory"].items(), ) self.insert_cache["directory"].clear() if self.insert_cache["revision"]: psycopg2.extras.execute_values( self.cursor, """LOCK TABLE ONLY revision; INSERT INTO revision(sha1, date) VALUES %s ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date,revision.date)""", self.insert_cache["revision"].items(), ) self.insert_cache["revision"].clear() # Relations should come after ids for elements were resolved if self.insert_cache["content_early_in_rev"]: self.insert_location("content", "revision", "content_early_in_rev") if self.insert_cache["content_in_dir"]: self.insert_location("content", "directory", "content_in_dir") if self.insert_cache["directory_in_rev"]: self.insert_location("directory", "revision", "directory_in_rev") # if self.insert_cache["revision_before_rev"]: # psycopg2.extras.execute_values( # self.cursor, # """INSERT INTO revision_before_rev VALUES %s # ON CONFLICT DO NOTHING""", # self.insert_cache["revision_before_rev"], # ) # self.insert_cache["revision_before_rev"].clear() # if self.insert_cache["revision_in_org"]: # psycopg2.extras.execute_values( # self.cursor, # """INSERT INTO revision_in_org VALUES %s # ON CONFLICT DO NOTHING""", # self.insert_cache["revision_in_org"], # ) # self.insert_cache["revision_in_org"].clear() def insert_location(self, src0_table, src1_table, dst_table): # Resolve src0 ids src0_values = dict().fromkeys( map(operator.itemgetter(0), self.insert_cache[dst_table]) ) values = ", ".join(itertools.repeat("%s", len(src0_values))) self.cursor.execute( f"""SELECT sha1, id FROM {src0_table} WHERE sha1 IN ({values})""", tuple(src0_values), ) src0_values = dict(self.cursor.fetchall()) # Resolve src1 ids src1_values = dict().fromkeys( map(operator.itemgetter(1), self.insert_cache[dst_table]) ) values = ", ".join(itertools.repeat("%s", len(src1_values))) self.cursor.execute( f"""SELECT sha1, id FROM {src1_table} WHERE sha1 IN ({values})""", tuple(src1_values), ) src1_values = dict(self.cursor.fetchall()) # Resolve location ids location = dict().fromkeys( map(operator.itemgetter(2), self.insert_cache[dst_table]) ) location = dict( psycopg2.extras.execute_values( self.cursor, """LOCK TABLE ONLY location; INSERT INTO location(path) VALUES %s ON CONFLICT (path) DO UPDATE SET path=EXCLUDED.path RETURNING path, id""", map(lambda path: (path,), location.keys()), fetch=True, ) ) # Insert values in dst_table rows = map( lambda row: (src0_values[row[0]], src1_values[row[1]], location[row[2]]), self.insert_cache[dst_table], ) psycopg2.extras.execute_values( self.cursor, f"""INSERT INTO {dst_table} VALUES %s ON CONFLICT DO NOTHING""", rows, ) self.insert_cache[dst_table].clear() def origin_get_id(self, origin: OriginEntry) -> int: if origin.id is None: # Insert origin in the DB and return the assigned id self.cursor.execute( """INSERT INTO origin (url) VALUES (%s) ON CONFLICT DO NOTHING RETURNING id""", (origin.url,), ) return self.cursor.fetchone()[0] else: return origin.id def revision_add(self, revision: RevisionEntry): # Add current revision to the compact DB self.insert_cache["revision"][revision.id] = revision.date def revision_add_before_revision( self, relative: RevisionEntry, revision: RevisionEntry ): self.insert_cache["revision_before_rev"].append((revision.id, relative.id)) def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry): self.insert_cache["revision_in_org"].append((revision.id, origin.id)) def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: date = self.insert_cache["revision"].get(revision.id, None) if date is None: # If not, check whether it's been query before date = self.select_cache["revision"].get(revision.id, None) if date is None: # Otherwise, query the database and cache the value self.cursor.execute( """SELECT date FROM revision WHERE sha1=%s""", (revision.id,) ) row = self.cursor.fetchone() date = row[0] if row is not None else None self.select_cache["revision"][revision.id] = date return date def revision_get_preferred_origin(self, revision: RevisionEntry) -> int: # TODO: adapt this method to consider cached values self.cursor.execute( """SELECT COALESCE(org,0) FROM revision WHERE sha1=%s""", (revision.id,) ) row = self.cursor.fetchone() # None means revision is not in database; # 0 means revision has no preferred origin return row[0] if row is not None and row[0] != 0 else None def revision_in_history(self, revision: RevisionEntry) -> bool: # TODO: adapt this method to consider cached values self.cursor.execute( """SELECT 1 FROM revision_before_rev JOIN revision ON revision.id=revision_before_rev.prev WHERE revision.sha1=%s""", (revision.id,), ) return self.cursor.fetchone() is not None def revision_set_preferred_origin( self, origin: OriginEntry, revision: RevisionEntry ): # TODO: adapt this method to consider cached values self.cursor.execute( """UPDATE revision SET org=%s WHERE sha1=%s""", (origin.id, revision.id) ) def revision_visited(self, revision: RevisionEntry) -> bool: # TODO: adapt this method to consider cached values self.cursor.execute( """SELECT 1 FROM revision_in_org JOIN revision ON revision.id=revision_in_org.rev WHERE revision.sha1=%s""", (revision.id,), ) return self.cursor.fetchone() is not None diff --git a/swh/provenance/postgresql/provenance.sql b/swh/provenance/postgresql/provenance.sql deleted file mode 100644 index 7d37528..0000000 --- a/swh/provenance/postgresql/provenance.sql +++ /dev/null @@ -1,150 +0,0 @@ --- a Git object ID, i.e., a Git-style salted SHA1 checksum -drop domain if exists sha1_git cascade; -create domain sha1_git as bytea check (length(value) = 20); - --- UNIX path (absolute, relative, individual path component, etc.) -drop domain if exists unix_path cascade; -create domain unix_path as bytea; - - -drop table if exists content; -create table content -( - id bigserial primary key, -- internal identifier of the content blob - sha1 sha1_git unique not null, -- intrinsic identifier of the content blob - date timestamptz not null -- timestamp of the revision where the blob appears early -); - -comment on column content.id is 'Content internal identifier'; -comment on column content.sha1 is 'Content intrinsic identifier'; -comment on column content.date is 'Earliest timestamp for the content (first seen time)'; - - -drop table if exists content_early_in_rev; -create table content_early_in_rev -( - blob bigint not null, -- internal identifier of the content blob - rev bigint not null, -- internal identifier of the revision where the blob appears for the first time - loc bigint not null, -- location of the content relative to the revision root directory - primary key (blob, rev, loc) - -- foreign key (blob) references content (id), - -- foreign key (rev) references revision (id), - -- foreign key (loc) references location (id) -); - -comment on column content_early_in_rev.blob is 'Content internal identifier'; -comment on column content_early_in_rev.rev is 'Revision internal identifier'; -comment on column content_early_in_rev.loc is 'Location of content in revision'; - - -drop table if exists content_in_dir; -create table content_in_dir -( - blob bigint not null, -- internal identifier of the content blob - dir bigint not null, -- internal identifier of the directory containing the blob - loc bigint not null, -- location of the content relative to its parent directory in the isochrone frontier - primary key (blob, dir, loc) - -- foreign key (blob) references content (id), - -- foreign key (dir) references directory (id), - -- foreign key (loc) references location (id) -); - -comment on column content_in_dir.blob is 'Content internal identifier'; -comment on column content_in_dir.dir is 'Directory internal identifier'; -comment on column content_in_dir.loc is 'Location of content in directory'; - - -drop table if exists directory; -create table directory -( - id bigserial primary key, -- internal identifier of the directory appearing in an isochrone inner frontier - sha1 sha1_git unique not null, -- intrinsic identifier of the directory - date timestamptz not null -- max timestamp among those of the directory children's -); - -comment on column directory.id is 'Directory internal identifier'; -comment on column directory.sha1 is 'Directory intrinsic identifier'; -comment on column directory.date is 'Latest timestamp for the content in the directory'; - - -drop table if exists directory_in_rev; -create table directory_in_rev -( - dir bigint not null, -- internal identifier of the directory appearing in the revision - rev bigint not null, -- internal identifier of the revision containing the directory - loc bigint not null, -- location of the directory relative to the revision root directory - primary key (dir, rev, loc) - -- foreign key (dir) references directory (id), - -- foreign key (rev) references revision (id), - -- foreign key (loc) references location (id) -); - -comment on column directory_in_rev.dir is 'Directory internal identifier'; -comment on column directory_in_rev.rev is 'Revision internal identifier'; -comment on column directory_in_rev.loc is 'Location of directory in revision'; - - -drop table if exists location; -create table location -( - id bigserial primary key, -- internal identifier of the location - path unix_path unique not null -- path to the location -); - -comment on column location.id is 'Location internal identifier'; -comment on column location.path is 'Path to the location'; - - -drop table if exists origin; -create table origin -( - id bigserial primary key, -- internal identifier of the origin - url unix_path unique not null -- url of the origin -); - -comment on column origin.id is 'Origin internal identifier'; -comment on column origin.url is 'URL of the origin'; - - -drop table if exists revision; -create table revision -( - id bigserial primary key, -- internal identifier of the revision - sha1 sha1_git unique not null, -- intrinsic identifier of the revision - date timestamptz not null, -- timestamp of the revision - org bigint -- id of the preferred origin - -- foreign key (org) references origin (id) -); - -comment on column revision.id is 'Revision internal identifier'; -comment on column revision.sha1 is 'Revision intrinsic identifier'; -comment on column revision.date is 'Revision timestamp'; -comment on column revision.org is 'preferred origin for the revision'; - - -drop table if exists revision_before_rev; -create table revision_before_rev -( - prev bigserial not null, -- internal identifier of the source revision - next bigserial not null, -- internal identifier of the destination revision - primary key (prev, next) - -- foreign key (prev) references revision (id), - -- foreign key (next) references revision (id) -); - -comment on column revision_before_rev.prev is 'Source revision internal identifier'; -comment on column revision_before_rev.next is 'Destination revision internal identifier'; - - -drop table if exists revision_in_org; -create table revision_in_org -( - rev bigint not null, -- internal identifier of the revision poined by the origin - org bigint not null, -- internal identifier of the origin that points to the revision - primary key (rev, org) - -- foreign key (rev) references revision (id), - -- foreign key (org) references origin (id) -); - -comment on column revision_in_org.rev is 'Revision internal identifier'; -comment on column revision_in_org.org is 'Origin internal identifier'; diff --git a/swh/provenance/postgresql_nopath/provenance.py b/swh/provenance/postgresql_nopath/provenance.py index 65550ca..ccf3f6b 100644 --- a/swh/provenance/postgresql_nopath/provenance.py +++ b/swh/provenance/postgresql_nopath/provenance.py @@ -1,446 +1,422 @@ import itertools import logging import operator import os from datetime import datetime from typing import Any, Dict, Generator, List, Optional, Tuple import psycopg2 import psycopg2.extras from ..model import DirectoryEntry, FileEntry from ..origin import OriginEntry from ..postgresql.db_utils import connect, execute_sql from ..provenance import ProvenanceInterface from ..revision import RevisionEntry -def create_database( - conn: psycopg2.extensions.connection, name: str, drop_db: bool = True -): - conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) - - # Normalize dbname to avoid issues when reconnecting below - name = name.casefold() - - # Create new database dropping previous one if exists - cursor = conn.cursor() - if drop_db: - cursor.execute(f"""DROP DATABASE IF EXISTS {name}""") - cursor.execute(f"""CREATE DATABASE {name}""") - conn.close() - - # Reconnect to server selecting newly created database to add tables - conninfo = psycopg2.extensions.parse_dsn(conn.dsn) - conninfo["dbname"] = name - conn = connect(conninfo) - - sqldir = os.path.dirname(os.path.realpath(__file__)) - execute_sql(conn, os.path.join(sqldir, "provenance.sql")) - - ######################################################################################## ######################################################################################## ######################################################################################## class ProvenancePostgreSQLNoPath(ProvenanceInterface): def __init__(self, conn: psycopg2.extensions.connection): # TODO: consider adding a mutex for thread safety conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) self.conn = conn self.cursor = self.conn.cursor() self.insert_cache: Dict[str, Any] = {} self.remove_cache: Dict[str, Any] = {} self.select_cache: Dict[str, Any] = {} self.clear_caches() def clear_caches(self): self.insert_cache = { "content": dict(), "content_early_in_rev": set(), "content_in_dir": set(), "directory": dict(), "directory_in_rev": set(), "revision": dict(), "revision_before_rev": list(), "revision_in_org": list(), } self.remove_cache = {"directory": dict()} self.select_cache = {"content": dict(), "directory": dict(), "revision": dict()} def commit(self): result = False try: self.insert_all() self.clear_caches() result = True except Exception as error: # Unexpected error occurred, rollback all changes and log message logging.error(f"Unexpected error: {error}") return result def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ): self.insert_cache["content_in_dir"].add((blob.id, directory.id)) def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes ): self.insert_cache["content_early_in_rev"].add((blob.id, revision.id)) def content_find_first( self, blobid: bytes ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: self.cursor.execute( """SELECT revision.sha1 AS rev, revision.date AS date FROM (SELECT content_early_in_rev.rev FROM content_early_in_rev JOIN content ON content.id=content_early_in_rev.blob WHERE content.sha1=%s ) AS content_in_rev JOIN revision ON revision.id=content_in_rev.rev ORDER BY date, rev ASC LIMIT 1""", (blobid,), ) row = self.cursor.fetchone() if row is not None: # TODO: query revision from the archive and look for blobid into a # recursive directory_ls of the revision's root. return blobid, row[0], row[1], b"" return None def content_find_all( self, blobid: bytes ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: self.cursor.execute( """(SELECT revision.sha1 AS rev, revision.date AS date FROM (SELECT content_early_in_rev.rev FROM content_early_in_rev JOIN content ON content.id=content_early_in_rev.blob WHERE content.sha1=%s ) AS content_in_rev JOIN revision ON revision.id=content_in_rev.rev ) UNION (SELECT revision.sha1 AS rev, revision.date AS date FROM (SELECT directory_in_rev.rev FROM (SELECT content_in_dir.dir FROM content_in_dir JOIN content ON content_in_dir.blob=content.id WHERE content.sha1=%s ) AS content_dir JOIN directory_in_rev ON directory_in_rev.dir=content_dir.dir ) AS content_in_rev JOIN revision ON revision.id=content_in_rev.rev ) ORDER BY date, rev""", (blobid, blobid), ) # TODO: use POSTGRESQL EXPLAIN looking for query optimizations. for row in self.cursor.fetchall(): # TODO: query revision from the archive and look for blobid into a # recursive directory_ls of the revision's root. yield blobid, row[0], row[1], b"" def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: # First check if the date is being modified by current transection. date = self.insert_cache["content"].get(blob.id, None) if date is None: # If not, check whether it's been query before date = self.select_cache["content"].get(blob.id, None) if date is None: # Otherwise, query the database and cache the value self.cursor.execute( """SELECT date FROM content WHERE sha1=%s""", (blob.id,) ) row = self.cursor.fetchone() date = row[0] if row is not None else None self.select_cache["content"][blob.id] = date return date def content_get_early_dates(self, blobs: List[FileEntry]) -> Dict[bytes, datetime]: dates = {} pending = [] for blob in blobs: # First check if the date is being modified by current transection. date = self.insert_cache["content"].get(blob.id, None) if date is not None: dates[blob.id] = date else: # If not, check whether it's been query before date = self.select_cache["content"].get(blob.id, None) if date is not None: dates[blob.id] = date else: pending.append(blob.id) if pending: # Otherwise, query the database and cache the values values = ", ".join(itertools.repeat("%s", len(pending))) self.cursor.execute( f"""SELECT sha1, date FROM content WHERE sha1 IN ({values})""", tuple(pending), ) for row in self.cursor.fetchall(): dates[row[0]] = row[1] self.select_cache["content"][row[0]] = row[1] return dates def content_set_early_date(self, blob: FileEntry, date: datetime): self.insert_cache["content"][blob.id] = date def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes ): self.insert_cache["directory_in_rev"].add((directory.id, revision.id)) def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: # First check if the date is being modified by current transection. date = self.insert_cache["directory"].get(directory.id, None) if date is None and directory.id not in self.remove_cache["directory"]: # If not, check whether it's been query before date = self.select_cache["directory"].get(directory.id, None) if date is None: # Otherwise, query the database and cache the value self.cursor.execute( """SELECT date FROM directory WHERE sha1=%s""", (directory.id,) ) row = self.cursor.fetchone() date = row[0] if row is not None else None self.select_cache["directory"][directory.id] = date return date def directory_get_dates_in_isochrone_frontier( self, dirs: List[DirectoryEntry] ) -> Dict[bytes, datetime]: dates = {} pending = [] for directory in dirs: # First check if the date is being modified by current transection. date = self.insert_cache["directory"].get(directory.id, None) if date is not None: dates[directory.id] = date elif directory.id not in self.remove_cache["directory"]: # If not, check whether it's been query before date = self.select_cache["directory"].get(directory.id, None) if date is not None: dates[directory.id] = date else: pending.append(directory.id) if pending: # Otherwise, query the database and cache the values values = ", ".join(itertools.repeat("%s", len(pending))) self.cursor.execute( f"""SELECT sha1, date FROM directory WHERE sha1 IN ({values})""", tuple(pending), ) for row in self.cursor.fetchall(): dates[row[0]] = row[1] self.select_cache["directory"][row[0]] = row[1] return dates def directory_invalidate_in_isochrone_frontier(self, directory: DirectoryEntry): self.remove_cache["directory"][directory.id] = None self.insert_cache["directory"].pop(directory.id, None) def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ): self.insert_cache["directory"][directory.id] = date self.remove_cache["directory"].pop(directory.id, None) def insert_all(self): # Performe insertions with cached information if self.insert_cache["content"]: psycopg2.extras.execute_values( self.cursor, """LOCK TABLE ONLY content; INSERT INTO content(sha1, date) VALUES %s ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date,content.date)""", self.insert_cache["content"].items(), ) self.insert_cache["content"].clear() if self.insert_cache["directory"]: psycopg2.extras.execute_values( self.cursor, """LOCK TABLE ONLY directory; INSERT INTO directory(sha1, date) VALUES %s ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date,directory.date)""", self.insert_cache["directory"].items(), ) self.insert_cache["directory"].clear() if self.insert_cache["revision"]: psycopg2.extras.execute_values( self.cursor, """LOCK TABLE ONLY revision; INSERT INTO revision(sha1, date) VALUES %s ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date,revision.date)""", self.insert_cache["revision"].items(), ) self.insert_cache["revision"].clear() # Relations should come after ids for elements were resolved if self.insert_cache["content_early_in_rev"]: self.insert_location("content", "revision", "content_early_in_rev") if self.insert_cache["content_in_dir"]: self.insert_location("content", "directory", "content_in_dir") if self.insert_cache["directory_in_rev"]: self.insert_location("directory", "revision", "directory_in_rev") # if self.insert_cache["revision_before_rev"]: # psycopg2.extras.execute_values( # self.cursor, # """INSERT INTO revision_before_rev VALUES %s # ON CONFLICT DO NOTHING""", # self.insert_cache["revision_before_rev"], # ) # self.insert_cache["revision_before_rev"].clear() # if self.insert_cache["revision_in_org"]: # psycopg2.extras.execute_values( # self.cursor, # """INSERT INTO revision_in_org VALUES %s # ON CONFLICT DO NOTHING""", # self.insert_cache["revision_in_org"], # ) # self.insert_cache["revision_in_org"].clear() def insert_location(self, src0_table, src1_table, dst_table): # Resolve src0 ids src0_values = dict().fromkeys( map(operator.itemgetter(0), self.insert_cache[dst_table]) ) values = ", ".join(itertools.repeat("%s", len(src0_values))) self.cursor.execute( f"""SELECT sha1, id FROM {src0_table} WHERE sha1 IN ({values})""", tuple(src0_values), ) src0_values = dict(self.cursor.fetchall()) # Resolve src1 ids src1_values = dict().fromkeys( map(operator.itemgetter(1), self.insert_cache[dst_table]) ) values = ", ".join(itertools.repeat("%s", len(src1_values))) self.cursor.execute( f"""SELECT sha1, id FROM {src1_table} WHERE sha1 IN ({values})""", tuple(src1_values), ) src1_values = dict(self.cursor.fetchall()) # Insert values in dst_table rows = map( lambda row: (src0_values[row[0]], src1_values[row[1]]), self.insert_cache[dst_table], ) psycopg2.extras.execute_values( self.cursor, f"""INSERT INTO {dst_table} VALUES %s ON CONFLICT DO NOTHING""", rows, ) self.insert_cache[dst_table].clear() def origin_get_id(self, origin: OriginEntry) -> int: if origin.id is None: # Insert origin in the DB and return the assigned id self.cursor.execute( """INSERT INTO origin (url) VALUES (%s) ON CONFLICT DO NOTHING RETURNING id""", (origin.url,), ) return self.cursor.fetchone()[0] else: return origin.id def revision_add(self, revision: RevisionEntry): # Add current revision to the compact DB self.insert_cache["revision"][revision.id] = revision.date def revision_add_before_revision( self, relative: RevisionEntry, revision: RevisionEntry ): self.insert_cache["revision_before_rev"].append((revision.id, relative.id)) def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry): self.insert_cache["revision_in_org"].append((revision.id, origin.id)) def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: date = self.insert_cache["revision"].get(revision.id, None) if date is None: # If not, check whether it's been query before date = self.select_cache["revision"].get(revision.id, None) if date is None: # Otherwise, query the database and cache the value self.cursor.execute( """SELECT date FROM revision WHERE sha1=%s""", (revision.id,) ) row = self.cursor.fetchone() date = row[0] if row is not None else None self.select_cache["revision"][revision.id] = date return date def revision_get_preferred_origin(self, revision: RevisionEntry) -> int: # TODO: adapt this method to consider cached values self.cursor.execute( """SELECT COALESCE(org,0) FROM revision WHERE sha1=%s""", (revision.id,) ) row = self.cursor.fetchone() # None means revision is not in database; # 0 means revision has no preferred origin return row[0] if row is not None and row[0] != 0 else None def revision_in_history(self, revision: RevisionEntry) -> bool: # TODO: adapt this method to consider cached values self.cursor.execute( """SELECT 1 FROM revision_before_rev JOIN revision ON revision.id=revision_before_rev.prev WHERE revision.sha1=%s""", (revision.id,), ) return self.cursor.fetchone() is not None def revision_set_preferred_origin( self, origin: OriginEntry, revision: RevisionEntry ): # TODO: adapt this method to consider cached values self.cursor.execute( """UPDATE revision SET org=%s WHERE sha1=%s""", (origin.id, revision.id) ) def revision_visited(self, revision: RevisionEntry) -> bool: # TODO: adapt this method to consider cached values self.cursor.execute( """SELECT 1 FROM revision_in_org JOIN revision ON revision.id=revision_in_org.rev WHERE revision.sha1=%s""", (revision.id,), ) return self.cursor.fetchone() is not None diff --git a/swh/provenance/postgresql_nopath/provenance.sql b/swh/provenance/sql/30-schema.sql similarity index 62% rename from swh/provenance/postgresql_nopath/provenance.sql rename to swh/provenance/sql/30-schema.sql index 568d4be..3c33f99 100644 --- a/swh/provenance/postgresql_nopath/provenance.sql +++ b/swh/provenance/sql/30-schema.sql @@ -1,130 +1,68 @@ -- a Git object ID, i.e., a Git-style salted SHA1 checksum -drop domain if exists sha1_git cascade; create domain sha1_git as bytea check (length(value) = 20); -- UNIX path (absolute, relative, individual path component, etc.) -drop domain if exists unix_path cascade; create domain unix_path as bytea; - -drop table if exists content; create table content ( id bigserial primary key, -- internal identifier of the content blob sha1 sha1_git unique not null, -- intrinsic identifier of the content blob date timestamptz not null -- timestamp of the revision where the blob appears early ); - comment on column content.id is 'Content internal identifier'; comment on column content.sha1 is 'Content intrinsic identifier'; comment on column content.date is 'Earliest timestamp for the content (first seen time)'; - -drop table if exists content_early_in_rev; -create table content_early_in_rev -( - blob bigint not null, -- internal identifier of the content blob - rev bigint not null, -- internal identifier of the revision where the blob appears for the first time - primary key (blob, rev) - -- foreign key (blob) references content (id), - -- foreign key (rev) references revision (id) -); - -comment on column content_early_in_rev.blob is 'Content internal identifier'; -comment on column content_early_in_rev.rev is 'Revision internal identifier'; - - -drop table if exists content_in_dir; -create table content_in_dir -( - blob bigint not null, -- internal identifier of the content blob - dir bigint not null, -- internal identifier of the directory containing the blob - primary key (blob, dir) - -- foreign key (blob) references content (id), - -- foreign key (dir) references directory (id) -); - -comment on column content_in_dir.blob is 'Content internal identifier'; -comment on column content_in_dir.dir is 'Directory internal identifier'; - - -drop table if exists directory; create table directory ( id bigserial primary key, -- internal identifier of the directory appearing in an isochrone inner frontier sha1 sha1_git unique not null, -- intrinsic identifier of the directory date timestamptz not null -- max timestamp among those of the directory children's ); - comment on column directory.id is 'Directory internal identifier'; comment on column directory.sha1 is 'Directory intrinsic identifier'; comment on column directory.date is 'Latest timestamp for the content in the directory'; - -drop table if exists directory_in_rev; -create table directory_in_rev -( - dir bigint not null, -- internal identifier of the directory appearing in the revision - rev bigint not null, -- internal identifier of the revision containing the directory - primary key (dir, rev) - -- foreign key (dir) references directory (id), - -- foreign key (rev) references revision (id) -); - -comment on column directory_in_rev.dir is 'Directory internal identifier'; -comment on column directory_in_rev.rev is 'Revision internal identifier'; - - -drop table if exists origin; create table origin ( id bigserial primary key, -- internal identifier of the origin url unix_path unique not null -- url of the origin ); - comment on column origin.id is 'Origin internal identifier'; comment on column origin.url is 'URL of the origin'; - -drop table if exists revision; create table revision ( id bigserial primary key, -- internal identifier of the revision sha1 sha1_git unique not null, -- intrinsic identifier of the revision date timestamptz not null, -- timestamp of the revision org bigint -- id of the preferred origin -- foreign key (org) references origin (id) ); - comment on column revision.id is 'Revision internal identifier'; comment on column revision.sha1 is 'Revision intrinsic identifier'; comment on column revision.date is 'Revision timestamp'; comment on column revision.org is 'preferred origin for the revision'; - -drop table if exists revision_before_rev; create table revision_before_rev ( prev bigserial not null, -- internal identifier of the source revision next bigserial not null, -- internal identifier of the destination revision primary key (prev, next) -- foreign key (prev) references revision (id), -- foreign key (next) references revision (id) ); - comment on column revision_before_rev.prev is 'Source revision internal identifier'; comment on column revision_before_rev.next is 'Destination revision internal identifier'; - -drop table if exists revision_in_org; create table revision_in_org ( rev bigint not null, -- internal identifier of the revision poined by the origin org bigint not null, -- internal identifier of the origin that points to the revision primary key (rev, org) -- foreign key (rev) references revision (id), -- foreign key (org) references origin (id) ); - comment on column revision_in_org.rev is 'Revision internal identifier'; comment on column revision_in_org.org is 'Origin internal identifier'; diff --git a/swh/provenance/sql/35-schema-with-path-flavor.sql b/swh/provenance/sql/35-schema-with-path-flavor.sql new file mode 100644 index 0000000..e28bb3e --- /dev/null +++ b/swh/provenance/sql/35-schema-with-path-flavor.sql @@ -0,0 +1,46 @@ +create table content_early_in_rev +( + blob bigint not null, -- internal identifier of the content blob + rev bigint not null, -- internal identifier of the revision where the blob appears for the first time + loc bigint not null -- location of the content relative to the revision root directory + -- foreign key (blob) references content (id), + -- foreign key (rev) references revision (id), + -- foreign key (loc) references location (id) +); +comment on column content_early_in_rev.blob is 'Content internal identifier'; +comment on column content_early_in_rev.rev is 'Revision internal identifier'; +comment on column content_early_in_rev.loc is 'Location of content in revision'; + +create table content_in_dir +( + blob bigint not null, -- internal identifier of the content blob + dir bigint not null, -- internal identifier of the directory containing the blob + loc bigint not null -- location of the content relative to its parent directory in the isochrone frontier + -- foreign key (blob) references content (id), + -- foreign key (dir) references directory (id), + -- foreign key (loc) references location (id) +); +comment on column content_in_dir.blob is 'Content internal identifier'; +comment on column content_in_dir.dir is 'Directory internal identifier'; +comment on column content_in_dir.loc is 'Location of content in directory'; + +create table directory_in_rev +( + dir bigint not null, -- internal identifier of the directory appearing in the revision + rev bigint not null, -- internal identifier of the revision containing the directory + loc bigint not null -- location of the directory relative to the revision root directory + -- foreign key (dir) references directory (id), + -- foreign key (rev) references revision (id), + -- foreign key (loc) references location (id) +); +comment on column directory_in_rev.dir is 'Directory internal identifier'; +comment on column directory_in_rev.rev is 'Revision internal identifier'; +comment on column directory_in_rev.loc is 'Location of directory in revision'; + +create table location +( + id bigserial primary key, -- internal identifier of the location + path unix_path unique not null -- path to the location +); +comment on column location.id is 'Location internal identifier'; +comment on column location.path is 'Path to the location'; diff --git a/swh/provenance/sql/35-schema-without-path-flavor.sql b/swh/provenance/sql/35-schema-without-path-flavor.sql new file mode 100644 index 0000000..9222595 --- /dev/null +++ b/swh/provenance/sql/35-schema-without-path-flavor.sql @@ -0,0 +1,29 @@ +create table content_early_in_rev +( + blob bigint not null, -- internal identifier of the content blob + rev bigint not null -- internal identifier of the revision where the blob appears for the first time + -- foreign key (blob) references content (id), + -- foreign key (rev) references revision (id) +); +comment on column content_early_in_rev.blob is 'Content internal identifier'; +comment on column content_early_in_rev.rev is 'Revision internal identifier'; + +create table content_in_dir +( + blob bigint not null, -- internal identifier of the content blob + dir bigint not null -- internal identifier of the directory containing the blob + -- foreign key (blob) references content (id), + -- foreign key (dir) references directory (id) +); +comment on column content_in_dir.blob is 'Content internal identifier'; +comment on column content_in_dir.dir is 'Directory internal identifier'; + +create table directory_in_rev +( + dir bigint not null, -- internal identifier of the directory appearing in the revision + rev bigint not null -- internal identifier of the revision containing the directory + -- foreign key (dir) references directory (id), + -- foreign key (rev) references revision (id) +); +comment on column directory_in_rev.dir is 'Directory internal identifier'; +comment on column directory_in_rev.rev is 'Revision internal identifier'; diff --git a/swh/provenance/sql/60-indexes-with-path-flavor.sql b/swh/provenance/sql/60-indexes-with-path-flavor.sql new file mode 100644 index 0000000..9190b7c --- /dev/null +++ b/swh/provenance/sql/60-indexes-with-path-flavor.sql @@ -0,0 +1,3 @@ +alter table content_early_in_rev add primary key (blob, rev, loc); +alter table content_in_dir add primary key (blob, dir, loc); +alter table directory_in_rev add primary key (dir, rev, loc); diff --git a/swh/provenance/sql/60-indexes-without-path-flavor.sql b/swh/provenance/sql/60-indexes-without-path-flavor.sql new file mode 100644 index 0000000..daf925c --- /dev/null +++ b/swh/provenance/sql/60-indexes-without-path-flavor.sql @@ -0,0 +1,3 @@ +alter table content_early_in_rev add primary key (blob, rev); +alter table content_in_dir add primary key (blob, dir); +alter table directory_in_rev add primary key (dir, rev); diff --git a/swh/provenance/tests/conftest.py b/swh/provenance/tests/conftest.py new file mode 100644 index 0000000..1df70ee --- /dev/null +++ b/swh/provenance/tests/conftest.py @@ -0,0 +1,28 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import glob +from os import path + +import pytest + +from swh.core.db.pytest_plugin import postgresql_fact +from swh.core.utils import numfile_sortkey as sortkey +import swh.provenance + + +SQL_DIR = path.join(path.dirname(swh.provenance.__file__), "sql") +SQL_FILES = [sqlfile for sqlfile in sorted(glob.glob(path.join(SQL_DIR, "*.sql")), key=sortkey) + if '-without-path-' not in sqlfile] + +provenance_db = postgresql_fact( + "postgresql_proc", db_name="provenance", dump_files=SQL_FILES) + + +@pytest.fixture +def provenance(provenance_db): + """return a working and initialized provenance db""" + from swh.provenance.postgresql.provenance import ProvenancePostgreSQL as ProvenanceDB + return ProvenanceDB(provenance_db) diff --git a/swh/provenance/tests/test_cli.py b/swh/provenance/tests/test_cli.py index 990c43b..2c2e5fc 100644 --- a/swh/provenance/tests/test_cli.py +++ b/swh/provenance/tests/test_cli.py @@ -1,54 +1,39 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import pytest -import yaml from click.testing import CliRunner -from psycopg2.extensions import parse_dsn +import yaml -import swh.provenance.cli # noqa ; ensure cli is loaded from swh.core.cli import swh as swhmain -from swh.core.db.pytest_plugin import postgresql_fact - -pytest_plugins = ["swh.storage.pytest_plugin"] - -provenance_db = postgresql_fact("postgresql_proc", db_name="provenance",) +import swh.provenance.cli # noqa ; ensure cli is loaded def test_cli_swh_db_help(): # swhmain.add_command(provenance_cli) result = CliRunner().invoke(swhmain, ["provenance", "-h"]) assert result.exit_code == 0 assert "Commands:" in result.output commands = result.output.split("Commands:")[1] for command in ( "create", "find-all", "find-first", "iter-origins", "iter-revisions", ): assert f" {command} " in commands -@pytest.mark.parametrize("with_path", (True, False)) -def test_cli_create(provenance_db, tmp_path, with_path): +def test_cli_create_deprecated(provenance_db, tmp_path): conffile = tmp_path / "config.yml" - dsn = parse_dsn(provenance_db.dsn) - dsn["dbname"] = "test_provenance" conf = { - "provenance": {"cls": "local", "with_path": with_path, "db": dsn,}, + "provenance": {"cls": "local", "with_path": True,}, } yaml.dump(conf, conffile.open("w")) result = CliRunner().invoke( swhmain, ["provenance", "--config-file", str(conffile), "create", "--drop"] ) assert result.exit_code == 0, result.output - - # this will fail because the db already exists - result = CliRunner().invoke( - swhmain, ["provenance", "--config-file", str(conffile), "create"] - ) - assert result.exit_code == 1, result.output + assert "DeprecationWarning" in result.output diff --git a/swh/provenance/tests/test_conftest.py b/swh/provenance/tests/test_conftest.py new file mode 100644 index 0000000..ce1190f --- /dev/null +++ b/swh/provenance/tests/test_conftest.py @@ -0,0 +1,9 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + + +def test_provenance_fixture(provenance): + assert provenance + provenance.insert_all() # should be a noop