diff --git a/swh/scheduler/backend_es.py b/swh/scheduler/backend_es.py
index 8ff1481..30bbbbb 100644
--- a/swh/scheduler/backend_es.py
+++ b/swh/scheduler/backend_es.py
@@ -1,268 +1,265 @@
-# Copyright (C) 2018-2019  The Software Heritage developers
+# Copyright (C) 2018-2020  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 """Elastic Search backend
 
 """
 
 import datetime  # noqa
 import logging
 
 from copy import deepcopy
 from typing import Any, Dict
 
 from elasticsearch import helpers
 
 from swh.core import utils
 
 
 logger = logging.getLogger(__name__)
 
 
 DEFAULT_CONFIG = {
     'elasticsearch': {
         'cls': 'local',
         'args': {
             'index_name_prefix': 'swh-tasks',
             'storage_nodes': ['localhost:9200'],
             'client_options': {
                 'sniff_on_start': False,
                 'sniff_on_connection_fail': True,
                 'http_compress': False,
                 'sniffer_timeout': 60
             },
         },
     }
 }
 
 
 def get_elasticsearch(cls: str, args: Dict[str, Any] = {}):
     """Instantiate an elastic search instance
 
     """
     if cls == 'local':
         from elasticsearch import Elasticsearch
     elif cls == 'memory':
         from .elasticsearch_memory import MemoryElasticsearch as Elasticsearch  # type: ignore  # noqa
     else:
         raise ValueError('Unknown elasticsearch class `%s`' % cls)
 
     return Elasticsearch(**args)
 
 
 class ElasticSearchBackend:
     """ElasticSearch backend to index tasks
 
     This uses an elasticsearch client to actually discuss with the
     elasticsearch instance.
 
     """
     def __init__(self, **config):
         self.config = deepcopy(DEFAULT_CONFIG)
         self.config.update(config)
         es_conf = self.config['elasticsearch']
         args = deepcopy(es_conf['args'])
         self.index_name_prefix = args.pop('index_name_prefix')
         self.storage = get_elasticsearch(
             cls=es_conf['cls'],
             args={
                 'hosts': args.get('storage_nodes', []),
                 **args.get('client_options', {}),
             }
         )
         # document's index type (cf. /data/elastic-template.json)
         self.doc_type = 'task'
 
     def initialize(self):
         self.storage.indices.put_mapping(
             index=f"{self.index_name_prefix}-*",
             doc_type=self.doc_type,
             # to allow type definition below
             include_type_name=True,
             # to allow install mapping even if no index yet
             allow_no_indices=True,
             body={
                 "properties": {
                     "task_id": {"type": "double"},
                     "task_policy": {"type": "text"},
                     "task_status": {"type": "text"},
                     "task_run_id": {"type": "double"},
                     "arguments": {
                         "type": "object",
                         "properties": {
                             "args": {
                                 "type": "nested",
                                 "dynamic": False
                             },
                             "kwargs": {
                                 "type": "text"
                             }
                         }
                     },
                     "type": {"type": "text"},
                     "backend_id": {"type": "text"},
                     "metadata": {
                         "type": "object",
                         "enabled": False
                     },
                     "scheduled": {
                         "type": "date",
                         "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis"  # noqa
                     },
                     "started": {
                         "type": "date",
                         "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis"  # noqa
                     },
                     "ended": {
                         "type": "date",
                         "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis"  # noqa
                     }
                 }
             })
         self.storage.indices.put_settings(
             index=f"{self.index_name_prefix}-*",
             allow_no_indices=True,
             body={
                 "index": {
                     "codec": "best_compression",
                     "refresh_interval": "1s",
                     "number_of_shards": 1
                 }
             })
 
     def create(self, index_name) -> None:
         """Create and initialize index_name with mapping for all indices
            matching `swh-tasks-` pattern
 
         """
         assert index_name.startswith(self.index_name_prefix)
         self.storage.indices.create(index_name)
 
     def compute_index_name(self, year, month):
         """Given a year, month, compute the index's name.
 
         """
         return '%s-%s-%s' % (
             self.index_name_prefix, year, '%02d' % month)
 
     def mget(self, index_name, doc_ids, chunk_size=500, source=True):
         """Retrieve document's full content according to their ids as per
            source's setup.
 
            The `source` allows to retrieve only what's interesting, e.g:
            - source=True ; gives back the original indexed data
            - source=False ; returns without the original _source field
            - source=['task_id'] ; returns only task_id in the _source field
 
         Args:
             index_name (str): Name of the concerned index.
             doc_ids (generator): Generator of ids to retrieve
             chunk_size (int): Number of documents chunk to send for retrieval
             source (bool/[str]): Source of information to return
 
         Yields:
             document indexed as per source's setup
 
         """
         if isinstance(source, list):
             source = {'_source': ','.join(source)}
         else:
             source = {'_source': str(source).lower()}
 
         for ids in utils.grouper(doc_ids, n=1000):
             res = self.storage.mget(body={'ids': list(ids)},
                                     index=index_name,
                                     doc_type=self.doc_type,
                                     params=source)
             if not res:
                 logger.error('Error during retrieval of data, skipping!')
                 continue
 
             for doc in res['docs']:
                 found = doc.get('found')
                 if not found:
                     msg = 'Doc id %s not found, not indexed yet' % doc['_id']
                     logger.warning(msg)
                     continue
                 yield doc['_source']
 
     def _streaming_bulk(self, index_name, doc_stream, chunk_size=500):
         """Bulk index data and returns the successful indexed data's
            identifier.
 
         Args:
             index_name (str): Name of the concerned index.
             doc_stream (generator): Generator of documents to index
             chunk_size (int): Number of documents chunk to send for indexation
 
         Yields:
             document id indexed
 
         """
         actions = ({'_index': index_name,
                     '_op_type': 'index',
                     '_type': self.doc_type,
                     '_source': data} for data in doc_stream)
         for ok, result in helpers.streaming_bulk(client=self.storage,
                                                  actions=actions,
                                                  chunk_size=chunk_size,
                                                  raise_on_error=False,
                                                  raise_on_exception=False):
             if not ok:
                 logger.error('Error during %s indexation. Skipping.', result)
                 continue
             yield result['index']['_id']
 
     def is_index_opened(self, index_name: str) -> bool:
         """Determine if an index is opened or not
 
         """
         try:
             self.storage.indices.stats(index_name)
             return True
         except Exception:
             # fails when indice is closed (no other api call found)
             return False
 
     def streaming_bulk(self, index_name, doc_stream, chunk_size=500,
                        source=True):
         """Bulk index data and returns the successful indexed data as per
            source's setup.
 
            the `source` permits to retrieve only what's of interest to us,
            e.g:
            - source=True ; gives back the original indexed data
            - source=False ; returns without the original _source field
            - source=['task_id'] ; returns only task_id in the _source field
 
+           Note that:
+           - if the index is closed, it will be opened
+           - if the index does not exist, it will be created and opened
+
+           This keeps the index opened for performance reasons.
+
         Args:
             index_name (str): Name of the concerned index.
             doc_stream (generator): Document generator to index
             chunk_size (int): Number of documents chunk to send
             source (bool, [str]): the information to return
 
         """
-        to_close = False
         # index must exist
         if not self.storage.indices.exists(index_name):
             self.create(index_name)
-            # Close that new index (to avoid too much opened indices)
-            to_close = True
         # index must be opened
         if not self.is_index_opened(index_name):
-            to_close = True
             self.storage.indices.open(index_name)
 
-        try:
-            indexed_ids = self._streaming_bulk(
-                index_name, doc_stream, chunk_size=chunk_size)
-            yield from self.mget(
-                index_name, indexed_ids, chunk_size=chunk_size, source=source)
-        finally:
-            # closing it to stay in the same state as prior to the call
-            if to_close:
-                self.storage.indices.close(index_name)
+        indexed_ids = self._streaming_bulk(
+            index_name, doc_stream, chunk_size=chunk_size)
+        yield from self.mget(
+            index_name, indexed_ids, chunk_size=chunk_size, source=source)
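For context, a minimal usage sketch (not part of the patch) of the behavior this change introduces: streaming_bulk() now leaves the index open after indexing instead of closing it again. The configuration values and the sample document below are hypothetical, and the sketch assumes the in-memory Elasticsearch stub shipped with swh.scheduler accepts the same constructor keywords and indices calls as the real client:

# Hypothetical usage sketch; not part of this patch.
from swh.scheduler.backend_es import ElasticSearchBackend

# 'memory' selects the in-memory stub, so no live cluster is needed
# (assumption: the stub tolerates the 'hosts' keyword the backend passes)
backend = ElasticSearchBackend(elasticsearch={
    'cls': 'memory',
    'args': {'index_name_prefix': 'swh-tasks'},
})
backend.initialize()

index_name = backend.compute_index_name(2020, 2)  # 'swh-tasks-2020-02'
documents = [  # sample document; field names follow the mapping above
    {'task_id': 1.0, 'task_status': 'next_run_not_scheduled',
     'type': 'load-git'},
]

# creates and opens the index on demand, indexes the documents, and
# yields them back as per `source`
for doc in backend.streaming_bulk(index_name, documents, source=True):
    print(doc)

# with this patch the index stays open (it used to be closed again)
assert backend.is_index_opened(index_name)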
diff --git a/swh/scheduler/tests/es/test_backend_es.py b/swh/scheduler/tests/es/test_backend_es.py
index 62178f8..cae6f03 100644
--- a/swh/scheduler/tests/es/test_backend_es.py
+++ b/swh/scheduler/tests/es/test_backend_es.py
@@ -1,78 +1,78 @@
-# Copyright (C) 2019  The Software Heritage developers
+# Copyright (C) 2019-2020  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import datetime
 
 import pytest
 import elasticsearch
 
 from swh.scheduler.backend_es import get_elasticsearch
 
 from ..common import tasks_from_template, TEMPLATES
 
 
 def test_get_elasticsearch():
     with pytest.raises(ValueError, match='Unknown elasticsearch class'):
         get_elasticsearch('unknown')
 
     es = get_elasticsearch('memory')
     assert es
     from swh.scheduler.elasticsearch_memory import MemoryElasticsearch
     assert isinstance(es, MemoryElasticsearch)
 
     es = get_elasticsearch('local')
     assert es
     assert isinstance(es, elasticsearch.Elasticsearch)
 
 
 def test_backend_setup_basic(swh_elasticsearch_backend):
     """Elastic search instance should allow to create/close/check index
 
     """
     index_name = 'swh-tasks-2010-01'
     try:
         swh_elasticsearch_backend.storage.indices.get_mapping(index_name)
     except (elasticsearch.exceptions.NotFoundError, KeyError):
         pass
 
     assert not swh_elasticsearch_backend.storage.indices.exists(index_name)
     swh_elasticsearch_backend.create(index_name)
     assert swh_elasticsearch_backend.storage.indices.exists(index_name)
     assert swh_elasticsearch_backend.is_index_opened(index_name)
 
     # index exists with a mapping
     mapping = swh_elasticsearch_backend.storage.indices.get_mapping(index_name)
     assert mapping != {}
 
 
 def test_backend_setup_index(swh_elasticsearch_backend):
     """Elastic search instance should allow to bulk index
 
     """
     template_git = TEMPLATES['git']
     next_run_date = datetime.datetime.utcnow() - datetime.timedelta(days=1)
     tasks = tasks_from_template(template_git, next_run_date, 1)
     index_name = swh_elasticsearch_backend.compute_index_name(
         next_run_date.year, next_run_date.month)
     assert not swh_elasticsearch_backend.storage.indices.exists(index_name)
 
     tasks = list(swh_elasticsearch_backend.streaming_bulk(index_name, tasks))
     assert len(tasks) > 0
 
     for output_task in tasks:
         assert output_task is not None
         assert output_task['type'] == template_git['type']
         assert output_task['arguments'] is not None
         next_run = output_task['next_run']
         if isinstance(next_run, str):  # real elasticsearch
             assert next_run == next_run_date.isoformat()
         else:  # memory implem. does not really index
             assert next_run == next_run_date
 
     assert swh_elasticsearch_backend.storage.indices.exists(index_name)
-    assert not swh_elasticsearch_backend.is_index_opened(index_name)
+    assert swh_elasticsearch_backend.is_index_opened(index_name)
 
     mapping = swh_elasticsearch_backend.storage.indices.get_mapping(index_name)
     assert mapping != {}
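Continuing the sketch above, the `source` modes documented on mget() and streaming_bulk() could be exercised as follows (the id list is hypothetical):

# `source` modes, per the mget() docstring (ids are hypothetical)
ids = ['some-task-id']

# source=True: yields the full original documents
full_docs = list(backend.mget(index_name, ids, source=True))

# source=['task_id']: yields only the task_id field of _source
only_task_ids = list(backend.mget(index_name, ids, source=['task_id']))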