diff --git a/swh/journal/client.py b/swh/journal/client.py
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -270,7 +270,9 @@
                 else:
                     _error_cb(error)
                 continue
-
+            if message.value() is None:
+                # ignore messages with no payload; these can be generated in tests
+                continue
             nb_processed += 1
             object_type = message.topic().split(".")[-1]
             objects[object_type].append(self.deserialize_message(message))
diff --git a/swh/journal/pytest_plugin.py b/swh/journal/pytest_plugin.py
--- a/swh/journal/pytest_plugin.py
+++ b/swh/journal/pytest_plugin.py
@@ -11,7 +11,8 @@

 import pytest

-from confluent_kafka import Consumer, Producer, KafkaException
+from confluent_kafka import Consumer, KafkaException, Producer
+from confluent_kafka.admin import AdminClient

 from swh.journal.serializers import object_key, kafka_to_key, kafka_to_value
 from swh.journal.tests.journal_data import TEST_OBJECTS, TEST_OBJECT_DICTS
@@ -27,7 +28,10 @@

     while fetched_messages < expected_messages:
         if retries_left == 0:
-            raise ValueError("Timed out fetching messages from kafka")
+            raise ValueError(
+                "Timed out fetching messages from kafka. "
+                f"Only {fetched_messages}/{expected_messages} fetched"
+            )

         msg = consumer.poll(timeout=0.01)

@@ -59,6 +63,9 @@
     for object_type, known_values in TEST_OBJECT_DICTS.items():
         known_keys = [object_key(object_type, obj) for obj in TEST_OBJECTS[object_type]]

+        if not consumed_messages[object_type]:
+            return
+
         (received_keys, received_values) = zip(*consumed_messages[object_type])

         if object_type == "origin_visit":
@@ -87,41 +94,77 @@
     return "test-consumer-%s" % kafka_prefix


+@pytest.fixture(scope="function")
+def object_types():
+    """Set of object types to pre-create topics for."""
+    return set(TEST_OBJECT_DICTS.keys())
+
+
+@pytest.fixture(scope="function")
+def kafka_server(
+    kafka_server_base: str, kafka_prefix: str, object_types: Iterator[str]
+) -> str:
+    """A kafka server with existing topics.
+
+    Topics are built from the ``kafka_prefix`` and the ``object_types`` list."""
+    topics = [f"{kafka_prefix}.{obj}" for obj in object_types]
+
+    # unfortunately, the mock broker does not support the CreateTopics admin
+    # API, so we have to create topics using a Producer.
+    producer = Producer(
+        {
+            "bootstrap.servers": kafka_server_base,
+            "client.id": "bootstrap producer",
+            "acks": "all",
+        }
+    )
+    for topic in topics:
+        producer.produce(topic=topic, value=None)
+    for i in range(10):
+        if producer.flush(0.1) == 0:
+            break
+
+    return kafka_server_base
+
+
 @pytest.fixture(scope="session")
-def kafka_server() -> Iterator[str]:
-    p = Producer({"test.mock.num.brokers": "1"})
+def kafka_server_base() -> Iterator[str]:
+    """Create a mock kafka cluster suitable for tests.
+
+    Yield a connection string.
+
+    Note: this is a generator to keep the mock broker alive during the whole test
+    session.
-
-    metadata = p.list_topics()
+
+    see https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_mock.h
+    """
+    admin = AdminClient({"test.mock.num.brokers": "1"})
+
+    metadata = admin.list_topics()
     brokers = [str(broker) for broker in metadata.brokers.values()]
     assert len(brokers) == 1, "More than one broker found in the kafka cluster?!"

     broker_connstr, broker_id = brokers[0].split("/")
-    ip, port_str = broker_connstr.split(":")
-    assert ip == "127.0.0.1"
-    assert int(port_str)
-
     yield broker_connstr
-    p.flush()
-

 TEST_CONFIG = {
     "consumer_id": "swh.journal.consumer",
-    "object_types": TEST_OBJECT_DICTS.keys(),
     "stop_after_objects": 1,  # will read 1 object and stop
     "storage": {"cls": "memory", "args": {}},
 }


 @pytest.fixture
-def test_config(kafka_server: str, kafka_prefix: str):
+def test_config(kafka_server_base: str, kafka_prefix: str, object_types: Iterator[str]):
     """Test configuration needed for producer/consumer

     """
     return {
         **TEST_CONFIG,
-        "brokers": [kafka_server],
-        "prefix": kafka_prefix + ".swh.journal.objects",
+        "object_types": object_types,
+        "brokers": [kafka_server_base],
+        "prefix": kafka_prefix,
     }
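
Note on the fixture split above: librdkafka's in-process mock cluster accepts produced messages but not the CreateTopics admin call, hence the Producer-based topic bootstrap, and hence the None-payload guard added to client.py. A minimal standalone sketch of the same trick, assuming only confluent-kafka is installed (the topic name is illustrative):

    from confluent_kafka import Producer
    from confluent_kafka.admin import AdminClient

    # Start librdkafka's in-process mock cluster; keep `admin` referenced so
    # the mock broker stays alive.
    admin = AdminClient({"test.mock.num.brokers": "1"})
    brokers = [str(b) for b in admin.list_topics().brokers.values()]
    connstr, _broker_id = brokers[0].split("/")  # BrokerMetadata prints as host:port/id

    # Producing a message auto-creates the topic on the mock broker, even with
    # a None payload; the client.py change above then skips those empty messages.
    producer = Producer({"bootstrap.servers": connstr, "acks": "all"})
    producer.produce(topic="example-prefix.revision", value=None)
    assert producer.flush(10) == 0  # delivery queue fully drained
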
diff --git a/swh/journal/tests/journal_data.py b/swh/journal/tests/journal_data.py
--- a/swh/journal/tests/journal_data.py
+++ b/swh/journal/tests/journal_data.py
@@ -125,10 +125,13 @@

 TEST_OBJECT_DICTS: Dict[str, List[Dict[str, Any]]] = {
     "content": CONTENTS,
-    "revision": REVISIONS,
-    "release": RELEASES,
+    "directory": [],
     "origin": ORIGINS,
     "origin_visit": ORIGIN_VISITS,
+    "release": RELEASES,
+    "revision": REVISIONS,
+    "snapshot": [],
+    "skipped_content": [],
 }

 MODEL_OBJECTS = {v: k for (k, v) in OBJECT_TYPES.items()}
diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py
--- a/swh/journal/tests/test_client.py
+++ b/swh/journal/tests/test_client.py
@@ -9,16 +9,38 @@
 from confluent_kafka import Producer
 import pytest

-from swh.model.hypothesis_strategies import revisions
 from swh.model.model import Content

 from swh.journal.client import JournalClient
 from swh.journal.serializers import key_to_kafka, value_to_kafka

+REV = {
+    "message": b"something cool",
+    "author": {"fullname": b"Peter", "name": None, "email": b"peter@ouiche.lo"},
+    "committer": {"fullname": b"Stephen", "name": b"From Outer Space", "email": None},
+    "date": {
+        "timestamp": {"seconds": 123456789, "microseconds": 123},
+        "offset": 120,
+        "negative_utc": False,
+    },
+    "committer_date": {
+        "timestamp": {"seconds": 123123456, "microseconds": 0},
+        "offset": 0,
+        "negative_utc": False,
+    },
+    "type": "git",
+    "directory": (
+        b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
+        b"\x01\x02\x03\x04\x05"
+    ),
+    "synthetic": False,
+    "metadata": None,
+    "parents": [],
+    "id": b"\x8b\xeb\xd1\x9d\x07\xe2\x1e0\xe2 \x91X\x8d\xbd\x1c\xa8\x86\xdeB\x0c",
+}

-def test_client(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str):
-    kafka_prefix += ".swh.journal.objects"

+def test_client(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str):
     producer = Producer(
         {
             "bootstrap.servers": kafka_server,
@@ -27,13 +49,9 @@
         }
     )

-    rev = revisions().example()
-
     # Fill Kafka
     producer.produce(
-        topic=kafka_prefix + ".revision",
-        key=key_to_kafka(rev.id),
-        value=value_to_kafka(rev.to_dict()),
+        topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV),
     )
     producer.flush()

@@ -46,12 +64,10 @@

     worker_fn = MagicMock()
     client.process(worker_fn)
-    worker_fn.assert_called_once_with({"revision": [rev.to_dict()]})
+    worker_fn.assert_called_once_with({"revision": [REV]})


 def test_client_eof(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str):
-    kafka_prefix += ".swh.journal.objects"
-
     producer = Producer(
         {
             "bootstrap.servers": kafka_server,
@@ -60,13 +76,9 @@
         }
     )

-    rev = revisions().example()
-
     # Fill Kafka
     producer.produce(
-        topic=kafka_prefix + ".revision",
-        key=key_to_kafka(rev.id),
-        value=value_to_kafka(rev.to_dict()),
+        topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV),
     )
     producer.flush()

@@ -81,15 +93,13 @@

     worker_fn = MagicMock()
     client.process(worker_fn)
-    worker_fn.assert_called_once_with({"revision": [rev.to_dict()]})
+    worker_fn.assert_called_once_with({"revision": [REV]})


 @pytest.mark.parametrize("batch_size", [1, 5, 100])
 def test_client_batch_size(
     kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, batch_size: int,
 ):
-    kafka_prefix += ".swh.journal.objects"
-
     num_objects = 2 * batch_size + 1
     assert num_objects < 256, "Too many objects, generation will fail"

@@ -138,10 +148,10 @@


 @pytest.fixture()
-def kafka_producer(kafka_prefix: str, kafka_server: str):
+def kafka_producer(kafka_prefix: str, kafka_server_base: str):
     producer = Producer(
         {
-            "bootstrap.servers": kafka_server,
+            "bootstrap.servers": kafka_server_base,
             "client.id": "test producer",
             "acks": "all",
         }
     )
@@ -163,10 +173,10 @@


 def test_client_subscribe_all(
-    kafka_producer: Producer, kafka_prefix: str, kafka_server: str
+    kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str
 ):
     client = JournalClient(
-        brokers=[kafka_server],
+        brokers=[kafka_server_base],
         group_id="whatever",
         prefix=kafka_prefix,
         stop_after_objects=2,
@@ -184,10 +194,10 @@


 def test_client_subscribe_one_topic(
-    kafka_producer: Producer, kafka_prefix: str, kafka_server: str
+    kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str
 ):
     client = JournalClient(
-        brokers=[kafka_server],
+        brokers=[kafka_server_base],
         group_id="whatever",
         prefix=kafka_prefix,
         stop_after_objects=1,
@@ -201,11 +211,11 @@


 def test_client_subscribe_absent_topic(
-    kafka_producer: Producer, kafka_prefix: str, kafka_server: str
+    kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str
 ):
     with pytest.raises(ValueError):
         JournalClient(
-            brokers=[kafka_server],
+            brokers=[kafka_server_base],
             group_id="whatever",
             prefix=kafka_prefix,
             stop_after_objects=1,
@@ -214,18 +224,18 @@


 def test_client_subscribe_absent_prefix(
-    kafka_producer: Producer, kafka_prefix: str, kafka_server: str
+    kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str
 ):
     with pytest.raises(ValueError):
         JournalClient(
-            brokers=[kafka_server],
+            brokers=[kafka_server_base],
             group_id="whatever",
             prefix="wrong.prefix",
             stop_after_objects=1,
         )
     with pytest.raises(ValueError):
         JournalClient(
-            brokers=[kafka_server],
+            brokers=[kafka_server_base],
             group_id="whatever",
             prefix="wrong.prefix",
             stop_after_objects=1,
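
The hand-written REV dict replaces `revisions().example()`, dropping the hypothesis dependency and making the expected payload deterministic. A quick sanity check of the fixture, reusing the serializers this module already imports (an illustrative snippet, not part of the suite):

    from swh.journal.serializers import kafka_to_value, value_to_kafka

    # REV contains only plain data (bytes, ints, bools, None), so it should
    # survive a serialization round-trip unchanged:
    assert kafka_to_value(value_to_kafka(REV)) == REV
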
diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py
--- a/swh/journal/tests/test_kafka_writer.py
+++ b/swh/journal/tests/test_kafka_writer.py
@@ -14,8 +14,6 @@


 def test_kafka_writer(kafka_prefix: str, kafka_server: str, consumer: Consumer):
-    kafka_prefix += ".swh.journal.objects"
-
     writer = KafkaJournalWriter(
         brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix,
     )
@@ -78,7 +76,6 @@
         def produce(self, **kwargs):
             produced.append(kwargs)

-    kafka_prefix += ".swh.journal.objects"
     writer = KafkaJournalWriter(
         brokers=[kafka_server],
         client_id="kafka_writer",
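
Dropping the hard-coded `.swh.journal.objects` suffix is what lets the writer hit the very topics the kafka_server fixture pre-creates. A rough sketch of the wiring under test; the constructor call mirrors the test above, while the import path and the write_addition method are assumptions from the rest of the repo:

    from swh.journal.writer.kafka import KafkaJournalWriter  # assumed import path
    from swh.model.model import Origin

    writer = KafkaJournalWriter(
        brokers=["127.0.0.1:9092"],  # the tests pass the mock kafka_server instead
        client_id="kafka_writer",
        prefix="example-prefix",  # messages land on f"{prefix}.{object_type}" topics
    )
    # write_addition (assumed API) serializes the model object and produces it
    # to the "example-prefix.origin" topic:
    writer.write_addition("origin", Origin(url="https://example.com/repo"))
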
diff --git a/swh/journal/tests/test_pytest_plugin.py b/swh/journal/tests/test_pytest_plugin.py
new file
--- /dev/null
+++ b/swh/journal/tests/test_pytest_plugin.py
@@ -0,0 +1,51 @@
+# Copyright (C) 2020  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Iterator
+from confluent_kafka.admin import AdminClient
+
+
+def test_kafka_server(kafka_server_base: str):
+    ip, port_str = kafka_server_base.split(":")
+    assert ip == "127.0.0.1"
+    assert int(port_str)
+
+    admin = AdminClient({"bootstrap.servers": kafka_server_base})
+
+    topics = admin.list_topics()
+
+    assert len(topics.brokers) == 1
+
+
+def test_kafka_server_with_topics(
+    kafka_server: str, kafka_prefix: str, object_types: Iterator[str]
+):
+    admin = AdminClient({"bootstrap.servers": kafka_server})
+    topics = {
+        topic
+        for topic in admin.list_topics().topics
+        if topic.startswith(f"{kafka_prefix}.")
+    }
+    assert topics == {f"{kafka_prefix}.{obj}" for obj in object_types}
+
+
+def test_test_config(test_config: dict, kafka_prefix: str, kafka_server_base: str):
+    assert test_config == {
+        "consumer_id": "swh.journal.consumer",
+        "stop_after_objects": 1,
+        "storage": {"cls": "memory", "args": {}},
+        "object_types": {
+            "content",
+            "directory",
+            "origin",
+            "origin_visit",
+            "release",
+            "revision",
+            "snapshot",
+            "skipped_content",
+        },
+        "brokers": [kafka_server_base],
+        "prefix": kafka_prefix,
+    }
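
For completeness, the dict checked by test_test_config maps almost one-to-one onto JournalClient arguments; a hedged sketch of that consumer-side wiring, where `test_config` stands for the fixture dict above and the consumer_id-to-group_id mapping and the list() conversion are assumptions:

    from swh.journal.client import JournalClient

    def worker_fn(objects):
        # objects maps object_type -> list of deserialized message dicts
        print({otype: len(msgs) for otype, msgs in objects.items()})

    client = JournalClient(
        brokers=test_config["brokers"],
        group_id=test_config["consumer_id"],
        prefix=test_config["prefix"],
        object_types=list(test_config["object_types"]),
        stop_after_objects=test_config["stop_after_objects"],
    )
    client.process(worker_fn)  # returns once stop_after_objects messages are handled
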