diff --git a/swh/journal/client.py b/swh/journal/client.py
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -21,17 +21,6 @@
 # Only accepted offset reset policy accepted
 ACCEPTED_OFFSET_RESET = ["earliest", "latest"]
 
-# Only accepted object types
-ACCEPTED_OBJECT_TYPES = [
-    "content",
-    "directory",
-    "revision",
-    "release",
-    "snapshot",
-    "origin",
-    "origin_visit",
-]
-
 # Errors that Kafka raises too often and are not useful; therefore they
 # we lower their log level to DEBUG instead of INFO.
 _SPAMMY_ERRORS = [
@@ -111,21 +100,12 @@
     ):
         if prefix is None:
             prefix = DEFAULT_PREFIX
-        if object_types is None:
-            object_types = ACCEPTED_OBJECT_TYPES
         if auto_offset_reset not in ACCEPTED_OFFSET_RESET:
             raise ValueError(
                 "Option 'auto_offset_reset' only accept %s, not %s"
                 % (ACCEPTED_OFFSET_RESET, auto_offset_reset)
             )
 
-        for object_type in object_types:
-            if object_type not in ACCEPTED_OBJECT_TYPES:
-                raise ValueError(
-                    "Option 'object_types' only accepts %s, not %s."
-                    % (ACCEPTED_OBJECT_TYPES, object_type)
-                )
-
         if batch_size <= 0:
             raise ValueError("Option 'batch_size' needs to be positive")
 
@@ -184,24 +164,44 @@
 
         self.consumer = Consumer(consumer_settings)
 
-        self.subscription = [
-            "%s.%s" % (prefix, object_type) for object_type in object_types
-        ]
-        self.subscribe()
+        # Fail fast if no topic on the broker lives under the prefix.
+        existing_topics = self.consumer.list_topics(timeout=10).topics.keys()
+        if not any(topic.startswith(f"{prefix}.") for topic in existing_topics):
+            raise ValueError(
+                f"The prefix {prefix} does not match any existing topic "
+                "on the kafka broker"
+            )
+
+        if object_types:
+            unknown_topics = []
+            for object_type in object_types:
+                topic = f"{prefix}.{object_type}"
+                if topic not in existing_topics:
+                    unknown_topics.append(topic)
+            if unknown_topics:
+                raise ValueError(
+                    f"Topic(s) {','.join(unknown_topics)} "
+                    "are unknown on the kafka broker"
+                )
+            self.subscription = [
+                f"{prefix}.{object_type}" for object_type in object_types
+            ]
+        else:
+            # No explicit object types: subscribe to every topic under the
+            # prefix. A leading "^" marks the entry as a regex subscription.
+            # NOTE(review): prefix is interpolated unescaped, so a "." in it
+            # matches any character -- consider re.escape(prefix).
+            self.subscription = [f"^{prefix}[.]"]
+
+        logger.debug("Upstream topics: %s", existing_topics)
+        logger.debug("Subscribing to: %s", self.subscription)
+        self.consumer.subscribe(topics=self.subscription)
 
         self.stop_after_objects = stop_after_objects
         self.process_timeout = process_timeout
         self.eof_reached: Set[Tuple[str, str]] = set()
         self.batch_size = batch_size
 
