diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py --- a/swh/scheduler/backend.py +++ b/swh/scheduler/backend.py @@ -762,20 +762,15 @@ @db_transaction() def origin_visit_stats_upsert( - self, visit_stats: OriginVisitStats, db=None, cur=None + self, origin_visit_stats: Iterable[OriginVisitStats], db=None, cur=None ) -> None: - query = """ - INSERT into origin_visit_stats AS ovi ( - url, - visit_type, - last_eventful, - last_uneventful, - last_failed, - last_notfound, - last_snapshot - ) - VALUES (%s, %s, %s, %s, %s, %s, %s) ON CONFLICT (url, visit_type) DO - UPDATE + pk_cols = OriginVisitStats.primary_key_columns() + insert_cols, insert_meta = OriginVisitStats.insert_columns_and_metavars() + + query = f""" + INSERT into origin_visit_stats AS ovi ({", ".join(insert_cols)}) + VALUES %s + ON CONFLICT ({", ".join(pk_cols)}) DO UPDATE SET last_eventful = ( select max(eventful.date) from (values (excluded.last_eventful), @@ -809,17 +804,13 @@ ) """ # noqa - cur.execute( - query, - ( - visit_stats.url, - visit_stats.visit_type, - visit_stats.last_eventful, - visit_stats.last_uneventful, - visit_stats.last_failed, - visit_stats.last_notfound, - visit_stats.last_snapshot, - ), + psycopg2.extras.execute_values( + cur=cur, + sql=query, + argslist=(attr.asdict(visit_stats) for visit_stats in origin_visit_stats), + template=f"({', '.join(insert_meta)})", + page_size=1000, + fetch=False, ) @db_transaction() diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py --- a/swh/scheduler/interface.py +++ b/swh/scheduler/interface.py @@ -327,7 +327,9 @@ ... @remote_api_endpoint("visit_stats/upsert") - def origin_visit_stats_upsert(self, visit_stats: OriginVisitStats) -> None: + def origin_visit_stats_upsert( + self, origin_visit_stats: Iterable[OriginVisitStats] + ) -> None: """Create a new origin visit stats """ ... diff --git a/swh/scheduler/journal_client.py b/swh/scheduler/journal_client.py --- a/swh/scheduler/journal_client.py +++ b/swh/scheduler/journal_client.py @@ -38,6 +38,7 @@ """ assert msg_type in messages, f"Only '{msg_type}' messages expected" + origin_visit_stats = [] for msg_dict in messages[msg_type]: if msg_dict["status"] in ("created", "ongoing"): continue @@ -82,5 +83,7 @@ else: visit_stats_d["last_uneventful"] = msg_dict["date"] - visit_stats = OriginVisitStats(**visit_stats_d) - scheduler.origin_visit_stats_upsert(visit_stats) + origin_visit_stats.append(OriginVisitStats(**visit_stats_d)) + + if origin_visit_stats: + scheduler.origin_visit_stats_upsert(origin_visit_stats) diff --git a/swh/scheduler/tests/test_journal_client.py b/swh/scheduler/tests/test_journal_client.py --- a/swh/scheduler/tests/test_journal_client.py +++ b/swh/scheduler/tests/test_journal_client.py @@ -317,15 +317,17 @@ # Let's insert some visit stats with some previous visit information swh_scheduler.origin_visit_stats_upsert( - OriginVisitStats( - url=visit_status["origin"], - visit_type=visit_status["type"], - last_eventful=D1, - last_uneventful=D3, - last_failed=D2, - last_notfound=D1, - last_snapshot=visit_status["snapshot"], - ) + [ + OriginVisitStats( + url=visit_status["origin"], + visit_type=visit_status["type"], + last_eventful=D1, + last_uneventful=D3, + last_failed=D2, + last_notfound=D1, + last_snapshot=visit_status["snapshot"], + ) + ] ) process_journal_objects( diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py --- a/swh/scheduler/tests/test_scheduler.py +++ b/swh/scheduler/tests/test_scheduler.py @@ -785,8 +785,8 @@ last_failed=None, last_notfound=None, ) - swh_scheduler.origin_visit_stats_upsert(visit_stats) - swh_scheduler.origin_visit_stats_upsert(visit_stats) + swh_scheduler.origin_visit_stats_upsert([visit_stats]) + swh_scheduler.origin_visit_stats_upsert([visit_stats]) assert swh_scheduler.origin_visit_stats_get(url, "git") == visit_stats assert swh_scheduler.origin_visit_stats_get(url, "svn") is None @@ -800,7 +800,7 @@ last_failed=None, last_notfound=None, ) - swh_scheduler.origin_visit_stats_upsert(visit_stats) + swh_scheduler.origin_visit_stats_upsert([visit_stats]) uneventful_visit = swh_scheduler.origin_visit_stats_get(url, "git") @@ -824,7 +824,7 @@ last_failed=failed_date, last_notfound=None, ) - swh_scheduler.origin_visit_stats_upsert(visit_stats) + swh_scheduler.origin_visit_stats_upsert([visit_stats]) failed_visit = swh_scheduler.origin_visit_stats_get(url, "git") @@ -852,7 +852,7 @@ last_notfound=None, last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), ) - swh_scheduler.origin_visit_stats_upsert(visit_stats) + swh_scheduler.origin_visit_stats_upsert([visit_stats]) assert swh_scheduler.origin_visit_stats_get(url, "git") == visit_stats assert swh_scheduler.origin_visit_stats_get(url, "svn") is None @@ -878,7 +878,7 @@ last_notfound=None, last_snapshot=snapshot2, ) - swh_scheduler.origin_visit_stats_upsert(visit_stats0) + swh_scheduler.origin_visit_stats_upsert([visit_stats0]) actual_visit_stats0 = swh_scheduler.origin_visit_stats_get(url, "git") assert actual_visit_stats0 == visit_stats0 @@ -891,7 +891,7 @@ last_notfound=None, last_failed=None, ) - swh_scheduler.origin_visit_stats_upsert(visit_stats2) + swh_scheduler.origin_visit_stats_upsert([visit_stats2]) actual_visit_stats2 = swh_scheduler.origin_visit_stats_get(url, "git") assert actual_visit_stats2 == attr.evolve( @@ -910,7 +910,7 @@ last_notfound=None, last_snapshot=snapshot0, ) - swh_scheduler.origin_visit_stats_upsert(visit_stats1) + swh_scheduler.origin_visit_stats_upsert([visit_stats1]) actual_visit_stats1 = swh_scheduler.origin_visit_stats_get(url, "git")