# Source code for gDBPool.interaction_pool

# -*- coding: utf-8 -*-

# Copyright 2011-2012 Florian von Bock (f at vonbock dot info)
#
# gDBPool - db connection pooling for gevent

__author__ = "Florian von Bock"
__email__ = "f at vonbock dot info"
__version__ = "0.1.2"


import gevent
from gevent import monkey; monkey.patch_all()

import psycopg2
import sys, traceback

from psyco_ge import make_psycopg_green; make_psycopg_green()
assert 'gDBPool.psyco_ge' in sys.modules.keys()

from gevent.queue import Queue, Empty as QueueEmptyException
# from gevent.pool import Pool as GreenPool
from gevent.event import AsyncResult
# from time import time
from types import FunctionType, MethodType, StringType
from gDBPoolError import DBInteractionException, DBPoolConnectionException, PoolConnectionException, StreamEndException
from PGChannelListener import PGChannelListener
# from psycopg2.extras import RealDictCursor
# from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_UNCOMMITTED, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_REPEATABLE_READ, ISOLATION_LEVEL_SERIALIZABLE
from psycopg2 import DatabaseError
psycopg2.extensions.register_type( psycopg2.extensions.UNICODE )
psycopg2.extensions.register_type( psycopg2.extensions.UNICODEARRAY )
from inspect import getargspec

from connection_pool import DBConnectionPool

# TODO: make this an option...
# DEC2FLOAT = psycopg2.extensions.new_type(
#     psycopg2.extensions.DECIMAL.values,
#     'DEC2FLOAT',
#     lambda value, curs: float( value ) if value is not None else None )
# psycopg2.extensions.register_type( DEC2FLOAT )


