diff --git a/swh/journal/serializers.py b/swh/journal/serializers.py
index 71a5fb1..399a29c 100644
--- a/swh/journal/serializers.py
+++ b/swh/journal/serializers.py
@@ -1,138 +1,171 @@
 # Copyright (C) 2016-2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from typing import Any, Dict, Union, overload
 
 import msgpack
 
 from swh.core.api.serializers import msgpack_dumps, msgpack_loads
 from swh.model.hashutil import DEFAULT_ALGORITHMS
 from swh.model.model import (
     Content,
     Directory,
+    MetadataAuthority,
+    MetadataFetcher,
     Origin,
     OriginVisit,
     OriginVisitStatus,
+    RawExtrinsicMetadata,
     Release,
     Revision,
     SkippedContent,
     Snapshot,
 )
 
 ModelObject = Union[
     Content,
     Directory,
+    MetadataAuthority,
+    MetadataFetcher,
     Origin,
     OriginVisit,
     OriginVisitStatus,
+    RawExtrinsicMetadata,
     Release,
     Revision,
     SkippedContent,
     Snapshot,
 ]
 
 KeyType = Union[Dict[str, str], Dict[str, bytes], bytes]
 
 
 # these @overload'ed versions of the object_key method aim at helping mypy figuring
 # the correct type-ing.
 @overload
 def object_key(
     object_type: str, object_: Union[Content, Directory, Revision, Release, Snapshot]
 ) -> bytes:
     ...
 
 
 @overload
 def object_key(
     object_type: str, object_: Union[Origin, SkippedContent]
 ) -> Dict[str, bytes]:
     ...
 
 
 @overload
 def object_key(
-    object_type: str, object_: Union[OriginVisit, OriginVisitStatus]
+    object_type: str,
+    object_: Union[
+        MetadataAuthority,
+        MetadataFetcher,
+        OriginVisit,
+        OriginVisitStatus,
+        RawExtrinsicMetadata,
+    ],
 ) -> Dict[str, str]:
     ...
 
 
 def object_key(object_type: str, object_) -> KeyType:
     if object_type in ("revision", "release", "directory", "snapshot"):
         return object_.id
     elif object_type == "content":
         return object_.sha1  # TODO: use a dict of hashes
     elif object_type == "skipped_content":
         return {hash: getattr(object_, hash) for hash in DEFAULT_ALGORITHMS}
     elif object_type == "origin":
         return {"url": object_.url}
     elif object_type == "origin_visit":
         return {
             "origin": object_.origin,
             "date": str(object_.date),
         }
     elif object_type == "origin_visit_status":
         return {
             "origin": object_.origin,
             "visit": str(object_.visit),
             "date": str(object_.date),
         }
+    elif object_type == "metadata_authority":
+        return {
+            "type": object_.type.value,
+            "url": object_.url,
+        }
+    elif object_type == "metadata_fetcher":
+        return {
+            "name": object_.name,
+            "version": object_.version,
+        }
+    elif object_type == "raw_extrinsic_metadata":
+        return {
+            "type": object_.type.value,
+            "id": str(object_.id),
+            "authority_type": object_.authority.type.value,
+            "authority_url": object_.authority.url,
+            "discovery_date": str(object_.discovery_date),
+            "fetcher_name": object_.fetcher.name,
+            "fetcher_version": object_.fetcher.version,
+        }
     else:
         raise ValueError("Unknown object type: %s." % object_type)
 
 
 def stringify_key_item(k: str, v: Union[str, bytes]) -> str:
     """Turn the item of a dict key into a string"""
     if isinstance(v, str):
         return v
     if k == "url":
         return v.decode("utf-8")
     return v.hex()
 
 
 def pprint_key(key: KeyType) -> str:
     """Pretty-print a kafka key"""
 
     if isinstance(key, dict):
         return "{%s}" % ", ".join(
             f"{k}: {stringify_key_item(k, v)}" for k, v in key.items()
         )
     elif isinstance(key, bytes):
         return key.hex()
     else:
         return key
 
 
 def key_to_kafka(key: KeyType) -> bytes:
     """Serialize a key, possibly a dict, in a predictable way"""
     p = msgpack.Packer(use_bin_type=True)
     if isinstance(key, dict):
         return p.pack_map_pairs(sorted(key.items()))
     else:
         return p.pack(key)
 
 
 def kafka_to_key(kafka_key: bytes) -> KeyType:
     """Deserialize a key"""
     return msgpack.loads(kafka_key, raw=False)
 
 
 def value_to_kafka(value: Any) -> bytes:
     """Serialize some data for storage in kafka"""
     return msgpack_dumps(value)
 
 
 def kafka_to_value(kafka_value: bytes) -> Any:
     """Deserialize some data stored in kafka"""
     value = msgpack_loads(kafka_value)
     if isinstance(value, list):
         return tuple(value)
     if isinstance(value, dict):
         return ensure_tuples(value)
     return value
 
 
 def ensure_tuples(value: Dict) -> Dict:
     return {k: tuple(v) if isinstance(v, list) else v for k, v in value.items()}
