diff --git a/swh/scheduler/journal_client.py b/swh/scheduler/journal_client.py --- a/swh/scheduler/journal_client.py +++ b/swh/scheduler/journal_client.py @@ -15,6 +15,8 @@ msg_type = "origin_visit_status" +DEACTIVATE_ORIGIN_THRESHOLD = 3 + def max_date(*dates: Optional[datetime]) -> datetime: """Return the max date of given (possibly None) dates @@ -156,6 +158,10 @@ are expected to be added for the origin), and `next_position_offset` (duration that we expect to wait between visits of this origin) are updated. + - When visits fail at least 3 times in a row, the origin is disabled in the + scheduler table. It's up to the lister to re-enable it once it gets listed + again. + This is a worker function to be used with `JournalClient.process(worker_fn)`, after currification of `scheduler` and `task_names`. @@ -193,6 +199,8 @@ for field in attr.fields(OriginVisitStats) } + disabled_urls: List[str] = [] + # Retrieve the global queue state queue_position_per_visit_type = scheduler.visit_scheduler_queue_position_get() @@ -258,6 +266,25 @@ visit_stats_d["successive_visits"] + 1 if same_visit_status else 1 ) + # Disable recurring failing/not-found origins + if ( + visit_stats_d["last_visit_status"] + in [LastVisitStatus.not_found, LastVisitStatus.failed] + ) and visit_stats_d["successive_visits"] >= DEACTIVATE_ORIGIN_THRESHOLD: + disabled_urls.append(visit_stats_d["url"]) + scheduler.origin_visit_stats_upsert( OriginVisitStats(**ovs) for ovs in origin_visit_stats.values() ) + + # Disable the flagged origins, if any + if disabled_urls: + disabled_origins = [] + for url in disabled_urls: + origins = scheduler.get_listed_origins(url=url).results + if len(origins) > 0: + origin = attr.evolve(origins[0], enabled=False) + disabled_origins.append(origin) + + if disabled_origins: + scheduler.record_listed_origins(disabled_origins) diff --git a/swh/scheduler/tests/test_journal_client.py b/swh/scheduler/tests/test_journal_client.py --- 
a/swh/scheduler/tests/test_journal_client.py
+++ b/swh/scheduler/tests/test_journal_client.py
@@ -921,3 +921,75 @@
     )
 
     assert mock_random.called
+
+
+def test_disable_failing_origins(swh_scheduler):
+    """Origins failing too many times in a row get disabled in the scheduler."""
+
+    # Record the origin in the scheduler first, so that its deactivation can be
+    # checked once the visit statuses have been processed.
+    lister = swh_scheduler.get_or_create_lister(
+        name="something", instance_name="something"
+    )
+    origin = ListedOrigin(
+        url="bar", enabled=True, visit_type="svn", lister_id=lister.id
+    )
+    swh_scheduler.record_listed_origins([origin])
+
+    # Three consecutive failed visits (distinct visit ids, increasing dates).
+    visit_statuses = [
+        {
+            "origin": "bar",
+            "visit": 2,
+            "status": "failed",
+            "date": DATE1,
+            "type": "svn",
+            "snapshot": None,
+        },
+        {
+            "origin": "bar",
+            "visit": 3,
+            "status": "failed",
+            "date": DATE2,
+            "type": "svn",
+            "snapshot": None,
+        },
+        {
+            "origin": "bar",
+            "visit": 4,
+            "status": "failed",
+            "date": DATE3,
+            "type": "svn",
+            "snapshot": None,
+        },
+    ]
+
+    process_journal_objects(
+        {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler
+    )
+
+    actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("bar", "svn")])
+    assert_visit_stats_ok(
+        actual_origin_visit_stats[0],
+        OriginVisitStats(
+            url="bar",
+            visit_type="svn",
+            last_successful=None,
+            last_visit=DATE3,
+            last_visit_status=LastVisitStatus.failed,
+            next_position_offset=6,
+            successive_visits=3,
+        ),
+    )
+
+    # The listed origin itself must now be disabled
+    actual_page = swh_scheduler.get_listed_origins(url="bar")
+
+    assert len(actual_page.results) == 1
+    assert actual_page.next_page_token is None
+
+    disabled_origin = actual_page.results[0]
+    assert disabled_origin.enabled is False
+    assert disabled_origin.lister_id == lister.id
+    assert disabled_origin.url == "bar"
+    assert disabled_origin.visit_type == "svn"