Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_kafka_writer.py
Show First 20 Lines • Show All 98 Lines • ▼ Show 20 Lines | |||||
def test_storage_direct_writer( | def test_storage_direct_writer( | ||||
kafka_prefix: str, | kafka_prefix: str, | ||||
kafka_server: Tuple[Popen, int], | kafka_server: Tuple[Popen, int], | ||||
consumer: Consumer): | consumer: Consumer): | ||||
kafka_prefix += '.swh.journal.objects' | kafka_prefix += '.swh.journal.objects' | ||||
config = { | writer_config = { | ||||
'cls': 'kafka', | |||||
'brokers': ['localhost:%d' % kafka_server[1]], | 'brokers': ['localhost:%d' % kafka_server[1]], | ||||
'client_id': 'kafka_writer', | 'client_id': 'kafka_writer', | ||||
'prefix': kafka_prefix, | 'prefix': kafka_prefix, | ||||
'producer_config': { | 'producer_config': { | ||||
'message.max.bytes': 100000000, | 'message.max.bytes': 100000000, | ||||
} | } | ||||
} | } | ||||
storage_config = { | |||||
'cls': 'pipeline', | |||||
'steps': [ | |||||
{'cls': 'validate'}, | |||||
{'cls': 'memory', 'journal_writer': writer_config}, | |||||
] | |||||
} | |||||
storage = get_storage('memory', journal_writer={ | storage = get_storage(**storage_config) | ||||
'cls': 'kafka', **config, | |||||
}) | |||||
expected_messages = 0 | expected_messages = 0 | ||||
for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): | for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): | ||||
method = getattr(storage, object_type + '_add') | method = getattr(storage, object_type + '_add') | ||||
if object_type in ('content', 'directory', 'revision', 'release', | if object_type in ('content', 'directory', 'revision', 'release', | ||||
'snapshot', 'origin'): | 'snapshot', 'origin'): | ||||
if object_type == 'content': | if object_type == 'content': | ||||
Show All 18 Lines |