Page MenuHomeSoftware Heritage

D3160.id11232.diff
No OneTemporary

D3160.id11232.diff

diff --git a/swh/journal/client.py b/swh/journal/client.py
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -165,32 +165,39 @@
self.consumer = Consumer(consumer_settings)
- existing_topics = self.consumer.list_topics(timeout=10).topics.keys()
- if not any(topic.startswith(f"{prefix}.") for topic in existing_topics):
+ privileged_prefix = f"{prefix}_privileged"
+ existing_topics = [
+ topic
+ for topic in self.consumer.list_topics(timeout=10).topics.keys()
+ if (
+ topic.startswith(f"{prefix}.")
+ or topic.startswith(f"{privileged_prefix}.")
+ )
+ ]
+ if not existing_topics:
raise ValueError(
f"The prefix {prefix} does not match any existing topic "
"on the kafka broker"
)
- if object_types:
- unknown_topics = []
- for object_type in object_types:
- topic = f"{prefix}.{object_type}"
- if topic not in existing_topics:
- unknown_topics.append(topic)
- if unknown_topics:
- raise ValueError(
- f"Topic(s) {','.join(unknown_topics)} "
- "are unknown on the kafka broker"
- )
- self.subscription = [
- f"{prefix}.{object_type}" for object_type in object_types
- ]
- else:
- # subscribe to every topic under the prefix
- self.subscription = [
- topic for topic in existing_topics if topic.startswith(prefix)
- ]
+ if not object_types:
+ object_types = list({topic.split(".")[-1] for topic in existing_topics})
+
+ self.subscription = []
+ unknown_types = []
+ for object_type in object_types:
+ topics = (f"{privileged_prefix}.{object_type}", f"{prefix}.{object_type}")
+ for topic in topics:
+ if topic in existing_topics:
+ self.subscription.append(topic)
+ break
+ else:
+ unknown_types.append(object_type)
+ if unknown_types:
+ raise ValueError(
+ f"Topic(s) for object types {','.join(unknown_types)} "
+ "are unknown on the kafka broker"
+ )
logger.debug(f"Upstream topics: {existing_topics}")
self.subscribe()
diff --git a/swh/journal/pytest_plugin.py b/swh/journal/pytest_plugin.py
--- a/swh/journal/pytest_plugin.py
+++ b/swh/journal/pytest_plugin.py
@@ -48,7 +48,9 @@
fetched_messages += 1
topic = msg.topic()
- assert topic.startswith(kafka_prefix + "."), "Unexpected topic"
+ assert topic.startswith(kafka_prefix + ".") or topic.startswith(
+ kafka_prefix + "_privileged."
+ ), "Unexpected topic"
object_type = topic[len(kafka_prefix + ".") :]
consumed_messages[object_type].append(
@@ -100,14 +102,31 @@
return set(TEST_OBJECT_DICTS.keys())
+@pytest.fixture(scope="function")
+def privileged_object_types():
+ """Set of object types to precreate privileged topics for."""
+ return {"revision", "release"}
+
+
@pytest.fixture(scope="function")
def kafka_server(
- kafka_server_base: str, kafka_prefix: str, object_types: Iterator[str]
+ kafka_server_base: str,
+ kafka_prefix: str,
+ object_types: Iterator[str],
+ privileged_object_types: Iterator[str],
) -> str:
"""A kafka server with existing topics
- topics are built from the ``kafka_prefix`` and the ``object_types`` list"""
- topics = [f"{kafka_prefix}.{obj}" for obj in object_types]
+ Unprivileged topics are built as ``{kafka_prefix}.{object_type}`` with object_type
+ from the ``object_types`` list.
+
+ Privileged topics are built as ``{kafka_prefix}_privileged.{object_type}`` with
+ object_type from the ``privileged_object_types`` list.
+
+ """
+ topics = [f"{kafka_prefix}.{obj}" for obj in object_types] + [
+ f"{kafka_prefix}_privileged.{obj}" for obj in privileged_object_types
+ ]
# unfortunately, the Mock broker does not support the CreatTopic admin API, so we
# have to create topics using a Producer.
@@ -156,13 +175,19 @@
@pytest.fixture
-def test_config(kafka_server_base: str, kafka_prefix: str, object_types: Iterator[str]):
+def test_config(
+ kafka_server_base: str,
+ kafka_prefix: str,
+ object_types: Iterator[str],
+ privileged_object_types: Iterator[str],
+):
"""Test configuration needed for producer/consumer
"""
return {
**TEST_CONFIG,
"object_types": object_types,
+ "privileged_object_types": privileged_object_types,
"brokers": [kafka_server_base],
"prefix": kafka_prefix,
}
@@ -170,7 +195,7 @@
@pytest.fixture
def consumer(
- kafka_server: str, test_config: Dict, kafka_consumer_group: str,
+ kafka_server: str, test_config: Dict, kafka_consumer_group: str
) -> Consumer:
"""Get a connected Kafka consumer.
@@ -183,12 +208,13 @@
"group.id": kafka_consumer_group,
}
)
-
+ prefix = test_config["prefix"]
kafka_topics = [
- "%s.%s" % (test_config["prefix"], object_type)
- for object_type in test_config["object_types"]
+ f"{prefix}.{object_type}" for object_type in test_config["object_types"]
+ ] + [
+ f"{prefix}_privileged.{object_type}"
+ for object_type in test_config["privileged_object_types"]
]
-
consumer.subscribe(kafka_topics)
yield consumer
diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py
--- a/swh/journal/tests/test_kafka_writer.py
+++ b/swh/journal/tests/test_kafka_writer.py
@@ -3,6 +3,8 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+from typing import Iterable
+
import pytest
from confluent_kafka import Consumer, Producer
@@ -13,7 +15,12 @@
from swh.journal.writer.kafka import KafkaJournalWriter, KafkaDeliveryError
-def test_kafka_writer(kafka_prefix: str, kafka_server: str, consumer: Consumer):
+def test_kafka_writer(
+ kafka_prefix: str,
+ kafka_server: str,
+ consumer: Consumer,
+ privileged_object_types: Iterable[str],
+):
writer = KafkaJournalWriter(
brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix,
)
@@ -24,6 +31,10 @@
writer.write_additions(object_type, objects)
expected_messages += len(objects)
+ if object_type in privileged_object_types:
+ writer.write_additions(object_type, objects, privileged=True)
+ expected_messages += len(objects)
+
consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages)
assert_all_objects_consumed(consumed_messages)
diff --git a/swh/journal/tests/test_pytest_plugin.py b/swh/journal/tests/test_pytest_plugin.py
--- a/swh/journal/tests/test_pytest_plugin.py
+++ b/swh/journal/tests/test_pytest_plugin.py
@@ -20,9 +20,14 @@
def test_kafka_server_with_topics(
- kafka_server: str, kafka_prefix: str, object_types: Iterator[str]
+ kafka_server: str,
+ kafka_prefix: str,
+ object_types: Iterator[str],
+ privileged_object_types: Iterator[str],
):
admin = AdminClient({"bootstrap.servers": kafka_server})
+
+ # check unprivileged topics are present
topics = {
topic
for topic in admin.list_topics().topics
@@ -30,6 +35,16 @@
}
assert topics == {f"{kafka_prefix}.{obj}" for obj in object_types}
+ # check privileged topics are present
+ topics = {
+ topic
+ for topic in admin.list_topics().topics
+ if topic.startswith(f"{kafka_prefix}_privileged.")
+ }
+ assert topics == {
+ f"{kafka_prefix}_privileged.{obj}" for obj in privileged_object_types
+ }
+
def test_test_config(test_config: dict, kafka_prefix: str, kafka_server_base: str):
assert test_config == {
@@ -46,6 +61,7 @@
"snapshot",
"skipped_content",
},
+ "privileged_object_types": {"release", "revision",},
"brokers": [kafka_server_base],
"prefix": kafka_prefix,
}
diff --git a/swh/journal/writer/inmemory.py b/swh/journal/writer/inmemory.py
--- a/swh/journal/writer/inmemory.py
+++ b/swh/journal/writer/inmemory.py
@@ -20,13 +20,21 @@
# Share the list of objects across processes, for RemoteAPI tests.
self.manager = Manager()
self.objects = self.manager.list()
+ self.privileged_objects = self.manager.list()
- def write_addition(self, object_type: str, object_: ModelObject) -> None:
+ def write_addition(
+ self, object_type: str, object_: ModelObject, privileged: bool = False
+ ) -> None:
assert isinstance(object_, BaseModel)
- self.objects.append((object_type, object_))
+ if privileged:
+ self.privileged_objects.append((object_type, object_))
+ else:
+ self.objects.append((object_type, object_))
write_update = write_addition
- def write_additions(self, object_type: str, objects: List[ModelObject]) -> None:
+ def write_additions(
+ self, object_type: str, objects: List[ModelObject], privileged: bool = False
+ ) -> None:
for object_ in objects:
- self.write_addition(object_type, object_)
+ self.write_addition(object_type, object_, privileged)
diff --git a/swh/journal/writer/kafka.py b/swh/journal/writer/kafka.py
--- a/swh/journal/writer/kafka.py
+++ b/swh/journal/writer/kafka.py
@@ -85,6 +85,7 @@
producer_class: Type[Producer] = Producer,
):
self._prefix = prefix
+ self._prefix_privileged = f"{self._prefix}_privileged"
if not producer_config:
producer_config = {}
@@ -188,24 +189,34 @@
dict_.pop("data", None)
return dict_
- def _write_addition(self, object_type: str, object_: ModelObject) -> None:
+ def _write_addition(
+ self, object_type: str, object_: ModelObject, privileged: bool
+ ) -> None:
"""Write a single object to the journal"""
- topic = f"{self._prefix}.{object_type}"
+ if privileged:
+ topic = f"{self._prefix_privileged}.{object_type}"
+ else:
+ topic = f"{self._prefix}.{object_type}"
+
key = object_key(object_type, object_)
dict_ = self._sanitize_object(object_type, object_)
logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_)
self.send(topic, key=key, value=dict_)
- def write_addition(self, object_type: str, object_: ModelObject) -> None:
+ def write_addition(
+ self, object_type: str, object_: ModelObject, privileged: bool = False
+ ) -> None:
"""Write a single object to the journal"""
- self._write_addition(object_type, object_)
+ self._write_addition(object_type, object_, privileged)
self.flush()
write_update = write_addition
- def write_additions(self, object_type: str, objects: Iterable[ModelObject]) -> None:
+ def write_additions(
+ self, object_type: str, objects: Iterable[ModelObject], privileged: bool = False
+ ) -> None:
"""Write a set of objects to the journal"""
for object_ in objects:
- self._write_addition(object_type, object_)
+ self._write_addition(object_type, object_, privileged)
self.flush()

File Metadata

Mime Type
text/plain
Expires
Thu, Jan 30, 10:20 AM (3 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3221234

Event Timeline