diff --git a/requirements-swh.txt b/requirements-swh.txt index 84ad399..8d0328c 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,4 +1,4 @@ # Add here internal Software Heritage dependencies, one per line. -swh.core[http] >= 0.2.0 +swh.core[http] >= 0.3.0 swh.journal >= 0.1.0 swh.model diff --git a/setup.py b/setup.py index 261ef13..c6fef58 100755 --- a/setup.py +++ b/setup.py @@ -1,71 +1,71 @@ #!/usr/bin/env python3 # Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from io import open from os import path from setuptools import find_packages, setup here = path.abspath(path.dirname(__file__)) # Get the long description from the README file with open(path.join(here, "README.md"), encoding="utf-8") as f: long_description = f.read() def parse_requirements(name=None): if name: reqf = "requirements-%s.txt" % name else: reqf = "requirements.txt" requirements = [] if not path.exists(reqf): return requirements with open(reqf) as f: for line in f.readlines(): line = line.strip() if not line or line.startswith("#"): continue requirements.append(line) return requirements setup( name="swh.search", description="Software Heritage search service", long_description=long_description, long_description_content_type="text/markdown", python_requires=">=3.7", author="Software Heritage developers", author_email="swh-devel@inria.fr", url="https://forge.softwareheritage.org/diffusion/DSEA", packages=find_packages(), # packages's modules install_requires=parse_requirements() + parse_requirements("swh"), tests_require=parse_requirements("test"), entry_points=""" [swh.cli.subcommands] - search=swh.search.cli:cli + search=swh.search.cli """, setup_requires=["setuptools-scm"], use_scm_version=True, extras_require={"testing": parse_requirements("test")}, include_package_data=True, classifiers=[ "Programming Language :: Python :: 3", "Intended Audience :: Developers", "License :: OSI Approved :: GNU General Public License v3 (GPLv3)", "Operating System :: OS Independent", "Development Status :: 3 - Alpha", ], project_urls={ "Bug Reports": "https://forge.softwareheritage.org/maniphest", "Funding": "https://www.softwareheritage.org/donate", "Source": "https://forge.softwareheritage.org/source/swh-search", "Documentation": "https://docs.softwareheritage.org/devel/swh-search/", }, ) diff --git a/swh/search/cli.py b/swh/search/cli.py index 1c05b57..533fde5 100644 --- a/swh/search/cli.py +++ b/swh/search/cli.py @@ -1,108 +1,108 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control import click -from swh.core.cli import CONTEXT_SETTINGS +from swh.core.cli import CONTEXT_SETTINGS, swh as swh_cli_group -@click.group(name="search", context_settings=CONTEXT_SETTINGS) +@swh_cli_group.group(name="search", context_settings=CONTEXT_SETTINGS) @click.option( "--config-file", "-C", default=None, type=click.Path(exists=True, dir_okay=False,), help="Configuration file.", ) @click.pass_context -def cli(ctx, config_file): +def search_cli_group(ctx, config_file): """Software Heritage Search tools.""" from swh.core import config ctx.ensure_object(dict) conf = config.read(config_file) ctx.obj["config"] = conf -@cli.command("initialize") +@search_cli_group.command("initialize") @click.pass_context def initialize(ctx): """Creates Elasticsearch indices.""" from . import get_search search = get_search(**ctx.obj["config"]["search"]) search.initialize() print("Done.") -@cli.group("journal-client") +@search_cli_group.group("journal-client") @click.pass_context def journal_client(ctx): """""" pass @journal_client.command("objects") @click.option( "--stop-after-objects", "-m", default=None, type=int, help="Maximum number of objects to replay. Default is to run forever.", ) @click.pass_context def journal_client_objects(ctx, stop_after_objects): """Listens for new objects from the SWH Journal, and schedules tasks to run relevant indexers (currently, only origin) on these new objects.""" import functools from swh.journal.client import get_journal_client from . import get_search from .journal_client import process_journal_objects config = ctx.obj["config"] journal_cfg = config["journal"] client = get_journal_client( cls="kafka", object_types=["origin", "origin_visit"], stop_after_objects=stop_after_objects, **journal_cfg, ) search = get_search(**config["search"]) worker_fn = functools.partial(process_journal_objects, search=search,) nb_messages = 0 try: nb_messages = client.process(worker_fn) print("Processed %d messages." % nb_messages) except KeyboardInterrupt: ctx.exit(0) else: print("Done.") finally: client.close() -@cli.command("rpc-serve") +@search_cli_group.command("rpc-serve") @click.argument("config-path", required=True) @click.option("--host", default="0.0.0.0", help="Host to run the server") @click.option("--port", default=5010, type=click.INT, help="Binding port of the server") @click.option( "--debug/--nodebug", default=True, help="Indicates if the server should run in debug mode", ) def rpc_server(config_path, host, port, debug): """Starts a Software Heritage Indexer RPC HTTP server.""" from .api.server import app, load_and_check_config api_cfg = load_and_check_config(config_path, type="any") app.config.update(api_cfg) app.run(host, port=int(port), debug=bool(debug)) diff --git a/swh/search/tests/test_cli.py b/swh/search/tests/test_cli.py index 308f8d7..eda2cc8 100644 --- a/swh/search/tests/test_cli.py +++ b/swh/search/tests/test_cli.py @@ -1,172 +1,173 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy import tempfile from click.testing import CliRunner from confluent_kafka import Producer import pytest import yaml from swh.journal.serializers import value_to_kafka -from swh.search.cli import cli +from swh.search.cli import search_cli_group + CLI_CONFIG = """ search: cls: elasticsearch args: hosts: - '%(elasticsearch_host)s' """ JOURNAL_OBJECTS_CONFIG_TEMPLATE = """ journal: brokers: - {broker} prefix: {prefix} group_id: {group_id} """ def invoke(catch_exceptions, args, config="", *, elasticsearch_host): runner = CliRunner() with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd: config_fd.write( (CLI_CONFIG + config) % {"elasticsearch_host": elasticsearch_host} ) config_fd.seek(0) - result = runner.invoke(cli, ["-C" + config_fd.name] + args) + result = runner.invoke(search_cli_group, ["-C" + config_fd.name] + args) if not catch_exceptions and result.exception: print(result.output) raise result.exception return result def test__journal_client__origin( swh_search, elasticsearch_host: str, kafka_prefix: str, kafka_server ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" producer = Producer( { "bootstrap.servers": kafka_server, "client.id": "test search origin producer", "acks": "all", } ) origin_foobar_baz = { "url": "http://foobar.baz", } value = value_to_kafka(origin_foobar_baz) topic = f"{kafka_prefix}.origin" producer.produce(topic=topic, key=b"bogus-origin", value=value) journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format( broker=kafka_server, prefix=kafka_prefix, group_id="test-consumer" ) result = invoke( False, ["journal-client", "objects", "--stop-after-objects", "1",], journal_objects_config, elasticsearch_host=elasticsearch_host, ) # Check the output expected_output = "Processed 1 messages.\nDone.\n" assert result.exit_code == 0, result.output assert result.output == expected_output swh_search.flush() # searching origin without visit as requirement actual_page = swh_search.origin_search(url_pattern="foobar") # We find it assert actual_page.next_page_token is None assert actual_page.results == [origin_foobar_baz] # It's an origin with no visit, searching for it with visit actual_page = swh_search.origin_search(url_pattern="foobar", with_visit=True) # returns nothing assert actual_page.next_page_token is None assert actual_page.results == [] def test__journal_client__origin_visit( swh_search, elasticsearch_host, kafka_prefix: str, kafka_server ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" origin_foobar = {"url": "http://baz.foobar"} producer = Producer( { "bootstrap.servers": kafka_server, "client.id": "test search origin visit producer", "acks": "all", } ) topic = f"{kafka_prefix}.origin_visit" value = value_to_kafka({"origin": origin_foobar["url"]}) producer.produce(topic=topic, key=b"bogus-origin-visit", value=value) journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format( broker=kafka_server, prefix=kafka_prefix, group_id="test-consumer" ) result = invoke( False, ["journal-client", "objects", "--stop-after-objects", "1",], journal_objects_config, elasticsearch_host=elasticsearch_host, ) # Check the output expected_output = "Processed 1 messages.\nDone.\n" assert result.exit_code == 0, result.output assert result.output == expected_output swh_search.flush() # Both search returns the visit actual_page = swh_search.origin_search(url_pattern="foobar", with_visit=False) assert actual_page.next_page_token is None assert actual_page.results == [origin_foobar] actual_page = swh_search.origin_search(url_pattern="foobar", with_visit=True) assert actual_page.next_page_token is None assert actual_page.results == [origin_foobar] def test__journal_client__missing_main_journal_config_key(elasticsearch_host): """Missing configuration on journal should raise""" with pytest.raises(KeyError, match="journal"): invoke( catch_exceptions=False, args=["journal-client", "objects", "--stop-after-objects", "1",], config="", # missing config will make it raise elasticsearch_host=elasticsearch_host, ) def test__journal_client__missing_journal_config_keys(elasticsearch_host): """Missing configuration on mandatory journal keys should raise""" journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format( broker="192.0.2.1", prefix="swh.journal.objects", group_id="test-consumer" ) journal_config = yaml.safe_load(journal_objects_config) for key in journal_config["journal"].keys(): if key == "prefix": # optional continue cfg = copy.deepcopy(journal_config) del cfg["journal"][key] # make config incomplete yaml_cfg = yaml.dump(cfg) with pytest.raises(TypeError, match=f"{key}"): invoke( catch_exceptions=False, args=["journal-client", "objects", "--stop-after-objects", "1",], config=yaml_cfg, # incomplete config will make the cli raise elasticsearch_host=elasticsearch_host, )