diff --git a/swh/journal/tests/journal_data.py b/swh/journal/tests/journal_data.py
index a4622bd..1b103bf 100644
--- a/swh/journal/tests/journal_data.py
+++ b/swh/journal/tests/journal_data.py
@@ -1,301 +1,342 @@
 # Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import datetime
 
 from typing import Dict, Sequence
 
 import attr
 
-from swh.model.hashutil import MultiHash, hash_to_bytes
+from swh.model.hashutil import MultiHash, hash_to_bytes, hash_to_hex
 from swh.journal.serializers import ModelObject
 
+from swh.model.identifiers import SWHID
 from swh.model.model import (
     Content,
     Directory,
     DirectoryEntry,
+    MetadataAuthority,
+    MetadataAuthorityType,
+    MetadataFetcher,
+    MetadataTargetType,
     ObjectType,
     Origin,
     OriginVisit,
     OriginVisitStatus,
     Person,
+    RawExtrinsicMetadata,
     Release,
     Revision,
     RevisionType,
     SkippedContent,
     Snapshot,
     SnapshotBranch,
     TargetType,
     Timestamp,
     TimestampWithTimezone,
 )
 
 UTC = datetime.timezone.utc
 
 CONTENTS = [
     Content(
         length=4,
         data=f"foo{i}".encode(),
         status="visible",
         **MultiHash.from_data(f"foo{i}".encode()).digest(),
     )
     for i in range(10)
 ] + [
     Content(
         length=14,
         data=f"forbidden foo{i}".encode(),
         status="hidden",
         **MultiHash.from_data(f"forbidden foo{i}".encode()).digest(),
     )
     for i in range(10)
 ]
 
 SKIPPED_CONTENTS = [
     SkippedContent(
         length=4,
         status="absent",
         reason=f"because chr({i}) != '*'",
         **MultiHash.from_data(f"bar{i}".encode()).digest(),
     )
     for i in range(2)
 ]
 
 duplicate_content1 = Content(
     length=4,
     sha1=hash_to_bytes("44973274ccef6ab4dfaaf86599792fa9c3fe4689"),
     sha1_git=b"another-foo",
     blake2s256=b"another-bar",
     sha256=b"another-baz",
     status="visible",
 )
 
 # Craft a sha1 collision
 sha1_array = bytearray(duplicate_content1.sha1_git)
 sha1_array[0] += 1
 duplicate_content2 = attr.evolve(duplicate_content1, sha1_git=bytes(sha1_array))
 
 
 DUPLICATE_CONTENTS = [duplicate_content1, duplicate_content2]
 
 
 COMMITTERS = [
     Person(fullname=b"foo", name=b"foo", email=b""),
     Person(fullname=b"bar", name=b"bar", email=b""),
 ]
 
 DATES = [
     TimestampWithTimezone(
         timestamp=Timestamp(seconds=1234567891, microseconds=0,),
         offset=120,
         negative_utc=False,
     ),
     TimestampWithTimezone(
         timestamp=Timestamp(seconds=1234567892, microseconds=0,),
         offset=120,
         negative_utc=False,
     ),
 ]
 
 REVISIONS = [
     Revision(
         id=hash_to_bytes("4ca486e65eb68e4986aeef8227d2db1d56ce51b3"),
         message=b"hello",
         date=DATES[0],
         committer=COMMITTERS[0],
         author=COMMITTERS[0],
         committer_date=DATES[0],
         type=RevisionType.GIT,
         directory=b"\x01" * 20,
         synthetic=False,
         metadata=None,
         parents=(),
     ),
     Revision(
         id=hash_to_bytes("677063f5c405d6fc1781fc56379c9a9adf43d3a0"),
         message=b"hello again",
         date=DATES[1],
         committer=COMMITTERS[1],
         author=COMMITTERS[1],
         committer_date=DATES[1],
         type=RevisionType.MERCURIAL,
         directory=b"\x02" * 20,
         synthetic=False,
         metadata=None,
         parents=(),
     ),
 ]
 
 RELEASES = [
     Release(
         id=hash_to_bytes("8059dc4e17fcd0e51ca3bcd6b80f4577d281fd08"),
         name=b"v0.0.1",
         date=TimestampWithTimezone(
             timestamp=Timestamp(seconds=1234567890, microseconds=0,),
             offset=120,
             negative_utc=False,
         ),
         author=COMMITTERS[0],
         target_type=ObjectType.REVISION,
         target=b"\x04" * 20,
         message=b"foo",
         synthetic=False,
     ),
 ]
 
 ORIGINS = [
     Origin(url="https://somewhere.org/den/fox",),
     Origin(url="https://overtherainbow.org/fox/den",),
 ]
 
 ORIGIN_VISITS = [
     OriginVisit(
         origin=ORIGINS[0].url,
         date=datetime.datetime(2013, 5, 7, 4, 20, 39, 369271, tzinfo=UTC),
         visit=1,
         type="git",
     ),
     OriginVisit(
         origin=ORIGINS[1].url,
         date=datetime.datetime(2014, 11, 27, 17, 20, 39, tzinfo=UTC),
         visit=1,
         type="hg",
     ),
     OriginVisit(
         origin=ORIGINS[0].url,
         date=datetime.datetime(2018, 11, 27, 17, 20, 39, tzinfo=UTC),
         visit=2,
         type="git",
     ),
     OriginVisit(
         origin=ORIGINS[0].url,
         date=datetime.datetime(2018, 11, 27, 17, 20, 39, tzinfo=UTC),
         visit=3,
         type="git",
     ),
     OriginVisit(
         origin=ORIGINS[1].url,
         date=datetime.datetime(2015, 11, 27, 17, 20, 39, tzinfo=UTC),
         visit=2,
         type="hg",
     ),
 ]
 
 # The origin-visit-status dates needs to be shifted slightly in the future from their
 # visit dates counterpart. Otherwise, we are hitting storage-wise the "on conflict"
 # ignore policy (because origin-visit-add creates an origin-visit-status with the same
 # parameters from the origin-visit {origin, visit, date}...
 ORIGIN_VISIT_STATUSES = [
     OriginVisitStatus(
         origin=ORIGINS[0].url,
         date=datetime.datetime(2013, 5, 7, 4, 20, 39, 432222, tzinfo=UTC),
         visit=1,
         status="ongoing",
         snapshot=None,
         metadata=None,
     ),
     OriginVisitStatus(
         origin=ORIGINS[1].url,
         date=datetime.datetime(2014, 11, 27, 17, 21, 12, tzinfo=UTC),
         visit=1,
         status="ongoing",
         snapshot=None,
         metadata=None,
     ),
     OriginVisitStatus(
         origin=ORIGINS[0].url,
         date=datetime.datetime(2018, 11, 27, 17, 20, 59, tzinfo=UTC),
         visit=2,
         status="ongoing",
         snapshot=None,
         metadata=None,
     ),
     OriginVisitStatus(
         origin=ORIGINS[0].url,
         date=datetime.datetime(2018, 11, 27, 17, 20, 49, tzinfo=UTC),
         visit=3,
         status="full",
         snapshot=hash_to_bytes("17d0066a4a80aba4a0e913532ee8ff2014f006a9"),
         metadata=None,
     ),
     OriginVisitStatus(
         origin=ORIGINS[1].url,
         date=datetime.datetime(2015, 11, 27, 17, 22, 18, tzinfo=UTC),
         visit=2,
         status="partial",
         snapshot=hash_to_bytes("8ce268b87faf03850693673c3eb5c9bb66e1ca38"),
         metadata=None,
     ),
 ]
 
 
 DIRECTORIES = [
     Directory(id=hash_to_bytes("4b825dc642cb6eb9a060e54bf8d69288fbee4904"), entries=()),
     Directory(
         id=hash_to_bytes("21416d920e0ebf0df4a7888bed432873ed5cb3a7"),
         entries=(
             DirectoryEntry(
                 name=b"file1.ext",
                 perms=0o644,
                 type="file",
                 target=CONTENTS[0].sha1_git,
             ),
             DirectoryEntry(
                 name=b"dir1",
                 perms=0o755,
                 type="dir",
                 target=hash_to_bytes("4b825dc642cb6eb9a060e54bf8d69288fbee4904"),
             ),
             DirectoryEntry(
                 name=b"subprepo1", perms=0o160000, type="rev", target=REVISIONS[1].id,
             ),
         ),
     ),
 ]
 
 
 SNAPSHOTS = [
     Snapshot(
         id=hash_to_bytes("17d0066a4a80aba4a0e913532ee8ff2014f006a9"),
         branches={
             b"master": SnapshotBranch(
                 target_type=TargetType.REVISION, target=REVISIONS[0].id
             )
         },
     ),
     Snapshot(
         id=hash_to_bytes("8ce268b87faf03850693673c3eb5c9bb66e1ca38"),
         branches={
             b"target/revision": SnapshotBranch(
                 target_type=TargetType.REVISION, target=REVISIONS[0].id,
             ),
             b"target/alias": SnapshotBranch(
                 target_type=TargetType.ALIAS, target=b"target/revision"
             ),
             b"target/directory": SnapshotBranch(
                 target_type=TargetType.DIRECTORY, target=DIRECTORIES[0].id,
             ),
             b"target/release": SnapshotBranch(
                 target_type=TargetType.RELEASE, target=RELEASES[0].id
             ),
             b"target/snapshot": SnapshotBranch(
                 target_type=TargetType.SNAPSHOT,
                 target=hash_to_bytes("17d0066a4a80aba4a0e913532ee8ff2014f006a9"),
             ),
         },
     ),
 ]
 
 
