diff --git a/swh/scheduler/journal_client.py b/swh/scheduler/journal_client.py index 6aea2a9..ff5294c 100644 --- a/swh/scheduler/journal_client.py +++ b/swh/scheduler/journal_client.py @@ -1,122 +1,127 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime from typing import Dict, List, Optional, Tuple import attr from swh.scheduler.interface import SchedulerInterface from swh.scheduler.model import OriginVisitStats msg_type = "origin_visit_status" -def max_date(d1: Optional[datetime], d2: Optional[datetime]) -> datetime: - """Return the max date of the visit stats +def max_date(*dates: Optional[datetime]) -> datetime: + """Return the max date of given (possibly None) dates + At least one date must be not None. """ - if d1 is None and d2 is None: + datesok: Tuple[datetime, ...] = tuple(d for d in dates if d is not None) + if not datesok: raise ValueError("At least one date should be a valid datetime") - if d1 is None: - assert d2 is not None # make mypy happy - return d2 - if d2 is None: - return d1 - return max(d1, d2) + + maxdate = datesok[0] + if len(datesok) == 1: + return maxdate + + for d in datesok[1:]: + maxdate = max(d, maxdate) + + return maxdate def process_journal_objects( messages: Dict[str, List[Dict]], *, scheduler: SchedulerInterface ) -> None: """Read messages from origin_visit_status journal topics, then inserts them in the scheduler "origin_visit_stats" table. Worker function for `JournalClient.process(worker_fn)`, after currification of `scheduler` and `task_names`. """ assert set(messages) <= { msg_type }, f"Got unexpected {', '.join(set(messages) - set([msg_type]))} message types" assert msg_type in messages, f"Expected {msg_type} messages" origin_visit_stats: Dict[Tuple[str, str], Dict] = {} for msg_dict in messages[msg_type]: if msg_dict["status"] in ("created", "ongoing"): continue origin = msg_dict["origin"] visit_type = msg_dict["type"] empty_object = { "url": origin, "visit_type": visit_type, "last_uneventful": None, "last_eventful": None, "last_failed": None, "last_notfound": None, "last_snapshot": None, } pk = origin, visit_type if pk not in origin_visit_stats: visit_stats = scheduler.origin_visit_stats_get(origin, visit_type) origin_visit_stats[pk] = ( attr.asdict(visit_stats) if visit_stats else empty_object ) visit_stats_d = origin_visit_stats[pk] if msg_dict["status"] == "not_found": visit_stats_d["last_notfound"] = max_date( msg_dict["date"], visit_stats_d.get("last_notfound") ) elif msg_dict["snapshot"] is None: visit_stats_d["last_failed"] = max_date( msg_dict["date"], visit_stats_d.get("last_failed") ) else: # visit with snapshot, something happened if visit_stats_d["last_snapshot"] is None: # first time visit with snapshot, we keep relevant information visit_stats_d["last_eventful"] = msg_dict["date"] visit_stats_d["last_snapshot"] = msg_dict["snapshot"] else: # visit with snapshot already stored, last_eventful should already be # stored assert visit_stats_d["last_eventful"] is not None latest_recorded_visit_date = max_date( visit_stats_d["last_eventful"], visit_stats_d["last_uneventful"] ) current_status_date = msg_dict["date"] previous_snapshot = visit_stats_d["last_snapshot"] if msg_dict["snapshot"] != previous_snapshot: if ( latest_recorded_visit_date and current_status_date < latest_recorded_visit_date ): # out of order message so ignored continue # new eventful visit (new snapshot) visit_stats_d["last_eventful"] = current_status_date visit_stats_d["last_snapshot"] = msg_dict["snapshot"] else: # same snapshot as before if ( latest_recorded_visit_date and current_status_date < latest_recorded_visit_date ): # we receive an old message which is an earlier "eventful" event # than what we had, we consider the last_eventful event as # actually an uneventful event. The true eventful message is the # current one visit_stats_d["last_uneventful"] = visit_stats_d[ "last_eventful" ] visit_stats_d["last_eventful"] = current_status_date else: # uneventful event visit_stats_d["last_uneventful"] = current_status_date scheduler.origin_visit_stats_upsert( OriginVisitStats(**ovs) for ovs in origin_visit_stats.values() ) diff --git a/swh/scheduler/tests/test_journal_client.py b/swh/scheduler/tests/test_journal_client.py index 27f6ab1..3fa91be 100644 --- a/swh/scheduler/tests/test_journal_client.py +++ b/swh/scheduler/tests/test_journal_client.py @@ -1,587 +1,596 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import functools from itertools import permutations import pytest from swh.model.hashutil import hash_to_bytes from swh.scheduler.journal_client import max_date, process_journal_objects from swh.scheduler.model import ListedOrigin, OriginVisitStats from swh.scheduler.utils import utcnow def test_journal_client_origin_visit_status_from_journal_fail(swh_scheduler): process_fn = functools.partial(process_journal_objects, scheduler=swh_scheduler,) with pytest.raises(AssertionError, match="Got unexpected origin_visit"): process_fn({"origin_visit": [{"url": "http://foobar.baz"},]}) with pytest.raises(AssertionError, match="Expected origin_visit_status"): process_fn({}) ONE_DAY = datetime.timedelta(days=1) DATE3 = utcnow() DATE2 = DATE3 - ONE_DAY DATE1 = DATE2 - ONE_DAY assert DATE1 < DATE2 < DATE3 @pytest.mark.parametrize( - "d1,d2,expected_max_date", + "dates,expected_max_date", [ - (None, DATE2, DATE2), - (DATE1, None, DATE1), - (DATE1, DATE2, DATE2), - (DATE2, DATE1, DATE2), + ((DATE1,), DATE1), + ((None, DATE2), DATE2), + ((DATE1, None), DATE1), + ((DATE1, DATE2), DATE2), + ((DATE2, DATE1), DATE2), + ((DATE1, DATE2, DATE3), DATE3), + ((None, DATE2, DATE3), DATE3), + ((None, None, DATE3), DATE3), + ((DATE1, None, DATE3), DATE3), ], ) -def test_max_date(d1, d2, expected_max_date): - assert max_date(d1, d2) == expected_max_date +def test_max_date(dates, expected_max_date): + assert max_date(*dates) == expected_max_date def test_max_date_raise(): + with pytest.raises(ValueError, match="valid datetime"): + max_date() + with pytest.raises(ValueError, match="valid datetime"): + max_date(None) with pytest.raises(ValueError, match="valid datetime"): max_date(None, None) def test_journal_client_origin_visit_status_from_journal_ignored_status(swh_scheduler): """Only final statuses (full, partial) are important, the rest remain ignored. """ visit_statuses = [ { "origin": "foo", "visit": 1, "status": "created", "date": utcnow(), "type": "git", "snapshot": None, }, { "origin": "bar", "visit": 1, "status": "ongoing", "date": utcnow(), "type": "svn", "snapshot": None, }, ] process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) # Ensure those visit status are ignored for visit_status in visit_statuses: actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get( visit_status["origin"], visit_status["type"] ) assert actual_origin_visit_stats is None def test_journal_client_origin_visit_status_from_journal_last_notfound(swh_scheduler): visit_status = { "origin": "foo", "visit": 1, "status": "not_found", "date": DATE1, "type": "git", "snapshot": None, } process_journal_objects( {"origin_visit_status": [visit_status]}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get( visit_status["origin"], visit_status["type"] ) assert actual_origin_visit_stats == OriginVisitStats( url=visit_status["origin"], visit_type=visit_status["type"], last_eventful=None, last_uneventful=None, last_failed=None, last_notfound=visit_status["date"], last_snapshot=None, ) visit_statuses = [ { "origin": "foo", "visit": 4, "status": "not_found", "date": DATE3, "type": "git", "snapshot": None, }, { "origin": "foo", "visit": 3, "status": "not_found", "date": DATE2, "type": "git", "snapshot": None, }, ] process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) for visit_status in visit_statuses: actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get( visit_status["origin"], visit_status["type"] ) assert actual_origin_visit_stats is not None assert actual_origin_visit_stats == OriginVisitStats( url=visit_status["origin"], visit_type=visit_status["type"], last_eventful=None, last_uneventful=None, last_failed=None, last_notfound=DATE3, last_snapshot=None, ) def test_journal_client_origin_visit_status_from_journal_last_failed(swh_scheduler): visit_statuses = [ { "origin": "foo", "visit": 1, "status": "partial", "date": utcnow(), "type": "git", "snapshot": None, }, { "origin": "bar", "visit": 2, "status": "full", "date": DATE1, "type": "git", "snapshot": None, }, ] process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) # Ensure those visit status are ignored for visit_status in visit_statuses: actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get( visit_status["origin"], visit_status["type"] ) assert actual_origin_visit_stats is not None assert actual_origin_visit_stats == OriginVisitStats( url=visit_status["origin"], visit_type=visit_status["type"], last_eventful=None, last_uneventful=None, last_failed=visit_status["date"], last_notfound=None, last_snapshot=None, ) visit_statuses = [ { "origin": "bar", "visit": 3, "status": "full", "date": DATE3, "type": "git", "snapshot": None, }, { "origin": "bar", "visit": 4, "status": "full", "date": DATE2, "type": "git", "snapshot": None, }, ] process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) for visit_status in visit_statuses: actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get( visit_status["origin"], visit_status["type"] ) assert actual_origin_visit_stats is not None assert actual_origin_visit_stats == OriginVisitStats( url=visit_status["origin"], visit_type=visit_status["type"], last_eventful=None, last_uneventful=None, last_failed=DATE3, last_notfound=None, last_snapshot=None, ) def test_journal_client_origin_visit_status_from_journal_last_eventful(swh_scheduler): visit_statuses = [ { "origin": "bar", "visit": 1, "status": "partial", "date": utcnow(), "type": "git", "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "foo", "visit": 2, "status": "full", "date": DATE1, "type": "git", "snapshot": hash_to_bytes("eeecc0710eb6cf9efd5b920a8453e1e07157bfff"), }, ] process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) for visit_status in visit_statuses: actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get( visit_status["origin"], visit_status["type"] ) assert actual_origin_visit_stats is not None assert actual_origin_visit_stats == OriginVisitStats( url=visit_status["origin"], visit_type=visit_status["type"], last_eventful=visit_status["date"], last_uneventful=None, last_failed=None, last_notfound=None, last_snapshot=visit_status["snapshot"], ) visit_statuses = [ { "origin": "foo", "visit": 4, "status": "full", "date": DATE3, "type": "git", "snapshot": hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd"), }, { "origin": "foo", "visit": 3, "status": "partial", "date": DATE2, "type": "git", "snapshot": hash_to_bytes("aaacc0710eb6cf9efd5b920a8453e1e07157baaa"), }, ] process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) for visit_status in visit_statuses: actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get( visit_status["origin"], visit_status["type"] ) assert actual_origin_visit_stats is not None assert actual_origin_visit_stats == OriginVisitStats( url=visit_status["origin"], visit_type=visit_status["type"], last_eventful=DATE3, last_uneventful=None, last_failed=None, last_notfound=None, last_snapshot=hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd"), ) def test_journal_client_origin_visit_status_from_journal_last_uneventful(swh_scheduler): visit_status = { "origin": "foo", "visit": 1, "status": "full", "date": DATE3 + ONE_DAY, "type": "git", "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), } # Let's insert some visit stats with some previous visit information swh_scheduler.origin_visit_stats_upsert( [ OriginVisitStats( url=visit_status["origin"], visit_type=visit_status["type"], last_eventful=DATE1, last_uneventful=DATE3, last_failed=DATE2, last_notfound=DATE1, last_snapshot=visit_status["snapshot"], ) ] ) process_journal_objects( {"origin_visit_status": [visit_status]}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get( visit_status["origin"], visit_status["type"] ) assert actual_origin_visit_stats is not None assert actual_origin_visit_stats == OriginVisitStats( url=visit_status["origin"], visit_type=visit_status["type"], last_eventful=DATE1, last_uneventful=visit_status["date"], # most recent date but uneventful last_failed=DATE2, last_notfound=DATE1, last_snapshot=visit_status["snapshot"], ) VISIT_STATUSES = [ {**ovs, "date": DATE1 + n * ONE_DAY} for n, ovs in enumerate( [ { "origin": "foo", "type": "git", "visit": 1, "status": "created", "snapshot": None, }, { "origin": "foo", "type": "git", "visit": 1, "status": "full", "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "foo", "type": "git", "visit": 2, "status": "created", "snapshot": None, }, { "origin": "foo", "type": "git", "visit": 2, "status": "full", "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), }, ] ) ] @pytest.mark.parametrize( "visit_statuses", permutations(VISIT_STATUSES, len(VISIT_STATUSES)) ) def test_journal_client_origin_visit_status_permutation0(visit_statuses, swh_scheduler): """Ensure out of order topic subscription ends up in the same final state """ process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) expected_visit_stats = OriginVisitStats( url="foo", visit_type="git", last_eventful=DATE1 + ONE_DAY, last_uneventful=DATE1 + 3 * ONE_DAY, last_failed=None, last_notfound=None, last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), ) assert swh_scheduler.origin_visit_stats_get("foo", "git") == expected_visit_stats VISIT_STATUSES1 = [ {**ovs, "date": DATE1 + n * ONE_DAY} for n, ovs in enumerate( [ { "origin": "cavabarder", "type": "hg", "visit": 1, "status": "partial", "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "cavabarder", "type": "hg", "visit": 2, "status": "full", "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "cavabarder", "type": "hg", "visit": 3, "status": "full", "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "cavabarder", "type": "hg", "visit": 4, "status": "full", "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), }, ] ) ] @pytest.mark.parametrize( "visit_statuses", permutations(VISIT_STATUSES1, len(VISIT_STATUSES1)) ) def test_journal_client_origin_visit_status_permutation1(visit_statuses, swh_scheduler): """Ensure out of order topic subscription ends up in the same final state """ process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) expected_visit_stats = OriginVisitStats( url="cavabarder", visit_type="hg", last_eventful=DATE1 + 2 * ONE_DAY, last_uneventful=DATE1 + 3 * ONE_DAY, last_failed=None, last_notfound=None, last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), ) assert ( swh_scheduler.origin_visit_stats_get("cavabarder", "hg") == expected_visit_stats ) VISIT_STATUSES_2 = [ {**ovs, "date": DATE1 + n * ONE_DAY} for n, ovs in enumerate( [ { "origin": "cavabarder", "type": "hg", "visit": 1, "status": "full", "snapshot": hash_to_bytes("0000000000000000000000000000000000000000"), }, { "origin": "cavabarder", "type": "hg", "visit": 2, "status": "full", "snapshot": hash_to_bytes("1111111111111111111111111111111111111111"), }, { "origin": "iciaussi", "type": "hg", "visit": 1, "status": "full", "snapshot": hash_to_bytes("2222222222222222222222222222222222222222"), }, { "origin": "iciaussi", "type": "hg", "visit": 2, "status": "full", "snapshot": hash_to_bytes("3333333333333333333333333333333333333333"), }, { "origin": "cavabarder", "type": "git", "visit": 1, "status": "full", "snapshot": hash_to_bytes("4444444444444444444444444444444444444444"), }, { "origin": "cavabarder", "type": "git", "visit": 2, "status": "full", "snapshot": hash_to_bytes("5555555555555555555555555555555555555555"), }, { "origin": "iciaussi", "type": "git", "visit": 1, "status": "full", "snapshot": hash_to_bytes("6666666666666666666666666666666666666666"), }, { "origin": "iciaussi", "type": "git", "visit": 2, "status": "full", "snapshot": hash_to_bytes("7777777777777777777777777777777777777777"), }, ] ) ] def test_journal_client_origin_visit_status_after_grab_next_visits( swh_scheduler, stored_lister ): """Ensure OriginVisitStat entries created in the db as a result of calling grab_next_visits() do not mess the OriginVisitStats upsert mechanism. """ listed_origins = [ ListedOrigin(lister_id=stored_lister.id, url=url, visit_type=visit_type) for (url, visit_type) in set((v["origin"], v["type"]) for v in VISIT_STATUSES_2) ] swh_scheduler.record_listed_origins(listed_origins) before = utcnow() swh_scheduler.grab_next_visits( visit_type="git", count=10, policy="oldest_scheduled_first" ) after = utcnow() assert swh_scheduler.origin_visit_stats_get("cavabarder", "hg") is None assert swh_scheduler.origin_visit_stats_get("cavabarder", "git") is not None process_journal_objects( {"origin_visit_status": VISIT_STATUSES_2}, scheduler=swh_scheduler ) for url in ("cavabarder", "iciaussi"): ovs = swh_scheduler.origin_visit_stats_get(url, "git") assert before <= ovs.last_scheduled <= after ovs = swh_scheduler.origin_visit_stats_get(url, "hg") assert ovs.last_scheduled is None ovs = swh_scheduler.origin_visit_stats_get("cavabarder", "git") assert ovs.last_eventful == DATE1 + 5 * ONE_DAY assert ovs.last_uneventful is None assert ovs.last_failed is None assert ovs.last_notfound is None assert ovs.last_snapshot == hash_to_bytes( "5555555555555555555555555555555555555555" )