diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -54,6 +54,8 @@
     entry_points="""
         [swh.workers]
         loader.mercurial=swh.loader.mercurial:register
+        [console_scripts]
+        swh-hg-identify=swh.loader.mercurial.identify:main
     """,
     classifiers=[
         "Programming Language :: Python :: 3",
diff --git a/swh/loader/mercurial/identify.py b/swh/loader/mercurial/identify.py
new file mode 100644
--- /dev/null
+++ b/swh/loader/mercurial/identify.py
@@ -0,0 +1,499 @@
+# Copyright (C) 2018-2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import json
+import subprocess
+from codecs import escape_decode  # type: ignore
+from pathlib import Path
+from typing import Any, Dict, Iterator, List, NamedTuple, Optional, Union
+
+# WARNING: do not import unnecessary things here to keep cli startup time under
+# control
+import click
+
+from swh.model.hashutil import hash_to_bytehex
+from swh.model.identifiers import normalize_timestamp
+from swh.model.model import RevisionType
+
+
+class HgAuthor(NamedTuple):
+    """Represent a Mercurial revision author."""
+
+    fullname: bytes  # full name of the author
+    name: Optional[bytes]  # name of the author
+    email: Optional[bytes]  # email of the author
+
+    @staticmethod
+    def from_bytes(data: bytes) -> "HgAuthor":
+        """Convert bytes to an HgAuthor named tuple.
+
+        Expected format: "name <email>"
+        """
+        from swh.loader.mercurial.converters import parse_author
+
+        result = parse_author(data)
+        return HgAuthor(
+            fullname=result["fullname"], name=result["name"], email=result["email"]
+        )
+
+    def to_dict(self) -> Dict[str, Optional[bytes]]:
+        """Return the author as a dict with `fullname`, `name` and `email` keys."""
+        return {"fullname": self.fullname, "name": self.name, "email": self.email}
+
+
+HG_REVISION_TEMPLATE = "\n".join(
+    [
+        "node_id:{node}",
+        "author:{author}",
+        "timestamp_offset:{date|json}",
+        "p1:{p1.node}",
+        "p2:{p2.node}",
+        "extras:{join(extras, '\nextras:')}",
+    ]
+)  # Log template for HgRevision.from_bytes
+
+NULL_NODE_ID = b"0" * 40  # Value used when no parent
+
+
+class HgRevision(NamedTuple):
+    """Represent a Mercurial revision."""
+
+    node_id: bytes  # hex bytes of the revision hash
+    author: HgAuthor  # author of the revision
+    timestamp: float  # timestamp of the revision (number decoded from JSON)
+    offset: float  # offset of the revision (number decoded from JSON)
+    parents: List[bytes]  # hex bytes of the revision's parents
+    extras: Dict[bytes, bytes]  # metadata of the revision
+    description: bytes  # description of the revision
+
+    @staticmethod
+    def from_bytes(data: bytes, description: bytes) -> "HgRevision":
+        """Convert bytes to an HgRevision named tuple.
+
+        Expected data format:
+        '''
+        node_id:{node}
+        author:{author}
+        timestamp_offset:[{timestamp}, {offset}]
+        p1:{p1}
+        p2:{p2}
+        extras:{key1}={value1}
+        ...
+        extras:{keyn}={value}
+        '''
+
+        """
+        lines = data.split(b"\n")
+        tuples = [line.split(b":", 1) for line in lines]
+        fields: Dict[str, Any] = {"parents": [], "extras": {}}
+        for key, value in tuples:
+            if key == b"timestamp_offset":
+                timestamp, offset = json.loads(value)
+                fields["timestamp"] = timestamp
+                fields["offset"] = offset
+            elif key in (b"p1", b"p2") and value != NULL_NODE_ID:
+                fields["parents"].append(value)
+            elif key == b"extras":
+                extra_key, extra_value = value.split(b"=", 1)
+                fields["extras"][extra_key] = extra_value
+            elif key == b"author":
+                fields["author"] = HgAuthor.from_bytes(value)
+            else:
+                fields[key.decode()] = value
+
+        return HgRevision(
+            node_id=fields["node_id"],
+            author=fields["author"],
+            timestamp=fields["timestamp"],
+            offset=fields["offset"],
+            parents=fields["parents"],
+            extras=fields["extras"],
+            description=description,
+        )
+
+    def branch(self) -> bytes:
+        """Return the `branch` extra of the revision, `default` when it is unset."""
+        return self.extras.get(b"branch", b"default")
+
+    def to_dict(self) -> Dict:
+        """Convert a HgRevision to a dict for SWHID computation"""
+        date = normalize_timestamp(int(self.timestamp))
+
+        extra_headers = [
+            (b"time_offset_seconds", str(self.offset).encode("utf-8")),
+        ]
+
+        for key, value in self.extras.items():
+            if key == b"branch" and value == b"default":
+                # branch default is skipped to match actual implementation
+                continue
+            if key == b"transplant_source":
+                # transplant_source is converted to hex to match actual implementation
+                value = hash_to_bytehex(escape_decode(value)[0])
+            extra_headers.append((key, value))
+
+        author = self.author.to_dict()
+
+        return {
+            "author": author,
+            "date": date,
+            "committer": author,
+            "committer_date": date,
+            "type": RevisionType.MERCURIAL.value,
+            "message": self.description,
+            "metadata": {"node": self.node_id},
+            "extra_headers": tuple(extra_headers),
+            "synthetic": False,
+            "parents": self.parents,
+        }
+
+
+class HgBranch(NamedTuple):
+    """Represent a Mercurial branch."""
+
+    name: bytes  # name of the branch
+    node_id: bytes  # hex bytes of the target revision hash
+
+
+class HgTag(NamedTuple):
+    """Represent a Mercurial tag."""
+
+    name: bytes  # name of the tag
+    node_id: bytes  # hex bytes of the target revision
+
+
+class Hg:
+    """Provide methods to extract data from a Mercurial repository."""
+
+    def __init__(self, repository_root: Path) -> None:
+        self._root = repository_root
+
+    def _output(self, *args) -> bytes:
+        """Return the output of a `hg` call."""
+        return subprocess.check_output(["hg", *args], cwd=self._root)
+
+    def _call(self, *args) -> None:
+        """Perform a `hg` call, discarding its output."""
+        # check_call() never reads from pipes, so PIPE could block `hg` once the
+        # OS pipe buffer fills up; DEVNULL discards the output safely.
+        subprocess.check_call(
+            ["hg", *args],
+            cwd=self._root,
+            stderr=subprocess.DEVNULL,
+            stdout=subprocess.DEVNULL,
+        )
+
+    def root(self) -> Path:
+        """Return the root of the Mercurial repository."""
+        return self._root
+
+    def log(self, rev: Optional[Union[bytes, str]] = None) -> List[HgRevision]:
+        """Return the specified revisions of the Mercurial repository.
+
+        Mercurial revsets are supported. (See `hg help revsets`)
+
+        If no revision range is specified, return all revisions.
+        """
+        if rev:
+            node_ids = self._output("log", "-r", rev, "-T", "{node}\n").splitlines()
+        else:
+            node_ids = self._output("log", "-T", "{node}\n").splitlines()
+
+        revisions = [self._revision(node_id) for node_id in reversed(node_ids)]
+
+        return revisions
+
+    def _revision(self, revision: bytes) -> HgRevision:
+        """Return the `HgRevision` built from the `hg` output for `revision`."""
+        data = self._output("log", "-r", revision, "-T", HG_REVISION_TEMPLATE)
+
+        # hg log strips the description so the raw description has to be taken
+        # from debugdata
+        _, desc = self._output("debugdata", "-c", revision).split(b"\n\n", 1)
+
+        return HgRevision.from_bytes(data, desc)
+
+    def up(self, rev: bytes) -> None:
+        """Update the repository working directory to the specified revision."""
+        self._call("up", rev)
+
+    def branches(self) -> List[HgBranch]:
+        """List the repository named branches."""
+        output = self._output("branches", "-T", "{branch}\n{node}\n\n").strip()
+
+        branches = []
+
+        for block in output.split(b"\n\n"):
+            name, node_id = block.splitlines()
+            branches.append(HgBranch(name=name, node_id=node_id))
+
+        return branches
+
+    def tip(self) -> HgRevision:
+        """Return the `tip` revision."""
+        return self.log("tip")[0]
+
+    def tags(self) -> List[HgTag]:
+        """Return the repository tags at the current revision."""
+        hgtags = self._root / ".hgtags"
+
+        tags = []
+
+        if hgtags.is_file():
+            for line in hgtags.read_bytes().splitlines():
+                node_id, name = line.split(b" ", 1)
+                tags.append(HgTag(name=name, node_id=node_id))
+
+        return tags
+
+
+@click.group()
+@click.option(
+    "--directory",
+    "-d",
+    help=("Path to the Mercurial repository. If unset, the current directory is used "),
+)
+@click.pass_context
+def main(ctx, directory=None):
+    """Compute the Software Heritage persistent identifier (SWHID) for the given
+    source code object(s).
+
+    For more details about SWHIDs see:
+
+    https://docs.softwareheritage.org/devel/swh-model/persistent-identifiers.html
+    """
+    # ensure that ctx.obj exists and is a dict (in case `main()` is called
+    # by means other than the `if` block below)
+    ctx.ensure_object(dict)
+
+    root = Path(directory) if directory else Path()
+    if not root.exists():
+        raise IOError(f"{root!r} does not exist")
+
+    ctx.obj["HG_ROOT"] = root
+
+
+def identify_directory(path: Path) -> str:
+    """Return the SWHID of the given path."""
+    output = subprocess.check_output(
+        ["swh-identify", "--exclude", ".hg", "."], cwd=path
+    ).decode()
+    return output.split()[0].split(":")[-1]
+
+
+class RevisionIdentity(NamedTuple):
+    """Represent a swh revision identity."""
+
+    swhid: bytes  # SWHID raw bytes
+    node_id: bytes  # node_id hex bytes
+    directory_swhid: bytes  # directory SWHID raw bytes
+
+    def dir_uri(self) -> str:
+        """Return the SWHID uri of the revision's directory."""
+        return f"swh:1:dir:{self.directory_swhid.hex()}\t{self.node_id.decode()}"
+
+    def __str__(self) -> str:
+        """Return the string representation of a RevisionIdentity."""
+        return f"swh:1:rev:{self.swhid.hex()}\t{self.node_id.decode()}"
+
+
+def identify_revision(
+    hg: Hg,
+    rev: Optional[bytes] = None,
+    node_id_2_swhid: Optional[Dict[bytes, bytes]] = None,
+) -> Iterator[RevisionIdentity]:
+    """Return the repository revision identities.
+
+    hg: A `Hg` repository instance
+    rev: An optional revision or Mercurial revsets (See `hg help revsets`)
+        If not provided all the repository revisions will be computed.
+    node_id_2_swhid: An optional cache mapping hg node ids to SWHIDs
+        It will be updated in place with new mappings.
+    """
+    from swh.model.hashutil import hash_to_bytes
+    from swh.model.identifiers import revision_identifier
+
+    if node_id_2_swhid is None:
+        node_id_2_swhid = {}
+
+    for revision in hg.log(rev):
+        data = revision.to_dict()
+
+        hg.up(revision.node_id)
+        directory_swhid = hash_to_bytes(identify_directory(hg.root()))
+        data["directory"] = directory_swhid
+
+        parents = []
+        for parent in data["parents"]:
+            if parent not in node_id_2_swhid:
+                parent_swhid, *_ = next(identify_revision(hg, parent, node_id_2_swhid))
+                node_id_2_swhid[parent] = parent_swhid
+            parents.append(node_id_2_swhid[parent])
+        data["parents"] = parents
+
+        revision_swhid = hash_to_bytes(revision_identifier(data))
+        node_id_2_swhid[revision.node_id] = revision_swhid
+
+        yield RevisionIdentity(
+            swhid=revision_swhid,
+            node_id=revision.node_id,
+            directory_swhid=directory_swhid,
+        )
+
+
+class ReleaseIdentity(NamedTuple):
+    """Represent a swh release identity."""
+
+    swhid: str  # SWHID hex string
+    node_id: bytes  # node_id hex bytes
+    name: bytes  # name of the release
+
+    def __str__(self) -> str:
+        """Return the string representation of a ReleaseIdentity."""
+        return f"swh:1:rel:{self.swhid}\t{self.name.decode()}"
+
+
+def identify_release(
+    hg: Hg, node_id_2_swhid: Optional[Dict[bytes, bytes]] = None,
+) -> Iterator[ReleaseIdentity]:
+    """Return the repository's release identities.
+
+    hg: A `Hg` repository instance
+    node_id_2_swhid: An optional cache mapping hg node ids to SWHIDs
+        If not provided it will be computed using `identify_revision`.
+    """
+    from swh.model.identifiers import release_identifier
+    from swh.model.model import ObjectType
+
+    if node_id_2_swhid is None:
+        node_id_2_swhid = {
+            revision.node_id: revision.swhid for revision in identify_revision(hg)
+        }
+
+    for tag in hg.tags():
+        data = {
+            "name": tag.name,
+            "target": node_id_2_swhid[tag.node_id],
+            "target_type": ObjectType.REVISION.value,
+            "message": None,
+            "metadata": None,
+            "synthetic": False,
+            "author": {"name": None, "email": None, "fullname": b""},
+            "date": None,
+        }
+
+        release_swhid = release_identifier(data)
+
+        yield ReleaseIdentity(
+            swhid=release_swhid, node_id=tag.node_id, name=tag.name,
+        )
+
+
+def identify_snapshot(
+    hg: Hg,
+    node_id_2_swhid: Optional[Dict[bytes, bytes]] = None,
+    releases: Optional[List[ReleaseIdentity]] = None,
+) -> str:
+    """Return the repository snapshot identifier.
+
+    hg: A `Hg` repository instance
+    node_id_2_swhid: An optional cache mapping hg node ids to SWHIDs
+        If not provided it will be computed using `identify_revision`.
+    releases: an optional list of `ReleaseIdentity`.
+        If not provided it will be computed using `identify_release`.
+    """
+    from swh.model.identifiers import snapshot_identifier
+    from swh.model.model import TargetType
+
+    if node_id_2_swhid is None:
+        node_id_2_swhid = {
+            revision.node_id: revision.swhid for revision in identify_revision(hg)
+        }
+
+    if releases is None:
+        releases = list(identify_release(hg, node_id_2_swhid))
+
+    branches = {}
+
+    tip = hg.tip()
+    branches[b"HEAD"] = {
+        "target": tip.branch(),
+        "target_type": TargetType.ALIAS.value,
+    }
+
+    for branch in hg.branches():
+        branches[branch.name] = {
+            "target": node_id_2_swhid[branch.node_id],
+            "target_type": TargetType.REVISION.value,
+        }
+
+    for release in releases:
+        branches[release.name] = {
+            "target": release.swhid,
+            "target_type": TargetType.RELEASE.value,
+        }
+
+    return snapshot_identifier({"branches": branches})
+
+
+@main.command()
+@click.argument("rev", required=False)
+@click.pass_context
+def revision(ctx, rev):
+    """Compute the SWHID of a given revision.
+
+    If specified REV allow to select a single or multiple revisions
+    (using the Mercurial revsets language: `hg help revsets`)
+    """
+    hg = Hg(ctx.obj["HG_ROOT"])
+
+    for identity in identify_revision(hg, rev):
+        click.echo(identity)
+
+
+@main.command()
+@click.pass_context
+def snapshot(ctx):
+    """Compute the SWHID of the snapshot."""
+    root = ctx.obj["HG_ROOT"]
+    hg = Hg(root)
+
+    snapshot_swhid = identify_snapshot(hg)
+
+    click.echo(f"swh:1:snp:{snapshot_swhid}\t{root}")
+
+
+@main.command()
+@click.pass_context
+def all(ctx):
+    """Compute the SWHID of all the repository objects."""
+    root = ctx.obj["HG_ROOT"]
+    hg = Hg(root)
+
+    dir_uris = []
+    rev_uris = []
+    rel_uris = []
+
+    node_id_2_swhid = {}
+    for revision in identify_revision(hg):
+        dir_uris.append(revision.dir_uri())
+        rev_uris.append(str(revision))
+        node_id_2_swhid[revision.node_id] = revision.swhid
+
+    releases = []
+    for release in identify_release(hg, node_id_2_swhid):
+        rel_uris.append(str(release))
+        releases.append(release)
+
+    snapshot_swhid = identify_snapshot(hg, node_id_2_swhid, releases)
+
+    for uri in dir_uris + rev_uris + rel_uris:
+        click.echo(uri)
+
+    click.echo(f"swh:1:snp:{snapshot_swhid}\t{root}")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/swh/loader/mercurial/tests/test_identify.py b/swh/loader/mercurial/tests/test_identify.py
new file mode 100644
--- /dev/null
+++ b/swh/loader/mercurial/tests/test_identify.py
@@ -0,0 +1,89 @@
+import os
+from urllib.parse import urlsplit
+
+from click.testing import CliRunner
+
+from swh.loader.mercurial.identify import main
+from swh.loader.tests import prepare_repository_from_archive
+
+
+def test_all_revisions(datadir: str, tmp_path: str):
+    archive_name = "hello"
+    archive_path = os.path.join(datadir, f"{archive_name}.tgz")
+    repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
+    directory = urlsplit(repo_url).path
+
+    runner = CliRunner()
+    result = runner.invoke(main, ["-d", directory, "revision"])
+
+    expected = (
+        "\n".join(
+            [
+                "swh:1:rev:93b48d515580522a05f389bec93227fc8e43d940"
+                "\t0a04b987be5ae354b710cefeba0e2d9de7ad41a9",
+                "swh:1:rev:8dd3db5d5519e4947f035d141581d304565372d2"
+                "\t82e55d328c8ca4ee16520036c0aaace03a5beb65",
+                "swh:1:rev:c3dbe4fbeaaa98dd961834e4007edb3efb0e2a27"
+                "\tb985ae4a07e12ac662f45a171e2d42b13be5b50c",
+            ]
+        )
+        + "\n"
+    )
+    assert result.output == expected
+
+
+def test_single_revision(datadir: str, tmp_path: str):
+    archive_name = "hello"
+    archive_path = os.path.join(datadir, f"{archive_name}.tgz")
+    repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
+    directory = urlsplit(repo_url).path
+
+    runner = CliRunner()
+    result = runner.invoke(
+        main, ["-d", directory, "revision", "0a04b987be5ae354b710cefeba0e2d9de7ad41a9"]
+    )
+
+    expected = (
+        "\n".join(
+            [
+                "swh:1:rev:93b48d515580522a05f389bec93227fc8e43d940"
+                "\t0a04b987be5ae354b710cefeba0e2d9de7ad41a9",
+            ]
+        )
+        + "\n"
+    )
+    assert result.output == expected
+
+
+def test_all(datadir: str, tmp_path: str):
+    archive_name = "hello"
+    archive_path = os.path.join(datadir, f"{archive_name}.tgz")
+    repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
+    directory = urlsplit(repo_url).path
+
+    runner = CliRunner()
+    result = runner.invoke(main, ["-d", directory, "all"])
+
+    # NOTE(review): `all` emits one `swh:1:dir:` line per revision, but only two
+    # are listed below for three revisions -- the entry for 0a04b987be5a looks
+    # missing; confirm the expected directory SWHID against the "hello" archive.
+    expected = (
+        "\n".join(
+            [
+                "swh:1:dir:b3f85f210ff86d334575f64cb01c5bf49895b63e"
+                "\t82e55d328c8ca4ee16520036c0aaace03a5beb65",
+                "swh:1:dir:8f2be433c945384c85920a8e60f2a68d2c0f20fb"
+                "\tb985ae4a07e12ac662f45a171e2d42b13be5b50c",
+                "swh:1:rev:93b48d515580522a05f389bec93227fc8e43d940"
+                "\t0a04b987be5ae354b710cefeba0e2d9de7ad41a9",
+                "swh:1:rev:8dd3db5d5519e4947f035d141581d304565372d2"
+                "\t82e55d328c8ca4ee16520036c0aaace03a5beb65",
+                "swh:1:rev:c3dbe4fbeaaa98dd961834e4007edb3efb0e2a27"
+                "\tb985ae4a07e12ac662f45a171e2d42b13be5b50c",
+                "swh:1:rel:515c4d72e089404356d0f4b39d60f948b8999140\t0.1",
+                f"swh:1:snp:d35668e02e2ba4321dc951cd308cf883786f918a\t{directory}",
+            ]
+        )
+        + "\n"
+    )
+    assert result.output == expected