Page Menu · Home · Software Heritage

D7382.diff
No One · Temporary

D7382.diff

diff --git a/swh/dataset/cli.py b/swh/dataset/cli.py
--- a/swh/dataset/cli.py
+++ b/swh/dataset/cli.py
@@ -51,7 +51,15 @@
@graph.command("export")
@click.argument("export-path", type=click.Path())
-@click.option("--export-id", "-e", help="Unique ID of the export run.")
+@click.option(
+ "--export-id",
+ "-e",
+ help=(
+ "Unique ID of the export run. This is appended to the kafka "
+ "group_id config file option. If group_id is not set in the "
+ "'journal' section of the config file, defaults to 'swh-dataset-export-'."
+ ),
+)
@click.option(
"--formats",
"-f",
diff --git a/swh/dataset/journalprocessor.py b/swh/dataset/journalprocessor.py
--- a/swh/dataset/journalprocessor.py
+++ b/swh/dataset/journalprocessor.py
@@ -164,7 +164,8 @@
"""
self.config = config
self.exporters = exporters
- self.group_id = "swh-dataset-export-{}".format(export_id)
+ prefix = self.config["journal"].get("group_id", "swh-dataset-export-")
+ self.group_id = f"{prefix}{export_id}"
self.obj_type = obj_type
self.processes = processes
self.node_sets_path = node_sets_path
@@ -180,11 +181,10 @@
If available, use committed offsets as lo offset boundaries.
"""
if self.offsets is None:
- client = JournalClient(
- **self.config["journal"],
- object_types=[self.obj_type],
- group_id=self.group_id,
- )
+ cfg = self.config["journal"].copy()
+ cfg["object_types"] = [self.obj_type]
+ cfg["group_id"] = self.group_id
+ client = JournalClient(**cfg)
topic_name = client.subscription[0]
topics = client.consumer.list_topics(topic_name).topics
partitions = topics[topic_name].partitions
@@ -379,8 +379,9 @@
Start a Journal client on the given assignment and process all the
incoming messages.
"""
- client = JournalClientOffsetRanges(
- **self.config["journal"],
+ logger.debug("Start the JournalProcessorWorker")
+ cfg = self.config["journal"].copy()
+ cfg.update(
object_types=[self.obj_type],
group_id=self.group_id,
debug="cgrp,broker",
@@ -389,6 +390,7 @@
progress_queue=self.progress_queue,
**{"message.max.bytes": str(500 * 1024 * 1024)},
)
+ client = JournalClientOffsetRanges(**cfg)
client.process(self.process_messages)
def process_messages(self, messages):

File Metadata

Mime Type
text/plain
Expires
Thu, Jul 3, 12:47 PM (2 w, 3 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3223945

Event Timeline