diff --git a/swh/provenance/tests/test_journal_client.py b/swh/provenance/tests/test_journal_client.py
index 33c11d5..a61db46 100644
--- a/swh/provenance/tests/test_journal_client.py
+++ b/swh/provenance/tests/test_journal_client.py
@@ -1,133 +1,134 @@
 # Copyright (C) 2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from typing import Dict
 
 from confluent_kafka import Consumer
 import pytest
 
 from swh.model.hashutil import MultiHash
 from swh.storage.interface import StorageInterface
 
 from .utils import fill_storage, invoke, load_repo_data
 
 
 @pytest.fixture
 def swh_storage_backend_config(swh_storage_backend_config, kafka_server, kafka_prefix):
     writer_config = {
         "cls": "kafka",
         "brokers": [kafka_server],
         "client_id": "kafka_writer",
         "prefix": kafka_prefix,
         "anonymize": False,
+        "auto_flush": False,
     }
     yield {**swh_storage_backend_config, "journal_writer": writer_config}
 
 
 @pytest.mark.origin_layer
 @pytest.mark.kafka
 def test_cli_origin_from_journal_client(
     swh_storage: StorageInterface,
     swh_storage_backend_config: Dict,
     kafka_prefix: str,
     kafka_server: str,
     consumer: Consumer,
     provenance,
     postgres_provenance,
 ) -> None:
     """Test origin journal client cli"""
 
     # Prepare storage data
     data = load_repo_data("cmdbts2")
     assert len(data["origin"]) >= 1
     origin_url = data["origin"][0]["url"]
     fill_storage(swh_storage, data)
 
     # Prepare configuration for cli call
     swh_storage_backend_config.pop("journal_writer", None)  # no need for that config
     storage_config_dict = swh_storage_backend_config
     cfg = {
         "journal_client": {
             "cls": "kafka",
             "brokers": [kafka_server],
             "group_id": "toto",
             "prefix": kafka_prefix,
             "stop_on_eof": True,
         },
         "provenance": {
             "archive": {
                 "cls": "api",
                 "storage": storage_config_dict,
             },
             "storage": {
                 "cls": "postgresql",
                 "db": postgres_provenance.get_dsn_parameters(),
             },
         },
     }
 
     # call the cli 'swh provenance origin from-journal'
     cli_result = invoke(["origin", "from-journal"], config=cfg)
     assert cli_result.exit_code == 0, f"Unexpected result: {cli_result.output}"
 
     origin_sha1 = MultiHash.from_data(
         origin_url.encode(), hash_names=["sha1"]
     ).digest()["sha1"]
     actual_result = provenance.storage.origin_get([origin_sha1])
     assert actual_result == {origin_sha1: origin_url}
 
 
 @pytest.mark.kafka
 def test_cli_revision_from_journal_client(
     swh_storage: StorageInterface,
     swh_storage_backend_config: Dict,
     kafka_prefix: str,
     kafka_server: str,
     consumer: Consumer,
     provenance,
     postgres_provenance,
 ) -> None:
     """Test revision journal client cli"""
 
     # Prepare storage data
     data = load_repo_data("cmdbts2")
     assert len(data["origin"]) >= 1
     fill_storage(swh_storage, data)
 
     # Prepare configuration for cli call
     swh_storage_backend_config.pop("journal_writer", None)  # no need for that config
     storage_config_dict = swh_storage_backend_config
     cfg = {
         "journal_client": {
             "cls": "kafka",
             "brokers": [kafka_server],
             "group_id": "toto",
             "prefix": kafka_prefix,
             "stop_on_eof": True,
         },
         "provenance": {
             "archive": {
                 "cls": "api",
                 "storage": storage_config_dict,
             },
             "storage": {
                 "cls": "postgresql",
                 "db": postgres_provenance.get_dsn_parameters(),
             },
         },
     }
 
     revisions = [rev["id"] for rev in data["revision"]]
     result = provenance.storage.revision_get(revisions)
     assert not result
 
     # call the cli 'swh provenance revision from-journal'
     cli_result = invoke(["revision", "from-journal"], config=cfg)
     assert cli_result.exit_code == 0, f"Unexpected result: {cli_result.output}"
 
     result = provenance.storage.revision_get(revisions)
     assert set(result.keys()) == set(revisions)
