diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -54,6 +54,8 @@
     entry_points="""
         [swh.workers]
         loader.mercurial=swh.loader.mercurial:register
+        [console_scripts]
+        swh-hg-identify=swh.loader.mercurial.identify:main
     """,
     classifiers=[
         "Programming Language :: Python :: 3",
diff --git a/swh/loader/mercurial/identify.py b/swh/loader/mercurial/identify.py
new file mode 100644
--- /dev/null
+++ b/swh/loader/mercurial/identify.py
@@ -0,0 +1,532 @@
+# Copyright (C) 2020  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import json
+import re
+import subprocess
+from codecs import escape_decode  # type: ignore
+from pathlib import Path
+from typing import Any, Dict, Iterator, List, NamedTuple, Optional, Union
+
+# WARNING: do not import unnecessary things here to keep cli startup time under
+# control
+import click
+
+from swh.model.hashutil import hash_to_bytehex
+from swh.model.identifiers import normalize_timestamp, swhid
+from swh.model.model import RevisionType
+
+TAG_PATTERN = re.compile(b"([0-9A-Fa-f]{40}) +(.+)")
+
+
+class HgAuthor(NamedTuple):
+    """Represent a Mercurial revision author."""
+
+    fullname: bytes
+    """full name of the author"""
+
+    name: Optional[bytes]
+    """name of the author"""
+
+    email: Optional[bytes]
+    """email of the author"""
+
+    @staticmethod
+    def from_bytes(data: bytes) -> "HgAuthor":
+        """Convert bytes to an HgAuthor named tuple.
+
+        Expected format: "name <email>"
+        """
+        from swh.loader.mercurial.converters import parse_author
+
+        result = parse_author(data)
+        return HgAuthor(
+            fullname=result["fullname"], name=result["name"], email=result["email"]
+        )
+
+    def to_dict(self) -> Dict[str, Optional[bytes]]:
+        return {"fullname": self.fullname, "name": self.name, "email": self.email}
+
+
+HG_REVISION_TEMPLATE = "\n".join(
+    [
+        "node_id:{node}",
+        "author:{author}",
+        "timestamp_offset:{date|json}",
+        "p1:{p1.node}",
+        "p2:{p2.node}",
+        "extras:{join(extras, '\nextras:')}",
+    ]
+)  # Log template for HgRevision.from_bytes
+
+NULL_NODE_ID = b"0" * 40  # Value used when no parent
+
+
+class HgRevision(NamedTuple):
+    """Represent a Mercurial revision."""
+
+    node_id: bytes
+    """raw bytes of the revision hash"""
+
+    author: HgAuthor
+    """author of the revision"""
+
+    timestamp: bytes
+    """timestamp of the revision"""
+
+    offset: bytes
+    """offset of the revision"""
+
+    parents: List[bytes]
+    """hex bytes of the revision's parents"""
+
+    extras: Dict[bytes, bytes]
+    """metadata of the revision"""
+
+    description: bytes
+    """description of the revision"""
+
+    @staticmethod
+    def from_bytes(data: bytes, description: bytes) -> "HgRevision":
+        """Convert bytes to an HgRevision named tuple.
+
+        Expected data format:
+        '''
+        node_id:{node}
+        author:{author}
+        timestamp_offset:[{timestamp}, {offset}]
+        p1:{p1}
+        p2:{p2}
+        extras:{key1}={value1}
+        ...
+        extras:{keyn}={value}
+        '''
+
+        """
+        lines = data.split(b"\n")
+        tuples = [line.split(b":", 1) for line in lines]
+        fields: Dict[str, Any] = {
+            "parents": [],
+            "extras": {},
+            "description": description,
+        }
+        for key, value in tuples:
+            if key == b"timestamp_offset":
+                timestamp, offset = json.loads(value)
+                fields["timestamp"] = timestamp
+                fields["offset"] = offset
+            elif key in (b"p1", b"p2"):
+                if value != NULL_NODE_ID:
+                    fields["parents"].append(value)
+            elif key == b"extras":
+                extra_key, extra_value = value.split(b"=", 1)
+                fields["extras"][extra_key] = extra_value
+            elif key == b"author":
+                fields["author"] = HgAuthor.from_bytes(value)
+            else:
+                fields[key.decode()] = value
+
+        return HgRevision(**fields)
+
+    def branch(self) -> bytes:
+        return self.extras.get(b"branch", b"default")
+
+    def to_dict(self) -> Dict:
+        """Convert a HgRevision to a dict for SWHID computation"""
+        date = normalize_timestamp(int(self.timestamp))
+
+        extra_headers = [
+            (b"time_offset_seconds", str(self.offset).encode("utf-8")),
+        ]
+
+        for key, value in self.extras.items():
+            if key == b"branch" and value == b"default":
+                # branch default is skipped to match actual implementation
+                continue
+            if key == b"transplant_source":
+                # transplant_source is converted to hex to match actual implementation
+                value = hash_to_bytehex(escape_decode(value)[0])
+            extra_headers.append((key, value))
+
+        author = self.author.to_dict()
+
+        return {
+            "author": author,
+            "date": date,
+            "committer": author,
+            "committer_date": date,
+            "type": RevisionType.MERCURIAL.value,
+            "message": self.description,
+            "metadata": {"node": self.node_id},
+            "extra_headers": tuple(extra_headers),
+            "synthetic": False,
+            "parents": self.parents,
+        }
+
+
+class HgBranch(NamedTuple):
+    """Represent a Mercurial branch."""
+
+    name: bytes
+    """name of the branch"""
+
+    node_id: bytes
+    """row bytes of the target revision hash"""
+
+
+class HgTag(NamedTuple):
+    """Represent a Mercurial tag."""
+
+    name: bytes
+    """name of the tag"""
+
+    node_id: bytes
+    """hex bytes of the target revision"""
+
+
+class Hg:
+    """Provide methods to extract data from a Mercurial repository."""
+
+    def __init__(self, repository_root: Path) -> None:
+        self._root = repository_root
+
+    def _output(self, *args) -> bytes:
+        """Return the outpout of a `hg` call."""
+        return subprocess.check_output(["hg", *args], cwd=self._root)
+
+    def _call(self, *args) -> None:
+        """Perform a `hg` call."""
+        subprocess.check_call(
+            ["hg", *args],
+            cwd=self._root,
+            stderr=subprocess.PIPE,
+            stdout=subprocess.PIPE,
+        )
+
+    def root(self) -> Path:
+        """Return the root of the Mercurial repository."""
+        return self._root
+
+    def log(self, rev: Optional[Union[bytes, str]] = None) -> List[HgRevision]:
+        """Return the specified revisions of the Mercurial repository.
+
+        Mercurial revsets are supported. (See `hg help revsets`)
+
+        If no revision range is specified, return all revisions".
+        """
+        if rev:
+            node_ids = self._output("log", "-r", rev, "-T", "{node}\n").splitlines()
+        else:
+            node_ids = self._output("log", "-T", "{node}\n").splitlines()
+
+        revisions = [self._revision(node_id) for node_id in reversed(node_ids)]
+
+        return revisions
+
+    def _revision(self, revision: bytes) -> HgRevision:
+        data = self._output("log", "-r", revision, "-T", HG_REVISION_TEMPLATE)
+
+        # hg log strips the description so the raw description has to be taken
+        # from debugdata
+        _, desc = self._output("debugdata", "-c", revision).split(b"\n\n", 1)
+
+        return HgRevision.from_bytes(data, desc)
+
+    def up(self, rev: bytes) -> None:
+        """Update the repository working directory to the specified revision."""
+        self._call("up", rev)
+
+    def branches(self) -> List[HgBranch]:
+        """List the repository named branches."""
+        output = self._output("branches", "-T", "{branch}\n{node}\n\n").strip()
+
+        branches = []
+
+        for block in output.split(b"\n\n"):
+            name, node_id = block.splitlines()
+            branches.append(HgBranch(name=name, node_id=node_id))
+
+        return branches
+
+    def tip(self) -> HgRevision:
+        """Return the `tip` revision."""
+        return self.log("tip")[0]
+
+    def tags(self) -> List[HgTag]:
+        """Return the repository tags at the current revision."""
+        hgtags = self._root / ".hgtags"
+
+        tags = {}
+
+        if hgtags.is_file():
+            for line in hgtags.read_bytes().splitlines():
+                match = TAG_PATTERN.match(line)
+                if match is None:
+                    continue
+                node_id, name = match.groups()
+                tags[node_id] = name
+
+        return [HgTag(name=name, node_id=node_id) for node_id, name in tags.items()]
+
+
+@click.group()
+@click.option(
+    "--directory",
+    "-d",
+    help=("Path to the Mercurial repository. If unset, the current directory is used"),
+)
+@click.pass_context
+def main(ctx, directory=None):
+    """Compute the Software Heritage persistent identifier (SWHID) for the given
+       source code object(s).
+
+    For more details about SWHIDs see:
+
+    https://docs.softwareheritage.org/devel/swh-model/persistent-identifiers.html
+    """
+    # ensure that ctx.obj exists and is a dict (in case `cli()` is called
+    # by means other than the `if` block below)
+    ctx.ensure_object(dict)
+
+    root = Path(directory) if directory else Path()
+    if not root.exists():
+        raise IOError(f"{root!r} does not exists")
+
+    ctx.obj["HG_ROOT"] = root
+
+
+def identify_directory(path: Path) -> str:
+    """Return the SWHID of the given path."""
+    output = subprocess.check_output(
+        ["swh-identify", "--exclude", ".hg", "."], cwd=path
+    ).decode()
+    return output.split()[0].split(":")[-1]
+
+
+class RevisionIdentity(NamedTuple):
+    """Represent a swh revision identity."""
+
+    swhid: bytes
+    """SWHID raw bytes"""
+
+    node_id: bytes
+    """node_id hex bytes"""
+
+    directory_swhid: bytes
+
+    def dir_uri(self) -> str:
+        """Return the SWHID uri of the revision's directory."""
+        return f"swh:1:dir:{self.directory_swhid.hex()}\t{self.node_id.decode()}"
+
+    def __str__(self) -> str:
+        """Return the string representation of a RevisionIdentity."""
+        uri = swhid("revision", self.swhid.hex())
+        return f"{uri}\t{self.node_id.decode()}"
+
+
+def identify_revision(
+    hg: Hg,
+    rev: Optional[bytes] = None,
+    node_id_2_swhid: Optional[Dict[bytes, bytes]] = None,
+) -> Iterator[RevisionIdentity]:
+    """Return the repository revision identities.
+
+    hg: A `Hg` repository instance
+    rev: An optional revision or Mercurial revsets (See `hg help revsets`)
+         If not provided all the repository revisions will be computed.
+    node_id_2_swhid: An optional cache mapping hg node ids to SWHIDs
+        It will be updated in place with new mappings.
+    """
+    from swh.model.hashutil import hash_to_bytes
+    from swh.model.model import Revision
+
+    if node_id_2_swhid is None:
+        node_id_2_swhid = {}
+
+    for revision in hg.log(rev):
+        data = revision.to_dict()
+
+        hg.up(revision.node_id)
+        directory_swhid = hash_to_bytes(identify_directory(hg.root()))
+        data["directory"] = directory_swhid
+
+        parents = []
+        for parent in data["parents"]:
+            if parent not in node_id_2_swhid:
+                parent_revision = next(identify_revision(hg, parent, node_id_2_swhid))
+                node_id_2_swhid[parent] = parent_revision.swhid
+            parents.append(node_id_2_swhid[parent])
+        data["parents"] = parents
+
+        revision_swhid = hash_to_bytes(Revision.from_dict(data).id)
+        node_id_2_swhid[revision.node_id] = revision_swhid
+
+        yield RevisionIdentity(
+            swhid=revision_swhid,
+            node_id=revision.node_id,
+            directory_swhid=directory_swhid,
+        )
+
+
+class ReleaseIdentity(NamedTuple):
+    """Represent a swh release identity."""
+
+    swhid: str
+    """SWHID hex string"""
+
+    node_id: bytes
+    """node_id hex bytes"""
+
+    name: bytes
+    """name of the release"""
+
+    def __str__(self) -> str:
+        """Return the string representation of a ReleaseIdentity."""
+        uri = swhid("release", self.swhid)
+        return f"{uri}\t{self.name.decode()}"
+
+
+def identify_release(
+    hg: Hg, node_id_2_swhid: Optional[Dict[bytes, bytes]] = None,
+) -> Iterator[ReleaseIdentity]:
+    """Return the repository's release identities.
+
+    hg: A `Hg` repository instance
+    node_id_2_swhid: An optional cache mapping hg node ids to SWHIDs
+        If not provided it will be computed using `identify_revision`.
+    """
+    from swh.model.model import ObjectType, Release
+
+    if node_id_2_swhid is None:
+        node_id_2_swhid = {
+            revision.node_id: revision.swhid for revision in identify_revision(hg)
+        }
+
+    for tag in hg.tags():
+        data = {
+            "name": tag.name,
+            "target": node_id_2_swhid[tag.node_id],
+            "target_type": ObjectType.REVISION.value,
+            "message": None,
+            "metadata": None,
+            "synthetic": False,
+            "author": {"name": None, "email": None, "fullname": b""},
+            "date": None,
+        }
+
+        release_swhid = Release.from_dict(data).id
+
+        yield ReleaseIdentity(
+            swhid=release_swhid, node_id=tag.node_id, name=tag.name,
+        )
+
+
+def identify_snapshot(
+    hg: Hg,
+    node_id_2_swhid: Optional[Dict[bytes, bytes]] = None,
+    releases: Optional[List[ReleaseIdentity]] = None,
+) -> str:
+    """Return the repository snapshot identity.
+
+    hg: A `Hg` repository instance
+    node_id_2_swhid: An optional cache mapping hg node ids to SWHIDs
+         If not provided it will be computed using `identify_revision`.
+    release: an optional list of `ReleaseIdentity`.
+        If not provided it will be computed using `identify_release`.
+    """
+    from swh.model.model import Snapshot, TargetType
+
+    if node_id_2_swhid is None:
+        node_id_2_swhid = {
+            revision.node_id: revision.swhid for revision in identify_revision(hg)
+        }
+
+    if releases is None:
+        releases = [release for release in identify_release(hg, node_id_2_swhid)]
+
+    branches = {}
+
+    tip = hg.tip()
+    branches[b"HEAD"] = {
+        "target": tip.branch(),
+        "target_type": TargetType.ALIAS.value,
+    }
+
+    for branch in hg.branches():
+        branches[branch.name] = {
+            "target": node_id_2_swhid[branch.node_id],
+            "target_type": TargetType.REVISION.value,
+        }
+
+    for release in releases:
+        branches[release.name] = {
+            "target": release.swhid,
+            "target_type": TargetType.RELEASE.value,
+        }
+
+    return Snapshot.from_dict({"branches": branches}).id
+
+
+@main.command()
+@click.argument("rev", required=False)
+@click.pass_context
+def revision(ctx, rev):
+    """Compute the SWHID of a given revision.
+
+    If specified REV allow to select a single or multiple revisions
+    (using the Mercurial revsets language: `hg help revsets`)
+    """
+    hg = Hg(ctx.obj["HG_ROOT"])
+
+    for identity in identify_revision(hg, rev):
+        click.echo(identity)
+
+
+@main.command()
+@click.pass_context
+def snapshot(ctx):
+    """Compute the SWHID of the snapshot."""
+    root = ctx.obj["HG_ROOT"]
+    hg = Hg(root)
+
+    snapshot_swhid = identify_snapshot(hg)
+
+    uri = swhid("snapshot", snapshot_swhid)
+    click.echo(f"{uri}\t{root}")
+
+
+@main.command()
+@click.pass_context
+def all(ctx):
+    """Compute the SWHID of all the repository objects."""
+    root = ctx.obj["HG_ROOT"]
+    hg = Hg(root)
+
+    dir_uris = []
+    rev_uris = []
+    rel_uris = []
+
+    node_id_2_swhid = {}
+    for revision in identify_revision(hg):
+        dir_uris.append(revision.dir_uri())
+        rev_uris.append(str(revision))
+        node_id_2_swhid[revision.node_id] = revision.swhid
+
+    releases = []
+    for release in identify_release(hg, node_id_2_swhid):
+        rel_uris.append(str(release))
+        releases.append(release)
+
+    snapshot_swhid = identify_snapshot(hg, node_id_2_swhid, releases)
+
+    for uri in dir_uris + rev_uris + rel_uris:
+        click.echo(uri)
+
+    uri = swhid("snapshot", snapshot_swhid)
+    click.echo(f"{uri}\t{root}")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/swh/loader/mercurial/tests/test_identify.py b/swh/loader/mercurial/tests/test_identify.py
new file mode 100644
--- /dev/null
+++ b/swh/loader/mercurial/tests/test_identify.py
@@ -0,0 +1,93 @@
+# Copyright (C) 2020  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import os
+from urllib.parse import urlsplit
+
+from click.testing import CliRunner
+
+from swh.loader.mercurial.identify import main
+from swh.loader.tests import prepare_repository_from_archive
+
+
+def test_all_revisions(datadir: str, tmp_path: str):
+    archive_name = "hello"
+    archive_path = os.path.join(datadir, f"{archive_name}.tgz")
+    repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
+    directory = urlsplit(repo_url).path
+
+    runner = CliRunner()
+    result = runner.invoke(main, ["-d", directory, "revision"])
+
+    expected = (
+        "\n".join(
+            [
+                "swh:1:rev:93b48d515580522a05f389bec93227fc8e43d940"
+                "\t0a04b987be5ae354b710cefeba0e2d9de7ad41a9",
+                "swh:1:rev:8dd3db5d5519e4947f035d141581d304565372d2"
+                "\t82e55d328c8ca4ee16520036c0aaace03a5beb65",
+                "swh:1:rev:c3dbe4fbeaaa98dd961834e4007edb3efb0e2a27"
+                "\tb985ae4a07e12ac662f45a171e2d42b13be5b50c",
+            ]
+        )
+        + "\n"
+    )
+    assert result.output == expected
+
+
+def test_single_revision(datadir: str, tmp_path: str):
+    archive_name = "hello"
+    archive_path = os.path.join(datadir, f"{archive_name}.tgz")
+    repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
+    directory = urlsplit(repo_url).path
+
+    runner = CliRunner()
+    result = runner.invoke(
+        main, ["-d", directory, "revision", "0a04b987be5ae354b710cefeba0e2d9de7ad41a9"]
+    )
+
+    expected = (
+        "\n".join(
+            [
+                "swh:1:rev:93b48d515580522a05f389bec93227fc8e43d940"
+                "\t0a04b987be5ae354b710cefeba0e2d9de7ad41a9",
+            ]
+        )
+        + "\n"
+    )
+    assert result.output == expected
+
+
+def test_all(datadir: str, tmp_path: str):
+    archive_name = "hello"
+    archive_path = os.path.join(datadir, f"{archive_name}.tgz")
+    repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
+    directory = urlsplit(repo_url).path
+
+    runner = CliRunner()
+    result = runner.invoke(main, ["-d", directory, "all"])
+
+    expected = (
+        "\n".join(
+            [
+                "swh:1:dir:43d727f2f3f2f7cb3b098ddad1d7038464a4cee2"
+                "\t0a04b987be5ae354b710cefeba0e2d9de7ad41a9",
+                "swh:1:dir:b3f85f210ff86d334575f64cb01c5bf49895b63e"
+                "\t82e55d328c8ca4ee16520036c0aaace03a5beb65",
+                "swh:1:dir:8f2be433c945384c85920a8e60f2a68d2c0f20fb"
+                "\tb985ae4a07e12ac662f45a171e2d42b13be5b50c",
+                "swh:1:rev:93b48d515580522a05f389bec93227fc8e43d940"
+                "\t0a04b987be5ae354b710cefeba0e2d9de7ad41a9",
+                "swh:1:rev:8dd3db5d5519e4947f035d141581d304565372d2"
+                "\t82e55d328c8ca4ee16520036c0aaace03a5beb65",
+                "swh:1:rev:c3dbe4fbeaaa98dd961834e4007edb3efb0e2a27"
+                "\tb985ae4a07e12ac662f45a171e2d42b13be5b50c",
+                "swh:1:rel:515c4d72e089404356d0f4b39d60f948b8999140\t0.1",
+                f"swh:1:snp:d35668e02e2ba4321dc951cd308cf883786f918a\t{directory}",
+            ]
+        )
+        + "\n"
+    )
+    assert result.output == expected