Changeset View
Standalone View
swh/loader/mercurial/identify.py
- This file was added.
# Copyright (C) 2018-2020 The Software Heritage developers | |||||
marmoute: This should get a header comment. | |||||
# See the AUTHORS file at the top-level directory of this distribution | |||||
# License: GNU General Public License version 3, or any later version | |||||
# See top-level LICENSE file for more information | |||||
import json | |||||
import subprocess | |||||
from codecs import escape_decode # type: ignore | |||||
from pathlib import Path | |||||
from typing import Any, Dict, Iterator, List, NamedTuple, Optional, Union | |||||
# WARNING: do not import unnecessary things here to keep cli startup time under | |||||
# control | |||||
Done Inline ActionsCan you add a docstring to this class to highlight its role and the input/output format? marmoute: Can you add a docstring to this class to highlight its role and the input/output format? | |||||
import click | |||||
from swh.model.hashutil import hash_to_bytehex | |||||
from swh.model.identifiers import normalize_timestamp, swhid | |||||
from swh.model.model import RevisionType | |||||
class HgAuthor(NamedTuple):
    """Author of a Mercurial revision, split into its components."""

    fullname: bytes
    """full name of the author"""

    name: Optional[bytes]
    """name of the author, when one is available"""

    email: Optional[bytes]
    """email of the author, when one is available"""

    @staticmethod
    def from_bytes(data: bytes) -> "HgAuthor":
        """Build an HgAuthor from a raw author field.

        The expected input looks like ``name <email>``. The splitting itself is
        delegated to ``swh.loader.mercurial.converters.parse_author``.
        """
        # Imported here rather than at module level to keep the cli startup
        # time under control.
        from swh.loader.mercurial.converters import parse_author

        parsed = parse_author(data)
        return HgAuthor(**{field: parsed[field] for field in HgAuthor._fields})

    def to_dict(self) -> Dict[str, Optional[bytes]]:
        """Return the author as a ``fullname`` / ``name`` / ``email`` dict."""
        return {field: getattr(self, field) for field in self._fields}
# Template handed to `hg log -T`; HgRevision.from_bytes parses what it renders.
# Each field is rendered as one "key:value" line and every extra gets its own
# "extras:" line.
HG_REVISION_TEMPLATE = (
    "node_id:{node}\n"
    "author:{author}\n"
    "timestamp_offset:{date|json}\n"
    "p1:{p1.node}\n"
    "p2:{p2.node}\n"
    "extras:{join(extras, '\nextras:')}"
)

