# ======================================================================
# File: swh/scheduler/journal_client.py
# (state after applying the patch; keep as its own module)
# ======================================================================

# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from datetime import datetime
from typing import Dict, List, Optional, Tuple

import attr

from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import OriginVisitStats

msg_type = "origin_visit_status"


def max_date(d1: Optional[datetime], d2: Optional[datetime]) -> datetime:
    """Return the most recent of two optional dates.

    Args:
        d1: a date, or None
        d2: a date, or None

    Returns:
        The greater of the two dates; when only one is set, that one.

    Raises:
        ValueError: when both dates are None.

    """
    known_dates = [d for d in (d1, d2) if d is not None]
    if not known_dates:
        raise ValueError("At least one date should be a valid datetime")
    return max(known_dates)


def _record_snapshot_visit(visit_stats_d: Dict, date: datetime, snapshot) -> None:
    """Update, in place, the stats of one origin for a visit status that
    carries a snapshot.

    Args:
        visit_stats_d: mutable dict holding the "last_eventful",
            "last_uneventful" and "last_snapshot" entries to update
        date: date of the visit status being processed
        snapshot: snapshot identifier of the visit status being processed

    """
    if visit_stats_d["last_snapshot"] is None:
        # first time visit with snapshot, we keep relevant information
        visit_stats_d["last_eventful"] = date
        visit_stats_d["last_snapshot"] = snapshot
        return

    # a snapshot is already stored, so last_eventful must be stored as well
    last_eventful = visit_stats_d["last_eventful"]
    assert last_eventful is not None
    last_uneventful = visit_stats_d["last_uneventful"]

    if snapshot != visit_stats_d["last_snapshot"]:
        if date < max_date(last_eventful, last_uneventful):
            # out of order message about another snapshot, so ignored
            return
        # new eventful visit (new snapshot)
        visit_stats_d["last_eventful"] = date
        visit_stats_d["last_snapshot"] = snapshot
    elif date < last_eventful:
        # Same snapshot, seen earlier than what was recorded as the eventful
        # visit: the current message is the true eventful one, and the
        # previous "eventful" visit was actually uneventful. Keep the most
        # recent uneventful date so that a later one is never lost.
        visit_stats_d["last_uneventful"] = max_date(last_eventful, last_uneventful)
        visit_stats_d["last_eventful"] = date
    elif last_uneventful is None or date > last_uneventful:
        # same snapshot, most recent visit so far: uneventful visit
        visit_stats_d["last_uneventful"] = date
    # otherwise the status falls between last_eventful and last_uneventful
    # for the same snapshot; it brings no new information.


def process_journal_objects(
    messages: Dict[str, List[Dict]], *, scheduler: SchedulerInterface
) -> None:
    """Read messages from origin_visit_status journal topics, then inserts them in the
    scheduler "origin_visit_stats" table.

    Worker function for `JournalClient.process(worker_fn)`, after
    currification of `scheduler`.

    Messages whose status is "created" or "ongoing" are skipped. For the
    others, the stats of the (origin, visit type) pair are read once from the
    scheduler, updated in memory for each message, and all upserted in a
    single call at the end.

    Args:
        messages: mapping from message type to the list of message dicts;
            only the "origin_visit_status" type is accepted. The "origin",
            "type", "status", "date" and "snapshot" keys of each message are
            read.
        scheduler: scheduler instance used to read and upsert the stats

    Raises:
        AssertionError: when `messages` holds another message type, or does
            not hold the "origin_visit_status" one.

    """
    unexpected_types = set(messages) - {msg_type}
    assert (
        not unexpected_types
    ), f"Got unexpected {', '.join(sorted(unexpected_types))} message types"
    assert msg_type in messages, f"Expected {msg_type} messages"

    origin_visit_stats: Dict[Tuple[str, str], Dict] = {}
    for msg_dict in messages[msg_type]:
        if msg_dict["status"] in ("created", "ongoing"):
            continue
        origin = msg_dict["origin"]
        visit_type = msg_dict["type"]
        pk = origin, visit_type
        if pk not in origin_visit_stats:
            visit_stats = scheduler.origin_visit_stats_get(origin, visit_type)
            if visit_stats:
                origin_visit_stats[pk] = attr.asdict(visit_stats)
            else:
                # nothing known yet about that origin for that visit type
                origin_visit_stats[pk] = {
                    "url": origin,
                    "visit_type": visit_type,
                    "last_uneventful": None,
                    "last_eventful": None,
                    "last_failed": None,
                    "last_notfound": None,
                    "last_snapshot": None,
                }

        visit_stats_d = origin_visit_stats[pk]
        if msg_dict["status"] == "not_found":
            visit_stats_d["last_notfound"] = max_date(
                msg_dict["date"], visit_stats_d.get("last_notfound")
            )
        elif msg_dict["snapshot"] is None:
            visit_stats_d["last_failed"] = max_date(
                msg_dict["date"], visit_stats_d.get("last_failed")
            )
        else:  # visit with snapshot, something happened
            _record_snapshot_visit(
                visit_stats_d, msg_dict["date"], msg_dict["snapshot"]
            )

    scheduler.origin_visit_stats_upsert(
        OriginVisitStats(**ovs) for ovs in origin_visit_stats.values()
    )


