Source code for gDBPool.connection_pool
# -*- coding: utf-8 -*-
# Copyright 2011-2012 Florian von Bock (f at vonbock dot info)
#
# gDBPool - db connection pooling for gevent
__author__ = "Florian von Bock"
__email__ = "f at vonbock dot info"
__version__ = "0.1.2"
import gevent
from gevent import monkey; monkey.patch_all()
import psycopg2
import sys, traceback
from psyco_ge import make_psycopg_green; make_psycopg_green()
assert 'gDBPool.psyco_ge' in sys.modules.keys()
from gevent.queue import Queue, Empty as QueueEmptyException
# from gevent.pool import Pool as GreenPool
# from gevent.event import AsyncResult
from time import time
# from types import FunctionType, MethodType, StringType
from gDBPoolError import DBInteractionException, DBPoolConnectionException, PoolConnectionException, StreamEndException
# from PGChannelListener import PGChannelListener
# from psycopg2.extras import RealDictCursor
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_UNCOMMITTED, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_REPEATABLE_READ, ISOLATION_LEVEL_SERIALIZABLE
psycopg2.extensions.register_type( psycopg2.extensions.UNICODE )
psycopg2.extensions.register_type( psycopg2.extensions.UNICODEARRAY )
from psycopg2 import InterfaceError
from pool_connection import PoolConnection
[docs]class DBConnectionPool( object ):
"""
The Connection Pool
"Classic" pool of connections with connection lifecycle management
"""
def __init__( self, dsn, db_module = 'psycopg2', pool_size = 10,
conn_lifetime = 600, do_log = False ):
"""
:param string dsn: DSN for the default `class:DBConnectionPool`
:param string db_module: name of the DB-API module to use
:param int pool_size: Poolsize of the first/default `class:DBConnectionPool`
:param int conn_lifetime: Number of seconds after which a connection will be recycled when :meth:`.put` back
:param bool do_log: Log to the console or not
"""
if do_log:
import logging
logging.basicConfig( level = logging.INFO, format = "%(asctime)s %(message)s" )
self.logger = logging.getLogger()
self.do_log = do_log
self.dsn = dsn
self.db_module = db_module
self.pool_size = pool_size
self.CONN_RECYCLE_AFTER = conn_lifetime if conn_lifetime is not None else 0
self.pool = Queue( self.pool_size )
__import__( db_module )
self.connection_jobs = map( lambda x: gevent.spawn( self.create_connection ), xrange( self.pool_size ) )
try:
gevent.joinall( self.connection_jobs, timeout = 10 )
assert self.pool_size == self.pool.qsize()
if self.do_log:
self.logger.info( "$ poolsize: %i" % self.pool.qsize() )
self.ready = True
# except PoolConnectionException, e:
# raise e
except AssertionError, e:
raise DBPoolConnectionException( "Could not get %s connections for the pool as requested. %s" % ( self.pool_size, e.message ) )
except Exception, e:
raise e
def __del__( self ):
while not self.pool.empty():
conn = self.pool.get()
conn.close()
[docs] def create_connection( self ):
"""
Try to open a new connection to the database and put it on the pool
"""
try:
self.pool.put( PoolConnection( self.db_module, self.dsn ) )
except PoolConnectionException, e:
raise e
[docs] def get( self, timeout = None, iso_level = ISOLATION_LEVEL_READ_COMMITTED ):
"""
Get a connection from the pool
:param int timeout: seconds to wait for a connection or None
:param iso_level: transaction isolation level to be set on the connection. Must be one of psycopg2.extensions ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_UNCOMMITTED, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_REPEATABLE_READ, ISOLATION_LEVEL_SERIALIZABLE
:returns: -- a :class:`PoolConnection`
"""
try:
conn = self.pool.get( timeout = timeout )
if iso_level != ISOLATION_LEVEL_READ_COMMITTED:
conn.set_isolation_level( iso_level )
# if auto_commit and self.db_module == 'psycopg2':
# conn.set_isolation_level( ISOLATION_LEVEL_AUTOCOMMIT )
return conn
except gevent.queue.Empty, e:
raise PoolConnectionException( e )
[docs] def put( self, conn, timeout = 1, force_recycle = False ):
"""
Put a connection back onto the pool
:param conn: The :class:`PoolConnection` object to be put back onto the pool
:param int timeout: timeout in seconds to to put the connection onto the pool
:param bool force_recycle: Force connection recycling independent from the pool wide connection lifecycle
"""
if isinstance( conn, PoolConnection ):
if ( self.CONN_RECYCLE_AFTER != 0 and time() - conn.PoolConnection_initialized_at < self.CONN_RECYCLE_AFTER ) and force_recycle == False:
try:
conn.reset()
if conn.isolation_level != ISOLATION_LEVEL_READ_COMMITTED:
if self.do_log:
self.logger.info( "set ISOLATION_LEVEL_READ_COMMITTED." )
conn.set_isolation_level( ISOLATION_LEVEL_READ_COMMITTED )
conn.commit()
self.pool.put( conn, timeout = timeout )
except gevent.queue.Full, e:
raise PoolConnectionException( e )
else:
if self.do_log:
self.logger.info( "recycling conn." )
try:
conn.reset() # ?
conn.close()
except InterfaceError:
pass
del conn
gevent.spawn( self.create_connection ).join()
else:
raise PoolConnectionException( "Passed object %s is not a PoolConnection." % ( conn, ) )
@property
def qsize( self ):
return self.pool.qsize()