diff --git a/swh/dataset/cli.py b/swh/dataset/cli.py --- a/swh/dataset/cli.py +++ b/swh/dataset/cli.py @@ -51,7 +51,15 @@ @graph.command("export") @click.argument("export-path", type=click.Path()) -@click.option("--export-id", "-e", help="Unique ID of the export run.") +@click.option( + "--export-id", + "-e", + help=( + "Unique ID of the export run. This is appended to the kafka " + "group_id config file option. If group_id is not set in the " + "'journal' section of the config file, defaults to 'swh-dataset-export-'." + ), +) @click.option( "--formats", "-f", diff --git a/swh/dataset/journalprocessor.py b/swh/dataset/journalprocessor.py --- a/swh/dataset/journalprocessor.py +++ b/swh/dataset/journalprocessor.py @@ -164,7 +164,8 @@ """ self.config = config self.exporters = exporters - self.group_id = "swh-dataset-export-{}".format(export_id) + prefix = self.config["journal"].get("group_id", "swh-dataset-export-") + self.group_id = f"{prefix}{export_id}" self.obj_type = obj_type self.processes = processes self.node_sets_path = node_sets_path @@ -180,11 +181,10 @@ If available, use committed offsets as lo offset boundaries. """ if self.offsets is None: - client = JournalClient( - **self.config["journal"], - object_types=[self.obj_type], - group_id=self.group_id, - ) + cfg = self.config["journal"].copy() + cfg["object_types"] = [self.obj_type] + cfg["group_id"] = self.group_id + client = JournalClient(**cfg) topic_name = client.subscription[0] topics = client.consumer.list_topics(topic_name).topics partitions = topics[topic_name].partitions @@ -379,8 +379,9 @@ Start a Journal client on the given assignment and process all the incoming messages. """ - client = JournalClientOffsetRanges( - **self.config["journal"], + logger.debug("Start the JournalProcessorWorker") + cfg = self.config["journal"].copy() + cfg.update( object_types=[self.obj_type], group_id=self.group_id, debug="cgrp,broker", @@ -389,6 +390,7 @@ progress_queue=self.progress_queue, **{"message.max.bytes": str(500 * 1024 * 1024)}, ) + client = JournalClientOffsetRanges(**cfg) client.process(self.process_messages) def process_messages(self, messages):