# ---------------------------------------------------------------------------
# swh/core/bencode.py
# ---------------------------------------------------------------------------
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

"""Provides a bencode encoder.

This allows encoding a nested bytes/int/list/dict structure
into its bencode representation."""

from typing import Dict, Callable, Any, Generator


def _encode_bytes(obj: bytes) -> Generator[bytes, None, None]:
    """Yield the chunks of a bencoded byte string: ``<length>:<payload>``."""
    yield b'%d:' % len(obj)
    yield obj


def _encode_int(obj: int) -> Generator[bytes, None, None]:
    """Yield the bencoded form of an integer: ``i<decimal>e``."""
    yield b'i%de' % obj


def _encode_list(obj: list) -> Generator[bytes, None, None]:
    """Yield the chunks of a bencoded list: ``l<items>e``.

    Items are encoded in order through :func:`_encode`, so an item of an
    unsupported type raises :class:`TypeError`.
    """
    yield b'l'
    for item in obj:
        yield from _encode(item)
    yield b'e'


def _encode_dict(obj: dict) -> Generator[bytes, None, None]:
    """Yield the chunks of a bencoded dictionary: ``d<key><value>...e``.

    Entries are emitted in ascending key order. Keys are encoded as byte
    strings and values through :func:`_encode`.

    Raises:
        TypeError: if any key is not exactly of type ``bytes``, or if a
            value has an unsupported type.
    """
    # Check every key *before* sorting: comparing keys of different types
    # (e.g. bytes and int) makes sorted() fail with an unrelated
    # "'<' not supported" error that would hide the real problem.
    for key in obj:
        if type(key) is not bytes:
            raise TypeError('bencode dictionary keys must be bytes, not {}.'
                            .format(type(key)))

    yield b'd'
    # Sort on the keys alone; the values never need to be comparable.
    for key in sorted(obj):
        yield from _encode_bytes(key)
        yield from _encode(obj[key])
    yield b'e'


# Dispatch table keyed on the *exact* type of the object to encode.
# Subclasses are deliberately not matched: for instance ``type(True)`` is
# ``bool``, which has no entry here, so booleans are rejected.
_ENCODERS = {
    bytes: _encode_bytes,
    int: _encode_int,
    list: _encode_list,
    dict: _encode_dict,
}  # type: Dict[type, Callable[[Any], Generator[bytes, None, None]]]


def _encode(obj: Any) -> Generator[bytes, None, None]:
    """Yield the bencoded chunks of ``obj`` using the encoder for its type.

    Raises:
        TypeError: if ``type(obj)`` has no entry in ``_ENCODERS``.
    """
    encoder = _ENCODERS.get(type(obj))
    if encoder is None:
        raise TypeError('Unsupported type for bencoding: {}'.format(type(obj)))
    yield from encoder(obj)


def encode(obj) -> bytes:
    """Encodes a nested bytes/int/list/dict structure into its bencode
    representation.

    Args:
        obj: a ``bytes``, ``int``, ``list`` or ``dict`` object; lists and
            dict values may nest further objects of these types, and dict
            keys must be ``bytes``.

    Returns:
        The bencoded representation, as a single ``bytes`` object.

    Raises:
        TypeError: if ``obj`` or any nested value has an unsupported type,
            or if a dictionary key is not ``bytes``.
    """
    return b''.join(_encode(obj))


# ---------------------------------------------------------------------------
# swh/core/tests/test_bencode.py
# ---------------------------------------------------------------------------
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

# The examples are from the specification:
# https://wiki.theory.org/index.php/BitTorrentSpecification#Bencoding

import pytest

# NOTE: when this section lives in its own file, it additionally needs:
#     from swh.core.bencode import encode


def test_bencode_bytes():
    assert b'4:spam' == encode(b'spam')

    assert b'0:' == encode(b'')


def test_encode_int():
    assert b'i3e' == encode(3)

    assert b'i-3e' == encode(-3)

    assert b'i0e' == encode(0)


def test_encode_list():
    assert b'l4:spam4:eggse' == encode([b'spam', b'eggs'])

    assert b'le' == encode([])


def test_encode_dict():
    assert \
        b'd3:cow3:moo4:spam4:eggse' == \
        encode({b'cow': b'moo', b'spam': b'eggs'})

    assert \
        b'd4:spaml1:a1:bee' == \
        encode({b'spam': [b'a', b'b']})

    assert \
        (b'd9:publisher3:bob17:publisher-webpage15:www.example.com'
         b'18:publisher.location4:homee') == \
        encode({b'publisher': b'bob',
                b'publisher-webpage': b'www.example.com',
                b'publisher.location': b'home'})

    assert b'de' == encode({})


def test_invalid_root_type():
    with pytest.raises(TypeError):
        encode('string')


def test_invalid_dict_key_type():
    with pytest.raises(TypeError):
        encode({'string': b'bytes'})


def test_mixed_dict_key_types():
    # Keys of different types cannot be ordered; the encoder must still
    # report the offending key type rather than a comparison failure.
    with pytest.raises(TypeError, match='dictionary keys must be bytes'):
        encode({b'bytes': b'value', 1: b'value'})