diff --git a/swh/journal/client.py b/swh/journal/client.py --- a/swh/journal/client.py +++ b/swh/journal/client.py @@ -174,12 +174,10 @@ self.consumer = Consumer(consumer_settings) - topics = ["%s.%s" % (prefix, object_type) for object_type in object_types] - - logger.debug("Upstream topics: %s", self.consumer.list_topics(timeout=10)) - logger.debug("Subscribing to: %s", topics) - - self.consumer.subscribe(topics=topics) + self.subscription = [ + "%s.%s" % (prefix, object_type) for object_type in object_types + ] + self.subscribe() self.stop_after_objects = stop_after_objects self.process_timeout = process_timeout @@ -188,6 +186,12 @@ self._object_types = object_types + def subscribe(self): + logger.debug("Upstream topics: %s", self.consumer.list_topics(timeout=10)) + logger.debug("Subscribing to: %s", self.subscription) + + self.consumer.subscribe(topics=self.subscription) + def process(self, worker_fn): """Polls Kafka for a batch of messages, and calls the worker_fn with these messages.