cli_runner = <click.testing.CliRunner object at 0x7fa617ae9c50>
swh_config = '/tmp/pytest-of-jenkins/pytest-0/test_cli_journal_client_withou0/indexer.yml'
kafka_prefix = 'tllrzfbkui', kafka_server = '127.0.0.1:36733'
consumer = <cimpl.Consumer object at 0x7fa617bdf5e8>
def test_cli_journal_client_without_brokers(
cli_runner, swh_config, kafka_prefix: str, kafka_server, consumer: Consumer
):
"""Without brokers configuration, the cli fails."""
with pytest.raises(ValueError, match="brokers"):
cli_runner.invoke(
indexer_cli_group,
[
"-C",
swh_config,
"journal-client",
],
> catch_exceptions=False,
)
.tox/py3/lib/python3.7/site-packages/swh/indexer/tests/test_cli.py:556:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.tox/py3/lib/python3.7/site-packages/click/testing.py:408: in invoke
return_value = cli.main(args=args or (), prog_name=prog_name, **extra)
.tox/py3/lib/python3.7/site-packages/click/core.py:1055: in main
rv = self.invoke(ctx)
.tox/py3/lib/python3.7/site-packages/click/core.py:1657: in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
.tox/py3/lib/python3.7/site-packages/click/core.py:1404: in invoke
return ctx.invoke(self.callback, **ctx.params)
.tox/py3/lib/python3.7/site-packages/click/core.py:760: in invoke
return __callback(*args, **kwargs)
.tox/py3/lib/python3.7/site-packages/click/decorators.py:26: in new_func
return f(get_current_context(), *args, **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
ctx = <click.core.Context object at 0x7fa617b01c50>, indexer = None
scheduler_url = None, origin_metadata_task_type = 'index-origin-metadata'
brokers = (), prefix = None, group_id = None, stop_after_objects = None
batch_size = None
@indexer_cli_group.command("journal-client")
@click.argument(
"indexer",
type=click.Choice(
[
"origin_intrinsic_metadata",
"extrinsic_metadata",
"content_mimetype",
"content_fossology_license",
"*",
]
),
required=False
# TODO: remove required=False after we stop using it
)
@click.option("--scheduler-url", "-s", default=None, help="URL of the scheduler API")
@click.option(
"--origin-metadata-task-type",
default="index-origin-metadata",
help="Name of the task running the origin metadata indexer.",
)
@click.option(
"--broker", "brokers", type=str, multiple=True, help="Kafka broker to connect to."
)
@click.option(
"--prefix", type=str, default=None, help="Prefix of Kafka topic names to read from."
)
@click.option("--group-id", type=str, help="Consumer/group id for reading from Kafka.")
@click.option(
"--stop-after-objects",
"-m",
default=None,
type=int,
help="Maximum number of objects to replay. Default is to run forever.",
)
@click.option(
"--batch-size",
"-b",
default=None,
type=int,
help="Batch size. Default is 200.",
)
@click.pass_context
def journal_client(
ctx,
indexer: Optional[str],
scheduler_url: str,
origin_metadata_task_type: str,
brokers: List[str],
prefix: str,
group_id: str,
stop_after_objects: Optional[int],
batch_size: Optional[int],
):
"""
Listens for new objects from the SWH Journal, and either:
* runs the indexer with the name passed as argument, if any
* schedules tasks to run relevant indexers (currently, only
origin_intrinsic_metadata) on these new objects otherwise.
Passing '*' as indexer name runs all indexers.
"""
import functools
import warnings
from swh.indexer.indexer import BaseIndexer, ObjectsDict
from swh.indexer.journal_client import process_journal_objects
from swh.journal.client import get_journal_client
from swh.scheduler import get_scheduler
cfg = ctx.obj["config"]
journal_cfg = cfg.get("journal", {})
scheduler = _get_api(get_scheduler, cfg, "scheduler", scheduler_url)
if brokers:
journal_cfg["brokers"] = brokers
> if not journal_cfg["brokers"]:
E KeyError: 'brokers'
.tox/py3/lib/python3.7/site-packages/swh/indexer/cli.py:293: KeyError
TEST RESULT
TEST RESULT
- Run At
- Sep 1 2022, 2:08 PM