"""Compute Software Heritage identifiers (SWHIDs) for Mercurial revisions.

This is the content of ``swh/loader/mercurial/identify.py``.

Packaging note: the module is exposed as the ``swh-hg-identify`` command
through the following ``entry_points`` section of ``setup.py``::

    [swh.workers]
    loader.mercurial=swh.loader.mercurial:register
    [console_scripts]
    swh-hg-identify=swh.loader.mercurial.identify:main
"""

import json
import subprocess
from pathlib import Path
from typing import Dict, Iterator, List, NamedTuple, Optional, Tuple, Union

import click

from swh.model.hashutil import hash_to_bytehex, hash_to_bytes
from swh.model.identifiers import normalize_timestamp, revision_identifier
from swh.model.model import RevisionType


class HgAuthor(NamedTuple):
    """Author of a Mercurial changeset, split into a name and an email."""

    name: Optional[bytes]
    email: Optional[bytes]

    @staticmethod
    def from_bytes(data: bytes) -> "HgAuthor":
        """Parse a raw ``Name <email>`` author field.

        Args:
            data: the author field as emitted by ``hg log``.

        Returns:
            An ``HgAuthor`` whose ``name`` and/or ``email`` is ``None`` when
            the corresponding part is absent from ``data``.
        """
        if data.startswith(b"<") and data.endswith(b">"):
            # "<email>" only.
            return HgAuthor(name=None, email=data[1:-1].strip())

        if data.endswith(b">"):
            # "Name <email>": split on the LAST "<" so that a name which
            # itself contains "<" no longer raises ValueError on unpacking.
            name, separator, email = data[:-1].rpartition(b"<")
            if separator:
                return HgAuthor(name=name.strip(), email=email.strip())
            # A trailing ">" without any "<" is not an email: fall through
            # and treat the whole field as a name.

        return HgAuthor(name=data.strip(), email=None)

    def to_dict(self) -> Dict[str, Optional[bytes]]:
        """Return the ``name`` / ``email`` / ``fullname`` author mapping.

        ``fullname`` is rebuilt from the parsed parts; it is ``b""`` when
        neither a name nor an email is available (this case used to raise
        ``UnboundLocalError``).
        """
        if self.name and self.email:
            fullname = self.name + b" <" + self.email + b">"
        elif self.name:
            fullname = self.name
        elif self.email:
            fullname = b"<" + self.email + b">"
        else:
            fullname = b""

        return {"name": self.name, "email": self.email, "fullname": fullname}


# One "key:value" header per line, followed by the free-form description.
# The description must stay last: HgRevision.from_bytes relies on it.
HG_REVISION_TEMPLATE = "\n".join(
    [
        "node_id:{node}",
        "author:{author}",
        "timestamp_offset:{date|json}",
        "p1:{p1.node}",
        "p2:{p2.node}",
        "extras:{extras|json}",
        "{desc}",
    ]
)
NULL_NODE_ID = b"0000000000000000000000000000000000000000"


