diff --git a/docs/extrinsic-metadata-specification.rst b/docs/extrinsic-metadata-specification.rst
--- a/docs/extrinsic-metadata-specification.rst
+++ b/docs/extrinsic-metadata-specification.rst
@@ -158,12 +158,17 @@
     origin_metadata_get(origin_url, authority,
-                        after, limit)
+                        page_token, limit)
 
-which returns a list of dictionaries, one for each metadata item
-deposited, corresponding to the given origin and obtained from the
-specified authority.
-`authority` must be a dict containing keys `type` and `url`.
+where `authority` must be a dict containing keys `type` and `url`,
+and which returns a dictionary with keys:
+
+* `next_page_token`, which is an opaque token to be used as
+  `page_token` for retrieving the next page. If absent, there are
+  no more pages to gather.
+* `results`: list of dictionaries, one for each metadata item
+  deposited, corresponding to the given origin and obtained from the
+  specified authority.
 
 Each of these dictionaries is in the following format::
 
@@ -175,8 +180,10 @@
       'metadata': b'...'
     }
 
-The parameters ``after`` and ``limit`` are used for pagination based on the
-order defined by the ``discovery_date``.
+The parameters ``page_token`` and ``limit`` are used for pagination based on
+an arbitrary order. An initial query to ``origin_metadata_get`` must set
+``page_token`` to ``None``, and further queries must use the value from the
+previous query's ``next_page_token`` to get the next page of results.
 
 ``metadata`` is a bytes array (eventually encoded using Base64).
 Its format is specific to each authority; and is treated as an opaque value
@@ -215,7 +222,8 @@
 
     _metadata_get(id, authority,
-                  after, limit)
+                  after,
+                  page_token, limit)
 
 defined similarly to ``origin_metadata_add`` and ``origin_metadata_get``,

diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py
--- a/swh/storage/cassandra/cql.py
+++ b/swh/storage/cassandra/cql.py
@@ -868,16 +868,44 @@
     @_prepared_statement(
         "SELECT * from origin_metadata "
-        "WHERE origin=? AND authority_url=? AND discovery_date>=? "
+        "WHERE origin=? AND authority_url=? AND discovery_date>? "
         "AND authority_type=?"
     )
-    def origin_metadata_get_after(
+    def origin_metadata_get_after_date(
         self, origin, authority_type, authority_url, after, *, statement
     ):
         return self._execute_with_retries(
             statement, [origin, authority_url, after, authority_type]
         )
 
+    @_prepared_statement(
+        "SELECT * from origin_metadata "
+        "WHERE origin=? AND authority_type=? AND authority_url=? "
+        "AND (discovery_date, fetcher_name, fetcher_version) > (?, ?, ?)"
+    )
+    def origin_metadata_get_after_date_and_fetcher(
+        self,
+        origin,
+        authority_type,
+        authority_url,
+        after_date,
+        after_fetcher_name,
+        after_fetcher_version,
+        *,
+        statement,
+    ):
+        return self._execute_with_retries(
+            statement,
+            [
+                origin,
+                authority_type,
+                authority_url,
+                after_date,
+                after_fetcher_name,
+                after_fetcher_version,
+            ],
+        )
+
     @_prepared_statement(
         "SELECT * from origin_metadata "
         "WHERE origin=? AND authority_url=? AND authority_type=?"
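Taken together, the specification above defines a token-driven pagination loop on
the client side. A minimal sketch of a consumer that follows ``next_page_token``
until exhaustion, assuming only the interface described above (``storage`` stands
for any object implementing it, and the authority value is a hypothetical
example)::

    def iter_origin_metadata(storage, origin_url, authority, limit=100):
        """Yield every metadata item by following next_page_token."""
        page_token = None
        while True:
            page = storage.origin_metadata_get(
                origin_url, authority, page_token=page_token, limit=limit
            )
            yield from page["results"]
            page_token = page["next_page_token"]
            if page_token is None:  # no more pages to gather
                return

    # Hypothetical example values:
    # authority = {"type": "deposit", "url": "https://deposit.example.org/"}
    # for item in iter_origin_metadata(storage, "https://example.org/repo", authority):
    #     print(item["discovery_date"], item["format"])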
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -13,6 +13,7 @@
 import attr
 import dateutil
 
+from swh.core.api.serializers import msgpack_loads, msgpack_dumps
 from swh.model.model import (
     Revision,
     Release,
@@ -1090,22 +1091,39 @@
         origin_url: str,
         authority: Dict[str, str],
         after: Optional[datetime.datetime] = None,
-        limit: Optional[int] = None,
-    ) -> List[Dict[str, Any]]:
+        page_token: Optional[bytes] = None,
+        limit: int = 1000,
+    ) -> Dict[str, Any]:
         if not isinstance(origin_url, str):
             raise TypeError("origin_url must be str, not %r" % (origin_url,))
 
-        if after is None:
-            entries = self._cql_runner.origin_metadata_get(
-                origin_url, authority["type"], authority["url"]
+        if page_token is not None:
+            (after_date, after_fetcher_name, after_fetcher_url) = msgpack_loads(
+                page_token
             )
-        else:
-            entries = self._cql_runner.origin_metadata_get_after(
+            if after and after_date < after:
+                raise StorageArgumentException(
+                    "page_token is inconsistent with the value of 'after'."
+                )
+            entries = self._cql_runner.origin_metadata_get_after_date_and_fetcher(
+                origin_url,
+                authority["type"],
+                authority["url"],
+                after_date,
+                after_fetcher_name,
+                after_fetcher_url,
+            )
+        elif after is not None:
+            entries = self._cql_runner.origin_metadata_get_after_date(
                 origin_url, authority["type"], authority["url"], after
             )
+        else:
+            entries = self._cql_runner.origin_metadata_get(
+                origin_url, authority["type"], authority["url"]
+            )
 
         if limit:
-            entries = itertools.islice(entries, 0, limit)
+            entries = itertools.islice(entries, 0, limit + 1)
 
         results = []
         for entry in entries:
@@ -1126,7 +1144,25 @@
                     "metadata": entry.metadata,
                 }
             )
-        return results
+
+        if len(results) > limit:
+            results.pop()
+            assert len(results) == limit
+            last_result = results[-1]
+            next_page_token: Optional[bytes] = msgpack_dumps(
+                (
+                    last_result["discovery_date"],
+                    last_result["fetcher"]["name"],
+                    last_result["fetcher"]["version"],
+                )
+            )
+        else:
+            next_page_token = None
+
+        return {
+            "next_page_token": next_page_token,
+            "results": results,
+        }
 
     def metadata_fetcher_add(
         self, name: str, version: str, metadata: Dict[str, Any]

diff --git a/swh/storage/db.py b/swh/storage/db.py
--- a/swh/storage/db.py
+++ b/swh/storage/db.py
@@ -1076,6 +1076,7 @@
         "discovery_date",
         "metadata_authority.type",
         "metadata_authority.url",
+        "metadata_fetcher.id",
         "metadata_fetcher.name",
         "metadata_fetcher.version",
         "format",
@@ -1120,7 +1121,8 @@
         self,
         origin_url: str,
         authority: int,
-        after: Optional[datetime.datetime],
+        after_time: Optional[datetime.datetime],
+        after_fetcher: Optional[int],
         limit: Optional[int],
         cur=None,
     ):
@@ -1134,15 +1136,19 @@
             f" ON (metadata_authority.id=authority_id) "
             f"INNER JOIN metadata_fetcher ON (metadata_fetcher.id=fetcher_id) "
             f"INNER JOIN origin ON (origin.id=origin_metadata.origin_id) "
-            f"WHERE origin.url=%s AND authority_id=%s"
+            f"WHERE origin.url=%s AND authority_id=%s "
         ]
         args = [origin_url, authority]
 
-        if after:
-            query_parts.append("AND discovery_date >= %s")
-            args.append(after)
+        if after_fetcher is not None:
+            assert after_time
+            query_parts.append("AND (discovery_date, fetcher_id) > (%s, %s)")
+            args.extend([after_time, after_fetcher])
+        elif after_time is not None:
+            query_parts.append("AND discovery_date > %s")
+            args.append(after_time)
 
-        query_parts.append("ORDER BY discovery_date")
+        query_parts.append("ORDER BY discovery_date, fetcher_id")
 
         if limit:
             query_parts.append("LIMIT %s")
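The ``db.py`` hunk above replaces the ``discovery_date >= %s`` filter with
PostgreSQL row-value comparison, ``(discovery_date, fetcher_id) > (%s, %s)``, so
that entries sharing a discovery date remain totally ordered and pagination can
neither skip nor repeat rows. A self-contained sketch of the same
keyset-pagination pattern, assuming a psycopg2 connection and a simplified,
hypothetical ``origin_metadata`` table::

    def get_page(conn, origin_url, authority_id, after_time, after_fetcher, limit):
        """Fetch one page, ordered by the pagination key (discovery_date, fetcher_id)."""
        query = (
            "SELECT discovery_date, fetcher_id, format, metadata "
            "FROM origin_metadata "
            "WHERE origin_url = %s AND authority_id = %s "
        )
        args = [origin_url, authority_id]
        if after_fetcher is not None:
            # row-value comparison: strictly after the last row of the previous page
            query += "AND (discovery_date, fetcher_id) > (%s, %s) "
            args.extend([after_time, after_fetcher])
        elif after_time is not None:
            query += "AND discovery_date > %s "
            args.append(after_time)
        query += "ORDER BY discovery_date, fetcher_id LIMIT %s"
        args.append(limit)
        with conn.cursor() as cur:
            cur.execute(query, args)
            return cur.fetchall()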
diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py
--- a/swh/storage/in_memory.py
+++ b/swh/storage/in_memory.py
@@ -31,6 +31,7 @@
 
 import attr
 
+from swh.core.api.serializers import msgpack_loads, msgpack_dumps
 from swh.model.model import (
     BaseContent,
     Content,
@@ -62,6 +63,8 @@
 SortedListItem = TypeVar("SortedListItem")
 SortedListKey = TypeVar("SortedListKey")
 
+FetcherKey = Tuple[str, str]
+
 
 class SortedList(collections.UserList, Generic[SortedListKey, SortedListItem]):
     data: List[Tuple[SortedListKey, SortedListItem]]
@@ -92,7 +95,7 @@
         for (k, item) in self.data:
             yield item
 
-    def iter_from(self, start_key: SortedListKey) -> Iterator[SortedListItem]:
+    def iter_from(self, start_key: Any) -> Iterator[SortedListItem]:
         """Returns an iterator over all the elements whose key is greater
         or equal to `start_key`.
         (This is an efficient equivalent to:
@@ -102,7 +105,7 @@
         for (k, item) in itertools.islice(self.data, from_index, None):
             yield item
 
-    def iter_after(self, start_key: SortedListKey) -> Iterator[SortedListItem]:
+    def iter_after(self, start_key: Any) -> Iterator[SortedListItem]:
         """Same as iter_from, but using a strict inequality."""
         it = self.iter_from(start_key)
         for item in it:
@@ -137,12 +140,18 @@
 
         # {origin_url: {authority: [metadata]}}
         self._origin_metadata: Dict[
-            str, Dict[Hashable, SortedList[datetime.datetime, Dict[str, Any]]]
+            str,
+            Dict[
+                Hashable,
+                SortedList[Tuple[datetime.datetime, FetcherKey], Dict[str, Any]],
+            ],
         ] = defaultdict(
-            lambda: defaultdict(lambda: SortedList(key=lambda x: x["discovery_date"]))
+            lambda: defaultdict(
+                lambda: SortedList(key=lambda x: (x["discovery_date"], x["fetcher"]))
+            )
         )  # noqa
 
-        self._metadata_fetchers: Dict[Hashable, Dict[str, Any]] = {}
+        self._metadata_fetchers: Dict[FetcherKey, Dict[str, Any]] = {}
         self._metadata_authorities: Dict[Hashable, Dict[str, Any]] = {}
         self._objects = defaultdict(list)
         self._sorted_sha1s = SortedList[bytes, bytes]()
@@ -1109,24 +1118,41 @@
         origin_url: str,
         authority: Dict[str, str],
         after: Optional[datetime.datetime] = None,
-        limit: Optional[int] = None,
-    ) -> List[Dict[str, Any]]:
+        page_token: Optional[bytes] = None,
+        limit: int = 1000,
+    ) -> Dict[str, Any]:
         if not isinstance(origin_url, str):
             raise TypeError("origin_url must be str, not %r" % (origin_url,))
         authority_key = self._metadata_authority_key(authority)
 
-        if after is None:
-            entries = iter(self._origin_metadata[origin_url][authority_key])
+        if page_token is not None:
+            (after_time, after_fetcher) = msgpack_loads(page_token)
+            after_fetcher = tuple(after_fetcher)
+            if after is not None and after > after_time:
+                raise StorageArgumentException(
+                    "page_token is inconsistent with the value of 'after'."
+                )
+            entries = self._origin_metadata[origin_url][authority_key].iter_after(
+                (after_time, after_fetcher)
+            )
+        elif after is not None:
+            entries = self._origin_metadata[origin_url][authority_key].iter_from(
+                (after,)
+            )
+            entries = (entry for entry in entries if entry["discovery_date"] > after)
         else:
-            entries = self._origin_metadata[origin_url][authority_key].iter_from(after)
+            entries = iter(self._origin_metadata[origin_url][authority_key])
+
         if limit:
-            entries = itertools.islice(entries, 0, limit)
+            entries = itertools.islice(entries, 0, limit + 1)
 
         results = []
         for entry in entries:
             authority = self._metadata_authorities[entry["authority"]]
             fetcher = self._metadata_fetchers[entry["fetcher"]]
+            if after:
+                assert entry["discovery_date"] > after
             results.append(
                 {
                     **entry,
@@ -1137,7 +1163,24 @@
                     },
                 }
             )
-        return results
+
+        if len(results) > limit:
+            results.pop()
+            assert len(results) == limit
+            last_result = results[-1]
+            next_page_token: Optional[bytes] = msgpack_dumps(
+                (
+                    last_result["discovery_date"],
+                    self._metadata_fetcher_key(last_result["fetcher"]),
+                )
+            )
+        else:
+            next_page_token = None
+
+        return {
+            "next_page_token": next_page_token,
+            "results": results,
+        }
 
     def metadata_fetcher_add(
         self, name: str, version: str, metadata: Dict[str, Any]
@@ -1197,7 +1240,7 @@
         return tuple((key, content.get(key)) for key in sorted(DEFAULT_ALGORITHMS))
 
     @staticmethod
-    def _metadata_fetcher_key(fetcher: Dict) -> Hashable:
+    def _metadata_fetcher_key(fetcher: Dict) -> FetcherKey:
         return (fetcher["name"], fetcher["version"])
 
     @staticmethod
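The in-memory backend now keys its ``SortedList`` on ``(discovery_date,
fetcher)`` tuples so that ``iter_after`` can resume strictly past the last
entry of the previous page. A runnable sketch of that bisect-based structure,
independent of swh.storage::

    import bisect
    import collections
    import datetime
    import itertools

    class SortedList(collections.UserList):
        """Keep (key(item), item) pairs sorted; resume iteration by key."""

        def __init__(self, data=None, key=None):
            assert key is not None
            self.key = key
            super().__init__(sorted((key(x), x) for x in (data or [])))

        def add(self, item):
            bisect.insort(self.data, (self.key(item), item))

        def iter_from(self, start_key):
            # first element whose key is >= start_key; the 1-tuple (start_key,)
            # sorts just before any (start_key, item) pair
            from_index = bisect.bisect_left(self.data, (start_key,))
            return (item for (_, item) in itertools.islice(self.data, from_index, None))

        def iter_after(self, start_key):
            # same as iter_from, but with a strict inequality
            it = self.iter_from(start_key)
            return itertools.dropwhile(lambda item: self.key(item) == start_key, it)

    d = datetime.datetime(2020, 5, 4, tzinfo=datetime.timezone.utc)
    sl = SortedList(key=lambda x: (x["discovery_date"], x["fetcher"]))
    sl.add({"discovery_date": d, "fetcher": ("loader-git", "1.0")})
    sl.add({"discovery_date": d, "fetcher": ("loader-hg", "0.3")})
    # resume strictly after the loader-git entry, as a page_token would:
    assert [e["fetcher"] for e in sl.iter_after((d, ("loader-git", "1.0")))] == [
        ("loader-hg", "0.3")
    ]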
diff --git a/swh/storage/interface.py b/swh/storage/interface.py
--- a/swh/storage/interface.py
+++ b/swh/storage/interface.py
@@ -1165,18 +1165,23 @@
         origin_url: str,
         authority: Dict[str, str],
         after: Optional[datetime.datetime] = None,
-        limit: Optional[int] = None,
-    ) -> List[Dict[str, Any]]:
+        page_token: Optional[bytes] = None,
+        limit: int = 1000,
+    ) -> Dict[str, Any]:
         """Retrieve list of all origin_metadata entries for the origin_id
 
         Args:
             origin_url: the origin's URL
             authority: a dict containing keys `type` and `url`.
             after: minimum discovery_date for a result to be returned
+            page_token: opaque token, used to get the next page of results
             limit: maximum number of results to be returned
 
         Returns:
-            list of dicts in the format:
+            dict with keys `next_page_token` and `results`.
+            `next_page_token` is an opaque token that is used to get the
+            next page of results, or `None` if there are no more results.
+            `results` is a list of dicts in the format:
 
             .. code-block: python
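On the wire, ``page_token`` is opaque to clients, but each backend simply
msgpack-encodes the cursor it needs to resume; the interface above
deliberately promises nothing about its contents. A sketch of the round trip,
assuming swh.core's msgpack helpers round-trip datetimes (which their use
throughout this diff implies)::

    import datetime
    from swh.core.api.serializers import msgpack_dumps, msgpack_loads

    # hypothetical cursor for the Cassandra backend:
    # (discovery_date, fetcher_name, fetcher_version)
    cursor = (
        datetime.datetime(2020, 5, 4, tzinfo=datetime.timezone.utc),
        "loader-git",
        "1.0",
    )
    token = msgpack_dumps(cursor)  # bytes: opaque to clients, meaningful to the backend
    decoded = msgpack_loads(token)
    # msgpack may decode the tuple as a list, hence the tuple() call
    # (the in-memory backend does the same with `tuple(after_fetcher)`)
    assert tuple(decoded) == cursor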
diff --git a/swh/storage/storage.py b/swh/storage/storage.py
--- a/swh/storage/storage.py
+++ b/swh/storage/storage.py
@@ -17,6 +17,7 @@
 import psycopg2.pool
 import psycopg2.errors
 
+from swh.core.api.serializers import msgpack_loads, msgpack_dumps
 from swh.model.model import (
     Content,
     Directory,
@@ -1278,18 +1279,38 @@
         origin_url: str,
         authority: Dict[str, str],
         after: Optional[datetime.datetime] = None,
-        limit: Optional[int] = None,
+        page_token: Optional[bytes] = None,
+        limit: int = 1000,
         db=None,
         cur=None,
-    ) -> List[Dict[str, Any]]:
+    ) -> Dict[str, Any]:
+        if page_token:
+            (after_time, after_fetcher) = msgpack_loads(page_token)
+            if after and after_time < after:
+                raise StorageArgumentException(
+                    "page_token is inconsistent with the value of 'after'."
+                )
+        else:
+            after_time = after
+            after_fetcher = None
+
         authority_id = db.metadata_authority_get_id(
             authority["type"], authority["url"], cur
         )
         if not authority_id:
-            return []
+            return {
+                "next_page_token": None,
+                "results": [],
+            }
+
+        rows = db.origin_metadata_get(
+            origin_url, authority_id, after_time, after_fetcher, limit + 1, cur
+        )
+        rows = [dict(zip(db.origin_metadata_get_cols, row)) for row in rows]
 
         results = []
-        for line in db.origin_metadata_get(origin_url, authority_id, after, limit, cur):
-            row = dict(zip(db.origin_metadata_get_cols, line))
+        for row in rows:
+            row = row.copy()
+            row.pop("metadata_fetcher.id")
             results.append(
                 {
                     "origin_url": row.pop("origin.url"),
@@ -1304,7 +1325,24 @@
                     **row,
                 }
             )
-        return results
+
+        if len(results) > limit:
+            results.pop()
+            assert len(results) == limit
+            last_returned_row = rows[-2]  # rows[-1] corresponds to the popped result
+            next_page_token: Optional[bytes] = msgpack_dumps(
+                (
+                    last_returned_row["discovery_date"],
+                    last_returned_row["metadata_fetcher.id"],
+                )
+            )
+        else:
+            next_page_token = None
+
+        return {
+            "next_page_token": next_page_token,
+            "results": results,
+        }
 
     @timed
     @db_transaction()
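All three backends share the same idiom for deciding whether another page
exists: fetch ``limit + 1`` entries, and let the presence of the extra one
trigger a ``next_page_token``. A backend-agnostic sketch of that idiom (all
names hypothetical)::

    from typing import Any, Callable, Dict, List, Optional

    def paginate(rows: List[Any], limit: int,
                 make_token: Callable[[Any], bytes]) -> Dict[str, Any]:
        """`rows` must have been fetched with a limit of `limit + 1`."""
        if len(rows) > limit:
            rows = rows[:limit]  # drop the sentinel row; it belongs to the next page
            next_page_token: Optional[bytes] = make_token(rows[-1])
        else:
            next_page_token = None
        return {"next_page_token": next_page_token, "results": rows}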
diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py
--- a/swh/storage/tests/test_storage.py
+++ b/swh/storage/tests/test_storage.py
@@ -3216,13 +3216,10 @@
         swh_storage.origin_metadata_add(**data.origin_metadata)
         swh_storage.origin_metadata_add(**data.origin_metadata2)
 
-        swh_storage.origin_metadata_get(origin["url"], authority)
-
+        result = swh_storage.origin_metadata_get(origin["url"], authority)
+        assert result["next_page_token"] is None
         assert [data.origin_metadata, data.origin_metadata2] == list(
-            sorted(
-                swh_storage.origin_metadata_get(origin["url"], authority),
-                key=lambda x: x["discovery_date"],
-            )
+            sorted(result["results"], key=lambda x: x["discovery_date"],)
         )
 
     def test_origin_metadata_add_duplicate(self, swh_storage):
@@ -3245,13 +3242,10 @@
         swh_storage.origin_metadata_add(**data.origin_metadata2)
         swh_storage.origin_metadata_add(**new_origin_metadata2)
 
-        swh_storage.origin_metadata_get(origin["url"], authority)
-
+        result = swh_storage.origin_metadata_get(origin["url"], authority)
+        assert result["next_page_token"] is None
         assert [data.origin_metadata, new_origin_metadata2] == list(
-            sorted(
-                swh_storage.origin_metadata_get(origin["url"], authority),
-                key=lambda x: x["discovery_date"],
-            )
+            sorted(result["results"], key=lambda x: x["discovery_date"],)
         )
 
     def test_origin_metadata_add_dict(self, swh_storage):
@@ -3294,23 +3288,109 @@
         swh_storage.origin_metadata_add(**origin1_metadata3)
         swh_storage.origin_metadata_add(**origin2_metadata)
 
+        result = swh_storage.origin_metadata_get(origin_url1, authority)
+        assert result["next_page_token"] is None
         assert [origin1_metadata1, origin1_metadata2] == list(
-            sorted(
-                swh_storage.origin_metadata_get(origin_url1, authority),
-                key=lambda x: x["discovery_date"],
-            )
+            sorted(result["results"], key=lambda x: x["discovery_date"],)
         )
 
+        result = swh_storage.origin_metadata_get(origin_url1, authority2)
+        assert result["next_page_token"] is None
         assert [origin1_metadata3] == list(
-            sorted(
-                swh_storage.origin_metadata_get(origin_url1, authority2),
-                key=lambda x: x["discovery_date"],
-            )
+            sorted(result["results"], key=lambda x: x["discovery_date"],)
+        )
+
+        result = swh_storage.origin_metadata_get(origin_url2, authority)
+        assert result["next_page_token"] is None
+        assert [origin2_metadata] == list(result["results"],)
+
+    def test_origin_metadata_get_after(self, swh_storage):
+        origin = data.origin
+        fetcher = data.metadata_fetcher
+        authority = data.metadata_authority
+        swh_storage.origin_add([origin])[0]
+
+        swh_storage.metadata_fetcher_add(**fetcher)
+        swh_storage.metadata_authority_add(**authority)
+
+        swh_storage.origin_metadata_add(**data.origin_metadata)
+        swh_storage.origin_metadata_add(**data.origin_metadata2)
+
+        result = swh_storage.origin_metadata_get(
+            origin["url"],
+            authority,
+            after=data.origin_metadata["discovery_date"] - timedelta(seconds=1),
+        )
+        assert result["next_page_token"] is None
+        assert [data.origin_metadata, data.origin_metadata2] == list(
+            sorted(result["results"], key=lambda x: x["discovery_date"],)
+        )
+
+        result = swh_storage.origin_metadata_get(
+            origin["url"], authority, after=data.origin_metadata["discovery_date"]
+        )
+        assert result["next_page_token"] is None
+        assert [data.origin_metadata2] == result["results"]
+
+        result = swh_storage.origin_metadata_get(
+            origin["url"], authority, after=data.origin_metadata2["discovery_date"]
+        )
+        assert result["next_page_token"] is None
+        assert [] == result["results"]
+
+    def test_origin_metadata_get_paginate(self, swh_storage):
+        origin = data.origin
+        fetcher = data.metadata_fetcher
+        authority = data.metadata_authority
+        swh_storage.origin_add([origin])[0]
+
+        swh_storage.metadata_fetcher_add(**fetcher)
+        swh_storage.metadata_authority_add(**authority)
+
+        swh_storage.origin_metadata_add(**data.origin_metadata)
+        swh_storage.origin_metadata_add(**data.origin_metadata2)
+
+        swh_storage.origin_metadata_get(origin["url"], authority)
+
+        result = swh_storage.origin_metadata_get(origin["url"], authority, limit=1)
+        assert result["next_page_token"] is not None
+        assert [data.origin_metadata] == result["results"]
+
+        result = swh_storage.origin_metadata_get(
+            origin["url"], authority, limit=1, page_token=result["next_page_token"]
         )
+        assert result["next_page_token"] is None
+        assert [data.origin_metadata2] == result["results"]
+
+    def test_origin_metadata_get_paginate_same_date(self, swh_storage):
+        origin = data.origin
+        fetcher1 = data.metadata_fetcher
+        fetcher2 = data.metadata_fetcher2
+        authority = data.metadata_authority
+        swh_storage.origin_add([origin])[0]
+
+        swh_storage.metadata_fetcher_add(**fetcher1)
+        swh_storage.metadata_fetcher_add(**fetcher2)
+        swh_storage.metadata_authority_add(**authority)
+
+        origin_metadata2 = {
+            **data.origin_metadata2,
+            "discovery_date": data.origin_metadata2["discovery_date"],
+            "fetcher": {"name": fetcher2["name"], "version": fetcher2["version"],},
+        }
+
+        swh_storage.origin_metadata_add(**data.origin_metadata)
+        swh_storage.origin_metadata_add(**origin_metadata2)
+
+        result = swh_storage.origin_metadata_get(origin["url"], authority, limit=1)
+        assert result["next_page_token"] is not None
+        assert [data.origin_metadata] == result["results"]
 
-        assert [origin2_metadata] == list(
-            swh_storage.origin_metadata_get(origin_url2, authority)
+        result = swh_storage.origin_metadata_get(
+            origin["url"], authority, limit=1, page_token=result["next_page_token"]
         )
+        assert result["next_page_token"] is None
+        assert [origin_metadata2] == result["results"]
 
 
 class TestStorageGeneratedData:
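The new tests walk pages of size 1, including the case where two entries share
a discovery date. A reusable helper in the same spirit (hypothetical, not part
of this diff) could check, for any ``limit``, that paginated traversal
reproduces the unpaginated listing::

    def assert_pagination_consistent(swh_storage, origin_url, authority, limit):
        """Collect all pages of size `limit` and compare with a single listing."""
        expected = swh_storage.origin_metadata_get(origin_url, authority)["results"]
        collected, page_token = [], None
        while True:
            page = swh_storage.origin_metadata_get(
                origin_url, authority, page_token=page_token, limit=limit
            )
            collected.extend(page["results"])
            page_token = page["next_page_token"]
            if page_token is None:
                break
        assert collected == expected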