diff --git a/PKG-INFO b/PKG-INFO
index 2de3505..62be70f 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,28 +1,28 @@
 Metadata-Version: 2.1
 Name: swh.scheduler
-Version: 0.0.68
+Version: 0.0.69
 Summary: Software Heritage Scheduler
 Home-page: https://forge.softwareheritage.org/diffusion/DSCH/
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
 Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
 Project-URL: Funding, https://www.softwareheritage.org/donate
 Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler
 Description: swh-scheduler
         =============
         
         Job scheduler for the Software Heritage project.
         
         Task manager for asynchronous/delayed tasks, used for both recurrent (e.g.,
         listing a forge, loading new stuff from a Git repository) and one-off
         activities (e.g., loading a specific version of a source package).
         
 Platform: UNKNOWN
 Classifier: Programming Language :: Python :: 3
 Classifier: Intended Audience :: Developers
 Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
 Classifier: Operating System :: OS Independent
 Classifier: Development Status :: 5 - Production/Stable
 Description-Content-Type: text/markdown
 Provides-Extra: testing
diff --git a/swh.scheduler.egg-info/PKG-INFO b/swh.scheduler.egg-info/PKG-INFO
index 2de3505..62be70f 100644
--- a/swh.scheduler.egg-info/PKG-INFO
+++ b/swh.scheduler.egg-info/PKG-INFO
@@ -1,28 +1,28 @@
 Metadata-Version: 2.1
 Name: swh.scheduler
-Version: 0.0.68
+Version: 0.0.69
 Summary: Software Heritage Scheduler
 Home-page: https://forge.softwareheritage.org/diffusion/DSCH/
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
 Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
 Project-URL: Funding, https://www.softwareheritage.org/donate
 Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler
 Description: swh-scheduler
         =============
         
         Job scheduler for the Software Heritage project.
         
         Task manager for asynchronous/delayed tasks, used for both recurrent (e.g.,
         listing a forge, loading new stuff from a Git repository) and one-off
         activities (e.g., loading a specific version of a source package).
         
 Platform: UNKNOWN
 Classifier: Programming Language :: Python :: 3
 Classifier: Intended Audience :: Developers
 Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
 Classifier: Operating System :: OS Independent
 Classifier: Development Status :: 5 - Production/Stable
 Description-Content-Type: text/markdown
 Provides-Extra: testing
diff --git a/swh/scheduler/tests/es/conftest.py b/swh/scheduler/tests/es/conftest.py
index f669cfd..a699e07 100644
--- a/swh/scheduler/tests/es/conftest.py
+++ b/swh/scheduler/tests/es/conftest.py
@@ -1,53 +1,53 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import yaml
 
 import pytest
 
 from swh.scheduler import get_scheduler
 
 
 @pytest.fixture
-def swh_scheduler_conf(swh_scheduler_config):
+def swh_sched_config(swh_scheduler_config):
     return {
         'scheduler': {
             'cls': 'local',
             'args': swh_scheduler_config,
         },
         'elasticsearch': {
             'cls': 'memory',
             'args': {
                 'index_name_prefix': 'swh-tasks',
             },
         },
     }
 
 
 @pytest.fixture
-def swh_scheduler_conf_file(swh_scheduler_conf, monkeypatch, tmp_path):
+def swh_sched_config_file(swh_sched_config, monkeypatch, tmp_path):
     conffile = str(tmp_path / 'elastic.yml')
     with open(conffile, 'w') as f:
-        f.write(yaml.dump(swh_scheduler_conf))
+        f.write(yaml.dump(swh_sched_config))
     monkeypatch.setenv('SWH_CONFIG_FILENAME', conffile)
     return conffile
 
 
 @pytest.fixture
-def swh_scheduler(swh_scheduler_conf):
-    return get_scheduler(**swh_scheduler_conf['scheduler'])
+def swh_sched(swh_sched_config):
+    return get_scheduler(**swh_sched_config['scheduler'])
 
 
 @pytest.fixture
