diff --git a/swh/provenance/__init__.py b/swh/provenance/__init__.py index e69de29..c227040 100644 --- a/swh/provenance/__init__.py +++ b/swh/provenance/__init__.py @@ -0,0 +1,24 @@ +from .archive import ArchiveInterface +from .provenance import ProvenanceInterface +from .storage.archive import ArchiveStorage +from .postgresql.archive import ArchivePostgreSQL +from .postgresql.db_utils import connect +from .postgresql.provenance import ProvenancePostgreSQL + + +def get_archive(cls: str, **kwargs) -> ArchiveInterface: + if cls == "api": + return ArchiveStorage(**kwargs["storage"]) + elif cls == "ps": + conn = connect(kwargs["db"]) + return ArchivePostgreSQL(conn) + else: + raise NotImplementedError + + +def get_provenance(cls: str, **kwargs) -> ProvenanceInterface: + if cls == "ps": + conn = connect(kwargs["db"]) + return ProvenancePostgreSQL(conn) + else: + raise NotImplementedError diff --git a/swh/provenance/archive.py b/swh/provenance/archive.py index bf7693f..bde9d5e 100644 --- a/swh/provenance/archive.py +++ b/swh/provenance/archive.py @@ -1,113 +1,27 @@ -import psycopg2 - -from .db_utils import connect - from typing import List -from swh.storage import get_storage - class ArchiveInterface: - def __init__(self): + def __init__(self, **kwargs): raise NotImplementedError def directory_ls(self, id: bytes): raise NotImplementedError def iter_origins(self): raise NotImplementedError def iter_origin_visits(self, origin: str): raise NotImplementedError def iter_origin_visit_statuses(self, origin: str, visit: int): raise NotImplementedError def release_get(self, ids: List[bytes]): raise NotImplementedError def revision_get(self, ids: List[bytes]): raise NotImplementedError def snapshot_get_all_branches(self, snapshot: bytes): raise NotImplementedError - - -class ArchiveStorage(ArchiveInterface): - def __init__(self, cls: str, **kwargs): - self.storage = get_storage(cls, **kwargs) - - def directory_ls(self, id: bytes): - # TODO: filter unused fields - yield from self.storage.directory_ls(id) - - def iter_origins(self): - from swh.storage.algos.origin import iter_origins - yield from iter_origins(self.storage) - - def iter_origin_visits(self, origin: str): - from swh.storage.algos.origin import iter_origin_visits - # TODO: filter unused fields - yield from iter_origin_visits(self.storage, origin) - - def iter_origin_visit_statuses(self, origin: str, visit: int): - from swh.storage.algos.origin import iter_origin_visit_statuses - # TODO: filter unused fields - yield from iter_origin_visit_statuses(self.storage, origin, visit) - - def release_get(self, ids: List[bytes]): - # TODO: filter unused fields - yield from self.storage.release_get(ids) - - def revision_get(self, ids: List[bytes]): - # TODO: filter unused fields - yield from self.storage.revision_get(ids) - - def snapshot_get_all_branches(self, snapshot: bytes): - from swh.storage.algos.snapshot import snapshot_get_all_branches - # TODO: filter unused fields - return snapshot_get_all_branches(self.storage, snapshot) - - -class Archive(ArchiveInterface): - def __init__(self, conn: psycopg2.extensions.connection): - self.conn = conn - self.cursor = conn.cursor() - - def directory_ls(self, id: bytes): - self.cursor.execute('''WITH - dir AS (SELECT id AS dir_id, dir_entries, file_entries, rev_entries - FROM directory WHERE id=%s), - ls_d AS (SELECT dir_id, unnest(dir_entries) AS entry_id from dir), - ls_f AS (SELECT dir_id, unnest(file_entries) AS entry_id from dir), - ls_r AS (SELECT dir_id, unnest(rev_entries) AS entry_id from dir) - (SELECT 
'dir'::directory_entry_type AS type, e.target, e.name, NULL::sha1_git - FROM ls_d - LEFT JOIN directory_entry_dir e ON ls_d.entry_id=e.id) - UNION - (WITH known_contents AS - (SELECT 'file'::directory_entry_type AS type, e.target, e.name, c.sha1_git - FROM ls_f - LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id - INNER JOIN content c ON e.target=c.sha1_git) - SELECT * FROM known_contents - UNION - (SELECT 'file'::directory_entry_type AS type, e.target, e.name, c.sha1_git - FROM ls_f - LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id - LEFT JOIN skipped_content c ON e.target=c.sha1_git - WHERE NOT EXISTS (SELECT 1 FROM known_contents WHERE known_contents.sha1_git=e.target))) - ORDER BY name - ''', (id,)) - for row in self.cursor.fetchall(): - yield {'type': row[0], 'target': row[1], 'name': row[2]} - - -def get_archive(cls: str, **kwargs) -> ArchiveInterface: - if cls == "api": - return ArchiveStorage(**kwargs["storage"]) - elif cls == "ps": - conn = connect(kwargs["db"]) - return Archive(conn) - else: - raise NotImplementedError diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py index 51cc895..1b36fa7 100644 --- a/swh/provenance/cli.py +++ b/swh/provenance/cli.py @@ -1,193 +1,195 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control import os from typing import Any, Dict, Optional import click import yaml from swh.core import config from swh.core.cli import CONTEXT_SETTINGS from swh.core.cli import swh as swh_cli_group from swh.model.hashutil import (hash_to_bytes, hash_to_hex) # All generic config code should reside in swh.core.config CONFIG_ENVVAR = "SWH_CONFIG_FILE" DEFAULT_CONFIG_PATH = os.path.join(click.get_app_dir("swh"), "global.yml") DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, DEFAULT_CONFIG_PATH) DEFAULT_CONFIG: Dict[str, Any] = { "archive": { "cls": "api", "storage": { "cls": "remote", "url": "http://uffizi.internal.softwareheritage.org:5002" } # "cls": "ps", # "db": { # "host": "db.internal.softwareheritage.org", # "database": "softwareheritage", # "user": "guest" # } }, - "db": { - "host": "localhost", - "database": "new_test", - "user": "postgres", - "password": "postgres" + "provenance": { + "cls": "ps", + "db": { + "host": "localhost", + "database": "new_test", + "user": "postgres", + "password": "postgres" + } } } CONFIG_FILE_HELP = f"""Configuration file: \b The CLI option or the environment variable will fail if invalid. CLI option is checked first. Then, environment variable {CONFIG_ENVVAR} is checked. Then, if cannot load the default path, a set of default values are used. Default config path is {DEFAULT_CONFIG_PATH}. Default config values are: \b {yaml.dump(DEFAULT_CONFIG)}""" PROVENANCE_HELP = f"""Software Heritage Scanner tools. 
{CONFIG_FILE_HELP}""" @swh_cli_group.group( name="provenance", context_settings=CONTEXT_SETTINGS, help=PROVENANCE_HELP, ) @click.option( "-C", "--config-file", default=None, type=click.Path(exists=False, dir_okay=False, path_type=str), help="""YAML configuration file""", ) @click.option("--profile", default=None) @click.pass_context def cli(ctx, config_file: Optional[str], profile: str): if config_file is None and config.config_exists(DEFAULT_PATH): config_file = DEFAULT_PATH if config_file is None: conf = DEFAULT_CONFIG else: # read_raw_config do not fail on ENOENT if not config.config_exists(config_file): raise FileNotFoundError(config_file) conf = config.read_raw_config(config.config_basepath(config_file)) conf = config.merge_configs(DEFAULT_CONFIG, conf) ctx.ensure_object(dict) ctx.obj["config"] = conf # ctx.obj["profile"] = profile if profile: import cProfile import atexit print("Profiling...") pr = cProfile.Profile() pr.enable() def exit(): pr.disable() pr.dump_stats(profile) atexit.register(exit) @cli.command(name="create") @click.option("--name", default=None) @click.pass_context def create(ctx, name): """Create new provenance database.""" - from .db_utils import connect - from .provenance import create_database + from .postgresql.db_utils import connect + from .postgresql.provenance import create_database # Connect to server without selecting a database - conninfo = ctx.obj["config"]["db"] + conninfo = ctx.obj["config"]["provenance"]["db"] database = conninfo.pop('database', None) conn = connect(conninfo) if name is None: name = database create_database(conn, conninfo, name) @cli.command(name="iter-revisions") @click.argument("filename") @click.option('-l', '--limit', type=int) -@click.option('-t', '--threads', type=int, default=1) @click.pass_context -def iter_revisions(ctx, filename, limit, threads): +def iter_revisions(ctx, filename, limit): """Iterate over provided list of revisions and add them to the provenance database.""" - from .archive import get_archive - from .provenance import get_provenance, revision_add + from . import get_archive, get_provenance from .revision import FileRevisionIterator + from .provenance import revision_add archive = get_archive(**ctx.obj["config"]["archive"]) - provenance = get_provenance(ctx.obj["config"]["db"]) + provenance = get_provenance(**ctx.obj["config"]["provenance"]) revisions = FileRevisionIterator(filename, archive, limit=limit) while True: revision = revisions.next() if revision is None: break revision_add(provenance, archive, revision) @cli.command(name="iter-origins") @click.argument("filename") @click.option('-l', '--limit', type=int) #@click.option('-t', '--threads', type=int, default=1) @click.pass_context #def iter_revisions(ctx, filename, limit, threads): def iter_origins(ctx, filename, limit): """Iterate over provided list of revisions and add them to the provenance database.""" - from .archive import get_archive + from . 
import get_archive, get_provenance from .origin import FileOriginIterator - from .provenance import get_provenance, origin_add + from .provenance import origin_add - provenance = get_provenance(ctx.obj["config"]["db"]) archive = get_archive(**ctx.obj["config"]["archive"]) + provenance = get_provenance(**ctx.obj["config"]["provenance"]) for origin in FileOriginIterator(filename, archive, limit=limit): origin_add(provenance, origin) @cli.command(name="find-first") @click.argument("swhid") @click.pass_context def find_first(ctx, swhid): """Find first occurrence of the requested blob.""" from .provenance import get_provenance - provenance = get_provenance(ctx.obj["config"]["db"]) + provenance = get_provenance(**ctx.obj["config"]["provenance"]) # TODO: return a dictionary with proper keys for each field row = provenance.content_find_first(hash_to_bytes(swhid)) if row is not None: print(f'{hash_to_hex(row[0])}, {hash_to_hex(row[1])}, {row[2]}, {os.fsdecode(row[3])}') else: print(f'Cannot find a content with the id {swhid}') @cli.command(name="find-all") @click.argument("swhid") @click.pass_context def find_all(ctx, swhid): """Find all occurrences of the requested blob.""" - from .provenance import get_provenance + from swh.provenance import get_provenance - provenance = get_provenance(ctx.obj["config"]["db"]) + provenance = get_provenance(**ctx.obj["config"]["provenance"]) # TODO: return a dictionary with proper keys for each field for row in provenance.content_find_all(hash_to_bytes(swhid)): print(f'{hash_to_hex(row[0])}, {hash_to_hex(row[1])}, {row[2]}, {os.fsdecode(row[3])}') diff --git a/swh/provenance/origin.py b/swh/provenance/origin.py index 367e27a..3bad168 100644 --- a/swh/provenance/origin.py +++ b/swh/provenance/origin.py @@ -1,101 +1,98 @@ from .archive import ArchiveInterface from .revision import RevisionEntry from swh.model.model import Origin, ObjectType, TargetType -# from swh.storage.algos.origin import iter_origin_visits, iter_origin_visit_statuses -# from swh.storage.algos.snapshot import snapshot_get_all_branches -# from swh.storage.interface import StorageInterface class OriginEntry: def __init__(self, url, revisions, id=None): self.id = id self.url = url self.revisions = revisions ################################################################################ ################################################################################ class OriginIterator: """Iterator interface.""" def __iter__(self): pass def __next__(self): pass class FileOriginIterator(OriginIterator): """Iterator over origins present in the given CSV file.""" def __init__(self, filename: str, archive: ArchiveInterface, limit: int=None): self.file = open(filename) self.limit = limit # self.mutex = threading.Lock() self.archive = archive def __iter__(self): yield from iterate_statuses( [Origin(url.strip()) for url in self.file], self.archive, self.limit ) class ArchiveOriginIterator: """Iterator over origins present in the given storage.""" def __init__(self, archive: ArchiveInterface, limit: int=None): self.limit = limit # self.mutex = threading.Lock() self.archive = archive def __iter__(self): yield from iterate_statuses( self.archive.iter_origins(), self.archive, self.limit ) def iterate_statuses(origins, archive: ArchiveInterface, limit: int=None): idx = 0 for origin in origins: for visit in archive.iter_origin_visits(origin.url): for status in archive.iter_origin_visit_statuses(origin.url, visit.visit): # TODO: may filter only those whose status is 'full'?? 
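The TODO above asks whether only visits whose status is "full" should be kept. A minimal sketch of that filter, assuming (as in swh.model) that the status objects expose `status` and `snapshot` attributes; this is an illustration, not part of the patch:

    def iter_full_statuses(archive, origin_url):
        # Keep only completed visits that actually point at a snapshot.
        for visit in archive.iter_origin_visits(origin_url):
            for status in archive.iter_origin_visit_statuses(origin_url, visit.visit):
                if status.status == "full" and status.snapshot is not None:
                    yield status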
targets = [] releases = [] snapshot = archive.snapshot_get_all_branches(status.snapshot) if snapshot is not None: for branch in snapshot.branches: if snapshot.branches[branch].target_type == TargetType.REVISION: targets.append(snapshot.branches[branch].target) elif snapshot.branches[branch].target_type == TargetType.RELEASE: releases.append(snapshot.branches[branch].target) # This is done to keep the query in release_get small, hence avoiding a timeout. limit = 100 for i in range(0, len(releases), limit): for release in archive.release_get(releases[i:i+limit]): if revision is not None: if release.target_type == ObjectType.REVISION: targets.append(release.target) # This is done to keep the query in revision_get small, hence avoiding a timeout. revisions = [] limit = 100 for i in range(0, len(targets), limit): for revision in archive.revision_get(targets[i:i+limit]): if revision is not None: parents = list(map(lambda id: RevisionEntry(archive, id), revision.parents)) revisions.append(RevisionEntry(archive, revision.id, parents=parents)) yield OriginEntry(status.origin, revisions) idx = idx + 1 if idx == limit: return diff --git a/swh/provenance/__init__.py b/swh/provenance/postgresql/__init__.py similarity index 100% copy from swh/provenance/__init__.py copy to swh/provenance/postgresql/__init__.py diff --git a/swh/provenance/postgresql/archive.py b/swh/provenance/postgresql/archive.py new file mode 100644 index 0000000..9a320db --- /dev/null +++ b/swh/provenance/postgresql/archive.py @@ -0,0 +1,39 @@ +import psycopg2 + +from ..archive import ArchiveInterface + +from typing import List + + +class ArchivePostgreSQL(ArchiveInterface): + def __init__(self, conn: psycopg2.extensions.connection): + self.conn = conn + self.cursor = conn.cursor() + + def directory_ls(self, id: bytes): + self.cursor.execute('''WITH + dir AS (SELECT id AS dir_id, dir_entries, file_entries, rev_entries + FROM directory WHERE id=%s), + ls_d AS (SELECT dir_id, unnest(dir_entries) AS entry_id from dir), + ls_f AS (SELECT dir_id, unnest(file_entries) AS entry_id from dir), + ls_r AS (SELECT dir_id, unnest(rev_entries) AS entry_id from dir) + (SELECT 'dir'::directory_entry_type AS type, e.target, e.name, NULL::sha1_git + FROM ls_d + LEFT JOIN directory_entry_dir e ON ls_d.entry_id=e.id) + UNION + (WITH known_contents AS + (SELECT 'file'::directory_entry_type AS type, e.target, e.name, c.sha1_git + FROM ls_f + LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id + INNER JOIN content c ON e.target=c.sha1_git) + SELECT * FROM known_contents + UNION + (SELECT 'file'::directory_entry_type AS type, e.target, e.name, c.sha1_git + FROM ls_f + LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id + LEFT JOIN skipped_content c ON e.target=c.sha1_git + WHERE NOT EXISTS (SELECT 1 FROM known_contents WHERE known_contents.sha1_git=e.target))) + ORDER BY name + ''', (id,)) + for row in self.cursor.fetchall(): + yield {'type': row[0], 'target': row[1], 'name': row[2]} diff --git a/swh/provenance/db_utils.py b/swh/provenance/postgresql/db_utils.py similarity index 100% rename from swh/provenance/db_utils.py rename to swh/provenance/postgresql/db_utils.py diff --git a/swh/provenance/provenance.py b/swh/provenance/postgresql/provenance.py similarity index 63% copy from swh/provenance/provenance.py copy to swh/provenance/postgresql/provenance.py index c8ea228..82a1533 100644 --- a/swh/provenance/provenance.py +++ b/swh/provenance/postgresql/provenance.py @@ -1,639 +1,424 @@ import itertools import logging import os import psycopg2 import 
psycopg2.extras -from .archive import ArchiveInterface +from ..model import DirectoryEntry, FileEntry +from ..origin import OriginEntry from .db_utils import connect, execute_sql -from .model import DirectoryEntry, FileEntry -from .origin import OriginEntry -from .revision import RevisionEntry +from ..provenance import ProvenanceInterface +from ..revision import RevisionEntry from datetime import datetime from pathlib import PosixPath from typing import Dict, List from swh.model.hashutil import hash_to_hex def normalize(path: PosixPath) -> PosixPath: spath = str(path) if spath.startswith('./'): return PosixPath(spath[2:]) return path def create_database( conn: psycopg2.extensions.connection, conninfo: dict, name: str ): conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) # Create new database dropping previous one if exists cursor = conn.cursor(); cursor.execute(f'''DROP DATABASE IF EXISTS {name}''') cursor.execute(f'''CREATE DATABASE {name}'''); conn.close() # Reconnect to server selecting newly created database to add tables conninfo['database'] = name conn = connect(conninfo) sqldir = os.path.dirname(os.path.realpath(__file__)) - execute_sql(conn, os.path.join(sqldir, 'db/provenance.sql')) + execute_sql(conn, os.path.join(sqldir, 'provenance.sql')) ################################################################################ ################################################################################ ################################################################################ -class ProvenanceInterface: +class ProvenancePostgreSQL(ProvenanceInterface): # TODO: turn this into a real interface and move PostgreSQL implementation # to a separate file def __init__(self, conn: psycopg2.extensions.connection): # TODO: consider addind a mutex for thread safety self.conn = conn self.cursor = self.conn.cursor() self.insert_cache = None self.select_cache = None self.clear_caches() def clear_caches(self): self.insert_cache = { "content": dict(), "content_early_in_rev": list(), "content_in_dir": list(), "directory": dict(), "directory_in_rev": list(), "revision": dict() } self.select_cache = { "content": dict(), "directory": dict(), "revision": dict() } def commit(self): result = False try: self.insert_all() self.conn.commit() result = True except psycopg2.DatabaseError: # Database error occurred, rollback all changes self.conn.rollback() # TODO: maybe serialize and auto-merge transations. # The only conflicts are on: # - content: we keep the earliest date # - directory: we keep the earliest date # - content_in_dir: there should be just duplicated entries. 
except Exception as error: # Unexpected error occurred, rollback all changes and log message logging.warning(f'Unexpected error: {error}') self.conn.rollback() finally: self.clear_caches() return result def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: PosixPath ): # logging.debug(f'NEW occurrence of content {hash_to_hex(blob.id)} in directory {hash_to_hex(directory.id)} (path: {prefix / blob.name})') # self.cursor.execute('''INSERT INTO content_in_dir VALUES (%s,%s,%s)''', # (blob.id, directory.id, bytes(normalize(prefix / blob.name)))) self.insert_cache['content_in_dir'].append( (blob.id, directory.id, bytes(normalize(prefix / blob.name))) ) def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: PosixPath ): # logging.debug(f'EARLY occurrence of blob {hash_to_hex(blob.id)} in revision {hash_to_hex(revision.id)} (path: {prefix / blob.name})') # self.cursor.execute('''INSERT INTO content_early_in_rev VALUES (%s,%s,%s)''', # (blob.id, revision.id, bytes(normalize(prefix / blob.name)))) self.insert_cache['content_early_in_rev'].append( (blob.id, revision.id, bytes(normalize(prefix / blob.name))) ) def content_find_first(self, blobid: str): logging.info(f'Retrieving first occurrence of content {hash_to_hex(blobid)}') self.cursor.execute('''SELECT blob, rev, date, path FROM content_early_in_rev JOIN revision ON revision.id=content_early_in_rev.rev WHERE content_early_in_rev.blob=%s ORDER BY date, rev, path ASC LIMIT 1''', (blobid,)) return self.cursor.fetchone() def content_find_all(self, blobid: str): logging.info(f'Retrieving all occurrences of content {hash_to_hex(blobid)}') self.cursor.execute('''(SELECT blob, rev, date, path FROM content_early_in_rev JOIN revision ON revision.id=content_early_in_rev.rev WHERE content_early_in_rev.blob=%s) UNION (SELECT content_in_rev.blob, content_in_rev.rev, revision.date, content_in_rev.path FROM (SELECT content_in_dir.blob, directory_in_rev.rev, CASE directory_in_rev.path WHEN '.' THEN content_in_dir.path ELSE (directory_in_rev.path || '/' || content_in_dir.path)::unix_path END AS path FROM content_in_dir JOIN directory_in_rev ON content_in_dir.dir=directory_in_rev.dir WHERE content_in_dir.blob=%s) AS content_in_rev JOIN revision ON revision.id=content_in_rev.rev) ORDER BY date, rev, path''', (blobid, blobid)) # POSTGRESQL EXPLAIN yield from self.cursor.fetchall() def content_get_early_date(self, blob: FileEntry) -> datetime: logging.debug(f'Getting content {hash_to_hex(blob.id)} early date') # First check if the date is being modified by current transection. date = self.insert_cache['content'].get(blob.id, None) if date is None: # If not, check whether it's been query before date = self.select_cache['content'].get(blob.id, None) if date is None: # Otherwise, query the database and cache the value self.cursor.execute('''SELECT date FROM content WHERE id=%s''', (blob.id,)) row = self.cursor.fetchone() date = row[0] if row is not None else None self.select_cache['content'][blob.id] = date return date def content_get_early_dates(self, blobs: List[FileEntry]) -> Dict[bytes, datetime]: dates = {} pending = [] for blob in blobs: # First check if the date is being modified by current transection. 
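All of the *_get_early_date(s) methods in this backend use the same lookup order: pending writes in insert_cache first, then values already fetched into select_cache, and only then the database. A condensed, hypothetical helper showing the pattern in isolation (not part of the patch):

    def _cached_date(self, table: str, id: bytes):
        # 1. A date set earlier in the current, uncommitted transaction wins.
        date = self.insert_cache[table].get(id)
        if date is None:
            # 2. Otherwise reuse a value previously read from the database.
            date = self.select_cache[table].get(id)
            if date is None:
                # 3. Finally query the database and remember the answer.
                self.cursor.execute(f'SELECT date FROM {table} WHERE id=%s', (id,))
                row = self.cursor.fetchone()
                date = row[0] if row is not None else None
                self.select_cache[table][id] = date
        return date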
date = self.insert_cache['content'].get(blob.id, None) if date is not None: dates[blob.id] = date else: # If not, check whether it's been query before date = self.select_cache['content'].get(blob.id, None) if date is not None: dates[blob.id] = date else: pending.append(blob.id) if pending: # Otherwise, query the database and cache the values values = ', '.join(itertools.repeat('%s', len(pending))) self.cursor.execute(f'''SELECT id, date FROM content WHERE id IN ({values})''', tuple(pending)) for row in self.cursor.fetchall(): dates[row[0]] = row[1] self.select_cache['content'][row[0]] = row[1] return dates def content_set_early_date(self, blob: FileEntry, date: datetime): # logging.debug(f'EARLY occurrence of blob {hash_to_hex(blob.id)} (timestamp: {date})') # self.cursor.execute('''INSERT INTO content VALUES (%s,%s) # ON CONFLICT (id) DO UPDATE SET date=%s''', # (blob.id, date, date)) self.insert_cache['content'][blob.id] = date def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: PosixPath ): # logging.debug(f'NEW occurrence of directory {hash_to_hex(directory.id)} on the ISOCHRONE FRONTIER of revision {hash_to_hex(revision.id)} (path: {path})') # self.cursor.execute('''INSERT INTO directory_in_rev VALUES (%s,%s,%s)''', # (directory.id, revision.id, bytes(normalize(path)))) self.insert_cache['directory_in_rev'].append( (directory.id, revision.id, bytes(normalize(path))) ) def directory_get_date_in_isochrone_frontier(self, directory: DirectoryEntry) -> datetime: # logging.debug(f'Getting directory {hash_to_hex(directory.id)} early date') # First check if the date is being modified by current transection. date = self.insert_cache['directory'].get(directory.id, None) if date is None: # If not, check whether it's been query before date = self.select_cache['directory'].get(directory.id, None) if date is None: # Otherwise, query the database and cache the value self.cursor.execute('''SELECT date FROM directory WHERE id=%s''', (directory.id,)) row = self.cursor.fetchone() date = row[0] if row is not None else None self.select_cache['directory'][directory.id] = date return date def directory_get_early_dates(self, dirs: List[DirectoryEntry]) -> Dict[bytes, datetime]: dates = {} pending = [] for dir in dirs: # First check if the date is being modified by current transection. 
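The batched lookup above (and the analogous one for directories that follows) assembles the IN placeholder list by hand with itertools.repeat. A hypothetical alternative leans on psycopg2 adapting a Python list to a PostgreSQL array, so the query text stays fixed; shown only as a sketch:

    # pending is a list of sha1_git identifiers (bytes)
    self.cursor.execute('SELECT id, date FROM content WHERE id = ANY(%s)', (pending,))
    for blob_id, blob_date in self.cursor.fetchall():
        dates[blob_id] = blob_date
        self.select_cache['content'][blob_id] = blob_date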
date = self.insert_cache['directory'].get(dir.id, None) if date is not None: dates[dir.id] = date else: # If not, check whether it's been query before date = self.select_cache['directory'].get(dir.id, None) if date is not None: dates[dir.id] = date else: pending.append(dir.id) if pending: # Otherwise, query the database and cache the values values = ', '.join(itertools.repeat('%s', len(pending))) self.cursor.execute(f'''SELECT id, date FROM directory WHERE id IN ({values})''', tuple(pending)) for row in self.cursor.fetchall(): dates[row[0]] = row[1] self.select_cache['directory'][row[0]] = row[1] return dates def directory_set_date_in_isochrone_frontier(self, directory: DirectoryEntry, date: datetime): # logging.debug(f'EARLY occurrence of directory {hash_to_hex(directory.id)} on the ISOCHRONE FRONTIER (timestamp: {date})') # self.cursor.execute('''INSERT INTO directory VALUES (%s,%s) # ON CONFLICT (id) DO UPDATE SET date=%s''', # (directory.id, date, date)) self.insert_cache['directory'][directory.id] = date def insert_all(self): + # self.conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) + # Performe insertions with cached information psycopg2.extras.execute_values( self.cursor, '''INSERT INTO content(id, date) VALUES %s ON CONFLICT (id) DO UPDATE SET date=EXCLUDED.date''', # TODO: keep earliest date on conflict # '''INSERT INTO content(id, date) VALUES %s # ON CONFLICT (id) DO # UPDATE SET date = CASE # WHEN EXCLUDED.date < content.date # THEN EXCLUDED.date # ELSE content.date # END''', self.insert_cache['content'].items() ) psycopg2.extras.execute_values( self.cursor, '''INSERT INTO content_early_in_rev VALUES %s ON CONFLICT DO NOTHING''', self.insert_cache['content_early_in_rev'] ) psycopg2.extras.execute_values( self.cursor, '''INSERT INTO content_in_dir VALUES %s ON CONFLICT DO NOTHING''', self.insert_cache['content_in_dir'] ) psycopg2.extras.execute_values( self.cursor, '''INSERT INTO directory(id, date) VALUES %s ON CONFLICT (id) DO UPDATE SET date=EXCLUDED.date''', # TODO: keep earliest date on conflict # '''INSERT INTO directory(id, date) VALUES %s # ON CONFLICT (id) DO # UPDATE SET date = CASE # WHEN EXCLUDED.date < directory.date # THEN EXCLUDED.date # ELSE directory.date # END''', self.insert_cache['directory'].items() ) psycopg2.extras.execute_values( self.cursor, '''INSERT INTO directory_in_rev VALUES %s ON CONFLICT DO NOTHING''', self.insert_cache['directory_in_rev'] ) psycopg2.extras.execute_values( self.cursor, '''INSERT INTO revision(id, date) VALUES %s ON CONFLICT (id) DO UPDATE SET date=EXCLUDED.date''', # TODO: keep earliest date on conflict # '''INSERT INTO revision(id, date) VALUES %s # ON CONFLICT (id) DO # UPDATE SET date = CASE # WHEN EXCLUDED.date < revision.date # THEN EXCLUDED.date # ELSE revision.date # END''', self.insert_cache['revision'].items() ) + # self.conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_DEFAULT) + def origin_get_id(self, origin: OriginEntry) -> int: if origin.id is None: # Check if current origin is already known and retrieve its internal id. self.cursor.execute('''SELECT id FROM origin WHERE url=%s''', (origin.url,)) row = self.cursor.fetchone() if row is None: # If the origin is seen for the first time, current revision is # the prefered one. 
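insert_all above overwrites dates with EXCLUDED.date and leaves "keep earliest date on conflict" as a TODO (the commented-out CASE expressions). PostgreSQL's LEAST(), which ignores NULLs, expresses the same intent more compactly; a sketch of the idea, not what the patch currently does:

    psycopg2.extras.execute_values(
        self.cursor,
        '''INSERT INTO content(id, date) VALUES %s
           ON CONFLICT (id) DO UPDATE
           SET date = LEAST(EXCLUDED.date, content.date)''',
        self.insert_cache['content'].items()
    )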
self.cursor.execute('''INSERT INTO origin (url) VALUES (%s) RETURNING id''', (origin.url,)) return self.cursor.fetchone()[0] else: return row[0] else: return origin.id def revision_add(self, revision: RevisionEntry): # Add current revision to the compact DB self.insert_cache['revision'][revision.id] = revision.date def revision_add_before_revision(self, relative: RevisionEntry, revision: RevisionEntry): self.cursor.execute('''INSERT INTO revision_before_rev VALUES (%s,%s)''', (revision.id, relative.id)) def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry): self.cursor.execute('''INSERT INTO revision_in_org VALUES (%s,%s) ON CONFLICT DO NOTHING''', (revision.id, origin.id)) def revision_get_early_date(self, revision: RevisionEntry) -> datetime: # logging.debug(f'Getting revision {hash_to_hex(revision.id)} early date') # First check if the date is being modified by current transection. date = self.insert_cache['revision'].get(revision.id, None) if date is None: # If not, check whether it's been query before date = self.select_cache['revision'].get(revision.id, None) if date is None: # Otherwise, query the database and cache the value self.cursor.execute('''SELECT date FROM revision WHERE id=%s''', (revision.id,)) row = self.cursor.fetchone() date = row[0] if row is not None else None self.select_cache['revision'][revision.id] = date return date def revision_get_prefered_origin(self, revision: RevisionEntry) -> int: self.cursor.execute('''SELECT COALESCE(org,0) FROM revision WHERE id=%s''', (revision.id,)) row = self.cursor.fetchone() # None means revision is not in database # 0 means revision has no prefered origin return row[0] if row is not None and row[0] != 0 else None def revision_in_history(self, revision: RevisionEntry) -> bool: self.cursor.execute('''SELECT 1 FROM revision_before_rev WHERE prev=%s''', (revision.id,)) return self.cursor.fetchone() is not None def revision_set_prefered_origin(self, origin: OriginEntry, revision: RevisionEntry): self.cursor.execute('''UPDATE revision SET org=%s WHERE id=%s''', (origin.id, revision.id)) def revision_visited(self, revision: RevisionEntry) -> bool: self.cursor.execute('''SELECT 1 FROM revision_in_org WHERE rev=%s''', (revision.id,)) return self.cursor.fetchone() is not None - - -################################################################################ -################################################################################ -################################################################################ - -def directory_process_content( - provenance: ProvenanceInterface, - directory: DirectoryEntry, - relative: DirectoryEntry, - prefix: PosixPath -): - stack = [(directory, prefix)] - - while stack: - dir, path = stack.pop() - - for child in iter(dir): - if isinstance(child, FileEntry): - # Add content to the relative directory with the computed path. - provenance.content_add_to_directory(relative, child, path) - else: - # Recursively walk the child directory. - stack.append((child, path / child.name)) - - -def origin_add( - provenance: ProvenanceInterface, - origin: OriginEntry -): - origin.id = provenance.origin_get_id(origin) - - for revision in origin.revisions: - # logging.info(f'Processing revision {hash_to_hex(revision.id)} from origin {origin.url}') - origin_add_revision(provenance, origin, revision) - - # Commit after each revision - provenance.commit() # TODO: verify this! 
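With the backends now obtained through the factories in swh/provenance/__init__.py and the algorithm functions kept in swh/provenance/provenance.py, driving the origin pipeline outside the CLI looks roughly like the iter-origins command; a sketch using the DEFAULT_CONFIG values (the file name is a placeholder):

    from swh.provenance import get_archive, get_provenance
    from swh.provenance.origin import FileOriginIterator
    from swh.provenance.provenance import origin_add

    archive = get_archive(cls="api", storage={
        "cls": "remote", "url": "http://uffizi.internal.softwareheritage.org:5002"})
    provenance = get_provenance(cls="ps", db={
        "host": "localhost", "database": "new_test",
        "user": "postgres", "password": "postgres"})

    for origin in FileOriginIterator("origins.csv", archive, limit=10):
        origin_add(provenance, origin)  # commits after each processed revision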
- - -def origin_add_revision( - provenance: ProvenanceInterface, - origin: OriginEntry, - revision: RevisionEntry -): - stack = [(None, revision)] - - while stack: - relative, rev = stack.pop() - - # Check if current revision has no prefered origin and update if necessary. - prefered = provenance.revision_get_prefered_origin(rev) - # logging.debug(f'Prefered origin for revision {hash_to_hex(rev.id)}: {prefered}') - - if prefered is None: - provenance.revision_set_prefered_origin(origin, rev) - ######################################################################## - - if relative is None: - # This revision is pointed directly by the origin. - visited = provenance.revision_visited(rev) - logging.debug(f'Revision {hash_to_hex(rev.id)} in origin {origin.id}: {visited}') - - logging.debug(f'Adding revision {hash_to_hex(rev.id)} to origin {origin.id}') - provenance.revision_add_to_origin(origin, rev) - - if not visited: - stack.append((rev, rev)) - - else: - # This revision is a parent of another one in the history of the - # relative revision. - for parent in iter(rev): - visited = provenance.revision_visited(parent) - logging.debug(f'Parent {hash_to_hex(parent.id)} in some origin: {visited}') - - if not visited: - # The parent revision has never been seen before pointing - # directly to an origin. - known = provenance.revision_in_history(parent) - logging.debug(f'Revision {hash_to_hex(parent.id)} before revision: {known}') - - if known: - # The parent revision is already known in some other - # revision's history. We should point it directly to - # the origin and (eventually) walk its history. - logging.debug(f'Adding revision {hash_to_hex(parent.id)} directly to origin {origin.id}') - stack.append((None, parent)) - else: - # The parent revision was never seen before. We should - # walk its history and associate it with the same - # relative revision. - logging.debug(f'Adding parent revision {hash_to_hex(parent.id)} to revision {hash_to_hex(relative.id)}') - provenance.revision_add_before_revision(relative, parent) - stack.append((relative, parent)) - else: - # The parent revision already points to an origin, so its - # history was properly processed before. We just need to - # make sure it points to the current origin as well. - logging.debug(f'Adding parent revision {hash_to_hex(parent.id)} to origin {origin.id}') - provenance.revision_add_to_origin(origin, parent) - - -def revision_add( - provenance: ProvenanceInterface, - archive: ArchiveInterface, - revision: RevisionEntry -): - # Processed content starting from the revision's root directory - date = provenance.revision_get_early_date(revision) - if date is None or revision.date < date: - provenance.revision_add(revision) - revision_process_content( - provenance, - revision, - DirectoryEntry(archive, revision.root, PosixPath('.')) - ) - return provenance.commit() - - -def revision_process_content( - provenance: ProvenanceInterface, - revision: RevisionEntry, - directory: DirectoryEntry -): - date = provenance.directory_get_date_in_isochrone_frontier(directory) - stack = [(directory, date, directory.name)] - # stack = [(directory, directory.name)] - - while stack: - dir, date, path = stack.pop() - # dir, path = stack.pop() - # date = provenance.directory_get_date_in_isochrone_frontier(dir) - - if date is None: - # The directory has never been seen on the isochrone graph of a - # revision. Its children should be checked. 
- blobs = [child for child in iter(dir) if isinstance(child, FileEntry)] - dirs = [child for child in iter(dir) if isinstance(child, DirectoryEntry)] - - blobdates = provenance.content_get_early_dates(blobs) - # TODO: this will only return timestamps for diretories that were - # seen in an isochrone frontier. But a directory may only cointain a - # subdirectory whose contents are already known. Which one should be - # added to the frontier then (the root or the sub directory)? - dirdates = provenance.directory_get_early_dates(dirs) - - if blobs + dirs: - dates = list(blobdates.values()) + list(dirdates.values()) - if None in dates: print(dates) - - if len(dates) == len(blobs) + len(dirs) and max(dates) <= revision.date: - # The directory belongs to the isochrone frontier of the - # current revision, and this is the first time it appears - # as such. - provenance.directory_set_date_in_isochrone_frontier(dir, max(dates)) - provenance.directory_add_to_revision(revision, dir, path) - directory_process_content( - provenance, - directory=dir, - relative=dir, - prefix=PosixPath('.') - ) - - else: - # The directory is not on the isochrone frontier of the - # current revision. Its child nodes should be analyzed. - ############################################################ - for child in blobs: - date = blobdates.get(child.id, None) - # date = provenance.content_get_early_date(child) - if date is None or revision.date < date: - provenance.content_set_early_date(child, revision.date) - provenance.content_add_to_revision(revision, child, path) - - for child in dirs: - date = dirdates.get(child.id, None) - # date = provenance.directory_get_date_in_isochrone_frontier(child) - stack.append((child, date, path / child.name)) - # stack.append((child, path / child.name)) - ############################################################ - - elif revision.date < date: - # The directory has already been seen on the isochrone frontier of - # a revision, but current revision is earlier. Its children should - # be updated. - blobs = [child for child in iter(dir) if isinstance(child, FileEntry)] - dirs = [child for child in iter(dir) if isinstance(child, DirectoryEntry)] - - blobdates = provenance.content_get_early_dates(blobs) - dirdates = provenance.directory_get_early_dates(dirs) - - #################################################################### - for child in blobs: - # date = blobdates.get(child.id, None) - date = provenance.content_get_early_date(child) - if date is None or revision.date < date: - provenance.content_set_early_date(child, revision.date) - provenance.content_add_to_revision(revision, child, path) - - for child in dirs: - # date = dirdates.get(child.id, None) - date = provenance.directory_get_date_in_isochrone_frontier(child) - stack.append((child, date, path / child.name)) - # stack.append((child, path / child.name)) - #################################################################### - - provenance.directory_set_date_in_isochrone_frontier(dir, revision.date) - - else: - # The directory has already been seen on the isochrone frontier of - # an earlier revision. Just add it to the current revision. 
- provenance.directory_add_to_revision(revision, dir, path) - - -def get_provenance(conninfo: dict) -> ProvenanceInterface: - # TODO: improve this methos to allow backend selection - conn = connect(conninfo) - return ProvenanceInterface(conn) diff --git a/swh/provenance/db/provenance.sql b/swh/provenance/postgresql/provenance.sql similarity index 100% rename from swh/provenance/db/provenance.sql rename to swh/provenance/postgresql/provenance.sql diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py index c8ea228..b158415 100644 --- a/swh/provenance/provenance.py +++ b/swh/provenance/provenance.py @@ -1,639 +1,332 @@ -import itertools import logging -import os -import psycopg2 -import psycopg2.extras from .archive import ArchiveInterface -from .db_utils import connect, execute_sql from .model import DirectoryEntry, FileEntry from .origin import OriginEntry from .revision import RevisionEntry from datetime import datetime from pathlib import PosixPath from typing import Dict, List from swh.model.hashutil import hash_to_hex -def normalize(path: PosixPath) -> PosixPath: - spath = str(path) - if spath.startswith('./'): - return PosixPath(spath[2:]) - return path - - -def create_database( - conn: psycopg2.extensions.connection, - conninfo: dict, - name: str -): - conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) - - # Create new database dropping previous one if exists - cursor = conn.cursor(); - cursor.execute(f'''DROP DATABASE IF EXISTS {name}''') - cursor.execute(f'''CREATE DATABASE {name}'''); - conn.close() - - # Reconnect to server selecting newly created database to add tables - conninfo['database'] = name - conn = connect(conninfo) - - sqldir = os.path.dirname(os.path.realpath(__file__)) - execute_sql(conn, os.path.join(sqldir, 'db/provenance.sql')) - - -################################################################################ -################################################################################ -################################################################################ - class ProvenanceInterface: # TODO: turn this into a real interface and move PostgreSQL implementation # to a separate file - def __init__(self, conn: psycopg2.extensions.connection): - # TODO: consider addind a mutex for thread safety - self.conn = conn - self.cursor = self.conn.cursor() - self.insert_cache = None - self.select_cache = None - self.clear_caches() - - - def clear_caches(self): - self.insert_cache = { - "content": dict(), - "content_early_in_rev": list(), - "content_in_dir": list(), - "directory": dict(), - "directory_in_rev": list(), - "revision": dict() - } - self.select_cache = { - "content": dict(), - "directory": dict(), - "revision": dict() - } + def __init__(self, **kwargs): + raise NotImplementedError def commit(self): - result = False - try: - self.insert_all() - self.conn.commit() - result = True - - except psycopg2.DatabaseError: - # Database error occurred, rollback all changes - self.conn.rollback() - # TODO: maybe serialize and auto-merge transations. - # The only conflicts are on: - # - content: we keep the earliest date - # - directory: we keep the earliest date - # - content_in_dir: there should be just duplicated entries. 
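The class keeps the original TODO about turning this into a real interface. A hypothetical way to do that, not part of the patch, is to declare the methods abstract with abc so that an incomplete backend fails at instantiation time rather than at call time:

    from abc import ABC, abstractmethod

    class ProvenanceInterface(ABC):
        @abstractmethod
        def commit(self) -> bool:
            """Flush the caches and commit the current transaction."""

        @abstractmethod
        def content_find_first(self, blobid: str):
            """Return the earliest known occurrence of the given blob."""

        # ...the remaining methods would be declared the same way.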
- - except Exception as error: - # Unexpected error occurred, rollback all changes and log message - logging.warning(f'Unexpected error: {error}') - self.conn.rollback() - - finally: - self.clear_caches() - - return result - - - def content_add_to_directory( - self, - directory: DirectoryEntry, - blob: FileEntry, - prefix: PosixPath - ): - # logging.debug(f'NEW occurrence of content {hash_to_hex(blob.id)} in directory {hash_to_hex(directory.id)} (path: {prefix / blob.name})') - # self.cursor.execute('''INSERT INTO content_in_dir VALUES (%s,%s,%s)''', - # (blob.id, directory.id, bytes(normalize(prefix / blob.name)))) - self.insert_cache['content_in_dir'].append( - (blob.id, directory.id, bytes(normalize(prefix / blob.name))) - ) + raise NotImplementedError - def content_add_to_revision( - self, - revision: RevisionEntry, - blob: FileEntry, - prefix: PosixPath - ): - # logging.debug(f'EARLY occurrence of blob {hash_to_hex(blob.id)} in revision {hash_to_hex(revision.id)} (path: {prefix / blob.name})') - # self.cursor.execute('''INSERT INTO content_early_in_rev VALUES (%s,%s,%s)''', - # (blob.id, revision.id, bytes(normalize(prefix / blob.name)))) - self.insert_cache['content_early_in_rev'].append( - (blob.id, revision.id, bytes(normalize(prefix / blob.name))) - ) + def content_add_to_directory(self, directory: DirectoryEntry, blob: FileEntry, prefix: PosixPath): + raise NotImplementedError + + + def content_add_to_revision(self, revision: RevisionEntry, blob: FileEntry, prefix: PosixPath): + raise NotImplementedError def content_find_first(self, blobid: str): - logging.info(f'Retrieving first occurrence of content {hash_to_hex(blobid)}') - self.cursor.execute('''SELECT blob, rev, date, path - FROM content_early_in_rev JOIN revision ON revision.id=content_early_in_rev.rev - WHERE content_early_in_rev.blob=%s ORDER BY date, rev, path ASC LIMIT 1''', (blobid,)) - return self.cursor.fetchone() + raise NotImplementedError def content_find_all(self, blobid: str): - logging.info(f'Retrieving all occurrences of content {hash_to_hex(blobid)}') - self.cursor.execute('''(SELECT blob, rev, date, path - FROM content_early_in_rev JOIN revision ON revision.id=content_early_in_rev.rev - WHERE content_early_in_rev.blob=%s) - UNION - (SELECT content_in_rev.blob, content_in_rev.rev, revision.date, content_in_rev.path - FROM (SELECT content_in_dir.blob, directory_in_rev.rev, - CASE directory_in_rev.path - WHEN '.' THEN content_in_dir.path - ELSE (directory_in_rev.path || '/' || content_in_dir.path)::unix_path - END AS path - FROM content_in_dir - JOIN directory_in_rev ON content_in_dir.dir=directory_in_rev.dir - WHERE content_in_dir.blob=%s) AS content_in_rev - JOIN revision ON revision.id=content_in_rev.rev) - ORDER BY date, rev, path''', (blobid, blobid)) - # POSTGRESQL EXPLAIN - yield from self.cursor.fetchall() + raise NotImplementedError def content_get_early_date(self, blob: FileEntry) -> datetime: - logging.debug(f'Getting content {hash_to_hex(blob.id)} early date') - # First check if the date is being modified by current transection. 
- date = self.insert_cache['content'].get(blob.id, None) - if date is None: - # If not, check whether it's been query before - date = self.select_cache['content'].get(blob.id, None) - if date is None: - # Otherwise, query the database and cache the value - self.cursor.execute('''SELECT date FROM content WHERE id=%s''', - (blob.id,)) - row = self.cursor.fetchone() - date = row[0] if row is not None else None - self.select_cache['content'][blob.id] = date - return date + raise NotImplementedError def content_get_early_dates(self, blobs: List[FileEntry]) -> Dict[bytes, datetime]: - dates = {} - pending = [] - for blob in blobs: - # First check if the date is being modified by current transection. - date = self.insert_cache['content'].get(blob.id, None) - if date is not None: - dates[blob.id] = date - else: - # If not, check whether it's been query before - date = self.select_cache['content'].get(blob.id, None) - if date is not None: - dates[blob.id] = date - else: - pending.append(blob.id) - if pending: - # Otherwise, query the database and cache the values - values = ', '.join(itertools.repeat('%s', len(pending))) - self.cursor.execute(f'''SELECT id, date FROM content WHERE id IN ({values})''', - tuple(pending)) - for row in self.cursor.fetchall(): - dates[row[0]] = row[1] - self.select_cache['content'][row[0]] = row[1] - return dates + raise NotImplementedError def content_set_early_date(self, blob: FileEntry, date: datetime): - # logging.debug(f'EARLY occurrence of blob {hash_to_hex(blob.id)} (timestamp: {date})') - # self.cursor.execute('''INSERT INTO content VALUES (%s,%s) - # ON CONFLICT (id) DO UPDATE SET date=%s''', - # (blob.id, date, date)) - self.insert_cache['content'][blob.id] = date - - - def directory_add_to_revision( - self, - revision: RevisionEntry, - directory: DirectoryEntry, - path: PosixPath - ): - # logging.debug(f'NEW occurrence of directory {hash_to_hex(directory.id)} on the ISOCHRONE FRONTIER of revision {hash_to_hex(revision.id)} (path: {path})') - # self.cursor.execute('''INSERT INTO directory_in_rev VALUES (%s,%s,%s)''', - # (directory.id, revision.id, bytes(normalize(path)))) - self.insert_cache['directory_in_rev'].append( - (directory.id, revision.id, bytes(normalize(path))) - ) + raise NotImplementedError + + + def directory_add_to_revision(self, revision: RevisionEntry, directory: DirectoryEntry, path: PosixPath): + raise NotImplementedError def directory_get_date_in_isochrone_frontier(self, directory: DirectoryEntry) -> datetime: - # logging.debug(f'Getting directory {hash_to_hex(directory.id)} early date') - # First check if the date is being modified by current transection. - date = self.insert_cache['directory'].get(directory.id, None) - if date is None: - # If not, check whether it's been query before - date = self.select_cache['directory'].get(directory.id, None) - if date is None: - # Otherwise, query the database and cache the value - self.cursor.execute('''SELECT date FROM directory WHERE id=%s''', - (directory.id,)) - row = self.cursor.fetchone() - date = row[0] if row is not None else None - self.select_cache['directory'][directory.id] = date - return date + raise NotImplementedError def directory_get_early_dates(self, dirs: List[DirectoryEntry]) -> Dict[bytes, datetime]: - dates = {} - pending = [] - for dir in dirs: - # First check if the date is being modified by current transection. 
- date = self.insert_cache['directory'].get(dir.id, None) - if date is not None: - dates[dir.id] = date - else: - # If not, check whether it's been query before - date = self.select_cache['directory'].get(dir.id, None) - if date is not None: - dates[dir.id] = date - else: - pending.append(dir.id) - if pending: - # Otherwise, query the database and cache the values - values = ', '.join(itertools.repeat('%s', len(pending))) - self.cursor.execute(f'''SELECT id, date FROM directory WHERE id IN ({values})''', - tuple(pending)) - for row in self.cursor.fetchall(): - dates[row[0]] = row[1] - self.select_cache['directory'][row[0]] = row[1] - return dates + raise NotImplementedError def directory_set_date_in_isochrone_frontier(self, directory: DirectoryEntry, date: datetime): - # logging.debug(f'EARLY occurrence of directory {hash_to_hex(directory.id)} on the ISOCHRONE FRONTIER (timestamp: {date})') - # self.cursor.execute('''INSERT INTO directory VALUES (%s,%s) - # ON CONFLICT (id) DO UPDATE SET date=%s''', - # (directory.id, date, date)) - self.insert_cache['directory'][directory.id] = date - - - def insert_all(self): - # Performe insertions with cached information - psycopg2.extras.execute_values( - self.cursor, - '''INSERT INTO content(id, date) VALUES %s - ON CONFLICT (id) DO UPDATE SET date=EXCLUDED.date''', # TODO: keep earliest date on conflict - # '''INSERT INTO content(id, date) VALUES %s - # ON CONFLICT (id) DO - # UPDATE SET date = CASE - # WHEN EXCLUDED.date < content.date - # THEN EXCLUDED.date - # ELSE content.date - # END''', - self.insert_cache['content'].items() - ) - - psycopg2.extras.execute_values( - self.cursor, - '''INSERT INTO content_early_in_rev VALUES %s - ON CONFLICT DO NOTHING''', - self.insert_cache['content_early_in_rev'] - ) - - psycopg2.extras.execute_values( - self.cursor, - '''INSERT INTO content_in_dir VALUES %s - ON CONFLICT DO NOTHING''', - self.insert_cache['content_in_dir'] - ) - - psycopg2.extras.execute_values( - self.cursor, - '''INSERT INTO directory(id, date) VALUES %s - ON CONFLICT (id) DO UPDATE SET date=EXCLUDED.date''', # TODO: keep earliest date on conflict - # '''INSERT INTO directory(id, date) VALUES %s - # ON CONFLICT (id) DO - # UPDATE SET date = CASE - # WHEN EXCLUDED.date < directory.date - # THEN EXCLUDED.date - # ELSE directory.date - # END''', - self.insert_cache['directory'].items() - ) - - psycopg2.extras.execute_values( - self.cursor, - '''INSERT INTO directory_in_rev VALUES %s - ON CONFLICT DO NOTHING''', - self.insert_cache['directory_in_rev'] - ) - - psycopg2.extras.execute_values( - self.cursor, - '''INSERT INTO revision(id, date) VALUES %s - ON CONFLICT (id) DO UPDATE SET date=EXCLUDED.date''', # TODO: keep earliest date on conflict - # '''INSERT INTO revision(id, date) VALUES %s - # ON CONFLICT (id) DO - # UPDATE SET date = CASE - # WHEN EXCLUDED.date < revision.date - # THEN EXCLUDED.date - # ELSE revision.date - # END''', - self.insert_cache['revision'].items() - ) + raise NotImplementedError def origin_get_id(self, origin: OriginEntry) -> int: - if origin.id is None: - # Check if current origin is already known and retrieve its internal id. - self.cursor.execute('''SELECT id FROM origin WHERE url=%s''', (origin.url,)) - row = self.cursor.fetchone() - - if row is None: - # If the origin is seen for the first time, current revision is - # the prefered one. 
-            self.cursor.execute('''INSERT INTO origin (url) VALUES (%s) RETURNING id''',
-                                (origin.url,))
-            return self.cursor.fetchone()[0]
-        else:
-            return row[0]
-    else:
-        return origin.id
+        raise NotImplementedError

    def revision_add(self, revision: RevisionEntry):
-        # Add current revision to the compact DB
-        self.insert_cache['revision'][revision.id] = revision.date
+        raise NotImplementedError

    def revision_add_before_revision(self, relative: RevisionEntry, revision: RevisionEntry):
-        self.cursor.execute('''INSERT INTO revision_before_rev VALUES (%s,%s)''',
-                            (revision.id, relative.id))
+        raise NotImplementedError

    def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry):
-        self.cursor.execute('''INSERT INTO revision_in_org VALUES (%s,%s)
-                               ON CONFLICT DO NOTHING''',
-                            (revision.id, origin.id))
+        raise NotImplementedError

    def revision_get_early_date(self, revision: RevisionEntry) -> datetime:
-        # logging.debug(f'Getting revision {hash_to_hex(revision.id)} early date')
-        # First check if the date is being modified by current transection.
-        date = self.insert_cache['revision'].get(revision.id, None)
-        if date is None:
-            # If not, check whether it's been query before
-            date = self.select_cache['revision'].get(revision.id, None)
-            if date is None:
-                # Otherwise, query the database and cache the value
-                self.cursor.execute('''SELECT date FROM revision WHERE id=%s''',
-                                    (revision.id,))
-                row = self.cursor.fetchone()
-                date = row[0] if row is not None else None
-                self.select_cache['revision'][revision.id] = date
-        return date
+        raise NotImplementedError

    def revision_get_prefered_origin(self, revision: RevisionEntry) -> int:
-        self.cursor.execute('''SELECT COALESCE(org,0) FROM revision WHERE id=%s''',
-                            (revision.id,))
-        row = self.cursor.fetchone()
-        # None means revision is not in database
-        # 0 means revision has no prefered origin
-        return row[0] if row is not None and row[0] != 0 else None
+        raise NotImplementedError

    def revision_in_history(self, revision: RevisionEntry) -> bool:
-        self.cursor.execute('''SELECT 1 FROM revision_before_rev WHERE prev=%s''',
-                            (revision.id,))
-        return self.cursor.fetchone() is not None
+        raise NotImplementedError

    def revision_set_prefered_origin(self, origin: OriginEntry, revision: RevisionEntry):
-        self.cursor.execute('''UPDATE revision SET org=%s WHERE id=%s''',
-                            (origin.id, revision.id))
+        raise NotImplementedError

    def revision_visited(self, revision: RevisionEntry) -> bool:
-        self.cursor.execute('''SELECT 1 FROM revision_in_org WHERE rev=%s''',
-                            (revision.id,))
-        return self.cursor.fetchone() is not None
+        raise NotImplementedError
+
+
+    def revision_get_early_date(self, revision: RevisionEntry) -> datetime:
+        raise NotImplementedError

-################################################################################
-################################################################################
-################################################################################
+    def revision_get_prefered_origin(self, revision: RevisionEntry) -> int:
+        raise NotImplementedError
+
+
+    def revision_in_history(self, revision: RevisionEntry) -> bool:
+        raise NotImplementedError
+
+
+    def revision_set_prefered_origin(self, origin: OriginEntry, revision: RevisionEntry):
+        raise NotImplementedError
+
+
+    def revision_visited(self, revision: RevisionEntry) -> bool:
+        raise NotImplementedError
+

def directory_process_content(
    provenance: ProvenanceInterface,
    directory: DirectoryEntry,
    relative: DirectoryEntry,
    prefix: PosixPath
):
    stack = [(directory, prefix)]

    while stack:
        dir, path = stack.pop()
        for child in iter(dir):
            if isinstance(child, FileEntry):
                # Add content to the relative directory with the computed path.
                provenance.content_add_to_directory(relative, child, path)
            else:
                # Recursively walk the child directory.
                stack.append((child, path / child.name))


def origin_add(
    provenance: ProvenanceInterface,
    origin: OriginEntry
):
    origin.id = provenance.origin_get_id(origin)

    for revision in origin.revisions:
        # logging.info(f'Processing revision {hash_to_hex(revision.id)} from origin {origin.url}')
        origin_add_revision(provenance, origin, revision)

        # Commit after each revision
        provenance.commit()  # TODO: verify this!


def origin_add_revision(
    provenance: ProvenanceInterface,
    origin: OriginEntry,
    revision: RevisionEntry
):
    stack = [(None, revision)]

    while stack:
        relative, rev = stack.pop()

        # Check if current revision has no preferred origin and update if necessary.
        prefered = provenance.revision_get_prefered_origin(rev)
        # logging.debug(f'Preferred origin for revision {hash_to_hex(rev.id)}: {prefered}')

        if prefered is None:
            provenance.revision_set_prefered_origin(origin, rev)

        ########################################################################
        if relative is None:
            # This revision is pointed directly by the origin.
            visited = provenance.revision_visited(rev)
            logging.debug(f'Revision {hash_to_hex(rev.id)} in origin {origin.id}: {visited}')

            logging.debug(f'Adding revision {hash_to_hex(rev.id)} to origin {origin.id}')
            provenance.revision_add_to_origin(origin, rev)

            if not visited:
                stack.append((rev, rev))

        else:
            # This revision is a parent of another one in the history of the
            # relative revision.
            for parent in iter(rev):
                visited = provenance.revision_visited(parent)
                logging.debug(f'Parent {hash_to_hex(parent.id)} in some origin: {visited}')

                if not visited:
                    # The parent revision has never been seen before pointing
                    # directly to an origin.
                    known = provenance.revision_in_history(parent)
                    logging.debug(f'Revision {hash_to_hex(parent.id)} before revision: {known}')

                    if known:
                        # The parent revision is already known in some other
                        # revision's history. We should point it directly to
                        # the origin and (eventually) walk its history.
                        logging.debug(f'Adding revision {hash_to_hex(parent.id)} directly to origin {origin.id}')
                        stack.append((None, parent))
                    else:
                        # The parent revision was never seen before. We should
                        # walk its history and associate it with the same
                        # relative revision.
                        logging.debug(f'Adding parent revision {hash_to_hex(parent.id)} to revision {hash_to_hex(relative.id)}')
                        provenance.revision_add_before_revision(relative, parent)
                        stack.append((relative, parent))
                else:
                    # The parent revision already points to an origin, so its
                    # history was properly processed before. We just need to
                    # make sure it points to the current origin as well.
                    logging.debug(f'Adding parent revision {hash_to_hex(parent.id)} to origin {origin.id}')
                    provenance.revision_add_to_origin(origin, parent)


def revision_add(
    provenance: ProvenanceInterface,
    archive: ArchiveInterface,
    revision: RevisionEntry
):
    # Process content starting from the revision's root directory
    date = provenance.revision_get_early_date(revision)
    if date is None or revision.date < date:
        provenance.revision_add(revision)
        revision_process_content(
            provenance,
            revision,
            DirectoryEntry(archive, revision.root, PosixPath('.'))
        )
    return provenance.commit()


def revision_process_content(
    provenance: ProvenanceInterface,
    revision: RevisionEntry,
    directory: DirectoryEntry
):
    date = provenance.directory_get_date_in_isochrone_frontier(directory)
    stack = [(directory, date, directory.name)]
    # stack = [(directory, directory.name)]

    while stack:
        dir, date, path = stack.pop()
        # dir, path = stack.pop()
        # date = provenance.directory_get_date_in_isochrone_frontier(dir)

        if date is None:
            # The directory has never been seen on the isochrone graph of a
            # revision. Its children should be checked.
            blobs = [child for child in iter(dir) if isinstance(child, FileEntry)]
            dirs = [child for child in iter(dir) if isinstance(child, DirectoryEntry)]

            blobdates = provenance.content_get_early_dates(blobs)
            # TODO: this will only return timestamps for directories that were
            # seen in an isochrone frontier. But a directory may only contain a
            # subdirectory whose contents are already known. Which one should be
            # added to the frontier then (the root or the sub directory)?
            dirdates = provenance.directory_get_early_dates(dirs)

            if blobs + dirs:
                dates = list(blobdates.values()) + list(dirdates.values())
                if None in dates: print(dates)

                if len(dates) == len(blobs) + len(dirs) and max(dates) <= revision.date:
                    # The directory belongs to the isochrone frontier of the
                    # current revision, and this is the first time it appears
                    # as such.
                    provenance.directory_set_date_in_isochrone_frontier(dir, max(dates))
                    provenance.directory_add_to_revision(revision, dir, path)
                    directory_process_content(
                        provenance,
                        directory=dir,
                        relative=dir,
                        prefix=PosixPath('.')
                    )

                else:
                    # The directory is not on the isochrone frontier of the
                    # current revision. Its child nodes should be analyzed.
                    ############################################################
                    for child in blobs:
                        date = blobdates.get(child.id, None)
                        # date = provenance.content_get_early_date(child)
                        if date is None or revision.date < date:
                            provenance.content_set_early_date(child, revision.date)
                        provenance.content_add_to_revision(revision, child, path)

                    for child in dirs:
                        date = dirdates.get(child.id, None)
                        # date = provenance.directory_get_date_in_isochrone_frontier(child)
                        stack.append((child, date, path / child.name))
                        # stack.append((child, path / child.name))
                    ############################################################

        elif revision.date < date:
            # The directory has already been seen on the isochrone frontier of
            # a revision, but current revision is earlier. Its children should
            # be updated.
            blobs = [child for child in iter(dir) if isinstance(child, FileEntry)]
            dirs = [child for child in iter(dir) if isinstance(child, DirectoryEntry)]

            blobdates = provenance.content_get_early_dates(blobs)
            dirdates = provenance.directory_get_early_dates(dirs)

            ####################################################################
            for child in blobs:
                # date = blobdates.get(child.id, None)
                date = provenance.content_get_early_date(child)
                if date is None or revision.date < date:
                    provenance.content_set_early_date(child, revision.date)
                provenance.content_add_to_revision(revision, child, path)

            for child in dirs:
                # date = dirdates.get(child.id, None)
                date = provenance.directory_get_date_in_isochrone_frontier(child)
                stack.append((child, date, path / child.name))
                # stack.append((child, path / child.name))
            ####################################################################

            provenance.directory_set_date_in_isochrone_frontier(dir, revision.date)

        else:
            # The directory has already been seen on the isochrone frontier of
            # an earlier revision. Just add it to the current revision.
            provenance.directory_add_to_revision(revision, dir, path)
-
-
-def get_provenance(conninfo: dict) -> ProvenanceInterface:
-    # TODO: improve this methos to allow backend selection
-    conn = connect(conninfo)
-    return ProvenanceInterface(conn)
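For orientation, a minimal driver sketch showing how the functions above might be fed revisions, modeled on the commented-out RevisionWorker in revision.py below. The `process_revisions` wrapper and the `revisions.csv` filename are hypothetical; it assumes a `provenance` backend and an `archive` were obtained elsewhere.

import logging

from swh.model.hashutil import hash_to_hex
from swh.provenance.provenance import revision_add
from swh.provenance.revision import FileRevisionIterator


def process_revisions(provenance, archive, filename="revisions.csv"):
    # Hypothetical driver: read revisions from a CSV file (see FileRevisionIterator
    # below) and feed them to revision_add one by one.
    revisions = FileRevisionIterator(filename, archive)
    while True:
        revision = revisions.next()
        if revision is None:
            break
        logging.info(f"Processing revision {hash_to_hex(revision.id)} (timestamp: {revision.date})")
        # revision_add only recomputes the isochrone frontier when the revision is
        # new or earlier than the known date, and ends by calling provenance.commit().
        revision_add(provenance, archive, revision)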
diff --git a/swh/provenance/revision.py b/swh/provenance/revision.py
index 7b6f49b..d32a25d 100644
--- a/swh/provenance/revision.py
+++ b/swh/provenance/revision.py
@@ -1,168 +1,167 @@
import logging
import threading

from .archive import ArchiveInterface
-from .db_utils import connect

from datetime import datetime
from swh.model.hashutil import hash_to_bytes, hash_to_hex


class RevisionEntry:
    def __init__(
        self,
        archive: ArchiveInterface,
        id: bytes,
        date: datetime=None,
        root: bytes=None,
        parents: list=None
    ):
        self.archive = archive
        self.id = id
        self.date = date
        self.parents = parents
        self.root = root

    def __iter__(self):
        if self.parents is None:
            self.parents = []
            for parent in self.archive.revision_get([self.id]):
                if parent is not None:
                    self.parents.append(
                        RevisionEntry(
                            self.archive,
                            parent.id,
                            parents=[RevisionEntry(self.archive, id) for id in parent.parents]
                        )
                    )

        return iter(self.parents)


################################################################################
################################################################################

class RevisionIterator:
    """Iterator interface."""

    def __iter__(self):
        pass

    def __next__(self):
        pass


class FileRevisionIterator(RevisionIterator):
    """Iterator over revisions present in the given CSV file."""

    def __init__(self, filename: str, archive: ArchiveInterface, limit: int=None):
        self.file = open(filename)
        self.idx = 0
        self.limit = limit
        self.mutex = threading.Lock()
        self.archive = archive

    def next(self):
        self.mutex.acquire()
        line = self.file.readline().strip()
        if line and (self.limit is None or self.idx < self.limit):
            self.idx = self.idx + 1
            id, date, root = line.strip().split(',')
            self.mutex.release()
            return RevisionEntry(
                self.archive,
                hash_to_bytes(id),
                date=datetime.fromisoformat(date),
                root=hash_to_bytes(root)
            )
        else:
            self.mutex.release()
            return None


# class ArchiveRevisionIterator(RevisionIterator):
#     """Iterator over revisions present in the given database."""
#
#     def __init__(self, conn, limit=None, chunksize=100):
#         self.cur = conn.cursor()
#         self.chunksize = chunksize
#         self.records = []
#         if limit is None:
#             self.cur.execute('''SELECT id, date, committer_date, directory
#                                 FROM revision''')
#         else:
#             self.cur.execute('''SELECT id, date, committer_date, directory
#                                 FROM revision
#                                 LIMIT %s''', (limit,))
#         for row in self.cur.fetchmany(self.chunksize):
#             record = self.make_record(row)
#             if record is not None:
#                 self.records.append(record)
#         self.mutex = threading.Lock()
#
#     def __del__(self):
#         self.cur.close()
#
#     def next(self):
#         self.mutex.acquire()
#         if not self.records:
#             self.records.clear()
#             for row in self.cur.fetchmany(self.chunksize):
#                 record = self.make_record(row)
#                 if record is not None:
#                     self.records.append(record)
#
#         if self.records:
#             revision, *self.records = self.records
#             self.mutex.release()
#             return revision
#         else:
#             self.mutex.release()
#             return None
#
#     def make_record(self, row):
#         # Only revisions with an author or committer date are considered
#         if row[1] is not None:
#             # If the revision has an author date, it takes precedence
#             return RevisionEntry(row[0], row[1], row[3])
#         elif row[2] is not None:
#             # If not, we use the committer date
#             return RevisionEntry(row[0], row[2], row[3])


################################################################################
################################################################################

# class RevisionWorker(threading.Thread):
#     def __init__(
#         self,
#         id: int,
#         conninfo: dict,
#         archive: ArchiveInterface,
#         revisions: RevisionIterator
#     ):
#         from .provenance import get_provenance
#
#         super().__init__()
#         self.archive = archive
#         self.id = id
#         self.provenance = get_provenance(conninfo)
#         self.revisions = revisions
#
#
#     def run(self):
#         from .provenance import revision_add
#
#
#         while True:
#             revision = self.revisions.next()
#             if revision is None: break
#
#             processed = False
#             while not processed:
#                 logging.info(f'Thread {self.id} - Processing revision {hash_to_hex(revision.id)} (timestamp: {revision.date})')
#                 processed = revision_add(self.provenance, self.archive, revision)
#                 if not processed:
#                     logging.warning(f'Thread {self.id} - Failed to process revision {hash_to_hex(revision.id)} (timestamp: {revision.date})')
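As a note on the input format: FileRevisionIterator above splits each line on commas and parses the three fields with hash_to_bytes and datetime.fromisoformat. A minimal sketch of one input row follows; the identifiers and date are made up for illustration.

from datetime import datetime

from swh.model.hashutil import hash_to_bytes

# One CSV row: <revision id (hex sha1_git)>,<ISO-8601 date>,<root directory id (hex sha1_git)>
line = "1f2e3d4c5b6a79881726354453627181920a1b2c,2014-05-27T05:56:10+00:00,f0e1d2c3b4a5968778695a4b3c2d1e0f9a8b7c6d"

id, date, root = line.strip().split(',')
rev_id = hash_to_bytes(id)               # 20-byte revision identifier
rev_date = datetime.fromisoformat(date)  # timezone-aware datetime
rev_root = hash_to_bytes(root)           # 20-byte root directory identifier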
diff --git a/swh/provenance/__init__.py b/swh/provenance/storage/__init__.py
similarity index 100%
copy from swh/provenance/__init__.py
copy to swh/provenance/storage/__init__.py
diff --git a/swh/provenance/storage/archive.py b/swh/provenance/storage/archive.py
new file mode 100644
index 0000000..f895ecb
--- /dev/null
+++ b/swh/provenance/storage/archive.py
@@ -0,0 +1,42 @@
+import psycopg2
+
+from ..archive import ArchiveInterface
+
+from typing import List
+from swh.storage import get_storage
+
+
+class ArchiveStorage(ArchiveInterface):
+    def __init__(self, cls: str, **kwargs):
+        self.storage = get_storage(cls, **kwargs)
+
+    def directory_ls(self, id: bytes):
+        # TODO: filter unused fields
+        yield from self.storage.directory_ls(id)
+
+    def iter_origins(self):
+        from swh.storage.algos.origin import iter_origins
+        yield from iter_origins(self.storage)
+
+    def iter_origin_visits(self, origin: str):
+        from swh.storage.algos.origin import iter_origin_visits
+        # TODO: filter unused fields
+        yield from iter_origin_visits(self.storage, origin)
+
+    def iter_origin_visit_statuses(self, origin: str, visit: int):
+        from swh.storage.algos.origin import iter_origin_visit_statuses
+        # TODO: filter unused fields
+        yield from iter_origin_visit_statuses(self.storage, origin, visit)
+
+    def release_get(self, ids: List[bytes]):
+        # TODO: filter unused fields
+        yield from self.storage.release_get(ids)
+
+    def revision_get(self, ids: List[bytes]):
+        # TODO: filter unused fields
+        yield from self.storage.revision_get(ids)
+
+    def snapshot_get_all_branches(self, snapshot: bytes):
+        from swh.storage.algos.snapshot import snapshot_get_all_branches
+        # TODO: filter unused fields
+        return snapshot_get_all_branches(self.storage, snapshot)
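Finally, a hypothetical usage sketch for the new ArchiveStorage wrapper. The "remote" backend class and its url argument belong to swh.storage's get_storage (to which ArchiveStorage forwards its arguments), and the server URL and directory id below are made up for illustration.

from swh.model.hashutil import hash_to_bytes
from swh.provenance.storage.archive import ArchiveStorage

# Wrap a remote swh-storage instance (assumes an RPC server is reachable there).
archive = ArchiveStorage("remote", url="http://localhost:5002/")

# List the entries of a directory known to the archive; directory_ls yields the
# dicts produced by swh.storage (filtering unused fields is still a TODO above).
directory_id = hash_to_bytes("0123456789abcdef0123456789abcdef01234567")
for entry in archive.directory_ls(directory_id):
    print(entry["type"], entry["name"])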