swh/search/elasticsearch.py
Show All 12 Lines
from swh.indexer import codemeta
from swh.model import model
from swh.model.identifiers import origin_identifier
from swh.search.interface import PagedResult


def _sanitize_origin(origin):
    origin = origin.copy()

    # Whitelist fields to be saved in Elasticsearch
    res = {"url": origin.pop("url")}
    for field_name in ("intrinsic_metadata", "has_visits"):
        if field_name in origin:
            res[field_name] = origin.pop(field_name)

    # Run the JSON-LD expansion algorithm
    # <https://www.w3.org/TR/json-ld-api/#expansion>
    # to normalize the Codemeta metadata.
    # This is required, as Elasticsearch needs each field to have a consistent
    # type across documents to be searchable; and non-expanded JSON-LD documents
    # can have various types in the same field. For example, all of these are
    # equivalent in JSON-LD:
    # * {"author": "Jane Doe"}
    # * {"author": ["Jane Doe"]}
    # * {"author": {"@value": "Jane Doe"}}
    # * {"author": [{"@value": "Jane Doe"}]}
    # and JSON-LD expansion will convert them all to the last one.
if "intrinsic_metadata" in res: | if "intrinsic_metadata" in res: | ||||
res["intrinsic_metadata"] = codemeta.expand(res["intrinsic_metadata"]) | res["intrinsic_metadata"] = codemeta.expand(res["intrinsic_metadata"]) | ||||
return res | return res | ||||
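

# Hedged illustration (not part of this changeset) of why the expansion above
# matters: per the JSON-LD expansion algorithm, all four "author" forms listed
# in the comment normalize to the same array-of-objects shape, roughly
#
#     [{"<author property URI>": [{"@value": "Jane Doe"}]}]
#
# so every document indexed in Elasticsearch ends up with a single consistent
# type for that field.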


def token_encode(index_to_tokenize: Dict[bytes, Any]) -> str:
    """Serialize an index page result from a search into an opaque string page token"""
    page_token = base64.b64encode(msgpack.dumps(index_to_tokenize))
Show All 21 Lines | class ElasticSearch:
    def initialize(self) -> None:
        """Declare Elasticsearch indices and mappings"""
        if not self._backend.indices.exists(index="origin"):
            self._backend.indices.create(index="origin")

        self._backend.indices.put_mapping(
            index="origin",
            body={
                "properties": {
                    # sha1 of the URL; used as the document id
                    "sha1": {"type": "keyword", "doc_values": True,},
                    # Used both to search URLs, and as the result to return
                    # as a response to queries
                    "url": {
                        "type": "text",
                        # To split URLs into tokens on any character
                        # that is not alphanumerical
                        "analyzer": "simple",
                        # 2-gram and partial-3-gram search (i.e. with the end of the
                        # third word potentially missing)
"fields": { | "fields": { | ||||
"as_you_type": { | "as_you_type": { | ||||
"type": "search_as_you_type", | "type": "search_as_you_type", | ||||
"analyzer": "simple", | "analyzer": "simple", | ||||
} | } | ||||
}, | }, | ||||
}, | }, | ||||
# used to filter out origins that were never visited | |||||
"has_visits": {"type": "boolean",}, | "has_visits": {"type": "boolean",}, | ||||
"intrinsic_metadata": { | "intrinsic_metadata": { | ||||
"type": "nested", | "type": "nested", | ||||
"properties": { | "properties": { | ||||
"@context": { | "@context": { | ||||
                                # don't bother indexing tokens in these URIs, as they
                                # are used as namespaces
                                "type": "keyword",
                            }
                        },
                    },
                }
            },
        )
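
        # Hedged sketch (not part of this changeset): with the mapping above, a
        # "search as you type" query over the URL field could look roughly like
        # the following raw Elasticsearch body, where the ._2gram / ._3gram
        # subfields are the ones created automatically by the search_as_you_type
        # field type:
        #
        #     {
        #         "query": {
        #             "multi_match": {
        #                 "query": "github.com/swh",
        #                 "type": "bool_prefix",
        #                 "fields": [
        #                     "url.as_you_type",
        #                     "url.as_you_type._2gram",
        #                     "url.as_you_type._3gram",
        #                 ],
        #             }
        #         }
        #     }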
Show All 52 Lines | ) -> PagedResult[Dict[str, Any]]:
        if metadata_pattern:
            query_clauses.append(
                {
                    "nested": {
                        "path": "intrinsic_metadata",
                        "query": {
                            "multi_match": {
                                "query": metadata_pattern,
                                # Makes it so that the "foo bar" query returns
                                # documents which contain "foo" in a field and "bar"
                                # in a different field
                                "type": "cross_fields",
                                # All keywords must be found in a document for it to
                                # be considered a match.
                                # TODO: allow missing keywords?
                                "operator": "and",
                                # Searches on all fields of the intrinsic_metadata
                                # dict, recursively.
                                "fields": ["intrinsic_metadata.*"],
                            }
                        },
                    }
                }
            )
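
        # Hedged illustration (not from the changeset): with "type": "cross_fields"
        # and "operator": "and", a metadata_pattern of "jane compiler" would match
        # an origin whose expanded metadata contains, e.g., "Jane Doe" in an author
        # field and "an optimizing compiler" in a description field, because each
        # term only has to appear in some field. A plain best_fields match with
        # "operator": "and" would instead require both terms in the same field.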

        if not query_clauses:
            raise ValueError(
                "At least one of url_pattern and metadata_pattern must be provided."
            )
        if with_visit:
            query_clauses.append({"term": {"has_visits": True,}})

        body = {
            "query": {"bool": {"must": query_clauses,}},
            "sort": [{"_score": "desc"}, {"sha1": "asc"},],
        }
        if page_token:
            # TODO: use ElasticSearch's scroll API?
            page_token_content = token_decode(page_token)
            body["search_after"] = [
                page_token_content[b"score"],
                page_token_content[b"sha1"].decode("ascii"),
            ]

        res = self._backend.search(index="origin", body=body, size=limit)

        hits = res["hits"]["hits"]

        next_page_token: Optional[str] = None

        if len(hits) == limit:
            # There are more results after this page; return a pagination token
            # to get them in a future query
            last_hit = hits[-1]
            next_page_token_content = {
                b"score": last_hit["_score"],
                b"sha1": last_hit["_source"]["sha1"],
            }
            next_page_token = token_encode(next_page_token_content)

        assert len(hits) <= limit
        return PagedResult(
            results=[{"url": hit["_source"]["url"]} for hit in hits],
            next_page_token=next_page_token,
        )
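
    # Hedged usage sketch (not part of the changeset): callers would page through
    # results by feeding next_page_token back in. This assumes the enclosing method
    # is the origin-search entry point (called origin_search here), that it accepts
    # the url_pattern / page_token / limit parameters used above, and that the
    # class is constructed from a list of Elasticsearch hosts:
    #
    #     search = ElasticSearch(hosts=["localhost:9200"])
    #     page = search.origin_search(url_pattern="github.com/swh", limit=50)
    #     while True:
    #         for result in page.results:
    #             print(result["url"])
    #         if page.next_page_token is None:
    #             break
    #         page = search.origin_search(
    #             url_pattern="github.com/swh",
    #             page_token=page.next_page_token,
    #             limit=50,
    #         )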