Page MenuHomeSoftware Heritage

No OneTemporary

diff --git a/swh/storage/converters.py b/swh/storage/converters.py
index 9bf2a35a5..7015fea51 100644
--- a/swh/storage/converters.py
+++ b/swh/storage/converters.py
@@ -1,360 +1,306 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import codecs
import datetime
import numbers
+from swh.core.utils import decode_with_escape, encode_with_unescape
# Author columns returned by author_to_db() when no author is given.
DEFAULT_AUTHOR = {
    'fullname': None,
    'name': None,
    'email': None,
}

# Date columns returned by date_to_db() when no date is given.
DEFAULT_DATE = {
    'timestamp': None,
    'offset': 0,
    'neg_utc_offset': None,
}
-def backslashescape_errors(exception):
- if isinstance(exception, UnicodeDecodeError):
- bad_data = exception.object[exception.start:exception.end]
- escaped = ''.join(r'\x%02x' % x for x in bad_data)
- return escaped, exception.end
-
- return codecs.backslashreplace_errors(exception)
-
-codecs.register_error('backslashescape', backslashescape_errors)
-
-
-def decode_with_escape(value):
- """Decode a bytestring as utf-8, escaping the bytes of invalid utf-8 sequences
- as \\x<hex value>. We also escape NUL bytes as they are invalid in JSON
- strings.
- """
- # escape backslashes
- value = value.replace(b'\\', b'\\\\')
- value = value.replace(b'\x00', b'\\x00')
- return value.decode('utf-8', 'backslashescape')
-
-
-def encode_with_unescape(value):
- """Encode an unicode string containing \\x<hex> backslash escapes"""
- slices = []
- start = 0
- odd_backslashes = False
- i = 0
- while i < len(value):
- if value[i] == '\\':
- odd_backslashes = not odd_backslashes
- else:
- if odd_backslashes:
- if value[i] != 'x':
- raise ValueError('invalid escape for %r at position %d' %
- (value, i-1))
- slices.append(
- value[start:i-1].replace('\\\\', '\\').encode('utf-8')
- )
- slices.append(bytes.fromhex(value[i+1:i+3]))
-
- odd_backslashes = False
- start = i = i + 3
- continue
-
- i += 1
-
- slices.append(
- value[start:i].replace('\\\\', '\\').encode('utf-8')
- )
-
- return b''.join(slices)
-
-
def author_to_db(author):
    """Convert a swh-model author to its DB representation.

    Args:
        author: a swh-model compatible author, or None

    Returns:
        the author itself when one is given; otherwise a new dict with the
        three keys fullname, name and email, all set to None (a copy of
        DEFAULT_AUTHOR).

    """
    if author is None:
        # Hand out a copy: DEFAULT_AUTHOR is a shared module-level dict and
        # must not be corrupted if a caller mutates the returned value.
        return dict(DEFAULT_AUTHOR)

    return author
def db_to_author(id, fullname, name, email):
    """Build a swh-model author from its database columns.

    Args:
        id (long): the author's identifier
        fullname (bytes): the author's fullname
        name (bytes): the author's name
        email (bytes): the author's email

    Returns:
        None when id is None; otherwise a dict with the four keys id,
        fullname, name and email holding the given values.

    """
    if id is not None:
        return dict(id=id, fullname=fullname, name=name, email=email)

    return None
def git_headers_to_db(git_headers):
    """Convert git headers to their database representation.

    Every header value (or every item of a list of values) is passed through
    decode_with_escape, which turns the bytes into unicode by decoding them
    as utf-8 and replacing invalid utf-8 sequences with backslash escapes.

    Args:
        git_headers: an iterable of (key, values) pairs, where values is
            either a single value or a list of values

    Returns:
        a list of [key, converted values] pairs, in the input order

    """
    def _convert(values):
        # A header may carry one value or a list of them.
        if isinstance(values, list):
            return [decode_with_escape(item) for item in values]
        return decode_with_escape(values)

    return [[key, _convert(values)] for key, values in git_headers]
