diff --git a/PKG-INFO b/PKG-INFO index 21ca6d0..d445425 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,28 +1,28 @@ Metadata-Version: 2.1 Name: swh.core -Version: 0.0.58 +Version: 0.0.59 Summary: Software Heritage core utilities Home-page: https://forge.softwareheritage.org/diffusion/DCORE/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Source, https://forge.softwareheritage.org/source/swh-core Description: swh-core ======== core library for swh's modules: - config parser - hash computations - serialization - logging mechanism Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/swh.core.egg-info/PKG-INFO b/swh.core.egg-info/PKG-INFO index 21ca6d0..d445425 100644 --- a/swh.core.egg-info/PKG-INFO +++ b/swh.core.egg-info/PKG-INFO @@ -1,28 +1,28 @@ Metadata-Version: 2.1 Name: swh.core -Version: 0.0.58 +Version: 0.0.59 Summary: Software Heritage core utilities Home-page: https://forge.softwareheritage.org/diffusion/DCORE/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Source, https://forge.softwareheritage.org/source/swh-core Description: swh-core ======== core library for swh's modules: - config parser - hash computations - serialization - logging mechanism Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/swh/core/db/__init__.py b/swh/core/db/__init__.py index 57506e7..da0e402 100644 --- a/swh/core/db/__init__.py +++ b/swh/core/db/__init__.py @@ -1,192 +1,193 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import binascii import datetime import enum import json import os import threading from contextlib import contextmanager import psycopg2 import psycopg2.extras psycopg2.extras.register_uuid() def escape(data): if data is None: return '' if isinstance(data, bytes): return '\\x%s' % binascii.hexlify(data).decode('ascii') elif isinstance(data, str): return '"%s"' % data.replace('"', '""') elif isinstance(data, datetime.datetime): # We escape twice to make sure the string generated by # isoformat gets escaped return escape(data.isoformat()) elif isinstance(data, dict): return escape(json.dumps(data)) elif isinstance(data, list): return escape("{%s}" % ','.join(escape(d) for d in data)) elif isinstance(data, psycopg2.extras.Range): # We escape twice here too, so that we make sure # everything gets passed to copy properly return escape( '%s%s,%s%s' % ( '[' if data.lower_inc else '(', '-infinity' if data.lower_inf else escape(data.lower), 'infinity' if data.upper_inf else escape(data.upper), ']' if data.upper_inc else ')', ) ) elif isinstance(data, enum.IntEnum): return escape(int(data)) else: # We don't escape here to make sure we pass literals properly return str(data) def typecast_bytea(value, cur): if value is not None: data = psycopg2.BINARY(value, cur) return data.tobytes() class BaseDb: """Base class for swh.*.*Db. cf. swh.storage.db.Db, swh.archiver.db.ArchiverDb """ @classmethod def adapt_conn(cls, conn): """Makes psycopg2 use 'bytes' to decode bytea instead of 'memoryview', for this connection.""" cur = conn.cursor() cur.execute("SELECT null::bytea, null::bytea[]") bytea_oid = cur.description[0][1] bytea_array_oid = cur.description[1][1] t_bytes = psycopg2.extensions.new_type( (bytea_oid,), "bytea", typecast_bytea) psycopg2.extensions.register_type(t_bytes, conn) t_bytes_array = psycopg2.extensions.new_array_type( (bytea_array_oid,), "bytea[]", t_bytes) psycopg2.extensions.register_type(t_bytes_array, conn) @classmethod def connect(cls, *args, **kwargs): """factory method to create a DB proxy Accepts all arguments of psycopg2.connect; only some specific possibilities are reported below. Args: connstring: libpq2 connection string """ conn = psycopg2.connect(*args, **kwargs) return cls(conn) @classmethod def from_pool(cls, pool): conn = pool.getconn() return cls(conn, pool=pool) def __init__(self, conn, pool=None): """create a DB proxy Args: conn: psycopg2 connection to the SWH DB pool: psycopg2 pool of connections """ self.adapt_conn(conn) self.conn = conn self.pool = pool - def __del__(self): + def put_conn(self): if self.pool: self.pool.putconn(self.conn) def cursor(self, cur_arg=None): """get a cursor: from cur_arg if given, or a fresh one otherwise meant to avoid boilerplate if/then/else in methods that proxy stored procedures """ if cur_arg is not None: return cur_arg else: return self.conn.cursor() _cursor = cursor # for bw compat @contextmanager def transaction(self): """context manager to execute within a DB transaction Yields: a psycopg2 cursor """ with self.conn.cursor() as cur: try: yield cur self.conn.commit() except Exception: if not self.conn.closed: self.conn.rollback() raise def copy_to(self, items, tblname, columns, cur=None, item_cb=None, default_values={}): """Copy items' entries to table tblname with columns information. Args: items (List[dict]): dictionaries of data to copy over tblname. tblname (str): destination table's name. columns ([str]): keys to access data in items and also the column names in the destination table. default_values (dict): dictionnary of default values to use when inserting entried int the tblname table. cur: a db cursor; if not given, a new cursor will be created. item_cb (fn): optional function to apply to items's entry. """ read_file, write_file = os.pipe() def writer(): cursor = self.cursor(cur) with open(read_file, 'r') as f: cursor.copy_expert('COPY %s (%s) FROM STDIN CSV' % ( tblname, ', '.join(columns)), f) write_thread = threading.Thread(target=writer) write_thread.start() try: with open(write_file, 'w') as f: for d in items: if item_cb is not None: item_cb(d) line = [escape(d.get(k, default_values.get(k))) for k in columns] f.write(','.join(line)) f.write('\n') + finally: # No problem bubbling up exceptions, but we still need to make sure # we finish copying, even though we're probably going to cancel the # transaction. write_thread.join() def mktemp(self, tblname, cur=None): self.cursor(cur).execute('SELECT swh_mktemp(%s)', (tblname,)) diff --git a/swh/core/db/common.py b/swh/core/db/common.py index a09ad63..c18e6ec 100644 --- a/swh/core/db/common.py +++ b/swh/core/db/common.py @@ -1,80 +1,87 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import inspect import functools def apply_options(cursor, options): """Applies the given postgresql client options to the given cursor. Returns a dictionary with the old values if they changed.""" old_options = {} for option, value in options.items(): cursor.execute('SHOW %s' % option) old_value = cursor.fetchall()[0][0] if old_value != value: cursor.execute('SET LOCAL %s TO %%s' % option, (value,)) old_options[option] = old_value return old_options def db_transaction(**client_options): """decorator to execute Backend methods within DB transactions The decorated method must accept a `cur` and `db` keyword argument Client options are passed as `set` options to the postgresql server """ def decorator(meth, __client_options=client_options): if inspect.isgeneratorfunction(meth): raise ValueError( 'Use db_transaction_generator for generator functions.') @functools.wraps(meth) def _meth(self, *args, **kwargs): if 'cur' in kwargs and kwargs['cur']: cur = kwargs['cur'] old_options = apply_options(cur, __client_options) ret = meth(self, *args, **kwargs) apply_options(cur, old_options) return ret else: db = self.get_db() - with db.transaction() as cur: - apply_options(cur, __client_options) - return meth(self, *args, db=db, cur=cur, **kwargs) + try: + with db.transaction() as cur: + apply_options(cur, __client_options) + return meth(self, *args, db=db, cur=cur, **kwargs) + finally: + self.put_db(db) return _meth return decorator def db_transaction_generator(**client_options): """decorator to execute Backend methods within DB transactions, while returning a generator The decorated method must accept a `cur` and `db` keyword argument Client options are passed as `set` options to the postgresql server """ def decorator(meth, __client_options=client_options): if not inspect.isgeneratorfunction(meth): raise ValueError( 'Use db_transaction for non-generator functions.') @functools.wraps(meth) def _meth(self, *args, **kwargs): if 'cur' in kwargs and kwargs['cur']: cur = kwargs['cur'] old_options = apply_options(cur, __client_options) yield from meth(self, *args, **kwargs) apply_options(cur, old_options) else: db = self.get_db() - with db.transaction() as cur: - apply_options(cur, __client_options) - yield from meth(self, *args, db=db, cur=cur, **kwargs) + try: + with db.transaction() as cur: + apply_options(cur, __client_options) + yield from meth(self, *args, db=db, cur=cur, **kwargs) + finally: + self.put_db(db) + return _meth return decorator diff --git a/version.txt b/version.txt index 0cf0028..ab0de05 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.58-0-gd54f0af \ No newline at end of file +v0.0.59-0-g1d9c0bf \ No newline at end of file