diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py
--- a/swh/storage/cassandra/cql.py
+++ b/swh/storage/cassandra/cql.py
@@ -726,23 +726,31 @@
             ),
         }
 
-    @_prepared_statement(
-        "SELECT * FROM origin_visit_status "
-        "WHERE origin = ? AND visit = ? "
-        "ORDER BY date DESC "
-        "LIMIT 1"
-    )
     def origin_visit_status_get_latest(
-        self, origin: str, visit: int, *, statement
+        self,
+        origin: str,
+        visit: int,
+        allowed_statuses: Optional[List[str]] = None,
+        require_snapshot: bool = False,
     ) -> Optional[Dict[str, Any]]:
-        """Given an origin visit id, return its latest origin_visit_status
+        """Given an origin visit id, return its latest origin_visit_status.
 
         """
-        rows = list(self._execute_with_retries(statement, [origin, visit]))
-        if rows:
-            return self._format_origin_visit_status_row(rows[0])
-        else:
+        query = (
+            "SELECT * FROM origin_visit_status "
+            "WHERE origin = %s AND visit = %s "
+            "ORDER BY date DESC"
+        )
+        rows = list(self._execute_with_retries(query, [origin, visit]))
+
+        # filtering is done python side as we cannot do it server side
+        if allowed_statuses:
+            rows = [row for row in rows if row.status in allowed_statuses]
+        if require_snapshot:
+            rows = [row for row in rows if row.snapshot is not None]
+        if not rows:
             return None
+        return self._format_origin_visit_status_row(rows[0])
 
     @_prepared_statement("SELECT * FROM origin_visit WHERE origin = ? AND visit = ?")
     def origin_visit_get_one(
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -948,6 +948,20 @@
 
         return latest_visit
 
+    def origin_visit_status_get_latest(
+        self,
+        origin_url: str,
+        visit: int,
+        allowed_statuses: Optional[List[str]] = None,
+        require_snapshot: bool = False,
+    ) -> Optional[OriginVisitStatus]:
+        visit_status = self._cql_runner.origin_visit_status_get_latest(
+            origin_url, visit, allowed_statuses, require_snapshot
+        )
+        if not visit_status:
+            return None
+        return OriginVisitStatus.from_dict(visit_status)
+
     def origin_visit_get_random(self, type: str) -> Optional[Dict[str, Any]]:
         back_in_the_day = now() - datetime.timedelta(weeks=12)  # 3 months back
 
diff --git a/swh/storage/db.py b/swh/storage/db.py
--- a/swh/storage/db.py
+++ b/swh/storage/db.py
@@ -525,7 +525,18 @@
         "ovs.snapshot",
     ]
 
-    def _make_origin_visit_status(self, row: Tuple[Any]) -> Optional[Dict[str, Any]]:
+    origin_visit_status_select_cols = [
+        "o.url AS origin",
+        "ovs.visit",
+        "ovs.date",
+        "ovs.status",
+        "ovs.snapshot",
+        "ovs.metadata",
+    ]
+
+    def _make_origin_visit_status(
+        self, row: Optional[Tuple[Any]]
+    ) -> Optional[Dict[str, Any]]:
         """Make an origin_visit_status dict out of a row
 
         """
@@ -534,21 +545,39 @@
         return dict(zip(self.origin_visit_status_cols, row))
 
     def origin_visit_status_get_latest(
-        self, origin: str, visit: int, cur=None
+        self,
+        origin_url: str,
+        visit: int,
+        allowed_statuses: Optional[List[str]] = None,
+        require_snapshot: bool = False,
+        cur=None,
     ) -> Optional[Dict[str, Any]]:
         """Given an origin visit id, return its latest origin_visit_status
 
         """
-        cols = self.origin_visit_status_cols
         cur = self._cursor(cur)
-        cur.execute(
-            f"SELECT {', '.join(cols)} "
-            f"FROM origin_visit_status ovs "
-            f"INNER JOIN origin o on o.id=ovs.origin "
-            f"WHERE o.url=%s AND ovs.visit=%s"
-            f"ORDER BY ovs.date DESC LIMIT 1",
-            (origin, visit),
-        )
+
+        query_parts = [
+            "SELECT %s" % ", ".join(self.origin_visit_status_select_cols),
+            "FROM origin_visit_status ovs ",
+            "INNER JOIN origin o ON o.id = ovs.origin",
+        ]
+        query_parts.append("WHERE o.url = %s")
+        query_params: List[Any] = [origin_url]
+        query_parts.append("AND ovs.visit = %s")
+        query_params.append(visit)
+
+        if require_snapshot:
+            query_parts.append("AND ovs.snapshot is not null")
+
+        if allowed_statuses:
+            query_parts.append("AND ovs.status IN %s")
+            query_params.append(tuple(allowed_statuses))
+
+        query_parts.append("ORDER BY ovs.date DESC LIMIT 1")
+        query = "\n".join(query_parts)
+
+        cur.execute(query, tuple(query_params))
         row = cur.fetchone()
         return self._make_origin_visit_status(row)
 
diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py
--- a/swh/storage/in_memory.py
+++ b/swh/storage/in_memory.py
@@ -942,6 +942,31 @@
             return None
         return visit.to_dict()
 
+    def origin_visit_status_get_latest(
+        self,
+        origin_url: str,
+        visit: int,
+        allowed_statuses: Optional[List[str]] = None,
+        require_snapshot: bool = False,
+    ) -> Optional[OriginVisitStatus]:
+        ori = self._origins.get(origin_url)
+        if not ori:
+            return None
+
+        visit_key = (origin_url, visit)
+        visits = self._origin_visit_statuses.get(visit_key)
+        if not visits:
+            return None
+
+        # None or an empty list both mean "no status filtering", consistently
+        # with the postgresql and cassandra backends
+        if allowed_statuses:
+            visits = [ovs for ovs in visits if ovs.status in allowed_statuses]
+        if require_snapshot:
+            visits = [ovs for ovs in visits if ovs.snapshot is not None]
+
+        return max(visits, key=lambda ovs: (ovs.date, ovs.visit), default=None)
+
     def _select_random_origin_visit_by_type(self, type: str) -> str:
         while True:
             url = random.choice(list(self._origin_visits.keys()))
diff --git a/swh/storage/interface.py b/swh/storage/interface.py
--- a/swh/storage/interface.py
+++ b/swh/storage/interface.py
@@ -871,7 +871,7 @@
     ) -> Optional[Dict[str, Any]]:
         """Get the latest origin visit for the given origin, optionally
         looking only for those with one of the given allowed_statuses