def db_to_git_headers(db_git_headers):
    """Convert the database representation of git headers back to git headers.

    This mirrors git_headers_to_db: every header value (or every item of a
    list of values) is passed through encode_with_unescape.

    Args:
        db_git_headers: an iterable of (key, values) pairs, where values is
            either a single value or a list of values

    Returns:
        a list of [key, converted values] pairs, in the input order

    """
    def _convert(values):
        # A header may carry one value or a list of them.
        if isinstance(values, list):
            return [encode_with_unescape(item) for item in values]
        return encode_with_unescape(values)

    return [[key, _convert(values)] for key, values in db_git_headers]
def db_to_date(date, offset, neg_utc_offset):
    """Build a swh-model compatible date from its database columns.

    Args:
        date (datetime.datetime): a date pulled out of the database
        offset (int): an integer number of minutes representing an UTC offset
        neg_utc_offset (boolean): whether an utc offset is negative

    Returns:
        None when date is None; otherwise a dict with three keys:
            timestamp: the value of date.timestamp()
            offset: the given offset, unchanged
            negative_utc: the given neg_utc_offset, unchanged

    """
    if date is None:
        return None

    swh_date = {'timestamp': date.timestamp()}
    swh_date['offset'] = offset
    swh_date['negative_utc'] = neg_utc_offset
    return swh_date
def date_to_db(date_offset):
    """Convert a swh-model date_offset to its DB representation.

    Args:
        date_offset: a swh-model compatible date_offset. The accepted forms
            are:
            - None
            - a real number, converted to a datetime in UTC with
              datetime.datetime.fromtimestamp
            - a datetime.datetime
            - a dict with 'timestamp' (a real number or a datetime) and
              'offset' keys, and an optional 'negative_utc' key

    Returns:
        a dict with three keys:
            timestamp: a date in ISO format (None when date_offset is None)
            offset: the UTC offset in minutes
            neg_utc_offset: a boolean indicating whether a null offset is
                            negative or positive.

    """
    if date_offset is None:
        # Hand out a copy: DEFAULT_DATE is a shared module-level dict and
        # must not be corrupted if a caller mutates the returned value.
        return dict(DEFAULT_DATE)

    if isinstance(date_offset, numbers.Real):
        date_offset = datetime.datetime.fromtimestamp(date_offset,
                                                      tz=datetime.timezone.utc)

    if isinstance(date_offset, datetime.datetime):
        timestamp = date_offset
        # NOTE(review): utcoffset() returns None for a naive datetime, which
        # makes the next line raise AttributeError; callers presumably always
        # pass timezone-aware datetimes -- confirm.
        utcoffset = date_offset.utcoffset()
        offset = int(utcoffset.total_seconds()) // 60
        # A datetime cannot express a negative null offset, so a null offset
        # is recorded as positive and anything else is left unset.
        neg_utc_offset = False if offset == 0 else None
    else:
        if isinstance(date_offset['timestamp'], numbers.Real):
            timestamp = datetime.datetime.fromtimestamp(
                date_offset['timestamp'], tz=datetime.timezone.utc)
        else:
            timestamp = date_offset['timestamp']
        offset = date_offset['offset']
        neg_utc_offset = date_offset.get('negative_utc', None)

    return {
        'timestamp': timestamp.isoformat(),
        'offset': offset,
        'neg_utc_offset': neg_utc_offset,
    }
