Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/tests/test_cli.py
# Copyright (C) 2019-2020 The Software Heritage developers | # Copyright (C) 2019-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from functools import reduce | from functools import reduce | ||||
import re | import re | ||||
import tempfile | import tempfile | ||||
from unittest.mock import patch | from unittest.mock import patch | ||||
from click.testing import CliRunner | from click.testing import CliRunner | ||||
from confluent_kafka import Consumer, Producer | |||||
from swh.journal.tests.utils import FakeKafkaMessage, MockedKafkaConsumer | from swh.journal.serializers import value_to_kafka | ||||
from swh.model.hashutil import hash_to_bytes | from swh.model.hashutil import hash_to_bytes | ||||
from swh.indexer.cli import cli | from swh.indexer.cli import cli | ||||
CLI_CONFIG = """ | CLI_CONFIG = """ | ||||
scheduler: | scheduler: | ||||
cls: foo | cls: foo | ||||
▲ Show 20 Lines • Show All 308 Lines • ▼ Show 20 Lines | ): | ||||
assert result.output == expected_output | assert result.output == expected_output | ||||
# Check scheduled tasks | # Check scheduled tasks | ||||
tasks = indexer_scheduler.search_tasks() | tasks = indexer_scheduler.search_tasks() | ||||
assert len(tasks) == 6 | assert len(tasks) == 6 | ||||
_assert_tasks_for_origins(tasks, [x * 2 for x in range(55)]) | _assert_tasks_for_origins(tasks, [x * 2 for x in range(55)]) | ||||
def test_journal_client(storage, indexer_scheduler): | def test_journal_client( | ||||
storage, indexer_scheduler, kafka_prefix: str, kafka_server, consumer: Consumer | |||||
): | |||||
"""Test the 'swh indexer journal-client' cli tool.""" | """Test the 'swh indexer journal-client' cli tool.""" | ||||
message = FakeKafkaMessage( | producer = Producer( | ||||
"swh.journal.objects.origin_visit", | { | ||||
"bogus", | "bootstrap.servers": kafka_server, | ||||
{"status": "full", "origin": {"url": "file://dev/0000",}}, | "client.id": "test producer", | ||||
"acks": "all", | |||||
} | |||||
) | ) | ||||
consumer = MockedKafkaConsumer([message]) | STATUS = {"status": "full", "origin": {"url": "file://dev/0000",}} | ||||
producer.produce( | |||||
topic=kafka_prefix + ".origin_visit", | |||||
key=b"bogus", | |||||
value=value_to_kafka(STATUS), | |||||
) | |||||
with patch("swh.journal.client.Consumer", return_value=consumer): | |||||
result = invoke( | result = invoke( | ||||
indexer_scheduler, | indexer_scheduler, | ||||
False, | False, | ||||
[ | [ | ||||
"journal-client", | "journal-client", | ||||
"--stop-after-objects", | "--stop-after-objects", | ||||
"1", | "1", | ||||
"--broker", | "--broker", | ||||
"192.0.2.1", | kafka_server, | ||||
"--prefix", | "--prefix", | ||||
"swh.journal.objects", | kafka_prefix, | ||||
"--group-id", | "--group-id", | ||||
"test-consumer", | "test-consumer", | ||||
], | ], | ||||
) | ) | ||||
# Check the output | # Check the output | ||||
expected_output = "Done.\n" | expected_output = "Done.\n" | ||||
assert result.exit_code == 0, result.output | assert result.exit_code == 0, result.output | ||||
assert result.output == expected_output | assert result.output == expected_output | ||||
# Check scheduled tasks | # Check scheduled tasks | ||||
tasks = indexer_scheduler.search_tasks() | tasks = indexer_scheduler.search_tasks() | ||||
assert len(tasks) == 1 | assert len(tasks) == 1 | ||||
_assert_tasks_for_origins(tasks, [0]) | _assert_tasks_for_origins(tasks, [0]) |