diff --git a/data/elastic-template.json b/data/elastic-template.json --- a/data/elastic-template.json +++ b/data/elastic-template.json @@ -4,7 +4,7 @@ "settings": { "index": { "codec": "best_compression", - "refresh_interval": "30s", + "refresh_interval": "1s", "number_of_shards": 1 } }, diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py --- a/swh/scheduler/backend.py +++ b/swh/scheduler/backend.py @@ -446,14 +446,15 @@ (after_ts, before_ts, last_id, limit + 1) ) for row in cur: + task = dict(row) # nested type index does not accept bare values # transform it as a dict to comply with this - row['arguments']['args'] = { - i: v for i, v in enumerate(row['arguments']['args']) + task['arguments']['args'] = { + i: v for i, v in enumerate(task['arguments']['args']) } - kwargs = row['arguments']['kwargs'] - row['arguments']['kwargs'] = json.dumps(kwargs) - tasks.append(row) + kwargs = task['arguments']['kwargs'] + task['arguments']['kwargs'] = json.dumps(kwargs) + tasks.append(task) if len(tasks) >= limit + 1: # remains data, add pagination information result = { diff --git a/swh/scheduler/backend_es.py b/swh/scheduler/backend_es.py --- a/swh/scheduler/backend_es.py +++ b/swh/scheduler/backend_es.py @@ -1,51 +1,145 @@ -# Copyright (C) 2018 The Software Heritage developers +# Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information - """Elastic Search backend """ + +import datetime # noqa +import logging + from copy import deepcopy +from typing import Any, Dict -from swh.core import utils -from elasticsearch import Elasticsearch from elasticsearch import helpers +from swh.core import utils + +logger = logging.getLogger(__name__) + DEFAULT_CONFIG = { - 'elastic_search': { - 'storage_nodes': {'host': 'localhost', 'port': 9200}, - 'index_name_prefix': 'swh-tasks', - 'client_options': { - 'sniff_on_start': False, - 'sniff_on_connection_fail': True, - 'http_compress': False, + 'elasticsearch': { + 'cls': 'local', + 'args': { + 'index_name_prefix': 'swh-tasks', + 'storage_nodes': ['localhost:9200'], + 'client_options': { + 'sniff_on_start': False, + 'sniff_on_connection_fail': True, + 'http_compress': False, + 'sniffer_timeout': 60 + }, }, - }, + } } -class SWHElasticSearchClient: +def get_elasticsearch(cls: str, args: Dict[str, Any] = {}): + """Instantiate an elastic search instance + + """ + if cls == 'local': + from elasticsearch import Elasticsearch + elif cls == 'memory': + from .backend_es_memory import MemoryElasticsearch as Elasticsearch # type: ignore # noqa + else: + raise ValueError('Unknown elasticsearch class `%s`' % cls) + + return Elasticsearch(**args) + + +class ElasticSearchBackend: + """ElasticSearch backend to index tasks + + This uses an elasticsearch client to actually discuss with the + elasticsearch instance. 
+ + """ def __init__(self, **config): self.config = deepcopy(DEFAULT_CONFIG) self.config.update(config) - es_conf = self.config['elastic_search'] - options = es_conf.get('client_options', {}) - self.storage = Elasticsearch( - # nodes to use by default - es_conf['storage_nodes'], - # auto detect cluster's status - sniff_on_start=options['sniff_on_start'], - sniff_on_connection_fail=options['sniff_on_connection_fail'], - sniffer_timeout=60, - # compression or not - http_compress=options['http_compress']) - self.index_name_prefix = es_conf['index_name_prefix'] - # document's index type (cf. ../../data/elastic-template.json) + es_conf = self.config['elasticsearch'] + args = deepcopy(es_conf['args']) + self.index_name_prefix = args.pop('index_name_prefix') + self.storage = get_elasticsearch( + cls=es_conf['cls'], + args={ + 'storage_nodes': args.get('storage_nodes', []), + **args.get('client_options', {}), + } + ) + # document's index type (cf. /data/elastic-template.json) self.doc_type = 'task' + def initialize(self): + self.storage.indices.put_mapping( + index=f"{self.index_name_prefix}-*", + doc_type=self.doc_type, + # to allow type definition below + include_type_name=True, + # to allow install mapping even if no index yet + allow_no_indices=True, + body={ + "properties": { + "task_id": {"type": "double"}, + "task_policy": {"type": "text"}, + "task_status": {"type": "text"}, + "task_run_id": {"type": "double"}, + "arguments": { + "type": "object", + "properties": { + "args": { + "type": "nested", + "dynamic": False + }, + "kwargs": { + "type": "text" + } + } + }, + "type": {"type": "text"}, + "backend_id": {"type": "text"}, + "metadata": { + "type": "object", + "enabled": False + }, + "scheduled": { + "type": "date", + "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis" # noqa + }, + "started": { + "type": "date", + "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis" # noqa + }, + "ended": { + "type": "date", + "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis" # noqa + } + } + }) + + self.storage.indices.put_settings( + index=f"{self.index_name_prefix}-*", + allow_no_indices=True, + body={ + "index": { + "codec": "best_compression", + "refresh_interval": "1s", + "number_of_shards": 1 + } + }) + + def create(self, index_name) -> None: + """Create and initialize index_name with mapping for all indices + matching `swh-tasks-` pattern + + """ + assert index_name.startswith(self.index_name_prefix) + self.storage.indices.create(index_name) + def compute_index_name(self, year, month): """Given a year, month, compute the index's name. @@ -53,26 +147,12 @@ return '%s-%s-%s' % ( self.index_name_prefix, year, '%02d' % month) - def index(self, data): - """Index given data to elasticsearch. - - The field 'ended' in data is used to compute the index to - index data to. - - """ - date = data['ended'] - index_name = self.compute_index_name(date.year, date.month) - return self.storage.index(index=index_name, - doc_type=self.doc_type, - body=data) - def mget(self, index_name, doc_ids, chunk_size=500, - source=True, log=None): + source=True): """Retrieve document's full content according to their ids as per source's setup. 
- The `source` permits to retrieve only what's of interest to - us, e.g: + The `source` allows to retrieve only what's interesting, e.g: - source=True ; gives back the original indexed data - source=False ; returns without the original _source field - source=['task_id'] ; returns only task_id in the _source field @@ -98,21 +178,18 @@ doc_type=self.doc_type, params=source) if not res: - if log: - log.error('Error during retrieval of data, skipping!') + logger.error('Error during retrieval of data, skipping!') continue for doc in res['docs']: found = doc.get('found') if not found: msg = 'Doc id %s not found, not indexed yet' % doc['_id'] - if log: - log.warning(msg) + logger.warning(msg) continue yield doc['_source'] - def _streaming_bulk(self, index_name, doc_stream, chunk_size=500, - log=None): + def _streaming_bulk(self, index_name, doc_stream, chunk_size=500): """Bulk index data and returns the successful indexed data's identifier. @@ -135,13 +212,23 @@ raise_on_error=False, raise_on_exception=False): if not ok: - if log: - log.error('Error during %s indexation. Skipping.' % result) + logger.error('Error during %s indexation. Skipping.', result) continue yield result['index']['_id'] + def is_index_opened(self, index_name: str) -> bool: + """Determine if an index is opened or not + + """ + try: + self.storage.indices.stats(index_name) + return True + except Exception: + # fails when indice is closed (no other api call found) + return False + def streaming_bulk(self, index_name, doc_stream, chunk_size=500, - source=True, log=None): + source=True): """Bulk index data and returns the successful indexed data as per source's setup. @@ -159,8 +246,23 @@ source (bool, [str]): the information to return """ + to_close = False + # index must exist + if not self.storage.indices.exists(index_name): + self.create(index_name) + # Close that new index (to avoid too much opened indices) + to_close = True + # index must be opened + if not self.is_index_opened(index_name): + to_close = True + self.storage.indices.open(index_name) - indexed_ids = self._streaming_bulk( - index_name, doc_stream, chunk_size=chunk_size, log=log) - yield from self.mget(index_name, indexed_ids, chunk_size=chunk_size, - source=source, log=log) + try: + indexed_ids = self._streaming_bulk( + index_name, doc_stream, chunk_size=chunk_size) + yield from self.mget( + index_name, indexed_ids, chunk_size=chunk_size, source=source) + finally: + # closing it to stay in the same state as prior to the call + if to_close: + self.storage.indices.close(index_name) diff --git a/swh/scheduler/backend_es_memory.py b/swh/scheduler/backend_es_memory.py new file mode 100644 --- /dev/null +++ b/swh/scheduler/backend_es_memory.py @@ -0,0 +1,154 @@ +# Copyright (C) 2018-2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +"""Memory Elastic Search backend + +""" + +import datetime # noqa serialization purposes +import hashlib +import logging + +from ast import literal_eval +from typing import Optional + +import psycopg2 # noqa serialization purposes + + +logger = logging.getLogger(__name__) + + +class BasicSerializer: + """For memory elastic search implementation (not for production) + + """ + def __init__(self, *args, **kwargs): + pass + + def dumps(self, *args, **kwargs): + return str(*args) + + +class BasicTransport: + """For memory elastic search implementation, (not for 
production) + + """ + def __init__(self, *args, **kwargs): + self.serializer = BasicSerializer() + + +class MemoryElasticsearch: + """Memory Elasticsearch instance (for test purposes) + + Partial implementation oriented towards index storage (and not search) + + For now, its sole client is the scheduler for task archival purposes. + + """ + def __init__(self, *args, **kwargs): + self.index = {} + self.mapping = {} + self.settings = {} + self.indices = self # HACK + self.main_mapping_key: Optional[str] = None + self.main_settings_key: Optional[str] = None + self.transport = BasicTransport() + + def create(self, index, **kwargs): + logger.debug(f'create index {index}') + logger.debug(f'indices: {self.index}') + logger.debug(f'mapping: {self.mapping}') + logger.debug(f'settings: {self.settings}') + self.index[index] = { + 'status': 'opened', + 'data': {}, + 'mapping': self.get_mapping(self.main_mapping_key), + 'settings': self.get_settings(self.main_settings_key), + } + logger.debug(f'index {index} created') + + def close(self, index, **kwargs): + """Close index""" + idx = self.index.get(index) + if idx: + idx['status'] = 'closed' + + def open(self, index, **kwargs): + """Open index""" + idx = self.index.get(index) + if idx: + idx['status'] = 'opened' + + def bulk(self, body, **kwargs): + """Bulk insert document in index""" + assert isinstance(body, str) + all_data = body.split('\n') + if all_data[-1] == '': + all_data = all_data[:-1] # drop the empty line if any + ids = [] + # data is sent as tuple (index, data-to-index) + for i in range(0, len(all_data), 2): + # The first entry is about the index to use + # not about a data to index + # find the index + index_data = literal_eval(all_data[i]) + idx_name = index_data['index']['_index'] + # associated data to index + data = all_data[i+1] + _id = hashlib.sha1(data.encode('utf-8')).hexdigest() + parsed_data = eval(data) # for datetime + self.index[idx_name]['data'][_id] = parsed_data + ids.append(_id) + + # everything is indexed fine + return { + 'items': [ + { + 'index': { + 'status': 200, + '_id': _id, + } + } for _id in ids + ] + } + + def mget(self, *args, body, index, **kwargs): + """Bulk indexed documents retrieval""" + idx = self.index[index] + docs = [] + idx_docs = idx['data'] + for _id in body['ids']: + doc = idx_docs.get(_id) + if doc: + d = { + 'found': True, + '_source': doc, + } + docs.append(d) + return {'docs': docs} + + def stats(self, index, **kwargs): + idx = self.index[index] # will raise if it does not exist + if not idx or idx['status'] == 'closed': + raise ValueError('Closed index') # simulate issue if index closed + + def exists(self, index, **kwargs): + return self.index.get(index) is not None + + def put_mapping(self, index, body, **kwargs): + self.mapping[index] = body + self.main_mapping_key = index + + def get_mapping(self, index, **kwargs): + return self.mapping.get(index) or \ + self.index.get(index, {}).get('mapping', {}) + + def put_settings(self, index, body, **kwargs): + self.settings[index] = body + self.main_settings_key = index + + def get_settings(self, index, **kwargs): + return self.settings.get(index) or \ + self.index.get(index, {}).get('settings', {}) diff --git a/swh/scheduler/cli/task.py b/swh/scheduler/cli/task.py --- a/swh/scheduler/cli/task.py +++ b/swh/scheduler/cli/task.py @@ -13,6 +13,8 @@ import csv import click +from typing import Any, Dict + from . 
import cli @@ -483,8 +485,8 @@ help='Verbose mode') @click.option('--cleanup/--no-cleanup', is_flag=True, default=True, help='Clean up archived tasks (default)') -@click.option('--start-from', type=click.INT, default=-1, - help='(Optional) default task id to start from. Default is -1.') +@click.option('--start-from', type=click.STRING, default=None, + help='(Optional) default page to start from.') @click.pass_context def archive_tasks(ctx, before, after, batch_index, bulk_index, batch_clean, dry_run, verbose, cleanup, start_from): @@ -496,22 +498,23 @@ """ from swh.core.utils import grouper - from swh.scheduler.backend_es import SWHElasticSearchClient - + from swh.scheduler.backend_es import ElasticSearchBackend + config = ctx.obj['config'] scheduler = ctx.obj['scheduler'] + if not scheduler: raise ValueError('Scheduler class (local/remote) must be instantiated') - es_client = SWHElasticSearchClient() logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO) - log = logging.getLogger('swh.scheduler.cli.archive') + logger = logging.getLogger(__name__) logging.getLogger('urllib3').setLevel(logging.WARN) - logging.getLogger('elasticsearch').setLevel(logging.WARN) + logging.getLogger('elasticsearch').setLevel(logging.ERROR) if dry_run: - log.info('**DRY-RUN** (only reading db)') + logger.info('**DRY-RUN** (only reading db)') if not cleanup: - log.info('**NO CLEANUP**') + logger.info('**NO CLEANUP**') + es_storage = ElasticSearchBackend(**config) now = arrow.utcnow() # Default to archive tasks from a rolling month starting the week @@ -522,10 +525,11 @@ if not after: after = now.shift(weeks=-1).shift(months=-1).format('YYYY-MM-DD') - log.debug('index: %s; cleanup: %s; period: [%s ; %s]' % ( + logger.debug('index: %s; cleanup: %s; period: [%s ; %s]' % ( not dry_run, not dry_run and cleanup, after, before)) - def group_by_index_name(data, es_client=es_client): + def get_index_name(data: Dict[str, Any], + es_storage: ElasticSearchBackend = es_storage) -> str: """Given a data record, determine the index's name through its ending date. This varies greatly depending on the task_run's status. 
@@ -534,32 +538,34 @@ date = data.get('started') if not date: date = data['scheduled'] - return es_client.compute_index_name(date.year, date.month) + return es_storage.compute_index_name(date.year, date.month) def index_data(before, page_token, batch_index): - while page_token is not None: + while True: result = scheduler.filter_task_to_archive( after, before, page_token=page_token, limit=batch_index) - tasks_in = result['tasks'] - for index_name, tasks_group in itertools.groupby( - tasks_in, key=group_by_index_name): - log.debug('Index tasks to %s' % index_name) + tasks_sorted = sorted(result['tasks'], key=get_index_name) + groups = itertools.groupby(tasks_sorted, key=get_index_name) + for index_name, tasks_group in groups: + logger.debug('Index tasks to %s' % index_name) if dry_run: for task in tasks_group: yield task continue - yield from es_client.streaming_bulk( + yield from es_storage.streaming_bulk( index_name, tasks_group, source=['task_id', 'task_run_id'], - chunk_size=bulk_index, log=log) + chunk_size=bulk_index) page_token = result.get('next_page_token') + if page_token is None: + break gen = index_data(before, page_token=start_from, batch_index=batch_index) if cleanup: for task_ids in grouper(gen, n=batch_clean): task_ids = list(task_ids) - log.info('Clean up %s tasks: [%s, ...]' % ( + logger.info('Clean up %s tasks: [%s, ...]' % ( len(task_ids), task_ids[0])) if dry_run: # no clean up continue @@ -567,5 +573,7 @@ else: for task_ids in grouper(gen, n=batch_index): task_ids = list(task_ids) - log.info('Indexed %s tasks: [%s, ...]' % ( + logger.info('Indexed %s tasks: [%s, ...]' % ( len(task_ids), task_ids[0])) + + logger.debug('Done!') diff --git a/swh/scheduler/tests/common.py b/swh/scheduler/tests/common.py new file mode 100644 --- /dev/null +++ b/swh/scheduler/tests/common.py @@ -0,0 +1,100 @@ +# Copyright (C) 2017-2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import copy +import datetime + + +TEMPLATES = { + 'git': { + 'type': 'update-git', + 'arguments': { + 'args': [], + 'kwargs': {}, + }, + 'next_run': None, + }, + 'hg': { + 'type': 'update-hg', + 'arguments': { + 'args': [], + 'kwargs': {}, + }, + 'next_run': None, + 'policy': 'oneshot', + } +} + + +TASK_TYPES = { + 'git': { + 'type': 'update-git', + 'description': 'Update a git repository', + 'backend_name': 'swh.loader.git.tasks.UpdateGitRepository', + 'default_interval': datetime.timedelta(days=64), + 'min_interval': datetime.timedelta(hours=12), + 'max_interval': datetime.timedelta(days=64), + 'backoff_factor': 2, + 'max_queue_length': None, + 'num_retries': 7, + 'retry_delay': datetime.timedelta(hours=2), + }, + 'hg': { + 'type': 'update-hg', + 'description': 'Update a mercurial repository', + 'backend_name': 'swh.loader.mercurial.tasks.UpdateHgRepository', + 'default_interval': datetime.timedelta(days=64), + 'min_interval': datetime.timedelta(hours=12), + 'max_interval': datetime.timedelta(days=64), + 'backoff_factor': 2, + 'max_queue_length': None, + 'num_retries': 7, + 'retry_delay': datetime.timedelta(hours=2), + }, +} + + +def tasks_from_template(template, max_timestamp, num, + num_priority=0, priorities=None): + """Build tasks from template + + """ + def _task_from_template(template, next_run, priority, *args, **kwargs): + ret = copy.deepcopy(template) + ret['next_run'] = next_run + if priority: + ret['priority'] = priority 
+ if args: + ret['arguments']['args'] = list(args) + if kwargs: + ret['arguments']['kwargs'] = kwargs + return ret + + def _pop_priority(priorities): + if not priorities: + return None + for priority, remains in priorities.items(): + if remains > 0: + priorities[priority] = remains - 1 + return priority + return None + + if num_priority and priorities: + priorities = { + priority: ratio * num_priority + for priority, ratio in priorities.items() + } + + tasks = [] + for i in range(num + num_priority): + priority = _pop_priority(priorities) + tasks.append(_task_from_template( + template, + max_timestamp - datetime.timedelta(microseconds=i), + priority, + 'argument-%03d' % i, + **{'kwarg%03d' % i: 'bogus-kwarg'} + )) + return tasks diff --git a/swh/scheduler/tests/es/__init__.py b/swh/scheduler/tests/es/__init__.py new file mode 100644 diff --git a/swh/scheduler/tests/es/conftest.py b/swh/scheduler/tests/es/conftest.py new file mode 100644 --- /dev/null +++ b/swh/scheduler/tests/es/conftest.py @@ -0,0 +1,53 @@ +# Copyright (C) 2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import yaml + +import pytest + +from swh.scheduler import get_scheduler + + +@pytest.fixture +def swh_scheduler_config(swh_scheduler_config): + return { + 'scheduler': { + 'cls': 'local', + 'args': swh_scheduler_config, + }, + 'elasticsearch': { + 'cls': 'memory', + 'args': { + 'index_name_prefix': 'swh-tasks', + }, + }, + } + + +@pytest.fixture +def swh_scheduler_config_file(swh_scheduler_config, monkeypatch, tmp_path): + conffile = str(tmp_path / 'elastic.yml') + with open(conffile, 'w') as f: + f.write(yaml.dump(swh_scheduler_config)) + monkeypatch.setenv('SWH_CONFIG_FILENAME', conffile) + return conffile + + +@pytest.fixture +def swh_scheduler(swh_scheduler_config): + return get_scheduler(**swh_scheduler_config['scheduler']) + + +@pytest.fixture +def swh_elasticsearch(swh_scheduler_config): + from swh.scheduler.backend_es import ElasticSearchBackend + backend = ElasticSearchBackend(**swh_scheduler_config) + backend.initialize() + return backend + + +@pytest.fixture +def swh_memory_elasticsearch(swh_elasticsearch): + return swh_elasticsearch.storage diff --git a/swh/scheduler/tests/es/test_backend_es.py b/swh/scheduler/tests/es/test_backend_es.py new file mode 100644 --- /dev/null +++ b/swh/scheduler/tests/es/test_backend_es.py @@ -0,0 +1,78 @@ +# Copyright (C) 2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import datetime + +import pytest + +import elasticsearch + +from swh.scheduler.backend_es import get_elasticsearch + +from ..common import tasks_from_template, TEMPLATES + + +def test_get_elasticsearch(): + with pytest.raises(ValueError, match='Unknown elasticsearch class'): + get_elasticsearch('unknown') + + es = get_elasticsearch('memory') + assert es + from swh.scheduler.backend_es_memory import MemoryElasticsearch + assert isinstance(es, MemoryElasticsearch) + + es = get_elasticsearch('local') + assert es + assert isinstance(es, elasticsearch.Elasticsearch) + + +def test_backend_setup_basic(swh_elasticsearch): + """Elastic search instance should allow to create/close/check index + + """ + index_name = 'swh-tasks-2010-01' + try: + 
swh_elasticsearch.storage.indices.get_mapping(index_name) + except (elasticsearch.exceptions.NotFoundError, KeyError): + pass + + assert not swh_elasticsearch.storage.indices.exists(index_name) + swh_elasticsearch.create(index_name) + assert swh_elasticsearch.storage.indices.exists(index_name) + assert swh_elasticsearch.is_index_opened(index_name) + + # index exists with a mapping + mapping = swh_elasticsearch.storage.indices.get_mapping(index_name) + assert mapping != {} + + +def test_backend_setup_index(swh_elasticsearch): + """Elastic search instance should allow to bulk index + + """ + template_git = TEMPLATES['git'] + next_run_date = datetime.datetime.utcnow() - datetime.timedelta(days=1) + tasks = tasks_from_template(template_git, next_run_date, 1) + index_name = swh_elasticsearch.compute_index_name( + next_run_date.year, next_run_date.month) + assert not swh_elasticsearch.storage.indices.exists(index_name) + + tasks = list(swh_elasticsearch.streaming_bulk(index_name, tasks)) + assert len(tasks) > 0 + + for output_task in tasks: + assert output_task is not None + assert output_task['type'] == template_git['type'] + assert output_task['arguments'] is not None + next_run = output_task['next_run'] + if isinstance(next_run, str): # real elasticsearch + assert next_run == next_run_date.isoformat() + else: # memory implem. does not really index + assert next_run == next_run_date + + assert swh_elasticsearch.storage.indices.exists(index_name) + assert not swh_elasticsearch.is_index_opened(index_name) + mapping = swh_elasticsearch.storage.indices.get_mapping(index_name) + assert mapping != {} diff --git a/swh/scheduler/tests/es/test_backend_es_memory.py b/swh/scheduler/tests/es/test_backend_es_memory.py new file mode 100644 --- /dev/null +++ b/swh/scheduler/tests/es/test_backend_es_memory.py @@ -0,0 +1,150 @@ +# Copyright (C) 2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import datetime +import hashlib +import logging +import random + +import pytest + +from swh.scheduler.backend_es_memory import ( + BasicSerializer, BasicTransport +) + +from ..common import tasks_from_template, TEMPLATES +from typing import Any, Dict + + +logger = logging.getLogger(__name__) + + +def test_serializer(): + s = BasicSerializer() + assert s + + data = {'something': [1, 2, 3], 'cool': {'1': '2'}} + actual_data = s.dumps(data) + + assert isinstance(actual_data, str) + assert actual_data == str(data) + + +def test_basic_transport(): + b = BasicTransport() + assert b + + assert isinstance(b.serializer, BasicSerializer) + + +def test_index_manipulation(swh_memory_elasticsearch): + index_name = 'swh-tasks-xxxx' + indices = swh_memory_elasticsearch.index + + assert not swh_memory_elasticsearch.exists(index_name) + assert index_name not in indices + + # so stat raises + with pytest.raises(Exception): + swh_memory_elasticsearch.stats(index_name) + + # we create the index + swh_memory_elasticsearch.create(index_name) + + # now the index exists + assert swh_memory_elasticsearch.exists(index_name) + assert index_name in indices + # it's opened + assert indices[index_name]['status'] == 'opened' + + # so stats is happy + swh_memory_elasticsearch.stats(index_name) + + # open the index, nothing changes + swh_memory_elasticsearch.open(index_name) + assert indices[index_name]['status'] == 'opened' + + # close the index + 
swh_memory_elasticsearch.close(index_name) + + assert indices[index_name]['status'] == 'closed' + + # reopen the index (fun times) + swh_memory_elasticsearch.open(index_name) + assert indices[index_name]['status'] == 'opened' + + +def test_bulk_and_mget(swh_memory_elasticsearch): + # initialize tasks + template_git = TEMPLATES['git'] + next_run_start = datetime.datetime.utcnow() - datetime.timedelta(days=1) + + tasks = tasks_from_template(template_git, next_run_start, 100) + + def compute_id(stask): + return hashlib.sha1(stask.encode('utf-8')).hexdigest() + + body = [] + ids_to_task = {} + for task in tasks: + date = task['next_run'] + index_name = f'swh-tasks-{date.year}-{date.month}' + idx = {'index': {'_index': index_name}} + sidx = swh_memory_elasticsearch.transport.serializer.dumps(idx) + body.append(sidx) + + stask = swh_memory_elasticsearch.transport.serializer.dumps(task) + body.append(stask) + + _id = compute_id(stask) + ids_to_task[_id] = task + logger.debug(f'_id: {_id}, task: {task}') + + # store + + # create the index first + swh_memory_elasticsearch.create(index_name) + + # then bulk insert new data + result = swh_memory_elasticsearch.bulk('\n'.join(body)) + + # no guarantee in the order + assert result + actual_items = result['items'] + assert len(actual_items) == len(ids_to_task) + + def get_id(data: Dict[str, Any]) -> str: + return data['index']['_id'] + + actual_items = sorted(actual_items, key=get_id) + + expected_items = { + 'items': [ + { + 'index': { + 'status': 200, + '_id': _id + } + } for _id in list(ids_to_task) + ] + } + + expected_items = sorted(expected_items['items'], key=get_id) + assert actual_items == expected_items + + # retrieve + + random_ids = random.sample(list(ids_to_task), 10) + + result = swh_memory_elasticsearch.mget( + index=index_name, body={'ids': random_ids}) + assert result['docs'] + for doc in result['docs']: + assert doc['found'] + + actual_task = doc['_source'] + _id = compute_id(str(actual_task)) + expected_task = ids_to_task[_id] + assert actual_task == expected_task diff --git a/swh/scheduler/tests/es/test_cli_task.py b/swh/scheduler/tests/es/test_cli_task.py new file mode 100644 --- /dev/null +++ b/swh/scheduler/tests/es/test_cli_task.py @@ -0,0 +1,108 @@ +# Copyright (C) 2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import arrow +import datetime +import logging +import uuid +import random + +import pytest + +from click.testing import CliRunner + +from swh.scheduler.cli import cli + + +from ..common import tasks_from_template, TASK_TYPES, TEMPLATES + + +logger = logging.getLogger(__name__) + + +@pytest.mark.usefixtures('swh_elasticsearch') +def test_cli_archive_tasks(swh_scheduler, swh_scheduler_config_file): + template_git = TEMPLATES['git'] + template_hg = TEMPLATES['hg'] + # first initialize scheduler's db (is this still needed?) 
+ for tt in TASK_TYPES.values(): + swh_scheduler.create_task_type(tt) + + next_run_start = arrow.utcnow().datetime - datetime.timedelta(days=1) + + recurring = tasks_from_template( + template_git, next_run_start, 100) + oneshots = tasks_from_template( + template_hg, next_run_start - datetime.timedelta(days=1), 50) + + past_time = next_run_start - datetime.timedelta(days=7) + + all_tasks = recurring + oneshots + result = swh_scheduler.create_tasks(all_tasks) + assert len(result) == len(all_tasks) + + # simulate task run + backend_tasks = [ + { + 'task': task['id'], + 'backend_id': str(uuid.uuid4()), + 'scheduled': next_run_start - datetime.timedelta(minutes=i % 60), + } for i, task in enumerate(result) + ] + swh_scheduler.mass_schedule_task_runs(backend_tasks) + + # Disable some tasks + tasks_to_disable = set() + for task in result: + status = random.choice(['disabled', 'completed']) + if status == 'disabled': + tasks_to_disable.add(task['id']) + + swh_scheduler.disable_tasks(tasks_to_disable) + + git_tasks = swh_scheduler.search_tasks(task_type=template_git['type']) + hg_tasks = swh_scheduler.search_tasks(task_type=template_hg['type']) + assert len(git_tasks) + len(hg_tasks) == len(all_tasks) + + # Ensure the task_run are in expected state + task_runs = swh_scheduler.get_task_runs([ + t['id'] for t in git_tasks + hg_tasks + ]) + + # Same for the tasks + for t in git_tasks + hg_tasks: + if t['id'] in tasks_to_disable: + assert t['status'] == 'disabled' + + future_time = next_run_start + datetime.timedelta(days=1) + for tr in task_runs: + assert past_time <= tr['scheduled'] + assert tr['scheduled'] < future_time + + runner = CliRunner() + result = runner.invoke(cli, [ + '--config-file', swh_scheduler_config_file, + 'task', 'archive', + '--after', past_time.isoformat(), + '--before', future_time.isoformat(), + '--cleanup', + ], obj={ + 'log_level': logging.DEBUG, + }) + + assert result.exit_code == 0, result.output + + # disabled tasks should no longer be in the scheduler + git_tasks = swh_scheduler.search_tasks(task_type=template_git['type']) + hg_tasks = swh_scheduler.search_tasks(task_type=template_hg['type']) + remaining_tasks = git_tasks + hg_tasks + count_disabled = 0 + for task in remaining_tasks: + logger.debug(f"task status: {task['status']}") + if task['status'] == 'disabled': + count_disabled += 1 + + assert count_disabled == 0 + assert len(remaining_tasks) == len(all_tasks) - len(tasks_to_disable) diff --git a/swh/scheduler/tests/test_common.py b/swh/scheduler/tests/test_common.py new file mode 100644 --- /dev/null +++ b/swh/scheduler/tests/test_common.py @@ -0,0 +1,62 @@ +# Copyright (C) 2017-2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import datetime + +from .common import tasks_from_template, TEMPLATES + + +def test_tasks_from_template_no_priority(): + nb_tasks = 3 + template = TEMPLATES['git'] + next_run = datetime.datetime.utcnow() + tasks = tasks_from_template(template, next_run, nb_tasks) + + assert len(tasks) == nb_tasks + + for i, t in enumerate(tasks): + assert t['type'] == template['type'] + assert t['arguments'] is not None + assert t.get('policy') is None # not defined in template + assert len(t['arguments']['args']) == 1 + assert len(t['arguments']['kwargs'].keys()) == 1 + assert t['next_run'] == next_run - datetime.timedelta(microseconds=i) + assert t.get('priority') is None + + 
+def test_tasks_from_template_priority(): + nb_tasks_no_priority = 3 + nb_tasks_priority = 10 + template = TEMPLATES['hg'] + priorities = { + 'high': 0.5, + 'normal': 0.3, + 'low': 0.2, + } + + next_run = datetime.datetime.utcnow() + tasks = tasks_from_template( + template, next_run, + nb_tasks_no_priority, num_priority=nb_tasks_priority, + priorities=priorities) + + assert len(tasks) == nb_tasks_no_priority + nb_tasks_priority + + repartition_priority = {k: 0 for k in priorities.keys()} + for i, t in enumerate(tasks): + assert t['type'] == template['type'] + assert t['arguments'] is not None + assert t['policy'] == template['policy'] + assert len(t['arguments']['args']) == 1 + assert len(t['arguments']['kwargs'].keys()) == 1 + assert t['next_run'] == next_run - datetime.timedelta(microseconds=i) + priority = t.get('priority') + if priority: + assert priority in priorities + repartition_priority[priority] += 1 + + assert repartition_priority == { + k: v * nb_tasks_priority for k, v in priorities.items() + } diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py --- a/swh/scheduler/tests/test_scheduler.py +++ b/swh/scheduler/tests/test_scheduler.py @@ -16,53 +16,7 @@ import psycopg2 import pytest - -TASK_TYPES = { - 'git': { - 'type': 'update-git', - 'description': 'Update a git repository', - 'backend_name': 'swh.loader.git.tasks.UpdateGitRepository', - 'default_interval': datetime.timedelta(days=64), - 'min_interval': datetime.timedelta(hours=12), - 'max_interval': datetime.timedelta(days=64), - 'backoff_factor': 2, - 'max_queue_length': None, - 'num_retries': 7, - 'retry_delay': datetime.timedelta(hours=2), - }, - 'hg': { - 'type': 'update-hg', - 'description': 'Update a mercurial repository', - 'backend_name': 'swh.loader.mercurial.tasks.UpdateHgRepository', - 'default_interval': datetime.timedelta(days=64), - 'min_interval': datetime.timedelta(hours=12), - 'max_interval': datetime.timedelta(days=64), - 'backoff_factor': 2, - 'max_queue_length': None, - 'num_retries': 7, - 'retry_delay': datetime.timedelta(hours=2), - }, -} - -TEMPLATES = { - 'git': { - 'type': 'update-git', - 'arguments': { - 'args': [], - 'kwargs': {}, - }, - 'next_run': None, - }, - 'hg': { - 'type': 'update-hg', - 'arguments': { - 'args': [], - 'kwargs': {}, - }, - 'next_run': None, - 'policy': 'oneshot', - } -} +from .common import tasks_from_template, TEMPLATES, TASK_TYPES def subdict(d, keys=None, excl=()): @@ -105,9 +59,9 @@ priority_ratio = self._priority_ratio(swh_scheduler) self._create_task_types(swh_scheduler) num_tasks_priority = 100 - tasks_1 = self._tasks_from_template( + tasks_1 = tasks_from_template( TEMPLATES['git'], utcnow(), 100) - tasks_2 = self._tasks_from_template( + tasks_2 = tasks_from_template( TEMPLATES['hg'], utcnow(), 100, num_tasks_priority, priorities=priority_ratio) tasks = tasks_1 + tasks_2 @@ -159,7 +113,7 @@ self._create_task_types(swh_scheduler) t = utcnow() task_type = TEMPLATES['git']['type'] - tasks = self._tasks_from_template(TEMPLATES['git'], t, 100) + tasks = tasks_from_template(TEMPLATES['git'], t, 100) random.shuffle(tasks) swh_scheduler.create_tasks(tasks) @@ -207,7 +161,7 @@ num_tasks_priority = 100 num_tasks_no_priority = 100 # Create tasks with and without priorities - tasks = self._tasks_from_template( + tasks = tasks_from_template( TEMPLATES['git'], t, num=num_tasks_no_priority, num_priority=num_tasks_priority, @@ -264,7 +218,7 @@ num_tasks_priority = 100 num_tasks_no_priority = 100 # Create tasks with and without priorities - tasks = 
self._tasks_from_template( + tasks = tasks_from_template( TEMPLATES['git'], t, num=num_tasks_no_priority, num_priority=num_tasks_priority, @@ -288,7 +242,7 @@ def test_get_tasks(self, swh_scheduler): self._create_task_types(swh_scheduler) t = utcnow() - tasks = self._tasks_from_template(TEMPLATES['git'], t, 100) + tasks = tasks_from_template(TEMPLATES['git'], t, 100) tasks = swh_scheduler.create_tasks(tasks) random.shuffle(tasks) while len(tasks) > 1: @@ -307,7 +261,7 @@ return [dict(d.items()) for d in l] self._create_task_types(swh_scheduler) t = utcnow() - tasks = self._tasks_from_template(TEMPLATES['git'], t, 100) + tasks = tasks_from_template(TEMPLATES['git'], t, 100) tasks = swh_scheduler.create_tasks(tasks) assert make_real_dicts(swh_scheduler.search_tasks()) \ == make_real_dicts(tasks) @@ -334,8 +288,8 @@ """ self._create_task_types(swh_scheduler) _time = utcnow() - recurring = self._tasks_from_template(TEMPLATES['git'], _time, 12) - oneshots = self._tasks_from_template(TEMPLATES['hg'], _time, 12) + recurring = tasks_from_template(TEMPLATES['git'], _time, 12) + oneshots = tasks_from_template(TEMPLATES['hg'], _time, 12) total_tasks = len(recurring) + len(oneshots) # simulate scheduling tasks @@ -444,9 +398,9 @@ def test_delete_archived_tasks(self, swh_scheduler): self._create_task_types(swh_scheduler) _time = utcnow() - recurring = self._tasks_from_template( + recurring = tasks_from_template( TEMPLATES['git'], _time, 12) - oneshots = self._tasks_from_template( + oneshots = tasks_from_template( TEMPLATES['hg'], _time, 12) total_tasks = len(recurring) + len(oneshots) pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) @@ -492,9 +446,9 @@ ''' self._create_task_types(swh_scheduler) _time = utcnow() - recurring = self._tasks_from_template( + recurring = tasks_from_template( TEMPLATES['git'], _time, 12) - oneshots = self._tasks_from_template( + oneshots = tasks_from_template( TEMPLATES['hg'], _time, 12) swh_scheduler.create_tasks(recurring + oneshots) @@ -509,9 +463,9 @@ ''' self._create_task_types(swh_scheduler) _time = utcnow() - recurring = self._tasks_from_template( + recurring = tasks_from_template( TEMPLATES['git'], _time, 12) - oneshots = self._tasks_from_template( + oneshots = tasks_from_template( TEMPLATES['hg'], _time, 12) total_tasks = len(recurring) + len(oneshots) pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) @@ -560,9 +514,9 @@ ''' self._create_task_types(swh_scheduler) _time = utcnow() - recurring = self._tasks_from_template( + recurring = tasks_from_template( TEMPLATES['git'], _time, 12) - oneshots = self._tasks_from_template( + oneshots = tasks_from_template( TEMPLATES['hg'], _time, 12) pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) backend_tasks = [{ @@ -606,47 +560,6 @@ 'status': 'eventful', } - @staticmethod - def _task_from_template(template, next_run, priority, *args, **kwargs): - ret = copy.deepcopy(template) - ret['next_run'] = next_run - if priority: - ret['priority'] = priority - if args: - ret['arguments']['args'] = list(args) - if kwargs: - ret['arguments']['kwargs'] = kwargs - return ret - - def _pop_priority(self, priorities): - if not priorities: - return None - for priority, remains in priorities.items(): - if remains > 0: - priorities[priority] = remains - 1 - return priority - return None - - def _tasks_from_template(self, template, max_timestamp, num, - num_priority=0, priorities=None): - if num_priority and priorities: - priorities = { - priority: ratio * num_priority - for priority, ratio in 
priorities.items() - } - - tasks = [] - for i in range(num + num_priority): - priority = self._pop_priority(priorities) - tasks.append(self._task_from_template( - template, - max_timestamp - datetime.timedelta(microseconds=i), - priority, - 'argument-%03d' % i, - **{'kwarg%03d' % i: 'bogus-kwarg'} - )) - return tasks - def _create_task_types(self, scheduler): for tt in TASK_TYPES.values(): scheduler.create_task_type(tt)
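
The backend.py hunk above reshapes each row before it is handed to Elasticsearch, since the nested `args` field in the task mapping does not accept bare positional values. A minimal sketch of that transformation, with illustrative values (the real rows come from the archive query behind `filter_task_to_archive`):

    import json

    # Illustrative row, shaped like the archive query output.
    row = {'arguments': {'args': ['https://example.org/repo'],
                         'kwargs': {'lister': 'github'}}}

    task = dict(row)
    # Positional args become a {position: value} mapping for the nested field.
    task['arguments']['args'] = {
        i: v for i, v in enumerate(task['arguments']['args'])}
    # kwargs are stored as a JSON string, matching the 'text' kwargs field in the mapping.
    task['arguments']['kwargs'] = json.dumps(task['arguments']['kwargs'])

    assert task['arguments'] == {'args': {0: 'https://example.org/repo'},
                                 'kwargs': '{"lister": "github"}'}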
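
The cli/task.py change also sorts the archived tasks by target index name before grouping them. The sort matters because `itertools.groupby` only merges consecutive items with equal keys; a short illustration with made-up index names:

    import itertools

    names = ['swh-tasks-2019-01', 'swh-tasks-2019-02', 'swh-tasks-2019-01']

    # Unsorted input: the two 2019-01 entries end up in two separate groups.
    assert [k for k, _ in itertools.groupby(names)] == [
        'swh-tasks-2019-01', 'swh-tasks-2019-02', 'swh-tasks-2019-01']

    # Sorting first yields a single group per index, so each index is only
    # opened, written to and re-closed once per page of tasks.
    assert [k for k, _ in itertools.groupby(sorted(names))] == [
        'swh-tasks-2019-01', 'swh-tasks-2019-02']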
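
For an end-to-end view of the new configuration layout and backend entry points, here is a minimal sketch modeled on swh/scheduler/tests/es/conftest.py and test_backend_es.py above. It assumes the `memory` class so no cluster is required, and the sample document is only illustrative (in the CLI, documents come from `scheduler.filter_task_to_archive`):

    import datetime

    from swh.scheduler.backend_es import ElasticSearchBackend

    config = {
        'elasticsearch': {
            'cls': 'memory',   # 'local' would talk to a real cluster instead
            'args': {'index_name_prefix': 'swh-tasks'},
        },
    }

    backend = ElasticSearchBackend(**config)
    backend.initialize()   # install the task mapping and index settings

    ended = datetime.datetime.utcnow()
    index_name = backend.compute_index_name(ended.year, ended.month)

    # Illustrative document, loosely following the mapping declared in initialize().
    doc = {
        'task_id': 1,
        'task_run_id': 10,
        'task_status': 'eventful',
        'task_policy': 'oneshot',
        'type': 'update-git',
        'arguments': {'args': {0: 'https://example.org/repo'}, 'kwargs': '{}'},
        'ended': ended,
    }

    # streaming_bulk creates the index if needed, bulk-indexes the documents,
    # yields back what was indexed, then re-closes the index.
    for indexed in backend.streaming_bulk(index_name, [doc]):
        print(indexed)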