diff --git a/swh/loader/mercurial/from_disk.py b/swh/loader/mercurial/from_disk.py
--- a/swh/loader/mercurial/from_disk.py
+++ b/swh/loader/mercurial/from_disk.py
@@ -8,7 +8,7 @@
 import os
 from shutil import rmtree
 from tempfile import mkdtemp
-from typing import Deque, Dict, List, Optional, Set, Tuple, TypeVar, Union
+from typing import Deque, Dict, Iterator, List, Optional, Set, Tuple, TypeVar, Union
 
 from swh.loader.core.loader import BaseLoader
 from swh.loader.core.utils import clean_dangling_folders
@@ -36,7 +36,7 @@
 
 from . import hgutil
 from .archive_extract import tmp_extract
-from .hgutil import HgFilteredSet, HgNodeId, HgSpanSet
+from .hgutil import HgNodeId
 
 FLAG_PERMS = {
     b"l": DentryPerms.symlink,
@@ -289,6 +289,28 @@
         ]
         return extids
 
+    def _get_extids_for_hgnodes(self, hgnode_ids: List[bytes]) -> List[ExtID]:
+        """Get all Mercurial ExtIDs for the mercurial nodes in the list which point to
+        a known revision.
+
+        """
+        extids = [
+            extid
+            for extid in self.storage.extid_get_from_extid(EXTID_TYPE, hgnode_ids)
+            if extid.extid_version == EXTID_VERSION
+        ]
+        if extids:
+            # Filter out dangling extids, we need to load their target again
+            revisions_missing = self.storage.revision_missing(
+                [extid.target.object_id for extid in extids]
+            )
+            extids = [
+                extid
+                for extid in extids
+                if extid.target.object_id not in revisions_missing
+            ]
+        return extids
+
     def fetch_data(self) -> bool:
         """Fetch the data from the source the loader is currently loading
 
@@ -317,35 +339,70 @@
 
         return False
 
-    def get_hg_revs_to_load(self) -> Union[HgFilteredSet, HgSpanSet]:
-        """Return the hg revision numbers to load."""
+    def _new_revs(self, heads: List[bytes]):
+        """Return the revisions which are neither one of ``heads`` nor an ancestor
+        of one of them. Nodes of ``heads`` missing from the repository are ignored,
+        so every revision is returned when none of ``heads`` is found."""
+        assert self._repo is not None
+        existing_heads = []
+
+        for hg_nodeid in heads:
+            try:
+                rev = self._repo[hg_nodeid].rev()
+                existing_heads.append(rev)
+            except KeyError:  # the node does not exist anymore
+                pass
+
+        # select revisions that are not ancestors of heads
+        # and not the heads themselves
+        new_revs = self._repo.revs("not ::(%ld)", existing_heads)
+
+        if new_revs:
+            self.log.info("New revisions found: %d", len(new_revs))
+
+        return new_revs
+
+    def get_hg_revs_to_load(self) -> Iterator[int]:
+        """Yield hg revision numbers to load.
+
+        Each revision is yielded at most once, in two steps:
+
+        1. if a previous snapshot exists, the revisions which are neither heads of
+           that snapshot nor ancestors of them;
+        2. among the revisions which are neither yielded in step 1 nor ancestors
+           of them, the ones which are neither nodes already archived (according to
+           the ExtIDs shared across hg origins) nor ancestors of such nodes.
+        """
         assert self._repo is not None
         repo: hgutil.Repository = self._repo
+
+        seen_revs: Set[int] = set()
+        # 1. use snapshot to reuse existing seen heads from it
         if self._latest_heads:
-            existing_heads = []  # heads that still exist in the repository
-            for hg_nodeid in self._latest_heads:
-                try:
-                    rev = repo[hg_nodeid].rev()
-                    existing_heads.append(rev)
-                except KeyError:  # the node does not exist anymore
-                    pass
-
-            # select revisions that are not ancestors of heads
-            # and not the heads themselves
-            new_revs = repo.revs("not ::(%ld)", existing_heads)
-
-            if new_revs:
-                self.log.info("New revisions found: %d", len(new_revs))
-            return new_revs
-        else:
-            return repo.revs("all()")
+            for rev in self._new_revs(self._latest_heads):
+                seen_revs.add(rev)
+                yield rev
+
+        # 2. Then filter out remaining revisions through the overall extid mappings
+        # across hg origins
+        revs_left = repo.revs("all() - ::(%ld)", seen_revs)
+        revs_left_set = set(revs_left)
+        hg_nodeids = [repo[rev].node() for rev in revs_left]
+        known_hg_nodeids = [
+            extid.extid for extid in self._get_extids_for_hgnodes(hg_nodeids)
+        ]
+        for rev in self._new_revs(known_hg_nodeids):
+            # Only revisions of revs_left may still need loading: the other ones
+            # were either yielded in step 1, or are heads of the previous snapshot
+            # or their ancestors. This also matters when no node is known, as
+            # _new_revs([]) returns every revision of the repository.
+            if rev in revs_left_set:
+                yield rev
 
     def store_data(self):
         """Store fetched data in the database."""
         revs = self.get_hg_revs_to_load()
-        if not revs:
-            self._load_status = "uneventful"
-            return
+        length_ingested_revs = 0
 
         assert self._repo is not None
         repo = self._repo
@@ -356,14 +413,15 @@
                 continue
             try:
                 self.store_revision(repo[rev])
+                length_ingested_revs += 1
             except CorruptedRevision as e:
                 self._visit_status = "partial"
                 self.log.warning("Corrupted revision %s", e)
                 descendents = repo.revs("(%ld)::", [rev])
                 ignored_revs.update(descendents)
 
-        if len(ignored_revs) == len(revs):
-            # The repository is completely broken, nothing can be loaded
+        if length_ingested_revs == 0:
+            # Nothing was ingested: no new revision, or all of them were corrupted
             self._load_status = "uneventful"
             return
 