diff --git a/setup.py b/setup.py
index a66caa1..7d4fd32 100755
--- a/setup.py
+++ b/setup.py
@@ -1,69 +1,69 @@
 #!/usr/bin/env python3
 # Copyright (C) 2015-2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from setuptools import setup, find_packages
 
 from os import path
 from io import open
 
 here = path.abspath(path.dirname(__file__))
 
 # Get the long description from the README file
 with open(path.join(here, 'README.md'), encoding='utf-8') as f:
     long_description = f.read()
 
 
 def parse_requirements(name=None):
     if name:
         reqf = 'requirements-%s.txt' % name
     else:
         reqf = 'requirements.txt'
 
     requirements = []
     if not path.exists(reqf):
         return requirements
 
     with open(reqf) as f:
         for line in f.readlines():
             line = line.strip()
             if not line or line.startswith('#'):
                 continue
             requirements.append(line)
     return requirements
 
 
 setup(
     name='swh.search',
     description='Software Heritage search service',
     long_description=long_description,
     long_description_content_type='text/markdown',
     author='Software Heritage developers',
     author_email='swh-devel@inria.fr',
     url='https://forge.softwareheritage.org/diffusion/DSEA',
     packages=find_packages(),  # packages's modules
     install_requires=parse_requirements() + parse_requirements('swh'),
     tests_require=parse_requirements('test'),
     entry_points='''
         [swh.cli.subcommands]
-        search=swh.search.cli:search
+        search=swh.search.cli:cli
     ''',
     setup_requires=['vcversioner'],
     extras_require={'testing': parse_requirements('test')},
     vcversioner={},
     include_package_data=True,
     classifiers=[
         "Programming Language :: Python :: 3",
         "Intended Audience :: Developers",
         "License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
         "Operating System :: OS Independent",
         "Development Status :: 3 - Alpha",
     ],
     project_urls={
         'Bug Reports': 'https://forge.softwareheritage.org/maniphest',
         'Funding': 'https://www.softwareheritage.org/donate',
         'Source': 'https://forge.softwareheritage.org/source/swh-search',
     },
 )
diff --git a/swh/search/cli.py b/swh/search/cli.py
index 9e9ab9f..9621d84 100644
--- a/swh/search/cli.py
+++ b/swh/search/cli.py
@@ -1,15 +1,79 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import functools
+
 import click
 
+from swh.core import config
 from swh.core.cli import CONTEXT_SETTINGS
+from swh.journal.cli import get_journal_client
+
+from . import get_search
+from .journal_client import process_journal_objects
+from .api.server import load_and_check_config, app
 
 
 @click.group(name='search', context_settings=CONTEXT_SETTINGS)
+@click.option('--config-file', '-C', default=None,
+              type=click.Path(exists=True, dir_okay=False,),
+              help="Configuration file.")
 @click.pass_context
-def cli(ctx):
+def cli(ctx, config_file):
     '''Software Heritage Search tools.'''
+    ctx.ensure_object(dict)
+
+    conf = config.read(config_file)
+    ctx.obj['config'] = conf
+
+
+@cli.group('journal-client')
+@click.pass_context
+def journal_client(ctx):
+    """Subcommands consuming the SWH Journal."""
     pass
