diff --git a/swh/model/hashutil.py b/swh/model/hashutil.py index e58f687..de85857 100644 --- a/swh/model/hashutil.py +++ b/swh/model/hashutil.py @@ -1,453 +1,360 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Module in charge of hashing function definitions. This is the base module use to compute swh's hashes. Only a subset of hashing algorithms is supported as defined in the ALGORITHMS set. Any provided algorithms not in that list will result in a ValueError explaining the error. This module defines a MultiHash class to ease the softwareheritage hashing algorithms computation. This allows to compute hashes from file object, path, data using a similar interface as what the standard hashlib module provides. Basic usage examples: - file object: MultiHash.from_file( file_object, hash_names=DEFAULT_ALGORITHMS).digest() - path (filepath): MultiHash.from_path(b'foo').hexdigest() - data (bytes): MultiHash.from_data(b'foo').bytehexdigest() "Complex" usage, defining a swh hashlib instance first: - To compute length, integrate the length to the set of algorithms to compute, for example: .. code-block:: python h = MultiHash(hash_names=set({'length'}).union(DEFAULT_ALGORITHMS)) with open(filepath, 'rb') as f: h.update(f.read(HASH_BLOCK_SIZE)) hashes = h.digest() # returns a dict of {hash_algo_name: hash_in_bytes} - Write alongside computing hashing algorithms (from a stream), example: .. code-block:: python h = MultiHash(length=length) with open(filepath, 'wb') as f: for chunk in r.iter_content(): # r a stream of sort h.update(chunk) f.write(chunk) hashes = h.hexdigest() # returns a dict of {hash_algo_name: hash_in_hex} - Note: Prior to this, we would have to use chunk_cb (cf. hash_file, - hash_path) - - -This module also defines the following (deprecated) hashing functions: - -- hash_file: Hash the contents of the given file object with the given - algorithms (defaulting to DEFAULT_ALGORITHMS if none provided). - -- hash_data: Hash the given binary blob with the given algorithms - (defaulting to DEFAULT_ALGORITHMS if none provided). - -- hash_path: Hash the contents of the file at the given path with the - given algorithms (defaulting to DEFAULT_ALGORITHMS if none - provided). """ import binascii import functools import hashlib import os from io import BytesIO ALGORITHMS = set(['sha1', 'sha256', 'sha1_git', 'blake2s256', 'blake2b512']) """Hashing algorithms supported by this module""" DEFAULT_ALGORITHMS = set(['sha1', 'sha256', 'sha1_git', 'blake2s256']) """Algorithms computed by default when calling the functions from this module. Subset of :const:`ALGORITHMS`. """ HASH_BLOCK_SIZE = 32768 """Block size for streaming hash computations made in this module""" _blake2_hash_cache = {} class MultiHash: """Hashutil class to support multiple hashes computation. Args: hash_names (set): Set of hash algorithms (+ optionally length) to compute hashes (cf. DEFAULT_ALGORITHMS) length (int): Length of the total sum of chunks to read If the length is provided as algorithm, the length is also computed and returned. """ def __init__(self, hash_names=DEFAULT_ALGORITHMS, length=None): self.state = {} self.track_length = False for name in hash_names: if name == 'length': self.state['length'] = 0 self.track_length = True else: self.state[name] = _new_hash(name, length) @classmethod def from_state(cls, state, track_length): ret = cls([]) ret.state = state ret.track_length = track_length @classmethod def from_file(cls, fobj, hash_names=DEFAULT_ALGORITHMS, length=None): ret = cls(length=length, hash_names=hash_names) while True: chunk = fobj.read(HASH_BLOCK_SIZE) if not chunk: break ret.update(chunk) return ret @classmethod def from_path(cls, path, hash_names=DEFAULT_ALGORITHMS): length = os.path.getsize(path) with open(path, 'rb') as f: ret = cls.from_file(f, hash_names=hash_names, length=length) return ret @classmethod def from_data(cls, data, hash_names=DEFAULT_ALGORITHMS): length = len(data) fobj = BytesIO(data) return cls.from_file(fobj, hash_names=hash_names, length=length) def update(self, chunk): for name, h in self.state.items(): if name == 'length': continue h.update(chunk) if self.track_length: self.state['length'] += len(chunk) def digest(self): return { name: h.digest() if name != 'length' else h for name, h in self.state.items() } def hexdigest(self): return { name: h.hexdigest() if name != 'length' else h for name, h in self.state.items() } def bytehexdigest(self): return { name: hash_to_bytehex(h.digest()) if name != 'length' else h for name, h in self.state.items() } def copy(self): copied_state = { name: h.copy() if name != 'length' else h for name, h in self.state.items() } return self.from_state(copied_state, self.track_length) def _new_blake2_hash(algo): """Return a function that initializes a blake2 hash. """ if algo in _blake2_hash_cache: return _blake2_hash_cache[algo]() lalgo = algo.lower() if not lalgo.startswith('blake2'): raise ValueError('Algorithm %s is not a blake2 hash' % algo) blake_family = lalgo[:7] digest_size = None if lalgo[7:]: try: digest_size, remainder = divmod(int(lalgo[7:]), 8) except ValueError: raise ValueError( 'Unknown digest size for algo %s' % algo ) from None if remainder: raise ValueError( 'Digest size for algorithm %s must be a multiple of 8' % algo ) if lalgo in hashlib.algorithms_available: # Handle the case where OpenSSL ships the given algorithm # (e.g. Python 3.5 on Debian 9 stretch) _blake2_hash_cache[algo] = lambda: hashlib.new(lalgo) else: # Try using the built-in implementation for Python 3.6+ if blake_family in hashlib.algorithms_available: blake2 = getattr(hashlib, blake_family) else: import pyblake2 blake2 = getattr(pyblake2, blake_family) _blake2_hash_cache[algo] = lambda: blake2(digest_size=digest_size) return _blake2_hash_cache[algo]() def _new_hashlib_hash(algo): """Initialize a digest object from hashlib. Handle the swh-specific names for the blake2-related algorithms """ if algo.startswith('blake2'): return _new_blake2_hash(algo) else: return hashlib.new(algo) def _new_git_hash(base_algo, git_type, length): """Initialize a digest object (as returned by python's hashlib) for the requested algorithm, and feed it with the header for a git object of the given type and length. The header for hashing a git object consists of: - The type of the object (encoded in ASCII) - One ASCII space (\x20) - The length of the object (decimal encoded in ASCII) - One NUL byte Args: base_algo (str from :const:`ALGORITHMS`): a hashlib-supported algorithm git_type: the type of the git object (supposedly one of 'blob', 'commit', 'tag', 'tree') length: the length of the git object you're encoding Returns: a hashutil.hash object """ h = _new_hashlib_hash(base_algo) git_header = '%s %d\0' % (git_type, length) h.update(git_header.encode('ascii')) return h def _new_hash(algo, length=None): """Initialize a digest object (as returned by python's hashlib) for the requested algorithm. See the constant ALGORITHMS for the list of supported algorithms. If a git-specific hashing algorithm is requested (e.g., "sha1_git"), the hashing object will be pre-fed with the needed header; for this to work, length must be given. Args: algo (str): a hashing algorithm (one of ALGORITHMS) length (int): the length of the hashed payload (needed for git-specific algorithms) Returns: a hashutil.hash object Raises: ValueError if algo is unknown, or length is missing for a git-specific hash. """ if algo not in ALGORITHMS: raise ValueError( 'Unexpected hashing algorithm %s, expected one of %s' % (algo, ', '.join(sorted(ALGORITHMS)))) if algo.endswith('_git'): if length is None: raise ValueError('Missing length for git hashing algorithm') base_algo = algo[:-4] return _new_git_hash(base_algo, 'blob', length) return _new_hashlib_hash(algo) -def hash_file(fobj, length=None, algorithms=DEFAULT_ALGORITHMS, - chunk_cb=None): - """(Deprecated) cf. MultiHash.from_file - - Hash the contents of the given file object with the given algorithms. - - Args: - fobj: a file-like object - length (int): the length of the contents of the file-like - object (for the git-specific algorithms) - algorithms (set): the hashing algorithms to be used, as an - iterable over strings - chunk_cb (fun): a callback function taking a chunk of data as - parameter - - Returns: - a dict mapping each algorithm to a digest (bytes by default). - - Raises: - ValueError if algorithms contains an unknown hash algorithm. - - """ - h = MultiHash(algorithms, length) - while True: - chunk = fobj.read(HASH_BLOCK_SIZE) - if not chunk: - break - h.update(chunk) - if chunk_cb: - chunk_cb(chunk) - - return h.digest() - - -def hash_path(path, algorithms=DEFAULT_ALGORITHMS, chunk_cb=None): - """(deprecated) cf. MultiHash.from_path - - Hash the contents of the file at the given path with the given - algorithms. - - Args: - path (str): the path of the file to hash - algorithms (set): the hashing algorithms used - chunk_cb (fun): a callback function taking a chunk of data as parameter - - Returns: a dict mapping each algorithm to a bytes digest. - - Raises: - ValueError if algorithms contains an unknown hash algorithm. - OSError on file access error - - """ - length = os.path.getsize(path) - with open(path, 'rb') as fobj: - hashes = hash_file(fobj, length, algorithms, chunk_cb=chunk_cb) - hashes['length'] = length - return hashes - - -def hash_data(data, algorithms=DEFAULT_ALGORITHMS): - """(deprecated) cf. MultiHash.from_data - - Hash the given binary blob with the given algorithms. - - Args: - data (bytes): raw content to hash - algorithms (set): the hashing algorithms used - - Returns: a dict mapping each algorithm to a bytes digest - - Raises: - TypeError if data does not support the buffer interface. - ValueError if algorithms contains an unknown hash algorithm. - - """ - return MultiHash.from_data(data, hash_names=algorithms).digest() - - def hash_git_data(data, git_type, base_algo='sha1'): """Hash the given data as a git object of type git_type. Args: data: a bytes object git_type: the git object type base_algo: the base hashing algorithm used (default: sha1) Returns: a dict mapping each algorithm to a bytes digest Raises: ValueError if the git_type is unexpected. """ git_object_types = {'blob', 'tree', 'commit', 'tag', 'snapshot'} if git_type not in git_object_types: raise ValueError('Unexpected git object type %s, expected one of %s' % (git_type, ', '.join(sorted(git_object_types)))) h = _new_git_hash(base_algo, git_type, len(data)) h.update(data) return h.digest() @functools.lru_cache() def hash_to_hex(hash): """Converts a hash (in hex or bytes form) to its hexadecimal ascii form Args: hash (str or bytes): a :class:`bytes` hash or a :class:`str` containing the hexadecimal form of the hash Returns: str: the hexadecimal form of the hash """ if isinstance(hash, str): return hash return binascii.hexlify(hash).decode('ascii') @functools.lru_cache() def hash_to_bytehex(hash): """Converts a hash to its hexadecimal bytes representation Args: hash (bytes): a :class:`bytes` hash Returns: bytes: the hexadecimal form of the hash, as :class:`bytes` """ return binascii.hexlify(hash) @functools.lru_cache() def hash_to_bytes(hash): """Converts a hash (in hex or bytes form) to its raw bytes form Args: hash (str or bytes): a :class:`bytes` hash or a :class:`str` containing the hexadecimal form of the hash Returns: bytes: the :class:`bytes` form of the hash """ if isinstance(hash, bytes): return hash return bytes.fromhex(hash) @functools.lru_cache() def bytehex_to_hash(hex): """Converts a hexadecimal bytes representation of a hash to that hash Args: hash (bytes): a :class:`bytes` containing the hexadecimal form of the hash encoded in ascii Returns: bytes: the :class:`bytes` form of the hash """ return hash_to_bytes(hex.decode()) diff --git a/swh/model/tests/test_hashutil.py b/swh/model/tests/test_hashutil.py index 0e41068..abdff97 100644 --- a/swh/model/tests/test_hashutil.py +++ b/swh/model/tests/test_hashutil.py @@ -1,379 +1,334 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import hashlib import io import os import tempfile import unittest from unittest.mock import patch from swh.model import hashutil from swh.model.hashutil import MultiHash class BaseHashutil(unittest.TestCase): def setUp(self): # Reset function cache hashutil._blake2_hash_cache = {} self.data = b'1984\n' self.hex_checksums = { 'sha1': '62be35bf00ff0c624f4a621e2ea5595a049e0731', 'sha1_git': '568aaf43d83b2c3df8067f3bedbb97d83260be6d', 'sha256': '26602113b4b9afd9d55466b08580d3c2' '4a9b50ee5b5866c0d91fab0e65907311', 'blake2s256': '63cfb259e1fdb485bc5c55749697a6b21ef31fb7445f6c78a' 'c9422f9f2dc8906', } self.checksums = { type: bytes.fromhex(cksum) for type, cksum in self.hex_checksums.items() } self.bytehex_checksums = { type: hashutil.hash_to_bytehex(cksum) for type, cksum in self.checksums.items() } self.git_hex_checksums = { 'blob': self.hex_checksums['sha1_git'], 'tree': '5b2e883aa33d2efab98442693ea4dd5f1b8871b0', 'commit': '79e4093542e72f0fcb7cbd75cb7d270f9254aa8f', 'tag': 'd6bf62466f287b4d986c545890716ce058bddf67', } self.git_checksums = { type: bytes.fromhex(cksum) for type, cksum in self.git_hex_checksums.items() } class MultiHashTest(BaseHashutil): def test_multi_hash_data(self): checksums = MultiHash.from_data(self.data).digest() self.assertEqual(checksums, self.checksums) self.assertFalse('length' in checksums) def test_multi_hash_data_with_length(self): expected_checksums = self.checksums.copy() expected_checksums['length'] = len(self.data) algos = set(['length']).union(hashutil.DEFAULT_ALGORITHMS) checksums = MultiHash.from_data(self.data, hash_names=algos).digest() self.assertEqual(checksums, expected_checksums) self.assertTrue('length' in checksums) def test_multi_hash_data_unknown_hash(self): with self.assertRaises(ValueError) as cm: MultiHash.from_data(self.data, ['unknown-hash']) self.assertIn('Unexpected hashing algorithm', cm.exception.args[0]) self.assertIn('unknown-hash', cm.exception.args[0]) def test_multi_hash_file(self): fobj = io.BytesIO(self.data) checksums = MultiHash.from_file(fobj, length=len(self.data)).digest() self.assertEqual(checksums, self.checksums) def test_multi_hash_file_hexdigest(self): fobj = io.BytesIO(self.data) length = len(self.data) checksums = MultiHash.from_file(fobj, length=length).hexdigest() self.assertEqual(checksums, self.hex_checksums) def test_multi_hash_file_bytehexdigest(self): fobj = io.BytesIO(self.data) length = len(self.data) checksums = MultiHash.from_file(fobj, length=length).bytehexdigest() self.assertEqual(checksums, self.bytehex_checksums) def test_multi_hash_file_missing_length(self): fobj = io.BytesIO(self.data) with self.assertRaises(ValueError) as cm: MultiHash.from_file(fobj, hash_names=['sha1_git']) self.assertIn('Missing length', cm.exception.args[0]) def test_multi_hash_path(self): with tempfile.NamedTemporaryFile(delete=False) as f: f.write(self.data) hashes = MultiHash.from_path(f.name).digest() os.remove(f.name) self.assertEqual(self.checksums, hashes) class Hashutil(BaseHashutil): - def test_hash_data(self): - checksums = hashutil.hash_data(self.data) - self.assertEqual(checksums, self.checksums) - self.assertFalse('length' in checksums) - - def test_hash_data_with_length(self): - expected_checksums = self.checksums.copy() - expected_checksums['length'] = len(self.data) - - algos = set(['length']).union(hashutil.DEFAULT_ALGORITHMS) - checksums = hashutil.hash_data(self.data, algorithms=algos) - - self.assertEqual(checksums, expected_checksums) - self.assertTrue('length' in checksums) - - def test_hash_data_unknown_hash(self): - with self.assertRaises(ValueError) as cm: - hashutil.hash_data(self.data, ['unknown-hash']) - - self.assertIn('Unexpected hashing algorithm', cm.exception.args[0]) - self.assertIn('unknown-hash', cm.exception.args[0]) def test_hash_git_data(self): checksums = { git_type: hashutil.hash_git_data(self.data, git_type) for git_type in self.git_checksums } self.assertEqual(checksums, self.git_checksums) def test_hash_git_data_unknown_git_type(self): with self.assertRaises(ValueError) as cm: hashutil.hash_git_data(self.data, 'unknown-git-type') self.assertIn('Unexpected git object type', cm.exception.args[0]) self.assertIn('unknown-git-type', cm.exception.args[0]) - def test_hash_file(self): - fobj = io.BytesIO(self.data) - - checksums = hashutil.hash_file(fobj, length=len(self.data)) - self.assertEqual(checksums, self.checksums) - - def test_hash_file_missing_length(self): - fobj = io.BytesIO(self.data) - - with self.assertRaises(ValueError) as cm: - hashutil.hash_file(fobj, algorithms=['sha1_git']) - - self.assertIn('Missing length', cm.exception.args[0]) - - def test_hash_path(self): - with tempfile.NamedTemporaryFile(delete=False) as f: - f.write(self.data) - - hashes = hashutil.hash_path(f.name) - os.remove(f.name) - - self.checksums['length'] = len(self.data) - self.assertEqual(self.checksums, hashes) - def test_hash_to_hex(self): for type in self.checksums: hex = self.hex_checksums[type] hash = self.checksums[type] self.assertEqual(hashutil.hash_to_hex(hex), hex) self.assertEqual(hashutil.hash_to_hex(hash), hex) def test_hash_to_bytes(self): for type in self.checksums: hex = self.hex_checksums[type] hash = self.checksums[type] self.assertEqual(hashutil.hash_to_bytes(hex), hash) self.assertEqual(hashutil.hash_to_bytes(hash), hash) def test_hash_to_bytehex(self): for algo in self.checksums: self.assertEqual(self.hex_checksums[algo].encode('ascii'), hashutil.hash_to_bytehex(self.checksums[algo])) def test_bytehex_to_hash(self): for algo in self.checksums: self.assertEqual(self.checksums[algo], hashutil.bytehex_to_hash( self.hex_checksums[algo].encode())) def test_new_hash_unsupported_hashing_algorithm(self): try: hashutil._new_hash('blake2:10') except ValueError as e: self.assertEqual(str(e), 'Unexpected hashing algorithm blake2:10, ' 'expected one of blake2b512, blake2s256, ' 'sha1, sha1_git, sha256') @patch('hashlib.new') def test_new_hash_blake2b_blake2b512_builtin(self, mock_hashlib_new): if 'blake2b512' not in hashlib.algorithms_available: self.skipTest('blake2b512 not built-in') mock_hashlib_new.return_value = sentinel = object() h = hashutil._new_hash('blake2b512') self.assertIs(h, sentinel) mock_hashlib_new.assert_called_with('blake2b512') @patch('hashlib.new') def test_new_hash_blake2s_blake2s256_builtin(self, mock_hashlib_new): if 'blake2s256' not in hashlib.algorithms_available: self.skipTest('blake2s256 not built-in') mock_hashlib_new.return_value = sentinel = object() h = hashutil._new_hash('blake2s256') self.assertIs(h, sentinel) mock_hashlib_new.assert_called_with('blake2s256') def test_new_hash_blake2b_builtin(self): removed_hash = False try: if 'blake2b512' in hashlib.algorithms_available: removed_hash = True hashlib.algorithms_available.remove('blake2b512') if 'blake2b' not in hashlib.algorithms_available: self.skipTest('blake2b not built in') with patch('hashlib.blake2b') as mock_blake2b: mock_blake2b.return_value = sentinel = object() h = hashutil._new_hash('blake2b512') self.assertIs(h, sentinel) mock_blake2b.assert_called_with(digest_size=512//8) finally: if removed_hash: hashlib.algorithms_available.add('blake2b512') def test_new_hash_blake2s_builtin(self): removed_hash = False try: if 'blake2s256' in hashlib.algorithms_available: removed_hash = True hashlib.algorithms_available.remove('blake2s256') if 'blake2s' not in hashlib.algorithms_available: self.skipTest('blake2s not built in') with patch('hashlib.blake2s') as mock_blake2s: mock_blake2s.return_value = sentinel = object() h = hashutil._new_hash('blake2s256') self.assertIs(h, sentinel) mock_blake2s.assert_called_with(digest_size=256//8) finally: if removed_hash: hashlib.algorithms_available.add('blake2s256') def test_new_hash_blake2b_pyblake2(self): if 'blake2b512' in hashlib.algorithms_available: self.skipTest('blake2b512 built in') if 'blake2b' in hashlib.algorithms_available: self.skipTest('blake2b built in') with patch('pyblake2.blake2b') as mock_blake2b: mock_blake2b.return_value = sentinel = object() h = hashutil._new_hash('blake2b512') self.assertIs(h, sentinel) mock_blake2b.assert_called_with(digest_size=512//8) def test_new_hash_blake2s_pyblake2(self): if 'blake2s256' in hashlib.algorithms_available: self.skipTest('blake2s256 built in') if 'blake2s' in hashlib.algorithms_available: self.skipTest('blake2s built in') with patch('pyblake2.blake2s') as mock_blake2s: mock_blake2s.return_value = sentinel = object() h = hashutil._new_hash('blake2s256') self.assertIs(h, sentinel) mock_blake2s.assert_called_with(digest_size=256//8) class HashlibGit(unittest.TestCase): def setUp(self): self.blob_data = b'42\n' self.tree_data = b''.join([b'40000 barfoo\0', bytes.fromhex('c3020f6bf135a38c6df' '3afeb5fb38232c5e07087'), b'100644 blah\0', bytes.fromhex('63756ef0df5e4f10b6efa' '33cfe5c758749615f20'), b'100644 hello\0', bytes.fromhex('907b308167f0880fb2a' '5c0e1614bb0c7620f9dc3')]) self.commit_data = """tree 1c61f7259dcb770f46b194d941df4f08ff0a3970 author Antoine R. Dumont (@ardumont) 1444054085 +0200 committer Antoine R. Dumont (@ardumont) 1444054085 +0200 initial """.encode('utf-8') # NOQA self.tag_data = """object 24d012aaec0bc5a4d2f62c56399053d6cc72a241 type commit tag 0.0.1 tagger Antoine R. Dumont (@ardumont) 1444225145 +0200 blah """.encode('utf-8') # NOQA self.checksums = { 'blob_sha1_git': bytes.fromhex('d81cc0710eb6cf9efd5b920a8453e1' 'e07157b6cd'), 'tree_sha1_git': bytes.fromhex('ac212302c45eada382b27bfda795db' '121dacdb1c'), 'commit_sha1_git': bytes.fromhex('e960570b2e6e2798fa4cfb9af2c399' 'd629189653'), 'tag_sha1_git': bytes.fromhex('bc2b99ba469987bcf1272c189ed534' 'e9e959f120'), } def test_unknown_header_type(self): with self.assertRaises(ValueError) as cm: hashutil.hash_git_data(b'any-data', 'some-unknown-type') self.assertIn('Unexpected git object type', cm.exception.args[0]) def test_hashdata_content(self): # when actual_hash = hashutil.hash_git_data(self.blob_data, git_type='blob') # then self.assertEqual(actual_hash, self.checksums['blob_sha1_git']) def test_hashdata_tree(self): # when actual_hash = hashutil.hash_git_data(self.tree_data, git_type='tree') # then self.assertEqual(actual_hash, self.checksums['tree_sha1_git']) def test_hashdata_revision(self): # when actual_hash = hashutil.hash_git_data(self.commit_data, git_type='commit') # then self.assertEqual(actual_hash, self.checksums['commit_sha1_git']) def test_hashdata_tag(self): # when actual_hash = hashutil.hash_git_data(self.tag_data, git_type='tag') # then self.assertEqual(actual_hash, self.checksums['tag_sha1_git']) diff --git a/swh/model/tests/test_identifiers.py b/swh/model/tests/test_identifiers.py index de96865..1492b87 100644 --- a/swh/model/tests/test_identifiers.py +++ b/swh/model/tests/test_identifiers.py @@ -1,894 +1,895 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import binascii import datetime import unittest from swh.model import hashutil, identifiers from swh.model.exceptions import ValidationError from swh.model.identifiers import (CONTENT, DIRECTORY, PERSISTENT_IDENTIFIER_TYPES, RELEASE, REVISION, SNAPSHOT, PersistentId) class UtilityFunctionsIdentifier(unittest.TestCase): def setUp(self): self.str_id = 'c2e41aae41ac17bd4a650770d6ee77f62e52235b' self.bytes_id = binascii.unhexlify(self.str_id) self.bad_type_id = object() def test_identifier_to_bytes(self): for id in [self.str_id, self.bytes_id]: self.assertEqual(identifiers.identifier_to_bytes(id), self.bytes_id) # wrong length with self.assertRaises(ValueError) as cm: identifiers.identifier_to_bytes(id[:-2]) self.assertIn('length', str(cm.exception)) with self.assertRaises(ValueError) as cm: identifiers.identifier_to_bytes(self.bad_type_id) self.assertIn('type', str(cm.exception)) def test_identifier_to_str(self): for id in [self.str_id, self.bytes_id]: self.assertEqual(identifiers.identifier_to_str(id), self.str_id) # wrong length with self.assertRaises(ValueError) as cm: identifiers.identifier_to_str(id[:-2]) self.assertIn('length', str(cm.exception)) with self.assertRaises(ValueError) as cm: identifiers.identifier_to_str(self.bad_type_id) self.assertIn('type', str(cm.exception)) class UtilityFunctionsDateOffset(unittest.TestCase): def setUp(self): self.dates = { b'1448210036': { 'seconds': 1448210036, 'microseconds': 0, }, b'1448210036.002342': { 'seconds': 1448210036, 'microseconds': 2342, }, b'1448210036.12': { 'seconds': 1448210036, 'microseconds': 120000, } } self.broken_dates = [ 1448210036.12, ] self.offsets = { 0: b'+0000', -630: b'-1030', 800: b'+1320', } def test_format_date(self): for date_repr, date in self.dates.items(): self.assertEqual(identifiers.format_date(date), date_repr) def test_format_date_fail(self): for date in self.broken_dates: with self.assertRaises(ValueError): identifiers.format_date(date) def test_format_offset(self): for offset, res in self.offsets.items(): self.assertEqual(identifiers.format_offset(offset), res) class ContentIdentifier(unittest.TestCase): def setUp(self): self.content = { 'status': 'visible', 'length': 5, 'data': b'1984\n', 'ctime': datetime.datetime(2015, 11, 22, 16, 33, 56, tzinfo=datetime.timezone.utc), } - self.content_id = hashutil.hash_data(self.content['data']) + self.content_id = hashutil.MultiHash.from_data( + self.content['data']).digest() def test_content_identifier(self): self.assertEqual(identifiers.content_identifier(self.content), self.content_id) class DirectoryIdentifier(unittest.TestCase): def setUp(self): self.directory = { 'id': 'c2e41aae41ac17bd4a650770d6ee77f62e52235b', 'entries': [ { 'type': 'file', 'perms': 33188, 'name': b'README', 'target': '37ec8ea2110c0b7a32fbb0e872f6e7debbf95e21' }, { 'type': 'file', 'perms': 33188, 'name': b'Rakefile', 'target': '3bb0e8592a41ae3185ee32266c860714980dbed7' }, { 'type': 'dir', 'perms': 16384, 'name': b'app', 'target': '61e6e867f5d7ba3b40540869bc050b0c4fed9e95' }, { 'type': 'file', 'perms': 33188, 'name': b'1.megabyte', 'target': '7c2b2fbdd57d6765cdc9d84c2d7d333f11be7fb3' }, { 'type': 'dir', 'perms': 16384, 'name': b'config', 'target': '591dfe784a2e9ccc63aaba1cb68a765734310d98' }, { 'type': 'dir', 'perms': 16384, 'name': b'public', 'target': '9588bf4522c2b4648bfd1c61d175d1f88c1ad4a5' }, { 'type': 'file', 'perms': 33188, 'name': b'development.sqlite3', 'target': 'e69de29bb2d1d6434b8b29ae775ad8c2e48c5391' }, { 'type': 'dir', 'perms': 16384, 'name': b'doc', 'target': '154705c6aa1c8ead8c99c7915373e3c44012057f' }, { 'type': 'dir', 'perms': 16384, 'name': b'db', 'target': '85f157bdc39356b7bc7de9d0099b4ced8b3b382c' }, { 'type': 'dir', 'perms': 16384, 'name': b'log', 'target': '5e3d3941c51cce73352dff89c805a304ba96fffe' }, { 'type': 'dir', 'perms': 16384, 'name': b'script', 'target': '1b278423caf176da3f3533592012502aa10f566c' }, { 'type': 'dir', 'perms': 16384, 'name': b'test', 'target': '035f0437c080bfd8711670b3e8677e686c69c763' }, { 'type': 'dir', 'perms': 16384, 'name': b'vendor', 'target': '7c0dc9ad978c1af3f9a4ce061e50f5918bd27138' }, { 'type': 'rev', 'perms': 57344, 'name': b'will_paginate', 'target': '3d531e169db92a16a9a8974f0ae6edf52e52659e' } ], } self.empty_directory = { 'id': '4b825dc642cb6eb9a060e54bf8d69288fbee4904', 'entries': [], } def test_dir_identifier(self): self.assertEqual( identifiers.directory_identifier(self.directory), self.directory['id']) def test_dir_identifier_empty_directory(self): self.assertEqual( identifiers.directory_identifier(self.empty_directory), self.empty_directory['id']) class RevisionIdentifier(unittest.TestCase): def setUp(self): linus_tz = datetime.timezone(datetime.timedelta(minutes=-420)) gpgsig = b'''\ -----BEGIN PGP SIGNATURE----- Version: GnuPG v1.4.13 (Darwin) iQIcBAABAgAGBQJVJcYsAAoJEBiY3kIkQRNJVAUQAJ8/XQIfMqqC5oYeEFfHOPYZ L7qy46bXHVBa9Qd8zAJ2Dou3IbI2ZoF6/Et89K/UggOycMlt5FKV/9toWyuZv4Po L682wonoxX99qvVTHo6+wtnmYO7+G0f82h+qHMErxjP+I6gzRNBvRr+SfY7VlGdK wikMKOMWC5smrScSHITnOq1Ews5pe3N7qDYMzK0XVZmgDoaem4RSWMJs4My/qVLN e0CqYWq2A22GX7sXl6pjneJYQvcAXUX+CAzp24QnPSb+Q22Guj91TcxLFcHCTDdn qgqMsEyMiisoglwrCbO+D+1xq9mjN9tNFWP66SQ48mrrHYTBV5sz9eJyDfroJaLP CWgbDTgq6GzRMehHT3hXfYS5NNatjnhkNISXR7pnVP/obIi/vpWh5ll6Gd8q26z+ a/O41UzOaLTeNI365MWT4/cnXohVLRG7iVJbAbCxoQmEgsYMRc/pBAzWJtLfcB2G jdTswYL6+MUdL8sB9pZ82D+BP/YAdHe69CyTu1lk9RT2pYtI/kkfjHubXBCYEJSG +VGllBbYG6idQJpyrOYNRJyrDi9yvDJ2W+S0iQrlZrxzGBVGTB/y65S8C+2WTBcE lf1Qb5GDsQrZWgD+jtWTywOYHtCBwyCKSAXxSARMbNPeak9WPlcW/Jmu+fUcMe2x dg1KdHOa34shrKDaOVzW =od6m -----END PGP SIGNATURE-----''' self.revision = { 'id': 'bc0195aad0daa2ad5b0d76cce22b167bc3435590', 'directory': '85a74718d377195e1efd0843ba4f3260bad4fe07', 'parents': ['01e2d0627a9a6edb24c37db45db5ecb31e9de808'], 'author': { 'name': b'Linus Torvalds', 'email': b'torvalds@linux-foundation.org', }, 'date': datetime.datetime(2015, 7, 12, 15, 10, 30, tzinfo=linus_tz), 'committer': { 'name': b'Linus Torvalds', 'email': b'torvalds@linux-foundation.org', }, 'committer_date': datetime.datetime(2015, 7, 12, 15, 10, 30, tzinfo=linus_tz), 'message': b'Linux 4.2-rc2\n', } self.revision_none_metadata = { 'id': 'bc0195aad0daa2ad5b0d76cce22b167bc3435590', 'directory': '85a74718d377195e1efd0843ba4f3260bad4fe07', 'parents': ['01e2d0627a9a6edb24c37db45db5ecb31e9de808'], 'author': { 'name': b'Linus Torvalds', 'email': b'torvalds@linux-foundation.org', }, 'date': datetime.datetime(2015, 7, 12, 15, 10, 30, tzinfo=linus_tz), 'committer': { 'name': b'Linus Torvalds', 'email': b'torvalds@linux-foundation.org', }, 'committer_date': datetime.datetime(2015, 7, 12, 15, 10, 30, tzinfo=linus_tz), 'message': b'Linux 4.2-rc2\n', 'metadata': None, } self.synthetic_revision = { 'id': b'\xb2\xa7\xe1&\x04\x92\xe3D\xfa\xb3\xcb\xf9\x1b\xc1<\x91' b'\xe0T&\xfd', 'author': { 'name': b'Software Heritage', 'email': b'robot@softwareheritage.org', }, 'date': { 'timestamp': {'seconds': 1437047495}, 'offset': 0, 'negative_utc': False, }, 'type': 'tar', 'committer': { 'name': b'Software Heritage', 'email': b'robot@softwareheritage.org', }, 'committer_date': 1437047495, 'synthetic': True, 'parents': [None], 'message': b'synthetic revision message\n', 'directory': b'\xd1\x1f\x00\xa6\xa0\xfe\xa6\x05SA\xd2U\x84\xb5\xa9' b'e\x16\xc0\xd2\xb8', 'metadata': {'original_artifact': [ {'archive_type': 'tar', 'name': 'gcc-5.2.0.tar.bz2', 'sha1_git': '39d281aff934d44b439730057e55b055e206a586', 'sha1': 'fe3f5390949d47054b613edc36c557eb1d51c18e', 'sha256': '5f835b04b5f7dd4f4d2dc96190ec1621b8d89f' '2dc6f638f9f8bc1b1014ba8cad'}]}, } # cat commit.txt | git hash-object -t commit --stdin self.revision_with_extra_headers = { 'id': '010d34f384fa99d047cdd5e2f41e56e5c2feee45', 'directory': '85a74718d377195e1efd0843ba4f3260bad4fe07', 'parents': ['01e2d0627a9a6edb24c37db45db5ecb31e9de808'], 'author': { 'name': b'Linus Torvalds', 'email': b'torvalds@linux-foundation.org', 'fullname': b'Linus Torvalds ', }, 'date': datetime.datetime(2015, 7, 12, 15, 10, 30, tzinfo=linus_tz), 'committer': { 'name': b'Linus Torvalds', 'email': b'torvalds@linux-foundation.org', 'fullname': b'Linus Torvalds ', }, 'committer_date': datetime.datetime(2015, 7, 12, 15, 10, 30, tzinfo=linus_tz), 'message': b'Linux 4.2-rc2\n', 'metadata': { 'extra_headers': [ ['svn-repo-uuid', '046f1af7-66c2-d61b-5410-ce57b7db7bff'], ['svn-revision', 10], ] } } self.revision_with_gpgsig = { 'id': '44cc742a8ca17b9c279be4cc195a93a6ef7a320e', 'directory': 'b134f9b7dc434f593c0bab696345548b37de0558', 'parents': ['689664ae944b4692724f13b709a4e4de28b54e57', 'c888305e1efbaa252d01b4e5e6b778f865a97514'], 'author': { 'name': b'Jiang Xin', 'email': b'worldhello.net@gmail.com', 'fullname': b'Jiang Xin ', }, 'date': { 'timestamp': 1428538899, 'offset': 480, }, 'committer': { 'name': b'Jiang Xin', 'email': b'worldhello.net@gmail.com', }, 'committer_date': { 'timestamp': 1428538899, 'offset': 480, }, 'metadata': { 'extra_headers': [ ['gpgsig', gpgsig], ], }, 'message': b'''Merge branch 'master' of git://github.com/alexhenrie/git-po * 'master' of git://github.com/alexhenrie/git-po: l10n: ca.po: update translation ''' } self.revision_no_message = { 'id': '4cfc623c9238fa92c832beed000ce2d003fd8333', 'directory': 'b134f9b7dc434f593c0bab696345548b37de0558', 'parents': ['689664ae944b4692724f13b709a4e4de28b54e57', 'c888305e1efbaa252d01b4e5e6b778f865a97514'], 'author': { 'name': b'Jiang Xin', 'email': b'worldhello.net@gmail.com', 'fullname': b'Jiang Xin ', }, 'date': { 'timestamp': 1428538899, 'offset': 480, }, 'committer': { 'name': b'Jiang Xin', 'email': b'worldhello.net@gmail.com', }, 'committer_date': { 'timestamp': 1428538899, 'offset': 480, }, 'message': None, } self.revision_empty_message = { 'id': '7442cd78bd3b4966921d6a7f7447417b7acb15eb', 'directory': 'b134f9b7dc434f593c0bab696345548b37de0558', 'parents': ['689664ae944b4692724f13b709a4e4de28b54e57', 'c888305e1efbaa252d01b4e5e6b778f865a97514'], 'author': { 'name': b'Jiang Xin', 'email': b'worldhello.net@gmail.com', 'fullname': b'Jiang Xin ', }, 'date': { 'timestamp': 1428538899, 'offset': 480, }, 'committer': { 'name': b'Jiang Xin', 'email': b'worldhello.net@gmail.com', }, 'committer_date': { 'timestamp': 1428538899, 'offset': 480, }, 'message': b'', } self.revision_only_fullname = { 'id': '010d34f384fa99d047cdd5e2f41e56e5c2feee45', 'directory': '85a74718d377195e1efd0843ba4f3260bad4fe07', 'parents': ['01e2d0627a9a6edb24c37db45db5ecb31e9de808'], 'author': { 'fullname': b'Linus Torvalds ', }, 'date': datetime.datetime(2015, 7, 12, 15, 10, 30, tzinfo=linus_tz), 'committer': { 'fullname': b'Linus Torvalds ', }, 'committer_date': datetime.datetime(2015, 7, 12, 15, 10, 30, tzinfo=linus_tz), 'message': b'Linux 4.2-rc2\n', 'metadata': { 'extra_headers': [ ['svn-repo-uuid', '046f1af7-66c2-d61b-5410-ce57b7db7bff'], ['svn-revision', 10], ] } } def test_revision_identifier(self): self.assertEqual( identifiers.revision_identifier(self.revision), identifiers.identifier_to_str(self.revision['id']), ) def test_revision_identifier_none_metadata(self): self.assertEqual( identifiers.revision_identifier(self.revision_none_metadata), identifiers.identifier_to_str(self.revision_none_metadata['id']), ) def test_revision_identifier_synthetic(self): self.assertEqual( identifiers.revision_identifier(self.synthetic_revision), identifiers.identifier_to_str(self.synthetic_revision['id']), ) def test_revision_identifier_with_extra_headers(self): self.assertEqual( identifiers.revision_identifier( self.revision_with_extra_headers), identifiers.identifier_to_str( self.revision_with_extra_headers['id']), ) def test_revision_identifier_with_gpgsig(self): self.assertEqual( identifiers.revision_identifier( self.revision_with_gpgsig), identifiers.identifier_to_str( self.revision_with_gpgsig['id']), ) def test_revision_identifier_no_message(self): self.assertEqual( identifiers.revision_identifier( self.revision_no_message), identifiers.identifier_to_str( self.revision_no_message['id']), ) def test_revision_identifier_empty_message(self): self.assertEqual( identifiers.revision_identifier( self.revision_empty_message), identifiers.identifier_to_str( self.revision_empty_message['id']), ) def test_revision_identifier_only_fullname(self): self.assertEqual( identifiers.revision_identifier( self.revision_only_fullname), identifiers.identifier_to_str( self.revision_only_fullname['id']), ) class ReleaseIdentifier(unittest.TestCase): def setUp(self): linus_tz = datetime.timezone(datetime.timedelta(minutes=-420)) self.release = { 'id': '2b10839e32c4c476e9d94492756bb1a3e1ec4aa8', 'target': b't\x1b"R\xa5\xe1Ml`\xa9\x13\xc7z`\x99\xab\xe7:\x85J', 'target_type': 'revision', 'name': b'v2.6.14', 'author': { 'name': b'Linus Torvalds', 'email': b'torvalds@g5.osdl.org', }, 'date': datetime.datetime(2005, 10, 27, 17, 2, 33, tzinfo=linus_tz), 'message': b'''\ Linux 2.6.14 release -----BEGIN PGP SIGNATURE----- Version: GnuPG v1.4.1 (GNU/Linux) iD8DBQBDYWq6F3YsRnbiHLsRAmaeAJ9RCez0y8rOBbhSv344h86l/VVcugCeIhO1 wdLOnvj91G4wxYqrvThthbE= =7VeT -----END PGP SIGNATURE----- ''', 'synthetic': False, } self.release_no_author = { 'id': b'&y\x1a\x8b\xcf\x0em3\xf4:\xefv\x82\xbd\xb5U#mV\xde', 'target': '9ee1c939d1cb936b1f98e8d81aeffab57bae46ab', 'target_type': 'revision', 'name': b'v2.6.12', 'message': b'''\ This is the final 2.6.12 release -----BEGIN PGP SIGNATURE----- Version: GnuPG v1.2.4 (GNU/Linux) iD8DBQBCsykyF3YsRnbiHLsRAvPNAJ482tCZwuxp/bJRz7Q98MHlN83TpACdHr37 o6X/3T+vm8K3bf3driRr34c= =sBHn -----END PGP SIGNATURE----- ''', 'synthetic': False, } self.release_no_message = { 'id': 'b6f4f446715f7d9543ef54e41b62982f0db40045', 'target': '9ee1c939d1cb936b1f98e8d81aeffab57bae46ab', 'target_type': 'revision', 'name': b'v2.6.12', 'author': { 'name': b'Linus Torvalds', 'email': b'torvalds@g5.osdl.org', }, 'date': datetime.datetime(2005, 10, 27, 17, 2, 33, tzinfo=linus_tz), 'message': None, } self.release_empty_message = { 'id': '71a0aea72444d396575dc25ac37fec87ee3c6492', 'target': '9ee1c939d1cb936b1f98e8d81aeffab57bae46ab', 'target_type': 'revision', 'name': b'v2.6.12', 'author': { 'name': b'Linus Torvalds', 'email': b'torvalds@g5.osdl.org', }, 'date': datetime.datetime(2005, 10, 27, 17, 2, 33, tzinfo=linus_tz), 'message': b'', } self.release_negative_utc = { 'id': '97c8d2573a001f88e72d75f596cf86b12b82fd01', 'name': b'20081029', 'target': '54e9abca4c77421e2921f5f156c9fe4a9f7441c7', 'target_type': 'revision', 'date': { 'timestamp': {'seconds': 1225281976}, 'offset': 0, 'negative_utc': True, }, 'author': { 'name': b'Otavio Salvador', 'email': b'otavio@debian.org', 'id': 17640, }, 'synthetic': False, 'message': b'tagging version 20081029\n\nr56558\n', } self.release_newline_in_author = { 'author': { 'email': b'esycat@gmail.com', 'fullname': b'Eugene Janusov\n', 'name': b'Eugene Janusov\n', }, 'date': { 'negative_utc': None, 'offset': 600, 'timestamp': { 'microseconds': 0, 'seconds': 1377480558, }, }, 'id': b'\\\x98\xf5Y\xd04\x16-\xe2->\xbe\xb9T3\xe6\xf8\x88R1', 'message': b'Release of v0.3.2.', 'name': b'0.3.2', 'synthetic': False, 'target': (b'\xc0j\xa3\xd9;x\xa2\x86\\I5\x17' b'\x000\xf8\xc2\xd79o\xd3'), 'target_type': 'revision', } def test_release_identifier(self): self.assertEqual( identifiers.release_identifier(self.release), identifiers.identifier_to_str(self.release['id']) ) def test_release_identifier_no_author(self): self.assertEqual( identifiers.release_identifier(self.release_no_author), identifiers.identifier_to_str(self.release_no_author['id']) ) def test_release_identifier_no_message(self): self.assertEqual( identifiers.release_identifier(self.release_no_message), identifiers.identifier_to_str(self.release_no_message['id']) ) def test_release_identifier_empty_message(self): self.assertEqual( identifiers.release_identifier(self.release_empty_message), identifiers.identifier_to_str(self.release_empty_message['id']) ) def test_release_identifier_negative_utc(self): self.assertEqual( identifiers.release_identifier(self.release_negative_utc), identifiers.identifier_to_str(self.release_negative_utc['id']) ) def test_release_identifier_newline_in_author(self): self.assertEqual( identifiers.release_identifier(self.release_newline_in_author), identifiers.identifier_to_str(self.release_newline_in_author['id']) ) class SnapshotIdentifier(unittest.TestCase): def setUp(self): super().setUp() self.empty = { 'id': '1a8893e6a86f444e8be8e7bda6cb34fb1735a00e', 'branches': {}, } self.dangling_branch = { 'id': 'c84502e821eb21ed84e9fd3ec40973abc8b32353', 'branches': { b'HEAD': None, }, } self.unresolved = { 'id': '84b4548ea486e4b0a7933fa541ff1503a0afe1e0', 'branches': { b'foo': { 'target': b'bar', 'target_type': 'alias', }, }, } self.all_types = { 'id': '6e65b86363953b780d92b0a928f3e8fcdd10db36', 'branches': { b'directory': { 'target': '1bd0e65f7d2ff14ae994de17a1e7fe65111dcad8', 'target_type': 'directory', }, b'content': { 'target': 'fe95a46679d128ff167b7c55df5d02356c5a1ae1', 'target_type': 'content', }, b'alias': { 'target': b'revision', 'target_type': 'alias', }, b'revision': { 'target': 'aafb16d69fd30ff58afdd69036a26047f3aebdc6', 'target_type': 'revision', }, b'release': { 'target': '7045404f3d1c54e6473c71bbb716529fbad4be24', 'target_type': 'release', }, b'snapshot': { 'target': '1a8893e6a86f444e8be8e7bda6cb34fb1735a00e', 'target_type': 'snapshot', }, b'dangling': None, } } def test_empty_snapshot(self): self.assertEqual( identifiers.snapshot_identifier(self.empty), identifiers.identifier_to_str(self.empty['id']), ) def test_dangling_branch(self): self.assertEqual( identifiers.snapshot_identifier(self.dangling_branch), identifiers.identifier_to_str(self.dangling_branch['id']), ) def test_unresolved(self): with self.assertRaisesRegex(ValueError, "b'foo' -> b'bar'"): identifiers.snapshot_identifier(self.unresolved) def test_unresolved_force(self): self.assertEqual( identifiers.snapshot_identifier( self.unresolved, ignore_unresolved=True, ), identifiers.identifier_to_str(self.unresolved['id']), ) def test_all_types(self): self.assertEqual( identifiers.snapshot_identifier(self.all_types), identifiers.identifier_to_str(self.all_types['id']), ) def test_persistent_identifier(self): _snapshot_id = hashutil.hash_to_bytes( 'c7c108084bc0bf3d81436bf980b46e98bd338453') _release_id = '22ece559cc7cc2364edc5e5593d63ae8bd229f9f' _revision_id = '309cf2674ee7a0749978cf8265ab91a60aea0f7d' _directory_id = 'd198bc9d7a6bcf6db04f476d29314f157507d505' _content_id = '94a9ed024d3859793618152ea559a168bbcbb5e2' _snapshot = {'id': _snapshot_id} _release = {'id': _release_id} _revision = {'id': _revision_id} _directory = {'id': _directory_id} _content = {'sha1_git': _content_id} for full_type, _hash, expected_persistent_id, version, _meta in [ (SNAPSHOT, _snapshot_id, 'swh:1:snp:c7c108084bc0bf3d81436bf980b46e98bd338453', None, {}), (RELEASE, _release_id, 'swh:2:rel:22ece559cc7cc2364edc5e5593d63ae8bd229f9f', 2, {}), (REVISION, _revision_id, 'swh:1:rev:309cf2674ee7a0749978cf8265ab91a60aea0f7d', None, {}), (DIRECTORY, _directory_id, 'swh:1:dir:d198bc9d7a6bcf6db04f476d29314f157507d505', None, {}), (CONTENT, _content_id, 'swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2', 1, {}), (SNAPSHOT, _snapshot, 'swh:1:snp:c7c108084bc0bf3d81436bf980b46e98bd338453', None, {}), (RELEASE, _release, 'swh:2:rel:22ece559cc7cc2364edc5e5593d63ae8bd229f9f', 2, {}), (REVISION, _revision, 'swh:1:rev:309cf2674ee7a0749978cf8265ab91a60aea0f7d', None, {}), (DIRECTORY, _directory, 'swh:1:dir:d198bc9d7a6bcf6db04f476d29314f157507d505', None, {}), (CONTENT, _content, 'swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2', 1, {}), (CONTENT, _content, 'swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2;origin=1', 1, {'origin': '1'}), ]: if version: actual_value = identifiers.persistent_identifier( full_type, _hash, version, metadata=_meta) else: actual_value = identifiers.persistent_identifier( full_type, _hash, metadata=_meta) self.assertEqual(actual_value, expected_persistent_id) def test_persistent_identifier_wrong_input(self): _snapshot_id = 'notahash4bc0bf3d81436bf980b46e98bd338453' _snapshot = {'id': _snapshot_id} for _type, _hash, _error in [ (SNAPSHOT, _snapshot_id, 'Unexpected characters'), (SNAPSHOT, _snapshot, 'Unexpected characters'), ('foo', '', 'Wrong input: Supported types are'), ]: with self.assertRaisesRegex(ValidationError, _error): identifiers.persistent_identifier(_type, _hash) def test_parse_persistent_identifier(self): for pid, _type, _version, _hash in [ ('swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2', CONTENT, 1, '94a9ed024d3859793618152ea559a168bbcbb5e2'), ('swh:1:dir:d198bc9d7a6bcf6db04f476d29314f157507d505', DIRECTORY, 1, 'd198bc9d7a6bcf6db04f476d29314f157507d505'), ('swh:1:rev:309cf2674ee7a0749978cf8265ab91a60aea0f7d', REVISION, 1, '309cf2674ee7a0749978cf8265ab91a60aea0f7d'), ('swh:1:rel:22ece559cc7cc2364edc5e5593d63ae8bd229f9f', RELEASE, 1, '22ece559cc7cc2364edc5e5593d63ae8bd229f9f'), ('swh:1:snp:c7c108084bc0bf3d81436bf980b46e98bd338453', SNAPSHOT, 1, 'c7c108084bc0bf3d81436bf980b46e98bd338453'), ]: expected_result = PersistentId( namespace='swh', scheme_version=_version, object_type=_type, object_id=_hash, metadata={} ) actual_result = identifiers.parse_persistent_identifier(pid) self.assertEqual(actual_result, expected_result) for pid, _type, _version, _hash, _metadata in [ ('swh:1:cnt:9c95815d9e9d91b8dae8e05d8bbc696fe19f796b;lines=1-18;origin=https://github.com/python/cpython', # noqa CONTENT, 1, '9c95815d9e9d91b8dae8e05d8bbc696fe19f796b', { 'lines': '1-18', 'origin': 'https://github.com/python/cpython' }), ('swh:1:dir:0b6959356d30f1a4e9b7f6bca59b9a336464c03d;origin=deb://Debian/packages/linuxdoc-tools', # noqa DIRECTORY, 1, '0b6959356d30f1a4e9b7f6bca59b9a336464c03d', { 'origin': 'deb://Debian/packages/linuxdoc-tools' }) ]: expected_result = PersistentId( namespace='swh', scheme_version=_version, object_type=_type, object_id=_hash, metadata=_metadata ) actual_result = identifiers.parse_persistent_identifier(pid) self.assertEqual(actual_result, expected_result) def test_parse_persistent_identifier_parsing_error(self): for pid, _error in [ ('swh:1:cnt', 'Wrong format: There should be 4 mandatory values'), ('swh:1:', 'Wrong format: There should be 4 mandatory values'), ('swh:', 'Wrong format: There should be 4 mandatory values'), ('swh:1:cnt:', 'Wrong format: Identifier should be present'), ('foo:1:cnt:abc8bc9d7a6bcf6db04f476d29314f157507d505', 'Wrong format: Supported namespace is \'swh\''), ('swh:2:dir:def8bc9d7a6bcf6db04f476d29314f157507d505', 'Wrong format: Supported version is 1'), ('swh:1:foo:fed8bc9d7a6bcf6db04f476d29314f157507d505', 'Wrong format: Supported types are %s' % ( ', '.join(PERSISTENT_IDENTIFIER_TYPES))), ('swh:1:dir:0b6959356d30f1a4e9b7f6bca59b9a336464c03d;invalid;' 'malformed', 'Contextual data is badly formatted, form key=val expected'), ('swh:1:snp:gh6959356d30f1a4e9b7f6bca59b9a336464c03d', 'Wrong format: Identifier should be a valid hash'), ('swh:1:snp:foo', 'Wrong format: Identifier should be a valid hash') ]: with self.assertRaisesRegex( ValidationError, _error): identifiers.parse_persistent_identifier(pid) diff --git a/swh/model/tests/test_validators.py b/swh/model/tests/test_validators.py index 8c8512e..691c579 100644 --- a/swh/model/tests/test_validators.py +++ b/swh/model/tests/test_validators.py @@ -1,71 +1,75 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import unittest from swh.model import exceptions, hashutil, validators +def hash_data(raw_content): + return hashutil.MultiHash.from_data(raw_content).digest() + + class TestValidators(unittest.TestCase): def setUp(self): self.valid_visible_content = { 'status': 'visible', 'length': 5, 'data': b'1984\n', 'ctime': datetime.datetime(2015, 11, 22, 16, 33, 56, tzinfo=datetime.timezone.utc), } self.valid_visible_content.update( - hashutil.hash_data(self.valid_visible_content['data'])) + hash_data(self.valid_visible_content['data'])) self.valid_absent_content = { 'status': 'absent', 'length': 5, 'ctime': datetime.datetime(2015, 11, 22, 16, 33, 56, tzinfo=datetime.timezone.utc), 'reason': 'Content too large', 'sha1_git': self.valid_visible_content['sha1_git'], 'origin': 42, } self.invalid_content_hash_mismatch = self.valid_visible_content.copy() self.invalid_content_hash_mismatch.update( - hashutil.hash_data(b"this is not the data you're looking for")) + hash_data(b"this is not the data you're looking for")) def test_validate_content(self): self.assertTrue( validators.validate_content(self.valid_visible_content)) self.assertTrue( validators.validate_content(self.valid_absent_content)) def test_validate_content_hash_mismatch(self): with self.assertRaises(exceptions.ValidationError) as cm: validators.validate_content(self.invalid_content_hash_mismatch) # All the hashes are wrong. The exception should be of the form: # ValidationError({ # NON_FIELD_ERRORS: [ # ValidationError('content-hash-mismatch', 'sha1'), # ValidationError('content-hash-mismatch', 'sha1_git'), # ValidationError('content-hash-mismatch', 'sha256'), # ] # }) exc = cm.exception self.assertIsInstance(str(exc), str) self.assertEqual(set(exc.error_dict.keys()), {exceptions.NON_FIELD_ERRORS}) hash_mismatches = exc.error_dict[exceptions.NON_FIELD_ERRORS] self.assertIsInstance(hash_mismatches, list) self.assertEqual(len(hash_mismatches), 4) self.assertTrue(all(mismatch.code == 'content-hash-mismatch' for mismatch in hash_mismatches)) self.assertEqual(set(mismatch.params['hash'] for mismatch in hash_mismatches), {'sha1', 'sha1_git', 'sha256', 'blake2s256'})