Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9697238
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
6 KB
Subscribers
None
View Options
diff --git a/swh/core/hashutil.py b/swh/core/hashutil.py
index 5c3059b..b643a6c 100644
--- a/swh/core/hashutil.py
+++ b/swh/core/hashutil.py
@@ -1,106 +1,106 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import binascii
import functools
import hashlib
import os
from io import BytesIO
# names of the hashing algorithms this module can compute
ALGORITHMS = {'sha1', 'sha256', 'sha1_git'}

# number of bytes read at a time when hashing; keep it a multiple of 64
# (the block size of sha1/sha256) -- for reference, coreutils' sha1sum
# uses 32768
HASH_BLOCK_SIZE = 32 * 1024
def _new_hash(algo, length=None):
    """Create the hashlib digest object used to compute `algo`.

    `algo` must be one of the names listed in ALGORITHMS. For the
    git-specific variants (names ending in "_git") the hasher of the
    underlying algorithm is created and immediately fed the git blob header,
    which embeds the content length; `length` is therefore mandatory for
    those algorithms and unused for the others.

    Raises ValueError if the algorithm is unknown, or if a git-specific
    algorithm is requested without a length.
    """
    if algo not in ALGORITHMS:
        raise ValueError('unknown hashing algorithm ' + algo)

    # plain algorithms map directly onto hashlib
    if not algo.endswith('_git'):
        return hashlib.new(algo)

    if length is None:
        raise ValueError('missing length for git hashing algorithm')

    # "sha1_git" -> "sha1": the part before the first underscore names the
    # underlying hashlib algorithm
    base_algo, _, _ = algo.partition('_')
    hasher = hashlib.new(base_algo)
    git_header = ('blob %d\0' % length).encode('ascii')  # git hash header
    hasher.update(git_header)
    return hasher
def _hash_file_obj(f, length, algorithms=ALGORITHMS, chunk_cb=None):
    """Compute the requested hashes of a file-like object in a single pass.

    One hasher per algorithm is created with _new_hash (`length` is handed
    over to it). The object is then read in chunks of HASH_BLOCK_SIZE until a
    read returns an empty result, and every chunk is fed to all hashers.

    If chunk_cb is given, it is called with each chunk right after the
    hashers have been updated with that chunk.

    Returns a dictionary mapping each algorithm name to its raw digest, as
    produced by the hasher's digest() method.
    """
    hashers = {}
    for algo in algorithms:
        hashers[algo] = _new_hash(algo, length)

    chunk = f.read(HASH_BLOCK_SIZE)
    while chunk:
        for hasher in hashers.values():
            hasher.update(chunk)

        if chunk_cb:
            chunk_cb(chunk)

        chunk = f.read(HASH_BLOCK_SIZE)

    return {algo: hasher.digest() for algo, hasher in hashers.items()}
def _hash_fname(fname, algorithms=ALGORITHMS):
    """Hash the content of a file specified by file name.

    The file size, obtained with os.path.getsize, is used as the content
    length expected by _hash_file_obj (the git-specific algorithms need it
    for their header). The file is opened in binary mode and closed again
    once hashing is done.

    Arguments:
        fname: path of the file to hash
        algorithms: names of the hashing algorithms to compute

    Returns the dictionary built by _hash_file_obj for the requested
    algorithms.
    """
    length = os.path.getsize(fname)
    with open(fname, 'rb') as f:
        # Forward the caller's selection: it used to be dropped here, so
        # hashing by file name always computed the full default set.
        return _hash_file_obj(f, length, algorithms)
def hashfile(f, length=None, algorithms=ALGORITHMS):
"""Hash the content of a given file, given either as a file-like object or a
file name. All specified hash algorithms will be computed, reading the file
only once. Returns a dictionary mapping algorithm names to raw (binary)
digests, as produced by the hashers' digest() method; use hash_to_hex to
obtain a hex-encoded checksum.
When passing a file-like object, content length must be given (the
git-specific algorithms need it to build their header); when passing a file
name, content length is ignored.
"""
- if isinstance(f, str):
+ if isinstance(f, (str, bytes)):
return _hash_fname(f, algorithms)
else:
return _hash_file_obj(f, length, algorithms)
def hashdata(data, algorithms=ALGORITHMS):
    """Hash content that is already in memory as a string of bytes.

    The data is wrapped in an in-memory binary buffer and handed to
    _hash_file_obj, using len(data) as the content length.
    """
    return _hash_file_obj(BytesIO(data), len(data), algorithms)