# Hex node id (forty ASCII zeros) used when a revision has no parent.
NULL_NODE_ID = b"0" * 40
Done Inline Actionsyou can use docstring """raw bytes...""" just below the instance variable. ardumont: you can use docstring """raw bytes...""" just below the instance variable. | |||||
class HgRevision(NamedTuple):
    """Represent a Mercurial revision, as rendered by ``HG_REVISION_TEMPLATE``."""

    node_id: bytes
    """hex bytes of the revision hash (the ``{node}`` template keyword)"""

    author: "HgAuthor"
    """author of the revision"""

    timestamp: float
    """timestamp of the revision: first number of the ``{date|json}`` pair"""

    offset: int
    """time offset of the revision: second number of the ``{date|json}`` pair"""

    parents: List[bytes]
    """hex bytes of the revision's parents (null parents are left out)"""

    extras: Dict[bytes, bytes]
    """metadata of the revision"""

    description: bytes
    """description of the revision"""

    @staticmethod
    def from_bytes(data: bytes, description: bytes) -> "HgRevision":
        """Convert bytes to an HgRevision named tuple.

        Expected data format (one ``key:value`` pair per line):

            node_id:{node}
            author:{author}
            timestamp_offset:[{timestamp}, {offset}]
            p1:{p1}
            p2:{p2}
            extras:{key1}={value1}
            ...
            extras:{keyn}={valuen}

        Args:
            data: output of `hg log` rendered with ``HG_REVISION_TEMPLATE``.
            description: raw description of the revision, stored as is.

        Raises:
            ValueError: if a non-blank line has no ``:`` separator, or an
                ``extras`` line has no ``=`` separator.
            TypeError: if a mandatory field is missing or an unknown key is met.
        """
        fields: Dict[str, Any] = {
            "parents": [],
            "extras": {},
            "description": description,
        }

        for line in data.split(b"\n"):
            if not line:
                # A blank line (typically a trailing newline) carries no field;
                # skip it instead of failing to unpack it below.
                continue

            key, value = line.split(b":", 1)
            if key == b"timestamp_offset":
                # `{date|json}` renders a two-item JSON array.
                fields["timestamp"], fields["offset"] = json.loads(value)
            elif key in (b"p1", b"p2"):
                if value != NULL_NODE_ID:
                    fields["parents"].append(value)
            elif key == b"extras":
                extra_key, extra_value = value.split(b"=", 1)
                fields["extras"][extra_key] = extra_value
            elif key == b"author":
                fields["author"] = HgAuthor.from_bytes(value)
            else:
                fields[key.decode()] = value

        return HgRevision(**fields)

    def branch(self) -> bytes:
        """Return the branch recorded in the extras, ``b"default"`` if none."""
        return self.extras.get(b"branch", b"default")

    def to_dict(self) -> Dict:
        """Convert a HgRevision to a dict for SWHID computation"""
        date = normalize_timestamp(int(self.timestamp))

        extra_headers = [
            (b"time_offset_seconds", str(self.offset).encode("utf-8")),
        ]
        for key, value in self.extras.items():
            if key == b"branch" and value == b"default":
                # branch default is skipped to match historical implementation
                continue
            if key == b"transplant_source":
                # transplant_source is converted to hex to match historical
                # implementation
                value = hash_to_bytehex(escape_decode(value)[0])
            extra_headers.append((key, value))

        # The same author and date are used for both author and committer.
        author = self.author.to_dict()

        return {
            "author": author,
            "date": date,
            "committer": author,
            "committer_date": date,
            "type": RevisionType.MERCURIAL.value,
            "message": self.description,
            "metadata": {"node": self.node_id},
            "extra_headers": tuple(extra_headers),
            "synthetic": False,
            "parents": self.parents,
        }
class HgBranch(NamedTuple):
    """A Mercurial named branch together with the revision it targets."""

    name: bytes
    """name of the branch"""

    node_id: bytes
    """bytes of the target revision hash"""
class HgTag(NamedTuple):
    """A Mercurial tag together with the revision it targets."""

    name: bytes
    """name of the tag"""

    node_id: bytes
    """hex bytes of the target revision"""
