D3130.id11120.diff

diff --git a/swh/journal/client.py b/swh/journal/client.py
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -270,7 +270,9 @@
else:
_error_cb(error)
continue
-
+ if message.value() is None:
+ # ignore messages with no payload; these can be generated in tests
+ continue
nb_processed += 1
object_type = message.topic().split(".")[-1]
objects[object_type].append(self.deserialize_message(message))
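
The guard above matters because the topic-priming messages produced by the new
kafka_server fixture (see pytest_plugin.py below) carry an empty payload. A minimal
sketch of the situation, assuming a reachable broker at localhost:9092 (hypothetical
address; the fixtures use the mock cluster instead):

    from confluent_kafka import Consumer, Producer

    # Produce a payload-less message, as the kafka_server fixture does to create topics.
    producer = Producer({"bootstrap.servers": "localhost:9092"})
    producer.produce(topic="swh.test.objects.revision", value=None)
    producer.flush()

    consumer = Consumer(
        {
            "bootstrap.servers": "localhost:9092",
            "group.id": "sketch-consumer",
            "auto.offset.reset": "earliest",
        }
    )
    consumer.subscribe(["swh.test.objects.revision"])
    msg = consumer.poll(timeout=5.0)
    # For such messages msg.value() is None: there is nothing to deserialize,
    # which is exactly what the new guard in the client loop skips over.
    if msg is not None and msg.error() is None and msg.value() is None:
        pass  # skip, as JournalClient now does
    consumer.close()
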
diff --git a/swh/journal/pytest_plugin.py b/swh/journal/pytest_plugin.py
--- a/swh/journal/pytest_plugin.py
+++ b/swh/journal/pytest_plugin.py
@@ -11,7 +11,8 @@
import pytest
-from confluent_kafka import Consumer, Producer, KafkaException
+from confluent_kafka import Consumer, KafkaException, Producer
+from confluent_kafka.admin import AdminClient
from swh.journal.serializers import object_key, kafka_to_key, kafka_to_value
from swh.journal.tests.journal_data import TEST_OBJECTS, TEST_OBJECT_DICTS
@@ -27,7 +28,10 @@
while fetched_messages < expected_messages:
if retries_left == 0:
- raise ValueError("Timed out fetching messages from kafka")
+ raise ValueError(
+ "Timed out fetching messages from kafka. "
+ f"Only {fetched_messages}/{expected_messages} fetched"
+ )
msg = consumer.poll(timeout=0.01)
@@ -59,6 +63,9 @@
for object_type, known_values in TEST_OBJECT_DICTS.items():
known_keys = [object_key(object_type, obj) for obj in TEST_OBJECTS[object_type]]
+ if not consumed_messages[object_type]:
+ # nothing was consumed for this object type; check the remaining ones
+ continue
+
(received_keys, received_values) = zip(*consumed_messages[object_type])
if object_type == "origin_visit":
@@ -87,41 +94,77 @@
return "test-consumer-%s" % kafka_prefix
+@pytest.fixture(scope="function")
+def object_types():
+ """Set of object types to precreate topics for."""
+ return set(TEST_OBJECT_DICTS.keys())
+
+
+@pytest.fixture(scope="function")
+def kafka_server(
+ kafka_server_base: str, kafka_prefix: str, object_types: Iterator[str]
+) -> str:
+ """A kafka server with existing topics
+
+ topics are built from the ``kafka_prefix`` and the ``object_types`` list"""
+ topics = [f"{kafka_prefix}.{obj}" for obj in object_types]
+
+ # unfortunately, the mock broker does not support the CreateTopics admin API, so we
+ # have to create topics using a Producer.
+ producer = Producer(
+ {
+ "bootstrap.servers": kafka_server_base,
+ "client.id": "bootstrap producer",
+ "acks": "all",
+ }
+ )
+ for topic in topics:
+ producer.produce(topic=topic, value=None)
+ # flush() returns the number of messages still queued; retry until none remain
+ for _ in range(10):
+ if producer.flush(0.1) == 0:
+ break
+
+ return kafka_server_base
+
+
@pytest.fixture(scope="session")
-def kafka_server() -> Iterator[str]:
- p = Producer({"test.mock.num.brokers": "1"})
- metadata = p.list_topics()
+def kafka_server_base() -> Iterator[str]:
+ """Create a mock kafka cluster suitable for tests.
+
+ Yield a connection string.
+
+ Note: this is a generator to keep the mock broker alive during the whole test
+ session.
+
+ See https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_mock.h
+ """
+ admin = AdminClient({"test.mock.num.brokers": "1"})
+
+ metadata = admin.list_topics()
brokers = [str(broker) for broker in metadata.brokers.values()]
assert len(brokers) == 1, "More than one broker found in the kafka cluster?!"
broker_connstr, broker_id = brokers[0].split("/")
- ip, port_str = broker_connstr.split(":")
- assert ip == "127.0.0.1"
- assert int(port_str)
-
yield broker_connstr
- p.flush()
-
TEST_CONFIG = {
"consumer_id": "swh.journal.consumer",
- "object_types": TEST_OBJECT_DICTS.keys(),
"stop_after_objects": 1, # will read 1 object and stop
"storage": {"cls": "memory", "args": {}},
}
@pytest.fixture
-def test_config(kafka_server: str, kafka_prefix: str):
+def test_config(kafka_server_base: str, kafka_prefix: str, object_types: Iterator[str]):
"""Test configuration needed for producer/consumer
"""
return {
**TEST_CONFIG,
- "brokers": [kafka_server],
- "prefix": kafka_prefix + ".swh.journal.objects",
+ "object_types": object_types,
+ "brokers": [kafka_server_base],
+ "prefix": kafka_prefix,
}
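
For reference, the test.mock.num.brokers trick used by kafka_server_base relies on
librdkafka's built-in mock cluster; a minimal sketch of how the connection string is
discovered (port and broker id vary per run):

    from confluent_kafka.admin import AdminClient

    # Spin up librdkafka's in-process mock cluster with a single broker.
    admin = AdminClient({"test.mock.num.brokers": "1"})

    # The broker advertises itself in the cluster metadata as "host:port/id",
    # e.g. "127.0.0.1:45227/1"; splitting off the id yields the connection string.
    (broker,) = admin.list_topics().brokers.values()
    connstr, _broker_id = str(broker).split("/")
    print(connstr)  # e.g. 127.0.0.1:45227
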
diff --git a/swh/journal/tests/journal_data.py b/swh/journal/tests/journal_data.py
--- a/swh/journal/tests/journal_data.py
+++ b/swh/journal/tests/journal_data.py
@@ -125,10 +125,13 @@
TEST_OBJECT_DICTS: Dict[str, List[Dict[str, Any]]] = {
"content": CONTENTS,
- "revision": REVISIONS,
- "release": RELEASES,
+ "directory": [],
"origin": ORIGINS,
"origin_visit": ORIGIN_VISITS,
+ "release": RELEASES,
+ "revision": REVISIONS,
+ "snapshot": [],
+ "skipped_content": [],
}
MODEL_OBJECTS = {v: k for (k, v) in OBJECT_TYPES.items()}
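
The new empty entries ("directory", "snapshot", "skipped_content") keep
TEST_OBJECT_DICTS' keys in sync with the topics the kafka_server fixture precreates;
a sketch of that relationship (the "swh.test" prefix is illustrative):

    from swh.journal.tests.journal_data import TEST_OBJECT_DICTS

    topics = {f"swh.test.{object_type}" for object_type in TEST_OBJECT_DICTS}
    assert {"swh.test.directory", "swh.test.snapshot"} <= topics
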
diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py
--- a/swh/journal/tests/test_client.py
+++ b/swh/journal/tests/test_client.py
@@ -9,16 +9,38 @@
from confluent_kafka import Producer
import pytest
-from swh.model.hypothesis_strategies import revisions
from swh.model.model import Content
from swh.journal.client import JournalClient
from swh.journal.serializers import key_to_kafka, value_to_kafka
+REV = {
+ "message": b"something cool",
+ "author": {"fullname": b"Peter", "name": None, "email": b"peter@ouiche.lo"},
+ "committer": {"fullname": b"Stephen", "name": b"From Outer Space", "email": None},
+ "date": {
+ "timestamp": {"seconds": 123456789, "microseconds": 123},
+ "offset": 120,
+ "negative_utc": False,
+ },
+ "committer_date": {
+ "timestamp": {"seconds": 123123456, "microseconds": 0},
+ "offset": 0,
+ "negative_utc": False,
+ },
+ "type": "git",
+ "directory": (
+ b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
+ b"\x01\x02\x03\x04\x05"
+ ),
+ "synthetic": False,
+ "metadata": None,
+ "parents": [],
+ "id": b"\x8b\xeb\xd1\x9d\x07\xe2\x1e0\xe2 \x91X\x8d\xbd\x1c\xa8\x86\xdeB\x0c",
+}
-def test_client(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str):
- kafka_prefix += ".swh.journal.objects"
+def test_client(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str):
producer = Producer(
{
"bootstrap.servers": kafka_server,
@@ -27,13 +49,9 @@
}
)
- rev = revisions().example()
-
# Fill Kafka
producer.produce(
- topic=kafka_prefix + ".revision",
- key=key_to_kafka(rev.id),
- value=value_to_kafka(rev.to_dict()),
+ topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV),
)
producer.flush()
@@ -46,12 +64,10 @@
worker_fn = MagicMock()
client.process(worker_fn)
- worker_fn.assert_called_once_with({"revision": [rev.to_dict()]})
+ worker_fn.assert_called_once_with({"revision": [REV]})
def test_client_eof(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str):
- kafka_prefix += ".swh.journal.objects"
-
producer = Producer(
{
"bootstrap.servers": kafka_server,
@@ -60,13 +76,9 @@
}
)
- rev = revisions().example()
-
# Fill Kafka
producer.produce(
- topic=kafka_prefix + ".revision",
- key=key_to_kafka(rev.id),
- value=value_to_kafka(rev.to_dict()),
+ topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV),
)
producer.flush()
@@ -81,15 +93,13 @@
worker_fn = MagicMock()
client.process(worker_fn)
- worker_fn.assert_called_once_with({"revision": [rev.to_dict()]})
+ worker_fn.assert_called_once_with({"revision": [REV]})
@pytest.mark.parametrize("batch_size", [1, 5, 100])
def test_client_batch_size(
kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, batch_size: int,
):
- kafka_prefix += ".swh.journal.objects"
-
num_objects = 2 * batch_size + 1
assert num_objects < 256, "Too many objects, generation will fail"
@@ -138,10 +148,10 @@
@pytest.fixture()
-def kafka_producer(kafka_prefix: str, kafka_server: str):
+def kafka_producer(kafka_prefix: str, kafka_server_base: str):
producer = Producer(
{
- "bootstrap.servers": kafka_server,
+ "bootstrap.servers": kafka_server_base,
"client.id": "test producer",
"acks": "all",
}
@@ -163,10 +173,10 @@
def test_client_subscribe_all(
- kafka_producer: Producer, kafka_prefix: str, kafka_server: str
+ kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str
):
client = JournalClient(
- brokers=[kafka_server],
+ brokers=[kafka_server_base],
group_id="whatever",
prefix=kafka_prefix,
stop_after_objects=2,
@@ -184,10 +194,10 @@
def test_client_subscribe_one_topic(
- kafka_producer: Producer, kafka_prefix: str, kafka_server: str
+ kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str
):
client = JournalClient(
- brokers=[kafka_server],
+ brokers=[kafka_server_base],
group_id="whatever",
prefix=kafka_prefix,
stop_after_objects=1,
@@ -201,11 +211,11 @@
def test_client_subscribe_absent_topic(
- kafka_producer: Producer, kafka_prefix: str, kafka_server: str
+ kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str
):
with pytest.raises(ValueError):
JournalClient(
- brokers=[kafka_server],
+ brokers=[kafka_server_base],
group_id="whatever",
prefix=kafka_prefix,
stop_after_objects=1,
@@ -214,18 +224,18 @@
def test_client_subscribe_absent_prefix(
- kafka_producer: Producer, kafka_prefix: str, kafka_server: str
+ kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str
):
with pytest.raises(ValueError):
JournalClient(
- brokers=[kafka_server],
+ brokers=[kafka_server_base],
group_id="whatever",
prefix="wrong.prefix",
stop_after_objects=1,
)
with pytest.raises(ValueError):
JournalClient(
- brokers=[kafka_server],
+ brokers=[kafka_server_base],
group_id="whatever",
prefix="wrong.prefix",
stop_after_objects=1,
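
The fixed REV dict replaces the hypothesis-generated revision, making these tests
deterministic. A quick sketch of the property the assertions rely on, namely that the
dict survives a Kafka serialization round-trip unchanged:

    from swh.journal.serializers import kafka_to_value, value_to_kafka

    # REV is the revision dict defined at the top of test_client.py.
    assert kafka_to_value(value_to_kafka(REV)) == REV
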
diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py
--- a/swh/journal/tests/test_kafka_writer.py
+++ b/swh/journal/tests/test_kafka_writer.py
@@ -14,8 +14,6 @@
def test_kafka_writer(kafka_prefix: str, kafka_server: str, consumer: Consumer):
- kafka_prefix += ".swh.journal.objects"
-
writer = KafkaJournalWriter(
brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix,
)
@@ -78,7 +76,6 @@
def produce(self, **kwargs):
produced.append(kwargs)
- kafka_prefix += ".swh.journal.objects"
writer = KafkaJournalWriter(
brokers=[kafka_server],
client_id="kafka_writer",
diff --git a/swh/journal/tests/test_pytest_plugin.py b/swh/journal/tests/test_pytest_plugin.py
new file mode 100644
--- /dev/null
+++ b/swh/journal/tests/test_pytest_plugin.py
@@ -0,0 +1,51 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Iterator
+
+from confluent_kafka.admin import AdminClient
+
+
+def test_kafka_server(kafka_server_base: str):
+ ip, port_str = kafka_server_base.split(":")
+ assert ip == "127.0.0.1"
+ assert int(port_str)
+
+ admin = AdminClient({"bootstrap.servers": kafka_server_base})
+
+ topics = admin.list_topics()
+
+ assert len(topics.brokers) == 1
+
+
+def test_kafka_server_with_topics(
+ kafka_server: str, kafka_prefix: str, object_types: Iterator[str]
+):
+ admin = AdminClient({"bootstrap.servers": kafka_server})
+ topics = {
+ topic
+ for topic in admin.list_topics().topics
+ if topic.startswith(f"{kafka_prefix}.")
+ }
+ assert topics == {f"{kafka_prefix}.{obj}" for obj in object_types}
+
+
+def test_test_config(test_config: dict, kafka_prefix: str, kafka_server_base: str):
+ assert test_config == {
+ "consumer_id": "swh.journal.consumer",
+ "stop_after_objects": 1,
+ "storage": {"cls": "memory", "args": {}},
+ "object_types": {
+ "content",
+ "directory",
+ "origin",
+ "origin_visit",
+ "release",
+ "revision",
+ "snapshot",
+ "skipped_content",
+ },
+ "brokers": [kafka_server_base],
+ "prefix": kafka_prefix,
+ }
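
A final sketch of how these pieces fit together: the dict produced by test_config maps
onto JournalClient keyword arguments (make_client is a hypothetical helper; the
consumer_id and storage entries are consumed by other swh components):

    from swh.journal.client import JournalClient

    def make_client(test_config: dict, group_id: str = "test-consumer") -> JournalClient:
        return JournalClient(
            brokers=test_config["brokers"],
            group_id=group_id,
            prefix=test_config["prefix"],
            object_types=list(test_config["object_types"]),
            stop_after_objects=test_config["stop_after_objects"],
        )
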
