diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py index a22cb1d..77342e0 100644 --- a/swh/provenance/cli.py +++ b/swh/provenance/cli.py @@ -1,200 +1,200 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control import os from typing import Any, Dict, Optional import click import yaml from swh.core import config from swh.core.cli import CONTEXT_SETTINGS from swh.core.cli import swh as swh_cli_group from swh.model.hashutil import hash_to_bytes, hash_to_hex # All generic config code should reside in swh.core.config CONFIG_ENVVAR = "SWH_CONFIG_FILE" DEFAULT_CONFIG_PATH = os.path.join(click.get_app_dir("swh"), "global.yml") DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, DEFAULT_CONFIG_PATH) DEFAULT_CONFIG: Dict[str, Any] = { "archive": { "cls": "api", "storage": { "cls": "remote", "url": "http://uffizi.internal.softwareheritage.org:5002", } # "cls": "local", # "db": { # "host": "db.internal.softwareheritage.org", # "dbname": "softwareheritage", # "user": "guest" # } }, "provenance": {"cls": "local", "db": {"host": "localhost", "dbname": "provenance"}}, } CONFIG_FILE_HELP = f"""Configuration file: \b The CLI option or the environment variable will fail if invalid. CLI option is checked first. Then, environment variable {CONFIG_ENVVAR} is checked. Then, if cannot load the default path, a set of default values are used. Default config path is {DEFAULT_CONFIG_PATH}. Default config values are: \b {yaml.dump(DEFAULT_CONFIG)}""" PROVENANCE_HELP = f"""Software Heritage Scanner tools. 
@cli.command(name="iter-revisions")
@click.argument("filename")
@click.option("-l", "--limit", type=int)
@click.pass_context
def iter_revisions(ctx, filename, limit):
    """Process a provided list of revisions."""
    # NOTE: the docstring above is what click displays as the command help, so
    # implementation details are documented in comments instead.
    #
    # ``filename`` is read as a comma-separated file with one revision per
    # line; each non-blank line is split on "," and handed to
    # CSVRevisionIterator, which unpacks it as (id, date, root).
    # ``limit`` (optional) caps the number of rows that are processed.
    #
    # TODO: add file size filtering

    # Imports are local to keep the CLI startup time under control.
    from . import get_archive, get_provenance
    from .provenance import revision_add
    from .revision import CSVRevisionIterator

    archive = get_archive(**ctx.obj["config"]["archive"])
    provenance = get_provenance(**ctx.obj["config"]["provenance"])

    # The rows are produced lazily, so the file has to stay open for the whole
    # processing loop; the context manager guarantees it is closed afterwards,
    # including when revision_add() raises.
    with open(filename, "r") as revisions_file:
        revisions_provider = (
            line.strip().split(",") for line in revisions_file if line.strip()
        )
        revisions = CSVRevisionIterator(revisions_provider, archive, limit=limit)

        for revision in revisions:
            revision_add(provenance, archive, revision)
class RevisionEntry:
    """A revision identified by ``id``, bound to the archive it is read from.

    ``date``, ``root`` and ``parents`` are optional and stored as given.
    Iterating over an entry yields the entries held in ``parents``; when no
    parents were supplied, the list is filled on first iteration from the
    archive and kept on the instance for later iterations.
    """

    def __init__(
        self,
        archive: ArchiveInterface,
        id: bytes,
        date: Optional[datetime] = None,
        root: Optional[bytes] = None,
        parents: Optional[list] = None,
    ):
        self.archive = archive
        self.id = id
        self.date = date
        self.parents = parents
        self.root = root

    def __iter__(self):
        # Fast path: parents were either provided or already fetched.
        if self.parents is not None:
            return iter(self.parents)

        self.parents = []
        # Every non-None object returned by the archive for this revision id
        # becomes an entry whose own parents are pre-filled with bare entries
        # (id only).
        for fetched in self.archive.revision_get([self.id]):
            if fetched is None:
                continue
            fetched_id = fetched.id
            ancestors = [
                RevisionEntry(self.archive, ancestor_id)
                for ancestor_id in fetched.parents
            ]
            self.parents.append(
                RevisionEntry(self.archive, fetched_id, parents=ancestors)
            )
        return iter(self.parents)
