diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py
index 0a34e69..a9a5118 100644
--- a/swh/provenance/cli.py
+++ b/swh/provenance/cli.py
@@ -1,427 +1,607 @@
# Copyright (C) 2021-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

# WARNING: do not import unnecessary things here to keep cli startup time under
# control
from datetime import datetime, timezone
from functools import partial
import os
from typing import Any, Dict, Generator, Optional, Tuple

import click
from deprecated import deprecated
import iso8601
import yaml

from swh.core import config
from swh.core.cli import CONTEXT_SETTINGS
from swh.core.cli import swh as swh_cli_group
from swh.model.hashutil import hash_to_bytes, hash_to_hex
from swh.model.model import Sha1Git

# All generic config code should reside in swh.core.config
CONFIG_ENVVAR = "SWH_CONFIG_FILENAME"
DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, None)

DEFAULT_CONFIG: Dict[str, Any] = {
    "provenance": {
        "archive": {
            # Storage API based Archive object
            # "cls": "api",
            # "storage": {
            #     "cls": "remote",
            #     "url": "http://uffizi.internal.softwareheritage.org:5002",
            # }
            # Direct access Archive object
            "cls": "direct",
            "db": {
                "host": "belvedere.internal.softwareheritage.org",
                "port": 5432,
                "dbname": "softwareheritage",
                "user": "guest",
            },
        },
        "storage": {
            # Local PostgreSQL Storage
            # "cls": "postgresql",
            # "db": {
            #     "host": "localhost",
            #     "user": "postgres",
            #     "password": "postgres",
            #     "dbname": "provenance",
            # },
            # Local MongoDB Storage
            # "cls": "mongodb",
            # "db": {
            #     "dbname": "provenance",
            # },
            # Remote RabbitMQ/PostgreSQL Storage
            "cls": "rabbitmq",
            "url": "amqp://localhost:5672/%2f",
            "storage_config": {
                "cls": "postgresql",
                "db": {
                    "host": "localhost",
                    "user": "postgres",
                    "password": "postgres",
                    "dbname": "provenance",
                },
            },
            "batch_size": 100,
            "prefetch_count": 100,
        },
    }
}

CONFIG_FILE_HELP = f"""
\b Configuration can be loaded from a yaml file given either as --config-file
option or the {CONFIG_ENVVAR} environment variable. If no configuration file
is specified, use the following default configuration::

\b
{yaml.dump(DEFAULT_CONFIG)}"""
PROVENANCE_HELP = f"""Software Heritage provenance index database tools

{CONFIG_FILE_HELP}
"""


@swh_cli_group.group(
    name="provenance", context_settings=CONTEXT_SETTINGS, help=PROVENANCE_HELP
)
@click.option(
    "-C",
    "--config-file",
    default=None,
    type=click.Path(exists=True, dir_okay=False, path_type=str),
    help="""YAML configuration file.""",
)
@click.option(
    "-P",
    "--profile",
    default=None,
    type=click.Path(exists=False, dir_okay=False, path_type=str),
    help="""Enable profiling to specified file.""",
)
@click.pass_context
def cli(ctx: click.core.Context, config_file: Optional[str], profile: str) -> None:
    if (
        config_file is None
        and DEFAULT_PATH is not None
        and config.config_exists(DEFAULT_PATH)
    ):
        config_file = DEFAULT_PATH

    if config_file is None:
        conf = DEFAULT_CONFIG
    else:
        # read_raw_config do not fail on ENOENT
        if not os.path.exists(config_file):
            raise FileNotFoundError(config_file)
        conf = yaml.safe_load(open(config_file, "rb"))

    ctx.ensure_object(dict)
    ctx.obj["config"] = conf

    if profile:
        import atexit
        import cProfile

        print("Profiling...")
        pr = cProfile.Profile()
        pr.enable()

        def exit() -> None:
            pr.disable()
            pr.dump_stats(profile)

        atexit.register(exit)


@cli.group(name="origin")
@click.pass_context
def origin(ctx: click.core.Context):
    from . import get_archive, get_provenance

    archive = get_archive(**ctx.obj["config"]["provenance"]["archive"])
    provenance = get_provenance(**ctx.obj["config"]["provenance"]["storage"])

    ctx.obj["provenance"] = provenance
    ctx.obj["archive"] = archive


@origin.command(name="from-csv")
@click.argument("filename", type=click.Path(exists=True))
@click.option(
    "-l",
    "--limit",
    type=int,
    help="""Limit the amount of entries (origins) to read from the input file.""",
)
@click.pass_context
def origin_from_csv(ctx: click.core.Context, filename: str, limit: Optional[int]):
    from .origin import CSVOriginIterator, origin_add

    provenance = ctx.obj["provenance"]
    archive = ctx.obj["archive"]

    origins_provider = generate_origin_tuples(filename)
    origins = CSVOriginIterator(origins_provider, limit=limit)

    with provenance:
        for origin in origins:
            origin_add(provenance, archive, [origin])


@origin.command(name="from-journal")
@click.pass_context
def origin_from_journal(ctx: click.core.Context):
    from swh.journal.client import get_journal_client

    from .journal_client import process_journal_origins

    provenance = ctx.obj["provenance"]
    archive = ctx.obj["archive"]

    journal_cfg = ctx.obj["config"].get("journal_client", {})

    worker_fn = partial(
        process_journal_origins,
        archive=archive,
        provenance=provenance,
    )

    cls = journal_cfg.pop("cls", None) or "kafka"
    client = get_journal_client(
        cls,
        **{
            **journal_cfg,
            "object_types": ["origin_visit_status"],
        },
    )

    try:
        client.process(worker_fn)
    except KeyboardInterrupt:
        ctx.exit(0)
    else:
        print("Done.")
    finally:
        client.close()


+@cli.group(name="revision")
+@click.pass_context
+def revision(ctx: click.core.Context):
+    from . import get_archive, get_provenance
+
+    archive = get_archive(**ctx.obj["config"]["provenance"]["archive"])
+    provenance = get_provenance(**ctx.obj["config"]["provenance"]["storage"])
+
+    ctx.obj["provenance"] = provenance
+    ctx.obj["archive"] = archive
+
+
""" + """Any smaller file will be ignored.""", +) +@click.pass_context +def revision_from_csv( + ctx: click.core.Context, + filename: str, + track_all: bool, + flatten: bool, + limit: Optional[int], + min_depth: int, + reuse: bool, + min_size: int, +) -> None: + from .revision import CSVRevisionIterator, revision_add + + provenance = ctx.obj["provenance"] + archive = ctx.obj["archive"] + + revisions_provider = generate_revision_tuples(filename) + revisions = CSVRevisionIterator(revisions_provider, limit=limit) + + with provenance: + for revision in revisions: + revision_add( + provenance, + archive, + [revision], + trackall=track_all, + flatten=flatten, + lower=reuse, + mindepth=min_depth, + minsize=min_size, + ) + + +@revision.command(name="from-journal") +@click.option( + "-a", + "--track-all", + default=True, + type=bool, + help="""Index all occurrences of files in the development history.""", +) +@click.option( + "-f", + "--flatten", + default=True, + type=bool, + help="""Create flat models for directories in the isochrone frontier.""", +) +@click.option( + "-l", + "--limit", + type=int, + help="""Limit the amount of entries (revisions) to read from the input file.""", +) +@click.option( + "-m", + "--min-depth", + default=1, + type=int, + help="""Set minimum depth (in the directory tree) at which an isochrone """ + """frontier can be defined.""", +) +@click.option( + "-r", + "--reuse", + default=True, + type=bool, + help="""Prioritize the usage of previously defined isochrone frontiers """ + """whenever possible.""", +) +@click.option( + "-s", + "--min-size", + default=0, + type=int, + help="""Set the minimum size (in bytes) of files to be indexed. """ + """Any smaller file will be ignored.""", +) +@click.pass_context +def revision_from_journal( + ctx: click.core.Context, + track_all: bool, + flatten: bool, + limit: Optional[int], + min_depth: int, + reuse: bool, + min_size: int, +) -> None: + from swh.journal.client import get_journal_client + + from .journal_client import process_journal_revisions + + provenance = ctx.obj["provenance"] + archive = ctx.obj["archive"] + + journal_cfg = ctx.obj["config"].get("journal_client", {}) + + worker_fn = partial( + process_journal_revisions, + archive=archive, + provenance=provenance, + ) + + cls = journal_cfg.pop("cls", None) or "kafka" + client = get_journal_client( + cls, + **{ + **journal_cfg, + "object_types": ["revision"], + }, + ) + + try: + client.process(worker_fn) + except KeyboardInterrupt: + ctx.exit(0) + else: + print("Done.") + finally: + client.close() + + @cli.command(name="iter-frontiers") @click.argument("filename") @click.option( "-l", "--limit", type=int, help="""Limit the amount of entries (directories) to read from the input file.""", ) @click.option( "-s", "--min-size", default=0, type=int, help="""Set the minimum size (in bytes) of files to be indexed. """ """Any smaller file will be ignored.""", ) @click.pass_context def iter_frontiers( ctx: click.core.Context, filename: str, limit: Optional[int], min_size: int, ) -> None: """Process a provided list of directories in the isochrone frontier.""" from . 
    from . import get_archive, get_provenance
    from .directory import CSVDirectoryIterator, directory_add

    archive = get_archive(**ctx.obj["config"]["provenance"]["archive"])

    directories_provider = generate_directory_ids(filename)
    directories = CSVDirectoryIterator(directories_provider, limit=limit)

    with get_provenance(**ctx.obj["config"]["provenance"]["storage"]) as provenance:
        for directory in directories:
            directory_add(
                provenance,
                archive,
                [directory],
                minsize=min_size,
            )


def generate_directory_ids(
    filename: str,
) -> Generator[Sha1Git, None, None]:
    for line in open(filename, "r"):
        if line.strip():
            yield hash_to_bytes(line.strip())


@cli.command(name="iter-revisions")
@click.argument("filename")
@click.option(
    "-a",
    "--track-all",
    default=True,
    type=bool,
    help="""Index all occurrences of files in the development history.""",
)
@click.option(
    "-f",
    "--flatten",
    default=True,
    type=bool,
    help="""Create flat models for directories in the isochrone frontier.""",
)
@click.option(
    "-l",
    "--limit",
    type=int,
    help="""Limit the amount of entries (revisions) to read from the input file.""",
)
@click.option(
    "-m",
    "--min-depth",
    default=1,
    type=int,
    help="""Set minimum depth (in the directory tree) at which an isochrone """
    """frontier can be defined.""",
)
@click.option(
    "-r",
    "--reuse",
    default=True,
    type=bool,
    help="""Prioritize the usage of previously defined isochrone frontiers """
    """whenever possible.""",
)
@click.option(
    "-s",
    "--min-size",
    default=0,
    type=int,
    help="""Set the minimum size (in bytes) of files to be indexed. """
    """Any smaller file will be ignored.""",
)
@click.pass_context
def iter_revisions(
    ctx: click.core.Context,
    filename: str,
    track_all: bool,
    flatten: bool,
    limit: Optional[int],
    min_depth: int,
    reuse: bool,
    min_size: int,
) -> None:
    """Process a provided list of revisions."""
    from . import get_archive, get_provenance
    from .revision import CSVRevisionIterator, revision_add

    archive = get_archive(**ctx.obj["config"]["provenance"]["archive"])

    revisions_provider = generate_revision_tuples(filename)
    revisions = CSVRevisionIterator(revisions_provider, limit=limit)

    with get_provenance(**ctx.obj["config"]["provenance"]["storage"]) as provenance:
        for revision in revisions:
            revision_add(
                provenance,
                archive,
                [revision],
                trackall=track_all,
                flatten=flatten,
                lower=reuse,
                mindepth=min_depth,
                minsize=min_size,
            )


def generate_revision_tuples(
    filename: str,
) -> Generator[Tuple[Sha1Git, datetime, Sha1Git], None, None]:
    for line in open(filename, "r"):
        if line.strip():
            revision, date, root = line.strip().split(",")
            yield (
                hash_to_bytes(revision),
                iso8601.parse_date(date, default_timezone=timezone.utc),
                hash_to_bytes(root),
            )
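
Both `iter-revisions` and the new `revision from-csv` subcommand feed their input through generate_revision_tuples, which expects one revision,date,root triple per line. A minimal sketch of that format and its parsing; the hashes below are fabricated for illustration:

    # Fabricated example line for the revision CSV input, parsed exactly as
    # generate_revision_tuples() does above.
    from datetime import timezone

    import iso8601

    from swh.model.hashutil import hash_to_bytes

    line = (
        "c07b04c6fa08dcfd7e0b64ed4f5db6a3f53ce49e,"  # revision sha1_git (hex)
        "2021-05-17T08:34:56+02:00,"                 # revision date (ISO 8601)
        "b213c7393ceb47da91ae86c90373c466c1f92bcb"   # root directory sha1_git (hex)
    )
    revision, date, root = line.strip().split(",")
    entry = (
        hash_to_bytes(revision),
        iso8601.parse_date(date, default_timezone=timezone.utc),
        hash_to_bytes(root),
    )
    assert len(entry[0]) == len(entry[2]) == 20  # sha1_git digests are 20 bytes
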
@cli.command(name="iter-origins")
@click.argument("filename")
@click.option(
    "-l",
    "--limit",
    type=int,
    help="""Limit the amount of entries (origins) to read from the input file.""",
)
@click.pass_context
@deprecated(version="0.0.1", reason="Use `swh provenance origin from-csv` instead")
def iter_origins(ctx: click.core.Context, filename: str, limit: Optional[int]) -> None:
    """Process a provided list of origins."""
    from . import get_archive, get_provenance
    from .origin import CSVOriginIterator, origin_add

    archive = get_archive(**ctx.obj["config"]["provenance"]["archive"])

    origins_provider = generate_origin_tuples(filename)
    origins = CSVOriginIterator(origins_provider, limit=limit)

    with get_provenance(**ctx.obj["config"]["provenance"]["storage"]) as provenance:
        for origin in origins:
            origin_add(provenance, archive, [origin])


def generate_origin_tuples(filename: str) -> Generator[Tuple[str, bytes], None, None]:
    for line in open(filename, "r"):
        if line.strip():
            url, snapshot = line.strip().split(",")
            yield (url, hash_to_bytes(snapshot))


@cli.command(name="find-first")
@click.argument("swhid")
@click.pass_context
def find_first(ctx: click.core.Context, swhid: str) -> None:
    """Find first occurrence of the requested blob."""
    from . import get_provenance

    with get_provenance(**ctx.obj["config"]["provenance"]["storage"]) as provenance:
        occur = provenance.content_find_first(hash_to_bytes(swhid))
        if occur is not None:
            print(
                f"swh:1:cnt:{hash_to_hex(occur.content)}, "
                f"swh:1:rev:{hash_to_hex(occur.revision)}, "
                f"{occur.date}, "
                f"{occur.origin}, "
                f"{os.fsdecode(occur.path)}"
            )
        else:
            print(f"Cannot find a content with the id {swhid}")


@cli.command(name="find-all")
@click.argument("swhid")
@click.option(
    "-l", "--limit", type=int, help="""Limit the amount results to be retrieved."""
)
@click.pass_context
def find_all(ctx: click.core.Context, swhid: str, limit: Optional[int]) -> None:
    """Find all occurrences of the requested blob."""
    from . import get_provenance

    with get_provenance(**ctx.obj["config"]["provenance"]["storage"]) as provenance:
        for occur in provenance.content_find_all(hash_to_bytes(swhid), limit=limit):
            print(
                f"swh:1:cnt:{hash_to_hex(occur.content)}, "
                f"swh:1:rev:{hash_to_hex(occur.revision)}, "
                f"{occur.date}, "
                f"{occur.origin}, "
                f"{os.fsdecode(occur.path)}"
            )
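
The new `revision from-journal` subcommand reads its Kafka settings from the `journal_client` section of the configuration file and forces `object_types` to `["revision"]` itself, so that key does not need to appear in the file (the test change below likewise drops it from the origin test config). A sketch of such a file, mirroring the structure of DEFAULT_CONFIG; the broker, consumer group, and database values are placeholders, not real endpoints:

    # Sketch: write a configuration file usable by the new subcommand.
    import yaml

    cfg = {
        "journal_client": {
            "cls": "kafka",
            "brokers": ["kafka.example.org:9092"],  # placeholder broker
            "group_id": "swh-provenance-revision",  # placeholder consumer group
            "prefix": "swh.journal.objects",
        },
        "provenance": {
            "archive": {
                "cls": "api",
                "storage": {"cls": "remote", "url": "http://storage.example.org:5002"},
            },
            "storage": {
                "cls": "postgresql",
                "db": {"host": "localhost", "dbname": "provenance"},
            },
        },
    }

    with open("provenance.yml", "w") as f:
        yaml.dump(cfg, f)

    # Then, for example:
    #   SWH_CONFIG_FILENAME=provenance.yml swh provenance revision from-journal
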
diff --git a/swh/provenance/journal_client.py b/swh/provenance/journal_client.py
index f17a139..4240256 100644
--- a/swh/provenance/journal_client.py
+++ b/swh/provenance/journal_client.py
@@ -1,24 +1,46 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

+from swh.model.model import TimestampWithTimezone
from swh.provenance.interface import ProvenanceInterface
-from swh.provenance.model import OriginEntry
+from swh.provenance.model import OriginEntry, RevisionEntry
from swh.provenance.origin import origin_add
+from swh.provenance.revision import revision_add
from swh.storage.interface import StorageInterface


def process_journal_origins(
    messages, *, provenance: ProvenanceInterface, archive: StorageInterface, **cfg
) -> None:
    """Worker function for `JournalClient.process(worker_fn)`."""
    assert set(messages) == {"origin_visit_status"}, set(messages)
    origin_entries = [
        OriginEntry(url=visit["origin"], snapshot=visit["snapshot"])
        for visit in messages["origin_visit_status"]
        if visit["snapshot"] is not None
    ]
    if origin_entries:
        with provenance:
            origin_add(provenance, archive, origin_entries, **cfg)
+
+
+def process_journal_revisions(
+    messages, *, provenance: ProvenanceInterface, archive: StorageInterface, **cfg
+) -> None:
+    """Worker function for `JournalClient.process(worker_fn)`."""
+    assert set(messages) == {"revision"}, set(messages)
+    revisions = [
+        RevisionEntry(
+            id=rev["id"],
+            date=TimestampWithTimezone.from_dict(rev["date"]).to_datetime(),
+            root=rev["directory"],
+            parents=rev["parents"],
+        )
+        for rev in messages["revision"]
+        if rev["date"] is not None
+    ]
+    if revisions:
+        with provenance:
+            revision_add(provenance, archive, revisions, **cfg)
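
For context, a sketch of the batch shape process_journal_revisions receives from `JournalClient.process(worker_fn)`: a dict keyed by object type, holding revision model dicts. All values below are fabricated, and the exact form of the date dict depends on the swh.model version in use:

    # Fabricated example batch, shaped like the `messages` argument above.
    messages = {
        "revision": [
            {
                "id": bytes.fromhex("c07b04c6fa08dcfd7e0b64ed4f5db6a3f53ce49e"),
                "date": {
                    "timestamp": {"seconds": 1621233296, "microseconds": 0},
                    "offset_bytes": b"+0200",  # assumes a swh.model with offset_bytes
                },
                "directory": bytes.fromhex("b213c7393ceb47da91ae86c90373c466c1f92bcb"),
                "parents": [],
            },
            # Undated revisions are dropped by the `if rev["date"] is not None` guard.
            {"id": b"\x00" * 20, "date": None, "directory": b"\x01" * 20, "parents": []},
        ]
    }

    dated = [rev for rev in messages["revision"] if rev["date"] is not None]
    assert len(dated) == 1
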
"postgresql", + "db": provenance_postgresql.get_dsn_parameters(), + }, + }, + } + config_path = write_configuration_path(cfg, tmp_path) + + revisions = [rev["id"] for rev in data["revision"]] + result = provenance.storage.revision_get(revisions) + assert not result + + # call the cli 'swh provenance revision from-journal' + cli_result = invoke(["revision", "from-journal"], config_path) + assert cli_result.exit_code == 0, f"Unexpected result: {result.output}" + + result = provenance.storage.revision_get(revisions) + + assert set(result.keys()) == set(revisions)