diff --git a/swh/scheduler/celery_backend/recurrent_visits.py b/swh/scheduler/celery_backend/recurrent_visits.py
new file mode 100644
index 0000000..04d2045
--- /dev/null
+++ b/swh/scheduler/celery_backend/recurrent_visits.py
@@ -0,0 +1,301 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""This schedules the recurrent visits, for listed origins, in Celery.
+
+For "oneshot" (save code now, lister) tasks, check the
+:mod:`swh.scheduler.celery_backend.runner` and
+:mod:`swh.scheduler.celery_backend.pika_listener` modules.
+
+"""
+
+from __future__ import annotations
+
+from itertools import chain
+import logging
+from queue import Empty, Queue
+import random
+from threading import Thread
+import time
+from typing import TYPE_CHECKING, Any, Dict, List, Tuple
+
+from kombu.utils.uuid import uuid
+
+from swh.scheduler.celery_backend.config import get_available_slots
+
+if TYPE_CHECKING:
+    from ..interface import SchedulerInterface
+    from ..model import ListedOrigin
+
+logger = logging.getLogger(__name__)
+
+
+_VCS_POLICY_RATIOS = {
+    "already_visited_order_by_lag": 0.49,
+    "never_visited_oldest_update_first": 0.49,
+    "origins_without_last_update": 0.02,
+}
+
+# Default policy ratio, let's start that configuration in the module first
+POLICY_RATIO: Dict[str, Dict[str, float]] = {
+    "default": {
+        "already_visited_order_by_lag": 0.5,
+        "never_visited_oldest_update_first": 0.5,
+    },
+    "git": _VCS_POLICY_RATIOS,
+    "hg": _VCS_POLICY_RATIOS,
+    "svn": _VCS_POLICY_RATIOS,
+    "cvs": _VCS_POLICY_RATIOS,
+    "bzr": _VCS_POLICY_RATIOS,
+}
+
+
+MIN_SLOTS_RATIO = 0.05
+"""Quantity of slots that need to be available (with respect to max_queue_length) for
+`grab_next_visits` to trigger"""
+
+QUEUE_FULL_BACKOFF = 60
+"""Backoff time (in seconds) if there's fewer than `MIN_SLOTS_RATIO` slots available in
+the queue."""
+
+NO_ORIGINS_SCHEDULED_BACKOFF = 20 * 60
+"""Backoff time (in seconds) if no origins have been scheduled in the current
+iteration"""
+
+BACKOFF_SPLAY = 5.0
+"""Amplitude of the fuzziness between backoffs"""
+
+TERMINATE = object()
+"""Termination request received from command queue (singleton used for identity
+comparison)"""
+
+
+def grab_next_visits_policy_ratio(
+    scheduler: SchedulerInterface, visit_type: str, num_visits: int
+) -> List[ListedOrigin]:
+    """Get the next `num_visits` for the given `visit_type` using the corresponding
+    set of scheduling policies.
+
+    The `POLICY_RATIO` dict sets, for each visit type, the scheduling policies
+    used to pull the next tasks, and what proportion of the available num_visits
+    they take.
+
+    This function emits a warning if the ratio of retrieved origins is off of
+    the requested ratio by more than 5%.
+
+    Returns:
+       at most `num_visits` `ListedOrigin` objects
+    """
+    policy_ratio = POLICY_RATIO.get(visit_type, POLICY_RATIO["default"])
+
+    fetched_origins: Dict[str, List[ListedOrigin]] = {}
+
+    for policy, ratio in policy_ratio.items():
+        num_tasks_to_send = int(num_visits * ratio)
+        fetched_origins[policy] = scheduler.grab_next_visits(
+            visit_type, num_tasks_to_send, policy=policy
+        )
+
+    all_origins: List[ListedOrigin] = list(
+        chain.from_iterable(fetched_origins.values())
+    )
+    if not all_origins:
+        return []
+
+    # Check whether the ratios of origins fetched are skewed with respect to the
+    # ones we requested
+    fetched_origin_ratios = {
+        policy: len(origins) / len(all_origins)
+        for policy, origins in fetched_origins.items()
+    }
+
+    for policy, expected_ratio in policy_ratio.items():
+        # 5% of skew with respect to request
+        if abs(fetched_origin_ratios[policy] - expected_ratio) / expected_ratio > 0.05:
+            logger.info(
+                "Skewed fetch for visit type %s with policy %s: fetched %s, "
+                "requested %s",
+                visit_type,
+                policy,
+                fetched_origin_ratios[policy],
+                expected_ratio,
+            )
+
+    return all_origins
+
+
+def splay():
+    """Return a random short interval by which to vary the backoffs for the visit
+    scheduling threads"""
+    return random.uniform(0, BACKOFF_SPLAY)
+
+
+def send_visits_for_visit_type(
+    scheduler: SchedulerInterface, app, visit_type: str, task_type: Dict,
+) -> float:
+    """Schedule the next batch of visits for the given `visit_type`.
+
+    First, we determine the number of available slots by introspecting the
+    RabbitMQ queue.
+
+    If there's fewer than `MIN_SLOTS_RATIO` slots available in the queue, we wait
+    for `QUEUE_FULL_BACKOFF` seconds. This avoids running the expensive
+    `grab_next_visits` queries when there's not many jobs to queue.
+
+    Once there's more than `MIN_SLOTS_RATIO` slots available, we run
+    `get_next_visits` to retrieve the next set of origin visits to schedule, and
+    we send them to celery.
+
+    If the last scheduling attempt didn't return any origins, we sleep for
+    `NO_ORIGINS_SCHEDULED_BACKOFF` seconds. This avoids running the expensive
+    `grab_next_visits` queries too often if there's nothing left to schedule.
+
+    Returns:
+       the earliest `time.monotonic` value at which to run the next iteration of
+       the loop.
+
+    """
+    queue_name = task_type["backend_name"]
+    max_queue_length = task_type.get("max_queue_length") or 0
+    min_available_slots = max_queue_length * MIN_SLOTS_RATIO
+
+    current_iteration_start = time.monotonic()
+
+    # Check queue level
+    available_slots = get_available_slots(app, queue_name, max_queue_length)
+    logger.debug(
+        "%s available slots for visit type %s in queue %s",
+        available_slots,
+        visit_type,
+        queue_name,
+    )
+    if available_slots < min_available_slots:
+        return current_iteration_start + QUEUE_FULL_BACKOFF
+
+    origins = grab_next_visits_policy_ratio(scheduler, visit_type, available_slots)
+
+    if not origins:
+        logger.debug("No origins to visit for type %s", visit_type)
+        return current_iteration_start + NO_ORIGINS_SCHEDULED_BACKOFF
+
+    # Try to smooth the ingestion load, origins pulled by different
+    # scheduling policies have different resource usage patterns
+    random.shuffle(origins)
+
+    for origin in origins:
+        task_dict = origin.as_task_dict()
+        app.send_task(
+            queue_name,
+            task_id=uuid(),
+            args=task_dict["arguments"]["args"],
+            kwargs=task_dict["arguments"]["kwargs"],
+            queue=queue_name,
+        )
+
+    logger.info(
+        "%s: %s visits scheduled in queue %s", visit_type, len(origins), queue_name,
+    )
+
+    # When everything worked, we can try to schedule origins again ASAP.
+    return time.monotonic()
+
+
+def visit_scheduler_thread(
+    config: Dict,
+    visit_type: str,
+    command_queue: Queue[object],
+    exc_queue: Queue[Tuple[str, BaseException]],
+):
+    """Target function for the visit sending thread, which initializes local
+    connections and handles exceptions by sending them back to the main thread."""
+
+    from swh.scheduler import get_scheduler
+    from swh.scheduler.celery_backend.config import build_app
+
+    try:
+        # We need to reinitialize these connections because they're not generally
+        # thread-safe
+        app = build_app(config.get("celery"))
+        scheduler = get_scheduler(**config["scheduler"])
+        task_type = scheduler.get_task_type(f"load-{visit_type}")
+
+        if task_type is None:
+            raise ValueError(f"Unknown task type: load-{visit_type}")
+
+        next_iteration = time.monotonic()
+
+        while True:
+            # vary the next iteration time a little bit
+            next_iteration = next_iteration + splay()
+            while time.monotonic() < next_iteration:
+                # Wait for next iteration to start. Listen for termination message.
+                try:
+                    msg = command_queue.get(block=True, timeout=1)
+                except Empty:
+                    continue
+
+                if msg is TERMINATE:
+                    return
+                else:
+                    logger.warn("Received unexpected message %s in command queue", msg)
+
+            next_iteration = send_visits_for_visit_type(
+                scheduler, app, visit_type, task_type
+            )
+
+    except BaseException as e:
+        exc_queue.put((visit_type, e))
+
+
+VisitSchedulerThreads = Dict[str, Tuple[Thread, Queue]]
+"""Dict storing the visit scheduler threads and their command queues"""
+
+
+def spawn_visit_scheduler_thread(
+    threads: VisitSchedulerThreads,
+    exc_queue: Queue[Tuple[str, BaseException]],
+    config: Dict[str, Any],
+    visit_type: str,
+):
+    """Spawn a new thread to schedule the visits of type `visit_type`."""
+    command_queue: Queue[object] = Queue()
+    thread = Thread(
+        target=visit_scheduler_thread,
+        kwargs={
+            "config": config,
+            "visit_type": visit_type,
+            "command_queue": command_queue,
+            "exc_queue": exc_queue,
+        },
+    )
+    threads[visit_type] = (thread, command_queue)
+    thread.start()
+
+
+def terminate_visit_scheduler_threads(threads: VisitSchedulerThreads) -> List[str]:
+    """Terminate all visit scheduler threads"""
+    logger.info("Termination requested...")
+    for _, command_queue in threads.values():
+        command_queue.put(TERMINATE)
+
+    loops = 0
+    while threads and loops < 10:
+        logger.info(
+            "Terminating visit scheduling threads: %s", ", ".join(sorted(threads))
+        )
+        loops += 1
+        for visit_type, (thread, _) in list(threads.items()):
+            thread.join(timeout=1)
+            if not thread.is_alive():
+                logger.debug("Thread %s terminated", visit_type)
+                del threads[visit_type]
+
+    if threads:
+        logger.warn(
+            "Could not reap the following threads after 10 attempts: %s",
+            ", ".join(sorted(threads)),
+        )
+
+    return list(sorted(threads))
diff --git a/swh/scheduler/cli/admin.py b/swh/scheduler/cli/admin.py
index 0becbe8..861651e 100644
--- a/swh/scheduler/cli/admin.py
+++ b/swh/scheduler/cli/admin.py
@@ -1,137 +1,226 @@
 # Copyright (C) 2016-2021  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+from __future__ import annotations
+
 # WARNING: do not import unnecessary things here to keep cli startup time under
 # control
 import logging
 import time
+from typing import List, Tuple
 
 import click
 
 from . import cli
 
 
 @cli.command("start-runner")
 @click.option(
     "--period",
     "-p",
     default=0,
     help=(
         "Period (in s) at witch pending tasks are checked and "
         "executed. Set to 0 (default) for a one shot."
     ),
 )
 @click.option(
     "--task-type",
     "task_type_names",
     multiple=True,
     default=[],
     help=(
         "Task types to schedule. If not provided, this iterates over every "
         "task types referenced in the scheduler backend."
     ),
 )
 @click.option(
     "--with-priority/--without-priority",
     is_flag=True,
     default=False,
     help=(
         "Determine if those tasks should be the ones with priority or not."
         "By default, this deals with tasks without any priority."
     ),
 )
 @click.pass_context
 def runner(ctx, period, task_type_names, with_priority):
     """Starts a swh-scheduler runner service.
 
     This process is responsible for checking for ready-to-run tasks and
     schedule them."""
     from swh.scheduler.celery_backend.config import build_app
     from swh.scheduler.celery_backend.runner import run_ready_tasks
 
     config = ctx.obj["config"]
     app = build_app(config.get("celery"))
     app.set_current()
 
     logger = logging.getLogger(__name__ + ".runner")
     scheduler = ctx.obj["scheduler"]
-    logger.debug("Scheduler %s" % scheduler)
+    logger.debug("Scheduler %s", scheduler)
     task_types = []
     for task_type_name in task_type_names:
         task_type = scheduler.get_task_type(task_type_name)
         if not task_type:
             raise ValueError(f"Unknown {task_type_name}")
         task_types.append(task_type)
 
     try:
         while True:
             logger.debug("Run ready tasks")
             try:
                 ntasks = len(run_ready_tasks(scheduler, app, task_types, with_priority))
                 if ntasks:
                     logger.info("Scheduled %s tasks", ntasks)
             except Exception:
                 logger.exception("Unexpected error in run_ready_tasks()")
             if not period:
                 break
             time.sleep(period)
     except KeyboardInterrupt:
         ctx.exit(0)
 
 
 @cli.command("start-listener")
 @click.pass_context
 def listener(ctx):
     """Starts a swh-scheduler listener service.
 
     This service is responsible for listening at task lifecycle events and
     handle their workflow status in the database."""
     scheduler_backend = ctx.obj["scheduler"]
     if not scheduler_backend:
         raise ValueError("Scheduler class (local/remote) must be instantiated")
 
     broker = (
         ctx.obj["config"]
         .get("celery", {})
         .get("task_broker", "amqp://guest@localhost/%2f")
     )
 
     from swh.scheduler.celery_backend.pika_listener import get_listener
 
     listener = get_listener(broker, "celeryev.listener", scheduler_backend)
     try:
         listener.start_consuming()
     finally:
         listener.stop_consuming()
 
 
+@cli.command("schedule-recurrent")
+@click.option(
+    "--visit-type",
+    "visit_types",
+    multiple=True,
+    default=[],
+    help=(
+        "Visit types to schedule. If not provided, this iterates over every "
+        "corresponding load task types referenced in the scheduler backend."
+    ),
+)
+@click.pass_context
+def schedule_recurrent(ctx, visit_types: List[str]):
+    """Starts the scheduler for recurrent visits.
+
+    This runs one thread for each visit type, which regularly sends new visits
+    to celery.
+
+    """
+    from queue import Queue
+
+    from swh.scheduler.celery_backend.recurrent_visits import (
+        VisitSchedulerThreads,
+        logger,
+        spawn_visit_scheduler_thread,
+        terminate_visit_scheduler_threads,
+    )
+
+    config = ctx.obj["config"]
+    scheduler = ctx.obj["scheduler"]
+
+    if not visit_types:
+        visit_types = []
+        # Figure out which visit types exist in the scheduler
+        all_task_types = scheduler.get_task_types()
+        for task_type in all_task_types:
+            if not task_type["type"].startswith("load-"):
+                # only consider loading tasks as recurring ones, the rest is dismissed
+                continue
+            # get visit type name from task type
+            visit_types.append(task_type["type"][5:])
+    else:
+        # Check that the passed visit types exist in the scheduler
+        for visit_type in visit_types:
+            task_type_name = f"load-{visit_type}"
+            task_type = scheduler.get_task_type(task_type_name)
+            if not task_type:
+                raise ValueError(f"Unknown task type: {task_type_name}")
+
+    exc_queue: Queue[Tuple[str, BaseException]] = Queue()
+    threads: VisitSchedulerThreads = {}
+
+    try:
+        # Spawn initial threads
+        for visit_type in visit_types:
+            spawn_visit_scheduler_thread(threads, exc_queue, config, visit_type)
+
+        # Handle exceptions from child threads
+        while True:
+            visit_type, exc_info = exc_queue.get(block=True)
+
+            logger.exception(
+                "Thread %s died with exception; respawning",
+                visit_type,
+                exc_info=exc_info,
+            )
+
+            dead_thread = threads[visit_type][0]
+            dead_thread.join(timeout=1)
+
+            if dead_thread.is_alive():
+                logger.warn(
+                    "The thread for %s is still alive after sending an exception?! "
+                    "Respawning anyway.",
+                    visit_type,
+                )
+
+            spawn_visit_scheduler_thread(threads, exc_queue, config, visit_type)
+
+    except SystemExit:
+        remaining_threads = terminate_visit_scheduler_threads(threads)
+        if remaining_threads:
+            ctx.exit(1)
+        ctx.exit(0)
+
+
 @cli.command("rpc-serve")
 @click.option("--host", default="0.0.0.0", help="Host to run the scheduler server api")
 @click.option("--port", default=5008, type=click.INT, help="Binding port of the server")
 @click.option(
     "--debug/--nodebug",
     default=None,
     help=(
         "Indicates if the server should run in debug mode. "
         "Defaults to True if log-level is DEBUG, False otherwise."
     ),
 )
 @click.pass_context
 def rpc_server(ctx, host, port, debug):
     """Starts a swh-scheduler API HTTP server.
     """
     if ctx.obj["config"]["scheduler"]["cls"] == "remote":
         click.echo(
             "The API server can only be started with a 'local' " "configuration",
             err=True,
         )
         ctx.exit(1)
 
     from swh.scheduler.api import server
 
     server.app.config.update(ctx.obj["config"])
     if debug is None:
         debug = ctx.obj["log_level"] <= logging.DEBUG
     server.app.run(host, port=port, debug=bool(debug))
diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py
index 775b38a..4872516 100644
--- a/swh/scheduler/tests/conftest.py
+++ b/swh/scheduler/tests/conftest.py
@@ -1,62 +1,72 @@
-# Copyright (C) 2016-2020  The Software Heritage developers
+# Copyright (C) 2016-2021  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from datetime import datetime, timezone
 import os
 from typing import Dict, List
+from unittest.mock import patch
 
 import pytest
 
 from swh.scheduler.model import ListedOrigin, Lister
 from swh.scheduler.tests.common import LISTERS
 
 # make sure we are not fooled by CELERY_ config environment vars
 for var in [x for x in os.environ.keys() if x.startswith("CELERY")]:
     os.environ.pop(var)
 
 
 # test_cli tests depends on a en/C locale, so ensure it
 os.environ["LC_ALL"] = "C.UTF-8"
 
 
 @pytest.fixture
 def stored_lister(swh_scheduler) -> Lister:
     """Store a lister in the scheduler and return its information"""
     return swh_scheduler.get_or_create_lister(**LISTERS[0])
 
 
 @pytest.fixture
 def visit_types() -> List[str]:
     """Possible visit types in `ListedOrigin`s"""
     return ["git", "svn"]
 
 
 @pytest.fixture
 def listed_origins_by_type(
     stored_lister: Lister, visit_types: List[str]
 ) -> Dict[str, List[ListedOrigin]]:
     """A fixed list of `ListedOrigin`s, for each `visit_type`."""
     count_per_type = 1000
     assert stored_lister.id
     return {
         visit_type: [
             ListedOrigin(
                 lister_id=stored_lister.id,
                 url=f"https://{visit_type}.example.com/{i:04d}",
                 visit_type=visit_type,
                 last_update=datetime(
                     2020, 6, 15, 16, 0, 0, j * count_per_type + i, tzinfo=timezone.utc
                 ),
             )
             for i in range(count_per_type)
         ]
         for j, visit_type in enumerate(visit_types)
     }
 
 
 @pytest.fixture
 def listed_origins(listed_origins_by_type) -> List[ListedOrigin]:
     """Return a (fixed) set of listed origins"""
     return sum(listed_origins_by_type.values(), [])
+
+
+@pytest.fixture
+def storage(swh_storage):
+    """An instance of in-memory storage that gets injected
+    into the CLI functions."""
+    with patch("swh.storage.get_storage") as get_storage_mock:
+        get_storage_mock.return_value = swh_storage
+        yield swh_storage
diff --git a/swh/scheduler/tests/test_cli.py b/swh/scheduler/tests/test_cli.py
index 044a1d8..62b83fb 100644
--- a/swh/scheduler/tests/test_cli.py
+++ b/swh/scheduler/tests/test_cli.py
@@ -1,856 +1,847 @@
 # Copyright (C) 2019-2021  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import datetime
 from itertools import islice
 import logging
 import random
 import re
 import tempfile
 from unittest.mock import patch
 
 from click.testing import CliRunner
 import pytest
 
 from swh.core.api.classes import stream_results
 from swh.model.model import Origin
 from swh.scheduler.cli import cli
 from swh.scheduler.utils import create_task_dict, utcnow
 
 CLI_CONFIG = """
 scheduler:
     cls: foo
     args: {}
 """
 
 
 def invoke(scheduler, catch_exceptions, args):
     runner = CliRunner()
     with patch(
         "swh.scheduler.get_scheduler"
     ) as get_scheduler_mock, tempfile.NamedTemporaryFile(
         "a", suffix=".yml"
     ) as config_fd:
         config_fd.write(CLI_CONFIG)
         config_fd.seek(0)
         get_scheduler_mock.return_value = scheduler
         args = ["-C" + config_fd.name,] + args
         result = runner.invoke(cli, args, obj={"log_level": logging.WARNING})
     if not catch_exceptions and result.exception:
         print(result.output)
         raise result.exception
     return result
 
 
 def test_schedule_tasks(swh_scheduler):
     csv_data = (
         b'swh-test-ping;[["arg1", "arg2"]];{"key": "value"};'
         + utcnow().isoformat().encode()
         + b"\n"
         + b'swh-test-ping;[["arg3", "arg4"]];{"key": "value"};'
         + utcnow().isoformat().encode()
         + b"\n"
     )
     with tempfile.NamedTemporaryFile(suffix=".csv") as csv_fd:
         csv_fd.write(csv_data)
         csv_fd.seek(0)
         result = invoke(
             swh_scheduler, False, ["task", "schedule", "-d", ";", csv_fd.name]
         )
     expected = r"""
 Created 2 tasks
 
 Task 1
   Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: recurring
   Args:
     \['arg1', 'arg2'\]
   Keyword args:
     key: 'value'
 
 Task 2
   Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: recurring
   Args:
     \['arg3', 'arg4'\]
   Keyword args:
     key: 'value'
 
 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
 
 
 def test_schedule_tasks_columns(swh_scheduler):
     with tempfile.NamedTemporaryFile(suffix=".csv") as csv_fd:
         csv_fd.write(b'swh-test-ping;oneshot;["arg1", "arg2"];{"key": "value"}\n')
         csv_fd.seek(0)
         result = invoke(
             swh_scheduler,
             False,
             [
                 "task",
                 "schedule",
                 "-c",
                 "type",
                 "-c",
                 "policy",
                 "-c",
                 "args",
                 "-c",
                 "kwargs",
                 "-d",
                 ";",
                 csv_fd.name,
             ],
         )
     expected = r"""
 Created 1 tasks
 
 Task 1
   Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
   Args:
     'arg1'
     'arg2'
   Keyword args:
     key: 'value'
 
 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
 
 
 def test_schedule_task(swh_scheduler):
     result = invoke(
         swh_scheduler,
         False,
         ["task", "add", "swh-test-ping", "arg1", "arg2", "key=value",],
     )
     expected = r"""
 Created 1 tasks
 
 Task 1
   Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: recurring
   Args:
     'arg1'
     'arg2'
   Keyword args:
     key: 'value'
 
 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
 
 
 def test_list_pending_tasks_none(swh_scheduler):
     result = invoke(swh_scheduler, False, ["task", "list-pending", "swh-test-ping",])
 
     expected = r"""
 Found 0 swh-test-ping tasks
 
 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
 
 
 def test_list_pending_tasks(swh_scheduler):
     task1 = create_task_dict("swh-test-ping", "oneshot", key="value1")
     task2 = create_task_dict("swh-test-ping", "oneshot", key="value2")
     task2["next_run"] += datetime.timedelta(days=1)
     swh_scheduler.create_tasks([task1, task2])
 
     result = invoke(swh_scheduler, False, ["task", "list-pending", "swh-test-ping",])
 
     expected = r"""
 Found 1 swh-test-ping tasks
 
 Task 1
   Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
   Args:
   Keyword args:
     key: 'value1'
 
 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
 
     swh_scheduler.grab_ready_tasks("swh-test-ping")
 
     result = invoke(swh_scheduler, False, ["task", "list-pending", "swh-test-ping",])
 
     expected = r"""
 Found 0 swh-test-ping tasks
 
 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
 
 
 def test_list_pending_tasks_filter(swh_scheduler):
     task = create_task_dict("swh-test-multiping", "oneshot", key="value")
     swh_scheduler.create_tasks([task])
 
     result = invoke(swh_scheduler, False, ["task", "list-pending", "swh-test-ping",])
 
     expected = r"""
 Found 0 swh-test-ping tasks
 
 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
 
 
 def test_list_pending_tasks_filter_2(swh_scheduler):
     swh_scheduler.create_tasks(
         [
             create_task_dict("swh-test-multiping", "oneshot", key="value"),
             create_task_dict("swh-test-ping", "oneshot", key="value2"),
         ]
     )
 
     result = invoke(swh_scheduler, False, ["task", "list-pending", "swh-test-ping",])
 
     expected = r"""
 Found 1 swh-test-ping tasks
 
 Task 2
   Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
   Args:
   Keyword args:
     key: 'value2'
 
 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
 
 
 # Fails because "task list-pending --limit 3" only returns 2 tasks, because
 # of how compute_nb_tasks_from works.
 @pytest.mark.xfail
 def test_list_pending_tasks_limit(swh_scheduler):
     swh_scheduler.create_tasks(
         [
             create_task_dict("swh-test-ping", "oneshot", key="value%d" % i)
             for i in range(10)
         ]
     )
 
     result = invoke(
         swh_scheduler, False, ["task", "list-pending", "swh-test-ping", "--limit", "3",]
     )
 
     expected = r"""
 Found 2 swh-test-ping tasks
 
 Task 1
   Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
   Args:
   Keyword args:
     key: 'value0'
 
 Task 2
   Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
   Args:
   Keyword args:
     key: 'value1'
 
 Task 3
   Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
   Args:
   Keyword args:
     key: 'value2'
 
 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
 
 
 def test_list_pending_tasks_before(swh_scheduler):
     task1 = create_task_dict("swh-test-ping", "oneshot", key="value")
     task2 = create_task_dict("swh-test-ping", "oneshot", key="value2")
     task1["next_run"] += datetime.timedelta(days=3)
     task2["next_run"] += datetime.timedelta(days=1)
     swh_scheduler.create_tasks([task1, task2])
 
     result = invoke(
         swh_scheduler,
         False,
         [
             "task",
             "list-pending",
             "swh-test-ping",
             "--before",
             (datetime.date.today() + datetime.timedelta(days=2)).isoformat(),
         ],
     )
 
     expected = r"""
 Found 1 swh-test-ping tasks
 
 Task 2
   Next run: tomorrow \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
   Args:
   Keyword args:
     key: 'value2'
 
 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
 
 
 def test_list_tasks(swh_scheduler):
     task1 = create_task_dict("swh-test-ping", "oneshot", key="value1")
     task2 = create_task_dict("swh-test-ping", "oneshot", key="value2")
     task1["next_run"] += datetime.timedelta(days=3, hours=2)
     swh_scheduler.create_tasks([task1, task2])
 
     swh_scheduler.grab_ready_tasks("swh-test-ping")
 
     result = invoke(swh_scheduler, False, ["task", "list",])
 
     expected = r"""
 Found 2 tasks
 
 Task 1
   Next run: .+ \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
   Status: next_run_not_scheduled
   Priority:\x20
   Args:
   Keyword args:
     key: 'value1'
 
 Task 2
   Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
   Status: next_run_scheduled
   Priority:\x20
   Args:
   Keyword args:
     key: 'value2'
 
 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
 
 
 def test_list_tasks_id(swh_scheduler):
     task1 = create_task_dict("swh-test-ping", "oneshot", key="value1")
     task2 = create_task_dict("swh-test-ping", "oneshot", key="value2")
     task3 = create_task_dict("swh-test-ping", "oneshot", key="value3")
     swh_scheduler.create_tasks([task1, task2, task3])
 
     result = invoke(swh_scheduler, False, ["task", "list", "--task-id", "2",])
 
     expected = r"""
 Found 1 tasks
 
 Task 2
   Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
   Status: next_run_not_scheduled
   Priority:\x20
   Args:
   Keyword args:
     key: 'value2'
 
 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
 
 
 def test_list_tasks_id_2(swh_scheduler):
     task1 = create_task_dict("swh-test-ping", "oneshot", key="value1")
     task2 = create_task_dict("swh-test-ping", "oneshot", key="value2")
     task3 = create_task_dict("swh-test-ping", "oneshot", key="value3")
     swh_scheduler.create_tasks([task1, task2, task3])
 
     result = invoke(
         swh_scheduler, False, ["task", "list", "--task-id", "2", "--task-id", "3"]
     )
 
     expected = r"""
 Found 2 tasks
 
 Task 2
   Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
   Status: next_run_not_scheduled
   Priority:\x20
   Args:
   Keyword args:
     key: 'value2'
 
 Task 3
   Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
   Status: next_run_not_scheduled
   Priority:\x20
   Args:
   Keyword args:
     key: 'value3'
 
 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
 
 
 def test_list_tasks_type(swh_scheduler):
     task1 = create_task_dict("swh-test-ping", "oneshot", key="value1")
     task2 = create_task_dict("swh-test-multiping", "oneshot", key="value2")
     task3 = create_task_dict("swh-test-ping", "oneshot", key="value3")
     swh_scheduler.create_tasks([task1, task2, task3])
 
     result = invoke(
         swh_scheduler, False, ["task", "list", "--task-type", "swh-test-ping"]
     )
 
     expected = r"""
 Found 2 tasks
 
 Task 1
   Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
   Status: next_run_not_scheduled
   Priority:\x20
   Args:
   Keyword args:
     key: 'value1'
 
 Task 3
   Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
   Status: next_run_not_scheduled
   Priority:\x20
   Args:
   Keyword args:
     key: 'value3'
 
 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
 
 
 def test_list_tasks_limit(swh_scheduler):
     task1 = create_task_dict("swh-test-ping", "oneshot", key="value1")
     task2 = create_task_dict("swh-test-ping", "oneshot", key="value2")
     task3 = create_task_dict("swh-test-ping", "oneshot", key="value3")
     swh_scheduler.create_tasks([task1, task2, task3])
 
     result = invoke(swh_scheduler, False, ["task", "list", "--limit", "2",])
 
     expected = r"""
 Found 2 tasks
 
 Task 1
   Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
   Status: next_run_not_scheduled
   Priority:\x20
   Args:
   Keyword args:
     key: 'value1'
 
 Task 2
   Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
   Status: next_run_not_scheduled
   Priority:\x20
   Args:
   Keyword args:
     key: 'value2'
 
 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
 
 
 def test_list_tasks_before(swh_scheduler):
     task1 = create_task_dict("swh-test-ping", "oneshot", key="value1")
     task2 = create_task_dict("swh-test-ping", "oneshot", key="value2")
     task1["next_run"] += datetime.timedelta(days=3, hours=2)
     swh_scheduler.create_tasks([task1, task2])
 
     swh_scheduler.grab_ready_tasks("swh-test-ping")
 
     result = invoke(
         swh_scheduler,
         False,
         [
             "task",
             "list",
             "--before",
             (datetime.date.today() + datetime.timedelta(days=2)).isoformat(),
         ],
     )
 
     expected = r"""
 Found 1 tasks
 
 Task 2
   Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
   Status: next_run_scheduled
   Priority:\x20
   Args:
   Keyword args:
     key: 'value2'
 
 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
 
 
 def test_list_tasks_after(swh_scheduler):
     task1 = create_task_dict("swh-test-ping", "oneshot", key="value1")
     task2 = create_task_dict("swh-test-ping", "oneshot", key="value2")
     task1["next_run"] += datetime.timedelta(days=3, hours=2)
     swh_scheduler.create_tasks([task1, task2])
 
     swh_scheduler.grab_ready_tasks("swh-test-ping")
 
     result = invoke(
         swh_scheduler,
         False,
         [
             "task",
             "list",
             "--after",
             (datetime.date.today() + datetime.timedelta(days=2)).isoformat(),
         ],
     )
 
     expected = r"""
 Found 1 tasks
 
 Task 1
   Next run: .+ \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
   Status: next_run_not_scheduled
   Priority:\x20
   Args:
   Keyword args:
     key: 'value1'
 
 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
 
 
 def _fill_storage_with_origins(storage, nb_origins):
     origins = [Origin(url=f"http://example.com/{i}") for i in range(nb_origins)]
     storage.origin_add(origins)
     return origins
 
 
