diff --git a/swh/indexer/cli.py b/swh/indexer/cli.py --- a/swh/indexer/cli.py +++ b/swh/indexer/cli.py @@ -288,17 +288,22 @@ scheduler = _get_api(get_scheduler, cfg, "scheduler", scheduler_url) - brokers = brokers or journal_cfg.get("brokers") - if not brokers: + if brokers: + journal_cfg["brokers"] = brokers + if not journal_cfg["brokers"]: raise ValueError("The brokers configuration is mandatory.") - prefix = prefix or journal_cfg.get("prefix") - group_id = group_id or journal_cfg.get("group_id") + if prefix: + journal_cfg["prefix"] = prefix + if group_id: + journal_cfg["group_id"] = group_id origin_metadata_task_type = origin_metadata_task_type or journal_cfg.get( "origin_metadata_task_type" ) - stop_after_objects = stop_after_objects or journal_cfg.get("stop_after_objects") - batch_size = batch_size or journal_cfg.get("batch_size", 200) + if stop_after_objects: + journal_cfg["stop_after_objects"] = stop_after_objects + if batch_size: + journal_cfg["batch_size"] = batch_size object_types = set() worker_fns: List[Callable[[ObjectsDict], Dict]] = [] @@ -359,12 +364,8 @@ client = get_journal_client( cls="kafka", - brokers=brokers, - prefix=prefix, - group_id=group_id, object_types=list(object_types), - stop_after_objects=stop_after_objects, - batch_size=batch_size, + **journal_cfg, ) def worker_fn(objects: ObjectsDict):