diff --git a/swh/scheduler/pytest_plugin.py b/swh/scheduler/pytest_plugin.py
--- a/swh/scheduler/pytest_plugin.py
+++ b/swh/scheduler/pytest_plugin.py
@@ -7,6 +7,10 @@
 import glob
 import os
 
+from celery.contrib.testing import worker
+from celery.contrib.testing.app import TestApp, setup_default_app
+
+import pkg_resources
 import pytest
 
 from swh.core.utils import numfile_sortkey as sortkey
@@ -61,3 +65,47 @@
 # this alias is used to be able to easily instantiate a db-backed Scheduler
 # eg. for the RPC client/server test suite.
 swh_db_scheduler = swh_scheduler
+
+
+@pytest.fixture(scope="session")
+def swh_scheduler_celery_app():
+    """Session-wide Celery TestApp, also installed as celery_backend.config.app"""
+    test_app = TestApp(
+        set_as_current=True,
+        enable_logging=True,
+        task_cls="swh.scheduler.task:SWHTask",
+        config={
+            "accept_content": ["application/x-msgpack", "application/json"],
+            "task_serializer": "msgpack",
+            "result_serializer": "json",
+        },
+    )
+    with setup_default_app(test_app, use_trap=False):
+        from swh.scheduler.celery_backend import config
+        # monkeypatch the 'main' swh.scheduler Celery application with the test app
+        config.app = test_app
+        test_app.set_default()
+        test_app.set_current()
+        yield test_app
+
+
+@pytest.fixture(scope="session")
+def swh_scheduler_celery_includes():
+    """Task modules the swh_scheduler_celery_worker loads on startup: the scheduler
+    test tasks plus the task_modules of every registered swh.workers entry point."""
+    modules = ["swh.scheduler.tests.tasks"]
+    for ep in pkg_resources.iter_entry_points("swh.workers"):
+        registration = ep.load()()
+        modules += registration.get("task_modules", [])
+    return modules
+
+
+@pytest.fixture(scope="session")
+def swh_scheduler_celery_worker(
+    swh_scheduler_celery_app, swh_scheduler_celery_includes,
+):
+    """Import the task modules, then run a solo-pool test worker for the session"""
+    for module in swh_scheduler_celery_includes:
+        swh_scheduler_celery_app.loader.import_task_module(module)
+    with worker.start_worker(swh_scheduler_celery_app, pool="solo") as w:
+        yield w
diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py
--- a/swh/scheduler/tests/conftest.py
+++ b/swh/scheduler/tests/conftest.py
@@ -4,14 +4,18 @@
 # See top-level LICENSE file for more information
 
 import os
-import pytest
 from datetime import datetime, timezone
-import pkg_resources
 from typing import List
 
+import pytest
+
 from swh.scheduler.model import ListedOrigin, Lister
 from swh.scheduler.tests.common import LISTERS
 
+from swh.scheduler.pytest_plugin import (  # noqa: F401 (backwards-compat imports)
+    swh_scheduler_celery_app as swh_app,
+    swh_scheduler_celery_worker as celery_session_worker,
+)
 
 # make sure we are not fooled by CELERY_ config environment vars
 for var in [x for x in os.environ.keys() if x.startswith("CELERY")]:
@@ -22,48 +26,6 @@
 os.environ["LC_ALL"] = "C.UTF-8"
 
 
-@pytest.fixture(scope="session")
-def celery_enable_logging():
-    return True
-
-
-@pytest.fixture(scope="session")
-def celery_includes():
-    task_modules = [
-        "swh.scheduler.tests.tasks",
-    ]
-    for entrypoint in pkg_resources.iter_entry_points("swh.workers"):
-        task_modules.extend(entrypoint.load()().get("task_modules", []))
-    return task_modules
-
-
-@pytest.fixture(scope="session")
-def celery_parameters():
-    return {
-        "task_cls": "swh.scheduler.task:SWHTask",
-    }
-
-
-@pytest.fixture(scope="session")
-def celery_config():
-    return {
-        "accept_content": ["application/x-msgpack", "application/json"],
-        "task_serializer": "msgpack",
-        "result_serializer": "json",
-    }
-
-
-# use the celery_session_app fixture to monkeypatch the 'main'
-# swh.scheduler.celery_backend.config.app Celery application
-# with the test application
-@pytest.fixture(scope="session")
-def swh_app(celery_session_app):
-    from swh.scheduler.celery_backend import config
-
-    config.app = celery_session_app
-    yield celery_session_app
-
-
 @pytest.fixture
 def stored_lister(swh_scheduler) -> Lister:
     """Store a lister in the scheduler and return its information"""
