diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py
index 1f8a978..43e4ad3 100644
--- a/swh/provenance/cli.py
+++ b/swh/provenance/cli.py
@@ -1,154 +1,154 @@
 # Copyright (C) 2020  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 # WARNING: do not import unnecessary things here to keep cli startup time under
 # control
 import os

 from typing import Any, Dict, Optional

 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

 import click
 import yaml

 from swh.core import config
 from swh.core.cli import CONTEXT_SETTINGS
 from swh.core.cli import swh as swh_cli_group
 from swh.core.db import db_utils    # TODO: remove this in favour of local db_utils module
 from swh.model.hashutil import (hash_to_bytes, hash_to_hex)
 from swh.storage import get_storage

 # All generic config code should reside in swh.core.config
 CONFIG_ENVVAR = "SWH_CONFIG_FILE"
 DEFAULT_CONFIG_PATH = os.path.join(click.get_app_dir("swh"), "global.yml")
 DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, DEFAULT_CONFIG_PATH)

 DEFAULT_CONFIG: Dict[str, Any] = {
     "storage": {
         "cls": "remote",
         "url": "http://uffizi.internal.softwareheritage.org:5002"
     },
     "db": "postgresql://postgres:postgres@localhost/provenance"  # TODO: fix this!
 }

 CONFIG_FILE_HELP = f"""Configuration file:

 \b
 The CLI option or the environment variable will fail if invalid.
 CLI option is checked first.
 Then, environment variable {CONFIG_ENVVAR} is checked.
 Then, if cannot load the default path, a set of default values are used.
 Default config path is {DEFAULT_CONFIG_PATH}.
 Default config values are:

 \b
 {yaml.dump(DEFAULT_CONFIG)}"""
 PROVENANCE_HELP = f"""Software Heritage Scanner tools.

 {CONFIG_FILE_HELP}"""

 @swh_cli_group.group(
     name="provenance", context_settings=CONTEXT_SETTINGS, help=PROVENANCE_HELP,
 )
 @click.option(
     "-C",
     "--config-file",
     default=None,
     type=click.Path(exists=False, dir_okay=False, path_type=str),
     help="""YAML configuration file""",
 )
 @click.pass_context
 def cli(ctx, config_file: Optional[str]):
     from .db_utils import adapt_conn

     if config_file is None and config.config_exists(DEFAULT_PATH):
         config_file = DEFAULT_PATH

     if config_file is None:
         conf = DEFAULT_CONFIG
     else:
         # read_raw_config do not fail on ENOENT
         if not config.config_exists(config_file):
             raise FileNotFoundError(config_file)
         conf = config.read_raw_config(config.config_basepath(config_file))
         conf = config.merge_configs(DEFAULT_CONFIG, conf)

     ctx.ensure_object(dict)
     ctx.obj["config"] = conf

     conn = db_utils.connect_to_conninfo(conf["db"])
     adapt_conn(conn)
     ctx.obj["conn"] = conn

 @cli.command(name="create")
 @click.option("--name", default='provenance')
 @click.pass_context
 def create(ctx, name):
     """Create new provenance database."""
     from .provenance import create_database
     from .db_utils import adapt_conn

     # Close default connection as it won't be used
     ctx.obj["conn"].close()

     # Connect to server without selecting a database
     conninfo = os.path.dirname(ctx.obj["config"]["db"])
     conn = db_utils.connect_to_conninfo(conninfo)
     adapt_conn(conn)
     create_database(conn, conninfo, name)

 @cli.command(name="iter-revisions")
 @click.argument("filename")
 @click.option('-l', '--limit', type=int)
 @click.option('-t', '--threads', type=int, default=1)
 @click.pass_context
 def iter_revisions(ctx, filename, limit, threads):
     """Iterate over provided list of revisions and add them to the provenance database."""
-    from .provenance import FileRevisionIterator
-    from .provenance import RevisionWorker
+    from .revision import FileRevisionIterator
+    from .revision import RevisionWorker

     conninfo = ctx.obj["config"]["db"]
     revisions = FileRevisionIterator(filename, limit=limit)
     storage = get_storage(**ctx.obj["config"]["storage"])
     workers = []

     for id in range(threads):
         worker = RevisionWorker(id, conninfo, storage, revisions)
         worker.start()
         workers.append(worker)

     for worker in workers:
         worker.join()

 @cli.command(name="find-first")
 @click.argument("swhid")
 @click.pass_context
 def find_first(ctx, swhid):
     """Find first occurrence of the requested blob."""
     from .provenance import content_find_first

     conn = ctx.obj["conn"]
     cursor = conn.cursor();
     row = content_find_first(cursor, hash_to_bytes(swhid))
     print(f'{hash_to_hex(row[0])}, {hash_to_hex(row[1])}, {row[2]}, {os.fsdecode(row[3])}')

 @cli.command(name="find-all")
 @click.argument("swhid")
 @click.pass_context
 def find_all(ctx, swhid):
     """Find all occurrences of the requested blob."""
     from .provenance import content_find_all

     conn = ctx.obj["conn"]
     cursor = conn.cursor();
     for row in content_find_all(cursor, hash_to_bytes(swhid)):
         print(f'{hash_to_hex(row[0])}, {hash_to_hex(row[1])}, {row[2]}, {os.fsdecode(row[3])}')
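The find-first and find-all commands above are thin wrappers around content_find_first and content_find_all from swh.provenance.provenance. Below is a minimal sketch of the same lookup done directly from Python; it assumes an already populated provenance database, and the connection string and blob hash are placeholders rather than values taken from this change.

import os

from swh.core.db import db_utils
from swh.model.hashutil import hash_to_bytes, hash_to_hex

from swh.provenance.db_utils import adapt_conn
from swh.provenance.provenance import content_find_first

# Hypothetical connection string, for illustration only.
conn = db_utils.connect_to_conninfo("postgresql://postgres:postgres@localhost/provenance")
adapt_conn(conn)

cursor = conn.cursor()
row = content_find_first(cursor, hash_to_bytes("0" * 40))  # placeholder blob hash
if row is not None:
    # blob id, earliest revision id, date, path of the blob within that revision
    print(hash_to_hex(row[0]), hash_to_hex(row[1]), row[2], os.fsdecode(row[3]))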
diff --git a/swh/provenance/model.py b/swh/provenance/model.py
index d7051b8..e382924 100644
--- a/swh/provenance/model.py
+++ b/swh/provenance/model.py
@@ -1,61 +1,45 @@
-import operator
 import os
-# import psycopg2
-import swh.storage.interface

 from pathlib import PosixPath
-
-# from swh.storage.postgresql.db import Db
-
-CONTENT = "file"
-DIRECTORY = "dir"
-
-# OTYPE_IDX = 1
-# PATH_IDX = 3
-# SWHID_IDX = 2
+from swh.storage.interface import StorageInterface

 class Tree:
-    def __init__(self, storage: swh.storage.interface.StorageInterface, swhid: str):
-        self.root = DirectoryEntry(storage, swhid, PosixPath('.'))
+    def __init__(self, storage: StorageInterface, id: str):
+        self.root = DirectoryEntry(storage, id, PosixPath('.'))

 class TreeEntry:
-    def __init__(self, swhid: str, name: PosixPath):
-        self.swhid = swhid
+    def __init__(self, id: str, name: PosixPath):
+        self.id = id
         self.name = name

 class DirectoryEntry(TreeEntry):
-    def __init__(
-        self,
-        storage: swh.storage.interface.StorageInterface,
-        swhid: str,
-        name: PosixPath
-    ):
-        super().__init__(swhid, name)
+    def __init__(self, storage: StorageInterface, id: str, name: PosixPath):
+        super().__init__(id, name)
         self.storage = storage
         self.children = None

     def __iter__(self):
         if self.children is None:
             self.children = []
-            for child in self.storage.directory_ls(self.swhid):
-                if child['type'] == CONTENT:
-                    self.children.append(FileEntry(
+            for child in self.storage.directory_ls(self.id):
+                if child['type'] == 'dir':
+                    self.children.append(DirectoryEntry(
+                        self.storage,
                         child['target'],
                         PosixPath(os.fsdecode(child['name']))
                     ))
-                elif child['type'] == DIRECTORY:
-                    self.children.append(DirectoryEntry(
-                        self.storage,
+                elif child['type'] == 'file':
+                    self.children.append(FileEntry(
                         child['target'],
                         PosixPath(os.fsdecode(child['name']))
                     ))
         return iter(self.children)

 class FileEntry(TreeEntry):
     pass
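To illustrate the reworked model, a minimal sketch of a depth-first walk over a Tree: iterating a DirectoryEntry lazily lists its children through storage.directory_ls() and yields DirectoryEntry and FileEntry nodes. The storage configuration and the root directory id below are placeholders.

from pathlib import PosixPath

from swh.model.hashutil import hash_to_bytes, hash_to_hex
from swh.storage import get_storage

from swh.provenance.model import DirectoryEntry, FileEntry, Tree

# Hypothetical storage instance and root directory id, for illustration only.
storage = get_storage(cls="remote", url="http://localhost:5002")
root = Tree(storage, hash_to_bytes("0" * 40)).root

def walk(directory: DirectoryEntry, prefix: PosixPath = PosixPath(".")):
    # Print the path and id of every blob reachable from the root directory.
    for child in directory:
        if isinstance(child, FileEntry):
            print(prefix / child.name, hash_to_hex(child.id))
        else:
            walk(child, prefix / child.name)

walk(root)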
diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py
index cdc1f37..da53635 100644
--- a/swh/provenance/provenance.py
+++ b/swh/provenance/provenance.py
@@ -1,395 +1,361 @@
 import logging
 import os
 import psycopg2
-import threading

 from .db_utils import (
     adapt_conn,
     execute_sql
 )
 from .model import (
     DirectoryEntry,
     FileEntry,
-    TreeEntry,
     Tree
 )
-from .revision import (
-    RevisionEntry,
-    RevisionIterator,
-    ArchiveRevisionIterator,
-    FileRevisionIterator
-)
+from .revision import RevisionEntry

 from datetime import datetime
 from pathlib import PosixPath

 from swh.core.db import db_utils    # TODO: remove this in favour of local db_utils module
 from swh.model.hashutil import hash_to_hex
 from swh.storage.interface import StorageInterface

 def create_database(
     conn: psycopg2.extensions.connection,
     conninfo: str,
     name: str
 ):
     conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

     # Create new database dropping previous one if exists
     cursor = conn.cursor();
     cursor.execute(f'''DROP DATABASE IF EXISTS {name}''')
     cursor.execute(f'''CREATE DATABASE {name};''');
     conn.close()

     # Reconnect to server selecting newly created database to add tables
     conn = db_utils.connect_to_conninfo(os.path.join(conninfo, name))
     adapt_conn(conn)
-    dir = os.path.dirname(os.path.realpath(__file__))
-    execute_sql(conn, os.path.join(dir, 'db/provenance.sql'))
+    sqldir = os.path.dirname(os.path.realpath(__file__))
+    execute_sql(conn, os.path.join(sqldir, 'db/provenance.sql'))

 def revision_add(
-    cursor: psycopg2.extensions.cursor,
+    conn: psycopg2.extensions.connection,
     storage: StorageInterface,
-    revision: RevisionEntry,
-    id : int
+    revision: RevisionEntry
 ):
-    logging.info(f'Thread {id} - Processing revision {hash_to_hex(revision.swhid)} (timestamp: {revision.timestamp})')
-    # Processed content starting from the revision's root directory
-    directory = Tree(storage, revision.directory).root
-    revision_process_directory(cursor, revision, directory, directory.name)
-    # Add current revision to the compact DB
-    cursor.execute('INSERT INTO revision VALUES (%s,%s, NULL)', (revision.swhid, revision.timestamp))
+    with conn.cursor() as cursor:
+        try:
+            # Process content starting from the revision's root directory
+            directory = Tree(storage, revision.root).root
+            revision_process_dir(cursor, revision, directory)
+
+            # Add current revision to the compact DB
+            cursor.execute('INSERT INTO revision VALUES (%s,%s,NULL)',
+                           (revision.id, revision.date))
+
+            # Commit changes (one transaction per revision)
+            conn.commit()
+            return True
+
+        except psycopg2.DatabaseError:
+            # Database error occurred, rollback all changes
+            conn.rollback()
+            # TODO: maybe serialize and auto-merge transactions.
+            # The only conflicts are on:
+            #   - content: we keep the earliest date
+            #   - directory: we keep the earliest date
+            #   - content_in_dir: there should be just duplicated entries.
+            return False
+
+        except Exception as error:
+            # Unexpected error occurred, rollback all changes and log message
+            logging.warning(f'Unexpected error: {error}')
+            conn.rollback()
+            return False

 def content_find_first(
     cursor: psycopg2.extensions.cursor,
-    swhid: str
+    blobid: str
 ):
-    logging.info(f'Retrieving first occurrence of content {hash_to_hex(swhid)}')
+    logging.info(f'Retrieving first occurrence of content {hash_to_hex(blobid)}')
     cursor.execute('''SELECT blob, rev, date, path FROM content_early_in_rev
                       JOIN revision ON revision.id=content_early_in_rev.rev
-                      WHERE content_early_in_rev.blob=%s ORDER BY date, rev, path ASC LIMIT 1''', (swhid,))
+                      WHERE content_early_in_rev.blob=%s ORDER BY date, rev, path ASC LIMIT 1''', (blobid,))
     return cursor.fetchone()

 def content_find_all(
     cursor: psycopg2.extensions.cursor,
-    swhid: str
+    blobid: str
 ):
-    logging.info(f'Retrieving all occurrences of content {hash_to_hex(swhid)}')
+    logging.info(f'Retrieving all occurrences of content {hash_to_hex(blobid)}')
     cursor.execute('''(SELECT blob, rev, date, path FROM content_early_in_rev
                        JOIN revision ON revision.id=content_early_in_rev.rev
                        WHERE content_early_in_rev.blob=%s)
                       UNION
                       (SELECT content_in_rev.blob, content_in_rev.rev, revision.date, content_in_rev.path
                        FROM (SELECT content_in_dir.blob, directory_in_rev.rev,
                                     CASE directory_in_rev.path
                                         WHEN '.' THEN content_in_dir.path
                                         ELSE (directory_in_rev.path || '/' || content_in_dir.path)::unix_path
                                     END AS path
                              FROM content_in_dir
                              JOIN directory_in_rev ON content_in_dir.dir=directory_in_rev.dir
                              WHERE content_in_dir.blob=%s) AS content_in_rev
                        JOIN revision ON revision.id=content_in_rev.rev)
-                      ORDER BY date, rev, path''', (swhid, swhid))
+                      ORDER BY date, rev, path''', (blobid, blobid))
     # POSTGRESQL EXPLAIN
     yield from cursor.fetchall()

 ################################################################################
 ################################################################################
 ################################################################################

 def normalize(path: PosixPath) -> PosixPath:
     spath = str(path)
     if spath.startswith('./'):
         return PosixPath(spath[2:])
     return path

-def content_get_early_timestamp(
+def content_get_early_date(
     cursor: psycopg2.extensions.cursor,
     cache: dict,
     blob: FileEntry
-):
-    logging.debug(f'Getting content {hash_to_hex(blob.swhid)} early timestamp')
-    if blob.swhid in cache['content'].keys():
-        return cache['content'][blob.swhid]
+) -> datetime:
+    logging.debug(f'Getting content {hash_to_hex(blob.id)} early date')
+    if blob.id in cache['content'].keys():
+        return cache['content'][blob.id]
     else:
         cursor.execute('SELECT date FROM content WHERE id=%s',
-                       (blob.swhid,))
+                       (blob.id,))
         row = cursor.fetchone()
         return row[0] if row is not None else None

-def content_set_early_timestamp(
+def content_set_early_date(
     cursor: psycopg2.extensions.cursor,
     cache: dict,
     blob: FileEntry,
-    timestamp: datetime
+    date: datetime
 ):
-    logging.debug(f'EARLY occurrence of blob {hash_to_hex(blob.swhid)} (timestamp: {timestamp})')
+    logging.debug(f'EARLY occurrence of blob {hash_to_hex(blob.id)} (timestamp: {date})')
     # cursor.execute('''INSERT INTO content VALUES (%s,%s)
     #                   ON CONFLICT (id) DO UPDATE SET date=%s''',
-    #                (blob.swhid, timestamp, timestamp))
-    cache['content'][blob.swhid] = timestamp
+    #                (blob.id, date, date))
+    cache['content'][blob.id] = date

-def content_add_to_directory(
+def content_add_to_dir(
     cursor: psycopg2.extensions.cursor,
     cache: dict,
     directory: DirectoryEntry,
     blob: FileEntry,
     prefix: PosixPath
 ):
-    logging.debug(f'NEW occurrence of content {hash_to_hex(blob.swhid)} in directory {hash_to_hex(directory.swhid)} (path: {prefix / blob.name})')
+    logging.debug(f'NEW occurrence of content {hash_to_hex(blob.id)} in directory {hash_to_hex(directory.id)} (path: {prefix / blob.name})')
     # cursor.execute('INSERT INTO content_in_dir VALUES (%s,%s,%s)',
-    #                (blob.swhid, directory.swhid, bytes(normalize(prefix / blob.name))))
+    #                (blob.id, directory.id, bytes(normalize(prefix / blob.name))))
     cache['content_in_dir'].append(
-        (blob.swhid, directory.swhid, bytes(normalize(prefix / blob.name)))
+        (blob.id, directory.id, bytes(normalize(prefix / blob.name)))
     )

-def content_add_to_revision(
+def content_add_to_rev(
     cursor: psycopg2.extensions.cursor,
     cache: dict,
     revision: RevisionEntry,
     blob: FileEntry,
     prefix: PosixPath
 ):
-    logging.debug(f'EARLY occurrence of blob {hash_to_hex(blob.swhid)} in revision {hash_to_hex(revision.swhid)} (path: {prefix / blob.name})')
+    logging.debug(f'EARLY occurrence of blob {hash_to_hex(blob.id)} in revision {hash_to_hex(revision.id)} (path: {prefix / blob.name})')
     # cursor.execute('INSERT INTO content_early_in_rev VALUES (%s,%s,%s)',
-    #                (blob.swhid, revision.swhid, bytes(normalize(prefix / blob.name))))
+    #                (blob.id, revision.id, bytes(normalize(prefix / blob.name))))
     cache['content_early_in_rev'].append(
-        (blob.swhid, revision.swhid, bytes(normalize(prefix / blob.name)))
+        (blob.id, revision.id, bytes(normalize(prefix / blob.name)))
     )

-def directory_get_early_timestamp(
+def directory_get_early_date(
     cursor: psycopg2.extensions.cursor,
     cache: dict,
     directory: DirectoryEntry
-):
-    logging.debug(f'Getting directory {hash_to_hex(directory.swhid)} early timestamp')
-    if directory.swhid in cache['directory'].keys():
-        return cache['directory'][directory.swhid]
+) -> datetime:
+    logging.debug(f'Getting directory {hash_to_hex(directory.id)} early date')
+    if directory.id in cache['directory'].keys():
+        return cache['directory'][directory.id]
     else:
         cursor.execute('SELECT date FROM directory WHERE id=%s',
-                       (directory.swhid,))
+                       (directory.id,))
         row = cursor.fetchone()
         return row[0] if row is not None else None

-def directory_set_early_timestamp(
+def directory_set_early_date(
     cursor: psycopg2.extensions.cursor,
     cache: dict,
     directory: DirectoryEntry,
-    timestamp: datetime
+    date: datetime
 ):
-    logging.debug(f'EARLY occurrence of directory {hash_to_hex(directory.swhid)} on the ISOCHRONE FRONTIER (timestamp: {timestamp})')
+    logging.debug(f'EARLY occurrence of directory {hash_to_hex(directory.id)} on the ISOCHRONE FRONTIER (timestamp: {date})')
     # cursor.execute('''INSERT INTO directory VALUES (%s,%s)
     #                   ON CONFLICT (id) DO UPDATE SET date=%s''',
-    #                (directory.swhid, timestamp, timestamp))
-    cache['directory'][directory.swhid] = timestamp
+    #                (directory.id, date, date))
+    cache['directory'][directory.id] = date

-def directory_add_to_revision(
+def directory_add_to_rev(
     cursor: psycopg2.extensions.cursor,
     cache: dict,
     revision: RevisionEntry,
     directory: DirectoryEntry,
     path: PosixPath
 ):
-    logging.debug(f'NEW occurrence of directory {hash_to_hex(directory.swhid)} on the ISOCHRONE FRONTIER of revision {hash_to_hex(revision.swhid)} (path: {path})')
+    logging.debug(f'NEW occurrence of directory {hash_to_hex(directory.id)} on the ISOCHRONE FRONTIER of revision {hash_to_hex(revision.id)} (path: {path})')
     # cursor.execute('INSERT INTO directory_in_rev VALUES (%s,%s,%s)',
-    #                (directory.swhid, revision.swhid, bytes(normalize(path))))
+    #                (directory.id, revision.id, bytes(normalize(path))))
     cache['directory_in_rev'].append(
-        (directory.swhid, revision.swhid, bytes(normalize(path)))
+        (directory.id, revision.id, bytes(normalize(path)))
     )

 def directory_process_content(
     cursor: psycopg2.extensions.cursor,
     cache: dict,
     directory: DirectoryEntry,
     relative: DirectoryEntry,
     prefix: PosixPath
 ):
-    stack = [(directory, relative, prefix)]
+    stack = [(directory, prefix)]

     while stack:
-        directory, relative, prefix = stack.pop()
+        dir, path = stack.pop()

-        for child in iter(directory):
+        for child in iter(dir):
             if isinstance(child, FileEntry):
-                # Add content to the relative directory with the computed prefix.
-                content_add_to_directory(cursor, cache, relative, child, prefix)
+                # Add content to the relative directory with the computed path.
+                content_add_to_dir(cursor, cache, relative, child, path)
             else:
                 # Recursively walk the child directory.
-                # directory_process_content(cursor, child, relative, prefix / child.name)
-                stack.append((child, relative, prefix / child.name))
+                # directory_process_content(cursor, child, relative, path / child.name)
+                stack.append((child, path / child.name))

-def revision_process_directory(
+def revision_process_dir(
     cursor: psycopg2.extensions.cursor,
     revision: RevisionEntry,
-    directory: DirectoryEntry,
-    path: PosixPath
+    directory: DirectoryEntry
 ):
-    stack = [(revision, directory, path)]
+    stack = [(directory, directory.name)]
     cache = {
         "content": dict(),
         "content_early_in_rev": list(),
         "content_in_dir": list(),
         "directory": dict(),
         "directory_in_rev": list()
     }

     while stack:
-        revision, directory, path = stack.pop()
+        dir, path = stack.pop()

-        timestamp = directory_get_early_timestamp(cursor, cache, directory)
+        date = directory_get_early_date(cursor, cache, dir)

-        if timestamp is None:
+        if date is None:
             # The directory has never been seen on the isochrone graph of a
             # revision. Its children should be checked.
             children = []
-            for child in iter(directory):
+            for child in iter(dir):
                 if isinstance(child, FileEntry):
-                    children.append((child, content_get_early_timestamp(cursor, cache, child)))
+                    children.append((child, content_get_early_date(cursor, cache, child)))
                 else:
-                    children.append((child, directory_get_early_timestamp(cursor, cache, child)))
-            timestamps = [x[1] for x in children]
-            # timestamps = list(zip(*children))[1]
+                    children.append((child, directory_get_early_date(cursor, cache, child)))
+            dates = [child[1] for child in children]

-            if timestamps != [] and None not in timestamps and max(timestamps) <= revision.timestamp:
+            if dates != [] and None not in dates and max(dates) <= revision.date:
                 # The directory belongs to the isochrone frontier of the current
                 # revision, and this is the first time it appears as such.
-                directory_set_early_timestamp(cursor, cache, directory, max(timestamps))
-                directory_add_to_revision(cursor, cache, revision, directory, path)
-                directory_process_content(cursor, cache, directory=directory, relative=directory, prefix=PosixPath('.'))
+                directory_set_early_date(cursor, cache, dir, max(dates))
+                directory_add_to_rev(cursor, cache, revision, dir, path)
+                directory_process_content(cursor, cache, directory=dir, relative=dir, prefix=PosixPath('.'))
             else:
                 # The directory is not on the isochrone frontier of the current
                 # revision. Its child nodes should be analyzed.
-                # revision_process_content(cursor, revision, directory, path)
+                # revision_process_content(cursor, revision, dir, path)
                 ################################################################
-                for child, timestamp in children:
+                for child, date in children:
                     if isinstance(child, FileEntry):
-                        if timestamp is None or revision.timestamp < timestamp:
-                            content_set_early_timestamp(cursor, cache, child, revision.timestamp)
-                        content_add_to_revision(cursor, cache, revision, child, path)
+                        if date is None or revision.date < date:
+                            content_set_early_date(cursor, cache, child, revision.date)
+                        content_add_to_rev(cursor, cache, revision, child, path)
                     else:
-                        # revision_process_directory(cursor, revision, child, path / child.name)
-                        stack.append((revision, child, path / child.name))
+                        # revision_process_dir(cursor, revision, child, path / child.name)
+                        stack.append((child, path / child.name))
                 ################################################################

-        elif revision.timestamp < timestamp:
+        elif revision.date < date:
             # The directory has already been seen on the isochrone frontier of a
             # revision, but current revision is earlier. Its children should be
             # updated.
-            # revision_process_content(cursor, revision, directory, path)
+            # revision_process_content(cursor, revision, dir, path)
             ####################################################################
-            for child in iter(directory):
+            for child in iter(dir):
                 if isinstance(child, FileEntry):
-                    timestamp = content_get_early_timestamp(cursor, cache, child)
-                    if timestamp is None or revision.timestamp < timestamp:
-                        content_set_early_timestamp(cursor, cache, child, revision.timestamp)
-                    content_add_to_revision(cursor, cache, revision, child, path)
+                    date = content_get_early_date(cursor, cache, child)
+                    if date is None or revision.date < date:
+                        content_set_early_date(cursor, cache, child, revision.date)
+                    content_add_to_rev(cursor, cache, revision, child, path)
                 else:
-                    # revision_process_directory(cursor, revision, child, path / child.name)
-                    stack.append((revision, child, path / child.name))
+                    # revision_process_dir(cursor, revision, child, path / child.name)
+                    stack.append((child, path / child.name))
             ####################################################################
-            directory_set_early_timestamp(cursor, cache, directory, revision.timestamp)
+            directory_set_early_date(cursor, cache, dir, revision.date)

         else:
             # The directory has already been seen on the isochrone frontier of an
             # earlier revision. Just add it to the current revision.
-            directory_add_to_revision(cursor, cache, revision, directory, path)
+            directory_add_to_rev(cursor, cache, revision, dir, path)

     perform_insertions(cursor, cache)

 def perform_insertions(
     cursor: psycopg2.extensions.cursor,
     cache: dict
 ):
     psycopg2.extras.execute_values(
         cursor,
         '''INSERT INTO content(id, date) VALUES %s
            ON CONFLICT (id) DO UPDATE SET date=excluded.date''',
         cache['content'].items()
     )

     psycopg2.extras.execute_values(
         cursor,
         '''INSERT INTO content_early_in_rev VALUES %s''',
         cache['content_early_in_rev']
     )

     psycopg2.extras.execute_values(
         cursor,
         '''INSERT INTO content_in_dir VALUES %s''',
         cache['content_in_dir']
     )

     psycopg2.extras.execute_values(
         cursor,
         '''INSERT INTO directory(id, date) VALUES %s
            ON CONFLICT (id) DO UPDATE SET date=excluded.date''',
         cache['directory'].items()
     )

     psycopg2.extras.execute_values(
         cursor,
         '''INSERT INTO directory_in_rev VALUES %s''',
         cache['directory_in_rev']
     )
-
-
-################################################################################
-################################################################################
-################################################################################
-
-
-class RevisionWorker(threading.Thread):
-    def __init__(
-        self,
-        id: int,
-        conninfo: str,
-        storage: StorageInterface,
-        revisions: RevisionIterator
-    ):
-        super().__init__()
-        self.id = id
-        self.conninfo = conninfo
-        self.revisions = revisions
-        self.storage = storage
-
-
-    def run(self):
-        conn = db_utils.connect_to_conninfo(self.conninfo)
-        adapt_conn(conn)
-        with conn.cursor() as cursor:
-            while True:
-                processed = False
-                revision = self.revisions.next()
-                if revision is None: break
-
-                while not processed:
-                    try:
-                        revision_add(cursor, self.storage, revision, self.id)
-                        conn.commit()
-                        processed = True
-                    except psycopg2.DatabaseError:
-                        logging.warning(f'Thread {self.id} - Failed to process revision {hash_to_hex(revision.swhid)} (timestamp: {revision.timestamp})')
-                        conn.rollback()
-                        # TODO: maybe serialize and auto-merge transations.
-                        # The only conflicts are on:
-                        #   - content: we keep the earliest timestamp
-                        #   - directory: we keep the earliest timestamp
-                        #   - content_in_dir: there should be just duplicated entries.
-                    except Exception as error:
-                        logging.warning(f'Exection: {error}')
-        conn.close()
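With this change, revision_add owns the whole per-revision transaction: it commits on success, rolls back on conflict or error, and reports the outcome as a boolean so callers can simply retry. A minimal sketch of that calling pattern; the connection string, storage configuration, and revision values below are placeholders.

from datetime import datetime, timezone

from swh.core.db import db_utils
from swh.model.hashutil import hash_to_bytes
from swh.storage import get_storage

from swh.provenance.db_utils import adapt_conn
from swh.provenance.provenance import revision_add
from swh.provenance.revision import RevisionEntry

# Hypothetical connection and storage, for illustration only.
conn = db_utils.connect_to_conninfo("postgresql://postgres:postgres@localhost/provenance")
adapt_conn(conn)
storage = get_storage(cls="remote", url="http://localhost:5002")

revision = RevisionEntry(
    hash_to_bytes("0" * 40),                    # placeholder revision id
    datetime(2020, 1, 1, tzinfo=timezone.utc),  # placeholder author date
    hash_to_bytes("0" * 40),                    # placeholder root directory id
)

# Retry until the per-revision transaction goes through (False means it was rolled back).
while not revision_add(conn, storage, revision):
    pass
conn.close()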
diff --git a/swh/provenance/revision.py b/swh/provenance/revision.py
index 64e656a..283a3cb 100644
--- a/swh/provenance/revision.py
+++ b/swh/provenance/revision.py
@@ -1,101 +1,146 @@
-import io
-import psycopg2
+import logging
 import threading

+from .db_utils import adapt_conn
+
 from datetime import datetime
-from swh.model.identifiers import identifier_to_bytes
+from swh.core.db import db_utils    # TODO: remove this in favour of local db_utils module
+from swh.model.hashutil import (hash_to_bytes, hash_to_hex)
+from swh.storage.interface import StorageInterface
+
+
+class RevisionEntry:
+    def __init__(self, id, date, root):
+        self.id = id
+        self.date = date
+        self.root = root
+
+
+################################################################################
+################################################################################

 class RevisionIterator:
     """Iterator interface."""

     def __iter__(self):
         pass

     def __next__(self):
         pass

-class RevisionEntry:
-    def __init__(self, swhid, timestamp, directory):
-        self.swhid = swhid
-        self.timestamp = timestamp
-        self.directory = directory
-
-
 class FileRevisionIterator(RevisionIterator):
     """Iterator over revisions present in the given CSV file."""

     def __init__(self, filename, limit=None):
         self.filename = filename
         self.limit = limit
-        self.file = io.open(self.filename)
+        self.file = open(self.filename)
         self.idx = 0
         self.mutex = threading.Lock()

     def next(self):
         self.mutex.acquire()
         line = self.file.readline().strip()
         if line and (self.limit is None or self.idx < self.limit):
             self.idx = self.idx + 1
-            swhid, timestamp, directory = line.strip().split(',')
+            id, date, root = line.strip().split(',')
             self.mutex.release()
             return RevisionEntry(
-                identifier_to_bytes(swhid),
-                datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S%z'),
-                identifier_to_bytes(directory)
+                hash_to_bytes(id),
+                datetime.strptime(date, '%Y-%m-%d %H:%M:%S%z'),
+                hash_to_bytes(root)
             )
         else:
             self.mutex.release()
             return None

-class ArchiveRevisionIterator(RevisionIterator):
-    """Iterator over revisions present in the given database."""
-
-    def __init__(self, conn, limit=None, chunksize=100):
-        self.cur = conn.cursor()
-        self.chunksize = chunksize
-        self.records = []
-        if limit is None:
-            self.cur.execute('''SELECT id, date, committer_date, directory
-                                FROM revision''')
-        else:
-            self.cur.execute('''SELECT id, date, committer_date, directory
-                                FROM revision
-                                LIMIT %s''', (limit,))
-        for row in self.cur.fetchmany(self.chunksize):
-            record = self.make_record(row)
-            if record is not None:
-                self.records.append(record)
-        self.mutex = threading.Lock()
-
-    def __del__(self):
-        self.cur.close()
-
-    def next(self):
-        self.mutex.acquire()
-        if not self.records:
-            self.records.clear()
-            for row in self.cur.fetchmany(self.chunksize):
-                record = self.make_record(row)
-                if record is not None:
-                    self.records.append(record)
-
-        if self.records:
-            revision, *self.records = self.records
-            self.mutex.release()
-            return revision
-        else:
-            self.mutex.release()
-            return None
-
-    def make_record(self, row):
-        # Only revision with author or commiter date are considered
-        if row[1] is not None:
-            # If the revision has author date, it takes precedence
-            return RevisionEntry(row[0], row[1], row[3])
-        elif row[2] is not None:
-            # If not, we use the commiter date
-            return RevisionEntry(row[0], row[2], row[3])
+# class ArchiveRevisionIterator(RevisionIterator):
+#     """Iterator over revisions present in the given database."""
+#
+#     def __init__(self, conn, limit=None, chunksize=100):
+#         self.cur = conn.cursor()
+#         self.chunksize = chunksize
+#         self.records = []
+#         if limit is None:
+#             self.cur.execute('''SELECT id, date, committer_date, directory
+#                                 FROM revision''')
+#         else:
+#             self.cur.execute('''SELECT id, date, committer_date, directory
+#                                 FROM revision
+#                                 LIMIT %s''', (limit,))
+#         for row in self.cur.fetchmany(self.chunksize):
+#             record = self.make_record(row)
+#             if record is not None:
+#                 self.records.append(record)
+#         self.mutex = threading.Lock()
+#
+#     def __del__(self):
+#         self.cur.close()
+#
+#     def next(self):
+#         self.mutex.acquire()
+#         if not self.records:
+#             self.records.clear()
+#             for row in self.cur.fetchmany(self.chunksize):
+#                 record = self.make_record(row)
+#                 if record is not None:
+#                     self.records.append(record)
+#
+#         if self.records:
+#             revision, *self.records = self.records
+#             self.mutex.release()
+#             return revision
+#         else:
+#             self.mutex.release()
+#             return None
+#
+#     def make_record(self, row):
+#         # Only revision with author or commiter date are considered
+#         if row[1] is not None:
+#             # If the revision has author date, it takes precedence
+#             return RevisionEntry(row[0], row[1], row[3])
+#         elif row[2] is not None:
+#             # If not, we use the commiter date
+#             return RevisionEntry(row[0], row[2], row[3])
+
+
+################################################################################
+################################################################################
+
+class RevisionWorker(threading.Thread):
+    def __init__(
+        self,
+        id: int,
+        conninfo: str,
+        storage: StorageInterface,
+        revisions: RevisionIterator
+    ):
+        super().__init__()
+        self.id = id
+        self.conninfo = conninfo
+        self.revisions = revisions
+        self.storage = storage
+
+
+    def run(self):
+        from .provenance import revision_add
+
+        conn = db_utils.connect_to_conninfo(self.conninfo)
+        adapt_conn(conn)
+
+        while True:
+            revision = self.revisions.next()
+            if revision is None: break
+
+            processed = False
+            while not processed:
+                logging.info(f'Thread {self.id} - Processing revision {hash_to_hex(revision.id)} (timestamp: {revision.date})')
+                processed = revision_add(conn, self.storage, revision)
+                if not processed:
+                    logging.warning(f'Thread {self.id} - Failed to process revision {hash_to_hex(revision.id)} (timestamp: {revision.date})')
+
+        conn.close()
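For reference, a minimal sketch of how FileRevisionIterator and RevisionWorker are wired together, mirroring the iter-revisions command. Each CSV line is expected to carry a revision id, an author date parsable with '%Y-%m-%d %H:%M:%S%z', and the root directory id; the file name, storage configuration, and connection string below are placeholders.

from swh.storage import get_storage

from swh.provenance.revision import FileRevisionIterator, RevisionWorker

# Expected input, one revision per line (values are placeholders):
#   <revision sha1_git hex>,<YYYY-MM-DD HH:MM:SS+00:00>,<root directory sha1_git hex>
revisions = FileRevisionIterator("revisions.csv", limit=1000)

# Hypothetical storage and provenance database settings.
storage = get_storage(cls="remote", url="http://localhost:5002")
conninfo = "postgresql://postgres:postgres@localhost/provenance"

# Workers share the iterator; each one opens its own database connection.
workers = [RevisionWorker(i, conninfo, storage, revisions) for i in range(4)]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()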