diff --git a/swh/storage/api/serializers.py b/swh/storage/api/serializers.py
--- a/swh/storage/api/serializers.py
+++ b/swh/storage/api/serializers.py
@@ -1,11 +1,11 @@
-# Copyright (C) 2020-2021 The Software Heritage developers
+# Copyright (C) 2020-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 """Decoder and encoders for swh-model objects."""
 
-from typing import Callable, Dict, List, Tuple
+from typing import Any, Callable, Dict, List, Tuple
 
 from swh.model import model, swhids
 from swh.storage import interface
@@ -24,6 +24,24 @@
 }
 
 
+def _encode_origin_visit_with_statuses(
+    ovws: interface.OriginVisitWithStatuses,
+) -> Dict[str, Any]:
+    """Turn a visit and its statuses into a dict of their ``to_dict()`` forms."""
+    visit_d = ovws.visit.to_dict()
+    statuses_d = [status.to_dict() for status in ovws.statuses]
+    return {"visit": visit_d, "statuses": statuses_d}
+
+
+def _decode_origin_visit_with_statuses(
+    ovws: Dict[str, Any],
+) -> interface.OriginVisitWithStatuses:
+    """Rebuild the object encoded by ``_encode_origin_visit_with_statuses``."""
+    visit = model.OriginVisit(**ovws["visit"])
+    statuses = [model.OriginVisitStatus(**status_d) for status_d in ovws["statuses"]]
+    return interface.OriginVisitWithStatuses(visit=visit, statuses=statuses)
+
+
 def _decode_model_enum(d):
     return getattr(model, d.pop("__type__"))(d["value"])
 
@@ -45,6 +63,11 @@
     (swhids.ObjectType, "identifiers_enum", _encode_enum),
     (model.MetadataAuthorityType, "model_enum", _encode_enum),
     (interface.ListOrder, "storage_enum", _encode_enum),
+    (
+        interface.OriginVisitWithStatuses,
+        "origin_visit_with_statuses",
+        _encode_origin_visit_with_statuses,
+    ),
 ]
 
 
@@ -57,4 +80,5 @@
     "swhids_enum": _decode_swhids_enum,
     "model_enum": _decode_model_enum,
     "storage_enum": _decode_storage_enum,
+    "origin_visit_with_statuses": _decode_origin_visit_with_statuses,
 }
diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py
--- a/swh/storage/cassandra/cql.py
+++ b/swh/storage/cassandra/cql.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2019-2020 The 
Software Heritage developers
+# Copyright (C) 2019-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -1112,6 +1112,21 @@
             OriginVisitStatusRow.from_dict, self._execute_with_retries(statement, args)
         )
 
