diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py
--- a/swh/storage/cassandra/cql.py
+++ b/swh/storage/cassandra/cql.py
@@ -1112,6 +1112,21 @@
             OriginVisitStatusRow.from_dict, self._execute_with_retries(statement, args)
         )
 
+    @_prepared_select_statement(
+        OriginVisitStatusRow,
+        "WHERE origin = ? AND visit >= ? AND visit <= ? ORDER BY visit ASC, date ASC",
+    )
+    def origin_visit_status_get_all_range(
+        self, origin_url: str, first_visit: int, last_visit: int, *, statement,
+    ) -> Iterable[OriginVisitStatusRow]:
+        """Iterate over the statuses of visits first_visit..last_visit (inclusive)
+        of an origin, ordered by visit id then date."""
+        args = (origin_url, first_visit, last_visit)
+
+        return map(
+            OriginVisitStatusRow.from_dict, self._execute_with_retries(statement, args)
+        )
+
     @_prepared_insert_statement(OriginVisitStatusRow)
     def origin_visit_status_add_one(
         self, visit_update: OriginVisitStatusRow, *, statement
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -1249,6 +1249,57 @@
 
         return PagedResult(results=visits, next_page_token=next_page_token)
 
+    def origin_visit_status_latest_get_all(
+        self,
+        origin: str,
+        page_token: Optional[str] = None,
+        order: ListOrder = ListOrder.ASC,
+        limit: int = 10,
+    ) -> PagedResult[OriginVisitStatus]:
+        """List, one page at a time, the latest status of each visit of an origin.
+
+        An origin without any matching visit yields an empty page.
+        """
+        next_page_token = None
+        visit_from = None if page_token is None else int(page_token)
+        extra_limit = limit + 1
+
+        # Take one more visit than requested: getting it back means there is a
+        # next page
+        rows = self._cql_runner.origin_visit_get(origin, visit_from, extra_limit, order)
+        visits = [converters.row_to_visit(row) for row in rows]
+
+        if len(visits) == extra_limit:
+            # excluding that visit from the result to respect the limit size
+            visits = visits[:limit]
+            # last visit id is the next page token
+            next_page_token = str(visits[-1].visit)
+
+        visit_ids = [visit.visit for visit in visits if visit.visit is not None]
+        if not visit_ids:
+            # No visit to report (e.g. no visit for this origin): skip the lookup
+            return PagedResult(results=[], next_page_token=next_page_token)
+
+        statuses_rows = self._cql_runner.origin_visit_status_get_all_range(
+            origin, min(visit_ids), max(visit_ids)
+        )
+        # Rows are ordered by (visit, date) ascending, so the last status seen
+        # for a visit is its latest one
+        visit_statuses = {}
+        for status_row in statuses_rows:
+            visit_status = converters.row_to_visit_status(status_row)
+            visit_statuses[visit_status.visit] = visit_status
+
+        # A visit whose status cannot be read (yet) is left out instead of
+        # failing the whole page with a KeyError
+        results = [
+            visit_statuses[visit_id]
+            for visit_id in visit_ids
+            if visit_id in visit_statuses
+        ]
+
+        return PagedResult(results=results, next_page_token=next_page_token)
+
     def origin_visit_status_get(
         self,
         origin: str,
diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py
--- a/swh/storage/in_memory.py
+++ b/swh/storage/in_memory.py
@@ -567,6 +567,20 @@
 
         return statuses[0:limit]
 
+    def origin_visit_status_get_all_range(
+        self, origin: str, first_visit: int, last_visit: int
+    ) -> Iterable[OriginVisitStatusRow]:
+        """Return the statuses of visits first_visit..last_visit (inclusive) of an
+        origin, sorted by visit id then date."""
+        statuses = [
+            s
+            for s in self._origin_visit_statuses.get_from_partition_key((origin,))
+            if first_visit <= s.visit <= last_visit
+        ]
+        statuses.sort(key=lambda s: (s.visit, s.date))
+
+        return statuses
+
     def origin_visit_status_add_one(self, visit_update: OriginVisitStatusRow) -> None:
         self._origin_visit_statuses.insert(visit_update)
         self.increment_counter("origin_visit_status", 1)
diff --git a/swh/storage/interface.py b/swh/storage/interface.py
--- a/swh/storage/interface.py
+++ b/swh/storage/interface.py
@@ -1047,6 +1047,27 @@
         """
         ...
 
+    @remote_api_endpoint("origin/visit_status/get_all_latest")
+    def origin_visit_status_latest_get_all(
+        self,
+        origin: str,
+        page_token: Optional[str] = None,
+        order: ListOrder = ListOrder.ASC,
+        limit: int = 10,
+    ) -> PagedResult[OriginVisitStatus]:
+        """Retrieve a page of the latest OriginVisitStatus of each visit of an origin.
+
+        Args:
+            origin: The visited origin
+            page_token: opaque string used to get the next results of a search
+            order: Order on visit ids to list the statuses in (default to asc)
+            limit: Maximum number of visit statuses to return
+
+        Returns: Page of OriginVisitStatus data model objects. If next_page_token is
+            None, there is no more data to retrieve.
+        """
+        ...
+
     @remote_api_endpoint("origin/visit_status/get_random")
     def origin_visit_status_get_random(self, type: str) -> Optional[OriginVisitStatus]:
         """Randomly select one successful origin visit with
diff --git a/swh/storage/postgresql/db.py b/swh/storage/postgresql/db.py
--- a/swh/storage/postgresql/db.py
+++ b/swh/storage/postgresql/db.py
@@ -672,6 +672,44 @@
         cur.execute(query, tuple(query_params))
         yield from cur
 
+    def origin_visit_status_latest_get_range(
+        self, origin: str, visit_from: int, order: ListOrder, limit: int, cur=None,
+    ):
+        """Yield the latest status row of each visit of an origin, sorted by visit id.
+
+        A positive visit_from restricts the result to the visits located after that
+        id in the requested order.
+        """
+        cur = self._cursor(cur)
+
+        query_parts = [
+            " SELECT DISTINCT ON (ovs.visit) ",
+            ", ".join(self.origin_visit_status_select_cols),
+            " FROM origin_visit_status ovs",
+            " INNER JOIN origin o ON o.id = ovs.origin",
+            "WHERE o.url = %s",
+        ]
+        query_params: List[Any] = [origin]
+
+        if visit_from > 0:
+            op_comparison = ">" if order == ListOrder.ASC else "<"
+            query_parts.append(f"and ovs.visit {op_comparison} %s")
+            query_params.append(visit_from)
+
+        # DISTINCT ON keeps the first row of each visit: sorting statuses by
+        # descending date makes that row the latest status
+        if order == ListOrder.ASC:
+            query_parts.append("ORDER BY ovs.visit ASC, ovs.date DESC")
+        elif order == ListOrder.DESC:
+            query_parts.append("ORDER BY ovs.visit DESC, ovs.date DESC")
+
+        query_parts.append("LIMIT %s")
+        query_params.append(limit)
+
+        query = "\n".join(query_parts)
+        cur.execute(query, tuple(query_params))
+        yield from cur
+
     def origin_visit_get(self, origin_id, visit_id, cur=None):
         """Retrieve information on visit visit_id of origin origin_id.
 
diff --git a/swh/storage/postgresql/storage.py b/swh/storage/postgresql/storage.py
--- a/swh/storage/postgresql/storage.py
+++ b/swh/storage/postgresql/storage.py
@@ -1109,6 +1109,63 @@
 
         return PagedResult(results=visits, next_page_token=next_page_token)
 
+    @db_transaction(statement_timeout=500)
+    def origin_visit_status_latest_get_all(
+        self,
+        origin: str,
+        page_token: Optional[str] = None,
+        order: ListOrder = ListOrder.ASC,
+        limit: int = 10,
+        *,
+        db: Db,
+        cur=None,
+    ) -> PagedResult[OriginVisitStatus]:
+        """List, one page at a time, the latest status of each visit of an origin.
+
+        Raises:
+            StorageArgumentException: if order is not a ListOrder, or if page_token
+                is not a string holding a visit id.
+        """
+        page_token = page_token or "0"
+        if not isinstance(order, ListOrder):
+            raise StorageArgumentException("order must be a ListOrder value")
+        if not isinstance(page_token, str):
+            raise StorageArgumentException("page_token must be a string.")
+        try:
+            visit_from = int(page_token)
+        except ValueError:
+            # Report a malformed token as an argument error, like the checks above
+            raise StorageArgumentException(
+                "page_token must be a string holding a visit id"
+            ) from None
+
+        # Fetch one extra row: getting it back means there is a next page
+        extra_limit = limit + 1
+        statuses: List[OriginVisitStatus] = []
+        for row in db.origin_visit_status_latest_get_range(
+            origin, visit_from=visit_from, order=order, limit=extra_limit, cur=cur
+        ):
+            row_d = dict(zip(db.origin_visit_status_cols, row))
+            statuses.append(
+                OriginVisitStatus(
+                    origin=row_d["origin"],
+                    visit=row_d["visit"],
+                    date=row_d["date"],
+                    status=row_d["status"],
+                    snapshot=row_d["snapshot"],
+                    metadata=row_d["metadata"],
+                    type=row_d["type"],
+                ),
+            )
+
+        next_page_token = None
+        if len(statuses) == extra_limit:
+            # Drop the extra row; the last returned visit id is the next token
+            statuses = statuses[:limit]
+            next_page_token = str(statuses[-1].visit)
+
+        return PagedResult(results=statuses, next_page_token=next_page_token)
+
     @db_transaction(statement_timeout=1000)
     def origin_visit_find_by_date(
         self, origin: str, visit_date: datetime.datetime, *, db: Db, cur=None
diff --git a/swh/storage/tests/storage_tests.py b/swh/storage/tests/storage_tests.py
--- a/swh/storage/tests/storage_tests.py
+++ b/swh/storage/tests/storage_tests.py
@@ -1995,6 +1995,203 @@
         assert actual_page.next_page_token is None
         assert actual_page.results == [ov1]
 
+    def test_origin_visit_status_latest_get_all(self, swh_storage, sample_data):
+        origin = sample_data.origin
+        swh_storage.origin_add([origin])
+        ov1, ov2, ov3 = swh_storage.origin_visit_add(
+            [
+                OriginVisit(
+                    origin=origin.url,
+                    date=sample_data.date_visit1,
+                    type=sample_data.type_visit1,
+                ),
+                OriginVisit(
+                    origin=origin.url,
+                    date=sample_data.date_visit2,
+                    type=sample_data.type_visit2,
+                ),
+                OriginVisit(
+                    origin=origin.url,
+                    date=sample_data.date_visit2,
+                    type=sample_data.type_visit2,
+                ),
+            ]
+        )
+
+        swh_storage.origin_visit_status_add(
+            [
+                OriginVisitStatus(
+                    origin=origin.url,
+                    visit=ov1.visit,
+                    date=sample_data.date_visit1 + datetime.timedelta(hours=1),
+                    type=sample_data.type_visit1,
+                    status="created",
+                    snapshot=None,
+                ),
+                OriginVisitStatus(
+                    origin=origin.url,
+                    visit=ov2.visit,
+                    date=sample_data.date_visit2 + datetime.timedelta(hours=1),
+                    type=sample_data.type_visit2,
+                    status="created",
+                    snapshot=None,
+                ),
+                OriginVisitStatus(
+                    origin=origin.url,
+                    visit=ov3.visit,
+                    date=sample_data.date_visit2 + datetime.timedelta(hours=1),
+                    type=sample_data.type_visit2,
+                    status="created",
+                    snapshot=None,
+                ),
+            ]
+        )
+
+        # Strictly later dates, so that "latest" is decided by the date alone
+        swh_storage.origin_visit_status_add(
+            [
+                OriginVisitStatus(
+                    origin=origin.url,
+                    visit=ov1.visit,
+                    date=sample_data.date_visit1 + datetime.timedelta(hours=2),
+                    type=sample_data.type_visit1,
+                    status="full",
+                    snapshot=sample_data.snapshots[0].id,
+                ),
+                OriginVisitStatus(
+                    origin=origin.url,
+                    visit=ov2.visit,
+                    date=sample_data.date_visit2 + datetime.timedelta(hours=2),
+                    type=sample_data.type_visit2,
+                    status="full",
+                    snapshot=sample_data.snapshots[1].id,
+                ),
+                OriginVisitStatus(
+                    origin=origin.url,
+                    visit=ov3.visit,
+                    date=sample_data.date_visit2 + datetime.timedelta(hours=2),
+                    type=sample_data.type_visit2,
+                    status="full",
+                    snapshot=sample_data.snapshots[2].id,
+                ),
+            ]
+        )
+
+        ovs1 = swh_storage.origin_visit_status_get_latest(origin.url, visit=ov1.visit)
+        assert ovs1.status == "full"
+
+        ovs2 = swh_storage.origin_visit_status_get_latest(origin.url, visit=ov2.visit)
+        assert ovs2.status == "full"
+
+        ovs3 = swh_storage.origin_visit_status_get_latest(origin.url, visit=ov3.visit)
+        assert ovs3.status == "full"
+
+        # order asc, no token, no limit
+        actual_page = swh_storage.origin_visit_status_latest_get_all(origin.url)
+        assert actual_page.next_page_token is None
+        assert actual_page.results == [ovs1, ovs2, ovs3]
+
+        # order asc, no token, limit
+        actual_page = swh_storage.origin_visit_status_latest_get_all(
+            origin.url, limit=2
+        )
+        next_page_token = actual_page.next_page_token
+        assert next_page_token is not None
+        assert actual_page.results == [ovs1, ovs2]
+
+        # order asc, token, no limit
+        actual_page = swh_storage.origin_visit_status_latest_get_all(
+            origin.url, page_token=next_page_token
+        )
+        assert actual_page.next_page_token is None
+        assert actual_page.results == [ovs3]
+
+        # order asc, no token, limit
+        actual_page = swh_storage.origin_visit_status_latest_get_all(
+            origin.url, limit=1
+        )
+        next_page_token = actual_page.next_page_token
+        assert next_page_token is not None
+        assert actual_page.results == [ovs1]
+
+        # order asc, token, no limit
+        actual_page = swh_storage.origin_visit_status_latest_get_all(
+            origin.url, page_token=next_page_token
+        )
+        assert actual_page.next_page_token is None
+        assert actual_page.results == [ovs2, ovs3]
+
+        # order asc, token, limit
+        actual_page = swh_storage.origin_visit_status_latest_get_all(
+            origin.url, page_token=next_page_token, limit=2
+        )
+        assert actual_page.next_page_token is None
+        assert actual_page.results == [ovs2, ovs3]
+
+        actual_page = swh_storage.origin_visit_status_latest_get_all(
+            origin.url, page_token=next_page_token, limit=1
+        )
+        next_page_token = actual_page.next_page_token
+        assert next_page_token is not None
+        assert actual_page.results == [ovs2]
+
+        actual_page = swh_storage.origin_visit_status_latest_get_all(
+            origin.url, page_token=next_page_token, limit=1
+        )
+        assert actual_page.next_page_token is None
+        assert actual_page.results == [ovs3]
+
+        # order desc, no token, no limit
+        actual_page = swh_storage.origin_visit_status_latest_get_all(
+            origin.url, order=ListOrder.DESC
+        )
+        assert actual_page.next_page_token is None
+        assert actual_page.results == [ovs3, ovs2, ovs1]
+
+        # order desc, no token, limit
+        actual_page = swh_storage.origin_visit_status_latest_get_all(
+            origin.url, limit=2, order=ListOrder.DESC
+        )
+        next_page_token = actual_page.next_page_token
+        assert next_page_token is not None
+        assert actual_page.results == [ovs3, ovs2]
+
+        # order desc, token, no limit
+        actual_page = swh_storage.origin_visit_status_latest_get_all(
+            origin.url, page_token=next_page_token, order=ListOrder.DESC
+        )
+        assert actual_page.next_page_token is None
+        assert actual_page.results == [ovs1]
+
+        # order desc, no token, limit
+        actual_page = swh_storage.origin_visit_status_latest_get_all(
+            origin.url, limit=1, order=ListOrder.DESC
+        )
+        next_page_token = actual_page.next_page_token
+        assert next_page_token is not None
+        assert actual_page.results == [ovs3]
+
+        # order desc, token, no limit
+        actual_page = swh_storage.origin_visit_status_latest_get_all(
+            origin.url, page_token=next_page_token, order=ListOrder.DESC
+        )
+        assert actual_page.next_page_token is None
+        assert actual_page.results == [ovs2, ovs1]
+
+        # order desc, token, limit
+        actual_page = swh_storage.origin_visit_status_latest_get_all(
+            origin.url, page_token=next_page_token, order=ListOrder.DESC, limit=1
+        )
+        next_page_token = actual_page.next_page_token
+        assert next_page_token is not None
+        assert actual_page.results == [ovs2]
+
+        actual_page = swh_storage.origin_visit_status_latest_get_all(
+            origin.url, page_token=next_page_token, order=ListOrder.DESC
+        )
+        assert actual_page.next_page_token is None
+        assert actual_page.results == [ovs1]
+
     def test_origin_visit_status_get__unknown_cases(self, swh_storage, sample_data):
         origin = sample_data.origin
         actual_page = swh_storage.origin_visit_status_get("foobar", 1)