PYZO(1) | Pyzo | PYZO(1) |
pyzo - Pyzo Documentation
API docs in progress.
IEP is organized in several subpackages, some of which can be used completely independent from the rest. Although IEP the IDE requires Qt and Python 3, some of its subpackages can safely be imported without these dependencies…
Contents:
This subpackage is independent of any other components of IEP and only has Qt (PySide2 or PyQt5) as a dependency. It works on Python 3 and may also still work on Python 2.
Content and API will come here.
Yoton is a Python package that provides a simple interface to communicate between two or more processes.
Yoton is independent of any other component of IEP and has no dependencies except Python itself. It runs on any Python version from 2.4.
----
How it works:
Messaging patterns:
Some features:
One end
    import yoton

    verbosity = 0  # assumed value; not defined in the original example

    # Create one context and a pub channel
    ct1 = yoton.Context(verbose=verbosity)
    pub = yoton.PubChannel(ct1, 'chat')

    # Connect
    ct1.bind('publichost:test')

    # Send
    pub.send('hello world')
Other end
    import yoton

    verbosity = 0  # assumed value; not defined in the original example

    # Create another context and a sub channel
    ct2 = yoton.Context(verbose=verbosity)
    sub = yoton.SubChannel(ct2, 'chat')

    # Connect
    ct2.connect('publichost:test')

    # Receive
    print(sub.recv())
[image]
Two contexts. One takes a server role by having a publish-channel and a reply-channel. The other takes on a client role by deploying the corresponding subscribe-channel and request-channel. [image]
This network contains only two types of context: requesters and repliers. Yoton performs a simple load balancing scheme: when the user posts a request, the req-channel first asks all repliers whether they want to handle it. Eventually all repliers will answer that they do, but the actual request is only sent to the first to respond. Repliers that are handling a previous request are unable to respond quickly, so the request will automatically be handled by a "free" replier. If all repliers are busy, the first to "come back" will handle the request. [image]
This network consists of contexts that all take the same role; they all send chat messages and receive the chat messages of the other nodes. One big chat room! [image]
This network illustrates a simplified version of what yoton was initially designed for: client-kernel communication. IEP uses yoton for its kernel-client communication.
The network consists of one kernel and two clients which are connected via a broker. Both clients can control the kernel via a stdin stream and receive output on stdout. The kernel also has a reply-channel so that the IDEs can obtain introspection information (think auto-completion). The broker also publishes some status messages. The bottom client is apparently only interested in kernel control. [image]
A context represents a node in the network. It can connect to multiple other contexts (using a yoton.Connection). These other contexts can be in another process on the same machine, or on another machine connected via a network or the internet.
This class represents a context that can be used by channel instances to communicate to other channels in the network. (Thus the name.)
The context is the entity that routes the packages produced by the channels to the other contexts in the network, where the packages are distributed to the right channels. A context queues packages while it is not connected to any other context.
If messages are sent on a channel registered at this context while the context is not connected, the messages are stored by the context and will be sent to the first connecting context.
Example 1
    import yoton

    # Create context and bind to a port on localhost
    context = yoton.Context()
    context.bind('localhost:11111')

    # Create a channel and send a message
    pub = yoton.PubChannel(context, 'test')
    pub.send('Hello world!')
Example 2
    import yoton

    # Create context and connect to the port on localhost
    context = yoton.Context()
    context.connect('localhost:11111')

    # Create a channel and receive a message
    sub = yoton.SubChannel(context, 'test')
    print(sub.recv())  # Will print 'Hello world!'
Queue params
The queue_params parameter allows one to specify the package queues used in the system. It is recommended to use the same parameters for every context in the network. The value of queue_params should be a 2-element tuple specifying queue size and discard mode. The latter can be ‘old’ (default) or ‘new’, meaning that if the queue is full, either the oldest or the newest messages are discarded.
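The two discard modes can be illustrated with a small stand-alone sketch. It does not use yoton itself: the class name `BoundedQueue` and its methods are hypothetical, and it assumes that ‘new’ means the incoming message is the one that gets dropped.

```python
from collections import deque


class BoundedQueue:
    """Toy package queue with a maximum size and a discard mode."""

    def __init__(self, max_size, discard_mode="old"):
        if discard_mode not in ("old", "new"):
            raise ValueError("discard_mode must be 'old' or 'new'")
        self._items = deque()
        self._max_size = max_size
        self._discard_mode = discard_mode

    def push(self, item):
        if len(self._items) < self._max_size:
            self._items.append(item)
        elif self._discard_mode == "old":
            # Full: drop the oldest item to make room for the new one.
            self._items.popleft()
            self._items.append(item)
        # Full and mode 'new': the incoming item is dropped (assumption).

    def pop(self):
        return self._items.popleft()

    def __len__(self):
        return len(self._items)


def fill(mode):
    """Push five items into a queue of size three and return what is left."""
    q = BoundedQueue(3, mode)
    for i in range(5):
        q.push(i)
    return [q.pop() for _ in range(len(q))]
```

With ‘old’ the queue ends up holding the three most recent items; with ‘new’ it keeps the first three.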
PROPERTIES
METHODS
Returns a Connection instance that represents the connection to the other context. These connection objects can also be obtained via the Context.connections property.
Parameters
Notes on hostname
Closing a connection means disconnecting two contexts. Closing a channel means disassociating a channel from its context. Unlike connections and channels, a Context instance can be reused after closing (although this might not always be the best strategy).
Returns a Connection instance that represents the connection to the other context. These connection objects can also be obtained via the Context.connections property.
Parameters
Notes on hostname
Raises an error when the flushing times out.
The connection classes represent the connection between two contexts. There is one base class (yoton.Connection) and currently there is one implementation: the yoton.TcpConnection. In the future other connections might be added that use other methods than TCP/IP.
Abstract base class for a connection between two Context objects. This base class defines the full interface; subclasses only need to implement a few private methods.
The connection classes are intended as a simple interface for the user, for example to query port number, and be notified of timeouts and closing of the connection.
All connection instances are intended for one-time use. To make a new connection, instantiate a new Connection object. After instantiation, either _bind() or _connect() should be called.
PROPERTIES
Handlers are called with two arguments: the ContextConnection instance, and a boolean. The latter is True when the connection times out, and False when data is received again.
METHODS
Optionally, a reason for closing can be specified. A closed connection cannot be reused.
Optionally, a reason for stopping can be specified. This is highly recommended in case the connection is closed due to a problem.
In contrast to the normal close() method, this method does not try to notify the other end of the closing.
The TcpConnection class implements a connection between two contexts that are in different processes or on different machines connected via the internet.
This class handles the low-level communication for the context. A ContextConnection instance wraps a single BSD socket for its communication, and uses TCP/IP as the underlying communication protocol. A persistent connection is used (the BSD sockets stay connected). This makes it easier to distinguish between connection problems and timeouts caused by the other side being busy.
The channel classes represent the mechanism for the user to send messages into the network and receive messages from it. A channel needs a context to function; the context represents a node in the network.
To be able to route messages to the right channel, channels are associated with a slot (a string name). This slot consists of a user-defined base name and an extension that indicates the message type and messaging pattern. Messages sent from a channel with slot X are only received by channels with the same slot X. Slots are case insensitive.
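As a rough illustration of slot matching, the sketch below builds a full slot name from a base name plus an extension and compares slots case-insensitively. The separator characters and the pattern and type labels are invented for this example; the actual extension format used by yoton is not specified here.

```python
def full_slot(base_name, pattern, message_type):
    """Build a hypothetical full slot name from its parts."""
    return "{}:{}.{}".format(base_name, pattern, message_type).lower()


def slots_match(slot_a, slot_b):
    # Slots are case insensitive, so compare their lowercased forms.
    return slot_a.lower() == slot_b.lower()
```

Because the pattern is part of the full slot, two channels with the same base name but different messaging patterns never match.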
Yoton supports three base messaging patterns. For each messaging pattern there are specific channel classes. All channels derive from yoton.BaseChannel.
publish/subscribe The yoton.PubChannel class is used for sending messages into the network, and the yoton.SubChannel class is used to receive these messages. Multiple PubChannels and SubChannels can exist in the same network at the same slot; the SubChannels simply collect the messages sent by all PubChannels.
request/reply The yoton.ReqChannel class is used to do requests, and the yoton.RepChannel class is used to reply to requests. If multiple RepChannels are present at the same slot, simple load balancing is performed.
state The yoton.StateChannel class is used to communicate state to other state channels. Each yoton.StateChannel can set and get the state.
Messages are of a specific type (text, binary, …), the default being Unicode text. The third (optional) argument to a Channel’s initializer is a MessageType object that specifies how messages should be converted to bytes and the other way around.
This way, the channel classes themselves can be agnostic about the message type, while users can implement their own MessageType class to send whatever messages they like.
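To make the idea concrete, here is a minimal sketch of what a user-defined message type could look like. It is a stand-alone class rather than a subclass of yoton's MessageType, and the method names (`message_to_bytes`, `message_from_bytes`, `message_type_name`) are assumptions made for illustration; check the MessageType documentation for the actual interface.

```python
import json


class JsonMessageType:
    """Stand-in for a user-defined message type (not a yoton class)."""

    def message_to_bytes(self, message):
        # Convert an outgoing message to bytes.
        return json.dumps(message, sort_keys=True).encode("utf-8")

    def message_from_bytes(self, data):
        # Convert received bytes back into a message.
        return json.loads(data.decode("utf-8"))

    def message_type_name(self):
        # Short label that could become part of the slot extension.
        return "json"
```

The channel only ever deals with the bytes; everything specific to the payload lives in this one class.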
Abstract class for all channels.
Parameters
Details
Messages sent via a channel are delivered asynchronously to the corresponding channels.
All channels are associated with a context and can be used to send messages to other channels in the network. Each channel is also associated with a slot, which is a string that represents a kind of address. A message sent by a channel at slot X can only be received by a channel with slot X.
Note that the channel appends an extension to the user-supplied slot name that represents the message type and messaging pattern of the channel. This prevents, for example, a PubChannel from communicating with a RepChannel.
PROPERTIES
METHODS
Future attempts to send() messages will result in an IOError being raised. Messages currently in the channel’s queue can still be recv()’ed, but no new messages will be delivered at this channel.
The publish part of the publish/subscribe messaging pattern. Sent messages are received by all yoton.SubChannel instances with the same slot.
There are no limitations for this channel if events are not processed.
Parameters
METHODS
The message is queued and delivered to all corresponding SubChannels (i.e. with the same slot) in the network.
The subscribe part of the publish/subscribe messaging pattern. Received messages were sent by a yoton.PubChannel instance at the same slot.
This channel can be used as an iterator, which yields all pending messages. The function yoton.select_sub_channel can be used to synchronize multiple SubChannel instances.
If no events are being processed, this channel works as normal, except that the received signal will not be emitted, and sync mode will not work.
Parameters
METHODS
If block is False, returns empty message if no data is available. If block is True, waits forever until data is available. If block is an int or float, waits that many seconds. If the channel is closed, returns empty message.
The returned messages are all received before the first pending message in the other SUB-channels given to select_sub_channel.
The combination of this method and the function select_sub_channel enables users to combine multiple SUB-channels in a way that preserves the original order of the messages.
This feature can be used to limit the rate of senders if the consumer (i.e. the one that calls recv()) cannot keep up with processing the data.
This feature requires the yoton event loop to run at the side of the SubChannel (not necessary for the yoton.PubChannel side).
This function can be used to read from SubChannel instances in the order in which the messages were sent.
After calling this function, use channel.recv_selected() to obtain all messages that are older than any pending messages in the other given channels.
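The interplay between selecting a channel and then reading the selected messages can be sketched with a toy model. The classes and functions below are hypothetical stand-ins, not yoton code; they assume each pending message carries a receiving sequence number and that the selected channel is the one holding the oldest pending message.

```python
from collections import deque


class ToySubChannel:
    def __init__(self, name):
        self.name = name
        self.pending = deque()   # items are (recv_seq, message)
        self._limit = None       # set by select_channel()

    def recv_selected(self):
        """Return messages received before the oldest message elsewhere."""
        out = []
        while self.pending and (
            self._limit is None or self.pending[0][0] < self._limit
        ):
            out.append(self.pending.popleft()[1])
        return out


def select_channel(*channels):
    """Pick the channel holding the oldest pending message, or None."""
    ready = [c for c in channels if c.pending]
    if not ready:
        return None
    chosen = min(ready, key=lambda c: c.pending[0][0])
    others = [c.pending[0][0] for c in ready if c is not chosen]
    chosen._limit = min(others) if others else None
    return chosen


def drain_in_order(*channels):
    """Return (channel name, message) pairs in receive order."""
    result = []
    while True:
        ch = select_channel(*channels)
        if ch is None:
            return result
        result.extend((ch.name, m) for m in ch.recv_selected())
```

Alternating between the two calls reads all channels while preserving the overall order in which the messages arrived.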
The request part of the request/reply messaging pattern. A ReqChannel instance sends requests and receives the corresponding replies. The requests are replied to by a yoton.RepChannel instance.
This class adopts req/rep in a remote procedure call (RPC) scheme. The handling of the result is done using a yoton.Future object, which follows the approach specified in PEP 3148. Note that for the use of callbacks, the yoton event loop must run.
Basic load balancing is performed by first asking all potential repliers whether they can handle a request. The actual request is then sent to the first replier to respond.
Parameters
Usage
One performs a call on a virtual method of this object. The actual method is executed by the yoton.RepChannel instance. The method can be called with normal and keyword arguments, which can be (a combination of): None, bool, int, float, string, list, tuple, dict.
Example
    # Fast, but process is idling when waiting for the response.
    reply = req.add(3,4).result(2.0) # Wait two seconds

    # Asynchronous processing, but no waiting.
    def reply_handler(future):
        ... # Handle reply
    future = req.add(3,4)
    future.add_done_callback(reply_handler)
The reply part of the request/reply messaging pattern. A RepChannel instance receives requests and sends the corresponding replies. The requests are sent from a yoton.ReqChannel instance.
This class adopts req/rep in a remote procedure call (RPC) scheme.
To use a RepChannel, subclass this class and implement the methods that need to be available. The reply should be (a combination of) None, bool, int, float, string, list, tuple, dict.
This channel needs to be set to event or thread mode to function (in the first case yoton events need to be processed too). To stop handling events again, use set_mode(‘off’).
Parameters
METHODS
The Future object represents the future result of a request done at a yoton.ReqChannel.
METHODS
Added callables are called in the order that they were added. If the callable raises an Exception subclass, it will be logged and ignored. If the callable raises a BaseException subclass, the behavior is undefined.
If the future has already completed or been cancelled, fn will be called immediately.
If the future is cancelled before completing then CancelledError will be raised.
If the call completed without raising, None is returned.
If the future is cancelled before completing then CancelledError will be raised.
If the call raised, this method will raise the same exception.
If timeout is None, there is no limit to the wait time.
If the method returns False then the Future was cancelled, i.e. Future.cancel() was called and returned True.
If the method returns True then the Future was not cancelled and has been put in the running state, i.e. calls to Future.running() will return True.
This method can only be called once and cannot be called after Future.set_result() or Future.set_exception() have been called.
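Since yoton.Future is described as following PEP 3148, the semantics above can be tried out with the standard library's `concurrent.futures.Future`, which implements the same specification. The sketch below uses only the stdlib class, not yoton.Future, so it illustrates the PEP 3148 behavior rather than yoton itself.

```python
from concurrent.futures import CancelledError, Future

calls = []

future = Future()
future.add_done_callback(lambda f: calls.append(("first", f.result())))
future.add_done_callback(lambda f: calls.append(("second", f.result())))

# Mark the future as running; returns True because it was not cancelled.
started = future.set_running_or_notify_cancel()
future.set_result(7)

# A callback added after completion is called immediately.
future.add_done_callback(lambda f: calls.append(("late", f.result())))

# A future that is cancelled before it starts running.
cancelled = Future()
was_cancelled = cancelled.cancel()
try:
    cancelled.result(timeout=0)
    outcome = "returned"
except CancelledError:
    outcome = "cancelled"
```

Callbacks run in the order they were added, and asking a cancelled future for its result raises CancelledError.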
Channel class for the state messaging pattern. A state is synchronized over all state channels of the same slot. Each channel can send (i.e. set) the state and recv (i.e. get) the current state. Note however, that if two StateChannel instances set the state around the same time, due to the network delay, it is undefined which one sets the state the last.
The context will automatically call this channel’s send_last() method when a new context enters the network.
The recv() call is always non-blocking and always returns the last received message: i.e. the current state.
There are no limitations for this channel if events are not processed, except that the received signal is not emitted.
Parameters
METHODS
The state-message is queued and sent over the socket by the IO-thread. Zero-length messages are ignored.
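The behavior of the state pattern can be modelled in a few lines. This is a toy, in-process model with a hypothetical class name; a shared list stands in for the network, and a newly created channel copying the current state stands in for the automatic send_last() call.

```python
class ToyStateChannel:
    """Toy model of a state channel; all peers share one network list."""

    def __init__(self, network):
        self._network = network
        self._state = ""
        # Mimic send_last(): a newcomer picks up the current state.
        if network:
            self._state = network[0]._state
        network.append(self)

    def send(self, state):
        if not state:
            return  # zero-length messages are ignored
        for channel in self._network:
            channel._state = state

    def recv(self):
        # Never blocks; always returns the last received state.
        return self._state
```

Calling recv() repeatedly keeps returning the same value until some channel sets a new state.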
Module yoton.events
Yoton comes with a simple event system to enable event-driven applications.
All channels are capable of running without the event system, but some channels have limitations. See the documentation of the channels for more information. Note that signals only work if events are processed.
The purpose of a signal is to provide an interface to bind/unbind to events and to fire them.
One can bind() or unbind() a callable to the signal. When emitted, an event is created for each bound handler. Therefore, the event loop must run for signals to work.
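Why signals depend on a running event loop can be seen in a small model. The names `ToySignal` and `process_events` are hypothetical, and the handlers here receive the emitted arguments directly instead of an event object; the point is only that emitting queues work and a separate step executes it.

```python
import queue


class ToySignal:
    """Toy signal: emit() queues events, process_events() runs handlers."""

    def __init__(self, event_queue):
        self._handlers = []
        self._event_queue = event_queue

    def bind(self, func):
        if func not in self._handlers:
            self._handlers.append(func)

    def unbind(self, func):
        if func in self._handlers:
            self._handlers.remove(func)

    def emit(self, *args):
        # One event per bound handler; nothing is called yet.
        for func in list(self._handlers):
            self._event_queue.put((func, args))


def process_events(event_queue):
    """Run all pending events; stands in for one event-loop iteration."""
    handled = 0
    while True:
        try:
            func, args = event_queue.get_nowait()
        except queue.Empty:
            return handled
        func(*args)
        handled += 1
```

Because emit() only puts items on a thread-safe queue, it can be called from any thread, while the handlers run in whichever thread processes the events.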
Some signals call the handlers using additional arguments to specify specific information.
PROPERTIES
METHODS
The callback/handler (func) must be a callable. It is called with one argument: the event instance, which can contain additional information about the event.
*args and **kwargs. An event is queued for each callback registered to this signal. Therefore it is safe to call this method from another thread.
Timer class. You can bind callbacks to the timer. The timer is fired when it runs out of time.
Parameters
PROPERTIES
METHODS
Parameters
block can be False (no blocking), True (block), or a float blocking for maximally ‘block’ seconds.
Use None as an argument to disable the embedding.
yoton.clientserver.py
Yoton comes with a small framework to set up a request-reply pattern using a client-server model (over a non-persistent connection), similar to telnet. This allows one process to easily ask small pieces of information from another process.
To create a server, create a class that inherits from yoton.RequestServer and implement its handle_request() method.
A client process can simply use the yoton.do_request function. Example: yoton.do_request('www.google.com:80', 'GET http/1.1\r\n')
The client server model is implemented using one function and one class: yoton.do_request and yoton.RequestServer.
Details
The server implements a request/reply pattern by listening at a socket. Similar to telnet, each request is handled using a connection and the socket is closed after the response is sent.
The request server can be set up to run in the main thread, or can be started using its own thread. In the latter case, one can easily create multiple servers in a single process that listen on different ports.
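The one-connection-per-request pattern described above can be sketched with the standard library alone. The functions below are illustrative stand-ins, not yoton.do_request or yoton.RequestServer; in particular, using a half-close (`shutdown`) to mark the end of the request is an assumption of this sketch, not necessarily how yoton delimits requests.

```python
import socket
import threading


def handle_request(request):
    """Application logic: turn a request string into a reply string."""
    return request.upper()


def serve_once(server_sock):
    """Accept one connection, answer its request, then close it."""
    conn, _addr = server_sock.accept()
    with conn:
        chunks = []
        while True:
            data = conn.recv(4096)
            if not data:  # client finished sending
                break
            chunks.append(data)
        reply = handle_request(b"".join(chunks).decode("utf-8"))
        conn.sendall(reply.encode("utf-8"))


def toy_do_request(address, request, timeout=5.0):
    """Open a connection, send one request, and return the reply."""
    with socket.create_connection(address, timeout=timeout) as sock:
        sock.sendall(request.encode("utf-8"))
        sock.shutdown(socket.SHUT_WR)  # signal end of request
        chunks = []
        while True:
            data = sock.recv(4096)
            if not data:
                break
            chunks.append(data)
    return b"".join(chunks).decode("utf-8")


def demo():
    server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_sock.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
    server_sock.listen(1)
    worker = threading.Thread(target=serve_once, args=(server_sock,), daemon=True)
    worker.start()
    try:
        return toy_do_request(server_sock.getsockname(), "hello")
    finally:
        worker.join(timeout=5.0)
        server_sock.close()
```

Running the server part in its own thread, as `demo()` does, corresponds to the second of the two set-ups mentioned above.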
Parameters
Notes on hostname
Set up a simple server that handles requests similar to a telnet server, or asyncore. Starting the server using run() will run the server in the calling thread. Starting the server using start() will run the server in a separate thread.
To create a server, subclass this class and re-implement the handle_request method. It accepts a request and should return a reply. This server assumes utf-8 encoded messages.
Parameters
Notes on hostname
In yoton, the yoton.Context is the object that represents a node in the network. The context only handles packages. It gets packages from all its associated channels and from the other nodes in the network. It routes packages to the other nodes, and deposits packages in channel instances if the package slot matches the channel slot.
The yoton.Connection represents a one-to-one connection between two contexts. It handles the low level messaging. It breaks packages into pieces and tries to send them as efficiently as possible. It also receives bytes from the other end, and reconstructs them into packages, which are then given to the context to handle (i.e. route). For the yoton.TcpConnection this is all done by dedicated io threads.
Packages are simply a bunch of bytes (the encoded message), wrapped in a header. Packages are directed at a certain slot. They also have a source id, source sequence number, and optionally a destination id and destination sequence number (so that packages can be replies to other packages). When a package is received, it also gets assigned a receiving sequence number (in order to synchronize channels).
Two yoton.Connection instances also communicate directly with each other. They do this during the handshaking procedure, obviously, but also during operation they send each other heartbeat messages to detect time-outs. When the connection is closed in a nice way, they also send a close message to the other end. A package addressed directly at the Connection has no body (consists only of a header).
Two contexts can also communicate. They do this to notify each other of newly formed connections, closing of contexts, etc. A package directed at a context uses a special slot.
Channel instances can also communicate. Well, that’s what yoton is all about… A sending channel packs a message in a package and gives it to the context. All other contexts will receive the package and deposit it in the channel’s queue if the slots match. On recv’ing, the message is extracted/decoded from the package.
Two yoton.TcpConnection instances are connected using a single BSD-socket (TCP/IP). The socket operates in persistent mode; once the connection is established, the socket remains open indefinitely, until the connection is closed.
If we adopted a req/rep approach (setting up the connection for each request), failure could mean either that the kernel is running extension code, or that the connection is broken. It would not be possible to differentiate between the two.
On initialization of the connection, TcpConnections perform a small handshake procedure to establish that both are yoton.TcpConnection objects, and to exchange the context ids.
There is one thread dedicated to receiving data from the socket and subsequently having the context route the packages. Another dedicated thread gets data from a queue (of the Connection) and sends the packages over the socket. The sockets and queues are blocking, but with a timeout (the receiving thread uses a select() call for this). This makes it easy to periodically send heartbeat packages if necessary, and to detect their absence.
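The sending side of this design boils down to a blocking queue read with a timeout. The following sketch shows that single step; `next_outgoing` and the `HEARTBEAT` sentinel are hypothetical names, and a real sending thread would call something like this in a loop and write the result to the socket.

```python
import queue

HEARTBEAT = object()  # sentinel standing in for a body-less heartbeat package


def next_outgoing(send_queue, timeout):
    """Return the next package to send, or HEARTBEAT if none arrives in time."""
    try:
        return send_queue.get(timeout=timeout)
    except queue.Empty:
        return HEARTBEAT
```

Blocking with a timeout keeps the thread idle while there is nothing to do, yet guarantees that something is sent at least once per timeout interval.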
In a previous design, there was a single io thread per context that did all the work. It would run through a generator function owned by the connections to send/receive data. This required all queueing and io to be non-blocking. After changing the design the code got much smaller, cleaner and easier to read, and is probably more robust. We could also get rid of several classes to buffer data, because with blocking threads the data can simply be buffered at the queues and sockets.
To differentiate between messages, there are two common approaches. One can add a small header to each message that indicates how long the message is. Or one can delimit the messages with a specific character.
In earlier designs, yoton used the second approach and was limited to sending text which was encoded using utf-8. This meant the bytes 0xff and 0xfe could be used for delimiting.
The first approach is more complex and requires more per-message processing. However, because the message size is known, messages can be received with much less copying of data. This significantly improved the performance for larger messages (with the delimiting approach we would get memory errors when Yoton tried to encode/decode the message to/from utf-8).
The current design is such that as little data as possible has to be copied (particularly for larger messages).
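A length-prefixed framing scheme of the kind described above can be sketched as follows. The header layout (a single 8-byte little-endian length) is an assumption for illustration and is not yoton's actual package header, which also carries slot, id and sequence information.

```python
import io
import struct

HEADER = struct.Struct("<Q")  # assumed header: 8-byte little-endian body length


def frame(body):
    """Prefix a message body with its length."""
    return HEADER.pack(len(body)) + body


def read_exactly(stream, n):
    """Read exactly n bytes from a file-like stream or raise EOFError."""
    buf = bytearray()
    while len(buf) < n:
        chunk = stream.read(n - len(buf))
        if not chunk:
            raise EOFError("stream closed mid-message")
        buf.extend(chunk)
    return bytes(buf)


def read_messages(stream):
    """Yield message bodies until the stream ends cleanly."""
    while True:
        first = stream.read(1)
        if not first:
            return
        (length,) = HEADER.unpack(first + read_exactly(stream, HEADER.size - 1))
        yield read_exactly(stream, length)


def roundtrip(bodies):
    """Frame all bodies into one byte stream and parse them back."""
    stream = io.BytesIO(b"".join(frame(b) for b in bodies))
    return list(read_messages(stream))
```

Because the receiver knows the size up front, the body can contain any bytes, including 0xff and 0xfe, which a delimiter-based scheme would have to reserve.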
If there is no data to send for a while, small heartbeat messages are produced, so that connection problems can be easily detected. For TCP one needs to send data in order to detect connection problems (because no ACKs will be received). However, the TCP timeout is in the order of minutes and differs between OSes. Therefore we check when data was last received, enabling us to detect connection problems within a few seconds.
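The receiving side of the heartbeat scheme only needs to remember when data last arrived. The class below is a hypothetical sketch of that bookkeeping; it reports state changes as True (timed out) or False (data received again), and takes an injectable clock so that it can be exercised without waiting.

```python
import time


class TimeoutDetector:
    def __init__(self, timeout, clock=time.monotonic):
        self._timeout = timeout
        self._clock = clock
        self._last_received = clock()
        self.timed_out = False

    def data_received(self):
        """Call for every received package, heartbeats included."""
        self._last_received = self._clock()
        return self._update()

    def poll(self):
        """Call periodically; returns True/False on a state change, else None."""
        return self._update()

    def _update(self):
        now_timed_out = (self._clock() - self._last_received) > self._timeout
        if now_timed_out == self.timed_out:
            return None
        self.timed_out = now_timed_out
        return now_timed_out
```

A non-None return value is the moment at which timeout handlers would be notified.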
Note that when two Contexts are connected using ‘localhost’, there is no way for the connection to be lost, as several network layers are bypassed. In such a situation, we can therefore be sure that the reason for the timeout lies not in the connection, but is caused for example by the process running extension code.
With respect to client-kernel communication: the kernel will not be able to send any data (not even heartbeat signals) if it is running extension code. In such a case, the client can still send messages; this data is transported by TCP and ends up in the network buffer until the kernel returns from extension code and starts receiving messages again.
For this reason, in a client-kernel configuration, the kernel should always be connected to another process via ‘localhost’, and should use a proxy/broker to connect with clients on another box.
In that case, the client can detect that the kernel is running extension code because the kernel stopped sending data (incl heartbeat messages).
In any communication system, there is a risk of congestion: one end sends data faster than the other end can process it. This data can be buffered, but as the buffer fills, it consumes more memory.
Yoton uses two approaches to solve this problem. The first (and most common) solution is that all queues have a maximum size. When this size is reached and a new message is added, messages will be discarded. The user can choose whether the oldest or the newest message should be discarded.
The second approach is only possible for the PUB/SUB channels. If the yoton.SubChannel is put in sync-mode (using the set_sync_mode method), the yoton.SubChannel will send a message to the corresponding PubChannels if its queue reaches a certain size. This size is relatively small (e.g. 10-100). When a yoton.PubChannel receives the message, its send method will block (for at most 1 second). The SubChannel sends a second message when the queue is below a certain level again. Note that it takes a while for these control messages to be received by the PubChannel. Therefore the actual queue size can easily grow larger than the threshold. In this situation, the first approach (discarding messages) is still used as a failsafe, but messages are very unlikely to be discarded since the threshold is much smaller than the maximum queue size.
An important aspect of the second approach is that the queue that buffers packages before they are sent over the socket remains small. If this is not the case, the PubChannel is able to spam the queue with gigantic amounts of messages before the SubChannel even receives the first message. To keep this queue small, much like the queue of the SubChannel, it has a certain threshold. If this threshold is reached, subsequent pushes on the queue will block for at most 1 second. The threshold is in the same order of magnitude as the queue for the SubChannel.
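The pause and resume messages of the sync mode amount to a queue with a high and a low threshold. The sketch below models only the receiving queue; the class name, the threshold values and the "pause"/"resume" labels are made up for illustration, and the control messages are merely recorded instead of being sent to a PubChannel.

```python
class SyncQueue:
    """Toy receive queue that asks senders to pause and resume."""

    def __init__(self, high=10, low=5):
        self._items = []
        self._high = high
        self._low = low
        self.paused = False
        self.control_messages = []  # what would be sent to the PubChannels

    def push(self, item):
        self._items.append(item)
        if not self.paused and len(self._items) >= self._high:
            self.paused = True
            self.control_messages.append("pause")

    def pop(self):
        item = self._items.pop(0)
        if self.paused and len(self._items) < self._low:
            self.paused = False
            self.control_messages.append("resume")
        return item

    def __len__(self):
        return len(self._items)
```

Messages that are already in flight when the pause request goes out still arrive, which is why the queue can grow past the high threshold and why the maximum queue size remains necessary as a fallback.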
I performed a series of tests on both Windows and Linux, testing sockets on localhost and publichost (what you get with gethostname()), for killing one of the processes and for removing the connection (unplugging the cable).
Pyzo contributors
2022, Pyzo contributors
November 12, 2022 | 3.4 |