Changeset View
swh/scheduler/backend_es.py
# Copyright (C) 2018-2019  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

"""Elastic Search backend

"""

import datetime  # noqa
import logging

from copy import deepcopy
from typing import Any, Dict

from elasticsearch import helpers

from swh.core import utils


logger = logging.getLogger(__name__)
DEFAULT_CONFIG = {
    'elasticsearch': {
        'cls': 'local',
        'args': {
            'index_name_prefix': 'swh-tasks',
            'storage_nodes': ['localhost:9200'],
            'client_options': {
                'sniff_on_start': False,
                'sniff_on_connection_fail': True,
                'http_compress': False,
                'sniffer_timeout': 60,
            },
        },
    }
}
def get_elasticsearch(cls: str, args: Dict[str, Any] = {}):
    """Instantiate an elastic search instance

    """
    if cls == 'local':
        from elasticsearch import Elasticsearch
    elif cls == 'memory':
        from .elasticsearch_memory import MemoryElasticsearch as Elasticsearch  # type: ignore # noqa
    else:
        raise ValueError('Unknown elasticsearch class `%s`' % cls)

    return Elasticsearch(**args)
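For illustration, a minimal usage sketch of this factory (not part of the changeset; 'hosts' is the stock elasticsearch-py keyword and only serves this standalone example, while the backend itself forwards its configuration under a 'storage_nodes' key, as shown in __init__ below):

    # Hypothetical usage sketch of get_elasticsearch:
    es_test = get_elasticsearch(cls='memory', args={})  # in-memory stand-in for tests
    es_prod = get_elasticsearch(
        cls='local', args={'hosts': ['localhost:9200']})  # real elasticsearch-py client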
class ElasticSearchBackend:
    """ElasticSearch backend to index tasks

    This uses an elasticsearch client to actually discuss with the
    elasticsearch instance.

    """
    def __init__(self, **config):
        self.config = deepcopy(DEFAULT_CONFIG)
        self.config.update(config)
        es_conf = self.config['elasticsearch']
        args = deepcopy(es_conf['args'])
        self.index_name_prefix = args.pop('index_name_prefix')
        self.storage = get_elasticsearch(
            cls=es_conf['cls'],
            args={
                'storage_nodes': args.get('storage_nodes', []),
                **args.get('client_options', {}),
            }
        )
        # document's index type (cf. /data/elastic-template.json)
        self.doc_type = 'task'
    def initialize(self):
        self.storage.indices.put_mapping(
            index=f"{self.index_name_prefix}-*",
            doc_type=self.doc_type,
            # to allow type definition below
            include_type_name=True,
            # to allow install mapping even if no index yet
            allow_no_indices=True,
            body={
                "properties": {
                    "task_id": {"type": "double"},
                    "task_policy": {"type": "text"},
                    "task_status": {"type": "text"},
                    "task_run_id": {"type": "double"},
                    "arguments": {
                        "type": "object",
                        "properties": {
                            "args": {
                                "type": "nested",
                                "dynamic": False
                            },
                            "kwargs": {
                                "type": "text"
                            }
                        }
                    },
                    "type": {"type": "text"},
                    "backend_id": {"type": "text"},
                    "metadata": {
                        "type": "object",
                        "enabled": False
                    },
                    "scheduled": {
                        "type": "date",
                        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis"  # noqa
                    },
                    "started": {
                        "type": "date",
                        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis"  # noqa
                    },
                    "ended": {
                        "type": "date",
                        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis"  # noqa
                    }
                }
            })
        self.storage.indices.put_settings(
            index=f"{self.index_name_prefix}-*",
            allow_no_indices=True,
            body={
                "index": {
                    "codec": "best_compression",
                    "refresh_interval": "1s",
                    "number_of_shards": 1
                }
            })

    def create(self, index_name) -> None:
        """Create and initialize index_name with mapping for all indices
           matching `swh-tasks-` pattern

        """
        assert index_name.startswith(self.index_name_prefix)
        self.storage.indices.create(index_name)
    def compute_index_name(self, year, month):
        """Given a year, month, compute the index's name.

        """
        return '%s-%s-%s' % (
            self.index_name_prefix, year, '%02d' % month)
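For instance, with the default 'swh-tasks' prefix the computed names come out as:

    # compute_index_name(2019, 6)  -> 'swh-tasks-2019-06'
    # compute_index_name(2019, 11) -> 'swh-tasks-2019-11'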
    def index(self, data):
        """Index given data to elasticsearch.

        The field 'ended' in data is used to compute the index to
        index data to.

        """
        date = data['ended']
        index_name = self.compute_index_name(date.year, date.month)
        return self.storage.index(index=index_name,
                                  doc_type=self.doc_type,
                                  body=data)
    def mget(self, index_name, doc_ids, chunk_size=500,
             source=True):
        """Retrieve documents' full content according to their ids as per
           source's setup.

           The `source` allows retrieving only what's of interest, e.g.:
           - source=True ; gives back the original indexed data
           - source=False ; returns without the original _source field
           - source=['task_id'] ; returns only task_id in the _source field

        Args:
            index_name (str): Name of the concerned index.
            doc_ids (generator): Generator of ids to retrieve
            chunk_size (int): Number of documents chunk to send for retrieval

Show All 9 Lines

        source = {'_source': str(source).lower()}
        for ids in utils.grouper(doc_ids, n=1000):
            res = self.storage.mget(body={'ids': list(ids)},
                                    index=index_name,
                                    doc_type=self.doc_type,
                                    params=source)
            if not res:
                logger.error('Error during retrieval of data, skipping!')
                continue

            for doc in res['docs']:
                found = doc.get('found')
                if not found:
                    msg = 'Doc id %s not found, not indexed yet' % doc['_id']
                    logger.warning(msg)
                    continue
                yield doc['_source']
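A hedged usage sketch for mget (not part of the changeset; the index name and document ids are made up, and `backend` stands for an ElasticSearchBackend instance):

    # Fetch only the task_id field of documents indexed beforehand:
    for doc in backend.mget('swh-tasks-2019-06',
                            iter(['doc-id-1', 'doc-id-2']),
                            source=['task_id']):
        ...  # each yielded doc is the (restricted) _source dict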
    def _streaming_bulk(self, index_name, doc_stream, chunk_size=500):
        """Bulk index data and returns the successfully indexed data's
           identifiers.

        Args:
            index_name (str): Name of the concerned index.
            doc_stream (generator): Generator of documents to index
            chunk_size (int): Number of documents chunk to send for indexation

        Yields:
            document id indexed

        """
        actions = ({'_index': index_name,
                    '_op_type': 'index',
                    '_type': self.doc_type,
                    '_source': data} for data in doc_stream)
        for ok, result in helpers.streaming_bulk(client=self.storage,
                                                 actions=actions,
                                                 chunk_size=chunk_size,
                                                 raise_on_error=False,
                                                 raise_on_exception=False):
            if not ok:
                logger.error('Error during %s indexation. Skipping.', result)
                continue
            yield result['index']['_id']
    def is_index_opened(self, index_name: str) -> bool:
        """Determine if an index is opened or not

        """
        try:
            self.storage.indices.stats(index_name)
            return True
        except Exception:
            # fails when the index is closed (no other api call found)
            return False

ardumont (inline comment): I did not find a better way to check for the index's state...
If you know better, by all means ;)
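Regarding the inline question, one possible alternative is sketched below; it is untested and assumes the cluster state metadata exposes each index's open/close state, which would need checking against the Elasticsearch version in use:

    def is_index_opened(self, index_name: str) -> bool:
        # Hypothetical variant: read the index state from the cluster
        # metadata instead of probing the stats endpoint and catching
        # the failure.
        state = self.storage.cluster.state(index=index_name, metric='metadata')
        indices = state.get('metadata', {}).get('indices', {})
        return indices.get(index_name, {}).get('state') == 'open'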
    def streaming_bulk(self, index_name, doc_stream, chunk_size=500,
                       source=True):
        """Bulk index data and return the successfully indexed data as per
           source's setup.

           The `source` allows retrieving only what's of interest, e.g.:
           - source=True ; gives back the original indexed data
           - source=False ; returns without the original _source field
           - source=['task_id'] ; returns only task_id in the _source field

        Args:
            index_name (str): Name of the concerned index.
            doc_stream (generator): Document generator to index
            chunk_size (int): Number of documents chunk to send
            source (bool, [str]): the information to return

        """
        to_close = False
        # index must exist
        if not self.storage.indices.exists(index_name):
            self.create(index_name)
            # Close that new index later (to avoid too many open indices)
            to_close = True
        # index must be opened
        if not self.is_index_opened(index_name):
            to_close = True
            self.storage.indices.open(index_name)

        try:
            indexed_ids = self._streaming_bulk(
                index_name, doc_stream, chunk_size=chunk_size)
            yield from self.mget(
                index_name, indexed_ids, chunk_size=chunk_size, source=source)
        finally:
            # closing it to stay in the same state as prior to the call
            if to_close:
                self.storage.indices.close(index_name)

vlorentz (inline comment, marked done): You may want to have this in a `finally:`.
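A rough end-to-end sketch of how this backend would presumably be driven (not part of the changeset; the field values are illustrative and the in-memory class is assumed to accept the same configuration shape as the real client):

    backend = ElasticSearchBackend(elasticsearch={
        'cls': 'memory',
        'args': {'index_name_prefix': 'swh-tasks'},
    })
    backend.initialize()  # install the task mapping/settings on swh-tasks-*

    docs = [{
        'task_id': 42,
        'task_policy': 'oneshot',
        'task_status': 'completed',
        'ended': '2019-06-01 10:00:00',
    }]
    index_name = backend.compute_index_name(2019, 6)  # 'swh-tasks-2019-06'
    for data in backend.streaming_bulk(index_name, docs, source=['task_id']):
        ...  # the indexed document, restricted to its task_id field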