diff --git a/requirements.txt b/requirements.txt --- a/requirements.txt +++ b/requirements.txt @@ -9,3 +9,4 @@ retrying sqlitedict vcversioner +mercurial diff --git a/swh/loader/mercurial/from_disk.py b/swh/loader/mercurial/from_disk.py new file mode 100644 --- /dev/null +++ b/swh/loader/mercurial/from_disk.py @@ -0,0 +1,511 @@ +import os +import textwrap +from abc import ABC, abstractmethod +from datetime import datetime +from typing import Any, Dict, Iterable, Iterator, List, Optional +from urllib.parse import urlsplit + +import dateutil +import mercurial.ui # type: ignore +from mercurial import hg +from mercurial import tags as tagsmod + +from swh.loader.core.loader import DVCSLoader +from swh.model.hashutil import MultiHash, hash_to_bytehex, hash_to_bytes +from swh.model.identifiers import normalize_timestamp +from swh.model.model import ( + BaseContent, + Content, + Directory, + ObjectType, + Origin, + Person, + Release, + Revision, + RevisionType, + Snapshot, + TargetType, + Timestamp, + TimestampWithTimezone, +) +from swh.storage.algos.origin import origin_get_latest_visit_status + +DIR_PERM = 0o040000 +FLAG_PERMS = { + b"l": 0o120000, # symlink flag + b"x": 0o100755, # exec flag + b"": 0o100644, # no flag +} # type: Dict[bytes, int] + + +class HgLoaderFromDisk(DVCSLoader): + """Load a mercurial repository from a cloned mercurial directory.""" + + CONFIG_BASE_FILENAME = "loader/mercurial" + + visit_type = "hg" + + def __init__( + self, + url: str, + directory: str, # Path to a cloned mercurial repository + logging_class="swh.loader.mercurial.Loader", + visit_date: Optional[datetime] = None, + config: Optional[Dict[str, Any]] = None, + ): + super().__init__(logging_class=logging_class, config=config or {}) + self.origin_url = url + self.visit_date = visit_date + self.directory = urlsplit(directory).path + + def prepare_origin_visit(self, *args, **kwargs) -> None: + """First step executed by the loader to prepare origin and visit + references. Set/update self.origin, and + optionally self.origin_url, self.visit_date. + + """ + self.origin = Origin(url=self.origin_url) + visit_date = self.visit_date + if isinstance(visit_date, str): + visit_date = dateutil.parser.parse(visit_date) + self.visit_date = visit_date + visit_and_status = origin_get_latest_visit_status( + self.storage, self.origin_url, require_snapshot=True + ) + if visit_and_status is None: + self.last_snapshot_id = None + else: + _, visit_status = visit_and_status + self.last_snapshot_id = visit_status.snapshot + + def prepare(self, *args, **kwargs): + """Second step executed by the loader to prepare some state needed by + the loader. 
+ + """ + ui = mercurial.ui.ui.load() + self.repo = hg.repository(ui, self.directory.encode()).unfiltered() + + self.hashes_cache = build_hashes_cache(self.repo) + self.tags_rev = get_tags_rev(self.repo) + + def save_data(self) -> None: + """Save the data associated to the current load""" + # TODO may be implemented with incremental loading + return + + def fetch_data(self) -> bool: + """Fetch the data from the data source.""" + return False # No data to fetch since we use an already cloned repo + + def has_contents(self) -> bool: + """Checks whether we need to load contents""" + # TODO may be implemented with incremental loading + return True + + def get_contents(self) -> Iterable[BaseContent]: + """Get the contents that need to be loaded""" + _hashes = [] + for rev in self.hashes_cache: + ctx = self.repo[rev] + + for filename in self.hashes_cache[rev]: + fctx = ctx[filename] + data = fctx.data() + hashes = self.hashes_cache[rev][filename] + _hashes.append(hashes["sha1_git"]) + + yield Content( + data=data, status="visible", length=len(data), **hashes, + ) + + def has_directories(self) -> bool: + """Checks whether we need to load directories""" + # TODO may be implemented with incremental loading + return True + + def get_directories(self) -> Iterable[Directory]: + """Get the directories that need to be loaded""" + self.rev_dir_hashes: Dict[bytes, bytes] = {} + for rev in self.hashes_cache: + tree = TreeDirectory(None) + + ctx = self.repo[rev] + for filepath in self.hashes_cache[rev]: + fctx = ctx[filepath] + tree.add_file( + path=filepath, + perms=FLAG_PERMS[fctx.flags()], + sha1_git=self.hashes_cache[rev][filepath]["sha1_git"], + ) + + yield from tree.directories() + self.rev_dir_hashes[rev] = tree.sha1_git() + + def has_revisions(self) -> bool: + """Checks whether we need to load revisions""" + # TODO may be implemented with incremental loading + return True + + def get_revisions(self) -> Iterable[Revision]: + """Get the revisions that need to be loaded""" + + # Keep track of calculated revisions sha1_git + # for release and snapshot generation + self._node_hashes: Dict[bytes, bytes] = {} + + for rev in self.repo: + ctx = self.repo[rev] + parents = tuple( + [ + self._node_hashes[p.node()] # revisions are in increasing order so + # this won't fail + for p in ctx.parents() + if p.node() != mercurial.node.nullid + ] + ) + + author = author_dict_from_str(ctx.user()) + # rev_date = get_ctx_date(ctx).to_dict() + (timestamp, offset) = ctx.date() + rev_date = normalize_timestamp(int(timestamp)) + extra_headers = [ + (b"time_offset_seconds", str(offset).encode("utf-8"),), + ] + for key, value in ctx.extra().items(): + if key == b"branch" and value == b"default": + continue + if key == b"transplant_source": + value = hash_to_bytehex(value) + extra_headers.append((key, value)) + + revision = { + "author": author, + "date": rev_date, + "committer": author, + "committer_date": rev_date, + "type": RevisionType.MERCURIAL.value, + "directory": self.rev_dir_hashes[rev], + "message": ctx.description(), + "metadata": {"node": ctx.node().hex()}, + "extra_headers": tuple(extra_headers), + "synthetic": False, + "parents": parents, + } + + revision["id"] = hash_to_bytes(Revision.compute_hash(revision)) + self._node_hashes[ctx.node()] = revision["id"] + yield Revision.from_dict(revision) + + def has_releases(self) -> bool: + """Checks whether we need to load releases""" + # TODO may be implemented with incremental loading + return True + + def get_releases(self) -> Iterable[Release]: + """Get the releases 
that need to be loaded"""
+        self._releases = {}
+
+        # Keep track of original tags target for snapshot generation
+        self.tags_target = {}
+
+        for (name, nodeid) in self.repo.tags().items():
+            target_ctx = self.repo[nodeid]
+            self.tags_target[name] = target_ctx.node()
+            if name == b"tip":
+                continue
+
+            if nodeid == self.tags_rev[name]["target"]:
+                target = self._node_hashes[target_ctx.node()]
+                release = {
+                    "name": name,
+                    "target": target,
+                    "target_type": ObjectType.REVISION.value,
+                    "message": None,
+                    "metadata": None,
+                    "synthetic": False,
+                    "author": {"name": None, "email": None, "fullname": b""},
+                    "date": None,
+                }
+                release["id"] = hash_to_bytes(Release.compute_hash(release))
+
+                self._releases[name] = release["id"]
+                yield Release.from_dict(release)
+            else:
+                raise Exception(f"no target found for node {nodeid.hex()}")
+
+    def get_snapshot(self) -> Snapshot:
+        """Get the snapshot that needs to be loaded"""
+        branches: Dict[bytes, Any] = {}
+        tip_node = [
+            target_node
+            for name, target_node in self.tags_target.items()
+            if name == b"tip"
+        ][0]
+        for name, node in get_hg_branches(self.repo).items():
+            target = self._node_hashes[node]
+            branches[name] = {
+                "target": target,
+                "target_type": TargetType.REVISION.value,
+            }
+            if node == tip_node:
+                branches[b"HEAD"] = {
+                    "target": name,
+                    "target_type": TargetType.ALIAS.value,
+                }
+        for name, target in self._releases.items():
+            branches[name] = {
+                "target": target,
+                "target_type": TargetType.RELEASE.value,
+            }
+
+        snapshot = {"branches": branches}
+        snapshot["id"] = hash_to_bytes(Snapshot.compute_hash({"branches": branches}))
+        return Snapshot.from_dict(snapshot)
+
+    def load_status(self):
+        """Detailed loading status.
+
+        Defaults to logging an eventful load.
+
+        Returns: a dictionary that is eventually passed back as the task's
+            result to the scheduler, allowing tuning of the task recurrence
+            mechanism.
+        """
+        return {
+            "status": "eventful",
+        }
+
+
+def get_ctx_date(ctx) -> TimestampWithTimezone:
+    """Return a changeset timestamp."""
+    (timestamp, offset) = ctx.date()
+    # TODO: the timestamp seems to always be negative.
+    # Some investigation is needed, but it works.
+    result = TimestampWithTimezone(
+        timestamp=Timestamp(seconds=int(timestamp), microseconds=0),
+        offset=-(offset // 60),
+        negative_utc=(offset == 0),
+    )
+    return result
+
+
+def build_hashes_cache(repo):
+    """Build a cache of every file hash in advance.
+
+    TODO: use an LRU cache when incremental loading is implemented.
+    """
+    result = {}
+
+    for rev in repo:
+        result[rev] = {}
+        ctx = repo[rev]
+        manifest = ctx.manifest()
+
+        for filepath in manifest:
+            fctx = ctx[filepath]
+            data = fctx.data()
+            hashes = MultiHash.from_data(data).digest()
+            result[rev][filepath] = hashes
+
+    return result
+
+
+def path_split_all(path):
+    """Split all parts of a path."""
""" + parts = [] + head, tail = os.path.split(path) + while tail != b"": + parts.append(tail) + head, tail = os.path.split(head) + return list(reversed(parts)) + + +def get_tags_rev(repo) -> Dict[str, Dict]: + """Map tags to their respective rev and target node """ + result: Dict[str, Dict] = {} + hgtags_log = repo.file(b".hgtags") + + for trev in hgtags_log: + # TODO: handle edge cases for linkrev + rev = hgtags_log.linkrev(trev) + + oldfnodes = [ + hgtags_log.node(p) + for p in hgtags_log.parentrevs(trev) + if p != mercurial.node.nullrev + ] + newfnodes = [hgtags_log.node(trev)] + + changes = tagsmod.difftags(repo.ui, repo, oldfnodes, newfnodes) + for tag, old, new in changes: + if new is None: # The tag has been removed + del result[tag] + else: # The tag has been added of moved + result[tag] = { + "rev": rev, # rev which added the tag + "target": new, # targeted node + } + + return result + + +class TreeElement(ABC): + """Interface that every element of the tree must respect.""" + + @abstractmethod + def as_entry_dict(self) -> Dict[str, Any]: + """Return the element as a dict suitable for `Directory.from_dict`.""" + + def directories(self) -> Iterator[Directory]: + """Yield all directories contained in the element.""" + yield from [] + + +class TreeDirectory(TreeElement): + """Represent a directory structure. + + It is used to compute recursively compute the sha1_git of all the elements of the + tree. + + When incremental loading will be implemented, it will be used to recaclulate only + the changed parts. + """ + + def __init__(self, name: Optional[bytes]) -> None: + self._name = name + self._files: Dict[bytes, TreeFile] = {} + self._dirs: Dict[bytes, TreeDirectory] = {} + self._sha1_git: Optional[bytes] = None + + def add_file(self, path: bytes, perms: int, sha1_git: bytes) -> None: + """Add a file to the tree by its full path.""" + path, filename = os.path.split(path) + parts = path_split_all(path) + + current_dir = self + for part in parts: + current_dir = current_dir._add_tree(part) + current_dir._add_file(filename, perms, sha1_git) + + def _add_tree(self, name: bytes) -> "TreeDirectory": + """Add a tree to the tree by is name.""" + if name not in self._dirs: + self._dirs[name] = TreeDirectory(name) + return self._dirs[name] + + def _add_file(self, name: bytes, perms: int, sha1_git: bytes) -> None: + """Add a file to the tree by is name.""" + if name in self._files: + raise Exception(f"name {name.decode()} already exists") + if name in self._dirs: + raise Exception(f"name {name.decode()} already is an existing directory") + self._files[name] = TreeFile(name, perms, sha1_git) + + def __str__(self): + """Display the tree (for debug purpose only).""" + name = self._name.decode() if self._name else "/" + files = textwrap.indent("\n".join(map(str, self._files.values())), prefix=" ") + dirs = textwrap.indent("\n".join(map(str, self._dirs.values())), prefix=" ") + return "\n".join([f"{name} ({self.sha1_git().hex()})", files, dirs]) + + def sha1_git(self) -> bytes: + """Compute the hash of the tree.""" + if self._sha1_git is None: + self._sha1_git = hash_to_bytes( + Directory.compute_hash({"entries": self._entries()}) + ) + return self._sha1_git + + def _entries(self) -> List[Dict[str, Any]]: + """List entries of the tree.""" + file_entries = [f.as_entry_dict() for f in self._files.values()] + dir_entries = [d.as_entry_dict() for d in self._dirs.values()] + return file_entries + dir_entries + + def as_entry_dict(self) -> Dict[str, Any]: + """Return the tree as a dict suitable for 
+        return {
+            "type": "dir",
+            "perms": DIR_PERM,
+            "name": self._name,
+            "target": self.sha1_git(),
+        }
+
+    def directories(self) -> Iterator[Directory]:
+        """Yield all directories contained in the tree."""
+        for item in self._dirs.values():
+            yield from item.directories()
+
+        yield Directory.from_dict(
+            {"id": self.sha1_git(), "entries": self._entries(),}
+        )
+
+
+class TreeFile(TreeElement):
+    """Represent a file in a tree."""
+
+    def __init__(self, name: bytes, perms: int, sha1_git: bytes) -> None:
+        self._name = name
+        self._perms = perms
+        self._sha1_git = sha1_git
+
+    def __str__(self):
+        """Display the file (for debugging purposes only)."""
+        return f"{self._name.decode()} ({self._sha1_git.hex()})"
+
+    def as_entry_dict(self) -> Dict[str, Any]:
+        """Return the file as a dict suitable for `Directory.from_dict`."""
+        return {
+            "type": "file",
+            "perms": self._perms,
+            "name": self._name,
+            "target": self._sha1_git,
+        }
+
+
+def author_dict_from_str(author: bytes) -> Dict[str, bytes]:
+    result = Person.from_fullname(author).to_dict()
+    # Git requires a space between name and email, otherwise the hash differs.
+    # The fullname, when present, is what the hash function uses.
+    if result["email"]:
+        result["fullname"] = result["name"] + b" <" + result["email"] + b">"
+    else:
+        result["fullname"] = result["name"]
+    return result
+
+
+if __name__ == "__main__":
+    import logging
+    from datetime import timezone
+
+    import click
+
+    logging.basicConfig(
+        level=logging.DEBUG, format="%(asctime)s %(process)d %(message)s"
+    )
+
+    @click.command()
+    @click.option("--origin-url", help="origin url")
+    @click.option("--hg-directory", help="Path to mercurial repository to load")
+    @click.option("--visit-date", default=None, help="Visit date")
+    def main(origin_url, hg_directory, visit_date):
+        if not visit_date:
+            visit_date = datetime.now(tz=timezone.utc)
+
+        return HgLoaderFromDisk(
+            url=origin_url, directory=hg_directory, visit_date=visit_date
+        ).load()
+
+    main()
+
+
+def get_hg_branches(repo):
+    """Equivalent of `hg branches`.
+
+    For some reason, there is no direct way to get the same result as `hg branches`
+    from the localrepository interface.
+ """ + result = {} + for tag, heads, tip, isclosed in repo.branchmap().iterbranches(): + if isclosed: + continue + result[tag] = tip + return result diff --git a/swh/loader/mercurial/tests/test_from_disk.py b/swh/loader/mercurial/tests/test_from_disk.py new file mode 100644 --- /dev/null +++ b/swh/loader/mercurial/tests/test_from_disk.py @@ -0,0 +1,161 @@ +import os + +from swh.loader.tests import ( + assert_last_visit_matches, + check_snapshot, + get_stats, + prepare_repository_from_archive, +) +from swh.model.hashutil import hash_to_bytes +from swh.model.model import RevisionType, Snapshot, SnapshotBranch, TargetType +from swh.storage.algos.snapshot import snapshot_get_latest + +from ..from_disk import HgLoaderFromDisk + + +def test_loader_hg_new_visit_no_release(swh_config, datadir, tmp_path): + archive_name = "the-sandbox" + archive_path = os.path.join(datadir, f"{archive_name}.tgz") + repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) + + loader = HgLoaderFromDisk(url=repo_url, directory=repo_url) + + assert loader.load() == {"status": "eventful"} + + tip_revision_develop = "a9c4534552df370f43f0ef97146f393ef2f2a08c" + tip_revision_default = "70e750bb046101fdced06f428e73fee471509c56" + expected_snapshot = Snapshot( + id=hash_to_bytes("3b8fe58e467deb7597b12a5fd3b2c096b8c02028"), + branches={ + b"develop": SnapshotBranch( + target=hash_to_bytes(tip_revision_develop), + target_type=TargetType.REVISION, + ), + b"default": SnapshotBranch( + target=hash_to_bytes(tip_revision_default), + target_type=TargetType.REVISION, + ), + b"HEAD": SnapshotBranch(target=b"develop", target_type=TargetType.ALIAS,), + }, + ) + + assert_last_visit_matches( + loader.storage, + repo_url, + status="full", + type="hg", + snapshot=expected_snapshot.id, + ) + check_snapshot(expected_snapshot, loader.storage) + + stats = get_stats(loader.storage) + assert stats == { + "content": 2, + "directory": 3, + "origin": 1, + "origin_visit": 1, + "release": 0, + "revision": 58, + "skipped_content": 0, + "snapshot": 1, + } + + +def test_loader_hg_new_visit_with_release(swh_config, datadir, tmp_path): + """Eventful visit with release should yield 1 snapshot""" + archive_name = "hello" + archive_path = os.path.join(datadir, f"{archive_name}.tgz") + repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) + + loader = HgLoaderFromDisk( + url=repo_url, directory=repo_url, visit_date="2016-05-03 15:16:32+00", + ) + + actual_load_status = loader.load() + assert actual_load_status == {"status": "eventful"} + + # then + stats = get_stats(loader.storage) + assert stats == { + "content": 3, + "directory": 3, + "origin": 1, + "origin_visit": 1, + "release": 1, + "revision": 3, + "skipped_content": 0, + "snapshot": 1, + } + + # cf. 
test_loader.org for an explanation of where those hashes come from
+    tip_release = hash_to_bytes("515c4d72e089404356d0f4b39d60f948b8999140")
+    release = loader.storage.release_get([tip_release])[0]
+    assert release is not None
+
+    tip_revision_default = hash_to_bytes("c3dbe4fbeaaa98dd961834e4007edb3efb0e2a27")
+    revision = loader.storage.revision_get([tip_revision_default])[0]
+    assert revision is not None
+
+    expected_snapshot = Snapshot(
+        id=hash_to_bytes("d35668e02e2ba4321dc951cd308cf883786f918a"),
+        branches={
+            b"default": SnapshotBranch(
+                target=tip_revision_default, target_type=TargetType.REVISION,
+            ),
+            b"0.1": SnapshotBranch(target=tip_release, target_type=TargetType.RELEASE,),
+            b"HEAD": SnapshotBranch(target=b"default", target_type=TargetType.ALIAS,),
+        },
+    )
+
+    check_snapshot(expected_snapshot, loader.storage)
+    assert_last_visit_matches(
+        loader.storage,
+        repo_url,
+        type=RevisionType.MERCURIAL.value,
+        status="full",
+        snapshot=expected_snapshot.id,
+    )
+
+
+def test_visit_repository_with_transplant_operations(swh_config, datadir, tmp_path):
+    """Visiting a mercurial repository with transplant operations within it should
+    yield a snapshot as well.
+
+    """
+
+    archive_name = "transplant"
+    archive_path = os.path.join(datadir, f"{archive_name}.tgz")
+    repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
+    loader = HgLoaderFromDisk(
+        url=repo_url, directory=repo_url, visit_date="2019-05-23 12:06:00+00"
+    )
+
+    # load hg repository
+    actual_load_status = loader.load()
+    assert actual_load_status == {"status": "eventful"}
+
+    # collect swh revisions
+    assert_last_visit_matches(
+        loader.storage, repo_url, type=RevisionType.MERCURIAL.value, status="full"
+    )
+
+    revisions = []
+    snapshot = snapshot_get_latest(loader.storage, repo_url)
+    for branch in snapshot.branches.values():
+        if branch.target_type.value != "revision":
+            continue
+        revisions.append(branch.target)
+
+    # extract original changesets info and the transplant sources
+    hg_changesets = set()
+    transplant_sources = set()
+    for rev in loader.storage.revision_log(revisions):
+        hg_changesets.add(rev["metadata"]["node"])
+        for k, v in rev["extra_headers"]:
+            if k == b"transplant_source":
+                transplant_sources.add(v.decode("ascii"))
+
+    # check that the extracted data is valid
+    assert len(hg_changesets) > 0
+    assert len(transplant_sources) > 0
+    assert transplant_sources.issubset(hg_changesets)
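
Note for reviewers: below is a minimal usage sketch of how the new loader is driven, mirroring the tests above. The origin URL and clone path are placeholders, and a working swh storage configuration (such as the one provided by the swh_config test fixture) is assumed.

    # Hypothetical example -- origin URL and local clone path are placeholders.
    from swh.loader.mercurial.from_disk import HgLoaderFromDisk

    loader = HgLoaderFromDisk(
        url="https://hg.example.org/some-repo",  # origin URL to record
        directory="/tmp/some-repo-clone",        # already cloned mercurial repository
        visit_date="2016-05-03 15:16:32+00",     # optional, parsed with dateutil
    )
    # An eventful load produces contents, directories, revisions, releases
    # and a snapshot for the origin.
    assert loader.load() == {"status": "eventful"}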