diff --git a/swh/scheduler/backend_es.py b/swh/scheduler/backend_es.py
--- a/swh/scheduler/backend_es.py
+++ b/swh/scheduler/backend_es.py
@@ -1,18 +1,22 @@
-# Copyright (C) 2018 The Software Heritage developers
+# Copyright (C) 2018-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
-
 """Elastic Search backend
 
 """
+
+import logging
+
 from copy import deepcopy
 
 from swh.core import utils
 
 from elasticsearch import Elasticsearch
 from elasticsearch import helpers
 
+logger = logging.getLogger(__name__)
+
 
 DEFAULT_CONFIG = {
     'elastic_search': {
@@ -67,12 +71,11 @@
                                  body=data)
 
     def mget(self, index_name, doc_ids, chunk_size=500,
-             source=True, log=None):
+             source=True):
         """Retrieve document's full content according to their ids as per
            source's setup.
 
-           The `source` permits to retrieve only what's of interest to
-           us, e.g:
+           The `source` allows retrieving only what is of interest, e.g.:
            - source=True ; gives back the original indexed data
            - source=False ; returns without the original _source field
            - source=['task_id'] ; returns only task_id in the _source field
@@ -98,21 +101,18 @@
                                      doc_type=self.doc_type,
                                      params=source)
             if not res:
-                if log:
-                    log.error('Error during retrieval of data, skipping!')
+                logger.error('Error during retrieval of data, skipping!')
                 continue
 
             for doc in res['docs']:
                 found = doc.get('found')
                 if not found:
                     msg = 'Doc id %s not found, not indexed yet' % doc['_id']
-                    if log:
-                        log.warning(msg)
+                    logger.warning(msg)
                     continue
                 yield doc['_source']
 
-    def _streaming_bulk(self, index_name, doc_stream, chunk_size=500,
-                        log=None):
+    def _streaming_bulk(self, index_name, doc_stream, chunk_size=500):
         """Bulk index data and returns the successful indexed data's
            identifier.
 
@@ -135,13 +135,23 @@
                 raise_on_error=False,
                 raise_on_exception=False):
             if not ok:
-                if log:
-                    log.error('Error during %s indexation. Skipping.' % result)
+                logger.error('Error during %s indexation. Skipping.', result)
                 continue
             yield result['index']['_id']
 
+    def is_index_opened(self, index_name: str) -> bool:
+        """Determine if an index is opened or not
+
+        """
+        try:
+            self.storage.indices.stats(index_name)
+            return True
+        except Exception:
+            # stats() fails when the index is closed (no other api call found)
+            return False
+
     def streaming_bulk(self, index_name, doc_stream, chunk_size=500,
-                       source=True, log=None):
+                       source=True):
         """Bulk index data and returns the successful indexed data as per
            source's setup.
 
@@ -159,8 +169,25 @@
             source (bool, [str]): the information to return
 
         """
-
-        indexed_ids = self._streaming_bulk(
-            index_name, doc_stream, chunk_size=chunk_size, log=log)
-        yield from self.mget(index_name, indexed_ids, chunk_size=chunk_size,
-                             source=source, log=log)
+        to_close = False
+        # index must exist
+        if not self.storage.indices.exists(index_name):
+            # server is set up correctly (mappings, settings are
+            # automatically applied, cf. /data/README.md)
+            self.storage.indices.create(index_name)
+            # Close that new index (to avoid keeping too many indices open)
+            to_close = True
+        # index must be opened
+        if not self.is_index_opened(index_name):
+            to_close = True
+            self.storage.indices.open(index_name)
+
+        try:
+            indexed_ids = self._streaming_bulk(
+                index_name, doc_stream, chunk_size=chunk_size)
+            yield from self.mget(
+                index_name, indexed_ids, chunk_size=chunk_size, source=source)
+        finally:
+            # closing it to stay in the same state as prior to the call
+            if to_close:
+                self.storage.indices.close(index_name)
diff --git a/swh/scheduler/cli/task.py b/swh/scheduler/cli/task.py
--- a/swh/scheduler/cli/task.py
+++ b/swh/scheduler/cli/task.py
@@ -498,11 +498,12 @@
     from swh.core.utils import grouper
     from swh.scheduler.backend_es import SWHElasticSearchClient
 
+    config = ctx.obj['config']
     scheduler = ctx.obj['scheduler']
     if not scheduler:
         raise ValueError('Scheduler class (local/remote) must be instantiated')
-    es_client = SWHElasticSearchClient()
+    es_client = SWHElasticSearchClient(**config)
     logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO)
     log = logging.getLogger('swh.scheduler.cli.archive')
     logging.getLogger('urllib3').setLevel(logging.WARN)
 
@@ -525,7 +526,7 @@
     log.debug('index: %s; cleanup: %s; period: [%s ; %s]' % (
         not dry_run, not dry_run and cleanup, after, before))
 
-    def group_by_index_name(data, es_client=es_client):
+    def get_index_name(data, es_client=es_client):
         """Given a data record, determine the index's name through its
            ending date. This varies greatly depending on the task_run's
            status.
@@ -540,9 +541,9 @@
         while last_id is not None:
             result = scheduler.filter_task_to_archive(
                 after, before, page_token=last_id, count=batch_index)
-            tasks_in = result['tasks']
-            for index_name, tasks_group in itertools.groupby(
-                    tasks_in, key=group_by_index_name):
+            tasks_sorted = sorted(result['tasks'], key=get_index_name)
+            groups = itertools.groupby(tasks_sorted, key=get_index_name)
+            for index_name, tasks_group in groups:
                 log.debug('Index tasks to %s' % index_name)
                 if dry_run:
                     for task in tasks_group:
@@ -551,7 +552,7 @@
 
                 yield from es_client.streaming_bulk(
                     index_name, tasks_group, source=['task_id', 'task_run_id'],
-                    chunk_size=bulk_index, log=log)
+                    chunk_size=bulk_index)
 
             last_id = result.get('next_page_token')
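
Note on the new index handling in streaming_bulk (backend_es.py): the index is created if it does not exist, opened if it is closed, and closed again in the finally block only when this call is the one that created or opened it, so the cluster is left in the same state as before the call. Below is a rough standalone sketch of that pattern against elasticsearch-py; the local cluster and the index name are assumptions for illustration, not the client's actual configuration:

    from elasticsearch import Elasticsearch

    es = Elasticsearch()               # assumption: a locally reachable cluster
    index_name = 'swh-tasks-2019-10'   # placeholder index name

    def is_index_opened(es, index_name):
        # indices.stats() raises on a closed index; the patch relies on the
        # same signal since there is no dedicated "is open" API call.
        try:
            es.indices.stats(index=index_name)
            return True
        except Exception:
            return False

    to_close = False
    if not es.indices.exists(index=index_name):
        es.indices.create(index=index_name)
        to_close = True                # a freshly created index starts open
    if not is_index_opened(es, index_name):
        to_close = True
        es.indices.open(index=index_name)

    try:
        pass                           # bulk indexing and mget would go here
    finally:
        if to_close:
            es.indices.close(index=index_name)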
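
Note on the source parameter documented in mget: it maps onto Elasticsearch's _source filtering. A hedged example of the underlying behaviour using elasticsearch-py directly, with a placeholder index name and ids:

    from elasticsearch import Elasticsearch

    es = Elasticsearch()                      # assumption: local cluster
    res = es.mget(index='swh-tasks-2019-10',  # placeholder index
                  body={'ids': ['1', '2']},
                  _source=['task_id'])        # like source=['task_id'] above
    for doc in res['docs']:
        if doc.get('found'):
            print(doc['_source'])             # only task_id is returned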
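
Note on the logging changes: besides replacing the optional log argument with a module-level logger, the error call in _streaming_bulk now passes result as a logging argument instead of interpolating with %, so the message is only formatted if the record is actually emitted. A minimal comparison:

    import logging

    logger = logging.getLogger(__name__)
    result = {'index': {'_id': '42'}}   # placeholder bulk-indexing result

    # Before: the string is built eagerly, even if ERROR is filtered out.
    logger.error('Error during %s indexation. Skipping.' % result)

    # After: formatting is deferred to the logging framework.
    logger.error('Error during %s indexation. Skipping.', result)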
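
Note on the archive command change in cli/task.py: itertools.groupby only groups consecutive items sharing a key, so iterating over the unsorted result['tasks'] could yield the same index name several times and split one bulk request into many. Sorting by the same key first guarantees one group per index. A self-contained illustration with made-up records standing in for scheduler tasks:

    import itertools

    def by_index(record):
        return record['index']

    records = [{'index': 'a'}, {'index': 'b'}, {'index': 'a'}]

    # Unsorted input: groupby emits 'a', 'b', 'a' -- three groups.
    print([k for k, _ in itertools.groupby(records, key=by_index)])

    # Sorted input: one group per distinct index, as the indexing loop expects.
    print([k for k, _ in itertools.groupby(sorted(records, key=by_index),
                                           key=by_index)])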