Core Classes

The following classes are useful for advanced use of PyPubSub:

  • pubsub.core.Listener

  • pubsub.core.Topic

  • pubsub.core.TopicManager

  • pubsub.core.Publisher

It is not typically necessary to know about or use these: the pub module instantiates a default Publisher, which contains a TopicManager, which generates a Topic object for every topic used. The Publisher instance returns a Listener instance from subscribe, wrapping the given callable with PyPubSub-relevant meta-data about the callable.

Publisher

class pubsub.core.Publisher(treeConfig: TreeConfig = None)[source]

Represents the class that sends messages to listeners of given topics and that knows how to subscribe/unsubscribe listeners from topics.

addNotificationHandler(handler: INotificationHandler)[source]

Add a handler for tracing pubsub activity.

clearNotificationHandlers()[source]

Remove all notification handlers that were added via self.addNotificationHandler().

getListenerExcHandler() IListenerExcHandler[source]

Get the listener exception handler that was registered via setListenerExcHandler(), or None if none is registered.

getNotificationFlags() Mapping[str, bool][source]

Return a dictionary with the notification flag states.

getTopicMgr() TopicManager[source]

Get the topic manager created for this publisher.

sendMessage(topicName: str, **msgData)[source]

Send a message.

Parameters:
  • topicName – name of message topic (dotted or tuple format)

  • msgData – message data (must satisfy the topic’s MDS)

setListenerExcHandler(handler: IListenerExcHandler)[source]

Set the function to call when a listener raises an exception during a sendMessage().

setNotificationFlags(**kwargs: Mapping[str, bool | None])[source]

Set the notification flags on or off for each type of pubsub activity. The kwargs keys can be any of the following:

  • subscribe: if True, get notified whenever a listener subscribes to a topic;

  • unsubscribe: if True, get notified whenever a listener unsubscribes from a topic;

  • deadListener: if True, get notified whenever a subscribed listener has been garbage-collected;

  • sendMessage: if True, get notified whenever sendMessage() is called;

  • newTopic: if True, get notified whenever a new topic is created;

  • delTopic: if True, get notified whenever a topic is “deleted” from topic tree;

  • all: set all of the above to the given value (True or False).

The kwargs that are not given or None are left at their current value. Those that are False will cause the corresponding notification to be silenced. The ‘all’ flag is applied first, then the others. E.g.

pub.setNotificationFlags(all=True, delTopic=False)

will turn all notifications on, except for the ‘delTopic’ notification, which is turned off.
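The order in which the flags are applied can be modeled in plain Python. This is a stand-alone sketch of the semantics described above, not PyPubSub's implementation; apply_flags and FLAG_NAMES are hypothetical names introduced for illustration.

```python
# Stand-alone model of the documented flag semantics; this is not PyPubSub code.
FLAG_NAMES = ("subscribe", "unsubscribe", "deadListener",
              "sendMessage", "newTopic", "delTopic")


def apply_flags(current, **kwargs):
    """Return a new flag dict: 'all' is applied first, then individual flags.

    Flags that are missing or None keep their current value.
    """
    updated = dict(current)
    all_value = kwargs.get("all")
    if all_value is not None:
        updated = {name: bool(all_value) for name in FLAG_NAMES}
    for name in FLAG_NAMES:
        value = kwargs.get(name)
        if value is not None:
            updated[name] = bool(value)
    return updated


flags = dict.fromkeys(FLAG_NAMES, False)
flags = apply_flags(flags, all=True, delTopic=False)
```

After the call, every flag in the sketch is on except delTopic, and a later call such as apply_flags(flags, subscribe=None) leaves the state unchanged.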

setTopicUnspecifiedFatal(newVal: bool = True, checkExisting: bool = True) bool[source]

Changes the creation policy for topics.

By default, pubsub will accept topic names for topics that don’t have a message data specification (MDS). This default behavior makes pubsub easier to use initially, but allows topic names with typos to go uncaught in common operations such as sendMessage() and subscribe(). In a large application, this can lead to nasty bugs. Pubsub’s default behavior is equivalent to setTopicUnspecifiedFatal(False).

When called with newVal=True, any future pubsub operation that requires a topic (such as subscribe and sendMessage) will require an MDS; if none is available, pubsub will raise a TopicDefnError exception.

If checkExisting is not given or True, all existing topics are validated. A TopicDefnError exception is raised if one is found to be incomplete (its hasMDS() returns False).

Returns the previous value of this setting.

Note that this method can be used in several ways:

  1. Only use it in your application when something is not working as expected: just add a call at the beginning of your app when you have a problem with topic messages not being received (for instance), and remove it when you have fixed the problem.

  2. Use it from the beginning of your app and never use newVal=False: add a call at the beginning of your app, leave it in (forever), and use Topic Definition Providers to provide the listener specifications. These are easy to use via pub.addTopicDefnProvider().

  3. Use it as in #1 during app development, and once stable, use #2. This is easiest to do in combination with pub.exportTopicTreeSpec().

subscribe(listener: Callable[[...], Any], topicName: str, **curriedArgs) Listener[source]

Subscribe listener to named topic. Raises ListenerMismatchError if listener isn’t compatible with the topic’s MDS. Returns (pubsub.core.Listener, success), where success is False if listener was already subscribed. The pubsub.core.Listener wraps the callable subscribed and provides introspection-based info about the callable. Extra keyword arguments are treated as currying of listener arguments.

Example:

pub.subscribe(listener1, 'some_topic')
pub.subscribe(listener2, 'some_other_topic', a=2, b=3)

In the second example, listener2 will always receive a=2 and b=3, and pubsub treats it as though a and b were curried, i.e. as if the actual listener subscribed were a callable that did not have a or b parameters. Hence if some_other_topic has a or b as message data, subscription will raise a ListenerMismatchError.

Note that if ‘subscribe’ notification is on, the handler’s ‘notifySubscribe’ method is called after subscription.
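The effect of currying is comparable to fixing keyword arguments with functools.partial from the standard library. The sketch below does not call PyPubSub; it only illustrates which parameters remain open to message data once a and b are fixed. The msg parameter of listener2 is a hypothetical addition for illustration.

```python
import functools
import inspect


def listener2(a, b, msg):
    """Hypothetical listener: a and b will be curried, msg is message data."""
    return (a, b, msg)


# Fix a and b up front, as the curried keyword arguments of subscribe() would.
curried = functools.partial(listener2, a=2, b=3)

# Parameters still open to message data once a and b are fixed.
remaining = [name for name in inspect.signature(listener2).parameters
             if name not in curried.keywords]

# At call time only the remaining message data is supplied.
result = curried(msg="hello")
```

Here remaining holds only 'msg', which is why a topic whose message data included a or b would no longer match the curried listener.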

unsubAll(topicName: str = None, listenerFilter: Callable[[Listener], bool] = None, topicFilter: str | Callable[[str], bool] = None) List[Listener][source]

Unsubscribe all listeners of a topic.

Parameters:
  • topicName – if none given, unsub from all topics.

  • listenerFilter – filter function to apply to listeners, unsubscribe only the listeners that satisfy listenerFilter(listener: Listener) == True

  • topicFilter – topic name, or a filter function to apply to topics; in latter case, only topics that satisfy topicFilter(topic name) == True will be affected

Returns:

list of all listeners (instances of pub.Listener) that were unsubscribed from the topic tree

Note: this method will generate one ‘unsubscribe’ notification message (see pub.setNotificationFlags()) for each listener unsubscribed.

unsubscribe(listener: Callable[[...], Any], topicName: str)[source]

Unsubscribe from given topic. Returns the pubsub.core.Listener instance that was used to wrap listener at subscription time. Raises a TopicNameError if topicName doesn’t exist.

Note that if ‘unsubscribe’ notification is on, the handler’s notifyUnsubscribe() method will be called after unsubscribing.

TopicManager

class pubsub.core.TopicManager(treeConfig: TreeConfig = None)[source]

Manages the registry of all topics and creation/deletion of topics.

Note that any method that accepts a topic name can accept it in the ‘dotted’ format such as 'a.b.c' or in tuple format such as ('a', 'b', 'c'). Any such method will raise a ValueError if the name is not valid (empty, invalid characters, etc).
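The two name formats are interchangeable, as the following stand-alone sketch shows. It is not PyPubSub code: to_tuple, to_dotted and is_valid are hypothetical helpers, and the only validation rule modeled here is that no part of the name may be empty (the library's own rules may be stricter).

```python
def to_tuple(name):
    """Convert 'a.b.c' or ('a', 'b', 'c') to a tuple, rejecting empty parts."""
    parts = tuple(name.split(".")) if isinstance(name, str) else tuple(name)
    if not parts or any(not isinstance(part, str) or not part for part in parts):
        raise ValueError(f"invalid topic name: {name!r}")
    return parts


def to_dotted(name):
    """Convert either format to the dotted string form."""
    return ".".join(to_tuple(name))


def is_valid(name):
    """Return True if the name passes the sketch's validation."""
    try:
        to_tuple(name)
    except ValueError:
        return False
    return True


same_topic = to_tuple("a.b.c") == to_tuple(("a", "b", "c"))
```

In this sketch a trailing dot, as in 'a.b.c.', produces an empty last part and is therefore rejected.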

addDefnProvider(providerOrSource: Any, format=None) ITopicDefnProvider[source]

Register a topic definition provider. After this method is called, whenever a topic must be created, the first definition provider that has a definition for the required topic is used to instantiate the topic.

If providerOrSource is an instance of ITopicDefnProvider, register it as a provider of topic definitions. Otherwise, register a new instance of TopicDefnProvider(providerOrSource, format). In that case, if format is not given, it defaults to TOPIC_TREE_FROM_MODULE. Either way, returns the instance of ITopicDefnProvider registered.

checkAllTopicsHaveMDS()[source]

Check that all topics that have been created have an MDS. Raise a TopicDefnError if one is found that does not have one.

clearDefnProviders()[source]

Remove all registered topic definition providers

clearTree()[source]

Remove every topic from the topic tree

delTopic(name: str) bool[source]

Delete the named topic, including all sub-topics. Returns False if topic does not exist; True otherwise. Also unsubscribe any listeners of topic and all subtopics.

getNumDefnProviders() int[source]

Get how many topic definition providers are registered.

getOrCreateTopic(name: str, protoListener: Callable[[...], Any] = None) Topic[source]

Get the Topic instance for topic of given name, creating it (and any of its missing parent topics) as necessary. Pubsub functions such as subscribe() use this to obtain the Topic object corresponding to a topic name.

The name can be in dotted or tuple format ('a.b' or ('a', 'b')).

This method always attempts to return a “complete” topic, i.e. one with a Message Data Specification (MDS). So if the topic does not have an MDS, it attempts to add it. It first tries to find an MDS from a TopicDefnProvider (see addDefnProvider()). If none is available, it attempts to set it from protoListener, if it has been given. If not, the topic has no MDS.

Once a topic’s MDS has been set, it is never again changed or accessed by this method.

Examples:

# assume no topics exist
# but a topic definition provider has been added via
# pub.addTopicDefnProvider() and has definition for topics 'a' and 'a.b'

# creates topic a and a.b; both will have MDS from the defn provider:
t1 = topicMgr.getOrCreateTopic('a.b')
t2 = topicMgr.getOrCreateTopic('a.b')
assert(t1 is t2)
assert(t1.getParent().getName() == 'a')

def proto(req1, optarg1=None): pass
# creates topic c.d with MDS based on proto; creates c without an MDS
# since no proto for it, nor defn provider:
t1 = topicMgr.getOrCreateTopic('c.d', proto)

The MDS can also be defined via a call to subscribe(listener, topicName), which indirectly calls getOrCreateTopic(topicName, listener).
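How an MDS might be inferred from a prototype listener can be sketched with the standard inspect module. This is an illustration only, under the assumption that parameters without a default are required message data and parameters with a default are optional; mds_from_proto is a hypothetical helper, not a PyPubSub function.

```python
import inspect


def mds_from_proto(proto):
    """Split a callable's named parameters into (required, optional) names."""
    required, optional = [], []
    for name, param in inspect.signature(proto).parameters.items():
        if param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD):
            continue  # *args / **kwargs do not name a message datum
        if param.default is param.empty:
            required.append(name)
        else:
            optional.append(name)
    return tuple(required), tuple(optional)


def proto(req1, optarg1=None):
    pass


reqd_args, opt_args = mds_from_proto(proto)
```

The resulting pair has the same (reqdArgs, optArgs) shape that a topic's args property is documented to return.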

getRootAllTopics() Topic[source]

Get the topic that is parent of all root (i.e. top-level) topics, for the default TopicManager instance created when this module is imported. Some notes:

  • “root of all topics” topic satisfies isAll()==True, isRoot()==False, getParent() is None;

  • all root-level topics satisfy isAll()==False, isRoot()==True, and getParent() is getRootAllTopics();

  • all other topics satisfy neither.
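The three cases listed above can be pictured with a tiny tree model. Node is a hypothetical stand-in for a topic, not the PyPubSub Topic class, and "ALL_TOPICS" is used here only as a placeholder name for the "root of all topics".

```python
class Node:
    """Hypothetical stand-in for a topic in the topic tree."""

    def __init__(self, name, parent=None):
        self.name = name
        self.parent = parent

    def is_all(self):
        # Only the "root of all topics" has no parent.
        return self.parent is None

    def is_root(self):
        # A root-level topic is a direct child of the "root of all topics".
        return self.parent is not None and self.parent.is_all()


all_topics = Node("ALL_TOPICS")
a = Node("a", parent=all_topics)
a_b = Node("a.b", parent=a)

states = {node.name: (node.is_all(), node.is_root())
          for node in (all_topics, a, a_b)}
```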

getTopic(name: str, okIfNone: bool = False) Topic[source]

Get the Topic instance for the given topic name. By default, raises a TopicNameError exception if a topic with given name doesn’t exist. If okIfNone=True, returns None instead of raising an exception.

getTopicsSubscribed(listener: Callable[[...], Any]) List[Topic][source]

Get the list of Topic objects that have given listener subscribed. Note: the listener can also get messages from any sub-topic of returned list.

hasTopicDefinition(name: str) bool[source]

Determine if there is a definition available for topic ‘name’. Return true if there is, false otherwise. Note: a topic may have a definition without being in use, and vice versa.

isTopicInUse(name: str) bool[source]

Determine if topic ‘name’ is in use. True if a Topic object exists for topic name (i.e. a message has already been sent for that topic, or at least one listener has subscribed), false otherwise. Note: a topic may be in use but not have a definition (MDS and docstring); or a topic may have a definition, but not be in use.

Topic

class pubsub.core.Topic(treeConfig: TreeConfig, nameTuple: Tuple[str, ...], description: str, msgArgsInfo: ArgsInfo, parent: Topic = None)[source]

Represent topics in pubsub. Contains information about a topic, including topic’s message data specification (MDS), the list of subscribed listeners, docstring for the topic. It allows Python-like access to subtopics (e.g. A.B is subtopic B of topic A).

property argDescriptions: Dict[str, str]

Get a map of keyword names to docstrings: documents each MDS element.

property args: Tuple[Sequence[str], Sequence[str]]

Returns a pair (reqdArgs, optArgs) where reqdArgs is tuple of names of required message arguments, optArgs is tuple of names for optional arguments. If topic args not specified yet, returns (None, None).

property description: str

Return the ‘docstring’ of topic

filterMsgArgs(msgData: Mapping[str, Any], check: bool = False) Mapping[str, Any][source]

Get the MDS docstrings for each of the specified kwargs.

getArgDescriptions() Dict[str, str][source]

Get a map of keyword names to docstrings: documents each MDS element.

getArgs() Tuple[Sequence[str], Sequence[str]][source]

Returns a pair (reqdArgs, optArgs) where reqdArgs is tuple of names of required message arguments, optArgs is tuple of names for optional arguments. If topic args not specified yet, returns (None, None).

getDescription() str[source]

Return the ‘docstring’ of topic

getListeners() List[Listener][source]

Get a copy of the list of listeners subscribed to this topic. Safe to iterate over while listeners get un/subscribed from this topic (such as while sending a message).

getListenersIter() Iterator[Listener][source]

Get an iterator over listeners subscribed to this topic. Do not use if listeners can be un/subscribed while iterating.
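The difference between getListeners() and getListenersIter() mirrors the general Python rule about mutating a container while iterating over it. The sketch below uses a plain dict as a stand-in for a topic's listener storage; how PyPubSub stores listeners internally is not assumed here.

```python
# Plain dict used as a stand-in for a topic's listeners (an assumption).
listeners = {"first": 1, "second": 2, "third": 3}

# Iterating over a copy: removing entries during the loop is safe.
for key in list(listeners):
    if key == "second":
        del listeners[key]

# Iterating over the live container: removing an entry raises RuntimeError.
live = {"first": 1, "second": 2}
try:
    for key in live:
        del live[key]
    failed = False
except RuntimeError:
    failed = True
```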

getName() str[source]

Return dotted form of full topic name

getNameTuple() Tuple[str, ...][source]

Return tuple form of full topic name

getNodeName() str[source]

Return the last part of the topic name (has no dots)

getNumListeners() int[source]

Return number of listeners currently subscribed to topic. This is different from number of listeners that will get notified since more general topics up the topic tree may have listeners.

getParent() Topic[source]

Get the Topic object that is parent of self (i.e. self is a subtopic of parent). Returns None if self is the “all topics” topic.

getSubtopic(relName: str | Tuple[str, ...]) Topic[source]

Get the specified subtopic object. The relName can be a valid subtopic name, a dotted-name string, or a tuple.

getSubtopics() ValuesView[Topic][source]

Get a list of Topic instances that are subtopics of self.

hasListener(listener: Callable[[...], Any]) bool[source]

Return true if listener is subscribed to this topic.

hasListeners() bool[source]

Return true if there are any listeners subscribed to this topic, false otherwise.

hasMDS() bool[source]

Return true if this topic has a message data specification (MDS).

hasSubtopic(name: str = None) bool[source]

Return true only if name is a subtopic of self. If name not specified, return true only if self has at least one subtopic.

isAll() bool[source]

Returns true if this topic is the ‘all topics’ topic. All root topics behave as though they are child of that topic.

isRoot() bool[source]

Returns true if this is a “root” topic, false otherwise. A root topic is a topic whose name contains no dots and which has pub.ALL_TOPICS as parent.

isValid(listener: Callable[[...], Any], curriedArgNames: Sequence[str] = None) bool[source]

Return True only if listener could be subscribed to this topic, otherwise returns False. Note that this method raises TopicDefnError if the topic has no MDS (hasMDS() is False).

property listeners: List[Listener]

Get a copy of the list of listeners subscribed to this topic. Safe to iterate over while listeners get un/subscribed from this topic (such as while sending a message).

property name: str

Return dotted form of full topic name

property numListeners: int

Return number of listeners currently subscribed to topic. This is different from number of listeners that will get notified since more general topics up the topic tree may have listeners.

property parent: Topic

Get the Topic object that is parent of self (i.e. self is a subtopic of parent). Returns None if self is the “all topics” topic.

publish(**msgData)[source]

This sends message to listeners of parent topics as well. If an exception is raised in a listener, the publish is aborted, except if there is a handler (see pub.setListenerExcHandler).

Note that it is important that the PublisherMixin NOT modify any state data during message sending, because in principle it could happen that a listener causes another message of same topic to be sent (presumably, the listener has a way of preventing infinite loop).
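Which topics see a message can be worked out from the name tuple alone. The following stand-alone sketch lists the topics whose listeners would be reached; the "most specific first" order and the "ALL_TOPICS" placeholder name are assumptions for illustration, and topics_notified is a hypothetical helper.

```python
def topics_notified(name_tuple, all_topics="ALL_TOPICS"):
    """Yield topic names from the most specific up to the 'all topics' root."""
    for end in range(len(name_tuple), 0, -1):
        yield ".".join(name_tuple[:end])
    yield all_topics


reached = list(topics_notified(("a", "b", "c")))
```

This is also why the number of listeners subscribed to a topic can be smaller than the number of listeners notified by a message sent to it.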

setArgDescriptions(**docs: Dict[str, str])[source]

Set the docstring for each MDS datum.

setDescription(desc: str)[source]

Set the ‘docstring’ of topic

setMsgArgSpec(argsDocs: Dict[str, str], required: Sequence[str] = ())[source]

Specify the message data for topic messages. Can only be called if this info has not been already set at construction or in a previous call.

Parameters:
  • argsDocs – a dictionary of keyword names (message data name) and data ‘docstring’; cannot be None

  • required – a list of those keyword names, appearing in argsDocs, which are required (all others are assumed optional)

Raises:

RuntimeError – if MDS already set at construction or previous call.
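The set-once behavior can be modeled with a small class. TopicSpec and its method names are hypothetical stand-ins written for this sketch; only the rule that a second call raises RuntimeError is taken from the description above.

```python
class TopicSpec:
    """Stand-in holding a message data specification that can be set only once."""

    def __init__(self):
        self.args_docs = None
        self.required = ()
        self.optional = ()

    def has_mds(self):
        return self.args_docs is not None

    def set_msg_arg_spec(self, args_docs, required=()):
        if self.has_mds():
            raise RuntimeError("MDS already set")
        self.args_docs = dict(args_docs)
        self.required = tuple(required)
        # Every documented name that is not required is treated as optional.
        self.optional = tuple(name for name in args_docs
                              if name not in self.required)


spec = TopicSpec()
spec.set_msg_arg_spec({"arg1": "first datum", "arg2": "second datum"},
                      required=("arg1",))
try:
    spec.set_msg_arg_spec({"arg1": "changed"})
    second_call_failed = False
except RuntimeError:
    second_call_failed = True
```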

subscribe(listener: Callable[[...], Any], **curriedArgs) Tuple[Listener, bool][source]

Subscribe listener to this topic. Returns a pair (pub.Listener, success).

Parameters:

curriedArgs – keyword arguments used to curry the listener arguments at message time; the listener(args) is treated essentially as listener(**(args - curriedArgs)). If the listener was already subscribed, the pure curried args names (curriedArgs.keys() - _overrides_) must be unchanged.

Returns:

True only if listener was not already subscribed; False if it was already subscribed.

property subtopics: ValuesView[Topic]

Get a list of Topic instances that are subtopics of self.

unsubscribe(listener: Callable[[...], Any]) Listener[source]

Unsubscribe the specified listener from this topic. Returns the pub.Listener object associated with the listener that was unsubscribed, or None if the specified listener was not subscribed to this topic. Note that this method calls notifyUnsubscribe(listener, self) on all registered notification handlers (see pub.addNotificationHandler).

unsubscribeAllListeners(filter: Callable[[Listener], bool] = None) List[Listener][source]

Clears list of subscribed listeners. If filter is given, it must be a function that takes a listener and returns true if the listener should be unsubscribed. Returns the list of Listener for listeners that were unsubscribed.

validate(listener: Callable[[...], Any], curriedArgNames: Sequence[str] = None) CallArgsInfo[source]

Checks whether listener could be subscribed to this topic: if yes, just returns; if not, raises ListenerMismatchError. Note that this method raises TopicDefnError if the topic has no MDS (hasMDS() is False).

Listener

class pubsub.core.Listener(callable_obj: Callable[[...], Any], argsInfo: CallArgsInfo, curriedArgs: Mapping[str, Any] = None, onDead: Callable[[Listener], None] = None)[source]

Wraps a callable (UserListener) so it can be stored by weak reference and introspected to verify that it adheres to a topic’s MDS.

A Listener instance has the same hash value as the callable that it wraps.

Callables that have ‘argName=pub.AUTO_TOPIC’ as a kwarg will be given the Topic object for the message sent by sendMessage(). Such a Listener will have wantsTopicObjOnCall() True.

Callables that have a ‘**kwargs’ argument will receive all message data, not just that for the topic they are subscribed to. Such a listener will have wantsAllMessageData() True.
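Both properties can be detected by introspecting a callable's signature. The sketch below uses the standard inspect module and does not reproduce PyPubSub's own checks; AUTO_TOPIC is a local stand-in for pub.AUTO_TOPIC, and the helper and listener names are hypothetical.

```python
import inspect

AUTO_TOPIC = object()  # hypothetical stand-in for pub.AUTO_TOPIC


def wants_all_message_data(func):
    """Return True if func accepts arbitrary keyword arguments (**kwargs)."""
    return any(param.kind is inspect.Parameter.VAR_KEYWORD
               for param in inspect.signature(func).parameters.values())


def auto_topic_arg(func):
    """Return the name of the parameter whose default is AUTO_TOPIC, if any."""
    for name, param in inspect.signature(func).parameters.items():
        if param.default is AUTO_TOPIC:
            return name
    return None


def on_any(topic=AUTO_TOPIC, **kwargs):
    pass


def on_one(arg1):
    pass
```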

getCallable() Callable[[...], Any][source]

Get the listener that was given at initialization. Note that this could be None if it has been garbage collected (e.g. if it was created as a wrapper of some other callable, and not stored locally).
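The garbage-collection caveat follows from how weak references behave in Python. The sketch below uses the standard weakref module directly rather than a Listener instance; make_wrapper and handler are hypothetical names, and the immediate collection of the unreferenced wrapper reflects CPython's reference counting.

```python
import gc
import weakref


def handler(**kwargs):
    return kwargs


def make_wrapper(func):
    # A new function object is created on every call.
    return lambda **kwargs: func(**kwargs)


kept = make_wrapper(handler)
kept_ref = weakref.ref(kept)                    # 'kept' holds a strong reference
lost_ref = weakref.ref(make_wrapper(handler))   # nothing else references this one

gc.collect()  # not needed on CPython, but makes the intent explicit
```

The wrapper stored in kept stays reachable through kept_ref(), while lost_ref() yields None because nothing kept the second wrapper alive.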

isDead() bool[source]

Return True if this listener died (has been garbage collected)

module() module[source]

Get the module in which the callable was defined.

name() str[source]

Return a human readable name for listener, based on the listener’s type name and its id (as obtained from id(listener)). If caller just needs name based on type info, specify instance=False. Note that the listener’s id() was saved at construction time (since it may get garbage collected at any time) so the return value of name() is not necessarily unique if the callable has died (because id’s can be re-used after garbage collection).

setCurriedArgs(**curriedArgs)[source]

Curry the wrapped listener so it appears to not have list(curriedArgs) among its parameters. The curriedArgs key-value pairs will be given to wrapped listener at call time.

typeName() str[source]

Get a type name for the listener. This is a class name or function name, as appropriate.

wantsAllMessageData() bool[source]

True if this listener wants all message data: it has a **kwargs argument

wantsTopicObjOnCall() bool[source]

True if this listener wants the topic object: it has an arg=pub.AUTO_TOPIC parameter