Streams API¶
New in version 0.6.
aiozmq provides a high level stream oriented API on top of the low-level
API (ZmqTransport
and ZmqProtocol
) which can provide a
more convinient API.
Here’s an example:
import asyncio
import aiozmq
import zmq
async def go():
router = await aiozmq.create_zmq_stream(
zmq.ROUTER,
bind='tcp://127.0.0.1:*')
addr = list(router.transport.bindings())[0]
dealer = await aiozmq.create_zmq_stream(
zmq.DEALER,
connect=addr)
for i in range(10):
msg = (b'data', b'ask', str(i).encode('utf-8'))
dealer.write(msg)
data = await router.read()
router.write(data)
answer = await dealer.read()
print(answer)
dealer.close()
router.close()
asyncio.run(go())
The code creates two streams for request and response part of ZeroMQ connection and sends message through the wire with waiting for response.
Socket events can also be monitored when using streams.
import asyncio
import aiozmq
import zmq
async def monitor_stream(stream):
try:
while True:
event = await stream.read_event()
print(event)
except aiozmq.ZmqStreamClosed:
pass
async def go():
router = await aiozmq.create_zmq_stream(zmq.ROUTER, bind="tcp://127.0.0.1:*")
addr = list(router.transport.bindings())[0]
dealer = await aiozmq.create_zmq_stream(zmq.DEALER)
await dealer.transport.enable_monitor()
asyncio.Task(monitor_stream(dealer))
await dealer.transport.connect(addr)
for i in range(10):
msg = (b"data", b"ask", str(i).encode("utf-8"))
dealer.write(msg)
data = await router.read()
router.write(data)
answer = await dealer.read()
print(answer)
router.close()
dealer.close()
def main():
asyncio.run(go())
print("DONE")
if __name__ == "__main__":
main()
create_zmq_stream¶
- aiozmq.create_zmq_stream(zmq_type, *, bind=None, connect=None, loop=None, zmq_sock=None, high_read=None, low_read=None, high_write=None, low_write=None, events_backlog=100)[source]¶
A wrapper for
create_zmq_connection()
returning a ZeroMQ stream (ZmqStream
instance).The arguments are all the usual arguments to
create_zmq_connection()
plus high and low watermarks for reading and writing messages.This function is a coroutine.
- Parameters:
zmq_type (int) – a type of ZeroMQ socket (zmq.REQ, zmq.REP, zmq.PUB, zmq.SUB, zmq.PAIR*, zmq.DEALER, zmq.ROUTER, zmq.PULL, zmq.PUSH, etc.)
bind (str or iterable of strings) –
endpoints specification.
Every endpoint generates call to
ZmqTransport.bind()
for accepting connections from specified endpoint.Other side should use connect parameter to connect to this transport.
connect (str or iterable of strings) –
endpoints specification.
Every endpoint generates call to
ZmqTransport.connect()
for connecting transport to specified endpoint.Other side should use bind parameter to wait for incoming connections.
zmq_sock (zmq.Socket) – a preexisting zmq socket that will be passed to returned transport.
loop (asyncio.AbstractEventLoop) – optional event loop instance,
None
for default event loop.high_read (int) – high-watermark for reading from ZeroMQ socket.
None
by default (no limits).low_read (int) – low-watermark for reading from ZeroMQ socket.
None
by default (no limits).high_write (int) – high-watermark for writing into ZeroMQ socket.
None
by default (no limits).low_write (int) – low-watermark for writing into ZeroMQ socket.
None
by default (no limits).events_backlog (int) –
backlog size for monitoring events,
100
by default. It specifies size of event queue. If count of unread events exceeds events_backlog the oldest events are discarded.Use
None
for unlimited backlog size.
- Returns:
ZeroMQ stream object,
ZmqStream
instance.
New in version 0.7: events_backlog parameter
ZmqStream¶
- class aiozmq.ZmqStream[source]¶
A class for sending and receiving ZeroMQ messages.
- transport¶
ZmqTransport
instance, used for the stream.
- at_closing()[source]¶
Return
True
if the buffer is empty andfeed_closing()
was called.
- drain()[source]¶
Wait until the write buffer of the underlying transport is flushed.
The intended use is to write:
w.write(data) await w.drain()
When the transport buffer is full (the protocol is paused), block until the buffer is (partially) drained and the protocol is resumed. When there is nothing to wait for, the await continues immediately.
This method is a coroutine.
- get_extra_info(name, default=None)[source]¶
Return optional transport information: see
asyncio.BaseTransport.get_extra_info()
.
- read()[source]¶
Read one ZeroMQ message from the wire and return it.
Raise
ZmqStreamClosed
if the stream was closed.
- read_event()[source]¶
Read one ZeroMQ monitoring event and return it.
Raise
ZmqStreamClosed
if the stream was closed.Monitoring mode should be enabled by
ZmqTransport.enable_monitor()
call first:await stream.transport.enable_monitor()
New in version 0.7.
Internal API
- set_exception(exc)[source]¶
Set the exception to exc. The exception may be retrieved by
exception()
call or raised by nextread()
, the private method.