Changeset View
Changeset View
Standalone View
Standalone View
swh/search/elasticsearch.py
Show First 20 Lines • Show All 53 Lines • ▼ Show 20 Lines | |||||
def token_decode(page_token: str) -> Dict[bytes, Any]: | def token_decode(page_token: str) -> Dict[bytes, Any]: | ||||
"""Read the page_token | """Read the page_token | ||||
""" | """ | ||||
return msgpack.loads(base64.b64decode(page_token.encode()), raw=True) | return msgpack.loads(base64.b64decode(page_token.encode()), raw=True) | ||||
class ElasticSearch: | class ElasticSearch: | ||||
def __init__(self, hosts: List[str]): | def __init__(self, hosts: List[str], index_prefix=None): | ||||
self._backend = Elasticsearch(hosts=hosts) | self._backend = Elasticsearch(hosts=hosts) | ||||
self.index_prefix = index_prefix | |||||
self.origin_index = "origin" | |||||
if index_prefix: | |||||
self.origin_index = index_prefix + "_" + self.origin_index | |||||
def check(self): | def check(self): | ||||
return self._backend.ping() | return self._backend.ping() | ||||
def deinitialize(self) -> None: | def deinitialize(self) -> None: | ||||
"""Removes all indices from the Elasticsearch backend""" | """Removes all indices from the Elasticsearch backend""" | ||||
self._backend.indices.delete(index="*") | self._backend.indices.delete(index="*") | ||||
def initialize(self) -> None: | def initialize(self) -> None: | ||||
"""Declare Elasticsearch indices and mappings""" | """Declare Elasticsearch indices and mappings""" | ||||
if not self._backend.indices.exists(index="origin"): | if not self._backend.indices.exists(index=self.origin_index): | ||||
self._backend.indices.create(index="origin") | self._backend.indices.create(index=self.origin_index) | ||||
self._backend.indices.put_mapping( | self._backend.indices.put_mapping( | ||||
index="origin", | index=self.origin_index, | ||||
body={ | body={ | ||||
"properties": { | "properties": { | ||||
# sha1 of the URL; used as the document id | # sha1 of the URL; used as the document id | ||||
"sha1": {"type": "keyword", "doc_values": True,}, | "sha1": {"type": "keyword", "doc_values": True,}, | ||||
# Used both to search URLs, and as the result to return | # Used both to search URLs, and as the result to return | ||||
# as a response to queries | # as a response to queries | ||||
"url": { | "url": { | ||||
"type": "text", | "type": "text", | ||||
Show All 21 Lines | def initialize(self) -> None: | ||||
} | } | ||||
}, | }, | ||||
}, | }, | ||||
} | } | ||||
}, | }, | ||||
) | ) | ||||
def flush(self) -> None: | def flush(self) -> None: | ||||
self._backend.indices.refresh(index="_all") | self._backend.indices.refresh(index=self.origin_index) | ||||
def origin_update(self, documents: Iterable[Dict]) -> None: | def origin_update(self, documents: Iterable[Dict]) -> None: | ||||
documents = map(_sanitize_origin, documents) | documents = map(_sanitize_origin, documents) | ||||
documents_with_sha1 = ( | documents_with_sha1 = ( | ||||
(origin_identifier(document), document) for document in documents | (origin_identifier(document), document) for document in documents | ||||
) | ) | ||||
actions = [ | actions = [ | ||||
{ | { | ||||
"_op_type": "update", | "_op_type": "update", | ||||
"_id": sha1, | "_id": sha1, | ||||
"_index": "origin", | "_index": self.origin_index, | ||||
"doc": {**document, "sha1": sha1,}, | "doc": {**document, "sha1": sha1,}, | ||||
"doc_as_upsert": True, | "doc_as_upsert": True, | ||||
} | } | ||||
for (sha1, document) in documents_with_sha1 | for (sha1, document) in documents_with_sha1 | ||||
] | ] | ||||
bulk(self._backend, actions, index="origin") | bulk(self._backend, actions, index=self.origin_index) | ||||
def origin_dump(self) -> Iterator[model.Origin]: | def origin_dump(self) -> Iterator[model.Origin]: | ||||
results = scan(self._backend, index="*") | results = scan(self._backend, index=self.origin_index) | ||||
for hit in results: | for hit in results: | ||||
yield self._backend.termvectors(index="origin", id=hit["_id"], fields=["*"]) | yield self._backend.termvectors( | ||||
index=self.origin_index, id=hit["_id"], fields=["*"] | |||||
) | |||||
def origin_search( | def origin_search( | ||||
self, | self, | ||||
*, | *, | ||||
url_pattern: Optional[str] = None, | url_pattern: Optional[str] = None, | ||||
metadata_pattern: Optional[str] = None, | metadata_pattern: Optional[str] = None, | ||||
with_visit: bool = False, | with_visit: bool = False, | ||||
page_token: Optional[str] = None, | page_token: Optional[str] = None, | ||||
▲ Show 20 Lines • Show All 57 Lines • ▼ Show 20 Lines | ) -> PagedResult[Dict[str, Any]]: | ||||
if page_token: | if page_token: | ||||
# TODO: use ElasticSearch's scroll API? | # TODO: use ElasticSearch's scroll API? | ||||
page_token_content = token_decode(page_token) | page_token_content = token_decode(page_token) | ||||
body["search_after"] = [ | body["search_after"] = [ | ||||
page_token_content[b"score"], | page_token_content[b"score"], | ||||
page_token_content[b"sha1"].decode("ascii"), | page_token_content[b"sha1"].decode("ascii"), | ||||
] | ] | ||||
res = self._backend.search(index="origin", body=body, size=limit) | res = self._backend.search(index=self.origin_index, body=body, size=limit) | ||||
hits = res["hits"]["hits"] | hits = res["hits"]["hits"] | ||||
next_page_token: Optional[str] = None | next_page_token: Optional[str] = None | ||||
if len(hits) == limit: | if len(hits) == limit: | ||||
# There are more results after this page; return a pagination token | # There are more results after this page; return a pagination token | ||||
# to get them in a future query | # to get them in a future query | ||||
Show All 13 Lines |