diff --git a/swh/storage/algos/origin.py b/swh/storage/algos/origin.py
index 1cd7beee..bc5b2679 100644
--- a/swh/storage/algos/origin.py
+++ b/swh/storage/algos/origin.py
@@ -1,97 +1,144 @@
 # Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from typing import Iterator, List, Optional, Tuple
 
 from swh.model.model import Origin, OriginVisit, OriginVisitStatus
-from swh.storage.interface import StorageInterface
+from swh.storage.interface import ListOrder, StorageInterface
 
 
 def iter_origins(
     storage: StorageInterface,
     origin_from: int = 1,
     origin_to: Optional[int] = None,
     batch_size: int = 10000,
 ) -> Iterator[Origin]:
     """Iterates over all origins in the storage.
 
     Args:
         storage: the storage object used for queries.
         origin_from: lower interval boundary
         origin_to: upper interval boundary
         batch_size: number of origins per query
 
     Yields:
         origin within the boundary [origin_to, origin_from] in batch_size
 
     """
     start = origin_from
     while True:
         if origin_to:
             origin_count = min(origin_to - start, batch_size)
         else:
             origin_count = batch_size
         origins = list(
             storage.origin_get_range(origin_from=start, origin_count=origin_count)
         )
         if not origins:
             break
         start = origins[-1]["id"] + 1
         for origin in origins:
             del origin["id"]
             yield Origin.from_dict(origin)
         if origin_to and start > origin_to:
             break
 
 
 def origin_get_latest_visit_status(
     storage: StorageInterface,
     origin_url: str,
     type: Optional[str] = None,
     allowed_statuses: Optional[List[str]] = None,
     require_snapshot: bool = False,
 ) -> Optional[Tuple[OriginVisit, OriginVisitStatus]]:
     """Get the latest origin visit (and status) of an origin. Optionally, a combination
     of criteria can be provided, origin type, allowed statuses or if a visit has a
     snapshot.
 
     If no visit matching the criteria is found, returns None. Otherwise, returns a tuple
     of origin visit, origin visit status.
 
     Args:
         storage: A storage backend
         origin: origin URL
         type: Optional visit type to filter on (e.g git, tar, dsc, svn,
             hg, npm, pypi, ...)
         allowed_statuses: list of visit statuses considered
             to find the latest visit. For instance,
             ``allowed_statuses=['full']`` will only consider visits that
             have successfully run to completion.
         require_snapshot: If True, only a visit with a snapshot
             will be returned.
 
     Returns:
         a tuple of (visit, visit_status) model object if the visit *and* the visit
         status exist (and match the search criteria), None otherwise.
 
     """
     visit = storage.origin_visit_get_latest(
         origin_url,
         type=type,
         allowed_statuses=allowed_statuses,
         require_snapshot=require_snapshot,
     )
     result: Optional[Tuple[OriginVisit, OriginVisitStatus]] = None
     if visit:
         assert visit.visit is not None
         visit_status = storage.origin_visit_status_get_latest(
             origin_url,
             visit.visit,
             allowed_statuses=allowed_statuses,
             require_snapshot=require_snapshot,
         )
         if visit_status:
             result = visit, visit_status
     return result