-def swh_elasticsearch(swh_scheduler_conf):
+def swh_elasticsearch_backend(swh_sched_config):
     from swh.scheduler.backend_es import ElasticSearchBackend
-    backend = ElasticSearchBackend(**swh_scheduler_conf)
+    backend = ElasticSearchBackend(**swh_sched_config)
     backend.initialize()
     return backend
 
 
 @pytest.fixture
-def swh_memory_elasticsearch(swh_elasticsearch):
-    return swh_elasticsearch.storage
+def swh_elasticsearch_memory(swh_elasticsearch_backend):
+    return swh_elasticsearch_backend.storage
diff --git a/swh/scheduler/tests/es/test_backend_es.py b/swh/scheduler/tests/es/test_backend_es.py
index dd01546..62178f8 100644
--- a/swh/scheduler/tests/es/test_backend_es.py
+++ b/swh/scheduler/tests/es/test_backend_es.py
@@ -1,78 +1,78 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import datetime
 
 import pytest
 import elasticsearch
 
 from swh.scheduler.backend_es import get_elasticsearch
 
 from ..common import tasks_from_template, TEMPLATES
 
 
 def test_get_elasticsearch():
     with pytest.raises(ValueError, match='Unknown elasticsearch class'):
         get_elasticsearch('unknown')
 
     es = get_elasticsearch('memory')
     assert es
     from swh.scheduler.elasticsearch_memory import MemoryElasticsearch
     assert isinstance(es, MemoryElasticsearch)
 
     es = get_elasticsearch('local')
     assert es
     assert isinstance(es, elasticsearch.Elasticsearch)
 
 
-def test_backend_setup_basic(swh_elasticsearch):
+def test_backend_setup_basic(swh_elasticsearch_backend):
     """Elastic search instance should allow to create/close/check index
 
     """
     index_name = 'swh-tasks-2010-01'
     try:
-        swh_elasticsearch.storage.indices.get_mapping(index_name)
+        swh_elasticsearch_backend.storage.indices.get_mapping(index_name)
     except (elasticsearch.exceptions.NotFoundError, KeyError):
         pass
 
-    assert not swh_elasticsearch.storage.indices.exists(index_name)
-    swh_elasticsearch.create(index_name)
-    assert swh_elasticsearch.storage.indices.exists(index_name)
-    assert swh_elasticsearch.is_index_opened(index_name)
+    assert not swh_elasticsearch_backend.storage.indices.exists(index_name)
+    swh_elasticsearch_backend.create(index_name)
+    assert swh_elasticsearch_backend.storage.indices.exists(index_name)
+    assert swh_elasticsearch_backend.is_index_opened(index_name)
 
     # index exists with a mapping
-    mapping = swh_elasticsearch.storage.indices.get_mapping(index_name)
+    mapping = swh_elasticsearch_backend.storage.indices.get_mapping(index_name)
     assert mapping != {}
 
 
-def test_backend_setup_index(swh_elasticsearch):
+def test_backend_setup_index(swh_elasticsearch_backend):
     """Elastic search instance should allow to bulk index
 
     """
     template_git = TEMPLATES['git']
     next_run_date = datetime.datetime.utcnow() - datetime.timedelta(days=1)
     tasks = tasks_from_template(template_git, next_run_date, 1)
