Page MenuHomeSoftware Heritage

Jenkins > .tox.py3.lib.python3.7.site-packages.swh.storage.tests.test_backfill::test_backfiller
Failed

TEST RESULT

Run At
Jul 6 2020, 2:26 PM
Details
@contextlib.contextmanager def convert_validation_exceptions(): """Catches postgresql errors related to invalid arguments, and re-raises a StorageArgumentException.""" try: > yield .tox/py3/lib/python3.7/site-packages/swh/storage/storage.py:76: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <swh.storage.storage.Storage object at 0x7f7c417d79e8> revisions = [Revision(message=b'hello', author=Person(fullname=b'foo', name=b'foo', email=b''), committer=Person(fullname=b'foo', ...se, metadata=None, parents=(), id=b'gpc\xf5\xc4\x05\xd6\xfc\x17\x81\xfcV7\x9c\x9a\x9a\xdfC\xd3\xa0', extra_headers=())] db = <swh.storage.db.Db object at 0x7f7c417d7ac8> cur = <cursor object at 0x7f7c425dcce0; closed: -1> @timed @process_metrics @db_transaction() def revision_add(self, revisions: Iterable[Revision], db=None, cur=None) -> Dict: revisions = list(revisions) summary = {"revision:add": 0} revisions_missing = set( self.revision_missing( set(revision.id for revision in revisions), db=db, cur=cur ) ) if not revisions_missing: return summary db.mktemp_revision(cur) revisions_filtered = [ revision for revision in revisions if revision.id in revisions_missing ] self.journal_writer.revision_add(revisions_filtered) revisions_filtered = list(map(converters.revision_to_db, revisions_filtered)) parents_filtered: List[bytes] = [] with convert_validation_exceptions(): db.copy_to( revisions_filtered, "tmp_revision", db.revision_add_cols, cur, > lambda rev: parents_filtered.extend(rev["parents"]), ) .tox/py3/lib/python3.7/site-packages/swh/storage/storage.py:579: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <swh.storage.db.Db object at 0x7f7c417d7ac8> items = [{'author_email': b'', 'author_fullname': b'foo', 'author_name': b'foo', 'committer_date': '2009-02-13T23:31:31+00:00'...hor_email': b'', 'author_fullname': b'bar', 'author_name': b'bar', 'committer_date': '2009-02-13T23:31:32+00:00', ...}] tblname = 'tmp_revision' columns = ['id', 'date', 'date_offset', 'date_neg_utc_offset', 'committer_date', 'committer_date_offset', ...] cur = <cursor object at 0x7f7c425dcce0; closed: -1> item_cb = <function Storage.revision_add.<locals>.<lambda> at 0x7f7c42459950> default_values = {} def copy_to( self, items, tblname, columns, cur=None, item_cb=None, default_values={} ): """Copy items' entries to table tblname with columns information. Args: items (List[dict]): dictionaries of data to copy over tblname. tblname (str): destination table's name. columns ([str]): keys to access data in items and also the column names in the destination table. default_values (dict): dictionary of default values to use when inserting entried int the tblname table. cur: a db cursor; if not given, a new cursor will be created. item_cb (fn): optional function to apply to items's entry. """ read_file, write_file = os.pipe() exc_info = None def writer(): nonlocal exc_info cursor = self.cursor(cur) with open(read_file, "r") as f: try: cursor.copy_expert( "COPY %s (%s) FROM STDIN CSV" % (tblname, ", ".join(columns)), f ) except Exception: # Tell the main thread about the exception exc_info = sys.exc_info() write_thread = threading.Thread(target=writer) write_thread.start() try: with open(write_file, "w") as f: for d in items: if item_cb is not None: item_cb(d) line = [] for k in columns: value = d.get(k, default_values.get(k)) try: line.append(escape(value)) except Exception as e: logger.error( "Could not escape value `%r` for column `%s`:" "Received exception: `%s`", value, k, e, ) raise e from None f.write(",".join(line)) f.write("\n") finally: # No problem bubbling up exceptions, but we still need to make sure # we finish copying, even though we're probably going to cancel the # transaction. write_thread.join() if exc_info: # postgresql returned an error, let's raise it. > raise exc_info[1].with_traceback(exc_info[2]) .tox/py3/lib/python3.7/site-packages/swh/core/db/__init__.py:213: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ def writer(): nonlocal exc_info cursor = self.cursor(cur) with open(read_file, "r") as f: try: cursor.copy_expert( > "COPY %s (%s) FROM STDIN CSV" % (tblname, ", ".join(columns)), f ) E psycopg2.errors.InvalidTextRepresentation: malformed array literal: "()" E DETAIL: Array value must start with "{" or dimension information. E CONTEXT: COPY tmp_revision, line 1, column extra_headers: "()" .tox/py3/lib/python3.7/site-packages/swh/core/db/__init__.py:175: InvalidTextRepresentation During handling of the above exception, another exception occurred: swh_storage_backend_config = {'cls': 'local', 'db': 'postgresql://postgres@127.0.0.1:28613/tests', 'journal_writer': {'brokers': ['127.0.0.1:35429'], 'client_id': 'kafka_writer-1', 'cls': 'kafka', 'prefix': 'ipmqvgwfcw-1'}, 'objstorage': {'args': {}, 'cls': 'memory'}} kafka_prefix = 'ipmqvgwfcw', kafka_consumer_group = 'test-consumer-ipmqvgwfcw' kafka_server = '127.0.0.1:35429' @patch("swh.storage.backfill.RANGE_GENERATORS", RANGE_GENERATORS) def test_backfiller( swh_storage_backend_config, kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, ): prefix1 = f"{kafka_prefix}-1" prefix2 = f"{kafka_prefix}-2" journal1 = { "cls": "kafka", "brokers": [kafka_server], "client_id": "kafka_writer-1", "prefix": prefix1, } swh_storage_backend_config["journal_writer"] = journal1 storage = get_storage(**swh_storage_backend_config) # fill the storage and the journal (under prefix1) for object_type, objects in TEST_OBJECTS.items(): method = getattr(storage, object_type + "_add") > method(objects) .tox/py3/lib/python3.7/site-packages/swh/storage/tests/test_backfill.py:201: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ .tox/py3/lib/python3.7/site-packages/swh/storage/metrics.py:24: in d return f(*a, **kw) .tox/py3/lib/python3.7/site-packages/swh/storage/metrics.py:77: in d r = f(*a, **kw) .tox/py3/lib/python3.7/site-packages/swh/core/db/common.py:62: in _meth return meth(self, *args, db=db, cur=cur, **kwargs) .tox/py3/lib/python3.7/site-packages/swh/storage/storage.py:579: in revision_add lambda rev: parents_filtered.extend(rev["parents"]), /usr/lib/python3.7/contextlib.py:130: in __exit__ self.gen.throw(type, value, traceback) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ @contextlib.contextmanager def convert_validation_exceptions(): """Catches postgresql errors related to invalid arguments, and re-raises a StorageArgumentException.""" try: yield except tuple(VALIDATION_EXCEPTIONS) as e: > raise StorageArgumentException(str(e)) E swh.storage.exc.StorageArgumentException: malformed array literal: "()" E DETAIL: Array value must start with "{" or dimension information. E CONTEXT: COPY tmp_revision, line 1, column extra_headers: "()" .tox/py3/lib/python3.7/site-packages/swh/storage/storage.py:78: StorageArgumentException