# Source code for gdbpool.connection_pool
# -*- coding: utf-8 -*-
# Copyright 2011-2012 Florian von Bock (f at vonbock dot info)
#
# gDBPool - db connection pooling for gevent
__author__ = "Florian von Bock"
__email__ = "f at vonbock dot info"
__version__ = "0.1.2"
# NOTE(review): monkey.patch_all() runs before psycopg2 (and everything below)
# is imported; the statement order in this block is presumably deliberate --
# do not regroup or sort these imports without confirming.
import gevent
from gevent import monkey; monkey.patch_all()
import psycopg2
import sys, traceback
# NOTE(review): presumably makes psycopg2 cooperate with gevent so that a
# query does not block other greenlets -- confirm against psyco_ge.
from psyco_ge import make_psycopg_green; make_psycopg_green()
# Fail at import time unless psyco_ge is registered in sys.modules under its
# package-qualified name.
assert 'gdbpool.psyco_ge' in sys.modules.keys()
from gevent.queue import Queue, Empty as QueueEmptyException
from time import time
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_UNCOMMITTED, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_REPEATABLE_READ, ISOLATION_LEVEL_SERIALIZABLE
# Register the UNICODE / UNICODEARRAY typecasters with psycopg2; no scope
# argument is passed to register_type.
psycopg2.extensions.register_type( psycopg2.extensions.UNICODE )
psycopg2.extensions.register_type( psycopg2.extensions.UNICODEARRAY )
from psycopg2 import InterfaceError
from pool_connection import PoolConnection
from gdbpool_error import DBInteractionException, DBPoolConnectionException, PoolConnectionException, StreamEndException
class DBConnectionPool( object ):
    """
    The Connection Pool

    "Classic" pool of connections with connection lifecycle management.

    Connections are :class:`PoolConnection` objects kept in a bounded gevent
    ``Queue``. :meth:`get` hands one out, :meth:`put` takes it back and
    replaces it with a freshly opened connection when it is due for recycling.
    """

    def __init__( self, dsn, db_module = 'psycopg2', pool_size = 10,
                  conn_lifetime = 600, do_log = False ):
        """
        Open ``pool_size`` connections concurrently and fill the pool with them.

        :param string dsn: DSN for the default :class:`DBConnectionPool`
        :param string db_module: name of the DB-API module to use
        :param int pool_size: Poolsize of the first/default :class:`DBConnectionPool`
        :param int conn_lifetime: Number of seconds after which a connection will be recycled when :meth:`.put` back. ``None`` is stored as ``0``.
        :param bool do_log: Log to the console or not
        :raises DBPoolConnectionException: if the pool does not hold ``pool_size`` connections once the start-up jobs have been joined (10 second timeout)
        """
        if do_log:
            import logging
            logging.basicConfig( level = logging.INFO, format = "%(asctime)s %(message)s" )
            self.logger = logging.getLogger()
        self.do_log = do_log
        self.dsn = dsn
        self.db_module = db_module
        self.pool_size = pool_size
        self.CONN_RECYCLE_AFTER = conn_lifetime if conn_lifetime is not None else 0
        self.pool = Queue( self.pool_size )
        # Import the DB-API module up front so a missing driver fails here
        # instead of inside every connection greenlet.
        __import__( db_module )
        # One greenlet per connection so the connects run concurrently.
        self.connection_jobs = [ gevent.spawn( self.create_connection )
                                 for _ in range( self.pool_size ) ]
        gevent.joinall( self.connection_jobs, timeout = 10 )
        # Explicit check instead of ``assert``: assertions are stripped under
        # ``python -O`` and a short pool would then be reported as ready.
        opened = self.pool.qsize()
        if opened != self.pool_size:
            raise DBPoolConnectionException( "Could not get %s connections for the pool as requested. Only %s could be opened." % ( self.pool_size, opened ) )
        if self.do_log:
            self.logger.info( "$ poolsize: %i" % opened )
        self.ready = True

    def __del__( self ):
        """
        Close every connection that is still sitting in the pool.
        """
        while not self.pool.empty():
            conn = self.pool.get()
            conn.close()

    def create_connection( self ):
        """
        Try to open a new connection to the database and put it on the pool

        Any exception raised while constructing the :class:`PoolConnection`
        propagates unchanged to the caller (or to the greenlet running this
        method).
        """
        self.pool.put( PoolConnection( self.db_module, self.dsn ) )

    def get( self, timeout = None, iso_level = ISOLATION_LEVEL_READ_COMMITTED ):
        """
        Get a connection from the pool

        :param int timeout: seconds to wait for a connection or None
        :param iso_level: transaction isolation level to be set on the connection. Must be one of psycopg2.extensions ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_UNCOMMITTED, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_REPEATABLE_READ, ISOLATION_LEVEL_SERIALIZABLE
        :returns: -- a :class:`PoolConnection`
        :raises PoolConnectionException: if no connection became available within ``timeout``
        """
        try:
            conn = self.pool.get( timeout = timeout )
        except gevent.queue.Empty as e:
            raise PoolConnectionException( e )
        # Only touch the connection when a non-default level is requested;
        # put() restores READ COMMITTED when the connection comes back.
        if iso_level != ISOLATION_LEVEL_READ_COMMITTED:
            conn.set_isolation_level( iso_level )
        return conn

    def put( self, conn, timeout = 1, force_recycle = False ):
        """
        Put a connection back onto the pool

        A connection younger than the pool's connection lifetime is reset and
        queued again. Otherwise (or when ``force_recycle`` is set) it is closed
        and a newly opened connection is queued in its place.

        :param conn: The :class:`PoolConnection` object to be put back onto the pool
        :param int timeout: timeout in seconds to put the connection onto the pool
        :param bool force_recycle: Force connection recycling independent from the pool wide connection lifecycle
        :raises PoolConnectionException: if ``conn`` is not a :class:`PoolConnection` or the pool stayed full for ``timeout`` seconds
        """
        if not isinstance( conn, PoolConnection ):
            raise PoolConnectionException( "Passed object %s is not a PoolConnection." % ( conn, ) )

        # NOTE(review): with CONN_RECYCLE_AFTER == 0 (which is what
        # conn_lifetime = None maps to) this is never true, so every
        # connection is recycled on put. Confirm whether 0 was meant to
        # disable recycling instead.
        within_lifetime = ( self.CONN_RECYCLE_AFTER != 0 and
                            time() - conn.PoolConnection_initialized_at < self.CONN_RECYCLE_AFTER )

        if within_lifetime and not force_recycle:
            conn.reset()
            if conn.isolation_level != ISOLATION_LEVEL_READ_COMMITTED:
                if self.do_log:
                    self.logger.info( "set ISOLATION_LEVEL_READ_COMMITTED." )
                conn.set_isolation_level( ISOLATION_LEVEL_READ_COMMITTED )
                conn.commit()
            try:
                self.pool.put( conn, timeout = timeout )
            except gevent.queue.Full as e:
                raise PoolConnectionException( e )
        else:
            if self.do_log:
                self.logger.info( "recycling conn." )
            try:
                conn.reset() # ?
                conn.close()
            except InterfaceError:
                # Best effort: the connection is being thrown away anyway.
                pass
            del conn
            # Open the replacement in a greenlet and wait for it. join() does
            # not re-raise, so a failed connect leaves the pool one short.
            gevent.spawn( self.create_connection ).join()

    @property
    def qsize( self ):
        """
        Number of connections currently waiting in the pool.
        """
        return self.pool.qsize()