-@pytest.fixture
-def storage(swh_storage):
-    """An instance of in-memory storage that gets injected
-    into the CLI functions."""
-    with patch("swh.storage.get_storage") as get_storage_mock:
-        get_storage_mock.return_value = swh_storage
-        yield swh_storage
-
-
 @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
 def test_task_schedule_origins_dry_run(swh_scheduler, storage):
     """Tests the scheduling when origin_batch_size*task_batch_size is a
     divisor of nb_origins."""
     _fill_storage_with_origins(storage, 90)
 
     result = invoke(
         swh_scheduler,
         False,
         ["task", "schedule_origins", "--dry-run", "swh-test-ping",],
     )
 
     # Check the output
     expected = r"""
 Scheduled 3 tasks \(30 origins\).
 Scheduled 6 tasks \(60 origins\).
 Scheduled 9 tasks \(90 origins\).
 Done.
 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output)
 
     # Check scheduled tasks
     tasks = swh_scheduler.search_tasks()
     assert len(tasks) == 0
 
 
 def _assert_origin_tasks_contraints(tasks, max_tasks, max_task_size, expected_origins):
     # check there are not too many tasks
     assert len(tasks) <= max_tasks
 
     # check tasks are not too large
     assert all(len(task["arguments"]["args"][0]) <= max_task_size for task in tasks)
 
     # check the tasks are exhaustive
     assert sum([len(task["arguments"]["args"][0]) for task in tasks]) == len(
         expected_origins
     )
     assert set.union(*(set(task["arguments"]["args"][0]) for task in tasks)) == {
         origin.url for origin in expected_origins
     }
 
 
 @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
 def test_task_schedule_origins(swh_scheduler, storage):
     """Tests the scheduling when neither origin_batch_size or
     task_batch_size is a divisor of nb_origins."""
     origins = _fill_storage_with_origins(storage, 70)
 
     result = invoke(
         swh_scheduler,
         False,
         ["task", "schedule_origins", "swh-test-ping", "--batch-size", "20",],
     )
 
     # Check the output
     expected = r"""
 Scheduled 3 tasks \(60 origins\).
 Scheduled 4 tasks \(70 origins\).
 Done.
 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output)
 
     # Check tasks
     tasks = swh_scheduler.search_tasks()
     _assert_origin_tasks_contraints(tasks, 4, 20, origins)
     assert all(task["arguments"]["kwargs"] == {} for task in tasks)
 
 
 def test_task_schedule_origins_kwargs(swh_scheduler, storage):
     """Tests support of extra keyword-arguments."""
     origins = _fill_storage_with_origins(storage, 30)
 
     result = invoke(
         swh_scheduler,
         False,
         [
             "task",
             "schedule_origins",
             "swh-test-ping",
             "--batch-size",
             "20",
             'key1="value1"',
             'key2="value2"',
         ],
     )
 
     # Check the output
     expected = r"""
 Scheduled 2 tasks \(30 origins\).
 Done.
 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output)
 
     # Check tasks
     tasks = swh_scheduler.search_tasks()
     _assert_origin_tasks_contraints(tasks, 2, 20, origins)
     assert all(
         task["arguments"]["kwargs"] == {"key1": "value1", "key2": "value2"}
         for task in tasks
     )
 
 
 def test_task_schedule_origins_with_limit(swh_scheduler, storage):
     """Tests support of extra keyword-arguments."""
     _fill_storage_with_origins(storage, 50)
     limit = 20
     expected_origins = list(islice(stream_results(storage.origin_list), limit))
     nb_origins = len(expected_origins)
 
     assert nb_origins == limit
     max_task_size = 5
     nb_tasks, remainder = divmod(nb_origins, max_task_size)
     assert remainder == 0  # made the numbers go round
 
     result = invoke(
         swh_scheduler,
         False,
         [
             "task",
             "schedule_origins",
             "swh-test-ping",
             "--batch-size",
             max_task_size,
             "--limit",
             limit,
         ],
     )
 
     # Check the output
     expected = rf"""
 Scheduled {nb_tasks} tasks \({nb_origins} origins\).
 Done.
 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output)
 
     tasks = swh_scheduler.search_tasks()
     _assert_origin_tasks_contraints(tasks, max_task_size, nb_origins, expected_origins)
 
 
 def test_task_schedule_origins_with_page_token(swh_scheduler, storage):
     """Tests support of extra keyword-arguments."""
     nb_total_origins = 50
     origins = _fill_storage_with_origins(storage, nb_total_origins)
 
     # prepare page_token and origins result expectancy
     page_result = storage.origin_list(limit=10)
     assert len(page_result.results) == 10
     page_token = page_result.next_page_token
     assert page_token is not None
 
     # remove the first 10 origins listed as we won't see those in tasks
     expected_origins = [o for o in origins if o not in page_result.results]
     nb_origins = len(expected_origins)
     assert nb_origins == nb_total_origins - len(page_result.results)
 
     max_task_size = 10
     nb_tasks, remainder = divmod(nb_origins, max_task_size)
     assert remainder == 0
 
     result = invoke(
         swh_scheduler,
         False,
         [
             "task",
             "schedule_origins",
             "swh-test-ping",
             "--batch-size",
             max_task_size,
             "--page-token",
             page_token,
         ],
     )
 
     # Check the output
     expected = rf"""
 Scheduled {nb_tasks} tasks \({nb_origins} origins\).
 Done.
 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output)
 
     # Check tasks
     tasks = swh_scheduler.search_tasks()
     _assert_origin_tasks_contraints(tasks, max_task_size, nb_origins, expected_origins)
 
 
 def test_cli_task_runner_unknown_task_types(swh_scheduler, storage):
     """When passing at least one unknown task type, the runner should fail."""
 
     task_types = swh_scheduler.get_task_types()
     task_type_names = [t["type"] for t in task_types]
     known_task_type = random.choice(task_type_names)
     unknown_task_type = "unknown-task-type"
     assert unknown_task_type not in task_type_names
 
     with pytest.raises(ValueError, match="Unknown"):
         invoke(
             swh_scheduler,
             False,
             [
                 "start-runner",
                 "--task-type",
                 known_task_type,
                 "--task-type",
                 unknown_task_type,
             ],
         )
 
 
 @pytest.mark.parametrize("flag_priority", ["--with-priority", "--without-priority"])
 def test_cli_task_runner_with_known_tasks(
     swh_scheduler, storage, caplog, flag_priority
 ):
     """Trigger runner with known tasks runs smoothly."""
 
     task_types = swh_scheduler.get_task_types()
     task_type_names = [t["type"] for t in task_types]
     task_type_name = random.choice(task_type_names)
     task_type_name2 = random.choice(task_type_names)
 
     # The runner will just iterate over the following known tasks and do noop. We are
     # just checking the runner does not explode here.
     result = invoke(
         swh_scheduler,
         False,
         [
             "start-runner",
             flag_priority,
             "--task-type",
             task_type_name,
             "--task-type",
             task_type_name2,
         ],
     )
 
     assert result.exit_code == 0, result.output
 
 
 def test_cli_task_runner_no_task(swh_scheduler, storage):
     """Trigger runner with no parameter should run as before."""
 
     # The runner will just iterate over the existing tasks from the scheduler and do
     # noop. We are just checking the runner does not explode here.
     result = invoke(swh_scheduler, False, ["start-runner",],)
 
     assert result.exit_code == 0, result.output
diff --git a/swh/scheduler/tests/test_recurrent_visits.py b/swh/scheduler/tests/test_recurrent_visits.py
new file mode 100644
index 0000000..d118fbf
--- /dev/null
+++ b/swh/scheduler/tests/test_recurrent_visits.py
@@ -0,0 +1,180 @@
+# Copyright (C) 2021  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from datetime import timedelta
+import logging
+from queue import Queue
+from unittest.mock import MagicMock
+
+import pytest
+
+from swh.scheduler.celery_backend.recurrent_visits import (
+    VisitSchedulerThreads,
+    send_visits_for_visit_type,
+    spawn_visit_scheduler_thread,
+    terminate_visit_scheduler_threads,
+    visit_scheduler_thread,
+)
+
+from .test_cli import invoke
+
+TEST_MAX_QUEUE = 10000
+MODULE_NAME = "swh.scheduler.celery_backend.recurrent_visits"
+
+
+def _compute_backend_name(visit_type: str) -> str:
+    "Build a dummy reproducible backend name"
+    return f"swh.loader.{visit_type}.tasks"
+
+
+@pytest.fixture
+def swh_scheduler(swh_scheduler):
+    """Override default fixture of the scheduler to install some more task types."""
+    for visit_type in ["git", "hg", "svn"]:
+        task_type = f"load-{visit_type}"
+        swh_scheduler.create_task_type(
+            {
+                "type": task_type,
+                "max_queue_length": TEST_MAX_QUEUE,
+                "description": "The {} testing task".format(task_type),
+                "backend_name": _compute_backend_name(visit_type),
+                "default_interval": timedelta(days=1),
+                "min_interval": timedelta(hours=6),
+                "max_interval": timedelta(days=12),
+            }
+        )
+    return swh_scheduler
+
+
+def test_cli_schedule_recurrent_unknown_visit_type(swh_scheduler):
+    """When passed an unknown visit type, the recurrent visit scheduler should refuse
+    to start."""
+
+    with pytest.raises(ValueError, match="Unknown"):
+        invoke(
+            swh_scheduler,
+            False,
+            ["schedule-recurrent", "--visit-type", "unknown", "--visit-type", "git"],
+        )
+
+
+def test_cli_schedule_recurrent_noop(swh_scheduler, mocker):
+    """When passing no visit types, the recurrent visit scheduler should start."""
+
+    spawn_visit_scheduler_thread = mocker.patch(
+        f"{MODULE_NAME}.spawn_visit_scheduler_thread"
+    )
+    spawn_visit_scheduler_thread.side_effect = SystemExit
+    # The actual scheduling threads won't spawn, they'll immediately terminate. This
+    # only exercises the logic to pull task types out of the database
+
+    result = invoke(swh_scheduler, False, ["schedule-recurrent"])
+    assert result.exit_code == 0, result.output
+
+
+def test_recurrent_visit_scheduling(
+    swh_scheduler, caplog, listed_origins_by_type, mocker,
+):
+    """Scheduling known tasks is ok."""
+
+    caplog.set_level(logging.DEBUG, MODULE_NAME)
+    nb_origins = 1000
+
+    mock_celery_app = MagicMock()
+    mock_available_slots = mocker.patch(f"{MODULE_NAME}.get_available_slots")
+    mock_available_slots.return_value = nb_origins  # Slots available in queue
+
+    # Make sure the scheduler is properly configured in terms of visit/task types
+    all_task_types = {
+        task_type_d["type"]: task_type_d
+        for task_type_d in swh_scheduler.get_task_types()
+    }
+
+    visit_types = list(listed_origins_by_type.keys())
+    assert len(visit_types) > 0
+
+    task_types = []
+    origins = []
+    for visit_type, _origins in listed_origins_by_type.items():
+        origins.extend(swh_scheduler.record_listed_origins(_origins))
+        task_type_name = f"load-{visit_type}"
+        assert task_type_name in all_task_types.keys()
+        task_type = all_task_types[task_type_name]
+        task_type["visit_type"] = visit_type
+        # we'll limit the orchestrator to the origins' type we know
+        task_types.append(task_type)
+
+    for visit_type in ["git", "svn"]:
+        task_type = f"load-{visit_type}"
+        send_visits_for_visit_type(
+            swh_scheduler, mock_celery_app, visit_type, all_task_types[task_type]
+        )
+
+    assert mock_available_slots.called, "The available slots functions should be called"
+
+    records = [record.message for record in caplog.records]
+
+    # Mapping over the dict ratio/policies entries can change overall order so let's
+    # check the set of records
+    expected_records = set()
+    for task_type in task_types:
+        visit_type = task_type["visit_type"]
+        queue_name = task_type["backend_name"]
+        msg = (
+            f"{nb_origins} available slots for visit type {visit_type} "
+            f"in queue {queue_name}"
+        )
+        expected_records.add(msg)
+
+    for expected_record in expected_records:
+        assert expected_record in set(records)
+
+
+@pytest.fixture
+def scheduler_config(swh_scheduler_config):
+    return {"scheduler": {"cls": "local", **swh_scheduler_config}, "celery": {}}
+
+
+def test_visit_scheduler_thread_unknown_task(
+    swh_scheduler, scheduler_config,
+):
+    """Starting a thread with unknown task type reports the error"""
+
+    unknown_visit_type = "unknown"
+    command_queue = Queue()
+    exc_queue = Queue()
+
+    visit_scheduler_thread(
+        scheduler_config, unknown_visit_type, command_queue, exc_queue
+    )
+
+    assert command_queue.empty() is True
+    assert exc_queue.empty() is False
+    assert len(exc_queue.queue) == 1
+    result = exc_queue.queue.pop()
+    assert result[0] == unknown_visit_type
+    assert isinstance(result[1], ValueError)
+
+
+def test_spawn_visit_scheduler_thread_noop(scheduler_config, visit_types, mocker):
+    """Spawning and terminating threads runs smoothly"""
+
+    threads: VisitSchedulerThreads = {}
+    exc_queue = Queue()
+    mock_build_app = mocker.patch("swh.scheduler.celery_backend.config.build_app")
+    mock_build_app.return_value = MagicMock()
+
+    assert len(threads) == 0
+    for visit_type in visit_types:
+        spawn_visit_scheduler_thread(threads, exc_queue, scheduler_config, visit_type)
+
+    # This actually only checks the spawning and terminating logic is sound
+
+    assert len(threads) == len(visit_types)
+
+    actual_threads = terminate_visit_scheduler_threads(threads)
+    assert not len(actual_threads)
+
+    assert mock_build_app.called