Source code for pubsub.utils.notification

"""
Provide an interface class for handling pubsub notification messages, 
and an example class (though very useful in practice) showing how to 
use it. 

Notification messages are generated by pubsub

- if a handler has been configured via pub.addNotificationHandler()
- when pubsub does certain tasks, such as when a listener subscribes to
  or unsubscribes from a topic
  
Derive from INotificationHandler (or from IgnoreNotificationsMixin, 
if only a few notification types are of interest) to handle 
notification events from various parts of pubsub. E.g. when a 
listener subscribes, unsubscribes, or dies, a notification handler, 
if you specified one via pub.addNotificationHandler(), is given the 
relevant information. 

:copyright: Copyright since 2006 by Oliver Schoenborn, all rights reserved.
:license: BSD, see LICENSE_BSD_Simple.txt for details.
"""

from typing import List, Mapping, Any, TextIO

from ..core import TopicManager, INotificationHandler, Listener, Topic, Publisher


class IgnoreNotificationsMixin(INotificationHandler):
    """
    Notification handler base whose every callback is a no-op.

    Derive your notification handler from this class when it only cares
    about one or two types of pubsub events: override the methods of
    interest, and all remaining notifications are silently ignored.
    """

    def notifySubscribe(self, pubListener: Listener, topicObj: Topic, newSub: bool):
        """Ignore the notification that a listener subscribed to a topic."""

    def notifyUnsubscribe(self, pubListener: Listener, topicObj: Topic):
        """Ignore the notification that a listener unsubscribed from a topic."""

    def notifyDeadListener(self, pubListener: Listener, topicObj: Topic):
        """Ignore the notification that a listener of a topic died."""

    def notifySend(self, stage: str, topicObj: Topic, pubListener: Listener = None):
        """Ignore the notification about a stage of a message send."""

    def notifyNewTopic(self, topicObj: Topic, description: str, required: List[str], argsDocs: Mapping[str, str]):
        """Ignore the notification that a topic was created."""

    def notifyDelTopic(self, topicName: str):
        """Ignore the notification that a topic was deleted."""
class NotifyByWriteFile(INotificationHandler):
    """
    Write a one-line message to a file object (stdout by default) each
    time a notification is received.
    """

    # Prefix put at the start of every line when none is given to __init__.
    defaultPrefix = 'PUBSUB:'

    def __init__(self, fileObj: TextIO = None, prefix: str = None):
        """
        Will write to stdout unless fileObj given. Will use defaultPrefix as
        prefix for each line output, unless prefix specified (an empty
        prefix also falls back to defaultPrefix).
        """
        self.__pre = prefix or self.defaultPrefix

        if fileObj is None:
            # imported here so sys is only needed when falling back to stdout
            import sys
            self.__fileObj = sys.stdout
        else:
            self.__fileObj = fileObj

    def changeFile(self, fileObj):
        """Redirect all subsequent output to fileObj."""
        self.__fileObj = fileObj

    def notifySubscribe(self, pubListener: Listener, topicObj: Topic, newSub: bool):
        """Report a subscription; newSub False means it was redundant."""
        if newSub:
            msg = '%s Subscribed listener "%s" to topic "%s"\n'
        else:
            msg = '%s Subscription of "%s" to topic "%s" redundant\n'

        msg = msg % (self.__pre, pubListener, topicObj.getName())
        self.__fileObj.write(msg)

    def notifyUnsubscribe(self, pubListener: Listener, topicObj: Topic):
        """Report that pubListener was unsubscribed from topicObj."""
        msg = '%s Unsubscribed listener "%s" from topic "%s"\n'
        msg = msg % (self.__pre, pubListener, topicObj.getName())
        self.__fileObj.write(msg)

    def notifyDeadListener(self, pubListener: Listener, topicObj: Topic):
        """Report that pubListener, a listener of topicObj, has died."""
        msg = '%s Listener "%s" of Topic "%s" has died\n' \
              % (self.__pre, pubListener, topicObj.getName())
        # a bug apparently: sometimes on exit, the stream gets closed before
        # and leads to a TypeError involving NoneType
        self.__fileObj.write(msg)

    def notifySend(self, stage: str, topicObj: Topic, pubListener: Listener = None):
        """
        Report one stage of a message send: 'in' names the listener being
        sent to, 'pre' marks the start; any other stage value is reported
        as the end of the send.
        """
        if stage == 'in':
            msg = '%s Sending message of topic "%s" to listener %s\n' % (self.__pre, topicObj.getName(), pubListener)
        elif stage == 'pre':
            msg = '%s Start sending message of topic "%s"\n' % (self.__pre, topicObj.getName())
        else:
            msg = '%s Done sending message of topic "%s"\n' % (self.__pre, topicObj.getName())
        self.__fileObj.write(msg)

    def notifyNewTopic(self, topicObj: Topic, description: str, required: List[str], argsDocs: Mapping[str, str]):
        """Report that topicObj was created (only its name is written)."""
        msg = '%s New topic "%s" created\n' % (self.__pre, topicObj.getName())
        self.__fileObj.write(msg)

    def notifyDelTopic(self, topicName: str):
        """Report that the topic named topicName was destroyed."""
        msg = '%s Topic "%s" destroyed\n' % (self.__pre, topicName)
        self.__fileObj.write(msg)


