
D3240.id11487.diff

diff --git a/docs/extrinsic-metadata-specification.rst b/docs/extrinsic-metadata-specification.rst
--- a/docs/extrinsic-metadata-specification.rst
+++ b/docs/extrinsic-metadata-specification.rst
@@ -158,12 +158,17 @@
origin_metadata_get(origin_url,
authority,
- after, limit)
+ page_token, limit)
- which returns a list of dictionaries, one for each metadata item
- deposited, corresponding to the given origin and obtained from the
- specified authority.
- `authority` must be a dict containing keys `type` and `url`.
+ where `authority` must be a dict containing keys `type` and `url`
+ which returns a dictionary with keys:
+
+ * `next_page_token`, which is an opaque token to be used as
+ `page_token` for retrieving the next page. If absent, there are
+ no more pages to fetch.
+ * `results`: list of dictionaries, one for each metadata item
+ deposited, corresponding to the given origin and obtained from the
+ specified authority.
Each of these dictionaries is in the following format::
@@ -175,8 +180,10 @@
'metadata': b'...'
}
-The parameters ``after`` and ``limit`` are used for pagination based on the
-order defined by the ``discovery_date``.
+The parameters ``page_token`` and ``limit`` are used for pagination based on
+an arbitrary order. An initial query to ``origin_metadata_get`` must set
+``page_token`` to ``None``, and further queries must use the value from the
+previous query's ``next_page_token`` to get the next page of results.
``metadata`` is a bytes array (possibly encoded using Base64).
Its format is specific to each authority; and is treated as an opaque value
@@ -215,7 +222,8 @@
<X>_metadata_get(id,
authority,
- after, limit)
+ after,
+ page_token, limit)
defined similarly to ``origin_metadata_add`` and ``origin_metadata_get``,
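
The protocol above is straightforward to drive from client code. As a
minimal sketch (not part of this diff), a hypothetical helper that
exhausts all pages could look like this, where ``storage`` is any object
implementing the interface::

    from typing import Any, Dict, Iterator, Optional

    def iter_origin_metadata(
        storage, origin_url: str, authority: Dict[str, str]
    ) -> Iterator[Dict[str, Any]]:
        """Yield every metadata item for origin_url, page by page."""
        page_token: Optional[bytes] = None  # the initial query must pass None
        while True:
            result = storage.origin_metadata_get(
                origin_url, authority, page_token=page_token
            )
            yield from result["results"]
            page_token = result["next_page_token"]
            if page_token is None:  # no more pages to fetch
                break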
diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py
--- a/swh/storage/cassandra/cql.py
+++ b/swh/storage/cassandra/cql.py
@@ -868,16 +868,44 @@
@_prepared_statement(
"SELECT * from origin_metadata "
- "WHERE origin=? AND authority_url=? AND discovery_date>=? "
+ "WHERE origin=? AND authority_url=? AND discovery_date>? "
"AND authority_type=?"
)
- def origin_metadata_get_after(
+ def origin_metadata_get_after_date(
self, origin, authority_type, authority_url, after, *, statement
):
return self._execute_with_retries(
statement, [origin, authority_url, after, authority_type]
)
+ @_prepared_statement(
+ "SELECT * from origin_metadata "
+ "WHERE origin=? AND authority_type=? AND authority_url=? "
+ "AND (discovery_date, fetcher_name, fetcher_version) > (?, ?, ?)"
+ )
+ def origin_metadata_get_after_date_and_fetcher(
+ self,
+ origin,
+ authority_type,
+ authority_url,
+ after_date,
+ after_fetcher_name,
+ after_fetcher_version,
+ *,
+ statement,
+ ):
+ return self._execute_with_retries(
+ statement,
+ [
+ origin,
+ authority_type,
+ authority_url,
+ after_date,
+ after_fetcher_name,
+ after_fetcher_version,
+ ],
+ )
+
@_prepared_statement(
"SELECT * from origin_metadata "
"WHERE origin=? AND authority_url=? AND authority_type=?"
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -13,6 +13,7 @@
import attr
import dateutil
+from swh.core.api.serializers import msgpack_loads, msgpack_dumps
from swh.model.model import (
Revision,
Release,
@@ -1090,22 +1091,39 @@
origin_url: str,
authority: Dict[str, str],
after: Optional[datetime.datetime] = None,
- limit: Optional[int] = None,
- ) -> List[Dict[str, Any]]:
+ page_token: Optional[bytes] = None,
+ limit: int = 1000,
+ ) -> Dict[str, Any]:
if not isinstance(origin_url, str):
raise TypeError("origin_url must be str, not %r" % (origin_url,))
- if after is None:
- entries = self._cql_runner.origin_metadata_get(
- origin_url, authority["type"], authority["url"]
+ if page_token is not None:
+ (after_date, after_fetcher_name, after_fetcher_version) = msgpack_loads(
+ page_token
)
- else:
- entries = self._cql_runner.origin_metadata_get_after(
+ if after and after_date < after:
+ raise StorageArgumentException(
+ "page_token is inconsistent with the value of 'after'."
+ )
+ entries = self._cql_runner.origin_metadata_get_after_date_and_fetcher(
+ origin_url,
+ authority["type"],
+ authority["url"],
+ after_date,
+ after_fetcher_name,
+ after_fetcher_version,
+ )
+ elif after is not None:
+ entries = self._cql_runner.origin_metadata_get_after_date(
origin_url, authority["type"], authority["url"], after
)
+ else:
+ entries = self._cql_runner.origin_metadata_get(
+ origin_url, authority["type"], authority["url"]
+ )
if limit:
- entries = itertools.islice(entries, 0, limit)
+ entries = itertools.islice(entries, 0, limit + 1)
results = []
for entry in entries:
@@ -1126,7 +1144,25 @@
"metadata": entry.metadata,
}
)
- return results
+
+ if len(results) > limit:
+ results.pop()
+ assert len(results) == limit
+ last_result = results[-1]
+ next_page_token: Optional[bytes] = msgpack_dumps(
+ (
+ last_result["discovery_date"],
+ last_result["fetcher"]["name"],
+ last_result["fetcher"]["version"],
+ )
+ )
+ else:
+ next_page_token = None
+
+ return {
+ "next_page_token": next_page_token,
+ "results": results,
+ }
def metadata_fetcher_add(
self, name: str, version: str, metadata: Dict[str, Any]
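
The page token built above is nothing more than a msgpack-serialized
triple, using the ``swh.core`` serializers imported at the top of the
file. A round-trip sketch with invented sample values::

    import datetime

    from swh.core.api.serializers import msgpack_dumps, msgpack_loads

    last_result = {
        "discovery_date": datetime.datetime(
            2020, 5, 29, tzinfo=datetime.timezone.utc
        ),
        "fetcher": {"name": "swh-loader-git", "version": "1.0"},
    }
    token = msgpack_dumps(
        (
            last_result["discovery_date"],
            last_result["fetcher"]["name"],
            last_result["fetcher"]["version"],
        )
    )
    assert isinstance(token, bytes)  # opaque to clients
    after_date, name, version = msgpack_loads(token)
    assert after_date == last_result["discovery_date"]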
diff --git a/swh/storage/db.py b/swh/storage/db.py
--- a/swh/storage/db.py
+++ b/swh/storage/db.py
@@ -1076,6 +1076,7 @@
"discovery_date",
"metadata_authority.type",
"metadata_authority.url",
+ "metadata_fetcher.id",
"metadata_fetcher.name",
"metadata_fetcher.version",
"format",
@@ -1120,7 +1121,8 @@
self,
origin_url: str,
authority: int,
- after: Optional[datetime.datetime],
+ after_time: Optional[datetime.datetime],
+ after_fetcher: Optional[int],
limit: Optional[int],
cur=None,
):
@@ -1134,15 +1136,19 @@
f" ON (metadata_authority.id=authority_id) "
f"INNER JOIN metadata_fetcher ON (metadata_fetcher.id=fetcher_id) "
f"INNER JOIN origin ON (origin.id=origin_metadata.origin_id) "
- f"WHERE origin.url=%s AND authority_id=%s"
+ f"WHERE origin.url=%s AND authority_id=%s "
]
args = [origin_url, authority]
- if after:
- query_parts.append("AND discovery_date >= %s")
- args.append(after)
+ if after_fetcher is not None:
+ assert after_time
+ query_parts.append("AND (discovery_date, fetcher_id) > (%s, %s)")
+ args.extend([after_time, after_fetcher])
+ elif after_time is not None:
+ query_parts.append("AND discovery_date > %s")
+ args.append(after_time)
- query_parts.append("ORDER BY discovery_date")
+ query_parts.append("ORDER BY discovery_date, fetcher_id")
if limit:
query_parts.append("LIMIT %s")
diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py
--- a/swh/storage/in_memory.py
+++ b/swh/storage/in_memory.py
@@ -31,6 +31,7 @@
import attr
+from swh.core.api.serializers import msgpack_loads, msgpack_dumps
from swh.model.model import (
BaseContent,
Content,
@@ -62,6 +63,8 @@
SortedListItem = TypeVar("SortedListItem")
SortedListKey = TypeVar("SortedListKey")
+FetcherKey = Tuple[str, str]
+
class SortedList(collections.UserList, Generic[SortedListKey, SortedListItem]):
data: List[Tuple[SortedListKey, SortedListItem]]
@@ -92,7 +95,7 @@
for (k, item) in self.data:
yield item
- def iter_from(self, start_key: SortedListKey) -> Iterator[SortedListItem]:
+ def iter_from(self, start_key: Any) -> Iterator[SortedListItem]:
"""Returns an iterator over all the elements whose key is greater
or equal to `start_key`.
(This is an efficient equivalent to:
@@ -102,7 +105,7 @@
for (k, item) in itertools.islice(self.data, from_index, None):
yield item
- def iter_after(self, start_key: SortedListKey) -> Iterator[SortedListItem]:
+ def iter_after(self, start_key: Any) -> Iterator[SortedListItem]:
"""Same as iter_from, but using a strict inequality."""
it = self.iter_from(start_key)
for item in it:
@@ -137,12 +140,18 @@
# {origin_url: {authority: [metadata]}}
self._origin_metadata: Dict[
- str, Dict[Hashable, SortedList[datetime.datetime, Dict[str, Any]]]
+ str,
+ Dict[
+ Hashable,
+ SortedList[Tuple[datetime.datetime, FetcherKey], Dict[str, Any]],
+ ],
] = defaultdict(
- lambda: defaultdict(lambda: SortedList(key=lambda x: x["discovery_date"]))
+ lambda: defaultdict(
+ lambda: SortedList(key=lambda x: (x["discovery_date"], x["fetcher"]))
+ )
) # noqa
- self._metadata_fetchers: Dict[Hashable, Dict[str, Any]] = {}
+ self._metadata_fetchers: Dict[FetcherKey, Dict[str, Any]] = {}
self._metadata_authorities: Dict[Hashable, Dict[str, Any]] = {}
self._objects = defaultdict(list)
self._sorted_sha1s = SortedList[bytes, bytes]()
@@ -1109,24 +1118,41 @@
origin_url: str,
authority: Dict[str, str],
after: Optional[datetime.datetime] = None,
- limit: Optional[int] = None,
- ) -> List[Dict[str, Any]]:
+ page_token: Optional[bytes] = None,
+ limit: int = 1000,
+ ) -> Dict[str, Any]:
if not isinstance(origin_url, str):
raise TypeError("origin_url must be str, not %r" % (origin_url,))
authority_key = self._metadata_authority_key(authority)
- if after is None:
- entries = iter(self._origin_metadata[origin_url][authority_key])
+ if page_token is not None:
+ (after_time, after_fetcher) = msgpack_loads(page_token)
+ after_fetcher = tuple(after_fetcher)
+ if after is not None and after > after_time:
+ raise StorageArgumentException(
+ "page_token is inconsistent with the value of 'after'."
+ )
+ entries = self._origin_metadata[origin_url][authority_key].iter_after(
+ (after_time, after_fetcher)
+ )
+ elif after is not None:
+ entries = self._origin_metadata[origin_url][authority_key].iter_from(
+ (after,)
+ )
+ entries = (entry for entry in entries if entry["discovery_date"] > after)
else:
- entries = self._origin_metadata[origin_url][authority_key].iter_from(after)
+ entries = iter(self._origin_metadata[origin_url][authority_key])
+
if limit:
- entries = itertools.islice(entries, 0, limit)
+ entries = itertools.islice(entries, 0, limit + 1)
results = []
for entry in entries:
authority = self._metadata_authorities[entry["authority"]]
fetcher = self._metadata_fetchers[entry["fetcher"]]
+ if after:
+ assert entry["discovery_date"] > after
results.append(
{
**entry,
@@ -1137,7 +1163,24 @@
},
}
)
- return results
+
+ if len(results) > limit:
+ results.pop()
+ assert len(results) == limit
+ last_result = results[-1]
+ next_page_token: Optional[bytes] = msgpack_dumps(
+ (
+ last_result["discovery_date"],
+ self._metadata_fetcher_key(last_result["fetcher"]),
+ )
+ )
+ else:
+ next_page_token = None
+
+ return {
+ "next_page_token": next_page_token,
+ "results": results,
+ }
def metadata_fetcher_add(
self, name: str, version: str, metadata: Dict[str, Any]
@@ -1197,7 +1240,7 @@
return tuple((key, content.get(key)) for key in sorted(DEFAULT_ALGORITHMS))
@staticmethod
- def _metadata_fetcher_key(fetcher: Dict) -> Hashable:
+ def _metadata_fetcher_key(fetcher: Dict) -> FetcherKey:
return (fetcher["name"], fetcher["version"])
@staticmethod
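
The in-memory backend leans on Python tuple ordering twice: entries are
now keyed by ``(discovery_date, fetcher_key)``, and ``iter_from((after,))``
works because a shorter tuple sorts before any longer tuple it is a
prefix of. A standalone illustration with invented values::

    import datetime

    d1 = datetime.datetime(2020, 1, 1)
    d2 = datetime.datetime(2020, 1, 2)
    fetcher_key = ("swh-loader-git", "1.0")

    # (d1,) is a prefix of (d1, fetcher_key), so iter_from((after,)) starts at
    # the first entry with discovery_date >= after; the generator expression
    # above then drops the entries whose date equals `after`.
    assert (d1,) < (d1, fetcher_key) < (d2,)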
diff --git a/swh/storage/interface.py b/swh/storage/interface.py
--- a/swh/storage/interface.py
+++ b/swh/storage/interface.py
@@ -1165,18 +1165,23 @@
origin_url: str,
authority: Dict[str, str],
after: Optional[datetime.datetime] = None,
- limit: Optional[int] = None,
- ) -> List[Dict[str, Any]]:
+ page_token: Optional[bytes] = None,
+ limit: int = 1000,
+ ) -> Dict[str, Any]:
"""Retrieve list of all origin_metadata entries for the origin_id
Args:
origin_url: the origin's URL
authority: a dict containing keys `type` and `url`.
after: minimum discovery_date for a result to be returned
+ page_token: opaque token, used to get the next page of results
limit: maximum number of results to be returned
Returns:
- list of dicts in the format:
+ dict with keys `next_page_token` and `results`.
+ `next_page_token` is an opaque token that is used to get the
+ next page of results, or `None` if there are no more results.
+ `results` is a list of dicts in the format:
.. code-block:: python
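
(The per-item format follows in the docstring, outside this hunk.) Put
together, a page returned by the new API has roughly this shape; the
field values below are illustrative only::

    {
        "next_page_token": b"...",  # None on the last page
        "results": [
            {
                "origin_url": "https://example.org/repo",
                "authority": {"type": "deposit", "url": "https://example.org/"},
                "fetcher": {"name": "swh-loader-git", "version": "1.0"},
                "discovery_date": datetime.datetime(2020, 5, 29),
                "format": "json",
                "metadata": b"...",
            },
        ],
    }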
diff --git a/swh/storage/storage.py b/swh/storage/storage.py
--- a/swh/storage/storage.py
+++ b/swh/storage/storage.py
@@ -17,6 +17,7 @@
import psycopg2.pool
import psycopg2.errors
+from swh.core.api.serializers import msgpack_loads, msgpack_dumps
from swh.model.model import (
Content,
Directory,
@@ -1278,18 +1279,38 @@
origin_url: str,
authority: Dict[str, str],
after: Optional[datetime.datetime] = None,
- limit: Optional[int] = None,
+ page_token: Optional[bytes] = None,
+ limit: int = 1000,
db=None,
cur=None,
- ) -> List[Dict[str, Any]]:
+ ) -> Dict[str, Any]:
+ if page_token:
+ (after_time, after_fetcher) = msgpack_loads(page_token)
+ if after and after_time < after:
+ raise StorageArgumentException(
+ "page_token is inconsistent with the value of 'after'."
+ )
+ else:
+ after_time = after
+ after_fetcher = None
+
authority_id = db.metadata_authority_get_id(
authority["type"], authority["url"], cur
)
if not authority_id:
- return []
+ return {
+ "next_page_token": None,
+ "results": [],
+ }
+
+ rows = db.origin_metadata_get(
+ origin_url, authority_id, after_time, after_fetcher, limit + 1, cur
+ )
+ rows = [dict(zip(db.origin_metadata_get_cols, row)) for row in rows]
results = []
- for line in db.origin_metadata_get(origin_url, authority_id, after, limit, cur):
- row = dict(zip(db.origin_metadata_get_cols, line))
+ for row in rows:
+ row = row.copy()
+ row.pop("metadata_fetcher.id")
results.append(
{
"origin_url": row.pop("origin.url"),
@@ -1304,7 +1325,24 @@
**row,
}
)
- return results
+
+ if len(results) > limit:
+ results.pop()
+ assert len(results) == limit
+ last_returned_row = rows[-2] # rows[-1] corresponds to the popped result
+ next_page_token: Optional[bytes] = msgpack_dumps(
+ (
+ last_returned_row["discovery_date"],
+ last_returned_row["metadata_fetcher.id"],
+ )
+ )
+ else:
+ next_page_token = None
+
+ return {
+ "next_page_token": next_page_token,
+ "results": results,
+ }
@timed
@db_transaction()
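
The consistency check at the top of the method mirrors the Cassandra and
in-memory backends: a token records where the previous page stopped, so
it must not point before the ``after`` bound of the current query. A
standalone sketch of that guard (``ValueError`` stands in for
``StorageArgumentException`` to keep the snippet self-contained)::

    import datetime

    from swh.core.api.serializers import msgpack_dumps, msgpack_loads

    after = datetime.datetime(2020, 6, 1, tzinfo=datetime.timezone.utc)
    page_token = msgpack_dumps(
        (datetime.datetime(2020, 5, 1, tzinfo=datetime.timezone.utc), 7)
    )

    after_time, after_fetcher = msgpack_loads(page_token)
    if after and after_time < after:
        # the previous page ended before `after`: the caller mixed up tokens
        raise ValueError("page_token is inconsistent with the value of 'after'.")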
diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py
--- a/swh/storage/tests/test_storage.py
+++ b/swh/storage/tests/test_storage.py
@@ -3216,13 +3216,10 @@
swh_storage.origin_metadata_add(**data.origin_metadata)
swh_storage.origin_metadata_add(**data.origin_metadata2)
- swh_storage.origin_metadata_get(origin["url"], authority)
-
+ result = swh_storage.origin_metadata_get(origin["url"], authority)
+ assert result["next_page_token"] is None
assert [data.origin_metadata, data.origin_metadata2] == list(
- sorted(
- swh_storage.origin_metadata_get(origin["url"], authority),
- key=lambda x: x["discovery_date"],
- )
+ sorted(result["results"], key=lambda x: x["discovery_date"],)
)
def test_origin_metadata_add_duplicate(self, swh_storage):
@@ -3245,13 +3242,10 @@
swh_storage.origin_metadata_add(**data.origin_metadata2)
swh_storage.origin_metadata_add(**new_origin_metadata2)
- swh_storage.origin_metadata_get(origin["url"], authority)
-
+ result = swh_storage.origin_metadata_get(origin["url"], authority)
+ assert result["next_page_token"] is None
assert [data.origin_metadata, new_origin_metadata2] == list(
- sorted(
- swh_storage.origin_metadata_get(origin["url"], authority),
- key=lambda x: x["discovery_date"],
- )
+ sorted(result["results"], key=lambda x: x["discovery_date"],)
)
def test_origin_metadata_add_dict(self, swh_storage):
@@ -3294,23 +3288,109 @@
swh_storage.origin_metadata_add(**origin1_metadata3)
swh_storage.origin_metadata_add(**origin2_metadata)
+ result = swh_storage.origin_metadata_get(origin_url1, authority)
+ assert result["next_page_token"] is None
assert [origin1_metadata1, origin1_metadata2] == list(
- sorted(
- swh_storage.origin_metadata_get(origin_url1, authority),
- key=lambda x: x["discovery_date"],
- )
+ sorted(result["results"], key=lambda x: x["discovery_date"],)
)
+ result = swh_storage.origin_metadata_get(origin_url1, authority2)
+ assert result["next_page_token"] is None
assert [origin1_metadata3] == list(
- sorted(
- swh_storage.origin_metadata_get(origin_url1, authority2),
- key=lambda x: x["discovery_date"],
- )
+ sorted(result["results"], key=lambda x: x["discovery_date"],)
+ )
+
+ result = swh_storage.origin_metadata_get(origin_url2, authority)
+ assert result["next_page_token"] is None
+ assert [origin2_metadata] == list(result["results"],)
+
+ def test_origin_metadata_get_after(self, swh_storage):
+ origin = data.origin
+ fetcher = data.metadata_fetcher
+ authority = data.metadata_authority
+ swh_storage.origin_add([origin])[0]
+
+ swh_storage.metadata_fetcher_add(**fetcher)
+ swh_storage.metadata_authority_add(**authority)
+
+ swh_storage.origin_metadata_add(**data.origin_metadata)
+ swh_storage.origin_metadata_add(**data.origin_metadata2)
+
+ result = swh_storage.origin_metadata_get(
+ origin["url"],
+ authority,
+ after=data.origin_metadata["discovery_date"] - timedelta(seconds=1),
+ )
+ assert result["next_page_token"] is None
+ assert [data.origin_metadata, data.origin_metadata2] == list(
+ sorted(result["results"], key=lambda x: x["discovery_date"],)
+ )
+
+ result = swh_storage.origin_metadata_get(
+ origin["url"], authority, after=data.origin_metadata["discovery_date"]
+ )
+ assert result["next_page_token"] is None
+ assert [data.origin_metadata2] == result["results"]
+
+ result = swh_storage.origin_metadata_get(
+ origin["url"], authority, after=data.origin_metadata2["discovery_date"]
+ )
+ assert result["next_page_token"] is None
+ assert [] == result["results"]
+
+ def test_origin_metadata_get_paginate(self, swh_storage):
+ origin = data.origin
+ fetcher = data.metadata_fetcher
+ authority = data.metadata_authority
+ swh_storage.origin_add([origin])[0]
+
+ swh_storage.metadata_fetcher_add(**fetcher)
+ swh_storage.metadata_authority_add(**authority)
+
+ swh_storage.origin_metadata_add(**data.origin_metadata)
+ swh_storage.origin_metadata_add(**data.origin_metadata2)
+
+ swh_storage.origin_metadata_get(origin["url"], authority)
+
+ result = swh_storage.origin_metadata_get(origin["url"], authority, limit=1)
+ assert result["next_page_token"] is not None
+ assert [data.origin_metadata] == result["results"]
+
+ result = swh_storage.origin_metadata_get(
+ origin["url"], authority, limit=1, page_token=result["next_page_token"]
)
+ assert result["next_page_token"] is None
+ assert [data.origin_metadata2] == result["results"]
+
+ def test_origin_metadata_get_paginate_same_date(self, swh_storage):
+ origin = data.origin
+ fetcher1 = data.metadata_fetcher
+ fetcher2 = data.metadata_fetcher2
+ authority = data.metadata_authority
+ swh_storage.origin_add([origin])[0]
+
+ swh_storage.metadata_fetcher_add(**fetcher1)
+ swh_storage.metadata_fetcher_add(**fetcher2)
+ swh_storage.metadata_authority_add(**authority)
+
+ origin_metadata2 = {
+ **data.origin_metadata2,
+ "discovery_date": data.origin_metadata2["discovery_date"],
+ "fetcher": {"name": fetcher2["name"], "version": fetcher2["version"],},
+ }
+
+ swh_storage.origin_metadata_add(**data.origin_metadata)
+ swh_storage.origin_metadata_add(**origin_metadata2)
+
+ result = swh_storage.origin_metadata_get(origin["url"], authority, limit=1)
+ assert result["next_page_token"] is not None
+ assert [data.origin_metadata] == result["results"]
- assert [origin2_metadata] == list(
- swh_storage.origin_metadata_get(origin_url2, authority)
+ result = swh_storage.origin_metadata_get(
+ origin["url"], authority, limit=1, page_token=result["next_page_token"]
)
+ assert result["next_page_token"] is None
+ assert [origin_metadata2] == result["results"]
class TestStorageGeneratedData:
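
``test_origin_metadata_get_paginate_same_date`` is the interesting case:
it exercises why the fetcher had to join ``discovery_date`` in the
pagination key. A compact illustration of the failure mode it guards
against, with invented values::

    import datetime

    d = datetime.datetime(2020, 5, 29)
    entries = sorted(
        [
            (d, ("swh-loader-git", "1.0")),
            (d, ("swh-deposit", "0.9")),
        ]
    )

    # A date-only cursor would lose the second entry at the page boundary;
    # the (date, fetcher) cursor keeps it reachable:
    cursor = entries[0]
    assert [e for e in entries if e > cursor] == [entries[1]]
    assert [e for e in entries if e[0] > d] == []  # date-only cursor skips it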
