Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/journal_client.py
# Copyright (C) 2021 The Software Heritage developers | # Copyright (C) 2021 The Software Heritage developers | ||||||||||||||||||||||||||||||||||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||||||||||||||||||||||||||||||||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||||||||||||||||||||||||||||||||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||||||||||||||||||||||||||||||||||
from datetime import datetime, timedelta | from datetime import datetime, timedelta | ||||||||||||||||||||||||||||||||||||
import random | import random | ||||||||||||||||||||||||||||||||||||
from typing import Dict, List, Optional, Tuple | from typing import Dict, List, Optional, Tuple | ||||||||||||||||||||||||||||||||||||
import attr | import attr | ||||||||||||||||||||||||||||||||||||
from swh.scheduler.interface import SchedulerInterface | from swh.scheduler.interface import SchedulerInterface | ||||||||||||||||||||||||||||||||||||
from swh.scheduler.model import LastVisitStatus, OriginVisitStats | from swh.scheduler.model import LastVisitStatus, OriginVisitStats | ||||||||||||||||||||||||||||||||||||
from swh.scheduler.utils import utcnow | from swh.scheduler.utils import utcnow | ||||||||||||||||||||||||||||||||||||
msg_type = "origin_visit_status" | msg_type = "origin_visit_status" | ||||||||||||||||||||||||||||||||||||
vlorentz: docstring please | |||||||||||||||||||||||||||||||||||||
DISABLE_ORIGIN_THRESHOLD = 3 | |||||||||||||||||||||||||||||||||||||
"""Threshold to disable failing origins""" | |||||||||||||||||||||||||||||||||||||
def max_date(*dates: Optional[datetime]) -> datetime: | def max_date(*dates: Optional[datetime]) -> datetime: | ||||||||||||||||||||||||||||||||||||
"""Return the max date of given (possibly None) dates | """Return the max date of given (possibly None) dates | ||||||||||||||||||||||||||||||||||||
At least one date must be not None. | At least one date must be not None. | ||||||||||||||||||||||||||||||||||||
""" | """ | ||||||||||||||||||||||||||||||||||||
filtered_dates = [d for d in dates if d is not None] | filtered_dates = [d for d in dates if d is not None] | ||||||||||||||||||||||||||||||||||||
if not filtered_dates: | if not filtered_dates: | ||||||||||||||||||||||||||||||||||||
▲ Show 20 Lines • Show All 99 Lines • ▼ Show 20 Lines | if incoming_visit_status["snapshot"] != known_visit_stats.get("last_snapshot"): | ||||||||||||||||||||||||||||||||||||
return LastVisitStatus.successful, True | return LastVisitStatus.successful, True | ||||||||||||||||||||||||||||||||||||
return LastVisitStatus.successful, False | return LastVisitStatus.successful, False | ||||||||||||||||||||||||||||||||||||
def process_journal_objects( | def process_journal_objects( | ||||||||||||||||||||||||||||||||||||
messages: Dict[str, List[Dict]], *, scheduler: SchedulerInterface | messages: Dict[str, List[Dict]], *, scheduler: SchedulerInterface | ||||||||||||||||||||||||||||||||||||
) -> None: | ) -> None: | ||||||||||||||||||||||||||||||||||||
"""Read messages from origin_visit_status journal topic to update "origin_visit_stats" | f"""Read messages from origin_visit_status journal topic to update "origin_visit_stats" | ||||||||||||||||||||||||||||||||||||
information on (origin, visit_type). The goal is to compute visit stats information | information on (origin, visit_type). The goal is to compute visit stats information | ||||||||||||||||||||||||||||||||||||
per origin and visit_type: `last_successful`, `last_visit`, `last_visit_status`, ... | per origin and visit_type: `last_successful`, `last_visit`, `last_visit_status`, ... | ||||||||||||||||||||||||||||||||||||
Details: | Details: | ||||||||||||||||||||||||||||||||||||
- This journal consumes origin visit status information for final visit | - This journal consumes origin visit status information for final visit | ||||||||||||||||||||||||||||||||||||
status (`"full"`, `"partial"`, `"failed"`, `"not_found"`). It drops | status (`"full"`, `"partial"`, `"failed"`, `"not_found"`). It drops | ||||||||||||||||||||||||||||||||||||
the information of non final visit statuses (`"ongoing"`, | the information of non final visit statuses (`"ongoing"`, | ||||||||||||||||||||||||||||||||||||
`"created"`). | `"created"`). | ||||||||||||||||||||||||||||||||||||
- This journal client only considers messages that arrive in | - This journal client only considers messages that arrive in | ||||||||||||||||||||||||||||||||||||
chronological order. Messages that arrive out of order (i.e. with a | chronological order. Messages that arrive out of order (i.e. with a | ||||||||||||||||||||||||||||||||||||
date field smaller than the latest recorded visit of the origin) are | date field smaller than the latest recorded visit of the origin) are | ||||||||||||||||||||||||||||||||||||
ignored. This is a tradeoff between correctness and simplicity of | ignored. This is a tradeoff between correctness and simplicity of | ||||||||||||||||||||||||||||||||||||
implementation [1]_. | implementation [1]_. | ||||||||||||||||||||||||||||||||||||
- The snapshot is used to determine the eventful or uneventful nature of | - The snapshot is used to determine the eventful or uneventful nature of | ||||||||||||||||||||||||||||||||||||
the origin visit. | the origin visit. | ||||||||||||||||||||||||||||||||||||
- When no snapshot is provided, the visit is considered as failed. | - When no snapshot is provided, the visit is considered as failed. | ||||||||||||||||||||||||||||||||||||
- Finally, the `next_visit_queue_position` (time at which some new objects | - Finally, the `next_visit_queue_position` (time at which some new objects | ||||||||||||||||||||||||||||||||||||
are expected to be added for the origin), and `next_position_offset` (duration | are expected to be added for the origin), and `next_position_offset` (duration | ||||||||||||||||||||||||||||||||||||
that we expect to wait between visits of this origin) are updated. | that we expect to wait between visits of this origin) are updated. | ||||||||||||||||||||||||||||||||||||
- When visits fails at least {DISABLE_ORIGIN_THRESHOLD} times in a row, the | |||||||||||||||||||||||||||||||||||||
origins are disabled in the scheduler table. It's up to the lister to activate | |||||||||||||||||||||||||||||||||||||
those back when they are listed again. | |||||||||||||||||||||||||||||||||||||
This is a worker function to be used with `JournalClient.process(worker_fn)`, after | This is a worker function to be used with `JournalClient.process(worker_fn)`, after | ||||||||||||||||||||||||||||||||||||
Done Inline Actions
vlorentz: | |||||||||||||||||||||||||||||||||||||
currification of `scheduler` and `task_names`. | currification of `scheduler` and `task_names`. | ||||||||||||||||||||||||||||||||||||
.. [1] Ignoring out of order messages makes the initialization of the | .. [1] Ignoring out of order messages makes the initialization of the | ||||||||||||||||||||||||||||||||||||
origin_visit_status table (from a full journal) less deterministic: only the | origin_visit_status table (from a full journal) less deterministic: only the | ||||||||||||||||||||||||||||||||||||
`last_visit`, `last_visit_state` and `last_successful` fields are guaranteed | `last_visit`, `last_visit_state` and `last_successful` fields are guaranteed | ||||||||||||||||||||||||||||||||||||
to be exact, the `next_position_offset` field is a best effort estimate | to be exact, the `next_position_offset` field is a best effort estimate | ||||||||||||||||||||||||||||||||||||
(which should converge once the client has run for a while on in-order | (which should converge once the client has run for a while on in-order | ||||||||||||||||||||||||||||||||||||
messages). | messages). | ||||||||||||||||||||||||||||||||||||
Show All 20 Lines | origin_visit_stats: Dict[Tuple[str, str], Dict] = { | ||||||||||||||||||||||||||||||||||||
) | ) | ||||||||||||||||||||||||||||||||||||
} | } | ||||||||||||||||||||||||||||||||||||
# Use the default values from the model object | # Use the default values from the model object | ||||||||||||||||||||||||||||||||||||
empty_object = { | empty_object = { | ||||||||||||||||||||||||||||||||||||
field.name: field.default if field.default != attr.NOTHING else None | field.name: field.default if field.default != attr.NOTHING else None | ||||||||||||||||||||||||||||||||||||
for field in attr.fields(OriginVisitStats) | for field in attr.fields(OriginVisitStats) | ||||||||||||||||||||||||||||||||||||
} | } | ||||||||||||||||||||||||||||||||||||
disabled_urls: List[str] = [] | |||||||||||||||||||||||||||||||||||||
# Retrieve the global queue state | # Retrieve the global queue state | ||||||||||||||||||||||||||||||||||||
queue_position_per_visit_type = scheduler.visit_scheduler_queue_position_get() | queue_position_per_visit_type = scheduler.visit_scheduler_queue_position_get() | ||||||||||||||||||||||||||||||||||||
for msg_dict in interesting_messages: | for msg_dict in interesting_messages: | ||||||||||||||||||||||||||||||||||||
origin = msg_dict["origin"] | origin = msg_dict["origin"] | ||||||||||||||||||||||||||||||||||||
visit_type = msg_dict["type"] | visit_type = msg_dict["type"] | ||||||||||||||||||||||||||||||||||||
pk = origin, visit_type | pk = origin, visit_type | ||||||||||||||||||||||||||||||||||||
if pk not in origin_visit_stats: | if pk not in origin_visit_stats: | ||||||||||||||||||||||||||||||||||||
▲ Show 20 Lines • Show All 48 Lines • ▼ Show 20 Lines | for msg_dict in interesting_messages: | ||||||||||||||||||||||||||||||||||||
# "origins_without_last_update") | # "origins_without_last_update") | ||||||||||||||||||||||||||||||||||||
visit_stats_d["next_visit_queue_position"] = next_visit_queue_position( | visit_stats_d["next_visit_queue_position"] = next_visit_queue_position( | ||||||||||||||||||||||||||||||||||||
queue_position_per_visit_type, visit_stats_d | queue_position_per_visit_type, visit_stats_d | ||||||||||||||||||||||||||||||||||||
) | ) | ||||||||||||||||||||||||||||||||||||
visit_stats_d["successive_visits"] = ( | visit_stats_d["successive_visits"] = ( | ||||||||||||||||||||||||||||||||||||
visit_stats_d["successive_visits"] + 1 if same_visit_status else 1 | visit_stats_d["successive_visits"] + 1 if same_visit_status else 1 | ||||||||||||||||||||||||||||||||||||
) | ) | ||||||||||||||||||||||||||||||||||||
Done Inline Actions
vlorentz: | |||||||||||||||||||||||||||||||||||||
# Disable recurring failing/not-found origins | |||||||||||||||||||||||||||||||||||||
if ( | |||||||||||||||||||||||||||||||||||||
visit_stats_d["last_visit_status"] | |||||||||||||||||||||||||||||||||||||
in [LastVisitStatus.not_found, LastVisitStatus.failed] | |||||||||||||||||||||||||||||||||||||
Done Inline Actionshardcoded constant! vlorentz: hardcoded constant! | |||||||||||||||||||||||||||||||||||||
) and visit_stats_d["successive_visits"] >= DISABLE_ORIGIN_THRESHOLD: | |||||||||||||||||||||||||||||||||||||
disabled_urls.append(visit_stats_d["url"]) | |||||||||||||||||||||||||||||||||||||
scheduler.origin_visit_stats_upsert( | scheduler.origin_visit_stats_upsert( | ||||||||||||||||||||||||||||||||||||
OriginVisitStats(**ovs) for ovs in origin_visit_stats.values() | OriginVisitStats(**ovs) for ovs in origin_visit_stats.values() | ||||||||||||||||||||||||||||||||||||
) | ) | ||||||||||||||||||||||||||||||||||||
# Disable any origins if any | |||||||||||||||||||||||||||||||||||||
if disabled_urls: | |||||||||||||||||||||||||||||||||||||
disabled_origins = [] | |||||||||||||||||||||||||||||||||||||
for url in disabled_urls: | |||||||||||||||||||||||||||||||||||||
origins = scheduler.get_listed_origins(url=url).results | |||||||||||||||||||||||||||||||||||||
if len(origins) > 0: | |||||||||||||||||||||||||||||||||||||
origin = attr.evolve(origins[0], enabled=False) | |||||||||||||||||||||||||||||||||||||
disabled_origins.append(origin) | |||||||||||||||||||||||||||||||||||||
if disabled_origins: | |||||||||||||||||||||||||||||||||||||
scheduler.record_listed_origins(disabled_origins) |
docstring please