diff --git a/swh/loader/core/discovery.py b/swh/loader/core/discovery.py
new file mode 100644
--- /dev/null
+++ b/swh/loader/core/discovery.py
@@ -0,0 +1,207 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""Primitives for finding the unknown parts of disk contents efficiently."""
+
+import itertools
+import logging
+import random
+from typing import Any, Dict, Iterable, List, Mapping, NamedTuple, Set, Union
+
+from swh.model import model
+from swh.model.model import Sha1Git
+from swh.storage.interface import StorageInterface
+
+logger = logging.getLogger(__name__)
+
+# Maximum number of entries drawn per sample from the undecided set of
+# directory entries
+SAMPLE_SIZE = 1000
+
+class Sample(NamedTuple):
+    """Sets of sha1_git hashes of contents, skipped contents and directories,
+    respectively."""
+
+    contents: Set[Sha1Git]
+    skipped_contents: Set[Sha1Git]
+    directories: Set[Sha1Git]
+
+
+class BaseDiscoveryGraph:
+    """Creates the base structures and methods needed for discovery algorithms.
+    Subclasses should override ``get_sample`` to affect how the discovery is made."""
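+
+    A minimal subclass (an illustrative sketch, not part of this module)
+    could sample every undecided entry at once::
+
+        class ExhaustiveDiscoveryGraph(BaseDiscoveryGraph):
+            # Degenerate strategy: query everything in a single round.
+            def get_sample(self) -> Sample:
+                contents = set()
+                skipped_contents = set()
+                for sha1 in self.undecided - self._undecided_directories:
+                    obj = self._all_contents[sha1]
+                    if obj.object_type == model.Content.object_type:
+                        contents.add(sha1)
+                    else:
+                        skipped_contents.add(sha1)
+                return Sample(
+                    contents=contents,
+                    skipped_contents=skipped_contents,
+                    directories=set(self._undecided_directories),
+                )
+    """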
+
+    def __init__(self, contents, skipped_contents, directories):
+        self._all_contents: Dict[
+            Sha1Git, Union[model.Content, model.SkippedContent]
+        ] = {}
+        self._undecided_directories: Set[Sha1Git] = set()
+        self._children: Dict[Sha1Git, Set[Sha1Git]] = {}
+        self._parents: Dict[Sha1Git, Set[Sha1Git]] = {}
+        self.undecided: Set[Sha1Git] = set()
+
+        for content in itertools.chain(contents, skipped_contents):
+            self.undecided.add(content.sha1_git)
+            self._all_contents[content.sha1_git] = content
+
+        for directory in directories:
+            self.undecided.add(directory.id)
+            self._undecided_directories.add(directory.id)
+            self._children[directory.id] = {c.target for c in directory.entries}
+            for child in directory.entries:
+                self._parents.setdefault(child.target, set()).add(directory.id)
+
+        self.known: Set[Sha1Git] = set()
+        self.unknown: Set[Sha1Git] = set()
+
+    def mark_known(self, entries: Iterable[Sha1Git]):
+        """Mark ``entries`` and those they imply as known in the SWH archive"""
+        self._mark_entries(entries, self._children, self.known)
+
+    def mark_unknown(self, entries: Iterable[Sha1Git]):
+        """Mark ``entries`` and those they imply as unknown in the SWH archive"""
+        self._mark_entries(entries, self._parents, self.unknown)
+
+    def _mark_entries(
+        self,
+        entries: Iterable[Sha1Git],
+        transitive_mapping: Mapping[Any, Any],
+        target_set: Set[Any],
+    ):
+        """Use Merkle graph properties to mark a directory entry as known or unknown.
+
+        If an entry is known, then all of its descendants are known. If it's
+        unknown, then all of its ancestors are unknown.
+
+        - ``entries``: directory entries to mark along with their ancestors/descendants
+          where applicable.
+        - ``transitive_mapping``: mapping from an entry to the next entries to mark
+          in the hierarchy, if any.
+        - ``target_set``: set where marked entries will be added.
+
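+        For instance, if directory ``d1`` contains directory ``d2``, which in
+        turn contains content ``c``, then marking ``d1`` as known also marks
+        ``d2`` and ``c`` as known (following ``self._children``), while
+        marking ``c`` as unknown also marks ``d2`` and ``d1`` as unknown
+        (following ``self._parents``).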
+        """
+        to_process = set(entries)
+        while to_process:
+            current = to_process.pop()
+            target_set.add(current)
+            self.undecided.discard(current)
+            self._undecided_directories.discard(current)
+            next_entries = transitive_mapping.get(current, set()) & self.undecided
+            to_process.update(next_entries)
+
+    def get_sample(self) -> Sample:
+        """Return a three-tuple of samples from the undecided sets of
+        contents, skipped contents and directories, respectively.
+
+        These samples will be queried against the storage, which will tell us
+        which of them are known."""
+        raise NotImplementedError()
+
+    def do_query(self, swh_storage: StorageInterface, sample: Sample) -> None:
+        """Given a three-tuple of samples, ask the storage which are known or
+        unknown and mark them as such."""
+        contents_sample, skipped_contents_sample, directories_sample = sample
+
+        # TODO: unify these APIs, and make it so we only have to manipulate hashes
+        if contents_sample:
+            known = set(contents_sample)
+            unknown = set(swh_storage.content_missing_per_sha1_git(contents_sample))
+            known -= unknown
+
+            self.mark_known(known)
+            self.mark_unknown(unknown)
+
+        if skipped_contents_sample:
+            known = set(skipped_contents_sample)
+            as_dicts = [
+                self._all_contents[s].to_dict() for s in skipped_contents_sample
+            ]
+            unknown = {
+                d["sha1_git"] for d in swh_storage.skipped_content_missing(as_dicts)
+            }
+            known -= unknown
+
+            self.mark_known(known)
+            self.mark_unknown(unknown)
+
+        if directories_sample:
+            known = set(directories_sample)
+            unknown = set(swh_storage.directory_missing(directories_sample))
+            known -= unknown
+
+            self.mark_known(known)
+            self.mark_unknown(unknown)
+
+
+class RandomDirSamplingDiscoveryGraph(BaseDiscoveryGraph):
+    """Use a random sampling using only directories.
+
+    This allows us to find a statistically good spread of entries in the graph
+    with a smaller population than using all types of entries. When there are
+    no more directories, only contents or skipped contents are undecided if any
+    are left: we send them directly to the storage since they should be few and
+    their structure flat."""
+
+    def get_sample(self) -> Sample:
+        if self._undecided_directories:
+            if len(self._undecided_directories) <= SAMPLE_SIZE:
+                return Sample(
+                    contents=set(),
+                    skipped_contents=set(),
+                    directories=set(self._undecided_directories),
+                )
+            # random.sample() requires a sequence: sampling from a set is
+            # deprecated since Python 3.9 and an error since Python 3.11.
+            sample = random.sample(tuple(self._undecided_directories), SAMPLE_SIZE)
+            return Sample(
+                contents=set(), skipped_contents=set(), directories=set(sample)
+            )
+
+        contents = set()
+        skipped_contents = set()
+
+        for sha1 in self.undecided:
+            obj = self._all_contents[sha1]
+            obj_type = obj.object_type
+            if obj_type == model.Content.object_type:
+                contents.add(sha1)
+            elif obj_type == model.SkippedContent.object_type:
+                skipped_contents.add(sha1)
+            else:
+                raise TypeError(f"Unexpected object type {obj_type}")
+
+        return Sample(
+            contents=contents, skipped_contents=skipped_contents, directories=set()
+        )
+
+
+def filter_known_objects(
+    swh_storage: StorageInterface,
+    contents: List[model.Content],
+    skipped_contents: List[model.SkippedContent],
+    directories: List[model.Directory],
+):
+    """Filter ``contents``, ``skipped_contents`` and ``directories`` to only return
+    those that are unknown to the SWH archive using a discovery algorithm."""
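+
+    Illustrative usage (a sketch; assumes the model objects come from
+    something like ``swh.model.from_disk.iter_directory``)::
+
+        contents, skipped_contents, directories = filter_known_objects(
+            swh_storage, contents, skipped_contents, directories
+        )
+        # Only objects missing from the archive remain, so the subsequent
+        # ``*_add`` storage calls do not re-send already archived data.
+    """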
+    contents_count = len(contents)
+    skipped_contents_count = len(skipped_contents)
+    directories_count = len(directories)
+
+    graph = RandomDirSamplingDiscoveryGraph(contents, skipped_contents, directories)
+
+    while graph.undecided:
+        sample = graph.get_sample()
+        graph.do_query(swh_storage, sample)
+
+    contents = [c for c in contents if c.sha1_git in graph.unknown]
+    skipped_contents = [c for c in skipped_contents if c.sha1_git in graph.unknown]
+    directories = [d for d in directories if d.id in graph.unknown]
+
+    logger.debug(
+        "Filtered out %d contents, %d skipped contents and %d directories",
+        contents_count - len(contents),
+        skipped_contents_count - len(skipped_contents),
+        directories_count - len(directories),
+    )
+
+    return (contents, skipped_contents, directories)
diff --git a/swh/loader/package/archive/tests/test_archive.py b/swh/loader/package/archive/tests/test_archive.py
--- a/swh/loader/package/archive/tests/test_archive.py
+++ b/swh/loader/package/archive/tests/test_archive.py
@@ -93,6 +93,13 @@
 }
 
 
+@pytest.fixture(autouse=True, scope="function")
+def lower_sample_rate(mocker):
+    """Lower the number of entries per discovery sample so the minimum threshold
+    for discovery is hit in tests without creating huge test data"""
+    mocker.patch("swh.loader.package.loader.discovery.SAMPLE_SIZE", 1)
+
+
 def test_archive_visit_with_no_artifact_found(swh_storage, requests_mock_datadir):
     url = URL
     unknown_artifact_url = "https://ftp.g.o/unknown/8sync-0.1.0.tar.gz"
@@ -129,6 +136,122 @@
     assert_last_visit_matches(swh_storage, url, status="partial", type="tar")
 
 
+def test_archive_visit_with_skipped_content(swh_storage, requests_mock_datadir):
+    """With no prior visit, load a gnu project and set the max content size
+    to something low to check that the loader skips "big" content."""
+    loader = ArchiveLoader(
+        swh_storage, URL, artifacts=GNU_ARTIFACTS[:1], max_content_size=10 * 1024
+    )
+
+    actual_load_status = loader.load()
+    assert actual_load_status["status"] == "eventful"
+
+    expected_snapshot_first_visit_id = hash_to_bytes(
+        "9efecc835e8f99254934f256b5301b94f348fd17"
+    )
+
+    assert actual_load_status["snapshot_id"] == hash_to_hex(
+        expected_snapshot_first_visit_id
+    )
+
+    assert_last_visit_matches(swh_storage, URL, status="full", type="tar")
+
+    _expected_new_non_skipped_contents_first_visit = [
+        "ae9be03bd2a06ed8f4f118d3fe76330bb1d77f62",
+        "809788434b433eb2e3cfabd5d591c9a659d5e3d8",
+        "1572607d456d7f633bc6065a2b3048496d679a31",
+        "27de3b3bc6545d2a797aeeb4657c0e215a0c2e55",
+        "fbd27c3f41f2668624ffc80b7ba5db9b92ff27ac",
+        "4f9709e64a9134fe8aefb36fd827b84d8b617ab5",
+        "84fb589b554fcb7f32b806951dcf19518d67b08f",
+        "3046e5d1f70297e2a507b98224b6222c9688d610",
+        "e08441aeab02704cfbd435d6445f7c072f8f524e",
+        "49d4c0ce1a16601f1e265d446b6c5ea6b512f27c",
+        "7d149b28eaa228b3871c91f0d5a95a2fa7cb0c68",
+        "f0c97052e567948adf03e641301e9983c478ccff",
+        "2e6db43f5cd764e677f416ff0d0c78c7a82ef19b",
+        "e9258d81faf5881a2f96a77ba609396f82cb97ad",
+        "7350628ccf194c2c3afba4ac588c33e3f3ac778d",
+        "0057bec9b5422aff9256af240b177ac0e3ac2608",
+        "6b5cc594ac466351450f7f64a0b79fdaf4435ad3",
+    ]
+
+    _expected_new_skipped_contents_first_visit = [
+        "1170cf105b04b7e2822a0e09d2acf71da7b9a130",
+        "2b8d0d0b43a1078fc708930c8ddc2956a86c566e",
+        "edeb33282b2bffa0e608e9d2fd960fd08093c0ea",
+        "d64e64d4c73679323f8d4cde2643331ba6c20af9",
+        "7a756602914be889c0a2d3952c710144b3e64cb0",
+        "8624bcdae55baeef00cd11d5dfcfa60f68710a02",
+        "f67935bc3a83a67259cda4b2d43373bd56703844",
+        "7d7c6c8c5ebaeff879f61f37083a3854184f6c41",
+        "b99fec102eb24bffd53ab61fc30d59e810f116a2",
+        "7fb724242e2b62b85ca64190c31dcae5303e19b3",
+        "0bb892d9391aa706dc2c3b1906567df43cbe06a2",
+    ]
+
+    # Check that the union of both sets makes up the original set (without skipping)
+    union = set(_expected_new_non_skipped_contents_first_visit) | set(
+        _expected_new_skipped_contents_first_visit
+    )
+    assert union == set(_expected_new_contents_first_visit)
+
+    stats = get_stats(swh_storage)
+    assert {
+        "content": len(_expected_new_non_skipped_contents_first_visit),
+        "directory": len(_expected_new_directories_first_visit),
+        "origin": 1,
+        "origin_visit": 1,
+        "release": len(_expected_new_releases_first_visit),
+        "revision": 0,
+        "skipped_content": len(_expected_new_skipped_contents_first_visit),
+        "snapshot": 1,
+    } == stats
+
+    release_id = hash_to_bytes(list(_expected_new_releases_first_visit)[0])
+    expected_snapshot = Snapshot(
+        id=expected_snapshot_first_visit_id,
+        branches={
+            b"HEAD": SnapshotBranch(
+                target_type=TargetType.ALIAS,
+                target=b"releases/0.1.0",
+            ),
+            b"releases/0.1.0": SnapshotBranch(
+                target_type=TargetType.RELEASE,
+                target=release_id,
+            ),
+        },
+    )
+    check_snapshot(expected_snapshot, swh_storage)
+
+    assert swh_storage.release_get([release_id])[0] == Release(
+        id=release_id,
+        name=b"0.1.0",
+        message=(
+            b"Synthetic release for archive at "
+            b"https://ftp.gnu.org/gnu/8sync/8sync-0.1.0.tar.gz\n"
+        ),
+        target=hash_to_bytes("3aebc29ed1fccc4a6f2f2010fb8e57882406b528"),
+        target_type=ObjectType.DIRECTORY,
+        synthetic=True,
+        author=Person.from_fullname(b""),
+        date=TimestampWithTimezone.from_datetime(
+            datetime.datetime(1999, 12, 9, 8, 53, 30, tzinfo=datetime.timezone.utc)
+        ),
+    )
+
+    expected_contents = map(
+        hash_to_bytes, _expected_new_non_skipped_contents_first_visit
+    )
+    assert list(swh_storage.content_missing_per_sha1(expected_contents)) == []
+
+    expected_dirs = map(hash_to_bytes, _expected_new_directories_first_visit)
+    assert list(swh_storage.directory_missing(expected_dirs)) == []
+
+    expected_rels = map(hash_to_bytes, _expected_new_releases_first_visit)
+    assert list(swh_storage.release_missing(expected_rels)) == []
+
+
 def test_archive_visit_with_release_artifact_no_prior_visit(
     swh_storage, requests_mock_datadir
 ):
diff --git a/swh/loader/package/loader.py b/swh/loader/package/loader.py
--- a/swh/loader/package/loader.py
+++ b/swh/loader/package/loader.py
@@ -31,6 +31,7 @@
 import sentry_sdk
 
 from swh.core.tarball import uncompress
+from swh.loader.core import discovery
 from swh.loader.core.loader import BaseLoader
 from swh.loader.exception import NotFound
 from swh.loader.package.utils import download
@@ -817,6 +818,12 @@
 
         contents, skipped_contents, directories = from_disk.iter_directory(directory)
 
+        # Instead of sending everything from the bottom up to the storage,
+        # use a Merkle graph discovery algorithm to filter out known objects.
+        contents, skipped_contents, directories = discovery.filter_known_objects(
+            self.storage, contents, skipped_contents, directories
+        )
+
         logger.debug("Number of skipped contents: %s", len(skipped_contents))
         self.storage.skipped_content_add(skipped_contents)
         logger.debug("Number of contents: %s", len(contents))