diff --git a/swh/loader/core/loader.py b/swh/loader/core/loader.py
index 90716c5..a6047fc 100644
--- a/swh/loader/core/loader.py
+++ b/swh/loader/core/loader.py
@@ -1,883 +1,919 @@
 # Copyright (C) 2015-2022  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import datetime
 import hashlib
 import logging
 import os
+from pathlib import Path
 import tempfile
 import time
 from typing import Any, ContextManager, Dict, Iterable, List, Optional, Union
 from urllib.parse import urlparse
 
 from requests.exceptions import HTTPError
 import sentry_sdk
 
 from swh.core.config import load_from_envvar
 from swh.core.statsd import Statsd
 from swh.core.tarball import uncompress
 from swh.loader.core.metadata_fetchers import CredentialsType, get_fetchers_for_lister
-from swh.loader.exception import NotFound
+from swh.loader.core.utils import nix_hashes
+from swh.loader.exception import NotFound, UnsupportedChecksumComputation
 from swh.loader.package.utils import download
 from swh.model import from_disk
 from swh.model.model import (
     BaseContent,
     Content,
     Directory,
     Origin,
     OriginVisit,
     OriginVisitStatus,
     RawExtrinsicMetadata,
     Release,
     Revision,
     Sha1Git,
     SkippedContent,
     Snapshot,
     SnapshotBranch,
     TargetType,
 )
 from swh.storage import get_storage
 from swh.storage.algos.snapshot import snapshot_get_latest
 from swh.storage.interface import StorageInterface
 from swh.storage.utils import now
 
 DEFAULT_CONFIG: Dict[str, Any] = {
     "max_content_size": 100 * 1024 * 1024,
 }
 
 SENTRY_ORIGIN_URL_TAG_NAME = "swh.loader.origin_url"
 SENTRY_VISIT_TYPE_TAG_NAME = "swh.loader.visit_type"
 
 
 class BaseLoader:
     """Base class for (D)VCS loaders (e.g Svn, Git, Mercurial, ...) or PackageLoader (e.g
     PyPI, Npm, CRAN, ...)
 
     A loader retrieves origin information (git/mercurial/svn repositories, pypi/npm/...
     package artifacts), ingests the contents/directories/revisions/releases/snapshot
     read from those artifacts and send them to the archive through the storage backend.
 
     The main entry point for the loader is the :func:`load` function.
 
     2 static methods (:func:`from_config`, :func:`from_configfile`) centralizes and
     eases the loader instantiation from either configuration dict or configuration file.
 
     Some class examples:
 
     - :class:`SvnLoader`
     - :class:`GitLoader`
     - :class:`PyPILoader`
     - :class:`NpmLoader`
 
     Args:
       lister_name: Name of the lister which triggered this load.
         If provided, the loader will try to use the forge's API to retrieve extrinsic
         metadata
       lister_instance_name: Name of the lister instance which triggered this load.
         Must be None iff lister_name is, but it may be the empty string for listers
         with a single instance.
     """
 
     visit_type: str
     origin: Origin
     loaded_snapshot_id: Optional[Sha1Git]
 
     parent_origins: Optional[List[Origin]]
     """If the given origin is a "forge fork" (ie. created with the "Fork" button
     of GitHub-like forges), :meth:`build_extrinsic_origin_metadata` sets this to
     a list of origins it was forked from; closest parent first."""
 
     def __init__(
         self,
         storage: StorageInterface,
         origin_url: str,
         logging_class: Optional[str] = None,
         save_data_path: Optional[str] = None,
         max_content_size: Optional[int] = None,
         lister_name: Optional[str] = None,
         lister_instance_name: Optional[str] = None,
         metadata_fetcher_credentials: CredentialsType = None,
     ):
         if lister_name == "":
             raise ValueError("lister_name must not be the empty string")
         if lister_name is None and lister_instance_name is not None:
             raise ValueError(
                 f"lister_name is None but lister_instance_name is {lister_instance_name!r}"
             )
         if lister_name is not None and lister_instance_name is None:
             raise ValueError(
                 f"lister_instance_name is None but lister_name is {lister_name!r}"
             )
 
         self.storage = storage
         self.origin = Origin(url=origin_url)
         self.max_content_size = int(max_content_size) if max_content_size else None
         self.lister_name = lister_name
         self.lister_instance_name = lister_instance_name
         self.metadata_fetcher_credentials = metadata_fetcher_credentials or {}
 
         if logging_class is None:
             logging_class = "%s.%s" % (
                 self.__class__.__module__,
                 self.__class__.__name__,
             )
         self.log = logging.getLogger(logging_class)
 
         _log = logging.getLogger("requests.packages.urllib3.connectionpool")
         _log.setLevel(logging.WARN)
 
         sentry_sdk.set_tag(SENTRY_ORIGIN_URL_TAG_NAME, self.origin.url)
         sentry_sdk.set_tag(SENTRY_VISIT_TYPE_TAG_NAME, self.visit_type)
 
         # possibly overridden in self.prepare method
         self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc)
 
         self.loaded_snapshot_id = None
 
         if save_data_path:
             path = save_data_path
             os.stat(path)
             if not os.access(path, os.R_OK | os.W_OK):
                 raise PermissionError("Permission denied: %r" % path)
 
         self.save_data_path = save_data_path
 
         self.parent_origins = None
 
         self.statsd = Statsd(
             namespace="swh_loader", constant_tags={"visit_type": self.visit_type}
         )
 
     @classmethod
     def from_config(cls, storage: Dict[str, Any], **config: Any):
         """Instantiate a loader from a configuration dict.
 
         This is basically a backwards-compatibility shim for the CLI.
 
         Args:
           storage: instantiation config for the storage
           config: the configuration dict for the loader, with the following keys:
             - credentials (optional): credentials list for the scheduler
             - any other kwargs passed to the loader.
 
         Returns:
           the instantiated loader
         """
         # Drop the legacy config keys which aren't used for this generation of loader.
         for legacy_key in ("storage", "celery"):
             config.pop(legacy_key, None)
 
         # Instantiate the storage
         storage_instance = get_storage(**storage)
         return cls(storage=storage_instance, **config)
 
     @classmethod
     def from_configfile(cls, **kwargs: Any):
         """Instantiate a loader from the configuration loaded from the
         SWH_CONFIG_FILENAME envvar, with potential extra keyword arguments if their
         value is not None.
 
         Args:
             kwargs: kwargs passed to the loader instantiation
 
         """
         config = dict(load_from_envvar(DEFAULT_CONFIG))
         config.update({k: v for k, v in kwargs.items() if v is not None})
         return cls.from_config(**config)
 
     def save_data(self) -> None:
         """Save the data associated to the current load"""
         raise NotImplementedError
 
     def get_save_data_path(self) -> str:
         """The path to which we archive the loader's raw data"""
         if not hasattr(self, "__save_data_path"):
             year = str(self.visit_date.year)
 
             assert self.origin
             url = self.origin.url.encode("utf-8")
             origin_url_hash = hashlib.sha1(url).hexdigest()
 
             path = "%s/sha1:%s/%s/%s" % (
                 self.save_data_path,
                 origin_url_hash[0:2],
                 origin_url_hash,
                 year,
             )
 
             os.makedirs(path, exist_ok=True)
             self.__save_data_path = path
 
         return self.__save_data_path
 
     def flush(self) -> Dict[str, int]:
         """Flush any potential buffered data not sent to swh-storage.
         Returns the same value as :meth:`swh.storage.interface.StorageInterface.flush`.
         """
         return self.storage.flush()
 
     def cleanup(self) -> None:
         """Last step executed by the loader."""
         raise NotImplementedError
 
     def _store_origin_visit(self) -> None:
         """Store origin and visit references. Sets the self.visit references."""
         assert self.origin
         self.storage.origin_add([self.origin])
 
         assert isinstance(self.visit_type, str)
         self.visit = list(
             self.storage.origin_visit_add(
                 [
                     OriginVisit(
                         origin=self.origin.url,
                         date=self.visit_date,
                         type=self.visit_type,
                     )
                 ]
             )
         )[0]
 
     def prepare(self) -> None:
         """Second step executed by the loader to prepare some state needed by
            the loader.
 
         Raises
            NotFound exception if the origin to ingest is not found.
 
         """
         raise NotImplementedError
 
     def get_origin(self) -> Origin:
         """Get the origin that is currently being loaded.
         self.origin should be set in :func:`prepare_origin`
 
         Returns:
           dict: an origin ready to be sent to storage by
           :func:`origin_add`.
         """
         assert self.origin
         return self.origin
 
     def fetch_data(self) -> bool:
         """Fetch the data from the source the loader is currently loading
            (ex: git/hg/svn/... repository).
 
         Returns:
             a value that is interpreted as a boolean. If True, fetch_data needs
             to be called again to complete loading.
 
         """
         raise NotImplementedError
 
     def process_data(self) -> bool:
         """Run any additional processing between fetching and storing the data
 
         Returns:
             a value that is interpreted as a boolean. If True, fetch_data needs
             to be called again to complete loading.
             Ignored if ``fetch_data`` already returned :const:`False`.
         """
         return True
 
     def store_data(self) -> None:
         """Store fetched data in the database.
 
         Should call the :func:`maybe_load_xyz` methods, which handle the
         bundles sent to storage, rather than send directly.
         """
         raise NotImplementedError
 
     def load_status(self) -> Dict[str, str]:
         """Detailed loading status.
 
         Defaults to logging an eventful load.
 
         Returns: a dictionary that is eventually passed back as the task's
           result to the scheduler, allowing tuning of the task recurrence
           mechanism.
         """
         return {
             "status": "eventful",
         }
 
     def post_load(self, success: bool = True) -> None:
         """Permit the loader to do some additional actions according to status
         after the loading is done. The flag success indicates the
         loading's status.
 
         Defaults to doing nothing.
 
         This is up to the implementer of this method to make sure this
         does not break.
 
         Args:
             success (bool): the success status of the loading
 
         """
         pass
 
     def visit_status(self) -> str:
         """Detailed visit status.
 
         Defaults to logging a full visit.
         """
         return "full"
 
     def pre_cleanup(self) -> None:
         """As a first step, will try and check for dangling data to cleanup.
         This should do its best to avoid raising issues.
 
         """
         pass
 
     def load(self) -> Dict[str, str]:
         r"""Loading logic for the loader to follow:
 
         - Store the actual ``origin_visit`` to storage
         - Call :meth:`prepare` to prepare any eventual state
         - Call :meth:`get_origin` to get the origin we work with and store
 
         - while True:
 
           - Call :meth:`fetch_data` to fetch the data to store
           - Call :meth:`process_data` to optionally run processing between
             :meth:`fetch_data` and :meth:`store_data`
           - Call :meth:`store_data` to store the data
 
         - Call :meth:`cleanup` to clean up any eventual state put in place
              in :meth:`prepare` method.
 
         """
         try:
             with self.statsd_timed("pre_cleanup"):
                 self.pre_cleanup()
         except Exception:
             msg = "Cleaning up dangling data failed! Continue loading."
             self.log.warning(msg)
             sentry_sdk.capture_exception()
 
         self._store_origin_visit()
 
         assert (
             self.visit.visit
         ), "The method `_store_origin_visit` should set the visit (OriginVisit)"
         self.log.info(
             "Load origin '%s' with type '%s'", self.origin.url, self.visit.type
         )
 
         try:
             with self.statsd_timed("build_extrinsic_origin_metadata"):
                 metadata = self.build_extrinsic_origin_metadata()
             self.load_metadata_objects(metadata)
         except Exception as e:
             sentry_sdk.capture_exception(e)
             # Do not fail the whole task if this is the only failure
             self.log.exception(
                 "Failure while loading extrinsic origin metadata.",
                 extra={
                     "swh_task_args": [],
                     "swh_task_kwargs": {
                         "origin": self.origin.url,
                         "lister_name": self.lister_name,
                         "lister_instance_name": self.lister_instance_name,
                     },
                 },
             )
 
         total_time_fetch_data = 0.0
         total_time_process_data = 0.0
         total_time_store_data = 0.0
 
         # Initially not a success, will be True when actually one
         status = "failed"
         success = False
 
         try:
             with self.statsd_timed("prepare"):
                 self.prepare()
 
             while True:
                 t1 = time.monotonic()
                 more_data_to_fetch = self.fetch_data()
                 t2 = time.monotonic()
                 total_time_fetch_data += t2 - t1
 
                 more_data_to_fetch = self.process_data() and more_data_to_fetch
                 t3 = time.monotonic()
                 total_time_process_data += t3 - t2
 
                 self.store_data()
                 t4 = time.monotonic()
                 total_time_store_data += t4 - t3
                 if not more_data_to_fetch:
                     break
 
             self.statsd_timing("fetch_data", total_time_fetch_data * 1000.0)
             self.statsd_timing("process_data", total_time_process_data * 1000.0)
             self.statsd_timing("store_data", total_time_store_data * 1000.0)
 
             status = self.visit_status()
             visit_status = OriginVisitStatus(
                 origin=self.origin.url,
                 visit=self.visit.visit,
                 type=self.visit_type,
                 date=now(),
                 status=status,
                 snapshot=self.loaded_snapshot_id,
             )
             self.storage.origin_visit_status_add([visit_status])
             success = True
             with self.statsd_timed(
                 "post_load", tags={"success": success, "status": status}
             ):
                 self.post_load()
         except BaseException as e:
             success = False
             if isinstance(e, NotFound):
                 status = "not_found"
                 task_status = "uneventful"
             else:
                 status = "partial" if self.loaded_snapshot_id else "failed"
                 task_status = "failed"
 
             self.log.exception(
                 "Loading failure, updating to `%s` status",
                 status,
                 extra={
                     "swh_task_args": [],
                     "swh_task_kwargs": {
                         "origin": self.origin.url,
                         "lister_name": self.lister_name,
                         "lister_instance_name": self.lister_instance_name,
                     },
                 },
             )
             if not isinstance(e, (SystemExit, KeyboardInterrupt)):
                 sentry_sdk.capture_exception()
             visit_status = OriginVisitStatus(
                 origin=self.origin.url,
                 visit=self.visit.visit,
                 type=self.visit_type,
                 date=now(),
                 status=status,
                 snapshot=self.loaded_snapshot_id,
             )
             self.storage.origin_visit_status_add([visit_status])
             with self.statsd_timed(
                 "post_load", tags={"success": success, "status": status}
             ):
                 self.post_load(success=success)
             if not isinstance(e, Exception):
                 # e derives from BaseException but not Exception; this is most likely
                 # SystemExit or KeyboardInterrupt, so we should re-raise it.
                 raise
             return {"status": task_status}
         finally:
             with self.statsd_timed(
                 "flush", tags={"success": success, "status": status}
             ):
                 self.flush()
             with self.statsd_timed(
                 "cleanup", tags={"success": success, "status": status}
             ):
                 self.cleanup()
 
         return self.load_status()
 
     def load_metadata_objects(
         self, metadata_objects: List[RawExtrinsicMetadata]
     ) -> None:
         if not metadata_objects:
             return
 
         authorities = {mo.authority for mo in metadata_objects}
         self.storage.metadata_authority_add(list(authorities))
 
         fetchers = {mo.fetcher for mo in metadata_objects}
         self.storage.metadata_fetcher_add(list(fetchers))
 
         self.storage.raw_extrinsic_metadata_add(metadata_objects)
 
     def build_extrinsic_origin_metadata(self) -> List[RawExtrinsicMetadata]:
         """Builds a list of full RawExtrinsicMetadata objects, using
         a metadata fetcher returned by :func:`get_fetcher_classes`."""
         if self.lister_name is None:
             self.log.debug("lister_not provided, skipping extrinsic origin metadata")
             return []
 
         assert (
             self.lister_instance_name is not None
         ), "lister_instance_name is None, but lister_name is not"
 
         metadata = []
 
         fetcher_classes = get_fetchers_for_lister(self.lister_name)
 
         self.statsd_average("metadata_fetchers", len(fetcher_classes))
 
         for cls in fetcher_classes:
             metadata_fetcher = cls(
                 origin=self.origin,
                 lister_name=self.lister_name,
                 lister_instance_name=self.lister_instance_name,
                 credentials=self.metadata_fetcher_credentials,
             )
             with self.statsd_timed(
                 "fetch_one_metadata", tags={"fetcher": cls.FETCHER_NAME}
             ):
                 metadata.extend(metadata_fetcher.get_origin_metadata())
             if self.parent_origins is None:
                 self.parent_origins = metadata_fetcher.get_parent_origins()
                 self.statsd_average(
                     "metadata_parent_origins",
                     len(self.parent_origins),
                     tags={"fetcher": cls.FETCHER_NAME},
                 )
         self.statsd_average("metadata_objects", len(metadata))
 
         return metadata
 
     def statsd_timed(self, name: str, tags: Dict[str, Any] = {}) -> ContextManager:
         """
         Wrapper for :meth:`swh.core.statsd.Statsd.timed`, which uses the standard
         metric name and tags for loaders.
         """
         return self.statsd.timed(
             "operation_duration_seconds", tags={"operation": name, **tags}
         )
 
     def statsd_timing(self, name: str, value: float, tags: Dict[str, Any] = {}) -> None:
         """
         Wrapper for :meth:`swh.core.statsd.Statsd.timing`, which uses the standard
         metric name and tags for loaders.
         """
         self.statsd.timing(
             "operation_duration_seconds", value, tags={"operation": name, **tags}
         )
 
     def statsd_average(
         self, name: str, value: Union[int, float], tags: Dict[str, Any] = {}
     ) -> None:
         """Increments both ``{name}_sum`` (by the ``value``) and ``{name}_count``
         (by ``1``), allowing to prometheus to compute the average ``value`` over
         time."""
         self.statsd.increment(f"{name}_sum", value, tags=tags)
         self.statsd.increment(f"{name}_count", tags=tags)
 
 
 class DVCSLoader(BaseLoader):
     """This base class is a pattern for dvcs loaders (e.g. git, mercurial).
 
     Those loaders are able to load all the data in one go. For example, the
     loader defined in swh-loader-git :class:`BulkUpdater`.
 
     For other loaders (stateful one, (e.g :class:`SWHSvnLoader`),
     inherit directly from :class:`BaseLoader`.
 
     """
 
     def cleanup(self) -> None:
         """Clean up an eventual state installed for computations."""
         pass
 
     def has_contents(self) -> bool:
         """Checks whether we need to load contents"""
         return True
 
     def get_contents(self) -> Iterable[BaseContent]:
         """Get the contents that need to be loaded"""
         raise NotImplementedError
 
     def has_directories(self) -> bool:
         """Checks whether we need to load directories"""
         return True
 
     def get_directories(self) -> Iterable[Directory]:
         """Get the directories that need to be loaded"""
         raise NotImplementedError
 
     def has_revisions(self) -> bool:
         """Checks whether we need to load revisions"""
         return True
 
     def get_revisions(self) -> Iterable[Revision]:
         """Get the revisions that need to be loaded"""
         raise NotImplementedError
 
     def has_releases(self) -> bool:
         """Checks whether we need to load releases"""
         return True
 
     def get_releases(self) -> Iterable[Release]:
         """Get the releases that need to be loaded"""
         raise NotImplementedError
 
     def get_snapshot(self) -> Snapshot:
         """Get the snapshot that needs to be loaded"""
         raise NotImplementedError
 
     def eventful(self) -> bool:
         """Whether the load was eventful"""
         raise NotImplementedError
 
     def store_data(self) -> None:
         assert self.origin
         if self.save_data_path:
             self.save_data()
 
         if self.has_contents():
             for obj in self.get_contents():
                 if isinstance(obj, Content):
                     self.storage.content_add([obj])
                 elif isinstance(obj, SkippedContent):
                     self.storage.skipped_content_add([obj])
                 else:
                     raise TypeError(f"Unexpected content type: {obj}")
         if self.has_directories():
             for directory in self.get_directories():
                 self.storage.directory_add([directory])
         if self.has_revisions():
             for revision in self.get_revisions():
                 self.storage.revision_add([revision])
         if self.has_releases():
             for release in self.get_releases():
                 self.storage.release_add([release])
         snapshot = self.get_snapshot()
         self.storage.snapshot_add([snapshot])
         self.flush()
         self.loaded_snapshot_id = snapshot.id
 
 
 class NodeLoader(BaseLoader):
     """Common class for :class:`ContentLoader` and :class:`Directoryloader`.
 
     The "checksums" field is a dictionary of hex hashes on the object retrieved (content
-    or directory).
+    or directory). When "checksums_computation" is "standard", the checksums are
+    computed on the raw content of the remote file itself (as unix cli tools such as
+    "sha1sum", "sha256sum", ... do). When "checksums_computation" is "nar", the check
+    is delegated to the `nix-store --dump` command: the checksums are computed on the
+    uncompressed content of the retrieved artifact (its nar serialization). Any other
+    "checksums_computation" value raises UnsupportedChecksumComputation.
 
     The multiple "fallback" urls received are mirror urls only used to fetch the object
     if the main origin is no longer available. Those are not stored.
 
     Ingestion is considered eventful on the first ingestion. Subsequent load of the same
     object should end up being an uneventful visit (matching snapshot).
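+
+    A minimal instantiation sketch, assuming a ``storage`` instance and placeholder
+    url and checksum values:
+
+    .. code:: python
+
+        loader = DirectoryLoader(
+            storage,
+            url="https://example.org/archives/foo.tar.gz",
+            checksums={"sha256": "0" * 64},  # placeholder value
+            checksums_computation="nar",
+            fallback_urls=["https://mirror.example.org/archives/foo.tar.gz"],
+        )
+        status = loader.load()  # {"status": "eventful"} on the first ingestion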
 
     """
 
     def __init__(
         self,
         storage: StorageInterface,
         url: str,
         checksums: Dict[str, str],
+        checksums_computation: str = "standard",
         fallback_urls: List[str] = None,
         **kwargs,
     ):
         super().__init__(storage, url, **kwargs)
         self.snapshot: Optional[Snapshot] = None
         self.checksums = checksums
+        self.checksums_computation = checksums_computation
+        if self.checksums_computation not in ("nar", "standard"):
+            raise UnsupportedChecksumComputation(
+                f"Unsupported checksums computation: {self.checksums_computation}"
+            )
+
         fallback_urls_ = fallback_urls or []
         self.mirror_urls: List[str] = [self.origin.url, *fallback_urls_]
+        # Hashes used to check the downloaded content at fetch time: the provided
+        # checksums when checksums_computation is "standard", empty otherwise (the
+        # "nar" check happens after uncompression)
+        self.standard_hashes = (
+            self.checksums if self.checksums_computation == "standard" else {}
+        )
+        self.log.debug("Loader checksums computation: %s", self.checksums_computation)
 
     def prepare(self) -> None:
         self.last_snapshot = snapshot_get_latest(self.storage, self.origin.url)
 
     def load_status(self) -> Dict[str, Any]:
         return {
             "status": "uneventful"
             if self.last_snapshot == self.snapshot
             else "eventful"
         }
 
     def cleanup(self) -> None:
         self.log.debug("cleanup")
 
 
 class ContentLoader(NodeLoader):
     """Basic loader for edge case content ingestion.
 
     The output snapshot is of the form:
 
     .. code::
 
        id: <bytes>
        branches:
          HEAD:
            target_type: content
            target: <content-id>
 
     """
 
     visit_type = "content"
 
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
         self.content: Optional[Content] = None
 
     def fetch_data(self) -> bool:
         """Retrieve the content file as a Content Object"""
         for url in self.mirror_urls:
             url_ = urlparse(url)
             self.log.debug(
                 "prepare; origin_url=%s fallback=%s scheme=%s path=%s",
                 self.origin.url,
                 url,
                 url_.scheme,
                 url_.path,
             )
             try:
+                # FIXME: ensure no "nar" checksums computation is required for single files
+                assert self.checksums_computation == "standard"
                 with tempfile.TemporaryDirectory() as tmpdir:
                     file_path, _ = download(url, dest=tmpdir, hashes=self.checksums)
                     with open(file_path, "rb") as file:
                         self.content = Content.from_data(file.read())
             except HTTPError as http_error:
                 if http_error.response.status_code == 404:
                     self.log.debug(
                         "Not found '%s', continue on next mirror url if any", url
                     )
                 continue
             else:
                 return False  # no more data to fetch
 
         # If we reach this point, we did not find any proper content, consider the
         # origin not found
         raise NotFound(f"Unknown origin {self.origin.url}.")
 
     def process_data(self) -> bool:
         """Build the snapshot out of the Content retrieved."""
 
         assert self.content is not None
         self.snapshot = Snapshot(
             branches={
                 b"HEAD": SnapshotBranch(
                     target=self.content.sha1_git,
                     target_type=TargetType.CONTENT,
                 ),
             }
         )
 
         return False  # no more data to process
 
     def store_data(self) -> None:
         """Store newly retrieved Content and Snapshot."""
         assert self.content is not None
         self.storage.content_add([self.content])
         assert self.snapshot is not None
         self.storage.snapshot_add([self.snapshot])
         self.loaded_snapshot_id = self.snapshot.id
 
     def visit_status(self):
         return "full" if self.content and self.snapshot is not None else "partial"
 
 
 class DirectoryLoader(NodeLoader):
     """Basic loader for edge case directory ingestion (through one tarball).
 
     The output snapshot is of the form:
 
     .. code::
 
        id: <bytes>
        branches:
          HEAD:
            target_type: directory
            target: <directory-id>
 
     """
 
     visit_type = "directory"
 
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
         self.directory: Optional[from_disk.Directory] = None
         self.cnts: List[Content] = None
         self.skipped_cnts: List[SkippedContent] = None
         self.dirs: List[Directory] = None
 
     def fetch_data(self) -> bool:
         """Fetch directory as a tarball amongst the self.mirror_urls.
 
         Raises NotFound if no tarball is found
 
         """
         for url in self.mirror_urls:
             url_ = urlparse(url)
             self.log.debug(
                 "prepare; origin_url=%s fallback=%s scheme=%s path=%s",
                 self.origin.url,
                 url,
                 url_.scheme,
                 url_.path,
             )
             with tempfile.TemporaryDirectory() as tmpdir:
                 try:
                     tarball_path, extrinsic_metadata = download(
                         url,
                         tmpdir,
-                        # Ensure content received matched the checksums received
-                        hashes=self.checksums,
+                        hashes=self.standard_hashes,
                         extra_request_headers={"Accept-Encoding": "identity"},
                     )
-                except ValueError as e:
-                    # Checksum mismatch
-                    self.log.debug("Error: %s", e)
+                except ValueError:
+                    # Checksum mismatch: log it and try the next mirror url, if any
+                    self.log.debug(
+                        "Mismatched checksums <%s>: continue on next mirror url if any",
+                        url,
+                    )
                     continue
                 except HTTPError as http_error:
                     if http_error.response.status_code == 404:
                         self.log.debug(
-                            "Not found '%s', continue on next mirror url if any", url
+                            "Not found <%s>: continue on next mirror url if any", url
                         )
                     continue
 
-                directory_path = os.path.join(tmpdir, "src")
-                os.makedirs(directory_path, exist_ok=True)
-                uncompress(tarball_path, dest=directory_path)
-
+                directory_path = Path(tmpdir) / "src"
+                directory_path.mkdir(parents=True, exist_ok=True)
+                uncompress(tarball_path, dest=str(directory_path))
                 self.log.debug("uncompressed path to directory: %s", directory_path)
 
+                if self.checksums_computation == "nar":
+                    # "nar" checksums cannot be verified at download time, so an
+                    # extra check is run on the uncompressed tarball content
+                    dir_to_check = next(directory_path.iterdir())
+                    self.log.debug("Directory to check nar hashes: %s", dir_to_check)
+                    actual_checksums = nix_hashes(
+                        dir_to_check, self.checksums.keys()
+                    ).hexdigest()
+
+                    assert actual_checksums == self.checksums
+
                 self.directory = from_disk.Directory.from_disk(
-                    path=directory_path.encode("utf-8"),
+                    path=bytes(directory_path),
                     max_content_length=self.max_content_size,
                 )
                 # Compute the merkle dag from the top-level directory
                 self.cnts, self.skipped_cnts, self.dirs = from_disk.iter_directory(
                     self.directory
                 )
 
                 if self.directory is not None:
                     return False  # no more data to fetch
 
         # if we reach here, we did not find any proper tarball, so consider the origin
         # not found
         raise NotFound(f"Unknown origin {self.origin.url}.")
 
     def process_data(self) -> bool:
         """Build the snapshot out of the Directory retrieved."""
 
         assert self.directory is not None
         # Build the snapshot
         self.snapshot = Snapshot(
             branches={
                 b"HEAD": SnapshotBranch(
                     target=self.directory.hash,
                     target_type=TargetType.DIRECTORY,
                 ),
             }
         )
 
         return False  # no more data to process
 
     def store_data(self) -> None:
         """Store newly retrieved Content and Snapshot."""
         self.log.debug("Number of skipped contents: %s", len(self.skipped_cnts))
         self.storage.skipped_content_add(self.skipped_cnts)
         self.log.debug("Number of contents: %s", len(self.cnts))
         self.storage.content_add(self.cnts)
         self.log.debug("Number of directories: %s", len(self.dirs))
         self.storage.directory_add(self.dirs)
         assert self.snapshot is not None
         self.storage.snapshot_add([self.snapshot])
         self.loaded_snapshot_id = self.snapshot.id
 
     def visit_status(self):
         return "full" if self.directory and self.snapshot is not None else "partial"
diff --git a/swh/loader/core/tests/conftest.py b/swh/loader/core/tests/conftest.py
new file mode 100644
index 0000000..c18e911
--- /dev/null
+++ b/swh/loader/core/tests/conftest.py
@@ -0,0 +1,45 @@
+# Copyright (C) 2018-2022  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from os import path
+import shutil
+from typing import Dict, List, Union
+
+import pytest
+
+from swh.model.hashutil import MultiHash
+
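+# True when the nix-store command is not available; nar-related tests can use this
+# flag to skip, e.g. with @pytest.mark.skipif(nix_store_missing, reason="...")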
+nix_store_missing = shutil.which("nix-store") is None
+
+
+@pytest.fixture
+def tarball_path(datadir):
+    """Return tarball filepath fetched by DirectoryLoader test runs."""
+    return path.join(datadir, "https_example.org", "archives_dummy-hello.tar.gz")
+
+
+def compute_hashes(
+    filepath: str, cksum_algos: Union[str, List[str]] = "sha256"
+) -> Dict[str, str]:
+    """Compute checksums dict out of a filepath"""
+    checksum_algos = {cksum_algos} if isinstance(cksum_algos, str) else set(cksum_algos)
+    return MultiHash.from_path(filepath, hash_names=checksum_algos).hexdigest()
+
+
+@pytest.fixture
+def tarball_with_std_hashes(tarball_path):
+    return (
+        tarball_path,
+        compute_hashes(tarball_path, ["sha1", "sha256", "sha512"]),
+    )
+
+
+@pytest.fixture
+def tarball_with_nar_hashes(tarball_path):
+    # FIXME: compute it instead of hard-coding it
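+    # The value below can be reproduced manually (assuming nix-store is installed)
+    # with something like:
+    #   nix-store --dump <uncompressed-tarball-top-level-dir> | sha256sum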
+    return (
+        tarball_path,
+        {"sha256": "23fb1fe278aeb2de899f7d7f10cf892f63136cea2c07146da2200da4de54b7e4"},
+    )
diff --git a/swh/loader/core/tests/test_loader.py b/swh/loader/core/tests/test_loader.py
index e1cd1fa..7822d0b 100644
--- a/swh/loader/core/tests/test_loader.py
+++ b/swh/loader/core/tests/test_loader.py
@@ -1,769 +1,803 @@
 # Copyright (C) 2018-2022  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import datetime
 import hashlib
 import logging
 import os
 import time
-from typing import Dict, List, Union
 from unittest.mock import MagicMock, call
 
 import pytest
 
 from swh.loader.core.loader import (
     SENTRY_ORIGIN_URL_TAG_NAME,
     SENTRY_VISIT_TYPE_TAG_NAME,
     BaseLoader,
     ContentLoader,
     DirectoryLoader,
     DVCSLoader,
 )
 from swh.loader.core.metadata_fetchers import MetadataFetcherProtocol
-from swh.loader.exception import NotFound
+from swh.loader.exception import NotFound, UnsupportedChecksumComputation
 from swh.loader.tests import assert_last_visit_matches
-from swh.model.hashutil import MultiHash, hash_to_bytes
+from swh.model.hashutil import hash_to_bytes
 from swh.model.model import (
     MetadataAuthority,
     MetadataAuthorityType,
     MetadataFetcher,
     Origin,
     RawExtrinsicMetadata,
     Snapshot,
 )
 import swh.storage.exc
 
+from .conftest import compute_hashes, nix_store_missing
+
 ORIGIN = Origin(url="some-url")
 PARENT_ORIGIN = Origin(url="base-origin-url")
 
 METADATA_AUTHORITY = MetadataAuthority(
     type=MetadataAuthorityType.FORGE, url="http://example.org/"
 )
 REMD = RawExtrinsicMetadata(
     target=ORIGIN.swhid(),
     discovery_date=datetime.datetime.now(tz=datetime.timezone.utc),
     authority=METADATA_AUTHORITY,
     fetcher=MetadataFetcher(
         name="test fetcher",
         version="0.0.1",
     ),
     format="test-format",
     metadata=b'{"foo": "bar"}',
 )
 
 
 class DummyLoader:
     """Base Loader to overload and simplify the base class (technical: to avoid repetition
     in other *Loader classes)"""
 
     visit_type = "git"
 
     def __init__(self, storage, *args, **kwargs):
         super().__init__(storage, ORIGIN.url, *args, **kwargs)
 
     def cleanup(self):
         pass
 
     def prepare(self, *args, **kwargs):
         pass
 
     def fetch_data(self):
         pass
 
     def get_snapshot_id(self):
         return None
 
 
 class DummyDVCSLoader(DummyLoader, DVCSLoader):
     """DVCS Loader that does nothing in regards to DAG objects."""
 
     def get_contents(self):
         return []
 
     def get_directories(self):
         return []
 
     def get_revisions(self):
         return []
 
     def get_releases(self):
         return []
 
     def get_snapshot(self):
         return Snapshot(branches={})
 
     def eventful(self):
         return False
 
 
 class DummyBaseLoader(DummyLoader, BaseLoader):
     """Buffered loader will send new data when threshold is reached"""
 
     def store_data(self):
         pass
 
 
 class DummyMetadataFetcher:
     SUPPORTED_LISTERS = {"fake-forge"}
     FETCHER_NAME = "fake-forge"
 
     def __init__(self, origin, credentials, lister_name, lister_instance_name):
         pass
 
     def get_origin_metadata(self):
         return [REMD]
 
     def get_parent_origins(self):
         return []
 
 
 class DummyMetadataFetcherWithFork:
     SUPPORTED_LISTERS = {"fake-forge"}
     FETCHER_NAME = "fake-forge"
 
     def __init__(self, origin, credentials, lister_name, lister_instance_name):
         pass
 
     def get_origin_metadata(self):
         return [REMD]
 
     def get_parent_origins(self):
         return [PARENT_ORIGIN]
 
 
 def test_types():
     assert isinstance(
         DummyMetadataFetcher(None, None, None, None), MetadataFetcherProtocol
     )
     assert isinstance(
         DummyMetadataFetcherWithFork(None, None, None, None), MetadataFetcherProtocol
     )
 
 
 def test_base_loader(swh_storage):
     loader = DummyBaseLoader(swh_storage)
     result = loader.load()
     assert result == {"status": "eventful"}
 
 
 def test_base_loader_with_config(swh_storage):
     loader = DummyBaseLoader(swh_storage, "logger-name")
     result = loader.load()
     assert result == {"status": "eventful"}
 
 
 def test_base_loader_with_known_lister_name(swh_storage, mocker):
     fetcher_cls = MagicMock(wraps=DummyMetadataFetcher)
     fetcher_cls.SUPPORTED_LISTERS = DummyMetadataFetcher.SUPPORTED_LISTERS
     fetcher_cls.FETCHER_NAME = "fake-forge"
     mocker.patch(
         "swh.loader.core.metadata_fetchers._fetchers", return_value=[fetcher_cls]
     )
 
     loader = DummyBaseLoader(
         swh_storage, lister_name="fake-forge", lister_instance_name=""
     )
     statsd_report = mocker.patch.object(loader.statsd, "_report")
     result = loader.load()
     assert result == {"status": "eventful"}
 
     fetcher_cls.assert_called_once()
     fetcher_cls.assert_called_once_with(
         origin=ORIGIN,
         credentials={},
         lister_name="fake-forge",
         lister_instance_name="",
     )
     assert swh_storage.raw_extrinsic_metadata_get(
         ORIGIN.swhid(), METADATA_AUTHORITY
     ).results == [REMD]
     assert loader.parent_origins == []
 
     assert [
         call("metadata_fetchers_sum", "c", 1, {}, 1),
         call("metadata_fetchers_count", "c", 1, {}, 1),
         call("metadata_parent_origins_sum", "c", 0, {"fetcher": "fake-forge"}, 1),
         call("metadata_parent_origins_count", "c", 1, {"fetcher": "fake-forge"}, 1),
         call("metadata_objects_sum", "c", 1, {}, 1),
         call("metadata_objects_count", "c", 1, {}, 1),
     ] == [c for c in statsd_report.mock_calls if "metadata_" in c[1][0]]
     assert loader.statsd.namespace == "swh_loader"
     assert loader.statsd.constant_tags == {"visit_type": "git"}
 
 
 def test_base_loader_with_unknown_lister_name(swh_storage, mocker):
     fetcher_cls = MagicMock(wraps=DummyMetadataFetcher)
     fetcher_cls.SUPPORTED_LISTERS = DummyMetadataFetcher.SUPPORTED_LISTERS
     mocker.patch(
         "swh.loader.core.metadata_fetchers._fetchers", return_value=[fetcher_cls]
     )
 
     loader = DummyBaseLoader(
         swh_storage, lister_name="other-lister", lister_instance_name=""
     )
     result = loader.load()
     assert result == {"status": "eventful"}
 
     fetcher_cls.assert_not_called()
     with pytest.raises(swh.storage.exc.StorageArgumentException):
         swh_storage.raw_extrinsic_metadata_get(ORIGIN.swhid(), METADATA_AUTHORITY)
 
 
 def test_base_loader_forked_origin(swh_storage, mocker):
     fetcher_cls = MagicMock(wraps=DummyMetadataFetcherWithFork)
     fetcher_cls.SUPPORTED_LISTERS = DummyMetadataFetcherWithFork.SUPPORTED_LISTERS
     fetcher_cls.FETCHER_NAME = "fake-forge"
     mocker.patch(
         "swh.loader.core.metadata_fetchers._fetchers", return_value=[fetcher_cls]
     )
 
     loader = DummyBaseLoader(
         swh_storage, lister_name="fake-forge", lister_instance_name=""
     )
     statsd_report = mocker.patch.object(loader.statsd, "_report")
     result = loader.load()
     assert result == {"status": "eventful"}
 
     fetcher_cls.assert_called_once()
     fetcher_cls.assert_called_once_with(
         origin=ORIGIN,
         credentials={},
         lister_name="fake-forge",
         lister_instance_name="",
     )
     assert swh_storage.raw_extrinsic_metadata_get(
         ORIGIN.swhid(), METADATA_AUTHORITY
     ).results == [REMD]
     assert loader.parent_origins == [PARENT_ORIGIN]
 
     assert [
         call("metadata_fetchers_sum", "c", 1, {}, 1),
         call("metadata_fetchers_count", "c", 1, {}, 1),
         call("metadata_parent_origins_sum", "c", 1, {"fetcher": "fake-forge"}, 1),
         call("metadata_parent_origins_count", "c", 1, {"fetcher": "fake-forge"}, 1),
         call("metadata_objects_sum", "c", 1, {}, 1),
         call("metadata_objects_count", "c", 1, {}, 1),
     ] == [c for c in statsd_report.mock_calls if "metadata_" in c[1][0]]
     assert loader.statsd.namespace == "swh_loader"
     assert loader.statsd.constant_tags == {"visit_type": "git"}
 
 
 def test_base_loader_post_load_raise(swh_storage, mocker):
     loader = DummyBaseLoader(swh_storage)
     post_load = mocker.patch.object(loader, "post_load")
 
     # raise exception in post_load when success is True
     def post_load_method(*args, success=True):
         if success:
             raise Exception("Error in post_load")
 
     post_load.side_effect = post_load_method
 
     result = loader.load()
     assert result == {"status": "failed"}
 
     # ensure post_load has been called twice, once with success to True and
     # once with success to False as the first post_load call raised exception
     assert post_load.call_args_list == [mocker.call(), mocker.call(success=False)]
 
 
 def test_dvcs_loader(swh_storage):
     loader = DummyDVCSLoader(swh_storage)
     result = loader.load()
     assert result == {"status": "eventful"}
 
 
 def test_dvcs_loader_with_config(swh_storage):
     loader = DummyDVCSLoader(swh_storage, "another-logger")
     result = loader.load()
     assert result == {"status": "eventful"}
 
 
 def test_loader_logger_default_name(swh_storage):
     loader = DummyBaseLoader(swh_storage)
     assert isinstance(loader.log, logging.Logger)
     assert loader.log.name == "swh.loader.core.tests.test_loader.DummyBaseLoader"
 
     loader = DummyDVCSLoader(swh_storage)
     assert isinstance(loader.log, logging.Logger)
     assert loader.log.name == "swh.loader.core.tests.test_loader.DummyDVCSLoader"
 
 
 def test_loader_logger_with_name(swh_storage):
     loader = DummyBaseLoader(swh_storage, "some.logger.name")
     assert isinstance(loader.log, logging.Logger)
     assert loader.log.name == "some.logger.name"
 
 
 def test_loader_save_data_path(swh_storage, tmp_path):
     loader = DummyBaseLoader(swh_storage, "some.logger.name.1", save_data_path=tmp_path)
     url = "http://bitbucket.org/something"
     loader.origin = Origin(url=url)
     loader.visit_date = datetime.datetime(year=2019, month=10, day=1)
 
     hash_url = hashlib.sha1(url.encode("utf-8")).hexdigest()
     expected_save_path = "%s/sha1:%s/%s/2019" % (str(tmp_path), hash_url[0:2], hash_url)
 
     save_path = loader.get_save_data_path()
     assert save_path == expected_save_path
 
 
 def _check_load_failure(
     caplog, loader, exc_class, exc_text, status="partial", origin=ORIGIN
 ):
     """Check whether a failed load properly logged its exception, and that the
     snapshot didn't get referenced in storage"""
     assert isinstance(loader, (DVCSLoader, ContentLoader, DirectoryLoader))
     for record in caplog.records:
         if record.levelname != "ERROR":
             continue
         assert "Loading failure" in record.message
         assert record.exc_info
         exc = record.exc_info[1]
         assert isinstance(exc, exc_class)
         assert exc_text in exc.args[0]
 
     if isinstance(loader, DVCSLoader):
         # Check that the get_snapshot operation would have succeeded
         assert loader.get_snapshot() is not None
 
     # And confirm that the visit doesn't reference a snapshot
     visit = assert_last_visit_matches(loader.storage, origin.url, status)
     if status != "partial":
         assert visit.snapshot is None
         # But that the snapshot didn't get loaded
         assert loader.loaded_snapshot_id is None
 
 
 @pytest.mark.parametrize("success", [True, False])
 def test_loader_timings(swh_storage, mocker, success):
     current_time = time.time()
     mocker.patch("time.monotonic", side_effect=lambda: current_time)
     mocker.patch("swh.core.statsd.monotonic", side_effect=lambda: current_time)
 
     runtimes = {
         "pre_cleanup": 2.0,
         "build_extrinsic_origin_metadata": 3.0,
         "prepare": 5.0,
         "fetch_data": 7.0,
         "process_data": 11.0,
         "store_data": 13.0,
         "post_load": 17.0,
         "flush": 23.0,
         "cleanup": 27.0,
     }
 
     class TimedLoader(BaseLoader):
         visit_type = "my-visit-type"
 
         def __getattribute__(self, method_name):
             if method_name == "visit_status" and not success:
 
                 def crashy():
                     raise Exception("oh no")
 
                 return crashy
 
             if method_name not in runtimes:
                 return super().__getattribute__(method_name)
 
             def meth(*args, **kwargs):
                 nonlocal current_time
                 current_time += runtimes[method_name]
 
             return meth
 
     loader = TimedLoader(swh_storage, origin_url="http://example.org/hello.git")
     statsd_report = mocker.patch.object(loader.statsd, "_report")
     loader.load()
 
     if success:
         expected_tags = {
             "post_load": {"success": True, "status": "full"},
             "flush": {"success": True, "status": "full"},
             "cleanup": {"success": True, "status": "full"},
         }
     else:
         expected_tags = {
             "post_load": {"success": False, "status": "failed"},
             "flush": {"success": False, "status": "failed"},
             "cleanup": {"success": False, "status": "failed"},
         }
 
     # note that this is a list equality, so order of entries in 'runtimes' matters.
     # This is not perfect, but call() objects are not hashable so it's simpler this way,
     # even if not perfect.
     assert statsd_report.mock_calls == [
         call(
             "operation_duration_seconds",
             "ms",
             value * 1000,
             {"operation": key, **expected_tags.get(key, {})},
             1,
         )
         for (key, value) in runtimes.items()
     ]
     assert loader.statsd.namespace == "swh_loader"
     assert loader.statsd.constant_tags == {"visit_type": "my-visit-type"}
 
 
 class DummyDVCSLoaderExc(DummyDVCSLoader):
     """A loader which raises an exception when loading some contents"""
 
     def get_contents(self):
         raise RuntimeError("Failed to get contents!")
 
 
 def test_dvcs_loader_exc_partial_visit(swh_storage, caplog):
     logger_name = "dvcsloaderexc"
     caplog.set_level(logging.ERROR, logger=logger_name)
 
     loader = DummyDVCSLoaderExc(swh_storage, logging_class=logger_name)
     # fake the loading ending up in a snapshot
     loader.loaded_snapshot_id = hash_to_bytes(
         "9e4dd2b40d1b46b70917c0949aa2195c823a648e"
     )
     result = loader.load()
 
     # loading failed
     assert result == {"status": "failed"}
 
     # still resulted in a partial visit with a snapshot (somehow)
     _check_load_failure(
         caplog,
         loader,
         RuntimeError,
         "Failed to get contents!",
     )
 
 
 class BrokenStorageProxy:
     def __init__(self, storage):
         self.storage = storage
 
     def __getattr__(self, attr):
         return getattr(self.storage, attr)
 
     def snapshot_add(self, snapshots):
         raise RuntimeError("Failed to add snapshot!")
 
 
 class DummyDVCSLoaderStorageExc(DummyDVCSLoader):
     """A loader which raises an exception when loading some contents"""
 
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
         self.storage = BrokenStorageProxy(self.storage)
 
 
 def test_dvcs_loader_storage_exc_failed_visit(swh_storage, caplog):
     logger_name = "dvcsloaderexc"
     caplog.set_level(logging.ERROR, logger=logger_name)
 
     loader = DummyDVCSLoaderStorageExc(swh_storage, logging_class=logger_name)
     result = loader.load()
 
     assert result == {"status": "failed"}
 
     _check_load_failure(
         caplog, loader, RuntimeError, "Failed to add snapshot!", status="failed"
     )
 
 
 class DummyDVCSLoaderNotFound(DummyDVCSLoader, BaseLoader):
     """A loader which raises a not_found exception during the prepare method call"""
 
     def prepare(*args, **kwargs):
         raise NotFound("Unknown origin!")
 
     def load_status(self):
         return {
             "status": "uneventful",
         }
 
 
 def test_loader_not_found(swh_storage, caplog):
     loader = DummyDVCSLoaderNotFound(swh_storage)
     result = loader.load()
 
     assert result == {"status": "uneventful"}
 
     _check_load_failure(caplog, loader, NotFound, "Unknown origin!", status="not_found")
 
 
 class DummyLoaderWithError(DummyBaseLoader):
     def prepare(self, *args, **kwargs):
         raise Exception("error")
 
 
 class DummyDVCSLoaderWithError(DummyDVCSLoader, BaseLoader):
     def prepare(self, *args, **kwargs):
         raise Exception("error")
 
 
 @pytest.mark.parametrize("loader_cls", [DummyLoaderWithError, DummyDVCSLoaderWithError])
 def test_loader_sentry_tags_on_error(swh_storage, sentry_events, loader_cls):
     loader = loader_cls(swh_storage)
     loader.load()
     sentry_tags = sentry_events[0]["tags"]
     assert sentry_tags.get(SENTRY_ORIGIN_URL_TAG_NAME) == ORIGIN.url
     assert sentry_tags.get(SENTRY_VISIT_TYPE_TAG_NAME) == DummyLoader.visit_type
 
 
 CONTENT_MIRROR = "https://common-lisp.net"
 CONTENT_URL = f"{CONTENT_MIRROR}/project/asdf/archives/asdf-3.3.5.lisp"
 
 
 @pytest.fixture
 def content_path(datadir):
     """Return filepath fetched by ContentLoader test runs."""
     return os.path.join(
         datadir, "https_common-lisp.net", "project_asdf_archives_asdf-3.3.5.lisp"
     )
 
 
-def compute_hashes(
-    filepath: str, cksum_algos: Union[str, List[str]] = "sha256"
-) -> Dict[str, str]:
-    """Compute checksums dict out of a filepath"""
-    checksum_algos = {cksum_algos} if isinstance(cksum_algos, str) else set(cksum_algos)
-    return MultiHash.from_path(filepath, hash_names=checksum_algos).hexdigest()
-
-
 def test_content_loader_missing_field(swh_storage):
     """It should raise if the ContentLoader is missing checksums field"""
     origin = Origin(CONTENT_URL)
     with pytest.raises(TypeError, match="missing"):
         ContentLoader(swh_storage, origin.url)
 
 
+@pytest.mark.parametrize("loader_class", [ContentLoader, DirectoryLoader])
+def test_node_loader_unsupported_checksums_computation(swh_storage, loader_class):
+    """It should raise when an unsupported "checksums_computation" is requested"""
+    with pytest.raises(UnsupportedChecksumComputation):
+        loader_class(
+            swh_storage,
+            CONTENT_URL,
+            checksums={"sha256": "irrelevant-for-that-test"},
+            checksums_computation="unsupported",
+        )
+
+
 def test_content_loader_404(caplog, swh_storage, requests_mock_datadir, content_path):
     """It should not ingest origin when there is no file to be found (no mirror url)"""
     unknown_origin = Origin(f"{CONTENT_MIRROR}/project/asdf/archives/unknown.lisp")
     loader = ContentLoader(
         swh_storage,
         unknown_origin.url,
         checksums=compute_hashes(content_path),
     )
     result = loader.load()
 
     assert result == {"status": "uneventful"}
 
     _check_load_failure(
         caplog,
         loader,
         NotFound,
         "Unknown origin",
         status="not_found",
         origin=unknown_origin,
     )
 
 
 def test_content_loader_404_with_fallback(
     caplog, swh_storage, requests_mock_datadir, content_path
 ):
     """It should not ingest origin when there is no file to be found"""
     unknown_origin = Origin(f"{CONTENT_MIRROR}/project/asdf/archives/unknown.lisp")
     fallback_url_ko = f"{CONTENT_MIRROR}/project/asdf/archives/unknown2.lisp"
     loader = ContentLoader(
         swh_storage,
         unknown_origin.url,
         fallback_urls=[fallback_url_ko],
         checksums=compute_hashes(content_path),
     )
     result = loader.load()
 
     assert result == {"status": "uneventful"}
 
     _check_load_failure(
         caplog,
         loader,
         NotFound,
         "Unknown origin",
         status="not_found",
         origin=unknown_origin,
     )
 
 
 @pytest.mark.parametrize("checksum_algo", ["sha1", "sha256", "sha512"])
 def test_content_loader_ok_with_fallback(
     checksum_algo,
     caplog,
     swh_storage,
     requests_mock_datadir,
     content_path,
 ):
     """It should be an eventful visit even when ingesting through mirror url"""
     dead_origin = Origin(f"{CONTENT_MIRROR}/dead-origin-url")
     fallback_url_ok = CONTENT_URL
     fallback_url_ko = f"{CONTENT_MIRROR}/project/asdf/archives/unknown2.lisp"
 
     loader = ContentLoader(
         swh_storage,
         dead_origin.url,
         fallback_urls=[fallback_url_ok, fallback_url_ko],
         checksums=compute_hashes(content_path, checksum_algo),
     )
     result = loader.load()
 
     assert result == {"status": "eventful"}
 
 
 def test_content_loader_ok_simple(swh_storage, requests_mock_datadir, content_path):
     """It should be an eventful visit on a new file, then uneventful"""
     origin = Origin(CONTENT_URL)
     loader = ContentLoader(
         swh_storage,
         origin.url,
         checksums=compute_hashes(content_path, ["sha1", "sha256", "sha512"]),
     )
     result = loader.load()
 
     assert result == {"status": "eventful"}
 
     visit_status = assert_last_visit_matches(
         swh_storage, origin.url, status="full", type="content"
     )
     assert visit_status.snapshot is not None
 
     result2 = loader.load()
 
     assert result2 == {"status": "uneventful"}
 
 
 DIRECTORY_MIRROR = "https://example.org"
 DIRECTORY_URL = f"{DIRECTORY_MIRROR}/archives/dummy-hello.tar.gz"
 
 
-@pytest.fixture
-def tarball_path(datadir):
-    """Return tarball filepath fetched by DirectoryLoader test runs."""
-    return os.path.join(datadir, "https_example.org", "archives_dummy-hello.tar.gz")
-
-
 def test_directory_loader_missing_field(swh_storage):
     """It should raise if the DirectoryLoader is missing checksums field"""
     origin = Origin(DIRECTORY_URL)
     with pytest.raises(TypeError, match="missing"):
         DirectoryLoader(swh_storage, origin.url)
 
 
 def test_directory_loader_404(caplog, swh_storage, requests_mock_datadir, tarball_path):
     """It should not ingest origin when there is no tarball to be found (no mirrors)"""
     unknown_origin = Origin(f"{DIRECTORY_MIRROR}/archives/unknown.tar.gz")
     loader = DirectoryLoader(
         swh_storage,
         unknown_origin.url,
         checksums=compute_hashes(tarball_path),
     )
     result = loader.load()
 
     assert result == {"status": "uneventful"}
 
     _check_load_failure(
         caplog,
         loader,
         NotFound,
         "Unknown origin",
         status="not_found",
         origin=unknown_origin,
     )
 
 
 def test_directory_loader_404_with_fallback(
     caplog, swh_storage, requests_mock_datadir, tarball_path
 ):
     """It should not ingest origin when there is no tarball to be found"""
     unknown_origin = Origin(f"{DIRECTORY_MIRROR}/archives/unknown.tbz2")
     fallback_url_ko = f"{DIRECTORY_MIRROR}/archives/elsewhere-unknown2.tbz2"
     loader = DirectoryLoader(
         swh_storage,
         unknown_origin.url,
         fallback_urls=[fallback_url_ko],
         checksums=compute_hashes(tarball_path),
     )
     result = loader.load()
 
     assert result == {"status": "uneventful"}
 
     _check_load_failure(
         caplog,
         loader,
         NotFound,
         "Unknown origin",
         status="not_found",
         origin=unknown_origin,
     )
 
 
 def test_directory_loader_404_with_integrity_check_failure(
-    caplog, swh_storage, requests_mock_datadir, tarball_path
+    caplog, swh_storage, requests_mock_datadir, tarball_with_std_hashes
 ):
     """It should not ingest tarball with mismatched checksum"""
+    tarball_path, checksums = tarball_with_std_hashes
+
     origin = Origin(DIRECTORY_URL)
     erratic_checksums = {
         algo: chksum.replace("a", "e")  # alter checksums to fail integrity check
-        for algo, chksum in compute_hashes(tarball_path).items()
+        for algo, chksum in checksums.items()
     }
 
     loader = DirectoryLoader(
         swh_storage,
         origin.url,
         checksums=erratic_checksums,  # making the integrity check fail
     )
     result = loader.load()
 
     assert result == {"status": "uneventful"}
 
     _check_load_failure(
         caplog,
         loader,
         NotFound,
         "Unknown origin",
         status="not_found",
         origin=origin,
     )
 
 
 @pytest.mark.parametrize("checksum_algo", ["sha1", "sha256", "sha512"])
 def test_directory_loader_ok_with_fallback(
-    caplog, swh_storage, requests_mock_datadir, tarball_path, checksum_algo
+    caplog, swh_storage, requests_mock_datadir, tarball_with_std_hashes, checksum_algo
 ):
     """It should be an eventful visit even when ingesting through mirror url"""
+    tarball_path, checksums = tarball_with_std_hashes
+
     dead_origin = Origin(f"{DIRECTORY_MIRROR}/dead-origin-url")
     fallback_url_ok = DIRECTORY_URL
     fallback_url_ko = f"{DIRECTORY_MIRROR}/archives/unknown2.tgz"
 
     loader = DirectoryLoader(
         swh_storage,
         dead_origin.url,
         fallback_urls=[fallback_url_ok, fallback_url_ko],
-        checksums=compute_hashes(tarball_path, checksum_algo),
+        checksums={checksum_algo: checksums[checksum_algo]},
     )
     result = loader.load()
 
     assert result == {"status": "eventful"}
 
 
-def test_directory_loader_ok_simple(swh_storage, requests_mock_datadir, tarball_path):
+def test_directory_loader_ok_simple(
+    swh_storage, requests_mock_datadir, tarball_with_std_hashes
+):
     """It should be an eventful visit on a new tarball, then uneventful"""
     origin = Origin(DIRECTORY_URL)
+    tarball_path, checksums = tarball_with_std_hashes
+    loader = DirectoryLoader(
+        swh_storage,
+        origin.url,
+        checksums=checksums,
+    )
+    result = loader.load()
+
+    assert result == {"status": "eventful"}
+
+    visit_status = assert_last_visit_matches(
+        swh_storage, origin.url, status="full", type="directory"
+    )
+    assert visit_status.snapshot is not None
+
+    result2 = loader.load()
+
+    assert result2 == {"status": "uneventful"}
+
+
+@pytest.mark.skipif(nix_store_missing, reason="requires nix-bin installed (bullseye)")
+def test_directory_loader_ok_with_nar(
+    swh_storage, requests_mock_datadir, tarball_with_nar_hashes
+):
+    """It should be an eventful visit on a tarball with nar hashes, then uneventful"""
+    tarball_path, nar_checksums = tarball_with_nar_hashes
+    origin = Origin(DIRECTORY_URL)
+
     loader = DirectoryLoader(
         swh_storage,
         origin.url,
-        checksums=compute_hashes(tarball_path, ["sha1", "sha256", "sha512"]),
+        checksums=nar_checksums,
+        checksums_computation="nar",
     )
     result = loader.load()
 
     assert result == {"status": "eventful"}
 
     visit_status = assert_last_visit_matches(
         swh_storage, origin.url, status="full", type="directory"
     )
     assert visit_status.snapshot is not None
 
     result2 = loader.load()
 
     assert result2 == {"status": "uneventful"}
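
Note: the tests above rely on `tarball_with_std_hashes`, `tarball_with_nar_hashes` and
`nix_store_missing`, which are expected to live in the test suite's conftest and are not
part of this hunk. A minimal sketch of what the standard-hashes fixture could look like
(the fixture body and datadir layout are assumptions; only `MultiHash.from_path` and the
`datadir` fixture appear elsewhere in this patch):

import os
import shutil

import pytest

from swh.model.hashutil import MultiHash

# Assumed conftest flag: skip NAR-related tests when the nix-store binary is absent.
nix_store_missing = shutil.which("nix-store") is None


@pytest.fixture
def tarball_with_std_hashes(datadir):
    # Hypothetical fixture pairing the sample tarball with its standard checksums,
    # replacing the inline compute_hashes() calls removed by this patch.
    tarball_path = os.path.join(
        datadir, "https_example.org", "archives_dummy-hello.tar.gz"
    )
    checksums = MultiHash.from_path(
        tarball_path, hash_names=["sha1", "sha256", "sha512"]
    ).hexdigest()
    return tarball_path, checksums
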
diff --git a/swh/loader/core/tests/test_utils.py b/swh/loader/core/tests/test_utils.py
index 28d6c21..e2c9979 100644
--- a/swh/loader/core/tests/test_utils.py
+++ b/swh/loader/core/tests/test_utils.py
@@ -1,187 +1,218 @@
-# Copyright (C) 2019  The Software Heritage developers
+# Copyright (C) 2019-2022  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from datetime import datetime
 import os
+from pathlib import Path
 import signal
+import tempfile
 from time import sleep
 from unittest.mock import patch
 
 import pytest
 
+from swh.core.tarball import uncompress
 from swh.loader.core.utils import (
     CloneFailure,
     CloneTimeout,
     clean_dangling_folders,
     clone_with_timeout,
+    nix_hashes,
     parse_visit_date,
 )
+from swh.loader.exception import MissingOptionalDependency
+
+from .conftest import nix_store_missing
 
 
 def prepare_arborescence_from(tmpdir, folder_names):
     """Prepare arborescence tree with folders
 
     Args:
         tmpdir (Either[LocalPath, str]): Root temporary directory
         folder_names (List[str]): List of folder names
 
     Returns:
         List of folders
     """
     dangling_folders = []
     for dname in folder_names:
         d = str(tmpdir / dname)
         os.mkdir(d)
         dangling_folders.append(d)
     return str(tmpdir), dangling_folders
 
 
 def assert_dirs(actual_dirs, expected_dirs):
     """Assert that the directory actual and expected match"""
     for d in actual_dirs:
         assert d in expected_dirs
     assert len(actual_dirs) == len(expected_dirs)
 
 
 def test_clean_dangling_folders_0(tmpdir):
     """Folder does not exist, do nothing"""
     r = clean_dangling_folders("/path/does/not/exist", "unused-pattern")
     assert r is None
 
 
 @patch("swh.loader.core.utils.psutil.pid_exists", return_value=False)
 def test_clean_dangling_folders_1(mock_pid_exists, tmpdir):
     """Folder which matches pattern with dead pid are cleaned up"""
     rootpath, dangling = prepare_arborescence_from(
         tmpdir,
         [
             "something",
             "swh.loader.svn-4321.noisynoise",
         ],
     )
 
     clean_dangling_folders(rootpath, "swh.loader.svn")
 
     actual_dirs = os.listdir(rootpath)
     mock_pid_exists.assert_called_once_with(4321)
     assert_dirs(actual_dirs, ["something"])
 
 
 @patch("swh.loader.core.utils.psutil.pid_exists", return_value=True)
 def test_clean_dangling_folders_2(mock_pid_exists, tmpdir):
     """Folder which matches pattern with live pid are skipped"""
     rootpath, dangling = prepare_arborescence_from(
         tmpdir,
         [
             "something",
             "swh.loader.hg-1234.noisynoise",
         ],
     )
 
     clean_dangling_folders(rootpath, "swh.loader.hg")
 
     actual_dirs = os.listdir(rootpath)
     mock_pid_exists.assert_called_once_with(1234)
     assert_dirs(
         actual_dirs,
         [
             "something",
             "swh.loader.hg-1234.noisynoise",
         ],
     )
 
 
 @patch("swh.loader.core.utils.psutil.pid_exists", return_value=False)
 @patch(
     "swh.loader.core.utils.shutil.rmtree",
     side_effect=ValueError("Could not remove for reasons"),
 )
 def test_clean_dangling_folders_3(mock_rmtree, mock_pid_exists, tmpdir):
     """Error in trying to clean dangling folders are skipped"""
     path1 = "thingy"
     path2 = "swh.loader.git-1468.noisy"
     rootpath, dangling = prepare_arborescence_from(
         tmpdir,
         [
             path1,
             path2,
         ],
     )
 
     clean_dangling_folders(rootpath, "swh.loader.git")
 
     actual_dirs = os.listdir(rootpath)
     mock_pid_exists.assert_called_once_with(1468)
     mock_rmtree.assert_called_once_with(os.path.join(rootpath, path2))
     assert_dirs(actual_dirs, [path2, path1])
 
 
 def test_clone_with_timeout_no_error_no_timeout():
     def succeed():
         """This does nothing to simulate a successful clone"""
 
     clone_with_timeout("foo", "bar", succeed, timeout=0.5)
 
 
 def test_clone_with_timeout_no_error_timeout():
     def slow():
         """This lasts for more than the timeout"""
         sleep(1)
 
     with pytest.raises(CloneTimeout):
         clone_with_timeout("foo", "bar", slow, timeout=0.5)
 
 
 def test_clone_with_timeout_error():
     def raise_something():
         raise RuntimeError("panic!")
 
     with pytest.raises(CloneFailure):
         clone_with_timeout("foo", "bar", raise_something, timeout=0.5)
 
 
 def test_clone_with_timeout_sigkill():
     """This also tests that the traceback is useful"""
     src = "https://www.mercurial-scm.org/repo/hello"
     dest = "/dev/null"
     timeout = 0.5
     sleepy_time = 100 * timeout
     assert sleepy_time > timeout
 
     def ignores_sigterm(*args, **kwargs):
         # ignore SIGTERM to force sigkill
         signal.signal(signal.SIGTERM, lambda signum, frame: None)
         sleep(sleepy_time)  # we make sure we exceed the timeout
 
     with pytest.raises(CloneTimeout) as e:
         clone_with_timeout(src, dest, ignores_sigterm, timeout)
     killed = True
     assert e.value.args == (src, timeout, killed)
 
 
 VISIT_DATE_STR = "2021-02-17 15:50:04.518963"
 VISIT_DATE = datetime(2021, 2, 17, 15, 50, 4, 518963)
 
 
 @pytest.mark.parametrize(
     "input_visit_date,expected_date",
     [
         (None, None),
         (VISIT_DATE, VISIT_DATE),
         (VISIT_DATE_STR, VISIT_DATE),
     ],
 )
 def test_utils_parse_visit_date(input_visit_date, expected_date):
     assert parse_visit_date(input_visit_date) == expected_date
 
 
 def test_utils_parse_visit_date_now():
     actual_date = parse_visit_date("now")
     assert isinstance(actual_date, datetime)
 
 
 def test_utils_parse_visit_date_fails():
     with pytest.raises(ValueError, match="invalid"):
         parse_visit_date(10)  # not a string nor a date
+
+
+@patch(
+    "swh.loader.core.utils.shutil.which",
+    return_value=None,
+)
+def test_nix_hashes_missing_nix_store(mock_which):
+    with pytest.raises(MissingOptionalDependency, match="nix-store"):
+        nix_hashes("some-irrelevant-filepath", ["sha1"])
+
+
+@pytest.mark.skipif(nix_store_missing, reason="requires nix-bin installed (bullseye)")
+def test_nix_hashes_compute(tarball_with_nar_hashes):
+    tarball_path, nar_checksums = tarball_with_nar_hashes
+
+    with tempfile.TemporaryDirectory() as tmpdir:
+        directory_path = Path(tmpdir) / "src"
+        directory_path.mkdir(parents=True, exist_ok=True)
+        uncompress(tarball_path, dest=str(directory_path))
+        directory = next(directory_path.iterdir())
+
+        actual_multihash = nix_hashes(directory, nar_checksums.keys())
+
+        assert actual_multihash.hexdigest() == nar_checksums
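
The `tarball_with_nar_hashes` fixture used above is likewise defined in conftest and not
shown here; a rough sketch under the same assumptions (reusing the hypothetical
`tarball_with_std_hashes` from the earlier note) could be:

import tempfile
from pathlib import Path

import pytest

from swh.core.tarball import uncompress
from swh.loader.core.utils import nix_hashes


@pytest.fixture
def tarball_with_nar_hashes(tarball_with_std_hashes):
    # Hypothetical fixture: uncompress the tarball and compute NAR hashes over the
    # resulting tree, the same way test_nix_hashes_compute verifies them.
    tarball_path, _ = tarball_with_std_hashes
    with tempfile.TemporaryDirectory() as tmpdir:
        directory_path = Path(tmpdir) / "src"
        directory_path.mkdir(parents=True, exist_ok=True)
        uncompress(tarball_path, dest=str(directory_path))
        directory = next(directory_path.iterdir())
        checksums = nix_hashes(directory, ["sha256"]).hexdigest()
    return tarball_path, checksums
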
diff --git a/swh/loader/core/utils.py b/swh/loader/core/utils.py
index 0e9b388..77e8b7c 100644
--- a/swh/loader/core/utils.py
+++ b/swh/loader/core/utils.py
@@ -1,127 +1,154 @@
 # Copyright (C) 2018-2022  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 
 from datetime import datetime, timezone
 import io
 import os
+from pathlib import Path
 import shutil
 import signal
+from subprocess import PIPE, Popen
 import time
 import traceback
-from typing import Callable, Optional, Union
+from typing import Callable, Iterable, Optional, Union
 
 from billiard import Process, Queue  # type: ignore
 from dateutil.parser import parse
 import psutil
 
+from swh.loader.exception import MissingOptionalDependency
+from swh.model.hashutil import MultiHash
+
 
 def clean_dangling_folders(dirpath: str, pattern_check: str, log=None) -> None:
     """Clean up potential dangling temporary working folder rooted at `dirpath`. Those
        folders must match a dedicated pattern and not belonging to a live pid.
 
     Args:
         dirpath: Path to check for dangling files
         pattern_check: A dedicated pattern to check on first level directory (e.g
             `swh.loader.mercurial.`, `swh.loader.svn.`)
         log (Logger): Optional logger
 
     """
     if not os.path.exists(dirpath):
         return
     for filename in os.listdir(dirpath):
         path_to_cleanup = os.path.join(dirpath, filename)
         try:
             # pattern: `swh.loader.{loader-type}-pid.{noise}`
             if (
                 pattern_check not in filename or "-" not in filename
             ):  # silently ignore unknown patterns
                 continue
             _, pid_ = filename.split("-")
             pid = int(pid_.split(".")[0])
             if psutil.pid_exists(pid):
                 if log:
                     log.debug("PID %s is live, skipping", pid)
                 continue
             # could be removed concurrently, so check before removal
             if os.path.exists(path_to_cleanup):
                 shutil.rmtree(path_to_cleanup)
         except Exception as e:
             if log:
                 log.warn("Fail to clean dangling path %s: %s", path_to_cleanup, e)
 
 
 class CloneTimeout(Exception):
     pass
 
 
 class CloneFailure(Exception):
     pass
 
 
 def _clone_task(clone_func: Callable[[], None], errors: Queue) -> None:
     try:
         clone_func()
     except Exception as e:
         exc_buffer = io.StringIO()
         traceback.print_exc(file=exc_buffer)
         errors.put_nowait(exc_buffer.getvalue())
         raise e
 
 
 def clone_with_timeout(
     src: str, dest: str, clone_func: Callable[[], None], timeout: float
 ) -> None:
     """Clone a repository with timeout.
 
     Args:
         src: clone source
         dest: clone destination
         clone_func: callable that does the actual cloning
         timeout: timeout in seconds
     """
     errors: Queue = Queue()
     process = Process(target=_clone_task, args=(clone_func, errors))
     process.start()
     process.join(timeout)
 
     if process.is_alive():
         process.terminate()
         # Give it literally a second (in successive steps of 0.1 second),
         # then kill it.
         # Can't use `process.join(1)` here, billiard appears to be bugged
         # https://github.com/celery/billiard/issues/270
         killed = False
         for _ in range(10):
             time.sleep(0.1)
             if not process.is_alive():
                 break
         else:
             killed = True
             os.kill(process.pid, signal.SIGKILL)
         raise CloneTimeout(src, timeout, killed)
 
     if not errors.empty():
         raise CloneFailure(src, dest, errors.get())
 
 
 def parse_visit_date(visit_date: Optional[Union[datetime, str]]) -> Optional[datetime]:
     """Convert visit date from either None, a string or a datetime to either None or
     datetime.
 
     """
     if visit_date is None:
         return None
 
     if isinstance(visit_date, datetime):
         return visit_date
 
     if visit_date == "now":
         return datetime.now(tz=timezone.utc)
 
     if isinstance(visit_date, str):
         return parse(visit_date)
 
     raise ValueError(f"invalid visit date {visit_date!r}")
+
+
+def nix_hashes(filepath: Path, hash_names: Iterable[str]) -> MultiHash:
+    """Compute nix-store hashes on filepath.
+
+    Raises:
+        FileNotFoundError in case the nix-store command is not available on the system.
+
+    """
+    NIX_STORE = shutil.which("nix-store")
+    if NIX_STORE is None:
+        raise MissingOptionalDependency("nix-store")
+
+    multi_hash = MultiHash(hash_names=hash_names)
+
+    command = [NIX_STORE, "--dump", str(filepath)]
+    with Popen(command, stdout=PIPE) as proc:
+        assert proc.stdout is not None
+        for chunk in proc.stdout:
+            multi_hash.update(chunk)
+
+    return multi_hash
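
For illustration, a minimal call sketch for `nix_hashes` (the path and algorithm are
arbitrary; this assumes nix-bin is installed on the system):

from pathlib import Path

from swh.loader.core.utils import nix_hashes

# hexdigest() returns {hash_name: hex string}, one entry per requested algorithm.
nar_checksums = nix_hashes(Path("/tmp/unpacked-source-tree"), ["sha256"]).hexdigest()
print(nar_checksums["sha256"])
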
diff --git a/swh/loader/exception.py b/swh/loader/exception.py
index 6a77fc9..3fd396d 100644
--- a/swh/loader/exception.py
+++ b/swh/loader/exception.py
@@ -1,13 +1,25 @@
-# Copyright (C) 2021  The Software Heritage developers
+# Copyright (C) 2021-2022  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 
 class NotFound(ValueError):
     """An exception raised when some information to retrieve is not found (e.g origin,
     artifact, ...)
 
     """
 
     pass
+
+
+class MissingOptionalDependency(ValueError):
+    """An exception raised when an optional runtime dependency is missing."""
+
+    pass
+
+
+class UnsupportedChecksumComputation(ValueError):
+    """An exception raised when loader cannot compute such checksums."""
+
+    pass
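
As a hedged sketch only (the real validation lives in the loader classes in
swh/loader/core/loader.py, not shown in this section), a loader constructor could reject
unsupported schemes along these lines; the "standard"/"nar" pair is an assumption drawn
from the tests above:

from swh.loader.exception import UnsupportedChecksumComputation

# Assumed supported schemes; "nar" appears in the tests above, "standard" is a guess.
SUPPORTED_CHECKSUMS_COMPUTATIONS = ("standard", "nar")


def check_checksums_computation(checksums_computation: str) -> None:
    """Illustrative helper: fail fast on an unsupported checksums_computation value."""
    if checksums_computation not in SUPPORTED_CHECKSUMS_COMPUTATIONS:
        raise UnsupportedChecksumComputation(
            f"Unsupported checksums computation: {checksums_computation!r}"
        )
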