diff --git a/swh/journal/client.py b/swh/journal/client.py --- a/swh/journal/client.py +++ b/swh/journal/client.py @@ -165,32 +165,39 @@ self.consumer = Consumer(consumer_settings) - existing_topics = self.consumer.list_topics(timeout=10).topics.keys() - if not any(topic.startswith(f"{prefix}.") for topic in existing_topics): + privileged_prefix = f"{prefix}_privileged" + existing_topics = [ + topic + for topic in self.consumer.list_topics(timeout=10).topics.keys() + if ( + topic.startswith(f"{prefix}.") + or topic.startswith(f"{privileged_prefix}.") + ) + ] + if not existing_topics: raise ValueError( f"The prefix {prefix} does not match any existing topic " "on the kafka broker" ) - if object_types: - unknown_topics = [] - for object_type in object_types: - topic = f"{prefix}.{object_type}" - if topic not in existing_topics: - unknown_topics.append(topic) - if unknown_topics: - raise ValueError( - f"Topic(s) {','.join(unknown_topics)} " - "are unknown on the kafka broker" - ) - self.subscription = [ - f"{prefix}.{object_type}" for object_type in object_types - ] - else: - # subscribe to every topic under the prefix - self.subscription = [ - topic for topic in existing_topics if topic.startswith(prefix) - ] + if not object_types: + object_types = list({topic.split(".")[-1] for topic in existing_topics}) + + self.subscription = [] + unknown_types = [] + for object_type in object_types: + topics = (f"{privileged_prefix}.{object_type}", f"{prefix}.{object_type}") + for topic in topics: + if topic in existing_topics: + self.subscription.append(topic) + break + else: + unknown_types.append(object_type) + if unknown_types: + raise ValueError( + f"Topic(s) for object types {','.join(unknown_types)} " + "are unknown on the kafka broker" + ) logger.debug(f"Upstream topics: {existing_topics}") self.subscribe() diff --git a/swh/journal/pytest_plugin.py b/swh/journal/pytest_plugin.py --- a/swh/journal/pytest_plugin.py +++ b/swh/journal/pytest_plugin.py @@ -48,7 +48,9 @@ fetched_messages += 1 topic = msg.topic() - assert topic.startswith(kafka_prefix + "."), "Unexpected topic" + assert topic.startswith(kafka_prefix + ".") or topic.startswith( + kafka_prefix + "_privileged." + ), "Unexpected topic" object_type = topic[len(kafka_prefix + ".") :] consumed_messages[object_type].append( @@ -100,14 +102,31 @@ return set(TEST_OBJECT_DICTS.keys()) +@pytest.fixture(scope="function") +def privileged_object_types(): + """Set of object types to precreate privileged topics for.""" + return {"revision", "release"} + + @pytest.fixture(scope="function") def kafka_server( - kafka_server_base: str, kafka_prefix: str, object_types: Iterator[str] + kafka_server_base: str, + kafka_prefix: str, + object_types: Iterator[str], + privileged_object_types: Iterator[str], ) -> str: """A kafka server with existing topics - topics are built from the ``kafka_prefix`` and the ``object_types`` list""" - topics = [f"{kafka_prefix}.{obj}" for obj in object_types] + Unprivileged topics are built as ``{kafka_prefix}.{object_type}`` with object_type + from the ``object_types`` list. + + Privileged topics are built as ``{kafka_prefix}_privileged.{object_type}`` with + object_type from the ``privileged_object_types`` list. + + """ + topics = [f"{kafka_prefix}.{obj}" for obj in object_types] + [ + f"{kafka_prefix}_privileged.{obj}" for obj in privileged_object_types + ] # unfortunately, the Mock broker does not support the CreatTopic admin API, so we # have to create topics using a Producer. @@ -156,13 +175,19 @@ @pytest.fixture -def test_config(kafka_server_base: str, kafka_prefix: str, object_types: Iterator[str]): +def test_config( + kafka_server_base: str, + kafka_prefix: str, + object_types: Iterator[str], + privileged_object_types: Iterator[str], +): """Test configuration needed for producer/consumer """ return { **TEST_CONFIG, "object_types": object_types, + "privileged_object_types": privileged_object_types, "brokers": [kafka_server_base], "prefix": kafka_prefix, } @@ -170,7 +195,7 @@ @pytest.fixture def consumer( - kafka_server: str, test_config: Dict, kafka_consumer_group: str, + kafka_server: str, test_config: Dict, kafka_consumer_group: str ) -> Consumer: """Get a connected Kafka consumer. @@ -183,12 +208,13 @@ "group.id": kafka_consumer_group, } ) - + prefix = test_config["prefix"] kafka_topics = [ - "%s.%s" % (test_config["prefix"], object_type) - for object_type in test_config["object_types"] + f"{prefix}.{object_type}" for object_type in test_config["object_types"] + ] + [ + f"{prefix}_privileged.{object_type}" + for object_type in test_config["privileged_object_types"] ] - consumer.subscribe(kafka_topics) yield consumer diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py --- a/swh/journal/tests/test_kafka_writer.py +++ b/swh/journal/tests/test_kafka_writer.py @@ -3,6 +3,8 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from typing import Iterable + import pytest from confluent_kafka import Consumer, Producer @@ -13,7 +15,12 @@ from swh.journal.writer.kafka import KafkaJournalWriter, KafkaDeliveryError -def test_kafka_writer(kafka_prefix: str, kafka_server: str, consumer: Consumer): +def test_kafka_writer( + kafka_prefix: str, + kafka_server: str, + consumer: Consumer, + privileged_object_types: Iterable[str], +): writer = KafkaJournalWriter( brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, ) @@ -24,6 +31,10 @@ writer.write_additions(object_type, objects) expected_messages += len(objects) + if object_type in privileged_object_types: + writer.write_additions(object_type, objects, privileged=True) + expected_messages += len(objects) + consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages) assert_all_objects_consumed(consumed_messages) diff --git a/swh/journal/tests/test_pytest_plugin.py b/swh/journal/tests/test_pytest_plugin.py --- a/swh/journal/tests/test_pytest_plugin.py +++ b/swh/journal/tests/test_pytest_plugin.py @@ -20,9 +20,14 @@ def test_kafka_server_with_topics( - kafka_server: str, kafka_prefix: str, object_types: Iterator[str] + kafka_server: str, + kafka_prefix: str, + object_types: Iterator[str], + privileged_object_types: Iterator[str], ): admin = AdminClient({"bootstrap.servers": kafka_server}) + + # check unprivileged topics are present topics = { topic for topic in admin.list_topics().topics @@ -30,6 +35,16 @@ } assert topics == {f"{kafka_prefix}.{obj}" for obj in object_types} + # check privileged topics are present + topics = { + topic + for topic in admin.list_topics().topics + if topic.startswith(f"{kafka_prefix}_privileged.") + } + assert topics == { + f"{kafka_prefix}_privileged.{obj}" for obj in privileged_object_types + } + def test_test_config(test_config: dict, kafka_prefix: str, kafka_server_base: str): assert test_config == { @@ -46,6 +61,7 @@ "snapshot", "skipped_content", }, + "privileged_object_types": {"release", "revision",}, "brokers": [kafka_server_base], "prefix": kafka_prefix, } diff --git a/swh/journal/writer/inmemory.py b/swh/journal/writer/inmemory.py --- a/swh/journal/writer/inmemory.py +++ b/swh/journal/writer/inmemory.py @@ -20,13 +20,21 @@ # Share the list of objects across processes, for RemoteAPI tests. self.manager = Manager() self.objects = self.manager.list() + self.privileged_objects = self.manager.list() - def write_addition(self, object_type: str, object_: ModelObject) -> None: + def write_addition( + self, object_type: str, object_: ModelObject, privileged: bool = False + ) -> None: assert isinstance(object_, BaseModel) - self.objects.append((object_type, object_)) + if privileged: + self.privileged_objects.append((object_type, object_)) + else: + self.objects.append((object_type, object_)) write_update = write_addition - def write_additions(self, object_type: str, objects: List[ModelObject]) -> None: + def write_additions( + self, object_type: str, objects: List[ModelObject], privileged: bool = False + ) -> None: for object_ in objects: - self.write_addition(object_type, object_) + self.write_addition(object_type, object_, privileged) diff --git a/swh/journal/writer/kafka.py b/swh/journal/writer/kafka.py --- a/swh/journal/writer/kafka.py +++ b/swh/journal/writer/kafka.py @@ -85,6 +85,7 @@ producer_class: Type[Producer] = Producer, ): self._prefix = prefix + self._prefix_privileged = f"{self._prefix}_privileged" if not producer_config: producer_config = {} @@ -188,24 +189,34 @@ dict_.pop("data", None) return dict_ - def _write_addition(self, object_type: str, object_: ModelObject) -> None: + def _write_addition( + self, object_type: str, object_: ModelObject, privileged: bool + ) -> None: """Write a single object to the journal""" - topic = f"{self._prefix}.{object_type}" + if privileged: + topic = f"{self._prefix_privileged}.{object_type}" + else: + topic = f"{self._prefix}.{object_type}" + key = object_key(object_type, object_) dict_ = self._sanitize_object(object_type, object_) logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_) self.send(topic, key=key, value=dict_) - def write_addition(self, object_type: str, object_: ModelObject) -> None: + def write_addition( + self, object_type: str, object_: ModelObject, privileged: bool = False + ) -> None: """Write a single object to the journal""" - self._write_addition(object_type, object_) + self._write_addition(object_type, object_, privileged) self.flush() write_update = write_addition - def write_additions(self, object_type: str, objects: Iterable[ModelObject]) -> None: + def write_additions( + self, object_type: str, objects: Iterable[ModelObject], privileged: bool = False + ) -> None: """Write a set of objects to the journal""" for object_ in objects: - self._write_addition(object_type, object_) + self._write_addition(object_type, object_, privileged) self.flush()