diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py
--- a/swh/storage/in_memory.py
+++ b/swh/storage/in_memory.py
@@ -18,14 +18,16 @@
 import attr
 
-from swh.model.model import \
-    Content, Directory, Revision, Release, Snapshot, OriginVisit, Origin
-from swh.model.hashutil import DEFAULT_ALGORITHMS
+from swh.model.model import (
+    Content, Directory, Revision, Release, Snapshot, OriginVisit, Origin,
+    SHA1_SIZE)
+from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex
 from swh.objstorage import get_objstorage
 from swh.objstorage.exc import ObjNotFoundError
 
 from .storage import get_journal_writer
 from .converters import origin_url_to_sha1
+from .utils import get_partition_bounds_bytes
 
 
 # Max block size of contents to return
 BULK_BLOCK_CONTENT_LEN_MAX = 10000
@@ -306,6 +308,45 @@
             'next': next_content,
         }
 
+    def content_get_partition(
+            self, partition_id: int, nb_partitions: int, limit: int = 1000,
+            page_token: str = None):
+        """Splits contents into nb_partitions, and returns one of these based on
+        partition_id (which must be in [0, nb_partitions-1])
+
+        There is no guarantee on how the partitioning is done, or the
+        result order.
+
+        Args:
+            partition_id (int): index of the partition to fetch
+            nb_partitions (int): total number of partitions to split into
+            limit (int): maximum number of contents to return (default: 1000)
+            page_token (Optional[str]): opaque token used for pagination.
+
+        Returns:
+            a dict with keys:
+              - contents (List[dict]): iterable of contents in the partition.
+              - next_page_token (Optional[str]): opaque token to be used as
+                `page_token` for retrieving the next page. If absent, there are
+                no more pages to gather.
+        """
+        if limit is None:
+            raise ValueError('Development error: limit should not be None')
+        (start, end) = get_partition_bounds_bytes(
+            partition_id, nb_partitions, SHA1_SIZE)
+        if page_token:
+            start = hash_to_bytes(page_token)
+        if end is None:
+            end = b'\xff'*SHA1_SIZE
+        result = self.content_get_range(start, end, limit)
+        result2 = {
+            'contents': result['contents'],
+            'next_page_token': None,
+        }
+        if result['next']:
+            result2['next_page_token'] = hash_to_hex(result['next'])
+        return result2
+
     def content_get_metadata(self, content):
         """Retrieve content metadata in bulk
 
diff --git a/swh/storage/storage.py b/swh/storage/storage.py
--- a/swh/storage/storage.py
+++ b/swh/storage/storage.py
@@ -18,7 +18,8 @@
 import psycopg2.pool
 
 from swh.core.api import remote_api_endpoint
-from swh.model.hashutil import ALGORITHMS, hash_to_bytes
+from swh.model.model import SHA1_SIZE
+from swh.model.hashutil import ALGORITHMS, hash_to_bytes, hash_to_hex
 from swh.objstorage import get_objstorage
 from swh.objstorage.exc import ObjNotFoundError
 try:
@@ -33,6 +34,7 @@
 from .exc import StorageDBError
 from .algos import diff
 from .metrics import timed, send_metric, process_metrics
+from .utils import get_partition_bounds_bytes
 
 
 # Max block size of contents to return
@@ -491,6 +493,48 @@
             'next': next_content,
         }
 
+    @remote_api_endpoint('content/partition')
+    @timed
+    @db_transaction()
+    def content_get_partition(
+            self, partition_id: int, nb_partitions: int, limit: int = 1000,
+            page_token: str = None, db=None, cur=None):
+        """Splits contents into nb_partitions, and returns one of these based on
+        partition_id (which must be in [0, nb_partitions-1])
+
+        There is no guarantee on how the partitioning is done, or the
+        result order.
+
+        Args:
+            partition_id (int): index of the partition to fetch
+            nb_partitions (int): total number of partitions to split into
+            limit (int): maximum number of contents to return (default: 1000)
+            page_token (Optional[str]): opaque token used for pagination.
+
+        Returns:
+            a dict with keys:
+              - contents (List[dict]): iterable of contents in the partition.
+              - next_page_token (Optional[str]): opaque token to be used as
+                `page_token` for retrieving the next page. If absent, there are
+                no more pages to gather.
+        """
+        if limit is None:
+            raise ValueError('Development error: limit should not be None')
+        (start, end) = get_partition_bounds_bytes(
+            partition_id, nb_partitions, SHA1_SIZE)
+        if page_token:
+            start = hash_to_bytes(page_token)
+        if end is None:
+            end = b'\xff'*SHA1_SIZE
+        result = self.content_get_range(start, end, limit)
+        result2 = {
+            'contents': result['contents'],
+            'next_page_token': None,
+        }
+        if result['next']:
+            result2['next_page_token'] = hash_to_hex(result['next'])
+        return result2
+
     @remote_api_endpoint('content/metadata')
     @timed
     @db_transaction_generator(statement_timeout=500)
diff --git a/swh/storage/tests/storage_data.py b/swh/storage/tests/storage_data.py
--- a/swh/storage/tests/storage_data.py
+++ b/swh/storage/tests/storage_data.py
@@ -10,7 +10,10 @@
 
 class StorageData:
     def __getattr__(self, key):
-        v = globals()[key]
+        try:
+            v = globals()[key]
+        except KeyError as e:
+            raise AttributeError(e.args[0])
         if hasattr(v, 'copy'):
             return v.copy()
         return v
diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py
--- a/swh/storage/tests/test_storage.py
+++ b/swh/storage/tests/test_storage.py
@@ -7,6 +7,7 @@
 from contextlib import contextmanager
 import datetime
 import itertools
+import math
 import queue
 import random
 import threading
@@ -23,6 +24,7 @@
 from typing import ClassVar, Optional
 
 from swh.model import from_disk, identifiers
+from swh.model.model import SHA1_SIZE
 from swh.model.hashutil import hash_to_bytes
 from swh.model.hypothesis_strategies import objects
 from swh.storage import HashCollision
@@ -70,6 +72,17 @@
     return [revision['id'], revision['parents']]
 
 
+def assert_contents_ok(expected_contents, actual_contents,
+                       keys_to_check={'sha1', 'data'}):
+    """Assert that a given list of contents matches on a given set of keys.
+
+    """
+    for k in keys_to_check:
+        expected_list = set([c.get(k) for c in expected_contents])
+        actual_list = set([c.get(k) for c in actual_contents])
+        assert actual_list == expected_list, k
+
+
 class TestStorage:
     """Main class for Storage testing.
 
@@ -357,6 +370,76 @@
         # then
         assert list(gen) == [missing_cont['sha1']]
 
+    def test_content_get_partition(self, swh_storage, swh_contents):
+        """content_get_partition returns all contents across all partitions"""
+        expected_contents = [c for c in swh_contents
+                             if c['status'] != 'absent']
+
+        actual_contents = []
+        for i in range(16):
+            actual_result = swh_storage.content_get_partition(i, 16)
+            assert actual_result['next_page_token'] is None
+            actual_contents.extend(actual_result['contents'])
+
+        assert_contents_ok(
+            expected_contents, actual_contents, ['sha1'])
+
+    def test_content_get_partition_full(self, swh_storage, swh_contents):
+        """content_get_partition for a single partition returns all available
+        contents"""
+        expected_contents = [c for c in swh_contents
+                             if c['status'] != 'absent']
+
+        actual_result = swh_storage.content_get_partition(0, 1)
+        assert actual_result['next_page_token'] is None
+
+        actual_contents = actual_result['contents']
+        assert_contents_ok(
+            expected_contents, actual_contents, ['sha1'])
+
+    def test_content_get_partition_empty(self, swh_storage, swh_contents):
+        """content_get_partition for an empty partition returns nothing"""
+        first_sha1 = min(content['sha1'] for content in swh_contents)
+        first_sha1 = int.from_bytes(first_sha1, 'big')
+        # nb_partitions = smallest power of 2 such that first_sha1 is not in
+        # the first partition
+        nb_partitions = \
+            1 << (SHA1_SIZE*8 - math.floor(math.log2(first_sha1)) + 1)
+
+        actual_result = swh_storage.content_get_partition(0, nb_partitions)
+
+        assert actual_result['next_page_token'] is None
+        assert len(actual_result['contents']) == 0
+
+    def test_content_get_partition_limit_none(self, swh_storage):
+        """content_get_partition call with wrong limit input should fail"""
+        with pytest.raises(ValueError) as e:
+            swh_storage.content_get_partition(1, 16, limit=None)
+
+        assert e.value.args == ('Development error: limit should not be None',)
+
+    def test_generate_content_get_partition_pagination(
+            self, swh_storage, swh_contents):
+        """content_get_partition paginates results if limit exceeded"""
+        expected_contents = [c for c in swh_contents
+                             if c['status'] != 'absent']
+
+        # retrieve contents
+        actual_contents = []
+        for i in range(4):
+            page_token = None
+            while True:
+                actual_result = swh_storage.content_get_partition(
+                    i, 4, limit=3, page_token=page_token)
+                actual_contents.extend(actual_result['contents'])
+                page_token = actual_result['next_page_token']
+
+                if page_token is None:
+                    break
+
+        assert_contents_ok(
+            expected_contents, actual_contents, ['sha1'])
+
     def test_content_get_metadata(self, swh_storage):
         cont1 = data.cont
         cont2 = data.cont2
@@ -3062,16 +3145,6 @@
 
 class TestStorageGeneratedData:
 
-    def assert_contents_ok(self, expected_contents, actual_contents,
-                           keys_to_check={'sha1', 'data'}):
-        """Assert that a given list of contents matches on a given set of keys.
-
-        """
-        for k in keys_to_check:
-            expected_list = set([c.get(k) for c in expected_contents])
-            actual_list = set([c.get(k) for c in actual_contents])
-            assert actual_list == expected_list, k
-
     def test_generate_content_get(self, swh_storage, swh_contents):
         contents_with_data = [c for c in swh_contents
                               if c['status'] != 'absent']
@@ -3081,7 +3154,7 @@
         # retrieve contents
         actual_contents = list(swh_storage.content_get(get_sha1s))
         assert None not in actual_contents
-        self.assert_contents_ok(contents_with_data, actual_contents)
+        assert_contents_ok(contents_with_data, actual_contents)
 
     def test_generate_content_get_metadata(self, swh_storage, swh_contents):
         # input the list of sha1s we want from storage
@@ -3096,8 +3169,8 @@
 
         keys_to_check = {'length', 'status',
                          'sha1', 'sha1_git', 'sha256', 'blake2s256'}
-        self.assert_contents_ok(expected_contents, actual_contents,
-                                keys_to_check=keys_to_check)
+        assert_contents_ok(expected_contents, actual_contents,
+                           keys_to_check=keys_to_check)
 
     def test_generate_content_get_range(self, swh_storage, swh_contents):
         """content_get_range returns complete range"""
@@ -3116,7 +3189,7 @@
         expected_contents = [c for c in present_contents
                              if start <= c['sha1'] <= end]
         if expected_contents:
-            self.assert_contents_ok(
+            assert_contents_ok(
                 expected_contents, actual_contents, ['sha1'])
         else:
             assert actual_contents == []
@@ -3135,7 +3208,7 @@
         expected_contents = [c for c in present_contents
                              if start <= c['sha1'] <= end]
         if expected_contents:
-            self.assert_contents_ok(
+            assert_contents_ok(
                 expected_contents, actual_contents, ['sha1'])
         else:
             assert actual_contents == []
@@ -3173,7 +3246,7 @@
 
         expected_contents = [c for c in swh_contents
                              if c['status'] != 'absent']
-        self.assert_contents_ok(
+        assert_contents_ok(
            expected_contents, actual_contents, ['sha1'])
 
     def test_generate_content_get_range_limit(self, swh_storage, swh_contents):
@@ -3196,7 +3269,7 @@
 
         assert len(actual_contents) == limited_results
         expected_contents = [contents_map[sha1] for sha1 in get_sha1s[:-1]]
-        self.assert_contents_ok(
+        assert_contents_ok(
            expected_contents, actual_contents, ['sha1'])
 
         # retrieve next part
@@ -3205,7 +3278,7 @@
         actual_contents2 = actual_results2['contents']
 
         assert len(actual_contents2) == 1
-        self.assert_contents_ok(
+        assert_contents_ok(
            [contents_map[get_sha1s[-1]]], actual_contents2, ['sha1'])
 
     def test_origin_get_range_from_zero(self, swh_storage, swh_origins):
diff --git a/swh/storage/utils.py b/swh/storage/utils.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/utils.py
@@ -0,0 +1,42 @@
+# Copyright (C) 2019  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Optional, Tuple
+
+
+def _is_power_of_two(n: int) -> bool:
+    return n > 0 and n & (n-1) == 0
+
+
+def get_partition_bounds_bytes(
+        i: int, n: int, nb_bytes: int) -> Tuple[bytes, Optional[bytes]]:
+    r"""Splits the range [0; 2^(nb_bytes*8)) into n same-length intervals,
+    and returns the boundaries of the i-th interval (both inclusive); or None
+    as the upper bound, if this is the last partition.
+
+    n must be a power of 2.
+
+    >>> get_partition_bounds_bytes(0, 16, 2) == (b'\x00\x00', b'\x10\x00')
+    True
+    >>> get_partition_bounds_bytes(1, 16, 2) == (b'\x10\x00', b'\x20\x00')
+    True
+    >>> get_partition_bounds_bytes(14, 16, 2) == (b'\xe0\x00', b'\xf0\x00')
+    True
+    >>> get_partition_bounds_bytes(15, 16, 2) == (b'\xf0\x00', None)
+    True
+    """
+    if not _is_power_of_two(n):
+        raise ValueError('number of partitions must be a power of two')
+    if not 0 <= i < n:
+        raise ValueError(
+            'partition index must be between 0 and the number of partitions.')
+
+    space_size = 1 << (nb_bytes*8)
+    partition_size = space_size//n
+
+    start = (partition_size*i).to_bytes(nb_bytes, 'big')
+    end = None if i == n-1 \
+        else (partition_size*(i+1)).to_bytes(nb_bytes, 'big')
+    return (start, end)
diff --git a/tox.ini b/tox.ini
--- a/tox.ini
+++ b/tox.ini
@@ -13,6 +13,7 @@
   slow: --hypothesis-profile=slow \
   --cov={envsitepackagesdir}/swh/storage \
   {envsitepackagesdir}/swh/storage \
+  --doctest-modules \
   --cov-branch {posargs}
 
 [testenv:flake8]
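
Usage sketch, not part of the patch: the pagination contract mirrors test_generate_content_get_partition_pagination above. A client iterates over the partition ids, and within each partition keeps passing `next_page_token` back as `page_token` until it comes back as None. The helper name and the nb_partitions/limit values below are illustrative only; `storage` stands for any object implementing this interface (for instance the in-memory backend).

def iter_all_contents(storage, nb_partitions=4, limit=100):
    """Yield every content returned by the storage, walking all partitions
    and following next_page_token within each one."""
    for partition_id in range(nb_partitions):
        page_token = None
        while True:
            result = storage.content_get_partition(
                partition_id, nb_partitions, limit=limit,
                page_token=page_token)
            yield from result['contents']
            page_token = result['next_page_token']
            if page_token is None:
                break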