+
+
+def iter_origin_visits(
+    storage: StorageInterface, origin: str, order: ListOrder = ListOrder.ASC
+) -> Iterator[OriginVisit]:
+    """Iterate over the visits of an origin, fetching them page by page.
+
+    Args:
+        storage: the storage object used for queries.
+        origin: the origin's URL
+        order: listing order passed on to the storage (ascending by default)
+
+    Yields:
+        the origin visits of every page, until the storage reports no next page
+
+    """
+    next_page_token = None
+    while True:
+        page = storage.origin_visit_get(origin, order=order, page_token=next_page_token)
+        next_page_token = page.next_page_token
+        yield from page.results
+        if next_page_token is None:
+            break
+
+
+def iter_origin_visit_statuses(
+    storage: StorageInterface, origin: str, visit: int, order: ListOrder = ListOrder.ASC
+) -> Iterator[OriginVisitStatus]:
+    """Iterate over the statuses of an origin visit, fetching them page by page.
+
+    Args:
+        storage: the storage object used for queries.
+        origin: the origin's URL
+        visit: identifier of the visit whose statuses are listed
+        order: listing order passed on to the storage (ascending by default)
+
+    Yields:
+        the visit statuses of every page, until the storage reports no next page
+
+    """
+    next_page_token = None
+    while True:
+        page = storage.origin_visit_status_get(
+            origin, visit, order=order, page_token=next_page_token
+        )
+        next_page_token = page.next_page_token
+        yield from page.results
+        if next_page_token is None:
+            break
diff --git a/swh/storage/algos/snapshot.py b/swh/storage/algos/snapshot.py
index ade9b22b..c1d624ce 100644
--- a/swh/storage/algos/snapshot.py
+++ b/swh/storage/algos/snapshot.py
@@ -1,95 +1,146 @@
 # Copyright (C) 2018-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from typing import List, Optional
 
-from swh.model.model import Snapshot
+from swh.model.model import Snapshot, TargetType
 
-from swh.storage.algos.origin import origin_get_latest_visit_status
+from swh.storage.algos.origin import (
+    origin_get_latest_visit_status,
+    iter_origin_visits,
+    iter_origin_visit_statuses,
+)
+from swh.storage.interface import ListOrder, StorageInterface
 
 
 def snapshot_get_all_branches(storage, snapshot_id):
     """Get all the branches for a given snapshot
 
     Args:
         storage (swh.storage.storage.Storage): the storage instance
         snapshot_id (bytes): the snapshot's identifier
     Returns:
         dict: a dict with two keys:
             * **id**: identifier of the snapshot
             * **branches**: a dict of branches contained in the snapshot
               whose keys are the branches' names.
     """
     ret = storage.snapshot_get(snapshot_id)
 
     if not ret:
         return
 
     next_branch = ret.pop("next_branch", None)
     while next_branch:
         data = storage.snapshot_get_branches(snapshot_id, branches_from=next_branch)
         ret["branches"].update(data["branches"])
         next_branch = data.get("next_branch")
 
     return ret
 
 
 def snapshot_get_latest(
     storage,
     origin: str,
     allowed_statuses: Optional[List[str]] = None,
     branches_count: Optional[int] = None,
 ) -> Optional[Snapshot]:
     """Get the latest snapshot for the given origin, optionally only from visits that
     have one of the given allowed_statuses.
 
     The branches of the snapshot are iterated in the lexicographical
     order of their names.
 
     Args:
         storage: Storage instance
         origin: the origin's URL
         allowed_statuses: list of visit statuses considered
             to find the latest snapshot for the visit. For instance,
             ``allowed_statuses=['full']`` will only consider visits that
             have successfully run to completion.
         branches_count: Optional parameter to retrieve snapshot with all branches
             (default behavior when None) or not. If set to positive number, the snapshot
             will be partial with only that number of branches.
 
     Raises:
         ValueError if branches_count is not a positive value
 
     Returns:
         The snapshot object if one is found matching the criteria or None.
 
     """
     visit_and_status = origin_get_latest_visit_status(
         storage, origin, allowed_statuses=allowed_statuses, require_snapshot=True,
     )
 
     if not visit_and_status:
         return None
 
     _, visit_status = visit_and_status
     snapshot_id = visit_status.snapshot
     if not snapshot_id:
         return None
 
     if branches_count:  # partial snapshot
         if not isinstance(branches_count, int) or branches_count <= 0:
             raise ValueError(
                 "Parameter branches_count must be a positive integer. "
                 f"Current value is {branches_count}"
             )
         snapshot = storage.snapshot_get_branches(
             snapshot_id, branches_count=branches_count
         )
         if snapshot is None:
             return None
         snapshot.pop("next_branch")
     else:
         snapshot = snapshot_get_all_branches(storage, snapshot_id)
     return Snapshot.from_dict(snapshot) if snapshot else None
