Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_client.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from typing import Dict, List | from typing import Dict, List | ||||
from unittest.mock import MagicMock | from unittest.mock import MagicMock | ||||
from confluent_kafka import Producer | from confluent_kafka import Producer | ||||
import pytest | import pytest | ||||
from swh.model.hypothesis_strategies import revisions | |||||
from swh.model.model import Content | from swh.model.model import Content | ||||
from swh.journal.client import JournalClient | from swh.journal.client import JournalClient | ||||
from swh.journal.serializers import key_to_kafka, value_to_kafka | from swh.journal.serializers import key_to_kafka, value_to_kafka | ||||
# Fixed revision payload used across the tests below, so results are
# deterministic (replaces the previously hypothesis-generated examples).
# Field layout mirrors swh.model's revision dict representation:
# author/committer are fullname/name/email dicts, dates are
# timestamp/offset/negative_utc dicts, and "id"/"directory" are raw
# 20-byte sha1_git identifiers.
REV = {
    "message": b"something cool",
    "author": {"fullname": b"Peter", "name": None, "email": b"peter@ouiche.lo"},
    "committer": {"fullname": b"Stephen", "name": b"From Outer Space", "email": None},
    "date": {
        "timestamp": {"seconds": 123456789, "microseconds": 123},
        "offset": 120,
        "negative_utc": False,
    },
    "committer_date": {
        "timestamp": {"seconds": 123123456, "microseconds": 0},
        "offset": 0,
        "negative_utc": False,
    },
    "type": "git",
    "directory": (
        b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
        b"\x01\x02\x03\x04\x05"
    ),
    "synthetic": False,
    "metadata": None,
    "parents": [],
    "id": b"\x8b\xeb\xd1\x9d\x07\xe2\x1e0\xe2 \x91X\x8d\xbd\x1c\xa8\x86\xdeB\x0c",
}
def test_client(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str):
    """A client with stop_after_objects=1 hands exactly one deserialized
    revision to the worker function, then stops."""
    kafka_prefix += ".swh.journal.objects"
    producer = Producer(
        {
            "bootstrap.servers": kafka_server,
            "client.id": "test producer",
            "acks": "all",
        }
    )

    # Fill Kafka with a single revision. Both key and value go through the
    # journal serializers, matching how the rest of this file (see the
    # kafka_producer fixture) produces messages.
    producer.produce(
        topic=kafka_prefix + ".revision",
        key=key_to_kafka(REV["id"]),
        value=value_to_kafka(REV),
    )
    producer.flush()

    client = JournalClient(
        brokers=[kafka_server],
        group_id=kafka_consumer_group,
        prefix=kafka_prefix,
        stop_after_objects=1,
    )
    worker_fn = MagicMock()
    client.process(worker_fn)

    # The worker must receive the revision exactly once, keyed by object type.
    worker_fn.assert_called_once_with({"revision": [REV]})
def test_client_eof(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str):
    """With stop_on_eof=True (and no object-count limit) the client drains
    the topic and stops once the partition EOF is reached."""
    kafka_prefix += ".swh.journal.objects"
    producer = Producer(
        {
            "bootstrap.servers": kafka_server,
            "client.id": "test producer",
            "acks": "all",
        }
    )

    # Fill Kafka with a single revision; keys are serialized with
    # key_to_kafka for consistency with the other producers in this file.
    producer.produce(
        topic=kafka_prefix + ".revision",
        key=key_to_kafka(REV["id"]),
        value=value_to_kafka(REV),
    )
    producer.flush()

    client = JournalClient(
        brokers=[kafka_server],
        group_id=kafka_consumer_group,
        prefix=kafka_prefix,
        stop_after_objects=None,
        stop_on_eof=True,
    )
    worker_fn = MagicMock()
    client.process(worker_fn)

    # Even without an object-count limit, EOF stops the loop after the one
    # available message.
    worker_fn.assert_called_once_with({"revision": [REV]})
