Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9340700
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
17 KB
Subscribers
None
View Options
diff --git a/swh/storage/converters.py b/swh/storage/converters.py
index 9bf2a35a5..7015fea51 100644
--- a/swh/storage/converters.py
+++ b/swh/storage/converters.py
@@ -1,360 +1,306 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import codecs
import datetime
import numbers
+from swh.core.utils import decode_with_escape, encode_with_unescape
# DB representation used by author_to_db for a missing (None) author:
# every author column is left unset.
DEFAULT_AUTHOR = {
'fullname': None,
'name': None,
'email': None,
}
# DB representation used by date_to_db for a missing (None) date:
# no timestamp, a zero offset and an unset negative-UTC flag.
DEFAULT_DATE = {
'timestamp': None,
'offset': 0,
'neg_utc_offset': None,
}
-def backslashescape_errors(exception):
- if isinstance(exception, UnicodeDecodeError):
- bad_data = exception.object[exception.start:exception.end]
- escaped = ''.join(r'\x%02x' % x for x in bad_data)
- return escaped, exception.end
-
- return codecs.backslashreplace_errors(exception)
-
-codecs.register_error('backslashescape', backslashescape_errors)
-
-
-def decode_with_escape(value):
- """Decode a bytestring as utf-8, escaping the bytes of invalid utf-8 sequences
- as \\x<hex value>. We also escape NUL bytes as they are invalid in JSON
- strings.
- """
- # escape backslashes
- value = value.replace(b'\\', b'\\\\')
- value = value.replace(b'\x00', b'\\x00')
- return value.decode('utf-8', 'backslashescape')
-
-
-def encode_with_unescape(value):
- """Encode an unicode string containing \\x<hex> backslash escapes"""
- slices = []
- start = 0
- odd_backslashes = False
- i = 0
- while i < len(value):
- if value[i] == '\\':
- odd_backslashes = not odd_backslashes
- else:
- if odd_backslashes:
- if value[i] != 'x':
- raise ValueError('invalid escape for %r at position %d' %
- (value, i-1))
- slices.append(
- value[start:i-1].replace('\\\\', '\\').encode('utf-8')
- )
- slices.append(bytes.fromhex(value[i+1:i+3]))
-
- odd_backslashes = False
- start = i = i + 3
- continue
-
- i += 1
-
- slices.append(
- value[start:i].replace('\\\\', '\\').encode('utf-8')
- )
-
- return b''.join(slices)
-
-
def author_to_db(author):
    """Convert a swh-model author to its DB representation.

    Args:
        author: a swh-model compatible author, or None when the author is
            unknown

    Returns:
        the author itself when it is not None; otherwise a new dict with
        the three keys fullname, name and email, all set to None. A fresh
        copy of DEFAULT_AUTHOR is returned each time so that callers can
        mutate the result without corrupting the module-level default.

    """
    if author is None:
        return dict(DEFAULT_AUTHOR)

    return author
def db_to_author(id, fullname, name, email):
    """Build a swh-model author out of its database columns.

    Args:
        id (long): the author's identifier
        fullname (bytes): the author's fullname
        name (bytes): the author's name
        email (bytes): the author's email

    Returns:
        None when id is None; otherwise a dict mapping the keys id,
        fullname, name and email to the corresponding arguments.

    """
    if id is None:
        return None

    columns = ('id', 'fullname', 'name', 'email')
    return dict(zip(columns, (id, fullname, name, email)))
def git_headers_to_db(git_headers):
    """Convert git headers to their database representation.

    Every header value is passed through decode_with_escape, which turns
    the bytes into unicode by decoding them as utf-8 and replacing invalid
    utf-8 sequences with backslash escapes.

    Args:
        git_headers: an iterable of (key, values) pairs, where values is
            either a single value or a list of values

    Returns:
        a list of [key, converted] pairs in the input order; converted is
        a list when values was a list, a single converted value otherwise.

    """
    def _to_db(entry):
        # A header may carry several values; convert each one in place.
        if isinstance(entry, list):
            return [decode_with_escape(item) for item in entry]
        return decode_with_escape(entry)

    return [[name, _to_db(entry)] for name, entry in git_headers]
