Changeset View
Changeset View
Standalone View
Standalone View
swh/model/cli.py
# Copyright (C) 2018-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
import sys
from functools import partial
from urllib.parse import urlparse

import click
import dulwich.repo

from swh.model import hashutil
from swh.model import identifiers as pids  # pre-rename alias, kept for compatibility
from swh.model.exceptions import ValidationError
from swh.model.from_disk import Content, Directory
from swh.model.identifiers import (
    CONTENT,
    DIRECTORY,
    SWHID,
    origin_identifier,
    parse_swhid,
    snapshot_identifier,
    swhid,
)
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"]) | CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"]) | ||||
# Mapping between dulwich types and Software Heritage ones. Used by snapshot ID | # Mapping between dulwich types and Software Heritage ones. Used by snapshot ID | ||||
# computation. | # computation. | ||||
_DULWICH_TYPES = { | _DULWICH_TYPES = { | ||||
b"blob": "content", | b"blob": "content", | ||||
b"tree": "directory", | b"tree": "directory", | ||||
b"commit": "revision", | b"commit": "revision", | ||||
b"tag": "release", | b"tag": "release", | ||||
} | } | ||||
class SWHIDParamType(click.ParamType):
    """Click parameter type that only accepts values parseable as a SWHID."""

    name = "persistent identifier"

    def convert(self, value, param, ctx):
        """Validate ``value`` with ``parse_swhid`` and return it unchanged.

        The parsed result is discarded on purpose: callers only need the
        original string. If parsing raises ``ValidationError``, the error is
        reported through ``self.fail`` with the offending value and the
        validation message.
        """
        try:
            parse_swhid(value)
        except ValidationError as e:
            self.fail("%s is not a valid SWHID. %s." % (value, e), param, ctx)
        return value  # return as string, as we need just that
def swhid_of_file(path):
    """Return the content SWHID for the file located at ``path``.

    The file is loaded with ``Content.from_file`` and its data dict is handed
    to ``swhid`` with the ``CONTENT`` object type.
    """
    # Named ``content_data`` rather than ``object`` to avoid shadowing the
    # builtin.
    content_data = Content.from_file(path=path).get_data()
    return swhid(CONTENT, content_data)
def swhid_of_file_content(data):
    """Return the content SWHID for the raw bytes ``data``.

    The bytes are wrapped with ``Content.from_bytes`` and the resulting data
    dict is handed to ``swhid`` with the ``CONTENT`` object type.
    """
    # File modes are octal: 0o644 is a plain rw-r--r-- regular file. The
    # previous decimal literal 644 was a different bit pattern (0o1204).
    content_data = Content.from_bytes(mode=0o644, data=data).get_data()
    return swhid(CONTENT, content_data)
def swhid_of_dir(path):
    """Return the directory SWHID for the directory tree rooted at ``path``.

    The tree is loaded with ``Directory.from_disk`` and its data dict is
    handed to ``swhid`` with the ``DIRECTORY`` object type.
    """
    # Named ``directory_data`` rather than ``object`` to avoid shadowing the
    # builtin.
    directory_data = Directory.from_disk(path=path).get_data()
    return swhid(DIRECTORY, directory_data)
def swhid_of_origin(url):
    """Return, as a string, the origin SWHID for ``url``.

    The object id is computed by ``origin_identifier`` from a ``{"url": url}``
    dict and wrapped in a ``SWHID`` of type ``"origin"``.
    """
    # The local is not called ``swhid`` so it does not shadow the imported
    # ``swhid()`` function.
    origin_swhid = SWHID(
        object_type="origin", object_id=origin_identifier({"url": url})
    )
    return str(origin_swhid)
def swhid_of_git_repo(path):
    """Return, as a string, the snapshot SWHID of the git repository at ``path``.

    The repository is opened with dulwich and a ``{"branches": ...}`` snapshot
    dict is built from its refs, then hashed with ``snapshot_identifier``.
    """
    repo = dulwich.repo.Repo(path)

    branches = {}
    # First pass: every ref, mapped to the object its target resolves to.
    for ref, target in repo.refs.as_dict().items():
        obj = repo[target]
        if obj:
            branches[ref] = {
                "target": hashutil.bytehex_to_hash(target),
                "target_type": _DULWICH_TYPES[obj.type_name],
            }
        else:
            branches[ref] = None

    # Second pass: symbolic refs are recorded as aliases, overwriting whatever
    # the first pass stored under the same ref name.
    for ref, target in repo.refs.get_symrefs().items():
        branches[ref] = {
            "target": target,
            "target_type": "alias",
        }

    snapshot = {"branches": branches}

    # The local is not called ``swhid`` so it does not shadow the imported
    # ``swhid()`` function.
    snapshot_swhid = SWHID(
        object_type="snapshot", object_id=snapshot_identifier(snapshot)
    )
    return str(snapshot_swhid)