class NotifyByPubsubMessage(INotificationHandler):
    """
    Handle pubsub notification messages by generating
    messages of a 'pubsub.' subtopic. Also provides
    an example of how to create a notification handler.

    Use it by calling::

        import pubsub.utils
        pubsub.utils.useNotifyByPubsubMessage()
        ...
        pub.setNotificationFlags(...) # optional

    E.g. whenever a listener is unsubscribed, a 'pubsub.unsubscribe'
    message is generated. If you have subscribed a listener of
    this topic, your listener will be notified of what listener
    unsubscribed from what topic.
    """

    # Name of the root topic under which all notification topics live.
    topicRoot = 'pubsub'

    # Maps notification type -> full name of the topic used to publish it.
    topics = dict(
        send='%s.sendMessage' % topicRoot,
        subscribe='%s.subscribe' % topicRoot,
        unsubscribe='%s.unsubscribe' % topicRoot,
        newTopic='%s.newTopic' % topicRoot,
        delTopic='%s.delTopic' % topicRoot,
        deadListener='%s.deadListener' % topicRoot)

    def __init__(self, topicMgr: TopicManager = None):
        """
        Create the handler. If topicMgr is given, the notification topics
        are set up immediately; otherwise the handler ignores every
        notification until createNotificationTopics() is called.
        """
        self._pubTopic = None
        self.__sending = False  # used to guard against infinite loop
        if topicMgr is not None:
            self.createNotificationTopics(topicMgr)

    def createNotificationTopics(self, topicMgr: TopicManager):
        """
        Create the notification topics. The root of the topics created is
        self.topicRoot. The topicMgr is (usually) pub.topicMgr.

        If the root topic already exists it is reused as is (its subtopics
        are assumed to have been created along with it -- TODO confirm for
        roots not created by this class).
        """
        # see if the special topics have already been defined
        try:
            # Keep the existing root: without it self._pubTopic would stay
            # None and every notify*() method would return early, leaving
            # this handler silently inactive.
            self._pubTopic = topicMgr.getTopic(self.topicRoot)

        except ValueError:
            # no, so create them
            self._pubTopic = topicMgr.getOrCreateTopic(self.topicRoot)
            self._pubTopic.setDescription('root of all pubsub-specific topics')
            _createTopics(self.topics, topicMgr)

    def notifySubscribe(self, pubListener: Listener, topicObj: Topic, newSub: bool):
        """
        Publish a 'subscribe' notification message, unless the subscription
        is to the 'subscribe' notification topic itself.
        """
        if (self._pubTopic is None) or self.__sending:
            return

        pubTopic = self._pubTopic.getSubtopic('subscribe')
        if topicObj is not pubTopic:
            kwargs = dict(listener=pubListener, topic=topicObj, newSub=newSub)
            self.__doNotification(pubTopic, kwargs)

    def notifyUnsubscribe(self, pubListener: Listener, topicObj: Topic):
        """
        Publish an 'unsubscribe' notification message, unless the listener
        was unsubscribed from the 'unsubscribe' notification topic itself.
        """
        if (self._pubTopic is None) or self.__sending:
            return

        pubTopic = self._pubTopic.getSubtopic('unsubscribe')
        if topicObj is not pubTopic:
            kwargs = dict(
                topic=topicObj,
                listenerRaw=pubListener.getCallable(),
                listener=pubListener)
            self.__doNotification(pubTopic, kwargs)

    def notifyDeadListener(self, pubListener: Listener, topicObj: Topic):
        """Publish a 'deadListener' notification message."""
        if (self._pubTopic is None) or self.__sending:
            return

        pubTopic = self._pubTopic.getSubtopic('deadListener')
        kwargs = dict(topic=topicObj, listener=pubListener)
        self.__doNotification(pubTopic, kwargs)

    def notifySend(self, stage: str, topicObj: Topic, pubListener: Listener = None):
        """
        Publish a 'sendMessage' notification message carrying the topic and
        the stage; the stage value is forwarded as received (the topic's
        argument description lists "pre", "post" and "in") and pubListener
        is not included in the message. Note that any pubsub sendMessage
        operation resulting from this notification (which sends a message;
        listener could handle by sending another message!) will NOT
        themselves lead to a send notification.

        :raises ValueError: if stage is 'pre' and topicObj is the
            'sendMessage' notification topic itself.
        """
        if (self._pubTopic is None) or self.__sending:
            return

        sendMsgTopic = self._pubTopic.getSubtopic('sendMessage')
        if stage == 'pre' and (topicObj is sendMsgTopic):
            msg = 'Not allowed to send messages of topic %s' % topicObj.getName()
            raise ValueError(msg)

        self.__doNotification(sendMsgTopic, dict(topic=topicObj, stage=stage))

    def notifyNewTopic(self, topicObj: Topic, description: str, required: List[str], argsDocs: Mapping[str, str]):
        """Publish a 'newTopic' notification message describing topicObj."""
        if (self._pubTopic is None) or self.__sending:
            return

        pubTopic = self._pubTopic.getSubtopic('newTopic')
        kwargs = dict(topic=topicObj, description=description, required=required, args=argsDocs)
        self.__doNotification(pubTopic, kwargs)

    def notifyDelTopic(self, topicName: str):
        """Publish a 'delTopic' notification message naming the topic."""
        if (self._pubTopic is None) or self.__sending:
            return

        pubTopic = self._pubTopic.getSubtopic('delTopic')
        self.__doNotification(pubTopic, dict(name=topicName))

    def __doNotification(self, pubTopic: Topic, kwargs: Mapping[str, Any]):
        """
        Publish kwargs on pubTopic with the sending flag raised, so that
        notifications triggered by this very publish are ignored by the
        notify*() methods; the flag is lowered even if publish raises.
        """
        self.__sending = True
        try:
            pubTopic.publish(**kwargs)
        finally:
            self.__sending = False


