Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_client.py
Show All 32 Lines | def test_client(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str): | ||||
producer.produce( | producer.produce( | ||||
topic=kafka_prefix + ".revision", | topic=kafka_prefix + ".revision", | ||||
key=key_to_kafka(rev.id), | key=key_to_kafka(rev.id), | ||||
value=value_to_kafka(rev.to_dict()), | value=value_to_kafka(rev.to_dict()), | ||||
) | ) | ||||
producer.flush() | producer.flush() | ||||
client = JournalClient( | client = JournalClient( | ||||
brokers=kafka_server, | brokers=[kafka_server], | ||||
group_id=kafka_consumer_group, | group_id=kafka_consumer_group, | ||||
prefix=kafka_prefix, | prefix=kafka_prefix, | ||||
stop_after_objects=1, | stop_after_objects=1, | ||||
) | ) | ||||
worker_fn = MagicMock() | worker_fn = MagicMock() | ||||
client.process(worker_fn) | client.process(worker_fn) | ||||
worker_fn.assert_called_once_with({"revision": [rev.to_dict()]}) | worker_fn.assert_called_once_with({"revision": [rev.to_dict()]}) | ||||
Show All 16 Lines | def test_client_eof(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str): | ||||
producer.produce( | producer.produce( | ||||
topic=kafka_prefix + ".revision", | topic=kafka_prefix + ".revision", | ||||
key=key_to_kafka(rev.id), | key=key_to_kafka(rev.id), | ||||
value=value_to_kafka(rev.to_dict()), | value=value_to_kafka(rev.to_dict()), | ||||
) | ) | ||||
producer.flush() | producer.flush() | ||||
client = JournalClient( | client = JournalClient( | ||||
brokers=kafka_server, | brokers=[kafka_server], | ||||
group_id=kafka_consumer_group, | group_id=kafka_consumer_group, | ||||
prefix=kafka_prefix, | prefix=kafka_prefix, | ||||
stop_after_objects=None, | stop_after_objects=None, | ||||
stop_on_eof=True, | stop_on_eof=True, | ||||
) | ) | ||||
worker_fn = MagicMock() | worker_fn = MagicMock() | ||||
client.process(worker_fn) | client.process(worker_fn) | ||||
▲ Show 20 Lines • Show All 47 Lines • ▼ Show 20 Lines | ): | ||||
client.process(worker_fn) | client.process(worker_fn) | ||||
expected_output = [content.to_dict() for content in contents] | expected_output = [content.to_dict() for content in contents] | ||||
assert len(collected_output) == len(expected_output) | assert len(collected_output) == len(expected_output) | ||||
for output in collected_output: | for output in collected_output: | ||||
assert output in expected_output | assert output in expected_output | ||||
@pytest.fixture() | |||||
def kafka_producer(kafka_prefix: str, kafka_server: str): | |||||
producer = Producer( | |||||
{ | |||||
"bootstrap.servers": kafka_server, | |||||
"client.id": "test producer", | |||||
"acks": "all", | |||||
} | |||||
) | |||||
# Fill Kafka | |||||
producer.produce( | |||||
topic=kafka_prefix + ".something", | |||||
key=key_to_kafka(b"key1"), | |||||
value=value_to_kafka("value1"), | |||||
) | |||||
producer.produce( | |||||
topic=kafka_prefix + ".else", | |||||
key=key_to_kafka(b"key1"), | |||||
value=value_to_kafka("value2"), | |||||
) | |||||
producer.flush() | |||||
return producer | |||||
def test_client_subscribe_all( | |||||
kafka_producer: Producer, kafka_prefix: str, kafka_server: str | |||||
): | |||||
client = JournalClient( | |||||
brokers=[kafka_server], | |||||
group_id="whatever", | |||||
prefix=kafka_prefix, | |||||
stop_after_objects=2, | |||||
) | |||||
assert set(client.subscription) == { | |||||
f"{kafka_prefix}.something", | |||||
f"{kafka_prefix}.else", | |||||
} | |||||
worker_fn = MagicMock() | |||||
client.process(worker_fn) | |||||
worker_fn.assert_called_once_with( | |||||
{"something": ["value1"], "else": ["value2"],} | |||||
) | |||||
def test_client_subscribe_one_topic( | |||||
kafka_producer: Producer, kafka_prefix: str, kafka_server: str | |||||
): | |||||
client = JournalClient( | |||||
brokers=[kafka_server], | |||||
group_id="whatever", | |||||
prefix=kafka_prefix, | |||||
stop_after_objects=1, | |||||
object_types=["else"], | |||||
) | |||||
assert client.subscription == [f"{kafka_prefix}.else"] | |||||
worker_fn = MagicMock() | |||||
client.process(worker_fn) | |||||
worker_fn.assert_called_once_with({"else": ["value2"]}) | |||||
def test_client_subscribe_absent_topic( | |||||
kafka_producer: Producer, kafka_prefix: str, kafka_server: str | |||||
): | |||||
with pytest.raises(ValueError): | |||||
JournalClient( | |||||
brokers=[kafka_server], | |||||
group_id="whatever", | |||||
prefix=kafka_prefix, | |||||
stop_after_objects=1, | |||||
object_types=["really"], | |||||
) | |||||
def test_client_subscribe_absent_prefix( | |||||
kafka_producer: Producer, kafka_prefix: str, kafka_server: str | |||||
): | |||||
with pytest.raises(ValueError): | |||||
JournalClient( | |||||
brokers=[kafka_server], | |||||
group_id="whatever", | |||||
prefix="wrong.prefix", | |||||
stop_after_objects=1, | |||||
) | |||||
with pytest.raises(ValueError): | |||||
JournalClient( | |||||
brokers=[kafka_server], | |||||
group_id="whatever", | |||||
prefix="wrong.prefix", | |||||
stop_after_objects=1, | |||||
object_types=["else"], | |||||
) |