+METADATA_AUTHORITIES = [
+    MetadataAuthority(
+        type=MetadataAuthorityType.FORGE, url="http://example.org/", metadata={},
+    ),
+]
+
+METADATA_FETCHERS = [
+    MetadataFetcher(name="test-fetcher", version="1.0.0", metadata={},)
+]
+
+RAW_EXTRINSIC_METADATA = [
+    RawExtrinsicMetadata(
+        type=MetadataTargetType.ORIGIN,
+        id="http://example.org/foo.git",
+        discovery_date=datetime.datetime(2020, 7, 30, 17, 8, 20, tzinfo=UTC),
+        authority=attr.evolve(METADATA_AUTHORITIES[0], metadata=None),
+        fetcher=attr.evolve(METADATA_FETCHERS[0], metadata=None),
+        format="json",
+        metadata=b'{"foo": "bar"}',
+    ),
+    RawExtrinsicMetadata(
+        type=MetadataTargetType.CONTENT,
+        id=SWHID(object_type="content", object_id=hash_to_hex(CONTENTS[0].sha1_git)),
+        discovery_date=datetime.datetime(2020, 7, 30, 17, 8, 20, tzinfo=UTC),
+        authority=attr.evolve(METADATA_AUTHORITIES[0], metadata=None),
+        fetcher=attr.evolve(METADATA_FETCHERS[0], metadata=None),
+        format="json",
+        metadata=b'{"foo": "bar"}',
+    ),
+]
+
+
 TEST_OBJECTS: Dict[str, Sequence[ModelObject]] = {
     "content": CONTENTS,
     "directory": DIRECTORIES,
+    "metadata_authority": METADATA_AUTHORITIES,
+    "metadata_fetcher": METADATA_FETCHERS,
     "origin": ORIGINS,
     "origin_visit": ORIGIN_VISITS,
     "origin_visit_status": ORIGIN_VISIT_STATUSES,
+    "raw_extrinsic_metadata": RAW_EXTRINSIC_METADATA,
     "release": RELEASES,
     "revision": REVISIONS,
     "snapshot": SNAPSHOTS,
     "skipped_content": SKIPPED_CONTENTS,
 }
