# Copyright (C) 2015-2018  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

"""Module in charge of hashing function definitions. This is the base
module use to compute swh's hashes.

Only a subset of hashing algorithms is supported as defined in the
ALGORITHMS set. Any provided algorithms not in that list will result
in a ValueError explaining the error.

This module defines a MultiHash class to ease the softwareheritage
hashing algorithms computation. This allows to compute hashes from
file object, path, data using a similar interface as what the standard
hashlib module provides.

Basic usage examples:

- file object: MultiHash.from_file(
    file_object, hash_names=DEFAULT_ALGORITHMS).digest()

- path (filepath): MultiHash.from_path(b'foo').hexdigest()

- data (bytes): MultiHash.from_data(b'foo').bytehexdigest()


"Complex" usage, defining a swh hashlib instance first:

- To compute length, integrate the length to the set of algorithms to
  compute, for example:

  .. code-block:: python

     h = MultiHash(hash_names=set({'length'}).union(DEFAULT_ALGORITHMS))
     with open(filepath, 'rb') as f:
         h.update(f.read(HASH_BLOCK_SIZE))
     hashes = h.digest()  # returns a dict of {hash_algo_name: hash_in_bytes}

- Write alongside computing hashing algorithms (from a stream), example:

  .. code-block:: python

     h = MultiHash(length=length)
     with open(filepath, 'wb') as f:
         for chunk in r.iter_content():  # r a stream of sort
             h.update(chunk)
             f.write(chunk)
     hashes = h.hexdigest()  # returns a dict of {hash_algo_name: hash_in_hex}

"""

import binascii
import functools
import hashlib
from io import BytesIO
import os
from typing import Callable, Dict, Optional

ALGORITHMS = set(["sha1", "sha256", "sha1_git", "blake2s256", "blake2b512", "md5"])
"""Hashing algorithms supported by this module"""

DEFAULT_ALGORITHMS = set(["sha1", "sha256", "sha1_git", "blake2s256"])
"""Algorithms computed by default when calling the functions from this module.

Subset of :const:`ALGORITHMS`.
"""

HASH_BLOCK_SIZE = 32768
"""Block size for streaming hash computations made in this module"""

_blake2_hash_cache = {}  # type: Dict[str, Callable]


class MultiHash:
    """Hashutil class to support multiple hashes computation.

    Args:

        hash_names (set): Set of hash algorithms (+ optionally length)
                          to compute hashes (cf. DEFAULT_ALGORITHMS)
        length (int): Length of the total sum of chunks to read

    If the length is provided as algorithm, the length is also computed
    and returned.

    """

    def __init__(self, hash_names=DEFAULT_ALGORITHMS, length=None):
        self.state = {}
        self.track_length = False
        for name in hash_names:
            if name == "length":
                # length is tracked as a plain integer, not a hash object
                self.state["length"] = 0
                self.track_length = True
            else:
                self.state[name] = _new_hash(name, length)

    @classmethod
    def from_state(cls, state, track_length):
        """Build a MultiHash directly from an existing state dict.

        Used by :meth:`copy` to duplicate an in-progress computation.
        """
        ret = cls([])
        ret.state = state
        ret.track_length = track_length
        # Fix: the constructed instance must be returned, otherwise
        # copy() (its only caller) would return None.
        return ret

    @classmethod
    def from_file(cls, fobj, hash_names=DEFAULT_ALGORITHMS, length=None):
        """Compute hashes by streaming a file object in HASH_BLOCK_SIZE chunks."""
        ret = cls(length=length, hash_names=hash_names)
        while True:
            chunk = fobj.read(HASH_BLOCK_SIZE)
            if not chunk:
                break
            ret.update(chunk)
        return ret

    @classmethod
    def from_path(cls, path, hash_names=DEFAULT_ALGORITHMS):
        """Compute hashes for the file at the given path."""
        length = os.path.getsize(path)
        with open(path, "rb") as f:
            ret = cls.from_file(f, hash_names=hash_names, length=length)
        return ret

    @classmethod
    def from_data(cls, data, hash_names=DEFAULT_ALGORITHMS):
        """Compute hashes for an in-memory bytes object."""
        length = len(data)
        fobj = BytesIO(data)
        return cls.from_file(fobj, hash_names=hash_names, length=length)

    def update(self, chunk):
        """Feed a chunk of data to every tracked hash (and the length counter)."""
        for name, h in self.state.items():
            if name == "length":
                continue
            h.update(chunk)
        if self.track_length:
            self.state["length"] += len(chunk)

    def digest(self):
        """Return a dict mapping algorithm name to its bytes digest."""
        return {
            name: h.digest() if name != "length" else h
            for name, h in self.state.items()
        }

    def hexdigest(self):
        """Return a dict mapping algorithm name to its hex string digest."""
        return {
            name: h.hexdigest() if name != "length" else h
            for name, h in self.state.items()
        }

    def bytehexdigest(self):
        """Return a dict mapping algorithm name to its hex digest as bytes."""
        return {
            name: hash_to_bytehex(h.digest()) if name != "length" else h
            for name, h in self.state.items()
        }

    def copy(self):
        """Return an independent copy of this in-progress computation."""
        copied_state = {
            name: h.copy() if name != "length" else h
            for name, h in self.state.items()
        }
        return self.from_state(copied_state, self.track_length)


