Changeset View
Changeset View
Standalone View
Standalone View
swh/search/elasticsearch.py
# Copyright (C) 2019-2020 The Software Heritage developers | # Copyright (C) 2019-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import base64 | import base64 | ||||
import msgpack | import msgpack | ||||
from typing import Any, Iterable, Dict, List, Iterator, Optional | from typing import Any, Iterable, Dict, List, Iterator, Optional | ||||
from elasticsearch import Elasticsearch | from elasticsearch import Elasticsearch | ||||
from elasticsearch.helpers import bulk, scan | from elasticsearch.helpers import bulk, scan | ||||
from swh.core.api import remote_api_endpoint | |||||
from swh.model.identifiers import origin_identifier | from swh.model.identifiers import origin_identifier | ||||
from swh.model import model | from swh.model import model | ||||
from swh.search.interface import PagedResult | from swh.search.interface import PagedResult | ||||
def _sanitize_origin(origin): | def _sanitize_origin(origin): | ||||
origin = origin.copy() | origin = origin.copy() | ||||
Show All 18 Lines | def token_decode(page_token: str) -> Dict[bytes, Any]: | ||||
""" | """ | ||||
return msgpack.loads(base64.b64decode(page_token.encode()), raw=True) | return msgpack.loads(base64.b64decode(page_token.encode()), raw=True) | ||||
class ElasticSearch: | class ElasticSearch: | ||||
def __init__(self, hosts: List[str]): | def __init__(self, hosts: List[str]): | ||||
self._backend = Elasticsearch(hosts=hosts) | self._backend = Elasticsearch(hosts=hosts) | ||||
@remote_api_endpoint("check") | |||||
def check(self): | def check(self): | ||||
return self._backend.ping() | return self._backend.ping() | ||||
def deinitialize(self) -> None: | def deinitialize(self) -> None: | ||||
"""Removes all indices from the Elasticsearch backend""" | """Removes all indices from the Elasticsearch backend""" | ||||
self._backend.indices.delete(index="*") | self._backend.indices.delete(index="*") | ||||
def initialize(self) -> None: | def initialize(self) -> None: | ||||
Show All 26 Lines | def initialize(self) -> None: | ||||
"type": "keyword", | "type": "keyword", | ||||
} | } | ||||
}, | }, | ||||
}, | }, | ||||
} | } | ||||
}, | }, | ||||
) | ) | ||||
@remote_api_endpoint("flush") | |||||
def flush(self) -> None: | def flush(self) -> None: | ||||
"""Blocks until all previous calls to _update() are completely | |||||
applied.""" | |||||
self._backend.indices.refresh(index="_all") | self._backend.indices.refresh(index="_all") | ||||
@remote_api_endpoint("origin/update") | def origin_update(self, documents: Iterable[Dict]) -> None: | ||||
def origin_update(self, documents: Iterable[dict]) -> None: | |||||
documents = map(_sanitize_origin, documents) | documents = map(_sanitize_origin, documents) | ||||
documents_with_sha1 = ( | documents_with_sha1 = ( | ||||
(origin_identifier(document), document) for document in documents | (origin_identifier(document), document) for document in documents | ||||
) | ) | ||||
actions = [ | actions = [ | ||||
{ | { | ||||
"_op_type": "update", | "_op_type": "update", | ||||
"_id": sha1, | "_id": sha1, | ||||
"_index": "origin", | "_index": "origin", | ||||
"doc": {**document, "sha1": sha1,}, | "doc": {**document, "sha1": sha1,}, | ||||
"doc_as_upsert": True, | "doc_as_upsert": True, | ||||
} | } | ||||
for (sha1, document) in documents_with_sha1 | for (sha1, document) in documents_with_sha1 | ||||
] | ] | ||||
bulk(self._backend, actions, index="origin") | bulk(self._backend, actions, index="origin") | ||||
def origin_dump(self) -> Iterator[model.Origin]: | def origin_dump(self) -> Iterator[model.Origin]: | ||||
"""Returns all content in Elasticsearch's index. Not exposed | |||||
publicly, but useful for tests.""" | |||||
results = scan(self._backend, index="*") | results = scan(self._backend, index="*") | ||||
for hit in results: | for hit in results: | ||||
yield self._backend.termvectors(index="origin", id=hit["_id"], fields=["*"]) | yield self._backend.termvectors(index="origin", id=hit["_id"], fields=["*"]) | ||||
@remote_api_endpoint("origin/search") | |||||
def origin_search( | def origin_search( | ||||
self, | self, | ||||
*, | *, | ||||
url_pattern: Optional[str] = None, | url_pattern: Optional[str] = None, | ||||
metadata_pattern: str = None, | metadata_pattern: Optional[str] = None, | ||||
with_visit: bool = False, | with_visit: bool = False, | ||||
page_token: Optional[str] = None, | page_token: Optional[str] = None, | ||||
limit: int = 50, | limit: int = 50, | ||||
) -> PagedResult[Dict[str, Any]]: | ) -> PagedResult[Dict[str, Any]]: | ||||
"""Searches for origins matching the `url_pattern`. | |||||
Args: | |||||
url_pattern: Part of the URL to search for | |||||
with_visit: Whether origins with no visit are to be | |||||
filtered out | |||||
page_token: Opaque value used for pagination | |||||
limit: number of results to return | |||||
Returns: | |||||
PagedResult of origin dicts matching the search criteria. If next_page_token | |||||
is None, there is no more data to retrieve. | |||||
""" | |||||
query_clauses: List[Dict[str, Any]] = [] | query_clauses: List[Dict[str, Any]] = [] | ||||
if url_pattern: | if url_pattern: | ||||
query_clauses.append( | query_clauses.append( | ||||
{ | { | ||||
"multi_match": { | "multi_match": { | ||||
"query": url_pattern, | "query": url_pattern, | ||||
"type": "bool_prefix", | "type": "bool_prefix", | ||||
▲ Show 20 Lines • Show All 66 Lines • Show Last 20 Lines |