Page Menu | Home | Software Heritage

D8376.id30230.diff
No One | Temporary

D8376.id30230.diff

diff --git a/swh/indexer/cli.py b/swh/indexer/cli.py
--- a/swh/indexer/cli.py
+++ b/swh/indexer/cli.py
@@ -288,17 +288,22 @@
scheduler = _get_api(get_scheduler, cfg, "scheduler", scheduler_url)
- brokers = brokers or journal_cfg.get("brokers")
- if not brokers:
+ if brokers:
+ journal_cfg["brokers"] = brokers
+ if not journal_cfg["brokers"]:
raise ValueError("The brokers configuration is mandatory.")
- prefix = prefix or journal_cfg.get("prefix")
- group_id = group_id or journal_cfg.get("group_id")
+ if prefix:
+ journal_cfg["prefix"] = prefix
+ if group_id:
+ journal_cfg["group_id"] = group_id
origin_metadata_task_type = origin_metadata_task_type or journal_cfg.get(
"origin_metadata_task_type"
)
- stop_after_objects = stop_after_objects or journal_cfg.get("stop_after_objects")
- batch_size = batch_size or journal_cfg.get("batch_size", 200)
+ if stop_after_objects:
+ journal_cfg["stop_after_objects"] = stop_after_objects
+ if batch_size:
+ journal_cfg["batch_size"] = batch_size
object_types = set()
worker_fns: List[Callable[[ObjectsDict], Dict]] = []
@@ -359,12 +364,8 @@
client = get_journal_client(
cls="kafka",
- brokers=brokers,
- prefix=prefix,
- group_id=group_id,
object_types=list(object_types),
- stop_after_objects=stop_after_objects,
- batch_size=batch_size,
+ **journal_cfg,
)
def worker_fn(objects: ObjectsDict):

File Metadata

Mime Type
text/plain
Expires
Dec 19 2024, 6:10 PM (11 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3235085

Event Timeline