diff --git a/swh/search/elasticsearch.py b/swh/search/elasticsearch.py
index 8c5874c..9aef7ae 100644
--- a/swh/search/elasticsearch.py
+++ b/swh/search/elasticsearch.py
@@ -1,207 +1,208 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import base64
 from typing import Iterable, Dict, List, Iterator

 from elasticsearch import Elasticsearch
 from elasticsearch.helpers import bulk, scan
 import msgpack

 from swh.core.api import remote_api_endpoint
 from swh.model import model
 from swh.model.identifiers import origin_identifier


 def _sanitize_origin(origin):
     origin = origin.copy()
     res = {
         'url': origin.pop('url')
     }
     for field_name in ('intrinsic_metadata',):
         if field_name in origin:
             res[field_name] = origin.pop(field_name)
     return res


 class ElasticSearch:
     def __init__(self, hosts: List[str]):
         self._backend = Elasticsearch(hosts=hosts)

     @remote_api_endpoint('check')
     def check(self):
         return self._backend.ping()

     def deinitialize(self) -> None:
         """Removes all indices from the Elasticsearch backend"""
         self._backend.indices.delete(index='*')

     def initialize(self) -> None:
         """Declare Elasticsearch indices and mappings"""
         self._backend.indices.create(
             index='origin',
             body={
                 'mappings': {
                     'properties': {
                         'url': {
                             'type': 'text',
                             # TODO: consider removing fielddata when
                             # swh-storage allows querying by hash, so the
                             # full URL does not have to be stored in ES'
                             # memory. See:
                             # https://www.elastic.co/guide/en/elasticsearch/reference/current/fielddata.html#before-enabling-fielddata
                             'fielddata': True,
                             # To split URLs into tokens on any character
                             # that is not alphanumerical
                             'analyzer': 'simple',
                             'fields': {
                                 'as_you_type': {
                                     'type': 'search_as_you_type',
                                     'analyzer': 'simple',
                                 }
                             }
                         },
                         'intrinsic_metadata': {
                             'type': 'nested',
                             'properties': {
                                 '@context': {
                                     # don't bother indexing tokens
                                     'type': 'keyword',
                                 }
                             },
                         },
                     }
                 }
             }
         )

     @remote_api_endpoint('origin/update')
     def origin_update(self, documents: Iterable[dict]) -> None:
         documents = map(_sanitize_origin, documents)
         actions = [
             {
                 '_op_type': 'update',
                 '_id': origin_identifier(document),
                 '_index': 'origin',
                 'doc': document,
                 'doc_as_upsert': True,
             }
             for document in documents
         ]
         # TODO: make refresh='wait_for' configurable (we don't need it
         # in production, it will probably be a performance issue)
         bulk(self._backend, actions, index='origin', refresh='wait_for')

     def origin_dump(self) -> Iterator[model.Origin]:
         """Returns all content in Elasticsearch's index. Not exposed
         publicly, but useful for tests."""
         results = scan(self._backend, index='*')
         for hit in results:
             yield self._backend.termvectors(
                 index='origin', id=hit['_id'],
                 fields=['*'])

     @remote_api_endpoint('origin/search')
     def origin_search(
             self, *,
             url_pattern: str = None, metadata_pattern: str = None,
-            cursor: str = None, count: int = 50
+            scroll_token: str = None, count: int = 50
             ) -> Dict[str, object]:
         """Searches for origins matching the `url_pattern`.

         Args:
             url_pattern (str): Part of the URL to search for
-            cursor (str): `cursor` is opaque value used for pagination.
+            scroll_token (str): `scroll_token` is an opaque value used for
+                pagination.
             count (int): number of results to return.

         Returns:
             a dictionary with keys:
-            * `cursor`:
+            * `scroll_token`:
               opaque value used for fetching more results. `None` if there
               are no more results.
             * `results`:
               list of dictionaries with key:
               * `url`: URL of a matching origin
         """
-        # TODO: find a better name for "cursor"
         query_clauses = []

         if url_pattern:
             query_clauses.append({
                 'multi_match': {
                     'query': url_pattern,
                     'type': 'bool_prefix',
                     'fields': [
                         'url.as_you_type',
                         'url.as_you_type._2gram',
                         'url.as_you_type._3gram',
                     ]
                 }
             })

         if metadata_pattern:
             query_clauses.append({
                 'nested': {
                     'path': 'intrinsic_metadata',
                     'query': {
                         'multi_match': {
                             'query': metadata_pattern,
                             'fields': ['intrinsic_metadata.*']
                         }
                     },
                 }
             })

         if not query_clauses:
             raise ValueError(
                 'At least one of url_pattern and metadata_pattern '
                 'must be provided.')

         body = {
             'query': {
                 'bool': {
                     'should': query_clauses,
                     # TODO: must?
                 }
             },
             'size': count,
             'sort': [
                 {'_score': 'desc'},
                 {'_id': 'asc'},
             ]
         }
-        if cursor:
+        if scroll_token:
             # TODO: use ElasticSearch's scroll API?
-            cursor = msgpack.loads(base64.b64decode(cursor))
+            scroll_token = msgpack.loads(base64.b64decode(scroll_token))
             body['search_after'] = \
-                [cursor[b'score'], cursor[b'id'].decode('ascii')]
+                [scroll_token[b'score'], scroll_token[b'id'].decode('ascii')]

         res = self._backend.search(
             index='origin',
             body=body,
             size=count,
         )

         hits = res['hits']['hits']

         if len(hits) == count:
             last_hit = hits[-1]
-            next_cursor = {
+            next_scroll_token = {
                 b'score': last_hit['_score'],
                 b'id': last_hit['_id'],
             }
-            next_cursor = base64.b64encode(msgpack.dumps(next_cursor))
+            next_scroll_token = base64.b64encode(msgpack.dumps(
+                next_scroll_token))
         else:
-            next_cursor = None
+            next_scroll_token = None

         return {
-            'cursor': next_cursor,
+            'scroll_token': next_scroll_token,
             'results': [
                 {
                     # TODO: also add 'id'?
                     'url': hit['_source']['url'],
                 }
                 for hit in hits
             ]
         }
diff --git a/swh/search/in_memory.py b/swh/search/in_memory.py
index 8eae987..4592457 100644
--- a/swh/search/in_memory.py
+++ b/swh/search/in_memory.py
@@ -1,117 +1,118 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import base64
 from collections import defaultdict
 import itertools
 import re
 from typing import Iterable, Dict

 import msgpack

 from swh.core.api import remote_api_endpoint
 from swh.model.identifiers import origin_identifier


 def _sanitize_origin(origin):
     origin = origin.copy()
     res = {
         'url': origin.pop('url')
     }
     for field_name in ('type', 'intrinsic_metadata'):
         if field_name in origin:
             res[field_name] = origin.pop(field_name)
     return res


 class InMemorySearch:
     def __init__(self):
         pass

     @remote_api_endpoint('check')
     def check(self):
         return True

     def deinitialize(self) -> None:
         if hasattr(self, '_origins'):
             del self._origins
             del self._origin_ids

     def initialize(self) -> None:
         self._origins = defaultdict(dict)
         self._origin_ids = []

     _url_splitter = re.compile(r'\W')

     @remote_api_endpoint('origin/update')
     def origin_update(self, documents: Iterable[dict]) -> None:
         for document in documents:
             document = document.copy()
             id_ = origin_identifier(document)
             if 'url' in document:
                 document['_url_tokens'] = \
                     set(self._url_splitter.split(document['url']))
             self._origins[id_].update(document)
             if id_ not in self._origin_ids:
                 self._origin_ids.append(id_)

     @remote_api_endpoint('origin/search')
     def origin_search(
             self, *,
             url_pattern: str = None, metadata_pattern: str = None,
-            cursor: str = None, count: int = 50
+            scroll_token: str = None, count: int = 50
             ) -> Dict[str, object]:
         matches = (self._origins[id_] for id_ in self._origin_ids)

         if url_pattern:
             tokens = set(self._url_splitter.split(url_pattern))

             def predicate(match):
                 missing_tokens = tokens - match['_url_tokens']
                 if len(missing_tokens) == 0:
                     return True
                 elif len(missing_tokens) > 1:
                     return False
                 else:
                     # There is one missing token, look up by prefix.
                     (missing_token,) = missing_tokens
                     return any(token.startswith(missing_token)
                                for token in match['_url_tokens'])

             matches = filter(predicate, matches)

         if metadata_pattern:
             raise NotImplementedError(
                 'Metadata search is not implemented in the in-memory '
                 'backend.')

         if not url_pattern and not metadata_pattern:
             raise ValueError(
                 'At least one of url_pattern and metadata_pattern '
                 'must be provided.')

-        if cursor:
-            cursor = msgpack.loads(base64.b64decode(cursor))
-            start_at_index = cursor[b'start_at_index']
+        if scroll_token:
+            scroll_token = msgpack.loads(base64.b64decode(scroll_token))
+            start_at_index = scroll_token[b'start_at_index']
         else:
             start_at_index = 0

         hits = list(itertools.islice(
             matches, start_at_index, start_at_index+count))

         if len(hits) == count:
-            next_cursor = {
+            next_scroll_token = {
                 b'start_at_index': start_at_index+count,
             }
-            next_cursor = base64.b64encode(msgpack.dumps(next_cursor))
+            next_scroll_token = base64.b64encode(msgpack.dumps(
+                next_scroll_token))
         else:
-            next_cursor = None
+            next_scroll_token = None

         return {
-            'cursor': next_cursor,
+            'scroll_token': next_scroll_token,
             'results': [
                 {'url': hit['url']}
                 for hit in hits
             ]
         }
diff --git a/swh/search/tests/test_cli.py b/swh/search/tests/test_cli.py
index e7cf7f7..15bd576 100644
--- a/swh/search/tests/test_cli.py
+++ b/swh/search/tests/test_cli.py
@@ -1,86 +1,86 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import tempfile
 from unittest.mock import patch, MagicMock

 from click.testing import CliRunner

 from swh.journal.serializers import value_to_kafka

 from swh.search.cli import cli
 from .test_elasticsearch import BaseElasticsearchTest


 CLI_CONFIG = '''
 search:
     cls: elasticsearch
     args:
         hosts:
         - '{elasticsearch_host}'
 '''

 JOURNAL_OBJECTS_CONFIG = '''
 journal:
     brokers:
         - 192.0.2.1
     prefix: swh.journal.objects
     group_id: test-consumer
 '''


 def invoke(catch_exceptions, args, config='', *, elasticsearch_host):
     runner = CliRunner()
     with tempfile.NamedTemporaryFile('a', suffix='.yml') as config_fd:
         config_fd.write((CLI_CONFIG + config).format(
             elasticsearch_host=elasticsearch_host
         ))
         config_fd.seek(0)
         result = runner.invoke(cli, ['-C' + config_fd.name] + args)
     if not catch_exceptions and result.exception:
         print(result.output)
         raise result.exception
     return result


 class CliTestCase(BaseElasticsearchTest):
     def test__journal_client__origin(self):
         """Tests the re-indexing when origin_batch_size*task_batch_size is a
         divisor of nb_origins."""
         mock_consumer = MagicMock()

         topic = 'swh.journal.objects.origin'
         value = value_to_kafka({
             'url': 'http://foobar.baz',
         })
         message = MagicMock()
         message.error.return_value = None
         message.topic.return_value = topic
         message.value.return_value = value
         mock_consumer.poll.return_value = message

         with patch('swh.journal.client.Consumer',
                    return_value=mock_consumer):
             result = invoke(False, [
                 'journal-client', 'objects',
                 '--max-messages', '1',
             ], JOURNAL_OBJECTS_CONFIG,
                 elasticsearch_host=self._elasticsearch_host)

         mock_consumer.subscribe.assert_called_once_with(topics=[topic])
         mock_consumer.poll.assert_called_once_with(timeout=1.0)
         mock_consumer.commit.assert_called_once_with()

         # Check the output
         expected_output = (
             'Processed 1 messages.\n'
             'Done.\n'
         )
         assert result.exit_code == 0, result.output
         assert result.output == expected_output

         results = self.search.origin_search(url_pattern='foobar')
-        assert results == {'cursor': None, 'results': [
+        assert results == {'scroll_token': None, 'results': [
             {'url': 'http://foobar.baz'}]}
diff --git a/swh/search/tests/test_search.py b/swh/search/tests/test_search.py
index e25f03e..4c4e2e9 100644
--- a/swh/search/tests/test_search.py
+++ b/swh/search/tests/test_search.py
@@ -1,225 +1,225 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from hypothesis import given, strategies, settings

 from swh.search.utils import stream_results


 class CommonSearchTest:
     def test_origin_url_unique_word_prefix(self):
         self.search.origin_update([
             {'url': 'http://foobar.baz'},
             {'url': 'http://barbaz.qux'},
             {'url': 'http://qux.quux'},
         ])

         results = self.search.origin_search(url_pattern='foobar')
-        assert results == {'cursor': None, 'results': [
+        assert results == {'scroll_token': None, 'results': [
             {'url': 'http://foobar.baz'}]}

         results = self.search.origin_search(url_pattern='barb')
-        assert results == {'cursor': None, 'results': [
+        assert results == {'scroll_token': None, 'results': [
             {'url': 'http://barbaz.qux'}]}

         # 'bar' is part of 'foobar', but is not the beginning of it
         results = self.search.origin_search(url_pattern='bar')
-        assert results == {'cursor': None, 'results': [
+        assert results == {'scroll_token': None, 'results': [
             {'url': 'http://barbaz.qux'}]}

         results = self.search.origin_search(url_pattern='barbaz')
-        assert results == {'cursor': None, 'results': [
+        assert results == {'scroll_token': None, 'results': [
             {'url': 'http://barbaz.qux'}]}

     def test_origin_url_unique_word_prefix_multiple_results(self):
         self.search.origin_update([
             {'url': 'http://foobar.baz'},
             {'url': 'http://barbaz.qux'},
             {'url': 'http://qux.quux'},
         ])

         results = self.search.origin_search(url_pattern='qu')
-        assert results['cursor'] is None
+        assert results['scroll_token'] is None
         results = [res['url'] for res in results['results']]
         expected_results = ['http://qux.quux', 'http://barbaz.qux']
         assert sorted(results) == sorted(expected_results)

         results = self.search.origin_search(url_pattern='qux')
-        assert results['cursor'] is None
+        assert results['scroll_token'] is None
         results = [res['url'] for res in results['results']]
         expected_results = ['http://barbaz.qux', 'http://qux.quux']
         assert sorted(results) == sorted(expected_results)

     def test_origin_intrinsic_metadata_description(self):
         self.search.origin_update([
             {
                 'url': 'http://origin1',
                 'intrinsic_metadata': {},
             },
             {
                 'url': 'http://origin2',
                 'intrinsic_metadata': {
                     '@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
                     'description': 'foo bar',
                 },
             },
             {
                 'url': 'http://origin3',
                 'intrinsic_metadata': {
                     '@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
                     'description': 'bar baz',
                 }
             },
         ])

         results = self.search.origin_search(metadata_pattern='foo')
-        assert results == {'cursor': None, 'results': [
+        assert results == {'scroll_token': None, 'results': [
             {'url': 'http://origin2'}]}

         # ES returns both results, because 'bar' appears in both descriptions
         results = self.search.origin_search(metadata_pattern='foo bar')
-        assert results == {'cursor': None, 'results': [
+        assert results == {'scroll_token': None, 'results': [
             {'url': 'http://origin2'}, {'url': 'http://origin3'}]}

         results = self.search.origin_search(metadata_pattern='bar baz')
-        assert results == {'cursor': None, 'results': [
+        assert results == {'scroll_token': None, 'results': [
             {'url': 'http://origin3'}, {'url': 'http://origin2'}]}

     def test_origin_intrinsic_metadata_nested(self):
         self.search.origin_update([
             {
                 'url': 'http://origin1',
                 'intrinsic_metadata': {},
             },
             {
                 'url': 'http://origin2',
                 'intrinsic_metadata': {
                     '@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
                     'keywords': ['foo', 'bar'],
                 },
             },
             {
                 'url': 'http://origin3',
                 'intrinsic_metadata': {
                     '@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
                     'keywords': ['bar', 'baz'],
                 }
             },
         ])

         results = self.search.origin_search(metadata_pattern='foo')
-        assert results == {'cursor': None, 'results': [
+        assert results == {'scroll_token': None, 'results': [
             {'url': 'http://origin2'}]}

         results = self.search.origin_search(metadata_pattern='foo bar')
-        assert results == {'cursor': None, 'results': [
+        assert results == {'scroll_token': None, 'results': [
             {'url': 'http://origin2'}, {'url': 'http://origin3'}]}

         results = self.search.origin_search(metadata_pattern='bar baz')
-        assert results == {'cursor': None, 'results': [
+        assert results == {'scroll_token': None, 'results': [
             {'url': 'http://origin3'}, {'url': 'http://origin2'}]}

     # TODO: add more tests with more codemeta terms

     # TODO: add more tests with edge cases

     @settings(deadline=None)
     @given(strategies.integers(min_value=1, max_value=4))
     def test_origin_url_paging(self, count):
         # TODO: no hypothesis
         self.reset()
         self.search.origin_update([
             {'url': 'http://origin1/foo'},
             {'url': 'http://origin2/foo/bar'},
             {'url': 'http://origin3/foo/bar/baz'},
         ])

         results = stream_results(
             self.search.origin_search,
             url_pattern='foo bar baz', count=count)
         results = [res['url'] for res in results]
         expected_results = [
             'http://origin3/foo/bar/baz',
         ]
         assert sorted(results[0:len(expected_results)]) == \
             sorted(expected_results)

         results = stream_results(
             self.search.origin_search,
             url_pattern='foo bar', count=count)
         expected_results = [
             'http://origin2/foo/bar',
             'http://origin3/foo/bar/baz',
         ]
         results = [res['url'] for res in results]
         assert sorted(results[0:len(expected_results)]) == \
             sorted(expected_results)

         results = stream_results(
             self.search.origin_search,
             url_pattern='foo', count=count)
         expected_results = [
             'http://origin1/foo',
             'http://origin2/foo/bar',
             'http://origin3/foo/bar/baz',
         ]
         results = [res['url'] for res in results]
         assert sorted(results[0:len(expected_results)]) == \
             sorted(expected_results)

     @settings(deadline=None)
     @given(strategies.integers(min_value=1, max_value=4))
     def test_origin_intrinsic_metadata_paging(self, count):
         # TODO: no hypothesis
         self.reset()
         self.search.origin_update([
             {
                 'url': 'http://origin1',
                 'intrinsic_metadata': {
                     '@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
                     'keywords': ['foo'],
                 },
             },
             {
                 'url': 'http://origin2',
                 'intrinsic_metadata': {
                     '@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
                     'keywords': ['foo', 'bar'],
                 },
             },
             {
                 'url': 'http://origin3',
                 'intrinsic_metadata': {
                     '@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
                     'keywords': ['foo', 'bar', 'baz'],
                 }
             },
         ])

         results = stream_results(
             self.search.origin_search,
             metadata_pattern='foo bar baz', count=count)
         assert list(results) == [
             {'url': 'http://origin3'},
             {'url': 'http://origin2'},
             {'url': 'http://origin1'}]

         results = stream_results(
             self.search.origin_search,
             metadata_pattern='foo bar', count=count)
         assert list(results) == [
             {'url': 'http://origin2'},
             {'url': 'http://origin3'},
             {'url': 'http://origin1'}]

         results = stream_results(
             self.search.origin_search,
             metadata_pattern='foo', count=count)
         assert list(results) == [
             {'url': 'http://origin1'},
             {'url': 'http://origin2'},
             {'url': 'http://origin3'}]
diff --git a/swh/search/utils.py b/swh/search/utils.py
index a91c98a..19b6551 100644
--- a/swh/search/utils.py
+++ b/swh/search/utils.py
@@ -1,16 +1,16 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information


 def stream_results(f, *args, **kwargs):
-    if 'cursor' in kwargs:
-        raise TypeError('stream_results has no argument "cursor".')
-    cursor = None
+    if 'scroll_token' in kwargs:
+        raise TypeError('stream_results has no argument "scroll_token".')
+    scroll_token = None
     while True:
-        results = f(*args, cursor=cursor, **kwargs)
+        results = f(*args, scroll_token=scroll_token, **kwargs)
         yield from results['results']
-        cursor = results['cursor']
-        if cursor is None:
+        scroll_token = results['scroll_token']
+        if scroll_token is None:
             break
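
The rename is a client-visible API change only: a caller passes each page's `scroll_token` back into the next `origin_search` call, or lets `stream_results` run that loop for it, until the token comes back as `None`. A minimal usage sketch against the in-memory backend, assuming an installed swh.search with this patch applied (the example origins are illustrative, not part of the patch):

    from swh.search.in_memory import InMemorySearch
    from swh.search.utils import stream_results

    search = InMemorySearch()
    search.initialize()
    search.origin_update([
        {'url': 'http://origin1/foo'},
        {'url': 'http://origin2/foo/bar'},
    ])

    # Manual paging: feed each page's scroll_token into the next call.
    scroll_token = None
    while True:
        page = search.origin_search(url_pattern='foo', count=1,
                                    scroll_token=scroll_token)
        for result in page['results']:
            print(result['url'])
        scroll_token = page['scroll_token']
        if scroll_token is None:  # a page shorter than `count`: done
            break

    # Equivalent, using the helper from swh.search.utils:
    for result in stream_results(search.origin_search,
                                 url_pattern='foo', count=1):
        print(result['url'])

In both backends the token stays opaque to callers: the Elasticsearch backend packs a `search_after` position (the last hit's score and id) into it, while the in-memory backend packs a start index, but either way it is a base64-encoded msgpack blob that clients pass back verbatim.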