class CSVRevisionIterator:
    """Iterator over revisions typically present in the given CSV file.

    The input is an iterable that produces 3 elements per row:

      (id, date, root)

    where:
    - id: is the id (sha1_git) of the revision
    - date: is the author date, as an ISO 8601 string (it is parsed with
      ``datetime.fromisoformat``)
    - root: sha1 of the directory

    ``id`` and ``root`` are converted with ``hash_to_bytes`` before being
    stored on the resulting ``RevisionEntry``.
    NOTE(review): callers pass both raw bytes and hexadecimal strings for
    these two fields; the annotation below only spells out the bytes form.

    Args:
        revisions: rows to turn into ``RevisionEntry`` objects.
        archive: archive handed over to every ``RevisionEntry`` produced.
        limit: when not None, at most this many rows are consumed.

    Raises:
        ValueError: from ``__next__`` when a row does not hold exactly three
            elements or when the date is not a valid ISO 8601 string.
    """

    def __init__(
        self,
        revisions: Iterable[Tuple[bytes, str, bytes]],
        archive: ArchiveInterface,
        limit: Optional[int] = None,
    ):
        self.revisions: Iterator[Tuple[bytes, str, bytes]]
        if limit is not None:
            self.revisions = islice(revisions, limit)
        else:
            self.revisions = iter(revisions)
        # Guards calls to next() on the underlying row iterator.
        self.mutex = threading.Lock()
        self.archive = archive

    def __iter__(self) -> "CSVRevisionIterator":
        return self

    def __next__(self) -> RevisionEntry:
        # Only pulling the next row touches shared state; parsing the row and
        # building the entry work on local data and need not hold the lock.
        # StopIteration raised by next() propagates and ends the iteration.
        with self.mutex:
            rev_id, date, root = next(self.revisions)
        return RevisionEntry(
            self.archive,
            hash_to_bytes(rev_id),
            date=datetime.fromisoformat(date),
            root=hash_to_bytes(root),
        )
self.cur.execute('''SELECT id, date, committer_date, directory # FROM revision''') # else: # self.cur.execute('''SELECT id, date, committer_date, directory # FROM revision # LIMIT %s''', (limit,)) # for row in self.cur.fetchmany(self.chunksize): # record = self.make_record(row) # if record is not None: # self.records.append(record) # self.mutex = threading.Lock() # # def __del__(self): # self.cur.close() # # def next(self): # self.mutex.acquire() # if not self.records: # self.records.clear() # for row in self.cur.fetchmany(self.chunksize): # record = self.make_record(row) # if record is not None: # self.records.append(record) # # if self.records: # revision, *self.records = self.records # self.mutex.release() # return revision # else: # self.mutex.release() # return None # # def make_record(self, row): # # Only revision with author or committer date are considered # if row[1] is not None: # # If the revision has author date, it takes precedence # return RevisionEntry(row[0], row[1], row[3]) # elif row[2] is not None: # # If not, we use the committer date # return RevisionEntry(row[0], row[2], row[3]) ######################################################################################## ######################################################################################## # class RevisionWorker(threading.Thread): # def __init__( # self, # id: int, # conninfo: dict, # archive: ArchiveInterface, # revisions: RevisionIterator # ): # from .provenance import get_provenance # # super().__init__() # self.archive = archive # self.id = id # self.provenance = get_provenance(conninfo) # self.revisions = revisions # # # def run(self): # from .provenance import revision_add # # # while True: # revision = self.revisions.next() # if revision is None: break # # processed = False # while not processed: # logging.info( # f'Thread {( # self.id # )} - Processing revision {( # hash_to_hex(revision.id) # )} (timestamp: {revision.date})' # ) # processed = revision_add(self.provenance, 
@pytest.fixture
def provenance(provenance_db):
    """Provide a provenance database object built on the ``provenance_db`` fixture.

    The with-path flavour of the provenance database class is used.
    """
    # Imported lazily, at fixture execution time, as in the rest of this module.
    from swh.provenance.postgresql.provenancedb_with_path import (
        ProvenanceWithPathDB,
    )

    provenance_with_path = ProvenanceWithPathDB(provenance_db)
    return provenance_with_path
def test_archive_direct_revision_iterator(swh_storage_with_objects, archive_direct):
    """Test CSVRevisionIterator against the direct (PostgreSQL) archive.

    Rows are built from the revisions of the swh.model test dataset as
    (id, ISO 8601 author date, root directory) tuples, which is the row layout
    the iterator unpacks.
    """
    revisions_csv = [
        (rev.id, ts_to_dt(rev.date).isoformat(), rev.directory)
        for rev in TEST_OBJECTS["revision"]
    ]

    # Without a limit, every input row yields exactly one entry.
    revisions = list(CSVRevisionIterator(revisions_csv, archive_direct))
    assert revisions
    assert len(revisions) == len(TEST_OBJECTS["revision"])

    # With a limit, iteration stops after that many rows.
    limited = list(CSVRevisionIterator(revisions_csv, archive_direct, limit=1))
    assert len(limited) == 1