diff --git a/swh/loader/mercurial/from_disk.py b/swh/loader/mercurial/from_disk.py --- a/swh/loader/mercurial/from_disk.py +++ b/swh/loader/mercurial/from_disk.py @@ -289,6 +289,25 @@ ] return extids + def _get_extids_for_hgnodes(self, hgnode_ids: List[bytes]) -> List[ExtID]: + """Get all Mercurial ExtIDs for the mercurial nodes in the list.""" + extids = [ + extid + for extid in self.storage.extid_get_from_extid(EXTID_TYPE, hgnode_ids) + if extid.extid_version == EXTID_VERSION + ] + if extids: + # Filter out dangling extids, we need to load their target again + revisions_missing = self.storage.revision_missing( + [extid.target.object_id for extid in extids] + ) + extids = [ + extid + for extid in extids + if extid.target.object_id not in revisions_missing + ] + return extids + def fetch_data(self) -> bool: """Fetch the data from the source the loader is currently loading @@ -317,28 +336,45 @@ return False + def _new_revs(self, heads: List[bytes]): + """Return unseen revisions. That is, filter out revisions that are ancestors of + heads""" + assert self._repo is not None + existing_heads = [] + + for hg_nodeid in heads: + try: + rev = self._repo[hg_nodeid].rev() + existing_heads.append(rev) + except KeyError: # the node does not exist anymore + pass + + # select revisions that are not ancestors of heads + # and not the heads themselves + new_revs = self._repo.revs("not ::(%ld)", existing_heads) + + if new_revs: + self.log.info("New revisions found: %d", len(new_revs)) + + return new_revs + def get_hg_revs_to_load(self) -> Union[HgFilteredSet, HgSpanSet]: """Return the hg revision numbers to load.""" assert self._repo is not None repo: hgutil.Repository = self._repo + if self._latest_heads: - existing_heads = [] # heads that still exist in the repository - for hg_nodeid in self._latest_heads: - try: - rev = repo[hg_nodeid].rev() - existing_heads.append(rev) - except KeyError: # the node does not exist anymore - pass - - # select revisions that are not ancestors 
of heads - # and not the heads themselves - new_revs = repo.revs("not ::(%ld)", existing_heads) - - if new_revs: - self.log.info("New revisions found: %d", len(new_revs)) - return new_revs + # Reuse existing seen heads from last snapshot + new_revs = self._new_revs(self._latest_heads) else: - return repo.revs("all()") + # otherwise, try to filter out across origins already seen revisions + all_revs = repo.revs("all()") + hg_nodeids = [repo[nodeid].node() for nodeid in all_revs] + new_revs = self._new_revs( + [extid.extid for extid in self._get_extids_for_hgnodes(hg_nodeids)] + ) + + return new_revs def store_data(self): """Store fetched data in the database.""" @@ -422,6 +458,7 @@ snapshot_branches[b"HEAD"] = SnapshotBranch( target=default_branch_alias, target_type=TargetType.ALIAS, ) + snapshot = Snapshot(branches=snapshot_branches) self.storage.snapshot_add([snapshot]) diff --git a/swh/loader/mercurial/tests/test_from_disk.py b/swh/loader/mercurial/tests/test_from_disk.py --- a/swh/loader/mercurial/tests/test_from_disk.py +++ b/swh/loader/mercurial/tests/test_from_disk.py @@ -164,7 +164,7 @@ check_snapshot(expected_snapshot, loader.storage) stats = get_stats(loader.storage) - assert stats == { + expected_stats = { "content": 2, "directory": 3, "origin": 1, @@ -175,6 +175,19 @@ "snapshot": 1, } + assert stats == expected_stats + loader2 = HgLoaderFromDisk(swh_storage, url=repo_url) + + assert loader2.load() == {"status": "uneventful"} + + stats2 = get_stats(loader2.storage) + expected_stats2 = expected_stats.copy() + expected_stats2.update( + {"origin_visit": 1 + 1,} + ) + assert stats2 == expected_stats2 + # FIXME: Already seen objects are filtered out, so no new snapshot. + # This test has as been adapted from the historical `HgBundle20Loader` tests # to ensure compatibility of `HgLoaderFromDisk`. 
@@ -670,3 +683,58 @@ loader = HgLoaderFromDisk(swh_storage, repo_path) assert loader.load() == {"status": "uneventful"} + + +def test_loader_hg_extid_filtering(swh_storage, datadir, tmp_path): + """The first visit of a fork should filter already seen revisions (through extids) + + """ + archive_name = "the-sandbox" + archive_path = os.path.join(datadir, f"{archive_name}.tgz") + repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) + + loader = HgLoaderFromDisk(swh_storage, url=repo_url) + + assert loader.load() == {"status": "eventful"} + stats = get_stats(loader.storage) + expected_stats = { + "content": 2, + "directory": 3, + "origin": 1, + "origin_visit": 1, + "release": 0, + "revision": 58, + "skipped_content": 0, + "snapshot": 1, + } + assert stats == expected_stats + + visit_status = assert_last_visit_matches( + loader.storage, repo_url, status="full", type="hg", + ) + + # Make a fork of the first repository we ingested + fork_url = prepare_repository_from_archive( + archive_path, "the-sandbox-reloaded", tmp_path + ) + loader2 = HgLoaderFromDisk( + swh_storage, url=fork_url, directory=str(tmp_path / archive_name) + ) + + assert loader2.load() == {"status": "uneventful"} + + stats = get_stats(loader.storage) + expected_stats2 = expected_stats.copy() + expected_stats2.update( + {"origin": 1 + 1, "origin_visit": 1 + 1,} + ) + assert stats == expected_stats2 + + visit_status2 = assert_last_visit_matches( + loader.storage, fork_url, status="full", type="hg", + ) + + assert visit_status.snapshot is not None + # FIXME: It should be the same as the first visit_status.snapshot though, shouldn't + # it? + assert visit_status2.snapshot is None