diff --git a/debian/control b/debian/control
--- a/debian/control
+++ b/debian/control
@@ -7,7 +7,7 @@
                 python3-all,
                 python3-nose,
                 python3-setuptools,
-               python3-swh.core (>= 0.0.36~),
+               python3-swh.core (>= 0.0.46~),
                 python3-swh.loader.core (>= 0.0.35~),
                 python3-swh.loader.dir (>= 0.0.33~),
                 python3-swh.model (>= 0.0.27~),
@@ -19,7 +19,7 @@
 
 Package: python3-swh.loader.tar
 Architecture: all
-Depends: python3-swh.core (>= 0.0.36~),
+Depends: python3-swh.core (>= 0.0.46~),
           python3-swh.loader.core (>= 0.0.35~),
           python3-swh.loader.dir (>= 0.0.33~),
           python3-swh.model (>= 0.0.27~),
diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,4 +1,4 @@
-swh.core >= 0.0.36
+swh.core >= 0.0.46
 swh.model >= 0.0.27
 swh.scheduler >= 0.0.14
 swh.storage >= 0.0.83
diff --git a/swh/loader/tar/db.py b/swh/loader/tar/db.py
deleted file mode 100644
--- a/swh/loader/tar/db.py
+++ /dev/null
@@ -1,52 +0,0 @@
-# Copyright (C) 2015  The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import psycopg2
-
-
-def connect(db_url):
-    """Open db connection.
-    """
-    return psycopg2.connect(db_url)
-
-
-def execute(cur, query_params):
-    """Execute the query_params.
-    query_params is expected to be either:
-    - a sql query (string)
-    - a tuple (sql query, params)
-    """
-    if isinstance(query_params, str):
-        cur.execute(query_params)
-    else:
-        cur.execute(*query_params)
-
-
-def entry_to_bytes(entry):
-    """Convert an entry coming from the database to bytes"""
-    if isinstance(entry, memoryview):
-        return entry.tobytes()
-    return entry
-
-
-def line_to_bytes(line):
-    """Convert a line coming from the database to bytes"""
-    return line.__class__(entry_to_bytes(entry) for entry in line)
-
-
-def cursor_to_bytes(cursor):
-    """Yield all the data from a cursor as bytes"""
-    yield from (line_to_bytes(line) for line in cursor)
-
-
-def query_fetch(db_conn, query_params):
-    """Execute sql query which returns results.
-    query_params is expected to be either:
-    - a sql query (string)
-    - a tuple (sql query, params)
-    """
-    with db_conn.cursor() as cur:
-        execute(cur, query_params)
-        yield from cursor_to_bytes(cur)
diff --git a/swh/loader/tar/tests/test_utils.py b/swh/loader/tar/tests/test_utils.py
new file mode 100644
--- /dev/null
+++ b/swh/loader/tar/tests/test_utils.py
@@ -0,0 +1,38 @@
+# Copyright (C) 2018  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import unittest
+
+from swh.loader.tar import utils
+
+
+class UtilsLib(unittest.TestCase):
+
+    def assert_ok(self, actual_data, expected_data):
+        """Check that actual_data and expected_data hold the same elements.
+
+        Actual data is shuffled block by block, so its contents must
+        match the expected ones exactly (duplicates included) while the
+        order may differ.
+
+        """
+        self.assertCountEqual(list(actual_data), expected_data)
+
+    def test_random_block(self):
+        _input = list(range(0, 9))
+        # given
+        actual_data = utils.random_blocks(_input, 2)
+        self.assert_ok(actual_data, expected_data=_input)
+
+    def test_random_block2(self):
+        _input = list(range(9, 0, -1))
+        # given
+        actual_data = utils.random_blocks(_input, 4)
+        self.assert_ok(actual_data, expected_data=_input)
+
+    def test_random_block_with_tuples(self):
+        _input = [(i, i+1) for i in range(0, 9)]
+        actual_data = utils.random_blocks(_input, 2)
+        self.assert_ok(actual_data, expected_data=_input)
diff --git a/swh/loader/tar/utils.py b/swh/loader/tar/utils.py
--- a/swh/loader/tar/utils.py
+++ b/swh/loader/tar/utils.py
@@ -1,48 +1,33 @@
-# Copyright (C) 2015-2017  The Software Heritage developers
+# Copyright (C) 2015-2018  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-import itertools
 import random
 
+from swh.core.utils import grouper
 
-def grouper(iterable, n, fillvalue=None):
-    """Collect data into fixed-length chunks or blocks.
 
-    Args:
-        iterable: an iterable
-        n: size of block
-        fillvalue: value to use for the last block
-
-    Returns:
-        fixed-length chunks of blocks as iterables
-
-    """
-    args = [iter(iterable)] * n
-    return itertools.zip_longest(*args, fillvalue=fillvalue)
+def random_blocks(iterable, block=100):
+    """Randomize iterable per block of size block.
 
+    Given an iterable:
 
-def random_blocks(iterable, block=100, fillvalue=None):
-    """Given an iterable:
-    - slice the iterable in data set of block-sized elements
-    - randomized the data set
-    - yield each element
+    - slice the iterable into blocks of `block` elements (the last
+      block may hold fewer)
+    - shuffle the elements of the current block
+    - yield each element of that shuffled block
+    - continue onto the next block
 
     Args:
-        iterable: iterable of data
-        block: number of elements per block
-        fillvalue: a fillvalue for the last block if not enough values in
-        last block
+        iterable (Iterable): an iterable
+        block (int): number of elements per block
 
-    Returns:
-        An iterable of randomized per block-size elements.
+    Yields:
+        the elements of the iterable, shuffled within each block
 
     """
-    count = 0
-    for iterable in grouper(iterable, block, fillvalue=fillvalue):
-        count += 1
-        lst = list(iterable)
+    for iter_ in grouper(iterable, block):
+        lst = list(iter_)
         random.shuffle(lst)
-        for e in lst:
-            yield e
+        yield from lst