diff --git a/swh/scheduler/celery_backend/recurrent_visits.py b/swh/scheduler/celery_backend/recurrent_visits.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/celery_backend/recurrent_visits.py
@@ -0,0 +1,301 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""This schedules the recurrent visits, for listed origins, in Celery.
+
+For "oneshot" (save code now, lister) tasks, check the
+:mod:`swh.scheduler.celery_backend.runner` and
+:mod:`swh.scheduler.celery_backend.pika_listener` modules.
+
+"""
+
+from __future__ import annotations
+
+from itertools import chain
+import logging
+from queue import Empty, Queue
+import random
+from threading import Thread
+import time
+from typing import TYPE_CHECKING, Any, Dict, List, Tuple
+
+from kombu.utils.uuid import uuid
+
+from swh.scheduler.celery_backend.config import get_available_slots
+
+if TYPE_CHECKING:
+    from ..interface import SchedulerInterface
+    from ..model import ListedOrigin
+
+logger = logging.getLogger(__name__)
+
+
+_VCS_POLICY_RATIOS = {  # policy name -> share of the visits to schedule
+    "already_visited_order_by_lag": 0.49,
+    "never_visited_oldest_update_first": 0.49,
+    "origins_without_last_update": 0.02,
+}
+
+# Policy ratios per visit type; visit types not listed here use "default"
+POLICY_RATIO: Dict[str, Dict[str, float]] = {
+    "default": {
+        "already_visited_order_by_lag": 0.5,
+        "never_visited_oldest_update_first": 0.5,
+    },
+    "git": _VCS_POLICY_RATIOS,
+    "hg": _VCS_POLICY_RATIOS,
+    "svn": _VCS_POLICY_RATIOS,
+    "cvs": _VCS_POLICY_RATIOS,
+    "bzr": _VCS_POLICY_RATIOS,
+}
+
+
+MIN_SLOTS_RATIO = 0.05  # i.e. 5% of the max_queue_length of the task type
+"""Quantity of slots that need to be available (with respect to max_queue_length) for
+`grab_next_visits` to trigger"""
+
+QUEUE_FULL_BACKOFF = 60  # i.e. 1 minute
+"""Backoff time (in seconds) if there's fewer than `MIN_SLOTS_RATIO` slots available in
+the queue."""
+
+NO_ORIGINS_SCHEDULED_BACKOFF = 20 * 60  # i.e. 20 minutes
+"""Backoff time (in seconds) if no origins have been scheduled in the current
+iteration"""
+
+BACKOFF_SPLAY = 5.0  # seconds; upper bound of the delay returned by splay()
+"""Amplitude of the fuzziness between backoffs"""
+
+TERMINATE = object()
+"""Termination request received from command queue (singleton used for identity
+comparison)"""
+
+
+def grab_next_visits_policy_ratio(
+    scheduler: SchedulerInterface, visit_type: str, num_visits: int
+) -> List[ListedOrigin]:
+    """Get the next `num_visits` for the given `visit_type` using the corresponding
+    set of scheduling policies.
+
+    The `POLICY_RATIO` dict sets, for each visit type, the scheduling policies
+    used to pull the next tasks, and what proportion of the available num_visits
+    they take. Visit types without an entry use the "default" policies.
+
+    This function logs an info message for each policy whose share of the
+    retrieved origins is off of the requested ratio by more than 5%.
+
+    Returns:
+       at most `num_visits` `ListedOrigin` objects
+    """
+    policy_ratio = POLICY_RATIO.get(visit_type, POLICY_RATIO["default"])
+
+    fetched_origins: Dict[str, List[ListedOrigin]] = {}
+    for policy, ratio in policy_ratio.items():
+        num_tasks_to_send = int(num_visits * ratio)
+        fetched_origins[policy] = scheduler.grab_next_visits(
+            visit_type, num_tasks_to_send, policy=policy
+        )
+
+    all_origins: List[ListedOrigin] = list(
+        chain.from_iterable(fetched_origins.values())
+    )
+    if not all_origins:
+        return []
+
+    # Check whether the ratios of origins fetched are skewed with respect to the
+    # ones we requested
+    fetched_origin_ratios = {
+        policy: len(origins) / len(all_origins)
+        for policy, origins in fetched_origins.items()
+    }
+
+    for policy, expected_ratio in policy_ratio.items():
+        # 5% of skew with respect to request; written as a product rather than
+        # a division so that a policy with a ratio of 0 cannot raise
+        if abs(fetched_origin_ratios[policy] - expected_ratio) > 0.05 * expected_ratio:
+            logger.info(
+                "Skewed fetch for visit type %s with policy %s: fetched %s, "
+                "requested %s",
+                visit_type,
+                policy,
+                fetched_origin_ratios[policy],
+                expected_ratio,
+            )
+
+    return all_origins
+
+
+def splay():
+    """Return a random delay, in seconds, drawn uniformly between 0 and
+    `BACKOFF_SPLAY`, by which the visit scheduling threads vary their backoffs"""
+    return random.uniform(0, BACKOFF_SPLAY)
+
+
+def send_visits_for_visit_type(
+    scheduler: SchedulerInterface, app, visit_type: str, task_type: Dict,
+) -> float:
+    """Send the next batch of visits of type `visit_type` to celery.
+
+    The number of available slots is first determined by introspecting the
+    RabbitMQ queue named after the `backend_name` of `task_type`.
+
+    When fewer than `MIN_SLOTS_RATIO` * `max_queue_length` slots are available,
+    nothing is sent and the next iteration is delayed by `QUEUE_FULL_BACKOFF`
+    seconds. This avoids running the expensive `grab_next_visits` queries when
+    there's not many jobs to queue.
+
+    Otherwise, `grab_next_visits_policy_ratio` retrieves the next set of origins
+    to visit, which are sent to celery in random order.
+
+    When no origin was retrieved, the next iteration is delayed by
+    `NO_ORIGINS_SCHEDULED_BACKOFF` seconds. This avoids running the expensive
+    `grab_next_visits` queries too often if there's nothing left to schedule.
+
+    Returns:
+       the earliest `time.monotonic` value at which to run the next iteration of
+       the loop.
+
+    """
+    backend_name = task_type["backend_name"]
+    queue_capacity = task_type.get("max_queue_length") or 0
+
+    started_at = time.monotonic()
+
+    # Check queue level
+    free_slots = get_available_slots(app, backend_name, queue_capacity)
+    logger.debug(
+        "%s available slots for visit type %s in queue %s",
+        free_slots,
+        visit_type,
+        backend_name,
+    )
+    if free_slots < queue_capacity * MIN_SLOTS_RATIO:
+        # Not enough room in the queue: back off
+        return started_at + QUEUE_FULL_BACKOFF
+
+    to_visit = grab_next_visits_policy_ratio(scheduler, visit_type, free_slots)
+
+    if not to_visit:
+        logger.debug("No origins to visit for type %s", visit_type)
+        return started_at + NO_ORIGINS_SCHEDULED_BACKOFF
+
+    # Try to smooth the ingestion load, origins pulled by different
+    # scheduling policies have different resource usage patterns
+    random.shuffle(to_visit)
+
+    for listed_origin in to_visit:
+        arguments = listed_origin.as_task_dict()["arguments"]
+        app.send_task(
+            backend_name,
+            task_id=uuid(),
+            args=arguments["args"],
+            kwargs=arguments["kwargs"],
+            queue=backend_name,
+        )
+
+    logger.info(
+        "%s: %s visits scheduled in queue %s", visit_type, len(to_visit), backend_name,
+    )
+
+    # When everything worked, we can try to schedule origins again ASAP.
+    return time.monotonic()
+
+
+def visit_scheduler_thread(
+    config: Dict,
+    visit_type: str,
+    command_queue: Queue[object],
+    exc_queue: Queue[Tuple[str, BaseException]],
+):
+    """Target function for the visit sending thread, which initializes local
+    connections, then sends visits of type `visit_type` until `TERMINATE` is read
+    from `command_queue`. Exceptions are sent to the main thread via `exc_queue`."""
+    from swh.scheduler import get_scheduler
+    from swh.scheduler.celery_backend.config import build_app
+
+    try:
+        # We need to reinitialize these connections because they're not generally
+        # thread-safe
+        app = build_app(config.get("celery"))
+        scheduler = get_scheduler(**config["scheduler"])
+        task_type = scheduler.get_task_type(f"load-{visit_type}")
+
+        if task_type is None:
+            raise ValueError(f"Unknown task type: load-{visit_type}")
+
+        next_iteration = time.monotonic()
+
+        while True:
+            # vary the next iteration time a little bit
+            next_iteration = next_iteration + splay()
+            while time.monotonic() < next_iteration:
+                # Wait for next iteration to start. Listen for termination message.
+                try:
+                    msg = command_queue.get(block=True, timeout=1)
+                except Empty:
+                    continue
+
+                if msg is TERMINATE:
+                    return
+                # Any other message is ignored, but reported
+                logger.warning("Received unexpected message %s in command queue", msg)
+
+            next_iteration = send_visits_for_visit_type(
+                scheduler, app, visit_type, task_type
+            )
+
+    except BaseException as e:
+        exc_queue.put((visit_type, e))
+
+
+VisitSchedulerThreads = Dict[str, Tuple[Thread, Queue]]  # keyed by visit type
+"""Dict storing the visit scheduler threads and their command queues"""
+
+
+def spawn_visit_scheduler_thread(
+    threads: VisitSchedulerThreads,
+    exc_queue: Queue[Tuple[str, BaseException]],
+    config: Dict[str, Any],
+    visit_type: str,
+):
+    """Start a thread scheduling the visits of type `visit_type`, and record it
+    (with its newly created command queue) in `threads` under that visit type."""
+    commands: Queue[object] = Queue()
+    thread_kwargs = dict(
+        config=config,
+        visit_type=visit_type,
+        command_queue=commands,
+        exc_queue=exc_queue,
+    )
+    worker = Thread(target=visit_scheduler_thread, kwargs=thread_kwargs)
+    # The thread is registered before it is started
+    threads[visit_type] = (worker, commands)
+    worker.start()
+
+
+def terminate_visit_scheduler_threads(threads: VisitSchedulerThreads) -> List[str]:
+    """Send `TERMINATE` to all visit scheduler threads and join them (at most 10
+    attempts of 1s each); return the sorted visit types of the unreaped threads."""
+    logger.info("Termination requested...")
+    for _, command_queue in threads.values():
+        command_queue.put(TERMINATE)
+
+    loops = 0
+    while threads and loops < 10:
+        logger.info(
+            "Terminating visit scheduling threads: %s", ", ".join(sorted(threads))
+        )
+        loops += 1
+        for visit_type, (thread, _) in list(threads.items()):  # copy: entries go away
+            thread.join(timeout=1)
+            if not thread.is_alive():
+                logger.debug("Thread %s terminated", visit_type)
+                del threads[visit_type]
+
+    if threads:
+        logger.warning(
+            "Could not reap the following threads after 10 attempts: %s",
+            ", ".join(sorted(threads)),
+        )
+    return sorted(threads)
diff --git a/swh/scheduler/cli/admin.py b/swh/scheduler/cli/admin.py
--- a/swh/scheduler/cli/admin.py
+++ b/swh/scheduler/cli/admin.py
@@ -3,10 +3,13 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+from __future__ import annotations
+
 # WARNING: do not import unnecessary things here to keep cli startup time under
 # control
 import logging
 import time
+from typing import List, Tuple
 
 import click
 
@@ -57,7 +60,7 @@
 
     logger = logging.getLogger(__name__ + ".runner")
     scheduler = ctx.obj["scheduler"]
-    logger.debug("Scheduler %s" % scheduler)
+    logger.debug("Scheduler %s", scheduler)
     task_types = []
     for task_type_name in task_type_names:
         task_type = scheduler.get_task_type(task_type_name)
@@ -107,6 +110,92 @@
         listener.stop_consuming()
 
 
+@cli.command("schedule-recurrent")
+@click.option(
+    "--visit-type",
+    "visit_types",
+    multiple=True,
+    default=[],
+    help=(
+        "Visit types to schedule. If not provided, this iterates over every "
+        "corresponding load task types referenced in the scheduler backend."
+    ),
+)
+@click.pass_context
+def schedule_recurrent(ctx, visit_types: List[str]):
+    """Starts the scheduler for recurrent visits.
+
+    This runs one thread for each visit type, which regularly sends new visits
+    to celery. A thread which dies with an exception gets respawned. On
+    `SystemExit`, all threads are asked to terminate before exiting.
+    """
+    from queue import Queue
+
+    from swh.scheduler.celery_backend.recurrent_visits import (
+        VisitSchedulerThreads,
+        logger,
+        spawn_visit_scheduler_thread,
+        terminate_visit_scheduler_threads,
+    )
+
+    config = ctx.obj["config"]
+    scheduler = ctx.obj["scheduler"]
+
+    if not visit_types:
+        visit_types = []
+        # Figure out which visit types exist in the scheduler
+        all_task_types = scheduler.get_task_types()
+        for task_type in all_task_types:
+            if not task_type["type"].startswith("load-"):
+                # only consider loading tasks as recurring ones, the rest is dismissed
+                continue
+            # get visit type name from task type
+            visit_types.append(task_type["type"][5:])
+    else:
+        # Check that the passed visit types exist in the scheduler
+        for visit_type in visit_types:
+            task_type_name = f"load-{visit_type}"
+            task_type = scheduler.get_task_type(task_type_name)
+            if not task_type:
+                raise ValueError(f"Unknown task type: {task_type_name}")
+
+    exc_queue: Queue[Tuple[str, BaseException]] = Queue()
+    threads: VisitSchedulerThreads = {}
+
+    try:
+        # Spawn initial threads
+        for visit_type in visit_types:
+            spawn_visit_scheduler_thread(threads, exc_queue, config, visit_type)
+
+        # Handle exceptions from child threads
+        while True:
+            visit_type, exc_info = exc_queue.get(block=True)
+
+            logger.exception(
+                "Thread %s died with exception; respawning",
+                visit_type,
+                exc_info=exc_info,
+            )
+
+            dead_thread = threads[visit_type][0]
+            dead_thread.join(timeout=1)
+
+            if dead_thread.is_alive():
+                logger.warning(
+                    "The thread for %s is still alive after sending an exception?! "
+                    "Respawning anyway.",
+                    visit_type,
+                )
+
+            spawn_visit_scheduler_thread(threads, exc_queue, config, visit_type)
+
+    except SystemExit:
+        remaining_threads = terminate_visit_scheduler_threads(threads)
+        if remaining_threads:
+            ctx.exit(1)
+        ctx.exit(0)
+
+
 @cli.command("rpc-serve")
 @click.option("--host", default="0.0.0.0", help="Host to run the scheduler server api")
 @click.option("--port", default=5008, type=click.INT, help="Binding port of the server")
diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py
--- a/swh/scheduler/tests/conftest.py
+++ b/swh/scheduler/tests/conftest.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2016-2020  The Software Heritage developers
+# Copyright (C) 2016-2021  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -6,6 +6,7 @@
 from datetime import datetime, timezone
 import os
 from typing import Dict, List
+from unittest.mock import patch
 
 import pytest
 
@@ -60,3 +61,12 @@
 def listed_origins(listed_origins_by_type) -> List[ListedOrigin]:
     """Return a (fixed) set of listed origins"""
     return sum(listed_origins_by_type.values(), [])
+
+
+@pytest.fixture
+def storage(swh_storage):
+    """Patch `swh.storage.get_storage` so that it returns `swh_storage`, which
+    is how this storage instance gets injected into the CLI functions, and
+    yield that instance."""
+    with patch("swh.storage.get_storage", return_value=swh_storage):
+        yield swh_storage
diff --git a/swh/scheduler/tests/test_cli.py b/swh/scheduler/tests/test_cli.py
--- a/swh/scheduler/tests/test_cli.py
+++ b/swh/scheduler/tests/test_cli.py
@@ -595,15 +595,6 @@
     return origins
 
 
-@pytest.fixture
-def storage(swh_storage):
-    """An instance of in-memory storage that gets injected
-    into the CLI functions."""
-    with patch("swh.storage.get_storage") as get_storage_mock:
-        get_storage_mock.return_value = swh_storage
-        yield swh_storage
-
-
 @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
 def test_task_schedule_origins_dry_run(swh_scheduler, storage):
     """Tests the scheduling when origin_batch_size*task_batch_size is a
