diff --git a/swh/scheduler/journal_client.py b/swh/scheduler/journal_client.py
--- a/swh/scheduler/journal_client.py
+++ b/swh/scheduler/journal_client.py
@@ -3,13 +3,15 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-from datetime import datetime
+from datetime import datetime, timedelta
+import random
 from typing import Dict, List, Optional, Tuple
 
 import attr
 
 from swh.scheduler.interface import SchedulerInterface
 from swh.scheduler.model import OriginVisitStats
+from swh.scheduler.utils import utcnow
 
 
 msg_type = "origin_visit_status"
@@ -36,6 +38,33 @@
     )
 
 
+def from_position_offset_to_days(position_offset: int) -> int:
+    """Convert a position offset into a visit interval in days.
+
+    - offset 0 and 1: interval 1 day
+    - offset 2, 3 and 4: interval 2 days
+    - offset n >= 5: interval 4^(n-4) days, i.e. 4, 16, 64, 256, 1024, ...
+
+    Args:
+        position_offset: the next position offset of an origin's visit stats
+
+    Returns:
+        the visit interval, in days, matching the position offset
+
+    Raises:
+        AssertionError: if position_offset is negative
+
+    """
+    assert position_offset >= 0
+    if position_offset < 2:
+        result = 1
+    elif position_offset < 5:
+        result = 2
+    else:
+        result = 4 ** (position_offset - 4)
+    return result
+
+
 def process_journal_objects(
     messages: Dict[str, List[Dict]], *, scheduler: SchedulerInterface
 ) -> None:
@@ -96,6 +125,8 @@
         for field in attr.fields(OriginVisitStats)
     }
 
+    queue_position_per_visit_type = scheduler.visit_scheduler_queue_position_get()
+
     for msg_dict in interesting_messages:
         origin = msg_dict["origin"]
         visit_type = msg_dict["type"]
@@ -176,6 +207,22 @@
                         # Visit this origin less often in the future
                         update_next_position_offset(visit_stats_d, 1)
 
+        # We set the next visit target to its current value + the new visit interval
+        # multiplied by a random fudge factor (picked in the -/+ 10% range) to avoid
+        # scheduling bursts
+        days = from_position_offset_to_days(visit_stats_d["next_position_offset"])
+        random_fudge_factor = random.choice(range(-10, 11)) / 100
+        visit_interval = timedelta(days=days * (1 + random_fudge_factor))
+        # Use the current queue position per visit type as starting position if none is
+        # already set
+        default_queue_position = queue_position_per_visit_type.get(visit_type, utcnow())
+        current_position = (
+            visit_stats_d["next_visit_queue_position"]
+            if visit_stats_d.get("next_visit_queue_position")
+            else default_queue_position
+        )
+        visit_stats_d["next_visit_queue_position"] = current_position + visit_interval
+
     scheduler.origin_visit_stats_upsert(
         OriginVisitStats(**ovs) for ovs in origin_visit_stats.values()
     )
diff --git a/swh/scheduler/tests/test_journal_client.py b/swh/scheduler/tests/test_journal_client.py
--- a/swh/scheduler/tests/test_journal_client.py
+++ b/swh/scheduler/tests/test_journal_client.py
@@ -11,7 +11,11 @@
 import pytest
 
 from swh.model.hashutil import hash_to_bytes
-from swh.scheduler.journal_client import max_date, process_journal_objects
+from swh.scheduler.journal_client import (
+    from_position_offset_to_days,
+    max_date,
+    process_journal_objects,
+)
 from swh.scheduler.model import ListedOrigin, OriginVisitStats
 from swh.scheduler.utils import utcnow
 
@@ -764,3 +768,28 @@
             next_position_offset=6,  # 2 uneventful visits, whatever the permutation
         )
     ]
+
+
+@pytest.mark.parametrize(
+    "position_offset, interval",
+    [
+        (0, 1),
+        (1, 1),
+        (2, 2),
+        (3, 2),
+        (4, 2),
+        (5, 4),
+        (6, 16),
+        (7, 64),
+        (8, 256),
+        (9, 1024),
+        (10, 4096),
+    ],
+)
+def test_journal_client_from_position_offset_to_days(position_offset, interval):
+    assert from_position_offset_to_days(position_offset) == interval
+
+
+def test_journal_client_from_position_offset_to_days_only_positive_input():
+    with pytest.raises(AssertionError):
+        from_position_offset_to_days(-1)