diff --git a/swh/loader/mercurial/identify.py b/swh/loader/mercurial/identify.py index 36fbd39..ebe9707 100644 --- a/swh/loader/mercurial/identify.py +++ b/swh/loader/mercurial/identify.py @@ -1,550 +1,551 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from codecs import escape_decode # type: ignore import json from pathlib import Path import re import subprocess from typing import Any, Dict, Iterator, List, NamedTuple, Optional, Union # WARNING: do not import unnecessary things here to keep cli startup time under # control import click from swh.loader.mercurial.utils import get_minimum_env from swh.model.cli import identify_object +from swh.model.git_objects import normalize_timestamp from swh.model.hashutil import hash_to_bytehex -from swh.model.identifiers import CoreSWHID, ObjectType, normalize_timestamp from swh.model.model import RevisionType +from swh.model.swhids import CoreSWHID, ObjectType TAG_PATTERN = re.compile(b"([0-9A-Fa-f]{40}) +(.+)") class HgAuthor(NamedTuple): """Represent a Mercurial revision author.""" fullname: bytes """full name of the author""" name: Optional[bytes] """name of the author""" email: Optional[bytes] """email of the author""" @staticmethod def from_bytes(data: bytes) -> "HgAuthor": """Convert bytes to an HgAuthor named tuple. Expected format: "name " """ from swh.loader.mercurial.converters import parse_author result = parse_author(data) return HgAuthor( fullname=result["fullname"], name=result["name"], email=result["email"] ) def to_dict(self) -> Dict[str, Optional[bytes]]: return {"fullname": self.fullname, "name": self.name, "email": self.email} HG_REVISION_TEMPLATE = "\n".join( [ "node_id:{node}", "author:{author}", "timestamp_offset:{date|json}", "p1:{p1.node}", "p2:{p2.node}", "extras:{join(extras, '\nextras:')}", ] ) # Log template for HgRevision.from_bytes NULL_NODE_ID = b"0" * 40 # Value used when no parent class HgRevision(NamedTuple): """Represent a Mercurial revision.""" node_id: bytes """raw bytes of the revision hash""" author: HgAuthor """author of the revision""" timestamp: bytes """timestamp of the revision""" offset: bytes """offset of the revision""" parents: List[bytes] """hex bytes of the revision's parents""" extras: Dict[bytes, bytes] """metadata of the revision""" description: bytes """description of the revision""" @staticmethod def from_bytes(data: bytes, description: bytes) -> "HgRevision": """Convert bytes to an HgRevision named tuple. Expected data format: ''' node_id:{node} author:{author} timestamp_offset:[{timestamp}, {offset}] p1:{p1} p2:{p2} extras:{key1}={value1} ... extras:{keyn}={value} ''' """ lines = data.split(b"\n") tuples = [line.split(b":", 1) for line in lines] fields: Dict[str, Any] = { "parents": [], "extras": {}, "description": description, } for key, value in tuples: if key == b"timestamp_offset": timestamp, offset = json.loads(value) fields["timestamp"] = timestamp fields["offset"] = offset elif key in (b"p1", b"p2"): if value != NULL_NODE_ID: fields["parents"].append(value) elif key == b"extras": extra_key, extra_value = value.split(b"=", 1) fields["extras"][extra_key] = extra_value elif key == b"author": fields["author"] = HgAuthor.from_bytes(value) else: fields[key.decode()] = value return HgRevision(**fields) def branch(self) -> bytes: return self.extras.get(b"branch", b"default") def to_dict(self) -> Dict: """Convert a HgRevision to a dict for SWHID computation""" date = normalize_timestamp(int(self.timestamp)) extra_headers = [ (b"time_offset_seconds", str(self.offset).encode("utf-8")), ] for key, value in self.extras.items(): if key == b"branch" and value == b"default": # branch default is skipped to match historical implementation continue if key == b"transplant_source": # transplant_source is converted to hex # to match historical implementation value = hash_to_bytehex(escape_decode(value)[0]) extra_headers.append((key, value)) author = self.author.to_dict() return { "author": author, "date": date, "committer": author, "committer_date": date, "type": RevisionType.MERCURIAL.value, "message": self.description, "metadata": {"node": self.node_id}, "extra_headers": tuple(extra_headers), "synthetic": False, "parents": self.parents, } class HgBranch(NamedTuple): """Represent a Mercurial branch.""" name: bytes """name of the branch""" node_id: bytes """row bytes of the target revision hash""" class HgTag(NamedTuple): """Represent a Mercurial tag.""" name: bytes """name of the tag""" node_id: bytes """hex bytes of the target revision""" class Hg: """Provide methods to extract data from a Mercurial repository.""" def __init__(self, repository_root: Path) -> None: self._root = repository_root def _output(self, *args) -> bytes: """Return the outpout of a `hg` call.""" return subprocess.check_output( ["hg", *args], cwd=self._root, env=get_minimum_env() ) def _call(self, *args) -> None: """Perform a `hg` call.""" subprocess.check_call( ["hg", *args], cwd=self._root, stderr=subprocess.PIPE, stdout=subprocess.PIPE, env=get_minimum_env(), ) def root(self) -> Path: """Return the root of the Mercurial repository.""" return self._root def log(self, rev: Optional[Union[bytes, str]] = None) -> List[HgRevision]: """Return the specified revisions of the Mercurial repository. Mercurial revsets are supported. (See `hg help revsets`) If no revision range is specified, return all revisions". """ if rev: node_ids = self._output("log", "-r", rev, "-T", "{node}\n").splitlines() else: node_ids = self._output("log", "-T", "{node}\n").splitlines() revisions = [self._revision(node_id) for node_id in reversed(node_ids)] return revisions def _revision(self, revision: bytes) -> HgRevision: data = self._output("log", "-r", revision, "-T", HG_REVISION_TEMPLATE) # hg log strips the description so the raw description has to be taken # from debugdata # The description follows some metadata and is separated from them # by an empty line _, desc = self._output("debugdata", "-c", revision).split(b"\n\n", 1) return HgRevision.from_bytes(data, desc) def up(self, rev: bytes) -> None: """Update the repository working directory to the specified revision.""" self._call("up", rev) def branches(self) -> List[HgBranch]: """List the repository named branches.""" output = self._output("branches", "-T", "{branch}\n{node}\n\n").strip() branches = [] for block in output.split(b"\n\n"): name, node_id = block.splitlines() branches.append(HgBranch(name=name, node_id=node_id)) return branches def tip(self) -> HgRevision: """Return the `tip` node-id.""" return self.log("tip")[0] def tags(self) -> List[HgTag]: """Return the repository's tags as defined in the `.hgtags` file. `.hgtags` being like any other repository's tracked file, its content can vary from revision to revision. The returned value therefore depends on the current revision of the repository. """ hgtags = self._root / ".hgtags" tags = {} if hgtags.is_file(): for line in hgtags.read_bytes().splitlines(): match = TAG_PATTERN.match(line) if match is None: continue node_id, name = match.groups() tags[node_id] = name return [HgTag(name=name, node_id=node_id) for node_id, name in tags.items()] @click.group() @click.option( "--directory", "-d", help=("Path to the Mercurial repository. If unset, the current directory is used"), ) @click.pass_context def main(ctx, directory=None): """Compute the Software Heritage persistent identifier (SWHID) for the given source code object(s). For more details about SWHIDs see: https://docs.softwareheritage.org/devel/swh-model/persistent-identifiers.html """ # ensure that ctx.obj exists and is a dict (in case `cli()` is called # by means other than the `if` block below) ctx.ensure_object(dict) root = Path(directory) if directory else Path() if not root.exists(): raise IOError(f"{root!r} does not exists") ctx.obj["HG_ROOT"] = root def identify_directory(path: Path) -> CoreSWHID: """Return the SWHID of the given path.""" return CoreSWHID.from_string( identify_object( "directory", follow_symlinks=True, exclude_patterns=[b".hg"], obj=str(path) ) ) class RevisionIdentity(NamedTuple): """Represent a swh revision identity.""" swhid: CoreSWHID """SWH Identifier of the revision.""" node_id: bytes """node_id hex bytes""" directory_swhid: CoreSWHID """SWH Identifier of the directory""" def dir_uri(self) -> str: """Return the SWHID uri of the revision's directory.""" return f"{self.directory_swhid}\t{self.node_id.decode()}" def __str__(self) -> str: """Return the string representation of a RevisionIdentity.""" return f"{self.swhid}\t{self.node_id.decode()}" def identify_revision( hg: Hg, rev: Optional[bytes] = None, node_id_2_swhid: Optional[Dict[bytes, CoreSWHID]] = None, ) -> Iterator[RevisionIdentity]: """Return the repository revision identities. Args: hg: A `Hg` repository instance rev: An optional revision or Mercurial revsets (See `hg help revsets`) If not provided all the repository revisions will be computed. node_id_2_swhid: An optional cache mapping hg node ids to SWHIDs It will be updated in place with new mappings. """ from swh.model.model import Revision if node_id_2_swhid is None: node_id_2_swhid = {} for revision in hg.log(rev): data = revision.to_dict() hg.up(revision.node_id) directory_swhid = identify_directory(hg.root()) data["directory"] = directory_swhid.object_id parents = [] for parent in data["parents"]: if parent not in node_id_2_swhid: parent_revision = next(identify_revision(hg, parent, node_id_2_swhid)) node_id_2_swhid[parent] = parent_revision.swhid assert node_id_2_swhid[parent].object_type == ObjectType.REVISION parents.append(node_id_2_swhid[parent].object_id) data["parents"] = parents revision_swhid = Revision.from_dict(data).swhid() node_id_2_swhid[revision.node_id] = revision_swhid yield RevisionIdentity( swhid=revision_swhid, node_id=revision.node_id, directory_swhid=directory_swhid, ) class ReleaseIdentity(NamedTuple): """Represent a swh release identity.""" swhid: CoreSWHID """SWH Identifier of the release.""" node_id: bytes """node_id hex bytes""" name: bytes """name of the release""" def __str__(self) -> str: """Return the string representation of a ReleaseIdentity.""" return f"{self.swhid}\t{self.name.decode()}" def identify_release( hg: Hg, node_id_2_swhid: Optional[Dict[bytes, CoreSWHID]] = None, ) -> Iterator[ReleaseIdentity]: """Return the repository's release identities. Args: hg: A `Hg` repository instance node_id_2_swhid: An optional cache mapping hg node ids to SWHIDs If not provided it will be computed using `identify_revision`. """ from swh.model.model import ObjectType as ModelObjectType from swh.model.model import Release if node_id_2_swhid is None: node_id_2_swhid = { revision.node_id: revision.swhid for revision in identify_revision(hg) } for tag in hg.tags(): assert node_id_2_swhid[tag.node_id].object_type == ObjectType.REVISION data = { "name": tag.name, "target": node_id_2_swhid[tag.node_id].object_id, "target_type": ModelObjectType.REVISION.value, "message": None, "metadata": None, "synthetic": False, "author": {"name": None, "email": None, "fullname": b""}, "date": None, } release_swhid = Release.from_dict(data).swhid() yield ReleaseIdentity( swhid=release_swhid, node_id=tag.node_id, name=tag.name, ) def identify_snapshot( hg: Hg, node_id_2_swhid: Optional[Dict[bytes, CoreSWHID]] = None, releases: Optional[List[ReleaseIdentity]] = None, ) -> CoreSWHID: """Return the repository snapshot identity. Args: hg: A `Hg` repository instance node_id_2_swhid: An optional cache mapping hg node ids to SWHIDs If not provided it will be computed using `identify_revision`. release: an optional list of `ReleaseIdentity`. If not provided it will be computed using `identify_release`. """ from swh.model.model import Snapshot, TargetType if node_id_2_swhid is None: node_id_2_swhid = { revision.node_id: revision.swhid for revision in identify_revision(hg) } if releases is None: releases = [release for release in identify_release(hg, node_id_2_swhid)] branches = {} tip = hg.tip() branches[b"HEAD"] = { "target": tip.branch(), "target_type": TargetType.ALIAS.value, } for branch in hg.branches(): assert node_id_2_swhid[branch.node_id].object_type == ObjectType.REVISION branches[branch.name] = { "target": node_id_2_swhid[branch.node_id].object_id, "target_type": TargetType.REVISION.value, } for release in releases: assert release.swhid.object_type == ObjectType.RELEASE branches[release.name] = { "target": release.swhid.object_id, "target_type": TargetType.RELEASE.value, } return Snapshot.from_dict({"branches": branches}).swhid() @main.command() @click.argument("rev", required=False) @click.pass_context def revision(ctx, rev): """Compute the SWHID of a given revision. If specified REV allow to select a single or multiple revisions (using the Mercurial revsets language: `hg help revsets`) """ hg = Hg(ctx.obj["HG_ROOT"]) for identity in identify_revision(hg, rev): click.echo(identity) @main.command() @click.pass_context def snapshot(ctx): """Compute the SWHID of the snapshot.""" root = ctx.obj["HG_ROOT"] hg = Hg(root) snapshot_swhid = identify_snapshot(hg) click.echo(f"{snapshot_swhid}\t{root}") @main.command() @click.pass_context def all(ctx): """Compute the SWHID of all the repository objects.""" root = ctx.obj["HG_ROOT"] hg = Hg(root) dir_uris = [] rev_uris = [] rel_uris = [] node_id_2_swhid = {} for revision in identify_revision(hg): dir_uris.append(revision.dir_uri()) rev_uris.append(str(revision)) node_id_2_swhid[revision.node_id] = revision.swhid releases = [] for release in identify_release(hg, node_id_2_swhid): rel_uris.append(str(release)) releases.append(release) snapshot_swhid = identify_snapshot(hg, node_id_2_swhid, releases) for uri in dir_uris + rev_uris + rel_uris: click.echo(uri) click.echo(f"{snapshot_swhid}\t{root}") if __name__ == "__main__": main() diff --git a/swh/loader/mercurial/loader.py b/swh/loader/mercurial/loader.py index 3d13b57..44705c2 100644 --- a/swh/loader/mercurial/loader.py +++ b/swh/loader/mercurial/loader.py @@ -1,823 +1,823 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Loaders for ingesting Mercurial repositories either local from disk, or remote, see :class:`swh.loader.mercurial.loader.HgLoader` or from an archive, see :class:`swh.loader.mercurial.from_disk.HgArchiveLoader`. """ from collections import deque from datetime import datetime import os from shutil import rmtree from tempfile import mkdtemp from typing import Deque, Dict, Iterator, List, Optional, Set, Tuple, TypeVar, Union from swh.core.utils import grouper from swh.loader.core.loader import BaseLoader from swh.loader.core.utils import clean_dangling_folders from swh.loader.mercurial.utils import get_minimum_env -from swh.model import identifiers +from swh.model import swhids from swh.model.from_disk import Content, DentryPerms, Directory from swh.model.hashutil import hash_to_bytehex from swh.model.model import ( ExtID, ObjectType, Origin, Person, Release, Revision, RevisionType, Sha1Git, Snapshot, SnapshotBranch, TargetType, TimestampWithTimezone, ) from swh.model.model import Content as ModelContent from swh.storage.algos.snapshot import snapshot_get_latest from swh.storage.interface import StorageInterface from . import hgutil from .archive_extract import tmp_extract from .hgutil import NULLID, HgFilteredSet, HgNodeId, HgSpanSet FLAG_PERMS = { b"l": DentryPerms.symlink, b"x": DentryPerms.executable_content, b"": DentryPerms.content, } # type: Dict[bytes, DentryPerms] TEMPORARY_DIR_PREFIX_PATTERN = "swh.loader.mercurial.loader" EXTID_TYPE = "hg-nodeid" EXTID_VERSION: int = 1 T = TypeVar("T") class CorruptedRevision(ValueError): """Raised when a revision is corrupted.""" def __init__(self, hg_nodeid: HgNodeId) -> None: super().__init__(hg_nodeid.hex()) self.hg_nodeid = hg_nodeid class HgDirectory(Directory): """A more practical directory. - creates missing parent directories - removes empty directories """ def __setitem__(self, path: bytes, value: Union[Content, "HgDirectory"]) -> None: if b"/" in path: head, tail = path.split(b"/", 1) directory = self.get(head) if directory is None or isinstance(directory, Content): directory = HgDirectory() self[head] = directory directory[tail] = value else: super().__setitem__(path, value) def __delitem__(self, path: bytes) -> None: super().__delitem__(path) while b"/" in path: # remove empty parent directories path = path.rsplit(b"/", 1)[0] if len(self[path]) == 0: super().__delitem__(path) else: break def get( self, path: bytes, default: Optional[T] = None ) -> Optional[Union[Content, "HgDirectory", T]]: # TODO move to swh.model.from_disk.Directory try: return self[path] except KeyError: return default class HgLoader(BaseLoader): """Load a mercurial repository from a local repository. Mercurial's branching model is more complete than Git's; it allows for multiple heads per branch, closed heads and bookmarks. The following mapping is used to represent the branching state of a Mercurial project in a given snapshot: - `HEAD` (optional) either the node pointed by the `@` bookmark or the tip of the `default` branch - `branch-tip/` (required) the first head of the branch, sorted by nodeid if there are multiple heads. - `bookmarks/` (optional) holds the bookmarks mapping if any - `branch-heads//0..n` (optional) for any branch with multiple open heads, list all *open* heads - `branch-closed-heads//0..n` (optional) for any branch with at least one closed head, list all *closed* heads - `tags/` (optional) record tags The format is not ambiguous regardless of branch name since we know it ends with a `/`, as long as we have a stable sorting of the heads (we sort by nodeid). There may be some overlap between the refs, but it's simpler not to try to figure out de-duplication. However, to reduce the redundancy between snapshot branches in the most common case, when a branch has a single open head, it will only be referenced as `branch-tip/`. The `branch-heads/` hierarchy only appears when a branch has multiple open heads, which we consistently sort by increasing nodeid. The `branch-closed-heads/` hierarchy is also sorted by increasing nodeid. """ CONFIG_BASE_FILENAME = "loader/mercurial" visit_type = "hg" def __init__( self, storage: StorageInterface, url: str, directory: Optional[str] = None, logging_class: str = "swh.loader.mercurial.loader.HgLoader", visit_date: Optional[datetime] = None, temp_directory: str = "/tmp", clone_timeout_seconds: int = 7200, content_cache_size: int = 10_000, max_content_size: Optional[int] = None, ): """Initialize the loader. Args: url: url of the repository. directory: directory of the local repository. logging_class: class of the loader logger. visit_date: visit date of the repository config: loader configuration """ super().__init__( storage=storage, logging_class=logging_class, max_content_size=max_content_size, ) self._temp_directory = temp_directory self._clone_timeout = clone_timeout_seconds self.origin_url = url self.visit_date = visit_date self.directory = directory self._repo: Optional[hgutil.Repository] = None self._revision_nodeid_to_sha1git: Dict[HgNodeId, Sha1Git] = {} self._repo_directory: Optional[str] = None # keeps the last processed hg nodeid # it is used for differential tree update by store_directories # NULLID is the parent of the first revision self._last_hg_nodeid = hgutil.NULLID # keeps the last revision tree # it is used for differential tree update by store_directories self._last_root = HgDirectory() # Cache the content hash across revisions to avoid recalculation. self._content_hash_cache: hgutil.LRUCacheDict = hgutil.LRUCacheDict( content_cache_size, ) # hg node id of the latest snapshot branch heads # used to find what are the new revisions since last snapshot self._latest_heads: List[HgNodeId] = [] # hg node ids of all the tags recorded on previous runs # Used to compute which tags need to be added, even across incremental runs # that might separate a changeset introducing a tag from its target. self._saved_tags: Set[HgNodeId] = set() self._load_status = "eventful" # If set, will override the default value self._visit_status = None def pre_cleanup(self) -> None: """As a first step, will try and check for dangling data to cleanup. This should do its best to avoid raising issues. """ clean_dangling_folders( self._temp_directory, pattern_check=TEMPORARY_DIR_PREFIX_PATTERN, log=self.log, ) self.old_environ = os.environ.copy() os.environ.clear() os.environ.update(get_minimum_env()) def cleanup(self) -> None: """Last step executed by the loader.""" os.environ.clear() os.environ.update(self.old_environ) # Don't cleanup if loading from a local directory was_remote = self.directory is None if was_remote and self._repo_directory and os.path.exists(self._repo_directory): self.log.debug(f"Cleanup up repository {self._repo_directory}") rmtree(self._repo_directory) def prepare_origin_visit(self) -> None: """First step executed by the loader to prepare origin and visit references. Set/update self.origin, and optionally self.origin_url, self.visit_date. """ self.origin = Origin(url=self.origin_url) def prepare(self) -> None: """Second step executed by the loader to prepare some state needed by the loader. """ # Set here to allow multiple calls to load on the same loader instance self._latest_heads = [] latest_snapshot = snapshot_get_latest(self.storage, self.origin_url) if latest_snapshot: self._set_recorded_state(latest_snapshot) def _set_recorded_state(self, latest_snapshot: Snapshot) -> None: """ Looks up the nodeid for all revisions in the snapshot via extid_get_from_target, and adds them to `self._latest_heads`. Also looks up the currently saved releases ("tags" in Mercurial speak). The tags are all listed for easy comparison at the end, while only the latest heads are needed for revisions. """ heads = [] tags = [] for branch in latest_snapshot.branches.values(): if branch.target_type == TargetType.REVISION: heads.append(branch.target) elif branch.target_type == TargetType.RELEASE: tags.append(branch.target) self._latest_heads.extend( HgNodeId(extid.extid) for extid in self._get_extids_for_targets(heads) ) self._saved_tags.update( HgNodeId(extid.extid) for extid in self._get_extids_for_targets(tags) ) def _get_extids_for_targets(self, targets: List[Sha1Git]) -> List[ExtID]: """Get all Mercurial ExtIDs for the targets in the latest snapshot""" extids = [] for extid in self.storage.extid_get_from_target( - identifiers.ObjectType.REVISION, + swhids.ObjectType.REVISION, targets, extid_type=EXTID_TYPE, extid_version=EXTID_VERSION, ): extids.append(extid) self._revision_nodeid_to_sha1git[ HgNodeId(extid.extid) ] = extid.target.object_id if extids: # Filter out dangling extids, we need to load their target again revisions_missing = self.storage.revision_missing( [extid.target.object_id for extid in extids] ) extids = [ extid for extid in extids if extid.target.object_id not in revisions_missing ] return extids def _get_extids_for_hgnodes(self, hgnode_ids: List[HgNodeId]) -> List[ExtID]: """Get all Mercurial ExtIDs for the mercurial nodes in the list which point to a known revision. """ extids = [] for group_ids in grouper(hgnode_ids, n=1000): for extid in self.storage.extid_get_from_extid( EXTID_TYPE, group_ids, version=EXTID_VERSION ): extids.append(extid) self._revision_nodeid_to_sha1git[ HgNodeId(extid.extid) ] = extid.target.object_id if extids: # Filter out dangling extids, we need to load their target again revisions_missing = self.storage.revision_missing( [extid.target.object_id for extid in extids] ) extids = [ extid for extid in extids if extid.target.object_id not in revisions_missing ] return extids def fetch_data(self) -> bool: """Fetch the data from the source the loader is currently loading Returns: a value that is interpreted as a boolean. If True, fetch_data needs to be called again to complete loading. """ if not self.directory: # no local repository self._repo_directory = mkdtemp( prefix=TEMPORARY_DIR_PREFIX_PATTERN, suffix=f"-{os.getpid()}", dir=self._temp_directory, ) self.log.debug( f"Cloning {self.origin_url} to {self._repo_directory} " f"with timeout {self._clone_timeout} seconds" ) hgutil.clone(self.origin_url, self._repo_directory, self._clone_timeout) else: # existing local repository # Allow to load on disk repository without cloning # for testing purpose. self._repo_directory = self.directory self._repo = hgutil.repository(self._repo_directory) return False def _new_revs(self, heads: List[HgNodeId]) -> Union[HgFilteredSet, HgSpanSet]: """Return unseen revisions. That is, filter out revisions that are not ancestors of heads""" assert self._repo is not None existing_heads = [] for hg_nodeid in heads: try: rev = self._repo[hg_nodeid].rev() existing_heads.append(rev) except KeyError: # the node does not exist anymore pass # select revisions that are not ancestors of heads # and not the heads themselves new_revs = self._repo.revs("not ::(%ld)", existing_heads) if new_revs: self.log.info("New revisions found: %d", len(new_revs)) return new_revs def get_hg_revs_to_load(self) -> Iterator[int]: """Yield hg revision numbers to load. """ assert self._repo is not None repo: hgutil.Repository = self._repo seen_revs: Set[int] = set() # 1. use snapshot to reuse existing seen heads from it if self._latest_heads: for rev in self._new_revs(self._latest_heads): seen_revs.add(rev) yield rev # 2. Then filter out remaining revisions through the overall extid mappings # across hg origins revs_left = repo.revs("all() - ::(%ld)", seen_revs) hg_nodeids = [repo[nodeid].node() for nodeid in revs_left] if hg_nodeids: # Don't filter revs if there are none, otherwise it'll load # everything yield from self._new_revs( [ HgNodeId(extid.extid) for extid in self._get_extids_for_hgnodes(hg_nodeids) ] ) def store_data(self): """Store fetched data in the database.""" revs = self.get_hg_revs_to_load() length_ingested_revs = 0 assert self._repo is not None repo = self._repo ignored_revs: Set[int] = set() for rev in revs: if rev in ignored_revs: continue try: self.store_revision(repo[rev]) length_ingested_revs += 1 except CorruptedRevision as e: self._visit_status = "partial" self.log.warning("Corrupted revision %s", e) descendents = repo.revs("(%ld)::", [rev]) ignored_revs.update(descendents) if length_ingested_revs == 0: # no new revision ingested, so uneventful # still we'll make a snapshot, so we continue self._load_status = "uneventful" branching_info = hgutil.branching_info(repo, ignored_revs) tags_by_name: Dict[bytes, HgNodeId] = repo.tags() snapshot_branches: Dict[bytes, SnapshotBranch] = {} for tag_name, hg_nodeid in tags_by_name.items(): if tag_name == b"tip": # `tip` is listed in the tags by the Mercurial API but its not a tag # defined by the user in `.hgtags`. continue if hg_nodeid not in self._saved_tags: label = b"tags/%s" % tag_name target = self.get_revision_id_from_hg_nodeid(hg_nodeid) snapshot_branches[label] = SnapshotBranch( target=self.store_release(tag_name, target), target_type=TargetType.RELEASE, ) for branch_name, node_id in branching_info.tips.items(): name = b"branch-tip/%s" % branch_name target = self.get_revision_id_from_hg_nodeid(node_id) snapshot_branches[name] = SnapshotBranch( target=target, target_type=TargetType.REVISION ) for bookmark_name, node_id in branching_info.bookmarks.items(): name = b"bookmarks/%s" % bookmark_name target = self.get_revision_id_from_hg_nodeid(node_id) snapshot_branches[name] = SnapshotBranch( target=target, target_type=TargetType.REVISION ) for branch_name, branch_heads in branching_info.open_heads.items(): for index, head in enumerate(branch_heads): name = b"branch-heads/%s/%d" % (branch_name, index) target = self.get_revision_id_from_hg_nodeid(head) snapshot_branches[name] = SnapshotBranch( target=target, target_type=TargetType.REVISION ) for branch_name, closed_heads in branching_info.closed_heads.items(): for index, head in enumerate(closed_heads): name = b"branch-closed-heads/%s/%d" % (branch_name, index) target = self.get_revision_id_from_hg_nodeid(head) snapshot_branches[name] = SnapshotBranch( target=target, target_type=TargetType.REVISION ) # If the repo is broken enough or if it has none of the "normal" default # mechanisms, we ignore `HEAD`. default_branch_alias = branching_info.default_branch_alias if default_branch_alias is not None: snapshot_branches[b"HEAD"] = SnapshotBranch( target=default_branch_alias, target_type=TargetType.ALIAS, ) snapshot = Snapshot(branches=snapshot_branches) self.storage.snapshot_add([snapshot]) self.flush() self.loaded_snapshot_id = snapshot.id def load_status(self) -> Dict[str, str]: """Detailed loading status. Defaults to logging an eventful load. Returns: a dictionary that is eventually passed back as the task's result to the scheduler, allowing tuning of the task recurrence mechanism. """ return { "status": self._load_status, } def visit_status(self) -> str: """Allow overriding the visit status in case of partial load""" if self._visit_status is not None: return self._visit_status return super().visit_status() def get_revision_id_from_hg_nodeid(self, hg_nodeid: HgNodeId) -> Sha1Git: """Return the git sha1 of a revision given its hg nodeid. Args: hg_nodeid: the hg nodeid of the revision. Returns: the sha1_git of the revision. """ from_cache = self._revision_nodeid_to_sha1git.get(hg_nodeid) if from_cache is not None: return from_cache # The parent was not loaded in this run, get it from storage from_storage = [ extid for extid in self.storage.extid_get_from_extid( EXTID_TYPE, ids=[hg_nodeid], version=EXTID_VERSION ) ] msg = "Expected 1 match from storage for hg node %r, got %d" assert len(from_storage) == 1, msg % (hg_nodeid.hex(), len(from_storage)) return from_storage[0].target.object_id def get_revision_parents(self, rev_ctx: hgutil.BaseContext) -> Tuple[Sha1Git, ...]: """Return the git sha1 of the parent revisions. Args: hg_nodeid: the hg nodeid of the revision. Returns: the sha1_git of the parent revisions. """ parents = [] for parent_ctx in rev_ctx.parents(): parent_hg_nodeid = parent_ctx.node() # nullid is the value of a parent that does not exist if parent_hg_nodeid == hgutil.NULLID: continue revision_id = self.get_revision_id_from_hg_nodeid(parent_hg_nodeid) parents.append(revision_id) return tuple(parents) def store_revision(self, rev_ctx: hgutil.BaseContext) -> None: """Store a revision given its hg nodeid. Args: rev_ctx: the he revision context. Returns: the sha1_git of the stored revision. """ hg_nodeid = rev_ctx.node() root_sha1git = self.store_directories(rev_ctx) # `Person.from_fullname` is compatible with mercurial's freeform author # as fullname is what is used in revision hash when available. author = Person.from_fullname(rev_ctx.user()) (timestamp, offset) = rev_ctx.date() # TimestampWithTimezone.from_dict will change name # as it accept more than just dicts rev_date = TimestampWithTimezone.from_dict(int(timestamp)) extra_headers = [ (b"time_offset_seconds", str(offset).encode(),), ] for key, value in rev_ctx.extra().items(): # The default branch is skipped to match # the historical implementation if key == b"branch" and value == b"default": continue # transplant_source is converted to match # the historical implementation if key == b"transplant_source": value = hash_to_bytehex(value) extra_headers.append((key, value)) revision = Revision( author=author, date=rev_date, committer=author, committer_date=rev_date, type=RevisionType.MERCURIAL, directory=root_sha1git, message=rev_ctx.description(), extra_headers=tuple(extra_headers), synthetic=False, parents=self.get_revision_parents(rev_ctx), ) self._revision_nodeid_to_sha1git[hg_nodeid] = revision.id self.storage.revision_add([revision]) # Save the mapping from SWHID to hg id - revision_swhid = identifiers.CoreSWHID( - object_type=identifiers.ObjectType.REVISION, object_id=revision.id, + revision_swhid = swhids.CoreSWHID( + object_type=swhids.ObjectType.REVISION, object_id=revision.id, ) self.storage.extid_add( [ ExtID( extid_type=EXTID_TYPE, extid_version=EXTID_VERSION, extid=hg_nodeid, target=revision_swhid, ) ] ) def store_release(self, name: bytes, target: Sha1Git) -> Sha1Git: """Store a release given its name and its target. A release correspond to a user defined tag in mercurial. The mercurial api as a `tip` tag that must be ignored. Args: name: name of the release. target: sha1_git of the target revision. Returns: the sha1_git of the stored release. """ release = Release( name=name, target=target, target_type=ObjectType.REVISION, message=None, metadata=None, synthetic=False, author=Person(name=None, email=None, fullname=b""), date=None, ) self.storage.release_add([release]) return release.id def store_content(self, rev_ctx: hgutil.BaseContext, file_path: bytes) -> Content: """Store a revision content hg nodeid and file path. Content is a mix of file content at a given revision and its permissions found in the changeset's manifest. Args: rev_ctx: the he revision context. file_path: the hg path of the content. Returns: the sha1_git of the top level directory. """ hg_nodeid = rev_ctx.node() file_ctx = rev_ctx[file_path] try: file_nodeid = file_ctx.filenode() except hgutil.LookupError: # TODO # Raising CorruptedRevision avoid crashing the whole loading # but can lead to a lot of missing revisions. # SkippedContent could be used but need actual content to calculate its id. # Maybe the hg_nodeid can be used instead. # Another option could be to just ignore the missing content. # This point is left to future commits. # Check for other uses to apply the same logic there. raise CorruptedRevision(hg_nodeid) perms = FLAG_PERMS[file_ctx.flags()] # Key is file_nodeid + perms because permissions does not participate # in content hash in hg while it is the case in swh. cache_key = (file_nodeid, perms) sha1_git = self._content_hash_cache.get(cache_key) if sha1_git is None: try: data = file_ctx.data() except hgutil.error.RevlogError: # TODO # See above use of `CorruptedRevision` raise CorruptedRevision(hg_nodeid) content = ModelContent.from_data(data) self.storage.content_add([content]) sha1_git = content.sha1_git self._content_hash_cache[cache_key] = sha1_git # Here we make sure to return only necessary data. return Content({"sha1_git": sha1_git, "perms": perms}) def _store_tree(self) -> Sha1Git: """Save the current in-memory tree to storage.""" directories: Deque[Directory] = deque([self._last_root]) while directories: directory = directories.pop() self.storage.directory_add([directory.to_model()]) directories.extend( [item for item in directory.values() if isinstance(item, Directory)] ) return self._last_root.hash def _store_directories_slow(self, rev_ctx: hgutil.BaseContext) -> Sha1Git: """Store a revision directories given its hg nodeid. This is the slow variant: it does not use a diff from the last revision but lists all the files. It is used for the first revision in every run (nullid for non-incremental, any other for incremental runs).""" try: files = rev_ctx.manifest().iterkeys() except hgutil.error.LookupError: raise CorruptedRevision(rev_ctx.node()) for file_path in files: content = self.store_content(rev_ctx, file_path) self._last_root[file_path] = content self._last_hg_nodeid = rev_ctx.node() return self._store_tree() def store_directories(self, rev_ctx: hgutil.BaseContext) -> Sha1Git: """Store a revision directories given its hg nodeid. Mercurial as no directory as in git. A Git like tree must be build from file paths to obtain each directory hash. Args: rev_ctx: the he revision context. Returns: the sha1_git of the top level directory. """ repo: hgutil.Repository = self._repo # mypy can't infer that repo is not None if self._last_hg_nodeid == NULLID: # We need to build the caches from scratch, do a full listing of # that revision. return self._store_directories_slow(rev_ctx) prev_ctx = repo[self._last_hg_nodeid] # TODO maybe do diff on parents try: status = prev_ctx.status(rev_ctx) except hgutil.error.LookupError: raise CorruptedRevision(rev_ctx.node()) for file_path in status.removed: try: del self._last_root[file_path] except KeyError: raise CorruptedRevision(rev_ctx.node()) for file_path in status.added: content = self.store_content(rev_ctx, file_path) self._last_root[file_path] = content for file_path in status.modified: content = self.store_content(rev_ctx, file_path) self._last_root[file_path] = content self._last_hg_nodeid = rev_ctx.node() return self._store_tree() class HgArchiveLoader(HgLoader): """Mercurial loader for repository wrapped within tarballs.""" def __init__( self, storage: StorageInterface, url: str, visit_date: Optional[datetime] = None, archive_path: str = None, temp_directory: str = "/tmp", max_content_size: Optional[int] = None, ): super().__init__( storage=storage, url=url, visit_date=visit_date, logging_class="swh.loader.mercurial.loader.ArchiveLoader", temp_directory=temp_directory, max_content_size=max_content_size, ) self.archive_extract_temp_dir = None self.archive_path = archive_path def prepare(self): """Extract the archive instead of cloning.""" self.archive_extract_temp_dir = tmp_extract( archive=self.archive_path, dir=self._temp_directory, prefix=TEMPORARY_DIR_PREFIX_PATTERN, suffix=f".dump-{os.getpid()}", log=self.log, source=self.origin_url, ) repo_name = os.listdir(self.temp_dir)[0] self.directory = os.path.join(self.archive_extract_temp_dir, repo_name) super().prepare() diff --git a/swh/loader/mercurial/tests/test_loader.py b/swh/loader/mercurial/tests/test_loader.py index 2cf7f65..31aa05e 100644 --- a/swh/loader/mercurial/tests/test_loader.py +++ b/swh/loader/mercurial/tests/test_loader.py @@ -1,695 +1,695 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime from hashlib import sha1 import os from pathlib import Path import subprocess import unittest import attr import pytest from swh.loader.core.utils import parse_visit_date from swh.loader.tests import ( assert_last_visit_matches, check_snapshot, get_stats, prepare_repository_from_archive, ) from swh.model.from_disk import Content, DentryPerms from swh.model.hashutil import hash_to_bytes, hash_to_hex -from swh.model.identifiers import ObjectType from swh.model.model import RevisionType, Snapshot, SnapshotBranch, TargetType +from swh.model.swhids import ObjectType from swh.storage import get_storage from swh.storage.algos.snapshot import snapshot_get_latest from ..loader import EXTID_VERSION, HgDirectory, HgLoader from .loader_checker import ExpectedSwhids, LoaderChecker VISIT_DATE = parse_visit_date("2016-05-03 15:16:32+00") assert VISIT_DATE is not None def random_content() -> Content: """Create minimal content object.""" data = str(datetime.now()).encode() return Content({"sha1_git": sha1(data).digest(), "perms": DentryPerms.content}) def test_hg_directory_creates_missing_directories(): directory = HgDirectory() directory[b"path/to/some/content"] = random_content() def test_hg_directory_get(): content = random_content() directory = HgDirectory() assert directory.get(b"path/to/content") is None assert directory.get(b"path/to/content", content) == content directory[b"path/to/content"] = content assert directory.get(b"path/to/content") == content def test_hg_directory_deletes_empty_directories(): directory = HgDirectory() content = random_content() directory[b"path/to/content"] = content directory[b"path/to/some/deep/content"] = random_content() del directory[b"path/to/some/deep/content"] assert directory.get(b"path/to/some/deep") is None assert directory.get(b"path/to/some") is None assert directory.get(b"path/to/content") == content def test_hg_directory_when_directory_replaces_file(): directory = HgDirectory() directory[b"path/to/some"] = random_content() directory[b"path/to/some/content"] = random_content() # Those tests assert expectations on repository loading # by reading expected values from associated json files # produced by the `swh-hg-identify` command line utility. # # It has more granularity than historical tests. # Assertions will tell if the error comes from the directories # revisions or release rather than only checking the snapshot. # # With more work it should event be possible to know which part # of an object is faulty. @pytest.mark.parametrize( "archive_name", ("hello", "transplant", "the-sandbox", "example") ) def test_examples(swh_storage, datadir, tmp_path, archive_name): archive_path = Path(datadir, f"{archive_name}.tgz") json_path = Path(datadir, f"{archive_name}.json") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) LoaderChecker( loader=HgLoader(swh_storage, repo_url), expected=ExpectedSwhids.load(json_path), ).check() # This test has as been adapted from the historical `HgBundle20Loader` tests # to ensure compatibility of `HgLoader`. # Hashes as been produced by copy pasting the result of the implementation # to prevent regressions. def test_loader_hg_new_visit_no_release(swh_storage, datadir, tmp_path): """Eventful visit should yield 1 snapshot""" archive_name = "the-sandbox" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) loader = HgLoader(swh_storage, url=repo_url) assert loader.load() == {"status": "eventful"} tips = { b"branch-tip/default": "70e750bb046101fdced06f428e73fee471509c56", b"branch-tip/develop": "a9c4534552df370f43f0ef97146f393ef2f2a08c", } closed = { b"feature/fun_time": "4d640e8064fe69b4c851dfd43915c431e80c7497", b"feature/green2_loader": "94be9abcf9558213ff301af0ecd8223451ce991d", b"feature/greenloader": "9f82d95bd3edfb7f18b1a21d6171170395ea44ce", b"feature/my_test": "dafa445964230e808148db043c126063ea1dc9b6", b"feature/read2_loader": "9e912851eb64e3a1e08fbb587de7a4c897ce5a0a", b"feature/readloader": "ddecbc16f4c916c39eacfcb2302e15a9e70a231e", b"feature/red": "cb36b894129ca7910bb81c457c72d69d5ff111bc", b"feature/split5_loader": "3ed4b85d30401fe32ae3b1d650f215a588293a9e", b"feature/split_causing": "c346f6ff7f42f2a8ff867f92ab83a6721057d86c", b"feature/split_loader": "5f4eba626c3f826820c4475d2d81410759ec911b", b"feature/split_loader5": "5017ce0b285351da09a2029ea2cf544f79b593c7", b"feature/split_loading": "4e2dc6d6073f0b6d348f84ded52f9143b10344b9", b"feature/split_redload": "2d4a801c9a9645fcd3a9f4c06418d8393206b1f3", b"feature/splitloading": "88b80615ed8561be74a700b92883ec0374ddacb0", b"feature/test": "61d762d65afb3150e2653d6735068241779c1fcf", b"feature/test_branch": "be44d5e6cc66580f59c108f8bff5911ee91a22e4", b"feature/test_branching": "d2164061453ecb03d4347a05a77db83f706b8e15", b"feature/test_dog": "2973e5dc9568ac491b198f6b7f10c44ddc04e0a3", } mapping = {b"branch-closed-heads/%s/0" % b: n for b, n in closed.items()} mapping.update(tips) expected_branches = { k: SnapshotBranch(target=hash_to_bytes(v), target_type=TargetType.REVISION) for k, v in mapping.items() } expected_branches[b"HEAD"] = SnapshotBranch( target=b"branch-tip/default", target_type=TargetType.ALIAS ) expected_snapshot = Snapshot( id=hash_to_bytes("cbc609dcdced34dbd9938fe81b555170f1abc96f"), branches=expected_branches, ) assert_last_visit_matches( loader.storage, repo_url, status="full", type="hg", snapshot=expected_snapshot.id, ) check_snapshot(expected_snapshot, loader.storage) stats = get_stats(loader.storage) expected_stats = { "content": 2, "directory": 3, "origin": 1, "origin_visit": 1, "release": 0, "revision": 58, "skipped_content": 0, "snapshot": 1, } assert stats == expected_stats loader2 = HgLoader(swh_storage, url=repo_url) assert loader2.load() == {"status": "uneventful"} # nothing new happened stats2 = get_stats(loader2.storage) expected_stats2 = expected_stats.copy() expected_stats2["origin_visit"] = 2 # one new visit recorded assert stats2 == expected_stats2 assert_last_visit_matches( loader2.storage, repo_url, status="full", type="hg", snapshot=expected_snapshot.id, ) # but we got a snapshot nonetheless # This test has as been adapted from the historical `HgBundle20Loader` tests # to ensure compatibility of `HgLoader`. # Hashes as been produced by copy pasting the result of the implementation # to prevent regressions. def test_loader_hg_new_visit_with_release(swh_storage, datadir, tmp_path): """Eventful visit with release should yield 1 snapshot""" archive_name = "hello" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) loader = HgLoader(swh_storage, url=repo_url, visit_date=VISIT_DATE,) actual_load_status = loader.load() assert actual_load_status == {"status": "eventful"} # then stats = get_stats(loader.storage) assert stats == { "content": 3, "directory": 3, "origin": 1, "origin_visit": 1, "release": 1, "revision": 3, "skipped_content": 0, "snapshot": 1, } # cf. test_loader.org for explaining from where those hashes tip_release = hash_to_bytes("515c4d72e089404356d0f4b39d60f948b8999140") release = loader.storage.release_get([tip_release])[0] assert release is not None tip_revision_default = hash_to_bytes("c3dbe4fbeaaa98dd961834e4007edb3efb0e2a27") revision = loader.storage.revision_get([tip_revision_default])[0] assert revision is not None expected_snapshot = Snapshot( id=hash_to_bytes("7ef082aa8b53136b1bed97f734504be32679bbec"), branches={ b"branch-tip/default": SnapshotBranch( target=tip_revision_default, target_type=TargetType.REVISION, ), b"tags/0.1": SnapshotBranch( target=tip_release, target_type=TargetType.RELEASE, ), b"HEAD": SnapshotBranch( target=b"branch-tip/default", target_type=TargetType.ALIAS, ), }, ) check_snapshot(expected_snapshot, loader.storage) assert_last_visit_matches( loader.storage, repo_url, type=RevisionType.MERCURIAL.value, status="full", snapshot=expected_snapshot.id, ) # This test has as been adapted from the historical `HgBundle20Loader` tests # to ensure compatibility of `HgLoader`. # Hashes as been produced by copy pasting the result of the implementation # to prevent regressions. def test_visit_repository_with_transplant_operations(swh_storage, datadir, tmp_path): """Visit a mercurial repository visit transplant operations within should yield a snapshot as well. """ archive_name = "transplant" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) loader = HgLoader(swh_storage, url=repo_url, visit_date=VISIT_DATE,) # load hg repository actual_load_status = loader.load() assert actual_load_status == {"status": "eventful"} # collect swh revisions assert_last_visit_matches( loader.storage, repo_url, type=RevisionType.MERCURIAL.value, status="full" ) revisions = [] snapshot = snapshot_get_latest(loader.storage, repo_url) for branch in snapshot.branches.values(): if branch.target_type.value != "revision": continue revisions.append(branch.target) # extract original changesets info and the transplant sources hg_changesets = set() transplant_sources = set() for rev in loader.storage.revision_log(revisions): extids = list( loader.storage.extid_get_from_target(ObjectType.REVISION, [rev["id"]]) ) assert len(extids) == 1 hg_changesets.add(hash_to_hex(extids[0].extid)) for k, v in rev["extra_headers"]: if k == b"transplant_source": transplant_sources.add(v.decode("ascii")) # check extracted data are valid assert len(hg_changesets) > 0 assert len(transplant_sources) > 0 assert transplant_sources <= hg_changesets def _partial_copy_storage( old_storage, origin_url: str, mechanism: str, copy_revisions: bool ): """Create a new storage, and only copy ExtIDs or head revisions to it.""" new_storage = get_storage(cls="memory") snapshot = snapshot_get_latest(old_storage, origin_url) assert snapshot heads = [branch.target for branch in snapshot.branches.values()] if mechanism == "extid": extids = old_storage.extid_get_from_target(ObjectType.REVISION, heads) new_storage.extid_add(extids) if copy_revisions: # copy revisions, but erase their metadata to make sure the loader doesn't # fallback to revision.metadata["nodeid"] revisions = [ attr.evolve(rev, metadata={}) for rev in old_storage.revision_get(heads) if rev ] new_storage.revision_add(revisions) else: assert mechanism == "same storage" return old_storage # copy origin, visit, status new_storage.origin_add(old_storage.origin_get([origin_url])) visit = old_storage.origin_visit_get_latest(origin_url) new_storage.origin_visit_add([visit]) statuses = old_storage.origin_visit_status_get(origin_url, visit.visit).results new_storage.origin_visit_status_add(statuses) new_storage.snapshot_add([snapshot]) return new_storage def test_load_unchanged_repo_should_be_uneventful( swh_storage, datadir, tmp_path, ): """Checks the loader can find which revisions it already loaded, using ExtIDs.""" archive_name = "hello" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_path = repo_url.replace("file://", "") loader = HgLoader(swh_storage, repo_path) assert loader.load() == {"status": "eventful"} assert get_stats(loader.storage) == { "content": 3, "directory": 3, "origin": 1, "origin_visit": 1, "release": 1, "revision": 3, "skipped_content": 0, "snapshot": 1, } visit_status = assert_last_visit_matches( loader.storage, repo_path, type=RevisionType.MERCURIAL.value, status="full", ) assert visit_status.snapshot is not None # Create a new loader (to start with a clean slate, eg. remove the caches), # with the new, partial, storage loader2 = HgLoader(swh_storage, repo_path) assert loader2.load() == {"status": "uneventful"} # Should have all the objects assert get_stats(loader.storage) == { "content": 3, "directory": 3, "origin": 1, "origin_visit": 2, "release": 1, "revision": 3, "skipped_content": 0, "snapshot": 1, } visit_status2 = assert_last_visit_matches( loader2.storage, repo_path, type=RevisionType.MERCURIAL.value, status="full", ) assert visit_status2.snapshot == visit_status.snapshot def test_closed_branch_incremental(swh_storage, datadir, tmp_path): """Test that a repository with a closed branch does not trip an incremental load""" archive_name = "example" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_path = repo_url.replace("file://", "") loader = HgLoader(swh_storage, repo_path) # Test 3 loads: full, and two incremental. assert loader.load() == {"status": "eventful"} expected_stats = { "content": 7, "directory": 16, "origin": 1, "origin_visit": 1, "release": 0, "revision": 9, "skipped_content": 0, "snapshot": 1, } assert get_stats(loader.storage) == expected_stats assert loader.load() == {"status": "uneventful"} assert get_stats(loader.storage) == {**expected_stats, "origin_visit": 1 + 1} assert loader.load() == {"status": "uneventful"} assert get_stats(loader.storage) == {**expected_stats, "origin_visit": 2 + 1} def test_load_unchanged_repo__dangling_extid(swh_storage, datadir, tmp_path): """Checks the loader will load revisions targeted by an ExtID if the revisions are missing from the storage""" archive_name = "hello" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_path = repo_url.replace("file://", "") loader = HgLoader(swh_storage, repo_path) assert loader.load() == {"status": "eventful"} assert get_stats(loader.storage) == { "content": 3, "directory": 3, "origin": 1, "origin_visit": 1, "release": 1, "revision": 3, "skipped_content": 0, "snapshot": 1, } old_storage = swh_storage # Create a new storage, and only copy ExtIDs or head revisions to it. # This should be enough for the loader to know revisions were already loaded new_storage = _partial_copy_storage( old_storage, repo_path, mechanism="extid", copy_revisions=False ) # Create a new loader (to start with a clean slate, eg. remove the caches), # with the new, partial, storage loader = HgLoader(new_storage, repo_path) assert get_stats(loader.storage) == { "content": 0, "directory": 0, "origin": 1, "origin_visit": 1, "release": 0, "revision": 0, "skipped_content": 0, "snapshot": 1, } assert loader.load() == {"status": "eventful"} assert get_stats(loader.storage) == { "content": 3, "directory": 3, "origin": 1, "origin_visit": 2, "release": 1, "revision": 3, "skipped_content": 0, "snapshot": 1, } def test_missing_filelog_should_not_crash(swh_storage, datadir, tmp_path): archive_name = "missing-filelog" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) directory = repo_url.replace("file://", "") loader = HgLoader( storage=swh_storage, url=repo_url, directory=directory, # specify directory to avoid clone visit_date=VISIT_DATE, ) actual_load_status = loader.load() assert actual_load_status == {"status": "eventful"} assert_last_visit_matches(swh_storage, repo_url, status="partial", type="hg") def test_multiple_open_heads(swh_storage, datadir, tmp_path): archive_name = "multiple-heads" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) loader = HgLoader(storage=swh_storage, url=repo_url,) actual_load_status = loader.load() assert actual_load_status == {"status": "eventful"} assert_last_visit_matches(swh_storage, repo_url, status="full", type="hg") snapshot = snapshot_get_latest(swh_storage, repo_url) expected_branches = [ b"HEAD", b"branch-heads/default/0", b"branch-heads/default/1", b"branch-tip/default", ] assert sorted(snapshot.branches.keys()) == expected_branches # Check that we don't load anything the second time loader = HgLoader(storage=swh_storage, url=repo_url,) actual_load_status = loader.load() assert actual_load_status == {"status": "uneventful"} def hg_strip(repo: str, revset: str) -> None: """Removes `revset` and all of their descendants from the local repository.""" # Previously called `hg strip`, it was renamed to `hg debugstrip` in Mercurial 5.7 # because it's most likely not what most users want to do (they should use some kind # of history-rewriting tool like `histedit` or `prune`). # But here, it's exactly what we want to do. subprocess.check_call(["hg", "debugstrip", revset], cwd=repo) def test_load_repo_with_new_commits(swh_storage, datadir, tmp_path): archive_name = "hello" archive_path = Path(datadir, f"{archive_name}.tgz") json_path = Path(datadir, f"{archive_name}.json") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) # first load with missing commits hg_strip(repo_url.replace("file://", ""), "tip") loader = HgLoader(swh_storage, repo_url) assert loader.load() == {"status": "eventful"} assert get_stats(loader.storage) == { "content": 2, "directory": 2, "origin": 1, "origin_visit": 1, "release": 0, "revision": 2, "skipped_content": 0, "snapshot": 1, } # second load with all commits repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) loader = HgLoader(swh_storage, repo_url) checker = LoaderChecker(loader=loader, expected=ExpectedSwhids.load(json_path),) checker.check() assert get_stats(loader.storage) == { "content": 3, "directory": 3, "origin": 1, "origin_visit": 2, "release": 1, "revision": 3, "skipped_content": 0, "snapshot": 2, } def test_load_repo_check_extids_write_version(swh_storage, datadir, tmp_path): """ExtIDs should be stored with a given version when loading is done""" archive_name = "hello" archive_path = Path(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) hg_strip(repo_url.replace("file://", ""), "tip") loader = HgLoader(swh_storage, repo_url) assert loader.load() == {"status": "eventful"} # Ensure we write ExtIDs to a specific version. snapshot = snapshot_get_latest(swh_storage, repo_url) # First, filter out revisions from that snapshot revision_ids = [ branch.target for branch in snapshot.branches.values() if branch.target_type == TargetType.REVISION ] assert len(revision_ids) > 0 # Those revisions should have their associated ExtID version set to EXTID_VERSION extids = swh_storage.extid_get_from_target(ObjectType.REVISION, revision_ids) assert len(extids) == len(revision_ids) for extid in extids: assert extid.extid_version == EXTID_VERSION def test_load_new_extid_should_be_eventful(swh_storage, datadir, tmp_path): """Changing the extid version should make loaders ignore existing extids, and load the repo again.""" archive_name = "hello" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_path = repo_url.replace("file://", "") with unittest.mock.patch("swh.loader.mercurial.loader.EXTID_VERSION", 0): loader = HgLoader(swh_storage, repo_path) assert loader.load() == {"status": "eventful"} loader = HgLoader(swh_storage, repo_path) assert loader.load() == {"status": "eventful"} loader = HgLoader(swh_storage, repo_path) assert loader.load() == {"status": "uneventful"} with unittest.mock.patch("swh.loader.mercurial.loader.EXTID_VERSION", 10000): loader = HgLoader(swh_storage, repo_path) assert loader.load() == {"status": "eventful"} loader = HgLoader(swh_storage, repo_path) assert loader.load() == {"status": "uneventful"} def test_loader_hg_extid_filtering(swh_storage, datadir, tmp_path): """The first visit of a fork should filter already seen revisions (through extids) """ archive_name = "the-sandbox" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) loader = HgLoader(swh_storage, url=repo_url) assert loader.load() == {"status": "eventful"} stats = get_stats(loader.storage) expected_stats = { "content": 2, "directory": 3, "origin": 1, "origin_visit": 1, "release": 0, "revision": 58, "skipped_content": 0, "snapshot": 1, } assert stats == expected_stats visit_status = assert_last_visit_matches( loader.storage, repo_url, status="full", type="hg", ) # Make a fork of the first repository we ingested fork_url = prepare_repository_from_archive( archive_path, "the-sandbox-reloaded", tmp_path ) loader2 = HgLoader( swh_storage, url=fork_url, directory=str(tmp_path / archive_name) ) assert loader2.load() == {"status": "uneventful"} stats = get_stats(loader.storage) expected_stats2 = expected_stats.copy() expected_stats2.update( {"origin": 1 + 1, "origin_visit": 1 + 1,} ) assert stats == expected_stats2 visit_status2 = assert_last_visit_matches( loader.storage, fork_url, status="full", type="hg", ) assert visit_status.snapshot is not None assert visit_status2.snapshot == visit_status.snapshot def test_loader_repository_with_bookmark_information(swh_storage, datadir, tmp_path): """Repository with bookmark information should be ingested correctly """ archive_name = "anomad-d" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) loader = HgLoader(swh_storage, url=repo_url) assert loader.load() == {"status": "eventful"}