@functools.lru_cache()
def hash_to_hex(hash):
    """Return the hexadecimal string representation of a binary hash.

    Results are memoized with functools.lru_cache.
    """
    hex_digits = binascii.hexlify(hash)
    return hex_digits.decode('ascii')
@functools.lru_cache()
def hex_to_hash(hex):
    """Return the binary hash described by a hexadecimal string.

    Results are memoized with functools.lru_cache.
    """
    raw_hash = bytes.fromhex(hex)
    return raw_hash
diff --git a/swh/core/tests/test_hashutil.py b/swh/core/tests/test_hashutil.py
index 0931019..f277707 100644
--- a/swh/core/tests/test_hashutil.py
+++ b/swh/core/tests/test_hashutil.py
@@ -1,77 +1,85 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import tempfile
import unittest
from nose.tools import istest
from swh.core import hashutil
class Hashlib(unittest.TestCase):
"""Tests for swh.core.hashutil: hashing of in-memory data and of files, and
conversion between binary hashes and their hexadecimal representation."""
def setUp(self):
# sample content hashed by every test
self.data = b'42\n'
# expected checksums of self.data, as hexadecimal strings
self.hex_checksums = {
'sha1': '34973274ccef6ab4dfaaf86599792fa9c3fe4689',
'sha1_git': 'd81cc0710eb6cf9efd5b920a8453e1e07157b6cd',
'sha256': '084c799cd551dd1d8d5c5f9a5d593b2e931f5e36'
'122ee5c793c1d08a19839cc0',
}
# the same checksums as raw bytes, which is what the hashing functions
# are expected to return
self.checksums = {
'sha1': bytes.fromhex('34973274ccef6ab4dfaaf865997'
'92fa9c3fe4689'),
'sha1_git': bytes.fromhex('d81cc0710eb6cf9efd5b920a845'
'3e1e07157b6cd'),
'sha256': bytes.fromhex('084c799cd551dd1d8d5c5f9a5d5'
'93b2e931f5e36122ee5c793c1d0'
'8a19839cc0'),
}
# hashing in-memory data with the default algorithms
@istest
def hashdata(self):
checksums = hashutil.hashdata(self.data)
self.assertEqual(checksums, self.checksums)
# an unsupported algorithm name must be rejected with ValueError
@istest
def unknown_algo(self):
with self.assertRaises(ValueError):
hashutil.hashdata(self.data, algorithms=['does-not-exist'])
# only the explicitly requested algorithms appear in the result
@istest
def algo_selection(self):
checksums = hashutil.hashdata(self.data, algorithms=['sha1', 'sha256'])
self.assertIn('sha1', checksums)
self.assertIn('sha256', checksums)
self.assertNotIn('sha1_git', checksums)
# hashing a file designated by its name, given as str
@istest
def hashfile_by_name(self):
with tempfile.NamedTemporaryFile() as f:
f.write(self.data)
# flush so that the content is on disk when the file is reopened by name
f.flush()
checksums = hashutil.hashfile(f.name)
self.assertEqual(checksums, self.checksums)
# hashing a file designated by its name, given as bytes
+ @istest
+ def hashfile_by_name_as_bytes(self):
+ with tempfile.NamedTemporaryFile() as f:
+ f.write(self.data)
+ f.flush()
+ checksums = hashutil.hashfile(f.name.encode('utf-8'))
+ self.assertEqual(checksums, self.checksums)
+
# hashing an open file object, with the content length passed explicitly
@istest
def hashfile_by_obj(self):
with tempfile.TemporaryFile() as f:
f.write(self.data)
# rewind so that hashing reads the content from the start
f.seek(0)
checksums = hashutil.hashfile(f, len(self.data))
self.assertEqual(checksums, self.checksums)
# hexadecimal string -> binary hash, for every algorithm of the fixture
@istest
def hex_to_hash(self):
for algo in self.checksums:
self.assertEqual(self.checksums[algo],
hashutil.hex_to_hash(self.hex_checksums[algo]))
# binary hash -> hexadecimal string, for every algorithm of the fixture
@istest
def hash_to_hex(self):
for algo in self.checksums:
self.assertEqual(self.hex_checksums[algo],
hashutil.hash_to_hex(self.checksums[algo]))
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Mon, Aug 18, 11:12 PM (1 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3260855
Attached To
rDCORE Foundations and core functionalities
Event Timeline
Log In to Comment