diff --git a/swh/indexer/storage/__init__.py b/swh/indexer/storage/__init__.py
index 422ed19..8d902ce 100644
--- a/swh/indexer/storage/__init__.py
+++ b/swh/indexer/storage/__init__.py
@@ -1,697 +1,723 @@
 # Copyright (C) 2015-2020  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from collections import Counter
 from importlib import import_module
 import json
 from typing import Dict, Iterable, List, Optional, Tuple, Union
 import warnings
 
 import psycopg2
 import psycopg2.pool
 
 from swh.core.db.common import db_transaction
 from swh.indexer.storage.interface import IndexerStorageInterface
 from swh.model.hashutil import hash_to_bytes, hash_to_hex
 from swh.model.model import SHA1_SIZE
 from swh.storage.exc import StorageDBError
 from swh.storage.utils import get_partition_bounds_bytes
 
 from . import converters
 from .db import Db
 from .exc import DuplicateId, IndexerStorageArgumentException
 from .interface import PagedResult, Sha1
 from .metrics import process_metrics, send_metric, timed
 from .model import (
     ContentCtagsRow,
     ContentLanguageRow,
     ContentLicenseRow,
     ContentMetadataRow,
     ContentMimetypeRow,
     OriginIntrinsicMetadataRow,
     RevisionIntrinsicMetadataRow,
 )
+from .writer import JournalWriter
 
 INDEXER_CFG_KEY = "indexer_storage"
 
 
 MAPPING_NAMES = ["codemeta", "gemspec", "maven", "npm", "pkg-info"]
 
 
 SERVER_IMPLEMENTATIONS: Dict[str, str] = {
     "local": ".IndexerStorage",
     "remote": ".api.client.RemoteStorage",
     "memory": ".in_memory.IndexerStorage",
 }
 
 
 def get_indexer_storage(cls: str, **kwargs) -> IndexerStorageInterface:
     """Instantiate an indexer storage implementation of class `cls` with arguments
     `kwargs`.
 
     Args:
         cls: indexer storage class (local, remote or memory)
         kwargs: dictionary of arguments passed to the
             indexer storage class constructor
 
     Returns:
         an instance of swh.indexer.storage
 
     Raises:
         ValueError if passed an unknown storage class.
 
     """
     if "args" in kwargs:
         warnings.warn(
             'Explicit "args" key is deprecated, use keys directly instead.',
             DeprecationWarning,
         )
         kwargs = kwargs["args"]
 
     class_path = SERVER_IMPLEMENTATIONS.get(cls)
     if class_path is None:
         raise ValueError(
             f"Unknown indexer storage class `{cls}`. "
             f"Supported: {', '.join(SERVER_IMPLEMENTATIONS)}"
         )
 
     (module_path, class_name) = class_path.rsplit(".", 1)
     module = import_module(module_path if module_path else ".", package=__package__)
     BackendClass = getattr(module, class_name)
     check_config = kwargs.pop("check_config", {})
     idx_storage = BackendClass(**kwargs)
     if check_config:
         if not idx_storage.check_config(**check_config):
             raise EnvironmentError("Indexer storage check config failed")
     return idx_storage
 
 
 def check_id_duplicates(data):
     """
     If any two row models in `data` have the same unique key, raises
     a `ValueError`.
 
     Values associated to the key must be hashable.
 
     Args:
         data (List[dict]): List of dictionaries to be inserted
 
     >>> check_id_duplicates([
     ...     ContentLanguageRow(id=b'foo', indexer_configuration_id=42, lang="python"),
     ...     ContentLanguageRow(id=b'foo', indexer_configuration_id=32, lang="python"),
     ... ])
     >>> check_id_duplicates([
     ...     ContentLanguageRow(id=b'foo', indexer_configuration_id=42, lang="python"),
     ...     ContentLanguageRow(id=b'foo', indexer_configuration_id=42, lang="python"),
     ... ])
     Traceback (most recent call last):
       ...
     swh.indexer.storage.exc.DuplicateId: [{'id': b'foo', 'indexer_configuration_id': 42}]
     """  # noqa
     counter = Counter(tuple(sorted(item.unique_key().items())) for item in data)
     duplicates = [id_ for (id_, count) in counter.items() if count >= 2]
     if duplicates:
         raise DuplicateId(list(map(dict, duplicates)))
 
 
 class IndexerStorage:
     """SWH Indexer Storage
 
     """
 
-    def __init__(self, db, min_pool_conns=1, max_pool_conns=10):
+    def __init__(self, db, min_pool_conns=1, max_pool_conns=10, journal_writer=None):
         """
         Args:
-            db_conn: either a libpq connection string, or a psycopg2 connection
+            db: either a libpq connection string, or a psycopg2 connection
+            journal_writer: configuration passed to
+                            `swh.journal.writer.get_journal_writer`
 
         """
+        self.journal_writer = JournalWriter(self._tool_get_from_id, journal_writer)
         try:
             if isinstance(db, psycopg2.extensions.connection):
                 self._pool = None
                 self._db = Db(db)
             else:
                 self._pool = psycopg2.pool.ThreadedConnectionPool(
                     min_pool_conns, max_pool_conns, db
                 )
                 self._db = None
         except psycopg2.OperationalError as e:
             raise StorageDBError(e)
 
     def get_db(self):
         if self._db:
             return self._db
         return Db.from_pool(self._pool)
 
     def put_db(self, db):
         if db is not self._db:
             db.put_conn()
 
     @timed
     @db_transaction()
     def check_config(self, *, check_write, db=None, cur=None):
         # Check permissions on one of the tables
         if check_write:
             check = "INSERT"
         else:
             check = "SELECT"
 
         cur.execute(
             "select has_table_privilege(current_user, 'content_mimetype', %s)",  # noqa
             (check,),
         )
         return cur.fetchone()[0]
 
     @timed
     @db_transaction()
     def content_mimetype_missing(
         self, mimetypes: Iterable[Dict], db=None, cur=None
     ) -> List[Tuple[Sha1, int]]:
         return [obj[0] for obj in db.content_mimetype_missing_from_list(mimetypes, cur)]
 
     @timed
     @db_transaction()
     def get_partition(
         self,
         indexer_type: str,
         indexer_configuration_id: int,
         partition_id: int,
         nb_partitions: int,
         page_token: Optional[str] = None,
         limit: int = 1000,
         with_textual_data=False,
         db=None,
         cur=None,
     ) -> PagedResult[Sha1]:
         """Retrieve ids of content with `indexer_type` within within partition partition_id
         bound by limit.
 
         Args:
             **indexer_type**: Type of data content to index (mimetype, language, etc...)
             **indexer_configuration_id**: The tool used to index data
             **partition_id**: index of the partition to fetch
             **nb_partitions**: total number of partitions to split into
             **page_token**: opaque token used for pagination
             **limit**: Limit result (default to 1000)
             **with_textual_data** (bool): Deal with only textual content (True) or all
                 content (all contents by defaults, False)
 
         Raises:
             IndexerStorageArgumentException for;
             - limit to None
             - wrong indexer_type provided
 
         Returns:
             PagedResult of Sha1. If next_page_token is None, there is no more data to
             fetch
 
         """
         if limit is None:
             raise IndexerStorageArgumentException("limit should not be None")
         if indexer_type not in db.content_indexer_names:
             err = f"Wrong type. Should be one of [{','.join(db.content_indexer_names)}]"
             raise IndexerStorageArgumentException(err)
 
         start, end = get_partition_bounds_bytes(partition_id, nb_partitions, SHA1_SIZE)
         if page_token is not None:
             start = hash_to_bytes(page_token)
         if end is None:
             end = b"\xff" * SHA1_SIZE
 
         next_page_token: Optional[str] = None
         ids = [
             row[0]
             for row in db.content_get_range(
                 indexer_type,
                 start,
                 end,
                 indexer_configuration_id,
                 limit=limit + 1,
                 with_textual_data=with_textual_data,
                 cur=cur,
             )
         ]
 
         if len(ids) >= limit:
             next_page_token = hash_to_hex(ids[-1])
             ids = ids[:limit]
 
         assert len(ids) <= limit
         return PagedResult(results=ids, next_page_token=next_page_token)
 
     @timed
     @db_transaction()
     def content_mimetype_get_partition(
         self,
         indexer_configuration_id: int,
         partition_id: int,
         nb_partitions: int,
         page_token: Optional[str] = None,
         limit: int = 1000,
         db=None,
         cur=None,
     ) -> PagedResult[Sha1]:
         return self.get_partition(
             "mimetype",
             indexer_configuration_id,
             partition_id,
             nb_partitions,
             page_token=page_token,
             limit=limit,
             db=db,
             cur=cur,
         )
 
     @timed
     @process_metrics
     @db_transaction()
     def content_mimetype_add(
         self, mimetypes: List[ContentMimetypeRow], db=None, cur=None,
     ) -> Dict[str, int]:
         check_id_duplicates(mimetypes)
         mimetypes.sort(key=lambda m: m.id)
+        self.journal_writer.write_additions("content_mimetype", mimetypes)
         db.mktemp_content_mimetype(cur)
         db.copy_to(
             [m.to_dict() for m in mimetypes],
             "tmp_content_mimetype",
             ["id", "mimetype", "encoding", "indexer_configuration_id"],
             cur,
         )
         count = db.content_mimetype_add_from_temp(cur)
         return {"content_mimetype:add": count}
 
     @timed
     @db_transaction()
     def content_mimetype_get(
         self, ids: Iterable[Sha1], db=None, cur=None
     ) -> List[ContentMimetypeRow]:
         return [
             ContentMimetypeRow.from_dict(
                 converters.db_to_mimetype(dict(zip(db.content_mimetype_cols, c)))
             )
             for c in db.content_mimetype_get_from_list(ids, cur)
         ]
 
     @timed
     @db_transaction()
     def content_language_missing(
         self, languages: Iterable[Dict], db=None, cur=None
     ) -> List[Tuple[Sha1, int]]:
         return [obj[0] for obj in db.content_language_missing_from_list(languages, cur)]
 
     @timed
     @db_transaction()
     def content_language_get(
         self, ids: Iterable[Sha1], db=None, cur=None
     ) -> List[ContentLanguageRow]:
         return [
             ContentLanguageRow.from_dict(
                 converters.db_to_language(dict(zip(db.content_language_cols, c)))
             )
             for c in db.content_language_get_from_list(ids, cur)
         ]
 
     @timed
     @process_metrics
     @db_transaction()
     def content_language_add(
         self, languages: List[ContentLanguageRow], db=None, cur=None,
     ) -> Dict[str, int]:
         check_id_duplicates(languages)
         languages.sort(key=lambda m: m.id)
+        self.journal_writer.write_additions("content_language", languages)
         db.mktemp_content_language(cur)
         # empty language is mapped to 'unknown'
         db.copy_to(
             (
                 {
                     "id": lang.id,
                     "lang": lang.lang or "unknown",
                     "indexer_configuration_id": lang.indexer_configuration_id,
                 }
                 for lang in languages
             ),
             "tmp_content_language",
             ["id", "lang", "indexer_configuration_id"],
             cur,
         )
 
         count = db.content_language_add_from_temp(cur)
         return {"content_language:add": count}
 
     @timed
     @db_transaction()
     def content_ctags_missing(
         self, ctags: Iterable[Dict], db=None, cur=None
     ) -> List[Tuple[Sha1, int]]:
         return [obj[0] for obj in db.content_ctags_missing_from_list(ctags, cur)]
 
     @timed
     @db_transaction()
     def content_ctags_get(
         self, ids: Iterable[Sha1], db=None, cur=None
     ) -> List[ContentCtagsRow]:
         return [
             ContentCtagsRow.from_dict(
                 converters.db_to_ctags(dict(zip(db.content_ctags_cols, c)))
             )
             for c in db.content_ctags_get_from_list(ids, cur)
         ]
 
     @timed
     @process_metrics
     @db_transaction()
     def content_ctags_add(
         self, ctags: List[ContentCtagsRow], db=None, cur=None,
     ) -> Dict[str, int]:
         check_id_duplicates(ctags)
         ctags.sort(key=lambda m: m.id)
+        self.journal_writer.write_additions("content_ctags", ctags)
 
         db.mktemp_content_ctags(cur)
         db.copy_to(
             [ctag.to_dict() for ctag in ctags],
             tblname="tmp_content_ctags",
             columns=["id", "name", "kind", "line", "lang", "indexer_configuration_id"],
             cur=cur,
         )
 
         count = db.content_ctags_add_from_temp(cur)
         return {"content_ctags:add": count}
 
     @timed
     @db_transaction()
     def content_ctags_search(
         self,
         expression: str,
         limit: int = 10,
         last_sha1: Optional[Sha1] = None,
         db=None,
         cur=None,
     ) -> List[ContentCtagsRow]:
         return [
             ContentCtagsRow.from_dict(
                 converters.db_to_ctags(dict(zip(db.content_ctags_cols, obj)))
             )
             for obj in db.content_ctags_search(expression, last_sha1, limit, cur=cur)
         ]
 
     @timed
     @db_transaction()
     def content_fossology_license_get(
         self, ids: Iterable[Sha1], db=None, cur=None
     ) -> List[ContentLicenseRow]:
         return [
             ContentLicenseRow.from_dict(
                 converters.db_to_fossology_license(
                     dict(zip(db.content_fossology_license_cols, c))
                 )
             )
             for c in db.content_fossology_license_get_from_list(ids, cur)
         ]
 
     @timed
     @process_metrics
     @db_transaction()
     def content_fossology_license_add(
         self, licenses: List[ContentLicenseRow], db=None, cur=None,
     ) -> Dict[str, int]:
         check_id_duplicates(licenses)
         licenses.sort(key=lambda m: m.id)
+        self.journal_writer.write_additions("content_fossology_license", licenses)
         db.mktemp_content_fossology_license(cur)
         db.copy_to(
             [license.to_dict() for license in licenses],
             tblname="tmp_content_fossology_license",
             columns=["id", "license", "indexer_configuration_id"],
             cur=cur,
         )
         count = db.content_fossology_license_add_from_temp(cur)
         return {"content_fossology_license:add": count}
 
     @timed
     @db_transaction()
     def content_fossology_license_get_partition(
         self,
         indexer_configuration_id: int,
         partition_id: int,
         nb_partitions: int,
         page_token: Optional[str] = None,
         limit: int = 1000,
         db=None,
         cur=None,
     ) -> PagedResult[Sha1]:
         return self.get_partition(
             "fossology_license",
             indexer_configuration_id,
             partition_id,
             nb_partitions,
             page_token=page_token,
             limit=limit,
             with_textual_data=True,
             db=db,
             cur=cur,
         )
 
     @timed
     @db_transaction()
     def content_metadata_missing(
         self, metadata: Iterable[Dict], db=None, cur=None
     ) -> List[Tuple[Sha1, int]]:
         return [obj[0] for obj in db.content_metadata_missing_from_list(metadata, cur)]
 
     @timed
     @db_transaction()
     def content_metadata_get(
         self, ids: Iterable[Sha1], db=None, cur=None
     ) -> List[ContentMetadataRow]:
         return [
             ContentMetadataRow.from_dict(
                 converters.db_to_metadata(dict(zip(db.content_metadata_cols, c)))
             )
             for c in db.content_metadata_get_from_list(ids, cur)
         ]
 
     @timed
     @process_metrics
     @db_transaction()
     def content_metadata_add(
         self, metadata: List[ContentMetadataRow], db=None, cur=None,
     ) -> Dict[str, int]:
         check_id_duplicates(metadata)
         metadata.sort(key=lambda m: m.id)
