diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -3,12 +3,14 @@
 warn_unused_ignores = True
 exclude = swh/provenance/tools/
 
-
 # 3rd party libraries without stubs (yet)
 
 [mypy-bson.*]
 ignore_missing_imports = True
 
+[mypy-confluent_kafka.*]
+ignore_missing_imports = True
+
 [mypy-iso8601.*]
 ignore_missing_imports = True
 
diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -3,3 +3,4 @@
 swh.model >= 2.6.1
 swh.storage
 swh.graph
+swh.journal
diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -5,3 +5,4 @@
 swh.journal >= 0.8
 swh.storage >= 0.40
 swh.graph >= 0.3.2
+types-Deprecated
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,6 +2,7 @@
 # should match https://pypi.python.org/pypi names. For the full spec or
 # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
 click
+deprecated
 iso8601
 methodtools
 mongomock
diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py
--- a/swh/provenance/cli.py
+++ b/swh/provenance/cli.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2021  The Software Heritage developers
+# Copyright (C) 2021-2022  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -6,10 +6,12 @@
 # WARNING: do not import unnecessary things here to keep cli startup time under
 # control
 from datetime import datetime, timezone
+from functools import partial
 import os
 from typing import Any, Dict, Generator, Optional, Tuple
 
 import click
+from deprecated import deprecated
 import iso8601
 import yaml
 
@@ -139,6 +141,78 @@
         atexit.register(exit)
 
 
+@cli.group(name="origin")
+@click.pass_context
+def origin(ctx: click.core.Context):
+    from . import get_archive, get_provenance
+
+    archive = get_archive(**ctx.obj["config"]["provenance"]["archive"])
+    provenance = get_provenance(**ctx.obj["config"]["provenance"]["storage"])
+
+    ctx.obj["provenance"] = provenance
+    ctx.obj["archive"] = archive
+
+
+@origin.command(name="from-csv")
+@click.argument("filename", type=click.Path(exists=True))
+@click.option(
+    "-l",
+    "--limit",
+    type=int,
+    help="""Limit the amount of entries (origins) to read from the input file.""",
+)
+@click.pass_context
+def origin_from_csv(ctx: click.core.Context, filename: str, limit: Optional[int]):
+    from .origin import CSVOriginIterator, origin_add
+
+    provenance = ctx.obj["provenance"]
+    archive = ctx.obj["archive"]
+
+    origins_provider = generate_origin_tuples(filename)
+    origins = CSVOriginIterator(origins_provider, limit=limit)
+
+    with provenance:
+        for origin in origins:
+            origin_add(provenance, archive, [origin])
+
+
+@origin.command(name="from-journal")
+@click.pass_context
+def origin_from_journal(ctx: click.core.Context):
+    from swh.journal.client import get_journal_client
+
+    from .journal_client import process_journal_objects
+
+    provenance = ctx.obj["provenance"]
+    archive = ctx.obj["archive"]
+
+    journal_cfg = ctx.obj["config"].get("journal_client", {})
+
+    worker_fn = partial(
+        process_journal_objects,
+        archive=archive,
+        provenance=provenance,
+    )
+
+    cls = journal_cfg.pop("cls", None) or "kafka"
+    client = get_journal_client(
+        cls,
+        **{
+            **journal_cfg,
+            "object_types": ["origin_visit_status"],
+        },
+    )
+
+    try:
+        client.process(worker_fn)
+    except KeyboardInterrupt:
+        ctx.exit(0)
+    else:
+        print("Done.")
+    finally:
+        client.close()
+
+
 @cli.command(name="iter-frontiers")
 @click.argument("filename")
 @click.option(
@@ -289,6 +363,7 @@
     help="""Limit the amount of entries (origins) to read from the input file.""",
 )
 @click.pass_context
+@deprecated(version="0.0.1", reason="Use `swh provenance origin from-csv` instead")
 def iter_origins(ctx: click.core.Context, filename: str, limit: Optional[int]) -> None:
     """Process a provided list of origins."""
     from . import get_archive, get_provenance
