kafka_prefix = 'ijagrmzwck', kafka_consumer_group = 'test-consumer-ijagrmzwck'
kafka_server = '127.0.0.1:59549'
def test_client_with_deserializer(
kafka_prefix: str, kafka_consumer_group: str, kafka_server: str
):
producer = Producer(
{
"bootstrap.servers": kafka_server,
"client.id": "test producer",
"acks": "all",
}
)
# Fill Kafka
revisions = cast(List[Revision], TEST_OBJECTS["revision"])
for rev in revisions:
producer.produce(
topic=kafka_prefix + ".revision",
key=rev.id,
value=value_to_kafka(rev.to_dict()),
)
producer.flush()
def custom_deserializer(object_type, msg):
assert object_type == "revision"
obj = kafka_to_value(msg)
# filter the first revision
if obj["id"] == revisions[0].id:
return None
return Revision.from_dict(obj)
client = JournalClient(
brokers=[kafka_server],
group_id=kafka_consumer_group,
prefix=kafka_prefix,
stop_on_eof=True,
value_deserializer=custom_deserializer,
)
worker_fn = MagicMock()
client.process(worker_fn)
# a commit seems to be needed to prevent some race condition situation
# where the worker_fn has not yet been called at this point (not sure how)
client.consumer.commit()
# Check the first revision has not been passed to worker_fn
> processed_revisions = set(worker_fn.call_args.args[0]["revision"])
E TypeError: string indices must be integers
.tox/py3/lib/python3.7/site-packages/swh/journal/tests/test_client.py:395: TypeError
TEST RESULT
TEST RESULT
- Run At: Mar 24 2022, 2:38 PM