diff --git a/swh/search/elasticsearch.py b/swh/search/elasticsearch.py
--- a/swh/search/elasticsearch.py
+++ b/swh/search/elasticsearch.py
@@ -46,14 +46,12 @@
             index='origin',
             body={
                 'properties': {
+                    'sha1': {
+                        'type': 'keyword',
+                        'doc_values': True,
+                    },
                     'url': {
                         'type': 'text',
-                        # TODO: consider removing fielddata when
-                        # swh-storage allows querying by hash, so the
-                        # full URL does not have to be stored in ES'
-                        # memory. See:
-                        # https://www.elastic.co/guide/en/elasticsearch/reference/current/fielddata.html#before-enabling-fielddata
-                        'fielddata': True,
                         # To split URLs into token on any character
                         # that is not alphanumerical
                         'analyzer': 'simple',
@@ -80,22 +78,31 @@
             }
         )
 
+    @remote_api_endpoint('flush')
+    def flush(self) -> None:
+        """Blocks until all previous calls to origin_update() are
+        completely applied."""
+        self._backend.indices.refresh(index='_all')
+
     @remote_api_endpoint('origin/update')
     def origin_update(self, documents: Iterable[dict]) -> None:
         documents = map(_sanitize_origin, documents)
+        documents_with_sha1 = ((origin_identifier(document), document)
+                               for document in documents)
         actions = [
             {
                 '_op_type': 'update',
-                '_id': origin_identifier(document),
+                '_id': sha1,
                 '_index': 'origin',
-                'doc': document,
+                'doc': {
+                    **document,
+                    'sha1': sha1,
+                },
                 'doc_as_upsert': True,
             }
-            for document in documents
+            for (sha1, document) in documents_with_sha1
         ]
-        # TODO: make refresh='wait_for' configurable (we don't need it
-        # in production, it will probably be a performance issue)
-        bulk(self._backend, actions, index='origin', refresh='wait_for')
+        bulk(self._backend, actions, index='origin')
 
     def origin_dump(self) -> Iterator[model.Origin]:
         """Returns all content in Elasticsearch's index. Not exposed
@@ -180,7 +187,7 @@
             'size': count,
             'sort': [
                 {'_score': 'desc'},
-                {'_id': 'asc'},
+                {'sha1': 'asc'},
             ]
         }
         if page_token:
@@ -189,7 +196,7 @@
                 base64.b64decode(page_token))
             body['search_after'] = \
                 [page_token_content[b'score'],
-                 page_token_content[b'id'].decode('ascii')]
+                 page_token_content[b'sha1'].decode('ascii')]
 
         res = self._backend.search(
             index='origin',
@@ -203,7 +210,7 @@
             last_hit = hits[-1]
             next_page_token_content = {
                 b'score': last_hit['_score'],
-                b'id': last_hit['_id'],
+                b'sha1': last_hit['_source']['sha1'],
             }
             next_page_token = base64.b64encode(msgpack.dumps(
                 next_page_token_content))  # type: Optional[bytes]
diff --git a/swh/search/in_memory.py b/swh/search/in_memory.py
--- a/swh/search/in_memory.py
+++ b/swh/search/in_memory.py
@@ -43,6 +43,9 @@
         self._origins = defaultdict(dict)  # type: Dict[str, Dict[str, Any]]
         self._origin_ids = []  # type: List[str]
 
+    def flush(self) -> None:
+        pass
+
     _url_splitter = re.compile(r'\W')
 
     @remote_api_endpoint('origin/update')
diff --git a/swh/search/tests/test_cli.py b/swh/search/tests/test_cli.py
--- a/swh/search/tests/test_cli.py
+++ b/swh/search/tests/test_cli.py
@@ -85,6 +85,8 @@
         assert result.exit_code == 0, result.output
         assert result.output == expected_output
 
+        self.search.flush()
+
         results = self.search.origin_search(url_pattern='foobar')
         assert results == {'next_page_token': None, 'results': [
             {'url': 'http://foobar.baz'}]}
@@ -123,6 +125,8 @@
         assert result.exit_code == 0, result.output
         assert result.output == expected_output
 
+        self.search.flush()
+
         results = self.search.origin_search(url_pattern='foobar',
                                             with_visit=True)
         assert results == {'next_page_token': None, 'results': [
diff --git a/swh/search/tests/test_search.py b/swh/search/tests/test_search.py
--- a/swh/search/tests/test_search.py
+++ b/swh/search/tests/test_search.py
@@ -15,6 +15,7 @@
             {'url': 'http://barbaz.qux'},
             {'url': 'http://qux.quux'},
         ])
+        self.search.flush()
 
         results = self.search.origin_search(url_pattern='foobar')
         assert results == {'next_page_token': None, 'results': [
@@ -39,6 +40,7 @@
             {'url': 'http://barbaz.qux'},
             {'url': 'http://qux.quux'},
         ])
+        self.search.flush()
 
         results = self.search.origin_search(url_pattern='qu')
         assert results['next_page_token'] is None
@@ -58,6 +60,7 @@
         self.search.origin_update([
             {'url': 'http://foobar.baz', 'has_visits': True},
         ])
+        self.search.flush()
 
         results = self.search.origin_search(
             url_pattern='foobar', with_visit=True)
@@ -68,6 +71,7 @@
         self.search.origin_update([
             {'url': 'http://foobar.baz'},
         ])
+        self.search.flush()
 
         results = self.search.origin_search(
             url_pattern='foobar', with_visit=True)
@@ -76,6 +80,7 @@
         self.search.origin_update([
             {'url': 'http://foobar.baz', 'has_visits': True},
         ])
+        self.search.flush()
 
         results = self.search.origin_search(
             url_pattern='foobar', with_visit=True)
@@ -103,6 +108,7 @@
                 }
             },
         ])
+        self.search.flush()
 
         results = self.search.origin_search(metadata_pattern='foo')
         assert results == {'next_page_token': None, 'results': [
@@ -138,6 +144,7 @@
                 }
             },
         ])
+        self.search.flush()
 
         results = self.search.origin_search(metadata_pattern='foo')
         assert results == {'next_page_token': None, 'results': [
@@ -165,6 +172,7 @@
             {'url': 'http://origin2/foo/bar'},
             {'url': 'http://origin3/foo/bar/baz'},
         ])
+        self.search.flush()
 
         results = stream_results(
             self.search.origin_search,
@@ -227,6 +235,7 @@
                 }
             },
         ])
+        self.search.flush()
 
         results = stream_results(
             self.search.origin_search,
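
Note on the pagination change above: the page token now carries the stored
'sha1' field instead of Elasticsearch's internal '_id', so search_after can
sort on a keyword field with doc_values rather than relying on fielddata.
The following is a minimal, self-contained sketch of the token round trip
using only names that appear in the diff; the sha1 value is invented for
illustration, and raw=True is passed to msgpack.loads explicitly so string
values come back as bytes, matching the .decode('ascii') in the patched code
(raw=True was the default in msgpack versions contemporary with this patch).

import base64

import msgpack

# Made-up hit; in the real code the sha1 comes from origin_identifier()
# and is stored in the new 'sha1' keyword field.
last_hit = {
    '_score': 1.0,
    '_source': {'sha1': '125b861f5cdb9e46bbd35a413f3e5aaba380cf5c'},
}

# Encoding, as when origin_search builds next_page_token:
next_page_token = base64.b64encode(msgpack.dumps({
    b'score': last_hit['_score'],
    b'sha1': last_hit['_source']['sha1'],
}))

# Decoding, as when a client sends page_token back in; raw=True makes
# msgpack return strings as bytes, so .decode('ascii') applies.
page_token_content = msgpack.loads(
    base64.b64decode(next_page_token), raw=True)
search_after = [page_token_content[b'score'],
                page_token_content[b'sha1'].decode('ascii')]
assert search_after == [1.0, last_hit['_source']['sha1']]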