diff --git a/swh/provenance/journal_client.py b/swh/provenance/journal_client.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/journal_client.py
@@ -0,0 +1,22 @@
+# Copyright (C) 2022  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from swh.provenance.interface import ProvenanceInterface
+from swh.provenance.model import OriginEntry
+from swh.provenance.origin import origin_add
+from swh.storage.interface import StorageInterface
+
+
+def process_journal_objects(
+    messages, *, provenance: ProvenanceInterface, archive: StorageInterface
+) -> None:
+    """Worker function for `JournalClient.process(worker_fn)`."""
+    assert set(messages) == {"origin_visit_status"}, set(messages)
+    origin_entries = [
+        OriginEntry(url=visit["origin"], snapshot=visit["snapshot"])
+        for visit in messages["origin_visit_status"]
+        if visit["snapshot"] is not None
+    ]
+    origin_add(provenance, archive, origin_entries)
diff --git a/swh/provenance/postgresql/provenance.py b/swh/provenance/postgresql/provenance.py
--- a/swh/provenance/postgresql/provenance.py
+++ b/swh/provenance/postgresql/provenance.py
@@ -54,6 +54,7 @@
     def __init__(
         self, page_size: Optional[int] = None, raise_on_commit: bool = False, **kwargs
     ) -> None:
+        self.conn: Optional[psycopg2.extensions.connection] = None
         self.conn_args = kwargs
         self._flavor: Optional[str] = None
         self.page_size = page_size
@@ -75,6 +76,9 @@
     def transaction(
         self, readonly: bool = False
     ) -> Generator[psycopg2.extras.RealDictCursor, None, None]:
+        if self.conn is None:  # somehow, "implicit" __enter__ call did not happen
+            self.open()
+        assert self.conn is not None
         self.conn.set_session(readonly=readonly)
         with self.conn:
             with self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
@@ -95,6 +99,7 @@
 
     @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "close"})
     def close(self) -> None:
+        assert self.conn is not None
         self.conn.close()
 
     @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_add"})
diff --git a/swh/provenance/tests/conftest.py b/swh/provenance/tests/conftest.py
--- a/swh/provenance/tests/conftest.py
+++ b/swh/provenance/tests/conftest.py
@@ -49,7 +49,7 @@
     return postgresql.get_dsn_parameters()
 
 
