Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7124778
D3151.id11182.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
11 KB
Subscribers
None
D3151.id11182.diff
View Options
diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -5,6 +5,9 @@
# 3rd party libraries without stubs (yet)
+[mypy-confluent_kafka.*]
+ignore_missing_imports = True
+
[mypy-elasticsearch.*]
ignore_missing_imports = True
diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,4 +1,4 @@
# Add here internal Software Heritage dependencies, one per line.
swh.core[http]
-swh.journal >= 0.0.31
+swh.journal >= 0.1.0
swh.model
diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1 +1,2 @@
pytest
+confluent-kafka
diff --git a/swh/search/tests/conftest.py b/swh/search/tests/conftest.py
--- a/swh/search/tests/conftest.py
+++ b/swh/search/tests/conftest.py
@@ -1,8 +1,9 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import logging
import socket
import subprocess
import time
@@ -10,6 +11,11 @@
import elasticsearch
import pytest
+from swh.search import get_search
+
+
+logger = logging.getLogger(__name__)
+
def free_port():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -109,3 +115,16 @@
@pytest.fixture(scope="class")
def elasticsearch_host(elasticsearch_session):
yield elasticsearch_session
+
+
+@pytest.fixture
+def swh_search(elasticsearch_host):
+    """Instantiate a search client, initialize the elasticsearch
+    instance, and return it.
+
+    """
+ logger.debug("swh_search: elasticsearch_host: %s", elasticsearch_host)
+ search = get_search("elasticsearch", {"hosts": [elasticsearch_host],})
+ search.initialize()
+ yield search
+ search.deinitialize()
diff --git a/swh/search/tests/test_cli.py b/swh/search/tests/test_cli.py
--- a/swh/search/tests/test_cli.py
+++ b/swh/search/tests/test_cli.py
@@ -9,14 +9,12 @@
import pytest
-from unittest.mock import patch, MagicMock
+from confluent_kafka import Producer
from click.testing import CliRunner
from swh.journal.serializers import value_to_kafka
-from swh.journal.tests.utils import MockedKafkaConsumer
from swh.search.cli import cli
-from .test_elasticsearch import BaseElasticsearchTest
CLI_CONFIG = """
@@ -27,21 +25,17 @@
- '{elasticsearch_host}'
"""
-JOURNAL_OBJECTS_CONFIG = """
+JOURNAL_OBJECTS_CONFIG_TEMPLATE = """
journal:
brokers:
- - 192.0.2.1
- prefix: swh.journal.objects
- group_id: test-consumer
+ - {broker}
+ prefix: {prefix}
+ group_id: {group_id}
"""
-
-class MockedKafkaConsumerWithTopics(MockedKafkaConsumer):
- def list_topics(self, timeout=None):
- return {
- "swh.journal.objects.origin",
- "swh.journal.objects.origin_visit",
- }
+JOURNAL_OBJECTS_CONFIG = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
+ broker="192.0.2.1", prefix="swh.journal.objects", group_id="test-consumer"
+)
def invoke(catch_exceptions, args, config="", *, elasticsearch_host):
@@ -58,103 +52,125 @@
return result
-class CliTestCase(BaseElasticsearchTest):
- def test__journal_client__origin(self):
- """Tests the re-indexing when origin_batch_size*task_batch_size is a
- divisor of nb_origins."""
- topic = "swh.journal.objects.origin"
- value = value_to_kafka({"url": "http://foobar.baz",})
- message = MagicMock()
- message.error.return_value = None
- message.topic.return_value = topic
- message.value.return_value = value
-
- mock_consumer = MockedKafkaConsumerWithTopics([message])
-
- with patch("swh.journal.client.Consumer", return_value=mock_consumer):
- result = invoke(
- False,
- ["journal-client", "objects", "--stop-after-objects", "1",],
- JOURNAL_OBJECTS_CONFIG,
- elasticsearch_host=self._elasticsearch_host,
- )
-
- # Check the output
- expected_output = "Processed 1 messages.\nDone.\n"
- assert result.exit_code == 0, result.output
- assert result.output == expected_output
-
- self.search.flush()
-
- results = self.search.origin_search(url_pattern="foobar")
- assert results == {
- "next_page_token": None,
- "results": [{"url": "http://foobar.baz"}],
+def test__journal_client__origin(
+ swh_search, elasticsearch_host: str, kafka_prefix: str, kafka_server
+):
+    """Test the journal client: an origin produced to Kafka gets indexed
+    by the `journal-client objects` command and becomes searchable."""
+ producer = Producer(
+ {
+ "bootstrap.servers": kafka_server,
+ "client.id": "test search origin producer",
+ "acks": "all",
}
+ )
+ value = value_to_kafka({"url": "http://foobar.baz",})
+ topic = f"{kafka_prefix}.origin"
+ producer.produce(topic=topic, key=b"bogus-origin", value=value)
+
+ journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
+ broker=kafka_server, prefix=kafka_prefix, group_id="test-consumer"
+ )
+ result = invoke(
+ False,
+ ["journal-client", "objects", "--stop-after-objects", "1",],
+ journal_objects_config,
+ elasticsearch_host=elasticsearch_host,
+ )
+
+ # Check the output
+ expected_output = "Processed 1 messages.\nDone.\n"
+ assert result.exit_code == 0, result.output
+ assert result.output == expected_output
+
+ swh_search.flush()
+
+ # searching origin without visit as requirement
+ results = swh_search.origin_search(url_pattern="foobar")
+ # We find it
+ assert results == {
+ "next_page_token": None,
+ "results": [{"url": "http://foobar.baz"}],
+ }
+
+ # It's an origin with no visit, searching for it with visit
+ results = swh_search.origin_search(url_pattern="foobar", with_visit=True)
+ # returns nothing
+ assert results == {"next_page_token": None, "results": []}
+
+
+def test__journal_client__origin_visit(
+ swh_search, elasticsearch_host, kafka_prefix: str, kafka_server
+):
+    """Test the journal client: an origin_visit produced to Kafka gets
+    indexed by the `journal-client objects` command and its origin becomes
+    searchable, both with and without the with_visit filter."""
+ producer = Producer(
+ {
+ "bootstrap.servers": kafka_server,
+ "client.id": "test search origin visit producer",
+ "acks": "all",
+ }
+ )
+ topic = f"{kafka_prefix}.origin_visit"
+ value = value_to_kafka({"origin": "http://baz.foobar",})
+ producer.produce(topic=topic, key=b"bogus-origin-visit", value=value)
+
+ journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
+ broker=kafka_server, prefix=kafka_prefix, group_id="test-consumer"
+ )
+ result = invoke(
+ False,
+ ["journal-client", "objects", "--stop-after-objects", "1",],
+ journal_objects_config,
+ elasticsearch_host=elasticsearch_host,
+ )
+
+ # Check the output
+ expected_output = "Processed 1 messages.\nDone.\n"
+ assert result.exit_code == 0, result.output
+ assert result.output == expected_output
+
+ swh_search.flush()
+
+ expected_result = {
+ "next_page_token": None,
+ "results": [{"url": "http://baz.foobar"}],
+ }
+    # Both searches (with and without the visit filter) return the origin
+ results = swh_search.origin_search(url_pattern="foobar", with_visit=False)
+ assert results == expected_result
+ results = swh_search.origin_search(url_pattern="foobar", with_visit=True)
+ assert results == expected_result
+
+
+def test__journal_client__missing_main_journal_config_key(elasticsearch_host):
+ """Missing configuration on journal should raise"""
+ with pytest.raises(KeyError, match="journal"):
+ invoke(
+ catch_exceptions=False,
+ args=["journal-client", "objects", "--stop-after-objects", "1",],
+ config="", # missing config will make it raise
+ elasticsearch_host=elasticsearch_host,
+ )
- results = self.search.origin_search(url_pattern="foobar", with_visit=True)
- assert results == {"next_page_token": None, "results": []}
-
- def test__journal_client__origin_visit(self):
- """Tests the re-indexing when origin_batch_size*task_batch_size is a
- divisor of nb_origins."""
- topic = "swh.journal.objects.origin_visit"
- value = value_to_kafka({"origin": "http://foobar.baz",})
- message = MagicMock()
- message.error.return_value = None
- message.topic.return_value = topic
- message.value.return_value = value
-
- mock_consumer = MockedKafkaConsumerWithTopics([message])
-
- with patch("swh.journal.client.Consumer", return_value=mock_consumer):
- result = invoke(
- False,
- ["journal-client", "objects", "--stop-after-objects", "1",],
- JOURNAL_OBJECTS_CONFIG,
- elasticsearch_host=self._elasticsearch_host,
- )
-
- # Check the output
- expected_output = "Processed 1 messages.\nDone.\n"
- assert result.exit_code == 0, result.output
- assert result.output == expected_output
- self.search.flush()
+def test__journal_client__missing_journal_config_keys(elasticsearch_host):
+ """Missing configuration on mandatory journal keys should raise"""
+ journal_config = yaml.safe_load(
+ JOURNAL_OBJECTS_CONFIG
+ ) # default configuration which is fine
- results = self.search.origin_search(url_pattern="foobar", with_visit=True)
- assert results == {
- "next_page_token": None,
- "results": [{"url": "http://foobar.baz"}],
- }
+ for key in journal_config["journal"].keys():
+ if key == "prefix": # optional
+ continue
+ cfg = copy.deepcopy(journal_config)
+ del cfg["journal"][key] # make config incomplete
+ yaml_cfg = yaml.dump(cfg)
- def test__journal_client__missing_main_journal_config_key(self):
- """Missing configuration on journal should raise"""
- with pytest.raises(KeyError, match="journal"):
+ with pytest.raises(TypeError, match=f"{key}"):
invoke(
catch_exceptions=False,
args=["journal-client", "objects", "--stop-after-objects", "1",],
- config="", # missing config will make it raise
- elasticsearch_host=self._elasticsearch_host,
+ config=yaml_cfg, # incomplete config will make the cli raise
+ elasticsearch_host=elasticsearch_host,
)
-
- def test__journal_client__missing_journal_config_keys(self):
- """Missing configuration on mandatory journal keys should raise"""
- journal_config = yaml.safe_load(
- JOURNAL_OBJECTS_CONFIG
- ) # default configuration which is fine
-
- for key in journal_config["journal"].keys():
- if key == "prefix": # optional
- continue
- cfg = copy.deepcopy(journal_config)
- del cfg["journal"][key] # make config incomplete
- yaml_cfg = yaml.dump(cfg)
-
- with pytest.raises(TypeError, match=f"{key}"):
- invoke(
- catch_exceptions=False,
- args=["journal-client", "objects", "--stop-after-objects", "1",],
- config=yaml_cfg, # incomplete config will make the cli raise
- elasticsearch_host=self._elasticsearch_host,
- )
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Dec 21 2024, 6:56 PM (11 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3222041
Attached To
D3151: (fix ci) Fix search journal client tests
Event Timeline
Log In to Comment