+        self.journal_writer.write_additions("content_metadata", metadata)
 
         db.mktemp_content_metadata(cur)
 
         db.copy_to(
             [m.to_dict() for m in metadata],
             "tmp_content_metadata",
             ["id", "metadata", "indexer_configuration_id"],
             cur,
         )
         count = db.content_metadata_add_from_temp(cur)
         return {
             "content_metadata:add": count,
         }
 
     @timed
     @db_transaction()
     def revision_intrinsic_metadata_missing(
         self, metadata: Iterable[Dict], db=None, cur=None
     ) -> List[Tuple[Sha1, int]]:
         return [
             obj[0]
             for obj in db.revision_intrinsic_metadata_missing_from_list(metadata, cur)
         ]
 
     @timed
     @db_transaction()
     def revision_intrinsic_metadata_get(
         self, ids: Iterable[Sha1], db=None, cur=None
     ) -> List[RevisionIntrinsicMetadataRow]:
         return [
             RevisionIntrinsicMetadataRow.from_dict(
                 converters.db_to_metadata(
                     dict(zip(db.revision_intrinsic_metadata_cols, c))
                 )
             )
             for c in db.revision_intrinsic_metadata_get_from_list(ids, cur)
         ]
 
     @timed
     @process_metrics
     @db_transaction()
     def revision_intrinsic_metadata_add(
         self, metadata: List[RevisionIntrinsicMetadataRow], db=None, cur=None,
     ) -> Dict[str, int]:
         check_id_duplicates(metadata)
         metadata.sort(key=lambda m: m.id)
+        self.journal_writer.write_additions("revision_intrinsic_metadata", metadata)
 
         db.mktemp_revision_intrinsic_metadata(cur)
 
         db.copy_to(
             [m.to_dict() for m in metadata],
             "tmp_revision_intrinsic_metadata",
             ["id", "metadata", "mappings", "indexer_configuration_id"],
             cur,
         )
         count = db.revision_intrinsic_metadata_add_from_temp(cur)
         return {
             "revision_intrinsic_metadata:add": count,
         }
 
     @timed
     @db_transaction()
     def origin_intrinsic_metadata_get(
         self, urls: Iterable[str], db=None, cur=None
     ) -> List[OriginIntrinsicMetadataRow]:
         return [
             OriginIntrinsicMetadataRow.from_dict(
                 converters.db_to_metadata(
                     dict(zip(db.origin_intrinsic_metadata_cols, c))
                 )
             )
             for c in db.origin_intrinsic_metadata_get_from_list(urls, cur)
         ]
 
     @timed
     @process_metrics
     @db_transaction()
     def origin_intrinsic_metadata_add(
         self, metadata: List[OriginIntrinsicMetadataRow], db=None, cur=None,
     ) -> Dict[str, int]:
         check_id_duplicates(metadata)
         metadata.sort(key=lambda m: m.id)
+        self.journal_writer.write_additions("origin_intrinsic_metadata", metadata)
 
         db.mktemp_origin_intrinsic_metadata(cur)
 
         db.copy_to(
             [m.to_dict() for m in metadata],
             "tmp_origin_intrinsic_metadata",
             ["id", "metadata", "indexer_configuration_id", "from_revision", "mappings"],
             cur,
         )
         count = db.origin_intrinsic_metadata_add_from_temp(cur)
         return {
             "origin_intrinsic_metadata:add": count,
         }
 
     @timed
     @db_transaction()
     def origin_intrinsic_metadata_search_fulltext(
         self, conjunction: List[str], limit: int = 100, db=None, cur=None
     ) -> List[OriginIntrinsicMetadataRow]:
         return [
             OriginIntrinsicMetadataRow.from_dict(
                 converters.db_to_metadata(
                     dict(zip(db.origin_intrinsic_metadata_cols, c))
                 )
             )
             for c in db.origin_intrinsic_metadata_search_fulltext(
                 conjunction, limit=limit, cur=cur
             )
         ]
 
     @timed
     @db_transaction()
     def origin_intrinsic_metadata_search_by_producer(
         self,
         page_token: str = "",
         limit: int = 100,
         ids_only: bool = False,
         mappings: Optional[List[str]] = None,
         tool_ids: Optional[List[int]] = None,
         db=None,
         cur=None,
     ) -> PagedResult[Union[str, OriginIntrinsicMetadataRow]]:
         assert isinstance(page_token, str)
         # we go to limit+1 to check whether we should add next_page_token in
         # the response
         rows = db.origin_intrinsic_metadata_search_by_producer(
             page_token, limit + 1, ids_only, mappings, tool_ids, cur
         )
         next_page_token = None
         if ids_only:
             results = [origin for (origin,) in rows]
             if len(results) > limit:
                 results[limit:] = []
                 next_page_token = results[-1]
         else:
             results = [
                 OriginIntrinsicMetadataRow.from_dict(
                     converters.db_to_metadata(
                         dict(zip(db.origin_intrinsic_metadata_cols, row))
                     )
                 )
                 for row in rows
             ]
             if len(results) > limit:
                 results[limit:] = []
                 next_page_token = results[-1].id
 
         return PagedResult(results=results, next_page_token=next_page_token,)
 
     @timed
     @db_transaction()
     def origin_intrinsic_metadata_stats(self, db=None, cur=None):
         mapping_names = [m for m in MAPPING_NAMES]
         select_parts = []
 
         # Count rows for each mapping
         for mapping_name in mapping_names:
             select_parts.append(
                 (
                     "sum(case when (mappings @> ARRAY['%s']) "
                     "         then 1 else 0 end)"
                 )
                 % mapping_name
             )
 
         # Total
         select_parts.append("sum(1)")
 
         # Rows whose metadata has at least one key that is not '@context'
         select_parts.append(
             "sum(case when ('{}'::jsonb @> (metadata - '@context')) "
             "         then 0 else 1 end)"
         )
         cur.execute(
             "select " + ", ".join(select_parts) + " from origin_intrinsic_metadata"
         )
         results = dict(zip(mapping_names + ["total", "non_empty"], cur.fetchone()))
         return {
             "total": results.pop("total"),
             "non_empty": results.pop("non_empty"),
             "per_mapping": results,
         }
 
     @timed
     @db_transaction()
     def indexer_configuration_add(self, tools, db=None, cur=None):
         db.mktemp_indexer_configuration(cur)
         db.copy_to(
             tools,
             "tmp_indexer_configuration",
             ["tool_name", "tool_version", "tool_configuration"],
             cur,
         )
 
         tools = db.indexer_configuration_add_from_temp(cur)
         results = [dict(zip(db.indexer_configuration_cols, line)) for line in tools]
         send_metric(
             "indexer_configuration:add",
             len(results),
             method_name="indexer_configuration_add",
         )
         return results
 
     @timed
     @db_transaction()
     def indexer_configuration_get(self, tool, db=None, cur=None):
         tool_conf = tool["tool_configuration"]
         if isinstance(tool_conf, dict):
             tool_conf = json.dumps(tool_conf)
         idx = db.indexer_configuration_get(
             tool["tool_name"], tool["tool_version"], tool_conf
         )
         if not idx:
             return None
         return dict(zip(db.indexer_configuration_cols, idx))
+
+    @db_transaction()
+    def _tool_get_from_id(self, id_, db=None, cur=None):
+        tool = dict(
+            zip(
+                db.indexer_configuration_cols,
+                db.indexer_configuration_get_from_id(id_, cur),
+            )
+        )
+        return {
+            "id": tool["id"],
+            "name": tool["tool_name"],
+            "version": tool["tool_version"],
+            "configuration": tool["tool_configuration"],
+        }
diff --git a/swh/indexer/storage/db.py b/swh/indexer/storage/db.py
index b2cb63b..ec2e084 100644
--- a/swh/indexer/storage/db.py
+++ b/swh/indexer/storage/db.py
@@ -1,538 +1,550 @@
 # Copyright (C) 2015-2018  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from typing import Dict, Iterable, Iterator, List
 
 from swh.core.db import BaseDb
 from swh.core.db.db_utils import execute_values_generator, stored_procedure
 from swh.model import hashutil
 
 from .interface import Sha1
 
 
 class Db(BaseDb):
     """Proxy to the SWH Indexer DB, with wrappers around stored procedures
 
     """
 
     content_mimetype_hash_keys = ["id", "indexer_configuration_id"]
 
     def _missing_from_list(
         self, table: str, data: Iterable[Dict], hash_keys: List[str], cur=None
     ):
         """Read from table the data with hash_keys that are missing.
 
         Args:
             table: Table name (e.g content_mimetype, content_language,
               etc...)
             data: Dict of data to read from
             hash_keys: List of keys to read in the data dict.
 
         Yields:
             The data which is missing from the db.
 
         """
         cur = self._cursor(cur)
         keys = ", ".join(hash_keys)
         equality = " AND ".join(("t.%s = c.%s" % (key, key)) for key in hash_keys)
         yield from execute_values_generator(
             cur,
             """
             select %s from (values %%s) as t(%s)
             where not exists (
                 select 1 from %s c
                 where %s
             )
             """
             % (keys, keys, table, equality),
             (tuple(m[k] for k in hash_keys) for m in data),
         )
 
     def content_mimetype_missing_from_list(
         self, mimetypes: Iterable[Dict], cur=None
     ) -> Iterator[Sha1]:
         """List missing mimetypes.
 
         """
         yield from self._missing_from_list(
             "content_mimetype", mimetypes, self.content_mimetype_hash_keys, cur=cur
         )
 
     content_mimetype_cols = [
         "id",
         "mimetype",
         "encoding",
         "tool_id",
         "tool_name",
         "tool_version",
         "tool_configuration",
     ]
 
     @stored_procedure("swh_mktemp_content_mimetype")
     def mktemp_content_mimetype(self, cur=None):
         pass
 
     def content_mimetype_add_from_temp(self, cur=None):
         cur = self._cursor(cur)
         cur.execute("select * from swh_content_mimetype_add()")
         return cur.fetchone()[0]
 
     def _convert_key(self, key, main_table="c"):
         """Convert keys according to specific use in the module.
 
         Args:
             key (str): Key expression to change according to the alias
               used in the query
             main_table (str): Alias to use for the main table. Default
               to c for content_{something}.
 
         Expected:
             Tables content_{something} being aliased as 'c' (something
             in {language, mimetype, ...}), table indexer_configuration
             being aliased as 'i'.
 
         """
         if key == "id":
             return "%s.id" % main_table
         elif key == "tool_id":
             return "i.id as tool_id"
         elif key == "license":
             return (
                 """
                 (
                     select name
                     from fossology_license
                     where id = %s.license_id
                 )
                 as licenses"""
                 % main_table
             )
         return key
 
     def _get_from_list(self, table, ids, cols, cur=None, id_col="id"):
         """Fetches entries from the `table` such that their `id` field
         (or whatever is given to `id_col`) is in `ids`.
         Returns the columns `cols`.
         The `cur` parameter is used to connect to the database.
         """
         cur = self._cursor(cur)
         keys = map(self._convert_key, cols)
         query = """
             select {keys}
             from (values %s) as t(id)
             inner join {table} c
                 on c.{id_col}=t.id
             inner join indexer_configuration i
                 on c.indexer_configuration_id=i.id;
             """.format(
             keys=", ".join(keys), id_col=id_col, table=table
         )
         yield from execute_values_generator(cur, query, ((_id,) for _id in ids))
 
     content_indexer_names = {
         "mimetype": "content_mimetype",
         "fossology_license": "content_fossology_license",
     }
 
     def content_get_range(
         self,
         content_type,
         start,
         end,
         indexer_configuration_id,
         limit=1000,
         with_textual_data=False,
         cur=None,
     ):
         """Retrieve contents with content_type, within range [start, end]
            bound by limit and associated to the given indexer
            configuration id.
 
            When asking to work on textual content, that filters on the
            mimetype table with any mimetype that is not binary.
 
         """
         cur = self._cursor(cur)
         table = self.content_indexer_names[content_type]
         if with_textual_data:
             extra = """inner join content_mimetype cm
                          on (t.id=cm.id and cm.mimetype like 'text/%%' and
                              %(start)s <= cm.id and cm.id <= %(end)s)
                     """
         else:
             extra = ""
         query = f"""select t.id
                     from {table} t
                     {extra}
                     where t.indexer_configuration_id=%(tool_id)s
                           and %(start)s <= t.id and t.id <= %(end)s
                     order by t.indexer_configuration_id, t.id
                     limit %(limit)s"""
         cur.execute(
             query,
             {
                 "start": start,
                 "end": end,
                 "tool_id": indexer_configuration_id,
                 "limit": limit,
             },
         )
         yield from cur
 
     def content_mimetype_get_from_list(self, ids, cur=None):
         yield from self._get_from_list(
             "content_mimetype", ids, self.content_mimetype_cols, cur=cur
         )
 
     content_language_hash_keys = ["id", "indexer_configuration_id"]
 
     def content_language_missing_from_list(self, languages, cur=None):
         """List missing languages.
 
         """
         yield from self._missing_from_list(
             "content_language", languages, self.content_language_hash_keys, cur=cur
         )
 
     content_language_cols = [
         "id",
         "lang",
         "tool_id",
         "tool_name",
         "tool_version",
         "tool_configuration",
     ]
 
     @stored_procedure("swh_mktemp_content_language")
     def mktemp_content_language(self, cur=None):
         pass
 
     def content_language_add_from_temp(self, cur=None):
         cur = self._cursor(cur)
         cur.execute("select * from swh_content_language_add()")
         return cur.fetchone()[0]
 
     def content_language_get_from_list(self, ids, cur=None):
         yield from self._get_from_list(
             "content_language", ids, self.content_language_cols, cur=cur
         )
 
     content_ctags_hash_keys = ["id", "indexer_configuration_id"]
 
     def content_ctags_missing_from_list(self, ctags, cur=None):
         """List missing ctags.
 
         """
         yield from self._missing_from_list(
             "content_ctags", ctags, self.content_ctags_hash_keys, cur=cur
         )
 
     content_ctags_cols = [
         "id",
         "name",
         "kind",
         "line",
         "lang",
         "tool_id",
         "tool_name",
         "tool_version",
         "tool_configuration",
     ]
 
     @stored_procedure("swh_mktemp_content_ctags")
     def mktemp_content_ctags(self, cur=None):
         pass
 
     def content_ctags_add_from_temp(self, cur=None):
         cur = self._cursor(cur)
         cur.execute("select * from swh_content_ctags_add()")
         return cur.fetchone()[0]
 
     def content_ctags_get_from_list(self, ids, cur=None):
         cur = self._cursor(cur)
         keys = map(self._convert_key, self.content_ctags_cols)
         yield from execute_values_generator(
             cur,
             """
             select %s
             from (values %%s) as t(id)
             inner join content_ctags c
                 on c.id=t.id
             inner join indexer_configuration i
                 on c.indexer_configuration_id=i.id
             order by line
             """
             % ", ".join(keys),
             ((_id,) for _id in ids),
         )
 
     def content_ctags_search(self, expression, last_sha1, limit, cur=None):
         cur = self._cursor(cur)
         if not last_sha1:
             query = """SELECT %s
                        FROM swh_content_ctags_search(%%s, %%s)""" % (
                 ",".join(self.content_ctags_cols)
             )
             cur.execute(query, (expression, limit))
         else:
             if last_sha1 and isinstance(last_sha1, bytes):
                 last_sha1 = "\\x%s" % hashutil.hash_to_hex(last_sha1)
             elif last_sha1:
                 last_sha1 = "\\x%s" % last_sha1
 
             query = """SELECT %s
                        FROM swh_content_ctags_search(%%s, %%s, %%s)""" % (
                 ",".join(self.content_ctags_cols)
             )
             cur.execute(query, (expression, limit, last_sha1))
 
         yield from cur
 
     content_fossology_license_cols = [
         "id",
         "tool_id",
         "tool_name",
         "tool_version",
         "tool_configuration",
         "license",
     ]
 
     @stored_procedure("swh_mktemp_content_fossology_license")
     def mktemp_content_fossology_license(self, cur=None):
         pass
 
     def content_fossology_license_add_from_temp(self, cur=None):
         """Add new licenses per content.
 
         """
         cur = self._cursor(cur)
         cur.execute("select * from swh_content_fossology_license_add()")
         return cur.fetchone()[0]
 
     def content_fossology_license_get_from_list(self, ids, cur=None):
         """Retrieve licenses per id.
 
         """
         cur = self._cursor(cur)
         keys = map(self._convert_key, self.content_fossology_license_cols)
         yield from execute_values_generator(
             cur,
             """
             select %s
             from (values %%s) as t(id)
             inner join content_fossology_license c on t.id=c.id
             inner join indexer_configuration i
                 on i.id=c.indexer_configuration_id
             """
             % ", ".join(keys),
             ((_id,) for _id in ids),
         )
 
     content_metadata_hash_keys = ["id", "indexer_configuration_id"]
 
     def content_metadata_missing_from_list(self, metadata, cur=None):
         """List missing metadata.
 
         """
         yield from self._missing_from_list(
             "content_metadata", metadata, self.content_metadata_hash_keys, cur=cur
         )
 
     content_metadata_cols = [
         "id",
         "metadata",
         "tool_id",
         "tool_name",
         "tool_version",
         "tool_configuration",
     ]
 
     @stored_procedure("swh_mktemp_content_metadata")
     def mktemp_content_metadata(self, cur=None):
         pass
 
     def content_metadata_add_from_temp(self, cur=None):
         cur = self._cursor(cur)
         cur.execute("select * from swh_content_metadata_add()")
         return cur.fetchone()[0]
 
     def content_metadata_get_from_list(self, ids, cur=None):
         yield from self._get_from_list(
             "content_metadata", ids, self.content_metadata_cols, cur=cur
         )
 
     revision_intrinsic_metadata_hash_keys = ["id", "indexer_configuration_id"]
 
     def revision_intrinsic_metadata_missing_from_list(self, metadata, cur=None):
         """List missing metadata.
 
         """
         yield from self._missing_from_list(
             "revision_intrinsic_metadata",
             metadata,
             self.revision_intrinsic_metadata_hash_keys,
             cur=cur,
         )
 
     revision_intrinsic_metadata_cols = [
         "id",
         "metadata",
         "mappings",
         "tool_id",
         "tool_name",
         "tool_version",
         "tool_configuration",
     ]
 
     @stored_procedure("swh_mktemp_revision_intrinsic_metadata")
     def mktemp_revision_intrinsic_metadata(self, cur=None):
         pass
 
     def revision_intrinsic_metadata_add_from_temp(self, cur=None):
         cur = self._cursor(cur)
         cur.execute("select * from swh_revision_intrinsic_metadata_add()")
         return cur.fetchone()[0]
 
     def revision_intrinsic_metadata_get_from_list(self, ids, cur=None):
         yield from self._get_from_list(
             "revision_intrinsic_metadata",
             ids,
             self.revision_intrinsic_metadata_cols,
             cur=cur,
         )
 
     origin_intrinsic_metadata_cols = [
         "id",
         "metadata",
         "from_revision",
         "mappings",
         "tool_id",
         "tool_name",
         "tool_version",
         "tool_configuration",
     ]
 
     origin_intrinsic_metadata_regconfig = "pg_catalog.simple"
     """The dictionary used to normalize 'metadata' and queries.
     'pg_catalog.simple' provides no stopword, so it should be suitable
     for proper names and non-English content.
     When updating this value, make sure to add a new index on
     origin_intrinsic_metadata.metadata."""
 
     @stored_procedure("swh_mktemp_origin_intrinsic_metadata")
     def mktemp_origin_intrinsic_metadata(self, cur=None):
         pass
 
     def origin_intrinsic_metadata_add_from_temp(self, cur=None):
         cur = self._cursor(cur)
         cur.execute("select * from swh_origin_intrinsic_metadata_add()")
         return cur.fetchone()[0]
 
     def origin_intrinsic_metadata_get_from_list(self, ids, cur=None):
         yield from self._get_from_list(
             "origin_intrinsic_metadata",
             ids,
             self.origin_intrinsic_metadata_cols,
             cur=cur,
             id_col="id",
         )
 
     def origin_intrinsic_metadata_search_fulltext(self, terms, *, limit, cur):
         regconfig = self.origin_intrinsic_metadata_regconfig
         tsquery_template = " && ".join(
             "plainto_tsquery('%s', %%s)" % regconfig for _ in terms
         )
         tsquery_args = [(term,) for term in terms]
         keys = (
             self._convert_key(col, "oim") for col in self.origin_intrinsic_metadata_cols
         )
 
         query = (
             "SELECT {keys} FROM origin_intrinsic_metadata AS oim "
             "INNER JOIN indexer_configuration AS i "
             "ON oim.indexer_configuration_id=i.id "
             "JOIN LATERAL (SELECT {tsquery_template}) AS s(tsq) ON true "
             "WHERE oim.metadata_tsvector @@ tsq "
             "ORDER BY ts_rank(oim.metadata_tsvector, tsq, 1) DESC "
             "LIMIT %s;"
         ).format(keys=", ".join(keys), tsquery_template=tsquery_template)
         cur.execute(query, tsquery_args + [limit])
         yield from cur
 
     def origin_intrinsic_metadata_search_by_producer(
         self, last, limit, ids_only, mappings, tool_ids, cur
     ):
         if ids_only:
             keys = "oim.id"
         else:
             keys = ", ".join(
                 (
                     self._convert_key(col, "oim")
                     for col in self.origin_intrinsic_metadata_cols
                 )
             )
         query_parts = [
             "SELECT %s" % keys,
             "FROM origin_intrinsic_metadata AS oim",
             "INNER JOIN indexer_configuration AS i",
             "ON oim.indexer_configuration_id=i.id",
         ]
         args = []
 
         where = []
         if last:
             where.append("oim.id > %s")
             args.append(last)
         if mappings is not None:
             where.append("oim.mappings && %s")
             args.append(mappings)
         if tool_ids is not None:
             where.append("oim.indexer_configuration_id = ANY(%s)")
             args.append(tool_ids)
         if where:
             query_parts.append("WHERE")
             query_parts.append(" AND ".join(where))
 
         if limit:
             query_parts.append("LIMIT %s")
             args.append(limit)
 
         cur.execute(" ".join(query_parts), args)
         yield from cur
 
     indexer_configuration_cols = [
         "id",
         "tool_name",
         "tool_version",
         "tool_configuration",
     ]
 
     @stored_procedure("swh_mktemp_indexer_configuration")
     def mktemp_indexer_configuration(self, cur=None):
         pass
 
     def indexer_configuration_add_from_temp(self, cur=None):
         cur = self._cursor(cur)
         cur.execute(
             "SELECT %s from swh_indexer_configuration_add()"
             % (",".join(self.indexer_configuration_cols),)
         )
         yield from cur
 
     def indexer_configuration_get(
         self, tool_name, tool_version, tool_configuration, cur=None
     ):
         cur = self._cursor(cur)
         cur.execute(
             """select %s
                        from indexer_configuration
                        where tool_name=%%s and
                              tool_version=%%s and
                              tool_configuration=%%s"""
             % (",".join(self.indexer_configuration_cols)),
             (tool_name, tool_version, tool_configuration),
         )
 
         return cur.fetchone()