Done Inline Actionssame feedback about docstring. marmoute: same feedback about docstring. | |||||
class Hg:
    """Provide methods to extract data from a Mercurial repository.

    Queries run the ``hg`` command line client with the repository root as
    working directory; tags are read from the ``.hgtags`` file directly.
    """

    # Node id written to `.hgtags` when a tag is removed.
    _NULL_NODE_HEX = b"0" * 40

    def __init__(self, repository_root: Path) -> None:
        self._root = repository_root

    def _output(self, *args) -> bytes:
        """Return the output of a `hg` call.

        Raises:
            subprocess.CalledProcessError: if `hg` exits with a non-zero status.
        """
        return subprocess.check_output(["hg", *args], cwd=self._root)

    def _call(self, *args) -> None:
        """Perform a `hg` call, discarding whatever it prints.

        Raises:
            subprocess.CalledProcessError: if `hg` exits with a non-zero status.
        """
        # subprocess.run() drains both pipes through communicate().
        # check_call() never reads them, so a `hg` command printing more than
        # the OS pipe buffer can hold would block forever.
        subprocess.run(
            ["hg", *args],
            cwd=self._root,
            stderr=subprocess.PIPE,
            stdout=subprocess.PIPE,
            check=True,
        )

    def root(self) -> Path:
        """Return the root of the Mercurial repository."""
        return self._root

    def log(self, rev: Optional[Union[bytes, str]] = None) -> "List[HgRevision]":
        """Return the specified revisions of the Mercurial repository.

        Mercurial revsets are supported. (See `hg help revsets`)

        If no revision range is specified, return all revisions.
        The result is in the reverse of the order `hg log` printed them.
        """
        args: List[Union[bytes, str]] = ["log"]
        if rev:
            args += ["-r", rev]
        args += ["-T", "{node}\n"]

        node_ids = self._output(*args).splitlines()
        return [self._revision(node_id) for node_id in reversed(node_ids)]

    def _revision(self, revision: bytes) -> "HgRevision":
        """Return the HgRevision of a single node id."""
        data = self._output("log", "-r", revision, "-T", HG_REVISION_TEMPLATE)

        # hg log strips the description so the raw description has to be taken
        # from debugdata. The raw changelog entry is a header followed by a
        # blank line and then the description, hence the split on the first
        # "\n\n".
        _, desc = self._output("debugdata", "-c", revision).split(b"\n\n", 1)

        return HgRevision.from_bytes(data, desc)

    def up(self, rev: bytes) -> None:
        """Update the repository working directory to the specified revision."""
        self._call("up", rev)

    def branches(self) -> "List[HgBranch]":
        """List the repository named branches.

        Returns an empty list when `hg branches` prints nothing.
        """
        output = self._output("branches", "-T", "{branch}\n{node}\n\n").strip()
        if not output:
            # b"".split(b"\n\n") yields one empty block that cannot be unpacked.
            return []

        branches = []
        for block in output.split(b"\n\n"):
            name, node_id = block.splitlines()
            branches.append(HgBranch(name=name, node_id=node_id))
        return branches

    def tip(self) -> "HgRevision":
        """Return the `tip` revision."""
        return self.log("tip")[0]

    def tags(self) -> "List[HgTag]":
        """Return the tags recorded in the working directory's `.hgtags` file.

        The file is read as it currently is on disk, so the result reflects
        whichever revision the working directory was last updated to.

        A tag name may appear several times in `.hgtags`: the last entry is
        the effective one, earlier ones only record overwritten values. A tag
        whose effective node id is the null id has been removed and is left
        out. Blank lines and lines without a separator are ignored.
        """
        hgtags = self._root / ".hgtags"
        if not hgtags.is_file():
            return []

        effective: Dict[bytes, bytes] = {}
        for line in hgtags.read_bytes().splitlines():
            node_id, separator, name = line.partition(b" ")
            if not separator:
                continue
            effective[name] = node_id

        return [
            HgTag(name=name, node_id=node_id)
            for name, node_id in effective.items()
            if node_id != self._NULL_NODE_HEX
        ]
@click.group()
@click.option(
    "--directory",
    "-d",
    help=("Path to the Mercurial repository. If unset, the current directory is used"),
)
@click.pass_context
def main(ctx, directory=None):
    """Compute the Software Heritage persistent identifier (SWHID) for the given
    source code object(s).

    For more details about SWHIDs see:

    https://docs.softwareheritage.org/devel/swh-model/persistent-identifiers.html
    """
    # The sub-commands read the repository root from ctx.obj, so make sure it
    # is a dict whichever way this group gets invoked.
    ctx.ensure_object(dict)

    repository_root = Path() if not directory else Path(directory)
    if repository_root.exists():
        ctx.obj["HG_ROOT"] = repository_root
        return

    raise IOError(f"{repository_root!r} does not exists")
def identify_directory(path: Path) -> str:
    """Return the hash part of the SWHID that `swh-identify` prints for `path`.

    `swh-identify` is run inside `path` with the `.hg` directory excluded. The
    first whitespace-separated token of its output is taken, and only what
    follows its last ":" is returned.
    """
    command = ["swh-identify", "--exclude", ".hg", "."]
    printed = subprocess.check_output(command, cwd=path).decode()
    first_token = printed.split()[0]
    return first_token.rsplit(":", 1)[-1]
