@contextlib.contextmanager
def convert_validation_exceptions():
"""Catches postgresql errors related to invalid arguments, and
re-raises a StorageArgumentException."""
try:
> yield
.tox/py3/lib/python3.7/site-packages/swh/storage/storage.py:76:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <swh.storage.storage.Storage object at 0x7f7c417d79e8>
revisions = [Revision(message=b'hello', author=Person(fullname=b'foo', name=b'foo', email=b''), committer=Person(fullname=b'foo', ...se, metadata=None, parents=(), id=b'gpc\xf5\xc4\x05\xd6\xfc\x17\x81\xfcV7\x9c\x9a\x9a\xdfC\xd3\xa0', extra_headers=())]
db = <swh.storage.db.Db object at 0x7f7c417d7ac8>
cur = <cursor object at 0x7f7c425dcce0; closed: -1>
@timed
@process_metrics
@db_transaction()
def revision_add(self, revisions: Iterable[Revision], db=None, cur=None) -> Dict:
revisions = list(revisions)
summary = {"revision:add": 0}
revisions_missing = set(
self.revision_missing(
set(revision.id for revision in revisions), db=db, cur=cur
)
)
if not revisions_missing:
return summary
db.mktemp_revision(cur)
revisions_filtered = [
revision for revision in revisions if revision.id in revisions_missing
]
self.journal_writer.revision_add(revisions_filtered)
revisions_filtered = list(map(converters.revision_to_db, revisions_filtered))
parents_filtered: List[bytes] = []
with convert_validation_exceptions():
db.copy_to(
revisions_filtered,
"tmp_revision",
db.revision_add_cols,
cur,
> lambda rev: parents_filtered.extend(rev["parents"]),
)
.tox/py3/lib/python3.7/site-packages/swh/storage/storage.py:579:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <swh.storage.db.Db object at 0x7f7c417d7ac8>
items = [{'author_email': b'', 'author_fullname': b'foo', 'author_name': b'foo', 'committer_date': '2009-02-13T23:31:31+00:00'...hor_email': b'', 'author_fullname': b'bar', 'author_name': b'bar', 'committer_date': '2009-02-13T23:31:32+00:00', ...}]
tblname = 'tmp_revision'
columns = ['id', 'date', 'date_offset', 'date_neg_utc_offset', 'committer_date', 'committer_date_offset', ...]
cur = <cursor object at 0x7f7c425dcce0; closed: -1>
item_cb = <function Storage.revision_add.<locals>.<lambda> at 0x7f7c42459950>
default_values = {}
def copy_to(
self, items, tblname, columns, cur=None, item_cb=None, default_values={}
):
"""Copy items' entries to table tblname with columns information.
Args:
items (List[dict]): dictionaries of data to copy over tblname.
tblname (str): destination table's name.
columns ([str]): keys to access data in items and also the
column names in the destination table.
default_values (dict): dictionary of default values to use when
inserting entries into the tblname table.
cur: a db cursor; if not given, a new cursor will be created.
item_cb (fn): optional function to apply to each item's entry.
"""
read_file, write_file = os.pipe()
exc_info = None
def writer():
nonlocal exc_info
cursor = self.cursor(cur)
with open(read_file, "r") as f:
try:
cursor.copy_expert(
"COPY %s (%s) FROM STDIN CSV" % (tblname, ", ".join(columns)), f
)
except Exception:
# Tell the main thread about the exception
exc_info = sys.exc_info()
write_thread = threading.Thread(target=writer)
write_thread.start()
try:
with open(write_file, "w") as f:
for d in items:
if item_cb is not None:
item_cb(d)
line = []
for k in columns:
value = d.get(k, default_values.get(k))
try:
line.append(escape(value))
except Exception as e:
logger.error(
"Could not escape value `%r` for column `%s`:"
"Received exception: `%s`",
value,
k,
e,
)
raise e from None
f.write(",".join(line))
f.write("\n")
finally:
# No problem bubbling up exceptions, but we still need to make sure
# we finish copying, even though we're probably going to cancel the
# transaction.
write_thread.join()
if exc_info:
# postgresql returned an error, let's raise it.
> raise exc_info[1].with_traceback(exc_info[2])
.tox/py3/lib/python3.7/site-packages/swh/core/db/__init__.py:213:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
def writer():
nonlocal exc_info
cursor = self.cursor(cur)
with open(read_file, "r") as f:
try:
cursor.copy_expert(
> "COPY %s (%s) FROM STDIN CSV" % (tblname, ", ".join(columns)), f
)
E psycopg2.errors.InvalidTextRepresentation: malformed array literal: "()"
E DETAIL: Array value must start with "{" or dimension information.
E CONTEXT: COPY tmp_revision, line 1, column extra_headers: "()"
.tox/py3/lib/python3.7/site-packages/swh/core/db/__init__.py:175: InvalidTextRepresentation
During handling of the above exception, another exception occurred:
swh_storage_backend_config = {'cls': 'local', 'db': 'postgresql://postgres@127.0.0.1:28613/tests', 'journal_writer': {'brokers': ['127.0.0.1:35429'], 'client_id': 'kafka_writer-1', 'cls': 'kafka', 'prefix': 'ipmqvgwfcw-1'}, 'objstorage': {'args': {}, 'cls': 'memory'}}
kafka_prefix = 'ipmqvgwfcw', kafka_consumer_group = 'test-consumer-ipmqvgwfcw'
kafka_server = '127.0.0.1:35429'
@patch("swh.storage.backfill.RANGE_GENERATORS", RANGE_GENERATORS)
def test_backfiller(
swh_storage_backend_config,
kafka_prefix: str,
kafka_consumer_group: str,
kafka_server: str,
):
prefix1 = f"{kafka_prefix}-1"
prefix2 = f"{kafka_prefix}-2"
journal1 = {
"cls": "kafka",
"brokers": [kafka_server],
"client_id": "kafka_writer-1",
"prefix": prefix1,
}
swh_storage_backend_config["journal_writer"] = journal1
storage = get_storage(**swh_storage_backend_config)
# fill the storage and the journal (under prefix1)
for object_type, objects in TEST_OBJECTS.items():
method = getattr(storage, object_type + "_add")
> method(objects)
.tox/py3/lib/python3.7/site-packages/swh/storage/tests/test_backfill.py:201:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.tox/py3/lib/python3.7/site-packages/swh/storage/metrics.py:24: in d
return f(*a, **kw)
.tox/py3/lib/python3.7/site-packages/swh/storage/metrics.py:77: in d
r = f(*a, **kw)
.tox/py3/lib/python3.7/site-packages/swh/core/db/common.py:62: in _meth
return meth(self, *args, db=db, cur=cur, **kwargs)
.tox/py3/lib/python3.7/site-packages/swh/storage/storage.py:579: in revision_add
lambda rev: parents_filtered.extend(rev["parents"]),
/usr/lib/python3.7/contextlib.py:130: in __exit__
self.gen.throw(type, value, traceback)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
@contextlib.contextmanager
def convert_validation_exceptions():
"""Catches postgresql errors related to invalid arguments, and
re-raises a StorageArgumentException."""
try:
yield
except tuple(VALIDATION_EXCEPTIONS) as e:
> raise StorageArgumentException(str(e))
E swh.storage.exc.StorageArgumentException: malformed array literal: "()"
E DETAIL: Array value must start with "{" or dimension information.
E CONTEXT: COPY tmp_revision, line 1, column extra_headers: "()"
.tox/py3/lib/python3.7/site-packages/swh/storage/storage.py:78: StorageArgumentException
TEST RESULT
TEST RESULT
- Run At
- Jul 6 2020, 2:26 PM