- index 5 and up: interval `4^(n-4)` days, i.e. 4, 16, 64, 256, 1024, ... days
If there is no global state
+    yet for the visit type, the current date (``utcnow``) is used as the default
+    value.
the dedicated scheduling policy + # "origins_without_last_update") + visit_stats_d["next_visit_queue_position"] = next_visit_queue_position( + queue_position_per_visit_type, visit_stats_d + ) + scheduler.origin_visit_stats_upsert( OriginVisitStats(**ovs) for ovs in origin_visit_stats.values() ) diff --git a/swh/scheduler/tests/test_journal_client.py b/swh/scheduler/tests/test_journal_client.py --- a/swh/scheduler/tests/test_journal_client.py +++ b/swh/scheduler/tests/test_journal_client.py @@ -4,14 +4,21 @@ # See top-level LICENSE file for more information import datetime +from datetime import timedelta import functools from itertools import permutations from unittest.mock import Mock +import attr import pytest from swh.model.hashutil import hash_to_bytes -from swh.scheduler.journal_client import max_date, process_journal_objects +from swh.scheduler.journal_client import ( + from_position_offset_to_days, + max_date, + next_visit_queue_position, + process_journal_objects, +) from swh.scheduler.model import ListedOrigin, OriginVisitStats from swh.scheduler.utils import utcnow @@ -123,6 +130,20 @@ swh_scheduler.origin_visit_stats_upsert.assert_not_called() +def assert_visit_stats_ok(actual_visit_stats, expected_visit_stats): + """Utility test function to ensure visits stats read from the backend are in the right + shape. The comparison on the next_visit_queue_position will be dealt with in + dedicated tests so it's not tested in tests that are calling this function. 
+ + """ + assert len(actual_visit_stats) == len(expected_visit_stats) + + for visit_stats in actual_visit_stats: + visit_stats = attr.evolve(visit_stats, next_visit_queue_position=None) + + assert visit_stats in expected_visit_stats + + def test_journal_client_origin_visit_status_from_journal_last_notfound(swh_scheduler): visit_status = { "origin": "foo", @@ -138,19 +159,21 @@ ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")]) - assert actual_origin_visit_stats == [ - OriginVisitStats( - url="foo", - visit_type="git", - last_eventful=None, - last_uneventful=None, - last_failed=None, - last_notfound=visit_status["date"], - last_snapshot=None, - next_visit_queue_position=None, - next_position_offset=5, - ) - ] + assert_visit_stats_ok( + actual_origin_visit_stats, + [ + OriginVisitStats( + url="foo", + visit_type="git", + last_eventful=None, + last_uneventful=None, + last_failed=None, + last_notfound=visit_status["date"], + last_snapshot=None, + next_position_offset=5, + ) + ], + ) visit_statuses = [ { @@ -176,19 +199,21 @@ ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")]) - assert actual_origin_visit_stats == [ - OriginVisitStats( - url="foo", - visit_type="git", - last_eventful=None, - last_uneventful=None, - last_failed=None, - last_notfound=DATE3, - last_snapshot=None, - next_visit_queue_position=None, - next_position_offset=7, - ) - ] + assert_visit_stats_ok( + actual_origin_visit_stats, + [ + OriginVisitStats( + url="foo", + visit_type="git", + last_eventful=None, + last_uneventful=None, + last_failed=None, + last_notfound=DATE3, + last_snapshot=None, + next_position_offset=7, + ) + ], + ) def test_journal_client_origin_visit_status_from_journal_last_failed(swh_scheduler): @@ -232,19 +257,21 @@ ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("bar", "git")]) - assert actual_origin_visit_stats == [ - OriginVisitStats( - url="bar", - visit_type="git", - last_eventful=None, - 
last_uneventful=None, - last_failed=DATE3, - last_notfound=None, - last_snapshot=None, - next_visit_queue_position=None, - next_position_offset=7, - ) - ] + assert_visit_stats_ok( + actual_origin_visit_stats, + [ + OriginVisitStats( + url="bar", + visit_type="git", + last_eventful=None, + last_uneventful=None, + last_failed=DATE3, + last_notfound=None, + last_snapshot=None, + next_position_offset=7, + ) + ], + ) def test_journal_client_origin_visit_status_from_journal_last_failed2(swh_scheduler): @@ -272,19 +299,21 @@ ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("bar", "git")]) - assert actual_origin_visit_stats == [ - OriginVisitStats( - url="bar", - visit_type="git", - last_eventful=None, - last_uneventful=None, - last_failed=DATE2, - last_notfound=None, - last_snapshot=None, - next_visit_queue_position=None, - next_position_offset=6, - ) - ] + assert_visit_stats_ok( + actual_origin_visit_stats, + [ + OriginVisitStats( + url="bar", + visit_type="git", + last_eventful=None, + last_uneventful=None, + last_failed=DATE2, + last_notfound=None, + last_snapshot=None, + next_position_offset=6, + ) + ], + ) def test_journal_client_origin_visit_status_from_journal_last_eventful(swh_scheduler): @@ -328,19 +357,21 @@ ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")]) - assert actual_origin_visit_stats == [ - OriginVisitStats( - url="foo", - visit_type="git", - last_eventful=DATE3, - last_uneventful=None, - last_failed=None, - last_notfound=None, - last_snapshot=hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd"), - next_visit_queue_position=None, - next_position_offset=0, - ) - ] + assert_visit_stats_ok( + actual_origin_visit_stats, + [ + OriginVisitStats( + url="foo", + visit_type="git", + last_eventful=DATE3, + last_uneventful=None, + last_failed=None, + last_notfound=None, + last_snapshot=hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd"), + next_position_offset=0, + ) + ], + ) def 
test_journal_client_origin_visit_status_from_journal_last_uneventful(swh_scheduler): @@ -377,19 +408,21 @@ actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get( [(visit_status["origin"], visit_status["type"])] ) - assert actual_origin_visit_stats == [ - OriginVisitStats( - url=visit_status["origin"], - visit_type=visit_status["type"], - last_eventful=DATE1, - last_uneventful=visit_status["date"], # most recent date but uneventful - last_failed=DATE2, - last_notfound=DATE1, - last_snapshot=visit_status["snapshot"], - next_visit_queue_position=None, - next_position_offset=5, # uneventful so visit less often - ) - ] + assert_visit_stats_ok( + actual_origin_visit_stats, + [ + OriginVisitStats( + url=visit_status["origin"], + visit_type=visit_status["type"], + last_eventful=DATE1, + last_uneventful=visit_status["date"], # most recent date but uneventful + last_failed=DATE2, + last_notfound=DATE1, + last_snapshot=visit_status["snapshot"], + next_position_offset=5, # uneventful so visit less often + ) + ], + ) VISIT_STATUSES = [ @@ -440,22 +473,23 @@ {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) - expected_visit_stats = OriginVisitStats( - url="foo", - visit_type="git", - last_eventful=DATE1 + ONE_DAY, - last_uneventful=DATE1 + 3 * ONE_DAY, - last_failed=None, - last_notfound=None, - last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), - next_visit_queue_position=None, - next_position_offset=5, # uneventful, visit origin less often in the future + actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")]) + assert_visit_stats_ok( + actual_origin_visit_stats, + [ + OriginVisitStats( + url="foo", + visit_type="git", + last_eventful=DATE1 + ONE_DAY, + last_uneventful=DATE1 + 3 * ONE_DAY, + last_failed=None, + last_notfound=None, + last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), + next_position_offset=5, # uneventful, visit origin less often in future + ) + ], ) - assert 
swh_scheduler.origin_visit_stats_get([("foo", "git")]) == [ - expected_visit_stats - ] - VISIT_STATUSES_1 = [ {**ovs, "date": DATE1 + n * ONE_DAY} @@ -647,20 +681,22 @@ {"origin_visit_status": [visit_status]}, scheduler=swh_scheduler ) - expected_visit_stats = OriginVisitStats( - url="foo", - visit_type="git", - last_eventful=DATE1, - last_uneventful=None, - last_failed=None, - last_notfound=None, - last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), + actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")]) + assert_visit_stats_ok( + actual_origin_visit_stats, + [ + OriginVisitStats( + url="foo", + visit_type="git", + last_eventful=DATE1, + last_uneventful=None, + last_failed=None, + last_notfound=None, + last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), + ) + ], ) - assert swh_scheduler.origin_visit_stats_get([("foo", "git")]) == [ - expected_visit_stats - ] - def test_journal_client_origin_visit_status_several_upsert(swh_scheduler): """An old message updates old information @@ -692,19 +728,22 @@ {"origin_visit_status": [visit_status1]}, scheduler=swh_scheduler ) - assert swh_scheduler.origin_visit_stats_get([("foo", "git")]) == [ - OriginVisitStats( - url="foo", - visit_type="git", - last_eventful=DATE1, - last_uneventful=DATE2, - last_failed=None, - last_notfound=None, - last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), - next_visit_queue_position=None, - next_position_offset=5, - ) - ] + actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")]) + assert_visit_stats_ok( + actual_origin_visit_stats, + [ + OriginVisitStats( + url="foo", + visit_type="git", + last_eventful=DATE1, + last_uneventful=DATE2, + last_failed=None, + last_notfound=None, + last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), + next_position_offset=5, + ) + ], + ) VISIT_STATUSES_SAME_SNAPSHOT = [ @@ -751,16 +790,120 @@ {"origin_visit_status": 
visit_statuses}, scheduler=swh_scheduler ) - assert swh_scheduler.origin_visit_stats_get([("cavabarder", "hg")]) == [ - OriginVisitStats( - url="cavabarder", - visit_type="hg", - last_eventful=DATE1, - last_uneventful=DATE1 + 2 * ONE_YEAR, - last_failed=None, - last_notfound=None, - last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), - next_visit_queue_position=None, - next_position_offset=6, # 2 uneventful visits, whatever the permutation - ) - ] + actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get( + [("cavabarder", "hg")] + ) + assert_visit_stats_ok( + actual_origin_visit_stats, + [ + OriginVisitStats( + url="cavabarder", + visit_type="hg", + last_eventful=DATE1, + last_uneventful=DATE1 + 2 * ONE_YEAR, + last_failed=None, + last_notfound=None, + last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), + next_position_offset=6, # 2 uneventful visits, whatever the permutation + ) + ], + ) + + +@pytest.mark.parametrize( + "position_offset, interval", + [ + (0, 1), + (1, 1), + (2, 2), + (3, 2), + (4, 2), + (5, 4), + (6, 16), + (7, 64), + (8, 256), + (9, 1024), + (10, 4096), + ], +) +def test_journal_client_from_position_offset_to_days(position_offset, interval): + assert from_position_offset_to_days(position_offset) == interval + + +def test_journal_client_from_position_offset_to_days_only_positive_input(): + with pytest.raises(AssertionError): + from_position_offset_to_days(-1) + + +@pytest.mark.parametrize( + "fudge_factor,next_position_offset", [(0.01, 1), (-0.01, 5), (0.1, 8), (-0.1, 10),] +) +def test_next_visit_queue_position(mocker, fudge_factor, next_position_offset): + mock_random = mocker.patch("swh.scheduler.journal_client.random.uniform") + mock_random.return_value = fudge_factor + + date_now = utcnow() + + mock_now = mocker.patch("swh.scheduler.journal_client.utcnow") + mock_now.return_value = date_now + + actual_position = next_visit_queue_position( + {}, {"next_position_offset": next_position_offset, 
"visit_type": "svn",} + ) + + assert actual_position == date_now + timedelta( + days=from_position_offset_to_days(next_position_offset) * (1 + fudge_factor) + ) + + assert mock_now.called + assert mock_random.called + + +@pytest.mark.parametrize( + "fudge_factor,next_position_offset", [(0.02, 2), (-0.02, 3), (0, 7), (-0.09, 9),] +) +def test_next_visit_queue_position_with_state( + mocker, fudge_factor, next_position_offset +): + mock_random = mocker.patch("swh.scheduler.journal_client.random.uniform") + mock_random.return_value = fudge_factor + + date_now = utcnow() + + actual_position = next_visit_queue_position( + {"git": date_now}, + {"next_position_offset": next_position_offset, "visit_type": "git",}, + ) + + assert actual_position == date_now + timedelta( + days=from_position_offset_to_days(next_position_offset) * (1 + fudge_factor) + ) + + assert mock_random.called + + +@pytest.mark.parametrize( + "fudge_factor,next_position_offset", [(0.03, 3), (-0.03, 4), (0.08, 7), (-0.08, 9),] +) +def test_next_visit_queue_position_with_next_visit_queue( + mocker, fudge_factor, next_position_offset +): + mock_random = mocker.patch("swh.scheduler.journal_client.random.uniform") + mock_random.return_value = fudge_factor + + date_now = utcnow() + + actual_position = next_visit_queue_position( + {}, + { + "next_position_offset": next_position_offset, + "visit_type": "hg", + "next_visit_queue_position": date_now, + }, + ) + + assert actual_position == date_now + timedelta( + days=from_position_offset_to_days(next_position_offset) * (1 + fudge_factor) + ) + + assert mock_random.called