diff --git a/swh/scheduler/backend_es.py b/swh/scheduler/backend_es.py
--- a/swh/scheduler/backend_es.py
+++ b/swh/scheduler/backend_es.py
@@ -71,8 +71,7 @@
         """Retrieve document's full content according to their ids as per
            source's setup.
 
-           The `source` permits to retrieve only what's of interest to
-           us, e.g:
+           The `source` allows retrieving only what's interesting, e.g:
            - source=True ; gives back the original indexed data
            - source=False ; returns without the original _source field
            - source=['task_id'] ; returns only task_id in the _source field
@@ -140,6 +139,17 @@
                 continue
             yield result['index']['_id']
 
+    def is_index_opened(self, index_name: str) -> bool:
+        """Determine whether an index is open or not
+
+        """
+        try:
+            self.storage.indices.stats(index_name)
+            return True
+        except Exception:
+            # fails when the index is closed (no other API call found)
+            return False
+
     def streaming_bulk(self, index_name, doc_stream, chunk_size=500,
                        source=True, log=None):
         """Bulk index data and returns the successful indexed data as per
@@ -159,8 +169,23 @@
             source (bool, [str]): the information to return
 
         """
+        to_close = False
+        # index must exist
+        if not self.storage.indices.exists(index_name):
+            # the server is set up correctly (mappings and settings are
+            # applied automatically, cf. /data/README.md)
+            self.storage.indices.create(index_name)
+            # Close that new index (to avoid too many open indices)
+            to_close = True
+        # index must be open
+        if not self.is_index_opened(index_name):
+            to_close = True
+            self.storage.indices.open(index_name)
 
         indexed_ids = self._streaming_bulk(
             index_name, doc_stream, chunk_size=chunk_size, log=log)
         yield from self.mget(index_name, indexed_ids, chunk_size=chunk_size,
                              source=source, log=log)
+        # close it again to stay in the same state as prior to the call
+        if to_close:
+            self.storage.indices.close(index_name)
diff --git a/swh/scheduler/cli/task.py b/swh/scheduler/cli/task.py
--- a/swh/scheduler/cli/task.py
+++ b/swh/scheduler/cli/task.py
@@ -498,11 +498,12 @@
     from swh.core.utils import grouper
     from swh.scheduler.backend_es import SWHElasticSearchClient
 
+    config = ctx.obj['config']
     scheduler = ctx.obj['scheduler']
     if not scheduler:
         raise ValueError(
             'Scheduler class (local/remote) must be instantiated')
-    es_client = SWHElasticSearchClient()
+    es_client = SWHElasticSearchClient(**config)
     logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO)
     log = logging.getLogger('swh.scheduler.cli.archive')
    logging.getLogger('urllib3').setLevel(logging.WARN)
@@ -540,9 +541,9 @@
     while last_id is not None:
         result = scheduler.filter_task_to_archive(
             after, before, last_id=last_id, limit=batch_index)
-        tasks_in = result['tasks']
-        for index_name, tasks_group in itertools.groupby(
-                tasks_in, key=group_by_index_name):
+        tasks_sorted = sorted(result['tasks'], key=group_by_index_name)
+        groups = itertools.groupby(tasks_sorted, key=group_by_index_name)
+        for index_name, tasks_group in groups:
             log.debug('Index tasks to %s' % index_name)
             if dry_run:
                 for task in tasks_group:
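
Note on the backend_es.py hunks: the new code makes sure the target index exists and is open before bulk indexing, then closes it again if it had to be created or opened. A minimal standalone sketch of the same bookkeeping against a stock elasticsearch-py client; the `Elasticsearch()` connection, the `ensure_open_index` helper and the index name are illustrative assumptions, not code from the patch:

    from elasticsearch import Elasticsearch

    def ensure_open_index(es, index_name):
        """Create and/or open index_name; return True when the caller should
        close it again afterwards to restore the previous state.

        Illustrative helper, not part of the patch.
        """
        if not es.indices.exists(index_name):
            es.indices.create(index_name)
            return True  # freshly created, close it once done
        try:
            es.indices.stats(index_name)  # raises when the index is closed
            return False
        except Exception:
            es.indices.open(index_name)
            return True

    es = Elasticsearch()              # assumes a locally reachable cluster
    index_name = 'swh-tasks-2019-01'  # hypothetical index name
    to_close = ensure_open_index(es, index_name)
    try:
        ...  # bulk index and mget would go here, cf. streaming_bulk
    finally:
        if to_close:
            es.indices.close(index_name)  # back to the prior closed state

The patch itself closes the index after the `yield from`, so the prior state is only restored when the caller exhausts the generator; a `try`/`finally` as sketched above is one way to also restore it on early exit.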
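
Note on the second cli/task.py hunk: `itertools.groupby` only groups consecutive elements sharing a key, so feeding it the unsorted task stream could yield the same index name several times and split one month's tasks across several bulk calls; sorting by the grouping key first avoids that. A small illustration, where the `group_by_index_name` stand-in and the sample tasks are made up for the example:

    import itertools

    def group_by_index_name(task):
        # stand-in: derive the monthly archive index name from the task
        return 'swh-tasks-%s' % task['ended'][:7]

    tasks = [
        {'id': 1, 'ended': '2019-01-03'},
        {'id': 2, 'ended': '2019-02-14'},
        {'id': 3, 'ended': '2019-01-27'},
    ]

    # Unsorted input: groupby emits 2019-01 twice.
    print([k for k, _ in itertools.groupby(tasks, key=group_by_index_name)])
    # ['swh-tasks-2019-01', 'swh-tasks-2019-02', 'swh-tasks-2019-01']

    # Sorted input, as in the patch: one group per index name.
    tasks_sorted = sorted(tasks, key=group_by_index_name)
    print([k for k, _ in
           itertools.groupby(tasks_sorted, key=group_by_index_name)])
    # ['swh-tasks-2019-01', 'swh-tasks-2019-02']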