+
+
+def snapshot_id_get_from_revision(
+    storage: StorageInterface, origin: str, revision_id: bytes
+) -> Optional[bytes]:
+    """Retrieve the most recent snapshot id targeting the revision_id for the given
+    origin.
+
+    *Warning* This is a potentially highly costly operation
+
+    Args:
+        storage: Storage instance
+        origin: the origin's URL
+        revision_id: identifier of the revision a snapshot branch must target
+
+    Returns:
+        The snapshot id if found. None otherwise.
+
+    """
+    # An unknown revision may be reported as a missing or as a None entry, and the
+    # result may be a lazy (always truthy) iterable, so inspect the entries
+    revisions = storage.revision_get([revision_id]) or []
+    if all(revision is None for revision in revisions):
+        return None
+
+    for visit in iter_origin_visits(storage, origin, order=ListOrder.DESC):
+        assert visit.visit is not None
+        for visit_status in iter_origin_visit_statuses(
+            storage, origin, visit.visit, order=ListOrder.DESC
+        ):
+            snapshot_id = visit_status.snapshot
+            if snapshot_id is None:
+                continue
+
+            snapshot = snapshot_get_all_branches(storage, snapshot_id)
+            if not snapshot:
+                continue
+            for branch in snapshot["branches"].values():
+                if (
+                    branch is not None
+                    and branch["target_type"] == TargetType.REVISION.value
+                    and branch["target"] == revision_id
+                ):  # snapshot found
+                    return snapshot_id
+
+    return None
diff --git a/swh/storage/tests/algos/test_origin.py b/swh/storage/tests/algos/test_origin.py
index aedb0ed5..b831906c 100644
--- a/swh/storage/tests/algos/test_origin.py
+++ b/swh/storage/tests/algos/test_origin.py
@@ -1,321 +1,418 @@
 # Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import datetime
 import pytest
 
 from unittest.mock import patch
 
 from swh.model.model import Origin, OriginVisit, OriginVisitStatus
