Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7123189
D3151.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
12 KB
Subscribers
None
D3151.diff
View Options
diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -5,6 +5,9 @@
# 3rd party libraries without stubs (yet)
+[mypy-confluent_kafka.*]
+ignore_missing_imports = True
+
[mypy-elasticsearch.*]
ignore_missing_imports = True
diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,4 +1,4 @@
# Add here internal Software Heritage dependencies, one per line.
swh.core[http]
-swh.journal >= 0.0.31
+swh.journal >= 0.1.0
swh.model
diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1 +1,2 @@
pytest
+confluent-kafka
diff --git a/swh/search/tests/conftest.py b/swh/search/tests/conftest.py
--- a/swh/search/tests/conftest.py
+++ b/swh/search/tests/conftest.py
@@ -1,8 +1,9 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import logging
import socket
import subprocess
import time
@@ -10,6 +11,11 @@
import elasticsearch
import pytest
+from swh.search import get_search
+
+
+logger = logging.getLogger(__name__)
+
def free_port():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -109,3 +115,16 @@
@pytest.fixture(scope="class")
def elasticsearch_host(elasticsearch_session):
yield elasticsearch_session
+
+
+@pytest.fixture
+def swh_search(elasticsearch_host):
+ """Instantiate a search client, initialize the elasticsearch instance,
+ and return it.
+
+ """
+ logger.debug("swh_search: elasticsearch_host: %s", elasticsearch_host)
+ search = get_search("elasticsearch", {"hosts": [elasticsearch_host],})
+ search.deinitialize() # To reset internal state from previous runs
+ search.initialize() # install required index
+ yield search
diff --git a/swh/search/tests/test_cli.py b/swh/search/tests/test_cli.py
--- a/swh/search/tests/test_cli.py
+++ b/swh/search/tests/test_cli.py
@@ -9,14 +9,12 @@
import pytest
-from unittest.mock import patch, MagicMock
+from confluent_kafka import Producer
from click.testing import CliRunner
from swh.journal.serializers import value_to_kafka
-from swh.journal.tests.utils import MockedKafkaConsumer
from swh.search.cli import cli
-from .test_elasticsearch import BaseElasticsearchTest
CLI_CONFIG = """
@@ -27,23 +25,15 @@
- '{elasticsearch_host}'
"""
-JOURNAL_OBJECTS_CONFIG = """
+JOURNAL_OBJECTS_CONFIG_TEMPLATE = """
journal:
brokers:
- - 192.0.2.1
- prefix: swh.journal.objects
- group_id: test-consumer
+ - {broker}
+ prefix: {prefix}
+ group_id: {group_id}
"""
-class MockedKafkaConsumerWithTopics(MockedKafkaConsumer):
- def list_topics(self, timeout=None):
- return {
- "swh.journal.objects.origin",
- "swh.journal.objects.origin_visit",
- }
-
-
def invoke(catch_exceptions, args, config="", *, elasticsearch_host):
runner = CliRunner()
with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd:
@@ -58,103 +48,126 @@
return result
-class CliTestCase(BaseElasticsearchTest):
- def test__journal_client__origin(self):
- """Tests the re-indexing when origin_batch_size*task_batch_size is a
- divisor of nb_origins."""
- topic = "swh.journal.objects.origin"
- value = value_to_kafka({"url": "http://foobar.baz",})
- message = MagicMock()
- message.error.return_value = None
- message.topic.return_value = topic
- message.value.return_value = value
-
- mock_consumer = MockedKafkaConsumerWithTopics([message])
-
- with patch("swh.journal.client.Consumer", return_value=mock_consumer):
- result = invoke(
- False,
- ["journal-client", "objects", "--stop-after-objects", "1",],
- JOURNAL_OBJECTS_CONFIG,
- elasticsearch_host=self._elasticsearch_host,
- )
-
- # Check the output
- expected_output = "Processed 1 messages.\nDone.\n"
- assert result.exit_code == 0, result.output
- assert result.output == expected_output
-
- self.search.flush()
-
- results = self.search.origin_search(url_pattern="foobar")
- assert results == {
- "next_page_token": None,
- "results": [{"url": "http://foobar.baz"}],
+def test__journal_client__origin(
+ swh_search, elasticsearch_host: str, kafka_prefix: str, kafka_server
+):
+ """Tests that an origin message consumed from the journal is indexed and
+ found by a search without visit requirement, but not with with_visit=True."""
+ producer = Producer(
+ {
+ "bootstrap.servers": kafka_server,
+ "client.id": "test search origin producer",
+ "acks": "all",
}
+ )
+ value = value_to_kafka({"url": "http://foobar.baz",})
+ topic = f"{kafka_prefix}.origin"
+ producer.produce(topic=topic, key=b"bogus-origin", value=value)
+
+ journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
+ broker=kafka_server, prefix=kafka_prefix, group_id="test-consumer"
+ )
+ result = invoke(
+ False,
+ ["journal-client", "objects", "--stop-after-objects", "1",],
+ journal_objects_config,
+ elasticsearch_host=elasticsearch_host,
+ )
+
+ # Check the output
+ expected_output = "Processed 1 messages.\nDone.\n"
+ assert result.exit_code == 0, result.output
+ assert result.output == expected_output
+
+ swh_search.flush()
+
+ # searching origin without visit as requirement
+ results = swh_search.origin_search(url_pattern="foobar")
+ # We find it
+ assert results == {
+ "next_page_token": None,
+ "results": [{"url": "http://foobar.baz"}],
+ }
+
+ # It's an origin with no visit, searching for it with visit
+ results = swh_search.origin_search(url_pattern="foobar", with_visit=True)
+ # returns nothing
+ assert results == {"next_page_token": None, "results": []}
+
+
+def test__journal_client__origin_visit(
+ swh_search, elasticsearch_host, kafka_prefix: str, kafka_server
+):
+ """Tests that an origin_visit message consumed from the journal indexes its
+ origin, which is then found by searches both with and without with_visit."""
+ producer = Producer(
+ {
+ "bootstrap.servers": kafka_server,
+ "client.id": "test search origin visit producer",
+ "acks": "all",
+ }
+ )
+ topic = f"{kafka_prefix}.origin_visit"
+ value = value_to_kafka({"origin": "http://baz.foobar",})
+ producer.produce(topic=topic, key=b"bogus-origin-visit", value=value)
+
+ journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
+ broker=kafka_server, prefix=kafka_prefix, group_id="test-consumer"
+ )
+ result = invoke(
+ False,
+ ["journal-client", "objects", "--stop-after-objects", "1",],
+ journal_objects_config,
+ elasticsearch_host=elasticsearch_host,
+ )
+
+ # Check the output
+ expected_output = "Processed 1 messages.\nDone.\n"
+ assert result.exit_code == 0, result.output
+ assert result.output == expected_output
+
+ swh_search.flush()
+
+ expected_result = {
+ "next_page_token": None,
+ "results": [{"url": "http://baz.foobar"}],
+ }
+ # Both searches (with and without visit) return the visited origin
+ results = swh_search.origin_search(url_pattern="foobar", with_visit=False)
+ assert results == expected_result
+ results = swh_search.origin_search(url_pattern="foobar", with_visit=True)
+ assert results == expected_result
+
+
+def test__journal_client__missing_main_journal_config_key(elasticsearch_host):
+ """Missing configuration on journal should raise"""
+ with pytest.raises(KeyError, match="journal"):
+ invoke(
+ catch_exceptions=False,
+ args=["journal-client", "objects", "--stop-after-objects", "1",],
+ config="", # missing config will make it raise
+ elasticsearch_host=elasticsearch_host,
+ )
- results = self.search.origin_search(url_pattern="foobar", with_visit=True)
- assert results == {"next_page_token": None, "results": []}
-
- def test__journal_client__origin_visit(self):
- """Tests the re-indexing when origin_batch_size*task_batch_size is a
- divisor of nb_origins."""
- topic = "swh.journal.objects.origin_visit"
- value = value_to_kafka({"origin": "http://foobar.baz",})
- message = MagicMock()
- message.error.return_value = None
- message.topic.return_value = topic
- message.value.return_value = value
-
- mock_consumer = MockedKafkaConsumerWithTopics([message])
-
- with patch("swh.journal.client.Consumer", return_value=mock_consumer):
- result = invoke(
- False,
- ["journal-client", "objects", "--stop-after-objects", "1",],
- JOURNAL_OBJECTS_CONFIG,
- elasticsearch_host=self._elasticsearch_host,
- )
-
- # Check the output
- expected_output = "Processed 1 messages.\nDone.\n"
- assert result.exit_code == 0, result.output
- assert result.output == expected_output
- self.search.flush()
+def test__journal_client__missing_journal_config_keys(elasticsearch_host):
+ """Missing configuration on mandatory journal keys should raise"""
+ journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
+ broker="192.0.2.1", prefix="swh.journal.objects", group_id="test-consumer"
+ )
+ journal_config = yaml.safe_load(journal_objects_config)
- results = self.search.origin_search(url_pattern="foobar", with_visit=True)
- assert results == {
- "next_page_token": None,
- "results": [{"url": "http://foobar.baz"}],
- }
+ for key in journal_config["journal"].keys():
+ if key == "prefix": # optional
+ continue
+ cfg = copy.deepcopy(journal_config)
+ del cfg["journal"][key] # make config incomplete
+ yaml_cfg = yaml.dump(cfg)
- def test__journal_client__missing_main_journal_config_key(self):
- """Missing configuration on journal should raise"""
- with pytest.raises(KeyError, match="journal"):
+ with pytest.raises(TypeError, match=f"{key}"):
invoke(
catch_exceptions=False,
args=["journal-client", "objects", "--stop-after-objects", "1",],
- config="", # missing config will make it raise
- elasticsearch_host=self._elasticsearch_host,
+ config=yaml_cfg, # incomplete config will make the cli raise
+ elasticsearch_host=elasticsearch_host,
)
-
- def test__journal_client__missing_journal_config_keys(self):
- """Missing configuration on mandatory journal keys should raise"""
- journal_config = yaml.safe_load(
- JOURNAL_OBJECTS_CONFIG
- ) # default configuration which is fine
-
- for key in journal_config["journal"].keys():
- if key == "prefix": # optional
- continue
- cfg = copy.deepcopy(journal_config)
- del cfg["journal"][key] # make config incomplete
- yaml_cfg = yaml.dump(cfg)
-
- with pytest.raises(TypeError, match=f"{key}"):
- invoke(
- catch_exceptions=False,
- args=["journal-client", "objects", "--stop-after-objects", "1",],
- config=yaml_cfg, # incomplete config will make the cli raise
- elasticsearch_host=self._elasticsearch_host,
- )
diff --git a/swh/search/tests/test_elasticsearch.py b/swh/search/tests/test_elasticsearch.py
--- a/swh/search/tests/test_elasticsearch.py
+++ b/swh/search/tests/test_elasticsearch.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -7,18 +7,14 @@
import pytest
-from swh.search import get_search
from .test_search import CommonSearchTest
class BaseElasticsearchTest(unittest.TestCase):
@pytest.fixture(autouse=True)
- def _instantiate_search(self, elasticsearch_host):
+ def _instantiate_search(self, swh_search, elasticsearch_host):
self._elasticsearch_host = elasticsearch_host
- self.search = get_search("elasticsearch", {"hosts": [elasticsearch_host],})
-
- def setUp(self):
- self.reset()
+ self.search = swh_search
def reset(self):
self.search.deinitialize()
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Wed, Dec 18, 2:10 AM (2 d, 5 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3214612
Attached To
D3151: (fix ci) Fix search journal client tests
Event Timeline
Log In to Comment