self = <swh.storage.tests.test_cassandra.TestCassandraStorage object at 0x7f0dac2dbef0>
swh_storage = <swh.storage.validate.ValidatingProxyStorage object at 0x7f0dac1a10f0>
def test_origin_visit_status_add(self, swh_storage):
"""Correct origin visit statuses should add a new visit status
"""
origin1 = Origin.from_dict(data.origin2)
origin2 = Origin(url="new-origin")
swh_storage.origin_add([origin1, origin2])
ov1, ov2 = swh_storage.origin_visit_add(
[
OriginVisit(
origin=origin1.url,
date=data.date_visit1,
type=data.type_visit1,
status="ongoing",
snapshot=None,
),
OriginVisit(
origin=origin2.url,
date=data.date_visit2,
type=data.type_visit2,
status="ongoing",
snapshot=None,
),
]
)
snapshot_id = data.snapshot["id"]
date_visit_now = now()
visit_status1 = OriginVisitStatus(
origin=ov1.origin,
visit=ov1.visit,
date=date_visit_now,
status="full",
snapshot=snapshot_id,
)
date_visit_now = now()
visit_status2 = OriginVisitStatus(
origin=ov2.origin,
visit=ov2.visit,
date=date_visit_now,
status="ongoing",
snapshot=None,
metadata={"intrinsic": "something"},
)
swh_storage.origin_visit_status_add([visit_status1, visit_status2])
origin_visit1 = swh_storage.origin_visit_get_latest(
origin1.url, require_snapshot=True
)
assert origin_visit1
assert origin_visit1["status"] == "full"
assert origin_visit1["snapshot"] == snapshot_id
origin_visit2 = swh_storage.origin_visit_get_latest(
origin2.url, require_snapshot=False
)
assert origin2.url != origin1.url
assert origin_visit2
assert origin_visit2["status"] == "ongoing"
assert origin_visit2["snapshot"] is None
assert origin_visit2["metadata"] == {"intrinsic": "something"}
actual_objects = list(swh_storage.journal_writer.journal.objects)
expected_origins = [origin1, origin2]
expected_visits = [ov1, ov2]
expected_visit_statuses = []
for visit in expected_visits: # out of origin-visit-add calls
visit_status = visit.to_dict()
visit_status.pop("type")
expected_visit_statuses.append(OriginVisitStatus.from_dict(visit_status))
# out of origin-visit-status add calls
expected_visit_statuses += [visit_status1, visit_status2]
expected_objects = (
[("origin", o) for o in expected_origins]
+ [("origin_visit", v) for v in expected_visits]
+ [("origin_visit_status", ovs) for ovs in expected_visit_statuses]
)
> assert len(actual_objects) == len(expected_objects)
E assert 10 == 8
E +10
E -8
.tox/py3/lib/python3.7/site-packages/swh/storage/tests/test_storage.py:1821: AssertionError
TEST RESULT
TEST RESULT
- Run At: Jun 23 2020, 3:02 PM