Source code for gDBPool.PGChannelListener
# -*- coding: utf-8 -*-
# Copyright 2011-2012 Florian von Bock (f at vonbock dot info)
#
# gDBPool - db connection pooling for gevent
#
# PGChannelListener - subscribes to a (NOTIFY) channel on postgres
# via LISTEN and streams the events to the subscribes result_queues
__author__ = "Florian von Bock"
__email__ = "f at vonbock dot info"
__version__ = "0.1.2"
import gevent
from gevent import monkey; monkey.patch_all()
import psycopg2
from psyco_ge import make_psycopg_green; make_psycopg_green()
from gevent.select import select
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
class PGChannelListenerException( Exception ):
pass
[docs]def pipe_colon_unmarshall( payload_string ):
"""
:param string payload_string:
:returns: dict -- unmarshalled NOTIFY data
"""
payload_data = {}
try:
for kv_pair in payload_string.split( '|' ):
k, v = kv_pair.split( ':', 1 )
payload_data[ k ] = v
except Exception, e:
raise PGChannelListenerException( "Unmarshalling of the LISTEN payload failed: %s" % ( e.message, ) )
return payload_data
[docs]class PGChannelListener( object ):
"""
A Listener for Postgres LISTEN/NOTIFY channels using gevent.
For each channel there will be one `:class:PGChannelListener` instance that fans
notifications out to the subscribed `:class:Queue`}
"""
def __new__( cls, q, pool, channel_name, *args, **kwargs ):
if not hasattr( cls, '_instances' ):
cls._instances = {}
if not cls._instances.has_key( channel_name ):
cls._instances[ channel_name ] = object.__new__( cls )
cls._instances[ channel_name ].subscribers = {}
cls._instances[ channel_name ].pool = pool
cls._instances[ channel_name ].conn = pool.get( iso_level = ISOLATION_LEVEL_AUTOCOMMIT )
cls._instances[ channel_name ].cur = None
cls._instances[ channel_name ].channel_name = channel_name
gevent.spawn( cls._instances[ channel_name ].listen )
# hai hai... using id for this kind of stuff is sort of dangerous.
# will come up with something less pointing gun at foot(TM). later. (TM).
cls._instances[ channel_name ].subscribers[ id( q ) ] = q
return cls._instances[ channel_name ]
def __init__( self, q, pool, channel_name ):
"""
Create a Listener for a channel_name and pass the notifications to q result `Queue`.
:param gevent.Queue q: Queue to pass asynchronous payloads to
:param gDBPool.DBConnectionPool pool: Connection pool to get a connection from and execute ``LISTEN <channel_name>;`` on if no other instance listens on that channel already. In the latter case the q is just being subscribed to the channel.
:param string channel_name: channel to listen on. (``LISTEN <channel_name>;``)
:returns: -- PGChannelListener instance
"""
pass
def __del__( self ):
print "** __del__", self.channel_name
del PGChannelListener._instances[ self.channel_name ]
[docs] def unregister_queue( self, q_id ):
"""
Unregister a Queue from its channel.
"""
if self.stop_event.is_set():
return
del self.subscribers[ q_id ]
if len( self.subscribers ) == 0:
self.stop_event.set()
self.cur.close()
self.pool.put( self.conn )
[docs] def listen( self, unmarshaller = pipe_colon_unmarshall ):
"""
Subscriber to the channel and send notification payloads to the
results Queue.
:param function unmarshaller: Function to pass the `notify.payload` string into for unmarshalling into python object (ie. dict) data
"""
self.stop_event = stop_event = gevent.event.Event()
self.cur = self.conn.cursor()
self.cur.execute( "LISTEN %s;" % ( self.channel_name, ) )
while 1:
if self.stop_event.is_set():
return
if select( [ self.conn ], [], [] ) == ( [], [], [] ):
print "LISTEN timeout."
else:
if self.stop_event.is_set():
return
self.conn.poll()
while self.conn.notifies:
if self.stop_event.is_set():
return
notify = self.conn.notifies.pop()
payload_data = unmarshaller( notify.payload )
for q_id in self.subscribers.iterkeys():
self.subscribers[ q_id ].put( payload_data )