diff --git a/swh/storage/replay.py b/swh/storage/replay.py
--- a/swh/storage/replay.py
+++ b/swh/storage/replay.py
@@ -49,6 +49,7 @@
 def process_replay_objects(all_objects, *, storage):
     for (object_type, objects) in all_objects.items():
         logger.debug("Inserting %s %s objects", len(objects), object_type)
+        object_type = object_type.split(":", 1)[0]
         with statsd.timed(GRAPH_DURATION_METRIC, tags={"object_type": object_type}):
             _insert_objects(object_type, objects, storage)
         statsd.increment(
diff --git a/swh/storage/tests/test_kafka_writer.py b/swh/storage/tests/test_kafka_writer.py
--- a/swh/storage/tests/test_kafka_writer.py
+++ b/swh/storage/tests/test_kafka_writer.py
@@ -6,10 +6,17 @@
 from confluent_kafka import Consumer
 
 from swh.storage import get_storage
+from swh.storage.writer import anonymize
 from swh.model.model import Origin, OriginVisit
+from swh.model.hypothesis_strategies import objects
 from swh.journal.pytest_plugin import consume_messages, assert_all_objects_consumed
 from swh.journal.tests.journal_data import TEST_OBJECTS
 
+from swh.model.model import Person
+from attr import asdict, has
+from hypothesis import given
+from hypothesis.strategies import lists
+
 
 def test_storage_direct_writer(kafka_prefix: str, kafka_server, consumer: Consumer):
 
@@ -18,6 +25,7 @@
         "brokers": [kafka_server],
         "client_id": "kafka_writer",
         "prefix": kafka_prefix,
+        "anonymize": False,
     }
     storage_config = {
         "cls": "pipeline",
@@ -28,9 +36,9 @@
 
     expected_messages = 0
 
-    for object_type, objects in TEST_OBJECTS.items():
-        method = getattr(storage, object_type + "_add")
-        if object_type in (
+    for obj_type, objs in TEST_OBJECTS.items():
+        method = getattr(storage, obj_type + "_add")
+        if obj_type in (
             "content",
             "skipped_content",
             "directory",
@@ -39,10 +47,10 @@
             "snapshot",
             "origin",
         ):
-            method(objects)
-            expected_messages += len(objects)
-        elif object_type in ("origin_visit",):
-            for obj in objects:
+            method(objs)
+            expected_messages += len(objs)
+        elif obj_type in ("origin_visit",):
+            for obj in objs:
                 assert isinstance(obj, OriginVisit)
                 storage.origin_add_one(Origin(url=obj.origin))
                 visit = method(obj.origin, date=obj.date, type=obj.type)
@@ -54,7 +62,87 @@
                 storage.origin_visit_update(obj.origin, visit.visit, **obj_d)
                 expected_messages += 1
         else:
-            assert False, object_type
+            assert False, obj_type
+
+    existing_topics = {
+        topic.split(".", 1)[1]
+        for topic in consumer.list_topics(timeout=10).topics.keys()
+    }
+    assert existing_topics == {
+        "content",
+        "directory",
+        "origin",
+        "origin_visit",
+        "release",
+        "revision",
+        "snapshot",
+        "skipped_content",
+    }
 
     consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages)
     assert_all_objects_consumed(consumed_messages)
+
+
+def test_storage_direct_writer_anonymized(
+    kafka_prefix: str, kafka_server, consumer: Consumer
+):
+
+    writer_config = {
+        "cls": "kafka",
+        "brokers": [kafka_server],
+        "client_id": "kafka_writer",
+        "prefix": kafka_prefix,
+        "anonymize": True,
+    }
+    storage_config = {
+        "cls": "pipeline",
+        "steps": [{"cls": "memory", "journal_writer": writer_config},],
+    }
+
+    storage = get_storage(**storage_config)
+
+    expected_messages = 0
+
+    for obj_type, objs in TEST_OBJECTS.items():
+        if obj_type == "origin_visit":
+            # these have an inconsistent API and are unrelated to what we
+            # want to test here
+            continue
+        method = getattr(storage, obj_type + "_add")
+        method(objs)
+        expected_messages += len(objs)
+
+    existing_topics = {
+        topic.split(".", 1)[1]
+        for topic in consumer.list_topics(timeout=10).topics.keys()
+    }
+    assert existing_topics == {
+        "content",
+        "directory",
+        "origin",
+        "origin_visit",
+        "release",
+        "release:anonymized",
+        "revision",
+        "revision:anonymized",
+        "snapshot",
+        "skipped_content",
+    }
+
+
+def check_anonymized_obj(obj):
+    if has(obj):
+        if isinstance(obj, Person):
+            assert obj.name is None
+            assert obj.email is None
+            assert len(obj.fullname) == 32
+        else:
+            for key, value in asdict(obj, recurse=False).items():
+                check_anonymized_obj(value)
+
+
+@given(lists(objects(split_content=True)))
+def test_anonymizer(obj_type_and_objs):
+    for obj_type, obj in obj_type_and_objs:
+        anonymized_obj = anonymize(obj)
+        check_anonymized_obj(anonymized_obj)
diff --git a/swh/storage/tests/test_replay.py b/swh/storage/tests/test_replay.py
--- a/swh/storage/tests/test_replay.py
+++ b/swh/storage/tests/test_replay.py
@@ -19,10 +19,12 @@
 from swh.model.model import Content
 
 from swh.storage import get_storage
+from swh.storage.writer import anonymize
 from swh.storage.replay import process_replay_objects
 
 from swh.journal.serializers import key_to_kafka, value_to_kafka
 from swh.journal.client import JournalClient
+from swh.journal.tests.journal_data import TEST_OBJECTS
 
 from swh.journal.tests.utils import MockedJournalClient, MockedKafkaWriter
 from swh.journal.tests.conftest import (
@@ -463,3 +465,65 @@
         )
         for i in range(n)
     ]
