Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/mercurial/identify.py
Show All 10 Lines | |||||
from typing import Any, Dict, Iterator, List, NamedTuple, Optional, Union | from typing import Any, Dict, Iterator, List, NamedTuple, Optional, Union | ||||
# WARNING: do not import unnecessary things here to keep cli startup time under | # WARNING: do not import unnecessary things here to keep cli startup time under | ||||
# control | # control | ||||
import click | import click | ||||
from swh.loader.mercurial.utils import get_minimum_env | from swh.loader.mercurial.utils import get_minimum_env | ||||
from swh.model.cli import identify_object | from swh.model.cli import identify_object | ||||
from swh.model.hashutil import hash_to_bytehex, hash_to_bytes | from swh.model.hashutil import hash_to_bytehex | ||||
from swh.model.identifiers import CoreSWHID, ObjectType, normalize_timestamp | from swh.model.identifiers import CoreSWHID, ObjectType, normalize_timestamp | ||||
from swh.model.model import RevisionType | from swh.model.model import RevisionType | ||||
TAG_PATTERN = re.compile(b"([0-9A-Fa-f]{40}) +(.+)") | TAG_PATTERN = re.compile(b"([0-9A-Fa-f]{40}) +(.+)") | ||||
class HgAuthor(NamedTuple): | class HgAuthor(NamedTuple): | ||||
"""Represent a Mercurial revision author.""" | """Represent a Mercurial revision author.""" | ||||
▲ Show 20 Lines • Show All 277 Lines • ▼ Show 20 Lines | def main(ctx, directory=None): | ||||
root = Path(directory) if directory else Path() | root = Path(directory) if directory else Path() | ||||
if not root.exists(): | if not root.exists(): | ||||
raise IOError(f"{root!r} does not exists") | raise IOError(f"{root!r} does not exists") | ||||
ctx.obj["HG_ROOT"] = root | ctx.obj["HG_ROOT"] = root | ||||
def identify_directory(path: Path) -> str: | def identify_directory(path: Path) -> CoreSWHID: | ||||
"""Return the SWHID of the given path.""" | """Return the SWHID of the given path.""" | ||||
uri = identify_object( | return CoreSWHID.from_string( | ||||
identify_object( | |||||
"directory", follow_symlinks=True, exclude_patterns=[".hg"], obj=str(path) | "directory", follow_symlinks=True, exclude_patterns=[".hg"], obj=str(path) | ||||
) | ) | ||||
return uri.split(":")[-1] | ) | ||||
class RevisionIdentity(NamedTuple): | class RevisionIdentity(NamedTuple): | ||||
"""Represent a swh revision identity.""" | """Represent a swh revision identity.""" | ||||
swhid: bytes | swhid: CoreSWHID | ||||
"""SWHID raw bytes""" | """SWH Identifier of the revision.""" | ||||
node_id: bytes | node_id: bytes | ||||
"""node_id hex bytes""" | """node_id hex bytes""" | ||||
directory_swhid: bytes | directory_swhid: CoreSWHID | ||||
"""SWH Identifier of the directory""" | |||||
def dir_uri(self) -> str: | def dir_uri(self) -> str: | ||||
"""Return the SWHID uri of the revision's directory.""" | """Return the SWHID uri of the revision's directory.""" | ||||
return f"swh:1:dir:{self.directory_swhid.hex()}\t{self.node_id.decode()}" | return f"{self.directory_swhid}\t{self.node_id.decode()}" | ||||
def __str__(self) -> str: | def __str__(self) -> str: | ||||
"""Return the string representation of a RevisionIdentity.""" | """Return the string representation of a RevisionIdentity.""" | ||||
uri = CoreSWHID(object_type=ObjectType.REVISION, object_id=self.swhid) | return f"{self.swhid}\t{self.node_id.decode()}" | ||||
return f"{uri}\t{self.node_id.decode()}" | |||||
def identify_revision( | def identify_revision( | ||||
hg: Hg, | hg: Hg, | ||||
rev: Optional[bytes] = None, | rev: Optional[bytes] = None, | ||||
node_id_2_swhid: Optional[Dict[bytes, bytes]] = None, | node_id_2_swhid: Optional[Dict[bytes, CoreSWHID]] = None, | ||||
) -> Iterator[RevisionIdentity]: | ) -> Iterator[RevisionIdentity]: | ||||
"""Return the repository revision identities. | """Return the repository revision identities. | ||||
hg: A `Hg` repository instance | hg: A `Hg` repository instance | ||||
rev: An optional revision or Mercurial revsets (See `hg help revsets`) | rev: An optional revision or Mercurial revsets (See `hg help revsets`) | ||||
If not provided all the repository revisions will be computed. | If not provided all the repository revisions will be computed. | ||||
node_id_2_swhid: An optional cache mapping hg node ids to SWHIDs | node_id_2_swhid: An optional cache mapping hg node ids to SWHIDs | ||||
It will be updated in place with new mappings. | It will be updated in place with new mappings. | ||||
""" | """ | ||||
from swh.model.hashutil import hash_to_bytes | |||||
from swh.model.model import Revision | from swh.model.model import Revision | ||||
if node_id_2_swhid is None: | if node_id_2_swhid is None: | ||||
node_id_2_swhid = {} | node_id_2_swhid = {} | ||||
for revision in hg.log(rev): | for revision in hg.log(rev): | ||||
data = revision.to_dict() | data = revision.to_dict() | ||||
hg.up(revision.node_id) | hg.up(revision.node_id) | ||||
directory_swhid = hash_to_bytes(identify_directory(hg.root())) | directory_swhid = identify_directory(hg.root()) | ||||
data["directory"] = directory_swhid | data["directory"] = directory_swhid.object_id | ||||
parents = [] | parents = [] | ||||
for parent in data["parents"]: | for parent in data["parents"]: | ||||
if parent not in node_id_2_swhid: | if parent not in node_id_2_swhid: | ||||
parent_revision = next(identify_revision(hg, parent, node_id_2_swhid)) | parent_revision = next(identify_revision(hg, parent, node_id_2_swhid)) | ||||
node_id_2_swhid[parent] = parent_revision.swhid | node_id_2_swhid[parent] = parent_revision.swhid | ||||
parents.append(node_id_2_swhid[parent]) | assert node_id_2_swhid[parent].object_type == ObjectType.REVISION | ||||
parents.append(node_id_2_swhid[parent].object_id) | |||||
data["parents"] = parents | data["parents"] = parents | ||||
revision_swhid = hash_to_bytes(Revision.from_dict(data).id) | revision_swhid = Revision.from_dict(data).swhid() | ||||
node_id_2_swhid[revision.node_id] = revision_swhid | node_id_2_swhid[revision.node_id] = revision_swhid | ||||
yield RevisionIdentity( | yield RevisionIdentity( | ||||
swhid=revision_swhid, | swhid=revision_swhid, | ||||
node_id=revision.node_id, | node_id=revision.node_id, | ||||
directory_swhid=directory_swhid, | directory_swhid=directory_swhid, | ||||
) | ) | ||||
class ReleaseIdentity(NamedTuple): | class ReleaseIdentity(NamedTuple): | ||||
"""Represent a swh release identity.""" | """Represent a swh release identity.""" | ||||
swhid: str | swhid: CoreSWHID | ||||
"""SWHID hex string""" | """SWH Identifier of the release.""" | ||||
node_id: bytes | node_id: bytes | ||||
"""node_id hex bytes""" | """node_id hex bytes""" | ||||
name: bytes | name: bytes | ||||
"""name of the release""" | """name of the release""" | ||||
def __str__(self) -> str: | def __str__(self) -> str: | ||||
"""Return the string representation of a ReleaseIdentity.""" | """Return the string representation of a ReleaseIdentity.""" | ||||
uri = CoreSWHID( | return f"{self.swhid}\t{self.name.decode()}" | ||||
object_type=ObjectType.RELEASE, object_id=hash_to_bytes(self.swhid) | |||||
) | |||||
return f"{uri}\t{self.name.decode()}" | |||||
def identify_release( | def identify_release( | ||||
hg: Hg, node_id_2_swhid: Optional[Dict[bytes, bytes]] = None, | hg: Hg, node_id_2_swhid: Optional[Dict[bytes, CoreSWHID]] = None, | ||||
) -> Iterator[ReleaseIdentity]: | ) -> Iterator[ReleaseIdentity]: | ||||
"""Return the repository's release identities. | """Return the repository's release identities. | ||||
hg: A `Hg` repository instance | hg: A `Hg` repository instance | ||||
node_id_2_swhid: An optional cache mapping hg node ids to SWHIDs | node_id_2_swhid: An optional cache mapping hg node ids to SWHIDs | ||||
If not provided it will be computed using `identify_revision`. | If not provided it will be computed using `identify_revision`. | ||||
""" | """ | ||||
from swh.model.model import ObjectType as ModelObjectType | from swh.model.model import ObjectType as ModelObjectType | ||||
from swh.model.model import Release | from swh.model.model import Release | ||||
if node_id_2_swhid is None: | if node_id_2_swhid is None: | ||||
node_id_2_swhid = { | node_id_2_swhid = { | ||||
revision.node_id: revision.swhid for revision in identify_revision(hg) | revision.node_id: revision.swhid for revision in identify_revision(hg) | ||||
} | } | ||||
for tag in hg.tags(): | for tag in hg.tags(): | ||||
assert node_id_2_swhid[tag.node_id].object_type == ObjectType.REVISION | |||||
data = { | data = { | ||||
"name": tag.name, | "name": tag.name, | ||||
"target": node_id_2_swhid[tag.node_id], | "target": node_id_2_swhid[tag.node_id].object_id, | ||||
"target_type": ModelObjectType.REVISION.value, | "target_type": ModelObjectType.REVISION.value, | ||||
"message": None, | "message": None, | ||||
"metadata": None, | "metadata": None, | ||||
"synthetic": False, | "synthetic": False, | ||||
"author": {"name": None, "email": None, "fullname": b""}, | "author": {"name": None, "email": None, "fullname": b""}, | ||||
"date": None, | "date": None, | ||||
} | } | ||||
release_swhid = Release.from_dict(data).id | release_swhid = Release.from_dict(data).swhid() | ||||
yield ReleaseIdentity( | yield ReleaseIdentity( | ||||
swhid=release_swhid, node_id=tag.node_id, name=tag.name, | swhid=release_swhid, node_id=tag.node_id, name=tag.name, | ||||
) | ) | ||||
def identify_snapshot( | def identify_snapshot( | ||||
hg: Hg, | hg: Hg, | ||||
node_id_2_swhid: Optional[Dict[bytes, bytes]] = None, | node_id_2_swhid: Optional[Dict[bytes, CoreSWHID]] = None, | ||||
releases: Optional[List[ReleaseIdentity]] = None, | releases: Optional[List[ReleaseIdentity]] = None, | ||||
) -> str: | ) -> CoreSWHID: | ||||
"""Return the repository snapshot identity. | """Return the repository snapshot identity. | ||||
hg: A `Hg` repository instance | hg: A `Hg` repository instance | ||||
node_id_2_swhid: An optional cache mapping hg node ids to SWHIDs | node_id_2_swhid: An optional cache mapping hg node ids to SWHIDs | ||||
If not provided it will be computed using `identify_revision`. | If not provided it will be computed using `identify_revision`. | ||||
release: an optional list of `ReleaseIdentity`. | release: an optional list of `ReleaseIdentity`. | ||||
If not provided it will be computed using `identify_release`. | If not provided it will be computed using `identify_release`. | ||||
""" | """ | ||||
Show All 11 Lines | ) -> CoreSWHID: | ||||
tip = hg.tip() | tip = hg.tip() | ||||
branches[b"HEAD"] = { | branches[b"HEAD"] = { | ||||
"target": tip.branch(), | "target": tip.branch(), | ||||
"target_type": TargetType.ALIAS.value, | "target_type": TargetType.ALIAS.value, | ||||
} | } | ||||
for branch in hg.branches(): | for branch in hg.branches(): | ||||
assert node_id_2_swhid[branch.node_id].object_type == ObjectType.REVISION | |||||
branches[branch.name] = { | branches[branch.name] = { | ||||
"target": node_id_2_swhid[branch.node_id], | "target": node_id_2_swhid[branch.node_id].object_id, | ||||
"target_type": TargetType.REVISION.value, | "target_type": TargetType.REVISION.value, | ||||
} | } | ||||
for release in releases: | for release in releases: | ||||
assert release.swhid.object_type == ObjectType.RELEASE | |||||
branches[release.name] = { | branches[release.name] = { | ||||
"target": release.swhid, | "target": release.swhid.object_id, | ||||
"target_type": TargetType.RELEASE.value, | "target_type": TargetType.RELEASE.value, | ||||
} | } | ||||
return Snapshot.from_dict({"branches": branches}).id | return Snapshot.from_dict({"branches": branches}).swhid() | ||||
@main.command() | @main.command() | ||||
@click.argument("rev", required=False) | @click.argument("rev", required=False) | ||||
@click.pass_context | @click.pass_context | ||||
def revision(ctx, rev): | def revision(ctx, rev): | ||||
"""Compute the SWHID of a given revision. | """Compute the SWHID of a given revision. | ||||
Show All 10 Lines | |||||
@click.pass_context | @click.pass_context | ||||
def snapshot(ctx): | def snapshot(ctx): | ||||
"""Compute the SWHID of the snapshot.""" | """Compute the SWHID of the snapshot.""" | ||||
root = ctx.obj["HG_ROOT"] | root = ctx.obj["HG_ROOT"] | ||||
hg = Hg(root) | hg = Hg(root) | ||||
snapshot_swhid = identify_snapshot(hg) | snapshot_swhid = identify_snapshot(hg) | ||||
uri = CoreSWHID(object_type=ObjectType.SNAPSHOT, object_id=snapshot_swhid) | click.echo(f"{snapshot_swhid}\t{root}") | ||||
click.echo(f"{uri}\t{root}") | |||||
@main.command() | @main.command() | ||||
@click.pass_context | @click.pass_context | ||||
def all(ctx): | def all(ctx): | ||||
"""Compute the SWHID of all the repository objects.""" | """Compute the SWHID of all the repository objects.""" | ||||
root = ctx.obj["HG_ROOT"] | root = ctx.obj["HG_ROOT"] | ||||
hg = Hg(root) | hg = Hg(root) | ||||
Show All 13 Lines | for release in identify_release(hg, node_id_2_swhid): | ||||
rel_uris.append(str(release)) | rel_uris.append(str(release)) | ||||
releases.append(release) | releases.append(release) | ||||
snapshot_swhid = identify_snapshot(hg, node_id_2_swhid, releases) | snapshot_swhid = identify_snapshot(hg, node_id_2_swhid, releases) | ||||
for uri in dir_uris + rev_uris + rel_uris: | for uri in dir_uris + rev_uris + rel_uris: | ||||
click.echo(uri) | click.echo(uri) | ||||
uri = CoreSWHID(object_type=ObjectType.SNAPSHOT, object_id=snapshot_swhid) | click.echo(f"{snapshot_swhid}\t{root}") | ||||
click.echo(f"{uri}\t{root}") | |||||
if __name__ == "__main__": | if __name__ == "__main__": | ||||
main() | main() |