diff --git a/swh/search/cli.py b/swh/search/cli.py
index 79f303b..79bd031 100644
--- a/swh/search/cli.py
+++ b/swh/search/cli.py
@@ -1,86 +1,86 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import functools
 
 import click
 
 from swh.core import config
 from swh.core.cli import CONTEXT_SETTINGS
 from swh.journal.cli import get_journal_client
 
 from . import get_search
 from .journal_client import process_journal_objects
 from .api.server import load_and_check_config, app
 
 
 @click.group(name='search', context_settings=CONTEXT_SETTINGS)
 @click.option('--config-file', '-C', default=None,
               type=click.Path(exists=True, dir_okay=False,),
               help="Configuration file.")
 @click.pass_context
 def cli(ctx, config_file):
     '''Software Heritage Search tools.'''
     ctx.ensure_object(dict)
 
     conf = config.read(config_file)
     ctx.obj['config'] = conf
 
 
 @cli.command('initialize')
 @click.pass_context
 def initialize(ctx):
     """Creates Elasticsearch indices."""
     search = get_search(**ctx.obj['config']['search'])
     search.initialize()
     print('Done.')
 
 
 @cli.group('journal-client')
 @click.pass_context
 def journal_client(ctx):
     """"""
     pass
 
 
 @journal_client.command('objects')
 @click.option('--max-messages', '-m', default=None, type=int,
               help='Maximum number of objects to replay. Default is to '
                    'run forever.')
 @click.pass_context
 def journal_client_objects(ctx, max_messages):
     """Listens for new objects from the SWH Journal, and schedules tasks
     to run relevant indexers (currently, only origin)
     on these new objects."""
     client = get_journal_client(ctx, object_types=['origin'])
     search = get_search(**ctx.obj['config']['search'])
 
     worker_fn = functools.partial(
         process_journal_objects,
         search=search,
     )
     nb_messages = 0
     try:
         while not max_messages or nb_messages < max_messages:
             nb_messages += client.process(worker_fn)
             print('Processed %d messages.' % nb_messages)
     except KeyboardInterrupt:
         ctx.exit(0)
     else:
         print('Done.')
 
 
 @cli.command('rpc-serve')
-@click.argument('config-path', required=1)
+@click.argument('config-path', required=True)
 @click.option('--host', default='0.0.0.0', help="Host to run the server")
 @click.option('--port', default=5010, type=click.INT,
               help="Binding port of the server")
 @click.option('--debug/--nodebug', default=True,
               help="Indicates if the server should run in debug mode")
 def rpc_server(config_path, host, port, debug):
     """Starts a Software Heritage Indexer RPC HTTP server."""
     api_cfg = load_and_check_config(config_path, type='any')
     app.config.update(api_cfg)
     app.run(host, port=int(port), debug=bool(debug))
diff --git a/swh/search/elasticsearch.py b/swh/search/elasticsearch.py
index 328c200..bde055e 100644
--- a/swh/search/elasticsearch.py
+++ b/swh/search/elasticsearch.py
@@ -1,220 +1,222 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import base64
 
-from typing import Iterable, Dict, List, Iterator
+from typing import Any, Iterable, Dict, List, Iterator, Optional
 
 from elasticsearch import Elasticsearch
 from elasticsearch.helpers import bulk, scan
 import msgpack
 
 from swh.core.api import remote_api_endpoint
 from swh.model import model
 from swh.model.identifiers import origin_identifier
 
 
 def _sanitize_origin(origin):
     origin = origin.copy()
     res = {
         'url': origin.pop('url')
     }
     for field_name in ('intrinsic_metadata', 'has_visits'):
         if field_name in origin:
             res[field_name] = origin.pop(field_name)
     return res
 
 
 class ElasticSearch:
     def __init__(self, hosts: List[str]):
         self._backend = Elasticsearch(hosts=hosts)
 
     @remote_api_endpoint('check')
     def check(self):
         return self._backend.ping()
 
     def deinitialize(self) -> None:
         """Removes all indices from the Elasticsearch backend"""
         self._backend.indices.delete(index='*')
 
     def initialize(self) -> None:
         """Declare Elasticsearch indices and mappings"""
         self._backend.indices.create(
             index='origin',
             body={
                 'mappings': {
                     'properties': {
                         'url': {
                             'type': 'text',
                             # TODO: consider removing fielddata when
                             # swh-storage allows querying by hash, so the
                             # full URL does not have to be stored in ES'
                             # memory. See:
                             # https://www.elastic.co/guide/en/elasticsearch/reference/current/fielddata.html#before-enabling-fielddata
                             'fielddata': True,
                             # To split URLs into token on any character
                             # that is not alphanumerical
                             'analyzer': 'simple',
                             'fields': {
                                 'as_you_type': {
                                     'type': 'search_as_you_type',
                                     'analyzer': 'simple',
                                 }
                             }
                         },
                         'has_visits': {
                             'type': 'boolean',
                         },
                         'intrinsic_metadata': {
                             'type': 'nested',
                             'properties': {
                                 '@context': {
                                     # don't bother indexing tokens
                                     'type': 'keyword',
                                 }
                             },
                         },
                     }
                 }
             }
         )
 
     @remote_api_endpoint('origin/update')
     def origin_update(self, documents: Iterable[dict]) -> None:
         documents = map(_sanitize_origin, documents)
         actions = [
             {
                 '_op_type': 'update',
                 '_id': origin_identifier(document),
                 '_index': 'origin',
                 'doc': document,
                 'doc_as_upsert': True,
             }
             for document in documents
         ]
         # TODO: make refresh='wait_for' configurable (we don't need it
         # in production, it will probably be a performance issue)
         bulk(self._backend, actions, index='origin', refresh='wait_for')
 
     def origin_dump(self) -> Iterator[model.Origin]:
         """Returns all content in Elasticsearch's index.
 
         Not exposed publicly; but useful for tests."""
         results = scan(self._backend, index='*')
         for hit in results:
             yield self._backend.termvectors(
                 index='origin', id=hit['_id'],
                 fields=['*'])
 
     @remote_api_endpoint('origin/search')
     def origin_search(
             self, *,
             url_pattern: str = None, metadata_pattern: str = None,
             with_visit: bool = False,
             scroll_token: str = None, count: int = 50
             ) -> Dict[str, object]:
         """Searches for origins matching the `url_pattern`.
 
         Args:
             url_pattern (str): Part of thr URL to search for
             with_visit (bool): Whether origins with no visit are to be
                                filtered out
             scroll_token (str): Opaque value used for pagination.
             count (int): number of results to return.
 
         Returns:
             a dictionary with keys:
 
             * `scroll_token`: opaque value used for fetching more results.
               `None` if there are no more result.
             * `results`: list of dictionaries with key:
 
               * `url`: URL of a matching origin
         """
-        query_clauses = []
+        query_clauses = []  # type: List[Dict[str, Any]]
 
         if url_pattern:
             query_clauses.append({
                 'multi_match': {
                     'query': url_pattern,
                     'type': 'bool_prefix',
                     'fields': [
                         'url.as_you_type',
                         'url.as_you_type._2gram',
                         'url.as_you_type._3gram',
                     ]
                 }
             })
 
         if metadata_pattern:
             query_clauses.append({
                 'nested': {
                     'path': 'intrinsic_metadata',
                     'query': {
                         'multi_match': {
                             'query': metadata_pattern,
                             'fields': ['intrinsic_metadata.*']
                         }
                     },
                 }
             })
 
         if not query_clauses:
             raise ValueError(
                 'At least one of url_pattern and metadata_pattern '
                 'must be provided.')
 
         if with_visit:
             query_clauses.append({
                 'term': {
                     'has_visits': True,
                 }
             })
 
         body = {
             'query': {
                 'bool': {
                     'must': query_clauses,
                 }
             },
             'size': count,
             'sort': [
                 {'_score': 'desc'},
                 {'_id': 'asc'},
             ]
         }
         if scroll_token:
             # TODO: use ElasticSearch's scroll API?
-            scroll_token = msgpack.loads(base64.b64decode(scroll_token))
+            scroll_token_content = msgpack.loads(
+                base64.b64decode(scroll_token))
             body['search_after'] = \
-                [scroll_token[b'score'], scroll_token[b'id'].decode('ascii')]
+                [scroll_token_content[b'score'],
+                 scroll_token_content[b'id'].decode('ascii')]
 
         res = self._backend.search(
             index='origin',
             body=body,
             size=count,
         )
 
         hits = res['hits']['hits']
 
         if len(hits) == count:
             last_hit = hits[-1]
-            next_scroll_token = {
+            next_scroll_token_content = {
                 b'score': last_hit['_score'],
                 b'id': last_hit['_id'],
             }
             next_scroll_token = base64.b64encode(msgpack.dumps(
-                next_scroll_token))
+                next_scroll_token_content))  # type: Optional[bytes]
         else:
             next_scroll_token = None
 
         return {
             'scroll_token': next_scroll_token,
             'results': [
                 {
                     # TODO: also add 'id'?
                     'url': hit['_source']['url'],
                 }
                 for hit in hits
             ]
         }
diff --git a/swh/search/in_memory.py b/swh/search/in_memory.py
index 0b3418d..421729c 100644
--- a/swh/search/in_memory.py
+++ b/swh/search/in_memory.py
@@ -1,122 +1,123 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import base64
 from collections import defaultdict
 import itertools
 import re
 
-from typing import Iterable, Dict
+from typing import Any, Dict, Iterable, Iterator, List, Optional
 
 import msgpack
 
 from swh.core.api import remote_api_endpoint
 from swh.model.identifiers import origin_identifier
 
 
 def _sanitize_origin(origin):
     origin = origin.copy()
     res = {
         'url': origin.pop('url')
     }
     for field_name in ('type', 'intrinsic_metadata'):
         if field_name in origin:
             res[field_name] = origin.pop(field_name)
     return res
 
 
 class InMemorySearch:
     def __init__(self):
         pass
 
     @remote_api_endpoint('check')
     def check(self):
         return True
 
     def deinitialize(self) -> None:
         if hasattr(self, '_origins'):
             del self._origins
             del self._origin_ids
 
     def initialize(self) -> None:
-        self._origins = defaultdict(dict)
-        self._origin_ids = []
+        self._origins = defaultdict(dict)  # type: Dict[str, Dict[str, Any]]
+        self._origin_ids = []  # type: List[str]
 
     _url_splitter = re.compile(r'\W')
 
     @remote_api_endpoint('origin/update')
     def origin_update(self, documents: Iterable[dict]) -> None:
         for document in documents:
             document = document.copy()
             id_ = origin_identifier(document)
             if 'url' in document:
                 document['_url_tokens'] = \
                     set(self._url_splitter.split(document['url']))
             self._origins[id_].update(document)
             if id_ not in self._origin_ids:
                 self._origin_ids.append(id_)
 
     @remote_api_endpoint('origin/search')
     def origin_search(
             self, *,
             url_pattern: str = None, metadata_pattern: str = None,
             with_visit: bool = False,
             scroll_token: str = None, count: int = 50
             ) -> Dict[str, object]:
-        matches = (self._origins[id_] for id_ in self._origin_ids)
+        matches = (self._origins[id_] for id_ in self._origin_ids)  # type: Iterator[Dict[str, Any]]
 
         if url_pattern:
             tokens = set(self._url_splitter.split(url_pattern))
 
             def predicate(match):
                 missing_tokens = tokens - match['_url_tokens']
                 if len(missing_tokens) == 0:
                     return True
                 elif len(missing_tokens) > 1:
                     return False
                 else:
                     # There is one missing token, look up by prefix.
                     (missing_token,) = missing_tokens
                     return any(token.startswith(missing_token)
                                for token in match['_url_tokens'])
 
             matches = filter(predicate, matches)
 
         if metadata_pattern:
             raise NotImplementedError(
                 'Metadata search is not implemented in the in-memory backend.')
 
         if not url_pattern and not metadata_pattern:
             raise ValueError(
                 'At least one of url_pattern and metadata_pattern '
                 'must be provided.')
 
         if with_visit:
             matches = filter(lambda o: o.get('has_visits'), matches)
 
         if scroll_token:
-            scroll_token = msgpack.loads(base64.b64decode(scroll_token))
-            start_at_index = scroll_token[b'start_at_index']
+            scroll_token_content = msgpack.loads(
+                base64.b64decode(scroll_token))
+            start_at_index = scroll_token_content[b'start_at_index']
         else:
             start_at_index = 0
 
         hits = list(itertools.islice(
             matches, start_at_index, start_at_index+count))
 
         if len(hits) == count:
-            next_scroll_token = {
+            next_scroll_token_content = {
                 b'start_at_index': start_at_index+count,
             }
             next_scroll_token = base64.b64encode(msgpack.dumps(
-                next_scroll_token))
+                next_scroll_token_content))  # type: Optional[bytes]
         else:
             next_scroll_token = None
 
         return {
             'scroll_token': next_scroll_token,
             'results': [
                 {'url': hit['url']}
                 for hit in hits
             ]
         }
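
The diff switches several variables to comment-style type annotations, presumably so mypy can check them on Python versions without variable annotation syntax, and gives the encoded and decoded token forms separate names so each keeps a single type. A minimal sketch of that pattern, with made-up values rather than anything from the Software Heritage code:

from typing import Any, Dict, List, Optional

# An empty container is typed with a comment so mypy knows what it will hold.
query_clauses = []  # type: List[Dict[str, Any]]
query_clauses.append({'term': {'has_visits': True}})

# A variable that is sometimes bytes and sometimes None is declared Optional
# once; later assignments must stay compatible with that type.
next_scroll_token = None  # type: Optional[bytes]
next_scroll_token = b'opaque-token'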
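
Both backends hand back pagination state as an opaque scroll_token: a msgpack-serialized dict, base64-encoded so it survives transport as a plain string. A small round-trip sketch using the in-memory backend's token shape (the start_at_index value below is made up):

import base64

import msgpack

# Pack the cursor the same way in_memory.py builds next_scroll_token ...
token = base64.b64encode(msgpack.dumps({b'start_at_index': 50}))

# ... and decode it the way origin_search() does when the token comes back.
content = msgpack.loads(base64.b64decode(token))
assert content[b'start_at_index'] == 50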
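
An end-to-end usage sketch of the paginated origin_search API: keep calling it with the scroll_token from the previous page until it comes back as None. The Elasticsearch host and the URL pattern below are assumptions for illustration, not part of the change:

from swh.search.elasticsearch import ElasticSearch

search = ElasticSearch(hosts=['localhost'])  # hypothetical local ES node

scroll_token = None
while True:
    page = search.origin_search(
        url_pattern='github.com/python',  # example pattern
        with_visit=True,
        scroll_token=scroll_token,
        count=50,
    )
    for result in page['results']:
        print(result['url'])
    scroll_token = page['scroll_token']
    if scroll_token is None:
        # Fewer than `count` hits were returned, so this was the last page.
        break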