def _new_blake2_hash(algo):
    """Return a new blake2 hash object for the given algorithm name.

    Initializer callables are cached in _blake2_hash_cache so the
    algorithm-name parsing and implementation lookup happen only once
    per algorithm.
    """
    if algo in _blake2_hash_cache:
        return _blake2_hash_cache[algo]()

    lalgo = algo.lower()
    if not lalgo.startswith("blake2"):
        raise ValueError("Algorithm %s is not a blake2 hash" % algo)

    blake_family = lalgo[:7]

    digest_size = None
    if lalgo[7:]:
        try:
            # trailing digits are the digest size in bits, e.g. blake2s256
            digest_size, remainder = divmod(int(lalgo[7:]), 8)
        except ValueError:
            raise ValueError("Unknown digest size for algo %s" % algo) from None
        if remainder:
            raise ValueError(
                "Digest size for algorithm %s must be a multiple of 8" % algo
            )

    if lalgo in hashlib.algorithms_available:
        # Handle the case where OpenSSL ships the given algorithm
        # (e.g. Python 3.5 on Debian 9 stretch)
        _blake2_hash_cache[algo] = lambda: hashlib.new(lalgo)
    else:
        # Try using the built-in implementation for Python 3.6+
        if blake_family in hashlib.algorithms_available:
            blake2 = getattr(hashlib, blake_family)
        else:
            import pyblake2

            blake2 = getattr(pyblake2, blake_family)

        _blake2_hash_cache[algo] = lambda: blake2(digest_size=digest_size)

    return _blake2_hash_cache[algo]()


def _new_hashlib_hash(algo):
    """Initialize a digest object from hashlib.

    Handle the swh-specific names for the blake2-related algorithms
    """
    if algo.startswith("blake2"):
        return _new_blake2_hash(algo)
    else:
        return hashlib.new(algo)


def git_object_header(git_type: str, length: int) -> bytes:
    """Returns the header for a git object of the given type and length.

    The header of a git object consists of:
     - The type of the object (encoded in ASCII)
     - One ASCII space (\x20)
     - The length of the object (decimal encoded in ASCII)
     - One NUL byte

    Args:
        git_type: the type of the git object (supposedly one of 'blob',
                  'commit', 'tag', 'tree')
        length: the length of the git object you're encoding

    Returns:
        the encoded header, as bytes

    Raises:
        ValueError if the git_type is unexpected.
    """
    git_object_types = {
        "blob",
        "tree",
        "commit",
        "tag",
        "snapshot",
        "raw_extrinsic_metadata",
        "extid",
    }

    if git_type not in git_object_types:
        raise ValueError(
            "Unexpected git object type %s, expected one of %s"
            % (git_type, ", ".join(sorted(git_object_types)))
        )

    return ("%s %d\0" % (git_type, length)).encode("ascii")


