storage = <swh.storage.in_memory.InMemoryStorage object at 0x7f93d044a7f0>
indexer_scheduler = <swh.scheduler.backend.SchedulerBackend object at 0x7f93d04400f0>
kafka_prefix = 'wiiwwxcfkf', kafka_server = '127.0.0.1:41057'
consumer = <cimpl.Consumer object at 0x7f93d0416400>
def test_journal_client(
storage, indexer_scheduler, kafka_prefix: str, kafka_server, consumer: Consumer
):
"""Test the 'swh indexer journal-client' cli tool."""
producer = Producer(
{
"bootstrap.servers": kafka_server,
"client.id": "test producer",
"acks": "all",
}
)
STATUS = {"status": "full", "origin": {"url": "file://dev/0000",}}
producer.produce(
topic=kafka_prefix + ".origin_visit",
key=b"bogus",
value=value_to_kafka(STATUS),
)
result = invoke(
indexer_scheduler,
False,
[
"journal-client",
"--stop-after-objects",
"1",
"--broker",
kafka_server,
"--prefix",
kafka_prefix,
"--group-id",
> "test-consumer",
],
)
.tox/py3/lib/python3.7/site-packages/swh/indexer/tests/test_cli.py:373:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.tox/py3/lib/python3.7/site-packages/swh/indexer/tests/test_cli.py:98: in invoke
raise result.exception
.tox/py3/lib/python3.7/site-packages/click/testing.py:329: in invoke
cli.main(args=args or (), prog_name=prog_name, **extra)
.tox/py3/lib/python3.7/site-packages/click/core.py:782: in main
rv = self.invoke(ctx)
.tox/py3/lib/python3.7/site-packages/click/core.py:1259: in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
.tox/py3/lib/python3.7/site-packages/click/core.py:1066: in invoke
return ctx.invoke(self.callback, **ctx.params)
.tox/py3/lib/python3.7/site-packages/click/core.py:610: in invoke
return callback(*args, **kwargs)
.tox/py3/lib/python3.7/site-packages/click/decorators.py:21: in new_func
return f(get_current_context(), *args, **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
ctx = <click.core.Context object at 0x7f93d044ed30>, scheduler_url = None
origin_metadata_task_type = 'index-origin-metadata'
brokers = ('127.0.0.1:41057',), prefix = 'wiiwwxcfkf'
group_id = 'test-consumer', stop_after_objects = 1
@indexer_cli_group.command("journal-client")
@click.option("--scheduler-url", "-s", default=None, help="URL of the scheduler API")
@click.option(
"--origin-metadata-task-type",
default="index-origin-metadata",
help="Name of the task running the origin metadata indexer.",
)
@click.option(
"--broker", "brokers", type=str, multiple=True, help="Kafka broker to connect to."
)
@click.option(
"--prefix", type=str, default=None, help="Prefix of Kafka topic names to read from."
)
@click.option("--group-id", type=str, help="Consumer/group id for reading from Kafka.")
@click.option(
"--stop-after-objects",
"-m",
default=None,
type=int,
help="Maximum number of objects to replay. Default is to run forever.",
)
@click.pass_context
def journal_client(
ctx,
scheduler_url,
origin_metadata_task_type,
brokers,
prefix,
group_id,
stop_after_objects,
):
"""Listens for new objects from the SWH Journal, and schedules tasks
to run relevant indexers (currently, only origin-intrinsic-metadata)
on these new objects."""
import functools
from swh.indexer.journal_client import process_journal_objects
from swh.journal.client import get_journal_client
from swh.scheduler import get_scheduler
cfg = ctx.obj["config"]
journal_cfg = cfg.get("journal")
if not journal_cfg:
> raise ValueError("Configuration is missing the 'journal' key")
E ValueError: Configuration is missing the 'journal' key
.tox/py3/lib/python3.7/site-packages/swh/indexer/cli.py:256: ValueError
TEST RESULT
TEST RESULT
- Run At
- Nov 26 2020, 12:24 PM