-from swh.storage.algos.origin import iter_origins, origin_get_latest_visit_status
+from swh.storage.algos.origin import (
+    iter_origins,
+    origin_get_latest_visit_status,
+    iter_origin_visits,
+    iter_origin_visit_statuses,
+)
+from swh.storage.interface import ListOrder
 from swh.storage.utils import now
 
 from swh.storage.tests.test_storage import round_to_milliseconds
 
 
 def assert_list_eq(left, right, msg=None):
     assert list(left) == list(right), msg
 
 
 @pytest.fixture
 def swh_storage_backend_config():
     yield {
         "cls": "memory",
     }
 
 
 def test_iter_origins(swh_storage):
     origins = [
         Origin(url="bar"),
         Origin(url="qux"),
         Origin(url="quuz"),
     ]
     assert swh_storage.origin_add(origins) == {"origin:add": 3}
     assert_list_eq(iter_origins(swh_storage), origins)
     assert_list_eq(iter_origins(swh_storage, batch_size=1), origins)
     assert_list_eq(iter_origins(swh_storage, batch_size=2), origins)
 
     for i in range(1, 5):
         assert_list_eq(iter_origins(swh_storage, origin_from=i + 1), origins[i:], i)
 
         assert_list_eq(
             iter_origins(swh_storage, origin_from=i + 1, batch_size=1), origins[i:], i
         )
 
         assert_list_eq(
             iter_origins(swh_storage, origin_from=i + 1, batch_size=2), origins[i:], i
         )
 
         for j in range(i, 5):
             assert_list_eq(
                 iter_origins(swh_storage, origin_from=i + 1, origin_to=j + 1),
                 origins[i:j],
                 (i, j),
             )
 
             assert_list_eq(
                 iter_origins(
                     swh_storage, origin_from=i + 1, origin_to=j + 1, batch_size=1
                 ),
                 origins[i:j],
                 (i, j),
             )
 
             assert_list_eq(
                 iter_origins(
                     swh_storage, origin_from=i + 1, origin_to=j + 1, batch_size=2
                 ),
                 origins[i:j],
                 (i, j),
             )
 
 
 @patch("swh.storage.in_memory.InMemoryStorage.origin_get_range")
 def test_iter_origins_batch_size(mock_origin_get_range, swh_storage):
     mock_origin_get_range.return_value = []
 
     list(iter_origins(swh_storage))
     mock_origin_get_range.assert_called_with(origin_from=1, origin_count=10000)
 
     list(iter_origins(swh_storage, batch_size=42))
     mock_origin_get_range.assert_called_with(origin_from=1, origin_count=42)
 
 
 def test_origin_get_latest_visit_status_none(swh_storage, sample_data):
     """Looking up unknown objects should return nothing
 
     """
     # unknown origin so no result
     assert origin_get_latest_visit_status(swh_storage, "unknown-origin") is None
 
     # unknown type so no result
     origin = sample_data.origin
     origin_visit = sample_data.origin_visit
     assert origin_visit.origin == origin.url
 
     swh_storage.origin_add([origin])
     swh_storage.origin_visit_add([origin_visit])[0]
     assert origin_visit.type != "unknown"
 
     actual_origin_visit = origin_get_latest_visit_status(
         swh_storage, origin.url, type="unknown"
     )
     assert actual_origin_visit is None
 
     actual_origin_visit = origin_get_latest_visit_status(
         swh_storage, origin.url, require_snapshot=True
     )
     assert actual_origin_visit is None
 
     actual_origin_visit = origin_get_latest_visit_status(
         swh_storage, origin.url, allowed_statuses=["unknown"]
     )
     assert actual_origin_visit is None
 
 
 def init_storage_with_origin_visits(swh_storage, sample_data):
     """Initialize storage with origin/origin-visit/origin-visit-status
 
     """
     snapshot = sample_data.snapshots[2]
     origin1, origin2 = sample_data.origins[:2]
     swh_storage.origin_add([origin1, origin2])
 
     ov1, ov2 = swh_storage.origin_visit_add(
         [
             OriginVisit(
                 origin=origin1.url,
                 date=sample_data.date_visit1,
                 type=sample_data.type_visit1,
             ),
             OriginVisit(
                 origin=origin2.url,
                 date=sample_data.date_visit2,
                 type=sample_data.type_visit2,
             ),
         ]
     )
 
     swh_storage.snapshot_add([snapshot])
 
     date_now = now()
     date_now = round_to_milliseconds(date_now)
     assert sample_data.date_visit1 < sample_data.date_visit2
     assert sample_data.date_visit2 < date_now
 
     # origin visit status 1 for origin visit 1
     ovs11 = OriginVisitStatus(
         origin=origin1.url,
         visit=ov1.visit,
         date=sample_data.date_visit1,
         status="partial",
         snapshot=None,
     )
     # origin visit status 2 for origin visit 1
     ovs12 = OriginVisitStatus(
         origin=origin1.url,
         visit=ov1.visit,
         date=sample_data.date_visit2,
         status="ongoing",
         snapshot=None,
     )
     # origin visit status 1 for origin visit 2
     ovs21 = OriginVisitStatus(
         origin=origin2.url,
         visit=ov2.visit,
         date=sample_data.date_visit2,
         status="ongoing",
         snapshot=None,
     )
     # origin visit status 2 for origin visit 2
     ovs22 = OriginVisitStatus(
         origin=origin2.url,
         visit=ov2.visit,
         date=date_now,
         status="full",
         snapshot=snapshot.id,
         metadata={"something": "wicked"},
     )
 
     swh_storage.origin_visit_status_add([ovs11, ovs12, ovs21, ovs22])
     return {
         "origin": [origin1, origin2],
         "origin_visit": [ov1, ov2],
         "origin_visit_status": [ovs11, ovs12, ovs21, ovs22],
     }
 
 
 def test_origin_get_latest_visit_status_filter_type(swh_storage, sample_data):
     """Filtering origin visit per types should yield consistent results
 
     """
     objects = init_storage_with_origin_visits(swh_storage, sample_data)
     origin1, origin2 = objects["origin"]
     ov1, ov2 = objects["origin_visit"]
     ovs11, ovs12, _, ovs22 = objects["origin_visit_status"]
 
     # no visit for origin1 url with type_visit2
     assert (
         origin_get_latest_visit_status(
             swh_storage, origin1.url, type=sample_data.type_visit2
         )
         is None
     )
 
     # no visit for origin2 url with type_visit1
     assert (
         origin_get_latest_visit_status(
             swh_storage, origin2.url, type=sample_data.type_visit1
         )
         is None
     )
 
     # Two visits, both with no snapshot, take the most recent
     actual_ov1, actual_ovs12 = origin_get_latest_visit_status(
         swh_storage, origin1.url, type=sample_data.type_visit1
     )
     assert isinstance(actual_ov1, OriginVisit)
     assert isinstance(actual_ovs12, OriginVisitStatus)
     assert actual_ov1.origin == ov1.origin
     assert actual_ov1.visit == ov1.visit
     assert actual_ov1.type == sample_data.type_visit1
     assert actual_ovs12 == ovs12
 
     # take the most recent visit with type_visit2
     actual_ov2, actual_ovs22 = origin_get_latest_visit_status(
         swh_storage, origin2.url, type=sample_data.type_visit2
     )
     assert isinstance(actual_ov2, OriginVisit)
     assert isinstance(actual_ovs22, OriginVisitStatus)
     assert actual_ov2.origin == ov2.origin
     assert actual_ov2.visit == ov2.visit
     assert actual_ov2.type == sample_data.type_visit2
     assert actual_ovs22 == ovs22
 
 
 def test_origin_get_latest_visit_status_filter_status(swh_storage, sample_data):
     objects = init_storage_with_origin_visits(swh_storage, sample_data)
     origin1, origin2 = objects["origin"]
     ov1, ov2 = objects["origin_visit"]
     ovs11, ovs12, _, ovs22 = objects["origin_visit_status"]
 
     # no failed status for that visit
     assert (
         origin_get_latest_visit_status(
             swh_storage, origin2.url, allowed_statuses=["failed"]
         )
         is None
     )
 
     # only 1 partial for that visit
     actual_ov1, actual_ovs11 = origin_get_latest_visit_status(
         swh_storage, origin1.url, allowed_statuses=["partial"]
     )
     assert actual_ov1.origin == ov1.origin
     assert actual_ov1.visit == ov1.visit
     assert actual_ov1.type == sample_data.type_visit1
     assert actual_ovs11 == ovs11
 
     # both status exist, take the latest one
     actual_ov1, actual_ovs12 = origin_get_latest_visit_status(
         swh_storage, origin1.url, allowed_statuses=["partial", "ongoing"]
     )
     assert actual_ov1.origin == ov1.origin
     assert actual_ov1.visit == ov1.visit
     assert actual_ov1.type == sample_data.type_visit1
     assert actual_ovs12 == ovs12
 
     assert isinstance(actual_ov1, OriginVisit)
     assert isinstance(actual_ovs12, OriginVisitStatus)
     assert actual_ov1.origin == ov1.origin
     assert actual_ov1.visit == ov1.visit
     assert actual_ov1.type == sample_data.type_visit1
     assert actual_ovs12 == ovs12
 
     # take the most recent visit with type_visit2
     actual_ov2, actual_ovs22 = origin_get_latest_visit_status(
         swh_storage, origin2.url, allowed_statuses=["full"]
     )
     assert actual_ov2.origin == ov2.origin
     assert actual_ov2.visit == ov2.visit
     assert actual_ov2.type == sample_data.type_visit2
     assert actual_ovs22 == ovs22
 
 
 def test_origin_get_latest_visit_status_filter_snapshot(swh_storage, sample_data):
     objects = init_storage_with_origin_visits(swh_storage, sample_data)
     origin1, origin2 = objects["origin"]
     _, ov2 = objects["origin_visit"]
     _, _, _, ovs22 = objects["origin_visit_status"]
 
     # there is no visit with snapshot yet for that visit
     assert (
         origin_get_latest_visit_status(swh_storage, origin1.url, require_snapshot=True)
         is None
     )
 
     # visit status with partial status visit elected
     actual_ov2, actual_ovs22 = origin_get_latest_visit_status(
         swh_storage, origin2.url, require_snapshot=True
     )
     assert actual_ov2.origin == ov2.origin
     assert actual_ov2.visit == ov2.visit
     assert actual_ov2.type == ov2.type
     assert actual_ovs22 == ovs22
 
     date_now = now()
 
     # Add another visit
     swh_storage.origin_visit_add(
         [OriginVisit(origin=origin2.url, date=date_now, type=sample_data.type_visit2,),]
     )
 
     # Requiring the latest visit with a snapshot, we still find the previous visit
     ov2, ovs22 = origin_get_latest_visit_status(
         swh_storage, origin2.url, require_snapshot=True
     )
     assert actual_ov2.origin == ov2.origin
     assert actual_ov2.visit == ov2.visit
     assert actual_ov2.type == ov2.type
     assert actual_ovs22 == ovs22
