diff --git a/swh/storage/db.py b/swh/storage/db.py
--- a/swh/storage/db.py
+++ b/swh/storage/db.py
@@ -93,6 +93,17 @@
         conn = psycopg2.connect(*args, **kwargs)
         return cls(conn)
 
+    @classmethod
+    def from_pool(cls, pool):
+        """create a DB proxy backed by a connection borrowed from pool
+
+        Args:
+            pool: psycopg2 connection pool; the borrowed connection is
+                handed back to it when the proxy is garbage-collected
+
+        """
+        return cls(pool.getconn(), pool=pool)
+
     def _cursor(self, cur_arg):
         """get a cursor: from cur_arg if given, or a fresh one otherwise
 
@@ -107,14 +118,24 @@
         else:
             return self.conn.cursor()
 
-    def __init__(self, conn):
+    def __init__(self, conn, pool=None):
         """create a DB proxy
 
         Args:
             conn: psycopg2 connection to the SWH DB
+            pool: psycopg2 pool of connections
 
         """
         self.conn = conn
+        self.pool = pool
+
+    def __del__(self):
+        """hand the connection back to its pool, if it came from one"""
+        # __del__ also runs on instances whose __init__ never completed, so
+        # the attribute may be missing; never raise from a finalizer.
+        pool = getattr(self, "pool", None)
+        if pool is not None:
+            pool.putconn(self.conn)
 
     @contextmanager
     def transaction(self):
diff --git a/swh/storage/storage.py b/swh/storage/storage.py
--- a/swh/storage/storage.py
+++ b/swh/storage/storage.py
@@ -10,6 +10,7 @@
 import json
 import dateutil.parser
 import psycopg2
+import psycopg2.pool
 
 from . import converters
 from .common import db_transaction_generator, db_transaction
@@ -33,7 +34,7 @@
 
     """
 
-    def __init__(self, db, objstorage):
+    def __init__(self, db, objstorage, min_pool_conns=1, max_pool_conns=10):
         """
         Args:
             db_conn: either a libpq connection string, or a psycopg2 connection
@@ -42,16 +43,23 @@
         """
         try:
             if isinstance(db, psycopg2.extensions.connection):
+                self._pool = None
                 self._db = Db(db)
             else:
-                self._db = Db.connect(db)
+                self._pool = psycopg2.pool.ThreadedConnectionPool(
+                    min_pool_conns, max_pool_conns, db
+                )
+                self._db = None
         except psycopg2.OperationalError as e:
             raise StorageDBError(e)
 
         self.objstorage = get_objstorage(**objstorage)
 
     def get_db(self):
-        return self._db
+        """Return a Db proxy: the shared one, or a fresh one from the pool."""
+        if self._db is not None:
+            return self._db
+        return Db.from_pool(self._pool)
 
     def check_config(self, *, check_write):
         """Check that the storage is configured and ready to go."""