Changeset View
Changeset View
Standalone View
Standalone View
swh/model/cli.py
# Copyright (C) 2018-2020 The Software Heritage developers | # Copyright (C) 2018-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import os | import os | ||||
import sys | import sys | ||||
from typing import List | from typing import Dict, List, Optional | ||||
# WARNING: do not import unnecessary things here to keep cli startup time under | # WARNING: do not import unnecessary things here to keep cli startup time under | ||||
# control | # control | ||||
import click | import click | ||||
from swh.core.cli import swh as swh_cli_group | from swh.core.cli import swh as swh_cli_group | ||||
from swh.model.identifiers import SWHID | from swh.model.identifiers import CoreSWHID, ObjectType | ||||
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"]) | CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"]) | ||||
# Mapping between dulwich types and Software Heritage ones. Used by snapshot ID | # Mapping between dulwich types and Software Heritage ones. Used by snapshot ID | ||||
# computation. | # computation. | ||||
_DULWICH_TYPES = { | _DULWICH_TYPES = { | ||||
b"blob": "content", | b"blob": "content", | ||||
b"tree": "directory", | b"tree": "directory", | ||||
b"commit": "revision", | b"commit": "revision", | ||||
b"tag": "release", | b"tag": "release", | ||||
} | } | ||||
class SWHIDParamType(click.ParamType): | class CoreSWHIDParamType(click.ParamType): | ||||
"""Click argument that accepts SWHID and return them as | """Click argument that accepts a core SWHID and returns them as | ||||
:class:`swh.model.identifiers.SWHID` instances """ | :class:`swh.model.identifiers.CoreSWHID` instances """ | ||||
name = "SWHID" | name = "SWHID" | ||||
def convert(self, value, param, ctx) -> SWHID: | def convert(self, value, param, ctx) -> CoreSWHID: | ||||
from swh.model.exceptions import ValidationError | from swh.model.exceptions import ValidationError | ||||
from swh.model.identifiers import parse_swhid | |||||
try: | try: | ||||
return parse_swhid(value) | return CoreSWHID.from_string(value) | ||||
except ValidationError as e: | except ValidationError as e: | ||||
self.fail(f'"{value}" is not a valid SWHID: {e}', param, ctx) | self.fail(f'"{value}" is not a valid core SWHID: {e}', param, ctx) | ||||
def swhid_of_file(path): | def swhid_of_file(path) -> CoreSWHID: | ||||
from swh.model.from_disk import Content | from swh.model.from_disk import Content | ||||
from swh.model.identifiers import CONTENT, swhid | from swh.model.hashutil import hash_to_bytes | ||||
object = Content.from_file(path=path).get_data() | object = Content.from_file(path=path).get_data() | ||||
return swhid(CONTENT, object) | return CoreSWHID( | ||||
object_type=ObjectType.CONTENT, object_id=hash_to_bytes(object["sha1_git"]) | |||||
) | |||||
def swhid_of_file_content(data): | def swhid_of_file_content(data) -> CoreSWHID: | ||||
from swh.model.from_disk import Content | from swh.model.from_disk import Content | ||||
from swh.model.identifiers import CONTENT, swhid | from swh.model.hashutil import hash_to_bytes | ||||
object = Content.from_bytes(mode=644, data=data).get_data() | object = Content.from_bytes(mode=644, data=data).get_data() | ||||
return swhid(CONTENT, object) | return CoreSWHID( | ||||
object_type=ObjectType.CONTENT, object_id=hash_to_bytes(object["sha1_git"]) | |||||
) | |||||
def swhid_of_dir(path: bytes, exclude_patterns: List[bytes] = None) -> str: | def swhid_of_dir(path: bytes, exclude_patterns: List[bytes] = None) -> CoreSWHID: | ||||
from swh.model.from_disk import ( | from swh.model.from_disk import ( | ||||
Directory, | Directory, | ||||
accept_all_directories, | accept_all_directories, | ||||
ignore_directories_patterns, | ignore_directories_patterns, | ||||
) | ) | ||||
from swh.model.identifiers import DIRECTORY, swhid | from swh.model.hashutil import hash_to_bytes | ||||
dir_filter = ( | dir_filter = ( | ||||
ignore_directories_patterns(path, exclude_patterns) | ignore_directories_patterns(path, exclude_patterns) | ||||
if exclude_patterns | if exclude_patterns | ||||
else accept_all_directories | else accept_all_directories | ||||
) | ) | ||||
object = Directory.from_disk(path=path, dir_filter=dir_filter).get_data() | object = Directory.from_disk(path=path, dir_filter=dir_filter).get_data() | ||||
return swhid(DIRECTORY, object) | return CoreSWHID( | ||||
object_type=ObjectType.DIRECTORY, object_id=hash_to_bytes(object["id"]) | |||||
) | |||||
def swhid_of_origin(url): | def swhid_of_origin(url): | ||||
from swh.model.identifiers import SWHID, origin_identifier | from swh.model.hashutil import hash_to_bytes | ||||
from swh.model.identifiers import ( | |||||
ExtendedObjectType, | |||||
ExtendedSWHID, | |||||
origin_identifier, | |||||
) | |||||
return str(SWHID(object_type="origin", object_id=origin_identifier({"url": url}))) | return ExtendedSWHID( | ||||
object_type=ExtendedObjectType.ORIGIN, | |||||
object_id=hash_to_bytes(origin_identifier({"url": url})), | |||||
) | |||||
def swhid_of_git_repo(path): | def swhid_of_git_repo(path) -> CoreSWHID: | ||||
anlambert: Why change the return type here? You could avoid multiple calls to `str()` below by keeping the old behavior. | |||||
vlorentz (author, done): Each function is called exactly once, and `str()` needs to happen either before returning or after; it doesn't make a difference. I changed it because I prefer coercing to strings as late as possible. | |||||
anlambert (not done): Ah right, I misread the code. Nevertheless, I would have kept the string conversion in the functions, but that's not a big deal anyway. | |||||
import dulwich.repo | import dulwich.repo | ||||
from swh.model import hashutil | from swh.model import hashutil | ||||
from swh.model.identifiers import SWHID, snapshot_identifier | from swh.model.identifiers import snapshot_identifier | ||||
repo = dulwich.repo.Repo(path) | repo = dulwich.repo.Repo(path) | ||||
branches = {} | branches: Dict[bytes, Optional[Dict]] = {} | ||||
for ref, target in repo.refs.as_dict().items(): | for ref, target in repo.refs.as_dict().items(): | ||||
obj = repo[target] | obj = repo[target] | ||||
if obj: | if obj: | ||||
branches[ref] = { | branches[ref] = { | ||||
"target": hashutil.bytehex_to_hash(target), | "target": hashutil.bytehex_to_hash(target), | ||||
"target_type": _DULWICH_TYPES[obj.type_name], | "target_type": _DULWICH_TYPES[obj.type_name], | ||||
} | } | ||||
else: | else: | ||||
branches[ref] = None | branches[ref] = None | ||||
for ref, target in repo.refs.get_symrefs().items(): | for ref, target in repo.refs.get_symrefs().items(): | ||||
branches[ref] = { | branches[ref] = { | ||||
"target": target, | "target": target, | ||||
"target_type": "alias", | "target_type": "alias", | ||||
} | } | ||||
snapshot = {"branches": branches} | snapshot = {"branches": branches} | ||||
return str(SWHID(object_type="snapshot", object_id=snapshot_identifier(snapshot))) | return CoreSWHID( | ||||
object_type=ObjectType.SNAPSHOT, | |||||
object_id=hashutil.hash_to_bytes(snapshot_identifier(snapshot)), | |||||
) | |||||
def identify_object(obj_type, follow_symlinks, exclude_patterns, obj): | def identify_object(obj_type, follow_symlinks, exclude_patterns, obj) -> str: | ||||
from urllib.parse import urlparse | from urllib.parse import urlparse | ||||
if obj_type == "auto": | if obj_type == "auto": | ||||
if obj == "-" or os.path.isfile(obj): | if obj == "-" or os.path.isfile(obj): | ||||
obj_type = "content" | obj_type = "content" | ||||
elif os.path.isdir(obj): | elif os.path.isdir(obj): | ||||
obj_type = "directory" | obj_type = "directory" | ||||
else: | else: | ||||
try: # URL parsing | try: # URL parsing | ||||
if urlparse(obj).scheme: | if urlparse(obj).scheme: | ||||
obj_type = "origin" | obj_type = "origin" | ||||
else: | else: | ||||
raise ValueError | raise ValueError | ||||
except ValueError: | except ValueError: | ||||
raise click.BadParameter("cannot detect object type for %s" % obj) | raise click.BadParameter("cannot detect object type for %s" % obj) | ||||
swhid = None | |||||
if obj == "-": | if obj == "-": | ||||
content = sys.stdin.buffer.read() | content = sys.stdin.buffer.read() | ||||
swhid = swhid_of_file_content(content) | swhid = str(swhid_of_file_content(content)) | ||||
elif obj_type in ["content", "directory"]: | elif obj_type in ["content", "directory"]: | ||||
path = obj.encode(sys.getfilesystemencoding()) | path = obj.encode(sys.getfilesystemencoding()) | ||||
if follow_symlinks and os.path.islink(obj): | if follow_symlinks and os.path.islink(obj): | ||||
path = os.path.realpath(obj) | path = os.path.realpath(obj) | ||||
if obj_type == "content": | if obj_type == "content": | ||||
swhid = swhid_of_file(path) | swhid = str(swhid_of_file(path)) | ||||
elif obj_type == "directory": | elif obj_type == "directory": | ||||
swhid = swhid_of_dir( | swhid = str( | ||||
path, [pattern.encode() for pattern in exclude_patterns] | swhid_of_dir(path, [pattern.encode() for pattern in exclude_patterns]) | ||||
) | ) | ||||
elif obj_type == "origin": | elif obj_type == "origin": | ||||
swhid = swhid_of_origin(obj) | swhid = str(swhid_of_origin(obj)) | ||||
elif obj_type == "snapshot": | elif obj_type == "snapshot": | ||||
swhid = swhid_of_git_repo(obj) | swhid = str(swhid_of_git_repo(obj)) | ||||
else: # shouldn't happen, due to option validation | else: # shouldn't happen, due to option validation | ||||
raise click.BadParameter("invalid object type: " + obj_type) | raise click.BadParameter("invalid object type: " + obj_type) | ||||
# note: we return original obj instead of path here, to preserve user-given | # note: we return original obj instead of path here, to preserve user-given | ||||
# file name in output | # file name in output | ||||
return (obj, swhid) | return swhid | ||||
@swh_cli_group.command(context_settings=CONTEXT_SETTINGS) | @swh_cli_group.command(context_settings=CONTEXT_SETTINGS) | ||||
@click.option( | @click.option( | ||||
"--dereference/--no-dereference", | "--dereference/--no-dereference", | ||||
"follow_symlinks", | "follow_symlinks", | ||||
default=True, | default=True, | ||||
help="follow (or not) symlinks for OBJECTS passed as arguments " | help="follow (or not) symlinks for OBJECTS passed as arguments " | ||||
Show All 21 Lines | @click.option( | ||||
multiple=True, | multiple=True, | ||||
help="Exclude directories using glob patterns \ | help="Exclude directories using glob patterns \ | ||||
(e.g., '*.git' to exclude all .git directories)", | (e.g., '*.git' to exclude all .git directories)", | ||||
) | ) | ||||
@click.option( | @click.option( | ||||
"--verify", | "--verify", | ||||
"-v", | "-v", | ||||
metavar="SWHID", | metavar="SWHID", | ||||
type=SWHIDParamType(), | type=CoreSWHIDParamType(), | ||||
help="reference identifier to be compared with computed one", | help="reference identifier to be compared with computed one", | ||||
) | ) | ||||
@click.argument("objects", nargs=-1, required=True) | @click.argument("objects", nargs=-1, required=True) | ||||
def identify( | def identify( | ||||
obj_type, verify, show_filename, follow_symlinks, objects, exclude_patterns, | obj_type, verify, show_filename, follow_symlinks, objects, exclude_patterns, | ||||
): | ): | ||||
"""Compute the Software Heritage persistent identifier (SWHID) for the given | """Compute the Software Heritage persistent identifier (SWHID) for the given | ||||
source code object(s). | source code object(s). | ||||
Show All 24 Lines | \b | ||||
swh:1:snp:510aa88bdc517345d258c1fc2babcd0e1f905e93 helloworld.git | swh:1:snp:510aa88bdc517345d258c1fc2babcd0e1f905e93 helloworld.git | ||||
""" # NoQA # overlong lines in shell examples are fine | """ # NoQA # overlong lines in shell examples are fine | ||||
from functools import partial | from functools import partial | ||||
if verify and len(objects) != 1: | if verify and len(objects) != 1: | ||||
raise click.BadParameter("verification requires a single object") | raise click.BadParameter("verification requires a single object") | ||||
results = map( | results = zip( | ||||
partial(identify_object, obj_type, follow_symlinks, exclude_patterns), objects, | objects, | ||||
map( | |||||
partial(identify_object, obj_type, follow_symlinks, exclude_patterns), | |||||
objects, | |||||
), | |||||
) | ) | ||||
if verify: | if verify: | ||||
swhid = next(results)[1] | swhid = next(results)[1] | ||||
if str(verify) == swhid: | if str(verify) == swhid: | ||||
click.echo("SWHID match: %s" % swhid) | click.echo("SWHID match: %s" % swhid) | ||||
sys.exit(0) | sys.exit(0) | ||||
else: | else: | ||||
Show All 12 Lines |
Why change the return type here? You could avoid multiple calls to `str()` below by keeping the old behavior.