diff --git a/swh/scheduler/journal_client.py b/swh/scheduler/journal_client.py
--- a/swh/scheduler/journal_client.py
+++ b/swh/scheduler/journal_client.py
@@ -15,6 +15,9 @@
 
 msg_type = "origin_visit_status"
 
+DISABLE_ORIGIN_THRESHOLD = 3
+"""Number of failed/not-found visits in a row after which an origin is disabled"""
+
 
 def max_date(*dates: Optional[datetime]) -> datetime:
     """Return the max date of given (possibly None) dates
@@ -156,6 +159,10 @@
       are expected to be added for the origin), and `next_position_offset` (duration
       that we expect to wait between visits of this origin) are updated.
 
+    - When visits fail at least `DISABLE_ORIGIN_THRESHOLD` times in a row, the
+      origins are disabled in the scheduler table. It's up to the lister to
+      re-enable them when they are listed again.
+
     This is a worker function to be used with `JournalClient.process(worker_fn)`,
     after currification of `scheduler` and `task_names`.
 
@@ -193,6 +200,8 @@
         for field in attr.fields(OriginVisitStats)
     }
 
+    disabled_urls: List[str] = []
+
     # Retrieve the global queue state
     queue_position_per_visit_type = scheduler.visit_scheduler_queue_position_get()
 
@@ -258,6 +267,25 @@
             visit_stats_d["successive_visits"] + 1 if same_visit_status else 1
         )
 
+        # Disable recurring failing/not-found origins
+        if (
+            visit_stats_d["last_visit_status"]
+            in [LastVisitStatus.not_found, LastVisitStatus.failed]
+        ) and visit_stats_d["successive_visits"] >= DISABLE_ORIGIN_THRESHOLD:
+            disabled_urls.append(visit_stats_d["url"])
+
     scheduler.origin_visit_stats_upsert(
         OriginVisitStats(**ovs) for ovs in origin_visit_stats.values()
     )
+
+    # Disable the listed origins matching the failing urls, if any. The same url
+    # may have been collected several times above, so deduplicate (keeping the
+    # order) to look each one up, and record it, only once.
+    disabled_origins = []
+    for url in dict.fromkeys(disabled_urls):
+        origins = scheduler.get_listed_origins(url=url).results
+        if origins:
+            disabled_origins.append(attr.evolve(origins[0], enabled=False))
+
+    if disabled_origins:
+        scheduler.record_listed_origins(disabled_origins)
diff --git a/swh/scheduler/tests/test_journal_client.py b/swh/scheduler/tests/test_journal_client.py
--- a/swh/scheduler/tests/test_journal_client.py
+++ b/swh/scheduler/tests/test_journal_client.py
@@ -921,3 +921,76 @@
     )
 
     assert mock_random.called
+
+
+def test_disable_failing_origins(swh_scheduler):
+    """Origin with too many failed attempts ends up being deactivated in the scheduler.
+
+    """
+
+    # actually store the origin in the scheduler so we can check it's deactivated in the
+    # end.
+    lister = swh_scheduler.get_or_create_lister(
+        name="something", instance_name="something"
+    )
+    origin = ListedOrigin(
+        url="bar", enabled=True, visit_type="svn", lister_id=lister.id
+    )
+    swh_scheduler.record_listed_origins([origin])
+
+    visit_statuses = [
+        {
+            "origin": "bar",
+            "visit": 2,
+            "status": "failed",
+            "date": DATE1,
+            "type": "svn",
+            "snapshot": None,
+        },
+        {
+            "origin": "bar",
+            "visit": 3,
+            "status": "failed",
+            "date": DATE2,
+            "type": "svn",
+            "snapshot": None,
+        },
+        {
+            "origin": "bar",
+            "visit": 4,
+            "status": "failed",
+            "date": DATE3,
+            "type": "svn",
+            "snapshot": None,
+        },
+    ]
+
+    process_journal_objects(
+        {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler
+    )
+
+    actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("bar", "svn")])
+    assert_visit_stats_ok(
+        actual_origin_visit_stats[0],
+        OriginVisitStats(
+            url="bar",
+            visit_type="svn",
+            last_successful=None,
+            last_visit=DATE3,
+            last_visit_status=LastVisitStatus.failed,
+            next_position_offset=6,
+            successive_visits=3,
+        ),
+    )
+
+    # Now check that the origin in question is disabled
+    actual_page = swh_scheduler.get_listed_origins(url="bar")
+
+    assert len(actual_page.results) == 1
+    assert actual_page.next_page_token is None
+
+    for listed_origin in actual_page.results:
+        assert listed_origin.enabled is False
+        assert listed_origin.lister_id == lister.id
+        assert listed_origin.url == "bar"
+        assert listed_origin.visit_type == "svn"