diff --git a/swh/search/__init__.py b/swh/search/__init__.py
index e69de29..b85d8f0 100644
--- a/swh/search/__init__.py
+++ b/swh/search/__init__.py
@@ -0,0 +1,26 @@
+def get_search(cls, args):
+    """Get a search object of class `cls` with
+    arguments `args`.
+
+    Args:
+        cls (str): search's class, either 'elasticsearch' or 'remote'
+        args (dict): dictionary of arguments passed to the
+            search class constructor
+
+    Returns:
+        an instance of swh.search's classes (either elasticsearch or remote)
+
+    Raises:
+        ValueError if passed an unknown search class.
+
+    """
+    if cls == 'remote':
+        from .api.client import RemoteSearch as Search
+    elif cls == 'elasticsearch':
+        from .elasticsearch import ElasticSearch as Search
+    else:
+        raise ValueError('Unknown search class `%s`' % cls)
+
+    return Search(**args)
+
+
diff --git a/swh/search/__init__.py b/swh/search/api/__init__.py
similarity index 100%
copy from swh/search/__init__.py
copy to swh/search/api/__init__.py
diff --git a/swh/search/api/client.py b/swh/search/api/client.py
new file mode 100644
index 0000000..71d1490
--- /dev/null
+++ b/swh/search/api/client.py
@@ -0,0 +1,15 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from swh.core.api import RPCClient
+
+from ..elasticsearch import ElasticSearch
+
+
+class RemoteSearch(RPCClient):
+    """Proxy to a remote search API"""
+
+    backend_class = ElasticSearch
+
diff --git a/swh/search/api/server.py b/swh/search/api/server.py
new file mode 100644
index 0000000..f014699
--- /dev/null
+++ b/swh/search/api/server.py
@@ -0,0 +1,68 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import os
+
+from swh.core import config
+from swh.core.api import (RPCServerApp, error_handler,
+                          encode_data_server as encode_data)
+
+from .. import get_search
+from ..elasticsearch import ElasticSearch
+
+
+def _get_search():
+    global search
+    if not search:
+        search = get_search(**app.config['search'])
+        print('got search', repr(search.__class__))
+
+    return search
+
+
+app = RPCServerApp(__name__,
+                   backend_class=ElasticSearch,
+                   backend_factory=_get_search)
+
+search = None
+
+
+@app.errorhandler(Exception)
+def my_error_handler(exception):
+    return error_handler(exception, encode_data)
+
+
+@app.route('/')
+def index():
+    return 'SWH Search API server'
+
+
+def load_and_check_config(config_file, type='elasticsearch'):
+    """Check that the minimal configuration is set to run the API, or raise
+    an error explaining what is missing.
+
+    Args:
+        config_file (str): Path to the configuration file to load
+        type (str): configuration type (currently only 'elasticsearch')
+
+    Raises:
+        Error if the setup is not as expected
+
+    Returns:
+        configuration as a dict
+
+    """
+    if not config_file:
+        raise EnvironmentError('Configuration file must be defined')
+
+    if not os.path.exists(config_file):
+        raise FileNotFoundError('Configuration file %s does not exist' % (
+            config_file, ))
+
+    cfg = config.read(config_file)
+    if 'search' not in cfg:
+        raise KeyError("Missing 'search' configuration")
+
+    return cfg
diff --git a/swh/search/elasticsearch.py b/swh/search/elasticsearch.py
index 9706006..720a6b1 100644
--- a/swh/search/elasticsearch.py
+++ b/swh/search/elasticsearch.py
@@ -1,191 +1,193 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import base64
 from typing import Iterable, Dict, List, Iterator
 
 from elasticsearch import Elasticsearch
 from elasticsearch.client import IndicesClient
 from elasticsearch.helpers import bulk, scan
 import msgpack
 
+from swh.core.api import remote_api_endpoint
 from swh.model import model
 from swh.model.identifiers import origin_identifier
 
 
 def _sanitize_origin(origin):
     origin = origin.copy()
     res = {
         'url': origin.pop('url')
     }
     for field_name in ('type', 'intrinsic_metadata'):
         if field_name in origin:
             res[field_name] = origin.pop(field_name)
     return res
 
 
 class ElasticSearch:
     def __init__(self, hosts: List[str]):
         self._backend = Elasticsearch(hosts=hosts)
 
     def check(self):
         self._backend.ping()
 
     def deinitialize(self) -> None:
         self._backend.indices.delete(index='*')
 
     def initialize(self) -> None:
         self._backend.indices.create(
             index='origin',
             body={
                 'mappings': {
                     'properties': {
                         'url': {
                             'type': 'text',
                             # TODO: consider removing fielddata when
                             # swh-storage allows querying by hash, so the
                             # full URL does not have to be stored in ES'
                             # memory. See:
                             # https://www.elastic.co/guide/en/elasticsearch/reference/current/fielddata.html#before-enabling-fielddata
                             'fielddata': True,
                             'analyzer': 'simple',
                             'fields': {
                                 'as_you_type': {
                                     'type': 'search_as_you_type',
                                     'analyzer': 'simple',
                                 }
                             }
                         },
                         'intrinsic_metadata': {
                             'type': 'nested',
                             'properties': {
                                 '@context': {
                                     # don't bother indexing substrings
                                     'type': 'keyword',
                                 }
                             },
                         },
                     }
                 }
             }
         )
-
+    @remote_api_endpoint('origin/update')
     def origin_update(self, documents: Iterable[dict]) -> None:
         documents = map(_sanitize_origin, documents)
         actions = [
             {
                 '_op_type': 'update',
                 '_id': origin_identifier(document),
                 '_index': 'origin',
                 'doc': document,
                 'doc_as_upsert': True,
             }
             for document in documents
         ]
         res = bulk(self._backend, actions, index='origin', refresh='wait_for')
 
     def origin_dump(self) -> Iterator[model.Origin]:
         results = list(scan(self._backend, index='*'))
         for hit in results:
             yield self._backend.termvectors(
                 index='origin', id=hit['_id'], fields=['*'])
 
+    @remote_api_endpoint('origin/search')
     def origin_search(
             self, *, url_substring: str = None,
             metadata_substring: str = None,
             cursor: str = None, count: int = 50
             ) -> Dict[str, object]:
         """Searches for origins matching the `url_substring`.
 
         Args:
             url_substring (str): Part of the URL to search for
             cursor (str): `cursor` is an opaque value used for pagination.
             count (int): number of results to return.
 
         Returns:
             a dictionary with keys:
 
             * `cursor`: opaque value used for fetching more results. `None`
               if there are no more results.
             * `results`: list of dictionaries with key:
 
                 * `url`: URL of a matching origin
         """
         query_clauses = []
 
         if url_substring:
             query_clauses.append({
                 'multi_match': {
                     'query': url_substring,
                     'type': 'bool_prefix',
                     'fields': [
                         'url.as_you_type',
                         'url.as_you_type._2gram',
                         'url.as_you_type._3gram',
                     ]
                 }
             })
 
         if metadata_substring:
             query_clauses.append({
                 'nested': {
                     'path': 'intrinsic_metadata',
                     'query': {
                         'multi_match': {
                             'query': metadata_substring,
                             'fields': ['intrinsic_metadata.*']
                         }
                     },
                 }
             })
 
         if not query_clauses:
             raise ValueError(
                 'At least one of url_substring and metadata_substring '
                 'must be provided.')
 
         body = {
             'query': {
                 'bool': {
                     'should': query_clauses,
                 }
             },
             'size': count,
             'sort': [
                 {'_score': 'desc'},
                 {'_id': 'asc'},
             ]
         }
 
         if cursor:
             cursor = msgpack.loads(base64.b64decode(cursor))
             body['search_after'] = [cursor[b'score'], cursor[b'id']]
 
         res = self._backend.search(
             index='origin',
             body=body,
             size=count,
         )
 
         hits = res['hits']['hits']
 
         if len(hits) == count:
             last_hit = hits[-1]
             next_cursor = {
                 b'score': last_hit['_score'],
                 b'id': last_hit['_id'],
             }
             next_cursor = base64.b64encode(msgpack.dumps(next_cursor))
         else:
             next_cursor = None
 
         return {
             'cursor': next_cursor,
             'results': [
                 {'url': hit['_source']['url']}
                 for hit in hits
             ]
         }
diff --git a/swh/search/tests/test_api_client.py b/swh/search/tests/test_api_client.py
new file mode 100644
index 0000000..8d232b0
--- /dev/null
+++ b/swh/search/tests/test_api_client.py
@@ -0,0 +1,40 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import unittest
+
+import pytest
+
+from swh.core.api.tests.server_testing import ServerTestFixture
+
+from swh.search.elasticsearch import ElasticSearch
+from swh.search.api.server import app
+from swh.search.api.client import RemoteSearch
+from .test_search import CommonSearchTest
+
+
+class TestRemoteSearch(CommonSearchTest, ServerTestFixture, unittest.TestCase):
+    @pytest.fixture(autouse=True)
+    def _instantiate_search(self, elasticsearch_host):
+        self._elasticsearch_host = elasticsearch_host
+
+    def setUp(self):
+        self.config = {
+            'search': {
+                'cls': 'elasticsearch',
+                'args': {
+                    'hosts': [self._elasticsearch_host],
+                }
+            }
+        }
+        self.app = app
+        super().setUp()
+        self.reset()
+        self.search = RemoteSearch(self.url())
+
+    def reset(self):
+        search = ElasticSearch([self._elasticsearch_host])
+        search.deinitialize()
+        search.initialize()
diff --git a/swh/search/utils.py b/swh/search/utils.py
index 811c9e7..8b0eca8 100644
--- a/swh/search/utils.py
+++ b/swh/search/utils.py
@@ -1,10 +1,15 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
 def stream_results(f, *args, **kwargs):
     if 'cursor' in kwargs:
         raise TypeError('stream_results has no argument "cursor".')
     cursor = None
     while True:
         results = f(*args, cursor=cursor, **kwargs)
         yield from results['results']
         cursor = results['cursor']
         if cursor is None:
             break
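
A minimal usage sketch of the local path through this change, using get_search, origin_update, origin_search and stream_results exactly as defined above; the Elasticsearch host and the origin URL are assumptions, not values taken from the patch:

# Usage sketch only; assumes an Elasticsearch node reachable at localhost:9200.
from swh.search import get_search
from swh.search.utils import stream_results

search = get_search(cls='elasticsearch', args={'hosts': ['localhost:9200']})
search.initialize()  # create the 'origin' index and its mappings

# origin_update upserts documents; the backend passes refresh='wait_for' to
# bulk, so the documents are searchable as soon as the call returns.
search.origin_update([
    {'url': 'https://github.com/example/repo'},  # hypothetical origin
])

# stream_results drives origin_search's cursor-based pagination, calling it
# again with the returned cursor until the cursor comes back as None.
for origin in stream_results(search.origin_search, url_substring='github'):
    print(origin['url'])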
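
The remote path is symmetrical: because RemoteSearch sets backend_class = ElasticSearch and the backend methods are now decorated with remote_api_endpoint, the swh.core.api client machinery exposes matching origin_update/origin_search methods that call the server added in swh/search/api/server.py. A sketch, assuming such a server is already running; the URL, port and query string below are made up:

from swh.search.api.client import RemoteSearch

search = RemoteSearch('http://localhost:5010/')  # assumed server URL
page = search.origin_search(metadata_substring='jupyter', count=10)
for result in page['results']:
    print(result['url'])
# page['cursor'] is None when exhausted, otherwise an opaque token to pass
# back as cursor= for the next page (or let stream_results handle it).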