def _new_hash(algo: str, length: Optional[int] = None):
    """Initialize a digest object (as returned by python's hashlib) for
    the requested algorithm. See the constant ALGORITHMS for the list
    of supported algorithms. If a git-specific hashing algorithm is
    requested (e.g., "sha1_git"), the hashing object will be pre-fed
    with the needed header; for this to work, length must be given.

    Args:
        algo (str): a hashing algorithm (one of ALGORITHMS)
        length (int): the length of the hashed payload (needed for
          git-specific algorithms)

    Returns:
        a hashutil.hash object

    Raises:
        ValueError if algo is unknown, or length is missing for a git-specific
        hash.

    """
    if algo not in ALGORITHMS:
        raise ValueError(
            "Unexpected hashing algorithm %s, expected one of %s"
            % (algo, ", ".join(sorted(ALGORITHMS)))
        )

    if algo.endswith("_git"):
        if length is None:
            raise ValueError("Missing length for git hashing algorithm")
        base_algo = algo[:-4]
        h = _new_hashlib_hash(base_algo)
        h.update(git_object_header("blob", length))
        return h

    return _new_hashlib_hash(algo)


def hash_git_data(data, git_type, base_algo="sha1"):
    """Hash the given data as a git object of type git_type.

    Args:
        data: a bytes object
        git_type: the git object type
        base_algo: the base hashing algorithm used (default: sha1)

    Returns:
        the bytes digest of data hashed as a git object

    Raises:
        ValueError if the git_type is unexpected.
    """
    h = _new_hashlib_hash(base_algo)
    h.update(git_object_header(git_type, len(data)))
    h.update(data)

    return h.digest()


@functools.lru_cache()
def hash_to_hex(hash):
    """Converts a hash (in hex or bytes form) to its hexadecimal ascii form

    Args:
        hash (str or bytes): a :class:`bytes` hash or a :class:`str` containing
        the hexadecimal form of the hash

    Returns:
        str: the hexadecimal form of the hash
    """
    if isinstance(hash, str):
        return hash
    return binascii.hexlify(hash).decode("ascii")


@functools.lru_cache()
def hash_to_bytehex(hash):
    """Converts a hash to its hexadecimal bytes representation

    Args:
        hash (bytes): a :class:`bytes` hash

    Returns:
        bytes: the hexadecimal form of the hash, as :class:`bytes`
    """
    return binascii.hexlify(hash)


@functools.lru_cache()
def hash_to_bytes(hash):
    """Converts a hash (in hex or bytes form) to its raw bytes form

    Args:
        hash (str or bytes): a :class:`bytes` hash or a :class:`str` containing
        the hexadecimal form of the hash

    Returns:
        bytes: the :class:`bytes` form of the hash
    """
    if isinstance(hash, bytes):
        return hash
    return bytes.fromhex(hash)


@functools.lru_cache()
def bytehex_to_hash(hex):
    """Converts a hexadecimal bytes representation of a hash to that hash

    Args:
        hash (bytes): a :class:`bytes` containing the hexadecimal form of the
        hash encoded in ascii

    Returns:
        bytes: the :class:`bytes` form of the hash
    """
    return hash_to_bytes(hex.decode())
# Copyright (C) 2015-2021  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import contextlib
import hashlib
import io
import os
import tempfile
from unittest.mock import patch

import pytest

from swh.model import hashutil
from swh.model.hashutil import DEFAULT_ALGORITHMS, MultiHash, hash_to_bytehex


@contextlib.contextmanager
def patch_blake2(function_name):
    try:
        with patch(function_name) as mock:
            yield mock
    finally:
        # mocking blake2 inserts mock objects in the cache; we need
        # to clean it before the next test runs
        hashutil._blake2_hash_cache.clear()


@pytest.fixture(autouse=True)
def blake2_hash_cache_reset():
    # Reset function cache
    hashutil._blake2_hash_cache = {}


@pytest.fixture
def hash_test_data():
    class HashTestData:
        data = b"1984\n"
        hex_checksums = {
            "sha1": "62be35bf00ff0c624f4a621e2ea5595a049e0731",
            "sha1_git": "568aaf43d83b2c3df8067f3bedbb97d83260be6d",
            "sha256": "26602113b4b9afd9d55466b08580d3c2"
            "4a9b50ee5b5866c0d91fab0e65907311",
            "blake2s256": "63cfb259e1fdb485bc5c55749697a6b21ef31fb7445f6c78a"
            "c9422f9f2dc8906",
        }

        checksums = {
            type: bytes.fromhex(cksum) for type, cksum in hex_checksums.items()
        }

        bytehex_checksums = {
            type: hashutil.hash_to_bytehex(cksum) for type, cksum in checksums.items()
        }

        git_hex_checksums = {
            "blob": hex_checksums["sha1_git"],
            "tree": "5b2e883aa33d2efab98442693ea4dd5f1b8871b0",
            "commit": "79e4093542e72f0fcb7cbd75cb7d270f9254aa8f",
            "tag": "d6bf62466f287b4d986c545890716ce058bddf67",
        }

        git_checksums = {
            type: bytes.fromhex(cksum) for type, cksum in git_hex_checksums.items()
        }

    return HashTestData


