Deduplicate selected origins before updating their visit stats.

A new "deduplicated_selected_origins" CTE (SELECT DISTINCT url, visit_type
FROM selected_origins) is added, and the "update_stats" upsert
(INSERT ... ON CONFLICT (url, visit_type) DO UPDATE) now reads from it
instead of reading from selected_origins directly.

The new test records the same (url, visit_type) origin under two different
listers and checks that grab_next_visits returns both rows, ordered by
last_update, without crashing.

NOTE(review): presumably the crash being fixed is PostgreSQL refusing an
INSERT ... ON CONFLICT DO UPDATE that would affect the same row twice within
one statement -- confirm against the original bug report.

NOTE(review): this patch was recovered from a whitespace-collapsed copy. Line
breaks, blank lines and the "+" marker on the "expected=" line follow from
the hunk header counts; indentation and the SQL line splits are a best-effort
reconstruction -- verify against the target files before applying.

diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -487,6 +487,15 @@
 
         query_args.append(count)
 
+        # fmt: off
+        common_table_expressions.append(("deduplicated_selected_origins", """
+            SELECT DISTINCT
+              url, visit_type
+            FROM
+              selected_origins
+        """))
+        # fmt: on
+
         # fmt: off
         common_table_expressions.append(("update_stats", """
             INSERT INTO
@@ -494,7 +503,7 @@
             SELECT
               url, visit_type, %s
             FROM
-              selected_origins
+              deduplicated_selected_origins
             ON CONFLICT (url, visit_type) DO UPDATE
               SET last_scheduled = GREATEST(
                 origin_visit_stats.last_scheduled,
diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py
--- a/swh/scheduler/tests/test_scheduler.py
+++ b/swh/scheduler/tests/test_scheduler.py
@@ -1179,6 +1179,37 @@
         with pytest.raises(UnknownPolicy, match=unknown_policy):
             swh_scheduler.grab_next_visits("type", NUM_RESULTS, policy=unknown_policy)
 
+    def test_grab_next_visit_duplicates(self, swh_scheduler, listed_origins):
+        """Checks grab_next_visits does not crash when there are rows with
+        duplicated (origin_url, visit_type) in the database
+        """
+        lister2 = swh_scheduler.get_or_create_lister(**LISTERS[1])
+        assert lister2.id != listed_origins[0].lister_id
+
+        # Create two origins with the same url and visit_type, but different listers
+        # (and also differing value for last_update so they are returned in
+        # deterministic order)
+        origin1 = attr.evolve(
+            listed_origins[0], first_seen=utcnow(), last_seen=utcnow()
+        )
+        origin2 = attr.evolve(
+            origin1,
+            lister_id=lister2.id,
+            last_update=origin1.last_update + datetime.timedelta(seconds=10),
+        )
+
+        origins = [origin1, origin2]
+        recorded_origins = swh_scheduler.record_listed_origins(origins)
+
+        expected_origins = sorted(recorded_origins, key=lambda o: o.last_update)
+
+        self._check_grab_next_visit(
+            swh_scheduler,
+            visit_type=origin1.visit_type,
+            policy="never_visited_oldest_update_first",
+            expected=expected_origins,
+        )
+
     def _create_task_types(self, scheduler):
         for tt in TASK_TYPES.values():
             scheduler.create_task_type(tt)