diff --git a/swh/model/cli.py b/swh/model/cli.py
--- a/swh/model/cli.py
+++ b/swh/model/cli.py
@@ -1,128 +1,155 @@
-# Copyright (C) 2018 The Software Heritage developers
+# Copyright (C) 2018-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import click
 import os
 import sys
 
 from functools import partial
+from urllib.parse import urlparse
 
 from swh.model import identifiers as pids
 from swh.model.exceptions import ValidationError
 from swh.model.from_disk import Content, Directory
 
 
 CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
 
 
 class PidParamType(click.ParamType):
     name = 'persistent identifier'
 
     def convert(self, value, param, ctx):
         try:
             pids.parse_persistent_identifier(value)
             return value  # return as string, as we need just that
         except ValidationError as e:
             self.fail('%s is not a valid PID. %s.' % (value, e), param, ctx)
 
 
 def pid_of_file(path):
     object = Content.from_file(path=path).get_data()
     return pids.persistent_identifier(pids.CONTENT, object)
 
 
 def pid_of_dir(path):
     object = Directory.from_disk(path=path).get_data()
     return pids.persistent_identifier(pids.DIRECTORY, object)
 
 
+def pid_of_origin(url):
+    """Return the PID, as a string, of the origin with the given URL."""
+    pid = pids.PersistentId(object_type='origin',
+                            object_id=pids.origin_identifier({'url': url}))
+    return str(pid)
+
+
 def identify_object(obj_type, follow_symlinks, obj):
+    """Compute the PID of a single object given on the command line.
+
+    With obj_type 'auto', an existing file is identified as content, an
+    existing directory as directory, and any other argument that parses as
+    a URL with a scheme as origin.
+
+    Return the pair (obj, pid), with obj as originally given by the user.
+    Raise click.BadParameter on undetectable or unknown object type.
+
+    """
     if obj_type == 'auto':
         if os.path.isfile(obj):
             obj_type = 'content'
         elif os.path.isdir(obj):
             obj_type = 'directory'
-        else:  # shouldn't happen, due to path validation
-            raise click.BadParameter('%s is neither a file nor a directory' %
-                                     obj)
-
-    path = obj
-    if follow_symlinks and os.path.islink(obj):
-        path = os.path.realpath(obj)
+        else:
+            try:  # URL parsing
+                scheme = urlparse(obj).scheme
+            except ValueError:  # malformed URL (e.g., invalid IPv6 netloc)
+                scheme = ''
+            if not scheme:
+                raise click.BadParameter('cannot detect object type for %s' %
+                                         obj)
+            obj_type = 'origin'
 
     pid = None
-    if obj_type == 'content':
-        pid = pid_of_file(path)
-    elif obj_type == 'directory':
-        pid = pid_of_dir(path)
+
+    if obj_type in ['content', 'directory']:
+        # fsencode round-trips undecodable file names given on the command
+        # line; keep path as bytes also after symlink resolution
+        path = os.fsencode(obj)
+        if follow_symlinks and os.path.islink(path):
+            path = os.path.realpath(path)
+        if obj_type == 'content':
+            pid = pid_of_file(path)
+        elif obj_type == 'directory':
+            pid = pid_of_dir(path)
+    elif obj_type == 'origin':
+        pid = pid_of_origin(obj)
     else:  # shouldn't happen, due to option validation
         raise click.BadParameter('invalid object type: ' + obj_type)
 
     # note: we return original obj instead of path here, to preserve user-given
     # file name in output
     return (obj, pid)
 
 
 @click.command(context_settings=CONTEXT_SETTINGS)
 @click.option('--dereference/--no-dereference', 'follow_symlinks',
               default=True,
               help='follow (or not) symlinks for OBJECTS passed as arguments '
               + '(default: follow)')
 @click.option('--filename/--no-filename', 'show_filename', default=True,
               help='show/hide file name (default: show)')
 @click.option('--type', '-t', 'obj_type', default='auto',
-              type=click.Choice(['auto', 'content', 'directory']),
+              type=click.Choice(['auto', 'content', 'directory', 'origin']),
               help='type of object to identify (default: auto)')
 @click.option('--verify', '-v', metavar='PID', type=PidParamType(),
               help='reference identifier to be compared with computed one')
-@click.argument('objects', nargs=-1, required=True,
-                type=click.Path(exists=True, readable=True,
-                                allow_dash=True, path_type=bytes))
+@click.argument('objects', nargs=-1, required=True)
 def identify(obj_type, verify, show_filename, follow_symlinks, objects):
     """Compute the Software Heritage persistent identifier (PID) for the given
     source code object(s).
 
     For more details about Software Heritage PIDs see:
 
     \b
     https://docs.softwareheritage.org/devel/swh-model/persistent-identifiers.html
 
     \b
     Examples:
 
     \b
       $ swh identify fork.c kmod.c sched/deadline.c
       swh:1:cnt:2e391c754ae730bd2d8520c2ab497c403220c6e3    fork.c
       swh:1:cnt:0277d1216f80ae1adeed84a686ed34c9b2931fc2    kmod.c
       swh:1:cnt:57b939c81bce5d06fa587df8915f05affbe22b82    sched/deadline.c
 
     \b
       $ swh identify --no-filename /usr/src/linux/kernel/
       swh:1:dir:f9f858a48d663b3809c9e2f336412717496202ab
 
     """
     if verify and len(objects) != 1:
         raise click.BadParameter('verification requires a single object')
 
     results = map(partial(identify_object, obj_type, follow_symlinks), objects)
 
     if verify:
         pid = next(results)[1]
         if verify == pid:
             click.echo('PID match: %s' % pid)
             sys.exit(0)
         else:
             click.echo('PID mismatch: %s != %s' % (verify, pid))
             sys.exit(1)
     else:
         for (obj, pid) in results:
             msg = pid
             if show_filename:
                 msg = '%s\t%s' % (pid, os.fsdecode(obj))
             click.echo(msg)
 
 
 if __name__ == '__main__':
     identify()
