diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py --- a/swh/scheduler/backend.py +++ b/swh/scheduler/backend.py @@ -762,20 +762,15 @@ @db_transaction() def origin_visit_stats_upsert( - self, visit_stats: OriginVisitStats, db=None, cur=None + self, origin_visit_stats: Iterable[OriginVisitStats], db=None, cur=None ) -> None: - query = """ - INSERT into origin_visit_stats AS ovi ( - url, - visit_type, - last_eventful, - last_uneventful, - last_failed, - last_notfound, - last_snapshot - ) - VALUES (%s, %s, %s, %s, %s, %s, %s) ON CONFLICT (url, visit_type) DO - UPDATE + pk_cols = OriginVisitStats.primary_key_columns() + insert_cols, insert_meta = OriginVisitStats.insert_columns_and_metavars() + + query = f""" + INSERT into origin_visit_stats AS ovi ({", ".join(insert_cols)}) + VALUES %s + ON CONFLICT ({", ".join(pk_cols)}) DO UPDATE SET last_eventful = ( select max(eventful.date) from (values (excluded.last_eventful), @@ -809,17 +804,13 @@ ) """ # noqa - cur.execute( - query, - ( - visit_stats.url, - visit_stats.visit_type, - visit_stats.last_eventful, - visit_stats.last_uneventful, - visit_stats.last_failed, - visit_stats.last_notfound, - visit_stats.last_snapshot, - ), + psycopg2.extras.execute_values( + cur=cur, + sql=query, + argslist=(attr.asdict(visit_stats) for visit_stats in origin_visit_stats), + template=f"({', '.join(insert_meta)})", + page_size=1000, + fetch=False, ) @db_transaction() diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py --- a/swh/scheduler/interface.py +++ b/swh/scheduler/interface.py @@ -327,7 +327,9 @@ ... @remote_api_endpoint("visit_stats/upsert") - def origin_visit_stats_upsert(self, visit_stats: OriginVisitStats) -> None: + def origin_visit_stats_upsert( + self, origin_visit_stats: Iterable[OriginVisitStats] + ) -> None: """Create a new origin visit stats """ ... diff --git a/swh/scheduler/journal_client.py b/swh/scheduler/journal_client.py --- a/swh/scheduler/journal_client.py +++ b/swh/scheduler/journal_client.py @@ -4,7 +4,9 @@ # See top-level LICENSE file for more information from datetime import datetime -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Tuple + +import attr from swh.scheduler.interface import SchedulerInterface from swh.scheduler.model import OriginVisitStats @@ -38,12 +40,13 @@ """ assert msg_type in messages, f"Only '{msg_type}' messages expected" + origin_visit_stats: Dict[Tuple[str, str], Dict] = {} for msg_dict in messages[msg_type]: if msg_dict["status"] in ("created", "ongoing"): continue origin = msg_dict["origin"] visit_type = msg_dict["type"] - visit_stats_d = { + empty_object = { "url": origin, "visit_type": visit_type, "last_uneventful": None, @@ -52,35 +55,41 @@ "last_notfound": None, "last_snapshot": None, } - actual_visit_stats = scheduler.origin_visit_stats_get(origin, visit_type) + pk = origin, visit_type + if pk not in origin_visit_stats: + visit_stats = scheduler.origin_visit_stats_get(origin, visit_type) + origin_visit_stats[pk] = ( + attr.asdict(visit_stats) if visit_stats else empty_object + ) + + visit_stats_d = origin_visit_stats[pk] if msg_dict["status"] == "not_found": visit_stats_d["last_notfound"] = max_date( - msg_dict["date"], - actual_visit_stats.last_notfound if actual_visit_stats else None, + msg_dict["date"], visit_stats_d.get("last_notfound") ) elif msg_dict["snapshot"] is None: visit_stats_d["last_failed"] = max_date( - msg_dict["date"], - actual_visit_stats.last_failed if actual_visit_stats else None, + msg_dict["date"], visit_stats_d.get("last_failed") ) else: # visit with snapshot, something happened - if not actual_visit_stats: + if visit_stats_d == empty_object: visit_stats_d["last_eventful"] = msg_dict["date"] visit_stats_d["last_snapshot"] = msg_dict["snapshot"] else: date = max_date( - actual_visit_stats.last_eventful, actual_visit_stats.last_uneventful + visit_stats_d["last_eventful"], visit_stats_d["last_uneventful"] ) if date and msg_dict["date"] < date: # ignore out of order message continue - previous_snapshot = actual_visit_stats.last_snapshot + previous_snapshot = visit_stats_d["last_snapshot"] if msg_dict["snapshot"] != previous_snapshot: visit_stats_d["last_eventful"] = msg_dict["date"] visit_stats_d["last_snapshot"] = msg_dict["snapshot"] else: visit_stats_d["last_uneventful"] = msg_dict["date"] - visit_stats = OriginVisitStats(**visit_stats_d) - scheduler.origin_visit_stats_upsert(visit_stats) + scheduler.origin_visit_stats_upsert( + OriginVisitStats(**ovs) for ovs in origin_visit_stats.values() + ) diff --git a/swh/scheduler/tests/test_journal_client.py b/swh/scheduler/tests/test_journal_client.py --- a/swh/scheduler/tests/test_journal_client.py +++ b/swh/scheduler/tests/test_journal_client.py @@ -317,15 +317,17 @@ # Let's insert some visit stats with some previous visit information swh_scheduler.origin_visit_stats_upsert( - OriginVisitStats( - url=visit_status["origin"], - visit_type=visit_status["type"], - last_eventful=D1, - last_uneventful=D3, - last_failed=D2, - last_notfound=D1, - last_snapshot=visit_status["snapshot"], - ) + [ + OriginVisitStats( + url=visit_status["origin"], + visit_type=visit_status["type"], + last_eventful=D1, + last_uneventful=D3, + last_failed=D2, + last_notfound=D1, + last_snapshot=visit_status["snapshot"], + ) + ] ) process_journal_objects( diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py --- a/swh/scheduler/tests/test_scheduler.py +++ b/swh/scheduler/tests/test_scheduler.py @@ -785,8 +785,8 @@ last_failed=None, last_notfound=None, ) - swh_scheduler.origin_visit_stats_upsert(visit_stats) - swh_scheduler.origin_visit_stats_upsert(visit_stats) + swh_scheduler.origin_visit_stats_upsert([visit_stats]) + swh_scheduler.origin_visit_stats_upsert([visit_stats]) assert swh_scheduler.origin_visit_stats_get(url, "git") == visit_stats assert swh_scheduler.origin_visit_stats_get(url, "svn") is None @@ -800,7 +800,7 @@ last_failed=None, last_notfound=None, ) - swh_scheduler.origin_visit_stats_upsert(visit_stats) + swh_scheduler.origin_visit_stats_upsert([visit_stats]) uneventful_visit = swh_scheduler.origin_visit_stats_get(url, "git") @@ -824,7 +824,7 @@ last_failed=failed_date, last_notfound=None, ) - swh_scheduler.origin_visit_stats_upsert(visit_stats) + swh_scheduler.origin_visit_stats_upsert([visit_stats]) failed_visit = swh_scheduler.origin_visit_stats_get(url, "git") @@ -852,7 +852,7 @@ last_notfound=None, last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), ) - swh_scheduler.origin_visit_stats_upsert(visit_stats) + swh_scheduler.origin_visit_stats_upsert([visit_stats]) assert swh_scheduler.origin_visit_stats_get(url, "git") == visit_stats assert swh_scheduler.origin_visit_stats_get(url, "svn") is None @@ -878,7 +878,7 @@ last_notfound=None, last_snapshot=snapshot2, ) - swh_scheduler.origin_visit_stats_upsert(visit_stats0) + swh_scheduler.origin_visit_stats_upsert([visit_stats0]) actual_visit_stats0 = swh_scheduler.origin_visit_stats_get(url, "git") assert actual_visit_stats0 == visit_stats0 @@ -891,7 +891,7 @@ last_notfound=None, last_failed=None, ) - swh_scheduler.origin_visit_stats_upsert(visit_stats2) + swh_scheduler.origin_visit_stats_upsert([visit_stats2]) actual_visit_stats2 = swh_scheduler.origin_visit_stats_get(url, "git") assert actual_visit_stats2 == attr.evolve( @@ -910,7 +910,7 @@ last_notfound=None, last_snapshot=snapshot0, ) - swh_scheduler.origin_visit_stats_upsert(visit_stats1) + swh_scheduler.origin_visit_stats_upsert([visit_stats1]) actual_visit_stats1 = swh_scheduler.origin_visit_stats_get(url, "git")