diff --git a/setup.py b/setup.py
index 65d2792..15163f0 100755
--- a/setup.py
+++ b/setup.py
@@ -1,69 +1,69 @@
 #!/usr/bin/env python3
 # Copyright (C) 2015-2018  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 from setuptools import setup, find_packages
 from os import path
 from io import open
 here = path.abspath(path.dirname(__file__))
 # Get the long description from the README file
 with open(path.join(here, 'README.md'), encoding='utf-8') as f:
     long_description = f.read()
 def parse_requirements(name=None):
     if name:
         reqf = 'requirements-%s.txt' % name
         reqf = 'requirements.txt'
     requirements = []
     if not path.exists(reqf):
         return requirements
     with open(reqf) as f:
         for line in f.readlines():
             line = line.strip()
             if not line or line.startswith('#'):
     return requirements
     description='Software Heritage Scheduler',
     author='Software Heritage developers',
     install_requires=parse_requirements() + parse_requirements('swh'),
     extras_require={'testing': parse_requirements('test')},
-        swh-scheduler=swh.scheduler.cli:cli
+        swh-scheduler=swh.scheduler.cli:main
         "Programming Language :: Python :: 3",
         "Intended Audience :: Developers",
         "License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
         "Operating System :: OS Independent",
         "Development Status :: 5 - Production/Stable",
         'Bug Reports': 'https://forge.softwareheritage.org/maniphest',
         'Funding': 'https://www.softwareheritage.org/donate',
         'Source': 'https://forge.softwareheritage.org/source/swh-scheduler',