+    @_prepared_select_statement(
+        OriginVisitStatusRow,
+        "WHERE origin = ? AND visit >= ? AND visit <= ? ORDER BY visit ASC, date ASC",
+    )
+    def origin_visit_status_get_all_range(
+        self, origin_url: str, visit_from: int, visit_to: int, *, statement,
+    ) -> Iterable[OriginVisitStatusRow]:
+        """Return the statuses of all visits of ``origin_url`` whose id is between
+        ``visit_from`` and ``visit_to`` (both included), by visit id then date."""
+        args = (origin_url, visit_from, visit_to)
+
+        return map(
+            OriginVisitStatusRow.from_dict, self._execute_with_retries(statement, args)
+        )
+
     @_prepared_insert_statement(OriginVisitStatusRow)
     def origin_visit_status_add_one(
         self, visit_update: OriginVisitStatusRow, *, statement
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -4,7 +4,7 @@
 # See top-level LICENSE file for more information
 
 import base64
-import collections
+from collections import defaultdict
 import datetime
 import itertools
 import operator
@@ -54,6 +54,7 @@
 from swh.storage.interface import (
     VISIT_STATUSES,
     ListOrder,
+    OriginVisitWithStatuses,
     PagedResult,
     PartialBranches,
     Sha1,
@@ -406,7 +407,7 @@
 
         # Bucket the known contents by hash
         found_contents_by_hash: Dict[str, Dict[str, list]] = {
-            algo: collections.defaultdict(list) for algo in DEFAULT_ALGORITHMS
+            algo: defaultdict(list) for algo in DEFAULT_ALGORITHMS
         }
         for found_content in found_contents:
             for algo in DEFAULT_ALGORITHMS:
@@ -1249,6 +1250,62 @@
 
         return PagedResult(results=visits, next_page_token=next_page_token)
 
+    def origin_visit_get_with_statuses(
+        self,
+        origin: str,
+        allowed_statuses: Optional[List[str]] = None,
+        require_snapshot: bool = False,
+        page_token: Optional[str] = None,
+        order: ListOrder = ListOrder.ASC,
+        limit: int = 10,
+    ) -> PagedResult[OriginVisitWithStatuses]:
+        next_page_token = None
+        visit_from = None if page_token is None else int(page_token)
+        extra_limit = limit + 1
+
+        # First get visits (plus one so we can tell if there is a next page)
+        rows = self._cql_runner.origin_visit_get(origin, visit_from, extra_limit, order)
+        visits: List[OriginVisit] = [converters.row_to_visit(row) for row in rows]
+
+        if not visits:
+            # No visit to list (e.g. the origin was never visited): there is no
+            # range of visits to fetch statuses for, and indexing visits would fail
+            return PagedResult(results=[], next_page_token=None)
+
+        # Add pagination if there are more visits
+        assert len(visits) <= extra_limit
+        if len(visits) == extra_limit:
+            # excluding that visit from the result to respect the limit size
+            visits = visits[:limit]
+            # last visit id is the next page token
+            next_page_token = str(visits[-1].visit)
+
+        # The first and last visits of the page bound its range of visit ids
+        # (this assumes origin_visit_get returns visits sorted by id)
+        first_id, last_id = visits[0].visit, visits[-1].visit
+        assert first_id is not None and last_id is not None
+
+        # Then, fetch all statuses associated to these visits
+        statuses_rows = self._cql_runner.origin_visit_status_get_all_range(
+            origin, min(first_id, last_id), max(first_id, last_id)
+        )
+        visit_statuses: Dict[int, List[OriginVisitStatus]] = defaultdict(list)
+        for status_row in statuses_rows:
+            if allowed_statuses and status_row.status not in allowed_statuses:
+                continue
+            if require_snapshot and status_row.snapshot is None:
+                continue
+            visit_status = converters.row_to_visit_status(status_row)
+            visit_statuses[visit_status.visit].append(visit_status)
+
+        results = [
+            OriginVisitWithStatuses(visit=visit, statuses=visit_statuses[visit.visit])
+            for visit in visits
+            if visit.visit is not None
+        ]
+
+        return PagedResult(results=results, next_page_token=next_page_token)
+
     def origin_visit_status_get(
         self,
         origin: str,
diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py
--- a/swh/storage/in_memory.py
+++ b/swh/storage/in_memory.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2015-2021 The Software Heritage developers
+# Copyright (C) 2015-2022 
The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -567,6 +567,15 @@
 
         return statuses[0:limit]
 
+    def origin_visit_status_get_all_range(
+        self, origin: str, first_visit: int, last_visit: int
+    ) -> Iterable[OriginVisitStatusRow]:
+        """Return the statuses of the visits ``first_visit`` to ``last_visit``
+        (both included) of ``origin``, sorted by visit id then date."""
+        partition = self._origin_visit_statuses.get_from_partition_key((origin,))
+        in_range = (row for row in partition if first_visit <= row.visit <= last_visit)
+        return sorted(in_range, key=lambda row: (row.visit, row.date))
+
     def origin_visit_status_add_one(self, visit_update: OriginVisitStatusRow) -> None:
         self._origin_visit_statuses.insert(visit_update)
         self.increment_counter("origin_visit_status", 1)
diff --git a/swh/storage/interface.py b/swh/storage/interface.py
--- a/swh/storage/interface.py
+++ b/swh/storage/interface.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2015-2021 The Software Heritage developers
+# Copyright (C) 2015-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -7,6 +7,7 @@
 from enum import Enum
 from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, TypeVar
 
+import attr
 from typing_extensions import Protocol, TypedDict, runtime_checkable
 
 from swh.core.api import remote_api_endpoint
@@ -54,6 +55,14 @@
     the snapshot has less than the request number of branches."""
 
 
+@attr.s
+class OriginVisitWithStatuses:
+    """An origin visit along with the list of its statuses."""
+
+    visit = attr.ib(type=OriginVisit)
+    statuses = attr.ib(type=List[OriginVisitStatus])
+
+
 TResult = TypeVar("TResult")
 PagedResult = CorePagedResult[TResult, str]
 
@@ -1047,6 +1056,37 @@
         """
         ...
 
+    @remote_api_endpoint("origin/visit_status/get_all_latest")
+    def origin_visit_get_with_statuses(
+        self,
+        origin: str,
+        allowed_statuses: Optional[List[str]] = None,
+        require_snapshot: bool = False,
+        page_token: Optional[str] = None,
+        order: ListOrder = ListOrder.ASC,
+        limit: int = 10,
+    ) -> PagedResult[OriginVisitWithStatuses]:
+        """Retrieve a page of origin visits along with all their statuses.
+
+        The statuses of each visit are sorted by ascending date, whatever ``order``.
+
+        Args:
+            origin: The visited origin URL
+            allowed_statuses: Only visit statuses matching that list will be returned.
+                If empty or None, all visit statuses will be returned. Possible
+                values are ``created``, ``not_found``, ``ongoing``, ``failed``,
+                ``partial`` and ``full``.
+            require_snapshot: If :const:`True`, only visit statuses with a snapshot
+                will be returned.
+            page_token: opaque string used to get the next results
+            order: Order in which visits are listed (defaults to ascending)
+            limit: Maximum number of visits (with their statuses) to return
+
+        Returns: Page of OriginVisitWithStatuses objects. If next_page_token is
+            None, there is no more data to retrieve.
+        """
+        ...
+
     @remote_api_endpoint("origin/visit_status/get_random")
     def origin_visit_status_get_random(self, type: str) -> Optional[OriginVisitStatus]:
         """Randomly select one successful origin visit with
diff --git a/swh/storage/postgresql/db.py b/swh/storage/postgresql/db.py
--- a/swh/storage/postgresql/db.py
+++ b/swh/storage/postgresql/db.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2015-2021 The Software Heritage developers
+# Copyright (C) 2015-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -672,6 +672,47 @@
         cur.execute(query, tuple(query_params))
         yield from cur
 
+    def origin_visit_status_get_all_in_range(
+        self,
+        origin: str,
+        allowed_statuses: Optional[List[str]],
+        require_snapshot: bool,
+        visit_from: int,
+        visit_to: int,
+        cur=None,
+    ):
+        """Yield the origin_visit_status rows of the visits ``visit_from`` to
+        ``visit_to`` (both included) of ``origin``, ordered by visit id then date.
+
+        Only statuses listed in ``allowed_statuses`` are kept when that list is not
+        empty, and only statuses with a snapshot when ``require_snapshot`` is set.
+        """
+        cur = self._cursor(cur)
+
+        assert visit_from <= visit_to
+
+        query_parts = [
+            f"SELECT {', '.join(self.origin_visit_status_select_cols)}",
+            " FROM origin_visit_status ovs",
+            " INNER JOIN origin o ON o.id = ovs.origin",
+            "WHERE o.url = %s",
+            "AND ovs.visit >= %s",
+            "AND ovs.visit <= %s",
+        ]
+        query_params: List[Any] = [origin, visit_from, visit_to]
+
+        if require_snapshot:
+            query_parts.append("AND ovs.snapshot is not null")
+
+        if allowed_statuses:
+            query_parts.append("AND ovs.status IN %s")
+            query_params.append(tuple(allowed_statuses))
+
+        query_parts.append("ORDER BY ovs.visit ASC, ovs.date ASC")
+
+        cur.execute("\n".join(query_parts), tuple(query_params))
+        yield from cur
+
     def origin_visit_get(self, origin_id, visit_id, cur=None):
         """Retrieve information on visit visit_id of origin origin_id.
 
diff --git a/swh/storage/postgresql/storage.py b/swh/storage/postgresql/storage.py
--- a/swh/storage/postgresql/storage.py
+++ b/swh/storage/postgresql/storage.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2015-2021 The Software Heritage developers
+# Copyright (C) 2015-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -47,6 +47,7 @@
 from swh.storage.interface import (
     VISIT_STATUSES,
     ListOrder,
+    OriginVisitWithStatuses,
     PagedResult,
     PartialBranches,
 )
@@ -1138,6 +1139,72 @@
 
         return PagedResult(results=visits, next_page_token=next_page_token)
 
+    @db_transaction(statement_timeout=500)
+    def origin_visit_get_with_statuses(
+        self,
+        origin: str,
+        allowed_statuses: Optional[List[str]] = None,
+        require_snapshot: bool = False,
+        page_token: Optional[str] = None,
+        order: ListOrder = ListOrder.ASC,
+        limit: int = 10,
+        *,
+        db: Db,
+        cur=None,
+    ) -> PagedResult[OriginVisitWithStatuses]:
+        page_token = page_token or "0"
+        if not isinstance(order, ListOrder):
+            raise StorageArgumentException("order must be a ListOrder value")
+        if not isinstance(page_token, str):
+            raise StorageArgumentException("page_token must be a string.")
+
+        # First get the page of visits (origin_visit_get handles the pagination)
+        visits_page = self.origin_visit_get(
+            origin=origin,
+            page_token=page_token,
+            order=order,
+            limit=limit,
+            db=db,
+            cur=cur,
+        )
+        visits = visits_page.results
+        next_page_token = visits_page.next_page_token
+
+        # Always bound, so that a page without any visit yields an empty result
+        visit_statuses: Dict[int, List[OriginVisitStatus]] = defaultdict(list)
+
+        if visits:
+            # Then, fetch all statuses associated to these visits; the first and
+            # last visits of the page bound its range of visit ids
+            first_id, last_id = visits[0].visit, visits[-1].visit
+            for row in db.origin_visit_status_get_all_in_range(
+                origin,
+                allowed_statuses,
+                require_snapshot,
+                visit_from=min(first_id, last_id),
+                visit_to=max(first_id, last_id),
+                cur=cur,
+            ):
+                row_d = dict(zip(db.origin_visit_status_cols, row))
+                visit_statuses[row_d["visit"]].append(
+                    OriginVisitStatus(
+                        origin=row_d["origin"],
+                        visit=row_d["visit"],
+                        date=row_d["date"],
+                        status=row_d["status"],
+                        snapshot=row_d["snapshot"],
+                        metadata=row_d["metadata"],
+                        type=row_d["type"],
+                    ),
+                )
+
+        results = [
+            OriginVisitWithStatuses(visit=visit, statuses=visit_statuses[visit.visit])
+            for visit in visits
+        ]
+
+        return PagedResult(results=results, next_page_token=next_page_token)
+
     @db_transaction(statement_timeout=1000)
     def origin_visit_find_by_date(
         self, origin: str, visit_date: datetime.datetime, *, db: Db, cur=None
diff --git a/swh/storage/tests/storage_tests.py b/swh/storage/tests/storage_tests.py
--- a/swh/storage/tests/storage_tests.py
+++ b/swh/storage/tests/storage_tests.py
@@ -45,7 +45,12 @@
 from swh.storage.common import origin_url_to_sha1 as sha1
 from swh.storage.exc import HashCollision, StorageArgumentException
 from swh.storage.in_memory import InMemoryStorage
-from swh.storage.interface import ListOrder, PagedResult, StorageInterface
+from swh.storage.interface import (
+    ListOrder,
+    OriginVisitWithStatuses,
+    PagedResult,
+    StorageInterface,
+)
 from swh.storage.tests.conftest import function_scoped_fixture_check
 from swh.storage.utils import (
     content_hex_hashes,
@@ -1995,6 +2000,124 @@
         assert actual_page.next_page_token is None
         assert actual_page.results == [ov1]
 
+    @pytest.mark.parametrize(
+        "allowed_statuses,require_snapshot",
+        [
+            ([], False),
+            (["failed"], False),
+            (["failed", "full"], False),
+            ([], True),
+            (["failed"], True),
+            (["failed", "full"], True),
+        ],
+    )
+    def test_origin_visit_get_with_statuses(
+        self, swh_storage, sample_data, allowed_statuses, require_snapshot
+    ):
+        origin = sample_data.origin
+        swh_storage.origin_add([origin])
+
+        # (date, type) of the three visits; the last two visits share theirs
+        visit_params = [
+            (sample_data.date_visit1, sample_data.type_visit1),
+            (sample_data.date_visit2, sample_data.type_visit2),
+            (sample_data.date_visit2, sample_data.type_visit2),
+        ]
+        origin_visits = tuple(
+            swh_storage.origin_visit_add(
+                [
+                    OriginVisit(origin=origin.url, date=date, type=type_)
+                    for date, type_ in visit_params
+                ]
+            )
+        )
+
+        # Every visit gets a "failed" status without snapshot one hour after its
+        # date, then a "full" status with its own snapshot one more hour later
+        snapshot_ids = [sample_data.snapshots[i].id for i in range(3)]
+        for hours, status, snapshots in (
+            (1, "failed", [None, None, None]),
+            (2, "full", snapshot_ids),
+        ):
+            swh_storage.origin_visit_status_add(
+                [
+                    OriginVisitStatus(
+                        origin=origin.url,
+                        visit=ov.visit,
+                        date=date + datetime.timedelta(hours=hours),
+                        type=type_,
+                        status=status,
+                        snapshot=snapshot,
+                    )
+                    for ov, (date, type_), snapshot in zip(
+                        origin_visits, visit_params, snapshots
+                    )
+                ]
+            )
+
+        def expected_statuses(ov):
+            # statuses of the visit that are expected to pass the tested filters
+            statuses = swh_storage.origin_visit_status_get(
+                origin.url, visit=ov.visit
+            ).results
+            if allowed_statuses:
+                statuses = [ovs for ovs in statuses if ovs.status in allowed_statuses]
+                expected_names = allowed_statuses
+            else:
+                # "created" is not added above; presumably origin_visit_add does it
+                expected_names = ["created", "failed", "full"]
+            assert [ovs.status for ovs in statuses] == expected_names
+            if require_snapshot:
+                statuses = [ovs for ovs in statuses if ovs.snapshot is not None]
+            return statuses
+
+        ovws1, ovws2, ovws3 = [
+            OriginVisitWithStatuses(visit=ov, statuses=expected_statuses(ov))
+            for ov in origin_visits
+        ]
+
+        def check_page(expected_results, *, has_next, **kwargs):
+            # fetch a page, check it, and return the token of the next page
+            page = swh_storage.origin_visit_get_with_statuses(
+                origin.url,
+                allowed_statuses=allowed_statuses,
+                require_snapshot=require_snapshot,
+                **kwargs,
+            )
+            assert (page.next_page_token is not None) == has_next
+            assert page.results == expected_results
+            return page.next_page_token
+
+        # order asc (default): everything fits in one page of the default size
+        check_page([ovws1, ovws2, ovws3], has_next=False)
+
+        # order asc, first page of 2 visits, then the remainder
+        token = check_page([ovws1, ovws2], has_next=True, limit=2)
+        check_page([ovws3], has_next=False, page_token=token)
+
+        # order asc, first page of 1 visit, then pages of various sizes
+        token = check_page([ovws1], has_next=True, limit=1)
+        check_page([ovws2, ovws3], has_next=False, page_token=token)
+        check_page([ovws2, ovws3], has_next=False, page_token=token, limit=2)
+        token = check_page([ovws2], has_next=True, page_token=token, limit=1)
+        check_page([ovws3], has_next=False, page_token=token, limit=1)
+
+        # order desc: everything fits in one page of the default size
+        desc = ListOrder.DESC
+        check_page([ovws3, ovws2, ovws1], has_next=False, order=desc)
+
+        # order desc, first page of 2 visits, then the remainder
+        token = check_page([ovws3, ovws2], has_next=True, order=desc, limit=2)
+        check_page([ovws1], has_next=False, order=desc, page_token=token)
+
+        # order desc, first page of 1 visit, then pages of various sizes
+        token = check_page([ovws3], has_next=True, order=desc, limit=1)
+        check_page([ovws2, ovws1], has_next=False, order=desc, page_token=token)
+        token = check_page(
+            [ovws2], has_next=True, order=desc, page_token=token, limit=1
+        )
+        check_page([ovws1], has_next=False, order=desc, page_token=token)
+
     def test_origin_visit_status_get__unknown_cases(self, swh_storage, sample_data):
         origin = sample_data.origin
         actual_page = swh_storage.origin_visit_status_get("foobar", 1)