Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/journal_client.py
# Copyright (C) 2021 The Software Heritage developers | # Copyright (C) 2021 The Software Heritage developers | ||||||||||||||||||||||||||||||||||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||||||||||||||||||||||||||||||||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||||||||||||||||||||||||||||||||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||||||||||||||||||||||||||||||||||
from datetime import datetime, timedelta | from datetime import datetime, timedelta | ||||||||||||||||||||||||||||||||||||
import random | import random | ||||||||||||||||||||||||||||||||||||
from typing import Dict, List, Optional, Tuple | from typing import Dict, List, Optional, Tuple | ||||||||||||||||||||||||||||||||||||
import attr | import attr | ||||||||||||||||||||||||||||||||||||
from swh.scheduler.interface import SchedulerInterface | from swh.scheduler.interface import SchedulerInterface | ||||||||||||||||||||||||||||||||||||
from swh.scheduler.model import OriginVisitStats | from swh.scheduler.model import OriginVisitStats | ||||||||||||||||||||||||||||||||||||
from swh.scheduler.utils import utcnow | from swh.scheduler.utils import utcnow | ||||||||||||||||||||||||||||||||||||
msg_type = "origin_visit_status" | msg_type = "origin_visit_status" | ||||||||||||||||||||||||||||||||||||
vlorentz: docstring please | |||||||||||||||||||||||||||||||||||||
INTERESTING_EVENTS = ("eventful", "uneventful", "failed", "notfound") | INTERESTING_EVENTS = ("eventful", "uneventful", "failed", "notfound") | ||||||||||||||||||||||||||||||||||||
def max_date(*dates: Optional[datetime]) -> datetime: | def max_date(*dates: Optional[datetime]) -> datetime: | ||||||||||||||||||||||||||||||||||||
"""Return the max date of given (possibly None) dates | """Return the max date of given (possibly None) dates | ||||||||||||||||||||||||||||||||||||
At least one date must be not None. | At least one date must be not None. | ||||||||||||||||||||||||||||||||||||
▲ Show 20 Lines • Show All 100 Lines • ▼ Show 20 Lines | else: | ||||||||||||||||||||||||||||||||||||
last_event = event_dates[maxdate] | last_event = event_dates[maxdate] | ||||||||||||||||||||||||||||||||||||
return last_event | return last_event | ||||||||||||||||||||||||||||||||||||
def process_journal_objects( | def process_journal_objects( | ||||||||||||||||||||||||||||||||||||
messages: Dict[str, List[Dict]], *, scheduler: SchedulerInterface | messages: Dict[str, List[Dict]], *, scheduler: SchedulerInterface | ||||||||||||||||||||||||||||||||||||
) -> None: | ) -> None: | ||||||||||||||||||||||||||||||||||||
"""Read messages from origin_visit_status journal topic to update "origin_visit_stats" | """Read messages from origin_visit_status journal topic to update "origin_visit_stats" | ||||||||||||||||||||||||||||||||||||
information on (origin, visit_type). The goal is to compute visit stats information | information on (origin, visit_type). The goal is to compute visit stats information | ||||||||||||||||||||||||||||||||||||
per origin and visit_type: last_eventful, last_uneventful, last_failed, | per origin and visit_type: last_eventful, last_uneventful, last_failed, | ||||||||||||||||||||||||||||||||||||
last_notfound, last_snapshot, ... | last_notfound, last_snapshot, ... | ||||||||||||||||||||||||||||||||||||
Details: | Details: | ||||||||||||||||||||||||||||||||||||
- This journal consumes origin visit status information for final visit status | - This journal consumes origin visit status information for final visit status | ||||||||||||||||||||||||||||||||||||
("full", "partial", "failed", "not_found"). It drops the information on non | ("full", "partial", "failed", "not_found"). It drops the information on non | ||||||||||||||||||||||||||||||||||||
final visit statuses ("ongoing", "created"). | final visit statuses ("ongoing", "created"). | ||||||||||||||||||||||||||||||||||||
- The snapshot is used to determine the "eventful/uneventful" nature of the | - The snapshot is used to determine the "eventful/uneventful" nature of the | ||||||||||||||||||||||||||||||||||||
origin visit status. | origin visit status. | ||||||||||||||||||||||||||||||||||||
- When no snapshot is provided, the visit is considered as failed so the | - When no snapshot is provided, the visit is considered as failed so the | ||||||||||||||||||||||||||||||||||||
last_failed column is updated. | last_failed column is updated. | ||||||||||||||||||||||||||||||||||||
- As there is no time guarantee when reading message from the topic, the code | - As there is no time guarantee when reading message from the topic, the code | ||||||||||||||||||||||||||||||||||||
tries to keep the data in the most timely ordered as possible. | tries to keep the data in the most timely ordered as possible. | ||||||||||||||||||||||||||||||||||||
- Compared to what is already stored in the origin_visit_stats table, only most | - Compared to what is already stored in the origin_visit_stats table, only most | ||||||||||||||||||||||||||||||||||||
recent information is kept. | recent information is kept. | ||||||||||||||||||||||||||||||||||||
- This updates the `next_visit_queue_position` (time at which some new objects | - This updates the `next_visit_queue_position` (time at which some new objects | ||||||||||||||||||||||||||||||||||||
are expected to be added for the origin), and `next_position_offset` (duration | are expected to be added for the origin), and `next_position_offset` (duration | ||||||||||||||||||||||||||||||||||||
that we expect to wait between visits of this origin). | that we expect to wait between visits of this origin). | ||||||||||||||||||||||||||||||||||||
This is a worker function to be used with `JournalClient.process(worker_fn)`, after | This is a worker function to be used with `JournalClient.process(worker_fn)`, after | ||||||||||||||||||||||||||||||||||||
Done Inline Actions
vlorentz: | |||||||||||||||||||||||||||||||||||||
currification of `scheduler` and `task_names`. | currification of `scheduler` and `task_names`. | ||||||||||||||||||||||||||||||||||||
""" | """ | ||||||||||||||||||||||||||||||||||||
assert set(messages) <= { | assert set(messages) <= { | ||||||||||||||||||||||||||||||||||||
msg_type | msg_type | ||||||||||||||||||||||||||||||||||||
}, f"Got unexpected {', '.join(set(messages) - set([msg_type]))} message types" | }, f"Got unexpected {', '.join(set(messages) - set([msg_type]))} message types" | ||||||||||||||||||||||||||||||||||||
assert msg_type in messages, f"Expected {msg_type} messages" | assert msg_type in messages, f"Expected {msg_type} messages" | ||||||||||||||||||||||||||||||||||||
Show All 13 Lines | origin_visit_stats: Dict[Tuple[str, str], Dict] = { | ||||||||||||||||||||||||||||||||||||
) | ) | ||||||||||||||||||||||||||||||||||||
} | } | ||||||||||||||||||||||||||||||||||||
# Use the default values from the model object | # Use the default values from the model object | ||||||||||||||||||||||||||||||||||||
empty_object = { | empty_object = { | ||||||||||||||||||||||||||||||||||||
field.name: field.default if field.default != attr.NOTHING else None | field.name: field.default if field.default != attr.NOTHING else None | ||||||||||||||||||||||||||||||||||||
for field in attr.fields(OriginVisitStats) | for field in attr.fields(OriginVisitStats) | ||||||||||||||||||||||||||||||||||||
} | } | ||||||||||||||||||||||||||||||||||||
disabled_urls: List[str] = [] | |||||||||||||||||||||||||||||||||||||
# Retrieve the global queue state | # Retrieve the global queue state | ||||||||||||||||||||||||||||||||||||
queue_position_per_visit_type = scheduler.visit_scheduler_queue_position_get() | queue_position_per_visit_type = scheduler.visit_scheduler_queue_position_get() | ||||||||||||||||||||||||||||||||||||
for msg_dict in interesting_messages: | for msg_dict in interesting_messages: | ||||||||||||||||||||||||||||||||||||
origin = msg_dict["origin"] | origin = msg_dict["origin"] | ||||||||||||||||||||||||||||||||||||
visit_type = msg_dict["type"] | visit_type = msg_dict["type"] | ||||||||||||||||||||||||||||||||||||
pk = origin, visit_type | pk = origin, visit_type | ||||||||||||||||||||||||||||||||||||
if pk not in origin_visit_stats: | if pk not in origin_visit_stats: | ||||||||||||||||||||||||||||||||||||
▲ Show 20 Lines • Show All 92 Lines • ▼ Show 20 Lines | for msg_dict in interesting_messages: | ||||||||||||||||||||||||||||||||||||
visit_stats_d["next_visit_queue_position"] = next_visit_queue_position( | visit_stats_d["next_visit_queue_position"] = next_visit_queue_position( | ||||||||||||||||||||||||||||||||||||
queue_position_per_visit_type, visit_stats_d | queue_position_per_visit_type, visit_stats_d | ||||||||||||||||||||||||||||||||||||
) | ) | ||||||||||||||||||||||||||||||||||||
if increment_successive_visits: | if increment_successive_visits: | ||||||||||||||||||||||||||||||||||||
visit_stats_d["successive_visits"] += 1 | visit_stats_d["successive_visits"] += 1 | ||||||||||||||||||||||||||||||||||||
else: | else: | ||||||||||||||||||||||||||||||||||||
visit_stats_d["successive_visits"] = 1 | visit_stats_d["successive_visits"] = 1 | ||||||||||||||||||||||||||||||||||||
Done Inline Actions
vlorentz: | |||||||||||||||||||||||||||||||||||||
if last_event == "last_failed" and visit_stats_d["successive_visits"] >= 3: | |||||||||||||||||||||||||||||||||||||
disabled_urls.append(visit_stats_d["url"]) | |||||||||||||||||||||||||||||||||||||
scheduler.origin_visit_stats_upsert( | scheduler.origin_visit_stats_upsert( | ||||||||||||||||||||||||||||||||||||
Done Inline Actionshardcoded constant! vlorentz: hardcoded constant! | |||||||||||||||||||||||||||||||||||||
OriginVisitStats(**ovs) for ovs in origin_visit_stats.values() | OriginVisitStats(**ovs) for ovs in origin_visit_stats.values() | ||||||||||||||||||||||||||||||||||||
) | ) | ||||||||||||||||||||||||||||||||||||
# Disable any origins if any | |||||||||||||||||||||||||||||||||||||
if disabled_urls: | |||||||||||||||||||||||||||||||||||||
disabled_origins = [] | |||||||||||||||||||||||||||||||||||||
for url in disabled_urls: | |||||||||||||||||||||||||||||||||||||
origins = scheduler.get_listed_origins(url=url).results | |||||||||||||||||||||||||||||||||||||
if len(origins) > 0: | |||||||||||||||||||||||||||||||||||||
origin = attr.evolve(origins[0], enabled=False) | |||||||||||||||||||||||||||||||||||||
disabled_origins.append(origin) | |||||||||||||||||||||||||||||||||||||
if disabled_origins: | |||||||||||||||||||||||||||||||||||||
scheduler.record_listed_origins(disabled_origins) |
docstring please