diff --git a/requirements.txt b/requirements.txt index 60e006a..12608a1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,6 @@ # Add here external Python modules dependencies, one per line. Module names # should match https://pypi.python.org/pypi names. For the full spec or # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html click elasticsearch>=7.0.0,<8.0.0 +typing-extensions diff --git a/swh/search/elasticsearch.py b/swh/search/elasticsearch.py index 24f74da..a5dd4a3 100644 --- a/swh/search/elasticsearch.py +++ b/swh/search/elasticsearch.py @@ -1,282 +1,282 @@ # Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import base64 from typing import Any, Dict, Iterable, Iterator, List, Optional from elasticsearch import Elasticsearch, helpers import msgpack from swh.indexer import codemeta from swh.model import model from swh.model.identifiers import origin_identifier -from swh.search.interface import PagedResult +from swh.search.interface import MinimalOriginDict, OriginDict, PagedResult from swh.search.metrics import send_metric, timed def _sanitize_origin(origin): origin = origin.copy() # Whitelist fields to be saved in Elasticsearch res = {"url": origin.pop("url")} for field_name in ("intrinsic_metadata", "has_visits", "visit_types"): if field_name in origin: res[field_name] = origin.pop(field_name) # Run the JSON-LD expansion algorithm # # to normalize the Codemeta metadata. # This is required as Elasticsearch will needs each field to have a consistent # type across documents to be searchable; and non-expanded JSON-LD documents # can have various types in the same field. For example, all these are # equivalent in JSON-LD: # * {"author": "Jane Doe"} # * {"author": ["Jane Doe"]} # * {"author": {"@value": "Jane Doe"}} # * {"author": [{"@value": "Jane Doe"}]} # and JSON-LD expansion will convert them all to the last one. if "intrinsic_metadata" in res: res["intrinsic_metadata"] = codemeta.expand(res["intrinsic_metadata"]) return res def token_encode(index_to_tokenize: Dict[bytes, Any]) -> str: """Tokenize as string an index page result from a search """ page_token = base64.b64encode(msgpack.dumps(index_to_tokenize)) return page_token.decode() def token_decode(page_token: str) -> Dict[bytes, Any]: """Read the page_token """ return msgpack.loads(base64.b64decode(page_token.encode()), raw=True) class ElasticSearch: def __init__(self, hosts: List[str], index_prefix=None): self._backend = Elasticsearch(hosts=hosts) self.index_prefix = index_prefix self.origin_index = "origin" if index_prefix: self.origin_index = index_prefix + "_" + self.origin_index @timed def check(self): return self._backend.ping() def deinitialize(self) -> None: """Removes all indices from the Elasticsearch backend""" self._backend.indices.delete(index="*") def initialize(self) -> None: """Declare Elasticsearch indices and mappings""" if not self._backend.indices.exists(index=self.origin_index): self._backend.indices.create(index=self.origin_index) self._backend.indices.put_mapping( index=self.origin_index, body={ "date_detection": False, "properties": { # sha1 of the URL; used as the document id "sha1": {"type": "keyword", "doc_values": True,}, # Used both to search URLs, and as the result to return # as a response to queries "url": { "type": "text", # To split URLs into token on any character # that is not alphanumerical "analyzer": "simple", # 2-gram and partial-3-gram search (ie. with the end of the # third word potentially missing) "fields": { "as_you_type": { "type": "search_as_you_type", "analyzer": "simple", } }, }, "visit_types": {"type": "keyword"}, # used to filter out origins that were never visited "has_visits": {"type": "boolean",}, "intrinsic_metadata": { "type": "nested", "properties": { "@context": { # don't bother indexing tokens in these URIs, as the # are used as namespaces "type": "keyword", } }, }, }, }, ) @timed def flush(self) -> None: self._backend.indices.refresh(index=self.origin_index) @timed - def origin_update(self, documents: Iterable[Dict]) -> None: + def origin_update(self, documents: Iterable[OriginDict]) -> None: documents = map(_sanitize_origin, documents) documents_with_sha1 = ( (origin_identifier(document), document) for document in documents ) # painless script that will be executed when updating an origin document update_script = """ // backup current visit_types field value List visit_types = ctx._source.getOrDefault("visit_types", []); // update origin document with new field values ctx._source.putAll(params); // restore previous visit types after visit_types field overriding if (ctx._source.containsKey("visit_types")) { for (int i = 0; i < visit_types.length; ++i) { if (!ctx._source.visit_types.contains(visit_types[i])) { ctx._source.visit_types.add(visit_types[i]); } } } """ actions = [ { "_op_type": "update", "_id": sha1, "_index": self.origin_index, "scripted_upsert": True, "upsert": {**document, "sha1": sha1,}, "script": { "source": update_script, "lang": "painless", "params": document, }, } for (sha1, document) in documents_with_sha1 ] indexed_count, errors = helpers.bulk( self._backend, actions, index=self.origin_index ) assert isinstance(errors, List) # Make mypy happy send_metric("document:index", count=indexed_count, method_name="origin_update") send_metric( "document:index_error", count=len(errors), method_name="origin_update" ) def origin_dump(self) -> Iterator[model.Origin]: results = helpers.scan(self._backend, index=self.origin_index) for hit in results: yield self._backend.termvectors( index=self.origin_index, id=hit["_id"], fields=["*"] ) @timed def origin_search( self, *, url_pattern: Optional[str] = None, metadata_pattern: Optional[str] = None, with_visit: bool = False, visit_types: Optional[List[str]] = None, page_token: Optional[str] = None, limit: int = 50, - ) -> PagedResult[Dict[str, Any]]: + ) -> PagedResult[MinimalOriginDict]: query_clauses: List[Dict[str, Any]] = [] if url_pattern: query_clauses.append( { "multi_match": { "query": url_pattern, "type": "bool_prefix", "operator": "and", "fields": [ "url.as_you_type", "url.as_you_type._2gram", "url.as_you_type._3gram", ], } } ) if metadata_pattern: query_clauses.append( { "nested": { "path": "intrinsic_metadata", "query": { "multi_match": { "query": metadata_pattern, # Makes it so that the "foo bar" query returns # documents which contain "foo" in a field and "bar" # in a different field "type": "cross_fields", # All keywords must be found in a document for it to # be considered a match. # TODO: allow missing keywords? "operator": "and", # Searches on all fields of the intrinsic_metadata dict, # recursively. "fields": ["intrinsic_metadata.*"], } }, } } ) if not query_clauses: raise ValueError( "At least one of url_pattern and metadata_pattern must be provided." ) if with_visit: query_clauses.append({"term": {"has_visits": True,}}) if visit_types is not None: query_clauses.append({"terms": {"visit_types": visit_types}}) body = { "query": {"bool": {"must": query_clauses,}}, "sort": [{"_score": "desc"}, {"sha1": "asc"},], } if page_token: # TODO: use ElasticSearch's scroll API? page_token_content = token_decode(page_token) body["search_after"] = [ page_token_content[b"score"], page_token_content[b"sha1"].decode("ascii"), ] res = self._backend.search(index=self.origin_index, body=body, size=limit) hits = res["hits"]["hits"] next_page_token: Optional[str] = None if len(hits) == limit: # There are more results after this page; return a pagination token # to get them in a future query last_hit = hits[-1] next_page_token_content = { b"score": last_hit["_score"], b"sha1": last_hit["_source"]["sha1"], } next_page_token = token_encode(next_page_token_content) assert len(hits) <= limit return PagedResult( results=[{"url": hit["_source"]["url"]} for hit in hits], next_page_token=next_page_token, ) diff --git a/swh/search/in_memory.py b/swh/search/in_memory.py index c668a48..1c400c2 100644 --- a/swh/search/in_memory.py +++ b/swh/search/in_memory.py @@ -1,148 +1,150 @@ # Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import itertools import re from typing import Any, Dict, Iterable, Iterator, List, Optional from swh.model.identifiers import origin_identifier -from swh.search.interface import PagedResult +from swh.search.interface import MinimalOriginDict, OriginDict, PagedResult _words_regexp = re.compile(r"\w+") def _dict_words_set(d): """Recursively extract set of words from dict content.""" values = set() def extract(obj, words): if isinstance(obj, dict): for k, v in obj.items(): extract(v, words) elif isinstance(obj, list): for item in obj: extract(item, words) else: words.update(_words_regexp.findall(str(obj).lower())) return words return extract(d, values) class InMemorySearch: def __init__(self): pass def check(self): return True def deinitialize(self) -> None: if hasattr(self, "_origins"): del self._origins del self._origin_ids def initialize(self) -> None: self._origins: Dict[str, Dict[str, Any]] = defaultdict(dict) self._origin_ids: List[str] = [] def flush(self) -> None: pass _url_splitter = re.compile(r"\W") - def origin_update(self, documents: Iterable[Dict]) -> None: - for document in documents: - document = document.copy() + def origin_update(self, documents: Iterable[OriginDict]) -> None: + for source_document in documents: + document: Dict[str, Any] = dict(source_document) id_ = origin_identifier(document) if "url" in document: - document["_url_tokens"] = set(self._url_splitter.split(document["url"])) + document["_url_tokens"] = set( + self._url_splitter.split(source_document["url"]) + ) if "visit_types" in document: - document["visit_types"] = set(document["visit_types"]) + document["visit_types"] = set(source_document["visit_types"]) if "visit_types" in self._origins[id_]: document["visit_types"].update(self._origins[id_]["visit_types"]) self._origins[id_].update(document) if id_ not in self._origin_ids: self._origin_ids.append(id_) def origin_search( self, *, url_pattern: Optional[str] = None, metadata_pattern: Optional[str] = None, with_visit: bool = False, visit_types: Optional[List[str]] = None, page_token: Optional[str] = None, limit: int = 50, - ) -> PagedResult[Dict[str, Any]]: + ) -> PagedResult[MinimalOriginDict]: hits: Iterator[Dict[str, Any]] = ( self._origins[id_] for id_ in self._origin_ids ) if url_pattern: tokens = set(self._url_splitter.split(url_pattern)) def predicate(match): missing_tokens = tokens - match["_url_tokens"] if len(missing_tokens) == 0: return True elif len(missing_tokens) > 1: return False else: # There is one missing token, look up by prefix. (missing_token,) = missing_tokens return any( token.startswith(missing_token) for token in match["_url_tokens"] ) hits = filter(predicate, hits) if metadata_pattern: metadata_pattern_words = set( _words_regexp.findall(metadata_pattern.lower()) ) def predicate(match): if "intrinsic_metadata" not in match: return False return metadata_pattern_words.issubset( _dict_words_set(match["intrinsic_metadata"]) ) hits = filter(predicate, hits) if not url_pattern and not metadata_pattern: raise ValueError( "At least one of url_pattern and metadata_pattern must be provided." ) next_page_token: Optional[str] = None if with_visit: hits = filter(lambda o: o.get("has_visits"), hits) if visit_types is not None: visit_types_set = set(visit_types) hits = filter( lambda o: visit_types_set.intersection(o.get("visit_types", set())), hits, ) start_at_index = int(page_token) if page_token else 0 origins = [ {"url": hit["url"]} for hit in itertools.islice(hits, start_at_index, start_at_index + limit) ] if len(origins) == limit: next_page_token = str(start_at_index + limit) assert len(origins) <= limit return PagedResult(results=origins, next_page_token=next_page_token,) diff --git a/swh/search/interface.py b/swh/search/interface.py index b0c3e82..86dbb07 100644 --- a/swh/search/interface.py +++ b/swh/search/interface.py @@ -1,65 +1,80 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from typing import Any, Dict, Iterable, List, Optional, TypeVar +from typing import Iterable, List, Optional, TypeVar + +from typing_extensions import TypedDict from swh.core.api import remote_api_endpoint from swh.core.api.classes import PagedResult as CorePagedResult TResult = TypeVar("TResult") PagedResult = CorePagedResult[TResult, str] +class MinimalOriginDict(TypedDict): + """Mandatory keys of an :cls:`OriginDict`""" + + url: str + + +class OriginDict(MinimalOriginDict, total=False): + """Argument passed to :meth:`SearchInterface.origin_update`.""" + + visit_types: List[str] + has_visits: bool + + class SearchInterface: @remote_api_endpoint("check") def check(self): """Dedicated method to execute some specific check per implementation. """ ... @remote_api_endpoint("flush") def flush(self) -> None: """Blocks until all previous calls to _update() are completely applied. """ ... @remote_api_endpoint("origin/update") - def origin_update(self, documents: Iterable[Dict]) -> None: + def origin_update(self, documents: Iterable[OriginDict]) -> None: """Persist documents to the search backend. """ ... @remote_api_endpoint("origin/search") def origin_search( self, *, url_pattern: Optional[str] = None, metadata_pattern: Optional[str] = None, with_visit: bool = False, visit_types: Optional[List[str]] = None, page_token: Optional[str] = None, limit: int = 50, - ) -> PagedResult[Dict[str, Any]]: + ) -> PagedResult[MinimalOriginDict]: """Searches for origins matching the `url_pattern`. Args: url_pattern: Part of the URL to search for with_visit: Whether origins with no visit are to be filtered out visit_types: Only origins having any of the provided visit types (e.g. git, svn, pypi) will be returned page_token: Opaque value used for pagination limit: number of results to return Returns: PagedResult of origin dicts matching the search criteria. If next_page_token is None, there is no longer data to retrieve. """ ...