diff --git a/swh/indexer/cli.py b/swh/indexer/cli.py --- a/swh/indexer/cli.py +++ b/swh/indexer/cli.py @@ -249,7 +249,21 @@ from swh.journal.client import get_journal_client from swh.scheduler import get_scheduler - scheduler = _get_api(get_scheduler, ctx.obj["config"], "scheduler", scheduler_url) + cfg = ctx.obj["config"] + journal_cfg = cfg.get("journal", {}) + + scheduler = _get_api(get_scheduler, cfg, "scheduler", scheduler_url) + + brokers = brokers or journal_cfg.get("brokers") + if not brokers: + raise ValueError("The brokers configuration is mandatory.") + + prefix = prefix or journal_cfg.get("prefix") + group_id = group_id or journal_cfg.get("group_id") + origin_metadata_task_type = origin_metadata_task_type or journal_cfg.get( + "origin_metadata_task_type" + ) + stop_after_objects = stop_after_objects or journal_cfg.get("stop_after_objects") client = get_journal_client( cls="kafka", diff --git a/swh/indexer/tests/test_cli.py b/swh/indexer/tests/test_cli.py --- a/swh/indexer/tests/test_cli.py +++ b/swh/indexer/tests/test_cli.py @@ -11,6 +11,7 @@ from click.testing import CliRunner from confluent_kafka import Consumer, Producer +import pytest from swh.indexer.cli import indexer_cli_group from swh.indexer.storage.interface import IndexerStorageInterface @@ -383,3 +384,14 @@ tasks = indexer_scheduler.search_tasks() assert len(tasks) == 1 _assert_tasks_for_origins(tasks, [0]) + + +def test_journal_client_without_brokers( + storage, indexer_scheduler, kafka_prefix: str, kafka_server, consumer: Consumer +): + """Without brokers configuration, the cli fails.""" + + with pytest.raises(ValueError, match="brokers"): + invoke( + indexer_scheduler, False, ["journal-client",], + )