diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py --- a/swh/storage/cassandra/cql.py +++ b/swh/storage/cassandra/cql.py @@ -756,6 +756,80 @@ origin_visit_get_method = getattr(self, method_name) return origin_visit_get_method(*args) + @_prepared_statement( + "SELECT * FROM origin_visit_status WHERE origin = ? AND visit = ? AND date > ? " + "ORDER BY date ASC " + "LIMIT ?" + ) + def _origin_visit_get_token_asc_limit( + self, + origin: str, + visit: int, + date_from: datetime.datetime, + limit: int, + *, + statement, + ) -> ResultSet: + return self._execute_with_retries(statement, [origin, visit, date_from, limit]) + + @_prepared_statement( + "SELECT * FROM origin_visit_status WHERE origin = ? AND visit = ? AND date < ? " + "ORDER BY visit DESC " + "LIMIT ?" + ) + def _origin_visit_get_token_desc_limit( + self, + origin: str, + visit: int, + date_from: datetime.datetime, + limit: int, + *, + statement, + ) -> ResultSet: + return self._execute_with_retries(statement, [origin, visit, date_from, limit]) + + @_prepared_statement( + "SELECT * FROM origin_visit_status WHERE origin = ? AND visit = ? " + "ORDER BY visit ASC " + "LIMIT ?" + ) + def _origin_visit_get_no_token_asc_limit( + self, origin: str, visit: int, limit: int, *, statement + ) -> ResultSet: + return self._execute_with_retries(statement, [origin, visit, limit]) + + @_prepared_statement( + "SELECT * FROM origin_visit_status WHERE origin = ? AND visit = ? " + "ORDER BY visit DESC " + "LIMIT ?" + ) + def _origin_visit_get_no_token_desc_limit( + self, origin: str, visit: int, limit: int, *, statement + ) -> ResultSet: + return self._execute_with_retries(statement, [origin, visit, limit]) + + def origin_visit_status_get_range( + self, + origin: str, + visit: int, + date_from: Optional[datetime.datetime], + limit: int, + order: ListOrder, + ) -> ResultSet: + args: List[Any] = [origin, visit] + + if date_from is not None: + page_name = "token" + args.append(date_from) + else: + page_name = "no_token" + + args.append(limit) + + method_name = f"_origin_visit_get_{page_name}_{order.value}_limit" + origin_visit_get_method = getattr(self, method_name) + return origin_visit_get_method(*args) + @_prepared_insert_statement("origin_visit", _origin_visit_keys) def origin_visit_add_one(self, visit: OriginVisit, *, statement) -> None: self._add_one(statement, "origin_visit", visit, self._origin_visit_keys) diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py --- a/swh/storage/cassandra/storage.py +++ b/swh/storage/cassandra/storage.py @@ -862,6 +862,40 @@ return PagedResult(results=visits, next_page_token=next_page_token) + def origin_visit_status_get( + self, + origin: str, + visit: int, + page_token: Optional[str] = None, + order: ListOrder = ListOrder.ASC, + limit: int = 10, + ) -> PagedResult[OriginVisitStatus]: + if not isinstance(order, ListOrder): + raise StorageArgumentException("order must be a ListOrder value") + if page_token and not isinstance(page_token, str): + raise StorageArgumentException("page_token must be a string.") + + next_page_token = None + if page_token is None: + date_from = None + else: + date_from = datetime.datetime.fromisoformat(page_token) + visit_statuses: List[OriginVisitStatus] = [] + extra_limit = limit + 1 + + rows = self._cql_runner.origin_visit_status_get_range( + origin, visit, date_from, extra_limit, order + ) + for row in rows: + visit_statuses.append(converters.row_to_visit_status(row)) + + assert len(visit_statuses) <= extra_limit + if len(visit_statuses) == extra_limit: + visit_statuses = visit_statuses[:limit] + next_page_token = str(visit_statuses[-1].date) + + return PagedResult(results=visit_statuses, next_page_token=next_page_token) + def origin_visit_find_by_date( self, origin: str, visit_date: datetime.datetime ) -> Optional[OriginVisit]: diff --git a/swh/storage/db.py b/swh/storage/db.py --- a/swh/storage/db.py +++ b/swh/storage/db.py @@ -574,8 +574,14 @@ row = cur.fetchone() return self._make_origin_visit_status(row) - def origin_visit_get_all( - self, origin_id, last_visit=None, order="asc", limit=None, cur=None + def origin_visit_status_get_range( + self, + origin: str, + visit: int, + date_from: Optional[datetime.datetime], + order: ListOrder, + limit: int, + cur=None, ): """Retrieve all visits for origin with id origin_id. @@ -587,34 +593,29 @@ """ cur = self._cursor(cur) - assert order.lower() in ["asc", "desc"] query_parts = [ - "SELECT DISTINCT ON (ov.visit) %s " - % ", ".join(self.origin_visit_select_cols), - "FROM origin_visit ov", - "INNER JOIN origin o ON o.id = ov.origin", - "INNER JOIN origin_visit_status ovs", - "ON ov.origin = ovs.origin AND ov.visit = ovs.visit", + f"SELECT {', '.join(self.origin_visit_status_select_cols)} " + "FROM origin_visit_status ovs ", + "INNER JOIN origin o ON o.id = ovs.origin ", ] - query_parts.append("WHERE o.url = %s") - query_params: List[Any] = [origin_id] + query_parts.append("WHERE o.url = %s AND ovs.visit = %s ") + query_params: List[Any] = [origin, visit] - if last_visit is not None: - op_comparison = ">" if order == "asc" else "<" - query_parts.append(f"and ov.visit {op_comparison} %s") - query_params.append(last_visit) + if date_from is not None: + op_comparison = ">" if order == ListOrder.ASC else "<" + query_parts.append(f"and ovs.date {op_comparison} %s ") + query_params.append(date_from) - if order == "asc": - query_parts.append("ORDER BY ov.visit ASC, ovs.date DESC") - elif order == "desc": - query_parts.append("ORDER BY ov.visit DESC, ovs.date DESC") + if order == ListOrder.ASC: + query_parts.append("ORDER BY ovs.date ASC ") + elif order == ListOrder.DESC: + query_parts.append("ORDER BY ovs.date DESC ") else: assert False - if limit is not None: - query_parts.append("LIMIT %s") - query_params.append(limit) + query_parts.append("LIMIT %s") + query_params.append(limit) query = "\n".join(query_parts) cur.execute(query, tuple(query_params)) diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py --- a/swh/storage/in_memory.py +++ b/swh/storage/in_memory.py @@ -946,6 +946,46 @@ return None + def origin_visit_status_get( + self, + origin: str, + visit: int, + page_token: Optional[str] = None, + order: ListOrder = ListOrder.ASC, + limit: int = 10, + ) -> PagedResult[OriginVisitStatus]: + if not isinstance(order, ListOrder): + raise StorageArgumentException("order must be a ListOrder value") + if page_token and not isinstance(page_token, str): + raise StorageArgumentException("page_token must be a string.") + + next_page_token = None + if page_token is None: + date_from = None + else: + date_from = datetime.datetime.fromisoformat(page_token) + extra_limit = limit + 1 + visit_statuses = sorted( + self._origin_visit_statuses.get((origin, visit), []), + key=lambda v: v.date, + reverse=(order == ListOrder.DESC), + ) + + if date_from is not None: + if order == ListOrder.ASC: + visit_statuses = [v for v in visit_statuses if v.date > date_from] + elif order == ListOrder.DESC: + visit_statuses = [v for v in visit_statuses if v.date < date_from] + + visit_statuses = visit_statuses[:extra_limit] + + assert len(visit_statuses) <= extra_limit + if len(visit_statuses) == extra_limit: + visit_statuses = visit_statuses[:limit] + next_page_token = str(visit_statuses[-1].date) + + return PagedResult(results=visit_statuses, next_page_token=next_page_token) + def origin_visit_status_get_latest( self, origin_url: str, diff --git a/swh/storage/interface.py b/swh/storage/interface.py --- a/swh/storage/interface.py +++ b/swh/storage/interface.py @@ -891,6 +891,34 @@ """ ... + @remote_api_endpoint("origin/visit_status/get") + def origin_visit_status_get( + self, + origin: str, + visit: int, + page_token: Optional[str] = None, + order: ListOrder = ListOrder.ASC, + limit: int = 10, + ) -> PagedResult[OriginVisitStatus]: + """Retrieve page of OriginVisitStatus information. + + Args: + origin: The visited origin + visit: The visit identifier + page_token: opaque string used to get the next results of a search + order: Order on visit status objects to list (default to asc) + limit: Number of visit statuses to return + + Raises: + StorageArgumentException if the order is wrong or the page_token type is + mistyped. + + Returns: Page of OriginVisitStatus data model objects. if next_page_token is + None, there is no longer data to retrieve. + + """ + ... + @remote_api_endpoint("origin/visit_status/get_latest") def origin_visit_status_get_latest( self, diff --git a/swh/storage/storage.py b/swh/storage/storage.py --- a/swh/storage/storage.py +++ b/swh/storage/storage.py @@ -971,6 +971,53 @@ return visit return None + @timed + @db_transaction(statement_timeout=500) + def origin_visit_status_get( + self, + origin: str, + visit: int, + page_token: Optional[str] = None, + order: ListOrder = ListOrder.ASC, + limit: int = 10, + db=None, + cur=None, + ) -> PagedResult[OriginVisitStatus]: + if not isinstance(order, ListOrder): + raise StorageArgumentException("order must be a ListOrder value") + if page_token and not isinstance(page_token, str): + raise StorageArgumentException("page_token must be a string.") + + next_page_token = None + if page_token is None: + date_from = None + else: + date_from = datetime.datetime.fromisoformat(page_token) + visit_statuses: List[OriginVisitStatus] = [] + extra_limit = limit + 1 + for row in db.origin_visit_status_get_range( + origin, visit, date_from=date_from, order=order, limit=extra_limit, cur=cur, + ): + row_d = dict(zip(db.origin_visit_status_cols, row)) + visit_statuses.append( + OriginVisitStatus( + origin=row_d["origin"], + visit=row_d["visit"], + date=row_d["date"], + status=row_d["status"], + snapshot=row_d["snapshot"], + metadata=row_d["metadata"], + ) + ) + + assert len(visit_statuses) <= extra_limit + + if len(visit_statuses) == extra_limit: + visit_statuses = visit_statuses[:limit] + next_page_token = str(visit_statuses[-1].date) + + return PagedResult(results=visit_statuses, next_page_token=next_page_token) + @timed @db_transaction() def origin_visit_status_get_random( diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py --- a/swh/storage/tests/test_storage.py +++ b/swh/storage/tests/test_storage.py @@ -1295,6 +1295,211 @@ ) assert actual_page == PagedResult(results=[ov1]) + def test_origin_visit_status_get__unknown_cases(self, swh_storage, sample_data): + origin = sample_data.origin + actual_page = swh_storage.origin_visit_status_get("foobar", 1) + assert actual_page.next_page_token is None + assert actual_page.results == [] + + actual_page = swh_storage.origin_visit_status_get(origin.url, 1) + assert actual_page.next_page_token is None + assert actual_page.results == [] + + origin = sample_data.origin + swh_storage.origin_add([origin]) + ov1 = swh_storage.origin_visit_add( + [ + OriginVisit( + origin=origin.url, + date=sample_data.date_visit1, + type=sample_data.type_visit1, + ), + ] + )[0] + actual_page = swh_storage.origin_visit_status_get(origin.url, ov1.visit + 10) + assert actual_page.next_page_token is None + assert actual_page.results == [] + + def test_origin_visit_status_get__validation_failure( + self, swh_storage, sample_data + ): + origin = sample_data.origin + swh_storage.origin_add([origin]) + ov1 = swh_storage.origin_visit_add( + [ + OriginVisit( + origin=origin.url, + date=sample_data.date_visit1, + type=sample_data.type_visit1, + ), + ] + )[0] + + with pytest.raises( + StorageArgumentException, match="page_token must be a string" + ): + # page_token not str + swh_storage.origin_visit_status_get(origin.url, ov1.visit, page_token=10) + + with pytest.raises( + StorageArgumentException, match="order must be a ListOrder value" + ): + # wrong order + swh_storage.origin_visit_status_get(origin.url, ov1.visit, order="foobar") + + def test_origin_visit_status_get_all(self, swh_storage, sample_data): + origin = sample_data.origin + swh_storage.origin_add([origin]) + date_visit3 = round_to_milliseconds(now()) + date_visit1 = date_visit3 - datetime.timedelta(hours=2) + date_visit2 = date_visit3 - datetime.timedelta(hours=1) + assert date_visit1 < date_visit2 + assert date_visit2 < date_visit3 + + ov1 = swh_storage.origin_visit_add( + [ + OriginVisit( + origin=origin.url, date=date_visit1, type=sample_data.type_visit1, + ), + ] + )[0] + + ovs1 = OriginVisitStatus( + origin=origin.url, + visit=ov1.visit, + date=date_visit1, + status="created", + snapshot=None, + ) + + ovs2 = OriginVisitStatus( + origin=origin.url, + visit=ov1.visit, + date=date_visit2, + status="partial", + snapshot=None, + ) + + ovs3 = OriginVisitStatus( + origin=origin.url, + visit=ov1.visit, + date=date_visit3, + status="full", + snapshot=sample_data.snapshot.id, + metadata={}, + ) + + swh_storage.origin_visit_status_add([ovs2, ovs3]) + + # order asc, no token, no limit + actual_page = swh_storage.origin_visit_status_get(origin.url, ov1.visit) + assert actual_page.next_page_token is None + assert actual_page.results == [ovs1, ovs2, ovs3] + + # order asc, no token, limit + actual_page = swh_storage.origin_visit_status_get( + origin.url, ov1.visit, limit=2 + ) + next_page_token = actual_page.next_page_token + assert next_page_token is not None + assert actual_page.results == [ovs1, ovs2] + + # order asc, token, no limit + actual_page = swh_storage.origin_visit_status_get( + origin.url, ov1.visit, page_token=next_page_token + ) + assert actual_page.next_page_token is None + assert actual_page.results == [ovs3] + + # order asc, no token, limit + actual_page = swh_storage.origin_visit_status_get( + origin.url, ov1.visit, limit=1 + ) + next_page_token = actual_page.next_page_token + assert next_page_token is not None + assert actual_page.results == [ovs1] + + actual_page = swh_storage.origin_visit_status_get( + origin.url, ov1.visit, page_token=next_page_token + ) + assert actual_page.next_page_token is None + assert actual_page.results == [ovs2, ovs3] + + # order asc, token, limit + actual_page = swh_storage.origin_visit_status_get( + origin.url, ov1.visit, page_token=next_page_token, limit=2 + ) + assert actual_page.next_page_token is None + assert actual_page.results == [ovs2, ovs3] + + # order asc, no token, limit + actual_page = swh_storage.origin_visit_status_get( + origin.url, ov1.visit, limit=2 + ) + next_page_token = actual_page.next_page_token + assert next_page_token is not None + assert actual_page.results == [ovs1, ovs2] + + actual_page = swh_storage.origin_visit_status_get( + origin.url, ov1.visit, page_token=next_page_token, limit=1 + ) + assert actual_page.next_page_token is None + assert actual_page.results == [ovs3] + + # order desc, no token, no limit + actual_page = swh_storage.origin_visit_status_get( + origin.url, ov1.visit, order=ListOrder.DESC + ) + assert actual_page.next_page_token is None + assert actual_page.results == [ovs3, ovs2, ovs1] + + # order desc, no token, limit + actual_page = swh_storage.origin_visit_status_get( + origin.url, ov1.visit, limit=2, order=ListOrder.DESC + ) + next_page_token = actual_page.next_page_token + assert next_page_token is not None + assert actual_page.results == [ovs3, ovs2] + + actual_page = swh_storage.origin_visit_status_get( + origin.url, ov1.visit, page_token=next_page_token, order=ListOrder.DESC + ) + assert actual_page.next_page_token is None + assert actual_page.results == [ovs1] + + # order desc, no token, limit + actual_page = swh_storage.origin_visit_status_get( + origin.url, ov1.visit, order=ListOrder.DESC, limit=1 + ) + next_page_token = actual_page.next_page_token + assert next_page_token is not None + assert actual_page.results == [ovs3] + + # order desc, token, no limit + actual_page = swh_storage.origin_visit_status_get( + origin.url, ov1.visit, page_token=next_page_token, order=ListOrder.DESC + ) + assert actual_page.next_page_token is None + assert actual_page.results == [ovs2, ovs1] + + # order desc, token, limit + actual_page = swh_storage.origin_visit_status_get( + origin.url, + ov1.visit, + page_token=next_page_token, + order=ListOrder.DESC, + limit=1, + ) + next_page_token = actual_page.next_page_token + assert next_page_token is not None + assert actual_page.results == [ovs2] + + actual_page = swh_storage.origin_visit_status_get( + origin.url, ov1.visit, page_token=next_page_token, order=ListOrder.DESC + ) + assert actual_page.next_page_token is None + assert actual_page.results == [ovs1] + def test_origin_visit_status_get_random(self, swh_storage, sample_data): origins = sample_data.origins[:2] swh_storage.origin_add(origins)