diff --git a/swh/loader/mercurial/from_disk.py b/swh/loader/mercurial/from_disk.py index 3c2ff3a..ee1dbce 100644 --- a/swh/loader/mercurial/from_disk.py +++ b/swh/loader/mercurial/from_disk.py @@ -1,618 +1,685 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import deque from datetime import datetime import os from shutil import rmtree from tempfile import mkdtemp from typing import Deque, Dict, List, Optional, Tuple, TypeVar, Union from swh.loader.core.loader import BaseLoader from swh.loader.core.utils import clean_dangling_folders from swh.loader.mercurial.utils import parse_visit_date +from swh.model import identifiers from swh.model.from_disk import Content, DentryPerms, Directory from swh.model.hashutil import hash_to_bytehex, hash_to_bytes from swh.model.model import ( + ExtID, ObjectType, Origin, Person, Release, Revision, RevisionType, Sha1Git, Snapshot, SnapshotBranch, TargetType, TimestampWithTimezone, ) from swh.model.model import Content as ModelContent from swh.storage.algos.snapshot import snapshot_get_latest from swh.storage.interface import StorageInterface from . import hgutil from .archive_extract import tmp_extract from .hgutil import HgFilteredSet, HgNodeId, HgSpanSet FLAG_PERMS = { b"l": DentryPerms.symlink, b"x": DentryPerms.executable_content, b"": DentryPerms.content, } # type: Dict[bytes, DentryPerms] TEMPORARY_DIR_PREFIX_PATTERN = "swh.loader.mercurial.from_disk" +EXTID_TYPE = "hg-nodeid" + T = TypeVar("T") class CorruptedRevision(ValueError): """Raised when a revision is corrupted.""" def __init__(self, hg_nodeid: HgNodeId) -> None: super().__init__(hg_nodeid.hex()) self.hg_nodeid = hg_nodeid class HgDirectory(Directory): """A more practical directory. - creates missing parent directories - removes empty directories """ def __setitem__(self, path: bytes, value: Union[Content, "HgDirectory"]) -> None: if b"/" in path: head, tail = path.split(b"/", 1) directory = self.get(head) if directory is None or isinstance(directory, Content): directory = HgDirectory() self[head] = directory directory[tail] = value else: super().__setitem__(path, value) def __delitem__(self, path: bytes) -> None: super().__delitem__(path) while b"/" in path: # remove empty parent directories path = path.rsplit(b"/", 1)[0] if len(self[path]) == 0: super().__delitem__(path) else: break def get( self, path: bytes, default: Optional[T] = None ) -> Optional[Union[Content, "HgDirectory", T]]: # TODO move to swh.model.from_disk.Directory try: return self[path] except KeyError: return default class HgLoaderFromDisk(BaseLoader): """Load a mercurial repository from a local repository.""" CONFIG_BASE_FILENAME = "loader/mercurial" visit_type = "hg" def __init__( self, storage: StorageInterface, url: str, directory: Optional[str] = None, logging_class: str = "swh.loader.mercurial.LoaderFromDisk", visit_date: Optional[datetime] = None, temp_directory: str = "/tmp", clone_timeout_seconds: int = 7200, content_cache_size: int = 10_000, max_content_size: Optional[int] = None, ): """Initialize the loader. Args: url: url of the repository. directory: directory of the local repository. logging_class: class of the loader logger. visit_date: visit date of the repository config: loader configuration """ super().__init__( storage=storage, logging_class=logging_class, max_content_size=max_content_size, ) self._temp_directory = temp_directory self._clone_timeout = clone_timeout_seconds self.origin_url = url self.visit_date = visit_date self.directory = directory self._repo: Optional[hgutil.Repository] = None self._revision_nodeid_to_swhid: Dict[HgNodeId, Sha1Git] = {} self._repo_directory: Optional[str] = None # keeps the last processed hg nodeid # it is used for differential tree update by store_directories # NULLID is the parent of the first revision self._last_hg_nodeid = hgutil.NULLID # keeps the last revision tree # it is used for differential tree update by store_directories self._last_root = HgDirectory() # Cache the content hash across revisions to avoid recalculation. self._content_hash_cache: hgutil.LRUCacheDict = hgutil.LRUCacheDict( content_cache_size, ) # hg node id of the latest snapshot branch heads # used to find what are the new revisions since last snapshot self._latest_heads: List[bytes] = [] self._load_status = "eventful" # If set, will override the default value self._visit_status = None def pre_cleanup(self) -> None: """As a first step, will try and check for dangling data to cleanup. This should do its best to avoid raising issues. """ clean_dangling_folders( self._temp_directory, pattern_check=TEMPORARY_DIR_PREFIX_PATTERN, log=self.log, ) def cleanup(self) -> None: """Last step executed by the loader.""" if self._repo_directory and os.path.exists(self._repo_directory): self.log.debug(f"Cleanup up repository {self._repo_directory}") rmtree(self._repo_directory) def prepare_origin_visit(self) -> None: """First step executed by the loader to prepare origin and visit references. Set/update self.origin, and optionally self.origin_url, self.visit_date. """ self.origin = Origin(url=self.origin_url) def prepare(self) -> None: """Second step executed by the loader to prepare some state needed by the loader. """ # Set here to allow multiple calls to load on the same loader instance self._latest_heads = [] latest_snapshot = snapshot_get_latest(self.storage, self.origin_url) if latest_snapshot: - # TODO: add support for releases - snapshot_branches = [ - branch.target - for branch in latest_snapshot.branches.values() - if branch.target_type == TargetType.REVISION - ] + self._set_latest_heads(latest_snapshot) + + def _set_latest_heads(self, latest_snapshot: Snapshot) -> None: + """ + Looks up the nodeid for all revisions in the snapshot, and adds them to + self._latest_heads. + + This works in two steps: + + 1. Query the revisions with extid_get_from_target, to find nodeids from + revision ids, using the new ExtID architecture + 2. For all revisions that were not found this way, fetch the revision + and look for the nodeid in its metadata. + + This is a temporary process. When we are done migrating away from revision + metadata, step 2 will be removed. + """ + # TODO: add support for releases + snapshot_branches = [ + branch.target + for branch in latest_snapshot.branches.values() + if branch.target_type == TargetType.REVISION + ] + + # Get all ExtIDs for revisions in the latest snapshot + extids = self.storage.extid_get_from_target( + identifiers.ObjectType.REVISION, snapshot_branches + ) - self._latest_heads = [ - hash_to_bytes(revision.metadata["node"]) - for revision in self.storage.revision_get(snapshot_branches) - if revision and revision.metadata + # Filter out extids not specific to Mercurial + extids = [extid for extid in extids if extid.extid_type == EXTID_TYPE] + + if extids: + # Filter out dangling extids, we need to load their target again + revisions_missing = self.storage.revision_missing( + [extid.target.object_id for extid in extids] + ) + extids = [ + extid + for extid in extids + if extid.target.object_id not in revisions_missing ] + # Add the found nodeids to self.latest_heads + self._latest_heads.extend(extid.extid for extid in extids) + + # For each revision without a nodeid, get the revision metadata + # to see if it is found there. + found_revisions = {extid.target.object_id for extid in extids if extid} + revisions_without_extid = list(set(snapshot_branches) - found_revisions) + + self._latest_heads.extend( + hash_to_bytes(revision.metadata["node"]) + for revision in self.storage.revision_get(revisions_without_extid) + if revision and revision.metadata + ) + def fetch_data(self) -> bool: """Fetch the data from the source the loader is currently loading Returns: a value that is interpreted as a boolean. If True, fetch_data needs to be called again to complete loading. """ if not self.directory: # no local repository self._repo_directory = mkdtemp( prefix=TEMPORARY_DIR_PREFIX_PATTERN, suffix=f"-{os.getpid()}", dir=self._temp_directory, ) self.log.debug( f"Cloning {self.origin_url} to {self.directory} " f"with timeout {self._clone_timeout} seconds" ) hgutil.clone(self.origin_url, self._repo_directory, self._clone_timeout) else: # existing local repository # Allow to load on disk repository without cloning # for testing purpose. self._repo_directory = self.directory self._repo = hgutil.repository(self._repo_directory) return False def get_hg_revs_to_load(self) -> Union[HgFilteredSet, HgSpanSet]: """Return the hg revision numbers to load.""" assert self._repo is not None repo: hgutil.Repository = self._repo if self._latest_heads: existing_heads = [] # heads that still exist in the repository for hg_nodeid in self._latest_heads: try: rev = repo[hg_nodeid].rev() existing_heads.append(rev) except KeyError: # the node does not exist anymore pass # select revisions that are not ancestors of heads # and not the heads themselves new_revs = repo.revs("not ::(%ld)", existing_heads) # for now, reload all revisions if there are new commits # otherwise the loader will crash on missing parents # incremental loading will come in next commits if new_revs: return repo.revs("all()") else: return new_revs else: return repo.revs("all()") def store_data(self): """Store fetched data in the database.""" revs = self.get_hg_revs_to_load() if not revs: self._load_status = "uneventful" return assert self._repo is not None repo = self._repo blacklisted_revs: List[int] = [] for rev in revs: if rev in blacklisted_revs: continue try: self.store_revision(repo[rev]) except CorruptedRevision as e: self._visit_status = "partial" self.log.warning("Corrupted revision %s", e) descendents = repo.revs("(%ld)::", [rev]) blacklisted_revs.extend(descendents) branch_by_hg_nodeid: Dict[HgNodeId, bytes] = { hg_nodeid: name for name, hg_nodeid in hgutil.branches(repo).items() } tags_by_name: Dict[bytes, HgNodeId] = repo.tags() tags_by_hg_nodeid: Dict[HgNodeId, bytes] = { hg_nodeid: name for name, hg_nodeid in tags_by_name.items() } snapshot_branches: Dict[bytes, SnapshotBranch] = {} + extids = [] + for hg_nodeid, revision_swhid in self._revision_nodeid_to_swhid.items(): tag_name = tags_by_hg_nodeid.get(hg_nodeid) # tip is listed in the tags by the mercurial api # but its not a tag defined by the user in `.hgtags` if tag_name and tag_name != b"tip": snapshot_branches[tag_name] = SnapshotBranch( target=self.store_release(tag_name, revision_swhid), target_type=TargetType.RELEASE, ) if hg_nodeid in branch_by_hg_nodeid: name = branch_by_hg_nodeid[hg_nodeid] snapshot_branches[name] = SnapshotBranch( target=revision_swhid, target_type=TargetType.REVISION, ) # The tip is mapped to `HEAD` to match # the historical implementation if hg_nodeid == tags_by_name[b"tip"]: snapshot_branches[b"HEAD"] = SnapshotBranch( target=name, target_type=TargetType.ALIAS, ) + # TODO: do not write an ExtID if we got this branch from an ExtID that + # already exists. + # When we are done migrating away from revision metadata, this will + # be as simple as checking if the target is in self._latest_heads + extids.append( + ExtID( + extid_type=EXTID_TYPE, + extid=hg_nodeid, + target=identifiers.CoreSWHID( + object_type=identifiers.ObjectType.REVISION, + object_id=revision_swhid, + ), + ) + ) + snapshot = Snapshot(branches=snapshot_branches) self.storage.snapshot_add([snapshot]) + self.storage.extid_add(extids) + self.flush() self.loaded_snapshot_id = snapshot.id def load_status(self) -> Dict[str, str]: """Detailed loading status. Defaults to logging an eventful load. Returns: a dictionary that is eventually passed back as the task's result to the scheduler, allowing tuning of the task recurrence mechanism. """ return { "status": self._load_status, } def visit_status(self) -> str: """Allow overriding the visit status in case of partial load""" if self._visit_status is not None: return self._visit_status return super().visit_status() def get_revision_id_from_hg_nodeid(self, hg_nodeid: HgNodeId) -> Sha1Git: """Return the swhid of a revision given its hg nodeid. Args: hg_nodeid: the hg nodeid of the revision. Returns: the swhid of the revision. """ return self._revision_nodeid_to_swhid[hg_nodeid] def get_revision_parents(self, rev_ctx: hgutil.BaseContext) -> Tuple[Sha1Git, ...]: """Return the swhids of the parent revisions. Args: hg_nodeid: the hg nodeid of the revision. Returns: the swhids of the parent revisions. """ parents = [] for parent_ctx in rev_ctx.parents(): parent_hg_nodeid = parent_ctx.node() # nullid is the value of a parent that does not exist if parent_hg_nodeid == hgutil.NULLID: continue parents.append(self.get_revision_id_from_hg_nodeid(parent_hg_nodeid)) return tuple(parents) def store_revision(self, rev_ctx: hgutil.BaseContext) -> None: """Store a revision given its hg nodeid. Args: rev_ctx: the he revision context. Returns: the swhid of the stored revision. """ hg_nodeid = rev_ctx.node() root_swhid = self.store_directories(rev_ctx) # `Person.from_fullname` is compatible with mercurial's freeform author # as fullname is what is used in revision hash when available. author = Person.from_fullname(rev_ctx.user()) (timestamp, offset) = rev_ctx.date() # TimestampWithTimezone.from_dict will change name # as it accept more than just dicts rev_date = TimestampWithTimezone.from_dict(int(timestamp)) extra_headers = [ (b"time_offset_seconds", str(offset).encode(),), ] for key, value in rev_ctx.extra().items(): # The default branch is skipped to match # the historical implementation if key == b"branch" and value == b"default": continue # transplant_source is converted to match # the historical implementation if key == b"transplant_source": value = hash_to_bytehex(value) extra_headers.append((key, value)) revision = Revision( author=author, date=rev_date, committer=author, committer_date=rev_date, type=RevisionType.MERCURIAL, directory=root_swhid, message=rev_ctx.description(), metadata={"node": hg_nodeid.hex()}, extra_headers=tuple(extra_headers), synthetic=False, parents=self.get_revision_parents(rev_ctx), ) self._revision_nodeid_to_swhid[hg_nodeid] = revision.id self.storage.revision_add([revision]) def store_release(self, name: bytes, target=Sha1Git) -> Sha1Git: """Store a release given its name and its target. A release correspond to a user defined tag in mercurial. The mercurial api as a `tip` tag that must be ignored. Args: name: name of the release. target: swhid of the target revision. Returns: the swhid of the stored release. """ release = Release( name=name, target=target, target_type=ObjectType.REVISION, message=None, metadata=None, synthetic=False, author=Person(name=None, email=None, fullname=b""), date=None, ) self.storage.release_add([release]) return release.id def store_content(self, rev_ctx: hgutil.BaseContext, file_path: bytes) -> Content: """Store a revision content hg nodeid and file path. Content is a mix of file content at a given revision and its permissions found in the changeset's manifest. Args: rev_ctx: the he revision context. file_path: the hg path of the content. Returns: the swhid of the top level directory. """ hg_nodeid = rev_ctx.node() file_ctx = rev_ctx[file_path] try: file_nodeid = file_ctx.filenode() except hgutil.LookupError: # TODO # Raising CorruptedRevision avoid crashing the whole loading # but can lead to a lot of missing revisions. # SkippedContent could be used but need actual content to calculate its id. # Maybe the hg_nodeid can be used instead. # Another option could be to just ignore the missing content. # This point is left to future commits. raise CorruptedRevision(hg_nodeid) perms = FLAG_PERMS[file_ctx.flags()] # Key is file_nodeid + perms because permissions does not participate # in content hash in hg while it is the case in swh. cache_key = (file_nodeid, perms) sha1_git = self._content_hash_cache.get(cache_key) if sha1_git is None: data = file_ctx.data() content = ModelContent.from_data(data) self.storage.content_add([content]) sha1_git = content.sha1_git self._content_hash_cache[cache_key] = sha1_git # Here we make sure to return only necessary data. return Content({"sha1_git": sha1_git, "perms": perms}) def store_directories(self, rev_ctx: hgutil.BaseContext) -> Sha1Git: """Store a revision directories given its hg nodeid. Mercurial as no directory as in git. A Git like tree must be build from file paths to obtain each directory hash. Args: rev_ctx: the he revision context. Returns: the swhid of the top level directory. """ repo: hgutil.Repository = self._repo # mypy can't infer that repo is not None prev_ctx = repo[self._last_hg_nodeid] # TODO maybe do diff on parents status = prev_ctx.status(rev_ctx) for file_path in status.removed: del self._last_root[file_path] for file_path in status.added: content = self.store_content(rev_ctx, file_path) self._last_root[file_path] = content for file_path in status.modified: content = self.store_content(rev_ctx, file_path) self._last_root[file_path] = content self._last_hg_nodeid = rev_ctx.node() directories: Deque[Directory] = deque([self._last_root]) while directories: directory = directories.pop() self.storage.directory_add([directory.to_model()]) directories.extend( [item for item in directory.values() if isinstance(item, Directory)] ) return self._last_root.hash class HgArchiveLoaderFromDisk(HgLoaderFromDisk): """Mercurial loader for repository wrapped within tarballs.""" def __init__( self, storage: StorageInterface, url: str, visit_date: Optional[datetime] = None, archive_path: str = None, temp_directory: str = "/tmp", max_content_size: Optional[int] = None, ): super().__init__( storage=storage, url=url, visit_date=visit_date, logging_class="swh.loader.mercurial.ArchiveLoaderFromDisk", temp_directory=temp_directory, max_content_size=max_content_size, ) self.archive_extract_temp_dir = None self.archive_path = archive_path def prepare(self): """Extract the archive instead of cloning.""" self.archive_extract_temp_dir = tmp_extract( archive=self.archive_path, dir=self._temp_directory, prefix=TEMPORARY_DIR_PREFIX_PATTERN, suffix=f".dump-{os.getpid()}", log=self.log, source=self.origin_url, ) repo_name = os.listdir(self.temp_dir)[0] self.directory = os.path.join(self.archive_extract_temp_dir, repo_name) super().prepare() # Allow direct usage of the loader from the command line with # `python -m swh.loader.mercurial.from_disk $ORIGIN_URL` if __name__ == "__main__": import logging import click logging.basicConfig( level=logging.DEBUG, format="%(asctime)s %(process)d %(message)s" ) @click.command() @click.option("--origin-url", help="origin url") @click.option("--hg-directory", help="Path to mercurial repository to load") @click.option("--visit-date", default=None, help="Visit date") def main(origin_url, hg_directory, visit_date): from swh.storage import get_storage storage = get_storage(cls="memory") return HgLoaderFromDisk( storage, origin_url, directory=hg_directory, visit_date=parse_visit_date(visit_date), ).load() main() diff --git a/swh/loader/mercurial/tests/test_from_disk.py b/swh/loader/mercurial/tests/test_from_disk.py index c2c223e..e207f1c 100644 --- a/swh/loader/mercurial/tests/test_from_disk.py +++ b/swh/loader/mercurial/tests/test_from_disk.py @@ -1,300 +1,440 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime from hashlib import sha1 import os +import attr +import pytest + from swh.loader.mercurial.utils import parse_visit_date from swh.loader.tests import ( assert_last_visit_matches, check_snapshot, get_stats, prepare_repository_from_archive, ) from swh.model.from_disk import Content, DentryPerms from swh.model.hashutil import hash_to_bytes +from swh.model.identifiers import ObjectType from swh.model.model import RevisionType, Snapshot, SnapshotBranch, TargetType +from swh.storage import get_storage from swh.storage.algos.snapshot import snapshot_get_latest from ..from_disk import HgDirectory, HgLoaderFromDisk from .loader_checker import ExpectedSwhids, LoaderChecker VISIT_DATE = parse_visit_date("2016-05-03 15:16:32+00") assert VISIT_DATE is not None def random_content() -> Content: """Create minimal content object.""" data = str(datetime.now()).encode() return Content({"sha1_git": sha1(data).digest(), "perms": DentryPerms.content}) def test_hg_directory_creates_missing_directories(): directory = HgDirectory() directory[b"path/to/some/content"] = random_content() def test_hg_directory_get(): content = random_content() directory = HgDirectory() assert directory.get(b"path/to/content") is None assert directory.get(b"path/to/content", content) == content directory[b"path/to/content"] = content assert directory.get(b"path/to/content") == content def test_hg_directory_deletes_empty_directories(): directory = HgDirectory() content = random_content() directory[b"path/to/content"] = content directory[b"path/to/some/deep/content"] = random_content() del directory[b"path/to/some/deep/content"] assert directory.get(b"path/to/some/deep") is None assert directory.get(b"path/to/some") is None assert directory.get(b"path/to/content") == content def test_hg_directory_when_directory_replaces_file(): directory = HgDirectory() directory[b"path/to/some"] = random_content() directory[b"path/to/some/content"] = random_content() # Those tests assert expectations on repository loading # by reading expected values from associated json files # produced by the `swh-hg-identify` command line utility. # # It has more granularity than historical tests. # Assertions will tell if the error comes from the directories # revisions or release rather than only checking the snapshot. # # With more work it should event be possible to know which part # of an object is faulty. def test_examples(swh_storage, datadir, tmp_path): for archive_name in ("hello", "transplant", "the-sandbox", "example"): archive_path = os.path.join(datadir, f"{archive_name}.tgz") json_path = os.path.join(datadir, f"{archive_name}.json") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) LoaderChecker( loader=HgLoaderFromDisk(swh_storage, repo_url), expected=ExpectedSwhids.load(json_path), ).check() # This test has as been adapted from the historical `HgBundle20Loader` tests # to ensure compatibility of `HgLoaderFromDisk`. # Hashes as been produced by copy pasting the result of the implementation # to prevent regressions. def test_loader_hg_new_visit_no_release(swh_storage, datadir, tmp_path): """Eventful visit should yield 1 snapshot""" archive_name = "the-sandbox" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) loader = HgLoaderFromDisk(swh_storage, url=repo_url) assert loader.load() == {"status": "eventful"} tip_revision_develop = "a9c4534552df370f43f0ef97146f393ef2f2a08c" tip_revision_default = "70e750bb046101fdced06f428e73fee471509c56" expected_snapshot = Snapshot( id=hash_to_bytes("3b8fe58e467deb7597b12a5fd3b2c096b8c02028"), branches={ b"develop": SnapshotBranch( target=hash_to_bytes(tip_revision_develop), target_type=TargetType.REVISION, ), b"default": SnapshotBranch( target=hash_to_bytes(tip_revision_default), target_type=TargetType.REVISION, ), b"HEAD": SnapshotBranch(target=b"develop", target_type=TargetType.ALIAS,), }, ) assert_last_visit_matches( loader.storage, repo_url, status="full", type="hg", snapshot=expected_snapshot.id, ) check_snapshot(expected_snapshot, loader.storage) stats = get_stats(loader.storage) assert stats == { "content": 2, "directory": 3, "origin": 1, "origin_visit": 1, "release": 0, "revision": 58, "skipped_content": 0, "snapshot": 1, } # This test has as been adapted from the historical `HgBundle20Loader` tests # to ensure compatibility of `HgLoaderFromDisk`. # Hashes as been produced by copy pasting the result of the implementation # to prevent regressions. def test_loader_hg_new_visit_with_release(swh_storage, datadir, tmp_path): """Eventful visit with release should yield 1 snapshot""" archive_name = "hello" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) loader = HgLoaderFromDisk(swh_storage, url=repo_url, visit_date=VISIT_DATE,) actual_load_status = loader.load() assert actual_load_status == {"status": "eventful"} # then stats = get_stats(loader.storage) assert stats == { "content": 3, "directory": 3, "origin": 1, "origin_visit": 1, "release": 1, "revision": 3, "skipped_content": 0, "snapshot": 1, } # cf. test_loader.org for explaining from where those hashes tip_release = hash_to_bytes("515c4d72e089404356d0f4b39d60f948b8999140") release = loader.storage.release_get([tip_release])[0] assert release is not None tip_revision_default = hash_to_bytes("c3dbe4fbeaaa98dd961834e4007edb3efb0e2a27") revision = loader.storage.revision_get([tip_revision_default])[0] assert revision is not None expected_snapshot = Snapshot( id=hash_to_bytes("d35668e02e2ba4321dc951cd308cf883786f918a"), branches={ b"default": SnapshotBranch( target=tip_revision_default, target_type=TargetType.REVISION, ), b"0.1": SnapshotBranch(target=tip_release, target_type=TargetType.RELEASE,), b"HEAD": SnapshotBranch(target=b"default", target_type=TargetType.ALIAS,), }, ) check_snapshot(expected_snapshot, loader.storage) assert_last_visit_matches( loader.storage, repo_url, type=RevisionType.MERCURIAL.value, status="full", snapshot=expected_snapshot.id, ) # This test has as been adapted from the historical `HgBundle20Loader` tests # to ensure compatibility of `HgLoaderFromDisk`. # Hashes as been produced by copy pasting the result of the implementation # to prevent regressions. def test_visit_repository_with_transplant_operations(swh_storage, datadir, tmp_path): """Visit a mercurial repository visit transplant operations within should yield a snapshot as well. """ archive_name = "transplant" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) loader = HgLoaderFromDisk(swh_storage, url=repo_url, visit_date=VISIT_DATE,) # load hg repository actual_load_status = loader.load() assert actual_load_status == {"status": "eventful"} # collect swh revisions assert_last_visit_matches( loader.storage, repo_url, type=RevisionType.MERCURIAL.value, status="full" ) revisions = [] snapshot = snapshot_get_latest(loader.storage, repo_url) for branch in snapshot.branches.values(): if branch.target_type.value != "revision": continue revisions.append(branch.target) # extract original changesets info and the transplant sources hg_changesets = set() transplant_sources = set() for rev in loader.storage.revision_log(revisions): hg_changesets.add(rev["metadata"]["node"]) for k, v in rev["extra_headers"]: if k == b"transplant_source": transplant_sources.add(v.decode("ascii")) # check extracted data are valid assert len(hg_changesets) > 0 assert len(transplant_sources) > 0 assert transplant_sources.issubset(hg_changesets) -def test_load_unchanged_repo_should_be_uneventfull(swh_storage, datadir, tmp_path): +def _partial_copy_storage( + old_storage, origin_url: str, mechanism: str, copy_revisions: bool +): + """Create a new storage, and only copy ExtIDs or head revisions to it.""" + new_storage = get_storage(cls="memory") + snapshot = snapshot_get_latest(old_storage, origin_url) + assert snapshot + heads = [branch.target for branch in snapshot.branches.values()] + + if mechanism == "extid": + extids = old_storage.extid_get_from_target(ObjectType.REVISION, heads) + new_storage.extid_add(extids) + if copy_revisions: + # copy revisions, but erase their metadata to make sure the loader doesn't + # fallback to revision.metadata["nodeid"] + revisions = [ + attr.evolve(rev, metadata={}) + for rev in old_storage.revision_get(heads) + if rev + ] + new_storage.revision_add(revisions) + + elif mechanism == "revision metadata": + assert ( + copy_revisions + ), "copy_revisions must be True if mechanism='revision metadata'" + revisions = [rev for rev in old_storage.revision_get(heads) if rev] + new_storage.revision_add(revisions) + + else: + assert mechanism == "same storage" + return old_storage + + # copy origin, visit, status + new_storage.origin_add(old_storage.origin_get([origin_url])) + visit = old_storage.origin_visit_get_latest(origin_url) + new_storage.origin_visit_add([visit]) + statuses = old_storage.origin_visit_status_get(origin_url, visit.visit).results + new_storage.origin_visit_status_add(statuses) + new_storage.snapshot_add([snapshot]) + + return new_storage + + +@pytest.mark.parametrize("mechanism", ("extid", "revision metadata", "same storage")) +def test_load_unchanged_repo_should_be_uneventful( + swh_storage, datadir, tmp_path, mechanism +): + """Checks the loader can find which revisions it already loaded, using either + ExtIDs or revision metadata.""" archive_name = "hello" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_path = repo_url.replace("file://", "") loader = HgLoaderFromDisk(swh_storage, repo_path) assert loader.load() == {"status": "eventful"} assert get_stats(loader.storage) == { "content": 3, "directory": 3, "origin": 1, "origin_visit": 1, "release": 1, "revision": 3, "skipped_content": 0, "snapshot": 1, } + old_storage = swh_storage + + # Create a new storage, and only copy ExtIDs or head revisions to it. + # This should be enough for the loader to know revisions were already loaded + new_storage = _partial_copy_storage( + old_storage, repo_path, mechanism=mechanism, copy_revisions=True + ) + + # Create a new loader (to start with a clean slate, eg. remove the caches), + # with the new, partial, storage + loader = HgLoaderFromDisk(new_storage, repo_path) assert loader.load() == {"status": "uneventful"} + + if mechanism == "same storage": + # Should have all the objects + assert get_stats(loader.storage) == { + "content": 3, + "directory": 3, + "origin": 1, + "origin_visit": 2, + "release": 1, + "revision": 3, + "skipped_content": 0, + "snapshot": 1, + } + else: + # Should have only the objects we directly inserted from the test, plus + # a new visit + assert get_stats(loader.storage) == { + "content": 0, + "directory": 0, + "origin": 1, + "origin_visit": 2, + "release": 0, + "revision": 1, + "skipped_content": 0, + "snapshot": 1, + } + + +def test_load_unchanged_repo__dangling_extid(swh_storage, datadir, tmp_path): + """Checks the loader will load revisions targeted by an ExtID if the + revisions are missing from the storage""" + archive_name = "hello" + archive_path = os.path.join(datadir, f"{archive_name}.tgz") + repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) + repo_path = repo_url.replace("file://", "") + + loader = HgLoaderFromDisk(swh_storage, repo_path) + + assert loader.load() == {"status": "eventful"} + assert get_stats(loader.storage) == { + "content": 3, + "directory": 3, + "origin": 1, + "origin_visit": 1, + "release": 1, + "revision": 3, + "skipped_content": 0, + "snapshot": 1, + } + + old_storage = swh_storage + + # Create a new storage, and only copy ExtIDs or head revisions to it. + # This should be enough for the loader to know revisions were already loaded + new_storage = _partial_copy_storage( + old_storage, repo_path, mechanism="extid", copy_revisions=False + ) + + # Create a new loader (to start with a clean slate, eg. remove the caches), + # with the new, partial, storage + loader = HgLoaderFromDisk(new_storage, repo_path) + + assert get_stats(loader.storage) == { + "content": 0, + "directory": 0, + "origin": 1, + "origin_visit": 1, + "release": 0, + "revision": 0, + "skipped_content": 0, + "snapshot": 1, + } + + assert loader.load() == {"status": "eventful"} + assert get_stats(loader.storage) == { "content": 3, "directory": 3, "origin": 1, "origin_visit": 2, "release": 1, "revision": 3, "skipped_content": 0, "snapshot": 1, } def test_missing_filelog_should_not_crash(swh_storage, datadir, tmp_path): archive_name = "missing-filelog" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) directory = repo_url.replace("file://", "") loader = HgLoaderFromDisk( storage=swh_storage, url=repo_url, directory=directory, # specify directory to avoid clone visit_date=VISIT_DATE, ) actual_load_status = loader.load() assert actual_load_status == {"status": "eventful"} assert_last_visit_matches(swh_storage, repo_url, status="partial", type="hg")