diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py --- a/swh/storage/cassandra/storage.py +++ b/swh/storage/cassandra/storage.py @@ -177,9 +177,9 @@ self, partition_id: int, nb_partitions: int, - limit: int = 1000, page_token: Optional[str] = None, - ) -> Dict[str, Any]: + limit: int = 1000, + ) -> PagedResult[Content]: if limit is None: raise StorageArgumentException("limit should not be None") @@ -195,19 +195,23 @@ raise StorageArgumentException("Invalid page_token.") range_start = int(page_token) - # Get the first rows of the range + next_page_token: Optional[str] = None + rows = self._cql_runner.content_get_token_range(range_start, range_end, limit) - rows = list(rows) + contents = [] + last_id: Optional[int] = None + for row in rows: + if row.status == "absent": + continue + row_d = row._asdict() + last_id = row_d.pop("tok") + contents.append(Content(**row_d)) - if len(rows) == limit: - next_page_token: Optional[str] = str(rows[-1].tok + 1) - else: - next_page_token = None + if len(contents) == limit: + assert last_id is not None + next_page_token = str(last_id + 1) - return { - "contents": [row._asdict() for row in rows if row.status != "absent"], - "next_page_token": next_page_token, - } + return PagedResult(results=contents, next_page_token=next_page_token,) def content_get_metadata(self, contents: List[bytes]) -> Dict[bytes, List[Dict]]: result: Dict[bytes, List[Dict]] = {sha1: [] for sha1 in contents} diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py --- a/swh/storage/in_memory.py +++ b/swh/storage/in_memory.py @@ -292,9 +292,9 @@ self, partition_id: int, nb_partitions: int, - limit: int = 1000, page_token: Optional[str] = None, - ) -> Dict[str, Any]: + limit: int = 1000, + ) -> PagedResult[Content]: if limit is None: raise StorageArgumentException("limit should not be None") (start, end) = get_partition_bounds_bytes( @@ -304,14 +304,24 @@ start = hash_to_bytes(page_token) if end is None: end = b"\xff" * 
SHA1_SIZE - result = self.content_get_range(start, end, limit) - result2 = { - "contents": result["contents"], - "next_page_token": None, - } - if result["next"]: - result2["next_page_token"] = hash_to_hex(result["next"]) - return result2 + + next_page_token: Optional[str] = None + sha1s = ( + (sha1, content_key) + for sha1 in self._sorted_sha1s.iter_from(start) + for content_key in self._content_indexes["sha1"][sha1] + ) + contents: List[Content] = [] + for counter, (sha1, key) in enumerate(sha1s): + if sha1 > end: + break + if counter >= limit: + next_page_token = hash_to_hex(sha1) + break + contents.append(self._contents[key]) + + assert len(contents) <= limit + return PagedResult(results=contents, next_page_token=next_page_token) def content_get_metadata(self, contents: List[bytes]) -> Dict[bytes, List[Dict]]: result: Dict = {sha1: [] for sha1 in contents} diff --git a/swh/storage/interface.py b/swh/storage/interface.py --- a/swh/storage/interface.py +++ b/swh/storage/interface.py @@ -180,9 +180,9 @@ self, partition_id: int, nb_partitions: int, - limit: int = 1000, page_token: Optional[str] = None, - ) -> Dict[str, Any]: + limit: int = 1000, + ) -> PagedResult[Content]: """Splits contents into nb_partitions, and returns one of these based on partition_id (which must be in [0, nb_partitions-1]) @@ -190,17 +190,14 @@ result order. Args: - partition_id (int): index of the partition to fetch - nb_partitions (int): total number of partitions to split into - limit (int): Limit result (default to 1000) - page_token (Optional[str]): opaque token used for pagination. + partition_id: index of the partition to fetch + nb_partitions: total number of partitions to split into + page_token: opaque token used for pagination. + limit: maximum number of results to return (defaults to 1000) + + Returns: PagedResult of Content model objects within the partition. If + next_page_token is None, there is no more data to retrieve. 
- Returns: - a dict with keys: - - contents (List[dict]): iterable of contents in the partition. - - **next_page_token** (Optional[str]): opaque token to be used as - `page_token` for retrieving the next page. if absent, there is - no more pages to gather. """ ... diff --git a/swh/storage/storage.py b/swh/storage/storage.py --- a/swh/storage/storage.py +++ b/swh/storage/storage.py @@ -278,35 +278,15 @@ @timed @db_transaction() - def content_get_range( - self, start: bytes, end: bytes, limit: int = 1000, db=None, cur=None - ) -> Dict[str, Any]: - if limit is None: - raise StorageArgumentException("limit should not be None") - contents = [] - next_content = None - for counter, content_row in enumerate( - db.content_get_range(start, end, limit + 1, cur) - ): - content = dict(zip(db.content_get_metadata_keys, content_row)) - if counter >= limit: - # take the last commit for the next page starting from this - next_content = content["sha1"] - break - contents.append(content) - return { - "contents": contents, - "next": next_content, - } - - @timed def content_get_partition( self, partition_id: int, nb_partitions: int, - limit: int = 1000, page_token: Optional[str] = None, - ) -> Dict[str, Any]: + limit: int = 1000, + db=None, + cur=None, + ) -> PagedResult[Content]: if limit is None: raise StorageArgumentException("limit should not be None") (start, end) = get_partition_bounds_bytes( @@ -316,14 +296,20 @@ start = hash_to_bytes(page_token) if end is None: end = b"\xff" * SHA1_SIZE - result = self.content_get_range(start, end, limit) - result2 = { - "contents": result["contents"], - "next_page_token": None, - } - if result["next"]: - result2["next_page_token"] = hash_to_hex(result["next"]) - return result2 + + next_page_token: Optional[str] = None + contents = [] + for counter, row in enumerate(db.content_get_range(start, end, limit + 1, cur)): + row_d = dict(zip(db.content_get_metadata_keys, row)) + content = Content(**row_d) + if counter >= limit: + # take the last 
content for the next page starting from this + next_page_token = hash_to_hex(content.sha1) + break + contents.append(content) + + assert len(contents) <= limit + return PagedResult(results=contents, next_page_token=next_page_token) @timed @db_transaction(statement_timeout=500) diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py --- a/swh/storage/tests/test_storage.py +++ b/swh/storage/tests/test_storage.py @@ -544,30 +544,37 @@ def test_content_get_partition(self, swh_storage, swh_contents): """content_get_partition paginates results if limit exceeded""" - expected_contents = [c.to_dict() for c in swh_contents if c.status != "absent"] + expected_contents = [ + attr.evolve(c, data=None) for c in swh_contents if c.status != "absent" + ] actual_contents = [] for i in range(16): actual_result = swh_storage.content_get_partition(i, 16) - assert actual_result["next_page_token"] is None - actual_contents.extend(actual_result["contents"]) + assert actual_result.next_page_token is None + actual_contents.extend(actual_result.results) - assert_contents_ok(expected_contents, actual_contents, ["sha1"]) + for content in actual_contents: + assert content in expected_contents def test_content_get_partition_full(self, swh_storage, swh_contents): - """content_get_partition for a single partition returns all available - contents""" - expected_contents = [c.to_dict() for c in swh_contents if c.status != "absent"] + """content_get_partition for a single partition returns all available contents + + """ + expected_contents = [ + attr.evolve(c, data=None) for c in swh_contents if c.status != "absent" + ] actual_result = swh_storage.content_get_partition(0, 1) - assert actual_result["next_page_token"] is None + assert actual_result.next_page_token is None - actual_contents = actual_result["contents"] - assert_contents_ok(expected_contents, actual_contents, ["sha1"]) + actual_contents = actual_result.results + assert len(actual_contents) == 
len(expected_contents) + for content in actual_contents: + assert content in expected_contents def test_content_get_partition_empty(self, swh_storage, swh_contents): - """content_get_partition when at least one of the partitions is - empty""" + """content_get_partition when at least one of the partitions is empty""" expected_contents = { cont.sha1 for cont in swh_contents if cont.status != "absent" } @@ -582,24 +589,24 @@ i, nb_partitions, limit=len(swh_contents) + 1 ) - for cont in actual_result["contents"]: - seen_sha1s.append(cont["sha1"]) + for content in actual_result.results: + seen_sha1s.append(content.sha1) # Limit is higher than the max number of results - assert actual_result["next_page_token"] is None + assert actual_result.next_page_token is None assert set(seen_sha1s) == expected_contents def test_content_get_partition_limit_none(self, swh_storage): """content_get_partition call with wrong limit input should fail""" - with pytest.raises(StorageArgumentException) as e: + with pytest.raises(StorageArgumentException, match="limit should not be None"): swh_storage.content_get_partition(1, 16, limit=None) - assert e.value.args == ("limit should not be None",) - def test_generate_content_get_partition_pagination(self, swh_storage, swh_contents): """content_get_partition returns contents within range provided""" - expected_contents = [c.to_dict() for c in swh_contents if c.status != "absent"] + expected_contents = [ + attr.evolve(c, data=None) for c in swh_contents if c.status != "absent" + ] # retrieve contents actual_contents = [] @@ -609,13 +616,15 @@ actual_result = swh_storage.content_get_partition( i, 4, limit=3, page_token=page_token ) - actual_contents.extend(actual_result["contents"]) - page_token = actual_result["next_page_token"] + actual_contents.extend(actual_result.results) + page_token = actual_result.next_page_token if page_token is None: break - assert_contents_ok(expected_contents, actual_contents, ["sha1"]) + assert len(actual_contents) == 
len(expected_contents) + for content in actual_contents: + assert content in expected_contents def test_content_get_metadata(self, swh_storage, sample_data): cont1, cont2 = sample_data.contents[:2]