Changeset View
Changeset View
Standalone View
Standalone View
swh/model/tests/test_hashutil.py
# Copyright (C) 2015-2018 The Software Heritage developers | # Copyright (C) 2015-2018 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import hashlib | import hashlib | ||||
import io | import io | ||||
import os | import os | ||||
import tempfile | import tempfile | ||||
import unittest | import unittest | ||||
from nose.tools import istest | |||||
from unittest.mock import patch | from unittest.mock import patch | ||||
from swh.model import hashutil | from swh.model import hashutil | ||||
from swh.model.hashutil import MultiHash | from swh.model.hashutil import MultiHash | ||||
class BaseHashutil(unittest.TestCase): | class BaseHashutil(unittest.TestCase): | ||||
def setUp(self): | def setUp(self): | ||||
Show All 29 Lines | def setUp(self): | ||||
self.git_checksums = { | self.git_checksums = { | ||||
type: bytes.fromhex(cksum) | type: bytes.fromhex(cksum) | ||||
for type, cksum in self.git_hex_checksums.items() | for type, cksum in self.git_hex_checksums.items() | ||||
} | } | ||||
class MultiHashTest(BaseHashutil): | class MultiHashTest(BaseHashutil): | ||||
@istest | def test_multi_hash_data(self): | ||||
def multi_hash_data(self): | |||||
checksums = MultiHash.from_data(self.data).digest() | checksums = MultiHash.from_data(self.data).digest() | ||||
self.assertEqual(checksums, self.checksums) | self.assertEqual(checksums, self.checksums) | ||||
self.assertFalse('length' in checksums) | self.assertFalse('length' in checksums) | ||||
@istest | def test_multi_hash_data_with_length(self): | ||||
def multi_hash_data_with_length(self): | |||||
expected_checksums = self.checksums.copy() | expected_checksums = self.checksums.copy() | ||||
expected_checksums['length'] = len(self.data) | expected_checksums['length'] = len(self.data) | ||||
algos = set(['length']).union(hashutil.DEFAULT_ALGORITHMS) | algos = set(['length']).union(hashutil.DEFAULT_ALGORITHMS) | ||||
checksums = MultiHash.from_data(self.data, hash_names=algos).digest() | checksums = MultiHash.from_data(self.data, hash_names=algos).digest() | ||||
self.assertEqual(checksums, expected_checksums) | self.assertEqual(checksums, expected_checksums) | ||||
self.assertTrue('length' in checksums) | self.assertTrue('length' in checksums) | ||||
@istest | def test_multi_hash_data_unknown_hash(self): | ||||
def multi_hash_data_unknown_hash(self): | |||||
with self.assertRaises(ValueError) as cm: | with self.assertRaises(ValueError) as cm: | ||||
MultiHash.from_data(self.data, ['unknown-hash']) | MultiHash.from_data(self.data, ['unknown-hash']) | ||||
self.assertIn('Unexpected hashing algorithm', cm.exception.args[0]) | self.assertIn('Unexpected hashing algorithm', cm.exception.args[0]) | ||||
self.assertIn('unknown-hash', cm.exception.args[0]) | self.assertIn('unknown-hash', cm.exception.args[0]) | ||||
@istest | def test_multi_hash_file(self): | ||||
def multi_hash_file(self): | |||||
fobj = io.BytesIO(self.data) | fobj = io.BytesIO(self.data) | ||||
checksums = MultiHash.from_file(fobj, length=len(self.data)).digest() | checksums = MultiHash.from_file(fobj, length=len(self.data)).digest() | ||||
self.assertEqual(checksums, self.checksums) | self.assertEqual(checksums, self.checksums) | ||||
@istest | def test_multi_hash_file_hexdigest(self): | ||||
def multi_hash_file_hexdigest(self): | |||||
fobj = io.BytesIO(self.data) | fobj = io.BytesIO(self.data) | ||||
length = len(self.data) | length = len(self.data) | ||||
checksums = MultiHash.from_file(fobj, length=length).hexdigest() | checksums = MultiHash.from_file(fobj, length=length).hexdigest() | ||||
self.assertEqual(checksums, self.hex_checksums) | self.assertEqual(checksums, self.hex_checksums) | ||||
@istest | def test_multi_hash_file_bytehexdigest(self): | ||||
def multi_hash_file_bytehexdigest(self): | |||||
fobj = io.BytesIO(self.data) | fobj = io.BytesIO(self.data) | ||||
length = len(self.data) | length = len(self.data) | ||||
checksums = MultiHash.from_file(fobj, length=length).bytehexdigest() | checksums = MultiHash.from_file(fobj, length=length).bytehexdigest() | ||||
self.assertEqual(checksums, self.bytehex_checksums) | self.assertEqual(checksums, self.bytehex_checksums) | ||||
@istest | def test_multi_hash_file_missing_length(self): | ||||
def multi_hash_file_missing_length(self): | |||||
fobj = io.BytesIO(self.data) | fobj = io.BytesIO(self.data) | ||||
with self.assertRaises(ValueError) as cm: | with self.assertRaises(ValueError) as cm: | ||||
MultiHash.from_file(fobj, hash_names=['sha1_git']) | MultiHash.from_file(fobj, hash_names=['sha1_git']) | ||||
self.assertIn('Missing length', cm.exception.args[0]) | self.assertIn('Missing length', cm.exception.args[0]) | ||||
@istest | def test_multi_hash_path(self): | ||||
def multi_hash_path(self): | |||||
with tempfile.NamedTemporaryFile(delete=False) as f: | with tempfile.NamedTemporaryFile(delete=False) as f: | ||||
f.write(self.data) | f.write(self.data) | ||||
hashes = MultiHash.from_path(f.name).digest() | hashes = MultiHash.from_path(f.name).digest() | ||||
os.remove(f.name) | os.remove(f.name) | ||||
self.assertEquals(self.checksums, hashes) | self.assertEquals(self.checksums, hashes) | ||||
class Hashutil(BaseHashutil): | class Hashutil(BaseHashutil): | ||||
@istest | def test_hash_data(self): | ||||
def hash_data(self): | |||||
checksums = hashutil.hash_data(self.data) | checksums = hashutil.hash_data(self.data) | ||||
self.assertEqual(checksums, self.checksums) | self.assertEqual(checksums, self.checksums) | ||||
self.assertFalse('length' in checksums) | self.assertFalse('length' in checksums) | ||||
@istest | def test_hash_data_with_length(self): | ||||
def hash_data_with_length(self): | |||||
expected_checksums = self.checksums.copy() | expected_checksums = self.checksums.copy() | ||||
expected_checksums['length'] = len(self.data) | expected_checksums['length'] = len(self.data) | ||||
algos = set(['length']).union(hashutil.DEFAULT_ALGORITHMS) | algos = set(['length']).union(hashutil.DEFAULT_ALGORITHMS) | ||||
checksums = hashutil.hash_data(self.data, algorithms=algos) | checksums = hashutil.hash_data(self.data, algorithms=algos) | ||||
self.assertEqual(checksums, expected_checksums) | self.assertEqual(checksums, expected_checksums) | ||||
self.assertTrue('length' in checksums) | self.assertTrue('length' in checksums) | ||||
@istest | def test_hash_data_unknown_hash(self): | ||||
def hash_data_unknown_hash(self): | |||||
with self.assertRaises(ValueError) as cm: | with self.assertRaises(ValueError) as cm: | ||||
hashutil.hash_data(self.data, ['unknown-hash']) | hashutil.hash_data(self.data, ['unknown-hash']) | ||||
self.assertIn('Unexpected hashing algorithm', cm.exception.args[0]) | self.assertIn('Unexpected hashing algorithm', cm.exception.args[0]) | ||||
self.assertIn('unknown-hash', cm.exception.args[0]) | self.assertIn('unknown-hash', cm.exception.args[0]) | ||||
@istest | def test_hash_git_data(self): | ||||
def hash_git_data(self): | |||||
checksums = { | checksums = { | ||||
git_type: hashutil.hash_git_data(self.data, git_type) | git_type: hashutil.hash_git_data(self.data, git_type) | ||||
for git_type in self.git_checksums | for git_type in self.git_checksums | ||||
} | } | ||||
self.assertEqual(checksums, self.git_checksums) | self.assertEqual(checksums, self.git_checksums) | ||||
@istest | def test_hash_git_data_unknown_git_type(self): | ||||
def hash_git_data_unknown_git_type(self): | |||||
with self.assertRaises(ValueError) as cm: | with self.assertRaises(ValueError) as cm: | ||||
hashutil.hash_git_data(self.data, 'unknown-git-type') | hashutil.hash_git_data(self.data, 'unknown-git-type') | ||||
self.assertIn('Unexpected git object type', cm.exception.args[0]) | self.assertIn('Unexpected git object type', cm.exception.args[0]) | ||||
self.assertIn('unknown-git-type', cm.exception.args[0]) | self.assertIn('unknown-git-type', cm.exception.args[0]) | ||||
@istest | def test_hash_file(self): | ||||
def hash_file(self): | |||||
fobj = io.BytesIO(self.data) | fobj = io.BytesIO(self.data) | ||||
checksums = hashutil.hash_file(fobj, length=len(self.data)) | checksums = hashutil.hash_file(fobj, length=len(self.data)) | ||||
self.assertEqual(checksums, self.checksums) | self.assertEqual(checksums, self.checksums) | ||||
@istest | def test_hash_file_missing_length(self): | ||||
def hash_file_missing_length(self): | |||||
fobj = io.BytesIO(self.data) | fobj = io.BytesIO(self.data) | ||||
with self.assertRaises(ValueError) as cm: | with self.assertRaises(ValueError) as cm: | ||||
hashutil.hash_file(fobj, algorithms=['sha1_git']) | hashutil.hash_file(fobj, algorithms=['sha1_git']) | ||||
self.assertIn('Missing length', cm.exception.args[0]) | self.assertIn('Missing length', cm.exception.args[0]) | ||||
@istest | def test_hash_path(self): | ||||
def hash_path(self): | |||||
with tempfile.NamedTemporaryFile(delete=False) as f: | with tempfile.NamedTemporaryFile(delete=False) as f: | ||||
f.write(self.data) | f.write(self.data) | ||||
hashes = hashutil.hash_path(f.name) | hashes = hashutil.hash_path(f.name) | ||||
os.remove(f.name) | os.remove(f.name) | ||||
self.checksums['length'] = len(self.data) | self.checksums['length'] = len(self.data) | ||||
self.assertEquals(self.checksums, hashes) | self.assertEquals(self.checksums, hashes) | ||||
@istest | def test_hash_to_hex(self): | ||||
def hash_to_hex(self): | |||||
for type in self.checksums: | for type in self.checksums: | ||||
hex = self.hex_checksums[type] | hex = self.hex_checksums[type] | ||||
hash = self.checksums[type] | hash = self.checksums[type] | ||||
self.assertEquals(hashutil.hash_to_hex(hex), hex) | self.assertEquals(hashutil.hash_to_hex(hex), hex) | ||||
self.assertEquals(hashutil.hash_to_hex(hash), hex) | self.assertEquals(hashutil.hash_to_hex(hash), hex) | ||||
@istest | def test_hash_to_bytes(self): | ||||
def hash_to_bytes(self): | |||||
for type in self.checksums: | for type in self.checksums: | ||||
hex = self.hex_checksums[type] | hex = self.hex_checksums[type] | ||||
hash = self.checksums[type] | hash = self.checksums[type] | ||||
self.assertEquals(hashutil.hash_to_bytes(hex), hash) | self.assertEquals(hashutil.hash_to_bytes(hex), hash) | ||||
self.assertEquals(hashutil.hash_to_bytes(hash), hash) | self.assertEquals(hashutil.hash_to_bytes(hash), hash) | ||||
@istest | def test_hash_to_bytehex(self): | ||||
def hash_to_bytehex(self): | |||||
for algo in self.checksums: | for algo in self.checksums: | ||||
self.assertEqual(self.hex_checksums[algo].encode('ascii'), | self.assertEqual(self.hex_checksums[algo].encode('ascii'), | ||||
hashutil.hash_to_bytehex(self.checksums[algo])) | hashutil.hash_to_bytehex(self.checksums[algo])) | ||||
@istest | def test_bytehex_to_hash(self): | ||||
def bytehex_to_hash(self): | |||||
for algo in self.checksums: | for algo in self.checksums: | ||||
self.assertEqual(self.checksums[algo], | self.assertEqual(self.checksums[algo], | ||||
hashutil.bytehex_to_hash( | hashutil.bytehex_to_hash( | ||||
self.hex_checksums[algo].encode())) | self.hex_checksums[algo].encode())) | ||||
@istest | def test_new_hash_unsupported_hashing_algorithm(self): | ||||
def new_hash_unsupported_hashing_algorithm(self): | |||||
try: | try: | ||||
hashutil._new_hash('blake2:10') | hashutil._new_hash('blake2:10') | ||||
except ValueError as e: | except ValueError as e: | ||||
self.assertEquals(str(e), | self.assertEquals(str(e), | ||||
'Unexpected hashing algorithm blake2:10, ' | 'Unexpected hashing algorithm blake2:10, ' | ||||
'expected one of blake2b512, blake2s256, ' | 'expected one of blake2b512, blake2s256, ' | ||||
'sha1, sha1_git, sha256') | 'sha1, sha1_git, sha256') | ||||
@patch('hashlib.new') | @patch('hashlib.new') | ||||
@istest | def test_new_hash_blake2b_blake2b512_builtin(self, mock_hashlib_new): | ||||
def new_hash_blake2b_blake2b512_builtin(self, mock_hashlib_new): | |||||
if 'blake2b512' not in hashlib.algorithms_available: | if 'blake2b512' not in hashlib.algorithms_available: | ||||
self.skipTest('blake2b512 not built-in') | self.skipTest('blake2b512 not built-in') | ||||
mock_hashlib_new.return_value = sentinel = object() | mock_hashlib_new.return_value = sentinel = object() | ||||
h = hashutil._new_hash('blake2b512') | h = hashutil._new_hash('blake2b512') | ||||
self.assertIs(h, sentinel) | self.assertIs(h, sentinel) | ||||
mock_hashlib_new.assert_called_with('blake2b512') | mock_hashlib_new.assert_called_with('blake2b512') | ||||
@patch('hashlib.new') | @patch('hashlib.new') | ||||
@istest | def test_new_hash_blake2s_blake2s256_builtin(self, mock_hashlib_new): | ||||
def new_hash_blake2s_blake2s256_builtin(self, mock_hashlib_new): | |||||
if 'blake2s256' not in hashlib.algorithms_available: | if 'blake2s256' not in hashlib.algorithms_available: | ||||
self.skipTest('blake2s256 not built-in') | self.skipTest('blake2s256 not built-in') | ||||
mock_hashlib_new.return_value = sentinel = object() | mock_hashlib_new.return_value = sentinel = object() | ||||
h = hashutil._new_hash('blake2s256') | h = hashutil._new_hash('blake2s256') | ||||
self.assertIs(h, sentinel) | self.assertIs(h, sentinel) | ||||
mock_hashlib_new.assert_called_with('blake2s256') | mock_hashlib_new.assert_called_with('blake2s256') | ||||
@istest | def test_new_hash_blake2b_builtin(self): | ||||
def new_hash_blake2b_builtin(self): | |||||
removed_hash = False | removed_hash = False | ||||
try: | try: | ||||
if 'blake2b512' in hashlib.algorithms_available: | if 'blake2b512' in hashlib.algorithms_available: | ||||
removed_hash = True | removed_hash = True | ||||
hashlib.algorithms_available.remove('blake2b512') | hashlib.algorithms_available.remove('blake2b512') | ||||
if 'blake2b' not in hashlib.algorithms_available: | if 'blake2b' not in hashlib.algorithms_available: | ||||
self.skipTest('blake2b not built in') | self.skipTest('blake2b not built in') | ||||
with patch('hashlib.blake2b') as mock_blake2b: | with patch('hashlib.blake2b') as mock_blake2b: | ||||
mock_blake2b.return_value = sentinel = object() | mock_blake2b.return_value = sentinel = object() | ||||
h = hashutil._new_hash('blake2b512') | h = hashutil._new_hash('blake2b512') | ||||
self.assertIs(h, sentinel) | self.assertIs(h, sentinel) | ||||
mock_blake2b.assert_called_with(digest_size=512//8) | mock_blake2b.assert_called_with(digest_size=512//8) | ||||
finally: | finally: | ||||
if removed_hash: | if removed_hash: | ||||
hashlib.algorithms_available.add('blake2b512') | hashlib.algorithms_available.add('blake2b512') | ||||
@istest | def test_new_hash_blake2s_builtin(self): | ||||
def new_hash_blake2s_builtin(self): | |||||
removed_hash = False | removed_hash = False | ||||
try: | try: | ||||
if 'blake2s256' in hashlib.algorithms_available: | if 'blake2s256' in hashlib.algorithms_available: | ||||
removed_hash = True | removed_hash = True | ||||
hashlib.algorithms_available.remove('blake2s256') | hashlib.algorithms_available.remove('blake2s256') | ||||
if 'blake2s' not in hashlib.algorithms_available: | if 'blake2s' not in hashlib.algorithms_available: | ||||
self.skipTest('blake2s not built in') | self.skipTest('blake2s not built in') | ||||
with patch('hashlib.blake2s') as mock_blake2s: | with patch('hashlib.blake2s') as mock_blake2s: | ||||
mock_blake2s.return_value = sentinel = object() | mock_blake2s.return_value = sentinel = object() | ||||
h = hashutil._new_hash('blake2s256') | h = hashutil._new_hash('blake2s256') | ||||
self.assertIs(h, sentinel) | self.assertIs(h, sentinel) | ||||
mock_blake2s.assert_called_with(digest_size=256//8) | mock_blake2s.assert_called_with(digest_size=256//8) | ||||
finally: | finally: | ||||
if removed_hash: | if removed_hash: | ||||
hashlib.algorithms_available.add('blake2s256') | hashlib.algorithms_available.add('blake2s256') | ||||
@istest | def test_new_hash_blake2b_pyblake2(self): | ||||
def new_hash_blake2b_pyblake2(self): | |||||
if 'blake2b512' in hashlib.algorithms_available: | if 'blake2b512' in hashlib.algorithms_available: | ||||
self.skipTest('blake2b512 built in') | self.skipTest('blake2b512 built in') | ||||
if 'blake2b' in hashlib.algorithms_available: | if 'blake2b' in hashlib.algorithms_available: | ||||
self.skipTest('blake2b built in') | self.skipTest('blake2b built in') | ||||
with patch('pyblake2.blake2b') as mock_blake2b: | with patch('pyblake2.blake2b') as mock_blake2b: | ||||
mock_blake2b.return_value = sentinel = object() | mock_blake2b.return_value = sentinel = object() | ||||
h = hashutil._new_hash('blake2b512') | h = hashutil._new_hash('blake2b512') | ||||
self.assertIs(h, sentinel) | self.assertIs(h, sentinel) | ||||
mock_blake2b.assert_called_with(digest_size=512//8) | mock_blake2b.assert_called_with(digest_size=512//8) | ||||
@istest | def test_new_hash_blake2s_pyblake2(self): | ||||
def new_hash_blake2s_pyblake2(self): | |||||
if 'blake2s256' in hashlib.algorithms_available: | if 'blake2s256' in hashlib.algorithms_available: | ||||
self.skipTest('blake2s256 built in') | self.skipTest('blake2s256 built in') | ||||
if 'blake2s' in hashlib.algorithms_available: | if 'blake2s' in hashlib.algorithms_available: | ||||
self.skipTest('blake2s built in') | self.skipTest('blake2s built in') | ||||
with patch('pyblake2.blake2s') as mock_blake2s: | with patch('pyblake2.blake2s') as mock_blake2s: | ||||
mock_blake2s.return_value = sentinel = object() | mock_blake2s.return_value = sentinel = object() | ||||
Show All 38 Lines | """.encode('utf-8') # NOQA | ||||
'tree_sha1_git': bytes.fromhex('ac212302c45eada382b27bfda795db' | 'tree_sha1_git': bytes.fromhex('ac212302c45eada382b27bfda795db' | ||||
'121dacdb1c'), | '121dacdb1c'), | ||||
'commit_sha1_git': bytes.fromhex('e960570b2e6e2798fa4cfb9af2c399' | 'commit_sha1_git': bytes.fromhex('e960570b2e6e2798fa4cfb9af2c399' | ||||
'd629189653'), | 'd629189653'), | ||||
'tag_sha1_git': bytes.fromhex('bc2b99ba469987bcf1272c189ed534' | 'tag_sha1_git': bytes.fromhex('bc2b99ba469987bcf1272c189ed534' | ||||
'e9e959f120'), | 'e9e959f120'), | ||||
} | } | ||||
@istest | def test_unknown_header_type(self): | ||||
def unknown_header_type(self): | |||||
with self.assertRaises(ValueError) as cm: | with self.assertRaises(ValueError) as cm: | ||||
hashutil.hash_git_data(b'any-data', 'some-unknown-type') | hashutil.hash_git_data(b'any-data', 'some-unknown-type') | ||||
self.assertIn('Unexpected git object type', cm.exception.args[0]) | self.assertIn('Unexpected git object type', cm.exception.args[0]) | ||||
@istest | def test_hashdata_content(self): | ||||
def hashdata_content(self): | |||||
# when | # when | ||||
actual_hash = hashutil.hash_git_data(self.blob_data, git_type='blob') | actual_hash = hashutil.hash_git_data(self.blob_data, git_type='blob') | ||||
# then | # then | ||||
self.assertEqual(actual_hash, | self.assertEqual(actual_hash, | ||||
self.checksums['blob_sha1_git']) | self.checksums['blob_sha1_git']) | ||||
@istest | def test_hashdata_tree(self): | ||||
def hashdata_tree(self): | |||||
# when | # when | ||||
actual_hash = hashutil.hash_git_data(self.tree_data, git_type='tree') | actual_hash = hashutil.hash_git_data(self.tree_data, git_type='tree') | ||||
# then | # then | ||||
self.assertEqual(actual_hash, | self.assertEqual(actual_hash, | ||||
self.checksums['tree_sha1_git']) | self.checksums['tree_sha1_git']) | ||||
@istest | def test_hashdata_revision(self): | ||||
def hashdata_revision(self): | |||||
# when | # when | ||||
actual_hash = hashutil.hash_git_data(self.commit_data, | actual_hash = hashutil.hash_git_data(self.commit_data, | ||||
git_type='commit') | git_type='commit') | ||||
# then | # then | ||||
self.assertEqual(actual_hash, | self.assertEqual(actual_hash, | ||||
self.checksums['commit_sha1_git']) | self.checksums['commit_sha1_git']) | ||||
@istest | def test_hashdata_tag(self): | ||||
def hashdata_tag(self): | |||||
# when | # when | ||||
actual_hash = hashutil.hash_git_data(self.tag_data, git_type='tag') | actual_hash = hashutil.hash_git_data(self.tag_data, git_type='tag') | ||||
# then | # then | ||||
self.assertEqual(actual_hash, | self.assertEqual(actual_hash, | ||||
self.checksums['tag_sha1_git']) | self.checksums['tag_sha1_git']) |