swh/search/elasticsearch.py
[... first 53 lines of the file not shown ...]

 def token_decode(page_token: str) -> Dict[bytes, Any]:
     """Read the page_token
     """
     return msgpack.loads(base64.b64decode(page_token.encode()), raw=True)
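token_decode implies a symmetric encoder: msgpack-serialize the page-token dict, then base64 it into URL-safe text. The encoder is not part of this hunk, so the token_encode below is only a sketch of the presumed counterpart, not the project's actual helper:

import base64
from typing import Any, Dict

import msgpack


def token_encode(content: Dict[bytes, Any]) -> str:
    # Presumed inverse of token_decode: msgpack, then base64, then str.
    return base64.b64encode(msgpack.dumps(content)).decode()


def token_decode(page_token: str) -> Dict[bytes, Any]:
    # Same as the function above; raw=True keeps map keys as bytes.
    return msgpack.loads(base64.b64decode(page_token.encode()), raw=True)


# Round trip: keys come back as bytes because of raw=True.
token = token_encode({b"score": 1.5, b"sha1": b"abc123"})
assert token_decode(token) == {b"score": 1.5, b"sha1": b"abc123"}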
 class ElasticSearch:
-    def __init__(self, hosts: List[str], index_prefix=None):
+    def __init__(
+        self,
+        hosts: List[str],
+        index_name: str = "origin",
+        read_alias: str = "origin-read",
+        write_alias: str = "origin-write",
+    ):
         self._backend = Elasticsearch(hosts=hosts)
-        self.index_prefix = index_prefix
-        self.origin_index = "origin"
-        if index_prefix:
-            self.origin_index = index_prefix + "_" + self.origin_index
+        self.index_name = index_name
+        self.read_alias = read_alias
+        self.write_alias = write_alias
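With the new signature, callers pass the index and alias names explicitly instead of composing them from index_prefix. A minimal usage sketch; the host and the test-specific names are illustrative, not taken from this diff:

# Defaults keep the historical names: index "origin", aliases
# "origin-read" / "origin-write".
search = ElasticSearch(hosts=["localhost:9200"])  # placeholder host

# A test setup can isolate itself with dedicated names instead of the
# removed index_prefix mechanism (these names are illustrative):
test_search = ElasticSearch(
    hosts=["localhost:9200"],
    index_name="test-origin",
    read_alias="test-origin-read",
    write_alias="test-origin-write",
)
test_search.initialize()  # creates the index, both aliases, and the mapping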
     @timed
     def check(self):
         return self._backend.ping()

     def deinitialize(self) -> None:
         """Removes all indices from the Elasticsearch backend"""
         self._backend.indices.delete(index="*")
     def initialize(self) -> None:
-        """Declare Elasticsearch indices and mappings"""
-        if not self._backend.indices.exists(index=self.origin_index):
-            self._backend.indices.create(index=self.origin_index)
+        """Declare Elasticsearch indices, aliases and mappings"""
+        if not self._backend.indices.exists(index=self.index_name):
+            self._backend.indices.create(index=self.index_name)
+        if not self._backend.indices.exists_alias(self.read_alias):
+            self._backend.indices.put_alias(index=self.index_name, name=self.read_alias)
+        if not self._backend.indices.exists_alias(self.write_alias):
+            self._backend.indices.put_alias(
+                index=self.index_name, name=self.write_alias
+            )
         self._backend.indices.put_mapping(
-            index=self.origin_index,
+            index=self.index_name,
             body={
                 "date_detection": False,
                 "properties": {
                     # sha1 of the URL; used as the document id
                     "sha1": {"type": "keyword", "doc_values": True,},
                     # Used both to search URLs, and as the result to return
                     # as a response to queries
                     "url": {
[... 24 lines of the "url" mapping not shown (still inside initialize) ...]
                         },
                     },
                 },
             },
         )
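Separate read and write aliases are the usual Elasticsearch recipe for swapping in a rebuilt index (say, after a mapping change) without downtime. This diff only creates the aliases; the sketch below shows how a later migration could repoint them atomically, with the host and the "origin-v2" index name as placeholders:

from elasticsearch import Elasticsearch

es = Elasticsearch(hosts=["localhost:9200"])  # placeholder host

# Hypothetical migration: after reindexing into "origin-v2", repoint
# both aliases in a single atomic update_aliases call, so clients using
# the alias names never observe a missing index.
es.indices.update_aliases(
    body={
        "actions": [
            {"remove": {"index": "origin", "alias": "origin-read"}},
            {"remove": {"index": "origin", "alias": "origin-write"}},
            {"add": {"index": "origin-v2", "alias": "origin-read"}},
            {"add": {"index": "origin-v2", "alias": "origin-write"}},
        ]
    }
)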
     @timed
     def flush(self) -> None:
-        self._backend.indices.refresh(index=self.origin_index)
+        self._backend.indices.refresh(index=self.write_alias)
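Because flush() now refreshes through the write alias, documents indexed by origin_update become visible to the next search instead of waiting for the refresh interval. A test-style sequence, with the host and origin URL as placeholders and the document shape abbreviated:

search = ElasticSearch(hosts=["localhost:9200"])  # placeholder host
search.initialize()

# Index one origin, then refresh so the very next search can see it.
search.origin_update([{"url": "https://example.com/repo.git"}])
search.flush()

page = search.origin_search(url_pattern="example.com")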
     @timed
     def origin_update(self, documents: Iterable[OriginDict]) -> None:
         documents = map(_sanitize_origin, documents)
         documents_with_sha1 = (
             (origin_identifier(document), document) for document in documents
         )
         # painless script that will be executed when updating an origin document
[... 13 lines of the painless update script not shown (inside origin_update) ...]
                 }
             }
         """
         actions = [
             {
                 "_op_type": "update",
                 "_id": sha1,
-                "_index": self.origin_index,
+                "_index": self.write_alias,
                 "scripted_upsert": True,
                 "upsert": {**document, "sha1": sha1,},
                 "script": {
                     "source": update_script,
                     "lang": "painless",
                     "params": document,
                 },
             }
             for (sha1, document) in documents_with_sha1
         ]
         indexed_count, errors = helpers.bulk(
-            self._backend, actions, index=self.origin_index
+            self._backend, actions, index=self.write_alias
         )
         assert isinstance(errors, List)  # Make mypy happy
         send_metric("document:index", count=indexed_count, method_name="origin_update")
         send_metric(
             "document:index_error", count=len(errors), method_name="origin_update"
         )
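The painless script itself is mostly elided above, but the surrounding mechanics are visible: with scripted_upsert set to True, Elasticsearch runs the script even when the document does not yet exist, seeding ctx._source from the upsert body. A standalone toy illustration; the host, index name, and script are placeholders, not the elided update_script:

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch(hosts=["localhost:9200"])  # placeholder host

# Toy script: merge every param into the stored document.
toy_script = "ctx._source.putAll(params)"

actions = [
    {
        "_op_type": "update",
        "_id": "doc-1",
        "_index": "demo",  # hypothetical index
        "scripted_upsert": True,  # run the script even on first insert
        "upsert": {"url": "https://example.com"},
        "script": {
            "source": toy_script,
            "lang": "painless",
            "params": {"visits": 1},
        },
    }
]
ok, errors = helpers.bulk(es, actions)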
     def origin_dump(self) -> Iterator[model.Origin]:
-        results = helpers.scan(self._backend, index=self.origin_index)
+        results = helpers.scan(self._backend, index=self.read_alias)
         for hit in results:
             yield self._backend.termvectors(
-                index=self.origin_index, id=hit["_id"], fields=["*"]
+                index=self.read_alias, id=hit["_id"], fields=["*"]
             )
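origin_dump walks every document with helpers.scan, which drives the scroll API under the hood; reading through the alias means a future index swap stays transparent to this loop. A bare scan sketch with placeholder host and query:

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch(hosts=["localhost:9200"])  # placeholder host

# helpers.scan pages through the whole index via the scroll API and
# yields raw hits; the alias name matches the defaults in __init__.
for hit in helpers.scan(es, index="origin-read", query={"query": {"match_all": {}}}):
    print(hit["_id"], hit["_source"].get("url"))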
     @timed
     def origin_search(
         self,
         *,
         url_pattern: Optional[str] = None,
         metadata_pattern: Optional[str] = None,
[... 63 lines not shown ...]

     ) -> PagedResult[MinimalOriginDict]:
         if page_token:
             # TODO: use ElasticSearch's scroll API?
             page_token_content = token_decode(page_token)
             body["search_after"] = [
                 page_token_content[b"score"],
                 page_token_content[b"sha1"].decode("ascii"),
             ]
-        res = self._backend.search(index=self.origin_index, body=body, size=limit)
+        res = self._backend.search(index=self.read_alias, body=body, size=limit)
         hits = res["hits"]["hits"]
         next_page_token: Optional[str] = None
         if len(hits) == limit:
             # There are more results after this page; return a pagination token
             # to get them in a future query
[... final 13 lines not shown ...]
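The lines elided here presumably build next_page_token from the last hit, mirroring token_decode above: the score and sha1 that the next call feeds into body["search_after"]. A hedged reconstruction, not the project's exact code:

import base64
from typing import Any, Dict, List

import msgpack


def next_token_from(hits: List[Dict[str, Any]]) -> str:
    # With a ["_score", "sha1"] sort, each hit carries its sort values;
    # pack the last pair the same way token_decode expects to unpack it.
    score, sha1 = hits[-1]["sort"]
    content: Dict[bytes, Any] = {b"score": score, b"sha1": sha1.encode("ascii")}
    return base64.b64encode(msgpack.dumps(content)).decode()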