diff --git a/swh/scheduler/backend_es.py b/swh/scheduler/backend_es.py
index 82cb02f..6089405 100644
--- a/swh/scheduler/backend_es.py
+++ b/swh/scheduler/backend_es.py
@@ -1,266 +1,269 @@
 # Copyright (C) 2018-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 """Elastic Search backend

 """

 import datetime  # noqa
 import logging

 from copy import deepcopy
 from typing import Any, Dict

 from elasticsearch import helpers

 from swh.core import utils


 logger = logging.getLogger(__name__)


 DEFAULT_CONFIG = {
     'elasticsearch': {
         'cls': 'local',
         'args': {
             'index_name_prefix': 'swh-tasks',
             'storage_nodes': ['localhost:9200'],
             'client_options': {
                 'sniff_on_start': False,
                 'sniff_on_connection_fail': True,
                 'http_compress': False,
                 'sniffer_timeout': 60
             },
         },
     }
 }


 def get_elasticsearch(cls: str, args: Dict[str, Any] = {}):
     """Instantiate an Elasticsearch instance

     """
     if cls == 'local':
         from elasticsearch import Elasticsearch
+    elif cls == 'memory':
+        from .backend_es_memory import MemoryElasticsearch
+        Elasticsearch = MemoryElasticsearch
     else:
         raise ValueError('Unknown elasticsearch class `%s`' % cls)

     return Elasticsearch(**args)


 class ElasticSearchBackend:
     """ElasticSearch backend to index tasks

     This uses an elasticsearch client to communicate with the
     elasticsearch instance.

     """
     def __init__(self, **config):
         self.config = deepcopy(DEFAULT_CONFIG)
         self.config.update(config)
         es_conf = self.config['elasticsearch']
         args = deepcopy(es_conf['args'])
         self.index_name_prefix = args.pop('index_name_prefix')
         self.storage = get_elasticsearch(
             cls=es_conf['cls'],
             args={
                 'storage_nodes': args.get('storage_nodes', []),
                 **args.get('client_options', {}),
             }
         )
         # document's index type (cf. /data/elastic-template.json)
         self.doc_type = 'task'

     def initialize(self):
         self.storage.indices.put_mapping(
             index=f"{self.index_name_prefix}-*",
             doc_type=self.doc_type,
             # to allow the type definition below
             include_type_name=True,
             # to allow installing the mapping even if no index exists yet
             allow_no_indices=True,
             body={
                 "properties": {
                     "task_id": {"type": "double"},
                     "task_policy": {"type": "text"},
                     "task_status": {"type": "text"},
                     "task_run_id": {"type": "double"},
                     "arguments": {
                         "type": "object",
                         "properties": {
                             "args": {
                                 "type": "nested",
                                 "dynamic": False
                             },
                             "kwargs": {
                                 "type": "text"
                             }
                         }
                     },
                     "type": {"type": "text"},
                     "backend_id": {"type": "text"},
                     "metadata": {
                         "type": "object",
                         "enabled": False
                     },
                     "scheduled": {
                         "type": "date",
                         "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis"  # noqa
                     },
                     "started": {
                         "type": "date",
                         "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis"  # noqa
                     },
                     "ended": {
                         "type": "date",
                         "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis"  # noqa
                     }
                 }
             })
         self.storage.indices.put_settings(
             index=f"{self.index_name_prefix}-*",
             allow_no_indices=True,
             body={
                 "index": {
                     "codec": "best_compression",
                     "refresh_interval": "1s",
                     "number_of_shards": 1
                 }
             })

     def create(self, index_name) -> None:
         """Create and initialize the index `index_name`. Its name must
         match the `swh-tasks-` prefix pattern.

         """
         assert index_name.startswith(self.index_name_prefix)
         self.storage.indices.create(index_name)

     def compute_index_name(self, year, month):
         """Given a year and a month, compute the index's name.