def revision_to_db(revision):
    """Flatten a swh-model revision into its database representation.

    Args:
        revision: a swh-model revision, read through the keys id, author,
            date, committer, committer_date, type, directory, message,
            metadata, synthetic and parents

    Returns:
        a dict holding the revision's columns, plus a 'parents' list with one
        dict (id, parent_id, parent_rank) per parent, ranked in input order.

    """
    author = author_to_db(revision['author'])
    date = date_to_db(revision['date'])
    committer = author_to_db(revision['committer'])
    committer_date = date_to_db(revision['committer_date'])

    metadata = revision['metadata']
    if metadata and 'extra_headers' in metadata:
        # Convert the headers on a shallow copy so that the caller's metadata
        # dict is left untouched.
        metadata = metadata.copy()
        metadata['extra_headers'] = git_headers_to_db(
            metadata['extra_headers'])

    db_revision = {
        'id': revision['id'],
        'author_fullname': author['fullname'],
        'author_name': author['name'],
        'author_email': author['email'],
        'date': date['timestamp'],
        'date_offset': date['offset'],
        'date_neg_utc_offset': date['neg_utc_offset'],
        'committer_fullname': committer['fullname'],
        'committer_name': committer['name'],
        'committer_email': committer['email'],
        'committer_date': committer_date['timestamp'],
        'committer_date_offset': committer_date['offset'],
        'committer_date_neg_utc_offset': committer_date['neg_utc_offset'],
        'type': revision['type'],
        'directory': revision['directory'],
        'message': revision['message'],
        'metadata': metadata,
        'synthetic': revision['synthetic'],
    }

    db_revision['parents'] = [
        {
            'id': revision['id'],
            'parent_id': parent_id,
            'parent_rank': rank,
        }
        for rank, parent_id in enumerate(revision['parents'])
    ]

    return db_revision
def db_to_revision(db_revision):
    """Convert a database representation of a revision to its swh-model
    representation.

    Args:
        db_revision: a mapping holding the revision's database columns (id,
            author_*, date*, committer_*, committer_date*, type, directory,
            message, metadata, synthetic) and, optionally, 'parents'

    Returns:
        a dict with the keys id, author, date, committer, committer_date,
        type, directory, message, metadata, synthetic and parents. The input
        mapping and its metadata are not modified.

    """
    author = db_to_author(
        db_revision['author_id'],
        db_revision['author_fullname'],
        db_revision['author_name'],
        db_revision['author_email'],
    )
    date = db_to_date(
        db_revision['date'],
        db_revision['date_offset'],
        db_revision['date_neg_utc_offset'],
    )

    committer = db_to_author(
        db_revision['committer_id'],
        db_revision['committer_fullname'],
        db_revision['committer_name'],
        db_revision['committer_email'],
    )
    committer_date = db_to_date(
        db_revision['committer_date'],
        db_revision['committer_date_offset'],
        db_revision['committer_date_neg_utc_offset']
    )

    metadata = db_revision['metadata']
    if metadata and 'extra_headers' in metadata:
        # Work on a shallow copy, as revision_to_db does: overwriting the
        # headers in the caller's row would make a second conversion of the
        # same row operate on already-converted values.
        metadata = metadata.copy()
        metadata['extra_headers'] = db_to_git_headers(
            metadata['extra_headers'])

    # Falsy parent entries are dropped (presumably NULLs coming from the
    # database query -- confirm against the caller).
    parents = []
    if 'parents' in db_revision:
        parents = [parent for parent in db_revision['parents'] if parent]

    return {
        'id': db_revision['id'],
        'author': author,
        'date': date,
        'committer': committer,
        'committer_date': committer_date,
        'type': db_revision['type'],
        'directory': db_revision['directory'],
        'message': db_revision['message'],
        'metadata': metadata,
        'synthetic': db_revision['synthetic'],
        'parents': parents,
    }
def release_to_db(release):
    """Flatten a swh-model release into its database representation.

    Args:
        release: a swh-model release, read through the keys author, date,
            id, name, target, target_type, message and synthetic

    Returns:
        a dict holding the release's columns; the release message is stored
        under the 'comment' key.

    """
    author = author_to_db(release['author'])
    date = date_to_db(release['date'])

    db_release = {'id': release['id']}
    db_release.update(
        author_fullname=author['fullname'],
        author_name=author['name'],
        author_email=author['email'],
    )
    db_release.update(
        date=date['timestamp'],
        date_offset=date['offset'],
        date_neg_utc_offset=date['neg_utc_offset'],
    )
    db_release.update(
        name=release['name'],
        target=release['target'],
        target_type=release['target_type'],
        comment=release['message'],
        synthetic=release['synthetic'],
    )

    return db_release
