diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py
--- a/swh/provenance/cli.py
+++ b/swh/provenance/cli.py
@@ -181,7 +181,7 @@
 def origin_from_journal(ctx: click.core.Context):
     from swh.journal.client import get_journal_client
 
-    from .journal_client import process_journal_objects
+    from .journal_client import process_journal_origins
 
     provenance = ctx.obj["provenance"]
     archive = ctx.obj["archive"]
@@ -189,7 +189,7 @@
     journal_cfg = ctx.obj["config"].get("journal_client", {})
 
     worker_fn = partial(
-        process_journal_objects,
+        process_journal_origins,
         archive=archive,
         provenance=provenance,
     )
@@ -213,6 +213,194 @@
         client.close()
 
 
+@cli.group(name="revision")
+@click.pass_context
+def revision(ctx: click.core.Context):
+    """Revision-level provenance indexing commands."""
+    from . import get_archive, get_provenance
+
+    archive = get_archive(**ctx.obj["config"]["provenance"]["archive"])
+    provenance = get_provenance(**ctx.obj["config"]["provenance"]["storage"])
+
+    ctx.obj["provenance"] = provenance
+    ctx.obj["archive"] = archive
+
+
+@revision.command(name="from-csv")
+@click.argument("filename", type=click.Path(exists=True))
+@click.option(
+    "-a",
+    "--track-all",
+    default=True,
+    type=bool,
+    help="""Index all occurrences of files in the development history.""",
+)
+@click.option(
+    "-f",
+    "--flatten",
+    default=True,
+    type=bool,
+    help="""Create flat models for directories in the isochrone frontier.""",
+)
+@click.option(
+    "-l",
+    "--limit",
+    type=int,
+    help="""Limit the amount of entries (revisions) to read from the input file.""",
+)
+@click.option(
+    "-m",
+    "--min-depth",
+    default=1,
+    type=int,
+    help="""Set minimum depth (in the directory tree) at which an isochrone """
+    """frontier can be defined.""",
+)
+@click.option(
+    "-r",
+    "--reuse",
+    default=True,
+    type=bool,
+    help="""Prioritize the usage of previously defined isochrone frontiers """
+    """whenever possible.""",
+)
+@click.option(
+    "-s",
+    "--min-size",
+    default=0,
+    type=int,
+    help="""Set the minimum size (in bytes) of files to be indexed. """
+    """Any smaller file will be ignored.""",
+)
+@click.pass_context
+def revision_from_csv(
+    ctx: click.core.Context,
+    filename: str,
+    track_all: bool,
+    flatten: bool,
+    limit: Optional[int],
+    min_depth: int,
+    reuse: bool,
+    min_size: int,
+) -> None:
+    """Index the revisions listed in the CSV file FILENAME."""
+    from .revision import CSVRevisionIterator, revision_add
+
+    provenance = ctx.obj["provenance"]
+    archive = ctx.obj["archive"]
+
+    revisions_provider = generate_revision_tuples(filename)
+    revisions = CSVRevisionIterator(revisions_provider, limit=limit)
+
+    with provenance:
+        for rev in revisions:
+            revision_add(
+                provenance,
+                archive,
+                [rev],
+                trackall=track_all,
+                flatten=flatten,
+                lower=reuse,
+                mindepth=min_depth,
+                minsize=min_size,
+            )
+
+
+@revision.command(name="from-journal")
+@click.option(
+    "-a",
+    "--track-all",
+    default=True,
+    type=bool,
+    help="""Index all occurrences of files in the development history.""",
+)
+@click.option(
+    "-f",
+    "--flatten",
+    default=True,
+    type=bool,
+    help="""Create flat models for directories in the isochrone frontier.""",
+)
+@click.option(
+    "-l",
+    "--limit",
+    type=int,
+    help="""Limit the amount of entries (revisions) to read from the journal.""",
+)
+@click.option(
+    "-m",
+    "--min-depth",
+    default=1,
+    type=int,
+    help="""Set minimum depth (in the directory tree) at which an isochrone """
+    """frontier can be defined.""",
+)
+@click.option(
+    "-r",
+    "--reuse",
+    default=True,
+    type=bool,
+    help="""Prioritize the usage of previously defined isochrone frontiers """
+    """whenever possible.""",
+)
+@click.option(
+    "-s",
+    "--min-size",
+    default=0,
+    type=int,
+    help="""Set the minimum size (in bytes) of files to be indexed. """
+    """Any smaller file will be ignored.""",
+)
+@click.pass_context
+def revision_from_journal(
+    ctx: click.core.Context,
+    track_all: bool,
+    flatten: bool,
+    limit: Optional[int],
+    min_depth: int,
+    reuse: bool,
+    min_size: int,
+) -> None:
+    """Index the revisions consumed from the journal (Kafka by default)."""
+    from swh.journal.client import get_journal_client
+
+    from .journal_client import process_journal_revisions
+
+    provenance = ctx.obj["provenance"]
+    archive = ctx.obj["archive"]
+
+    # Work on a copy so that popping "cls" below does not mutate ctx.obj.
+    journal_cfg = dict(ctx.obj["config"].get("journal_client", {}))
+
+    # Forward the indexing options to revision_add through the worker.
+    worker_fn = partial(
+        process_journal_revisions,
+        archive=archive,
+        provenance=provenance,
+        trackall=track_all,
+        flatten=flatten,
+        lower=reuse,
+        mindepth=min_depth,
+        minsize=min_size,
+    )
+
+    cls = journal_cfg.pop("cls", None) or "kafka"
+    journal_args = {**journal_cfg, "object_types": ["revision"]}
+    if limit is not None:
+        # Stop consuming once `limit` objects have been processed.
+        journal_args["stop_after_objects"] = limit
+    client = get_journal_client(cls, **journal_args)
+
+    try:
+        client.process(worker_fn)
+    except KeyboardInterrupt:
+        ctx.exit(0)
+    else:
+        print("Done.")
+    finally:
+        client.close()
+
+
 @cli.command(name="iter-frontiers")
 @click.argument("filename")
 @click.option(
diff --git a/swh/provenance/journal_client.py b/swh/provenance/journal_client.py
--- a/swh/provenance/journal_client.py
+++ b/swh/provenance/journal_client.py
@@ -3,14 +3,16 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+from swh.model.model import TimestampWithTimezone
 from swh.provenance.interface import ProvenanceInterface
-from swh.provenance.model import OriginEntry
+from swh.provenance.model import OriginEntry, RevisionEntry
 from swh.provenance.origin import origin_add
+from swh.provenance.revision import revision_add
 from swh.storage.interface import StorageInterface
 
 
-def process_journal_objects(
-    messages, *, provenance: ProvenanceInterface, archive: StorageInterface
+def process_journal_origins(
+    messages, *, provenance: ProvenanceInterface, archive: StorageInterface, **cfg
 ) -> None:
     """Worker function for `JournalClient.process(worker_fn)`."""
     assert set(messages) == {"origin_visit_status"}, set(messages)
@@ -19,5 +21,30 @@
         for visit in messages["origin_visit_status"]
         if visit["snapshot"] is not None
     ]
-    with provenance:
-        origin_add(provenance, archive, origin_entries)
+    if origin_entries:
+        with provenance:
+            origin_add(provenance, archive, origin_entries, **cfg)
+
+
+def process_journal_revisions(
+    messages, *, provenance: ProvenanceInterface, archive: StorageInterface, **cfg
+) -> None:
+    """Worker function for `JournalClient.process(worker_fn)`.
+
+    Revisions without a date are skipped; extra keyword arguments in ``cfg``
+    are forwarded to :func:`revision_add`.
+    """
+    assert set(messages) == {"revision"}, set(messages)
+    revisions = [
+        RevisionEntry(
+            id=rev["id"],
+            date=TimestampWithTimezone.from_dict(rev["date"]).to_datetime(),
+            root=rev["directory"],
+            parents=rev["parents"],
+        )
+        for rev in messages["revision"]
+        if rev["date"] is not None
+    ]
+    if revisions:
+        with provenance:
+            revision_add(provenance, archive, revisions, **cfg)
diff --git a/swh/provenance/tests/test_journal_client.py b/swh/provenance/tests/test_journal_client.py
--- a/swh/provenance/tests/test_journal_client.py
+++ b/swh/provenance/tests/test_journal_client.py
@@ -54,7 +54,6 @@
             "brokers": [kafka_server],
             "group_id": "toto",
             "prefix": kafka_prefix,
-            "object_types": ["origin_visit_status"],
             "stop_on_eof": True,
         },
         "provenance": {
@@ -80,3 +79,57 @@

     actual_result = provenance.storage.origin_get([origin_sha1])
     assert actual_result == {origin_sha1: origin_url}
+
+
+def test_cli_revision_from_journal_client(
+    swh_storage: StorageInterface,
+    swh_storage_backend_config: Dict,
+    kafka_prefix: str,
+    kafka_server: str,
+    consumer: Consumer,
+    tmp_path: str,
+    provenance,
+    provenance_postgresql,
+) -> None:
+    """Test revision journal client cli"""
+
+    # Prepare storage data
+    data = load_repo_data("cmdbts2")
+    assert len(data["origin"]) == 1
+    fill_storage(swh_storage, data)
+
+    # Prepare configuration for cli call
+    swh_storage_backend_config.pop("journal_writer", None)  # no need for that config
+    storage_config_dict = swh_storage_backend_config
+    cfg = {
+        "journal_client": {
+            "cls": "kafka",
+            "brokers": [kafka_server],
+            "group_id": "toto",
+            "prefix": kafka_prefix,
+            "stop_on_eof": True,
+        },
+        "provenance": {
+            "archive": {
+                "cls": "api",
+                "storage": storage_config_dict,
+            },
+            "storage": {
+                "cls": "postgresql",
+                "db": provenance_postgresql.get_dsn_parameters(),
+            },
+        },
+    }
+    config_path = write_configuration_path(cfg, tmp_path)
+
+    revisions = [rev["id"] for rev in data["revision"]]
+    result = provenance.storage.revision_get(revisions)
+    assert not result
+
+    # call the cli 'swh provenance revision from-journal'
+    cli_result = invoke(["revision", "from-journal"], config_path)
+    assert cli_result.exit_code == 0, f"Unexpected result: {cli_result.output}"
+
+    result = provenance.storage.revision_get(revisions)
+
+    assert set(result.keys()) == set(revisions)