diff --git a/swh/journal/tests/utils.py b/swh/journal/tests/utils.py index 01bbb51..0c08958 100644 --- a/swh/journal/tests/utils.py +++ b/swh/journal/tests/utils.py @@ -1,79 +1,80 @@ from swh.journal.client import JournalClient from swh.journal.writer.kafka import KafkaJournalWriter from swh.journal.serializers import kafka_to_value, key_to_kafka, value_to_kafka class FakeKafkaMessage: def __init__(self, topic, key, value): self._topic = topic self._key = key_to_kafka(key) self._value = value_to_kafka(value) def topic(self): return self._topic def value(self): return self._value def key(self): return self._key def error(self): return None class MockedKafkaWriter(KafkaJournalWriter): - def __init__(self, queue): + def __init__(self, queue, anonymize: bool = False): self._prefix = "prefix" self.queue = queue + self.anonymize = anonymize def send(self, topic, key, value): msg = FakeKafkaMessage(topic=topic, key=key, value=value) self.queue.append(msg) def flush(self): pass class MockedKafkaConsumer: """Mimic the confluent_kafka.Consumer API, producing the messages stored in `queue`. You're only allowed to subscribe to topics in which the queue has messages. """ def __init__(self, queue): self.queue = queue self.committed = False def consume(self, num_messages, timeout=None): L = self.queue[0:num_messages] self.queue[0:num_messages] = [] return L def commit(self): if self.queue == []: self.committed = True def list_topics(self, timeout=None): return set(message.topic() for message in self.queue) def subscribe(self, topics): unknown_topics = set(topics) - self.list_topics() if unknown_topics: raise ValueError("Unknown topics %s" % ", ".join(unknown_topics)) def close(self): pass class MockedJournalClient(JournalClient): def __init__(self, queue, object_types=None): self._object_types = object_types self.consumer = MockedKafkaConsumer(queue) self.process_timeout = None self.stop_after_objects = None self.value_deserializer = kafka_to_value self.stop_on_eof = False self.batch_size = 200