Page MenuHomeSoftware Heritage

D3151.diff
No OneTemporary

D3151.diff

diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -5,6 +5,9 @@
# 3rd party libraries without stubs (yet)
+[mypy-confluent_kafka.*]
+ignore_missing_imports = True
+
[mypy-elasticsearch.*]
ignore_missing_imports = True
diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,4 +1,4 @@
# Add here internal Software Heritage dependencies, one per line.
swh.core[http]
-swh.journal >= 0.0.31
+swh.journal >= 0.1.0
swh.model
diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1 +1,2 @@
pytest
+confluent-kafka
diff --git a/swh/search/tests/conftest.py b/swh/search/tests/conftest.py
--- a/swh/search/tests/conftest.py
+++ b/swh/search/tests/conftest.py
@@ -1,8 +1,9 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import logging
import socket
import subprocess
import time
@@ -10,6 +11,11 @@
import elasticsearch
import pytest
+from swh.search import get_search
+
+
+logger = logging.getLogger(__name__)
+
def free_port():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -109,3 +115,16 @@
@pytest.fixture(scope="class")
def elasticsearch_host(elasticsearch_session):
yield elasticsearch_session
+
+
+@pytest.fixture
+def swh_search(elasticsearch_host):
+ """Instantiate a search client, initialize the elasticsearch instance,
+    and return it
+
+ """
+ logger.debug("swh_search: elasticsearch_host: %s", elasticsearch_host)
+ search = get_search("elasticsearch", {"hosts": [elasticsearch_host],})
+ search.deinitialize() # To reset internal state from previous runs
+ search.initialize() # install required index
+ yield search
diff --git a/swh/search/tests/test_cli.py b/swh/search/tests/test_cli.py
--- a/swh/search/tests/test_cli.py
+++ b/swh/search/tests/test_cli.py
@@ -9,14 +9,12 @@
import pytest
-from unittest.mock import patch, MagicMock
+from confluent_kafka import Producer
from click.testing import CliRunner
from swh.journal.serializers import value_to_kafka
-from swh.journal.tests.utils import MockedKafkaConsumer
from swh.search.cli import cli
-from .test_elasticsearch import BaseElasticsearchTest
CLI_CONFIG = """
@@ -27,23 +25,15 @@
- '{elasticsearch_host}'
"""
-JOURNAL_OBJECTS_CONFIG = """
+JOURNAL_OBJECTS_CONFIG_TEMPLATE = """
journal:
brokers:
- - 192.0.2.1
- prefix: swh.journal.objects
- group_id: test-consumer
+ - {broker}
+ prefix: {prefix}
+ group_id: {group_id}
"""
-class MockedKafkaConsumerWithTopics(MockedKafkaConsumer):
- def list_topics(self, timeout=None):
- return {
- "swh.journal.objects.origin",
- "swh.journal.objects.origin_visit",
- }
-
-
def invoke(catch_exceptions, args, config="", *, elasticsearch_host):
runner = CliRunner()
with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd:
@@ -58,103 +48,126 @@
return result
-class CliTestCase(BaseElasticsearchTest):
- def test__journal_client__origin(self):
- """Tests the re-indexing when origin_batch_size*task_batch_size is a
- divisor of nb_origins."""
- topic = "swh.journal.objects.origin"
- value = value_to_kafka({"url": "http://foobar.baz",})
- message = MagicMock()
- message.error.return_value = None
- message.topic.return_value = topic
- message.value.return_value = value
-
- mock_consumer = MockedKafkaConsumerWithTopics([message])
-
- with patch("swh.journal.client.Consumer", return_value=mock_consumer):
- result = invoke(
- False,
- ["journal-client", "objects", "--stop-after-objects", "1",],
- JOURNAL_OBJECTS_CONFIG,
- elasticsearch_host=self._elasticsearch_host,
- )
-
- # Check the output
- expected_output = "Processed 1 messages.\nDone.\n"
- assert result.exit_code == 0, result.output
- assert result.output == expected_output
-
- self.search.flush()
-
- results = self.search.origin_search(url_pattern="foobar")
- assert results == {
- "next_page_token": None,
- "results": [{"url": "http://foobar.baz"}],
+def test__journal_client__origin(
+ swh_search, elasticsearch_host: str, kafka_prefix: str, kafka_server
+):
+ """Tests the re-indexing when origin_batch_size*task_batch_size is a
+ divisor of nb_origins."""
+ producer = Producer(
+ {
+ "bootstrap.servers": kafka_server,
+ "client.id": "test search origin producer",
+ "acks": "all",
}
+ )
+ value = value_to_kafka({"url": "http://foobar.baz",})
+ topic = f"{kafka_prefix}.origin"
+ producer.produce(topic=topic, key=b"bogus-origin", value=value)
+
+ journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
+ broker=kafka_server, prefix=kafka_prefix, group_id="test-consumer"
+ )
+ result = invoke(
+ False,
+ ["journal-client", "objects", "--stop-after-objects", "1",],
+ journal_objects_config,
+ elasticsearch_host=elasticsearch_host,
+ )
+
+ # Check the output
+ expected_output = "Processed 1 messages.\nDone.\n"
+ assert result.exit_code == 0, result.output
+ assert result.output == expected_output
+
+ swh_search.flush()
+
+ # searching origin without visit as requirement
+ results = swh_search.origin_search(url_pattern="foobar")
+ # We find it
+ assert results == {
+ "next_page_token": None,
+ "results": [{"url": "http://foobar.baz"}],
+ }
+
+ # It's an origin with no visit, searching for it with visit
+ results = swh_search.origin_search(url_pattern="foobar", with_visit=True)
+ # returns nothing
+ assert results == {"next_page_token": None, "results": []}
+
+
+def test__journal_client__origin_visit(
+ swh_search, elasticsearch_host, kafka_prefix: str, kafka_server
+):
+ """Tests the re-indexing when origin_batch_size*task_batch_size is a
+ divisor of nb_origins."""
+ producer = Producer(
+ {
+ "bootstrap.servers": kafka_server,
+ "client.id": "test search origin visit producer",
+ "acks": "all",
+ }
+ )
+ topic = f"{kafka_prefix}.origin_visit"
+ value = value_to_kafka({"origin": "http://baz.foobar",})
+ producer.produce(topic=topic, key=b"bogus-origin-visit", value=value)
+
+ journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
+ broker=kafka_server, prefix=kafka_prefix, group_id="test-consumer"
+ )
+ result = invoke(
+ False,
+ ["journal-client", "objects", "--stop-after-objects", "1",],
+ journal_objects_config,
+ elasticsearch_host=elasticsearch_host,
+ )
+
+ # Check the output
+ expected_output = "Processed 1 messages.\nDone.\n"
+ assert result.exit_code == 0, result.output
+ assert result.output == expected_output
+
+ swh_search.flush()
+
+ expected_result = {
+ "next_page_token": None,
+ "results": [{"url": "http://baz.foobar"}],
+ }
+ # Both search returns the visit
+ results = swh_search.origin_search(url_pattern="foobar", with_visit=False)
+ assert results == expected_result
+ results = swh_search.origin_search(url_pattern="foobar", with_visit=True)
+ assert results == expected_result
+
+
+def test__journal_client__missing_main_journal_config_key(elasticsearch_host):
+ """Missing configuration on journal should raise"""
+ with pytest.raises(KeyError, match="journal"):
+ invoke(
+ catch_exceptions=False,
+ args=["journal-client", "objects", "--stop-after-objects", "1",],
+ config="", # missing config will make it raise
+ elasticsearch_host=elasticsearch_host,
+ )
- results = self.search.origin_search(url_pattern="foobar", with_visit=True)
- assert results == {"next_page_token": None, "results": []}
-
- def test__journal_client__origin_visit(self):
- """Tests the re-indexing when origin_batch_size*task_batch_size is a
- divisor of nb_origins."""
- topic = "swh.journal.objects.origin_visit"
- value = value_to_kafka({"origin": "http://foobar.baz",})
- message = MagicMock()
- message.error.return_value = None
- message.topic.return_value = topic
- message.value.return_value = value
-
- mock_consumer = MockedKafkaConsumerWithTopics([message])
-
- with patch("swh.journal.client.Consumer", return_value=mock_consumer):
- result = invoke(
- False,
- ["journal-client", "objects", "--stop-after-objects", "1",],
- JOURNAL_OBJECTS_CONFIG,
- elasticsearch_host=self._elasticsearch_host,
- )
-
- # Check the output
- expected_output = "Processed 1 messages.\nDone.\n"
- assert result.exit_code == 0, result.output
- assert result.output == expected_output
- self.search.flush()
+def test__journal_client__missing_journal_config_keys(elasticsearch_host):
+ """Missing configuration on mandatory journal keys should raise"""
+ journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
+ broker="192.0.2.1", prefix="swh.journal.objects", group_id="test-consumer"
+ )
+ journal_config = yaml.safe_load(journal_objects_config)
- results = self.search.origin_search(url_pattern="foobar", with_visit=True)
- assert results == {
- "next_page_token": None,
- "results": [{"url": "http://foobar.baz"}],
- }
+ for key in journal_config["journal"].keys():
+ if key == "prefix": # optional
+ continue
+ cfg = copy.deepcopy(journal_config)
+ del cfg["journal"][key] # make config incomplete
+ yaml_cfg = yaml.dump(cfg)
- def test__journal_client__missing_main_journal_config_key(self):
- """Missing configuration on journal should raise"""
- with pytest.raises(KeyError, match="journal"):
+ with pytest.raises(TypeError, match=f"{key}"):
invoke(
catch_exceptions=False,
args=["journal-client", "objects", "--stop-after-objects", "1",],
- config="", # missing config will make it raise
- elasticsearch_host=self._elasticsearch_host,
+ config=yaml_cfg, # incomplete config will make the cli raise
+ elasticsearch_host=elasticsearch_host,
)
-
- def test__journal_client__missing_journal_config_keys(self):
- """Missing configuration on mandatory journal keys should raise"""
- journal_config = yaml.safe_load(
- JOURNAL_OBJECTS_CONFIG
- ) # default configuration which is fine
-
- for key in journal_config["journal"].keys():
- if key == "prefix": # optional
- continue
- cfg = copy.deepcopy(journal_config)
- del cfg["journal"][key] # make config incomplete
- yaml_cfg = yaml.dump(cfg)
-
- with pytest.raises(TypeError, match=f"{key}"):
- invoke(
- catch_exceptions=False,
- args=["journal-client", "objects", "--stop-after-objects", "1",],
- config=yaml_cfg, # incomplete config will make the cli raise
- elasticsearch_host=self._elasticsearch_host,
- )
diff --git a/swh/search/tests/test_elasticsearch.py b/swh/search/tests/test_elasticsearch.py
--- a/swh/search/tests/test_elasticsearch.py
+++ b/swh/search/tests/test_elasticsearch.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -7,18 +7,14 @@
import pytest
-from swh.search import get_search
from .test_search import CommonSearchTest
class BaseElasticsearchTest(unittest.TestCase):
@pytest.fixture(autouse=True)
- def _instantiate_search(self, elasticsearch_host):
+ def _instantiate_search(self, swh_search, elasticsearch_host):
self._elasticsearch_host = elasticsearch_host
- self.search = get_search("elasticsearch", {"hosts": [elasticsearch_host],})
-
- def setUp(self):
- self.reset()
+ self.search = swh_search
def reset(self):
self.search.deinitialize()

File Metadata

Mime Type
text/plain
Expires
Wed, Dec 18, 2:10 AM (1 d, 22 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3214612

Event Timeline