Changeset View
Changeset View
Standalone View
Standalone View
swh/core/utils.py
Show First 20 Lines • Show All 45 Lines • ▼ Show 20 Lines | def grouper(iterable, n): | ||||
args = [iter(iterable)] * n | args = [iter(iterable)] * n | ||||
stop_value = object() | stop_value = object() | ||||
for _data in itertools.zip_longest(*args, fillvalue=stop_value): | for _data in itertools.zip_longest(*args, fillvalue=stop_value): | ||||
yield (d for d in _data if d is not stop_value) | yield (d for d in _data if d is not stop_value) | ||||
def backslashescape_errors(exception):
    """Codec error handler that renders undecodable bytes as \\x<hex> escapes.

    Args:
        exception: the error instance handed over by the codec machinery.

    Returns:
        A ``(replacement, resume_position)`` tuple. For a
        ``UnicodeDecodeError`` the replacement is one ``\\xNN`` escape
        (two lowercase hex digits) per offending byte, and decoding resumes
        at ``exception.end``. Every other error type is delegated to
        ``codecs.backslashreplace_errors``.
    """
    # Anything that is not a decoding failure is handled by the stdlib.
    if not isinstance(exception, UnicodeDecodeError):
        return codecs.backslashreplace_errors(exception)

    first, last = exception.start, exception.end
    offending_bytes = exception.object[first:last]
    # Iterating over bytes yields ints, which is what %02x expects.
    pieces = [r'\x%02x' % byte for byte in offending_bytes]
    return ''.join(pieces), last
# Make the handler selectable by name, e.g. ``errors='backslashescape'`` in
# ``bytes.decode``. This runs once, as an import-time side effect.
codecs.register_error('backslashescape', backslashescape_errors)
def encode_with_unescape(value):
    """Encode an unicode string containing \\x<hex> backslash escapes.

    A backslash preceded by an even number of backslashes starts an escape,
    which must be ``x`` followed by exactly two hexadecimal digits; it is
    turned into the corresponding single byte. Doubled backslashes stand for
    one literal backslash. Everything else is encoded as utf-8.

    Args:
        value (str): the escaped text.

    Returns:
        bytes: the encoded bytestring with every escape resolved.

    Raises:
        ValueError: if a backslash introduces anything other than an
            ``x`` escape with two hex digits, or if the string ends with a
            dangling (unpaired) backslash.
    """
    hex_digits = frozenset('0123456789abcdefABCDEF')
    slices = []
    start = 0
    odd_backslashes = False
    i = 0
    while i < len(value):
        if value[i] == '\\':
            odd_backslashes = not odd_backslashes
        elif odd_backslashes:
            # value[i - 1] is the backslash that opens this escape.
            hex_pair = value[i + 1 : i + 3]
            # Check the digits ourselves: bytes.fromhex() skips whitespace
            # and accepts an empty string, so a truncated or blank escape
            # would otherwise be swallowed silently.
            if (
                value[i] != 'x'
                or len(hex_pair) != 2
                or not hex_digits.issuperset(hex_pair)
            ):
                raise ValueError(
                    'invalid escape for %r at position %d' % (value, i - 1)
                )
            # Flush the literal text preceding the escape; it only holds
            # paired backslashes, each pair standing for a single one.
            slices.append(
                value[start : i - 1].replace('\\\\', '\\').encode('utf-8')
            )
            slices.append(bytes.fromhex(hex_pair))
            odd_backslashes = False
            start = i = i + 3
            continue
        i += 1

    if odd_backslashes:
        # The input ended right after an unpaired backslash.
        raise ValueError(
            'invalid escape for %r at position %d' % (value, len(value) - 1)
        )

    slices.append(value[start:i].replace('\\\\', '\\').encode('utf-8'))
    return b''.join(slices)
def decode_with_escape(value): | def decode_with_escape(value): | ||||
"""Decode a bytestring as utf-8, escaping the bytes of invalid utf-8 sequences | """Decode a bytestring as utf-8, escaping the bytes of invalid utf-8 sequences | ||||
as \\x<hex value>. We also escape NUL bytes as they are invalid in JSON | as \\x<hex value>. We also escape NUL bytes as they are invalid in JSON | ||||
strings. | strings. | ||||
Show All 25 Lines |