+
+    def indexer_configuration_get_from_id(self, id_, cur=None):
+        cur = self._cursor(cur)
+        cur.execute(
+            """select %s
+                       from indexer_configuration
+                       where id=%%s"""
+            % (",".join(self.indexer_configuration_cols)),
+            (id_,),
+        )
+
+        return cur.fetchone()
diff --git a/swh/indexer/storage/in_memory.py b/swh/indexer/storage/in_memory.py
index 792535d..0071ebe 100644
--- a/swh/indexer/storage/in_memory.py
+++ b/swh/indexer/storage/in_memory.py
@@ -1,486 +1,501 @@
 # Copyright (C) 2018-2020  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from collections import Counter, defaultdict
 import itertools
 import json
 import math
 import operator
 import re
 from typing import (
     Any,
     Dict,
     Generic,
     Iterable,
     List,
     Optional,
     Set,
     Tuple,
     Type,
     TypeVar,
     Union,
 )
 
 from swh.core.collections import SortedList
 from swh.model.hashutil import hash_to_bytes, hash_to_hex
 from swh.model.model import SHA1_SIZE, Sha1Git
 from swh.storage.utils import get_partition_bounds_bytes
 
 from . import MAPPING_NAMES, check_id_duplicates
 from .exc import IndexerStorageArgumentException
 from .interface import PagedResult, Sha1
 from .model import (
     BaseRow,
     ContentCtagsRow,
     ContentLanguageRow,
     ContentLicenseRow,
     ContentMetadataRow,
     ContentMimetypeRow,
     OriginIntrinsicMetadataRow,
     RevisionIntrinsicMetadataRow,
 )
+from .writer import JournalWriter
 
 SHA1_DIGEST_SIZE = 160
 
+ToolId = int
+
 
 def _transform_tool(tool):
     return {
         "id": tool["id"],
         "name": tool["tool_name"],
         "version": tool["tool_version"],
         "configuration": tool["tool_configuration"],
     }
 
 
 def check_id_types(data: List[Dict[str, Any]]):
     """Checks all elements of the list have an 'id' whose type is 'bytes'."""
     if not all(isinstance(item.get("id"), bytes) for item in data):
         raise IndexerStorageArgumentException("identifiers must be bytes.")
 
 
 def _key_from_dict(d):
     return tuple(sorted(d.items()))
 
 
-ToolId = int
 TValue = TypeVar("TValue", bound=BaseRow)
 
 
 class SubStorage(Generic[TValue]):
     """Implements common missing/get/add logic for each indexer type."""
 
     _data: Dict[Sha1, Dict[Tuple, Dict[str, Any]]]
     _tools_per_id: Dict[Sha1, Set[ToolId]]
 
-    def __init__(self, row_class: Type[TValue], tools):
+    def __init__(self, row_class: Type[TValue], tools, journal_writer):
         self.row_class = row_class
         self._tools = tools
         self._sorted_ids = SortedList[bytes, Sha1]()
         self._data = defaultdict(dict)
+        self._journal_writer = journal_writer
         self._tools_per_id = defaultdict(set)
 
     def _key_from_dict(self, d) -> Tuple:
         """Like the global _key_from_dict, but filters out dict keys that don't
         belong in the unique key."""
         return _key_from_dict({k: d[k] for k in self.row_class.UNIQUE_KEY_FIELDS})
 
     def missing(self, keys: Iterable[Dict]) -> List[Sha1]:
         """List data missing from storage.
 
         Args:
             data (iterable): dictionaries with keys:
 
                 - **id** (bytes): sha1 identifier
                 - **indexer_configuration_id** (int): tool used to compute
                   the results
 
         Yields:
             missing sha1s
 
         """
         results = []
         for key in keys:
             tool_id = key["indexer_configuration_id"]
             id_ = key["id"]
             if tool_id not in self._tools_per_id.get(id_, set()):
                 results.append(id_)
         return results
 
     def get(self, ids: Iterable[Sha1]) -> List[TValue]:
         """Retrieve data per id.
 
         Args:
             ids (iterable): sha1 checksums
 
         Yields:
             dict: dictionaries with the following keys:
 
               - **id** (bytes)
               - **tool** (dict): tool used to compute metadata
               - arbitrary data (as provided to `add`)
 
         """
         results = []
         for id_ in ids:
             for entry in self._data[id_].values():
                 entry = entry.copy()
                 tool_id = entry.pop("indexer_configuration_id")
                 results.append(
                     self.row_class(
                         id=id_, tool=_transform_tool(self._tools[tool_id]), **entry,
                     )
                 )
         return results
 
     def get_all(self) -> List[TValue]:
         return self.get(self._sorted_ids)
 
     def get_partition(
         self,
         indexer_configuration_id: int,
         partition_id: int,
         nb_partitions: int,
         page_token: Optional[str] = None,
         limit: int = 1000,
     ) -> PagedResult[Sha1]:
         """Retrieve ids of content with `indexer_type` within partition partition_id
         bound by limit.
 
         Args:
             **indexer_type**: Type of data content to index (mimetype, language, etc...)
             **indexer_configuration_id**: The tool used to index data
             **partition_id**: index of the partition to fetch
             **nb_partitions**: total number of partitions to split into
             **page_token**: opaque token used for pagination
             **limit**: Limit result (default to 1000)
             **with_textual_data** (bool): Deal with only textual content (True) or all
                 content (all contents by defaults, False)
 
         Raises:
             IndexerStorageArgumentException for;
             - limit to None
             - wrong indexer_type provided
 
         Returns:
             PagedResult of Sha1. If next_page_token is None, there is no more data to
             fetch
 
         """
         if limit is None:
             raise IndexerStorageArgumentException("limit should not be None")
         (start, end) = get_partition_bounds_bytes(
             partition_id, nb_partitions, SHA1_SIZE
         )
 
         if page_token:
             start = hash_to_bytes(page_token)
         if end is None:
             end = b"\xff" * SHA1_SIZE
 
         next_page_token: Optional[str] = None
         ids: List[Sha1] = []
         sha1s = (sha1 for sha1 in self._sorted_ids.iter_from(start))
         for counter, sha1 in enumerate(sha1s):
             if sha1 > end:
                 break
             if counter >= limit:
                 next_page_token = hash_to_hex(sha1)
                 break
             ids.append(sha1)
 
         assert len(ids) <= limit
         return PagedResult(results=ids, next_page_token=next_page_token)
 
     def add(self, data: Iterable[TValue]) -> int:
         """Add data not present in storage.
 
         Args:
             data (iterable): dictionaries with keys:
 
               - **id**: sha1
               - **indexer_configuration_id**: tool used to compute the
                 results
               - arbitrary data
 
         """
         data = list(data)
         check_id_duplicates(data)
+        object_type = self.row_class.object_type  # type: ignore
+        self._journal_writer.write_additions(object_type, data)
         count = 0
         for obj in data:
             item = obj.to_dict()
             id_ = item.pop("id")
             tool_id = item["indexer_configuration_id"]
             key = _key_from_dict(obj.unique_key())
             self._data[id_][key] = item
             self._tools_per_id[id_].add(tool_id)
             count += 1
             if id_ not in self._sorted_ids:
                 self._sorted_ids.add(id_)
         return count
 
 
 class IndexerStorage:
     """In-memory SWH indexer storage."""
 
-    def __init__(self):
+    def __init__(self, journal_writer=None):
         self._tools = {}