diff --git a/swh/scheduler/tests/test_celery_tasks.py b/swh/scheduler/tests/test_celery_tasks.py
--- a/swh/scheduler/tests/test_celery_tasks.py
+++ b/swh/scheduler/tests/test_celery_tasks.py
@@ -10,25 +10,29 @@
 from swh.scheduler.celery_backend.runner import run_ready_tasks
 
 
-def test_ping(swh_app, celery_session_worker):
-    res = swh_app.send_task("swh.scheduler.tests.tasks.ping")
+def test_ping(swh_scheduler_celery_app, swh_scheduler_celery_worker):
+    res = swh_scheduler_celery_app.send_task("swh.scheduler.tests.tasks.ping")
     assert res
     res.wait()
     assert res.successful()
     assert res.result == "OK"
 
 
-def test_ping_with_kw(swh_app, celery_session_worker):
-    res = swh_app.send_task("swh.scheduler.tests.tasks.ping", kwargs={"a": 1})
+def test_ping_with_kw(swh_scheduler_celery_app, swh_scheduler_celery_worker):
+    res = swh_scheduler_celery_app.send_task(
+        "swh.scheduler.tests.tasks.ping", kwargs={"a": 1}
+    )
     assert res
     res.wait()
     assert res.successful()
     assert res.result == "OK (kw={'a': 1})"
 
 
-def test_multiping(swh_app, celery_session_worker):
+def test_multiping(swh_scheduler_celery_app, swh_scheduler_celery_worker):
     "Test that a task that spawns subtasks (group) works"
-    res = swh_app.send_task("swh.scheduler.tests.tasks.multiping", kwargs={"n": 5})
+    res = swh_scheduler_celery_app.send_task(
+        "swh.scheduler.tests.tasks.multiping", kwargs={"n": 5}
+    )
     assert res
 
     res.wait()
@@ -38,7 +42,7 @@
     # to complete
     promise_id = res.result
     assert promise_id
-    promise = GroupResult.restore(promise_id, app=swh_app)
+    promise = GroupResult.restore(promise_id, app=swh_scheduler_celery_app)
     for i in range(5):
         if promise.ready():
             break
@@ -50,7 +54,9 @@
         assert ("OK (kw={'i': %s})" % i) in results
 
 
-def test_scheduler_fixture(swh_app, celery_session_worker, swh_scheduler):
+def test_scheduler_fixture(
+    swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_scheduler
+):
     "Test that the scheduler fixture works properly"
     task_type = swh_scheduler.get_task_type("swh-test-ping")
 
@@ -59,35 +65,39 @@
 
     swh_scheduler.create_tasks([create_task_dict("swh-test-ping", "oneshot")])
 
-    backend_tasks = run_ready_tasks(swh_scheduler, swh_app)
+    backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app)
     assert backend_tasks
     for task in backend_tasks:
         # Make sure the task completed
         AsyncResult(id=task["backend_id"]).get()
 
 
-def test_task_return_value(swh_app, celery_session_worker, swh_scheduler):
+def test_task_return_value(
+    swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_scheduler
+):
     task_type = swh_scheduler.get_task_type("swh-test-add")
     assert task_type
     assert task_type["backend_name"] == "swh.scheduler.tests.tasks.add"
 
     swh_scheduler.create_tasks([create_task_dict("swh-test-add", "oneshot", 12, 30)])
 
-    backend_tasks = run_ready_tasks(swh_scheduler, swh_app)
+    backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app)
     assert len(backend_tasks) == 1
     task = backend_tasks[0]
     value = AsyncResult(id=task["backend_id"]).get()
     assert value == 42
 
 
-def test_task_exception(swh_app, celery_session_worker, swh_scheduler):
+def test_task_exception(
+    swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_scheduler
+):
     task_type = swh_scheduler.get_task_type("swh-test-error")
     assert task_type
     assert task_type["backend_name"] == "swh.scheduler.tests.tasks.error"
 
     swh_scheduler.create_tasks([create_task_dict("swh-test-error", "oneshot")])
 
-    backend_tasks = run_ready_tasks(swh_scheduler, swh_app)
+    backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app)
     assert len(backend_tasks) == 1
 
     task = backend_tasks[0]
@@ -96,11 +106,11 @@
         result.get()
 
 
-def test_statsd(swh_app, celery_session_worker, mocker):
+def test_statsd(swh_scheduler_celery_app, swh_scheduler_celery_worker, mocker):
     m = mocker.patch("swh.scheduler.task.Statsd._send_to_server")
     mocker.patch("swh.scheduler.task.ts", side_effect=count())
     mocker.patch("swh.core.statsd.monotonic", side_effect=count())
