diff --git a/MANIFEST.in b/MANIFEST.in index d6f7f03..c31099e 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,6 +1,7 @@ include README.md include Makefile include requirements.txt include requirements-swh.txt include version.txt recursive-include swh/scheduler/sql *.sql +recursive-include swh py.typed diff --git a/PKG-INFO b/PKG-INFO index 8dad843..5d66cc3 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,28 +1,28 @@ Metadata-Version: 2.1 Name: swh.scheduler -Version: 0.0.59 +Version: 0.0.60 Summary: Software Heritage Scheduler Home-page: https://forge.softwareheritage.org/diffusion/DSCH/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler -Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate +Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Description: swh-scheduler ============= Job scheduler for the Software Heritage project. Task manager for asynchronous/delayed tasks, used for both recurrent (e.g., listing a forge, loading new stuff from a Git repository) and one-off activities (e.g., loading a specific version of a source package). 
Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/swh.scheduler.egg-info/PKG-INFO b/swh.scheduler.egg-info/PKG-INFO index 8dad843..5d66cc3 100644 --- a/swh.scheduler.egg-info/PKG-INFO +++ b/swh.scheduler.egg-info/PKG-INFO @@ -1,28 +1,28 @@ Metadata-Version: 2.1 Name: swh.scheduler -Version: 0.0.59 +Version: 0.0.60 Summary: Software Heritage Scheduler Home-page: https://forge.softwareheritage.org/diffusion/DSCH/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler -Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate +Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Description: swh-scheduler ============= Job scheduler for the Software Heritage project. Task manager for asynchronous/delayed tasks, used for both recurrent (e.g., listing a forge, loading new stuff from a Git repository) and one-off activities (e.g., loading a specific version of a source package). 
Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/swh.scheduler.egg-info/SOURCES.txt b/swh.scheduler.egg-info/SOURCES.txt index 3d64c8d..d5dfd88 100644 --- a/swh.scheduler.egg-info/SOURCES.txt +++ b/swh.scheduler.egg-info/SOURCES.txt @@ -1,64 +1,65 @@ MANIFEST.in Makefile README.md requirements-swh.txt requirements.txt setup.py version.txt bin/swh-worker-control swh/__init__.py swh.scheduler.egg-info/PKG-INFO swh.scheduler.egg-info/SOURCES.txt swh.scheduler.egg-info/dependency_links.txt swh.scheduler.egg-info/entry_points.txt swh.scheduler.egg-info/requires.txt swh.scheduler.egg-info/top_level.txt swh/scheduler/__init__.py swh/scheduler/backend.py swh/scheduler/backend_es.py swh/scheduler/cli_utils.py +swh/scheduler/py.typed swh/scheduler/task.py swh/scheduler/utils.py swh/scheduler/api/__init__.py swh/scheduler/api/client.py swh/scheduler/api/server.py swh/scheduler/api/wsgi.py swh/scheduler/celery_backend/__init__.py swh/scheduler/celery_backend/config.py swh/scheduler/celery_backend/listener.py swh/scheduler/celery_backend/runner.py swh/scheduler/cli/__init__.py swh/scheduler/cli/admin.py swh/scheduler/cli/task.py swh/scheduler/cli/task_type.py swh/scheduler/cli/utils.py swh/scheduler/sql/30-swh-schema.sql swh/scheduler/sql/40-swh-func.sql swh/scheduler/sql/50-swh-data.sql swh/scheduler/sql/60-swh-indexes.sql swh/scheduler/sql/updater/10-swh-init.sql swh/scheduler/sql/updater/30-swh-schema.sql swh/scheduler/sql/updater/40-swh-func.sql swh/scheduler/tests/__init__.py swh/scheduler/tests/conftest.py swh/scheduler/tests/tasks.py swh/scheduler/tests/test_api_client.py swh/scheduler/tests/test_celery_tasks.py swh/scheduler/tests/test_cli.py 
swh/scheduler/tests/test_scheduler.py swh/scheduler/tests/test_server.py swh/scheduler/tests/test_utils.py swh/scheduler/tests/updater/__init__.py swh/scheduler/tests/updater/conftest.py swh/scheduler/tests/updater/test_backend.py swh/scheduler/tests/updater/test_consumer.py swh/scheduler/tests/updater/test_events.py swh/scheduler/tests/updater/test_ghtorrent.py swh/scheduler/tests/updater/test_writer.py swh/scheduler/updater/__init__.py swh/scheduler/updater/backend.py swh/scheduler/updater/consumer.py swh/scheduler/updater/events.py swh/scheduler/updater/writer.py swh/scheduler/updater/ghtorrent/__init__.py swh/scheduler/updater/ghtorrent/cli.py \ No newline at end of file diff --git a/swh/__init__.py b/swh/__init__.py index 69e3be5..f14e196 100644 --- a/swh/__init__.py +++ b/swh/__init__.py @@ -1 +1,4 @@ -__path__ = __import__('pkgutil').extend_path(__path__, __name__) +from pkgutil import extend_path +from typing import Iterable + +__path__ = extend_path(__path__, __name__) # type: Iterable[str] diff --git a/swh/scheduler/__init__.py b/swh/scheduler/__init__.py index d706ed7..371324e 100644 --- a/swh/scheduler/__init__.py +++ b/swh/scheduler/__init__.py @@ -1,69 +1,71 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from typing import Any, Dict + # Percentage of tasks with priority to schedule PRIORITY_SLOT = 0.6 DEFAULT_CONFIG = { 'scheduler': ('dict', { 'cls': 'local', 'args': { 'db': 'dbname=softwareheritage-scheduler-dev', }, }) } # current configuration. To be set by the config loading mechanism -CONFIG = {} +CONFIG = {} # type: Dict[str, Any] def compute_nb_tasks_from(num_tasks): """Compute and returns the tuple, number of tasks without priority, number of tasks with priority. 
Args: num_tasks (int): Returns: tuple number of tasks without priority (int), number of tasks with priority (int) """ if not num_tasks: return None, None return (int((1 - PRIORITY_SLOT) * num_tasks), int(PRIORITY_SLOT * num_tasks)) def get_scheduler(cls, args={}): """ Get a scheduler object of class `scheduler_class` with arguments `scheduler_args`. Args: scheduler (dict): dictionary with keys: cls (str): scheduler's class, either 'local' or 'remote' args (dict): dictionary with keys, default to empty. Returns: an instance of swh.scheduler, either local or remote: local: swh.scheduler.backend.SchedulerBackend remote: swh.scheduler.api.client.RemoteScheduler Raises: ValueError if passed an unknown storage class. """ if cls == 'remote': from .api.client import RemoteScheduler as SchedulerBackend elif cls == 'local': from .backend import SchedulerBackend else: raise ValueError('Unknown swh.scheduler class `%s`' % cls) return SchedulerBackend(**args) diff --git a/swh/scheduler/celery_backend/config.py b/swh/scheduler/celery_backend/config.py index 0f275b8..be801a4 100644 --- a/swh/scheduler/celery_backend/config.py +++ b/swh/scheduler/celery_backend/config.py @@ -1,282 +1,284 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import os import pkg_resources import urllib.parse from celery import Celery from celery.signals import setup_logging, celeryd_after_setup from celery.utils.log import ColorFormatter from celery.worker.control import Panel from kombu import Exchange, Queue from kombu.five import monotonic as _monotonic import requests +from typing import Any, Dict + from swh.scheduler import CONFIG as SWH_CONFIG from swh.core.config import load_named_config, merge_configs from swh.core.logger import JournalHandler DEFAULT_CONFIG_NAME = 'worker' 
CONFIG_NAME_ENVVAR = 'SWH_WORKER_INSTANCE' CONFIG_NAME_TEMPLATE = 'worker/%s' DEFAULT_CONFIG = { 'task_broker': ('str', 'amqp://guest@localhost//'), 'task_modules': ('list[str]', []), 'task_queues': ('list[str]', []), 'task_soft_time_limit': ('int', 0), } logger = logging.getLogger(__name__) @setup_logging.connect def setup_log_handler(loglevel=None, logfile=None, format=None, colorize=None, log_console=None, log_journal=None, **kwargs): """Setup logging according to Software Heritage preferences. We use the command-line loglevel for tasks only, as we never really care about the debug messages from celery. """ if loglevel is None: loglevel = logging.DEBUG if isinstance(loglevel, str): loglevel = logging._nameToLevel[loglevel] formatter = logging.Formatter(format) root_logger = logging.getLogger('') root_logger.setLevel(logging.INFO) log_target = os.environ.get('SWH_LOG_TARGET', 'console') if log_target == 'console': log_console = True elif log_target == 'journal': log_journal = True # this looks for log levels *higher* than DEBUG if loglevel <= logging.DEBUG and log_console is None: log_console = True if log_console: color_formatter = ColorFormatter(format) if colorize else formatter console = logging.StreamHandler() console.setLevel(logging.DEBUG) console.setFormatter(color_formatter) root_logger.addHandler(console) if log_journal: systemd_journal = JournalHandler() systemd_journal.setLevel(logging.DEBUG) systemd_journal.setFormatter(formatter) root_logger.addHandler(systemd_journal) logging.getLogger('celery').setLevel(logging.INFO) # Silence amqp heartbeat_tick messages logger = logging.getLogger('amqp') logger.addFilter(lambda record: not record.msg.startswith( 'heartbeat_tick')) logger.setLevel(loglevel) # Silence useless "Starting new HTTP connection" messages logging.getLogger('urllib3').setLevel(logging.WARNING) logging.getLogger('swh').setLevel(loglevel) # get_task_logger makes the swh tasks loggers children of celery.task 
logging.getLogger('celery.task').setLevel(loglevel) return loglevel @celeryd_after_setup.connect def setup_queues_and_tasks(sender, instance, **kwargs): """Signal called on worker start. This automatically registers swh.scheduler.task.Task subclasses as available celery tasks. This also subscribes the worker to the "implicit" per-task queues defined for these task classes. """ logger.info('Setup Queues & Tasks for %s', sender) instance.app.conf['worker_name'] = sender @Panel.register def monotonic(state): """Get the current value for the monotonic clock""" return {'monotonic': _monotonic()} def route_for_task(name, args, kwargs, options, task=None, **kw): """Route tasks according to the task_queue attribute in the task class""" if name is not None and name.startswith('swh.'): return {'queue': name} def get_queue_stats(app, queue_name): """Get the statistics regarding a queue on the broker. Arguments: queue_name: name of the queue to check Returns a dictionary raw from the RabbitMQ management API; or `None` if the current configuration does not use RabbitMQ. Interesting keys: - Consumers (number of consumers for the queue) - messages (number of messages in queue) - messages_unacknowledged (number of messages currently being processed) Documentation: https://www.rabbitmq.com/management.html#http-api """ conn_info = app.connection().info() if conn_info['transport'] == 'memory': # We're running in a test environment, without RabbitMQ. 
return None url = 'http://{hostname}:{port}/api/queues/{vhost}/{queue}'.format( hostname=conn_info['hostname'], port=conn_info['port'] + 10000, vhost=urllib.parse.quote(conn_info['virtual_host'], safe=''), queue=urllib.parse.quote(queue_name, safe=''), ) credentials = (conn_info['userid'], conn_info['password']) r = requests.get(url, auth=credentials) if r.status_code == 404: return {} if r.status_code != 200: raise ValueError('Got error %s when reading queue stats: %s' % ( r.status_code, r.json())) return r.json() def get_queue_length(app, queue_name): """Shortcut to get a queue's length""" stats = get_queue_stats(app, queue_name) if stats: return stats.get('messages') def register_task_class(app, name, cls): """Register a class-based task under the given name""" if name in app.tasks: return task_instance = cls() task_instance.name = name app.register_task(task_instance) INSTANCE_NAME = os.environ.get(CONFIG_NAME_ENVVAR) CONFIG_NAME = os.environ.get('SWH_CONFIG_FILENAME') -CONFIG = {} +CONFIG = {} # type: Dict[str, Any] if CONFIG_NAME: # load the celery config from the main config file given as # SWH_CONFIG_FILENAME environment variable. # This is expected to have a [celery] section in which we have the # celery specific configuration. 
SWH_CONFIG.clear() SWH_CONFIG.update(load_named_config(CONFIG_NAME)) - CONFIG = SWH_CONFIG.get('celery') + CONFIG = SWH_CONFIG.get('celery', {}) if not CONFIG: # otherwise, back to compat config loading mechanism if INSTANCE_NAME: CONFIG_NAME = CONFIG_NAME_TEMPLATE % INSTANCE_NAME else: CONFIG_NAME = DEFAULT_CONFIG_NAME # Load the Celery config CONFIG = load_named_config(CONFIG_NAME, DEFAULT_CONFIG) CONFIG.setdefault('task_modules', []) # load tasks modules declared as plugin entry points for entrypoint in pkg_resources.iter_entry_points('swh.workers'): worker_registrer_fn = entrypoint.load() # The registry function is expected to return a dict which the 'tasks' key # is a string (or a list of strings) with the name of the python module in # which celery tasks are defined. task_modules = worker_registrer_fn().get('task_modules', []) CONFIG['task_modules'].extend(task_modules) # Celery Queues CELERY_QUEUES = [Queue('celery', Exchange('celery'), routing_key='celery')] CELERY_DEFAULT_CONFIG = dict( # Timezone configuration: all in UTC enable_utc=True, timezone='UTC', # Imported modules imports=CONFIG.get('task_modules', []), # Time (in seconds, or a timedelta object) for when after stored task # tombstones will be deleted. None means to never expire results. result_expires=None, # A string identifying the default serialization method to use. Can # be json (default), pickle, yaml, msgpack, or any custom # serialization methods that have been registered with task_serializer='msgpack', # Result serialization format result_serializer='msgpack', # Late ack means the task messages will be acknowledged after the task has # been executed, not just before, which is the default behavior. task_acks_late=True, # A string identifying the default serialization method to use. 
# Can be pickle (default), json, yaml, msgpack or any custom serialization # methods that have been registered with kombu.serialization.registry accept_content=['msgpack', 'json'], # If True the task will report its status as “started” # when the task is executed by a worker. task_track_started=True, # Default compression used for task messages. Can be gzip, bzip2 # (if available), or any custom compression schemes registered # in the Kombu compression registry. # result_compression='bzip2', # task_compression='bzip2', # Disable all rate limits, even if tasks has explicit rate limits set. # (Disabling rate limits altogether is recommended if you don’t have any # tasks using them.) worker_disable_rate_limits=True, # Task routing task_routes=route_for_task, # Allow pool restarts from remote worker_pool_restarts=True, # Do not prefetch tasks worker_prefetch_multiplier=1, # Send events worker_send_task_events=True, # Do not send useless task_sent events task_send_sent_event=False, ) def build_app(config=None): config = merge_configs( {k: v for (k, (_, v)) in DEFAULT_CONFIG.items()}, config or {}) config['task_queues'] = CELERY_QUEUES + [ Queue(queue, Exchange(queue), routing_key=queue) for queue in config.get('task_queues', ())] logger.debug('Creating a Celery app with %s', config) # Instantiate the Celery app app = Celery(broker=config['task_broker'], task_cls='swh.scheduler.task:SWHTask') app.add_defaults(CELERY_DEFAULT_CONFIG) app.add_defaults(config) return app app = build_app(CONFIG) # XXX for BW compat Celery.get_queue_length = get_queue_length diff --git a/swh/scheduler/cli/admin.py b/swh/scheduler/cli/admin.py index 113ac64..276014d 100644 --- a/swh/scheduler/cli/admin.py +++ b/swh/scheduler/cli/admin.py @@ -1,133 +1,124 @@ # Copyright (C) 2016-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more 
information import logging import time import click from . import cli @cli.command('start-runner') @click.option('--period', '-p', default=0, help=('Period (in s) at witch pending tasks are checked and ' 'executed. Set to 0 (default) for a one shot.')) @click.pass_context def runner(ctx, period): """Starts a swh-scheduler runner service. This process is responsible for checking for ready-to-run tasks and schedule them.""" from swh.scheduler.celery_backend.runner import run_ready_tasks from swh.scheduler.celery_backend.config import build_app app = build_app(ctx.obj['config'].get('celery')) app.set_current() logger = logging.getLogger(__name__ + '.runner') scheduler = ctx.obj['scheduler'] logger.debug('Scheduler %s' % scheduler) try: while True: logger.debug('Run ready tasks') try: ntasks = len(run_ready_tasks(scheduler, app)) if ntasks: logger.info('Scheduled %s tasks', ntasks) except Exception: logger.exception('Unexpected error in run_ready_tasks()') if not period: break time.sleep(period) except KeyboardInterrupt: ctx.exit(0) @cli.command('start-listener') @click.pass_context def listener(ctx): """Starts a swh-scheduler listener service. This service is responsible for listening at task lifecycle events and handle their workflow status in the database.""" scheduler = ctx.obj['scheduler'] if not scheduler: raise ValueError('Scheduler class (local/remote) must be instantiated') from swh.scheduler.celery_backend.config import build_app app = build_app(ctx.obj['config'].get('celery')) app.set_current() from swh.scheduler.celery_backend.listener import event_monitor event_monitor(app, backend=scheduler) @cli.command('rpc-serve') @click.option('--host', default='0.0.0.0', help="Host to run the scheduler server api") @click.option('--port', default=5008, type=click.INT, help="Binding port of the server") @click.option('--debug/--nodebug', default=None, help=("Indicates if the server should run in debug mode. 
" "Defaults to True if log-level is DEBUG, False otherwise.") ) @click.pass_context def rpc_server(ctx, host, port, debug): """Starts a swh-scheduler API HTTP server. """ if ctx.obj['config']['scheduler']['cls'] == 'remote': click.echo("The API server can only be started with a 'local' " "configuration", err=True) ctx.exit(1) from swh.scheduler.api import server server.app.config.update(ctx.obj['config']) if debug is None: debug = ctx.obj['log_level'] <= logging.DEBUG server.app.run(host, port=port, debug=bool(debug)) @cli.command('start-updater') @click.option('--verbose/--no-verbose', '-v', default=False, help='Verbose mode') @click.pass_context def updater(ctx, verbose): """Starts a scheduler-updater service. Insert tasks in the scheduler from the scheduler-updater's events read from the db cache (filled e.g. by the ghtorrent consumer service) . """ from swh.scheduler.updater.writer import UpdaterWriter UpdaterWriter(**ctx.obj['config']).run() @cli.command('start-ghtorrent') @click.option('--verbose/--no-verbose', '-v', default=False, help='Verbose mode') @click.pass_context def ghtorrent(ctx, verbose): """Starts a ghtorrent consumer service. Consumes events from ghtorrent and write them to a cache. 
""" from swh.scheduler.updater.ghtorrent import GHTorrentConsumer from swh.scheduler.updater.backend import SchedulerUpdaterBackend ght_config = ctx.obj['config'].get('ghtorrent', {}) back_config = ctx.obj['config'].get('scheduler_updater', {}) backend = SchedulerUpdaterBackend(**back_config) GHTorrentConsumer(backend, **ght_config).run() - - -# for bw compat -cli.add_alias(ghtorrent, 'ghtorrent') -cli.add_alias(listener, 'listener') -cli.add_alias(runner, 'runner') -cli.add_alias(updater, 'updater') -cli.add_alias(rpc_server, 'serve') -cli.add_alias(rpc_server, 'api-server') diff --git a/swh/scheduler/cli/task_type.py b/swh/scheduler/cli/task_type.py index 08123c1..2a84782 100644 --- a/swh/scheduler/cli/task_type.py +++ b/swh/scheduler/cli/task_type.py @@ -1,82 +1,82 @@ # Copyright (C) 2016-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click from . 
import cli @cli.group('task-type') @click.pass_context def task_type(ctx): """Manipulate task types.""" pass @task_type.command('list') @click.option('--verbose', '-v', is_flag=True, default=False, help='Verbose mode') @click.option('--task_type', '-t', multiple=True, default=None, help='List task types of given type') @click.option('--task_name', '-n', multiple=True, default=None, help='List task types of given backend task name') @click.pass_context def list_task_types(ctx, verbose, task_type, task_name): click.echo("Known task types:") if verbose: tmpl = click.style('{type}: ', bold=True) + '''{backend_name} {description} interval: {default_interval} [{min_interval}, {max_interval}] backoff_factor: {backoff_factor} max_queue_length: {max_queue_length} num_retries: {num_retries} retry_delay: {retry_delay} ''' else: tmpl = '{type}:\n {description}' for tasktype in sorted(ctx.obj['scheduler'].get_task_types(), key=lambda x: x['type']): if task_type and tasktype['type'] not in task_type: continue if task_name and tasktype['backend_name'] not in task_name: continue click.echo(tmpl.format(**tasktype)) @task_type.command('add') -@click.argument('type', required=1) -@click.argument('task-name', required=1) -@click.argument('description', required=1) +@click.argument('type', required=True) +@click.argument('task-name', required=True) +@click.argument('description', required=True) @click.option('--default-interval', '-i', default='90 days', help='Default interval ("90 days" by default)') @click.option('--min-interval', default=None, help='Minimum interval (default interval if not set)') @click.option('--max-interval', '-i', default=None, help='Maximal interval (default interval if not set)') @click.option('--backoff-factor', '-f', type=float, default=1, help='Backoff factor') @click.pass_context def add_task_type(ctx, type, task_name, description, default_interval, min_interval, max_interval, backoff_factor): """Create a new task type """ scheduler = ctx.obj['scheduler'] 
if not scheduler: raise ValueError('Scheduler class (local/remote) must be instantiated') task_type = dict( type=type, backend_name=task_name, description=description, default_interval=default_interval, min_interval=min_interval, max_interval=max_interval, backoff_factor=backoff_factor, max_queue_length=None, num_retries=None, retry_delay=None, ) scheduler.create_task_type(task_type) click.echo('OK') diff --git a/swh/scheduler/py.typed b/swh/scheduler/py.typed new file mode 100644 index 0000000..1242d43 --- /dev/null +++ b/swh/scheduler/py.typed @@ -0,0 +1 @@ +# Marker file for PEP 561. diff --git a/swh/scheduler/tests/test_server.py b/swh/scheduler/tests/test_server.py index 4d3ea5f..ae4e660 100644 --- a/swh/scheduler/tests/test_server.py +++ b/swh/scheduler/tests/test_server.py @@ -1,133 +1,133 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy import pytest import yaml from swh.scheduler.api.server import load_and_check_config def prepare_config_file(tmpdir, content, name='config.yml'): """Prepare configuration file in `$tmpdir/name` with content `content`. Args: tmpdir (LocalPath): root directory content (str/dict): Content of the file either as string or as a dict. If a dict, converts the dict into a yaml string. name (str): configuration filename Returns path (str) of the configuration file prepared. 
""" config_path = tmpdir / name if isinstance(content, dict): # convert if needed content = yaml.dump(content) config_path.write_text(content, encoding='utf-8') # pytest on python3.5 does not support LocalPath manipulation, so # convert path to string return str(config_path) def test_load_and_check_config_no_configuration(): - """Inexistant configuration files raises""" + """Inexistent configuration files raises""" with pytest.raises(EnvironmentError) as e: load_and_check_config(None) assert e.value.args[0] == 'Configuration file must be defined' - config_path = '/some/inexistant/config.yml' + config_path = '/some/inexistent/config.yml' with pytest.raises(FileNotFoundError) as e: load_and_check_config(config_path) assert e.value.args[0] == 'Configuration file %s does not exist' % ( config_path, ) def test_load_and_check_config_wrong_configuration(tmpdir): """Wrong configuration raises""" config_path = prepare_config_file(tmpdir, 'something: useless') with pytest.raises(KeyError) as e: load_and_check_config(config_path) assert e.value.args[0] == 'Missing \'%scheduler\' configuration' def test_load_and_check_config_remote_config_local_type_raise(tmpdir): """'local' configuration without 'local' storage raises""" config = { 'scheduler': { 'cls': 'remote', 'args': {} } } config_path = prepare_config_file(tmpdir, config) with pytest.raises(ValueError) as e: load_and_check_config(config_path, type='local') assert ( e.value.args[0] == "The scheduler backend can only be started with a 'local'" " configuration" ) def test_load_and_check_config_local_incomplete_configuration(tmpdir): """Incomplete 'local' configuration should raise""" config = { 'scheduler': { 'cls': 'local', 'args': { 'db': 'database', 'something': 'needed-for-test', } } } for key in ['db', 'args']: c = copy.deepcopy(config) if key == 'db': source = c['scheduler']['args'] else: source = c['scheduler'] source.pop(key) config_path = prepare_config_file(tmpdir, c) with pytest.raises(KeyError) as e: 
load_and_check_config(config_path) assert ( e.value.args[0] == "Invalid configuration; missing '%s' config entry" % key ) def test_load_and_check_config_local_config_fine(tmpdir): """Local configuration is fine""" config = { 'scheduler': { 'cls': 'local', 'args': { 'db': 'db', } } } config_path = prepare_config_file(tmpdir, config) cfg = load_and_check_config(config_path, type='local') assert cfg == config def test_load_and_check_config_remote_config_fine(tmpdir): """'Remote configuration is fine""" config = { 'scheduler': { 'cls': 'remote', 'args': {} } } config_path = prepare_config_file(tmpdir, config) cfg = load_and_check_config(config_path, type='any') assert cfg == config diff --git a/version.txt b/version.txt index 748751f..3b6dab0 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.59-0-ga64ac3f \ No newline at end of file +v0.0.60-0-g06137f0 \ No newline at end of file