class HgRevision(NamedTuple):
    """A changeset as rendered by ``hg log -T HG_REVISION_TEMPLATE``."""

    node_id: bytes
    author: HgAuthor
    # timestamp and offset are the two numbers decoded by json.loads from
    # the "{date|json}" template output, not bytes.
    timestamp: float
    offset: int
    parents: List[bytes]
    extras: Dict[bytes, bytes]
    description: bytes

    @staticmethod
    def from_bytes(data: bytes) -> "HgRevision":
        """Parse the output of ``hg log`` rendered with HG_REVISION_TEMPLATE.

        Args:
            data: raw bytes produced for a single changeset.

        Returns:
            The parsed ``HgRevision``; null parents are dropped.
        """
        # Every template line but the last one is a "key:value" header; the
        # remainder (which may itself contain newlines) is the description.
        header_count = len(HG_REVISION_TEMPLATE.splitlines()) - 1
        *header_lines, description = data.split(b"\n", maxsplit=header_count)
        fields = {
            key.decode(): value
            for key, value in (line.split(b":", 1) for line in header_lines)
        }

        timestamp, offset = json.loads(fields["timestamp_offset"])

        parents = [
            fields[name] for name in ("p1", "p2") if fields[name] != NULL_NODE_ID
        ]

        extras = {
            key.encode(): value.encode()
            for key, value in json.loads(fields["extras"]).items()
        }

        return HgRevision(
            node_id=fields["node_id"],
            author=HgAuthor.from_bytes(fields["author"]),
            timestamp=timestamp,
            offset=offset,
            parents=parents,
            extras=extras,
            description=description,
        )

    def to_dict(self) -> Dict:
        """Return the revision as a dict suitable for ``revision_identifier``.

        The ``directory`` key is not set here and ``parents`` still holds
        Mercurial node ids; ``identify_revision`` fills in both.
        """
        date = normalize_timestamp(int(self.timestamp))
        extra_headers = [
            (b"time_offset_seconds", str(self.offset).encode("utf-8")),
        ]
        for key, value in self.extras.items():
            if key == b"branch" and value == b"default":
                # The default branch is left out of the extra headers.
                continue
            if key == b"transplant_source":
                # NOTE(review): value went through JSON text decoding and a
                # UTF-8 re-encoding in from_bytes; confirm this round-trips
                # the raw hash before it is hex-encoded here.
                value = hash_to_bytehex(value)
            extra_headers.append((key, value))
        author = self.author.to_dict()
        return {
            "author": author,
            "date": date,
            "committer": author,
            "committer_date": date,
            "type": RevisionType.MERCURIAL.value,
            "message": self.description,
            "metadata": {"node": self.node_id},
            "extra_headers": tuple(extra_headers),
            "synthetic": False,
            "parents": self.parents,
        }


class Hg:
    """Thin wrapper running ``hg`` commands inside a local repository."""

    def __init__(self, root: Path) -> None:
        self._root = root

    def _output(self, *args) -> bytes:
        """Run ``hg *args`` in the repository and return its stdout."""
        return subprocess.check_output(["hg", *args], cwd=self._root)

    def _call(self, *args) -> None:
        """Run ``hg *args`` in the repository, discarding its output.

        Raises:
            subprocess.CalledProcessError: if ``hg`` exits with a non-zero
                status.
        """
        # subprocess.run drains the pipes while waiting for the child;
        # check_call with PIPE never reads them and can block forever once
        # the OS pipe buffer is full.
        subprocess.run(
            ["hg", *args],
            cwd=self._root,
            stderr=subprocess.PIPE,
            stdout=subprocess.PIPE,
            check=True,
        )

    def root(self) -> Path:
        """Return the repository path this wrapper operates on."""
        return self._root

    def log(self, rev: Optional[Union[str, bytes]] = None) -> List[HgRevision]:
        """Return the revisions selected by ``rev``.

        Args:
            rev: value passed to ``hg log -r``; when empty or ``None`` the
                ``-r`` option is omitted.

        Returns:
            The parsed revisions, in the reverse of the order printed by
            ``hg log``.
        """
        command = ["log"]
        if rev:
            command += ["-r", rev]
        command += ["-T", "{node}\n"]
        node_ids = self._output(*command).splitlines()
        return [self._revision(node_id) for node_id in reversed(node_ids)]

    def _revision(self, revision) -> HgRevision:
        """Fetch and parse a single revision through HG_REVISION_TEMPLATE."""
        data = self._output("log", "-r", revision, "-T", HG_REVISION_TEMPLATE)
        return HgRevision.from_bytes(data)

    def up(self, rev) -> None:
        """Update the working directory to ``rev`` (``hg up``)."""
        self._call("up", rev)