+
+
+def test_storage_play_anonymized(
+    kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, caplog,
+):
+    """Optimal replayer scenario.
+
+    This:
+    - writes objects to the topic
+    - replayer consumes objects from the topic and replay them
+
+    """
+    writer_config = {
+        "cls": "kafka",
+        "brokers": [kafka_server],
+        "client_id": "kafka_writer",
+        "prefix": kafka_prefix,
+        "anonymize": True,
+    }
+    src_config = {"cls": "memory", "journal_writer": writer_config}
+
+    storage = get_storage(**src_config)
+
+    # Fill the src storage
+    nb_sent = 0
+    for obj_type, objs in TEST_OBJECTS.items():
+        if obj_type == "origin_visit":
+            # these have non-consistent API and are unrelated with what we
+            # want to test here
+            continue
+        method = getattr(storage, obj_type + "_add")
+        method(objs)
+        nb_sent += len(objs)
+
+    dst_storage = get_storage(cls="memory")
+    caplog.set_level(logging.ERROR, "swh.journal.replay")
+    # Fill the storage from Kafka using anonymized topics
+    object_types = {
+        "content",
+        "directory",
+        "origin",
+        "origin_visit",
+        "release:anonymized",
+        "revision:anonymized",
+        "snapshot",
+        "skipped_content",
+    }
+
+    replayer = JournalClient(
+        brokers=kafka_server,
+        group_id=kafka_consumer_group,
+        prefix=kafka_prefix,
+        stop_after_objects=nb_sent,
+        object_types=list(object_types),
+    )
+    worker_fn = functools.partial(process_replay_objects, storage=dst_storage)
+
+    nb_inserted = replayer.process(worker_fn)
+    assert nb_sent == nb_inserted
+
+    expected_persons = {anonymize(person) for person in storage._persons}
+    assert set(dst_storage._persons) == expected_persons
diff --git a/swh/storage/writer.py b/swh/storage/writer.py
--- a/swh/storage/writer.py
+++ b/swh/storage/writer.py
@@ -3,20 +3,12 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-from typing import Iterable
+from typing import Any, Iterable
 
-from attr import evolve
+from attr import asdict, evolve, has
+from hashlib import sha256
 
-from swh.model.model import (
-    Origin,
-    OriginVisit,
-    Snapshot,
-    Directory,
-    Revision,
-    Release,
-    Content,
-    SkippedContent,
-)
+from swh.model import model
 
 try:
     from swh.journal.writer import get_journal_writer
@@ -25,14 +17,37 @@
     # mypy limitation, see https://github.com/python/mypy/issues/1153
 
 
