diff --git a/swh/model/tests/fields/test_compound.py b/swh/model/tests/fields/test_compound.py index b6e13b6..d89a3e1 100644 --- a/swh/model/tests/fields/test_compound.py +++ b/swh/model/tests/fields/test_compound.py @@ -1,241 +1,228 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import unittest -from nose.tools import istest - -from swh.model.exceptions import ValidationError, NON_FIELD_ERRORS +from swh.model.exceptions import NON_FIELD_ERRORS, ValidationError from swh.model.fields import compound, simple class ValidateCompound(unittest.TestCase): def setUp(self): def validate_always(model): return True def validate_never(model): return False self.test_model = 'test model' self.test_schema = { 'int': (True, simple.validate_int), 'str': (True, simple.validate_str), 'str2': (True, simple.validate_str), 'datetime': (False, simple.validate_datetime), NON_FIELD_ERRORS: validate_always, } self.test_schema_shortcut = self.test_schema.copy() self.test_schema_shortcut[NON_FIELD_ERRORS] = validate_never self.test_schema_field_failed = self.test_schema.copy() self.test_schema_field_failed['int'] = (True, [simple.validate_int, validate_never]) self.test_value = { 'str': 'value1', 'str2': 'value2', 'int': 42, 'datetime': datetime.datetime(1990, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc), } self.test_value_missing = { 'str': 'value1', } self.test_value_str_error = { 'str': 1984, 'str2': 'value2', 'int': 42, 'datetime': datetime.datetime(1990, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc), } self.test_value_missing_keys = {'int'} self.test_value_wrong_type = 42 self.present_keys = set(self.test_value) self.missing_keys = {'missingkey1', 'missingkey2'} - @istest - def validate_any_key(self): + def test_validate_any_key(self): self.assertTrue( 
compound.validate_any_key(self.test_value, self.present_keys)) self.assertTrue( compound.validate_any_key(self.test_value, self.present_keys | self.missing_keys)) - @istest - def validate_any_key_missing(self): + def test_validate_any_key_missing(self): with self.assertRaises(ValidationError) as cm: compound.validate_any_key(self.test_value, self.missing_keys) exc = cm.exception self.assertIsInstance(str(exc), str) self.assertEqual(exc.code, 'missing-alternative-field') self.assertEqual(exc.params['missing_fields'], ', '.join(sorted(self.missing_keys))) - @istest - def validate_all_keys(self): + def test_validate_all_keys(self): self.assertTrue( compound.validate_all_keys(self.test_value, self.present_keys)) - @istest - def validate_all_keys_missing(self): + def test_validate_all_keys_missing(self): with self.assertRaises(ValidationError) as cm: compound.validate_all_keys(self.test_value, self.missing_keys) exc = cm.exception self.assertIsInstance(str(exc), str) self.assertEqual(exc.code, 'missing-mandatory-field') self.assertEqual(exc.params['missing_fields'], ', '.join(sorted(self.missing_keys))) with self.assertRaises(ValidationError) as cm: compound.validate_all_keys(self.test_value, self.present_keys | self.missing_keys) exc = cm.exception self.assertIsInstance(str(exc), str) self.assertEqual(exc.code, 'missing-mandatory-field') self.assertEqual(exc.params['missing_fields'], ', '.join(sorted(self.missing_keys))) - @istest - def validate_against_schema(self): + def test_validate_against_schema(self): self.assertTrue( compound.validate_against_schema(self.test_model, self.test_schema, self.test_value)) - @istest - def validate_against_schema_wrong_type(self): + def test_validate_against_schema_wrong_type(self): with self.assertRaises(ValidationError) as cm: compound.validate_against_schema(self.test_model, self.test_schema, self.test_value_wrong_type) exc = cm.exception self.assertIsInstance(str(exc), str) self.assertEqual(exc.code, 'model-unexpected-type') 
self.assertEqual(exc.params['model'], self.test_model) self.assertEqual(exc.params['type'], self.test_value_wrong_type.__class__.__name__) - @istest - def validate_against_schema_mandatory_keys(self): + def test_validate_against_schema_mandatory_keys(self): with self.assertRaises(ValidationError) as cm: compound.validate_against_schema(self.test_model, self.test_schema, self.test_value_missing) # The exception should be of the form: # ValidationError({ # 'mandatory_key1': [ValidationError('model-field-mandatory')], # 'mandatory_key2': [ValidationError('model-field-mandatory')], # }) exc = cm.exception self.assertIsInstance(str(exc), str) for key in self.test_value_missing_keys: nested_key = exc.error_dict[key] self.assertIsInstance(nested_key, list) self.assertEqual(len(nested_key), 1) nested = nested_key[0] self.assertIsInstance(nested, ValidationError) self.assertEqual(nested.code, 'model-field-mandatory') self.assertEqual(nested.params['field'], key) - @istest - def validate_against_schema_whole_schema_shortcut_previous_error(self): + def test_validate_whole_schema_shortcut_previous_error(self): with self.assertRaises(ValidationError) as cm: compound.validate_against_schema( self.test_model, self.test_schema_shortcut, self.test_value_missing, ) exc = cm.exception self.assertIsInstance(str(exc), str) self.assertNotIn(NON_FIELD_ERRORS, exc.error_dict) - @istest - def validate_against_schema_whole_schema(self): + def test_validate_whole_schema(self): with self.assertRaises(ValidationError) as cm: compound.validate_against_schema( self.test_model, self.test_schema_shortcut, self.test_value, ) # The exception should be of the form: # ValidationError({ # NON_FIELD_ERRORS: [ValidationError('model-validation-failed')], # }) exc = cm.exception self.assertIsInstance(str(exc), str) self.assertEquals(set(exc.error_dict.keys()), {NON_FIELD_ERRORS}) non_field_errors = exc.error_dict[NON_FIELD_ERRORS] self.assertIsInstance(non_field_errors, list) 
self.assertEquals(len(non_field_errors), 1) nested = non_field_errors[0] self.assertIsInstance(nested, ValidationError) self.assertEquals(nested.code, 'model-validation-failed') self.assertEquals(nested.params['model'], self.test_model) self.assertEquals(nested.params['validator'], 'validate_never') - @istest - def validate_against_schema_field_error(self): + def test_validate_against_schema_field_error(self): with self.assertRaises(ValidationError) as cm: compound.validate_against_schema(self.test_model, self.test_schema, self.test_value_str_error) # The exception should be of the form: # ValidationError({ # 'str': [ValidationError('unexpected-type')], # }) exc = cm.exception self.assertIsInstance(str(exc), str) self.assertEquals(set(exc.error_dict.keys()), {'str'}) str_errors = exc.error_dict['str'] self.assertIsInstance(str_errors, list) self.assertEquals(len(str_errors), 1) nested = str_errors[0] self.assertIsInstance(nested, ValidationError) self.assertEquals(nested.code, 'unexpected-type') - @istest - def validate_against_schema_field_failed(self): + def test_validate_against_schema_field_failed(self): with self.assertRaises(ValidationError) as cm: compound.validate_against_schema(self.test_model, self.test_schema_field_failed, self.test_value) # The exception should be of the form: # ValidationError({ # 'int': [ValidationError('field-validation-failed')], # }) exc = cm.exception self.assertIsInstance(str(exc), str) self.assertEquals(set(exc.error_dict.keys()), {'int'}) int_errors = exc.error_dict['int'] self.assertIsInstance(int_errors, list) self.assertEquals(len(int_errors), 1) nested = int_errors[0] self.assertIsInstance(nested, ValidationError) self.assertEquals(nested.code, 'field-validation-failed') self.assertEquals(nested.params['validator'], 'validate_never') self.assertEquals(nested.params['field'], 'int') diff --git a/swh/model/tests/fields/test_hashes.py b/swh/model/tests/fields/test_hashes.py index 0ef303f..7ce0b78 100644 --- 
a/swh/model/tests/fields/test_hashes.py +++ b/swh/model/tests/fields/test_hashes.py @@ -1,162 +1,150 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest -from nose.tools import istest - from swh.model.exceptions import ValidationError from swh.model.fields import hashes class ValidateHashes(unittest.TestCase): def setUp(self): self.valid_byte_hashes = { 'sha1': b'\xf1\xd2\xd2\xf9\x24\xe9\x86\xac\x86\xfd\xf7\xb3\x6c\x94' b'\xbc\xdf\x32\xbe\xec\x15', 'sha1_git': b'\x25\x7c\xc5\x64\x2c\xb1\xa0\x54\xf0\x8c\xc8\x3f\x2d' b'\x94\x3e\x56\xfd\x3e\xbe\x99', 'sha256': b'\xb5\xbb\x9d\x80\x14\xa0\xf9\xb1\xd6\x1e\x21\xe7\x96' b'\xd7\x8d\xcc\xdf\x13\x52\xf2\x3c\xd3\x28\x12\xf4\x85' b'\x0b\x87\x8a\xe4\x94\x4c', } self.valid_str_hashes = { 'sha1': 'f1d2d2f924e986ac86fdf7b36c94bcdf32beec15', 'sha1_git': '257cc5642cb1a054f08cc83f2d943e56fd3ebe99', 'sha256': 'b5bb9d8014a0f9b1d61e21e796d78dccdf1352f23cd32812f485' '0b878ae4944c', } self.bad_hash = object() - @istest - def valid_bytes_hash(self): + def test_valid_bytes_hash(self): for hash_type, value in self.valid_byte_hashes.items(): self.assertTrue(hashes.validate_hash(value, hash_type)) - @istest - def valid_str_hash(self): + def test_valid_str_hash(self): for hash_type, value in self.valid_str_hashes.items(): self.assertTrue(hashes.validate_hash(value, hash_type)) - @istest - def invalid_hash_type(self): + def test_invalid_hash_type(self): hash_type = 'unknown_hash_type' with self.assertRaises(ValidationError) as cm: hashes.validate_hash(self.valid_str_hashes['sha1'], hash_type) exc = cm.exception self.assertIsInstance(str(exc), str) self.assertEqual(exc.code, 'unexpected-hash-type') self.assertEqual(exc.params['hash_type'], hash_type) self.assertIn('Unexpected hash type', str(exc)) self.assertIn(hash_type, str(exc)) - 
@istest - def invalid_bytes_len(self): + def test_invalid_bytes_len(self): for hash_type, value in self.valid_byte_hashes.items(): value = value + b'\x00\x01' with self.assertRaises(ValidationError) as cm: hashes.validate_hash(value, hash_type) exc = cm.exception self.assertIsInstance(str(exc), str) self.assertEqual(exc.code, 'unexpected-hash-length') self.assertEqual(exc.params['hash_type'], hash_type) self.assertEqual(exc.params['length'], len(value)) self.assertIn('Unexpected length', str(exc)) self.assertIn(str(len(value)), str(exc)) - @istest - def invalid_str_len(self): + def test_invalid_str_len(self): for hash_type, value in self.valid_str_hashes.items(): value = value + '0001' with self.assertRaises(ValidationError) as cm: hashes.validate_hash(value, hash_type) exc = cm.exception self.assertIsInstance(str(exc), str) self.assertEqual(exc.code, 'unexpected-hash-length') self.assertEqual(exc.params['hash_type'], hash_type) self.assertEqual(exc.params['length'], len(value)) self.assertIn('Unexpected length', str(exc)) self.assertIn(str(len(value)), str(exc)) - @istest - def invalid_str_contents(self): + def test_invalid_str_contents(self): for hash_type, value in self.valid_str_hashes.items(): value = '\xa2' + value[1:-1] + '\xc3' with self.assertRaises(ValidationError) as cm: hashes.validate_hash(value, hash_type) exc = cm.exception self.assertIsInstance(str(exc), str) self.assertEqual(exc.code, 'unexpected-hash-contents') self.assertEqual(exc.params['hash_type'], hash_type) self.assertEqual(exc.params['unexpected_chars'], '\xa2, \xc3') self.assertIn('Unexpected characters', str(exc)) self.assertIn('\xc3', str(exc)) self.assertIn('\xa2', str(exc)) - @istest - def invalid_value_type(self): + def test_invalid_value_type(self): with self.assertRaises(ValidationError) as cm: hashes.validate_hash(self.bad_hash, 'sha1') exc = cm.exception self.assertIsInstance(str(exc), str) self.assertEqual(exc.code, 'unexpected-hash-value-type') 
self.assertEqual(exc.params['type'], self.bad_hash.__class__.__name__) self.assertIn('Unexpected type', str(exc)) self.assertIn(self.bad_hash.__class__.__name__, str(exc)) - @istest - def validate_sha1(self): + def test_validate_sha1(self): self.assertTrue(hashes.validate_sha1(self.valid_byte_hashes['sha1'])) self.assertTrue(hashes.validate_sha1(self.valid_str_hashes['sha1'])) with self.assertRaises(ValidationError) as cm: hashes.validate_sha1(self.bad_hash) exc = cm.exception self.assertIsInstance(str(exc), str) self.assertEqual(exc.code, 'unexpected-hash-value-type') self.assertEqual(exc.params['type'], self.bad_hash.__class__.__name__) - @istest - def validate_sha1_git(self): + def test_validate_sha1_git(self): self.assertTrue( hashes.validate_sha1_git(self.valid_byte_hashes['sha1_git'])) self.assertTrue( hashes.validate_sha1_git(self.valid_str_hashes['sha1_git'])) with self.assertRaises(ValidationError) as cm: hashes.validate_sha1_git(self.bad_hash) exc = cm.exception self.assertIsInstance(str(exc), str) self.assertEqual(exc.code, 'unexpected-hash-value-type') self.assertEqual(exc.params['type'], self.bad_hash.__class__.__name__) - @istest - def validate_sha256(self): + def test_validate_sha256(self): self.assertTrue( hashes.validate_sha256(self.valid_byte_hashes['sha256'])) self.assertTrue( hashes.validate_sha256(self.valid_str_hashes['sha256'])) with self.assertRaises(ValidationError) as cm: hashes.validate_sha256(self.bad_hash) exc = cm.exception self.assertIsInstance(str(exc), str) self.assertEqual(exc.code, 'unexpected-hash-value-type') self.assertEqual(exc.params['type'], self.bad_hash.__class__.__name__) diff --git a/swh/model/tests/fields/test_simple.py b/swh/model/tests/fields/test_simple.py index 6fa2918..ab5e262 100644 --- a/swh/model/tests/fields/test_simple.py +++ b/swh/model/tests/fields/test_simple.py @@ -1,136 +1,123 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # 
License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import unittest -from nose.tools import istest - from swh.model.exceptions import ValidationError from swh.model.fields import simple class ValidateSimple(unittest.TestCase): def setUp(self): self.valid_str = 'I am a valid string' self.valid_bytes = b'I am a valid bytes object' self.enum_values = {'an enum value', 'other', 'and another'} self.invalid_enum_value = 'invalid enum value' self.valid_int = 42 self.valid_real = 42.42 self.valid_datetime = datetime.datetime(1999, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc) self.invalid_datetime_notz = datetime.datetime(1999, 1, 1, 12, 0, 0) - @istest - def validate_int(self): + def test_validate_int(self): self.assertTrue(simple.validate_int(self.valid_int)) - @istest - def validate_int_invalid_type(self): + def test_validate_int_invalid_type(self): with self.assertRaises(ValidationError) as cm: simple.validate_int(self.valid_str) exc = cm.exception self.assertIsInstance(str(exc), str) self.assertEqual(exc.code, 'unexpected-type') self.assertEqual(exc.params['expected_type'], 'Integral') self.assertEqual(exc.params['type'], 'str') - @istest - def validate_str(self): + def test_validate_str(self): self.assertTrue(simple.validate_str(self.valid_str)) - @istest - def validate_str_invalid_type(self): + def test_validate_str_invalid_type(self): with self.assertRaises(ValidationError) as cm: simple.validate_str(self.valid_int) exc = cm.exception self.assertIsInstance(str(exc), str) self.assertEqual(exc.code, 'unexpected-type') self.assertEqual(exc.params['expected_type'], 'str') self.assertEqual(exc.params['type'], 'int') with self.assertRaises(ValidationError) as cm: simple.validate_str(self.valid_bytes) exc = cm.exception self.assertIsInstance(str(exc), str) self.assertEqual(exc.code, 'unexpected-type') self.assertEqual(exc.params['expected_type'], 'str') self.assertEqual(exc.params['type'], 
'bytes') - @istest - def validate_bytes(self): + def test_validate_bytes(self): self.assertTrue(simple.validate_bytes(self.valid_bytes)) - @istest - def validate_bytes_invalid_type(self): + def test_validate_bytes_invalid_type(self): with self.assertRaises(ValidationError) as cm: simple.validate_bytes(self.valid_int) exc = cm.exception self.assertIsInstance(str(exc), str) self.assertEqual(exc.code, 'unexpected-type') self.assertEqual(exc.params['expected_type'], 'bytes') self.assertEqual(exc.params['type'], 'int') with self.assertRaises(ValidationError) as cm: simple.validate_bytes(self.valid_str) exc = cm.exception self.assertIsInstance(str(exc), str) self.assertEqual(exc.code, 'unexpected-type') self.assertEqual(exc.params['expected_type'], 'bytes') self.assertEqual(exc.params['type'], 'str') - @istest - def validate_datetime(self): + def test_validate_datetime(self): self.assertTrue(simple.validate_datetime(self.valid_datetime)) self.assertTrue(simple.validate_datetime(self.valid_int)) self.assertTrue(simple.validate_datetime(self.valid_real)) - @istest - def validate_datetime_invalid_type(self): + def test_validate_datetime_invalid_type(self): with self.assertRaises(ValidationError) as cm: simple.validate_datetime(self.valid_str) exc = cm.exception self.assertIsInstance(str(exc), str) self.assertEqual(exc.code, 'unexpected-type') self.assertEqual(exc.params['expected_type'], 'one of datetime, Real') self.assertEqual(exc.params['type'], 'str') - @istest - def validate_datetime_invalide_tz(self): + def test_validate_datetime_invalide_tz(self): with self.assertRaises(ValidationError) as cm: simple.validate_datetime(self.invalid_datetime_notz) exc = cm.exception self.assertIsInstance(str(exc), str) self.assertEqual(exc.code, 'datetime-without-tzinfo') - @istest - def validate_enum(self): + def test_validate_enum(self): for value in self.enum_values: self.assertTrue(simple.validate_enum(value, self.enum_values)) - @istest - def validate_enum_invalid_value(self): + 
def test_validate_enum_invalid_value(self): with self.assertRaises(ValidationError) as cm: simple.validate_enum(self.invalid_enum_value, self.enum_values) exc = cm.exception self.assertIsInstance(str(exc), str) self.assertEqual(exc.code, 'unexpected-value') self.assertEqual(exc.params['value'], self.invalid_enum_value) self.assertEqual(exc.params['expected_values'], ', '.join(sorted(self.enum_values))) diff --git a/swh/model/tests/test_cli.py b/swh/model/tests/test_cli.py index 9e31a4a..5d20874 100644 --- a/swh/model/tests/test_cli.py +++ b/swh/model/tests/test_cli.py @@ -1,116 +1,116 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import tempfile import unittest from click.testing import CliRunner from nose.plugins.attrib import attr from swh.model import cli -from swh.model.tests.test_from_disk import DataMixin from swh.model.hashutil import hash_to_hex +from swh.model.tests.test_from_disk import DataMixin @attr('fs') class TestIdentify(DataMixin, unittest.TestCase): def setUp(self): super().setUp() self.runner = CliRunner() - def assertPidOK(self, result, pid): + def assertPidOK(self, result, pid): # noqa: N802 self.assertEqual(result.exit_code, 0) self.assertEqual(result.output.split()[0], pid) def test_content_id(self): """identify file content""" self.make_contents(self.tmpdir_name) for filename, content in self.contents.items(): path = os.path.join(self.tmpdir_name, filename) result = self.runner.invoke(cli.identify, ['--type', 'content', path]) self.assertPidOK(result, 'swh:1:cnt:' + hash_to_hex(content['sha1_git'])) def test_directory_id(self): """identify an entire directory""" self.make_from_tarball(self.tmpdir_name) path = os.path.join(self.tmpdir_name, b'sample-folder') result = self.runner.invoke(cli.identify, ['--type', 'directory', path]) 
self.assertPidOK(result, 'swh:1:dir:e8b0f1466af8608c8a3fb9879db172b887e80759') def test_symlink(self): """identify symlink --- both itself and target""" regular = os.path.join(self.tmpdir_name, b'foo.txt') link = os.path.join(self.tmpdir_name, b'bar.txt') open(regular, 'w').write('foo\n') os.symlink(os.path.basename(regular), link) result = self.runner.invoke(cli.identify, [link]) self.assertPidOK(result, 'swh:1:cnt:257cc5642cb1a054f08cc83f2d943e56fd3ebe99') result = self.runner.invoke(cli.identify, ['--no-dereference', link]) self.assertPidOK(result, 'swh:1:cnt:996f1789ff67c0e3f69ef5933a55d54c5d0e9954') def test_show_filename(self): """filename is shown by default""" self.make_contents(self.tmpdir_name) for filename, content in self.contents.items(): path = os.path.join(self.tmpdir_name, filename) result = self.runner.invoke(cli.identify, ['--type', 'content', path]) self.assertEqual(result.exit_code, 0) self.assertEqual(result.output.rstrip(), 'swh:1:cnt:%s\t%s' % (hash_to_hex(content['sha1_git']), path.decode())) def test_hide_filename(self): """filename is hidden upon request""" self.make_contents(self.tmpdir_name) for filename, content in self.contents.items(): path = os.path.join(self.tmpdir_name, filename) result = self.runner.invoke(cli.identify, ['--type', 'content', '--no-filename', path]) self.assertPidOK(result, 'swh:1:cnt:' + hash_to_hex(content['sha1_git'])) def test_auto_id(self): """automatic object type: file or directory, depending on argument""" with tempfile.NamedTemporaryFile(prefix='swh.model.cli') as f: result = self.runner.invoke(cli.identify, [f.name]) self.assertEqual(result.exit_code, 0) self.assertRegex(result.output, r'^swh:\d+:cnt:') with tempfile.TemporaryDirectory(prefix='swh.model.cli') as dirname: result = self.runner.invoke(cli.identify, [dirname]) self.assertEqual(result.exit_code, 0) self.assertRegex(result.output, r'^swh:\d+:dir:') def test_verify_content(self): """identifier verification""" self.make_contents(self.tmpdir_name) 
for filename, content in self.contents.items(): expected_id = 'swh:1:cnt:' + hash_to_hex(content['sha1_git']) # match path = os.path.join(self.tmpdir_name, filename) result = self.runner.invoke(cli.identify, ['--verify', expected_id, path]) self.assertEqual(result.exit_code, 0) # mismatch with open(path, 'a') as f: f.write('trailing garbage to make verification fail') result = self.runner.invoke(cli.identify, ['--verify', expected_id, path]) self.assertEqual(result.exit_code, 1) diff --git a/swh/model/tests/test_from_disk.py b/swh/model/tests/test_from_disk.py index 432b193..2e6c395 100644 --- a/swh/model/tests/test_from_disk.py +++ b/swh/model/tests/test_from_disk.py @@ -1,788 +1,787 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import tarfile import tempfile import unittest from nose.plugins.attrib import attr from swh.model import from_disk -from swh.model.from_disk import Content, Directory, DentryPerms +from swh.model.from_disk import Content, DentryPerms, Directory from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex - TEST_DATA = os.path.join(os.path.dirname(__file__), 'data') class ModeToPerms(unittest.TestCase): def setUp(self): super().setUp() # Generate a full permissions map self.perms_map = {} # Symlinks for i in range(0o120000, 0o127777 + 1): self.perms_map[i] = DentryPerms.symlink # Directories for i in range(0o040000, 0o047777 + 1): self.perms_map[i] = DentryPerms.directory # Other file types: socket, regular file, block device, character # device, fifo all map to regular files for ft in [0o140000, 0o100000, 0o060000, 0o020000, 0o010000]: for i in range(ft, ft + 0o7777 + 1): if i & 0o111: # executable bits are set self.perms_map[i] = DentryPerms.executable_content else: self.perms_map[i] = DentryPerms.content def 
test_exhaustive_mode_to_perms(self): for fmode, perm in self.perms_map.items(): self.assertEqual(perm, from_disk.mode_to_perms(fmode)) class DataMixin: maxDiff = None def setUp(self): self.tmpdir = tempfile.TemporaryDirectory( prefix='swh.model.from_disk' ) self.tmpdir_name = os.fsencode(self.tmpdir.name) self.contents = { b'file': { 'data': b'42\n', 'sha1': hash_to_bytes( '34973274ccef6ab4dfaaf86599792fa9c3fe4689' ), 'sha256': hash_to_bytes( '084c799cd551dd1d8d5c5f9a5d593b2e' '931f5e36122ee5c793c1d08a19839cc0' ), 'sha1_git': hash_to_bytes( 'd81cc0710eb6cf9efd5b920a8453e1e07157b6cd'), 'blake2s256': hash_to_bytes( 'd5fe1939576527e42cfd76a9455a2432' 'fe7f56669564577dd93c4280e76d661d' ), 'length': 3, 'mode': 0o100644 }, } self.symlinks = { b'symlink': { 'data': b'target', 'blake2s256': hash_to_bytes( '595d221b30fdd8e10e2fdf18376e688e' '9f18d56fd9b6d1eb6a822f8c146c6da6' ), 'sha1': hash_to_bytes( '0e8a3ad980ec179856012b7eecf4327e99cd44cd' ), 'sha1_git': hash_to_bytes( '1de565933b05f74c75ff9a6520af5f9f8a5a2f1d' ), 'sha256': hash_to_bytes( '34a04005bcaf206eec990bd9637d9fdb' '6725e0a0c0d4aebf003f17f4c956eb5c' ), 'length': 6, 'perms': DentryPerms.symlink, } } self.specials = { b'fifo': os.mkfifo, b'devnull': lambda path: os.mknod(path, device=os.makedev(1, 3)), } self.empty_content = { 'data': b'', 'length': 0, 'blake2s256': hash_to_bytes( '69217a3079908094e11121d042354a7c' '1f55b6482ca1a51e1b250dfd1ed0eef9' ), 'sha1': hash_to_bytes( 'da39a3ee5e6b4b0d3255bfef95601890afd80709' ), 'sha1_git': hash_to_bytes( 'e69de29bb2d1d6434b8b29ae775ad8c2e48c5391' ), 'sha256': hash_to_bytes( 'e3b0c44298fc1c149afbf4c8996fb924' '27ae41e4649b934ca495991b7852b855' ), 'perms': DentryPerms.content, } self.empty_directory = { 'id': hash_to_bytes( '4b825dc642cb6eb9a060e54bf8d69288fbee4904' ), 'entries': [], } # Generated with generate_testdata_from_disk self.tarball_contents = { b'': { 'entries': [{ 'name': b'bar', 'perms': DentryPerms.directory, 'target': hash_to_bytes( 
'3c1f578394f4623f74a0ba7fe761729f59fc6ec4' ), 'type': 'dir', }, { 'name': b'empty-folder', 'perms': DentryPerms.directory, 'target': hash_to_bytes( '4b825dc642cb6eb9a060e54bf8d69288fbee4904' ), 'type': 'dir', }, { 'name': b'foo', 'perms': DentryPerms.directory, 'target': hash_to_bytes( '2b41c40f0d1fbffcba12497db71fba83fcca96e5' ), 'type': 'dir', }, { 'name': b'link-to-another-quote', 'perms': DentryPerms.symlink, 'target': hash_to_bytes( '7d5c08111e21c8a9f71540939998551683375fad' ), 'type': 'file', }, { 'name': b'link-to-binary', 'perms': DentryPerms.symlink, 'target': hash_to_bytes( 'e86b45e538d9b6888c969c89fbd22a85aa0e0366' ), 'type': 'file', }, { 'name': b'link-to-foo', 'perms': DentryPerms.symlink, 'target': hash_to_bytes( '19102815663d23f8b75a47e7a01965dcdc96468c' ), 'type': 'file', }, { 'name': b'some-binary', 'perms': DentryPerms.executable_content, 'target': hash_to_bytes( '68769579c3eaadbe555379b9c3538e6628bae1eb' ), 'type': 'file', }], 'id': hash_to_bytes( 'e8b0f1466af8608c8a3fb9879db172b887e80759' ), }, b'bar': { 'entries': [{ 'name': b'barfoo', 'perms': DentryPerms.directory, 'target': hash_to_bytes( 'c3020f6bf135a38c6df3afeb5fb38232c5e07087' ), 'type': 'dir', }], 'id': hash_to_bytes( '3c1f578394f4623f74a0ba7fe761729f59fc6ec4' ), }, b'bar/barfoo': { 'entries': [{ 'name': b'another-quote.org', 'perms': DentryPerms.content, 'target': hash_to_bytes( '133693b125bad2b4ac318535b84901ebb1f6b638' ), 'type': 'file', }], 'id': hash_to_bytes( 'c3020f6bf135a38c6df3afeb5fb38232c5e07087' ), }, b'bar/barfoo/another-quote.org': { 'blake2s256': hash_to_bytes( 'd26c1cad82d43df0bffa5e7be11a60e3' '4adb85a218b433cbce5278b10b954fe8' ), 'length': 72, 'perms': DentryPerms.content, 'sha1': hash_to_bytes( '90a6138ba59915261e179948386aa1cc2aa9220a' ), 'sha1_git': hash_to_bytes( '133693b125bad2b4ac318535b84901ebb1f6b638' ), 'sha256': hash_to_bytes( '3db5ae168055bcd93a4d08285dc99ffe' 'e2883303b23fac5eab850273a8ea5546' ), }, b'empty-folder': { 'entries': [], 'id': hash_to_bytes( 
'4b825dc642cb6eb9a060e54bf8d69288fbee4904' ), }, b'foo': { 'entries': [{ 'name': b'barfoo', 'perms': DentryPerms.symlink, 'target': hash_to_bytes( '8185dfb2c0c2c597d16f75a8a0c37668567c3d7e' ), 'type': 'file', }, { 'name': b'quotes.md', 'perms': DentryPerms.content, 'target': hash_to_bytes( '7c4c57ba9ff496ad179b8f65b1d286edbda34c9a' ), 'type': 'file', }, { 'name': b'rel-link-to-barfoo', 'perms': DentryPerms.symlink, 'target': hash_to_bytes( 'acac326ddd63b0bc70840659d4ac43619484e69f' ), 'type': 'file', }], 'id': hash_to_bytes( '2b41c40f0d1fbffcba12497db71fba83fcca96e5' ), }, b'foo/barfoo': { 'blake2s256': hash_to_bytes( 'e1252f2caa4a72653c4efd9af871b62b' 'f2abb7bb2f1b0e95969204bd8a70d4cd' ), 'data': b'bar/barfoo', 'length': 10, 'perms': DentryPerms.symlink, 'sha1': hash_to_bytes( '9057ee6d0162506e01c4d9d5459a7add1fedac37' ), 'sha1_git': hash_to_bytes( '8185dfb2c0c2c597d16f75a8a0c37668567c3d7e' ), 'sha256': hash_to_bytes( '29ad3f5725321b940332c78e403601af' 'ff61daea85e9c80b4a7063b6887ead68' ), }, b'foo/quotes.md': { 'blake2s256': hash_to_bytes( 'bf7ce4fe304378651ee6348d3e9336ed' '5ad603d33e83c83ba4e14b46f9b8a80b' ), 'length': 66, 'perms': DentryPerms.content, 'sha1': hash_to_bytes( '1bf0bb721ac92c18a19b13c0eb3d741cbfadebfc' ), 'sha1_git': hash_to_bytes( '7c4c57ba9ff496ad179b8f65b1d286edbda34c9a' ), 'sha256': hash_to_bytes( 'caca942aeda7b308859eb56f909ec96d' '07a499491690c453f73b9800a93b1659' ), }, b'foo/rel-link-to-barfoo': { 'blake2s256': hash_to_bytes( 'd9c327421588a1cf61f316615005a2e9' 'c13ac3a4e96d43a24138d718fa0e30db' ), 'data': b'../bar/barfoo', 'length': 13, 'perms': DentryPerms.symlink, 'sha1': hash_to_bytes( 'dc51221d308f3aeb2754db48391b85687c2869f4' ), 'sha1_git': hash_to_bytes( 'acac326ddd63b0bc70840659d4ac43619484e69f' ), 'sha256': hash_to_bytes( '8007d20db2af40435f42ddef4b8ad76b' '80adbec26b249fdf0473353f8d99df08' ), }, b'link-to-another-quote': { 'blake2s256': hash_to_bytes( '2d0e73cea01ba949c1022dc10c8a43e6' '6180639662e5dc2737b843382f7b1910' ), 'data': 
b'bar/barfoo/another-quote.org', 'length': 28, 'perms': DentryPerms.symlink, 'sha1': hash_to_bytes( 'cbeed15e79599c90de7383f420fed7acb48ea171' ), 'sha1_git': hash_to_bytes( '7d5c08111e21c8a9f71540939998551683375fad' ), 'sha256': hash_to_bytes( 'e6e17d0793aa750a0440eb9ad5b80b25' '8076637ef0fb68f3ac2e59e4b9ac3ba6' ), }, b'link-to-binary': { 'blake2s256': hash_to_bytes( '9ce18b1adecb33f891ca36664da676e1' '2c772cc193778aac9a137b8dc5834b9b' ), 'data': b'some-binary', 'length': 11, 'perms': DentryPerms.symlink, 'sha1': hash_to_bytes( 'd0248714948b3a48a25438232a6f99f0318f59f1' ), 'sha1_git': hash_to_bytes( 'e86b45e538d9b6888c969c89fbd22a85aa0e0366' ), 'sha256': hash_to_bytes( '14126e97d83f7d261c5a6889cee73619' '770ff09e40c5498685aba745be882eff' ), }, b'link-to-foo': { 'blake2s256': hash_to_bytes( '08d6cad88075de8f192db097573d0e82' '9411cd91eb6ec65e8fc16c017edfdb74' ), 'data': b'foo', 'length': 3, 'perms': DentryPerms.symlink, 'sha1': hash_to_bytes( '0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33' ), 'sha1_git': hash_to_bytes( '19102815663d23f8b75a47e7a01965dcdc96468c' ), 'sha256': hash_to_bytes( '2c26b46b68ffc68ff99b453c1d304134' '13422d706483bfa0f98a5e886266e7ae' ), }, b'some-binary': { 'blake2s256': hash_to_bytes( '922e0f7015035212495b090c27577357' 'a740ddd77b0b9e0cd23b5480c07a18c6' ), 'length': 5, 'perms': DentryPerms.executable_content, 'sha1': hash_to_bytes( '0bbc12d7f4a2a15b143da84617d95cb223c9b23c' ), 'sha1_git': hash_to_bytes( '68769579c3eaadbe555379b9c3538e6628bae1eb' ), 'sha256': hash_to_bytes( 'bac650d34a7638bb0aeb5342646d24e3' 'b9ad6b44c9b383621faa482b990a367d' ), }, } def tearDown(self): self.tmpdir.cleanup() def assertContentEqual(self, left, right, *, check_data=False, # noqa check_path=False): if not isinstance(left, Content): raise ValueError('%s is not a Content' % left) if isinstance(right, Content): right = right.get_data() keys = DEFAULT_ALGORITHMS | { 'length', 'perms', } if check_data: keys |= {'data'} if check_path: keys |= {'path'} failed = [] for key 
in keys: try: lvalue = left.data[key] if key == 'perms' and 'perms' not in right: rvalue = from_disk.mode_to_perms(right['mode']) else: rvalue = right[key] except KeyError: failed.append(key) continue if lvalue != rvalue: failed.append(key) if failed: raise self.failureException( 'Content mismatched:\n' + '\n'.join( 'content[%s] = %r != %r' % ( key, left.data.get(key), right.get(key)) for key in failed ) ) def assertDirectoryEqual(self, left, right): # NoQA if not isinstance(left, Directory): raise ValueError('%s is not a Directory' % left) if isinstance(right, Directory): right = right.get_data() return self.assertCountEqual(left.entries, right['entries']) def make_contents(self, directory): for filename, content in self.contents.items(): path = os.path.join(directory, filename) with open(path, 'wb') as f: f.write(content['data']) os.chmod(path, content['mode']) def make_symlinks(self, directory): for filename, symlink in self.symlinks.items(): path = os.path.join(directory, filename) os.symlink(symlink['data'], path) def make_specials(self, directory): for filename, fn in self.specials.items(): path = os.path.join(directory, filename) fn(path) def make_from_tarball(self, directory): tarball = os.path.join(TEST_DATA, 'dir-folders', 'sample-folder.tgz') with tarfile.open(tarball, 'r:gz') as f: f.extractall(os.fsdecode(directory)) class TestContent(DataMixin, unittest.TestCase): def setUp(self): super().setUp() def test_data_to_content(self): for filename, content in self.contents.items(): conv_content = Content.from_bytes(mode=content['mode'], data=content['data']) self.assertContentEqual(conv_content, content) self.assertIn(hash_to_hex(conv_content.hash), repr(conv_content)) class SymlinkToContent(DataMixin, unittest.TestCase): def setUp(self): super().setUp() self.make_symlinks(self.tmpdir_name) def test_symlink_to_content(self): for filename, symlink in self.symlinks.items(): path = os.path.join(self.tmpdir_name, filename) perms = 0o120000 conv_content = 
Content.from_symlink(path=path, mode=perms) self.assertContentEqual(conv_content, symlink) class FileToContent(DataMixin, unittest.TestCase): def setUp(self): super().setUp() self.make_contents(self.tmpdir_name) self.make_symlinks(self.tmpdir_name) self.make_specials(self.tmpdir_name) def test_file_to_content(self): # Check whether loading the data works for data in [True, False]: for filename, symlink in self.symlinks.items(): path = os.path.join(self.tmpdir_name, filename) conv_content = Content.from_file(path=path, data=data) self.assertContentEqual(conv_content, symlink, check_data=data) for filename, content in self.contents.items(): path = os.path.join(self.tmpdir_name, filename) conv_content = Content.from_file(path=path, data=data) self.assertContentEqual(conv_content, content, check_data=data) for filename in self.specials: path = os.path.join(self.tmpdir_name, filename) conv_content = Content.from_file(path=path, data=data) self.assertContentEqual(conv_content, self.empty_content) def test_file_to_content_with_path(self): for filename, content in self.contents.items(): content_w_path = content.copy() path = os.path.join(self.tmpdir_name, filename) content_w_path['path'] = path conv_content = Content.from_file(path=path, save_path=True) self.assertContentEqual(conv_content, content_w_path, check_path=True) class DirectoryToObjects(DataMixin, unittest.TestCase): def setUp(self): super().setUp() contents = os.path.join(self.tmpdir_name, b'contents') os.mkdir(contents) self.make_contents(contents) symlinks = os.path.join(self.tmpdir_name, b'symlinks') os.mkdir(symlinks) self.make_symlinks(symlinks) specials = os.path.join(self.tmpdir_name, b'specials') os.mkdir(specials) self.make_specials(specials) empties = os.path.join(self.tmpdir_name, b'empty1', b'empty2') os.makedirs(empties) def test_directory_to_objects(self): directory = Directory.from_disk(path=self.tmpdir_name) for name, value in self.contents.items(): self.assertContentEqual(directory[b'contents/' 
+ name], value) for name, value in self.symlinks.items(): self.assertContentEqual(directory[b'symlinks/' + name], value) for name in self.specials: self.assertContentEqual( directory[b'specials/' + name], self.empty_content, ) self.assertEqual( directory[b'empty1/empty2'].get_data(), self.empty_directory, ) # Raise on non existent file with self.assertRaisesRegex(KeyError, "b'nonexistent'"): directory[b'empty1/nonexistent'] # Raise on non existent directory with self.assertRaisesRegex(KeyError, "b'nonexistentdir'"): directory[b'nonexistentdir/file'] objs = directory.collect() self.assertCountEqual(['content', 'directory'], objs) self.assertEqual(len(objs['directory']), 6) self.assertEqual(len(objs['content']), len(self.contents) + len(self.symlinks) + 1) def test_directory_to_objects_ignore_empty(self): directory = Directory.from_disk( path=self.tmpdir_name, dir_filter=from_disk.ignore_empty_directories ) for name, value in self.contents.items(): self.assertContentEqual(directory[b'contents/' + name], value) for name, value in self.symlinks.items(): self.assertContentEqual(directory[b'symlinks/' + name], value) for name in self.specials: self.assertContentEqual( directory[b'specials/' + name], self.empty_content, ) # empty directories have been ignored recursively with self.assertRaisesRegex(KeyError, "b'empty1'"): directory[b'empty1'] with self.assertRaisesRegex(KeyError, "b'empty1'"): directory[b'empty1/empty2'] objs = directory.collect() self.assertCountEqual(['content', 'directory'], objs) self.assertEqual(len(objs['directory']), 4) self.assertEqual(len(objs['content']), len(self.contents) + len(self.symlinks) + 1) def test_directory_to_objects_ignore_name(self): directory = Directory.from_disk( path=self.tmpdir_name, dir_filter=from_disk.ignore_named_directories([b'symlinks']) ) for name, value in self.contents.items(): self.assertContentEqual(directory[b'contents/' + name], value) for name in self.specials: self.assertContentEqual( directory[b'specials/' + 
name], self.empty_content, ) self.assertEqual( directory[b'empty1/empty2'].get_data(), self.empty_directory, ) with self.assertRaisesRegex(KeyError, "b'symlinks'"): directory[b'symlinks'] objs = directory.collect() self.assertCountEqual(['content', 'directory'], objs) self.assertEqual(len(objs['directory']), 5) self.assertEqual(len(objs['content']), len(self.contents) + 1) def test_directory_to_objects_ignore_name_case(self): directory = Directory.from_disk( path=self.tmpdir_name, dir_filter=from_disk.ignore_named_directories([b'symLiNks'], case_sensitive=False) ) for name, value in self.contents.items(): self.assertContentEqual(directory[b'contents/' + name], value) for name in self.specials: self.assertContentEqual( directory[b'specials/' + name], self.empty_content, ) self.assertEqual( directory[b'empty1/empty2'].get_data(), self.empty_directory, ) with self.assertRaisesRegex(KeyError, "b'symlinks'"): directory[b'symlinks'] objs = directory.collect() self.assertCountEqual(['content', 'directory'], objs) self.assertEqual(len(objs['directory']), 5) self.assertEqual(len(objs['content']), len(self.contents) + 1) @attr('fs') class TarballTest(DataMixin, unittest.TestCase): def setUp(self): super().setUp() self.make_from_tarball(self.tmpdir_name) def test_contents_match(self): directory = Directory.from_disk( path=os.path.join(self.tmpdir_name, b'sample-folder') ) for name, data in self.tarball_contents.items(): obj = directory[name] if isinstance(obj, Content): self.assertContentEqual(obj, data) elif isinstance(obj, Directory): self.assertDirectoryEqual(obj, data) else: raise self.failureException('Unknown type for %s' % obj) class DirectoryManipulation(DataMixin, unittest.TestCase): def test_directory_access_nested(self): d = Directory() d[b'a'] = Directory() d[b'a/b'] = Directory() self.assertEqual(d[b'a/b'].get_data(), self.empty_directory) def test_directory_del_nested(self): d = Directory() d[b'a'] = Directory() d[b'a/b'] = Directory() with 
self.assertRaisesRegex(KeyError, "b'c'"): del d[b'a/b/c'] with self.assertRaisesRegex(KeyError, "b'level2'"): del d[b'a/level2/c'] del d[b'a/b'] self.assertEqual(d[b'a'].get_data(), self.empty_directory) def test_directory_access_self(self): d = Directory() self.assertIs(d, d[b'']) self.assertIs(d, d[b'/']) self.assertIs(d, d[b'//']) def test_directory_access_wrong_type(self): d = Directory() with self.assertRaisesRegex(ValueError, 'bytes from Directory'): d['foo'] with self.assertRaisesRegex(ValueError, 'bytes from Directory'): d[42] def test_directory_repr(self): entries = [b'a', b'b', b'c'] d = Directory() for entry in entries: d[entry] = Directory() r = repr(d) self.assertIn(hash_to_hex(d.hash), r) for entry in entries: self.assertIn(str(entry), r) def test_directory_set_wrong_type_name(self): d = Directory() with self.assertRaisesRegex(ValueError, 'bytes Directory entry'): d['foo'] = Directory() with self.assertRaisesRegex(ValueError, 'bytes Directory entry'): d[42] = Directory() def test_directory_set_nul_in_name(self): d = Directory() with self.assertRaisesRegex(ValueError, 'nul bytes'): d[b'\x00\x01'] = Directory() def test_directory_set_empty_name(self): d = Directory() with self.assertRaisesRegex(ValueError, 'must have a name'): d[b''] = Directory() with self.assertRaisesRegex(ValueError, 'must have a name'): d[b'/'] = Directory() def test_directory_set_wrong_type(self): d = Directory() with self.assertRaisesRegex(ValueError, 'Content or Directory'): d[b'entry'] = object() def test_directory_del_wrong_type(self): d = Directory() with self.assertRaisesRegex(ValueError, 'bytes Directory entry'): del d['foo'] with self.assertRaisesRegex(ValueError, 'bytes Directory entry'): del d[42] diff --git a/swh/model/tests/test_hashutil.py b/swh/model/tests/test_hashutil.py index 94a66dd..a7796ef 100644 --- a/swh/model/tests/test_hashutil.py +++ b/swh/model/tests/test_hashutil.py @@ -1,413 +1,379 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the 
AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import hashlib import io import os import tempfile import unittest - -from nose.tools import istest from unittest.mock import patch from swh.model import hashutil from swh.model.hashutil import MultiHash class BaseHashutil(unittest.TestCase): def setUp(self): # Reset function cache hashutil._blake2_hash_cache = {} self.data = b'1984\n' self.hex_checksums = { 'sha1': '62be35bf00ff0c624f4a621e2ea5595a049e0731', 'sha1_git': '568aaf43d83b2c3df8067f3bedbb97d83260be6d', 'sha256': '26602113b4b9afd9d55466b08580d3c2' '4a9b50ee5b5866c0d91fab0e65907311', 'blake2s256': '63cfb259e1fdb485bc5c55749697a6b21ef31fb7445f6c78a' 'c9422f9f2dc8906', } self.checksums = { type: bytes.fromhex(cksum) for type, cksum in self.hex_checksums.items() } self.bytehex_checksums = { type: hashutil.hash_to_bytehex(cksum) for type, cksum in self.checksums.items() } self.git_hex_checksums = { 'blob': self.hex_checksums['sha1_git'], 'tree': '5b2e883aa33d2efab98442693ea4dd5f1b8871b0', 'commit': '79e4093542e72f0fcb7cbd75cb7d270f9254aa8f', 'tag': 'd6bf62466f287b4d986c545890716ce058bddf67', } self.git_checksums = { type: bytes.fromhex(cksum) for type, cksum in self.git_hex_checksums.items() } class MultiHashTest(BaseHashutil): - @istest - def multi_hash_data(self): + def test_multi_hash_data(self): checksums = MultiHash.from_data(self.data).digest() self.assertEqual(checksums, self.checksums) self.assertFalse('length' in checksums) - @istest - def multi_hash_data_with_length(self): + def test_multi_hash_data_with_length(self): expected_checksums = self.checksums.copy() expected_checksums['length'] = len(self.data) algos = set(['length']).union(hashutil.DEFAULT_ALGORITHMS) checksums = MultiHash.from_data(self.data, hash_names=algos).digest() self.assertEqual(checksums, expected_checksums) self.assertTrue('length' in checksums) - 
@istest - def multi_hash_data_unknown_hash(self): + def test_multi_hash_data_unknown_hash(self): with self.assertRaises(ValueError) as cm: MultiHash.from_data(self.data, ['unknown-hash']) self.assertIn('Unexpected hashing algorithm', cm.exception.args[0]) self.assertIn('unknown-hash', cm.exception.args[0]) - @istest - def multi_hash_file(self): + def test_multi_hash_file(self): fobj = io.BytesIO(self.data) checksums = MultiHash.from_file(fobj, length=len(self.data)).digest() self.assertEqual(checksums, self.checksums) - @istest - def multi_hash_file_hexdigest(self): + def test_multi_hash_file_hexdigest(self): fobj = io.BytesIO(self.data) length = len(self.data) checksums = MultiHash.from_file(fobj, length=length).hexdigest() self.assertEqual(checksums, self.hex_checksums) - @istest - def multi_hash_file_bytehexdigest(self): + def test_multi_hash_file_bytehexdigest(self): fobj = io.BytesIO(self.data) length = len(self.data) checksums = MultiHash.from_file(fobj, length=length).bytehexdigest() self.assertEqual(checksums, self.bytehex_checksums) - @istest - def multi_hash_file_missing_length(self): + def test_multi_hash_file_missing_length(self): fobj = io.BytesIO(self.data) with self.assertRaises(ValueError) as cm: MultiHash.from_file(fobj, hash_names=['sha1_git']) self.assertIn('Missing length', cm.exception.args[0]) - @istest - def multi_hash_path(self): + def test_multi_hash_path(self): with tempfile.NamedTemporaryFile(delete=False) as f: f.write(self.data) hashes = MultiHash.from_path(f.name).digest() os.remove(f.name) self.assertEquals(self.checksums, hashes) class Hashutil(BaseHashutil): - @istest - def hash_data(self): + def test_hash_data(self): checksums = hashutil.hash_data(self.data) self.assertEqual(checksums, self.checksums) self.assertFalse('length' in checksums) - @istest - def hash_data_with_length(self): + def test_hash_data_with_length(self): expected_checksums = self.checksums.copy() expected_checksums['length'] = len(self.data) algos = 
set(['length']).union(hashutil.DEFAULT_ALGORITHMS) checksums = hashutil.hash_data(self.data, algorithms=algos) self.assertEqual(checksums, expected_checksums) self.assertTrue('length' in checksums) - @istest - def hash_data_unknown_hash(self): + def test_hash_data_unknown_hash(self): with self.assertRaises(ValueError) as cm: hashutil.hash_data(self.data, ['unknown-hash']) self.assertIn('Unexpected hashing algorithm', cm.exception.args[0]) self.assertIn('unknown-hash', cm.exception.args[0]) - @istest - def hash_git_data(self): + def test_hash_git_data(self): checksums = { git_type: hashutil.hash_git_data(self.data, git_type) for git_type in self.git_checksums } self.assertEqual(checksums, self.git_checksums) - @istest - def hash_git_data_unknown_git_type(self): + def test_hash_git_data_unknown_git_type(self): with self.assertRaises(ValueError) as cm: hashutil.hash_git_data(self.data, 'unknown-git-type') self.assertIn('Unexpected git object type', cm.exception.args[0]) self.assertIn('unknown-git-type', cm.exception.args[0]) - @istest - def hash_file(self): + def test_hash_file(self): fobj = io.BytesIO(self.data) checksums = hashutil.hash_file(fobj, length=len(self.data)) self.assertEqual(checksums, self.checksums) - @istest - def hash_file_missing_length(self): + def test_hash_file_missing_length(self): fobj = io.BytesIO(self.data) with self.assertRaises(ValueError) as cm: hashutil.hash_file(fobj, algorithms=['sha1_git']) self.assertIn('Missing length', cm.exception.args[0]) - @istest - def hash_path(self): + def test_hash_path(self): with tempfile.NamedTemporaryFile(delete=False) as f: f.write(self.data) hashes = hashutil.hash_path(f.name) os.remove(f.name) self.checksums['length'] = len(self.data) self.assertEquals(self.checksums, hashes) - @istest - def hash_to_hex(self): + def test_hash_to_hex(self): for type in self.checksums: hex = self.hex_checksums[type] hash = self.checksums[type] self.assertEquals(hashutil.hash_to_hex(hex), hex) 
self.assertEquals(hashutil.hash_to_hex(hash), hex) - @istest - def hash_to_bytes(self): + def test_hash_to_bytes(self): for type in self.checksums: hex = self.hex_checksums[type] hash = self.checksums[type] self.assertEquals(hashutil.hash_to_bytes(hex), hash) self.assertEquals(hashutil.hash_to_bytes(hash), hash) - @istest - def hash_to_bytehex(self): + def test_hash_to_bytehex(self): for algo in self.checksums: self.assertEqual(self.hex_checksums[algo].encode('ascii'), hashutil.hash_to_bytehex(self.checksums[algo])) - @istest - def bytehex_to_hash(self): + def test_bytehex_to_hash(self): for algo in self.checksums: self.assertEqual(self.checksums[algo], hashutil.bytehex_to_hash( self.hex_checksums[algo].encode())) - @istest - def new_hash_unsupported_hashing_algorithm(self): + def test_new_hash_unsupported_hashing_algorithm(self): try: hashutil._new_hash('blake2:10') except ValueError as e: self.assertEquals(str(e), 'Unexpected hashing algorithm blake2:10, ' 'expected one of blake2b512, blake2s256, ' 'sha1, sha1_git, sha256') @patch('hashlib.new') - @istest - def new_hash_blake2b_blake2b512_builtin(self, mock_hashlib_new): + def test_new_hash_blake2b_blake2b512_builtin(self, mock_hashlib_new): if 'blake2b512' not in hashlib.algorithms_available: self.skipTest('blake2b512 not built-in') mock_hashlib_new.return_value = sentinel = object() h = hashutil._new_hash('blake2b512') self.assertIs(h, sentinel) mock_hashlib_new.assert_called_with('blake2b512') @patch('hashlib.new') - @istest - def new_hash_blake2s_blake2s256_builtin(self, mock_hashlib_new): + def test_new_hash_blake2s_blake2s256_builtin(self, mock_hashlib_new): if 'blake2s256' not in hashlib.algorithms_available: self.skipTest('blake2s256 not built-in') mock_hashlib_new.return_value = sentinel = object() h = hashutil._new_hash('blake2s256') self.assertIs(h, sentinel) mock_hashlib_new.assert_called_with('blake2s256') - @istest - def new_hash_blake2b_builtin(self): + def test_new_hash_blake2b_builtin(self): 
removed_hash = False try: if 'blake2b512' in hashlib.algorithms_available: removed_hash = True hashlib.algorithms_available.remove('blake2b512') if 'blake2b' not in hashlib.algorithms_available: self.skipTest('blake2b not built in') with patch('hashlib.blake2b') as mock_blake2b: mock_blake2b.return_value = sentinel = object() h = hashutil._new_hash('blake2b512') self.assertIs(h, sentinel) mock_blake2b.assert_called_with(digest_size=512//8) finally: if removed_hash: hashlib.algorithms_available.add('blake2b512') - @istest - def new_hash_blake2s_builtin(self): + def test_new_hash_blake2s_builtin(self): removed_hash = False try: if 'blake2s256' in hashlib.algorithms_available: removed_hash = True hashlib.algorithms_available.remove('blake2s256') if 'blake2s' not in hashlib.algorithms_available: self.skipTest('blake2s not built in') with patch('hashlib.blake2s') as mock_blake2s: mock_blake2s.return_value = sentinel = object() h = hashutil._new_hash('blake2s256') self.assertIs(h, sentinel) mock_blake2s.assert_called_with(digest_size=256//8) finally: if removed_hash: hashlib.algorithms_available.add('blake2s256') - @istest - def new_hash_blake2b_pyblake2(self): + def test_new_hash_blake2b_pyblake2(self): if 'blake2b512' in hashlib.algorithms_available: self.skipTest('blake2b512 built in') if 'blake2b' in hashlib.algorithms_available: self.skipTest('blake2b built in') with patch('pyblake2.blake2b') as mock_blake2b: mock_blake2b.return_value = sentinel = object() h = hashutil._new_hash('blake2b512') self.assertIs(h, sentinel) mock_blake2b.assert_called_with(digest_size=512//8) - @istest - def new_hash_blake2s_pyblake2(self): + def test_new_hash_blake2s_pyblake2(self): if 'blake2s256' in hashlib.algorithms_available: self.skipTest('blake2s256 built in') if 'blake2s' in hashlib.algorithms_available: self.skipTest('blake2s built in') with patch('pyblake2.blake2s') as mock_blake2s: mock_blake2s.return_value = sentinel = object() h = hashutil._new_hash('blake2s256') 
self.assertIs(h, sentinel) mock_blake2s.assert_called_with(digest_size=256//8) class HashlibGit(unittest.TestCase): def setUp(self): self.blob_data = b'42\n' self.tree_data = b''.join([b'40000 barfoo\0', bytes.fromhex('c3020f6bf135a38c6df' '3afeb5fb38232c5e07087'), b'100644 blah\0', bytes.fromhex('63756ef0df5e4f10b6efa' '33cfe5c758749615f20'), b'100644 hello\0', bytes.fromhex('907b308167f0880fb2a' '5c0e1614bb0c7620f9dc3')]) self.commit_data = """tree 1c61f7259dcb770f46b194d941df4f08ff0a3970 author Antoine R. Dumont (@ardumont) 1444054085 +0200 committer Antoine R. Dumont (@ardumont) 1444054085 +0200 initial """.encode('utf-8') # NOQA self.tag_data = """object 24d012aaec0bc5a4d2f62c56399053d6cc72a241 type commit tag 0.0.1 tagger Antoine R. Dumont (@ardumont) 1444225145 +0200 blah """.encode('utf-8') # NOQA self.checksums = { 'blob_sha1_git': bytes.fromhex('d81cc0710eb6cf9efd5b920a8453e1' 'e07157b6cd'), 'tree_sha1_git': bytes.fromhex('ac212302c45eada382b27bfda795db' '121dacdb1c'), 'commit_sha1_git': bytes.fromhex('e960570b2e6e2798fa4cfb9af2c399' 'd629189653'), 'tag_sha1_git': bytes.fromhex('bc2b99ba469987bcf1272c189ed534' 'e9e959f120'), } - @istest - def unknown_header_type(self): + def test_unknown_header_type(self): with self.assertRaises(ValueError) as cm: hashutil.hash_git_data(b'any-data', 'some-unknown-type') self.assertIn('Unexpected git object type', cm.exception.args[0]) - @istest - def hashdata_content(self): + def test_hashdata_content(self): # when actual_hash = hashutil.hash_git_data(self.blob_data, git_type='blob') # then self.assertEqual(actual_hash, self.checksums['blob_sha1_git']) - @istest - def hashdata_tree(self): + def test_hashdata_tree(self): # when actual_hash = hashutil.hash_git_data(self.tree_data, git_type='tree') # then self.assertEqual(actual_hash, self.checksums['tree_sha1_git']) - @istest - def hashdata_revision(self): + def test_hashdata_revision(self): # when actual_hash = hashutil.hash_git_data(self.commit_data, git_type='commit') # 
then self.assertEqual(actual_hash, self.checksums['commit_sha1_git']) - @istest - def hashdata_tag(self): + def test_hashdata_tag(self): # when actual_hash = hashutil.hash_git_data(self.tag_data, git_type='tag') # then self.assertEqual(actual_hash, self.checksums['tag_sha1_git']) diff --git a/swh/model/tests/test_identifiers.py b/swh/model/tests/test_identifiers.py index 6658608..6496f06 100644 --- a/swh/model/tests/test_identifiers.py +++ b/swh/model/tests/test_identifiers.py @@ -1,919 +1,894 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import binascii import datetime import unittest -from nose.tools import istest - from swh.model import hashutil, identifiers - from swh.model.exceptions import ValidationError -from swh.model.identifiers import SNAPSHOT, RELEASE, REVISION, DIRECTORY -from swh.model.identifiers import CONTENT, PERSISTENT_IDENTIFIER_TYPES -from swh.model.identifiers import PersistentId +from swh.model.identifiers import (CONTENT, DIRECTORY, + PERSISTENT_IDENTIFIER_TYPES, RELEASE, + REVISION, SNAPSHOT, PersistentId) class UtilityFunctionsIdentifier(unittest.TestCase): def setUp(self): self.str_id = 'c2e41aae41ac17bd4a650770d6ee77f62e52235b' self.bytes_id = binascii.unhexlify(self.str_id) self.bad_type_id = object() - @istest - def identifier_to_bytes(self): + def test_identifier_to_bytes(self): for id in [self.str_id, self.bytes_id]: self.assertEqual(identifiers.identifier_to_bytes(id), self.bytes_id) # wrong length with self.assertRaises(ValueError) as cm: identifiers.identifier_to_bytes(id[:-2]) self.assertIn('length', str(cm.exception)) with self.assertRaises(ValueError) as cm: identifiers.identifier_to_bytes(self.bad_type_id) self.assertIn('type', str(cm.exception)) - @istest - def identifier_to_str(self): + def test_identifier_to_str(self): 
for id in [self.str_id, self.bytes_id]: self.assertEqual(identifiers.identifier_to_str(id), self.str_id) # wrong length with self.assertRaises(ValueError) as cm: identifiers.identifier_to_str(id[:-2]) self.assertIn('length', str(cm.exception)) with self.assertRaises(ValueError) as cm: identifiers.identifier_to_str(self.bad_type_id) self.assertIn('type', str(cm.exception)) class UtilityFunctionsDateOffset(unittest.TestCase): def setUp(self): self.dates = { b'1448210036': { 'seconds': 1448210036, 'microseconds': 0, }, b'1448210036.002342': { 'seconds': 1448210036, 'microseconds': 2342, }, b'1448210036.12': { 'seconds': 1448210036, 'microseconds': 120000, } } self.broken_dates = [ 1448210036.12, ] self.offsets = { 0: b'+0000', -630: b'-1030', 800: b'+1320', } - @istest - def format_date(self): + def test_format_date(self): for date_repr, date in self.dates.items(): self.assertEqual(identifiers.format_date(date), date_repr) - @istest - def format_date_fail(self): + def test_format_date_fail(self): for date in self.broken_dates: with self.assertRaises(ValueError): identifiers.format_date(date) - @istest - def format_offset(self): + def test_format_offset(self): for offset, res in self.offsets.items(): self.assertEqual(identifiers.format_offset(offset), res) class ContentIdentifier(unittest.TestCase): def setUp(self): self.content = { 'status': 'visible', 'length': 5, 'data': b'1984\n', 'ctime': datetime.datetime(2015, 11, 22, 16, 33, 56, tzinfo=datetime.timezone.utc), } self.content_id = hashutil.hash_data(self.content['data']) - @istest - def content_identifier(self): + def test_content_identifier(self): self.assertEqual(identifiers.content_identifier(self.content), self.content_id) class DirectoryIdentifier(unittest.TestCase): def setUp(self): self.directory = { 'id': 'c2e41aae41ac17bd4a650770d6ee77f62e52235b', 'entries': [ { 'type': 'file', 'perms': 33188, 'name': b'README', 'target': '37ec8ea2110c0b7a32fbb0e872f6e7debbf95e21' }, { 'type': 'file', 'perms': 33188, 
'name': b'Rakefile', 'target': '3bb0e8592a41ae3185ee32266c860714980dbed7' }, { 'type': 'dir', 'perms': 16384, 'name': b'app', 'target': '61e6e867f5d7ba3b40540869bc050b0c4fed9e95' }, { 'type': 'file', 'perms': 33188, 'name': b'1.megabyte', 'target': '7c2b2fbdd57d6765cdc9d84c2d7d333f11be7fb3' }, { 'type': 'dir', 'perms': 16384, 'name': b'config', 'target': '591dfe784a2e9ccc63aaba1cb68a765734310d98' }, { 'type': 'dir', 'perms': 16384, 'name': b'public', 'target': '9588bf4522c2b4648bfd1c61d175d1f88c1ad4a5' }, { 'type': 'file', 'perms': 33188, 'name': b'development.sqlite3', 'target': 'e69de29bb2d1d6434b8b29ae775ad8c2e48c5391' }, { 'type': 'dir', 'perms': 16384, 'name': b'doc', 'target': '154705c6aa1c8ead8c99c7915373e3c44012057f' }, { 'type': 'dir', 'perms': 16384, 'name': b'db', 'target': '85f157bdc39356b7bc7de9d0099b4ced8b3b382c' }, { 'type': 'dir', 'perms': 16384, 'name': b'log', 'target': '5e3d3941c51cce73352dff89c805a304ba96fffe' }, { 'type': 'dir', 'perms': 16384, 'name': b'script', 'target': '1b278423caf176da3f3533592012502aa10f566c' }, { 'type': 'dir', 'perms': 16384, 'name': b'test', 'target': '035f0437c080bfd8711670b3e8677e686c69c763' }, { 'type': 'dir', 'perms': 16384, 'name': b'vendor', 'target': '7c0dc9ad978c1af3f9a4ce061e50f5918bd27138' }, { 'type': 'rev', 'perms': 57344, 'name': b'will_paginate', 'target': '3d531e169db92a16a9a8974f0ae6edf52e52659e' } ], } self.empty_directory = { 'id': '4b825dc642cb6eb9a060e54bf8d69288fbee4904', 'entries': [], } - @istest - def dir_identifier(self): + def test_dir_identifier(self): self.assertEqual( identifiers.directory_identifier(self.directory), self.directory['id']) - @istest - def dir_identifier_empty_directory(self): + def test_dir_identifier_empty_directory(self): self.assertEqual( identifiers.directory_identifier(self.empty_directory), self.empty_directory['id']) class RevisionIdentifier(unittest.TestCase): def setUp(self): linus_tz = datetime.timezone(datetime.timedelta(minutes=-420)) gpgsig = b'''\ -----BEGIN 
PGP SIGNATURE----- Version: GnuPG v1.4.13 (Darwin) iQIcBAABAgAGBQJVJcYsAAoJEBiY3kIkQRNJVAUQAJ8/XQIfMqqC5oYeEFfHOPYZ L7qy46bXHVBa9Qd8zAJ2Dou3IbI2ZoF6/Et89K/UggOycMlt5FKV/9toWyuZv4Po L682wonoxX99qvVTHo6+wtnmYO7+G0f82h+qHMErxjP+I6gzRNBvRr+SfY7VlGdK wikMKOMWC5smrScSHITnOq1Ews5pe3N7qDYMzK0XVZmgDoaem4RSWMJs4My/qVLN e0CqYWq2A22GX7sXl6pjneJYQvcAXUX+CAzp24QnPSb+Q22Guj91TcxLFcHCTDdn qgqMsEyMiisoglwrCbO+D+1xq9mjN9tNFWP66SQ48mrrHYTBV5sz9eJyDfroJaLP CWgbDTgq6GzRMehHT3hXfYS5NNatjnhkNISXR7pnVP/obIi/vpWh5ll6Gd8q26z+ a/O41UzOaLTeNI365MWT4/cnXohVLRG7iVJbAbCxoQmEgsYMRc/pBAzWJtLfcB2G jdTswYL6+MUdL8sB9pZ82D+BP/YAdHe69CyTu1lk9RT2pYtI/kkfjHubXBCYEJSG +VGllBbYG6idQJpyrOYNRJyrDi9yvDJ2W+S0iQrlZrxzGBVGTB/y65S8C+2WTBcE lf1Qb5GDsQrZWgD+jtWTywOYHtCBwyCKSAXxSARMbNPeak9WPlcW/Jmu+fUcMe2x dg1KdHOa34shrKDaOVzW =od6m -----END PGP SIGNATURE-----''' self.revision = { 'id': 'bc0195aad0daa2ad5b0d76cce22b167bc3435590', 'directory': '85a74718d377195e1efd0843ba4f3260bad4fe07', 'parents': ['01e2d0627a9a6edb24c37db45db5ecb31e9de808'], 'author': { 'name': b'Linus Torvalds', 'email': b'torvalds@linux-foundation.org', }, 'date': datetime.datetime(2015, 7, 12, 15, 10, 30, tzinfo=linus_tz), 'committer': { 'name': b'Linus Torvalds', 'email': b'torvalds@linux-foundation.org', }, 'committer_date': datetime.datetime(2015, 7, 12, 15, 10, 30, tzinfo=linus_tz), 'message': b'Linux 4.2-rc2\n', } self.revision_none_metadata = { 'id': 'bc0195aad0daa2ad5b0d76cce22b167bc3435590', 'directory': '85a74718d377195e1efd0843ba4f3260bad4fe07', 'parents': ['01e2d0627a9a6edb24c37db45db5ecb31e9de808'], 'author': { 'name': b'Linus Torvalds', 'email': b'torvalds@linux-foundation.org', }, 'date': datetime.datetime(2015, 7, 12, 15, 10, 30, tzinfo=linus_tz), 'committer': { 'name': b'Linus Torvalds', 'email': b'torvalds@linux-foundation.org', }, 'committer_date': datetime.datetime(2015, 7, 12, 15, 10, 30, tzinfo=linus_tz), 'message': b'Linux 4.2-rc2\n', 'metadata': None, } self.synthetic_revision = { 'id': 
b'\xb2\xa7\xe1&\x04\x92\xe3D\xfa\xb3\xcb\xf9\x1b\xc1<\x91' b'\xe0T&\xfd', 'author': { 'name': b'Software Heritage', 'email': b'robot@softwareheritage.org', }, 'date': { 'timestamp': {'seconds': 1437047495}, 'offset': 0, 'negative_utc': False, }, 'type': 'tar', 'committer': { 'name': b'Software Heritage', 'email': b'robot@softwareheritage.org', }, 'committer_date': 1437047495, 'synthetic': True, 'parents': [None], 'message': b'synthetic revision message\n', 'directory': b'\xd1\x1f\x00\xa6\xa0\xfe\xa6\x05SA\xd2U\x84\xb5\xa9' b'e\x16\xc0\xd2\xb8', 'metadata': {'original_artifact': [ {'archive_type': 'tar', 'name': 'gcc-5.2.0.tar.bz2', 'sha1_git': '39d281aff934d44b439730057e55b055e206a586', 'sha1': 'fe3f5390949d47054b613edc36c557eb1d51c18e', 'sha256': '5f835b04b5f7dd4f4d2dc96190ec1621b8d89f' '2dc6f638f9f8bc1b1014ba8cad'}]}, } # cat commit.txt | git hash-object -t commit --stdin self.revision_with_extra_headers = { 'id': '010d34f384fa99d047cdd5e2f41e56e5c2feee45', 'directory': '85a74718d377195e1efd0843ba4f3260bad4fe07', 'parents': ['01e2d0627a9a6edb24c37db45db5ecb31e9de808'], 'author': { 'name': b'Linus Torvalds', 'email': b'torvalds@linux-foundation.org', 'fullname': b'Linus Torvalds ', }, 'date': datetime.datetime(2015, 7, 12, 15, 10, 30, tzinfo=linus_tz), 'committer': { 'name': b'Linus Torvalds', 'email': b'torvalds@linux-foundation.org', 'fullname': b'Linus Torvalds ', }, 'committer_date': datetime.datetime(2015, 7, 12, 15, 10, 30, tzinfo=linus_tz), 'message': b'Linux 4.2-rc2\n', 'metadata': { 'extra_headers': [ ['svn-repo-uuid', '046f1af7-66c2-d61b-5410-ce57b7db7bff'], ['svn-revision', 10], ] } } self.revision_with_gpgsig = { 'id': '44cc742a8ca17b9c279be4cc195a93a6ef7a320e', 'directory': 'b134f9b7dc434f593c0bab696345548b37de0558', 'parents': ['689664ae944b4692724f13b709a4e4de28b54e57', 'c888305e1efbaa252d01b4e5e6b778f865a97514'], 'author': { 'name': b'Jiang Xin', 'email': b'worldhello.net@gmail.com', 'fullname': b'Jiang Xin ', }, 'date': { 'timestamp': 1428538899, 
'offset': 480, }, 'committer': { 'name': b'Jiang Xin', 'email': b'worldhello.net@gmail.com', }, 'committer_date': { 'timestamp': 1428538899, 'offset': 480, }, 'metadata': { 'extra_headers': [ ['gpgsig', gpgsig], ], }, 'message': b'''Merge branch 'master' of git://github.com/alexhenrie/git-po * 'master' of git://github.com/alexhenrie/git-po: l10n: ca.po: update translation ''' } self.revision_no_message = { 'id': '4cfc623c9238fa92c832beed000ce2d003fd8333', 'directory': 'b134f9b7dc434f593c0bab696345548b37de0558', 'parents': ['689664ae944b4692724f13b709a4e4de28b54e57', 'c888305e1efbaa252d01b4e5e6b778f865a97514'], 'author': { 'name': b'Jiang Xin', 'email': b'worldhello.net@gmail.com', 'fullname': b'Jiang Xin ', }, 'date': { 'timestamp': 1428538899, 'offset': 480, }, 'committer': { 'name': b'Jiang Xin', 'email': b'worldhello.net@gmail.com', }, 'committer_date': { 'timestamp': 1428538899, 'offset': 480, }, 'message': None, } self.revision_empty_message = { 'id': '7442cd78bd3b4966921d6a7f7447417b7acb15eb', 'directory': 'b134f9b7dc434f593c0bab696345548b37de0558', 'parents': ['689664ae944b4692724f13b709a4e4de28b54e57', 'c888305e1efbaa252d01b4e5e6b778f865a97514'], 'author': { 'name': b'Jiang Xin', 'email': b'worldhello.net@gmail.com', 'fullname': b'Jiang Xin ', }, 'date': { 'timestamp': 1428538899, 'offset': 480, }, 'committer': { 'name': b'Jiang Xin', 'email': b'worldhello.net@gmail.com', }, 'committer_date': { 'timestamp': 1428538899, 'offset': 480, }, 'message': b'', } self.revision_only_fullname = { 'id': '010d34f384fa99d047cdd5e2f41e56e5c2feee45', 'directory': '85a74718d377195e1efd0843ba4f3260bad4fe07', 'parents': ['01e2d0627a9a6edb24c37db45db5ecb31e9de808'], 'author': { 'fullname': b'Linus Torvalds ', }, 'date': datetime.datetime(2015, 7, 12, 15, 10, 30, tzinfo=linus_tz), 'committer': { 'fullname': b'Linus Torvalds ', }, 'committer_date': datetime.datetime(2015, 7, 12, 15, 10, 30, tzinfo=linus_tz), 'message': b'Linux 4.2-rc2\n', 'metadata': { 'extra_headers': [ 
['svn-repo-uuid', '046f1af7-66c2-d61b-5410-ce57b7db7bff'], ['svn-revision', 10], ] } } - @istest - def revision_identifier(self): + def test_revision_identifier(self): self.assertEqual( identifiers.revision_identifier(self.revision), identifiers.identifier_to_str(self.revision['id']), ) - @istest - def revision_identifier_none_metadata(self): + def test_revision_identifier_none_metadata(self): self.assertEqual( identifiers.revision_identifier(self.revision_none_metadata), identifiers.identifier_to_str(self.revision_none_metadata['id']), ) - @istest - def revision_identifier_synthetic(self): + def test_revision_identifier_synthetic(self): self.assertEqual( identifiers.revision_identifier(self.synthetic_revision), identifiers.identifier_to_str(self.synthetic_revision['id']), ) - @istest - def revision_identifier_with_extra_headers(self): + def test_revision_identifier_with_extra_headers(self): self.assertEqual( identifiers.revision_identifier( self.revision_with_extra_headers), identifiers.identifier_to_str( self.revision_with_extra_headers['id']), ) - @istest - def revision_identifier_with_gpgsig(self): + def test_revision_identifier_with_gpgsig(self): self.assertEqual( identifiers.revision_identifier( self.revision_with_gpgsig), identifiers.identifier_to_str( self.revision_with_gpgsig['id']), ) - @istest - def revision_identifier_no_message(self): + def test_revision_identifier_no_message(self): self.assertEqual( identifiers.revision_identifier( self.revision_no_message), identifiers.identifier_to_str( self.revision_no_message['id']), ) - @istest - def revision_identifier_empty_message(self): + def test_revision_identifier_empty_message(self): self.assertEqual( identifiers.revision_identifier( self.revision_empty_message), identifiers.identifier_to_str( self.revision_empty_message['id']), ) - @istest - def revision_identifier_only_fullname(self): + def test_revision_identifier_only_fullname(self): self.assertEqual( identifiers.revision_identifier( 
self.revision_only_fullname), identifiers.identifier_to_str( self.revision_only_fullname['id']), ) class ReleaseIdentifier(unittest.TestCase): def setUp(self): linus_tz = datetime.timezone(datetime.timedelta(minutes=-420)) self.release = { 'id': '2b10839e32c4c476e9d94492756bb1a3e1ec4aa8', 'target': b't\x1b"R\xa5\xe1Ml`\xa9\x13\xc7z`\x99\xab\xe7:\x85J', 'target_type': 'revision', 'name': b'v2.6.14', 'author': { 'name': b'Linus Torvalds', 'email': b'torvalds@g5.osdl.org', }, 'date': datetime.datetime(2005, 10, 27, 17, 2, 33, tzinfo=linus_tz), 'message': b'''\ Linux 2.6.14 release -----BEGIN PGP SIGNATURE----- Version: GnuPG v1.4.1 (GNU/Linux) iD8DBQBDYWq6F3YsRnbiHLsRAmaeAJ9RCez0y8rOBbhSv344h86l/VVcugCeIhO1 wdLOnvj91G4wxYqrvThthbE= =7VeT -----END PGP SIGNATURE----- ''', 'synthetic': False, } self.release_no_author = { 'id': b'&y\x1a\x8b\xcf\x0em3\xf4:\xefv\x82\xbd\xb5U#mV\xde', 'target': '9ee1c939d1cb936b1f98e8d81aeffab57bae46ab', 'target_type': 'revision', 'name': b'v2.6.12', 'message': b'''\ This is the final 2.6.12 release -----BEGIN PGP SIGNATURE----- Version: GnuPG v1.2.4 (GNU/Linux) iD8DBQBCsykyF3YsRnbiHLsRAvPNAJ482tCZwuxp/bJRz7Q98MHlN83TpACdHr37 o6X/3T+vm8K3bf3driRr34c= =sBHn -----END PGP SIGNATURE----- ''', 'synthetic': False, } self.release_no_message = { 'id': 'b6f4f446715f7d9543ef54e41b62982f0db40045', 'target': '9ee1c939d1cb936b1f98e8d81aeffab57bae46ab', 'target_type': 'revision', 'name': b'v2.6.12', 'author': { 'name': b'Linus Torvalds', 'email': b'torvalds@g5.osdl.org', }, 'date': datetime.datetime(2005, 10, 27, 17, 2, 33, tzinfo=linus_tz), 'message': None, } self.release_empty_message = { 'id': '71a0aea72444d396575dc25ac37fec87ee3c6492', 'target': '9ee1c939d1cb936b1f98e8d81aeffab57bae46ab', 'target_type': 'revision', 'name': b'v2.6.12', 'author': { 'name': b'Linus Torvalds', 'email': b'torvalds@g5.osdl.org', }, 'date': datetime.datetime(2005, 10, 27, 17, 2, 33, tzinfo=linus_tz), 'message': b'', } self.release_negative_utc = { 'id': 
'97c8d2573a001f88e72d75f596cf86b12b82fd01', 'name': b'20081029', 'target': '54e9abca4c77421e2921f5f156c9fe4a9f7441c7', 'target_type': 'revision', 'date': { 'timestamp': {'seconds': 1225281976}, 'offset': 0, 'negative_utc': True, }, 'author': { 'name': b'Otavio Salvador', 'email': b'otavio@debian.org', 'id': 17640, }, 'synthetic': False, 'message': b'tagging version 20081029\n\nr56558\n', } self.release_newline_in_author = { 'author': { 'email': b'esycat@gmail.com', 'fullname': b'Eugene Janusov\n', 'name': b'Eugene Janusov\n', }, 'date': { 'negative_utc': None, 'offset': 600, 'timestamp': { 'microseconds': 0, 'seconds': 1377480558, }, }, 'id': b'\\\x98\xf5Y\xd04\x16-\xe2->\xbe\xb9T3\xe6\xf8\x88R1', 'message': b'Release of v0.3.2.', 'name': b'0.3.2', 'synthetic': False, 'target': (b'\xc0j\xa3\xd9;x\xa2\x86\\I5\x17' b'\x000\xf8\xc2\xd79o\xd3'), 'target_type': 'revision', } - @istest - def release_identifier(self): + def test_release_identifier(self): self.assertEqual( identifiers.release_identifier(self.release), identifiers.identifier_to_str(self.release['id']) ) - @istest - def release_identifier_no_author(self): + def test_release_identifier_no_author(self): self.assertEqual( identifiers.release_identifier(self.release_no_author), identifiers.identifier_to_str(self.release_no_author['id']) ) - @istest - def release_identifier_no_message(self): + def test_release_identifier_no_message(self): self.assertEqual( identifiers.release_identifier(self.release_no_message), identifiers.identifier_to_str(self.release_no_message['id']) ) - @istest - def release_identifier_empty_message(self): + def test_release_identifier_empty_message(self): self.assertEqual( identifiers.release_identifier(self.release_empty_message), identifiers.identifier_to_str(self.release_empty_message['id']) ) - @istest - def release_identifier_negative_utc(self): + def test_release_identifier_negative_utc(self): self.assertEqual( identifiers.release_identifier(self.release_negative_utc), 
identifiers.identifier_to_str(self.release_negative_utc['id']) ) - @istest - def release_identifier_newline_in_author(self): + def test_release_identifier_newline_in_author(self): self.assertEqual( identifiers.release_identifier(self.release_newline_in_author), identifiers.identifier_to_str(self.release_newline_in_author['id']) ) class SnapshotIdentifier(unittest.TestCase): def setUp(self): super().setUp() self.empty = { 'id': '1a8893e6a86f444e8be8e7bda6cb34fb1735a00e', 'branches': {}, } self.dangling_branch = { 'id': 'c84502e821eb21ed84e9fd3ec40973abc8b32353', 'branches': { b'HEAD': None, }, } self.unresolved = { 'id': '84b4548ea486e4b0a7933fa541ff1503a0afe1e0', 'branches': { b'foo': { 'target': b'bar', 'target_type': 'alias', }, }, } self.all_types = { 'id': '6e65b86363953b780d92b0a928f3e8fcdd10db36', 'branches': { b'directory': { 'target': '1bd0e65f7d2ff14ae994de17a1e7fe65111dcad8', 'target_type': 'directory', }, b'content': { 'target': 'fe95a46679d128ff167b7c55df5d02356c5a1ae1', 'target_type': 'content', }, b'alias': { 'target': b'revision', 'target_type': 'alias', }, b'revision': { 'target': 'aafb16d69fd30ff58afdd69036a26047f3aebdc6', 'target_type': 'revision', }, b'release': { 'target': '7045404f3d1c54e6473c71bbb716529fbad4be24', 'target_type': 'release', }, b'snapshot': { 'target': '1a8893e6a86f444e8be8e7bda6cb34fb1735a00e', 'target_type': 'snapshot', }, b'dangling': None, } } def test_empty_snapshot(self): self.assertEqual( identifiers.snapshot_identifier(self.empty), identifiers.identifier_to_str(self.empty['id']), ) def test_dangling_branch(self): self.assertEqual( identifiers.snapshot_identifier(self.dangling_branch), identifiers.identifier_to_str(self.dangling_branch['id']), ) def test_unresolved(self): with self.assertRaisesRegex(ValueError, "b'foo' -> b'bar'"): identifiers.snapshot_identifier(self.unresolved) def test_unresolved_force(self): self.assertEqual( identifiers.snapshot_identifier( self.unresolved, ignore_unresolved=True, ), 
identifiers.identifier_to_str(self.unresolved['id']), ) def test_all_types(self): self.assertEqual( identifiers.snapshot_identifier(self.all_types), identifiers.identifier_to_str(self.all_types['id']), ) def test_persistent_identifier(self): _snapshot_id = hashutil.hash_to_bytes( 'c7c108084bc0bf3d81436bf980b46e98bd338453') _release_id = '22ece559cc7cc2364edc5e5593d63ae8bd229f9f' _revision_id = '309cf2674ee7a0749978cf8265ab91a60aea0f7d' _directory_id = 'd198bc9d7a6bcf6db04f476d29314f157507d505' _content_id = '94a9ed024d3859793618152ea559a168bbcbb5e2' _snapshot = {'id': _snapshot_id} _release = {'id': _release_id} _revision = {'id': _revision_id} _directory = {'id': _directory_id} _content = {'sha1_git': _content_id} for full_type, _hash, expected_persistent_id, version, _meta in [ (SNAPSHOT, _snapshot_id, 'swh:1:snp:c7c108084bc0bf3d81436bf980b46e98bd338453', None, {}), (RELEASE, _release_id, 'swh:2:rel:22ece559cc7cc2364edc5e5593d63ae8bd229f9f', 2, {}), (REVISION, _revision_id, 'swh:1:rev:309cf2674ee7a0749978cf8265ab91a60aea0f7d', None, {}), (DIRECTORY, _directory_id, 'swh:1:dir:d198bc9d7a6bcf6db04f476d29314f157507d505', None, {}), (CONTENT, _content_id, 'swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2', 1, {}), (SNAPSHOT, _snapshot, 'swh:1:snp:c7c108084bc0bf3d81436bf980b46e98bd338453', None, {}), (RELEASE, _release, 'swh:2:rel:22ece559cc7cc2364edc5e5593d63ae8bd229f9f', 2, {}), (REVISION, _revision, 'swh:1:rev:309cf2674ee7a0749978cf8265ab91a60aea0f7d', None, {}), (DIRECTORY, _directory, 'swh:1:dir:d198bc9d7a6bcf6db04f476d29314f157507d505', None, {}), (CONTENT, _content, 'swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2', 1, {}), (CONTENT, _content, 'swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2;origin=1', 1, {'origin': '1'}), ]: if version: actual_value = identifiers.persistent_identifier( full_type, _hash, version, metadata=_meta) else: actual_value = identifiers.persistent_identifier( full_type, _hash, metadata=_meta) self.assertEquals(actual_value, 
expected_persistent_id) def test_persistent_identifier_wrong_input(self): _snapshot_id = 'notahash4bc0bf3d81436bf980b46e98bd338453' _snapshot = {'id': _snapshot_id} for _type, _hash, _error in [ (SNAPSHOT, _snapshot_id, 'Unexpected characters'), (SNAPSHOT, _snapshot, 'Unexpected characters'), ('foo', '', 'Wrong input: Supported types are'), ]: with self.assertRaisesRegex(ValidationError, _error): identifiers.persistent_identifier(_type, _hash) def test_parse_persistent_identifier(self): for pid, _type, _version, _hash in [ ('swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2', CONTENT, 1, '94a9ed024d3859793618152ea559a168bbcbb5e2'), ('swh:1:dir:d198bc9d7a6bcf6db04f476d29314f157507d505', DIRECTORY, 1, 'd198bc9d7a6bcf6db04f476d29314f157507d505'), ('swh:1:rev:309cf2674ee7a0749978cf8265ab91a60aea0f7d', REVISION, 1, '309cf2674ee7a0749978cf8265ab91a60aea0f7d'), ('swh:1:rel:22ece559cc7cc2364edc5e5593d63ae8bd229f9f', RELEASE, 1, '22ece559cc7cc2364edc5e5593d63ae8bd229f9f'), ('swh:1:snp:c7c108084bc0bf3d81436bf980b46e98bd338453', SNAPSHOT, 1, 'c7c108084bc0bf3d81436bf980b46e98bd338453'), ]: expected_result = PersistentId( namespace='swh', scheme_version=_version, object_type=_type, object_id=_hash, metadata={} ) actual_result = identifiers.parse_persistent_identifier(pid) self.assertEquals(actual_result, expected_result) for pid, _type, _version, _hash, _metadata in [ ('swh:1:cnt:9c95815d9e9d91b8dae8e05d8bbc696fe19f796b;lines=1-18;origin=https://github.com/python/cpython', # noqa CONTENT, 1, '9c95815d9e9d91b8dae8e05d8bbc696fe19f796b', { 'lines': '1-18', 'origin': 'https://github.com/python/cpython' }), ('swh:1:dir:0b6959356d30f1a4e9b7f6bca59b9a336464c03d;origin=deb://Debian/packages/linuxdoc-tools', # noqa DIRECTORY, 1, '0b6959356d30f1a4e9b7f6bca59b9a336464c03d', { 'origin': 'deb://Debian/packages/linuxdoc-tools' }) ]: expected_result = PersistentId( namespace='swh', scheme_version=_version, object_type=_type, object_id=_hash, metadata=_metadata ) actual_result = 
def is_toposorted_slow(revision_log):
    """Check (inefficiently) that the given revision log is in any topological
    order.

    A log is considered sorted when, for every revision, all of its
    (transitive) parents appear earlier in the log.

    Complexity: O(n^2).
    (Note: It's totally possible to write a O(n) is_toposorted function,
    but it requires computing the transitive closure of the input DAG,
    which requires computing a topological ordering of that DAG, which
    kind of defeats the purpose of writing unit tests for toposort().)

    Args:
        revision_log: Revision log as returned by
            swh.storage.Storage.revision_log(). Any iterable of dicts with
            'id' and 'parents' keys is accepted, including one-shot
            iterators such as generators.

    Returns:
        True if the revision log is topologically sorted.

    Raises:
        KeyError: if a revision references a parent that is absent from
            the log.
    """
    # The log is walked twice below (index, then check).  Materialize it
    # first: with a generator the first pass would exhaust the input, the
    # checking loop would never run, and the function would return True
    # for any ordering.
    revision_log = list(revision_log)
    rev_by_id = {r['id']: r for r in revision_log}

    def all_parents(revision):
        # Yield every ancestor of `revision`, depth first.
        for parent in revision['parents']:
            yield parent
            yield from all_parents(rev_by_id[parent])

    visited = set()
    for rev in revision_log:
        visited.add(rev['id'])
        if not all(parent in visited for parent in all_parents(rev)):
            return False
    return True
class TestValidators(unittest.TestCase):
    """Tests for ``validators.validate_content``."""

    def setUp(self):
        """Build one valid visible content, one valid absent content, and
        one content whose hashes do not match its data."""
        self.valid_visible_content = {
            'status': 'visible',
            'length': 5,
            'data': b'1984\n',
            'ctime': datetime.datetime(2015, 11, 22, 16, 33, 56,
                                       tzinfo=datetime.timezone.utc),
        }

        # Add the hashes computed from the actual data.
        self.valid_visible_content.update(
            hashutil.hash_data(self.valid_visible_content['data']))

        self.valid_absent_content = {
            'status': 'absent',
            'length': 5,
            'ctime': datetime.datetime(2015, 11, 22, 16, 33, 56,
                                       tzinfo=datetime.timezone.utc),
            'reason': 'Content too large',
            'sha1_git': self.valid_visible_content['sha1_git'],
            'origin': 42,
        }

        # Same data as the valid content, but every hash is overwritten
        # with the hash of some other payload.
        self.invalid_content_hash_mismatch = self.valid_visible_content.copy()
        self.invalid_content_hash_mismatch.update(
            hashutil.hash_data(b"this is not the data you're looking for"))

    def test_validate_content(self):
        """Both the visible and the absent fixtures validate."""
        self.assertTrue(
            validators.validate_content(self.valid_visible_content))

        self.assertTrue(
            validators.validate_content(self.valid_absent_content))

    def test_validate_content_hash_mismatch(self):
        """A content whose hashes disagree with its data is rejected, with
        one error reported per mismatching hash."""
        with self.assertRaises(exceptions.ValidationError) as cm:
            validators.validate_content(self.invalid_content_hash_mismatch)

        # All the hashes are wrong. The exception should be of the form:
        # ValidationError({
        #     NON_FIELD_ERRORS: [
        #         ValidationError('content-hash-mismatch', 'sha1'),
        #         ValidationError('content-hash-mismatch', 'sha1_git'),
        #         ValidationError('content-hash-mismatch', 'sha256'),
        #         ValidationError('content-hash-mismatch', 'blake2s256'),
        #     ]
        # })

        exc = cm.exception
        self.assertIsInstance(str(exc), str)
        # assertEqual, not the deprecated assertEquals alias (removed in
        # Python 3.12).
        self.assertEqual(set(exc.error_dict.keys()),
                         {exceptions.NON_FIELD_ERRORS})

        hash_mismatches = exc.error_dict[exceptions.NON_FIELD_ERRORS]
        self.assertIsInstance(hash_mismatches, list)
        self.assertEqual(len(hash_mismatches), 4)
        self.assertTrue(all(mismatch.code == 'content-hash-mismatch'
                            for mismatch in hash_mismatches))
        self.assertEqual(set(mismatch.params['hash']
                             for mismatch in hash_mismatches),
                         {'sha1', 'sha1_git', 'sha256', 'blake2s256'})