diff --git a/contents.py b/contents.py new file mode 100644 index 0000000..299a602 --- /dev/null +++ b/contents.py @@ -0,0 +1,51 @@ +import os +import psycopg2 + +from swh.model.hashutil import (hash_to_bytes, hash_to_hex) +from swh.provenance.provenance import get_provenance + + +if __name__ == "__main__": + conninfo = { + "host": "localhost", + "database": "test2", + "user": "postgres", + "password": "postgres" + } + provenance = get_provenance(conninfo) + + print('content(id, date): ################################################') + provenance.cursor.execute('''SELECT id, date FROM content''') + for row in provenance.cursor.fetchall(): + print(f'{hash_to_hex(row[0])}, {row[1]}') + print('###################################################################') + + print('content_early_in_rev(blob, rev, path): ############################') + provenance.cursor.execute('''SELECT blob, rev, path FROM content_early_in_rev''') + for row in provenance.cursor.fetchall(): + print(f'{hash_to_hex(row[0])}, {hash_to_hex(row[1])}, {os.fsdecode(row[2])}') + print('###################################################################') + + print('content_in_dir(blob, dir, path): ##################################') + provenance.cursor.execute('''SELECT blob, dir, path FROM content_in_dir''') + for row in provenance.cursor.fetchall(): + print(f'{hash_to_hex(row[0])}, {hash_to_hex(row[1])}, {os.fsdecode(row[2])}') + print('###################################################################') + + print('directory(id, date): ##############################################') + provenance.cursor.execute('''SELECT id, date FROM directory''') + for row in provenance.cursor.fetchall(): + print(f'{hash_to_hex(row[0])}, {row[1]}') + print('###################################################################') + + print('directory_in_rev(dir, rev, path): #################################') + provenance.cursor.execute('''SELECT dir, rev, path FROM directory_in_rev''') + for row in provenance.cursor.fetchall(): + print(f'{hash_to_hex(row[0])}, {hash_to_hex(row[1])}, {os.fsdecode(row[2])}') + print('###################################################################') + + print('revision(id, date): ###############################################') + provenance.cursor.execute('''SELECT id, date FROM revision''') + for row in provenance.cursor.fetchall(): + print(f'{hash_to_hex(row[0])}, {row[1]}') + print('###################################################################') diff --git a/revisions.py b/revisions.py new file mode 100644 index 0000000..eff0fcf --- /dev/null +++ b/revisions.py @@ -0,0 +1,62 @@ +import io +import random +import pytz + +from datetime import datetime + +from swh.model.hashutil import (hash_to_bytes, hash_to_hex) +from swh.storage import get_storage +from swh.provenance.revision import RevisionEntry + + +def rev_to_csv(revision: RevisionEntry): + return ','.join([ + hash_to_hex(revision.id), + str(pytz.utc.localize(revision.date)), + hash_to_hex(revision.root) + ]) + '\n' + + +if __name__ == "__main__": + conninfo = { + "cls": "remote", + "url": "http://uffizi.internal.softwareheritage.org:5002" + } + storage = get_storage(**conninfo) + + revisions = [ + # '6eec5815ef8fc88d9fc5bcc91c6465a8899c1445', + # 'd1468bb5f06ca44cc42c43fbd011c5dcbdc262c6', + # '6a45ebb887d87ee53f359aaeba8a9840576c907b' + '02f95c0a1868cbef82ff73fc1b903183a579c7de', + 'da061f1caf293a5da00bff6a45abcf4d7ae54c50', + 'e3bfd73a9fd8ef3dd4c5b05a927de485f9871323' + ] + print(revisions) + + revisions = list(map(hash_to_bytes, 
revisions)) + print(revisions) + + entries = [] + for revision in storage.revision_get(revisions): + if revision is not None: + print(revision) + entries.append(RevisionEntry( + storage, + revision.id, + datetime.fromtimestamp(revision.date.timestamp.seconds), + revision.directory + )) + + random.shuffle(entries) + with io.open('random.csv', 'w') as outfile: + for revision in entries: + outfile.write(rev_to_csv(revision)) + + with io.open('ordered.csv', 'w') as outfile: + for revision in sorted(entries, key=lambda rev: rev.date): + outfile.write(rev_to_csv(revision)) + + with io.open('reverse.csv', 'w') as outfile: + for revision in sorted(entries, key=lambda rev: rev.date, reverse=True): + outfile.write(rev_to_csv(revision)) diff --git a/swh/provenance/archive.py b/swh/provenance/archive.py new file mode 100644 index 0000000..4bc725e --- /dev/null +++ b/swh/provenance/archive.py @@ -0,0 +1,82 @@ +import psycopg2 + +from .db_utils import connect + +from typing import List + +from swh.storage import get_storage + + +class ArchiveInterface: + def __init__(self): + raise NotImplementedError + + def directory_ls(self, id: bytes): + raise NotImplementedError + + def revision_get(self, ids: List[bytes]): + raise NotImplementedError + + +class ArchiveStorage(ArchiveInterface): + def __init__(self, cls: str, **kwargs): + self.storage = get_storage(cls, **kwargs) + + def directory_ls(self, id: bytes): + # TODO: filter unused fields + yield from self.storage.directory_ls(id) + + def revision_get(self, ids: List[bytes]): + # TODO: filter unused fields + yield from self.storage.revision_get(ids) + + +class Archive(ArchiveInterface): + def __init__(self, conn: psycopg2.extensions.connection): + self.conn = conn + self.cursor = conn.cursor() + + def directory_ls(self, id: bytes): + self.cursor.execute('''WITH + dir AS (SELECT id AS dir_id, dir_entries, file_entries, rev_entries + FROM directory WHERE id=%s), + ls_d AS (SELECT dir_id, unnest(dir_entries) AS entry_id from dir), + ls_f AS (SELECT dir_id, unnest(file_entries) AS entry_id from dir), + ls_r AS (SELECT dir_id, unnest(rev_entries) AS entry_id from dir) + (SELECT 'dir'::directory_entry_type AS type, e.target, e.name, NULL::sha1_git + FROM ls_d + LEFT JOIN directory_entry_dir e ON ls_d.entry_id=e.id) + UNION + (WITH known_contents AS + (SELECT 'file'::directory_entry_type AS type, e.target, e.name, c.sha1_git + FROM ls_f + LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id + INNER JOIN content c ON e.target=c.sha1_git) + SELECT * FROM known_contents + UNION + (SELECT 'file'::directory_entry_type AS type, e.target, e.name, c.sha1_git + FROM ls_f + LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id + LEFT JOIN skipped_content c ON e.target=c.sha1_git + WHERE NOT EXISTS (SELECT 1 FROM known_contents WHERE known_contents.sha1_git=e.target))) + UNION + (SELECT 'rev'::directory_entry_type AS type, e.target, e.name, NULL::sha1_git + FROM ls_r + LEFT JOIN directory_entry_rev e ON ls_r.entry_id=e.id) + ORDER BY name + ''', (id,)) + for row in self.cursor.fetchall(): + yield {'type': row[0], 'target': row[1], 'name': row[2]} + + def revision_get(self, ids: List[bytes]): + raise NotImplementedError + + +def get_archive(cls: str, **kwargs) -> ArchiveInterface: + if cls == "api": + return ArchiveStorage(**kwargs["storage"]) + elif cls == "ps": + conn = connect(kwargs["db"]) + return Archive(conn) + else: + raise NotImplementedError diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py index 959c041..c013181 100644 --- a/swh/provenance/cli.py +++ 
b/swh/provenance/cli.py @@ -1,195 +1,236 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control import os from typing import Any, Dict, Optional import click import yaml from swh.core import config from swh.core.cli import CONTEXT_SETTINGS from swh.core.cli import swh as swh_cli_group from swh.model.hashutil import (hash_to_bytes, hash_to_hex) -from swh.storage import get_storage # All generic config code should reside in swh.core.config CONFIG_ENVVAR = "SWH_CONFIG_FILE" DEFAULT_CONFIG_PATH = os.path.join(click.get_app_dir("swh"), "global.yml") DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, DEFAULT_CONFIG_PATH) DEFAULT_CONFIG: Dict[str, Any] = { - "storage": { - "cls": "remote", - "url": "http://uffizi.internal.softwareheritage.org:5002" + "archive": { + # "cls": "api", + # "storage": { + # "cls": "remote", + # "url": "http://uffizi.internal.softwareheritage.org:5002" + # } + "cls": "ps", + "db": { + "host": "db.internal.softwareheritage.org", + "database": "softwareheritage", + "user": "guest" + } }, "db": { "host": "localhost", - "database": "provenance", + "database": "test3", "user": "postgres", "password": "postgres" } } CONFIG_FILE_HELP = f"""Configuration file: \b The CLI option or the environment variable will fail if invalid. CLI option is checked first. Then, environment variable {CONFIG_ENVVAR} is checked. Then, if cannot load the default path, a set of default values are used. Default config path is {DEFAULT_CONFIG_PATH}. Default config values are: \b {yaml.dump(DEFAULT_CONFIG)}""" PROVENANCE_HELP = f"""Software Heritage Scanner tools. 
{CONFIG_FILE_HELP}""" @swh_cli_group.group( name="provenance", context_settings=CONTEXT_SETTINGS, help=PROVENANCE_HELP, ) @click.option( "-C", "--config-file", default=None, type=click.Path(exists=False, dir_okay=False, path_type=str), help="""YAML configuration file""", ) -@click.option("--profile", is_flag=True) +@click.option("--profile", default=None) @click.pass_context -def cli(ctx, config_file: Optional[str], profile: bool): +def cli(ctx, config_file: Optional[str], profile: str): if config_file is None and config.config_exists(DEFAULT_PATH): config_file = DEFAULT_PATH if config_file is None: conf = DEFAULT_CONFIG else: # read_raw_config do not fail on ENOENT if not config.config_exists(config_file): raise FileNotFoundError(config_file) conf = config.read_raw_config(config.config_basepath(config_file)) conf = config.merge_configs(DEFAULT_CONFIG, conf) ctx.ensure_object(dict) ctx.obj["config"] = conf - - if profile: - import cProfile - import pstats - import io - import atexit - - print("Profiling...") - pr = cProfile.Profile() - pr.enable() - - def exit(): - pr.disable() - print("Profiling completed") - s = io.StringIO() - pstats.Stats(pr, stream=s).sort_stats("cumulative").print_stats() - print(s.getvalue()) - - atexit.register(exit) + ctx.obj["profile"] = profile + + # if profile: + # import cProfile + # import pstats + # import io + # import atexit + # + # print("Profiling...") + # pr = cProfile.Profile() + # pr.enable() + # + # def exit(): + # pr.disable() + # print("Profiling completed") + # s = io.StringIO() + # pstats.Stats(pr, stream=s).sort_stats("cumulative").print_stats() + # print(s.getvalue()) + # + # atexit.register(exit) @cli.command(name="create") @click.option("--name", default=None) @click.pass_context def create(ctx, name): """Create new provenance database.""" from .db_utils import connect from .provenance import create_database # Connect to server without selecting a database conninfo = ctx.obj["config"]["db"] database = conninfo.pop('database', None) conn = connect(conninfo) if name is None: name = database create_database(conn, conninfo, name) @cli.command(name="iter-revisions") @click.argument("filename") @click.option('-l', '--limit', type=int) @click.option('-t', '--threads', type=int, default=1) @click.pass_context def iter_revisions(ctx, filename, limit, threads): """Iterate over provided list of revisions and add them to the provenance database.""" + from .archive import get_archive from .revision import FileRevisionIterator - from .revision import RevisionWorker + # from .revision import RevisionWorker + + # conninfo = ctx.obj["config"]["db"] + # archive = get_archive(**ctx.obj["config"]["archive"]) + # revisions = FileRevisionIterator(filename, archive, limit=limit) + # workers = [] + # + # for id in range(threads): + # worker = RevisionWorker(id, conninfo, archive, revisions) + # worker.start() + # workers.append(worker) + # + # for worker in workers: + # worker.join() + + ############################################################################ + archive = get_archive(**ctx.obj["config"]["archive"]) + revisions = FileRevisionIterator(filename, archive, limit=limit) + + if ctx.obj["profile"]: + from .provenance import get_provenance, revision_add + import cProfile - conninfo = ctx.obj["config"]["db"] - storage = get_storage(**ctx.obj["config"]["storage"]) - revisions = FileRevisionIterator(filename, storage, limit=limit) - workers = [] + provenance = get_provenance(ctx.obj["config"]["db"]) + command = """ +while True: + revision = revisions.next() 
+ if revision is None: break + revision_add(provenance, archive, revision) + """ - for id in range(threads): - worker = RevisionWorker(id, conninfo, storage, revisions) - worker.start() - workers.append(worker) + cProfile.runctx(command, globals(), locals(), filename=ctx.obj["profile"]) - for worker in workers: - worker.join() + else: + from .revision import RevisionWorker + + conninfo = ctx.obj["config"]["db"] + workers = [] + + for id in range(threads): + worker = RevisionWorker(id, conninfo, archive, revisions) + worker.start() + workers.append(worker) + + for worker in workers: + worker.join() + ############################################################################ @cli.command(name="iter-origins") @click.argument("filename") @click.option('-l', '--limit', type=int) #@click.option('-t', '--threads', type=int, default=1) @click.pass_context #def iter_revisions(ctx, filename, limit, threads): def iter_origins(ctx, filename, limit): """Iterate over provided list of revisions and add them to the provenance database.""" - from .db_utils import connect + from .archive import get_archive from .origin import FileOriginIterator - from .provenance import origin_add + from .provenance import get_provenance, origin_add - conn = connect(ctx.obj["config"]["db"]) - storage = get_storage(**ctx.obj["config"]["storage"]) + provenance = get_provenance(ctx.obj["config"]["db"]) + archive = get_archive(**ctx.obj["config"]["archive"]) - for origin in FileOriginIterator(filename, storage, limit=limit): + for origin in FileOriginIterator(filename, archive, limit=limit): # TODO: consider using threads and a OriginWorker class - origin_add(conn, storage, origin) + origin_add(provenance, origin) @cli.command(name="find-first") @click.argument("swhid") @click.pass_context def find_first(ctx, swhid): """Find first occurrence of the requested blob.""" - from .db_utils import connect - from .provenance import content_find_first + from .provenance import get_provenance - with connect(ctx.obj["config"]["db"]).cursor() as cursor: - # TODO: return a dictionary with proper keys for each field - row = content_find_first(cursor, hash_to_bytes(swhid)) - if row is not None: - print(f'{hash_to_hex(row[0])}, {hash_to_hex(row[1])}, {row[2]}, {os.fsdecode(row[3])}') - else: - print(f'Cannot find a content with the id {swhid}') + provenance = get_provenance(ctx.obj["config"]["db"]) + # TODO: return a dictionary with proper keys for each field + row = provenance.content_find_first(hash_to_bytes(swhid)) + if row is not None: + print(f'{hash_to_hex(row[0])}, {hash_to_hex(row[1])}, {row[2]}, {os.fsdecode(row[3])}') + else: + print(f'Cannot find a content with the id {swhid}') @cli.command(name="find-all") @click.argument("swhid") @click.pass_context def find_all(ctx, swhid): """Find all occurrences of the requested blob.""" - from .db_utils import connect - from .provenance import content_find_all + from .provenance import get_provenance - with connect(ctx.obj["config"]["db"]).cursor() as cursor: - # TODO: return a dictionary with proper keys for each field - for row in content_find_all(cursor, hash_to_bytes(swhid)): - print(f'{hash_to_hex(row[0])}, {hash_to_hex(row[1])}, {row[2]}, {os.fsdecode(row[3])}') + provenance = get_provenance(ctx.obj["config"]["db"]) + # TODO: return a dictionary with proper keys for each field + for row in provenance.content_find_all(hash_to_bytes(swhid)): + print(f'{hash_to_hex(row[0])}, {hash_to_hex(row[1])}, {row[2]}, {os.fsdecode(row[3])}') diff --git a/swh/provenance/model.py 
b/swh/provenance/model.py index 4b06320..9858af4 100644 --- a/swh/provenance/model.py +++ b/swh/provenance/model.py @@ -1,45 +1,46 @@ import os +from .archive import ArchiveInterface + from pathlib import PosixPath -from swh.storage.interface import StorageInterface class Tree: - def __init__(self, storage: StorageInterface, id: bytes): - self.root = DirectoryEntry(storage, id, PosixPath('.')) + def __init__(self, archive: ArchiveInterface, id: bytes): + self.root = DirectoryEntry(archive, id, PosixPath('.')) class TreeEntry: def __init__(self, id: bytes, name: PosixPath): self.id = id self.name = name class DirectoryEntry(TreeEntry): - def __init__(self, storage: StorageInterface, id: bytes, name: PosixPath): + def __init__(self, archive: ArchiveInterface, id: bytes, name: PosixPath): super().__init__(id, name) - self.storage = storage + self.archive = archive self.children = None def __iter__(self): if self.children is None: self.children = [] - for child in self.storage.directory_ls(self.id): + for child in self.archive.directory_ls(self.id): if child['type'] == 'dir': self.children.append(DirectoryEntry( - self.storage, + self.archive, child['target'], PosixPath(os.fsdecode(child['name'])) )) elif child['type'] == 'file': self.children.append(FileEntry( child['target'], PosixPath(os.fsdecode(child['name'])) )) return iter(self.children) class FileEntry(TreeEntry): pass diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py index e82f528..5b607ac 100644 --- a/swh/provenance/provenance.py +++ b/swh/provenance/provenance.py @@ -1,511 +1,600 @@ +import itertools import logging import os import psycopg2 import psycopg2.extras +from .archive import ArchiveInterface from .db_utils import connect, execute_sql from .model import DirectoryEntry, FileEntry, Tree from .origin import OriginEntry from .revision import RevisionEntry from datetime import datetime from pathlib import PosixPath +from typing import Dict, List from swh.model.hashutil import hash_to_hex -from swh.storage.interface import StorageInterface def normalize(path: PosixPath) -> PosixPath: spath = str(path) if spath.startswith('./'): return PosixPath(spath[2:]) return path def create_database( conn: psycopg2.extensions.connection, conninfo: dict, name: str ): conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) # Create new database dropping previous one if exists cursor = conn.cursor(); cursor.execute(f'''DROP DATABASE IF EXISTS {name}''') cursor.execute(f'''CREATE DATABASE {name}'''); conn.close() # Reconnect to server selecting newly created database to add tables conninfo['database'] = name conn = connect(conninfo) sqldir = os.path.dirname(os.path.realpath(__file__)) execute_sql(conn, os.path.join(sqldir, 'db/provenance.sql')) ################################################################################ ################################################################################ ################################################################################ -def content_add_to_dir( - cursor: psycopg2.extensions.cursor, - cache: dict, - directory: DirectoryEntry, - blob: FileEntry, - prefix: PosixPath -): - logging.debug(f'NEW occurrence of content {hash_to_hex(blob.id)} in directory {hash_to_hex(directory.id)} (path: {prefix / blob.name})') - # cursor.execute('INSERT INTO content_in_dir VALUES (%s,%s,%s)', - # (blob.id, directory.id, bytes(normalize(prefix / blob.name)))) - cache['content_in_dir'].append( - (blob.id, directory.id, bytes(normalize(prefix / blob.name))) - ) +class 
ProvenanceInterface: + # TODO: turn this into a real interface and move PostgreSQL implementation + # to a separate file + def __init__(self, conn: psycopg2.extensions.connection): + # TODO: consider addind a mutex for thread safety + self.conn = conn + self.cursor = self.conn.cursor() + self.insert_cache = None + self.select_cache = None + self.clear_caches() + + + def clear_caches(self): + self.insert_cache = { + "content": dict(), + "content_early_in_rev": list(), + "content_in_dir": list(), + "directory": dict(), + "directory_in_rev": list(), + "revision": dict() + } + self.select_cache = { + "content": dict(), + "directory": dict(), + "revision": dict() + } + + + def commit(self): + result = False + try: + self.insert_all() + self.conn.commit() + result = True + + except psycopg2.DatabaseError: + # Database error occurred, rollback all changes + self.conn.rollback() + # TODO: maybe serialize and auto-merge transations. + # The only conflicts are on: + # - content: we keep the earliest date + # - directory: we keep the earliest date + # - content_in_dir: there should be just duplicated entries. + + except Exception as error: + # Unexpected error occurred, rollback all changes and log message + logging.warning(f'Unexpected error: {error}') + self.conn.rollback() + + finally: + self.clear_caches() + + return result + + + def content_add_to_directory( + self, + directory: DirectoryEntry, + blob: FileEntry, + prefix: PosixPath + ): + # logging.debug(f'NEW occurrence of content {hash_to_hex(blob.id)} in directory {hash_to_hex(directory.id)} (path: {prefix / blob.name})') + # self.cursor.execute('''INSERT INTO content_in_dir VALUES (%s,%s,%s)''', + # (blob.id, directory.id, bytes(normalize(prefix / blob.name)))) + self.insert_cache['content_in_dir'].append( + (blob.id, directory.id, bytes(normalize(prefix / blob.name))) + ) + + + def content_add_to_revision( + self, + revision: RevisionEntry, + blob: FileEntry, + prefix: PosixPath + ): + # logging.debug(f'EARLY occurrence of blob {hash_to_hex(blob.id)} in revision {hash_to_hex(revision.id)} (path: {prefix / blob.name})') + # self.cursor.execute('''INSERT INTO content_early_in_rev VALUES (%s,%s,%s)''', + # (blob.id, revision.id, bytes(normalize(prefix / blob.name)))) + self.insert_cache['content_early_in_rev'].append( + (blob.id, revision.id, bytes(normalize(prefix / blob.name))) + ) + + + def content_find_first(self, blobid: str): + logging.info(f'Retrieving first occurrence of content {hash_to_hex(blobid)}') + self.cursor.execute('''SELECT blob, rev, date, path + FROM content_early_in_rev JOIN revision ON revision.id=content_early_in_rev.rev + WHERE content_early_in_rev.blob=%s ORDER BY date, rev, path ASC LIMIT 1''', (blobid,)) + return self.cursor.fetchone() + + + def content_find_all(self, blobid: str): + logging.info(f'Retrieving all occurrences of content {hash_to_hex(blobid)}') + self.cursor.execute('''(SELECT blob, rev, date, path + FROM content_early_in_rev JOIN revision ON revision.id=content_early_in_rev.rev + WHERE content_early_in_rev.blob=%s) + UNION + (SELECT content_in_rev.blob, content_in_rev.rev, revision.date, content_in_rev.path + FROM (SELECT content_in_dir.blob, directory_in_rev.rev, + CASE directory_in_rev.path + WHEN '.' 
THEN content_in_dir.path + ELSE (directory_in_rev.path || '/' || content_in_dir.path)::unix_path + END AS path + FROM content_in_dir + JOIN directory_in_rev ON content_in_dir.dir=directory_in_rev.dir + WHERE content_in_dir.blob=%s) AS content_in_rev + JOIN revision ON revision.id=content_in_rev.rev) + ORDER BY date, rev, path''', (blobid, blobid)) + # POSTGRESQL EXPLAIN + yield from self.cursor.fetchall() + + + # def content_get_early_date(self, blob: FileEntry) -> datetime: + # logging.debug(f'Getting content {hash_to_hex(blob.id)} early date') + # # First check if the date is being modified by current transection. + # date = self.insert_cache['content'].get(blob.id, None) + # if date is None: + # # If not, check whether it's been query before + # date = self.select_cache['content'].get(blob.id, None) + # if date is None: + # # Otherwise, query the database and cache the value + # self.cursor.execute('''SELECT date FROM content WHERE id=%s''', + # (blob.id,)) + # row = self.cursor.fetchone() + # date = row[0] if row is not None else None + # self.select_cache['content'][blob.id] = date + # return date + + + def content_get_early_dates(self, blobs: List[FileEntry]) -> Dict[bytes, datetime]: + dates = {} + pending = [] + for blob in blobs: + # First check if the date is being modified by current transection. + date = self.insert_cache['content'].get(blob.id, None) + if date is not None: + dates[blob.id] = date + else: + # If not, check whether it's been query before + date = self.select_cache['content'].get(blob.id, None) + if date is not None: + dates[blob.id] = date + else: + pending.append(blob.id) + if pending: + # Otherwise, query the database and cache the values + values = ', '.join(itertools.repeat('%s', len(pending))) + self.cursor.execute(f'''SELECT id, date FROM content WHERE id IN ({values})''', + tuple(pending)) + for row in self.cursor.fetchall(): + dates[row[0]] = row[1] + self.select_cache['content'][row[0]] = row[1] + return dates + + + def content_set_early_date(self, blob: FileEntry, date: datetime): + # logging.debug(f'EARLY occurrence of blob {hash_to_hex(blob.id)} (timestamp: {date})') + # self.cursor.execute('''INSERT INTO content VALUES (%s,%s) + # ON CONFLICT (id) DO UPDATE SET date=%s''', + # (blob.id, date, date)) + self.insert_cache['content'][blob.id] = date + + + def directory_add_to_revision( + self, + revision: RevisionEntry, + directory: DirectoryEntry, + path: PosixPath + ): + # logging.debug(f'NEW occurrence of directory {hash_to_hex(directory.id)} on the ISOCHRONE FRONTIER of revision {hash_to_hex(revision.id)} (path: {path})') + # self.cursor.execute('''INSERT INTO directory_in_rev VALUES (%s,%s,%s)''', + # (directory.id, revision.id, bytes(normalize(path)))) + self.insert_cache['directory_in_rev'].append( + (directory.id, revision.id, bytes(normalize(path))) + ) + + + def directory_date_in_isochrone_frontier(self, directory: DirectoryEntry) -> datetime: + # logging.debug(f'Getting directory {hash_to_hex(directory.id)} early date') + # First check if the date is being modified by current transection. 
+ date = self.insert_cache['directory'].get(directory.id, None) + if date is None: + # If not, check whether it's been query before + date = self.select_cache['directory'].get(directory.id, None) + if date is None: + # Otherwise, query the database and cache the value + self.cursor.execute('''SELECT date FROM directory WHERE id=%s''', + (directory.id,)) + row = self.cursor.fetchone() + date = row[0] if row is not None else None + self.select_cache['directory'][directory.id] = date + return date + + + def directory_get_early_dates(self, dirs: List[DirectoryEntry]) -> Dict[bytes, datetime]: + dates = {} + pending = [] + for dir in dirs: + # First check if the date is being modified by current transection. + date = self.insert_cache['directory'].get(dir.id, None) + if date is not None: + dates[dir.id] = date + else: + # If not, check whether it's been query before + date = self.select_cache['directory'].get(dir.id, None) + if date is not None: + dates[dir.id] = date + else: + pending.append(dir.id) + if pending: + # Otherwise, query the database and cache the values + values = ', '.join(itertools.repeat('%s', len(pending))) + self.cursor.execute(f'''SELECT id, date FROM directory WHERE id IN ({values})''', + tuple(pending)) + for row in self.cursor.fetchall(): + dates[row[0]] = row[1] + self.select_cache['directory'][row[0]] = row[1] + return dates + + + def directory_add_to_isochrone_frontier(self, directory: DirectoryEntry,date: datetime): + # logging.debug(f'EARLY occurrence of directory {hash_to_hex(directory.id)} on the ISOCHRONE FRONTIER (timestamp: {date})') + # self.cursor.execute('''INSERT INTO directory VALUES (%s,%s) + # ON CONFLICT (id) DO UPDATE SET date=%s''', + # (directory.id, date, date)) + self.insert_cache['directory'][directory.id] = date + + + def insert_all(self): + # Performe insertions with cached information + psycopg2.extras.execute_values( + self.cursor, + '''INSERT INTO content(id, date) VALUES %s + ON CONFLICT (id) DO UPDATE SET date=excluded.date''', # TODO: keep earliest date on conflict + self.insert_cache['content'].items() + ) + + psycopg2.extras.execute_values( + self.cursor, + '''INSERT INTO content_early_in_rev VALUES %s + ON CONFLICT DO NOTHING''', + self.insert_cache['content_early_in_rev'] + ) + + psycopg2.extras.execute_values( + self.cursor, + '''INSERT INTO content_in_dir VALUES %s + ON CONFLICT DO NOTHING''', + self.insert_cache['content_in_dir'] + ) + + psycopg2.extras.execute_values( + self.cursor, + '''INSERT INTO directory(id, date) VALUES %s + ON CONFLICT (id) DO UPDATE SET date=excluded.date''', # TODO: keep earliest date on conflict + self.insert_cache['directory'].items() + ) + + psycopg2.extras.execute_values( + self.cursor, + '''INSERT INTO directory_in_rev VALUES %s + ON CONFLICT DO NOTHING''', + self.insert_cache['directory_in_rev'] + ) + + psycopg2.extras.execute_values( + self.cursor, + '''INSERT INTO revision(id, date) VALUES %s + ON CONFLICT (id) DO UPDATE SET date=excluded.date''', # TODO: keep earliest date on conflict + self.insert_cache['revision'].items() + ) + + + def origin_get_id(self, origin: OriginEntry) -> int: + if origin.id is None: + # Check if current origin is already known and retrieve its internal id. + self.cursor.execute('''SELECT id FROM origin WHERE url=%s''', (origin.url,)) + row = self.cursor.fetchone() + + if row is None: + # If the origin is seen for the first time, current revision is + # the prefered one. 
+ self.cursor.execute('''INSERT INTO origin (url) VALUES (%s) RETURNING id''', + (origin.url,)) + return self.cursor.fetchone()[0] + else: + return row[0] + else: + return origin.id -def content_add_to_rev( - cursor: psycopg2.extensions.cursor, - cache: dict, - revision: RevisionEntry, - blob: FileEntry, - prefix: PosixPath -): - logging.debug(f'EARLY occurrence of blob {hash_to_hex(blob.id)} in revision {hash_to_hex(revision.id)} (path: {prefix / blob.name})') - # cursor.execute('INSERT INTO content_early_in_rev VALUES (%s,%s,%s)', - # (blob.id, revision.id, bytes(normalize(prefix / blob.name)))) - cache['content_early_in_rev'].append( - (blob.id, revision.id, bytes(normalize(prefix / blob.name))) - ) + def revision_add(self, revision: RevisionEntry): + # Add current revision to the compact DB + self.insert_cache['revision'][revision.id] = revision.date -def content_find_first( - cursor: psycopg2.extensions.cursor, - blobid: str -): - logging.info(f'Retrieving first occurrence of content {hash_to_hex(blobid)}') - cursor.execute('''SELECT blob, rev, date, path - FROM content_early_in_rev JOIN revision ON revision.id=content_early_in_rev.rev - WHERE content_early_in_rev.blob=%s ORDER BY date, rev, path ASC LIMIT 1''', (blobid,)) - return cursor.fetchone() + def revision_add_before_revision(self, relative: RevisionEntry, revision: RevisionEntry): + self.cursor.execute('''INSERT INTO revision_before_rev VALUES (%s,%s)''', + (revision.id, relative.id)) -def content_find_all( - cursor: psycopg2.extensions.cursor, - blobid: str -): - logging.info(f'Retrieving all occurrences of content {hash_to_hex(blobid)}') - cursor.execute('''(SELECT blob, rev, date, path - FROM content_early_in_rev JOIN revision ON revision.id=content_early_in_rev.rev - WHERE content_early_in_rev.blob=%s) - UNION - (SELECT content_in_rev.blob, content_in_rev.rev, revision.date, content_in_rev.path - FROM (SELECT content_in_dir.blob, directory_in_rev.rev, - CASE directory_in_rev.path - WHEN '.' 
THEN content_in_dir.path - ELSE (directory_in_rev.path || '/' || content_in_dir.path)::unix_path - END AS path - FROM content_in_dir - JOIN directory_in_rev ON content_in_dir.dir=directory_in_rev.dir - WHERE content_in_dir.blob=%s) AS content_in_rev - JOIN revision ON revision.id=content_in_rev.rev) - ORDER BY date, rev, path''', (blobid, blobid)) - # POSTGRESQL EXPLAIN - yield from cursor.fetchall() - - -def content_get_early_date( - cursor: psycopg2.extensions.cursor, - cache: dict, - blob: FileEntry -) -> datetime: - logging.debug(f'Getting content {hash_to_hex(blob.id)} early date') - if blob.id in cache['content'].keys(): - return cache['content'][blob.id] - else: - cursor.execute('SELECT date FROM content WHERE id=%s', - (blob.id,)) - row = cursor.fetchone() - return row[0] if row is not None else None - - -def content_set_early_date( - cursor: psycopg2.extensions.cursor, - cache: dict, - blob: FileEntry, - date: datetime -): - logging.debug(f'EARLY occurrence of blob {hash_to_hex(blob.id)} (timestamp: {date})') - # cursor.execute('''INSERT INTO content VALUES (%s,%s) - # ON CONFLICT (id) DO UPDATE SET date=%s''', - # (blob.id, date, date)) - cache['content'][blob.id] = date + def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry): + self.cursor.execute('''INSERT INTO revision_in_org VALUES (%s,%s) + ON CONFLICT DO NOTHING''', + (revision.id, origin.id)) -def directory_add_to_rev( - cursor: psycopg2.extensions.cursor, - cache: dict, - revision: RevisionEntry, - directory: DirectoryEntry, - path: PosixPath -): - logging.debug(f'NEW occurrence of directory {hash_to_hex(directory.id)} on the ISOCHRONE FRONTIER of revision {hash_to_hex(revision.id)} (path: {path})') - # cursor.execute('INSERT INTO directory_in_rev VALUES (%s,%s,%s)', - # (directory.id, revision.id, bytes(normalize(path)))) - cache['directory_in_rev'].append( - (directory.id, revision.id, bytes(normalize(path))) - ) + def revision_get_early_date(self, revision: RevisionEntry) -> datetime: + # logging.debug(f'Getting revision {hash_to_hex(revision.id)} early date') + # First check if the date is being modified by current transection. 
+ date = self.insert_cache['revision'].get(revision.id, None) + if date is None: + # If not, check whether it's been query before + date = self.select_cache['revision'].get(revision.id, None) + if date is None: + # Otherwise, query the database and cache the value + self.cursor.execute('''SELECT date FROM revision WHERE id=%s''', + (revision.id,)) + row = self.cursor.fetchone() + date = row[0] if row is not None else None + self.select_cache['revision'][revision.id] = date + return date -def directory_get_early_date( - cursor: psycopg2.extensions.cursor, - cache: dict, - directory: DirectoryEntry -) -> datetime: - logging.debug(f'Getting directory {hash_to_hex(directory.id)} early date') - if directory.id in cache['directory'].keys(): - return cache['directory'][directory.id] - else: - cursor.execute('SELECT date FROM directory WHERE id=%s', - (directory.id,)) - row = cursor.fetchone() - return row[0] if row is not None else None + def revision_get_prefered_origin(self, revision: RevisionEntry) -> int: + self.cursor.execute('''SELECT COALESCE(org,0) FROM revision WHERE id=%s''', + (revision.id,)) + row = self.cursor.fetchone() + # None means revision is not in database + # 0 means revision has no prefered origin + return row[0] if row is not None and row[0] != 0 else None + + + def revision_in_history(self, revision: RevisionEntry) -> bool: + self.cursor.execute('''SELECT 1 FROM revision_before_rev WHERE prev=%s''', + (revision.id,)) + return self.cursor.fetchone() is not None + + + def revision_set_prefered_origin(self, origin: OriginEntry, revision: RevisionEntry): + self.cursor.execute('''UPDATE revision SET org=%s WHERE id=%s''', + (origin.id, revision.id)) + + + def revision_visited(self, revision: RevisionEntry) -> bool: + self.cursor.execute('''SELECT 1 FROM revision_in_org WHERE rev=%s''', + (revision.id,)) + return self.cursor.fetchone() is not None +################################################################################ +################################################################################ +################################################################################ + def directory_process_content( - cursor: psycopg2.extensions.cursor, - cache: dict, + provenance: ProvenanceInterface, directory: DirectoryEntry, relative: DirectoryEntry, prefix: PosixPath ): stack = [(directory, prefix)] while stack: dir, path = stack.pop() for child in iter(dir): if isinstance(child, FileEntry): # Add content to the relative directory with the computed path. - content_add_to_dir(cursor, cache, relative, child, path) + provenance.content_add_to_directory(relative, child, path) else: # Recursively walk the child directory. 
- # directory_process_content(cursor, child, relative, path / child.name) stack.append((child, path / child.name)) -def directory_set_early_date( - cursor: psycopg2.extensions.cursor, - cache: dict, - directory: DirectoryEntry, - date: datetime -): - logging.debug(f'EARLY occurrence of directory {hash_to_hex(directory.id)} on the ISOCHRONE FRONTIER (timestamp: {date})') - # cursor.execute('''INSERT INTO directory VALUES (%s,%s) - # ON CONFLICT (id) DO UPDATE SET date=%s''', - # (directory.id, date, date)) - cache['directory'][directory.id] = date - - def origin_add( - conn: psycopg2.extensions.connection, - storage: StorageInterface, + provenance: ProvenanceInterface, origin: OriginEntry ): - with conn.cursor() as cursor: - origin.id = origin_get_id(cursor, origin) + origin.id = provenance.origin_get_id(cursor, origin) - for revision in origin.revisions: - logging.info(f'Processing revision {hash_to_hex(revision.id)} from origin {origin.url}') - origin_add_revision(cursor, storage, origin, revision) + for revision in origin.revisions: + # logging.info(f'Processing revision {hash_to_hex(revision.id)} from origin {origin.url}') + origin_add_revision(provenance, origin, revision) - # Commit after each revision - conn.commit() + # Commit after each revision + provenance.commit() # TODO: verify this! def origin_add_revision( - cursor: psycopg2.extensions.cursor, - storage: StorageInterface, + provenance: ProvenanceInterface, origin: OriginEntry, revision: RevisionEntry ): stack = [(None, revision)] while stack: relative, rev = stack.pop() # Check if current revision has no prefered origin and update if necessary. - prefered = revision_get_prefered_org(cursor, rev) - logging.debug(f'Prefered origin for revision {hash_to_hex(rev.id)}: {prefered}') + prefered = provenance.revision_get_prefered_origin(rev) + # logging.debug(f'Prefered origin for revision {hash_to_hex(rev.id)}: {prefered}') if prefered is None: - revision_set_prefered_org(cursor, origin, rev) + provenance.revision_set_prefered_origin(origin, rev) ######################################################################## if relative is None: # This revision is pointed directly by the origin. - visited = revision_visited(cursor, rev) + visited = provenance.revision_visited(rev) logging.debug(f'Revision {hash_to_hex(rev.id)} in origin {origin.id}: {visited}') logging.debug(f'Adding revision {hash_to_hex(rev.id)} to origin {origin.id}') - revision_add_to_org(cursor, origin, rev) + provenance.revision_add_to_origin(origin, rev) if not visited: - # revision_walk_history(cursor, origin, rev.id, rev) stack.append((rev, rev)) else: # This revision is a parent of another one in the history of the # relative revision. for parent in iter(rev): - visited = revision_visited(cursor, parent) + visited = provenance.revision_visited(parent) logging.debug(f'Parent {hash_to_hex(parent.id)} in some origin: {visited}') if not visited: # The parent revision has never been seen before pointing # directly to an origin. - known = revision_in_history(cursor, parent) + known = provenance.revision_in_history(parent) logging.debug(f'Revision {hash_to_hex(parent.id)} before revision: {known}') if known: # The parent revision is already known in some other # revision's history. We should point it directly to # the origin and (eventually) walk its history. 
logging.debug(f'Adding revision {hash_to_hex(parent.id)} directly to origin {origin.id}') - # origin_add_revision(cursor, origin, parent) stack.append((None, parent)) else: # The parent revision was never seen before. We should # walk its history and associate it with the same # relative revision. logging.debug(f'Adding parent revision {hash_to_hex(parent.id)} to revision {hash_to_hex(relative.id)}') - revision_add_before_rev(cursor, relative, parent) - # revision_walk_history(cursor, origin, relative, parent) + provenance.revision_add_before_revision(relative, parent) stack.append((relative, parent)) else: # The parent revision already points to an origin, so its # history was properly processed before. We just need to # make sure it points to the current origin as well. logging.debug(f'Adding parent revision {hash_to_hex(parent.id)} to origin {origin.id}') - revision_add_to_org(cursor, origin, parent) - - -def origin_get_id( - cursor: psycopg2.extensions.cursor, - origin: OriginEntry -) -> int: - if origin.id is None: - # Check if current origin is already known and retrieve its internal id. - cursor.execute('''SELECT id FROM origin WHERE url=%s''', (origin.url,)) - row = cursor.fetchone() - - if row is None: - # If the origin is seen for the first time, current revision is - # the prefered one. - cursor.execute('''INSERT INTO origin (url) VALUES (%s) RETURNING id''', - (origin.url,)) - return cursor.fetchone()[0] - else: - return row[0] - else: - return origin.id + provenance.revision_add_to_origin(origin, parent) def revision_add( - conn: psycopg2.extensions.connection, - storage: StorageInterface, - revision: RevisionEntry -): - try: - with conn.cursor() as cursor: - # Processed content starting from the revision's root directory - directory = Tree(storage, revision.root).root - revision_process_dir(cursor, revision, directory) - - # Add current revision to the compact DB - cursor.execute('INSERT INTO revision VALUES (%s,%s,NULL)', - (revision.id, revision.date)) - - # Commit changes (one transaction per revision) - conn.commit() - return True - - except psycopg2.DatabaseError: - # Database error occurred, rollback all changes - conn.rollback() - # TODO: maybe serialize and auto-merge transations. - # The only conflicts are on: - # - content: we keep the earliest date - # - directory: we keep the earliest date - # - content_in_dir: there should be just duplicated entries. 
- return False - - except Exception as error: - # Unexpected error occurred, rollback all changes and log message - logging.warning(f'Unexpected error: {error}') - conn.rollback() - return False - - -def revision_add_before_rev( - cursor: psycopg2.extensions.cursor, - relative: RevisionEntry, + provenance: ProvenanceInterface, + archive: ArchiveInterface, revision: RevisionEntry ): - cursor.execute('''INSERT INTO revision_before_rev VALUES (%s,%s)''', - (revision.id, relative.id)) - - -def revision_add_to_org( - cursor: psycopg2.extensions.cursor, - origin: OriginEntry, - revision: RevisionEntry -): - cursor.execute('''INSERT INTO revision_in_org VALUES (%s,%s) - ON CONFLICT DO NOTHING''', - (revision.id, origin.id)) - - -def revision_get_prefered_org( - cursor: psycopg2.extensions.cursor, - revision: RevisionEntry -) -> int: - cursor.execute('''SELECT COALESCE(org,0) FROM revision WHERE id=%s''', - (revision.id,)) - row = cursor.fetchone() - # None means revision is not in database - # 0 means revision has no prefered origin - return row[0] if row is not None and row[0] != 0 else None - - -def revision_in_history( - cursor: psycopg2.extensions.cursor, - revision: RevisionEntry -) -> bool: - cursor.execute('''SELECT 1 FROM revision_before_rev WHERE prev=%s''', - (revision.id,)) - return cursor.fetchone() is not None + # Processed content starting from the revision's root directory + directory = Tree(archive, revision.root).root + date = provenance.revision_get_early_date(revision) + if date is None or revision.date < date: + provenance.revision_add(revision) + revision_process_content(provenance, revision, directory) + return provenance.commit() -def revision_process_dir( - cursor: psycopg2.extensions.cursor, +def revision_process_content( + provenance: ProvenanceInterface, revision: RevisionEntry, directory: DirectoryEntry ): - stack = [(directory, directory.name)] - cache = { - "content": dict(), - "content_early_in_rev": list(), - "content_in_dir": list(), - "directory": dict(), - "directory_in_rev": list() - } + stack = [(directory, provenance.directory_date_in_isochrone_frontier(directory), directory.name)] while stack: - dir, path = stack.pop() - - date = directory_get_early_date(cursor, cache, dir) + dir, date, path = stack.pop() if date is None: # The directory has never been seen on the isochrone graph of a # revision. Its children should be checked. - children = [] - for child in iter(dir): - if isinstance(child, FileEntry): - children.append((child, content_get_early_date(cursor, cache, child))) + blobs = [child for child in iter(dir) if isinstance(child, FileEntry)] + dirs = [child for child in iter(dir) if isinstance(child, DirectoryEntry)] + + blobdates = provenance.content_get_early_dates(blobs) + dirdates = provenance.directory_get_early_dates(dirs) + + if blobs + dirs: + dates = list(blobdates.values()) + list(dirdates.values()) + + if len(dates) == len(blobs) + len(dirs) and max(dates) <= revision.date: + # The directory belongs to the isochrone frontier of the + # current revision, and this is the first time it appears + # as such. 
+ provenance.directory_add_to_isochrone_frontier(dir, max(dates)) + provenance.directory_add_to_revision(revision, dir, path) + directory_process_content( + provenance, + directory=dir, + relative=dir, + prefix=PosixPath('.') + ) + else: - children.append((child, directory_get_early_date(cursor, cache, child))) - dates = [child[1] for child in children] - - if dates != [] and None not in dates and max(dates) <= revision.date: - # The directory belongs to the isochrone frontier of the current - # revision, and this is the first time it appears as such. - directory_set_early_date(cursor, cache, dir, max(dates)) - directory_add_to_rev(cursor, cache, revision, dir, path) - directory_process_content(cursor, cache, directory=dir, relative=dir, prefix=PosixPath('.')) - else: - # The directory is not on the isochrone frontier of the current - # revision. Its child nodes should be analyzed. - # revision_process_content(cursor, revision, dir, path) - ################################################################ - for child, date in children: - if isinstance(child, FileEntry): + # The directory is not on the isochrone frontier of the + # current revision. Its child nodes should be analyzed. + ############################################################ + for child in blobs: + date = blobdates.get(child.id, None) if date is None or revision.date < date: - content_set_early_date(cursor, cache, child, revision.date) - content_add_to_rev(cursor, cache, revision, child, path) - else: - # revision_process_dir(cursor, revision, child, path / child.name) - stack.append((child, path / child.name)) - ################################################################ + provenance.content_set_early_date(child, revision.date) + provenance.content_add_to_revision(revision, child, path) + + for child in dirs: + date = dirdates.get(child.id, None) + stack.append((child, date, path / child.name)) + ############################################################ elif revision.date < date: - # The directory has already been seen on the isochrone frontier of a - # revision, but current revision is earlier. Its children should be - # updated. - # revision_process_content(cursor, revision, dir, path) + # The directory has already been seen on the isochrone frontier of + # a revision, but current revision is earlier. Its children should + # be updated. 
+ blobs = [child for child in iter(dir) if isinstance(child, FileEntry)] + dirs = [child for child in iter(dir) if isinstance(child, DirectoryEntry)] + + blobdates = provenance.content_get_early_dates(blobs) + dirdates = provenance.directory_get_early_dates(dirs) + #################################################################### - for child in iter(dir): - if isinstance(child, FileEntry): - date = content_get_early_date(cursor, cache, child) - if date is None or revision.date < date: - content_set_early_date(cursor, cache, child, revision.date) - content_add_to_rev(cursor, cache, revision, child, path) - else: - # revision_process_dir(cursor, revision, child, path / child.name) - stack.append((child, path / child.name)) + for child in blobs: + date = blobdates.get(child.id, None) + if date is None or revision.date < date: + provenance.content_set_early_date(child, revision.date) + provenance.content_add_to_revision(revision, child, path) + + for child in dirs: + date = dirdates.get(child.id, None) + stack.append((child, date, path / child.name)) #################################################################### - directory_set_early_date(cursor, cache, dir, revision.date) + + provenance.directory_add_to_isochrone_frontier(dir, revision.date) else: - # The directory has already been seen on the isochrone frontier of an - # earlier revision. Just add it to the current revision. - directory_add_to_rev(cursor, cache, revision, dir, path) - - # Performe insertions with cached information - psycopg2.extras.execute_values( - cursor, - '''INSERT INTO content(id, date) VALUES %s - ON CONFLICT (id) DO UPDATE SET date=excluded.date''', - cache['content'].items() - ) - - psycopg2.extras.execute_values( - cursor, - '''INSERT INTO content_early_in_rev VALUES %s''', - cache['content_early_in_rev'] - ) - - psycopg2.extras.execute_values( - cursor, - '''INSERT INTO content_in_dir VALUES %s''', - cache['content_in_dir'] - ) - - psycopg2.extras.execute_values( - cursor, - '''INSERT INTO directory(id, date) VALUES %s - ON CONFLICT (id) DO UPDATE SET date=excluded.date''', - cache['directory'].items() - ) - - psycopg2.extras.execute_values( - cursor, - '''INSERT INTO directory_in_rev VALUES %s''', - cache['directory_in_rev'] - ) - - -def revision_set_prefered_org( - cursor: psycopg2.extensions.cursor, - origin: OriginEntry, - revision: RevisionEntry -): - cursor.execute('''UPDATE revision SET org=%s WHERE id=%s''', - (origin.id, revision.id)) + # The directory has already been seen on the isochrone frontier of + # an earlier revision. Just add it to the current revision. 
+ provenance.directory_add_to_revision(revision, dir, path) -def revision_visited( - cursor: psycopg2.extensions.cursor, - revision: RevisionEntry -) -> bool: - cursor.execute('''SELECT 1 FROM revision_in_org WHERE rev=%s''', - (revision.id,)) - return cursor.fetchone() is not None +def get_provenance(conninfo: dict) -> ProvenanceInterface: + # TODO: improve this methos to allow backend selection + conn = connect(conninfo) + return ProvenanceInterface(conn) diff --git a/swh/provenance/revision.py b/swh/provenance/revision.py index b33763a..0344a93 100644 --- a/swh/provenance/revision.py +++ b/swh/provenance/revision.py @@ -1,169 +1,168 @@ import logging import threading +from .archive import ArchiveInterface from .db_utils import connect from datetime import datetime from swh.model.hashutil import hash_to_bytes, hash_to_hex -from swh.storage.interface import StorageInterface class RevisionEntry: def __init__( self, - storage: StorageInterface, + archive: ArchiveInterface, id: bytes, date: datetime=None, root: bytes=None, parents: list=None ): + self.archive = archive self.id = id self.date = date self.parents = parents self.root = root - self.storage = storage def __iter__(self): if self.parents is None: self.parents = [] - for parent in self.storage.revision_get([self.id]): + for parent in self.archive.revision_get([self.id]): if parent is not None: self.parents.append( RevisionEntry( - self.storage, + self.archive, parent.id, - parents=[RevisionEntry(self.storage, id) for id in parent.parents] + parents=[RevisionEntry(self.archive, id) for id in parent.parents] ) ) return iter(self.parents) ################################################################################ ################################################################################ class RevisionIterator: """Iterator interface.""" def __iter__(self): pass def __next__(self): pass class FileRevisionIterator(RevisionIterator): """Iterator over revisions present in the given CSV file.""" - def __init__(self, filename: str, storage: StorageInterface, limit: int=None): + def __init__(self, filename: str, archive: ArchiveInterface, limit: int=None): self.file = open(filename) self.idx = 0 self.limit = limit self.mutex = threading.Lock() - self.storage = storage + self.archive = archive def next(self): self.mutex.acquire() line = self.file.readline().strip() if line and (self.limit is None or self.idx < self.limit): self.idx = self.idx + 1 id, date, root = line.strip().split(',') self.mutex.release() return RevisionEntry( - self.storage, + self.archive, hash_to_bytes(id), - date=datetime.strptime(date, '%Y-%m-%d %H:%M:%S%z'), + date=datetime.fromisoformat(date), root=hash_to_bytes(root) ) else: self.mutex.release() return None # class ArchiveRevisionIterator(RevisionIterator): # """Iterator over revisions present in the given database.""" # # def __init__(self, conn, limit=None, chunksize=100): # self.cur = conn.cursor() # self.chunksize = chunksize # self.records = [] # if limit is None: # self.cur.execute('''SELECT id, date, committer_date, directory # FROM revision''') # else: # self.cur.execute('''SELECT id, date, committer_date, directory # FROM revision # LIMIT %s''', (limit,)) # for row in self.cur.fetchmany(self.chunksize): # record = self.make_record(row) # if record is not None: # self.records.append(record) # self.mutex = threading.Lock() # # def __del__(self): # self.cur.close() # # def next(self): # self.mutex.acquire() # if not self.records: # self.records.clear() # for row in self.cur.fetchmany(self.chunksize): 
# record = self.make_record(row) # if record is not None: # self.records.append(record) # # if self.records: # revision, *self.records = self.records # self.mutex.release() # return revision # else: # self.mutex.release() # return None # # def make_record(self, row): # # Only revision with author or commiter date are considered # if row[1] is not None: # # If the revision has author date, it takes precedence # return RevisionEntry(row[0], row[1], row[3]) # elif row[2] is not None: # # If not, we use the commiter date # return RevisionEntry(row[0], row[2], row[3]) ################################################################################ ################################################################################ class RevisionWorker(threading.Thread): def __init__( self, id: int, conninfo: dict, - storage: StorageInterface, + archive: ArchiveInterface, revisions: RevisionIterator ): + from .provenance import get_provenance + super().__init__() + self.archive = archive self.id = id - self.conninfo = conninfo + self.provenance = get_provenance(conninfo) self.revisions = revisions - self.storage = storage def run(self): from .provenance import revision_add - conn = connect(self.conninfo) while True: revision = self.revisions.next() if revision is None: break processed = False while not processed: logging.info(f'Thread {self.id} - Processing revision {hash_to_hex(revision.id)} (timestamp: {revision.date})') - processed = revision_add(conn, self.storage, revision) + processed = revision_add(self.provenance, self.archive, revision) if not processed: logging.warning(f'Thread {self.id} - Failed to process revision {hash_to_hex(revision.id)} (timestamp: {revision.date})') - - conn.close()
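
The sketches below exercise the interfaces introduced by this patch; they are illustrations, not part of it. revisions.py writes one row per revision, in the form revision sha1_git, author date, root directory sha1_git, into random.csv, ordered.csv and reverse.csv, and FileRevisionIterator now parses the date column with datetime.fromisoformat() instead of strptime(). A minimal sketch of parsing one such row, assuming the offset-aware format produced by rev_to_csv(); the hashes come from the revision list in revisions.py and the date is made up for illustration.

from datetime import datetime

from swh.model.hashutil import hash_to_bytes

# One row in the format written by rev_to_csv(): revision id, author date,
# root directory id. The date below is illustrative only.
row = ("02f95c0a1868cbef82ff73fc1b903183a579c7de,"
       "2014-01-01 12:00:00+00:00,"
       "da061f1caf293a5da00bff6a45abcf4d7ae54c50")

id, date, root = row.strip().split(',')
revision_id = hash_to_bytes(id)
revision_date = datetime.fromisoformat(date)  # accepts the "+00:00" offset
root_id = hash_to_bytes(root)
print(revision_id.hex(), revision_date, root_id.hex())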
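
The new get_archive() factory in swh/provenance/archive.py hides whether the archive is reached through swh.storage (cls="api") or through a direct read-only PostgreSQL connection (cls="ps"). A sketch of building both variants with the connection parameters that appear in the updated DEFAULT_CONFIG; the directory id at the end is a placeholder to be replaced with a real sha1_git.

from swh.model.hashutil import hash_to_bytes
from swh.provenance.archive import get_archive

# Storage-API backend; the URL is the example endpoint from the configuration.
api_archive = get_archive(
    cls="api",
    storage={"cls": "remote",
             "url": "http://uffizi.internal.softwareheritage.org:5002"},
)

# Direct-database backend, as selected by the new default configuration.
db_archive = get_archive(
    cls="ps",
    db={"host": "db.internal.softwareheritage.org",
        "database": "softwareheritage",
        "user": "guest"},
)

# Both objects expose the same ArchiveInterface methods.
directory_id = hash_to_bytes("0000000000000000000000000000000000000000")  # placeholder
for entry in db_archive.directory_ls(directory_id):
    print(entry["type"], entry["name"])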
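
Putting the pieces together, iter-revisions now reads revisions from a CSV file and feeds them to revision_add() through a ProvenanceInterface and an ArchiveInterface. A single-threaded sketch of the same loop, assuming a local provenance database named test3 as in the updated defaults and an ordered.csv produced by revisions.py; connection parameters are illustrative.

from swh.model.hashutil import hash_to_hex
from swh.provenance.archive import get_archive
from swh.provenance.provenance import get_provenance, revision_add
from swh.provenance.revision import FileRevisionIterator

# Adjust both connection dictionaries to your setup.
provenance = get_provenance({"host": "localhost", "database": "test3",
                             "user": "postgres", "password": "postgres"})
archive = get_archive(cls="ps",
                      db={"host": "db.internal.softwareheritage.org",
                          "database": "softwareheritage", "user": "guest"})

revisions = FileRevisionIterator("ordered.csv", archive, limit=100)
while True:
    revision = revisions.next()
    if revision is None:
        break
    # revision_add() returns False when the per-revision transaction had to be
    # rolled back; RevisionWorker retries the same revision in that case.
    if not revision_add(provenance, archive, revision):
        print(f"failed: {hash_to_hex(revision.id)}")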
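
--profile is no longer a flag: it now takes a file name, and iter-revisions runs the processing loop under cProfile.runctx(), dumping the statistics into that file instead of printing them at exit. The dump can be inspected afterwards with the standard pstats module; the file name below stands for whatever was passed to --profile.

import pstats

stats = pstats.Stats("provenance.prof")  # file passed to --profile
stats.sort_stats("cumulative").print_stats(20)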
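
content_get_early_dates() and directory_get_early_dates() replace the one-query-per-object lookups: each first consults the per-transaction write cache, then the read cache, and only then issues a single IN (...) query for the ids still unresolved. A standalone sketch of that pattern against the content table of the provenance schema, with the caches passed in explicitly.

import itertools
from datetime import datetime
from typing import Dict, Iterable


def get_early_dates(cursor, insert_cache: Dict[bytes, datetime],
                    select_cache: Dict[bytes, datetime],
                    ids: Iterable[bytes]) -> Dict[bytes, datetime]:
    """Resolve known dates for ids, querying the database only for cache misses."""
    dates = {}
    pending = []
    for id in ids:
        date = insert_cache.get(id, select_cache.get(id))
        if date is not None:
            dates[id] = date
        else:
            pending.append(id)
    if pending:
        placeholders = ', '.join(itertools.repeat('%s', len(pending)))
        cursor.execute(f'SELECT id, date FROM content WHERE id IN ({placeholders})',
                       tuple(pending))
        for id, date in cursor.fetchall():
            dates[id] = date
            select_cache[id] = date
    return dates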
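
All per-row INSERTs are now deferred to ProvenanceInterface.insert_all(), which flushes each cache with psycopg2.extras.execute_values() inside the transaction committed by commit(). The content and directory statements still carry a "keep earliest date on conflict" TODO; one possible way to express that directly in the statement, sketched against the content table (this is a suggestion, not what the patch currently does):

import psycopg2.extras


def flush_content(cursor, content_dates):
    """Flush a {id: date} write cache for the content table in one statement.

    LEAST(...) keeps the earlier of the stored and the incoming date, which is
    one way to address the "keep earliest date on conflict" TODO; the patch as
    written keeps the incoming date (excluded.date).
    """
    psycopg2.extras.execute_values(
        cursor,
        '''INSERT INTO content(id, date) VALUES %s
           ON CONFLICT (id) DO UPDATE
           SET date=LEAST(content.date, excluded.date)''',
        content_dates.items(),
    )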
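
revision_process_content() now walks a stack of (directory, known date, path) tuples and splits each directory's children into blobs and sub-directories before applying the isochrone-frontier test. The test itself is unchanged from revision_process_dir() and can be stated on its own: an undated directory is placed on the frontier of the revision exactly when every child already has a known earliest date and none of those dates is later than the revision date.

from datetime import datetime
from typing import Optional, Sequence


def on_isochrone_frontier(child_dates: Sequence[Optional[datetime]],
                          revision_date: datetime) -> bool:
    """Frontier test applied to a directory not yet seen in any revision.

    child_dates holds the earliest known date of every child (None if a child
    has no known date yet); an empty directory is never put on the frontier.
    """
    return (len(child_dates) > 0
            and all(date is not None for date in child_dates)
            and max(child_dates) <= revision_date)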