Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/utils.py
from swh.journal.client import JournalClient, ACCEPTED_OBJECT_TYPES | from swh.journal.client import JournalClient | ||||
from swh.journal.writer.kafka import KafkaJournalWriter | from swh.journal.writer.kafka import KafkaJournalWriter | ||||
from swh.journal.serializers import kafka_to_value, key_to_kafka, value_to_kafka | from swh.journal.serializers import kafka_to_value, key_to_kafka, value_to_kafka | ||||
class FakeKafkaMessage: | class FakeKafkaMessage: | ||||
def __init__(self, topic, key, value): | def __init__(self, topic, key, value): | ||||
self._topic = topic | self._topic = topic | ||||
self._key = key_to_kafka(key) | self._key = key_to_kafka(key) | ||||
▲ Show 20 Lines • Show All 54 Lines • ▼ Show 20 Lines | def subscribe(self, topics): | ||||
if unknown_topics: | if unknown_topics: | ||||
raise ValueError("Unknown topics %s" % ", ".join(unknown_topics)) | raise ValueError("Unknown topics %s" % ", ".join(unknown_topics)) | ||||
def close(self): | def close(self): | ||||
pass | pass | ||||
class MockedJournalClient(JournalClient): | class MockedJournalClient(JournalClient): | ||||
def __init__(self, queue, object_types=ACCEPTED_OBJECT_TYPES): | def __init__(self, queue, object_types=None): | ||||
self._object_types = object_types | self._object_types = object_types | ||||
self.consumer = MockedKafkaConsumer(queue) | self.consumer = MockedKafkaConsumer(queue) | ||||
self.process_timeout = None | self.process_timeout = None | ||||
self.stop_after_objects = None | self.stop_after_objects = None | ||||
self.value_deserializer = kafka_to_value | self.value_deserializer = kafka_to_value | ||||
self.stop_on_eof = False | self.stop_on_eof = False | ||||
self.batch_size = 200 | self.batch_size = 200 |