# DB representation handed out by author_to_db when the author is None.
DEFAULT_AUTHOR = {
    'fullname': None,
    'name': None,
    'email': None,
}

# DB representation handed out by date_to_db when the date is None.
DEFAULT_DATE = {
    'timestamp': None,
    'offset': 0,
    'neg_utc_offset': None,
}


def author_to_db(author):
    """Convert a swh-model author to its DB representation.

    Args:
        author: a swh-model compatible author, or None

    Returns:
        a dict containing three keys: fullname, name and email. When author
        is None, a fresh dict with all three values set to None; otherwise
        the author object itself, unchanged.

    """
    if author is None:
        # Return a copy: handing out the module-level dict itself would let
        # any caller that mutates the result corrupt the shared default.
        return dict(DEFAULT_AUTHOR)

    return author


def db_to_author(id, fullname, name, email):
    """Convert the DB representation of an author to a swh-model author.

    Args:
        id (long): the author's identifier
        fullname (bytes): the author's fullname
        name (bytes): the author's name
        email (bytes): the author's email

    Returns:
        a dict with four keys: id, fullname, name and email, or None if the id
        is None

    """
    if id is None:
        return None

    return {
        'id': id,
        'fullname': fullname,
        'name': name,
        'email': email,
    }


def git_headers_to_db(git_headers):
    """Convert git headers to their database representation.

    We convert the bytes to unicode by decoding them into utf-8 and replacing
    invalid utf-8 sequences with backslash escapes.

    Args:
        git_headers: an iterable of (key, values) pairs, where values is
            either a single bytestring or a list of bytestrings

    Returns:
        a list of [key, values] lists in which every value went through
        decode_with_escape

    """
    ret = []
    for key, values in git_headers:
        if isinstance(values, list):
            ret.append([key, [decode_with_escape(value) for value in values]])
        else:
            ret.append([key, decode_with_escape(values)])

    return ret


def db_to_git_headers(db_git_headers):
    """Convert the database representation of git headers back to bytes.

    This is the reverse of git_headers_to_db: every value is passed through
    encode_with_unescape.

    Args:
        db_git_headers: an iterable of (key, values) pairs, where values is
            either a single string or a list of strings

    Returns:
        a list of [key, values] lists in which every value went through
        encode_with_unescape

    """
    ret = []
    for key, values in db_git_headers:
        if isinstance(values, list):
            ret.append([key, [encode_with_unescape(value)
                              for value in values]])
        else:
            ret.append([key, encode_with_unescape(values)])

    return ret


def db_to_date(date, offset, neg_utc_offset):
    """Convert the DB representation of a date to a swh-model compatible date.

    Args:
        date (datetime.datetime): a date pulled out of the database
        offset (int): an integer number of minutes representing an UTC offset
        neg_utc_offset (boolean): whether an utc offset is negative

    Returns:
        a dict with three keys:
            timestamp: a timestamp from UTC
            offset: the UTC offset in minutes
            negative_utc: whether a null UTC offset is negative
        or None if the date is None

    """
    if date is None:
        return None

    return {
        'timestamp': date.timestamp(),
        'offset': offset,
        'negative_utc': neg_utc_offset,
    }


def date_to_db(date_offset):
    """Convert a swh-model date_offset to its DB representation.

    Args:
        date_offset: a swh-model compatible date_offset. One of:
            - None;
            - a real number, read as a timestamp and converted to an UTC
              datetime;
            - a timezone-aware datetime.datetime;
            - a dict with a 'timestamp' key (real number or
              datetime.datetime), an 'offset' key and, optionally, a
              'negative_utc' key.

    Returns:
        a dict with three keys:
            timestamp: a date in ISO format
            offset: the UTC offset in minutes
            neg_utc_offset: a boolean indicating whether a null offset is
                            negative or positive.

    Raises:
        ValueError: if date_offset is a datetime.datetime without timezone
            information (no UTC offset can be computed for it)

    """
    if date_offset is None:
        # Return a copy so callers cannot corrupt the shared default.
        return dict(DEFAULT_DATE)

    if isinstance(date_offset, numbers.Real):
        date_offset = datetime.datetime.fromtimestamp(
            date_offset, tz=datetime.timezone.utc)

    if isinstance(date_offset, datetime.datetime):
        timestamp = date_offset
        utcoffset = date_offset.utcoffset()
        if utcoffset is None:
            # Naive datetimes carry no offset; fail with an explicit message
            # instead of an AttributeError on None below.
            raise ValueError(
                'date_to_db received a datetime without timezone '
                'information: %r' % (date_offset,))
        offset = int(utcoffset.total_seconds()) // 60
        # A datetime cannot express a "negative zero" offset, so a null
        # offset is recorded as positive; otherwise the flag is irrelevant.
        neg_utc_offset = False if offset == 0 else None
    else:
        if isinstance(date_offset['timestamp'], numbers.Real):
            timestamp = datetime.datetime.fromtimestamp(
                date_offset['timestamp'], tz=datetime.timezone.utc)
        else:
            timestamp = date_offset['timestamp']
        offset = date_offset['offset']
        neg_utc_offset = date_offset.get('negative_utc', None)

    return {
        'timestamp': timestamp.isoformat(),
        'offset': offset,
        'neg_utc_offset': neg_utc_offset,
    }
