kafka_prefix = 'kuenwsbeld', kafka_consumer_group = 'test-consumer-kuenwsbeld'
kafka_server = '127.0.0.1:32835', count = 1
@pytest.mark.parametrize("count", [1, 2])
def test_client_stop_after_objects(
kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, count: int
):
producer = Producer(
{
"bootstrap.servers": kafka_server,
"client.id": "test producer",
"acks": "all",
}
)
# Fill Kafka
revisions = cast(List[Revision], TEST_OBJECTS["revision"])
for rev in revisions:
producer.produce(
topic=kafka_prefix + ".revision",
key=rev.id,
value=value_to_kafka(rev.to_dict()),
)
producer.flush()
client = JournalClient(
brokers=[kafka_server],
group_id=kafka_consumer_group,
prefix=kafka_prefix,
stop_on_eof=False,
stop_after_objects=count,
)
worker_fn = MagicMock()
client.process(worker_fn)
# this code below is not pretty, but needed since we have to deal with
# dicts (so no set) which can have values that are list vs tuple, and we do
# not know for sure how many calls of the worker_fn will happen during the
# consuming of the topic...
worker_fn.assert_called()
revs = [] # list of (unique) rev dicts we got from the client
for call in worker_fn.call_args_list:
> callrevs = call.args[0]["revision"]
E TypeError: string indices must be integers
.tox/py3/lib/python3.7/site-packages/swh/journal/tests/test_client.py:110: TypeError
TEST RESULT
TEST RESULT
- Run At
- Nov 16 2021, 2:07 PM