@pytest.mark.parametrize("batch_size", [1, 5, 100]) | @pytest.mark.parametrize("batch_size", [1, 5, 100]) | ||||
def test_client_batch_size( | def test_client_batch_size( | ||||
kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, batch_size: int, | kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, batch_size: int, | ||||
): | ): | ||||
kafka_prefix += ".swh.journal.objects" | |||||
num_objects = 2 * batch_size + 1 | num_objects = 2 * batch_size + 1 | ||||
assert num_objects < 256, "Too many objects, generation will fail" | assert num_objects < 256, "Too many objects, generation will fail" | ||||
producer = Producer( | producer = Producer( | ||||
{ | { | ||||
"bootstrap.servers": kafka_server, | "bootstrap.servers": kafka_server, | ||||
"client.id": "test producer", | "client.id": "test producer", | ||||
"acks": "all", | "acks": "all", | ||||
Show All 32 Lines | ): | ||||
expected_output = [content.to_dict() for content in contents] | expected_output = [content.to_dict() for content in contents] | ||||
assert len(collected_output) == len(expected_output) | assert len(collected_output) == len(expected_output) | ||||
for output in collected_output: | for output in collected_output: | ||||
assert output in expected_output | assert output in expected_output | ||||
@pytest.fixture()
def kafka_producer(kafka_prefix: str, kafka_server_base: str):
    """Return a producer after seeding two prefixed topics with one message
    each, so the subscription tests below have data to consume."""
    producer = Producer(
        {
            "bootstrap.servers": kafka_server_base,
            "client.id": "test producer",
            "acks": "all",
        }
    )

    # One message per topic: <prefix>.something -> "value1",
    # <prefix>.else -> "value2".
    for topic_suffix, payload in ((".something", "value1"), (".else", "value2")):
        producer.produce(
            topic=kafka_prefix + topic_suffix,
            key=key_to_kafka(b"key1"),
            value=value_to_kafka(payload),
        )
    producer.flush()
    return producer
def test_client_subscribe_all(
    kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str
):
    """Without object_types, the client subscribes to every topic under the
    prefix and delivers messages from all of them."""
    client = JournalClient(
        brokers=[kafka_server_base],
        group_id="whatever",
        prefix=kafka_prefix,
        stop_after_objects=2,
    )

    expected_topics = {f"{kafka_prefix}.something", f"{kafka_prefix}.else"}
    assert set(client.subscription) == expected_topics

    worker_fn = MagicMock()
    client.process(worker_fn)
    # Both seeded messages arrive in a single worker call, grouped by type.
    worker_fn.assert_called_once_with({"something": ["value1"], "else": ["value2"]})
def test_client_subscribe_one_topic(
    kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str
):
    """Restricting object_types narrows the subscription to a single topic,
    and only that topic's messages reach the worker."""
    client = JournalClient(
        brokers=[kafka_server_base],
        group_id="whatever",
        prefix=kafka_prefix,
        stop_after_objects=1,
        object_types=["else"],
    )
    assert client.subscription == [f"{kafka_prefix}.else"]

    worker_fn = MagicMock()
    client.process(worker_fn)
    worker_fn.assert_called_once_with({"else": ["value2"]})
def test_client_subscribe_absent_topic(
    kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str
):
    """Requesting an object type with no matching topic fails fast with
    ValueError at construction time."""
    with pytest.raises(ValueError):
        JournalClient(
            brokers=[kafka_server_base],
            group_id="whatever",
            prefix=kafka_prefix,
            stop_after_objects=1,
            object_types=["really"],
        )
def test_client_subscribe_absent_prefix(
    kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str
):
    """A prefix matching no existing topic raises ValueError, whether or not
    object_types is also given."""
    common_kwargs = dict(
        brokers=[kafka_server_base],
        group_id="whatever",
        prefix="wrong.prefix",
        stop_after_objects=1,
    )
    with pytest.raises(ValueError):
        JournalClient(**common_kwargs)
    with pytest.raises(ValueError):
        JournalClient(**common_kwargs, object_types=["else"])