cli_runner = <click.testing.CliRunner object at 0x7f969956b2e8>
swh_config = '/tmp/pytest-of-jenkins/pytest-0/test_cli_journal_client0/indexer.yml'
indexer_scheduler = <swh.scheduler.backend.SchedulerBackend object at 0x7f96994e85c0>
kafka_prefix = 'ynxlcjjpeo', kafka_server = '127.0.0.1:48669'
consumer = <cimpl.Consumer object at 0x7f9699502c80>
def test_cli_journal_client(
cli_runner,
swh_config,
indexer_scheduler,
kafka_prefix: str,
kafka_server,
consumer: Consumer,
):
"""Test the 'swh indexer journal-client' cli tool."""
producer = Producer(
{
"bootstrap.servers": kafka_server,
"client.id": "test producer",
"acks": "all",
}
)
visit_statuses = [
OriginVisitStatus(
origin="file:///dev/zero",
visit=1,
date=now(),
status="full",
snapshot=None,
),
OriginVisitStatus(
origin="file:///dev/foobar",
visit=2,
date=now(),
status="full",
snapshot=None,
),
OriginVisitStatus(
origin="file:///tmp/spamegg",
visit=3,
date=now(),
status="full",
snapshot=None,
),
OriginVisitStatus( # will be filtered out due to its 'partial' status
origin="file:///dev/0000",
visit=4,
date=now(),
status="partial",
snapshot=None,
),
OriginVisitStatus( # will be filtered out due to its 'ongoing' status
origin="file:///dev/0001",
visit=5,
date=now(),
status="ongoing",
snapshot=None,
),
OriginVisitStatus(
origin="file:///dev/0002",
visit=6,
date=now(),
status="full",
snapshot=None,
),
]
topic = f"{kafka_prefix}.origin_visit_status"
for visit_status in visit_statuses:
visit_status_d = visit_status.to_dict()
visit_status_d.pop("date") # to avoid serialization issue about datetime
key = key_to_kafka(visit_status_d)
value = value_to_kafka(visit_status.to_dict())
print(f"key: {key!r}, value: {value!r}")
result = producer.produce(topic=topic, key=key, value=value)
print(f"result: {result}")
producer.flush()
result = cli_runner.invoke(
indexer_cli_group,
[
"-C",
swh_config,
"journal-client",
"--broker",
kafka_server,
"--prefix",
kafka_prefix,
"--group-id",
"test-consumer",
"--stop-after-objects",
3,
"--origin-metadata-task-type",
"index-origin-metadata",
],
catch_exceptions=False,
)
# Check the output
expected_output = "Done.\n"
assert result.exit_code == 0, result.output
assert result.output == expected_output
# Check scheduled tasks
tasks = indexer_scheduler.search_tasks(task_type="index-origin-metadata")
# more than 3 visit statuses exists
assert len([vs for vs in visit_statuses if vs.status == "full"]) > 3
# so we should have 3 visit status scheduled (we stopped at stop-after-objects to 3)
> assert len(tasks) == 3
E assert 1 == 3
E +1
E -3
.tox/py3/lib/python3.7/site-packages/swh/indexer/tests/test_cli.py:480: AssertionError
TEST RESULT
TEST RESULT
- Run At
- Dec 2 2020, 9:27 AM