-        or for those with a known snapshot.
+        or for those with a snapshot.
 
         Args:
             origin: origin URL
@@ -896,6 +896,36 @@
         """
         ...
 
+    @remote_api_endpoint("origin/visit_status/get_latest")
+    def origin_visit_status_get_latest(
+        self,
+        origin_url: str,
+        visit: int,
+        allowed_statuses: Optional[List[str]] = None,
+        require_snapshot: bool = False,
+    ) -> Optional[OriginVisitStatus]:
+        """Get the latest origin visit status for the given origin visit, optionally
+        looking only for those with one of the given allowed_statuses or with a
+        snapshot.
+
+        Args:
+            origin_url: origin URL
+            visit: visit id of the origin visit whose statuses are considered
+            allowed_statuses: list of visit statuses considered
+                to find the latest visit status. For instance,
+                ``allowed_statuses=['full']`` will only consider visit statuses
+                that have successfully run to completion. None or an empty list
+                means no filtering on status.
+            require_snapshot: If True, only a visit status with a snapshot
+                will be returned.
+
+        Returns:
+            The latest (by date) OriginVisitStatus matching the criteria, or
+            None if there is none.
+
+        """
+        ...
+
     @remote_api_endpoint("origin/visit/get_random")
     def origin_visit_get_random(self, type: str) -> Optional[Dict[str, Any]]:
         """Randomly select one successful origin visit with
diff --git a/swh/storage/storage.py b/swh/storage/storage.py
--- a/swh/storage/storage.py
+++ b/swh/storage/storage.py
@@ -878,6 +878,24 @@
         for visit_status in visit_statuses:
             self._origin_visit_status_add(visit_status, db, cur)
 
+    @timed
+    @db_transaction()
+    def origin_visit_status_get_latest(
+        self,
+        origin_url: str,
+        visit: int,
+        allowed_statuses: Optional[List[str]] = None,
+        require_snapshot: bool = False,
+        db=None,
+        cur=None,
+    ) -> Optional[OriginVisitStatus]:
+        row = db.origin_visit_status_get_latest(
+            origin_url, visit, allowed_statuses, require_snapshot, cur=cur
+        )
+        if not row:
+            return None
+        return OriginVisitStatus.from_dict(row)
+
     def _origin_visit_get_updated(
         self, origin: str, visit_id: int, db, cur
     ) -> Optional[Dict[str, Any]]:
diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py
--- a/swh/storage/tests/test_storage.py
+++ b/swh/storage/tests/test_storage.py
@@ -2136,6 +2136,129 @@
             "snapshot": data.complete_snapshot["id"],
         } == swh_storage.origin_visit_get_latest(origin_url, require_snapshot=True)
 
