diff --git a/swh/scheduler/journal_client.py b/swh/scheduler/journal_client.py
index f4be7c6..3c272cf 100644
--- a/swh/scheduler/journal_client.py
+++ b/swh/scheduler/journal_client.py
@@ -1,181 +1,257 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-from datetime import datetime
+from datetime import datetime, timedelta
+import random
 from typing import Dict, List, Optional, Tuple
 
 import attr
 
 from swh.scheduler.interface import SchedulerInterface
 from swh.scheduler.model import OriginVisitStats
+from swh.scheduler.utils import utcnow
 
 msg_type = "origin_visit_status"
 
 
 def max_date(*dates: Optional[datetime]) -> datetime:
     """Return the max date of the given (possibly None) dates.
 
     At least one date must not be None.
     """
     filtered_dates = [d for d in dates if d is not None]
     if not filtered_dates:
         raise ValueError("At least one date should be a valid datetime")
 
     return max(filtered_dates)
 
 
 def update_next_position_offset(visit_stats: Dict, increment: int) -> None:
     """Update the next position offset according to the existing value and the
     increment. The resulting value is clamped to a non-negative integer.
 
     """
     visit_stats["next_position_offset"] = max(
         0, visit_stats["next_position_offset"] + increment
     )
 
 
+def from_position_offset_to_days(position_offset: int) -> int:
+    """Convert a position offset to an interval in days.
+
+    - offset 0 and 1: interval of 1 day
+    - offset 2, 3 and 4: interval of 2 days
+    - offset 5 and up: interval of ``4^(n-4)`` days for offset ``n``, i.e. 4, 16,
+      64, 256, 1024, ... days
+
+    Args:
+        position_offset: The actual position offset for a given visit stats
+
+    Returns:
+        The offset as an interval in number of days
+
+    """
+    assert position_offset >= 0
+    if position_offset < 2:
+        result = 1
+    elif position_offset < 5:
+        result = 2
+    else:
+        result = 4 ** (position_offset - 4)
+    return result
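To make the schedule above concrete, here is a quick sanity check of the offset-to-interval mapping, using only the function introduced by this hunk (illustrative; not part of the patch):

    from swh.scheduler.journal_client import from_position_offset_to_days

    # offset -> interval in days, per the docstring above
    expected = {0: 1, 1: 1, 2: 2, 3: 2, 4: 2, 5: 4, 6: 16, 7: 64, 8: 256, 9: 1024}
    for offset, days in expected.items():
        assert from_position_offset_to_days(offset) == days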
+
+
+def next_visit_queue_position(
+    queue_position_per_visit_type: Dict, visit_stats: Dict
+) -> datetime:
+    """Compute the next visit queue position for the given visit_stats.
+
+    This takes the visit_stats' next_position_offset value and computes a
+    corresponding interval in days (with a random fudge factor in the -/+ 10%
+    range, to avoid scheduling bursts towards hosters). It then adds that
+    interval to the visit stats' current position in the queue to compute the
+    new position.
+
+    As an implementation detail, if the visit stats do not have a queue position
+    yet, this falls back to the current global position (for the same visit type
+    as the visit stats) to compute the new position in the queue. If there is no
+    global state yet for the visit type, the current date as returned by
+    ``utcnow`` is used as the default value.
+
+    Args:
+        queue_position_per_visit_type: The global state of the queue per visit type
+        visit_stats: The actual visit information to compute the next position for
+
+    Returns:
+        The actual next visit queue position for that visit stats
+
+    """
+    days = from_position_offset_to_days(visit_stats["next_position_offset"])
+    random_fudge_factor = random.uniform(-0.1, 0.1)
+    visit_interval = timedelta(days=days * (1 + random_fudge_factor))
+    # Use the current queue position per visit type as starting position if none is
+    # already set
+    default_queue_position = queue_position_per_visit_type.get(
+        visit_stats["visit_type"], utcnow()
+    )
+    current_position = (
+        visit_stats["next_visit_queue_position"]
+        if visit_stats.get("next_visit_queue_position")
+        else default_queue_position
+    )
+    return current_position + visit_interval
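For an origin without a recorded queue position, the new position is derived from the per-visit-type global state (or from the current time when that is empty too). An illustrative sketch follows; the dict shapes mirror the ones used by the tests further down, and the bound checked at the end holds whatever fudge factor is drawn:

    from datetime import timedelta

    from swh.scheduler.journal_client import next_visit_queue_position
    from swh.scheduler.utils import utcnow

    global_state = {"git": utcnow()}  # global queue position per visit type
    stats = {"visit_type": "git", "next_position_offset": 6,
             "next_visit_queue_position": None}  # no per-origin position yet

    position = next_visit_queue_position(global_state, stats)
    # offset 6 maps to 16 days, so the new position lands 16 days +/- 10%
    # after the current global "git" position, whatever the fudge factor.
    assert timedelta(days=14.4) <= position - global_state["git"] <= timedelta(days=17.6)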
""" assert set(messages) <= { msg_type }, f"Got unexpected {', '.join(set(messages) - set([msg_type]))} message types" assert msg_type in messages, f"Expected {msg_type} messages" interesting_messages = [ msg for msg in messages[msg_type] if "type" in msg and msg["status"] not in ("created", "ongoing") ] if not interesting_messages: return origin_visit_stats: Dict[Tuple[str, str], Dict] = { (visit_stats.url, visit_stats.visit_type): attr.asdict(visit_stats) for visit_stats in scheduler.origin_visit_stats_get( list(set((vs["origin"], vs["type"]) for vs in interesting_messages)) ) } # Use the default values from the model object empty_object = { field.name: field.default if field.default != attr.NOTHING else None for field in attr.fields(OriginVisitStats) } + # Retrieve the global queue state + queue_position_per_visit_type = scheduler.visit_scheduler_queue_position_get() + for msg_dict in interesting_messages: origin = msg_dict["origin"] visit_type = msg_dict["type"] pk = origin, visit_type if pk not in origin_visit_stats: origin_visit_stats[pk] = { **empty_object, "url": origin, "visit_type": visit_type, } visit_stats_d = origin_visit_stats[pk] if msg_dict["status"] == "not_found": visit_stats_d["last_notfound"] = max_date( msg_dict["date"], visit_stats_d.get("last_notfound") ) update_next_position_offset(visit_stats_d, 1) # visit less often elif msg_dict["status"] == "failed" or msg_dict["snapshot"] is None: visit_stats_d["last_failed"] = max_date( msg_dict["date"], visit_stats_d.get("last_failed") ) update_next_position_offset(visit_stats_d, 1) # visit less often else: # visit with snapshot, something happened if visit_stats_d["last_snapshot"] is None: # first time visit with snapshot, we keep relevant information visit_stats_d["last_eventful"] = msg_dict["date"] visit_stats_d["last_snapshot"] = msg_dict["snapshot"] else: # last_snapshot is set, so an eventful visit should have previously been # recorded assert visit_stats_d["last_eventful"] is not None latest_recorded_visit_date = max_date( visit_stats_d["last_eventful"], visit_stats_d["last_uneventful"] ) current_status_date = msg_dict["date"] previous_snapshot = visit_stats_d["last_snapshot"] if msg_dict["snapshot"] != previous_snapshot: if ( latest_recorded_visit_date and current_status_date < latest_recorded_visit_date ): # out of order message so ignored continue # new eventful visit (new snapshot) visit_stats_d["last_eventful"] = current_status_date visit_stats_d["last_snapshot"] = msg_dict["snapshot"] # Visit this origin more often in the future update_next_position_offset(visit_stats_d, -2) else: # same snapshot as before if ( latest_recorded_visit_date and current_status_date < latest_recorded_visit_date ): # we receive an old message which is an earlier "eventful" event # than what we had, we consider the last_eventful event as # actually an uneventful event. 
diff --git a/swh/scheduler/tests/test_journal_client.py b/swh/scheduler/tests/test_journal_client.py
index c1ca61d..29d2d49 100644
--- a/swh/scheduler/tests/test_journal_client.py
+++ b/swh/scheduler/tests/test_journal_client.py
@@ -1,766 +1,909 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import datetime
+from datetime import timedelta
 import functools
 from itertools import permutations
 from unittest.mock import Mock
 
+import attr
 import pytest
 
 from swh.model.hashutil import hash_to_bytes
-from swh.scheduler.journal_client import max_date, process_journal_objects
+from swh.scheduler.journal_client import (
+    from_position_offset_to_days,
+    max_date,
+    next_visit_queue_position,
+    process_journal_objects,
+)
 from swh.scheduler.model import ListedOrigin, OriginVisitStats
 from swh.scheduler.utils import utcnow
 
 
 def test_journal_client_origin_visit_status_from_journal_fail(swh_scheduler):
     process_fn = functools.partial(process_journal_objects, scheduler=swh_scheduler,)
 
     with pytest.raises(AssertionError, match="Got unexpected origin_visit"):
         process_fn({"origin_visit": [{"url": "http://foobar.baz"},]})
 
     with pytest.raises(AssertionError, match="Expected origin_visit_status"):
         process_fn({})
 
 
 ONE_DAY = datetime.timedelta(days=1)
 ONE_YEAR = datetime.timedelta(days=366)
 
 DATE3 = utcnow()
 DATE2 = DATE3 - ONE_DAY
 DATE1 = DATE2 - ONE_DAY
 
 assert DATE1 < DATE2 < DATE3
 
 
 @pytest.mark.parametrize(
     "dates,expected_max_date",
     [
         ((DATE1,), DATE1),
         ((None, DATE2), DATE2),
         ((DATE1, None), DATE1),
         ((DATE1, DATE2), DATE2),
         ((DATE2, DATE1), DATE2),
         ((DATE1, DATE2, DATE3), DATE3),
         ((None, DATE2, DATE3), DATE3),
         ((None, None, DATE3), DATE3),
         ((DATE1, None, DATE3), DATE3),
     ],
 )
 def test_max_date(dates, expected_max_date):
     assert max_date(*dates) == expected_max_date
 
 
 def test_max_date_raise():
     with pytest.raises(ValueError, match="valid datetime"):
         max_date()
     with pytest.raises(ValueError, match="valid datetime"):
         max_date(None)
     with pytest.raises(ValueError, match="valid datetime"):
         max_date(None, None)
""" # Trace method calls on the swh_scheduler swh_scheduler = Mock(wraps=swh_scheduler) visit_statuses = [ { "origin": "foo", "visit": 1, "status": "created", "date": utcnow(), "type": "git", "snapshot": None, }, { "origin": "bar", "visit": 1, "status": "ongoing", "date": utcnow(), "type": "svn", "snapshot": None, }, ] process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) # All messages have been ignored: no stats have been upserted swh_scheduler.origin_visit_stats_upsert.assert_not_called() def test_journal_client_ignore_missing_type(swh_scheduler): """Ignore statuses with missing type key""" # Trace method calls on the swh_scheduler swh_scheduler = Mock(wraps=swh_scheduler) date = utcnow() snapshot = hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd") visit_statuses = [ { "origin": "foo", "visit": 1, "status": "full", "date": date, "snapshot": snapshot, }, ] process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) # The message has been ignored: no stats have been upserted swh_scheduler.origin_visit_stats_upsert.assert_not_called() +def assert_visit_stats_ok(actual_visit_stats, expected_visit_stats): + """Utility test function to ensure visits stats read from the backend are in the right + shape. The comparison on the next_visit_queue_position will be dealt with in + dedicated tests so it's not tested in tests that are calling this function. + + """ + assert len(actual_visit_stats) == len(expected_visit_stats) + + for visit_stats in actual_visit_stats: + visit_stats = attr.evolve(visit_stats, next_visit_queue_position=None) + + assert visit_stats in expected_visit_stats + + def test_journal_client_origin_visit_status_from_journal_last_notfound(swh_scheduler): visit_status = { "origin": "foo", "visit": 1, "status": "not_found", "date": DATE1, "type": "git", "snapshot": None, } process_journal_objects( {"origin_visit_status": [visit_status]}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")]) - assert actual_origin_visit_stats == [ - OriginVisitStats( - url="foo", - visit_type="git", - last_eventful=None, - last_uneventful=None, - last_failed=None, - last_notfound=visit_status["date"], - last_snapshot=None, - next_visit_queue_position=None, - next_position_offset=5, - ) - ] + assert_visit_stats_ok( + actual_origin_visit_stats, + [ + OriginVisitStats( + url="foo", + visit_type="git", + last_eventful=None, + last_uneventful=None, + last_failed=None, + last_notfound=visit_status["date"], + last_snapshot=None, + next_position_offset=5, + ) + ], + ) visit_statuses = [ { "origin": "foo", "visit": 3, "status": "not_found", "date": DATE2, "type": "git", "snapshot": None, }, { "origin": "foo", "visit": 4, "status": "not_found", "date": DATE3, "type": "git", "snapshot": None, }, ] process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")]) - assert actual_origin_visit_stats == [ - OriginVisitStats( - url="foo", - visit_type="git", - last_eventful=None, - last_uneventful=None, - last_failed=None, - last_notfound=DATE3, - last_snapshot=None, - next_visit_queue_position=None, - next_position_offset=7, - ) - ] + assert_visit_stats_ok( + actual_origin_visit_stats, + [ + OriginVisitStats( + url="foo", + visit_type="git", + last_eventful=None, + last_uneventful=None, + last_failed=None, + last_notfound=DATE3, + last_snapshot=None, + next_position_offset=7, + 
 
 
 def test_journal_client_origin_visit_status_from_journal_last_failed(swh_scheduler):
     visit_statuses = [
         {
             "origin": "foo",
             "visit": 1,
             "status": "partial",
             "date": utcnow(),
             "type": "git",
             "snapshot": None,
         },
         {
             "origin": "bar",
             "visit": 1,
             "status": "full",
             "date": DATE1,
             "type": "git",
             "snapshot": None,
         },
         {
             "origin": "bar",
             "visit": 2,
             "status": "full",
             "date": DATE2,
             "type": "git",
             "snapshot": None,
         },
         {
             "origin": "bar",
             "visit": 3,
             "status": "full",
             "date": DATE3,
             "type": "git",
             "snapshot": None,
         },
     ]
 
     process_journal_objects(
         {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler
     )
 
     actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("bar", "git")])
-    assert actual_origin_visit_stats == [
-        OriginVisitStats(
-            url="bar",
-            visit_type="git",
-            last_eventful=None,
-            last_uneventful=None,
-            last_failed=DATE3,
-            last_notfound=None,
-            last_snapshot=None,
-            next_visit_queue_position=None,
-            next_position_offset=7,
-        )
-    ]
+    assert_visit_stats_ok(
+        actual_origin_visit_stats,
+        [
+            OriginVisitStats(
+                url="bar",
+                visit_type="git",
+                last_eventful=None,
+                last_uneventful=None,
+                last_failed=DATE3,
+                last_notfound=None,
+                last_snapshot=None,
+                next_position_offset=7,
+            )
+        ],
+    )
 
 
 def test_journal_client_origin_visit_status_from_journal_last_failed2(swh_scheduler):
     visit_statuses = [
         {
             "origin": "bar",
             "visit": 2,
             "status": "failed",
             "date": DATE1,
             "type": "git",
             "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
         },
         {
             "origin": "bar",
             "visit": 3,
             "status": "failed",
             "date": DATE2,
             "type": "git",
             "snapshot": None,
         },
     ]
 
     process_journal_objects(
         {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler
     )
 
     actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("bar", "git")])
-    assert actual_origin_visit_stats == [
-        OriginVisitStats(
-            url="bar",
-            visit_type="git",
-            last_eventful=None,
-            last_uneventful=None,
-            last_failed=DATE2,
-            last_notfound=None,
-            last_snapshot=None,
-            next_visit_queue_position=None,
-            next_position_offset=6,
-        )
-    ]
+    assert_visit_stats_ok(
+        actual_origin_visit_stats,
+        [
+            OriginVisitStats(
+                url="bar",
+                visit_type="git",
+                last_eventful=None,
+                last_uneventful=None,
+                last_failed=DATE2,
+                last_notfound=None,
+                last_snapshot=None,
+                next_position_offset=6,
+            )
+        ],
+    )
 
 
 def test_journal_client_origin_visit_status_from_journal_last_eventful(swh_scheduler):
     visit_statuses = [
         {
             "origin": "bar",
             "visit": 1,
             "status": "partial",
             "date": utcnow(),
             "type": "git",
             "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
         },
         {
             "origin": "foo",
             "visit": 1,
             "status": "full",
             "date": DATE1,
             "type": "git",
             "snapshot": hash_to_bytes("eeecc0710eb6cf9efd5b920a8453e1e07157bfff"),
         },
         {
             "origin": "foo",
             "visit": 2,
             "status": "partial",
             "date": DATE2,
             "type": "git",
             "snapshot": hash_to_bytes("aaacc0710eb6cf9efd5b920a8453e1e07157baaa"),
         },
         {
             "origin": "foo",
             "visit": 3,
             "status": "full",
             "date": DATE3,
             "type": "git",
             "snapshot": hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd"),
         },
     ]
 
     process_journal_objects(
         {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler
     )
 
     actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")])
-    assert actual_origin_visit_stats == [
-        OriginVisitStats(
-            url="foo",
-            visit_type="git",
-            last_eventful=DATE3,
-            last_uneventful=None,
-            last_failed=None,
-            last_notfound=None,
-            last_snapshot=hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd"),
-            next_visit_queue_position=None,
-            next_position_offset=0,
-        )
-    ]
+    assert_visit_stats_ok(
+        actual_origin_visit_stats,
+        [
+            OriginVisitStats(
+                url="foo",
+                visit_type="git",
+                last_eventful=DATE3,
+                last_uneventful=None,
+                last_failed=None,
+                last_notfound=None,
+                last_snapshot=hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd"),
+                next_position_offset=0,
+            )
+        ],
+    )
 
 
 def test_journal_client_origin_visit_status_from_journal_last_uneventful(swh_scheduler):
     visit_status = {
         "origin": "foo",
         "visit": 1,
         "status": "full",
         "date": DATE3 + ONE_DAY,
         "type": "git",
         "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
     }
 
     # Let's insert some visit stats with some previous visit information
     swh_scheduler.origin_visit_stats_upsert(
         [
             OriginVisitStats(
                 url=visit_status["origin"],
                 visit_type=visit_status["type"],
                 last_eventful=DATE1,
                 last_uneventful=DATE3,
                 last_failed=DATE2,
                 last_notfound=DATE1,
                 last_snapshot=visit_status["snapshot"],
                 next_visit_queue_position=None,
                 next_position_offset=4,
             )
         ]
     )
 
     process_journal_objects(
         {"origin_visit_status": [visit_status]}, scheduler=swh_scheduler
     )
 
     actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get(
         [(visit_status["origin"], visit_status["type"])]
     )
-    assert actual_origin_visit_stats == [
-        OriginVisitStats(
-            url=visit_status["origin"],
-            visit_type=visit_status["type"],
-            last_eventful=DATE1,
-            last_uneventful=visit_status["date"],  # most recent date but uneventful
-            last_failed=DATE2,
-            last_notfound=DATE1,
-            last_snapshot=visit_status["snapshot"],
-            next_visit_queue_position=None,
-            next_position_offset=5,  # uneventful so visit less often
-        )
-    ]
+    assert_visit_stats_ok(
+        actual_origin_visit_stats,
+        [
+            OriginVisitStats(
+                url=visit_status["origin"],
+                visit_type=visit_status["type"],
+                last_eventful=DATE1,
+                last_uneventful=visit_status["date"],  # most recent date but uneventful
+                last_failed=DATE2,
+                last_notfound=DATE1,
+                last_snapshot=visit_status["snapshot"],
+                next_position_offset=5,  # uneventful so visit less often
+            )
+        ],
+    )
 
 
 VISIT_STATUSES = [
     {**ovs, "date": DATE1 + n * ONE_DAY}
     for n, ovs in enumerate(
         [
             {
                 "origin": "foo",
                 "type": "git",
                 "visit": 1,
                 "status": "created",
                 "snapshot": None,
             },
             {
                 "origin": "foo",
                 "type": "git",
                 "visit": 1,
                 "status": "full",
                 "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
             },
             {
                 "origin": "foo",
                 "type": "git",
                 "visit": 2,
                 "status": "created",
                 "snapshot": None,
             },
             {
                 "origin": "foo",
                 "type": "git",
                 "visit": 2,
                 "status": "full",
                 "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
             },
         ]
     )
 ]
 
 
 @pytest.mark.parametrize(
     "visit_statuses", permutations(VISIT_STATUSES, len(VISIT_STATUSES))
 )
 def test_journal_client_origin_visit_status_permutation0(visit_statuses, swh_scheduler):
     """Ensure out-of-order topic consumption ends up in the same final state
 
     """
     process_journal_objects(
         {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler
     )
 
-    expected_visit_stats = OriginVisitStats(
-        url="foo",
-        visit_type="git",
-        last_eventful=DATE1 + ONE_DAY,
-        last_uneventful=DATE1 + 3 * ONE_DAY,
-        last_failed=None,
-        last_notfound=None,
-        last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
-        next_visit_queue_position=None,
-        next_position_offset=5,  # uneventful, visit origin less often in the future
+    actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")])
+    assert_visit_stats_ok(
+        actual_origin_visit_stats,
+        [
+            OriginVisitStats(
+                url="foo",
+                visit_type="git",
+                last_eventful=DATE1 + ONE_DAY,
+                last_uneventful=DATE1 + 3 * ONE_DAY,
+                last_failed=None,
+                last_notfound=None,
+                last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
+                next_position_offset=5,  # uneventful, visit origin less often in future
+            )
+        ],
     )
-    assert swh_scheduler.origin_visit_stats_get([("foo", "git")]) == [
-        expected_visit_stats
-    ]
-
 
 
 VISIT_STATUSES_1 = [
     {**ovs, "date": DATE1 + n * ONE_DAY}
     for n, ovs in enumerate(
         [
             {
                 "origin": "cavabarder",
                 "type": "hg",
                 "visit": 1,
                 "status": "partial",
                 "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
             },
             {
                 "origin": "cavabarder",
                 "type": "hg",
                 "visit": 2,
                 "status": "full",
                 "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
             },
             {
                 "origin": "cavabarder",
                 "type": "hg",
                 "visit": 3,
                 "status": "full",
                 "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"),
             },
             {
                 "origin": "cavabarder",
                 "type": "hg",
                 "visit": 4,
                 "status": "full",
                 "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"),
             },
         ]
     )
 ]
 
 
 @pytest.mark.parametrize(
     "visit_statuses", permutations(VISIT_STATUSES_1, len(VISIT_STATUSES_1))
 )
 def test_journal_client_origin_visit_status_permutation1(visit_statuses, swh_scheduler):
     """Ensure out-of-order topic consumption ends up in the same final state
 
     """
     process_journal_objects(
         {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler
     )
 
     (actual_visit_stats,) = swh_scheduler.origin_visit_stats_get([("cavabarder", "hg")])
     assert actual_visit_stats.url == "cavabarder"
     assert actual_visit_stats.visit_type == "hg"
     assert actual_visit_stats.last_eventful == DATE1 + 2 * ONE_DAY
     assert actual_visit_stats.last_uneventful == DATE1 + 3 * ONE_DAY
     assert actual_visit_stats.last_failed is None
     assert actual_visit_stats.last_notfound is None
     assert actual_visit_stats.last_snapshot == hash_to_bytes(
         "aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"
     )
 
 
 VISIT_STATUSES_2 = [
     {**ovs, "date": DATE1 + n * ONE_DAY}
     for n, ovs in enumerate(
         [
             {
                 "origin": "cavabarder",
                 "type": "hg",
                 "visit": 1,
                 "status": "full",
                 "snapshot": hash_to_bytes("0000000000000000000000000000000000000000"),
             },
             {
                 "origin": "cavabarder",
                 "type": "hg",
                 "visit": 2,
                 "status": "full",
                 "snapshot": hash_to_bytes("1111111111111111111111111111111111111111"),
             },
             {
                 "origin": "iciaussi",
                 "type": "hg",
                 "visit": 1,
                 "status": "full",
                 "snapshot": hash_to_bytes("2222222222222222222222222222222222222222"),
             },
             {
                 "origin": "iciaussi",
                 "type": "hg",
                 "visit": 2,
                 "status": "full",
                 "snapshot": hash_to_bytes("3333333333333333333333333333333333333333"),
             },
             {
                 "origin": "cavabarder",
                 "type": "git",
                 "visit": 1,
                 "status": "full",
                 "snapshot": hash_to_bytes("4444444444444444444444444444444444444444"),
             },
             {
                 "origin": "cavabarder",
                 "type": "git",
                 "visit": 2,
                 "status": "full",
                 "snapshot": hash_to_bytes("5555555555555555555555555555555555555555"),
             },
             {
                 "origin": "iciaussi",
                 "type": "git",
                 "visit": 1,
                 "status": "full",
                 "snapshot": hash_to_bytes("6666666666666666666666666666666666666666"),
             },
             {
                 "origin": "iciaussi",
                 "type": "git",
                 "visit": 2,
                 "status": "full",
                 "snapshot": hash_to_bytes("7777777777777777777777777777777777777777"),
             },
         ]
     )
 ]
 
 
 def test_journal_client_origin_visit_status_after_grab_next_visits(
     swh_scheduler, stored_lister
 ):
     """Ensure OriginVisitStats entries created in the db as a result of calling
     grab_next_visits() do not break the OriginVisitStats upsert mechanism.
 
""" listed_origins = [ ListedOrigin(lister_id=stored_lister.id, url=url, visit_type=visit_type) for (url, visit_type) in set((v["origin"], v["type"]) for v in VISIT_STATUSES_2) ] swh_scheduler.record_listed_origins(listed_origins) before = utcnow() swh_scheduler.grab_next_visits( visit_type="git", count=10, policy="oldest_scheduled_first" ) after = utcnow() assert swh_scheduler.origin_visit_stats_get([("cavabarder", "hg")]) == [] assert swh_scheduler.origin_visit_stats_get([("cavabarder", "git")])[0] is not None process_journal_objects( {"origin_visit_status": VISIT_STATUSES_2}, scheduler=swh_scheduler ) for url in ("cavabarder", "iciaussi"): ovs = swh_scheduler.origin_visit_stats_get([(url, "git")])[0] assert before <= ovs.last_scheduled <= after ovs = swh_scheduler.origin_visit_stats_get([(url, "hg")])[0] assert ovs.last_scheduled is None ovs = swh_scheduler.origin_visit_stats_get([("cavabarder", "git")])[0] assert ovs.last_eventful == DATE1 + 5 * ONE_DAY assert ovs.last_uneventful is None assert ovs.last_failed is None assert ovs.last_notfound is None assert ovs.last_snapshot == hash_to_bytes( "5555555555555555555555555555555555555555" ) def test_journal_client_origin_visit_status_duplicated_messages(swh_scheduler): """A duplicated message must be ignored """ visit_status = { "origin": "foo", "visit": 1, "status": "full", "date": DATE1, "type": "git", "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), } process_journal_objects( {"origin_visit_status": [visit_status]}, scheduler=swh_scheduler ) process_journal_objects( {"origin_visit_status": [visit_status]}, scheduler=swh_scheduler ) - expected_visit_stats = OriginVisitStats( - url="foo", - visit_type="git", - last_eventful=DATE1, - last_uneventful=None, - last_failed=None, - last_notfound=None, - last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), + actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")]) + assert_visit_stats_ok( + actual_origin_visit_stats, + [ + OriginVisitStats( + url="foo", + visit_type="git", + last_eventful=DATE1, + last_uneventful=None, + last_failed=None, + last_notfound=None, + last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), + ) + ], ) - assert swh_scheduler.origin_visit_stats_get([("foo", "git")]) == [ - expected_visit_stats - ] - def test_journal_client_origin_visit_status_several_upsert(swh_scheduler): """An old message updates old information """ visit_status1 = { "origin": "foo", "visit": 1, "status": "full", "date": DATE1, "type": "git", "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), } visit_status2 = { "origin": "foo", "visit": 1, "status": "full", "date": DATE2, "type": "git", "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), } process_journal_objects( {"origin_visit_status": [visit_status2]}, scheduler=swh_scheduler ) process_journal_objects( {"origin_visit_status": [visit_status1]}, scheduler=swh_scheduler ) - assert swh_scheduler.origin_visit_stats_get([("foo", "git")]) == [ - OriginVisitStats( - url="foo", - visit_type="git", - last_eventful=DATE1, - last_uneventful=DATE2, - last_failed=None, - last_notfound=None, - last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), - next_visit_queue_position=None, - next_position_offset=5, - ) - ] + actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")]) + assert_visit_stats_ok( + actual_origin_visit_stats, + [ + OriginVisitStats( + url="foo", + visit_type="git", + 
+                last_eventful=DATE1,
+                last_uneventful=DATE2,
+                last_failed=None,
+                last_notfound=None,
+                last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"),
+                next_position_offset=5,
+            )
+        ],
+    )
 
 
 VISIT_STATUSES_SAME_SNAPSHOT = [
     {**ovs, "date": DATE1 + n * ONE_YEAR}
     for n, ovs in enumerate(
         [
             {
                 "origin": "cavabarder",
                 "type": "hg",
                 "visit": 3,
                 "status": "full",
                 "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"),
             },
             {
                 "origin": "cavabarder",
                 "type": "hg",
                 "visit": 4,
                 "status": "full",
                 "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"),
             },
             {
                 "origin": "cavabarder",
                 "type": "hg",
                 "visit": 4,
                 "status": "full",
                 "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"),
             },
         ]
     )
 ]
 
 
 @pytest.mark.parametrize(
     "visit_statuses",
     permutations(VISIT_STATUSES_SAME_SNAPSHOT, len(VISIT_STATUSES_SAME_SNAPSHOT)),
 )
 def test_journal_client_origin_visit_statuses_same_snapshot_permutation(
     visit_statuses, swh_scheduler
 ):
     """Ensure out-of-order topic consumption ends up in the same final state
 
     """
     process_journal_objects(
         {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler
     )
 
-    assert swh_scheduler.origin_visit_stats_get([("cavabarder", "hg")]) == [
-        OriginVisitStats(
-            url="cavabarder",
-            visit_type="hg",
-            last_eventful=DATE1,
-            last_uneventful=DATE1 + 2 * ONE_YEAR,
-            last_failed=None,
-            last_notfound=None,
-            last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"),
-            next_visit_queue_position=None,
-            next_position_offset=6,  # 2 uneventful visits, whatever the permutation
-        )
-    ]
+    actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get(
+        [("cavabarder", "hg")]
+    )
+    assert_visit_stats_ok(
+        actual_origin_visit_stats,
+        [
+            OriginVisitStats(
+                url="cavabarder",
+                visit_type="hg",
+                last_eventful=DATE1,
+                last_uneventful=DATE1 + 2 * ONE_YEAR,
+                last_failed=None,
+                last_notfound=None,
+                last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"),
+                next_position_offset=6,  # 2 uneventful visits, whatever the permutation
+            )
+        ],
+    )
+
+
+@pytest.mark.parametrize(
+    "position_offset, interval",
+    [
+        (0, 1),
+        (1, 1),
+        (2, 2),
+        (3, 2),
+        (4, 2),
+        (5, 4),
+        (6, 16),
+        (7, 64),
+        (8, 256),
+        (9, 1024),
+        (10, 4096),
+    ],
+)
+def test_journal_client_from_position_offset_to_days(position_offset, interval):
+    assert from_position_offset_to_days(position_offset) == interval
+
+
+def test_journal_client_from_position_offset_to_days_only_positive_input():
+    with pytest.raises(AssertionError):
+        from_position_offset_to_days(-1)
+
+
+@pytest.mark.parametrize(
+    "fudge_factor,next_position_offset", [(0.01, 1), (-0.01, 5), (0.1, 8), (-0.1, 10),]
+)
+def test_next_visit_queue_position(mocker, fudge_factor, next_position_offset):
+    mock_random = mocker.patch("swh.scheduler.journal_client.random.uniform")
+    mock_random.return_value = fudge_factor
+
+    date_now = utcnow()
+
+    mock_now = mocker.patch("swh.scheduler.journal_client.utcnow")
+    mock_now.return_value = date_now
+
+    actual_position = next_visit_queue_position(
+        {}, {"next_position_offset": next_position_offset, "visit_type": "svn",}
+    )
+
+    assert actual_position == date_now + timedelta(
+        days=from_position_offset_to_days(next_position_offset) * (1 + fudge_factor)
+    )
+
+    assert mock_now.called
+    assert mock_random.called
+
+
+@pytest.mark.parametrize(
+    "fudge_factor,next_position_offset", [(0.02, 2), (-0.02, 3), (0, 7), (-0.09, 9),]
+)
+def test_next_visit_queue_position_with_state(
+    mocker, fudge_factor, next_position_offset
+):
+    mock_random = mocker.patch("swh.scheduler.journal_client.random.uniform")
+    mock_random.return_value = fudge_factor
+
+    date_now = utcnow()
+
+    actual_position = next_visit_queue_position(
+        {"git": date_now},
+        {"next_position_offset": next_position_offset, "visit_type": "git",},
+    )
+
+    assert actual_position == date_now + timedelta(
+        days=from_position_offset_to_days(next_position_offset) * (1 + fudge_factor)
+    )
+
+    assert mock_random.called
+
+
+@pytest.mark.parametrize(
+    "fudge_factor,next_position_offset", [(0.03, 3), (-0.03, 4), (0.08, 7), (-0.08, 9),]
+)
+def test_next_visit_queue_position_with_next_visit_queue(
+    mocker, fudge_factor, next_position_offset
+):
+    mock_random = mocker.patch("swh.scheduler.journal_client.random.uniform")
+    mock_random.return_value = fudge_factor
+
+    date_now = utcnow()
+
+    actual_position = next_visit_queue_position(
+        {},
+        {
+            "next_position_offset": next_position_offset,
+            "visit_type": "hg",
+            "next_visit_queue_position": date_now,
+        },
+    )
+
+    assert actual_position == date_now + timedelta(
+        days=from_position_offset_to_days(next_position_offset) * (1 + fudge_factor)
+    )
+
+    assert mock_random.called
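Taken together, these tests pin down the intended dynamic: an origin that keeps yielding uneventful visits drifts back in the queue at an accelerating pace. A closing sketch of that drift, using the two helpers from this patch (the seed is only there to make the run reproducible):

    import random

    from swh.scheduler.journal_client import (
        next_visit_queue_position,
        update_next_position_offset,
    )
    from swh.scheduler.utils import utcnow

    random.seed(0)
    stats = {"visit_type": "git", "next_position_offset": 4,
             "next_visit_queue_position": utcnow()}

    # Each uneventful visit bumps the offset by 1 (as process_journal_objects
    # does) and pushes the queue position further out: roughly 4, 16, 64, 256
    # then 1024 days for offsets 5 through 9.
    for _ in range(5):
        update_next_position_offset(stats, 1)
        stats["next_visit_queue_position"] = next_visit_queue_position({}, stats)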