# ======================================================================
# File: swh/scheduler/tests/test_journal_client.py
# (state after applying the patch; keep as its own module)
# ======================================================================

# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import datetime
import functools
from itertools import permutations

import pytest

from swh.model.hashutil import hash_to_bytes
from swh.scheduler.journal_client import max_date, process_journal_objects
from swh.scheduler.model import OriginVisitStats
from swh.scheduler.utils import utcnow


def test_journal_client_origin_visit_status_from_journal_fail(swh_scheduler):
    """Unexpected or missing message types are rejected."""
    process_fn = functools.partial(process_journal_objects, scheduler=swh_scheduler,)

    with pytest.raises(AssertionError, match="Got unexpected origin_visit"):
        process_fn({"origin_visit": [{"url": "http://foobar.baz"},]})

    with pytest.raises(AssertionError, match="Expected origin_visit_status"):
        process_fn({})


ONE_DAY = datetime.timedelta(days=1)

DATE3 = utcnow()
DATE2 = DATE3 - ONE_DAY
DATE1 = DATE2 - ONE_DAY


assert DATE1 < DATE2 < DATE3


@pytest.mark.parametrize(
    "d1,d2,expected_max_date",
    [
        (None, DATE2, DATE2),
        (DATE1, None, DATE1),
        (DATE1, DATE2, DATE2),
        (DATE2, DATE1, DATE2),
    ],
)
def test_max_date(d1, d2, expected_max_date):
    """The most recent date is returned, whatever the argument order."""
    assert max_date(d1, d2) == expected_max_date


def test_max_date_raise():
    """Two missing dates cannot be compared."""
    with pytest.raises(ValueError, match="valid datetime"):
        max_date(None, None)