+
+
+@journal_client.command('objects')
+@click.option('--max-messages', '-m', default=None, type=int,
+              help='Maximum number of objects to replay. Default is to '
+                   'run forever.')
+@click.pass_context
+def journal_client_objects(ctx, max_messages):
+    """Listens for new objects from the SWH Journal (currently, only
+    origins), and passes them to process_journal_objects along with the
+    configured search backend."""
+    client = get_journal_client(ctx, object_types=['origin'])
+    search = get_search(**ctx.obj['config']['search'])
+
+    worker_fn = functools.partial(
+        process_journal_objects,
+        search=search,
+    )
+    nb_messages = 0
+    try:
+        while not max_messages or nb_messages < max_messages:
+            nb_messages += client.process(worker_fn)
+            print('Processed %d messages.' % nb_messages)
+    except KeyboardInterrupt:
+        ctx.exit(0)
+    else:
+        print('Done.')
+
+
+@cli.command('rpc-serve')
+@click.argument('config-path', required=True)
+@click.option('--host', default='0.0.0.0', help="Host to run the server")
+@click.option('--port', default=5010, type=click.INT,
+              help="Binding port of the server")
+# NOTE(review): debug defaults to on while the default host is 0.0.0.0; if
+# `app` is a Flask app this exposes its debugger to the network -- confirm.
+@click.option('--debug/--nodebug', default=True,
+              help="Indicates if the server should run in debug mode")
+def rpc_server(config_path, host, port, debug):
+    """Starts a Software Heritage Search RPC HTTP server."""
+    api_cfg = load_and_check_config(config_path, type='any')
+    app.config.update(api_cfg)
+    app.run(host, port=int(port), debug=bool(debug))
diff --git a/swh/search/tests/test_cli.py b/swh/search/tests/test_cli.py
new file mode 100644
index 0000000..8504ab5
--- /dev/null
+++ b/swh/search/tests/test_cli.py
@@ -0,0 +1,89 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from collections import namedtuple
+import tempfile
+import unittest
+from unittest.mock import patch, MagicMock
+
+from click.testing import CliRunner
+
+from swh.journal.serializers import value_to_kafka
+
+from swh.search.cli import cli
+from swh.search.elasticsearch import ElasticSearch
+from .test_elasticsearch import BaseElasticsearchTest
+
+
+CLI_CONFIG = '''
+search:
+    cls: elasticsearch
+    args:
+        hosts:
+        - '{elasticsearch_host}'
+'''
+
+JOURNAL_OBJECTS_CONFIG = '''
+journal:
+    brokers:
+        - 192.0.2.1
+    prefix: swh.journal.objects
+    group_id: test-consumer
+'''
+
+
+def invoke(catch_exceptions, args, config='', *, elasticsearch_host):
+    runner = CliRunner()
+    with tempfile.NamedTemporaryFile('a', suffix='.yml') as config_fd:
+        config_fd.write((CLI_CONFIG + config).format(
+            elasticsearch_host=elasticsearch_host
+        ))
+        config_fd.seek(0)
+        result = runner.invoke(cli, ['-C' + config_fd.name] + args)
+    if not catch_exceptions and result.exception:
+        print(result.output)
+        raise result.exception
+    return result
+
+
+class CliTestCase(BaseElasticsearchTest):
+    def test__journal_client__origin(self):
+        """Tests that an origin message consumed from the (mocked) journal
+        is indexed and can be found through origin_search."""
+        mock_consumer = MagicMock()
+
+        topic = 'swh.journal.objects.origin'
+        value = value_to_kafka({
+            'url': 'http://foobar.baz',
+        })
+        message = MagicMock()
+        message.error.return_value = None
+        message.topic.return_value = topic
+        message.value.return_value = value
+        mock_consumer.poll.return_value = message
+
+        with patch('swh.journal.client.Consumer',
+                   return_value=mock_consumer):
+            result = invoke(False, [
+                'journal-client', 'objects',
+                '--max-messages', '1',
+            ], JOURNAL_OBJECTS_CONFIG,
+                elasticsearch_host=self._elasticsearch_host)
+
+        mock_consumer.subscribe.assert_called_once_with(topics=[topic])
+        mock_consumer.poll.assert_called_once_with(timeout=1.0)
+        mock_consumer.commit.assert_called_once_with()
+
+        # Check the output
+        expected_output = (
+            'Processed 1 messages.\n'
+            'Done.\n'
+        )
+        assert result.exit_code == 0, result.output
+        assert result.output == expected_output
+
+        results = self.search.origin_search(url_substring='foobar')
+        assert results == {'cursor': None, 'results': [
+            {'url': 'http://foobar.baz'}]}
diff --git a/swh/search/tests/test_elasticsearch.py b/swh/search/tests/test_elasticsearch.py
index 6ff2015..88ed9d9 100644
--- a/swh/search/tests/test_elasticsearch.py
+++ b/swh/search/tests/test_elasticsearch.py
@@ -1,24 +1,29 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import unittest
 
 import pytest
 
 from swh.search.elasticsearch import ElasticSearch
 from .test_search import CommonSearchTest
 
 
-class TestElasticsearchSearch(CommonSearchTest, unittest.TestCase):
+class BaseElasticsearchTest(unittest.TestCase):
     @pytest.fixture(autouse=True)
     def _instantiate_search(self, elasticsearch_host):
+        self._elasticsearch_host = elasticsearch_host
         self.search = ElasticSearch([elasticsearch_host])
 
     def setUp(self):
         self.reset()
 
     def reset(self):
         self.search.deinitialize()
         self.search.initialize()
+
+
+class TestElasticsearchSearch(CommonSearchTest, BaseElasticsearchTest):
+    pass