# -*- coding: utf-8 -*-
# Copyright 2011-2012 Florian von Bock (f at vonbock dot info)
#
# gDBPool - db connection pooling for gevent
__author__ = "Florian von Bock"
__email__ = "f at vonbock dot info"
__version__ = "0.1.2"
import gevent
from gevent import monkey; monkey.patch_all()
import psycopg2
import sys, traceback
from psyco_ge import make_psycopg_green; make_psycopg_green()
assert 'gDBPool.psyco_ge' in sys.modules.keys()
from gevent.queue import Queue, Empty as QueueEmptyException
from gevent.pool import Pool as GreenPool
from gevent.event import AsyncResult
from time import time
from types import FunctionType, MethodType, StringType
from gDBPoolError import DBInteractionException, DBPoolConnectionException, PoolConnectionException, StreamEndException
from PGChannelListener import PGChannelListener
from psycopg2.extras import RealDictCursor
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_UNCOMMITTED, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_REPEATABLE_READ, ISOLATION_LEVEL_SERIALIZABLE
psycopg2.extensions.register_type( psycopg2.extensions.UNICODE )
psycopg2.extensions.register_type( psycopg2.extensions.UNICODEARRAY )
from psycopg2 import DatabaseError, InterfaceError
from inspect import getargspec
# TODO: make this an option...
# DEC2FLOAT = psycopg2.extensions.new_type(
# psycopg2.extensions.DECIMAL.values,
# 'DEC2FLOAT',
# lambda value, curs: float( value ) if value is not None else None )
# psycopg2.extensions.register_type( DEC2FLOAT )
class DBInteractionPool( object ):
    """
    The DBInteractionPool manages `DBConnectionPool` instances and can run
    queries or functions (ie. several queries wrapped in a function) on one of
    these pools.

    The class is a singleton: `__new__` always returns the same instance.
    `__init__` still runs on every call of the class and re-creates the
    default pool each time.
    """

    def __new__( cls, dsn, *args, **kwargs ):
        # Create the single shared instance on first use, reuse it afterwards.
        if not hasattr( cls, '_instance' ):
            cls._instance = object.__new__( cls )
        return cls._instance

    def __init__( self, dsn, pool_size = 10, pool_name = 'default',
                  do_log = False ):
        """
        :param string dsn: DSN for the default `class:DBConnectionPool`
        :param int pool_size: Poolsize of the first/default `class:DBConnectionPool`
        :param string pool_name: Keyname for the first/default `class:DBConnectionPool`
        :param bool do_log: Log to the console or not
        """
        # Set before anything can fail so that __del__ never hits a missing attribute.
        self.do_log = do_log
        if do_log:
            import logging
            logging.basicConfig( level = logging.INFO, format = "%(asctime)s %(message)s" )
            self.logger = logging.getLogger()
        self.request_queue = Queue( maxsize = None )
        self.db_module = 'psycopg2'
        self.conn_pools = {}
        self.default_write_pool = None
        self.default_read_pool = None
        self.default_pool = None
        # Register the first pool under the caller-supplied name (it used to be
        # hard-coded to 'default', silently ignoring `pool_name`).
        self.add_pool( dsn = dsn, pool_name = pool_name, pool_size = pool_size,
                       default_write_pool = True, default_read_pool = True,
                       db_module = self.db_module )

    def __del__( self ):
        # getattr guards: the instance may exist without __init__ having run.
        if getattr( self, 'do_log', False ):
            self.logger.info( "__del__ DBInteractionPool" )
        conn_pools = getattr( self, 'conn_pools', {} )
        for name in conn_pools:
            conn_pools[ name ].__del__()

    def __call__( self, *args, **kwargs ):
        """ syntactic sugar for `:ref:DBInteractionPool.run` """
        return self.run( *args, **kwargs )

    def add_pool( self, dsn = None, pool_name = None, pool_size = 10,
                  default_write_pool = False, default_read_pool = False,
                  default_pool = False, db_module = 'psycopg2' ):
        """
        Add a named `:class:DBConnectionPool`

        :param string dsn: dsn
        :param string pool_name: a name for the pool to identify it inside the DBInteractionPool
        :param int pool_size: Number of connections the pool should have.
        :param bool default_write_pool: Should the added pool be used as the default pool for write operations?
        :param bool default_read_pool: Should the added pool be used as the default pool for read operations?
        :param bool default_pool: Should the added pool be used as the default pool? (*must* be a write pool)
        :param string db_module: name of the DB-API module to use
        :raises DBInteractionException: if a pool named `pool_name` already exists

        .. note::
            db_module right now ONLY supports psycopg2 and the option most likely will be removed in the future.
            The value passed here is currently ignored; `self.db_module` is used.
        """
        if pool_name in self.conn_pools:
            raise DBInteractionException( "Already have a pool with the name: %s. ConnectionPool not added!" % ( pool_name, ) )

        self.conn_pools[ pool_name ] = DBConnectionPool( dsn, db_module = self.db_module,
                                                         pool_size = pool_size, do_log = self.do_log )
        if default_write_pool:
            self.default_write_pool = pool_name
            # Honour the `default_pool` flag. The first write pool always
            # becomes the default so that `self.pool` is usable.
            if default_pool or self.default_pool is None:
                self.default_pool = pool_name
        if default_read_pool:
            self.default_read_pool = pool_name

    @property
    def pool( self ):
        """ The `DBConnectionPool` registered as the default pool. """
        return self.conn_pools[ self.default_pool ]

    def run( self, interaction = None, interaction_args = None,
             get_result = True, is_write = True, pool = None, conn = None,
             cursor = None, partial_txn = False, dry_run = False, *args,
             **kwargs ):
        """
        Run an interaction on one of the managed `:class:DBConnectionPool` pools.

        The interaction is executed in a spawned greenlet; this method returns
        an `AsyncResult` right away. With `partial_txn = True` the result is a
        dict with the keys `result`, `connection` and `cursor`.

        :param function|method interaction: The interaction to run. Either a SQL string or a function that takes at least a parameter `conn`.
        :param string interaction_args: Parameters passed to `cursor.execute` for SQL string interactions.
        :param bool get_result: call and return cursor.fetchall() when True - otherwise just return True as result if no exception was raised.
        :param bool is_write: If the interaction has no side-effects set to `False`. Without naming a pool the default_read pool would be used.
        :param string pool: Keyname of the pool to get the a connection from
        :param connection conn: Pass in a `Connection` instead of getting one from the pool. (ie. for locks in transactions that span several interactions. Use `partial_txn = True` to retrieve the Connection and then pass it into the next interaction run.)
        :param cursor cursor: Pass in a `Cursor` instead of getting one from the `Connection` (ie. for locks in transactions that span several interactions. Use `partial_txn = True` to retrieve the Cursor and then pass it into the next interaction run.)
        :param bool partial_txn: Return connection and cursor after executing the interaction (ie. for locks in transactions that span several interactions)
        :param bool dry_run: Run the query with `mogrify` instead of `execute` and output the query that would have run. (Only applies to query interactions) NOTE: accepted but not acted upon by this method yet.
        :param list args: extra positional arguments handed to a function interaction
        :param dict kwargs: extra keyword arguments handed to a function interaction
        :raises DBInteractionException: if `interaction` is neither a function, a method nor a string
        """
        # Validate first: rejecting a bad interaction must not cost a pooled connection.
        is_function = isinstance( interaction, ( FunctionType, MethodType ) )
        if not is_function and not isinstance( interaction, StringType ):
            raise DBInteractionException( "%s cannot be run. run() only accepts FunctionTypes, MethodType, and StringTypes" % ( interaction, ) )

        async_result = AsyncResult()
        if pool is not None:
            use_pool = pool
        elif is_write:
            use_pool = self.default_write_pool
        else:
            use_pool = self.default_read_pool

        if not conn:
            conn = self.conn_pools[ use_pool ].get()

        # NOTE(review): when an interaction fails with partial_txn = True the
        # connection is not handed back to the pool by either worker below.

        if is_function:
            def wrapped_transaction_f( async_res, interaction, conn = None,
                                       cursor = None, *args ):
                try:
                    if not conn:
                        conn = self.conn_pools[ use_pool ].get()
                    kwargs[ 'conn' ] = conn
                    if cursor:
                        kwargs[ 'cursor' ] = cursor
                    elif 'cursor' in getargspec( interaction )[ 0 ]:
                        # only create a cursor if the interaction asks for one
                        kwargs[ 'cursor' ] = kwargs[ 'conn' ].cursor()
                    res = interaction( *args, **kwargs )
                    if not partial_txn:
                        async_res.set( res )
                        if cursor and not cursor.closed:
                            cursor.close()
                    else:
                        # .get(): the interaction may not use a cursor at all
                        async_res.set( { 'result': res,
                                         'connection': conn,
                                         'cursor': kwargs.get( 'cursor' ) } )
                except Exception as e:
                    # DatabaseError is handled the same way as any other error here
                    if self.do_log:
                        self.logger.info( "exception: %s", e )
                    async_res.set_exception( DBInteractionException( e ) )
                finally:
                    if conn and not partial_txn:
                        self.conn_pools[ use_pool ].put( conn )

            # conn/cursor go in positionally so that extra *args cannot clash
            # with them as duplicate keyword arguments.
            gevent.spawn( wrapped_transaction_f, async_result, interaction,
                          conn, cursor, *args )
            return async_result

        def transaction_f( async_res, sql, conn = None, cursor = None,
                           *args ):
            try:
                if not conn:
                    conn = self.conn_pools[ use_pool ].get()
                if not cursor:
                    cursor = conn.cursor()
                if interaction_args is not None:
                    cursor.execute( sql, interaction_args )
                else:
                    cursor.execute( sql )
                if get_result:
                    res = cursor.fetchall()
                else:
                    res = True
                if is_write and not partial_txn:
                    conn.commit()
                if not partial_txn:
                    cursor.close()
                    async_res.set( res )
                else:
                    async_res.set( { 'result': res,
                                     'connection': conn,
                                     'cursor': cursor } )
            except DatabaseError as e:
                if self.do_log:
                    self.logger.info( "exception: %s", e )
                async_res.set_exception( DBInteractionException( e ) )
            except Exception as e:
                traceback.print_exc( file = sys.stdout )
                try:
                    conn.rollback()
                except Exception as rollback_error:
                    # A failing rollback must not prevent the result from being
                    # set, otherwise the caller would wait forever.
                    if self.do_log:
                        self.logger.info( "rollback failed: %s", rollback_error )
                if self.do_log:
                    self.logger.info( "exception: %s", e )
                async_res.set_exception( DBInteractionException( e ) )
            finally:
                if conn and not partial_txn:
                    self.conn_pools[ use_pool ].put( conn )

        gevent.spawn( transaction_f, async_result, interaction,
                      conn, cursor, *args )
        return async_result

    def listen_on( self, result_queue = None, channel_name = None, pool = None,
                   timeout = None, cancel_event = None ):
        """
        Listen for asynchronous events on a named Channel and pass them to Queue

        Blocks the calling greenlet until `cancel_event` is set or an error
        occurs. Without a `cancel_event` it listens until the greenlet is killed.

        :param Queue result_queue: queue that receives every notification
        :param string channel_name: name of the channel to listen on
        :param string pool: Keyname of the pool to use (default: the default write pool)
        :param timeout: currently unused
        :param Event cancel_event: set this event to stop listening
        :raises DBInteractionException: if the db module is not psycopg2
        """
        if self.db_module != 'psycopg2':
            raise DBInteractionException( "This feature requires PostgreSQL 9.x." )
        use_pool = self.default_write_pool if pool is None else pool
        q = Queue( maxsize = None )
        listener = None
        try:
            listener = PGChannelListener( q, self.conn_pools[ use_pool ], channel_name )
            while cancel_event is None or not cancel_event.is_set():
                try:
                    result_queue.put( q.get_nowait() )
                except QueueEmptyException:
                    # nothing pending: yield to other greenlets for a moment
                    gevent.sleep( 0.001 )
        except Exception as e:
            print( "# FRAKK %s" % ( e, ) )
            if self.do_log:
                self.logger.info( e )
        finally:
            # Also runs when the greenlet is killed. Skipped if the listener
            # could not be created in the first place.
            if listener is not None:
                listener.unregister_queue( id( q ) )
        if self.do_log:
            self.logger.info( "stopped listening on: %s", channel_name )
