diff --git a/swh/journal/pytest_plugin.py b/swh/journal/pytest_plugin.py --- a/swh/journal/pytest_plugin.py +++ b/swh/journal/pytest_plugin.py @@ -205,7 +205,7 @@ TEST_CONFIG = { "consumer_id": "swh.journal.consumer", - "stop_after_objects": 1, # will read 1 object and stop + "stop_on_eof": True, "storage": {"cls": "memory", "args": {}}, } diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py --- a/swh/journal/tests/test_client.py +++ b/swh/journal/tests/test_client.py @@ -59,7 +59,7 @@ brokers=[kafka_server], group_id=kafka_consumer_group, prefix=kafka_prefix, - stop_after_objects=1, + stop_on_eof=True, ) worker_fn = MagicMock() client.process(worker_fn) @@ -67,7 +67,10 @@ worker_fn.assert_called_once_with({"revision": [REV]}) -def test_client_eof(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str): +@pytest.mark.parametrize("count", [1, 2]) +def test_client_stop_after_objects( + kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, count: int +): producer = Producer( { "bootstrap.servers": kafka_server, @@ -77,23 +80,39 @@ ) # Fill Kafka - producer.produce( - topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV), - ) + revisions = cast(List[Revision], TEST_OBJECTS["revision"]) + for rev in revisions: + producer.produce( + topic=kafka_prefix + ".revision", + key=rev.id, + value=value_to_kafka(rev.to_dict()), + ) producer.flush() client = JournalClient( brokers=[kafka_server], group_id=kafka_consumer_group, prefix=kafka_prefix, - stop_after_objects=None, - stop_on_eof=True, + stop_on_eof=False, + stop_after_objects=count, ) worker_fn = MagicMock() client.process(worker_fn) - worker_fn.assert_called_once_with({"revision": [REV]}) + # this code below is not pretty, but needed since we have to deal with + # dicts (so no set) which can have values that are list vs tuple, and we do + # not know for sure how many calls of the worker_fn will happen during the + # consuming of the topic... + worker_fn.assert_called() + revs = [] # list of (unique) rev dicts we got from the client + for call in worker_fn.call_args_list: + callrevs = call.args[0]["revision"] + for rev in callrevs: + assert Revision.from_dict(rev) in revisions + if rev not in revs: + revs.append(rev) + assert len(revs) == count @pytest.mark.parametrize("batch_size", [1, 5, 100]) @@ -127,7 +146,7 @@ brokers=[kafka_server], group_id=kafka_consumer_group, prefix=kafka_prefix, - stop_after_objects=num_objects, + stop_on_eof=True, batch_size=batch_size, ) @@ -179,7 +198,7 @@ brokers=[kafka_server_base], group_id="whatever", prefix=kafka_prefix, - stop_after_objects=2, + stop_on_eof=True, ) assert set(client.subscription) == { f"{kafka_prefix}.something", @@ -200,7 +219,7 @@ brokers=[kafka_server_base], group_id="whatever", prefix=kafka_prefix, - stop_after_objects=1, + stop_on_eof=True, object_types=["else"], ) assert client.subscription == [f"{kafka_prefix}.else"] @@ -218,7 +237,7 @@ brokers=[kafka_server_base], group_id="whatever", prefix=kafka_prefix, - stop_after_objects=1, + stop_on_eof=True, object_types=["really"], ) @@ -231,14 +250,14 @@ brokers=[kafka_server_base], group_id="whatever", prefix="wrong.prefix", - stop_after_objects=1, + stop_on_eof=True, ) with pytest.raises(ValueError): JournalClient( brokers=[kafka_server_base], group_id="whatever", prefix="wrong.prefix", - stop_after_objects=1, + stop_on_eof=True, object_types=["else"], ) @@ -271,7 +290,7 @@ brokers=[kafka_server_base], group_id=kafka_consumer_group, prefix=kafka_prefix, - stop_after_objects=1, + stop_on_eof=True, privileged=False, ) # we only subscribed to "standard" topics @@ -282,7 +301,6 @@ brokers=[kafka_server_base], group_id=kafka_consumer_group, prefix=kafka_prefix, - stop_after_objects=1, privileged=True, ) # we only subscribed to "privileged" topics @@ -311,7 +329,7 @@ brokers=[kafka_server_base], group_id=kafka_consumer_group, prefix=kafka_prefix, - stop_after_objects=1, + stop_on_eof=True, privileged=False, ) # we only subscribed to the standard prefix @@ -322,7 +340,7 @@ brokers=[kafka_server_base], group_id=kafka_consumer_group, prefix=kafka_prefix, - stop_after_objects=1, + stop_on_eof=True, privileged=True, ) # we also only subscribed to the standard prefix, since there is no priviled prefix diff --git a/swh/journal/tests/test_pytest_plugin.py b/swh/journal/tests/test_pytest_plugin.py --- a/swh/journal/tests/test_pytest_plugin.py +++ b/swh/journal/tests/test_pytest_plugin.py @@ -50,7 +50,7 @@ def test_test_config(test_config: dict, kafka_prefix: str, kafka_server_base: str): assert test_config == { "consumer_id": "swh.journal.consumer", - "stop_after_objects": 1, + "stop_on_eof": True, "storage": {"cls": "memory", "args": {}}, "object_types": { "content",