diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py
index 42e5de9..1f8a978 100644
--- a/swh/provenance/cli.py
+++ b/swh/provenance/cli.py
@@ -1,154 +1,154 @@
 # Copyright (C) 2020  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 # WARNING: do not import unnecessary things here to keep cli startup time under
 # control
 import os

 from typing import Any, Dict, Optional

 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

 import click
 import yaml

 from swh.core import config
 from swh.core.cli import CONTEXT_SETTINGS
 from swh.core.cli import swh as swh_cli_group
 from swh.core.db import db_utils    # TODO: remove this in favour of local db_utils module
 from swh.model.hashutil import (hash_to_bytes, hash_to_hex)
 from swh.storage import get_storage

 # All generic config code should reside in swh.core.config
 CONFIG_ENVVAR = "SWH_CONFIG_FILE"
 DEFAULT_CONFIG_PATH = os.path.join(click.get_app_dir("swh"), "global.yml")
 DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, DEFAULT_CONFIG_PATH)

 DEFAULT_CONFIG: Dict[str, Any] = {
     "storage": {
         "cls": "remote",
         "url": "http://uffizi.internal.softwareheritage.org:5002"
     },
-    "db": "postgresql://postgres:postgres@localhost/provenance"
+    "db": "postgresql://postgres:postgres@localhost/provenance"  # TODO: fix this!
 }

 CONFIG_FILE_HELP = f"""Configuration file:

 \b
 The CLI option or the environment variable will fail if invalid.
 The CLI option is checked first.
 Then, the environment variable {CONFIG_ENVVAR} is checked.
 Then, if the default path cannot be loaded, a set of default values is used.
 Default config path is {DEFAULT_CONFIG_PATH}.
 Default config values are:

 \b
 {yaml.dump(DEFAULT_CONFIG)}"""
 PROVENANCE_HELP = f"""Software Heritage provenance tools.

 {CONFIG_FILE_HELP}"""


 @swh_cli_group.group(
     name="provenance", context_settings=CONTEXT_SETTINGS, help=PROVENANCE_HELP,
 )
 @click.option(
     "-C",
     "--config-file",
     default=None,
     type=click.Path(exists=False, dir_okay=False, path_type=str),
     help="""YAML configuration file""",
 )
 @click.pass_context
 def cli(ctx, config_file: Optional[str]):
     from .db_utils import adapt_conn

     if config_file is None and config.config_exists(DEFAULT_PATH):
         config_file = DEFAULT_PATH

     if config_file is None:
         conf = DEFAULT_CONFIG
     else:
         # read_raw_config does not fail on ENOENT
         if not config.config_exists(config_file):
             raise FileNotFoundError(config_file)
         conf = config.read_raw_config(config.config_basepath(config_file))
         conf = config.merge_configs(DEFAULT_CONFIG, conf)

     ctx.ensure_object(dict)
     ctx.obj["config"] = conf

     conn = db_utils.connect_to_conninfo(conf["db"])
     adapt_conn(conn)
     ctx.obj["conn"] = conn
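
A note on the precedence cli() implements, sketched below with os.path.exists standing in for config.config_exists (resolve_config_path is an illustrative helper, not part of the module):

    import os
    from typing import Optional

    def resolve_config_path(cli_option: Optional[str], default_path: str) -> Optional[str]:
        # -C/--config-file wins outright; otherwise DEFAULT_PATH (which already
        # folds in $SWH_CONFIG_FILE) is used only if the file exists; None means
        # fall back to the built-in DEFAULT_CONFIG.
        if cli_option is not None:
            return cli_option
        return default_path if os.path.exists(default_path) else None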

 @cli.command(name="create")
 @click.option("--name", default='provenance')
 @click.pass_context
 def create(ctx, name):
     """Create new provenance database."""
     from .provenance import create_database
     from .db_utils import adapt_conn

     # Close default connection as it won't be used
     ctx.obj["conn"].close()

     # Connect to server without selecting a database
     conninfo = os.path.dirname(ctx.obj["config"]["db"])
     conn = db_utils.connect_to_conninfo(conninfo)
     adapt_conn(conn)
-    
+
     create_database(conn, conninfo, name)


 @cli.command(name="iter-revisions")
 @click.argument("filename")
 @click.option('-l', '--limit', type=int)
 @click.option('-t', '--threads', type=int, default=1)
 @click.pass_context
 def iter_revisions(ctx, filename, limit, threads):
     """Iterate over provided list of revisions and add them to the provenance database."""
     from .provenance import FileRevisionIterator
     from .provenance import RevisionWorker

     conninfo = ctx.obj["config"]["db"]
     revisions = FileRevisionIterator(filename, limit=limit)
     storage = get_storage(**ctx.obj["config"]["storage"])
     workers = []

     for id in range(threads):
         worker = RevisionWorker(id, conninfo, storage, revisions)
         worker.start()
         workers.append(worker)

     for worker in workers:
         worker.join()


 @cli.command(name="find-first")
 @click.argument("swhid")
 @click.pass_context
 def find_first(ctx, swhid):
     """Find first occurrence of the requested blob."""
     from .provenance import content_find_first

     conn = ctx.obj["conn"]
     cursor = conn.cursor();
     row = content_find_first(cursor, hash_to_bytes(swhid))
     print(f'{hash_to_hex(row[0])}, {hash_to_hex(row[1])}, {row[2]}, {os.fsdecode(row[3])}')


 @cli.command(name="find-all")
 @click.argument("swhid")
 @click.pass_context
 def find_all(ctx, swhid):
     """Find all occurrences of the requested blob."""
     from .provenance import content_find_all

     conn = ctx.obj["conn"]
     cursor = conn.cursor();
     for row in content_find_all(cursor, hash_to_bytes(swhid)):
         print(f'{hash_to_hex(row[0])}, {hash_to_hex(row[1])}, {row[2]}, {os.fsdecode(row[3])}')
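
Two small usage notes on the commands above, hedged since they go beyond what the diff itself states. First, create strips the database name from the configured DSN with os.path.dirname, which happens to split URL-style conninfo strings on their last '/':

    import os

    conninfo = "postgresql://postgres:postgres@localhost/provenance"
    os.path.dirname(conninfo)  # -> 'postgresql://postgres:postgres@localhost'

Second, find-first and find-all pass their argument through hash_to_bytes, so despite the name swhid they expect the blob's 40-character hex sha1_git, not a full swh:1:cnt:... identifier.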
diff --git a/swh/provenance/db/provenance.sql b/swh/provenance/db/provenance.sql
index 0002d91..26ce1ea 100644
--- a/swh/provenance/db/provenance.sql
+++ b/swh/provenance/db/provenance.sql
@@ -1,124 +1,124 @@
 -- a Git object ID, i.e., a Git-style salted SHA1 checksum
 drop domain if exists sha1_git cascade;
 create domain sha1_git as bytea check (length(value) = 20);

 -- UNIX path (absolute, relative, individual path component, etc.)
 drop domain if exists unix_path cascade;
 create domain unix_path as bytea;

 drop table if exists content;
 create table content
 (
     id    sha1_git primary key,  -- id of the content blob
     date  timestamptz not null   -- timestamp of the revision where the blob appears earliest
 );

 comment on column content.id is 'Content identifier';
 comment on column content.date is 'Earliest timestamp for the content (first seen time)';

 drop table if exists content_early_in_rev;
 create table content_early_in_rev
 (
     blob  sha1_git not null,   -- id of the content blob
     rev   sha1_git not null,   -- id of the revision where the blob appears for the first time
     path  unix_path not null,  -- path to the content relative to the revision root directory
     primary key (blob, rev, path)
     -- foreign key (blob) references content (id),
     -- foreign key (rev) references revision (id)
 );

 comment on column content_early_in_rev.blob is 'Content identifier';
 comment on column content_early_in_rev.rev is 'Revision identifier';
 comment on column content_early_in_rev.path is 'Path to content in revision';

 drop table if exists content_in_dir;
 create table content_in_dir
 (
     blob  sha1_git not null,   -- id of the content blob
     dir   sha1_git not null,   -- id of the directory containing the blob
     path  unix_path not null,  -- path name relative to its parent on the isochrone frontier
     primary key (blob, dir, path)
     -- foreign key (blob) references content (id),
     -- foreign key (dir) references directory (id)
 );

 comment on column content_in_dir.blob is 'Content identifier';
 comment on column content_in_dir.dir is 'Directory identifier';
---- comment on column content_early_in_rev.path is 'Path to content in directory';
+comment on column content_in_dir.path is 'Path to content in directory';

 drop table if exists directory;
 create table directory
 (
     id    sha1_git primary key,  -- id of the directory appearing in an isochrone inner frontier
     date  timestamptz not null   -- max timestamp among those of the directory's children
 );

 comment on column directory.id is 'Directory identifier';
 comment on column directory.date is 'Latest timestamp for the content in the directory';

 drop table if exists directory_in_rev;
 create table directory_in_rev
 (
     dir   sha1_git not null,   -- id of the directory appearing in the revision
     rev   sha1_git not null,   -- id of the revision containing the directory
     path  unix_path not null,  -- path to the directory relative to the revision root directory
     primary key (dir, rev, path)
     -- foreign key (dir) references directory (id),
     -- foreign key (rev) references revision (id)
 );

 comment on column directory_in_rev.dir is 'Directory identifier';
 comment on column directory_in_rev.rev is 'Revision identifier';
 comment on column directory_in_rev.path is 'Path to directory in revision';

 drop table if exists origin;
 create table origin
 (
     id   bigserial primary key,  -- id of the origin
     url  unix_path unique        -- url of the origin
 );

 comment on column origin.id is 'Origin internal identifier';
 comment on column origin.url is 'URL of the origin';

 drop table if exists revision;
 create table revision
 (
     id    sha1_git primary key,  -- id of the revision
     date  timestamptz not null,  -- timestamp of the revision
     org   bigint                 -- id of the preferred origin
 );

 comment on column revision.id is 'Revision identifier';
 comment on column revision.date is 'Revision timestamp';
 comment on column revision.org is 'Preferred origin for the revision';

 drop table if exists revision_before_rev;
 create table revision_before_rev
 (
     prev  sha1_git not null,  -- id of the source revision
     next  sha1_git not null,  -- id of the destination revision
     primary key (prev, next)
 );

 comment on column revision_before_rev.prev is 'Source revision identifier';
 comment on column revision_before_rev.next is 'Destination revision identifier';

 drop table if exists revision_in_org;
 create table revision_in_org
 (
     rev  sha1_git not null,  -- id of the revision pointed by the origin
     org  bigint not null,    -- id of the origin that points to the revision
     primary key (rev, org)
 );

 comment on column revision_in_org.rev is 'Revision identifier';
 comment on column revision_in_org.org is 'Origin identifier';
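
The sha1_git domain rejects anything that is not exactly 20 bytes, so identifiers must be converted from hex before insertion. A minimal sketch (to_sha1_git is an illustrative helper, not part of the module):

    def to_sha1_git(hex_id: str) -> bytes:
        value = bytes.fromhex(hex_id)
        if len(value) != 20:  # mirrors: check (length(value) = 20)
            raise ValueError(f'expected 20 bytes, got {len(value)}')
        return value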
diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py
index 599657c..cdc1f37 100644
--- a/swh/provenance/provenance.py
+++ b/swh/provenance/provenance.py
@@ -1,348 +1,395 @@
 import logging
 import os
 import psycopg2
-import time
+import psycopg2.extras
 import threading

 from .db_utils import (
     adapt_conn,
     execute_sql
 )

 from .model import (
     DirectoryEntry,
     FileEntry,
     TreeEntry,
     Tree
 )

 from .revision import (
     RevisionEntry,
     RevisionIterator,
     ArchiveRevisionIterator,
     FileRevisionIterator
 )

 from datetime import datetime
 from pathlib import PosixPath

 from swh.core.db import db_utils    # TODO: remove this in favour of local db_utils module
 from swh.model.hashutil import hash_to_hex
 from swh.storage.interface import StorageInterface


 def create_database(
     conn: psycopg2.extensions.connection,
     conninfo: str,
     name: str
 ):
     conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

     # Create new database dropping previous one if exists
     cursor = conn.cursor();
     cursor.execute(f'''DROP DATABASE IF EXISTS {name}''')
     cursor.execute(f'''CREATE DATABASE {name};''');
     conn.close()

     # Reconnect to server selecting newly created database to add tables
     conn = db_utils.connect_to_conninfo(os.path.join(conninfo, name))
     adapt_conn(conn)

     dir = os.path.dirname(os.path.realpath(__file__))
     execute_sql(conn, os.path.join(dir, 'db/provenance.sql'))


 def revision_add(
     cursor: psycopg2.extensions.cursor,
     storage: StorageInterface,
     revision: RevisionEntry,
     id : int
 ):
     logging.info(f'Thread {id} - Processing revision {hash_to_hex(revision.swhid)} (timestamp: {revision.timestamp})')
     # Process content starting from the revision's root directory
     directory = Tree(storage, revision.directory).root
     revision_process_directory(cursor, revision, directory, directory.name)
     # Add current revision to the compact DB
     cursor.execute('INSERT INTO revision VALUES (%s,%s, NULL)', (revision.swhid, revision.timestamp))


 def content_find_first(
     cursor: psycopg2.extensions.cursor,
     swhid: str
 ):
     logging.info(f'Retrieving first occurrence of content {hash_to_hex(swhid)}')
     cursor.execute('''SELECT blob, rev, date, path
                       FROM content_early_in_rev
                       JOIN revision ON revision.id=content_early_in_rev.rev
                       WHERE content_early_in_rev.blob=%s
                       ORDER BY date, rev, path ASC LIMIT 1''', (swhid,))
     return cursor.fetchone()
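
A hedged usage note on content_find_first: it returns cursor.fetchone(), which is None when the blob has not been indexed, so callers (including the find-first command in cli.py) need a guard along these lines (blob_id is an illustrative 20-byte sha1_git value):

    row = content_find_first(cursor, blob_id)
    if row is None:
        print('blob not found in the provenance database')
    else:
        blob, rev, date, path = row  # matches the SELECT column order above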


 def content_find_all(
     cursor: psycopg2.extensions.cursor,
     swhid: str
 ):
     logging.info(f'Retrieving all occurrences of content {hash_to_hex(swhid)}')
     cursor.execute('''(SELECT blob, rev, date, path
                       FROM content_early_in_rev
                       JOIN revision ON revision.id=content_early_in_rev.rev
                       WHERE content_early_in_rev.blob=%s)
                       UNION
                       (SELECT content_in_rev.blob, content_in_rev.rev, revision.date, content_in_rev.path
                        FROM (SELECT content_in_dir.blob, directory_in_rev.rev,
                                     CASE directory_in_rev.path
                                         WHEN '.' THEN content_in_dir.path
                                         ELSE (directory_in_rev.path || '/' || content_in_dir.path)::unix_path
                                     END AS path
                              FROM content_in_dir
                              JOIN directory_in_rev ON content_in_dir.dir=directory_in_rev.dir
                              WHERE content_in_dir.blob=%s) AS content_in_rev
                        JOIN revision ON revision.id=content_in_rev.rev)
                       ORDER BY date, rev, path''', (swhid, swhid))
     # POSTGRESQL EXPLAIN
     yield from cursor.fetchall()


 ################################################################################
 ################################################################################
 ################################################################################


 def normalize(path: PosixPath) -> PosixPath:
     spath = str(path)
     if spath.startswith('./'):
         return PosixPath(spath[2:])
     return path


 def content_get_early_timestamp(
     cursor: psycopg2.extensions.cursor,
+    cache: dict,
     blob: FileEntry
 ):
     logging.debug(f'Getting content {hash_to_hex(blob.swhid)} early timestamp')
-    start = time.perf_counter_ns()
-    cursor.execute('SELECT date FROM content WHERE id=%s', (blob.swhid,))
-    row = cursor.fetchone()
-    stop = time.perf_counter_ns()
-    logging.debug(f'  Time elapsed: {stop-start}ns')
-    return row[0] if row is not None else None
+    if blob.swhid in cache['content'].keys():
+        return cache['content'][blob.swhid]
+    else:
+        cursor.execute('SELECT date FROM content WHERE id=%s',
+                       (blob.swhid,))
+        row = cursor.fetchone()
+        return row[0] if row is not None else None


 def content_set_early_timestamp(
     cursor: psycopg2.extensions.cursor,
+    cache: dict,
     blob: FileEntry,
     timestamp: datetime
 ):
     logging.debug(f'EARLY occurrence of blob {hash_to_hex(blob.swhid)} (timestamp: {timestamp})')
-    start = time.perf_counter_ns()
-    cursor.execute('''INSERT INTO content VALUES (%s,%s)
-                      ON CONFLICT (id) DO UPDATE SET date=%s''',
-                   (blob.swhid, timestamp, timestamp))
-    stop = time.perf_counter_ns()
-    logging.debug(f'  Time elapsed: {stop-start}ns')
+    # cursor.execute('''INSERT INTO content VALUES (%s,%s)
+    #                   ON CONFLICT (id) DO UPDATE SET date=%s''',
+    #                (blob.swhid, timestamp, timestamp))
+    cache['content'][blob.swhid] = timestamp


 def content_add_to_directory(
     cursor: psycopg2.extensions.cursor,
+    cache: dict,
     directory: DirectoryEntry,
     blob: FileEntry,
     prefix: PosixPath
 ):
     logging.debug(f'NEW occurrence of content {hash_to_hex(blob.swhid)} in directory {hash_to_hex(directory.swhid)} (path: {prefix / blob.name})')
-    start = time.perf_counter_ns()
-    cursor.execute('INSERT INTO content_in_dir VALUES (%s,%s,%s)',
-                   (blob.swhid, directory.swhid, bytes(normalize(prefix / blob.name))))
-    stop = time.perf_counter_ns()
-    logging.debug(f'  Time elapsed: {stop-start}ns')
+    # cursor.execute('INSERT INTO content_in_dir VALUES (%s,%s,%s)',
+    #                (blob.swhid, directory.swhid, bytes(normalize(prefix / blob.name))))
+    cache['content_in_dir'].append(
+        (blob.swhid, directory.swhid, bytes(normalize(prefix / blob.name)))
+    )


 def content_add_to_revision(
     cursor: psycopg2.extensions.cursor,
+    cache: dict,
     revision: RevisionEntry,
     blob: FileEntry,
     prefix: PosixPath
 ):
     logging.debug(f'EARLY occurrence of blob {hash_to_hex(blob.swhid)} in revision {hash_to_hex(revision.swhid)} (path: {prefix / blob.name})')
-    start = time.perf_counter_ns()
-    cursor.execute('INSERT INTO content_early_in_rev VALUES (%s,%s,%s)',
-                   (blob.swhid, revision.swhid, bytes(normalize(prefix / blob.name))))
-    stop = time.perf_counter_ns()
-    logging.debug(f'  Time elapsed: {stop-start}ns')
+    # cursor.execute('INSERT INTO content_early_in_rev VALUES (%s,%s,%s)',
+    #                (blob.swhid, revision.swhid, bytes(normalize(prefix / blob.name))))
+    cache['content_early_in_rev'].append(
+        (blob.swhid, revision.swhid, bytes(normalize(prefix / blob.name)))
+    )
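
The CASE expression in content_find_all composes full paths for blobs reached through an inner-frontier directory. A Python mirror of that logic, assuming bytes values as stored in unix_path columns (compose_path is illustrative, not part of the module):

    def compose_path(dir_path: bytes, blob_path: bytes) -> bytes:
        # '.' marks a frontier directory that is itself the revision root
        if dir_path == b'.':
            return blob_path
        return dir_path + b'/' + blob_path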


 def directory_get_early_timestamp(
     cursor: psycopg2.extensions.cursor,
+    cache: dict,
     directory: DirectoryEntry
 ):
     logging.debug(f'Getting directory {hash_to_hex(directory.swhid)} early timestamp')
-    start = time.perf_counter_ns()
-    cursor.execute('SELECT date FROM directory WHERE id=%s', (directory.swhid,))
-    row = cursor.fetchone()
-    stop = time.perf_counter_ns()
-    logging.debug(f'  Time elapsed: {stop-start}ns')
-    return row[0] if row is not None else None
+    if directory.swhid in cache['directory'].keys():
+        return cache['directory'][directory.swhid]
+    else:
+        cursor.execute('SELECT date FROM directory WHERE id=%s',
+                       (directory.swhid,))
+        row = cursor.fetchone()
+        return row[0] if row is not None else None


 def directory_set_early_timestamp(
     cursor: psycopg2.extensions.cursor,
+    cache: dict,
     directory: DirectoryEntry,
     timestamp: datetime
 ):
     logging.debug(f'EARLY occurrence of directory {hash_to_hex(directory.swhid)} on the ISOCHRONE FRONTIER (timestamp: {timestamp})')
-    start = time.perf_counter_ns()
-    cursor.execute('''INSERT INTO directory VALUES (%s,%s)
-                      ON CONFLICT (id) DO UPDATE SET date=%s''',
-                   (directory.swhid, timestamp, timestamp))
-    stop = time.perf_counter_ns()
-    logging.debug(f'  Time elapsed: {stop-start}ns')
+    # cursor.execute('''INSERT INTO directory VALUES (%s,%s)
+    #                   ON CONFLICT (id) DO UPDATE SET date=%s''',
+    #                (directory.swhid, timestamp, timestamp))
+    cache['directory'][directory.swhid] = timestamp


 def directory_add_to_revision(
     cursor: psycopg2.extensions.cursor,
+    cache: dict,
     revision: RevisionEntry,
     directory: DirectoryEntry,
     path: PosixPath
 ):
     logging.debug(f'NEW occurrence of directory {hash_to_hex(directory.swhid)} on the ISOCHRONE FRONTIER of revision {hash_to_hex(revision.swhid)} (path: {path})')
-    start = time.perf_counter_ns()
-    cursor.execute('INSERT INTO directory_in_rev VALUES (%s,%s,%s)',
-                   (directory.swhid, revision.swhid, bytes(normalize(path))))
-    stop = time.perf_counter_ns()
-    logging.debug(f'  Time elapsed: {stop-start}ns')
+    # cursor.execute('INSERT INTO directory_in_rev VALUES (%s,%s,%s)',
+    #                (directory.swhid, revision.swhid, bytes(normalize(path))))
+    cache['directory_in_rev'].append(
+        (directory.swhid, revision.swhid, bytes(normalize(path)))
+    )


 def directory_process_content(
     cursor: psycopg2.extensions.cursor,
+    cache: dict,
     directory: DirectoryEntry,
     relative: DirectoryEntry,
     prefix: PosixPath
 ):
     stack = [(directory, relative, prefix)]

     while stack:
         directory, relative, prefix = stack.pop()
         for child in iter(directory):
             if isinstance(child, FileEntry):
                 # Add content to the relative directory with the computed prefix.
-                content_add_to_directory(cursor, relative, child, prefix)
+                content_add_to_directory(cursor, cache, relative, child, prefix)
             else:
                 # Recursively walk the child directory.
                 # directory_process_content(cursor, child, relative, prefix / child.name)
                 stack.append((child, relative, prefix / child.name))
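
The directory_* getters and setters above follow the same write-back caching pattern as their content_* counterparts: reads consult the in-memory dict first and fall back to the database, while writes touch only the dict until perform_insertions flushes it. A generic sketch of the read side (cached_get is illustrative, not part of the module):

    from typing import Callable, Dict, Optional, TypeVar

    K = TypeVar("K")
    V = TypeVar("V")

    def cached_get(cache: Dict[K, V], key: K,
                   db_fetch: Callable[[K], Optional[V]]) -> Optional[V]:
        if key in cache:
            return cache[key]
        # On a miss the database result is not memoized, mirroring the code above.
        return db_fetch(key)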


 def revision_process_directory(
     cursor: psycopg2.extensions.cursor,
     revision: RevisionEntry,
     directory: DirectoryEntry,
     path: PosixPath
 ):
     stack = [(revision, directory, path)]
+    cache = {
+        "content": dict(),
+        "content_early_in_rev": list(),
+        "content_in_dir": list(),
+        "directory": dict(),
+        "directory_in_rev": list()
+    }

-    # TODO: try to cache the info and psotpone inserts
     while stack:
         revision, directory, path = stack.pop()

-        timestamp = directory_get_early_timestamp(cursor, directory)
-        logging.debug(timestamp)
+        timestamp = directory_get_early_timestamp(cursor, cache, directory)

         if timestamp is None:
             # The directory has never been seen on the isochrone graph of a
             # revision. Its children should be checked.
-            timestamps = []
+            children = []
             for child in iter(directory):
-                logging.debug(f'child {child}')
                 if isinstance(child, FileEntry):
-                    timestamps.append(content_get_early_timestamp(cursor, child))
+                    children.append((child, content_get_early_timestamp(cursor, cache, child)))
                 else:
-                    timestamps.append(directory_get_early_timestamp(cursor, child))
-            logging.debug(timestamps)
+                    children.append((child, directory_get_early_timestamp(cursor, cache, child)))
+            timestamps = [x[1] for x in children]
+            # timestamps = list(zip(*children))[1]

             if timestamps != [] and None not in timestamps and max(timestamps) <= revision.timestamp:
                 # The directory belongs to the isochrone frontier of the current
                 # revision, and this is the first time it appears as such.
-                directory_set_early_timestamp(cursor, directory, max(timestamps))
-                directory_add_to_revision(cursor, revision, directory, path)
-                directory_process_content(cursor, directory=directory, relative=directory, prefix=PosixPath('.'))
+                directory_set_early_timestamp(cursor, cache, directory, max(timestamps))
+                directory_add_to_revision(cursor, cache, revision, directory, path)
+                directory_process_content(cursor, cache, directory=directory, relative=directory, prefix=PosixPath('.'))
             else:
                 # The directory is not on the isochrone frontier of the current
                 # revision. Its child nodes should be analyzed.
                 # revision_process_content(cursor, revision, directory, path)
                 ################################################################
-                for child in iter(directory):
+                for child, timestamp in children:
                     if isinstance(child, FileEntry):
-                        # TODO: store info from previous iterator to avoid quering twice!
-                        timestamp = content_get_early_timestamp(cursor, child)
                         if timestamp is None or revision.timestamp < timestamp:
-                            content_set_early_timestamp(cursor, child, revision.timestamp)
-                            content_add_to_revision(cursor, revision, child, path)
+                            content_set_early_timestamp(cursor, cache, child, revision.timestamp)
+                            content_add_to_revision(cursor, cache, revision, child, path)
                     else:
                         # revision_process_directory(cursor, revision, child, path / child.name)
                         stack.append((revision, child, path / child.name))
                 ################################################################

         elif revision.timestamp < timestamp:
             # The directory has already been seen on the isochrone frontier of a
             # revision, but current revision is earlier. Its children should be
             # updated.
             # revision_process_content(cursor, revision, directory, path)
             ####################################################################
             for child in iter(directory):
                 if isinstance(child, FileEntry):
-                    timestamp = content_get_early_timestamp(cursor, child)
+                    timestamp = content_get_early_timestamp(cursor, cache, child)
                     if timestamp is None or revision.timestamp < timestamp:
-                        content_set_early_timestamp(cursor, child, revision.timestamp)
-                        content_add_to_revision(cursor, revision, child, path)
+                        content_set_early_timestamp(cursor, cache, child, revision.timestamp)
+                        content_add_to_revision(cursor, cache, revision, child, path)
                 else:
                     # revision_process_directory(cursor, revision, child, path / child.name)
                     stack.append((revision, child, path / child.name))
             ####################################################################
-            directory_set_early_timestamp(cursor, directory, revision.timestamp)
+            directory_set_early_timestamp(cursor, cache, directory, revision.timestamp)

         else:
             # The directory has already been seen on the isochrone frontier of an
             # earlier revision. Just add it to the current revision.
-            directory_add_to_revision(cursor, revision, directory, path)
+            directory_add_to_revision(cursor, cache, revision, directory, path)
+
+    perform_insertions(cursor, cache)
+
+
+def perform_insertions(
+    cursor: psycopg2.extensions.cursor,
+    cache: dict
+):
+    psycopg2.extras.execute_values(
+        cursor,
+        '''INSERT INTO content(id, date) VALUES %s
+           ON CONFLICT (id) DO UPDATE SET date=excluded.date''',
+        cache['content'].items()
+    )
+
+    psycopg2.extras.execute_values(
+        cursor,
+        '''INSERT INTO content_early_in_rev VALUES %s''',
+        cache['content_early_in_rev']
+    )
+
+    psycopg2.extras.execute_values(
+        cursor,
+        '''INSERT INTO content_in_dir VALUES %s''',
+        cache['content_in_dir']
+    )
+
+    psycopg2.extras.execute_values(
+        cursor,
+        '''INSERT INTO directory(id, date) VALUES %s
+           ON CONFLICT (id) DO UPDATE SET date=excluded.date''',
+        cache['directory'].items()
+    )
+
+    psycopg2.extras.execute_values(
+        cursor,
+        '''INSERT INTO directory_in_rev VALUES %s''',
+        cache['directory_in_rev']
+    )
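
A hedged observation on perform_insertions: ON CONFLICT ... DO UPDATE SET date=excluded.date keeps whichever value arrives last, while the conflict notes in RevisionWorker below state that the earliest timestamp should win for content and directory. If that invariant matters under concurrent workers, the upsert could enforce it explicitly, e.g.:

    '''INSERT INTO content(id, date) VALUES %s
       ON CONFLICT (id) DO UPDATE SET date=LEAST(content.date, excluded.date)'''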


 ################################################################################
 ################################################################################
 ################################################################################


 class RevisionWorker(threading.Thread):
     def __init__(
         self,
-        id : int,
-        conninfo : str,
-        storage : StorageInterface,
-        revisions : RevisionIterator
+        id: int,
+        conninfo: str,
+        storage: StorageInterface,
+        revisions: RevisionIterator
     ):
         super().__init__()
         self.id = id
         self.conninfo = conninfo
         self.revisions = revisions
         self.storage = storage

     def run(self):
         conn = db_utils.connect_to_conninfo(self.conninfo)
         adapt_conn(conn)
         with conn.cursor() as cursor:
             while True:
                 processed = False
                 revision = self.revisions.next()
                 if revision is None: break

                 while not processed:
                     try:
                         revision_add(cursor, self.storage, revision, self.id)
                         conn.commit()
                         processed = True
                     except psycopg2.DatabaseError:
                         logging.warning(f'Thread {self.id} - Failed to process revision {hash_to_hex(revision.swhid)} (timestamp: {revision.timestamp})')
                         conn.rollback()
                         # TODO: maybe serialize and auto-merge transactions.
                         # The only conflicts are on:
                         #   - content: we keep the earliest timestamp
                         #   - directory: we keep the earliest timestamp
                         #   - content_in_dir: there should be just duplicated entries.
                     except Exception as error:
                         logging.warning(f'Exception: {error}')
         conn.close()
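
For reference, a minimal end-to-end sketch of driving the workers directly, mirroring the iter-revisions command above (the file name and thread count are illustrative):

    from swh.storage import get_storage

    conninfo = "postgresql://postgres:postgres@localhost/provenance"
    storage = get_storage(cls="remote", url="http://uffizi.internal.softwareheritage.org:5002")
    revisions = FileRevisionIterator("revisions.lst", limit=10)
    workers = [RevisionWorker(i, conninfo, storage, revisions) for i in range(4)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()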