diff --git a/swh/journal/tests/utils.py b/swh/journal/tests/utils.py --- a/swh/journal/tests/utils.py +++ b/swh/journal/tests/utils.py @@ -37,6 +37,12 @@ class MockedKafkaConsumer: + """Mimic the confluent_kafka.Consumer API, producing the messages stored + in `queue`. + + You're only allowed to subscribe to topics in which the queue has + messages. + """ def __init__(self, queue): self.queue = queue self.committed = False @@ -48,6 +54,14 @@ if self.queue == []: self.committed = True + def list_topics(self, timeout=None): + return set(message.topic() for message in self.queue) + + def subscribe(self, topics): + unknown_topics = set(topics) - self.list_topics() + if unknown_topics: + raise ValueError('Unknown topics %s' % ', '.join(unknown_topics)) + class MockedJournalClient(JournalClient): def __init__(self, queue, object_types=ACCEPTED_OBJECT_TYPES):