+    def test_origin_visit_status_get_latest(self, swh_storage):
+        origin1 = Origin.from_dict(data.origin)
+        swh_storage.origin_add_one(data.origin)
+
+        # to have some reference visits
+
+        ov1, ov2 = swh_storage.origin_visit_add(
+            [
+                OriginVisit(
+                    origin=origin1.url,
+                    date=data.date_visit1,
+                    type=data.type_visit1,
+                    status="ongoing",
+                    snapshot=None,
+                ),
+                OriginVisit(
+                    origin=origin1.url,
+                    date=data.date_visit2,
+                    type=data.type_visit2,
+                    status="ongoing",
+                    snapshot=None,
+                ),
+            ]
+        )
+
+        snapshot = Snapshot.from_dict(data.complete_snapshot)
+        swh_storage.snapshot_add([snapshot])
+
+        date_now = now()
+        # truncate (not round) to milliseconds: round() can yield 1000000
+        date_now = date_now.replace(microsecond=date_now.microsecond // 1000 * 1000)
+        assert data.date_visit1 < data.date_visit2
+        assert data.date_visit2 < date_now
+
+        ovs1 = OriginVisitStatus(
+            origin=origin1.url,
+            visit=ov1.visit,
+            date=data.date_visit1,
+            status="partial",
+            snapshot=None,
+        )
+        ovs2 = OriginVisitStatus(
+            origin=origin1.url,
+            visit=ov1.visit,
+            date=data.date_visit2,
+            status="ongoing",
+            snapshot=None,
+        )
+        ovs3 = OriginVisitStatus(
+            origin=origin1.url,
+            visit=ov2.visit,
+            date=data.date_visit2,
+            status="ongoing",
+            snapshot=None,
+        )
+        ovs4 = OriginVisitStatus(
+            origin=origin1.url,
+            visit=ov2.visit,
+            date=date_now,
+            status="full",
+            snapshot=snapshot.id,
+            metadata={"something": "wicked"},
+        )
+
+        swh_storage.origin_visit_status_add([ovs1, ovs2, ovs3, ovs4])
+
+        # unknown origin so no result
+        actual_origin_visit = swh_storage.origin_visit_status_get_latest(
+            "unknown-origin", ov1.visit
+        )
+        assert actual_origin_visit is None
+
+        # unknown visit so no result
+        actual_origin_visit = swh_storage.origin_visit_status_get_latest(
+            ov1.origin, ov1.visit + 10
+        )
+        assert actual_origin_visit is None
+
+        # Two visit statuses, both with no snapshot: take the most recent
+        actual_origin_visit2 = swh_storage.origin_visit_status_get_latest(
+            origin1.url, ov1.visit
+        )
+        assert isinstance(actual_origin_visit2, OriginVisitStatus)
+        assert actual_origin_visit2 == ovs2
+        assert ovs2.origin == origin1.url
+        assert ovs2.visit == ov1.visit
+
+        actual_origin_visit = swh_storage.origin_visit_status_get_latest(
+            origin1.url, ov1.visit, require_snapshot=True
+        )
+        # there is no visit with snapshot yet for that visit
+        assert actual_origin_visit is None
+
+        actual_origin_visit2 = swh_storage.origin_visit_status_get_latest(
+            origin1.url, ov1.visit, allowed_statuses=["partial", "ongoing"]
+        )
+        # the most recent of the "partial" and "ongoing" statuses is elected
+        assert actual_origin_visit2 == ovs2
+        assert actual_origin_visit2.status == "ongoing"
+
+        actual_origin_visit4 = swh_storage.origin_visit_status_get_latest(
+            origin1.url, ov2.visit, require_snapshot=True
+        )
+        assert actual_origin_visit4 == ovs4
+        assert actual_origin_visit4.snapshot == snapshot.id
+
+        actual_origin_visit = swh_storage.origin_visit_status_get_latest(
+            origin1.url, ov2.visit, require_snapshot=True, allowed_statuses=["ongoing"]
+        )
+        # the only "ongoing" status of that visit (ovs3) has no snapshot
+        assert actual_origin_visit is None
+
+        actual_origin_visit3 = swh_storage.origin_visit_status_get_latest(
+            origin1.url, ov2.visit, allowed_statuses=["ongoing"]
+        )
+        assert actual_origin_visit3 == ovs3
+
+        # an empty allowed_statuses list does not filter on status
+        actual_origin_visit = swh_storage.origin_visit_status_get_latest(
+            origin1.url, ov2.visit, allowed_statuses=[]
+        )
+        assert actual_origin_visit == ovs4
+
     def test_person_fullname_unicity(self, swh_storage):
         # given (person injection through revisions for example)
         revision = data.revision
diff --git a/swh/storage/validate.py b/swh/storage/validate.py
--- a/swh/storage/validate.py
+++ b/swh/storage/validate.py
@@ -106,10 +106,18 @@
             releases = [Release.from_dict(r) for r in releases]
         return self.storage.release_add(releases)
 
-    def snapshot_add(self, snapshots: Iterable[Dict]) -> Dict:
-        with convert_validation_exceptions():
-            snapshots = [Snapshot.from_dict(s) for s in snapshots]
-        return self.storage.snapshot_add(snapshots)
+    def snapshot_add(
+        self, snapshots: Union[Iterable[Dict], Iterable[Snapshot]]
+    ) -> Dict:
+        snapshots_: List[Snapshot] = []
+        for s in snapshots:
+            if isinstance(s, dict):
+                with convert_validation_exceptions():
+                    snapshot = Snapshot.from_dict(s)
+            else:
+                snapshot = s
+            snapshots_.append(snapshot)
+        return self.storage.snapshot_add(snapshots_)
 
     def origin_visit_add(self, visits: Iterable[OriginVisit]) -> Iterable[OriginVisit]:
         return self.storage.origin_visit_add(visits)