-    res = swh_app.send_task("swh.scheduler.tests.tasks.echo")
+    res = swh_scheduler_celery_app.send_task("swh.scheduler.tests.tasks.echo")
     assert res
     res.wait()
     assert res.successful()
@@ -129,11 +139,13 @@
     )
 
 
-def test_statsd_with_status(swh_app, celery_session_worker, mocker):
+def test_statsd_with_status(
+    swh_scheduler_celery_app, swh_scheduler_celery_worker, mocker
+):
     m = mocker.patch("swh.scheduler.task.Statsd._send_to_server")
     mocker.patch("swh.scheduler.task.ts", side_effect=count())
     mocker.patch("swh.core.statsd.monotonic", side_effect=count())
-    res = swh_app.send_task(
+    res = swh_scheduler_celery_app.send_task(
         "swh.scheduler.tests.tasks.echo", kwargs={"status": "eventful"}
     )
     assert res
diff --git a/swh/scheduler/tests/test_cli_celery_monitor.py b/swh/scheduler/tests/test_cli_celery_monitor.py
--- a/swh/scheduler/tests/test_cli_celery_monitor.py
+++ b/swh/scheduler/tests/test_cli_celery_monitor.py
@@ -28,10 +28,12 @@
     assert "Options:" in result.stdout
 
 
-def test_celery_monitor_ping(caplog, swh_app, celery_session_worker):
+def test_celery_monitor_ping(
+    caplog, swh_scheduler_celery_app, swh_scheduler_celery_worker
+):
     caplog.set_level(logging.INFO, "swh.scheduler.cli.celery_monitor")
 
-    result = invoke("--pattern", celery_session_worker.hostname, "ping-workers")
+    result = invoke("--pattern", swh_scheduler_celery_worker.hostname, "ping-workers")
 
     assert result.exit_code == 0
 
@@ -40,7 +42,7 @@
     (record,) = caplog.records
 
     assert record.levelname == "INFO"
-    assert f"response from {celery_session_worker.hostname}" in record.message
+    assert f"response from {swh_scheduler_celery_worker.hostname}" in record.message
 
 
 @pytest.mark.parametrize(
@@ -68,7 +70,12 @@
     ],
 )
 def test_celery_monitor_ping_filter(
-    caplog, swh_app, celery_session_worker, filter_args, filter_message, exit_code
+    caplog,
+    swh_scheduler_celery_app,
+    swh_scheduler_celery_worker,
+    filter_args,
+    filter_message,
+    exit_code,
 ):
     caplog.set_level(logging.DEBUG, "swh.scheduler.cli.celery_monitor")
 
@@ -95,27 +102,35 @@
         assert got_no_response_message
 
 
-def test_celery_monitor_list_running(caplog, swh_app, celery_session_worker):
+def test_celery_monitor_list_running(
+    caplog, swh_scheduler_celery_app, swh_scheduler_celery_worker
+):
     caplog.set_level(logging.DEBUG, "swh.scheduler.cli.celery_monitor")
 
-    result = invoke("--pattern", celery_session_worker.hostname, "list-running")
+    result = invoke("--pattern", swh_scheduler_celery_worker.hostname, "list-running")
 
     assert result.exit_code == 0, result.stdout
 
     for record in caplog.records:
         if record.levelname != "INFO":
             continue
-        assert f"{celery_session_worker.hostname}: no active tasks" in record.message
+        assert (
+            f"{swh_scheduler_celery_worker.hostname}: no active tasks" in record.message
+        )
 
 
 @pytest.mark.parametrize("format", ["csv", "pretty"])
 def test_celery_monitor_list_running_format(
-    caplog, swh_app, celery_session_worker, format
+    caplog, swh_scheduler_celery_app, swh_scheduler_celery_worker, format
 ):
     caplog.set_level(logging.DEBUG, "swh.scheduler.cli.celery_monitor")
 
     result = invoke(
-        "--pattern", celery_session_worker.hostname, "list-running", "--format", format
+        "--pattern",
+        swh_scheduler_celery_worker.hostname,
+        "list-running",
+        "--format",
+        format,
     )
 
     assert result.exit_code == 0, result.stdout
@@ -123,7 +138,9 @@
     for record in caplog.records:
         if record.levelname != "INFO":
             continue
-        assert f"{celery_session_worker.hostname}: no active tasks" in record.message
+        assert (
+            f"{swh_scheduler_celery_worker.hostname}: no active tasks" in record.message
+        )
 
     if format == "csv":
         lines = result.stdout.splitlines()