def revision_to_db(revision):
    """Convert a swh-model revision to its database representation.

    Args:
        revision: a dict with keys id, author, date, committer,
            committer_date, type, directory, message, metadata, synthetic
            and parents

    Returns:
        a flat dict with the author, committer and date fields expanded
        into prefixed keys, and a 'parents' list holding one
        {id, parent_id, parent_rank} dict per parent. The revision's
        metadata dict is not modified: it is copied before its
        'extra_headers' entry is converted.

    """

    author = author_to_db(revision['author'])
    date = date_to_db(revision['date'])
    committer = author_to_db(revision['committer'])
    committer_date = date_to_db(revision['committer_date'])

    metadata = revision['metadata']

    if metadata and 'extra_headers' in metadata:
        # Copy first so the caller's metadata keeps its bytes headers.
        metadata = metadata.copy()
        extra_headers = git_headers_to_db(metadata['extra_headers'])
        metadata['extra_headers'] = extra_headers

    return {
        'id': revision['id'],
        'author_fullname': author['fullname'],
        'author_name': author['name'],
        'author_email': author['email'],
        'date': date['timestamp'],
        'date_offset': date['offset'],
        'date_neg_utc_offset': date['neg_utc_offset'],
        'committer_fullname': committer['fullname'],
        'committer_name': committer['name'],
        'committer_email': committer['email'],
        'committer_date': committer_date['timestamp'],
        'committer_date_offset': committer_date['offset'],
        'committer_date_neg_utc_offset': committer_date['neg_utc_offset'],
        'type': revision['type'],
        'directory': revision['directory'],
        'message': revision['message'],
        'metadata': metadata,
        'synthetic': revision['synthetic'],
        'parents': [
            {
                'id': revision['id'],
                'parent_id': parent,
                'parent_rank': i,
            } for i, parent in enumerate(revision['parents'])
        ],
    }


def db_to_revision(db_revision):
    """Convert a database representation of a revision to its swh-model
    representation.

    Args:
        db_revision: a mapping with the revision columns (id, type,
            directory, message, metadata, synthetic), the author_* and
            committer_* columns, the date* and committer_date* columns and,
            optionally, a 'parents' sequence

    Returns:
        a dict with keys id, author, date, committer, committer_date, type,
        directory, message, metadata, synthetic and parents. Falsy entries
        of the 'parents' sequence are dropped. The metadata held by
        db_revision is not modified: it is copied before its
        'extra_headers' entry is converted.

    """

    author = db_to_author(
        db_revision['author_id'],
        db_revision['author_fullname'],
        db_revision['author_name'],
        db_revision['author_email'],
    )
    date = db_to_date(
        db_revision['date'],
        db_revision['date_offset'],
        db_revision['date_neg_utc_offset'],
    )

    committer = db_to_author(
        db_revision['committer_id'],
        db_revision['committer_fullname'],
        db_revision['committer_name'],
        db_revision['committer_email'],
    )
    committer_date = db_to_date(
        db_revision['committer_date'],
        db_revision['committer_date_offset'],
        db_revision['committer_date_neg_utc_offset']
    )

    metadata = db_revision['metadata']

    if metadata and 'extra_headers' in metadata:
        # Copy before rewriting the headers, as revision_to_db does, so the
        # row passed in by the caller is left untouched.
        metadata = metadata.copy()
        extra_headers = db_to_git_headers(metadata['extra_headers'])
        metadata['extra_headers'] = extra_headers

    parents = []
    if 'parents' in db_revision:
        # Skip empty placeholders in the parents sequence.
        parents = [parent for parent in db_revision['parents'] if parent]

    return {
        'id': db_revision['id'],
        'author': author,
        'date': date,
        'committer': committer,
        'committer_date': committer_date,
        'type': db_revision['type'],
        'directory': db_revision['directory'],
        'message': db_revision['message'],
        'metadata': metadata,
        'synthetic': db_revision['synthetic'],
        'parents': parents,
    }


