Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/utils.py
from collections import namedtuple | |||||
from swh.journal.client import JournalClient, ACCEPTED_OBJECT_TYPES | from swh.journal.client import JournalClient, ACCEPTED_OBJECT_TYPES | ||||
from swh.journal.direct_writer import DirectKafkaWriter | from swh.journal.direct_writer import DirectKafkaWriter | ||||
from swh.journal.serializers import ( | from swh.journal.serializers import (kafka_to_value, key_to_kafka, | ||||
key_to_kafka, kafka_to_key, value_to_kafka, kafka_to_value) | value_to_kafka) | ||||
class FakeKafkaMessage: | |||||
def __init__(self, topic, key, value): | |||||
self._topic = topic | |||||
self._key = key_to_kafka(key) | |||||
self._value = value_to_kafka(value) | |||||
def topic(self): | |||||
return self._topic | |||||
FakeKafkaMessage = namedtuple('FakeKafkaMessage', 'key value') | def value(self): | ||||
FakeKafkaPartition = namedtuple('FakeKafkaPartition', 'topic') | return self._value | ||||
def key(self): | |||||
return self._key | |||||
def error(self): | |||||
return None | |||||
class MockedKafkaWriter(DirectKafkaWriter): | class MockedKafkaWriter(DirectKafkaWriter): | ||||
def __init__(self, queue): | def __init__(self, queue): | ||||
self._prefix = 'prefix' | self._prefix = 'prefix' | ||||
self.queue = queue | self.queue = queue | ||||
def send(self, topic, key, value): | def send(self, topic, key, value): | ||||
key = kafka_to_key(key_to_kafka(key)) | msg = FakeKafkaMessage(topic=topic, key=key, value=value) | ||||
value = kafka_to_value(value_to_kafka(value)) | self.queue.append(msg) | ||||
partition = FakeKafkaPartition(topic) | |||||
msg = FakeKafkaMessage(key=key, value=value) | def flush(self): | ||||
if self.queue and {partition} == set(self.queue[-1]): | pass | ||||
# The last message is of the same object type, groupping them | |||||
self.queue[-1][partition].append(msg) | |||||
else: | |||||
self.queue.append({partition: [msg]}) | |||||
class MockedKafkaConsumer: | class MockedKafkaConsumer: | ||||
def __init__(self, queue): | def __init__(self, queue): | ||||
self.queue = queue | self.queue = queue | ||||
self.committed = False | self.committed = False | ||||
def poll(self): | def poll(self, timeout=None): | ||||
return self.queue.pop(0) | return self.queue.pop(0) | ||||
def commit(self): | def commit(self): | ||||
if self.queue == []: | if self.queue == []: | ||||
self.committed = True | self.committed = True | ||||
class MockedJournalClient(JournalClient): | class MockedJournalClient(JournalClient): | ||||
def __init__(self, queue, object_types=ACCEPTED_OBJECT_TYPES): | def __init__(self, queue, object_types=ACCEPTED_OBJECT_TYPES): | ||||
self._object_types = object_types | self._object_types = object_types | ||||
self.consumer = MockedKafkaConsumer(queue) | self.consumer = MockedKafkaConsumer(queue) | ||||
self.process_timeout = 0 | |||||
self.max_messages = 0 | |||||
self.value_deserializer = kafka_to_value |