-        self._mimetypes = SubStorage(ContentMimetypeRow, self._tools)
-        self._languages = SubStorage(ContentLanguageRow, self._tools)
-        self._content_ctags = SubStorage(ContentCtagsRow, self._tools)
-        self._licenses = SubStorage(ContentLicenseRow, self._tools)
-        self._content_metadata = SubStorage(ContentMetadataRow, self._tools)
+
+        def tool_getter(id_):
+            tool = self._tools[id_]
+            return {
+                "id": tool["id"],
+                "name": tool["tool_name"],
+                "version": tool["tool_version"],
+                "configuration": tool["tool_configuration"],
+            }
+
+        self.journal_writer = JournalWriter(tool_getter, journal_writer)
+        args = (self._tools, self.journal_writer)
+        self._mimetypes = SubStorage(ContentMimetypeRow, *args)
+        self._languages = SubStorage(ContentLanguageRow, *args)
+        self._content_ctags = SubStorage(ContentCtagsRow, *args)
+        self._licenses = SubStorage(ContentLicenseRow, *args)
+        self._content_metadata = SubStorage(ContentMetadataRow, *args)
         self._revision_intrinsic_metadata = SubStorage(
-            RevisionIntrinsicMetadataRow, self._tools
-        )
-        self._origin_intrinsic_metadata = SubStorage(
-            OriginIntrinsicMetadataRow, self._tools
+            RevisionIntrinsicMetadataRow, *args
         )
+        self._origin_intrinsic_metadata = SubStorage(OriginIntrinsicMetadataRow, *args)
 
     def check_config(self, *, check_write):
         return True
 
     def content_mimetype_missing(
         self, mimetypes: Iterable[Dict]
     ) -> List[Tuple[Sha1, int]]:
         return self._mimetypes.missing(mimetypes)
 
     def content_mimetype_get_partition(
         self,
         indexer_configuration_id: int,
         partition_id: int,
         nb_partitions: int,
         page_token: Optional[str] = None,
         limit: int = 1000,
     ) -> PagedResult[Sha1]:
         return self._mimetypes.get_partition(
             indexer_configuration_id, partition_id, nb_partitions, page_token, limit
         )
 
     def content_mimetype_add(
         self, mimetypes: List[ContentMimetypeRow]
     ) -> Dict[str, int]:
         added = self._mimetypes.add(mimetypes)
         return {"content_mimetype:add": added}
 
     def content_mimetype_get(self, ids: Iterable[Sha1]) -> List[ContentMimetypeRow]:
         return self._mimetypes.get(ids)
 
     def content_language_missing(
         self, languages: Iterable[Dict]
     ) -> List[Tuple[Sha1, int]]:
         return self._languages.missing(languages)
 
     def content_language_get(self, ids: Iterable[Sha1]) -> List[ContentLanguageRow]:
         return self._languages.get(ids)
 
     def content_language_add(
         self, languages: List[ContentLanguageRow]
     ) -> Dict[str, int]:
         added = self._languages.add(languages)
         return {"content_language:add": added}
 
     def content_ctags_missing(self, ctags: Iterable[Dict]) -> List[Tuple[Sha1, int]]:
         return self._content_ctags.missing(ctags)
 
     def content_ctags_get(self, ids: Iterable[Sha1]) -> List[ContentCtagsRow]:
         return self._content_ctags.get(ids)
 
     def content_ctags_add(self, ctags: List[ContentCtagsRow]) -> Dict[str, int]:
         added = self._content_ctags.add(ctags)
         return {"content_ctags:add": added}
 
     def content_ctags_search(
         self, expression: str, limit: int = 10, last_sha1: Optional[Sha1] = None
     ) -> List[ContentCtagsRow]:
         nb_matches = 0
         items_per_id: Dict[Tuple[Sha1Git, ToolId], List[ContentCtagsRow]] = {}
         for item in sorted(self._content_ctags.get_all()):
             if item.id <= (last_sha1 or bytes(0 for _ in range(SHA1_DIGEST_SIZE))):
                 continue
             items_per_id.setdefault(
                 (item.id, item.indexer_configuration_id), []
             ).append(item)
 
         results = []
         for items in items_per_id.values():
             for item in items:
                 if item.name != expression:
                     continue
                 nb_matches += 1
                 if nb_matches > limit:
                     break
                 results.append(item)
 
         return results
 
     def content_fossology_license_get(
         self, ids: Iterable[Sha1]
     ) -> List[ContentLicenseRow]:
         return self._licenses.get(ids)
 
     def content_fossology_license_add(
         self, licenses: List[ContentLicenseRow]
     ) -> Dict[str, int]:
         added = self._licenses.add(licenses)
         return {"content_fossology_license:add": added}
 
     def content_fossology_license_get_partition(
         self,
         indexer_configuration_id: int,
         partition_id: int,
         nb_partitions: int,
         page_token: Optional[str] = None,
         limit: int = 1000,
     ) -> PagedResult[Sha1]:
         return self._licenses.get_partition(
             indexer_configuration_id, partition_id, nb_partitions, page_token, limit
         )
 
     def content_metadata_missing(
         self, metadata: Iterable[Dict]
     ) -> List[Tuple[Sha1, int]]:
         return self._content_metadata.missing(metadata)
 
     def content_metadata_get(self, ids: Iterable[Sha1]) -> List[ContentMetadataRow]:
         return self._content_metadata.get(ids)
 
     def content_metadata_add(
         self, metadata: List[ContentMetadataRow]
     ) -> Dict[str, int]:
         added = self._content_metadata.add(metadata)
         return {"content_metadata:add": added}
 
     def revision_intrinsic_metadata_missing(
         self, metadata: Iterable[Dict]
     ) -> List[Tuple[Sha1, int]]:
         return self._revision_intrinsic_metadata.missing(metadata)
 
     def revision_intrinsic_metadata_get(
         self, ids: Iterable[Sha1]
     ) -> List[RevisionIntrinsicMetadataRow]:
         return self._revision_intrinsic_metadata.get(ids)
 
     def revision_intrinsic_metadata_add(
         self, metadata: List[RevisionIntrinsicMetadataRow]
     ) -> Dict[str, int]:
         added = self._revision_intrinsic_metadata.add(metadata)
         return {"revision_intrinsic_metadata:add": added}
 
     def origin_intrinsic_metadata_get(
         self, urls: Iterable[str]
     ) -> List[OriginIntrinsicMetadataRow]:
         return self._origin_intrinsic_metadata.get(urls)
 
     def origin_intrinsic_metadata_add(
         self, metadata: List[OriginIntrinsicMetadataRow]
     ) -> Dict[str, int]:
         added = self._origin_intrinsic_metadata.add(metadata)
         return {"origin_intrinsic_metadata:add": added}
 
     def origin_intrinsic_metadata_search_fulltext(
         self, conjunction: List[str], limit: int = 100
     ) -> List[OriginIntrinsicMetadataRow]:
         # A very crude fulltext search implementation, but that's enough
         # to work on English metadata
         tokens_re = re.compile("[a-zA-Z0-9]+")
         search_tokens = list(itertools.chain(*map(tokens_re.findall, conjunction)))
 
         def rank(data):
             # Tokenize the metadata
             text = json.dumps(data.metadata)
             text_tokens = tokens_re.findall(text)
             text_token_occurences = Counter(text_tokens)
 
             # Count the number of occurrences of search tokens in the text
             score = 0
             for search_token in search_tokens:
                 if text_token_occurences[search_token] == 0:
                     # Search token is not in the text.
                     return 0
                 score += text_token_occurences[search_token]
 
             # Normalize according to the text's length
             return score / math.log(len(text_tokens))
 
         results = [
             (rank(data), data) for data in self._origin_intrinsic_metadata.get_all()
         ]
         results = [(rank_, data) for (rank_, data) in results if rank_ > 0]
         results.sort(
             key=operator.itemgetter(0), reverse=True  # Don't try to order 'data'
         )
         return [result for (rank_, result) in results[:limit]]
 
     def origin_intrinsic_metadata_search_by_producer(
         self,
         page_token: str = "",
         limit: int = 100,
         ids_only: bool = False,
         mappings: Optional[List[str]] = None,
         tool_ids: Optional[List[int]] = None,
     ) -> PagedResult[Union[str, OriginIntrinsicMetadataRow]]:
         assert isinstance(page_token, str)
         nb_results = 0
         if mappings is not None:
             mapping_set = frozenset(mappings)
         if tool_ids is not None:
             tool_id_set = frozenset(tool_ids)
         rows = []
 
         # we go to limit+1 to check whether we should add next_page_token in
         # the response
         for entry in self._origin_intrinsic_metadata.get_all():
             if entry.id <= page_token:
                 continue
             if nb_results >= (limit + 1):
                 break
             if mappings and mapping_set.isdisjoint(entry.mappings):
                 continue
             if tool_ids and entry.tool["id"] not in tool_id_set:
                 continue
             rows.append(entry)
             nb_results += 1
 
         if len(rows) > limit:
             rows = rows[:limit]
             next_page_token = rows[-1].id
         else:
             next_page_token = None
         if ids_only:
             rows = [row.id for row in rows]
         return PagedResult(results=rows, next_page_token=next_page_token,)
 
     def origin_intrinsic_metadata_stats(self):
         mapping_count = {m: 0 for m in MAPPING_NAMES}
         total = non_empty = 0
         for data in self._origin_intrinsic_metadata.get_all():
             total += 1
             if set(data.metadata) - {"@context"}:
                 non_empty += 1
             for mapping in data.mappings:
                 mapping_count[mapping] += 1
         return {"per_mapping": mapping_count, "total": total, "non_empty": non_empty}
 
     def indexer_configuration_add(self, tools):
         inserted = []
         for tool in tools:
             tool = tool.copy()
             id_ = self._tool_key(tool)
             tool["id"] = id_
             self._tools[id_] = tool
             inserted.append(tool)
         return inserted
 
     def indexer_configuration_get(self, tool):
         return self._tools.get(self._tool_key(tool))
 
     def _tool_key(self, tool):
         return hash(
             (
                 tool["tool_name"],
                 tool["tool_version"],
                 json.dumps(tool["tool_configuration"], sort_keys=True),
             )
         )
