diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py --- a/swh/scheduler/backend.py +++ b/swh/scheduler/backend.py @@ -9,6 +9,7 @@ from uuid import UUID import attr +from psycopg2.errors import CardinalityViolation import psycopg2.extras import psycopg2.pool @@ -16,7 +17,7 @@ from swh.core.db.common import db_transaction from swh.scheduler.utils import utcnow -from .exc import StaleData, UnknownPolicy +from .exc import SchedulerException, StaleData, UnknownPolicy from .model import ( ListedOrigin, ListedOriginPageToken, @@ -762,20 +763,15 @@ @db_transaction() def origin_visit_stats_upsert( - self, visit_stats: OriginVisitStats, db=None, cur=None + self, origin_visit_stats: Iterable[OriginVisitStats], db=None, cur=None ) -> None: - query = """ - INSERT into origin_visit_stats AS ovi ( - url, - visit_type, - last_eventful, - last_uneventful, - last_failed, - last_notfound, - last_snapshot - ) - VALUES (%s, %s, %s, %s, %s, %s, %s) ON CONFLICT (url, visit_type) DO - UPDATE + pk_cols = OriginVisitStats.primary_key_columns() + insert_cols, insert_meta = OriginVisitStats.insert_columns_and_metavars() + + query = f""" + INSERT into origin_visit_stats AS ovi ({", ".join(insert_cols)}) + VALUES %s + ON CONFLICT ({", ".join(pk_cols)}) DO UPDATE SET last_eventful = ( select max(eventful.date) from (values (excluded.last_eventful), @@ -808,18 +804,19 @@ ) """ # noqa - cur.execute( - query, - ( - visit_stats.url, - visit_stats.visit_type, - visit_stats.last_eventful, - visit_stats.last_uneventful, - visit_stats.last_failed, - visit_stats.last_notfound, - visit_stats.last_snapshot, - ), - ) + try: + psycopg2.extras.execute_values( + cur=cur, + sql=query, + argslist=( + attr.asdict(visit_stats) for visit_stats in origin_visit_stats + ), + template=f"({', '.join(insert_meta)})", + page_size=1000, + fetch=False, + ) + except CardinalityViolation as e: + raise SchedulerException(repr(e)) @db_transaction() def origin_visit_stats_get( diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py --- a/swh/scheduler/interface.py +++ b/swh/scheduler/interface.py @@ -327,7 +327,9 @@ ... @remote_api_endpoint("visit_stats/upsert") - def origin_visit_stats_upsert(self, visit_stats: OriginVisitStats) -> None: + def origin_visit_stats_upsert( + self, origin_visit_stats: Iterable[OriginVisitStats] + ) -> None: """Create a new origin visit stats """ ... diff --git a/swh/scheduler/journal_client.py b/swh/scheduler/journal_client.py --- a/swh/scheduler/journal_client.py +++ b/swh/scheduler/journal_client.py @@ -4,7 +4,9 @@ # See top-level LICENSE file for more information from datetime import datetime -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Tuple + +import attr from swh.scheduler.interface import SchedulerInterface from swh.scheduler.model import OriginVisitStats @@ -41,12 +43,13 @@ }, f"Got unexpected {', '.join(set(messages) - set([msg_type]))} message types" assert msg_type in messages, f"Expected {msg_type} messages" + origin_visit_stats: Dict[Tuple[str, str], Dict] = {} for msg_dict in messages[msg_type]: if msg_dict["status"] in ("created", "ongoing"): continue origin = msg_dict["origin"] visit_type = msg_dict["type"] - visit_stats_d = { + empty_object = { "url": origin, "visit_type": visit_type, "last_uneventful": None, @@ -55,35 +58,41 @@ "last_notfound": None, "last_snapshot": None, } - actual_visit_stats = scheduler.origin_visit_stats_get(origin, visit_type) + pk = origin, visit_type + if pk not in origin_visit_stats: + visit_stats = scheduler.origin_visit_stats_get(origin, visit_type) + origin_visit_stats[pk] = ( + attr.asdict(visit_stats) if visit_stats else empty_object + ) + + visit_stats_d = origin_visit_stats[pk] if msg_dict["status"] == "not_found": visit_stats_d["last_notfound"] = max_date( - msg_dict["date"], - actual_visit_stats.last_notfound if actual_visit_stats else None, + msg_dict["date"], visit_stats_d.get("last_notfound") ) elif msg_dict["snapshot"] is None: visit_stats_d["last_failed"] = max_date( - msg_dict["date"], - actual_visit_stats.last_failed if actual_visit_stats else None, + msg_dict["date"], visit_stats_d.get("last_failed") ) else: # visit with snapshot, something happened - if not actual_visit_stats: + if visit_stats_d == empty_object: visit_stats_d["last_eventful"] = msg_dict["date"] visit_stats_d["last_snapshot"] = msg_dict["snapshot"] else: date = max_date( - actual_visit_stats.last_eventful, actual_visit_stats.last_uneventful + visit_stats_d["last_eventful"], visit_stats_d["last_uneventful"] ) if date and msg_dict["date"] < date: # ignore out of order message continue - previous_snapshot = actual_visit_stats.last_snapshot + previous_snapshot = visit_stats_d["last_snapshot"] if msg_dict["snapshot"] != previous_snapshot: visit_stats_d["last_eventful"] = msg_dict["date"] visit_stats_d["last_snapshot"] = msg_dict["snapshot"] else: visit_stats_d["last_uneventful"] = msg_dict["date"] - visit_stats = OriginVisitStats(**visit_stats_d) - scheduler.origin_visit_stats_upsert(visit_stats) + scheduler.origin_visit_stats_upsert( + OriginVisitStats(**ovs) for ovs in origin_visit_stats.values() + ) diff --git a/swh/scheduler/tests/test_journal_client.py b/swh/scheduler/tests/test_journal_client.py --- a/swh/scheduler/tests/test_journal_client.py +++ b/swh/scheduler/tests/test_journal_client.py @@ -324,15 +324,17 @@ # Let's insert some visit stats with some previous visit information swh_scheduler.origin_visit_stats_upsert( - OriginVisitStats( - url=visit_status["origin"], - visit_type=visit_status["type"], - last_eventful=DATE1, - last_uneventful=DATE3, - last_failed=DATE2, - last_notfound=DATE1, - last_snapshot=visit_status["snapshot"], - ) + [ + OriginVisitStats( + url=visit_status["origin"], + visit_type=visit_status["type"], + last_eventful=DATE1, + last_uneventful=DATE3, + last_failed=DATE2, + last_notfound=DATE1, + last_snapshot=visit_status["snapshot"], + ) + ] ) process_journal_objects( diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py --- a/swh/scheduler/tests/test_scheduler.py +++ b/swh/scheduler/tests/test_scheduler.py @@ -15,7 +15,7 @@ import pytest from swh.model.hashutil import hash_to_bytes -from swh.scheduler.exc import StaleData, UnknownPolicy +from swh.scheduler.exc import SchedulerException, StaleData, UnknownPolicy from swh.scheduler.interface import SchedulerInterface from swh.scheduler.model import ListedOrigin, ListedOriginPageToken, OriginVisitStats from swh.scheduler.utils import utcnow @@ -785,8 +785,8 @@ last_failed=None, last_notfound=None, ) - swh_scheduler.origin_visit_stats_upsert(visit_stats) - swh_scheduler.origin_visit_stats_upsert(visit_stats) + swh_scheduler.origin_visit_stats_upsert([visit_stats]) + swh_scheduler.origin_visit_stats_upsert([visit_stats]) assert swh_scheduler.origin_visit_stats_get(url, "git") == visit_stats assert swh_scheduler.origin_visit_stats_get(url, "svn") is None @@ -800,7 +800,7 @@ last_failed=None, last_notfound=None, ) - swh_scheduler.origin_visit_stats_upsert(visit_stats) + swh_scheduler.origin_visit_stats_upsert([visit_stats]) uneventful_visit = swh_scheduler.origin_visit_stats_get(url, "git") @@ -824,7 +824,7 @@ last_failed=failed_date, last_notfound=None, ) - swh_scheduler.origin_visit_stats_upsert(visit_stats) + swh_scheduler.origin_visit_stats_upsert([visit_stats]) failed_visit = swh_scheduler.origin_visit_stats_get(url, "git") @@ -852,7 +852,7 @@ last_notfound=None, last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), ) - swh_scheduler.origin_visit_stats_upsert(visit_stats) + swh_scheduler.origin_visit_stats_upsert([visit_stats]) assert swh_scheduler.origin_visit_stats_get(url, "git") == visit_stats assert swh_scheduler.origin_visit_stats_get(url, "svn") is None @@ -877,7 +877,7 @@ last_notfound=None, last_snapshot=snapshot2, ) - swh_scheduler.origin_visit_stats_upsert(visit_stats0) + swh_scheduler.origin_visit_stats_upsert([visit_stats0]) actual_visit_stats0 = swh_scheduler.origin_visit_stats_get(url, "git") assert actual_visit_stats0 == visit_stats0 @@ -890,7 +890,7 @@ last_notfound=None, last_failed=None, ) - swh_scheduler.origin_visit_stats_upsert(visit_stats2) + swh_scheduler.origin_visit_stats_upsert([visit_stats2]) actual_visit_stats2 = swh_scheduler.origin_visit_stats_get(url, "git") assert actual_visit_stats2 == attr.evolve( @@ -909,10 +909,71 @@ last_notfound=None, last_snapshot=snapshot0, ) - swh_scheduler.origin_visit_stats_upsert(visit_stats1) + swh_scheduler.origin_visit_stats_upsert([visit_stats1]) actual_visit_stats1 = swh_scheduler.origin_visit_stats_get(url, "git") assert actual_visit_stats1 == attr.evolve( actual_visit_stats2, last_eventful=date2 ) + + def test_origin_visit_stats_upsert_batch(self, swh_scheduler) -> None: + """Batch upsert is ok""" + visit_stats = [ + OriginVisitStats( + url="foo", + visit_type="git", + last_eventful=utcnow(), + last_uneventful=None, + last_failed=None, + last_notfound=None, + last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), + ), + OriginVisitStats( + url="bar", + visit_type="git", + last_eventful=None, + last_uneventful=utcnow(), + last_notfound=None, + last_failed=None, + last_snapshot=hash_to_bytes("fffcc0710eb6cf9efd5b920a8453e1e07157bfff"), + ), + ] + + swh_scheduler.origin_visit_stats_upsert(visit_stats) + + for visit_stat in visit_stats: + assert ( + swh_scheduler.origin_visit_stats_get( + visit_stat.url, visit_stat.visit_type + ) + is not None + ) + + def test_origin_visit_stats_upsert_cardinality_failing(self, swh_scheduler) -> None: + """Batch upsert does not support altering multiple times the same origin-visit-status + + """ + with pytest.raises(SchedulerException, match="CardinalityViolation"): + swh_scheduler.origin_visit_stats_upsert( + [ + OriginVisitStats( + url="foo", + visit_type="git", + last_eventful=None, + last_uneventful=utcnow(), + last_notfound=None, + last_failed=None, + last_snapshot=None, + ), + OriginVisitStats( + url="foo", + visit_type="git", + last_eventful=None, + last_uneventful=utcnow(), + last_notfound=None, + last_failed=None, + last_snapshot=None, + ), + ] + )