diff --git a/debian/control b/debian/control
index 6eca8ff..82c3ef3 100644
--- a/debian/control
+++ b/debian/control
@@ -1,24 +1,24 @@
 Source: swh-scheduler
 Maintainer: Software Heritage developers <swh-devel@inria.fr>
 Section: python
 Priority: optional
 Build-Depends: debhelper (>= 9),
                dh-python (>= 2),
                python3-all,
                python3-arrow,
                python3-celery,
                python3-click,
                python3-elasticsearch (>= 5.4.0),
                python3-flask,
                python3-nose,
                python3-psycopg2,
                python3-setuptools,
-               python3-swh.core (>= 0.0.34),
+               python3-swh.core (>= 0.0.38~),
                python3-vcversioner
 Standards-Version: 3.9.6
 Homepage: https://forge.softwareheritage.org/diffusion/DSCH/
 
 Package: python3-swh.scheduler
 Architecture: all
-Depends: python3-swh.core (>= 0.0.34), ${misc:Depends}, ${python3:Depends}
+Depends: python3-swh.core (>= 0.0.38~), ${misc:Depends}, ${python3:Depends}
 Description: Software Heritage Scheduler
diff --git a/requirements-swh.txt b/requirements-swh.txt
index aaeccaa..a152b02 100644
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1 +1 @@
-swh.core >= 0.0.34
+swh.core >= 0.0.38
diff --git a/swh/scheduler/api/client.py b/swh/scheduler/api/client.py
index f044b9f..a73cd68 100644
--- a/swh/scheduler/api/client.py
+++ b/swh/scheduler/api/client.py
@@ -1,96 +1,103 @@
 # Copyright (C) 2018  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 
 from swh.core.api import SWHRemoteAPI
 
 
 class SchedulerAPIError(Exception):
     """Specific internal scheduler API issue (mainly connection errors)
 
     """
 
     def __str__(self):
         args = self.args
         return 'An unexpected error occurred in the api backend: %s' % (args,)
 
 
 class RemoteScheduler(SWHRemoteAPI):
     """Proxy to a remote scheduler API
 
     """
     def __init__(self, url):
         super().__init__(api_exception=SchedulerAPIError, url=url)
 
+    def close_connection(self):
+        return self.post('close_connection', {})
+
+    def set_status_tasks(self, task_ids, status='disabled'):
+        return self.post('set_status_tasks', {'task_ids': task_ids,
+                                              'status': status})
+
     def create_task_type(self, task_type):
         return self.post('create_task_type', {'task_type': task_type})
 
     def get_task_type(self, task_type_name):
         return self.post('get_task_type', {'task_type_name': task_type_name})
 
     def get_task_types(self):
         return self.post('get_task_types', {})
 
     def create_tasks(self, tasks):
         return self.post('create_tasks', {'tasks': tasks})
 
     def disable_tasks(self, task_ids):
         return self.post('disable_tasks', {'task_ids': task_ids})
 
     def get_tasks(self, task_ids):
         return self.post('get_tasks', {'task_ids': task_ids})
 
     def peek_ready_tasks(self, task_type, timestamp=None, num_tasks=None):
         return self.post('peek_ready_tasks', {
             'task_type': task_type,
             'timestamp': timestamp,
             'num_tasks': num_tasks,
         })
 
     def grab_ready_tasks(self, task_type, timestamp=None, num_tasks=None):
         return self.post('grab_ready_tasks', {
             'task_type': task_type,
             'timestamp': timestamp,
             'num_tasks': num_tasks,
         })
 
     def schedule_task_run(self, task_id, backend_id, metadata=None,
                           timestamp=None):
         return self.post('schedule_task_run', {
             'task_id': task_id,
             'backend_id': backend_id,
             'metadata': metadata,
             'timestamp': timestamp,
         })
 
     def mass_schedule_task_runs(self, task_runs):
         return self.post('mass_schedule_task_runs', {'task_runs': task_runs})
 
     def start_task_run(self, backend_id, metadata=None, timestamp=None):
         return self.post('start_task_run', {
             'backend_id': backend_id,
             'metadata': metadata,
             'timestamp': timestamp,
         })
 
     def end_task_run(self, backend_id, status, metadata=None, timestamp=None):
         return self.post('end_task_run', {
             'backend_id': backend_id,
             'status': status,
             'metadata': metadata,
             'timestamp': timestamp,
         })
 
     def filter_task_to_archive(self, after_ts, before_ts, limit=10,
                                last_id=-1):
         return self.post('filter_task_to_archive', {
             'after_ts': after_ts,
             'before_ts': before_ts,
             'limit': limit,
             'last_id': last_id,
         })
 
     def delete_archived_tasks(self, task_ids):
         return self.post('delete_archived_tasks', {'task_ids': task_ids})
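
A minimal usage sketch for the two new client methods, assuming a scheduler
API server is already listening on the default port; the URL and task ids
below are illustrative:

    from swh.scheduler.api.client import RemoteScheduler

    scheduler = RemoteScheduler(url='http://localhost:5008/')

    # Mark a batch of tasks as completed (the default status is 'disabled')
    scheduler.set_status_tasks([1, 2, 3], status='completed')

    # Ask the server-side backend to drop its database connection
    scheduler.close_connection()
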
diff --git a/swh/scheduler/api/server.py b/swh/scheduler/api/server.py
index 687c378..c7f1de8 100644
--- a/swh/scheduler/api/server.py
+++ b/swh/scheduler/api/server.py
@@ -1,144 +1,154 @@
 # Copyright (C) 2018  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import logging
 import click
 
 from flask import g, request
 
 from swh.core import config
 from swh.scheduler import get_scheduler
 from swh.core.api import (SWHServerAPIApp, decode_request,
                           error_handler,
                           encode_data_server as encode_data)
 
 DEFAULT_CONFIG_PATH = 'backend/scheduler'
 DEFAULT_CONFIG = {
     'scheduler': ('dict', {
         'cls': 'local',
         'args': {
             'scheduling_db': 'dbname=softwareheritage-scheduler-dev',
         },
     })
 }
 
 
 app = SWHServerAPIApp(__name__)
 
 
 @app.errorhandler(Exception)
 def my_error_handler(exception):
     return error_handler(exception, encode_data)
 
 
 @app.before_request
 def before_request():
     g.scheduler = get_scheduler(**app.config['scheduler'])
 
 
 @app.route('/')
 def index():
     return 'SWH Scheduler API server'
 
 
+@app.route('/close_connection', methods=['POST'])
+def close_connection():
+    return encode_data(g.scheduler.close_connection())
+
+
+@app.route('/set_status_tasks', methods=['POST'])
+def set_status_tasks():
+    return encode_data(g.scheduler.set_status_tasks(**decode_request(request)))
+
+
 @app.route('/create_task_type', methods=['POST'])
 def create_task_type():
     return encode_data(g.scheduler.create_task_type(**decode_request(request)))
 
 
 @app.route('/get_task_type', methods=['POST'])
 def get_task_type():
     return encode_data(g.scheduler.get_task_type(**decode_request(request)))
 
 
 @app.route('/get_task_types', methods=['POST'])
 def get_task_types():
     return encode_data(g.scheduler.get_task_types(**decode_request(request)))
 
 
 @app.route('/create_tasks', methods=['POST'])
 def create_tasks():
     return encode_data(g.scheduler.create_tasks(**decode_request(request)))
 
 
 @app.route('/disable_tasks', methods=['POST'])
 def disable_tasks():
     return encode_data(g.scheduler.disable_tasks(**decode_request(request)))
 
 
 @app.route('/get_tasks', methods=['POST'])
 def get_tasks():
     return encode_data(g.scheduler.get_tasks(**decode_request(request)))
 
 
 @app.route('/peek_ready_tasks', methods=['POST'])
 def peek_ready_tasks():
     return encode_data(g.scheduler.peek_ready_tasks(**decode_request(request)))
 
 
 @app.route('/grab_ready_tasks', methods=['POST'])
 def grab_ready_tasks():
     return encode_data(g.scheduler.grab_ready_tasks(**decode_request(request)))
 
 
 @app.route('/schedule_task_run', methods=['POST'])
 def schedule_task_run():
     return encode_data(g.scheduler.schedule_task_run(
         **decode_request(request)))
 
 
 @app.route('/mass_schedule_task_runs', methods=['POST'])
 def mass_schedule_task_runs():
     return encode_data(
         g.scheduler.mass_schedule_task_runs(**decode_request(request)))
 
 
 @app.route('/start_task_run', methods=['POST'])
 def start_task_run():
     return encode_data(g.scheduler.start_task_run(**decode_request(request)))
 
 
 @app.route('/end_task_run', methods=['POST'])
 def end_task_run():
     return encode_data(g.scheduler.end_task_run(**decode_request(request)))
 
 
 @app.route('/filter_task_to_archive', methods=['POST'])
 def filter_task_to_archive():
     return encode_data(
         g.scheduler.filter_task_to_archive(**decode_request(request)))
 
 
 @app.route('/delete_archived_tasks', methods=['POST'])
 def delete_archived_tasks():
     return encode_data(
         g.scheduler.delete_archived_tasks(**decode_request(request)))
 
 
 def run_from_webserver(environ, start_response,
                        config_path=DEFAULT_CONFIG_PATH):
     """Run the WSGI app from the webserver, loading the configuration."""
     cfg = config.load_named_config(config_path, DEFAULT_CONFIG)
     app.config.update(cfg)
     handler = logging.StreamHandler()
     app.logger.addHandler(handler)
     return app(environ, start_response)
 
 
 @click.command()
 @click.argument('config-path', required=True)
 @click.option('--host', default='0.0.0.0',
               help="Host to run the scheduler server api")
 @click.option('--port', default=5008, type=click.INT,
               help="Binding port of the server")
 @click.option('--debug/--nodebug', default=True,
               help="Indicates if the server should run in debug mode")
 def launch(config_path, host, port, debug):
     app.config.update(config.read(config_path, DEFAULT_CONFIG))
     app.run(host, port=port, debug=bool(debug))
 
 
 if __name__ == '__main__':
     launch()
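
All of these endpoints follow the same shape: decode the POSTed arguments,
splat them into the matching backend method, and re-encode the result. A
hypothetical refactoring, not part of this patch, could register them in a
loop; SCHEDULER_METHODS and make_endpoint are illustrative names only, while
g, app, encode_data and decode_request are the module's own imports:

    # Hypothetical alternative: generate the one-line POST endpoints
    # instead of writing one view function per backend method.
    SCHEDULER_METHODS = [
        'close_connection', 'set_status_tasks', 'create_task_type',
        'get_task_type', 'get_task_types', 'create_tasks', 'disable_tasks',
        'get_tasks', 'peek_ready_tasks', 'grab_ready_tasks',
        'schedule_task_run', 'mass_schedule_task_runs', 'start_task_run',
        'end_task_run', 'filter_task_to_archive', 'delete_archived_tasks',
    ]

    def make_endpoint(name):
        def endpoint():
            method = getattr(g.scheduler, name)
            return encode_data(method(**decode_request(request)))
        endpoint.__name__ = name  # Flask needs unique view names
        return endpoint

    for name in SCHEDULER_METHODS:
        app.route('/' + name, methods=['POST'])(make_endpoint(name))
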
diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
index e93391c..a288a5c 100644
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -1,488 +1,493 @@
 # Copyright (C) 2015-2018  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import binascii
 import datetime
 from functools import wraps
 import json
 import tempfile
 
 from arrow import Arrow, utcnow
 import psycopg2
 import psycopg2.extras
 from psycopg2.extensions import AsIs
 
 from swh.core.config import SWHConfig
 
 
 def adapt_arrow(arrow):
     return AsIs("'%s'::timestamptz" % arrow.isoformat())
 
 
 psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json)
 psycopg2.extensions.register_adapter(Arrow, adapt_arrow)
 
 
 def autocommit(fn):
     @wraps(fn)
     def wrapped(self, *args, **kwargs):
         autocommit = False
         if 'cursor' not in kwargs or not kwargs['cursor']:
             autocommit = True
             kwargs['cursor'] = self.cursor()
 
         try:
             ret = fn(self, *args, **kwargs)
         except Exception:
             if autocommit:
                 self.rollback()
             raise
 
         if autocommit:
             self.commit()
 
         return ret
 
     return wrapped
 
 
 class SchedulerBackend(SWHConfig):
     """Backend for the Software Heritage scheduling database.
 
     """
     CONFIG_BASE_FILENAME = 'scheduler'
     DEFAULT_CONFIG = {
         'scheduling_db': ('str', 'dbname=softwareheritage-scheduler-dev'),
     }
 
     def __init__(self, **override_config):
         self.config = self.parse_config_file(global_config=False)
         self.config.update(override_config)
 
         self.db = None
 
         self.reconnect()
 
     def reconnect(self):
         if not self.db or self.db.closed:
             self.db = psycopg2.connect(
                 dsn=self.config['scheduling_db'],
                 cursor_factory=psycopg2.extras.RealDictCursor,
             )
 
     def cursor(self):
         """Return a fresh cursor on the database, with auto-reconnection in
         case of failure
 
         """
         cur = None
 
         # Get a fresh cursor and reconnect at most three times
         tries = 0
         while True:
             tries += 1
             try:
                 cur = self.db.cursor()
                 cur.execute('select 1')
                 break
             except psycopg2.OperationalError:
                 if tries < 3:
                     self.reconnect()
                 else:
                     raise
 
         return cur
 
     def commit(self):
         """Commit a transaction"""
         self.db.commit()
 
     def rollback(self):
         """Rollback a transaction"""
         self.db.rollback()
 
+    def close_connection(self):
+        """Close db connection"""
+        if self.db and not self.db.closed:
+            self.db.close()
+
     def copy_to(self, items, tblname, columns, cursor=None, item_cb=None):
         def escape(data):
             if data is None:
                 return ''
             if isinstance(data, bytes):
                 return '\\x%s' % binascii.hexlify(data).decode('ascii')
             elif isinstance(data, str):
                 return '"%s"' % data.replace('"', '""')
             elif isinstance(data, (datetime.datetime, Arrow)):
                 # We escape twice to make sure the string generated by
                 # isoformat gets escaped
                 return escape(data.isoformat())
             elif isinstance(data, dict):
                 return escape(json.dumps(data))
             elif isinstance(data, list):
                 return escape("{%s}" % ','.join(escape(d) for d in data))
             elif isinstance(data, psycopg2.extras.Range):
                 # We escape twice here too, so that we make sure
                 # everything gets passed to copy properly
                 return escape(
                     '%s%s,%s%s' % (
                         '[' if data.lower_inc else '(',
                         '-infinity' if data.lower_inf else escape(data.lower),
                         'infinity' if data.upper_inf else escape(data.upper),
                         ']' if data.upper_inc else ')',
                     )
                 )
             else:
                 # We don't escape here to make sure we pass literals properly
                 return str(data)
         with tempfile.TemporaryFile('w+') as f:
             for d in items:
                 if item_cb is not None:
                     item_cb(d)
                 line = [escape(d.get(k)) for k in columns]
                 f.write(','.join(line))
                 f.write('\n')
             f.seek(0)
             cursor.copy_expert('COPY %s (%s) FROM STDIN CSV' % (
                 tblname, ', '.join(columns)), f)
 
     task_type_keys = [
         'type', 'description', 'backend_name', 'default_interval',
         'min_interval', 'max_interval', 'backoff_factor', 'max_queue_length',
         'num_retries', 'retry_delay',
     ]
 
     def _format_query(self, query, keys):
         """Format a query with the given keys"""
 
         query_keys = ', '.join(keys)
         placeholders = ', '.join(['%s'] * len(keys))
 
         return query.format(keys=query_keys, placeholders=placeholders)
 
     def _format_multiquery(self, query, keys, values):
         """Format a query with placeholders generated for multiple values"""
         query_keys = ', '.join(keys)
         placeholders = '), ('.join(
             [', '.join(['%s'] * len(keys))] * len(values)
         )
         ret_values = sum([[value[key] for key in keys]
                           for value in values], [])
 
         return (
             query.format(keys=query_keys, placeholders=placeholders),
             ret_values,
         )
 
     @autocommit
     def create_task_type(self, task_type, cursor=None):
         """Create a new task type ready for scheduling.
 
         Args:
             task_type (dict): a dictionary with the following keys:
 
                 - type (str): an identifier for the task type
                 - description (str): a human-readable description of what the
                   task does
                 - backend_name (str): the name of the task in the
                   job-scheduling backend
                 - default_interval (datetime.timedelta): the default interval
                   between two task runs
                 - min_interval (datetime.timedelta): the minimum interval
                   between two task runs
                 - max_interval (datetime.timedelta): the maximum interval
                   between two task runs
                 - backoff_factor (float): the factor by which the interval
                   changes at each run
                 - max_queue_length (int): the maximum length of the task queue
                   for this task type
 
         """
         query = self._format_query(
             """insert into task_type ({keys}) values ({placeholders})""",
             self.task_type_keys,
         )
         cursor.execute(query, [task_type[key] for key in self.task_type_keys])
 
     @autocommit
     def get_task_type(self, task_type_name, cursor=None):
         """Retrieve the task type with id task_type_name"""
         query = self._format_query(
             "select {keys} from task_type where type=%s",
             self.task_type_keys,
         )
         cursor.execute(query, (task_type_name,))
 
         ret = cursor.fetchone()
 
         return ret
 
     @autocommit
     def get_task_types(self, cursor=None):
         query = self._format_query(
             "select {keys} from task_type",
             self.task_type_keys,
         )
         cursor.execute(query)
         ret = cursor.fetchall()
         return ret
 
     task_create_keys = [
         'type', 'arguments', 'next_run', 'policy', 'retries_left',
     ]
     task_keys = task_create_keys + ['id', 'current_interval', 'status']
 
     @autocommit
     def create_tasks(self, tasks, cursor=None):
         """Create new tasks.
 
         Args:
             tasks (list): each task is a dictionary with the following keys:
 
                 - type (str): the task type
                 - arguments (dict): the arguments for the task runner, keys:
 
                       - args (list of str): arguments
                       - kwargs (dict str -> str): keyword arguments
 
                 - next_run (datetime.datetime): the next scheduled run for the
                   task
 
         Returns:
             a list of created tasks.
 
         """
         cursor.execute('select swh_scheduler_mktemp_task()')
         self.copy_to(tasks, 'tmp_task', self.task_create_keys, cursor)
         query = self._format_query(
             'select {keys} from swh_scheduler_create_tasks_from_temp()',
             self.task_keys,
         )
         cursor.execute(query)
         return cursor.fetchall()
 
     @autocommit
     def set_status_tasks(self, task_ids, status='disabled', cursor=None):
         """Set the tasks' status whose ids are listed."""
         query = "UPDATE task SET status = %s WHERE id IN %s"
         cursor.execute(query, (status, tuple(task_ids),))
         return None
 
     @autocommit
     def disable_tasks(self, task_ids, cursor=None):
         """Disable the tasks whose ids are listed."""
         return self.set_status_tasks(task_ids, cursor=cursor)
 
     @autocommit
     def get_tasks(self, task_ids, cursor=None):
         """Retrieve the info of tasks whose ids are listed."""
         query = self._format_query('select {keys} from task where id in %s',
                                    self.task_keys)
         cursor.execute(query, (tuple(task_ids),))
         return cursor.fetchall()
 
     @autocommit
     def peek_ready_tasks(self, task_type, timestamp=None, num_tasks=None,
                          cursor=None):
         """Fetch the list of ready tasks
 
         Args:
             task_type (str): only peek at tasks of this type
             timestamp (datetime.datetime): peek tasks that need to be executed
                 before that timestamp
             num_tasks (int): only peek at num_tasks tasks
 
         Returns:
             a list of tasks
         """
 
         if timestamp is None:
             timestamp = utcnow()
 
         cursor.execute(
             'select * from swh_scheduler_peek_ready_tasks(%s, %s, %s)',
             (task_type, timestamp, num_tasks)
         )
 
         return cursor.fetchall()
 
     @autocommit
     def grab_ready_tasks(self, task_type, timestamp=None, num_tasks=None,
                          cursor=None):
         """Fetch the list of ready tasks, and mark them as scheduled
 
         Args:
             task_type (str): only grab tasks of this type
             timestamp (datetime.datetime): grab tasks that need to be executed
                 before that timestamp
             num_tasks (int): only grab num_tasks tasks
 
         Returns:
             a list of tasks
         """
 
         if timestamp is None:
             timestamp = utcnow()
 
         cursor.execute(
             'select * from swh_scheduler_grab_ready_tasks(%s, %s, %s)',
             (task_type, timestamp, num_tasks)
         )
 
         return cursor.fetchall()
 
     task_run_create_keys = ['task', 'backend_id', 'scheduled', 'metadata']
 
     @autocommit
     def schedule_task_run(self, task_id, backend_id, metadata=None,
                           timestamp=None, cursor=None):
         """Mark a given task as scheduled, adding a task_run entry in the database.
 
         Args:
             task_id (int): the identifier for the task being scheduled
             backend_id (str): the identifier of the job in the backend
             metadata (dict): metadata to add to the task_run entry
             timestamp (datetime.datetime): the instant the event occurred
 
         Returns:
             a fresh task_run entry
 
         """
 
         if metadata is None:
             metadata = {}
 
         if timestamp is None:
             timestamp = utcnow()
 
         cursor.execute(
             'select * from swh_scheduler_schedule_task_run(%s, %s, %s, %s)',
             (task_id, backend_id, metadata, timestamp)
         )
 
         return cursor.fetchone()
 
     @autocommit
     def mass_schedule_task_runs(self, task_runs, cursor=None):
         """Schedule a bunch of task runs.
 
         Args:
             task_runs (list): a list of dicts with keys:
 
                 - task (int): the identifier for the task being scheduled
                 - backend_id (str): the identifier of the job in the backend
                 - metadata (dict): metadata to add to the task_run entry
                 - scheduled (datetime.datetime): the instant the event occurred
 
         Returns:
             None
         """
         cursor.execute('select swh_scheduler_mktemp_task_run()')
         self.copy_to(task_runs, 'tmp_task_run', self.task_run_create_keys,
                      cursor)
         cursor.execute('select swh_scheduler_schedule_task_run_from_temp()')
 
     @autocommit
     def start_task_run(self, backend_id, metadata=None, timestamp=None,
                        cursor=None):
         """Mark a given task as started, updating the corresponding task_run
            entry in the database.
 
         Args:
             backend_id (str): the identifier of the job in the backend
             metadata (dict): metadata to add to the task_run entry
             timestamp (datetime.datetime): the instant the event occurred
 
         Returns:
             the updated task_run entry
 
         """
 
         if metadata is None:
             metadata = {}
 
         if timestamp is None:
             timestamp = utcnow()
 
         cursor.execute(
             'select * from swh_scheduler_start_task_run(%s, %s, %s)',
             (backend_id, metadata, timestamp)
         )
 
         return cursor.fetchone()
 
     @autocommit
     def end_task_run(self, backend_id, status, metadata=None, timestamp=None,
                      result=None, cursor=None):
         """Mark a given task as ended, updating the corresponding task_run entry in the
         database.
 
         Args:
             backend_id (str): the identifier of the job in the backend
             status (str): how the task ended; one of: 'eventful', 'uneventful',
                 'failed'
             metadata (dict): metadata to add to the task_run entry
             timestamp (datetime.datetime): the instant the event occurred
 
         Returns:
             the updated task_run entry
 
         """
 
         if metadata is None:
             metadata = {}
 
         if timestamp is None:
             timestamp = utcnow()
 
         cursor.execute(
             'select * from swh_scheduler_end_task_run(%s, %s, %s, %s)',
             (backend_id, status, metadata, timestamp)
         )
 
         return cursor.fetchone()
 
     @autocommit
     def filter_task_to_archive(self, after_ts, before_ts, limit=10, last_id=-1,
                                cursor=None):
         """Returns the list of task/task_run prior to a given date to archive.
 
         """
         last_task_run_id = None
         while True:
             row = None
             cursor.execute(
                 "select * from swh_scheduler_task_to_archive(%s, %s, %s, %s)",
                 (after_ts, before_ts, last_id, limit)
             )
             for row in cursor:
                 # the nested type index does not accept bare values,
                 # so transform the args list into a dict
                 row['arguments']['args'] = {
                     i: v for i, v in enumerate(row['arguments']['args'])
                 }
                 kwargs = row['arguments']['kwargs']
                 row['arguments']['kwargs'] = json.dumps(kwargs)
                 yield row
 
             if not row:
                 break
             _id = row.get('task_id')
             _task_run_id = row.get('task_run_id')
             if last_id == _id and last_task_run_id == _task_run_id:
                 break
             last_id = _id
             last_task_run_id = _task_run_id
 
     @autocommit
     def delete_archived_tasks(self, task_ids, cursor=None):
         """Delete archived tasks as much as possible. Only the task_ids whose
            complete associated task_run have been cleaned up will be.
 
         """
         _task_ids, _task_run_ids = [], []
         for task_id in task_ids:
             _task_ids.append(task_id['task_id'])
             _task_run_ids.append(task_id['task_run_id'])
 
         cursor.execute(
             "select * from swh_scheduler_delete_archived_tasks(%s, %s)",
             (_task_ids, _task_run_ids))
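
The new close_connection method rounds out the connection lifecycle that the
@autocommit decorator manages per call: every backend method can either run
standalone, with a fresh cursor and an automatic commit or rollback, or join
a caller-managed transaction through its cursor argument. A short sketch of
both conventions; the database name and task ids are illustrative:

    from swh.scheduler.backend import SchedulerBackend

    backend = SchedulerBackend(
        scheduling_db='dbname=softwareheritage-scheduler-dev')

    # Standalone call: @autocommit opens a fresh cursor and commits
    # (or rolls back on error) automatically.
    backend.set_status_tasks([1, 2, 3], status='completed')

    # Explicit cursor: both updates run in a single transaction that
    # the caller commits itself.
    cur = backend.cursor()
    backend.set_status_tasks([1, 2], status='disabled', cursor=cur)
    backend.set_status_tasks([3], status='completed', cursor=cur)
    backend.commit()

    # Release the connection when done (now also exposed over the API).
    backend.close_connection()
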
diff --git a/swh/scheduler/tests/test_api_client.py b/swh/scheduler/tests/test_api_client.py
new file mode 100644
index 0000000..c395e8c
--- /dev/null
+++ b/swh/scheduler/tests/test_api_client.py
@@ -0,0 +1,36 @@
+# Copyright (C) 2018  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import unittest
+
+from swh.core.tests.server_testing import ServerTestFixture
+from swh.scheduler import get_scheduler
+from swh.scheduler.tests.test_scheduler import CommonSchedulerTest
+from swh.scheduler.api.server import app
+
+
+class RemoteSchedulerTest(CommonSchedulerTest, ServerTestFixture,
+                          unittest.TestCase):
+    """Test the remote scheduler API.
+
+    This class doesn't define any tests of its own, as we want identical
+    behavior between the local and remote schedulers. All the tests are
+    therefore defined in CommonSchedulerTest.
+    """
+
+    def setUp(self):
+        self.config = {
+            'scheduler': {
+                'cls': 'local',
+                'args': {
+                    'scheduling_db': 'dbname=%s' % self.TEST_DB_NAME,
+                }
+            }
+        }
+        self.app = app  # this sets up the local scheduler...
+        super().setUp()
+        # ... which is then accessed through a remote scheduler
+        # listening on the given port
+        self.backend = get_scheduler('remote', {'url': self.url()})
diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py
index 835d876..43edfc5 100644
--- a/swh/scheduler/tests/test_scheduler.py
+++ b/swh/scheduler/tests/test_scheduler.py
@@ -1,353 +1,358 @@
 # Copyright (C) 2017-2018  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import copy
 import datetime
 import os
 import random
 import unittest
 import uuid
 
 from arrow import utcnow
 from collections import defaultdict
 from nose.plugins.attrib import attr
 from nose.tools import istest
 import psycopg2
 
 from swh.core.tests.db_testing import SingleDbTestFixture
 from swh.scheduler import get_scheduler
 
 
 TEST_DIR = os.path.dirname(os.path.abspath(__file__))
 TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../swh-storage-testdata')
 
 
 @attr('db')
-class Scheduler(SingleDbTestFixture, unittest.TestCase):
+class CommonSchedulerTest(SingleDbTestFixture):
     TEST_DB_NAME = 'softwareheritage-scheduler-test'
     TEST_DB_DUMP = os.path.join(TEST_DATA_DIR, 'dumps/swh-scheduler.dump')
 
     def setUp(self):
         super().setUp()
-        self.config = {'scheduling_db': 'dbname=' + self.TEST_DB_NAME}
-        self.backend = get_scheduler('local', self.config)
 
         tt = {
             'type': 'update-git',
             'description': 'Update a git repository',
             'backend_name': 'swh.loader.git.tasks.UpdateGitRepository',
             'default_interval': datetime.timedelta(days=64),
             'min_interval': datetime.timedelta(hours=12),
             'max_interval': datetime.timedelta(days=64),
             'backoff_factor': 2,
             'max_queue_length': None,
             'num_retries': 7,
             'retry_delay': datetime.timedelta(hours=2),
         }
         tt2 = tt.copy()
         tt2['type'] = 'update-hg'
         tt2['description'] = 'Update a mercurial repository'
         tt2['backend_name'] = 'swh.loader.mercurial.tasks.UpdateHgRepository'
         tt2['max_queue_length'] = 42
         tt2['num_retries'] = None
         tt2['retry_delay'] = None
 
         self.task_types = {
             tt['type']: tt,
             tt2['type']: tt2,
         }
 
         self.task1_template = t1_template = {
             'type': tt['type'],
             'arguments': {
                 'args': [],
                 'kwargs': {},
             },
             'next_run': None,
         }
         self.task2_template = t2_template = copy.deepcopy(t1_template)
         t2_template['type'] = tt2['type']
         t2_template['policy'] = 'oneshot'
 
     def tearDown(self):
-        self.backend.db.close()
+        self.backend.close_connection()
         self.empty_tables()
         super().tearDown()
 
     def empty_tables(self):
         self.cursor.execute("""SELECT table_name FROM information_schema.tables
                                WHERE table_schema = %s""", ('public',))
 
         tables = set(table for (table,) in self.cursor.fetchall())
 
         for table in tables:
             self.cursor.execute('truncate table %s cascade' % table)
         self.conn.commit()
 
     @istest
     def add_task_type(self):
         tt, tt2 = self.task_types.values()
         self.backend.create_task_type(tt)
         self.assertEqual(tt, self.backend.get_task_type(tt['type']))
         with self.assertRaisesRegex(psycopg2.IntegrityError,
                                     r'\(type\)=\(%s\)' % tt['type']):
             self.backend.create_task_type(tt)
         self.backend.create_task_type(tt2)
         self.assertEqual(tt, self.backend.get_task_type(tt['type']))
         self.assertEqual(tt2, self.backend.get_task_type(tt2['type']))
 
     @istest
     def get_task_types(self):
         tt, tt2 = self.task_types.values()
         self.backend.create_task_type(tt)
         self.backend.create_task_type(tt2)
         self.assertCountEqual([tt2, tt], self.backend.get_task_types())
 
     @staticmethod
     def _task_from_template(template, next_run, *args, **kwargs):
         ret = copy.deepcopy(template)
         ret['next_run'] = next_run
         if args:
             ret['arguments']['args'] = list(args)
         if kwargs:
             ret['arguments']['kwargs'] = kwargs
         return ret
 
     def _tasks_from_template(self, template, max_timestamp, num):
         return [
             self._task_from_template(
                 template,
                 max_timestamp - datetime.timedelta(microseconds=i),
                 'argument-%03d' % i,
                 **{'kwarg%03d' % i: 'bogus-kwarg'}
             )
             for i in range(num)
         ]
 
     def _create_task_types(self):
         for tt in self.task_types.values():
             self.backend.create_task_type(tt)
 
     @istest
     def create_tasks(self):
         self._create_task_types()
         tasks = (
             self._tasks_from_template(self.task1_template, utcnow(), 100)
             + self._tasks_from_template(self.task2_template, utcnow(), 100)
         )
         ret = self.backend.create_tasks(tasks)
         ids = set()
         for task, orig_task in zip(ret, tasks):
             task = copy.deepcopy(task)
             task_type = self.task_types[orig_task['type']]
             self.assertNotIn(task['id'], ids)
             self.assertEqual(task['status'], 'next_run_not_scheduled')
             self.assertEqual(task['current_interval'],
                              task_type['default_interval'])
             self.assertEqual(task['policy'], orig_task.get('policy',
                                                            'recurring'))
             self.assertEqual(task['retries_left'],
                              task_type['num_retries'] or 0)
             ids.add(task['id'])
             del task['id']
             del task['status']
             del task['current_interval']
             del task['retries_left']
             if 'policy' not in orig_task:
                 del task['policy']
             self.assertEqual(task, orig_task)
 
     @istest
     def peek_ready_tasks(self):
         self._create_task_types()
         t = utcnow()
         task_type = self.task1_template['type']
         tasks = self._tasks_from_template(self.task1_template, t, 100)
         random.shuffle(tasks)
         self.backend.create_tasks(tasks)
 
         ready_tasks = self.backend.peek_ready_tasks(task_type)
         self.assertEqual(len(ready_tasks), len(tasks))
         for i in range(len(ready_tasks) - 1):
             self.assertLessEqual(ready_tasks[i]['next_run'],
                                  ready_tasks[i+1]['next_run'])
 
         # Only get the first few ready tasks
         limit = random.randrange(5, 5 + len(tasks)//2)
         ready_tasks_limited = self.backend.peek_ready_tasks(
             task_type, num_tasks=limit)
 
         self.assertEqual(len(ready_tasks_limited), limit)
         self.assertCountEqual(ready_tasks_limited, ready_tasks[:limit])
 
         # Limit by timestamp
         max_ts = tasks[limit-1]['next_run']
         ready_tasks_timestamped = self.backend.peek_ready_tasks(
             task_type, timestamp=max_ts)
 
         for ready_task in ready_tasks_timestamped:
             self.assertLessEqual(ready_task['next_run'], max_ts)
 
         # Make sure we get proper behavior for the first ready tasks
         self.assertCountEqual(
             ready_tasks[:len(ready_tasks_timestamped)],
             ready_tasks_timestamped,
         )
 
         # Limit by both
         ready_tasks_both = self.backend.peek_ready_tasks(
             task_type, timestamp=max_ts, num_tasks=limit//3)
         self.assertLessEqual(len(ready_tasks_both), limit//3)
         for ready_task in ready_tasks_both:
             self.assertLessEqual(ready_task['next_run'], max_ts)
             self.assertIn(ready_task, ready_tasks[:limit//3])
 
     @istest
     def grab_ready_tasks(self):
         self._create_task_types()
         t = utcnow()
         task_type = self.task1_template['type']
         tasks = self._tasks_from_template(self.task1_template, t, 100)
         random.shuffle(tasks)
         self.backend.create_tasks(tasks)
 
         first_ready_tasks = self.backend.peek_ready_tasks(
             task_type, num_tasks=10)
         grabbed_tasks = self.backend.grab_ready_tasks(task_type, num_tasks=10)
 
         for peeked, grabbed in zip(first_ready_tasks, grabbed_tasks):
             self.assertEqual(peeked['status'], 'next_run_not_scheduled')
             del peeked['status']
             self.assertEqual(grabbed['status'], 'next_run_scheduled')
             del grabbed['status']
             self.assertEqual(peeked, grabbed)
 
     @istest
     def get_tasks(self):
         self._create_task_types()
         t = utcnow()
         tasks = self._tasks_from_template(self.task1_template, t, 100)
         tasks = self.backend.create_tasks(tasks)
         random.shuffle(tasks)
         while len(tasks) > 1:
             length = random.randrange(1, len(tasks))
             cur_tasks = tasks[:length]
             tasks[:length] = []
 
             ret = self.backend.get_tasks([task['id'] for task in cur_tasks])
             self.assertCountEqual(ret, cur_tasks)
 
     @istest
     def filter_task_to_archive(self):
         """Filtering only list disabled recurring or completed oneshot tasks
 
         """
         self._create_task_types()
         _time = utcnow()
-        recurring = self._tasks_from_template(self.task1_template, _time, 100)
-        oneshots = self._tasks_from_template(self.task2_template, _time, 100)
+        recurring = self._tasks_from_template(self.task1_template, _time, 12)
+        oneshots = self._tasks_from_template(self.task2_template, _time, 12)
         total_tasks = len(recurring) + len(oneshots)
 
         # simulate scheduling tasks
         pending_tasks = self.backend.create_tasks(recurring + oneshots)
         backend_tasks = [{
             'task': task['id'],
             'backend_id': str(uuid.uuid4()),
             'scheduled': utcnow(),
         } for task in pending_tasks]
         self.backend.mass_schedule_task_runs(backend_tasks)
 
         # simulate the tasks being completed
         _tasks = []
         for task in backend_tasks:
             t = self.backend.end_task_run(
                 task['backend_id'], status='eventful')
             _tasks.append(t)
 
         # Randomly update each task's status according to its policy
         status_per_policy = {'recurring': 0, 'oneshot': 0}
         status_choice = {
             # policy: [(1 if the status counts for filtering else 0, status)]
             'recurring': [(1, 'disabled'),
                           (0, 'completed'),
                           (0, 'next_run_not_scheduled')],
             'oneshot': [(0, 'next_run_not_scheduled'),
                         (0, 'disabled'),
                         (1, 'completed')]
         }
 
         tasks_to_update = defaultdict(list)
         _task_ids = defaultdict(list)
         # randomly 'disable' recurring tasks or 'complete' oneshot tasks
         for task in pending_tasks:
             policy = task['policy']
             _task_ids[policy].append(task['id'])
             status = random.choice(status_choice[policy])
             if status[0] != 1:
                 continue
             # elected for filtering
             status_per_policy[policy] += status[0]
             tasks_to_update[policy].append(task['id'])
 
         self.backend.disable_tasks(tasks_to_update['recurring'])
         # hack: first mark all oneshot tasks with a status other than 'completed'
         self.backend.set_status_tasks(
             _task_ids['oneshot'], status='disabled')
         self.backend.set_status_tasks(
             tasks_to_update['oneshot'], status='completed')
 
         total_tasks_filtered = (status_per_policy['recurring'] +
                                 status_per_policy['oneshot'])
 
         # retrieve tasks to archive
         after = _time.shift(days=-1).format('YYYY-MM-DD')
         before = utcnow().shift(days=1).format('YYYY-MM-DD')
         tasks_to_archive = list(self.backend.filter_task_to_archive(
             after_ts=after, before_ts=before, limit=total_tasks))
 
         self.assertEqual(len(tasks_to_archive), total_tasks_filtered)
 
         actual_filtered_per_status = {'recurring': 0, 'oneshot': 0}
         for task in tasks_to_archive:
             actual_filtered_per_status[task['task_policy']] += 1
 
         self.assertEqual(actual_filtered_per_status, status_per_policy)
 
     @istest
     def delete_archived_tasks(self):
         self._create_task_types()
         _time = utcnow()
         recurring = self._tasks_from_template(
-            self.task1_template, _time, 100)
+            self.task1_template, _time, 12)
         oneshots = self._tasks_from_template(
-            self.task2_template, _time, 100)
+            self.task2_template, _time, 12)
         total_tasks = len(recurring) + len(oneshots)
         pending_tasks = self.backend.create_tasks(recurring + oneshots)
         backend_tasks = [{
             'task': task['id'],
             'backend_id': str(uuid.uuid4()),
             'scheduled': utcnow(),
         } for task in pending_tasks]
         self.backend.mass_schedule_task_runs(backend_tasks)
 
         _tasks = []
         percent = random.randint(0, 100)  # random removal threshold
         for task in backend_tasks:
             t = self.backend.end_task_run(
                 task['backend_id'], status='eventful')
             c = random.randint(0, 100)
             if c <= percent:
                 _tasks.append({'task_id': t['task'], 'task_run_id': t['id']})
 
         self.backend.delete_archived_tasks(_tasks)
 
         self.cursor.execute('select count(*) from task')
         tasks_count = self.cursor.fetchone()
 
         self.cursor.execute('select count(*) from task_run')
         tasks_run_count = self.cursor.fetchone()
 
         self.assertEqual(tasks_count[0], total_tasks - len(_tasks))
         self.assertEqual(tasks_run_count[0], total_tasks - len(_tasks))
+
+
+class LocalSchedulerTest(CommonSchedulerTest, unittest.TestCase):
+    def setUp(self):
+        super().setUp()
+        self.config = {'scheduling_db': 'dbname=' + self.TEST_DB_NAME}
+        self.backend = get_scheduler('local', self.config)
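
With the common test suite factored out into CommonSchedulerTest, any further
scheduler implementation can reuse every test by pairing the mixin with
unittest.TestCase, as LocalSchedulerTest and RemoteSchedulerTest do. A
hypothetical sketch; the 'memory' backend named here does not exist and only
illustrates the pattern:

    class MemorySchedulerTest(CommonSchedulerTest, unittest.TestCase):
        def setUp(self):
            super().setUp()
            # hypothetical backend name, for illustration only
            self.backend = get_scheduler('memory', {})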