diff --git a/swh/search/cli.py b/swh/search/cli.py
--- a/swh/search/cli.py
+++ b/swh/search/cli.py
@@ -55,11 +55,22 @@
     type=int,
     help="Maximum number of objects to replay. Default is to run forever.",
 )
+@click.option(
+    "--object-type",
+    "-o",
+    multiple=True,
+    help="Object type to subscribe to (repeatable); overrides journal config",
+)
 @click.pass_context
-def journal_client_objects(ctx, stop_after_objects):
+def journal_client_objects(ctx, stop_after_objects, object_type):
     """Listens for new objects from the SWH Journal, and schedules tasks
-    to run relevant indexers (currently, only origin)
-    on these new objects."""
+    to run relevant indexers (currently, origin and origin_visit)
+    on these new objects.
+
+    Object types are taken from ``--object-type`` when given, else from
+    the ``object_types`` key of the journal config, else default to
+    origin and origin_visit.
+    """
     import functools
 
     from swh.journal.client import get_journal_client
@@ -70,9 +81,17 @@
     config = ctx.obj["config"]
     journal_cfg = config["journal"]
 
+    # --object-type takes precedence over the journal config's "object_types",
+    # which takes precedence over the built-in default. The key is popped from
+    # a copy of the config because object_types is passed explicitly below:
+    # leaving it in **journal_cfg would raise TypeError (duplicate keyword).
+    journal_cfg = dict(journal_cfg)
+    cfg_object_types = journal_cfg.pop("object_types", None)
+    object_types = list(object_type) or cfg_object_types or ["origin", "origin_visit"]
+
     client = get_journal_client(
         cls="kafka",
-        object_types=["origin", "origin_visit"],
+        object_types=object_types,
         stop_after_objects=stop_after_objects,
         **journal_cfg,
     )
diff --git a/swh/search/tests/test_cli.py b/swh/search/tests/test_cli.py
--- a/swh/search/tests/test_cli.py
+++ b/swh/search/tests/test_cli.py
@@ -69,7 +69,14 @@
     )
     result = invoke(
         False,
-        ["journal-client", "objects", "--stop-after-objects", "1",],
+        [
+            "journal-client",
+            "objects",
+            "--stop-after-objects",
+            "1",
+            "--object-type",
+            "origin",
+        ],
         journal_objects_config,
         elasticsearch_host=elasticsearch_host,
     )
@@ -116,7 +123,14 @@
     )
     result = invoke(
         False,
-        ["journal-client", "objects", "--stop-after-objects", "1",],
+        [
+            "journal-client",
+            "objects",
+            "--stop-after-objects",
+            "1",
+            "--object-type",
+            "origin_visit",
+        ],
         journal_objects_config,
         elasticsearch_host=elasticsearch_host,
     )