diff --git a/swh/storage/tests/data/sql-v0.18.0/10-superuser-init.sql b/swh/storage/tests/data/sql-v0.18.0/10-superuser-init.sql
new file mode 100644
--- /dev/null
+++ b/swh/storage/tests/data/sql-v0.18.0/10-superuser-init.sql
@@ -0,0 +1,27 @@
+-- requires being a Postgres superuser
+
+create extension if not exists btree_gist;
+create extension if not exists pgcrypto;
+create extension if not exists pg_trgm;
+
+-- courtesy of Andreas 'ads' Scherbaum in
+-- https://andreas.scherbaum.la/blog/archives/346-create-language-if-not-exist.html
+create or replace function public.create_plpgsql_language ()
+    returns text
+    as $$
+        create language plpgsql;
+        select 'language plpgsql created'::text;
+    $$
+language 'sql';
+
+select case when
+    (select true::boolean
+       from pg_language
+       where lanname='plpgsql')
+    then
+      (select 'language already installed'::text)
+    else
+      (select public.create_plpgsql_language())
+    end;
+
+drop function public.create_plpgsql_language ();
diff --git a/swh/storage/tests/data/sql-v0.18.0/15-flavor.sql b/swh/storage/tests/data/sql-v0.18.0/15-flavor.sql
new file mode 100644
--- /dev/null
+++ b/swh/storage/tests/data/sql-v0.18.0/15-flavor.sql
@@ -0,0 +1,22 @@
+-- database flavor
+create type database_flavor as enum (
+  'default', -- default: full index availability for deduplication and read queries
+  'mirror', -- mirror: reduced indexes to allow for out of order insertions
+  'read_replica' -- read replica: minimal indexes to allow read queries
+);
+comment on type database_flavor is 'Flavor of the current database';
+
+create table dbflavor (
+  flavor      database_flavor,
+  single_row  char(1) primary key default 'x',
+  check       (single_row = 'x')
+);
+comment on table dbflavor is 'Database flavor storage';
+comment on column dbflavor.flavor is 'Database flavor currently deployed';
+comment on column dbflavor.single_row is 'Bogus column to force the table to have a single row';
+
+create or replace function swh_get_dbflavor() returns database_flavor language sql stable as $$
+  select coalesce((select flavor from dbflavor), 'default');
+$$;
+
+comment on function swh_get_dbflavor is 'Get the flavor of the database currently deployed';
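+
+-- Illustrative usage (kept as comments so this file only defines the schema;
+-- the flavor value below is just an example):
+--
+--   insert into dbflavor (flavor) values ('mirror');
+--   select swh_get_dbflavor();  -- now returns 'mirror'; with no row in dbflavor it returns 'default'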
diff --git a/swh/storage/tests/data/sql-v0.18.0/20-enums.sql b/swh/storage/tests/data/sql-v0.18.0/20-enums.sql
new file mode 100644
--- /dev/null
+++ b/swh/storage/tests/data/sql-v0.18.0/20-enums.sql
@@ -0,0 +1,23 @@
+---
+--- Software Heritage Data Types
+---
+
+create type content_status as enum ('absent', 'visible', 'hidden');
+comment on type content_status is 'Content visibility';
+
+create type revision_type as enum ('git', 'tar', 'dsc', 'svn', 'hg');
+comment on type revision_type is 'Possible revision types';
+
+create type object_type as enum ('content', 'directory', 'revision', 'release', 'snapshot');
+comment on type object_type is 'Data object types stored in data model';
+
+create type snapshot_target as enum ('content', 'directory', 'revision', 'release', 'snapshot', 'alias');
+comment on type snapshot_target is 'Types of targets for snapshot branches';
+
+create type origin_visit_state as enum (
+  'created',
+  'ongoing',
+  'full',
+  'partial'
+);
+comment on type origin_visit_state IS 'Possible origin visit values';
diff --git a/swh/storage/tests/data/sql-v0.18.0/30-schema.sql b/swh/storage/tests/data/sql-v0.18.0/30-schema.sql
new file mode 100644
--- /dev/null
+++ b/swh/storage/tests/data/sql-v0.18.0/30-schema.sql
@@ -0,0 +1,499 @@
+---
+--- SQL implementation of the Software Heritage data model
+---
+
+-- schema versions
+create table dbversion
+(
+  version     int primary key,
+  release     timestamptz,
+  description text
+);
+
+comment on table dbversion is 'Details of current db version';
+comment on column dbversion.version is 'SQL schema version';
+comment on column dbversion.release is 'Version deployment timestamp';
+comment on column dbversion.description is 'Release description';
+
+-- latest schema version
+insert into dbversion(version, release, description)
+      values(164, now(), 'Work In Progress');
+
+-- a SHA1 checksum
+create domain sha1 as bytea check (length(value) = 20);
+
+-- a Git object ID, i.e., a Git-style salted SHA1 checksum
+create domain sha1_git as bytea check (length(value) = 20);
+
+-- a SHA256 checksum
+create domain sha256 as bytea check (length(value) = 32);
+
+-- a blake2 checksum
+create domain blake2s256 as bytea check (length(value) = 32);
+
+-- UNIX path (absolute, relative, individual path component, etc.)
+create domain unix_path as bytea;
+
+-- a set of UNIX-like access permissions, as manipulated by, e.g., chmod
+create domain file_perms as int;
+
+-- an SWHID
+create domain swhid as text check (value ~ '^swh:[0-9]+:.*');
+
+
+-- Checksums about actual file content. Note that the content itself is not
+-- stored in the DB, but on external (key-value) storage. A single checksum is
+-- used as the key there, but the others can be used to verify that we do not
+-- unknowingly inject content collisions.
+create table content
+(
+  sha1       sha1 not null,
+  sha1_git   sha1_git not null,
+  sha256     sha256 not null,
+  blake2s256 blake2s256 not null,
+  length     bigint not null,
+  ctime      timestamptz not null default now(),
+             -- creation time, i.e. time of (first) injection into the storage
+  status     content_status not null default 'visible',
+  object_id  bigserial
+);
+
+comment on table content is 'Checksums of file content which is actually stored externally';
+comment on column content.sha1 is 'Content sha1 hash';
+comment on column content.sha1_git is 'Git object sha1 hash';
+comment on column content.sha256 is 'Content Sha256 hash';
+comment on column content.blake2s256 is 'Content blake2s hash';
+comment on column content.length is 'Content length';
+comment on column content.ctime is 'First seen time';
+comment on column content.status is 'Content status (absent, visible, hidden)';
+comment on column content.object_id is 'Content identifier';
+
+
+-- An origin is a place, identified by a URL, where software source code
+-- artifacts can be found. We support different kinds of origins, e.g., git and
+-- other VCS repositories, web pages that list tarball URLs (e.g.,
+-- http://www.kernel.org), indirect tarball URLs (e.g.,
+-- http://www.example.org/latest.tar.gz), etc. The key feature of an origin is
+-- that it can be *fetched* from (wget, git clone, svn checkout, etc.) to
+-- retrieve all the contained software.
+create table origin
+(
+  id       bigserial not null,
+  url      text not null
+);
+
+comment on column origin.id is 'Artifact origin id';
+comment on column origin.url is 'URL of origin';
+
+
+-- Content blobs observed somewhere, but not ingested into the archive for
+-- whatever reason. This table is separate from the content table as we might
+-- not have the sha1 checksum of skipped contents (for instance when we inject
+-- git repositories, objects that are too big will be skipped here, and we will
+-- only know their sha1_git). 'reason' contains the reason the content was
+-- skipped. 'origin' is a nullable column allowing one to find out which origin
+-- contained the skipped content.
+create table skipped_content
+(
+  sha1       sha1,
+  sha1_git   sha1_git,
+  sha256     sha256,
+  blake2s256 blake2s256,
+  length     bigint not null,
+  ctime      timestamptz not null default now(),
+  status     content_status not null default 'absent',
+  reason     text not null,
+  origin     bigint,
+  object_id  bigserial
+);
+
+comment on table skipped_content is 'Content blobs observed, but not ingested in the archive';
+comment on column skipped_content.sha1 is 'Skipped content sha1 hash';
+comment on column skipped_content.sha1_git is 'Git object sha1 hash';
+comment on column skipped_content.sha256 is 'Skipped content sha256 hash';
+comment on column skipped_content.blake2s256 is 'Skipped content blake2s hash';
+comment on column skipped_content.length is 'Skipped content length';
+comment on column skipped_content.ctime is 'First seen time';
+comment on column skipped_content.status is 'Skipped content status (absent, visible, hidden)';
+comment on column skipped_content.reason is 'Reason for skipping';
+comment on column skipped_content.origin is 'Origin table identifier';
+comment on column skipped_content.object_id is 'Skipped content identifier';
+
+
+-- A file-system directory.  A directory is a list of directory entries (see
+-- tables: directory_entry_{dir,file,rev}).
+--
+-- To list the contents of a directory:
+-- 1. list the contained directory_entry_dir using array dir_entries
+-- 2. list the contained directory_entry_file using array file_entries
+-- 3. list the contained directory_entry_rev using array rev_entries
+-- 4. UNION
+--
+-- Synonyms/mappings:
+-- * git: tree
+create table directory
+(
+  id            sha1_git not null,
+  dir_entries   bigint[],  -- sub-directories, reference directory_entry_dir
+  file_entries  bigint[],  -- contained files, reference directory_entry_file
+  rev_entries   bigint[],  -- mounted revisions, reference directory_entry_rev
+  object_id     bigserial  -- short object identifier
+);
+
+comment on table directory is 'Contents of a directory, synonymous to tree (git)';
+comment on column directory.id is 'Git object sha1 hash';
+comment on column directory.dir_entries is 'Sub-directories, reference directory_entry_dir';
+comment on column directory.file_entries is 'Contained files, reference directory_entry_file';
+comment on column directory.rev_entries is 'Mounted revisions, reference directory_entry_rev';
+comment on column directory.object_id is 'Short object identifier';
+
+
+-- A directory entry pointing to a (sub-)directory.
+create table directory_entry_dir
+(
+  id      bigserial,
+  target  sha1_git not null,   -- id of target directory
+  name    unix_path not null,  -- path name, relative to containing dir
+  perms   file_perms not null  -- unix-like permissions
+);
+
+comment on table directory_entry_dir is 'Directory entry for directory';
+comment on column directory_entry_dir.id is 'Directory identifier';
+comment on column directory_entry_dir.target is 'Target directory identifier';
+comment on column directory_entry_dir.name is 'Path name, relative to containing directory';
+comment on column directory_entry_dir.perms is 'Unix-like permissions';
+
+
+-- A directory entry pointing to a file content.
+create table directory_entry_file
+(
+  id      bigserial,
+  target  sha1_git not null,   -- id of target file
+  name    unix_path not null,  -- path name, relative to containing dir
+  perms   file_perms not null  -- unix-like permissions
+);
+
+comment on table directory_entry_file is 'Directory entry for file';
+comment on column directory_entry_file.id is 'File identifier';
+comment on column directory_entry_file.target is 'Target file identifier';
+comment on column directory_entry_file.name is 'Path name, relative to containing directory';
+comment on column directory_entry_file.perms is 'Unix-like permissions';
+
+
+-- A directory entry pointing to a revision.
+create table directory_entry_rev
+(
+  id      bigserial,
+  target  sha1_git not null,   -- id of target revision
+  name    unix_path not null,  -- path name, relative to containing dir
+  perms   file_perms not null  -- unix-like permissions
+);
+
+comment on table directory_entry_rev is 'Directory entry for revision';
+comment on column directory_entry_rev.id is 'Revision identifier';
+comment on column directory_entry_rev.target is 'Target revision identifier';
+comment on column directory_entry_rev.name is 'Path name, relative to containing directory';
+comment on column directory_entry_rev.perms is 'Unix-like permissions';
+
+
+-- A person referenced by some source code artifacts, e.g., a VCS revision or
+-- release metadata.
+create table person
+(
+  id        bigserial,
+  name      bytea,          -- advisory: not null if we managed to parse a name
+  email     bytea,          -- advisory: not null if we managed to parse an email
+  fullname  bytea not null  -- freeform specification; what is actually used in the checksums
+                            --     will usually be of the form 'name <email>'
+);
+
+comment on table person is 'Person referenced in code artifact release metadata';
+comment on column person.id is 'Person identifier';
+comment on column person.name is 'Name';
+comment on column person.email is 'Email';
+comment on column person.fullname is 'Full name (raw name)';
+
+
+-- The state of a source code tree at a specific point in time.
+--
+-- Synonyms/mappings:
+-- * git / subversion / etc: commit
+-- * tarball: a specific tarball
+--
+-- Revisions are organized as DAGs. Each revision points to 0, 1, or more (in
+-- case of merges) parent revisions. Each revision points to a directory, i.e.,
+-- a file-system tree containing files and directories.
+create table revision
+(
+  id                    sha1_git not null,
+  date                  timestamptz,
+  date_offset           smallint,
+  committer_date        timestamptz,
+  committer_date_offset smallint,
+  type                  revision_type not null,
+  directory             sha1_git,  -- source code 'root' directory
+  message               bytea,
+  author                bigint,
+  committer             bigint,
+  synthetic             boolean not null default false,  -- true iff revision has been created by Software Heritage
+  metadata              jsonb,  -- extra metadata (tarball checksums, extra commit information, etc...)
+  object_id             bigserial,
+  date_neg_utc_offset   boolean,
+  committer_date_neg_utc_offset boolean,
+  extra_headers         bytea[][] not null  -- extra headers (used in hash computation)
+);
+
+comment on table revision is 'A revision represents the state of a source code tree at a specific point in time';
+comment on column revision.id is 'Git-style SHA1 commit identifier';
+comment on column revision.date is 'Author timestamp as UNIX epoch';
+comment on column revision.date_offset is 'Author timestamp timezone, as minute offsets from UTC';
+comment on column revision.date_neg_utc_offset is 'True indicates a -0 UTC offset on author timestamp';
+comment on column revision.committer_date is 'Committer timestamp as UNIX epoch';
+comment on column revision.committer_date_offset is 'Committer timestamp timezone, as minute offsets from UTC';
+comment on column revision.committer_date_neg_utc_offset is 'True indicates a -0 UTC offset on committer timestamp';
+comment on column revision.type is 'Type of revision';
+comment on column revision.directory is 'Directory identifier';
+comment on column revision.message is 'Commit message';
+comment on column revision.author is 'Author identity';
+comment on column revision.committer is 'Committer identity';
+comment on column revision.synthetic is 'True iff revision has been synthesized by Software Heritage';
+comment on column revision.metadata is 'Extra revision metadata';
+comment on column revision.object_id is 'Non-intrinsic, sequential object identifier';
+comment on column revision.extra_headers is 'Extra revision headers; used in revision hash computation';
+
+
+-- either this table or the sha1_git[] column on the revision table
+create table revision_history
+(
+  id           sha1_git not null,
+  parent_id    sha1_git not null,
+  parent_rank  int not null default 0
+    -- parent position in merge commits, 0-based
+);
+
+comment on table revision_history is 'Sequence of revision history with parent and position in history';
+comment on column revision_history.id is 'Revision history git object sha1 checksum';
+comment on column revision_history.parent_id is 'Parent revision git object identifier';
+comment on column revision_history.parent_rank is 'Parent position in merge commits, 0-based';
+
+
+-- Crawling history of software origins visited by Software Heritage. Each
+-- visit is a 3-way mapping between a software origin, a timestamp, and a
+-- snapshot object capturing the full state of the origin at visit time.
+create table origin_visit
+(
+  origin       bigint not null,
+  visit        bigint not null,
+  date         timestamptz not null,
+  type         text not null
+);
+
+comment on column origin_visit.origin is 'Visited origin';
+comment on column origin_visit.visit is 'Sequential visit number for the origin';
+comment on column origin_visit.date is 'Visit timestamp';
+comment on column origin_visit.type is 'Type of loader that did the visit (hg, git, ...)';
+
+
+-- Crawling history of software origin visits by Software Heritage. Each
+-- visit sees its history change through new origin visit status updates.
+create table origin_visit_status
+(
+  origin   bigint not null,
+  visit    bigint not null,
+  date     timestamptz not null,
+  status   origin_visit_state not null,
+  metadata jsonb,
+  snapshot sha1_git
+);
+
+comment on column origin_visit_status.origin is 'Origin concerned by the visit update';
+comment on column origin_visit_status.visit is 'Visit concerned by the visit update';
+comment on column origin_visit_status.date is 'Visit update timestamp';
+comment on column origin_visit_status.status is 'Visit status (created, ongoing, full, partial)';
+comment on column origin_visit_status.metadata is 'Optional origin visit metadata';
+comment on column origin_visit_status.snapshot is 'Optional snapshot of the origin visit. It can be partial.';
+
+
+-- A snapshot represents the entire state of a software origin as crawled by
+-- Software Heritage. This table is a simple mapping between (public) intrinsic
+-- snapshot identifiers and (private) numeric sequential identifiers.
+create table snapshot
+(
+  object_id  bigserial not null,  -- PK internal object identifier
+  id         sha1_git not null    -- snapshot intrinsic identifier
+);
+
+comment on table snapshot is 'State of a software origin as crawled by Software Heritage';
+comment on column snapshot.object_id is 'Internal object identifier';
+comment on column snapshot.id is 'Intrinsic snapshot identifier';
+
+
+-- Each snapshot associates "branch" names to other objects in the Software
+-- Heritage Merkle DAG. This table describes branches as mappings between names
+-- and target typed objects.
+create table snapshot_branch
+(
+  object_id    bigserial not null,  -- PK internal object identifier
+  name         bytea not null,      -- branch name, e.g., "master" or "feature/drag-n-drop"
+  target       bytea,               -- target object identifier, e.g., a revision identifier
+  target_type  snapshot_target      -- target object type, e.g., "revision"
+);
+
+comment on table snapshot_branch is 'Associates branches with objects in Heritage Merkle DAG';
+comment on column snapshot_branch.object_id is 'Internal object identifier';
+comment on column snapshot_branch.name is 'Branch name';
+comment on column snapshot_branch.target is 'Target object identifier';
+comment on column snapshot_branch.target_type is 'Target object type';
+
+
+-- Mapping between snapshots and their branches.
+create table snapshot_branches
+(
+  snapshot_id  bigint not null,  -- snapshot identifier, ref. snapshot.object_id
+  branch_id    bigint not null   -- branch identifier, ref. snapshot_branch.object_id
+);
+
+comment on table snapshot_branches is 'Mapping between snapshots and their branches';
+comment on column snapshot_branches.snapshot_id is 'Snapshot identifier';
+comment on column snapshot_branches.branch_id is 'Branch identifier';
+
+
+-- A "memorable" point in time in the development history of a software
+-- project.
+--
+-- Synonyms/mappings:
+-- * git: tag (of the annotated kind, otherwise they are just references)
+-- * tarball: the release version number
+create table release
+(
+  id          sha1_git not null,
+  target      sha1_git,
+  date        timestamptz,
+  date_offset smallint,
+  name        bytea,
+  comment     bytea,
+  author      bigint,
+  synthetic   boolean not null default false,  -- true iff release has been created by Software Heritage
+  object_id   bigserial,
+  target_type object_type not null,
+  date_neg_utc_offset  boolean
+);
+
+comment on table release is 'Details of a software release, synonymous with
+ a tag (git) or version number (tarball)';
+comment on column release.id is 'Release git identifier';
+comment on column release.target is 'Target git identifier';
+comment on column release.date is 'Release timestamp';
+comment on column release.date_offset is 'Timestamp offset from UTC';
+comment on column release.name is 'Name';
+comment on column release.comment is 'Comment';
+comment on column release.author is 'Author';
+comment on column release.synthetic is 'Indicates if created by Software Heritage';
+comment on column release.object_id is 'Object identifier';
+comment on column release.target_type is 'Object type (''content'', ''directory'', ''revision'',
+ ''release'', ''snapshot'')';
+comment on column release.date_neg_utc_offset is 'True indicates -0 UTC offset for release timestamp';
+
+-- Tools
+create table metadata_fetcher
+(
+  id            serial  not null,
+  name          text    not null,
+  version       text    not null,
+  metadata      jsonb   not null
+);
+
+comment on table metadata_fetcher is 'Tools used to retrieve metadata';
+comment on column metadata_fetcher.id is 'Internal identifier of the fetcher';
+comment on column metadata_fetcher.name is 'Fetcher name';
+comment on column metadata_fetcher.version is 'Fetcher version';
+comment on column metadata_fetcher.metadata is 'Extra information about the fetcher';
+
+
+create table metadata_authority
+(
+  id            serial  not null,
+  type          text    not null,
+  url           text    not null,
+  metadata      jsonb   not null
+);
+
+comment on table metadata_authority is 'Metadata authority information';
+comment on column metadata_authority.id is 'Internal identifier of the authority';
+comment on column metadata_authority.type is 'Type of authority (deposit_client/forge/registry)';
+comment on column metadata_authority.url is 'Authority''s uri';
+comment on column metadata_authority.metadata is 'Other metadata about authority';
+
+
+-- Extrinsic metadata on DAG objects and origins.
+create table raw_extrinsic_metadata
+(
+  type           text          not null,
+  target         text          not null,
+
+  -- metadata source
+  authority_id   bigint        not null,
+  fetcher_id     bigint        not null,
+  discovery_date timestamptz   not null,
+
+  -- metadata itself
+  format         text          not null,
+  metadata       bytea         not null,
+
+  -- context
+  origin         text,
+  visit          bigint,
+  snapshot       swhid,
+  release        swhid,
+  revision       swhid,
+  path           bytea,
+  directory      swhid
+);
+
+comment on table raw_extrinsic_metadata is 'keeps all metadata found concerning an object';
+comment on column raw_extrinsic_metadata.type is 'the type of object (content/directory/revision/release/snapshot/origin) the metadata is on';
+comment on column raw_extrinsic_metadata.target is 'the SWHID or origin URL for which the metadata was found';
+comment on column raw_extrinsic_metadata.discovery_date is 'the date of retrieval';
+comment on column raw_extrinsic_metadata.authority_id is 'the metadata provider: github, openhub, deposit, etc.';
+comment on column raw_extrinsic_metadata.fetcher_id is 'the tool used for extracting metadata: loaders, crawlers, etc.';
+comment on column raw_extrinsic_metadata.format is 'name of the format of metadata, used by readers to interpret it.';
+comment on column raw_extrinsic_metadata.metadata is 'original metadata in opaque format';
+
+
+-- Keep a cache of object counts
+create table object_counts
+(
+  object_type text,             -- table for which we're counting objects (PK)
+  value bigint,                 -- count of objects in the table
+  last_update timestamptz,      -- last update for the object count in this table
+  single_update boolean         -- whether we update this table standalone (true) or through bucketed counts (false)
+);
+
+comment on table object_counts is 'Cache of object counts';
+comment on column object_counts.object_type is 'Object type (''content'', ''directory'', ''revision'',
+ ''release'', ''snapshot'')';
+comment on column object_counts.value is 'Count of objects in the table';
+comment on column object_counts.last_update is 'Last update for object count';
+comment on column object_counts.single_update is 'Whether the count is updated standalone (true) or through bucketed counts (false)';
+
+
+create table object_counts_bucketed
+(
+    line serial not null,       -- PK
+    object_type text not null,  -- table for which we're counting objects
+    identifier text not null,   -- identifier across which we're bucketing objects
+    bucket_start bytea,         -- lower bound (inclusive) for the bucket
+    bucket_end bytea,           -- upper bound (exclusive) for the bucket
+    value bigint,               -- count of objects in the bucket
+    last_update timestamptz     -- last update for the object count in this bucket
+);
+
+comment on table object_counts_bucketed is 'Bucketed count for objects ordered by type';
+comment on column object_counts_bucketed.line is 'Auto-incremented identifier value';
+comment on column object_counts_bucketed.object_type is 'Object type (''content'', ''directory'', ''revision'',
+ ''release'', ''snapshot'')';
+comment on column object_counts_bucketed.identifier is 'Common identifier for bucketed objects';
+comment on column object_counts_bucketed.bucket_start is 'Lower bound (inclusive) for the bucket';
+comment on column object_counts_bucketed.bucket_end is 'Upper bound (exclusive) for the bucket';
+comment on column object_counts_bucketed.value is 'Count of objects in the bucket';
+comment on column object_counts_bucketed.last_update is 'Last update for the object count in this bucket';
diff --git a/swh/storage/tests/data/sql-v0.18.0/40-funcs.sql b/swh/storage/tests/data/sql-v0.18.0/40-funcs.sql
new file mode 100644
--- /dev/null
+++ b/swh/storage/tests/data/sql-v0.18.0/40-funcs.sql
@@ -0,0 +1,960 @@
+create or replace function hash_sha1(text)
+       returns text
+as $$
+   select encode(digest($1, 'sha1'), 'hex')
+$$ language sql strict immutable;
+
+comment on function hash_sha1(text) is 'Compute SHA1 hash as text';
+
+-- create a temporary table called tmp_TBLNAME, mimicking existing table
+-- TBLNAME
+--
+-- Args:
+--     tblname: name of the table to mimic
+create or replace function swh_mktemp(tblname regclass)
+    returns void
+    language plpgsql
+as $$
+begin
+    execute format('
+	create temporary table if not exists tmp_%1$I
+	    (like %1$I including defaults)
+	    on commit delete rows;
+      alter table tmp_%1$I drop column if exists object_id;
+	', tblname);
+    return;
+end
+$$;
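+
+-- Illustrative bulk-loading session (kept as comments; the COPY column list is
+-- indicative only, and swh_content_add() is defined further below): create the
+-- mimic table, COPY rows into it, then call the add function in the same
+-- transaction.
+--
+--   select swh_mktemp('content');
+--   copy tmp_content (sha1, sha1_git, sha256, blake2s256, length, status) from stdin;
+--   select swh_content_add();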
+
+-- create a temporary table for directory entries called tmp_TBLNAME,
+-- mimicking existing table TBLNAME with an extra dir_id (sha1_git)
+-- column, and dropping the id column.
+--
+-- This is used to create the tmp_directory_entry_<foo> tables.
+--
+-- Args:
+--     tblname: name of the table to mimic
+create or replace function swh_mktemp_dir_entry(tblname regclass)
+    returns void
+    language plpgsql
+as $$
+begin
+    execute format('
+	create temporary table if not exists tmp_%1$I
+	    (like %1$I including defaults, dir_id sha1_git)
+	    on commit delete rows;
+        alter table tmp_%1$I drop column if exists id;
+	', tblname);
+    return;
+end
+$$;
+
+-- create a temporary table for revisions called tmp_revision,
+-- mimicking existing table revision, replacing the foreign keys to
+-- people with an email and name field
+--
+create or replace function swh_mktemp_revision()
+    returns void
+    language sql
+as $$
+    create temporary table if not exists tmp_revision (
+        like revision including defaults,
+        author_fullname bytea,
+        author_name bytea,
+        author_email bytea,
+        committer_fullname bytea,
+        committer_name bytea,
+        committer_email bytea
+    ) on commit delete rows;
+    alter table tmp_revision drop column if exists author;
+    alter table tmp_revision drop column if exists committer;
+    alter table tmp_revision drop column if exists object_id;
+$$;
+
+-- create a temporary table for releases called tmp_release,
+-- mimicking existing table release, replacing the foreign keys to
+-- people with an email and name field
+--
+create or replace function swh_mktemp_release()
+    returns void
+    language sql
+as $$
+    create temporary table if not exists tmp_release (
+        like release including defaults,
+        author_fullname bytea,
+        author_name bytea,
+        author_email bytea
+    ) on commit delete rows;
+    alter table tmp_release drop column if exists author;
+    alter table tmp_release drop column if exists object_id;
+$$;
+
+-- create a temporary table for the branches of a snapshot
+create or replace function swh_mktemp_snapshot_branch()
+    returns void
+    language sql
+as $$
+  create temporary table if not exists tmp_snapshot_branch (
+      name bytea not null,
+      target bytea,
+      target_type snapshot_target
+  ) on commit delete rows;
+$$;
+
+-- a content signature is a set of cryptographic checksums that we use to
+-- uniquely identify content, for the purpose of verifying whether we already
+-- have a given content during content injection
+create type content_signature as (
+    sha1       sha1,
+    sha1_git   sha1_git,
+    sha256     sha256,
+    blake2s256 blake2s256
+);
+
+
+-- check which entries of tmp_skipped_content are missing from skipped_content
+--
+-- operates in bulk: 0. swh_mktemp(skipped_content), 1. COPY to tmp_skipped_content,
+-- 2. call this function
+create or replace function swh_skipped_content_missing()
+    returns setof content_signature
+    language plpgsql
+as $$
+begin
+    return query
+	select sha1, sha1_git, sha256, blake2s256 from tmp_skipped_content t
+	where not exists
+	(select 1 from skipped_content s where
+	    s.sha1 is not distinct from t.sha1 and
+	    s.sha1_git is not distinct from t.sha1_git and
+	    s.sha256 is not distinct from t.sha256);
+    return;
+end
+$$;
+
+
+-- add tmp_content entries to content, skipping duplicates
+--
+-- operates in bulk: 0. swh_mktemp(content), 1. COPY to tmp_content,
+-- 2. call this function
+create or replace function swh_content_add()
+    returns void
+    language plpgsql
+as $$
+begin
+    insert into content (sha1, sha1_git, sha256, blake2s256, length, status, ctime)
+        select distinct sha1, sha1_git, sha256, blake2s256, length, status, ctime from tmp_content;
+    return;
+end
+$$;
+
+
+-- add tmp_skipped_content entries to skipped_content, skipping duplicates
+--
+-- operates in bulk: 0. swh_mktemp(skipped_content), 1. COPY to tmp_skipped_content,
+-- 2. call this function
+create or replace function swh_skipped_content_add()
+    returns void
+    language plpgsql
+as $$
+begin
+    insert into skipped_content (sha1, sha1_git, sha256, blake2s256, length, status, reason, origin)
+        select distinct sha1, sha1_git, sha256, blake2s256, length, status, reason, origin
+	from tmp_skipped_content
+	where (coalesce(sha1, ''), coalesce(sha1_git, ''), coalesce(sha256, '')) in (
+            select coalesce(sha1, ''), coalesce(sha1_git, ''), coalesce(sha256, '')
+            from swh_skipped_content_missing()
+        );
+        -- TODO XXX use postgres 9.5 "UPSERT" support here, when available.
+        -- Specifically, using "INSERT .. ON CONFLICT IGNORE" we can avoid
+        -- the extra swh_skipped_content_missing() query here.
+    return;
+end
+$$;
+
+-- Update content entries from temporary table.
+-- (columns_update lists the columns to update, e.g. new columns added to the schema; it cannot be empty)
+--
+create or replace function swh_content_update(columns_update text[])
+    returns void
+    language plpgsql
+as $$
+declare
+   query text;
+   tmp_array text[];
+begin
+    if array_length(columns_update, 1) = 0 then
+        raise exception 'Please, provide the list of column names to update.';
+    end if;
+
+    tmp_array := array(select format('%1$s=t.%1$s', unnest) from unnest(columns_update));
+
+    query = format('update content set %s
+                    from tmp_content t where t.sha1 = content.sha1',
+                    array_to_string(tmp_array, ', '));
+
+    execute query;
+
+    return;
+end
+$$;
+
+comment on function swh_content_update(text[]) IS 'Update existing content''s columns';
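+
+-- Illustrative usage (kept as comments): after COPYing the updated rows into
+-- tmp_content, refresh e.g. the status column of the matching content rows:
+--
+--   select swh_content_update(ARRAY['status']);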
+
+
+create type directory_entry_type as enum('file', 'dir', 'rev');
+
+
+-- Add tmp_directory_entry_* entries to directory_entry_* and directory,
+-- skipping duplicates in directory_entry_*.  This is a generic function that
+-- works on all kind of directory entries.
+--
+-- operates in bulk: 0. swh_mktemp_dir_entry('directory_entry_*'), 1 COPY to
+-- tmp_directory_entry_*, 2. call this function
+--
+-- Assumption: this function is used in the same transaction that inserts the
+-- context directory in table "directory".
+create or replace function swh_directory_entry_add(typ directory_entry_type)
+    returns void
+    language plpgsql
+as $$
+begin
+    execute format('
+    insert into directory_entry_%1$s (target, name, perms)
+    select distinct t.target, t.name, t.perms
+    from tmp_directory_entry_%1$s t
+    where not exists (
+    select 1
+    from directory_entry_%1$s i
+    where t.target = i.target and t.name = i.name and t.perms = i.perms)
+   ', typ);
+
+    execute format('
+    with new_entries as (
+	select t.dir_id, array_agg(i.id) as entries
+	from tmp_directory_entry_%1$s t
+	inner join directory_entry_%1$s i
+	using (target, name, perms)
+	group by t.dir_id
+    )
+    update tmp_directory as d
+    set %1$s_entries = new_entries.entries
+    from new_entries
+    where d.id = new_entries.dir_id
+    ', typ);
+
+    return;
+end
+$$;
+
+-- Insert the data from tmp_directory, tmp_directory_entry_file,
+-- tmp_directory_entry_dir, tmp_directory_entry_rev into their final
+-- tables.
+--
+-- Prerequisites:
+--  directory ids in tmp_directory
+--  entries in tmp_directory_entry_{file,dir,rev}
+--
+create or replace function swh_directory_add()
+    returns void
+    language plpgsql
+as $$
+begin
+    perform swh_directory_entry_add('file');
+    perform swh_directory_entry_add('dir');
+    perform swh_directory_entry_add('rev');
+
+    insert into directory
+    select * from tmp_directory t
+    where not exists (
+        select 1 from directory d
+	where d.id = t.id);
+
+    return;
+end
+$$;
+
+-- a directory listing entry with all the metadata
+--
+-- can be used to list a directory, and retrieve all the data in one go.
+create type directory_entry as
+(
+  dir_id   sha1_git,     -- id of the parent directory
+  type     directory_entry_type,  -- type of entry
+  target   sha1_git,     -- id of target
+  name     unix_path,    -- path name, relative to containing dir
+  perms    file_perms,   -- unix-like permissions
+  status   content_status,  -- visible or absent
+  sha1     sha1,            -- content's sha1 if type is not dir
+  sha1_git sha1_git,        -- content's sha1 git if type is not dir
+  sha256   sha256,          -- content's sha256 if type is not dir
+  length   bigint           -- content length if type is not dir
+);
+
+
+-- List a single level of directory walked_dir_id
+-- FIXME: order by name is not correct. For git, we need to order in
+-- lexicographic order, but as if a trailing / were present in directory
+-- names
+create or replace function swh_directory_walk_one(walked_dir_id sha1_git)
+    returns setof directory_entry
+    language sql
+    stable
+as $$
+    with dir as (
+	select id as dir_id, dir_entries, file_entries, rev_entries
+	from directory
+	where id = walked_dir_id),
+    ls_d as (select dir_id, unnest(dir_entries) as entry_id from dir),
+    ls_f as (select dir_id, unnest(file_entries) as entry_id from dir),
+    ls_r as (select dir_id, unnest(rev_entries) as entry_id from dir)
+    (select dir_id, 'dir'::directory_entry_type as type,
+            e.target, e.name, e.perms, NULL::content_status,
+            NULL::sha1, NULL::sha1_git, NULL::sha256, NULL::bigint
+     from ls_d
+     left join directory_entry_dir e on ls_d.entry_id = e.id)
+    union
+    (with known_contents as
+	(select dir_id, 'file'::directory_entry_type as type,
+            e.target, e.name, e.perms, c.status,
+            c.sha1, c.sha1_git, c.sha256, c.length
+         from ls_f
+         left join directory_entry_file e on ls_f.entry_id = e.id
+         inner join content c on e.target = c.sha1_git)
+        select * from known_contents
+	union
+	(select dir_id, 'file'::directory_entry_type as type,
+            e.target, e.name, e.perms, c.status,
+            c.sha1, c.sha1_git, c.sha256, c.length
+         from ls_f
+         left join directory_entry_file e on ls_f.entry_id = e.id
+         left join skipped_content c on e.target = c.sha1_git
+         where not exists (select 1 from known_contents where known_contents.sha1_git=e.target)))
+    union
+    (select dir_id, 'rev'::directory_entry_type as type,
+            e.target, e.name, e.perms, NULL::content_status,
+            NULL::sha1, NULL::sha1_git, NULL::sha256, NULL::bigint
+     from ls_r
+     left join directory_entry_rev e on ls_r.entry_id = e.id)
+    order by name;
+$$;
+
+-- Recursively list the directory arborescence rooted at walked_dir_id
+create or replace function swh_directory_walk(walked_dir_id sha1_git)
+    returns setof directory_entry
+    language sql
+    stable
+as $$
+    with recursive entries as (
+        select dir_id, type, target, name, perms, status, sha1, sha1_git,
+               sha256, length
+        from swh_directory_walk_one(walked_dir_id)
+        union all
+        select dir_id, type, target, (dirname || '/' || name)::unix_path as name,
+               perms, status, sha1, sha1_git, sha256, length
+        from (select (swh_directory_walk_one(dirs.target)).*, dirs.name as dirname
+              from (select target, name from entries where type = 'dir') as dirs) as with_parent
+    )
+    select dir_id, type, target, name, perms, status, sha1, sha1_git, sha256, length
+    from entries
+$$;
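+
+-- Illustrative usage (kept as comments; the sha1_git below is a made-up value):
+-- list the full tree under one root directory.
+--
+--   select name, type, target
+--   from swh_directory_walk('\x1122334455667788990011223344556677889900'::sha1_git);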
+
+-- Find a directory entry by its path
+create or replace function swh_find_directory_entry_by_path(
+    walked_dir_id sha1_git,
+    dir_or_content_path bytea[])
+    returns directory_entry
+    language plpgsql
+as $$
+declare
+    end_index integer;
+    paths bytea default '';
+    path bytea;
+    res bytea[];
+    r record;
+begin
+    end_index := array_upper(dir_or_content_path, 1);
+    res[1] := walked_dir_id;
+
+    for i in 1..end_index
+    loop
+        path := dir_or_content_path[i];
+        -- concatenate path for patching the name in the result record (if we found it)
+        if i = 1 then
+            paths = path;
+        else
+            paths := paths || '/' || path;  -- concatenate paths
+        end if;
+
+        if i <> end_index then
+            select *
+            from swh_directory_walk_one(res[i] :: sha1_git)
+            where name=path
+            and type = 'dir'
+            limit 1 into r;
+        else
+            select *
+            from swh_directory_walk_one(res[i] :: sha1_git)
+            where name=path
+            limit 1 into r;
+        end if;
+
+        -- find the path
+        if r is null then
+           return null;
+        else
+            -- store the next dir to lookup the next local path from
+            res[i+1] := r.target;
+        end if;
+    end loop;
+
+    -- at this moment, r is the result. Patch its 'name' with the full path before returning it.
+    r.name := paths;
+    return r;
+end
+$$;
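+
+-- Illustrative usage (kept as comments; the sha1_git and the path are made-up
+-- values): look up 'src/main.c' under a given root directory.
+--
+--   select * from swh_find_directory_entry_by_path(
+--       '\x1122334455667788990011223344556677889900'::sha1_git,
+--       array['src'::bytea, 'main.c'::bytea]);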
+
+-- List all revision IDs starting from a given revision, going back in time
+--
+-- TODO ordering: should be breadth-first right now (what do we want?)
+-- TODO ordering: ORDER BY parent_rank somewhere?
+create or replace function swh_revision_list(root_revisions bytea[], num_revs bigint default NULL)
+    returns table (id sha1_git, parents bytea[])
+    language sql
+    stable
+as $$
+    with recursive full_rev_list(id) as (
+        (select id from revision where id = ANY(root_revisions))
+        union
+        (select h.parent_id
+         from revision_history as h
+         join full_rev_list on h.id = full_rev_list.id)
+    ),
+    rev_list as (select id from full_rev_list limit num_revs)
+    select rev_list.id as id,
+           array(select rh.parent_id::bytea
+                 from revision_history rh
+                 where rh.id = rev_list.id
+                 order by rh.parent_rank
+                ) as parent
+    from rev_list;
+$$;
+
+
+-- Detailed entry for a revision
+create type revision_entry as
+(
+  id                             sha1_git,
+  date                           timestamptz,
+  date_offset                    smallint,
+  date_neg_utc_offset            boolean,
+  committer_date                 timestamptz,
+  committer_date_offset          smallint,
+  committer_date_neg_utc_offset  boolean,
+  type                           revision_type,
+  directory                      sha1_git,
+  message                        bytea,
+  author_id                      bigint,
+  author_fullname                bytea,
+  author_name                    bytea,
+  author_email                   bytea,
+  committer_id                   bigint,
+  committer_fullname             bytea,
+  committer_name                 bytea,
+  committer_email                bytea,
+  metadata                       jsonb,
+  synthetic                      boolean,
+  parents                        bytea[],
+  object_id                      bigint,
+  extra_headers                  bytea[][]
+);
+
+
+-- "git style" revision log. Similar to swh_revision_list(), but returning all
+-- information associated to each revision, and expanding authors/committers
+create or replace function swh_revision_log(root_revisions bytea[], num_revs bigint default NULL)
+    returns setof revision_entry
+    language sql
+    stable
+as $$
+    select t.id, r.date, r.date_offset, r.date_neg_utc_offset,
+           r.committer_date, r.committer_date_offset, r.committer_date_neg_utc_offset,
+           r.type, r.directory, r.message,
+           a.id, a.fullname, a.name, a.email,
+           c.id, c.fullname, c.name, c.email,
+           r.metadata, r.synthetic, t.parents, r.object_id, r.extra_headers
+    from swh_revision_list(root_revisions, num_revs) as t
+    left join revision r on t.id = r.id
+    left join person a on a.id = r.author
+    left join person c on c.id = r.committer;
+$$;
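+
+-- Illustrative usage (kept as comments; the revision id is a made-up value):
+-- log at most 10 revisions reachable from a single root.
+--
+--   select id, date, author_fullname, message
+--   from swh_revision_log(array['\x1122334455667788990011223344556677889900'::bytea], 10);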
+
+
+-- Detailed entry for a release
+create type release_entry as
+(
+  id                   sha1_git,
+  target               sha1_git,
+  target_type          object_type,
+  date                 timestamptz,
+  date_offset          smallint,
+  date_neg_utc_offset  boolean,
+  name                 bytea,
+  comment              bytea,
+  synthetic            boolean,
+  author_id            bigint,
+  author_fullname      bytea,
+  author_name          bytea,
+  author_email         bytea,
+  object_id            bigint
+);
+
+-- Create entries in person from tmp_revision
+create or replace function swh_person_add_from_revision()
+    returns void
+    language plpgsql
+as $$
+begin
+    with t as (
+        select author_fullname as fullname, author_name as name, author_email as email from tmp_revision
+    union
+        select committer_fullname as fullname, committer_name as name, committer_email as email from tmp_revision
+    ) insert into person (fullname, name, email)
+    select distinct on (fullname) fullname, name, email from t
+    where not exists (
+        select 1
+        from person p
+        where t.fullname = p.fullname
+    );
+    return;
+end
+$$;
+
+
+-- Create entries in revision from tmp_revision
+create or replace function swh_revision_add()
+    returns void
+    language plpgsql
+as $$
+begin
+    perform swh_person_add_from_revision();
+
+    insert into revision (id, date, date_offset, date_neg_utc_offset, committer_date, committer_date_offset, committer_date_neg_utc_offset, type, directory, message, author, committer, metadata, synthetic, extra_headers)
+    select t.id, t.date, t.date_offset, t.date_neg_utc_offset, t.committer_date, t.committer_date_offset, t.committer_date_neg_utc_offset, t.type, t.directory, t.message, a.id, c.id, t.metadata, t.synthetic, t.extra_headers
+    from tmp_revision t
+    left join person a on a.fullname = t.author_fullname
+    left join person c on c.fullname = t.committer_fullname;
+    return;
+end
+$$;
+
+
+-- Create entries in person from tmp_release
+create or replace function swh_person_add_from_release()
+    returns void
+    language plpgsql
+as $$
+begin
+    with t as (
+        select distinct author_fullname as fullname, author_name as name, author_email as email from tmp_release
+        where author_fullname is not null
+    ) insert into person (fullname, name, email)
+    select distinct on (fullname) fullname, name, email from t
+    where not exists (
+        select 1
+        from person p
+        where t.fullname = p.fullname
+    );
+    return;
+end
+$$;
+
+
+-- Create entries in release from tmp_release
+create or replace function swh_release_add()
+    returns void
+    language plpgsql
+as $$
+begin
+    perform swh_person_add_from_release();
+
+    insert into release (id, target, target_type, date, date_offset, date_neg_utc_offset, name, comment, author, synthetic)
+      select distinct t.id, t.target, t.target_type, t.date, t.date_offset, t.date_neg_utc_offset, t.name, t.comment, a.id, t.synthetic
+        from tmp_release t
+        left join person a on a.fullname = t.author_fullname
+        where not exists (select 1 from release where t.id = release.id);
+    return;
+end
+$$;
+
+
+-- add a new origin_visit for the origin identified by origin_url, at the given date.
+--
+-- Returns the new visit id.
+create or replace function swh_origin_visit_add(origin_url text, date timestamptz, type text)
+    returns bigint
+    language sql
+as $$
+  with origin_id as (
+    select id
+    from origin
+    where url = origin_url
+  ), last_known_visit as (
+    select coalesce(max(visit), 0) as visit
+    from origin_visit
+    where origin = (select id from origin_id)
+  )
+  insert into origin_visit (origin, date, type, visit)
+  values ((select id from origin_id), date, type,
+          (select visit from last_known_visit) + 1)
+  returning visit;
+$$;
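+
+-- Illustrative usage (kept as comments; the URL is a made-up value):
+--
+--   select swh_origin_visit_add('https://example.org/repo.git', now(), 'git');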
+
+create or replace function swh_snapshot_add(snapshot_id sha1_git)
+  returns void
+  language plpgsql
+as $$
+declare
+  snapshot_object_id snapshot.object_id%type;
+begin
+  select object_id from snapshot where id = snapshot_id into snapshot_object_id;
+  if snapshot_object_id is null then
+     insert into snapshot (id) values (snapshot_id) returning object_id into snapshot_object_id;
+     insert into snapshot_branch (name, target_type, target)
+       select name, target_type, target from tmp_snapshot_branch tmp
+       where not exists (
+         select 1
+         from snapshot_branch sb
+         where sb.name = tmp.name
+           and sb.target = tmp.target
+           and sb.target_type = tmp.target_type
+       )
+       on conflict do nothing;
+     insert into snapshot_branches (snapshot_id, branch_id)
+     select snapshot_object_id, sb.object_id as branch_id
+       from tmp_snapshot_branch tmp
+       join snapshot_branch sb
+       using (name, target, target_type)
+       where tmp.target is not null and tmp.target_type is not null
+     union
+     select snapshot_object_id, sb.object_id as branch_id
+       from tmp_snapshot_branch tmp
+       join snapshot_branch sb
+       using (name)
+       where tmp.target is null and tmp.target_type is null
+         and sb.target is null and sb.target_type is null;
+  end if;
+  truncate table tmp_snapshot_branch;
+end;
+$$;
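+
+-- Illustrative loading session (kept as comments; the snapshot id is a made-up
+-- value): branches go through tmp_snapshot_branch before the snapshot is added.
+--
+--   select swh_mktemp_snapshot_branch();
+--   copy tmp_snapshot_branch (name, target, target_type) from stdin;
+--   select swh_snapshot_add('\x1122334455667788990011223344556677889900'::sha1_git);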
+
+create type snapshot_result as (
+  snapshot_id  sha1_git,
+  name         bytea,
+  target       bytea,
+  target_type  snapshot_target
+);
+
+create or replace function swh_snapshot_get_by_id(id sha1_git,
+    branches_from bytea default '', branches_count bigint default null,
+    target_types snapshot_target[] default NULL)
+  returns setof snapshot_result
+  language sql
+  stable
+as $$
+  -- with small limits, the "naive" version of this query can degenerate into
+  -- using the deduplication index on snapshot_branch (name, target,
+  -- target_type); the planner then happily scans several hundred million rows.
+
+  -- Do the query in two steps: first pull the relevant branches for the given
+  -- snapshot (filtering them by type), then do the limiting. This two-step
+  -- process guides the planner into using the proper index.
+  with filtered_snapshot_branches as (
+    select swh_snapshot_get_by_id.id as snapshot_id, name, target, target_type
+      from snapshot_branches
+      inner join snapshot_branch on snapshot_branches.branch_id = snapshot_branch.object_id
+      where snapshot_id = (select object_id from snapshot where snapshot.id = swh_snapshot_get_by_id.id)
+        and (target_types is null or target_type = any(target_types))
+      order by name
+  )
+  select snapshot_id, name, target, target_type
+    from filtered_snapshot_branches
+    where name >= branches_from
+    order by name limit branches_count;
+$$;
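+
+-- Illustrative usage (kept as comments; the snapshot id is a made-up value):
+-- page through the branches 1000 at a time, keeping only revision targets.
+--
+--   select name, target
+--   from swh_snapshot_get_by_id(
+--       '\x1122334455667788990011223344556677889900'::sha1_git,
+--       branches_from => '', branches_count => 1000,
+--       target_types => array['revision'::snapshot_target]);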
+
+create type snapshot_size as (
+  target_type snapshot_target,
+  count bigint
+);
+
+create or replace function swh_snapshot_count_branches(id sha1_git)
+  returns setof snapshot_size
+  language sql
+  stable
+as $$
+  SELECT target_type, count(name)
+  from swh_snapshot_get_by_id(swh_snapshot_count_branches.id)
+  group by target_type;
+$$;
+
+-- Absolute path: directory reference + complete path relative to it
+create type content_dir as (
+    directory  sha1_git,
+    path       unix_path
+);
+
+
+-- Find the containing directory of a given content, specified by sha1
+-- (note: *not* sha1_git).
+--
+-- Return a pair (dir_id, path) where path is a UNIX path that, from the
+-- directory root, reaches down to a file with the desired content. Return NULL
+-- if no match is found.
+--
+-- In case of multiple paths (i.e., pretty much always), an arbitrary one is
+-- chosen.
+create or replace function swh_content_find_directory(content_id sha1)
+    returns content_dir
+    language sql
+    stable
+as $$
+    with recursive path as (
+	-- Recursively build a path from the requested content to a root
+	-- directory. Each iteration returns a pair (dir_id, filename) where
+	-- filename is relative to dir_id. Stops when no parent directory can
+	-- be found.
+	(select dir.id as dir_id, dir_entry_f.name as name, 0 as depth
+	 from directory_entry_file as dir_entry_f
+	 join content on content.sha1_git = dir_entry_f.target
+	 join directory as dir on dir.file_entries @> array[dir_entry_f.id]
+	 where content.sha1 = content_id
+	 limit 1)
+	union all
+	(select dir.id as dir_id,
+		(dir_entry_d.name || '/' || path.name)::unix_path as name,
+		path.depth + 1
+	 from path
+	 join directory_entry_dir as dir_entry_d on dir_entry_d.target = path.dir_id
+	 join directory as dir on dir.dir_entries @> array[dir_entry_d.id]
+	 limit 1)
+    )
+    select dir_id, name from path order by depth desc limit 1;
+$$;
+
+-- Find the visit of origin closest to date visit_date
+-- Breaks ties by selecting the largest visit id
+create or replace function swh_visit_find_by_date(origin_url text, visit_date timestamptz default NOW())
+    returns setof origin_visit
+    language plpgsql
+    stable
+as $$
+declare
+  origin_id bigint;
+begin
+  select id into origin_id from origin where url=origin_url;
+  return query
+  with closest_two_visits as ((
+    select ov, (date - visit_date), visit as interval
+    from origin_visit ov
+    where ov.origin = origin_id
+          and ov.date >= visit_date
+    order by ov.date asc, ov.visit desc
+    limit 1
+  ) union (
+    select ov, (visit_date - date), visit as interval
+    from origin_visit ov
+    where ov.origin = origin_id
+          and ov.date < visit_date
+    order by ov.date desc, ov.visit desc
+    limit 1
+  )) select (ov).* from closest_two_visits order by interval, visit limit 1;
+end
+$$;
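+
+-- Illustrative usage (kept as comments; the URL and date are made-up values):
+--
+--   select visit, date, type
+--   from swh_visit_find_by_date('https://example.org/repo.git', '2021-01-01'::timestamptz);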
+
+-- Object listing by object_id
+
+create or replace function swh_content_list_by_object_id(
+    min_excl bigint,
+    max_incl bigint
+)
+    returns setof content
+    language sql
+    stable
+as $$
+    select * from content
+    where object_id > min_excl and object_id <= max_incl
+    order by object_id;
+$$;
+
+create or replace function swh_revision_list_by_object_id(
+    min_excl bigint,
+    max_incl bigint
+)
+    returns setof revision_entry
+    language sql
+    stable
+as $$
+    with revs as (
+        select * from revision
+        where object_id > min_excl and object_id <= max_incl
+    )
+    select r.id, r.date, r.date_offset, r.date_neg_utc_offset,
+           r.committer_date, r.committer_date_offset, r.committer_date_neg_utc_offset,
+           r.type, r.directory, r.message,
+           a.id, a.fullname, a.name, a.email, c.id, c.fullname, c.name, c.email, r.metadata, r.synthetic,
+           array(select rh.parent_id::bytea from revision_history rh where rh.id = r.id order by rh.parent_rank)
+               as parents, r.object_id, r.extra_headers
+    from revs r
+    left join person a on a.id = r.author
+    left join person c on c.id = r.committer
+    order by r.object_id;
+$$;
+
+create or replace function swh_release_list_by_object_id(
+    min_excl bigint,
+    max_incl bigint
+)
+    returns setof release_entry
+    language sql
+    stable
+as $$
+    with rels as (
+        select * from release
+        where object_id > min_excl and object_id <= max_incl
+    )
+    select r.id, r.target, r.target_type, r.date, r.date_offset, r.date_neg_utc_offset, r.name, r.comment,
+           r.synthetic, p.id as author_id, p.fullname as author_fullname, p.name as author_name, p.email as author_email, r.object_id
+    from rels r
+    left join person p on p.id = r.author
+    order by r.object_id;
+$$;
+
+
+-- simple counter mapping a textual label to an integer value
+create type counter as (
+    label  text,
+    value  bigint
+);
+
+-- return statistics about the number of tuples in various SWH tables
+--
+-- Note: the returned values are read from the object_counts cache table,
+-- which is refreshed asynchronously, so they may lag behind the live tables
+create or replace function swh_stat_counters()
+    returns setof counter
+    language sql
+    stable
+as $$
+    select object_type as label, value as value
+    from object_counts
+    where object_type in (
+        'content',
+        'directory',
+        'directory_entry_dir',
+        'directory_entry_file',
+        'directory_entry_rev',
+        'origin',
+        'origin_visit',
+        'person',
+        'release',
+        'revision',
+        'revision_history',
+        'skipped_content',
+        'snapshot'
+    );
+$$;
+
+create or replace function swh_update_counter(object_type text)
+    returns void
+    language plpgsql
+as $$
+begin
+    execute format('
+	insert into object_counts
+    (value, last_update, object_type)
+  values
+    ((select count(*) from %1$I), NOW(), %1$L)
+  on conflict (object_type) do update set
+    value = excluded.value,
+    last_update = excluded.last_update',
+  object_type);
+    return;
+end;
+$$;
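+
+-- Illustrative usage (kept as comments): refresh the cached count for one table.
+--
+--   select swh_update_counter('origin');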
+
+create or replace function swh_update_counter_bucketed()
+    returns void
+    language plpgsql
+as $$
+declare
+  query text;
+  line_to_update int;
+  new_value bigint;
+begin
+  select
+    object_counts_bucketed.line,
+    format(
+      'select count(%I) from %I where %s',
+      coalesce(identifier, '*'),
+      object_type,
+      coalesce(
+        concat_ws(
+          ' and ',
+          case when bucket_start is not null then
+            format('%I >= %L', identifier, bucket_start) -- lower bound condition, inclusive
+          end,
+          case when bucket_end is not null then
+            format('%I < %L', identifier, bucket_end) -- upper bound condition, exclusive
+          end
+        ),
+        'true'
+      )
+    )
+    from object_counts_bucketed
+    order by coalesce(last_update, now() - '1 month'::interval) asc
+    limit 1
+    into line_to_update, query;
+
+  execute query into new_value;
+
+  update object_counts_bucketed
+    set value = new_value,
+        last_update = now()
+    where object_counts_bucketed.line = line_to_update;
+
+END
+$$;
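+
+-- Illustrative setup (kept as comments; the bucket bounds are made-up values):
+-- register a bucket, then let swh_update_counter_bucketed() refresh the
+-- least-recently-updated one.
+--
+--   insert into object_counts_bucketed (object_type, identifier, bucket_start, bucket_end)
+--   values ('content', 'sha1', '\x00', '\x10');
+--   select swh_update_counter_bucketed();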
+
+create or replace function swh_update_counters_from_buckets()
+  returns trigger
+  language plpgsql
+as $$
+begin
+with to_update as (
+  select object_type, sum(value) as value, max(last_update) as last_update
+  from object_counts_bucketed ob1
+  where not exists (
+    select 1 from object_counts_bucketed ob2
+    where ob1.object_type = ob2.object_type
+    and value is null
+    )
+  group by object_type
+) update object_counts
+  set
+    value = to_update.value,
+    last_update = to_update.last_update
+  from to_update
+  where
+    object_counts.object_type = to_update.object_type
+    and object_counts.value != to_update.value;
+return null;
+end;
+$$;
+
+create trigger update_counts_from_bucketed
+  after insert or update
+  on object_counts_bucketed
+  for each row
+  when (NEW.line % 256 = 0)
+  execute procedure swh_update_counters_from_buckets();
diff --git a/swh/storage/tests/data/sql-v0.18.0/60-indexes.sql b/swh/storage/tests/data/sql-v0.18.0/60-indexes.sql
new file mode 100644
--- /dev/null
+++ b/swh/storage/tests/data/sql-v0.18.0/60-indexes.sql
@@ -0,0 +1,283 @@
+-- psql variables to get the current database flavor
+
+select swh_get_dbflavor() = 'read_replica' as dbflavor_read_replica \gset
+select swh_get_dbflavor() != 'read_replica' as dbflavor_does_deduplication \gset
+select swh_get_dbflavor() = 'mirror' as dbflavor_mirror \gset
+select swh_get_dbflavor() = 'default' as dbflavor_default \gset
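+-- the \if blocks below use these psql variables to create only the indexes and
+-- constraints relevant to the current database flavor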
+
+-- content
+
+create unique index concurrently content_pkey on content(sha1);
+alter table content add primary key using index content_pkey;
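+-- note: throughout this file, indexes are first built with "create index concurrently"
+-- (which does not block writes) and then, where needed, promoted to constraints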
+
+\if :dbflavor_does_deduplication
+  create unique index concurrently on content(sha1_git);
+\else
+  create index concurrently on content(sha1_git);
+\endif
+
+create index concurrently on content(sha256);
+create index concurrently on content(blake2s256);
+
+\if :dbflavor_default
+  create unique index concurrently on content(object_id); -- to be reviewed
+  create index concurrently on content(ctime);            -- to be reviewed
+\endif
+
+-- origin
+
+create unique index concurrently origin_pkey on origin(id);
+alter table origin add primary key using index origin_pkey;
+
+\if :dbflavor_does_deduplication
+  create unique index concurrently on origin using btree(url);
+\else
+  create index concurrently on origin using btree(url);
+\endif
+
+create index concurrently on origin using gin (url gin_trgm_ops);
+create index concurrently on origin using btree(digest(url, 'sha1'));
+
+
+-- skipped_content
+
+\if :dbflavor_does_deduplication
+  alter table skipped_content add constraint skipped_content_sha1_sha1_git_sha256_key unique (sha1, sha1_git, sha256);
+\endif
+
+create index concurrently on skipped_content(sha1);
+create index concurrently on skipped_content(sha1_git);
+create index concurrently on skipped_content(sha256);
+create index concurrently on skipped_content(blake2s256);
+create unique index concurrently on skipped_content(object_id);
+
+\if :dbflavor_default
+  alter table skipped_content add constraint skipped_content_origin_fkey foreign key (origin) references origin(id) not valid;
+  alter table skipped_content validate constraint skipped_content_origin_fkey;
+\endif
+
+-- directory
+create unique index concurrently directory_pkey on directory(id);
+alter table directory add primary key using index directory_pkey;
+
+\if :dbflavor_default
+  create index concurrently on directory using gin (dir_entries);   -- to be reviewed
+  create index concurrently on directory using gin (file_entries);  -- to be reviewed
+  create index concurrently on directory using gin (rev_entries);   -- to be reviewed
+  create unique index concurrently on directory(object_id);         -- to be reviewed
+\endif
+
+-- directory_entry_dir
+
+create unique index concurrently directory_entry_dir_pkey on directory_entry_dir(id);
+alter table directory_entry_dir add primary key using index directory_entry_dir_pkey;
+
+\if :dbflavor_does_deduplication
+  create unique index concurrently on directory_entry_dir(target, name, perms);
+\endif
+
+-- directory_entry_file
+
+create unique index concurrently directory_entry_file_pkey on directory_entry_file(id);
+alter table directory_entry_file add primary key using index directory_entry_file_pkey;
+
+\if :dbflavor_does_deduplication
+  create unique index concurrently on directory_entry_file(target, name, perms);
+\endif
+
+-- directory_entry_rev
+
+create unique index concurrently directory_entry_rev_pkey on directory_entry_rev(id);
+alter table directory_entry_rev add primary key using index directory_entry_rev_pkey;
+
+\if :dbflavor_does_deduplication
+  create unique index concurrently on directory_entry_rev(target, name, perms);
+\endif
+
+
+-- person
+create unique index concurrently person_pkey on person(id);
+alter table person add primary key using index person_pkey;
+
+\if :dbflavor_does_deduplication
+  create unique index concurrently on person(fullname);
+\else
+  create index concurrently on person(fullname);  -- to be reviewed
+\endif
+
+\if :dbflavor_default
+  create index concurrently on person(name);   -- to be reviewed
+  create index concurrently on person(email);  -- to be reviewed
+\endif
+
+-- revision
+create unique index concurrently revision_pkey on revision(id);
+alter table revision add primary key using index revision_pkey;
+
+\if :dbflavor_does_deduplication
+  alter table revision add constraint revision_author_fkey foreign key (author) references person(id) not valid;
+  alter table revision validate constraint revision_author_fkey;
+  alter table revision add constraint revision_committer_fkey foreign key (committer) references person(id) not valid;
+  alter table revision validate constraint revision_committer_fkey;
+
+  alter table revision
+    add constraint revision_date_neg_utc_offset_not_null
+    check (date is null or date_neg_utc_offset is not null)
+    not valid;
+  alter table revision
+    add constraint revision_committer_date_neg_utc_offset_not_null
+    check (committer_date is null or committer_date_neg_utc_offset is not null)
+    not valid;
+
+  alter table revision
+    validate constraint revision_date_neg_utc_offset_not_null;
+  alter table revision
+    validate constraint revision_committer_date_neg_utc_offset_not_null;
+\endif
+
+\if :dbflavor_default
+  create index concurrently on revision(directory);          -- to be reviewed
+  create unique index concurrently on revision(object_id);   -- to be reviewed
+\endif
+
+-- revision_history
+create unique index concurrently revision_history_pkey on revision_history(id, parent_rank);
+alter table revision_history add primary key using index revision_history_pkey;
+
+\if :dbflavor_default
+  create index concurrently on revision_history(parent_id);  -- to be reviewed
+\endif
+
+\if :dbflavor_does_deduplication
+  alter table revision_history add constraint revision_history_id_fkey foreign key (id) references revision(id) not valid;
+  alter table revision_history validate constraint revision_history_id_fkey;
+\endif
+
+-- snapshot
+create unique index concurrently snapshot_pkey on snapshot(object_id);
+alter table snapshot add primary key using index snapshot_pkey;
+
+\if :dbflavor_does_deduplication
+  create unique index concurrently on snapshot(id);
+\else
+  create index concurrently on snapshot(id);
+\endif
+
+-- snapshot_branch
+create unique index concurrently snapshot_branch_pkey on snapshot_branch(object_id);
+alter table snapshot_branch add primary key using index snapshot_branch_pkey;
+
+\if :dbflavor_does_deduplication
+  create unique index concurrently on snapshot_branch (target_type, target, name);
+  alter table snapshot_branch add constraint snapshot_branch_target_check check ((target_type is null) = (target is null)) not valid;
+  alter table snapshot_branch validate constraint snapshot_branch_target_check;
+  alter table snapshot_branch add constraint snapshot_target_check check (target_type not in ('content', 'directory', 'revision', 'release', 'snapshot') or length(target) = 20) not valid;
+  alter table snapshot_branch validate constraint snapshot_target_check;
+
+  create unique index concurrently on snapshot_branch (name) where target_type is null and target is null;
+\endif
+
+-- snapshot_branches
+create unique index concurrently snapshot_branches_pkey on snapshot_branches(snapshot_id, branch_id);
+alter table snapshot_branches add primary key using index snapshot_branches_pkey;
+
+\if :dbflavor_does_deduplication
+  alter table snapshot_branches add constraint snapshot_branches_snapshot_id_fkey foreign key (snapshot_id) references snapshot(object_id) not valid;
+  alter table snapshot_branches validate constraint snapshot_branches_snapshot_id_fkey;
+
+  alter table snapshot_branches add constraint snapshot_branches_branch_id_fkey foreign key (branch_id) references snapshot_branch(object_id) not valid;
+  alter table snapshot_branches validate constraint snapshot_branches_branch_id_fkey;
+\endif
+
+-- origin_visit
+create unique index concurrently origin_visit_pkey on origin_visit(origin, visit);
+alter table origin_visit add primary key using index origin_visit_pkey;
+
+\if :dbflavor_default
+  create index concurrently on origin_visit(date);                               -- to be reviewed
+  create index concurrently origin_visit_type_date on origin_visit(type, date);  -- to be reviewed
+\endif
+
+\if :dbflavor_does_deduplication
+  alter table origin_visit add constraint origin_visit_origin_fkey foreign key (origin) references origin(id) not valid;
+  alter table origin_visit validate constraint origin_visit_origin_fkey;
+\endif
+
+-- origin_visit_status
+
+create unique index concurrently origin_visit_status_pkey on origin_visit_status(origin, visit, date);
+alter table origin_visit_status add primary key using index origin_visit_status_pkey;
+
+\if :dbflavor_default
+  alter table origin_visit_status
+    add constraint origin_visit_status_origin_visit_fkey
+    foreign key (origin, visit)
+    references origin_visit(origin, visit) not valid;
+  alter table origin_visit_status validate constraint origin_visit_status_origin_visit_fkey;
+\endif
+
+-- release
+create unique index concurrently release_pkey on release(id);
+alter table release add primary key using index release_pkey;
+
+\if :dbflavor_default
+  create index concurrently on release(target, target_type);  -- to be reviewed
+  create unique index concurrently on release(object_id);     -- to be reviewed
+\endif
+
+\if :dbflavor_does_deduplication
+  alter table release add constraint release_author_fkey foreign key (author) references person(id) not valid;
+  alter table release validate constraint release_author_fkey;
+
+  alter table release
+    add constraint release_date_neg_utc_offset_not_null
+    check (date is null or date_neg_utc_offset is not null)
+    not valid;
+
+  alter table release
+    validate constraint release_date_neg_utc_offset_not_null;
+
+  -- if the author is null, then the date must be null
+  alter table release add constraint release_author_date_check check ((date is null) or (author is not null)) not valid;
+  alter table release validate constraint release_author_date_check;
+\endif
+
+-- metadata_fetcher
+create unique index metadata_fetcher_pkey on metadata_fetcher(id);
+alter table metadata_fetcher add primary key using index metadata_fetcher_pkey;
+
+\if :dbflavor_does_deduplication
+  create unique index metadata_fetcher_name_version on metadata_fetcher(name, version);
+\else
+  create index metadata_fetcher_name_version on metadata_fetcher(name, version);
+\endif
+
+-- metadata_authority
+create unique index concurrently metadata_authority_pkey on metadata_authority(id);
+alter table metadata_authority add primary key using index metadata_authority_pkey;
+
+\if :dbflavor_does_deduplication
+  create unique index concurrently metadata_authority_type_url on metadata_authority(type, url);
+\else
+  create index concurrently metadata_authority_type_url on metadata_authority(type, url);
+\endif
+
+
+-- raw_extrinsic_metadata
+create unique index concurrently raw_extrinsic_metadata_content_authority_date_fetcher on raw_extrinsic_metadata(target, authority_id, discovery_date, fetcher_id);
+
+\if :dbflavor_default
+  alter table raw_extrinsic_metadata add constraint raw_extrinsic_metadata_authority_fkey foreign key (authority_id) references metadata_authority(id) not valid;
+  alter table raw_extrinsic_metadata validate constraint raw_extrinsic_metadata_authority_fkey;
+
+  alter table raw_extrinsic_metadata add constraint raw_extrinsic_metadata_fetcher_fkey foreign key (fetcher_id) references metadata_fetcher(id) not valid;
+  alter table raw_extrinsic_metadata validate constraint raw_extrinsic_metadata_fetcher_fkey;
+\endif
+
+-- object_counts
+create unique index concurrently object_counts_pkey on object_counts(object_type);
+alter table object_counts add primary key using index object_counts_pkey;
+
+-- object_counts_bucketed
+create unique index concurrently object_counts_bucketed_pkey on object_counts_bucketed(line);
+alter table object_counts_bucketed add primary key using index object_counts_bucketed_pkey;
diff --git a/swh/storage/tests/data/sql-v0.18.0/logical_replication/replication_source.sql b/swh/storage/tests/data/sql-v0.18.0/logical_replication/replication_source.sql
new file mode 100644
--- /dev/null
+++ b/swh/storage/tests/data/sql-v0.18.0/logical_replication/replication_source.sql
@@ -0,0 +1,25 @@
+-- This file contains the instructions to create a replication source for
+-- PostgreSQL logical replication to another database.
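+-- The receiving database (e.g. a mirror) consumes it with a matching
+-- CREATE SUBSCRIPTION pointing at this publication.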
+
+CREATE PUBLICATION softwareheritage;
+
+ALTER PUBLICATION softwareheritage ADD TABLE content;
+ALTER PUBLICATION softwareheritage ADD TABLE skipped_content;
+ALTER PUBLICATION softwareheritage ADD TABLE directory;
+ALTER PUBLICATION softwareheritage ADD TABLE directory_entry_file;
+ALTER PUBLICATION softwareheritage ADD TABLE directory_entry_dir;
+ALTER PUBLICATION softwareheritage ADD TABLE directory_entry_rev;
+ALTER PUBLICATION softwareheritage ADD TABLE person;
+ALTER PUBLICATION softwareheritage ADD TABLE revision;
+ALTER PUBLICATION softwareheritage ADD TABLE revision_history;
+ALTER PUBLICATION softwareheritage ADD TABLE release;
+ALTER PUBLICATION softwareheritage ADD TABLE snapshot;
+ALTER PUBLICATION softwareheritage ADD TABLE snapshot_branch;
+ALTER PUBLICATION softwareheritage ADD TABLE snapshot_branches;
+ALTER PUBLICATION softwareheritage ADD TABLE origin;
+ALTER PUBLICATION softwareheritage ADD TABLE origin_visit;
+ALTER PUBLICATION softwareheritage ADD TABLE origin_visit_status;
+ALTER PUBLICATION softwareheritage ADD TABLE metadata_fetcher;
+ALTER PUBLICATION softwareheritage ADD TABLE metadata_authority;
+ALTER PUBLICATION softwareheritage ADD TABLE raw_extrinsic_metadata;
+ALTER PUBLICATION softwareheritage ADD TABLE object_counts;
diff --git a/swh/storage/tests/test_postgresql_migrated.py b/swh/storage/tests/test_postgresql_migrated.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/tests/test_postgresql_migrated.py
@@ -0,0 +1,63 @@
+# Copyright (C) 2021  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""Tests postgresql migrations by initializing with an old schema, applying migrations,
+then running all the tests."""
+
+import glob
+from os import path
+
+import pytest
+import pytest_postgresql
+
+from swh.core.db.pytest_plugin import postgresql_fact
+from swh.core.utils import numfile_sortkey as sortkey
+import swh.storage
+from swh.storage.tests.storage_tests import TestStorage  # noqa
+
+BASE_DIR = path.dirname(swh.storage.__file__)
+SQL_UPGRADES_DIR = path.join(BASE_DIR, "../../sql/upgrades")
+
+
+PRE_MIGRATION_SCHEMA_DIR = "sql-v0.18.0"
+"""swh/storage/tests/data/{PRE_MIGRATION_SCHEMA_DIR}/ should be a copy of
+swh/storage/sql/ from a previous release."""
+
+BASE_DBVERSION = 164
+"""dbversion in swh/storage/tests/data/{PRE_MIGRATION_SCHEMA_DIR}/30_schema.sql"""
+
+pre_migration_schema_files = sorted(
+    glob.glob(path.join(BASE_DIR, "tests/data", PRE_MIGRATION_SCHEMA_DIR, "*.sql"))
+)
+
+migration_files = sorted(glob.glob(path.join(SQL_UPGRADES_DIR, "*.sql")), key=sortkey)
+"""All migration files."""
+
+use_migration_files = [
+    filename
+    for filename in migration_files
+    if int(path.splitext(path.basename(filename))[0]) > BASE_DBVERSION
+]
+"""Migration files used to go from BASE_DBVERSION to the current dbversion."""
+
+postgresql_proc_migrated = pytest_postgresql.factories.postgresql_proc()
+"""Dedicated PostgreSQL server process backing the migrated storage database."""
+
+migrated_swh_storage_postgresql = postgresql_fact(
+    "postgresql_proc_migrated",
+    db_name="storage",
+    dump_files=pre_migration_schema_files + use_migration_files,
+)
+"""Storage database initialized with the old schema + migration files,
+instead of directly with the current schema."""
+
+
+@pytest.fixture
+def swh_storage_backend_config(
+    swh_storage_backend_config, migrated_swh_storage_postgresql
+):
+    yield {
+        **swh_storage_backend_config,
+        "db": migrated_swh_storage_postgresql.dsn,
+    }
diff --git a/swh/storage/tests/test_postgresql_migration.py b/swh/storage/tests/test_postgresql_migration.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/tests/test_postgresql_migration.py
@@ -0,0 +1,194 @@
+# Copyright (C) 2021  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import glob
+import os
+import subprocess
+
+import attr
+import pytest
+from pytest_postgresql import factories
+
+from swh.core.utils import numfile_sortkey as sortkey
+from swh.storage import get_storage
+
+from .storage_tests import transform_entries
+
+DIR = os.path.dirname(__file__)
+
+
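+# dedicated PostgreSQL server process and database fixtures for the migration
+# tests below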
+pg_storage_migration_proc = factories.postgresql_proc()
+pg_storage_migration = factories.postgresql("pg_storage_migration_proc")
+
+
+def psql_run_file(dsn, filename):
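+    """Run the SQL file at `filename` on the database at `dsn`, stopping at the
+    first error (ON_ERROR_STOP=1)."""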
+    subprocess.check_call(
+        [
+            "psql",
+            "--quiet",
+            "--no-psqlrc",
+            "-v",
+            "ON_ERROR_STOP=1",
+            "-d",
+            dsn,
+            "-f",
+            filename,
+        ]
+    )
+
+
+@pytest.fixture
+def storage(pg_storage_migration):
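+    """Return a storage instance backed by a database initialized with the old
+    (v0.18.0) schema, before any migration is applied."""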
+    for filename in sorted(
+        glob.glob(os.path.join(DIR, "data", "sql-v0.18.0", "*.sql"))
+    ):
+        psql_run_file(pg_storage_migration.dsn, filename)
+
+    config = {
+        "cls": "local",
+        "db": pg_storage_migration.dsn,
+        "objstorage": {"cls": "memory"},
+        "check_config": False,  # it would error on the dbversion number
+    }
+    return get_storage(**config)
+
+
+@pytest.mark.db
+class TestPgStorageMigration:
+    """Creates an old schema, inserts some data, runs migrations, and checks the
+    data still exists."""
+
+    def _migrate(self, db):
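+        """Apply, in version order, every upgrade script newer than the schema
+        version currently recorded in the database."""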
+        current_version = db.dbversion()["version"]
+
+        filenames = sorted(
+            glob.glob(os.path.join(DIR, "../../../sql/upgrades/*.sql")), key=sortkey,
+        )
+
+        nb_migrations = 0
+
+        for filename in filenames:
+            (version_str, ext) = os.path.splitext(os.path.basename(filename))
+            assert ext == ".sql"
+            version = int(version_str)
+
+            if version <= current_version:
+                # this migration file is older than the current schema version
+                assert nb_migrations == 0
+                continue
+
+            nb_migrations += 1
+            psql_run_file(db.conn.dsn, filename)
+
+        assert nb_migrations, "no migrations applied"
+
+    @pytest.mark.parametrize("migrate_after_insert", (True, False))
+    def test_content(self, storage, sample_data, migrate_after_insert):
+        swh_storage = storage
+        if migrate_after_insert:
+            swh_storage.content_add(sample_data.contents)
+
+        with swh_storage.db() as db:
+            self._migrate(db)
+
+        if not migrate_after_insert:
+            swh_storage.content_add(sample_data.contents)
+
+        for content in sample_data.contents:
+            assert not list(swh_storage.content_missing([content.to_dict()]))
+            assert swh_storage.content_get([content.sha1]) == [
+                attr.evolve(content, data=None)
+            ]
+
+    @pytest.mark.parametrize("migrate_after_insert", (True, False))
+    def test_skipped_content(self, storage, sample_data, migrate_after_insert):
+        swh_storage = storage
+        if migrate_after_insert:
+            swh_storage.skipped_content_add(sample_data.skipped_contents)
+
+        with swh_storage.db() as db:
+            self._migrate(db)
+
+        if not migrate_after_insert:
+            swh_storage.skipped_content_add(sample_data.skipped_contents)
+
+        for skipped_content in sample_data.skipped_contents:
+            assert not list(
+                swh_storage.skipped_content_missing([skipped_content.to_dict()])
+            )
+
+    @pytest.mark.parametrize("migrate_after_insert", (True, False))
+    def test_directory(self, storage, sample_data, migrate_after_insert):
+        swh_storage = storage
+        if migrate_after_insert:
+            swh_storage.directory_add(sample_data.directories)
+            swh_storage.content_add(sample_data.contents)
+
+        with swh_storage.db() as db:
+            self._migrate(db)
+
+        if not migrate_after_insert:
+            swh_storage.directory_add(sample_data.directories)
+            swh_storage.content_add(sample_data.contents)
+
+        for directory in sample_data.directories:
+            assert not list(swh_storage.directory_missing([directory.id]))
+
+            actual_data = list(swh_storage.directory_ls(directory.id))
+            expected_data = list(transform_entries(storage, directory))
+
+            for data in actual_data:
+                assert data in expected_data
+
+    @pytest.mark.parametrize("migrate_after_insert", (True, False))
+    def test_revision(self, storage, sample_data, migrate_after_insert):
+        swh_storage = storage
+        if migrate_after_insert:
+            swh_storage.revision_add(sample_data.revisions)
+
+        with swh_storage.db() as db:
+            self._migrate(db)
+
+        if not migrate_after_insert:
+            swh_storage.revision_add(sample_data.revisions)
+
+        for revision in sample_data.revisions:
+            assert not list(swh_storage.revision_missing([revision.id]))
+            assert swh_storage.revision_get([revision.id]) == [revision]
+
+    @pytest.mark.parametrize("migrate_after_insert", (True, False))
+    def test_release(self, storage, sample_data, migrate_after_insert):
+        swh_storage = storage
+        if migrate_after_insert:
+            swh_storage.release_add(sample_data.releases)
+
+        with swh_storage.db() as db:
+            self._migrate(db)
+
+        if not migrate_after_insert:
+            swh_storage.release_add(sample_data.releases)
+
+        for release in sample_data.releases:
+            assert not list(swh_storage.release_missing([release.id]))
+            assert swh_storage.release_get([release.id]) == [release]
+
+    @pytest.mark.parametrize("migrate_after_insert", (True, False))
+    def test_snapshot(self, storage, sample_data, migrate_after_insert):
+        swh_storage = storage
+        if migrate_after_insert:
+            swh_storage.snapshot_add(sample_data.snapshots)
+
+        with swh_storage.db() as db:
+            self._migrate(db)
+
+        if not migrate_after_insert:
+            swh_storage.snapshot_add(sample_data.snapshots)
+
+        for snapshot in sample_data.snapshots:
+            assert not list(swh_storage.snapshot_missing([snapshot.id]))
+            assert swh_storage.snapshot_get(snapshot.id) == {
+                **snapshot.to_dict(),
+                "next_branch": None,
+            }