def _createTopics(topicMap: Mapping[str, str], topicMgr: TopicManager):
    """
    Create notification topics. These are used when
    some of the notification flags have been set to True (see
    pub.setNotificationFlags()). The topicMap is a dict where key is
    the notification type, and value is the topic name to create.
    Notification type is a string in ('send', 'subscribe',
    'unsubscribe', 'newTopic', 'delTopic', 'deadListener').
    """

    def newTopic(_name, _desc, _required=None, **argsDocs):
        # get/create the topic, then give it a description and the
        # documentation of its message arguments
        topic = topicMgr.getOrCreateTopic(_name)
        topic.setDescription(_desc)
        topic.setMsgArgSpec(argsDocs, _required)

    newTopic(
        _name=topicMap['subscribe'],
        _desc='whenever a listener is subscribed to a topic',
        topic='topic that listener has subscribed to',
        listener='instance of pub.Listener containing listener',
        newSub='false if listener was already subscribed, true otherwise')

    newTopic(
        _name=topicMap['unsubscribe'],
        _desc='whenever a listener is unsubscribed from a topic',
        topic='instance of Topic that listener has been unsubscribed from',
        listener='instance of pub.Listener unsubscribed; None if listener not found',
        listenerRaw='listener unsubscribed')

    newTopic(
        _name=topicMap['send'],
        _desc='sent at beginning and end of sendMessage()',
        topic='instance of topic for message being sent',
        stage='stage of send operation: "pre" or "post" or "in"',
        listener='which listener being sent to')

    newTopic(
        _name=topicMap['newTopic'],
        _desc='whenever a new topic is defined',
        topic='instance of Topic created',
        description='description of topic (use)',
        args='the argument names/descriptions for arguments that listeners must accept',
        required='which args are required (all others are optional)')

    newTopic(
        _name=topicMap['delTopic'],
        _desc='whenever a topic is deleted',
        name='full name of the Topic instance that was destroyed')

    newTopic(
        _name=topicMap['deadListener'],
        _desc='whenever a listener dies without having unsubscribed',
        topic='instance of Topic that listener was subscribed to',
        listener='instance of pub.Listener containing dead listener')
