diff --git a/swh/storage/tests/test_replay.py b/swh/storage/tests/test_replay.py --- a/swh/storage/tests/test_replay.py +++ b/swh/storage/tests/test_replay.py @@ -269,8 +269,9 @@ ] +@pytest.mark.parametrize("privileged", [True, False]) def test_storage_play_anonymized( - kafka_prefix: str, kafka_consumer_group: str, kafka_server: str + kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, privileged: bool, ): """Optimal replayer scenario. @@ -278,6 +279,7 @@ - writes objects to the topic - replayer consumes objects from the topic and replay them + This tests the behavior with both a privileged and non-privileged replayer """ writer_config = { "cls": "kafka", @@ -300,37 +302,24 @@ method(objs) nb_sent += len(objs) - # Fill a destination storage from Kafka **using anonymized topics** + # Fill a destination storage from Kafka, potentially using privileged topics dst_storage = get_storage(cls="memory") replayer = JournalClient( brokers=kafka_server, group_id=kafka_consumer_group, prefix=kafka_prefix, stop_after_objects=nb_sent, - privileged=False, + privileged=privileged, ) worker_fn = functools.partial(process_replay_objects, storage=dst_storage) nb_inserted = replayer.process(worker_fn) replayer.consumer.commit() - assert nb_sent == nb_inserted - check_replayed(storage, dst_storage, expected_anonymized=True) - - # Fill a destination storage from Kafka **with stock (non-anonymized) topics** - dst_storage = get_storage(cls="memory") - replayer = JournalClient( - brokers=kafka_server, - group_id=f"{kafka_consumer_group}-2", - prefix=kafka_prefix, - stop_after_objects=nb_sent, - privileged=True, - ) - worker_fn = functools.partial(process_replay_objects, storage=dst_storage) - nb_inserted = replayer.process(worker_fn) - replayer.consumer.commit() assert nb_sent == nb_inserted - check_replayed(storage, dst_storage, expected_anonymized=False) + # Check the contents of the destination storage, and whether the anonymization was + # properly used + check_replayed(storage, dst_storage, expected_anonymized=not privileged) def check_replayed(src, dst, expected_anonymized=False):