def release_to_db(release):
    """Convert a swh-model release to its database representation.

    Args:
        release: a dict with keys id, author, date, name, target,
            target_type, message and synthetic

    Returns:
        a flat dict with the author and date fields expanded into prefixed
        keys; the release's 'message' is stored under the 'comment' key

    """

    author = author_to_db(release['author'])
    date = date_to_db(release['date'])

    return {
        'id': release['id'],
        'author_fullname': author['fullname'],
        'author_name': author['name'],
        'author_email': author['email'],
        'date': date['timestamp'],
        'date_offset': date['offset'],
        'date_neg_utc_offset': date['neg_utc_offset'],
        'name': release['name'],
        'target': release['target'],
        'target_type': release['target_type'],
        'comment': release['message'],
        'synthetic': release['synthetic'],
    }


def db_to_release(db_release):
    """Convert a database representation of a release to its swh-model
    representation.

    Args:
        db_release: a mapping with the release columns (id, name, comment,
            synthetic, target, target_type), the author_* columns and the
            date* columns

    Returns:
        a dict with keys author, date, id, name, message, synthetic, target
        and target_type; the 'comment' column is exposed as 'message'

    """

    author = db_to_author(
        db_release['author_id'],
        db_release['author_fullname'],
        db_release['author_name'],
        db_release['author_email'],
    )
    date = db_to_date(
        db_release['date'],
        db_release['date_offset'],
        db_release['date_neg_utc_offset']
    )

    return {
        'author': author,
        'date': date,
        'id': db_release['id'],
        'name': db_release['name'],
        'message': db_release['comment'],
        'synthetic': db_release['synthetic'],
        'target': db_release['target'],
        'target_type': db_release['target_type'],
    }
@attr('!db')
class TestConverters(unittest.TestCase):
    # Unit tests for swh.storage.converters; none of them needs a database.
    # Assertions use assertEqual: the assertEquals alias is deprecated and
    # no longer exists in recent Python versions.

    def setUp(self):
        # Always show the full diff when a dict comparison fails.
        self.maxDiff = None

    @istest
    def db_to_author(self):
        # when
        actual_author = converters.db_to_author(
            1, b'fullname', b'name', b'email')

        # then
        self.assertEqual(actual_author, {
            'id': 1,
            'fullname': b'fullname',
            'name': b'name',
            'email': b'email',
        })

    @istest
    def db_to_revision(self):
        # when
        actual_revision = converters.db_to_revision({
            'id': 'revision-id',
            'date': None,
            'date_offset': None,
            'date_neg_utc_offset': None,
            'committer_date': None,
            'committer_date_offset': None,
            'committer_date_neg_utc_offset': None,
            'type': 'rev',
            'directory': b'dir-sha1',
            'message': b'commit message',
            'author_id': 'auth-id',
            'author_fullname': b'auth-fullname',
            'author_name': b'auth-name',
            'author_email': b'auth-email',
            'committer_id': 'comm-id',
            'committer_fullname': b'comm-fullname',
            'committer_name': b'comm-name',
            'committer_email': b'comm-email',
            'metadata': {},
            'synthetic': False,
            'parents': [123, 456]
        })

        # then
        self.assertEqual(actual_revision, {
            'id': 'revision-id',
            'author': {
                'id': 'auth-id',
                'fullname': b'auth-fullname',
                'name': b'auth-name',
                'email': b'auth-email',
            },
            'date': None,
            'committer': {
                'id': 'comm-id',
                'fullname': b'comm-fullname',
                'name': b'comm-name',
                'email': b'comm-email',
            },
            'committer_date': None,
            'type': 'rev',
            'directory': b'dir-sha1',
            'message': b'commit message',
            'metadata': {},
            'synthetic': False,
            'parents': [123, 456],
        })

    @istest
    def db_to_release(self):
        # when
        actual_release = converters.db_to_release({
            'id': b'release-id',
            'target': b'revision-id',
            'target_type': 'revision',
            'date': None,
            'date_offset': None,
            'date_neg_utc_offset': None,
            'name': b'release-name',
            'comment': b'release comment',
            'synthetic': True,
            'author_id': 'auth-id',
            'author_fullname': b'auth-fullname',
            'author_name': b'auth-name',
            'author_email': b'auth-email',
        })

        # then
        self.assertEqual(actual_release, {
            'author': {
                'id': 'auth-id',
                'fullname': b'auth-fullname',
                'name': b'auth-name',
                'email': b'auth-email',
            },
            'date': None,
            'id': b'release-id',
            'name': b'release-name',
            'message': b'release comment',
            'synthetic': True,
            'target': b'revision-id',
            'target_type': 'revision'
        })

    @istest
    def db_to_git_headers(self):
        # Headers mixing invalid utf-8 bytes and literal backslashes must
        # survive a round trip through the database representation.
        raw_data = [
            ['gpgsig', b'garbage\x89a\x43b\x14'],
            ['extra', [b'fo\\\\\\o', b'bar\\', b'inval\\\\\x99id']],
        ]

        db_data = converters.git_headers_to_db(raw_data)
        loop = converters.db_to_git_headers(db_data)
        self.assertEqual(raw_data, loop)