Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7343141
D3713.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
12 KB
Subscribers
None
D3713.diff
View Options
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -177,9 +177,9 @@
self,
partition_id: int,
nb_partitions: int,
- limit: int = 1000,
page_token: Optional[str] = None,
- ) -> Dict[str, Any]:
+ limit: int = 1000,
+ ) -> PagedResult[Content]:
if limit is None:
raise StorageArgumentException("limit should not be None")
@@ -195,19 +195,24 @@
raise StorageArgumentException("Invalid page_token.")
range_start = int(page_token)
- # Get the first rows of the range
+ next_page_token: Optional[str] = None
+
rows = self._cql_runner.content_get_token_range(range_start, range_end, limit)
- rows = list(rows)
+ contents = []
+ last_id: Optional[int] = None
+ for row in rows:
+ if row.status == "absent":
+ continue
+ row_d = row._asdict()
+ last_id = row_d.pop("tok")
+ contents.append(Content(**row_d))
- if len(rows) == limit:
- next_page_token: Optional[str] = str(rows[-1].tok + 1)
- else:
- next_page_token = None
+ if len(contents) == limit:
+ assert last_id is not None
+ next_page_token = str(last_id + 1)
- return {
- "contents": [row._asdict() for row in rows if row.status != "absent"],
- "next_page_token": next_page_token,
- }
+ assert len(contents) <= limit
+ return PagedResult(results=contents, next_page_token=next_page_token,)
def content_get_metadata(self, contents: List[bytes]) -> Dict[bytes, List[Dict]]:
result: Dict[bytes, List[Dict]] = {sha1: [] for sha1 in contents}
diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py
--- a/swh/storage/in_memory.py
+++ b/swh/storage/in_memory.py
@@ -292,9 +292,9 @@
self,
partition_id: int,
nb_partitions: int,
- limit: int = 1000,
page_token: Optional[str] = None,
- ) -> Dict[str, Any]:
+ limit: int = 1000,
+ ) -> PagedResult[Content]:
if limit is None:
raise StorageArgumentException("limit should not be None")
(start, end) = get_partition_bounds_bytes(
@@ -304,14 +304,24 @@
start = hash_to_bytes(page_token)
if end is None:
end = b"\xff" * SHA1_SIZE
- result = self.content_get_range(start, end, limit)
- result2 = {
- "contents": result["contents"],
- "next_page_token": None,
- }
- if result["next"]:
- result2["next_page_token"] = hash_to_hex(result["next"])
- return result2
+
+ next_page_token: Optional[str] = None
+ sha1s = (
+ (sha1, content_key)
+ for sha1 in self._sorted_sha1s.iter_from(start)
+ for content_key in self._content_indexes["sha1"][sha1]
+ )
+ contents: List[Content] = []
+ for counter, (sha1, key) in enumerate(sha1s):
+ if sha1 > end:
+ break
+ if counter >= limit:
+ next_page_token = hash_to_hex(sha1)
+ break
+ contents.append(self._contents[key])
+
+ assert len(contents) <= limit
+ return PagedResult(results=contents, next_page_token=next_page_token)
def content_get_metadata(self, contents: List[bytes]) -> Dict[bytes, List[Dict]]:
result: Dict = {sha1: [] for sha1 in contents}
diff --git a/swh/storage/interface.py b/swh/storage/interface.py
--- a/swh/storage/interface.py
+++ b/swh/storage/interface.py
@@ -180,9 +180,9 @@
self,
partition_id: int,
nb_partitions: int,
- limit: int = 1000,
page_token: Optional[str] = None,
- ) -> Dict[str, Any]:
+ limit: int = 1000,
+ ) -> PagedResult[Content]:
"""Splits contents into nb_partitions, and returns one of these based on
partition_id (which must be in [0, nb_partitions-1])
@@ -190,17 +190,15 @@
result order.
Args:
- partition_id (int): index of the partition to fetch
- nb_partitions (int): total number of partitions to split into
- limit (int): Limit result (default to 1000)
- page_token (Optional[str]): opaque token used for pagination.
+ partition_id: index of the partition to fetch
+ nb_partitions: total number of partitions to split into
+ page_token: opaque token used for pagination.
+ limit: Limit result (default to 1000)
Returns:
- a dict with keys:
- - contents (List[dict]): iterable of contents in the partition.
- - **next_page_token** (Optional[str]): opaque token to be used as
- `page_token` for retrieving the next page. if absent, there is
- no more pages to gather.
+ PagedResult of Content model objects within the partition. If
+ next_page_token is None, there is no longer data to retrieve.
+
"""
...
diff --git a/swh/storage/storage.py b/swh/storage/storage.py
--- a/swh/storage/storage.py
+++ b/swh/storage/storage.py
@@ -278,35 +278,15 @@
@timed
@db_transaction()
- def content_get_range(
- self, start: bytes, end: bytes, limit: int = 1000, db=None, cur=None
- ) -> Dict[str, Any]:
- if limit is None:
- raise StorageArgumentException("limit should not be None")
- contents = []
- next_content = None
- for counter, content_row in enumerate(
- db.content_get_range(start, end, limit + 1, cur)
- ):
- content = dict(zip(db.content_get_metadata_keys, content_row))
- if counter >= limit:
- # take the last commit for the next page starting from this
- next_content = content["sha1"]
- break
- contents.append(content)
- return {
- "contents": contents,
- "next": next_content,
- }
-
- @timed
def content_get_partition(
self,
partition_id: int,
nb_partitions: int,
- limit: int = 1000,
page_token: Optional[str] = None,
- ) -> Dict[str, Any]:
+ limit: int = 1000,
+ db=None,
+ cur=None,
+ ) -> PagedResult[Content]:
if limit is None:
raise StorageArgumentException("limit should not be None")
(start, end) = get_partition_bounds_bytes(
@@ -316,14 +296,20 @@
start = hash_to_bytes(page_token)
if end is None:
end = b"\xff" * SHA1_SIZE
- result = self.content_get_range(start, end, limit)
- result2 = {
- "contents": result["contents"],
- "next_page_token": None,
- }
- if result["next"]:
- result2["next_page_token"] = hash_to_hex(result["next"])
- return result2
+
+ next_page_token: Optional[str] = None
+ contents = []
+ for counter, row in enumerate(db.content_get_range(start, end, limit + 1, cur)):
+ row_d = dict(zip(db.content_get_metadata_keys, row))
+ content = Content(**row_d)
+ if counter >= limit:
+ # take the last content for the next page starting from this
+ next_page_token = hash_to_hex(content.sha1)
+ break
+ contents.append(content)
+
+ assert len(contents) <= limit
+ return PagedResult(results=contents, next_page_token=next_page_token)
@timed
@db_transaction(statement_timeout=500)
diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py
--- a/swh/storage/tests/test_storage.py
+++ b/swh/storage/tests/test_storage.py
@@ -544,30 +544,38 @@
def test_content_get_partition(self, swh_storage, swh_contents):
"""content_get_partition paginates results if limit exceeded"""
- expected_contents = [c.to_dict() for c in swh_contents if c.status != "absent"]
+ expected_contents = [
+ attr.evolve(c, data=None) for c in swh_contents if c.status != "absent"
+ ]
actual_contents = []
for i in range(16):
actual_result = swh_storage.content_get_partition(i, 16)
- assert actual_result["next_page_token"] is None
- actual_contents.extend(actual_result["contents"])
+ assert actual_result.next_page_token is None
+ actual_contents.extend(actual_result.results)
- assert_contents_ok(expected_contents, actual_contents, ["sha1"])
+ assert len(actual_contents) == len(expected_contents)
+ for content in actual_contents:
+ assert content in expected_contents
def test_content_get_partition_full(self, swh_storage, swh_contents):
- """content_get_partition for a single partition returns all available
- contents"""
- expected_contents = [c.to_dict() for c in swh_contents if c.status != "absent"]
+ """content_get_partition for a single partition returns all available contents
+
+ """
+ expected_contents = [
+ attr.evolve(c, data=None) for c in swh_contents if c.status != "absent"
+ ]
actual_result = swh_storage.content_get_partition(0, 1)
- assert actual_result["next_page_token"] is None
+ assert actual_result.next_page_token is None
- actual_contents = actual_result["contents"]
- assert_contents_ok(expected_contents, actual_contents, ["sha1"])
+ actual_contents = actual_result.results
+ assert len(actual_contents) == len(expected_contents)
+ for content in actual_contents:
+ assert content in expected_contents
def test_content_get_partition_empty(self, swh_storage, swh_contents):
- """content_get_partition when at least one of the partitions is
- empty"""
+ """content_get_partition when at least one of the partitions is empty"""
expected_contents = {
cont.sha1 for cont in swh_contents if cont.status != "absent"
}
@@ -582,24 +590,24 @@
i, nb_partitions, limit=len(swh_contents) + 1
)
- for cont in actual_result["contents"]:
- seen_sha1s.append(cont["sha1"])
+ for content in actual_result.results:
+ seen_sha1s.append(content.sha1)
# Limit is higher than the max number of results
- assert actual_result["next_page_token"] is None
+ assert actual_result.next_page_token is None
assert set(seen_sha1s) == expected_contents
def test_content_get_partition_limit_none(self, swh_storage):
"""content_get_partition call with wrong limit input should fail"""
- with pytest.raises(StorageArgumentException) as e:
+ with pytest.raises(StorageArgumentException, match="limit should not be None"):
swh_storage.content_get_partition(1, 16, limit=None)
- assert e.value.args == ("limit should not be None",)
-
- def test_generate_content_get_partition_pagination(self, swh_storage, swh_contents):
+ def test_content_get_partition_pagination_generate(self, swh_storage, swh_contents):
"""content_get_partition returns contents within range provided"""
- expected_contents = [c.to_dict() for c in swh_contents if c.status != "absent"]
+ expected_contents = [
+ attr.evolve(c, data=None) for c in swh_contents if c.status != "absent"
+ ]
# retrieve contents
actual_contents = []
@@ -609,13 +617,15 @@
actual_result = swh_storage.content_get_partition(
i, 4, limit=3, page_token=page_token
)
- actual_contents.extend(actual_result["contents"])
- page_token = actual_result["next_page_token"]
+ actual_contents.extend(actual_result.results)
+ page_token = actual_result.next_page_token
if page_token is None:
break
- assert_contents_ok(expected_contents, actual_contents, ["sha1"])
+ assert len(actual_contents) == len(expected_contents)
+ for content in actual_contents:
+ assert content in expected_contents
def test_content_get_metadata(self, swh_storage, sample_data):
cont1, cont2 = sample_data.contents[:2]
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Mar 17 2025, 7:38 PM (7 w, 3 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3223279
Attached To
D3713: storage*: content_get_partition(...) -> PagedResult[Content]
Event Timeline
Log In to Comment