def db_to_git_headers(db_git_headers):
    """Convert the database representation of git headers back to bytes.

    Mirror of git_headers_to_db: every header value is passed through
    encode_with_unescape.

    Args:
        db_git_headers: an iterable of (key, values) pairs, where values is
            either a single value or a list of values

    Returns:
        a list of [key, converted] pairs in the input order; converted is
        a list when values was a list, a single converted value otherwise.

    """
    def _from_db(entry):
        # A header may carry several values; convert each one in place.
        if isinstance(entry, list):
            return [encode_with_unescape(item) for item in entry]
        return encode_with_unescape(entry)

    return [[name, _from_db(entry)] for name, entry in db_git_headers]
def db_to_date(date, offset, neg_utc_offset):
    """Turn the database columns of a date into a swh-model compatible date.

    Args:
        date (datetime.datetime): a date pulled out of the database
        offset (int): an integer number of minutes representing an UTC offset
        neg_utc_offset (boolean): whether an utc offset is negative

    Returns:
        None when date is None; otherwise a dict with three keys:
            timestamp: the result of date.timestamp()
            offset: the offset argument, unchanged
            negative_utc: the neg_utc_offset argument, unchanged

    """
    if date is not None:
        seconds = date.timestamp()
        return dict(
            timestamp=seconds,
            offset=offset,
            negative_utc=neg_utc_offset,
        )

    return None
def date_to_db(date_offset):
    """Convert a swh-model date_offset to its DB representation.

    Args:
        date_offset: a swh-model compatible date_offset, one of:
            - None: the date is missing
            - a real number: a timestamp, converted to an UTC datetime
            - a datetime.datetime: its own UTC offset is used
            - a dict with the keys 'timestamp' (a real number or a
              datetime.datetime), 'offset' and, optionally,
              'negative_utc'

    Returns:
        a dict with three keys:
            timestamp: a date in ISO format
            offset: the UTC offset in minutes
            neg_utc_offset: a boolean indicating whether a null offset is
                            negative or positive.

        For None, a fresh copy of DEFAULT_DATE is returned, so callers can
        mutate the result without corrupting the module-level default.

    Note:
        A naive datetime.datetime (one whose utcoffset() is None) is not
        supported: computing its offset raises AttributeError.

    """
    if date_offset is None:
        return dict(DEFAULT_DATE)

    if isinstance(date_offset, numbers.Real):
        # Bare timestamps carry no timezone; anchor them to UTC.
        date_offset = datetime.datetime.fromtimestamp(
            date_offset, tz=datetime.timezone.utc)

    if isinstance(date_offset, datetime.datetime):
        timestamp = date_offset
        utcoffset = date_offset.utcoffset()
        offset = int(utcoffset.total_seconds()) // 60
        # A datetime cannot express "-0000": a null offset is positive,
        # and the flag is left unset for any other offset.
        neg_utc_offset = False if offset == 0 else None
    else:
        raw_timestamp = date_offset['timestamp']
        if isinstance(raw_timestamp, numbers.Real):
            timestamp = datetime.datetime.fromtimestamp(
                raw_timestamp, tz=datetime.timezone.utc)
        else:
            timestamp = raw_timestamp
        offset = date_offset['offset']
        neg_utc_offset = date_offset.get('negative_utc', None)

    return {
        'timestamp': timestamp.isoformat(),
        'offset': offset,
        'neg_utc_offset': neg_utc_offset,
    }
def revision_to_db(revision):
    """Flatten a swh-model revision into its database representation.

    Args:
        revision (dict): a swh-model revision; the keys id, author, date,
            committer, committer_date, type, directory, message, metadata,
            synthetic and parents are read.

    Returns:
        a dict holding the flattened author, committer and date columns,
        the remaining revision fields, and a 'parents' list with one
        {id, parent_id, parent_rank} dict per parent, ranked by position.

    """
    author_cols = author_to_db(revision['author'])
    date_cols = date_to_db(revision['date'])
    committer_cols = author_to_db(revision['committer'])
    committer_date_cols = date_to_db(revision['committer_date'])

    db_metadata = revision['metadata']
    if db_metadata and 'extra_headers' in db_metadata:
        # Convert the headers on a copy: the caller's metadata is kept as is.
        db_metadata = db_metadata.copy()
        db_metadata['extra_headers'] = git_headers_to_db(
            db_metadata['extra_headers'])

    revision_id = revision['id']
    return {
        'id': revision_id,
        'author_fullname': author_cols['fullname'],
        'author_name': author_cols['name'],
        'author_email': author_cols['email'],
        'date': date_cols['timestamp'],
        'date_offset': date_cols['offset'],
        'date_neg_utc_offset': date_cols['neg_utc_offset'],
        'committer_fullname': committer_cols['fullname'],
        'committer_name': committer_cols['name'],
        'committer_email': committer_cols['email'],
        'committer_date': committer_date_cols['timestamp'],
        'committer_date_offset': committer_date_cols['offset'],
        'committer_date_neg_utc_offset':
            committer_date_cols['neg_utc_offset'],
        'type': revision['type'],
        'directory': revision['directory'],
        'message': revision['message'],
        'metadata': db_metadata,
        'synthetic': revision['synthetic'],
        'parents': [
            {
                'id': revision_id,
                'parent_id': parent_id,
                'parent_rank': rank,
            }
            for rank, parent_id in enumerate(revision['parents'])
        ],
    }
def db_to_revision(db_revision):
    """Convert a database representation of a revision to its swh-model
    representation.

    Args:
        db_revision: a mapping of the revision's database columns: id,
            type, directory, message, metadata, synthetic, the author_*
            and committer_* columns, the date* and committer_date*
            columns and, optionally, parents.

    Returns:
        a dict with the keys id, author, date, committer, committer_date,
        type, directory, message, metadata, synthetic and parents.

        The input is never modified: when the metadata carries
        'extra_headers', the converted headers are stored in a shallow
        copy of the metadata, as revision_to_db does in the other
        direction.

    """
    author = db_to_author(
        db_revision['author_id'],
        db_revision['author_fullname'],
        db_revision['author_name'],
        db_revision['author_email'],
    )
    date = db_to_date(
        db_revision['date'],
        db_revision['date_offset'],
        db_revision['date_neg_utc_offset'],
    )

    committer = db_to_author(
        db_revision['committer_id'],
        db_revision['committer_fullname'],
        db_revision['committer_name'],
        db_revision['committer_email'],
    )
    committer_date = db_to_date(
        db_revision['committer_date'],
        db_revision['committer_date_offset'],
        db_revision['committer_date_neg_utc_offset']
    )

    metadata = db_revision['metadata']
    if metadata and 'extra_headers' in metadata:
        # Copy before replacing the headers so the caller's row keeps its
        # database representation and can safely be converted again.
        metadata = metadata.copy()
        metadata['extra_headers'] = db_to_git_headers(
            metadata['extra_headers'])

    parents = []
    if 'parents' in db_revision:
        # Falsy entries are dropped (presumably NULLs coming from the
        # database query -- TODO confirm).
        parents = [parent for parent in db_revision['parents'] if parent]

    return {
        'id': db_revision['id'],
        'author': author,
        'date': date,
        'committer': committer,
        'committer_date': committer_date,
        'type': db_revision['type'],
        'directory': db_revision['directory'],
        'message': db_revision['message'],
        'metadata': metadata,
        'synthetic': db_revision['synthetic'],
        'parents': parents,
    }
