Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/utils.py
Show First 20 Lines • Show All 41 Lines • ▼ Show 20 Lines | """Mimic the confluent_kafka.Consumer API, producing the messages stored | ||||
You're only allowed to subscribe to topics in which the queue has | You're only allowed to subscribe to topics in which the queue has | ||||
messages. | messages. | ||||
""" | """ | ||||
def __init__(self, queue): | def __init__(self, queue): | ||||
self.queue = queue | self.queue = queue | ||||
self.committed = False | self.committed = False | ||||
def poll(self, timeout=None): | def consume(self, num_messages, timeout=None): | ||||
return self.queue.pop(0) | L = self.queue[0:num_messages] | ||||
self.queue[0:num_messages] = [] | |||||
return L | |||||
def commit(self): | def commit(self): | ||||
if self.queue == []: | if self.queue == []: | ||||
self.committed = True | self.committed = True | ||||
def list_topics(self, timeout=None): | def list_topics(self, timeout=None): | ||||
return set(message.topic() for message in self.queue) | return set(message.topic() for message in self.queue) | ||||
Show All 13 Lines |