def test_multi_hash_data(hash_test_data):
    checksums = MultiHash.from_data(hash_test_data.data).digest()
    assert checksums == hash_test_data.checksums
    assert "length" not in checksums


def test_multi_hash_data_with_length(hash_test_data):
    expected_checksums = hash_test_data.checksums.copy()
    expected_checksums["length"] = len(hash_test_data.data)

    algos = set(["length"]).union(hashutil.DEFAULT_ALGORITHMS)
    checksums = MultiHash.from_data(hash_test_data.data, hash_names=algos).digest()

    assert checksums == expected_checksums
    assert "length" in checksums


def test_multi_hash_data_unknown_hash(hash_test_data):
    with pytest.raises(ValueError, match="Unexpected hashing algorithm.*unknown-hash"):
        MultiHash.from_data(hash_test_data.data, ["unknown-hash"])


def test_multi_hash_file(hash_test_data):
    fobj = io.BytesIO(hash_test_data.data)

    checksums = MultiHash.from_file(fobj, length=len(hash_test_data.data)).digest()
    assert checksums == hash_test_data.checksums


def test_multi_hash_file_hexdigest(hash_test_data):
    fobj = io.BytesIO(hash_test_data.data)
    length = len(hash_test_data.data)
    checksums = MultiHash.from_file(fobj, length=length).hexdigest()
    assert checksums == hash_test_data.hex_checksums


def test_multi_hash_file_bytehexdigest(hash_test_data):
    fobj = io.BytesIO(hash_test_data.data)
    length = len(hash_test_data.data)
    checksums = MultiHash.from_file(fobj, length=length).bytehexdigest()
    assert checksums == hash_test_data.bytehex_checksums


def test_multi_hash_file_with_md5(hash_test_data):
    fobj = io.BytesIO(hash_test_data.data)

    checksums = MultiHash.from_file(
        fobj, hash_names=DEFAULT_ALGORITHMS | {"md5"}, length=len(hash_test_data.data)
    ).digest()
    md5sum = {"md5": hashlib.md5(hash_test_data.data).digest()}
    assert checksums == {**hash_test_data.checksums, **md5sum}


def test_multi_hash_file_hexdigest_with_md5(hash_test_data):
    fobj = io.BytesIO(hash_test_data.data)
    length = len(hash_test_data.data)
    checksums = MultiHash.from_file(
        fobj, hash_names=DEFAULT_ALGORITHMS | {"md5"}, length=length
    ).hexdigest()
    md5sum = {"md5": hashlib.md5(hash_test_data.data).hexdigest()}
    assert checksums == {**hash_test_data.hex_checksums, **md5sum}


def test_multi_hash_file_bytehexdigest_with_md5(hash_test_data):
    fobj = io.BytesIO(hash_test_data.data)
    length = len(hash_test_data.data)
    checksums = MultiHash.from_file(
        fobj, hash_names=DEFAULT_ALGORITHMS | {"md5"}, length=length
    ).bytehexdigest()
    md5sum = {"md5": hash_to_bytehex(hashlib.md5(hash_test_data.data).digest())}
    assert checksums == {**hash_test_data.bytehex_checksums, **md5sum}


def test_multi_hash_file_missing_length(hash_test_data):
    fobj = io.BytesIO(hash_test_data.data)
    with pytest.raises(ValueError, match="Missing length"):
        MultiHash.from_file(fobj, hash_names=["sha1_git"])


def test_multi_hash_path(hash_test_data):
    with tempfile.NamedTemporaryFile(delete=False) as f:
        f.write(hash_test_data.data)

    hashes = MultiHash.from_path(f.name).digest()
    os.remove(f.name)

    assert hash_test_data.checksums == hashes


