diff --git a/swh/scheduler/celery_backend/orchestrator.py b/swh/scheduler/celery_backend/orchestrator.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/celery_backend/orchestrator.py
@@ -0,0 +1,105 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+"""This is the scheduler orchestrator. It's in charge of scheduling recurring origins.
+For "oneshot" tasks, check the :mod:`swh.scheduler.celery_backend.runner` and
+:mod:`swh.scheduler.celery_backend.pika_listener` modules.
+from __future__ import annotations
+import logging
+from typing import TYPE_CHECKING, Dict, List
+from kombu.utils.uuid import uuid
+from swh.scheduler.celery_backend.config import get_available_slots
+    from ..interface import SchedulerInterface
+    from ..model import ListedOrigin
+logger = logging.getLogger(__name__)
+# either a dvcs or a package
+MAPPING_TYPE_TO_RATIO_KEY: Dict[str, str] = {
+    "git": "dvcs",
+    "hg": "dvcs",
+    "svn": "dvcs",
+    "cvs": "dvcs",
+    "bzr": "dvcs",
+# Default policy ratio, let's start that configuration in the module first
+POLICY_RATIO: Dict[str, Dict[str, float]] = {
+    "package": {
+        "already_visited_order_by_lag": 0.5,
+        "never_visited_oldest_update_first": 0.5,
+    },
+    "dvcs": {
+        "already_visited_order_by_lag": 0.49,
+        "never_visited_oldest_update_first": 0.49,
+        "origins_without_last_update": 0.02,
+    },
+def orchestrate(scheduler: SchedulerInterface, app, task_types: List[Dict]) -> None:
+    """Orchestrate recurring origins scheduling for given visit types.
+    """
+    logger.debug(
+        "Orchestrate recurring origin visits for %s task types",
+        ", ".join(t["type"] for t in task_types),
+    )
+    for task_type in task_types:
+        queue_name = task_type["backend_name"]
+        # FIXME: compute the visit type out of the task's type...
+        visit_type = task_type["type"][5:]  # crop "load-"
+        # Determine the available slots to fill in the queue
+        num_tasks = get_available_slots(
+            app, queue_name, task_type.get("max_queue_length")
+        )
+        if not num_tasks:
+            # queue filled in, next visit type
+            continue
+        logging.info(
+            "%s: %s slots available in queue %s", visit_type, num_tasks, queue_name,
+        )
+        policy_ratio_key = MAPPING_TYPE_TO_RATIO_KEY.get(visit_type, "package")
+        policy_ratio = POLICY_RATIO[policy_ratio_key]
+        all_origins: List[ListedOrigin] = []
+        for policy, ratio in policy_ratio.items():
+            num_tasks_ratio: int = num_tasks * ratio
+            logger.info(
+                "%s: %s tasks to send with policy %s",
+                visit_type,
+                num_tasks_ratio,
+                policy,
+            )
+            origins = scheduler.grab_next_visits(
+                visit_type, num_tasks_ratio, policy=policy
+            )
+            all_origins.extend(origins)
+        # Actually send messages to queue
+        for origin in all_origins:
+            task_dict = origin.as_task_dict()
+            app.send_task(
+                queue_name,
+                task_id=uuid(),
+                args=task_dict["arguments"]["args"],
+                kwargs=task_dict["arguments"]["kwargs"],
+                queue=queue_name,
+            )
+        logger.info(
+            "%s: %s sent visits to queue %s", visit_type, len(all_origins), queue_name,
+        )
diff --git a/swh/scheduler/cli/admin.py b/swh/scheduler/cli/admin.py
--- a/swh/scheduler/cli/admin.py
+++ b/swh/scheduler/cli/admin.py
@@ -3,10 +3,13 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
+from __future__ import annotations
 # WARNING: do not import unnecessary things here to keep cli startup time under
 # control
 import logging
 import time
+from typing import List
 import click
@@ -57,7 +60,7 @@
     logger = logging.getLogger(__name__ + ".runner")
     scheduler = ctx.obj["scheduler"]
-    logger.debug("Scheduler %s" % scheduler)
+    logger.debug("Scheduler %s", scheduler)
     task_types = []
     for task_type_name in task_type_names:
         task_type = scheduler.get_task_type(task_type_name)
@@ -107,6 +110,76 @@
+    "--period",
+    "-p",
+    default=0,
+    help=(
+        "Period (in s) at witch pending tasks are checked and "
+        "executed. Set to 0 (default) for a one shot."
+    ),
+    "--visit-type",
+    "visit_types",
+    multiple=True,
+    default=[],
+    help=(
+        "Visit types to schedule. If not provided, this iterates over every "
+        "corresponding load task types referenced in the scheduler backend."
+    ),
+def orchestrator(ctx, period: int, visit_types: List[str]):
+    """Starts an orchestrator service.
+    This process is responsible for scheduling recurring loading tasks of visit types.
+    """
+    from swh.scheduler.celery_backend.config import build_app
+    from swh.scheduler.celery_backend.orchestrator import orchestrate
+    config = ctx.obj["config"]
+    app = build_app(config.get("celery"))
+    app.set_current()
+    logger = logging.getLogger(f"{__name__}.orchestrator")
+    scheduler = ctx.obj["scheduler"]
+    logger.debug("Scheduler %s", scheduler)
+    logger.debug("visit_types: %s", visit_types)
+    task_types = []
+    # Bootstrap the runner tasks we want to deal with
+    if not visit_types:
+        all_task_types = scheduler.get_task_types()
+        for task_type in all_task_types:
+            if not task_type["type"].startswith("load-"):
+                # only load tasks are recurring, the rest we dismiss
+                continue
+            task_types.append(task_type)
+    else:
+        for visit_type in visit_types:
+            task_type_name = f"load-{visit_type}"
+            task_type = scheduler.get_task_type(task_type_name)
+            if not task_type:
+                raise ValueError(f"Unknown {task_type_name}")
+            task_types.append(task_type)
+    try:
+        while True:
+            try:
+                orchestrate(scheduler, app, task_types)
+            except Exception:
+                logger.exception("Unexpected error in orchestrate()")
+            if not period:
+                break
+            time.sleep(period)
+    except KeyboardInterrupt:
+        ctx.exit(0)
 @click.option("--host", default="", help="Host to run the scheduler server api")
 @click.option("--port", default=5008, type=click.INT, help="Binding port of the server")
diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py
--- a/swh/scheduler/tests/conftest.py
+++ b/swh/scheduler/tests/conftest.py
@@ -6,6 +6,7 @@
 from datetime import datetime, timezone
 import os
 from typing import Dict, List
+from unittest.mock import patch
 import pytest
@@ -60,3 +61,12 @@
 def listed_origins(listed_origins_by_type) -> List[ListedOrigin]:
     """Return a (fixed) set of listed origins"""
     return sum(listed_origins_by_type.values(), [])
+def storage(swh_storage):
+    """An instance of in-memory storage that gets injected
+    into the CLI functions."""
+    with patch("swh.storage.get_storage") as get_storage_mock:
+        get_storage_mock.return_value = swh_storage
+        yield swh_storage
diff --git a/swh/scheduler/tests/test_cli.py b/swh/scheduler/tests/test_cli.py
--- a/swh/scheduler/tests/test_cli.py
+++ b/swh/scheduler/tests/test_cli.py
@@ -595,15 +595,6 @@
     return origins
-def storage(swh_storage):
-    """An instance of in-memory storage that gets injected
-    into the CLI functions."""
-    with patch("swh.storage.get_storage") as get_storage_mock:
-        get_storage_mock.return_value = swh_storage
-        yield swh_storage
 @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
 def test_task_schedule_origins_dry_run(swh_scheduler, storage):
     """Tests the scheduling when origin_batch_size*task_batch_size is a
