Changeset View
Changeset View
Standalone View
Standalone View
swh/search/elasticsearch.py
# Copyright (C) 2019-2020 The Software Heritage developers | # Copyright (C) 2019-2021 The Software Heritage developers | ||||||||||||||||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||||||||||||||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||||||||||||||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||||||||||||||||
import base64 | import base64 | ||||||||||||||||||
from typing import Any, Dict, Iterable, Iterator, List, Optional | from typing import Any, Dict, Iterable, Iterator, List, Optional | ||||||||||||||||||
from elasticsearch import Elasticsearch | from elasticsearch import Elasticsearch | ||||||||||||||||||
from elasticsearch.helpers import bulk, scan | from elasticsearch.helpers import bulk, scan | ||||||||||||||||||
import msgpack | import msgpack | ||||||||||||||||||
from swh.indexer import codemeta | from swh.indexer import codemeta | ||||||||||||||||||
from swh.model import model | from swh.model import model | ||||||||||||||||||
from swh.model.identifiers import origin_identifier | from swh.model.identifiers import origin_identifier | ||||||||||||||||||
from swh.search.interface import PagedResult | from swh.search.interface import PagedResult | ||||||||||||||||||
def _sanitize_origin(origin): | def _sanitize_origin(origin): | ||||||||||||||||||
origin = origin.copy() | origin = origin.copy() | ||||||||||||||||||
# Whitelist fields to be saved in Elasticsearch | # Whitelist fields to be saved in Elasticsearch | ||||||||||||||||||
res = {"url": origin.pop("url")} | res = {"url": origin.pop("url")} | ||||||||||||||||||
for field_name in ("intrinsic_metadata", "has_visits"): | for field_name in ("intrinsic_metadata", "has_visits", "visit_types"): | ||||||||||||||||||
if field_name in origin: | if field_name in origin: | ||||||||||||||||||
res[field_name] = origin.pop(field_name) | res[field_name] = origin.pop(field_name) | ||||||||||||||||||
# Run the JSON-LD expansion algorithm | # Run the JSON-LD expansion algorithm | ||||||||||||||||||
# <https://www.w3.org/TR/json-ld-api/#expansion> | # <https://www.w3.org/TR/json-ld-api/#expansion> | ||||||||||||||||||
# to normalize the Codemeta metadata. | # to normalize the Codemeta metadata. | ||||||||||||||||||
# This is required as Elasticsearch will needs each field to have a consistent | # This is required as Elasticsearch will needs each field to have a consistent | ||||||||||||||||||
# type across documents to be searchable; and non-expanded JSON-LD documents | # type across documents to be searchable; and non-expanded JSON-LD documents | ||||||||||||||||||
▲ Show 20 Lines • Show All 62 Lines • ▼ Show 20 Lines | def initialize(self) -> None: | ||||||||||||||||||
# 2-gram and partial-3-gram search (ie. with the end of the | # 2-gram and partial-3-gram search (ie. with the end of the | ||||||||||||||||||
# third word potentially missing) | # third word potentially missing) | ||||||||||||||||||
"fields": { | "fields": { | ||||||||||||||||||
"as_you_type": { | "as_you_type": { | ||||||||||||||||||
"type": "search_as_you_type", | "type": "search_as_you_type", | ||||||||||||||||||
"analyzer": "simple", | "analyzer": "simple", | ||||||||||||||||||
} | } | ||||||||||||||||||
}, | }, | ||||||||||||||||||
}, | }, | ||||||||||||||||||
vlorentz: should be `keyword`, it has a very small set of possible values and doesn't need full-text… | |||||||||||||||||||
Done Inline Actionsack anlambert: ack | |||||||||||||||||||
"visit_types": {"type": "keyword"}, | |||||||||||||||||||
# used to filter out origins that were never visited | # used to filter out origins that were never visited | ||||||||||||||||||
"has_visits": {"type": "boolean",}, | "has_visits": {"type": "boolean",}, | ||||||||||||||||||
"intrinsic_metadata": { | "intrinsic_metadata": { | ||||||||||||||||||
"type": "nested", | "type": "nested", | ||||||||||||||||||
"properties": { | "properties": { | ||||||||||||||||||
"@context": { | "@context": { | ||||||||||||||||||
# don't bother indexing tokens in these URIs, as the | # don't bother indexing tokens in these URIs, as the | ||||||||||||||||||
# are used as namespaces | # are used as namespaces | ||||||||||||||||||
"type": "keyword", | "type": "keyword", | ||||||||||||||||||
} | } | ||||||||||||||||||
}, | }, | ||||||||||||||||||
}, | }, | ||||||||||||||||||
} | } | ||||||||||||||||||
}, | }, | ||||||||||||||||||
) | ) | ||||||||||||||||||
def flush(self) -> None: | def flush(self) -> None: | ||||||||||||||||||
self._backend.indices.refresh(index=self.origin_index) | self._backend.indices.refresh(index=self.origin_index) | ||||||||||||||||||
def origin_update(self, documents: Iterable[Dict]) -> None: | def origin_update(self, documents: Iterable[Dict]) -> None: | ||||||||||||||||||
documents = map(_sanitize_origin, documents) | documents = map(_sanitize_origin, documents) | ||||||||||||||||||
documents_with_sha1 = ( | documents_with_sha1 = ( | ||||||||||||||||||
(origin_identifier(document), document) for document in documents | (origin_identifier(document), document) for document in documents | ||||||||||||||||||
) | ) | ||||||||||||||||||
update_script = """ | |||||||||||||||||||
for (int i = 0; i < params.visit_types.length; ++i) { | |||||||||||||||||||
if (!ctx._source.visit_types.contains(params.visit_types[i])) { | |||||||||||||||||||
ctx._source.visit_types.add(params.visit_types[i]); | |||||||||||||||||||
} | |||||||||||||||||||
} | |||||||||||||||||||
ctx._source.has_visits = ( | |||||||||||||||||||
ctx._source.getOrDefault("has_visits", false) || params.has_visits | |||||||||||||||||||
Not Done Inline Actions
for readability vlorentz: for readability | |||||||||||||||||||
); | |||||||||||||||||||
""" | |||||||||||||||||||
Not Done Inline Actions
IMO that's easier to read vlorentz: IMO that's easier to read
| |||||||||||||||||||
Done Inline Actionsack, been a a while since I did not write java-ish code. anlambert: ack, been a a while since I did not write java-ish code. | |||||||||||||||||||
Done Inline ActionsCould you add comments? It's not a bit hard to follow. vlorentz: Could you add comments? It's not a bit hard to follow. | |||||||||||||||||||
actions = [ | actions = [ | ||||||||||||||||||
{ | { | ||||||||||||||||||
"_op_type": "update", | "_op_type": "update", | ||||||||||||||||||
"_id": sha1, | "_id": sha1, | ||||||||||||||||||
"_index": self.origin_index, | "_index": self.origin_index, | ||||||||||||||||||
"doc": {**document, "sha1": sha1,}, | "scripted_upsert": True, | ||||||||||||||||||
"doc_as_upsert": True, | "upsert": {**document, "sha1": sha1,}, | ||||||||||||||||||
"script": { | |||||||||||||||||||
"source": update_script, | |||||||||||||||||||
"lang": "painless", | |||||||||||||||||||
"params": { | |||||||||||||||||||
"visit_types": document.get("visit_types", []), | |||||||||||||||||||
"has_visits": document.get("has_visits", False), | |||||||||||||||||||
}, | |||||||||||||||||||
}, | |||||||||||||||||||
} | } | ||||||||||||||||||
for (sha1, document) in documents_with_sha1 | for (sha1, document) in documents_with_sha1 | ||||||||||||||||||
] | ] | ||||||||||||||||||
bulk(self._backend, actions, index=self.origin_index) | bulk(self._backend, actions, index=self.origin_index) | ||||||||||||||||||
def origin_dump(self) -> Iterator[model.Origin]: | def origin_dump(self) -> Iterator[model.Origin]: | ||||||||||||||||||
results = scan(self._backend, index=self.origin_index) | results = scan(self._backend, index=self.origin_index) | ||||||||||||||||||
for hit in results: | for hit in results: | ||||||||||||||||||
yield self._backend.termvectors( | yield self._backend.termvectors( | ||||||||||||||||||
index=self.origin_index, id=hit["_id"], fields=["*"] | index=self.origin_index, id=hit["_id"], fields=["*"] | ||||||||||||||||||
) | ) | ||||||||||||||||||
def origin_search( | def origin_search( | ||||||||||||||||||
self, | self, | ||||||||||||||||||
*, | *, | ||||||||||||||||||
url_pattern: Optional[str] = None, | url_pattern: Optional[str] = None, | ||||||||||||||||||
metadata_pattern: Optional[str] = None, | metadata_pattern: Optional[str] = None, | ||||||||||||||||||
with_visit: bool = False, | with_visit: bool = False, | ||||||||||||||||||
visit_types: Optional[List[str]] = None, | |||||||||||||||||||
page_token: Optional[str] = None, | page_token: Optional[str] = None, | ||||||||||||||||||
limit: int = 50, | limit: int = 50, | ||||||||||||||||||
) -> PagedResult[Dict[str, Any]]: | ) -> PagedResult[Dict[str, Any]]: | ||||||||||||||||||
query_clauses: List[Dict[str, Any]] = [] | query_clauses: List[Dict[str, Any]] = [] | ||||||||||||||||||
if url_pattern: | if url_pattern: | ||||||||||||||||||
query_clauses.append( | query_clauses.append( | ||||||||||||||||||
{ | { | ||||||||||||||||||
Show All 38 Lines | ) -> PagedResult[Dict[str, Any]]: | ||||||||||||||||||
if not query_clauses: | if not query_clauses: | ||||||||||||||||||
raise ValueError( | raise ValueError( | ||||||||||||||||||
"At least one of url_pattern and metadata_pattern must be provided." | "At least one of url_pattern and metadata_pattern must be provided." | ||||||||||||||||||
) | ) | ||||||||||||||||||
if with_visit: | if with_visit: | ||||||||||||||||||
query_clauses.append({"term": {"has_visits": True,}}) | query_clauses.append({"term": {"has_visits": True,}}) | ||||||||||||||||||
if visit_types is not None: | |||||||||||||||||||
query_clauses.append({"terms": {"visit_types": visit_types}}) | |||||||||||||||||||
body = { | body = { | ||||||||||||||||||
"query": {"bool": {"must": query_clauses,}}, | "query": {"bool": {"must": query_clauses,}}, | ||||||||||||||||||
"sort": [{"_score": "desc"}, {"sha1": "asc"},], | "sort": [{"_score": "desc"}, {"sha1": "asc"},], | ||||||||||||||||||
} | } | ||||||||||||||||||
if page_token: | if page_token: | ||||||||||||||||||
# TODO: use ElasticSearch's scroll API? | # TODO: use ElasticSearch's scroll API? | ||||||||||||||||||
page_token_content = token_decode(page_token) | page_token_content = token_decode(page_token) | ||||||||||||||||||
body["search_after"] = [ | body["search_after"] = [ | ||||||||||||||||||
Show All 26 Lines |
should be keyword, it has a very small set of possible values and doesn't need full-text search.