diff --git a/requirements.txt b/requirements.txt --- a/requirements.txt +++ b/requirements.txt @@ -8,3 +8,4 @@ python-hglib retrying sqlitedict +mercurial diff --git a/setup.py b/setup.py --- a/setup.py +++ b/setup.py @@ -54,6 +54,8 @@ entry_points=""" [swh.workers] loader.mercurial=swh.loader.mercurial:register + [console_scripts] + swh-hg-identify=swh.loader.mercurial.identify:main """, classifiers=[ "Programming Language :: Python :: 3", diff --git a/swh/loader/mercurial/identify.py b/swh/loader/mercurial/identify.py new file mode 100644 --- /dev/null +++ b/swh/loader/mercurial/identify.py @@ -0,0 +1,541 @@ +# Copyright (C) 2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import json +import re +import subprocess +from codecs import escape_decode # type: ignore +from pathlib import Path +from typing import Any, Dict, Iterator, List, NamedTuple, Optional, Union + +# WARNING: do not import unnecessary things here to keep cli startup time under +# control +import click + +from swh.model.cli import identify_object +from swh.model.hashutil import hash_to_bytehex +from swh.model.identifiers import normalize_timestamp, swhid +from swh.model.model import RevisionType + +TAG_PATTERN = re.compile(b"([0-9A-Fa-f]{40}) +(.+)") + + +class HgAuthor(NamedTuple): + """Represent a Mercurial revision author.""" + + fullname: bytes + """full name of the author""" + + name: Optional[bytes] + """name of the author""" + + email: Optional[bytes] + """email of the author""" + + @staticmethod + def from_bytes(data: bytes) -> "HgAuthor": + """Convert bytes to an HgAuthor named tuple. + + Expected format: "name " + """ + from swh.loader.mercurial.converters import parse_author + + result = parse_author(data) + return HgAuthor( + fullname=result["fullname"], name=result["name"], email=result["email"] + ) + + def to_dict(self) -> Dict[str, Optional[bytes]]: + return {"fullname": self.fullname, "name": self.name, "email": self.email} + + +HG_REVISION_TEMPLATE = "\n".join( + [ + "node_id:{node}", + "author:{author}", + "timestamp_offset:{date|json}", + "p1:{p1.node}", + "p2:{p2.node}", + "extras:{join(extras, '\nextras:')}", + ] +) # Log template for HgRevision.from_bytes + +NULL_NODE_ID = b"0" * 40 # Value used when no parent + + +class HgRevision(NamedTuple): + """Represent a Mercurial revision.""" + + node_id: bytes + """raw bytes of the revision hash""" + + author: HgAuthor + """author of the revision""" + + timestamp: bytes + """timestamp of the revision""" + + offset: bytes + """offset of the revision""" + + parents: List[bytes] + """hex bytes of the revision's parents""" + + extras: Dict[bytes, bytes] + """metadata of the revision""" + + description: bytes + """description of the revision""" + + @staticmethod + def from_bytes(data: bytes, description: bytes) -> "HgRevision": + """Convert bytes to an HgRevision named tuple. + + Expected data format: + ''' + node_id:{node} + author:{author} + timestamp_offset:[{timestamp}, {offset}] + p1:{p1} + p2:{p2} + extras:{key1}={value1} + ... + extras:{keyn}={value} + ''' + + """ + lines = data.split(b"\n") + tuples = [line.split(b":", 1) for line in lines] + fields: Dict[str, Any] = { + "parents": [], + "extras": {}, + "description": description, + } + for key, value in tuples: + if key == b"timestamp_offset": + timestamp, offset = json.loads(value) + fields["timestamp"] = timestamp + fields["offset"] = offset + elif key in (b"p1", b"p2"): + if value != NULL_NODE_ID: + fields["parents"].append(value) + elif key == b"extras": + extra_key, extra_value = value.split(b"=", 1) + fields["extras"][extra_key] = extra_value + elif key == b"author": + fields["author"] = HgAuthor.from_bytes(value) + else: + fields[key.decode()] = value + + return HgRevision(**fields) + + def branch(self) -> bytes: + return self.extras.get(b"branch", b"default") + + def to_dict(self) -> Dict: + """Convert a HgRevision to a dict for SWHID computation""" + date = normalize_timestamp(int(self.timestamp)) + + extra_headers = [ + (b"time_offset_seconds", str(self.offset).encode("utf-8")), + ] + + for key, value in self.extras.items(): + if key == b"branch" and value == b"default": + # branch default is skipped to match historical implementation + continue + if key == b"transplant_source": + # transplant_source is converted to hex + # to match historical implementation + value = hash_to_bytehex(escape_decode(value)[0]) + extra_headers.append((key, value)) + + author = self.author.to_dict() + + return { + "author": author, + "date": date, + "committer": author, + "committer_date": date, + "type": RevisionType.MERCURIAL.value, + "message": self.description, + "metadata": {"node": self.node_id}, + "extra_headers": tuple(extra_headers), + "synthetic": False, + "parents": self.parents, + } + + +class HgBranch(NamedTuple): + """Represent a Mercurial branch.""" + + name: bytes + """name of the branch""" + + node_id: bytes + """row bytes of the target revision hash""" + + +class HgTag(NamedTuple): + """Represent a Mercurial tag.""" + + name: bytes + """name of the tag""" + + node_id: bytes + """hex bytes of the target revision""" + + +class Hg: + """Provide methods to extract data from a Mercurial repository.""" + + def __init__(self, repository_root: Path) -> None: + self._root = repository_root + + def _output(self, *args) -> bytes: + """Return the outpout of a `hg` call.""" + return subprocess.check_output(["hg", *args], cwd=self._root) + + def _call(self, *args) -> None: + """Perform a `hg` call.""" + subprocess.check_call( + ["hg", *args], + cwd=self._root, + stderr=subprocess.PIPE, + stdout=subprocess.PIPE, + ) + + def root(self) -> Path: + """Return the root of the Mercurial repository.""" + return self._root + + def log(self, rev: Optional[Union[bytes, str]] = None) -> List[HgRevision]: + """Return the specified revisions of the Mercurial repository. + + Mercurial revsets are supported. (See `hg help revsets`) + + If no revision range is specified, return all revisions". + """ + if rev: + node_ids = self._output("log", "-r", rev, "-T", "{node}\n").splitlines() + else: + node_ids = self._output("log", "-T", "{node}\n").splitlines() + + revisions = [self._revision(node_id) for node_id in reversed(node_ids)] + + return revisions + + def _revision(self, revision: bytes) -> HgRevision: + data = self._output("log", "-r", revision, "-T", HG_REVISION_TEMPLATE) + + # hg log strips the description so the raw description has to be taken + # from debugdata + # The description follows some metadata and is separated from them + # by an empty line + _, desc = self._output("debugdata", "-c", revision).split(b"\n\n", 1) + + return HgRevision.from_bytes(data, desc) + + def up(self, rev: bytes) -> None: + """Update the repository working directory to the specified revision.""" + self._call("up", rev) + + def branches(self) -> List[HgBranch]: + """List the repository named branches.""" + output = self._output("branches", "-T", "{branch}\n{node}\n\n").strip() + + branches = [] + + for block in output.split(b"\n\n"): + name, node_id = block.splitlines() + branches.append(HgBranch(name=name, node_id=node_id)) + + return branches + + def tip(self) -> HgRevision: + """Return the `tip` node-id.""" + return self.log("tip")[0] + + def tags(self) -> List[HgTag]: + """Return the repository's tags as defined in the `.hgtags` file. + + `.hgtags` being like any other repository's tracked file, its content can vary + from revision to revision. The returned value therefore depends on the current + revision of the repository. + """ + hgtags = self._root / ".hgtags" + + tags = {} + + if hgtags.is_file(): + for line in hgtags.read_bytes().splitlines(): + match = TAG_PATTERN.match(line) + if match is None: + continue + node_id, name = match.groups() + tags[node_id] = name + + return [HgTag(name=name, node_id=node_id) for node_id, name in tags.items()] + + +@click.group() +@click.option( + "--directory", + "-d", + help=("Path to the Mercurial repository. If unset, the current directory is used"), +) +@click.pass_context +def main(ctx, directory=None): + """Compute the Software Heritage persistent identifier (SWHID) for the given + source code object(s). + + For more details about SWHIDs see: + + https://docs.softwareheritage.org/devel/swh-model/persistent-identifiers.html + """ + # ensure that ctx.obj exists and is a dict (in case `cli()` is called + # by means other than the `if` block below) + ctx.ensure_object(dict) + + root = Path(directory) if directory else Path() + if not root.exists(): + raise IOError(f"{root!r} does not exists") + + ctx.obj["HG_ROOT"] = root + + +def identify_directory(path: Path) -> str: + """Return the SWHID of the given path.""" + uri = identify_object( + "directory", follow_symlinks=True, exclude_patterns=[".hg"], obj=str(path) + )[1] + return uri.split(":")[-1] + + +class RevisionIdentity(NamedTuple): + """Represent a swh revision identity.""" + + swhid: bytes + """SWHID raw bytes""" + + node_id: bytes + """node_id hex bytes""" + + directory_swhid: bytes + + def dir_uri(self) -> str: + """Return the SWHID uri of the revision's directory.""" + return f"swh:1:dir:{self.directory_swhid.hex()}\t{self.node_id.decode()}" + + def __str__(self) -> str: + """Return the string representation of a RevisionIdentity.""" + uri = swhid("revision", self.swhid.hex()) + return f"{uri}\t{self.node_id.decode()}" + + +def identify_revision( + hg: Hg, + rev: Optional[bytes] = None, + node_id_2_swhid: Optional[Dict[bytes, bytes]] = None, +) -> Iterator[RevisionIdentity]: + """Return the repository revision identities. + + hg: A `Hg` repository instance + rev: An optional revision or Mercurial revsets (See `hg help revsets`) + If not provided all the repository revisions will be computed. + node_id_2_swhid: An optional cache mapping hg node ids to SWHIDs + It will be updated in place with new mappings. + """ + from swh.model.hashutil import hash_to_bytes + from swh.model.model import Revision + + if node_id_2_swhid is None: + node_id_2_swhid = {} + + for revision in hg.log(rev): + data = revision.to_dict() + + hg.up(revision.node_id) + directory_swhid = hash_to_bytes(identify_directory(hg.root())) + data["directory"] = directory_swhid + + parents = [] + for parent in data["parents"]: + if parent not in node_id_2_swhid: + parent_revision = next(identify_revision(hg, parent, node_id_2_swhid)) + node_id_2_swhid[parent] = parent_revision.swhid + parents.append(node_id_2_swhid[parent]) + data["parents"] = parents + + revision_swhid = hash_to_bytes(Revision.from_dict(data).id) + node_id_2_swhid[revision.node_id] = revision_swhid + + yield RevisionIdentity( + swhid=revision_swhid, + node_id=revision.node_id, + directory_swhid=directory_swhid, + ) + + +class ReleaseIdentity(NamedTuple): + """Represent a swh release identity.""" + + swhid: str + """SWHID hex string""" + + node_id: bytes + """node_id hex bytes""" + + name: bytes + """name of the release""" + + def __str__(self) -> str: + """Return the string representation of a ReleaseIdentity.""" + uri = swhid("release", self.swhid) + return f"{uri}\t{self.name.decode()}" + + +def identify_release( + hg: Hg, node_id_2_swhid: Optional[Dict[bytes, bytes]] = None, +) -> Iterator[ReleaseIdentity]: + """Return the repository's release identities. + + hg: A `Hg` repository instance + node_id_2_swhid: An optional cache mapping hg node ids to SWHIDs + If not provided it will be computed using `identify_revision`. + """ + from swh.model.model import ObjectType, Release + + if node_id_2_swhid is None: + node_id_2_swhid = { + revision.node_id: revision.swhid for revision in identify_revision(hg) + } + + for tag in hg.tags(): + data = { + "name": tag.name, + "target": node_id_2_swhid[tag.node_id], + "target_type": ObjectType.REVISION.value, + "message": None, + "metadata": None, + "synthetic": False, + "author": {"name": None, "email": None, "fullname": b""}, + "date": None, + } + + release_swhid = Release.from_dict(data).id + + yield ReleaseIdentity( + swhid=release_swhid, node_id=tag.node_id, name=tag.name, + ) + + +def identify_snapshot( + hg: Hg, + node_id_2_swhid: Optional[Dict[bytes, bytes]] = None, + releases: Optional[List[ReleaseIdentity]] = None, +) -> str: + """Return the repository snapshot identity. + + hg: A `Hg` repository instance + node_id_2_swhid: An optional cache mapping hg node ids to SWHIDs + If not provided it will be computed using `identify_revision`. + release: an optional list of `ReleaseIdentity`. + If not provided it will be computed using `identify_release`. + """ + from swh.model.model import Snapshot, TargetType + + if node_id_2_swhid is None: + node_id_2_swhid = { + revision.node_id: revision.swhid for revision in identify_revision(hg) + } + + if releases is None: + releases = [release for release in identify_release(hg, node_id_2_swhid)] + + branches = {} + + tip = hg.tip() + branches[b"HEAD"] = { + "target": tip.branch(), + "target_type": TargetType.ALIAS.value, + } + + for branch in hg.branches(): + branches[branch.name] = { + "target": node_id_2_swhid[branch.node_id], + "target_type": TargetType.REVISION.value, + } + + for release in releases: + branches[release.name] = { + "target": release.swhid, + "target_type": TargetType.RELEASE.value, + } + + return Snapshot.from_dict({"branches": branches}).id + + +@main.command() +@click.argument("rev", required=False) +@click.pass_context +def revision(ctx, rev): + """Compute the SWHID of a given revision. + + If specified REV allow to select a single or multiple revisions + (using the Mercurial revsets language: `hg help revsets`) + """ + hg = Hg(ctx.obj["HG_ROOT"]) + + for identity in identify_revision(hg, rev): + click.echo(identity) + + +@main.command() +@click.pass_context +def snapshot(ctx): + """Compute the SWHID of the snapshot.""" + root = ctx.obj["HG_ROOT"] + hg = Hg(root) + + snapshot_swhid = identify_snapshot(hg) + + uri = swhid("snapshot", snapshot_swhid) + click.echo(f"{uri}\t{root}") + + +@main.command() +@click.pass_context +def all(ctx): + """Compute the SWHID of all the repository objects.""" + root = ctx.obj["HG_ROOT"] + hg = Hg(root) + + dir_uris = [] + rev_uris = [] + rel_uris = [] + + node_id_2_swhid = {} + for revision in identify_revision(hg): + dir_uris.append(revision.dir_uri()) + rev_uris.append(str(revision)) + node_id_2_swhid[revision.node_id] = revision.swhid + + releases = [] + for release in identify_release(hg, node_id_2_swhid): + rel_uris.append(str(release)) + releases.append(release) + + snapshot_swhid = identify_snapshot(hg, node_id_2_swhid, releases) + + for uri in dir_uris + rev_uris + rel_uris: + click.echo(uri) + + uri = swhid("snapshot", snapshot_swhid) + click.echo(f"{uri}\t{root}") + + +if __name__ == "__main__": + main() diff --git a/swh/loader/mercurial/tests/test_identify.py b/swh/loader/mercurial/tests/test_identify.py new file mode 100644 --- /dev/null +++ b/swh/loader/mercurial/tests/test_identify.py @@ -0,0 +1,74 @@ +# Copyright (C) 2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import os +from textwrap import dedent +from urllib.parse import urlsplit + +from click.testing import CliRunner + +from swh.loader.mercurial.identify import main +from swh.loader.tests import prepare_repository_from_archive + + +def test_all_revisions(datadir: str, tmp_path: str): + archive_name = "hello" + archive_path = os.path.join(datadir, f"{archive_name}.tgz") + repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) + directory = urlsplit(repo_url).path + + runner = CliRunner() + result = runner.invoke(main, ["-d", directory, "revision"]) + + expected = dedent( + """ + swh:1:rev:93b48d515580522a05f389bec93227fc8e43d940\t0a04b987be5ae354b710cefeba0e2d9de7ad41a9 + swh:1:rev:8dd3db5d5519e4947f035d141581d304565372d2\t82e55d328c8ca4ee16520036c0aaace03a5beb65 + swh:1:rev:c3dbe4fbeaaa98dd961834e4007edb3efb0e2a27\tb985ae4a07e12ac662f45a171e2d42b13be5b50c + """ + ).lstrip() + assert result.output == expected + + +def test_single_revision(datadir: str, tmp_path: str): + archive_name = "hello" + archive_path = os.path.join(datadir, f"{archive_name}.tgz") + repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) + directory = urlsplit(repo_url).path + + runner = CliRunner() + result = runner.invoke( + main, ["-d", directory, "revision", "0a04b987be5ae354b710cefeba0e2d9de7ad41a9"] + ) + + expected = ( + "swh:1:rev:93b48d515580522a05f389bec93227fc8e43d940" + "\t0a04b987be5ae354b710cefeba0e2d9de7ad41a9\n" + ) + assert result.output == expected + + +def test_all(datadir: str, tmp_path: str): + archive_name = "hello" + archive_path = os.path.join(datadir, f"{archive_name}.tgz") + repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) + directory = urlsplit(repo_url).path + + runner = CliRunner() + result = runner.invoke(main, ["-d", directory, "all"]) + + expected = dedent( + f""" + swh:1:dir:43d727f2f3f2f7cb3b098ddad1d7038464a4cee2\t0a04b987be5ae354b710cefeba0e2d9de7ad41a9 + swh:1:dir:b3f85f210ff86d334575f64cb01c5bf49895b63e\t82e55d328c8ca4ee16520036c0aaace03a5beb65 + swh:1:dir:8f2be433c945384c85920a8e60f2a68d2c0f20fb\tb985ae4a07e12ac662f45a171e2d42b13be5b50c + swh:1:rev:93b48d515580522a05f389bec93227fc8e43d940\t0a04b987be5ae354b710cefeba0e2d9de7ad41a9 + swh:1:rev:8dd3db5d5519e4947f035d141581d304565372d2\t82e55d328c8ca4ee16520036c0aaace03a5beb65 + swh:1:rev:c3dbe4fbeaaa98dd961834e4007edb3efb0e2a27\tb985ae4a07e12ac662f45a171e2d42b13be5b50c + swh:1:rel:515c4d72e089404356d0f4b39d60f948b8999140\t0.1 + swh:1:snp:d35668e02e2ba4321dc951cd308cf883786f918a\t{directory} + """ + ).lstrip() + assert result.output == expected