diff --git a/swh/provenance/storage/replay.py b/swh/provenance/storage/replay.py new file mode 100644 --- /dev/null +++ b/swh/provenance/storage/replay.py @@ -0,0 +1,107 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import logging +from typing import Any, Callable, Dict, List, Optional, Tuple + +try: + from systemd.daemon import notify +except ImportError: + notify = None + +from swh.core.statsd import statsd +from swh.journal.serializers import kafka_to_value +from swh.provenance.storage.interface import ( + DirectoryData, + RelationData, + RelationType, + RevisionData, +) + +from .interface import ProvenanceStorageInterface + +logger = logging.getLogger(__name__) + +REPLAY_OPERATIONS_METRIC = "swh_provenance_replayer_operations_total" +REPLAY_DURATION_METRIC = "swh_provenance_replayer_duration_seconds" + + +def cvrt_directory(msg_d): + return (msg_d["id"], DirectoryData(**msg_d["value"])) + + +def cvrt_revision(msg_d): + return (msg_d["id"], RevisionData(**msg_d["value"])) + + +def cvrt_default(msg_d): + return (msg_d["id"], msg_d["value"]) + + +def cvrt_relation(msg_d): + return (msg_d["id"], {RelationData(**v) for v in msg_d["value"]}) + + +OBJECT_CONVERTERS: Dict[str, Callable[[Dict], Tuple[bytes, Any]]] = { + "directory": cvrt_directory, + "revision": cvrt_revision, + "content": cvrt_default, + "location": cvrt_default, + "content_in_revision": cvrt_relation, + "content_in_directory": cvrt_relation, + "directory_in_revision": cvrt_relation, +} + + +class ProvenanceObjectDeserializer: + def __init__( + self, + raise_on_error: bool = False, + reporter: Optional[Callable[[str, bytes], None]] = None, + ): + self.reporter = reporter + self.raise_on_error = raise_on_error + + def convert(self, object_type: str, msg: bytes) -> Optional[Tuple[bytes, Any]]: + dict_repr = kafka_to_value(msg) + obj = OBJECT_CONVERTERS[object_type](dict_repr) + return obj + + def report_failure(self, msg: bytes, obj: Dict): + if self.reporter: + self.reporter(obj["id"].hex(), msg) + + +def process_replay_objects( + all_objects: Dict[str, List[Tuple[bytes, Any]]], + *, + storage: ProvenanceStorageInterface, +) -> None: + for object_type, objects in all_objects.items(): + logger.debug("Inserting %s %s objects", len(objects), object_type) + with statsd.timed(REPLAY_DURATION_METRIC, tags={"object_type": object_type}): + _insert_objects(object_type, objects, storage) + statsd.increment( + REPLAY_OPERATIONS_METRIC, len(objects), tags={"object_type": object_type} + ) + if notify: + notify("WATCHDOG=1") + + +def _insert_objects( + object_type: str, + objects: List[Tuple[bytes, Any]], + storage: ProvenanceStorageInterface, +) -> None: + """Insert objects of type object_type in the storage.""" + if object_type not in OBJECT_CONVERTERS: + logger.warning("Received a series of %s, this should not happen", object_type) + return + + data = dict(objects) + if "_in_" in object_type: + storage.relation_add(relation=RelationType(object_type), data=data) + else: + getattr(storage, f"{object_type}_add")(data) diff --git a/swh/provenance/tests/test_replay.py b/swh/provenance/tests/test_replay.py new file mode 100644 --- /dev/null +++ b/swh/provenance/tests/test_replay.py @@ -0,0 +1,170 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import functools +from typing import Dict + +import psycopg2 +import pytest + +from swh.journal.client import JournalClient +from swh.provenance.algos.revision import revision_add +from swh.provenance.archive.interface import ArchiveInterface +from swh.provenance.model import RevisionEntry +from swh.provenance.provenance import Provenance +from swh.provenance.storage import get_provenance_storage +from swh.provenance.storage.interface import ( + EntityType, + ProvenanceStorageInterface, + RelationType, +) +from swh.provenance.storage.replay import ( + ProvenanceObjectDeserializer, + process_replay_objects, +) + +from .conftest import fill_storage, load_repo_data, ts2dt + + +@pytest.fixture(scope="function") +def object_types(): + """Set of object types to precreate topics for.""" + return { + # objects + "revision", + "directory", + "content", + "location", + # relations + "content_in_revision", + "content_in_directory", + "directory_in_revision", + } + + +@pytest.fixture() +def replayer_storage_and_client( + provenance_postgresqldb: Dict[str, str], + kafka_prefix: str, + kafka_consumer_group: str, + kafka_server: str, +): + cfg = { + "storage": { + "cls": "postgresql", + "db": provenance_postgresqldb, + "raise_on_commit": True, + }, + "journal_writer": { + "cls": "kafka", + "brokers": [kafka_server], + "client_id": "kafka_writer", + "prefix": kafka_prefix, + "anonymize": False, + }, + } + with get_provenance_storage(cls="journal", **cfg) as storage: + deserializer = ProvenanceObjectDeserializer() + replayer = JournalClient( + brokers=kafka_server, + group_id=kafka_consumer_group, + prefix=kafka_prefix, + stop_on_eof=True, + value_deserializer=deserializer.convert, + ) + + yield storage, replayer + + +@pytest.fixture() +def secondary_db(provenance_postgresqldb: Dict[str, str]): + """Create a new test db + + the new db is named after the db configured in provenance_postgresqldb and + is using the same template as this later. + """ + dsn = provenance_postgresqldb.copy() + conn = psycopg2.connect( + dbname="postgres", + user=dsn["user"], + password=dsn.get("password"), + host=dsn["host"], + port=dsn["port"], + ) + conn.autocommit = True + with conn.cursor() as cur: + dbname = dsn["dbname"] + template_name = f"{dbname}_tmpl" + secondary_dbname = f"{dbname}_dst" + cur.execute(f'CREATE DATABASE "{secondary_dbname}" TEMPLATE "{template_name}"') + try: + dsn["dbname"] = secondary_dbname + yield dsn + finally: + with conn.cursor() as cur: + cur.execute(f'DROP DATABASE "{secondary_dbname}"') + + +@pytest.mark.kafka +@pytest.mark.parametrize( + "repo", + ( + "cmdbts2", + "out-of-order", + "with-merges", + ), +) +def test_provenance_replayer( + provenance_storage: ProvenanceStorageInterface, + archive: ArchiveInterface, + replayer_storage_and_client, + secondary_db, + repo: str, +): + """Optimal replayer scenario. + + This: + - writes objects to a provenance storage (which have a journal writer) + - replayer consumes objects from the topic and replays them + - a destination provenance storage is filled from this + + In the end, both storages should have the same content. + """ + # load test data and fill a swh-storage + data = load_repo_data(repo) + fill_storage(archive.storage, data) + + prov_sto_src, replayer = replayer_storage_and_client + + # Fill Kafka by filling the source provenance storage + revisions = [ + RevisionEntry( + id=revision["id"], + date=ts2dt(revision["date"]), + root=revision["directory"], + ) + for revision in data["revision"] + ] + + revision_add(Provenance(prov_sto_src), archive, revisions) + + # now replay the kafka log in a new provenance storage + with get_provenance_storage( + cls="postgresql", db=secondary_db, raise_on_commit=True + ) as prov_sto_dst: + worker_fn = functools.partial(process_replay_objects, storage=prov_sto_dst) + replayer.process(worker_fn) + + compare_provenance_storages(prov_sto_src, prov_sto_dst) + + +def compare_provenance_storages(sto1, sto2): + entities1 = {etype: sto1.entity_get_all(etype) for etype in EntityType} + entities2 = {etype: sto2.entity_get_all(etype) for etype in EntityType} + assert entities1 == entities2 + + relations1 = {rtype: sto1.relation_get_all(rtype) for rtype in RelationType} + relations2 = {rtype: sto2.relation_get_all(rtype) for rtype in RelationType} + assert relations1 == relations2