Page MenuHomeSoftware Heritage

D1026.id3256.diff
No OneTemporary

D1026.id3256.diff

diff --git a/swh/scheduler/__init__.py b/swh/scheduler/__init__.py
--- a/swh/scheduler/__init__.py
+++ b/swh/scheduler/__init__.py
@@ -7,6 +7,16 @@
# Percentage of tasks with priority to schedule
PRIORITY_SLOT = 0.6
+DEFAULT_CONFIG_PATH = 'backend/scheduler'
+DEFAULT_CONFIG = {
+ 'scheduler': ('dict', {
+ 'cls': 'local',
+ 'args': {
+ 'db': 'dbname=softwareheritage-scheduler-dev',
+ },
+ })
+}
+
def compute_nb_tasks_from(num_tasks):
"""Compute and returns the tuple, number of tasks without priority,
diff --git a/swh/scheduler/api/server.py b/swh/scheduler/api/server.py
--- a/swh/scheduler/api/server.py
+++ b/swh/scheduler/api/server.py
@@ -14,17 +14,7 @@
encode_data_server as encode_data)
from swh.core.api.negotiate import negotiate
from swh.core.api import JSONFormatter, MsgpackFormatter
-
-
-DEFAULT_CONFIG_PATH = 'backend/scheduler'
-DEFAULT_CONFIG = {
- 'scheduler': ('dict', {
- 'cls': 'local',
- 'args': {
- 'db': 'dbname=softwareheritage-scheduler-dev',
- },
- })
-}
+from swh.scheduler import DEFAULT_CONFIG, DEFAULT_CONFIG_PATH
app = Flask(__name__)
diff --git a/swh/scheduler/backend_es.py b/swh/scheduler/backend_es.py
--- a/swh/scheduler/backend_es.py
+++ b/swh/scheduler/backend_es.py
@@ -7,44 +7,42 @@
"""Elastic Search backend
"""
-
+from copy import deepcopy
from swh.core import utils
-from swh.core.config import SWHConfig
from elasticsearch import Elasticsearch
from elasticsearch import helpers
-class SWHElasticSearchClient(SWHConfig):
- CONFIG_BASE_FILENAME = 'backend/elastic'
-
- DEFAULT_CONFIG = {
- 'storage_nodes': ('[dict]', [{'host': 'localhost', 'port': 9200}]),
- 'index_name_prefix': ('str', 'swh-tasks'),
- 'client_options': ('dict', {
+DEFAULT_CONFIG = {
+ 'elastic_search': {
+ 'storage_nodes': {'host': 'localhost', 'port': 9200},
+ 'index_name_prefix': 'swh-tasks',
+ 'client_options': {
'sniff_on_start': False,
'sniff_on_connection_fail': True,
'http_compress': False,
- })
- }
+ },
+ },
+}
- def __init__(self, **config):
- if config:
- self.config = config
- else:
- self.config = self.parse_config_file()
- options = self.config['client_options']
+class SWHElasticSearchClient:
+ def __init__(self, **config):
+ self.config = deepcopy(DEFAULT_CONFIG)
+ self.config.update(config)
+ es_conf = self.config['elastic_search']
+ options = es_conf.get('client_options', {})
self.storage = Elasticsearch(
# nodes to use by default
- self.config['storage_nodes'],
+ es_conf['storage_nodes'],
# auto detect cluster's status
sniff_on_start=options['sniff_on_start'],
sniff_on_connection_fail=options['sniff_on_connection_fail'],
sniffer_timeout=60,
# compression or not
http_compress=options['http_compress'])
- self.index_name_prefix = self.config['index_name_prefix']
+ self.index_name_prefix = es_conf['index_name_prefix']
# document's index type (cf. ../../data/elastic-template.json)
self.doc_type = 'task'
diff --git a/swh/scheduler/celery_backend/listener.py b/swh/scheduler/celery_backend/listener.py
--- a/swh/scheduler/celery_backend/listener.py
+++ b/swh/scheduler/celery_backend/listener.py
@@ -15,8 +15,7 @@
from kombu import Queue
from celery.events import EventReceiver
-from swh.scheduler import get_scheduler
-from .config import setup_log_handler, app as main_app
+from .config import app as main_app
class ReliableEventReceiver(EventReceiver):
@@ -197,35 +196,11 @@
@click.command()
-@click.option('--cls', '-c', default='local',
- help="Scheduler's class, default to 'local'")
-@click.option(
- '--database', '-d', help='Scheduling database DSN')
-@click.option('--url', '-u',
- help="(Optional) Scheduler's url access")
-@click.option('--log-level', '-l', default='INFO',
- type=click.Choice(logging._nameToLevel.keys()),
- help='Log level (default to INFO)')
-def main(cls, database, url, log_level):
- setup_log_handler(loglevel=log_level, colorize=False,
- format='[%(levelname)s] %(name)s -- %(message)s')
- # logging.basicConfig(level=level)
-
- scheduler = None
- override_config = {}
- if cls == 'local':
- if database:
- override_config = {'scheduling_db': database}
- scheduler = get_scheduler(cls, args=override_config)
- elif cls == 'remote':
- if url:
- override_config = {'url': url}
- scheduler = get_scheduler(cls, args=override_config)
-
- if not scheduler:
- raise ValueError('Scheduler class (local/remote) must be instantiated')
-
- event_monitor(main_app, backend=scheduler)
+@click.pass_context
+def main(ctx):
+ click.echo("Deprecated! Use 'swh-scheduler listener' instead.",
+ err=True)
+ ctx.exit(1)
if __name__ == '__main__':
diff --git a/swh/scheduler/cli.py b/swh/scheduler/cli.py
--- a/swh/scheduler/cli.py
+++ b/swh/scheduler/cli.py
@@ -15,7 +15,7 @@
from swh.core import utils, config
from . import compute_nb_tasks_from
from .backend_es import SWHElasticSearchClient
-from . import get_scheduler
+from . import get_scheduler, DEFAULT_CONFIG
locale.setlocale(locale.LC_ALL, '')
@@ -83,18 +83,18 @@
@click.group(context_settings=CONTEXT_SETTINGS)
-@click.option('--cls', '-c', default='local',
- type=click.Choice(['local', 'remote']),
- help="Scheduler's class, default to 'local'")
-@click.option('--database', '-d',
- help="Scheduling database DSN (if cls is 'local')")
-@click.option('--url', '-u',
- help="Scheduler's url access (if cls is 'remote')")
+@click.option('--config-file', '-C', default=None,
+ type=click.Path(exists=True, dir_okay=False,),
+ help="Configuration file.")
+@click.option('--database', '-d', default=None,
+ help="Scheduling database DSN (imply cls is 'local')")
+@click.option('--url', '-u', default=None,
+ help="Scheduler's url access (imply cls is 'remote')")
@click.option('--log-level', '-l', default='INFO',
type=click.Choice(logging._nameToLevel.keys()),
help="Log level (default to INFO)")
@click.pass_context
-def cli(ctx, cls, database, url, log_level):
+def cli(ctx, config_file, cls, database, url, log_level):
"""Software Heritage Scheduler CLI interface
Default to use the local scheduler instance (plugged to the
@@ -110,22 +110,28 @@
logger = logging.getLogger(__name__)
scheduler = None
- override_config = {}
+ conf = config.read(config_file, DEFAULT_CONFIG)
+ if 'scheduler' not in conf:
+ raise ValueError("missing 'scheduler' configuration")
+
+ if database:
+ conf['scheduler']['cls'] = 'local'
+ conf['scheduler']['args']['db'] = database
+ elif url:
+ conf['scheduler']['cls'] = 'local'
+ conf['scheduler']['args']['url'] = url
+ sched_conf = conf['scheduler']
try:
- if cls == 'local' and database:
- override_config = {'scheduling_db': database}
- elif cls == 'remote' and url:
- override_config = {'url': url}
- logger.debug('Instanciating scheduler %s with %s' % (
- cls, override_config))
- scheduler = get_scheduler(cls, args=override_config)
+ logger.debug('Instanciating scheduler with %s' % (
+ sched_conf))
+ scheduler = get_scheduler(**sched_conf)
except Exception:
# it's up to the subcommand to decide whether not having a proper
# scheduler instance is a problem.
pass
ctx.obj['scheduler'] = scheduler
- ctx.obj['config'] = {'cls': cls, 'args': override_config}
+ ctx.obj['config'] = conf
ctx.obj['loglevel'] = log_level
@@ -530,7 +536,6 @@
@cli.command('api-server')
-@click.argument('config-path', required=1)
@click.option('--host', default='0.0.0.0',
help="Host to run the scheduler server api")
@click.option('--port', default=5008, type=click.INT,
@@ -540,23 +545,20 @@
"Defaults to True if log-level is DEBUG, False otherwise.")
)
@click.pass_context
-def api_server(ctx, config_path, host, port, debug):
+def api_server(ctx, host, port, debug):
"""Starts a swh-scheduler API HTTP server.
"""
- if ctx.obj['config']['cls'] == 'remote':
+ if ctx.obj['config']['scheduler']['cls'] == 'remote':
click.echo("The API server can only be started with a 'local' "
"configuration", err=True)
ctx.exit(1)
- from swh.scheduler.api.server import app, DEFAULT_CONFIG
- conf = config.read(config_path, DEFAULT_CONFIG)
- if ctx.obj['config']['args']:
- conf['scheduler']['args'].update(ctx.obj['config']['args'])
- app.config.update(conf)
+ from swh.scheduler.api import server
+ server.app.scheduler = ctx.obj['scheduler']
+ server.app.config.update(ctx.obj['config'])
if debug is None:
debug = ctx.obj['loglevel'] <= logging.DEBUG
-
- app.run(host, port=port, debug=bool(debug))
+ server.app.run(host, port=port, debug=bool(debug))
@cli.group('task-type')

File Metadata

Mime Type
text/plain
Expires
Dec 21 2024, 8:59 AM (11 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3226055

Event Timeline