def identify_object(obj_type, follow_symlinks, obj):
    """Compute the identifier of a single object given on the command line.

    Args:
        obj_type: one of "auto", "content", "directory", "origin" or
            "snapshot". With "auto" the type is guessed: "-" or an existing
            file is a content, an existing directory is a directory, and
            anything else must parse as a URL with a scheme to be an origin.
        follow_symlinks: when true and ``obj`` is a symlink, identify the link
            target instead of the link itself (content/directory only).
        obj: the user-given object: a path, "-" for standard input, or a URL.

    Returns:
        A ``(obj, identifier)`` pair; ``obj`` is returned exactly as given.

    Raises:
        click.BadParameter: if the type cannot be auto-detected, or if
            ``obj_type`` is not one of the supported values.
    """
    if obj_type == "auto":
        if obj == "-" or os.path.isfile(obj):
            obj_type = "content"
        elif os.path.isdir(obj):
            obj_type = "directory"
        else:
            # URL parsing: urlparse may itself raise ValueError on malformed
            # input; treat that the same as "no scheme found".
            try:
                scheme = urlparse(obj).scheme
            except ValueError:
                scheme = ""
            if not scheme:
                raise click.BadParameter("cannot detect object type for %s" % obj)
            obj_type = "origin"

    identifier = None

    if obj == "-":
        # "-" always means "read content from stdin", whatever obj_type says.
        content = sys.stdin.buffer.read()
        identifier = swhid_of_file_content(content)
    elif obj_type in ["content", "directory"]:
        path = obj.encode(sys.getfilesystemencoding())
        if follow_symlinks and os.path.islink(obj):
            # Resolve the bytes path so that ``path`` stays bytes, exactly as
            # in the non-symlink case (realpath returns the type it is given).
            path = os.path.realpath(path)
        if obj_type == "content":
            identifier = swhid_of_file(path)
        elif obj_type == "directory":
            identifier = swhid_of_dir(path)
    elif obj_type == "origin":
        identifier = swhid_of_origin(obj)
    elif obj_type == "snapshot":
        identifier = swhid_of_git_repo(obj)
    else:  # shouldn't happen, due to option validation
        raise click.BadParameter("invalid object type: " + obj_type)

    # note: we return original obj instead of path here, to preserve user-given
    # file name in output
    return (obj, identifier)
@click.command(context_settings=CONTEXT_SETTINGS) | @click.command(context_settings=CONTEXT_SETTINGS) | ||||
@click.option( | @click.option( | ||||
"--dereference/--no-dereference", | "--dereference/--no-dereference", | ||||
"follow_symlinks", | "follow_symlinks", | ||||
default=True, | default=True, | ||||
help="follow (or not) symlinks for OBJECTS passed as arguments " | help="follow (or not) symlinks for OBJECTS passed as arguments " | ||||
Show All 12 Lines | @click.option( | ||||
default="auto", | default="auto", | ||||
type=click.Choice(["auto", "content", "directory", "origin", "snapshot"]), | type=click.Choice(["auto", "content", "directory", "origin", "snapshot"]), | ||||
help="type of object to identify (default: auto)", | help="type of object to identify (default: auto)", | ||||
) | ) | ||||
@click.option( | @click.option( | ||||
"--verify", | "--verify", | ||||
"-v", | "-v", | ||||
metavar="SWHID", | metavar="SWHID", | ||||
type=PidParamType(), | type=SWHIDParamType(), | ||||
help="reference identifier to be compared with computed one", | help="reference identifier to be compared with computed one", | ||||
) | ) | ||||
@click.argument("objects", nargs=-1, required=True) | @click.argument("objects", nargs=-1, required=True) | ||||
def identify(obj_type, verify, show_filename, follow_symlinks, objects): | def identify(obj_type, verify, show_filename, follow_symlinks, objects): | ||||
"""Compute the Software Heritage persistent identifier (SWHID) for the given | """Compute the Software Heritage persistent identifier (SWHID) for the given | ||||
source code object(s). | source code object(s). | ||||
For more details about SWHIDs see: | For more details about SWHIDs see: | ||||
Show All 24 Lines | def identify(obj_type, verify, show_filename, follow_symlinks, objects): | ||||
""" # NoQA # overlong lines in shell examples are fine | """ # NoQA # overlong lines in shell examples are fine | ||||
if verify and len(objects) != 1: | if verify and len(objects) != 1: | ||||
raise click.BadParameter("verification requires a single object") | raise click.BadParameter("verification requires a single object") | ||||
results = map(partial(identify_object, obj_type, follow_symlinks), objects) | results = map(partial(identify_object, obj_type, follow_symlinks), objects) | ||||
if verify: | if verify: | ||||
pid = next(results)[1] | swhid = next(results)[1] | ||||
if verify == pid: | if verify == swhid: | ||||
click.echo("SWHID match: %s" % pid) | click.echo("SWHID match: %s" % swhid) | ||||
sys.exit(0) | sys.exit(0) | ||||
else: | else: | ||||
click.echo("SWHID mismatch: %s != %s" % (verify, pid)) | click.echo("SWHID mismatch: %s != %s" % (verify, swhid)) | ||||
sys.exit(1) | sys.exit(1) | ||||
else: | else: | ||||
for (obj, pid) in results: | for (obj, swhid) in results: | ||||
msg = pid | msg = swhid | ||||
if show_filename: | if show_filename: | ||||
msg = "%s\t%s" % (pid, os.fsdecode(obj)) | msg = "%s\t%s" % (swhid, os.fsdecode(obj)) | ||||
click.echo(msg) | click.echo(msg) | ||||
# Allow running this module directly as a script.
if __name__ == "__main__":
    identify()