class RevisionIdentity(NamedTuple):
    """Represent a swh revision identity."""

    swhid: bytes
    """SWHID raw bytes"""

    node_id: bytes
    """node_id hex bytes"""

    directory_swhid: bytes
    """raw bytes of the SWHID of the revision's directory"""

    def dir_uri(self) -> str:
        """Return the SWHID uri of the revision's directory.

        The uri is followed by a tab and the node id, like `__str__` does for
        the revision itself.
        """
        # Built with the same helper as the revision uri rather than a
        # hard-coded "swh:1:dir:" prefix.
        uri = swhid("directory", self.directory_swhid.hex())
        return f"{uri}\t{self.node_id.decode()}"

    def __str__(self) -> str:
        """Return the string representation of a RevisionIdentity."""
        uri = swhid("revision", self.swhid.hex())
        return f"{uri}\t{self.node_id.decode()}"
def identify_revision(
    hg: Hg,
    rev: Optional[Union[bytes, str]] = None,
    node_id_2_swhid: Optional[Dict[bytes, bytes]] = None,
) -> Iterator[RevisionIdentity]:
    """Yield the identities of the repository revisions.

    Args:
        hg: A `Hg` repository instance
        rev: An optional revision or Mercurial revsets (See `hg help revsets`)
            If not provided all the repository revisions will be computed.
        node_id_2_swhid: An optional cache mapping hg node ids to SWHIDs
            It will be updated in place with new mappings.

    Yields:
        One `RevisionIdentity` per revision returned by `hg.log(rev)`, in that
        order. Ancestors that had to be identified along the way only end up
        in the cache.

    Note:
        The working directory is updated to every revision being identified.
    """
    from swh.model.hashutil import hash_to_bytes
    from swh.model.model import Revision

    if node_id_2_swhid is None:
        node_id_2_swhid = {}
    cache = node_id_2_swhid

    def identify_one(revision: HgRevision) -> RevisionIdentity:
        """Identify a revision whose parents are all in the cache already."""
        data = revision.to_dict()

        hg.up(revision.node_id)
        directory_swhid = hash_to_bytes(identify_directory(hg.root()))
        data["directory"] = directory_swhid
        data["parents"] = [cache[parent] for parent in revision.parents]

        revision_swhid = hash_to_bytes(Revision.from_dict(data).id)
        cache[revision.node_id] = revision_swhid

        return RevisionIdentity(
            swhid=revision_swhid,
            node_id=revision.node_id,
            directory_swhid=directory_swhid,
        )

    for revision in hg.log(rev):
        # Unknown ancestors are resolved with an explicit stack instead of
        # recursion, so that a long chain of uncached parents cannot exceed
        # the interpreter recursion limit.
        pending = [revision]
        while True:
            current = pending[-1]
            missing = [parent for parent in current.parents if parent not in cache]
            if missing:
                for parent in missing:
                    pending.extend(hg.log(parent))
                continue

            pending.pop()
            if not pending:
                # Only the requested revision is left: it is handled below.
                break
            if current.node_id not in cache:
                # The same ancestor can be stacked twice through a merge.
                identify_one(current)

        yield identify_one(revision)
class ReleaseIdentity(NamedTuple):
    """Identity of a swh release computed from a Mercurial tag."""

    swhid: str
    """SWHID hex string"""

    node_id: bytes
    """node_id hex bytes"""

    name: bytes
    """name of the release"""

    def __str__(self) -> str:
        """Return the release uri and the release name, separated by a tab."""
        release_uri = swhid("release", self.swhid)
        decoded_name = self.name.decode()
        return f"{release_uri}\t{decoded_name}"
def identify_release(
    hg: Hg, node_id_2_swhid: Optional[Dict[bytes, bytes]] = None,
) -> Iterator[ReleaseIdentity]:
    """Yield the identity of every release of the repository.

    One release is built per tag returned by `hg.tags()`, targeting the
    revision the tag points to.

    Args:
        hg: A `Hg` repository instance
        node_id_2_swhid: An optional cache mapping hg node ids to SWHIDs
            If not provided it will be computed using `identify_revision`.

    Raises:
        KeyError: if a tag targets a node id missing from the mapping.
    """
    from swh.model.model import ObjectType, Release

    if node_id_2_swhid is None:
        node_id_2_swhid = {
            identity.node_id: identity.swhid for identity in identify_revision(hg)
        }

    for tag in hg.tags():
        # A fresh dict is built for every tag: no message, date, metadata or
        # author information is attached to the release.
        release = Release.from_dict(
            {
                "name": tag.name,
                "target": node_id_2_swhid[tag.node_id],
                "target_type": ObjectType.REVISION.value,
                "message": None,
                "metadata": None,
                "synthetic": False,
                "author": {"name": None, "email": None, "fullname": b""},
                "date": None,
            }
        )
        yield ReleaseIdentity(swhid=release.id, node_id=tag.node_id, name=tag.name)