@click.group()
@click.option(
    "--directory",
    "-d",
    help=(
        "Path to the cloned mercurial repository. "
        "If unset, the current directory is used "
    ),
)
@click.pass_context
def main(ctx, directory=None):
    """Compute the Software Heritage persistent identifier (SWHID) for the given
    source code object(s).

    For more details about SWHIDs see:

    https://docs.softwareheritage.org/devel/swh-model/persistent-identifiers.html
    """
    # Ensure that ctx.obj exists and is a dict (in case `main()` is invoked
    # by other means than the `__main__` guard at the bottom of this module).
    ctx.ensure_object(dict)

    root = Path(directory) if directory else Path()
    if not root.exists():
        raise IOError(f"{root!r} does not exists")

    ctx.obj["HG_ROOT"] = root


def identify_directory(path: Path) -> str:
    """Return the directory identifier of ``path``, ignoring ``.hg``.

    Runs ``swh-identify --exclude .hg .`` in ``path`` and returns the part
    after the last ``:`` of the first whitespace-separated token it prints.
    """
    output = subprocess.check_output(
        ["swh-identify", "--exclude", ".hg", "."], cwd=path
    ).decode()
    first_token = output.split()[0]
    return first_token.split(":")[-1]


# Backward-compatible alias: the function was originally published under
# this misspelled name.
indentify_directory = identify_directory


def _resolve_revision_swhid(
    hg: Hg, revision: HgRevision, revisions_swhid: Dict[bytes, bytes]
) -> bytes:
    """Compute the SWHID of ``revision``, resolving missing ancestors first.

    An explicit stack replaces recursion so that a long chain of unknown
    ancestors cannot exhaust the interpreter's recursion limit. Every
    computed identifier is stored in ``revisions_swhid``.
    """
    pending = [revision]
    while pending:
        current = pending[-1]
        if current.node_id in revisions_swhid:
            # Already known (possibly reached through another merge parent).
            pending.pop()
            continue

        missing = [
            parent for parent in current.parents if parent not in revisions_swhid
        ]
        if missing:
            # Parents must be identified before their child: stack them (p1
            # on top) and revisit ``current`` once they are resolved.
            for parent in reversed(missing):
                pending.append(hg.log(parent)[0])
            continue

        data = current.to_dict()
        # The directory identifier is computed on the checked-out files.
        hg.up(current.node_id)
        data["directory"] = identify_directory(hg.root())
        data["parents"] = [revisions_swhid[parent] for parent in current.parents]
        revisions_swhid[current.node_id] = hash_to_bytes(revision_identifier(data))
        pending.pop()

    return revisions_swhid[revision.node_id]


def identify_revision(
    hg: Hg,
    rev: Optional[Union[str, bytes]],
    revisions_swhid: Optional[Dict[bytes, bytes]] = None,
) -> Iterator[Tuple[bytes, bytes]]:
    """Yield ``(swhid, node_id)`` for every revision selected by ``rev``.

    Args:
        hg: wrapper around the repository to inspect. Its working directory
            is updated to each revision that gets identified.
        rev: revision selector forwarded to ``Hg.log``; ``None`` omits it.
        revisions_swhid: optional cache mapping Mercurial node ids to
            already computed revision identifiers. It is updated in place
            and cached entries are reused rather than recomputed.

    Yields:
        Pairs of the revision identifier (raw bytes) and the Mercurial node
        id, in the order returned by ``Hg.log``.
    """
    if revisions_swhid is None:
        revisions_swhid = {}

    for hg_revision in hg.log(rev):
        revision_swhid = _resolve_revision_swhid(hg, hg_revision, revisions_swhid)
        yield revision_swhid, hg_revision.node_id


@main.command()
@click.argument("rev", required=False)
@click.pass_context
def revision(ctx, rev):
    """Compute the SWHID of a given revision.

    If specified, REV selects one or several revisions
    (using the Mercurial revset language: `hg help revsets`).
    """
    hg = Hg(ctx.obj["HG_ROOT"])

    for revision_swhid, node_id in identify_revision(hg, rev):
        click.echo(f"swh:1:rev:{revision_swhid.hex()}\t{node_id.decode()}")


if __name__ == "__main__":
    main()