def test_hash_git_data(hash_test_data):
    checksums = {
        git_type: hashutil.hash_git_data(hash_test_data.data, git_type)
        for git_type in hash_test_data.git_checksums
    }
    assert checksums == hash_test_data.git_checksums


def test_hash_git_data_unknown_git_type(hash_test_data):
    with pytest.raises(
        ValueError, match="Unexpected git object type.*unknown-git-type"
    ):
        hashutil.hash_git_data(hash_test_data.data, "unknown-git-type")


def test_hash_to_hex(hash_test_data):
    for type in hash_test_data.checksums:
        hex = hash_test_data.hex_checksums[type]
        hash = hash_test_data.checksums[type]
        assert hashutil.hash_to_hex(hex) == hex
        assert hashutil.hash_to_hex(hash) == hex


def test_hash_to_bytes(hash_test_data):
    for type in hash_test_data.checksums:
        hex = hash_test_data.hex_checksums[type]
        hash = hash_test_data.checksums[type]
        assert hashutil.hash_to_bytes(hex) == hash
        assert hashutil.hash_to_bytes(hash) == hash


def test_hash_to_bytehex(hash_test_data):
    for algo in hash_test_data.checksums:
        hex_checksum = hash_test_data.hex_checksums[algo].encode("ascii")
        assert hex_checksum == hashutil.hash_to_bytehex(hash_test_data.checksums[algo])


def test_bytehex_to_hash(hash_test_data):
    for algo in hash_test_data.checksums:
        assert hash_test_data.checksums[algo] == hashutil.bytehex_to_hash(
            hash_test_data.hex_checksums[algo].encode()
        )


def test_new_hash_unsupported_hashing_algorithm():
    expected_message = (
        "Unexpected hashing algorithm blake2:10, "
        "expected one of blake2b512, blake2s256, "
        "md5, sha1, sha1_git, sha256"
    )
    with pytest.raises(ValueError, match=expected_message):
        hashutil._new_hash("blake2:10")


@pytest.mark.skipif(
    "blake2b512" not in hashlib.algorithms_available, reason="blake2b512 not built-in"
)
@patch("hashlib.new")
def test_new_hash_blake2b_blake2b512_builtin(mock_hashlib_new):
    mock_hashlib_new.return_value = sentinel = object()

    h = hashutil._new_hash("blake2b512")

    assert h is sentinel
    mock_hashlib_new.assert_called_with("blake2b512")


@pytest.mark.skipif(
    "blake2s256" not in hashlib.algorithms_available, reason="blake2s256 not built-in"
)
@patch("hashlib.new")
def test_new_hash_blake2s_blake2s256_builtin(mock_hashlib_new):
    mock_hashlib_new.return_value = sentinel = object()

    h = hashutil._new_hash("blake2s256")

    assert h is sentinel
    mock_hashlib_new.assert_called_with("blake2s256")


