# ---------------------------------------------------------------------------
# swh/core/bencode.py
# ---------------------------------------------------------------------------
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

"""Provides a bencode encoder.

This allows encoding a nested bytes/int/list/dict structure
into its bencode representation."""

import unittest
from typing import Dict, Callable, Any, Generator


def _encode_bytes(obj: bytes) -> Generator[bytes, None, None]:
    """Yield the chunks of a bencoded byte string: ``<length>:<data>``."""
    yield b'%d:' % len(obj)
    yield obj


def _encode_int(obj: int) -> Generator[bytes, None, None]:
    """Yield the bencoded form of an integer: ``i<decimal>e``."""
    yield b'i%de' % obj


def _encode_list(obj: list) -> Generator[bytes, None, None]:
    """Yield the chunks of a bencoded list: ``l<item>...<item>e``.

    Items are encoded recursively, in list order.
    """
    yield b'l'
    for item in obj:
        yield from _encode(item)
    yield b'e'


def _encode_dict(obj: dict) -> Generator[bytes, None, None]:
    """Yield the chunks of a bencoded dict: ``d<key><value>...e``.

    Entries are emitted in sorted key order; values are encoded recursively.

    Raises:
        TypeError: if any key is not a ``bytes`` object.
    """
    # Validate explicitly rather than with ``assert``: assertions are removed
    # under ``python -O``, and checking before sorting gives a clear error
    # even when the keys are of mixed, non-comparable types.
    for key in obj:
        if not isinstance(key, bytes):
            raise TypeError(
                'Unsupported key type for bencoding: {}'.format(type(key)))

    yield b'd'
    # Dict keys are unique, so ordering depends on the keys alone.
    for key in sorted(obj):
        yield from _encode_bytes(key)
        yield from _encode(obj[key])
    yield b'e'


# Dispatch on the *exact* type of the object (not isinstance), so that e.g.
# bool (a subclass of int) is rejected rather than encoded as an integer.
_ENCODERS = {
    bytes: _encode_bytes,
    int: _encode_int,
    list: _encode_list,
    dict: _encode_dict,
}  # type: Dict[type, Callable[[Any], Generator[bytes, None, None]]]


def _encode(obj: Any) -> Generator[bytes, None, None]:
    """Yield the bencoded chunks of ``obj`` using the encoder for its type.

    Raises:
        TypeError: if ``type(obj)`` has no registered encoder.
    """
    encoder = _ENCODERS.get(type(obj))
    if encoder is None:
        raise TypeError('Unsupported type for bencoding: {}'.format(type(obj)))
    yield from encoder(obj)


def encode(obj: Any) -> bytes:
    """Encodes a nested bytes/int/list/dict structure into its bencode
    representation.

    Raises:
        TypeError: if the structure contains an object of an unsupported
            type, or a dict key that is not ``bytes``.
    """
    return b''.join(_encode(obj))


# ---------------------------------------------------------------------------
# swh/core/tests/test_bencode.py
#
# In the original layout these tests live in their own module and obtain
# ``encode`` via ``from swh.core.bencode import encode``.
# ---------------------------------------------------------------------------


class SpecificationTestCase(unittest.TestCase):
    """These examples are from the specification:
    https://wiki.theory.org/index.php/BitTorrentSpecification#Bencoding"""

    def test_bencode_bytes(self):
        self.assertEqual(b'4:spam', encode(b'spam'))

        self.assertEqual(b'0:', encode(b''))

    def test_encode_int(self):
        self.assertEqual(b'i3e', encode(3))

        self.assertEqual(b'i-3e', encode(-3))

        self.assertEqual(b'i0e', encode(0))

    def test_encode_list(self):
        self.assertEqual(b'l4:spam4:eggse', encode([b'spam', b'eggs']))

        self.assertEqual(b'le', encode([]))

    def test_encode_dict(self):
        self.assertEqual(
            b'd3:cow3:moo4:spam4:eggse',
            encode({b'cow': b'moo', b'spam': b'eggs'}))

        self.assertEqual(
            b'd4:spaml1:a1:bee',
            encode({b'spam': [b'a', b'b']}))

        self.assertEqual(
            b'd9:publisher3:bob17:publisher-webpage15:www.example.com'
            b'18:publisher.location4:homee',
            encode({b'publisher': b'bob',
                    b'publisher-webpage': b'www.example.com',
                    b'publisher.location': b'home'}))

        self.assertEqual(b'de', encode({}))


class TypeTestCase(unittest.TestCase):
    def test_invalid_type(self):
        with self.assertRaises(TypeError):
            encode('string')

    def test_invalid_dict_key(self):
        # Non-bytes keys must be rejected with TypeError, not AssertionError.
        with self.assertRaises(TypeError):
            encode({1: b'value'})

        with self.assertRaises(TypeError):
            encode({'key': b'value'})