def release_to_db(release):
    """Flatten a swh-model release into its database representation.

    Args:
        release (dict): a swh-model release; the keys id, author, date,
            name, target, target_type, message and synthetic are read.

    Returns:
        a dict holding the flattened author and date columns along with
        the remaining release fields; the release message is stored under
        the 'comment' key.

    """
    author_cols = author_to_db(release['author'])
    date_cols = date_to_db(release['date'])

    return dict(
        id=release['id'],
        author_fullname=author_cols['fullname'],
        author_name=author_cols['name'],
        author_email=author_cols['email'],
        date=date_cols['timestamp'],
        date_offset=date_cols['offset'],
        date_neg_utc_offset=date_cols['neg_utc_offset'],
        name=release['name'],
        target=release['target'],
        target_type=release['target_type'],
        comment=release['message'],
        synthetic=release['synthetic'],
    )
def db_to_release(db_release):
    """Rebuild a swh-model release from its database representation.

    Args:
        db_release: a mapping of the release's database columns: id, name,
            comment, synthetic, target, target_type, the author_* columns
            and the date* columns.

    Returns:
        a dict with the keys author, date, id, name, message, synthetic,
        target and target_type; 'message' is read from the 'comment'
        column.

    """
    release_author = db_to_author(
        db_release['author_id'],
        db_release['author_fullname'],
        db_release['author_name'],
        db_release['author_email'],
    )
    release_date = db_to_date(
        db_release['date'],
        db_release['date_offset'],
        db_release['date_neg_utc_offset'],
    )

    return dict(
        author=release_author,
        date=release_date,
        id=db_release['id'],
        name=db_release['name'],
        message=db_release['comment'],
        synthetic=db_release['synthetic'],
        target=db_release['target'],
        target_type=db_release['target_type'],
    )
diff --git a/swh/storage/tests/test_converters.py b/swh/storage/tests/test_converters.py
index b02275059..f5ce97782 100644
--- a/swh/storage/tests/test_converters.py
+++ b/swh/storage/tests/test_converters.py
@@ -1,207 +1,130 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
from nose.tools import istest
from nose.plugins.attrib import attr
from swh.storage import converters
@attr('!db')
class TestConverters(unittest.TestCase):
    """Tests for swh.storage.converters; tagged '!db' through attr."""

    def setUp(self):
        # Always show the full diff when a dict comparison fails.
        self.maxDiff = None

    @istest
    def db_to_author(self):
        """The author columns are gathered into a single author dict."""
        # when
        actual_author = converters.db_to_author(
            1, b'fullname', b'name', b'email')

        # then
        self.assertEqual(actual_author, {
            'id': 1,
            'fullname': b'fullname',
            'name': b'name',
            'email': b'email',
        })

    @istest
    def db_to_revision(self):
        """A revision row without dates converts to a swh-model revision."""
        # when
        actual_revision = converters.db_to_revision({
            'id': 'revision-id',
            'date': None,
            'date_offset': None,
            'date_neg_utc_offset': None,
            'committer_date': None,
            'committer_date_offset': None,
            'committer_date_neg_utc_offset': None,
            'type': 'rev',
            'directory': b'dir-sha1',
            'message': b'commit message',
            'author_id': 'auth-id',
            'author_fullname': b'auth-fullname',
            'author_name': b'auth-name',
            'author_email': b'auth-email',
            'committer_id': 'comm-id',
            'committer_fullname': b'comm-fullname',
            'committer_name': b'comm-name',
            'committer_email': b'comm-email',
            'metadata': {},
            'synthetic': False,
            'parents': [123, 456]
        })

        # then
        self.assertEqual(actual_revision, {
            'id': 'revision-id',
            'author': {
                'id': 'auth-id',
                'fullname': b'auth-fullname',
                'name': b'auth-name',
                'email': b'auth-email',
            },
            'date': None,
            'committer': {
                'id': 'comm-id',
                'fullname': b'comm-fullname',
                'name': b'comm-name',
                'email': b'comm-email',
            },
            'committer_date': None,
            'type': 'rev',
            'directory': b'dir-sha1',
            'message': b'commit message',
            'metadata': {},
            'synthetic': False,
            'parents': [123, 456],
        })

    @istest
    def db_to_release(self):
        """A release row without a date converts to a swh-model release."""
        # when
        actual_release = converters.db_to_release({
            'id': b'release-id',
            'target': b'revision-id',
            'target_type': 'revision',
            'date': None,
            'date_offset': None,
            'date_neg_utc_offset': None,
            'name': b'release-name',
            'comment': b'release comment',
            'synthetic': True,
            'author_id': 'auth-id',
            'author_fullname': b'auth-fullname',
            'author_name': b'auth-name',
            'author_email': b'auth-email',
        })

        # then
        self.assertEqual(actual_release, {
            'author': {
                'id': 'auth-id',
                'fullname': b'auth-fullname',
                'name': b'auth-name',
                'email': b'auth-email',
            },
            'date': None,
            'id': b'release-id',
            'name': b'release-name',
            'message': b'release comment',
            'synthetic': True,
            'target': b'revision-id',
            'target_type': 'revision'
        })