def db_to_release(db_release):
    """Rebuild a swh-model release from its database representation.

    Args:
        db_release: a mapping holding the release's database columns
            (author_*, date*, id, name, comment, synthetic, target,
            target_type)

    Returns:
        a dict with the keys author, date, id, name, message, synthetic,
        target and target_type; 'message' is read from the 'comment' column.

    """
    release_author = db_to_author(
        id=db_release['author_id'],
        fullname=db_release['author_fullname'],
        name=db_release['author_name'],
        email=db_release['author_email'],
    )
    release_date = db_to_date(
        date=db_release['date'],
        offset=db_release['date_offset'],
        neg_utc_offset=db_release['date_neg_utc_offset'],
    )

    release = {'author': release_author, 'date': release_date}
    release['id'] = db_release['id']
    release['name'] = db_release['name']
    release['message'] = db_release['comment']
    release['synthetic'] = db_release['synthetic']
    release['target'] = db_release['target']
    release['target_type'] = db_release['target_type']
    return release
diff --git a/swh/storage/tests/test_converters.py b/swh/storage/tests/test_converters.py
index b02275059..f5ce97782 100644
--- a/swh/storage/tests/test_converters.py
+++ b/swh/storage/tests/test_converters.py
@@ -1,207 +1,130 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
from nose.tools import istest
from nose.plugins.attrib import attr
from swh.storage import converters
@attr('!db')
class TestConverters(unittest.TestCase):
def setUp(self):
    """Prepare each test: remove the length cap on assertion diffs."""
    # With maxDiff set to None, unittest prints mismatching dicts in full.
    self.maxDiff = None
@istest
def db_to_author(self):
    """db_to_author packs its four arguments into an author dict."""
    # when
    actual_author = converters.db_to_author(
        1, b'fullname', b'name', b'email')

    # then
    # assertEqual replaces the deprecated assertEquals alias, which was
    # removed from unittest in Python 3.12.
    self.assertEqual(actual_author, {
        'id': 1,
        'fullname': b'fullname',
        'name': b'name',
        'email': b'email',
    })
@istest
def db_to_revision(self):
    """db_to_revision regroups the flat database columns of a revision."""
    # when
    actual_revision = converters.db_to_revision({
        'id': 'revision-id',
        'date': None,
        'date_offset': None,
        'date_neg_utc_offset': None,
        'committer_date': None,
        'committer_date_offset': None,
        'committer_date_neg_utc_offset': None,
        'type': 'rev',
        'directory': b'dir-sha1',
        'message': b'commit message',
        'author_id': 'auth-id',
        'author_fullname': b'auth-fullname',
        'author_name': b'auth-name',
        'author_email': b'auth-email',
        'committer_id': 'comm-id',
        'committer_fullname': b'comm-fullname',
        'committer_name': b'comm-name',
        'committer_email': b'comm-email',
        'metadata': {},
        'synthetic': False,
        'parents': [123, 456]
    })

    # then
    # assertEqual replaces the deprecated assertEquals alias, which was
    # removed from unittest in Python 3.12.
    self.assertEqual(actual_revision, {
        'id': 'revision-id',
        'author': {
            'id': 'auth-id',
            'fullname': b'auth-fullname',
            'name': b'auth-name',
            'email': b'auth-email',
        },
        'date': None,
        'committer': {
            'id': 'comm-id',
            'fullname': b'comm-fullname',
            'name': b'comm-name',
            'email': b'comm-email',
        },
        'committer_date': None,
        'type': 'rev',
        'directory': b'dir-sha1',
        'message': b'commit message',
        'metadata': {},
        'synthetic': False,
        'parents': [123, 456],
    })
