swh/search/tests/test_cli.py
 # Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import copy
 import tempfile

 import yaml
 import pytest

-from unittest.mock import patch, MagicMock
+from confluent_kafka import Producer
 from click.testing import CliRunner

 from swh.journal.serializers import value_to_kafka
-from swh.journal.tests.utils import MockedKafkaConsumer
 from swh.search.cli import cli

-from .test_elasticsearch import BaseElasticsearchTest
-
 CLI_CONFIG = """
 search:
     cls: elasticsearch
     args:
         hosts:
         - '{elasticsearch_host}'
 """

-JOURNAL_OBJECTS_CONFIG = """
+JOURNAL_OBJECTS_CONFIG_TEMPLATE = """
 journal:
     brokers:
-        - 192.0.2.1
-    prefix: swh.journal.objects
-    group_id: test-consumer
+        - {broker}
+    prefix: {prefix}
+    group_id: {group_id}
 """
+
+JOURNAL_OBJECTS_CONFIG = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
+    broker="192.0.2.1", prefix="swh.journal.objects", group_id="test-consumer"
+)
Inline comments (on the JOURNAL_OBJECTS_CONFIG definition):

douardda: why this hardcoded IP for the broker?
ardumont (author): Maybe because it was a mock before. This one is really used by the last tests in this test file…
vlorentz: because it needs to be set to something.
vlorentz: Plus, it's in the TEST-NET-1 range, so there's really no harm in it.
douardda: we are nitpicking, sure, but since this global var is used in one test only, I see no reason to keep it as a global var. It's at best confusing, even if harmless, as you stated below. Harmless for the computer, not for the human reading / maintaining the code, IMHO.
ardumont (author): ah yes, it's only used once now. It was used everywhere before. I missed it.
-
-class MockedKafkaConsumerWithTopics(MockedKafkaConsumer):
-    def list_topics(self, timeout=None):
-        return {
-            "swh.journal.objects.origin",
-            "swh.journal.objects.origin_visit",
-        }
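
To make douardda's last remark concrete: a possible follow-up (a sketch only, not part of this diff) would drop the formatted JOURNAL_OBJECTS_CONFIG global and build the config inside the single test that still uses it, reusing the names already in this file:

    def test__journal_client__missing_journal_config_keys(elasticsearch_host):
        """Missing configuration on mandatory journal keys should raise"""
        # Inline what is currently the module-level JOURNAL_OBJECTS_CONFIG global;
        # 192.0.2.1 sits in the TEST-NET-1 range (RFC 5737), so it is never reachable.
        journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
            broker="192.0.2.1", prefix="swh.journal.objects", group_id="test-consumer"
        )
        journal_config = yaml.safe_load(journal_objects_config)
        # ... the rest of the test is unchanged ...

That would leave only the template at module level, with every concrete config built next to the test that consumes it.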
 def invoke(catch_exceptions, args, config="", *, elasticsearch_host):
     runner = CliRunner()
     with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd:
         config_fd.write(
             (CLI_CONFIG + config).format(elasticsearch_host=elasticsearch_host)
         )
         config_fd.seek(0)
         result = runner.invoke(cli, ["-C" + config_fd.name] + args)
     if not catch_exceptions and result.exception:
         print(result.output)
         raise result.exception
     return result

-class CliTestCase(BaseElasticsearchTest):
-    def test__journal_client__origin(self):
-        """Tests the re-indexing when origin_batch_size*task_batch_size is a
-        divisor of nb_origins."""
-        topic = "swh.journal.objects.origin"
-        value = value_to_kafka({"url": "http://foobar.baz",})
-        message = MagicMock()
-        message.error.return_value = None
-        message.topic.return_value = topic
-        message.value.return_value = value
-
-        mock_consumer = MockedKafkaConsumerWithTopics([message])
-
-        with patch("swh.journal.client.Consumer", return_value=mock_consumer):
-            result = invoke(
-                False,
-                ["journal-client", "objects", "--stop-after-objects", "1",],
-                JOURNAL_OBJECTS_CONFIG,
-                elasticsearch_host=self._elasticsearch_host,
-            )
-
-        # Check the output
-        expected_output = "Processed 1 messages.\nDone.\n"
-        assert result.exit_code == 0, result.output
-        assert result.output == expected_output
-
-        self.search.flush()
-
-        results = self.search.origin_search(url_pattern="foobar")
-        assert results == {
-            "next_page_token": None,
-            "results": [{"url": "http://foobar.baz"}],
-        }
-
-        results = self.search.origin_search(url_pattern="foobar", with_visit=True)
-        assert results == {"next_page_token": None, "results": []}
+def test__journal_client__origin(
+    swh_search, elasticsearch_host: str, kafka_prefix: str, kafka_server
+):
+    """Tests the re-indexing when origin_batch_size*task_batch_size is a
+    divisor of nb_origins."""
+    producer = Producer(
+        {
+            "bootstrap.servers": kafka_server,
+            "client.id": "test search origin producer",
+            "acks": "all",
+        }
+    )
+    value = value_to_kafka({"url": "http://foobar.baz",})
+    topic = f"{kafka_prefix}.origin"
+    producer.produce(topic=topic, key=b"bogus-origin", value=value)
+
+    journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
+        broker=kafka_server, prefix=kafka_prefix, group_id="test-consumer"
+    )
+    result = invoke(
+        False,
+        ["journal-client", "objects", "--stop-after-objects", "1",],
+        journal_objects_config,
+        elasticsearch_host=elasticsearch_host,
+    )
+
+    # Check the output
+    expected_output = "Processed 1 messages.\nDone.\n"
+    assert result.exit_code == 0, result.output
+    assert result.output == expected_output
+
+    swh_search.flush()
+
+    # searching origin without visit as requirement
+    results = swh_search.origin_search(url_pattern="foobar")
+    # We find it
+    assert results == {
+        "next_page_token": None,
+        "results": [{"url": "http://foobar.baz"}],
+    }
+
+    # It's an origin with no visit, searching for it with visit
+    results = swh_search.origin_search(url_pattern="foobar", with_visit=True)
+    # returns nothing
+    assert results == {"next_page_token": None, "results": []}

-    def test__journal_client__origin_visit(self):
-        """Tests the re-indexing when origin_batch_size*task_batch_size is a
-        divisor of nb_origins."""
-        topic = "swh.journal.objects.origin_visit"
-        value = value_to_kafka({"origin": "http://foobar.baz",})
-        message = MagicMock()
-        message.error.return_value = None
-        message.topic.return_value = topic
-        message.value.return_value = value
-
-        mock_consumer = MockedKafkaConsumerWithTopics([message])
-
-        with patch("swh.journal.client.Consumer", return_value=mock_consumer):
-            result = invoke(
-                False,
-                ["journal-client", "objects", "--stop-after-objects", "1",],
-                JOURNAL_OBJECTS_CONFIG,
-                elasticsearch_host=self._elasticsearch_host,
-            )
-
-        # Check the output
-        expected_output = "Processed 1 messages.\nDone.\n"
-        assert result.exit_code == 0, result.output
-        assert result.output == expected_output
-
-        self.search.flush()
-
-        results = self.search.origin_search(url_pattern="foobar", with_visit=True)
-        assert results == {
-            "next_page_token": None,
-            "results": [{"url": "http://foobar.baz"}],
-        }
+def test__journal_client__origin_visit(
+    swh_search, elasticsearch_host, kafka_prefix: str, kafka_server
+):
+    """Tests the re-indexing when origin_batch_size*task_batch_size is a
+    divisor of nb_origins."""
+    producer = Producer(
+        {
+            "bootstrap.servers": kafka_server,
+            "client.id": "test search origin visit producer",
+            "acks": "all",
+        }
+    )
+    topic = f"{kafka_prefix}.origin_visit"
+    value = value_to_kafka({"origin": "http://baz.foobar",})
+    producer.produce(topic=topic, key=b"bogus-origin-visit", value=value)
+
+    journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
+        broker=kafka_server, prefix=kafka_prefix, group_id="test-consumer"
+    )
+    result = invoke(
+        False,
+        ["journal-client", "objects", "--stop-after-objects", "1",],
+        journal_objects_config,
+        elasticsearch_host=elasticsearch_host,
+    )
+
+    # Check the output
+    expected_output = "Processed 1 messages.\nDone.\n"
+    assert result.exit_code == 0, result.output
+    assert result.output == expected_output
+
+    swh_search.flush()
+
+    expected_result = {
+        "next_page_token": None,
+        "results": [{"url": "http://baz.foobar"}],
+    }
+    # Both searches return the visit
+    results = swh_search.origin_search(url_pattern="foobar", with_visit=False)
+    assert results == expected_result
+    results = swh_search.origin_search(url_pattern="foobar", with_visit=True)
+    assert results == expected_result

-    def test__journal_client__missing_main_journal_config_key(self):
-        """Missing configuration on journal should raise"""
-        with pytest.raises(KeyError, match="journal"):
-            invoke(
-                catch_exceptions=False,
-                args=["journal-client", "objects", "--stop-after-objects", "1",],
-                config="",  # missing config will make it raise
-                elasticsearch_host=self._elasticsearch_host,
-            )
+def test__journal_client__missing_main_journal_config_key(elasticsearch_host):
+    """Missing configuration on journal should raise"""
+    with pytest.raises(KeyError, match="journal"):
+        invoke(
+            catch_exceptions=False,
+            args=["journal-client", "objects", "--stop-after-objects", "1",],
+            config="",  # missing config will make it raise
+            elasticsearch_host=elasticsearch_host,
+        )

-    def test__journal_client__missing_journal_config_keys(self):
-        """Missing configuration on mandatory journal keys should raise"""
-        journal_config = yaml.safe_load(
-            JOURNAL_OBJECTS_CONFIG
-        )  # default configuration which is fine
-        for key in journal_config["journal"].keys():
-            if key == "prefix":  # optional
-                continue
-            cfg = copy.deepcopy(journal_config)
-            del cfg["journal"][key]  # make config incomplete
-            yaml_cfg = yaml.dump(cfg)
-            with pytest.raises(TypeError, match=f"{key}"):
-                invoke(
-                    catch_exceptions=False,
-                    args=["journal-client", "objects", "--stop-after-objects", "1",],
-                    config=yaml_cfg,  # incomplete config will make the cli raise
-                    elasticsearch_host=self._elasticsearch_host,
-                )
+def test__journal_client__missing_journal_config_keys(elasticsearch_host):
+    """Missing configuration on mandatory journal keys should raise"""
+    journal_config = yaml.safe_load(
+        JOURNAL_OBJECTS_CONFIG
+    )  # default configuration which is fine
+    for key in journal_config["journal"].keys():
+        if key == "prefix":  # optional
+            continue
+        cfg = copy.deepcopy(journal_config)
+        del cfg["journal"][key]  # make config incomplete
+        yaml_cfg = yaml.dump(cfg)
+        with pytest.raises(TypeError, match=f"{key}"):
+            invoke(
+                catch_exceptions=False,
+                args=["journal-client", "objects", "--stop-after-objects", "1",],
+                config=yaml_cfg,  # incomplete config will make the cli raise
+                elasticsearch_host=elasticsearch_host,
+            )
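
Note for reviewers replaying this locally: the rewritten tests lean on the swh_search, elasticsearch_host, kafka_prefix and kafka_server fixtures, presumably wired up in the suite's conftest / pytest plugins rather than in this file; something like

    pytest swh/search/tests/test_cli.py -k journal_client

should then run them against the spawned Kafka broker and Elasticsearch instance.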