diff --git a/swh/loader/mercurial/from_disk.py b/swh/loader/mercurial/from_disk.py index 78de435..f1183fc 100644 --- a/swh/loader/mercurial/from_disk.py +++ b/swh/loader/mercurial/from_disk.py @@ -1,523 +1,590 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import deque from datetime import datetime import os from shutil import rmtree from tempfile import mkdtemp -from typing import Deque, Dict, Optional, Tuple, TypeVar, Union +from typing import Deque, Dict, List, Optional, Tuple, TypeVar, Union from swh.loader.core.loader import BaseLoader from swh.loader.core.utils import clean_dangling_folders from swh.loader.mercurial.utils import parse_visit_date from swh.model.from_disk import Content, DentryPerms, Directory -from swh.model.hashutil import MultiHash, hash_to_bytehex +from swh.model.hashutil import MultiHash, hash_to_bytehex, hash_to_bytes from swh.model.model import ( ObjectType, Origin, Person, Release, Revision, RevisionType, Sha1Git, Snapshot, SnapshotBranch, TargetType, TimestampWithTimezone, ) from swh.model.model import Content as ModelContent +from swh.storage.algos.snapshot import snapshot_get_latest from swh.storage.interface import StorageInterface from . import hgutil from .archive_extract import tmp_extract -from .hgutil import HgNodeId +from .hgutil import HgFilteredSet, HgNodeId, HgSpanSet FLAG_PERMS = { b"l": DentryPerms.symlink, b"x": DentryPerms.executable_content, b"": DentryPerms.content, } # type: Dict[bytes, DentryPerms] TEMPORARY_DIR_PREFIX_PATTERN = "swh.loader.mercurial.from_disk" T = TypeVar("T") class HgDirectory(Directory): """A more practical directory. - creates missing parent directories - removes empty directories """ def __setitem__(self, path: bytes, value: Union[Content, "HgDirectory"]) -> None: if b"/" in path: head, tail = path.split(b"/", 1) directory = self.get(head) if directory is None or isinstance(directory, Content): directory = HgDirectory() self[head] = directory directory[tail] = value else: super().__setitem__(path, value) def __delitem__(self, path: bytes) -> None: super().__delitem__(path) while b"/" in path: # remove empty parent directories path = path.rsplit(b"/", 1)[0] if len(self[path]) == 0: super().__delitem__(path) else: break def get( self, path: bytes, default: Optional[T] = None ) -> Optional[Union[Content, "HgDirectory", T]]: # TODO move to swh.model.from_disk.Directory try: return self[path] except KeyError: return default class HgLoaderFromDisk(BaseLoader): """Load a mercurial repository from a local repository.""" CONFIG_BASE_FILENAME = "loader/mercurial" visit_type = "hg" def __init__( self, storage: StorageInterface, url: str, directory: Optional[str] = None, logging_class: str = "swh.loader.mercurial.LoaderFromDisk", visit_date: Optional[datetime] = None, temp_directory: str = "/tmp", clone_timeout_seconds: int = 7200, content_cache_size: int = 10_000, max_content_size: Optional[int] = None, ): """Initialize the loader. Args: url: url of the repository. directory: directory of the local repository. logging_class: class of the loader logger. visit_date: visit date of the repository config: loader configuration """ super().__init__( storage=storage, logging_class=logging_class, max_content_size=max_content_size, ) self._temp_directory = temp_directory self._clone_timeout = clone_timeout_seconds self.origin_url = url self.visit_date = visit_date self.directory = directory self._repo: Optional[hgutil.Repository] = None self._revision_nodeid_to_swhid: Dict[HgNodeId, Sha1Git] = {} self._repo_directory: Optional[str] = None # keeps the last processed hg nodeid # it is used for differential tree update by store_directories # NULLID is the parent of the first revision self._last_hg_nodeid = hgutil.NULLID # keeps the last revision tree # it is used for differential tree update by store_directories self._last_root = HgDirectory() # Cache the content hash across revisions to avoid recalculation. self._content_hash_cache: hgutil.LRUCacheDict = hgutil.LRUCacheDict( content_cache_size, ) + # hg node id of the latest snapshot branch heads + # used to find what are the new revisions since last snapshot + self._latest_heads: List[HgNodeId] = [] + + self._load_status = "eventful" + def pre_cleanup(self) -> None: """As a first step, will try and check for dangling data to cleanup. This should do its best to avoid raising issues. """ clean_dangling_folders( self._temp_directory, pattern_check=TEMPORARY_DIR_PREFIX_PATTERN, log=self.log, ) def cleanup(self) -> None: """Last step executed by the loader.""" if self._repo_directory and os.path.exists(self._repo_directory): self.log.debug(f"Cleanup up repository {self._repo_directory}") rmtree(self._repo_directory) def prepare_origin_visit(self) -> None: """First step executed by the loader to prepare origin and visit references. Set/update self.origin, and optionally self.origin_url, self.visit_date. """ self.origin = Origin(url=self.origin_url) def prepare(self) -> None: """Second step executed by the loader to prepare some state needed by the loader. """ + # Set here to allow multiple calls to load on the same loader instance + self._latest_heads = [] + + latest_snapshot = snapshot_get_latest(self.storage, self.origin_url) + if latest_snapshot: + snapshot_branches = [ + branch.target + for branch in latest_snapshot.branches.values() + if branch.target_type != TargetType.ALIAS + ] + self._latest_heads = [ + hash_to_bytes(revision.metadata["node"]) + for revision in self.storage.revision_get(snapshot_branches) + if revision and revision.metadata + ] def fetch_data(self) -> bool: """Fetch the data from the source the loader is currently loading Returns: a value that is interpreted as a boolean. If True, fetch_data needs to be called again to complete loading. """ if not self.directory: # no local repository self._repo_directory = mkdtemp( prefix=TEMPORARY_DIR_PREFIX_PATTERN, suffix=f"-{os.getpid()}", dir=self._temp_directory, ) self.log.debug( f"Cloning {self.origin_url} to {self.directory} " f"with timeout {self._clone_timeout} seconds" ) hgutil.clone(self.origin_url, self._repo_directory, self._clone_timeout) else: # existing local repository # Allow to load on disk repository without cloning # for testing purpose. self._repo_directory = self.directory self._repo = hgutil.repository(self._repo_directory) return False + def get_hg_revs_to_load(self) -> Union[HgFilteredSet, HgSpanSet]: + """Return the hg revision numbers to load.""" + assert self._repo is not None + repo: hgutil.Repository = self._repo + if self._latest_heads: + existing_heads = [] # heads that still exist in the repository + for hg_nodeid in self._latest_heads: + try: + rev = repo[hg_nodeid].rev() + existing_heads.append(rev) + except KeyError: # the node does not exist anymore + pass + + # select revisions that are not ancestors of heads + # and not the heads themselves + new_revs = repo.revs("not ::(%ld)", existing_heads) + + # for now, reload all revisions if there are new commits + # otherwise the loader will crash on missing parents + # incremental loading will come in next commits + if new_revs: + return repo.revs("all()") + else: + return new_revs + else: + return repo.revs("all()") + def store_data(self): """Store fetched data in the database.""" - for rev in self._repo: + revs = self.get_hg_revs_to_load() + if not revs: + self._load_status = "uneventful" + return + + for rev in revs: self.store_revision(self._repo[rev]) branch_by_hg_nodeid: Dict[HgNodeId, bytes] = { hg_nodeid: name for name, hg_nodeid in hgutil.branches(self._repo).items() } tags_by_name: Dict[bytes, HgNodeId] = self._repo.tags() tags_by_hg_nodeid: Dict[HgNodeId, bytes] = { hg_nodeid: name for name, hg_nodeid in tags_by_name.items() } snapshot_branches: Dict[bytes, SnapshotBranch] = {} for hg_nodeid, revision_swhid in self._revision_nodeid_to_swhid.items(): tag_name = tags_by_hg_nodeid.get(hg_nodeid) # tip is listed in the tags by the mercurial api # but its not a tag defined by the user in `.hgtags` if tag_name and tag_name != b"tip": snapshot_branches[tag_name] = SnapshotBranch( target=self.store_release(tag_name, revision_swhid), target_type=TargetType.RELEASE, ) if hg_nodeid in branch_by_hg_nodeid: name = branch_by_hg_nodeid[hg_nodeid] snapshot_branches[name] = SnapshotBranch( target=revision_swhid, target_type=TargetType.REVISION, ) # The tip is mapped to `HEAD` to match # the historical implementation if hg_nodeid == tags_by_name[b"tip"]: snapshot_branches[b"HEAD"] = SnapshotBranch( target=name, target_type=TargetType.ALIAS, ) snapshot = Snapshot(branches=snapshot_branches) self.storage.snapshot_add([snapshot]) self.flush() self.loaded_snapshot_id = snapshot.id + def load_status(self) -> Dict[str, str]: + """Detailed loading status. + + Defaults to logging an eventful load. + + Returns: a dictionary that is eventually passed back as the task's + result to the scheduler, allowing tuning of the task recurrence + mechanism. + """ + return { + "status": self._load_status, + } + def get_revision_id_from_hg_nodeid(self, hg_nodeid: HgNodeId) -> Sha1Git: """Return the swhid of a revision given its hg nodeid. Args: hg_nodeid: the hg nodeid of the revision. Returns: the swhid of the revision. """ return self._revision_nodeid_to_swhid[hg_nodeid] def get_revision_parents(self, rev_ctx: hgutil.BaseContext) -> Tuple[Sha1Git, ...]: """Return the swhids of the parent revisions. Args: hg_nodeid: the hg nodeid of the revision. Returns: the swhids of the parent revisions. """ parents = [] for parent_ctx in rev_ctx.parents(): parent_hg_nodeid = parent_ctx.node() # nullid is the value of a parent that does not exist if parent_hg_nodeid == hgutil.NULLID: continue parents.append(self.get_revision_id_from_hg_nodeid(parent_hg_nodeid)) return tuple(parents) def store_revision(self, rev_ctx: hgutil.BaseContext) -> None: """Store a revision given its hg nodeid. Args: rev_ctx: the he revision context. Returns: the swhid of the stored revision. """ hg_nodeid = rev_ctx.node() root_swhid = self.store_directories(rev_ctx) # `Person.from_fullname` is compatible with mercurial's freeform author # as fullname is what is used in revision hash when available. author = Person.from_fullname(rev_ctx.user()) (timestamp, offset) = rev_ctx.date() # TimestampWithTimezone.from_dict will change name # as it accept more than just dicts rev_date = TimestampWithTimezone.from_dict(int(timestamp)) extra_headers = [ (b"time_offset_seconds", str(offset).encode(),), ] for key, value in rev_ctx.extra().items(): # The default branch is skipped to match # the historical implementation if key == b"branch" and value == b"default": continue # transplant_source is converted to match # the historical implementation if key == b"transplant_source": value = hash_to_bytehex(value) extra_headers.append((key, value)) revision = Revision( author=author, date=rev_date, committer=author, committer_date=rev_date, type=RevisionType.MERCURIAL, directory=root_swhid, message=rev_ctx.description(), metadata={"node": hg_nodeid.hex()}, extra_headers=tuple(extra_headers), synthetic=False, parents=self.get_revision_parents(rev_ctx), ) self._revision_nodeid_to_swhid[hg_nodeid] = revision.id self.storage.revision_add([revision]) def store_release(self, name: bytes, target=Sha1Git) -> Sha1Git: """Store a release given its name and its target. A release correspond to a user defined tag in mercurial. The mercurial api as a `tip` tag that must be ignored. Args: name: name of the release. target: swhid of the target revision. Returns: the swhid of the stored release. """ release = Release( name=name, target=target, target_type=ObjectType.REVISION, message=None, metadata=None, synthetic=False, author=Person(name=None, email=None, fullname=b""), date=None, ) self.storage.release_add([release]) return release.id def store_content(self, rev_ctx: hgutil.BaseContext, file_path: bytes) -> Content: """Store a revision content hg nodeid and file path. Content is a mix of file content at a given revision and its permissions found in the changeset's manifest. Args: rev_ctx: the he revision context. file_path: the hg path of the content. Returns: the swhid of the top level directory. """ hg_nodeid = rev_ctx.node() file_ctx = rev_ctx[file_path] file_nodeid = file_ctx.filenode() perms = FLAG_PERMS[file_ctx.flags()] # Key is file_nodeid + perms because permissions does not participate # in content hash in hg while it is the case in swh. cache_key = (file_nodeid, perms) sha1_git = self._content_hash_cache.get(cache_key) if sha1_git is not None: return Content({"sha1_git": sha1_git, "perms": perms}) data = file_ctx.data() content_data = MultiHash.from_data(data).digest() content_data["length"] = len(data) content_data["perms"] = perms content_data["data"] = data content_data["status"] = "visible" content = Content(content_data) model = content.to_model() if isinstance(model, ModelContent): self.storage.content_add([model]) else: raise ValueError( f"{file_path!r} at rev {hg_nodeid.hex()!r} " "produced {type(model)!r} instead of {ModelContent!r}" ) self._content_hash_cache[cache_key] = content.hash # Here we make sure to return only necessary data. return Content({"sha1_git": content.hash, "perms": perms}) def store_directories(self, rev_ctx: hgutil.BaseContext) -> Sha1Git: """Store a revision directories given its hg nodeid. Mercurial as no directory as in git. A Git like tree must be build from file paths to obtain each directory hash. Args: rev_ctx: the he revision context. Returns: the swhid of the top level directory. """ repo: hgutil.Repository = self._repo # mypy can't infer that repo is not None prev_ctx = repo[self._last_hg_nodeid] # TODO maybe do diff on parents status = prev_ctx.status(rev_ctx) for file_path in status.removed: del self._last_root[file_path] for file_path in status.added: content = self.store_content(rev_ctx, file_path) self._last_root[file_path] = content for file_path in status.modified: content = self.store_content(rev_ctx, file_path) self._last_root[file_path] = content self._last_hg_nodeid = rev_ctx.node() directories: Deque[Directory] = deque([self._last_root]) while directories: directory = directories.pop() self.storage.directory_add([directory.to_model()]) directories.extend( [item for item in directory.values() if isinstance(item, Directory)] ) return self._last_root.hash class HgArchiveLoaderFromDisk(HgLoaderFromDisk): """Mercurial loader for repository wrapped within tarballs.""" def __init__( self, storage: StorageInterface, url: str, visit_date: Optional[datetime] = None, archive_path: str = None, temp_directory: str = "/tmp", max_content_size: Optional[int] = None, ): super().__init__( storage=storage, url=url, visit_date=visit_date, logging_class="swh.loader.mercurial.ArchiveLoaderFromDisk", temp_directory=temp_directory, max_content_size=max_content_size, ) self.archive_extract_temp_dir = None self.archive_path = archive_path def prepare(self): """Extract the archive instead of cloning.""" self.archive_extract_temp_dir = tmp_extract( archive=self.archive_path, dir=self._temp_directory, prefix=TEMPORARY_DIR_PREFIX_PATTERN, suffix=f".dump-{os.getpid()}", log=self.log, source=self.origin_url, ) repo_name = os.listdir(self.temp_dir)[0] self.directory = os.path.join(self.archive_extract_temp_dir, repo_name) super().prepare() # Allow direct usage of the loader from the command line with # `python -m swh.loader.mercurial.from_disk $ORIGIN_URL` if __name__ == "__main__": import logging import click logging.basicConfig( level=logging.DEBUG, format="%(asctime)s %(process)d %(message)s" ) @click.command() @click.option("--origin-url", help="origin url") @click.option("--hg-directory", help="Path to mercurial repository to load") @click.option("--visit-date", default=None, help="Visit date") def main(origin_url, hg_directory, visit_date): from swh.storage import get_storage storage = get_storage(cls="memory") return HgLoaderFromDisk( storage, origin_url, directory=hg_directory, visit_date=parse_visit_date(visit_date), ).load() main() diff --git a/swh/loader/mercurial/hgutil.py b/swh/loader/mercurial/hgutil.py index 71438fb..6ef1524 100644 --- a/swh/loader/mercurial/hgutil.py +++ b/swh/loader/mercurial/hgutil.py @@ -1,89 +1,91 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import io from multiprocessing import Process, Queue import traceback from typing import Dict, NewType # The internal Mercurial API is not guaranteed to be stable. -from mercurial import context, hg, util # type: ignore +from mercurial import context, hg, smartset, util # type: ignore import mercurial.ui # type: ignore NULLID = mercurial.node.nullid HgNodeId = NewType("HgNodeId", bytes) Repository = hg.localrepo BaseContext = context.basectx LRUCacheDict = util.lrucachedict +HgSpanSet = smartset._spanset +HgFilteredSet = smartset.filteredset def repository(path: str) -> hg.localrepo: ui = mercurial.ui.ui.load() return hg.repository(ui, path.encode()) def branches(repo: hg.localrepo) -> Dict[bytes, HgNodeId]: """List repository named branches and their tip node.""" result = {} for tag, heads, tip, isclosed in repo.branchmap().iterbranches(): if isclosed: continue result[tag] = tip return result class CloneTimeout(Exception): pass class CloneFailure(Exception): pass def _clone_task(src: str, dest: str, errors: Queue) -> None: """Clone task to run in a subprocess. Args: src: clone source dest: clone destination errors: message queue to communicate errors """ try: hg.clone( ui=mercurial.ui.ui.load(), peeropts={}, source=src.encode(), dest=dest.encode(), update=False, ) except Exception as e: exc_buffer = io.StringIO() traceback.print_exc(file=exc_buffer) errors.put_nowait(exc_buffer.getvalue()) raise e def clone(src: str, dest: str, timeout: int) -> None: """Clone a repository with timeout. Args: src: clone source dest: clone destination timeout: timeout in seconds """ errors: Queue = Queue() process = Process(target=_clone_task, args=(src, dest, errors)) process.start() process.join(timeout) if process.is_alive(): process.terminate() process.join(1) if process.is_alive(): process.kill() raise CloneTimeout(src, timeout) if not errors.empty(): raise CloneFailure(src, dest, errors.get()) diff --git a/swh/loader/mercurial/tests/test_from_disk.py b/swh/loader/mercurial/tests/test_from_disk.py index d38eb39..72b4962 100644 --- a/swh/loader/mercurial/tests/test_from_disk.py +++ b/swh/loader/mercurial/tests/test_from_disk.py @@ -1,248 +1,281 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime from hashlib import sha1 import os from swh.loader.mercurial.utils import parse_visit_date from swh.loader.tests import ( assert_last_visit_matches, check_snapshot, get_stats, prepare_repository_from_archive, ) from swh.model.from_disk import Content, DentryPerms from swh.model.hashutil import hash_to_bytes from swh.model.model import RevisionType, Snapshot, SnapshotBranch, TargetType from swh.storage.algos.snapshot import snapshot_get_latest from ..from_disk import HgDirectory, HgLoaderFromDisk from .loader_checker import ExpectedSwhids, LoaderChecker VISIT_DATE = parse_visit_date("2016-05-03 15:16:32+00") assert VISIT_DATE is not None def random_content() -> Content: """Create minimal content object.""" data = str(datetime.now()).encode() return Content({"sha1_git": sha1(data).digest(), "perms": DentryPerms.content}) def test_hg_directory_creates_missing_directories(): directory = HgDirectory() directory[b"path/to/some/content"] = random_content() def test_hg_directory_get(): content = random_content() directory = HgDirectory() assert directory.get(b"path/to/content") is None assert directory.get(b"path/to/content", content) == content directory[b"path/to/content"] = content assert directory.get(b"path/to/content") == content def test_hg_directory_deletes_empty_directories(): directory = HgDirectory() content = random_content() directory[b"path/to/content"] = content directory[b"path/to/some/deep/content"] = random_content() del directory[b"path/to/some/deep/content"] assert directory.get(b"path/to/some/deep") is None assert directory.get(b"path/to/some") is None assert directory.get(b"path/to/content") == content def test_hg_directory_when_directory_replaces_file(): directory = HgDirectory() directory[b"path/to/some"] = random_content() directory[b"path/to/some/content"] = random_content() # Those tests assert expectations on repository loading # by reading expected values from associated json files # produced by the `swh-hg-identify` command line utility. # # It has more granularity than historical tests. # Assertions will tell if the error comes from the directories # revisions or release rather than only checking the snapshot. # # With more work it should event be possible to know which part # of an object is faulty. def test_examples(swh_storage, datadir, tmp_path): for archive_name in ("hello", "transplant", "the-sandbox", "example"): archive_path = os.path.join(datadir, f"{archive_name}.tgz") json_path = os.path.join(datadir, f"{archive_name}.json") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) LoaderChecker( loader=HgLoaderFromDisk(swh_storage, repo_url), expected=ExpectedSwhids.load(json_path), ).check() # This test has as been adapted from the historical `HgBundle20Loader` tests # to ensure compatibility of `HgLoaderFromDisk`. # Hashes as been produced by copy pasting the result of the implementation # to prevent regressions. def test_loader_hg_new_visit_no_release(swh_storage, datadir, tmp_path): """Eventful visit should yield 1 snapshot""" archive_name = "the-sandbox" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) loader = HgLoaderFromDisk(swh_storage, url=repo_url) assert loader.load() == {"status": "eventful"} tip_revision_develop = "a9c4534552df370f43f0ef97146f393ef2f2a08c" tip_revision_default = "70e750bb046101fdced06f428e73fee471509c56" expected_snapshot = Snapshot( id=hash_to_bytes("3b8fe58e467deb7597b12a5fd3b2c096b8c02028"), branches={ b"develop": SnapshotBranch( target=hash_to_bytes(tip_revision_develop), target_type=TargetType.REVISION, ), b"default": SnapshotBranch( target=hash_to_bytes(tip_revision_default), target_type=TargetType.REVISION, ), b"HEAD": SnapshotBranch(target=b"develop", target_type=TargetType.ALIAS,), }, ) assert_last_visit_matches( loader.storage, repo_url, status="full", type="hg", snapshot=expected_snapshot.id, ) check_snapshot(expected_snapshot, loader.storage) stats = get_stats(loader.storage) assert stats == { "content": 2, "directory": 3, "origin": 1, "origin_visit": 1, "release": 0, "revision": 58, "skipped_content": 0, "snapshot": 1, } # This test has as been adapted from the historical `HgBundle20Loader` tests # to ensure compatibility of `HgLoaderFromDisk`. # Hashes as been produced by copy pasting the result of the implementation # to prevent regressions. def test_loader_hg_new_visit_with_release(swh_storage, datadir, tmp_path): """Eventful visit with release should yield 1 snapshot""" archive_name = "hello" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) loader = HgLoaderFromDisk(swh_storage, url=repo_url, visit_date=VISIT_DATE,) actual_load_status = loader.load() assert actual_load_status == {"status": "eventful"} # then stats = get_stats(loader.storage) assert stats == { "content": 3, "directory": 3, "origin": 1, "origin_visit": 1, "release": 1, "revision": 3, "skipped_content": 0, "snapshot": 1, } # cf. test_loader.org for explaining from where those hashes tip_release = hash_to_bytes("515c4d72e089404356d0f4b39d60f948b8999140") release = loader.storage.release_get([tip_release])[0] assert release is not None tip_revision_default = hash_to_bytes("c3dbe4fbeaaa98dd961834e4007edb3efb0e2a27") revision = loader.storage.revision_get([tip_revision_default])[0] assert revision is not None expected_snapshot = Snapshot( id=hash_to_bytes("d35668e02e2ba4321dc951cd308cf883786f918a"), branches={ b"default": SnapshotBranch( target=tip_revision_default, target_type=TargetType.REVISION, ), b"0.1": SnapshotBranch(target=tip_release, target_type=TargetType.RELEASE,), b"HEAD": SnapshotBranch(target=b"default", target_type=TargetType.ALIAS,), }, ) check_snapshot(expected_snapshot, loader.storage) assert_last_visit_matches( loader.storage, repo_url, type=RevisionType.MERCURIAL.value, status="full", snapshot=expected_snapshot.id, ) # This test has as been adapted from the historical `HgBundle20Loader` tests # to ensure compatibility of `HgLoaderFromDisk`. # Hashes as been produced by copy pasting the result of the implementation # to prevent regressions. def test_visit_repository_with_transplant_operations(swh_storage, datadir, tmp_path): """Visit a mercurial repository visit transplant operations within should yield a snapshot as well. """ archive_name = "transplant" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) loader = HgLoaderFromDisk(swh_storage, url=repo_url, visit_date=VISIT_DATE,) # load hg repository actual_load_status = loader.load() assert actual_load_status == {"status": "eventful"} # collect swh revisions assert_last_visit_matches( loader.storage, repo_url, type=RevisionType.MERCURIAL.value, status="full" ) revisions = [] snapshot = snapshot_get_latest(loader.storage, repo_url) for branch in snapshot.branches.values(): if branch.target_type.value != "revision": continue revisions.append(branch.target) # extract original changesets info and the transplant sources hg_changesets = set() transplant_sources = set() for rev in loader.storage.revision_log(revisions): hg_changesets.add(rev["metadata"]["node"]) for k, v in rev["extra_headers"]: if k == b"transplant_source": transplant_sources.add(v.decode("ascii")) # check extracted data are valid assert len(hg_changesets) > 0 assert len(transplant_sources) > 0 assert transplant_sources.issubset(hg_changesets) + + +def test_load_unchanged_repo_should_be_uneventfull(swh_storage, datadir, tmp_path): + archive_name = "hello" + archive_path = os.path.join(datadir, f"{archive_name}.tgz") + repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) + repo_path = repo_url.replace("file://", "") + + loader = HgLoaderFromDisk(swh_storage, repo_path) + + assert loader.load() == {"status": "eventful"} + assert get_stats(loader.storage) == { + "content": 3, + "directory": 3, + "origin": 1, + "origin_visit": 1, + "release": 1, + "revision": 3, + "skipped_content": 0, + "snapshot": 1, + } + + assert loader.load() == {"status": "uneventful"} + assert get_stats(loader.storage) == { + "content": 3, + "directory": 3, + "origin": 1, + "origin_visit": 2, + "release": 1, + "revision": 3, + "skipped_content": 0, + "snapshot": 1, + }