diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -5,6 +5,9 @@
 # 3rd party libraries without stubs (yet)
 
+[mypy-confluent_kafka.*]
+ignore_missing_imports = True
+
 [mypy-elasticsearch.*]
 ignore_missing_imports = True
 
 
diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,4 +1,4 @@
 # Add here internal Software Heritage dependencies, one per line.
 swh.core[http]
-swh.journal >= 0.0.31
+swh.journal >= 0.1.0
 swh.model
diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1 +1,2 @@
 pytest
+confluent-kafka
diff --git a/swh/search/tests/conftest.py b/swh/search/tests/conftest.py
--- a/swh/search/tests/conftest.py
+++ b/swh/search/tests/conftest.py
@@ -1,8 +1,9 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import logging
 import socket
 import subprocess
 import time
@@ -10,6 +11,11 @@
 import elasticsearch
 import pytest
 
+from swh.search import get_search
+
+
+logger = logging.getLogger(__name__)
+
 
 def free_port():
     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -109,3 +115,16 @@
 @pytest.fixture(scope="class")
 def elasticsearch_host(elasticsearch_session):
     yield elasticsearch_session
+
+
+@pytest.fixture
+def swh_search(elasticsearch_host):
+    """Instantiate a search client, initialize the elasticsearch instance,
+    and return it
+
+    """
+    logger.debug("swh_search: elasticsearch_host: %s", elasticsearch_host)
+    search = get_search("elasticsearch", {"hosts": [elasticsearch_host],})
+    search.initialize()
+    yield search
+    search.deinitialize()
diff --git a/swh/search/tests/test_cli.py b/swh/search/tests/test_cli.py
--- a/swh/search/tests/test_cli.py
+++ b/swh/search/tests/test_cli.py
@@ -9,14 +9,12 @@
 
 import pytest
 
-from unittest.mock import patch, MagicMock
+from confluent_kafka import Producer
 from click.testing import CliRunner
 
 from swh.journal.serializers import value_to_kafka
-from swh.journal.tests.utils import MockedKafkaConsumer
 
 from swh.search.cli import cli
-from .test_elasticsearch import BaseElasticsearchTest
 
 
 CLI_CONFIG = """
@@ -27,21 +25,17 @@
     - '{elasticsearch_host}'
 """
 
-JOURNAL_OBJECTS_CONFIG = """
+JOURNAL_OBJECTS_CONFIG_TEMPLATE = """
 journal:
   brokers:
-    - 192.0.2.1
-  prefix: swh.journal.objects
-  group_id: test-consumer
+    - {broker}
+  prefix: {prefix}
+  group_id: {group_id}
 """
 
-
-class MockedKafkaConsumerWithTopics(MockedKafkaConsumer):
-    def list_topics(self, timeout=None):
-        return {
-            "swh.journal.objects.origin",
-            "swh.journal.objects.origin_visit",
-        }
+JOURNAL_OBJECTS_CONFIG = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
+    broker="192.0.2.1", prefix="swh.journal.objects", group_id="test-consumer"
+)
 
 
 def invoke(catch_exceptions, args, config="", *, elasticsearch_host):
@@ -58,103 +52,125 @@
     return result
 
 
-class CliTestCase(BaseElasticsearchTest):
-    def test__journal_client__origin(self):
-        """Tests the re-indexing when origin_batch_size*task_batch_size is a
-        divisor of nb_origins."""
-        topic = "swh.journal.objects.origin"
-        value = value_to_kafka({"url": "http://foobar.baz",})
-        message = MagicMock()
-        message.error.return_value = None
-        message.topic.return_value = topic
-        message.value.return_value = value
-
-        mock_consumer = MockedKafkaConsumerWithTopics([message])
-
-        with patch("swh.journal.client.Consumer", return_value=mock_consumer):
-            result = invoke(
-                False,
-                ["journal-client", "objects", "--stop-after-objects", "1",],
-                JOURNAL_OBJECTS_CONFIG,
-                elasticsearch_host=self._elasticsearch_host,
-            )
-
-        # Check the output
-        expected_output = "Processed 1 messages.\nDone.\n"
-        assert result.exit_code == 0, result.output
-        assert result.output == expected_output
-
-        self.search.flush()
-
-        results = self.search.origin_search(url_pattern="foobar")
-        assert results == {
-            "next_page_token": None,
-            "results": [{"url": "http://foobar.baz"}],
+def test__journal_client__origin(
+    swh_search, elasticsearch_host: str, kafka_prefix: str, kafka_server
+):
+    """Test that the journal client, run through the cli, indexes an origin
+    it consumed from the journal and makes it searchable."""
+    producer = Producer(
+        {
+            "bootstrap.servers": kafka_server,
+            "client.id": "test search origin producer",
+            "acks": "all",
         }
+    )
+    value = value_to_kafka({"url": "http://foobar.baz",})
+    topic = f"{kafka_prefix}.origin"
+    producer.produce(topic=topic, key=b"bogus-origin", value=value)
+
+    journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
+        broker=kafka_server, prefix=kafka_prefix, group_id="test-consumer"
+    )
+    result = invoke(
+        False,
+        ["journal-client", "objects", "--stop-after-objects", "1",],
+        journal_objects_config,
+        elasticsearch_host=elasticsearch_host,
+    )
+
+    # Check the output
+    expected_output = "Processed 1 messages.\nDone.\n"
+    assert result.exit_code == 0, result.output
+    assert result.output == expected_output
+
+    swh_search.flush()
+
+    # The origin has no visit yet, so searching for it with
+    # with_visit=True returns nothing
+    results = swh_search.origin_search(url_pattern="foobar", with_visit=True)
+    assert results == {"next_page_token": None, "results": []}
+
+    # Searching for the origin without requiring a visit, however,
+    # does find it
+    results = swh_search.origin_search(url_pattern="foobar", with_visit=False)
+    assert results == {
+        "next_page_token": None,
+        "results": [{"url": "http://foobar.baz"}],
+    }
+
+
+def test__journal_client__origin_visit(
+    swh_search, elasticsearch_host, kafka_prefix: str, kafka_server
+):
+    """Test that the journal client, run through the cli, indexes an origin
+    visit it consumed from the journal and makes its origin searchable."""
+    producer = Producer(
+        {
+            "bootstrap.servers": kafka_server,
+            "client.id": "test search origin visit producer",
+            "acks": "all",
+        }
+    )
+    topic = f"{kafka_prefix}.origin_visit"
+    value = value_to_kafka({"origin": "http://baz.foobar",})
+    producer.produce(topic=topic, key=b"bogus-origin-visit", value=value)
+
+    journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
+        broker=kafka_server, prefix=kafka_prefix, group_id="test-consumer"
+    )
+    result = invoke(
+        False,
+        ["journal-client", "objects", "--stop-after-objects", "1",],
+        journal_objects_config,
+        elasticsearch_host=elasticsearch_host,
+    )
+
+    # Check the output
+    expected_output = "Processed 1 messages.\nDone.\n"
+    assert result.exit_code == 0, result.output
+    assert result.output == expected_output
+
+    swh_search.flush()
+
+    expected_result = {
+        "next_page_token": None,
+        "results": [{"url": "http://baz.foobar"}],
+    }
+    # Both searches, with and without visit, find the origin
+    results = swh_search.origin_search(url_pattern="foobar", with_visit=False)
+    assert results == expected_result
+    results = swh_search.origin_search(url_pattern="foobar", with_visit=True)
+    assert results == expected_result
+
+
+def test__journal_client__missing_main_journal_config_key(elasticsearch_host):
+    """A missing top-level 'journal' configuration key should raise"""
+    with pytest.raises(KeyError, match="journal"):
+        invoke(
+            catch_exceptions=False,
+            args=["journal-client", "objects", "--stop-after-objects", "1",],
+            config="",  # missing config will make it raise
+            elasticsearch_host=elasticsearch_host,
+        )
-
-        results = self.search.origin_search(url_pattern="foobar", with_visit=True)
-        assert results == {"next_page_token": None, "results": []}
-
-    def test__journal_client__origin_visit(self):
-        """Tests the re-indexing when origin_batch_size*task_batch_size is a
-        divisor of nb_origins."""
-        topic = "swh.journal.objects.origin_visit"
-        value = value_to_kafka({"origin": "http://foobar.baz",})
-        message = MagicMock()
-        message.error.return_value = None
-        message.topic.return_value = topic
-        message.value.return_value = value
-
-        mock_consumer = MockedKafkaConsumerWithTopics([message])
-
-        with patch("swh.journal.client.Consumer", return_value=mock_consumer):
-            result = invoke(
-                False,
-                ["journal-client", "objects", "--stop-after-objects", "1",],
-                JOURNAL_OBJECTS_CONFIG,
-                elasticsearch_host=self._elasticsearch_host,
-            )
-
-        # Check the output
-        expected_output = "Processed 1 messages.\nDone.\n"
-        assert result.exit_code == 0, result.output
-        assert result.output == expected_output
 
-        self.search.flush()
+
+def test__journal_client__missing_journal_config_keys(elasticsearch_host):
+    """Missing mandatory keys in the journal configuration should raise"""
+    journal_config = yaml.safe_load(
+        JOURNAL_OBJECTS_CONFIG
+    )  # default configuration which is fine
 
-        results = self.search.origin_search(url_pattern="foobar", with_visit=True)
-        assert results == {
-            "next_page_token": None,
-            "results": [{"url": "http://foobar.baz"}],
-        }
+    for key in journal_config["journal"].keys():
+        if key == "prefix":  # optional
+            continue
+        cfg = copy.deepcopy(journal_config)
+        del cfg["journal"][key]  # make config incomplete
+        yaml_cfg = yaml.dump(cfg)
 
-    def test__journal_client__missing_main_journal_config_key(self):
-        """Missing configuration on journal should raise"""
-        with pytest.raises(KeyError, match="journal"):
+        with pytest.raises(TypeError, match=f"{key}"):
             invoke(
                 catch_exceptions=False,
                 args=["journal-client", "objects", "--stop-after-objects", "1",],
-                config="",  # missing config will make it raise
-                elasticsearch_host=self._elasticsearch_host,
+                config=yaml_cfg,  # incomplete config will make the cli raise
+                elasticsearch_host=elasticsearch_host,
            )
-
-    def test__journal_client__missing_journal_config_keys(self):
-        """Missing configuration on mandatory journal keys should raise"""
-        journal_config = yaml.safe_load(
-            JOURNAL_OBJECTS_CONFIG
-        )  # default configuration which is fine
-
-        for key in journal_config["journal"].keys():
-            if key == "prefix":  # optional
-                continue
-            cfg = copy.deepcopy(journal_config)
-            del cfg["journal"][key]  # make config incomplete
-            yaml_cfg = yaml.dump(cfg)
-
-            with pytest.raises(TypeError, match=f"{key}"):
-                invoke(
-                    catch_exceptions=False,
-                    args=["journal-client", "objects", "--stop-after-objects", "1",],
-                    config=yaml_cfg,  # incomplete config will make the cli raise
-                    elasticsearch_host=self._elasticsearch_host,
-                )
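
Note: the tests above run against a real Kafka broker. The kafka_server and
kafka_prefix fixtures are expected to be provided by swh.journal's pytest
machinery (the patch bumps swh.journal to >= 0.1.0), and a confluent_kafka
Producer feeds the topics that the journal client consumes. Below is a
minimal, self-contained sketch of that produce/consume round trip outside the
CLI; the broker address and topic name are illustrative assumptions, not
values taken from the patch.

    # Minimal sketch of the produce/consume round trip exercised by the tests.
    # Assumptions: a Kafka broker reachable at broker_addr, swh.journal installed.
    from confluent_kafka import Consumer, Producer

    from swh.journal.serializers import kafka_to_value, value_to_kafka

    broker_addr = "127.0.0.1:9092"  # illustrative; the tests use the kafka_server fixture
    topic = "swh.journal.objects.origin"  # "<prefix>.<object type>", cf. kafka_prefix

    # Produce one serialized origin message, as test__journal_client__origin does
    producer = Producer({"bootstrap.servers": broker_addr, "acks": "all"})
    producer.produce(
        topic=topic,
        key=b"bogus-origin",
        value=value_to_kafka({"url": "http://foobar.baz"}),
    )
    producer.flush()  # block until the broker has acknowledged the message

    # Consume it back, roughly what the journal client does before indexing
    consumer = Consumer(
        {
            "bootstrap.servers": broker_addr,
            "group.id": "test-consumer",
            "auto.offset.reset": "earliest",  # read the topic from the beginning
        }
    )
    consumer.subscribe([topic])
    msg = consumer.poll(timeout=10.0)
    assert msg is not None and msg.error() is None
    assert kafka_to_value(msg.value()) == {"url": "http://foobar.baz"}
    consumer.close()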