Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/backend_es.py
Show First 20 Lines • Show All 65 Lines • ▼ Show 20 Lines | def index(self, data): | ||||
doc_type=self.doc_type, | doc_type=self.doc_type, | ||||
body=data) | body=data) | ||||
def mget(self, index_name, doc_ids, chunk_size=500, | def mget(self, index_name, doc_ids, chunk_size=500, | ||||
source=True, log=None): | source=True, log=None): | ||||
"""Retrieve document's full content according to their ids as per | """Retrieve document's full content according to their ids as per | ||||
source's setup. | source's setup. | ||||
The `source` permits to retrieve only what's of interest to | The `source` allows to retrieve only what's interesting, e.g: | ||||
us, e.g: | |||||
- source=True ; gives back the original indexed data | - source=True ; gives back the original indexed data | ||||
- source=False ; returns without the original _source field | - source=False ; returns without the original _source field | ||||
- source=['task_id'] ; returns only task_id in the _source field | - source=['task_id'] ; returns only task_id in the _source field | ||||
Args: | Args: | ||||
index_name (str): Name of the concerned index. | index_name (str): Name of the concerned index. | ||||
doc_ids (generator): Generator of ids to retrieve | doc_ids (generator): Generator of ids to retrieve | ||||
chunk_size (int): Number of documents chunk to send for retrieval | chunk_size (int): Number of documents chunk to send for retrieval | ||||
▲ Show 20 Lines • Show All 51 Lines • ▼ Show 20 Lines | def _streaming_bulk(self, index_name, doc_stream, chunk_size=500, | ||||
raise_on_error=False, | raise_on_error=False, | ||||
raise_on_exception=False): | raise_on_exception=False): | ||||
if not ok: | if not ok: | ||||
if log: | if log: | ||||
log.error('Error during %s indexation. Skipping.' % result) | log.error('Error during %s indexation. Skipping.' % result) | ||||
continue | continue | ||||
yield result['index']['_id'] | yield result['index']['_id'] | ||||
def is_index_opened(self, index_name: str) -> bool:
    """Check whether the index ``index_name`` is currently open.

    Elasticsearch exposes no dedicated "index state" endpoint here, so
    we probe with a stats call: it succeeds on an open index and raises
    on a closed one.

    Args:
        index_name (str): Name of the index to probe.

    Returns:
        bool: True if the index is open, False otherwise.

    """
    try:
        # Raises when the index is closed (no other api call found)
        self.storage.indices.stats(index_name)
    except Exception:
        return False
    return True
def streaming_bulk(self, index_name, doc_stream, chunk_size=500,
                   source=True, log=None):
    """Bulk index data and returns the successful indexed data as per
    source's setup.

    The `source` permits to retrieve only what's of interest to
    us, e.g:
    - source=True ; gives back the original indexed data
    - source=False ; returns without the original _source field
    - source=['task_id'] ; returns only task_id in the _source field

    Args:
        index_name (str): Name of the concerned index.
        doc_stream (generator): Document generator to index
        chunk_size (int): Number of documents chunk to send
        source (bool, [str]): the information to return

    Yields:
        The successfully indexed documents, shaped according to `source`.

    """
    to_close = False
    # index must exist
    if not self.storage.indices.exists(index_name):
        # server is setup-ed correctly (mappings, settings are
        # automatically set, cf. /data/README.md)
        self.storage.indices.create(index_name)
        # Close that new index (to avoid too much opened indices)
        to_close = True
    # index must be opened
    if not self.is_index_opened(index_name):
        to_close = True
        self.storage.indices.open(index_name)

    try:
        indexed_ids = self._streaming_bulk(
            index_name, doc_stream, chunk_size=chunk_size, log=log)
        yield from self.mget(
            index_name, indexed_ids, chunk_size=chunk_size,
            source=source, log=log)
    finally:
        # closing it to stay in the same state as prior to the call,
        # even if indexing or retrieval raised mid-stream (previously
        # an exception here leaked an opened index)
        if to_close:
            self.storage.indices.close(index_name)
vlorentzUnsubmitted Done Inline ActionsYou may want to have this in a finally:. vlorentz: You may want to have this in a `finally:`. |
I did not find a better way to check for the index's state...
If you know better, by all means ;)