diff --git a/sql/upgrades/157.sql b/sql/upgrades/157.sql
--- a/sql/upgrades/157.sql
+++ b/sql/upgrades/157.sql
@@ -44,3 +44,20 @@
 
 create unique index object_metadata_content_authority_date_fetcher
   on object_metadata(id, authority_id, discovery_date, fetcher_id);
+
+
+-- Add context columns
+alter table object_metadata
+  add column origin text;
+alter table object_metadata
+  add column visit bigint;
+alter table object_metadata
+  add column snapshot swhid;
+alter table object_metadata
+  add column release swhid;
+alter table object_metadata
+  add column revision swhid;
+alter table object_metadata
+  add column path bytea;
+alter table object_metadata
+  add column directory swhid;
diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py
--- a/swh/storage/cassandra/cql.py
+++ b/swh/storage/cassandra/cql.py
@@ -18,6 +18,7 @@
     Optional,
     Tuple,
     TypeVar,
+    Union,
 )
 
 from cassandra import CoordinationFailure
@@ -880,6 +881,13 @@
         "fetcher_version",
         "format",
         "metadata",
+        "origin",
+        "visit",
+        "snapshot",
+        "release",
+        "revision",
+        "path",
+        "directory",
     ]
 
     @_prepared_statement(
@@ -897,6 +905,7 @@
         fetcher_version,
         format,
         metadata,
+        context: Dict[str, Union[str, bytes, int]],
         *,
         statement,
     ):
diff --git a/swh/storage/cassandra/schema.py b/swh/storage/cassandra/schema.py
--- a/swh/storage/cassandra/schema.py
+++ b/swh/storage/cassandra/schema.py
@@ -219,6 +219,15 @@
     format          ascii,
     metadata        blob,
 
+    -- context (SWHIDs are stored as their string representation, Cassandra has no custom types)
+    origin          text,
+    visit           bigint,
+    snapshot        text,
+    release         text,
+    revision        text,
+    path            blob,
+    directory       text,
+
     PRIMARY KEY ((id), authority_type, authority_url, discovery_date,
                        fetcher_name, fetcher_version)
 );
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -8,7 +8,7 @@
 import json
 import random
 import re
-from typing import Any, Dict, List, Iterable, Optional
+from typing import Any, Dict, List, Iterable, Optional, Union
 
 import attr
 from deprecated import deprecated
@@ -32,6 +32,7 @@
 from swh.storage.utils import now
 
 from ..exc import StorageArgumentException, HashCollision
+from ..extrinsic_metadata import check_extrinsic_metadata_context, CONTEXT_KEYS
 from .common import TOKEN_BEGIN, TOKEN_END
 from .converters import (
     revision_to_db,
@@ -1030,8 +1031,18 @@
             raise StorageArgumentException(
                 "origin_url must be str, not %r" % (origin_url,)
             )
+
+        context: Dict[str, Union[str, bytes, int]] = {}  # origins have no context
+
         self._object_metadata_add(
-            "origin", origin_url, discovery_date, authority, fetcher, format, metadata,
+            "origin",
+            origin_url,
+            discovery_date,
+            authority,
+            fetcher,
+            format,
+            metadata,
+            context,
         )
 
     def origin_metadata_get(
@@ -1062,7 +1073,10 @@
         fetcher: Dict[str, Any],
         format: str,
         metadata: bytes,
+        context: Dict[str, Union[str, bytes, int]],
     ) -> None:
+        check_extrinsic_metadata_context(object_type, context)
+
         if not self._cql_runner.metadata_authority_get(**authority):
             raise StorageArgumentException(f"Unknown authority {authority}")
         if not self._cql_runner.metadata_fetcher_get(**fetcher):
@@ -1079,6 +1093,7 @@
                 fetcher["version"],
                 format,
                 metadata,
+                context,
             )
         except TypeError as e:
             raise StorageArgumentException(*e.args)
@@ -1139,6 +1154,14 @@
                 "metadata": entry.metadata,
             }
 
+            if CONTEXT_KEYS[object_type]:
+                context = {}
+                for key in CONTEXT_KEYS[object_type]:
+                    value = getattr(entry, key)
+                    if value is not None:
+                        context[key] = value
+                result["context"] = context
+
             results.append(result)
 
         if len(results) > limit:
diff --git a/swh/storage/db.py b/swh/storage/db.py
--- a/swh/storage/db.py
+++ b/swh/storage/db.py
@@ -6,7 +6,7 @@
 import datetime
 import random
 import select
-from typing import Any, Dict, Iterable, List, Optional, Tuple
+from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
 
 from swh.core.db import BaseDb
 from swh.core.db.db_utils import stored_procedure, jsonize
@@ -1092,6 +1092,17 @@
     def release_get_random(self, cur=None):
         return self._get_random_row_from_table("release", ["id"], "id", cur)
 