def identify_snapshot(
    hg: Hg,
    node_id_2_swhid: Optional[Dict[bytes, bytes]] = None,
    releases: Optional[List[ReleaseIdentity]] = None,
) -> str:
    """Return the identity of the repository snapshot.

    The snapshot holds a `HEAD` alias to the branch of the tip revision, one
    revision branch per named branch and one release branch per release. When
    names collide, releases win over named branches, which win over `HEAD`.

    Args:
        hg: A `Hg` repository instance
        node_id_2_swhid: An optional cache mapping hg node ids to SWHIDs
            If not provided it will be computed using `identify_revision`.
        releases: an optional list of `ReleaseIdentity`.
            If not provided it will be computed using `identify_release`.
    """
    from swh.model.model import Snapshot, TargetType

    if node_id_2_swhid is None:
        node_id_2_swhid = {
            identity.node_id: identity.swhid for identity in identify_revision(hg)
        }

    if releases is None:
        releases = list(identify_release(hg, node_id_2_swhid))

    alias_type = TargetType.ALIAS.value
    revision_type = TargetType.REVISION.value
    release_type = TargetType.RELEASE.value

    branches = {
        b"HEAD": {"target": hg.tip().branch(), "target_type": alias_type},
    }
    branches.update(
        {
            branch.name: {
                "target": node_id_2_swhid[branch.node_id],
                "target_type": revision_type,
            }
            for branch in hg.branches()
        }
    )
    branches.update(
        {
            release.name: {"target": release.swhid, "target_type": release_type}
            for release in releases
        }
    )

    return Snapshot.from_dict({"branches": branches}).id
@main.command()
@click.argument("rev", required=False)
@click.pass_context
def revision(ctx, rev):
    """Compute the SWHID of a given revision.

    If specified REV allow to select a single or multiple revisions
    (using the Mercurial revsets language: `hg help revsets`)
    """
    repository = Hg(ctx.obj["HG_ROOT"])

    # One line is printed per identity, in the order they are produced.
    for revision_identity in identify_revision(repository, rev):
        click.echo(revision_identity)
@main.command()
@click.pass_context
def snapshot(ctx):
    """Compute the SWHID of the snapshot."""
    repository_root = ctx.obj["HG_ROOT"]

    # Printed as "<snapshot uri><TAB><repository root>".
    snapshot_uri = swhid("snapshot", identify_snapshot(Hg(repository_root)))
    click.echo(f"{snapshot_uri}\t{repository_root}")
@main.command()
@click.pass_context
def all(ctx):
    """Compute the SWHID of all the repository objects."""
    root = ctx.obj["HG_ROOT"]
    hg = Hg(root)

    # Revisions are identified once; the resulting mapping is then shared with
    # the release and snapshot computations.
    revision_identities = list(identify_revision(hg))
    node_id_2_swhid = {
        identity.node_id: identity.swhid for identity in revision_identities
    }
    releases = list(identify_release(hg, node_id_2_swhid))
    snapshot_swhid = identify_snapshot(hg, node_id_2_swhid, releases)

    # Output order: directories, revisions, releases and finally the snapshot.
    lines = [identity.dir_uri() for identity in revision_identities]
    lines.extend(str(identity) for identity in revision_identities)
    lines.extend(str(release) for release in releases)
    for line in lines:
        click.echo(line)

    snapshot_uri = swhid("snapshot", snapshot_swhid)
    click.echo(f"{snapshot_uri}\t{root}")
# Script entry point: hand control over to the click command group.
if __name__ == "__main__":
    main()
This should get a header comment.