diff --git a/swh/journal/client.py b/swh/journal/client.py
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -21,17 +21,6 @@
 # Only accepted offset reset policy accepted
 ACCEPTED_OFFSET_RESET = ["earliest", "latest"]
 
-# Only accepted object types
-ACCEPTED_OBJECT_TYPES = [
-    "content",
-    "directory",
-    "revision",
-    "release",
-    "snapshot",
-    "origin",
-    "origin_visit",
-]
-
 # Errors that Kafka raises too often and are not useful; therefore they
 # we lower their log level to DEBUG instead of INFO.
 _SPAMMY_ERRORS = [
@@ -73,7 +62,8 @@
     `'swh.journal.objects'`.
 
     Clients subscribe to events specific to each object type as listed in the
-    `object_types` argument (if unset, defaults to all accepted object types).
+    `object_types` argument (if unset, defaults to all existing Kafka topics under
+    the prefix).
 
     Clients can be sharded by setting the `group_id` to a common value across
     instances. The journal will share the message
@@ -111,21 +101,12 @@
     ):
         if prefix is None:
             prefix = DEFAULT_PREFIX
-        if object_types is None:
-            object_types = ACCEPTED_OBJECT_TYPES
 
         if auto_offset_reset not in ACCEPTED_OFFSET_RESET:
             raise ValueError(
                 "Option 'auto_offset_reset' only accept %s, not %s"
                 % (ACCEPTED_OFFSET_RESET, auto_offset_reset)
             )
 
-        for object_type in object_types:
-            if object_type not in ACCEPTED_OBJECT_TYPES:
-                raise ValueError(
-                    "Option 'object_types' only accepts %s, not %s."
-                    % (ACCEPTED_OBJECT_TYPES, object_type)
-                )
-
         if batch_size <= 0:
             raise ValueError("Option 'batch_size' needs to be positive")
 
@@ -184,9 +165,34 @@
 
         self.consumer = Consumer(consumer_settings)
 
-        self.subscription = [
-            "%s.%s" % (prefix, object_type) for object_type in object_types
-        ]
+        existing_topics = self.consumer.list_topics(timeout=10).topics.keys()
+        if not any(topic.startswith(f"{prefix}.") for topic in existing_topics):
+            raise ValueError(
+                f"The prefix {prefix} does not match any existing topic "
+                "on the kafka broker"
+            )
+
+        if object_types:
+            unknown_topics = []
+            for object_type in object_types:
+                topic = f"{prefix}.{object_type}"
+                if topic not in existing_topics:
+                    unknown_topics.append(topic)
+            if unknown_topics:
+                raise ValueError(
+                    f"Topic(s) {','.join(unknown_topics)} "
+                    "are unknown on the kafka broker"
+                )
+            self.subscription = [
+                f"{prefix}.{object_type}" for object_type in object_types
+            ]
+        else:
+            # subscribe to every topic under the prefix
+            self.subscription = [
+                topic for topic in existing_topics if topic.startswith(f"{prefix}.")
+            ]
+
+        logger.debug("Upstream topics: %s", existing_topics)
         self.subscribe()
 
         self.stop_after_objects = stop_after_objects
@@ -194,12 +200,12 @@
 
         self.eof_reached: Set[Tuple[str, str]] = set()
         self.batch_size = batch_size
 
-        self._object_types = object_types
-
     def subscribe(self):
-        logger.debug("Upstream topics: %s", self.consumer.list_topics(timeout=10))
+        """Subscribe to topics listed in self.subscription
+        This can be overridden if you need, for instance, to manually assign partitions.
+        """
         logger.debug("Subscribing to: %s", self.subscription)
         self.consumer.subscribe(topics=self.subscription)
 
     def process(self, worker_fn):
@@ -266,11 +272,7 @@
                 continue
 
             nb_processed += 1
-
             object_type = message.topic().split(".")[-1]
-            # Got a message from a topic we did not subscribe to.
-            assert object_type in self._object_types, object_type
-
             objects[object_type].append(self.deserialize_message(message))
 
         if objects:
diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py
--- a/swh/journal/tests/test_client.py
+++ b/swh/journal/tests/test_client.py
@@ -38,7 +38,7 @@
     producer.flush()
 
     client = JournalClient(
-        brokers=kafka_server,
+        brokers=[kafka_server],
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
         stop_after_objects=1,
@@ -71,7 +71,7 @@
     producer.flush()
 
     client = JournalClient(
-        brokers=kafka_server,
+        brokers=[kafka_server],
        group_id=kafka_consumer_group,
         prefix=kafka_prefix,
         stop_after_objects=None,
@@ -135,3 +135,99 @@
 
     for output in collected_output:
         assert output in expected_output
+
+
+@pytest.fixture()
+def kafka_producer(kafka_prefix: str, kafka_server: str):
+    producer = Producer(
+        {
+            "bootstrap.servers": kafka_server,
+            "client.id": "test producer",
+            "acks": "all",
+        }
+    )
+
+    # Fill Kafka
+    producer.produce(
+        topic=kafka_prefix + ".something",
+        key=key_to_kafka(b"key1"),
+        value=value_to_kafka("value1"),
+    )
+    producer.produce(
+        topic=kafka_prefix + ".else",
+        key=key_to_kafka(b"key1"),
+        value=value_to_kafka("value2"),
+    )
+    producer.flush()
+    return producer
+
+
+def test_client_subscribe_all(
+    kafka_producer: Producer, kafka_prefix: str, kafka_server: str
+):
+    client = JournalClient(
+        brokers=[kafka_server],
+        group_id="whatever",
+        prefix=kafka_prefix,
+        stop_after_objects=2,
+    )
+    assert set(client.subscription) == {
+        f"{kafka_prefix}.something",
+        f"{kafka_prefix}.else",
+    }
+
+    worker_fn = MagicMock()
+    client.process(worker_fn)
+    worker_fn.assert_called_once_with(
+        {"something": ["value1"], "else": ["value2"],}
+    )
+
+
+def test_client_subscribe_one_topic(
+    kafka_producer: Producer, kafka_prefix: str, kafka_server: str
+):
+    client = JournalClient(
+        brokers=[kafka_server],
+        group_id="whatever",
+        prefix=kafka_prefix,
+        stop_after_objects=1,
+        object_types=["else"],
+    )
+    assert client.subscription == [f"{kafka_prefix}.else"]
+
+    worker_fn = MagicMock()
+    client.process(worker_fn)
+    worker_fn.assert_called_once_with({"else": ["value2"]})
+
+
+def test_client_subscribe_absent_topic(
+    kafka_producer: Producer, kafka_prefix: str, kafka_server: str
+):
+    with pytest.raises(ValueError):
+        JournalClient(
+            brokers=[kafka_server],
+            group_id="whatever",
+            prefix=kafka_prefix,
+            stop_after_objects=1,
+            object_types=["really"],
+        )
+
+
+def test_client_subscribe_absent_prefix(
+    kafka_producer: Producer, kafka_prefix: str, kafka_server: str
+):
+    with pytest.raises(ValueError):
+        JournalClient(
+            brokers=[kafka_server],
+            group_id="whatever",
+            prefix="wrong.prefix",
+            stop_after_objects=1,
+        )
+    with pytest.raises(ValueError):
+        JournalClient(
+            brokers=[kafka_server],
+            group_id="whatever",
+            prefix="wrong.prefix",
+            stop_after_objects=1,
+            object_types=["else"],
+        )
diff --git a/swh/journal/tests/utils.py b/swh/journal/tests/utils.py
--- a/swh/journal/tests/utils.py
+++ b/swh/journal/tests/utils.py
@@ -1,4 +1,4 @@
-from swh.journal.client import JournalClient, ACCEPTED_OBJECT_TYPES
+from swh.journal.client import JournalClient
 from swh.journal.writer.kafka import KafkaJournalWriter
 from swh.journal.serializers import kafka_to_value, key_to_kafka, value_to_kafka
 
@@ -69,7 +69,7 @@
 
 class MockedJournalClient(JournalClient):
-    def __init__(self, queue, object_types=ACCEPTED_OBJECT_TYPES):
+    def __init__(self, queue, object_types=None):
         self._object_types = object_types
         self.consumer = MockedKafkaConsumer(queue)
         self.process_timeout = None
 