"""
:copyright: Copyright since 2006 by Oliver Schoenborn, all rights reserved.
:license: BSD, see LICENSE_BSD_Simple.txt for details.
"""
from typing import List, Mapping
from .listener import Listener
from .topicobj import Topic
[docs]
class INotificationHandler:
"""
Defines the interface expected by pubsub for pubsub activity
notifications. Any instance that supports the same methods, or
derives from this class, will work as a notification handler
for pubsub events (see pub.addNotificationHandler).
"""
def notifySubscribe(self, pubListener: Listener, topicObj: Topic, newSub: bool):
"""
Called when a listener is subscribed to a topic.
:param pubListener: the pubsub.core.Listener that wraps subscribed listener.
:param topicObj: the pubsub.core.Topic object subscribed to.
:param newSub: false if pubListener was already subscribed.
"""
raise NotImplementedError
def notifyUnsubscribe(self, pubListener: Listener, topicObj: Topic):
"""
Called when a listener is unsubscribed from given topic.
:param pubListener: the pubsub.core.Listener that wraps unsubscribed listener.
:param topicObj: the pubsub.core.Topic object unsubscribed from.
"""
raise NotImplementedError
def notifyDeadListener(self, pubListener: Listener, topicObj: Topic):
"""
Called when a listener has been garbage collected.
:param pubListener: the pubsub.core.Listener that wraps GC'd listener.
:param topicObj: the pubsub.core.Topic object it was subscribed to.
"""
raise NotImplementedError
def notifySend(self, stage: str, topicObj: Topic, pubListener: Listener = None):
"""
Called multiple times during a sendMessage: once before message
sending has started (pre), once for each listener about to be sent the
message, and once after all listeners have received the message (post).
:param stage: 'pre', 'post', or 'loop'.
:param topicObj: the Topic object for the message.
:param pubListener: None for pre and post stages; for loop, the listener
that is about to be sent the message.
"""
raise NotImplementedError
def notifyNewTopic(self, topicObj: Topic, description: str, required: List[str], argsDocs: Mapping[str, str]):
"""
Called whenever a new topic is added to the topic tree.
:param topicObj: the Topic object for the message.
:param description: docstring for the topic.
:param required: list of message data names (keys in argsDocs) that are required.
:param argsDocs: dictionary of all message data names, with the
corresponding docstring.
"""
raise NotImplementedError
def notifyDelTopic(self, topicName: str):
"""
Called whenever a topic is removed from topic tree.
:param topicName: name of topic removed.
"""
raise NotImplementedError
class NotificationMgr:
"""
Manages notifications for tracing pubsub activity. When pubsub takes a
certain action such as sending a message or creating a topic, and
the notification flag for that activity is True, all registered
notification handlers get corresponding method called with information
about the activity, such as which listener subscribed to which topic.
See INotificationHandler for which method gets called for each activity.
If more than one notification handler has been registered, the order in
which they are notified is unspecified (do not rely on it).
Note that this manager automatically unregisters all handlers when
the Python interpreter exits, to help avoid NoneType exceptions during
shutdown. This "shutdown" starts when the last line of app "main" has
executed; the Python interpreter then starts cleaning up, garbage
collecting everything, which could lead to various pubsub notifications
-- by then they should be of no interest -- such as dead
listeners, etc.
"""
def __init__(self, notificationHandler: INotificationHandler = None):
self.__notifyOnSend = False
self.__notifyOnSubscribe = False
self.__notifyOnUnsubscribe = False
self.__notifyOnNewTopic = False
self.__notifyOnDelTopic = False
self.__notifyOnDeadListener = False
self.__handlers = []
if notificationHandler is not None:
self.addHandler(notificationHandler)
self.__atExitRegistered = False
def addHandler(self, handler: INotificationHandler):
if not self.__atExitRegistered:
self.__registerForAppExit()
self.__handlers.append(handler)
def getHandlers(self) -> List[INotificationHandler]:
return self.__handlers[:]
def clearHandlers(self):
self.__handlers = []
def notifySubscribe(self, *args, **kwargs):
if self.__notifyOnSubscribe and self.__handlers:
for handler in self.__handlers:
handler.notifySubscribe(*args, **kwargs)
def notifyUnsubscribe(self, *args, **kwargs):
if self.__notifyOnUnsubscribe and self.__handlers:
for handler in self.__handlers:
handler.notifyUnsubscribe(*args, **kwargs)
def notifySend(self, *args, **kwargs):
if self.__notifyOnSend and self.__handlers:
for handler in self.__handlers:
handler.notifySend(*args, **kwargs)
def notifyNewTopic(self, *args, **kwargs):
if self.__notifyOnNewTopic and self.__handlers:
for handler in self.__handlers:
handler.notifyNewTopic(*args, **kwargs)
def notifyDelTopic(self, *args, **kwargs):
if self.__notifyOnDelTopic and self.__handlers:
for handler in self.__handlers:
handler.notifyDelTopic(*args, **kwargs)
def notifyDeadListener(self, *args, **kwargs):
if self.__notifyOnDeadListener and self.__handlers:
for handler in self.__handlers:
handler.notifyDeadListener(*args, **kwargs)
def getFlagStates(self) -> Mapping[str, bool]:
"""Return state of each notification flag, as a dict."""
return dict(
subscribe=self.__notifyOnSubscribe,
unsubscribe=self.__notifyOnUnsubscribe,
deadListener=self.__notifyOnDeadListener,
sendMessage=self.__notifyOnSend,
newTopic=self.__notifyOnNewTopic,
delTopic=self.__notifyOnDelTopic,
)
def setFlagStates(self, subscribe: bool = None, unsubscribe: bool = None, deadListener: bool = None,
sendMessage: bool = None, newTopic: bool = None, delTopic: bool = None, all: bool = None):
"""
Set the notification flag on/off for various aspects of pubsub.
The kwargs that are None are left at their current value. The 'all',
if not None, is set first. E.g.
mgr.setFlagStates(all=True, delTopic=False)
will toggle all notifications on, but will turn off the 'delTopic'
notification.
"""
if all is not None:
# ignore all other arg settings, and set all of them to true:
numArgs = 7 # how many args in this method
self.setFlagStates(all=None, *((numArgs - 1) * [all]))
if sendMessage is not None:
self.__notifyOnSend = sendMessage
if subscribe is not None:
self.__notifyOnSubscribe = subscribe
if unsubscribe is not None:
self.__notifyOnUnsubscribe = unsubscribe
if newTopic is not None:
self.__notifyOnNewTopic = newTopic
if delTopic is not None:
self.__notifyOnDelTopic = delTopic
if deadListener is not None:
self.__notifyOnDeadListener = deadListener
def __registerForAppExit(self):
import atexit
atexit.register(self.clearHandlers)
self.__atExitRegistered = True