D3160.id11232.diff
diff --git a/swh/journal/client.py b/swh/journal/client.py
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -165,32 +165,39 @@
self.consumer = Consumer(consumer_settings)
- existing_topics = self.consumer.list_topics(timeout=10).topics.keys()
- if not any(topic.startswith(f"{prefix}.") for topic in existing_topics):
+ privileged_prefix = f"{prefix}_privileged"
+ existing_topics = [
+ topic
+ for topic in self.consumer.list_topics(timeout=10).topics.keys()
+ if (
+ topic.startswith(f"{prefix}.")
+ or topic.startswith(f"{privileged_prefix}.")
+ )
+ ]
+ if not existing_topics:
raise ValueError(
f"The prefix {prefix} does not match any existing topic "
"on the kafka broker"
)
- if object_types:
- unknown_topics = []
- for object_type in object_types:
- topic = f"{prefix}.{object_type}"
- if topic not in existing_topics:
- unknown_topics.append(topic)
- if unknown_topics:
- raise ValueError(
- f"Topic(s) {','.join(unknown_topics)} "
- "are unknown on the kafka broker"
- )
- self.subscription = [
- f"{prefix}.{object_type}" for object_type in object_types
- ]
- else:
- # subscribe to every topic under the prefix
- self.subscription = [
- topic for topic in existing_topics if topic.startswith(prefix)
- ]
+ if not object_types:
+ object_types = list({topic.split(".")[-1] for topic in existing_topics})
+
+ self.subscription = []
+ unknown_types = []
+ for object_type in object_types:
+ topics = (f"{privileged_prefix}.{object_type}", f"{prefix}.{object_type}")
+ for topic in topics:
+ if topic in existing_topics:
+ self.subscription.append(topic)
+ break
+ else:
+ unknown_types.append(object_type)
+ if unknown_types:
+ raise ValueError(
+ f"Topic(s) for object types {','.join(unknown_types)} "
+ "are unknown on the kafka broker"
+ )
logger.debug(f"Upstream topics: {existing_topics}")
self.subscribe()
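
For readers skimming the change to client.py above, the following standalone sketch (not part of the diff; the function name and prefix value are hypothetical) shows the subscription rule it introduces: for each requested object type, prefer the privileged topic when the broker has it, fall back to the plain topic otherwise, and fail if neither exists.

from typing import Iterable, List

def resolve_subscription(
    prefix: str, object_types: Iterable[str], existing_topics: Iterable[str]
) -> List[str]:
    # Privileged topics live under "<prefix>_privileged.<object_type>".
    privileged_prefix = f"{prefix}_privileged"
    existing = set(existing_topics)
    subscription: List[str] = []
    unknown: List[str] = []
    for object_type in object_types:
        for topic in (f"{privileged_prefix}.{object_type}", f"{prefix}.{object_type}"):
            if topic in existing:
                subscription.append(topic)
                break
        else:  # no break: neither topic exists on the broker
            unknown.append(object_type)
    if unknown:
        raise ValueError(
            f"Topic(s) for object types {','.join(unknown)} are unknown on the kafka broker"
        )
    return subscription

# resolve_subscription(
#     "swh.journal.objects",
#     ["revision", "content"],
#     ["swh.journal.objects_privileged.revision", "swh.journal.objects.content"],
# )
# -> ["swh.journal.objects_privileged.revision", "swh.journal.objects.content"]
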
diff --git a/swh/journal/pytest_plugin.py b/swh/journal/pytest_plugin.py
--- a/swh/journal/pytest_plugin.py
+++ b/swh/journal/pytest_plugin.py
@@ -48,7 +48,9 @@
fetched_messages += 1
topic = msg.topic()
- assert topic.startswith(kafka_prefix + "."), "Unexpected topic"
+ assert topic.startswith(kafka_prefix + ".") or topic.startswith(
+ kafka_prefix + "_privileged."
+ ), "Unexpected topic"
object_type = topic[len(kafka_prefix + ".") :]
consumed_messages[object_type].append(
@@ -100,14 +102,31 @@
return set(TEST_OBJECT_DICTS.keys())
+@pytest.fixture(scope="function")
+def privileged_object_types():
+ """Set of object types to precreate privileged topics for."""
+ return {"revision", "release"}
+
+
@pytest.fixture(scope="function")
def kafka_server(
- kafka_server_base: str, kafka_prefix: str, object_types: Iterator[str]
+ kafka_server_base: str,
+ kafka_prefix: str,
+ object_types: Iterator[str],
+ privileged_object_types: Iterator[str],
) -> str:
"""A kafka server with existing topics
- topics are built from the ``kafka_prefix`` and the ``object_types`` list"""
- topics = [f"{kafka_prefix}.{obj}" for obj in object_types]
+ Unprivileged topics are built as ``{kafka_prefix}.{object_type}`` with object_type
+ from the ``object_types`` list.
+
+ Privileged topics are built as ``{kafka_prefix}_privileged.{object_type}`` with
+ object_type from the ``privileged_object_types`` list.
+
+ """
+ topics = [f"{kafka_prefix}.{obj}" for obj in object_types] + [
+ f"{kafka_prefix}_privileged.{obj}" for obj in privileged_object_types
+ ]
# unfortunately, the Mock broker does not support the CreateTopics admin API, so we
# have to create topics using a Producer.
@@ -156,13 +175,19 @@
@pytest.fixture
-def test_config(kafka_server_base: str, kafka_prefix: str, object_types: Iterator[str]):
+def test_config(
+ kafka_server_base: str,
+ kafka_prefix: str,
+ object_types: Iterator[str],
+ privileged_object_types: Iterator[str],
+):
"""Test configuration needed for producer/consumer
"""
return {
**TEST_CONFIG,
"object_types": object_types,
+ "privileged_object_types": privileged_object_types,
"brokers": [kafka_server_base],
"prefix": kafka_prefix,
}
@@ -170,7 +195,7 @@
@pytest.fixture
def consumer(
- kafka_server: str, test_config: Dict, kafka_consumer_group: str,
+ kafka_server: str, test_config: Dict, kafka_consumer_group: str
) -> Consumer:
"""Get a connected Kafka consumer.
@@ -183,12 +208,13 @@
"group.id": kafka_consumer_group,
}
)
-
+ prefix = test_config["prefix"]
kafka_topics = [
- "%s.%s" % (test_config["prefix"], object_type)
- for object_type in test_config["object_types"]
+ f"{prefix}.{object_type}" for object_type in test_config["object_types"]
+ ] + [
+ f"{prefix}_privileged.{object_type}"
+ for object_type in test_config["privileged_object_types"]
]
-
consumer.subscribe(kafka_topics)
yield consumer
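
As a usage note (illustrative, not part of the diff): privileged_object_types is an ordinary function-scoped fixture, so a test module that needs a different set of precreated privileged topics can override it in its own conftest.py, for example:

import pytest

@pytest.fixture(scope="function")
def privileged_object_types():
    # Hypothetical override: only precreate the privileged "revision" topic.
    return {"revision"}
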
diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py
--- a/swh/journal/tests/test_kafka_writer.py
+++ b/swh/journal/tests/test_kafka_writer.py
@@ -3,6 +3,8 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+from typing import Iterable
+
import pytest
from confluent_kafka import Consumer, Producer
@@ -13,7 +15,12 @@
from swh.journal.writer.kafka import KafkaJournalWriter, KafkaDeliveryError
-def test_kafka_writer(kafka_prefix: str, kafka_server: str, consumer: Consumer):
+def test_kafka_writer(
+ kafka_prefix: str,
+ kafka_server: str,
+ consumer: Consumer,
+ privileged_object_types: Iterable[str],
+):
writer = KafkaJournalWriter(
brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix,
)
@@ -24,6 +31,10 @@
writer.write_additions(object_type, objects)
expected_messages += len(objects)
+ if object_type in privileged_object_types:
+ writer.write_additions(object_type, objects, privileged=True)
+ expected_messages += len(objects)
+
consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages)
assert_all_objects_consumed(consumed_messages)
diff --git a/swh/journal/tests/test_pytest_plugin.py b/swh/journal/tests/test_pytest_plugin.py
--- a/swh/journal/tests/test_pytest_plugin.py
+++ b/swh/journal/tests/test_pytest_plugin.py
@@ -20,9 +20,14 @@
def test_kafka_server_with_topics(
- kafka_server: str, kafka_prefix: str, object_types: Iterator[str]
+ kafka_server: str,
+ kafka_prefix: str,
+ object_types: Iterator[str],
+ privileged_object_types: Iterator[str],
):
admin = AdminClient({"bootstrap.servers": kafka_server})
+
+ # check unprivileged topics are present
topics = {
topic
for topic in admin.list_topics().topics
@@ -30,6 +35,16 @@
}
assert topics == {f"{kafka_prefix}.{obj}" for obj in object_types}
+ # check privileged topics are present
+ topics = {
+ topic
+ for topic in admin.list_topics().topics
+ if topic.startswith(f"{kafka_prefix}_privileged.")
+ }
+ assert topics == {
+ f"{kafka_prefix}_privileged.{obj}" for obj in privileged_object_types
+ }
+
def test_test_config(test_config: dict, kafka_prefix: str, kafka_server_base: str):
assert test_config == {
@@ -46,6 +61,7 @@
"snapshot",
"skipped_content",
},
+ "privileged_object_types": {"release", "revision",},
"brokers": [kafka_server_base],
"prefix": kafka_prefix,
}
diff --git a/swh/journal/writer/inmemory.py b/swh/journal/writer/inmemory.py
--- a/swh/journal/writer/inmemory.py
+++ b/swh/journal/writer/inmemory.py
@@ -20,13 +20,21 @@
# Share the list of objects across processes, for RemoteAPI tests.
self.manager = Manager()
self.objects = self.manager.list()
+ self.privileged_objects = self.manager.list()
- def write_addition(self, object_type: str, object_: ModelObject) -> None:
+ def write_addition(
+ self, object_type: str, object_: ModelObject, privileged: bool = False
+ ) -> None:
assert isinstance(object_, BaseModel)
- self.objects.append((object_type, object_))
+ if privileged:
+ self.privileged_objects.append((object_type, object_))
+ else:
+ self.objects.append((object_type, object_))
write_update = write_addition
- def write_additions(self, object_type: str, objects: List[ModelObject]) -> None:
+ def write_additions(
+ self, object_type: str, objects: List[ModelObject], privileged: bool = False
+ ) -> None:
for object_ in objects:
- self.write_addition(object_type, object_)
+ self.write_addition(object_type, object_, privileged)
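
To make the in-memory behaviour above concrete, here is a minimal runnable sketch (a toy stand-in, not the swh.journal class) of the routing rule it implements: additions flagged as privileged accumulate in a separate list from the anonymized ones.

from typing import Any, List, Tuple

class SketchInMemoryWriter:
    def __init__(self) -> None:
        self.objects: List[Tuple[str, Any]] = []
        self.privileged_objects: List[Tuple[str, Any]] = []

    def write_addition(
        self, object_type: str, object_: Any, privileged: bool = False
    ) -> None:
        # Route to the privileged list only when explicitly requested.
        target = self.privileged_objects if privileged else self.objects
        target.append((object_type, object_))

w = SketchInMemoryWriter()
w.write_addition("release", {"name": b"v1.0"})
w.write_addition("release", {"name": b"v1.0"}, privileged=True)
assert len(w.objects) == 1 and len(w.privileged_objects) == 1
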
diff --git a/swh/journal/writer/kafka.py b/swh/journal/writer/kafka.py
--- a/swh/journal/writer/kafka.py
+++ b/swh/journal/writer/kafka.py
@@ -85,6 +85,7 @@
producer_class: Type[Producer] = Producer,
):
self._prefix = prefix
+ self._prefix_privileged = f"{self._prefix}_privileged"
if not producer_config:
producer_config = {}
@@ -188,24 +189,34 @@
dict_.pop("data", None)
return dict_
- def _write_addition(self, object_type: str, object_: ModelObject) -> None:
+ def _write_addition(
+ self, object_type: str, object_: ModelObject, privileged: bool
+ ) -> None:
"""Write a single object to the journal"""
- topic = f"{self._prefix}.{object_type}"
+ if privileged:
+ topic = f"{self._prefix_privileged}.{object_type}"
+ else:
+ topic = f"{self._prefix}.{object_type}"
+
key = object_key(object_type, object_)
dict_ = self._sanitize_object(object_type, object_)
logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_)
self.send(topic, key=key, value=dict_)
- def write_addition(self, object_type: str, object_: ModelObject) -> None:
+ def write_addition(
+ self, object_type: str, object_: ModelObject, privileged: bool = False
+ ) -> None:
"""Write a single object to the journal"""
- self._write_addition(object_type, object_)
+ self._write_addition(object_type, object_, privileged)
self.flush()
write_update = write_addition
- def write_additions(self, object_type: str, objects: Iterable[ModelObject]) -> None:
+ def write_additions(
+ self, object_type: str, objects: Iterable[ModelObject], privileged: bool = False
+ ) -> None:
"""Write a set of objects to the journal"""
for object_ in objects:
- self._write_addition(object_type, object_)
+ self._write_addition(object_type, object_, privileged)
self.flush()
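
Finally, an illustrative helper (hypothetical; the prefix value is only an example, the real one comes from the writer configuration) reproduces the topic naming the Kafka writer now applies, so the effect of privileged=True is easy to see.

def journal_topic(prefix: str, object_type: str, privileged: bool = False) -> str:
    # Privileged writes go to "<prefix>_privileged.<object_type>".
    suffix = "_privileged" if privileged else ""
    return f"{prefix}{suffix}.{object_type}"

assert journal_topic("swh.journal.objects", "revision") == "swh.journal.objects.revision"
assert (
    journal_topic("swh.journal.objects", "revision", privileged=True)
    == "swh.journal.objects_privileged.revision"
)
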
D3160: Add support for a privileged "channel" of topics for non-anonymized objects