class DBConnectionPool( object ):
    """
    The Connection Pool

    The pool takes a DSN, the name of the database module (default: psycopg2),
    the pool size and the minimal connection life time as initialization arguments.
    The DSN is the only mandatory argument.
    If you want logging pass `do_log = True` to the constructor.

    The pool provides 2 main functions: get() and put()
    """

    def __init__( self, dsn, db_module = 'psycopg2', pool_size = 10,
                  conn_lifetime = 600, do_log = False ):
        """
        Open `pool_size` connections concurrently and fill the pool with them.

        :param string dsn: DSN handed to the db module's `connect`
        :param string db_module: name of the DB-API module to import and use
        :param int pool_size: number of connections to open
        :param int conn_lifetime: age in seconds after which a connection is recycled on `put` (`None` is stored as 0)
        :param bool do_log: Log to the console or not
        :raises PoolConnectionException: if not all connections could be opened
        """
        # Defined up front so `is_ready` works even if setup fails below.
        self.ready = False
        self.do_log = do_log
        if do_log:
            import logging
            logging.basicConfig( level = logging.INFO, format = "%(asctime)s %(message)s" )
            self.logger = logging.getLogger()
        self.dsn = dsn
        self.db_module = db_module
        self.pool_size = pool_size
        self.CONN_RECYCLE_AFTER = conn_lifetime if conn_lifetime is not None else 0
        self.pool = Queue( self.pool_size )
        # PoolConnection looks the module up in sys.modules, so import it here.
        __import__( db_module )
        # A list comprehension spawns the jobs right away (a lazy `map` would not).
        self.connection_jobs = [ gevent.spawn( self.create_connection )
                                 for _ in range( self.pool_size ) ]
        try:
            gevent.joinall( self.connection_jobs, timeout = 10 )
        except Exception as e:
            raise PoolConnectionException( e )
        # Explicit check instead of `assert`, which is stripped under -O.
        established = self.pool.qsize()
        if established != self.pool_size:
            raise PoolConnectionException( "Could only establish %i of %i connections." % ( established, self.pool_size ) )
        if self.do_log:
            self.logger.info( "$ poolsize: %i" % self.pool.qsize() )
        self.ready = True

    def __del__( self ):
        # The pool may be missing if __init__ failed early.
        pool = getattr( self, 'pool', None )
        if pool is None:
            return
        while not pool.empty():
            conn = pool.get()
            conn.close()

    def create_connection( self ):
        """ Open one new `PoolConnection` and add it to the pool. """
        self.pool.put( PoolConnection( self.db_module, self.dsn ) )

    def get( self, timeout = None, iso_level = ISOLATION_LEVEL_READ_COMMITTED,
             auto_commit = False ):
        """
        Take a connection out of the pool.

        :param timeout: passed to the queue's `get`
        :param iso_level: isolation level to set on the connection if it is not READ COMMITTED
        :param bool auto_commit: currently unused
        :raises PoolConnectionException: if the queue reports `Empty`
        """
        try:
            conn = self.pool.get( timeout = timeout )
            if iso_level != ISOLATION_LEVEL_READ_COMMITTED:
                conn.set_isolation_level( iso_level )
            return conn
        except gevent.queue.Empty as e:
            raise PoolConnectionException( e )

    def put( self, conn, timeout = 1, force_recycle = False ):
        """
        Hand a connection back to the pool.

        A connection younger than `CONN_RECYCLE_AFTER` seconds is reset and
        queued again; otherwise it is closed and replaced by a new one.

        :param PoolConnection conn: the connection to return
        :param timeout: passed to the queue's `put`
        :param bool force_recycle: replace the connection regardless of its age
        :raises PoolConnectionException: if `conn` is not a `PoolConnection` or the queue reports `Full`
        """
        if not isinstance( conn, PoolConnection ):
            raise PoolConnectionException( "Passed object %s is not a PoolConnection." % ( conn, ) )

        # NOTE(review): with CONN_RECYCLE_AFTER == 0 (conn_lifetime = None) this
        # is never true, so every put() recycles the connection - confirm intended.
        still_fresh = ( self.CONN_RECYCLE_AFTER != 0 and
                        time() - conn.PoolConnection_initialized_at < self.CONN_RECYCLE_AFTER )
        if still_fresh and force_recycle == False:
            try:
                conn.reset()
                if conn.isolation_level != ISOLATION_LEVEL_READ_COMMITTED:
                    if self.do_log:
                        self.logger.info( "set ISOLATION_LEVEL_READ_COMMITTED." )
                    conn.set_isolation_level( ISOLATION_LEVEL_READ_COMMITTED )
                    conn.commit()
                self.pool.put( conn, timeout = timeout )
            except gevent.queue.Full as e:
                raise PoolConnectionException( e )
        else:
            if self.do_log:
                self.logger.info( "recycling conn." )
            try:
                conn.reset()
                conn.close()
            except InterfaceError:
                # the connection was already closed - nothing left to clean up
                pass
            # Wait for the replacement so the pool is refilled before returning.
            gevent.spawn( self.create_connection ).join()

    @property
    def qsize( self ):
        """ Number of connections currently waiting in the pool. """
        return self.pool.qsize()

    @property
    def is_ready( self ):
        """ True once all connections were opened successfully. """
        return self.ready == True