diff --git a/swh/scheduler/cli.py b/swh/scheduler/cli.py
index 91c1c5d..e0237b7 100644
--- a/swh/scheduler/cli.py
+++ b/swh/scheduler/cli.py
@@ -1,665 +1,669 @@
 # Copyright (C) 2016-2018  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 import arrow
 import click
 import csv
 import itertools
 import json
 import locale
 import logging
 import time
 from swh.core import utils, config
 from . import compute_nb_tasks_from
 from .backend_es import SWHElasticSearchClient
 from . import get_scheduler, DEFAULT_CONFIG
 locale.setlocale(locale.LC_ALL, '')
 ARROW_LOCALE = locale.getlocale(locale.LC_TIME)[0]
 class DateTimeType(click.ParamType):
     name = 'time and date'
     def convert(self, value, param, ctx):
         if not isinstance(value, arrow.Arrow):
             value = arrow.get(value)
         return value
 DATETIME = DateTimeType()
 CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
 def pretty_print_list(list, indent):
     """Pretty-print a list"""
     return ''.join('%s%s\n' % (' ' * indent, item) for item in list)
 def pretty_print_dict(dict, indent):
     """Pretty-print a list"""
     return ''.join('%s%s: %s\n' %
                    (' ' * indent, click.style(key, bold=True), value)
                    for key, value in dict.items())
 def pretty_print_task(task, full=False):
     """Pretty-print a task
     If 'full' is True, also print the status and priority fields.
     next_run = arrow.get(task['next_run'])
     lines = [
         '%s %s\n' % (click.style('Task', bold=True), task['id']),
         click.style('  Next run: ', bold=True),
         "%s (%s)" % (next_run.humanize(locale=ARROW_LOCALE),
         click.style('  Interval: ', bold=True),
         str(task['current_interval']), '\n',
         click.style('  Type: ', bold=True), task['type'] or '', '\n',
         click.style('  Policy: ', bold=True), task['policy'] or '', '\n',
     if full:
         lines += [
             click.style('  Status: ', bold=True),
             task['status'] or '', '\n',
             click.style('  Priority: ', bold=True),
             task['priority'] or '', '\n',
     lines += [
         click.style('  Args:\n', bold=True),
         pretty_print_list(task['arguments']['args'], indent=4),
         click.style('  Keyword args:\n', bold=True),
         pretty_print_dict(task['arguments']['kwargs'], indent=4),
     return ''.join(lines)
 @click.option('--config-file', '-C', default=None,
               type=click.Path(exists=True, dir_okay=False,),
               help="Configuration file.")
 @click.option('--database', '-d', default=None,
               help="Scheduling database DSN (imply cls is 'local')")
 @click.option('--url', '-u', default=None,
               help="Scheduler's url access (imply cls is 'remote')")
 @click.option('--log-level', '-l', default='INFO',
               help="Log level (default to INFO)")
 def cli(ctx, config_file, database, url, log_level):
     """Software Heritage Scheduler CLI interface
     Default to use the the local scheduler instance (plugged to the
     main scheduler db).
     from swh.scheduler.celery_backend.config import setup_log_handler
     log_level = setup_log_handler(
         loglevel=log_level, colorize=False,
         format='[%(levelname)s] %(name)s -- %(message)s')
     logger = logging.getLogger(__name__)
     scheduler = None
     conf = config.read(config_file, DEFAULT_CONFIG)
     if 'scheduler' not in conf:
         raise ValueError("missing 'scheduler' configuration")
     if database:
         conf['scheduler']['cls'] = 'local'
         conf['scheduler']['args']['db'] = database
     elif url:
         conf['scheduler']['cls'] = 'remote'
         conf['scheduler']['args'] = {'url': url}
     sched_conf = conf['scheduler']
         logger.debug('Instanciating scheduler with %s' % (
         scheduler = get_scheduler(**sched_conf)
     except ValueError:
         # it's the subcommand to decide whether not having a proper
         # scheduler instance is a problem.
     ctx.obj['scheduler'] = scheduler
     ctx.obj['config'] = conf
     ctx.obj['loglevel'] = log_level
 def task(ctx):
     """Manipulate tasks."""
 @click.option('--columns', '-c', multiple=True,
               default=['type', 'args', 'kwargs', 'next_run'],
                   'type', 'args', 'kwargs', 'policy', 'next_run']),
               help='columns present in the CSV file')
 @click.option('--delimiter', '-d', default=',')
 @click.argument('file', type=click.File(encoding='utf-8'))
 def schedule_tasks(ctx, columns, delimiter, file):
     """Schedule tasks from a CSV input file.
     The following columns are expected, and can be set through the -c option:
      - type: the type of the task to be scheduled (mandatory)
      - args: the arguments passed to the task (JSON list, defaults to an empty
      - kwargs: the keyword arguments passed to the task (JSON object, defaults
        to an empty dict)
      - next_run: the date at which the task should run (datetime, defaults to
     The CSV can be read either from a named file, or from stdin (use - as
     Use sample:
     cat scheduling-task.txt | \
         python3 -m swh.scheduler.cli \
             --database 'service=swh-scheduler-dev' \
             task schedule \
                 --columns type --columns kwargs --columns policy \
                 --delimiter ';' -
     tasks = []
     now = arrow.utcnow()
     scheduler = ctx.obj['scheduler']
     if not scheduler:
         raise ValueError('Scheduler class (local/remote) must be instantiated')
     reader = csv.reader(file, delimiter=delimiter)
     for line in reader:
         task = dict(zip(columns, line))
         args = json.loads(task.pop('args', '[]'))
         kwargs = json.loads(task.pop('kwargs', '{}'))
         task['arguments'] = {
             'args': args,
             'kwargs': kwargs,
         task['next_run'] = DATETIME.convert(task.get('next_run', now),
                                             None, None)
     created = scheduler.create_tasks(tasks)
     output = [
         'Created %d tasks\n' % len(created),
     for task in created:
 @click.argument('type', nargs=1, required=True)
 @click.argument('options', nargs=-1)
 @click.option('--policy', '-p', default='recurring',
               type=click.Choice(['recurring', 'oneshot']))
 @click.option('--priority', '-P', default=None,
               type=click.Choice(['low', 'normal', 'high']))
 @click.option('--next-run', '-n', default=None)
 def schedule_task(ctx, type, options, policy, priority, next_run):
     """Schedule one task from arguments.
     Usage sample:
     swh-scheduler --database 'service=swh-scheduler' \
         task add swh-lister-pypi
     swh-scheduler --database 'service=swh-scheduler' \
         task add swh-lister-debian --policy=oneshot distribution=stretch
     Note: if the priority is not given, the task won't have the priority set,
     which is considered as the lowest priority level.
     scheduler = ctx.obj['scheduler']
     if not scheduler:
         raise ValueError('Scheduler class (local/remote) must be instantiated')
     now = arrow.utcnow()
     args = [x for x in options if '=' not in x]
     kw = dict(x.split('=', 1) for x in options if '=' in x)
     task = {'type': type,
             'policy': policy,
             'priority': priority,
             'arguments': {
                 'args': args,
                 'kwargs': kw,
             'next_run': DATETIME.convert(next_run or now,
                                          None, None),
     created = scheduler.create_tasks([task])
     output = [
         'Created %d tasks\n' % len(created),
     for task in created:
 @click.argument('task-types', required=True, nargs=-1)
 @click.option('--limit', '-l', required=False, type=click.INT,
               help='The maximum number of tasks to fetch')
 @click.option('--before', '-b', required=False, type=DATETIME,
               help='List all jobs supposed to run before the given date')
 def list_pending_tasks(ctx, task_types, limit, before):
     """List the tasks that are going to be run.
     You can override the number of tasks to fetch
     scheduler = ctx.obj['scheduler']
     if not scheduler:
         raise ValueError('Scheduler class (local/remote) must be instantiated')
     num_tasks, num_tasks_priority = compute_nb_tasks_from(limit)
     output = []
     for task_type in task_types:
         pending = scheduler.peek_ready_tasks(
             task_type, timestamp=before,
             num_tasks=num_tasks, num_tasks_priority=num_tasks_priority)
         output.append('Found %d %s tasks\n' % (
             len(pending), task_type))
         for task in pending:
 @click.option('--task-id', '-i', default=None, multiple=True, metavar='ID',
               help='List only tasks whose id is ID.')
 @click.option('--task-type', '-t', default=None, multiple=True, metavar='TYPE',
               help='List only tasks of type TYPE')
 @click.option('--limit', '-l', required=False, type=click.INT,
               help='The maximum number of tasks to fetch.')
 @click.option('--status', '-s', multiple=True, metavar='STATUS',
               help='List tasks whose status is STATUS.')
 @click.option('--policy', '-p', default=None,
               type=click.Choice(['recurring', 'oneshot']),
               help='List tasks whose policy is POLICY.')
 @click.option('--priority', '-P', default=None, multiple=True,
               type=click.Choice(['all', 'low', 'normal', 'high']),
               help='List tasks whose priority is PRIORITY.')
 @click.option('--before', '-b', required=False, type=DATETIME,
               help='Limit to tasks supposed to run before the given date.')
 @click.option('--after', '-a', required=False, type=DATETIME,
               help='Limit to tasks supposed to run after the given date.')
 def list_tasks(ctx, task_id, task_type, limit, status, policy, priority,
                before, after):
     """List tasks.
     scheduler = ctx.obj['scheduler']
     if not scheduler:
         raise ValueError('Scheduler class (local/remote) must be instantiated')
     if not task_type:
         task_type = [x['type'] for x in scheduler.get_task_types()]
     # if task_id is not given, default value for status is
     #  'next_run_not_scheduled'
     # if task_id is given, default status is 'all'
     if task_id is None and status is None:
         status = ['next_run_not_scheduled']
     if status and 'all' in status:
         status = None
     if priority and 'all' in priority:
         priority = None
     output = []
     tasks = scheduler.search_tasks(
         status=status, priority=priority, policy=policy,
         before=before, after=after,
     output.append('Found %d tasks\n' % (
     for task in tasks:
         output.append(pretty_print_task(task, full=True))
 @click.argument('task-ids', required=True, nargs=-1)
 @click.option('--next-run', '-n', required=False, type=DATETIME,
               metavar='DATETIME', default=None,
               help='Re spawn the selected tasks at this date')
 def respawn_tasks(ctx, task_ids, next_run):
     """Respawn tasks.
     Respawn tasks given by their ids (see the 'task list' command to
     find task ids) at the given date (immediately by default).
        swh-scheduler task respawn 1 3 12
     scheduler = ctx.obj['scheduler']
     if not scheduler:
         raise ValueError('Scheduler class (local/remote) must be instantiated')
     if next_run is None:
         next_run = arrow.utcnow()
     output = []
         task_ids, status='next_run_not_scheduled', next_run=next_run)
     output.append('Respawn tasks %s\n' % (task_ids,))
 @click.option('--before', '-b', default=None,
               help='''Task whose ended date is anterior will be archived.
                       Default to current month's first day.''')
 @click.option('--after', '-a', default=None,
               help='''Task whose ended date is after the specified date will
                       be archived. Default to prior month's first day.''')
 @click.option('--batch-index', default=1000, type=click.INT,
               help='Batch size of tasks to read from db to archive')
 @click.option('--bulk-index', default=200, type=click.INT,
               help='Batch size of tasks to bulk index')
 @click.option('--batch-clean', default=1000, type=click.INT,
               help='Batch size of task to clean after archival')
 @click.option('--dry-run/--no-dry-run', is_flag=True, default=False,
               help='Default to list only what would be archived.')
 @click.option('--verbose', is_flag=True, default=False,
               help='Default to list only what would be archived.')
 @click.option('--cleanup/--no-cleanup', is_flag=True, default=True,
               help='Clean up archived tasks (default)')
 @click.option('--start-from', type=click.INT, default=-1,
               help='(Optional) default task id to start from. Default is -1.')
 def archive_tasks(ctx, before, after, batch_index, bulk_index, batch_clean,
                   dry_run, verbose, cleanup, start_from):
     """Archive task/task_run whose (task_type is 'oneshot' and task_status
        is 'completed') or (task_type is 'recurring' and task_status is
        With --dry-run flag set (default), only list those.
     scheduler = ctx.obj['scheduler']
     if not scheduler:
         raise ValueError('Scheduler class (local/remote) must be instantiated')
     es_client = SWHElasticSearchClient()
     logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO)
     log = logging.getLogger('swh.scheduler.cli.archive')
     if dry_run:
         log.info('**DRY-RUN** (only reading db)')
     if not cleanup:
         log.info('**NO CLEANUP**')
     now = arrow.utcnow()
     # Default to archive tasks from a rolling month starting the week
     # prior to the current one
     if not before:
         before = now.shift(weeks=-1).format('YYYY-MM-DD')
     if not after:
         after = now.shift(weeks=-1).shift(months=-1).format('YYYY-MM-DD')
     log.debug('index: %s; cleanup: %s; period: [%s ; %s]' % (
         not dry_run, not dry_run and cleanup, after, before))
     def group_by_index_name(data, es_client=es_client):
         """Given a data record, determine the index's name through its ending
            date. This varies greatly depending on the task_run's
         date = data.get('started')
         if not date:
             date = data['scheduled']
         return es_client.compute_index_name(date.year, date.month)
     def index_data(before, last_id, batch_index):
         tasks_in = scheduler.filter_task_to_archive(
             after, before, last_id=last_id, limit=batch_index)
         for index_name, tasks_group in itertools.groupby(
                 tasks_in, key=group_by_index_name):
             log.debug('Index tasks to %s' % index_name)
             if dry_run:
                 for task in tasks_group:
                     yield task
             yield from es_client.streaming_bulk(
                 index_name, tasks_group, source=['task_id', 'task_run_id'],
                 chunk_size=bulk_index, log=log)
     gen = index_data(before, last_id=start_from, batch_index=batch_index)
     if cleanup:
         for task_ids in utils.grouper(gen, n=batch_clean):
             task_ids = list(task_ids)
             log.info('Clean up %s tasks: [%s, ...]' % (
                 len(task_ids), task_ids[0]))
             if dry_run:  # no clean up
         for task_ids in utils.grouper(gen, n=batch_index):
             task_ids = list(task_ids)
             log.info('Indexed %s tasks: [%s, ...]' % (
                 len(task_ids), task_ids[0]))
 @click.option('--period', '-p', default=0,
               help=('Period (in s) at witch pending tasks are checked and '
                     'executed. Set to 0 (default) for a one shot.'))
 def runner(ctx, period):
     """Starts a swh-scheduler runner service.
     This process is responsible for checking for ready-to-run tasks and
     schedule them."""
     from swh.scheduler.celery_backend.runner import run_ready_tasks
     from swh.scheduler.celery_backend.config import app
     logger = logging.getLogger(__name__ + '.runner')
     scheduler = ctx.obj['scheduler']
     logger.debug('Scheduler %s' % scheduler)
         while True:
             logger.debug('Run ready tasks')
                 ntasks = len(run_ready_tasks(scheduler, app))
                 if ntasks:
                     logger.info('Scheduled %s tasks', ntasks)
             except Exception:
                 logger.exception('Unexpected error in run_ready_tasks()')
             if not period:
     except KeyboardInterrupt:
 def listener(ctx):
     """Starts a swh-scheduler listener service.
     This service is responsible for listening at task lifecycle events and
     handle their workflow status in the database."""
     scheduler = ctx.obj['scheduler']
     if not scheduler:
         raise ValueError('Scheduler class (local/remote) must be instantiated')
     from swh.scheduler.celery_backend.listener import (
         event_monitor, main_app)
     event_monitor(main_app, backend=scheduler)
 @click.option('--host', default='',
               help="Host to run the scheduler server api")
 @click.option('--port', default=5008, type=click.INT,
               help="Binding port of the server")
 @click.option('--debug/--nodebug', default=None,
               help=("Indicates if the server should run in debug mode. "
                     "Defaults to True if log-level is DEBUG, False otherwise.")
 def api_server(ctx, host, port, debug):
     """Starts a swh-scheduler API HTTP server.
     if ctx.obj['config']['scheduler']['cls'] == 'remote':
         click.echo("The API server can only be started with a 'local' "
                    "configuration", err=True)
     from swh.scheduler.api import server
     server.app.scheduler = ctx.obj['scheduler']
     if debug is None:
         debug = ctx.obj['loglevel'] <= logging.DEBUG
     server.app.run(host, port=port, debug=bool(debug))
 def task_type(ctx):
     """Manipulate task types."""
 @click.option('--verbose', '-v', is_flag=True, default=False)
 @click.option('--task_type', '-t', multiple=True, default=None,
               help='List task types of given type')
 @click.option('--task_name', '-n', multiple=True, default=None,
               help='List task types of given backend task name')
 def list_task_types(ctx, verbose, task_type, task_name):
     click.echo("Known task types:")
     if verbose:
         tmpl = click.style('{type}: ', bold=True) + '''{backend_name}
   interval: {default_interval} [{min_interval}, {max_interval}]
   backoff_factor: {backoff_factor}
   max_queue_length: {max_queue_length}
   num_retries: {num_retries}
   retry_delay: {retry_delay}
         tmpl = '{type}:\n  {description}'
     for tasktype in ctx.obj['scheduler'].get_task_types():
         if task_type and tasktype['type'] not in task_type:
         if task_name and tasktype['backend_name'] not in task_name:
 @click.argument('type', required=1)
 @click.argument('task-name', required=1)
 @click.argument('description', required=1)
 @click.option('--default-interval', '-i', default='90 days',
               help='Default interval ("90 days" by default)')
 @click.option('--min-interval', default=None,
               help='Minimum interval (default interval if not set)')
 @click.option('--max-interval', '-i', default=None,
               help='Maximal interval (default interval if not set)')
 @click.option('--backoff-factor', '-f', type=float, default=1,
               help='Backoff factor')
 def add_task_type(ctx, type, task_name, description,
                   default_interval, min_interval, max_interval,
     """Create a new task type
     scheduler = ctx.obj['scheduler']
     if not scheduler:
         raise ValueError('Scheduler class (local/remote) must be instantiated')
     task_type = dict(
 @click.option('--verbose/--no-verbose', '-v', default=False,
               help='Verbose mode')
 def updater(ctx, verbose):
     """Insert tasks in the scheduler from the scheduler-updater's events
     from swh.scheduler.updater.writer import UpdaterWriter
 @click.option('--verbose/--no-verbose', '-v', default=False,
               help='Verbose mode')
 def ghtorrent(ctx, verbose):
     """Consume events from ghtorrent and write them to cache.
     from swh.scheduler.updater.ghtorrent import GHTorrentConsumer
     from swh.scheduler.updater.backend import SchedulerUpdaterBackend
     ght_config = ctx.obj['config'].get('ghtorrent', {})
     back_config = ctx.obj['config'].get('scheduler_updater', {})
     backend = SchedulerUpdaterBackend(**back_config)
     GHTorrentConsumer(backend, **ght_config).run()
+def main():
+    return cli(auto_envvar_prefix='SWH_SCHEDULER')
 if __name__ == '__main__':
-    cli()
+    main()