D3130: Refactor the pytest_plugin
D3130.id11120.diff
diff --git a/swh/journal/client.py b/swh/journal/client.py
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -270,7 +270,9 @@
                 else:
                     _error_cb(error)
                 continue
-
+            if message.value() is None:
+                # ignore message with no payload, these can be generated in tests
+                continue
             nb_processed += 1
             object_type = message.topic().split(".")[-1]
             objects[object_type].append(self.deserialize_message(message))
diff --git a/swh/journal/pytest_plugin.py b/swh/journal/pytest_plugin.py
--- a/swh/journal/pytest_plugin.py
+++ b/swh/journal/pytest_plugin.py
@@ -11,7 +11,8 @@
 
 import pytest
 
-from confluent_kafka import Consumer, Producer, KafkaException
+from confluent_kafka import Consumer, KafkaException, Producer
+from confluent_kafka.admin import AdminClient
 
 from swh.journal.serializers import object_key, kafka_to_key, kafka_to_value
 from swh.journal.tests.journal_data import TEST_OBJECTS, TEST_OBJECT_DICTS
@@ -27,7 +28,10 @@
 
     while fetched_messages < expected_messages:
         if retries_left == 0:
-            raise ValueError("Timed out fetching messages from kafka")
+            raise ValueError(
+                "Timed out fetching messages from kafka. "
+                f"Only {fetched_messages}/{expected_messages} fetched"
+            )
 
         msg = consumer.poll(timeout=0.01)
 
@@ -59,6 +63,9 @@
     for object_type, known_values in TEST_OBJECT_DICTS.items():
         known_keys = [object_key(object_type, obj) for obj in TEST_OBJECTS[object_type]]
 
+        if not consumed_messages[object_type]:
+            return
+
         (received_keys, received_values) = zip(*consumed_messages[object_type])
 
         if object_type == "origin_visit":
@@ -87,41 +94,77 @@
return "test-consumer-%s" % kafka_prefix
+@pytest.fixture(scope="function")
+def object_types():
+ """Set of object types to precreate topics for."""
+ return set(TEST_OBJECT_DICTS.keys())
+
+
+@pytest.fixture(scope="function")
+def kafka_server(
+ kafka_server_base: str, kafka_prefix: str, object_types: Iterator[str]
+) -> str:
+ """A kafka server with existing topics
+
+ topics are built from the ``kafka_prefix`` and the ``object_types`` list"""
+ topics = [f"{kafka_prefix}.{obj}" for obj in object_types]
+
+ # unfortunately, the Mock broker does not support the CreatTopic admin API, so we
+ # have to create topics using a Producer.
+ producer = Producer(
+ {
+ "bootstrap.servers": kafka_server_base,
+ "client.id": "bootstrap producer",
+ "acks": "all",
+ }
+ )
+ for topic in topics:
+ producer.produce(topic=topic, value=None)
+ for i in range(10):
+ if producer.flush(0.1) == 0:
+ break
+
+ return kafka_server_base
+
+
@pytest.fixture(scope="session")
-def kafka_server() -> Iterator[str]:
- p = Producer({"test.mock.num.brokers": "1"})
+def kafka_server_base() -> Iterator[str]:
+ """Create a mock kafka cluster suitable for tests.
+
+ Yield a connection string.
+
+ Note: this is a generator to keep the mock broker alive during the whole test
+ session.
- metadata = p.list_topics()
+ see https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_mock.h
+ """
+ admin = AdminClient({"test.mock.num.brokers": "1"})
+
+ metadata = admin.list_topics()
brokers = [str(broker) for broker in metadata.brokers.values()]
assert len(brokers) == 1, "More than one broker found in the kafka cluster?!"
broker_connstr, broker_id = brokers[0].split("/")
- ip, port_str = broker_connstr.split(":")
- assert ip == "127.0.0.1"
- assert int(port_str)
-
yield broker_connstr
- p.flush()
-
TEST_CONFIG = {
"consumer_id": "swh.journal.consumer",
- "object_types": TEST_OBJECT_DICTS.keys(),
"stop_after_objects": 1, # will read 1 object and stop
"storage": {"cls": "memory", "args": {}},
}
@pytest.fixture
-def test_config(kafka_server: str, kafka_prefix: str):
+def test_config(kafka_server_base: str, kafka_prefix: str, object_types: Iterator[str]):
"""Test configuration needed for producer/consumer
"""
return {
**TEST_CONFIG,
- "brokers": [kafka_server],
- "prefix": kafka_prefix + ".swh.journal.objects",
+ "object_types": object_types,
+ "brokers": [kafka_server_base],
+ "prefix": kafka_prefix,
}
diff --git a/swh/journal/tests/journal_data.py b/swh/journal/tests/journal_data.py
--- a/swh/journal/tests/journal_data.py
+++ b/swh/journal/tests/journal_data.py
@@ -125,10 +125,13 @@
 TEST_OBJECT_DICTS: Dict[str, List[Dict[str, Any]]] = {
     "content": CONTENTS,
-    "revision": REVISIONS,
-    "release": RELEASES,
+    "directory": [],
     "origin": ORIGINS,
     "origin_visit": ORIGIN_VISITS,
+    "release": RELEASES,
+    "revision": REVISIONS,
+    "snapshot": [],
+    "skipped_content": [],
 }
 
 MODEL_OBJECTS = {v: k for (k, v) in OBJECT_TYPES.items()}
diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py
--- a/swh/journal/tests/test_client.py
+++ b/swh/journal/tests/test_client.py
@@ -9,16 +9,38 @@
 from confluent_kafka import Producer
 import pytest
 
-from swh.model.hypothesis_strategies import revisions
 from swh.model.model import Content
 
 from swh.journal.client import JournalClient
 from swh.journal.serializers import key_to_kafka, value_to_kafka
 
+REV = {
+    "message": b"something cool",
+    "author": {"fullname": b"Peter", "name": None, "email": b"peter@ouiche.lo"},
+    "committer": {"fullname": b"Stephen", "name": b"From Outer Space", "email": None},
+    "date": {
+        "timestamp": {"seconds": 123456789, "microseconds": 123},
+        "offset": 120,
+        "negative_utc": False,
+    },
+    "committer_date": {
+        "timestamp": {"seconds": 123123456, "microseconds": 0},
+        "offset": 0,
+        "negative_utc": False,
+    },
+    "type": "git",
+    "directory": (
+        b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
+        b"\x01\x02\x03\x04\x05"
+    ),
+    "synthetic": False,
+    "metadata": None,
+    "parents": [],
+    "id": b"\x8b\xeb\xd1\x9d\x07\xe2\x1e0\xe2 \x91X\x8d\xbd\x1c\xa8\x86\xdeB\x0c",
+}
+
 
-def test_client(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str):
-    kafka_prefix += ".swh.journal.objects"
-
+def test_client(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str):
     producer = Producer(
         {
             "bootstrap.servers": kafka_server,
@@ -27,13 +49,9 @@
         }
     )
 
-    rev = revisions().example()
-
     # Fill Kafka
     producer.produce(
-        topic=kafka_prefix + ".revision",
-        key=key_to_kafka(rev.id),
-        value=value_to_kafka(rev.to_dict()),
+        topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV),
     )
     producer.flush()
 
@@ -46,12 +64,10 @@
     worker_fn = MagicMock()
     client.process(worker_fn)
 
-    worker_fn.assert_called_once_with({"revision": [rev.to_dict()]})
+    worker_fn.assert_called_once_with({"revision": [REV]})
 
 
 def test_client_eof(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str):
-    kafka_prefix += ".swh.journal.objects"
-
     producer = Producer(
         {
             "bootstrap.servers": kafka_server,
@@ -60,13 +76,9 @@
         }
     )
 
-    rev = revisions().example()
-
     # Fill Kafka
     producer.produce(
-        topic=kafka_prefix + ".revision",
-        key=key_to_kafka(rev.id),
-        value=value_to_kafka(rev.to_dict()),
+        topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV),
     )
     producer.flush()
 
@@ -81,15 +93,13 @@
     worker_fn = MagicMock()
     client.process(worker_fn)
 
-    worker_fn.assert_called_once_with({"revision": [rev.to_dict()]})
+    worker_fn.assert_called_once_with({"revision": [REV]})
 
 
 @pytest.mark.parametrize("batch_size", [1, 5, 100])
 def test_client_batch_size(
     kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, batch_size: int,
 ):
-    kafka_prefix += ".swh.journal.objects"
-
     num_objects = 2 * batch_size + 1
     assert num_objects < 256, "Too many objects, generation will fail"
@@ -138,10 +148,10 @@
 @pytest.fixture()
-def kafka_producer(kafka_prefix: str, kafka_server: str):
+def kafka_producer(kafka_prefix: str, kafka_server_base: str):
     producer = Producer(
         {
-            "bootstrap.servers": kafka_server,
+            "bootstrap.servers": kafka_server_base,
             "client.id": "test producer",
             "acks": "all",
         }
     )
@@ -163,10 +173,10 @@
 def test_client_subscribe_all(
-    kafka_producer: Producer, kafka_prefix: str, kafka_server: str
+    kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str
 ):
     client = JournalClient(
-        brokers=[kafka_server],
+        brokers=[kafka_server_base],
         group_id="whatever",
         prefix=kafka_prefix,
         stop_after_objects=2,
@@ -184,10 +194,10 @@
 def test_client_subscribe_one_topic(
-    kafka_producer: Producer, kafka_prefix: str, kafka_server: str
+    kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str
 ):
     client = JournalClient(
-        brokers=[kafka_server],
+        brokers=[kafka_server_base],
         group_id="whatever",
         prefix=kafka_prefix,
         stop_after_objects=1,
@@ -201,11 +211,11 @@
 def test_client_subscribe_absent_topic(
-    kafka_producer: Producer, kafka_prefix: str, kafka_server: str
+    kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str
 ):
     with pytest.raises(ValueError):
         JournalClient(
-            brokers=[kafka_server],
+            brokers=[kafka_server_base],
             group_id="whatever",
             prefix=kafka_prefix,
             stop_after_objects=1,
@@ -214,18 +224,18 @@
 def test_client_subscribe_absent_prefix(
-    kafka_producer: Producer, kafka_prefix: str, kafka_server: str
+    kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str
 ):
     with pytest.raises(ValueError):
         JournalClient(
-            brokers=[kafka_server],
+            brokers=[kafka_server_base],
             group_id="whatever",
             prefix="wrong.prefix",
             stop_after_objects=1,
         )
 
     with pytest.raises(ValueError):
         JournalClient(
-            brokers=[kafka_server],
+            brokers=[kafka_server_base],
             group_id="whatever",
             prefix="wrong.prefix",
             stop_after_objects=1,
diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py
--- a/swh/journal/tests/test_kafka_writer.py
+++ b/swh/journal/tests/test_kafka_writer.py
@@ -14,8 +14,6 @@
 
 def test_kafka_writer(kafka_prefix: str, kafka_server: str, consumer: Consumer):
-    kafka_prefix += ".swh.journal.objects"
-
     writer = KafkaJournalWriter(
         brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix,
     )
 
@@ -78,7 +76,6 @@
         def produce(self, **kwargs):
             produced.append(kwargs)
 
-    kafka_prefix += ".swh.journal.objects"
     writer = KafkaJournalWriter(
         brokers=[kafka_server],
         client_id="kafka_writer",
diff --git a/swh/journal/tests/test_pytest_plugin.py b/swh/journal/tests/test_pytest_plugin.py
new file mode 100644
--- /dev/null
+++ b/swh/journal/tests/test_pytest_plugin.py
@@ -0,0 +1,51 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Iterator
+from confluent_kafka.admin import AdminClient
+
+
+def test_kafka_server(kafka_server_base: str):
+    ip, port_str = kafka_server_base.split(":")
+    assert ip == "127.0.0.1"
+    assert int(port_str)
+
+    admin = AdminClient({"bootstrap.servers": kafka_server_base})
+
+    topics = admin.list_topics()
+
+    assert len(topics.brokers) == 1
+
+
+def test_kafka_server_with_topics(
+    kafka_server: str, kafka_prefix: str, object_types: Iterator[str]
+):
+    admin = AdminClient({"bootstrap.servers": kafka_server})
+    topics = {
+        topic
+        for topic in admin.list_topics().topics
+        if topic.startswith(f"{kafka_prefix}.")
+    }
+    assert topics == {f"{kafka_prefix}.{obj}" for obj in object_types}
+
+
+def test_test_config(test_config: dict, kafka_prefix: str, kafka_server_base: str):
+    assert test_config == {
+        "consumer_id": "swh.journal.consumer",
+        "stop_after_objects": 1,
+        "storage": {"cls": "memory", "args": {}},
+        "object_types": {
+            "content",
+            "directory",
+            "origin",
+            "origin_visit",
+            "release",
+            "revision",
+            "snapshot",
+            "skipped_content",
+        },
+        "brokers": [kafka_server_base],
+        "prefix": kafka_prefix,
+    }
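
Note: what follows is a minimal usage sketch, not part of the patch. It shows how a test in a downstream package might combine the refactored fixtures (kafka_server, kafka_prefix, kafka_consumer_group) with JournalClient: the function-scoped kafka_server fixture pre-creates the per-type topics on the session-scoped mock broker, so producing and consuming works without relying on topic auto-creation. The test name, the minimal rev payload, and the conftest wiring are hypothetical; the fixture names and client API come from the diff above.

# Usage sketch only, not part of D3130. Assumes the plugin is loaded from a
# conftest.py containing: pytest_plugins = ["swh.journal.pytest_plugin"].
from confluent_kafka import Producer

from swh.journal.client import JournalClient
from swh.journal.serializers import value_to_kafka


def test_produce_and_consume(
    kafka_server: str, kafka_prefix: str, kafka_consumer_group: str
):
    # The kafka_server fixture has already created the
    # f"{kafka_prefix}.{object_type}" topics on the mock broker.
    producer = Producer({"bootstrap.servers": kafka_server, "acks": "all"})
    rev = {"message": b"hello", "parents": []}  # hypothetical minimal payload
    producer.produce(
        topic=f"{kafka_prefix}.revision", key=b"\x00" * 20, value=value_to_kafka(rev)
    )
    producer.flush()

    # A client on the same prefix reads one object, then hands it to the worker.
    client = JournalClient(
        brokers=[kafka_server],
        group_id=kafka_consumer_group,
        prefix=kafka_prefix,
        stop_after_objects=1,
    )
    results = []
    client.process(lambda objects: results.append(objects))
    assert results == [{"revision": [rev]}]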