diff --git a/swh/scheduler/__init__.py b/swh/scheduler/__init__.py --- a/swh/scheduler/__init__.py +++ b/swh/scheduler/__init__.py @@ -7,6 +7,16 @@ # Percentage of tasks with priority to schedule PRIORITY_SLOT = 0.6 +DEFAULT_CONFIG_PATH = 'backend/scheduler' +DEFAULT_CONFIG = { + 'scheduler': ('dict', { + 'cls': 'local', + 'args': { + 'db': 'dbname=softwareheritage-scheduler-dev', + }, + }) +} + def compute_nb_tasks_from(num_tasks): """Compute and returns the tuple, number of tasks without priority, diff --git a/swh/scheduler/api/server.py b/swh/scheduler/api/server.py --- a/swh/scheduler/api/server.py +++ b/swh/scheduler/api/server.py @@ -14,17 +14,7 @@ encode_data_server as encode_data) from swh.core.api.negotiate import negotiate from swh.core.api import JSONFormatter, MsgpackFormatter - - -DEFAULT_CONFIG_PATH = 'backend/scheduler' -DEFAULT_CONFIG = { - 'scheduler': ('dict', { - 'cls': 'local', - 'args': { - 'db': 'dbname=softwareheritage-scheduler-dev', - }, - }) -} +from swh.scheduler import DEFAULT_CONFIG, DEFAULT_CONFIG_PATH app = Flask(__name__) diff --git a/swh/scheduler/backend_es.py b/swh/scheduler/backend_es.py --- a/swh/scheduler/backend_es.py +++ b/swh/scheduler/backend_es.py @@ -7,44 +7,42 @@ """Elastic Search backend """ - +from copy import deepcopy from swh.core import utils -from swh.core.config import SWHConfig from elasticsearch import Elasticsearch from elasticsearch import helpers -class SWHElasticSearchClient(SWHConfig): - CONFIG_BASE_FILENAME = 'backend/elastic' - - DEFAULT_CONFIG = { - 'storage_nodes': ('[dict]', [{'host': 'localhost', 'port': 9200}]), - 'index_name_prefix': ('str', 'swh-tasks'), - 'client_options': ('dict', { +DEFAULT_CONFIG = { + 'elastic_search': { + 'storage_nodes': {'host': 'localhost', 'port': 9200}, + 'index_name_prefix': 'swh-tasks', + 'client_options': { 'sniff_on_start': False, 'sniff_on_connection_fail': True, 'http_compress': False, - }) - } + }, + }, +} - def __init__(self, **config): - if config: - self.config 
= config - else: - self.config = self.parse_config_file() - options = self.config['client_options'] +class SWHElasticSearchClient: + def __init__(self, **config): + self.config = deepcopy(DEFAULT_CONFIG) + self.config.update(config) + es_conf = self.config['elastic_search'] + options = es_conf.get('client_options', {}) self.storage = Elasticsearch( # nodes to use by default - self.config['storage_nodes'], + es_conf['storage_nodes'], # auto detect cluster's status sniff_on_start=options['sniff_on_start'], sniff_on_connection_fail=options['sniff_on_connection_fail'], sniffer_timeout=60, # compression or not http_compress=options['http_compress']) - self.index_name_prefix = self.config['index_name_prefix'] + self.index_name_prefix = es_conf['index_name_prefix'] # document's index type (cf. ../../data/elastic-template.json) self.doc_type = 'task' diff --git a/swh/scheduler/celery_backend/listener.py b/swh/scheduler/celery_backend/listener.py --- a/swh/scheduler/celery_backend/listener.py +++ b/swh/scheduler/celery_backend/listener.py @@ -15,8 +15,7 @@ from kombu import Queue from celery.events import EventReceiver -from swh.scheduler import get_scheduler -from .config import setup_log_handler, app as main_app +from .config import app as main_app class ReliableEventReceiver(EventReceiver): @@ -197,35 +196,11 @@ @click.command() -@click.option('--cls', '-c', default='local', - help="Scheduler's class, default to 'local'") -@click.option( - '--database', '-d', help='Scheduling database DSN') -@click.option('--url', '-u', - help="(Optional) Scheduler's url access") -@click.option('--log-level', '-l', default='INFO', - type=click.Choice(logging._nameToLevel.keys()), - help='Log level (default to INFO)') -def main(cls, database, url, log_level): - setup_log_handler(loglevel=log_level, colorize=False, - format='[%(levelname)s] %(name)s -- %(message)s') - # logging.basicConfig(level=level) - - scheduler = None - override_config = {} - if cls == 'local': - if database: - 
override_config = {'scheduling_db': database}
-        scheduler = get_scheduler(cls, args=override_config)
-    elif cls == 'remote':
-        if url:
-            override_config = {'url': url}
-        scheduler = get_scheduler(cls, args=override_config)
-
-    if not scheduler:
-        raise ValueError('Scheduler class (local/remote) must be instantiated')
-
-    event_monitor(main_app, backend=scheduler)
+@click.pass_context
+def main(ctx):
+    click.echo("Deprecated! Use 'swh-scheduler listener' instead.",
+              err=True)
+    ctx.exit(1)
 
 
 if __name__ == '__main__':
diff --git a/swh/scheduler/cli.py b/swh/scheduler/cli.py
--- a/swh/scheduler/cli.py
+++ b/swh/scheduler/cli.py
@@ -15,7 +15,7 @@
 from swh.core import utils, config
 from . import compute_nb_tasks_from
 from .backend_es import SWHElasticSearchClient
-from . import get_scheduler
+from . import get_scheduler, DEFAULT_CONFIG
 
 
 locale.setlocale(locale.LC_ALL, '')
@@ -83,18 +83,18 @@
 
 
 @click.group(context_settings=CONTEXT_SETTINGS)
-@click.option('--cls', '-c', default='local',
-              type=click.Choice(['local', 'remote']),
-              help="Scheduler's class, default to 'local'")
-@click.option('--database', '-d',
-              help="Scheduling database DSN (if cls is 'local')")
-@click.option('--url', '-u',
-              help="Scheduler's url access (if cls is 'remote')")
+@click.option('--config-file', '-C', default=None,
+              type=click.Path(exists=True, dir_okay=False,),
+              help="Configuration file.")
+@click.option('--database', '-d', default=None,
+              help="Scheduling database DSN (imply cls is 'local')")
+@click.option('--url', '-u', default=None,
+              help="Scheduler's url access (imply cls is 'remote')")
 @click.option('--log-level', '-l', default='INFO',
               type=click.Choice(logging._nameToLevel.keys()),
               help="Log level (default to INFO)")
 @click.pass_context
-def cli(ctx, cls, database, url, log_level):
+def cli(ctx, config_file, database, url, log_level):
     """Software Heritage Scheduler CLI interface
 
     Default to use the the local scheduler instance (plugged to the
@@ -110,22 +110,28 @@
     logger = 
logging.getLogger(__name__)
 
     scheduler = None
-    override_config = {}
+    conf = config.read(config_file, DEFAULT_CONFIG)
+    if 'scheduler' not in conf:
+        raise ValueError("missing 'scheduler' configuration")
+
+    if database:
+        conf['scheduler']['cls'] = 'local'
+        conf['scheduler']['args']['db'] = database
+    elif url:
+        conf['scheduler']['cls'] = 'remote'
+        conf['scheduler']['args']['url'] = url
+    sched_conf = conf['scheduler']
 
     try:
-        if cls == 'local' and database:
-            override_config = {'scheduling_db': database}
-        elif cls == 'remote' and url:
-            override_config = {'url': url}
-        logger.debug('Instanciating scheduler %s with %s' % (
-            cls, override_config))
-        scheduler = get_scheduler(cls, args=override_config)
+        logger.debug('Instantiating scheduler with %s' % (
+            sched_conf))
+        scheduler = get_scheduler(**sched_conf)
     except Exception:
         # it's the subcommand to decide whether not having a proper
         # scheduler instance is a problem.
         pass
 
     ctx.obj['scheduler'] = scheduler
-    ctx.obj['config'] = {'cls': cls, 'args': override_config}
+    ctx.obj['config'] = conf
     ctx.obj['loglevel'] = log_level
 
@@ -530,7 +536,6 @@
 
 
 @cli.command('api-server')
-@click.argument('config-path', required=1)
 @click.option('--host', default='0.0.0.0',
               help="Host to run the scheduler server api")
 @click.option('--port', default=5008, type=click.INT,
@@ -540,23 +545,20 @@
               "Defaults to True if log-level is DEBUG, False otherwise.")
               )
 @click.pass_context
-def api_server(ctx, config_path, host, port, debug):
+def api_server(ctx, host, port, debug):
     """Starts a swh-scheduler API HTTP server.
 
     """
-    if ctx.obj['config']['cls'] == 'remote':
+    if ctx.obj['config']['scheduler']['cls'] == 'remote':
         click.echo("The API server can only be started with a 'local' "
                    "configuration", err=True)
         ctx.exit(1)
 
-    from swh.scheduler.api.server import app, DEFAULT_CONFIG
-    conf = config.read(config_path, DEFAULT_CONFIG)
-    if ctx.obj['config']['args']:
-        conf['scheduler']['args'].update(ctx.obj['config']['args'])
-    app.config.update(conf)
+    from swh.scheduler.api import server
+    server.app.scheduler = ctx.obj['scheduler']
+    server.app.config.update(ctx.obj['config'])
 
     if debug is None:
         debug = ctx.obj['loglevel'] <= logging.DEBUG
-
-    app.run(host, port=port, debug=bool(debug))
+    server.app.run(host, port=port, debug=bool(debug))
 
 
 @cli.group('task-type')