diff --git a/mypy.ini b/mypy.ini index 51a26dc..5c756c5 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1,18 +1,21 @@ [mypy] namespace_packages = True warn_unused_ignores = True # 3rd party libraries without stubs (yet) +[mypy-confluent_kafka.*] +ignore_missing_imports = True + [mypy-elasticsearch.*] ignore_missing_imports = True [mypy-msgpack.*] ignore_missing_imports = True [mypy-pkg_resources.*] ignore_missing_imports = True [mypy-pytest.*] ignore_missing_imports = True diff --git a/requirements-swh.txt b/requirements-swh.txt index 9c86f71..3e0520d 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,4 +1,4 @@ # Add here internal Software Heritage dependencies, one per line. swh.core[http] -swh.journal >= 0.0.31 +swh.journal >= 0.1.0 swh.model diff --git a/requirements-test.txt b/requirements-test.txt index e079f8a..90fac90 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1 +1,2 @@ pytest +confluent-kafka diff --git a/swh/search/tests/conftest.py b/swh/search/tests/conftest.py index 9077aa0..1859fc2 100644 --- a/swh/search/tests/conftest.py +++ b/swh/search/tests/conftest.py @@ -1,111 +1,130 @@ -# Copyright (C) 2019 The Software Heritage developers +# Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import logging import socket import subprocess import time import elasticsearch import pytest +from swh.search import get_search + + +logger = logging.getLogger(__name__) + def free_port(): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.bind(("127.0.0.1", 0)) port = sock.getsockname()[1] sock.close() return port def wait_for_peer(addr, port): while True: try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect((addr, port)) except ConnectionRefusedError: time.sleep(0.1) else: sock.close() break CONFIG_TEMPLATE = 
""" node.name: node-1 path.data: {data} path.logs: {logs} network.host: 127.0.0.1 http.port: {http_port} transport.port: {transport_port} """ def _run_elasticsearch(conf_dir, data_dir, logs_dir, http_port, transport_port): es_home = "/usr/share/elasticsearch" with open(conf_dir + "/elasticsearch.yml", "w") as fd: fd.write( CONFIG_TEMPLATE.format( data=data_dir, logs=logs_dir, http_port=http_port, transport_port=transport_port, ) ) with open(conf_dir + "/log4j2.properties", "w") as fd: pass cmd = [ "/usr/share/elasticsearch/jdk/bin/java", "-Des.path.home={}".format(es_home), "-Des.path.conf={}".format(conf_dir), "-Des.bundled_jdk=true", "-Dlog4j2.disable.jmx=true", "-cp", "{}/lib/*".format(es_home), "org.elasticsearch.bootstrap.Elasticsearch", ] host = "127.0.0.1:{}".format(http_port) with open(logs_dir + "/output.txt", "w") as fd: p = subprocess.Popen(cmd) wait_for_peer("127.0.0.1", http_port) client = elasticsearch.Elasticsearch([host]) assert client.ping() return p @pytest.fixture(scope="session") def elasticsearch_session(tmpdir_factory): tmpdir = tmpdir_factory.mktemp("elasticsearch") es_conf = tmpdir.mkdir("conf") http_port = free_port() transport_port = free_port() p = _run_elasticsearch( conf_dir=str(es_conf), data_dir=str(tmpdir.mkdir("data")), logs_dir=str(tmpdir.mkdir("logs")), http_port=http_port, transport_port=transport_port, ) yield "127.0.0.1:{}".format(http_port) # Check ES didn't stop assert p.returncode is None, p.returncode p.kill() p.wait() @pytest.fixture(scope="class") def elasticsearch_host(elasticsearch_session): yield elasticsearch_session + + +@pytest.fixture +def swh_search(elasticsearch_host): + """Instantiate a search client, initialize the elasticsearch instance, + and returns it + + """ + logger.debug("swh_search: elasticsearch_host: %s", elasticsearch_host) + search = get_search("elasticsearch", {"hosts": [elasticsearch_host],}) + search.deinitialize() # To reset internal state from previous runs + search.initialize() # install 
required index + yield search diff --git a/swh/search/tests/test_cli.py b/swh/search/tests/test_cli.py index b3a409e..cfd8eb5 100644 --- a/swh/search/tests/test_cli.py +++ b/swh/search/tests/test_cli.py @@ -1,160 +1,173 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy import tempfile import yaml import pytest -from unittest.mock import patch, MagicMock +from confluent_kafka import Producer from click.testing import CliRunner from swh.journal.serializers import value_to_kafka -from swh.journal.tests.utils import MockedKafkaConsumer from swh.search.cli import cli -from .test_elasticsearch import BaseElasticsearchTest CLI_CONFIG = """ search: cls: elasticsearch args: hosts: - '{elasticsearch_host}' """ -JOURNAL_OBJECTS_CONFIG = """ +JOURNAL_OBJECTS_CONFIG_TEMPLATE = """ journal: brokers: - - 192.0.2.1 - prefix: swh.journal.objects - group_id: test-consumer + - {broker} + prefix: {prefix} + group_id: {group_id} """ -class MockedKafkaConsumerWithTopics(MockedKafkaConsumer): - def list_topics(self, timeout=None): - return { - "swh.journal.objects.origin", - "swh.journal.objects.origin_visit", - } - - def invoke(catch_exceptions, args, config="", *, elasticsearch_host): runner = CliRunner() with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd: config_fd.write( (CLI_CONFIG + config).format(elasticsearch_host=elasticsearch_host) ) config_fd.seek(0) result = runner.invoke(cli, ["-C" + config_fd.name] + args) if not catch_exceptions and result.exception: print(result.output) raise result.exception return result -class CliTestCase(BaseElasticsearchTest): - def test__journal_client__origin(self): - """Tests the re-indexing when origin_batch_size*task_batch_size is a - divisor of nb_origins.""" - topic = "swh.journal.objects.origin" - value = 
value_to_kafka({"url": "http://foobar.baz",}) - message = MagicMock() - message.error.return_value = None - message.topic.return_value = topic - message.value.return_value = value - - mock_consumer = MockedKafkaConsumerWithTopics([message]) - - with patch("swh.journal.client.Consumer", return_value=mock_consumer): - result = invoke( - False, - ["journal-client", "objects", "--stop-after-objects", "1",], - JOURNAL_OBJECTS_CONFIG, - elasticsearch_host=self._elasticsearch_host, - ) - - # Check the output - expected_output = "Processed 1 messages.\nDone.\n" - assert result.exit_code == 0, result.output - assert result.output == expected_output - - self.search.flush() - - results = self.search.origin_search(url_pattern="foobar") - assert results == { - "next_page_token": None, - "results": [{"url": "http://foobar.baz"}], +def test__journal_client__origin( + swh_search, elasticsearch_host: str, kafka_prefix: str, kafka_server +): + """Tests that an origin message produced to the journal is indexed by the + journal client and can then be searched.""" + producer = Producer( + { + "bootstrap.servers": kafka_server, + "client.id": "test search origin producer", + "acks": "all", } + ) + value = value_to_kafka({"url": "http://foobar.baz",}) + topic = f"{kafka_prefix}.origin" + producer.produce(topic=topic, key=b"bogus-origin", value=value) + + journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format( + broker=kafka_server, prefix=kafka_prefix, group_id="test-consumer" + ) + result = invoke( + False, + ["journal-client", "objects", "--stop-after-objects", "1",], + journal_objects_config, + elasticsearch_host=elasticsearch_host, + ) + + # Check the output + expected_output = "Processed 1 messages.\nDone.\n" + assert result.exit_code == 0, result.output + assert result.output == expected_output + + swh_search.flush() + + # searching origin without visit as requirement + results = swh_search.origin_search(url_pattern="foobar") + # We find it + assert results == { + "next_page_token": None, + 
"results": [{"url": "http://foobar.baz"}], + } + + # It's an origin with no visit, searching for it with visit + results = swh_search.origin_search(url_pattern="foobar", with_visit=True) + # returns nothing + assert results == {"next_page_token": None, "results": []} + + +def test__journal_client__origin_visit( + swh_search, elasticsearch_host, kafka_prefix: str, kafka_server +): + """Tests that an origin_visit message produced to the journal makes its + origin searchable, with or without the visit requirement.""" + producer = Producer( + { + "bootstrap.servers": kafka_server, + "client.id": "test search origin visit producer", + "acks": "all", + } + ) + topic = f"{kafka_prefix}.origin_visit" + value = value_to_kafka({"origin": "http://baz.foobar",}) + producer.produce(topic=topic, key=b"bogus-origin-visit", value=value) + + journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format( + broker=kafka_server, prefix=kafka_prefix, group_id="test-consumer" + ) + result = invoke( + False, + ["journal-client", "objects", "--stop-after-objects", "1",], + journal_objects_config, + elasticsearch_host=elasticsearch_host, + ) + + # Check the output + expected_output = "Processed 1 messages.\nDone.\n" + assert result.exit_code == 0, result.output + assert result.output == expected_output + + swh_search.flush() + + expected_result = { + "next_page_token": None, + "results": [{"url": "http://baz.foobar"}], + } + # Both searches return the origin + results = swh_search.origin_search(url_pattern="foobar", with_visit=False) + assert results == expected_result + results = swh_search.origin_search(url_pattern="foobar", with_visit=True) + assert results == expected_result + + +def test__journal_client__missing_main_journal_config_key(elasticsearch_host): + """Missing configuration on journal should raise""" + with pytest.raises(KeyError, match="journal"): + invoke( + catch_exceptions=False, + args=["journal-client", "objects", "--stop-after-objects", "1",], + config="", # missing config will make it raise + 
elasticsearch_host=elasticsearch_host, + ) - results = self.search.origin_search(url_pattern="foobar", with_visit=True) - assert results == {"next_page_token": None, "results": []} - - def test__journal_client__origin_visit(self): - """Tests the re-indexing when origin_batch_size*task_batch_size is a - divisor of nb_origins.""" - topic = "swh.journal.objects.origin_visit" - value = value_to_kafka({"origin": "http://foobar.baz",}) - message = MagicMock() - message.error.return_value = None - message.topic.return_value = topic - message.value.return_value = value - - mock_consumer = MockedKafkaConsumerWithTopics([message]) - - with patch("swh.journal.client.Consumer", return_value=mock_consumer): - result = invoke( - False, - ["journal-client", "objects", "--stop-after-objects", "1",], - JOURNAL_OBJECTS_CONFIG, - elasticsearch_host=self._elasticsearch_host, - ) - - # Check the output - expected_output = "Processed 1 messages.\nDone.\n" - assert result.exit_code == 0, result.output - assert result.output == expected_output - self.search.flush() +def test__journal_client__missing_journal_config_keys(elasticsearch_host): + """Missing configuration on mandatory journal keys should raise""" + journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format( + broker="192.0.2.1", prefix="swh.journal.objects", group_id="test-consumer" + ) + journal_config = yaml.safe_load(journal_objects_config) - results = self.search.origin_search(url_pattern="foobar", with_visit=True) - assert results == { - "next_page_token": None, - "results": [{"url": "http://foobar.baz"}], - } + for key in journal_config["journal"].keys(): + if key == "prefix": # optional + continue + cfg = copy.deepcopy(journal_config) + del cfg["journal"][key] # make config incomplete + yaml_cfg = yaml.dump(cfg) - def test__journal_client__missing_main_journal_config_key(self): - """Missing configuration on journal should raise""" - with pytest.raises(KeyError, match="journal"): + with pytest.raises(TypeError, 
match=f"{key}"): invoke( catch_exceptions=False, args=["journal-client", "objects", "--stop-after-objects", "1",], - config="", # missing config will make it raise - elasticsearch_host=self._elasticsearch_host, + config=yaml_cfg, # incomplete config will make the cli raise + elasticsearch_host=elasticsearch_host, ) - - def test__journal_client__missing_journal_config_keys(self): - """Missing configuration on mandatory journal keys should raise""" - journal_config = yaml.safe_load( - JOURNAL_OBJECTS_CONFIG - ) # default configuration which is fine - - for key in journal_config["journal"].keys(): - if key == "prefix": # optional - continue - cfg = copy.deepcopy(journal_config) - del cfg["journal"][key] # make config incomplete - yaml_cfg = yaml.dump(cfg) - - with pytest.raises(TypeError, match=f"{key}"): - invoke( - catch_exceptions=False, - args=["journal-client", "objects", "--stop-after-objects", "1",], - config=yaml_cfg, # incomplete config will make the cli raise - elasticsearch_host=self._elasticsearch_host, - ) diff --git a/swh/search/tests/test_elasticsearch.py b/swh/search/tests/test_elasticsearch.py index c5c185b..7fc24ed 100644 --- a/swh/search/tests/test_elasticsearch.py +++ b/swh/search/tests/test_elasticsearch.py @@ -1,29 +1,25 @@ -# Copyright (C) 2019 The Software Heritage developers +# Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest import pytest -from swh.search import get_search from .test_search import CommonSearchTest class BaseElasticsearchTest(unittest.TestCase): @pytest.fixture(autouse=True) - def _instantiate_search(self, elasticsearch_host): + def _instantiate_search(self, swh_search, elasticsearch_host): self._elasticsearch_host = elasticsearch_host - self.search = get_search("elasticsearch", {"hosts": [elasticsearch_host],}) - - 
def setUp(self): - self.reset() + self.search = swh_search def reset(self): self.search.deinitialize() self.search.initialize() class TestElasticsearchSearch(CommonSearchTest, BaseElasticsearchTest): pass