Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_kafka_writer.py
Show First 20 Lines • Show All 68 Lines • ▼ Show 20 Lines | |||||
def test_kafka_writer( | def test_kafka_writer( | ||||
kafka_prefix: str, | kafka_prefix: str, | ||||
kafka_server: Tuple[Popen, int], | kafka_server: Tuple[Popen, int], | ||||
consumer: Consumer): | consumer: Consumer): | ||||
kafka_prefix += '.swh.journal.objects' | kafka_prefix += '.swh.journal.objects' | ||||
config = { | config = { | ||||
'brokers': 'localhost:%d' % kafka_server[1], | 'brokers': ['localhost:%d' % kafka_server[1]], | ||||
'client_id': 'kafka_writer', | 'client_id': 'kafka_writer', | ||||
'prefix': kafka_prefix, | 'prefix': kafka_prefix, | ||||
} | } | ||||
writer = KafkaJournalWriter(**config) | writer = KafkaJournalWriter(**config) | ||||
expected_messages = 0 | expected_messages = 0 | ||||
Show All 11 Lines | |||||
def test_storage_direct_writer( | def test_storage_direct_writer( | ||||
kafka_prefix: str, | kafka_prefix: str, | ||||
kafka_server: Tuple[Popen, int], | kafka_server: Tuple[Popen, int], | ||||
consumer: Consumer): | consumer: Consumer): | ||||
kafka_prefix += '.swh.journal.objects' | kafka_prefix += '.swh.journal.objects' | ||||
config = { | config = { | ||||
'brokers': 'localhost:%d' % kafka_server[1], | 'brokers': ['localhost:%d' % kafka_server[1]], | ||||
'client_id': 'kafka_writer', | 'client_id': 'kafka_writer', | ||||
'prefix': kafka_prefix, | 'prefix': kafka_prefix, | ||||
} | } | ||||
storage = get_storage('memory', {'journal_writer': { | storage = get_storage('memory', {'journal_writer': { | ||||
'cls': 'kafka', 'args': config}}) | 'cls': 'kafka', 'args': config}}) | ||||
expected_messages = 0 | expected_messages = 0 | ||||
Show All 23 Lines |