Changeset View
Changeset View
Standalone View
Standalone View
swh/search/elasticsearch.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import base64 | import base64 | ||||
import msgpack | |||||
from typing import Any, Iterable, Dict, List, Iterator, Optional | from typing import Any, Iterable, Dict, List, Iterator, Optional | ||||
from elasticsearch import Elasticsearch | from elasticsearch import Elasticsearch | ||||
from elasticsearch.helpers import bulk, scan | from elasticsearch.helpers import bulk, scan | ||||
import msgpack | |||||
from swh.core.api import remote_api_endpoint | from swh.core.api import remote_api_endpoint | ||||
from swh.model import model | |||||
from swh.model.identifiers import origin_identifier | from swh.model.identifiers import origin_identifier | ||||
from swh.model import model | |||||
from swh.search.interface import PagedResult | |||||
def _sanitize_origin(origin): | def _sanitize_origin(origin): | ||||
origin = origin.copy() | origin = origin.copy() | ||||
res = {"url": origin.pop("url")} | res = {"url": origin.pop("url")} | ||||
for field_name in ("intrinsic_metadata", "has_visits"): | for field_name in ("intrinsic_metadata", "has_visits"): | ||||
if field_name in origin: | if field_name in origin: | ||||
res[field_name] = origin.pop(field_name) | res[field_name] = origin.pop(field_name) | ||||
return res | return res | ||||
def token_encode(index_to_tokenize: Dict[bytes, Any]) -> str:
    """Tokenize as string an index page result from a search

    The dict is serialized with msgpack, then base64-encoded so the
    resulting page token is safe to hand to clients as plain text.
    """
    serialized = msgpack.dumps(index_to_tokenize)
    return base64.b64encode(serialized).decode()
def token_decode(page_token: str) -> Dict[bytes, Any]:
    """Read the page_token

    Inverse of token_encode: base64-decode the token, then msgpack-load
    it back into a dict (with ``raw=True``, so keys come back as bytes).
    """
    serialized = base64.b64decode(page_token.encode())
    return msgpack.loads(serialized, raw=True)