+
+
+def test_iter_origin_visits(swh_storage, sample_data):
+    """Iter over origin visits for an origin returns all visits"""
+    origin1, origin2 = sample_data.origins[:2]
+    swh_storage.origin_add([origin1, origin2])
+
+    date_past = now() - datetime.timedelta(weeks=20)
+
+    new_visits = []
+    for visit_id in range(20):
+        new_visits.append(
+            OriginVisit(
+                origin=origin1.url,
+                date=date_past + datetime.timedelta(days=visit_id),
+                type="git",
+            )
+        )
+
+    visits = swh_storage.origin_visit_add(new_visits)
+    reversed_visits = list(reversed(visits))
+
+    # no limit, order asc
+    actual_visits = list(iter_origin_visits(swh_storage, origin1.url))
+    assert actual_visits == visits
+
+    # no limit, order desc
+    actual_visits = list(
+        iter_origin_visits(swh_storage, origin1.url, order=ListOrder.DESC)
+    )
+    assert actual_visits == reversed_visits
+
+    # no result
+    actual_visits = list(iter_origin_visits(swh_storage, origin2.url))
+    assert actual_visits == []
+
+
+def test_iter_origin_visit_status(swh_storage, sample_data):
+    """Iter over the statuses of an origin visit returns all its statuses"""
+    origin1, origin2 = sample_data.origins[:2]
+    swh_storage.origin_add([origin1])
+
+    ov1 = swh_storage.origin_visit_add([sample_data.origin_visit])[0]
+    assert ov1.origin == origin1.url
+
+    date_past = now() - datetime.timedelta(weeks=20)
+
+    ovs1 = OriginVisitStatus(
+        origin=origin1.url,
+        visit=ov1.visit,
+        date=ov1.date,
+        status="created",
+        snapshot=None,
+    )
+    new_visit_statuses = [ovs1]
+    for i in range(20):
+        status_date = date_past + datetime.timedelta(days=i)
+
+        new_visit_statuses.append(
+            OriginVisitStatus(
+                origin=origin1.url,
+                visit=ov1.visit,
+                date=status_date,
+                status="created",
+                snapshot=None,
+            )
+        )
+
+    swh_storage.origin_visit_status_add(new_visit_statuses)
+    reversed_visit_statuses = list(reversed(new_visit_statuses))
+
+    # order asc
+    actual_visit_statuses = list(
+        iter_origin_visit_statuses(swh_storage, ov1.origin, ov1.visit)
+    )
+    assert actual_visit_statuses == new_visit_statuses
+
+    # order desc
+    actual_visit_statuses = list(
+        iter_origin_visit_statuses(
+            swh_storage, ov1.origin, ov1.visit, order=ListOrder.DESC
+        )
+    )
+    assert actual_visit_statuses == reversed_visit_statuses
+
+    # no result
+    actual_visit_statuses = list(
+        iter_origin_visit_statuses(swh_storage, origin2.url, ov1.visit)
+    )
+    assert actual_visit_statuses == []
diff --git a/swh/storage/tests/algos/test_snapshot.py b/swh/storage/tests/algos/test_snapshot.py
index 16646222..d2974a3a 100644
--- a/swh/storage/tests/algos/test_snapshot.py
+++ b/swh/storage/tests/algos/test_snapshot.py
@@ -1,147 +1,209 @@
 # Copyright (C) 2018-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from hypothesis import given
 import pytest
 
 from swh.model.collections import ImmutableDict
 from swh.model.hypothesis_strategies import snapshots, branch_names, branch_targets
 from swh.model.model import OriginVisit, OriginVisitStatus, Snapshot
 
