swh_storage = <swh.storage.in_memory.InMemoryStorage object at 0x7fac801c0978>
sample_data = <swh.storage.tests.storage_data.StorageData object at 0x7fac801c0a20>
def test_snapshot_get_from_revision(swh_storage, sample_data):
origin = sample_data.origin
swh_storage.origin_add([origin])
date_visit2 = now()
visit1, visit2 = sample_data.origin_visits[:2]
assert visit1.origin == origin.url
ov1, ov2 = swh_storage.origin_visit_add([visit1, visit2])
revision1, revision2, revision3 = sample_data.revisions[:3]
swh_storage.revision_add([revision1, revision2])
empty_snapshot, complete_snapshot = sample_data.snapshots[1:3]
swh_storage.snapshot_add([complete_snapshot])
# Add complete_snapshot to visit1 which targets revision1
ovs1, ovs2 = [
OriginVisitStatus(
origin=origin.url,
visit=ov1.visit,
date=date_visit2,
status="partial",
snapshot=complete_snapshot.id,
),
OriginVisitStatus(
origin=origin.url,
visit=ov2.visit,
date=now(),
status="full",
snapshot=empty_snapshot.id,
),
]
swh_storage.origin_visit_status_add([ovs1, ovs2])
assert ov1.date < ov2.date
assert ov2.date < ovs1.date
assert ovs1.date < ovs2.date
# revision3 does not exist so result is None
actual_snapshot_id = snapshot_get_from_revision(
> swh_storage, origin.url, revision3.id
)
.tox/py3/lib/python3.7/site-packages/swh/storage/tests/algos/test_snapshot.py:195:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
storage = <swh.storage.in_memory.InMemoryStorage object at 0x7fac801c0978>
origin = 'https://github.com/user1/repo1'
revision_id = b',\xbd{\xb2,e;\xbb#\xa2\x96W\x85*P\xa0\x1bY\x1dF'
def snapshot_get_from_revision(
storage: StorageInterface, origin: str, revision_id: bytes
) -> Optional[bytes]:
"""Retrieve the most recent snapshot targeting the revision_id for the given origin.
Returns
The snapshot id if found. None otherwise.
"""
revision = storage.revision_get([revision_id])
if not revision:
return None
next_page_token_visit = None
while True:
visit_page = storage.origin_visit_get(
origin, order=ListOrder.DESC, page_token=next_page_token_visit
)
next_page_token_visit = visit_page.next_page_token
for visit in visit_page.results:
next_page_token_visit_status = None
while True:
visit_status_page = storage.origin_visit_status_get(
origin,
visit.visit,
order=ListOrder.DESC,
page_token=next_page_token_visit_status,
)
next_page_token_visit_status = visit_status_page.next_page_token
for visit_status in visit_status_page.results:
snapshot_id = visit_status.snapshot
if snapshot_id is None:
continue
detail_snapshot = storage.snapshot_get(snapshot_id)
if not detail_snapshot:
continue
for branch_name, branch in detail_snapshot["branches"].items():
if (
> branch["target_type"] == TargetType.REVISION.value
and branch["target"] == revision_id
): # snapshot found
E TypeError: 'NoneType' object is not subscriptable
.tox/py3/lib/python3.7/site-packages/swh/storage/algos/snapshot.py:138: TypeError
TEST RESULT
TEST RESULT
- Run At
- Jul 30 2020, 10:10 AM