Changeset View
Changeset View
Standalone View
Standalone View
swh/core/db/__init__.py
Show First 20 Lines • Show All 116 Lines • ▼ Show 20 Lines | def transaction(self): | ||||
try: | try: | ||||
yield cur | yield cur | ||||
self.conn.commit() | self.conn.commit() | ||||
except Exception: | except Exception: | ||||
if not self.conn.closed: | if not self.conn.closed: | ||||
self.conn.rollback() | self.conn.rollback() | ||||
raise | raise | ||||
def copy_to(self, items, tblname, columns, cur=None, item_cb=None): | def copy_to(self, items, tblname, columns, default_values={}, | ||||
cur=None, item_cb=None): | |||||
"""Copy items' entries to table tblname with columns information. | """Copy items' entries to table tblname with columns information. | ||||
Args: | Args: | ||||
items (dict): dictionary of data to copy over tblname | items (dict): dictionary of data to copy over tblname. | ||||
tblname (str): Destination table's name | tblname (str): destination table's name. | ||||
columns ([str]): keys to access data in items and also the | columns ([str]): keys to access data in items and also the | ||||
column names in the destination table. | column names in the destination table. | ||||
item_cb (fn): optional function to apply to items's entry | default_values (dict): dictionnary of default values to use when | ||||
inserting entried int the tblname table. | |||||
cur: a db cursor; if not given, a new cursor will be created. | |||||
item_cb (fn): optional function to apply to items's entry. | |||||
""" | """ | ||||
read_file, write_file = os.pipe() | read_file, write_file = os.pipe() | ||||
def writer(): | def writer(): | ||||
cursor = self.cursor(cur) | cursor = self.cursor(cur) | ||||
with open(read_file, 'r') as f: | with open(read_file, 'r') as f: | ||||
cursor.copy_expert('COPY %s (%s) FROM STDIN CSV' % ( | cursor.copy_expert('COPY %s (%s) FROM STDIN CSV' % ( | ||||
tblname, ', '.join(columns)), f) | tblname, ', '.join(columns)), f) | ||||
write_thread = threading.Thread(target=writer) | write_thread = threading.Thread(target=writer) | ||||
write_thread.start() | write_thread.start() | ||||
try: | try: | ||||
with open(write_file, 'w') as f: | with open(write_file, 'w') as f: | ||||
for d in items: | for d in items: | ||||
if item_cb is not None: | if item_cb is not None: | ||||
item_cb(d) | item_cb(d) | ||||
line = [escape(d.get(k)) for k in columns] | line = [escape(d.get(k) or default_values.get(k)) | ||||
for k in columns] | |||||
f.write(','.join(line)) | f.write(','.join(line)) | ||||
f.write('\n') | f.write('\n') | ||||
finally: | finally: | ||||
# No problem bubbling up exceptions, but we still need to make sure | # No problem bubbling up exceptions, but we still need to make sure | ||||
# we finish copying, even though we're probably going to cancel the | # we finish copying, even though we're probably going to cancel the | ||||
# transaction. | # transaction. | ||||
write_thread.join() | write_thread.join() | ||||
def mktemp(self, tblname, cur=None): | def mktemp(self, tblname, cur=None): | ||||
self.cursor(cur).execute('SELECT swh_mktemp(%s)', (tblname,)) | self.cursor(cur).execute('SELECT swh_mktemp(%s)', (tblname,)) |