diff --git a/swh/storage/converters.py b/swh/storage/converters.py --- a/swh/storage/converters.py +++ b/swh/storage/converters.py @@ -3,10 +3,10 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import codecs import datetime import numbers +from swh.core.utils import decode_with_escape, encode_with_unescape DEFAULT_AUTHOR = { 'fullname': None, @@ -21,60 +21,6 @@ } -def backslashescape_errors(exception): - if isinstance(exception, UnicodeDecodeError): - bad_data = exception.object[exception.start:exception.end] - escaped = ''.join(r'\x%02x' % x for x in bad_data) - return escaped, exception.end - - return codecs.backslashreplace_errors(exception) - -codecs.register_error('backslashescape', backslashescape_errors) - - -def decode_with_escape(value): - """Decode a bytestring as utf-8, escaping the bytes of invalid utf-8 sequences - as \\x. We also escape NUL bytes as they are invalid in JSON - strings. - """ - # escape backslashes - value = value.replace(b'\\', b'\\\\') - value = value.replace(b'\x00', b'\\x00') - return value.decode('utf-8', 'backslashescape') - - -def encode_with_unescape(value): - """Encode an unicode string containing \\x backslash escapes""" - slices = [] - start = 0 - odd_backslashes = False - i = 0 - while i < len(value): - if value[i] == '\\': - odd_backslashes = not odd_backslashes - else: - if odd_backslashes: - if value[i] != 'x': - raise ValueError('invalid escape for %r at position %d' % - (value, i-1)) - slices.append( - value[start:i-1].replace('\\\\', '\\').encode('utf-8') - ) - slices.append(bytes.fromhex(value[i+1:i+3])) - - odd_backslashes = False - start = i = i + 3 - continue - - i += 1 - - slices.append( - value[start:i].replace('\\\\', '\\').encode('utf-8') - ) - - return b''.join(slices) - - def author_to_db(author): """Convert a swh-model author to its DB representation. diff --git a/swh/storage/tests/test_converters.py b/swh/storage/tests/test_converters.py --- a/swh/storage/tests/test_converters.py +++ b/swh/storage/tests/test_converters.py @@ -119,83 +119,6 @@ }) @istest - def backslashescape_errors(self): - raw_data_err = b'abcd\x80' - with self.assertRaises(UnicodeDecodeError): - raw_data_err.decode('utf-8', 'strict') - - self.assertEquals( - raw_data_err.decode('utf-8', 'backslashescape'), - 'abcd\\x80', - ) - - raw_data_ok = b'abcd\xc3\xa9' - self.assertEquals( - raw_data_ok.decode('utf-8', 'backslashescape'), - raw_data_ok.decode('utf-8', 'strict'), - ) - - unicode_data = 'abcdef\u00a3' - self.assertEquals( - unicode_data.encode('ascii', 'backslashescape'), - b'abcdef\\xa3', - ) - - @istest - def encode_with_unescape(self): - valid_data = '\\x01020304\\x00' - valid_data_encoded = b'\x01020304\x00' - - self.assertEquals( - valid_data_encoded, - converters.encode_with_unescape(valid_data) - ) - - @istest - def encode_with_unescape_invalid_escape(self): - invalid_data = 'test\\abcd' - - with self.assertRaises(ValueError) as exc: - converters.encode_with_unescape(invalid_data) - - self.assertIn('invalid escape', exc.exception.args[0]) - self.assertIn('position 4', exc.exception.args[0]) - - @istest - def decode_with_escape(self): - backslashes = b'foo\\bar\\\\baz' - backslashes_escaped = 'foo\\\\bar\\\\\\\\baz' - - self.assertEquals( - backslashes_escaped, - converters.decode_with_escape(backslashes), - ) - - valid_utf8 = b'foo\xc3\xa2' - valid_utf8_escaped = 'foo\u00e2' - - self.assertEquals( - valid_utf8_escaped, - converters.decode_with_escape(valid_utf8), - ) - - invalid_utf8 = b'foo\xa2' - invalid_utf8_escaped = 'foo\\xa2' - - self.assertEquals( - invalid_utf8_escaped, - converters.decode_with_escape(invalid_utf8), - ) - - valid_utf8_nul = b'foo\xc3\xa2\x00' - valid_utf8_nul_escaped = 'foo\u00e2\\x00' - - self.assertEquals( - valid_utf8_nul_escaped, - converters.decode_with_escape(valid_utf8_nul), - ) - - @istest def db_to_git_headers(self): raw_data = [ ['gpgsig', b'garbage\x89a\x43b\x14'],