diff --git a/swh/loader/mercurial/from_disk.py b/swh/loader/mercurial/from_disk.py
--- a/swh/loader/mercurial/from_disk.py
+++ b/swh/loader/mercurial/from_disk.py
@@ -223,41 +223,57 @@
 
         latest_snapshot = snapshot_get_latest(self.storage, self.origin_url)
         if latest_snapshot:
-            self._set_latest_heads(latest_snapshot)
+            self._set_recorded_state(latest_snapshot)
 
-    def _set_latest_heads(self, latest_snapshot: Snapshot) -> None:
+    def _set_recorded_state(self, latest_snapshot: Snapshot) -> None:
         """
         Looks up the nodeid for all revisions in the snapshot via extid_get_from_target,
-        and adds them to self._latest_heads.
+        and adds them to `self._latest_heads`.
+
+        The nodeid -> sha1_git mapping of those heads is also recorded in
+        `self._revision_nodeid_to_sha1git`: an incremental run only loads the new
+        revisions, but the snapshot branches are built from that mapping, so the
+        branches whose head did not move would otherwise be dropped.
+
+        Releases are not looked up: every tag is rebuilt from `repo.tags()` on each
+        visit.
         """
-        # TODO: add support for releases
-        snapshot_branches = [
+        snapshot_heads = [
             branch.target
             for branch in latest_snapshot.branches.values()
             if branch.target_type == TargetType.REVISION
         ]
 
-        # Get all ExtIDs for revisions in the latest snapshot
-        extids = self.storage.extid_get_from_target(
-            identifiers.ObjectType.REVISION, snapshot_branches
-        )
-
-        # Filter out extids not specific to Mercurial
-        extids = [extid for extid in extids if extid.extid_type == EXTID_TYPE]
+        # Get all Mercurial ExtIDs for the revisions in the latest snapshot
+        extids = [
+            extid
+            for extid in self.storage.extid_get_from_target(
+                identifiers.ObjectType.REVISION, snapshot_heads
+            )
+            if extid.extid_type == EXTID_TYPE
+        ]
 
         if extids:
             # Filter out dangling extids, we need to load their target again
-            revisions_missing = self.storage.revision_missing(
-                [extid.target.object_id for extid in extids]
+            # `revision_missing` returns an iterable that may be a one-shot
+            # generator: materialize it before the repeated membership tests.
+            revisions_missing = set(
+                self.storage.revision_missing(
+                    [extid.target.object_id for extid in extids]
+                )
             )
             extids = [
                 extid
                 for extid in extids
                 if extid.target.object_id not in revisions_missing
             ]
 
-            # Add the found nodeids to self.latest_heads
-            self._latest_heads.extend(extid.extid for extid in extids)
+        for extid in extids:
+            hg_nodeid = HgNodeId(extid.extid)
+            self._latest_heads.append(hg_nodeid)
+            # Seed the cache: an unchanged branch head is not reloaded by an
+            # incremental run, but its branch must still be part of the snapshot.
+            self._revision_nodeid_to_sha1git[hg_nodeid] = extid.target.object_id
 
     def fetch_data(self) -> bool:
         """Fetch the data from the source the loader is currently loading
@@ -304,13 +320,9 @@
             # and not the heads themselves
             new_revs = repo.revs("not ::(%ld)", existing_heads)
 
-            # for now, reload all revisions if there are new commits
-            # otherwise the loader will crash on missing parents
-            # incremental loading will come in next commits
             if new_revs:
-                return repo.revs("all()")
-            else:
-                return new_revs
+                self.log.info("New revisions found: %d", len(new_revs))
+            return new_revs
         else:
             return repo.revs("all()")
 
@@ -340,25 +352,30 @@
             hg_nodeid: name for name, hg_nodeid in hgutil.branches(repo).items()
         }
         tags_by_name: Dict[bytes, HgNodeId] = repo.tags()
-        tags_by_hg_nodeid: Dict[HgNodeId, bytes] = {
-            hg_nodeid: name for name, hg_nodeid in tags_by_name.items()
-        }
 
         snapshot_branches: Dict[bytes, SnapshotBranch] = {}
 
+        for tag_name, hg_nodeid in tags_by_name.items():
+            if tag_name == b"tip":
+                # `tip` is listed in the tags by the Mercurial API, but it is not a
+                # tag defined by the user in `.hgtags`.
+                continue
+            # `snapshot_branches` starts empty on every visit: every tag must be
+            # emitted, including those whose target was loaded by a previous run,
+            # or it would be dropped from the new snapshot.
+            try:
+                revision_sha1git = self.get_revision_id_from_hg_nodeid(hg_nodeid)
+            except hgutil.error.RevlogError:
+                self.log.warning("Tag points to unknown revision %s", hg_nodeid)
+            else:
+                snapshot_branches[tag_name] = SnapshotBranch(
+                    target=self.store_release(tag_name, revision_sha1git),
+                    target_type=TargetType.RELEASE,
+                )
+
         extids = []
 
         for hg_nodeid, revision_sha1git in self._revision_nodeid_to_sha1git.items():
-            tag_name = tags_by_hg_nodeid.get(hg_nodeid)
-
-            # tip is listed in the tags by the mercurial api
-            # but its not a tag defined by the user in `.hgtags`
-            if tag_name and tag_name != b"tip":
-                snapshot_branches[tag_name] = SnapshotBranch(
-                    target=self.store_release(tag_name, revision_sha1git),
-                    target_type=TargetType.RELEASE,
-                )
-
             if hg_nodeid in branch_by_hg_nodeid:
                 name = branch_by_hg_nodeid[hg_nodeid]
                 snapshot_branches[name] = SnapshotBranch(
@@ -417,7 +434,23 @@
         Returns:
             the sha1_git of the revision.
         """
-        return self._revision_nodeid_to_sha1git[hg_nodeid]
+
+        from_cache = self._revision_nodeid_to_sha1git.get(hg_nodeid)
+        if from_cache is not None:
+            return from_cache
+        # The revision was not loaded in this run, get it from storage
+        from_storage = self.storage.extid_get_from_extid(EXTID_TYPE, ids=[hg_nodeid])
+        if not from_storage:
+            # This can happen with corrupted revisions in otherwise working repositories
+            # like Pypy. Said revision will point to a non-existing parent.
+            # We don't raise a `CorruptedRevision` here since this one just does not
+            # exist, the *child* revision is corrupted.
+            msg = "Node not found in cache or storage: %r" % hg_nodeid
+            raise hgutil.error.RevlogError(msg)
+
+        msg = "Multiple matches from storage for hg node %r" % hg_nodeid
+        assert len(from_storage) == 1, msg
+        return from_storage[0].target.object_id
 
     def get_revision_parents(self, rev_ctx: hgutil.BaseContext) -> Tuple[Sha1Git, ...]:
         """Return the git sha1 of the parent revisions.
@@ -434,7 +467,12 @@
             # nullid is the value of a parent that does not exist
             if parent_hg_nodeid == hgutil.NULLID:
                 continue
-            parents.append(self.get_revision_id_from_hg_nodeid(parent_hg_nodeid))
+            try:
+                revision_id = self.get_revision_id_from_hg_nodeid(parent_hg_nodeid)
+            except hgutil.error.RevlogError as e:
+                # This node points to a non-existing parent, it's corrupted.
+                raise CorruptedRevision(rev_ctx.node()) from e
+            parents.append(revision_id)
 
         return tuple(parents)
 
diff --git a/swh/loader/mercurial/tests/test_from_disk.py b/swh/loader/mercurial/tests/test_from_disk.py
--- a/swh/loader/mercurial/tests/test_from_disk.py
+++ b/swh/loader/mercurial/tests/test_from_disk.py
@@ -2,10 +2,11 @@
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
-
 from datetime import datetime
 from hashlib import sha1
 import os
+from pathlib import Path
+import subprocess
 
 import attr
 import pytest
@@ -434,3 +435,50 @@
 
     assert actual_load_status == {"status": "eventful"}
     assert_last_visit_matches(swh_storage, repo_url, status="partial", type="hg")
+
+
+def hg_strip(repo: str, rev: str) -> None:
+    subprocess.check_call(["hg", "debugstrip", rev], cwd=repo)
+
+
+def test_load_repo_with_new_commits(swh_storage, datadir, tmp_path):
+    archive_name = "hello"
+    archive_path = Path(datadir, f"{archive_name}.tgz")
+    json_path = Path(datadir, f"{archive_name}.json")
+    repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
+
+    # first load with missing commits
+    hg_strip(repo_url.replace("file://", ""), "tip")
+    loader = HgLoaderFromDisk(swh_storage, repo_url)
+    assert loader.load() == {"status": "eventful"}
+    assert get_stats(loader.storage) == {
+        "content": 2,
+        "directory": 2,
+        "origin": 1,
+        "origin_visit": 1,
+        "release": 0,
+        "revision": 2,
+        "skipped_content": 0,
+        "snapshot": 1,
+    }
+
+    # second load with all commits
+    repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
+    loader = HgLoaderFromDisk(swh_storage, repo_url)
+    checker = LoaderChecker(
+        loader=loader,
+        expected=ExpectedSwhids.load(json_path),
+    )
+
+    checker.check()
+
+    assert get_stats(loader.storage) == {
+        "content": 3,
+        "directory": 3,
+        "origin": 1,
+        "origin_visit": 2,
+        "release": 1,
+        "revision": 3,
+        "skipped_content": 0,
+        "snapshot": 2,
+    }