Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/utils.py
Show All 31 Lines | def send(self, topic, key, value): | ||||
msg = FakeKafkaMessage(topic=topic, key=key, value=value) | msg = FakeKafkaMessage(topic=topic, key=key, value=value) | ||||
self.queue.append(msg) | self.queue.append(msg) | ||||
def flush(self): | def flush(self): | ||||
pass | pass | ||||
class MockedKafkaConsumer: | class MockedKafkaConsumer: | ||||
"""Mimic the confluent_kafka.Consumer API, producing the messages stored | |||||
in `queue`. | |||||
You're only allowed to subscribe to topics in which the queue has | |||||
messages. | |||||
""" | |||||
def __init__(self, queue): | def __init__(self, queue): | ||||
self.queue = queue | self.queue = queue | ||||
self.committed = False | self.committed = False | ||||
def poll(self, timeout=None): | def poll(self, timeout=None): | ||||
return self.queue.pop(0) | return self.queue.pop(0) | ||||
def commit(self): | def commit(self): | ||||
if self.queue == []: | if self.queue == []: | ||||
self.committed = True | self.committed = True | ||||
def list_topics(self, timeout=None): | |||||
return set(message.topic() for message in self.queue) | |||||
def subscribe(self, topics): | |||||
unknown_topics = set(topics) - self.list_topics() | |||||
if unknown_topics: | |||||
raise ValueError('Unknown topics %s' % ', '.join(unknown_topics)) | |||||
class MockedJournalClient(JournalClient): | class MockedJournalClient(JournalClient): | ||||
def __init__(self, queue, object_types=ACCEPTED_OBJECT_TYPES): | def __init__(self, queue, object_types=ACCEPTED_OBJECT_TYPES): | ||||
self._object_types = object_types | self._object_types = object_types | ||||
self.consumer = MockedKafkaConsumer(queue) | self.consumer = MockedKafkaConsumer(queue) | ||||
self.process_timeout = 0 | self.process_timeout = 0 | ||||
self.max_messages = 0 | self.max_messages = 0 | ||||
self.value_deserializer = kafka_to_value | self.value_deserializer = kafka_to_value |