Page MenuHomeSoftware Heritage

D5980.diff
No OneTemporary

D5980.diff

diff --git a/swh/scheduler/journal_client.py b/swh/scheduler/journal_client.py
--- a/swh/scheduler/journal_client.py
+++ b/swh/scheduler/journal_client.py
@@ -15,6 +15,9 @@
msg_type = "origin_visit_status"
+DISABLE_ORIGIN_THRESHOLD = 3
+"""Threshold to disable failing origins"""
+
def max_date(*dates: Optional[datetime]) -> datetime:
"""Return the max date of given (possibly None) dates
@@ -130,7 +133,7 @@
def process_journal_objects(
messages: Dict[str, List[Dict]], *, scheduler: SchedulerInterface
) -> None:
- """Read messages from origin_visit_status journal topic to update "origin_visit_stats"
+ f"""Read messages from origin_visit_status journal topic to update "origin_visit_stats"
information on (origin, visit_type). The goal is to compute visit stats information
per origin and visit_type: `last_successful`, `last_visit`, `last_visit_status`, ...
@@ -156,6 +159,10 @@
are expected to be added for the origin), and `next_position_offset` (duration
that we expect to wait between visits of this origin) are updated.
+ - When visits fail at least {DISABLE_ORIGIN_THRESHOLD} times in a row, the
+ origins are disabled in the scheduler table. It's up to the lister to re-enable
+ them when they are listed again.
+
This is a worker function to be used with `JournalClient.process(worker_fn)`, after
currification of `scheduler` and `task_names`.
@@ -193,6 +200,8 @@
for field in attr.fields(OriginVisitStats)
}
+ disabled_urls: List[str] = []
+
# Retrieve the global queue state
queue_position_per_visit_type = scheduler.visit_scheduler_queue_position_get()
@@ -258,6 +267,25 @@
visit_stats_d["successive_visits"] + 1 if same_visit_status else 1
)
+ # Disable recurring failing/not-found origins
+ if (
+ visit_stats_d["last_visit_status"]
+ in [LastVisitStatus.not_found, LastVisitStatus.failed]
+ ) and visit_stats_d["successive_visits"] >= DISABLE_ORIGIN_THRESHOLD:
+ disabled_urls.append(visit_stats_d["url"])
+
scheduler.origin_visit_stats_upsert(
OriginVisitStats(**ovs) for ovs in origin_visit_stats.values()
)
+
+ # Disable the origins flagged above, if any
+ if disabled_urls:
+ disabled_origins = []
+ for url in disabled_urls:
+ origins = scheduler.get_listed_origins(url=url).results
+ if len(origins) > 0:
+ origin = attr.evolve(origins[0], enabled=False)
+ disabled_origins.append(origin)
+
+ if disabled_origins:
+ scheduler.record_listed_origins(disabled_origins)
diff --git a/swh/scheduler/tests/test_journal_client.py b/swh/scheduler/tests/test_journal_client.py
--- a/swh/scheduler/tests/test_journal_client.py
+++ b/swh/scheduler/tests/test_journal_client.py
@@ -921,3 +921,76 @@
)
assert mock_random.called
+
+
+def test_disable_failing_origins(swh_scheduler):
+ """Origin with too many failed attempts ends up being deactivated in the scheduler.
+
+ """
+
+ # actually store the origin in the scheduler so we can check it's deactivated in the
+ # end.
+ lister = swh_scheduler.get_or_create_lister(
+ name="something", instance_name="something"
+ )
+ origin = ListedOrigin(
+ url="bar", enabled=True, visit_type="svn", lister_id=lister.id
+ )
+ swh_scheduler.record_listed_origins([origin])
+
+ visit_statuses = [
+ {
+ "origin": "bar",
+ "visit": 2,
+ "status": "failed",
+ "date": DATE1,
+ "type": "svn",
+ "snapshot": None,
+ },
+ {
+ "origin": "bar",
+ "visit": 3,
+ "status": "failed",
+ "date": DATE2,
+ "type": "svn",
+ "snapshot": None,
+ },
+ {
+ "origin": "bar",
+ "visit": 3,
+ "status": "failed",
+ "date": DATE3,
+ "type": "svn",
+ "snapshot": None,
+ },
+ ]
+
+ process_journal_objects(
+ {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler
+ )
+
+ actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("bar", "svn")])
+ assert_visit_stats_ok(
+ actual_origin_visit_stats[0],
+ OriginVisitStats(
+ url="bar",
+ visit_type="svn",
+ last_successful=None,
+ last_visit=DATE3,
+ last_visit_status=LastVisitStatus.failed,
+ next_position_offset=6,
+ successive_visits=3,
+ ),
+ )
+
+ # Now check that the origin in question is disabled
+ actual_page = swh_scheduler.get_listed_origins(url="bar")
+
+ assert len(actual_page.results) == 1
+ assert actual_page.next_page_token is None
+
+ for origin in actual_page.results:
+ assert origin.enabled is False
+ assert origin.lister_id == lister.id
+ assert origin.url == "bar"
+ assert origin.visit_type == "svn"

File Metadata

Mime Type
text/plain
Expires
Wed, Dec 18, 3:38 AM (1 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3217479

Event Timeline