Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/journal_client.py
# Copyright (C) 2021 The Software Heritage developers | # Copyright (C) 2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from datetime import datetime | from datetime import datetime, timedelta | ||||
import random | |||||
from typing import Dict, List, Optional, Tuple | from typing import Dict, List, Optional, Tuple | ||||
import attr | import attr | ||||
from swh.scheduler.interface import SchedulerInterface | from swh.scheduler.interface import SchedulerInterface | ||||
from swh.scheduler.model import OriginVisitStats | from swh.scheduler.model import OriginVisitStats | ||||
from swh.scheduler.utils import utcnow | |||||
msg_type = "origin_visit_status" | msg_type = "origin_visit_status" | ||||
def max_date(*dates: Optional[datetime]) -> datetime: | def max_date(*dates: Optional[datetime]) -> datetime: | ||||
"""Return the max date of given (possibly None) dates | """Return the max date of given (possibly None) dates | ||||
At least one date must be not None. | At least one date must be not None. | ||||
Show All 10 Lines | def update_next_position_offset(visit_stats: Dict, increment: int) -> None: | ||||
resulting value must be a positive integer. | resulting value must be a positive integer. | ||||
""" | """ | ||||
visit_stats["next_position_offset"] = max( | visit_stats["next_position_offset"] = max( | ||||
0, visit_stats["next_position_offset"] + increment | 0, visit_stats["next_position_offset"] + increment | ||||
) | ) | ||||
def from_position_offset_to_days(position_offset: int) -> int: | |||||
"""Compute position offset to interval in days. | |||||
- index 0 and 1: interval 1 day | |||||
- index 2, 3 and 4: interval 2 days | |||||
- index 5 and up: interval `4^(n-4)` days for n in (4, 16, 64, 256, 1024, ...) | |||||
Args: | |||||
position_offset: The actual position offset for a given visit stats | |||||
Returns: | |||||
The offset as an interval in number of days | |||||
""" | |||||
assert position_offset >= 0 | |||||
if position_offset < 2: | |||||
result = 1 | |||||
elif position_offset < 5: | |||||
result = 2 | |||||
else: | |||||
result = 4 ** (position_offset - 4) | |||||
return result | |||||
def next_visit_queue_position( | |||||
queue_position_per_visit_type: Dict, visit_stats: Dict | |||||
) -> datetime: | |||||
"""Compute the next visit queue position for the given visit_stats. | |||||
This takes the visit_stats next_position_offset value and compute a corresponding | |||||
interval in days (with a random fudge factor of -/+ 10% range to avoid scheduling | |||||
burst for hosters). Then computes out of this visit interval and the current visit | |||||
stats's position in the queue a new position. | |||||
As an implementation detail, if the visit stats does not have a queue position yet, | |||||
this fallbacks to use the current global position (for the same visit type as the | |||||
visit stats) to compute the new position in the queue. If there is no global state | |||||
yet for the visit type, this starts up using the ``utcnow`` function as default | |||||
value. | |||||
Args: | |||||
queue_position_per_visit_type: The global state of the queue per visit type | |||||
visit_stats: The actual visit information to compute the next position for | |||||
Returns: | |||||
The actual next visit queue position for that visit stats | |||||
""" | |||||
days = from_position_offset_to_days(visit_stats["next_position_offset"]) | |||||
random_fudge_factor = random.uniform(-0.1, 0.1) | |||||
visit_interval = timedelta(days=days * (1 + random_fudge_factor)) | |||||
# Use the current queue position per visit type as starting position if none is | |||||
# already set | |||||
default_queue_position = queue_position_per_visit_type.get( | |||||
visit_stats["visit_type"], utcnow() | |||||
) | |||||
current_position = ( | |||||
visit_stats["next_visit_queue_position"] | |||||
if visit_stats.get("next_visit_queue_position") | |||||
else default_queue_position | |||||
) | |||||
return current_position + visit_interval | |||||
def process_journal_objects( | def process_journal_objects( | ||||
messages: Dict[str, List[Dict]], *, scheduler: SchedulerInterface | messages: Dict[str, List[Dict]], *, scheduler: SchedulerInterface | ||||
) -> None: | ) -> None: | ||||
"""Read messages from origin_visit_status journal topic to update "origin_visit_stats" | """Read messages from origin_visit_status journal topic to update "origin_visit_stats" | ||||
information on (origin, visit_type). The goal is to compute visit stats information | information on (origin, visit_type). The goal is to compute visit stats information | ||||
per origin and visit_type: last_eventful, last_uneventful, last_failed, | per origin and visit_type: last_eventful, last_uneventful, last_failed, | ||||
last_notfound, last_snapshot, ... | last_notfound, last_snapshot, ... | ||||
▲ Show 20 Lines • Show All 44 Lines • ▼ Show 20 Lines | origin_visit_stats: Dict[Tuple[str, str], Dict] = { | ||||
) | ) | ||||
} | } | ||||
# Use the default values from the model object | # Use the default values from the model object | ||||
empty_object = { | empty_object = { | ||||
field.name: field.default if field.default != attr.NOTHING else None | field.name: field.default if field.default != attr.NOTHING else None | ||||
for field in attr.fields(OriginVisitStats) | for field in attr.fields(OriginVisitStats) | ||||
} | } | ||||
# Retrieve the global queue state | |||||
queue_position_per_visit_type = scheduler.visit_scheduler_queue_position_get() | |||||
for msg_dict in interesting_messages: | for msg_dict in interesting_messages: | ||||
origin = msg_dict["origin"] | origin = msg_dict["origin"] | ||||
visit_type = msg_dict["type"] | visit_type = msg_dict["type"] | ||||
pk = origin, visit_type | pk = origin, visit_type | ||||
if pk not in origin_visit_stats: | if pk not in origin_visit_stats: | ||||
origin_visit_stats[pk] = { | origin_visit_stats[pk] = { | ||||
**empty_object, | **empty_object, | ||||
"url": origin, | "url": origin, | ||||
▲ Show 20 Lines • Show All 64 Lines • ▼ Show 20 Lines | for msg_dict in interesting_messages: | ||||
# populating the last_uneventful message | # populating the last_uneventful message | ||||
continue | continue | ||||
else: | else: | ||||
# uneventful event | # uneventful event | ||||
visit_stats_d["last_uneventful"] = current_status_date | visit_stats_d["last_uneventful"] = current_status_date | ||||
# Visit this origin less often in the future | # Visit this origin less often in the future | ||||
update_next_position_offset(visit_stats_d, 1) | update_next_position_offset(visit_stats_d, 1) | ||||
# Update the next visit queue position (which will be used solely for origin | |||||
# without any last_update, cf. the dedicated scheduling policy | |||||
# "origins_without_last_update") | |||||
visit_stats_d["next_visit_queue_position"] = next_visit_queue_position( | |||||
ardumont: use random.uniform! | |||||
Done Inline Actionsgreat stuff [1]: In [16]: while True: print(random.uniform(-0.1, 0.1)) ; sleep(0.1) -0.07802568520265081 -0.012047862685772293 -0.09515824425588425 -0.09383355154670565 -0.07567022279491248 -0.028994578286562933 0.032746260794542265 0.09695343564361572 0.03365570420242092 -0.03822556466045879 -0.08169951622895931 -0.09698416654743738 -0.0011827698844187884 -0.036335877650609 0.05447047809839983 -0.05956846622376569 0.04799241406841087 [1] https://docs.python.org/3/library/random.html#random.uniform ardumont: great stuff [1]:
```
In [16]: while True: print(random.uniform(-0.1, 0.1)) ; sleep(0.1)
-0. | |||||
queue_position_per_visit_type, visit_stats_d | |||||
Done Inline Actionsensure days is ok with float. ardumont: ensure days is ok with float. | |||||
Done Inline ActionsIt was meant in regards to timedelta. It is ok: In [1]: from datetime import datetime, timedelta In [2]: timedelta(days=0.1) Out[2]: datetime.timedelta(seconds=8640) In [3]: timedelta(days=1) Out[3]: datetime.timedelta(days=1) In [5]: timedelta(seconds=86400) Out[5]: datetime.timedelta(days=1) ardumont: It was meant in regards to timedelta.
It is ok:
```
In [1]: from datetime import datetime… | |||||
) | |||||
scheduler.origin_visit_stats_upsert( | scheduler.origin_visit_stats_upsert( | ||||
OriginVisitStats(**ovs) for ovs in origin_visit_stats.values() | OriginVisitStats(**ovs) for ovs in origin_visit_stats.values() | ||||
) | ) |
use random.uniform!