# ---------------------------------------------------------------------------
# swh/core/bencode.py
# ---------------------------------------------------------------------------

# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from typing import List, Dict, Callable, Any


def _encode_bytes(parts: List[bytes], obj: bytes) -> None:
    """Append the bencoding of a byte string: ``<length>:<contents>``."""
    parts.append(b'%d:' % len(obj))
    parts.append(obj)


def _encode_int(parts: List[bytes], obj: int) -> None:
    """Append the bencoding of an integer: ``i<base-10 digits>e``."""
    parts.append(b'i%de' % obj)


def _encode_list(parts: List[bytes], obj: list) -> None:
    """Append the bencoding of a list: ``l<encoded items>e``."""
    parts.append(b'l')
    for item in obj:
        _encode(parts, item)
    parts.append(b'e')


def _encode_dict(parts: List[bytes], obj: dict) -> None:
    """Append the bencoding of a dict: ``d<key><value>...e``.

    Keys are emitted in sorted order of their raw bytes.

    Raises:
        TypeError: if any key is not exactly of type ``bytes``.
    """
    # Validate with an explicit raise rather than ``assert``: assertions are
    # stripped under ``python -O``, which would let a non-bytes key through
    # to fail later with an unrelated error. Checking every key before
    # sorting also avoids an opaque comparison error on mixed key types.
    for key in obj:
        if type(key) is not bytes:
            raise TypeError(
                'bencode dict keys must be bytes, got %s: %r'
                % (type(key).__name__, key))

    parts.append(b'd')
    # Sort on the keys alone; the values never need to be comparable.
    for key in sorted(obj):
        _encode_bytes(parts, key)
        _encode(parts, obj[key])
    parts.append(b'e')


# Dispatch table keyed on the exact type of the object being encoded.
ENCODERS = {
    bytes: _encode_bytes,
    int: _encode_int,
    list: _encode_list,
    dict: _encode_dict,
}  # type: Dict[type, Callable[[List[bytes], Any], None]]


def _encode(parts: List[bytes], obj: Any) -> None:
    """Append the bencoding of ``obj`` to ``parts``.

    The lookup uses ``type(obj)``, so subclasses of the supported types
    (``bool`` included) are not accepted and raise ``KeyError``.
    """
    ENCODERS[type(obj)](parts, obj)


def encode(obj) -> bytes:
    """Encodes a nested bytes/int/list/dict structure into its bencode
    representation.

    Args:
        obj: a ``bytes``, ``int``, ``list`` or ``dict`` object; lists and
            dict values may themselves contain any of these types, and dict
            keys must be ``bytes``.

    Returns:
        The bencoded representation of ``obj``.

    Raises:
        TypeError: if a dict key is not of type ``bytes``.
        KeyError: if ``obj`` or a nested value is of any other type
            (e.g. ``str``, ``float``, ``bool`` or ``None``).
    """
    parts = []  # type: List[bytes]
    _encode(parts, obj)
    return b''.join(parts)


# ---------------------------------------------------------------------------
# swh/core/tests/test_bencode.py
#
# In the package layout these tests obtain the encoder with
# ``from swh.core.bencode import encode``; in this listing ``encode`` is
# already defined above.
# ---------------------------------------------------------------------------

# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

# The examples are from the specification:
# https://wiki.theory.org/index.php/BitTorrentSpecification#Bencoding


def test_bencode_bytes():
    assert b'4:spam' == encode(b'spam')

    assert b'0:' == encode(b'')


def test_encode_int():
    assert b'i3e' == encode(3)

    assert b'i-3e' == encode(-3)

    assert b'i0e' == encode(0)


def test_encode_list():
    assert b'l4:spam4:eggse' == encode([b'spam', b'eggs'])

    assert b'le' == encode([])


def test_encode_dict():
    assert b'd3:cow3:moo4:spam4:eggse' == \
        encode({b'cow': b'moo', b'spam': b'eggs'})

    assert b'd4:spaml1:a1:bee' == encode({b'spam': [b'a', b'b']})

    assert (b'd9:publisher3:bob17:publisher-webpage15:www.example.com'
            b'18:publisher.location4:homee') == \
        encode({b'publisher': b'bob', b'publisher-webpage': b'www.example.com',
                b'publisher.location': b'home'})

    assert b'de' == encode({})


def test_encode_dict_rejects_non_bytes_keys():
    for bad in ({'spam': b'eggs'}, {1: b'x'}, {b'a': 1, 'b': 2}):
        try:
            encode(bad)
        except TypeError:
            continue
        raise AssertionError('TypeError not raised for %r' % (bad,))