diff --git a/swh/model/tests/test_cli.py b/swh/model/tests/test_cli.py
index e4232fe..7f70b46 100644
--- a/swh/model/tests/test_cli.py
+++ b/swh/model/tests/test_cli.py
@@ -1,116 +1,132 @@
 # Copyright (C) 2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import os
 import tempfile
 import unittest
 
 from click.testing import CliRunner
 import pytest
 
 from swh.model import cli
 from swh.model.hashutil import hash_to_hex
 from swh.model.tests.test_from_disk import DataMixin
 
 
 @pytest.mark.fs
 class TestIdentify(DataMixin, unittest.TestCase):
 
     def setUp(self):
         super().setUp()
         self.runner = CliRunner()
 
     def assertPidOK(self, result, pid):  # noqa: N802
         self.assertEqual(result.exit_code, 0)
         self.assertEqual(result.output.split()[0], pid)
 
     def test_content_id(self):
         """identify file content"""
         self.make_contents(self.tmpdir_name)
         for filename, content in self.contents.items():
             path = os.path.join(self.tmpdir_name, filename)
             result = self.runner.invoke(cli.identify,
                                         ['--type', 'content', path])
             self.assertPidOK(result,
                              'swh:1:cnt:' + hash_to_hex(content['sha1_git']))
 
     def test_directory_id(self):
         """identify an entire directory"""
         self.make_from_tarball(self.tmpdir_name)
         path = os.path.join(self.tmpdir_name, b'sample-folder')
         result = self.runner.invoke(cli.identify,
                                     ['--type', 'directory', path])
         self.assertPidOK(result,
                          'swh:1:dir:e8b0f1466af8608c8a3fb9879db172b887e80759')
 
+    def test_origin_id(self):
+        """identify an origin URL"""
+        url = 'https://github.com/torvalds/linux'
+        result = self.runner.invoke(cli.identify, ['--type', 'origin', url])
+        self.assertPidOK(result,
+                         'swh:1:ori:b63a575fe3faab7692c9f38fb09d4bb45651bb0f')
+
     def test_symlink(self):
         """identify symlink --- both itself and target"""
         regular = os.path.join(self.tmpdir_name, b'foo.txt')
         link = os.path.join(self.tmpdir_name, b'bar.txt')
         open(regular, 'w').write('foo\n')
         os.symlink(os.path.basename(regular), link)
 
         result = self.runner.invoke(cli.identify, [link])
         self.assertPidOK(result,
                          'swh:1:cnt:257cc5642cb1a054f08cc83f2d943e56fd3ebe99')
 
         result = self.runner.invoke(cli.identify, ['--no-dereference', link])
         self.assertPidOK(result,
                          'swh:1:cnt:996f1789ff67c0e3f69ef5933a55d54c5d0e9954')
 
     def test_show_filename(self):
         """filename is shown by default"""
         self.make_contents(self.tmpdir_name)
         for filename, content in self.contents.items():
             path = os.path.join(self.tmpdir_name, filename)
             result = self.runner.invoke(cli.identify,
                                         ['--type', 'content', path])
 
             self.assertEqual(result.exit_code, 0)
             self.assertEqual(result.output.rstrip(),
                              'swh:1:cnt:%s\t%s' %
                              (hash_to_hex(content['sha1_git']), path.decode()))
 
     def test_hide_filename(self):
         """filename is hidden upon request"""
         self.make_contents(self.tmpdir_name)
         for filename, content in self.contents.items():
             path = os.path.join(self.tmpdir_name, filename)
             result = self.runner.invoke(cli.identify,
                                         ['--type', 'content', '--no-filename',
                                          path])
             self.assertPidOK(result,
                              'swh:1:cnt:' + hash_to_hex(content['sha1_git']))
 
-    def test_auto_id(self):
-        """automatic object type: file or directory, depending on argument"""
+    def test_auto_content(self):
+        """automatic object type detection: content"""
         with tempfile.NamedTemporaryFile(prefix='swh.model.cli') as f:
             result = self.runner.invoke(cli.identify, [f.name])
             self.assertEqual(result.exit_code, 0)
             self.assertRegex(result.output, r'^swh:\d+:cnt:')
 
+    def test_auto_directory(self):
+        """automatic object type detection: directory"""
         with tempfile.TemporaryDirectory(prefix='swh.model.cli') as dirname:
             result = self.runner.invoke(cli.identify, [dirname])
             self.assertEqual(result.exit_code, 0)
             self.assertRegex(result.output, r'^swh:\d+:dir:')
 
+    def test_auto_origin(self):
+        """automatic object type detection: origin"""
+        result = self.runner.invoke(cli.identify,
+                                    ['https://github.com/torvalds/linux'])
+        self.assertEqual(result.exit_code, 0)
+        self.assertRegex(result.output, r'^swh:\d+:ori:')
+
     def test_verify_content(self):
         """identifier verification"""
         self.make_contents(self.tmpdir_name)
         for filename, content in self.contents.items():
             expected_id = 'swh:1:cnt:' + hash_to_hex(content['sha1_git'])
 
             # match
             path = os.path.join(self.tmpdir_name, filename)
             result = self.runner.invoke(cli.identify,
                                         ['--verify', expected_id, path])
             self.assertEqual(result.exit_code, 0)
 
             # mismatch
             with open(path, 'a') as f:
                 f.write('trailing garbage to make verification fail')
             result = self.runner.invoke(cli.identify,
                                         ['--verify', expected_id, path])
             self.assertEqual(result.exit_code, 1)