Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/backend_es.py
# Copyright (C) 2018 The Software Heritage developers | # Copyright (C) 2018-2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
"""Elastic Search backend | """Elastic Search backend | ||||
""" | """ | ||||
import logging | |||||
from copy import deepcopy | from copy import deepcopy | ||||
from swh.core import utils | from swh.core import utils | ||||
from elasticsearch import Elasticsearch | from elasticsearch import Elasticsearch | ||||
from elasticsearch import helpers | from elasticsearch import helpers | ||||
logger = logging.getLogger(__name__) | |||||
DEFAULT_CONFIG = { | DEFAULT_CONFIG = { | ||||
'elastic_search': { | 'elastic_search': { | ||||
'storage_nodes': {'host': 'localhost', 'port': 9200}, | 'storage_nodes': {'host': 'localhost', 'port': 9200}, | ||||
'index_name_prefix': 'swh-tasks', | 'index_name_prefix': 'swh-tasks', | ||||
'client_options': { | 'client_options': { | ||||
'sniff_on_start': False, | 'sniff_on_start': False, | ||||
'sniff_on_connection_fail': True, | 'sniff_on_connection_fail': True, | ||||
Show All 38 Lines | def index(self, data): | ||||
""" | """ | ||||
date = data['ended'] | date = data['ended'] | ||||
index_name = self.compute_index_name(date.year, date.month) | index_name = self.compute_index_name(date.year, date.month) | ||||
return self.storage.index(index=index_name, | return self.storage.index(index=index_name, | ||||
doc_type=self.doc_type, | doc_type=self.doc_type, | ||||
body=data) | body=data) | ||||
def mget(self, index_name, doc_ids, chunk_size=500, | def mget(self, index_name, doc_ids, chunk_size=500, | ||||
source=True, log=None): | source=True): | ||||
"""Retrieve document's full content according to their ids as per | """Retrieve document's full content according to their ids as per | ||||
source's setup. | source's setup. | ||||
The `source` permits to retrieve only what's of interest to | The `source` allows to retrieve only what's interesting, e.g: | ||||
us, e.g: | |||||
- source=True ; gives back the original indexed data | - source=True ; gives back the original indexed data | ||||
- source=False ; returns without the original _source field | - source=False ; returns without the original _source field | ||||
- source=['task_id'] ; returns only task_id in the _source field | - source=['task_id'] ; returns only task_id in the _source field | ||||
Args: | Args: | ||||
index_name (str): Name of the concerned index. | index_name (str): Name of the concerned index. | ||||
doc_ids (generator): Generator of ids to retrieve | doc_ids (generator): Generator of ids to retrieve | ||||
chunk_size (int): Number of documents chunk to send for retrieval | chunk_size (int): Number of documents chunk to send for retrieval | ||||
Show All 9 Lines | def mget(self, index_name, doc_ids, chunk_size=500, | ||||
source = {'_source': str(source).lower()} | source = {'_source': str(source).lower()} | ||||
for ids in utils.grouper(doc_ids, n=1000): | for ids in utils.grouper(doc_ids, n=1000): | ||||
res = self.storage.mget(body={'ids': list(ids)}, | res = self.storage.mget(body={'ids': list(ids)}, | ||||
index=index_name, | index=index_name, | ||||
doc_type=self.doc_type, | doc_type=self.doc_type, | ||||
params=source) | params=source) | ||||
if not res: | if not res: | ||||
if log: | logger.error('Error during retrieval of data, skipping!') | ||||
log.error('Error during retrieval of data, skipping!') | |||||
continue | continue | ||||
for doc in res['docs']: | for doc in res['docs']: | ||||
found = doc.get('found') | found = doc.get('found') | ||||
if not found: | if not found: | ||||
msg = 'Doc id %s not found, not indexed yet' % doc['_id'] | msg = 'Doc id %s not found, not indexed yet' % doc['_id'] | ||||
if log: | logger.warning(msg) | ||||
log.warning(msg) | |||||
continue | continue | ||||
yield doc['_source'] | yield doc['_source'] | ||||
def _streaming_bulk(self, index_name, doc_stream, chunk_size=500, | def _streaming_bulk(self, index_name, doc_stream, chunk_size=500): | ||||
log=None): | |||||
"""Bulk index data and returns the successful indexed data's | """Bulk index data and returns the successful indexed data's | ||||
identifier. | identifier. | ||||
Args: | Args: | ||||
index_name (str): Name of the concerned index. | index_name (str): Name of the concerned index. | ||||
doc_stream (generator): Generator of documents to index | doc_stream (generator): Generator of documents to index | ||||
chunk_size (int): Number of documents chunk to send for indexation | chunk_size (int): Number of documents chunk to send for indexation | ||||
Yields: | Yields: | ||||
document id indexed | document id indexed | ||||
""" | """ | ||||
actions = ({'_index': index_name, | actions = ({'_index': index_name, | ||||
'_op_type': 'index', | '_op_type': 'index', | ||||
'_type': self.doc_type, | '_type': self.doc_type, | ||||
'_source': data} for data in doc_stream) | '_source': data} for data in doc_stream) | ||||
for ok, result in helpers.streaming_bulk(client=self.storage, | for ok, result in helpers.streaming_bulk(client=self.storage, | ||||
actions=actions, | actions=actions, | ||||
chunk_size=chunk_size, | chunk_size=chunk_size, | ||||
raise_on_error=False, | raise_on_error=False, | ||||
raise_on_exception=False): | raise_on_exception=False): | ||||
if not ok: | if not ok: | ||||
if log: | logger.error('Error during %s indexation. Skipping.', result) | ||||
log.error('Error during %s indexation. Skipping.' % result) | |||||
continue | continue | ||||
yield result['index']['_id'] | yield result['index']['_id'] | ||||
def is_index_opened(self, index_name: str) -> bool: | |||||
"""Determine if an index is opened or not | |||||
""" | |||||
try: | |||||
self.storage.indices.stats(index_name) | |||||
return True | |||||
except Exception: | |||||
# fails when indice is closed (no other api call found) | |||||
ardumont: I did not find a better way to check for the index's state...
If you know better, by all means… | |||||
return False | |||||
def streaming_bulk(self, index_name, doc_stream, chunk_size=500, | def streaming_bulk(self, index_name, doc_stream, chunk_size=500, | ||||
source=True, log=None): | source=True): | ||||
"""Bulk index data and returns the successful indexed data as per | """Bulk index data and returns the successful indexed data as per | ||||
source's setup. | source's setup. | ||||
the `source` permits to retrieve only what's of interest to | the `source` permits to retrieve only what's of interest to | ||||
us, e.g: | us, e.g: | ||||
- source=True ; gives back the original indexed data | - source=True ; gives back the original indexed data | ||||
- source=False ; returns without the original _source field | - source=False ; returns without the original _source field | ||||
- source=['task_id'] ; returns only task_id in the _source field | - source=['task_id'] ; returns only task_id in the _source field | ||||
Args: | Args: | ||||
index_name (str): Name of the concerned index. | index_name (str): Name of the concerned index. | ||||
doc_stream (generator): Document generator to index | doc_stream (generator): Document generator to index | ||||
chunk_size (int): Number of documents chunk to send | chunk_size (int): Number of documents chunk to send | ||||
source (bool, [str]): the information to return | source (bool, [str]): the information to return | ||||
""" | """ | ||||
to_close = False | |||||
# index must exist | |||||
if not self.storage.indices.exists(index_name): | |||||
Done Inline ActionsYou may want to have this in a finally:. vlorentz: You may want to have this in a `finally:`. | |||||
# server is setup-ed correctly (mappings, settings are | |||||
# automatically set, cf. /data/README.md) | |||||
self.storage.indices.create(index_name) | |||||
# Close that new index (to avoid too much opened indices) | |||||
to_close = True | |||||
# index must be opened | |||||
if not self.is_index_opened(index_name): | |||||
to_close = True | |||||
self.storage.indices.open(index_name) | |||||
try: | |||||
indexed_ids = self._streaming_bulk( | indexed_ids = self._streaming_bulk( | ||||
index_name, doc_stream, chunk_size=chunk_size, log=log) | index_name, doc_stream, chunk_size=chunk_size) | ||||
yield from self.mget(index_name, indexed_ids, chunk_size=chunk_size, | yield from self.mget( | ||||
source=source, log=log) | index_name, indexed_ids, chunk_size=chunk_size, source=source) | ||||
finally: | |||||
# closing it to stay in the same state as prior to the call | |||||
if to_close: | |||||
self.storage.indices.close(index_name) |
I did not find a better way to check for the index's state.
If you know a better one, by all means suggest it ;)