class UtilsLib(unittest.TestCase):
    """Tests for the block-wise shuffling done by ``utils.random_blocks``."""

    def assert_ok(self, actual_data, expected_data, block=None):
        """Check that actual_data holds exactly the expected elements.

        The order of the elements is ignored, but every element must
        appear the same number of times on both sides, so duplicated or
        missing elements are reported. Neither argument is modified.

        Args:
            actual_data (Iterable): data produced by the code under test
            expected_data (Iterable): elements that must be present
            block (Optional[int]): when given, additionally check that
                each consecutive slice of ``block`` elements of
                actual_data is a permutation of the matching slice of
                expected_data

        """
        expected = list(expected_data)
        # actual_data may be a lazy generator: consume it exactly once
        actual = list(actual_data)
        self.assertCountEqual(actual, expected)

        if block is None:
            return
        for start in range(0, len(expected), block):
            stop = start + block
            self.assertCountEqual(actual[start:stop], expected[start:stop])

    def test_random_block(self):
        # given
        _input = list(range(0, 9))
        # when
        actual_data = utils.random_blocks(_input, 2)
        # then
        self.assert_ok(actual_data, expected_data=_input, block=2)

    def test_random_block2(self):
        # given
        _input = list(range(9, 0, -1))
        # when
        actual_data = utils.random_blocks(_input, 4)
        # then
        self.assert_ok(actual_data, expected_data=_input, block=4)

    def test_random_block_with_fillvalue(self):
        # given
        _input = [(i, i+1) for i in range(0, 9)]
        # when
        actual_data = utils.random_blocks(_input, 2,
                                          fillvalue=(None, None))
        # then
        self.assert_ok(actual_data, expected_data=_input, block=2)
import itertools
import random


def grouper(iterable, n, fillvalue=None):
    """Collect data into chunks of at most n elements.

    Every chunk holds n elements except possibly the last one, which
    holds whatever remains of the iterable. No padding value is ever
    yielded.

    Args:
        iterable (Iterable): an iterable
        n (int): size of block to slice the iterable into
        fillvalue (Any): kept for backward compatibility with existing
            callers. Padding of the last chunk is done with a private
            sentinel, so this value is never inserted into, nor removed
            from, the yielded data.

    Yields:
        one generator per chunk, producing that chunk's elements in
        their original order

    """
    # A fresh object cannot be identical to any element of the iterable,
    # so stripping it only removes padding. Stripping on the caller's
    # fillvalue would also drop genuine elements such as None.
    padding = object()
    args = [iter(iterable)] * n
    for chunk in itertools.zip_longest(*args, fillvalue=padding):
        yield (item for item in chunk if item is not padding)


def random_blocks(iterable, block=100, fillvalue=None):
    """Yield the elements of an iterable, shuffled block by block.

    Given an iterable:

    - slice the iterable in data set of block-sized elements
    - shuffle the elements of the current block
    - yield each element of that shuffled block
    - continue onto the next block

    Elements never move from one block to another; only the order
    inside a block is randomized.

    Args:
        iterable (Iterable): an iterable
        block (int): number of elements per block
        fillvalue (Any): passed through to grouper, which accepts it
            for backward compatibility

    Yields:
        the elements of the iterable, in random order within each block

    """
    for chunk in grouper(iterable, block, fillvalue=fillvalue):
        shuffled = list(chunk)
        random.shuffle(shuffled)
        yield from shuffled