Page MenuHomeSoftware Heritage

D4216.id15471.diff
No OneTemporary

D4216.id15471.diff

diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -8,3 +8,4 @@
python-hglib
retrying
sqlitedict
+mercurial
diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -54,6 +54,8 @@
entry_points="""
[swh.workers]
loader.mercurial=swh.loader.mercurial:register
+ [console_scripts]
+ swh-hg-identify=swh.loader.mercurial.identify:main
""",
classifiers=[
"Programming Language :: Python :: 3",
diff --git a/swh/loader/mercurial/identify.py b/swh/loader/mercurial/identify.py
new file mode 100644
--- /dev/null
+++ b/swh/loader/mercurial/identify.py
@@ -0,0 +1,533 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import json
+import re
+import subprocess
+from codecs import escape_decode # type: ignore
+from pathlib import Path
+from typing import Any, Dict, Iterator, List, NamedTuple, Optional, Union
+
+# WARNING: do not import unnecessary things here to keep cli startup time under
+# control
+import click
+
+from swh.model.cli import identify_object
+from swh.model.hashutil import hash_to_bytehex
+from swh.model.identifiers import normalize_timestamp, swhid
+from swh.model.model import RevisionType
+
+# Matches one `.hgtags` line: a 40-character hexadecimal node id, one or more
+# spaces, then the tag name (the rest of the line).
+TAG_PATTERN = re.compile(b"([0-9A-Fa-f]{40}) +(.+)")
+
+
+class HgAuthor(NamedTuple):
+    """Author of a Mercurial revision, split into its components."""
+
+    fullname: bytes
+    """complete author string"""
+
+    name: Optional[bytes]
+    """name part of the author string, if any"""
+
+    email: Optional[bytes]
+    """email part of the author string, if any"""
+
+    @staticmethod
+    def from_bytes(data: bytes) -> "HgAuthor":
+        """Build an HgAuthor from a raw author string.
+
+        Expected format: "name <email>"
+        """
+        # Imported inside the function, presumably to keep the CLI startup
+        # time under control (see the warning next to the module imports).
+        from swh.loader.mercurial.converters import parse_author
+
+        parsed = parse_author(data)
+        fullname, name, email = (parsed[k] for k in ("fullname", "name", "email"))
+        return HgAuthor(fullname=fullname, name=name, email=email)
+
+    def to_dict(self) -> Dict[str, Optional[bytes]]:
+        """Return the author as a dict with `fullname`, `name` and `email` keys."""
+        return dict(fullname=self.fullname, name=self.name, email=self.email)
+
+
+# `hg log -T` template producing one "key:value" line per field. Each extra is
+# emitted on its own "extras:key=value" line (the join separator below embeds a
+# newline followed by the "extras:" prefix).
+HG_REVISION_TEMPLATE = "\n".join(
+ [
+ "node_id:{node}",
+ "author:{author}",
+ "timestamp_offset:{date|json}",
+ "p1:{p1.node}",
+ "p2:{p2.node}",
+ "extras:{join(extras, '\nextras:')}",
+ ]
+) # Log template for HgRevision.from_bytes
+
+# Forty ASCII zeros: HgRevision.from_bytes skips any p1/p2 value equal to this.
+NULL_NODE_ID = b"0" * 40 # Value used when no parent
+
+
+class HgRevision(NamedTuple):
+    """A Mercurial revision as read from the output of `hg log`."""
+
+    node_id: bytes
+    """revision hash, as printed by the `{node}` template keyword"""
+
+    author: HgAuthor
+    """author of the revision"""
+
+    # NOTE(review): from_bytes stores the elements decoded by json.loads in
+    # `timestamp` and `offset`, so the runtime values are presumably numbers
+    # rather than bytes -- confirm before relying on these annotations.
+    timestamp: bytes
+    """first element of the decoded `{date|json}` value"""
+
+    offset: bytes
+    """second element of the decoded `{date|json}` value"""
+
+    parents: List[bytes]
+    """node ids of the parents, null parents being left out"""
+
+    extras: Dict[bytes, bytes]
+    """extra key/value pairs attached to the revision"""
+
+    description: bytes
+    """description (commit message) of the revision"""
+
+    @staticmethod
+    def from_bytes(data: bytes, description: bytes) -> "HgRevision":
+        """Parse the output of HG_REVISION_TEMPLATE into an HgRevision.
+
+        Expected data format:
+        '''
+        node_id:{node}
+        author:{author}
+        timestamp_offset:[{timestamp}, {offset}]
+        p1:{p1}
+        p2:{p2}
+        extras:{key1}={value1}
+        ...
+        extras:{keyn}={value}
+        '''
+
+        The description is passed separately and stored as is.
+        """
+        fields: Dict[str, Any] = dict(parents=[], extras={}, description=description)
+
+        for line in data.split(b"\n"):
+            # every line is "key:value"; only the first colon is a separator
+            key, value = line.split(b":", 1)
+
+            if key == b"author":
+                fields["author"] = HgAuthor.from_bytes(value)
+            elif key == b"extras":
+                extra_name, extra_content = value.split(b"=", 1)
+                fields["extras"][extra_name] = extra_content
+            elif key == b"timestamp_offset":
+                fields["timestamp"], fields["offset"] = json.loads(value)
+            elif key in (b"p1", b"p2"):
+                # a null node id means "no parent"
+                if value != NULL_NODE_ID:
+                    fields["parents"].append(value)
+            else:
+                fields[key.decode()] = value
+
+        return HgRevision(**fields)
+
+    def branch(self) -> bytes:
+        """Return the `branch` extra, or b"default" when it is not set."""
+        try:
+            return self.extras[b"branch"]
+        except KeyError:
+            return b"default"
+
+    def to_dict(self) -> Dict:
+        """Convert a HgRevision to a dict for SWHID computation"""
+        date = normalize_timestamp(int(self.timestamp))
+
+        headers = [(b"time_offset_seconds", str(self.offset).encode("utf-8"))]
+        for key, value in self.extras.items():
+            if (key, value) == (b"branch", b"default"):
+                # the default branch is left out to match the actual implementation
+                continue
+            if key == b"transplant_source":
+                # converted to hex to match the actual implementation
+                value = hash_to_bytehex(escape_decode(value)[0])
+            headers.append((key, value))
+
+        author = self.author.to_dict()
+
+        # Mercurial has no separate committer: author and date are used twice.
+        return {
+            "author": author,
+            "date": date,
+            "committer": author,
+            "committer_date": date,
+            "type": RevisionType.MERCURIAL.value,
+            "message": self.description,
+            "metadata": {"node": self.node_id},
+            "extra_headers": tuple(headers),
+            "synthetic": False,
+            "parents": self.parents,
+        }
+
+
+class HgBranch(NamedTuple):
+    """A Mercurial named branch and the revision it points to."""
+
+    name: bytes
+    """branch name"""
+
+    node_id: bytes
+    """node id of the target revision, as printed by the `{node}` template"""
+
+
+class HgTag(NamedTuple):
+    """A Mercurial tag and the revision it points to."""
+
+    name: bytes
+    """tag name"""
+
+    node_id: bytes
+    """hexadecimal node id of the target revision"""
+
+
+class Hg:
+    """Provide methods to extract data from a Mercurial repository."""
+
+    def __init__(self, repository_root: Path) -> None:
+        """Remember the repository root; every `hg` call is run from there."""
+        self._root = repository_root
+
+    def _output(self, *args) -> bytes:
+        """Return the output of a `hg` call.
+
+        Raises:
+            subprocess.CalledProcessError: if `hg` exits with a non-zero status.
+        """
+        return subprocess.check_output(["hg", *args], cwd=self._root)
+
+    def _call(self, *args) -> None:
+        """Perform a `hg` call, keeping its output off the terminal.
+
+        Raises:
+            subprocess.CalledProcessError: if `hg` exits with a non-zero status.
+        """
+        # `check_call` never reads from the pipes it is given, so a command
+        # writing more than the pipe buffer would block forever. `run` drains
+        # both pipes while waiting for the process to exit.
+        subprocess.run(
+            ["hg", *args],
+            cwd=self._root,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            check=True,
+        )
+
+    def root(self) -> Path:
+        """Return the root of the Mercurial repository."""
+        return self._root
+
+    def log(self, rev: Optional[Union[bytes, str]] = None) -> List[HgRevision]:
+        """Return the specified revisions of the Mercurial repository.
+
+        Mercurial revsets are supported. (See `hg help revsets`)
+
+        If no revision range is specified, return all revisions.
+
+        The revisions are returned in the reverse of the `hg log` output order.
+        """
+        if rev:
+            node_ids = self._output("log", "-r", rev, "-T", "{node}\n").splitlines()
+        else:
+            node_ids = self._output("log", "-T", "{node}\n").splitlines()
+
+        return [self._revision(node_id) for node_id in reversed(node_ids)]
+
+    def _revision(self, revision: bytes) -> HgRevision:
+        """Return the HgRevision of a single node id."""
+        data = self._output("log", "-r", revision, "-T", HG_REVISION_TEMPLATE)
+
+        # hg log strips the description so the raw description has to be taken
+        # from debugdata
+        _, desc = self._output("debugdata", "-c", revision).split(b"\n\n", 1)
+
+        return HgRevision.from_bytes(data, desc)
+
+    def up(self, rev: bytes) -> None:
+        """Update the repository working directory to the specified revision."""
+        self._call("up", rev)
+
+    def branches(self) -> List[HgBranch]:
+        """List the repository named branches.
+
+        Returns an empty list when `hg branches` prints nothing.
+        """
+        output = self._output("branches", "-T", "{branch}\n{node}\n\n").strip()
+        if not output:
+            # Splitting an empty output would yield one empty block, which
+            # cannot be unpacked into a (name, node id) pair.
+            return []
+
+        branches = []
+        for block in output.split(b"\n\n"):
+            name, node_id = block.splitlines()
+            branches.append(HgBranch(name=name, node_id=node_id))
+
+        return branches
+
+    def tip(self) -> HgRevision:
+        """Return the `tip` revision."""
+        return self.log("tip")[0]
+
+    def tags(self) -> List[HgTag]:
+        """Return the repository tags at the current revision.
+
+        The tags are read from the `.hgtags` file of the working directory.
+        When a tag name appears several times the last entry wins, and a tag
+        whose last entry is the null node id is a removed tag and is left out.
+        Lines that do not match TAG_PATTERN are ignored.
+        """
+        hgtags = self._root / ".hgtags"
+
+        # Keyed by tag name: several tags may target the same revision, while
+        # a given name must resolve to a single revision.
+        node_id_by_name: Dict[bytes, bytes] = {}
+
+        if hgtags.is_file():
+            for line in hgtags.read_bytes().splitlines():
+                match = TAG_PATTERN.match(line)
+                if match is None:
+                    continue
+                node_id, name = match.groups()
+                node_id_by_name[name] = node_id
+
+        return [
+            HgTag(name=name, node_id=node_id)
+            for name, node_id in node_id_by_name.items()
+            if node_id != NULL_NODE_ID
+        ]
+
+
+@click.group()
+@click.option(
+    "--directory",
+    "-d",
+    help=("Path to the Mercurial repository. If unset, the current directory is used"),
+)
+@click.pass_context
+def main(ctx, directory=None):
+    """Compute the Software Heritage persistent identifier (SWHID) for the given
+    source code object(s).
+
+    For more details about SWHIDs see:
+
+    https://docs.softwareheritage.org/devel/swh-model/persistent-identifiers.html
+    """
+    # NOTE: the docstring above is the help text click displays for the
+    # command group; developer notes belong in comments like this one.
+
+    # ensure that ctx.obj exists and is a dict (in case `main()` is called
+    # by means other than the `__main__` guard at the bottom of the module)
+    ctx.ensure_object(dict)
+
+    root = Path(directory) if directory else Path()
+    if not root.exists():
+        raise IOError(f"{root!r} does not exist")
+
+    # the sub-commands read the repository root from the shared context
+    ctx.obj["HG_ROOT"] = root
+
+
+def identify_directory(path: Path) -> str:
+    """Return the last colon-separated component of the directory SWHID of `path`.
+
+    The `.hg` directory is excluded from the computation and symlinks are
+    followed.
+    """
+    identified = identify_object(
+        "directory", follow_symlinks=True, exclude_patterns=[".hg"], obj=str(path)
+    )
+    uri = identified[1]
+    return uri.rsplit(":", 1)[-1]
+
+
+class RevisionIdentity(NamedTuple):
+    """Identity of a revision: its SWHID, node id and directory SWHID."""
+
+    swhid: bytes
+    """raw bytes of the revision SWHID"""
+
+    node_id: bytes
+    """hexadecimal node id, as bytes"""
+
+    directory_swhid: bytes
+    """raw bytes of the SWHID of the revision's directory"""
+
+    def dir_uri(self) -> str:
+        """Return the directory SWHID uri, a tab, then the node id."""
+        directory = self.directory_swhid.hex()
+        node = self.node_id.decode()
+        return f"swh:1:dir:{directory}\t{node}"
+
+    def __str__(self) -> str:
+        """Return the revision SWHID uri, a tab, then the node id."""
+        node = self.node_id.decode()
+        return f"{swhid('revision', self.swhid.hex())}\t{node}"
+
+
+def identify_revision(
+    hg: Hg,
+    rev: Optional[bytes] = None,
+    node_id_2_swhid: Optional[Dict[bytes, bytes]] = None,
+) -> Iterator[RevisionIdentity]:
+    """Yield the identity of each selected revision of the repository.
+
+    hg: A `Hg` repository instance
+    rev: An optional revision or Mercurial revsets (See `hg help revsets`)
+        If not provided all the repository revisions will be computed.
+    node_id_2_swhid: An optional cache mapping hg node ids to SWHIDs
+        It will be updated in place with new mappings.
+
+    The working directory is updated to every revision that gets identified.
+    """
+    from swh.model.hashutil import hash_to_bytes
+    from swh.model.model import Revision
+
+    cache = {} if node_id_2_swhid is None else node_id_2_swhid
+
+    for hg_revision in hg.log(rev):
+        data = hg_revision.to_dict()
+
+        # the directory identifier is computed from the files on disk, so the
+        # working directory has to be at this revision first
+        hg.up(hg_revision.node_id)
+        directory_id = hash_to_bytes(identify_directory(hg.root()))
+        data["directory"] = directory_id
+
+        # replace the parents' node ids by their SWHIDs, identifying (and
+        # caching) the parents that have not been seen yet
+        parent_ids = []
+        for parent in data["parents"]:
+            if parent not in cache:
+                cache[parent] = next(identify_revision(hg, parent, cache)).swhid
+            parent_ids.append(cache[parent])
+        data["parents"] = parent_ids
+
+        revision_id = hash_to_bytes(Revision.from_dict(data).id)
+        cache[hg_revision.node_id] = revision_id
+
+        yield RevisionIdentity(
+            swhid=revision_id,
+            node_id=hg_revision.node_id,
+            directory_swhid=directory_id,
+        )
+
+
+class ReleaseIdentity(NamedTuple):
+    """Identity of a release: its SWHID, target node id and name."""
+
+    swhid: str
+    """release SWHID, as a hexadecimal string"""
+
+    node_id: bytes
+    """hexadecimal node id of the tagged revision, as bytes"""
+
+    name: bytes
+    """release name"""
+
+    def __str__(self) -> str:
+        """Return the release SWHID uri, a tab, then the release name."""
+        name = self.name.decode()
+        return f"{swhid('release', self.swhid)}\t{name}"
+
+
+def identify_release(
+    hg: Hg, node_id_2_swhid: Optional[Dict[bytes, bytes]] = None,
+) -> Iterator[ReleaseIdentity]:
+    """Yield the identity of each release (tag) of the repository.
+
+    hg: A `Hg` repository instance
+    node_id_2_swhid: An optional cache mapping hg node ids to SWHIDs
+        If not provided it will be computed using `identify_revision`.
+    """
+    from swh.model.model import ObjectType, Release
+
+    cache = node_id_2_swhid
+    if cache is None:
+        cache = {}
+        for identity in identify_revision(hg):
+            cache[identity.node_id] = identity.swhid
+
+    for tag in hg.tags():
+        # releases built from tags carry no message, author name or date
+        release = Release.from_dict(
+            {
+                "name": tag.name,
+                "target": cache[tag.node_id],
+                "target_type": ObjectType.REVISION.value,
+                "message": None,
+                "metadata": None,
+                "synthetic": False,
+                "author": {"name": None, "email": None, "fullname": b""},
+                "date": None,
+            }
+        )
+
+        yield ReleaseIdentity(swhid=release.id, node_id=tag.node_id, name=tag.name)
+
+
+def identify_snapshot(
+    hg: Hg,
+    node_id_2_swhid: Optional[Dict[bytes, bytes]] = None,
+    releases: Optional[List[ReleaseIdentity]] = None,
+) -> str:
+    """Return the identifier of the repository snapshot.
+
+    hg: A `Hg` repository instance
+    node_id_2_swhid: An optional cache mapping hg node ids to SWHIDs
+        If not provided it will be computed using `identify_revision`.
+    releases: an optional list of `ReleaseIdentity`.
+        If not provided it will be computed using `identify_release`.
+    """
+    from swh.model.model import Snapshot, TargetType
+
+    if node_id_2_swhid is None:
+        node_id_2_swhid = {}
+        for identity in identify_revision(hg):
+            node_id_2_swhid[identity.node_id] = identity.swhid
+
+    if releases is None:
+        releases = list(identify_release(hg, node_id_2_swhid))
+
+    # HEAD is an alias to the branch of the tip revision
+    branches = {
+        b"HEAD": {
+            "target": hg.tip().branch(),
+            "target_type": TargetType.ALIAS.value,
+        }
+    }
+
+    # named branches target revisions
+    for hg_branch in hg.branches():
+        branches[hg_branch.name] = {
+            "target": node_id_2_swhid[hg_branch.node_id],
+            "target_type": TargetType.REVISION.value,
+        }
+
+    # tags target releases
+    for identity in releases:
+        branches[identity.name] = {
+            "target": identity.swhid,
+            "target_type": TargetType.RELEASE.value,
+        }
+
+    snapshot = Snapshot.from_dict({"branches": branches})
+    return snapshot.id
+
+
+@main.command()
+@click.argument("rev", required=False)
+@click.pass_context
+def revision(ctx, rev):
+    """Compute the SWHID of a given revision.
+
+    If specified REV allow to select a single or multiple revisions
+    (using the Mercurial revsets language: `hg help revsets`)
+    """
+    # the docstring above is displayed by click as the command help
+    repository = Hg(ctx.obj["HG_ROOT"])
+
+    # one line per revision, printed as soon as it is computed
+    for revision_identity in identify_revision(repository, rev):
+        click.echo(revision_identity)
+
+
+@main.command()
+@click.pass_context
+def snapshot(ctx):
+    """Compute the SWHID of the snapshot."""
+    # the docstring above is displayed by click as the command help
+    root = ctx.obj["HG_ROOT"]
+
+    snapshot_id = identify_snapshot(Hg(root))
+
+    # output: the snapshot uri, a tab, then the repository root
+    snapshot_uri = swhid("snapshot", snapshot_id)
+    click.echo(f"{snapshot_uri}\t{root}")
+
+
+@main.command()
+@click.pass_context
+def all(ctx):
+    """Compute the SWHID of all the repository objects."""
+    # NOTE: the function name shadows the `all` builtin for the rest of the
+    # module; it is kept because click derives the command name from it.
+    root = ctx.obj["HG_ROOT"]
+    hg = Hg(root)
+
+    # everything is computed before anything is printed
+    revisions = list(identify_revision(hg))
+    node_id_2_swhid = {identity.node_id: identity.swhid for identity in revisions}
+    releases = list(identify_release(hg, node_id_2_swhid))
+    snapshot_swhid = identify_snapshot(hg, node_id_2_swhid, releases)
+
+    # output order: directories, revisions, releases, then the snapshot
+    lines = [identity.dir_uri() for identity in revisions]
+    lines.extend(str(identity) for identity in revisions)
+    lines.extend(str(identity) for identity in releases)
+    for line in lines:
+        click.echo(line)
+
+    uri = swhid("snapshot", snapshot_swhid)
+    click.echo(f"{uri}\t{root}")
+
+
+# Run the click command group when the module is executed as a script.
+if __name__ == "__main__":
+ main()
diff --git a/swh/loader/mercurial/tests/test_identify.py b/swh/loader/mercurial/tests/test_identify.py
new file mode 100644
--- /dev/null
+++ b/swh/loader/mercurial/tests/test_identify.py
@@ -0,0 +1,93 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import os
+from urllib.parse import urlsplit
+
+from click.testing import CliRunner
+
+from swh.loader.mercurial.identify import main
+from swh.loader.tests import prepare_repository_from_archive
+
+
+def test_all_revisions(datadir: str, tmp_path: str):
+    """`revision` without argument prints every revision of the `hello` archive."""
+    name = "hello"
+    repo_url = prepare_repository_from_archive(
+        os.path.join(datadir, f"{name}.tgz"), name, tmp_path
+    )
+    directory = urlsplit(repo_url).path
+
+    result = CliRunner().invoke(main, ["-d", directory, "revision"])
+
+    # one "<revision SWHID>\t<node id>" line per revision
+    expected_lines = [
+        "swh:1:rev:93b48d515580522a05f389bec93227fc8e43d940"
+        "\t0a04b987be5ae354b710cefeba0e2d9de7ad41a9",
+        "swh:1:rev:8dd3db5d5519e4947f035d141581d304565372d2"
+        "\t82e55d328c8ca4ee16520036c0aaace03a5beb65",
+        "swh:1:rev:c3dbe4fbeaaa98dd961834e4007edb3efb0e2a27"
+        "\tb985ae4a07e12ac662f45a171e2d42b13be5b50c",
+    ]
+    assert result.output == "".join(f"{line}\n" for line in expected_lines)
+
+
+def test_single_revision(datadir: str, tmp_path: str):
+    """`revision NODE` prints only the requested revision."""
+    name = "hello"
+    repo_url = prepare_repository_from_archive(
+        os.path.join(datadir, f"{name}.tgz"), name, tmp_path
+    )
+    directory = urlsplit(repo_url).path
+
+    arguments = ["-d", directory, "revision", "0a04b987be5ae354b710cefeba0e2d9de7ad41a9"]
+    result = CliRunner().invoke(main, arguments)
+
+    expected_line = (
+        "swh:1:rev:93b48d515580522a05f389bec93227fc8e43d940"
+        "\t0a04b987be5ae354b710cefeba0e2d9de7ad41a9"
+    )
+    assert result.output == expected_line + "\n"
+
+
+def test_all(datadir: str, tmp_path: str):
+    """`all` prints directories, revisions, releases, then the snapshot."""
+    name = "hello"
+    repo_url = prepare_repository_from_archive(
+        os.path.join(datadir, f"{name}.tgz"), name, tmp_path
+    )
+    directory = urlsplit(repo_url).path
+
+    result = CliRunner().invoke(main, ["-d", directory, "all"])
+
+    expected_lines = [
+        # directories
+        "swh:1:dir:43d727f2f3f2f7cb3b098ddad1d7038464a4cee2"
+        "\t0a04b987be5ae354b710cefeba0e2d9de7ad41a9",
+        "swh:1:dir:b3f85f210ff86d334575f64cb01c5bf49895b63e"
+        "\t82e55d328c8ca4ee16520036c0aaace03a5beb65",
+        "swh:1:dir:8f2be433c945384c85920a8e60f2a68d2c0f20fb"
+        "\tb985ae4a07e12ac662f45a171e2d42b13be5b50c",
+        # revisions
+        "swh:1:rev:93b48d515580522a05f389bec93227fc8e43d940"
+        "\t0a04b987be5ae354b710cefeba0e2d9de7ad41a9",
+        "swh:1:rev:8dd3db5d5519e4947f035d141581d304565372d2"
+        "\t82e55d328c8ca4ee16520036c0aaace03a5beb65",
+        "swh:1:rev:c3dbe4fbeaaa98dd961834e4007edb3efb0e2a27"
+        "\tb985ae4a07e12ac662f45a171e2d42b13be5b50c",
+        # release
+        "swh:1:rel:515c4d72e089404356d0f4b39d60f948b8999140\t0.1",
+        # snapshot
+        f"swh:1:snp:d35668e02e2ba4321dc951cd308cf883786f918a\t{directory}",
+    ]
+    assert result.output == "".join(f"{line}\n" for line in expected_lines)

File Metadata

Mime Type
text/plain
Expires
Nov 5 2024, 3:21 PM (12 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3224895

Event Timeline