diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py index ba4d0d2..51cc895 100644 --- a/swh/provenance/cli.py +++ b/swh/provenance/cli.py @@ -1,239 +1,193 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control import os from typing import Any, Dict, Optional import click import yaml from swh.core import config from swh.core.cli import CONTEXT_SETTINGS from swh.core.cli import swh as swh_cli_group from swh.model.hashutil import (hash_to_bytes, hash_to_hex) # All generic config code should reside in swh.core.config CONFIG_ENVVAR = "SWH_CONFIG_FILE" DEFAULT_CONFIG_PATH = os.path.join(click.get_app_dir("swh"), "global.yml") DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, DEFAULT_CONFIG_PATH) DEFAULT_CONFIG: Dict[str, Any] = { "archive": { "cls": "api", "storage": { "cls": "remote", "url": "http://uffizi.internal.softwareheritage.org:5002" } # "cls": "ps", # "db": { # "host": "db.internal.softwareheritage.org", # "database": "softwareheritage", # "user": "guest" # } }, "db": { "host": "localhost", "database": "new_test", "user": "postgres", "password": "postgres" } } CONFIG_FILE_HELP = f"""Configuration file: \b The CLI option or the environment variable will fail if invalid. CLI option is checked first. Then, environment variable {CONFIG_ENVVAR} is checked. Then, if cannot load the default path, a set of default values are used. Default config path is {DEFAULT_CONFIG_PATH}. Default config values are: \b {yaml.dump(DEFAULT_CONFIG)}""" PROVENANCE_HELP = f"""Software Heritage Scanner tools. {CONFIG_FILE_HELP}""" @swh_cli_group.group( name="provenance", context_settings=CONTEXT_SETTINGS, help=PROVENANCE_HELP, ) @click.option( "-C", "--config-file", default=None, type=click.Path(exists=False, dir_okay=False, path_type=str), help="""YAML configuration file""", ) @click.option("--profile", default=None) @click.pass_context def cli(ctx, config_file: Optional[str], profile: str): if config_file is None and config.config_exists(DEFAULT_PATH): config_file = DEFAULT_PATH if config_file is None: conf = DEFAULT_CONFIG else: # read_raw_config do not fail on ENOENT if not config.config_exists(config_file): raise FileNotFoundError(config_file) conf = config.read_raw_config(config.config_basepath(config_file)) conf = config.merge_configs(DEFAULT_CONFIG, conf) ctx.ensure_object(dict) ctx.obj["config"] = conf - ctx.obj["profile"] = profile - - # if profile: - # import cProfile - # import pstats - # import io - # import atexit - # - # print("Profiling...") - # pr = cProfile.Profile() - # pr.enable() - # - # def exit(): - # pr.disable() - # # print("Profiling completed") - # # s = io.StringIO() - # # pstats.Stats(pr, stream=s).sort_stats(SortKey.CUMULATIVE) - # # print(s.getvalue()) - # ps = pstats.Stats(pr).sort_stats(SortKey.CUMULATIVE) - # with io.open(ctx.obj["profile"], 'wb') as prof: - # prof.write(ps) - # - # atexit.register(exit) + # ctx.obj["profile"] = profile + + if profile: + import cProfile + import atexit + + print("Profiling...") + pr = cProfile.Profile() + pr.enable() + + def exit(): + pr.disable() + pr.dump_stats(profile) + + atexit.register(exit) @cli.command(name="create") @click.option("--name", default=None) @click.pass_context def create(ctx, name): """Create new provenance database.""" from .db_utils import connect from .provenance import create_database # Connect to server without selecting a database conninfo = ctx.obj["config"]["db"] database = conninfo.pop('database', None) conn = connect(conninfo) if name is None: name = database create_database(conn, conninfo, name) @cli.command(name="iter-revisions") @click.argument("filename") @click.option('-l', '--limit', type=int) @click.option('-t', '--threads', type=int, default=1) @click.pass_context def iter_revisions(ctx, filename, limit, threads): """Iterate over provided list of revisions and add them to the provenance database.""" from .archive import get_archive + from .provenance import get_provenance, revision_add from .revision import FileRevisionIterator - # from .revision import RevisionWorker - - # conninfo = ctx.obj["config"]["db"] - # archive = get_archive(**ctx.obj["config"]["archive"]) - # revisions = FileRevisionIterator(filename, archive, limit=limit) - # workers = [] - # - # for id in range(threads): - # worker = RevisionWorker(id, conninfo, archive, revisions) - # worker.start() - # workers.append(worker) - # - # for worker in workers: - # worker.join() - - ############################################################################ + archive = get_archive(**ctx.obj["config"]["archive"]) + provenance = get_provenance(ctx.obj["config"]["db"]) revisions = FileRevisionIterator(filename, archive, limit=limit) - if ctx.obj["profile"]: - from .provenance import get_provenance, revision_add - import cProfile - - provenance = get_provenance(ctx.obj["config"]["db"]) - command = """ -while True: - revision = revisions.next() - if revision is None: break - revision_add(provenance, archive, revision) - """ - - cProfile.runctx(command, globals(), locals(), filename=ctx.obj["profile"]) - - else: - from .revision import RevisionWorker - - conninfo = ctx.obj["config"]["db"] - workers = [] - - for id in range(threads): - worker = RevisionWorker(id, conninfo, archive, revisions) - worker.start() - workers.append(worker) - - for worker in workers: - worker.join() - ############################################################################ + while True: + revision = revisions.next() + if revision is None: break + revision_add(provenance, archive, revision) @cli.command(name="iter-origins") @click.argument("filename") @click.option('-l', '--limit', type=int) #@click.option('-t', '--threads', type=int, default=1) @click.pass_context #def iter_revisions(ctx, filename, limit, threads): def iter_origins(ctx, filename, limit): """Iterate over provided list of revisions and add them to the provenance database.""" from .archive import get_archive from .origin import FileOriginIterator from .provenance import get_provenance, origin_add provenance = get_provenance(ctx.obj["config"]["db"]) archive = get_archive(**ctx.obj["config"]["archive"]) for origin in FileOriginIterator(filename, archive, limit=limit): - # TODO: consider using threads and a OriginWorker class origin_add(provenance, origin) @cli.command(name="find-first") @click.argument("swhid") @click.pass_context def find_first(ctx, swhid): """Find first occurrence of the requested blob.""" from .provenance import get_provenance provenance = get_provenance(ctx.obj["config"]["db"]) # TODO: return a dictionary with proper keys for each field row = provenance.content_find_first(hash_to_bytes(swhid)) if row is not None: print(f'{hash_to_hex(row[0])}, {hash_to_hex(row[1])}, {row[2]}, {os.fsdecode(row[3])}') else: print(f'Cannot find a content with the id {swhid}') @cli.command(name="find-all") @click.argument("swhid") @click.pass_context def find_all(ctx, swhid): """Find all occurrences of the requested blob.""" from .provenance import get_provenance provenance = get_provenance(ctx.obj["config"]["db"]) # TODO: return a dictionary with proper keys for each field for row in provenance.content_find_all(hash_to_bytes(swhid)): print(f'{hash_to_hex(row[0])}, {hash_to_hex(row[1])}, {row[2]}, {os.fsdecode(row[3])}') diff --git a/swh/provenance/revision.py b/swh/provenance/revision.py index 0344a93..7b6f49b 100644 --- a/swh/provenance/revision.py +++ b/swh/provenance/revision.py @@ -1,168 +1,168 @@ import logging import threading from .archive import ArchiveInterface from .db_utils import connect from datetime import datetime from swh.model.hashutil import hash_to_bytes, hash_to_hex class RevisionEntry: def __init__( self, archive: ArchiveInterface, id: bytes, date: datetime=None, root: bytes=None, parents: list=None ): self.archive = archive self.id = id self.date = date self.parents = parents self.root = root def __iter__(self): if self.parents is None: self.parents = [] for parent in self.archive.revision_get([self.id]): if parent is not None: self.parents.append( RevisionEntry( self.archive, parent.id, parents=[RevisionEntry(self.archive, id) for id in parent.parents] ) ) return iter(self.parents) ################################################################################ ################################################################################ class RevisionIterator: """Iterator interface.""" def __iter__(self): pass def __next__(self): pass class FileRevisionIterator(RevisionIterator): """Iterator over revisions present in the given CSV file.""" def __init__(self, filename: str, archive: ArchiveInterface, limit: int=None): self.file = open(filename) self.idx = 0 self.limit = limit self.mutex = threading.Lock() self.archive = archive def next(self): self.mutex.acquire() line = self.file.readline().strip() if line and (self.limit is None or self.idx < self.limit): self.idx = self.idx + 1 id, date, root = line.strip().split(',') self.mutex.release() return RevisionEntry( self.archive, hash_to_bytes(id), date=datetime.fromisoformat(date), root=hash_to_bytes(root) ) else: self.mutex.release() return None # class ArchiveRevisionIterator(RevisionIterator): # """Iterator over revisions present in the given database.""" # # def __init__(self, conn, limit=None, chunksize=100): # self.cur = conn.cursor() # self.chunksize = chunksize # self.records = [] # if limit is None: # self.cur.execute('''SELECT id, date, committer_date, directory # FROM revision''') # else: # self.cur.execute('''SELECT id, date, committer_date, directory # FROM revision # LIMIT %s''', (limit,)) # for row in self.cur.fetchmany(self.chunksize): # record = self.make_record(row) # if record is not None: # self.records.append(record) # self.mutex = threading.Lock() # # def __del__(self): # self.cur.close() # # def next(self): # self.mutex.acquire() # if not self.records: # self.records.clear() # for row in self.cur.fetchmany(self.chunksize): # record = self.make_record(row) # if record is not None: # self.records.append(record) # # if self.records: # revision, *self.records = self.records # self.mutex.release() # return revision # else: # self.mutex.release() # return None # # def make_record(self, row): # # Only revision with author or commiter date are considered # if row[1] is not None: # # If the revision has author date, it takes precedence # return RevisionEntry(row[0], row[1], row[3]) # elif row[2] is not None: # # If not, we use the commiter date # return RevisionEntry(row[0], row[2], row[3]) ################################################################################ ################################################################################ -class RevisionWorker(threading.Thread): - def __init__( - self, - id: int, - conninfo: dict, - archive: ArchiveInterface, - revisions: RevisionIterator - ): - from .provenance import get_provenance - - super().__init__() - self.archive = archive - self.id = id - self.provenance = get_provenance(conninfo) - self.revisions = revisions - - - def run(self): - from .provenance import revision_add - - - while True: - revision = self.revisions.next() - if revision is None: break - - processed = False - while not processed: - logging.info(f'Thread {self.id} - Processing revision {hash_to_hex(revision.id)} (timestamp: {revision.date})') - processed = revision_add(self.provenance, self.archive, revision) - if not processed: - logging.warning(f'Thread {self.id} - Failed to process revision {hash_to_hex(revision.id)} (timestamp: {revision.date})') +# class RevisionWorker(threading.Thread): +# def __init__( +# self, +# id: int, +# conninfo: dict, +# archive: ArchiveInterface, +# revisions: RevisionIterator +# ): +# from .provenance import get_provenance +# +# super().__init__() +# self.archive = archive +# self.id = id +# self.provenance = get_provenance(conninfo) +# self.revisions = revisions +# +# +# def run(self): +# from .provenance import revision_add +# +# +# while True: +# revision = self.revisions.next() +# if revision is None: break +# +# processed = False +# while not processed: +# logging.info(f'Thread {self.id} - Processing revision {hash_to_hex(revision.id)} (timestamp: {revision.date})') +# processed = revision_add(self.provenance, self.archive, revision) +# if not processed: +# logging.warning(f'Thread {self.id} - Failed to process revision {hash_to_hex(revision.id)} (timestamp: {revision.date})')