diff --git a/swh/storage/cassandra/model.py b/swh/storage/cassandra/model.py
index a7803ae7..00eb1057 100644
--- a/swh/storage/cassandra/model.py
+++ b/swh/storage/cassandra/model.py
@@ -1,318 +1,323 @@
 # Copyright (C) 2020  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 """Classes representing tables in the Cassandra database.
 
 They are very close to classes found in swh.model.model, but most of
 them are subtly different:
 
 * Large objects are split into other classes (e.g. RevisionRow has no
   'parents' field, because parents are stored in a different table,
   represented by RevisionParentRow)
 * They have a "cols" field, which returns the list of column names
   of the table
 * They only use types that map directly to Cassandra's schema (i.e. no enums)
 
 Therefore, this model doesn't reuse swh.model.model, except for types
 that can be mapped to UDTs (Person and TimestampWithTimezone).
 """
 
 import dataclasses
 import datetime
 from typing import Any, ClassVar, Dict, List, Optional, Tuple, Type, TypeVar
 
 from swh.model.model import Person, TimestampWithTimezone
 
 MAGIC_NULL_PK = b"<null>"
 """
 NULLs (or all-empty blobs) are not allowed in primary keys; instead we use a
 special value that can't possibly be a valid hash.
 """
 
 
 T = TypeVar("T", bound="BaseRow")
 
 
 def content_index_table_name(algo: str, skipped_content: bool) -> str:
     """Given an algorithm name, returns the name of one of the 'content_by_*'
    and 'skipped_content_by_*' tables that serve as indexes for the 'content'
     and 'skipped_content' tables based on this algorithm's hashes.
 
     For now it is a simple substitution, but future versions may append a version
     number to it, if needed for schema updates."""
     if skipped_content:
         return f"skipped_content_by_{algo}"
     else:
         return f"content_by_{algo}"
 
 
 class BaseRow:
     TABLE: ClassVar[str]
     PARTITION_KEY: ClassVar[Tuple[str, ...]]
     CLUSTERING_KEY: ClassVar[Tuple[str, ...]] = ()
 
     @classmethod
     def from_dict(cls: Type[T], d: Dict[str, Any]) -> T:
         return cls(**d)  # type: ignore
 
     @classmethod
     def cols(cls) -> List[str]:
         return [field.name for field in dataclasses.fields(cls)]
 
     def to_dict(self) -> Dict[str, Any]:
         return dataclasses.asdict(self)
 
 
 @dataclasses.dataclass
 class ContentRow(BaseRow):
     TABLE = "content"
-    PARTITION_KEY = ("sha1", "sha1_git", "sha256", "blake2s256")
+    PARTITION_KEY: ClassVar[Tuple[str, ...]] = (
+        "sha1",
+        "sha1_git",
+        "sha256",
+        "blake2s256",
+    )
 
     sha1: bytes
     sha1_git: bytes
     sha256: bytes
     blake2s256: bytes
     length: int
     ctime: datetime.datetime
     status: str
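 
 # A minimal sketch of how the BaseRow helpers fit together (hypothetical
 # values, not actual storage code):
 #
 #   row = ContentRow(
 #       sha1=b"...", sha1_git=b"...", sha256=b"...", blake2s256=b"...",
 #       length=42, ctime=datetime.datetime.now(datetime.timezone.utc),
 #       status="visible",
 #   )
 #   assert ContentRow.cols()[0] == "sha1"
 #   assert ContentRow.from_dict(row.to_dict()) == row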
 
 
 @dataclasses.dataclass
 class SkippedContentRow(BaseRow):
     TABLE = "skipped_content"
     PARTITION_KEY = ("sha1", "sha1_git", "sha256", "blake2s256")
 
     sha1: Optional[bytes]
     sha1_git: Optional[bytes]
     sha256: Optional[bytes]
     blake2s256: Optional[bytes]
     length: Optional[int]
     ctime: Optional[datetime.datetime]
     status: str
     reason: str
     origin: str
 
     @classmethod
     def from_dict(cls, d: Dict[str, Any]) -> "SkippedContentRow":
         d = d.copy()
         for k in ("sha1", "sha1_git", "sha256", "blake2s256"):
             if d[k] == MAGIC_NULL_PK:
                 d[k] = None
         return super().from_dict(d)
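 
     # For instance, from_dict({"sha1": MAGIC_NULL_PK, ...}) yields a row
     # with sha1=None, undoing the substitution applied when the row was
     # written.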
 
 
 @dataclasses.dataclass
 class DirectoryRow(BaseRow):
     TABLE = "directory"
     PARTITION_KEY = ("id",)
 
     id: bytes
 
 
 @dataclasses.dataclass
 class DirectoryEntryRow(BaseRow):
     TABLE = "directory_entry"
     PARTITION_KEY = ("directory_id",)
     CLUSTERING_KEY = ("name",)
 
     directory_id: bytes
     name: bytes
     target: bytes
     perms: int
     type: str
 
 
 @dataclasses.dataclass
 class RevisionRow(BaseRow):
     TABLE = "revision"
     PARTITION_KEY = ("id",)
 
     id: bytes
     date: Optional[TimestampWithTimezone]
     committer_date: Optional[TimestampWithTimezone]
     type: str
     directory: bytes
     message: bytes
     author: Person
     committer: Person
     synthetic: bool
     metadata: str
     extra_headers: dict
 
 
 @dataclasses.dataclass
 class RevisionParentRow(BaseRow):
     TABLE = "revision_parent"
     PARTITION_KEY = ("id",)
     CLUSTERING_KEY = ("parent_rank",)
 
     id: bytes
     parent_rank: int
     parent_id: bytes
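 
 # For example, a revision with two parents is stored as one RevisionRow plus
 # two RevisionParentRow entries sharing its id, with parent_rank preserving
 # the parent ordering (a sketch, with hypothetical ids):
 #
 #   RevisionParentRow(id=rev_id, parent_rank=0, parent_id=parent0_id)
 #   RevisionParentRow(id=rev_id, parent_rank=1, parent_id=parent1_id)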
 
 
 @dataclasses.dataclass
 class ReleaseRow(BaseRow):
     TABLE = "release"
     PARTITION_KEY = ("id",)
 
     id: bytes
     target_type: str
     target: bytes
     date: TimestampWithTimezone
     name: bytes
     message: bytes
     author: Person
     synthetic: bool
 
 
 @dataclasses.dataclass
 class SnapshotRow(BaseRow):
     TABLE = "snapshot"
     PARTITION_KEY = ("id",)
 
     id: bytes
 
 
 @dataclasses.dataclass
 class SnapshotBranchRow(BaseRow):
     TABLE = "snapshot_branch"
     PARTITION_KEY = ("snapshot_id",)
     CLUSTERING_KEY = ("name",)
 
     snapshot_id: bytes
     name: bytes
     target_type: Optional[str]
     target: Optional[bytes]
 
 
 @dataclasses.dataclass
 class OriginVisitRow(BaseRow):
     TABLE = "origin_visit"
     PARTITION_KEY = ("origin",)
     CLUSTERING_KEY = ("visit",)
 
     origin: str
     visit: int
     date: datetime.datetime
     type: str
 
 
 @dataclasses.dataclass
 class OriginVisitStatusRow(BaseRow):
     TABLE = "origin_visit_status"
     PARTITION_KEY = ("origin",)
     CLUSTERING_KEY = ("visit", "date")
 
     origin: str
     visit: int
     date: datetime.datetime
     type: str
     status: str
     metadata: str
     snapshot: bytes
 
     @classmethod
     def from_dict(cls: Type[T], d: Dict[str, Any]) -> T:
         return cls(**d)  # type: ignore
 
 
 @dataclasses.dataclass
 class OriginRow(BaseRow):
     TABLE = "origin"
     PARTITION_KEY = ("sha1",)
 
     sha1: bytes
     url: str
     next_visit_id: int
 
 
 @dataclasses.dataclass
 class MetadataAuthorityRow(BaseRow):
     TABLE = "metadata_authority"
     PARTITION_KEY = ("url",)
     CLUSTERING_KEY = ("type",)
 
     url: str
     type: str
     metadata: str
 
 
 @dataclasses.dataclass
 class MetadataFetcherRow(BaseRow):
     TABLE = "metadata_fetcher"
     PARTITION_KEY = ("name",)
     CLUSTERING_KEY = ("version",)
 
     name: str
     version: str
     metadata: str
 
 
 @dataclasses.dataclass
 class RawExtrinsicMetadataRow(BaseRow):
     TABLE = "raw_extrinsic_metadata"
     PARTITION_KEY = ("target",)
     CLUSTERING_KEY = (
         "authority_type",
         "authority_url",
         "discovery_date",
         "id",
     )
 
     id: bytes
 
     type: str
     target: str
 
     authority_type: str
     authority_url: str
     discovery_date: datetime.datetime
     fetcher_name: str
     fetcher_version: str
 
     format: str
     metadata: bytes
 
     origin: Optional[str]
     visit: Optional[int]
     snapshot: Optional[str]
     release: Optional[str]
     revision: Optional[str]
     path: Optional[bytes]
     directory: Optional[str]
 
 
 @dataclasses.dataclass
 class ObjectCountRow(BaseRow):
     TABLE = "object_count"
     PARTITION_KEY = ("partition_key",)
     CLUSTERING_KEY = ("object_type",)
 
     partition_key: int
     object_type: str
     count: int
 
 
 @dataclasses.dataclass
 class ExtIDRow(BaseRow):
     TABLE = "extid"
     PARTITION_KEY = ("target", "target_type", "extid", "extid_type")
 
     extid_type: str
     extid: bytes
     target_type: str
     target: bytes
 
 
 @dataclasses.dataclass
 class ExtIDByTargetRow(BaseRow):
     TABLE = "extid_by_target"
     PARTITION_KEY = ("target_type", "target")
     CLUSTERING_KEY = ("target_token",)
 
     target_type: str
     target: bytes
     target_token: int
diff --git a/swh/storage/tests/test_cassandra_migration.py b/swh/storage/tests/test_cassandra_migration.py
index 40c1d262..1e46f52e 100644
--- a/swh/storage/tests/test_cassandra_migration.py
+++ b/swh/storage/tests/test_cassandra_migration.py
@@ -1,163 +1,341 @@
 # Copyright (C) 2021  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 """This module tests the migration capabilities of the Cassandra backend,
 by sending CQL commands (e.g. 'ALTER TABLE'), and
 by monkey-patching large parts of the implementation to simulate code updates."""
 
 import dataclasses
 import functools
 import operator
 from typing import Dict, Iterable, Optional
 
 import attr
 import pytest
 
 from swh.model.model import Content
 from swh.storage import get_storage
 from swh.storage.cassandra.cql import (
     CqlRunner,
     _prepared_insert_statement,
     _prepared_select_statement,
 )
 from swh.storage.cassandra.model import ContentRow
 from swh.storage.cassandra.schema import CONTENT_INDEX_TEMPLATE, HASH_ALGORITHMS
 from swh.storage.cassandra.storage import CassandraStorage
 from swh.storage.exc import StorageArgumentException
 
 from .storage_data import StorageData
 from .test_cassandra import (  # noqa, needed for swh_storage fixture
     cassandra_cluster,
     keyspace,
     swh_storage_backend_config,
 )
 
+##############################
+# Common structures
+
 
 def byte_xor_hash(data):
     # Behold, a one-line hash function:
     return bytes([functools.reduce(operator.xor, data)])
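 
 # e.g. byte_xor_hash(b"\x01\x02") == bytes([0x01 ^ 0x02]) == b"\x03"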
 
 
 @attr.s
 class ContentWithXor(Content):
     """An hypothetical upgrade of Content with an extra "hash"."""
 
     byte_xor = attr.ib(type=bytes, default=None)
 
 
+##############################
+# Test simple migrations
+
+
 @dataclasses.dataclass
 class ContentRowWithXor(ContentRow):
-    """An hypothetical upgrade of ContentRow with an extra "hash"."""
+    """An hypothetical upgrade of ContentRow with an extra "hash",
+    but not in the primary key."""
 
     byte_xor: bytes
 
 
 class CqlRunnerWithXor(CqlRunner):
+    """An hypothetical upgrade of ContentRow with an extra "hash",
+    but not in the primary key."""
+
     @_prepared_select_statement(
         ContentRowWithXor,
         f"WHERE {' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS))}",
     )
     def content_get_from_pk(
         self, content_hashes: Dict[str, bytes], *, statement
     ) -> Optional[ContentRow]:
         rows = list(
             self._execute_with_retries(
                 statement, [content_hashes[algo] for algo in HASH_ALGORITHMS]
             )
         )
         assert len(rows) <= 1
         if rows:
             return ContentRowWithXor(**rows[0])
         else:
             return None
 
     @_prepared_select_statement(
-        ContentRowWithXor, f"WHERE token({', '.join(ContentRow.PARTITION_KEY)}) = ?"
+        ContentRowWithXor,
+        f"WHERE token({', '.join(ContentRowWithXor.PARTITION_KEY)}) = ?",
     )
-    def content_get_from_token(self, token, *, statement) -> Iterable[ContentRow]:
+    def content_get_from_token(
+        self, token, *, statement
+    ) -> Iterable[ContentRowWithXor]:
         return map(
             ContentRowWithXor.from_dict, self._execute_with_retries(statement, [token])
         )
 
     # Redecorate content_add_prepare with the new ContentRow class
     content_add_prepare = _prepared_insert_statement(ContentRowWithXor)(  # type: ignore
         CqlRunner.content_add_prepare.__wrapped__  # type: ignore
     )
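 
     # (This redecoration presumably works because _prepared_insert_statement
     # derives its INSERT column list from the row class, so applying it to
     # the undecorated method picks up the extra byte_xor column.)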
 
 
 def test_add_content_column(
     swh_storage: CassandraStorage, swh_storage_backend_config, mocker  # noqa
 ) -> None:
     """Adds a column to the 'content' table and a new matching index.
     This is a simple migration, as it does not require an update to the primary key.
     """
     content_xor_hash = byte_xor_hash(StorageData.content.data)
 
     # First insert some existing data
     swh_storage.content_add([StorageData.content, StorageData.content2])
 
     # Then update the schema
     swh_storage._cql_runner._session.execute("ALTER TABLE content ADD byte_xor blob")
     for statement in CONTENT_INDEX_TEMPLATE.split("\n\n"):
         swh_storage._cql_runner._session.execute(statement.format(main_algo="byte_xor"))
 
     # Should not affect the running code at all:
     assert swh_storage.content_get([StorageData.content.sha1]) == [
         attr.evolve(StorageData.content, data=None)
     ]
     with pytest.raises(StorageArgumentException):
         swh_storage.content_find({"byte_xor": content_xor_hash})
 
     # Then update the running code:
     new_hash_algos = HASH_ALGORITHMS + ["byte_xor"]
     mocker.patch("swh.storage.cassandra.storage.HASH_ALGORITHMS", new_hash_algos)
     mocker.patch("swh.storage.cassandra.cql.HASH_ALGORITHMS", new_hash_algos)
     mocker.patch("swh.model.model.DEFAULT_ALGORITHMS", new_hash_algos)
     mocker.patch("swh.storage.cassandra.storage.Content", ContentWithXor)
     mocker.patch("swh.storage.cassandra.storage.ContentRow", ContentRowWithXor)
     mocker.patch("swh.storage.cassandra.model.ContentRow", ContentRowWithXor)
     mocker.patch("swh.storage.cassandra.storage.CqlRunner", CqlRunnerWithXor)
 
     # Forge new objects with this extra hash:
     new_content = ContentWithXor.from_dict(
         {
             "byte_xor": byte_xor_hash(StorageData.content.data),
             **StorageData.content.to_dict(),
         }
     )
     new_content2 = ContentWithXor.from_dict(
         {
             "byte_xor": byte_xor_hash(StorageData.content2.data),
             **StorageData.content2.to_dict(),
         }
     )
 
     # Simulates a restart:
     swh_storage._set_cql_runner()
 
     # Old algos still work, and return the new object type:
     assert swh_storage.content_get([StorageData.content.sha1]) == [
         attr.evolve(new_content, data=None, byte_xor=None)
     ]
 
     # The new algo does not work yet, as we did not backfill it:
     assert swh_storage.content_find({"byte_xor": content_xor_hash}) == []
 
     # A normal storage would not overwrite, because the object already exists
     # and it is not aware the existing row is missing a field:
     swh_storage.content_add([new_content, new_content2])
     assert swh_storage.content_find({"byte_xor": content_xor_hash}) == []
 
     # Backfill (in production this would be done with a replayer reading from
     # the journal):
     overwriting_swh_storage = get_storage(
         allow_overwrite=True, **swh_storage_backend_config
     )
     overwriting_swh_storage.content_add([new_content, new_content2])
 
     # Now, the object can be found:
     assert swh_storage.content_find({"byte_xor": content_xor_hash}) == [
         attr.evolve(new_content, data=None)
     ]
+
+
+##############################
+# Test complex migrations
+
+
+@dataclasses.dataclass
+class ContentRowWithXorPK(ContentRow):
+    """An hypothetical upgrade of ContentRow with an extra "hash",
+    in the primary key."""
+
+    TABLE = "content_v2"
+    PARTITION_KEY = ("sha1", "sha1_git", "sha256", "blake2s256", "byte_xor")
+
+    byte_xor: bytes
+
+
+class CqlRunnerWithXorPK(CqlRunner):
+    """An hypothetical upgrade of ContentRow with an extra "hash",
+    but not in the primary key."""
+
+    @_prepared_select_statement(
+        ContentRowWithXorPK,
+        f"WHERE {' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS))} AND byte_xor=?",
+    )
+    def content_get_from_pk(
+        self, content_hashes: Dict[str, bytes], *, statement
+    ) -> Optional[ContentRow]:
+        rows = list(
+            self._execute_with_retries(
+                statement,
+                [content_hashes[algo] for algo in HASH_ALGORITHMS + ["byte_xor"]],
+            )
+        )
+        assert len(rows) <= 1
+        if rows:
+            return ContentRowWithXorPK(**rows[0])
+        else:
+            return None
+
+    @_prepared_select_statement(
+        ContentRowWithXorPK,
+        f"WHERE token({', '.join(ContentRowWithXorPK.PARTITION_KEY)}) = ?",
+    )
+    def content_get_from_token(
+        self, token, *, statement
+    ) -> Iterable[ContentRowWithXorPK]:
+        return map(
+            ContentRowWithXorPK.from_dict,
+            self._execute_with_retries(statement, [token]),
+        )
+
+    # Redecorate content_add_prepare with the new ContentRow class
+    content_add_prepare = _prepared_insert_statement(ContentRowWithXorPK)(  # type: ignore  # noqa
+        CqlRunner.content_add_prepare.__wrapped__  # type: ignore
+    )
+
+
+def test_change_content_pk(
+    swh_storage: CassandraStorage, swh_storage_backend_config, mocker  # noqa
+) -> None:
+    """Adds a column to the 'content' table and a new matching index;
+    and make this new column part of the primary key
+    This is a complex migration, as it requires copying the whole table
+    """
+    content_xor_hash = byte_xor_hash(StorageData.content.data)
+
+    # First insert some existing data
+    swh_storage.content_add([StorageData.content, StorageData.content2])
+
+    # Then add a new table and a new index
+    swh_storage._cql_runner._session.execute(
+        """
+        CREATE TABLE IF NOT EXISTS content_v2 (
+            sha1          blob,
+            sha1_git      blob,
+            sha256        blob,
+            blake2s256    blob,
+            byte_xor      blob,
+            length        bigint,
+            ctime         timestamp,
+                -- creation time, i.e. time of (first) injection into the storage
+            status        ascii,
+            PRIMARY KEY ((sha1, sha1_git, sha256, blake2s256, byte_xor))
+        );"""
+    )
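+    # Note: the doubled parentheses in "PRIMARY KEY ((sha1, ..., byte_xor))"
+    # make all five hashes the composite partition key, with no clustering
+    # columns, matching ContentRowWithXorPK.PARTITION_KEY above.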
+    for statement in CONTENT_INDEX_TEMPLATE.split("\n\n"):
+        swh_storage._cql_runner._session.execute(statement.format(main_algo="byte_xor"))
+
+    # Should not affect the running code at all:
+    assert swh_storage.content_get([StorageData.content.sha1]) == [
+        attr.evolve(StorageData.content, data=None)
+    ]
+    with pytest.raises(StorageArgumentException):
+        swh_storage.content_find({"byte_xor": content_xor_hash})
+
+    # Then update the running code:
+    new_hash_algos = HASH_ALGORITHMS + ["byte_xor"]
+    mocker.patch("swh.storage.cassandra.storage.HASH_ALGORITHMS", new_hash_algos)
+    mocker.patch("swh.storage.cassandra.cql.HASH_ALGORITHMS", new_hash_algos)
+    mocker.patch("swh.model.model.DEFAULT_ALGORITHMS", new_hash_algos)
+    mocker.patch("swh.storage.cassandra.storage.Content", ContentWithXor)
+    mocker.patch("swh.storage.cassandra.storage.ContentRow", ContentRowWithXorPK)
+    mocker.patch("swh.storage.cassandra.model.ContentRow", ContentRowWithXorPK)
+    mocker.patch("swh.storage.cassandra.storage.CqlRunner", CqlRunnerWithXorPK)
+
+    # Forge new objects with this extra hash:
+    new_content = ContentWithXor.from_dict(
+        {
+            "byte_xor": byte_xor_hash(StorageData.content.data),
+            **StorageData.content.to_dict(),
+        }
+    )
+    new_content2 = ContentWithXor.from_dict(
+        {
+            "byte_xor": byte_xor_hash(StorageData.content2.data),
+            **StorageData.content2.to_dict(),
+        }
+    )
+
+    # Replay to the new table.
+    # In production this would be done with a replayer reading from the journal,
+    # while loaders would still write to the DB.
+    overwriting_swh_storage = get_storage(
+        allow_overwrite=True, **swh_storage_backend_config
+    )
+    overwriting_swh_storage.content_add([new_content, new_content2])
+
+    # Old algos still work, and return the new object type;
+    # but the byte_xor value is None because it is only available in the new
+    # table, which this storage is not yet configured to use
+    assert swh_storage.content_get([StorageData.content.sha1]) == [
+        attr.evolve(new_content, data=None, byte_xor=None)
+    ]
+
+    # When the replayer gets close to the end of the log, loaders are stopped
+    # so the replayer can catch up with the end of the log.
+    # When it does, we can switch over to the new swh-storage code.
+
+    # Simulates a restart:
+    swh_storage._set_cql_runner()
+
+    # Now, the object can be found with the new hash:
+    assert swh_storage.content_find({"byte_xor": content_xor_hash}) == [
+        attr.evolve(new_content, data=None)
+    ]
+
+    # Remove the old table:
+    swh_storage._cql_runner._session.execute("DROP TABLE content")
+
+    # The object is still available, because the old table is not used anymore:
+    assert swh_storage.content_find({"byte_xor": content_xor_hash}) == [
+        attr.evolve(new_content, data=None)
+    ]
+
+    # THE END.
+
+    # Test teardown expects a table with this name to exist:
+    swh_storage._cql_runner._session.execute(
+        "CREATE TABLE content (foo blob PRIMARY KEY);"
+    )
+
+    # Clean up this table; test teardown does not know about it:
+    swh_storage._cql_runner._session.execute("DROP TABLE content_v2;")