diff --git a/swh/scheduler/tests/test_recurrent_visits.py b/swh/scheduler/tests/test_recurrent_visits.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/tests/test_recurrent_visits.py
@@ -0,0 +1,180 @@
+# Copyright (C) 2021  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from datetime import timedelta
+import logging
+from queue import Queue
+from unittest.mock import MagicMock
+
+import pytest
+
+from swh.scheduler.celery_backend.recurrent_visits import (
+    VisitSchedulerThreads,
+    send_visits_for_visit_type,
+    spawn_visit_scheduler_thread,
+    terminate_visit_scheduler_threads,
+    visit_scheduler_thread,
+)
+
+from .test_cli import invoke
+
+TEST_MAX_QUEUE = 10000  # max_queue_length of the task types created for the tests
+MODULE_NAME = "swh.scheduler.celery_backend.recurrent_visits"  # module under test
+
+
+def _compute_backend_name(visit_type: str) -> str:
+    """Build a dummy reproducible backend name for the given visit type"""
+    return f"swh.loader.{visit_type}.tasks"
+
+
+@pytest.fixture
+def swh_scheduler(swh_scheduler):
+    """Extend the default scheduler fixture with load-{git,hg,svn} task types."""
+    for vcs in ("git", "hg", "svn"):
+        name = f"load-{vcs}"
+        definition = {
+            "type": name,
+            "max_queue_length": TEST_MAX_QUEUE,
+            "description": "The {} testing task".format(name),
+            "backend_name": _compute_backend_name(vcs),
+            "default_interval": timedelta(days=1),
+            "min_interval": timedelta(hours=6),
+            "max_interval": timedelta(days=12),
+        }
+        swh_scheduler.create_task_type(definition)
+
+    return swh_scheduler
+
+
+def test_cli_schedule_recurrent_unknown_visit_type(swh_scheduler):
+    """The recurrent visit scheduler must refuse to start when any of the
+    requested visit types is unknown."""
+    # An unknown visit type is requested alongside a known one
+    cli_args = ["schedule-recurrent"]
+    for visit_type in ("unknown", "git"):
+        cli_args += ["--visit-type", visit_type]
+
+    with pytest.raises(ValueError, match="Unknown"):
+        invoke(swh_scheduler, False, cli_args)
+
+
+def test_cli_schedule_recurrent_noop(swh_scheduler, mocker):
+    """Without explicit visit types, the recurrent visit scheduler should start."""
+
+    # The actual scheduling threads won't spawn: the mocked spawner raises
+    # SystemExit right away, which makes the command terminate. This only
+    # exercises the logic to pull task types out of the database
+    mocker.patch(
+        f"{MODULE_NAME}.spawn_visit_scheduler_thread", side_effect=SystemExit
+    )
+
+    cli_result = invoke(swh_scheduler, False, ["schedule-recurrent"])
+    assert cli_result.exit_code == 0, cli_result.output
+
+
+def test_recurrent_visit_scheduling(
+    swh_scheduler, caplog, listed_origins_by_type, mocker,
+):
+    """Sending visits for known visit types checks and logs the queue level."""
+    caplog.set_level(logging.DEBUG, MODULE_NAME)
+    nb_origins = 1000  # Slots available in queue
+
+    celery_app = MagicMock()
+    get_slots_mock = mocker.patch(
+        f"{MODULE_NAME}.get_available_slots", return_value=nb_origins
+    )
+
+    # Make sure the scheduler is properly configured in terms of visit/task types
+    known_task_types = {}
+    for task_type_d in swh_scheduler.get_task_types():
+        known_task_types[task_type_d["type"]] = task_type_d
+
+    visit_types = list(listed_origins_by_type)
+    assert len(visit_types) > 0
+
+    selected_task_types = []
+    recorded_origins = []
+    for visit_type, type_origins in listed_origins_by_type.items():
+        recorded_origins += swh_scheduler.record_listed_origins(type_origins)
+        task_type_name = f"load-{visit_type}"
+        assert task_type_name in known_task_types
+        selected = known_task_types[task_type_name]
+        selected["visit_type"] = visit_type
+        # we'll limit the orchestrator to the origins' type we know
+        selected_task_types.append(selected)
+
+    for visit_type in ("git", "svn"):
+        send_visits_for_visit_type(
+            swh_scheduler,
+            celery_app,
+            visit_type,
+            known_task_types[f"load-{visit_type}"],
+        )
+
+    assert get_slots_mock.called, "The available slots functions should be called"
+
+    logged_messages = {record.message for record in caplog.records}
+
+    # Mapping over the dict ratio/policies entries can change overall order so let's
+    # check the set of records
+    expected_records = set()
+    for task_type in selected_task_types:
+        visit_type = task_type["visit_type"]
+        queue_name = task_type["backend_name"]
+        expected_records.add(
+            f"{nb_origins} available slots for visit type {visit_type} "
+            f"in queue {queue_name}"
+        )
+
+    for expected_record in expected_records:
+        assert expected_record in logged_messages
+
+
+@pytest.fixture
+def scheduler_config(swh_scheduler_config):  # config for the scheduler threads
+    return {"scheduler": {"cls": "local", **swh_scheduler_config}, "celery": {}}
+
+
+def test_visit_scheduler_thread_unknown_task(
+    swh_scheduler, scheduler_config,
+):
+    """Running the thread function for an unknown task type reports the error
+    through the exception queue."""
+    visit_type = "unknown"
+    commands = Queue()
+    errors = Queue()
+
+    # The thread function is called directly here, not through a Thread, so
+    # the queues can be inspected as soon as it returns
+    visit_scheduler_thread(scheduler_config, visit_type, commands, errors)
+
+    assert commands.empty() is True
+    assert errors.empty() is False
+    assert len(errors.queue) == 1
+    failed_visit_type, exception = errors.queue.pop()
+    assert failed_visit_type == visit_type
+    assert isinstance(exception, ValueError)
+
+
+def test_spawn_visit_scheduler_thread_noop(scheduler_config, visit_types, mocker):
+    """Threads can be spawned for each visit type, then all terminated."""
+    build_app_mock = mocker.patch(
+        "swh.scheduler.celery_backend.config.build_app", return_value=MagicMock()
+    )
+    errors = Queue()
+    spawned: VisitSchedulerThreads = {}
+
+    assert len(spawned) == 0
+    for visit_type in visit_types:
+        spawn_visit_scheduler_thread(spawned, errors, scheduler_config, visit_type)
+
+    # This actually only checks the spawning and terminating logic is sound
+    assert len(spawned) == len(visit_types)
+
+    # terminate_visit_scheduler_threads returns the threads it could not reap
+    unreaped = terminate_visit_scheduler_threads(spawned)
+    assert not len(unreaped)
+
+    assert build_app_mock.called