diff --git a/swh/model/fields/compound.py b/swh/model/fields/compound.py index 412ebdb..00eb252 100644 --- a/swh/model/fields/compound.py +++ b/swh/model/fields/compound.py @@ -1,126 +1,126 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import itertools from ..exceptions import ValidationError, NON_FIELD_ERRORS def validate_against_schema(model, schema, value): """Validate a value for the given model against the given schema. Args: model: the name of the model schema: the schema to validate against value: the value to validate Returns: True if the value is correct against the schema Raises: ValidationError if the value does not validate against the schema """ if not isinstance(value, dict): raise ValidationError( 'Unexpected type %(type)s for %(model)s, expected dict', params={ 'model': model, 'type': value.__class__.__name__, }, code='model-unexpected-type', ) errors = defaultdict(list) for key, (mandatory, validators) in itertools.chain( ((k, v) for k, v in schema.items() if k != NON_FIELD_ERRORS), [(NON_FIELD_ERRORS, (False, schema.get(NON_FIELD_ERRORS, [])))] ): if not validators: continue if not isinstance(validators, list): validators = [validators] validated_value = value if key != NON_FIELD_ERRORS: try: validated_value = value[key] except KeyError: if mandatory: errors[key].append( ValidationError( 'Field %(field)s is mandatory', params={'field': key}, code='model-field-mandatory', ) ) continue else: if errors: # Don't validate the whole object if some fields are broken continue for validator in validators: try: valid = validator(validated_value) except ValidationError as e: errors[key].append(e) else: if not valid: errdata = { 'validator': validator.__name__, } if key == NON_FIELD_ERRORS: errmsg = 'Validation of model %(model)s failed in ' \ '%(validator)s' errdata['model'] = model errcode = 'model-validation-failed' else: errmsg = 'Validation of field %(field)s failed in ' \ '%(validator)s' errdata['field'] = key errcode = 'field-validation-failed' errors[key].append( ValidationError(errmsg, params=errdata, code=errcode) ) if errors: raise ValidationError(dict(errors)) return True def validate_all_keys(value, keys): """Validate that all the given keys are present in value""" missing_keys = set(keys) - set(value) if missing_keys: missing_fields = ', '.join(sorted(missing_keys)) raise ValidationError( - 'Missing mandatory fields %(missing_fields)', + 'Missing mandatory fields %(missing_fields)s', params={'missing_fields': missing_fields}, code='missing-mandatory-field' ) return True def validate_any_key(value, keys): """Validate that any of the given keys is present in value""" present_keys = set(keys) & set(value) if not present_keys: missing_fields = ', '.join(sorted(keys)) raise ValidationError( - 'Must contain one of the alternative fields %(missing_fields)', - params={'missing_fields': missing_fields}, - code='missing-alternative-field', - ) + 'Must contain one of the alternative fields %(missing_fields)s', + params={'missing_fields': missing_fields}, + code='missing-alternative-field', + ) return True diff --git a/swh/model/fields/simple.py b/swh/model/fields/simple.py index 0f8b305..3020997 100644 --- a/swh/model/fields/simple.py +++ b/swh/model/fields/simple.py @@ -1,80 +1,80 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import numbers from ..exceptions import ValidationError def validate_type(value, type): """Validate that value is an integer""" if not isinstance(value, type): if isinstance(type, tuple): typestr = 'one of %s' % ', '.join(typ.__name__ for typ in type) else: typestr = type.__name__ raise ValidationError( 'Unexpected type %(type)s, expected %(expected_type)s', params={ 'type': value.__class__.__name__, 'expected_type': typestr, }, code='unexpected-type' ) return True def validate_int(value): """Validate that the given value is an int""" return validate_type(value, numbers.Integral) def validate_str(value): """Validate that the given value is a string""" return validate_type(value, str) def validate_bytes(value): """Validate that the given value is a bytes object""" return validate_type(value, bytes) def validate_datetime(value): """Validate that the given value is either a datetime, or a numeric number of seconds since the UNIX epoch.""" errors = [] try: validate_type(value, (datetime.datetime, numbers.Real)) except ValidationError as e: errors.append(e) if isinstance(value, datetime.datetime) and value.tzinfo is None: errors.append(ValidationError( 'Datetimes must be timezone-aware in swh', code='datetime-without-tzinfo', )) if errors: raise ValidationError(errors) return True def validate_enum(value, expected_values): """Validate that value is contained in expected_values""" if value not in expected_values: raise ValidationError( - 'Unexpected value %(value)s, expected one of %(expected_values)', + 'Unexpected value %(value)s, expected one of %(expected_values)s', params={ 'value': value, 'expected_values': ', '.join(sorted(expected_values)), }, code='unexpected-value', ) return True diff --git a/swh/model/tests/fields/test_compound.py b/swh/model/tests/fields/test_compound.py index db840a9..b6e13b6 100644 --- a/swh/model/tests/fields/test_compound.py +++ b/swh/model/tests/fields/test_compound.py @@ -1,232 +1,241 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import unittest from nose.tools import istest from swh.model.exceptions import ValidationError, NON_FIELD_ERRORS from swh.model.fields import compound, simple class ValidateCompound(unittest.TestCase): def setUp(self): def validate_always(model): return True def validate_never(model): return False self.test_model = 'test model' self.test_schema = { 'int': (True, simple.validate_int), 'str': (True, simple.validate_str), 'str2': (True, simple.validate_str), 'datetime': (False, simple.validate_datetime), NON_FIELD_ERRORS: validate_always, } self.test_schema_shortcut = self.test_schema.copy() self.test_schema_shortcut[NON_FIELD_ERRORS] = validate_never self.test_schema_field_failed = self.test_schema.copy() self.test_schema_field_failed['int'] = (True, [simple.validate_int, validate_never]) self.test_value = { 'str': 'value1', 'str2': 'value2', 'int': 42, 'datetime': datetime.datetime(1990, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc), } self.test_value_missing = { 'str': 'value1', } self.test_value_str_error = { 'str': 1984, 'str2': 'value2', 'int': 42, 'datetime': datetime.datetime(1990, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc), } self.test_value_missing_keys = {'int'} self.test_value_wrong_type = 42 self.present_keys = set(self.test_value) self.missing_keys = {'missingkey1', 'missingkey2'} @istest def validate_any_key(self): self.assertTrue( compound.validate_any_key(self.test_value, self.present_keys)) self.assertTrue( compound.validate_any_key(self.test_value, self.present_keys | self.missing_keys)) @istest def validate_any_key_missing(self): with self.assertRaises(ValidationError) as cm: compound.validate_any_key(self.test_value, self.missing_keys) exc = cm.exception + self.assertIsInstance(str(exc), str) self.assertEqual(exc.code, 'missing-alternative-field') self.assertEqual(exc.params['missing_fields'], ', '.join(sorted(self.missing_keys))) @istest def validate_all_keys(self): self.assertTrue( compound.validate_all_keys(self.test_value, self.present_keys)) @istest def validate_all_keys_missing(self): with self.assertRaises(ValidationError) as cm: compound.validate_all_keys(self.test_value, self.missing_keys) exc = cm.exception + self.assertIsInstance(str(exc), str) self.assertEqual(exc.code, 'missing-mandatory-field') self.assertEqual(exc.params['missing_fields'], ', '.join(sorted(self.missing_keys))) with self.assertRaises(ValidationError) as cm: compound.validate_all_keys(self.test_value, self.present_keys | self.missing_keys) exc = cm.exception + self.assertIsInstance(str(exc), str) self.assertEqual(exc.code, 'missing-mandatory-field') self.assertEqual(exc.params['missing_fields'], ', '.join(sorted(self.missing_keys))) @istest def validate_against_schema(self): self.assertTrue( compound.validate_against_schema(self.test_model, self.test_schema, self.test_value)) @istest def validate_against_schema_wrong_type(self): with self.assertRaises(ValidationError) as cm: compound.validate_against_schema(self.test_model, self.test_schema, self.test_value_wrong_type) exc = cm.exception + self.assertIsInstance(str(exc), str) self.assertEqual(exc.code, 'model-unexpected-type') self.assertEqual(exc.params['model'], self.test_model) self.assertEqual(exc.params['type'], self.test_value_wrong_type.__class__.__name__) @istest def validate_against_schema_mandatory_keys(self): with self.assertRaises(ValidationError) as cm: compound.validate_against_schema(self.test_model, self.test_schema, self.test_value_missing) # The exception should be of the form: # ValidationError({ # 'mandatory_key1': [ValidationError('model-field-mandatory')], # 'mandatory_key2': [ValidationError('model-field-mandatory')], # }) exc = cm.exception + self.assertIsInstance(str(exc), str) for key in self.test_value_missing_keys: nested_key = exc.error_dict[key] self.assertIsInstance(nested_key, list) self.assertEqual(len(nested_key), 1) nested = nested_key[0] self.assertIsInstance(nested, ValidationError) self.assertEqual(nested.code, 'model-field-mandatory') self.assertEqual(nested.params['field'], key) @istest def validate_against_schema_whole_schema_shortcut_previous_error(self): with self.assertRaises(ValidationError) as cm: compound.validate_against_schema( self.test_model, self.test_schema_shortcut, self.test_value_missing, ) exc = cm.exception + self.assertIsInstance(str(exc), str) self.assertNotIn(NON_FIELD_ERRORS, exc.error_dict) @istest def validate_against_schema_whole_schema(self): with self.assertRaises(ValidationError) as cm: compound.validate_against_schema( self.test_model, self.test_schema_shortcut, self.test_value, ) # The exception should be of the form: # ValidationError({ # NON_FIELD_ERRORS: [ValidationError('model-validation-failed')], # }) exc = cm.exception + self.assertIsInstance(str(exc), str) self.assertEquals(set(exc.error_dict.keys()), {NON_FIELD_ERRORS}) non_field_errors = exc.error_dict[NON_FIELD_ERRORS] self.assertIsInstance(non_field_errors, list) self.assertEquals(len(non_field_errors), 1) nested = non_field_errors[0] self.assertIsInstance(nested, ValidationError) self.assertEquals(nested.code, 'model-validation-failed') self.assertEquals(nested.params['model'], self.test_model) self.assertEquals(nested.params['validator'], 'validate_never') @istest def validate_against_schema_field_error(self): with self.assertRaises(ValidationError) as cm: compound.validate_against_schema(self.test_model, self.test_schema, self.test_value_str_error) # The exception should be of the form: # ValidationError({ # 'str': [ValidationError('unexpected-type')], # }) exc = cm.exception + self.assertIsInstance(str(exc), str) self.assertEquals(set(exc.error_dict.keys()), {'str'}) str_errors = exc.error_dict['str'] self.assertIsInstance(str_errors, list) self.assertEquals(len(str_errors), 1) nested = str_errors[0] self.assertIsInstance(nested, ValidationError) self.assertEquals(nested.code, 'unexpected-type') @istest def validate_against_schema_field_failed(self): with self.assertRaises(ValidationError) as cm: compound.validate_against_schema(self.test_model, self.test_schema_field_failed, self.test_value) # The exception should be of the form: # ValidationError({ # 'int': [ValidationError('field-validation-failed')], # }) exc = cm.exception + self.assertIsInstance(str(exc), str) self.assertEquals(set(exc.error_dict.keys()), {'int'}) int_errors = exc.error_dict['int'] self.assertIsInstance(int_errors, list) self.assertEquals(len(int_errors), 1) nested = int_errors[0] self.assertIsInstance(nested, ValidationError) self.assertEquals(nested.code, 'field-validation-failed') self.assertEquals(nested.params['validator'], 'validate_never') self.assertEquals(nested.params['field'], 'int') diff --git a/swh/model/tests/fields/test_hashes.py b/swh/model/tests/fields/test_hashes.py index b79678e..0ef303f 100644 --- a/swh/model/tests/fields/test_hashes.py +++ b/swh/model/tests/fields/test_hashes.py @@ -1,154 +1,162 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from nose.tools import istest from swh.model.exceptions import ValidationError from swh.model.fields import hashes class ValidateHashes(unittest.TestCase): def setUp(self): self.valid_byte_hashes = { 'sha1': b'\xf1\xd2\xd2\xf9\x24\xe9\x86\xac\x86\xfd\xf7\xb3\x6c\x94' b'\xbc\xdf\x32\xbe\xec\x15', 'sha1_git': b'\x25\x7c\xc5\x64\x2c\xb1\xa0\x54\xf0\x8c\xc8\x3f\x2d' b'\x94\x3e\x56\xfd\x3e\xbe\x99', 'sha256': b'\xb5\xbb\x9d\x80\x14\xa0\xf9\xb1\xd6\x1e\x21\xe7\x96' b'\xd7\x8d\xcc\xdf\x13\x52\xf2\x3c\xd3\x28\x12\xf4\x85' b'\x0b\x87\x8a\xe4\x94\x4c', } self.valid_str_hashes = { 'sha1': 'f1d2d2f924e986ac86fdf7b36c94bcdf32beec15', 'sha1_git': '257cc5642cb1a054f08cc83f2d943e56fd3ebe99', 'sha256': 'b5bb9d8014a0f9b1d61e21e796d78dccdf1352f23cd32812f485' '0b878ae4944c', } self.bad_hash = object() @istest def valid_bytes_hash(self): for hash_type, value in self.valid_byte_hashes.items(): self.assertTrue(hashes.validate_hash(value, hash_type)) @istest def valid_str_hash(self): for hash_type, value in self.valid_str_hashes.items(): self.assertTrue(hashes.validate_hash(value, hash_type)) @istest def invalid_hash_type(self): hash_type = 'unknown_hash_type' with self.assertRaises(ValidationError) as cm: hashes.validate_hash(self.valid_str_hashes['sha1'], hash_type) exc = cm.exception + self.assertIsInstance(str(exc), str) self.assertEqual(exc.code, 'unexpected-hash-type') self.assertEqual(exc.params['hash_type'], hash_type) self.assertIn('Unexpected hash type', str(exc)) self.assertIn(hash_type, str(exc)) @istest def invalid_bytes_len(self): for hash_type, value in self.valid_byte_hashes.items(): value = value + b'\x00\x01' with self.assertRaises(ValidationError) as cm: hashes.validate_hash(value, hash_type) exc = cm.exception + self.assertIsInstance(str(exc), str) self.assertEqual(exc.code, 'unexpected-hash-length') self.assertEqual(exc.params['hash_type'], hash_type) self.assertEqual(exc.params['length'], len(value)) self.assertIn('Unexpected length', str(exc)) self.assertIn(str(len(value)), str(exc)) @istest def invalid_str_len(self): for hash_type, value in self.valid_str_hashes.items(): value = value + '0001' with self.assertRaises(ValidationError) as cm: hashes.validate_hash(value, hash_type) exc = cm.exception + self.assertIsInstance(str(exc), str) self.assertEqual(exc.code, 'unexpected-hash-length') self.assertEqual(exc.params['hash_type'], hash_type) self.assertEqual(exc.params['length'], len(value)) self.assertIn('Unexpected length', str(exc)) self.assertIn(str(len(value)), str(exc)) @istest def invalid_str_contents(self): for hash_type, value in self.valid_str_hashes.items(): value = '\xa2' + value[1:-1] + '\xc3' with self.assertRaises(ValidationError) as cm: hashes.validate_hash(value, hash_type) exc = cm.exception + self.assertIsInstance(str(exc), str) self.assertEqual(exc.code, 'unexpected-hash-contents') self.assertEqual(exc.params['hash_type'], hash_type) self.assertEqual(exc.params['unexpected_chars'], '\xa2, \xc3') self.assertIn('Unexpected characters', str(exc)) self.assertIn('\xc3', str(exc)) self.assertIn('\xa2', str(exc)) @istest def invalid_value_type(self): with self.assertRaises(ValidationError) as cm: hashes.validate_hash(self.bad_hash, 'sha1') exc = cm.exception + self.assertIsInstance(str(exc), str) self.assertEqual(exc.code, 'unexpected-hash-value-type') self.assertEqual(exc.params['type'], self.bad_hash.__class__.__name__) self.assertIn('Unexpected type', str(exc)) self.assertIn(self.bad_hash.__class__.__name__, str(exc)) @istest def validate_sha1(self): self.assertTrue(hashes.validate_sha1(self.valid_byte_hashes['sha1'])) self.assertTrue(hashes.validate_sha1(self.valid_str_hashes['sha1'])) with self.assertRaises(ValidationError) as cm: hashes.validate_sha1(self.bad_hash) exc = cm.exception + self.assertIsInstance(str(exc), str) self.assertEqual(exc.code, 'unexpected-hash-value-type') self.assertEqual(exc.params['type'], self.bad_hash.__class__.__name__) @istest def validate_sha1_git(self): self.assertTrue( hashes.validate_sha1_git(self.valid_byte_hashes['sha1_git'])) self.assertTrue( hashes.validate_sha1_git(self.valid_str_hashes['sha1_git'])) with self.assertRaises(ValidationError) as cm: hashes.validate_sha1_git(self.bad_hash) exc = cm.exception + self.assertIsInstance(str(exc), str) self.assertEqual(exc.code, 'unexpected-hash-value-type') self.assertEqual(exc.params['type'], self.bad_hash.__class__.__name__) @istest def validate_sha256(self): self.assertTrue( hashes.validate_sha256(self.valid_byte_hashes['sha256'])) self.assertTrue( hashes.validate_sha256(self.valid_str_hashes['sha256'])) with self.assertRaises(ValidationError) as cm: hashes.validate_sha256(self.bad_hash) exc = cm.exception + self.assertIsInstance(str(exc), str) self.assertEqual(exc.code, 'unexpected-hash-value-type') self.assertEqual(exc.params['type'], self.bad_hash.__class__.__name__) diff --git a/swh/model/tests/fields/test_simple.py b/swh/model/tests/fields/test_simple.py index 9af424f..6fa2918 100644 --- a/swh/model/tests/fields/test_simple.py +++ b/swh/model/tests/fields/test_simple.py @@ -1,128 +1,136 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import unittest from nose.tools import istest from swh.model.exceptions import ValidationError from swh.model.fields import simple class ValidateSimple(unittest.TestCase): def setUp(self): self.valid_str = 'I am a valid string' self.valid_bytes = b'I am a valid bytes object' self.enum_values = {'an enum value', 'other', 'and another'} self.invalid_enum_value = 'invalid enum value' self.valid_int = 42 self.valid_real = 42.42 self.valid_datetime = datetime.datetime(1999, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc) self.invalid_datetime_notz = datetime.datetime(1999, 1, 1, 12, 0, 0) @istest def validate_int(self): self.assertTrue(simple.validate_int(self.valid_int)) @istest def validate_int_invalid_type(self): with self.assertRaises(ValidationError) as cm: simple.validate_int(self.valid_str) exc = cm.exception + self.assertIsInstance(str(exc), str) self.assertEqual(exc.code, 'unexpected-type') self.assertEqual(exc.params['expected_type'], 'Integral') self.assertEqual(exc.params['type'], 'str') @istest def validate_str(self): self.assertTrue(simple.validate_str(self.valid_str)) @istest def validate_str_invalid_type(self): with self.assertRaises(ValidationError) as cm: simple.validate_str(self.valid_int) exc = cm.exception + self.assertIsInstance(str(exc), str) self.assertEqual(exc.code, 'unexpected-type') self.assertEqual(exc.params['expected_type'], 'str') self.assertEqual(exc.params['type'], 'int') with self.assertRaises(ValidationError) as cm: simple.validate_str(self.valid_bytes) exc = cm.exception + self.assertIsInstance(str(exc), str) self.assertEqual(exc.code, 'unexpected-type') self.assertEqual(exc.params['expected_type'], 'str') self.assertEqual(exc.params['type'], 'bytes') @istest def validate_bytes(self): self.assertTrue(simple.validate_bytes(self.valid_bytes)) @istest def validate_bytes_invalid_type(self): with self.assertRaises(ValidationError) as cm: simple.validate_bytes(self.valid_int) exc = cm.exception + self.assertIsInstance(str(exc), str) self.assertEqual(exc.code, 'unexpected-type') self.assertEqual(exc.params['expected_type'], 'bytes') self.assertEqual(exc.params['type'], 'int') with self.assertRaises(ValidationError) as cm: simple.validate_bytes(self.valid_str) exc = cm.exception + self.assertIsInstance(str(exc), str) self.assertEqual(exc.code, 'unexpected-type') self.assertEqual(exc.params['expected_type'], 'bytes') self.assertEqual(exc.params['type'], 'str') @istest def validate_datetime(self): self.assertTrue(simple.validate_datetime(self.valid_datetime)) self.assertTrue(simple.validate_datetime(self.valid_int)) self.assertTrue(simple.validate_datetime(self.valid_real)) @istest def validate_datetime_invalid_type(self): with self.assertRaises(ValidationError) as cm: simple.validate_datetime(self.valid_str) exc = cm.exception + self.assertIsInstance(str(exc), str) self.assertEqual(exc.code, 'unexpected-type') self.assertEqual(exc.params['expected_type'], 'one of datetime, Real') self.assertEqual(exc.params['type'], 'str') @istest def validate_datetime_invalide_tz(self): with self.assertRaises(ValidationError) as cm: simple.validate_datetime(self.invalid_datetime_notz) exc = cm.exception + self.assertIsInstance(str(exc), str) self.assertEqual(exc.code, 'datetime-without-tzinfo') @istest def validate_enum(self): for value in self.enum_values: self.assertTrue(simple.validate_enum(value, self.enum_values)) @istest def validate_enum_invalid_value(self): with self.assertRaises(ValidationError) as cm: simple.validate_enum(self.invalid_enum_value, self.enum_values) exc = cm.exception + self.assertIsInstance(str(exc), str) self.assertEqual(exc.code, 'unexpected-value') self.assertEqual(exc.params['value'], self.invalid_enum_value) self.assertEqual(exc.params['expected_values'], ', '.join(sorted(self.enum_values)))