diff --git a/swh/storage/tests/test_kafka_writer.py b/swh/storage/tests/test_kafka_writer.py
--- a/swh/storage/tests/test_kafka_writer.py
+++ b/swh/storage/tests/test_kafka_writer.py
@@ -7,9 +7,15 @@
 from swh.storage import get_storage
 from swh.model.model import Origin, OriginVisit
+from swh.model.hypothesis_strategies import objects
 from swh.journal.pytest_plugin import consume_messages, assert_all_objects_consumed
 from swh.journal.tests.journal_data import TEST_OBJECTS
+from swh.model.model import Person
+from attr import asdict, has
+from hypothesis import given
+from hypothesis.strategies import lists
+
 
 def test_storage_direct_writer(kafka_prefix: str, kafka_server, consumer: Consumer):
@@ -18,6 +24,7 @@
         "brokers": [kafka_server],
         "client_id": "kafka_writer",
         "prefix": kafka_prefix,
+        "anonymize": False,
     }
     storage_config = {
         "cls": "pipeline",
@@ -28,9 +35,9 @@
 
     expected_messages = 0
 
-    for object_type, objects in TEST_OBJECTS.items():
-        method = getattr(storage, object_type + "_add")
-        if object_type in (
+    for obj_type, objs in TEST_OBJECTS.items():
+        method = getattr(storage, obj_type + "_add")
+        if obj_type in (
             "content",
             "skipped_content",
             "directory",
@@ -39,10 +46,10 @@
             "snapshot",
             "origin",
         ):
-            method(objects)
-            expected_messages += len(objects)
-        elif object_type in ("origin_visit",):
-            for obj in objects:
+            method(objs)
+            expected_messages += len(objs)
+        elif obj_type in ("origin_visit",):
+            for obj in objs:
                 assert isinstance(obj, OriginVisit)
                 storage.origin_add_one(Origin(url=obj.origin))
                 visit = method(obj.origin, date=obj.date, type=obj.type)
@@ -54,7 +61,94 @@
                 storage.origin_visit_update(obj.origin, visit.visit, **obj_d)
                 expected_messages += 1
         else:
-            assert False, object_type
+            assert False, obj_type
+
+    existing_topics = set(
+        topic
+        for topic in consumer.list_topics(timeout=10).topics.keys()
+        if topic.startswith(f"{kafka_prefix}.")  # final "." to exclude privileged topics
+    )
+    assert existing_topics == {
+        f"{kafka_prefix}.{obj_type}"
+        for obj_type in (
+            "content",
+            "directory",
+            "origin",
+            "origin_visit",
+            "release",
+            "revision",
+            "snapshot",
+            "skipped_content",
+        )
+    }
 
     consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages)
     assert_all_objects_consumed(consumed_messages)
+
+
+def test_storage_direct_writer_anonymized(
+    kafka_prefix: str, kafka_server, consumer: Consumer
+):
+
+    writer_config = {
+        "cls": "kafka",
+        "brokers": [kafka_server],
+        "client_id": "kafka_writer",
+        "prefix": kafka_prefix,
+        "anonymize": True,
+    }
+    storage_config = {
+        "cls": "pipeline",
+        "steps": [{"cls": "memory", "journal_writer": writer_config},],
+    }
+
+    storage = get_storage(**storage_config)
+
+    expected_messages = 0
+
+    for obj_type, objs in TEST_OBJECTS.items():
+        if obj_type == "origin_visit":
+            # these have an inconsistent API and are unrelated to what we
+            # want to test here
+            continue
+        method = getattr(storage, obj_type + "_add")
+        method(objs)
+        expected_messages += len(objs)
+
+    existing_topics = set(
+        topic
+        for topic in consumer.list_topics(timeout=10).topics.keys()
+        if topic.startswith(kafka_prefix)
+    )
+    assert existing_topics == {
+        f"{kafka_prefix}.{obj_type}"
+        for obj_type in (
+            "content",
+            "directory",
+            "origin",
+            "origin_visit",
+            "release",
+            "revision",
+            "snapshot",
+            "skipped_content",
+        )
+    } | {
+        f"{kafka_prefix}_privileged.{obj_type}" for obj_type in ("release", "revision",)
+    }
+
+
+def check_anonymized_obj(obj):
+    if has(obj):
+        if isinstance(obj, Person):
+            assert obj.name is None
+            assert obj.email is None
+            assert len(obj.fullname) == 32
+        else:
+            for key, value in asdict(obj, recurse=False).items():
+                check_anonymized_obj(value)
+
+
+@given(lists(objects(split_content=True)))
+def test_anonymizer(obj_type_and_objs):
+    for obj_type, obj in obj_type_and_objs:
+        check_anonymized_obj(obj.anonymize())
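
A note on the check_anonymized_obj assertions above: they encode the expected contract of Person.anonymize(), namely that name and email are dropped and fullname is replaced by a fixed-size digest. Below is a minimal sketch of that contract, assuming a SHA-256 digest (whose 32-byte length matches the len(obj.fullname) == 32 assertion); it is an illustration, not the actual swh.model implementation:

    from hashlib import sha256

    import attr


    @attr.s(frozen=True)
    class Person:
        fullname = attr.ib(type=bytes)
        name = attr.ib(type=bytes, default=None)
        email = attr.ib(type=bytes, default=None)

        def anonymize(self) -> "Person":
            # Drop the name/email parts and replace the fullname with its
            # 32-byte SHA-256 digest, as check_anonymized_obj expects.
            return Person(
                fullname=sha256(self.fullname).digest(), name=None, email=None
            )


    p = Person(fullname=b"Jane Doe <jdoe@example.org>").anonymize()
    assert p.name is None and p.email is None and len(p.fullname) == 32
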
diff --git a/swh/storage/tests/test_replay.py b/swh/storage/tests/test_replay.py
--- a/swh/storage/tests/test_replay.py
+++ b/swh/storage/tests/test_replay.py
@@ -60,7 +60,7 @@
 
     This:
     - writes objects to a source storage
-    - replayer consumes objects from the topic and replay them
+    - replayer consumes objects from the topic and replays them
     - a destination storage is filled from this
 
     In the end, both storages should have the same content.
@@ -197,8 +197,8 @@
     """Simple utility function to compare the content of 2 in_memory storages
 
     """
-    expected_persons = set(src._persons)
-    got_persons = set(dst._persons)
+    expected_persons = set(src._persons.values())
+    got_persons = set(dst._persons.values())
     assert got_persons == expected_persons
 
     for attr in (
@@ -267,3 +267,102 @@
         )
         for i in range(n)
     ]
+
+
+def test_storage_play_anonymized(
+    kafka_prefix: str, kafka_consumer_group: str, kafka_server: str
+):
+    """Optimal replayer scenario.
+
+    This:
+    - writes objects to the topic
+    - replayer consumes objects from the topic and replays them
+
+    """
+    writer_config = {
+        "cls": "kafka",
+        "brokers": [kafka_server],
+        "client_id": "kafka_writer",
+        "prefix": kafka_prefix,
+        "anonymize": True,
+    }
+    src_config = {"cls": "memory", "journal_writer": writer_config}
+
+    storage = get_storage(**src_config)
+
+    # Fill the src storage
+    nb_sent = 0
+    for obj_type, objs in TEST_OBJECTS.items():
+        if obj_type == "origin_visit":
+            # these have an inconsistent API and are unrelated to what we
+            # want to test here
+            continue
+        method = getattr(storage, obj_type + "_add")
+        method(objs)
+        nb_sent += len(objs)
+
+    # Fill a destination storage from Kafka **using anonymized topics**
+    dst_storage = get_storage(cls="memory")
+    replayer = JournalClient(
+        brokers=kafka_server,
+        group_id=kafka_consumer_group,
+        prefix=kafka_prefix,
+        stop_after_objects=nb_sent,
+        privileged=False,
+    )
+    worker_fn = functools.partial(process_replay_objects, storage=dst_storage)
+
+    nb_inserted = replayer.process(worker_fn)
+    assert nb_sent == nb_inserted
+    check_replayed(storage, dst_storage, expected_anonymized=True)
+
+    # Fill a destination storage from Kafka **with stock (non-anonymized) topics**
+    dst_storage = get_storage(cls="memory")
+    replayer = JournalClient(
+        brokers=kafka_server,
+        group_id=kafka_consumer_group,
+        prefix=kafka_prefix,
+        stop_after_objects=nb_sent,
+        privileged=True,
+    )
+    worker_fn = functools.partial(process_replay_objects, storage=dst_storage)
+
+    nb_inserted = replayer.process(worker_fn)
+    assert nb_sent == nb_inserted
+    check_replayed(storage, dst_storage, expected_anonymized=False)
+
+
+def check_replayed(src, dst, expected_anonymized=False):
+    """Simple utility function to compare the content of 2 in-memory storages
+
+    If expected_anonymized is True, objects from the source storage are anonymized
+    before being compared with the destination storage.
+
+    """
+
+    def maybe_anonymize(obj):
+        if expected_anonymized:
+            return obj.anonymize() or obj
+        return obj
+
+    expected_persons = {maybe_anonymize(person) for person in src._persons.values()}
+    got_persons = set(dst._persons.values())
+    assert got_persons == expected_persons
+
+    for attr in (
+        "contents",
+        "skipped_contents",
+        "directories",
+        "revisions",
+        "releases",
+        "snapshots",
+        "origins",
+    ):
+        expected_objects = [
+            (id, maybe_anonymize(obj))
+            for id, obj in sorted(getattr(src, f"_{attr}").items())
+        ]
+        got_objects = [
+            (id, obj) for id, obj in sorted(getattr(dst, f"_{attr}").items())
+        ]
+        assert got_objects == expected_objects, f"Mismatch object list for {attr}"
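
For reference, the topic layout these tests assert: with "anonymize": True, the anonymizable object types (release and revision, which carry author/committer data) get their anonymized form on {prefix}.{object_type} and their full form on {prefix}_privileged.{object_type}, while a JournalClient created with privileged=True reads the privileged variant. A rough sketch of that routing rule; the helper name is illustrative, not the swh.journal API:

    from typing import Dict

    # Object types that have an anonymized form (assumption drawn from the
    # topic sets asserted in the tests above).
    ANONYMIZABLE = {"release", "revision"}


    def topics_for(prefix: str, object_type: str, anonymize: bool) -> Dict[str, str]:
        """Map an object type to the Kafka topic(s) a writer produces to."""
        if anonymize and object_type in ANONYMIZABLE:
            return {
                "anonymized": f"{prefix}.{object_type}",
                "privileged": f"{prefix}_privileged.{object_type}",
            }
        return {"plain": f"{prefix}.{object_type}"}


    assert topics_for("swh.journal.objects", "revision", anonymize=True) == {
        "anonymized": "swh.journal.objects.revision",
        "privileged": "swh.journal.objects_privileged.revision",
    }
    assert topics_for("swh.journal.objects", "content", anonymize=True) == {
        "plain": "swh.journal.objects.content"
    }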
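
The obj.anonymize() or obj expression in maybe_anonymize relies on a convention worth spelling out: anonymize() returns None for object types that have nothing to anonymize, so the "or" falls back to the unmodified object. A compact illustration of that convention (the class here is a stand-in, not a swh.model type):

    from typing import Optional


    class Snapshot:
        """Stand-in for a model object with nothing to anonymize."""

        def anonymize(self) -> Optional["Snapshot"]:
            # Nothing personal to strip: signal "unchanged" with None.
            return None


    def maybe_anonymize(obj):
        # Falls back to the object itself when anonymize() returns None.
        return obj.anonymize() or obj


    snapshot = Snapshot()
    assert maybe_anonymize(snapshot) is snapshot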