+def anonymize(obj: Any) -> Any:
+    """Anonymize the model obj
+
+    Rebuild the ``obj`` entity with all instances of Person (recursively)
+    present in ``obj`` replaced with an anonymized version of the Person.
+
+    An anonymized ``Person`` is built by hashing the original Person's values
+    (fullname + name + email) and using that hash digest as the new fullname.
+
+    """
+    if not has(obj):
+        return obj
+    if isinstance(obj, model.Person):
+        tohash = obj.fullname + (obj.name or b"") + (obj.email or b"")
+        return model.Person(fullname=sha256(tohash).digest(), name=None, email=None)
+    return evolve(
+        obj, **{k: anonymize(v) for (k, v) in asdict(obj, recurse=False).items()}
+    )
+
+
 class JournalWriter:
     """Journal writer storage collaborator. It's in charge of adding objects to
     the journal.
 
     """
 
+    anonymizable = ["release", "revision"]
+
     def __init__(self, journal_writer):
         if journal_writer:
+            self.anonymize = journal_writer.pop("anonymize", False)
             if get_journal_writer is None:
                 raise EnvironmentError(
                     "You need the swh.journal package to use the "
@@ -41,48 +56,53 @@
             self.journal = get_journal_writer(**journal_writer)
         else:
             self.journal = None
+            self.anonymize = False
 
     def write_additions(self, obj_type, values) -> None:
         if self.journal:
             self.journal.write_additions(obj_type, values)
+            if self.anonymize and obj_type in self.anonymizable:
+                self.journal.write_additions(
+                    f"{obj_type}:anonymized", [anonymize(value) for value in values]
+                )
 
-    def content_add(self, contents: Iterable[Content]) -> None:
+    def content_add(self, contents: Iterable[model.Content]) -> None:
         """Add contents to the journal. Drop the data field if provided.
 
         """
         contents = [evolve(item, data=None) for item in contents]
         self.write_additions("content", contents)
 
-    def content_update(self, contents: Iterable[Content]) -> None:
+    def content_update(self, contents: Iterable[model.Content]) -> None:
         if self.journal:
             raise NotImplementedError("content_update is not supported by the journal.")
 
-    def content_add_metadata(self, contents: Iterable[Content]) -> None:
+    def content_add_metadata(self, contents: Iterable[model.Content]) -> None:
         self.content_add(contents)
 
-    def skipped_content_add(self, contents: Iterable[SkippedContent]) -> None:
+    def skipped_content_add(self, contents: Iterable[model.SkippedContent]) -> None:
         self.write_additions("skipped_content", contents)
 
-    def directory_add(self, directories: Iterable[Directory]) -> None:
+    def directory_add(self, directories: Iterable[model.Directory]) -> None:
         self.write_additions("directory", directories)
 
-    def revision_add(self, revisions: Iterable[Revision]) -> None:
+    def revision_add(self, revisions: Iterable[model.Revision]) -> None:
         self.write_additions("revision", revisions)
 
-    def release_add(self, releases: Iterable[Release]) -> None:
+    def release_add(self, releases: Iterable[model.Release]) -> None:
         self.write_additions("release", releases)
 
-    def snapshot_add(self, snapshots: Iterable[Snapshot]) -> None:
+    def snapshot_add(self, snapshots: Iterable[model.Snapshot]) -> None:
         self.write_additions("snapshot", snapshots)
 
-    def origin_visit_add(self, visits: Iterable[OriginVisit]) -> None:
+    def origin_visit_add(self, visits: Iterable[model.OriginVisit]) -> None:
         self.write_additions("origin_visit", visits)
 
-    def origin_visit_update(self, visits: Iterable[OriginVisit]) -> None:
+    def origin_visit_update(self, visits: Iterable[model.OriginVisit]) -> None:
         self.write_additions("origin_visit", visits)
 
-    def origin_visit_upsert(self, visits: Iterable[OriginVisit]) -> None:
+    def origin_visit_upsert(self, visits: Iterable[model.OriginVisit]) -> None:
         self.write_additions("origin_visit", visits)
 
-    def origin_add(self, origins: Iterable[Origin]) -> None:
+    def origin_add(self, origins: Iterable[model.Origin]) -> None:
         self.write_additions("origin", origins)