- @istest
- def backslashescape_errors(self):
- raw_data_err = b'abcd\x80'
- with self.assertRaises(UnicodeDecodeError):
- raw_data_err.decode('utf-8', 'strict')
-
- self.assertEquals(
- raw_data_err.decode('utf-8', 'backslashescape'),
- 'abcd\\x80',
- )
-
- raw_data_ok = b'abcd\xc3\xa9'
- self.assertEquals(
- raw_data_ok.decode('utf-8', 'backslashescape'),
- raw_data_ok.decode('utf-8', 'strict'),
- )
-
- unicode_data = 'abcdef\u00a3'
- self.assertEquals(
- unicode_data.encode('ascii', 'backslashescape'),
- b'abcdef\\xa3',
- )
-
- @istest
- def encode_with_unescape(self):
- valid_data = '\\x01020304\\x00'
- valid_data_encoded = b'\x01020304\x00'
-
- self.assertEquals(
- valid_data_encoded,
- converters.encode_with_unescape(valid_data)
- )
-
- @istest
- def encode_with_unescape_invalid_escape(self):
- invalid_data = 'test\\abcd'
-
- with self.assertRaises(ValueError) as exc:
- converters.encode_with_unescape(invalid_data)
-
- self.assertIn('invalid escape', exc.exception.args[0])
- self.assertIn('position 4', exc.exception.args[0])
-
- @istest
- def decode_with_escape(self):
- backslashes = b'foo\\bar\\\\baz'
- backslashes_escaped = 'foo\\\\bar\\\\\\\\baz'
-
- self.assertEquals(
- backslashes_escaped,
- converters.decode_with_escape(backslashes),
- )
-
- valid_utf8 = b'foo\xc3\xa2'
- valid_utf8_escaped = 'foo\u00e2'
-
- self.assertEquals(
- valid_utf8_escaped,
- converters.decode_with_escape(valid_utf8),
- )
-
- invalid_utf8 = b'foo\xa2'
- invalid_utf8_escaped = 'foo\\xa2'
-
- self.assertEquals(
- invalid_utf8_escaped,
- converters.decode_with_escape(invalid_utf8),
- )
-
- valid_utf8_nul = b'foo\xc3\xa2\x00'
- valid_utf8_nul_escaped = 'foo\u00e2\\x00'
-
- self.assertEquals(
- valid_utf8_nul_escaped,
- converters.decode_with_escape(valid_utf8_nul),
- )
-
@istest
def db_to_git_headers(self):
    """Git headers survive a round trip through their DB representation."""
    # Values mix invalid utf-8 bytes with runs of literal backslashes.
    raw_data = [
        ['gpgsig', b'garbage\x89a\x43b\x14'],
        ['extra', [b'fo\\\\\\o', b'bar\\', b'inval\\\\\x99id']],
    ]

    db_data = converters.git_headers_to_db(raw_data)
    loop = converters.db_to_git_headers(db_data)
    self.assertEqual(raw_data, loop)
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Fri, Jul 4, 11:01 AM (3 w, 6 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3277455
Attached To
R65 Staging repository
Event Timeline
Log In to Comment