diff --git a/swh/storage/tests/test_kafka_writer.py b/swh/storage/tests/test_kafka_writer.py
index 7e2dcd2a..332c2e95 100644
--- a/swh/storage/tests/test_kafka_writer.py
+++ b/swh/storage/tests/test_kafka_writer.py
@@ -1,60 +1,154 @@
 # Copyright (C) 2018-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from confluent_kafka import Consumer

 from swh.storage import get_storage
 from swh.model.model import Origin, OriginVisit
+from swh.model.hypothesis_strategies import objects

 from swh.journal.pytest_plugin import consume_messages, assert_all_objects_consumed
 from swh.journal.tests.journal_data import TEST_OBJECTS

+from swh.model.model import Person
+from attr import asdict, has
+from hypothesis import given
+from hypothesis.strategies import lists
+

 def test_storage_direct_writer(kafka_prefix: str, kafka_server, consumer: Consumer):
     writer_config = {
         "cls": "kafka",
         "brokers": [kafka_server],
         "client_id": "kafka_writer",
         "prefix": kafka_prefix,
+        "anonymize": False,
     }
     storage_config = {
         "cls": "pipeline",
         "steps": [{"cls": "memory", "journal_writer": writer_config},],
     }

     storage = get_storage(**storage_config)

     expected_messages = 0

-    for object_type, objects in TEST_OBJECTS.items():
-        method = getattr(storage, object_type + "_add")
-        if object_type in (
+    for obj_type, objs in TEST_OBJECTS.items():
+        method = getattr(storage, obj_type + "_add")
+        if obj_type in (
             "content",
             "skipped_content",
             "directory",
             "revision",
             "release",
             "snapshot",
             "origin",
         ):
-            method(objects)
-            expected_messages += len(objects)
-        elif object_type in ("origin_visit",):
-            for obj in objects:
+            method(objs)
+            expected_messages += len(objs)
+        elif obj_type in ("origin_visit",):
+            for obj in objs:
                 assert isinstance(obj, OriginVisit)
                 storage.origin_add_one(Origin(url=obj.origin))
                 visit = method(obj.origin, date=obj.date, type=obj.type)
                 expected_messages += 1

                 obj_d = obj.to_dict()
                 for k in ("visit", "origin", "date", "type"):
                     del obj_d[k]
                 storage.origin_visit_update(obj.origin, visit.visit, **obj_d)
                 expected_messages += 1
         else:
-            assert False, object_type
+            assert False, obj_type
+
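+    # Only the unprivileged topics should have been created here: privileged
+    # topics are named "{kafka_prefix}_privileged.{obj_type}" (see the
+    # anonymized test below), so the trailing "." in the startswith()
+    # filter excludes them.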
+    existing_topics = set(
+        topic
+        for topic in consumer.list_topics(timeout=10).topics.keys()
+        if topic.startswith(f"{kafka_prefix}.")  # final "." to exclude privileged topics
+    )
+    assert existing_topics == {
+        f"{kafka_prefix}.{obj_type}"
+        for obj_type in (
+            "content",
+            "directory",
+            "origin",
+            "origin_visit",
+            "release",
+            "revision",
+            "snapshot",
+            "skipped_content",
+        )
+    }

     consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages)
     assert_all_objects_consumed(consumed_messages)
+
+
+def test_storage_direct_writer_anonymized(
+    kafka_prefix: str, kafka_server, consumer: Consumer
+):
+
+    writer_config = {
+        "cls": "kafka",
+        "brokers": [kafka_server],
+        "client_id": "kafka_writer",
+        "prefix": kafka_prefix,
+        "anonymize": True,
+    }
+    storage_config = {
+        "cls": "pipeline",
+        "steps": [{"cls": "memory", "journal_writer": writer_config},],
+    }
+
+    storage = get_storage(**storage_config)
+
+    expected_messages = 0
+
+    for obj_type, objs in TEST_OBJECTS.items():
+        if obj_type == "origin_visit":
+            # these have an inconsistent API and are unrelated to what we
+            # want to test here
+            continue
+        method = getattr(storage, obj_type + "_add")
+        method(objs)
+        expected_messages += len(objs)
+
+    existing_topics = set(
+        topic
+        for topic in consumer.list_topics(timeout=10).topics.keys()
+        if topic.startswith(kafka_prefix)
+    )
+    assert existing_topics == {
+        f"{kafka_prefix}.{obj_type}"
+        for obj_type in (
+            "content",
+            "directory",
+            "origin",
+            "origin_visit",
+            "release",
+            "revision",
+            "snapshot",
+            "skipped_content",
+        )
+    } | {
+        f"{kafka_prefix}_privileged.{obj_type}" for obj_type in ("release", "revision",)
+    }
+
+
+def check_anonymized_obj(obj):
+    if has(obj):
+        if isinstance(obj, Person):
+            assert obj.name is None
+            assert obj.email is None
+            assert len(obj.fullname) == 32
+        else:
+            for key, value in asdict(obj, recurse=False).items():
+                check_anonymized_obj(value)
+
+
+@given(lists(objects(split_content=True)))
+def test_anonymizer(obj_type_and_objs):
+    for obj_type, obj in obj_type_and_objs:
+        check_anonymized_obj(obj.anonymize())
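For context on the invariants checked by check_anonymized_obj above, here is a
minimal sketch of the anonymization contract it implies for Person, assuming
only what the assertions state (name and email dropped, fullname replaced by a
32-byte value, presumably a digest); the sample fullname is illustrative:

    from swh.model.model import Person

    person = Person(
        fullname=b"Jane Doe <jane@example.com>",
        name=b"Jane Doe",
        email=b"jane@example.com",
    )
    anon = person.anonymize()
    assert anon.name is None           # personal fields are dropped...
    assert anon.email is None
    assert len(anon.fullname) == 32    # ...and fullname becomes an opaque digest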
diff --git a/swh/storage/tests/test_replay.py b/swh/storage/tests/test_replay.py
index 670d7431..7c8622b0 100644
--- a/swh/storage/tests/test_replay.py
+++ b/swh/storage/tests/test_replay.py
@@ -1,269 +1,368 @@
 # Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import datetime
 import functools
 import logging
 from typing import Container, Dict, Optional

 import pytest

 from swh.model.hashutil import hash_to_hex, MultiHash, DEFAULT_ALGORITHMS
 from swh.model.model import Content
 from swh.storage import get_storage
 from swh.storage.in_memory import InMemoryStorage
 from swh.storage.replay import process_replay_objects

 from swh.journal.serializers import key_to_kafka, value_to_kafka
 from swh.journal.client import JournalClient
 from swh.journal.tests.journal_data import (
     TEST_OBJECTS,
     DUPLICATE_CONTENTS,
 )

 UTC = datetime.timezone.utc


 @pytest.fixture()
 def replayer_storage_and_client(
     kafka_prefix: str, kafka_consumer_group: str, kafka_server: str
 ):
     journal_writer_config = {
         "cls": "kafka",
         "brokers": [kafka_server],
         "client_id": "kafka_writer",
         "prefix": kafka_prefix,
     }
     storage_config = {
         "cls": "memory",
         "journal_writer": journal_writer_config,
     }
     storage = get_storage(**storage_config)
     replayer = JournalClient(
         brokers=kafka_server,
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
         stop_on_eof=True,
     )

     yield storage, replayer


 def test_storage_replayer(replayer_storage_and_client, caplog):
     """Optimal replayer scenario.

     This:
     - writes objects to a source storage
-    - replayer consumes objects from the topic and replay them
+    - replayer consumes objects from the topic and replays them
     - a destination storage is filled from this

     In the end, both storages should have the same content.
     """
     src, replayer = replayer_storage_and_client

     # Fill Kafka using a source storage
     nb_sent = 0
     for object_type, objects in TEST_OBJECTS.items():
         if object_type == "origin_visit":
             # src.origin_visit_upsert(objects)
             for visit in objects:
                 src.origin_visit_add(
                     origin_url=visit.origin, date=visit.date, type=visit.type
                 )
         else:
             method = getattr(src, object_type + "_add")
             method(objects)
         nb_sent += len(objects)

     caplog.set_level(logging.ERROR, "swh.journal.replay")

     # Fill the destination storage from Kafka
     dst = get_storage(cls="memory")
     worker_fn = functools.partial(process_replay_objects, storage=dst)
     nb_inserted = replayer.process(worker_fn)
     assert nb_sent == nb_inserted

     _check_replayed(src, dst)

     collision = 0
     for record in caplog.records:
         logtext = record.getMessage()
         if "Colliding contents:" in logtext:
             collision += 1

     assert collision == 0, "No collision should be detected"


 def test_storage_play_with_collision(replayer_storage_and_client, caplog):
     """Another replayer scenario with collisions.

     This:
     - writes objects to the topic, including colliding contents
     - replayer consumes objects from the topic and replay them
     - This drops the colliding contents from the replay when detected

     """
     src, replayer = replayer_storage_and_client

     # Fill Kafka using a source storage
     nb_sent = 0
     for object_type, objects in TEST_OBJECTS.items():
         if object_type == "origin_visit":
             for visit in objects:
                 src.origin_visit_add(
                     origin_url=visit.origin, date=visit.date, type=visit.type
                 )
         else:
             method = getattr(src, object_type + "_add")
             method(objects)
         nb_sent += len(objects)

     # Create collision in input data
     # These should not be written in the destination
     producer = src.journal_writer.journal.producer
     prefix = src.journal_writer.journal._prefix
     for content in DUPLICATE_CONTENTS:
         topic = f"{prefix}.content"
         key = content["sha1"]
         producer.produce(
             topic=topic, key=key_to_kafka(key), value=value_to_kafka(content),
         )
         nb_sent += 1

     producer.flush()

     caplog.set_level(logging.ERROR, "swh.journal.replay")

     # Fill the destination storage from Kafka
     dst = get_storage(cls="memory")
     worker_fn = functools.partial(process_replay_objects, storage=dst)
     nb_inserted = replayer.process(worker_fn)
     assert nb_sent == nb_inserted

     # check the logs for the collision being properly detected
     nb_collisions = 0
     actual_collision: Dict
     for record in caplog.records:
         logtext = record.getMessage()
         if "Collision detected:" in logtext:
             nb_collisions += 1
             actual_collision = record.args["collision"]

     assert nb_collisions == 1, "1 collision should be detected"

     algo = "sha1"
     assert actual_collision["algo"] == algo
     expected_colliding_hash = hash_to_hex(DUPLICATE_CONTENTS[0][algo])
     assert actual_collision["hash"] == expected_colliding_hash

     actual_colliding_hashes = actual_collision["objects"]
     assert len(actual_colliding_hashes) == len(DUPLICATE_CONTENTS)
     for content in DUPLICATE_CONTENTS:
         expected_content_hashes = {
             k: hash_to_hex(v) for k, v in Content.from_dict(content).hashes().items()
         }
         assert expected_content_hashes in actual_colliding_hashes

     # all objects from the src should exists in the dst storage
     _check_replayed(src, dst, exclude=["contents"])
     # but the dst has one content more (one of the 2 colliding ones)
     assert len(src._contents) == len(dst._contents) - 1
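+    # InMemoryStorage._persons is now a mapping, so compare the stored
+    # Person values rather than the dict keys.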


 def test_replay_skipped_content(replayer_storage_and_client):
     """Test the 'skipped_content' topic is properly replayed."""
     src, replayer = replayer_storage_and_client
     _check_replay_skipped_content(src, replayer, "skipped_content")


 def test_replay_skipped_content_bwcompat(replayer_storage_and_client):
     """Test the 'content' topic can be used to replay SkippedContent objects."""
     src, replayer = replayer_storage_and_client
     _check_replay_skipped_content(src, replayer, "content")


 # utility functions


 def _check_replayed(
     src: InMemoryStorage, dst: InMemoryStorage, exclude: Optional[Container] = None
 ):
     """Simple utility function to compare the content of 2 in_memory storages

     """
-    expected_persons = set(src._persons)
-    got_persons = set(dst._persons)
+    expected_persons = set(src._persons.values())
+    got_persons = set(dst._persons.values())
     assert got_persons == expected_persons

     for attr in (
         "contents",
         "skipped_contents",
         "directories",
         "revisions",
         "releases",
         "snapshots",
         "origins",
         "origin_visits",
     ):
         if exclude and attr in exclude:
             continue
         expected_objects = sorted(getattr(src, f"_{attr}").items())
         got_objects = sorted(getattr(dst, f"_{attr}").items())
         assert got_objects == expected_objects, f"Mismatch object list for {attr}"


 def _check_replay_skipped_content(storage, replayer, topic):
     skipped_contents = _gen_skipped_contents(100)
     nb_sent = len(skipped_contents)
     producer = storage.journal_writer.journal.producer
     prefix = storage.journal_writer.journal._prefix

     for i, obj in enumerate(skipped_contents):
         producer.produce(
             topic=f"{prefix}.{topic}",
             key=key_to_kafka({"sha1": obj["sha1"]}),
             value=value_to_kafka(obj),
         )
     producer.flush()

     dst_storage = get_storage(cls="memory")
     worker_fn = functools.partial(process_replay_objects, storage=dst_storage)
     nb_inserted = replayer.process(worker_fn)

     assert nb_sent == nb_inserted
     for content in skipped_contents:
         assert not storage.content_find({"sha1": content["sha1"]})

     # no skipped_content_find API endpoint, so use this instead
     assert not list(dst_storage.skipped_content_missing(skipped_contents))


 def _updated(d1, d2):
     d1.update(d2)
     d1.pop("data", None)
     return d1


 def _gen_skipped_contents(n=10):
     # we do not use the hypothesis strategy here because this does not play well with
     # pytest fixtures, and it makes test execution very slow
     algos = DEFAULT_ALGORITHMS | {"length"}
     now = datetime.datetime.now(tz=UTC)
     return [
         _updated(
             MultiHash.from_data(data=f"foo{i}".encode(), hash_names=algos).digest(),
             {
                 "status": "absent",
                 "reason": "why not",
                 "origin": f"https://somewhere/{i}",
                 "ctime": now,
             },
         )
         for i in range(n)
     ]
+
+
+def test_storage_play_anonymized(
+    kafka_prefix: str, kafka_consumer_group: str, kafka_server: str
+):
+    """Anonymized replayer scenario.
+
+    This:
+    - writes objects to the topic
+    - replayer consumes objects from the topic and replays them
+
+    """
+    writer_config = {
+        "cls": "kafka",
+        "brokers": [kafka_server],
+        "client_id": "kafka_writer",
+        "prefix": kafka_prefix,
+        "anonymize": True,
+    }
+    src_config = {"cls": "memory", "journal_writer": writer_config}
+
+    storage = get_storage(**src_config)
+
+    # Fill the src storage
+    nb_sent = 0
+    for obj_type, objs in TEST_OBJECTS.items():
+        if obj_type == "origin_visit":
+            # these have an inconsistent API and are unrelated to what we
+            # want to test here
+            continue
+        method = getattr(storage, obj_type + "_add")
+        method(objs)
+        nb_sent += len(objs)
+
+    # Fill a destination storage from Kafka **using anonymized topics**
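+    # privileged=False should subscribe the client to the public (anonymized)
+    # topics rather than the "_privileged" ones carrying full author data.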
+    dst_storage = get_storage(cls="memory")
+    replayer = JournalClient(
+        brokers=kafka_server,
+        group_id=kafka_consumer_group,
+        prefix=kafka_prefix,
+        stop_after_objects=nb_sent,
+        privileged=False,
+    )
+    worker_fn = functools.partial(process_replay_objects, storage=dst_storage)
+
+    nb_inserted = replayer.process(worker_fn)
+    assert nb_sent == nb_inserted
+    check_replayed(storage, dst_storage, expected_anonymized=True)
+
+    # Fill a destination storage from Kafka **with stock (non-anonymized) topics**
+    dst_storage = get_storage(cls="memory")
+    replayer = JournalClient(
+        brokers=kafka_server,
+        group_id=kafka_consumer_group,
+        prefix=kafka_prefix,
+        stop_after_objects=nb_sent,
+        privileged=True,
+    )
+    worker_fn = functools.partial(process_replay_objects, storage=dst_storage)
+
+    nb_inserted = replayer.process(worker_fn)
+    assert nb_sent == nb_inserted
+    check_replayed(storage, dst_storage, expected_anonymized=False)
+
+
+def check_replayed(src, dst, expected_anonymized=False):
+    """Simple utility function to compare the content of 2 in_memory storages
+
+    If expected_anonymized is True, objects from the source storage are
+    anonymized before being compared with the destination storage.
+
+    """
+
+    def maybe_anonymize(obj):
+        if expected_anonymized:
+            return obj.anonymize() or obj
+        return obj
+
+    expected_persons = {maybe_anonymize(person) for person in src._persons.values()}
+    got_persons = set(dst._persons.values())
+    assert got_persons == expected_persons
+
+    for attr in (
+        "contents",
+        "skipped_contents",
+        "directories",
+        "revisions",
+        "releases",
+        "snapshots",
+        "origins",
+    ):
+        expected_objects = [
+            (id, maybe_anonymize(obj))
+            for id, obj in sorted(getattr(src, f"_{attr}").items())
+        ]
+        got_objects = [
+            (id, obj) for id, obj in sorted(getattr(dst, f"_{attr}").items())
+        ]
+        assert got_objects == expected_objects, f"Mismatch object list for {attr}"
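One detail worth spelling out in check_replayed above: maybe_anonymize() uses
`obj.anonymize() or obj`, which assumes anonymize() returns None for model
objects holding no personal data, so such objects are compared unchanged. A
minimal sketch of that assumed contract, reusing Origin from the tests above
(the URL is illustrative):

    from swh.model.model import Origin

    origin = Origin(url="https://example.com/repo.git")
    # Nothing person-related to redact here, so anonymize() is expected to
    # return None, and maybe_anonymize() falls back to the original object.
    assert origin.anonymize() is None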