diff --git a/swh/journal/tests/test_pytest_plugin.py b/swh/journal/tests/test_pytest_plugin.py
index 68d26f0..14e49d7 100644
--- a/swh/journal/tests/test_pytest_plugin.py
+++ b/swh/journal/tests/test_pytest_plugin.py
@@ -1,68 +1,71 @@
 # Copyright (C) 2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from typing import Iterator
 from confluent_kafka.admin import AdminClient
 
 
 def test_kafka_server(kafka_server_base: str):
     ip, port_str = kafka_server_base.split(":")
     assert ip == "127.0.0.1"
     assert int(port_str)
 
     admin = AdminClient({"bootstrap.servers": kafka_server_base})
 
     topics = admin.list_topics()
 
     assert len(topics.brokers) == 1
 
 
 def test_kafka_server_with_topics(
     kafka_server: str,
     kafka_prefix: str,
     object_types: Iterator[str],
     privileged_object_types: Iterator[str],
 ):
     admin = AdminClient({"bootstrap.servers": kafka_server})
 
     # check unprivileged topics are present
     topics = {
         topic
         for topic in admin.list_topics().topics
         if topic.startswith(f"{kafka_prefix}.")
     }
     assert topics == {f"{kafka_prefix}.{obj}" for obj in object_types}
 
     # check privileged topics are present
     topics = {
         topic
         for topic in admin.list_topics().topics
         if topic.startswith(f"{kafka_prefix}_privileged.")
     }
     assert topics == {
         f"{kafka_prefix}_privileged.{obj}" for obj in privileged_object_types
     }
 
 
 def test_test_config(test_config: dict, kafka_prefix: str, kafka_server_base: str):
     assert test_config == {
         "consumer_id": "swh.journal.consumer",
         "stop_after_objects": 1,
         "storage": {"cls": "memory", "args": {}},
         "object_types": {
             "content",
             "directory",
+            "metadata_authority",
+            "metadata_fetcher",
             "origin",
             "origin_visit",
             "origin_visit_status",
+            "raw_extrinsic_metadata",
             "release",
             "revision",
             "snapshot",
             "skipped_content",
         },
         "privileged_object_types": {"release", "revision",},
         "brokers": [kafka_server_base],
         "prefix": kafka_prefix,
     }