diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py
--- a/swh/provenance/cli.py
+++ b/swh/provenance/cli.py
@@ -141,6 +141,113 @@
     atexit.register(exit)
 
 
+@cli.command(name="replay")
+@click.option(
+    "--stop-after-objects",
+    "-n",
+    default=None,
+    type=int,
+    help="Stop after processing this many objects. Default is to run forever.",
+)
+@click.option(
+    "--type",
+    "-t",
+    "object_types",
+    default=[],
+    type=click.Choice(
+        [
+            "content",
+            "directory",
+            "revision",
+            "location",
+            "content_in_revision",
+            "content_in_directory",
+            "directory_in_revision",
+        ]
+    ),
+    help="Object types to replay",
+    multiple=True,
+)
+@click.pass_context
+def replay(ctx: click.core.Context, stop_after_objects, object_types):
+    """Fill a ProvenanceStorage by reading a Journal.
+
+    This is typically used to replicate a Provenance database: it reads the
+    Software Heritage kafka journal to retrieve objects of the Software
+    Heritage provenance storage and feeds them to a replica provenance
+    storage. There can be several 'replayers' filling a ProvenanceStorage as
+    long as they use the same `group_id`.
+
+    The expected configuration file should have one 'provenance' section with
+    two subsections:
+
+    - storage: the configuration of the provenance storage in which to add
+      objects received from the kafka journal,
+
+    - journal_client: the configuration of access to the kafka journal. See
+      the documentation of `swh.journal` for more details on the possible
+      configuration entries in this section:
+
+      https://docs.softwareheritage.org/devel/apidoc/swh.journal.client.html
+
+    e.g.::
+
+        provenance:
+          storage:
+            cls: postgresql
+            db:
+              [...]
+          journal_client:
+            cls: kafka
+            prefix: swh.journal.provenance
+            brokers: [...]
+            [...]
+    """
+    import functools
+
+    from swh.journal.client import get_journal_client
+    from swh.provenance.storage import get_provenance_storage
+    from swh.provenance.storage.replay import (
+        ProvenanceObjectDeserializer,
+        process_replay_objects,
+    )
+
+    conf = ctx.obj["config"]["provenance"]
+    storage = get_provenance_storage(**conf.pop("storage"))
+
+    client_cfg = conf.pop("journal_client")
+
+    deserializer = ProvenanceObjectDeserializer()
+
+    client_cfg["value_deserializer"] = deserializer.convert
+    if object_types:
+        client_cfg["object_types"] = object_types
+    if stop_after_objects:
+        client_cfg["stop_after_objects"] = stop_after_objects
+
+    try:
+        client = get_journal_client(**client_cfg)
+    except ValueError as exc:
+        ctx.fail(str(exc))
+
+    worker_fn = functools.partial(process_replay_objects, storage=storage)
+
+    if notify:
+        notify("READY=1")
+
+    try:
+        with storage:
+            n = client.process(worker_fn)
+    except KeyboardInterrupt:
+        ctx.exit(0)
+    else:
+        print(f"Done, processed {n} messages")
+    finally:
+        if notify:
+            notify("STOPPING=1")
+        client.close()
+
+
 @cli.group(name="origin")
 @click.pass_context
 def origin(ctx: click.core.Context):
diff --git a/swh/provenance/storage/interface.py b/swh/provenance/storage/interface.py
--- a/swh/provenance/storage/interface.py
+++ b/swh/provenance/storage/interface.py
@@ -49,7 +49,7 @@
     already been created.
     """
 
-    date: datetime
+    date: Optional[datetime]
    flat: bool
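
Note on the interface change: with `date` now `Optional[datetime]`, a directory row can be recorded before its isochrone-frontier date is resolved, which is what the replayer relies on when journal messages arrive dateless first (see the test below). A minimal sketch of the two states, assuming `DirectoryData` is constructed directly (values are illustrative)::

    from datetime import datetime, timezone

    from swh.provenance.storage.interface import DirectoryData

    # First sighting of a directory in the journal: no date resolved yet.
    pending = DirectoryData(date=None, flat=False)

    # A later message carries the resolved date; a fresh value replaces it.
    dated = DirectoryData(date=datetime.now(timezone.utc), flat=pending.flat)
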
""" - date: datetime + date: Optional[datetime] flat: bool diff --git a/swh/provenance/storage/rabbitmq/server.py b/swh/provenance/storage/rabbitmq/server.py --- a/swh/provenance/storage/rabbitmq/server.py +++ b/swh/provenance/storage/rabbitmq/server.py @@ -74,6 +74,8 @@ for sha1, dir in data: known = result.setdefault(sha1, dir) value = known + assert dir.date is not None + assert known.date is not None if dir.date < known.date: value = DirectoryData(date=dir.date, flat=value.flat) if dir.flat: diff --git a/swh/provenance/tests/test_cli.py b/swh/provenance/tests/test_cli.py --- a/swh/provenance/tests/test_cli.py +++ b/swh/provenance/tests/test_cli.py @@ -3,10 +3,14 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from datetime import datetime, timezone +import logging +import re from typing import Dict, List from _pytest.monkeypatch import MonkeyPatch from click.testing import CliRunner +from confluent_kafka import Producer import psycopg2.extensions import pytest @@ -14,13 +18,21 @@ import swh.core.cli.db # noqa ; ensure cli is loaded from swh.core.db import BaseDb from swh.core.db.db_utils import init_admin_extensions +from swh.journal.serializers import key_to_kafka, value_to_kafka from swh.model.hashutil import MultiHash import swh.provenance.cli # noqa ; ensure cli is loaded +from swh.provenance.storage.interface import EntityType, RelationType from swh.provenance.tests.conftest import fill_storage, load_repo_data from swh.storage.interface import StorageInterface from .conftest import get_datafile -from .test_utils import invoke, write_configuration_path +from .test_utils import invoke + +logger = logging.getLogger(__name__) + + +def now(): + return datetime.now(timezone.utc) def test_cli_swh_db_help() -> None: @@ -144,12 +156,10 @@ }, } - config_path = write_configuration_path(cfg, tmp_path) - csv_filepath = get_datafile("origins.csv") subcommand = subcommand + [csv_filepath] - result = invoke(subcommand, config_path) + result = invoke(subcommand, config=cfg) assert result.exit_code == 0, f"Unexpected result: {result.output}" origin_sha1 = MultiHash.from_data( @@ -158,3 +168,128 @@ actual_result = provenance.storage.origin_get([origin_sha1]) assert actual_result == {origin_sha1: origin_url} + + +@pytest.mark.kafka +def test_replay( + provenance_storage, + provenance_postgresqldb, + kafka_prefix: str, + kafka_consumer_group: str, + kafka_server: str, +): + kafka_prefix += ".swh.journal.provenance" + + producer = Producer( + { + "bootstrap.servers": kafka_server, + "client.id": "test-producer", + "acks": "all", + } + ) + + for i in range(10): + cntkey = (b"cnt:" + bytes([i])).ljust(20, b"\x00") + producer.produce( + topic=kafka_prefix + ".content", + key=key_to_kafka(cntkey), + value=value_to_kafka({"id": cntkey, "value": {"date": None}}), + ) + dirkey = (b"dir:" + bytes([i])).ljust(20, b"\x00") + producer.produce( + topic=kafka_prefix + ".directory", + key=key_to_kafka(dirkey), + value=value_to_kafka( + {"id": dirkey, "value": {"date": None, "flat": False}} + ), + ) + revkey = (b"rev:" + bytes([i])).ljust(20, b"\x00") + producer.produce( + topic=kafka_prefix + ".revision", + key=key_to_kafka(revkey), + value=value_to_kafka( + {"id": revkey, "value": {"date": None, "origin": None}} + ), + ) + loc = f"dir/{i}".encode() + lockey = (b"loc:" + bytes([i])).ljust(20, b"\x00") + producer.produce( + topic=kafka_prefix + ".location", + key=key_to_kafka(lockey), + value=value_to_kafka({"id": lockey, "value": loc}), + ) 
diff --git a/swh/provenance/tests/test_cli.py b/swh/provenance/tests/test_cli.py
--- a/swh/provenance/tests/test_cli.py
+++ b/swh/provenance/tests/test_cli.py
@@ -3,10 +3,14 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+from datetime import datetime, timezone
+import logging
+import re
 from typing import Dict, List
 
 from _pytest.monkeypatch import MonkeyPatch
 from click.testing import CliRunner
+from confluent_kafka import Producer
 import psycopg2.extensions
 import pytest
 
@@ -14,13 +18,21 @@
 import swh.core.cli.db  # noqa ; ensure cli is loaded
 from swh.core.db import BaseDb
 from swh.core.db.db_utils import init_admin_extensions
+from swh.journal.serializers import key_to_kafka, value_to_kafka
 from swh.model.hashutil import MultiHash
 import swh.provenance.cli  # noqa ; ensure cli is loaded
+from swh.provenance.storage.interface import EntityType, RelationType
 from swh.provenance.tests.conftest import fill_storage, load_repo_data
 from swh.storage.interface import StorageInterface
 
 from .conftest import get_datafile
-from .test_utils import invoke, write_configuration_path
+from .test_utils import invoke
+
+logger = logging.getLogger(__name__)
+
+
+def now():
+    return datetime.now(timezone.utc)
 
 
 def test_cli_swh_db_help() -> None:
@@ -144,12 +156,10 @@
         },
     }
 
-    config_path = write_configuration_path(cfg, tmp_path)
-
     csv_filepath = get_datafile("origins.csv")
     subcommand = subcommand + [csv_filepath]
 
-    result = invoke(subcommand, config_path)
+    result = invoke(subcommand, config=cfg)
     assert result.exit_code == 0, f"Unexpected result: {result.output}"
 
     origin_sha1 = MultiHash.from_data(
@@ -158,3 +168,128 @@
 
     actual_result = provenance.storage.origin_get([origin_sha1])
     assert actual_result == {origin_sha1: origin_url}
+
+
+@pytest.mark.kafka
+def test_replay(
+    provenance_storage,
+    provenance_postgresqldb,
+    kafka_prefix: str,
+    kafka_consumer_group: str,
+    kafka_server: str,
+):
+    kafka_prefix += ".swh.journal.provenance"
+
+    producer = Producer(
+        {
+            "bootstrap.servers": kafka_server,
+            "client.id": "test-producer",
+            "acks": "all",
+        }
+    )
+
+    for i in range(10):
+        cntkey = (b"cnt:" + bytes([i])).ljust(20, b"\x00")
+        producer.produce(
+            topic=kafka_prefix + ".content",
+            key=key_to_kafka(cntkey),
+            value=value_to_kafka({"id": cntkey, "value": {"date": None}}),
+        )
+        dirkey = (b"dir:" + bytes([i])).ljust(20, b"\x00")
+        producer.produce(
+            topic=kafka_prefix + ".directory",
+            key=key_to_kafka(dirkey),
+            value=value_to_kafka(
+                {"id": dirkey, "value": {"date": None, "flat": False}}
+            ),
+        )
+        revkey = (b"rev:" + bytes([i])).ljust(20, b"\x00")
+        producer.produce(
+            topic=kafka_prefix + ".revision",
+            key=key_to_kafka(revkey),
+            value=value_to_kafka(
+                {"id": revkey, "value": {"date": None, "origin": None}}
+            ),
+        )
+        loc = f"dir/{i}".encode()
+        lockey = (b"loc:" + bytes([i])).ljust(20, b"\x00")
+        producer.produce(
+            topic=kafka_prefix + ".location",
+            key=key_to_kafka(lockey),
+            value=value_to_kafka({"id": lockey, "value": loc}),
+        )
+
+        producer.produce(
+            topic=kafka_prefix + ".content_in_revision",
+            key=key_to_kafka(cntkey),
+            value=value_to_kafka(
+                {"id": cntkey, "value": [{"dst": revkey, "path": loc}]}
+            ),
+        )
+        producer.produce(
+            topic=kafka_prefix + ".content_in_directory",
+            key=key_to_kafka(cntkey),
+            value=value_to_kafka(
+                {"id": cntkey, "value": [{"dst": dirkey, "path": loc}]}
+            ),
+        )
+        producer.produce(
+            topic=kafka_prefix + ".directory_in_revision",
+            key=key_to_kafka(dirkey),
+            value=value_to_kafka(
+                {"id": dirkey, "value": [{"dst": revkey, "path": loc}]}
+            ),
+        )
+
+        # now add dates to entities
+        producer.produce(
+            topic=kafka_prefix + ".content",
+            key=key_to_kafka(cntkey),
+            value=value_to_kafka({"id": cntkey, "value": {"date": now()}}),
+        )
+        producer.produce(
+            topic=kafka_prefix + ".directory",
+            key=key_to_kafka(dirkey),
+            value=value_to_kafka(
+                {"id": dirkey, "value": {"date": now(), "flat": False}}
+            ),
+        )
+        producer.produce(
+            topic=kafka_prefix + ".revision",
+            key=key_to_kafka(revkey),
+            value=value_to_kafka(
+                {"id": revkey, "value": {"date": now(), "origin": None}}
+            ),
+        )
+
+    producer.flush()
+    logger.debug("Flushed producer")
+    config = {
+        "provenance": {
+            "storage": {
+                "cls": "postgresql",
+                "db": provenance_postgresqldb,
+            },
+            "journal_client": {
+                "cls": "kafka",
+                "brokers": [kafka_server],
+                "group_id": kafka_consumer_group,
+                "prefix": kafka_prefix,
+                "stop_on_eof": True,
+            },
+        }
+    }
+
+    result = invoke(["replay"], config=config)
+    expected = r"Done, processed 100 messages\n"
+
+    assert result.exit_code == 0, result.output
+    assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+    assert len(provenance_storage.entity_get_all(EntityType.CONTENT)) == 10
+    assert len(provenance_storage.entity_get_all(EntityType.REVISION)) == 10
+    assert len(provenance_storage.entity_get_all(EntityType.DIRECTORY)) == 10
+    assert len(provenance_storage.location_get_all()) == 10
+    assert len(provenance_storage.relation_get_all(RelationType.CNT_EARLY_IN_REV)) == 10
+    assert len(provenance_storage.relation_get_all(RelationType.DIR_IN_REV)) == 10
+    assert len(provenance_storage.relation_get_all(RelationType.CNT_IN_DIR)) == 10
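
Every message produced in this test uses the `{id, value}` envelope that the replayer's `ProvenanceObjectDeserializer` consumes, serialized to msgpack by `value_to_kafka`. A quick round-trip sanity check for one of the relation messages, with the same key layout the test uses::

    from swh.journal.serializers import kafka_to_value, value_to_kafka

    cntkey = (b"cnt:" + bytes([0])).ljust(20, b"\x00")
    revkey = (b"rev:" + bytes([0])).ljust(20, b"\x00")
    msg = {"id": cntkey, "value": [{"dst": revkey, "path": b"dir/0"}]}

    # value_to_kafka/kafka_to_value are inverse msgpack (de)serializers,
    # so the envelope survives the trip through Kafka unchanged.
    assert kafka_to_value(value_to_kafka(msg)) == msg
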
invoke(["revision", "from-journal"], config_path) + cli_result = invoke(["revision", "from-journal"], config=cfg) assert cli_result.exit_code == 0, f"Unexpected result: {result.output}" result = provenance.storage.revision_get(revisions) diff --git a/swh/provenance/tests/test_revision_content_layer.py b/swh/provenance/tests/test_revision_content_layer.py --- a/swh/provenance/tests/test_revision_content_layer.py +++ b/swh/provenance/tests/test_revision_content_layer.py @@ -284,6 +284,7 @@ # check timestamps for rd in synth_rev["R_D"]: dir_data = provenance.storage.directory_get([rd["dst"]])[rd["dst"]] + assert dir_data.date is not None assert rev_ts + rd["rel_ts"] == dir_data.date.timestamp(), synth_rev["msg"] assert dir_data.flat, synth_rev["msg"] diff --git a/swh/provenance/tests/test_utils.py b/swh/provenance/tests/test_utils.py --- a/swh/provenance/tests/test_utils.py +++ b/swh/provenance/tests/test_utils.py @@ -4,8 +4,9 @@ # See top-level LICENSE file for more information -from os.path import join -from typing import Dict, List +import logging +import tempfile +from typing import Dict, List, Optional from click.testing import CliRunner, Result from yaml import safe_dump @@ -13,19 +14,19 @@ from swh.provenance.cli import cli -def invoke(args: List[str], config_path: str, catch_exceptions: bool = False) -> Result: +def invoke( + args: List[str], config: Optional[Dict] = None, catch_exceptions: bool = False +) -> Result: """Invoke swh journal subcommands""" runner = CliRunner() - result = runner.invoke(cli, ["-C" + config_path] + args) - if not catch_exceptions and result.exception: - print(result.output) - raise result.exception + with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd: + if config is not None: + safe_dump(config, config_fd) + config_fd.seek(0) + args = ["-C" + config_fd.name] + args + + result = runner.invoke(cli, args, obj={"log_level": logging.DEBUG}, env=None) + if not catch_exceptions and result.exception: + print(result.output) + raise result.exception return result - - -def write_configuration_path(config: Dict, tmp_path: str) -> str: - """Serialize yaml dict on disk given a configuration dict and and a temporary path.""" - config_path = join(str(tmp_path), "config.yml") - with open(config_path, "w") as f: - f.write(safe_dump(config)) - return config_path