diff --git a/swh/provenance/tests/test_replay.py b/swh/provenance/tests/test_replay.py
index 6253202..8b4f544 100644
--- a/swh/provenance/tests/test_replay.py
+++ b/swh/provenance/tests/test_replay.py
@@ -1,170 +1,171 @@
 # Copyright (C) 2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import functools
 from typing import Dict
 
 import psycopg2
 import pytest
 
 from swh.journal.client import JournalClient
 from swh.provenance.algos.revision import revision_add
 from swh.provenance.archive.interface import ArchiveInterface
 from swh.provenance.model import RevisionEntry
 from swh.provenance.provenance import Provenance
 from swh.provenance.storage import get_provenance_storage
 from swh.provenance.storage.interface import (
     EntityType,
     ProvenanceStorageInterface,
     RelationType,
 )
 from swh.provenance.storage.replay import (
     ProvenanceObjectDeserializer,
     process_replay_objects,
 )
 
 from .utils import fill_storage, load_repo_data, ts2dt
 
 
 @pytest.fixture(scope="function")
 def object_types():
     """Set of object types to precreate topics for."""
     return {
         # objects
         "revision",
         "directory",
         "content",
         "location",
         # relations
         "content_in_revision",
         "content_in_directory",
         "directory_in_revision",
     }
 
 
 @pytest.fixture()
 def replayer_storage_and_client(
     provenance_postgresqldb: Dict[str, str],
     kafka_prefix: str,
     kafka_consumer_group: str,
     kafka_server: str,
 ):
     cfg = {
         "storage": {
             "cls": "postgresql",
             "db": provenance_postgresqldb,
             "raise_on_commit": True,
         },
         "journal_writer": {
             "cls": "kafka",
             "brokers": [kafka_server],
             "client_id": "kafka_writer",
             "prefix": kafka_prefix,
             "anonymize": False,
+            "auto_flush": False,
         },
     }
     with get_provenance_storage(cls="journal", **cfg) as storage:
         deserializer = ProvenanceObjectDeserializer()
 
         replayer = JournalClient(
             brokers=kafka_server,
             group_id=kafka_consumer_group,
             prefix=kafka_prefix,
             stop_on_eof=True,
             value_deserializer=deserializer.convert,
         )
 
         yield storage, replayer
 
 
 @pytest.fixture()
 def secondary_db(provenance_postgresqldb: Dict[str, str]):
     """Create a new test db
 
     The new db is named after the db configured in provenance_postgresqldb
     and uses the same template as the latter.
     """
     dsn = provenance_postgresqldb.copy()
     conn = psycopg2.connect(
         dbname="postgres",
         user=dsn["user"],
         password=dsn.get("password"),
         host=dsn["host"],
         port=dsn["port"],
     )
     conn.autocommit = True
     with conn.cursor() as cur:
         dbname = dsn["dbname"]
         template_name = f"{dbname}_tmpl"
         secondary_dbname = f"{dbname}_dst"
         cur.execute(f'CREATE DATABASE "{secondary_dbname}" TEMPLATE "{template_name}"')
     try:
         dsn["dbname"] = secondary_dbname
         yield dsn
     finally:
         with conn.cursor() as cur:
             cur.execute(f'DROP DATABASE "{secondary_dbname}"')
 
 
 @pytest.mark.kafka
 @pytest.mark.parametrize(
     "repo",
     (
         "cmdbts2",
         "out-of-order",
         "with-merges",
     ),
 )
 def test_provenance_replayer(
     provenance_storage: ProvenanceStorageInterface,
     archive: ArchiveInterface,
     replayer_storage_and_client,
     secondary_db,
     repo: str,
 ):
     """Optimal replayer scenario.
 
     This:
     - writes objects to a provenance storage (which has a journal writer)
     - the replayer consumes objects from the topic and replays them
     - a destination provenance storage is filled from this
 
     In the end, both storages should have the same content.
""" # load test data and fill a swh-storage data = load_repo_data(repo) fill_storage(archive.storage, data) prov_sto_src, replayer = replayer_storage_and_client # Fill Kafka by filling the source provenance storage revisions = [ RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) for revision in data["revision"] ] revision_add(Provenance(prov_sto_src), archive, revisions) # now replay the kafka log in a new provenance storage with get_provenance_storage( cls="postgresql", db=secondary_db, raise_on_commit=True ) as prov_sto_dst: worker_fn = functools.partial(process_replay_objects, storage=prov_sto_dst) replayer.process(worker_fn) compare_provenance_storages(prov_sto_src, prov_sto_dst) def compare_provenance_storages(sto1, sto2): entities1 = {etype: sto1.entity_get_all(etype) for etype in EntityType} entities2 = {etype: sto2.entity_get_all(etype) for etype in EntityType} assert entities1 == entities2 relations1 = {rtype: sto1.relation_get_all(rtype) for rtype in RelationType} relations2 = {rtype: sto2.relation_get_all(rtype) for rtype in RelationType} assert relations1 == relations2