diff --git a/README.md b/README.md --- a/README.md +++ b/README.md @@ -6,13 +6,10 @@ before. The main entry points are: -- :class:`swh.loader.mercurial.loader.HgBundle20Loader` which reads and loads a remote - repository (or local). +- :class:`swh.loader.mercurial.loader.HgLoader` which reads and loads a repository + (local or remote) into an SWH archive. -- :class:`swh.loader.mercurial.from_disk.HgLoaderFromDisk` which reads and loads a local - repository into an SWH archive. - -- :class:`swh.loader.mercurial.from_disk.HgArchiveLoaderFromDisk` which reads and loads +- :class:`swh.loader.mercurial.loader.HgArchiveLoader` which reads and loads a local repository wrapped within a tarball # CLI run @@ -32,4 +29,3 @@ ``` bash swh loader --C /tmp/mercurial.yml run mercurial https://www.mercurial-scm.org/repo/hello ``` -or `mercurial_from_disk` diff --git a/conftest.py b/conftest.py --- a/conftest.py +++ b/conftest.py @@ -1,4 +1,4 @@ -# Copyright (C) 2020 The Software Heritage developers +# Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -16,5 +16,4 @@ def swh_scheduler_celery_includes(swh_scheduler_celery_includes): return swh_scheduler_celery_includes + [ "swh.loader.mercurial.tasks", - "swh.loader.mercurial.tasks_from_disk", ] diff --git a/setup.py b/setup.py --- a/setup.py +++ b/setup.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -# Copyright (C) 2015-2020 The Software Heritage developers +# Copyright (C) 2015-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -54,7 +54,6 @@ entry_points=""" [swh.workers] loader.mercurial=swh.loader.mercurial:register - loader.mercurial_from_disk=swh.loader.mercurial:register_from_disk [console_scripts] swh-hg-identify=swh.loader.mercurial.identify:main """, diff --git a/swh/loader/mercurial/__init__.py b/swh/loader/mercurial/__init__.py --- a/swh/loader/mercurial/__init__.py +++ b/swh/loader/mercurial/__init__.py @@ -1,4 +1,4 @@ -# Copyright (C) 2019 The Software Heritage developers +# Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -9,19 +9,9 @@ def register() -> Mapping[str, Any]: """Register the current worker module's definition""" - from .loader import HgBundle20Loader + from .loader import HgLoader return { "task_modules": [f"{__name__}.tasks"], - "loader": HgBundle20Loader, - } - - -def register_from_disk() -> Mapping[str, Any]: - """Register the current worker module's definition""" - from .from_disk import HgLoaderFromDisk - - return { - "task_modules": [f"{__name__}.tasks_from_disk"], - "loader": HgLoaderFromDisk, + "loader": HgLoader, } diff --git a/swh/loader/mercurial/bundle20_reader.py b/swh/loader/mercurial/bundle20_reader.py deleted file mode 100644 --- a/swh/loader/mercurial/bundle20_reader.py +++ /dev/null @@ -1,621 +0,0 @@ -# Copyright (C) 2017 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information 
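The README above shows the CLI invocation; for completeness, here is a hedged sketch of driving the unified `HgLoader` directly from Python, in the spirit of the `__main__` helpers removed later in this patch. The in-memory storage and the origin URL are illustrative choices, not something this patch prescribes.

``` python
# Minimal sketch, assuming swh.storage and swh.loader.mercurial are installed.
from swh.loader.mercurial.loader import HgLoader
from swh.storage import get_storage

storage = get_storage(cls="memory")  # throwaway storage, fine for a local test
loader = HgLoader(storage, url="https://www.mercurial-scm.org/repo/hello")
result = loader.load()
print(result)  # e.g. {'status': 'eventful'}
```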
- -"""This document contains code for extracting all of the data from -Mercurial version 2 bundle file. It is referenced by -bundle20_loader.py - -""" - -# ============================================================================= -# ============================================================================= -# BACKGROUND -# ============================================================================= -# ============================================================================= -# -# https://www.mercurial-scm.org/wiki/BundleFormat says: -# "The new bundle format design is described on the BundleFormat2 page." -# -# https://www.mercurial-scm.org/wiki/BundleFormat2#Format_of_the_Bundle2_Container says: # noqa -# "The latest description of the binary format can be found as comment in the -# Mercurial source code." -# -# https://www.mercurial-scm.org/repo/hg/file/default/mercurial/help/internals/bundles.txt says: # noqa -# "The 'HG20' format is not yet documented here. See the inline comments in -# 'mercurial/exchange.py' for now." -# -# ----------------------------------------------------------------------------- -# Avi says: -# ----------------------------------------------------------------------------- -# -# All of the above official(?) statements seem to be quite wrong. -# -# The mercurial-scm wiki is a cluster#@*& of missing pages, bad links, wrong -# information, obsolete information, undecipherable names, and half-started -# leavings that only sort of look like content. I don't understand who or what -# it's there for. I think that means it's not there for me? -# -# https://www.mercurial-scm.org/wiki/BundleFormat2#New_header is wrong and -# bizarre, and there isn't any other information on the page. -# -# -# https://www.mercurial-scm.org/repo/hg/file/de86a6872d06/mercurial/help/internals/changegroups.txt # noqa -# (`hg help internals.changegroups`) is very close to what we need. -# It is accurate, current, and thorough. -# It describes much of the internal structure, which is super helpful if you -# know in advance which info can be trusted, but it doesn't describe any of the -# file-level details, including the file headers and that the entire bundle -# is broken into overlaid 4KB chunks starting from just after the bundle -# header, nor does it describe what any of the component elements are used for, -# nor does it explain the meta-message segment in the blob deltas, nor does it -# explain the file flags occasionally appended to manifest file hashes. Also it -# says: "The [delta data] format is described more fully in 'hg help -# internals.bdiff'", which is also wrong. As far as I can tell, that -# file has never existed. -# -# It does however have one potentially extremely useful note buried in the -# middle that, in hindsight, could have significant implications for complexity -# and performance in future Mercurial loading work. -# -# It says: "In version 1, the delta is always applied against the previous node -# from the changegroup or the first parent if this is the first entry in the -# changegroup." -# -# If the next version of HG support for SWH can reliably get version 1 data, -# then it could be implemented entirely without worrying about ballooning -# memory utilization, which would shrink the code significantly and probably be -# faster too. So maybe HG10 bundles instead of HG20 bundles are superior for -# this task? 
But then I read that servers can optionally disable serving -# version 1 content, and I like to think that this loader could eventually -# be applied directly to a network stream without an intermediate phase for -# cloning and local bundling, so...It seemed like a good idea at the time? -# -# ----------------------------------------------------------------------------- -# Other notes and thoughts: -# ----------------------------------------------------------------------------- -# 1) -# This is a relatively minor detail, but -# Mercurial nodes are not content-addressable like Git's are. -# -# https://www.mercurial-scm.org/wiki/Nodeid explains: "If you modify a file, -# commit the change, and then modify it to restore the original contents, the -# contents are the same but the history is different, so the file will get a -# new nodeid. This history-sensitivity is obtained by calculating the nodeid -# from the concatenation of the parent nodeids with the file's contents..." -# -# The result is that we always have to collect and hash everything at least -# once in order to know if we've seen something like it before, because nothing -# tells us that the node we're looking at is unique. We can use node ids for -# linking disparate elements together (e.g. commit to manifest) but not for -# determining whether two elements in the same group are identical in all but -# descendency. So there's no way to save time on duplicate hashing. Well... -# there is the copied_file blob metadata, but, lol. -# -# 2) -# Most of the code complexity is due to dealing with 'version 2' changegroups, -# for which we need to keep track of the entire history of all updates made -# to a given file or working directory tree structure, because a revision -# delta could be applied over any of the prior revisions all the way back to -# rev 0, according to whenever files were branched/merged/uncommitted/etc. For -# very large repositories with a lot of churn, this can quickly expand to -# require multiple gigabytes of space, possibly exceeding RAM availability if -# one desires to keep as much data resident in memory as possible to boost -# performance. mozilla-unified, for instance, produces some 2 million+ blobs -# (1.6 million+ unique). Nested umpteen subdirectory levels deep, those blobs -# balloon into a quantity of directory subtrees that rapidly exceeds an 8GB RAM -# laptop's ability to keep them all active without a good amount of care and -# pruning. The code here tries to strike a balance between memory utilization -# and performance. -# -# This problem is also referenced in the last paragraph of the previous -# section, where potentially this problem wouldn't exist for 'version 1' data -# if we can reliably get it. Can we? Either that or not use bundles at all, -# which has other costs. -# -# 3) -# If the list of changed files stored by the changesets had indicated which -# of those changed files were added or modified and which ones were removed, -# this code could be much faster. Right now we have to perform a huge number of -# substring replacements (see the apply_revdata method) to produce a complete -# file manifest for each commit (as a string!!!) in order to know how to get -# the set of removed files from the next delta. We can intuit from every -# manifest delta which files were modified or added, but I believe there's no -# way to intuit which files were removed without actually having the complete -# prior state and without the list of removals being explicitly given. 
If you -# have an explicit list of all the files that were removed for any given commit -# changegroup, and you combine that with the delta updates in the manifest -# changegroups which detail the set of files that have been added or modified, -# then you wouldn't even have to apply any of the string deltas to get a -# complete understanding of the set of differences between one manifest and the -# next. Not having this effective speed boost is rather unfortunate; it would -# require only one extra stored byte per commit to differentiate removals and -# would make extracting bundles lightning fast. -# ============================================================================ -## - -from binascii import unhexlify -from collections import OrderedDict -from datetime import datetime -import itertools -import struct - -from .chunked_reader import ChunkedFileReader -from .objects import SelectiveCache - - -def unpack(fmt_str, source): - """Utility function for fetching the right number of bytes from a stream to - satisfy a struct.unpack pattern. - - args: - fmt_str: a struct.unpack string pattern - (e.g. '>I' for 4 bytes big-endian) - source: any IO object that has a read() method which - returns an appropriate sequence of bytes - """ - ret = struct.unpack(fmt_str, source.read(struct.calcsize(fmt_str))) - if len(ret) == 1: - return ret[0] - return ret - - -class Bundle20Reader(object): - """Parser for extracting data from Mercurial Bundle20 files. - NOTE: Currently only works on uncompressed HG20 bundles, but checking for - COMPRESSION=<2chars> and loading the appropriate stream decompressor - at that point would be trivial to add if necessary. - - args: - bundlefile (str): name of the binary repository bundle file - cache_filename (str): path to the disk cache used (transited - to the SelectiveCache instance) - cache_size (int): tuning parameter for the upper RAM limit used by - historical data caches. The default is defined in the - SelectiveCache class. - - """ - - NAUGHT_NODE = b"\x00" * 20 - - def __init__(self, bundlefile, cache_filename, cache_size=None): - self.bundlefile = bundlefile - self.cache_filename = cache_filename - bfile = open(bundlefile, "rb", buffering=200 * 1024 * 1024) - - btype = bfile.read(4) # 'HG20' - if btype != b"HG20": - raise Exception(bundlefile, b"Not an HG20 bundle. First 4 bytes:" + btype) - bfile.read(4) # '\x00\x00\x00\x00' - - self.params = self.read_bundle_header(bfile) - # print('PARAMETERS', self.params) - self.num_commits = self.params[b"nbchanges"] - - self.filereader = ChunkedFileReader(bfile) - - self.cache_size = cache_size - self.blobs_offset = None - self.changes_offset = self.filereader.tell() - self.changes_next_offset = None - self.manifests_offset = None - self.manifests_next_offset = None - self.id_to_info = {} - - def read_bundle_header(self, bfile): - """Parse the file header which describes the format and parameters. - See the structure diagram at the top of the file for more insight. 
- - args: - bfile: bundle file handle with the cursor at the start offset of - the content header (the 9th byte in the file) - - returns: - dict of decoded bundle parameters - """ - unpack(">I", bfile) # header length - chg_len = unpack(">B", bfile) # len('CHANGEGROUP') == 11 - bfile.read(chg_len) # should say 'CHANGEGROUP' - unpack(">I", bfile) # probably \x00\x00\x00\x00 - - n_mandatory, n_advisory = unpack(">BB", bfile) # parameter counts - mandatory_params = [ - (key_len, val_len) - for key_len, val_len in [unpack(">BB", bfile) for i in range(n_mandatory)] - ] - advisory_params = [ - (key_len, val_len) - for key_len, val_len in [unpack(">BB", bfile) for i in range(n_advisory)] - ] - params = {} - - for key_len, val_len in mandatory_params + advisory_params: - key = unpack(">%ds" % key_len, bfile) - val = int(unpack(">%ds" % val_len, bfile)) - params[key] = val - - return params - - def revdata_iterator(self, bytes_to_read): - """A chunk's revdata section is a series of start/end/length/data_delta - content updates called RevDiffs that indicate components of a text diff - applied to the node's basenode. The sum length of all diffs is the - length indicated at the beginning of the chunk at the start of the - header. - See the structure diagram at the top of the file for more insight. - - args: - bytes_to_read: int total number of bytes in the chunk's revdata - yields: - (int, int, read iterator) representing a single text diff component - """ - while bytes_to_read > 0: - start_offset = unpack(">I", self.filereader) - end_offset = unpack(">I", self.filereader) - blocklen = unpack(">I", self.filereader) - delta_it = self.filereader.read_iterator(blocklen) - bytes_to_read -= 12 + blocklen - yield (start_offset, end_offset, delta_it) # RevDiff - - def read_chunk_header(self): - """The header of a RevChunk describes the id ('node') for the current - change, the commit id ('linknode') associated with this change, - the parental heritage ('p1' and 'p2'), and the node to which the - revdata updates will apply ('basenode'). 'linknode' is the same as - 'node' when reading the commit log because any commit is already - itself. 'basenode' for a changeset will be NAUGHT_NODE, because - changeset chunks include complete information and not diffs. - See the structure diagram at the top of the file for more insight. - - returns: - dict of the next delta header - """ - header = self.filereader.read(100) - header = { - "node": header[0:20], - "p1": header[20:40], - "p2": header[40:60], - "basenode": header[60:80], - "linknode": header[80:100], - } - return header - - def read_revchunk(self): - """Fetch a complete RevChunk. - A RevChunk contains the collection of line changes made in a particular - update. header['node'] identifies which update. Commits, manifests, and - files all have these. Each chunk contains an indicator of the whole - chunk size, an update header, and then the body of the update as a - series of text diff components. - See the structure diagram at the top of the file for more insight. - - returns: - tuple(dict, iterator) of (header, chunk data) if there is another - chunk in the group, else None - """ - size = unpack(">I", self.filereader) - 104 - if size >= 0: - header = self.read_chunk_header() - return (header, self.revdata_iterator(size)) - else: - return None # NullChunk - - def extract_commit_metadata(self, data): - """Converts the binary commit metadata format into a dict. 
- - args: - data: bytestring of encoded commit information - - returns: - dict of decoded commit information - """ - parts, message = data.split(b"\n\n", 1) - parts = parts.split(b"\n") - commit = {} - commit["message"] = message - commit["manifest"] = unhexlify(parts[0]) - commit["user"] = parts[1] - tstamp, tz, *extra = parts[2].split(b" ") - commit["time"] = datetime.fromtimestamp(float(tstamp)) - commit["time_offset_seconds"] = int(tz) - if extra: - commit["extra"] = b" ".join(extra) - commit["changed_files"] = parts[3:] - return commit - - def skip_sections(self, num_sections=1): - """Skip past sections quickly. - - args: - num_sections: int number of sections to skip - """ - for i in range(num_sections): - size = unpack(">I", self.filereader) - while size >= 104: - self.filereader.seek(size - 4, from_current=True) - size = unpack(">I", self.filereader) - - def apply_revdata(self, revdata_it, prev_state): - """Compose the complete text body for a change from component deltas. - - args: - revdata_it: output from the revdata_iterator method - prev_state: bytestring the base complete text on which the new - deltas will be applied - returns: - (bytestring, list, list) the new complete string and lists of added - and removed components (used in manifest processing) - """ - state = [] - added = [] - removed = [] - next_start = 0 - - for delta_start, delta_end, rev_diff_it in revdata_it: - removed.append(prev_state[delta_start:delta_end]) - added.append(b"".join(rev_diff_it)) - - state.append(prev_state[next_start:delta_start]) - state.append(added[-1]) - next_start = delta_end - - state.append(prev_state[next_start:]) - state = b"".join(state) - - return (state, added, removed) - - def skim_headers(self): - """Get all header data from a change group but bypass processing of the - contained delta components. - - yields: - output of read_chunk_header method for all chunks in the group - """ - size = unpack(">I", self.filereader) - 104 - while size >= 0: - header = self.read_chunk_header() - self.filereader.seek(size, from_current=True) - yield header - size = unpack(">I", self.filereader) - 104 - - def group_iterator(self): - """Bundle sections are called groups. These are composed of one or more - revision chunks of delta components. Iterate over all the chunks in a - group and hand each one back. - - yields: - see output of read_revchunk method - """ - revchunk = self.read_revchunk() - while revchunk: # A group is terminated by a NullChunk - yield revchunk # (header, revdata_iterator) - revchunk = self.read_revchunk() - - def yield_group_objects(self, cache_hints=None, group_offset=None): - """Bundles are sectioned into groups: the log of all commits, the log - of all manifest changes, and a series of logs of blob changes (one for - each file). All groups are structured the same way, as a series of - revisions each with a series of delta components. Iterate over the - current group and return the completed object data for the current - update by applying all of the internal delta components to each prior - revision. 
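The `apply_revdata` method above is where these deltas are put back together: each RevDiff replaces the byte range `[start, end)` of the previous text with new data. A self-contained illustration of that operation (a simplified sketch, not code from this module):

``` python
# Apply Mercurial-style (start, end, data) deltas to a base bytestring,
# in the spirit of Bundle20Reader.apply_revdata above.
def apply_deltas(base: bytes, deltas) -> bytes:
    out = []
    cursor = 0
    for start, end, replacement in deltas:
        out.append(base[cursor:start])  # unchanged prefix
        out.append(replacement)         # new content
        cursor = end                    # skip the replaced span
    out.append(base[cursor:])           # unchanged suffix
    return b"".join(out)

assert apply_deltas(b"hello world", [(0, 5, b"goodbye")]) == b"goodbye world"
```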
- - args: - cache_hints: see build_cache_hints (this will be built - automatically if not pre-built and passed in) - group_offset: int file position of the start of the desired group - - yields: - (dict, bytestring, list, list) the output from read_chunk_header - followed by the output from apply_revdata - """ - if group_offset is not None: - self.filereader.seek(group_offset) - - if cache_hints is None: - cache_hints = self.build_cache_hints() - - data_cache = SelectiveCache( - max_size=self.cache_size, - cache_hints=cache_hints, - filename=self.cache_filename, - ) - - # Loop over all revisions in the group - data = b"" - for header, revdata_it in self.group_iterator(): - node = header["node"] - basenode = header["basenode"] - - data = data_cache.fetch(basenode) or b"" - - data, added, removed = self.apply_revdata(revdata_it, data) - - data_cache.store(node, data) - - yield (header, data, added, removed) # each RevChunk - - def extract_meta_from_blob(self, data): - """File revision data sometimes begins with a metadata section of - dubious value. Strip it off and maybe decode it. It seems to be mostly - useless. Why indicate that a file node is a copy of another node? You - can already get that information from the delta header. - - args: - data: bytestring of one revision of a file, possibly with metadata - embedded at the start - - returns: - (bytestring, dict) of (the blob data, the meta information) - """ - meta = {} - if data.startswith(b"\x01\n"): - empty, metainfo, data = data.split(b"\x01\n", 2) - metainfo = b"\x01\n" + metainfo + b"\x01\n" - if metainfo.startswith(b"copy:"): - # direct file copy (?) - copyinfo = metainfo.split(b"\n") - meta["copied_file"] = copyinfo[0][6:] - meta["copied_rev"] = copyinfo[1][9:] - elif metainfo.startswith(b"censored:"): - # censored revision deltas must be full-replacements (?) - meta["censored"] = metainfo - else: - # no idea - meta["text"] = metainfo - return data, meta - - def seek_changelog(self): - """Seek to the beginning of the change logs section. - """ - self.filereader.seek(self.changes_offset) - - def seek_manifests(self): - """Seek to the beginning of the manifests section. - """ - if self.manifests_offset is None: - self.seek_changelog() - self.skip_sections(1) # skip past commits - self.manifests_offset = self.filereader.tell() - else: - self.filereader.seek(self.manifests_offset) - - def seek_filelist(self): - """Seek to the beginning of the file changes section. - """ - if self.blobs_offset is None: - self.seek_manifests() - self.skip_sections(1) # skip past manifests - self.blobs_offset = self.filereader.tell() - else: - self.filereader.seek(self.blobs_offset) - - def yield_all_blobs(self): - """Gets blob data from the bundle. - - yields: - (bytestring, (bytestring, int, dict)) of - (blob data, (file name, start offset of the file within the - bundle, node header)) - """ - self.seek_filelist() - - # Loop through all files that have commits - size = unpack(">I", self.filereader) - while size > 0: - file_name = self.filereader.read(size - 4) - file_start_offset = self.filereader.tell() - # get all of the blobs for each file - for header, data, *_ in self.yield_group_objects(): - blob, meta = self.extract_meta_from_blob(data) - yield blob, (file_name, file_start_offset, header) - size = unpack(">I", self.filereader) - - def yield_all_changesets(self): - """Gets commit data from the bundle. 
- - yields: - (dict, dict) of (read_chunk_header output, - extract_commit_metadata output) - """ - self.seek_changelog() - for header, data, *_ in self.yield_group_objects(): - changeset = self.extract_commit_metadata(data) - yield (header, changeset) - - def yield_all_manifest_deltas(self, cache_hints=None): - """Gets manifest data from the bundle. - In order to process the manifests in a reasonable amount of time, we - want to use only the deltas and not the entire manifest at each change, - because if we're processing them in sequential order (we are) then we - already have the previous state so we only need the changes. - - args: - cache_hints: see build_cache_hints method - - yields: - (dict, dict, dict) of (read_chunk_header output, - extract_manifest_elements output on added/modified files, - extract_manifest_elements on removed files) - """ - self.seek_manifests() - for header, data, added, removed in self.yield_group_objects( - cache_hints=cache_hints - ): - added = self.extract_manifest_elements(added) - removed = self.extract_manifest_elements(removed) - yield (header, added, removed) - - def build_manifest_hints(self): - """Just a minor abstraction shortcut for the build_cache_hints method. - - returns: - see build_cache_hints method - """ - self.seek_manifests() - return self.build_cache_hints() - - def build_cache_hints(self): - """The SelectiveCache class that we use in building nodes can accept a - set of key counters that makes its memory usage much more efficient. - - returns: - dict of key=a node id, value=the number of times we - will need data from that node when building subsequent nodes - """ - cur_pos = self.filereader.tell() - hints = OrderedDict() - prev_node = None - for header in self.skim_headers(): - basenode = header["basenode"] - if (basenode != self.NAUGHT_NODE) and (basenode != prev_node): - # If the base isn't immediately prior, then cache it once more. - hints[basenode] = hints.get(basenode, 0) + 1 - prev_node = header["node"] - self.filereader.seek(cur_pos) - return hints - - def extract_manifest_elements(self, data): - """Parses data that looks like a manifest. In practice we only pass in - the bits extracted from the application of a manifest delta describing - which files were added/modified or which ones were removed. - - args: - data: either a string or a list of strings that, when joined, - embodies the composition of a manifest. - - This takes the form - of repetitions of (without the brackets):: - - b'\x00[flag]\\n' ...repeat... 
- - where ``[flag]`` may or may not be there depending on - whether the file is specially flagged as executable or - something - - returns: - dict: ``{file_path: (file_node, permissions), ...}`` where - permissions is given according to the flag that optionally exists - in the data - """ - elements = {} - if isinstance(data, str): - data = data.split(b"\n") - else: - data = itertools.chain.from_iterable([chunk.split(b"\n") for chunk in data]) - - for line in data: - if line != b"": - f = line.split(b"\x00") - - node = f[1] - flag_bytes = node[40:] - - elements[f[0]] = ( - unhexlify(node[:40]), - b"l" in flag_bytes, - b"755" if (b"x" in flag_bytes) else b"644", - ) - - return elements diff --git a/swh/loader/mercurial/chunked_reader.py b/swh/loader/mercurial/chunked_reader.py deleted file mode 100644 --- a/swh/loader/mercurial/chunked_reader.py +++ /dev/null @@ -1,105 +0,0 @@ -# Copyright (C) 2017 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import struct - - -class ChunkedFileReader(object): - """A binary stream reader that gives seamless read access to Mercurial's - bundle2 HG20 format which is partitioned for some reason at the file level - into chunks of [4Bytes:, Bytes:] as if it were - encoding transport packets. - - args: - file: rb file handle pre-aligned to the start of the chunked portion - size_unpack_fmt: struct format string for unpacking the next chunk size - """ - - def __init__(self, file, size_unpack_fmt=">I"): - self._file = file - self._size_pattern = size_unpack_fmt - self._size_bytes = struct.calcsize(size_unpack_fmt) - self._bytes_per_chunk = self._chunk_size(True) - self._chunk_bytes_left = self._bytes_per_chunk - self._offset = self._file.tell() - # find the file size - self._file.seek(0, 2) # seek to end - self._size = self._file.tell() - self._file.seek(self._offset, 0) # seek back to original position - - def _chunk_size(self, first_time=False): - """Unpack the next bytes from the file to get the next file chunk size. - """ - size = struct.unpack(self._size_pattern, self._file.read(self._size_bytes))[0] - return size - - def size(self): - """Returns the file size in bytes. - """ - return self._size - - def read(self, bytes_to_read): - """Return N bytes from the file as a single block. - - args: - bytes_to_read: int number of bytes of content - """ - return b"".join(self.read_iterator(bytes_to_read)) - - def read_iterator(self, bytes_to_read): - """Return a generator that yields N bytes from the file one file chunk - at a time. - - args: - bytes_to_read: int number of bytes of content - """ - while bytes_to_read > self._chunk_bytes_left: - yield self._file.read(self._chunk_bytes_left) - bytes_to_read -= self._chunk_bytes_left - self._chunk_bytes_left = self._chunk_size() - self._chunk_bytes_left -= bytes_to_read - yield self._file.read(bytes_to_read) - - def seek(self, new_pos=None, from_current=False): - """Wraps the underlying file seek, additionally updating the - chunk_bytes_left counter appropriately so that we can start reading - from the new location. - - args: - - new_pos: new cursor byte position - from_current: if True, it treats new_pos as an offset from the - current cursor position, bypassing any chunk boundaries as if - they weren't there. This should give the same end position as a - read except without the reading data part. 
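To make the framing that `ChunkedFileReader` hides more concrete: the bundle payload is cut into frames of a 4-byte big-endian length followed by that many bytes of data, and the reader stitches them back into one seamless stream. A toy sketch of that layout (illustrative only, not the class's actual implementation):

``` python
import io
import struct

def iter_frames(stream):
    # Read [4-byte big-endian length][payload] frames until the stream ends.
    while True:
        raw = stream.read(4)
        if len(raw) < 4:
            return
        (length,) = struct.unpack(">I", raw)
        yield stream.read(length)

framed = struct.pack(">I", 5) + b"hello" + struct.pack(">I", 6) + b" world"
print(b"".join(iter_frames(io.BytesIO(framed))))  # b'hello world'
```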
- """ - if from_current: - new_pos = new_pos or 0 # None -> current - if new_pos <= self._chunk_bytes_left: - new_pos += self._file.tell() - else: - new_pos += ( - self._file.tell() - + self._size_bytes - + ( - ( - (new_pos - self._chunk_bytes_left - 1) # aliasing - // self._bytes_per_chunk - ) - * self._size_bytes - ) - ) - else: - new_pos = new_pos or self._offset # None -> start position - - self._chunk_bytes_left = self._bytes_per_chunk - ( - (new_pos - self._offset) % (self._bytes_per_chunk + self._size_bytes) - ) - self._file.seek(new_pos) - - def __getattr__(self, item): - """Forward other calls to the underlying file object. - """ - return getattr(self._file, item) diff --git a/swh/loader/mercurial/cli.py b/swh/loader/mercurial/cli.py deleted file mode 100644 --- a/swh/loader/mercurial/cli.py +++ /dev/null @@ -1,66 +0,0 @@ -# Copyright (C) 2018-2021 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -from itertools import chain -import logging - -import click -from deprecated import deprecated - -from swh.loader.mercurial.utils import parse_visit_date - -LOGLEVELS = list( - chain.from_iterable( - (logging._levelToName[lvl], logging._levelToName[lvl].lower()) - for lvl in sorted(logging._levelToName.keys()) - ) -) - - -@click.command() -@click.argument("origin-url") -@click.option( - "--hg-directory", - "-d", - help=( - "Path to the hg (local) directory to load from. " - "If unset, the hg repo will be cloned from the " - "given (origin) url." - ), -) -@click.option("--hg-archive", "-a", help=("Path to the hg archive file to load from.")) -@click.option("--visit-date", "-D", help="Visit date (defaults to now).") -@click.option("--log-level", "-l", type=click.Choice(LOGLEVELS), help="Log level.") -@deprecated( - version="0.4.0", reason="Use `swh loader run mercurial|mercurial_from_disk` instead" -) -def main( - origin_url, hg_directory=None, hg_archive=None, visit_date=None, log_level=None -): - from swh.storage import get_storage - - logging.basicConfig( - level=(log_level or "DEBUG").upper(), - format="%(asctime)s %(process)d %(message)s", - ) - - visit_date = parse_visit_date(visit_date or "now") - - kwargs = {"visit_date": visit_date, "url": origin_url} - if hg_archive: - from .loader import HgArchiveBundle20Loader as HgLoader - - kwargs["archive_path"] = hg_archive - else: - from .loader import HgBundle20Loader as HgLoader - - kwargs["directory"] = hg_directory - - storage = get_storage(cls="memory") - return HgLoader(storage, **kwargs).load() - - -if __name__ == "__main__": - main() diff --git a/swh/loader/mercurial/from_disk.py b/swh/loader/mercurial/from_disk.py deleted file mode 100644 --- a/swh/loader/mercurial/from_disk.py +++ /dev/null @@ -1,815 +0,0 @@ -# Copyright (C) 2020-2021 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -from collections import deque -from datetime import datetime -import os -from shutil import rmtree -from tempfile import mkdtemp -from typing import Deque, Dict, Iterator, List, Optional, Set, Tuple, TypeVar, Union - -from swh.core.utils import grouper -from swh.loader.core.loader import BaseLoader -from swh.loader.core.utils import clean_dangling_folders -from swh.loader.mercurial.utils import 
get_minimum_env, parse_visit_date -from swh.model import identifiers -from swh.model.from_disk import Content, DentryPerms, Directory -from swh.model.hashutil import hash_to_bytehex -from swh.model.model import ( - ExtID, - ObjectType, - Origin, - Person, - Release, - Revision, - RevisionType, - Sha1Git, - Snapshot, - SnapshotBranch, - TargetType, - TimestampWithTimezone, -) -from swh.model.model import Content as ModelContent -from swh.storage.algos.snapshot import snapshot_get_latest -from swh.storage.interface import StorageInterface - -from . import hgutil -from .archive_extract import tmp_extract -from .hgutil import HgFilteredSet, HgNodeId, HgSpanSet - -FLAG_PERMS = { - b"l": DentryPerms.symlink, - b"x": DentryPerms.executable_content, - b"": DentryPerms.content, -} # type: Dict[bytes, DentryPerms] - -TEMPORARY_DIR_PREFIX_PATTERN = "swh.loader.mercurial.from_disk" - -EXTID_TYPE = "hg-nodeid" -EXTID_VERSION: int = 1 - -T = TypeVar("T") - - -class CorruptedRevision(ValueError): - """Raised when a revision is corrupted.""" - - def __init__(self, hg_nodeid: HgNodeId) -> None: - super().__init__(hg_nodeid.hex()) - self.hg_nodeid = hg_nodeid - - -class HgDirectory(Directory): - """A more practical directory. - - - creates missing parent directories - - removes empty directories - """ - - def __setitem__(self, path: bytes, value: Union[Content, "HgDirectory"]) -> None: - if b"/" in path: - head, tail = path.split(b"/", 1) - - directory = self.get(head) - if directory is None or isinstance(directory, Content): - directory = HgDirectory() - self[head] = directory - - directory[tail] = value - else: - super().__setitem__(path, value) - - def __delitem__(self, path: bytes) -> None: - super().__delitem__(path) - - while b"/" in path: # remove empty parent directories - path = path.rsplit(b"/", 1)[0] - if len(self[path]) == 0: - super().__delitem__(path) - else: - break - - def get( - self, path: bytes, default: Optional[T] = None - ) -> Optional[Union[Content, "HgDirectory", T]]: - # TODO move to swh.model.from_disk.Directory - try: - return self[path] - except KeyError: - return default - - -class HgLoaderFromDisk(BaseLoader): - """Load a mercurial repository from a local repository. - - Mercurial's branching model is more complete than Git's; it allows for multiple - heads per branch, closed heads and bookmarks. The following mapping is used to - represent the branching state of a Mercurial project in a given snapshot: - - - `HEAD` (optional) either the node pointed by the `@` bookmark or the tip of - the `default` branch - - `branch-tip/` (required) the first head of the branch, sorted by - nodeid if there are multiple heads. - - `bookmarks/` (optional) holds the bookmarks mapping if any - - `branch-heads//0..n` (optional) for any branch with multiple open - heads, list all *open* heads - - `branch-closed-heads//0..n` (optional) for any branch with at least - one closed head, list all *closed* heads - - `tags/` (optional) record tags - - The format is not ambiguous regardless of branch name since we know it ends with a - `/`, as long as we have a stable sorting of the heads (we sort by nodeid). - There may be some overlap between the refs, but it's simpler not to try to figure - out de-duplication. - However, to reduce the redundancy between snapshot branches in the most common case, - when a branch has a single open head, it will only be referenced as - `branch-tip/`. 
The `branch-heads/` hierarchy only appears when a branch - has multiple open heads, which we consistently sort by increasing nodeid. - The `branch-closed-heads/` hierarchy is also sorted by increasing nodeid. - """ - - CONFIG_BASE_FILENAME = "loader/mercurial" - - visit_type = "hg" - - def __init__( - self, - storage: StorageInterface, - url: str, - directory: Optional[str] = None, - logging_class: str = "swh.loader.mercurial.LoaderFromDisk", - visit_date: Optional[datetime] = None, - temp_directory: str = "/tmp", - clone_timeout_seconds: int = 7200, - content_cache_size: int = 10_000, - max_content_size: Optional[int] = None, - ): - """Initialize the loader. - - Args: - url: url of the repository. - directory: directory of the local repository. - logging_class: class of the loader logger. - visit_date: visit date of the repository - config: loader configuration - """ - super().__init__( - storage=storage, - logging_class=logging_class, - max_content_size=max_content_size, - ) - - self._temp_directory = temp_directory - self._clone_timeout = clone_timeout_seconds - - self.origin_url = url - self.visit_date = visit_date - self.directory = directory - - self._repo: Optional[hgutil.Repository] = None - self._revision_nodeid_to_sha1git: Dict[HgNodeId, Sha1Git] = {} - self._repo_directory: Optional[str] = None - - # keeps the last processed hg nodeid - # it is used for differential tree update by store_directories - # NULLID is the parent of the first revision - self._last_hg_nodeid = hgutil.NULLID - - # keeps the last revision tree - # it is used for differential tree update by store_directories - self._last_root = HgDirectory() - - # Cache the content hash across revisions to avoid recalculation. - self._content_hash_cache: hgutil.LRUCacheDict = hgutil.LRUCacheDict( - content_cache_size, - ) - - # hg node id of the latest snapshot branch heads - # used to find what are the new revisions since last snapshot - self._latest_heads: List[HgNodeId] = [] - # hg node ids of all the tags recorded on previous runs - # Used to compute which tags need to be added, even across incremental runs - # that might separate a changeset introducing a tag from its target. - self._saved_tags: Set[HgNodeId] = set() - - self._load_status = "eventful" - # If set, will override the default value - self._visit_status = None - - def pre_cleanup(self) -> None: - """As a first step, will try and check for dangling data to cleanup. - This should do its best to avoid raising issues. - - """ - clean_dangling_folders( - self._temp_directory, - pattern_check=TEMPORARY_DIR_PREFIX_PATTERN, - log=self.log, - ) - - self.old_environ = os.environ.copy() - os.environ.clear() - os.environ.update(get_minimum_env()) - - def cleanup(self) -> None: - """Last step executed by the loader.""" - os.environ.clear() - os.environ.update(self.old_environ) - - # Don't cleanup if loading from a local directory - was_remote = self.directory is None - if was_remote and self._repo_directory and os.path.exists(self._repo_directory): - self.log.debug(f"Cleanup up repository {self._repo_directory}") - rmtree(self._repo_directory) - - def prepare_origin_visit(self) -> None: - """First step executed by the loader to prepare origin and visit - references. Set/update self.origin, and - optionally self.origin_url, self.visit_date. - - """ - self.origin = Origin(url=self.origin_url) - - def prepare(self) -> None: - """Second step executed by the loader to prepare some state needed by - the loader. 
- - """ - # Set here to allow multiple calls to load on the same loader instance - self._latest_heads = [] - - latest_snapshot = snapshot_get_latest(self.storage, self.origin_url) - if latest_snapshot: - self._set_recorded_state(latest_snapshot) - - def _set_recorded_state(self, latest_snapshot: Snapshot) -> None: - """ - Looks up the nodeid for all revisions in the snapshot via extid_get_from_target, - and adds them to `self._latest_heads`. - Also looks up the currently saved releases ("tags" in Mercurial speak). - The tags are all listed for easy comparison at the end, while only the latest - heads are needed for revisions. - """ - heads = [] - tags = [] - - for branch in latest_snapshot.branches.values(): - if branch.target_type == TargetType.REVISION: - heads.append(branch.target) - elif branch.target_type == TargetType.RELEASE: - tags.append(branch.target) - - self._latest_heads.extend( - HgNodeId(extid.extid) for extid in self._get_extids_for_targets(heads) - ) - self._saved_tags.update( - HgNodeId(extid.extid) for extid in self._get_extids_for_targets(tags) - ) - - def _get_extids_for_targets(self, targets: List[Sha1Git]) -> List[ExtID]: - """Get all Mercurial ExtIDs for the targets in the latest snapshot""" - extids = [] - for extid in self.storage.extid_get_from_target( - identifiers.ObjectType.REVISION, - targets, - extid_type=EXTID_TYPE, - extid_version=EXTID_VERSION, - ): - extids.append(extid) - self._revision_nodeid_to_sha1git[ - HgNodeId(extid.extid) - ] = extid.target.object_id - - if extids: - # Filter out dangling extids, we need to load their target again - revisions_missing = self.storage.revision_missing( - [extid.target.object_id for extid in extids] - ) - extids = [ - extid - for extid in extids - if extid.target.object_id not in revisions_missing - ] - return extids - - def _get_extids_for_hgnodes(self, hgnode_ids: List[HgNodeId]) -> List[ExtID]: - """Get all Mercurial ExtIDs for the mercurial nodes in the list which point to - a known revision. - - """ - extids = [] - - for group_ids in grouper(hgnode_ids, n=1000): - for extid in self.storage.extid_get_from_extid( - EXTID_TYPE, group_ids, version=EXTID_VERSION - ): - extids.append(extid) - self._revision_nodeid_to_sha1git[ - HgNodeId(extid.extid) - ] = extid.target.object_id - - if extids: - # Filter out dangling extids, we need to load their target again - revisions_missing = self.storage.revision_missing( - [extid.target.object_id for extid in extids] - ) - extids = [ - extid - for extid in extids - if extid.target.object_id not in revisions_missing - ] - return extids - - def fetch_data(self) -> bool: - """Fetch the data from the source the loader is currently loading - - Returns: - a value that is interpreted as a boolean. If True, fetch_data needs - to be called again to complete loading. - - """ - if not self.directory: # no local repository - self._repo_directory = mkdtemp( - prefix=TEMPORARY_DIR_PREFIX_PATTERN, - suffix=f"-{os.getpid()}", - dir=self._temp_directory, - ) - self.log.debug( - f"Cloning {self.origin_url} to {self.directory} " - f"with timeout {self._clone_timeout} seconds" - ) - hgutil.clone(self.origin_url, self._repo_directory, self._clone_timeout) - else: # existing local repository - # Allow to load on disk repository without cloning - # for testing purpose. - self._repo_directory = self.directory - - self._repo = hgutil.repository(self._repo_directory) - - return False - - def _new_revs(self, heads: List[HgNodeId]) -> Union[HgFilteredSet, HgSpanSet]: - """Return unseen revisions. 
That is, filter out revisions that are not ancestors of - heads""" - assert self._repo is not None - existing_heads = [] - - for hg_nodeid in heads: - try: - rev = self._repo[hg_nodeid].rev() - existing_heads.append(rev) - except KeyError: # the node does not exist anymore - pass - - # select revisions that are not ancestors of heads - # and not the heads themselves - new_revs = self._repo.revs("not ::(%ld)", existing_heads) - - if new_revs: - self.log.info("New revisions found: %d", len(new_revs)) - - return new_revs - - def get_hg_revs_to_load(self) -> Iterator[int]: - """Yield hg revision numbers to load. - - """ - assert self._repo is not None - repo: hgutil.Repository = self._repo - - seen_revs: Set[int] = set() - # 1. use snapshot to reuse existing seen heads from it - if self._latest_heads: - for rev in self._new_revs(self._latest_heads): - seen_revs.add(rev) - yield rev - - # 2. Then filter out remaining revisions through the overall extid mappings - # across hg origins - revs_left = repo.revs("all() - ::(%ld)", seen_revs) - hg_nodeids = [repo[nodeid].node() for nodeid in revs_left] - yield from self._new_revs( - [ - HgNodeId(extid.extid) - for extid in self._get_extids_for_hgnodes(hg_nodeids) - ] - ) - - def store_data(self): - """Store fetched data in the database.""" - revs = self.get_hg_revs_to_load() - length_ingested_revs = 0 - - assert self._repo is not None - repo = self._repo - - ignored_revs: Set[int] = set() - for rev in revs: - if rev in ignored_revs: - continue - try: - self.store_revision(repo[rev]) - length_ingested_revs += 1 - except CorruptedRevision as e: - self._visit_status = "partial" - self.log.warning("Corrupted revision %s", e) - descendents = repo.revs("(%ld)::", [rev]) - ignored_revs.update(descendents) - - if length_ingested_revs == 0: - # no new revision ingested, so uneventful - # still we'll make a snapshot, so we continue - self._load_status = "uneventful" - - branching_info = hgutil.branching_info(repo, ignored_revs) - tags_by_name: Dict[bytes, HgNodeId] = repo.tags() - - snapshot_branches: Dict[bytes, SnapshotBranch] = {} - - for tag_name, hg_nodeid in tags_by_name.items(): - if tag_name == b"tip": - # `tip` is listed in the tags by the Mercurial API but its not a tag - # defined by the user in `.hgtags`. 
- continue - if hg_nodeid not in self._saved_tags: - label = b"tags/%s" % tag_name - target = self.get_revision_id_from_hg_nodeid(hg_nodeid) - snapshot_branches[label] = SnapshotBranch( - target=self.store_release(tag_name, target), - target_type=TargetType.RELEASE, - ) - - for branch_name, node_id in branching_info.tips.items(): - name = b"branch-tip/%s" % branch_name - target = self.get_revision_id_from_hg_nodeid(node_id) - snapshot_branches[name] = SnapshotBranch( - target=target, target_type=TargetType.REVISION - ) - - for bookmark_name, node_id in branching_info.bookmarks.items(): - name = b"bookmarks/%s" % bookmark_name - target = self.get_revision_id_from_hg_nodeid(node_id) - snapshot_branches[name] = SnapshotBranch( - target=target, target_type=TargetType.REVISION - ) - - for branch_name, branch_heads in branching_info.open_heads.items(): - for index, head in enumerate(branch_heads): - name = b"branch-heads/%s/%d" % (branch_name, index) - target = self.get_revision_id_from_hg_nodeid(head) - snapshot_branches[name] = SnapshotBranch( - target=target, target_type=TargetType.REVISION - ) - - for branch_name, closed_heads in branching_info.closed_heads.items(): - for index, head in enumerate(closed_heads): - name = b"branch-closed-heads/%s/%d" % (branch_name, index) - target = self.get_revision_id_from_hg_nodeid(head) - snapshot_branches[name] = SnapshotBranch( - target=target, target_type=TargetType.REVISION - ) - - # If the repo is broken enough or if it has none of the "normal" default - # mechanisms, we ignore `HEAD`. - default_branch_alias = branching_info.default_branch_alias - if default_branch_alias is not None: - snapshot_branches[b"HEAD"] = SnapshotBranch( - target=default_branch_alias, target_type=TargetType.ALIAS, - ) - snapshot = Snapshot(branches=snapshot_branches) - self.storage.snapshot_add([snapshot]) - - self.flush() - self.loaded_snapshot_id = snapshot.id - - def load_status(self) -> Dict[str, str]: - """Detailed loading status. - - Defaults to logging an eventful load. - - Returns: a dictionary that is eventually passed back as the task's - result to the scheduler, allowing tuning of the task recurrence - mechanism. - """ - return { - "status": self._load_status, - } - - def visit_status(self) -> str: - """Allow overriding the visit status in case of partial load""" - if self._visit_status is not None: - return self._visit_status - return super().visit_status() - - def get_revision_id_from_hg_nodeid(self, hg_nodeid: HgNodeId) -> Sha1Git: - """Return the git sha1 of a revision given its hg nodeid. - - Args: - hg_nodeid: the hg nodeid of the revision. - - Returns: - the sha1_git of the revision. - """ - - from_cache = self._revision_nodeid_to_sha1git.get(hg_nodeid) - if from_cache is not None: - return from_cache - # The parent was not loaded in this run, get it from storage - from_storage = [ - extid - for extid in self.storage.extid_get_from_extid( - EXTID_TYPE, ids=[hg_nodeid], version=EXTID_VERSION - ) - ] - - msg = "Expected 1 match from storage for hg node %r, got %d" - assert len(from_storage) == 1, msg % (hg_nodeid.hex(), len(from_storage)) - return from_storage[0].target.object_id - - def get_revision_parents(self, rev_ctx: hgutil.BaseContext) -> Tuple[Sha1Git, ...]: - """Return the git sha1 of the parent revisions. - - Args: - hg_nodeid: the hg nodeid of the revision. - - Returns: - the sha1_git of the parent revisions. 
- """ - parents = [] - for parent_ctx in rev_ctx.parents(): - parent_hg_nodeid = parent_ctx.node() - # nullid is the value of a parent that does not exist - if parent_hg_nodeid == hgutil.NULLID: - continue - revision_id = self.get_revision_id_from_hg_nodeid(parent_hg_nodeid) - parents.append(revision_id) - - return tuple(parents) - - def store_revision(self, rev_ctx: hgutil.BaseContext) -> None: - """Store a revision given its hg nodeid. - - Args: - rev_ctx: the he revision context. - - Returns: - the sha1_git of the stored revision. - """ - hg_nodeid = rev_ctx.node() - - root_sha1git = self.store_directories(rev_ctx) - - # `Person.from_fullname` is compatible with mercurial's freeform author - # as fullname is what is used in revision hash when available. - author = Person.from_fullname(rev_ctx.user()) - - (timestamp, offset) = rev_ctx.date() - - # TimestampWithTimezone.from_dict will change name - # as it accept more than just dicts - rev_date = TimestampWithTimezone.from_dict(int(timestamp)) - - extra_headers = [ - (b"time_offset_seconds", str(offset).encode(),), - ] - for key, value in rev_ctx.extra().items(): - # The default branch is skipped to match - # the historical implementation - if key == b"branch" and value == b"default": - continue - - # transplant_source is converted to match - # the historical implementation - if key == b"transplant_source": - value = hash_to_bytehex(value) - extra_headers.append((key, value)) - - revision = Revision( - author=author, - date=rev_date, - committer=author, - committer_date=rev_date, - type=RevisionType.MERCURIAL, - directory=root_sha1git, - message=rev_ctx.description(), - extra_headers=tuple(extra_headers), - synthetic=False, - parents=self.get_revision_parents(rev_ctx), - ) - - self._revision_nodeid_to_sha1git[hg_nodeid] = revision.id - self.storage.revision_add([revision]) - - # Save the mapping from SWHID to hg id - revision_swhid = identifiers.CoreSWHID( - object_type=identifiers.ObjectType.REVISION, object_id=revision.id, - ) - self.storage.extid_add( - [ - ExtID( - extid_type=EXTID_TYPE, - extid_version=EXTID_VERSION, - extid=hg_nodeid, - target=revision_swhid, - ) - ] - ) - - def store_release(self, name: bytes, target: Sha1Git) -> Sha1Git: - """Store a release given its name and its target. - - A release correspond to a user defined tag in mercurial. - The mercurial api as a `tip` tag that must be ignored. - - Args: - name: name of the release. - target: sha1_git of the target revision. - - Returns: - the sha1_git of the stored release. - """ - release = Release( - name=name, - target=target, - target_type=ObjectType.REVISION, - message=None, - metadata=None, - synthetic=False, - author=Person(name=None, email=None, fullname=b""), - date=None, - ) - - self.storage.release_add([release]) - - return release.id - - def store_content(self, rev_ctx: hgutil.BaseContext, file_path: bytes) -> Content: - """Store a revision content hg nodeid and file path. - - Content is a mix of file content at a given revision - and its permissions found in the changeset's manifest. - - Args: - rev_ctx: the he revision context. - file_path: the hg path of the content. - - Returns: - the sha1_git of the top level directory. - """ - hg_nodeid = rev_ctx.node() - file_ctx = rev_ctx[file_path] - - try: - file_nodeid = file_ctx.filenode() - except hgutil.LookupError: - # TODO - # Raising CorruptedRevision avoid crashing the whole loading - # but can lead to a lot of missing revisions. - # SkippedContent could be used but need actual content to calculate its id. 
- # Maybe the hg_nodeid can be used instead. - # Another option could be to just ignore the missing content. - # This point is left to future commits. - # Check for other uses to apply the same logic there. - raise CorruptedRevision(hg_nodeid) - - perms = FLAG_PERMS[file_ctx.flags()] - - # Key is file_nodeid + perms because permissions does not participate - # in content hash in hg while it is the case in swh. - cache_key = (file_nodeid, perms) - - sha1_git = self._content_hash_cache.get(cache_key) - if sha1_git is None: - try: - data = file_ctx.data() - except hgutil.error.RevlogError: - # TODO - # See above use of `CorruptedRevision` - raise CorruptedRevision(hg_nodeid) - - content = ModelContent.from_data(data) - - self.storage.content_add([content]) - - sha1_git = content.sha1_git - self._content_hash_cache[cache_key] = sha1_git - - # Here we make sure to return only necessary data. - return Content({"sha1_git": sha1_git, "perms": perms}) - - def store_directories(self, rev_ctx: hgutil.BaseContext) -> Sha1Git: - """Store a revision directories given its hg nodeid. - - Mercurial as no directory as in git. A Git like tree must be build - from file paths to obtain each directory hash. - - Args: - rev_ctx: the he revision context. - - Returns: - the sha1_git of the top level directory. - """ - repo: hgutil.Repository = self._repo # mypy can't infer that repo is not None - prev_ctx = repo[self._last_hg_nodeid] - - # TODO maybe do diff on parents - try: - status = prev_ctx.status(rev_ctx) - except hgutil.error.LookupError: - raise CorruptedRevision(rev_ctx.node()) - - for file_path in status.removed: - try: - del self._last_root[file_path] - except KeyError: - raise CorruptedRevision(rev_ctx.node()) - - for file_path in status.added: - content = self.store_content(rev_ctx, file_path) - self._last_root[file_path] = content - - for file_path in status.modified: - content = self.store_content(rev_ctx, file_path) - self._last_root[file_path] = content - - self._last_hg_nodeid = rev_ctx.node() - - directories: Deque[Directory] = deque([self._last_root]) - while directories: - directory = directories.pop() - self.storage.directory_add([directory.to_model()]) - directories.extend( - [item for item in directory.values() if isinstance(item, Directory)] - ) - - return self._last_root.hash - - -class HgArchiveLoaderFromDisk(HgLoaderFromDisk): - """Mercurial loader for repository wrapped within tarballs.""" - - def __init__( - self, - storage: StorageInterface, - url: str, - visit_date: Optional[datetime] = None, - archive_path: str = None, - temp_directory: str = "/tmp", - max_content_size: Optional[int] = None, - ): - super().__init__( - storage=storage, - url=url, - visit_date=visit_date, - logging_class="swh.loader.mercurial.ArchiveLoaderFromDisk", - temp_directory=temp_directory, - max_content_size=max_content_size, - ) - self.archive_extract_temp_dir = None - self.archive_path = archive_path - - def prepare(self): - """Extract the archive instead of cloning.""" - self.archive_extract_temp_dir = tmp_extract( - archive=self.archive_path, - dir=self._temp_directory, - prefix=TEMPORARY_DIR_PREFIX_PATTERN, - suffix=f".dump-{os.getpid()}", - log=self.log, - source=self.origin_url, - ) - - repo_name = os.listdir(self.temp_dir)[0] - self.directory = os.path.join(self.archive_extract_temp_dir, repo_name) - super().prepare() - - -# Allow direct usage of the loader from the command line with -# `python -m swh.loader.mercurial.from_disk $ORIGIN_URL` -if __name__ == "__main__": - import logging - - import 
click - - logging.basicConfig( - level=logging.DEBUG, format="%(asctime)s %(process)d %(message)s" - ) - - @click.command() - @click.option("--origin-url", help="origin url") - @click.option("--hg-directory", help="Path to mercurial repository to load") - @click.option("--visit-date", default=None, help="Visit date") - def main(origin_url, hg_directory, visit_date): - from swh.storage import get_storage - - storage = get_storage(cls="memory") - return HgLoaderFromDisk( - storage, - origin_url, - directory=hg_directory, - visit_date=parse_visit_date(visit_date), - ).load() - - main() diff --git a/swh/loader/mercurial/loader.py b/swh/loader/mercurial/loader.py --- a/swh/loader/mercurial/loader.py +++ b/swh/loader/mercurial/loader.py @@ -1,51 +1,23 @@ -# Copyright (C) 2017-2021 The Software Heritage developers +# Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -"""This document contains a SWH loader for ingesting repository data -from Mercurial version 2 bundle files. - -""" - -# NOTE: The code here does expensive work twice in places because of the -# intermediate need to check for what is missing before sending to the database -# and the desire to not juggle very large amounts of data. - -# TODO: Decide whether to also serialize to disk and read back more quickly -# from there. Maybe only for very large repos and fast drives. -# - Avi - - -import datetime +from collections import deque +from datetime import datetime import os -from queue import Empty -import random -import re from shutil import rmtree from tempfile import mkdtemp -import time -from typing import Any, Dict, Iterable, List, Optional - -import billiard -import hglib -from hglib.error import CommandError +from typing import Deque, Dict, Iterator, List, Optional, Set, Tuple, TypeVar, Union -from swh.loader.core.loader import DVCSLoader +from swh.core.utils import grouper +from swh.loader.core.loader import BaseLoader from swh.loader.core.utils import clean_dangling_folders -from swh.loader.exception import NotFound from swh.loader.mercurial.utils import get_minimum_env from swh.model import identifiers -from swh.model.hashutil import ( - DEFAULT_ALGORITHMS, - MultiHash, - hash_to_bytehex, - hash_to_bytes, -) +from swh.model.from_disk import Content, DentryPerms, Directory +from swh.model.hashutil import hash_to_bytehex from swh.model.model import ( - BaseContent, - Content, - Directory, ExtID, ObjectType, Origin, @@ -54,615 +26,736 @@ Revision, RevisionType, Sha1Git, - SkippedContent, Snapshot, SnapshotBranch, TargetType, TimestampWithTimezone, ) -from swh.storage.algos.origin import origin_get_latest_visit_status +from swh.model.model import Content as ModelContent +from swh.storage.algos.snapshot import snapshot_get_latest from swh.storage.interface import StorageInterface -from . import converters +from . import hgutil from .archive_extract import tmp_extract -from .bundle20_reader import Bundle20Reader -from .converters import PRIMARY_ALGO as ALGO -from .objects import SelectiveCache, SimpleTree - -TAG_PATTERN = re.compile("[0-9A-Fa-f]{40}") +from .hgutil import HgFilteredSet, HgNodeId, HgSpanSet -TEMPORARY_DIR_PREFIX_PATTERN = "swh.loader.mercurial." 
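The rewritten loader keeps the external-identifier plumbing that makes incremental loading work: every ingested changeset is recorded as an `ExtID` of type `hg-nodeid` (version 1, see the constants just below) targeting the SWHID of the corresponding revision, and later visits query that mapping instead of re-ingesting. A hedged sketch of reading the mapping back through the storage API; the changeset id is made up:

``` python
# Assumes `storage` is a swh.storage instance already populated by HgLoader.
hg_nodeid = bytes.fromhex("0a04b987be5ae354b710cefeba0e2d9de7ad41a9")  # hypothetical changeset
for extid in storage.extid_get_from_extid("hg-nodeid", [hg_nodeid], version=1):
    print(extid.target)  # CoreSWHID of the revision stored for that changeset
```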
+FLAG_PERMS = { + b"l": DentryPerms.symlink, + b"x": DentryPerms.executable_content, + b"": DentryPerms.content, +} # type: Dict[bytes, DentryPerms] -HEAD_POINTER_NAME = b"tip" +TEMPORARY_DIR_PREFIX_PATTERN = "swh.loader.mercurial.loader" EXTID_TYPE = "hg-nodeid" +EXTID_VERSION: int = 1 + +T = TypeVar("T") + + +class CorruptedRevision(ValueError): + """Raised when a revision is corrupted.""" + def __init__(self, hg_nodeid: HgNodeId) -> None: + super().__init__(hg_nodeid.hex()) + self.hg_nodeid = hg_nodeid -class CommandErrorWrapper(Exception): - """This exception is raised in place of a 'CommandError' - exception (raised by the underlying hglib library) - This is needed because billiard.Queue is serializing the - queued object and as CommandError doesn't have a constructor without - parameters, the deserialization is failing +class HgDirectory(Directory): + """A more practical directory. + + - creates missing parent directories + - removes empty directories """ - def __init__(self, err: Optional[bytes]): - self.err = err + def __setitem__(self, path: bytes, value: Union[Content, "HgDirectory"]) -> None: + if b"/" in path: + head, tail = path.split(b"/", 1) + directory = self.get(head) + if directory is None or isinstance(directory, Content): + directory = HgDirectory() + self[head] = directory -class CloneTimeoutError(Exception): - pass + directory[tail] = value + else: + super().__setitem__(path, value) + def __delitem__(self, path: bytes) -> None: + super().__delitem__(path) -class HgBundle20Loader(DVCSLoader): - """Mercurial loader able to deal with remote or local repository. + while b"/" in path: # remove empty parent directories + path = path.rsplit(b"/", 1)[0] + if len(self[path]) == 0: + super().__delitem__(path) + else: + break + def get( + self, path: bytes, default: Optional[T] = None + ) -> Optional[Union[Content, "HgDirectory", T]]: + # TODO move to swh.model.from_disk.Directory + try: + return self[path] + except KeyError: + return default + + +class HgLoader(BaseLoader): + """Load a mercurial repository from a local repository. + + Mercurial's branching model is more complete than Git's; it allows for multiple + heads per branch, closed heads and bookmarks. The following mapping is used to + represent the branching state of a Mercurial project in a given snapshot: + + - `HEAD` (optional) either the node pointed by the `@` bookmark or the tip of + the `default` branch + - `branch-tip/` (required) the first head of the branch, sorted by + nodeid if there are multiple heads. + - `bookmarks/` (optional) holds the bookmarks mapping if any + - `branch-heads//0..n` (optional) for any branch with multiple open + heads, list all *open* heads + - `branch-closed-heads//0..n` (optional) for any branch with at least + one closed head, list all *closed* heads + - `tags/` (optional) record tags + + The format is not ambiguous regardless of branch name since we know it ends with a + `/`, as long as we have a stable sorting of the heads (we sort by nodeid). + There may be some overlap between the refs, but it's simpler not to try to figure + out de-duplication. + However, to reduce the redundancy between snapshot branches in the most common case, + when a branch has a single open head, it will only be referenced as + `branch-tip/`. The `branch-heads/` hierarchy only appears when a branch + has multiple open heads, which we consistently sort by increasing nodeid. + The `branch-closed-heads/` hierarchy is also sorted by increasing nodeid. 
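To make the branch layout described above concrete, here is an illustrative-only sketch (the branch name, bookmark, tag and every hash value are invented) of the snapshot such a mapping would yield for a hypothetical repository with two open heads on `default`, one bookmark and one user tag:

```python
# Illustrative sketch only; all identifiers below are made up.
from swh.model.model import Snapshot, SnapshotBranch, TargetType

rev_a = bytes.fromhex("aa" * 20)  # first head of "default" in nodeid order
rev_b = bytes.fromhex("bb" * 20)  # second open head of "default"
rel_1 = bytes.fromhex("cc" * 20)  # release object created for the tag

snapshot = Snapshot(
    branches={
        b"HEAD": SnapshotBranch(
            target=b"branch-tip/default", target_type=TargetType.ALIAS
        ),
        b"branch-tip/default": SnapshotBranch(
            target=rev_a, target_type=TargetType.REVISION
        ),
        b"branch-heads/default/0": SnapshotBranch(
            target=rev_a, target_type=TargetType.REVISION
        ),
        b"branch-heads/default/1": SnapshotBranch(
            target=rev_b, target_type=TargetType.REVISION
        ),
        b"bookmarks/stable": SnapshotBranch(
            target=rev_b, target_type=TargetType.REVISION
        ),
        b"tags/1.0": SnapshotBranch(target=rel_1, target_type=TargetType.RELEASE),
    }
)
```

`branch-tip/default` and `branch-heads/default/0` deliberately point at the same revision: the tip is defined as the first head in nodeid order, and the `branch-heads/` hierarchy only appears because the branch has more than one open head.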
""" + CONFIG_BASE_FILENAME = "loader/mercurial" + visit_type = "hg" def __init__( self, storage: StorageInterface, url: str, - visit_date: Optional[datetime.datetime] = None, directory: Optional[str] = None, - logging_class="swh.loader.mercurial.Bundle20Loader", - bundle_filename: Optional[str] = "HG20_none_bundle", - reduce_effort: bool = False, + logging_class: str = "swh.loader.mercurial.LoaderFromDisk", + visit_date: Optional[datetime] = None, temp_directory: str = "/tmp", - cache1_size: int = 800 * 1024 * 1024, - cache2_size: int = 800 * 1024 * 1024, clone_timeout_seconds: int = 7200, - save_data_path: Optional[str] = None, + content_cache_size: int = 10_000, max_content_size: Optional[int] = None, ): + """Initialize the loader. + + Args: + url: url of the repository. + directory: directory of the local repository. + logging_class: class of the loader logger. + visit_date: visit date of the repository + config: loader configuration + """ super().__init__( storage=storage, logging_class=logging_class, - save_data_path=save_data_path, max_content_size=max_content_size, ) + + self._temp_directory = temp_directory + self._clone_timeout = clone_timeout_seconds + self.origin_url = url self.visit_date = visit_date self.directory = directory - self.bundle_filename = bundle_filename - self.reduce_effort_flag = reduce_effort - self.empty_repository = None - self.temp_directory = temp_directory - self.cache1_size = cache1_size - self.cache2_size = cache2_size - self.clone_timeout = clone_timeout_seconds - self.working_directory = None - self.bundle_path = None - self.heads: Dict[bytes, Any] = {} - self.releases: Dict[bytes, Any] = {} - self.last_snapshot_id: Optional[bytes] = None - self.old_environ = os.environ.copy() - os.environ.clear() - os.environ.update(get_minimum_env()) - def pre_cleanup(self): - """Cleanup potential dangling files from prior runs (e.g. OOM killed - tasks) + self._repo: Optional[hgutil.Repository] = None + self._revision_nodeid_to_sha1git: Dict[HgNodeId, Sha1Git] = {} + self._repo_directory: Optional[str] = None + + # keeps the last processed hg nodeid + # it is used for differential tree update by store_directories + # NULLID is the parent of the first revision + self._last_hg_nodeid = hgutil.NULLID + + # keeps the last revision tree + # it is used for differential tree update by store_directories + self._last_root = HgDirectory() + + # Cache the content hash across revisions to avoid recalculation. + self._content_hash_cache: hgutil.LRUCacheDict = hgutil.LRUCacheDict( + content_cache_size, + ) + + # hg node id of the latest snapshot branch heads + # used to find what are the new revisions since last snapshot + self._latest_heads: List[HgNodeId] = [] + # hg node ids of all the tags recorded on previous runs + # Used to compute which tags need to be added, even across incremental runs + # that might separate a changeset introducing a tag from its target. + self._saved_tags: Set[HgNodeId] = set() + + self._load_status = "eventful" + # If set, will override the default value + self._visit_status = None + + def pre_cleanup(self) -> None: + """As a first step, will try and check for dangling data to cleanup. + This should do its best to avoid raising issues. 
""" clean_dangling_folders( - self.temp_directory, + self._temp_directory, pattern_check=TEMPORARY_DIR_PREFIX_PATTERN, log=self.log, ) - def cleanup(self): - """Clean temporary working directory + self.old_environ = os.environ.copy() + os.environ.clear() + os.environ.update(get_minimum_env()) - """ + def cleanup(self) -> None: + """Last step executed by the loader.""" os.environ.clear() os.environ.update(self.old_environ) - if self.bundle_path and os.path.exists(self.bundle_path): - self.log.debug("Cleanup up working bundle %s" % self.bundle_path) - os.unlink(self.bundle_path) - if self.working_directory and os.path.exists(self.working_directory): - self.log.debug( - "Cleanup up working directory %s" % (self.working_directory,) - ) - rmtree(self.working_directory) - def get_heads(self, repo): - """Read the closed branches heads (branch, bookmarks) and returns a - dict with key the branch_name (bytes) and values the tuple - (pointer nature (bytes), mercurial's node id - (bytes)). Those needs conversion to swh-ids. This is taken - care of in get_revisions. + # Don't cleanup if loading from a local directory + was_remote = self.directory is None + if was_remote and self._repo_directory and os.path.exists(self._repo_directory): + self.log.debug(f"Cleanup up repository {self._repo_directory}") + rmtree(self._repo_directory) + + def prepare_origin_visit(self) -> None: + """First step executed by the loader to prepare origin and visit + references. Set/update self.origin, and + optionally self.origin_url, self.visit_date. """ - b = {} - for _, node_hash_id, pointer_nature, branch_name, *_ in repo.heads(): - b[branch_name] = (pointer_nature, hash_to_bytes(node_hash_id.decode())) + self.origin = Origin(url=self.origin_url) - bookmarks = repo.bookmarks() - if bookmarks and bookmarks[0]: - for bookmark_name, _, target_short in bookmarks[0]: - target = repo[target_short].node() - b[bookmark_name] = (None, hash_to_bytes(target.decode())) + def prepare(self) -> None: + """Second step executed by the loader to prepare some state needed by + the loader. - return b + """ + # Set here to allow multiple calls to load on the same loader instance + self._latest_heads = [] - def prepare_origin_visit(self) -> None: - self.origin = Origin(url=self.origin_url) - visit_status = origin_get_latest_visit_status( - self.storage, self.origin_url, require_snapshot=True + latest_snapshot = snapshot_get_latest(self.storage, self.origin_url) + if latest_snapshot: + self._set_recorded_state(latest_snapshot) + + def _set_recorded_state(self, latest_snapshot: Snapshot) -> None: + """ + Looks up the nodeid for all revisions in the snapshot via extid_get_from_target, + and adds them to `self._latest_heads`. + Also looks up the currently saved releases ("tags" in Mercurial speak). + The tags are all listed for easy comparison at the end, while only the latest + heads are needed for revisions. 
+ """ + heads = [] + tags = [] + + for branch in latest_snapshot.branches.values(): + if branch.target_type == TargetType.REVISION: + heads.append(branch.target) + elif branch.target_type == TargetType.RELEASE: + tags.append(branch.target) + + self._latest_heads.extend( + HgNodeId(extid.extid) for extid in self._get_extids_for_targets(heads) ) - self.last_snapshot_id = None if visit_status is None else visit_status.snapshot + self._saved_tags.update( + HgNodeId(extid.extid) for extid in self._get_extids_for_targets(tags) + ) + + def _get_extids_for_targets(self, targets: List[Sha1Git]) -> List[ExtID]: + """Get all Mercurial ExtIDs for the targets in the latest snapshot""" + extids = [] + for extid in self.storage.extid_get_from_target( + identifiers.ObjectType.REVISION, + targets, + extid_type=EXTID_TYPE, + extid_version=EXTID_VERSION, + ): + extids.append(extid) + self._revision_nodeid_to_sha1git[ + HgNodeId(extid.extid) + ] = extid.target.object_id + + if extids: + # Filter out dangling extids, we need to load their target again + revisions_missing = self.storage.revision_missing( + [extid.target.object_id for extid in extids] + ) + extids = [ + extid + for extid in extids + if extid.target.object_id not in revisions_missing + ] + return extids - @staticmethod - def clone_with_timeout(log, origin, destination, timeout): - queue = billiard.Queue() - start = time.monotonic() + def _get_extids_for_hgnodes(self, hgnode_ids: List[HgNodeId]) -> List[ExtID]: + """Get all Mercurial ExtIDs for the mercurial nodes in the list which point to + a known revision. - def do_clone(queue, origin, destination): - try: - result = hglib.clone(source=origin, dest=destination, noupdate=True) - except CommandError as e: - # the queued object need an empty constructor to be deserialized later - queue.put(CommandErrorWrapper(e.err)) - except BaseException as e: - queue.put(e) - else: - queue.put(result) + """ + extids = [] + + for group_ids in grouper(hgnode_ids, n=1000): + for extid in self.storage.extid_get_from_extid( + EXTID_TYPE, group_ids, version=EXTID_VERSION + ): + extids.append(extid) + self._revision_nodeid_to_sha1git[ + HgNodeId(extid.extid) + ] = extid.target.object_id + + if extids: + # Filter out dangling extids, we need to load their target again + revisions_missing = self.storage.revision_missing( + [extid.target.object_id for extid in extids] + ) + extids = [ + extid + for extid in extids + if extid.target.object_id not in revisions_missing + ] + return extids - process = billiard.Process(target=do_clone, args=(queue, origin, destination)) - process.start() + def fetch_data(self) -> bool: + """Fetch the data from the source the loader is currently loading - while True: - try: - result = queue.get(timeout=0.1) - break - except Empty: - duration = time.monotonic() - start - if timeout and duration > timeout: - log.warning( - "Timeout cloning `%s` within %s seconds", origin, timeout - ) - process.terminate() - process.join() - raise CloneTimeoutError(origin, timeout) - continue + Returns: + a value that is interpreted as a boolean. If True, fetch_data needs + to be called again to complete loading. 
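Stepping back from `fetch_data` itself, a hedged usage sketch of the new entry point as a whole, assuming an in-memory storage and a made-up origin URL; it mirrors the `__main__` helper that the removed `from_disk` module provided:

```python
# Usage sketch only: the URL is hypothetical and a memory storage is assumed.
from swh.loader.mercurial.loader import HgLoader
from swh.storage import get_storage

storage = get_storage(cls="memory")
loader = HgLoader(
    storage,
    url="https://hg.example.org/some/repo",  # hypothetical origin
    temp_directory="/tmp",
)
result = loader.load()  # e.g. {"status": "eventful"} or {"status": "uneventful"}
```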
- process.join() + """ + if not self.directory: # no local repository + self._repo_directory = mkdtemp( + prefix=TEMPORARY_DIR_PREFIX_PATTERN, + suffix=f"-{os.getpid()}", + dir=self._temp_directory, + ) + self.log.debug( + f"Cloning {self.origin_url} to {self.directory} " + f"with timeout {self._clone_timeout} seconds" + ) + hgutil.clone(self.origin_url, self._repo_directory, self._clone_timeout) + else: # existing local repository + # Allow to load on disk repository without cloning + # for testing purpose. + self._repo_directory = self.directory - if isinstance(result, Exception): - raise result from None + self._repo = hgutil.repository(self._repo_directory) - return result + return False - def prepare(self): - """Prepare the necessary steps to load an actual remote or local - repository. + def _new_revs(self, heads: List[HgNodeId]) -> Union[HgFilteredSet, HgSpanSet]: + """Return unseen revisions. That is, filter out revisions that are not ancestors of + heads""" + assert self._repo is not None + existing_heads = [] - To load a local repository, pass the optional directory - parameter as filled with a path to a real local folder. + for hg_nodeid in heads: + try: + rev = self._repo[hg_nodeid].rev() + existing_heads.append(rev) + except KeyError: # the node does not exist anymore + pass - To load a remote repository, pass the optional directory - parameter as None. + # select revisions that are not ancestors of heads + # and not the heads themselves + new_revs = self._repo.revs("not ::(%ld)", existing_heads) - Args: - origin_url (str): Origin url to load - visit_date (str/datetime): Date of the visit - directory (str/None): The local directory to load + if new_revs: + self.log.info("New revisions found: %d", len(new_revs)) - """ - self.branches = {} - self.tags = [] - self.releases = {} - self.node_2_rev = {} - self.heads = {} - self.extids = [] + return new_revs - directory = self.directory + def get_hg_revs_to_load(self) -> Iterator[int]: + """Yield hg revision numbers to load. - if not directory: # remote repository - self.working_directory = mkdtemp( - prefix=TEMPORARY_DIR_PREFIX_PATTERN, - suffix="-%s" % os.getpid(), - dir=self.temp_directory, - ) - os.makedirs(self.working_directory, exist_ok=True) - self.hgdir = self.working_directory + """ + assert self._repo is not None + repo: hgutil.Repository = self._repo + + seen_revs: Set[int] = set() + # 1. use snapshot to reuse existing seen heads from it + if self._latest_heads: + for rev in self._new_revs(self._latest_heads): + seen_revs.add(rev) + yield rev + + # 2. 
Then filter out remaining revisions through the overall extid mappings + # across hg origins + revs_left = repo.revs("all() - ::(%ld)", seen_revs) + hg_nodeids = [repo[nodeid].node() for nodeid in revs_left] + yield from self._new_revs( + [ + HgNodeId(extid.extid) + for extid in self._get_extids_for_hgnodes(hg_nodeids) + ] + ) - self.log.debug( - "Cloning %s to %s with timeout %s seconds", - self.origin_url, - self.hgdir, - self.clone_timeout, - ) + def store_data(self): + """Store fetched data in the database.""" + revs = self.get_hg_revs_to_load() + length_ingested_revs = 0 + + assert self._repo is not None + repo = self._repo + ignored_revs: Set[int] = set() + for rev in revs: + if rev in ignored_revs: + continue try: - self.clone_with_timeout( - self.log, self.origin_url, self.hgdir, self.clone_timeout + self.store_revision(repo[rev]) + length_ingested_revs += 1 + except CorruptedRevision as e: + self._visit_status = "partial" + self.log.warning("Corrupted revision %s", e) + descendents = repo.revs("(%ld)::", [rev]) + ignored_revs.update(descendents) + + if length_ingested_revs == 0: + # no new revision ingested, so uneventful + # still we'll make a snapshot, so we continue + self._load_status = "uneventful" + + branching_info = hgutil.branching_info(repo, ignored_revs) + tags_by_name: Dict[bytes, HgNodeId] = repo.tags() + + snapshot_branches: Dict[bytes, SnapshotBranch] = {} + + for tag_name, hg_nodeid in tags_by_name.items(): + if tag_name == b"tip": + # `tip` is listed in the tags by the Mercurial API but its not a tag + # defined by the user in `.hgtags`. + continue + if hg_nodeid not in self._saved_tags: + label = b"tags/%s" % tag_name + target = self.get_revision_id_from_hg_nodeid(hg_nodeid) + snapshot_branches[label] = SnapshotBranch( + target=self.store_release(tag_name, target), + target_type=TargetType.RELEASE, ) - except CommandErrorWrapper as e: - for msg in [ - b"does not appear to be an hg repository", - b"404: not found", - b"name or service not known", - ]: - if msg in e.err.lower(): - raise NotFound(e.args[0]) from None - raise e - - else: # local repository - self.working_directory = None - self.hgdir = directory - - self.bundle_path = os.path.join(self.hgdir, self.bundle_filename) - self.log.debug("Bundling at %s" % self.bundle_path) - - with hglib.open(self.hgdir) as repo: - self.heads = self.get_heads(repo) - repo.bundle(bytes(self.bundle_path, "utf-8"), all=True, type=b"none-v2") - - self.cache_filename1 = os.path.join( - self.hgdir, "swh-cache-1-%s" % (hex(random.randint(0, 0xFFFFFF))[2:],) - ) - self.cache_filename2 = os.path.join( - self.hgdir, "swh-cache-2-%s" % (hex(random.randint(0, 0xFFFFFF))[2:],) - ) - try: - self.br = Bundle20Reader( - bundlefile=self.bundle_path, - cache_filename=self.cache_filename1, - cache_size=self.cache1_size, + for branch_name, node_id in branching_info.tips.items(): + name = b"branch-tip/%s" % branch_name + target = self.get_revision_id_from_hg_nodeid(node_id) + snapshot_branches[name] = SnapshotBranch( + target=target, target_type=TargetType.REVISION ) - except FileNotFoundError: - # Empty repository! Still a successful visit targeting an - # empty snapshot - self.log.warn("%s is an empty repository!" 
% self.hgdir) - self.empty_repository = True - else: - self.reduce_effort = set() - if self.reduce_effort_flag: - now = datetime.datetime.now(tz=datetime.timezone.utc) - if (now - self.visit_date).days > 1: - # Assuming that self.visit_date would be today for - # a new visit, treat older visit dates as - # indication of wanting to skip some processing - # effort. - for header, commit in self.br.yield_all_changesets(): - ts = commit["time"].timestamp() - if ts < self.visit_date.timestamp(): - self.reduce_effort.add(header["node"]) - - def has_contents(self): - return not self.empty_repository - - def has_directories(self): - return not self.empty_repository - - def has_revisions(self): - return not self.empty_repository - - def has_releases(self): - return not self.empty_repository - - def fetch_data(self): - """Fetch the data from the data source.""" - pass - - def get_contents(self) -> Iterable[BaseContent]: - """Get the contents that need to be loaded.""" - - # NOTE: This method generates blobs twice to reduce memory usage - # without generating disk writes. - self.file_node_to_hash = {} - hash_to_info = {} - self.num_contents = 0 - contents: Dict[bytes, BaseContent] = {} - missing_contents = set() - - for blob, node_info in self.br.yield_all_blobs(): - self.num_contents += 1 - file_name = node_info[0] - header = node_info[2] - - length = len(blob) - if header["linknode"] in self.reduce_effort: - algorithms = set([ALGO]) - else: - algorithms = DEFAULT_ALGORITHMS - h = MultiHash.from_data(blob, hash_names=algorithms) - content = h.digest() - content["length"] = length - blob_hash = content[ALGO] - self.file_node_to_hash[header["node"]] = blob_hash - - if header["linknode"] in self.reduce_effort: - continue - hash_to_info[blob_hash] = node_info - if self.max_content_size is not None and length >= self.max_content_size: - contents[blob_hash] = SkippedContent( - status="absent", reason="Content too large", **content - ) - else: - contents[blob_hash] = Content(data=blob, status="visible", **content) + for bookmark_name, node_id in branching_info.bookmarks.items(): + name = b"bookmarks/%s" % bookmark_name + target = self.get_revision_id_from_hg_nodeid(node_id) + snapshot_branches[name] = SnapshotBranch( + target=target, target_type=TargetType.REVISION + ) - if file_name == b".hgtags": - # https://www.mercurial-scm.org/wiki/GitConcepts#Tag_model - # overwrite until the last one - self.tags = (t for t in blob.split(b"\n") if t != b"") + for branch_name, branch_heads in branching_info.open_heads.items(): + for index, head in enumerate(branch_heads): + name = b"branch-heads/%s/%d" % (branch_name, index) + target = self.get_revision_id_from_hg_nodeid(head) + snapshot_branches[name] = SnapshotBranch( + target=target, target_type=TargetType.REVISION + ) - if contents: - missing_contents = set( - self.storage.content_missing( - [c.to_dict() for c in contents.values()], key_hash=ALGO + for branch_name, closed_heads in branching_info.closed_heads.items(): + for index, head in enumerate(closed_heads): + name = b"branch-closed-heads/%s/%d" % (branch_name, index) + target = self.get_revision_id_from_hg_nodeid(head) + snapshot_branches[name] = SnapshotBranch( + target=target, target_type=TargetType.REVISION ) + + # If the repo is broken enough or if it has none of the "normal" default + # mechanisms, we ignore `HEAD`. 
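For clarity, a small standalone sketch of the `HEAD` selection rule stated in the class docstring; this is not the `hgutil.branching_info` implementation (which lives outside this diff), only an illustration of the documented behaviour:

```python
# Sketch of the documented rule: prefer the "@" bookmark, otherwise the tip of
# "default", otherwise no HEAD alias at all (broken or unusual repositories).
from typing import Dict, Optional


def pick_head_alias(
    bookmarks: Dict[bytes, bytes], branch_tips: Dict[bytes, bytes]
) -> Optional[bytes]:
    """Return the snapshot branch name that b"HEAD" should alias, if any."""
    if b"@" in bookmarks:
        return b"bookmarks/@"
    if b"default" in branch_tips:
        return b"branch-tip/default"
    return None


assert pick_head_alias({b"@": b"x"}, {b"default": b"y"}) == b"bookmarks/@"
assert pick_head_alias({}, {}) is None
```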
+ default_branch_alias = branching_info.default_branch_alias + if default_branch_alias is not None: + snapshot_branches[b"HEAD"] = SnapshotBranch( + target=default_branch_alias, target_type=TargetType.ALIAS, ) + snapshot = Snapshot(branches=snapshot_branches) + self.storage.snapshot_add([snapshot]) + + self.flush() + self.loaded_snapshot_id = snapshot.id - # Clusters needed blobs by file offset and then only fetches the - # groups at the needed offsets. - focs: Dict[int, Dict[bytes, bytes]] = {} # "file/offset/contents" - for blob_hash in missing_contents: - _, file_offset, header = hash_to_info[blob_hash] - focs.setdefault(file_offset, {}) - focs[file_offset][header["node"]] = blob_hash - - for offset, node_hashes in sorted(focs.items()): - for header, data, *_ in self.br.yield_group_objects(group_offset=offset): - node = header["node"] - if node in node_hashes: - blob, meta = self.br.extract_meta_from_blob(data) - content = contents.pop(node_hashes[node], None) - if content: - if ( - self.max_content_size is not None - and len(blob) >= self.max_content_size - ): - yield SkippedContent.from_data( - blob, reason="Content too large" - ) - else: - yield Content.from_data(blob) - - def load_directories(self): - """This is where the work is done to convert manifest deltas from the - repository bundle into SWH directories. + def load_status(self) -> Dict[str, str]: + """Detailed loading status. + Defaults to logging an eventful load. + + Returns: a dictionary that is eventually passed back as the task's + result to the scheduler, allowing tuning of the task recurrence + mechanism. """ - self.mnode_to_tree_id = {} - cache_hints = self.br.build_manifest_hints() + return { + "status": self._load_status, + } - def tree_size(t): - return t.size() + def visit_status(self) -> str: + """Allow overriding the visit status in case of partial load""" + if self._visit_status is not None: + return self._visit_status + return super().visit_status() - self.trees = SelectiveCache( - cache_hints=cache_hints, - size_function=tree_size, - filename=self.cache_filename2, - max_size=self.cache2_size, - ) + def get_revision_id_from_hg_nodeid(self, hg_nodeid: HgNodeId) -> Sha1Git: + """Return the git sha1 of a revision given its hg nodeid. - tree = SimpleTree() - for header, added, removed in self.br.yield_all_manifest_deltas(cache_hints): - node = header["node"] - basenode = header["basenode"] - tree = self.trees.fetch(basenode) or tree # working tree - - for path in removed.keys(): - tree = tree.remove_tree_node_for_path(path) - for path, info in added.items(): - file_node, is_symlink, perms_code = info - tree = tree.add_blob( - path, self.file_node_to_hash[file_node], is_symlink, perms_code - ) + Args: + hg_nodeid: the hg nodeid of the revision. - if header["linknode"] in self.reduce_effort: - self.trees.store(node, tree) - else: - new_dirs = [] - self.mnode_to_tree_id[node] = tree.hash_changed(new_dirs) - self.trees.store(node, tree) - yield header, tree, new_dirs + Returns: + the sha1_git of the revision. 
+ """ + + from_cache = self._revision_nodeid_to_sha1git.get(hg_nodeid) + if from_cache is not None: + return from_cache + # The parent was not loaded in this run, get it from storage + from_storage = [ + extid + for extid in self.storage.extid_get_from_extid( + EXTID_TYPE, ids=[hg_nodeid], version=EXTID_VERSION + ) + ] + + msg = "Expected 1 match from storage for hg node %r, got %d" + assert len(from_storage) == 1, msg % (hg_nodeid.hex(), len(from_storage)) + return from_storage[0].target.object_id + + def get_revision_parents(self, rev_ctx: hgutil.BaseContext) -> Tuple[Sha1Git, ...]: + """Return the git sha1 of the parent revisions. - def get_directories(self) -> Iterable[Directory]: - """Compute directories to load + Args: + hg_nodeid: the hg nodeid of the revision. + Returns: + the sha1_git of the parent revisions. """ - dirs: Dict[Sha1Git, Directory] = {} - self.num_directories = 0 - for _, _, new_dirs in self.load_directories(): - for d in new_dirs: - self.num_directories += 1 - dirs[d["id"]] = Directory.from_dict(d) + parents = [] + for parent_ctx in rev_ctx.parents(): + parent_hg_nodeid = parent_ctx.node() + # nullid is the value of a parent that does not exist + if parent_hg_nodeid == hgutil.NULLID: + continue + revision_id = self.get_revision_id_from_hg_nodeid(parent_hg_nodeid) + parents.append(revision_id) - missing_dirs: List[Sha1Git] = list(dirs.keys()) - if missing_dirs: - missing_dirs = list(self.storage.directory_missing(missing_dirs)) + return tuple(parents) - for _id in missing_dirs: - yield dirs[_id] + def store_revision(self, rev_ctx: hgutil.BaseContext) -> None: + """Store a revision given its hg nodeid. - def get_revisions(self) -> Iterable[Revision]: - """Compute revisions to load + Args: + rev_ctx: the he revision context. + Returns: + the sha1_git of the stored revision. """ - revisions = {} - self.num_revisions = 0 - for header, commit in self.br.yield_all_changesets(): - if header["node"] in self.reduce_effort: + hg_nodeid = rev_ctx.node() + + root_sha1git = self.store_directories(rev_ctx) + + # `Person.from_fullname` is compatible with mercurial's freeform author + # as fullname is what is used in revision hash when available. 
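A short, hedged illustration of what `Person.from_fullname` does with Mercurial's free-form user strings (the example values are made up):

```python
# Mercurial "user" strings are free-form bytes; Person.from_fullname keeps the raw
# value as `fullname` (the part that enters the revision hash) and parses name/email
# out of the "name <email>" form on a best-effort basis.
from swh.model.model import Person

author = Person.from_fullname(b"Jane Doe <jane@example.org>")
# author.fullname == b"Jane Doe <jane@example.org>"; name and email are split out
# because an "<email>" part is present.

bot = Person.from_fullname(b"convert-repo")
# No "<email>" part: the whole string is preserved as the fullname.
```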
+ author = Person.from_fullname(rev_ctx.user()) + + (timestamp, offset) = rev_ctx.date() + + # TimestampWithTimezone.from_dict will change name + # as it accept more than just dicts + rev_date = TimestampWithTimezone.from_dict(int(timestamp)) + + extra_headers = [ + (b"time_offset_seconds", str(offset).encode(),), + ] + for key, value in rev_ctx.extra().items(): + # The default branch is skipped to match + # the historical implementation + if key == b"branch" and value == b"default": continue - self.num_revisions += 1 - date_dict = identifiers.normalize_timestamp(int(commit["time"].timestamp())) - author_dict = converters.parse_author(commit["user"]) - if commit["manifest"] == Bundle20Reader.NAUGHT_NODE: - directory_id = SimpleTree().hash_changed() - else: - directory_id = self.mnode_to_tree_id[commit["manifest"]] + # transplant_source is converted to match + # the historical implementation + if key == b"transplant_source": + value = hash_to_bytehex(value) + extra_headers.append((key, value)) + + revision = Revision( + author=author, + date=rev_date, + committer=author, + committer_date=rev_date, + type=RevisionType.MERCURIAL, + directory=root_sha1git, + message=rev_ctx.description(), + extra_headers=tuple(extra_headers), + synthetic=False, + parents=self.get_revision_parents(rev_ctx), + ) - extra_headers = [ - ( - b"time_offset_seconds", - str(commit["time_offset_seconds"]).encode("utf-8"), + self._revision_nodeid_to_sha1git[hg_nodeid] = revision.id + self.storage.revision_add([revision]) + + # Save the mapping from SWHID to hg id + revision_swhid = identifiers.CoreSWHID( + object_type=identifiers.ObjectType.REVISION, object_id=revision.id, + ) + self.storage.extid_add( + [ + ExtID( + extid_type=EXTID_TYPE, + extid_version=EXTID_VERSION, + extid=hg_nodeid, + target=revision_swhid, ) ] - extra = commit.get("extra") - if extra: - for e in extra.split(b"\x00"): - k, v = e.split(b":", 1) - # transplant_source stores binary reference to a changeset - # prefer to dump hexadecimal one in the revision metadata - if k == b"transplant_source": - v = hash_to_bytehex(v) - extra_headers.append((k, v)) - - parents = [] - p1 = self.node_2_rev.get(header["p1"]) - p2 = self.node_2_rev.get(header["p2"]) - if p1: - parents.append(p1) - if p2: - parents.append(p2) - - revision = Revision( - author=Person.from_dict(author_dict), - date=TimestampWithTimezone.from_dict(date_dict), - committer=Person.from_dict(author_dict), - committer_date=TimestampWithTimezone.from_dict(date_dict), - type=RevisionType.MERCURIAL, - directory=directory_id, - message=commit["message"], - extra_headers=tuple(extra_headers), - synthetic=False, - parents=tuple(parents), - ) + ) - self.node_2_rev[header["node"]] = revision.id - revisions[revision.id] = revision + def store_release(self, name: bytes, target: Sha1Git) -> Sha1Git: + """Store a release given its name and its target. - revision_swhid = identifiers.CoreSWHID( - object_type=identifiers.ObjectType.REVISION, object_id=revision.id, - ) - self.extids.append( - ExtID( - extid_type=EXTID_TYPE, extid=header["node"], target=revision_swhid - ) - ) + A release correspond to a user defined tag in mercurial. + The mercurial api as a `tip` tag that must be ignored. - # Converts heads to use swh ids - self.heads = { - branch_name: (pointer_nature, self.node_2_rev[node_id]) - for branch_name, (pointer_nature, node_id) in self.heads.items() - } + Args: + name: name of the release. + target: sha1_git of the target revision. 
- missing_revs = set(revisions.keys()) - if missing_revs: - missing_revs = set(self.storage.revision_missing(list(missing_revs))) - - for rev in missing_revs: - yield revisions[rev] - self.mnode_to_tree_id = None - - def _read_tag(self, tag, split_byte=b" "): - node, *name = tag.split(split_byte) - name = split_byte.join(name) - return node, name - - def get_releases(self) -> Iterable[Release]: - """Get the releases that need to be loaded.""" - self.num_releases = 0 - releases = {} - missing_releases = set() - for t in self.tags: - self.num_releases += 1 - node, name = self._read_tag(t) - node = node.decode() - node_bytes = hash_to_bytes(node) - if not TAG_PATTERN.match(node): - self.log.warn("Wrong pattern (%s) found in tags. Skipping" % (node,)) - continue - if node_bytes not in self.node_2_rev: - self.log.warn( - "No matching revision for tag %s " - "(hg changeset: %s). Skipping" % (name.decode(), node) - ) - continue - tgt_rev = self.node_2_rev[node_bytes] - release = Release( - name=name, - target=tgt_rev, - target_type=ObjectType.REVISION, - message=None, - metadata=None, - synthetic=False, - author=Person(name=None, email=None, fullname=b""), - date=None, - ) - missing_releases.add(release.id) - releases[release.id] = release - self.releases[name] = release.id + Returns: + the sha1_git of the stored release. + """ + release = Release( + name=name, + target=target, + target_type=ObjectType.REVISION, + message=None, + metadata=None, + synthetic=False, + author=Person(name=None, email=None, fullname=b""), + date=None, + ) - if missing_releases: - missing_releases = set(self.storage.release_missing(list(missing_releases))) + self.storage.release_add([release]) - for _id in missing_releases: - yield releases[_id] + return release.id - def get_snapshot(self) -> Snapshot: - """Get the snapshot that need to be loaded.""" - branches: Dict[bytes, Optional[SnapshotBranch]] = {} - for name, (pointer_nature, target) in self.heads.items(): - branches[name] = SnapshotBranch( - target=target, target_type=TargetType.REVISION - ) - if pointer_nature == HEAD_POINTER_NAME: - branches[b"HEAD"] = SnapshotBranch( - target=name, target_type=TargetType.ALIAS - ) - for name, target in self.releases.items(): - branches[name] = SnapshotBranch( - target=target, target_type=TargetType.RELEASE - ) + def store_content(self, rev_ctx: hgutil.BaseContext, file_path: bytes) -> Content: + """Store a revision content hg nodeid and file path. - self.snapshot = Snapshot(branches=branches) - return self.snapshot + Content is a mix of file content at a given revision + and its permissions found in the changeset's manifest. - def store_data(self) -> None: - super().store_data() - self.storage.extid_add(self.extids) + Args: + rev_ctx: the he revision context. + file_path: the hg path of the content. - def get_fetch_history_result(self): - """Return the data to store in fetch_history.""" - return { - "contents": self.num_contents, - "directories": self.num_directories, - "revisions": self.num_revisions, - "releases": self.num_releases, - } + Returns: + the sha1_git of the top level directory. 
+        """
+        hg_nodeid = rev_ctx.node()
+        file_ctx = rev_ctx[file_path]

-    def load_status(self):
-        snapshot = self.get_snapshot()
-        load_status = "eventful"
-        if self.last_snapshot_id is not None and self.last_snapshot_id == snapshot.id:
-            load_status = "uneventful"
-        return {
-            "status": load_status,
-        }
+        try:
+            file_nodeid = file_ctx.filenode()
+        except hgutil.LookupError:
+            # TODO
+            # Raising CorruptedRevision avoids crashing the whole loading
+            # but can lead to a lot of missing revisions.
+            # SkippedContent could be used but needs the actual content to calculate its id.
+            # Maybe the hg_nodeid can be used instead.
+            # Another option could be to just ignore the missing content.
+            # This point is left to future commits.
+            # Check for other uses to apply the same logic there.
+            raise CorruptedRevision(hg_nodeid)
+
+        perms = FLAG_PERMS[file_ctx.flags()]
+
+        # The cache key is file_nodeid + perms because permissions do not participate
+        # in the content hash in hg, while they do in swh.
+        cache_key = (file_nodeid, perms)
+
+        sha1_git = self._content_hash_cache.get(cache_key)
+        if sha1_git is None:
+            try:
+                data = file_ctx.data()
+            except hgutil.error.RevlogError:
+                # TODO
+                # See above use of `CorruptedRevision`
+                raise CorruptedRevision(hg_nodeid)

+            content = ModelContent.from_data(data)

-class HgArchiveBundle20Loader(HgBundle20Loader):
-    """Mercurial loader for repository wrapped within archives.
+            self.storage.content_add([content])

-    """
+            sha1_git = content.sha1_git
+            self._content_hash_cache[cache_key] = sha1_git
+
+        # Here we make sure to return only necessary data.
+        return Content({"sha1_git": sha1_git, "perms": perms})
+
+    def store_directories(self, rev_ctx: hgutil.BaseContext) -> Sha1Git:
+        """Store a revision's directories given its hg nodeid.
+
+        Mercurial has no directory objects as in git; a Git-like tree must be built
+        from file paths to obtain each directory hash.
+
+        Args:
+            rev_ctx: the revision context.
+
+        Returns:
+            the sha1_git of the top level directory.
+ """ + repo: hgutil.Repository = self._repo # mypy can't infer that repo is not None + prev_ctx = repo[self._last_hg_nodeid] + + # TODO maybe do diff on parents + try: + status = prev_ctx.status(rev_ctx) + except hgutil.error.LookupError: + raise CorruptedRevision(rev_ctx.node()) + + for file_path in status.removed: + try: + del self._last_root[file_path] + except KeyError: + raise CorruptedRevision(rev_ctx.node()) + + for file_path in status.added: + content = self.store_content(rev_ctx, file_path) + self._last_root[file_path] = content + + for file_path in status.modified: + content = self.store_content(rev_ctx, file_path) + self._last_root[file_path] = content + + self._last_hg_nodeid = rev_ctx.node() + + directories: Deque[Directory] = deque([self._last_root]) + while directories: + directory = directories.pop() + self.storage.directory_add([directory.to_model()]) + directories.extend( + [item for item in directory.values() if isinstance(item, Directory)] + ) + + return self._last_root.hash + + +class HgArchiveLoader(HgLoader): + """Mercurial loader for repository wrapped within tarballs.""" def __init__( self, storage: StorageInterface, url: str, - visit_date: Optional[datetime.datetime] = None, - archive_path=None, + visit_date: Optional[datetime] = None, + archive_path: str = None, temp_directory: str = "/tmp", max_content_size: Optional[int] = None, ): @@ -670,7 +763,7 @@ storage=storage, url=url, visit_date=visit_date, - logging_class="swh.loader.mercurial.HgArchiveBundle20Loader", + logging_class="swh.loader.mercurial.ArchiveLoaderFromDisk", temp_directory=temp_directory, max_content_size=max_content_size, ) @@ -678,15 +771,16 @@ self.archive_path = archive_path def prepare(self): + """Extract the archive instead of cloning.""" self.archive_extract_temp_dir = tmp_extract( archive=self.archive_path, - dir=self.temp_directory, + dir=self._temp_directory, prefix=TEMPORARY_DIR_PREFIX_PATTERN, - suffix=".dump-%s" % os.getpid(), + suffix=f".dump-{os.getpid()}", log=self.log, source=self.origin_url, ) - repo_name = os.listdir(self.archive_extract_temp_dir)[0] + repo_name = os.listdir(self.temp_dir)[0] self.directory = os.path.join(self.archive_extract_temp_dir, repo_name) super().prepare() diff --git a/swh/loader/mercurial/tasks.py b/swh/loader/mercurial/tasks.py --- a/swh/loader/mercurial/tasks.py +++ b/swh/loader/mercurial/tasks.py @@ -9,7 +9,7 @@ from swh.loader.mercurial.utils import parse_visit_date -from .from_disk import HgArchiveLoaderFromDisk, HgLoaderFromDisk +from .loader import HgArchiveLoader, HgLoader @shared_task(name=__name__ + ".LoadMercurial") @@ -20,11 +20,11 @@ Import a mercurial tarball into swh. - Args: see :func:`HgLoaderFromDisk.load`. + Args: see :func:`HgLoader.load`. """ - loader = HgLoaderFromDisk.from_configfile( + loader = HgLoader.from_configfile( url=url, directory=directory, visit_date=parse_visit_date(visit_date) ) return loader.load() @@ -36,9 +36,9 @@ ): """Import a mercurial tarball into swh. - Args: see :func:`HgArchiveLoaderFromDisk.load`. + Args: see :func:`HgArchiveLoader.load`. 
""" - loader = HgArchiveLoaderFromDisk.from_configfile( + loader = HgArchiveLoader.from_configfile( url=url, archive_path=archive_path, visit_date=parse_visit_date(visit_date) ) return loader.load() diff --git a/swh/loader/mercurial/tasks_from_disk.py b/swh/loader/mercurial/tasks_from_disk.py deleted file mode 100644 --- a/swh/loader/mercurial/tasks_from_disk.py +++ /dev/null @@ -1,43 +0,0 @@ -# Copyright (C) 2020-2021 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -from typing import Optional - -from celery import shared_task - -from swh.loader.mercurial.utils import parse_visit_date - -from .from_disk import HgArchiveLoaderFromDisk, HgLoaderFromDisk - - -@shared_task(name=__name__ + ".LoadMercurialFromDisk") -def load_hg( - *, url: str, directory: Optional[str] = None, visit_date: Optional[str] = None -): - """Mercurial repository loading - - Import a mercurial tarball into swh. - - Args: see :func:`HgLoaderFromDisk` constructor. - - """ - loader = HgLoaderFromDisk.from_configfile( - url=url, directory=directory, visit_date=parse_visit_date(visit_date) - ) - return loader.load() - - -@shared_task(name=__name__ + ".LoadArchiveMercurialFromDisk") -def load_hg_from_archive( - *, url: str, archive_path: Optional[str] = None, visit_date: Optional[str] = None -): - """Import a mercurial tarball into swh. - - Args: see :func:`HgArchiveLoaderFromDisk` constructor. - """ - loader = HgArchiveLoaderFromDisk.from_configfile( - url=url, archive_path=archive_path, visit_date=parse_visit_date(visit_date) - ) - return loader.load() diff --git a/swh/loader/mercurial/tests/test_from_disk.py b/swh/loader/mercurial/tests/test_from_disk.py deleted file mode 100644 --- a/swh/loader/mercurial/tests/test_from_disk.py +++ /dev/null @@ -1,738 +0,0 @@ -# Copyright (C) 2020-2021 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information -from datetime import datetime -from hashlib import sha1 -import os -from pathlib import Path -import subprocess -import unittest - -import attr -import pytest - -from swh.loader.mercurial.loader import HgBundle20Loader -from swh.loader.mercurial.utils import parse_visit_date -from swh.loader.tests import ( - assert_last_visit_matches, - check_snapshot, - get_stats, - prepare_repository_from_archive, -) -from swh.model.from_disk import Content, DentryPerms -from swh.model.hashutil import hash_to_bytes, hash_to_hex -from swh.model.identifiers import ObjectType -from swh.model.model import RevisionType, Snapshot, SnapshotBranch, TargetType -from swh.storage import get_storage -from swh.storage.algos.snapshot import snapshot_get_latest - -from ..from_disk import EXTID_VERSION, HgDirectory, HgLoaderFromDisk -from .loader_checker import ExpectedSwhids, LoaderChecker - -VISIT_DATE = parse_visit_date("2016-05-03 15:16:32+00") -assert VISIT_DATE is not None - - -def random_content() -> Content: - """Create minimal content object.""" - data = str(datetime.now()).encode() - return Content({"sha1_git": sha1(data).digest(), "perms": DentryPerms.content}) - - -def test_hg_directory_creates_missing_directories(): - directory = HgDirectory() - directory[b"path/to/some/content"] = random_content() - - -def test_hg_directory_get(): - content = 
random_content() - directory = HgDirectory() - - assert directory.get(b"path/to/content") is None - assert directory.get(b"path/to/content", content) == content - - directory[b"path/to/content"] = content - assert directory.get(b"path/to/content") == content - - -def test_hg_directory_deletes_empty_directories(): - directory = HgDirectory() - content = random_content() - directory[b"path/to/content"] = content - directory[b"path/to/some/deep/content"] = random_content() - - del directory[b"path/to/some/deep/content"] - - assert directory.get(b"path/to/some/deep") is None - assert directory.get(b"path/to/some") is None - assert directory.get(b"path/to/content") == content - - -def test_hg_directory_when_directory_replaces_file(): - directory = HgDirectory() - directory[b"path/to/some"] = random_content() - directory[b"path/to/some/content"] = random_content() - - -# Those tests assert expectations on repository loading -# by reading expected values from associated json files -# produced by the `swh-hg-identify` command line utility. -# -# It has more granularity than historical tests. -# Assertions will tell if the error comes from the directories -# revisions or release rather than only checking the snapshot. -# -# With more work it should event be possible to know which part -# of an object is faulty. -@pytest.mark.parametrize( - "archive_name", ("hello", "transplant", "the-sandbox", "example") -) -def test_examples(swh_storage, datadir, tmp_path, archive_name): - archive_path = Path(datadir, f"{archive_name}.tgz") - json_path = Path(datadir, f"{archive_name}.json") - repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - - LoaderChecker( - loader=HgLoaderFromDisk(swh_storage, repo_url), - expected=ExpectedSwhids.load(json_path), - ).check() - - -# This test has as been adapted from the historical `HgBundle20Loader` tests -# to ensure compatibility of `HgLoaderFromDisk`. -# Hashes as been produced by copy pasting the result of the implementation -# to prevent regressions. 
-def test_loader_hg_new_visit_no_release(swh_storage, datadir, tmp_path): - """Eventful visit should yield 1 snapshot""" - archive_name = "the-sandbox" - archive_path = os.path.join(datadir, f"{archive_name}.tgz") - repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - - loader = HgLoaderFromDisk(swh_storage, url=repo_url) - - assert loader.load() == {"status": "eventful"} - - tips = { - b"branch-tip/default": "70e750bb046101fdced06f428e73fee471509c56", - b"branch-tip/develop": "a9c4534552df370f43f0ef97146f393ef2f2a08c", - } - closed = { - b"feature/fun_time": "4d640e8064fe69b4c851dfd43915c431e80c7497", - b"feature/green2_loader": "94be9abcf9558213ff301af0ecd8223451ce991d", - b"feature/greenloader": "9f82d95bd3edfb7f18b1a21d6171170395ea44ce", - b"feature/my_test": "dafa445964230e808148db043c126063ea1dc9b6", - b"feature/read2_loader": "9e912851eb64e3a1e08fbb587de7a4c897ce5a0a", - b"feature/readloader": "ddecbc16f4c916c39eacfcb2302e15a9e70a231e", - b"feature/red": "cb36b894129ca7910bb81c457c72d69d5ff111bc", - b"feature/split5_loader": "3ed4b85d30401fe32ae3b1d650f215a588293a9e", - b"feature/split_causing": "c346f6ff7f42f2a8ff867f92ab83a6721057d86c", - b"feature/split_loader": "5f4eba626c3f826820c4475d2d81410759ec911b", - b"feature/split_loader5": "5017ce0b285351da09a2029ea2cf544f79b593c7", - b"feature/split_loading": "4e2dc6d6073f0b6d348f84ded52f9143b10344b9", - b"feature/split_redload": "2d4a801c9a9645fcd3a9f4c06418d8393206b1f3", - b"feature/splitloading": "88b80615ed8561be74a700b92883ec0374ddacb0", - b"feature/test": "61d762d65afb3150e2653d6735068241779c1fcf", - b"feature/test_branch": "be44d5e6cc66580f59c108f8bff5911ee91a22e4", - b"feature/test_branching": "d2164061453ecb03d4347a05a77db83f706b8e15", - b"feature/test_dog": "2973e5dc9568ac491b198f6b7f10c44ddc04e0a3", - } - - mapping = {b"branch-closed-heads/%s/0" % b: n for b, n in closed.items()} - mapping.update(tips) - - expected_branches = { - k: SnapshotBranch(target=hash_to_bytes(v), target_type=TargetType.REVISION) - for k, v in mapping.items() - } - expected_branches[b"HEAD"] = SnapshotBranch( - target=b"branch-tip/default", target_type=TargetType.ALIAS - ) - - expected_snapshot = Snapshot( - id=hash_to_bytes("cbc609dcdced34dbd9938fe81b555170f1abc96f"), - branches=expected_branches, - ) - - assert_last_visit_matches( - loader.storage, - repo_url, - status="full", - type="hg", - snapshot=expected_snapshot.id, - ) - check_snapshot(expected_snapshot, loader.storage) - - stats = get_stats(loader.storage) - expected_stats = { - "content": 2, - "directory": 3, - "origin": 1, - "origin_visit": 1, - "release": 0, - "revision": 58, - "skipped_content": 0, - "snapshot": 1, - } - assert stats == expected_stats - loader2 = HgLoaderFromDisk(swh_storage, url=repo_url) - - assert loader2.load() == {"status": "uneventful"} # nothing new happened - - stats2 = get_stats(loader2.storage) - expected_stats2 = expected_stats.copy() - expected_stats2["origin_visit"] = 2 # one new visit recorded - assert stats2 == expected_stats2 - assert_last_visit_matches( - loader2.storage, - repo_url, - status="full", - type="hg", - snapshot=expected_snapshot.id, - ) # but we got a snapshot nonetheless - - -# This test has as been adapted from the historical `HgBundle20Loader` tests -# to ensure compatibility of `HgLoaderFromDisk`. -# Hashes as been produced by copy pasting the result of the implementation -# to prevent regressions. 
-def test_loader_hg_new_visit_with_release(swh_storage, datadir, tmp_path): - """Eventful visit with release should yield 1 snapshot""" - - archive_name = "hello" - archive_path = os.path.join(datadir, f"{archive_name}.tgz") - repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - - loader = HgLoaderFromDisk(swh_storage, url=repo_url, visit_date=VISIT_DATE,) - - actual_load_status = loader.load() - assert actual_load_status == {"status": "eventful"} - - # then - stats = get_stats(loader.storage) - assert stats == { - "content": 3, - "directory": 3, - "origin": 1, - "origin_visit": 1, - "release": 1, - "revision": 3, - "skipped_content": 0, - "snapshot": 1, - } - - # cf. test_loader.org for explaining from where those hashes - tip_release = hash_to_bytes("515c4d72e089404356d0f4b39d60f948b8999140") - release = loader.storage.release_get([tip_release])[0] - assert release is not None - - tip_revision_default = hash_to_bytes("c3dbe4fbeaaa98dd961834e4007edb3efb0e2a27") - revision = loader.storage.revision_get([tip_revision_default])[0] - assert revision is not None - - expected_snapshot = Snapshot( - id=hash_to_bytes("7ef082aa8b53136b1bed97f734504be32679bbec"), - branches={ - b"branch-tip/default": SnapshotBranch( - target=tip_revision_default, target_type=TargetType.REVISION, - ), - b"tags/0.1": SnapshotBranch( - target=tip_release, target_type=TargetType.RELEASE, - ), - b"HEAD": SnapshotBranch( - target=b"branch-tip/default", target_type=TargetType.ALIAS, - ), - }, - ) - - check_snapshot(expected_snapshot, loader.storage) - assert_last_visit_matches( - loader.storage, - repo_url, - type=RevisionType.MERCURIAL.value, - status="full", - snapshot=expected_snapshot.id, - ) - - -# This test has as been adapted from the historical `HgBundle20Loader` tests -# to ensure compatibility of `HgLoaderFromDisk`. -# Hashes as been produced by copy pasting the result of the implementation -# to prevent regressions. -def test_visit_repository_with_transplant_operations(swh_storage, datadir, tmp_path): - """Visit a mercurial repository visit transplant operations within should yield a - snapshot as well. 
- - """ - - archive_name = "transplant" - archive_path = os.path.join(datadir, f"{archive_name}.tgz") - repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - - loader = HgLoaderFromDisk(swh_storage, url=repo_url, visit_date=VISIT_DATE,) - - # load hg repository - actual_load_status = loader.load() - assert actual_load_status == {"status": "eventful"} - - # collect swh revisions - assert_last_visit_matches( - loader.storage, repo_url, type=RevisionType.MERCURIAL.value, status="full" - ) - - revisions = [] - snapshot = snapshot_get_latest(loader.storage, repo_url) - for branch in snapshot.branches.values(): - if branch.target_type.value != "revision": - continue - revisions.append(branch.target) - - # extract original changesets info and the transplant sources - hg_changesets = set() - transplant_sources = set() - for rev in loader.storage.revision_log(revisions): - extids = list( - loader.storage.extid_get_from_target(ObjectType.REVISION, [rev["id"]]) - ) - assert len(extids) == 1 - hg_changesets.add(hash_to_hex(extids[0].extid)) - for k, v in rev["extra_headers"]: - if k == b"transplant_source": - transplant_sources.add(v.decode("ascii")) - - # check extracted data are valid - assert len(hg_changesets) > 0 - assert len(transplant_sources) > 0 - assert transplant_sources <= hg_changesets - - -def _partial_copy_storage( - old_storage, origin_url: str, mechanism: str, copy_revisions: bool -): - """Create a new storage, and only copy ExtIDs or head revisions to it.""" - new_storage = get_storage(cls="memory") - snapshot = snapshot_get_latest(old_storage, origin_url) - assert snapshot - heads = [branch.target for branch in snapshot.branches.values()] - - if mechanism == "extid": - extids = old_storage.extid_get_from_target(ObjectType.REVISION, heads) - new_storage.extid_add(extids) - if copy_revisions: - # copy revisions, but erase their metadata to make sure the loader doesn't - # fallback to revision.metadata["nodeid"] - revisions = [ - attr.evolve(rev, metadata={}) - for rev in old_storage.revision_get(heads) - if rev - ] - new_storage.revision_add(revisions) - - else: - assert mechanism == "same storage" - return old_storage - - # copy origin, visit, status - new_storage.origin_add(old_storage.origin_get([origin_url])) - visit = old_storage.origin_visit_get_latest(origin_url) - new_storage.origin_visit_add([visit]) - statuses = old_storage.origin_visit_status_get(origin_url, visit.visit).results - new_storage.origin_visit_status_add(statuses) - new_storage.snapshot_add([snapshot]) - - return new_storage - - -def test_load_unchanged_repo_should_be_uneventful( - swh_storage, datadir, tmp_path, -): - """Checks the loader can find which revisions it already loaded, using ExtIDs.""" - archive_name = "hello" - archive_path = os.path.join(datadir, f"{archive_name}.tgz") - repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - repo_path = repo_url.replace("file://", "") - - loader = HgLoaderFromDisk(swh_storage, repo_path) - - assert loader.load() == {"status": "eventful"} - assert get_stats(loader.storage) == { - "content": 3, - "directory": 3, - "origin": 1, - "origin_visit": 1, - "release": 1, - "revision": 3, - "skipped_content": 0, - "snapshot": 1, - } - visit_status = assert_last_visit_matches( - loader.storage, repo_path, type=RevisionType.MERCURIAL.value, status="full", - ) - assert visit_status.snapshot is not None - - # Create a new loader (to start with a clean slate, eg. 
remove the caches), - # with the new, partial, storage - loader2 = HgLoaderFromDisk(swh_storage, repo_path) - assert loader2.load() == {"status": "uneventful"} - - # Should have all the objects - assert get_stats(loader.storage) == { - "content": 3, - "directory": 3, - "origin": 1, - "origin_visit": 2, - "release": 1, - "revision": 3, - "skipped_content": 0, - "snapshot": 1, - } - visit_status2 = assert_last_visit_matches( - loader2.storage, repo_path, type=RevisionType.MERCURIAL.value, status="full", - ) - assert visit_status2.snapshot == visit_status.snapshot - - -def test_closed_branch_incremental(swh_storage, datadir, tmp_path): - """Test that a repository with a closed branch does not trip an incremental load""" - archive_name = "example" - archive_path = os.path.join(datadir, f"{archive_name}.tgz") - repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - repo_path = repo_url.replace("file://", "") - - loader = HgLoaderFromDisk(swh_storage, repo_path) - - # Test 3 loads: full, and two incremental. - assert loader.load() == {"status": "eventful"} - expected_stats = { - "content": 7, - "directory": 16, - "origin": 1, - "origin_visit": 1, - "release": 0, - "revision": 9, - "skipped_content": 0, - "snapshot": 1, - } - assert get_stats(loader.storage) == expected_stats - assert loader.load() == {"status": "uneventful"} - assert get_stats(loader.storage) == {**expected_stats, "origin_visit": 1 + 1} - assert loader.load() == {"status": "uneventful"} - assert get_stats(loader.storage) == {**expected_stats, "origin_visit": 2 + 1} - - -def test_old_loader_new_loader(swh_storage, datadir, tmp_path): - archive_name = "example" - archive_path = os.path.join(datadir, f"{archive_name}.tgz") - repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - repo_path = repo_url.replace("file://", "") - - old_loader = HgBundle20Loader(swh_storage, repo_path) - assert old_loader.load() == {"status": "eventful"} - - expected_stats = { - "content": 7, - "directory": 16, - "origin": 1, - "origin_visit": 1, - "release": 0, - "revision": 9, - "skipped_content": 0, - "snapshot": 1, - } - assert get_stats(old_loader.storage) == expected_stats - - # Will pick up more branches, hence a different snapshot - loader = HgLoaderFromDisk(swh_storage, repo_path) - res = loader.load() - new_expected_stats = { - **expected_stats, - "origin_visit": 2, - "snapshot": 2, - } - assert get_stats(loader.storage) == new_expected_stats - assert res == {"status": "eventful"} - - # Shouldn't pick up anything now - loader = HgLoaderFromDisk(swh_storage, repo_path) - assert loader.load() == {"status": "uneventful"} - - # Shouldn't pick up anything either after another load - loader = HgLoaderFromDisk(swh_storage, repo_path) - assert loader.load() == {"status": "uneventful"} - - -def test_load_unchanged_repo__dangling_extid(swh_storage, datadir, tmp_path): - """Checks the loader will load revisions targeted by an ExtID if the - revisions are missing from the storage""" - archive_name = "hello" - archive_path = os.path.join(datadir, f"{archive_name}.tgz") - repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - repo_path = repo_url.replace("file://", "") - - loader = HgLoaderFromDisk(swh_storage, repo_path) - - assert loader.load() == {"status": "eventful"} - assert get_stats(loader.storage) == { - "content": 3, - "directory": 3, - "origin": 1, - "origin_visit": 1, - "release": 1, - "revision": 3, - "skipped_content": 0, - "snapshot": 1, - } - - old_storage = 
swh_storage - - # Create a new storage, and only copy ExtIDs or head revisions to it. - # This should be enough for the loader to know revisions were already loaded - new_storage = _partial_copy_storage( - old_storage, repo_path, mechanism="extid", copy_revisions=False - ) - - # Create a new loader (to start with a clean slate, eg. remove the caches), - # with the new, partial, storage - loader = HgLoaderFromDisk(new_storage, repo_path) - - assert get_stats(loader.storage) == { - "content": 0, - "directory": 0, - "origin": 1, - "origin_visit": 1, - "release": 0, - "revision": 0, - "skipped_content": 0, - "snapshot": 1, - } - - assert loader.load() == {"status": "eventful"} - - assert get_stats(loader.storage) == { - "content": 3, - "directory": 3, - "origin": 1, - "origin_visit": 2, - "release": 1, - "revision": 3, - "skipped_content": 0, - "snapshot": 1, - } - - -def test_missing_filelog_should_not_crash(swh_storage, datadir, tmp_path): - archive_name = "missing-filelog" - archive_path = os.path.join(datadir, f"{archive_name}.tgz") - repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - directory = repo_url.replace("file://", "") - - loader = HgLoaderFromDisk( - storage=swh_storage, - url=repo_url, - directory=directory, # specify directory to avoid clone - visit_date=VISIT_DATE, - ) - - actual_load_status = loader.load() - assert actual_load_status == {"status": "eventful"} - - assert_last_visit_matches(swh_storage, repo_url, status="partial", type="hg") - - -def test_multiple_open_heads(swh_storage, datadir, tmp_path): - archive_name = "multiple-heads" - archive_path = os.path.join(datadir, f"{archive_name}.tgz") - repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - - loader = HgLoaderFromDisk(storage=swh_storage, url=repo_url,) - - actual_load_status = loader.load() - assert actual_load_status == {"status": "eventful"} - - assert_last_visit_matches(swh_storage, repo_url, status="full", type="hg") - - snapshot = snapshot_get_latest(swh_storage, repo_url) - expected_branches = [ - b"HEAD", - b"branch-heads/default/0", - b"branch-heads/default/1", - b"branch-tip/default", - ] - assert sorted(snapshot.branches.keys()) == expected_branches - - # Check that we don't load anything the second time - loader = HgLoaderFromDisk(storage=swh_storage, url=repo_url,) - - actual_load_status = loader.load() - - assert actual_load_status == {"status": "uneventful"} - - -def hg_strip(repo: str, revset: str) -> None: - """Removes `revset` and all of their descendants from the local repository.""" - # Previously called `hg strip`, it was renamed to `hg debugstrip` in Mercurial 5.7 - # because it's most likely not what most users want to do (they should use some kind - # of history-rewriting tool like `histedit` or `prune`). - # But here, it's exactly what we want to do. 
- subprocess.check_call(["hg", "debugstrip", revset], cwd=repo) - - -def test_load_repo_with_new_commits(swh_storage, datadir, tmp_path): - archive_name = "hello" - archive_path = Path(datadir, f"{archive_name}.tgz") - json_path = Path(datadir, f"{archive_name}.json") - repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - - # first load with missing commits - hg_strip(repo_url.replace("file://", ""), "tip") - loader = HgLoaderFromDisk(swh_storage, repo_url) - assert loader.load() == {"status": "eventful"} - assert get_stats(loader.storage) == { - "content": 2, - "directory": 2, - "origin": 1, - "origin_visit": 1, - "release": 0, - "revision": 2, - "skipped_content": 0, - "snapshot": 1, - } - - # second load with all commits - repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - loader = HgLoaderFromDisk(swh_storage, repo_url) - checker = LoaderChecker(loader=loader, expected=ExpectedSwhids.load(json_path),) - - checker.check() - - assert get_stats(loader.storage) == { - "content": 3, - "directory": 3, - "origin": 1, - "origin_visit": 2, - "release": 1, - "revision": 3, - "skipped_content": 0, - "snapshot": 2, - } - - -def test_load_repo_check_extids_write_version(swh_storage, datadir, tmp_path): - """ExtIDs should be stored with a given version when loading is done""" - archive_name = "hello" - archive_path = Path(datadir, f"{archive_name}.tgz") - repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - - hg_strip(repo_url.replace("file://", ""), "tip") - loader = HgLoaderFromDisk(swh_storage, repo_url) - assert loader.load() == {"status": "eventful"} - - # Ensure we write ExtIDs to a specific version. - snapshot = snapshot_get_latest(swh_storage, repo_url) - - # First, filter out revisions from that snapshot - revision_ids = [ - branch.target - for branch in snapshot.branches.values() - if branch.target_type == TargetType.REVISION - ] - - assert len(revision_ids) > 0 - - # Those revisions should have their associated ExtID version set to EXTID_VERSION - extids = swh_storage.extid_get_from_target(ObjectType.REVISION, revision_ids) - - assert len(extids) == len(revision_ids) - for extid in extids: - assert extid.extid_version == EXTID_VERSION - - -def test_load_new_extid_should_be_eventful(swh_storage, datadir, tmp_path): - """Changing the extid version should make loaders ignore existing extids, - and load the repo again.""" - archive_name = "hello" - archive_path = os.path.join(datadir, f"{archive_name}.tgz") - repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - repo_path = repo_url.replace("file://", "") - - with unittest.mock.patch("swh.loader.mercurial.from_disk.EXTID_VERSION", 0): - loader = HgLoaderFromDisk(swh_storage, repo_path) - assert loader.load() == {"status": "eventful"} - - loader = HgLoaderFromDisk(swh_storage, repo_path) - assert loader.load() == {"status": "eventful"} - - loader = HgLoaderFromDisk(swh_storage, repo_path) - assert loader.load() == {"status": "uneventful"} - - with unittest.mock.patch("swh.loader.mercurial.from_disk.EXTID_VERSION", 10000): - loader = HgLoaderFromDisk(swh_storage, repo_path) - assert loader.load() == {"status": "eventful"} - - loader = HgLoaderFromDisk(swh_storage, repo_path) - assert loader.load() == {"status": "uneventful"} - - -def test_loader_hg_extid_filtering(swh_storage, datadir, tmp_path): - """The first visit of a fork should filter already seen revisions (through extids) - - """ - archive_name = "the-sandbox" - 
archive_path = os.path.join(datadir, f"{archive_name}.tgz") - repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - - loader = HgLoaderFromDisk(swh_storage, url=repo_url) - - assert loader.load() == {"status": "eventful"} - stats = get_stats(loader.storage) - expected_stats = { - "content": 2, - "directory": 3, - "origin": 1, - "origin_visit": 1, - "release": 0, - "revision": 58, - "skipped_content": 0, - "snapshot": 1, - } - assert stats == expected_stats - - visit_status = assert_last_visit_matches( - loader.storage, repo_url, status="full", type="hg", - ) - - # Make a fork of the first repository we ingested - fork_url = prepare_repository_from_archive( - archive_path, "the-sandbox-reloaded", tmp_path - ) - loader2 = HgLoaderFromDisk( - swh_storage, url=fork_url, directory=str(tmp_path / archive_name) - ) - - assert loader2.load() == {"status": "uneventful"} - - stats = get_stats(loader.storage) - expected_stats2 = expected_stats.copy() - expected_stats2.update( - {"origin": 1 + 1, "origin_visit": 1 + 1,} - ) - assert stats == expected_stats2 - - visit_status2 = assert_last_visit_matches( - loader.storage, fork_url, status="full", type="hg", - ) - assert visit_status.snapshot is not None - assert visit_status2.snapshot == visit_status.snapshot - - -def test_loader_repository_with_bookmark_information(swh_storage, datadir, tmp_path): - """Repository with bookmark information should be ingested correctly - - """ - archive_name = "anomad-d" - archive_path = os.path.join(datadir, f"{archive_name}.tgz") - repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - - loader = HgLoaderFromDisk(swh_storage, url=repo_url) - - assert loader.load() == {"status": "eventful"} diff --git a/swh/loader/mercurial/tests/test_loader.py b/swh/loader/mercurial/tests/test_loader.py --- a/swh/loader/mercurial/tests/test_loader.py +++ b/swh/loader/mercurial/tests/test_loader.py @@ -1,15 +1,15 @@ -# Copyright (C) 2018-2021 The Software Heritage developers +# Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information - -import copy -import logging +from datetime import datetime +from hashlib import sha1 import os -import time +from pathlib import Path +import subprocess +import unittest -import hglib -from hglib.error import CommandError +import attr import pytest from swh.loader.mercurial.utils import parse_visit_date @@ -19,51 +19,150 @@ get_stats, prepare_repository_from_archive, ) +from swh.model.from_disk import Content, DentryPerms from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.model.identifiers import ObjectType from swh.model.model import RevisionType, Snapshot, SnapshotBranch, TargetType +from swh.storage import get_storage from swh.storage.algos.snapshot import snapshot_get_latest -from ..loader import CloneTimeoutError, HgArchiveBundle20Loader, HgBundle20Loader +from ..loader import EXTID_VERSION, HgDirectory, HgLoader +from .loader_checker import ExpectedSwhids, LoaderChecker VISIT_DATE = parse_visit_date("2016-05-03 15:16:32+00") assert VISIT_DATE is not None +def random_content() -> Content: + """Create minimal content object.""" + data = str(datetime.now()).encode() + return Content({"sha1_git": sha1(data).digest(), "perms": DentryPerms.content}) + + +def test_hg_directory_creates_missing_directories(): + directory = HgDirectory() + 
directory[b"path/to/some/content"] = random_content() + + +def test_hg_directory_get(): + content = random_content() + directory = HgDirectory() + + assert directory.get(b"path/to/content") is None + assert directory.get(b"path/to/content", content) == content + + directory[b"path/to/content"] = content + assert directory.get(b"path/to/content") == content + + +def test_hg_directory_deletes_empty_directories(): + directory = HgDirectory() + content = random_content() + directory[b"path/to/content"] = content + directory[b"path/to/some/deep/content"] = random_content() + + del directory[b"path/to/some/deep/content"] + + assert directory.get(b"path/to/some/deep") is None + assert directory.get(b"path/to/some") is None + assert directory.get(b"path/to/content") == content + + +def test_hg_directory_when_directory_replaces_file(): + directory = HgDirectory() + directory[b"path/to/some"] = random_content() + directory[b"path/to/some/content"] = random_content() + + +# These tests assert expectations on repository loading +# by reading expected values from associated json files +# produced by the `swh-hg-identify` command line utility. +# +# This approach has more granularity than the historical tests. +# Assertions will tell whether the error comes from the directories, +# revisions or releases rather than only checking the snapshot. +# +# With more work it should even be possible to know which part +# of an object is faulty. +@pytest.mark.parametrize( + "archive_name", ("hello", "transplant", "the-sandbox", "example") +) +def test_examples(swh_storage, datadir, tmp_path, archive_name): + archive_path = Path(datadir, f"{archive_name}.tgz") + json_path = Path(datadir, f"{archive_name}.json") + repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) + + LoaderChecker( + loader=HgLoader(swh_storage, repo_url), expected=ExpectedSwhids.load(json_path), + ).check() + + +# This test has been adapted from the historical `HgBundle20Loader` tests +# to ensure compatibility of `HgLoader`. +# Hashes have been produced by copy-pasting the result of the implementation +# to prevent regressions.
def test_loader_hg_new_visit_no_release(swh_storage, datadir, tmp_path): """Eventful visit should yield 1 snapshot""" archive_name = "the-sandbox" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - loader = HgBundle20Loader(swh_storage, repo_url) + loader = HgLoader(swh_storage, url=repo_url) assert loader.load() == {"status": "eventful"} - tip_revision_develop = "a9c4534552df370f43f0ef97146f393ef2f2a08c" - tip_revision_default = "70e750bb046101fdced06f428e73fee471509c56" + tips = { + b"branch-tip/default": "70e750bb046101fdced06f428e73fee471509c56", + b"branch-tip/develop": "a9c4534552df370f43f0ef97146f393ef2f2a08c", + } + closed = { + b"feature/fun_time": "4d640e8064fe69b4c851dfd43915c431e80c7497", + b"feature/green2_loader": "94be9abcf9558213ff301af0ecd8223451ce991d", + b"feature/greenloader": "9f82d95bd3edfb7f18b1a21d6171170395ea44ce", + b"feature/my_test": "dafa445964230e808148db043c126063ea1dc9b6", + b"feature/read2_loader": "9e912851eb64e3a1e08fbb587de7a4c897ce5a0a", + b"feature/readloader": "ddecbc16f4c916c39eacfcb2302e15a9e70a231e", + b"feature/red": "cb36b894129ca7910bb81c457c72d69d5ff111bc", + b"feature/split5_loader": "3ed4b85d30401fe32ae3b1d650f215a588293a9e", + b"feature/split_causing": "c346f6ff7f42f2a8ff867f92ab83a6721057d86c", + b"feature/split_loader": "5f4eba626c3f826820c4475d2d81410759ec911b", + b"feature/split_loader5": "5017ce0b285351da09a2029ea2cf544f79b593c7", + b"feature/split_loading": "4e2dc6d6073f0b6d348f84ded52f9143b10344b9", + b"feature/split_redload": "2d4a801c9a9645fcd3a9f4c06418d8393206b1f3", + b"feature/splitloading": "88b80615ed8561be74a700b92883ec0374ddacb0", + b"feature/test": "61d762d65afb3150e2653d6735068241779c1fcf", + b"feature/test_branch": "be44d5e6cc66580f59c108f8bff5911ee91a22e4", + b"feature/test_branching": "d2164061453ecb03d4347a05a77db83f706b8e15", + b"feature/test_dog": "2973e5dc9568ac491b198f6b7f10c44ddc04e0a3", + } + + mapping = {b"branch-closed-heads/%s/0" % b: n for b, n in closed.items()} + mapping.update(tips) + + expected_branches = { + k: SnapshotBranch(target=hash_to_bytes(v), target_type=TargetType.REVISION) + for k, v in mapping.items() + } + expected_branches[b"HEAD"] = SnapshotBranch( + target=b"branch-tip/default", target_type=TargetType.ALIAS + ) + expected_snapshot = Snapshot( - id=hash_to_bytes("3b8fe58e467deb7597b12a5fd3b2c096b8c02028"), - branches={ - b"develop": SnapshotBranch( - target=hash_to_bytes(tip_revision_develop), - target_type=TargetType.REVISION, - ), - b"default": SnapshotBranch( - target=hash_to_bytes(tip_revision_default), - target_type=TargetType.REVISION, - ), - b"HEAD": SnapshotBranch(target=b"develop", target_type=TargetType.ALIAS,), - }, + id=hash_to_bytes("cbc609dcdced34dbd9938fe81b555170f1abc96f"), + branches=expected_branches, ) assert_last_visit_matches( - swh_storage, repo_url, status="full", type="hg", snapshot=expected_snapshot.id, + loader.storage, + repo_url, + status="full", + type="hg", + snapshot=expected_snapshot.id, ) - check_snapshot(expected_snapshot, swh_storage) + check_snapshot(expected_snapshot, loader.storage) - stats = get_stats(swh_storage) - assert stats == { + stats = get_stats(loader.storage) + expected_stats = { "content": 2, "directory": 3, "origin": 1, @@ -73,44 +172,42 @@ "skipped_content": 0, "snapshot": 1, } + assert stats == expected_stats + loader2 = HgLoader(swh_storage, url=repo_url) - # Ensure archive loader yields the same snapshot - loader2 = HgArchiveBundle20Loader( - 
swh_storage, url=archive_path, archive_path=archive_path, visit_date=VISIT_DATE, - ) - - actual_load_status = loader2.load() - assert actual_load_status == {"status": "eventful"} + assert loader2.load() == {"status": "uneventful"} # nothing new happened stats2 = get_stats(loader2.storage) - expected_stats = copy.deepcopy(stats) - expected_stats["origin"] += 1 - expected_stats["origin_visit"] += 1 - assert stats2 == expected_stats - - # That visit yields the same snapshot + expected_stats2 = expected_stats.copy() + expected_stats2["origin_visit"] = 2 # one new visit recorded + assert stats2 == expected_stats2 assert_last_visit_matches( loader2.storage, - archive_path, + repo_url, status="full", type="hg", snapshot=expected_snapshot.id, - ) + ) # but we got a snapshot nonetheless +# This test has been adapted from the historical `HgBundle20Loader` tests +# to ensure compatibility of `HgLoader`. +# Hashes have been produced by copy-pasting the result of the implementation +# to prevent regressions. def test_loader_hg_new_visit_with_release(swh_storage, datadir, tmp_path): """Eventful visit with release should yield 1 snapshot""" + archive_name = "hello" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - loader = HgBundle20Loader(swh_storage, url=repo_url, visit_date=VISIT_DATE,) + loader = HgLoader(swh_storage, url=repo_url, visit_date=VISIT_DATE,) actual_load_status = loader.load() assert actual_load_status == {"status": "eventful"} # then - stats = get_stats(swh_storage) + stats = get_stats(loader.storage) assert stats == { "content": 3, "directory": 3, @@ -124,74 +221,238 @@ # cf. test_loader.org for explaining from where those hashes tip_release = hash_to_bytes("515c4d72e089404356d0f4b39d60f948b8999140") - release = swh_storage.release_get([tip_release])[0] + release = loader.storage.release_get([tip_release])[0] assert release is not None tip_revision_default = hash_to_bytes("c3dbe4fbeaaa98dd961834e4007edb3efb0e2a27") - revision = swh_storage.revision_get([tip_revision_default])[0] + revision = loader.storage.revision_get([tip_revision_default])[0] assert revision is not None expected_snapshot = Snapshot( - id=hash_to_bytes("d35668e02e2ba4321dc951cd308cf883786f918a"), + id=hash_to_bytes("7ef082aa8b53136b1bed97f734504be32679bbec"), branches={ - b"default": SnapshotBranch( + b"branch-tip/default": SnapshotBranch( target=tip_revision_default, target_type=TargetType.REVISION, ), - b"0.1": SnapshotBranch(target=tip_release, target_type=TargetType.RELEASE,), - b"HEAD": SnapshotBranch(target=b"default", target_type=TargetType.ALIAS,), + b"tags/0.1": SnapshotBranch( + target=tip_release, target_type=TargetType.RELEASE, + ), + b"HEAD": SnapshotBranch( + target=b"branch-tip/default", target_type=TargetType.ALIAS, + ), }, ) - check_snapshot(expected_snapshot, swh_storage) + check_snapshot(expected_snapshot, loader.storage) assert_last_visit_matches( - swh_storage, + loader.storage, repo_url, type=RevisionType.MERCURIAL.value, status="full", snapshot=expected_snapshot.id, ) - # Ensure archive loader yields the same snapshot - loader2 = HgArchiveBundle20Loader( - swh_storage, url=archive_path, archive_path=archive_path, visit_date=VISIT_DATE, - ) - actual_load_status = loader2.load() - assert actual_load_status == {"status": "eventful"} +# This test has been adapted from the historical `HgBundle20Loader` tests +# to ensure compatibility of `HgLoader`.
+# Hashes have been produced by copy-pasting the result of the implementation +# to prevent regressions. +def test_visit_repository_with_transplant_operations(swh_storage, datadir, tmp_path): + """Visiting a mercurial repository with transplant operations within should yield a + snapshot as well. - stats2 = get_stats(loader2.storage) - expected_stats = copy.deepcopy(stats) - expected_stats["origin"] += 1 - expected_stats["origin_visit"] += 1 - assert stats2 == expected_stats + """ - # That visit yields the same snapshot + archive_name = "transplant" + archive_path = os.path.join(datadir, f"{archive_name}.tgz") + repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) + + loader = HgLoader(swh_storage, url=repo_url, visit_date=VISIT_DATE,) + + # load hg repository + actual_load_status = loader.load() + assert actual_load_status == {"status": "eventful"} + + # collect swh revisions assert_last_visit_matches( - loader2.storage, - archive_path, - status="full", - type="hg", - snapshot=expected_snapshot.id, + loader.storage, repo_url, type=RevisionType.MERCURIAL.value, status="full" ) + revisions = [] + snapshot = snapshot_get_latest(loader.storage, repo_url) + for branch in snapshot.branches.values(): + if branch.target_type.value != "revision": + continue + revisions.append(branch.target) + + # extract original changesets info and the transplant sources + hg_changesets = set() + transplant_sources = set() + for rev in loader.storage.revision_log(revisions): + extids = list( + loader.storage.extid_get_from_target(ObjectType.REVISION, [rev["id"]]) + ) + assert len(extids) == 1 + hg_changesets.add(hash_to_hex(extids[0].extid)) + for k, v in rev["extra_headers"]: + if k == b"transplant_source": + transplant_sources.add(v.decode("ascii")) + + # check extracted data are valid + assert len(hg_changesets) > 0 + assert len(transplant_sources) > 0 + assert transplant_sources <= hg_changesets -def test_visit_with_archive_decompression_failure(swh_storage, mocker, datadir): - """Failure to decompress should fail early, no data is ingested""" - mock_patoo = mocker.patch("swh.loader.mercurial.archive_extract.patoolib") - mock_patoo.side_effect = ValueError +def _partial_copy_storage( + old_storage, origin_url: str, mechanism: str, copy_revisions: bool +): + """Create a new storage, and only copy ExtIDs or head revisions to it.""" + new_storage = get_storage(cls="memory") + snapshot = snapshot_get_latest(old_storage, origin_url) + assert snapshot + heads = [branch.target for branch in snapshot.branches.values()] + + if mechanism == "extid": + extids = old_storage.extid_get_from_target(ObjectType.REVISION, heads) + new_storage.extid_add(extids) + if copy_revisions: + # copy revisions, but erase their metadata to make sure the loader doesn't + # fallback to revision.metadata["nodeid"] + revisions = [ + attr.evolve(rev, metadata={}) + for rev in old_storage.revision_get(heads) + if rev + ] + new_storage.revision_add(revisions) + + else: + assert mechanism == "same storage" + return old_storage + + # copy origin, visit, status + new_storage.origin_add(old_storage.origin_get([origin_url])) + visit = old_storage.origin_visit_get_latest(origin_url) + new_storage.origin_visit_add([visit]) + statuses = old_storage.origin_visit_status_get(origin_url, visit.visit).results + new_storage.origin_visit_status_add(statuses) + new_storage.snapshot_add([snapshot]) + + return new_storage + + +def test_load_unchanged_repo_should_be_uneventful( + swh_storage, datadir, tmp_path, +): + """Checks the loader
can find which revisions it already loaded, using ExtIDs.""" archive_name = "hello" archive_path = os.path.join(datadir, f"{archive_name}.tgz") + repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) + repo_path = repo_url.replace("file://", "") - loader = HgArchiveBundle20Loader( - swh_storage, url=archive_path, visit_date=VISIT_DATE, + loader = HgLoader(swh_storage, repo_path) + + assert loader.load() == {"status": "eventful"} + assert get_stats(loader.storage) == { + "content": 3, + "directory": 3, + "origin": 1, + "origin_visit": 1, + "release": 1, + "revision": 3, + "skipped_content": 0, + "snapshot": 1, + } + visit_status = assert_last_visit_matches( + loader.storage, repo_path, type=RevisionType.MERCURIAL.value, status="full", ) + assert visit_status.snapshot is not None - actual_load_status = loader.load() - assert actual_load_status == {"status": "failed"} + # Create a new loader (to start with a clean slate, eg. remove the caches), + # with the new, partial, storage + loader2 = HgLoader(swh_storage, repo_path) + assert loader2.load() == {"status": "uneventful"} - stats = get_stats(swh_storage) - assert stats == { + # Should have all the objects + assert get_stats(loader.storage) == { + "content": 3, + "directory": 3, + "origin": 1, + "origin_visit": 2, + "release": 1, + "revision": 3, + "skipped_content": 0, + "snapshot": 1, + } + visit_status2 = assert_last_visit_matches( + loader2.storage, repo_path, type=RevisionType.MERCURIAL.value, status="full", + ) + assert visit_status2.snapshot == visit_status.snapshot + + +def test_closed_branch_incremental(swh_storage, datadir, tmp_path): + """Test that a repository with a closed branch does not trip an incremental load""" + archive_name = "example" + archive_path = os.path.join(datadir, f"{archive_name}.tgz") + repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) + repo_path = repo_url.replace("file://", "") + + loader = HgLoader(swh_storage, repo_path) + + # Test 3 loads: full, and two incremental. + assert loader.load() == {"status": "eventful"} + expected_stats = { + "content": 7, + "directory": 16, + "origin": 1, + "origin_visit": 1, + "release": 0, + "revision": 9, + "skipped_content": 0, + "snapshot": 1, + } + assert get_stats(loader.storage) == expected_stats + assert loader.load() == {"status": "uneventful"} + assert get_stats(loader.storage) == {**expected_stats, "origin_visit": 1 + 1} + assert loader.load() == {"status": "uneventful"} + assert get_stats(loader.storage) == {**expected_stats, "origin_visit": 2 + 1} + + +def test_load_unchanged_repo__dangling_extid(swh_storage, datadir, tmp_path): + """Checks the loader will load revisions targeted by an ExtID if the + revisions are missing from the storage""" + archive_name = "hello" + archive_path = os.path.join(datadir, f"{archive_name}.tgz") + repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) + repo_path = repo_url.replace("file://", "") + + loader = HgLoader(swh_storage, repo_path) + + assert loader.load() == {"status": "eventful"} + assert get_stats(loader.storage) == { + "content": 3, + "directory": 3, + "origin": 1, + "origin_visit": 1, + "release": 1, + "revision": 3, + "skipped_content": 0, + "snapshot": 1, + } + + old_storage = swh_storage + + # Create a new storage, and only copy ExtIDs or head revisions to it. 
+ # This should be enough for the loader to know revisions were already loaded + new_storage = _partial_copy_storage( + old_storage, repo_path, mechanism="extid", copy_revisions=False + ) + + # Create a new loader (to start with a clean slate, eg. remove the caches), + # with the new, partial, storage + loader = HgLoader(new_storage, repo_path) + + assert get_stats(loader.storage) == { "content": 0, "directory": 0, "origin": 1, @@ -199,172 +460,236 @@ "release": 0, "revision": 0, "skipped_content": 0, - "snapshot": 0, + "snapshot": 1, } - # That visit yields the same snapshot - assert_last_visit_matches( - swh_storage, archive_path, status="failed", type="hg", snapshot=None - ) + assert loader.load() == {"status": "eventful"} -def test_visit_error_with_snapshot_partial(swh_storage, datadir, tmp_path, mocker): - """Incomplete ingestion leads to a 'partial' ingestion status""" - mock = mocker.patch("swh.loader.mercurial.loader.HgBundle20Loader.store_metadata") - mock.side_effect = ValueError + assert get_stats(loader.storage) == { + "content": 3, + "directory": 3, + "origin": 1, + "origin_visit": 2, + "release": 1, + "revision": 3, + "skipped_content": 0, + "snapshot": 1, + } - archive_name = "the-sandbox" + +def test_missing_filelog_should_not_crash(swh_storage, datadir, tmp_path): + archive_name = "missing-filelog" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) + directory = repo_url.replace("file://", "") - loader = HgBundle20Loader(swh_storage, repo_url) - - assert loader.load() == {"status": "failed"} - - assert_last_visit_matches( - swh_storage, - repo_url, - status="partial", - type="hg", - snapshot=hash_to_bytes("3b8fe58e467deb7597b12a5fd3b2c096b8c02028"), + loader = HgLoader( + storage=swh_storage, + url=repo_url, + directory=directory, # specify directory to avoid clone + visit_date=VISIT_DATE, ) + actual_load_status = loader.load() + assert actual_load_status == {"status": "eventful"} -@pytest.mark.parametrize( - "error_msg", - [ - b"does not appear to be an HG repository", - b"404: Not Found", - b"404: NOT FOUND", - b"Name or service not known", - ], -) -def test_visit_error_with_status_not_found( - swh_storage, datadir, tmp_path, mocker, error_msg -): - """Not reaching the repo leads to a 'not_found' ingestion status""" - mock = mocker.patch("hglib.clone") - mock.side_effect = CommandError((), 255, b"", error_msg) + assert_last_visit_matches(swh_storage, repo_url, status="partial", type="hg") - archive_name = "the-sandbox" + +def test_multiple_open_heads(swh_storage, datadir, tmp_path): + archive_name = "multiple-heads" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - loader = HgBundle20Loader(swh_storage, repo_url) + loader = HgLoader(storage=swh_storage, url=repo_url,) - assert loader.load() == {"status": "uneventful"} + actual_load_status = loader.load() + assert actual_load_status == {"status": "eventful"} - assert_last_visit_matches( - swh_storage, repo_url, status="not_found", type="hg", snapshot=None, - ) + assert_last_visit_matches(swh_storage, repo_url, status="full", type="hg") + snapshot = snapshot_get_latest(swh_storage, repo_url) + expected_branches = [ + b"HEAD", + b"branch-heads/default/0", + b"branch-heads/default/1", + b"branch-tip/default", + ] + assert sorted(snapshot.branches.keys()) == expected_branches -def test_visit_error_with_clone_error(swh_storage, datadir, tmp_path, 
mocker): - """Testing failures other than 'not_found'""" + # Check that we don't load anything the second time + loader = HgLoader(storage=swh_storage, url=repo_url,) - mock = mocker.patch("hglib.clone") - mock.side_effect = CommandError((), 255, b"", b"out of disk space") + actual_load_status = loader.load() - archive_name = "the-sandbox" - archive_path = os.path.join(datadir, f"{archive_name}.tgz") + assert actual_load_status == {"status": "uneventful"} + + +def hg_strip(repo: str, revset: str) -> None: + """Removes `revset` and all of their descendants from the local repository.""" + # Previously called `hg strip`, it was renamed to `hg debugstrip` in Mercurial 5.7 + # because it's most likely not what most users want to do (they should use some kind + # of history-rewriting tool like `histedit` or `prune`). + # But here, it's exactly what we want to do. + subprocess.check_call(["hg", "debugstrip", revset], cwd=repo) + + +def test_load_repo_with_new_commits(swh_storage, datadir, tmp_path): + archive_name = "hello" + archive_path = Path(datadir, f"{archive_name}.tgz") + json_path = Path(datadir, f"{archive_name}.json") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - loader = HgBundle20Loader(swh_storage, repo_url) + # first load with missing commits + hg_strip(repo_url.replace("file://", ""), "tip") + loader = HgLoader(swh_storage, repo_url) + assert loader.load() == {"status": "eventful"} + assert get_stats(loader.storage) == { + "content": 2, + "directory": 2, + "origin": 1, + "origin_visit": 1, + "release": 0, + "revision": 2, + "skipped_content": 0, + "snapshot": 1, + } - assert loader.load() == {"status": "failed"} + # second load with all commits + repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) + loader = HgLoader(swh_storage, repo_url) + checker = LoaderChecker(loader=loader, expected=ExpectedSwhids.load(json_path),) - assert_last_visit_matches( - swh_storage, repo_url, status="failed", type="hg", snapshot=None, - ) + checker.check() + assert get_stats(loader.storage) == { + "content": 3, + "directory": 3, + "origin": 1, + "origin_visit": 2, + "release": 1, + "revision": 3, + "skipped_content": 0, + "snapshot": 2, + } -def test_visit_repository_with_transplant_operations(swh_storage, datadir, tmp_path): - """Visit a mercurial repository visit transplant operations within should yield a - snapshot as well. - """ +def test_load_repo_check_extids_write_version(swh_storage, datadir, tmp_path): + """ExtIDs should be stored with a given version when loading is done""" + archive_name = "hello" + archive_path = Path(datadir, f"{archive_name}.tgz") + repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - archive_name = "transplant" + hg_strip(repo_url.replace("file://", ""), "tip") + loader = HgLoader(swh_storage, repo_url) + assert loader.load() == {"status": "eventful"} + + # Ensure we write ExtIDs to a specific version. 
+ snapshot = snapshot_get_latest(swh_storage, repo_url) + + # First, filter out revisions from that snapshot + revision_ids = [ + branch.target + for branch in snapshot.branches.values() + if branch.target_type == TargetType.REVISION + ] + + assert len(revision_ids) > 0 + + # Those revisions should have their associated ExtID version set to EXTID_VERSION + extids = swh_storage.extid_get_from_target(ObjectType.REVISION, revision_ids) + + assert len(extids) == len(revision_ids) + for extid in extids: + assert extid.extid_version == EXTID_VERSION + + +def test_load_new_extid_should_be_eventful(swh_storage, datadir, tmp_path): + """Changing the extid version should make loaders ignore existing extids, + and load the repo again.""" + archive_name = "hello" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - loader = HgBundle20Loader(swh_storage, url=repo_url, visit_date=VISIT_DATE,) + repo_path = repo_url.replace("file://", "") - # load hg repository - actual_load_status = loader.load() - assert actual_load_status == {"status": "eventful"} + with unittest.mock.patch("swh.loader.mercurial.loader.EXTID_VERSION", 0): + loader = HgLoader(swh_storage, repo_path) + assert loader.load() == {"status": "eventful"} - # collect swh revisions - assert_last_visit_matches( - swh_storage, repo_url, type=RevisionType.MERCURIAL.value, status="full" - ) - - revisions = [] - snapshot = snapshot_get_latest(swh_storage, repo_url) - for branch in snapshot.branches.values(): - if branch.target_type.value != "revision": - continue - revisions.append(branch.target) + loader = HgLoader(swh_storage, repo_path) + assert loader.load() == {"status": "eventful"} - # extract original changesets info and the transplant sources - hg_changesets = set() - transplant_sources = set() - for rev in swh_storage.revision_log(revisions): - extids = list( - loader.storage.extid_get_from_target(ObjectType.REVISION, [rev["id"]]) - ) - assert len(extids) == 1 - hg_changesets.add(hash_to_hex(extids[0].extid)) - for k, v in rev["extra_headers"]: - if k == b"transplant_source": - transplant_sources.add(v.decode("ascii")) + loader = HgLoader(swh_storage, repo_path) + assert loader.load() == {"status": "uneventful"} - # check extracted data are valid - assert len(hg_changesets) > 0 - assert len(transplant_sources) > 0 - assert transplant_sources.issubset(hg_changesets) + with unittest.mock.patch("swh.loader.mercurial.loader.EXTID_VERSION", 10000): + loader = HgLoader(swh_storage, repo_path) + assert loader.load() == {"status": "eventful"} + loader = HgLoader(swh_storage, repo_path) + assert loader.load() == {"status": "uneventful"} -def test_clone_with_timeout_timeout(caplog, tmp_path, monkeypatch): - log = logging.getLogger("test_clone_with_timeout") - def clone_timeout(source, dest, *args, **kwargs): - time.sleep(60) +def test_loader_hg_extid_filtering(swh_storage, datadir, tmp_path): + """The first visit of a fork should filter already seen revisions (through extids) - monkeypatch.setattr(hglib, "clone", clone_timeout) + """ + archive_name = "the-sandbox" + archive_path = os.path.join(datadir, f"{archive_name}.tgz") + repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - with pytest.raises(CloneTimeoutError): - HgBundle20Loader.clone_with_timeout( - log, "https://www.mercurial-scm.org/repo/hello", tmp_path, 1 - ) + loader = HgLoader(swh_storage, url=repo_url) - for record in caplog.records: - assert record.levelname == 
"WARNING" - assert "https://www.mercurial-scm.org/repo/hello" in record.getMessage() - assert record.args == ("https://www.mercurial-scm.org/repo/hello", 1) + assert loader.load() == {"status": "eventful"} + stats = get_stats(loader.storage) + expected_stats = { + "content": 2, + "directory": 3, + "origin": 1, + "origin_visit": 1, + "release": 0, + "revision": 58, + "skipped_content": 0, + "snapshot": 1, + } + assert stats == expected_stats + visit_status = assert_last_visit_matches( + loader.storage, repo_url, status="full", type="hg", + ) -def test_clone_with_timeout_returns(caplog, tmp_path, monkeypatch): - log = logging.getLogger("test_clone_with_timeout") + # Make a fork of the first repository we ingested + fork_url = prepare_repository_from_archive( + archive_path, "the-sandbox-reloaded", tmp_path + ) + loader2 = HgLoader( + swh_storage, url=fork_url, directory=str(tmp_path / archive_name) + ) - def clone_return(source, dest, *args, **kwargs): - return (source, dest) + assert loader2.load() == {"status": "uneventful"} - monkeypatch.setattr(hglib, "clone", clone_return) + stats = get_stats(loader.storage) + expected_stats2 = expected_stats.copy() + expected_stats2.update( + {"origin": 1 + 1, "origin_visit": 1 + 1,} + ) + assert stats == expected_stats2 - assert HgBundle20Loader.clone_with_timeout( - log, "https://www.mercurial-scm.org/repo/hello", tmp_path, 1 - ) == ("https://www.mercurial-scm.org/repo/hello", tmp_path) + visit_status2 = assert_last_visit_matches( + loader.storage, fork_url, status="full", type="hg", + ) + assert visit_status.snapshot is not None + assert visit_status2.snapshot == visit_status.snapshot -def test_clone_with_timeout_exception(caplog, tmp_path, monkeypatch): - log = logging.getLogger("test_clone_with_timeout") +def test_loader_repository_with_bookmark_information(swh_storage, datadir, tmp_path): + """Repository with bookmark information should be ingested correctly - def clone_return(source, dest, *args, **kwargs): - raise ValueError("Test exception") + """ + archive_name = "anomad-d" + archive_path = os.path.join(datadir, f"{archive_name}.tgz") + repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - monkeypatch.setattr(hglib, "clone", clone_return) + loader = HgLoader(swh_storage, url=repo_url) - with pytest.raises(ValueError) as excinfo: - HgBundle20Loader.clone_with_timeout( - log, "https://www.mercurial-scm.org/repo/hello", tmp_path, 1 - ) - assert "Test exception" in excinfo.value.args[0] + assert loader.load() == {"status": "eventful"} diff --git a/swh/loader/mercurial/tests/test_tasks.py b/swh/loader/mercurial/tests/test_tasks.py --- a/swh/loader/mercurial/tests/test_tasks.py +++ b/swh/loader/mercurial/tests/test_tasks.py @@ -1,4 +1,4 @@ -# Copyright (C) 2018-2020 The Software Heritage developers +# Copyright (C) 2018-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -7,7 +7,7 @@ def test_loader( mocker, swh_config, swh_scheduler_celery_app, swh_scheduler_celery_worker ): - mock_loader = mocker.patch("swh.loader.mercurial.from_disk.HgLoaderFromDisk.load") + mock_loader = mocker.patch("swh.loader.mercurial.loader.HgLoader.load") mock_loader.return_value = {"status": "eventful"} res = swh_scheduler_celery_app.send_task( @@ -26,9 +26,7 @@ def test_archive_loader( mocker, swh_config, swh_scheduler_celery_app, swh_scheduler_celery_worker ): - 
mock_loader = mocker.patch( - "swh.loader.mercurial.from_disk.HgArchiveLoaderFromDisk.load" - ) + mock_loader = mocker.patch("swh.loader.mercurial.loader.HgArchiveLoader.load") mock_loader.return_value = {"status": "uneventful"} res = swh_scheduler_celery_app.send_task( diff --git a/swh/loader/mercurial/tests/test_tasks_from_disk.py b/swh/loader/mercurial/tests/test_tasks_from_disk.py deleted file mode 100644 --- a/swh/loader/mercurial/tests/test_tasks_from_disk.py +++ /dev/null @@ -1,47 +0,0 @@ -# Copyright (C) 2018-2020 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - - -def test_loader( - mocker, swh_config, swh_scheduler_celery_app, swh_scheduler_celery_worker -): - mock_loader = mocker.patch("swh.loader.mercurial.from_disk.HgLoaderFromDisk.load") - mock_loader.return_value = {"status": "eventful"} - - res = swh_scheduler_celery_app.send_task( - "swh.loader.mercurial.tasks_from_disk.LoadMercurialFromDisk", - kwargs={"url": "origin_url", "directory": "/some/repo", "visit_date": "now",}, - ) - - assert res - res.wait() - assert res.successful() - - assert res.result == {"status": "eventful"} - mock_loader.assert_called_once_with() - - -def test_archive_loader( - mocker, swh_config, swh_scheduler_celery_app, swh_scheduler_celery_worker -): - mock_loader = mocker.patch( - "swh.loader.mercurial.from_disk.HgArchiveLoaderFromDisk.load" - ) - mock_loader.return_value = {"status": "uneventful"} - - res = swh_scheduler_celery_app.send_task( - "swh.loader.mercurial.tasks_from_disk.LoadArchiveMercurialFromDisk", - kwargs={ - "url": "another_url", - "archive_path": "/some/tar.tgz", - "visit_date": "now", - }, - ) - assert res - res.wait() - assert res.successful() - - assert res.result == {"status": "uneventful"} - mock_loader.assert_called_once_with()
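
The rewritten tests above drive `HgLoader` exclusively through pytest fixtures. For readers who want to reproduce a load outside the test harness, here is a minimal sketch built only from calls that appear in this diff; the repository path is hypothetical and the in-memory storage stands in for the `swh_storage` fixture used by the tests.

```python
# Minimal sketch (not part of the test suite): load a local Mercurial clone
# with the new HgLoader entry point and inspect the resulting snapshot.
# "/tmp/hello-repo" is a hypothetical path to an existing hg repository.
from swh.loader.mercurial.loader import HgLoader
from swh.storage import get_storage
from swh.storage.algos.snapshot import snapshot_get_latest

storage = get_storage(cls="memory")
repo_url = "file:///tmp/hello-repo"  # hypothetical local clone

loader = HgLoader(storage, url=repo_url)
result = loader.load()  # e.g. {"status": "eventful"} on a first visit
print(result)

snapshot = snapshot_get_latest(storage, repo_url)
if snapshot is not None:
    # Branch names follow the scheme asserted in the tests above:
    # b"branch-tip/<name>", b"tags/<tag>", b"branch-closed-heads/<name>/<n>"
    print(sorted(snapshot.branches))
```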
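
Several of the new tests (`test_load_unchanged_repo_should_be_uneventful`, `test_load_unchanged_repo__dangling_extid`, `test_load_new_extid_should_be_eventful`) rely on ExtIDs to decide whether a revision was already loaded. Continuing the sketch above, this is roughly how those ExtIDs can be inspected; nothing here is specific to a real archive.

```python
# Sketch of the ExtID lookup the incremental-load tests rely on: each loaded
# revision gets an ExtID mapping the Mercurial changeset id to the SWH
# revision, tagged with EXTID_VERSION. `storage` and `snapshot` come from the
# previous sketch.
from swh.loader.mercurial.loader import EXTID_VERSION
from swh.model.hashutil import hash_to_hex
from swh.model.identifiers import ObjectType
from swh.model.model import TargetType

assert snapshot is not None  # continuing from the previous sketch

revision_ids = [
    branch.target
    for branch in snapshot.branches.values()
    if branch.target_type == TargetType.REVISION
]
extids = storage.extid_get_from_target(ObjectType.REVISION, revision_ids)
for extid in extids:
    assert extid.extid_version == EXTID_VERSION
    print(hash_to_hex(extid.extid))  # the original hg changeset id
```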