diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py
--- a/swh/provenance/cli.py
+++ b/swh/provenance/cli.py
@@ -181,7 +181,7 @@
 def origin_from_journal(ctx: click.core.Context):
     from swh.journal.client import get_journal_client
 
-    from .journal_client import process_journal_objects
+    from .journal_client import process_journal_origins
 
     provenance = ctx.obj["provenance"]
     archive = ctx.obj["archive"]
@@ -189,7 +189,195 @@
     journal_cfg = ctx.obj["config"].get("journal_client", {})
 
     worker_fn = partial(
-        process_journal_objects,
+        process_journal_origins,
+        archive=archive,
+        provenance=provenance,
+    )
+
+    cls = journal_cfg.pop("cls", None) or "kafka"
+    client = get_journal_client(
+        cls,
+        **{
+            **journal_cfg,
+            "object_types": ["origin_visit_status"],
+        },
+    )
+
+    try:
+        client.process(worker_fn)
+    except KeyboardInterrupt:
+        ctx.exit(0)
+    else:
+        print("Done.")
+    finally:
+        client.close()
+
+
+@cli.group(name="revision")
+@click.pass_context
+def revision(ctx: click.core.Context):
+    from . import get_archive, get_provenance
+
+    # Instantiate the archive and provenance backends from the "provenance"
+    # config section and expose them to the subcommands through ctx.obj.
+    archive = get_archive(**ctx.obj["config"]["provenance"]["archive"])
+    provenance = get_provenance(**ctx.obj["config"]["provenance"]["storage"])
+
+    ctx.obj["provenance"] = provenance
+    ctx.obj["archive"] = archive
+
+
+@revision.command(name="from-csv")
+@click.argument("filename", type=click.Path(exists=True))
+@click.option(
+    "-a",
+    "--track-all",
+    default=True,
+    type=bool,
+    help="""Index all occurrences of files in the development history.""",
+)
+@click.option(
+    "-f",
+    "--flatten",
+    default=True,
+    type=bool,
+    help="""Create flat models for directories in the isochrone frontier.""",
+)
+@click.option(
+    "-l",
+    "--limit",
+    type=int,
+    help="""Limit the amount of entries (revisions) to read from the input file.""",
+)
+@click.option(
+    "-m",
+    "--min-depth",
+    default=1,
+    type=int,
+    help="""Set minimum depth (in the directory tree) at which an isochrone """
+    """frontier can be defined.""",
+)
+@click.option(
+    "-r",
+    "--reuse",
+    default=True,
+    type=bool,
+    help="""Prioritize the usage of previously defined isochrone frontiers """
+    """whenever possible.""",
+)
+@click.option(
+    "-s",
+    "--min-size",
+    default=0,
+    type=int,
+    help="""Set the minimum size (in bytes) of files to be indexed. """
+    """Any smaller file will be ignored.""",
+)
+@click.pass_context
+def revision_from_csv(
+    ctx: click.core.Context,
+    filename: str,
+    track_all: bool,
+    flatten: bool,
+    limit: Optional[int],
+    min_depth: int,
+    reuse: bool,
+    min_size: int,
+) -> None:
+    from .revision import CSVRevisionIterator, revision_add
+
+    provenance = ctx.obj["provenance"]
+    archive = ctx.obj["archive"]
+
+    revisions_provider = generate_revision_tuples(filename)
+    revisions = CSVRevisionIterator(revisions_provider, limit=limit)
+
+    # Each revision is passed to revision_add on its own, all within a
+    # single `with provenance` block.
+    with provenance:
+        for revision in revisions:
+            revision_add(
+                provenance,
+                archive,
+                [revision],
+                trackall=track_all,
+                flatten=flatten,
+                lower=reuse,
+                mindepth=min_depth,
+                minsize=min_size,
+            )
+
+
+@revision.command(name="from-journal")
+@click.option(
+    "-a",
+    "--track-all",
+    default=True,
+    type=bool,
+    help="""Index all occurrences of files in the development history.""",
+)
+@click.option(
+    "-f",
+    "--flatten",
+    default=True,
+    type=bool,
+    help="""Create flat models for directories in the isochrone frontier.""",
+)
+@click.option(
+    "-l",
+    "--limit",
+    type=int,
+    help="""Limit the amount of entries (revisions) to read from the input file.""",
+)
+@click.option(
+    "-m",
+    "--min-depth",
+    default=1,
+    type=int,
+    help="""Set minimum depth (in the directory tree) at which an isochrone """
+    """frontier can be defined.""",
+)
+@click.option(
+    "-r",
+    "--reuse",
+    default=True,
+    type=bool,
+    help="""Prioritize the usage of previously defined isochrone frontiers """
+    """whenever possible.""",
+)
+@click.option(
+    "-s",
+    "--min-size",
+    default=0,
+    type=int,
+    help="""Set the minimum size (in bytes) of files to be indexed. """
+    """Any smaller file will be ignored.""",
+)
+@click.pass_context
+def revision_from_journal(
+    ctx: click.core.Context,
+    track_all: bool,
+    flatten: bool,
+    limit: Optional[int],
+    min_depth: int,
+    reuse: bool,
+    min_size: int,
+) -> None:
+    from swh.journal.client import get_journal_client
+
+    # NOTE(review): this is the *origin* worker, and none of the options
+    # declared above are used in the lines added here -- presumably
+    # process_journal_revisions was intended; confirm against the rest of
+    # this function, which lies outside the hunk.
+    from .journal_client import process_journal_origins
+
+    provenance = ctx.obj["provenance"]
+    archive = ctx.obj["archive"]
+
+    journal_cfg = ctx.obj["config"].get("journal_client", {})
+
+    worker_fn = partial(
+        process_journal_origins,
         archive=archive,
         provenance=provenance,
     )
diff --git a/swh/provenance/journal_client.py b/swh/provenance/journal_client.py
--- a/swh/provenance/journal_client.py
+++ b/swh/provenance/journal_client.py
@@ -4,13 +4,14 @@
 # See top-level LICENSE file for more information
 
 from swh.provenance.interface import ProvenanceInterface
-from swh.provenance.model import OriginEntry
+from swh.provenance.model import OriginEntry, RevisionEntry
 from swh.provenance.origin import origin_add
+from swh.provenance.revision import revision_add
 from swh.storage.interface import StorageInterface
 
 
-def process_journal_objects(
-    messages, *, provenance: ProvenanceInterface, archive: StorageInterface
+def process_journal_origins(
+    messages, *, provenance: ProvenanceInterface, archive: StorageInterface, **cfg
 ) -> None:
     """Worker function for `JournalClient.process(worker_fn)`."""
     assert set(messages) == {"origin_visit_status"}, set(messages)
@@ -19,5 +20,28 @@
         for visit in messages["origin_visit_status"]
         if visit["snapshot"] is not None
     ]
-    with provenance:
-        origin_add(provenance, archive, origin_entries)
+    # Only enter the provenance context when there is something to add.
+    if origin_entries:
+        with provenance:
+            origin_add(provenance, archive, origin_entries, **cfg)
+
+
+def process_journal_revisions(
+    messages, *, provenance: ProvenanceInterface, archive: StorageInterface, **cfg
+) -> None:
+    """Worker function for `JournalClient.process(worker_fn)`."""
+    assert set(messages) == {"revision"}, set(messages)
+    # Revisions whose "date" is None are skipped.
+    revisions = [
+        RevisionEntry(
+            id=rev["id"],
+            date=rev["date"],
+            root=rev["directory"],
+            parents=rev["parents"],
+        )
+        for rev in messages["revision"]
+        if rev["date"] is not None
+    ]
+    if revisions:
+        with provenance:
+            revision_add(provenance, archive, revisions, **cfg)