Changeset View
Standalone View
swh/loader/mercurial/identify.py
- This file was added.
# Copyright (C) 2018-2020 The Software Heritage developers | |||||
marmoute: This should get a header comment. | |||||
# See the AUTHORS file at the top-level directory of this distribution | |||||
# License: GNU General Public License version 3, or any later version | |||||
# See top-level LICENSE file for more information | |||||
import json | |||||
import re | |||||
import subprocess | |||||
from codecs import escape_decode # type: ignore | |||||
from pathlib import Path | |||||
from typing import Any, Dict, Iterator, List, NamedTuple, Optional, Union | |||||
# WARNING: do not import unnecessary things here to keep cli startup time under | |||||
Done Inline ActionsCan you add a docstring to this class to highlight its roll and the input/output format ? marmoute: Can you add a docstring to this class to highlight its roll and the input/output format ? | |||||
# control | |||||
import click | |||||
from swh.model.hashutil import hash_to_bytehex | |||||
from swh.model.identifiers import normalize_timestamp, swhid | |||||
from swh.model.model import RevisionType | |||||
TAG_PATTERN = re.compile(b"([0-9A-Fa-f]{40}) +(.+)") | |||||
class HgAuthor(NamedTuple): | |||||
"""Represent a Mercurial revision author.""" | |||||
fullname: bytes | |||||
"""full name of the author""" | |||||
name: Optional[bytes] | |||||
"""name of the author""" | |||||
email: Optional[bytes] | |||||
"""email of the author""" | |||||
@staticmethod | |||||
def from_bytes(data: bytes) -> "HgAuthor": | |||||
"""Convert bytes to an HgAuthor named tuple. | |||||
Expected format: "name <email>" | |||||
""" | |||||
Done Inline ActionsIs this the byte format that SWH is expecting to generate a revision ID ? If so, please mention it in a comment. (and if now, what is this ?) marmoute: Is this the byte format that SWH is expecting to generate a revision ID ? If so, please mention… | |||||
from swh.loader.mercurial.converters import parse_author | |||||
result = parse_author(data) | |||||
return HgAuthor( | |||||
fullname=result["fullname"], name=result["name"], email=result["email"] | |||||
) | |||||
def to_dict(self) -> Dict[str, Optional[bytes]]: | |||||
return {"fullname": self.fullname, "name": self.name, "email": self.email} | |||||
Done Inline ActionsWhat about b"0" * 40 ? I would also name this NULL_NODE_HEX this is not a binary entry. marmoute: What about b"0" * 40 ? I would also name this NULL_NODE_HEX this is not a binary entry. | |||||
HG_REVISION_TEMPLATE = "\n".join( | |||||
[ | |||||
"node_id:{node}", | |||||
Done Inline ActionsSame feedback about docstring. marmoute: Same feedback about docstring. | |||||
"author:{author}", | |||||
"timestamp_offset:{date|json}", | |||||
"p1:{p1.node}", | |||||
"p2:{p2.node}", | |||||
"extras:{join(extras, '\nextras:')}", | |||||
] | |||||
) # Log template for HgRevision.from_bytes | |||||
Done Inline Actionsyou can use docstring """raw bytes...""" just below the instance variable. ardumont: you can use docstring """raw bytes...""" just below the instance variable. | |||||
NULL_NODE_ID = b"0" * 40 # Value used when no parent | |||||
Done Inline ActionsWe probably wants a docstring about the expected format here. marmoute: We probably wants a docstring about the expected format here. | |||||
class HgRevision(NamedTuple): | |||||
"""Represent a Mercurial revision.""" | |||||
node_id: bytes | |||||
"""raw bytes of the revision hash""" | |||||
Done Inline Actionsextra space. ardumont: extra space. | |||||
author: HgAuthor | |||||
"""author of the revision""" | |||||
timestamp: bytes | |||||
"""timestamp of the revision""" | |||||
offset: bytes | |||||
"""offset of the revision""" | |||||
parents: List[bytes] | |||||
"""hex bytes of the revision's parents""" | |||||
extras: Dict[bytes, bytes] | |||||
"""metadata of the revision""" | |||||
description: bytes | |||||
"""description of the revision""" | |||||
@staticmethod | |||||
def from_bytes(data: bytes, description: bytes) -> "HgRevision": | |||||
"""Convert bytes to an HgRevision named tuple. | |||||
Expected data format: | |||||
''' | |||||
node_id:{node} | |||||
author:{author} | |||||
timestamp_offset:[{timestamp}, {offset}] | |||||
p1:{p1} | |||||
p2:{p2} | |||||
extras:{key1}={value1} | |||||
... | |||||
extras:{keyn}={value} | |||||
''' | |||||
""" | |||||
lines = data.split(b"\n") | |||||
tuples = [line.split(b":", 1) for line in lines] | |||||
fields: Dict[str, Any] = { | |||||
"parents": [], | |||||
"extras": {}, | |||||
Done Inline Actionsfields["description"] = description return HgRevision(**fields) ? ardumont: ```
fields["description"] = description
return HgRevision(**fields)
```
? | |||||
"description": description, | |||||
} | |||||
for key, value in tuples: | |||||
if key == b"timestamp_offset": | |||||
timestamp, offset = json.loads(value) | |||||
fields["timestamp"] = timestamp | |||||
fields["offset"] = offset | |||||
elif key in (b"p1", b"p2"): | |||||
Done Inline Actionssame feedback about docstring. marmoute: same feedback about docstring. | |||||
if value != NULL_NODE_ID: | |||||
fields["parents"].append(value) | |||||
elif key == b"extras": | |||||
extra_key, extra_value = value.split(b"=", 1) | |||||
fields["extras"][extra_key] = extra_value | |||||
elif key == b"author": | |||||
fields["author"] = HgAuthor.from_bytes(value) | |||||
else: | |||||
fields[key.decode()] = value | |||||
return HgRevision(**fields) | |||||
def branch(self) -> bytes: | |||||
return self.extras.get(b"branch", b"default") | |||||
def to_dict(self) -> Dict: | |||||
"""Convert a HgRevision to a dict for SWHID computation""" | |||||
date = normalize_timestamp(int(self.timestamp)) | |||||
extra_headers = [ | |||||
(b"time_offset_seconds", str(self.offset).encode("utf-8")), | |||||
] | |||||
for key, value in self.extras.items(): | |||||
if key == b"branch" and value == b"default": | |||||
# branch default is skipped to match actual implementation | |||||
continue | |||||
if key == b"transplant_source": | |||||
# transplant_source is converted to hex to match actual implementation | |||||
value = hash_to_bytehex(escape_decode(value)[0]) | |||||
Not Done Inline Actions"Actual" feel weird here. Maybe "to match historical implementation" marmoute: "Actual" feel weird here. Maybe "to match historical implementation" | |||||
extra_headers.append((key, value)) | |||||
author = self.author.to_dict() | |||||
return { | |||||
"author": author, | |||||
"date": date, | |||||
"committer": author, | |||||
"committer_date": date, | |||||
Done Inline ActionsWhy "cloned" ? marmoute: Why "cloned" ? | |||||
"type": RevisionType.MERCURIAL.value, | |||||
"message": self.description, | |||||
"metadata": {"node": self.node_id}, | |||||
"extra_headers": tuple(extra_headers), | |||||
"synthetic": False, | |||||
"parents": self.parents, | |||||
} | |||||
class HgBranch(NamedTuple): | |||||
"""Represent a Mercurial branch.""" | |||||
name: bytes | |||||
"""name of the branch""" | |||||
node_id: bytes | |||||
"""row bytes of the target revision hash""" | |||||
class HgTag(NamedTuple): | |||||
"""Represent a Mercurial tag.""" | |||||
name: bytes | |||||
"""name of the tag""" | |||||
Done Inline Actionssame feedback about docstring. marmoute: same feedback about docstring. | |||||
node_id: bytes | |||||
"""hex bytes of the target revision""" | |||||
class Hg: | |||||
"""Provide methods to extract data from a Mercurial repository.""" | |||||
Done Inline Actions(docstring) marmoute: (docstring) | |||||
def __init__(self, repository_root: Path) -> None: | |||||
self._root = repository_root | |||||
def _output(self, *args) -> bytes: | |||||
"""Return the outpout of a `hg` call.""" | |||||
return subprocess.check_output(["hg", *args], cwd=self._root) | |||||
ardumontUnsubmitted Done Inline ActionsNote to self, add the hg runtime dependency in the debian branch (if not there already, don't remember). ardumont: Note to self, add the hg runtime dependency in the debian branch (if not there already, don't… | |||||
def _call(self, *args) -> None: | |||||
"""Perform a `hg` call.""" | |||||
subprocess.check_call( | |||||
["hg", *args], | |||||
cwd=self._root, | |||||
stderr=subprocess.PIPE, | |||||
stdout=subprocess.PIPE, | |||||
) | |||||
def root(self) -> Path: | |||||
"""Return the root of the Mercurial repository.""" | |||||
return self._root | |||||
def log(self, rev: Optional[Union[bytes, str]] = None) -> List[HgRevision]: | |||||
"""Return the specified revisions of the Mercurial repository. | |||||
Mercurial revsets are supported. (See `hg help revsets`) | |||||
If no revision range is specified, return all revisions". | |||||
""" | |||||
if rev: | |||||
node_ids = self._output("log", "-r", rev, "-T", "{node}\n").splitlines() | |||||
else: | |||||
node_ids = self._output("log", "-T", "{node}\n").splitlines() | |||||
Done Inline ActionsJust spotted a small typo: s/reveset/revset marmoute: Just spotted a small typo: `s/reveset/revset` | |||||
revisions = [self._revision(node_id) for node_id in reversed(node_ids)] | |||||
return revisions | |||||
def _revision(self, revision: bytes) -> HgRevision: | |||||
data = self._output("log", "-r", revision, "-T", HG_REVISION_TEMPLATE) | |||||
# hg log strips the description so the raw description has to be taken | |||||
# from debugdata | |||||
_, desc = self._output("debugdata", "-c", revision).split(b"\n\n", 1) | |||||
Not Done Inline ActionsCan you explain the "\n\n" part in a small comment ? marmoute: Can you explain the "\n\n" part in a small comment ? | |||||
return HgRevision.from_bytes(data, desc) | |||||
def up(self, rev: bytes) -> None: | |||||
"""Update the repository working directory to the specified revision.""" | |||||
self._call("up", rev) | |||||
def branches(self) -> List[HgBranch]: | |||||
"""List the repository named branches.""" | |||||
Done Inline ActionsI recall the hgtags can be noisy in its content (possibly due to merge badly dealt with by users). Related to T970 rDLDHGe0b48c6c6e9aa3329c9058e1a39da38ff801714a ardumont: I recall the hgtags can be noisy in its content (possibly due to merge badly dealt with by… | |||||
Done Inline Actionsthat same tag name can appears multiple time in the tag file. In this case, the latest value should be used. The previous entries are here to indicate which older value have been overwritten. (We should add a case of this in the new tests (when it gets to the new tests)) marmoute: that same tag name can appears multiple time in the tag file. In this case, the latest value… | |||||
output = self._output("branches", "-T", "{branch}\n{node}\n\n").strip() | |||||
branches = [] | |||||
for block in output.split(b"\n\n"): | |||||
name, node_id = block.splitlines() | |||||
branches.append(HgBranch(name=name, node_id=node_id)) | |||||
return branches | |||||
Done Inline Actionsextra space ardumont: extra space | |||||
def tip(self) -> HgRevision: | |||||
"""Return the `tip` revision.""" | |||||
return self.log("tip")[0] | |||||
Not Done Inline ActionsUse "node-id" instead of revision here to make it clearer that we are talking about binary node ID marmoute: Use "node-id" instead of revision here to make it clearer that we are talking about binary node… | |||||
def tags(self) -> List[HgTag]: | |||||
"""Return the repository tags at the current revision.""" | |||||
hgtags = self._root / ".hgtags" | |||||
Not Done Inline ActionsWhat does "at the current revision" means in this context ? marmoute: What does "at the current revision" means in this context ? | |||||
tags = {} | |||||
if hgtags.is_file(): | |||||
for line in hgtags.read_bytes().splitlines(): | |||||
match = TAG_PATTERN.match(line) | |||||
if match is None: | |||||
continue | |||||
node_id, name = match.groups() | |||||
tags[node_id] = name | |||||
return [HgTag(name=name, node_id=node_id) for node_id, name in tags.items()] | |||||
@click.group() | |||||
@click.option( | |||||
"--directory", | |||||
"-d", | |||||
help=("Path to the Mercurial repository. If unset, the current directory is used"), | |||||
) | |||||
@click.pass_context | |||||
def main(ctx, directory=None): | |||||
"""Compute the Software Heritage persistent identifier (SWHID) for the given | |||||
source code object(s). | |||||
For more details about SWHIDs see: | |||||
https://docs.softwareheritage.org/devel/swh-model/persistent-identifiers.html | |||||
""" | |||||
# ensure that ctx.obj exists and is a dict (in case `cli()` is called | |||||
# by means other than the `if` block below) | |||||
Done Inline Actionsmight as well use swh.model.identifiers.swhid function [1] instead of hard-coding values: even if that's giving a mouthful from swh.model.identifiers import swhid return f"{swhid("directory", self.directory_swhid.hex())}\t{self.node_id.decode()}" (or use an intermediary variable, as you wish...) [1] https://forge.softwareheritage.org/source/swh-model/browse/master/swh/model/identifiers.py$767-772 ardumont: might as well use swh.model.identifiers.swhid function [1] instead of hard-coding values:
even… | |||||
ctx.ensure_object(dict) | |||||
root = Path(directory) if directory else Path() | |||||
if not root.exists(): | |||||
Done Inline Actionssame ardumont: same | |||||
raise IOError(f"{root!r} does not exists") | |||||
ctx.obj["HG_ROOT"] = root | |||||
def identify_directory(path: Path) -> str: | |||||
"""Return the SWHID of the given path.""" | |||||
output = subprocess.check_output( | |||||
["swh-identify", "--exclude", ".hg", "."], cwd=path | |||||
).decode() | |||||
return output.split()[0].split(":")[-1] | |||||
Done Inline ActionsWhy do you use the swh-identify command in a subprocess rather than just using the identify_object function from swh.model.cli? douardda: Why do you use the `swh-identify` command in a subprocess rather than just using the… | |||||
class RevisionIdentity(NamedTuple): | |||||
"""Represent a swh revision identity.""" | |||||
swhid: bytes | |||||
"""SWHID raw bytes""" | |||||
node_id: bytes | |||||
"""node_id hex bytes""" | |||||
directory_swhid: bytes | |||||
def dir_uri(self) -> str: | |||||
"""Return the SWHID uri of the revision's directory.""" | |||||
return f"swh:1:dir:{self.directory_swhid.hex()}\t{self.node_id.decode()}" | |||||
def __str__(self) -> str: | |||||
"""Return the string representation of a RevisionIdentity.""" | |||||
uri = swhid("revision", self.swhid.hex()) | |||||
return f"{uri}\t{self.node_id.decode()}" | |||||
Done Inline Actionscurious me, is that a form of destructuring to say, give me the first element from the result, and discard the rest? it looks like it, TIL ;) (oh and that works because we are dealing with NamedTuple, neat) ardumont: curious me, is that a form of destructuring to say, give me the first element from the result… | |||||
Done Inline ActionsMade this cleared and not relying on the field declaration order. acezar: Made this cleared and not relying on the field declaration order. | |||||
def identify_revision( | |||||
hg: Hg, | |||||
rev: Optional[bytes] = None, | |||||
node_id_2_swhid: Optional[Dict[bytes, bytes]] = None, | |||||
) -> Iterator[RevisionIdentity]: | |||||
"""Return the repository revision identities. | |||||
hg: A `Hg` repository instance | |||||
rev: An optional revision or Mercurial revsets (See `hg help revsets`) | |||||
If not provided all the repository revisions will be computed. | |||||
node_id_2_swhid: An optional cache mapping hg node ids to SWHIDs | |||||
It will be updated in place with new mappings. | |||||
""" | |||||
from swh.model.hashutil import hash_to_bytes | |||||
from swh.model.model import Revision | |||||
if node_id_2_swhid is None: | |||||
node_id_2_swhid = {} | |||||
for revision in hg.log(rev): | |||||
data = revision.to_dict() | |||||
Done Inline Actionssame about the model swhid function use. ardumont: same about the model swhid function use. | |||||
hg.up(revision.node_id) | |||||
directory_swhid = hash_to_bytes(identify_directory(hg.root())) | |||||
data["directory"] = directory_swhid | |||||
parents = [] | |||||
for parent in data["parents"]: | |||||
if parent not in node_id_2_swhid: | |||||
parent_revision = next(identify_revision(hg, parent, node_id_2_swhid)) | |||||
node_id_2_swhid[parent] = parent_revision.swhid | |||||
parents.append(node_id_2_swhid[parent]) | |||||
data["parents"] = parents | |||||
revision_swhid = hash_to_bytes(Revision.from_dict(data).id) | |||||
node_id_2_swhid[revision.node_id] = revision_swhid | |||||
yield RevisionIdentity( | |||||
swhid=revision_swhid, | |||||
node_id=revision.node_id, | |||||
directory_swhid=directory_swhid, | |||||
) | |||||
class ReleaseIdentity(NamedTuple): | |||||
"""Represent a swh release identity.""" | |||||
swhid: str | |||||
"""SWHID hex string""" | |||||
node_id: bytes | |||||
"""node_id hex bytes""" | |||||
name: bytes | |||||
"""name of the release""" | |||||
def __str__(self) -> str: | |||||
"""Return the string representation of a ReleaseIdentity.""" | |||||
uri = swhid("release", self.swhid) | |||||
return f"{uri}\t{self.name.decode()}" | |||||
def identify_release( | |||||
hg: Hg, node_id_2_swhid: Optional[Dict[bytes, bytes]] = None, | |||||
) -> Iterator[ReleaseIdentity]: | |||||
"""Return the repository's release identities. | |||||
Done Inline Actionssnapshot identifiers. you might want to align to use identities instead of identifiers as you use elsewhere or use identifiers everywhere... ;) ardumont: snapshot identifiers.
you might want to align to use `identities` instead of `identifiers` as… | |||||
hg: A `Hg` repository instance | |||||
node_id_2_swhid: An optional cache mapping hg node ids to SWHIDs | |||||
If not provided it will be computed using `identify_revision`. | |||||
""" | |||||
from swh.model.model import ObjectType, Release | |||||
if node_id_2_swhid is None: | |||||
node_id_2_swhid = { | |||||
revision.node_id: revision.swhid for revision in identify_revision(hg) | |||||
} | |||||
for tag in hg.tags(): | |||||
data = { | |||||
"name": tag.name, | |||||
"target": node_id_2_swhid[tag.node_id], | |||||
"target_type": ObjectType.REVISION.value, | |||||
"message": None, | |||||
"metadata": None, | |||||
"synthetic": False, | |||||
"author": {"name": None, "email": None, "fullname": b""}, | |||||
"date": None, | |||||
} | |||||
release_swhid = Release.from_dict(data).id | |||||
yield ReleaseIdentity( | |||||
swhid=release_swhid, node_id=tag.node_id, name=tag.name, | |||||
) | |||||
def identify_snapshot( | |||||
hg: Hg, | |||||
node_id_2_swhid: Optional[Dict[bytes, bytes]] = None, | |||||
releases: Optional[List[ReleaseIdentity]] = None, | |||||
) -> str: | |||||
"""Return the repository snapshot identity. | |||||
hg: A `Hg` repository instance | |||||
Done Inline Actionsas a heads up, I recall some modifications is currently happening on the module swh.model.identifiers. [1] To avoid some breakage on this in a near future, prefer using the swh data model object instead. from swh.model.model import Snapshot, SnapshotBranch branches[b"HEAD"] = SnapshotBranch(target=tip.branch(), target_type=TargetType.ALIAS) ... # adapt the other 2 loops with this ^ ... return Snapshot(branches=branches).id That remark is also valid for revision_identifier and release_identifier (earlier), it only hit me now ¯\_(ツ)_/¯ ardumont: as a heads up, I recall some modifications is currently happening on the module swh.model. | |||||
node_id_2_swhid: An optional cache mapping hg node ids to SWHIDs | |||||
If not provided it will be computed using `identify_revision`. | |||||
release: an optional list of `ReleaseIdentity`. | |||||
If not provided it will be computed using `identify_release`. | |||||
""" | |||||
from swh.model.model import Snapshot, TargetType | |||||
if node_id_2_swhid is None: | |||||
node_id_2_swhid = { | |||||
revision.node_id: revision.swhid for revision in identify_revision(hg) | |||||
} | |||||
if releases is None: | |||||
releases = [release for release in identify_release(hg, node_id_2_swhid)] | |||||
branches = {} | |||||
tip = hg.tip() | |||||
branches[b"HEAD"] = { | |||||
"target": tip.branch(), | |||||
"target_type": TargetType.ALIAS.value, | |||||
} | |||||
for branch in hg.branches(): | |||||
branches[branch.name] = { | |||||
"target": node_id_2_swhid[branch.node_id], | |||||
"target_type": TargetType.REVISION.value, | |||||
Done Inline Actionssame use swhid function. ardumont: same use swhid function. | |||||
} | |||||
for release in releases: | |||||
branches[release.name] = { | |||||
"target": release.swhid, | |||||
"target_type": TargetType.RELEASE.value, | |||||
} | |||||
return Snapshot.from_dict({"branches": branches}).id | |||||
@main.command() | |||||
@click.argument("rev", required=False) | |||||
@click.pass_context | |||||
def revision(ctx, rev): | |||||
"""Compute the SWHID of a given revision. | |||||
If specified REV allow to select a single or multiple revisions | |||||
(using the Mercurial revsets language: `hg help revsets`) | |||||
""" | |||||
hg = Hg(ctx.obj["HG_ROOT"]) | |||||
for identity in identify_revision(hg, rev): | |||||
click.echo(identity) | |||||
@main.command() | |||||
@click.pass_context | |||||
def snapshot(ctx): | |||||
"""Compute the SWHID of the snapshot.""" | |||||
root = ctx.obj["HG_ROOT"] | |||||
hg = Hg(root) | |||||
snapshot_swhid = identify_snapshot(hg) | |||||
uri = swhid("snapshot", snapshot_swhid) | |||||
click.echo(f"{uri}\t{root}") | |||||
@main.command() | |||||
@click.pass_context | |||||
def all(ctx): | |||||
"""Compute the SWHID of all the repository objects.""" | |||||
root = ctx.obj["HG_ROOT"] | |||||
hg = Hg(root) | |||||
dir_uris = [] | |||||
rev_uris = [] | |||||
rel_uris = [] | |||||
node_id_2_swhid = {} | |||||
for revision in identify_revision(hg): | |||||
dir_uris.append(revision.dir_uri()) | |||||
rev_uris.append(str(revision)) | |||||
node_id_2_swhid[revision.node_id] = revision.swhid | |||||
releases = [] | |||||
for release in identify_release(hg, node_id_2_swhid): | |||||
rel_uris.append(str(release)) | |||||
releases.append(release) | |||||
snapshot_swhid = identify_snapshot(hg, node_id_2_swhid, releases) | |||||
for uri in dir_uris + rev_uris + rel_uris: | |||||
click.echo(uri) | |||||
uri = swhid("snapshot", snapshot_swhid) | |||||
click.echo(f"{uri}\t{root}") | |||||
if __name__ == "__main__": | |||||
main() |
This should get a header comment.