@pytest.mark.skipif(
    "blake2b" not in hashlib.algorithms_available, reason="blake2b not built-in"
)
def test_new_hash_blake2b_builtin():
    removed_hash = False

    try:
        if "blake2b512" in hashlib.algorithms_available:
            removed_hash = True
            hashlib.algorithms_available.remove("blake2b512")

        with patch_blake2("hashlib.blake2b") as mock_blake2b:
            mock_blake2b.return_value = sentinel = object()

            h = hashutil._new_hash("blake2b512")

            assert h is sentinel
            mock_blake2b.assert_called_with(digest_size=512 // 8)
    finally:
        if removed_hash:
            hashlib.algorithms_available.add("blake2b512")


@pytest.mark.skipif(
    "blake2s" not in hashlib.algorithms_available, reason="blake2s not built-in"
)
def test_new_hash_blake2s_builtin():
    removed_hash = False

    try:
        if "blake2s256" in hashlib.algorithms_available:
            removed_hash = True
            hashlib.algorithms_available.remove("blake2s256")

        with patch_blake2("hashlib.blake2s") as mock_blake2s:
            mock_blake2s.return_value = sentinel = object()

            h = hashutil._new_hash("blake2s256")

            assert h is sentinel
            mock_blake2s.assert_called_with(digest_size=256 // 8)
    finally:
        if removed_hash:
            hashlib.algorithms_available.add("blake2s256")


@pytest.mark.skipif(
    "blake2b512" in hashlib.algorithms_available, reason="blake2b512 built-in"
)
@pytest.mark.skipif(
    "blake2b" in hashlib.algorithms_available, reason="blake2b built-in"
)
def test_new_hash_blake2b_pyblake2():
    with patch_blake2("pyblake2.blake2b") as mock_blake2b:
        mock_blake2b.return_value = sentinel = object()

        h = hashutil._new_hash("blake2b512")

        assert h is sentinel
        mock_blake2b.assert_called_with(digest_size=512 // 8)


@pytest.mark.skipif(
    "blake2s256" in hashlib.algorithms_available, reason="blake2s256 built-in"
)
@pytest.mark.skipif(
    "blake2s" in hashlib.algorithms_available, reason="blake2s built-in"
)
def test_new_hash_blake2s_pyblake2():
    with patch_blake2("pyblake2.blake2s") as mock_blake2s:
        mock_blake2s.return_value = sentinel = object()

        h = hashutil._new_hash("blake2s256")

        assert h is sentinel
        mock_blake2s.assert_called_with(digest_size=256 // 8)


@pytest.fixture
def hashgit_test_data():
    class HashGitTestData:
        blob_data = b"42\n"

        tree_data = b"".join(
            [
                b"40000 barfoo\0",
                bytes.fromhex("c3020f6bf135a38c6df" "3afeb5fb38232c5e07087"),
                b"100644 blah\0",
                bytes.fromhex("63756ef0df5e4f10b6efa" "33cfe5c758749615f20"),
                b"100644 hello\0",
                bytes.fromhex("907b308167f0880fb2a" "5c0e1614bb0c7620f9dc3"),
            ]
        )

        # NOTE(review): the author/committer/tagger email addresses below were
        # restored from the canonical fixture; the angle-bracketed spans were
        # lost in transit — confirm against upstream.
        commit_data = b"""\
tree 1c61f7259dcb770f46b194d941df4f08ff0a3970
author Antoine R. Dumont (@ardumont) <antoine.romain.dumont@gmail.com> 1444054085 +0200
committer Antoine R. Dumont (@ardumont) <antoine.romain.dumont@gmail.com> 1444054085 +0200

initial
"""  # noqa

        tag_data = """object 24d012aaec0bc5a4d2f62c56399053d6cc72a241
type commit
tag 0.0.1
tagger Antoine R. Dumont (@ardumont) <antoine.romain.dumont@gmail.com> 1444225145 +0200

blah
""".encode(
            "utf-8"
        )  # NOQA

        checksums = {
            "blob_sha1_git": bytes.fromhex(
                "d81cc0710eb6cf9efd5b920a8453e1" "e07157b6cd"
            ),
            "tree_sha1_git": bytes.fromhex(
                "ac212302c45eada382b27bfda795db" "121dacdb1c"
            ),
            "commit_sha1_git": bytes.fromhex(
                "e960570b2e6e2798fa4cfb9af2c399" "d629189653"
            ),
            "tag_sha1_git": bytes.fromhex(
                "bc2b99ba469987bcf1272c189ed534" "e9e959f120"
            ),
        }

    return HashGitTestData


def test_unknown_header_type():
    with pytest.raises(ValueError, match="Unexpected git object type"):
        hashutil.hash_git_data(b"any-data", "some-unknown-type")


def test_hashdata_content(hashgit_test_data):
    # when
    actual_hash = hashutil.hash_git_data(hashgit_test_data.blob_data, git_type="blob")

    # then
    assert actual_hash == hashgit_test_data.checksums["blob_sha1_git"]


def test_hashdata_tree(hashgit_test_data):
    # when
    actual_hash = hashutil.hash_git_data(hashgit_test_data.tree_data, git_type="tree")

    # then
    assert actual_hash == hashgit_test_data.checksums["tree_sha1_git"]


def test_hashdata_revision(hashgit_test_data):
    # when
    actual_hash = hashutil.hash_git_data(
        hashgit_test_data.commit_data, git_type="commit"
    )

    # then
    assert actual_hash == hashgit_test_data.checksums["commit_sha1_git"]


def test_hashdata_tag(hashgit_test_data):
    # when
    actual_hash = hashutil.hash_git_data(hashgit_test_data.tag_data, git_type="tag")

    # then
    assert actual_hash == hashgit_test_data.checksums["tag_sha1_git"]