def useNotifyByPubsubMessage(publisher: Publisher = None, all: bool = True, **kwargs):
    """
    Will cause all of pubsub's notifications of pubsub "actions" (such as
    new topic created, message sent, listener subscribed, etc) to be sent
    out as messages. Topic will be 'pubsub' subtopics, such as
    'pubsub.newTopic', 'pubsub.delTopic', 'pubsub.sendMessage', etc.

    The 'all' and kwargs args are the same as pubsub's setNotificationFlags(),
    except that 'all' defaults to True.

    The publisher is rarely needed; when it is omitted, the default
    publisher of this package's pub module is used:

    * The publisher must be specified if pubsub is not installed
      on the system search path (ie from pubsub import ... would fail or
      import wrong pubsub -- such as if pubsub is within wxPython's
      wx.lib package). Then publisher is the one to install the
      notification handler on::

          from wx.lib.pubsub import pub
          from wx.lib.pubsub.utils import notification
          notification.useNotifyByPubsubMessage()

    """
    if publisher is None:
        # deferred import: only needed when falling back to the default
        from .. import pub
        publisher = pub.getDefaultPublisher()

    handler = NotifyByPubsubMessage(publisher.getTopicMgr())
    publisher.addNotificationHandler(handler)
    publisher.setNotificationFlags(all=all, **kwargs)
def useNotifyByWriteFile(fileObj: TextIO = None, prefix: str = None,
                         publisher: Publisher = None, all: bool = True, **kwargs):
    """
    Will cause all pubsub notifications of pubsub "actions" (such as
    new topic created, message sent, listener died etc) to be written to
    specified file (or stdout if none given). The fileObj need only
    provide a 'write(string)' method.

    The first two arguments are the same as those of NotifyByWriteFile
    constructor. The 'all' and kwargs arguments are those of pubsub's
    setNotificationFlags(), except that 'all' defaults to True.

    When publisher is omitted, the default publisher of this package's
    pub module is used. See useNotifyByPubsubMessage() for an explanation
    of when a publisher must be given (typically only if pubsub is inside
    wxPython's wx.lib).
    """
    if publisher is None:
        # deferred import: only needed when falling back to the default
        from .. import pub
        publisher = pub.getDefaultPublisher()

    handler = NotifyByWriteFile(fileObj, prefix)
    publisher.addNotificationHandler(handler)
    publisher.setNotificationFlags(all=all, **kwargs)