class PoolConnection( object ):
    """
    Single connection object for the pool

    On object initialization the object initializes the DB connection.
    Direct access to the object is only possible when the member name
    is prefixed with 'PoolConnection_'. Otherwise member access goes
    to the 'inner' connection object.
    """

    def __init__( self, db_module, dsn, cursor_type = None ):
        """
        :param string db_module: name of the DB-API module to connect with
        :param string dsn: DSN handed to the module's `connect`
        :param cursor_type: cursor class to use instead of the dict-style default
        :raises PoolConnectionException: if the connection cannot be opened
        """
        self.db_module_name = db_module
        self.cursor_type = cursor_type
        # Make sure the module is loaded before looking it up in sys.modules.
        __import__( db_module )
        self.db_module = sys.modules[ db_module ]
        try:
            self.conn = self.PoolConnection_db_module.connect( dsn )
            self.initialized_at = time()
        except Exception as e:
            # keep the underlying cause in the message instead of dropping it
            raise PoolConnectionException( "PoolConnection failed: Could not connect to database. (%s)" % ( e, ) )

    def __getattribute__( self, name ):
        # 'cursor' is the wrapper's own method; 'PoolConnection_<attr>' reads
        # <attr> from the wrapper; everything else goes to the inner connection.
        if name == 'cursor':
            return object.__getattribute__( self, name )
        if name.startswith( 'PoolConnection_' ):
            return object.__getattribute__( self, name[ len( 'PoolConnection_' ): ] )
        return object.__getattribute__( self.PoolConnection_conn, name )

    def cursor( self, *args, **kwargs ):
        """
        Create a cursor on the inner connection.

        For psycopg2 and MySQLdb a dict-style cursor is requested unless a
        `cursor_type` was given at construction. Any other module gets a plain
        `cursor( *args, **kwargs )` call.
        """
        module_name = self.PoolConnection_db_module_name
        cursor_type = self.PoolConnection_cursor_type
        if module_name == 'psycopg2':
            kwargs[ 'cursor_factory' ] = RealDictCursor if cursor_type is None else cursor_type
        elif module_name == 'MySQLdb':
            if cursor_type is None:
                # look the class up on the loaded module; no `MySQLdb` name is imported in this file
                cursor_type = self.PoolConnection_db_module.cursors.DictCursor
            # `args` is a tuple, so extend it by concatenation
            args = args + ( cursor_type, )
        return self.PoolConnection_conn.cursor( *args, **kwargs )