diff --git a/swh/loader/mercurial/from_disk.py b/swh/loader/mercurial/from_disk.py
--- a/swh/loader/mercurial/from_disk.py
+++ b/swh/loader/mercurial/from_disk.py
@@ -8,7 +8,7 @@
 import os
 from shutil import rmtree
 from tempfile import mkdtemp
-from typing import Deque, Dict, List, Optional, Set, Tuple, TypeVar, Union
+from typing import Deque, Dict, Iterator, List, Optional, Set, Tuple, TypeVar, Union
 
 from swh.loader.core.loader import BaseLoader
 from swh.loader.core.utils import clean_dangling_folders
@@ -36,7 +36,7 @@
 
 from . import hgutil
 from .archive_extract import tmp_extract
-from .hgutil import HgFilteredSet, HgNodeId, HgSpanSet
+from .hgutil import HgNodeId
 
 FLAG_PERMS = {
     b"l": DentryPerms.symlink,
@@ -289,6 +289,25 @@
         ]
         return extids
 
+    def _get_extids_for_hgnodes(self, hgnode_ids: List[bytes]) -> List[ExtID]:
+        """Get all Mercurial ExtIDs for the mercurial nodes in the list."""
+        extids = [
+            extid
+            for extid in self.storage.extid_get_from_extid(EXTID_TYPE, hgnode_ids)
+            if extid.extid_version == EXTID_VERSION
+        ]
+        if extids:
+            # Filter out dangling extids, we need to load their target again.
+            # A set gives O(1) lookups and is safe if the storage returns an iterator
+            target_ids = [extid.target.object_id for extid in extids]
+            revisions_missing = set(self.storage.revision_missing(target_ids))
+            extids = [
+                extid
+                for extid in extids
+                if extid.target.object_id not in revisions_missing
+            ]
+        return extids
+
     def fetch_data(self) -> bool:
         """Fetch the data from the source the loader is currently loading
 
@@ -317,35 +336,55 @@
 
         return False
 
-    def get_hg_revs_to_load(self) -> Union[HgFilteredSet, HgSpanSet]:
-        """Return the hg revision numbers to load."""
+    def _new_revs(self, heads: List[bytes]):
+        """Return unseen revisions, i.e. the revisions that are neither the given
+        heads nor ancestors of them"""
+        assert self._repo is not None
+        existing_heads = []
+
+        for hg_nodeid in heads:
+            try:
+                rev = self._repo[hg_nodeid].rev()
+                existing_heads.append(rev)
+            except KeyError:  # the node does not exist anymore
+                pass
+
+        # select revisions that are not ancestors of heads
+        # and not the heads themselves
+        new_revs = self._repo.revs("not ::(%ld)", existing_heads)
+
+        if new_revs:
+            self.log.info("New revisions found: %d", len(new_revs))
+
+        return new_revs
+
+    def get_hg_revs_to_load(self) -> Iterator[int]:
+        """Yield hg revision numbers to load.
+
+        """
         assert self._repo is not None
         repo: hgutil.Repository = self._repo
+
+        seen_revs: Set[int] = set()
+        # 1. use snapshot to reuse existing seen heads from it
         if self._latest_heads:
-            existing_heads = []  # heads that still exist in the repository
-            for hg_nodeid in self._latest_heads:
-                try:
-                    rev = repo[hg_nodeid].rev()
-                    existing_heads.append(rev)
-                except KeyError:  # the node does not exist anymore
-                    pass
-
-            # select revisions that are not ancestors of heads
-            # and not the heads themselves
-            new_revs = repo.revs("not ::(%ld)", existing_heads)
-
-            if new_revs:
-                self.log.info("New revisions found: %d", len(new_revs))
-            return new_revs
-        else:
-            return repo.revs("all()")
+            for rev in self._new_revs(self._latest_heads):
+                seen_revs.add(rev)
+                yield rev
+
+        # 2. Then filter out remaining revisions through the overall extid mappings
+        # across hg origins
+        revs_left = repo.revs("all() - ::(%ld)", seen_revs)
+        hg_nodeids = [repo[nodeid].node() for nodeid in revs_left]
+        known_nodeids = [
+            extid.extid for extid in self._get_extids_for_hgnodes(hg_nodeids)
+        ]
+        yield from self._new_revs(known_nodeids)
 
     def store_data(self):
         """Store fetched data in the database."""
         revs = self.get_hg_revs_to_load()
-        if not revs:
-            self._load_status = "uneventful"
-            return
+        length_revs = 0
 
         assert self._repo is not None
         repo = self._repo
@@ -356,13 +395,15 @@
                 continue
             try:
                 self.store_revision(repo[rev])
+                length_revs += 1
             except CorruptedRevision as e:
                 self._visit_status = "partial"
                 self.log.warning("Corrupted revision %s", e)
                 descendents = repo.revs("(%ld)::", [rev])
                 ignored_revs.update(descendents)
 
-        if len(ignored_revs) == len(revs):
-            # The repository is completely broken, nothing can be loaded
+        if length_revs == 0:
+            # Nothing was ingested: either there was nothing new to load or
+            # every candidate revision was corrupted
             self._load_status = "uneventful"
             return
diff --git a/swh/loader/mercurial/tests/test_from_disk.py b/swh/loader/mercurial/tests/test_from_disk.py
--- a/swh/loader/mercurial/tests/test_from_disk.py
+++ b/swh/loader/mercurial/tests/test_from_disk.py
@@ -164,7 +164,7 @@
     check_snapshot(expected_snapshot, loader.storage)
 
     stats = get_stats(loader.storage)
-    assert stats == {
+    expected_stats = {
         "content": 2,
         "directory": 3,
         "origin": 1,
@@ -175,6 +175,20 @@
         "snapshot": 1,
     }
 
+    assert stats == expected_stats
+    loader2 = HgLoaderFromDisk(swh_storage, url=repo_url)
+
+    assert loader2.load() == {"status": "uneventful"}
+
+    stats2 = get_stats(loader2.storage)
+    expected_stats2 = expected_stats.copy()
+    expected_stats2.update(
+        {"origin_visit": 1 + 1,}
+    )
+    assert stats2 == expected_stats2
+    # FIXME: Already seen objects are filtered out, so no new snapshot.
+    # Current behavior but is it ok?
+
 
 # This test has as been adapted from the historical `HgBundle20Loader` tests
 # to ensure compatibility of `HgLoaderFromDisk`.
@@ -670,3 +684,61 @@
 
     loader = HgLoaderFromDisk(swh_storage, repo_path)
     assert loader.load() == {"status": "uneventful"}
+
+
+def test_loader_hg_extid_filtering(swh_storage, datadir, tmp_path):
+    """The first visit of a fork should filter already seen revisions (through extids)
+
+    """
+    archive_name = "the-sandbox"
+    archive_path = os.path.join(datadir, f"{archive_name}.tgz")
+    repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
+
+    loader = HgLoaderFromDisk(swh_storage, url=repo_url)
+
+    assert loader.load() == {"status": "eventful"}
+    stats = get_stats(loader.storage)
+    expected_stats = {
+        "content": 2,
+        "directory": 3,
+        "origin": 1,
+        "origin_visit": 1,
+        "release": 0,
+        "revision": 58,
+        "skipped_content": 0,
+        "snapshot": 1,
+    }
+    assert stats == expected_stats
+
+    visit_status = assert_last_visit_matches(
+        loader.storage, repo_url, status="full", type="hg",
+    )
+
+    # Make a fork of the first repository we ingested
+    fork_url = prepare_repository_from_archive(
+        archive_path, "the-sandbox-reloaded", tmp_path
+    )
+    loader2 = HgLoaderFromDisk(
+        swh_storage, url=fork_url, directory=str(tmp_path / archive_name)
+    )
+
+    assert loader2.load() == {"status": "uneventful"}
+
+    stats = get_stats(loader.storage)
+    expected_stats2 = expected_stats.copy()
+    expected_stats2.update(
+        {"origin": 1 + 1, "origin_visit": 1 + 1,}
+    )
+    assert stats == expected_stats2
+
+    visit_status2 = assert_last_visit_matches(
+        loader.storage, fork_url, status="full", type="hg",
+    )
+
+    assert visit_status.snapshot is not None
+    assert visit_status2.snapshot is None
+
+    # FIXME: Consistent behavior with filtering data out from already seen snapshot (on
+    # a given origin). But, the other fork origin has no snapshot at all. We should
+    # though, shouldn't we? Otherwise, that would mean fork could end up with no
+    # snapshot at all.