class DBInteractionPool( object ):
    """
    The DBInteractionPool manages `DBConnectionPool` instances and can run
    queries or functions (ie. several queries wrapped in a function) on one
    of these pools.

    The class is a singleton: `__new__` always hands back the same instance.

    .. note:: `__init__` still runs on every `DBInteractionPool( ... )` call
        and re-initialises the shared instance (including its pools).
    """

    def __new__( cls, dsn, *args, **kwargs ):
        # Create the shared instance on first use only; later calls reuse it.
        if not hasattr( cls, '_instance' ):
            cls._instance = object.__new__( cls )
        return cls._instance

    def __init__( self, dsn, pool_size = 10, pool_name = 'default', do_log = False ):
        """
        :param string dsn: DSN for the default `class:DBConnectionPool`
        :param int pool_size: Poolsize of the first/default `class:DBConnectionPool`
        :param string pool_name: Keyname for the first/default `class:DBConnectionPool`
        :param bool do_log: Log to the console or not
        """

        self.do_log = do_log
        self.logger = None
        # Any truthy value enables logging, so that every later
        # `if self.do_log:` guard is guaranteed to find a logger.
        if do_log:
            import logging
            logging.basicConfig( level = logging.INFO, format = "%(asctime)s %(message)s" )
            self.logger = logging.getLogger()
        self.request_queue = Queue( maxsize = None )
        self.db_module = 'psycopg2'
        self.conn_pools = {}
        self.default_write_pool = None
        self.default_read_pool = None
        self.default_pool = None
        # The first pool serves as default for reads and writes alike.
        self.add_pool( dsn = dsn, pool_name = pool_name, pool_size = pool_size,
                       default_write_pool = True, default_read_pool = True,
                       db_module = self.db_module )

    def __del__( self ):
        # getattr() guards: __del__ may run on an instance whose __init__
        # never completed.
        if getattr( self, 'do_log', False ):
            self.logger.info( "__del__ DBInteractionPool" )
        for conn_pool in getattr( self, 'conn_pools', {} ).values():
            conn_pool.__del__()

    def __call__( self, *args, **kwargs ):
        """ syntactic sugar for `:ref:DBInteractionPool.run` """
        return self.run( *args, **kwargs )

    def add_pool( self, dsn = None, pool_name = None, pool_size = 10, default_write_pool = False, default_read_pool = False, default_pool = False, db_module = 'psycopg2' ):
        """
        Add a named `:class:DBConnectionPool`

        :param string dsn: dsn
        :param string pool_name: a name for the pool to identify it inside the DBInteractionPool
        :param int pool_size: Number of connections the pool should have.
        :param bool default_write_pool: Should the added pool be used as the default pool for write operations? (a default write pool also becomes the default pool)
        :param bool default_read_pool: Should the added pool be used as the default pool for read operations?
        :param bool default_pool: Should the added pool be used as the default pool? (*must* be a write pool)
        :param string db_module: name of the DB-API module to use
        :raises DBInteractionException: if a pool named `pool_name` already exists

        .. note::
            db_module right now ONLY supports psycopg2 and the option most likely will be removed in the future
        """

        if pool_name in self.conn_pools:
            raise DBInteractionException( "Already have a pool with the name: %s. ConnectionPool not added!" % ( pool_name, ) )

        self.conn_pools[ pool_name ] = DBConnectionPool( dsn, db_module = db_module, pool_size = pool_size, do_log = self.do_log )
        if default_write_pool:
            self.default_write_pool = pool_name
        # A default write pool has always doubled as the default pool; the
        # explicit `default_pool` flag is honoured as well.
        if default_write_pool or default_pool:
            self.default_pool = pool_name
        if default_read_pool:
            self.default_read_pool = pool_name

    @property
    def pool( self ):
        """ The `DBConnectionPool` registered as the default pool. """
        return self.conn_pools[ self.default_pool ]

    def run( self, interaction = None, interaction_args = None, get_result = True, is_write = True, pool = None, conn = None, cursor = None, partial_txn = False, dry_run = False, *args, **kwargs ):
        """
        Run an interaction on one of the managed `:class:DBConnectionPool` pools.

        :param function|method interaction: The interaction to run. Either a SQL string or a function that takes at least a parameter `conn`.
        :param interaction_args: Query parameters handed to `cursor.execute` together with the SQL string (only used for SQL string interactions).
        :param bool get_result: call and return cursor.fetchall() when True - otherwise just return True as result if no exception was raised.
        :param bool is_write: If the interaction has no side-effects set to `False`. Without naming a pool the default_read pool would be used.
        :param string pool: Keyname of the pool to get the a connection from
        :param connection conn: Pass in a `Connection` instead of getting one from the pool. (ie. for locks in transactions that span several interactions. Use `partial_txn = True` to retrieve the Connection and then pass it into the next interaction run.)
        :param cursor cursor: Pass in a `Cursor` instead of getting one from the `Connection` (ie. for locks in transactions that span several interactions. Use `partial_txn = True` to retrieve the Cursor and then pass it into the next interaction run.)
        :param bool partial_txn: Return connection and cursor after executing the interaction (ie. for locks in transactions that span several interactions)
        :param bool dry_run: Run the query with `mogrify` instead of `execute`; the result is the query that would have run. (Only applies to query interactions)
        :param list args: positional arguments passed on to a function interaction
        :param dict kwargs: keyword arguments passed on to a function interaction
        :raises DBInteractionException: if `interaction` is neither a function, a method, nor a string
        :returns: -- a :class:`gevent.AsyncResult` that will hold the result of the interaction once it finished. When `partial_txn = True` the result is a dict with the keys `result`, `connection`, and `cursor` (use for locking with SELECT FOR UPDATE). Failures are delivered through the AsyncResult as `DBInteractionException`.
        """

        # Validate first so an unsupported interaction cannot leak a
        # connection that was already taken from the pool.
        is_function = isinstance( interaction, ( FunctionType, MethodType ) )
        if not is_function and not isinstance( interaction, StringType ):
            raise DBInteractionException( "%s cannot be run. run() only accepts FunctionTypes, MethodType, and StringTypes" % ( interaction, ) )

        async_result = AsyncResult()
        if pool is not None:
            use_pool = pool
        elif is_write:
            use_pool = self.default_write_pool
        else:
            use_pool = self.default_read_pool

        # Remember whether the connection came from the pool in this call:
        # only then may it be handed back here after a failed partial_txn.
        owns_conn = not conn
        if owns_conn:
            conn = self.conn_pools[ use_pool ].get()

        def fail( exc ):
            # Deliver the failure to whoever waits on the AsyncResult.
            if self.do_log:
                self.logger.info( "exception: %s", exc )
            async_result.set_exception( DBInteractionException( exc ) )

        def release( failed ):
            # A successful partial_txn hands the connection to the caller. If
            # it failed, the caller never receives a connection acquired
            # here, so it has to go back to the pool.
            if conn and ( not partial_txn or ( failed and owns_conn ) ):
                self.conn_pools[ use_pool ].put( conn )

        def wrapped_transaction_f():
            failed = False
            try:
                kwargs[ 'conn' ] = conn
                if cursor:
                    kwargs[ 'cursor' ] = cursor
                elif 'cursor' in getargspec( interaction )[ 0 ]:
                    # Only create a cursor if the interaction asks for one.
                    kwargs[ 'cursor' ] = conn.cursor()
                res = interaction( *args, **kwargs )
                if partial_txn:
                    # 'cursor' is None when the interaction did not use one.
                    async_result.set( { 'result': res, 'connection': conn, 'cursor': kwargs.get( 'cursor' ) } )
                else:
                    async_result.set( res )
                    if cursor and not cursor.closed:
                        cursor.close()
            except Exception as e:
                failed = True
                fail( e )
            finally:
                release( failed )

        def transaction_f():
            failed = False
            try:
                cur = cursor if cursor else conn.cursor()
                if interaction_args is None:
                    query_args = ( interaction, )
                else:
                    query_args = ( interaction, interaction_args )
                if dry_run:
                    # Nothing is executed, so there is nothing to commit.
                    res = cur.mogrify( *query_args )
                    if self.do_log:
                        self.logger.info( "dry_run: %s", res )
                else:
                    cur.execute( *query_args )
                    res = cur.fetchall() if get_result else True
                    if is_write and not partial_txn:
                        conn.commit()
                if partial_txn:
                    async_result.set( { 'result': res, 'connection': conn, 'cursor': cur } )
                else:
                    cur.close()
                    async_result.set( res )
            except DatabaseError as e:
                failed = True
                fail( e )
            except Exception as e:
                failed = True
                traceback.print_exc( file = sys.stdout )
                fail( e )
            finally:
                release( failed )

        # The workers read everything they need from this scope. This avoids
        # the positional/keyword clash the previous spawn call caused when
        # extra positional arguments were passed.
        gevent.spawn( wrapped_transaction_f if is_function else transaction_f )
        return async_result

    def listen_on( self, result_queue = None, channel_name = None, pool = None, cancel_event = None ):
        """
        Listen for asynchronous events on a named Channel and pass them to the result_queue

        :param gevent.Queue result_queue: The :class:`gevent.Queue` to pass event payloads to
        :param string channel_name: Name of the channel to LISTEN on
        :param string pool: Name of the pool to get the connection from
        :param gevent.Event cancel_event: A :class:`gevent.Event` which will break the listening loop when set. Without one the loop runs until an error occurs or the greenlet is killed.
        :raises DBInteractionException: if the db module is not psycopg2 or no `result_queue` was given
        """

        if self.db_module != 'psycopg2':
            raise DBInteractionException( "This feature requires PostgreSQL 9.x." )
        if result_queue is None:
            raise DBInteractionException( "listen_on() requires a result_queue to pass events to." )
        use_pool = self.default_write_pool if pool is None else pool
        q = Queue( maxsize = None )
        listener = None
        try:
            listener = PGChannelListener( q, self.conn_pools[ use_pool ], channel_name )
            while cancel_event is None or not cancel_event.is_set():
                try:
                    result_queue.put( q.get_nowait() )
                except QueueEmptyException:
                    # Nothing queued yet: yield to other greenlets for a moment.
                    gevent.sleep( 0.001 )
        except Exception as e:
            print( "# FRAKK %s" % ( e, ) )
            if self.do_log:
                self.logger.info( e )
        finally:
            # Only unregister if the listener was actually created.
            if listener is not None:
                listener.unregister_queue( id( q ) )
            if self.do_log:
                self.logger.info( "stopped listening on: %s", channel_name )