diff --git a/swh/scheduler/backend_es.py b/swh/scheduler/backend_es.py
index 6089405..9980dcb 100644
--- a/swh/scheduler/backend_es.py
+++ b/swh/scheduler/backend_es.py
@@ -1,269 +1,268 @@
 # Copyright (C) 2018-2019  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 """Elastic Search backend

 """

 import datetime  # noqa
 import logging

 from copy import deepcopy
 from typing import Any, Dict

 from elasticsearch import helpers

 from swh.core import utils


 logger = logging.getLogger(__name__)


 DEFAULT_CONFIG = {
     'elasticsearch': {
         'cls': 'local',
         'args': {
             'index_name_prefix': 'swh-tasks',
             'storage_nodes': ['localhost:9200'],
             'client_options': {
                 'sniff_on_start': False,
                 'sniff_on_connection_fail': True,
                 'http_compress': False,
                 'sniffer_timeout': 60
             },
         },
     }
 }


 def get_elasticsearch(cls: str, args: Dict[str, Any] = {}):
     """Instantiate an elastic search instance

     """
     if cls == 'local':
         from elasticsearch import Elasticsearch
     elif cls == 'memory':
-        from .backend_es_memory import MemoryElasticsearch
-        Elasticsearch = MemoryElasticsearch
+        from .backend_es_memory import MemoryElasticsearch as Elasticsearch  # type: ignore  # noqa
     else:
         raise ValueError('Unknown elasticsearch class `%s`' % cls)

     return Elasticsearch(**args)


 class ElasticSearchBackend:
     """ElasticSearch backend to index tasks

     This uses an elasticsearch client to actually discuss with the
     elasticsearch instance.

     """
     def __init__(self, **config):
         self.config = deepcopy(DEFAULT_CONFIG)
         self.config.update(config)
         es_conf = self.config['elasticsearch']
         args = deepcopy(es_conf['args'])
         self.index_name_prefix = args.pop('index_name_prefix')
         self.storage = get_elasticsearch(
             cls=es_conf['cls'],
             args={
                 'storage_nodes': args.get('storage_nodes', []),
                 **args.get('client_options', {}),
             }
         )
         # document's index type (cf. /data/elastic-template.json)
         self.doc_type = 'task'

     def initialize(self):
         self.storage.indices.put_mapping(
             index=f"{self.index_name_prefix}-*",
             doc_type=self.doc_type,
             # to allow type definition below
             include_type_name=True,
             # to allow install mapping even if no index yet
             allow_no_indices=True,
             body={
                 "properties": {
                     "task_id": {"type": "double"},
                     "task_policy": {"type": "text"},
                     "task_status": {"type": "text"},
                     "task_run_id": {"type": "double"},
                     "arguments": {
                         "type": "object",
                         "properties": {
                             "args": {
                                 "type": "nested",
                                 "dynamic": False
                             },
                             "kwargs": {
                                 "type": "text"
                             }
                         }
                     },
                     "type": {"type": "text"},
                     "backend_id": {"type": "text"},
                     "metadata": {
                         "type": "object",
                         "enabled": False
                     },
                     "scheduled": {
                         "type": "date",
                         "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis"  # noqa
                     },
                     "started": {
                         "type": "date",
                         "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis"  # noqa
                     },
                     "ended": {
                         "type": "date",
                         "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis"  # noqa
                     }
                 }
             })

         self.storage.indices.put_settings(
             index=f"{self.index_name_prefix}-*",
             allow_no_indices=True,
             body={
                 "index": {
                     "codec": "best_compression",
                     "refresh_interval": "1s",
                     "number_of_shards": 1
                 }
             })

     def create(self, index_name) -> None:
         """Create and initialize index_name with mapping for all indices
            matching `swh-tasks-` pattern

         """
         assert index_name.startswith(self.index_name_prefix)
         self.storage.indices.create(index_name)

     def compute_index_name(self, year, month):
         """Given a year, month, compute the index's name.

         """
         return '%s-%s-%s' % (
             self.index_name_prefix, year, '%02d' % month)

     def mget(self, index_name, doc_ids, chunk_size=500, source=True):
         """Retrieve document's full content according to their ids as per
            source's setup.

            The `source` allows to retrieve only what's interesting, e.g:
            - source=True ; gives back the original indexed data
            - source=False ; returns without the original _source field
            - source=['task_id'] ; returns only task_id in the _source field

         Args:
             index_name (str): Name of the concerned index.
             doc_ids (generator): Generator of ids to retrieve
             chunk_size (int): Number of documents chunk to send for retrieval
             source (bool/[str]): Source of information to return

         Yields:
             document indexed as per source's setup

         """
         if isinstance(source, list):
             source = {'_source': ','.join(source)}
         else:
             source = {'_source': str(source).lower()}

         for ids in utils.grouper(doc_ids, n=1000):
             res = self.storage.mget(body={'ids': list(ids)},
                                     index=index_name,
                                     doc_type=self.doc_type,
                                     params=source)
             if not res:
                 logger.error('Error during retrieval of data, skipping!')
                 continue

             for doc in res['docs']:
                 found = doc.get('found')
                 if not found:
                     msg = 'Doc id %s not found, not indexed yet' % doc['_id']
                     logger.warning(msg)
                     continue
                 yield doc['_source']

     def _streaming_bulk(self, index_name, doc_stream, chunk_size=500):
         """Bulk index data and returns the successful indexed data's
            identifier.

         Args:
             index_name (str): Name of the concerned index.
             doc_stream (generator): Generator of documents to index
             chunk_size (int): Number of documents chunk to send for indexation

         Yields:
             document id indexed

         """
         actions = ({'_index': index_name,
                     '_op_type': 'index',
                     '_type': self.doc_type,
                     '_source': data} for data in doc_stream)
         for ok, result in helpers.streaming_bulk(client=self.storage,
                                                  actions=actions,
                                                  chunk_size=chunk_size,
                                                  raise_on_error=False,
                                                  raise_on_exception=False):
             if not ok:
                 logger.error('Error during %s indexation. Skipping.', result)
                 continue
             yield result['index']['_id']

     def is_index_opened(self, index_name: str) -> bool:
         """Determine if an index is opened or not

         """
         try:
             self.storage.indices.stats(index_name)
             return True
         except Exception:
             # fails when indice is closed (no other api call found)
             return False

     def streaming_bulk(self, index_name, doc_stream, chunk_size=500,
                        source=True):
         """Bulk index data and returns the successful indexed data as per
            source's setup.

            the `source` permits to retrieve only what's of interest to us,
            e.g:
            - source=True ; gives back the original indexed data
            - source=False ; returns without the original _source field
            - source=['task_id'] ; returns only task_id in the _source field

         Args:
             index_name (str): Name of the concerned index.
             doc_stream (generator): Document generator to index
             chunk_size (int): Number of documents chunk to send
             source (bool, [str]): the information to return

         """
         to_close = False
         # index must exist
         if not self.storage.indices.exists(index_name):
             self.create(index_name)
             # Close that new index (to avoid too much opened indices)
             to_close = True
         # index must be opened
         if not self.is_index_opened(index_name):
             to_close = True
             self.storage.indices.open(index_name)

         try:
             indexed_ids = self._streaming_bulk(
                 index_name, doc_stream, chunk_size=chunk_size)
             yield from self.mget(
                 index_name, indexed_ids, chunk_size=chunk_size, source=source)
         finally:
             # closing it to stay in the same state as prior to the call
             if to_close:
                 self.storage.indices.close(index_name)
diff --git a/swh/scheduler/backend_es_memory.py b/swh/scheduler/backend_es_memory.py
index cc77ad6..c8c9964 100644
--- a/swh/scheduler/backend_es_memory.py
+++ b/swh/scheduler/backend_es_memory.py
@@ -1,152 +1,154 @@
 # Copyright (C) 2018-2019  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 """Memory Elastic Search backend

 """

 import datetime  # noqa serialization purposes
 import hashlib
 import logging

 from ast import literal_eval
 from typing import Optional

 import psycopg2  # noqa serialization purposes


 logger = logging.getLogger(__name__)


 class BasicSerializer:
     """For memory elastic search implementation (not for production)

     """
     def __init__(self, *args, **kwargs):
         pass

     def dumps(self, *args, **kwargs):
         return str(*args)


 class BasicTransport:
     """For memory elastic search implementation, (not for production)

     """
     def __init__(self, *args, **kwargs):
         self.serializer = BasicSerializer()


 class MemoryElasticsearch:
     """Memory Elasticsearch instance (for test purposes)

     Partial implementation oriented towards index storage (and not search)

     For now, its sole client is the scheduler for task archival purposes.

     """
     def __init__(self, *args, **kwargs):
         self.index = {}
         self.mapping = {}
         self.settings = {}
         self.indices = self  # HACK
         self.main_mapping_key: Optional[str] = None
         self.main_settings_key: Optional[str] = None
         self.transport = BasicTransport()

     def create(self, index, **kwargs):
         logger.debug(f'create index {index}')
         logger.debug(f'indices: {self.index}')
         logger.debug(f'mapping: {self.mapping}')
         logger.debug(f'settings: {self.settings}')
         self.index[index] = {
             'status': 'opened',
             'data': {},
             'mapping': self.get_mapping(self.main_mapping_key),
             'settings': self.get_settings(self.main_settings_key),
         }
         logger.debug(f'index {index} created')

     def close(self, index, **kwargs):
         """Close index"""
         idx = self.index.get(index)
         if idx:
             idx['status'] = 'closed'

     def open(self, index, **kwargs):
         """Open index"""
         idx = self.index.get(index)
         if idx:
             idx['status'] = 'opened'

-    def bulk(self, body, doc_type=None, index=None, params=None):
+    def bulk(self, body, **kwargs):
         """Bulk insert document in index"""
         assert isinstance(body, str)
-        all_data = body.split('\n')[:-1]  # drop the empty line
+        all_data = body.split('\n')
+        if all_data[-1] == '':
+            all_data = all_data[:-1]  # drop the empty line if any
         ids = []
         # data is sent as tuple (index, data-to-index)
         for i in range(0, len(all_data), 2):
             # The first entry is about the index to use
             # not about a data to index
             # find the index
             index_data = literal_eval(all_data[i])
             idx_name = index_data['index']['_index']
             # associated data to index
             data = all_data[i+1]
             _id = hashlib.sha1(data.encode('utf-8')).hexdigest()
             parsed_data = eval(data)  # for datetime
             self.index[idx_name]['data'][_id] = parsed_data
             ids.append(_id)

         # everything is indexed fine
         return {
             'items': [
                 {
                     'index': {
                         'status': 200,
                         '_id': _id,
                     }
                 }
                 for _id in ids
             ]
         }

     def mget(self, *args, body, index, **kwargs):
         """Bulk indexed documents retrieval"""
         idx = self.index[index]
         docs = []
         idx_docs = idx['data']
         for _id in body['ids']:
             doc = idx_docs.get(_id)
             if doc:
                 d = {
                     'found': True,
                     '_source': doc,
                 }
                 docs.append(d)
         return {'docs': docs}

     def stats(self, index, **kwargs):
         idx = self.index[index]  # will raise if it does not exist
         if not idx or idx['status'] == 'closed':
             raise ValueError('Closed index')  # simulate issue if index closed

     def exists(self, index, **kwargs):
         return self.index.get(index) is not None

     def put_mapping(self, index, body, **kwargs):
         self.mapping[index] = body
         self.main_mapping_key = index

     def get_mapping(self, index, **kwargs):
         return self.mapping.get(index) or \
             self.index.get(index, {}).get('mapping', {})

     def put_settings(self, index, body, **kwargs):
         self.settings[index] = body
         self.main_settings_key = index

     def get_settings(self, index, **kwargs):
         return self.settings.get(index) or \
             self.index.get(index, {}).get('settings', {})
diff --git a/swh/scheduler/tests/es/conftest.py b/swh/scheduler/tests/es/conftest.py
index be78109..c91d110 100644
--- a/swh/scheduler/tests/es/conftest.py
+++ b/swh/scheduler/tests/es/conftest.py
@@ -1,48 +1,53 @@
 # Copyright (C) 2019  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import yaml

 import pytest

 from swh.scheduler import get_scheduler


 @pytest.fixture
 def swh_scheduler_config(swh_scheduler_config):
     return {
         'scheduler': {
             'cls': 'local',
             'args': swh_scheduler_config,
         },
         'elasticsearch': {
             'cls': 'memory',
             'args': {
                 'index_name_prefix': 'swh-tasks',
             },
         },
     }


 @pytest.fixture
 def swh_scheduler_config_file(swh_scheduler_config, monkeypatch, tmp_path):
     conffile = str(tmp_path / 'elastic.yml')
     with open(conffile, 'w') as f:
         f.write(yaml.dump(swh_scheduler_config))
     monkeypatch.setenv('SWH_CONFIG_FILENAME', conffile)
     return conffile


 @pytest.fixture
 def swh_scheduler(swh_scheduler_config):
     return get_scheduler(**swh_scheduler_config['scheduler'])


 @pytest.fixture
 def swh_elasticsearch(swh_scheduler_config):
     from swh.scheduler.backend_es import ElasticSearchBackend
     backend = ElasticSearchBackend(**swh_scheduler_config)
     backend.initialize()
     return backend
+
+
+@pytest.fixture
+def swh_memory_elasticsearch(swh_elasticsearch):
+    return swh_elasticsearch.storage
diff --git a/swh/scheduler/tests/es/test_backend_es_memory.py b/swh/scheduler/tests/es/test_backend_es_memory.py
new file mode 100644
index 0000000..cd76406
--- /dev/null
+++ b/swh/scheduler/tests/es/test_backend_es_memory.py
@@ -0,0 +1,161 @@
+# Copyright (C) 2019  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import datetime
+import hashlib
+import logging
+import random
+
+import pytest
+
+from swh.scheduler.backend_es_memory import (
+    BasicSerializer, BasicTransport
+)
+
+from ..common import tasks_from_template, TEMPLATES
+from typing import Any, Dict
+
+
+logger = logging.getLogger(__name__)
+
+
+def test_serializer():
+    s = BasicSerializer()
+    assert s
+
+    data = {'something': [1, 2, 3], 'cool': {'1': '2'}}
+    actual_data = s.dumps(data)
+
+    assert isinstance(actual_data, str)
+    assert actual_data == str(data)
+
+
+def test_basic_transport():
+    b = BasicTransport()
+    assert b
+
+    assert isinstance(b.serializer, BasicSerializer)
+
+
+def test_index_manipulation(swh_memory_elasticsearch):
+    index_name = 'swh-tasks-xxxx'
+    indices = swh_memory_elasticsearch.index
+
+    assert not swh_memory_elasticsearch.exists(index_name)
+    assert index_name not in indices
+
+    # so stat raises
+    with pytest.raises(Exception):
+        swh_memory_elasticsearch.stats(index_name)
+
+    # we create the index
+    swh_memory_elasticsearch.create(index_name)
+
+    # now the index exists
+    assert swh_memory_elasticsearch.exists(index_name)
+    assert index_name in indices
+    # it's opened
+    assert indices[index_name]['status'] == 'opened'
+
+    # so stats is happy
+    swh_memory_elasticsearch.stats(index_name)
+
+    # open the index, nothing changes
+    swh_memory_elasticsearch.open(index_name)
+    assert indices[index_name]['status'] == 'opened'
+
+    # close the index
+    swh_memory_elasticsearch.close(index_name)
+
+    assert indices[index_name]['status'] == 'closed'
+
+    # reopen the index (fun times)
+    swh_memory_elasticsearch.open(index_name)
+    assert indices[index_name]['status'] == 'opened'
+
+
+def test_bulk_and_mget(swh_memory_elasticsearch):
+    # initialize tasks
+    template_git = TEMPLATES['git']
+    next_run_start = datetime.datetime.utcnow() - datetime.timedelta(days=1)
+
+    tasks = tasks_from_template(template_git, next_run_start, 100)
+
+    def compute_id(stask):
+        return hashlib.sha1(stask.encode('utf-8')).hexdigest()
+
+    body = []
+    ids_to_task = {}
+    for task in tasks:
+        date = task['next_run']
+        index_name = f'swh-tasks-{date.year}-{date.month}'
+        idx = {'index': {'_index': index_name}}
+        sidx = swh_memory_elasticsearch.transport.serializer.dumps(idx)
+        body.append(sidx)
+
+        stask = swh_memory_elasticsearch.transport.serializer.dumps(task)
+        body.append(stask)
+
+        _id = compute_id(stask)
+        ids_to_task[_id] = task
+        logger.debug(f'_id: {_id}, task: {task}')
+
+    # store
+
+    # create the index first
+    swh_memory_elasticsearch.create(index_name)
+
+    # then bulk insert new data
+    result = swh_memory_elasticsearch.bulk('\n'.join(body))
+
+    # no guarantee in the order
+    assert result
+    actual_items = result['items']
+    assert len(actual_items) == len(ids_to_task)
+
+    def get_id(data: Dict[str, Any]) -> str:
+        return data['index']['_id']
+
+    actual_items = sorted(actual_items, key=get_id)
+
+    expected_items = {
+        'items': [
+            {
+                'index': {
+                    'status': 200,
+                    '_id': _id
+                }
+            } for _id in list(ids_to_task)
+        ]
+    }
+
+    expected_items = sorted(expected_items['items'], key=get_id)
+    assert actual_items == expected_items
+
+    # retrieve
+
+    nb_docs = 10
+    ids = list(ids_to_task)
+    random_ids = []
+    # add some inexistent ids
+    for i in range(16):
+        noisy_id = f'{i}' * 40
+        random_ids.append(noisy_id)
+    random_ids.extend(random.sample(ids, nb_docs))  # add relevant ids
+    for i in range(16, 32):
+        noisy_id = f'{i}' * 40
+        random_ids.append(noisy_id)
+
+    result = swh_memory_elasticsearch.mget(
+        index=index_name, body={'ids': random_ids})
+    assert result['docs']
+    assert len(result['docs']) == nb_docs, "no random and inexistent id found"
+    for doc in result['docs']:
+        assert doc['found']
+
+        actual_task = doc['_source']
+        _id = compute_id(str(actual_task))
+        expected_task = ids_to_task[_id]
+        assert actual_task == expected_task