diff --git a/swh/storage/replay.py b/swh/storage/replay.py
index 0a15d08d..bbe30720 100644
--- a/swh/storage/replay.py
+++ b/swh/storage/replay.py
@@ -1,128 +1,132 @@
 # Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 import logging
 from typing import Any, Callable, Dict, Iterable, List
     from systemd.daemon import notify
 except ImportError:
     notify = None
 from swh.core.statsd import statsd
 from swh.storage.fixer import fix_objects
 from swh.model.model import (
 from swh.storage.exc import HashCollision
 logger = logging.getLogger(__name__)
 GRAPH_OPERATIONS_METRIC = "swh_graph_replayer_operations_total"
 GRAPH_DURATION_METRIC = "swh_graph_replayer_duration_seconds"
 object_converter_fn: Dict[str, Callable[[Dict], BaseModel]] = {
     "origin": Origin.from_dict,
     "origin_visit": OriginVisit.from_dict,
     "snapshot": Snapshot.from_dict,
     "revision": Revision.from_dict,
     "release": Release.from_dict,
     "directory": Directory.from_dict,
     "content": Content.from_dict,
     "skipped_content": SkippedContent.from_dict,
 def process_replay_objects(all_objects, *, storage):
     for (object_type, objects) in all_objects.items():
         logger.debug("Inserting %s %s objects", len(objects), object_type)
         with statsd.timed(GRAPH_DURATION_METRIC, tags={"object_type": object_type}):
             _insert_objects(object_type, objects, storage)
             GRAPH_OPERATIONS_METRIC, len(objects), tags={"object_type": object_type}
     if notify:
 def collision_aware_content_add(
     content_add_fn: Callable[[Iterable[Any]], None], contents: List[BaseContent]
 ) -> None:
     """Add contents to storage. If a hash collision is detected, an error is
        logged. Then this adds the other non colliding contents to the storage.
         content_add_fn: Storage content callable
         contents: List of contents or skipped contents to add to storage
     if not contents:
     colliding_content_hashes: List[Dict[str, Any]] = []
     while True:
         except HashCollision as e:
                     "algo": e.algo,
                     "hash": e.hash_id,  # hex hash id
                     "objects": e.colliding_contents,  # hex hashes
             colliding_hashes = e.colliding_content_hashes()
             # Drop the colliding contents from the transaction
             contents = [c for c in contents if c.hashes() not in colliding_hashes]
             # Successfully added contents, we are done
     if colliding_content_hashes:
         for collision in colliding_content_hashes:
             logger.error("Collision detected: %(collision)s", {"collision": collision})
 def _insert_objects(object_type: str, objects: List[Dict], storage) -> None:
     """Insert objects of type object_type in the storage.
     objects = fix_objects(object_type, objects)
     if object_type == "content":
+        # for bw compat, skipped content should now be delivered in the skipped_content
+        # topic
         contents: List[BaseContent] = []
         skipped_contents: List[BaseContent] = []
         for content in objects:
             c = BaseContent.from_dict(content)
             if isinstance(c, SkippedContent):
         collision_aware_content_add(storage.skipped_content_add, skipped_contents)
         collision_aware_content_add(storage.content_add_metadata, contents)
+    elif object_type == "skipped_content":
+        # chained with ``elif``: a plain ``if`` would start a new conditional chain,
+        # so "content" batches would also be evaluated against the branches below
+        skipped_contents = [SkippedContent.from_dict(obj) for obj in objects]
+        collision_aware_content_add(storage.skipped_content_add, skipped_contents)
     elif object_type == "origin_visit":
         visits: List[OriginVisit] = []
         origins: List[Origin] = []
         for obj in objects:
             visit = OriginVisit.from_dict(obj)
-    elif object_type in ("directory", "revision", "release", "snapshot", "origin"):
+    elif object_type in ("directory", "revision", "release", "snapshot", "origin",):
         method = getattr(storage, object_type + "_add")
         method(object_converter_fn[object_type](o) for o in objects)
         logger.warning("Received a series of %s, this should not happen", object_type)
diff --git a/swh/storage/tests/test_replay.py b/swh/storage/tests/test_replay.py
index 055b6b01..d31c57a7 100644
--- a/swh/storage/tests/test_replay.py
+++ b/swh/storage/tests/test_replay.py
@@ -1,390 +1,465 @@
 # Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 import datetime
 import functools
 import random
 import logging
 import dateutil
 from typing import Dict, List
 from confluent_kafka import Producer
 import pytest
-from swh.model.hashutil import hash_to_hex
+from swh.model.hashutil import hash_to_hex, MultiHash, DEFAULT_ALGORITHMS
 from swh.model.model import Content
 from swh.storage import get_storage
 from swh.storage.replay import process_replay_objects
 from swh.journal.serializers import key_to_kafka, value_to_kafka
 from swh.journal.client import JournalClient
 from swh.journal.tests.utils import MockedJournalClient, MockedKafkaWriter
 from swh.journal.tests.conftest import (
 UTC = datetime.timezone.utc
 storage_config = {"cls": "pipeline", "steps": [{"cls": "memory"},]}
 def test_storage_play(
     kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, caplog,
     """Optimal replayer scenario.
     - writes objects to the topic
     - replayer consumes objects from the topic and replay them
     kafka_prefix += ".swh.journal.objects"
     storage = get_storage(**storage_config)
     producer = Producer(
             "bootstrap.servers": kafka_server,
             "client.id": "test producer",
             "acks": "all",
     now = datetime.datetime.now(tz=UTC)
     # Fill Kafka
     nb_sent = 0
     nb_visits = 0
     for object_type, objects in TEST_OBJECT_DICTS.items():
         topic = f"{kafka_prefix}.{object_type}"
         for object_ in objects:
             key = bytes(random.randint(0, 255) for _ in range(40))
             object_ = object_.copy()
             if object_type == "content":
                 object_["ctime"] = now
             elif object_type == "origin_visit":
                 nb_visits += 1
                 object_["visit"] = nb_visits
                 topic=topic, key=key_to_kafka(key), value=value_to_kafka(object_),
             nb_sent += 1
     caplog.set_level(logging.ERROR, "swh.journal.replay")
     # Fill the storage from Kafka
     replayer = JournalClient(
     worker_fn = functools.partial(process_replay_objects, storage=storage)
     nb_inserted = 0
     while nb_inserted < nb_sent:
         nb_inserted += replayer.process(worker_fn)
     assert nb_sent == nb_inserted
     # Check the objects were actually inserted in the storage
     assert TEST_OBJECT_DICTS["revision"] == list(
         storage.revision_get([rev["id"] for rev in TEST_OBJECT_DICTS["revision"]])
     assert TEST_OBJECT_DICTS["release"] == list(
         storage.release_get([rel["id"] for rel in TEST_OBJECT_DICTS["release"]])
     origins = list(storage.origin_get([orig for orig in TEST_OBJECT_DICTS["origin"]]))
     assert TEST_OBJECT_DICTS["origin"] == [{"url": orig["url"]} for orig in origins]
     for origin in origins:
         origin_url = origin["url"]
         expected_visits = [
                 "origin": origin_url,
                 "date": dateutil.parser.parse(visit["date"]),
             for visit in TEST_OBJECT_DICTS["origin_visit"]
             if visit["origin"] == origin["url"]
         actual_visits = list(storage.origin_visit_get(origin_url))
         for visit in actual_visits:
             del visit["visit"]  # opaque identifier
         assert expected_visits == actual_visits
     input_contents = TEST_OBJECT_DICTS["content"]
     contents = storage.content_get_metadata([cont["sha1"] for cont in input_contents])
     assert len(contents) == len(input_contents)
     assert contents == {cont["sha1"]: [cont] for cont in input_contents}
     collision = 0
     for record in caplog.records:
         logtext = record.getMessage()
-        if "Colliding contents:" in logtext:
+        # must match the message logged by the replayer ("Collision detected: ..."),
+        # otherwise this check can never count anything
+        if "Collision detected:" in logtext:
             collision += 1
     assert collision == 0, "No collision should be detected"
 def test_storage_play_with_collision(
     kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, caplog,
     """Another replayer scenario with collisions.
     - writes objects to the topic, including colliding contents
     - replayer consumes objects from the topic and replay them
     - This drops the colliding contents from the replay when detected
     kafka_prefix += ".swh.journal.objects"
     storage = get_storage(**storage_config)
     producer = Producer(
             "bootstrap.servers": kafka_server,
             "client.id": "test producer",
             "enable.idempotence": "true",
     now = datetime.datetime.now(tz=UTC)
     # Fill Kafka
     nb_sent = 0
     nb_visits = 0
     for object_type, objects in TEST_OBJECT_DICTS.items():
         topic = f"{kafka_prefix}.{object_type}"
         for object_ in objects:
             key = bytes(random.randint(0, 255) for _ in range(40))
             object_ = object_.copy()
             if object_type == "content":
                 object_["ctime"] = now
             elif object_type == "origin_visit":
                 nb_visits += 1
                 object_["visit"] = nb_visits
                 topic=topic, key=key_to_kafka(key), value=value_to_kafka(object_),
             nb_sent += 1
     # Create collision in input data
     # They are not written in the destination
     for content in DUPLICATE_CONTENTS:
         topic = f"{kafka_prefix}.content"
             topic=topic, key=key_to_kafka(key), value=value_to_kafka(content),
         nb_sent += 1
     caplog.set_level(logging.ERROR, "swh.journal.replay")
     # Fill the storage from Kafka
     replayer = JournalClient(
     worker_fn = functools.partial(process_replay_objects, storage=storage)
     nb_inserted = 0
     while nb_inserted < nb_sent:
         nb_inserted += replayer.process(worker_fn)
     assert nb_sent == nb_inserted
     # Check the objects were actually inserted in the storage
     assert TEST_OBJECT_DICTS["revision"] == list(
         storage.revision_get([rev["id"] for rev in TEST_OBJECT_DICTS["revision"]])
     assert TEST_OBJECT_DICTS["release"] == list(
         storage.release_get([rel["id"] for rel in TEST_OBJECT_DICTS["release"]])
     origins = list(storage.origin_get([orig for orig in TEST_OBJECT_DICTS["origin"]]))
     assert TEST_OBJECT_DICTS["origin"] == [{"url": orig["url"]} for orig in origins]
     for origin in origins:
         origin_url = origin["url"]
         expected_visits = [
                 "origin": origin_url,
                 "date": dateutil.parser.parse(visit["date"]),
             for visit in TEST_OBJECT_DICTS["origin_visit"]
             if visit["origin"] == origin["url"]
         actual_visits = list(storage.origin_visit_get(origin_url))
         for visit in actual_visits:
             del visit["visit"]  # opaque identifier
         assert expected_visits == actual_visits
     input_contents = TEST_OBJECT_DICTS["content"]
     contents = storage.content_get_metadata([cont["sha1"] for cont in input_contents])
     assert len(contents) == len(input_contents)
     assert contents == {cont["sha1"]: [cont] for cont in input_contents}
     nb_collisions = 0
     actual_collision: Dict
     for record in caplog.records:
         logtext = record.getMessage()
         if "Collision detected:" in logtext:
             nb_collisions += 1
             actual_collision = record.args["collision"]
     assert nb_collisions == 1, "1 collision should be detected"
     algo = "sha1"
     assert actual_collision["algo"] == algo
     expected_colliding_hash = hash_to_hex(DUPLICATE_CONTENTS[0][algo])
     assert actual_collision["hash"] == expected_colliding_hash
     actual_colliding_hashes = actual_collision["objects"]
     assert len(actual_colliding_hashes) == len(DUPLICATE_CONTENTS)
     for content in DUPLICATE_CONTENTS:
         expected_content_hashes = {
             k: hash_to_hex(v) for k, v in Content.from_dict(content).hashes().items()
         assert expected_content_hashes in actual_colliding_hashes
 def _test_write_replay_origin_visit(visits: List[Dict]):
     """Helper function to write tests for origin_visit.
     Each visit (a dict) given in the 'visits' argument will be sent to
     a (mocked) kafka queue, which a in-memory-storage backed replayer is
     listening to.
     Check that corresponding origin visits entities are present in the storage
     and have correct values if they are not skipped.
     queue: List = []
     replayer = MockedJournalClient(queue)
     writer = MockedKafkaWriter(queue)
     # Note that flipping the order of these two insertions will crash
     # the test, because the legacy origin_format does not allow to create
     # the origin when needed (type is missing)
             "url": "http://example.com/",
             "type": "git",  # test the legacy origin format is accepted
     for visit in visits:
         writer.send("origin_visit", "foo", visit)
     queue_size = len(queue)
     assert replayer.stop_after_objects is None
     replayer.stop_after_objects = queue_size
     storage = get_storage(**storage_config)
     worker_fn = functools.partial(process_replay_objects, storage=storage)
     actual_visits = list(storage.origin_visit_get("http://example.com/"))
     assert len(actual_visits) == len(visits), actual_visits
     for vin, vout in zip(visits, actual_visits):
         vin = vin.copy()
         vout = vout.copy()
         assert vout.pop("origin") == "http://example.com/"
         vin.setdefault("type", "git")
         vin.setdefault("metadata", None)
         assert vin == vout
-def test_write_replay_origin_visit():
+def test_write_replay_origin_visit(kafka_server):
     """Test origin_visit when the 'origin' is just a string."""
     now = datetime.datetime.now(tz=UTC)
     visits = [
             "visit": 1,
             "origin": "http://example.com/",
             "date": now,
             "type": "git",
             "status": "partial",
             "snapshot": None,
 def test_write_replay_legacy_origin_visit1():
     """Origin_visit with no types should make the replayer crash
     We expect the journal's origin_visit topic to no longer reference such
     visits. If it does, the replayer must crash so we can fix the journal's
     now = datetime.datetime.now(tz=UTC)
     visit = {
         "visit": 1,
         "origin": "http://example.com/",
         "date": now,
         "status": "partial",
         "snapshot": None,
     now2 = datetime.datetime.now(tz=UTC)
     visit2 = {
         "visit": 2,
         "origin": {"url": "http://example.com/"},
         "date": now2,
         "status": "partial",
         "snapshot": None,
     for origin_visit in [visit, visit2]:
         with pytest.raises(ValueError, match="Old origin visit format"):
 def test_write_replay_legacy_origin_visit2():
     """Test origin_visit when 'type' is missing from the visit, but not
     from the origin."""
     now = datetime.datetime.now(tz=UTC)
     visits = [
             "visit": 1,
             "origin": {"url": "http://example.com/", "type": "git",},
             "date": now,
             "type": "git",
             "status": "partial",
             "snapshot": None,
 def test_write_replay_legacy_origin_visit3():
     """Test origin_visit when the origin is a dict"""
     now = datetime.datetime.now(tz=UTC)
     visits = [
             "visit": 1,
             "origin": {"url": "http://example.com/",},
             "date": now,
             "type": "git",
             "status": "partial",
             "snapshot": None,
+def test_replay_skipped_content(kafka_server, kafka_prefix):
+    """Test the 'skipped_content' topic is properly replayed."""
+    # send the objects on the dedicated 'skipped_content' topic
+    _check_replay_skipped_content(kafka_server, kafka_prefix, "skipped_content")
+def test_replay_skipped_content_bwcompat(kafka_server, kafka_prefix):
+    """Test the 'content' topic can be used to replay SkippedContent objects."""
+    # backward compatibility: send the SkippedContent dicts on the 'content' topic
+    # instead of the dedicated 'skipped_content' one
+    _check_replay_skipped_content(kafka_server, kafka_prefix, "content")
+def _check_replay_skipped_content(kafka_server, kafka_prefix, topic):
+    """Produce skipped-content dicts on ``topic`` and check they are replayed.
+
+    The generated objects are sent to ``{kafka_prefix}.{topic}``, consumed by a
+    JournalClient that feeds ``process_replay_objects`` with an in-memory storage,
+    and must then be known to the storage as skipped contents only (none of them
+    may be found as a regular content).
+
+    Args:
+        kafka_server: address of the Kafka broker to produce to and consume from
+        kafka_prefix: topic prefix shared by the producer and the replayer
+        topic: unprefixed topic name the objects are written to
+    """
+    skipped_contents = _gen_skipped_contents(100)
+    nb_sent = len(skipped_contents)
+    producer = Producer(
+        {
+            "bootstrap.servers": kafka_server,
+            "client.id": "test producer",
+            "acks": "all",
+        }
+    )
+    replayer = JournalClient(
+        brokers=[kafka_server],
+        group_id="test consumer",
+        prefix=kafka_prefix,
+        stop_after_objects=nb_sent,
+    )
+    assert f"{kafka_prefix}.skipped_content" in replayer.subscription
+    for obj in skipped_contents:
+        obj.pop("data", None)
+        producer.produce(
+            topic=f"{kafka_prefix}.{topic}",
+            key=key_to_kafka({"sha1": obj["sha1"]}),
+            value=value_to_kafka(obj),
+        )
+    producer.flush()
+    storage = get_storage(cls="memory")
+    worker_fn = functools.partial(process_replay_objects, storage=storage)
+    nb_inserted = replayer.process(worker_fn)
+    assert nb_sent == nb_inserted
+    # the objects must not have been stored as regular contents
+    for content in skipped_contents:
+        assert not storage.content_find({"sha1": content["sha1"]})
+    # no skipped_content_find API endpoint, so use this instead
+    assert not list(storage.skipped_content_missing(skipped_contents))
+def _updated(d1, d2):
+    """Update d1 in place with d2, remove its 'data' key if any, and return d1."""
+    d1.update(d2)
+    d1.pop("data", None)
+    return d1
+def _gen_skipped_contents(n=10):
+    """Return a list of n skipped-content dicts built from distinct dummy data.
+
+    Each entry starts from the MultiHash digest of ``foo<i>`` (computed for
+    DEFAULT_ALGORITHMS plus "length") and is completed with fixed 'status' and
+    'reason' values, a per-item 'origin' URL and a shared 'ctime'.
+    """
+    # we do not use the hypothesis strategy here because this does not play well with
+    # pytest fixtures, and it makes test execution very slow
+    algos = DEFAULT_ALGORITHMS | {"length"}
+    now = datetime.datetime.now(tz=UTC)
+    return [
+        _updated(
+            MultiHash.from_data(data=f"foo{i}".encode(), hash_names=algos).digest(),
+            {
+                "status": "absent",
+                "reason": "why not",
+                "origin": f"https://somewhere/{i}",
+                "ctime": now,
+            },
+        )
+        for i in range(n)
+    ]
diff --git a/swh/storage/writer.py b/swh/storage/writer.py
index 70cd8a58..0aaba63b 100644
--- a/swh/storage/writer.py
+++ b/swh/storage/writer.py
@@ -1,88 +1,88 @@
 # Copyright (C) 2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 from typing import Iterable
 from attr import evolve
 from swh.model.model import (
     from swh.journal.writer import get_journal_writer
 except ImportError:
     get_journal_writer = None  # type: ignore
     # mypy limitation, see https://github.com/python/mypy/issues/1153
 class JournalWriter:
     """Journal writer storage collaborator. It's in charge of adding objects to
     the journal.
     def __init__(self, journal_writer):
         if journal_writer:
             if get_journal_writer is None:
                 raise EnvironmentError(
                     "You need the swh.journal package to use the "
                     "journal_writer feature"
             self.journal = get_journal_writer(**journal_writer)
             self.journal = None
     def write_additions(self, obj_type, values) -> None:
         if self.journal:
             self.journal.write_additions(obj_type, values)
     def content_add(self, contents: Iterable[Content]) -> None:
         """Add contents to the journal. Drop the data field if provided.
         contents = [evolve(item, data=None) for item in contents]
         self.write_additions("content", contents)
     def content_update(self, contents: Iterable[Content]) -> None:
         if self.journal:
             raise NotImplementedError("content_update is not supported by the journal.")
     def content_add_metadata(self, contents: Iterable[Content]) -> None:
     def skipped_content_add(self, contents: Iterable[SkippedContent]) -> None:
-        self.write_additions("content", contents)
+        self.write_additions("skipped_content", contents)
     def directory_add(self, directories: Iterable[Directory]) -> None:
         self.write_additions("directory", directories)
     def revision_add(self, revisions: Iterable[Revision]) -> None:
         self.write_additions("revision", revisions)
     def release_add(self, releases: Iterable[Release]) -> None:
         self.write_additions("release", releases)
     def snapshot_add(self, snapshots: Iterable[Snapshot]) -> None:
         self.write_additions("snapshot", snapshots)
     def origin_visit_add(self, visits: Iterable[OriginVisit]) -> None:
         self.write_additions("origin_visit", visits)
     def origin_visit_update(self, visits: Iterable[OriginVisit]) -> None:
         self.write_additions("origin_visit", visits)
     def origin_visit_upsert(self, visits: Iterable[OriginVisit]) -> None:
         self.write_additions("origin_visit", visits)
     def origin_add(self, origins: Iterable[Origin]) -> None:
         self.write_additions("origin", origins)