diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py --- a/swh/storage/cassandra/storage.py +++ b/swh/storage/cassandra/storage.py @@ -840,6 +840,7 @@ def _origin_visit_status_add(self, visit_status: OriginVisitStatus) -> None: """Add an origin visit status""" + self.journal_writer.origin_visit_status_add([visit_status]) self._cql_runner.origin_visit_status_add_one(visit_status) def origin_visit_status_add( @@ -896,7 +897,8 @@ snapshot=snapshot or last_visit_update["snapshot"], metadata=metadata or last_visit_update["metadata"], ) - self._origin_visit_status_add(visit_status) + self._cql_runner.origin_visit_status_add_one(visit_status) + # self._origin_visit_status_add(visit_status) def _origin_visit_merge( self, visit: Dict[str, Any], visit_status: Dict[str, Any] @@ -957,7 +959,7 @@ snapshot=visit.snapshot, metadata=visit.metadata, ) - self._origin_visit_status_add(visit_status) + self._cql_runner.origin_visit_status_add_one(visit_status) @staticmethod def _format_origin_visit_row(visit): diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py --- a/swh/storage/in_memory.py +++ b/swh/storage/in_memory.py @@ -823,6 +823,7 @@ metadata=None, visit=visit_id, ) + self.journal_writer.origin_visit_add([visit]) self._origin_visits[origin_url].append(visit) assert visit.visit is not None visit_key = (origin_url, visit.visit) @@ -836,15 +837,21 @@ snapshot=None, metadata=None, ) - self._origin_visit_statuses[visit_key] = [visit_update] - + self._origin_visit_status_add_one(visit_update) self._objects[visit_key].append(("origin_visit", None)) - self.journal_writer.origin_visit_add([visit]) - # return last visit return visit + def _origin_visit_status_add_one(self, visit_status: OriginVisitStatus) -> None: + """Add an origin visit status without checks. + + """ + self.journal_writer.origin_visit_status_add([visit_status]) + visit_key = (visit_status.origin, visit_status.visit) + self._origin_visit_statuses.setdefault(visit_key, []) + self._origin_visit_statuses[visit_key].append(visit_status) + def origin_visit_status_add( self, visit_statuses: Iterable[OriginVisitStatus], ) -> None: @@ -854,11 +861,8 @@ if not origin_url: raise StorageArgumentException(f"Unknown origin {visit_status.origin}") - # Insert for visit_status in visit_statuses: - visit_key = (visit_status.origin, visit_status.visit) - self.journal_writer.origin_visit_status_add([visit_status]) - self._origin_visit_statuses[visit_key].append(visit_status) + self._origin_visit_status_add_one(visit_status) def origin_visit_update( self, @@ -878,30 +882,41 @@ except IndexError: raise StorageArgumentException("Unknown visit_id for this origin") from None - # Retrieve the previous visit status - assert visit.visit is not None - visit_key = (origin_url, visit.visit) + updates: Dict[str, Any] = { + "status": status, + } + if metadata and metadata != visit.metadata: + updates["metadata"] = metadata + if snapshot and snapshot != visit.snapshot: + updates["snapshot"] = snapshot - last_visit_update = max( - self._origin_visit_statuses[visit_key], key=lambda v: v.date - ) + if updates: + with convert_validation_exceptions(): + updated_visit = OriginVisit.from_dict({**visit.to_dict(), **updates}) + self.journal_writer.origin_visit_update([updated_visit]) - with convert_validation_exceptions(): - visit_update = OriginVisitStatus( - origin=origin_url, - visit=visit_id, - date=date or now(), - status=status, - snapshot=snapshot or last_visit_update.snapshot, - metadata=metadata or last_visit_update.metadata, - ) - self._origin_visit_statuses[visit_key].append(visit_update) + self._origin_visits[origin_url][visit_id - 1] = updated_visit - self.journal_writer.origin_visit_update( - [self._origin_visit_get_updated(origin_url, visit_id)] - ) + # Retrieve the previous visit status + assert visit.visit is not None + visit_key = (origin_url, visit.visit) - self._origin_visits[origin_url][visit_id - 1] = visit + last_visit_status = self._origin_visit_get_updated(origin, visit_id) + assert last_visit_status is not None + + with convert_validation_exceptions(): + visit_status = OriginVisitStatus( + origin=origin_url, + visit=visit_id, + date=date or now(), + status=status, + snapshot=snapshot or last_visit_status.snapshot, + metadata=metadata or last_visit_status.metadata, + ) + visit_key = (visit_status.origin, visit_status.visit) + self._origin_visit_statuses.setdefault(visit_key, []) + self._origin_visit_statuses[visit_key].append(visit_status) + # self._origin_visit_status_add_one(visit_status) def origin_visit_upsert(self, visits: Iterable[OriginVisit]) -> None: for visit in visits: diff --git a/swh/storage/storage.py b/swh/storage/storage.py --- a/swh/storage/storage.py +++ b/swh/storage/storage.py @@ -859,6 +859,7 @@ "snapshot": None, } ) + self.journal_writer.origin_visit_add([visit]) with convert_validation_exceptions(): visit_status = OriginVisitStatus( @@ -870,18 +871,15 @@ metadata=None, ) self._origin_visit_status_add(visit_status, db=db, cur=cur) - - self.journal_writer.origin_visit_add([visit]) - send_metric("origin_visit:add", count=1, method_name="origin_visit") return visit def _origin_visit_status_add( - self, origin_visit_status: OriginVisitStatus, db, cur + self, visit_status: OriginVisitStatus, db, cur ) -> None: """Add an origin visit status""" - db.origin_visit_status_add(origin_visit_status, cur=cur) - # TODO: write to the journal the origin visit status + self.journal_writer.origin_visit_status_add([visit_status]) + db.origin_visit_status_add(visit_status, cur=cur) send_metric( "origin_visit_status:add", count=1, method_name="origin_visit_status" ) @@ -897,7 +895,6 @@ if not origin_url: raise StorageArgumentException(f"Unknown origin {visit_status.origin}") - self.journal_writer.origin_visit_status_add(visit_statuses) for visit_status in visit_statuses: self._origin_visit_status_add(visit_status, db, cur) @@ -957,7 +954,13 @@ snapshot=snapshot or last_visit_status["snapshot"], metadata=metadata or last_visit_status["metadata"], ) - self._origin_visit_status_add(visit_status, db=db, cur=cur) + db.origin_visit_status_add(visit_status, cur=cur) + send_metric( + "origin_visit_status:add", + count=1, + method_name="origin_visit_status", + ) + # self._origin_visit_status_add(visit_status, db=db, cur=cur) def _origin_visit_get_updated( self, origin: str, visit_id: int, db, cur diff --git a/swh/storage/tests/test_kafka_writer.py b/swh/storage/tests/test_kafka_writer.py --- a/swh/storage/tests/test_kafka_writer.py +++ b/swh/storage/tests/test_kafka_writer.py @@ -45,6 +45,7 @@ "release", "snapshot", "origin", + "origin_visit_status", ): method(objs) expected_messages += len(objs) @@ -53,7 +54,7 @@ assert isinstance(obj, OriginVisit) storage.origin_add_one(Origin(url=obj.origin)) visit = method(obj.origin, date=obj.date, type=obj.type) - expected_messages += 1 + expected_messages += 1 + 1 # 1 visit + 1 visit status obj_d = obj.to_dict() for k in ("visit", "origin", "date", "type"): @@ -75,6 +76,7 @@ "directory", "origin", "origin_visit", + "origin_visit_status", "release", "revision", "snapshot", @@ -127,6 +129,7 @@ "directory", "origin", "origin_visit", + "origin_visit_status", "release", "revision", "snapshot", diff --git a/swh/storage/tests/test_replay.py b/swh/storage/tests/test_replay.py --- a/swh/storage/tests/test_replay.py +++ b/swh/storage/tests/test_replay.py @@ -76,6 +76,7 @@ src.origin_visit_add( origin_url=visit.origin, date=visit.date, type=visit.type ) + nb_sent += 1 # this adds origin-visit-status as well else: method = getattr(src, object_type + "_add") method(objects) @@ -119,6 +120,7 @@ src.origin_visit_add( origin_url=visit.origin, date=visit.date, type=visit.type ) + nb_sent += 1 # this adds origin-visit-status as well else: method = getattr(src, object_type + "_add") method(objects) diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py --- a/swh/storage/tests/test_storage.py +++ b/swh/storage/tests/test_storage.py @@ -2152,7 +2152,6 @@ "origin": origin_url, "date": data.date_visit2, "visit": origin_visit1.visit, - "type": data.type_visit1, "status": "ongoing", "metadata": None, "snapshot": None, @@ -2166,9 +2165,14 @@ "metadata": None, "snapshot": None, } - assert list(swh_storage.journal_writer.journal.objects) == [ + actual_written_objects = list(swh_storage.journal_writer.journal.objects) + assert actual_written_objects == [ ("origin", Origin.from_dict(data.origin2)), - ("origin_visit", OriginVisit.from_dict(data1)), + ( + "origin_visit", + OriginVisit.from_dict({**data1, "type": data.type_visit1,}), + ), + ("origin_visit_status", OriginVisitStatus.from_dict(data1)), ("origin_visit", OriginVisit.from_dict(data2)), ] @@ -2343,6 +2347,7 @@ origin_visit1.visit, status="ongoing", snapshot=data.empty_snapshot["id"], + # date=data.date_visit2 ) by_id = swh_storage.snapshot_get(data.empty_snapshot["id"]) @@ -2355,7 +2360,6 @@ "origin": origin_url, "date": data.date_visit1, "visit": origin_visit1.visit, - "type": data.type_visit1, "status": "ongoing", "metadata": None, "snapshot": None, @@ -2364,16 +2368,24 @@ "origin": origin_url, "date": data.date_visit1, "visit": origin_visit1.visit, - "type": data.type_visit1, "status": "ongoing", "metadata": None, "snapshot": data.empty_snapshot["id"], } - assert list(swh_storage.journal_writer.journal.objects) == [ + actual_objects = list(swh_storage.journal_writer.journal.objects) + assert actual_objects == [ ("origin", Origin.from_dict(data.origin)), - ("origin_visit", OriginVisit.from_dict(data1)), + ( + "origin_visit", + OriginVisit.from_dict({**data1, "type": data.type_visit1},), + ), + ("origin_visit_status", OriginVisitStatus.from_dict(data1)), ("snapshot", Snapshot.from_dict(data.empty_snapshot)), - ("origin_visit", OriginVisit.from_dict(data2)), + ( + "origin_visit", + OriginVisit.from_dict({**data2, "type": data.type_visit1,}), + ), + # ("origin_visit_status", OriginVisitStatus.from_dict(data2)), ] def test_snapshot_add_get_complete(self, swh_storage): @@ -2731,7 +2743,7 @@ "origin": origin_url, "date": data.date_visit1, "visit": origin_visit1.visit, - "type": data.type_visit1, + # "type": data.type_visit1, "status": "ongoing", "metadata": None, "snapshot": None, @@ -2749,7 +2761,7 @@ "origin": origin_url, "date": data.date_visit2, "visit": origin_visit2.visit, - "type": data.type_visit2, + # "type": data.type_visit2, "status": "ongoing", "metadata": None, "snapshot": None, @@ -2765,10 +2777,18 @@ } assert list(swh_storage.journal_writer.journal.objects) == [ ("origin", Origin.from_dict(data.origin)), - ("origin_visit", OriginVisit.from_dict(data1)), + ( + "origin_visit", + OriginVisit.from_dict({**data1, "type": data.type_visit1}), + ), + ("origin_visit_status", OriginVisitStatus.from_dict(data1)), ("snapshot", Snapshot.from_dict(data.snapshot)), ("origin_visit", OriginVisit.from_dict(data2)), - ("origin_visit", OriginVisit.from_dict(data3)), + ( + "origin_visit", + OriginVisit.from_dict({**data3, "type": data.type_visit2}), + ), + ("origin_visit_status", OriginVisitStatus.from_dict(data3)), ("origin_visit", OriginVisit.from_dict(data4)), ]