def handle_raise_on_commit(f):
    """Wrap a storage method so its failures follow ``raise_on_commit``.

    The wrapped method is called with its arguments unchanged and its return
    value is passed through. If it raises, the error is logged with its
    traceback and then either re-raised (when ``self.raise_on_commit`` is
    truthy) or converted into a ``False`` return value.

    Only ``Exception`` subclasses are handled. ``KeyboardInterrupt``,
    ``SystemExit`` and ``GeneratorExit`` always propagate, so an interrupt or
    an interpreter shutdown is never reported as a failed write.

    Args:
        f: method whose first positional argument is an object exposing a
            ``raise_on_commit`` attribute.

    Returns:
        A wrapper carrying the metadata of ``f`` (via ``functools.wraps``).
    """

    @wraps(f)
    def handle(self, *args, **kwargs):
        try:
            return f(self, *args, **kwargs)
        except Exception:
            # Log before deciding, so the failure is recorded even when it is
            # swallowed below.
            # NOTE(review): nothing is rolled back here; presumably the
            # transaction context used by the wrapped method does that.
            LOGGER.exception("Unexpected error")
            if self.raise_on_commit:
                # Bare ``raise`` re-raises the active exception unchanged.
                raise
            return False

    return handle
@statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "directory_add"})
@handle_raise_on_commit
def directory_add(self, dirs: Dict[Sha1Git, DirectoryData]) -> bool:
    """Insert or update one ``directory`` row per entry of ``dirs``.

    On a ``sha1`` conflict the stored row keeps the lesser of the two dates
    and a ``flat`` flag that is true if either the stored or the incoming
    value is true.

    Args:
        dirs: mapping from directory id to an object providing ``date`` and
            ``flat`` attributes.

    Returns:
        ``True`` once the rows are written, or immediately when ``dirs`` is
        empty. Exceptions are handled by ``handle_raise_on_commit``.
    """
    entries = [(sha1, item.date, item.flat) for sha1, item in dirs.items()]
    if not entries:
        # Nothing to write: skip opening a transaction altogether.
        return True

    upsert = """
        INSERT INTO directory(sha1, date, flat) VALUES %s
          ON CONFLICT (sha1) DO
          UPDATE SET
            date=LEAST(EXCLUDED.date, directory.date),
            flat=(EXCLUDED.flat OR directory.flat)
        """
    # Without a configured page size, send every row in a single page.
    batch = self.page_size or len(entries)
    with self.transaction() as cursor:
        psycopg2.extras.execute_values(
            cur=cursor, sql=upsert, argslist=entries, page_size=batch
        )
    return True
@statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "origin_add"})
@handle_raise_on_commit
def origin_add(self, orgs: Dict[Sha1Git, str]) -> bool:
    """Insert one ``origin`` row per ``(sha1, url)`` entry of ``orgs``.

    Conflicting rows are left untouched (``ON CONFLICT DO NOTHING``).

    Args:
        orgs: mapping from origin id to origin URL.

    Returns:
        ``True`` once the rows are written, or immediately when ``orgs`` is
        empty. Exceptions are handled by ``handle_raise_on_commit``.
    """
    if not orgs:
        # Nothing to write: skip opening a transaction altogether.
        return True

    insert_origins = """
        INSERT INTO origin(sha1, url) VALUES %s
          ON CONFLICT DO NOTHING
        """
    # Without a configured page size, send every row in a single page.
    batch = self.page_size or len(orgs)
    with self.transaction() as cursor:
        psycopg2.extras.execute_values(
            cur=cursor, sql=insert_origins, argslist=orgs.items(), page_size=batch
        )
    return True
@statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "relation_add"})
@handle_raise_on_commit
def relation_add(
    self, relation: RelationType, data: Dict[Sha1Git, Set[RelationData]]
) -> bool:
    """Store ``(src, dst, path)`` rows for the given relation.

    Rows are staged in ``tmp_relation_add`` and then handed to the
    ``swh_provenance_relation_add_from_temp`` SQL function together with the
    relation table name and its source and destination table names.

    Args:
        relation: relation whose ``value`` is the relation table name; the
            first and last ``_``-separated parts of that name are used as the
            source and destination table names.
        data: mapping from source id to a set of objects providing ``dst``
            and ``path`` attributes.

    Returns:
        ``True`` once the rows are written, or immediately when there is
        nothing to write. Exceptions are handled by
        ``handle_raise_on_commit``.
    """
    triples = []
    for src, targets in data.items():
        triples.extend((src, target.dst, target.path) for target in targets)
    if not triples:
        # Nothing to write: skip opening a transaction altogether.
        return True

    rel_table = relation.value
    src_table, *_, dst_table = rel_table.split("_")
    # Without a configured page size, send every row in a single page.
    batch = self.page_size or len(triples)
    # All three statements share one temp table, so they must run inside the
    # same transaction.
    with self.transaction() as cursor:
        cursor.execute("SELECT swh_mktemp_relation_add()")
        psycopg2.extras.execute_values(
            cur=cursor,
            sql="INSERT INTO tmp_relation_add(src, dst, path) VALUES %s",
            argslist=triples,
            page_size=batch,
        )
        merge = "SELECT swh_provenance_relation_add_from_temp(%s, %s, %s)"
        cursor.execute(query=merge, vars=(rel_table, src_table, dst_table))
    return True