-from swh.storage.algos.snapshot import snapshot_get_all_branches, snapshot_get_latest
+from swh.storage.algos.snapshot import (
+    snapshot_get_all_branches,
+    snapshot_get_latest,
+    snapshot_id_get_from_revision,
+)
 from swh.storage.utils import now
 
 
 @pytest.fixture
 def swh_storage_backend_config():
     yield {
         "cls": "memory",
         "journal_writer": None,
     }
 
 
 @given(snapshot=snapshots(min_size=0, max_size=10, only_objects=False))
 def test_snapshot_small(swh_storage, snapshot):  # noqa
     swh_storage.snapshot_add([snapshot])
 
     returned_snapshot = snapshot_get_all_branches(swh_storage, snapshot.id)
     assert snapshot.to_dict() == returned_snapshot
 
 
 @given(branch_name=branch_names(), branch_target=branch_targets(only_objects=True))
 def test_snapshot_large(swh_storage, branch_name, branch_target):  # noqa
     snapshot = Snapshot(
         branches=ImmutableDict(
             (b"%s%05d" % (branch_name, i), branch_target) for i in range(10000)
         ),
     )
 
     swh_storage.snapshot_add([snapshot])
 
     returned_snapshot = snapshot_get_all_branches(swh_storage, snapshot.id)
     assert snapshot.to_dict() == returned_snapshot
 
 
 def test_snapshot_get_latest_none(swh_storage, sample_data):
     """Retrieve latest snapshot on unknown origin or origin without snapshot should
     yield no result
 
     """
     # unknown origin so None
     assert snapshot_get_latest(swh_storage, "unknown-origin") is None
 
     # no snapshot on origin visit so None
     origin = sample_data.origin
     swh_storage.origin_add([origin])
     origin_visit, origin_visit2 = sample_data.origin_visits[:2]
     assert origin_visit.origin == origin.url
 
     swh_storage.origin_visit_add([origin_visit])
     assert snapshot_get_latest(swh_storage, origin.url) is None
 
     ov1 = swh_storage.origin_visit_get_latest(origin.url)
     assert ov1 is not None
 
     # visit references a snapshot but the snapshot does not exist in backend for some
     # reason
     complete_snapshot = sample_data.snapshots[2]
     swh_storage.origin_visit_status_add(
         [
             OriginVisitStatus(
                 origin=origin.url,
                 visit=ov1.visit,
                 date=origin_visit2.date,
                 status="partial",
                 snapshot=complete_snapshot.id,
             )
         ]
     )
     # so we do not find it
     assert snapshot_get_latest(swh_storage, origin.url) is None
     assert snapshot_get_latest(swh_storage, origin.url, branches_count=1) is None
 
 
 def test_snapshot_get_latest(swh_storage, sample_data):
     origin = sample_data.origin
     swh_storage.origin_add([origin])
 
     visit1, visit2 = sample_data.origin_visits[:2]
     assert visit1.origin == origin.url
 
     swh_storage.origin_visit_add([visit1])
     ov1 = swh_storage.origin_visit_get_latest(origin.url)
 
     # Add snapshot to visit1, latest snapshot = visit 1 snapshot
     complete_snapshot = sample_data.snapshots[2]
     swh_storage.snapshot_add([complete_snapshot])
 
     swh_storage.origin_visit_status_add(
         [
             OriginVisitStatus(
                 origin=origin.url,
                 visit=ov1.visit,
                 date=visit2.date,
                 status="partial",
                 snapshot=None,
             )
         ]
     )
     assert visit1.date < visit2.date
 
     # no snapshot associated to the visit, so None
     actual_snapshot = snapshot_get_latest(
         swh_storage, origin.url, allowed_statuses=["partial"]
     )
     assert actual_snapshot is None
 
     date_now = now()
     assert visit2.date < date_now
     swh_storage.origin_visit_status_add(
         [
             OriginVisitStatus(
                 origin=origin.url,
                 visit=ov1.visit,
                 date=date_now,
                 status="full",
                 snapshot=complete_snapshot.id,
             )
         ]
     )
 
     swh_storage.origin_visit_add(
         [OriginVisit(origin=origin.url, date=now(), type=visit1.type,)]
     )
 
     actual_snapshot = snapshot_get_latest(swh_storage, origin.url)
     assert actual_snapshot is not None
     assert actual_snapshot == complete_snapshot
 
     actual_snapshot = snapshot_get_latest(swh_storage, origin.url, branches_count=1)
     assert actual_snapshot is not None
     assert actual_snapshot.id == complete_snapshot.id
     assert len(actual_snapshot.branches.values()) == 1
 
     with pytest.raises(ValueError, match="branches_count must be a positive integer"):
         snapshot_get_latest(swh_storage, origin.url, branches_count="something-wrong")
