diff --git a/requirements-swh.txt b/requirements-swh.txt --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,4 +1,4 @@ # Add here internal Software Heritage dependencies, one per line. swh.core[http] -swh.journal >= 0.0.27 +swh.journal >= 0.0.31 swh.model diff --git a/swh/search/cli.py b/swh/search/cli.py --- a/swh/search/cli.py +++ b/swh/search/cli.py @@ -9,7 +9,7 @@ from swh.core import config from swh.core.cli import CONTEXT_SETTINGS -from swh.journal.cli import get_journal_client +from swh.journal.client import get_journal_client from . import get_search from .journal_client import process_journal_objects @@ -55,15 +55,34 @@ "-m", default=None, type=int, - help="Maximum number of objects to replay. Default is to " "run forever.", + help="Maximum number of objects to replay. Default is to run forever.", ) @click.pass_context def journal_client_objects(ctx, stop_after_objects): """Listens for new objects from the SWH Journal, and schedules tasks to run relevant indexers (currently, only origin) on these new objects.""" + config = ctx.obj["config"] + # Check configuration is fine + journal_cfg = config.get("journal") + if not journal_cfg: + raise ValueError("Configuration is missing the 'journal' key configuration") + missing_keys = [] + mandatory_keys = ["brokers", "prefix", "group_id"] + for key in mandatory_keys: + if key not in journal_cfg: + missing_keys.append(key) + + if missing_keys: + raise ValueError( + f"Configuration is missing the following keys: {','.join(missing_keys)}" + ) + client = get_journal_client( - ctx, + cls="kafka", + brokers=journal_cfg["brokers"], + prefix=journal_cfg["prefix"], + group_id=journal_cfg["group_id"], object_types=["origin", "origin_visit"], stop_after_objects=stop_after_objects, ) diff --git a/swh/search/tests/test_cli.py b/swh/search/tests/test_cli.py --- a/swh/search/tests/test_cli.py +++ b/swh/search/tests/test_cli.py @@ -3,9 +3,13 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import copy import tempfile -from unittest.mock import patch, MagicMock +import yaml + +import pytest +from unittest.mock import patch, MagicMock from click.testing import CliRunner from swh.journal.serializers import value_to_kafka @@ -123,3 +127,36 @@ "next_page_token": None, "results": [{"url": "http://foobar.baz"}], } + + def test__journal_client__missing_main_journal_config_key(self): + """Missing configuration on journal should raise""" + with pytest.raises( + ValueError, match="Configuration is missing the 'journal' key" + ): + invoke( + catch_exceptions=False, + args=["journal-client", "objects", "--stop-after-objects", "1",], + config="", # missing config will make it raise + elasticsearch_host=self._elasticsearch_host, + ) + + def test__journal_client__missing_journal_config_keys(self): + """Missing configuration on journal should raise""" + journal_config = yaml.safe_load( + JOURNAL_OBJECTS_CONFIG + ) # default configuration which is fine + + for key in journal_config["journal"].keys(): + cfg = copy.deepcopy(journal_config) + del cfg["journal"][key] # make config incomplete + yaml_cfg = yaml.dump(cfg) + + with pytest.raises( + ValueError, match=f"Configuration is missing the following keys: {key}" + ): + invoke( + catch_exceptions=False, + args=["journal-client", "objects", "--stop-after-objects", "1",], + config=yaml_cfg, # incomplete config will make the cli raise + elasticsearch_host=self._elasticsearch_host, + )