cli_runner = <click.testing.CliRunner object at 0x7f8cf2e52b70>
swh_config = '/tmp/pytest-of-jenkins/pytest-0/test_cli_journal_client_schedu0/indexer.yml'
indexer_scheduler = <swh.scheduler.backend.SchedulerBackend object at 0x7f8cf2f36400>
kafka_prefix = 'nfkdjvesnd', kafka_server = '127.0.0.1:36401'
consumer = <cimpl.Consumer object at 0x7f8cf2e5e408>
def test_cli_journal_client_schedule(
cli_runner,
swh_config,
indexer_scheduler,
kafka_prefix: str,
kafka_server,
consumer: Consumer,
):
"""Test the 'swh indexer journal-client' cli tool."""
journal_writer = get_journal_writer(
"kafka",
brokers=[kafka_server],
prefix=kafka_prefix,
client_id="test producer",
value_sanitizer=lambda object_type, value: value,
flush_timeout=3, # fail early if something is going wrong
)
visit_statuses = [
OriginVisitStatus(
origin="file:///dev/zero",
visit=1,
date=now(),
status="full",
snapshot=None,
),
OriginVisitStatus(
origin="file:///dev/foobar",
visit=2,
date=now(),
status="full",
snapshot=None,
),
OriginVisitStatus(
origin="file:///tmp/spamegg",
visit=3,
date=now(),
status="full",
snapshot=None,
),
OriginVisitStatus(
origin="file:///dev/0002",
visit=6,
date=now(),
status="full",
snapshot=None,
),
OriginVisitStatus( # will be filtered out due to its 'partial' status
origin="file:///dev/0000",
visit=4,
date=now(),
status="partial",
snapshot=None,
),
OriginVisitStatus( # will be filtered out due to its 'ongoing' status
origin="file:///dev/0001",
visit=5,
date=now(),
status="ongoing",
snapshot=None,
),
]
journal_writer.write_additions("origin_visit_status", visit_statuses)
visit_statuses_full = [vs for vs in visit_statuses if vs.status == "full"]
result = cli_runner.invoke(
indexer_cli_group,
[
"-C",
swh_config,
"journal-client",
"--broker",
kafka_server,
"--prefix",
kafka_prefix,
"--group-id",
"test-consumer",
"--stop-after-objects",
len(visit_statuses),
"--origin-metadata-task-type",
"index-origin-metadata",
],
> catch_exceptions=False,
)
.tox/py3/lib/python3.7/site-packages/swh/indexer/tests/test_cli.py:491:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.tox/py3/lib/python3.7/site-packages/click/testing.py:408: in invoke
return_value = cli.main(args=args or (), prog_name=prog_name, **extra)
.tox/py3/lib/python3.7/site-packages/click/core.py:1055: in main
rv = self.invoke(ctx)
.tox/py3/lib/python3.7/site-packages/click/core.py:1657: in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
.tox/py3/lib/python3.7/site-packages/click/core.py:1404: in invoke
return ctx.invoke(self.callback, **ctx.params)
.tox/py3/lib/python3.7/site-packages/click/core.py:760: in invoke
return __callback(*args, **kwargs)
.tox/py3/lib/python3.7/site-packages/click/decorators.py:26: in new_func
return f(get_current_context(), *args, **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
ctx = <click.core.Context object at 0x7f8cf2dda358>, indexer = None
scheduler_url = None, origin_metadata_task_type = 'index-origin-metadata'
brokers = ('127.0.0.1:36401',), prefix = 'nfkdjvesnd'
group_id = 'test-consumer', stop_after_objects = 6
@indexer_cli_group.command("journal-client")
@click.argument(
"indexer",
type=click.Choice(["origin-intrinsic-metadata", "*"]),
required=False
# TODO: remove required=False after we stop using it
)
@click.option("--scheduler-url", "-s", default=None, help="URL of the scheduler API")
@click.option(
"--origin-metadata-task-type",
default="index-origin-metadata",
help="Name of the task running the origin metadata indexer.",
)
@click.option(
"--broker", "brokers", type=str, multiple=True, help="Kafka broker to connect to."
)
@click.option(
"--prefix", type=str, default=None, help="Prefix of Kafka topic names to read from."
)
@click.option("--group-id", type=str, help="Consumer/group id for reading from Kafka.")
@click.option(
"--stop-after-objects",
"-m",
default=None,
type=int,
help="Maximum number of objects to replay. Default is to run forever.",
)
@click.pass_context
def journal_client(
ctx,
indexer: Optional[str],
scheduler_url: str,
origin_metadata_task_type: str,
brokers: List[str],
prefix: str,
group_id: str,
stop_after_objects: Optional[int],
):
"""
Listens for new objects from the SWH Journal, and either:
* runs the indexer with the name passed as argument, if any
* schedules tasks to run relevant indexers (currently, only
origin-intrinsic-metadata) on these new objects otherwise.
Passing '*' as indexer name runs all indexers.
"""
import functools
import warnings
from swh.indexer.indexer import ObjectsDict
from swh.indexer.journal_client import process_journal_objects
from swh.journal.client import get_journal_client
from swh.scheduler import get_scheduler
cfg = ctx.obj["config"]
journal_cfg = cfg.get("journal", {})
scheduler = _get_api(get_scheduler, cfg, "scheduler", scheduler_url)
brokers = brokers or journal_cfg.get("brokers")
if not brokers:
raise ValueError("The brokers configuration is mandatory.")
prefix = prefix or journal_cfg.get("prefix")
group_id = group_id or journal_cfg.get("group_id")
origin_metadata_task_type = origin_metadata_task_type or journal_cfg.get(
"origin_metadata_task_type"
)
stop_after_objects = stop_after_objects or journal_cfg.get("stop_after_objects")
object_types = set()
worker_fns: List[Callable[[ObjectsDict], Dict]] = []
if indexer is None:
warnings.warn(
"'swh indexer journal-client' with no argument creates scheduler tasks "
"to index, rather than index directly.",
DeprecationWarning,
)
object_types.add("origin_visit_status")
worker_fns.append(
functools.partial(
process_journal_objects,
scheduler=scheduler,
task_names={
"origin_metadata": origin_metadata_task_type,
},
)
)
from swh.indexer.indexer import BaseIndexer
idx: BaseIndexer
if indexer in ("origin-intrinsic-metadata", "*"):
from swh.indexer.metadata import OriginMetadataIndexer
object_types.add("origin_visit_status")
idx = OriginMetadataIndexer()
if indexer in ("content-mimetype", "*"):
from swh.indexer.mimetype import MimetypeIndexer
object_types.add("content")
idx = MimetypeIndexer()
if indexer in ("content-fossology-license", "*"):
from swh.indexer.fossology_license import FossologyLicenseIndexer
object_types.add("content")
idx = FossologyLicenseIndexer()
> if idx:
E UnboundLocalError: local variable 'idx' referenced before assignment
.tox/py3/lib/python3.7/site-packages/swh/indexer/cli.py:327: UnboundLocalError
TEST RESULT
- Run At
- Jul 20 2022, 7:23 PM