class ElasticSearch: | class ElasticSearch: | ||||
def __init__(self, hosts: List[str]): | def __init__(self, hosts: List[str]): | ||||
self._backend = Elasticsearch(hosts=hosts) | self._backend = Elasticsearch(hosts=hosts) | ||||
@remote_api_endpoint("check") | @remote_api_endpoint("check") | ||||
def check(self): | def check(self): | ||||
return self._backend.ping() | return self._backend.ping() | ||||
▲ Show 20 Lines • Show All 66 Lines • ▼ Show 20 Lines | def origin_dump(self) -> Iterator[model.Origin]: | ||||
results = scan(self._backend, index="*") | results = scan(self._backend, index="*") | ||||
for hit in results: | for hit in results: | ||||
yield self._backend.termvectors(index="origin", id=hit["_id"], fields=["*"]) | yield self._backend.termvectors(index="origin", id=hit["_id"], fields=["*"]) | ||||
@remote_api_endpoint("origin/search") | @remote_api_endpoint("origin/search") | ||||
def origin_search( | def origin_search( | ||||
self, | self, | ||||
*, | *, | ||||
url_pattern: str = None, | url_pattern: Optional[str] = None, | ||||
Not Done Inline ActionsWhy doesn't mypy detect this? -_- vlorentz: Why doesn't mypy detect this? -_- | |||||
Done Inline ActionsI asked myself the same question ¯\_(ツ)_/¯ ardumont: I asked myself the same question ¯\_(ツ)_/¯
| |||||
metadata_pattern: str = None, | metadata_pattern: str = None, | ||||
with_visit: bool = False, | with_visit: bool = False, | ||||
page_token: str = None, | page_token: Optional[str] = None, | ||||
count: int = 50, | limit: int = 50, | ||||
) -> Dict[str, object]: | ) -> PagedResult[Dict[str, Any]]: | ||||
"""Searches for origins matching the `url_pattern`. | """Searches for origins matching the `url_pattern`. | ||||
Args: | Args: | ||||
url_pattern (str): Part of thr URL to search for | url_pattern: Part of the URL to search for | ||||
with_visit (bool): Whether origins with no visit are to be | with_visit: Whether origins with no visit are to be | ||||
filtered out | filtered out | ||||
page_token (str): Opaque value used for pagination. | page_token: Opaque value used for pagination | ||||
count (int): number of results to return. | limit: number of results to return | ||||
Returns: | Returns: | ||||
a dictionary with keys: | PagedResult of origin dicts matching the search criteria. If next_page_token | ||||
* `next_page_token`: | is None, there is no longer data to retrieve. | ||||
opaque value used for fetching more results. `None` if there | |||||
are no more result. | |||||
* `results`: | |||||
list of dictionaries with key: | |||||
* `url`: URL of a matching origin | |||||
""" | """ | ||||
query_clauses = [] # type: List[Dict[str, Any]] | query_clauses: List[Dict[str, Any]] = [] | ||||
if url_pattern: | if url_pattern: | ||||
query_clauses.append( | query_clauses.append( | ||||
{ | { | ||||
"multi_match": { | "multi_match": { | ||||
"query": url_pattern, | "query": url_pattern, | ||||
"type": "bool_prefix", | "type": "bool_prefix", | ||||
"operator": "and", | "operator": "and", | ||||
Show All 22 Lines | ) -> PagedResult[Dict[str, Any]]: | ||||
} | } | ||||
) | ) | ||||
if not query_clauses: | if not query_clauses: | ||||
raise ValueError( | raise ValueError( | ||||
"At least one of url_pattern and metadata_pattern must be provided." | "At least one of url_pattern and metadata_pattern must be provided." | ||||
) | ) | ||||
next_page_token: Optional[str] = None | |||||
if with_visit: | if with_visit: | ||||
query_clauses.append({"term": {"has_visits": True,}}) | query_clauses.append({"term": {"has_visits": True,}}) | ||||
body = { | body = { | ||||
"query": {"bool": {"must": query_clauses,}}, | "query": {"bool": {"must": query_clauses,}}, | ||||
"size": count, | |||||
Done Inline ActionsThis does not affect the results. ardumont: This does not affect the results.
It does affect the readability because it confuses the read… | |||||
"sort": [{"_score": "desc"}, {"sha1": "asc"},], | "sort": [{"_score": "desc"}, {"sha1": "asc"},], | ||||
} | } | ||||
if page_token: | if page_token: | ||||
# TODO: use ElasticSearch's scroll API? | # TODO: use ElasticSearch's scroll API? | ||||
page_token_content = msgpack.loads(base64.b64decode(page_token), raw=True) | page_token_content = token_decode(page_token) | ||||
body["search_after"] = [ | body["search_after"] = [ | ||||
page_token_content[b"score"], | page_token_content[b"score"], | ||||
page_token_content[b"sha1"].decode("ascii"), | page_token_content[b"sha1"].decode("ascii"), | ||||
] | ] | ||||
res = self._backend.search(index="origin", body=body, size=count,) | res = self._backend.search(index="origin", body=body, size=limit) | ||||
hits = res["hits"]["hits"] | hits = res["hits"]["hits"] | ||||
if len(hits) == count: | if len(hits) == limit: | ||||
last_hit = hits[-1] | last_hit = hits[-1] | ||||
next_page_token_content = { | next_page_token_content = { | ||||
b"score": last_hit["_score"], | b"score": last_hit["_score"], | ||||
b"sha1": last_hit["_source"]["sha1"], | b"sha1": last_hit["_source"]["sha1"], | ||||
} | } | ||||
next_page_token = base64.b64encode( | next_page_token = token_encode(next_page_token_content) | ||||
msgpack.dumps(next_page_token_content) | |||||
) # type: Optional[bytes] | assert len(hits) <= limit | ||||
else: | |||||
next_page_token = None | return PagedResult( | ||||
results=[{"url": hit["_source"]["url"]} for hit in hits], | |||||
return { | next_page_token=next_page_token, | ||||
"next_page_token": next_page_token, | ) | ||||
"results": [ | |||||
{ | |||||
# TODO: also add 'id'? | |||||
"url": hit["_source"]["url"], | |||||
} | |||||
for hit in hits | |||||
], | |||||
} |
I extracted those out because I kept modifying that part, which was buried inside the module.
Now it's at the top, so I no longer need to check back and forth to find it.