Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_client.py
Show First 20 Lines • Show All 53 Lines • ▼ Show 20 Lines | producer.produce( | ||||
topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV), | topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV), | ||||
) | ) | ||||
producer.flush() | producer.flush() | ||||
client = JournalClient( | client = JournalClient( | ||||
brokers=[kafka_server], | brokers=[kafka_server], | ||||
group_id=kafka_consumer_group, | group_id=kafka_consumer_group, | ||||
prefix=kafka_prefix, | prefix=kafka_prefix, | ||||
stop_after_objects=1, | stop_on_eof=True, | ||||
) | ) | ||||
worker_fn = MagicMock() | worker_fn = MagicMock() | ||||
client.process(worker_fn) | client.process(worker_fn) | ||||
worker_fn.assert_called_once_with({"revision": [REV]}) | worker_fn.assert_called_once_with({"revision": [REV]}) | ||||
def test_client_eof(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str): | @pytest.mark.parametrize("count", [1, 2]) | ||||
def test_client_stop_after_objects( | |||||
kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, count: int | |||||
): | |||||
producer = Producer( | producer = Producer( | ||||
{ | { | ||||
"bootstrap.servers": kafka_server, | "bootstrap.servers": kafka_server, | ||||
"client.id": "test producer", | "client.id": "test producer", | ||||
"acks": "all", | "acks": "all", | ||||
} | } | ||||
) | ) | ||||
# Fill Kafka | # Fill Kafka | ||||
revisions = cast(List[Revision], TEST_OBJECTS["revision"]) | |||||
for rev in revisions: | |||||
producer.produce( | producer.produce( | ||||
topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV), | topic=kafka_prefix + ".revision", | ||||
key=rev.id, | |||||
value=value_to_kafka(rev.to_dict()), | |||||
) | ) | ||||
producer.flush() | producer.flush() | ||||
client = JournalClient( | client = JournalClient( | ||||
brokers=[kafka_server], | brokers=[kafka_server], | ||||
group_id=kafka_consumer_group, | group_id=kafka_consumer_group, | ||||
prefix=kafka_prefix, | prefix=kafka_prefix, | ||||
stop_after_objects=None, | stop_on_eof=False, | ||||
stop_on_eof=True, | stop_after_objects=count, | ||||
) | ) | ||||
worker_fn = MagicMock() | worker_fn = MagicMock() | ||||
client.process(worker_fn) | client.process(worker_fn) | ||||
worker_fn.assert_called_once_with({"revision": [REV]}) | # this code below is not pretty, but needed since we have to deal with | ||||
# dicts (so no set) which can have values that are list vs tuple, and we do | |||||
# not know for sure how many calls of the worker_fn will happen during the | |||||
olasd: consuming -> consumption | |||||
# consumption of the topic... | |||||
worker_fn.assert_called() | |||||
revs = [] # list of (unique) rev dicts we got from the client | |||||
for call in worker_fn.call_args_list: | |||||
callrevs = call[0][0]["revision"] | |||||
for rev in callrevs: | |||||
assert Revision.from_dict(rev) in revisions | |||||
if rev not in revs: | |||||
revs.append(rev) | |||||
assert len(revs) == count | |||||
@pytest.mark.parametrize("batch_size", [1, 5, 100]) | @pytest.mark.parametrize("batch_size", [1, 5, 100]) | ||||
def test_client_batch_size( | def test_client_batch_size( | ||||
kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, batch_size: int, | kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, batch_size: int, | ||||
): | ): | ||||
num_objects = 2 * batch_size + 1 | num_objects = 2 * batch_size + 1 | ||||
assert num_objects < 256, "Too many objects, generation will fail" | assert num_objects < 256, "Too many objects, generation will fail" | ||||
Show All 17 Lines | for content in contents: | ||||
) | ) | ||||
producer.flush() | producer.flush() | ||||
client = JournalClient( | client = JournalClient( | ||||
brokers=[kafka_server], | brokers=[kafka_server], | ||||
group_id=kafka_consumer_group, | group_id=kafka_consumer_group, | ||||
prefix=kafka_prefix, | prefix=kafka_prefix, | ||||
stop_after_objects=num_objects, | stop_on_eof=True, | ||||
batch_size=batch_size, | batch_size=batch_size, | ||||
) | ) | ||||
collected_output: List[Dict] = [] | collected_output: List[Dict] = [] | ||||
def worker_fn(objects): | def worker_fn(objects): | ||||
received = objects["content"] | received = objects["content"] | ||||
assert len(received) <= batch_size | assert len(received) <= batch_size | ||||
Show All 35 Lines | |||||
def test_client_subscribe_all( | def test_client_subscribe_all( | ||||
kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str | kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str | ||||
): | ): | ||||
client = JournalClient( | client = JournalClient( | ||||
brokers=[kafka_server_base], | brokers=[kafka_server_base], | ||||
group_id="whatever", | group_id="whatever", | ||||
prefix=kafka_prefix, | prefix=kafka_prefix, | ||||
stop_after_objects=2, | stop_on_eof=True, | ||||
) | ) | ||||
assert set(client.subscription) == { | assert set(client.subscription) == { | ||||
f"{kafka_prefix}.something", | f"{kafka_prefix}.something", | ||||
f"{kafka_prefix}.else", | f"{kafka_prefix}.else", | ||||
} | } | ||||
worker_fn = MagicMock() | worker_fn = MagicMock() | ||||
client.process(worker_fn) | client.process(worker_fn) | ||||
worker_fn.assert_called_once_with( | worker_fn.assert_called_once_with( | ||||
{"something": ["value1"], "else": ["value2"],} | {"something": ["value1"], "else": ["value2"],} | ||||
) | ) | ||||
def test_client_subscribe_one_topic( | def test_client_subscribe_one_topic( | ||||
kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str | kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str | ||||
): | ): | ||||
client = JournalClient( | client = JournalClient( | ||||
brokers=[kafka_server_base], | brokers=[kafka_server_base], | ||||
group_id="whatever", | group_id="whatever", | ||||
prefix=kafka_prefix, | prefix=kafka_prefix, | ||||
stop_after_objects=1, | stop_on_eof=True, | ||||
object_types=["else"], | object_types=["else"], | ||||
) | ) | ||||
assert client.subscription == [f"{kafka_prefix}.else"] | assert client.subscription == [f"{kafka_prefix}.else"] | ||||
worker_fn = MagicMock() | worker_fn = MagicMock() | ||||
client.process(worker_fn) | client.process(worker_fn) | ||||
worker_fn.assert_called_once_with({"else": ["value2"]}) | worker_fn.assert_called_once_with({"else": ["value2"]}) | ||||
def test_client_subscribe_absent_topic( | def test_client_subscribe_absent_topic( | ||||
kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str | kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str | ||||
): | ): | ||||
with pytest.raises(ValueError): | with pytest.raises(ValueError): | ||||
JournalClient( | JournalClient( | ||||
brokers=[kafka_server_base], | brokers=[kafka_server_base], | ||||
group_id="whatever", | group_id="whatever", | ||||
prefix=kafka_prefix, | prefix=kafka_prefix, | ||||
stop_after_objects=1, | stop_on_eof=True, | ||||
object_types=["really"], | object_types=["really"], | ||||
) | ) | ||||
def test_client_subscribe_absent_prefix( | def test_client_subscribe_absent_prefix( | ||||
kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str | kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str | ||||
): | ): | ||||
with pytest.raises(ValueError): | with pytest.raises(ValueError): | ||||
JournalClient( | JournalClient( | ||||
brokers=[kafka_server_base], | brokers=[kafka_server_base], | ||||
group_id="whatever", | group_id="whatever", | ||||
prefix="wrong.prefix", | prefix="wrong.prefix", | ||||
stop_after_objects=1, | stop_on_eof=True, | ||||
) | ) | ||||
with pytest.raises(ValueError): | with pytest.raises(ValueError): | ||||
JournalClient( | JournalClient( | ||||
brokers=[kafka_server_base], | brokers=[kafka_server_base], | ||||
group_id="whatever", | group_id="whatever", | ||||
prefix="wrong.prefix", | prefix="wrong.prefix", | ||||
stop_after_objects=1, | stop_on_eof=True, | ||||
object_types=["else"], | object_types=["else"], | ||||
) | ) | ||||
def test_client_subscriptions_with_anonymized_topics( | def test_client_subscriptions_with_anonymized_topics( | ||||
kafka_prefix: str, kafka_consumer_group: str, kafka_server_base: str | kafka_prefix: str, kafka_consumer_group: str, kafka_server_base: str | ||||
): | ): | ||||
producer = Producer( | producer = Producer( | ||||
Show All 16 Lines | ): | ||||
) | ) | ||||
producer.flush() | producer.flush() | ||||
# without privileged "channels" activated on the client side | # without privileged "channels" activated on the client side | ||||
client = JournalClient( | client = JournalClient( | ||||
brokers=[kafka_server_base], | brokers=[kafka_server_base], | ||||
group_id=kafka_consumer_group, | group_id=kafka_consumer_group, | ||||
prefix=kafka_prefix, | prefix=kafka_prefix, | ||||
stop_after_objects=1, | stop_on_eof=True, | ||||
privileged=False, | privileged=False, | ||||
) | ) | ||||
# we only subscribed to "standard" topics | # we only subscribed to "standard" topics | ||||
assert client.subscription == [kafka_prefix + ".revision"] | assert client.subscription == [kafka_prefix + ".revision"] | ||||
# with privileged "channels" activated on the client side | # with privileged "channels" activated on the client side | ||||
client = JournalClient( | client = JournalClient( | ||||
brokers=[kafka_server_base], | brokers=[kafka_server_base], | ||||
group_id=kafka_consumer_group, | group_id=kafka_consumer_group, | ||||
prefix=kafka_prefix, | prefix=kafka_prefix, | ||||
stop_after_objects=1, | |||||
privileged=True, | privileged=True, | ||||
) | ) | ||||
# we only subscribed to "privileged" topics | # we only subscribed to "privileged" topics | ||||
assert client.subscription == [kafka_prefix + "_privileged.revision"] | assert client.subscription == [kafka_prefix + "_privileged.revision"] | ||||
def test_client_subscriptions_without_anonymized_topics( | def test_client_subscriptions_without_anonymized_topics( | ||||
kafka_prefix: str, kafka_consumer_group: str, kafka_server_base: str | kafka_prefix: str, kafka_consumer_group: str, kafka_server_base: str | ||||
Show All 12 Lines | ): | ||||
) | ) | ||||
producer.flush() | producer.flush() | ||||
# without privileged channel activated on the client side | # without privileged channel activated on the client side | ||||
client = JournalClient( | client = JournalClient( | ||||
brokers=[kafka_server_base], | brokers=[kafka_server_base], | ||||
group_id=kafka_consumer_group, | group_id=kafka_consumer_group, | ||||
prefix=kafka_prefix, | prefix=kafka_prefix, | ||||
stop_after_objects=1, | stop_on_eof=True, | ||||
privileged=False, | privileged=False, | ||||
) | ) | ||||
# we only subscribed to the standard prefix | # we only subscribed to the standard prefix | ||||
assert client.subscription == [kafka_prefix + ".revision"] | assert client.subscription == [kafka_prefix + ".revision"] | ||||
# with privileged channel activated on the client side | # with privileged channel activated on the client side | ||||
client = JournalClient( | client = JournalClient( | ||||
brokers=[kafka_server_base], | brokers=[kafka_server_base], | ||||
group_id=kafka_consumer_group, | group_id=kafka_consumer_group, | ||||
prefix=kafka_prefix, | prefix=kafka_prefix, | ||||
stop_after_objects=1, | stop_on_eof=True, | ||||
privileged=True, | privileged=True, | ||||
) | ) | ||||
# we also only subscribed to the standard prefix, since there is no privileged prefix | # we also only subscribed to the standard prefix, since there is no privileged prefix | ||||
# on the kafka broker | # on the kafka broker | ||||
assert client.subscription == [kafka_prefix + ".revision"] | assert client.subscription == [kafka_prefix + ".revision"] | ||||
def test_client_with_deserializer( | def test_client_with_deserializer( | ||||
▲ Show 20 Lines • Show All 44 Lines • Show Last 20 Lines |
consuming -> consumption