diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
index 20dd7f05..1cca4d63 100644
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -1,1221 +1,1220 @@
 # Copyright (C) 2019-2020  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import datetime
 import itertools
 import json
 import random
 import re
 from typing import Any, Dict, List, Iterable, Optional, Union
 
 import attr
 import dateutil.parser
 
 from swh.core.api.serializers import msgpack_loads, msgpack_dumps
 from swh.model.model import (
     Revision,
     Release,
     Directory,
     DirectoryEntry,
     Content,
     SkippedContent,
     OriginVisit,
     OriginVisitStatus,
     Snapshot,
     Origin,
 )
 from swh.model.hashutil import DEFAULT_ALGORITHMS
 from swh.storage.objstorage import ObjStorage
 from swh.storage.writer import JournalWriter
 from swh.storage.validate import convert_validation_exceptions
 from swh.storage.utils import now
 
 from ..exc import StorageArgumentException, HashCollision
 from .common import TOKEN_BEGIN, TOKEN_END
 from .converters import (
     revision_to_db,
     revision_from_db,
     release_to_db,
     release_from_db,
 )
 from .cql import CqlRunner
 from .schema import HASH_ALGORITHMS
 
 
 # Max block size of contents to return
 BULK_BLOCK_CONTENT_LEN_MAX = 10000
 
 
 class CassandraStorage:
     def __init__(self, hosts, keyspace, objstorage, port=9042, journal_writer=None):
         self._cql_runner = CqlRunner(hosts, keyspace, port)
         self.journal_writer = JournalWriter(journal_writer)
         self.objstorage = ObjStorage(objstorage)
 
     def check_config(self, *, check_write):
         self._cql_runner.check_read()
 
         return True
 
     def _content_get_from_hash(self, algo, hash_) -> Iterable:
         """From the name of a hash algorithm and a value of that hash,
         looks up the "hash -> token" secondary table (content_by_{algo})
         to get tokens.
         Then, looks up the main table (content) to get all contents with
         that token, and filters out contents whose hash doesn't match."""
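         # e.g. (illustrative): looking up sha1_git=b"\x12" * 20 first reads
         # content_by_sha1_git to collect the Murmur3 tokens stored for that
         # hash, then reads the 'content' rows at each token and keeps only
         # those whose sha1_git really is b"\x12" * 20.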
         found_tokens = self._cql_runner.content_get_tokens_from_single_hash(algo, hash_)
 
         for token in found_tokens:
             # Query the main table ('content').
             res = self._cql_runner.content_get_from_token(token)
 
             for row in res:
                 # re-check the hash (in case of a murmur3 collision)
                 if getattr(row, algo) == hash_:
                     yield row
 
     def _content_add(self, contents: List[Content], with_data: bool) -> Dict:
         # Filter out contents already in the database.
         contents = [
             c for c in contents if not self._cql_runner.content_get_from_pk(c.to_dict())
         ]
 
         self.journal_writer.content_add(contents)
 
         if with_data:
             # First insert to the objstorage, if the endpoint is
             # `content_add` (as opposed to `content_add_metadata`).
             # TODO: this should probably be done concurrently with inserting
             # in index tables (but still before the main table; so an entry is
             # only added to the main table after everything else was
             # successfully inserted).
             summary = self.objstorage.content_add(
                 c for c in contents if c.status != "absent"
             )
             content_add_bytes = summary["content:add:bytes"]
 
         content_add = 0
         for content in contents:
             content_add += 1
 
             # Check for sha1 or sha1_git collisions. This test is not atomic
             # with the insertion, so it won't detect a collision if both
             # contents are inserted at the same time, but it's good enough.
             #
             # The proper way to do it would probably be a BATCH, but this
             # would be inefficient because of the number of partitions we
             # need to affect (len(HASH_ALGORITHMS)+1, which is currently 5)
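             #
             # Illustrative outcome: if a row already indexed under this sha1
             # turns out to carry a different sha256, its hashes and the new
             # content's hashes both end up in `collisions`, and
             # HashCollision("sha1", ...) is raised instead of silently
             # overwriting the existing row.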
             for algo in {"sha1", "sha1_git"}:
                 collisions = []
                 # Get tokens of 'content' rows with the same value for
                 # sha1/sha1_git
                 rows = self._content_get_from_hash(algo, content.get_hash(algo))
                 for row in rows:
                     if getattr(row, algo) != content.get_hash(algo):
                         # collision of the token (partition key); ignore
                         # this row
                         continue
 
                     for other_algo in HASH_ALGORITHMS:
                         if getattr(row, other_algo) != content.get_hash(other_algo):
                             # This row is a colliding content: record its
                             # hashes once and move on to the next row.
                             collisions.append(
                                 {k: getattr(row, k) for k in HASH_ALGORITHMS}
                             )
                             break
 
                 if collisions:
                     collisions.append(content.hashes())
                     raise HashCollision(algo, content.get_hash(algo), collisions)
 
             (token, insertion_finalizer) = self._cql_runner.content_add_prepare(content)
 
             # Then add to index tables
             for algo in HASH_ALGORITHMS:
                 self._cql_runner.content_index_add_one(algo, content, token)
 
             # Then to the main table
             insertion_finalizer()
 
         summary = {
             "content:add": content_add,
         }
 
         if with_data:
             summary["content:add:bytes"] = content_add_bytes
 
         return summary
 
     def content_add(self, content: Iterable[Content]) -> Dict:
         contents = [attr.evolve(c, ctime=now()) for c in content]
         return self._content_add(list(contents), with_data=True)
 
     def content_update(self, content, keys=[]):
         raise NotImplementedError(
             "content_update is not supported by the Cassandra backend"
         )
 
     def content_add_metadata(self, content: Iterable[Content]) -> Dict:
         return self._content_add(list(content), with_data=False)
 
     def content_get(self, content):
         if len(content) > BULK_BLOCK_CONTENT_LEN_MAX:
             raise StorageArgumentException(
                 "Sending at most %s contents." % BULK_BLOCK_CONTENT_LEN_MAX
             )
         yield from self.objstorage.content_get(content)
 
     def content_get_partition(
         self,
         partition_id: int,
         nb_partitions: int,
         limit: int = 1000,
         page_token: str = None,
     ):
         if limit is None:
             raise StorageArgumentException("limit should not be None")
 
         # Compute start and end of the range of tokens covered by the
         # requested partition
         partition_size = (TOKEN_END - TOKEN_BEGIN) // nb_partitions
         range_start = TOKEN_BEGIN + partition_id * partition_size
         range_end = TOKEN_BEGIN + (partition_id + 1) * partition_size
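         # e.g. (illustrative) with nb_partitions=4, partition 0 covers
         # [TOKEN_BEGIN, TOKEN_BEGIN + size), partition 1 covers
         # [TOKEN_BEGIN + size, TOKEN_BEGIN + 2 * size), and so on, where
         # size = (TOKEN_END - TOKEN_BEGIN) // 4.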
 
         # offset the range start according to the `page_token`.
         if page_token is not None:
             if not (range_start <= int(page_token) <= range_end):
                 raise StorageArgumentException("Invalid page_token.")
             range_start = int(page_token)
 
         # Get the first rows of the range
         rows = self._cql_runner.content_get_token_range(range_start, range_end, limit)
         rows = list(rows)
 
         if len(rows) == limit:
             next_page_token: Optional[str] = str(rows[-1].tok + 1)
         else:
             next_page_token = None
 
         return {
             "contents": [row._asdict() for row in rows if row.status != "absent"],
             "next_page_token": next_page_token,
         }
 
     def content_get_metadata(self, contents: List[bytes]) -> Dict[bytes, List[Dict]]:
         result: Dict[bytes, List[Dict]] = {sha1: [] for sha1 in contents}
         for sha1 in contents:
             # Get all (sha1, sha1_git, sha256, blake2s256) whose sha1
             # matches the argument, from the index table ('content_by_sha1')
             for row in self._content_get_from_hash("sha1", sha1):
                 content_metadata = row._asdict()
                 content_metadata.pop("ctime")
                 result[content_metadata["sha1"]].append(content_metadata)
         return result
 
     def content_find(self, content):
         # Pick a hash algorithm that appears in the query; it will be used
         # for an efficient initial lookup in the index table.
         filter_algos = list(set(content).intersection(HASH_ALGORITHMS))
         if not filter_algos:
             raise StorageArgumentException(
                 "content keys must contain at least one of: "
                 "%s" % ", ".join(sorted(HASH_ALGORITHMS))
             )
         common_algo = filter_algos[0]
 
         results = []
         rows = self._content_get_from_hash(common_algo, content[common_algo])
         for row in rows:
             # Re-check all the hashes, in case of collisions (either of the
             # hash of the partition key, or the hashes in it)
             for algo in HASH_ALGORITHMS:
                 if content.get(algo) and getattr(row, algo) != content[algo]:
                     # This hash didn't match; discard the row.
                     break
             else:
                 # All hashes match, keep this row.
                 results.append(
                     {
                         **row._asdict(),
                         "ctime": row.ctime.replace(tzinfo=datetime.timezone.utc),
                     }
                 )
         return results
 
     def content_missing(self, content, key_hash="sha1"):
         for cont in content:
             res = self.content_find(cont)
             if not res:
                 yield cont[key_hash]
             if any(c["status"] == "missing" for c in res):
                 yield cont[key_hash]
 
     def content_missing_per_sha1(self, contents):
         return self.content_missing([{"sha1": c} for c in contents])
 
     def content_missing_per_sha1_git(self, contents):
         return self.content_missing(
             [{"sha1_git": c for c in contents}], key_hash="sha1_git"
         )
 
     def content_get_random(self):
         return self._cql_runner.content_get_random().sha1_git
 
     def _skipped_content_get_from_hash(self, algo, hash_) -> Iterable:
         """From the name of a hash algorithm and a value of that hash,
         looks up the "hash -> token" secondary table
         (skipped_content_by_{algo}) to get tokens.
         Then, looks up the main table (skipped_content) to get all rows with
         that token, and filters out those whose hash doesn't match."""
         found_tokens = self._cql_runner.skipped_content_get_tokens_from_single_hash(
             algo, hash_
         )
 
         for token in found_tokens:
             # Query the main table ('skipped_content').
             res = self._cql_runner.skipped_content_get_from_token(token)
 
             for row in res:
                 # re-check the hash (in case of a murmur3 collision)
                 if getattr(row, algo) == hash_:
                     yield row
 
     def _skipped_content_add(self, contents: Iterable[SkippedContent]) -> Dict:
         # Filter out contents already in the database.
         contents = [
             c
             for c in contents
             if not self._cql_runner.skipped_content_get_from_pk(c.to_dict())
         ]
 
         self.journal_writer.skipped_content_add(contents)
 
         for content in contents:
             # Compute token of the row in the main table
             (token, insertion_finalizer) = self._cql_runner.skipped_content_add_prepare(
                 content
             )
 
             # Then add to index tables
             for algo in HASH_ALGORITHMS:
                 self._cql_runner.skipped_content_index_add_one(algo, content, token)
 
             # Then to the main table
             insertion_finalizer()
 
         return {"skipped_content:add": len(contents)}
 
     def skipped_content_add(self, content: Iterable[SkippedContent]) -> Dict:
         contents = [attr.evolve(c, ctime=now()) for c in content]
         return self._skipped_content_add(contents)
 
     def skipped_content_missing(self, contents):
         for content in contents:
             if not self._cql_runner.skipped_content_get_from_pk(content):
                 yield {algo: content[algo] for algo in DEFAULT_ALGORITHMS}
 
     def directory_add(self, directories: Iterable[Directory]) -> Dict:
         directories = list(directories)
 
         # Filter out directories that are already inserted.
         missing = self.directory_missing([dir_.id for dir_ in directories])
         directories = [dir_ for dir_ in directories if dir_.id in missing]
 
         self.journal_writer.directory_add(directories)
 
         for directory in directories:
             # Add directory entries to the 'directory_entry' table
             for entry in directory.entries:
                 self._cql_runner.directory_entry_add_one(
                     {**entry.to_dict(), "directory_id": directory.id}
                 )
 
             # Add the directory *after* adding all the entries, so someone
             # calling directory_ls in the meantime won't end up
             # with half the entries.
             self._cql_runner.directory_add_one(directory.id)
 
         return {"directory:add": len(missing)}
 
     def directory_missing(self, directories):
         return self._cql_runner.directory_missing(directories)
 
     def _join_dentry_to_content(self, dentry):
         keys = (
             "status",
             "sha1",
             "sha1_git",
             "sha256",
             "length",
         )
         ret = dict.fromkeys(keys)
         ret.update(dentry.to_dict())
         if ret["type"] == "file":
             content = self.content_find({"sha1_git": ret["target"]})
             if content:
                 content = content[0]
                 for key in keys:
                     ret[key] = content[key]
         return ret
 
     def _directory_ls(self, directory_id, recursive, prefix=b""):
         if self.directory_missing([directory_id]):
             return
         rows = list(self._cql_runner.directory_entry_get([directory_id]))
 
         for row in rows:
             # Build and yield the directory entry dict
             entry = row._asdict()
             del entry["directory_id"]
             entry = DirectoryEntry.from_dict(entry)
             ret = self._join_dentry_to_content(entry)
             ret["name"] = prefix + ret["name"]
             ret["dir_id"] = directory_id
             yield ret
 
             if recursive and ret["type"] == "dir":
                 yield from self._directory_ls(
                     ret["target"], True, prefix + ret["name"] + b"/"
                 )
 
     def directory_entry_get_by_path(self, directory, paths):
         return self._directory_entry_get_by_path(directory, paths, b"")
 
     def _directory_entry_get_by_path(self, directory, paths, prefix):
         if not paths:
             return
 
         contents = list(self.directory_ls(directory))
 
         if not contents:
             return
 
         def _get_entry(entries, name):
             """Finds the entry with the requested name, prepends the
             prefix (to get its full path), and returns it.
 
             If no entry has that name, returns None."""
             for entry in entries:
                 if entry["name"] == name:
                     entry = entry.copy()
                     entry["name"] = prefix + entry["name"]
                     return entry
 
         first_item = _get_entry(contents, paths[0])
 
         if len(paths) == 1:
             return first_item
 
         if not first_item or first_item["type"] != "dir":
             return
 
         return self._directory_entry_get_by_path(
             first_item["target"], paths[1:], prefix + paths[0] + b"/"
         )
 
     def directory_ls(self, directory, recursive=False):
         yield from self._directory_ls(directory, recursive)
 
     def directory_get_random(self):
         return self._cql_runner.directory_get_random().id
 
     def revision_add(self, revisions: Iterable[Revision]) -> Dict:
         revisions = list(revisions)
 
         # Filter out revisions already in the database
         missing = self.revision_missing([rev.id for rev in revisions])
         revisions = [rev for rev in revisions if rev.id in missing]
         self.journal_writer.revision_add(revisions)
 
         for revision in revisions:
             revobject = revision_to_db(revision)
             if revobject:
                 # Add parents first
                 for (rank, parent) in enumerate(revobject["parents"]):
                     self._cql_runner.revision_parent_add_one(
                         revobject["id"], rank, parent
                     )
 
                 # Then write the main revision row.
                 # Writing this after all parents were written ensures that
                 # read endpoints don't return a partial view while writing
                 # the parents
                 self._cql_runner.revision_add_one(revobject)
 
         return {"revision:add": len(revisions)}
 
     def revision_missing(self, revisions):
         return self._cql_runner.revision_missing(revisions)
 
     def revision_get(self, revisions):
         rows = self._cql_runner.revision_get(revisions)
         revs = {}
         for row in rows:
             # TODO: use a single query to get all parents?
             # (it might have lower latency, but requires more code and more
             # bandwidth, because revision id would be part of each returned
             # row)
             parent_rows = self._cql_runner.revision_parent_get(row.id)
             # parent_rank is the clustering key, so results are already
             # sorted by rank.
             parents = tuple(row.parent_id for row in parent_rows)
             rev = revision_from_db(row, parents=parents)
             revs[rev.id] = rev.to_dict()
 
         for rev_id in revisions:
             yield revs.get(rev_id)
 
     def _get_parent_revs(self, rev_ids, seen, limit, short):
         if limit and len(seen) >= limit:
             return
         rev_ids = [id_ for id_ in rev_ids if id_ not in seen]
         if not rev_ids:
             return
         seen |= set(rev_ids)
 
         # We need this query, even if short=True, to return consistent
         # results (ie. not return only a subset of a revision's parents
         # if it is being written)
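         # (Illustrative: for a merge revision R with parents P1 and P2,
         # short=True yields (R.id, (P1.id, P2.id)) and then recurses into
         # P1 and P2; `seen` prevents re-visiting shared ancestors.)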
         if short:
             rows = self._cql_runner.revision_get_ids(rev_ids)
         else:
             rows = self._cql_runner.revision_get(rev_ids)
 
         for row in rows:
             # TODO: use a single query to get all parents?
             # (it might have lower latency, but requires more code and more
             # bandwidth, because the revision id would be part of each
             # returned row)
             parent_rows = self._cql_runner.revision_parent_get(row.id)
 
             # parent_rank is the clustering key, so results are already
             # sorted by rank.
             parents = tuple(row.parent_id for row in parent_rows)
 
             if short:
                 yield (row.id, parents)
             else:
                 rev = revision_from_db(row, parents=parents)
                 yield rev.to_dict()
             yield from self._get_parent_revs(parents, seen, limit, short)
 
     def revision_log(self, revisions, limit=None):
         seen = set()
         yield from self._get_parent_revs(revisions, seen, limit, False)
 
     def revision_shortlog(self, revisions, limit=None):
         seen = set()
         yield from self._get_parent_revs(revisions, seen, limit, True)
 
     def revision_get_random(self):
         return self._cql_runner.revision_get_random().id
 
     def release_add(self, releases: Iterable[Release]) -> Dict:
         releases = list(releases)
 
         # Filter out releases already in the database
         missing = self.release_missing([rel.id for rel in releases])
         releases = [rel for rel in releases if rel.id in missing]
 
         self.journal_writer.release_add(releases)
 
         for release in releases:
             if release:
                 self._cql_runner.release_add_one(release_to_db(release))
 
         return {"release:add": len(missing)}
 
     def release_missing(self, releases):
         return self._cql_runner.release_missing(releases)
 
     def release_get(self, releases):
         rows = self._cql_runner.release_get(releases)
         rels = {}
         for row in rows:
             release = release_from_db(row)
             rels[row.id] = release.to_dict()
 
         for rel_id in releases:
             yield rels.get(rel_id)
 
     def release_get_random(self):
         return self._cql_runner.release_get_random().id
 
     def snapshot_add(self, snapshots: Iterable[Snapshot]) -> Dict:
         snapshots = list(snapshots)
         missing = self._cql_runner.snapshot_missing([snp.id for snp in snapshots])
         snapshots = [snp for snp in snapshots if snp.id in missing]
 
         for snapshot in snapshots:
             self.journal_writer.snapshot_add([snapshot])
 
             # Add branches
             for (branch_name, branch) in snapshot.branches.items():
                 if branch is None:
                     target_type = None
                     target = None
                 else:
                     target_type = branch.target_type.value
                     target = branch.target
                 self._cql_runner.snapshot_branch_add_one(
                     {
                         "snapshot_id": snapshot.id,
                         "name": branch_name,
                         "target_type": target_type,
                         "target": target,
                     }
                 )
 
             # Add the snapshot *after* adding all the branches, so someone
             # calling snapshot_get_branch in the meantime won't end up
             # with half the branches.
             self._cql_runner.snapshot_add_one(snapshot.id)
 
         return {"snapshot:add": len(snapshots)}
 
     def snapshot_missing(self, snapshots):
         return self._cql_runner.snapshot_missing(snapshots)
 
     def snapshot_get(self, snapshot_id):
         return self.snapshot_get_branches(snapshot_id)
 
     def snapshot_get_by_origin_visit(self, origin, visit):
         try:
             visit = self.origin_visit_get_by(origin, visit)
         except IndexError:
             return None
 
         return self.snapshot_get(visit["snapshot"])
 
     def snapshot_get_latest(self, origin, allowed_statuses=None):
         visit = self.origin_visit_get_latest(
             origin, allowed_statuses=allowed_statuses, require_snapshot=True
         )
 
         if visit:
             assert visit["snapshot"]
             if self._cql_runner.snapshot_missing([visit["snapshot"]]):
                 raise StorageArgumentException("Visit references unknown snapshot")
             return self.snapshot_get_branches(visit["snapshot"])
 
     def snapshot_count_branches(self, snapshot_id):
         if self._cql_runner.snapshot_missing([snapshot_id]):
             # Makes sure we don't fetch branches for a snapshot that is
             # being added.
             return None
         rows = list(self._cql_runner.snapshot_count_branches(snapshot_id))
         assert len(rows) == 1
         (nb_none, counts) = rows[0].counts
         counts = dict(counts)
         if nb_none:
             counts[None] = nb_none
         return counts
 
     def snapshot_get_branches(
         self, snapshot_id, branches_from=b"", branches_count=1000, target_types=None
     ):
         if self._cql_runner.snapshot_missing([snapshot_id]):
             # Makes sure we don't fetch branches for a snapshot that is
             # being added.
             return None
 
         branches = []
         while len(branches) < branches_count + 1:
             new_branches = list(
                 self._cql_runner.snapshot_branch_get(
                     snapshot_id, branches_from, branches_count + 1
                 )
             )
 
             if not new_branches:
                 break
 
             branches_from = new_branches[-1].name
 
             new_branches_filtered = new_branches
 
             # Filter by target_type
             if target_types:
                 new_branches_filtered = [
                     branch
                     for branch in new_branches_filtered
                     if branch.target is not None and branch.target_type in target_types
                 ]
 
             branches.extend(new_branches_filtered)
 
             if len(new_branches) < branches_count + 1:
                 break
 
         if len(branches) > branches_count:
             last_branch = branches.pop(-1).name
         else:
             last_branch = None
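         # Asking for branches_count + 1 rows makes pagination explicit:
         # e.g. (illustrative) with branches_count=2 and branches a, b, c,
         # only a and b are returned and next_branch is set to c's name.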
 
         branches = {
             branch.name: {"target": branch.target, "target_type": branch.target_type,}
             if branch.target
             else None
             for branch in branches
         }
 
         return {
             "id": snapshot_id,
             "branches": branches,
             "next_branch": last_branch,
         }
 
     def snapshot_get_random(self):
         return self._cql_runner.snapshot_get_random().id
 
     def object_find_by_sha1_git(self, ids):
         results = {id_: [] for id_ in ids}
         missing_ids = set(ids)
 
         # Mind the order, revision is the most likely one for a given ID,
         # so we check revisions first.
         queries = [
             ("revision", self._cql_runner.revision_missing),
             ("release", self._cql_runner.release_missing),
             ("content", self._cql_runner.content_missing_by_sha1_git),
             ("directory", self._cql_runner.directory_missing),
         ]
 
         for (object_type, query_fn) in queries:
             found_ids = missing_ids - set(query_fn(missing_ids))
             for sha1_git in found_ids:
                 results[sha1_git].append(
                     {"sha1_git": sha1_git, "type": object_type,}
                 )
                 missing_ids.remove(sha1_git)
 
             if not missing_ids:
                 # We found everything, skipping the next queries.
                 break
 
         return results
 
     def origin_get(self, origins):
         if isinstance(origins, dict):
             # Old API
             return_single = True
             origins = [origins]
         else:
             return_single = False
 
         if any("id" in origin for origin in origins):
             raise StorageArgumentException("Origin ids are not supported.")
 
         results = [self.origin_get_one(origin) for origin in origins]
 
         if return_single:
             assert len(results) == 1
             return results[0]
         else:
             return results
 
     def origin_get_one(self, origin: Dict[str, Any]) -> Optional[Dict[str, Any]]:
         if "id" in origin:
             raise StorageArgumentException("Origin ids are not supported.")
         if "url" not in origin:
             raise StorageArgumentException("Missing origin url")
         rows = self._cql_runner.origin_get_by_url(origin["url"])
 
         rows = list(rows)
         if rows:
             assert len(rows) == 1
             result = rows[0]._asdict()
             return {
                 "url": result["url"],
             }
         else:
             return None
 
     def origin_get_by_sha1(self, sha1s):
         results = []
         for sha1 in sha1s:
             rows = self._cql_runner.origin_get_by_sha1(sha1)
             if rows:
                 results.append({"url": rows.one().url})
             else:
                 results.append(None)
         return results
 
     def origin_list(self, page_token: Optional[str] = None, limit: int = 100) -> dict:
         # Compute what token to begin the listing from
         start_token = TOKEN_BEGIN
         if page_token:
             start_token = int(page_token)
             if not (TOKEN_BEGIN <= start_token <= TOKEN_END):
                 raise StorageArgumentException("Invalid page_token.")
 
         rows = self._cql_runner.origin_list(start_token, limit)
         rows = list(rows)
 
         if len(rows) == limit:
             next_page_token: Optional[str] = str(rows[-1].tok + 1)
         else:
             next_page_token = None
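         # e.g. (illustrative) if the last origin of this page sits at token
         # 12345, the next page starts at token 12346, so rows are neither
         # repeated nor skipped across pages.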
 
         return {
             "origins": [{"url": row.url} for row in rows],
             "next_page_token": next_page_token,
         }
 
     def origin_search(
         self, url_pattern, offset=0, limit=50, regexp=False, with_visit=False
     ):
         # TODO: remove this endpoint, swh-search should be used instead.
         origins = self._cql_runner.origin_iter_all()
         if regexp:
             pat = re.compile(url_pattern)
             origins = [orig for orig in origins if pat.search(orig.url)]
         else:
             origins = [orig for orig in origins if url_pattern in orig.url]
 
         if with_visit:
             origins = [orig for orig in origins if orig.next_visit_id > 1]
 
         return [{"url": orig.url,} for orig in origins[offset : offset + limit]]
 
     def origin_add(self, origins: Iterable[Origin]) -> List[Dict]:
         results = []
         for origin in origins:
             self.origin_add_one(origin)
             results.append(origin.to_dict())
         return results
 
     def origin_add_one(self, origin: Origin) -> str:
         known_origin = self.origin_get_one(origin.to_dict())
 
         if known_origin:
             origin_url = known_origin["url"]
         else:
             self.journal_writer.origin_add([origin])
 
             self._cql_runner.origin_add_one(origin)
             origin_url = origin.url
 
         return origin_url
 
     def origin_visit_add(
         self, origin_url: str, date: Union[str, datetime.datetime], type: str
     ) -> OriginVisit:
         if isinstance(date, str):
             # FIXME: Converge on iso8601 at some point
             date = dateutil.parser.parse(date)
         elif not isinstance(date, datetime.datetime):
             raise StorageArgumentException("Date must be a datetime or a string")
 
         if not self.origin_get_one({"url": origin_url}):
             raise StorageArgumentException("Unknown origin %s", origin_url)
 
         visit_id = self._cql_runner.origin_generate_unique_visit_id(origin_url)
         visit_state = "ongoing"
         with convert_validation_exceptions():
             visit = OriginVisit.from_dict(
                 {
                     "origin": origin_url,
                     "date": date,
                     "type": type,
                     "status": visit_state,
                     "snapshot": None,
                     "metadata": None,
                     "visit": visit_id,
                 }
             )
 
         self.journal_writer.origin_visit_add([visit])
         self._cql_runner.origin_visit_add_one(visit)
 
         with convert_validation_exceptions():
             visit_status = OriginVisitStatus(
                 origin=origin_url,
                 visit=visit_id,
                 date=date,
                 status=visit_state,
                 snapshot=None,
                 metadata=None,
             )
         self._origin_visit_status_add(visit_status)
 
         return visit
 
     def _origin_visit_status_add(self, visit_status: OriginVisitStatus) -> None:
         """Add an origin visit status"""
         self.journal_writer.origin_visit_status_add([visit_status])
         self._cql_runner.origin_visit_status_add_one(visit_status)
 
     def origin_visit_status_add(
         self, visit_statuses: Iterable[OriginVisitStatus]
     ) -> None:
         visit_statuses = list(visit_statuses)
         # First pass to check existence (fail early if any origin is unknown)
         for visit_status in visit_statuses:
             origin = self.origin_get({"url": visit_status.origin})
             if not origin:
                 raise StorageArgumentException(f"Unknown origin {visit_status.origin}")
 
         self.journal_writer.origin_visit_status_add(visit_statuses)
         for visit_status in visit_statuses:
             self._origin_visit_status_add(visit_status)
 
     def origin_visit_update(
         self,
         origin: str,
         visit_id: int,
         status: str,
         metadata: Optional[Dict] = None,
         snapshot: Optional[bytes] = None,
         date: Optional[datetime.datetime] = None,
     ):
         origin_url = origin  # TODO: rename the argument
 
         # Get the existing data of the visit
         visit_ = self.origin_visit_get_by(origin_url, visit_id)
         if not visit_:
             raise StorageArgumentException("This origin visit does not exist.")
         with convert_validation_exceptions():
             visit = OriginVisit.from_dict(visit_)
 
         updates: Dict[str, Any] = {"status": status}
         if metadata and metadata != visit.metadata:
             updates["metadata"] = metadata
         if snapshot and snapshot != visit.snapshot:
             updates["snapshot"] = snapshot
 
         with convert_validation_exceptions():
             visit = attr.evolve(visit, **updates)
 
         self.journal_writer.origin_visit_update([visit])
 
         last_visit_update = self._origin_visit_get_updated(visit.origin, visit.visit)
         assert last_visit_update is not None
 
         with convert_validation_exceptions():
             visit_status = OriginVisitStatus(
                 origin=origin_url,
                 visit=visit_id,
                 date=date or now(),
                 status=status,
                 snapshot=snapshot or last_visit_update["snapshot"],
                 metadata=metadata or last_visit_update["metadata"],
             )
-        self._cql_runner.origin_visit_status_add_one(visit_status)
-        # self._origin_visit_status_add(visit_status)
+        self._origin_visit_status_add(visit_status)
 
     def _origin_visit_merge(
         self, visit: Dict[str, Any], visit_status: Dict[str, Any]
     ) -> Dict[str, Any]:
         """Merge origin_visit and visit_status together.
 
         """
         return OriginVisit.from_dict(
             {
                 # default to the values in visit
                 **visit,
                 # override with the last update
                 **visit_status,
                 # visit['origin'] is the URL (via a join), while
                 # visit_status['origin'] is only an id.
                 "origin": visit["origin"],
                 # but keep the date of the creation of the origin visit
                 "date": visit["date"],
             }
         ).to_dict()
 
     def _origin_visit_apply_last_status(self, visit: Dict[str, Any]) -> Dict[str, Any]:
         """Retrieve the latest visit status information for the origin visit.
         Then merge it with the visit and return it.
 
         """
         visit_status = self._cql_runner.origin_visit_status_get_latest(
             visit["origin"], visit["visit"]
         )
         assert visit_status is not None
         return self._origin_visit_merge(visit, visit_status)
 
     def _origin_visit_get_updated(self, origin: str, visit_id: int) -> Dict[str, Any]:
         """Retrieve origin visit and latest origin visit status and merge them
         into an origin visit.
 
         """
         row_visit = self._cql_runner.origin_visit_get_one(origin, visit_id)
         assert row_visit is not None
         visit = self._format_origin_visit_row(row_visit)
         return self._origin_visit_apply_last_status(visit)
 
     def origin_visit_upsert(self, visits: Iterable[OriginVisit]) -> None:
         visits = list(visits)
         for visit in visits:
             if visit.visit is None:
                 raise StorageArgumentException(f"Missing visit id for visit {visit}")
 
         self.journal_writer.origin_visit_upsert(visits)
         for visit in visits:
             assert visit.visit is not None
             self._cql_runner.origin_visit_upsert(visit)
             with convert_validation_exceptions():
                 visit_status = OriginVisitStatus(
                     origin=visit.origin,
                     visit=visit.visit,
                     date=now(),
                     status=visit.status,
                     snapshot=visit.snapshot,
                     metadata=visit.metadata,
                 )
             self._cql_runner.origin_visit_status_add_one(visit_status)
 
     @staticmethod
     def _format_origin_visit_row(visit):
         return {
             **visit._asdict(),
             "origin": visit.origin,
             "date": visit.date.replace(tzinfo=datetime.timezone.utc),
             "metadata": (json.loads(visit.metadata) if visit.metadata else None),
         }
 
     def origin_visit_get(
         self, origin: str, last_visit: Optional[int] = None, limit: Optional[int] = None
     ) -> Iterable[Dict[str, Any]]:
         rows = self._cql_runner.origin_visit_get(origin, last_visit, limit)
         for row in rows:
             visit = self._format_origin_visit_row(row)
             yield self._origin_visit_apply_last_status(visit)
 
     def origin_visit_find_by_date(
         self, origin: str, visit_date: datetime.datetime
     ) -> Optional[Dict[str, Any]]:
         # Iterator over all the visits of the origin
         # This should be ok for now, as there aren't too many visits
         # per origin.
         rows = list(self._cql_runner.origin_visit_get_all(origin))
 
         def key(visit):
             dt = visit.date.replace(tzinfo=datetime.timezone.utc) - visit_date
             return (abs(dt), -visit.visit)
 
         if rows:
             row = min(rows, key=key)
             visit = self._format_origin_visit_row(row)
             return self._origin_visit_apply_last_status(visit)
         return None
 
     def origin_visit_get_by(self, origin: str, visit: int) -> Optional[Dict[str, Any]]:
         row = self._cql_runner.origin_visit_get_one(origin, visit)
         if row:
             visit_ = self._format_origin_visit_row(row)
             return self._origin_visit_apply_last_status(visit_)
         return None
 
     def origin_visit_get_latest(
         self,
         origin: str,
         allowed_statuses: Optional[List[str]] = None,
         require_snapshot: bool = False,
     ) -> Optional[Dict[str, Any]]:
         # TODO: Do not fetch all visits
         rows = self._cql_runner.origin_visit_get_all(origin)
         latest_visit = None
         for row in rows:
             visit = self._format_origin_visit_row(row)
             updated_visit = self._origin_visit_apply_last_status(visit)
             if allowed_statuses and updated_visit["status"] not in allowed_statuses:
                 continue
             if require_snapshot and updated_visit["snapshot"] is None:
                 continue
 
             # updated_visit is a candidate
             if latest_visit is not None:
                 if updated_visit["date"] < latest_visit["date"]:
                     continue
                 if updated_visit["visit"] < latest_visit["visit"]:
                     continue
 
             latest_visit = updated_visit
 
         return latest_visit
 
     def origin_visit_get_random(self, type: str) -> Optional[Dict[str, Any]]:
         back_in_the_day = now() - datetime.timedelta(weeks=12)  # 3 months back
 
         # Random position to start iteration at
         start_token = random.randint(TOKEN_BEGIN, TOKEN_END)
 
         # Iterator over all visits, ordered by token(origins) then visit_id
         rows = self._cql_runner.origin_visit_iter(start_token)
         for row in rows:
             visit = self._format_origin_visit_row(row)
             visit_status = self._origin_visit_apply_last_status(visit)
             if (
                 visit_status["date"] > back_in_the_day
                 and visit_status["status"] == "full"
             ):
                 return visit_status
         else:
             return None
 
     def stat_counters(self):
         rows = self._cql_runner.stat_counters()
         keys = (
             "content",
             "directory",
             "origin",
             "origin_visit",
             "release",
             "revision",
             "skipped_content",
             "snapshot",
         )
         stats = {key: 0 for key in keys}
         stats.update({row.object_type: row.count for row in rows})
         return stats
 
     def refresh_stat_counters(self):
         pass
 
     def origin_metadata_add(
         self,
         origin_url: str,
         discovery_date: datetime.datetime,
         authority: Dict[str, Any],
         fetcher: Dict[str, Any],
         format: str,
         metadata: bytes,
     ) -> None:
         if not isinstance(origin_url, str):
             raise StorageArgumentException(
                 "origin_id must be str, not %r" % (origin_url,)
             )
         if not self._cql_runner.metadata_authority_get(**authority):
             raise StorageArgumentException(f"Unknown authority {authority}")
         if not self._cql_runner.metadata_fetcher_get(**fetcher):
             raise StorageArgumentException(f"Unknown fetcher {fetcher}")
 
         try:
             self._cql_runner.origin_metadata_add(
                 origin_url,
                 authority["type"],
                 authority["url"],
                 discovery_date,
                 fetcher["name"],
                 fetcher["version"],
                 format,
                 metadata,
             )
         except TypeError as e:
             raise StorageArgumentException(*e.args)
 
     def origin_metadata_get(
         self,
         origin_url: str,
         authority: Dict[str, str],
         after: Optional[datetime.datetime] = None,
         page_token: Optional[bytes] = None,
         limit: int = 1000,
     ) -> Dict[str, Any]:
         if not isinstance(origin_url, str):
             raise TypeError("origin_url must be str, not %r" % (origin_url,))
 
         if page_token is not None:
             (after_date, after_fetcher_name, after_fetcher_url) = msgpack_loads(
                 page_token
             )
             if after and after_date < after:
                 raise StorageArgumentException(
                     "page_token is inconsistent with the value of 'after'."
                 )
             entries = self._cql_runner.origin_metadata_get_after_date_and_fetcher(
                 origin_url,
                 authority["type"],
                 authority["url"],
                 after_date,
                 after_fetcher_name,
                 after_fetcher_url,
             )
         elif after is not None:
             entries = self._cql_runner.origin_metadata_get_after_date(
                 origin_url, authority["type"], authority["url"], after
             )
         else:
             entries = self._cql_runner.origin_metadata_get(
                 origin_url, authority["type"], authority["url"]
             )
 
         if limit:
             entries = itertools.islice(entries, 0, limit + 1)
 
         results = []
         for entry in entries:
             discovery_date = entry.discovery_date.replace(tzinfo=datetime.timezone.utc)
             results.append(
                 {
                     "origin_url": entry.origin,
                     "authority": {
                         "type": entry.authority_type,
                         "url": entry.authority_url,
                     },
                     "fetcher": {
                         "name": entry.fetcher_name,
                         "version": entry.fetcher_version,
                     },
                     "discovery_date": discovery_date,
                     "format": entry.format,
                     "metadata": entry.metadata,
                 }
             )
 
         if len(results) > limit:
             results.pop()
             assert len(results) == limit
             last_result = results[-1]
             next_page_token: Optional[bytes] = msgpack_dumps(
                 (
                     last_result["discovery_date"],
                     last_result["fetcher"]["name"],
                     last_result["fetcher"]["version"],
                 )
             )
         else:
             next_page_token = None
 
         return {
             "next_page_token": next_page_token,
             "results": results,
         }
 
     def metadata_fetcher_add(
         self, name: str, version: str, metadata: Dict[str, Any]
     ) -> None:
         self._cql_runner.metadata_fetcher_add(name, version, json.dumps(metadata))
 
     def metadata_fetcher_get(self, name: str, version: str) -> Optional[Dict[str, Any]]:
         fetcher = self._cql_runner.metadata_fetcher_get(name, version)
         if fetcher:
             return {
                 "name": fetcher.name,
                 "version": fetcher.version,
                 "metadata": json.loads(fetcher.metadata),
             }
         else:
             return None
 
     def metadata_authority_add(
         self, type: str, url: str, metadata: Dict[str, Any]
     ) -> None:
         self._cql_runner.metadata_authority_add(url, type, json.dumps(metadata))
 
     def metadata_authority_get(self, type: str, url: str) -> Optional[Dict[str, Any]]:
         authority = self._cql_runner.metadata_authority_get(type, url)
         if authority:
             return {
                 "type": authority.type,
                 "url": authority.url,
                 "metadata": json.loads(authority.metadata),
             }
         else:
             return None
 
     def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None:
         """Do nothing
 
         """
         return None
 
     def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict:
         return {}
diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py
index 0e8439e2..4738116a 100644
--- a/swh/storage/in_memory.py
+++ b/swh/storage/in_memory.py
@@ -1,1293 +1,1289 @@
 # Copyright (C) 2015-2020  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import re
 import bisect
 import dateutil
 import collections
 import copy
 import datetime
 import itertools
 import random
 
 from collections import defaultdict
 from datetime import timedelta
 from typing import (
     Any,
     Callable,
     Dict,
     Generic,
     Hashable,
     Iterable,
     Iterator,
     List,
     Optional,
     Tuple,
     TypeVar,
     Union,
 )
 
 import attr
 
 from swh.core.api.serializers import msgpack_loads, msgpack_dumps
 from swh.model.model import (
     BaseContent,
     Content,
     SkippedContent,
     Directory,
     Revision,
     Release,
     Snapshot,
     OriginVisit,
     OriginVisitStatus,
     Origin,
     SHA1_SIZE,
 )
 from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex
 from swh.storage.objstorage import ObjStorage
 from swh.storage.validate import convert_validation_exceptions
 from swh.storage.utils import now
 
 from .exc import StorageArgumentException, HashCollision
 
 from .converters import origin_url_to_sha1
 from .utils import get_partition_bounds_bytes
 from .writer import JournalWriter
 
 # Max block size of contents to return
 BULK_BLOCK_CONTENT_LEN_MAX = 10000
 
 
 SortedListItem = TypeVar("SortedListItem")
 SortedListKey = TypeVar("SortedListKey")
 
 FetcherKey = Tuple[str, str]
 
 
 class SortedList(collections.UserList, Generic[SortedListKey, SortedListItem]):
     data: List[Tuple[SortedListKey, SortedListItem]]
 
     # https://github.com/python/mypy/issues/708
     # key: Callable[[SortedListItem], SortedListKey]
 
     def __init__(
         self,
         data: Optional[List[SortedListItem]] = None,
         key: Optional[Callable[[SortedListItem], SortedListKey]] = None,
     ):
         if key is None:
 
             def key(item):
                 return item
 
         assert key is not None  # for mypy
         super().__init__(sorted((key(x), x) for x in data or []))
 
         self.key: Callable[[SortedListItem], SortedListKey] = key
 
     def add(self, item: SortedListItem):
         k = self.key(item)
         bisect.insort(self.data, (k, item))
 
     def __iter__(self) -> Iterator[SortedListItem]:
         for (k, item) in self.data:
             yield item
 
     def iter_from(self, start_key: Any) -> Iterator[SortedListItem]:
         """Returns an iterator over all the elements whose key is greater
         or equal to `start_key`.
         (This is an efficient equivalent to:
         `(x for x in L if key(x) >= start_key)`)
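 
         For example, `SortedList([3, 1, 2]).iter_from(2)` yields 2, then 3.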
         """
         from_index = bisect.bisect_left(self.data, (start_key,))
         for (k, item) in itertools.islice(self.data, from_index, None):
             yield item
 
     def iter_after(self, start_key: Any) -> Iterator[SortedListItem]:
         """Same as iter_from, but using a strict inequality."""
         it = self.iter_from(start_key)
         for item in it:
             if self.key(item) > start_key:  # type: ignore
                 yield item
                 break
 
         yield from it
 
 
 class InMemoryStorage:
     def __init__(self, journal_writer=None):
 
         self.reset()
         self.journal_writer = JournalWriter(journal_writer)
 
     def reset(self):
         self._contents = {}
         self._content_indexes = defaultdict(lambda: defaultdict(set))
         self._skipped_contents = {}
         self._skipped_content_indexes = defaultdict(lambda: defaultdict(set))
         self._directories = {}
         self._revisions = {}
         self._releases = {}
         self._snapshots = {}
         self._origins = {}
         self._origins_by_id = []
         self._origins_by_sha1 = {}
         self._origin_visits = {}
         self._origin_visit_statuses: Dict[Tuple[str, int], List[OriginVisitStatus]] = {}
         self._persons = {}
 
         # {origin_url: {authority: [metadata]}}
         self._origin_metadata: Dict[
             str,
             Dict[
                 Hashable,
                 SortedList[Tuple[datetime.datetime, FetcherKey], Dict[str, Any]],
             ],
         ] = defaultdict(
             lambda: defaultdict(
                 lambda: SortedList(key=lambda x: (x["discovery_date"], x["fetcher"]))
             )
         )  # noqa
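         # e.g. (illustrative) self._origin_metadata[origin_url][authority_key]
         # is a SortedList of metadata dicts, kept ordered by
         # (discovery_date, fetcher) by the key function above.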
 
         self._metadata_fetchers: Dict[FetcherKey, Dict[str, Any]] = {}
         self._metadata_authorities: Dict[Hashable, Dict[str, Any]] = {}
         self._objects = defaultdict(list)
         self._sorted_sha1s = SortedList[bytes, bytes]()
 
         self.objstorage = ObjStorage({"cls": "memory", "args": {}})
 
     def check_config(self, *, check_write):
         return True
 
     def _content_add(self, contents: Iterable[Content], with_data: bool) -> Dict:
         self.journal_writer.content_add(contents)
 
         content_add = 0
         if with_data:
             summary = self.objstorage.content_add(
                 c for c in contents if c.status != "absent"
             )
             content_add_bytes = summary["content:add:bytes"]
 
         for content in contents:
             key = self._content_key(content)
             if key in self._contents:
                 continue
             for algorithm in DEFAULT_ALGORITHMS:
                 hash_ = content.get_hash(algorithm)
                 if hash_ in self._content_indexes[algorithm] and (
                     algorithm not in {"blake2s256", "sha256"}
                 ):
                     colliding_content_hashes = []
                     # Add the already stored contents
                     for content_hashes_set in self._content_indexes[algorithm][hash_]:
                         hashes = dict(content_hashes_set)
                         colliding_content_hashes.append(hashes)
                     # Add the new colliding content
                     colliding_content_hashes.append(content.hashes())
                     raise HashCollision(algorithm, hash_, colliding_content_hashes)
             for algorithm in DEFAULT_ALGORITHMS:
                 hash_ = content.get_hash(algorithm)
                 self._content_indexes[algorithm][hash_].add(key)
             self._objects[content.sha1_git].append(("content", content.sha1))
             self._contents[key] = content
             self._sorted_sha1s.add(content.sha1)
             self._contents[key] = attr.evolve(self._contents[key], data=None)
             content_add += 1
 
         summary = {
             "content:add": content_add,
         }
         if with_data:
             summary["content:add:bytes"] = content_add_bytes
 
         return summary
 
     def content_add(self, content: Iterable[Content]) -> Dict:
         content = [attr.evolve(c, ctime=now()) for c in content]
         return self._content_add(content, with_data=True)
 
     def content_update(self, content, keys=[]):
         self.journal_writer.content_update(content)
 
         for cont_update in content:
             cont_update = cont_update.copy()
             sha1 = cont_update.pop("sha1")
             for old_key in self._content_indexes["sha1"][sha1]:
                 old_cont = self._contents.pop(old_key)
 
                 for algorithm in DEFAULT_ALGORITHMS:
                     hash_ = old_cont.get_hash(algorithm)
                     self._content_indexes[algorithm][hash_].remove(old_key)
 
                 new_cont = attr.evolve(old_cont, **cont_update)
                 new_key = self._content_key(new_cont)
 
                 self._contents[new_key] = new_cont
 
                 for algorithm in DEFAULT_ALGORITHMS:
                     hash_ = new_cont.get_hash(algorithm)
                     self._content_indexes[algorithm][hash_].add(new_key)
 
     def content_add_metadata(self, content: Iterable[Content]) -> Dict:
         return self._content_add(list(content), with_data=False)
 
     def content_get(self, content):
         # FIXME: Make this method support slicing the `data`.
         if len(content) > BULK_BLOCK_CONTENT_LEN_MAX:
             raise StorageArgumentException(
                 "Sending at most %s contents." % BULK_BLOCK_CONTENT_LEN_MAX
             )
         yield from self.objstorage.content_get(content)
 
     def content_get_range(self, start, end, limit=1000):
         if limit is None:
             raise StorageArgumentException("limit should not be None")
         sha1s = (
             (sha1, content_key)
             for sha1 in self._sorted_sha1s.iter_from(start)
             for content_key in self._content_indexes["sha1"][sha1]
         )
         matched = []
         next_content = None
         for sha1, key in sha1s:
             if sha1 > end:
                 break
             if len(matched) >= limit:
                 next_content = sha1
                 break
             matched.append(self._contents[key].to_dict())
         return {
             "contents": matched,
             "next": next_content,
         }
 
     def content_get_partition(
         self,
         partition_id: int,
         nb_partitions: int,
         limit: int = 1000,
         page_token: str = None,
     ):
         if limit is None:
             raise StorageArgumentException("limit should not be None")
         (start, end) = get_partition_bounds_bytes(
             partition_id, nb_partitions, SHA1_SIZE
         )
         if page_token:
             start = hash_to_bytes(page_token)
         if end is None:
             end = b"\xff" * SHA1_SIZE
         result = self.content_get_range(start, end, limit)
         result2 = {
             "contents": result["contents"],
             "next_page_token": None,
         }
         if result["next"]:
             result2["next_page_token"] = hash_to_hex(result["next"])
         return result2
 
     def content_get_metadata(self, contents: List[bytes]) -> Dict[bytes, List[Dict]]:
         result: Dict = {sha1: [] for sha1 in contents}
         for sha1 in contents:
             if sha1 in self._content_indexes["sha1"]:
                 objs = self._content_indexes["sha1"][sha1]
                 # only 1 element as content_add_metadata would have raised a
                 # hash collision otherwise
                 for key in objs:
                     d = self._contents[key].to_dict()
                     del d["ctime"]
                     if "data" in d:
                         del d["data"]
                     result[sha1].append(d)
         return result
 
     def content_find(self, content):
         if not set(content).intersection(DEFAULT_ALGORITHMS):
             raise StorageArgumentException(
                 "content keys must contain at least one of: %s"
                 % ", ".join(sorted(DEFAULT_ALGORITHMS))
             )
         found = []
         for algo in DEFAULT_ALGORITHMS:
             hash = content.get(algo)
             if hash and hash in self._content_indexes[algo]:
                 found.append(self._content_indexes[algo][hash])
 
         if not found:
             return []
 
         keys = list(set.intersection(*found))
         return [self._contents[key].to_dict() for key in keys]
 
     def content_missing(self, content, key_hash="sha1"):
         for cont in content:
             for (algo, hash_) in cont.items():
                 if algo not in DEFAULT_ALGORITHMS:
                     continue
                 if hash_ not in self._content_indexes.get(algo, []):
                     yield cont[key_hash]
                     break
             else:
                 for result in self.content_find(cont):
                     if result["status"] == "missing":
                         yield cont[key_hash]
 
     def content_missing_per_sha1(self, contents):
         for content in contents:
             if content not in self._content_indexes["sha1"]:
                 yield content
 
     def content_missing_per_sha1_git(self, contents):
         for content in contents:
             if content not in self._content_indexes["sha1_git"]:
                 yield content
 
     def content_get_random(self):
         return random.choice(list(self._content_indexes["sha1_git"]))
 
     def _skipped_content_add(self, contents: List[SkippedContent]) -> Dict:
         self.journal_writer.skipped_content_add(contents)
 
         summary = {"skipped_content:add": 0}
 
         missing_contents = self.skipped_content_missing([c.hashes() for c in contents])
         missing = {self._content_key(c) for c in missing_contents}
         contents = [c for c in contents if self._content_key(c) in missing]
         for content in contents:
             key = self._content_key(content)
             for algo in DEFAULT_ALGORITHMS:
                 if content.get_hash(algo):
                     self._skipped_content_indexes[algo][content.get_hash(algo)].add(key)
             self._skipped_contents[key] = content
             summary["skipped_content:add"] += 1
 
         return summary
 
     def skipped_content_add(self, content: Iterable[SkippedContent]) -> Dict:
         content = [attr.evolve(c, ctime=now()) for c in content]
         return self._skipped_content_add(content)
 
     def skipped_content_missing(self, contents):
         for content in contents:
             matches = list(self._skipped_contents.values())
             for (algorithm, key) in self._content_key(content):
                 if algorithm == "blake2s256":
                     continue
                 # Filter out skipped contents with the same hash
                 matches = [
                     match for match in matches if match.get_hash(algorithm) == key
                 ]
             # if none of the contents match
             if not matches:
                 yield {algo: content[algo] for algo in DEFAULT_ALGORITHMS}
 
     def directory_add(self, directories: Iterable[Directory]) -> Dict:
         directories = [dir_ for dir_ in directories if dir_.id not in self._directories]
         self.journal_writer.directory_add(directories)
 
         count = 0
         for directory in directories:
             count += 1
             self._directories[directory.id] = directory
             self._objects[directory.id].append(("directory", directory.id))
 
         return {"directory:add": count}
 
     def directory_missing(self, directories):
         for id in directories:
             if id not in self._directories:
                 yield id
 
     def _join_dentry_to_content(self, dentry):
         keys = (
             "status",
             "sha1",
             "sha1_git",
             "sha256",
             "length",
         )
         ret = dict.fromkeys(keys)
         ret.update(dentry)
         if ret["type"] == "file":
             # TODO: Make it able to handle more than one content
             content = self.content_find({"sha1_git": ret["target"]})
             if content:
                 content = content[0]
                 for key in keys:
                     ret[key] = content[key]
         return ret
 
     def _directory_ls(self, directory_id, recursive, prefix=b""):
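         # Depth-first listing: yield every entry of the directory and, when
         # `recursive` is set, descend into sub-directories, accumulating the
         # byte path in `prefix`.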
         if directory_id in self._directories:
             for entry in self._directories[directory_id].entries:
                 ret = self._join_dentry_to_content(entry.to_dict())
                 ret["name"] = prefix + ret["name"]
                 ret["dir_id"] = directory_id
                 yield ret
                 if recursive and ret["type"] == "dir":
                     yield from self._directory_ls(
                         ret["target"], True, prefix + ret["name"] + b"/"
                     )
 
     def directory_ls(self, directory, recursive=False):
         yield from self._directory_ls(directory, recursive)
 
     def directory_entry_get_by_path(self, directory, paths):
         return self._directory_entry_get_by_path(directory, paths, b"")
 
     def directory_get_random(self):
         if not self._directories:
             return None
         return random.choice(list(self._directories))
 
     def _directory_entry_get_by_path(self, directory, paths, prefix):
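         # Resolve `paths` one component at a time: look up the first
         # component in the directory listing, then recurse into its target
         # with the remaining components, prefixing returned names with
         # `prefix`.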
         if not paths:
             return
 
         contents = list(self.directory_ls(directory))
 
         if not contents:
             return
 
         def _get_entry(entries, name):
             for entry in entries:
                 if entry["name"] == name:
                     entry = entry.copy()
                     entry["name"] = prefix + entry["name"]
                     return entry
 
         first_item = _get_entry(contents, paths[0])
 
         if len(paths) == 1:
             return first_item
 
         if not first_item or first_item["type"] != "dir":
             return
 
         return self._directory_entry_get_by_path(
             first_item["target"], paths[1:], prefix + paths[0] + b"/"
         )
 
     def revision_add(self, revisions: Iterable[Revision]) -> Dict:
         revisions = [rev for rev in revisions if rev.id not in self._revisions]
         self.journal_writer.revision_add(revisions)
 
         count = 0
         for revision in revisions:
             revision = attr.evolve(
                 revision,
                 committer=self._person_add(revision.committer),
                 author=self._person_add(revision.author),
             )
             self._revisions[revision.id] = revision
             self._objects[revision.id].append(("revision", revision.id))
             count += 1
 
         return {"revision:add": count}
 
     def revision_missing(self, revisions):
         for id in revisions:
             if id not in self._revisions:
                 yield id
 
     def revision_get(self, revisions):
         for id in revisions:
             if id in self._revisions:
                 yield self._revisions.get(id).to_dict()
             else:
                 yield None
 
     def _get_parent_revs(self, rev_id, seen, limit):
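         # Depth-first walk of the revision graph; `seen` is shared across the
         # calls made by revision_log() so each revision is yielded at most
         # once, and `limit` bounds the total number of revisions returned.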
         if limit and len(seen) >= limit:
             return
         if rev_id in seen or rev_id not in self._revisions:
             return
         seen.add(rev_id)
         yield self._revisions[rev_id].to_dict()
         for parent in self._revisions[rev_id].parents:
             yield from self._get_parent_revs(parent, seen, limit)
 
     def revision_log(self, revisions, limit=None):
         seen = set()
         for rev_id in revisions:
             yield from self._get_parent_revs(rev_id, seen, limit)
 
     def revision_shortlog(self, revisions, limit=None):
         yield from (
             (rev["id"], rev["parents"]) for rev in self.revision_log(revisions, limit)
         )
 
     def revision_get_random(self):
         return random.choice(list(self._revisions))
 
     def release_add(self, releases: Iterable[Release]) -> Dict:
         releases = [rel for rel in releases if rel.id not in self._releases]
         self.journal_writer.release_add(releases)
 
         count = 0
         for rel in releases:
             if rel.author:
                 self._person_add(rel.author)
             self._objects[rel.id].append(("release", rel.id))
             self._releases[rel.id] = rel
             count += 1
 
         return {"release:add": count}
 
     def release_missing(self, releases):
         yield from (rel for rel in releases if rel not in self._releases)
 
     def release_get(self, releases):
         for rel_id in releases:
             if rel_id in self._releases:
                 yield self._releases[rel_id].to_dict()
             else:
                 yield None
 
     def release_get_random(self):
         return random.choice(list(self._releases))
 
     def snapshot_add(self, snapshots: Iterable[Snapshot]) -> Dict:
         count = 0
         snapshots = (snap for snap in snapshots if snap.id not in self._snapshots)
         for snapshot in snapshots:
             self.journal_writer.snapshot_add([snapshot])
             self._snapshots[snapshot.id] = snapshot
             self._objects[snapshot.id].append(("snapshot", snapshot.id))
             count += 1
 
         return {"snapshot:add": count}
 
     def snapshot_missing(self, snapshots):
         for id in snapshots:
             if id not in self._snapshots:
                 yield id
 
     def snapshot_get(self, snapshot_id):
         return self.snapshot_get_branches(snapshot_id)
 
     def snapshot_get_by_origin_visit(self, origin, visit):
         origin_url = self._get_origin_url(origin)
         if not origin_url:
             return
 
         if origin_url not in self._origins or visit > len(
             self._origin_visits[origin_url]
         ):
             return None
 
         visit = self._origin_visit_get_updated(origin_url, visit)
         snapshot_id = visit.snapshot
         if snapshot_id:
             return self.snapshot_get(snapshot_id)
         else:
             return None
 
     def snapshot_get_latest(self, origin, allowed_statuses=None):
         origin_url = self._get_origin_url(origin)
         if not origin_url:
             return
 
         visit = self.origin_visit_get_latest(
             origin_url, allowed_statuses=allowed_statuses, require_snapshot=True
         )
         if visit and visit["snapshot"]:
             snapshot = self.snapshot_get(visit["snapshot"])
             if not snapshot:
                 raise StorageArgumentException(
                     "last origin visit references an unknown snapshot"
                 )
             return snapshot
 
     def snapshot_count_branches(self, snapshot_id):
         snapshot = self._snapshots[snapshot_id]
         return collections.Counter(
             branch.target_type.value if branch else None
             for branch in snapshot.branches.values()
         )
 
     def snapshot_get_branches(
         self, snapshot_id, branches_from=b"", branches_count=1000, target_types=None
     ):
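         # Branches are returned in lexicographic order starting at
         # `branches_from`; when more than `branches_count` branches remain,
         # `next_branch` is set to the name to pass as `branches_from` on the
         # next call.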
         snapshot = self._snapshots.get(snapshot_id)
         if snapshot is None:
             return None
         sorted_branch_names = sorted(snapshot.branches)
         from_index = bisect.bisect_left(sorted_branch_names, branches_from)
         if target_types:
             next_branch = None
             branches = {}
             for branch_name in sorted_branch_names[from_index:]:
                 branch = snapshot.branches[branch_name]
                 if branch and branch.target_type.value in target_types:
                     if len(branches) < branches_count:
                         branches[branch_name] = branch
                     else:
                         next_branch = branch_name
                         break
         else:
             # As there is no 'target_types', we can do that much faster
             to_index = from_index + branches_count
             returned_branch_names = sorted_branch_names[from_index:to_index]
             branches = {
                 branch_name: snapshot.branches[branch_name]
                 for branch_name in returned_branch_names
             }
             if to_index >= len(sorted_branch_names):
                 next_branch = None
             else:
                 next_branch = sorted_branch_names[to_index]
 
         branches = {
             name: branch.to_dict() if branch else None
             for (name, branch) in branches.items()
         }
 
         return {
             "id": snapshot_id,
             "branches": branches,
             "next_branch": next_branch,
         }
 
     def snapshot_get_random(self):
         return random.choice(list(self._snapshots))
 
     def object_find_by_sha1_git(self, ids):
         ret = {}
         for id_ in ids:
             objs = self._objects.get(id_, [])
             ret[id_] = [{"sha1_git": id_, "type": obj[0],} for obj in objs]
         return ret
 
     def _convert_origin(self, t):
         if t is None:
             return None
 
         return t.to_dict()
 
     def origin_get(self, origins):
         if isinstance(origins, dict):
             # Old API
             return_single = True
             origins = [origins]
         else:
             return_single = False
 
         # Sanity check to be error-compatible with the pgsql backend
         if any("id" in origin for origin in origins) and not all(
             "id" in origin for origin in origins
         ):
             raise StorageArgumentException(
                 'Either all origins or none at all should have an "id".'
             )
         if any("url" in origin for origin in origins) and not all(
             "url" in origin for origin in origins
         ):
             raise StorageArgumentException(
                 "Either all origins or none at all should have " 'an "url" key.'
             )
 
         results = []
         for origin in origins:
             result = None
             if "url" in origin:
                 if origin["url"] in self._origins:
                     result = self._origins[origin["url"]]
             else:
                 raise StorageArgumentException("Origin must have an url.")
             results.append(self._convert_origin(result))
 
         if return_single:
             assert len(results) == 1
             return results[0]
         else:
             return results
 
     def origin_get_by_sha1(self, sha1s):
         return [self._convert_origin(self._origins_by_sha1.get(sha1)) for sha1 in sha1s]
 
     def origin_get_range(self, origin_from=1, origin_count=100):
         origin_from = max(origin_from, 1)
         if origin_from <= len(self._origins_by_id):
             max_idx = origin_from + origin_count - 1
             if max_idx > len(self._origins_by_id):
                 max_idx = len(self._origins_by_id)
             for idx in range(origin_from - 1, max_idx):
                 origin = self._convert_origin(self._origins[self._origins_by_id[idx]])
                 yield {"id": idx + 1, **origin}
 
     def origin_list(self, page_token: Optional[str] = None, limit: int = 100) -> dict:
         origin_urls = sorted(self._origins)
         if page_token:
             from_ = bisect.bisect_left(origin_urls, page_token)
         else:
             from_ = 0
 
         result = {
             "origins": [
                 {"url": origin_url} for origin_url in origin_urls[from_ : from_ + limit]
             ]
         }
 
         if from_ + limit < len(origin_urls):
             result["next_page_token"] = origin_urls[from_ + limit]
 
         return result
 
     def origin_search(
         self, url_pattern, offset=0, limit=50, regexp=False, with_visit=False
     ):
         origins = map(self._convert_origin, self._origins.values())
         if regexp:
             pat = re.compile(url_pattern)
             origins = [orig for orig in origins if pat.search(orig["url"])]
         else:
             origins = [orig for orig in origins if url_pattern in orig["url"]]
         if with_visit:
             filtered_origins = []
             for orig in origins:
                 visits = (
                     self._origin_visit_get_updated(ov.origin, ov.visit)
                     for ov in self._origin_visits[orig["url"]]
                 )
                 for ov in visits:
                     if ov.snapshot and ov.snapshot in self._snapshots:
                         filtered_origins.append(orig)
                         break
         else:
             filtered_origins = origins
 
         return filtered_origins[offset : offset + limit]
 
     def origin_count(self, url_pattern, regexp=False, with_visit=False):
         return len(
             self.origin_search(
                 url_pattern,
                 regexp=regexp,
                 with_visit=with_visit,
                 limit=len(self._origins),
             )
         )
 
     def origin_add(self, origins: Iterable[Origin]) -> List[Dict]:
         origins = copy.deepcopy(list(origins))
         for origin in origins:
             self.origin_add_one(origin)
         return [origin.to_dict() for origin in origins]
 
     def origin_add_one(self, origin: Origin) -> str:
         if origin.url not in self._origins:
             self.journal_writer.origin_add([origin])
             # generate an origin_id because it is needed by origin_get_range.
             # TODO: remove this when we remove origin_get_range
             origin_id = len(self._origins) + 1
             self._origins_by_id.append(origin.url)
             assert len(self._origins_by_id) == origin_id
 
             self._origins[origin.url] = origin
             self._origins_by_sha1[origin_url_to_sha1(origin.url)] = origin
             self._origin_visits[origin.url] = []
             self._objects[origin.url].append(("origin", origin.url))
 
         return origin.url
 
     def origin_visit_add(
         self, origin_url: str, date: Union[str, datetime.datetime], type: str
     ) -> OriginVisit:
         if isinstance(date, str):
             # FIXME: Converge on iso8601 at some point
             date = dateutil.parser.parse(date)
         elif not isinstance(date, datetime.datetime):
             raise StorageArgumentException("Date must be a datetime or a string")
 
         origin = self.origin_get({"url": origin_url})
         if not origin:  # Cannot add a visit without an origin
             raise StorageArgumentException("Unknown origin %s", origin_url)
 
         if origin_url in self._origins:
             origin = self._origins[origin_url]
             # visit ids are in the range [1, +inf[
             visit_id = len(self._origin_visits[origin_url]) + 1
             status = "ongoing"
             with convert_validation_exceptions():
                 visit = OriginVisit(
                     origin=origin_url,
                     date=date,
                     type=type,
                     # TODO: Remove when we remove those fields from the model
                     status=status,
                     snapshot=None,
                     metadata=None,
                     visit=visit_id,
                 )
             self.journal_writer.origin_visit_add([visit])
             self._origin_visits[origin_url].append(visit)
             assert visit.visit is not None
             visit_key = (origin_url, visit.visit)
 
             with convert_validation_exceptions():
                 visit_update = OriginVisitStatus(
                     origin=origin_url,
                     visit=visit_id,
                     date=date,
                     status=status,
                     snapshot=None,
                     metadata=None,
                 )
             self._origin_visit_status_add_one(visit_update)
             self._objects[visit_key].append(("origin_visit", None))
 
         # return the visit that was just added
         return visit
 
     def _origin_visit_status_add_one(self, visit_status: OriginVisitStatus) -> None:
         """Add an origin visit status without checks.
 
         """
         self.journal_writer.origin_visit_status_add([visit_status])
         visit_key = (visit_status.origin, visit_status.visit)
         self._origin_visit_statuses.setdefault(visit_key, [])
         self._origin_visit_statuses[visit_key].append(visit_status)
 
     def origin_visit_status_add(
         self, visit_statuses: Iterable[OriginVisitStatus],
     ) -> None:
         # First round to check existence (fail early if any is ko)
         for visit_status in visit_statuses:
             origin_url = self.origin_get({"url": visit_status.origin})
             if not origin_url:
                 raise StorageArgumentException(f"Unknown origin {visit_status.origin}")
 
         for visit_status in visit_statuses:
             self._origin_visit_status_add_one(visit_status)
 
     def origin_visit_update(
         self,
         origin: str,
         visit_id: int,
         status: str,
         metadata: Optional[Dict] = None,
         snapshot: Optional[bytes] = None,
         date: Optional[datetime.datetime] = None,
     ):
         origin_url = self._get_origin_url(origin)
         if origin_url is None:
             raise StorageArgumentException("Unknown origin.")
 
         try:
             visit = self._origin_visits[origin_url][visit_id - 1]
         except IndexError:
             raise StorageArgumentException("Unknown visit_id for this origin") from None
 
         updates: Dict[str, Any] = {
             "status": status,
         }
         if metadata and metadata != visit.metadata:
             updates["metadata"] = metadata
         if snapshot and snapshot != visit.snapshot:
             updates["snapshot"] = snapshot
 
         if updates:
             with convert_validation_exceptions():
                 updated_visit = OriginVisit.from_dict({**visit.to_dict(), **updates})
             self.journal_writer.origin_visit_update([updated_visit])
 
             self._origin_visits[origin_url][visit_id - 1] = updated_visit
 
             # Retrieve the previous visit status
             assert visit.visit is not None
-            visit_key = (origin_url, visit.visit)
 
             last_visit_status = self._origin_visit_get_updated(origin, visit_id)
             assert last_visit_status is not None
 
             with convert_validation_exceptions():
                 visit_status = OriginVisitStatus(
                     origin=origin_url,
                     visit=visit_id,
                     date=date or now(),
                     status=status,
                     snapshot=snapshot or last_visit_status.snapshot,
                     metadata=metadata or last_visit_status.metadata,
                 )
-                visit_key = (visit_status.origin, visit_status.visit)
-                self._origin_visit_statuses.setdefault(visit_key, [])
-                self._origin_visit_statuses[visit_key].append(visit_status)
-                # self._origin_visit_status_add_one(visit_status)
+                self._origin_visit_status_add_one(visit_status)
 
     def origin_visit_upsert(self, visits: Iterable[OriginVisit]) -> None:
         for visit in visits:
             if visit.visit is None:
                 raise StorageArgumentException(f"Missing visit id for visit {visit}")
 
         self.journal_writer.origin_visit_upsert(visits)
 
         date = now()
 
         for visit in visits:
             assert visit.visit is not None
             assert visit.visit > 0
             origin_url = visit.origin
             origin = self.origin_get({"url": origin_url})
 
             if not origin:  # Cannot add a visit without an origin
                 raise StorageArgumentException("Unknown origin %s", origin_url)
 
             if origin_url in self._origins:
                 origin = self._origins[origin_url]
                 # visit ids are in the range [1, +inf[
                 assert visit.visit is not None
                 visit_key = (origin_url, visit.visit)
 
                 with convert_validation_exceptions():
                     visit_update = OriginVisitStatus(
                         origin=origin_url,
                         visit=visit.visit,
                         date=date,
                         status=visit.status,
                         snapshot=visit.snapshot,
                         metadata=visit.metadata,
                     )
 
                 self._origin_visit_statuses.setdefault(visit_key, [])
                 while len(self._origin_visits[origin_url]) < visit.visit:
                     self._origin_visits[origin_url].append(None)
 
                 self._origin_visits[origin_url][visit.visit - 1] = visit
                 self._origin_visit_statuses[visit_key].append(visit_update)
 
                 self._objects[visit_key].append(("origin_visit", None))
 
     def _origin_visit_get_updated(self, origin: str, visit_id: int) -> OriginVisit:
         """Merge origin visit and latest origin visit status
 
         """
         assert visit_id >= 1
         visit = self._origin_visits[origin][visit_id - 1]
         assert visit is not None
         visit_key = (origin, visit_id)
 
         visit_update = max(self._origin_visit_statuses[visit_key], key=lambda v: v.date)
         return OriginVisit.from_dict(
             {
                 # default to the values in visit
                 **visit.to_dict(),
                 # override with the last update
                 **visit_update.to_dict(),
                 # but keep the date of the creation of the origin visit
                 "date": visit.date,
             }
         )
 
     def origin_visit_get(
         self, origin: str, last_visit: Optional[int] = None, limit: Optional[int] = None
     ) -> Iterable[Dict[str, Any]]:
 
         origin_url = self._get_origin_url(origin)
         if origin_url in self._origin_visits:
             visits = self._origin_visits[origin_url]
             if last_visit is not None:
                 visits = visits[last_visit:]
             if limit is not None:
                 visits = visits[:limit]
             for visit in visits:
                 if not visit:
                     continue
                 visit_id = visit.visit
 
                 visit_update = self._origin_visit_get_updated(origin_url, visit_id)
                 assert visit_update is not None
                 yield visit_update.to_dict()
 
     def origin_visit_find_by_date(
         self, origin: str, visit_date: datetime.datetime
     ) -> Optional[Dict[str, Any]]:
         origin_url = self._get_origin_url(origin)
         if origin_url in self._origin_visits:
             visits = self._origin_visits[origin_url]
             visit = min(visits, key=lambda v: (abs(v.date - visit_date), -v.visit))
             visit_update = self._origin_visit_get_updated(origin, visit.visit)
             assert visit_update is not None
             return visit_update.to_dict()
         return None
 
     def origin_visit_get_by(self, origin: str, visit: int) -> Optional[Dict[str, Any]]:
         origin_url = self._get_origin_url(origin)
         if origin_url in self._origin_visits and visit <= len(
             self._origin_visits[origin_url]
         ):
             visit_update = self._origin_visit_get_updated(origin_url, visit)
             assert visit_update is not None
             return visit_update.to_dict()
         return None
 
     def origin_visit_get_latest(
         self,
         origin: str,
         allowed_statuses: Optional[List[str]] = None,
         require_snapshot: bool = False,
     ) -> Optional[Dict[str, Any]]:
         ori = self._origins.get(origin)
         if not ori:
             return None
         visits = self._origin_visits[ori.url]
 
         visits = [
             self._origin_visit_get_updated(visit.origin, visit.visit)
             for visit in visits
             if visit is not None
         ]
 
         if allowed_statuses is not None:
             visits = [visit for visit in visits if visit.status in allowed_statuses]
         if require_snapshot:
             visits = [visit for visit in visits if visit.snapshot]
 
         visit = max(visits, key=lambda v: (v.date, v.visit), default=None)
         if visit is None:
             return None
         return visit.to_dict()
 
     def _select_random_origin_visit_by_type(self, type: str) -> str:
         while True:
             url = random.choice(list(self._origin_visits.keys()))
             random_origin_visits = self._origin_visits[url]
             if random_origin_visits[0].type == type:
                 return url
 
     def origin_visit_get_random(self, type: str) -> Optional[Dict[str, Any]]:
         url = self._select_random_origin_visit_by_type(type)
         random_origin_visits = copy.deepcopy(self._origin_visits[url])
         random_origin_visits.reverse()
         back_in_the_day = now() - timedelta(weeks=12)  # 3 months back
         # This should be enough for tests
         for visit in random_origin_visits:
             updated_visit = self._origin_visit_get_updated(url, visit.visit)
             assert updated_visit is not None
             if updated_visit.date > back_in_the_day and updated_visit.status == "full":
                 return updated_visit.to_dict()
         else:
             return None
 
     def stat_counters(self):
         keys = (
             "content",
             "directory",
             "origin",
             "origin_visit",
             "person",
             "release",
             "revision",
             "skipped_content",
             "snapshot",
         )
         stats = {key: 0 for key in keys}
         stats.update(
             collections.Counter(
                 obj_type
                 for (obj_type, obj_id) in itertools.chain(*self._objects.values())
             )
         )
         return stats
 
     def refresh_stat_counters(self):
         pass
 
     def origin_metadata_add(
         self,
         origin_url: str,
         discovery_date: datetime.datetime,
         authority: Dict[str, Any],
         fetcher: Dict[str, Any],
         format: str,
         metadata: bytes,
     ) -> None:
         if not isinstance(origin_url, str):
             raise StorageArgumentException(
                 "origin_id must be str, not %r" % (origin_url,)
             )
         if not isinstance(metadata, bytes):
             raise StorageArgumentException(
                 "metadata must be bytes, not %r" % (metadata,)
             )
         authority_key = self._metadata_authority_key(authority)
         if authority_key not in self._metadata_authorities:
             raise StorageArgumentException(f"Unknown authority {authority}")
         fetcher_key = self._metadata_fetcher_key(fetcher)
         if fetcher_key not in self._metadata_fetchers:
             raise StorageArgumentException(f"Unknown fetcher {fetcher}")
 
         origin_metadata_list = self._origin_metadata[origin_url][authority_key]
 
         origin_metadata = {
             "origin_url": origin_url,
             "discovery_date": discovery_date,
             "authority": authority_key,
             "fetcher": fetcher_key,
             "format": format,
             "metadata": metadata,
         }
 
         for existing_origin_metadata in origin_metadata_list:
             if (
                 existing_origin_metadata["fetcher"] == fetcher_key
                 and existing_origin_metadata["discovery_date"] == discovery_date
             ):
                 # Duplicate of an existing one; replace it.
                 existing_origin_metadata.update(origin_metadata)
                 break
         else:
             origin_metadata_list.add(origin_metadata)
         return None
 
     def origin_metadata_get(
         self,
         origin_url: str,
         authority: Dict[str, str],
         after: Optional[datetime.datetime] = None,
         page_token: Optional[bytes] = None,
         limit: int = 1000,
     ) -> Dict[str, Any]:
         if not isinstance(origin_url, str):
             raise TypeError("origin_url must be str, not %r" % (origin_url,))
 
         authority_key = self._metadata_authority_key(authority)
 
         if page_token is not None:
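             # The token is the msgpack-encoded (discovery_date, fetcher key)
             # of the last result of the previous page (see how
             # next_page_token is built below).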
             (after_time, after_fetcher) = msgpack_loads(page_token)
             after_fetcher = tuple(after_fetcher)
             if after is not None and after > after_time:
                 raise StorageArgumentException(
                     "page_token is inconsistent with the value of 'after'."
                 )
             entries = self._origin_metadata[origin_url][authority_key].iter_after(
                 (after_time, after_fetcher)
             )
         elif after is not None:
             entries = self._origin_metadata[origin_url][authority_key].iter_from(
                 (after,)
             )
             entries = (entry for entry in entries if entry["discovery_date"] > after)
         else:
             entries = iter(self._origin_metadata[origin_url][authority_key])
 
         if limit:
             entries = itertools.islice(entries, 0, limit + 1)
 
         results = []
         for entry in entries:
             authority = self._metadata_authorities[entry["authority"]]
             fetcher = self._metadata_fetchers[entry["fetcher"]]
             if after:
                 assert entry["discovery_date"] > after
             results.append(
                 {
                     **entry,
                     "authority": {"type": authority["type"], "url": authority["url"],},
                     "fetcher": {
                         "name": fetcher["name"],
                         "version": fetcher["version"],
                     },
                 }
             )
 
         if len(results) > limit:
             results.pop()
             assert len(results) == limit
             last_result = results[-1]
             next_page_token: Optional[bytes] = msgpack_dumps(
                 (
                     last_result["discovery_date"],
                     self._metadata_fetcher_key(last_result["fetcher"]),
                 )
             )
         else:
             next_page_token = None
 
         return {
             "next_page_token": next_page_token,
             "results": results,
         }
 
     def metadata_fetcher_add(
         self, name: str, version: str, metadata: Dict[str, Any]
     ) -> None:
         fetcher = {
             "name": name,
             "version": version,
             "metadata": metadata,
         }
         key = self._metadata_fetcher_key(fetcher)
         if key not in self._metadata_fetchers:
             self._metadata_fetchers[key] = fetcher
 
     def metadata_fetcher_get(self, name: str, version: str) -> Optional[Dict[str, Any]]:
         return self._metadata_fetchers.get(
             self._metadata_fetcher_key({"name": name, "version": version})
         )
 
     def metadata_authority_add(
         self, type: str, url: str, metadata: Dict[str, Any]
     ) -> None:
         authority = {
             "type": type,
             "url": url,
             "metadata": metadata,
         }
         key = self._metadata_authority_key(authority)
         self._metadata_authorities[key] = authority
 
     def metadata_authority_get(self, type: str, url: str) -> Optional[Dict[str, Any]]:
         return self._metadata_authorities.get(
             self._metadata_authority_key({"type": type, "url": url})
         )
 
     def _get_origin_url(self, origin):
         if isinstance(origin, str):
             return origin
         else:
             raise TypeError("origin must be a string.")
 
     def _person_add(self, person):
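         # Persons are deduplicated on their fullname; the stored Person is
         # returned so callers share a single instance per fullname.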
         key = ("person", person.fullname)
         if key not in self._objects:
             self._persons[person.fullname] = person
             self._objects[key].append(key)
 
         return self._persons[person.fullname]
 
     @staticmethod
     def _content_key(content):
         """ A stable key and the algorithm for a content"""
         if isinstance(content, BaseContent):
             content = content.to_dict()
         return tuple((key, content.get(key)) for key in sorted(DEFAULT_ALGORITHMS))
 
     @staticmethod
     def _metadata_fetcher_key(fetcher: Dict) -> FetcherKey:
         return (fetcher["name"], fetcher["version"])
 
     @staticmethod
     def _metadata_authority_key(authority: Dict) -> Hashable:
         return (authority["type"], authority["url"])
 
     def diff_directories(self, from_dir, to_dir, track_renaming=False):
         raise NotImplementedError("InMemoryStorage.diff_directories")
 
     def diff_revisions(self, from_rev, to_rev, track_renaming=False):
         raise NotImplementedError("InMemoryStorage.diff_revisions")
 
     def diff_revision(self, revision, track_renaming=False):
         raise NotImplementedError("InMemoryStorage.diff_revision")
 
     def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None:
         """Do nothing
 
         """
         return None
 
     def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict:
         return {}
diff --git a/swh/storage/storage.py b/swh/storage/storage.py
index 5a5bb5d1..7b9b9a7f 100644
--- a/swh/storage/storage.py
+++ b/swh/storage/storage.py
@@ -1,1420 +1,1414 @@
 # Copyright (C) 2015-2020  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import contextlib
 import datetime
 import itertools
 
 from collections import defaultdict
 from contextlib import contextmanager
 from typing import Any, Dict, Iterable, List, Optional, Union
 
 import attr
 import dateutil.parser
 import psycopg2
 import psycopg2.pool
 import psycopg2.errors
 
 from swh.core.api.serializers import msgpack_loads, msgpack_dumps
 from swh.model.model import (
     Content,
     Directory,
     Origin,
     OriginVisit,
     OriginVisitStatus,
     Revision,
     Release,
     SkippedContent,
     Snapshot,
     SHA1_SIZE,
 )
 from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex
 from swh.storage.objstorage import ObjStorage
 from swh.storage.validate import VALIDATION_EXCEPTIONS
 from swh.storage.utils import now
 
 from . import converters
 from .common import db_transaction_generator, db_transaction
 from .db import Db
 from .exc import StorageArgumentException, StorageDBError, HashCollision
 from .algos import diff
 from .metrics import timed, send_metric, process_metrics
 from .utils import get_partition_bounds_bytes, extract_collision_hash
 from .writer import JournalWriter
 
 
 # Max block size of contents to return
 BULK_BLOCK_CONTENT_LEN_MAX = 10000
 
 EMPTY_SNAPSHOT_ID = hash_to_bytes("1a8893e6a86f444e8be8e7bda6cb34fb1735a00e")
 """Identifier for the empty snapshot"""
 
 
 VALIDATION_EXCEPTIONS = VALIDATION_EXCEPTIONS + [
     psycopg2.errors.CheckViolation,
     psycopg2.errors.IntegrityError,
     psycopg2.errors.InvalidTextRepresentation,
     psycopg2.errors.NotNullViolation,
     psycopg2.errors.NumericValueOutOfRange,
     psycopg2.errors.UndefinedFunction,  # (raised on wrong argument types)
 ]
 """Exceptions raised by postgresql when validation of the arguments
 failed."""
 
 
 @contextlib.contextmanager
 def convert_validation_exceptions():
     """Catches postgresql errors related to invalid arguments, and
     re-raises a StorageArgumentException."""
     try:
         yield
     except tuple(VALIDATION_EXCEPTIONS) as e:
         raise StorageArgumentException(str(e))
 
 
 class Storage:
     """SWH storage proxy, encompassing DB and object storage
 
     """
 
     def __init__(
         self, db, objstorage, min_pool_conns=1, max_pool_conns=10, journal_writer=None
     ):
         """
         Args:
             db_conn: either a libpq connection string, or a psycopg2 connection
             obj_root: path to the root of the object storage
 
         """
         try:
             if isinstance(db, psycopg2.extensions.connection):
                 self._pool = None
                 self._db = Db(db)
             else:
                 self._pool = psycopg2.pool.ThreadedConnectionPool(
                     min_pool_conns, max_pool_conns, db
                 )
                 self._db = None
         except psycopg2.OperationalError as e:
             raise StorageDBError(e)
 
         self.journal_writer = JournalWriter(journal_writer)
         self.objstorage = ObjStorage(objstorage)
 
     def get_db(self):
         if self._db:
             return self._db
         else:
             return Db.from_pool(self._pool)
 
     def put_db(self, db):
         if db is not self._db:
             db.put_conn()
 
     @contextmanager
     def db(self):
         db = None
         try:
             db = self.get_db()
             yield db
         finally:
             if db:
                 self.put_db(db)
 
     @timed
     @db_transaction()
     def check_config(self, *, check_write, db=None, cur=None):
 
         if not self.objstorage.check_config(check_write=check_write):
             return False
 
         # Check permissions on one of the tables
         if check_write:
             check = "INSERT"
         else:
             check = "SELECT"
 
         cur.execute("select has_table_privilege(current_user, 'content', %s)", (check,))
         return cur.fetchone()[0]
 
     def _content_unique_key(self, hash, db):
         """Given a hash (tuple or dict), return a unique key from the
            aggregation of keys.
 
         """
         keys = db.content_hash_keys
         if isinstance(hash, tuple):
             return hash
         return tuple([hash[k] for k in keys])
 
     def _content_add_metadata(self, db, cur, content):
         """Add content to the postgresql database but not the object storage.
         """
         # create temporary table for metadata injection
         db.mktemp("content", cur)
 
         db.copy_to(
             (c.to_dict() for c in content), "tmp_content", db.content_add_keys, cur
         )
 
         # move metadata in place
         try:
             db.content_add_from_temp(cur)
         except psycopg2.IntegrityError as e:
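             # sqlstate 23505 is PostgreSQL's unique_violation: one of the
             # content hashes collided with an existing row.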
             if e.diag.sqlstate == "23505" and e.diag.table_name == "content":
                 message_detail = e.diag.message_detail
                 if message_detail:
                     hash_name, hash_id = extract_collision_hash(message_detail)
                     collision_contents_hashes = [
                         c.hashes() for c in content if c.get_hash(hash_name) == hash_id
                     ]
                 else:
                     constraint_to_hash_name = {
                         "content_pkey": "sha1",
                         "content_sha1_git_idx": "sha1_git",
                         "content_sha256_idx": "sha256",
                     }
                     hash_name = constraint_to_hash_name.get(e.diag.constraint_name)
                     hash_id = None
                     collision_contents_hashes = None
 
                 raise HashCollision(
                     hash_name, hash_id, collision_contents_hashes
                 ) from None
             else:
                 raise
 
     @timed
     @process_metrics
     def content_add(self, content: Iterable[Content]) -> Dict:
         ctime = now()
 
         contents = [attr.evolve(c, ctime=ctime) for c in content]
 
         objstorage_summary = self.objstorage.content_add(contents)
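         # The data is written to the objstorage unconditionally; only
         # contents still missing from the database are then journaled and
         # copied into the metadata tables.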
 
         with self.db() as db:
             with db.transaction() as cur:
                 missing = list(
                     self.content_missing(
                         map(Content.to_dict, contents),
                         key_hash="sha1_git",
                         db=db,
                         cur=cur,
                     )
                 )
                 contents = [c for c in contents if c.sha1_git in missing]
 
                 self.journal_writer.content_add(contents)
                 self._content_add_metadata(db, cur, contents)
 
         return {
             "content:add": len(contents),
             "content:add:bytes": objstorage_summary["content:add:bytes"],
         }
 
     @timed
     @db_transaction()
     def content_update(self, content, keys=[], db=None, cur=None):
         # TODO: Add a check on input keys. How to properly implement
         # this? We don't know yet the new columns.
         self.journal_writer.content_update(content)
 
         db.mktemp("content", cur)
         select_keys = list(set(db.content_get_metadata_keys).union(set(keys)))
         with convert_validation_exceptions():
             db.copy_to(content, "tmp_content", select_keys, cur)
             db.content_update_from_temp(keys_to_update=keys, cur=cur)
 
     @timed
     @process_metrics
     @db_transaction()
     def content_add_metadata(
         self, content: Iterable[Content], db=None, cur=None
     ) -> Dict:
         contents = list(content)
         missing = self.content_missing(
             (c.to_dict() for c in contents), key_hash="sha1_git", db=db, cur=cur,
         )
         contents = [c for c in contents if c.sha1_git in missing]
 
         self.journal_writer.content_add_metadata(contents)
         self._content_add_metadata(db, cur, contents)
 
         return {
             "content:add": len(contents),
         }
 
     @timed
     def content_get(self, content):
         # FIXME: Make this method support slicing the `data`.
         if len(content) > BULK_BLOCK_CONTENT_LEN_MAX:
             raise StorageArgumentException(
                 "Send at maximum %s contents." % BULK_BLOCK_CONTENT_LEN_MAX
             )
         yield from self.objstorage.content_get(content)
 
     @timed
     @db_transaction()
     def content_get_range(self, start, end, limit=1000, db=None, cur=None):
         if limit is None:
             raise StorageArgumentException("limit should not be None")
         contents = []
         next_content = None
         for counter, content_row in enumerate(
             db.content_get_range(start, end, limit + 1, cur)
         ):
             content = dict(zip(db.content_get_metadata_keys, content_row))
             if counter >= limit:
                 # take the last commit for the next page starting from this
                 next_content = content["sha1"]
                 break
             contents.append(content)
         return {
             "contents": contents,
             "next": next_content,
         }
 
     @timed
     def content_get_partition(
         self,
         partition_id: int,
         nb_partitions: int,
         limit: int = 1000,
         page_token: Optional[str] = None,
     ):
         if limit is None:
             raise StorageArgumentException("limit should not be None")
         (start, end) = get_partition_bounds_bytes(
             partition_id, nb_partitions, SHA1_SIZE
         )
         if page_token:
             start = hash_to_bytes(page_token)
         if end is None:
             end = b"\xff" * SHA1_SIZE
         result = self.content_get_range(start, end, limit)
         result2 = {
             "contents": result["contents"],
             "next_page_token": None,
         }
         if result["next"]:
             result2["next_page_token"] = hash_to_hex(result["next"])
         return result2
 
     @timed
     @db_transaction(statement_timeout=500)
     def content_get_metadata(
         self, contents: List[bytes], db=None, cur=None
     ) -> Dict[bytes, List[Dict]]:
         result: Dict[bytes, List[Dict]] = {sha1: [] for sha1 in contents}
         for row in db.content_get_metadata_from_sha1s(contents, cur):
             content_meta = dict(zip(db.content_get_metadata_keys, row))
             result[content_meta["sha1"]].append(content_meta)
         return result
 
     @timed
     @db_transaction_generator()
     def content_missing(self, content, key_hash="sha1", db=None, cur=None):
         keys = db.content_hash_keys
 
         if key_hash not in keys:
             raise StorageArgumentException("key_hash should be one of %s" % keys)
 
         key_hash_idx = keys.index(key_hash)
 
         if not content:
             return
 
         for obj in db.content_missing_from_list(content, cur):
             yield obj[key_hash_idx]
 
     @timed
     @db_transaction_generator()
     def content_missing_per_sha1(self, contents, db=None, cur=None):
         for obj in db.content_missing_per_sha1(contents, cur):
             yield obj[0]
 
     @timed
     @db_transaction_generator()
     def content_missing_per_sha1_git(self, contents, db=None, cur=None):
         for obj in db.content_missing_per_sha1_git(contents, cur):
             yield obj[0]
 
     @timed
     @db_transaction()
     def content_find(self, content, db=None, cur=None):
         if not set(content).intersection(DEFAULT_ALGORITHMS):
             raise StorageArgumentException(
                 "content keys must contain at least one of: "
                 "sha1, sha1_git, sha256, blake2s256"
             )
 
         contents = db.content_find(
             sha1=content.get("sha1"),
             sha1_git=content.get("sha1_git"),
             sha256=content.get("sha256"),
             blake2s256=content.get("blake2s256"),
             cur=cur,
         )
         return [dict(zip(db.content_find_cols, content)) for content in contents]
 
     @timed
     @db_transaction()
     def content_get_random(self, db=None, cur=None):
         return db.content_get_random(cur)
 
     @staticmethod
     def _skipped_content_normalize(d):
         d = d.copy()
 
         if d.get("status") is None:
             d["status"] = "absent"
 
         if d.get("length") is None:
             d["length"] = -1
 
         return d
 
     @staticmethod
     def _skipped_content_validate(d):
         """Sanity checks on status / reason / length, that postgresql
         doesn't enforce."""
         if d["status"] != "absent":
             raise StorageArgumentException(
                 "Invalid content status: {}".format(d["status"])
             )
 
         if d.get("reason") is None:
             raise StorageArgumentException(
                 "Must provide a reason if content is absent."
             )
 
         if d["length"] < -1:
             raise StorageArgumentException("Content length must be positive or -1.")
 
     def _skipped_content_add_metadata(self, db, cur, content: Iterable[SkippedContent]):
         origin_ids = db.origin_id_get_by_url([cont.origin for cont in content], cur=cur)
         content = [
             attr.evolve(c, origin=origin_id)
             for (c, origin_id) in zip(content, origin_ids)
         ]
         db.mktemp("skipped_content", cur)
         db.copy_to(
             [c.to_dict() for c in content],
             "tmp_skipped_content",
             db.skipped_content_keys,
             cur,
         )
 
         # move metadata in place
         db.skipped_content_add_from_temp(cur)
 
     @timed
     @process_metrics
     @db_transaction()
     def skipped_content_add(
         self, content: Iterable[SkippedContent], db=None, cur=None
     ) -> Dict:
         ctime = now()
         content = [attr.evolve(c, ctime=ctime) for c in content]
 
         missing_contents = self.skipped_content_missing(
             (c.to_dict() for c in content), db=db, cur=cur,
         )
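         # Keep only the skipped contents that the backend reported as
         # missing, matching them on every hash algorithm at once.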
         content = [
             c
             for c in content
             if any(
                 all(
                     c.get_hash(algo) == missing_content.get(algo)
                     for algo in DEFAULT_ALGORITHMS
                 )
                 for missing_content in missing_contents
             )
         ]
 
         self.journal_writer.skipped_content_add(content)
         self._skipped_content_add_metadata(db, cur, content)
 
         return {
             "skipped_content:add": len(content),
         }
 
     @timed
     @db_transaction_generator()
     def skipped_content_missing(self, contents, db=None, cur=None):
         contents = list(contents)
         for content in db.skipped_content_missing(contents, cur):
             yield dict(zip(db.content_hash_keys, content))
 
     @timed
     @process_metrics
     @db_transaction()
     def directory_add(
         self, directories: Iterable[Directory], db=None, cur=None
     ) -> Dict:
         directories = list(directories)
         summary = {"directory:add": 0}
 
         dirs = set()
         dir_entries: Dict[str, defaultdict] = {
             "file": defaultdict(list),
             "dir": defaultdict(list),
             "rev": defaultdict(list),
         }
 
         for cur_dir in directories:
             dir_id = cur_dir.id
             dirs.add(dir_id)
             for src_entry in cur_dir.entries:
                 entry = src_entry.to_dict()
                 entry["dir_id"] = dir_id
                 dir_entries[entry["type"]][dir_id].append(entry)
 
         dirs_missing = set(self.directory_missing(dirs, db=db, cur=cur))
         if not dirs_missing:
             return summary
 
         self.journal_writer.directory_add(
             dir_ for dir_ in directories if dir_.id in dirs_missing
         )
 
         # Copy directory ids
         dirs_missing_dict = ({"id": dir} for dir in dirs_missing)
         db.mktemp("directory", cur)
         db.copy_to(dirs_missing_dict, "tmp_directory", ["id"], cur)
 
         # Copy entries
         for entry_type, entry_list in dir_entries.items():
             entries = itertools.chain.from_iterable(
                 entries_for_dir
                 for dir_id, entries_for_dir in entry_list.items()
                 if dir_id in dirs_missing
             )
 
             db.mktemp_dir_entry(entry_type)
 
             db.copy_to(
                 entries,
                 "tmp_directory_entry_%s" % entry_type,
                 ["target", "name", "perms", "dir_id"],
                 cur,
             )
 
         # Do the final copy
         db.directory_add_from_temp(cur)
         summary["directory:add"] = len(dirs_missing)
 
         return summary
 
     @timed
     @db_transaction_generator()
     def directory_missing(self, directories, db=None, cur=None):
         for obj in db.directory_missing_from_list(directories, cur):
             yield obj[0]
 
     @timed
     @db_transaction_generator(statement_timeout=20000)
     def directory_ls(self, directory, recursive=False, db=None, cur=None):
         if recursive:
             res_gen = db.directory_walk(directory, cur=cur)
         else:
             res_gen = db.directory_walk_one(directory, cur=cur)
 
         for line in res_gen:
             yield dict(zip(db.directory_ls_cols, line))
 
     @timed
     @db_transaction(statement_timeout=2000)
     def directory_entry_get_by_path(self, directory, paths, db=None, cur=None):
         res = db.directory_entry_get_by_path(directory, paths, cur)
         if res:
             return dict(zip(db.directory_ls_cols, res))
 
     @timed
     @db_transaction()
     def directory_get_random(self, db=None, cur=None):
         return db.directory_get_random(cur)
 
     @timed
     @process_metrics
     @db_transaction()
     def revision_add(self, revisions: Iterable[Revision], db=None, cur=None) -> Dict:
         revisions = list(revisions)
         summary = {"revision:add": 0}
 
         revisions_missing = set(
             self.revision_missing(
                 set(revision.id for revision in revisions), db=db, cur=cur
             )
         )
 
         if not revisions_missing:
             return summary
 
         db.mktemp_revision(cur)
 
         revisions_filtered = [
             revision for revision in revisions if revision.id in revisions_missing
         ]
 
         self.journal_writer.revision_add(revisions_filtered)
 
         revisions_filtered = list(map(converters.revision_to_db, revisions_filtered))
 
         parents_filtered: List[bytes] = []
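         # The callback passed to copy_to below collects each revision's
         # parent links so they can be copied into revision_history in a
         # second pass.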
 
         with convert_validation_exceptions():
             db.copy_to(
                 revisions_filtered,
                 "tmp_revision",
                 db.revision_add_cols,
                 cur,
                 lambda rev: parents_filtered.extend(rev["parents"]),
             )
 
             db.revision_add_from_temp(cur)
 
             db.copy_to(
                 parents_filtered,
                 "revision_history",
                 ["id", "parent_id", "parent_rank"],
                 cur,
             )
 
         return {"revision:add": len(revisions_missing)}
 
     @timed
     @db_transaction_generator()
     def revision_missing(self, revisions, db=None, cur=None):
         if not revisions:
             return
 
         for obj in db.revision_missing_from_list(revisions, cur):
             yield obj[0]
 
     @timed
     @db_transaction_generator(statement_timeout=1000)
     def revision_get(self, revisions, db=None, cur=None):
         for line in db.revision_get_from_list(revisions, cur):
             data = converters.db_to_revision(dict(zip(db.revision_get_cols, line)))
             if not data["type"]:
                 yield None
                 continue
             yield data
 
     @timed
     @db_transaction_generator(statement_timeout=2000)
     def revision_log(self, revisions, limit=None, db=None, cur=None):
         for line in db.revision_log(revisions, limit, cur):
             data = converters.db_to_revision(dict(zip(db.revision_get_cols, line)))
             if not data["type"]:
                 yield None
                 continue
             yield data
 
     @timed
     @db_transaction_generator(statement_timeout=2000)
     def revision_shortlog(self, revisions, limit=None, db=None, cur=None):
 
         yield from db.revision_shortlog(revisions, limit, cur)
 
     @timed
     @db_transaction()
     def revision_get_random(self, db=None, cur=None):
         return db.revision_get_random(cur)
 
     @timed
     @process_metrics
     @db_transaction()
     def release_add(self, releases: Iterable[Release], db=None, cur=None) -> Dict:
         releases = list(releases)
         summary = {"release:add": 0}
 
         release_ids = set(release.id for release in releases)
         releases_missing = set(self.release_missing(release_ids, db=db, cur=cur))
 
         if not releases_missing:
             return summary
 
         db.mktemp_release(cur)
 
         releases_filtered = [
             release for release in releases if release.id in releases_missing
         ]
 
         self.journal_writer.release_add(releases_filtered)
 
         releases_filtered = list(map(converters.release_to_db, releases_filtered))
 
         with convert_validation_exceptions():
             db.copy_to(releases_filtered, "tmp_release", db.release_add_cols, cur)
 
             db.release_add_from_temp(cur)
 
         return {"release:add": len(releases_missing)}
 
     @timed
     @db_transaction_generator()
     def release_missing(self, releases, db=None, cur=None):
         if not releases:
             return
 
         for obj in db.release_missing_from_list(releases, cur):
             yield obj[0]
 
     @timed
     @db_transaction_generator(statement_timeout=500)
     def release_get(self, releases, db=None, cur=None):
         for release in db.release_get_from_list(releases, cur):
             data = converters.db_to_release(dict(zip(db.release_get_cols, release)))
             yield data if data["target_type"] else None
 
     @timed
     @db_transaction()
     def release_get_random(self, db=None, cur=None):
         return db.release_get_random(cur)
 
     @timed
     @process_metrics
     @db_transaction()
     def snapshot_add(self, snapshots: Iterable[Snapshot], db=None, cur=None) -> Dict:
         created_temp_table = False
 
         count = 0
         for snapshot in snapshots:
             if not db.snapshot_exists(snapshot.id, cur):
                 if not created_temp_table:
                     db.mktemp_snapshot_branch(cur)
                     created_temp_table = True
 
                 with convert_validation_exceptions():
                     db.copy_to(
                         (
                             {
                                 "name": name,
                                 "target": info.target if info else None,
                                 "target_type": (
                                     info.target_type.value if info else None
                                 ),
                             }
                             for name, info in snapshot.branches.items()
                         ),
                         "tmp_snapshot_branch",
                         ["name", "target", "target_type"],
                         cur,
                     )
 
                 self.journal_writer.snapshot_add([snapshot])
 
                 db.snapshot_add(snapshot.id, cur)
                 count += 1
 
         return {"snapshot:add": count}
 
     @timed
     @db_transaction_generator()
     def snapshot_missing(self, snapshots, db=None, cur=None):
         for obj in db.snapshot_missing_from_list(snapshots, cur):
             yield obj[0]
 
     @timed
     @db_transaction(statement_timeout=2000)
     def snapshot_get(self, snapshot_id, db=None, cur=None):
 
         return self.snapshot_get_branches(snapshot_id, db=db, cur=cur)
 
     @timed
     @db_transaction(statement_timeout=2000)
     def snapshot_get_by_origin_visit(self, origin, visit, db=None, cur=None):
         snapshot_id = db.snapshot_get_by_origin_visit(origin, visit, cur)
 
         if snapshot_id:
             return self.snapshot_get(snapshot_id, db=db, cur=cur)
 
         return None
 
     @timed
     @db_transaction(statement_timeout=4000)
     def snapshot_get_latest(self, origin, allowed_statuses=None, db=None, cur=None):
         if isinstance(origin, int):
             origin = self.origin_get({"id": origin}, db=db, cur=cur)
             if not origin:
                 return
             origin = origin["url"]
 
         origin_visit = self.origin_visit_get_latest(
             origin,
             allowed_statuses=allowed_statuses,
             require_snapshot=True,
             db=db,
             cur=cur,
         )
         if origin_visit and origin_visit["snapshot"]:
             snapshot = self.snapshot_get(origin_visit["snapshot"], db=db, cur=cur)
             if not snapshot:
                 raise StorageArgumentException(
                     "last origin visit references an unknown snapshot"
                 )
             return snapshot
 
     @timed
     @db_transaction(statement_timeout=2000)
     def snapshot_count_branches(self, snapshot_id, db=None, cur=None):
         return dict(db.snapshot_count_branches(snapshot_id, cur))
 
     @timed
     @db_transaction(statement_timeout=2000)
     def snapshot_get_branches(
         self,
         snapshot_id,
         branches_from=b"",
         branches_count=1000,
         target_types=None,
         db=None,
         cur=None,
     ):
         if snapshot_id == EMPTY_SNAPSHOT_ID:
             return {
                 "id": snapshot_id,
                 "branches": {},
                 "next_branch": None,
             }
 
         branches = {}
         next_branch = None
 
         fetched_branches = list(
             db.snapshot_get_by_id(
                 snapshot_id,
                 branches_from=branches_from,
                 branches_count=branches_count + 1,
                 target_types=target_types,
                 cur=cur,
             )
         )
         for branch in fetched_branches[:branches_count]:
             branch = dict(zip(db.snapshot_get_cols, branch))
             del branch["snapshot_id"]
             name = branch.pop("name")
             if branch == {"target": None, "target_type": None}:
                 branch = None
             branches[name] = branch
 
         if len(fetched_branches) > branches_count:
             branch = dict(zip(db.snapshot_get_cols, fetched_branches[-1]))
             next_branch = branch["name"]
 
         if branches:
             return {
                 "id": snapshot_id,
                 "branches": branches,
                 "next_branch": next_branch,
             }
 
         return None
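 
     # Illustrative usage sketch, not part of the original code: since the
     # query above fetches ``branches_count + 1`` rows, a non-None
     # ``next_branch`` means at least one more page exists, and it can be
     # fed back as ``branches_from`` to resume (variable names are assumptions):
     #
     #   result = storage.snapshot_get_branches(snp_id, branches_count=100)
     #   while result and result["next_branch"] is not None:
     #       result = storage.snapshot_get_branches(
     #           snp_id,
     #           branches_from=result["next_branch"],
     #           branches_count=100,
     #       )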
 
     @timed
     @db_transaction()
     def snapshot_get_random(self, db=None, cur=None):
         return db.snapshot_get_random(cur)
 
     @timed
     @db_transaction()
     def origin_visit_add(
         self,
         origin_url: str,
         date: Union[str, datetime.datetime],
         type: str,
         db=None,
         cur=None,
     ) -> OriginVisit:
         if isinstance(date, str):
             # FIXME: Converge on iso8601 at some point
             date = dateutil.parser.parse(date)
         elif not isinstance(date, datetime.datetime):
             raise StorageArgumentException("Date must be a datetime or a string")
 
         origin = self.origin_get({"url": origin_url}, db=db, cur=cur)
         if not origin:  # Cannot add a visit without an origin
             raise StorageArgumentException("Unknown origin %s", origin_url)
 
         with convert_validation_exceptions():
             visit_id = db.origin_visit_add(origin_url, date, type, cur=cur)
 
         status = "ongoing"
         # We can only write to the journal after inserting into the
         # DB, because we need the id of the visit
         visit = OriginVisit.from_dict(
             {
                 "origin": origin_url,
                 "date": date,
                 "type": type,
                 "visit": visit_id,
                 # TODO: Remove when we remove those fields from the model
                 "status": status,
                 "metadata": None,
                 "snapshot": None,
             }
         )
         self.journal_writer.origin_visit_add([visit])
 
         with convert_validation_exceptions():
             visit_status = OriginVisitStatus(
                 origin=origin_url,
                 visit=visit_id,
                 date=date,
                 status=status,
                 snapshot=None,
                 metadata=None,
             )
         self._origin_visit_status_add(visit_status, db=db, cur=cur)
         send_metric("origin_visit:add", count=1, method_name="origin_visit")
         return visit
 
     def _origin_visit_status_add(
         self, visit_status: OriginVisitStatus, db, cur
     ) -> None:
         """Add an origin visit status"""
         self.journal_writer.origin_visit_status_add([visit_status])
         db.origin_visit_status_add(visit_status, cur=cur)
         send_metric(
             "origin_visit_status:add", count=1, method_name="origin_visit_status"
         )
 
     @timed
     @db_transaction()
     def origin_visit_status_add(
         self, visit_statuses: Iterable[OriginVisitStatus], db=None, cur=None,
     ) -> None:
         visit_statuses = list(visit_statuses)
         # First round to check existence (fail early if any origin is unknown)
         for visit_status in visit_statuses:
             origin_url = self.origin_get({"url": visit_status.origin}, db=db, cur=cur)
             if not origin_url:
                 raise StorageArgumentException(f"Unknown origin {visit_status.origin}")
 
         for visit_status in visit_statuses:
             self._origin_visit_status_add(visit_status, db, cur)
 
     @timed
     @db_transaction()
     def origin_visit_update(
         self,
         origin: str,
         visit_id: int,
         status: str,
         metadata: Optional[Dict] = None,
         snapshot: Optional[bytes] = None,
         date: Optional[datetime.datetime] = None,
         db=None,
         cur=None,
     ):
         if not isinstance(origin, str):
             raise StorageArgumentException(
                 "origin must be a string, not %r" % (origin,)
             )
         origin_url = origin
         visit = db.origin_visit_get(origin_url, visit_id, cur=cur)
 
         if not visit:
             raise StorageArgumentException("Invalid visit_id for this origin.")
 
         visit = dict(zip(db.origin_visit_get_cols, visit))
 
         updates: Dict[str, Any] = {
             "status": status,
         }
         if metadata and metadata != visit["metadata"]:
             updates["metadata"] = metadata
         if snapshot and snapshot != visit["snapshot"]:
             updates["snapshot"] = snapshot
 
         if updates:
             with convert_validation_exceptions():
                 updated_visit = OriginVisit.from_dict({**visit, **updates})
             self.journal_writer.origin_visit_update([updated_visit])
 
             # Write updates to origin visit (backward compatibility)
             db.origin_visit_update(origin, visit_id, updates)
 
             # Add new origin visit status
             last_visit_status = self._origin_visit_get_updated(
                 origin, visit_id, db=db, cur=cur
             )
             assert last_visit_status is not None
 
             with convert_validation_exceptions():
                 visit_status = OriginVisitStatus(
                     origin=origin_url,
                     visit=visit_id,
                     date=date or now(),
                     status=status,
                     snapshot=snapshot or last_visit_status["snapshot"],
                     metadata=metadata or last_visit_status["metadata"],
                 )
-                db.origin_visit_status_add(visit_status, cur=cur)
-                send_metric(
-                    "origin_visit_status:add",
-                    count=1,
-                    method_name="origin_visit_status",
-                )
-                # self._origin_visit_status_add(visit_status, db=db, cur=cur)
+                self._origin_visit_status_add(visit_status, db=db, cur=cur)
 
     def _origin_visit_get_updated(
         self, origin: str, visit_id: int, db, cur
     ) -> Optional[Dict[str, Any]]:
         """Retrieve origin visit and latest origin visit status and merge them
         into an origin visit.
 
         """
         row_visit = db.origin_visit_get(origin, visit_id)
         if row_visit is None:
             return None
         visit = dict(zip(db.origin_visit_get_cols, row_visit))
         return self._origin_visit_apply_update(visit, db=db, cur=cur)
 
     def _origin_visit_apply_update(
         self, visit: Dict[str, Any], db, cur=None
     ) -> Dict[str, Any]:
         """Retrieve the latest visit status information for the origin visit.
         Then merge it with the visit and return it.
 
         """
         visit_status = db.origin_visit_status_get_latest(
             visit["origin"], visit["visit"]
         )
         return self._origin_visit_merge(visit, visit_status)
 
     def _origin_visit_merge(
         self, visit: Dict[str, Any], visit_status: Dict[str, Any]
     ) -> Dict[str, Any]:
         """Merge origin_visit and origin_visit_status together.
 
         """
         return OriginVisit.from_dict(
             {
                 # default to the values in visit
                 **visit,
                 # override with the last update
                 **visit_status,
                 # visit['origin'] is the URL (via a join), while
                 # visit_status['origin'] is only an id.
                 "origin": visit["origin"],
                 # but keep the date of the creation of the origin visit
                 "date": visit["date"],
             }
         ).to_dict()
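 
     # Hedged example, not from the original code: given a visit row such as
     # {"origin": "https://example.org/repo", "visit": 1, "date": d0,
     #  "status": "ongoing", "snapshot": None} and a later status row
     # {"origin": 42, "visit": 1, "date": d1, "status": "full",
     #  "snapshot": snp_id}, the merge above yields status "full" and
     # snapshot snp_id taken from the status row, while keeping the visit's
     # origin URL and its original creation date d0.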
 
     @timed
     @db_transaction()
     def origin_visit_upsert(
         self, visits: Iterable[OriginVisit], db=None, cur=None
     ) -> None:
         visits = list(visits)
         for visit in visits:
             if visit.visit is None:
                 raise StorageArgumentException(f"Missing visit id for visit {visit}")
 
         self.journal_writer.origin_visit_upsert(visits)
 
         for visit in visits:
             # TODO: upsert them all in a single query
             assert visit.visit is not None
             db.origin_visit_upsert(visit, cur=cur)
             with convert_validation_exceptions():
                 visit_status = OriginVisitStatus(
                     origin=visit.origin,
                     visit=visit.visit,
                     date=now(),
                     status=visit.status,
                     snapshot=visit.snapshot,
                     metadata=visit.metadata,
                 )
             db.origin_visit_status_add(visit_status, cur=cur)
 
     @timed
     @db_transaction_generator(statement_timeout=500)
     def origin_visit_get(
         self,
         origin: str,
         last_visit: Optional[int] = None,
         limit: Optional[int] = None,
         db=None,
         cur=None,
     ) -> Iterable[Dict[str, Any]]:
         lines = db.origin_visit_get_all(
             origin, last_visit=last_visit, limit=limit, cur=cur
         )
         for line in lines:
             visit = dict(zip(db.origin_visit_get_cols, line))
             yield self._origin_visit_apply_update(visit, db)
 
     @timed
     @db_transaction(statement_timeout=500)
     def origin_visit_find_by_date(
         self, origin: str, visit_date: datetime.datetime, db=None, cur=None
     ) -> Optional[Dict[str, Any]]:
         visit = db.origin_visit_find_by_date(origin, visit_date, cur=cur)
         if visit:
             return self._origin_visit_apply_update(visit, db)
         return None
 
     @timed
     @db_transaction(statement_timeout=500)
     def origin_visit_get_by(
         self, origin: str, visit: int, db=None, cur=None
     ) -> Optional[Dict[str, Any]]:
         row = db.origin_visit_get(origin, visit, cur)
         if row:
             visit_dict = dict(zip(db.origin_visit_get_cols, row))
             return self._origin_visit_apply_update(visit_dict, db)
         return None
 
     @timed
     @db_transaction(statement_timeout=4000)
     def origin_visit_get_latest(
         self,
         origin: str,
         allowed_statuses: Optional[List[str]] = None,
         require_snapshot: bool = False,
         db=None,
         cur=None,
     ) -> Optional[Dict[str, Any]]:
         row = db.origin_visit_get_latest(
             origin,
             allowed_statuses=allowed_statuses,
             require_snapshot=require_snapshot,
             cur=cur,
         )
         if row:
             visit = dict(zip(db.origin_visit_get_cols, row))
             return self._origin_visit_apply_update(visit, db)
         return None
 
     @timed
     @db_transaction()
     def origin_visit_get_random(
         self, type: str, db=None, cur=None
     ) -> Optional[Dict[str, Any]]:
         row = db.origin_visit_get_random(type, cur)
         if row:
             visit = dict(zip(db.origin_visit_get_cols, row))
             return self._origin_visit_apply_update(visit, db)
         return None
 
     @timed
     @db_transaction(statement_timeout=2000)
     def object_find_by_sha1_git(self, ids, db=None, cur=None):
         ret = {id: [] for id in ids}
 
         for retval in db.object_find_by_sha1_git(ids, cur=cur):
             if retval[1]:
                 ret[retval[0]].append(
                     dict(zip(db.object_find_by_sha1_git_cols, retval))
                 )
 
         return ret
 
     @timed
     @db_transaction(statement_timeout=500)
     def origin_get(self, origins, db=None, cur=None):
         if isinstance(origins, dict):
             # Old API
             return_single = True
             origins = [origins]
         elif len(origins) == 0:
             return []
         else:
             return_single = False
 
         origin_urls = [origin["url"] for origin in origins]
         results = db.origin_get_by_url(origin_urls, cur)
 
         results = [dict(zip(db.origin_cols, result)) for result in results]
         if return_single:
             assert len(results) == 1
             if results[0]["url"] is not None:
                 return results[0]
             else:
                 return None
         else:
             return [None if res["url"] is None else res for res in results]
 
     @timed
     @db_transaction_generator(statement_timeout=500)
     def origin_get_by_sha1(self, sha1s, db=None, cur=None):
         for line in db.origin_get_by_sha1(sha1s, cur):
             if line[0] is not None:
                 yield dict(zip(db.origin_cols, line))
             else:
                 yield None
 
     @timed
     @db_transaction_generator()
     def origin_get_range(self, origin_from=1, origin_count=100, db=None, cur=None):
         for origin in db.origin_get_range(origin_from, origin_count, cur):
             yield dict(zip(db.origin_get_range_cols, origin))
 
     @timed
     @db_transaction()
     def origin_list(
         self, page_token: Optional[str] = None, limit: int = 100, *, db=None, cur=None
     ) -> dict:
         page_token = page_token or "0"
         if not isinstance(page_token, str):
             raise StorageArgumentException("page_token must be a string.")
         origin_from = int(page_token)
         result: Dict[str, Any] = {
             "origins": [
                 dict(zip(db.origin_get_range_cols, origin))
                 for origin in db.origin_get_range(origin_from, limit, cur)
             ],
         }
 
         assert len(result["origins"]) <= limit
         if len(result["origins"]) == limit:
             result["next_page_token"] = str(result["origins"][limit - 1]["id"] + 1)
 
         for origin in result["origins"]:
             del origin["id"]
 
         return result
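 
     # Usage sketch, illustrative only: ``next_page_token`` is the stringified
     # internal id one past the last origin returned, so listing all origins
     # is a simple loop (variable names are assumptions):
     #
     #   page_token = None
     #   while True:
     #       page = storage.origin_list(page_token=page_token, limit=100)
     #       for origin in page["origins"]:
     #           ...  # each dict has a "url" key; the internal "id" is stripped
     #       page_token = page.get("next_page_token")
     #       if page_token is None:
     #           break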
 
     @timed
     @db_transaction_generator()
     def origin_search(
         self,
         url_pattern,
         offset=0,
         limit=50,
         regexp=False,
         with_visit=False,
         db=None,
         cur=None,
     ):
         for origin in db.origin_search(
             url_pattern, offset, limit, regexp, with_visit, cur
         ):
             yield dict(zip(db.origin_cols, origin))
 
     @timed
     @db_transaction()
     def origin_count(
         self, url_pattern, regexp=False, with_visit=False, db=None, cur=None
     ):
         return db.origin_count(url_pattern, regexp, with_visit, cur)
 
     @timed
     @db_transaction()
     def origin_add(self, origins: Iterable[Origin], db=None, cur=None) -> List[Dict]:
         origins = list(origins)
         for origin in origins:
             self.origin_add_one(origin, db=db, cur=cur)
 
         return [o.to_dict() for o in origins]
 
     @timed
     @db_transaction()
     def origin_add_one(self, origin: Origin, db=None, cur=None) -> str:
         origin_row = list(db.origin_get_by_url([origin.url], cur))[0]
         origin_url = dict(zip(db.origin_cols, origin_row))["url"]
         if origin_url:
             return origin_url
 
         self.journal_writer.origin_add([origin])
 
         url = db.origin_add(origin.url, cur)
         send_metric("origin:add", count=1, method_name="origin_add_one")
         return url
 
     @db_transaction(statement_timeout=500)
     def stat_counters(self, db=None, cur=None):
         return {k: v for (k, v) in db.stat_counters()}
 
     @db_transaction()
     def refresh_stat_counters(self, db=None, cur=None):
         keys = [
             "content",
             "directory",
             "directory_entry_dir",
             "directory_entry_file",
             "directory_entry_rev",
             "origin",
             "origin_visit",
             "person",
             "release",
             "revision",
             "revision_history",
             "skipped_content",
             "snapshot",
         ]
 
         for key in keys:
             cur.execute("select * from swh_update_counter(%s)", (key,))
 
     @timed
     @db_transaction()
     def origin_metadata_add(
         self,
         origin_url: str,
         discovery_date: datetime.datetime,
         authority: Dict[str, Any],
         fetcher: Dict[str, Any],
         format: str,
         metadata: bytes,
         db=None,
         cur=None,
     ) -> None:
         authority_id = db.metadata_authority_get_id(
             authority["type"], authority["url"], cur
         )
         if not authority_id:
             raise StorageArgumentException(f"Unknown authority {authority}")
         fetcher_id = db.metadata_fetcher_get_id(
             fetcher["name"], fetcher["version"], cur
         )
         if not fetcher_id:
             raise StorageArgumentException(f"Unknown fetcher {fetcher}")
         try:
             db.origin_metadata_add(
                 origin_url,
                 discovery_date,
                 authority_id,
                 fetcher_id,
                 format,
                 metadata,
                 cur,
             )
         except psycopg2.ProgrammingError as e:
             raise StorageArgumentException(*e.args)
         send_metric("origin_metadata:add", count=1, method_name="origin_metadata_add")
 
     @timed
     @db_transaction(statement_timeout=500)
     def origin_metadata_get(
         self,
         origin_url: str,
         authority: Dict[str, str],
         after: Optional[datetime.datetime] = None,
         page_token: Optional[bytes] = None,
         limit: int = 1000,
         db=None,
         cur=None,
     ) -> Dict[str, Any]:
         if page_token:
             (after_time, after_fetcher) = msgpack_loads(page_token)
             if after and after_time < after:
                 raise StorageArgumentException(
                     "page_token is inconsistent with the value of 'after'."
                 )
         else:
             after_time = after
             after_fetcher = None
 
         authority_id = db.metadata_authority_get_id(
             authority["type"], authority["url"], cur
         )
         if not authority_id:
             return {
                 "next_page_token": None,
                 "results": [],
             }
 
         rows = db.origin_metadata_get(
             origin_url, authority_id, after_time, after_fetcher, limit + 1, cur
         )
         rows = [dict(zip(db.origin_metadata_get_cols, row)) for row in rows]
         results = []
         for row in rows:
             row = row.copy()
             row.pop("metadata_fetcher.id")
             results.append(
                 {
                     "origin_url": row.pop("origin.url"),
                     "authority": {
                         "type": row.pop("metadata_authority.type"),
                         "url": row.pop("metadata_authority.url"),
                     },
                     "fetcher": {
                         "name": row.pop("metadata_fetcher.name"),
                         "version": row.pop("metadata_fetcher.version"),
                     },
                     **row,
                 }
             )
 
         if len(results) > limit:
             results.pop()
             assert len(results) == limit
             last_returned_row = rows[-2]  # rows[-1] corresponds to the popped result
             next_page_token: Optional[bytes] = msgpack_dumps(
                 (
                     last_returned_row["discovery_date"],
                     last_returned_row["metadata_fetcher.id"],
                 )
             )
         else:
             next_page_token = None
 
         return {
             "next_page_token": next_page_token,
             "results": results,
         }
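 
     # Illustrative note, not part of the original code: the page token built
     # above is an opaque msgpack-encoded (discovery_date, fetcher_id) pair;
     # callers should feed it back unchanged, e.g. (names are assumptions):
     #
     #   result = storage.origin_metadata_get(url, authority, limit=100)
     #   while result["next_page_token"] is not None:
     #       result = storage.origin_metadata_get(
     #           url, authority, limit=100,
     #           page_token=result["next_page_token"],
     #       )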
 
     @timed
     @db_transaction()
     def metadata_fetcher_add(
         self, name: str, version: str, metadata: Dict[str, Any], db=None, cur=None
     ) -> None:
         db.metadata_fetcher_add(name, version, metadata, cur)
         send_metric("metadata_fetcher:add", count=1, method_name="metadata_fetcher")
 
     @timed
     @db_transaction(statement_timeout=500)
     def metadata_fetcher_get(
         self, name: str, version: str, db=None, cur=None
     ) -> Optional[Dict[str, Any]]:
         row = db.metadata_fetcher_get(name, version, cur=cur)
         if not row:
             return None
         return dict(zip(db.metadata_fetcher_cols, row))
 
     @timed
     @db_transaction()
     def metadata_authority_add(
         self, type: str, url: str, metadata: Dict[str, Any], db=None, cur=None
     ) -> None:
         db.metadata_authority_add(type, url, metadata, cur)
         send_metric("metadata_authority:add", count=1, method_name="metadata_authority")
 
     @timed
     @db_transaction()
     def metadata_authority_get(
         self, type: str, url: str, db=None, cur=None
     ) -> Optional[Dict[str, Any]]:
         row = db.metadata_authority_get(type, url, cur=cur)
         if not row:
             return None
         return dict(zip(db.metadata_authority_cols, row))
 
     @timed
     def diff_directories(self, from_dir, to_dir, track_renaming=False):
         return diff.diff_directories(self, from_dir, to_dir, track_renaming)
 
     @timed
     def diff_revisions(self, from_rev, to_rev, track_renaming=False):
         return diff.diff_revisions(self, from_rev, to_rev, track_renaming)
 
     @timed
     def diff_revision(self, revision, track_renaming=False):
         return diff.diff_revision(self, revision, track_renaming)
 
     def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None:
         """Do nothing
 
         """
         return None
 
     def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict:
         return {}
diff --git a/swh/storage/tests/test_kafka_writer.py b/swh/storage/tests/test_kafka_writer.py
index 3e8dce07..ca531c0a 100644
--- a/swh/storage/tests/test_kafka_writer.py
+++ b/swh/storage/tests/test_kafka_writer.py
@@ -1,157 +1,157 @@
 # Copyright (C) 2018-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from confluent_kafka import Consumer
 
 from swh.storage import get_storage
 from swh.model.model import Origin, OriginVisit
 from swh.model.hypothesis_strategies import objects
 from swh.journal.pytest_plugin import consume_messages, assert_all_objects_consumed
 from swh.journal.tests.journal_data import TEST_OBJECTS
 
 from swh.model.model import Person
 from attr import asdict, has
 from hypothesis import given
 from hypothesis.strategies import lists
 
 
 def test_storage_direct_writer(kafka_prefix: str, kafka_server, consumer: Consumer):
 
     writer_config = {
         "cls": "kafka",
         "brokers": [kafka_server],
         "client_id": "kafka_writer",
         "prefix": kafka_prefix,
         "anonymize": False,
     }
     storage_config = {
         "cls": "pipeline",
         "steps": [{"cls": "memory", "journal_writer": writer_config},],
     }
 
     storage = get_storage(**storage_config)
 
     expected_messages = 0
 
     for obj_type, objs in TEST_OBJECTS.items():
         method = getattr(storage, obj_type + "_add")
         if obj_type in (
             "content",
             "skipped_content",
             "directory",
             "revision",
             "release",
             "snapshot",
             "origin",
             "origin_visit_status",
         ):
             method(objs)
             expected_messages += len(objs)
         elif obj_type in ("origin_visit",):
             for obj in objs:
                 assert isinstance(obj, OriginVisit)
                 storage.origin_add_one(Origin(url=obj.origin))
                 visit = method(obj.origin, date=obj.date, type=obj.type)
                 expected_messages += 1 + 1  # 1 visit + 1 visit status
 
                 obj_d = obj.to_dict()
                 for k in ("visit", "origin", "date", "type"):
                     del obj_d[k]
                 storage.origin_visit_update(obj.origin, visit.visit, **obj_d)
-                expected_messages += 1
+                expected_messages += 1 + 1  # 1 visit update + 1 visit status
         else:
             assert False, obj_type
 
     existing_topics = set(
         topic
         for topic in consumer.list_topics(timeout=10).topics.keys()
         if topic.startswith(f"{kafka_prefix}.")  # final . to exclude privileged topics
     )
     assert existing_topics == {
         f"{kafka_prefix}.{obj_type}"
         for obj_type in (
             "content",
             "directory",
             "origin",
             "origin_visit",
             "origin_visit_status",
             "release",
             "revision",
             "snapshot",
             "skipped_content",
         )
     }
 
     consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages)
     assert_all_objects_consumed(consumed_messages)
 
 
 def test_storage_direct_writer_anonymized(
     kafka_prefix: str, kafka_server, consumer: Consumer
 ):
 
     writer_config = {
         "cls": "kafka",
         "brokers": [kafka_server],
         "client_id": "kafka_writer",
         "prefix": kafka_prefix,
         "anonymize": True,
     }
     storage_config = {
         "cls": "pipeline",
         "steps": [{"cls": "memory", "journal_writer": writer_config},],
     }
 
     storage = get_storage(**storage_config)
 
     expected_messages = 0
 
     for obj_type, objs in TEST_OBJECTS.items():
         if obj_type == "origin_visit":
             # these have an inconsistent API and are unrelated to what we
             # want to test here
             continue
         method = getattr(storage, obj_type + "_add")
         method(objs)
         expected_messages += len(objs)
 
     existing_topics = set(
         topic
         for topic in consumer.list_topics(timeout=10).topics.keys()
         if topic.startswith(kafka_prefix)
     )
     assert existing_topics == {
         f"{kafka_prefix}.{obj_type}"
         for obj_type in (
             "content",
             "directory",
             "origin",
             "origin_visit",
             "origin_visit_status",
             "release",
             "revision",
             "snapshot",
             "skipped_content",
         )
     } | {
         f"{kafka_prefix}_privileged.{obj_type}" for obj_type in ("release", "revision",)
     }
 
 
 def check_anonymized_obj(obj):
     if has(obj):
         if isinstance(obj, Person):
             assert obj.name is None
             assert obj.email is None
             assert len(obj.fullname) == 32
         else:
             for key, value in asdict(obj, recurse=False).items():
                 check_anonymized_obj(value)
 
 
 @given(lists(objects(split_content=True)))
 def test_anonymizer(obj_type_and_objs):
     for obj_type, obj in obj_type_and_objs:
         check_anonymized_obj(obj.anonymize())
diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py
index e5ae40d2..11378133 100644
--- a/swh/storage/tests/test_storage.py
+++ b/swh/storage/tests/test_storage.py
@@ -1,4001 +1,4017 @@
 # Copyright (C) 2015-2020  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import copy
 from contextlib import contextmanager
 import datetime
 import inspect
 import itertools
 import math
 import queue
 import random
 import threading
 
 from collections import defaultdict
 from datetime import timedelta
 from unittest.mock import Mock
 
 import psycopg2
 import pytest
 
 from hypothesis import given, strategies, settings, HealthCheck
 
 from typing import ClassVar, Optional
 
 from swh.model import from_disk, identifiers
 from swh.model.hashutil import hash_to_bytes
 from swh.model.model import (
     Content,
     Directory,
     Origin,
     OriginVisit,
     OriginVisitStatus,
     Release,
     Revision,
     Snapshot,
 )
 from swh.model.hypothesis_strategies import objects
 from swh.model.hashutil import hash_to_hex
 from swh.storage import get_storage
 from swh.storage.converters import origin_url_to_sha1 as sha1
 from swh.storage.exc import HashCollision, StorageArgumentException
 from swh.storage.interface import StorageInterface
-from swh.storage.utils import content_hex_hashes
+from swh.storage.utils import content_hex_hashes, now
 
 from .storage_data import data
 
 
 @contextmanager
 def db_transaction(storage):
     with storage.db() as db:
         with db.transaction() as cur:
             yield db, cur
 
 
 def normalize_entity(entity):
     entity = copy.deepcopy(entity)
     for key in ("date", "committer_date"):
         if key in entity:
             entity[key] = identifiers.normalize_timestamp(entity[key])
     return entity
 
 
 def transform_entries(dir_, *, prefix=b""):
     for ent in dir_["entries"]:
         yield {
             "dir_id": dir_["id"],
             "type": ent["type"],
             "target": ent["target"],
             "name": prefix + ent["name"],
             "perms": ent["perms"],
             "status": None,
             "sha1": None,
             "sha1_git": None,
             "sha256": None,
             "length": None,
         }
 
 
 def cmpdir(directory):
     return (directory["type"], directory["dir_id"])
 
 
 def short_revision(revision):
     return [revision["id"], revision["parents"]]
 
 
 def assert_contents_ok(
     expected_contents, actual_contents, keys_to_check={"sha1", "data"}
 ):
     """Assert that a given list of contents matches on a given set of keys.
 
     """
     for k in keys_to_check:
         expected_list = set([c.get(k) for c in expected_contents])
         actual_list = set([c.get(k) for c in actual_contents])
         assert actual_list == expected_list, k
 
 
 class LazyContent(Content):
     def with_data(self):
         return Content.from_dict({**self.to_dict(), "data": data.cont["data"]})
 
 
-def now():
-    return datetime.datetime.now(tz=datetime.timezone.utc)
-
-
 class TestStorage:
     """Main class for Storage testing.
 
     This class is used as-is to test local storage (see TestLocalStorage
     below) and remote storage (see TestRemoteStorage in
     test_remote_storage.py).
 
     We need to have the two classes inherit from this base class
     separately to avoid nosetests running the tests from the base
     class twice.
     """
 
     maxDiff = None  # type: ClassVar[Optional[int]]
 
     def test_types(self, swh_storage_backend_config):
         """Checks all methods of StorageInterface are implemented by this
         backend, and that they have the same signature."""
         # Create an instance of the protocol (which cannot be instantiated
         # directly, so this creates a subclass, then instantiates it)
         interface = type("_", (StorageInterface,), {})()
         storage = get_storage(**swh_storage_backend_config)
 
         assert "content_add" in dir(interface)
 
         missing_methods = []
 
         for meth_name in dir(interface):
             if meth_name.startswith("_"):
                 continue
             interface_meth = getattr(interface, meth_name)
             try:
                 concrete_meth = getattr(storage, meth_name)
             except AttributeError:
                 if not getattr(interface_meth, "deprecated_endpoint", False):
                     # The backend is missing a (non-deprecated) endpoint
                     missing_methods.append(meth_name)
                 continue
 
             expected_signature = inspect.signature(interface_meth)
             actual_signature = inspect.signature(concrete_meth)
 
             assert expected_signature == actual_signature, meth_name
 
         assert missing_methods == []
 
     def test_check_config(self, swh_storage):
         assert swh_storage.check_config(check_write=True)
         assert swh_storage.check_config(check_write=False)
 
     def test_content_add(self, swh_storage):
         cont = data.cont
 
         insertion_start_time = now()
         actual_result = swh_storage.content_add([cont])
         insertion_end_time = now()
 
         assert actual_result == {
             "content:add": 1,
             "content:add:bytes": cont["length"],
         }
 
         assert list(swh_storage.content_get([cont["sha1"]])) == [
             {"sha1": cont["sha1"], "data": cont["data"]}
         ]
 
         expected_cont = data.cont
         del expected_cont["data"]
         contents = [
             obj
             for (obj_type, obj) in swh_storage.journal_writer.journal.objects
             if obj_type == "content"
         ]
         assert len(contents) == 1
         for obj in contents:
             assert insertion_start_time <= obj.ctime
             assert obj.ctime <= insertion_end_time
             obj_d = obj.to_dict()
             del obj_d["ctime"]
             assert obj_d == expected_cont
 
         swh_storage.refresh_stat_counters()
         assert swh_storage.stat_counters()["content"] == 1
 
     def test_content_add_from_generator(self, swh_storage):
         def _cnt_gen():
             yield data.cont
 
         actual_result = swh_storage.content_add(_cnt_gen())
 
         assert actual_result == {
             "content:add": 1,
             "content:add:bytes": data.cont["length"],
         }
 
         swh_storage.refresh_stat_counters()
         assert swh_storage.stat_counters()["content"] == 1
 
     def test_content_add_from_lazy_content(self, swh_storage):
 
         lazy_content = LazyContent.from_dict({**data.cont, "data": b"nope",})
 
         insertion_start_time = now()
 
         # bypass the validation proxy for now, so the LazyContent object is passed through unchanged
         actual_result = swh_storage.storage.content_add([lazy_content])
 
         insertion_end_time = now()
 
         assert actual_result == {
             "content:add": 1,
             "content:add:bytes": data.cont["length"],
         }
 
         # the fact that we retrieve the content object from the storage with
         # the correct 'data' field ensures its with_data() method has been called
         assert list(swh_storage.content_get([data.cont["sha1"]])) == [
             {"sha1": data.cont["sha1"], "data": data.cont["data"]}
         ]
 
         expected_cont = data.cont
         del expected_cont["data"]
         contents = [
             obj
             for (obj_type, obj) in swh_storage.journal_writer.journal.objects
             if obj_type == "content"
         ]
         assert len(contents) == 1
         for obj in contents:
             assert insertion_start_time <= obj.ctime
             assert obj.ctime <= insertion_end_time
             obj_d = obj.to_dict()
             del obj_d["ctime"]
             assert obj_d == expected_cont
 
         swh_storage.refresh_stat_counters()
         assert swh_storage.stat_counters()["content"] == 1
 
     def test_content_add_validation(self, swh_storage):
         cont = data.cont
 
         with pytest.raises(StorageArgumentException, match="status"):
             swh_storage.content_add([{**cont, "status": "absent"}])
 
         with pytest.raises(StorageArgumentException, match="status"):
             swh_storage.content_add([{**cont, "status": "foobar"}])
 
         with pytest.raises(StorageArgumentException, match="(?i)length"):
             swh_storage.content_add([{**cont, "length": -2}])
 
         with pytest.raises(StorageArgumentException, match="reason"):
             swh_storage.content_add([{**cont, "reason": "foobar"}])
 
     def test_skipped_content_add_validation(self, swh_storage):
         cont = data.cont.copy()
         del cont["data"]
 
         with pytest.raises(StorageArgumentException, match="status"):
             swh_storage.skipped_content_add([{**cont, "status": "visible"}])
 
         with pytest.raises(StorageArgumentException, match="reason") as cm:
             swh_storage.skipped_content_add([{**cont, "status": "absent"}])
 
         if type(cm.value) == psycopg2.IntegrityError:
             assert cm.value.pgcode == psycopg2.errorcodes.NOT_NULL_VIOLATION
 
     def test_content_get_missing(self, swh_storage):
         cont = data.cont
 
         swh_storage.content_add([cont])
 
         # Query a single missing content
         results = list(swh_storage.content_get([data.cont2["sha1"]]))
         assert results == [None]
 
         # Check content_get does not abort after finding a missing content
         results = list(swh_storage.content_get([data.cont["sha1"], data.cont2["sha1"]]))
         assert results == [{"sha1": cont["sha1"], "data": cont["data"]}, None]
 
         # Check content_get does not discard found content when it finds
         # a missing content.
         results = list(swh_storage.content_get([data.cont2["sha1"], data.cont["sha1"]]))
         assert results == [None, {"sha1": cont["sha1"], "data": cont["data"]}]
 
     def test_content_add_different_input(self, swh_storage):
         cont = data.cont
         cont2 = data.cont2
 
         actual_result = swh_storage.content_add([cont, cont2])
         assert actual_result == {
             "content:add": 2,
             "content:add:bytes": cont["length"] + cont2["length"],
         }
 
     def test_content_add_twice(self, swh_storage):
         actual_result = swh_storage.content_add([data.cont])
         assert actual_result == {
             "content:add": 1,
             "content:add:bytes": data.cont["length"],
         }
         assert len(swh_storage.journal_writer.journal.objects) == 1
 
         actual_result = swh_storage.content_add([data.cont, data.cont2])
         assert actual_result == {
             "content:add": 1,
             "content:add:bytes": data.cont2["length"],
         }
         assert 2 <= len(swh_storage.journal_writer.journal.objects) <= 3
 
         assert len(swh_storage.content_find(data.cont)) == 1
         assert len(swh_storage.content_find(data.cont2)) == 1
 
     def test_content_add_collision(self, swh_storage):
         cont1 = data.cont
 
         # create (corrupted) content with same sha1{,_git} but != sha256
         cont1b = cont1.copy()
         sha256_array = bytearray(cont1b["sha256"])
         sha256_array[0] += 1
         cont1b["sha256"] = bytes(sha256_array)
 
         with pytest.raises(HashCollision) as cm:
             swh_storage.content_add([cont1, cont1b])
 
         exc = cm.value
         actual_algo = exc.algo
         assert actual_algo in ["sha1", "sha1_git", "blake2s256"]
         actual_id = exc.hash_id
         assert actual_id == hash_to_hex(cont1[actual_algo])
         collisions = exc.args[2]
         assert len(collisions) == 2
         assert collisions == [
             content_hex_hashes(Content.from_dict(cont1).hashes()),
             content_hex_hashes(Content.from_dict(cont1b).hashes()),
         ]
         assert exc.colliding_content_hashes() == [
             Content.from_dict(cont1).hashes(),
             Content.from_dict(cont1b).hashes(),
         ]
 
     def test_content_add_duplicate(self, swh_storage):
         swh_storage.content_add([data.cont, data.cont])
 
         assert list(swh_storage.content_get([data.cont["sha1"]])) == [
             {"sha1": data.cont["sha1"], "data": data.cont["data"]}
         ]
 
     def test_content_update(self, swh_storage):
         if hasattr(swh_storage, "storage"):
             swh_storage.journal_writer.journal = None  # TODO, not supported
 
         cont = copy.deepcopy(data.cont)
 
         swh_storage.content_add([cont])
         # alter the sha1_git for example
         cont["sha1_git"] = hash_to_bytes("3a60a5275d0333bf13468e8b3dcab90f4046e654")
 
         swh_storage.content_update([cont], keys=["sha1_git"])
 
         results = swh_storage.content_get_metadata([cont["sha1"]])
         del cont["data"]
         assert tuple(results[cont["sha1"]]) == (cont,)
 
     def test_content_add_metadata(self, swh_storage):
         cont = data.cont
         del cont["data"]
         cont["ctime"] = now()
 
         actual_result = swh_storage.content_add_metadata([cont])
         assert actual_result == {
             "content:add": 1,
         }
 
         expected_cont = cont.copy()
         del expected_cont["ctime"]
         assert tuple(
             swh_storage.content_get_metadata([cont["sha1"]])[cont["sha1"]]
         ) == (expected_cont,)
         contents = [
             obj
             for (obj_type, obj) in swh_storage.journal_writer.journal.objects
             if obj_type == "content"
         ]
         assert len(contents) == 1
         for obj in contents:
             obj_d = obj.to_dict()
             del obj_d["ctime"]
             assert obj_d == expected_cont
 
     def test_content_add_metadata_different_input(self, swh_storage):
         cont = data.cont
         del cont["data"]
         cont["ctime"] = now()
         cont2 = data.cont2
         del cont2["data"]
         cont2["ctime"] = now()
 
         actual_result = swh_storage.content_add_metadata([cont, cont2])
         assert actual_result == {
             "content:add": 2,
         }
 
     def test_content_add_metadata_collision(self, swh_storage):
         cont1 = data.cont
         del cont1["data"]
         cont1["ctime"] = now()
 
         # create (corrupted) content with same sha1{,_git} but != sha256
         cont1b = cont1.copy()
         sha256_array = bytearray(cont1b["sha256"])
         sha256_array[0] += 1
         cont1b["sha256"] = bytes(sha256_array)
 
         with pytest.raises(HashCollision) as cm:
             swh_storage.content_add_metadata([cont1, cont1b])
 
         exc = cm.value
         actual_algo = exc.algo
         assert actual_algo in ["sha1", "sha1_git", "blake2s256"]
         actual_id = exc.hash_id
         assert actual_id == hash_to_hex(cont1[actual_algo])
         collisions = exc.args[2]
         assert len(collisions) == 2
         assert collisions == [
             content_hex_hashes(Content.from_dict(cont1).hashes()),
             content_hex_hashes(Content.from_dict(cont1b).hashes()),
         ]
         assert exc.colliding_content_hashes() == [
             Content.from_dict(cont1).hashes(),
             Content.from_dict(cont1b).hashes(),
         ]
 
     def test_skipped_content_add(self, swh_storage):
         cont = data.skipped_cont
         cont2 = data.skipped_cont2
         cont2["blake2s256"] = None
 
         missing = list(swh_storage.skipped_content_missing([cont, cont2]))
 
         assert missing == [
             {
                 "sha1": cont["sha1"],
                 "sha1_git": cont["sha1_git"],
                 "blake2s256": cont["blake2s256"],
                 "sha256": cont["sha256"],
             },
             {
                 "sha1": cont2["sha1"],
                 "sha1_git": cont2["sha1_git"],
                 "blake2s256": cont2["blake2s256"],
                 "sha256": cont2["sha256"],
             },
         ]
 
         actual_result = swh_storage.skipped_content_add([cont, cont, cont2])
 
         assert 2 <= actual_result.pop("skipped_content:add") <= 3
         assert actual_result == {}
 
         missing = list(swh_storage.skipped_content_missing([cont, cont2]))
 
         assert missing == []
 
     def test_skipped_content_add_missing_hashes(self, swh_storage):
         cont = data.skipped_cont
         cont2 = data.skipped_cont2
         cont["sha1_git"] = cont2["sha1_git"] = None
 
         missing = list(swh_storage.skipped_content_missing([cont, cont2]))
 
         assert len(missing) == 2
 
         actual_result = swh_storage.skipped_content_add([cont, cont, cont2])
 
         assert 2 <= actual_result.pop("skipped_content:add") <= 3
         assert actual_result == {}
 
         missing = list(swh_storage.skipped_content_missing([cont, cont2]))
 
         assert missing == []
 
     def test_skipped_content_missing_partial_hash(self, swh_storage):
         cont = data.skipped_cont
         cont2 = cont.copy()
         cont2["sha1_git"] = None
 
         missing = list(swh_storage.skipped_content_missing([cont, cont2]))
 
         assert len(missing) == 2
 
         actual_result = swh_storage.skipped_content_add([cont])
 
         assert actual_result.pop("skipped_content:add") == 1
         assert actual_result == {}
 
         missing = list(swh_storage.skipped_content_missing([cont, cont2]))
 
         assert missing == [
             {
                 "sha1": cont2["sha1"],
                 "sha1_git": cont2["sha1_git"],
                 "blake2s256": cont2["blake2s256"],
                 "sha256": cont2["sha256"],
             }
         ]
 
     @pytest.mark.property_based
     @settings(deadline=None)  # this test is very slow
     @given(
         strategies.sets(
             elements=strategies.sampled_from(["sha256", "sha1_git", "blake2s256"]),
             min_size=0,
         )
     )
     def test_content_missing(self, swh_storage, algos):
         algos |= {"sha1"}
         cont2 = data.cont2
         missing_cont = data.missing_cont
         swh_storage.content_add([cont2])
         test_contents = [cont2]
         missing_per_hash = defaultdict(list)
         for i in range(256):
             test_content = missing_cont.copy()
             for hash in algos:
                 test_content[hash] = bytes([i]) + test_content[hash][1:]
                 missing_per_hash[hash].append(test_content[hash])
             test_contents.append(test_content)
 
         assert set(swh_storage.content_missing(test_contents)) == set(
             missing_per_hash["sha1"]
         )
 
         for hash in algos:
             assert set(
                 swh_storage.content_missing(test_contents, key_hash=hash)
             ) == set(missing_per_hash[hash])
 
     @pytest.mark.property_based
     @given(
         strategies.sets(
             elements=strategies.sampled_from(["sha256", "sha1_git", "blake2s256"]),
             min_size=0,
         )
     )
     def test_content_missing_unknown_algo(self, swh_storage, algos):
         algos |= {"sha1"}
         cont2 = data.cont2
         missing_cont = data.missing_cont
         swh_storage.content_add([cont2])
         test_contents = [cont2]
         missing_per_hash = defaultdict(list)
         for i in range(16):
             test_content = missing_cont.copy()
             for hash in algos:
                 test_content[hash] = bytes([i]) + test_content[hash][1:]
                 missing_per_hash[hash].append(test_content[hash])
             test_content["nonexisting_algo"] = b"\x00"
             test_contents.append(test_content)
 
         assert set(swh_storage.content_missing(test_contents)) == set(
             missing_per_hash["sha1"]
         )
 
         for hash in algos:
             assert set(
                 swh_storage.content_missing(test_contents, key_hash=hash)
             ) == set(missing_per_hash[hash])
 
     def test_content_missing_per_sha1(self, swh_storage):
         # given
         cont2 = data.cont2
         missing_cont = data.missing_cont
         swh_storage.content_add([cont2])
         # when
         gen = swh_storage.content_missing_per_sha1(
             [cont2["sha1"], missing_cont["sha1"]]
         )
         # then
         assert list(gen) == [missing_cont["sha1"]]
 
     def test_content_missing_per_sha1_git(self, swh_storage):
         cont = data.cont
         cont2 = data.cont2
         missing_cont = data.missing_cont
 
         swh_storage.content_add([cont, cont2])
 
         contents = [cont["sha1_git"], cont2["sha1_git"], missing_cont["sha1_git"]]
 
         missing_contents = swh_storage.content_missing_per_sha1_git(contents)
         assert list(missing_contents) == [missing_cont["sha1_git"]]
 
     def test_content_get_partition(self, swh_storage, swh_contents):
         """content_get_partition paginates results if limit exceeded"""
         expected_contents = [c for c in swh_contents if c["status"] != "absent"]
 
         actual_contents = []
         for i in range(16):
             actual_result = swh_storage.content_get_partition(i, 16)
             assert actual_result["next_page_token"] is None
             actual_contents.extend(actual_result["contents"])
 
         assert_contents_ok(expected_contents, actual_contents, ["sha1"])
 
     def test_content_get_partition_full(self, swh_storage, swh_contents):
         """content_get_partition for a single partition returns all available
         contents"""
         expected_contents = [c for c in swh_contents if c["status"] != "absent"]
 
         actual_result = swh_storage.content_get_partition(0, 1)
         assert actual_result["next_page_token"] is None
 
         actual_contents = actual_result["contents"]
         assert_contents_ok(expected_contents, actual_contents, ["sha1"])
 
     def test_content_get_partition_empty(self, swh_storage, swh_contents):
         """content_get_partition when at least one of the partitions is
         empty"""
         expected_contents = {
             cont["sha1"] for cont in swh_contents if cont["status"] != "absent"
         }
         # nb_partitions = smallest power of 2 such that at least one of
         # the partitions is empty
         nb_partitions = 1 << math.floor(math.log2(len(swh_contents)) + 1)
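         # e.g. 16 contents -> 32 partitions, so at least one partition is empty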
 
         seen_sha1s = []
 
         for i in range(nb_partitions):
             actual_result = swh_storage.content_get_partition(
                 i, nb_partitions, limit=len(swh_contents) + 1
             )
 
             for cont in actual_result["contents"]:
                 seen_sha1s.append(cont["sha1"])
 
             # Limit is higher than the max number of results
             assert actual_result["next_page_token"] is None
 
         assert set(seen_sha1s) == expected_contents
 
     def test_content_get_partition_limit_none(self, swh_storage):
         """content_get_partition call with wrong limit input should fail"""
         with pytest.raises(StorageArgumentException) as e:
             swh_storage.content_get_partition(1, 16, limit=None)
 
         assert e.value.args == ("limit should not be None",)
 
     def test_generate_content_get_partition_pagination(self, swh_storage, swh_contents):
         """content_get_partition returns contents within range provided"""
         expected_contents = [c for c in swh_contents if c["status"] != "absent"]
 
         # retrieve contents
         actual_contents = []
         for i in range(4):
             page_token = None
             while True:
                 actual_result = swh_storage.content_get_partition(
                     i, 4, limit=3, page_token=page_token
                 )
                 actual_contents.extend(actual_result["contents"])
                 page_token = actual_result["next_page_token"]
 
                 if page_token is None:
                     break
 
         assert_contents_ok(expected_contents, actual_contents, ["sha1"])
 
     def test_content_get_metadata(self, swh_storage):
         cont1 = data.cont
         cont2 = data.cont2
 
         swh_storage.content_add([cont1, cont2])
 
         actual_md = swh_storage.content_get_metadata([cont1["sha1"], cont2["sha1"]])
 
         # we only retrieve the metadata
         cont1.pop("data")
         cont2.pop("data")
 
         assert tuple(actual_md[cont1["sha1"]]) == (cont1,)
         assert tuple(actual_md[cont2["sha1"]]) == (cont2,)
         assert len(actual_md.keys()) == 2
 
     def test_content_get_metadata_missing_sha1(self, swh_storage):
         cont1 = data.cont
         cont2 = data.cont2
         missing_cont = data.missing_cont
 
         swh_storage.content_add([cont1, cont2])
 
         actual_contents = swh_storage.content_get_metadata([missing_cont["sha1"]])
 
         assert len(actual_contents) == 1
         assert tuple(actual_contents[missing_cont["sha1"]]) == ()
 
     def test_content_get_random(self, swh_storage):
         swh_storage.content_add([data.cont, data.cont2, data.cont3])
 
         assert swh_storage.content_get_random() in {
             data.cont["sha1_git"],
             data.cont2["sha1_git"],
             data.cont3["sha1_git"],
         }
 
     def test_directory_add(self, swh_storage):
         init_missing = list(swh_storage.directory_missing([data.dir["id"]]))
         assert [data.dir["id"]] == init_missing
 
         actual_result = swh_storage.directory_add([data.dir])
         assert actual_result == {"directory:add": 1}
 
         assert list(swh_storage.journal_writer.journal.objects) == [
             ("directory", Directory.from_dict(data.dir))
         ]
 
         actual_data = list(swh_storage.directory_ls(data.dir["id"]))
         expected_data = list(transform_entries(data.dir))
 
         assert sorted(expected_data, key=cmpdir) == sorted(actual_data, key=cmpdir)
 
         after_missing = list(swh_storage.directory_missing([data.dir["id"]]))
         assert after_missing == []
 
         swh_storage.refresh_stat_counters()
         assert swh_storage.stat_counters()["directory"] == 1
 
     def test_directory_add_from_generator(self, swh_storage):
         def _dir_gen():
             yield data.dir
 
         actual_result = swh_storage.directory_add(directories=_dir_gen())
         assert actual_result == {"directory:add": 1}
 
         assert list(swh_storage.journal_writer.journal.objects) == [
             ("directory", Directory.from_dict(data.dir))
         ]
 
         swh_storage.refresh_stat_counters()
         assert swh_storage.stat_counters()["directory"] == 1
 
     def test_directory_add_validation(self, swh_storage):
         dir_ = copy.deepcopy(data.dir)
         dir_["entries"][0]["type"] = "foobar"
 
         with pytest.raises(StorageArgumentException, match="type.*foobar"):
             swh_storage.directory_add([dir_])
 
         dir_ = copy.deepcopy(data.dir)
         del dir_["entries"][0]["target"]
 
         with pytest.raises(StorageArgumentException, match="target") as cm:
             swh_storage.directory_add([dir_])
 
         if type(cm.value) == psycopg2.IntegrityError:
             assert cm.value.pgcode == psycopg2.errorcodes.NOT_NULL_VIOLATION
 
     def test_directory_add_twice(self, swh_storage):
         actual_result = swh_storage.directory_add([data.dir])
         assert actual_result == {"directory:add": 1}
 
         assert list(swh_storage.journal_writer.journal.objects) == [
             ("directory", Directory.from_dict(data.dir))
         ]
 
         actual_result = swh_storage.directory_add([data.dir])
         assert actual_result == {"directory:add": 0}
 
         assert list(swh_storage.journal_writer.journal.objects) == [
             ("directory", Directory.from_dict(data.dir))
         ]
 
     def test_directory_get_recursive(self, swh_storage):
         init_missing = list(swh_storage.directory_missing([data.dir["id"]]))
         assert init_missing == [data.dir["id"]]
 
         actual_result = swh_storage.directory_add([data.dir, data.dir2, data.dir3])
         assert actual_result == {"directory:add": 3}
 
         assert list(swh_storage.journal_writer.journal.objects) == [
             ("directory", Directory.from_dict(data.dir)),
             ("directory", Directory.from_dict(data.dir2)),
             ("directory", Directory.from_dict(data.dir3)),
         ]
 
         # List directory containing a file and an unknown subdirectory
         actual_data = list(swh_storage.directory_ls(data.dir["id"], recursive=True))
         expected_data = list(transform_entries(data.dir))
         assert sorted(expected_data, key=cmpdir) == sorted(actual_data, key=cmpdir)
 
         # List directory containing a single file
         actual_data = list(swh_storage.directory_ls(data.dir2["id"], recursive=True))
         expected_data = list(transform_entries(data.dir2))
         assert sorted(expected_data, key=cmpdir) == sorted(actual_data, key=cmpdir)
 
         # List directory containing a known subdirectory, entries should
         # be both those of the directory and of the subdir
         actual_data = list(swh_storage.directory_ls(data.dir3["id"], recursive=True))
         expected_data = list(
             itertools.chain(
                 transform_entries(data.dir3),
                 transform_entries(data.dir, prefix=b"subdir/"),
             )
         )
         assert sorted(expected_data, key=cmpdir) == sorted(actual_data, key=cmpdir)
 
     def test_directory_get_non_recursive(self, swh_storage):
         init_missing = list(swh_storage.directory_missing([data.dir["id"]]))
         assert init_missing == [data.dir["id"]]
 
         actual_result = swh_storage.directory_add([data.dir, data.dir2, data.dir3])
         assert actual_result == {"directory:add": 3}
 
         assert list(swh_storage.journal_writer.journal.objects) == [
             ("directory", Directory.from_dict(data.dir)),
             ("directory", Directory.from_dict(data.dir2)),
             ("directory", Directory.from_dict(data.dir3)),
         ]
 
         # List directory containing a file and an unknown subdirectory
         actual_data = list(swh_storage.directory_ls(data.dir["id"]))
         expected_data = list(transform_entries(data.dir))
         assert sorted(expected_data, key=cmpdir) == sorted(actual_data, key=cmpdir)
 
         # List directory containing a single file
         actual_data = list(swh_storage.directory_ls(data.dir2["id"]))
         expected_data = list(transform_entries(data.dir2))
         assert sorted(expected_data, key=cmpdir) == sorted(actual_data, key=cmpdir)
 
         # List directory containing a known subdirectory, entries should
         # only be those of the parent directory, not of the subdir
         actual_data = list(swh_storage.directory_ls(data.dir3["id"]))
         expected_data = list(transform_entries(data.dir3))
         assert sorted(expected_data, key=cmpdir) == sorted(actual_data, key=cmpdir)
 
     def test_directory_entry_get_by_path(self, swh_storage):
         # given
         init_missing = list(swh_storage.directory_missing([data.dir3["id"]]))
         assert [data.dir3["id"]] == init_missing
 
         actual_result = swh_storage.directory_add([data.dir3, data.dir4])
         assert actual_result == {"directory:add": 2}
 
         expected_entries = [
             {
                 "dir_id": data.dir3["id"],
                 "name": b"foo",
                 "type": "file",
                 "target": data.cont["sha1_git"],
                 "sha1": None,
                 "sha1_git": None,
                 "sha256": None,
                 "status": None,
                 "perms": from_disk.DentryPerms.content,
                 "length": None,
             },
             {
                 "dir_id": data.dir3["id"],
                 "name": b"subdir",
                 "type": "dir",
                 "target": data.dir["id"],
                 "sha1": None,
                 "sha1_git": None,
                 "sha256": None,
                 "status": None,
                 "perms": from_disk.DentryPerms.directory,
                 "length": None,
             },
             {
                 "dir_id": data.dir3["id"],
                 "name": b"hello",
                 "type": "file",
                 "target": b"12345678901234567890",
                 "sha1": None,
                 "sha1_git": None,
                 "sha256": None,
                 "status": None,
                 "perms": from_disk.DentryPerms.content,
                 "length": None,
             },
         ]
 
         # when (all must be found here)
         for entry, expected_entry in zip(data.dir3["entries"], expected_entries):
             actual_entry = swh_storage.directory_entry_get_by_path(
                 data.dir3["id"], [entry["name"]]
             )
             assert actual_entry == expected_entry
 
         # same, but deeper
         for entry, expected_entry in zip(data.dir3["entries"], expected_entries):
             actual_entry = swh_storage.directory_entry_get_by_path(
                 data.dir4["id"], [b"subdir1", entry["name"]]
             )
             expected_entry = expected_entry.copy()
             expected_entry["name"] = b"subdir1/" + expected_entry["name"]
             assert actual_entry == expected_entry
 
         # when (nothing should be found here since data.dir is not persisted.)
         for entry in data.dir["entries"]:
             actual_entry = swh_storage.directory_entry_get_by_path(
                 data.dir["id"], [entry["name"]]
             )
             assert actual_entry is None
 
     def test_directory_get_random(self, swh_storage):
         swh_storage.directory_add([data.dir, data.dir2, data.dir3])
 
         assert swh_storage.directory_get_random() in {
             data.dir["id"],
             data.dir2["id"],
             data.dir3["id"],
         }
 
     def test_revision_add(self, swh_storage):
         init_missing = swh_storage.revision_missing([data.revision["id"]])
         assert list(init_missing) == [data.revision["id"]]
 
         actual_result = swh_storage.revision_add([data.revision])
         assert actual_result == {"revision:add": 1}
 
         end_missing = swh_storage.revision_missing([data.revision["id"]])
         assert list(end_missing) == []
 
         assert list(swh_storage.journal_writer.journal.objects) == [
             ("revision", Revision.from_dict(data.revision))
         ]
 
         # already there so nothing added
         actual_result = swh_storage.revision_add([data.revision])
         assert actual_result == {"revision:add": 0}
 
         swh_storage.refresh_stat_counters()
         assert swh_storage.stat_counters()["revision"] == 1
 
     def test_revision_add_from_generator(self, swh_storage):
         def _rev_gen():
             yield data.revision
 
         actual_result = swh_storage.revision_add(_rev_gen())
         assert actual_result == {"revision:add": 1}
 
         swh_storage.refresh_stat_counters()
         assert swh_storage.stat_counters()["revision"] == 1
 
     def test_revision_add_validation(self, swh_storage):
         rev = copy.deepcopy(data.revision)
         rev["date"]["offset"] = 2 ** 16
 
         with pytest.raises(StorageArgumentException, match="offset") as cm:
             swh_storage.revision_add([rev])
 
         if type(cm.value) == psycopg2.DataError:
             assert cm.value.pgcode == psycopg2.errorcodes.NUMERIC_VALUE_OUT_OF_RANGE
 
         rev = copy.deepcopy(data.revision)
         rev["committer_date"]["offset"] = 2 ** 16
 
         with pytest.raises(StorageArgumentException, match="offset") as cm:
             swh_storage.revision_add([rev])
 
         if type(cm.value) == psycopg2.DataError:
             assert cm.value.pgcode == psycopg2.errorcodes.NUMERIC_VALUE_OUT_OF_RANGE
 
         rev = copy.deepcopy(data.revision)
         rev["type"] = "foobar"
 
         with pytest.raises(StorageArgumentException, match="(?i)type") as cm:
             swh_storage.revision_add([rev])
 
         if type(cm.value) == psycopg2.DataError:
             assert cm.value.pgcode == psycopg2.errorcodes.INVALID_TEXT_REPRESENTATION
 
     def test_revision_add_twice(self, swh_storage):
         actual_result = swh_storage.revision_add([data.revision])
         assert actual_result == {"revision:add": 1}
 
         assert list(swh_storage.journal_writer.journal.objects) == [
             ("revision", Revision.from_dict(data.revision))
         ]
 
         actual_result = swh_storage.revision_add([data.revision, data.revision2])
         assert actual_result == {"revision:add": 1}
 
         assert list(swh_storage.journal_writer.journal.objects) == [
             ("revision", Revision.from_dict(data.revision)),
             ("revision", Revision.from_dict(data.revision2)),
         ]
 
     def test_revision_add_name_clash(self, swh_storage):
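         # same fullname, different name/email: both revisions must still be added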
         revision1 = data.revision
         revision2 = data.revision2
 
         revision1["author"] = {
             "fullname": b"John Doe <john.doe@example.com>",
             "name": b"John Doe",
             "email": b"john.doe@example.com",
         }
         revision2["author"] = {
             "fullname": b"John Doe <john.doe@example.com>",
             "name": b"John Doe ",
             "email": b"john.doe@example.com ",
         }
         actual_result = swh_storage.revision_add([revision1, revision2])
         assert actual_result == {"revision:add": 2}
 
     def test_revision_get_order(self, swh_storage):
         add_result = swh_storage.revision_add([data.revision, data.revision2])
         assert add_result == {"revision:add": 2}
 
         # order 1
         res1 = swh_storage.revision_get([data.revision["id"], data.revision2["id"]])
         assert list(res1) == [data.revision, data.revision2]
 
         # order 2
         res2 = swh_storage.revision_get([data.revision2["id"], data.revision["id"]])
         assert list(res2) == [data.revision2, data.revision]
 
     def test_revision_log(self, swh_storage):
         # given
         # data.revision4 -is-child-of-> data.revision3
         swh_storage.revision_add([data.revision3, data.revision4])
 
         # when
         actual_results = list(swh_storage.revision_log([data.revision4["id"]]))
 
         # hack: ids generated
         for actual_result in actual_results:
             if "id" in actual_result["author"]:
                 del actual_result["author"]["id"]
             if "id" in actual_result["committer"]:
                 del actual_result["committer"]["id"]
 
         assert len(actual_results) == 2  # rev4 -child-> rev3
         assert actual_results[0] == normalize_entity(data.revision4)
         assert actual_results[1] == normalize_entity(data.revision3)
 
         assert list(swh_storage.journal_writer.journal.objects) == [
             ("revision", Revision.from_dict(data.revision3)),
             ("revision", Revision.from_dict(data.revision4)),
         ]
 
     def test_revision_log_with_limit(self, swh_storage):
         # given
         # data.revision4 -is-child-of-> data.revision3
         swh_storage.revision_add([data.revision3, data.revision4])
         actual_results = list(swh_storage.revision_log([data.revision4["id"]], 1))
 
         # hack: ids generated
         for actual_result in actual_results:
             if "id" in actual_result["author"]:
                 del actual_result["author"]["id"]
             if "id" in actual_result["committer"]:
                 del actual_result["committer"]["id"]
 
         assert len(actual_results) == 1
         assert actual_results[0] == data.revision4
 
     def test_revision_log_unknown_revision(self, swh_storage):
         rev_log = list(swh_storage.revision_log([data.revision["id"]]))
         assert rev_log == []
 
     def test_revision_shortlog(self, swh_storage):
         # given
         # data.revision4 -is-child-of-> data.revision3
         swh_storage.revision_add([data.revision3, data.revision4])
 
         # when
         actual_results = list(swh_storage.revision_shortlog([data.revision4["id"]]))
 
         assert len(actual_results) == 2  # rev4 -child-> rev3
         assert list(actual_results[0]) == short_revision(data.revision4)
         assert list(actual_results[1]) == short_revision(data.revision3)
 
     def test_revision_shortlog_with_limit(self, swh_storage):
         # given
         # data.revision4 -is-child-of-> data.revision3
         swh_storage.revision_add([data.revision3, data.revision4])
         actual_results = list(swh_storage.revision_shortlog([data.revision4["id"]], 1))
 
         assert len(actual_results) == 1
         assert list(actual_results[0]) == short_revision(data.revision4)
 
     def test_revision_get(self, swh_storage):
         swh_storage.revision_add([data.revision])
 
         actual_revisions = list(
             swh_storage.revision_get([data.revision["id"], data.revision2["id"]])
         )
 
         # when
         if "id" in actual_revisions[0]["author"]:
             del actual_revisions[0]["author"]["id"]  # hack: ids are generated
         if "id" in actual_revisions[0]["committer"]:
             del actual_revisions[0]["committer"]["id"]
 
         assert len(actual_revisions) == 2
         assert actual_revisions[0] == normalize_entity(data.revision)
         assert actual_revisions[1] is None
 
     def test_revision_get_no_parents(self, swh_storage):
         swh_storage.revision_add([data.revision3])
 
         get = list(swh_storage.revision_get([data.revision3["id"]]))
 
         assert len(get) == 1
         assert get[0]["parents"] == ()  # no parents on this one
 
     def test_revision_get_random(self, swh_storage):
         swh_storage.revision_add([data.revision, data.revision2, data.revision3])
 
         assert swh_storage.revision_get_random() in {
             data.revision["id"],
             data.revision2["id"],
             data.revision3["id"],
         }
 
     def test_release_add(self, swh_storage):
         init_missing = swh_storage.release_missing(
             [data.release["id"], data.release2["id"]]
         )
         assert [data.release["id"], data.release2["id"]] == list(init_missing)
 
         actual_result = swh_storage.release_add([data.release, data.release2])
         assert actual_result == {"release:add": 2}
 
         end_missing = swh_storage.release_missing(
             [data.release["id"], data.release2["id"]]
         )
         assert list(end_missing) == []
 
         assert list(swh_storage.journal_writer.journal.objects) == [
             ("release", Release.from_dict(data.release)),
             ("release", Release.from_dict(data.release2)),
         ]
 
         # already present so nothing added
         actual_result = swh_storage.release_add([data.release, data.release2])
         assert actual_result == {"release:add": 0}
 
         swh_storage.refresh_stat_counters()
         assert swh_storage.stat_counters()["release"] == 2
 
     def test_release_add_from_generator(self, swh_storage):
         def _rel_gen():
             yield data.release
             yield data.release2
 
         actual_result = swh_storage.release_add(_rel_gen())
         assert actual_result == {"release:add": 2}
 
         assert list(swh_storage.journal_writer.journal.objects) == [
             ("release", Release.from_dict(data.release)),
             ("release", Release.from_dict(data.release2)),
         ]
 
         swh_storage.refresh_stat_counters()
         assert swh_storage.stat_counters()["release"] == 2
 
     def test_release_add_no_author_date(self, swh_storage):
         release = data.release
 
         release["author"] = None
         release["date"] = None
 
         actual_result = swh_storage.release_add([release])
         assert actual_result == {"release:add": 1}
 
         end_missing = swh_storage.release_missing([data.release["id"]])
         assert list(end_missing) == []
 
         assert list(swh_storage.journal_writer.journal.objects) == [
             ("release", Release.from_dict(release))
         ]
 
     def test_release_add_validation(self, swh_storage):
         rel = copy.deepcopy(data.release)
         rel["date"]["offset"] = 2 ** 16
 
         with pytest.raises(StorageArgumentException, match="offset") as cm:
             swh_storage.release_add([rel])
 
         if type(cm.value) == psycopg2.DataError:
             assert cm.value.pgcode == psycopg2.errorcodes.NUMERIC_VALUE_OUT_OF_RANGE
 
         rel = copy.deepcopy(data.release)
         rel["author"] = None
 
         with pytest.raises(StorageArgumentException, match="date") as cm:
             swh_storage.release_add([rel])
 
         if type(cm.value) == psycopg2.IntegrityError:
             assert cm.value.pgcode == psycopg2.errorcodes.CHECK_VIOLATION
 
     def test_release_add_validation_type(self, swh_storage):
         rel = copy.deepcopy(data.release)
 
         rel["date"]["offset"] = "toto"
         with pytest.raises(StorageArgumentException):
             swh_storage.release_add([rel])
 
     def test_release_add_twice(self, swh_storage):
         actual_result = swh_storage.release_add([data.release])
         assert actual_result == {"release:add": 1}
 
         assert list(swh_storage.journal_writer.journal.objects) == [
             ("release", Release.from_dict(data.release))
         ]
 
         actual_result = swh_storage.release_add([data.release, data.release2])
         assert actual_result == {"release:add": 1}
 
         assert list(swh_storage.journal_writer.journal.objects) == [
             ("release", Release.from_dict(data.release)),
             ("release", Release.from_dict(data.release2)),
         ]
 
     def test_release_add_name_clash(self, swh_storage):
         release1 = data.release.copy()
         release2 = data.release2.copy()
 
         release1["author"] = {
             "fullname": b"John Doe <john.doe@example.com>",
             "name": b"John Doe",
             "email": b"john.doe@example.com",
         }
         release2["author"] = {
             "fullname": b"John Doe <john.doe@example.com>",
             "name": b"John Doe ",
             "email": b"john.doe@example.com ",
         }
         actual_result = swh_storage.release_add([release1, release2])
         assert actual_result == {"release:add": 2}
 
     def test_release_get(self, swh_storage):
         # given
         swh_storage.release_add([data.release, data.release2])
 
         # when
         actual_releases = list(
             swh_storage.release_get([data.release["id"], data.release2["id"]])
         )
 
         # then
         for actual_release in actual_releases:
             if "id" in actual_release["author"]:
                 del actual_release["author"]["id"]  # hack: ids are generated
 
         assert [normalize_entity(data.release), normalize_entity(data.release2)] == [
             actual_releases[0],
             actual_releases[1],
         ]
 
         unknown_releases = list(swh_storage.release_get([data.release3["id"]]))
 
         assert unknown_releases[0] is None
 
     def test_release_get_order(self, swh_storage):
         add_result = swh_storage.release_add([data.release, data.release2])
         assert add_result == {"release:add": 2}
 
         # order 1
         res1 = swh_storage.release_get([data.release["id"], data.release2["id"]])
         assert list(res1) == [data.release, data.release2]
 
         # order 2
         res2 = swh_storage.release_get([data.release2["id"], data.release["id"]])
         assert list(res2) == [data.release2, data.release]
 
     def test_release_get_random(self, swh_storage):
         swh_storage.release_add([data.release, data.release2, data.release3])
 
         assert swh_storage.release_get_random() in {
             data.release["id"],
             data.release2["id"],
             data.release3["id"],
         }
 
     def test_origin_add_one(self, swh_storage):
         origin0 = swh_storage.origin_get(data.origin)
         assert origin0 is None
 
         id = swh_storage.origin_add_one(data.origin)
 
         actual_origin = swh_storage.origin_get({"url": data.origin["url"]})
         assert actual_origin["url"] == data.origin["url"]
 
         id2 = swh_storage.origin_add_one(data.origin)
 
         assert id == id2
 
     def test_origin_add(self, swh_storage):
         origin0 = swh_storage.origin_get([data.origin])[0]
         assert origin0 is None
 
         origin1, origin2 = swh_storage.origin_add([data.origin, data.origin2])
 
         actual_origin = swh_storage.origin_get([{"url": data.origin["url"],}])[0]
         assert actual_origin["url"] == origin1["url"]
 
         actual_origin2 = swh_storage.origin_get([{"url": data.origin2["url"],}])[0]
         assert actual_origin2["url"] == origin2["url"]
 
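         # some backends may still return an internal origin id; drop it if present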
         if "id" in actual_origin:
             del actual_origin["id"]
             del actual_origin2["id"]
 
         assert list(swh_storage.journal_writer.journal.objects) == [
             ("origin", Origin.from_dict(actual_origin)),
             ("origin", Origin.from_dict(actual_origin2)),
         ]
 
         swh_storage.refresh_stat_counters()
         assert swh_storage.stat_counters()["origin"] == 2
 
     def test_origin_add_from_generator(self, swh_storage):
         def _ori_gen():
             yield data.origin
             yield data.origin2
 
         origin1, origin2 = swh_storage.origin_add(_ori_gen())
 
         actual_origin = swh_storage.origin_get([{"url": data.origin["url"],}])[0]
         assert actual_origin["url"] == origin1["url"]
 
         actual_origin2 = swh_storage.origin_get([{"url": data.origin2["url"],}])[0]
         assert actual_origin2["url"] == origin2["url"]
 
         if "id" in actual_origin:
             del actual_origin["id"]
             del actual_origin2["id"]
 
         assert list(swh_storage.journal_writer.journal.objects) == [
             ("origin", Origin.from_dict(actual_origin)),
             ("origin", Origin.from_dict(actual_origin2)),
         ]
 
         swh_storage.refresh_stat_counters()
         assert swh_storage.stat_counters()["origin"] == 2
 
     def test_origin_add_twice(self, swh_storage):
         add1 = swh_storage.origin_add([data.origin, data.origin2])
         assert list(swh_storage.journal_writer.journal.objects) == [
             ("origin", Origin.from_dict(data.origin)),
             ("origin", Origin.from_dict(data.origin2)),
         ]
 
         add2 = swh_storage.origin_add([data.origin, data.origin2])
         assert list(swh_storage.journal_writer.journal.objects) == [
             ("origin", Origin.from_dict(data.origin)),
             ("origin", Origin.from_dict(data.origin2)),
         ]
 
         assert add1 == add2
 
     def test_origin_add_validation(self, swh_storage):
         """Incorrect formatted origin should fail the validation
 
         """
         with pytest.raises(StorageArgumentException, match="url"):
             swh_storage.origin_add([{}])
         with pytest.raises(
             StorageArgumentException, match="unexpected keyword argument"
         ):
             swh_storage.origin_add([{"ul": "mistyped url key"}])
 
     def test_origin_get_legacy(self, swh_storage):
         assert swh_storage.origin_get(data.origin) is None
         swh_storage.origin_add_one(data.origin)
 
         actual_origin0 = swh_storage.origin_get({"url": data.origin["url"]})
         assert actual_origin0["url"] == data.origin["url"]
 
     def test_origin_get(self, swh_storage):
         assert swh_storage.origin_get(data.origin) is None
         swh_storage.origin_add_one(data.origin)
 
         actual_origin0 = swh_storage.origin_get([{"url": data.origin["url"]}])
         assert len(actual_origin0) == 1
         assert actual_origin0[0]["url"] == data.origin["url"]
 
     def _generate_random_visits(self, nb_visits=100, start=0, end=7):
         """Generate random visits within the last 2 months (to avoid
         computations)
 
         """
         visits = []
         today = now()
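         # each visit date is "now" minus a random offset of start..end weeks,
         # plus up to 28 days (and a few hours/minutes/seconds) of jitter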
         for _ in range(nb_visits):
             hours = random.randint(0, 24)
             minutes = random.randint(0, 60)
             seconds = random.randint(0, 60)
             days = random.randint(0, 28)
             weeks = random.randint(start, end)
             date_visit = today - timedelta(
                 weeks=weeks, hours=hours, minutes=minutes, seconds=seconds, days=days
             )
             visits.append(date_visit)
         return visits
 
     def test_origin_visit_get_random(self, swh_storage):
         swh_storage.origin_add(data.origins)
         # Add some random visits within the selection range
         visits = self._generate_random_visits()
         visit_type = "git"
 
         # Add visits to those origins
         for origin in data.origins:
             origin_url = origin["url"]
             for date_visit in visits:
                 visit = swh_storage.origin_visit_add(
                     origin_url, date=date_visit, type=visit_type
                 )
                 swh_storage.origin_visit_update(
                     origin_url, visit_id=visit.visit, status="full"
                 )
 
         swh_storage.refresh_stat_counters()
 
         stats = swh_storage.stat_counters()
         assert stats["origin"] == len(data.origins)
         assert stats["origin_visit"] == len(data.origins) * len(visits)
 
         random_origin_visit = swh_storage.origin_visit_get_random(visit_type)
         assert random_origin_visit
         assert random_origin_visit["origin"] is not None
         original_urls = [o["url"] for o in data.origins]
         assert random_origin_visit["origin"] in original_urls
 
     def test_origin_visit_get_random_nothing_found(self, swh_storage):
         swh_storage.origin_add(data.origins)
         visit_type = "hg"
         # Add some visits outside of the selection window used by
         # origin_visit_get_random, so that nothing can be found
         visits = self._generate_random_visits(nb_visits=3, start=13, end=24)
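         # (visits 13+ weeks old presumably fall outside that selection window)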
         for origin in data.origins:
             origin_url = origin["url"]
             for date_visit in visits:
                 visit = swh_storage.origin_visit_add(
                     origin_url, date=date_visit, type=visit_type
                 )
                 swh_storage.origin_visit_update(origin_url, visit.visit, status="full")
 
         random_origin_visit = swh_storage.origin_visit_get_random(visit_type)
         assert random_origin_visit is None
 
     def test_origin_get_by_sha1(self, swh_storage):
         assert swh_storage.origin_get(data.origin) is None
         swh_storage.origin_add_one(data.origin)
 
         origins = list(swh_storage.origin_get_by_sha1([sha1(data.origin["url"])]))
         assert len(origins) == 1
         assert origins[0]["url"] == data.origin["url"]
 
     def test_origin_get_by_sha1_not_found(self, swh_storage):
         assert swh_storage.origin_get(data.origin) is None
         origins = list(swh_storage.origin_get_by_sha1([sha1(data.origin["url"])]))
         assert len(origins) == 1
         assert origins[0] is None
 
     def test_origin_search_single_result(self, swh_storage):
         found_origins = list(swh_storage.origin_search(data.origin["url"]))
         assert len(found_origins) == 0
 
         found_origins = list(swh_storage.origin_search(data.origin["url"], regexp=True))
         assert len(found_origins) == 0
 
         swh_storage.origin_add_one(data.origin)
         origin_data = {"url": data.origin["url"]}
         found_origins = list(swh_storage.origin_search(data.origin["url"]))
         assert len(found_origins) == 1
         if "id" in found_origins[0]:
             del found_origins[0]["id"]
         assert found_origins[0] == origin_data
 
         found_origins = list(
             swh_storage.origin_search("." + data.origin["url"][1:-1] + ".", regexp=True)
         )
         assert len(found_origins) == 1
         if "id" in found_origins[0]:
             del found_origins[0]["id"]
         assert found_origins[0] == origin_data
 
         swh_storage.origin_add_one(data.origin2)
         origin2_data = {"url": data.origin2["url"]}
         found_origins = list(swh_storage.origin_search(data.origin2["url"]))
         assert len(found_origins) == 1
         if "id" in found_origins[0]:
             del found_origins[0]["id"]
         assert found_origins[0] == origin2_data
 
         found_origins = list(
             swh_storage.origin_search(
                 "." + data.origin2["url"][1:-1] + ".", regexp=True
             )
         )
         assert len(found_origins) == 1
         if "id" in found_origins[0]:
             del found_origins[0]["id"]
         assert found_origins[0] == origin2_data
 
     def test_origin_search_no_regexp(self, swh_storage):
         swh_storage.origin_add_one(data.origin)
         swh_storage.origin_add_one(data.origin2)
 
         origin = swh_storage.origin_get({"url": data.origin["url"]})
         origin2 = swh_storage.origin_get({"url": data.origin2["url"]})
 
         # no pagination
         found_origins = list(swh_storage.origin_search("/"))
         assert len(found_origins) == 2
 
         # offset=0
         found_origins0 = list(swh_storage.origin_search("/", offset=0, limit=1))  # noqa
         assert len(found_origins0) == 1
         assert found_origins0[0] in [origin, origin2]
 
         # offset=1
         found_origins1 = list(swh_storage.origin_search("/", offset=1, limit=1))  # noqa
         assert len(found_origins1) == 1
         assert found_origins1[0] in [origin, origin2]
 
         # check both origins were returned
         assert found_origins0 != found_origins1
 
     def test_origin_search_regexp_substring(self, swh_storage):
         swh_storage.origin_add_one(data.origin)
         swh_storage.origin_add_one(data.origin2)
 
         origin = swh_storage.origin_get({"url": data.origin["url"]})
         origin2 = swh_storage.origin_get({"url": data.origin2["url"]})
 
         # no pagination
         found_origins = list(swh_storage.origin_search("/", regexp=True))
         assert len(found_origins) == 2
 
         # offset=0
         found_origins0 = list(
             swh_storage.origin_search("/", offset=0, limit=1, regexp=True)
         )  # noqa
         assert len(found_origins0) == 1
         assert found_origins0[0] in [origin, origin2]
 
         # offset=1
         found_origins1 = list(
             swh_storage.origin_search("/", offset=1, limit=1, regexp=True)
         )  # noqa
         assert len(found_origins1) == 1
         assert found_origins1[0] in [origin, origin2]
 
         # check both origins were returned
         assert found_origins0 != found_origins1
 
     def test_origin_search_regexp_fullstring(self, swh_storage):
         swh_storage.origin_add_one(data.origin)
         swh_storage.origin_add_one(data.origin2)
 
         origin = swh_storage.origin_get({"url": data.origin["url"]})
         origin2 = swh_storage.origin_get({"url": data.origin2["url"]})
 
         # no pagination
         found_origins = list(swh_storage.origin_search(".*/.*", regexp=True))
         assert len(found_origins) == 2
 
         # offset=0
         found_origins0 = list(
             swh_storage.origin_search(".*/.*", offset=0, limit=1, regexp=True)
         )  # noqa
         assert len(found_origins0) == 1
         assert found_origins0[0] in [origin, origin2]
 
         # offset=1
         found_origins1 = list(
             swh_storage.origin_search(".*/.*", offset=1, limit=1, regexp=True)
         )  # noqa
         assert len(found_origins1) == 1
         assert found_origins1[0] in [origin, origin2]
 
         # check both origins were returned
         assert found_origins0 != found_origins1
 
     def test_origin_visit_add(self, swh_storage):
         # given
         origin_url = swh_storage.origin_add_one(data.origin2)
         date_visit = now()
 
         # Round to milliseconds before insertion, so equality doesn't fail
         # after a round-trip through a DB (e.g. Cassandra)
         date_visit = date_visit.replace(microsecond=round(date_visit.microsecond, -3))
 
         # when
         origin_visit = swh_storage.origin_visit_add(
             origin_url, type=data.type_visit1, date=date_visit
         )
 
         expected_origin_visit = {
             "origin": origin_url,
             "date": date_visit,
             "visit": origin_visit.visit,
             "type": data.type_visit1,
             "status": "ongoing",
             "metadata": None,
             "snapshot": None,
         }
         assert origin_visit == OriginVisit.from_dict(expected_origin_visit)
         actual_origin_visits = list(swh_storage.origin_visit_get(origin_url))
 
         assert expected_origin_visit in actual_origin_visits
 
         objects = list(swh_storage.journal_writer.journal.objects)
         assert ("origin", Origin.from_dict(data.origin2)) in objects
         assert ("origin_visit", OriginVisit.from_dict(expected_origin_visit)) in objects
 
     def test_origin_visit_get__unknown_origin(self, swh_storage):
         assert [] == list(swh_storage.origin_visit_get("foo"))
 
     def test_origin_visit_add_default_type(self, swh_storage):
         # given
         origin_url = swh_storage.origin_add_one(data.origin2)
 
         # when
         date_visit = now()
         date_visit2 = date_visit + datetime.timedelta(minutes=1)
 
         # Round to milliseconds before insertion, so equality doesn't fail
         # after a round-trip through a DB (e.g. Cassandra)
         date_visit = date_visit.replace(microsecond=round(date_visit.microsecond, -3))
         date_visit2 = date_visit2.replace(
             microsecond=round(date_visit2.microsecond, -3)
         )
 
         origin_visit1 = swh_storage.origin_visit_add(
             origin_url, date=date_visit, type=data.type_visit1
         )
         origin_visit2 = swh_storage.origin_visit_add(
             origin_url, date=date_visit2, type=data.type_visit2
         )
 
         # then
         assert origin_visit1.origin == origin_url
         assert origin_visit1.visit is not None
 
         actual_origin_visits = list(swh_storage.origin_visit_get(origin_url))
         expected_visits = [
             {
                 "origin": origin_url,
                 "date": date_visit,
                 "visit": origin_visit1.visit,
                 "type": data.type_visit1,
                 "status": "ongoing",
                 "metadata": None,
                 "snapshot": None,
             },
             {
                 "origin": origin_url,
                 "date": date_visit2,
                 "visit": origin_visit2.visit,
                 "type": data.type_visit2,
                 "status": "ongoing",
                 "metadata": None,
                 "snapshot": None,
             },
         ]
         assert len(expected_visits) == len(actual_origin_visits)
         for visit in expected_visits:
             assert visit in actual_origin_visits
 
         objects = list(swh_storage.journal_writer.journal.objects)
         assert ("origin", Origin.from_dict(data.origin2)) in objects
 
         for visit in expected_visits:
             assert ("origin_visit", OriginVisit.from_dict(visit)) in objects
 
     def test_origin_visit_add_validation(self, swh_storage):
         origin_url = swh_storage.origin_add_one(data.origin2)
         with pytest.raises(StorageArgumentException) as cm:
             swh_storage.origin_visit_add(origin_url, date=[b"foo"], type="git")
 
         if type(cm.value) == psycopg2.ProgrammingError:
             assert cm.value.pgcode == psycopg2.errorcodes.UNDEFINED_FUNCTION
 
     def test_origin_visit_status_add_validation(self, swh_storage):
         """Wrong origin_visit_status input should raise storage argument error"""
         date_visit = now()
         visit_status1 = OriginVisitStatus(
             origin="unknown-origin-url",
             visit=10,
             date=date_visit,
             status="full",
             snapshot=None,
         )
         with pytest.raises(StorageArgumentException, match="Unknown origin"):
             swh_storage.origin_visit_status_add([visit_status1])
 
     def test_origin_visit_status_add(self, swh_storage):
         """Correct origin visit statuses should add a new visit status
 
         """
         origin_url = swh_storage.origin_add_one(data.origin2)
         origin_visit1 = swh_storage.origin_visit_add(
             origin_url, date=data.date_visit1, type=data.type_visit1
         )
         snapshot_id = data.snapshot["id"]
         date_visit_now = now()
         visit_status1 = OriginVisitStatus(
             origin=origin_visit1.origin,
             visit=origin_visit1.visit,
             date=date_visit_now,
             status="full",
             snapshot=snapshot_id,
         )
 
         origin_url2 = swh_storage.origin_add_one({"url": "new-origin"})
         origin_visit2 = swh_storage.origin_visit_add(
             origin_url2, date=data.date_visit2, type=data.type_visit2
         )
         date_visit_now = now()
         visit_status2 = OriginVisitStatus(
             origin=origin_visit2.origin,
             visit=origin_visit2.visit,
             date=date_visit_now,
             status="ongoing",
             snapshot=None,
             metadata={"intrinsic": "something"},
         )
         swh_storage.origin_visit_status_add([visit_status1, visit_status2])
 
         origin_visit1 = swh_storage.origin_visit_get_latest(
             origin_url, require_snapshot=True
         )
         assert origin_visit1
         assert origin_visit1["status"] == "full"
         assert origin_visit1["snapshot"] == snapshot_id
 
         origin_visit2 = swh_storage.origin_visit_get_latest(
             origin_url2, require_snapshot=False
         )
         assert origin_url2 != origin_url
         assert origin_visit2
         assert origin_visit2["status"] == "ongoing"
         assert origin_visit2["snapshot"] is None
         assert origin_visit2["metadata"] == {"intrinsic": "something"}
 
     def test_origin_visit_update(self, swh_storage):
         # given
         origin_url = swh_storage.origin_add_one(data.origin)
         origin_url2 = swh_storage.origin_add_one(data.origin2)
         # date_visit < date_visit2
         date_visit = data.date_visit1
         date_visit2 = data.date_visit2
 
         # Round to milliseconds before insertion, so equality doesn't fail
         # after a round-trip through a DB (e.g. Cassandra)
         date_visit = date_visit.replace(microsecond=round(date_visit.microsecond, -3))
         date_visit2 = date_visit2.replace(
             microsecond=round(date_visit2.microsecond, -3)
         )
 
         origin_visit1 = swh_storage.origin_visit_add(
             origin_url, date=date_visit, type=data.type_visit1
         )
         origin_visit2 = swh_storage.origin_visit_add(
             origin_url, date=date_visit2, type=data.type_visit2
         )
         origin_visit3 = swh_storage.origin_visit_add(
             origin_url2, date=date_visit2, type=data.type_visit3
         )
 
         # when
         visit1_metadata = {
             "contents": 42,
             "directories": 22,
         }
         swh_storage.origin_visit_update(
             origin_url, origin_visit1.visit, status="full", metadata=visit1_metadata
         )
         swh_storage.origin_visit_update(
             origin_url2, origin_visit3.visit, status="partial"
         )
 
         # then
         actual_origin_visits = list(swh_storage.origin_visit_get(origin_url))
         expected_visits = [
             {
                 "origin": origin_url,
                 "date": date_visit,
                 "visit": origin_visit1.visit,
                 "type": data.type_visit1,
                 "status": "full",
                 "metadata": visit1_metadata,
                 "snapshot": None,
             },
             {
                 "origin": origin_url,
                 "date": date_visit2,
                 "visit": origin_visit2.visit,
                 "type": data.type_visit2,
                 "status": "ongoing",
                 "metadata": None,
                 "snapshot": None,
             },
         ]
         for visit in expected_visits:
             assert visit in actual_origin_visits
 
         actual_origin_visits_bis = list(
             swh_storage.origin_visit_get(origin_url, limit=1)
         )
         assert actual_origin_visits_bis == [
             {
                 "origin": origin_url,
                 "date": date_visit,
                 "visit": origin_visit1.visit,
                 "type": data.type_visit1,
                 "status": "full",
                 "metadata": visit1_metadata,
                 "snapshot": None,
             }
         ]
 
         actual_origin_visits_ter = list(
             swh_storage.origin_visit_get(origin_url, last_visit=origin_visit1.visit)
         )
         assert actual_origin_visits_ter == [
             {
                 "origin": origin_url,
                 "date": date_visit2,
                 "visit": origin_visit2.visit,
                 "type": data.type_visit2,
                 "status": "ongoing",
                 "metadata": None,
                 "snapshot": None,
             }
         ]
 
         actual_origin_visits2 = list(swh_storage.origin_visit_get(origin_url2))
         assert actual_origin_visits2 == [
             {
                 "origin": origin_url2,
                 "date": date_visit2,
                 "visit": origin_visit3.visit,
                 "type": data.type_visit3,
                 "status": "partial",
                 "metadata": None,
                 "snapshot": None,
             }
         ]
 
         data1 = {
             "origin": origin_url,
             "date": date_visit,
             "visit": origin_visit1.visit,
             "type": data.type_visit1,
             "status": "ongoing",
             "metadata": None,
             "snapshot": None,
         }
         data2 = {
             "origin": origin_url,
             "date": date_visit2,
             "visit": origin_visit2.visit,
             "type": data.type_visit2,
             "status": "ongoing",
             "metadata": None,
             "snapshot": None,
         }
         data3 = {
             "origin": origin_url2,
             "date": date_visit2,
             "visit": origin_visit3.visit,
             "type": data.type_visit3,
             "status": "ongoing",
             "metadata": None,
             "snapshot": None,
         }
         data4 = {
             "origin": origin_url,
             "date": date_visit,
             "visit": origin_visit1.visit,
             "type": data.type_visit1,
             "metadata": visit1_metadata,
             "status": "full",
             "snapshot": None,
         }
         data5 = {
             "origin": origin_url2,
             "date": date_visit2,
             "visit": origin_visit3.visit,
             "type": data.type_visit3,
             "status": "partial",
             "metadata": None,
             "snapshot": None,
         }
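         # the journal must contain each visit as initially created ("ongoing"),
         # plus the two status updates ("full" and "partial")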
         objects = list(swh_storage.journal_writer.journal.objects)
         assert ("origin", Origin.from_dict(data.origin)) in objects
         assert ("origin", Origin.from_dict(data.origin2)) in objects
         assert ("origin_visit", OriginVisit.from_dict(data1)) in objects
         assert ("origin_visit", OriginVisit.from_dict(data2)) in objects
         assert ("origin_visit", OriginVisit.from_dict(data3)) in objects
         assert ("origin_visit", OriginVisit.from_dict(data4)) in objects
         assert ("origin_visit", OriginVisit.from_dict(data5)) in objects
 
     def test_origin_visit_update_validation(self, swh_storage):
         origin_url = swh_storage.origin_add_one(data.origin)
         visit = swh_storage.origin_visit_add(
             origin_url, date=data.date_visit2, type=data.type_visit2
         )
         with pytest.raises(
             (StorageArgumentException, ValueError), match="status"
         ) as cm:
             swh_storage.origin_visit_update(origin_url, visit.visit, status="foobar")
 
         if type(cm.value) == psycopg2.DataError:
             assert cm.value.pgcode == psycopg2.errorcodes.INVALID_TEXT_REPRESENTATION
 
     def test_origin_visit_find_by_date(self, swh_storage):
         # given
         origin_url = swh_storage.origin_add_one(data.origin)
         swh_storage.origin_visit_add(
             origin_url, date=data.date_visit2, type=data.type_visit1
         )
 
         origin_visit2 = swh_storage.origin_visit_add(
             origin_url, date=data.date_visit3, type=data.type_visit2
         )
         origin_visit3 = swh_storage.origin_visit_add(
             origin_url, date=data.date_visit2, type=data.type_visit3
         )
 
         # Simple case
         visit = swh_storage.origin_visit_find_by_date(origin_url, data.date_visit3)
         assert visit["visit"] == origin_visit2.visit
 
         # There are two visits at the same date; the latest one must be returned
         visit = swh_storage.origin_visit_find_by_date(origin_url, data.date_visit2)
         assert visit["visit"] == origin_visit3.visit
 
     def test_origin_visit_find_by_date__unknown_origin(self, swh_storage):
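         # only checks that the lookup does not raise for an unknown origin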
         swh_storage.origin_visit_find_by_date("foo", data.date_visit2)
 
     def test_origin_visit_update_missing_snapshot(self, swh_storage):
         # given
         origin_url = swh_storage.origin_add_one(data.origin)
         origin_visit = swh_storage.origin_visit_add(
             origin_url, date=data.date_visit1, type=data.type_visit1
         )
 
         # when
         swh_storage.origin_visit_update(
             origin_url,
             origin_visit.visit,
             status="ongoing",
             snapshot=data.snapshot["id"],
         )  # snapshot does not exist yet
 
         # then
         actual_origin_visit = swh_storage.origin_visit_get_by(
             origin_url, origin_visit.visit
         )
         assert actual_origin_visit["snapshot"] == data.snapshot["id"]
 
         # when
         swh_storage.snapshot_add([data.snapshot])
         # then the visit, re-read from storage, still references the snapshot
         actual_origin_visit = swh_storage.origin_visit_get_by(
             origin_url, origin_visit.visit
         )
         assert actual_origin_visit["snapshot"] == data.snapshot["id"]
 
     def test_origin_visit_get_by(self, swh_storage):
         origin_url = swh_storage.origin_add_one(data.origin)
         origin_url2 = swh_storage.origin_add_one(data.origin2)
         origin_visit1 = swh_storage.origin_visit_add(
             origin_url, date=data.date_visit2, type=data.type_visit2
         )
 
         swh_storage.snapshot_add([data.snapshot])
         swh_storage.origin_visit_update(
             origin_url,
             origin_visit1.visit,
             status="ongoing",
             snapshot=data.snapshot["id"],
         )
 
         # Add some other {origin, visit} entries
         swh_storage.origin_visit_add(
             origin_url, date=data.date_visit3, type=data.type_visit3
         )
         swh_storage.origin_visit_add(
             origin_url2, date=data.date_visit3, type=data.type_visit3
         )
 
         # when
         visit1_metadata = {
             "contents": 42,
             "directories": 22,
         }
 
         swh_storage.origin_visit_update(
             origin_url, origin_visit1.visit, status="full", metadata=visit1_metadata
         )
 
         expected_origin_visit = origin_visit1.to_dict()
         expected_origin_visit.update(
             {
                 "origin": origin_url,
                 "visit": origin_visit1.visit,
                 "date": data.date_visit2,
                 "type": data.type_visit2,
                 "metadata": visit1_metadata,
                 "status": "full",
                 "snapshot": data.snapshot["id"],
             }
         )
 
         # when
         actual_origin_visit1 = swh_storage.origin_visit_get_by(
             origin_url, origin_visit1.visit
         )
 
         # then
         assert actual_origin_visit1 == expected_origin_visit
 
     def test_origin_visit_get_by__unknown_origin(self, swh_storage):
         assert swh_storage.origin_visit_get_by("foo", 10) is None
 
     def test_origin_visit_upsert_new(self, swh_storage):
         # given
         origin_url = swh_storage.origin_add_one(data.origin2)
 
         # when
         swh_storage.origin_visit_upsert(
             [
                 OriginVisit.from_dict(
                     {
                         "origin": origin_url,
                         "date": data.date_visit2,
                         "visit": 123,
                         "type": data.type_visit2,
                         "status": "full",
                         "metadata": None,
                         "snapshot": None,
                     }
                 ),
                 OriginVisit.from_dict(
                     {
                         "origin": origin_url,
                         "date": "2018-01-01 23:00:00+00",
                         "visit": 1234,
                         "type": data.type_visit2,
                         "status": "full",
                         "metadata": None,
                         "snapshot": None,
                     }
                 ),
             ]
         )
 
         # then
         actual_origin_visits = list(swh_storage.origin_visit_get(origin_url))
         assert actual_origin_visits == [
             {
                 "origin": origin_url,
                 "date": data.date_visit2,
                 "visit": 123,
                 "type": data.type_visit2,
                 "status": "full",
                 "metadata": None,
                 "snapshot": None,
             },
             {
                 "origin": origin_url,
                 "date": data.date_visit3,
                 "visit": 1234,
                 "type": data.type_visit2,
                 "status": "full",
                 "metadata": None,
                 "snapshot": None,
             },
         ]
 
         data1 = {
             "origin": origin_url,
             "date": data.date_visit2,
             "visit": 123,
             "type": data.type_visit2,
             "status": "full",
             "metadata": None,
             "snapshot": None,
         }
         data2 = {
             "origin": origin_url,
             "date": data.date_visit3,
             "visit": 1234,
             "type": data.type_visit2,
             "status": "full",
             "metadata": None,
             "snapshot": None,
         }
         assert list(swh_storage.journal_writer.journal.objects) == [
             ("origin", Origin.from_dict(data.origin2)),
             ("origin_visit", OriginVisit.from_dict(data1)),
             ("origin_visit", OriginVisit.from_dict(data2)),
         ]
 
     def test_origin_visit_upsert_existing(self, swh_storage):
         # given
         origin_url = swh_storage.origin_add_one(data.origin2)
 
         # when
         origin_visit1 = swh_storage.origin_visit_add(
             origin_url, date=data.date_visit2, type=data.type_visit1
         )
 
         swh_storage.origin_visit_upsert(
             [
                 OriginVisit.from_dict(
                     {
                         "origin": origin_url,
                         "date": data.date_visit2,
                         "visit": origin_visit1.visit,
                         "type": data.type_visit1,
                         "status": "full",
                         "metadata": None,
                         "snapshot": None,
                     }
                 )
             ]
         )
 
         # then
         assert origin_visit1.origin == origin_url
         assert origin_visit1.visit is not None
 
         actual_origin_visits = list(swh_storage.origin_visit_get(origin_url))
         assert actual_origin_visits == [
             {
                 "origin": origin_url,
                 "date": data.date_visit2,
                 "visit": origin_visit1.visit,
                 "type": data.type_visit1,
                 "status": "full",
                 "metadata": None,
                 "snapshot": None,
             }
         ]
 
         data1 = {
             "origin": origin_url,
             "date": data.date_visit2,
             "visit": origin_visit1.visit,
             "status": "ongoing",
             "metadata": None,
             "snapshot": None,
         }
         data2 = {
             "origin": origin_url,
             "date": data.date_visit2,
             "visit": origin_visit1.visit,
             "type": data.type_visit1,
             "status": "full",
             "metadata": None,
             "snapshot": None,
         }
         actual_written_objects = list(swh_storage.journal_writer.journal.objects)
         assert actual_written_objects == [
             ("origin", Origin.from_dict(data.origin2)),
             (
                 "origin_visit",
                 OriginVisit.from_dict({**data1, "type": data.type_visit1,}),
             ),
             ("origin_visit_status", OriginVisitStatus.from_dict(data1)),
             ("origin_visit", OriginVisit.from_dict(data2)),
         ]
 
     def test_origin_visit_upsert_missing_visit_id(self, swh_storage):
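         # A visit id is mandatory for upserts: a None visit id must raise
         # StorageArgumentException and leave only the origin in the journal.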
         # given
         origin_url = swh_storage.origin_add_one(data.origin2)
 
         # then
         with pytest.raises(StorageArgumentException, match="Missing visit id"):
             swh_storage.origin_visit_upsert(
                 [
                     OriginVisit.from_dict(
                         {
                             "origin": origin_url,
                             "date": data.date_visit2,
                             "visit": None,  # <- make the test raise
                             "type": data.type_visit1,
                             "status": "full",
                             "metadata": None,
                             "snapshot": None,
                         }
                     )
                 ]
             )
 
         assert list(swh_storage.journal_writer.journal.objects) == [
             ("origin", Origin.from_dict(data.origin2))
         ]
 
     def test_origin_visit_get_by_no_result(self, swh_storage):
         swh_storage.origin_add([data.origin])
         actual_origin_visit = swh_storage.origin_visit_get_by(data.origin["url"], 999)
         assert actual_origin_visit is None
 
     def test_origin_visit_get_latest(self, swh_storage):
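         # Exercise origin_visit_get_latest with three visits, checking the
         # require_snapshot and allowed_statuses filters after each update.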
         origin_url = swh_storage.origin_add_one(data.origin)
         ov1 = swh_storage.origin_visit_add(
             origin_url, date=data.date_visit1, type=data.type_visit1
         )
         ov2 = swh_storage.origin_visit_add(
             origin_url, date=data.date_visit2, type=data.type_visit2
         )
         # Add a visit with the same date as the previous one
         ov3 = swh_storage.origin_visit_add(
             origin_url, date=data.date_visit2, type=data.type_visit2
         )
 
         origin_visit1 = swh_storage.origin_visit_get_by(origin_url, ov1.visit)
         origin_visit2 = swh_storage.origin_visit_get_by(origin_url, ov2.visit)
         origin_visit3 = swh_storage.origin_visit_get_by(origin_url, ov3.visit)
         # Three visits, none with a snapshot yet
         assert origin_visit3 == swh_storage.origin_visit_get_latest(origin_url)
         assert (
             swh_storage.origin_visit_get_latest(origin_url, require_snapshot=True)
             is None
         )
 
         # Add snapshot to visit1; require_snapshot=True makes it return
         # visit1, while require_snapshot=False still returns visit3
         swh_storage.snapshot_add([data.complete_snapshot])
         swh_storage.origin_visit_update(
             origin_url,
             ov1.visit,
             status="ongoing",
             snapshot=data.complete_snapshot["id"],
         )
         assert {
             **origin_visit1,
             "snapshot": data.complete_snapshot["id"],
         } == swh_storage.origin_visit_get_latest(origin_url, require_snapshot=True)
 
         assert origin_visit3 == swh_storage.origin_visit_get_latest(origin_url)
 
         # Status filter: all three visits are status=ongoing, so no visit
         # returned
         assert (
             swh_storage.origin_visit_get_latest(origin_url, allowed_statuses=["full"])
             is None
         )
 
         # Mark the first visit as completed and check status filter again
         swh_storage.origin_visit_update(origin_url, ov1.visit, status="full")
         assert {
             **origin_visit1,
             "snapshot": data.complete_snapshot["id"],
             "status": "full",
         } == swh_storage.origin_visit_get_latest(origin_url, allowed_statuses=["full"])
 
         assert origin_visit3 == swh_storage.origin_visit_get_latest(origin_url)
 
         # Add snapshot to visit2 and check that the new snapshot is returned
         swh_storage.snapshot_add([data.empty_snapshot])
         swh_storage.origin_visit_update(
             origin_url, ov2.visit, status="ongoing", snapshot=data.empty_snapshot["id"]
         )
         assert {
             **origin_visit2,
             "snapshot": data.empty_snapshot["id"],
         } == swh_storage.origin_visit_get_latest(origin_url, require_snapshot=True)
 
         assert origin_visit3 == swh_storage.origin_visit_get_latest(origin_url)
 
         # Check that the status filter is still working
         assert {
             **origin_visit1,
             "snapshot": data.complete_snapshot["id"],
             "status": "full",
         } == swh_storage.origin_visit_get_latest(origin_url, allowed_statuses=["full"])
 
         # Add snapshot to visit3 (same date as visit2)
         swh_storage.snapshot_add([data.complete_snapshot])
         swh_storage.origin_visit_update(
             origin_url,
             ov3.visit,
             status="ongoing",
             snapshot=data.complete_snapshot["id"],
         )
         assert {
             **origin_visit1,
             "snapshot": data.complete_snapshot["id"],
             "status": "full",
         } == swh_storage.origin_visit_get_latest(origin_url, allowed_statuses=["full"])
         assert {
             **origin_visit1,
             "snapshot": data.complete_snapshot["id"],
             "status": "full",
         } == swh_storage.origin_visit_get_latest(
             origin_url, allowed_statuses=["full"], require_snapshot=True
         )
         assert {
             **origin_visit3,
             "snapshot": data.complete_snapshot["id"],
         } == swh_storage.origin_visit_get_latest(origin_url)
 
         assert {
             **origin_visit3,
             "snapshot": data.complete_snapshot["id"],
         } == swh_storage.origin_visit_get_latest(origin_url, require_snapshot=True)
 
     def test_person_fullname_unicity(self, swh_storage):
         # given (person injection through revisions for example)
         revision = data.revision
 
         # create a revision with the same committer fullname but without name and email
         revision2 = copy.deepcopy(data.revision2)
         revision2["committer"] = dict(revision["committer"])
         revision2["committer"]["email"] = None
         revision2["committer"]["name"] = None
 
         swh_storage.revision_add([revision])
         swh_storage.revision_add([revision2])
 
         # when getting added revisions
         revisions = list(swh_storage.revision_get([revision["id"], revision2["id"]]))
 
         # then
         # check committers are the same
         assert revisions[0]["committer"] == revisions[1]["committer"]
 
     def test_snapshot_add_get_empty(self, swh_storage):
         origin_url = swh_storage.origin_add_one(data.origin)
         origin_visit1 = swh_storage.origin_visit_add(
             origin_url, date=data.date_visit1, type=data.type_visit1
         )
         visit_id = origin_visit1.visit
 
         actual_result = swh_storage.snapshot_add([data.empty_snapshot])
         assert actual_result == {"snapshot:add": 1}
 
+        date_now = now()
+
         swh_storage.origin_visit_update(
             origin_url,
             origin_visit1.visit,
             status="ongoing",
             snapshot=data.empty_snapshot["id"],
-            # date=data.date_visit2
+            date=date_now,
         )
 
         by_id = swh_storage.snapshot_get(data.empty_snapshot["id"])
         assert by_id == {**data.empty_snapshot, "next_branch": None}
 
         by_ov = swh_storage.snapshot_get_by_origin_visit(origin_url, visit_id)
         assert by_ov == {**data.empty_snapshot, "next_branch": None}
 
         data1 = {
             "origin": origin_url,
             "date": data.date_visit1,
             "visit": origin_visit1.visit,
             "status": "ongoing",
             "metadata": None,
             "snapshot": None,
         }
         data2 = {
             "origin": origin_url,
             "date": data.date_visit1,
             "visit": origin_visit1.visit,
             "status": "ongoing",
             "metadata": None,
             "snapshot": data.empty_snapshot["id"],
         }
         actual_objects = list(swh_storage.journal_writer.journal.objects)
         assert actual_objects == [
             ("origin", Origin.from_dict(data.origin)),
             (
                 "origin_visit",
                 OriginVisit.from_dict({**data1, "type": data.type_visit1},),
             ),
             ("origin_visit_status", OriginVisitStatus.from_dict(data1)),
             ("snapshot", Snapshot.from_dict(data.empty_snapshot)),
             (
                 "origin_visit",
                 OriginVisit.from_dict({**data2, "type": data.type_visit1,}),
             ),
-            # ("origin_visit_status", OriginVisitStatus.from_dict(data2)),
+            (
+                "origin_visit_status",
+                OriginVisitStatus.from_dict({**data2, "date": date_now}),
+            ),
         ]
 
     def test_snapshot_add_get_complete(self, swh_storage):
         origin_url = swh_storage.origin_add_one(data.origin)
         origin_visit1 = swh_storage.origin_visit_add(
             origin_url, date=data.date_visit1, type=data.type_visit1
         )
         visit_id = origin_visit1.visit
 
         actual_result = swh_storage.snapshot_add([data.complete_snapshot])
         swh_storage.origin_visit_update(
             origin_url,
             origin_visit1.visit,
             status="ongoing",
             snapshot=data.complete_snapshot["id"],
         )
         assert actual_result == {"snapshot:add": 1}
 
         by_id = swh_storage.snapshot_get(data.complete_snapshot["id"])
         assert by_id == {**data.complete_snapshot, "next_branch": None}
 
         by_ov = swh_storage.snapshot_get_by_origin_visit(origin_url, visit_id)
         assert by_ov == {**data.complete_snapshot, "next_branch": None}
 
     def test_snapshot_add_many(self, swh_storage):
         actual_result = swh_storage.snapshot_add(
             [data.snapshot, data.complete_snapshot]
         )
         assert actual_result == {"snapshot:add": 2}
 
         assert {
             **data.complete_snapshot,
             "next_branch": None,
         } == swh_storage.snapshot_get(data.complete_snapshot["id"])
 
         assert {**data.snapshot, "next_branch": None} == swh_storage.snapshot_get(
             data.snapshot["id"]
         )
 
         swh_storage.refresh_stat_counters()
         assert swh_storage.stat_counters()["snapshot"] == 2
 
     def test_snapshot_add_many_from_generator(self, swh_storage):
         def _snp_gen():
             yield data.snapshot
             yield data.complete_snapshot
 
         actual_result = swh_storage.snapshot_add(_snp_gen())
         assert actual_result == {"snapshot:add": 2}
 
         swh_storage.refresh_stat_counters()
         assert swh_storage.stat_counters()["snapshot"] == 2
 
     def test_snapshot_add_many_incremental(self, swh_storage):
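         # Re-adding an already-known snapshot is a no-op: only the new one
         # is counted in the second call's "snapshot:add" summary.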
         actual_result = swh_storage.snapshot_add([data.complete_snapshot])
         assert actual_result == {"snapshot:add": 1}
 
         actual_result2 = swh_storage.snapshot_add(
             [data.snapshot, data.complete_snapshot]
         )
         assert actual_result2 == {"snapshot:add": 1}
 
         assert {
             **data.complete_snapshot,
             "next_branch": None,
         } == swh_storage.snapshot_get(data.complete_snapshot["id"])
 
         assert {**data.snapshot, "next_branch": None} == swh_storage.snapshot_get(
             data.snapshot["id"]
         )
 
     def test_snapshot_add_twice(self, swh_storage):
         actual_result = swh_storage.snapshot_add([data.empty_snapshot])
         assert actual_result == {"snapshot:add": 1}
 
         assert list(swh_storage.journal_writer.journal.objects) == [
             ("snapshot", Snapshot.from_dict(data.empty_snapshot))
         ]
 
         actual_result = swh_storage.snapshot_add([data.snapshot])
         assert actual_result == {"snapshot:add": 1}
 
         assert list(swh_storage.journal_writer.journal.objects) == [
             ("snapshot", Snapshot.from_dict(data.empty_snapshot)),
             ("snapshot", Snapshot.from_dict(data.snapshot)),
         ]
 
     def test_snapshot_add_validation(self, swh_storage):
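         # A branch missing either "target" or "target_type" must be rejected
         # with a StorageArgumentException mentioning the missing field.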
         snap = copy.deepcopy(data.snapshot)
         snap["branches"][b"foo"] = {"target_type": "revision"}
 
         with pytest.raises(StorageArgumentException, match="target"):
             swh_storage.snapshot_add([snap])
 
         snap = copy.deepcopy(data.snapshot)
         snap["branches"][b"foo"] = {"target": b"\x42" * 20}
 
         with pytest.raises(StorageArgumentException, match="target_type"):
             swh_storage.snapshot_add([snap])
 
     def test_snapshot_add_count_branches(self, swh_storage):
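         # snapshot_count_branches groups branches by target type; the None key
         # counts branches that have no target (dangling branches in the test data).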
         actual_result = swh_storage.snapshot_add([data.complete_snapshot])
         assert actual_result == {"snapshot:add": 1}
 
         snp_id = data.complete_snapshot["id"]
         snp_size = swh_storage.snapshot_count_branches(snp_id)
 
         expected_snp_size = {
             "alias": 1,
             "content": 1,
             "directory": 2,
             "release": 1,
             "revision": 1,
             "snapshot": 1,
             None: 1,
         }
         assert snp_size == expected_snp_size
 
     def test_snapshot_add_get_paginated(self, swh_storage):
         swh_storage.snapshot_add([data.complete_snapshot])
 
         snp_id = data.complete_snapshot["id"]
         branches = data.complete_snapshot["branches"]
         branch_names = list(sorted(branches))
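         # Branch listings are ordered by branch name; "next_branch" is the
         # first name not returned, or None when the listing is complete.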
 
         # Test branches_from
         snapshot = swh_storage.snapshot_get_branches(snp_id, branches_from=b"release")
 
         rel_idx = branch_names.index(b"release")
         expected_snapshot = {
             "id": snp_id,
             "branches": {name: branches[name] for name in branch_names[rel_idx:]},
             "next_branch": None,
         }
 
         assert snapshot == expected_snapshot
 
         # Test branches_count
         snapshot = swh_storage.snapshot_get_branches(snp_id, branches_count=1)
 
         expected_snapshot = {
             "id": snp_id,
             "branches": {branch_names[0]: branches[branch_names[0]],},
             "next_branch": b"content",
         }
         assert snapshot == expected_snapshot
 
         # test branches_from + branches_count
 
         snapshot = swh_storage.snapshot_get_branches(
             snp_id, branches_from=b"directory", branches_count=3
         )
 
         dir_idx = branch_names.index(b"directory")
         expected_snapshot = {
             "id": snp_id,
             "branches": {
                 name: branches[name] for name in branch_names[dir_idx : dir_idx + 3]
             },
             "next_branch": branch_names[dir_idx + 3],
         }
 
         assert snapshot == expected_snapshot
 
     def test_snapshot_add_get_filtered(self, swh_storage):
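         # target_types restricts snapshot_get_branches to branches whose
         # target is of one of the given types.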
         origin_url = swh_storage.origin_add_one(data.origin)
         origin_visit1 = swh_storage.origin_visit_add(
             origin_url, date=data.date_visit1, type=data.type_visit1
         )
 
         swh_storage.snapshot_add([data.complete_snapshot])
         swh_storage.origin_visit_update(
             origin_url,
             origin_visit1.visit,
             status="ongoing",
             snapshot=data.complete_snapshot["id"],
         )
 
         snp_id = data.complete_snapshot["id"]
         branches = data.complete_snapshot["branches"]
 
         snapshot = swh_storage.snapshot_get_branches(
             snp_id, target_types=["release", "revision"]
         )
 
         expected_snapshot = {
             "id": snp_id,
             "branches": {
                 name: tgt
                 for name, tgt in branches.items()
                 if tgt and tgt["target_type"] in ["release", "revision"]
             },
             "next_branch": None,
         }
 
         assert snapshot == expected_snapshot
 
         snapshot = swh_storage.snapshot_get_branches(snp_id, target_types=["alias"])
 
         expected_snapshot = {
             "id": snp_id,
             "branches": {
                 name: tgt
                 for name, tgt in branches.items()
                 if tgt and tgt["target_type"] == "alias"
             },
             "next_branch": None,
         }
 
         assert snapshot == expected_snapshot
 
     def test_snapshot_add_get_filtered_and_paginated(self, swh_storage):
         swh_storage.snapshot_add([data.complete_snapshot])
 
         snp_id = data.complete_snapshot["id"]
         branches = data.complete_snapshot["branches"]
         branch_names = list(sorted(branches))
 
         # Test branches_from
 
         snapshot = swh_storage.snapshot_get_branches(
             snp_id, target_types=["directory", "release"], branches_from=b"directory2"
         )
 
         expected_snapshot = {
             "id": snp_id,
             "branches": {name: branches[name] for name in (b"directory2", b"release")},
             "next_branch": None,
         }
 
         assert snapshot == expected_snapshot
 
         # Test branches_count
 
         snapshot = swh_storage.snapshot_get_branches(
             snp_id, target_types=["directory", "release"], branches_count=1
         )
 
         expected_snapshot = {
             "id": snp_id,
             "branches": {b"directory": branches[b"directory"]},
             "next_branch": b"directory2",
         }
         assert snapshot == expected_snapshot
 
         # Test branches_count again, with a larger count
 
         snapshot = swh_storage.snapshot_get_branches(
             snp_id, target_types=["directory", "release"], branches_count=2
         )
 
         expected_snapshot = {
             "id": snp_id,
             "branches": {
                 name: branches[name] for name in (b"directory", b"directory2")
             },
             "next_branch": b"release",
         }
         assert snapshot == expected_snapshot
 
         # test branches_from + branches_count
 
         snapshot = swh_storage.snapshot_get_branches(
             snp_id,
             target_types=["directory", "release"],
             branches_from=b"directory2",
             branches_count=1,
         )
 
         dir_idx = branch_names.index(b"directory2")
         expected_snapshot = {
             "id": snp_id,
             "branches": {branch_names[dir_idx]: branches[branch_names[dir_idx]],},
             "next_branch": b"release",
         }
 
         assert snapshot == expected_snapshot
 
     def test_snapshot_add_get(self, swh_storage):
         origin_url = swh_storage.origin_add_one(data.origin)
         origin_visit1 = swh_storage.origin_visit_add(
             origin_url, date=data.date_visit1, type=data.type_visit1
         )
         visit_id = origin_visit1.visit
 
         swh_storage.snapshot_add([data.snapshot])
         swh_storage.origin_visit_update(
             origin_url,
             origin_visit1.visit,
             status="ongoing",
             snapshot=data.snapshot["id"],
         )
 
         by_id = swh_storage.snapshot_get(data.snapshot["id"])
         assert by_id == {**data.snapshot, "next_branch": None}
 
         by_ov = swh_storage.snapshot_get_by_origin_visit(origin_url, visit_id)
         assert by_ov == {**data.snapshot, "next_branch": None}
 
         origin_visit_info = swh_storage.origin_visit_get_by(origin_url, visit_id)
         assert origin_visit_info["snapshot"] == data.snapshot["id"]
 
     def test_snapshot_add_nonexistent_visit(self, swh_storage):
         origin_url = swh_storage.origin_add_one(data.origin)
         # unknown visit
         visit_id = 54164461156
         swh_storage.journal_writer.journal.objects[:] = []
 
         swh_storage.snapshot_add([data.snapshot])
 
         with pytest.raises(StorageArgumentException):
             swh_storage.origin_visit_update(
                 origin_url, visit_id, status="ongoing", snapshot=data.snapshot["id"]
             )
 
         assert list(swh_storage.journal_writer.journal.objects) == [
             ("snapshot", Snapshot.from_dict(data.snapshot))
         ]
 
     def test_snapshot_add_twice__by_origin_visit(self, swh_storage):
         origin_url = swh_storage.origin_add_one(data.origin)
         origin_visit1 = swh_storage.origin_visit_add(
             origin_url, date=data.date_visit1, type=data.type_visit1
         )
         visit1_id = origin_visit1.visit
         swh_storage.snapshot_add([data.snapshot])
+        date_now2 = now()
         swh_storage.origin_visit_update(
             origin_url,
             origin_visit1.visit,
             status="ongoing",
             snapshot=data.snapshot["id"],
+            date=date_now2,
         )
 
         by_ov1 = swh_storage.snapshot_get_by_origin_visit(origin_url, visit1_id)
         assert by_ov1 == {**data.snapshot, "next_branch": None}
 
         origin_visit2 = swh_storage.origin_visit_add(
             origin_url, date=data.date_visit2, type=data.type_visit2
         )
         visit2_id = origin_visit2.visit
 
         swh_storage.snapshot_add([data.snapshot])
+        date_now4 = now()
         swh_storage.origin_visit_update(
             origin_url,
             origin_visit2.visit,
             status="ongoing",
             snapshot=data.snapshot["id"],
+            date=date_now4,
         )
 
         by_ov2 = swh_storage.snapshot_get_by_origin_visit(origin_url, visit2_id)
         assert by_ov2 == {**data.snapshot, "next_branch": None}
 
         data1 = {
             "origin": origin_url,
             "date": data.date_visit1,
             "visit": origin_visit1.visit,
-            # "type": data.type_visit1,
             "status": "ongoing",
             "metadata": None,
             "snapshot": None,
         }
         data2 = {
             "origin": origin_url,
             "date": data.date_visit1,
             "visit": origin_visit1.visit,
-            "type": data.type_visit1,
             "status": "ongoing",
             "metadata": None,
             "snapshot": data.snapshot["id"],
         }
         data3 = {
             "origin": origin_url,
             "date": data.date_visit2,
             "visit": origin_visit2.visit,
-            # "type": data.type_visit2,
             "status": "ongoing",
             "metadata": None,
             "snapshot": None,
         }
         data4 = {
             "origin": origin_url,
             "date": data.date_visit2,
             "visit": origin_visit2.visit,
-            "type": data.type_visit2,
             "status": "ongoing",
             "metadata": None,
             "snapshot": data.snapshot["id"],
         }
-        assert list(swh_storage.journal_writer.journal.objects) == [
+        actual_objects = list(swh_storage.journal_writer.journal.objects)
+        assert actual_objects == [
             ("origin", Origin.from_dict(data.origin)),
             (
                 "origin_visit",
                 OriginVisit.from_dict({**data1, "type": data.type_visit1}),
             ),
             ("origin_visit_status", OriginVisitStatus.from_dict(data1)),
             ("snapshot", Snapshot.from_dict(data.snapshot)),
-            ("origin_visit", OriginVisit.from_dict(data2)),
+            (
+                "origin_visit",
+                OriginVisit.from_dict({**data2, "type": data.type_visit1}),
+            ),
+            (
+                "origin_visit_status",
+                OriginVisitStatus.from_dict({**data2, "date": date_now2}),
+            ),
             (
                 "origin_visit",
                 OriginVisit.from_dict({**data3, "type": data.type_visit2}),
             ),
             ("origin_visit_status", OriginVisitStatus.from_dict(data3)),
-            ("origin_visit", OriginVisit.from_dict(data4)),
+            (
+                "origin_visit",
+                OriginVisit.from_dict({**data4, "type": data.type_visit2}),
+            ),
+            (
+                "origin_visit_status",
+                OriginVisitStatus.from_dict({**data4, "date": date_now4}),
+            ),
         ]
 
     def test_snapshot_get_latest(self, swh_storage):
         origin_url = swh_storage.origin_add_one(data.origin)
         origin_visit1 = swh_storage.origin_visit_add(
             origin_url, date=data.date_visit1, type=data.type_visit1
         )
         origin_visit2 = swh_storage.origin_visit_add(
             origin_url, date=data.date_visit2, type=data.type_visit2
         )
         # Add a visit with the same date as the previous one
         origin_visit3 = swh_storage.origin_visit_add(
             origin_url, date=data.date_visit2, type=data.type_visit3
         )
         # Three visits, none with a snapshot: latest snapshot is None
         assert swh_storage.snapshot_get_latest(origin_url) is None
 
         # Add snapshot to visit1, latest snapshot = visit 1 snapshot
         swh_storage.snapshot_add([data.complete_snapshot])
         swh_storage.origin_visit_update(
             origin_url,
             origin_visit1.visit,
             status="ongoing",
             snapshot=data.complete_snapshot["id"],
         )
         assert {
             **data.complete_snapshot,
             "next_branch": None,
         } == swh_storage.snapshot_get_latest(origin_url)
 
         # Status filter: all three visits are status=ongoing, so no snapshot
         # returned
         assert (
             swh_storage.snapshot_get_latest(origin_url, allowed_statuses=["full"])
             is None
         )
 
         # Mark the first visit as completed and check status filter again
         swh_storage.origin_visit_update(origin_url, origin_visit1.visit, status="full")
         assert {
             **data.complete_snapshot,
             "next_branch": None,
         } == swh_storage.snapshot_get_latest(origin_url, allowed_statuses=["full"])
 
         # Add snapshot to visit2 and check that the new snapshot is returned
         swh_storage.snapshot_add([data.empty_snapshot])
         swh_storage.origin_visit_update(
             origin_url,
             origin_visit2.visit,
             status="ongoing",
             snapshot=data.empty_snapshot["id"],
         )
         assert {
             **data.empty_snapshot,
             "next_branch": None,
         } == swh_storage.snapshot_get_latest(origin_url)
 
         # Check that the status filter is still working
         assert {
             **data.complete_snapshot,
             "next_branch": None,
         } == swh_storage.snapshot_get_latest(origin_url, allowed_statuses=["full"])
 
         # Add snapshot to visit3 (same date as visit2) and check that
         # the new snapshot is returned
         swh_storage.snapshot_add([data.complete_snapshot])
         swh_storage.origin_visit_update(
             origin_url,
             origin_visit3.visit,
             status="ongoing",
             snapshot=data.complete_snapshot["id"],
         )
         assert {
             **data.complete_snapshot,
             "next_branch": None,
         } == swh_storage.snapshot_get_latest(origin_url)
 
     def test_snapshot_get_latest__missing_snapshot(self, swh_storage):
         origin_url = swh_storage.origin_add_one(data.origin)
         assert swh_storage.snapshot_get_latest(origin_url) is None
         origin_visit1 = swh_storage.origin_visit_add(
             origin_url, date=data.date_visit1, type=data.type_visit1
         )
         origin_visit2 = swh_storage.origin_visit_add(
             origin_url, date=data.date_visit2, type=data.type_visit2
         )
 
         # Two visits, both with no snapshot: latest snapshot is None
         assert swh_storage.snapshot_get_latest(origin_url) is None
 
         # Add unknown snapshot to visit1, check that the inconsistency is
         # detected
         swh_storage.origin_visit_update(
             origin_url,
             origin_visit1.visit,
             status="ongoing",
             snapshot=data.complete_snapshot["id"],
         )
         with pytest.raises(Exception):
             # XXX: should the exception be more specific than this?
             swh_storage.snapshot_get_latest(origin_url)
 
         # Status filter: both visits are status=ongoing, so no snapshot
         # returned
         assert (
             swh_storage.snapshot_get_latest(origin_url, allowed_statuses=["full"])
             is None
         )
 
         # Mark the first visit as completed and check status filter again
         swh_storage.origin_visit_update(origin_url, origin_visit1.visit, status="full")
         with pytest.raises(Exception):
             # XXX: should the exception be more specific than this?
             swh_storage.snapshot_get_latest(origin_url, allowed_statuses=["full"])
 
         # Actually add the snapshot and check status filter again
         swh_storage.snapshot_add([data.complete_snapshot])
         assert {
             **data.complete_snapshot,
             "next_branch": None,
         } == swh_storage.snapshot_get_latest(origin_url)
 
         # Add unknown snapshot to visit2 and check that the inconsistency
         # is detected
         swh_storage.origin_visit_update(
             origin_url,
             origin_visit2.visit,
             status="ongoing",
             snapshot=data.snapshot["id"],
         )
         with pytest.raises(Exception):
             # XXX: should the exception be more specific than this?
             swh_storage.snapshot_get_latest(origin_url)
 
         # Actually add that snapshot and check that the new one is returned
         swh_storage.snapshot_add([data.snapshot])
         assert {
             **data.snapshot,
             "next_branch": None,
         } == swh_storage.snapshot_get_latest(origin_url)
 
     def test_snapshot_get_random(self, swh_storage):
         swh_storage.snapshot_add(
             [data.snapshot, data.empty_snapshot, data.complete_snapshot]
         )
 
         assert swh_storage.snapshot_get_random() in {
             data.snapshot["id"],
             data.empty_snapshot["id"],
             data.complete_snapshot["id"],
         }
 
     def test_snapshot_missing(self, swh_storage):
         snap = data.snapshot
         missing_snap = data.empty_snapshot
         snapshots = [snap["id"], missing_snap["id"]]
         swh_storage.snapshot_add([snap])
 
         missing_snapshots = swh_storage.snapshot_missing(snapshots)
 
         assert list(missing_snapshots) == [missing_snap["id"]]
 
     def test_stat_counters(self, swh_storage):
         expected_keys = ["content", "directory", "origin", "revision"]
 
         # Initially, all counters are 0
 
         swh_storage.refresh_stat_counters()
         counters = swh_storage.stat_counters()
         assert set(expected_keys) <= set(counters)
         for key in expected_keys:
             assert counters[key] == 0
 
         # Add a content. Only the content counter should increase.
 
         swh_storage.content_add([data.cont])
 
         swh_storage.refresh_stat_counters()
         counters = swh_storage.stat_counters()
 
         assert set(expected_keys) <= set(counters)
         for key in expected_keys:
             if key != "content":
                 assert counters[key] == 0
         assert counters["content"] == 1
 
         # Add other objects. Check their counter increased as well.
 
         origin_url = swh_storage.origin_add_one(data.origin2)
         origin_visit1 = swh_storage.origin_visit_add(
             origin_url, date=data.date_visit2, type=data.type_visit2
         )
 
         swh_storage.snapshot_add([data.snapshot])
         swh_storage.origin_visit_update(
             origin_url,
             origin_visit1.visit,
             status="ongoing",
             snapshot=data.snapshot["id"],
         )
         swh_storage.directory_add([data.dir])
         swh_storage.revision_add([data.revision])
         swh_storage.release_add([data.release])
 
         swh_storage.refresh_stat_counters()
         counters = swh_storage.stat_counters()
         assert counters["content"] == 1
         assert counters["directory"] == 1
         assert counters["snapshot"] == 1
         assert counters["origin"] == 1
         assert counters["origin_visit"] == 1
         assert counters["revision"] == 1
         assert counters["release"] == 1
         if "person" in counters:
             assert counters["person"] == 3
 
     def test_content_find_ctime(self, swh_storage):
         cont = data.cont.copy()
         del cont["data"]
         ctime = now()
         cont["ctime"] = ctime
         swh_storage.content_add_metadata([cont])
 
         actually_present = swh_storage.content_find({"sha1": cont["sha1"]})
 
         # check ctime up to one second
         dt = actually_present[0]["ctime"] - ctime
         assert abs(dt.total_seconds()) <= 1
         del actually_present[0]["ctime"]
 
         assert actually_present[0] == {
             "sha1": cont["sha1"],
             "sha256": cont["sha256"],
             "sha1_git": cont["sha1_git"],
             "blake2s256": cont["blake2s256"],
             "length": cont["length"],
             "status": "visible",
         }
 
     def test_content_find_with_present_content(self, swh_storage):
         # 1. with something to find
         cont = data.cont
         swh_storage.content_add([cont, data.cont2])
 
         actually_present = swh_storage.content_find({"sha1": cont["sha1"]})
         assert 1 == len(actually_present)
         actually_present[0].pop("ctime")
 
         assert actually_present[0] == {
             "sha1": cont["sha1"],
             "sha256": cont["sha256"],
             "sha1_git": cont["sha1_git"],
             "blake2s256": cont["blake2s256"],
             "length": cont["length"],
             "status": "visible",
         }
 
         # 2. with something to find
         actually_present = swh_storage.content_find({"sha1_git": cont["sha1_git"]})
         assert 1 == len(actually_present)
 
         actually_present[0].pop("ctime")
         assert actually_present[0] == {
             "sha1": cont["sha1"],
             "sha256": cont["sha256"],
             "sha1_git": cont["sha1_git"],
             "blake2s256": cont["blake2s256"],
             "length": cont["length"],
             "status": "visible",
         }
 
         # 3. with something to find
         actually_present = swh_storage.content_find({"sha256": cont["sha256"]})
         assert 1 == len(actually_present)
 
         actually_present[0].pop("ctime")
         assert actually_present[0] == {
             "sha1": cont["sha1"],
             "sha256": cont["sha256"],
             "sha1_git": cont["sha1_git"],
             "blake2s256": cont["blake2s256"],
             "length": cont["length"],
             "status": "visible",
         }
 
         # 4. with something to find
         actually_present = swh_storage.content_find(
             {
                 "sha1": cont["sha1"],
                 "sha1_git": cont["sha1_git"],
                 "sha256": cont["sha256"],
                 "blake2s256": cont["blake2s256"],
             }
         )
         assert 1 == len(actually_present)
 
         actually_present[0].pop("ctime")
         assert actually_present[0] == {
             "sha1": cont["sha1"],
             "sha256": cont["sha256"],
             "sha1_git": cont["sha1_git"],
             "blake2s256": cont["blake2s256"],
             "length": cont["length"],
             "status": "visible",
         }
 
     def test_content_find_with_non_present_content(self, swh_storage):
         # 1. with something that does not exist
         missing_cont = data.missing_cont
 
         actually_present = swh_storage.content_find({"sha1": missing_cont["sha1"]})
 
         assert actually_present == []
 
         # 2. with something that does not exist
         actually_present = swh_storage.content_find(
             {"sha1_git": missing_cont["sha1_git"]}
         )
 
         assert actually_present == []
 
         # 3. with something that does not exist
         actually_present = swh_storage.content_find({"sha256": missing_cont["sha256"]})
 
         assert actually_present == []
 
     def test_content_find_with_duplicate_input(self, swh_storage):
         cont1 = data.cont
         duplicate_cont = cont1.copy()
 
         # Create fake data with colliding sha256 and blake2s256
         sha1_array = bytearray(duplicate_cont["sha1"])
         sha1_array[0] += 1
         duplicate_cont["sha1"] = bytes(sha1_array)
         sha1git_array = bytearray(duplicate_cont["sha1_git"])
         sha1git_array[0] += 1
         duplicate_cont["sha1_git"] = bytes(sha1git_array)
         # Inject the data
         swh_storage.content_add([cont1, duplicate_cont])
         finder = {
             "blake2s256": duplicate_cont["blake2s256"],
             "sha256": duplicate_cont["sha256"],
         }
         actual_result = list(swh_storage.content_find(finder))
 
         cont1.pop("data")
         duplicate_cont.pop("data")
         actual_result[0].pop("ctime")
         actual_result[1].pop("ctime")
 
         expected_result = [cont1, duplicate_cont]
         for result in expected_result:
             assert result in actual_result
 
     def test_content_find_with_duplicate_sha256(self, swh_storage):
         cont1 = data.cont
         duplicate_cont = cont1.copy()
 
         # Create fake data with colliding sha256
         for hashalgo in ("sha1", "sha1_git", "blake2s256"):
             value = bytearray(duplicate_cont[hashalgo])
             value[0] += 1
             duplicate_cont[hashalgo] = bytes(value)
         swh_storage.content_add([cont1, duplicate_cont])
 
         finder = {"sha256": duplicate_cont["sha256"]}
         actual_result = list(swh_storage.content_find(finder))
         assert len(actual_result) == 2
 
         cont1.pop("data")
         duplicate_cont.pop("data")
         actual_result[0].pop("ctime")
         actual_result[1].pop("ctime")
         expected_result = [cont1, duplicate_cont]
         assert expected_result == sorted(actual_result, key=lambda x: x["sha1"])
 
         # Find with both sha256 and blake2s256
         finder = {
             "sha256": duplicate_cont["sha256"],
             "blake2s256": duplicate_cont["blake2s256"],
         }
         actual_result = list(swh_storage.content_find(finder))
         assert len(actual_result) == 1
         actual_result[0].pop("ctime")
 
         expected_result = [duplicate_cont]
         assert expected_result == actual_result
 
     def test_content_find_with_duplicate_blake2s256(self, swh_storage):
         cont1 = data.cont
         duplicate_cont = cont1.copy()
 
         # Create fake data colliding only on blake2s256 (sha1, sha1_git and sha256 all differ)
         sha1_array = bytearray(duplicate_cont["sha1"])
         sha1_array[0] += 1
         duplicate_cont["sha1"] = bytes(sha1_array)
         sha1git_array = bytearray(duplicate_cont["sha1_git"])
         sha1git_array[0] += 1
         duplicate_cont["sha1_git"] = bytes(sha1git_array)
         sha256_array = bytearray(duplicate_cont["sha256"])
         sha256_array[0] += 1
         duplicate_cont["sha256"] = bytes(sha256_array)
         swh_storage.content_add([cont1, duplicate_cont])
         finder = {"blake2s256": duplicate_cont["blake2s256"]}
         actual_result = list(swh_storage.content_find(finder))
 
         cont1.pop("data")
         duplicate_cont.pop("data")
         actual_result[0].pop("ctime")
         actual_result[1].pop("ctime")
         expected_result = [cont1, duplicate_cont]
         for result in expected_result:
             assert result in actual_result
 
         # Find with both sha256 and blake2s256
         finder = {
             "sha256": duplicate_cont["sha256"],
             "blake2s256": duplicate_cont["blake2s256"],
         }
         actual_result = list(swh_storage.content_find(finder))
 
         actual_result[0].pop("ctime")
 
         expected_result = [duplicate_cont]
         assert expected_result == actual_result
 
     def test_content_find_bad_input(self, swh_storage):
         # 1. with bad input
         with pytest.raises(StorageArgumentException):
             swh_storage.content_find({})  # empty is bad
 
         # 2. with bad input
         with pytest.raises(StorageArgumentException):
             swh_storage.content_find({"unknown-sha1": "something"})  # not the right key
 
     def test_object_find_by_sha1_git(self, swh_storage):
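         # For each queried sha1_git, object_find_by_sha1_git returns the list
         # of objects bearing that hash (an empty list for unknown hashes).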
         sha1_gits = [b"00000000000000000000"]
         expected = {
             b"00000000000000000000": [],
         }
 
         swh_storage.content_add([data.cont])
         sha1_gits.append(data.cont["sha1_git"])
         expected[data.cont["sha1_git"]] = [
             {"sha1_git": data.cont["sha1_git"], "type": "content",}
         ]
 
         swh_storage.directory_add([data.dir])
         sha1_gits.append(data.dir["id"])
         expected[data.dir["id"]] = [{"sha1_git": data.dir["id"], "type": "directory",}]
 
         swh_storage.revision_add([data.revision])
         sha1_gits.append(data.revision["id"])
         expected[data.revision["id"]] = [
             {"sha1_git": data.revision["id"], "type": "revision",}
         ]
 
         swh_storage.release_add([data.release])
         sha1_gits.append(data.release["id"])
         expected[data.release["id"]] = [
             {"sha1_git": data.release["id"], "type": "release",}
         ]
 
         ret = swh_storage.object_find_by_sha1_git(sha1_gits)
 
         assert expected == ret
 
     def test_metadata_fetcher_add_get(self, swh_storage):
         actual_fetcher = swh_storage.metadata_fetcher_get(
             data.metadata_fetcher["name"], data.metadata_fetcher["version"]
         )
         assert actual_fetcher is None  # does not exist
 
         swh_storage.metadata_fetcher_add(**data.metadata_fetcher)
 
         res = swh_storage.metadata_fetcher_get(
             data.metadata_fetcher["name"], data.metadata_fetcher["version"]
         )
 
         assert res is not data.metadata_fetcher
         assert res == data.metadata_fetcher
 
     def test_metadata_authority_add_get(self, swh_storage):
         actual_authority = swh_storage.metadata_authority_get(
             data.metadata_authority["type"], data.metadata_authority["url"]
         )
         assert actual_authority is None  # does not exist
 
         swh_storage.metadata_authority_add(**data.metadata_authority)
 
         res = swh_storage.metadata_authority_get(
             data.metadata_authority["type"], data.metadata_authority["url"]
         )
 
         assert res is not data.metadata_authority
         assert res == data.metadata_authority
 
     def test_origin_metadata_add(self, swh_storage):
         origin = data.origin
         fetcher = data.metadata_fetcher
         authority = data.metadata_authority
         swh_storage.origin_add([origin])[0]
 
         swh_storage.metadata_fetcher_add(**fetcher)
         swh_storage.metadata_authority_add(**authority)
 
         swh_storage.origin_metadata_add(**data.origin_metadata)
         swh_storage.origin_metadata_add(**data.origin_metadata2)
 
         result = swh_storage.origin_metadata_get(origin["url"], authority)
         assert result["next_page_token"] is None
         assert [data.origin_metadata, data.origin_metadata2] == list(
             sorted(result["results"], key=lambda x: x["discovery_date"],)
         )
 
     def test_origin_metadata_add_duplicate(self, swh_storage):
         """Duplicates should be silently updated."""
         origin = data.origin
         fetcher = data.metadata_fetcher
         authority = data.metadata_authority
         swh_storage.origin_add([origin])[0]
 
         new_origin_metadata2 = {
             **data.origin_metadata2,
             "format": "new-format",
             "metadata": b"new-metadata",
         }
 
         swh_storage.metadata_fetcher_add(**fetcher)
         swh_storage.metadata_authority_add(**authority)
 
         swh_storage.origin_metadata_add(**data.origin_metadata)
         swh_storage.origin_metadata_add(**data.origin_metadata2)
         swh_storage.origin_metadata_add(**new_origin_metadata2)
 
         result = swh_storage.origin_metadata_get(origin["url"], authority)
         assert result["next_page_token"] is None
         assert [data.origin_metadata, new_origin_metadata2] == list(
             sorted(result["results"], key=lambda x: x["discovery_date"],)
         )
 
     def test_origin_metadata_add_dict(self, swh_storage):
         origin = data.origin
         fetcher = data.metadata_fetcher
         authority = data.metadata_authority
         swh_storage.origin_add([origin])[0]
 
         swh_storage.metadata_fetcher_add(**fetcher)
         swh_storage.metadata_authority_add(**authority)
 
         kwargs = data.origin_metadata.copy()
         kwargs["metadata"] = {"foo": "bar"}
 
         with pytest.raises(StorageArgumentException):
             swh_storage.origin_metadata_add(**kwargs)
 
     def test_origin_metadata_get(self, swh_storage):
         authority = data.metadata_authority
         fetcher = data.metadata_fetcher
         authority2 = data.metadata_authority2
         fetcher2 = data.metadata_fetcher2
         origin_url1 = data.origin["url"]
         origin_url2 = data.origin2["url"]
         swh_storage.origin_add([data.origin])
         swh_storage.origin_add([data.origin2])
 
         origin1_metadata1 = data.origin_metadata
         origin1_metadata2 = data.origin_metadata2
         origin1_metadata3 = data.origin_metadata3
         origin2_metadata = {**data.origin_metadata2, "origin_url": origin_url2}
 
         swh_storage.metadata_authority_add(**authority)
         swh_storage.metadata_fetcher_add(**fetcher)
         swh_storage.metadata_authority_add(**authority2)
         swh_storage.metadata_fetcher_add(**fetcher2)
 
         swh_storage.origin_metadata_add(**origin1_metadata1)
         swh_storage.origin_metadata_add(**origin1_metadata2)
         swh_storage.origin_metadata_add(**origin1_metadata3)
         swh_storage.origin_metadata_add(**origin2_metadata)
 
         result = swh_storage.origin_metadata_get(origin_url1, authority)
         assert result["next_page_token"] is None
         assert [origin1_metadata1, origin1_metadata2] == list(
             sorted(result["results"], key=lambda x: x["discovery_date"],)
         )
 
         result = swh_storage.origin_metadata_get(origin_url1, authority2)
         assert result["next_page_token"] is None
         assert [origin1_metadata3] == list(
             sorted(result["results"], key=lambda x: x["discovery_date"],)
         )
 
         result = swh_storage.origin_metadata_get(origin_url2, authority)
         assert result["next_page_token"] is None
         assert [origin2_metadata] == list(result["results"],)
 
     def test_origin_metadata_get_after(self, swh_storage):
         origin = data.origin
         fetcher = data.metadata_fetcher
         authority = data.metadata_authority
         swh_storage.origin_add([origin])[0]
 
         swh_storage.metadata_fetcher_add(**fetcher)
         swh_storage.metadata_authority_add(**authority)
 
         swh_storage.origin_metadata_add(**data.origin_metadata)
         swh_storage.origin_metadata_add(**data.origin_metadata2)
 
         result = swh_storage.origin_metadata_get(
             origin["url"],
             authority,
             after=data.origin_metadata["discovery_date"] - timedelta(seconds=1),
         )
         assert result["next_page_token"] is None
         assert [data.origin_metadata, data.origin_metadata2] == list(
             sorted(result["results"], key=lambda x: x["discovery_date"],)
         )
 
         result = swh_storage.origin_metadata_get(
             origin["url"], authority, after=data.origin_metadata["discovery_date"]
         )
         assert result["next_page_token"] is None
         assert [data.origin_metadata2] == result["results"]
 
         result = swh_storage.origin_metadata_get(
             origin["url"], authority, after=data.origin_metadata2["discovery_date"]
         )
         assert result["next_page_token"] is None
         assert [] == result["results"]
 
     def test_origin_metadata_get_paginate(self, swh_storage):
         origin = data.origin
         fetcher = data.metadata_fetcher
         authority = data.metadata_authority
         swh_storage.origin_add([origin])[0]
 
         swh_storage.metadata_fetcher_add(**fetcher)
         swh_storage.metadata_authority_add(**authority)
 
         swh_storage.origin_metadata_add(**data.origin_metadata)
         swh_storage.origin_metadata_add(**data.origin_metadata2)
 
         swh_storage.origin_metadata_get(origin["url"], authority)
 
         result = swh_storage.origin_metadata_get(origin["url"], authority, limit=1)
         assert result["next_page_token"] is not None
         assert [data.origin_metadata] == result["results"]
 
         result = swh_storage.origin_metadata_get(
             origin["url"], authority, limit=1, page_token=result["next_page_token"]
         )
         assert result["next_page_token"] is None
         assert [data.origin_metadata2] == result["results"]
 
     def test_origin_metadata_get_paginate_same_date(self, swh_storage):
         origin = data.origin
         fetcher1 = data.metadata_fetcher
         fetcher2 = data.metadata_fetcher2
         authority = data.metadata_authority
         swh_storage.origin_add([origin])[0]
 
         swh_storage.metadata_fetcher_add(**fetcher1)
         swh_storage.metadata_fetcher_add(**fetcher2)
         swh_storage.metadata_authority_add(**authority)
 
         origin_metadata2 = {
             **data.origin_metadata2,
             "discovery_date": data.origin_metadata2["discovery_date"],
             "fetcher": {"name": fetcher2["name"], "version": fetcher2["version"],},
         }
 
         swh_storage.origin_metadata_add(**data.origin_metadata)
         swh_storage.origin_metadata_add(**origin_metadata2)
 
         result = swh_storage.origin_metadata_get(origin["url"], authority, limit=1)
         assert result["next_page_token"] is not None
         assert [data.origin_metadata] == result["results"]
 
         result = swh_storage.origin_metadata_get(
             origin["url"], authority, limit=1, page_token=result["next_page_token"]
         )
         assert result["next_page_token"] is None
         assert [origin_metadata2] == result["results"]
 
 
 class TestStorageGeneratedData:
     def test_generate_content_get(self, swh_storage, swh_contents):
         contents_with_data = [c for c in swh_contents if c["status"] != "absent"]
         # input the list of sha1s we want from storage
         get_sha1s = [c["sha1"] for c in contents_with_data]
 
         # retrieve contents
         actual_contents = list(swh_storage.content_get(get_sha1s))
         assert None not in actual_contents
         assert_contents_ok(contents_with_data, actual_contents)
 
     def test_generate_content_get_metadata(self, swh_storage, swh_contents):
         # input the list of sha1s we want from storage
         expected_contents = [c for c in swh_contents if c["status"] != "absent"]
         get_sha1s = [c["sha1"] for c in expected_contents]
 
         # retrieve contents
         meta_contents = swh_storage.content_get_metadata(get_sha1s)
 
         assert len(list(meta_contents)) == len(get_sha1s)
 
         actual_contents = []
         for contents in meta_contents.values():
             actual_contents.extend(contents)
 
         keys_to_check = {"length", "status", "sha1", "sha1_git", "sha256", "blake2s256"}
 
         assert_contents_ok(
             expected_contents, actual_contents, keys_to_check=keys_to_check
         )
 
     def test_generate_content_get_range(self, swh_storage, swh_contents):
         """content_get_range returns complete range"""
         present_contents = [c for c in swh_contents if c["status"] != "absent"]
 
         get_sha1s = sorted([c["sha1"] for c in swh_contents if c["status"] != "absent"])
         start = get_sha1s[2]
         end = get_sha1s[-2]
         actual_result = swh_storage.content_get_range(start, end)
 
         assert actual_result["next"] is None
 
         actual_contents = actual_result["contents"]
         expected_contents = [c for c in present_contents if start <= c["sha1"] <= end]
         if expected_contents:
             assert_contents_ok(expected_contents, actual_contents, ["sha1"])
         else:
             assert actual_contents == []
 
     def test_generate_content_get_range_full(self, swh_storage, swh_contents):
         """content_get_range for a full range returns all available contents"""
         present_contents = [c for c in swh_contents if c["status"] != "absent"]
 
         start = b"0" * 40
         end = b"f" * 40
         actual_result = swh_storage.content_get_range(start, end)
         assert actual_result["next"] is None
 
         actual_contents = actual_result["contents"]
         expected_contents = [c for c in present_contents if start <= c["sha1"] <= end]
         if expected_contents:
             assert_contents_ok(expected_contents, actual_contents, ["sha1"])
         else:
             assert actual_contents == []
 
     def test_generate_content_get_range_empty(self, swh_storage, swh_contents):
         """content_get_range for an empty range returns nothing"""
         start = b"0" * 40
         end = b"f" * 40
         actual_result = swh_storage.content_get_range(end, start)
         assert actual_result["next"] is None
         assert len(actual_result["contents"]) == 0
 
     def test_generate_content_get_range_limit_none(self, swh_storage):
         """content_get_range call with wrong limit input should fail"""
         with pytest.raises(StorageArgumentException) as e:
             swh_storage.content_get_range(start=None, end=None, limit=None)
 
         assert e.value.args == ("limit should not be None",)
 
     def test_generate_content_get_range_no_limit(self, swh_storage, swh_contents):
         """content_get_range returns contents within range provided"""
         # input the list of sha1s we want from storage
         get_sha1s = sorted([c["sha1"] for c in swh_contents if c["status"] != "absent"])
         start = get_sha1s[0]
         end = get_sha1s[-1]
 
         # retrieve contents
         actual_result = swh_storage.content_get_range(start, end)
 
         actual_contents = actual_result["contents"]
         assert actual_result["next"] is None
         assert len(actual_contents) == len(get_sha1s)
 
         expected_contents = [c for c in swh_contents if c["status"] != "absent"]
         assert_contents_ok(expected_contents, actual_contents, ["sha1"])
 
     def test_generate_content_get_range_limit(self, swh_storage, swh_contents):
         """content_get_range paginates results if limit exceeded"""
         contents_map = {c["sha1"]: c for c in swh_contents}
 
         # input the list of sha1s we want from storage
         get_sha1s = sorted([c["sha1"] for c in swh_contents if c["status"] != "absent"])
         start = get_sha1s[0]
         end = get_sha1s[-1]
 
         # retrieve contents limited to n-1 results
         limited_results = len(get_sha1s) - 1
         actual_result = swh_storage.content_get_range(start, end, limit=limited_results)
 
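         # with limit == n - 1, the last sha1 is not returned and is exposed
         # as the "next" cursor for the following page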
         actual_contents = actual_result["contents"]
         assert actual_result["next"] == get_sha1s[-1]
         assert len(actual_contents) == limited_results
 
         expected_contents = [contents_map[sha1] for sha1 in get_sha1s[:-1]]
         assert_contents_ok(expected_contents, actual_contents, ["sha1"])
 
         # retrieve next part
         actual_results2 = swh_storage.content_get_range(start=end, end=end)
         assert actual_results2["next"] is None
         actual_contents2 = actual_results2["contents"]
         assert len(actual_contents2) == 1
 
         assert_contents_ok([contents_map[get_sha1s[-1]]], actual_contents2, ["sha1"])
 
     def test_origin_get_range_from_zero(self, swh_storage, swh_origins):
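         # origin ids are 1-based: a range starting at 0 yields nothing for
         # origin_count=0 and the first origin (id 1) for origin_count=1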
         actual_origins = list(
             swh_storage.origin_get_range(origin_from=0, origin_count=0)
         )
         assert len(actual_origins) == 0
 
         actual_origins = list(
             swh_storage.origin_get_range(origin_from=0, origin_count=1)
         )
         assert len(actual_origins) == 1
         assert actual_origins[0]["id"] == 1
         assert actual_origins[0]["url"] == swh_origins[0]["url"]
 
     @pytest.mark.parametrize(
         "origin_from,origin_count",
         [(1, 1), (1, 10), (1, 20), (1, 101), (11, 0), (11, 10), (91, 11)],
     )
     def test_origin_get_range(
         self, swh_storage, swh_origins, origin_from, origin_count
     ):
         actual_origins = list(
             swh_storage.origin_get_range(
                 origin_from=origin_from, origin_count=origin_count
             )
         )
 
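         # build the expected slice, assuming ids are assigned sequentially
         # from 1 in insertion order (as enumerate(..., start=1) reflects)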
         origins_with_id = list(enumerate(swh_origins, start=1))
         expected_origins = [
             {"url": origin["url"], "id": origin_id,}
             for (origin_id, origin) in origins_with_id[
                 origin_from - 1 : origin_from + origin_count - 1
             ]
         ]
 
         assert actual_origins == expected_origins
 
     @pytest.mark.parametrize("limit", [1, 7, 10, 100, 1000])
     def test_origin_list(self, swh_storage, swh_origins, limit):
         returned_origins = []
 
         page_token = None
         i = 0
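         # page through origin_list until no next_page_token is returned,
         # accumulating every origin url along the way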
         while True:
             result = swh_storage.origin_list(page_token=page_token, limit=limit)
             assert len(result["origins"]) <= limit
 
             returned_origins.extend(origin["url"] for origin in result["origins"])
 
             i += 1
             page_token = result.get("next_page_token")
 
             if page_token is None:
                 assert i * limit >= len(swh_origins)
                 break
             else:
                 assert len(result["origins"]) == limit
 
         expected_origins = [origin["url"] for origin in swh_origins]
         assert sorted(returned_origins) == sorted(expected_origins)
 
     ORIGINS = [
         "https://github.com/user1/repo1",
         "https://github.com/user2/repo1",
         "https://github.com/user3/repo1",
         "https://gitlab.com/user1/repo1",
         "https://gitlab.com/user2/repo1",
         "https://forge.softwareheritage.org/source/repo1",
     ]
 
     def test_origin_count(self, swh_storage):
         swh_storage.origin_add([{"url": url} for url in self.ORIGINS])
 
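         # with regexp=False the pattern is interpreted literally, so regex
         # syntax such as ".*" matches nothing here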
         assert swh_storage.origin_count("github") == 3
         assert swh_storage.origin_count("gitlab") == 2
         assert swh_storage.origin_count(".*user.*", regexp=True) == 5
         assert swh_storage.origin_count(".*user.*", regexp=False) == 0
         assert swh_storage.origin_count(".*user1.*", regexp=True) == 2
         assert swh_storage.origin_count(".*user1.*", regexp=False) == 0
 
     def test_origin_count_with_visit_no_visits(self, swh_storage):
         swh_storage.origin_add([{"url": url} for url in self.ORIGINS])
 
         # none of them have visits, so with_visit=True => 0
         assert swh_storage.origin_count("github", with_visit=True) == 0
         assert swh_storage.origin_count("gitlab", with_visit=True) == 0
         assert swh_storage.origin_count(".*user.*", regexp=True, with_visit=True) == 0
         assert swh_storage.origin_count(".*user.*", regexp=False, with_visit=True) == 0
         assert swh_storage.origin_count(".*user1.*", regexp=True, with_visit=True) == 0
         assert swh_storage.origin_count(".*user1.*", regexp=False, with_visit=True) == 0
 
     def test_origin_count_with_visit_with_visits_no_snapshot(self, swh_storage):
         swh_storage.origin_add([{"url": url} for url in self.ORIGINS])
 
         origin_url = "https://github.com/user1/repo1"
         swh_storage.origin_visit_add(origin_url, date=now(), type="git")
 
         assert swh_storage.origin_count("github", with_visit=False) == 3
         # it has a visit, but no snapshot, so with_visit=True => 0
         assert swh_storage.origin_count("github", with_visit=True) == 0
 
         assert swh_storage.origin_count("gitlab", with_visit=False) == 2
         # these gitlab origins have no visit
         assert swh_storage.origin_count("gitlab", with_visit=True) == 0
 
         assert (
             swh_storage.origin_count("github.*user1", regexp=True, with_visit=False)
             == 1
         )
         assert (
             swh_storage.origin_count("github.*user1", regexp=True, with_visit=True) == 0
         )
         assert swh_storage.origin_count("github", regexp=True, with_visit=True) == 0
 
     def test_origin_count_with_visit_with_visits_and_snapshot(self, swh_storage):
         swh_storage.origin_add([{"url": url} for url in self.ORIGINS])
 
         swh_storage.snapshot_add([data.snapshot])
         origin_url = "https://github.com/user1/repo1"
         visit = swh_storage.origin_visit_add(origin_url, date=now(), type="git")
         swh_storage.origin_visit_update(
             origin_url, visit.visit, status="ongoing", snapshot=data.snapshot["id"]
         )
 
         assert swh_storage.origin_count("github", with_visit=False) == 3
         # github/user1 has a visit and a snapshot, so with_visit=True => 1
         assert swh_storage.origin_count("github", with_visit=True) == 1
 
         assert (
             swh_storage.origin_count("github.*user1", regexp=True, with_visit=False)
             == 1
         )
         assert (
             swh_storage.origin_count("github.*user1", regexp=True, with_visit=True) == 1
         )
         assert swh_storage.origin_count("github", regexp=True, with_visit=True) == 1
 
     @settings(suppress_health_check=[HealthCheck.too_slow])
     @given(strategies.lists(objects(), max_size=2))
     def test_add_arbitrary(self, swh_storage, objects):
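         # hypothesis generates up to two arbitrary (object_type, object)
         # pairs; route each one to the matching *_add endpoint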
         for (obj_type, obj) in objects:
             obj = obj.to_dict()
             if obj_type == "origin_visit":
                 origin_url = obj.pop("origin")
                 swh_storage.origin_add_one({"url": origin_url})
                 if "visit" in obj:
                     del obj["visit"]
                 swh_storage.origin_visit_add(origin_url, obj["date"], obj["type"])
             elif obj_type == "origin_visit_update":
                 # origin_visit_update objects are internal for now; they have
                 # no dedicated storage endpoint yet, so skip them
                 continue
             else:
                 if obj_type == "content" and obj["status"] == "absent":
                     obj_type = "skipped_content"
                 method = getattr(swh_storage, obj_type + "_add")
                 try:
                     method([obj])
                 except HashCollision:
                     pass
 
 
 @pytest.mark.db
 class TestLocalStorage:
     """Test the local storage"""
 
     # This test is only relevant on the local storage, with an actual
     # objstorage raising an exception
     def test_content_add_objstorage_exception(self, swh_storage):
         swh_storage.objstorage.content_add = Mock(
             side_effect=Exception("mocked broken objstorage")
         )
 
         with pytest.raises(Exception) as e:
             swh_storage.content_add([data.cont])
 
         assert e.value.args == ("mocked broken objstorage",)
         missing = list(swh_storage.content_missing([data.cont]))
         assert missing == [data.cont["sha1"]]
 
 
 @pytest.mark.db
 class TestStorageRaceConditions:
     @pytest.mark.xfail
     def test_content_add_race(self, swh_storage):
 
         results = queue.Queue()
 
         def thread():
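             # both threads add the same content in their own transaction,
             # so the two inserts can race with each other (hence the xfail)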
             try:
                 with db_transaction(swh_storage) as (db, cur):
                     ret = swh_storage.content_add([data.cont], db=db, cur=cur)
                 results.put((threading.get_ident(), "data", ret))
             except Exception as e:
                 results.put((threading.get_ident(), "exc", e))
 
         t1 = threading.Thread(target=thread)
         t2 = threading.Thread(target=thread)
         t1.start()
         # this avoids the race condition
         # import time
         # time.sleep(1)
         t2.start()
         t1.join()
         t2.join()
 
         r1 = results.get(block=False)
         r2 = results.get(block=False)
 
         with pytest.raises(queue.Empty):
             results.get(block=False)
         assert r1[0] != r2[0]
         assert r1[1] == "data", "Got exception %r in Thread%s" % (r1[2], r1[0])
         assert r2[1] == "data", "Got exception %r in Thread%s" % (r2[2], r2[0])
 
 
 @pytest.mark.db
 class TestPgStorage:
     """This class is dedicated for the rare case where the schema needs to
        be altered dynamically.
 
        Otherwise, the tests could be blocking when ran altogether.
 
     """
 
     def test_content_update_with_new_cols(self, swh_storage):
         swh_storage.journal_writer.journal = None  # TODO, not supported
 
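         # temporarily add two columns to the content table; they are dropped
         # again at the end of this test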
         with db_transaction(swh_storage) as (_, cur):
             cur.execute(
                 """alter table content
                            add column test text default null,
                            add column test2 text default null"""
             )
 
         cont = copy.deepcopy(data.cont2)
         swh_storage.content_add([cont])
         cont["test"] = "value-1"
         cont["test2"] = "value-2"
 
         swh_storage.content_update([cont], keys=["test", "test2"])
         with db_transaction(swh_storage) as (_, cur):
             cur.execute(
                 """SELECT sha1, sha1_git, sha256, length, status,
                    test, test2
                    FROM content WHERE sha1 = %s""",
                 (cont["sha1"],),
             )
 
             datum = cur.fetchone()
 
         assert datum == (
             cont["sha1"],
             cont["sha1_git"],
             cont["sha256"],
             cont["length"],
             "visible",
             cont["test"],
             cont["test2"],
         )
 
         with db_transaction(swh_storage) as (_, cur):
             cur.execute(
                 """alter table content drop column test,
                                                drop column test2"""
             )
 
     def test_content_add_db(self, swh_storage):
         cont = data.cont
 
         actual_result = swh_storage.content_add([cont])
 
         assert actual_result == {
             "content:add": 1,
             "content:add:bytes": cont["length"],
         }
 
         if hasattr(swh_storage, "objstorage"):
             assert cont["sha1"] in swh_storage.objstorage.objstorage
 
         with db_transaction(swh_storage) as (_, cur):
             cur.execute(
                 "SELECT sha1, sha1_git, sha256, length, status"
                 " FROM content WHERE sha1 = %s",
                 (cont["sha1"],),
             )
             datum = cur.fetchone()
 
         assert datum == (
             cont["sha1"],
             cont["sha1_git"],
             cont["sha256"],
             cont["length"],
             "visible",
         )
 
         expected_cont = cont.copy()
         del expected_cont["data"]
         contents = [
             obj
             for (obj_type, obj) in swh_storage.journal_writer.journal.objects
             if obj_type == "content"
         ]
         assert len(contents) == 1
         for obj in contents:
             obj_d = obj.to_dict()
             del obj_d["ctime"]
             assert obj_d == expected_cont
 
     def test_content_add_metadata_db(self, swh_storage):
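         # content_add_metadata stores metadata only: strip the data blob and
         # provide an explicit ctime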
         cont = data.cont
         del cont["data"]
         cont["ctime"] = now()
 
         actual_result = swh_storage.content_add_metadata([cont])
 
         assert actual_result == {
             "content:add": 1,
         }
 
         if hasattr(swh_storage, "objstorage"):
             assert cont["sha1"] not in swh_storage.objstorage.objstorage
         with db_transaction(swh_storage) as (_, cur):
             cur.execute(
                 "SELECT sha1, sha1_git, sha256, length, status"
                 " FROM content WHERE sha1 = %s",
                 (cont["sha1"],),
             )
             datum = cur.fetchone()
         assert datum == (
             cont["sha1"],
             cont["sha1_git"],
             cont["sha256"],
             cont["length"],
             "visible",
         )
 
         contents = [
             obj
             for (obj_type, obj) in swh_storage.journal_writer.journal.objects
             if obj_type == "content"
         ]
         assert len(contents) == 1
         for obj in contents:
             obj_d = obj.to_dict()
             assert obj_d == cont
 
     def test_skipped_content_add_db(self, swh_storage):
         cont = data.skipped_cont
         cont2 = data.skipped_cont2
         cont2["blake2s256"] = None
 
         actual_result = swh_storage.skipped_content_add([cont, cont, cont2])
 
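         # cont is submitted twice; depending on when the duplicate is
         # filtered out it may be counted once or twice, hence 2 to 3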
         assert 2 <= actual_result.pop("skipped_content:add") <= 3
         assert actual_result == {}
 
         with db_transaction(swh_storage) as (_, cur):
             cur.execute(
                 "SELECT sha1, sha1_git, sha256, blake2s256, "
                 "length, status, reason "
                 "FROM skipped_content ORDER BY sha1_git"
             )
 
             dbdata = cur.fetchall()
 
         assert len(dbdata) == 2
         assert dbdata[0] == (
             cont["sha1"],
             cont["sha1_git"],
             cont["sha256"],
             cont["blake2s256"],
             cont["length"],
             "absent",
             "Content too long",
         )
 
         assert dbdata[1] == (
             cont2["sha1"],
             cont2["sha1_git"],
             cont2["sha256"],
             cont2["blake2s256"],
             cont2["length"],
             "absent",
             "Content too long",
         )
 
     def test_clear_buffers(self, swh_storage):
         """Calling clear buffers on real storage does nothing
 
         """
         assert swh_storage.clear_buffers() is None
 
     def test_flush(self, swh_storage):
         """Calling clear buffers on real storage does nothing
 
         """
         assert swh_storage.flush() == {}