diff --git a/swh/search/cli.py b/swh/search/cli.py
--- a/swh/search/cli.py
+++ b/swh/search/cli.py
@@ -59,14 +59,10 @@
     "--object-type",
     "-o",
     multiple=True,
-    default=["origin", "origin_visit", "origin_visit_status"],
-    help="Default list of object types to subscribe to",
+    help="Object types to subscribe to (repeatable, overrides configuration)",
 )
 @click.option(
-    "--prefix",
-    "-p",
-    default="swh.journal.objects",
-    help="Topic prefix to use (e.g swh.journal.indexed)",
+    "--prefix", "-p", help="Topic prefix to use (e.g. swh.journal.indexed)",
 )
 @click.pass_context
 def journal_client_objects(ctx, stop_after_objects, object_type, prefix):
@@ -85,15 +81,19 @@
     config = ctx.obj["config"]
-    journal_cfg = config["journal"]
+    journal_cfg = dict(config["journal"])  # copy: do not mutate the shared config
 
-    object_types = object_type or journal_cfg.get("object_types")
-    prefix = prefix or journal_cfg.get("prefix")
-
-    client = get_journal_client(
-        cls="kafka",
-        object_types=object_types,
-        stop_after_objects=stop_after_objects,
-        **journal_cfg,
+    journal_cfg["object_types"] = object_type or journal_cfg.get("object_types", [])
+    journal_cfg["prefix"] = prefix or journal_cfg.get("prefix")
+    journal_cfg["stop_after_objects"] = stop_after_objects or journal_cfg.get(
+        "stop_after_objects"
     )
+
+    if not journal_cfg["object_types"]:  # also rejects a null/empty config value
+        raise ValueError("'object_types' must be specified by cli or configuration")
+
+    if not journal_cfg["prefix"]:  # also rejects an empty prefix
+        raise ValueError("'prefix' must be specified by cli or configuration")
+
+    client = get_journal_client(cls="kafka", **journal_cfg,)
 
     search = get_search(**config["search"])
     worker_fn = functools.partial(process_journal_objects, search=search,)
diff --git a/swh/search/tests/test_cli.py b/swh/search/tests/test_cli.py
--- a/swh/search/tests/test_cli.py
+++ b/swh/search/tests/test_cli.py
@@ -185,7 +185,16 @@
     )
     result = invoke(
         False,
-        ["journal-client", "objects", "--stop-after-objects", "1",],
+        [
+            "journal-client",
+            "objects",
+            "--stop-after-objects",
+            "1",
+            "--prefix",
+            kafka_prefix,
+            "--object-type",
+            "origin_visit_status",
+        ],
         journal_objects_config,
         elasticsearch_host=elasticsearch_host,
     )
@@ -250,9 +259,12 @@
     result = invoke(
         False,
         [
-            "journal-client", "objects",
-            "--stop-after-objects", "1",
-            "--object-type", "origin_intrinsic_metadata"
+            "journal-client",
+            "objects",
+            "--stop-after-objects",
+            "1",
+            "--object-type",
+            "origin_intrinsic_metadata",
         ],
         journal_objects_config,
         elasticsearch_host=elasticsearch_host,
@@ -289,8 +301,9 @@
 def test__journal_client__missing_journal_config_keys(elasticsearch_host):
     """Missing configuration on mandatory journal keys should raise"""
 
+    kafka_prefix = "swh.journal.objects"
     journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
-        broker="192.0.2.1", prefix="swh.journal.objects", group_id="test-consumer"
+        broker="192.0.2.1", prefix=kafka_prefix, group_id="test-consumer"
     )
 
     journal_config = yaml.safe_load(journal_objects_config)
@@ -304,7 +317,76 @@
         with pytest.raises(TypeError, match=f"{key}"):
             invoke(
                 catch_exceptions=False,
-                args=["journal-client", "objects", "--stop-after-objects", "1",],
+                args=[
+                    "journal-client",
+                    "objects",
+                    "--stop-after-objects",
+                    "1",
+                    "--prefix",
+                    kafka_prefix,
+                    "--object-type",
+                    "origin_visit_status",
+                ],
                 config=yaml_cfg,  # incomplete config will make the cli raise
                 elasticsearch_host=elasticsearch_host,
             )
+
+
+def test__journal_client__missing_prefix_config_key(
+    swh_search, elasticsearch_host, kafka_server
+):
+    """Missing configuration on mandatory prefix key should raise"""
+
+    journal_cfg_template = """
+journal:
+  brokers:
+    - {broker}
+  group_id: {group_id}
+    """
+
+    journal_cfg = journal_cfg_template.format(
+        broker=kafka_server, group_id="test-consumer"
+    )
+
+    with pytest.raises(ValueError, match="prefix"):
+        invoke(
+            False,
+            # Missing --prefix (and no config key) will make the cli raise
+            [
+                "journal-client",
+                "objects",
+                "--stop-after-objects",
+                "1",
+                "--object-type",
+                "origin_visit_status",
+            ],
+            journal_cfg,
+            elasticsearch_host=elasticsearch_host,
+        )
+
+
+def test__journal_client__missing_object_types_config_key(
+    swh_search, elasticsearch_host, kafka_server
+):
+    """Missing configuration on mandatory object-types key should raise"""
+
+    journal_cfg_template = """
+journal:
+  brokers:
+    - {broker}
+  prefix: swh.journal.objects
+  group_id: {group_id}
+    """
+
+    journal_cfg = journal_cfg_template.format(
+        broker=kafka_server, group_id="test-consumer"
+    )
+
+    with pytest.raises(ValueError, match="object_types"):
+        invoke(
+            False,
+            # Missing --object-type (and no config key) will make the cli raise
+            ["journal-client", "objects", "--stop-after-objects", "1"],
+            journal_cfg,
+            elasticsearch_host=elasticsearch_host,
+        )