-@pytest.fixture(params=["mongodb", "postgresql", "rabbitmq"])
+@pytest.fixture(params=["mongodb", "postgresql"])
 def provenance_storage(
     request: SubRequest,
     provenance_postgresqldb: Dict[str, str],
diff --git a/swh/provenance/tests/data/origins.csv b/swh/provenance/tests/data/origins.csv
new file mode 100644
--- /dev/null
+++ b/swh/provenance/tests/data/origins.csv
@@ -0,0 +1 @@
+https://cmdbts2,5f577c4d4e5a1d0bca64f78facfb891933b17d94
diff --git a/swh/provenance/tests/test_cli.py b/swh/provenance/tests/test_cli.py
--- a/swh/provenance/tests/test_cli.py
+++ b/swh/provenance/tests/test_cli.py
@@ -1,9 +1,9 @@
-# Copyright (C) 2021  The Software Heritage developers
+# Copyright (C) 2021-2022  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-from typing import Set
+from typing import Dict, List, Set
 
 from _pytest.monkeypatch import MonkeyPatch
 from click.testing import CliRunner
@@ -14,7 +14,13 @@
 import swh.core.cli.db  # noqa ; ensure cli is loaded
 from swh.core.db import BaseDb
 from swh.core.db.db_utils import init_admin_extensions
+from swh.model.hashutil import MultiHash
 import swh.provenance.cli  # noqa ; ensure cli is loaded
+from swh.provenance.tests.conftest import fill_storage, load_repo_data
+from swh.storage.interface import StorageInterface
+
+from .conftest import get_datafile
+from .test_utils import invoke, write_configuration_path
 
 
 def test_cli_swh_db_help() -> None:
@@ -107,3 +113,52 @@
     with postgresql.cursor() as cur:
         cur.execute("select swh_get_dbflavor()")
         assert cur.fetchone() == ("with-path",)
+
+
+@pytest.mark.parametrize(
+    "subcommand",
+    (["origin", "from-csv"], ["iter-origins"]),
+)
+def test_cli_origin_from_csv(
+    swh_storage: StorageInterface,
+    subcommand: List[str],
+    swh_storage_backend_config: Dict,
+    provenance,
+    tmp_path,
+):
+    repo = "cmdbts2"
+    origin_url = f"https://{repo}"
+    data = load_repo_data(repo)
+    fill_storage(swh_storage, data)
+
+    assert len(data["origin"]) == 1
+    assert {"url": origin_url} in data["origin"]
+
+    cfg = {
+        "provenance": {
+            "archive": {
+                "cls": "api",
+                "storage": swh_storage_backend_config,
+            },
+            "storage": {
+                "cls": "postgresql",
+                # "db": provenance.storage.conn.dsn,
+                "db": provenance.storage.conn.get_dsn_parameters(),
+            },
+        },
+    }
+
+    config_path = write_configuration_path(cfg, tmp_path)
+
+    csv_filepath = get_datafile("origins.csv")
+    subcommand = subcommand + [csv_filepath]
+
+    result = invoke(subcommand, config_path)
+    assert result.exit_code == 0, f"Unexpected result: {result.output}"
+
+    origin_sha1 = MultiHash.from_data(
+        origin_url.encode(), hash_names=["sha1"]
+    ).digest()["sha1"]
+    actual_result = provenance.storage.origin_get([origin_sha1])
+
+    assert actual_result == {origin_sha1: origin_url}
diff --git a/swh/provenance/tests/test_journal_client.py b/swh/provenance/tests/test_journal_client.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/tests/test_journal_client.py
@@ -0,0 +1,81 @@
+# Copyright (C) 2022  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Dict
+
+from confluent_kafka import Consumer
+import pytest
+
+from swh.model.hashutil import MultiHash
+from swh.provenance.tests.conftest import fill_storage, load_repo_data
+from swh.storage.interface import StorageInterface
+
+from .test_utils import invoke, write_configuration_path
+
+
+@pytest.fixture
+def swh_storage_backend_config(swh_storage_backend_config, kafka_server, kafka_prefix):
+    writer_config = {
+        "cls": "kafka",
+        "brokers": [kafka_server],
+        "client_id": "kafka_writer",
+        "prefix": kafka_prefix,
+        "anonymize": False,
+    }
+    yield {**swh_storage_backend_config, "journal_writer": writer_config}
+
+
+def test_cli_origin_from_journal_client(
+    swh_storage: StorageInterface,
+    swh_storage_backend_config: Dict,
+    kafka_prefix: str,
+    kafka_server: str,
+    consumer: Consumer,
+    tmp_path: str,
+    provenance,
+) -> None:
+    """Test origin journal client cli"""
+
+    # Prepare storage data
+    data = load_repo_data("cmdbts2")
+    assert len(data["origin"]) == 1
+    origin_url = data["origin"][0]["url"]
+    fill_storage(swh_storage, data)
+
+    # Prepare configuration for cli call
+    swh_storage_backend_config.pop("journal_writer", None)  # no need for that config
+    storage_config_dict = swh_storage_backend_config
+    cfg = {
+        "journal_client": {
+            "cls": "kafka",
+            "brokers": [kafka_server],
+            "group_id": "toto",
+            "prefix": kafka_prefix,
+            "object_types": ["origin_visit_status"],
+            "stop_on_eof": True,
+        },
+        "provenance": {
+            "archive": {
+                "cls": "api",
+                "storage": storage_config_dict,
+            },
+            "storage": {
+                "cls": "postgresql",
+                "db": provenance.storage.conn.get_dsn_parameters(),
+            },
+        },
+    }
+    config_path = write_configuration_path(cfg, tmp_path)
+
+    # call the cli 'swh provenance origin from-journal'
+    result = invoke(["origin", "from-journal"], config_path)
+    assert result.exit_code == 0, f"Unexpected result: {result.output}"
+
+    origin_sha1 = MultiHash.from_data(
+        origin_url.encode(), hash_names=["sha1"]
+    ).digest()["sha1"]
+    actual_result = provenance.storage.origin_get([origin_sha1])
+
+    assert actual_result == {origin_sha1: origin_url}
diff --git a/swh/provenance/tests/test_utils.py b/swh/provenance/tests/test_utils.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/tests/test_utils.py
@@ -0,0 +1,31 @@
+# Copyright (C) 2022  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+from os.path import join
+from typing import Dict, List
+
+from click.testing import CliRunner, Result
+from yaml import safe_dump
+
+from swh.provenance.cli import cli
+
+
+def invoke(args: List[str], config_path: str, catch_exceptions: bool = False) -> Result:
+    """Invoke swh journal subcommands"""
+    runner = CliRunner()
+    result = runner.invoke(cli, ["-C" + config_path] + args)
+    if not catch_exceptions and result.exception:
+        print(result.output)
+        raise result.exception
+    return result
+
+
+def write_configuration_path(config: Dict, tmp_path: str) -> str:
+    """Serialize yaml dict on disk given a configuration dict and and a temporary path."""
+    config_path = join(str(tmp_path), "config.yml")
+    with open(config_path, "w") as f:
+        f.write(safe_dump(config))
+    return config_path