-        self._object_types = object_types
-
-    def subscribe(self):
-        logger.debug("Upstream topics: %s", self.consumer.list_topics(timeout=10))
-        logger.debug("Subscribing to: %s", self.subscription)
-
-        self.consumer.subscribe(topics=self.subscription)
-
     def process(self, worker_fn):
         """Polls Kafka for a batch of messages, and calls the worker_fn
         with these messages.
@@ -266,11 +266,7 @@
                     continue
 
                 nb_processed += 1
-
                 object_type = message.topic().split(".")[-1]
-                # Got a message from a topic we did not subscribe to.
-                assert object_type in self._object_types, object_type
-
                 objects[object_type].append(self.deserialize_message(message))
 
             if objects:
diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py
--- a/swh/journal/tests/test_client.py
+++ b/swh/journal/tests/test_client.py
@@ -135,3 +135,96 @@
 
     for output in collected_output:
         assert output in expected_output
+
+
+@pytest.fixture()
+def kafka_producer(kafka_prefix: str, kafka_server: str):
+    producer = Producer(
+        {
+            "bootstrap.servers": kafka_server,
+            "client.id": "test producer",
+            "acks": "all",
+        }
+    )
+
+    # Fill Kafka
+    producer.produce(
+        topic=kafka_prefix + ".something",
+        key=key_to_kafka(b"key1"),
+        value=value_to_kafka("value1"),
+    )
+    producer.produce(
+        topic=kafka_prefix + ".else",
+        key=key_to_kafka(b"key1"),
+        value=value_to_kafka("value2"),
+    )
+    producer.flush()
+    return producer
+
+
+def test_client_subscribe_all(
+    kafka_producer: Producer, kafka_prefix: str, kafka_server: str
+):
+    client = JournalClient(
+        brokers=kafka_server,
+        group_id="whatever",
+        prefix=kafka_prefix,
+        stop_after_objects=2,
+    )
+    assert client.subscription == [f"^{kafka_prefix}[.]"]
+
+    worker_fn = MagicMock()
+    client.process(worker_fn)
+    worker_fn.assert_called_once_with(
+        {"something": ["value1"], "else": ["value2"],}
+    )
+
+
+def test_client_subscribe_one_topic(
+    kafka_producer: Producer, kafka_prefix: str, kafka_server: str
+):
+    client = JournalClient(
+        brokers=kafka_server,
+        group_id="whatever",
+        prefix=kafka_prefix,
+        stop_after_objects=1,
+        object_types=["else"],
+    )
+    assert client.subscription == [f"{kafka_prefix}.else"]
+
+    worker_fn = MagicMock()
+    client.process(worker_fn)
+    worker_fn.assert_called_once_with({"else": ["value2"]})
+
+
+def test_client_subscribe_absent_topic(
+    kafka_producer: Producer, kafka_prefix: str, kafka_server: str
+):
+    with pytest.raises(ValueError):
+        JournalClient(
+            brokers=kafka_server,
+            group_id="whatever",
+            prefix=kafka_prefix,
+            stop_after_objects=1,
+            object_types=["really"],
+        )
+
+
+def test_client_subscribe_absent_prefix(
+    kafka_producer: Producer, kafka_prefix: str, kafka_server: str
+):
+    with pytest.raises(ValueError):
+        JournalClient(
+            brokers=kafka_server,
+            group_id="whatever",
+            prefix="wrong.prefix",
+            stop_after_objects=1,
+        )
+    with pytest.raises(ValueError):
+        JournalClient(
+            brokers=kafka_server,
+            group_id="whatever",
+            prefix="wrong.prefix",
+            stop_after_objects=1,
+            object_types=["else"],
+        )
diff --git a/swh/journal/tests/utils.py b/swh/journal/tests/utils.py
--- a/swh/journal/tests/utils.py
+++ b/swh/journal/tests/utils.py
@@ -1,4 +1,4 @@
-from swh.journal.client import JournalClient, ACCEPTED_OBJECT_TYPES
+from swh.journal.client import JournalClient
 from swh.journal.writer.kafka import KafkaJournalWriter
 from swh.journal.serializers import kafka_to_value, key_to_kafka, value_to_kafka
 
@@ -69,7 +69,7 @@
 
 
 class MockedJournalClient(JournalClient):
-    def __init__(self, queue, object_types=ACCEPTED_OBJECT_TYPES):
+    def __init__(self, queue, object_types=None):
         self._object_types = object_types
         self.consumer = MockedKafkaConsumer(queue)
         self.process_timeout = None