Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_client.py
Show All 21 Lines | def test_client( | ||||
kafka_consumer_group: str, | kafka_consumer_group: str, | ||||
kafka_server: Tuple[Popen, int]): | kafka_server: Tuple[Popen, int]): | ||||
(_, port) = kafka_server | (_, port) = kafka_server | ||||
kafka_prefix += '.swh.journal.objects' | kafka_prefix += '.swh.journal.objects' | ||||
producer = Producer({ | producer = Producer({ | ||||
'bootstrap.servers': 'localhost:{}'.format(port), | 'bootstrap.servers': 'localhost:{}'.format(port), | ||||
'client.id': 'test producer', | 'client.id': 'test producer', | ||||
'enable.idempotence': 'true', | 'acks': 'all', | ||||
}) | }) | ||||
rev = revisions().example() | rev = revisions().example() | ||||
# Fill Kafka | # Fill Kafka | ||||
producer.produce( | producer.produce( | ||||
topic=kafka_prefix + '.revision', key=key_to_kafka(rev.id), | topic=kafka_prefix + '.revision', key=key_to_kafka(rev.id), | ||||
value=value_to_kafka(rev.to_dict()), | value=value_to_kafka(rev.to_dict()), | ||||
Show All 17 Lines | def test_client_eof( | ||||
kafka_consumer_group: str, | kafka_consumer_group: str, | ||||
kafka_server: Tuple[Popen, int]): | kafka_server: Tuple[Popen, int]): | ||||
(_, port) = kafka_server | (_, port) = kafka_server | ||||
kafka_prefix += '.swh.journal.objects' | kafka_prefix += '.swh.journal.objects' | ||||
producer = Producer({ | producer = Producer({ | ||||
'bootstrap.servers': 'localhost:{}'.format(port), | 'bootstrap.servers': 'localhost:{}'.format(port), | ||||
'client.id': 'test producer', | 'client.id': 'test producer', | ||||
'enable.idempotence': 'true', | 'acks': 'all', | ||||
}) | }) | ||||
rev = revisions().example() | rev = revisions().example() | ||||
# Fill Kafka | # Fill Kafka | ||||
producer.produce( | producer.produce( | ||||
topic=kafka_prefix + '.revision', key=key_to_kafka(rev.id), | topic=kafka_prefix + '.revision', key=key_to_kafka(rev.id), | ||||
value=value_to_kafka(rev.to_dict()), | value=value_to_kafka(rev.to_dict()), | ||||
Show All 25 Lines | ): | ||||
kafka_prefix += '.swh.journal.objects' | kafka_prefix += '.swh.journal.objects' | ||||
num_objects = 2 * batch_size + 1 | num_objects = 2 * batch_size + 1 | ||||
assert num_objects < 256, "Too many objects, generation will fail" | assert num_objects < 256, "Too many objects, generation will fail" | ||||
producer = Producer({ | producer = Producer({ | ||||
'bootstrap.servers': 'localhost:{}'.format(port), | 'bootstrap.servers': 'localhost:{}'.format(port), | ||||
'client.id': 'test producer', | 'client.id': 'test producer', | ||||
'enable.idempotence': 'true', | 'acks': 'all', | ||||
}) | }) | ||||
contents = [Content.from_data(bytes([i])) for i in range(num_objects)] | contents = [Content.from_data(bytes([i])) for i in range(num_objects)] | ||||
# Fill Kafka | # Fill Kafka | ||||
for content in contents: | for content in contents: | ||||
producer.produce( | producer.produce( | ||||
topic=kafka_prefix + '.content', | topic=kafka_prefix + '.content', | ||||
Show All 24 Lines |