-    index_name = swh_elasticsearch.compute_index_name(
+    index_name = swh_elasticsearch_backend.compute_index_name(
         next_run_date.year, next_run_date.month)
-    assert not swh_elasticsearch.storage.indices.exists(index_name)
+    assert not swh_elasticsearch_backend.storage.indices.exists(index_name)
 
-    tasks = list(swh_elasticsearch.streaming_bulk(index_name, tasks))
+    tasks = list(swh_elasticsearch_backend.streaming_bulk(index_name, tasks))
     assert len(tasks) > 0
 
     for output_task in tasks:
         assert output_task is not None
         assert output_task['type'] == template_git['type']
         assert output_task['arguments'] is not None
         next_run = output_task['next_run']
         if isinstance(next_run, str):  # real elasticsearch
             assert next_run == next_run_date.isoformat()
         else:  # memory implem. does not really index
             assert next_run == next_run_date
 
-    assert swh_elasticsearch.storage.indices.exists(index_name)
-    assert not swh_elasticsearch.is_index_opened(index_name)
-    mapping = swh_elasticsearch.storage.indices.get_mapping(index_name)
+    assert swh_elasticsearch_backend.storage.indices.exists(index_name)
+    assert not swh_elasticsearch_backend.is_index_opened(index_name)
+    mapping = swh_elasticsearch_backend.storage.indices.get_mapping(index_name)
     assert mapping != {}
diff --git a/swh/scheduler/tests/es/test_cli_task.py b/swh/scheduler/tests/es/test_cli_task.py
index 40b3ad0..0f366f3 100644
--- a/swh/scheduler/tests/es/test_cli_task.py
+++ b/swh/scheduler/tests/es/test_cli_task.py
@@ -1,108 +1,109 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import arrow
 import datetime
 import logging
 import uuid
 import random
 
 import pytest
 
 from click.testing import CliRunner
 
 from swh.scheduler.cli import cli
 
 from ..common import tasks_from_template, TASK_TYPES, TEMPLATES
 
 logger = logging.getLogger(__name__)
 
 
-@pytest.mark.usefixtures('swh_elasticsearch')
-def test_cli_archive_tasks(swh_scheduler, swh_scheduler_conf_file):
+@pytest.mark.usefixtures('swh_elasticsearch_backend')
+def test_cli_archive_tasks(swh_sched, swh_sched_config_file):
+    scheduler = swh_sched
     template_git = TEMPLATES['git']
     template_hg = TEMPLATES['hg']
     # first initialize scheduler's db (is this still needed?)
     for tt in TASK_TYPES.values():
-        swh_scheduler.create_task_type(tt)
+        scheduler.create_task_type(tt)
 
     next_run_start = arrow.utcnow().datetime - datetime.timedelta(days=1)
 
     recurring = tasks_from_template(
         template_git, next_run_start, 100)
     oneshots = tasks_from_template(
         template_hg, next_run_start - datetime.timedelta(days=1), 50)
 
     past_time = next_run_start - datetime.timedelta(days=7)
 
     all_tasks = recurring + oneshots
-    result = swh_scheduler.create_tasks(all_tasks)
+    result = scheduler.create_tasks(all_tasks)
     assert len(result) == len(all_tasks)
 
     # simulate task run
     backend_tasks = [
         {
             'task': task['id'],
             'backend_id': str(uuid.uuid4()),
             'scheduled': next_run_start - datetime.timedelta(minutes=i % 60),
         }
         for i, task in enumerate(result)
     ]
-    swh_scheduler.mass_schedule_task_runs(backend_tasks)
+    scheduler.mass_schedule_task_runs(backend_tasks)
 
     # Disable some tasks
     tasks_to_disable = set()
     for task in result:
         status = random.choice(['disabled', 'completed'])
         if status == 'disabled':
             tasks_to_disable.add(task['id'])
 
-    swh_scheduler.disable_tasks(tasks_to_disable)
+    scheduler.disable_tasks(tasks_to_disable)
 
-    git_tasks = swh_scheduler.search_tasks(task_type=template_git['type'])
-    hg_tasks = swh_scheduler.search_tasks(task_type=template_hg['type'])
+    git_tasks = scheduler.search_tasks(task_type=template_git['type'])
+    hg_tasks = scheduler.search_tasks(task_type=template_hg['type'])
     assert len(git_tasks) + len(hg_tasks) == len(all_tasks)
 
     # Ensure the task_run are in expected state
-    task_runs = swh_scheduler.get_task_runs([
+    task_runs = scheduler.get_task_runs([
         t['id'] for t in git_tasks + hg_tasks
     ])
 
     # Same for the tasks
     for t in git_tasks + hg_tasks:
         if t['id'] in tasks_to_disable:
             assert t['status'] == 'disabled'
 
     future_time = next_run_start + datetime.timedelta(days=1)
     for tr in task_runs:
         assert past_time <= tr['scheduled']
         assert tr['scheduled'] < future_time
 
     runner = CliRunner()
     result = runner.invoke(cli, [
-        '--config-file', swh_scheduler_conf_file,
+        '--config-file', swh_sched_config_file,
         'task', 'archive',
         '--after', past_time.isoformat(),
         '--before', future_time.isoformat(),
         '--cleanup',
     ], obj={
         'log_level': logging.DEBUG,
     })
 
     assert result.exit_code == 0, result.output
 
     # disabled tasks should no longer be in the scheduler
-    git_tasks = swh_scheduler.search_tasks(task_type=template_git['type'])
-    hg_tasks = swh_scheduler.search_tasks(task_type=template_hg['type'])
+    git_tasks = scheduler.search_tasks(task_type=template_git['type'])
+    hg_tasks = scheduler.search_tasks(task_type=template_hg['type'])
     remaining_tasks = git_tasks + hg_tasks
     count_disabled = 0
     for task in remaining_tasks:
         logger.debug(f"task status: {task['status']}")
         if task['status'] == 'disabled':
             count_disabled += 1
 
     assert count_disabled == 0
     assert len(remaining_tasks) == len(all_tasks) - len(tasks_to_disable)
diff --git a/swh/scheduler/tests/es/test_elasticsearch_memory.py b/swh/scheduler/tests/es/test_elasticsearch_memory.py
index 8da4ece..beb48f8 100644
--- a/swh/scheduler/tests/es/test_elasticsearch_memory.py
+++ b/swh/scheduler/tests/es/test_elasticsearch_memory.py
@@ -1,159 +1,159 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import datetime
 import hashlib
 import logging
 import random
 
 import pytest
 
 from swh.scheduler.elasticsearch_memory import BasicSerializer, BasicTransport
 
 from ..common import tasks_from_template, TEMPLATES
 
 from typing import Any, Dict
 
 logger = logging.getLogger(__name__)
 
 
 def test_serializer():
     s = BasicSerializer()
     assert s
 
     data = {'something': [1, 2, 3], 'cool': {'1': '2'}}
     actual_data = s.dumps(data)
 
     assert isinstance(actual_data, str)
     assert actual_data == str(data)
 
 
 def test_basic_transport():
     b = BasicTransport()
     assert b
 
     assert isinstance(b.serializer, BasicSerializer)
 
 
-def test_index_manipulation(swh_memory_elasticsearch):
+def test_index_manipulation(swh_elasticsearch_memory):
     index_name = 'swh-tasks-xxxx'
-    indices = swh_memory_elasticsearch.index
+    indices = swh_elasticsearch_memory.index
 
-    assert not swh_memory_elasticsearch.exists(index_name)
+    assert not swh_elasticsearch_memory.exists(index_name)
     assert index_name not in indices
 
     # so stat raises
     with pytest.raises(Exception):
-        swh_memory_elasticsearch.stats(index_name)
+        swh_elasticsearch_memory.stats(index_name)
 
     # we create the index
-    swh_memory_elasticsearch.create(index_name)
+    swh_elasticsearch_memory.create(index_name)
 
     # now the index exists
-    assert swh_memory_elasticsearch.exists(index_name)
+    assert swh_elasticsearch_memory.exists(index_name)
     assert index_name in indices
 
     # it's opened
     assert indices[index_name]['status'] == 'opened'
 
     # so stats is happy
-    swh_memory_elasticsearch.stats(index_name)
+    swh_elasticsearch_memory.stats(index_name)
 
     # open the index, nothing changes
-    swh_memory_elasticsearch.open(index_name)
+    swh_elasticsearch_memory.open(index_name)
     assert indices[index_name]['status'] == 'opened'
 
     # close the index
-    swh_memory_elasticsearch.close(index_name)
+    swh_elasticsearch_memory.close(index_name)
     assert indices[index_name]['status'] == 'closed'
 
     # reopen the index (fun times)
-    swh_memory_elasticsearch.open(index_name)
+    swh_elasticsearch_memory.open(index_name)
     assert indices[index_name]['status'] == 'opened'
 
 
-def test_bulk_and_mget(swh_memory_elasticsearch):
+def test_bulk_and_mget(swh_elasticsearch_memory):
     # initialize tasks
     template_git = TEMPLATES['git']
     next_run_start = datetime.datetime.utcnow() - datetime.timedelta(days=1)
 
     tasks = tasks_from_template(template_git, next_run_start, 100)
 
     def compute_id(stask):
         return hashlib.sha1(stask.encode('utf-8')).hexdigest()
 
     body = []
     ids_to_task = {}
     for task in tasks:
         date = task['next_run']
         index_name = f'swh-tasks-{date.year}-{date.month}'
         idx = {'index': {'_index': index_name}}
-        sidx = swh_memory_elasticsearch.transport.serializer.dumps(idx)
+        sidx = swh_elasticsearch_memory.transport.serializer.dumps(idx)
         body.append(sidx)
 
-        stask = swh_memory_elasticsearch.transport.serializer.dumps(task)
+        stask = swh_elasticsearch_memory.transport.serializer.dumps(task)
         body.append(stask)
 
         _id = compute_id(stask)
         ids_to_task[_id] = task
         logger.debug(f'_id: {_id}, task: {task}')
 
     # store
 
     # create the index first
-    swh_memory_elasticsearch.create(index_name)
+    swh_elasticsearch_memory.create(index_name)
 
     # then bulk insert new data
-    result = swh_memory_elasticsearch.bulk('\n'.join(body))
+    result = swh_elasticsearch_memory.bulk('\n'.join(body))
 
     # no guarantee in the order
     assert result
     actual_items = result['items']
     assert len(actual_items) == len(ids_to_task)
 
     def get_id(data: Dict[str, Any]) -> str:
         return data['index']['_id']
 
     actual_items = sorted(actual_items, key=get_id)
 
     expected_items = {
         'items': [
             {
                 'index': {
                     'status': 200,
                     '_id': _id
                 }
             } for _id in list(ids_to_task)
         ]
     }
 
     expected_items = sorted(expected_items['items'], key=get_id)
     assert actual_items == expected_items
 
     # retrieve
     nb_docs = 10
     ids = list(ids_to_task)
     random_ids = []
     # add some inexistent ids
     for i in range(16):
         noisy_id = f'{i}' * 40
         random_ids.append(noisy_id)
     random_ids.extend(random.sample(ids, nb_docs))  # add relevant ids
     for i in range(16, 32):
         noisy_id = f'{i}' * 40
         random_ids.append(noisy_id)
 
-    result = swh_memory_elasticsearch.mget(
+    result = swh_elasticsearch_memory.mget(
         index=index_name, body={'ids': random_ids})
 
     assert result['docs']
     assert len(result['docs']) == nb_docs, "no random and inexistent id found"
     for doc in result['docs']:
         assert doc['found']
         actual_task = doc['_source']
         _id = compute_id(str(actual_task))
         expected_task = ids_to_task[_id]
         assert actual_task == expected_task
diff --git a/version.txt b/version.txt
index 4d0c673..9acf97f 100644
--- a/version.txt
+++ b/version.txt
@@ -1 +1 @@
-v0.0.68-0-g73ade78
\ No newline at end of file
+v0.0.69-0-gcc2de16
\ No newline at end of file