D3240: Add pagination to origin_metadata_get.
D3240.id11487.diff (23 KB)
diff --git a/docs/extrinsic-metadata-specification.rst b/docs/extrinsic-metadata-specification.rst
--- a/docs/extrinsic-metadata-specification.rst
+++ b/docs/extrinsic-metadata-specification.rst
@@ -158,12 +158,17 @@
origin_metadata_get(origin_url,
authority,
- after, limit)
+ page_token, limit)
- which returns a list of dictionaries, one for each metadata item
- deposited, corresponding to the given origin and obtained from the
- specified authority.
- `authority` must be a dict containing keys `type` and `url`.
+ where `authority` must be a dict containing keys `type` and `url`;
+ it returns a dictionary with keys:
+
+ * `next_page_token`, which is an opaque token to be used as
+ `page_token` for retrieving the next page. If absent, there are
+ no more pages to gather.
+ * `results`: list of dictionaries, one for each metadata item
+ deposited, corresponding to the given origin and obtained from the
+ specified authority.
Each of these dictionaries is in the following format::
@@ -175,8 +180,10 @@
'metadata': b'...'
}
-The parameters ``after`` and ``limit`` are used for pagination based on the
-order defined by the ``discovery_date``.
+The parameters ``page_token`` and ``limit`` are used for pagination based on
+an arbitrary order. An initial query to ``origin_metadata_get`` must set
+``page_token`` to ``None``, and further queries must use the value from the
+previous query's ``next_page_token`` to get the next page of results.
``metadata`` is a bytes array (possibly encoded using Base64).
Its format is specific to each authority; and is treated as an opaque value
@@ -215,7 +222,8 @@
<X>_metadata_get(id,
authority,
- after, limit)
+ after,
+ page_token, limit)
defined similarly to ``origin_metadata_add`` and ``origin_metadata_get``,
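
For illustration, a minimal sketch of a client-side loop consuming this paginated API, assuming only the interface documented above (`storage`, `origin_url` and `authority` are placeholders):

    def iter_all_origin_metadata(storage, origin_url, authority):
        # Start with page_token=None; follow next_page_token until it
        # comes back as None, which signals the last page.
        page_token = None
        while True:
            page = storage.origin_metadata_get(
                origin_url, authority, page_token=page_token
            )
            yield from page["results"]
            page_token = page["next_page_token"]
            if page_token is None:
                break
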
diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py
--- a/swh/storage/cassandra/cql.py
+++ b/swh/storage/cassandra/cql.py
@@ -868,16 +868,44 @@
@_prepared_statement(
"SELECT * from origin_metadata "
- "WHERE origin=? AND authority_url=? AND discovery_date>=? "
+ "WHERE origin=? AND authority_url=? AND discovery_date>? "
"AND authority_type=?"
)
- def origin_metadata_get_after(
+ def origin_metadata_get_after_date(
self, origin, authority_type, authority_url, after, *, statement
):
return self._execute_with_retries(
statement, [origin, authority_url, after, authority_type]
)
+ @_prepared_statement(
+ "SELECT * from origin_metadata "
+ "WHERE origin=? AND authority_type=? AND authority_url=? "
+ "AND (discovery_date, fetcher_name, fetcher_version) > (?, ?, ?)"
+ )
+ def origin_metadata_get_after_date_and_fetcher(
+ self,
+ origin,
+ authority_type,
+ authority_url,
+ after_date,
+ after_fetcher_name,
+ after_fetcher_version,
+ *,
+ statement,
+ ):
+ return self._execute_with_retries(
+ statement,
+ [
+ origin,
+ authority_type,
+ authority_url,
+ after_date,
+ after_fetcher_name,
+ after_fetcher_version,
+ ],
+ )
+
@_prepared_statement(
"SELECT * from origin_metadata "
"WHERE origin=? AND authority_url=? AND authority_type=?"
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -13,6 +13,7 @@
import attr
import dateutil
+from swh.core.api.serializers import msgpack_loads, msgpack_dumps
from swh.model.model import (
Revision,
Release,
@@ -1090,22 +1091,39 @@
origin_url: str,
authority: Dict[str, str],
after: Optional[datetime.datetime] = None,
- limit: Optional[int] = None,
- ) -> List[Dict[str, Any]]:
+ page_token: Optional[bytes] = None,
+ limit: int = 1000,
+ ) -> Dict[str, Any]:
if not isinstance(origin_url, str):
raise TypeError("origin_url must be str, not %r" % (origin_url,))
- if after is None:
- entries = self._cql_runner.origin_metadata_get(
- origin_url, authority["type"], authority["url"]
+ if page_token is not None:
+ (after_date, after_fetcher_name, after_fetcher_url) = msgpack_loads(
+ page_token
)
- else:
- entries = self._cql_runner.origin_metadata_get_after(
+ if after and after_date < after:
+ raise StorageArgumentException(
+ "page_token is inconsistent with the value of 'after'."
+ )
+ entries = self._cql_runner.origin_metadata_get_after_date_and_fetcher(
+ origin_url,
+ authority["type"],
+ authority["url"],
+ after_date,
+ after_fetcher_name,
+ after_fetcher_url,
+ )
+ elif after is not None:
+ entries = self._cql_runner.origin_metadata_get_after_date(
origin_url, authority["type"], authority["url"], after
)
+ else:
+ entries = self._cql_runner.origin_metadata_get(
+ origin_url, authority["type"], authority["url"]
+ )
if limit:
- entries = itertools.islice(entries, 0, limit)
+ entries = itertools.islice(entries, 0, limit + 1)
results = []
for entry in entries:
@@ -1126,7 +1144,25 @@
"metadata": entry.metadata,
}
)
- return results
+
+ if len(results) > limit:
+ results.pop()
+ assert len(results) == limit
+ last_result = results[-1]
+ next_page_token: Optional[bytes] = msgpack_dumps(
+ (
+ last_result["discovery_date"],
+ last_result["fetcher"]["name"],
+ last_result["fetcher"]["version"],
+ )
+ )
+ else:
+ next_page_token = None
+
+ return {
+ "next_page_token": next_page_token,
+ "results": results,
+ }
def metadata_fetcher_add(
self, name: str, version: str, metadata: Dict[str, Any]
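
The `limit + 1` slice above is the usual over-fetch trick: ask for one row more than the page size, and use the presence of that extra row to decide whether a `next_page_token` is needed. A backend-independent sketch of the pattern (names are illustrative, not part of the diff):

    def paginate(ordered_items, limit, key):
        page = list(ordered_items[: limit + 1])  # fetch one extra item
        if len(page) > limit:
            page.pop()  # the extra item belongs to the next page
            next_cursor = key(page[-1])  # resume strictly after the last item
        else:
            next_cursor = None  # no further pages
        return page, next_cursor

    page, cursor = paginate([1, 2, 3], limit=2, key=lambda x: x)
    assert page == [1, 2] and cursor == 2
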
diff --git a/swh/storage/db.py b/swh/storage/db.py
--- a/swh/storage/db.py
+++ b/swh/storage/db.py
@@ -1076,6 +1076,7 @@
"discovery_date",
"metadata_authority.type",
"metadata_authority.url",
+ "metadata_fetcher.id",
"metadata_fetcher.name",
"metadata_fetcher.version",
"format",
@@ -1120,7 +1121,8 @@
self,
origin_url: str,
authority: int,
- after: Optional[datetime.datetime],
+ after_time: Optional[datetime.datetime],
+ after_fetcher: Optional[int],
limit: Optional[int],
cur=None,
):
@@ -1134,15 +1136,19 @@
f" ON (metadata_authority.id=authority_id) "
f"INNER JOIN metadata_fetcher ON (metadata_fetcher.id=fetcher_id) "
f"INNER JOIN origin ON (origin.id=origin_metadata.origin_id) "
- f"WHERE origin.url=%s AND authority_id=%s"
+ f"WHERE origin.url=%s AND authority_id=%s "
]
args = [origin_url, authority]
- if after:
- query_parts.append("AND discovery_date >= %s")
- args.append(after)
+ if after_fetcher is not None:
+ assert after_time
+ query_parts.append("AND (discovery_date, fetcher_id) > (%s, %s)")
+ args.extend([after_time, after_fetcher])
+ elif after_time is not None:
+ query_parts.append("AND discovery_date > %s")
+ args.append(after_time)
- query_parts.append("ORDER BY discovery_date")
+ query_parts.append("ORDER BY discovery_date, fetcher_id")
if limit:
query_parts.append("LIMIT %s")
diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py
--- a/swh/storage/in_memory.py
+++ b/swh/storage/in_memory.py
@@ -31,6 +31,7 @@
import attr
+from swh.core.api.serializers import msgpack_loads, msgpack_dumps
from swh.model.model import (
BaseContent,
Content,
@@ -62,6 +63,8 @@
SortedListItem = TypeVar("SortedListItem")
SortedListKey = TypeVar("SortedListKey")
+FetcherKey = Tuple[str, str]
+
class SortedList(collections.UserList, Generic[SortedListKey, SortedListItem]):
data: List[Tuple[SortedListKey, SortedListItem]]
@@ -92,7 +95,7 @@
for (k, item) in self.data:
yield item
- def iter_from(self, start_key: SortedListKey) -> Iterator[SortedListItem]:
+ def iter_from(self, start_key: Any) -> Iterator[SortedListItem]:
"""Returns an iterator over all the elements whose key is greater
or equal to `start_key`.
(This is an efficient equivalent to:
@@ -102,7 +105,7 @@
for (k, item) in itertools.islice(self.data, from_index, None):
yield item
- def iter_after(self, start_key: SortedListKey) -> Iterator[SortedListItem]:
+ def iter_after(self, start_key: Any) -> Iterator[SortedListItem]:
"""Same as iter_from, but using a strict inequality."""
it = self.iter_from(start_key)
for item in it:
@@ -137,12 +140,18 @@
# {origin_url: {authority: [metadata]}}
self._origin_metadata: Dict[
- str, Dict[Hashable, SortedList[datetime.datetime, Dict[str, Any]]]
+ str,
+ Dict[
+ Hashable,
+ SortedList[Tuple[datetime.datetime, FetcherKey], Dict[str, Any]],
+ ],
] = defaultdict(
- lambda: defaultdict(lambda: SortedList(key=lambda x: x["discovery_date"]))
+ lambda: defaultdict(
+ lambda: SortedList(key=lambda x: (x["discovery_date"], x["fetcher"]))
+ )
) # noqa
- self._metadata_fetchers: Dict[Hashable, Dict[str, Any]] = {}
+ self._metadata_fetchers: Dict[FetcherKey, Dict[str, Any]] = {}
self._metadata_authorities: Dict[Hashable, Dict[str, Any]] = {}
self._objects = defaultdict(list)
self._sorted_sha1s = SortedList[bytes, bytes]()
@@ -1109,24 +1118,41 @@
origin_url: str,
authority: Dict[str, str],
after: Optional[datetime.datetime] = None,
- limit: Optional[int] = None,
- ) -> List[Dict[str, Any]]:
+ page_token: Optional[bytes] = None,
+ limit: int = 1000,
+ ) -> Dict[str, Any]:
if not isinstance(origin_url, str):
raise TypeError("origin_url must be str, not %r" % (origin_url,))
authority_key = self._metadata_authority_key(authority)
- if after is None:
- entries = iter(self._origin_metadata[origin_url][authority_key])
+ if page_token is not None:
+ (after_time, after_fetcher) = msgpack_loads(page_token)
+ after_fetcher = tuple(after_fetcher)
+ if after is not None and after > after_time:
+ raise StorageArgumentException(
+ "page_token is inconsistent with the value of 'after'."
+ )
+ entries = self._origin_metadata[origin_url][authority_key].iter_after(
+ (after_time, after_fetcher)
+ )
+ elif after is not None:
+ entries = self._origin_metadata[origin_url][authority_key].iter_from(
+ (after,)
+ )
+ entries = (entry for entry in entries if entry["discovery_date"] > after)
else:
- entries = self._origin_metadata[origin_url][authority_key].iter_from(after)
+ entries = iter(self._origin_metadata[origin_url][authority_key])
+
if limit:
- entries = itertools.islice(entries, 0, limit)
+ entries = itertools.islice(entries, 0, limit + 1)
results = []
for entry in entries:
authority = self._metadata_authorities[entry["authority"]]
fetcher = self._metadata_fetchers[entry["fetcher"]]
+ if after:
+ assert entry["discovery_date"] > after
results.append(
{
**entry,
@@ -1137,7 +1163,24 @@
},
}
)
- return results
+
+ if len(results) > limit:
+ results.pop()
+ assert len(results) == limit
+ last_result = results[-1]
+ next_page_token: Optional[bytes] = msgpack_dumps(
+ (
+ last_result["discovery_date"],
+ self._metadata_fetcher_key(last_result["fetcher"]),
+ )
+ )
+ else:
+ next_page_token = None
+
+ return {
+ "next_page_token": next_page_token,
+ "results": results,
+ }
def metadata_fetcher_add(
self, name: str, version: str, metadata: Dict[str, Any]
@@ -1197,7 +1240,7 @@
return tuple((key, content.get(key)) for key in sorted(DEFAULT_ALGORITHMS))
@staticmethod
- def _metadata_fetcher_key(fetcher: Dict) -> Hashable:
+ def _metadata_fetcher_key(fetcher: Dict) -> FetcherKey:
return (fetcher["name"], fetcher["version"])
@staticmethod
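
In the in-memory backend, entries are now keyed by `(discovery_date, fetcher_key)` tuples, and `iter_from`/`iter_after` are loosened to accept `Any` because the `after`-only path bisects with the one-element tuple `(after,)`: in Python, a tuple sorts before any longer tuple that shares its prefix, so `(after,)` is a lower bound for every `(after, fetcher_key)` key. A quick illustration:

    import datetime

    d = datetime.datetime(2020, 6, 1)
    fetcher_key = ("swh-deposit", "1.0")

    # A shorter tuple compares less than any longer tuple sharing its
    # prefix, so (d,) sits just before every (d, <fetcher>) key.
    assert (d,) < (d, fetcher_key)
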
diff --git a/swh/storage/interface.py b/swh/storage/interface.py
--- a/swh/storage/interface.py
+++ b/swh/storage/interface.py
@@ -1165,18 +1165,23 @@
origin_url: str,
authority: Dict[str, str],
after: Optional[datetime.datetime] = None,
- limit: Optional[int] = None,
- ) -> List[Dict[str, Any]]:
+ page_token: Optional[bytes] = None,
+ limit: int = 1000,
+ ) -> Dict[str, Any]:
"""Retrieve list of all origin_metadata entries for the origin_id
Args:
origin_url: the origin's URL
authority: a dict containing keys `type` and `url`.
after: minimum discovery_date for a result to be returned
+ page_token: opaque token, used to get the next page of results
limit: maximum number of results to be returned
Returns:
- list of dicts in the format:
+ dict with keys `next_page_token` and `results`.
+ `next_page_token` is an opaque token that is used to get the
+ next page of results, or `None` if there are no more results.
+ `results` is a list of dicts in the format:
.. code-block:: python
diff --git a/swh/storage/storage.py b/swh/storage/storage.py
--- a/swh/storage/storage.py
+++ b/swh/storage/storage.py
@@ -17,6 +17,7 @@
import psycopg2.pool
import psycopg2.errors
+from swh.core.api.serializers import msgpack_loads, msgpack_dumps
from swh.model.model import (
Content,
Directory,
@@ -1278,18 +1279,38 @@
origin_url: str,
authority: Dict[str, str],
after: Optional[datetime.datetime] = None,
- limit: Optional[int] = None,
+ page_token: Optional[bytes] = None,
+ limit: int = 1000,
db=None,
cur=None,
- ) -> List[Dict[str, Any]]:
+ ) -> Dict[str, Any]:
+ if page_token:
+ (after_time, after_fetcher) = msgpack_loads(page_token)
+ if after and after_time < after:
+ raise StorageArgumentException(
+ "page_token is inconsistent with the value of 'after'."
+ )
+ else:
+ after_time = after
+ after_fetcher = None
+
authority_id = db.metadata_authority_get_id(
authority["type"], authority["url"], cur
)
if not authority_id:
- return []
+ return {
+ "next_page_token": None,
+ "results": [],
+ }
+
+ rows = db.origin_metadata_get(
+ origin_url, authority_id, after_time, after_fetcher, limit + 1, cur
+ )
+ rows = [dict(zip(db.origin_metadata_get_cols, row)) for row in rows]
results = []
- for line in db.origin_metadata_get(origin_url, authority_id, after, limit, cur):
- row = dict(zip(db.origin_metadata_get_cols, line))
+ for row in rows:
+ row = row.copy()
+ row.pop("metadata_fetcher.id")
results.append(
{
"origin_url": row.pop("origin.url"),
@@ -1304,7 +1325,24 @@
**row,
}
)
- return results
+
+ if len(results) > limit:
+ results.pop()
+ assert len(results) == limit
+ last_returned_row = rows[-2] # rows[-1] corresponds to the popped result
+ next_page_token: Optional[bytes] = msgpack_dumps(
+ (
+ last_returned_row["discovery_date"],
+ last_returned_row["metadata_fetcher.id"],
+ )
+ )
+ else:
+ next_page_token = None
+
+ return {
+ "next_page_token": next_page_token,
+ "results": results,
+ }
@timed
@db_transaction()
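
The page token is simply the msgpack encoding of the cursor, so no server-side state is needed; note that the cursor for the next page is read from `rows[-2]` rather than from `results[-1]`, because `metadata_fetcher.id` is popped from each result dict before it is returned, while the raw rows still carry it. A round-trip sketch with made-up cursor values (this assumes, as the diff itself does, that swh.core's msgpack serializers round-trip datetimes):

    import datetime

    from swh.core.api.serializers import msgpack_dumps, msgpack_loads

    token = msgpack_dumps((datetime.datetime(2020, 6, 1), 7))
    (after_time, after_fetcher) = msgpack_loads(token)
    assert after_fetcher == 7  # the cursor survives the round trip
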
diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py
--- a/swh/storage/tests/test_storage.py
+++ b/swh/storage/tests/test_storage.py
@@ -3216,13 +3216,10 @@
swh_storage.origin_metadata_add(**data.origin_metadata)
swh_storage.origin_metadata_add(**data.origin_metadata2)
- swh_storage.origin_metadata_get(origin["url"], authority)
-
+ result = swh_storage.origin_metadata_get(origin["url"], authority)
+ assert result["next_page_token"] is None
assert [data.origin_metadata, data.origin_metadata2] == list(
- sorted(
- swh_storage.origin_metadata_get(origin["url"], authority),
- key=lambda x: x["discovery_date"],
- )
+ sorted(result["results"], key=lambda x: x["discovery_date"],)
)
def test_origin_metadata_add_duplicate(self, swh_storage):
@@ -3245,13 +3242,10 @@
swh_storage.origin_metadata_add(**data.origin_metadata2)
swh_storage.origin_metadata_add(**new_origin_metadata2)
- swh_storage.origin_metadata_get(origin["url"], authority)
-
+ result = swh_storage.origin_metadata_get(origin["url"], authority)
+ assert result["next_page_token"] is None
assert [data.origin_metadata, new_origin_metadata2] == list(
- sorted(
- swh_storage.origin_metadata_get(origin["url"], authority),
- key=lambda x: x["discovery_date"],
- )
+ sorted(result["results"], key=lambda x: x["discovery_date"],)
)
def test_origin_metadata_add_dict(self, swh_storage):
@@ -3294,23 +3288,109 @@
swh_storage.origin_metadata_add(**origin1_metadata3)
swh_storage.origin_metadata_add(**origin2_metadata)
+ result = swh_storage.origin_metadata_get(origin_url1, authority)
+ assert result["next_page_token"] is None
assert [origin1_metadata1, origin1_metadata2] == list(
- sorted(
- swh_storage.origin_metadata_get(origin_url1, authority),
- key=lambda x: x["discovery_date"],
- )
+ sorted(result["results"], key=lambda x: x["discovery_date"],)
)
+ result = swh_storage.origin_metadata_get(origin_url1, authority2)
+ assert result["next_page_token"] is None
assert [origin1_metadata3] == list(
- sorted(
- swh_storage.origin_metadata_get(origin_url1, authority2),
- key=lambda x: x["discovery_date"],
- )
+ sorted(result["results"], key=lambda x: x["discovery_date"],)
+ )
+
+ result = swh_storage.origin_metadata_get(origin_url2, authority)
+ assert result["next_page_token"] is None
+ assert [origin2_metadata] == list(result["results"],)
+
+ def test_origin_metadata_get_after(self, swh_storage):
+ origin = data.origin
+ fetcher = data.metadata_fetcher
+ authority = data.metadata_authority
+ swh_storage.origin_add([origin])[0]
+
+ swh_storage.metadata_fetcher_add(**fetcher)
+ swh_storage.metadata_authority_add(**authority)
+
+ swh_storage.origin_metadata_add(**data.origin_metadata)
+ swh_storage.origin_metadata_add(**data.origin_metadata2)
+
+ result = swh_storage.origin_metadata_get(
+ origin["url"],
+ authority,
+ after=data.origin_metadata["discovery_date"] - timedelta(seconds=1),
+ )
+ assert result["next_page_token"] is None
+ assert [data.origin_metadata, data.origin_metadata2] == list(
+ sorted(result["results"], key=lambda x: x["discovery_date"],)
+ )
+
+ result = swh_storage.origin_metadata_get(
+ origin["url"], authority, after=data.origin_metadata["discovery_date"]
+ )
+ assert result["next_page_token"] is None
+ assert [data.origin_metadata2] == result["results"]
+
+ result = swh_storage.origin_metadata_get(
+ origin["url"], authority, after=data.origin_metadata2["discovery_date"]
+ )
+ assert result["next_page_token"] is None
+ assert [] == result["results"]
+
+ def test_origin_metadata_get_paginate(self, swh_storage):
+ origin = data.origin
+ fetcher = data.metadata_fetcher
+ authority = data.metadata_authority
+ swh_storage.origin_add([origin])[0]
+
+ swh_storage.metadata_fetcher_add(**fetcher)
+ swh_storage.metadata_authority_add(**authority)
+
+ swh_storage.origin_metadata_add(**data.origin_metadata)
+ swh_storage.origin_metadata_add(**data.origin_metadata2)
+
+ swh_storage.origin_metadata_get(origin["url"], authority)
+
+ result = swh_storage.origin_metadata_get(origin["url"], authority, limit=1)
+ assert result["next_page_token"] is not None
+ assert [data.origin_metadata] == result["results"]
+
+ result = swh_storage.origin_metadata_get(
+ origin["url"], authority, limit=1, page_token=result["next_page_token"]
)
+ assert result["next_page_token"] is None
+ assert [data.origin_metadata2] == result["results"]
+
+ def test_origin_metadata_get_paginate_same_date(self, swh_storage):
+ origin = data.origin
+ fetcher1 = data.metadata_fetcher
+ fetcher2 = data.metadata_fetcher2
+ authority = data.metadata_authority
+ swh_storage.origin_add([origin])[0]
+
+ swh_storage.metadata_fetcher_add(**fetcher1)
+ swh_storage.metadata_fetcher_add(**fetcher2)
+ swh_storage.metadata_authority_add(**authority)
+
+ origin_metadata2 = {
+ **data.origin_metadata2,
+ "discovery_date": data.origin_metadata2["discovery_date"],
+ "fetcher": {"name": fetcher2["name"], "version": fetcher2["version"],},
+ }
+
+ swh_storage.origin_metadata_add(**data.origin_metadata)
+ swh_storage.origin_metadata_add(**origin_metadata2)
+
+ result = swh_storage.origin_metadata_get(origin["url"], authority, limit=1)
+ assert result["next_page_token"] is not None
+ assert [data.origin_metadata] == result["results"]
- assert [origin2_metadata] == list(
- swh_storage.origin_metadata_get(origin_url2, authority)
+ result = swh_storage.origin_metadata_get(
+ origin["url"], authority, limit=1, page_token=result["next_page_token"]
)
+ assert result["next_page_token"] is None
+ assert [origin_metadata2] == result["results"]
class TestStorageGeneratedData: