Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/db.py
# Copyright (C) 2015-2017 The Software Heritage developers | # Copyright (C) 2015-2017 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import binascii | import binascii | ||||
import datetime | import datetime | ||||
import enum | import enum | ||||
import functools | import functools | ||||
import json | import json | ||||
import os | |||||
import select | import select | ||||
import tempfile | import threading | ||||
from contextlib import contextmanager | from contextlib import contextmanager | ||||
import psycopg2 | import psycopg2 | ||||
import psycopg2.extras | import psycopg2.extras | ||||
TMP_CONTENT_TABLE = 'tmp_content' | TMP_CONTENT_TABLE = 'tmp_content' | ||||
▲ Show 20 Lines • Show All 148 Lines • ▼ Show 20 Lines | def copy_to(self, items, tblname, columns, cur=None, item_cb=None): | ||||
']' if data.upper_inc else ')', | ']' if data.upper_inc else ')', | ||||
) | ) | ||||
) | ) | ||||
elif isinstance(data, enum.IntEnum): | elif isinstance(data, enum.IntEnum): | ||||
return escape(int(data)) | return escape(int(data)) | ||||
else: | else: | ||||
# We don't escape here to make sure we pass literals properly | # We don't escape here to make sure we pass literals properly | ||||
return str(data) | return str(data) | ||||
with tempfile.TemporaryFile('w+') as f: | |||||
read_file, write_file = os.pipe() | |||||
def writer(): | |||||
cursor = self._cursor(cur) | |||||
with open(read_file, 'r') as f: | |||||
cursor.copy_expert('COPY %s (%s) FROM STDIN CSV' % ( | |||||
tblname, ', '.join(columns)), f) | |||||
write_thread = threading.Thread(target=writer) | |||||
write_thread.start() | |||||
with open(write_file, 'w') as f: | |||||
for d in items: | for d in items: | ||||
if item_cb is not None: | if item_cb is not None: | ||||
item_cb(d) | item_cb(d) | ||||
line = [escape(d.get(k)) for k in columns] | line = [escape(d.get(k)) for k in columns] | ||||
f.write(','.join(line)) | f.write(','.join(line)) | ||||
f.write('\n') | f.write('\n') | ||||
f.seek(0) | |||||
self._cursor(cur).copy_expert('COPY %s (%s) FROM STDIN CSV' % ( | write_thread.join() | ||||
tblname, ', '.join(columns)), f) | |||||
def mktemp(self, tblname, cur=None): | def mktemp(self, tblname, cur=None): | ||||
self._cursor(cur).execute('SELECT swh_mktemp(%s)', (tblname,)) | self._cursor(cur).execute('SELECT swh_mktemp(%s)', (tblname,)) | ||||
class Db(BaseDb): | class Db(BaseDb): | ||||
"""Proxy to the SWH DB, with wrappers around stored procedures | """Proxy to the SWH DB, with wrappers around stored procedures | ||||
▲ Show 20 Lines • Show All 792 Lines • Show Last 20 Lines |