diff --git a/swh/indexer/storage/model.py b/swh/indexer/storage/model.py
index cc9c954..d14d107 100644
--- a/swh/indexer/storage/model.py
+++ b/swh/indexer/storage/model.py
@@ -1,122 +1,135 @@
 # Copyright (C) 2020  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 """Classes used internally by the in-memory idx-storage, and will be
 used for the interface of the idx-storage in the near future."""
 
 from __future__ import annotations
 
 from typing import Any, Dict, List, Optional, Tuple, Type, TypeVar
 
 import attr
+from typing_extensions import Final
 
 from swh.model.model import Sha1Git, dictify
 
 TSelf = TypeVar("TSelf")
 
 
 @attr.s
 class BaseRow:
     UNIQUE_KEY_FIELDS: Tuple = ("id", "indexer_configuration_id")
 
     id = attr.ib(type=Any)
     indexer_configuration_id = attr.ib(type=Optional[int], default=None, kw_only=True)
     tool = attr.ib(type=Optional[Dict], default=None, kw_only=True)
 
     def __attrs_post_init__(self):
         if self.indexer_configuration_id is None and self.tool is None:
             raise TypeError("Either indexer_configuration_id or tool must be not None.")
         if self.indexer_configuration_id is not None and self.tool is not None:
             raise TypeError(
                 "indexer_configuration_id and tool are mutually exclusive; "
                 "only one may be not None."
             )
 
     def anonymize(self: TSelf) -> Optional[TSelf]:
         # Needed to implement swh.journal.writer.ValueProtocol
         return None
 
     def to_dict(self) -> Dict[str, Any]:
         """Wrapper of `attr.asdict` that can be overridden by subclasses
         that have special handling of some of the fields."""
         d = dictify(attr.asdict(self, recurse=False))
         if d["indexer_configuration_id"] is None:
             del d["indexer_configuration_id"]
         if d["tool"] is None:
             del d["tool"]
 
         return d
 
     @classmethod
     def from_dict(cls: Type[TSelf], d) -> TSelf:
         return cls(**d)  # type: ignore
 
     def unique_key(self) -> Dict:
         if self.indexer_configuration_id is None:
             raise ValueError(
                 "Can only call unique_key() on objects without "
                 "indexer_configuration_id."
             )
         return {key: getattr(self, key) for key in self.UNIQUE_KEY_FIELDS}
 
 
 @attr.s
 class ContentMimetypeRow(BaseRow):
+    object_type: Final = "content_mimetype"
+
     id = attr.ib(type=Sha1Git)
     mimetype = attr.ib(type=str)
     encoding = attr.ib(type=str)
 
 
 @attr.s
 class ContentLanguageRow(BaseRow):
+    object_type: Final = "content_language"
+
     id = attr.ib(type=Sha1Git)
     lang = attr.ib(type=str)
 
 
 @attr.s
 class ContentCtagsRow(BaseRow):
+    object_type: Final = "content_ctags"
     UNIQUE_KEY_FIELDS = (
         "id",
         "indexer_configuration_id",
         "name",
         "kind",
         "line",
         "lang",
     )
 
     id = attr.ib(type=Sha1Git)
     name = attr.ib(type=str)
     kind = attr.ib(type=str)
     line = attr.ib(type=int)
     lang = attr.ib(type=str)
 
 
 @attr.s
 class ContentLicenseRow(BaseRow):
+    object_type: Final = "content_fossology_license"
     UNIQUE_KEY_FIELDS = ("id", "indexer_configuration_id", "license")
 
     id = attr.ib(type=Sha1Git)
     license = attr.ib(type=str)
 
 
 @attr.s
 class ContentMetadataRow(BaseRow):
+    object_type: Final = "content_metadata"
+
     id = attr.ib(type=Sha1Git)
     metadata = attr.ib(type=Dict[str, Any])
 
 
 @attr.s
 class RevisionIntrinsicMetadataRow(BaseRow):
+    object_type: Final = "revision_intrinsic_metadata"
+
     id = attr.ib(type=Sha1Git)
     metadata = attr.ib(type=Dict[str, Any])
     mappings = attr.ib(type=List[str])
 
 
 @attr.s
 class OriginIntrinsicMetadataRow(BaseRow):
+    object_type: Final = "origin_intrinsic_metadata"
+
     id = attr.ib(type=str)
     metadata = attr.ib(type=Dict[str, Any])
     from_revision = attr.ib(type=Sha1Git)
     mappings = attr.ib(type=List[str])
diff --git a/swh/indexer/storage/writer.py b/swh/indexer/storage/writer.py
new file mode 100644
index 0000000..adae76d
--- /dev/null
+++ b/swh/indexer/storage/writer.py
@@ -0,0 +1,64 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Any, Callable, Dict, Iterable
+
+import attr
+
+try:
+    from swh.journal.writer import get_journal_writer
+except ImportError:
+    get_journal_writer = None  # type: ignore
+    # mypy limitation, see https://github.com/python/mypy/issues/1153
+
+from .model import BaseRow
+
+
+class JournalWriter:
+    """Journal writer storage collaborator. It's in charge of adding objects to
+    the journal.
+
+    """
+
+    def __init__(self, tool_getter: Callable[[int], Dict[str, Any]], journal_writer):
+        """
+        Args:
+            tool_getter: a callable that takes a tool_id and return a dict representing
+                         a tool object
+            journal_writer: configuration passed to
+                            `swh.journal.writer.get_journal_writer`
+        """
+        self._tool_getter = tool_getter
+        if journal_writer:
+            if get_journal_writer is None:
+                raise EnvironmentError(
+                    "You need the swh.journal package to use the "
+                    "journal_writer feature"
+                )
+            self.journal = get_journal_writer(**journal_writer)
+        else:
+            self.journal = None
+
+    def write_additions(self, obj_type, entries: Iterable[BaseRow]) -> None:
+        if not self.journal:
+            return
+
+        # usually all the additions in a batch are from the same indexer,
+        # so this cache reduces tool lookups to one query per distinct tool.
+        tool_cache = {}
+
+        for entry in entries:
+            assert entry.object_type == obj_type  # type: ignore
+            # get the tool used to generate this addition
+            tool_id = entry.indexer_configuration_id
+            assert tool_id
+            if tool_id not in tool_cache:
+                tool_cache[tool_id] = self._tool_getter(tool_id)
+            entry = attr.evolve(
+                entry, tool=tool_cache[tool_id], indexer_configuration_id=None
+            )
+
+            # write to kafka
+            self.journal.write_addition(obj_type, entry)
diff --git a/swh/indexer/tests/storage/conftest.py b/swh/indexer/tests/storage/conftest.py
index 80a013f..133404b 100644
--- a/swh/indexer/tests/storage/conftest.py
+++ b/swh/indexer/tests/storage/conftest.py
@@ -1,76 +1,80 @@
 # Copyright (C) 2015-2019  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from os.path import join
 
 import pytest
 
 from swh.indexer.storage import get_indexer_storage
 from swh.indexer.storage.model import ContentLicenseRow, ContentMimetypeRow
 from swh.model.hashutil import hash_to_bytes
 from swh.storage.pytest_plugin import postgresql_fact
 
 from . import SQL_DIR
 from .generate_data_test import FOSSOLOGY_LICENSES, MIMETYPE_OBJECTS, TOOLS
 
 DUMP_FILES = join(SQL_DIR, "*.sql")
 
 
 class DataObj(dict):
     def __getattr__(self, key):
         return self.__getitem__(key)
 
     def __setattr__(self, key, value):
         return self.__setitem__(key, value)
 
 
 @pytest.fixture
 def swh_indexer_storage_with_data(swh_indexer_storage):
     data = DataObj()
     tools = {
         tool["tool_name"]: {
             "id": tool["id"],
             "name": tool["tool_name"],
             "version": tool["tool_version"],
             "configuration": tool["tool_configuration"],
         }
         for tool in swh_indexer_storage.indexer_configuration_add(TOOLS)
     }
     data.tools = tools
     data.sha1_1 = hash_to_bytes("34973274ccef6ab4dfaaf86599792fa9c3fe4689")
     data.sha1_2 = hash_to_bytes("61c2b3a30496d329e21af70dd2d7e097046d07b7")
     data.revision_id_1 = hash_to_bytes("7026b7c1a2af56521e951c01ed20f255fa054238")
     data.revision_id_2 = hash_to_bytes("7026b7c1a2af56521e9587659012345678904321")
     data.revision_id_3 = hash_to_bytes("7026b7c1a2af56521e9587659012345678904320")
     data.origin_url_1 = "file:///dev/0/zero"  # 44434341
     data.origin_url_2 = "file:///dev/1/one"  # 44434342
     data.origin_url_3 = "file:///dev/2/two"  # 54974445
     data.mimetypes = [
         ContentMimetypeRow(indexer_configuration_id=tools["file"]["id"], **mimetype_obj)
         for mimetype_obj in MIMETYPE_OBJECTS
     ]
     swh_indexer_storage.content_mimetype_add(data.mimetypes)
     data.fossology_licenses = [
         ContentLicenseRow(
             id=fossology_obj["id"],
             indexer_configuration_id=tools["nomos"]["id"],
             license=license,
         )
         for fossology_obj in FOSSOLOGY_LICENSES
         for license in fossology_obj["licenses"]
     ]
     swh_indexer_storage._test_data = data
 
     return (swh_indexer_storage, data)
 
 
 swh_indexer_storage_postgresql = postgresql_fact(
     "postgresql_proc", dump_files=DUMP_FILES
 )
 
 
 @pytest.fixture
 def swh_indexer_storage(swh_indexer_storage_postgresql):
-    return get_indexer_storage("local", db=swh_indexer_storage_postgresql.dsn)
+    return get_indexer_storage(
+        "local",
+        db=swh_indexer_storage_postgresql.dsn,
+        journal_writer={"cls": "memory",},
+    )
diff --git a/swh/indexer/tests/storage/test_api_client.py b/swh/indexer/tests/storage/test_api_client.py
index 2769309..993596d 100644
--- a/swh/indexer/tests/storage/test_api_client.py
+++ b/swh/indexer/tests/storage/test_api_client.py
@@ -1,35 +1,54 @@
 # Copyright (C) 2015-2019  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import pytest
 
 from swh.indexer.storage import get_indexer_storage
 from swh.indexer.storage.api.client import RemoteStorage
 import swh.indexer.storage.api.server as server
 
 from .test_storage import *  # noqa
 
 
 @pytest.fixture
-def app(swh_indexer_storage_postgresql):
-    server.storage = get_indexer_storage("local", db=swh_indexer_storage_postgresql.dsn)
-    return server.app
+def app_server(swh_indexer_storage_postgresql):
+    server.storage = get_indexer_storage(
+        "local",
+        db=swh_indexer_storage_postgresql.dsn,
+        journal_writer={"cls": "memory",},
+    )
+    yield server
+
+
+@pytest.fixture
+def app(app_server):
+    return app_server.app
 
 
 @pytest.fixture
 def swh_rpc_client_class():
     # these are needed for the swh_indexer_storage_with_data fixture
     assert hasattr(RemoteStorage, "indexer_configuration_add")
     assert hasattr(RemoteStorage, "content_mimetype_add")
     return RemoteStorage
 
 
 @pytest.fixture
-def swh_indexer_storage(swh_rpc_client, app):
+def swh_indexer_storage(swh_rpc_client, app_server):
     # This version of the swh_storage fixture uses the swh_rpc_client fixture
     # to instantiate a RemoteStorage (see swh_rpc_client_class above) that
     # proxies, via the swh.core RPC mechanism, the local (in memory) storage
     # configured in the app fixture above.
-    return swh_rpc_client
+    #
+    # Also note that, for the sake of making it easier to write
+    # tests, the in-memory journal writer of the server-side (local)
+    # backend storage is attached to the RemoteStorage as its
+    # journal_writer attribute.
+    storage = swh_rpc_client
+
+    journal_writer = getattr(storage, "journal_writer", None)
+    storage.journal_writer = app_server.storage.journal_writer
+    yield storage
+    storage.journal_writer = journal_writer
diff --git a/swh/indexer/tests/storage/test_in_memory.py b/swh/indexer/tests/storage/test_in_memory.py
index 6457891..854d4fd 100644
--- a/swh/indexer/tests/storage/test_in_memory.py
+++ b/swh/indexer/tests/storage/test_in_memory.py
@@ -1,15 +1,15 @@
 # Copyright (C) 2015-2019  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import pytest
 
 from swh.indexer.storage import get_indexer_storage
 
 from .test_storage import *  # noqa
 
 
 @pytest.fixture
 def swh_indexer_storage():
-    return get_indexer_storage("memory")
+    return get_indexer_storage("memory", journal_writer={"cls": "memory",})
diff --git a/swh/indexer/tests/storage/test_storage.py b/swh/indexer/tests/storage/test_storage.py
index cd55a2b..c17c3e8 100644
--- a/swh/indexer/tests/storage/test_storage.py
+++ b/swh/indexer/tests/storage/test_storage.py
@@ -1,1705 +1,1722 @@
 # Copyright (C) 2015-2020  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import math
 import threading
 from typing import Any, Dict, List, Tuple, Type
 
 import attr
 import pytest
 
 from swh.indexer.storage.exc import DuplicateId, IndexerStorageArgumentException
 from swh.indexer.storage.interface import IndexerStorageInterface, PagedResult
 from swh.indexer.storage.model import (
     BaseRow,
     ContentCtagsRow,
     ContentLanguageRow,
     ContentLicenseRow,
     ContentMetadataRow,
     ContentMimetypeRow,
     OriginIntrinsicMetadataRow,
     RevisionIntrinsicMetadataRow,
 )
 from swh.model.hashutil import hash_to_bytes
 
 
 def prepare_mimetypes_from_licenses(
     fossology_licenses: List[ContentLicenseRow],
 ) -> List[ContentMimetypeRow]:
     """Fossology license needs some consistent data in db to run.
 
     """
     mimetypes = []
     for c in fossology_licenses:
         mimetypes.append(
             ContentMimetypeRow(
                 id=c.id,
                 mimetype="text/plain",  # for filtering on textual data to work
                 encoding="utf-8",
                 indexer_configuration_id=c.indexer_configuration_id,
             )
         )
     return mimetypes
 
 
 def endpoint_name(etype: str, ename: str) -> str:
     """Compute the storage's endpoint's name
 
     >>> endpoint_name('content_mimetype', 'add')
     'content_mimetype_add'
     >>> endpoint_name('content_fosso_license', 'delete')
     'content_fosso_license_delete'
 
     """
     return f"{etype}_{ename}"
 
 
 def endpoint(storage, etype: str, ename: str):
     return getattr(storage, endpoint_name(etype, ename))
 
 
 def expected_summary(count: int, etype: str, ename: str = "add") -> Dict[str, int]:
     """Compute the expected summary
 
     The key is determine according to etype and ename
 
         >>> expected_summary(10, 'content_mimetype', 'add')
         {'content_mimetype:add': 10}
         >>> expected_summary(9, 'origin_intrinsic_metadata', 'delete')
         {'origin_intrinsic_metadata:del': 9}
 
     """
     pattern = ename[0:3]
     key = endpoint_name(etype, ename).replace(f"_{ename}", f":{pattern}")
     return {key: count}
 
 
 def test_check_config(swh_indexer_storage) -> None:
     assert swh_indexer_storage.check_config(check_write=True)
     assert swh_indexer_storage.check_config(check_write=False)
 
 
 class StorageETypeTester:
     """Base class for testing a series of common behaviour between a bunch of
     endpoint types supported by an IndexerStorage.
 
     This is supposed to be inherited with the following class attributes:
     - endpoint_type
     - tool_name
     - example_data
 
     See below for example usage.
     """
 
     endpoint_type: str
     tool_name: str
     example_data: List[Dict]
     row_class: Type[BaseRow]
 
     def test_missing(
         self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
     ) -> None:
         storage, data = swh_indexer_storage_with_data
         etype = self.endpoint_type
         tool_id = data.tools[self.tool_name]["id"]
 
         # given 2 (hopefully) unknown objects
         query = [
             {"id": data.sha1_1, "indexer_configuration_id": tool_id,},
             {"id": data.sha1_2, "indexer_configuration_id": tool_id,},
         ]
 
         # we expect these are both returned by the xxx_missing endpoint
         actual_missing = endpoint(storage, etype, "missing")(query)
         assert list(actual_missing) == [
             data.sha1_1,
             data.sha1_2,
         ]
 
         # now, when we add one of them
         summary = endpoint(storage, etype, "add")(
             [
                 self.row_class.from_dict(
                     {
                         "id": data.sha1_2,
                         **self.example_data[0],
                         "indexer_configuration_id": tool_id,
                     }
                 )
             ]
         )
 
         assert summary == expected_summary(1, etype)
 
         # we expect only the other one returned
         actual_missing = endpoint(storage, etype, "missing")(query)
         assert list(actual_missing) == [data.sha1_1]
 
     def test_add__update_in_place_duplicate(
         self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
     ) -> None:
         storage, data = swh_indexer_storage_with_data
         etype = self.endpoint_type
         tool = data.tools[self.tool_name]
 
         data_v1 = {
             "id": data.sha1_2,
             **self.example_data[0],
             "indexer_configuration_id": tool["id"],
         }
 
         # given
         summary = endpoint(storage, etype, "add")([self.row_class.from_dict(data_v1)])
         assert summary == expected_summary(1, etype)  # not added
 
         # when
         actual_data = list(endpoint(storage, etype, "get")([data.sha1_2]))
 
         expected_data_v1 = [
             self.row_class.from_dict(
                 {"id": data.sha1_2, **self.example_data[0], "tool": tool}
             )
         ]
 
         # then
         assert actual_data == expected_data_v1
 
         # given
         data_v2 = data_v1.copy()
         data_v2.update(self.example_data[1])
 
         endpoint(storage, etype, "add")([self.row_class.from_dict(data_v2)])
         assert summary == expected_summary(1, etype)  # modified so counted
 
         actual_data = list(endpoint(storage, etype, "get")([data.sha1_2]))
 
         expected_data_v2 = [
             self.row_class.from_dict(
                 {"id": data.sha1_2, **self.example_data[1], "tool": tool,}
             )
         ]
 
         # data did change as the v2 was used to overwrite v1
         assert actual_data == expected_data_v2
 
     def test_add_deadlock(
         self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
     ) -> None:
         storage, data = swh_indexer_storage_with_data
         etype = self.endpoint_type
         tool = data.tools[self.tool_name]
 
         hashes = [
             hash_to_bytes("34973274ccef6ab4dfaaf86599792fa9c3fe4{:03d}".format(i))
             for i in range(1000)
         ]
 
         data_v1 = [
             self.row_class.from_dict(
                 {
                     "id": hash_,
                     **self.example_data[0],
                     "indexer_configuration_id": tool["id"],
                 }
             )
             for hash_ in hashes
         ]
         data_v2 = [
             self.row_class.from_dict(
                 {
                     "id": hash_,
                     **self.example_data[1],
                     "indexer_configuration_id": tool["id"],
                 }
             )
             for hash_ in hashes
         ]
 
         # Remove one item from each, so that both queries have to succeed for
         # all items to be in the DB.
         data_v2a = data_v2[1:]
         data_v2b = list(reversed(data_v2[0:-1]))
 
         # given
         endpoint(storage, etype, "add")(data_v1)
 
         # when
         actual_data = sorted(
             endpoint(storage, etype, "get")(hashes), key=lambda x: x.id,
         )
 
         expected_data_v1 = [
             self.row_class.from_dict(
                 {"id": hash_, **self.example_data[0], "tool": tool}
             )
             for hash_ in hashes
         ]
 
         # then
         assert actual_data == expected_data_v1
 
         # given
         def f1() -> None:
             endpoint(storage, etype, "add")(data_v2a)
 
         def f2() -> None:
             endpoint(storage, etype, "add")(data_v2b)
 
         t1 = threading.Thread(target=f1)
         t2 = threading.Thread(target=f2)
         t2.start()
         t1.start()
 
         t1.join()
         t2.join()
 
         actual_data = sorted(
             endpoint(storage, etype, "get")(hashes), key=lambda x: x.id,
         )
 
         expected_data_v2 = [
             self.row_class.from_dict(
                 {"id": hash_, **self.example_data[1], "tool": tool}
             )
             for hash_ in hashes
         ]
 
         assert len(actual_data) == len(expected_data_v1) == len(expected_data_v2)
         for (item, expected_item_v1, expected_item_v2) in zip(
             actual_data, expected_data_v1, expected_data_v2
         ):
             assert item in (expected_item_v1, expected_item_v2)
 
     def test_add__duplicate_twice(
         self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
     ) -> None:
         storage, data = swh_indexer_storage_with_data
         etype = self.endpoint_type
         tool = data.tools[self.tool_name]
 
         data_rev1 = self.row_class.from_dict(
             {
                 "id": data.revision_id_2,
                 **self.example_data[0],
                 "indexer_configuration_id": tool["id"],
             }
         )
 
         data_rev2 = self.row_class.from_dict(
             {
                 "id": data.revision_id_2,
                 **self.example_data[1],
                 "indexer_configuration_id": tool["id"],
             }
         )
 
         # when
         summary = endpoint(storage, etype, "add")([data_rev1])
         assert summary == expected_summary(1, etype)
 
         with pytest.raises(DuplicateId):
             endpoint(storage, etype, "add")([data_rev2, data_rev2])
 
         # then
         actual_data = list(
             endpoint(storage, etype, "get")([data.revision_id_2, data.revision_id_1])
         )
 
         expected_data = [
             self.row_class.from_dict(
                 {"id": data.revision_id_2, **self.example_data[0], "tool": tool}
             )
         ]
         assert actual_data == expected_data
 
-    def test_get(
+    def test_add(
         self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
     ) -> None:
         storage, data = swh_indexer_storage_with_data
         etype = self.endpoint_type
         tool = data.tools[self.tool_name]
 
+        # reset the in-memory journal: conftest pre-fills it with mimetype rows
+        storage.journal_writer.journal.objects = []  # type: ignore
+
         query = [data.sha1_2, data.sha1_1]
         data1 = self.row_class.from_dict(
             {
                 "id": data.sha1_2,
                 **self.example_data[0],
                 "indexer_configuration_id": tool["id"],
             }
         )
 
         # when
         summary = endpoint(storage, etype, "add")([data1])
         assert summary == expected_summary(1, etype)
 
         # then
         actual_data = list(endpoint(storage, etype, "get")(query))
 
         # then
         expected_data = [
             self.row_class.from_dict(
                 {"id": data.sha1_2, **self.example_data[0], "tool": tool}
             )
         ]
 
         assert actual_data == expected_data
 
+        journal_objects = storage.journal_writer.journal.objects  # type: ignore
+        actual_journal_data = [
+            obj for (obj_type, obj) in journal_objects if obj_type == self.endpoint_type
+        ]
+        assert list(sorted(actual_journal_data)) == list(sorted(expected_data))
+
 
 class TestIndexerStorageContentMimetypes(StorageETypeTester):
     """Test Indexer Storage content_mimetype related methods
     """
 
     endpoint_type = "content_mimetype"
     tool_name = "file"
     example_data = [
         {"mimetype": "text/plain", "encoding": "utf-8",},
         {"mimetype": "text/html", "encoding": "us-ascii",},
     ]
     row_class = ContentMimetypeRow
 
     def test_generate_content_mimetype_get_partition_failure(
         self, swh_indexer_storage: IndexerStorageInterface
     ) -> None:
         """get_partition call with wrong limit input should fail"""
         storage = swh_indexer_storage
         indexer_configuration_id = 42
         with pytest.raises(
             IndexerStorageArgumentException, match="limit should not be None"
         ):
             storage.content_mimetype_get_partition(
                 indexer_configuration_id, 0, 3, limit=None  # type: ignore
             )
 
     def test_generate_content_mimetype_get_partition_no_limit(
         self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
     ) -> None:
         """get_partition should return result"""
         storage, data = swh_indexer_storage_with_data
         mimetypes = data.mimetypes
 
         expected_ids = set([c.id for c in mimetypes])
         indexer_configuration_id = mimetypes[0].indexer_configuration_id
 
         assert len(mimetypes) == 16
         nb_partitions = 16
 
         actual_ids = []
         for partition_id in range(nb_partitions):
             actual_result = storage.content_mimetype_get_partition(
                 indexer_configuration_id, partition_id, nb_partitions
             )
             assert actual_result.next_page_token is None
             actual_ids.extend(actual_result.results)
 
         assert len(actual_ids) == len(expected_ids)
         for actual_id in actual_ids:
             assert actual_id in expected_ids
 
     def test_generate_content_mimetype_get_partition_full(
         self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
     ) -> None:
         """get_partition for a single partition should return available ids
 
         """
         storage, data = swh_indexer_storage_with_data
         mimetypes = data.mimetypes
         expected_ids = set([c.id for c in mimetypes])
         indexer_configuration_id = mimetypes[0].indexer_configuration_id
 
         actual_result = storage.content_mimetype_get_partition(
             indexer_configuration_id, 0, 1
         )
         assert actual_result.next_page_token is None
         actual_ids = actual_result.results
         assert len(actual_ids) == len(expected_ids)
         for actual_id in actual_ids:
             assert actual_id in expected_ids
 
     def test_generate_content_mimetype_get_partition_empty(
         self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
     ) -> None:
         """get_partition when at least one of the partitions is empty"""
         storage, data = swh_indexer_storage_with_data
         mimetypes = data.mimetypes
         expected_ids = set([c.id for c in mimetypes])
         indexer_configuration_id = mimetypes[0].indexer_configuration_id
 
         # nb_partitions = smallest power of 2 such that at least one of
         # the partitions is empty
         nb_mimetypes = len(mimetypes)
         nb_partitions = 1 << math.floor(math.log2(nb_mimetypes) + 1)
 
         seen_ids = []
 
         for partition_id in range(nb_partitions):
             actual_result = storage.content_mimetype_get_partition(
                 indexer_configuration_id,
                 partition_id,
                 nb_partitions,
                 limit=nb_mimetypes + 1,
             )
 
             for actual_id in actual_result.results:
                 seen_ids.append(actual_id)
 
             # Limit is higher than the max number of results
             assert actual_result.next_page_token is None
 
         assert set(seen_ids) == expected_ids
 
     def test_generate_content_mimetype_get_partition_with_pagination(
         self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
     ) -> None:
         """get_partition should return ids provided with pagination
 
         """
         storage, data = swh_indexer_storage_with_data
         mimetypes = data.mimetypes
         expected_ids = set([c.id for c in mimetypes])
         indexer_configuration_id = mimetypes[0].indexer_configuration_id
 
         nb_partitions = 4
 
         actual_ids = []
         for partition_id in range(nb_partitions):
             next_page_token = None
             while True:
                 actual_result = storage.content_mimetype_get_partition(
                     indexer_configuration_id,
                     partition_id,
                     nb_partitions,
                     limit=2,
                     page_token=next_page_token,
                 )
                 actual_ids.extend(actual_result.results)
                 next_page_token = actual_result.next_page_token
                 if next_page_token is None:
                     break
 
         assert len(set(actual_ids)) == len(set(expected_ids))
         for actual_id in actual_ids:
             assert actual_id in expected_ids
 
 
 class TestIndexerStorageContentLanguage(StorageETypeTester):
     """Test Indexer Storage content_language related methods
     """
 
     endpoint_type = "content_language"
     tool_name = "pygments"
     example_data = [
         {"lang": "haskell",},
         {"lang": "common-lisp",},
     ]
     row_class = ContentLanguageRow
 
 
 class TestIndexerStorageContentCTags(StorageETypeTester):
     """Test Indexer Storage content_ctags related methods
     """
 
     endpoint_type = "content_ctags"
     tool_name = "universal-ctags"
     example_data = [
         {"name": "done", "kind": "variable", "line": 119, "lang": "OCaml",},
         {"name": "done", "kind": "variable", "line": 100, "lang": "Python",},
         {"name": "main", "kind": "function", "line": 119, "lang": "Python",},
     ]
     row_class = ContentCtagsRow
 
     # the following tests are disabled because CTAGS behaves differently
     @pytest.mark.skip
     def test_add__update_in_place_duplicate(self):
         pass
 
     @pytest.mark.skip
     def test_add_deadlock(self):
         pass
 
     def test_content_ctags_search(
         self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
     ) -> None:
         storage, data = swh_indexer_storage_with_data
         # 1. given
         tool = data.tools["universal-ctags"]
         tool_id = tool["id"]
 
         ctags1 = [
             ContentCtagsRow(
                 id=data.sha1_1,
                 indexer_configuration_id=tool_id,
                 **kwargs,  # type: ignore
             )
             for kwargs in [
                 {"name": "hello", "kind": "function", "line": 133, "lang": "Python",},
                 {"name": "counter", "kind": "variable", "line": 119, "lang": "Python",},
                 {"name": "hello", "kind": "variable", "line": 210, "lang": "Python",},
             ]
         ]
         ctags1_with_tool = [
             attr.evolve(ctag, indexer_configuration_id=None, tool=tool)
             for ctag in ctags1
         ]
 
         ctags2 = [
             ContentCtagsRow(
                 id=data.sha1_2,
                 indexer_configuration_id=tool_id,
                 **kwargs,  # type: ignore
             )
             for kwargs in [
                 {"name": "hello", "kind": "variable", "line": 100, "lang": "C",},
                 {"name": "result", "kind": "variable", "line": 120, "lang": "C",},
             ]
         ]
         ctags2_with_tool = [
             attr.evolve(ctag, indexer_configuration_id=None, tool=tool)
             for ctag in ctags2
         ]
 
         storage.content_ctags_add(ctags1 + ctags2)
 
         # 1. when
         actual_ctags = list(storage.content_ctags_search("hello", limit=1))
 
         # 1. then
         assert actual_ctags == [ctags1_with_tool[0]]
 
         # 2. when
         actual_ctags = list(
             storage.content_ctags_search("hello", limit=1, last_sha1=data.sha1_1)
         )
 
         # 2. then
         assert actual_ctags == [ctags2_with_tool[0]]
 
         # 3. when
         actual_ctags = list(storage.content_ctags_search("hello"))
 
         # 3. then
         assert actual_ctags == [
             ctags1_with_tool[0],
             ctags1_with_tool[2],
             ctags2_with_tool[0],
         ]
 
         # 4. when
         actual_ctags = list(storage.content_ctags_search("counter"))
 
         # then
         assert actual_ctags == [ctags1_with_tool[1]]
 
         # 5. when
         actual_ctags = list(storage.content_ctags_search("result", limit=1))
 
         # then
         assert actual_ctags == [ctags2_with_tool[1]]
 
     def test_content_ctags_search_no_result(
         self, swh_indexer_storage: IndexerStorageInterface
     ) -> None:
         storage = swh_indexer_storage
         actual_ctags = list(storage.content_ctags_search("counter"))
 
         assert not actual_ctags
 
     def test_content_ctags_add__add_new_ctags_added(
         self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
     ) -> None:
         storage, data = swh_indexer_storage_with_data
 
         # given
         tool = data.tools["universal-ctags"]
         tool_id = tool["id"]
 
         ctag1 = ContentCtagsRow(
             id=data.sha1_2,
             indexer_configuration_id=tool_id,
             name="done",
             kind="variable",
             line=100,
             lang="Scheme",
         )
         ctag1_with_tool = attr.evolve(ctag1, indexer_configuration_id=None, tool=tool)
 
         # given
         storage.content_ctags_add([ctag1])
         storage.content_ctags_add([ctag1])  # conflict does nothing
 
         # when
         actual_ctags = list(storage.content_ctags_get([data.sha1_2]))
 
         # then
         assert actual_ctags == [ctag1_with_tool]
 
         # given
         ctag2 = ContentCtagsRow(
             id=data.sha1_2,
             indexer_configuration_id=tool_id,
             name="defn",
             kind="function",
             line=120,
             lang="Scheme",
         )
         ctag2_with_tool = attr.evolve(ctag2, indexer_configuration_id=None, tool=tool)
 
         storage.content_ctags_add([ctag2])
 
         actual_ctags = list(storage.content_ctags_get([data.sha1_2]))
 
         assert actual_ctags == [ctag1_with_tool, ctag2_with_tool]
 
     def test_content_ctags_add__update_in_place(
         self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
     ) -> None:
         storage, data = swh_indexer_storage_with_data
         # given
         tool = data.tools["universal-ctags"]
         tool_id = tool["id"]
 
         ctag1 = ContentCtagsRow(
             id=data.sha1_2,
             indexer_configuration_id=tool_id,
             name="done",
             kind="variable",
             line=100,
             lang="Scheme",
         )
         ctag1_with_tool = attr.evolve(ctag1, indexer_configuration_id=None, tool=tool)
 
         # given
         storage.content_ctags_add([ctag1])
 
         # when
         actual_ctags = list(storage.content_ctags_get([data.sha1_2]))
 
         # then
         assert actual_ctags == [ctag1_with_tool]
 
         # given
         ctag2 = ContentCtagsRow(
             id=data.sha1_2,
             indexer_configuration_id=tool_id,
             name="defn",
             kind="function",
             line=120,
             lang="Scheme",
         )
         ctag2_with_tool = attr.evolve(ctag2, indexer_configuration_id=None, tool=tool)
 
         storage.content_ctags_add([ctag1, ctag2])
 
         actual_ctags = list(storage.content_ctags_get([data.sha1_2]))
 
         assert actual_ctags == [ctag1_with_tool, ctag2_with_tool]
 
     def test_add_empty(
         self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
     ) -> None:
         (storage, data) = swh_indexer_storage_with_data
         etype = self.endpoint_type
 
         summary = endpoint(storage, etype, "add")([])
         assert summary == {"content_ctags:add": 0}
 
         actual_ctags = list(endpoint(storage, etype, "get")([data.sha1_2]))
 
         assert actual_ctags == []
 
     def test_get_unknown(
         self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
     ) -> None:
         (storage, data) = swh_indexer_storage_with_data
         etype = self.endpoint_type
 
         actual_ctags = list(endpoint(storage, etype, "get")([data.sha1_2]))
 
         assert actual_ctags == []
 
 
class TestIndexerStorageContentMetadata(StorageETypeTester):
    """Test Indexer Storage content_metadata related methods
    """

    # indexer tool (registered in the test fixtures) attached to example rows
    tool_name = "swh-metadata-detector"
    # prefix of the storage endpoints under test (content_metadata_add/_get);
    # used e.g. by test_add_empty/test_get_unknown via self.endpoint_type
    endpoint_type = "content_metadata"
    # two distinct metadata payloads fed to the endpoints by the base tests
    example_data = [
        {
            "metadata": {
                "other": {},
                "codeRepository": {
                    "type": "git",
                    "url": "https://github.com/moranegg/metadata_test",
                },
                "description": "Simple package.json test for indexer",
                "name": "test_metadata",
                "version": "0.0.1",
            },
        },
        {"metadata": {"other": {}, "name": "test_metadata", "version": "0.0.1"},},
    ]
    # row model class matching the endpoint type
    row_class = ContentMetadataRow
 
 
class TestIndexerStorageRevisionIntrinsicMetadata(StorageETypeTester):
    """Test Indexer Storage revision_intrinsic_metadata related methods
    """

    # indexer tool (registered in the test fixtures) attached to example rows
    tool_name = "swh-metadata-detector"
    # prefix of the storage endpoints under test
    endpoint_type = "revision_intrinsic_metadata"
    # two payloads; unlike content_metadata, rows also carry a `mappings` field
    example_data = [
        {
            "metadata": {
                "other": {},
                "codeRepository": {
                    "type": "git",
                    "url": "https://github.com/moranegg/metadata_test",
                },
                "description": "Simple package.json test for indexer",
                "name": "test_metadata",
                "version": "0.0.1",
            },
            "mappings": ["mapping1"],
        },
        {
            "metadata": {"other": {}, "name": "test_metadata", "version": "0.0.1"},
            "mappings": ["mapping2"],
        },
    ]
    # row model class matching the endpoint type
    row_class = RevisionIntrinsicMetadataRow
 
 
 class TestIndexerStorageContentFossologyLicense(StorageETypeTester):
     endpoint_type = "content_fossology_license"
     tool_name = "nomos"
     example_data = [
         {"license": "Apache-2.0"},
         {"license": "BSD-2-Clause"},
     ]
 
     row_class = ContentLicenseRow
 
     # the following tests are disabled because licenses behaves differently
     @pytest.mark.skip
     def test_add__update_in_place_duplicate(self):
         pass
 
     @pytest.mark.skip
     def test_add_deadlock(self):
         pass
 
     # content_fossology_license_missing does not exist
     @pytest.mark.skip
     def test_missing(self):
         pass
 
     def test_content_fossology_license_add__new_license_added(
         self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
     ) -> None:
         storage, data = swh_indexer_storage_with_data
         # given
         tool = data.tools["nomos"]
         tool_id = tool["id"]
 
         license1 = ContentLicenseRow(
             id=data.sha1_1, license="Apache-2.0", indexer_configuration_id=tool_id,
         )
 
         # given
         storage.content_fossology_license_add([license1])
         # conflict does nothing
         storage.content_fossology_license_add([license1])
 
         # when
         actual_licenses = list(storage.content_fossology_license_get([data.sha1_1]))
 
         # then
         expected_licenses = [
             ContentLicenseRow(id=data.sha1_1, license="Apache-2.0", tool=tool,)
         ]
         assert actual_licenses == expected_licenses
 
         # given
         license2 = ContentLicenseRow(
             id=data.sha1_1, license="BSD-2-Clause", indexer_configuration_id=tool_id,
         )
 
         storage.content_fossology_license_add([license2])
 
         actual_licenses = list(storage.content_fossology_license_get([data.sha1_1]))
 
         expected_licenses.append(
             ContentLicenseRow(id=data.sha1_1, license="BSD-2-Clause", tool=tool,)
         )
 
         # first license was not removed when the second one was added
         assert sorted(actual_licenses) == sorted(expected_licenses)
 
     def test_generate_content_fossology_license_get_partition_failure(
         self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
     ) -> None:
         """get_partition call with wrong limit input should fail"""
         storage, data = swh_indexer_storage_with_data
         indexer_configuration_id = 42
         with pytest.raises(
             IndexerStorageArgumentException, match="limit should not be None"
         ):
             storage.content_fossology_license_get_partition(
                 indexer_configuration_id, 0, 3, limit=None,  # type: ignore
             )
 
     def test_generate_content_fossology_license_get_partition_no_limit(
         self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
     ) -> None:
         """get_partition should return results"""
         storage, data = swh_indexer_storage_with_data
         # craft some consistent mimetypes
         fossology_licenses = data.fossology_licenses
         mimetypes = prepare_mimetypes_from_licenses(fossology_licenses)
         indexer_configuration_id = fossology_licenses[0].indexer_configuration_id
 
         storage.content_mimetype_add(mimetypes)
         # add fossology_licenses to storage
         storage.content_fossology_license_add(fossology_licenses)
 
         # All ids from the db
         expected_ids = set([c.id for c in fossology_licenses])
 
         assert len(fossology_licenses) == 10
         assert len(mimetypes) == 10
         nb_partitions = 4
 
         actual_ids = []
         for partition_id in range(nb_partitions):
 
             actual_result = storage.content_fossology_license_get_partition(
                 indexer_configuration_id, partition_id, nb_partitions
             )
             assert actual_result.next_page_token is None
             actual_ids.extend(actual_result.results)
 
         assert len(set(actual_ids)) == len(expected_ids)
         for actual_id in actual_ids:
             assert actual_id in expected_ids
 
     def test_generate_content_fossology_license_get_partition_full(
         self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
     ) -> None:
         """get_partition for a single partition should return available ids
 
         """
         storage, data = swh_indexer_storage_with_data
         # craft some consistent mimetypes
         fossology_licenses = data.fossology_licenses
         mimetypes = prepare_mimetypes_from_licenses(fossology_licenses)
         indexer_configuration_id = fossology_licenses[0].indexer_configuration_id
 
         storage.content_mimetype_add(mimetypes)
         # add fossology_licenses to storage
         storage.content_fossology_license_add(fossology_licenses)
 
         # All ids from the db
         expected_ids = set([c.id for c in fossology_licenses])
 
         actual_result = storage.content_fossology_license_get_partition(
             indexer_configuration_id, 0, 1
         )
         assert actual_result.next_page_token is None
         actual_ids = actual_result.results
         assert len(set(actual_ids)) == len(expected_ids)
         for actual_id in actual_ids:
             assert actual_id in expected_ids
 
     def test_generate_content_fossology_license_get_partition_empty(
         self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
     ) -> None:
         """get_partition when at least one of the partitions is empty"""
         storage, data = swh_indexer_storage_with_data
         # craft some consistent mimetypes
         fossology_licenses = data.fossology_licenses
         mimetypes = prepare_mimetypes_from_licenses(fossology_licenses)
         indexer_configuration_id = fossology_licenses[0].indexer_configuration_id
 
         storage.content_mimetype_add(mimetypes)
         # add fossology_licenses to storage
         storage.content_fossology_license_add(fossology_licenses)
 
         # All ids from the db
         expected_ids = set([c.id for c in fossology_licenses])
 
         # nb_partitions = smallest power of 2 such that at least one of
         # the partitions is empty
         nb_licenses = len(fossology_licenses)
         nb_partitions = 1 << math.floor(math.log2(nb_licenses) + 1)
 
         seen_ids = []
 
         for partition_id in range(nb_partitions):
             actual_result = storage.content_fossology_license_get_partition(
                 indexer_configuration_id,
                 partition_id,
                 nb_partitions,
                 limit=nb_licenses + 1,
             )
 
             for actual_id in actual_result.results:
                 seen_ids.append(actual_id)
 
             # Limit is higher than the max number of results
             assert actual_result.next_page_token is None
 
         assert set(seen_ids) == expected_ids
 
     def test_generate_content_fossology_license_get_partition_with_pagination(
         self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
     ) -> None:
         """get_partition should return ids provided with paginationv
 
         """
         storage, data = swh_indexer_storage_with_data
         # craft some consistent mimetypes
         fossology_licenses = data.fossology_licenses
         mimetypes = prepare_mimetypes_from_licenses(fossology_licenses)
         indexer_configuration_id = fossology_licenses[0].indexer_configuration_id
 
         storage.content_mimetype_add(mimetypes)
         # add fossology_licenses to storage
         storage.content_fossology_license_add(fossology_licenses)
 
         # All ids from the db
         expected_ids = [c.id for c in fossology_licenses]
 
         nb_partitions = 4
 
         actual_ids = []
         for partition_id in range(nb_partitions):
             next_page_token = None
             while True:
                 actual_result = storage.content_fossology_license_get_partition(
                     indexer_configuration_id,
                     partition_id,
                     nb_partitions,
                     limit=2,
                     page_token=next_page_token,
                 )
                 actual_ids.extend(actual_result.results)
                 next_page_token = actual_result.next_page_token
                 if next_page_token is None:
                     break
 
         assert len(set(actual_ids)) == len(set(expected_ids))
         for actual_id in actual_ids:
             assert actual_id in expected_ids
 
     def test_add_empty(
         self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
     ) -> None:
         (storage, data) = swh_indexer_storage_with_data
         etype = self.endpoint_type
 
         summary = endpoint(storage, etype, "add")([])
         assert summary == {"content_fossology_license:add": 0}
 
         actual_license = list(endpoint(storage, etype, "get")([data.sha1_2]))
 
         assert actual_license == []
 
     def test_get_unknown(
         self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
     ) -> None:
         (storage, data) = swh_indexer_storage_with_data
         etype = self.endpoint_type
 
         actual_license = list(endpoint(storage, etype, "get")([data.sha1_2]))
 
         assert actual_license == []
 
 
 class TestIndexerStorageOriginIntrinsicMetadata:
-    def test_origin_intrinsic_metadata_get(
+    def test_origin_intrinsic_metadata_add(
         self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
     ) -> None:
         storage, data = swh_indexer_storage_with_data
         # given
         tool_id = data.tools["swh-metadata-detector"]["id"]
 
         metadata = {
             "version": None,
             "name": None,
         }
         metadata_rev = RevisionIntrinsicMetadataRow(
             id=data.revision_id_2,
             metadata=metadata,
             mappings=["mapping1"],
             indexer_configuration_id=tool_id,
         )
         metadata_origin = OriginIntrinsicMetadataRow(
             id=data.origin_url_1,
             metadata=metadata,
             indexer_configuration_id=tool_id,
             mappings=["mapping1"],
             from_revision=data.revision_id_2,
         )
 
         # when
         storage.revision_intrinsic_metadata_add([metadata_rev])
         storage.origin_intrinsic_metadata_add([metadata_origin])
 
         # then
         actual_metadata = list(
             storage.origin_intrinsic_metadata_get([data.origin_url_1, "no://where"])
         )
 
         expected_metadata = [
             OriginIntrinsicMetadataRow(
                 id=data.origin_url_1,
                 metadata=metadata,
                 tool=data.tools["swh-metadata-detector"],
                 from_revision=data.revision_id_2,
                 mappings=["mapping1"],
             )
         ]
 
         assert actual_metadata == expected_metadata
 
+        journal_objects = storage.journal_writer.journal.objects  # type: ignore
+        actual_journal_metadata = [
+            obj
+            for (obj_type, obj) in journal_objects
+            if obj_type == "origin_intrinsic_metadata"
+        ]
+        assert list(sorted(actual_journal_metadata)) == list(sorted(expected_metadata))
+
    def test_origin_intrinsic_metadata_add_update_in_place_duplicate(
        self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
    ) -> None:
        """Re-adding metadata for an existing origin overwrites the stored
        row in place instead of accumulating a second one."""
        storage, data = swh_indexer_storage_with_data
        # given
        tool_id = data.tools["swh-metadata-detector"]["id"]

        metadata_v1: Dict[str, Any] = {
            "version": None,
            "name": None,
        }
        metadata_rev_v1 = RevisionIntrinsicMetadataRow(
            id=data.revision_id_2,
            metadata=metadata_v1,
            mappings=[],
            indexer_configuration_id=tool_id,
        )
        metadata_origin_v1 = OriginIntrinsicMetadataRow(
            id=data.origin_url_1,
            metadata=metadata_v1.copy(),
            indexer_configuration_id=tool_id,
            mappings=[],
            from_revision=data.revision_id_2,
        )

        # given
        storage.revision_intrinsic_metadata_add([metadata_rev_v1])
        storage.origin_intrinsic_metadata_add([metadata_origin_v1])

        # when
        actual_metadata = list(
            storage.origin_intrinsic_metadata_get([data.origin_url_1])
        )

        # then
        expected_metadata_v1 = [
            OriginIntrinsicMetadataRow(
                id=data.origin_url_1,
                metadata=metadata_v1,
                tool=data.tools["swh-metadata-detector"],
                from_revision=data.revision_id_2,
                mappings=[],
            )
        ]
        assert actual_metadata == expected_metadata_v1

        # given: a v2 payload for the same origin, from a different revision
        # and with a non-empty mappings list
        metadata_v2 = metadata_v1.copy()
        metadata_v2.update(
            {"name": "test_update_duplicated_metadata", "author": "MG",}
        )
        metadata_rev_v2 = attr.evolve(metadata_rev_v1, metadata=metadata_v2)
        metadata_origin_v2 = OriginIntrinsicMetadataRow(
            id=data.origin_url_1,
            metadata=metadata_v2.copy(),
            indexer_configuration_id=tool_id,
            mappings=["npm"],
            from_revision=data.revision_id_1,
        )

        storage.revision_intrinsic_metadata_add([metadata_rev_v2])
        storage.origin_intrinsic_metadata_add([metadata_origin_v2])

        actual_metadata = list(
            storage.origin_intrinsic_metadata_get([data.origin_url_1])
        )

        expected_metadata_v2 = [
            OriginIntrinsicMetadataRow(
                id=data.origin_url_1,
                metadata=metadata_v2,
                tool=data.tools["swh-metadata-detector"],
                from_revision=data.revision_id_1,
                mappings=["npm"],
            )
        ]

        # metadata did change as the v2 was used to overwrite v1
        assert actual_metadata == expected_metadata_v2
 
    def test_origin_intrinsic_metadata_add__deadlock(
        self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
    ) -> None:
        """Concurrent adds over overlapping origin sets must not deadlock.

        Two threads upsert overlapping batches of 100 origins, one batch
        reversed relative to the other; both must complete and every origin
        must end up holding either the v1 or the v2 metadata.
        """
        storage, data = swh_indexer_storage_with_data
        # given
        tool_id = data.tools["swh-metadata-detector"]["id"]

        origins = ["file:///tmp/origin{:02d}".format(i) for i in range(100)]

        example_data1: Dict[str, Any] = {
            "metadata": {"version": None, "name": None,},
            "mappings": [],
        }
        example_data2: Dict[str, Any] = {
            "metadata": {"version": "v1.1.1", "name": "foo",},
            "mappings": [],
        }

        metadata_rev_v1 = RevisionIntrinsicMetadataRow(
            id=data.revision_id_2,
            metadata={"version": None, "name": None,},
            mappings=[],
            indexer_configuration_id=tool_id,
        )

        data_v1 = [
            OriginIntrinsicMetadataRow(
                id=origin,
                from_revision=data.revision_id_2,
                indexer_configuration_id=tool_id,
                **example_data1,
            )
            for origin in origins
        ]
        data_v2 = [
            OriginIntrinsicMetadataRow(
                id=origin,
                from_revision=data.revision_id_2,
                indexer_configuration_id=tool_id,
                **example_data2,
            )
            for origin in origins
        ]

        # Remove one item from each, so that both queries have to succeed for
        # all items to be in the DB.
        data_v2a = data_v2[1:]
        data_v2b = list(reversed(data_v2[0:-1]))

        # given
        storage.revision_intrinsic_metadata_add([metadata_rev_v1])
        storage.origin_intrinsic_metadata_add(data_v1)

        # when
        actual_data = list(storage.origin_intrinsic_metadata_get(origins))

        expected_data_v1 = [
            OriginIntrinsicMetadataRow(
                id=origin,
                from_revision=data.revision_id_2,
                tool=data.tools["swh-metadata-detector"],
                **example_data1,
            )
            for origin in origins
        ]

        # then
        assert actual_data == expected_data_v1

        # given: run both v2 upserts concurrently; opposite iteration orders
        # would deadlock if the implementation did not order its row locks
        def f1() -> None:
            storage.origin_intrinsic_metadata_add(data_v2a)

        def f2() -> None:
            storage.origin_intrinsic_metadata_add(data_v2b)

        t1 = threading.Thread(target=f1)
        t2 = threading.Thread(target=f2)
        t2.start()
        t1.start()

        t1.join()
        t2.join()

        actual_data = list(storage.origin_intrinsic_metadata_get(origins))

        expected_data_v2 = [
            OriginIntrinsicMetadataRow(
                id=origin,
                from_revision=data.revision_id_2,
                tool=data.tools["swh-metadata-detector"],
                **example_data2,
            )
            for origin in origins
        ]

        actual_data.sort(key=lambda item: item.id)
        assert len(actual_data) == len(expected_data_v1) == len(expected_data_v2)
        # each origin holds either the v1 or the v2 row, depending on which
        # thread wrote it last
        for (item, expected_item_v1, expected_item_v2) in zip(
            actual_data, expected_data_v1, expected_data_v2
        ):
            assert item in (expected_item_v1, expected_item_v2)
 
     def test_origin_intrinsic_metadata_add__duplicate_twice(
         self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
     ) -> None:
         storage, data = swh_indexer_storage_with_data
         # given
         tool_id = data.tools["swh-metadata-detector"]["id"]
 
         metadata = {
             "developmentStatus": None,
             "name": None,
         }
         metadata_rev = RevisionIntrinsicMetadataRow(
             id=data.revision_id_2,
             metadata=metadata,
             mappings=["mapping1"],
             indexer_configuration_id=tool_id,
         )
         metadata_origin = OriginIntrinsicMetadataRow(
             id=data.origin_url_1,
             metadata=metadata,
             indexer_configuration_id=tool_id,
             mappings=["mapping1"],
             from_revision=data.revision_id_2,
         )
 
         # when
         storage.revision_intrinsic_metadata_add([metadata_rev])
 
         with pytest.raises(DuplicateId):
             storage.origin_intrinsic_metadata_add([metadata_origin, metadata_origin])
 
     def test_origin_intrinsic_metadata_search_fulltext(
         self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
     ) -> None:
         storage, data = swh_indexer_storage_with_data
         # given
         tool_id = data.tools["swh-metadata-detector"]["id"]
 
         metadata1 = {
             "author": "John Doe",
         }
         metadata1_rev = RevisionIntrinsicMetadataRow(
             id=data.revision_id_1,
             metadata=metadata1,
             mappings=[],
             indexer_configuration_id=tool_id,
         )
         metadata1_origin = OriginIntrinsicMetadataRow(
             id=data.origin_url_1,
             metadata=metadata1,
             mappings=[],
             indexer_configuration_id=tool_id,
             from_revision=data.revision_id_1,
         )
         metadata2 = {
             "author": "Jane Doe",
         }
         metadata2_rev = RevisionIntrinsicMetadataRow(
             id=data.revision_id_2,
             metadata=metadata2,
             mappings=[],
             indexer_configuration_id=tool_id,
         )
         metadata2_origin = OriginIntrinsicMetadataRow(
             id=data.origin_url_2,
             metadata=metadata2,
             mappings=[],
             indexer_configuration_id=tool_id,
             from_revision=data.revision_id_2,
         )
 
         # when
         storage.revision_intrinsic_metadata_add([metadata1_rev])
         storage.origin_intrinsic_metadata_add([metadata1_origin])
         storage.revision_intrinsic_metadata_add([metadata2_rev])
         storage.origin_intrinsic_metadata_add([metadata2_origin])
 
         # then
         search = storage.origin_intrinsic_metadata_search_fulltext
         assert set([res.id for res in search(["Doe"])]) == set(
             [data.origin_url_1, data.origin_url_2]
         )
         assert [res.id for res in search(["John", "Doe"])] == [data.origin_url_1]
         assert [res.id for res in search(["John"])] == [data.origin_url_1]
         assert not list(search(["John", "Jane"]))
 
    def test_origin_intrinsic_metadata_search_fulltext_rank(
        self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
    ) -> None:
        """Fulltext search orders its results by relevance rank and honors
        the ``limit`` argument."""
        storage, data = swh_indexer_storage_with_data
        # given
        tool_id = data.tools["swh-metadata-detector"]["id"]

        # The following authors have "Random Person" to add some more content
        # to the JSON data, to work around normalization quirks when there
        # are few words (rank/(1+ln(nb_words)) is very sensitive to nb_words
        # for small values of nb_words).
        metadata1 = {"author": ["Random Person", "John Doe", "Jane Doe",]}
        metadata1_rev = RevisionIntrinsicMetadataRow(
            id=data.revision_id_1,
            metadata=metadata1,
            mappings=[],
            indexer_configuration_id=tool_id,
        )
        metadata1_origin = OriginIntrinsicMetadataRow(
            id=data.origin_url_1,
            metadata=metadata1,
            mappings=[],
            indexer_configuration_id=tool_id,
            from_revision=data.revision_id_1,
        )
        metadata2 = {"author": ["Random Person", "Jane Doe",]}
        metadata2_rev = RevisionIntrinsicMetadataRow(
            id=data.revision_id_2,
            metadata=metadata2,
            mappings=[],
            indexer_configuration_id=tool_id,
        )
        metadata2_origin = OriginIntrinsicMetadataRow(
            id=data.origin_url_2,
            metadata=metadata2,
            mappings=[],
            indexer_configuration_id=tool_id,
            from_revision=data.revision_id_2,
        )

        # when
        storage.revision_intrinsic_metadata_add([metadata1_rev])
        storage.origin_intrinsic_metadata_add([metadata1_origin])
        storage.revision_intrinsic_metadata_add([metadata2_rev])
        storage.origin_intrinsic_metadata_add([metadata2_origin])

        # then: "Doe" appears twice in origin 1's metadata but only once in
        # origin 2's, so origin 1 ranks first; conversely "Jane" is a larger
        # fraction of origin 2's document, so origin 2 ranks first there
        search = storage.origin_intrinsic_metadata_search_fulltext
        assert [res.id for res in search(["Doe"])] == [
            data.origin_url_1,
            data.origin_url_2,
        ]
        assert [res.id for res in search(["Doe"], limit=1)] == [data.origin_url_1]
        assert [res.id for res in search(["John"])] == [data.origin_url_1]
        assert [res.id for res in search(["Jane"])] == [
            data.origin_url_2,
            data.origin_url_1,
        ]
        assert [res.id for res in search(["John", "Jane"])] == [data.origin_url_1]
 
    def _fill_origin_intrinsic_metadata(
        self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
    ) -> None:
        """Populate storage with intrinsic metadata for three origins.

        Shared fixture for the search_by_producer and stats tests below:
        origin 1 is indexed by tool1 with the npm mapping; origins 2 and 3
        by tool2; origin 3's metadata is empty apart from "@context".
        """
        storage, data = swh_indexer_storage_with_data
        tool1_id = data.tools["swh-metadata-detector"]["id"]
        tool2_id = data.tools["swh-metadata-detector2"]["id"]

        metadata1 = {
            "@context": "foo",
            "author": "John Doe",
        }
        metadata1_rev = RevisionIntrinsicMetadataRow(
            id=data.revision_id_1,
            metadata=metadata1,
            mappings=["npm"],
            indexer_configuration_id=tool1_id,
        )
        metadata1_origin = OriginIntrinsicMetadataRow(
            id=data.origin_url_1,
            metadata=metadata1,
            mappings=["npm"],
            indexer_configuration_id=tool1_id,
            from_revision=data.revision_id_1,
        )
        metadata2 = {
            "@context": "foo",
            "author": "Jane Doe",
        }
        metadata2_rev = RevisionIntrinsicMetadataRow(
            id=data.revision_id_2,
            metadata=metadata2,
            mappings=["npm", "gemspec"],
            indexer_configuration_id=tool2_id,
        )
        metadata2_origin = OriginIntrinsicMetadataRow(
            id=data.origin_url_2,
            metadata=metadata2,
            mappings=["npm", "gemspec"],
            indexer_configuration_id=tool2_id,
            from_revision=data.revision_id_2,
        )
        metadata3 = {
            "@context": "foo",
        }
        metadata3_rev = RevisionIntrinsicMetadataRow(
            id=data.revision_id_3,
            metadata=metadata3,
            mappings=["npm", "gemspec"],
            indexer_configuration_id=tool2_id,
        )
        # NOTE(review): origin 3's mappings (pkg-info) deliberately differ
        # from metadata3_rev's; the stats test counts pkg-info exactly once
        metadata3_origin = OriginIntrinsicMetadataRow(
            id=data.origin_url_3,
            metadata=metadata3,
            mappings=["pkg-info"],
            indexer_configuration_id=tool2_id,
            from_revision=data.revision_id_3,
        )

        storage.revision_intrinsic_metadata_add([metadata1_rev])
        storage.origin_intrinsic_metadata_add([metadata1_origin])
        storage.revision_intrinsic_metadata_add([metadata2_rev])
        storage.origin_intrinsic_metadata_add([metadata2_origin])
        storage.revision_intrinsic_metadata_add([metadata3_rev])
        storage.origin_intrinsic_metadata_add([metadata3_origin])
 
    def test_origin_intrinsic_metadata_search_by_producer(
        self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
    ) -> None:
        """Exercise search_by_producer: pagination, mappings filter, tool
        filter, and the ids_only flag."""
        storage, data = swh_indexer_storage_with_data
        self._fill_origin_intrinsic_metadata(swh_indexer_storage_with_data)
        tool1 = data.tools["swh-metadata-detector"]
        tool2 = data.tools["swh-metadata-detector2"]
        endpoint = storage.origin_intrinsic_metadata_search_by_producer

        # test pagination
        # no 'page_token' param, return all origins
        result = endpoint(ids_only=True)
        assert result == PagedResult(
            results=[data.origin_url_1, data.origin_url_2, data.origin_url_3,],
            next_page_token=None,
        )

        # 'page_token' is < than origin_1, return everything
        result = endpoint(page_token=data.origin_url_1[:-1], ids_only=True)
        assert result == PagedResult(
            results=[data.origin_url_1, data.origin_url_2, data.origin_url_3,],
            next_page_token=None,
        )

        # 'page_token' is origin_3, return nothing
        result = endpoint(page_token=data.origin_url_3, ids_only=True)
        assert result == PagedResult(results=[], next_page_token=None)

        # test limit argument
        result = endpoint(page_token=data.origin_url_1[:-1], limit=2, ids_only=True)
        assert result == PagedResult(
            results=[data.origin_url_1, data.origin_url_2],
            next_page_token=data.origin_url_2,
        )

        result = endpoint(page_token=data.origin_url_1, limit=2, ids_only=True)
        assert result == PagedResult(
            results=[data.origin_url_2, data.origin_url_3], next_page_token=None,
        )

        result = endpoint(page_token=data.origin_url_2, limit=2, ids_only=True)
        assert result == PagedResult(results=[data.origin_url_3], next_page_token=None,)

        # test mappings filtering
        result = endpoint(mappings=["npm"], ids_only=True)
        assert result == PagedResult(
            results=[data.origin_url_1, data.origin_url_2], next_page_token=None,
        )

        result = endpoint(mappings=["npm", "gemspec"], ids_only=True)
        assert result == PagedResult(
            results=[data.origin_url_1, data.origin_url_2], next_page_token=None,
        )

        result = endpoint(mappings=["gemspec"], ids_only=True)
        assert result == PagedResult(results=[data.origin_url_2], next_page_token=None,)

        result = endpoint(mappings=["pkg-info"], ids_only=True)
        assert result == PagedResult(results=[data.origin_url_3], next_page_token=None,)

        result = endpoint(mappings=["foobar"], ids_only=True)
        assert result == PagedResult(results=[], next_page_token=None,)

        # test pagination + mappings
        result = endpoint(mappings=["npm"], limit=1, ids_only=True)
        assert result == PagedResult(
            results=[data.origin_url_1], next_page_token=data.origin_url_1,
        )

        # test tool filtering
        result = endpoint(tool_ids=[tool1["id"]], ids_only=True)
        assert result == PagedResult(results=[data.origin_url_1], next_page_token=None,)

        result = endpoint(tool_ids=[tool2["id"]], ids_only=True)
        assert sorted(result.results) == [data.origin_url_2, data.origin_url_3]
        assert result.next_page_token is None

        result = endpoint(tool_ids=[tool1["id"], tool2["id"]], ids_only=True)
        assert sorted(result.results) == [
            data.origin_url_1,
            data.origin_url_2,
            data.origin_url_3,
        ]
        assert result.next_page_token is None

        # test ids_only=False: full rows are returned instead of bare ids
        assert endpoint(mappings=["gemspec"]) == PagedResult(
            results=[
                OriginIntrinsicMetadataRow(
                    id=data.origin_url_2,
                    metadata={"@context": "foo", "author": "Jane Doe",},
                    mappings=["npm", "gemspec"],
                    tool=tool2,
                    from_revision=data.revision_id_2,
                )
            ],
            next_page_token=None,
        )
 
     def test_origin_intrinsic_metadata_stats(
         self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
     ) -> None:
         storage, data = swh_indexer_storage_with_data
         self._fill_origin_intrinsic_metadata(swh_indexer_storage_with_data)
 
         result = storage.origin_intrinsic_metadata_stats()
         assert result == {
             "per_mapping": {
                 "gemspec": 1,
                 "npm": 2,
                 "pkg-info": 1,
                 "codemeta": 0,
                 "maven": 0,
             },
             "total": 3,
             "non_empty": 2,
         }
 
 
 class TestIndexerStorageIndexerConfiguration:
     def test_indexer_configuration_add(
         self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
     ) -> None:
         storage, data = swh_indexer_storage_with_data
         tool = {
             "tool_name": "some-unknown-tool",
             "tool_version": "some-version",
             "tool_configuration": {"debian-package": "some-package"},
         }
 
         actual_tool = storage.indexer_configuration_get(tool)
         assert actual_tool is None  # does not exist
 
         # add it
         actual_tools = list(storage.indexer_configuration_add([tool]))
 
         assert len(actual_tools) == 1
         actual_tool = actual_tools[0]
         assert actual_tool is not None  # now it exists
         new_id = actual_tool.pop("id")
         assert actual_tool == tool
 
         actual_tools2 = list(storage.indexer_configuration_add([tool]))
         actual_tool2 = actual_tools2[0]
         assert actual_tool2 is not None  # now it exists
         new_id2 = actual_tool2.pop("id")
 
         assert new_id == new_id2
         assert actual_tool == actual_tool2
 
     def test_indexer_configuration_add_multiple(
         self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
     ) -> None:
         storage, data = swh_indexer_storage_with_data
         tool = {
             "tool_name": "some-unknown-tool",
             "tool_version": "some-version",
             "tool_configuration": {"debian-package": "some-package"},
         }
 
         actual_tools = list(storage.indexer_configuration_add([tool]))
         assert len(actual_tools) == 1
 
         new_tools = [
             tool,
             {
                 "tool_name": "yet-another-tool",
                 "tool_version": "version",
                 "tool_configuration": {},
             },
         ]
 
         actual_tools = list(storage.indexer_configuration_add(new_tools))
         assert len(actual_tools) == 2
 
         # order not guaranteed, so we iterate over results to check
         for tool in actual_tools:
             _id = tool.pop("id")
             assert _id is not None
             assert tool in new_tools
 
     def test_indexer_configuration_get_missing(
         self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
     ) -> None:
         storage, data = swh_indexer_storage_with_data
         tool = {
             "tool_name": "unknown-tool",
             "tool_version": "3.1.0rc2-31-ga2cbb8c",
             "tool_configuration": {"command_line": "nomossa <filepath>"},
         }
 
         actual_tool = storage.indexer_configuration_get(tool)
 
         assert actual_tool is None
 
     def test_indexer_configuration_get(
         self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
     ) -> None:
         storage, data = swh_indexer_storage_with_data
         tool = {
             "tool_name": "nomos",
             "tool_version": "3.1.0rc2-31-ga2cbb8c",
             "tool_configuration": {"command_line": "nomossa <filepath>"},
         }
 
         actual_tool = storage.indexer_configuration_get(tool)
         assert actual_tool
 
         expected_tool = tool.copy()
         del actual_tool["id"]
 
         assert expected_tool == actual_tool
 
     def test_indexer_configuration_metadata_get_missing_context(
         self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
     ) -> None:
         storage, data = swh_indexer_storage_with_data
         tool = {
             "tool_name": "swh-metadata-translator",
             "tool_version": "0.0.1",
             "tool_configuration": {"context": "unknown-context"},
         }
 
         actual_tool = storage.indexer_configuration_get(tool)
 
         assert actual_tool is None
 
     def test_indexer_configuration_metadata_get(
         self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
     ) -> None:
         storage, data = swh_indexer_storage_with_data
         tool = {
             "tool_name": "swh-metadata-translator",
             "tool_version": "0.0.1",
             "tool_configuration": {"type": "local", "context": "NpmMapping"},
         }
 
         storage.indexer_configuration_add([tool])
         actual_tool = storage.indexer_configuration_get(tool)
         assert actual_tool
 
         expected_tool = tool.copy()
         expected_tool["id"] = actual_tool["id"]
 
         assert expected_tool == actual_tool