diff --git a/swh/model/cli.py b/swh/model/cli.py index 6e69d1d..8ac9250 100644 --- a/swh/model/cli.py +++ b/swh/model/cli.py @@ -1,256 +1,274 @@ # Copyright (C) 2018-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import sys -from typing import List +from typing import Dict, List, Optional # WARNING: do not import unnecessary things here to keep cli startup time under # control import click from swh.core.cli import swh as swh_cli_group -from swh.model.identifiers import SWHID +from swh.model.identifiers import CoreSWHID, ObjectType CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"]) # Mapping between dulwich types and Software Heritage ones. Used by snapshot ID # computation. _DULWICH_TYPES = { b"blob": "content", b"tree": "directory", b"commit": "revision", b"tag": "release", } -class SWHIDParamType(click.ParamType): - """Click argument that accepts SWHID and return them as - :class:`swh.model.identifiers.SWHID` instances """ +class CoreSWHIDParamType(click.ParamType): + """Click argument that accepts a core SWHID and returns them as + :class:`swh.model.identifiers.CoreSWHID` instances """ name = "SWHID" - def convert(self, value, param, ctx) -> SWHID: + def convert(self, value, param, ctx) -> CoreSWHID: from swh.model.exceptions import ValidationError - from swh.model.identifiers import parse_swhid try: - return parse_swhid(value) + return CoreSWHID.from_string(value) except ValidationError as e: - self.fail(f'"{value}" is not a valid SWHID: {e}', param, ctx) + self.fail(f'"{value}" is not a valid core SWHID: {e}', param, ctx) -def swhid_of_file(path): +def swhid_of_file(path) -> CoreSWHID: from swh.model.from_disk import Content - from swh.model.identifiers import CONTENT, swhid + from swh.model.hashutil import hash_to_bytes object = Content.from_file(path=path).get_data() - return swhid(CONTENT, object) + return CoreSWHID( + object_type=ObjectType.CONTENT, object_id=hash_to_bytes(object["sha1_git"]) + ) -def swhid_of_file_content(data): +def swhid_of_file_content(data) -> CoreSWHID: from swh.model.from_disk import Content - from swh.model.identifiers import CONTENT, swhid + from swh.model.hashutil import hash_to_bytes object = Content.from_bytes(mode=644, data=data).get_data() - return swhid(CONTENT, object) + return CoreSWHID( + object_type=ObjectType.CONTENT, object_id=hash_to_bytes(object["sha1_git"]) + ) -def swhid_of_dir(path: bytes, exclude_patterns: List[bytes] = None) -> str: +def swhid_of_dir(path: bytes, exclude_patterns: List[bytes] = None) -> CoreSWHID: from swh.model.from_disk import ( Directory, accept_all_directories, ignore_directories_patterns, ) - from swh.model.identifiers import DIRECTORY, swhid + from swh.model.hashutil import hash_to_bytes dir_filter = ( ignore_directories_patterns(path, exclude_patterns) if exclude_patterns else accept_all_directories ) object = Directory.from_disk(path=path, dir_filter=dir_filter).get_data() - return swhid(DIRECTORY, object) + return CoreSWHID( + object_type=ObjectType.DIRECTORY, object_id=hash_to_bytes(object["id"]) + ) def swhid_of_origin(url): - from swh.model.identifiers import SWHID, origin_identifier + from swh.model.hashutil import hash_to_bytes + from swh.model.identifiers import ( + ExtendedObjectType, + ExtendedSWHID, + origin_identifier, + ) - return str(SWHID(object_type="origin", object_id=origin_identifier({"url": url}))) + return ExtendedSWHID( + object_type=ExtendedObjectType.ORIGIN, + object_id=hash_to_bytes(origin_identifier({"url": url})), + ) -def swhid_of_git_repo(path): +def swhid_of_git_repo(path) -> CoreSWHID: import dulwich.repo from swh.model import hashutil - from swh.model.identifiers import SWHID, snapshot_identifier + from swh.model.identifiers import snapshot_identifier repo = dulwich.repo.Repo(path) - branches = {} + branches: Dict[bytes, Optional[Dict]] = {} for ref, target in repo.refs.as_dict().items(): obj = repo[target] if obj: branches[ref] = { "target": hashutil.bytehex_to_hash(target), "target_type": _DULWICH_TYPES[obj.type_name], } else: branches[ref] = None for ref, target in repo.refs.get_symrefs().items(): branches[ref] = { "target": target, "target_type": "alias", } snapshot = {"branches": branches} - return str(SWHID(object_type="snapshot", object_id=snapshot_identifier(snapshot))) + return CoreSWHID( + object_type=ObjectType.SNAPSHOT, + object_id=hashutil.hash_to_bytes(snapshot_identifier(snapshot)), + ) -def identify_object(obj_type, follow_symlinks, exclude_patterns, obj): +def identify_object(obj_type, follow_symlinks, exclude_patterns, obj) -> str: from urllib.parse import urlparse if obj_type == "auto": if obj == "-" or os.path.isfile(obj): obj_type = "content" elif os.path.isdir(obj): obj_type = "directory" else: try: # URL parsing if urlparse(obj).scheme: obj_type = "origin" else: raise ValueError except ValueError: raise click.BadParameter("cannot detect object type for %s" % obj) - swhid = None - if obj == "-": content = sys.stdin.buffer.read() - swhid = swhid_of_file_content(content) + swhid = str(swhid_of_file_content(content)) elif obj_type in ["content", "directory"]: path = obj.encode(sys.getfilesystemencoding()) if follow_symlinks and os.path.islink(obj): path = os.path.realpath(obj) if obj_type == "content": - swhid = swhid_of_file(path) + swhid = str(swhid_of_file(path)) elif obj_type == "directory": - swhid = swhid_of_dir( - path, [pattern.encode() for pattern in exclude_patterns] + swhid = str( + swhid_of_dir(path, [pattern.encode() for pattern in exclude_patterns]) ) elif obj_type == "origin": - swhid = swhid_of_origin(obj) + swhid = str(swhid_of_origin(obj)) elif obj_type == "snapshot": - swhid = swhid_of_git_repo(obj) + swhid = str(swhid_of_git_repo(obj)) else: # shouldn't happen, due to option validation raise click.BadParameter("invalid object type: " + obj_type) # note: we return original obj instead of path here, to preserve user-given # file name in output - return (obj, swhid) + return swhid @swh_cli_group.command(context_settings=CONTEXT_SETTINGS) @click.option( "--dereference/--no-dereference", "follow_symlinks", default=True, help="follow (or not) symlinks for OBJECTS passed as arguments " + "(default: follow)", ) @click.option( "--filename/--no-filename", "show_filename", default=True, help="show/hide file name (default: show)", ) @click.option( "--type", "-t", "obj_type", default="auto", type=click.Choice(["auto", "content", "directory", "origin", "snapshot"]), help="type of object to identify (default: auto)", ) @click.option( "--exclude", "-x", "exclude_patterns", metavar="PATTERN", multiple=True, help="Exclude directories using glob patterns \ (e.g., '*.git' to exclude all .git directories)", ) @click.option( "--verify", "-v", metavar="SWHID", - type=SWHIDParamType(), + type=CoreSWHIDParamType(), help="reference identifier to be compared with computed one", ) @click.argument("objects", nargs=-1, required=True) def identify( obj_type, verify, show_filename, follow_symlinks, objects, exclude_patterns, ): """Compute the Software Heritage persistent identifier (SWHID) for the given source code object(s). For more details about SWHIDs see: \b https://docs.softwareheritage.org/devel/swh-model/persistent-identifiers.html Tip: you can pass "-" to identify the content of standard input. \b Examples: \b $ swh identify fork.c kmod.c sched/deadline.c swh:1:cnt:2e391c754ae730bd2d8520c2ab497c403220c6e3 fork.c swh:1:cnt:0277d1216f80ae1adeed84a686ed34c9b2931fc2 kmod.c swh:1:cnt:57b939c81bce5d06fa587df8915f05affbe22b82 sched/deadline.c \b $ swh identify --no-filename /usr/src/linux/kernel/ swh:1:dir:f9f858a48d663b3809c9e2f336412717496202ab \b $ git clone --mirror https://forge.softwareheritage.org/source/helloworld.git $ swh identify --type snapshot helloworld.git/ swh:1:snp:510aa88bdc517345d258c1fc2babcd0e1f905e93 helloworld.git """ # NoQA # overlong lines in shell examples are fine from functools import partial if verify and len(objects) != 1: raise click.BadParameter("verification requires a single object") - results = map( - partial(identify_object, obj_type, follow_symlinks, exclude_patterns), objects, + results = zip( + objects, + map( + partial(identify_object, obj_type, follow_symlinks, exclude_patterns), + objects, + ), ) if verify: swhid = next(results)[1] if str(verify) == swhid: click.echo("SWHID match: %s" % swhid) sys.exit(0) else: click.echo("SWHID mismatch: %s != %s" % (verify, swhid)) sys.exit(1) else: for (obj, swhid) in results: msg = swhid if show_filename: msg = "%s\t%s" % (swhid, os.fsdecode(obj)) click.echo(msg) if __name__ == "__main__": identify() diff --git a/swh/model/tests/test_cli.py b/swh/model/tests/test_cli.py index 3d86ede..9a00660 100644 --- a/swh/model/tests/test_cli.py +++ b/swh/model/tests/test_cli.py @@ -1,164 +1,164 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import tarfile import tempfile import unittest from click.testing import CliRunner import pytest from swh.model import cli from swh.model.hashutil import hash_to_hex from swh.model.tests.test_from_disk import DataMixin @pytest.mark.fs class TestIdentify(DataMixin, unittest.TestCase): def setUp(self): super().setUp() self.runner = CliRunner() def assertSWHID(self, result, swhid): - self.assertEqual(result.exit_code, 0) + self.assertEqual(result.exit_code, 0, result.output) self.assertEqual(result.output.split()[0], swhid) def test_no_args(self): result = self.runner.invoke(cli.identify) self.assertNotEqual(result.exit_code, 0) def test_content_id(self): """identify file content""" self.make_contents(self.tmpdir_name) for filename, content in self.contents.items(): path = os.path.join(self.tmpdir_name, filename) result = self.runner.invoke(cli.identify, ["--type", "content", path]) self.assertSWHID(result, "swh:1:cnt:" + hash_to_hex(content["sha1_git"])) def test_content_id_from_stdin(self): """identify file content""" self.make_contents(self.tmpdir_name) for _, content in self.contents.items(): result = self.runner.invoke(cli.identify, ["-"], input=content["data"]) self.assertSWHID(result, "swh:1:cnt:" + hash_to_hex(content["sha1_git"])) def test_directory_id(self): """identify an entire directory""" self.make_from_tarball(self.tmpdir_name) path = os.path.join(self.tmpdir_name, b"sample-folder") result = self.runner.invoke(cli.identify, ["--type", "directory", path]) self.assertSWHID(result, "swh:1:dir:e8b0f1466af8608c8a3fb9879db172b887e80759") def test_snapshot_id(self): """identify a snapshot""" tarball = os.path.join( os.path.dirname(__file__), "data", "repos", "sample-repo.tgz" ) with tempfile.TemporaryDirectory(prefix="swh.model.cli") as d: with tarfile.open(tarball, "r:gz") as t: t.extractall(d) repo_dir = os.path.join(d, "sample-repo") result = self.runner.invoke( cli.identify, ["--type", "snapshot", repo_dir] ) self.assertSWHID( result, "swh:1:snp:abc888898124270905a0ef3c67e872ce08e7e0c1" ) def test_origin_id(self): """identify an origin URL""" url = "https://github.com/torvalds/linux" result = self.runner.invoke(cli.identify, ["--type", "origin", url]) self.assertSWHID(result, "swh:1:ori:b63a575fe3faab7692c9f38fb09d4bb45651bb0f") def test_symlink(self): """identify symlink --- both itself and target""" regular = os.path.join(self.tmpdir_name, b"foo.txt") link = os.path.join(self.tmpdir_name, b"bar.txt") open(regular, "w").write("foo\n") os.symlink(os.path.basename(regular), link) result = self.runner.invoke(cli.identify, [link]) self.assertSWHID(result, "swh:1:cnt:257cc5642cb1a054f08cc83f2d943e56fd3ebe99") result = self.runner.invoke(cli.identify, ["--no-dereference", link]) self.assertSWHID(result, "swh:1:cnt:996f1789ff67c0e3f69ef5933a55d54c5d0e9954") def test_show_filename(self): """filename is shown by default""" self.make_contents(self.tmpdir_name) for filename, content in self.contents.items(): path = os.path.join(self.tmpdir_name, filename) result = self.runner.invoke(cli.identify, ["--type", "content", path]) self.assertEqual(result.exit_code, 0) self.assertEqual( result.output.rstrip(), "swh:1:cnt:%s\t%s" % (hash_to_hex(content["sha1_git"]), path.decode()), ) def test_hide_filename(self): """filename is hidden upon request""" self.make_contents(self.tmpdir_name) for filename, content in self.contents.items(): path = os.path.join(self.tmpdir_name, filename) result = self.runner.invoke( cli.identify, ["--type", "content", "--no-filename", path] ) self.assertSWHID(result, "swh:1:cnt:" + hash_to_hex(content["sha1_git"])) def test_auto_content(self): """automatic object type detection: content""" with tempfile.NamedTemporaryFile(prefix="swh.model.cli") as f: result = self.runner.invoke(cli.identify, [f.name]) self.assertEqual(result.exit_code, 0) self.assertRegex(result.output, r"^swh:\d+:cnt:") def test_auto_directory(self): """automatic object type detection: directory""" with tempfile.TemporaryDirectory(prefix="swh.model.cli") as dirname: result = self.runner.invoke(cli.identify, [dirname]) self.assertEqual(result.exit_code, 0) self.assertRegex(result.output, r"^swh:\d+:dir:") def test_auto_origin(self): """automatic object type detection: origin""" result = self.runner.invoke(cli.identify, ["https://github.com/torvalds/linux"]) - self.assertEqual(result.exit_code, 0) + self.assertEqual(result.exit_code, 0, result.output) self.assertRegex(result.output, r"^swh:\d+:ori:") def test_verify_content(self): """identifier verification""" self.make_contents(self.tmpdir_name) for filename, content in self.contents.items(): expected_id = "swh:1:cnt:" + hash_to_hex(content["sha1_git"]) # match path = os.path.join(self.tmpdir_name, filename) result = self.runner.invoke(cli.identify, ["--verify", expected_id, path]) - self.assertEqual(result.exit_code, 0) + self.assertEqual(result.exit_code, 0, result.output) # mismatch with open(path, "a") as f: f.write("trailing garbage to make verification fail") result = self.runner.invoke(cli.identify, ["--verify", expected_id, path]) self.assertEqual(result.exit_code, 1) def test_exclude(self): """exclude patterns""" self.make_from_tarball(self.tmpdir_name) path = os.path.join(self.tmpdir_name, b"sample-folder") excluded_dir = os.path.join(path, b"excluded_dir\x96") os.mkdir(excluded_dir) with open(os.path.join(excluded_dir, b"some_file"), "w") as f: f.write("content") result = self.runner.invoke( cli.identify, ["--type", "directory", "--exclude", "excluded_*", path] ) self.assertSWHID(result, "swh:1:dir:e8b0f1466af8608c8a3fb9879db172b887e80759")