def test_journal_client_origin_visit_status_from_journal_ignored_status(swh_scheduler):
    """Only final statuses (full, partial) are important, the rest remain ignored.

    """
    visit_statuses = [
        {
            "origin": "foo",
            "visit": 1,
            "status": "created",
            "date": utcnow(),
            "type": "git",
            "snapshot": None,
        },
        {
            "origin": "bar",
            "visit": 1,
            "status": "ongoing",
            "date": utcnow(),
            "type": "svn",
            "snapshot": None,
        },
    ]

    process_journal_objects(
        {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler
    )

    # Ensure those visit status are ignored
    for visit_status in visit_statuses:
        actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get(
            visit_status["origin"], visit_status["type"]
        )
        assert actual_origin_visit_stats is None


def test_journal_client_origin_visit_status_from_journal_last_notfound(swh_scheduler):
    """"not_found" statuses only update last_notfound, keeping the latest date."""
    visit_status = {
        "origin": "foo",
        "visit": 1,
        "status": "not_found",
        "date": DATE1,
        "type": "git",
        "snapshot": None,
    }

    process_journal_objects(
        {"origin_visit_status": [visit_status]}, scheduler=swh_scheduler
    )

    actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get(
        visit_status["origin"], visit_status["type"]
    )
    assert actual_origin_visit_stats == OriginVisitStats(
        url=visit_status["origin"],
        visit_type=visit_status["type"],
        last_eventful=None,
        last_uneventful=None,
        last_failed=None,
        last_notfound=visit_status["date"],
        last_snapshot=None,
    )

    # messages are deliberately sent most recent first
    visit_statuses = [
        {
            "origin": "foo",
            "visit": 4,
            "status": "not_found",
            "date": DATE3,
            "type": "git",
            "snapshot": None,
        },
        {
            "origin": "foo",
            "visit": 3,
            "status": "not_found",
            "date": DATE2,
            "type": "git",
            "snapshot": None,
        },
    ]

    process_journal_objects(
        {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler
    )

    for visit_status in visit_statuses:
        actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get(
            visit_status["origin"], visit_status["type"]
        )
        assert actual_origin_visit_stats is not None
        assert actual_origin_visit_stats == OriginVisitStats(
            url=visit_status["origin"],
            visit_type=visit_status["type"],
            last_eventful=None,
            last_uneventful=None,
            last_failed=None,
            last_notfound=DATE3,
            last_snapshot=None,
        )


def test_journal_client_origin_visit_status_from_journal_last_failed(swh_scheduler):
    """Final statuses without snapshot only update last_failed, keeping the
    latest date.

    """
    visit_statuses = [
        {
            "origin": "foo",
            "visit": 1,
            "status": "partial",
            "date": utcnow(),
            "type": "git",
            "snapshot": None,
        },
        {
            "origin": "bar",
            "visit": 2,
            "status": "full",
            "date": DATE1,
            "type": "git",
            "snapshot": None,
        },
    ]

    process_journal_objects(
        {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler
    )

    # Ensure those visit statuses are recorded as failed visits
    for visit_status in visit_statuses:
        actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get(
            visit_status["origin"], visit_status["type"]
        )
        assert actual_origin_visit_stats is not None
        assert actual_origin_visit_stats == OriginVisitStats(
            url=visit_status["origin"],
            visit_type=visit_status["type"],
            last_eventful=None,
            last_uneventful=None,
            last_failed=visit_status["date"],
            last_notfound=None,
            last_snapshot=None,
        )

    # messages are deliberately sent most recent first
    visit_statuses = [
        {
            "origin": "bar",
            "visit": 3,
            "status": "full",
            "date": DATE3,
            "type": "git",
            "snapshot": None,
        },
        {
            "origin": "bar",
            "visit": 4,
            "status": "full",
            "date": DATE2,
            "type": "git",
            "snapshot": None,
        },
    ]

    process_journal_objects(
        {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler
    )

    for visit_status in visit_statuses:
        actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get(
            visit_status["origin"], visit_status["type"]
        )
        assert actual_origin_visit_stats is not None
        assert actual_origin_visit_stats == OriginVisitStats(
            url=visit_status["origin"],
            visit_type=visit_status["type"],
            last_eventful=None,
            last_uneventful=None,
            last_failed=DATE3,
            last_notfound=None,
            last_snapshot=None,
        )


def test_journal_client_origin_visit_status_from_journal_last_eventful(swh_scheduler):
    """Visits with a new snapshot update last_eventful and last_snapshot; an
    older status about another snapshot is ignored.

    """
    visit_statuses = [
        {
            "origin": "bar",
            "visit": 1,
            "status": "partial",
            "date": utcnow(),
            "type": "git",
            "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
        },
        {
            "origin": "foo",
            "visit": 2,
            "status": "full",
            "date": DATE1,
            "type": "git",
            "snapshot": hash_to_bytes("eeecc0710eb6cf9efd5b920a8453e1e07157bfff"),
        },
    ]

    process_journal_objects(
        {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler
    )

    for visit_status in visit_statuses:
        actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get(
            visit_status["origin"], visit_status["type"]
        )
        assert actual_origin_visit_stats is not None
        assert actual_origin_visit_stats == OriginVisitStats(
            url=visit_status["origin"],
            visit_type=visit_status["type"],
            last_eventful=visit_status["date"],
            last_uneventful=None,
            last_failed=None,
            last_notfound=None,
            last_snapshot=visit_status["snapshot"],
        )

    # messages are deliberately sent most recent first
    visit_statuses = [
        {
            "origin": "foo",
            "visit": 4,
            "status": "full",
            "date": DATE3,
            "type": "git",
            "snapshot": hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd"),
        },
        {
            "origin": "foo",
            "visit": 3,
            "status": "partial",
            "date": DATE2,
            "type": "git",
            "snapshot": hash_to_bytes("aaacc0710eb6cf9efd5b920a8453e1e07157baaa"),
        },
    ]

    process_journal_objects(
        {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler
    )

    for visit_status in visit_statuses:
        actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get(
            visit_status["origin"], visit_status["type"]
        )
        assert actual_origin_visit_stats is not None
        assert actual_origin_visit_stats == OriginVisitStats(
            url=visit_status["origin"],
            visit_type=visit_status["type"],
            last_eventful=DATE3,
            last_uneventful=None,
            last_failed=None,
            last_notfound=None,
            last_snapshot=hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd"),
        )


def test_journal_client_origin_visit_status_from_journal_last_uneventful(swh_scheduler):
    """A more recent visit ending on the already known snapshot only updates
    last_uneventful.

    """
    visit_status = {
        "origin": "foo",
        "visit": 1,
        "status": "full",
        "date": DATE3 + ONE_DAY,
        "type": "git",
        "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
    }

    # Let's insert some visit stats with some previous visit information
    swh_scheduler.origin_visit_stats_upsert(
        [
            OriginVisitStats(
                url=visit_status["origin"],
                visit_type=visit_status["type"],
                last_eventful=DATE1,
                last_uneventful=DATE3,
                last_failed=DATE2,
                last_notfound=DATE1,
                last_snapshot=visit_status["snapshot"],
            )
        ]
    )

    process_journal_objects(
        {"origin_visit_status": [visit_status]}, scheduler=swh_scheduler
    )

    actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get(
        visit_status["origin"], visit_status["type"]
    )
    assert actual_origin_visit_stats is not None
    assert actual_origin_visit_stats == OriginVisitStats(
        url=visit_status["origin"],
        visit_type=visit_status["type"],
        last_eventful=DATE1,
        last_uneventful=visit_status["date"],  # most recent date but uneventful
        last_failed=DATE2,
        last_notfound=DATE1,
        last_snapshot=visit_status["snapshot"],
    )


# Two visits of the same origin ending on the same snapshot, each one with its
# "created" then "full" statuses, dated one day apart starting from DATE1.
VISIT_STATUSES = [
    {**ovs, "date": DATE1 + n * ONE_DAY}
    for n, ovs in enumerate(
        [
            {
                "origin": "foo",
                "type": "git",
                "visit": 1,
                "status": "created",
                "snapshot": None,
            },
            {
                "origin": "foo",
                "type": "git",
                "visit": 1,
                "status": "full",
                "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
            },
            {
                "origin": "foo",
                "type": "git",
                "visit": 2,
                "status": "created",
                "snapshot": None,
            },
            {
                "origin": "foo",
                "type": "git",
                "visit": 2,
                "status": "full",
                "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
            },
        ]
    )
]


@pytest.mark.parametrize(
    "visit_statuses", permutations(VISIT_STATUSES, len(VISIT_STATUSES))
)
def test_journal_client_origin_visit_status_permutation0(visit_statuses, swh_scheduler):
    """Ensure out of order topic subscription ends up in the same final state

    """
    process_journal_objects(
        {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler
    )

    expected_visit_stats = OriginVisitStats(
        url="foo",
        visit_type="git",
        last_eventful=DATE1 + ONE_DAY,
        last_uneventful=DATE1 + 3 * ONE_DAY,
        last_failed=None,
        last_notfound=None,
        last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
    )

    assert swh_scheduler.origin_visit_stats_get("foo", "git") == expected_visit_stats


# Four visits of the same origin, dated one day apart starting from DATE1: the
# first two end on one snapshot, the last two on another one.
VISIT_STATUSES1 = [
    {**ovs, "date": DATE1 + n * ONE_DAY}
    for n, ovs in enumerate(
        [
            {
                "origin": "cavabarder",
                "type": "hg",
                "visit": 1,
                "status": "partial",
                "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
            },
            {
                "origin": "cavabarder",
                "type": "hg",
                "visit": 2,
                "status": "full",
                "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
            },
            {
                "origin": "cavabarder",
                "type": "hg",
                "visit": 3,
                "status": "full",
                "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"),
            },
            {
                "origin": "cavabarder",
                "type": "hg",
                "visit": 4,
                "status": "full",
                "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"),
            },
        ]
    )
]


@pytest.mark.parametrize(
    "visit_statuses", permutations(VISIT_STATUSES1, len(VISIT_STATUSES1))
)
def test_journal_client_origin_visit_status_permutation1(visit_statuses, swh_scheduler):
    """Ensure out of order topic subscription ends up in the same final state

    """
    process_journal_objects(
        {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler
    )

    expected_visit_stats = OriginVisitStats(
        url="cavabarder",
        visit_type="hg",
        last_eventful=DATE1 + 2 * ONE_DAY,
        last_uneventful=DATE1 + 3 * ONE_DAY,
        last_failed=None,
        last_notfound=None,
        last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"),
    )

    assert (
        swh_scheduler.origin_visit_stats_get("cavabarder", "hg") == expected_visit_stats
    )


# Three visits of the same origin, dated one day apart starting from DATE1, all
# ending on the same snapshot: the first one is the eventful visit, the last
# one the most recent uneventful visit.
VISIT_STATUSES2 = [
    {**ovs, "date": DATE1 + n * ONE_DAY}
    for n, ovs in enumerate(
        [
            {
                "origin": "samesnapshot",
                "type": "git",
                "visit": 1,
                "status": "full",
                "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
            },
            {
                "origin": "samesnapshot",
                "type": "git",
                "visit": 2,
                "status": "full",
                "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
            },
            {
                "origin": "samesnapshot",
                "type": "git",
                "visit": 3,
                "status": "full",
                "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
            },
        ]
    )
]


@pytest.mark.parametrize(
    "visit_statuses", permutations(VISIT_STATUSES2, len(VISIT_STATUSES2))
)
def test_journal_client_origin_visit_status_permutation2(visit_statuses, swh_scheduler):
    """Ensure a status received between the eventful visit and the latest
    uneventful one does not alter the final state

    """
    process_journal_objects(
        {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler
    )

    expected_visit_stats = OriginVisitStats(
        url="samesnapshot",
        visit_type="git",
        last_eventful=DATE1,
        last_uneventful=DATE1 + 2 * ONE_DAY,
        last_failed=None,
        last_notfound=None,
        last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
    )

    assert (
        swh_scheduler.origin_visit_stats_get("samesnapshot", "git")
        == expected_visit_stats
    )