diff --git a/swh/loader/mercurial/hgutil.py b/swh/loader/mercurial/hgutil.py index 1ed33f2..1a78f0a 100644 --- a/swh/loader/mercurial/hgutil.py +++ b/swh/loader/mercurial/hgutil.py @@ -1,172 +1,179 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict from dataclasses import dataclass import io import os import signal import time import traceback from typing import Dict, List, Mapping, NewType, Optional, Set from billiard import Process, Queue # The internal Mercurial API is not guaranteed to be stable. from mercurial import bookmarks, context, error, hg, smartset, util # type: ignore import mercurial.ui # type: ignore NULLID = mercurial.node.nullid HgNodeId = NewType("HgNodeId", bytes) Repository = hg.localrepo BaseContext = context.basectx LRUCacheDict = util.lrucachedict HgSpanSet = smartset._spanset HgFilteredSet = smartset.filteredset LookupError = error.LookupError def repository(path: str) -> hg.localrepo: ui = mercurial.ui.ui.load() return hg.repository(ui, path.encode()) @dataclass class BranchingInfo: tips: Mapping[bytes, HgNodeId] """The first head of the branch, sorted by nodeid if there are multiple heads.""" bookmarks: Mapping[bytes, HgNodeId] """all bookmarks in the repository (except local divergent ones)""" open_heads: Mapping[bytes, List[HgNodeId]] """All *open* heads of a given branch, sorted by nodeid""" closed_heads: Mapping[bytes, List[HgNodeId]] """All *closed* heads of a given branch, sorted by nodeid, if any""" default_branch_alias: Optional[bytes] """The default snapshot branch to show in the UI""" def branching_info(repo: hg.localrepo, ignored: Set[int]) -> BranchingInfo: """Lists all relevant information about branch heads and bookmarks, grouped by type. `ignored`: Revisions that we ignore during loading because they are corrupted or have a corrupted ancestor. Categories may have overlapping nodes: a branch tip can be a closed branch head and have a bookmark on it, for example. """ branch_tips: Dict[bytes, HgNodeId] = {} branch_open_heads = defaultdict(list) branch_closed_heads = defaultdict(list) all_bookmarks = bookmarks.listbookmarks(repo) for branch_name, heads in repo.branchmap().items(): # Sort the heads by node id since it's stable and doesn't depend on local # topology like cloning order. for head in sorted(heads): head = repo[head] if head.rev() in ignored: # This revision or one of its ancestors is corrupted, ignore it continue node_id = head.node() if head.closesbranch(): branch_closed_heads[branch_name].append(node_id) else: if not branch_tips.get(branch_name): branch_tips[branch_name] = node_id branch_open_heads[branch_name].append(node_id) # The default revision is where the "@" bookmark is, or failing that the tip of the # `default` branch. For our purposes we're trying to find a branch tip to alias to, # so only return those if they are branch tips, otherwise don't bother. 
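    # For illustration (hypothetical repository state, not taken from the fixtures):
    #   all_bookmarks = {b"@": <hex id>} and repo[<hex id>] sits on b"stable", which has
    #   a recorded branch tip              -> default_rev_alias = b"bookmarks/@"
    #   no b"@" bookmark, but b"default" has a tip
    #                                      -> default_rev_alias = b"branch-tip/default"
    #   neither                            -> default_rev_alias stays None (no alias)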
default_rev_alias = None at_bookmark = all_bookmarks.get(b"@") if at_bookmark is not None: bookmark_at_branch = repo[at_bookmark].branch() if branch_tips.get(bookmark_at_branch) is not None: default_rev_alias = b"bookmarks/@" if default_rev_alias is None and branch_tips.get(b"default") is not None: default_rev_alias = b"branch-tip/default" branches_with_one_head = set() for branch, heads in branch_open_heads.items(): if len(heads) == 1: branches_with_one_head.add(branch) # The most common case is one head per branch. Simplifying this means we have # less duplicate data, because open heads are the same as open branch tips. # We don't do more complex deduplication, this is just a simple optimization. for branch in branches_with_one_head: del branch_open_heads[branch] + # Bookmark ids are not aligned with the rest: listbookmarks() returns hex node + # ids as byte strings rather than raw node ids. Hence the extra mapping. + branch_bookmarks = { + branch: HgNodeId(bytes.fromhex(node_id.decode())) + for branch, node_id in all_bookmarks.items() + } + return BranchingInfo( tips=branch_tips, - bookmarks=all_bookmarks, + bookmarks=branch_bookmarks, open_heads=branch_open_heads, closed_heads=branch_closed_heads, default_branch_alias=default_rev_alias, ) class CloneTimeout(Exception): pass class CloneFailure(Exception): pass def _clone_task(src: str, dest: str, errors: Queue) -> None: """Clone task to run in a subprocess. Args: src: clone source dest: clone destination errors: message queue to communicate errors """ try: hg.clone( ui=mercurial.ui.ui.load(), peeropts={}, source=src.encode(), dest=dest.encode(), update=False, ) except Exception as e: exc_buffer = io.StringIO() traceback.print_exc(file=exc_buffer) errors.put_nowait(exc_buffer.getvalue()) raise e def clone(src: str, dest: str, timeout: float) -> None: """Clone a repository with timeout. Args: src: clone source dest: clone destination timeout: timeout in seconds """ errors: Queue = Queue() process = Process(target=_clone_task, args=(src, dest, errors)) process.start() process.join(timeout) if process.is_alive(): process.terminate() # Give it literally a second (in successive steps of 0.1 second), then kill it.
# Can't use `process.join(1)` here, billiard appears to be bugged # https://github.com/celery/billiard/issues/270 killed = False for _ in range(10): time.sleep(0.1) if not process.is_alive(): break else: killed = True os.kill(process.pid, signal.SIGKILL) raise CloneTimeout(src, timeout, killed) if not errors.empty(): raise CloneFailure(src, dest, errors.get()) diff --git a/swh/loader/mercurial/tests/test_from_disk.py b/swh/loader/mercurial/tests/test_from_disk.py index b2b23db..3a3d9c9 100644 --- a/swh/loader/mercurial/tests/test_from_disk.py +++ b/swh/loader/mercurial/tests/test_from_disk.py @@ -1,739 +1,738 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime from hashlib import sha1 import os from pathlib import Path import subprocess import unittest import attr import pytest from swh.loader.mercurial.loader import HgBundle20Loader from swh.loader.mercurial.utils import parse_visit_date from swh.loader.tests import ( assert_last_visit_matches, check_snapshot, get_stats, prepare_repository_from_archive, ) from swh.model.from_disk import Content, DentryPerms from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.model.identifiers import ObjectType from swh.model.model import RevisionType, Snapshot, SnapshotBranch, TargetType from swh.storage import get_storage from swh.storage.algos.snapshot import snapshot_get_latest from ..from_disk import EXTID_VERSION, HgDirectory, HgLoaderFromDisk from .loader_checker import ExpectedSwhids, LoaderChecker VISIT_DATE = parse_visit_date("2016-05-03 15:16:32+00") assert VISIT_DATE is not None def random_content() -> Content: """Create minimal content object.""" data = str(datetime.now()).encode() return Content({"sha1_git": sha1(data).digest(), "perms": DentryPerms.content}) def test_hg_directory_creates_missing_directories(): directory = HgDirectory() directory[b"path/to/some/content"] = random_content() def test_hg_directory_get(): content = random_content() directory = HgDirectory() assert directory.get(b"path/to/content") is None assert directory.get(b"path/to/content", content) == content directory[b"path/to/content"] = content assert directory.get(b"path/to/content") == content def test_hg_directory_deletes_empty_directories(): directory = HgDirectory() content = random_content() directory[b"path/to/content"] = content directory[b"path/to/some/deep/content"] = random_content() del directory[b"path/to/some/deep/content"] assert directory.get(b"path/to/some/deep") is None assert directory.get(b"path/to/some") is None assert directory.get(b"path/to/content") == content def test_hg_directory_when_directory_replaces_file(): directory = HgDirectory() directory[b"path/to/some"] = random_content() directory[b"path/to/some/content"] = random_content() # Those tests assert expectations on repository loading # by reading expected values from associated json files # produced by the `swh-hg-identify` command line utility. # # It has more granularity than historical tests. # Assertions will tell if the error comes from the directories # revisions or release rather than only checking the snapshot. # # With more work it should event be possible to know which part # of an object is faulty. 
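An aside on the clone() helper shown above in hgutil.py (context lines, unchanged by this patch): the clone runs in a billiard subprocess and, on timeout, is shut down in escalating steps. Below is a minimal, self-contained sketch of that escalation; the names _sleepy_task and run_with_timeout are illustrative only, and a sleeping task stands in for the actual clone.

import os
import signal
import time

from billiard import Process


def _sleepy_task() -> None:
    # Stand-in for a clone that does not finish in time.
    time.sleep(60)


def run_with_timeout(timeout: float) -> bool:
    """Returns True if the child had to be SIGKILLed."""
    process = Process(target=_sleepy_task)
    process.start()
    process.join(timeout)
    killed = False
    if process.is_alive():
        process.terminate()
        # Poll for up to one second instead of process.join(1); billiard's join
        # appears unreliable here (https://github.com/celery/billiard/issues/270).
        for _ in range(10):
            time.sleep(0.1)
            if not process.is_alive():
                break
        else:
            killed = True
            os.kill(process.pid, signal.SIGKILL)
    return killed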
@pytest.mark.parametrize( "archive_name", ("hello", "transplant", "the-sandbox", "example") ) def test_examples(swh_storage, datadir, tmp_path, archive_name): archive_path = Path(datadir, f"{archive_name}.tgz") json_path = Path(datadir, f"{archive_name}.json") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) LoaderChecker( loader=HgLoaderFromDisk(swh_storage, repo_url), expected=ExpectedSwhids.load(json_path), ).check() # This test has as been adapted from the historical `HgBundle20Loader` tests # to ensure compatibility of `HgLoaderFromDisk`. # Hashes as been produced by copy pasting the result of the implementation # to prevent regressions. def test_loader_hg_new_visit_no_release(swh_storage, datadir, tmp_path): """Eventful visit should yield 1 snapshot""" archive_name = "the-sandbox" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) loader = HgLoaderFromDisk(swh_storage, url=repo_url) assert loader.load() == {"status": "eventful"} tips = { b"branch-tip/default": "70e750bb046101fdced06f428e73fee471509c56", b"branch-tip/develop": "a9c4534552df370f43f0ef97146f393ef2f2a08c", } closed = { b"feature/fun_time": "4d640e8064fe69b4c851dfd43915c431e80c7497", b"feature/green2_loader": "94be9abcf9558213ff301af0ecd8223451ce991d", b"feature/greenloader": "9f82d95bd3edfb7f18b1a21d6171170395ea44ce", b"feature/my_test": "dafa445964230e808148db043c126063ea1dc9b6", b"feature/read2_loader": "9e912851eb64e3a1e08fbb587de7a4c897ce5a0a", b"feature/readloader": "ddecbc16f4c916c39eacfcb2302e15a9e70a231e", b"feature/red": "cb36b894129ca7910bb81c457c72d69d5ff111bc", b"feature/split5_loader": "3ed4b85d30401fe32ae3b1d650f215a588293a9e", b"feature/split_causing": "c346f6ff7f42f2a8ff867f92ab83a6721057d86c", b"feature/split_loader": "5f4eba626c3f826820c4475d2d81410759ec911b", b"feature/split_loader5": "5017ce0b285351da09a2029ea2cf544f79b593c7", b"feature/split_loading": "4e2dc6d6073f0b6d348f84ded52f9143b10344b9", b"feature/split_redload": "2d4a801c9a9645fcd3a9f4c06418d8393206b1f3", b"feature/splitloading": "88b80615ed8561be74a700b92883ec0374ddacb0", b"feature/test": "61d762d65afb3150e2653d6735068241779c1fcf", b"feature/test_branch": "be44d5e6cc66580f59c108f8bff5911ee91a22e4", b"feature/test_branching": "d2164061453ecb03d4347a05a77db83f706b8e15", b"feature/test_dog": "2973e5dc9568ac491b198f6b7f10c44ddc04e0a3", } mapping = {b"branch-closed-heads/%s/0" % b: n for b, n in closed.items()} mapping.update(tips) expected_branches = { k: SnapshotBranch(target=hash_to_bytes(v), target_type=TargetType.REVISION) for k, v in mapping.items() } expected_branches[b"HEAD"] = SnapshotBranch( target=b"branch-tip/default", target_type=TargetType.ALIAS ) expected_snapshot = Snapshot( id=hash_to_bytes("cbc609dcdced34dbd9938fe81b555170f1abc96f"), branches=expected_branches, ) assert_last_visit_matches( loader.storage, repo_url, status="full", type="hg", snapshot=expected_snapshot.id, ) check_snapshot(expected_snapshot, loader.storage) stats = get_stats(loader.storage) expected_stats = { "content": 2, "directory": 3, "origin": 1, "origin_visit": 1, "release": 0, "revision": 58, "skipped_content": 0, "snapshot": 1, } assert stats == expected_stats loader2 = HgLoaderFromDisk(swh_storage, url=repo_url) assert loader2.load() == {"status": "uneventful"} # nothing new happened stats2 = get_stats(loader2.storage) expected_stats2 = expected_stats.copy() expected_stats2["origin_visit"] = 2 # one new visit recorded assert stats2 == 
expected_stats2 assert_last_visit_matches( loader2.storage, repo_url, status="full", type="hg", snapshot=expected_snapshot.id, ) # but we got a snapshot nonetheless # This test has as been adapted from the historical `HgBundle20Loader` tests # to ensure compatibility of `HgLoaderFromDisk`. # Hashes as been produced by copy pasting the result of the implementation # to prevent regressions. def test_loader_hg_new_visit_with_release(swh_storage, datadir, tmp_path): """Eventful visit with release should yield 1 snapshot""" archive_name = "hello" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) loader = HgLoaderFromDisk(swh_storage, url=repo_url, visit_date=VISIT_DATE,) actual_load_status = loader.load() assert actual_load_status == {"status": "eventful"} # then stats = get_stats(loader.storage) assert stats == { "content": 3, "directory": 3, "origin": 1, "origin_visit": 1, "release": 1, "revision": 3, "skipped_content": 0, "snapshot": 1, } # cf. test_loader.org for explaining from where those hashes tip_release = hash_to_bytes("515c4d72e089404356d0f4b39d60f948b8999140") release = loader.storage.release_get([tip_release])[0] assert release is not None tip_revision_default = hash_to_bytes("c3dbe4fbeaaa98dd961834e4007edb3efb0e2a27") revision = loader.storage.revision_get([tip_revision_default])[0] assert revision is not None expected_snapshot = Snapshot( id=hash_to_bytes("7ef082aa8b53136b1bed97f734504be32679bbec"), branches={ b"branch-tip/default": SnapshotBranch( target=tip_revision_default, target_type=TargetType.REVISION, ), b"tags/0.1": SnapshotBranch( target=tip_release, target_type=TargetType.RELEASE, ), b"HEAD": SnapshotBranch( target=b"branch-tip/default", target_type=TargetType.ALIAS, ), }, ) check_snapshot(expected_snapshot, loader.storage) assert_last_visit_matches( loader.storage, repo_url, type=RevisionType.MERCURIAL.value, status="full", snapshot=expected_snapshot.id, ) # This test has as been adapted from the historical `HgBundle20Loader` tests # to ensure compatibility of `HgLoaderFromDisk`. # Hashes as been produced by copy pasting the result of the implementation # to prevent regressions. def test_visit_repository_with_transplant_operations(swh_storage, datadir, tmp_path): """Visit a mercurial repository visit transplant operations within should yield a snapshot as well. 
""" archive_name = "transplant" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) loader = HgLoaderFromDisk(swh_storage, url=repo_url, visit_date=VISIT_DATE,) # load hg repository actual_load_status = loader.load() assert actual_load_status == {"status": "eventful"} # collect swh revisions assert_last_visit_matches( loader.storage, repo_url, type=RevisionType.MERCURIAL.value, status="full" ) revisions = [] snapshot = snapshot_get_latest(loader.storage, repo_url) for branch in snapshot.branches.values(): if branch.target_type.value != "revision": continue revisions.append(branch.target) # extract original changesets info and the transplant sources hg_changesets = set() transplant_sources = set() for rev in loader.storage.revision_log(revisions): extids = list( loader.storage.extid_get_from_target(ObjectType.REVISION, [rev["id"]]) ) assert len(extids) == 1 hg_changesets.add(hash_to_hex(extids[0].extid)) for k, v in rev["extra_headers"]: if k == b"transplant_source": transplant_sources.add(v.decode("ascii")) # check extracted data are valid assert len(hg_changesets) > 0 assert len(transplant_sources) > 0 assert transplant_sources <= hg_changesets def _partial_copy_storage( old_storage, origin_url: str, mechanism: str, copy_revisions: bool ): """Create a new storage, and only copy ExtIDs or head revisions to it.""" new_storage = get_storage(cls="memory") snapshot = snapshot_get_latest(old_storage, origin_url) assert snapshot heads = [branch.target for branch in snapshot.branches.values()] if mechanism == "extid": extids = old_storage.extid_get_from_target(ObjectType.REVISION, heads) new_storage.extid_add(extids) if copy_revisions: # copy revisions, but erase their metadata to make sure the loader doesn't # fallback to revision.metadata["nodeid"] revisions = [ attr.evolve(rev, metadata={}) for rev in old_storage.revision_get(heads) if rev ] new_storage.revision_add(revisions) else: assert mechanism == "same storage" return old_storage # copy origin, visit, status new_storage.origin_add(old_storage.origin_get([origin_url])) visit = old_storage.origin_visit_get_latest(origin_url) new_storage.origin_visit_add([visit]) statuses = old_storage.origin_visit_status_get(origin_url, visit.visit).results new_storage.origin_visit_status_add(statuses) new_storage.snapshot_add([snapshot]) return new_storage def test_load_unchanged_repo_should_be_uneventful( swh_storage, datadir, tmp_path, ): """Checks the loader can find which revisions it already loaded, using ExtIDs.""" archive_name = "hello" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_path = repo_url.replace("file://", "") loader = HgLoaderFromDisk(swh_storage, repo_path) assert loader.load() == {"status": "eventful"} assert get_stats(loader.storage) == { "content": 3, "directory": 3, "origin": 1, "origin_visit": 1, "release": 1, "revision": 3, "skipped_content": 0, "snapshot": 1, } visit_status = assert_last_visit_matches( loader.storage, repo_path, type=RevisionType.MERCURIAL.value, status="full", ) assert visit_status.snapshot is not None # Create a new loader (to start with a clean slate, eg. 
remove the caches), # with the new, partial, storage loader2 = HgLoaderFromDisk(swh_storage, repo_path) assert loader2.load() == {"status": "uneventful"} # Should have all the objects assert get_stats(loader.storage) == { "content": 3, "directory": 3, "origin": 1, "origin_visit": 2, "release": 1, "revision": 3, "skipped_content": 0, "snapshot": 1, } visit_status2 = assert_last_visit_matches( loader2.storage, repo_path, type=RevisionType.MERCURIAL.value, status="full", ) assert visit_status2.snapshot == visit_status.snapshot def test_closed_branch_incremental(swh_storage, datadir, tmp_path): """Test that a repository with a closed branch does not trip an incremental load""" archive_name = "example" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_path = repo_url.replace("file://", "") loader = HgLoaderFromDisk(swh_storage, repo_path) # Test 3 loads: full, and two incremental. assert loader.load() == {"status": "eventful"} expected_stats = { "content": 7, "directory": 16, "origin": 1, "origin_visit": 1, "release": 0, "revision": 9, "skipped_content": 0, "snapshot": 1, } assert get_stats(loader.storage) == expected_stats assert loader.load() == {"status": "uneventful"} assert get_stats(loader.storage) == {**expected_stats, "origin_visit": 1 + 1} assert loader.load() == {"status": "uneventful"} assert get_stats(loader.storage) == {**expected_stats, "origin_visit": 2 + 1} def test_old_loader_new_loader(swh_storage, datadir, tmp_path): archive_name = "example" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_path = repo_url.replace("file://", "") old_loader = HgBundle20Loader(swh_storage, repo_path) assert old_loader.load() == {"status": "eventful"} expected_stats = { "content": 7, "directory": 16, "origin": 1, "origin_visit": 1, "release": 0, "revision": 9, "skipped_content": 0, "snapshot": 1, } assert get_stats(old_loader.storage) == expected_stats # Will pick up more branches, hence a different snapshot loader = HgLoaderFromDisk(swh_storage, repo_path) res = loader.load() new_expected_stats = { **expected_stats, "origin_visit": 2, "snapshot": 2, } assert get_stats(loader.storage) == new_expected_stats assert res == {"status": "eventful"} # Shouldn't pick up anything now loader = HgLoaderFromDisk(swh_storage, repo_path) assert loader.load() == {"status": "uneventful"} # Shouldn't pick up anything either after another load loader = HgLoaderFromDisk(swh_storage, repo_path) assert loader.load() == {"status": "uneventful"} def test_load_unchanged_repo__dangling_extid(swh_storage, datadir, tmp_path): """Checks the loader will load revisions targeted by an ExtID if the revisions are missing from the storage""" archive_name = "hello" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_path = repo_url.replace("file://", "") loader = HgLoaderFromDisk(swh_storage, repo_path) assert loader.load() == {"status": "eventful"} assert get_stats(loader.storage) == { "content": 3, "directory": 3, "origin": 1, "origin_visit": 1, "release": 1, "revision": 3, "skipped_content": 0, "snapshot": 1, } old_storage = swh_storage # Create a new storage, and only copy ExtIDs or head revisions to it. 
# This should be enough for the loader to know revisions were already loaded new_storage = _partial_copy_storage( old_storage, repo_path, mechanism="extid", copy_revisions=False ) # Create a new loader (to start with a clean slate, eg. remove the caches), # with the new, partial, storage loader = HgLoaderFromDisk(new_storage, repo_path) assert get_stats(loader.storage) == { "content": 0, "directory": 0, "origin": 1, "origin_visit": 1, "release": 0, "revision": 0, "skipped_content": 0, "snapshot": 1, } assert loader.load() == {"status": "eventful"} assert get_stats(loader.storage) == { "content": 3, "directory": 3, "origin": 1, "origin_visit": 2, "release": 1, "revision": 3, "skipped_content": 0, "snapshot": 1, } def test_missing_filelog_should_not_crash(swh_storage, datadir, tmp_path): archive_name = "missing-filelog" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) directory = repo_url.replace("file://", "") loader = HgLoaderFromDisk( storage=swh_storage, url=repo_url, directory=directory, # specify directory to avoid clone visit_date=VISIT_DATE, ) actual_load_status = loader.load() assert actual_load_status == {"status": "eventful"} assert_last_visit_matches(swh_storage, repo_url, status="partial", type="hg") def test_multiple_open_heads(swh_storage, datadir, tmp_path): archive_name = "multiple-heads" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) loader = HgLoaderFromDisk(storage=swh_storage, url=repo_url,) actual_load_status = loader.load() assert actual_load_status == {"status": "eventful"} assert_last_visit_matches(swh_storage, repo_url, status="full", type="hg") snapshot = snapshot_get_latest(swh_storage, repo_url) expected_branches = [ b"HEAD", b"branch-heads/default/0", b"branch-heads/default/1", b"branch-tip/default", ] assert sorted(snapshot.branches.keys()) == expected_branches # Check that we don't load anything the second time loader = HgLoaderFromDisk(storage=swh_storage, url=repo_url,) actual_load_status = loader.load() assert actual_load_status == {"status": "uneventful"} def hg_strip(repo: str, revset: str) -> None: """Removes `revset` and all of their descendants from the local repository.""" # Previously called `hg strip`, it was renamed to `hg debugstrip` in Mercurial 5.7 # because it's most likely not what most users want to do (they should use some kind # of history-rewriting tool like `histedit` or `prune`). # But here, it's exactly what we want to do. 
subprocess.check_call(["hg", "debugstrip", revset], cwd=repo) def test_load_repo_with_new_commits(swh_storage, datadir, tmp_path): archive_name = "hello" archive_path = Path(datadir, f"{archive_name}.tgz") json_path = Path(datadir, f"{archive_name}.json") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) # first load with missing commits hg_strip(repo_url.replace("file://", ""), "tip") loader = HgLoaderFromDisk(swh_storage, repo_url) assert loader.load() == {"status": "eventful"} assert get_stats(loader.storage) == { "content": 2, "directory": 2, "origin": 1, "origin_visit": 1, "release": 0, "revision": 2, "skipped_content": 0, "snapshot": 1, } # second load with all commits repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) loader = HgLoaderFromDisk(swh_storage, repo_url) checker = LoaderChecker(loader=loader, expected=ExpectedSwhids.load(json_path),) checker.check() assert get_stats(loader.storage) == { "content": 3, "directory": 3, "origin": 1, "origin_visit": 2, "release": 1, "revision": 3, "skipped_content": 0, "snapshot": 2, } def test_load_repo_check_extids_write_version(swh_storage, datadir, tmp_path): """ExtIDs should be stored with a given version when loading is done""" archive_name = "hello" archive_path = Path(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) hg_strip(repo_url.replace("file://", ""), "tip") loader = HgLoaderFromDisk(swh_storage, repo_url) assert loader.load() == {"status": "eventful"} # Ensure we write ExtIDs to a specific version. snapshot = snapshot_get_latest(swh_storage, repo_url) # First, filter out revisions from that snapshot revision_ids = [ branch.target for branch in snapshot.branches.values() if branch.target_type == TargetType.REVISION ] assert len(revision_ids) > 0 # Those revisions should have their associated ExtID version set to EXTID_VERSION extids = swh_storage.extid_get_from_target(ObjectType.REVISION, revision_ids) assert len(extids) == len(revision_ids) for extid in extids: assert extid.extid_version == EXTID_VERSION def test_load_new_extid_should_be_eventful(swh_storage, datadir, tmp_path): """Changing the extid version should make loaders ignore existing extids, and load the repo again.""" archive_name = "hello" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_path = repo_url.replace("file://", "") with unittest.mock.patch("swh.loader.mercurial.from_disk.EXTID_VERSION", 0): loader = HgLoaderFromDisk(swh_storage, repo_path) assert loader.load() == {"status": "eventful"} loader = HgLoaderFromDisk(swh_storage, repo_path) assert loader.load() == {"status": "eventful"} loader = HgLoaderFromDisk(swh_storage, repo_path) assert loader.load() == {"status": "uneventful"} with unittest.mock.patch("swh.loader.mercurial.from_disk.EXTID_VERSION", 10000): loader = HgLoaderFromDisk(swh_storage, repo_path) assert loader.load() == {"status": "eventful"} loader = HgLoaderFromDisk(swh_storage, repo_path) assert loader.load() == {"status": "uneventful"} def test_loader_hg_extid_filtering(swh_storage, datadir, tmp_path): """The first visit of a fork should filter already seen revisions (through extids) """ archive_name = "the-sandbox" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) loader = HgLoaderFromDisk(swh_storage, url=repo_url) assert 
loader.load() == {"status": "eventful"} stats = get_stats(loader.storage) expected_stats = { "content": 2, "directory": 3, "origin": 1, "origin_visit": 1, "release": 0, "revision": 58, "skipped_content": 0, "snapshot": 1, } assert stats == expected_stats visit_status = assert_last_visit_matches( loader.storage, repo_url, status="full", type="hg", ) # Make a fork of the first repository we ingested fork_url = prepare_repository_from_archive( archive_path, "the-sandbox-reloaded", tmp_path ) loader2 = HgLoaderFromDisk( swh_storage, url=fork_url, directory=str(tmp_path / archive_name) ) assert loader2.load() == {"status": "uneventful"} stats = get_stats(loader.storage) expected_stats2 = expected_stats.copy() expected_stats2.update( {"origin": 1 + 1, "origin_visit": 1 + 1,} ) assert stats == expected_stats2 visit_status2 = assert_last_visit_matches( loader.storage, fork_url, status="full", type="hg", ) assert visit_status.snapshot is not None assert visit_status2.snapshot == visit_status.snapshot -@pytest.mark.xfail -def test_loader_missing_hgnodeid_mapping(swh_storage, datadir, tmp_path): - """TODO: Analyze this issue and update the comment about the proper behavior. +def test_loader_repository_with_bookmark_information(swh_storage, datadir, tmp_path): + """Repository with bookmark information should be ingested correctly """ archive_name = "anomad-d" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) loader = HgLoaderFromDisk(swh_storage, url=repo_url) assert loader.load() == {"status": "eventful"}
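A closing note for reviewers on the conversion exercised by the new test above: bookmarks.listbookmarks() yields hexadecimal node ids as byte strings, while the rest of BranchingInfo carries raw node ids, which is what the new branch_bookmarks mapping in hgutil.py normalises. A minimal sketch of that conversion; the hex id below is the branch-tip/default value from the the-sandbox fixture, and any 40-character changeset id behaves the same way.

hex_id = b"70e750bb046101fdced06f428e73fee471509c56"  # as returned by listbookmarks()
raw_id = bytes.fromhex(hex_id.decode())               # raw 20-byte node id, as HgNodeId expects
assert len(raw_id) == 20
assert raw_id.hex().encode() == hex_id                # round-trips back to the hex form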