@istest
def db_to_release(self):
    """db_to_release regroups the flat database columns of a release."""
    # when
    actual_release = converters.db_to_release({
        'id': b'release-id',
        'target': b'revision-id',
        'target_type': 'revision',
        'date': None,
        'date_offset': None,
        'date_neg_utc_offset': None,
        'name': b'release-name',
        'comment': b'release comment',
        'synthetic': True,
        'author_id': 'auth-id',
        'author_fullname': b'auth-fullname',
        'author_name': b'auth-name',
        'author_email': b'auth-email',
    })

    # then
    # assertEqual replaces the deprecated assertEquals alias, which was
    # removed from unittest in Python 3.12.
    self.assertEqual(actual_release, {
        'author': {
            'id': 'auth-id',
            'fullname': b'auth-fullname',
            'name': b'auth-name',
            'email': b'auth-email',
        },
        'date': None,
        'id': b'release-id',
        'name': b'release-name',
        'message': b'release comment',
        'synthetic': True,
        'target': b'revision-id',
        'target_type': 'revision'
    })
- @istest
- def backslashescape_errors(self):
- raw_data_err = b'abcd\x80'
- with self.assertRaises(UnicodeDecodeError):
- raw_data_err.decode('utf-8', 'strict')
-
- self.assertEquals(
- raw_data_err.decode('utf-8', 'backslashescape'),
- 'abcd\\x80',
- )
-
- raw_data_ok = b'abcd\xc3\xa9'
- self.assertEquals(
- raw_data_ok.decode('utf-8', 'backslashescape'),
- raw_data_ok.decode('utf-8', 'strict'),
- )
-
- unicode_data = 'abcdef\u00a3'
- self.assertEquals(
- unicode_data.encode('ascii', 'backslashescape'),
- b'abcdef\\xa3',
- )
-
- @istest
- def encode_with_unescape(self):
- valid_data = '\\x01020304\\x00'
- valid_data_encoded = b'\x01020304\x00'
-
- self.assertEquals(
- valid_data_encoded,
- converters.encode_with_unescape(valid_data)
- )
-
- @istest
- def encode_with_unescape_invalid_escape(self):
- invalid_data = 'test\\abcd'
-
- with self.assertRaises(ValueError) as exc:
- converters.encode_with_unescape(invalid_data)
-
- self.assertIn('invalid escape', exc.exception.args[0])
- self.assertIn('position 4', exc.exception.args[0])
-
- @istest
- def decode_with_escape(self):
- backslashes = b'foo\\bar\\\\baz'
- backslashes_escaped = 'foo\\\\bar\\\\\\\\baz'
-
- self.assertEquals(
- backslashes_escaped,
- converters.decode_with_escape(backslashes),
- )
-
- valid_utf8 = b'foo\xc3\xa2'
- valid_utf8_escaped = 'foo\u00e2'
-
- self.assertEquals(
- valid_utf8_escaped,
- converters.decode_with_escape(valid_utf8),
- )
-
- invalid_utf8 = b'foo\xa2'
- invalid_utf8_escaped = 'foo\\xa2'
-
- self.assertEquals(
- invalid_utf8_escaped,
- converters.decode_with_escape(invalid_utf8),
- )
-
- valid_utf8_nul = b'foo\xc3\xa2\x00'
- valid_utf8_nul_escaped = 'foo\u00e2\\x00'
-
- self.assertEquals(
- valid_utf8_nul_escaped,
- converters.decode_with_escape(valid_utf8_nul),
- )
-
@istest
def db_to_git_headers(self):
    """git_headers_to_db followed by db_to_git_headers is the identity."""
    # The values mix backslashes and bytes that are not valid utf-8, in both
    # the single-value and the list-of-values header shapes.
    raw_data = [
        ['gpgsig', b'garbage\x89a\x43b\x14'],
        ['extra', [b'fo\\\\\\o', b'bar\\', b'inval\\\\\x99id']],
    ]

    db_data = converters.git_headers_to_db(raw_data)
    loop = converters.db_to_git_headers(db_data)

    # assertEqual replaces the deprecated assertEquals alias, which was
    # removed from unittest in Python 3.12.
    self.assertEqual(raw_data, loop)

File Metadata

Mime Type
text/x-diff
Expires
Fri, Jul 4, 11:01 AM (3 w, 6 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3277455

Event Timeline