diff --git a/swh/journal/client.py b/swh/journal/client.py
index 3955b0d..039025d 100644
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -1,293 +1,295 @@
 # Copyright (C) 2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from collections import defaultdict
 import logging
 import os
 import time
 from typing import Any, Dict, List, Optional, Set, Tuple, Union

 from confluent_kafka import Consumer, KafkaException, KafkaError

 from .serializers import kafka_to_value
 from swh.journal import DEFAULT_PREFIX

 logger = logging.getLogger(__name__)
 rdkafka_logger = logging.getLogger(__name__ + ".rdkafka")

 # Only these offset reset policies are accepted
 ACCEPTED_OFFSET_RESET = ["earliest", "latest"]

 # Errors that Kafka raises too often and that are not useful; we lower
 # their log level to DEBUG instead of INFO.
 _SPAMMY_ERRORS = [
     KafkaError._NO_OFFSET,
 ]


 def get_journal_client(cls: str, **kwargs: Any):
     """Factory function to instantiate a journal client object.

     Currently, only the "kafka" journal client is supported.
     """
     if cls == "kafka":
         return JournalClient(**kwargs)
     raise ValueError("Unknown journal client class `%s`" % cls)


 def _error_cb(error):
     if error.fatal():
         raise KafkaException(error)
     if error.code() in _SPAMMY_ERRORS:
         logger.debug("Received non-fatal kafka error: %s", error)
     else:
         logger.info("Received non-fatal kafka error: %s", error)


 def _on_commit(error, partitions):
     if error is not None:
         _error_cb(error)


 class JournalClient:
     """A base client for the Software Heritage journal.

     The current implementation of the journal uses Apache Kafka
     brokers to publish messages under a given topic prefix, with each
     object type using a specific topic under that prefix. If the `prefix`
     argument is None (the default), it falls back to `'swh.journal.objects'`.

     Clients subscribe to events specific to each object type as listed in the
     `object_types` argument (if unset, defaults to all existing kafka topics
     under the prefix).

     Clients can be sharded by setting the `group_id` to a common
     value across instances. The journal will share the message
     throughput across the nodes sharing the same group_id.

     Messages are processed by the `worker_fn` callback passed to the `process`
     method, in batches of at most `batch_size` messages (defaults to 200).

     If set, the processing stops after processing `stop_after_objects` messages
     in total.

     `stop_on_eof` stops the processing when the client has reached the end of
     each partition in turn.

     `auto_offset_reset` sets the behavior of the client when the consumer group
     initializes: `'earliest'` (the default) processes all objects since the
     inception of the topics; `'latest'` only processes objects published after
     the consumer group was created.

     Any other named argument is passed directly to KafkaConsumer().

     """

     def __init__(
         self,
         brokers: Union[str, List[str]],
         group_id: str,
         prefix: Optional[str] = None,
         object_types: Optional[List[str]] = None,
         stop_after_objects: Optional[int] = None,
         batch_size: int = 200,
         process_timeout: Optional[float] = None,
         auto_offset_reset: str = "earliest",
         stop_on_eof: bool = False,
         **kwargs,
     ):
         if prefix is None:
             prefix = DEFAULT_PREFIX
         if auto_offset_reset not in ACCEPTED_OFFSET_RESET:
             raise ValueError(
                 "Option 'auto_offset_reset' only accept %s, not %s"
                 % (ACCEPTED_OFFSET_RESET, auto_offset_reset)
             )

         if batch_size <= 0:
             raise ValueError("Option 'batch_size' needs to be positive")

         self.value_deserializer = kafka_to_value

         if isinstance(brokers, str):
             brokers = [brokers]

         debug_logging = rdkafka_logger.isEnabledFor(logging.DEBUG)
         if debug_logging and "debug" not in kwargs:
             kwargs["debug"] = "consumer"

         # Static group instance id management
         group_instance_id = os.environ.get("KAFKA_GROUP_INSTANCE_ID")
         if group_instance_id:
             kwargs["group.instance.id"] = group_instance_id

         if "group.instance.id" in kwargs:
             # When doing static consumer group membership, set a higher default
             # session timeout. The session timeout is the duration after which
             # the broker considers that a consumer has left the consumer group
             # for good, and triggers a rebalance. Considering our current
             # processing pattern, 10 minutes gives the consumer ample time to
             # restart before that happens.
             if "session.timeout.ms" not in kwargs:
                 kwargs["session.timeout.ms"] = 10 * 60 * 1000  # 10 minutes

         if "session.timeout.ms" in kwargs:
             # When the session timeout is set, rdkafka requires the max poll
             # interval to be set to a higher value; the max poll interval is
             # rdkafka's way of figuring out whether the client's message
             # processing thread has stalled: when the max poll interval lapses
             # between two calls to consumer.poll(), rdkafka leaves the consumer
             # group and terminates the connection to the brokers.
             #
             # We default to 1.5 times the session timeout
             if "max.poll.interval.ms" not in kwargs:
                 kwargs["max.poll.interval.ms"] = kwargs["session.timeout.ms"] // 2 * 3

         consumer_settings = {
             **kwargs,
             "bootstrap.servers": ",".join(brokers),
             "auto.offset.reset": auto_offset_reset,
             "group.id": group_id,
             "on_commit": _on_commit,
             "error_cb": _error_cb,
             "enable.auto.commit": False,
             "logger": rdkafka_logger,
         }

         self.stop_on_eof = stop_on_eof
         if self.stop_on_eof:
             consumer_settings["enable.partition.eof"] = True

         logger.debug("Consumer settings: %s", consumer_settings)

         self.consumer = Consumer(consumer_settings)

         existing_topics = self.consumer.list_topics(timeout=10).topics.keys()
         if not any(topic.startswith(f"{prefix}.") for topic in existing_topics):
             raise ValueError(
                 f"The prefix {prefix} does not match any existing topic "
                 "on the kafka broker"
             )

         if object_types:
             unknown_topics = []
             for object_type in object_types:
                 topic = f"{prefix}.{object_type}"
                 if topic not in existing_topics:
                     unknown_topics.append(topic)
             if unknown_topics:
                 raise ValueError(
                     f"Topic(s) {','.join(unknown_topics)} "
                     "are unknown on the kafka broker"
                 )
             self.subscription = [
                 f"{prefix}.{object_type}" for object_type in object_types
             ]
         else:
             # subscribe to every topic under the prefix
             self.subscription = [
                 topic for topic in existing_topics if topic.startswith(prefix)
             ]

         logger.debug(f"Upstream topics: {existing_topics}")
         self.subscribe()

         self.stop_after_objects = stop_after_objects
         self.process_timeout = process_timeout
         self.eof_reached: Set[Tuple[str, str]] = set()
         self.batch_size = batch_size

     def subscribe(self):
         """Subscribe to topics listed in self.subscription

         This can be overridden if you need, for instance, to manually assign
         partitions.
         """
         logger.debug(f"Subscribing to: {self.subscription}")
         self.consumer.subscribe(topics=self.subscription)

     def process(self, worker_fn):
         """Polls Kafka for a batch of messages, and calls the worker_fn
         with these messages.

         Args:
             worker_fn Callable[Dict[str, List[dict]]]: Function called with
                 the messages as argument.
         """
         start_time = time.monotonic()
         total_objects_processed = 0

         while True:
             # timeout for message poll
             timeout = 1.0

             elapsed = time.monotonic() - start_time
             if self.process_timeout:
                 # +0.01 to prevent busy-waiting on / spamming consumer.poll.
                 # consumer.consume() returns shortly before the timeout expires
                 # (a matter of milliseconds), so after it returns a first
                 # time, it would then be called with a timeout in the order
                 # of milliseconds, therefore returning immediately, then be
                 # called again, etc.
                 if elapsed + 0.01 >= self.process_timeout:
                     break

                 timeout = self.process_timeout - elapsed

             batch_size = self.batch_size
             if self.stop_after_objects:
                 if total_objects_processed >= self.stop_after_objects:
                     break

                 # clamp batch size to avoid overrunning stop_after_objects
                 batch_size = min(
                     self.stop_after_objects - total_objects_processed, batch_size,
                 )

             messages = self.consumer.consume(timeout=timeout, num_messages=batch_size)
             if not messages:
                 continue

             batch_processed, at_eof = self.handle_messages(messages, worker_fn)
             total_objects_processed += batch_processed
             if at_eof:
                 break

         return total_objects_processed

     def handle_messages(self, messages, worker_fn):
         objects: Dict[str, List[Any]] = defaultdict(list)
         nb_processed = 0

         for message in messages:
             error = message.error()
             if error is not None:
                 if error.code() == KafkaError._PARTITION_EOF:
                     self.eof_reached.add((message.topic(), message.partition()))
                 else:
                     _error_cb(error)
                 continue
-
+            if message.value() is None:
+                # ignore messages with no payload, these can be generated in tests
+                continue
             nb_processed += 1

             object_type = message.topic().split(".")[-1]
             objects[object_type].append(self.deserialize_message(message))

         if objects:
             worker_fn(dict(objects))
             self.consumer.commit()

         at_eof = self.stop_on_eof and all(
             (tp.topic, tp.partition) in self.eof_reached
             for tp in self.consumer.assignment()
         )

         return nb_processed, at_eof

     def deserialize_message(self, message):
         return self.value_deserializer(message.value())

     def close(self):
         self.consumer.close()
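
For context, here is a minimal sketch of how a consumer built on this class could look. The broker address, group id, object types and worker function below are made up for illustration; only the API itself (``get_journal_client``, ``JournalClient.process``) comes from the patch.

    from swh.journal.client import get_journal_client

    # Hypothetical settings; a real deployment would use its own brokers,
    # group id and topic prefix.
    client = get_journal_client(
        "kafka",
        brokers=["kafka.example.org:9092"],
        group_id="my-mirror",
        object_types=["revision", "release"],
        stop_after_objects=1000,
    )

    def worker_fn(objects):
        # `objects` maps an object type to the deserialized messages of one
        # batch, e.g. {"revision": [{...}, ...]}.
        for object_type, values in objects.items():
            print(f"received {len(values)} {object_type} objects")

    # Processes messages in batches of at most `batch_size` until
    # `stop_after_objects` messages have been handled.
    client.process(worker_fn)
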
" f"Only {fetched_messages}/{expected_messages} fetched" ) msg = consumer.poll(timeout=0.01) if not msg: retries_left -= 1 continue error = msg.error() if error is not None: if error.fatal(): raise KafkaException(error) retries_left -= 1 continue fetched_messages += 1 topic = msg.topic() assert topic.startswith(kafka_prefix + "."), "Unexpected topic" object_type = topic[len(kafka_prefix + ".") :] consumed_messages[object_type].append( (kafka_to_key(msg.key()), kafka_to_value(msg.value())) ) return consumed_messages def assert_all_objects_consumed(consumed_messages): """Check whether all objects from TEST_OBJECT_DICTS have been consumed""" for object_type, known_values in TEST_OBJECT_DICTS.items(): known_keys = [object_key(object_type, obj) for obj in TEST_OBJECTS[object_type]] if not consumed_messages[object_type]: return (received_keys, received_values) = zip(*consumed_messages[object_type]) if object_type == "origin_visit": for value in received_values: del value["visit"] elif object_type == "content": for value in received_values: del value["ctime"] for key in known_keys: assert key in received_keys for value in known_values: assert value in received_values @pytest.fixture(scope="function") def kafka_prefix(): """Pick a random prefix for kafka topics on each call""" return "".join(random.choice(string.ascii_lowercase) for _ in range(10)) @pytest.fixture(scope="function") def kafka_consumer_group(kafka_prefix: str): """Pick a random consumer group for kafka consumers on each call""" return "test-consumer-%s" % kafka_prefix +@pytest.fixture(scope="function") +def object_types(): + """Set of object types to precreate topics for.""" + return set(TEST_OBJECT_DICTS.keys()) + + +@pytest.fixture(scope="function") +def kafka_server( + kafka_server_base: str, kafka_prefix: str, object_types: Iterator[str] +) -> str: + """A kafka server with existing topics + + topics are built from the ``kafka_prefix`` and the ``object_types`` list""" + topics = [f"{kafka_prefix}.{obj}" for obj in object_types] + + # unfortunately, the Mock broker does not support the CreatTopic admin API, so we + # have to create topics using a Producer. + producer = Producer( + { + "bootstrap.servers": kafka_server_base, + "client.id": "bootstrap producer", + "acks": "all", + } + ) + for topic in topics: + producer.produce(topic=topic, value=None) + for i in range(10): + if producer.flush(0.1) == 0: + break + + return kafka_server_base + + @pytest.fixture(scope="session") -def kafka_server() -> Iterator[str]: - p = Producer({"test.mock.num.brokers": "1"}) +def kafka_server_base() -> Iterator[str]: + """Create a mock kafka cluster suitable for tests. + + Yield a connection string. + + Note: this is a generator to keep the mock broker alive during the whole test + session. - metadata = p.list_topics() + see https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_mock.h + """ + admin = AdminClient({"test.mock.num.brokers": "1"}) + + metadata = admin.list_topics() brokers = [str(broker) for broker in metadata.brokers.values()] assert len(brokers) == 1, "More than one broker found in the kafka cluster?!" 
broker_connstr, broker_id = brokers[0].split("/") - ip, port_str = broker_connstr.split(":") - assert ip == "127.0.0.1" - assert int(port_str) - yield broker_connstr - p.flush() - TEST_CONFIG = { "consumer_id": "swh.journal.consumer", "object_types": TEST_OBJECT_DICTS.keys(), "stop_after_objects": 1, # will read 1 object and stop "storage": {"cls": "memory", "args": {}}, } @pytest.fixture -def test_config(kafka_server: str, kafka_prefix: str): +def test_config(kafka_server_base: str, kafka_prefix: str, object_types: Iterator[str]): """Test configuration needed for producer/consumer """ return { **TEST_CONFIG, - "brokers": [kafka_server], "prefix": kafka_prefix + ".swh.journal.objects", + "brokers": [kafka_server_base], } @pytest.fixture def consumer( kafka_server: str, test_config: Dict, kafka_consumer_group: str, ) -> Consumer: """Get a connected Kafka consumer. """ consumer = Consumer( { "bootstrap.servers": kafka_server, "auto.offset.reset": "earliest", "enable.auto.commit": True, "group.id": kafka_consumer_group, } ) kafka_topics = [ "%s.%s" % (test_config["prefix"], object_type) for object_type in test_config["object_types"] ] consumer.subscribe(kafka_topics) yield consumer consumer.close() diff --git a/swh/journal/tests/journal_data.py b/swh/journal/tests/journal_data.py index 12d0c64..02bfd0e 100644 --- a/swh/journal/tests/journal_data.py +++ b/swh/journal/tests/journal_data.py @@ -1,150 +1,153 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from typing import Any, Dict, List from swh.model.hashutil import MultiHash, hash_to_bytes from swh.journal.serializers import ModelObject from swh.journal.writer.kafka import OBJECT_TYPES CONTENTS = [ {**MultiHash.from_data(b"foo").digest(), "length": 3, "status": "visible",}, ] duplicate_content1 = { "length": 4, "sha1": hash_to_bytes("44973274ccef6ab4dfaaf86599792fa9c3fe4689"), "sha1_git": b"another-foo", "blake2s256": b"another-bar", "sha256": b"another-baz", "status": "visible", } # Craft a sha1 collision duplicate_content2 = duplicate_content1.copy() sha1_array = bytearray(duplicate_content1["sha1_git"]) sha1_array[0] += 1 duplicate_content2["sha1_git"] = bytes(sha1_array) DUPLICATE_CONTENTS = [duplicate_content1, duplicate_content2] COMMITTERS = [ {"fullname": b"foo", "name": b"foo", "email": b"",}, {"fullname": b"bar", "name": b"bar", "email": b"",}, ] DATES = [ { "timestamp": {"seconds": 1234567891, "microseconds": 0,}, "offset": 120, "negative_utc": False, }, { "timestamp": {"seconds": 1234567892, "microseconds": 0,}, "offset": 120, "negative_utc": False, }, ] REVISIONS = [ { "id": hash_to_bytes("7026b7c1a2af56521e951c01ed20f255fa054238"), "message": b"hello", "date": DATES[0], "committer": COMMITTERS[0], "author": COMMITTERS[0], "committer_date": DATES[0], "type": "git", "directory": b"\x01" * 20, "synthetic": False, "metadata": None, "parents": [], }, { "id": hash_to_bytes("368a48fe15b7db2383775f97c6b247011b3f14f4"), "message": b"hello again", "date": DATES[1], "committer": COMMITTERS[1], "author": COMMITTERS[1], "committer_date": DATES[1], "type": "hg", "directory": b"\x02" * 20, "synthetic": False, "metadata": None, "parents": [], }, ] RELEASES = [ { "id": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), "name": b"v0.0.1", "date": { "timestamp": {"seconds": 1234567890, "microseconds": 0,}, "offset": 120, 
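
To illustrate how the new fixtures combine with the client change above, here is a sketch of a test that a downstream package might write; the test name, the produced origin and the assertions are illustrative and not part of this patch.

    from confluent_kafka import Producer

    from swh.journal.client import JournalClient
    from swh.journal.serializers import key_to_kafka, value_to_kafka

    def test_origin_roundtrip(kafka_server, kafka_prefix, kafka_consumer_group):
        # `kafka_server` is the mock broker, with one `{kafka_prefix}.{object_type}`
        # topic per object type already created by the fixture.
        producer = Producer({"bootstrap.servers": kafka_server, "acks": "all"})
        origin = {"url": "https://example.org/some/repo"}
        producer.produce(
            topic=f"{kafka_prefix}.origin",
            key=key_to_kafka(b"some-key"),
            value=value_to_kafka(origin),
        )
        producer.flush()

        # The empty messages used to precreate the topics are skipped by the
        # client thanks to the handle_messages() change above.
        client = JournalClient(
            brokers=[kafka_server],
            group_id=kafka_consumer_group,
            prefix=kafka_prefix,
            stop_after_objects=1,
        )
        received = []
        client.process(lambda objects: received.extend(objects["origin"]))
        assert received == [origin]
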
"negative_utc": False, }, "author": COMMITTERS[0], "target_type": "revision", "target": b"\x04" * 20, "message": b"foo", "synthetic": False, }, ] ORIGINS = [ {"url": "https://somewhere.org/den/fox",}, {"url": "https://overtherainbow.org/fox/den",}, ] ORIGIN_VISITS = [ { "origin": ORIGINS[0]["url"], "date": "2013-05-07 04:20:39.369271+00:00", "snapshot": None, # TODO "status": "ongoing", # TODO "metadata": {"foo": "bar"}, "type": "git", }, { "origin": ORIGINS[0]["url"], "date": "2018-11-27 17:20:39+00:00", "snapshot": None, # TODO "status": "ongoing", # TODO "metadata": {"baz": "qux"}, "type": "git", }, ] TEST_OBJECT_DICTS: Dict[str, List[Dict[str, Any]]] = { "content": CONTENTS, - "revision": REVISIONS, - "release": RELEASES, + "directory": [], "origin": ORIGINS, "origin_visit": ORIGIN_VISITS, + "release": RELEASES, + "revision": REVISIONS, + "snapshot": [], + "skipped_content": [], } MODEL_OBJECTS = {v: k for (k, v) in OBJECT_TYPES.items()} TEST_OBJECTS: Dict[str, List[ModelObject]] = {} for object_type, objects in TEST_OBJECT_DICTS.items(): converted_objects: List[ModelObject] = [] model = MODEL_OBJECTS[object_type] for (num, obj_d) in enumerate(objects): if object_type == "origin_visit": obj_d = {**obj_d, "visit": num} elif object_type == "content": obj_d = {**obj_d, "data": b"", "ctime": datetime.datetime.now()} converted_objects.append(model.from_dict(obj_d)) TEST_OBJECTS[object_type] = converted_objects diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py index dad120e..35ddf15 100644 --- a/swh/journal/tests/test_client.py +++ b/swh/journal/tests/test_client.py @@ -1,233 +1,233 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Dict, List from unittest.mock import MagicMock from confluent_kafka import Producer import pytest from swh.model.hypothesis_strategies import revisions from swh.model.model import Content from swh.journal.client import JournalClient from swh.journal.serializers import key_to_kafka, value_to_kafka def test_client(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str): kafka_prefix += ".swh.journal.objects" producer = Producer( { "bootstrap.servers": kafka_server, "client.id": "test producer", "acks": "all", } ) rev = revisions().example() # Fill Kafka producer.produce( topic=kafka_prefix + ".revision", key=key_to_kafka(rev.id), value=value_to_kafka(rev.to_dict()), ) producer.flush() client = JournalClient( brokers=[kafka_server], group_id=kafka_consumer_group, prefix=kafka_prefix, stop_after_objects=1, ) worker_fn = MagicMock() client.process(worker_fn) worker_fn.assert_called_once_with({"revision": [rev.to_dict()]}) def test_client_eof(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str): kafka_prefix += ".swh.journal.objects" producer = Producer( { "bootstrap.servers": kafka_server, "client.id": "test producer", "acks": "all", } ) rev = revisions().example() # Fill Kafka producer.produce( topic=kafka_prefix + ".revision", key=key_to_kafka(rev.id), value=value_to_kafka(rev.to_dict()), ) producer.flush() client = JournalClient( brokers=[kafka_server], group_id=kafka_consumer_group, prefix=kafka_prefix, stop_after_objects=None, stop_on_eof=True, ) worker_fn = MagicMock() client.process(worker_fn) worker_fn.assert_called_once_with({"revision": [rev.to_dict()]}) @pytest.mark.parametrize("batch_size", 
diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py
index dad120e..35ddf15 100644
--- a/swh/journal/tests/test_client.py
+++ b/swh/journal/tests/test_client.py
@@ -1,233 +1,233 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from typing import Dict, List
 from unittest.mock import MagicMock

 from confluent_kafka import Producer
 import pytest

 from swh.model.hypothesis_strategies import revisions
 from swh.model.model import Content

 from swh.journal.client import JournalClient
 from swh.journal.serializers import key_to_kafka, value_to_kafka


 def test_client(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str):
     kafka_prefix += ".swh.journal.objects"

     producer = Producer(
         {
             "bootstrap.servers": kafka_server,
             "client.id": "test producer",
             "acks": "all",
         }
     )

     rev = revisions().example()

     # Fill Kafka
     producer.produce(
         topic=kafka_prefix + ".revision",
         key=key_to_kafka(rev.id),
         value=value_to_kafka(rev.to_dict()),
     )
     producer.flush()

     client = JournalClient(
         brokers=[kafka_server],
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
         stop_after_objects=1,
     )
     worker_fn = MagicMock()
     client.process(worker_fn)

     worker_fn.assert_called_once_with({"revision": [rev.to_dict()]})


 def test_client_eof(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str):
     kafka_prefix += ".swh.journal.objects"

     producer = Producer(
         {
             "bootstrap.servers": kafka_server,
             "client.id": "test producer",
             "acks": "all",
         }
     )

     rev = revisions().example()

     # Fill Kafka
     producer.produce(
         topic=kafka_prefix + ".revision",
         key=key_to_kafka(rev.id),
         value=value_to_kafka(rev.to_dict()),
     )
     producer.flush()

     client = JournalClient(
         brokers=[kafka_server],
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
         stop_after_objects=None,
         stop_on_eof=True,
     )

     worker_fn = MagicMock()
     client.process(worker_fn)

     worker_fn.assert_called_once_with({"revision": [rev.to_dict()]})


 @pytest.mark.parametrize("batch_size", [1, 5, 100])
 def test_client_batch_size(
     kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, batch_size: int,
 ):
     kafka_prefix += ".swh.journal.objects"

     num_objects = 2 * batch_size + 1
     assert num_objects < 256, "Too many objects, generation will fail"

     producer = Producer(
         {
             "bootstrap.servers": kafka_server,
             "client.id": "test producer",
             "acks": "all",
         }
     )

     contents = [Content.from_data(bytes([i])) for i in range(num_objects)]

     # Fill Kafka
     for content in contents:
         producer.produce(
             topic=kafka_prefix + ".content",
             key=key_to_kafka(content.sha1),
             value=value_to_kafka(content.to_dict()),
         )
     producer.flush()

     client = JournalClient(
         brokers=[kafka_server],
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
         stop_after_objects=num_objects,
         batch_size=batch_size,
     )

     collected_output: List[Dict] = []

     def worker_fn(objects):
         received = objects["content"]
         assert len(received) <= batch_size
         collected_output.extend(received)

     client.process(worker_fn)

     expected_output = [content.to_dict() for content in contents]
     assert len(collected_output) == len(expected_output)
     for output in collected_output:
         assert output in expected_output


 @pytest.fixture()
-def kafka_producer(kafka_prefix: str, kafka_server: str):
+def kafka_producer(kafka_prefix: str, kafka_server_base: str):
     producer = Producer(
         {
-            "bootstrap.servers": kafka_server,
+            "bootstrap.servers": kafka_server_base,
             "client.id": "test producer",
             "acks": "all",
         }
     )

     # Fill Kafka
     producer.produce(
         topic=kafka_prefix + ".something",
         key=key_to_kafka(b"key1"),
         value=value_to_kafka("value1"),
     )
     producer.produce(
         topic=kafka_prefix + ".else",
         key=key_to_kafka(b"key1"),
         value=value_to_kafka("value2"),
     )
     producer.flush()
     return producer


 def test_client_subscribe_all(
-    kafka_producer: Producer, kafka_prefix: str, kafka_server: str
+    kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str
 ):
     client = JournalClient(
-        brokers=[kafka_server],
+        brokers=[kafka_server_base],
         group_id="whatever",
         prefix=kafka_prefix,
         stop_after_objects=2,
     )
     assert set(client.subscription) == {
         f"{kafka_prefix}.something",
         f"{kafka_prefix}.else",
     }

     worker_fn = MagicMock()
     client.process(worker_fn)
     worker_fn.assert_called_once_with(
         {"something": ["value1"], "else": ["value2"],}
     )


 def test_client_subscribe_one_topic(
-    kafka_producer: Producer, kafka_prefix: str, kafka_server: str
+    kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str
 ):
     client = JournalClient(
-        brokers=[kafka_server],
+        brokers=[kafka_server_base],
         group_id="whatever",
         prefix=kafka_prefix,
         stop_after_objects=1,
         object_types=["else"],
     )
     assert client.subscription == [f"{kafka_prefix}.else"]

     worker_fn = MagicMock()
     client.process(worker_fn)
     worker_fn.assert_called_once_with({"else": ["value2"]})


 def test_client_subscribe_absent_topic(
-    kafka_producer: Producer, kafka_prefix: str, kafka_server: str
+    kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str
 ):
     with pytest.raises(ValueError):
         JournalClient(
-            brokers=[kafka_server],
+            brokers=[kafka_server_base],
             group_id="whatever",
             prefix=kafka_prefix,
             stop_after_objects=1,
             object_types=["really"],
         )


 def test_client_subscribe_absent_prefix(
-    kafka_producer: Producer, kafka_prefix: str, kafka_server: str
+    kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str
 ):
     with pytest.raises(ValueError):
         JournalClient(
-            brokers=[kafka_server],
+            brokers=[kafka_server_base],
             group_id="whatever",
             prefix="wrong.prefix",
             stop_after_objects=1,
         )
     with pytest.raises(ValueError):
         JournalClient(
-            brokers=[kafka_server],
+            brokers=[kafka_server_base],
             group_id="whatever",
             prefix="wrong.prefix",
             stop_after_objects=1,
             object_types=["else"],
         )
diff --git a/swh/journal/tests/test_pytest_plugin.py b/swh/journal/tests/test_pytest_plugin.py
new file mode 100644
index 0000000..c75970d
--- /dev/null
+++ b/swh/journal/tests/test_pytest_plugin.py
@@ -0,0 +1,51 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Iterator
+from confluent_kafka.admin import AdminClient
+
+
+def test_kafka_server(kafka_server_base: str):
+    ip, port_str = kafka_server_base.split(":")
+    assert ip == "127.0.0.1"
+    assert int(port_str)
+
+    admin = AdminClient({"bootstrap.servers": kafka_server_base})
+
+    topics = admin.list_topics()
+
+    assert len(topics.brokers) == 1
+
+
+def test_kafka_server_with_topics(
+    kafka_server: str, kafka_prefix: str, object_types: Iterator[str]
+):
+    admin = AdminClient({"bootstrap.servers": kafka_server})
+    topics = {
+        topic
+        for topic in admin.list_topics().topics
+        if topic.startswith(f"{kafka_prefix}.")
+    }
+    assert topics == {f"{kafka_prefix}.{obj}" for obj in object_types}
+
+
+def test_test_config(test_config: dict, kafka_prefix: str, kafka_server_base: str):
+    assert test_config == {
+        "consumer_id": "swh.journal.consumer",
+        "stop_after_objects": 1,
+        "storage": {"cls": "memory", "args": {}},
+        "object_types": {
+            "content",
+            "directory",
+            "origin",
+            "origin_visit",
+            "release",
+            "revision",
+            "snapshot",
+            "skipped_content",
+        },
+        "brokers": [kafka_server_base],
+        "prefix": kafka_prefix + ".swh.journal.objects",
+    }
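
Finally, the fixtures exercised above live in swh.journal.pytest_plugin, so another package's test suite can pull them in through pytest's standard plugin-loading mechanism (assuming the usual conftest.py layout):

    # conftest.py of a downstream package (illustrative)
    pytest_plugins = ["swh.journal.pytest_plugin"]
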