+
+
+def test_snapshot_get_id_from_revision(swh_storage, sample_data):
+    origin = sample_data.origin
+    swh_storage.origin_add([origin])
+
+    date_visit2 = now()
+    visit1, visit2 = sample_data.origin_visits[:2]
+    assert visit1.origin == origin.url
+
+    ov1, ov2 = swh_storage.origin_visit_add([visit1, visit2])
+
+    revision1, revision2, revision3 = sample_data.revisions[:3]
+    swh_storage.revision_add([revision1, revision2])
+
+    empty_snapshot, complete_snapshot = sample_data.snapshots[1:3]
+    swh_storage.snapshot_add([complete_snapshot])
+
+    # Add complete_snapshot to visit1 which targets revision1
+    ovs1, ovs2 = [
+        OriginVisitStatus(
+            origin=origin.url,
+            visit=ov1.visit,
+            date=date_visit2,
+            status="partial",
+            snapshot=complete_snapshot.id,
+        ),
+        OriginVisitStatus(
+            origin=origin.url,
+            visit=ov2.visit,
+            date=now(),
+            status="full",
+            snapshot=empty_snapshot.id,
+        ),
+    ]
+
+    swh_storage.origin_visit_status_add([ovs1, ovs2])
+    assert ov1.date < ov2.date
+    assert ov2.date < ovs1.date
+    assert ovs1.date < ovs2.date
+
+    # revision3 does not exist so result is None
+    actual_snapshot_id = snapshot_id_get_from_revision(
+        swh_storage, origin.url, revision3.id
+    )
+    assert actual_snapshot_id is None
+
+    # no snapshot targets revision2 for origin.url so result is None
+    actual_snapshot_id = snapshot_id_get_from_revision(
+        swh_storage, origin.url, revision2.id
+    )
+    assert actual_snapshot_id is None
+
+    # complete_snapshot targets at least revision1
+    actual_snapshot_id = snapshot_id_get_from_revision(
+        swh_storage, origin.url, revision1.id
+    )
+    assert actual_snapshot_id == complete_snapshot.id