Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_client.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from typing import Dict, List | from typing import Dict, List, cast | ||||
from unittest.mock import MagicMock | from unittest.mock import MagicMock | ||||
from confluent_kafka import Producer | from confluent_kafka import Producer | ||||
import pytest | import pytest | ||||
from swh.journal.client import JournalClient | from swh.journal.client import JournalClient | ||||
from swh.journal.serializers import key_to_kafka, value_to_kafka | from swh.journal.serializers import kafka_to_value, key_to_kafka, value_to_kafka | ||||
from swh.model.model import Content | from swh.model.model import Content, Revision | ||||
from swh.model.tests.swh_model_data import TEST_OBJECTS | |||||
REV = { | REV = { | ||||
"message": b"something cool", | "message": b"something cool", | ||||
"author": {"fullname": b"Peter", "name": None, "email": b"peter@ouiche.lo"}, | "author": {"fullname": b"Peter", "name": None, "email": b"peter@ouiche.lo"}, | ||||
"committer": {"fullname": b"Stephen", "name": b"From Outer Space", "email": None}, | "committer": {"fullname": b"Stephen", "name": b"From Outer Space", "email": None}, | ||||
"date": { | "date": { | ||||
"timestamp": {"seconds": 123456789, "microseconds": 123}, | "timestamp": {"seconds": 123456789, "microseconds": 123}, | ||||
"offset": 120, | "offset": 120, | ||||
▲ Show 20 Lines • Show All 299 Lines • ▼ Show 20 Lines | client = JournalClient( | ||||
group_id=kafka_consumer_group, | group_id=kafka_consumer_group, | ||||
prefix=kafka_prefix, | prefix=kafka_prefix, | ||||
stop_after_objects=1, | stop_after_objects=1, | ||||
privileged=True, | privileged=True, | ||||
) | ) | ||||
# we also only subscribed to the standard prefix, since there is no privileged prefix | # we also only subscribed to the standard prefix, since there is no privileged prefix | ||||
# on the kafka broker | # on the kafka broker | ||||
assert client.subscription == [kafka_prefix + ".revision"] | assert client.subscription == [kafka_prefix + ".revision"] | ||||
def test_client_with_deserializer(
    kafka_prefix: str, kafka_consumer_group: str, kafka_server: str
):
    """Exercise ``JournalClient`` with a custom ``value_deserializer``.

    Every revision from ``TEST_OBJECTS`` is produced to the
    ``<kafka_prefix>.revision`` topic. The client is then run with a
    deserializer that returns ``None`` for the first revision, and the test
    expects the worker function to be called exactly once with all the
    revisions except that first one.

    Args:
        kafka_prefix: topic prefix used for the test topics.
        kafka_consumer_group: consumer group id handed to the client.
        kafka_server: address of the Kafka broker to connect to.
    """
    revision_topic = kafka_prefix + ".revision"
    producer_config = {
        "bootstrap.servers": kafka_server,
        "client.id": "test producer",
        "acks": "all",
    }
    producer = Producer(producer_config)

    # Publish all the sample revisions before starting the consumer.
    revisions = cast(List[Revision], TEST_OBJECTS["revision"])
    for revision in revisions:
        serialized = value_to_kafka(revision.to_dict())
        producer.produce(topic=revision_topic, key=revision.id, value=serialized)
    producer.flush()

    def skip_first_revision(object_type, msg):
        """Decode ``msg`` into a Revision, returning None for the first one."""
        assert object_type == "revision"
        decoded = kafka_to_value(msg)
        # The first revision is the one deliberately filtered out.
        is_first = decoded["id"] == revisions[0].id
        return None if is_first else Revision.from_dict(decoded)

    journal_client = JournalClient(
        brokers=[kafka_server],
        group_id=kafka_consumer_group,
        prefix=kafka_prefix,
        stop_after_objects=1,
        value_deserializer=skip_first_revision,
    )

    worker = MagicMock()
    journal_client.process(worker)

    # The filtered (first) revision must not have reached the worker.
    expected_batch = {"revision": revisions[1:]}
    worker.assert_called_once_with(expected_batch)