+    _object_metadata_context_cols = [
+        "origin",
+        "visit",
+        "snapshot",
+        "release",
+        "revision",
+        "path",
+        "directory",
+    ]
+    """The list of context columns for all artifact types."""
+
     _object_metadata_insert_cols = [
         "type",
         "id",
@@ -1100,6 +1111,7 @@
         "discovery_date",
         "format",
         "metadata",
+        *_object_metadata_context_cols,
     ]
     """List of columns of the object_metadata table, used when writing
     metadata."""
@@ -1122,6 +1134,7 @@
         "metadata_fetcher.id",
         "metadata_fetcher.name",
         "metadata_fetcher.version",
+        *_object_metadata_context_cols,
         "format",
         "metadata",
     ]
@@ -1144,6 +1157,7 @@
         self,
         object_type: str,
         id: str,
+        context: Dict[str, Union[str, bytes, int]],
         discovery_date: datetime.datetime,
         authority_id: int,
         fetcher_id: int,
@@ -1161,6 +1175,9 @@
             format=format,
             metadata=metadata,
         )
+        for col in self._object_metadata_context_cols:
+            args[col] = context.get(col)
+
         params = [args[col] for col in self._object_metadata_insert_cols]
 
         cur.execute(query, params)
diff --git a/swh/storage/extrinsic_metadata.py b/swh/storage/extrinsic_metadata.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/extrinsic_metadata.py
@@ -0,0 +1,55 @@
+# Copyright (C) 2020  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Any, cast, Dict
+
+from swh.model.identifiers import PersistentId, parse_persistent_identifier
+
+from .exc import StorageArgumentException
+
+CONTEXT_KEYS: Dict[str, Dict[str, type]] = {}  # per object type: allowed context key -> expected value type
+CONTEXT_KEYS["origin"] = {}  # origins are top-level objects: no context
+CONTEXT_KEYS["snapshot"] = {"origin": str, "visit": int}
+CONTEXT_KEYS["release"] = {**CONTEXT_KEYS["snapshot"], "snapshot": PersistentId}
+CONTEXT_KEYS["revision"] = {**CONTEXT_KEYS["release"], "release": PersistentId}
+CONTEXT_KEYS["directory"] = {  # each type inherits the allowed keys of the type pointing to it
+    **CONTEXT_KEYS["revision"],
+    "revision": PersistentId,
+    "path": bytes,
+}
+CONTEXT_KEYS["content"] = {**CONTEXT_KEYS["directory"], "directory": PersistentId}
+
+
+def check_extrinsic_metadata_context(object_type: str, context: Dict[str, Any]):
+    """Check that ``context`` has only keys allowed for ``object_type`` and
+    values of the declared types; raise StorageArgumentException otherwise."""
+    key_types = CONTEXT_KEYS[object_type]
+    extra_keys = set(context) - set(key_types)
+    if extra_keys:
+        raise StorageArgumentException(f"Unknown context keys: {', '.join(extra_keys)}")
+    for (key, value) in context.items():
+        expected_type = key_types[key]
+        expected_type_str = expected_type.__name__  # "int", not "<class 'int'>"
+
+        # If an SWHID is expected and a string is given, parse it
+        # NOTE(review): parsing may raise a swh.model error, not StorageArgumentException -- confirm
+        if expected_type is PersistentId and isinstance(value, str):
+            value = parse_persistent_identifier(value)
+            expected_type_str = "PersistentId or str"
+
+        # Check the type of the context value
+        if not isinstance(value, expected_type):
+            raise StorageArgumentException(
+                f"Context key {key} must have type {expected_type_str}, "
+                f"but is {value!r}"
+            )
+
+        # If it is an SWHID, check it is also a core SWHID (no qualifiers).
+        if expected_type is PersistentId:
+            value = cast(PersistentId, value)
+            if value.metadata != {}:
+                raise StorageArgumentException(
+                    f"Context key {key} must be a core SWHID, "
+                    f"but it has qualifiers {', '.join(value.metadata)}."
+                )
diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py
--- a/swh/storage/in_memory.py
+++ b/swh/storage/in_memory.py
@@ -25,6 +25,7 @@
     Optional,
     Tuple,
     TypeVar,
+    Union,
 )
 
 import attr
@@ -49,9 +50,9 @@
 from swh.storage.objstorage import ObjStorage
 from swh.storage.utils import now
 
-from .exc import StorageArgumentException, HashCollision
-
 from .converters import origin_url_to_sha1
+from .exc import StorageArgumentException, HashCollision
+from .extrinsic_metadata import check_extrinsic_metadata_context, CONTEXT_KEYS
 from .utils import get_partition_bounds_bytes
 from .writer import JournalWriter
 
