Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/backend_es.py
# Copyright (C) 2018-2020  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

"""Elastic Search backend
"""
▲ Show 20 Lines • Show All 224 Lines • ▼ Show 20 Lines | def streaming_bulk(self, index_name, doc_stream, chunk_size=500, | ||||
the `source` permits to retrieve only what's of interest to | the `source` permits to retrieve only what's of interest to | ||||
us, e.g: | us, e.g: | ||||
- source=True ; gives back the original indexed data | - source=True ; gives back the original indexed data | ||||
- source=False ; returns without the original _source field | - source=False ; returns without the original _source field | ||||
- source=['task_id'] ; returns only task_id in the _source field | - source=['task_id'] ; returns only task_id in the _source field | ||||
Note that: | |||||
- if the index is closed, it will be opened | |||||
- if the index does not exist, it will be created and opened | |||||
This keeps the index opened for performance reasons. | |||||
douardda: not very fond of the phrasing here. Maybe something like:
"keep the index open for performance… | |||||
Done Inline Actions
will adapt.
it's not. ardumont: > not very fond of the phrasing here. Maybe something like: "keep the index open for… | |||||
Args: | Args: | ||||
index_name (str): Name of the concerned index. | index_name (str): Name of the concerned index. | ||||
doc_stream (generator): Document generator to index | doc_stream (generator): Document generator to index | ||||
chunk_size (int): Number of documents chunk to send | chunk_size (int): Number of documents chunk to send | ||||
source (bool, [str]): the information to return | source (bool, [str]): the information to return | ||||
""" | """ | ||||
to_close = False | |||||
# index must exist | # index must exist | ||||
if not self.storage.indices.exists(index_name): | if not self.storage.indices.exists(index_name): | ||||
self.create(index_name) | self.create(index_name) | ||||
# Close that new index (to avoid too much opened indices) | |||||
to_close = True | |||||
# index must be opened | # index must be opened | ||||
if not self.is_index_opened(index_name): | if not self.is_index_opened(index_name): | ||||
to_close = True | |||||
self.storage.indices.open(index_name) | self.storage.indices.open(index_name) | ||||
try: | |||||
indexed_ids = self._streaming_bulk( | indexed_ids = self._streaming_bulk( | ||||
index_name, doc_stream, chunk_size=chunk_size) | index_name, doc_stream, chunk_size=chunk_size) | ||||
yield from self.mget( | yield from self.mget( | ||||
index_name, indexed_ids, chunk_size=chunk_size, source=source) | index_name, indexed_ids, chunk_size=chunk_size, source=source) | ||||
finally: | |||||
# closing it to stay in the same state as prior to the call | |||||
if to_close: | |||||
self.storage.indices.close(index_name) |
I am not very fond of the phrasing here. Maybe something like:
"keep the index open for performance reasons." And, if it is true, add "The index will be automatically closed on exit" (because some __del__ method, the gc, or whatever else handles this properly).