diff --git a/swh/core/db/__init__.py b/swh/core/db/__init__.py
--- a/swh/core/db/__init__.py
+++ b/swh/core/db/__init__.py
@@ -9,6 +9,7 @@
 import json
 import logging
 import os
+import sys
 import threading
 
 from contextlib import contextmanager
@@ -167,12 +168,20 @@
         """
 
         read_file, write_file = os.pipe()
+        # Filled in by writer() on failure, re-raised after join()
+        exc_info = None
 
         def writer():
+            nonlocal exc_info
             cursor = self.cursor(cur)
             with open(read_file, 'r') as f:
-                cursor.copy_expert('COPY %s (%s) FROM STDIN CSV' % (
-                    tblname, ', '.join(columns)), f)
+                try:
+                    cursor.copy_expert('COPY %s (%s) FROM STDIN CSV' % (
+                        tblname, ', '.join(columns)), f)
+                except Exception:
+                    # Exceptions do not propagate out of a thread: stash
+                    # it so copy_to can re-raise it after join().
+                    exc_info = sys.exc_info()
 
         write_thread = threading.Thread(target=writer)
         write_thread.start()
@@ -202,6 +211,10 @@
             # we finish copying, even though we're probably going to cancel the
             # transaction.
             write_thread.join()
+            if exc_info:
+                # The COPY in the writer thread failed: re-raise that
+                # error here, keeping its original traceback.
+                raise exc_info[1].with_traceback(exc_info[2])
 
     def mktemp(self, tblname, cur=None):
         self.cursor(cur).execute('SELECT swh_mktemp(%s)', (tblname,))
diff --git a/swh/core/db/tests/test_db.py b/swh/core/db/tests/test_db.py
--- a/swh/core/db/tests/test_db.py
+++ b/swh/core/db/tests/test_db.py
@@ -8,6 +8,7 @@
 import unittest
 
 from hypothesis import strategies, given
+import psycopg2
 import pytest
 
 from swh.core.db import BaseDb
@@ -100,3 +101,12 @@
         cur = self.db.cursor()
         cur.execute('select * from test_table;')
         self.assertCountEqual(list(cur), data)
+
+    def test_copy_to_thread_exception(self):
+        """copy_to re-raises an error hit by the COPY writer thread."""
+        # 2**65 presumably overflows column `i` -- verify test_table schema
+        data = [(2**65, 'foo', b'bar')]
+
+        items = [dict(zip(['i', 'txt', 'bytes'], item)) for item in data]
+        with self.assertRaises(psycopg2.errors.NumericValueOutOfRange):
+            self.db.copy_to(items, 'test_table', ['i', 'txt', 'bytes'])