diff --git a/requirements-swh.txt b/requirements-swh.txt index de4709b..9c86f71 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,4 +1,4 @@ # Add here internal Software Heritage dependencies, one per line. swh.core[http] -swh.journal >= 0.0.27 +swh.journal >= 0.0.31 swh.model diff --git a/swh/search/cli.py b/swh/search/cli.py index de60702..e2edfa5 100644 --- a/swh/search/cli.py +++ b/swh/search/cli.py @@ -1,98 +1,102 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import functools import click from swh.core import config from swh.core.cli import CONTEXT_SETTINGS -from swh.journal.cli import get_journal_client +from swh.journal.client import get_journal_client from . import get_search from .journal_client import process_journal_objects from .api.server import load_and_check_config, app @click.group(name="search", context_settings=CONTEXT_SETTINGS) @click.option( "--config-file", "-C", default=None, type=click.Path(exists=True, dir_okay=False,), help="Configuration file.", ) @click.pass_context def cli(ctx, config_file): """Software Heritage Search tools.""" ctx.ensure_object(dict) conf = config.read(config_file) ctx.obj["config"] = conf @cli.command("initialize") @click.pass_context def initialize(ctx): """Creates Elasticsearch indices.""" search = get_search(**ctx.obj["config"]["search"]) search.initialize() print("Done.") @cli.group("journal-client") @click.pass_context def journal_client(ctx): """""" pass @journal_client.command("objects") @click.option( "--stop-after-objects", "-m", default=None, type=int, - help="Maximum number of objects to replay. Default is to " "run forever.", + help="Maximum number of objects to replay. Default is to run forever.", ) @click.pass_context def journal_client_objects(ctx, stop_after_objects): """Listens for new objects from the SWH Journal, and schedules tasks to run relevant indexers (currently, only origin) on these new objects.""" + config = ctx.obj["config"] + journal_cfg = config["journal"] + client = get_journal_client( - ctx, + cls="kafka", object_types=["origin", "origin_visit"], stop_after_objects=stop_after_objects, + **journal_cfg, ) - search = get_search(**ctx.obj["config"]["search"]) + search = get_search(**config["search"]) worker_fn = functools.partial(process_journal_objects, search=search,) nb_messages = 0 try: nb_messages = client.process(worker_fn) print("Processed %d messages." % nb_messages) except KeyboardInterrupt: ctx.exit(0) else: print("Done.") finally: client.close() @cli.command("rpc-serve") @click.argument("config-path", required=True) @click.option("--host", default="0.0.0.0", help="Host to run the server") @click.option("--port", default=5010, type=click.INT, help="Binding port of the server") @click.option( "--debug/--nodebug", default=True, help="Indicates if the server should run in debug mode", ) def rpc_server(config_path, host, port, debug): """Starts a Software Heritage Indexer RPC HTTP server.""" api_cfg = load_and_check_config(config_path, type="any") app.config.update(api_cfg) app.run(host, port=int(port), debug=bool(debug)) diff --git a/swh/search/tests/test_cli.py b/swh/search/tests/test_cli.py index 7e32c1c..b3a409e 100644 --- a/swh/search/tests/test_cli.py +++ b/swh/search/tests/test_cli.py @@ -1,125 +1,160 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import copy import tempfile -from unittest.mock import patch, MagicMock +import yaml + +import pytest +from unittest.mock import patch, MagicMock from click.testing import CliRunner from swh.journal.serializers import value_to_kafka from swh.journal.tests.utils import MockedKafkaConsumer from swh.search.cli import cli from .test_elasticsearch import BaseElasticsearchTest CLI_CONFIG = """ search: cls: elasticsearch args: hosts: - '{elasticsearch_host}' """ JOURNAL_OBJECTS_CONFIG = """ journal: brokers: - 192.0.2.1 prefix: swh.journal.objects group_id: test-consumer """ class MockedKafkaConsumerWithTopics(MockedKafkaConsumer): def list_topics(self, timeout=None): return { "swh.journal.objects.origin", "swh.journal.objects.origin_visit", } def invoke(catch_exceptions, args, config="", *, elasticsearch_host): runner = CliRunner() with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd: config_fd.write( (CLI_CONFIG + config).format(elasticsearch_host=elasticsearch_host) ) config_fd.seek(0) result = runner.invoke(cli, ["-C" + config_fd.name] + args) if not catch_exceptions and result.exception: print(result.output) raise result.exception return result class CliTestCase(BaseElasticsearchTest): def test__journal_client__origin(self): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" topic = "swh.journal.objects.origin" value = value_to_kafka({"url": "http://foobar.baz",}) message = MagicMock() message.error.return_value = None message.topic.return_value = topic message.value.return_value = value mock_consumer = MockedKafkaConsumerWithTopics([message]) with patch("swh.journal.client.Consumer", return_value=mock_consumer): result = invoke( False, ["journal-client", "objects", "--stop-after-objects", "1",], JOURNAL_OBJECTS_CONFIG, elasticsearch_host=self._elasticsearch_host, ) # Check the output expected_output = "Processed 1 messages.\nDone.\n" assert result.exit_code == 0, result.output assert result.output == expected_output self.search.flush() results = self.search.origin_search(url_pattern="foobar") assert results == { "next_page_token": None, "results": [{"url": "http://foobar.baz"}], } results = self.search.origin_search(url_pattern="foobar", with_visit=True) assert results == {"next_page_token": None, "results": []} def test__journal_client__origin_visit(self): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" topic = "swh.journal.objects.origin_visit" value = value_to_kafka({"origin": "http://foobar.baz",}) message = MagicMock() message.error.return_value = None message.topic.return_value = topic message.value.return_value = value mock_consumer = MockedKafkaConsumerWithTopics([message]) with patch("swh.journal.client.Consumer", return_value=mock_consumer): result = invoke( False, ["journal-client", "objects", "--stop-after-objects", "1",], JOURNAL_OBJECTS_CONFIG, elasticsearch_host=self._elasticsearch_host, ) # Check the output expected_output = "Processed 1 messages.\nDone.\n" assert result.exit_code == 0, result.output assert result.output == expected_output self.search.flush() results = self.search.origin_search(url_pattern="foobar", with_visit=True) assert results == { "next_page_token": None, "results": [{"url": "http://foobar.baz"}], } + + def test__journal_client__missing_main_journal_config_key(self): + """Missing configuration on journal should raise""" + with pytest.raises(KeyError, match="journal"): + invoke( + catch_exceptions=False, + args=["journal-client", "objects", "--stop-after-objects", "1",], + config="", # missing config will make it raise + elasticsearch_host=self._elasticsearch_host, + ) + + def test__journal_client__missing_journal_config_keys(self): + """Missing configuration on mandatory journal keys should raise""" + journal_config = yaml.safe_load( + JOURNAL_OBJECTS_CONFIG + ) # default configuration which is fine + + for key in journal_config["journal"].keys(): + if key == "prefix": # optional + continue + cfg = copy.deepcopy(journal_config) + del cfg["journal"][key] # make config incomplete + yaml_cfg = yaml.dump(cfg) + + with pytest.raises(TypeError, match=f"{key}"): + invoke( + catch_exceptions=False, + args=["journal-client", "objects", "--stop-after-objects", "1",], + config=yaml_cfg, # incomplete config will make the cli raise + elasticsearch_host=self._elasticsearch_host, + )