diff --git a/swh/scheduler/tests/test_orchestrator.py b/swh/scheduler/tests/test_orchestrator.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/tests/test_orchestrator.py
@@ -0,0 +1,119 @@
+# Copyright (C) 2021  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+from datetime import timedelta
+import pytest
+from swh.scheduler.celery_backend.orchestrator import orchestrate
+from .test_cli import _fill_storage_with_origins, invoke
+def _compute_backend_name(visit_type: str) -> str:
+    "Build a dummy reproducible backend name"
+    return f"swh.loader.{visit_type}.tasks"
+def swh_scheduler(swh_scheduler):
+    """Override default fixture of the scheduler to install some more task types.
+    """
+    for visit_type in ["git", "hg", "svn"]:
+        task_type = f"load-{visit_type}"
+        swh_scheduler.create_task_type(
+            {
+                "type": task_type,
+                "max_queue_length": TEST_MAX_QUEUE,
+                "description": "The {} testing task".format(task_type),
+                "backend_name": _compute_backend_name(visit_type),
+                "default_interval": timedelta(days=1),
+                "min_interval": timedelta(hours=6),
+                "max_interval": timedelta(days=12),
+            }
+        )
+    return swh_scheduler
+def test_cli_orchestrator_unknown_visit_type(swh_scheduler):
+    """When passing unknown visit type, orchestrator should refuse to start."""
+    with pytest.raises(ValueError, match="Unknown"):
+        invoke(
+            swh_scheduler,
+            False,
+            ["start-orchestrator", "--visit-type", "unknown", "--visit-type", "git",],
+        )
+def test_cli_orchestrator_noop(swh_scheduler):
+    """Trigger orchestrator without any parameter nor anything to do should noop."""
+    # The orchestrator will just iterate over existing tasks from the scheduler and do
+    # noop. We are just checking it does not explode here.
+    result = invoke(swh_scheduler, False, ["start-orchestrator",],)
+    assert result.exit_code == 0, result.output
+def test_orchestrator_scheduling(
+    swh_scheduler,
+    swh_scheduler_celery_app,
+    storage,
+    caplog,
+    listed_origins_by_type,
+    mocker,
+    """Orchestrator schedules known tasks."""
+    nb_total_origins = 1000
+    mock = mocker.patch("swh.scheduler.celery_backend.orchestrator.get_available_slots")
+    mock.return_value = nb_total_origins  # Slots available in queue
+    nb_total_origins = 50
+    origins = _fill_storage_with_origins(storage, nb_total_origins)
+    # Make sure the schedule is properly configured in terms of visit/task types
+    all_task_types = {
+        task_type_d["type"]: task_type_d
+        for task_type_d in swh_scheduler.get_task_types()
+    }
+    visit_types = list(listed_origins_by_type.keys())
+    assert len(visit_types) > 0
+    task_types = []
+    origins = []
+    for visit_type, _origins in listed_origins_by_type.items():
+        origins.extend(swh_scheduler.record_listed_origins(_origins))
+        task_type_name = f"load-{visit_type}"
+        assert task_type_name in all_task_types.keys()
+        task_types.append(all_task_types[task_type_name])
+    orchestrate(swh_scheduler, swh_scheduler_celery_app, task_types)
+    records = [
+        record.message for record in caplog.records if record.levelname == "INFO"
+    ]
+    # Mapping over the dict ratio/policies entries can change overall order so let's
+    # check the set of records
+    assert set(records) == set(
+        [
+            "git: 1000 slots available in queue swh.loader.git.tasks",
+            "git: 490.0 tasks to send with policy already_visited_order_by_lag",
+            "git: 490.0 tasks to send with policy never_visited_oldest_update_first",
+            "git: 20.0 tasks to send with policy origins_without_last_update",
+            "git: 490 sent visits to queue swh.loader.git.tasks",
+            "svn: 1000 slots available in queue swh.loader.svn.tasks",
+            "svn: 490.0 tasks to send with policy already_visited_order_by_lag",
+            "svn: 490.0 tasks to send with policy never_visited_oldest_update_first",
+            "svn: 20.0 tasks to send with policy origins_without_last_update",
+            "svn: 490 sent visits to queue swh.loader.svn.tasks",
+        ]
+    )