Changeset View
Changeset View
Standalone View
Standalone View
swh/core/db/__init__.py
Show First 20 Lines • Show All 71 Lines • ▼ Show 20 Lines | def connect(cls, *args, **kwargs): | ||||
""" | """ | ||||
conn = psycopg2.connect(*args, **kwargs) | conn = psycopg2.connect(*args, **kwargs) | ||||
return cls(conn) | return cls(conn) | ||||
@classmethod | @classmethod | ||||
def from_pool(cls, pool): | def from_pool(cls, pool): | ||||
return cls(pool.getconn(), pool=pool) | return cls(pool.getconn(), pool=pool) | ||||
def _cursor(self, cur_arg): | |||||
"""get a cursor: from cur_arg if given, or a fresh one otherwise | |||||
meant to avoid boilerplate if/then/else in methods that proxy stored | |||||
procedures | |||||
""" | |||||
if cur_arg is not None: | |||||
return cur_arg | |||||
else: | |||||
return self.conn.cursor() | |||||
def __init__(self, conn, pool=None): | def __init__(self, conn, pool=None): | ||||
"""create a DB proxy | """create a DB proxy | ||||
Args: | Args: | ||||
conn: psycopg2 connection to the SWH DB | conn: psycopg2 connection to the SWH DB | ||||
pool: psycopg2 pool of connections | pool: psycopg2 pool of connections | ||||
""" | """ | ||||
self.conn = conn | self.conn = conn | ||||
self.pool = pool | self.pool = pool | ||||
def __del__(self): | def __del__(self): | ||||
if self.pool: | if self.pool: | ||||
self.pool.putconn(self.conn) | self.pool.putconn(self.conn) | ||||
def cursor(self, cur_arg): | |||||
"""get a cursor: from cur_arg if given, or a fresh one otherwise | |||||
meant to avoid boilerplate if/then/else in methods that proxy stored | |||||
procedures | |||||
""" | |||||
if cur_arg is not None: | |||||
return cur_arg | |||||
else: | |||||
return self.conn.cursor() | |||||
_cursor = cursor # for bw compat | |||||
@contextmanager | @contextmanager | ||||
def transaction(self): | def transaction(self): | ||||
"""context manager to execute within a DB transaction | """context manager to execute within a DB transaction | ||||
Yields: | Yields: | ||||
a psycopg2 cursor | a psycopg2 cursor | ||||
""" | """ | ||||
Show All 16 Lines | def copy_to(self, items, tblname, columns, cur=None, item_cb=None): | ||||
column names in the destination table. | column names in the destination table. | ||||
item_cb (fn): optional function to apply to items's entry | item_cb (fn): optional function to apply to items's entry | ||||
""" | """ | ||||
read_file, write_file = os.pipe() | read_file, write_file = os.pipe() | ||||
def writer(): | def writer(): | ||||
cursor = self._cursor(cur) | cursor = self.cursor(cur) | ||||
with open(read_file, 'r') as f: | with open(read_file, 'r') as f: | ||||
cursor.copy_expert('COPY %s (%s) FROM STDIN CSV' % ( | cursor.copy_expert('COPY %s (%s) FROM STDIN CSV' % ( | ||||
tblname, ', '.join(columns)), f) | tblname, ', '.join(columns)), f) | ||||
write_thread = threading.Thread(target=writer) | write_thread = threading.Thread(target=writer) | ||||
write_thread.start() | write_thread.start() | ||||
try: | try: | ||||
with open(write_file, 'w') as f: | with open(write_file, 'w') as f: | ||||
for d in items: | for d in items: | ||||
if item_cb is not None: | if item_cb is not None: | ||||
item_cb(d) | item_cb(d) | ||||
line = [escape(d.get(k)) for k in columns] | line = [escape(d.get(k)) for k in columns] | ||||
f.write(','.join(line)) | f.write(','.join(line)) | ||||
f.write('\n') | f.write('\n') | ||||
finally: | finally: | ||||
# No problem bubbling up exceptions, but we still need to make sure | # No problem bubbling up exceptions, but we still need to make sure | ||||
# we finish copying, even though we're probably going to cancel the | # we finish copying, even though we're probably going to cancel the | ||||
# transaction. | # transaction. | ||||
write_thread.join() | write_thread.join() | ||||
def mktemp(self, tblname, cur=None): | def mktemp(self, tblname, cur=None): | ||||
self._cursor(cur).execute('SELECT swh_mktemp(%s)', (tblname,)) | self.cursor(cur).execute('SELECT swh_mktemp(%s)', (tblname,)) |