""" return '%s-%s-%s' % ( self.index_name_prefix, year, '%02d' % month) def mget(self, index_name, doc_ids, chunk_size=500, source=True): """Retrieve document's full content according to their ids as per source's setup. The `source` allows to retrieve only what's interesting, e.g: - source=True ; gives back the original indexed data - source=False ; returns without the original _source field - source=['task_id'] ; returns only task_id in the _source field Args: index_name (str): Name of the concerned index. doc_ids (generator): Generator of ids to retrieve chunk_size (int): Number of documents chunk to send for retrieval source (bool/[str]): Source of information to return Yields: document indexed as per source's setup """ if isinstance(source, list): source = {'_source': ','.join(source)} else: source = {'_source': str(source).lower()} for ids in utils.grouper(doc_ids, n=1000): res = self.storage.mget(body={'ids': list(ids)}, index=index_name, doc_type=self.doc_type, params=source) if not res: logger.error('Error during retrieval of data, skipping!') continue for doc in res['docs']: found = doc.get('found') if not found: msg = 'Doc id %s not found, not indexed yet' % doc['_id'] logger.warning(msg) continue yield doc['_source'] def _streaming_bulk(self, index_name, doc_stream, chunk_size=500): """Bulk index data and returns the successful indexed data's identifier. Args: index_name (str): Name of the concerned index. doc_stream (generator): Generator of documents to index chunk_size (int): Number of documents chunk to send for indexation Yields: document id indexed """ actions = ({'_index': index_name, '_op_type': 'index', '_type': self.doc_type, '_source': data} for data in doc_stream) for ok, result in helpers.streaming_bulk(client=self.storage, actions=actions, chunk_size=chunk_size, raise_on_error=False, raise_on_exception=False): if not ok: logger.error('Error during %s indexation. Skipping.', result) continue yield result['index']['_id'] def is_index_opened(self, index_name: str) -> bool: """Determine if an index is opened or not """ try: self.storage.indices.stats(index_name) return True except Exception: # fails when indice is closed (no other api call found) return False def streaming_bulk(self, index_name, doc_stream, chunk_size=500, source=True): """Bulk index data and returns the successful indexed data as per source's setup. the `source` permits to retrieve only what's of interest to us, e.g: - source=True ; gives back the original indexed data - source=False ; returns without the original _source field - source=['task_id'] ; returns only task_id in the _source field Args: index_name (str): Name of the concerned index. 
             doc_stream (generator): Generator of documents to index
             chunk_size (int): Number of documents per chunk to send
             source (bool/[str]): The information to return

         """
         to_close = False
         # index must exist
         if not self.storage.indices.exists(index_name):
             self.create(index_name)
             # close that new index afterwards (to avoid keeping too many
             # indices open)
             to_close = True
         # index must be opened
         if not self.is_index_opened(index_name):
             to_close = True
             self.storage.indices.open(index_name)

         try:
             indexed_ids = self._streaming_bulk(
                 index_name, doc_stream, chunk_size=chunk_size)
             yield from self.mget(
                 index_name, indexed_ids, chunk_size=chunk_size,
                 source=source)
         finally:
             # close it again to leave the index in the same state as prior
             # to the call
             if to_close:
                 self.storage.indices.close(index_name)
diff --git a/swh/scheduler/backend_es_memory.py b/swh/scheduler/backend_es_memory.py
new file mode 100644
index 0000000..cc77ad6
--- /dev/null
+++ b/swh/scheduler/backend_es_memory.py
@@ -0,0 +1,152 @@
+# Copyright (C) 2018-2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""Memory Elastic Search backend
+
+"""
+
+import datetime  # noqa, used when eval-ing serialized documents
+import hashlib
+import logging
+
+from ast import literal_eval
+from typing import Optional
+
+import psycopg2  # noqa, serialization purposes
+
+
+logger = logging.getLogger(__name__)
+
+
+class BasicSerializer:
+    """For the in-memory Elasticsearch implementation (not for production)
+
+    """
+    def __init__(self, *args, **kwargs):
+        pass
+
+    def dumps(self, *args, **kwargs):
+        return str(*args)
+
+
+class BasicTransport:
+    """For the in-memory Elasticsearch implementation (not for production)
+
+    """
+    def __init__(self, *args, **kwargs):
+        self.serializer = BasicSerializer()
+
+
+class MemoryElasticsearch:
+    """Memory Elasticsearch instance (for test purposes)
+
+    Partial implementation oriented towards index storage (and not search)
+
+    For now, its sole client is the scheduler, for task archival purposes.
+ + """ + def __init__(self, *args, **kwargs): + self.index = {} + self.mapping = {} + self.settings = {} + self.indices = self # HACK + self.main_mapping_key: Optional[str] = None + self.main_settings_key: Optional[str] = None + self.transport = BasicTransport() + + def create(self, index, **kwargs): + logger.debug(f'create index {index}') + logger.debug(f'indices: {self.index}') + logger.debug(f'mapping: {self.mapping}') + logger.debug(f'settings: {self.settings}') + self.index[index] = { + 'status': 'opened', + 'data': {}, + 'mapping': self.get_mapping(self.main_mapping_key), + 'settings': self.get_settings(self.main_settings_key), + } + logger.debug(f'index {index} created') + + def close(self, index, **kwargs): + """Close index""" + idx = self.index.get(index) + if idx: + idx['status'] = 'closed' + + def open(self, index, **kwargs): + """Open index""" + idx = self.index.get(index) + if idx: + idx['status'] = 'opened' + + def bulk(self, body, doc_type=None, index=None, params=None): + """Bulk insert document in index""" + assert isinstance(body, str) + all_data = body.split('\n')[:-1] # drop the empty line + ids = [] + # data is sent as tuple (index, data-to-index) + for i in range(0, len(all_data), 2): + # The first entry is about the index to use + # not about a data to index + # find the index + index_data = literal_eval(all_data[i]) + idx_name = index_data['index']['_index'] + # associated data to index + data = all_data[i+1] + _id = hashlib.sha1(data.encode('utf-8')).hexdigest() + parsed_data = eval(data) # for datetime + self.index[idx_name]['data'][_id] = parsed_data + ids.append(_id) + + # everything is indexed fine + return { + 'items': [ + { + 'index': { + 'status': 200, + '_id': _id, + } + } for _id in ids + ] + } + + def mget(self, *args, body, index, **kwargs): + """Bulk indexed documents retrieval""" + idx = self.index[index] + docs = [] + idx_docs = idx['data'] + for _id in body['ids']: + doc = idx_docs.get(_id) + if doc: + d = { + 'found': True, + '_source': doc, + } + docs.append(d) + return {'docs': docs} + + def stats(self, index, **kwargs): + idx = self.index[index] # will raise if it does not exist + if not idx or idx['status'] == 'closed': + raise ValueError('Closed index') # simulate issue if index closed + + def exists(self, index, **kwargs): + return self.index.get(index) is not None + + def put_mapping(self, index, body, **kwargs): + self.mapping[index] = body + self.main_mapping_key = index + + def get_mapping(self, index, **kwargs): + return self.mapping.get(index) or \ + self.index.get(index, {}).get('mapping', {}) + + def put_settings(self, index, body, **kwargs): + self.settings[index] = body + self.main_settings_key = index + + def get_settings(self, index, **kwargs): + return self.settings.get(index) or \ + self.index.get(index, {}).get('settings', {}) diff --git a/swh/scheduler/tests/es/__init__.py b/swh/scheduler/tests/es/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/swh/scheduler/tests/es/conftest.py b/swh/scheduler/tests/es/conftest.py new file mode 100644 index 0000000..be78109 --- /dev/null +++ b/swh/scheduler/tests/es/conftest.py @@ -0,0 +1,48 @@ +# Copyright (C) 2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import yaml + +import pytest + +from swh.scheduler import get_scheduler + + +@pytest.fixture +def 
+def swh_scheduler_config(swh_scheduler_config):
+    return {
+        'scheduler': {
+            'cls': 'local',
+            'args': swh_scheduler_config,
+        },
+        'elasticsearch': {
+            'cls': 'memory',
+            'args': {
+                'index_name_prefix': 'swh-tasks',
+            },
+        },
+    }
+
+
+@pytest.fixture
+def swh_scheduler_config_file(swh_scheduler_config, monkeypatch, tmp_path):
+    conffile = str(tmp_path / 'elastic.yml')
+    with open(conffile, 'w') as f:
+        f.write(yaml.dump(swh_scheduler_config))
+    monkeypatch.setenv('SWH_CONFIG_FILENAME', conffile)
+    return conffile
+
+
+@pytest.fixture
+def swh_scheduler(swh_scheduler_config):
+    return get_scheduler(**swh_scheduler_config['scheduler'])
+
+
+@pytest.fixture
+def swh_elasticsearch(swh_scheduler_config):
+    from swh.scheduler.backend_es import ElasticSearchBackend
+    backend = ElasticSearchBackend(**swh_scheduler_config)
+    backend.initialize()
+    return backend
diff --git a/swh/scheduler/tests/es/test_backend.py b/swh/scheduler/tests/es/test_backend.py
new file mode 100644
index 0000000..a700998
--- /dev/null
+++ b/swh/scheduler/tests/es/test_backend.py
@@ -0,0 +1,61 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import datetime
+
+import elasticsearch
+
+
+from ..common import tasks_from_template, TEMPLATES
+
+
+def test_backend_setup_basic(swh_elasticsearch):
+    """The Elasticsearch instance should allow creating, closing and
+    checking an index
+
+    """
+    index_name = 'swh-tasks-2010-01'
+    try:
+        swh_elasticsearch.storage.indices.get_mapping(index_name)
+    except (elasticsearch.exceptions.NotFoundError, KeyError):
+        pass
+
+    assert not swh_elasticsearch.storage.indices.exists(index_name)
+    swh_elasticsearch.create(index_name)
+    assert swh_elasticsearch.storage.indices.exists(index_name)
+    assert swh_elasticsearch.is_index_opened(index_name)
+
+    # index exists with a mapping
+    mapping = swh_elasticsearch.storage.indices.get_mapping(index_name)
+    assert mapping != {}
+
+
+def test_backend_setup_index(swh_elasticsearch):
+    """The Elasticsearch instance should allow bulk indexing
+
+    """
+    template_git = TEMPLATES['git']
+    next_run_date = datetime.datetime.utcnow() - datetime.timedelta(days=1)
+    tasks = tasks_from_template(template_git, next_run_date, 1)
+    index_name = swh_elasticsearch.compute_index_name(
+        next_run_date.year, next_run_date.month)
+    assert not swh_elasticsearch.storage.indices.exists(index_name)
+
+    tasks = list(swh_elasticsearch.streaming_bulk(index_name, tasks))
+    assert len(tasks) > 0
+
+    for output_task in tasks:
+        assert output_task is not None
+        assert output_task['type'] == template_git['type']
+        assert output_task['arguments'] is not None
+        next_run = output_task['next_run']
+        if isinstance(next_run, str):  # real elasticsearch
+            assert next_run == next_run_date.isoformat()
+        else:  # the memory implementation does not really index
+            assert next_run == next_run_date
+
+    assert swh_elasticsearch.storage.indices.exists(index_name)
+    assert not swh_elasticsearch.is_index_opened(index_name)
+    mapping = swh_elasticsearch.storage.indices.get_mapping(index_name)
+    assert mapping != {}
diff --git a/swh/scheduler/tests/es/test_cli_task.py b/swh/scheduler/tests/es/test_cli_task.py
new file mode 100644
index 0000000..bbfefae
--- /dev/null
+++ b/swh/scheduler/tests/es/test_cli_task.py
@@ -0,0 +1,108 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import arrow
+import datetime
+import logging
+import uuid
+import random
+
+import pytest
+
+from click.testing import CliRunner
+
+from swh.scheduler.cli import cli
+
+
+from ..common import tasks_from_template, TASK_TYPES, TEMPLATES
+
+
+logger = logging.getLogger(__name__)
+
+
+@pytest.mark.usefixtures('swh_elasticsearch')
+def test_cli_archive_tasks(swh_scheduler, swh_scheduler_config_file):
+    template_git = TEMPLATES['git']
+    template_hg = TEMPLATES['hg']
+    # first initialize the scheduler's db (is this still needed?)
+    for tt in TASK_TYPES.values():
+        swh_scheduler.create_task_type(tt)
+
+    next_run_start = arrow.utcnow().datetime - datetime.timedelta(days=1)
+
+    recurring = tasks_from_template(
+        template_git, next_run_start, 100)
+    oneshots = tasks_from_template(
+        template_hg, next_run_start - datetime.timedelta(days=1), 50)
+
+    past_time = next_run_start - datetime.timedelta(days=7)
+
+    all_tasks = recurring + oneshots
+    result = swh_scheduler.create_tasks(all_tasks)
+    assert len(result) == len(all_tasks)
+
+    # simulate task runs
+    backend_tasks = [
+        {
+            'task': task['id'],
+            'backend_id': str(uuid.uuid4()),
+            'scheduled': next_run_start - datetime.timedelta(minutes=i % 60),
+        } for i, task in enumerate(result)
+    ]
+    swh_scheduler.mass_schedule_task_runs(backend_tasks)
+
+    # disable some tasks
+    tasks_to_disable = set()
+    for task in result:
+        status = random.choice(['disabled', 'completed'])
+        if status == 'disabled':
+            tasks_to_disable.add(task['id'])
+
+    swh_scheduler.disable_tasks(tasks_to_disable)
+
+    git_tasks = swh_scheduler.search_tasks(task_type=template_git['type'])
+    hg_tasks = swh_scheduler.search_tasks(task_type=template_hg['type'])
+    assert len(git_tasks) + len(hg_tasks) == len(all_tasks)
+
+    # ensure the task runs are in the expected state
+    task_runs = swh_scheduler.get_task_runs([
+        t['id'] for t in git_tasks + hg_tasks
+    ])
+
+    # same for the tasks
+    for t in git_tasks + hg_tasks:
+        if t['id'] in tasks_to_disable:
+            assert t['status'] == 'disabled'
+
+    future_time = next_run_start + datetime.timedelta(days=1)
+    for tr in task_runs:
+        assert past_time <= tr['scheduled']
+        assert tr['scheduled'] < future_time
+
+    runner = CliRunner()
+    result = runner.invoke(cli, [
+        '--config-file', swh_scheduler_config_file,
+        'task', 'archive',
+        '--after', past_time.isoformat(),
+        '--before', future_time.isoformat(),
+        '--cleanup',
+    ], obj={
+        'log_level': logging.DEBUG,
+    })
+
+    assert result.exit_code == 0, result.output
+
+    # disabled tasks should no longer be in the scheduler
+    git_tasks = swh_scheduler.search_tasks(task_type=template_git['type'])
+    hg_tasks = swh_scheduler.search_tasks(task_type=template_hg['type'])
+    remaining_tasks = git_tasks + hg_tasks
+    count_disabled = 0
+    for task in remaining_tasks:
+        logger.debug(f"task status: {task['status']}")
{task['status']}") + if task['status'] == 'disabled': + count_disabled += 1 + + assert count_disabled == 0 + assert len(remaining_tasks) == len(all_tasks) - len(tasks_to_disable)