@@ -1018,6 +1019,7 @@
     def content_metadata_add(
         self,
         id: str,
+        context: Dict[str, Union[str, bytes, int]],
         discovery_date: datetime.datetime,
         authority: Dict[str, Any],
         fetcher: Dict[str, Any],
@@ -1025,7 +1027,14 @@
         metadata: bytes,
     ) -> None:
         self._object_metadata_add(
-            "content", id, discovery_date, authority, fetcher, format, metadata,
+            "content",
+            id,
+            discovery_date,
+            authority,
+            fetcher,
+            format,
+            metadata,
+            context,
         )
 
     def origin_metadata_add(
@@ -1041,8 +1050,18 @@
             raise StorageArgumentException(
                 "origin_url must be str, not %r" % (origin_url,)
             )
+
+        context: Dict[str, Union[str, bytes, int]] = {}  # origins have no context
+
         self._object_metadata_add(
-            "origin", origin_url, discovery_date, authority, fetcher, format, metadata,
+            "origin",
+            origin_url,
+            discovery_date,
+            authority,
+            fetcher,
+            format,
+            metadata,
+            context,
         )
 
     def _object_metadata_add(
@@ -1054,7 +1073,9 @@
         fetcher: Dict[str, Any],
         format: str,
         metadata: bytes,
+        context: Dict[str, Union[str, bytes, int]],
     ) -> None:
+        check_extrinsic_metadata_context(object_type, context)
         if not isinstance(metadata, bytes):
             raise StorageArgumentException(
                 "metadata must be bytes, not %r" % (metadata,)
@@ -1077,6 +1098,9 @@
             "metadata": metadata,
         }
 
+        if CONTEXT_KEYS[object_type]:
+            object_metadata["context"] = context
+
         for existing_object_metadata in object_metadata_list:
             if (
                 existing_object_metadata["fetcher"] == fetcher_key
diff --git a/swh/storage/sql/30-swh-schema.sql b/swh/storage/sql/30-swh-schema.sql
--- a/swh/storage/sql/30-swh-schema.sql
+++ b/swh/storage/sql/30-swh-schema.sql
@@ -436,7 +436,16 @@
 
   -- metadata itself
   format         text          not null,
-  metadata       bytea         not null
+  metadata       bytea         not null,
+
+  -- context
+  origin         text,
+  visit          bigint,
+  snapshot       swhid,
+  release        swhid,
+  revision       swhid,
+  path           bytea,
+  directory      swhid
 );
 
 comment on table object_metadata is 'keeps all metadata found concerning an object';
diff --git a/swh/storage/storage.py b/swh/storage/storage.py
--- a/swh/storage/storage.py
+++ b/swh/storage/storage.py
@@ -10,7 +10,7 @@
 from collections import defaultdict
 from contextlib import contextmanager
 from deprecated import deprecated
-from typing import Any, Dict, Iterable, List, Optional
+from typing import Any, Dict, Iterable, List, Optional, Union
 
 import attr
 import psycopg2
@@ -36,6 +36,10 @@
 from swh.storage.utils import now
 
 from . import converters
+from .extrinsic_metadata import (
+    check_extrinsic_metadata_context,
+    CONTEXT_KEYS,
+)
 from .common import db_transaction_generator, db_transaction
 from .db import Db
 from .exc import StorageArgumentException, StorageDBError, HashCollision
@@ -1162,9 +1166,12 @@
         db=None,
         cur=None,
     ) -> None:
+        context: Dict[str, Union[str, bytes, int]] = {}  # origins have no context
+
         self._object_metadata_add(
             "origin",
             origin_url,
+            context,
             discovery_date,
             authority,
             fetcher,
@@ -1178,6 +1185,7 @@
         self,
         object_type: str,
         id: str,
+        context: Dict[str, Union[str, bytes, int]],
         discovery_date: datetime.datetime,
         authority: Dict[str, Any],
         fetcher: Dict[str, Any],
@@ -1186,6 +1194,8 @@
         db,
         cur,
     ) -> None:
+        check_extrinsic_metadata_context(object_type, context)
+
         authority_id = self._get_authority_id(authority, db, cur)
         fetcher_id = self._get_fetcher_id(fetcher, db, cur)
         if not isinstance(metadata, bytes):
@@ -1196,6 +1206,7 @@
         db.object_metadata_add(
             object_type,
             id,
+            context,
             discovery_date,
             authority_id,
             fetcher_id,
@@ -1270,7 +1281,14 @@
         for row in rows:
             row = row.copy()
             row.pop("metadata_fetcher.id")
+            context = {}
+            for key in CONTEXT_KEYS[object_type]:
+                value = row[key]
+                if value is not None:
+                    context[key] = value
+
             result = {
+                "id": row["id"],
                 "authority": {
                     "type": row.pop("metadata_authority.type"),
                     "url": row.pop("metadata_authority.url"),
@@ -1279,9 +1297,14 @@
                     "name": row.pop("metadata_fetcher.name"),
                     "version": row.pop("metadata_fetcher.version"),
                 },
-                **row,
+                "discovery_date": row["discovery_date"],
+                "format": row["format"],
+                "metadata": row["metadata"],
             }
 
+            if CONTEXT_KEYS[object_type]:
+                result["context"] = context
+
             results.append(result)
 
         if len(results) > limit: