Streams API
New in version 0.6.
aiozmq provides a high-level, stream-oriented API on top of the low-level API (ZmqTransport and ZmqProtocol) that is more convenient to use.
Here’s an example:
import asyncio
import aiozmq
import zmq


@asyncio.coroutine
def go():
    # Bind the ROUTER side to a random free TCP port.
    router = yield from aiozmq.create_zmq_stream(
        zmq.ROUTER,
        bind='tcp://127.0.0.1:*')

    # Look up the endpoint that was actually bound and connect the DEALER to it.
    addr = list(router.transport.bindings())[0]
    dealer = yield from aiozmq.create_zmq_stream(
        zmq.DEALER,
        connect=addr)

    for i in range(10):
        msg = (b'data', b'ask', str(i).encode('utf-8'))
        dealer.write(msg)
        # The router echoes every message back to the dealer.
        data = yield from router.read()
        router.write(data)
        answer = yield from dealer.read()
        print(answer)
    dealer.close()
    router.close()


asyncio.get_event_loop().run_until_complete(go())
The code creates two streams, one for the request side and one for the response side of a ZeroMQ connection, then sends each message over the wire and waits for the response.
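The example above uses generator-based coroutines; asyncio.coroutine was removed in Python 3.11. The following is a sketch of the same round trip written with async/await, assuming the installed aiozmq release returns awaitables from create_zmq_stream() and read(). The function name roundtrip and its count parameter are illustrative and not part of aiozmq.

```python
import asyncio


async def roundtrip(count=10):
    # Third-party imports are kept local so that this module can be
    # imported even where aiozmq and pyzmq are not installed.
    import aiozmq
    import zmq

    # Bind the ROUTER side to a random free port, then connect the DEALER.
    router = await aiozmq.create_zmq_stream(
        zmq.ROUTER,
        bind='tcp://127.0.0.1:*')
    addr = list(router.transport.bindings())[0]
    dealer = await aiozmq.create_zmq_stream(
        zmq.DEALER,
        connect=addr)

    answers = []
    try:
        for i in range(count):
            dealer.write((b'data', b'ask', str(i).encode('utf-8')))
            data = await router.read()      # request arrives at the router
            router.write(data)              # echo it back unchanged
            answers.append(await dealer.read())
    finally:
        # Close both streams even if a read or write fails.
        dealer.close()
        router.close()
    return answers
```

It could be run with asyncio.run(roundtrip()), which also replaces the explicit get_event_loop().run_until_complete() call.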
Socket events can also be monitored when using streams.
import asyncio
import aiozmq
import zmq


@asyncio.coroutine
def monitor_stream(stream):
    # Print monitoring events until the stream is closed.
    try:
        while True:
            event = yield from stream.read_event()
            print(event)
    except aiozmq.ZmqStreamClosed:
        pass


@asyncio.coroutine
def go():
    router = yield from aiozmq.create_zmq_stream(
        zmq.ROUTER,
        bind='tcp://127.0.0.1:*')
    addr = list(router.transport.bindings())[0]

    # Create the DEALER unconnected so monitoring can be enabled first.
    dealer = yield from aiozmq.create_zmq_stream(
        zmq.DEALER)

    yield from dealer.transport.enable_monitor()

    # Consume events in the background while messages are exchanged.
    asyncio.Task(monitor_stream(dealer))

    yield from dealer.transport.connect(addr)

    for i in range(10):
        msg = (b'data', b'ask', str(i).encode('utf-8'))
        dealer.write(msg)
        data = yield from router.read()
        router.write(data)
        answer = yield from dealer.read()
        print(answer)

    router.close()
    dealer.close()


def main():
    asyncio.get_event_loop().run_until_complete(go())
    print("DONE")


if __name__ == '__main__':
    main()
create_zmq_stream
aiozmq.create_zmq_stream(zmq_type, *, bind=None, connect=None, loop=None, zmq_sock=None, high_read=None, low_read=None, high_write=None, low_write=None, events_backlog=100)
A wrapper for create_zmq_connection() returning a ZeroMQ stream (ZmqStream instance).
The arguments are all the usual arguments to create_zmq_connection() plus high and low watermarks for reading and writing messages.
This function is a coroutine.
Parameters:
- zmq_type (int) – a type of ZeroMQ socket (zmq.REQ, zmq.REP, zmq.PUB, zmq.SUB, zmq.PAIR*, zmq.DEALER, zmq.ROUTER, zmq.PULL, zmq.PUSH, etc.)
- bind (str or iterable of strings) – endpoints specification. Each endpoint generates a call to ZmqTransport.bind() to accept connections on the specified endpoint. The other side should use the connect parameter to connect to this transport.
- connect (str or iterable of strings) – endpoints specification. Each endpoint generates a call to ZmqTransport.connect() to connect the transport to the specified endpoint. The other side should use the bind parameter to wait for incoming connections.
- zmq_sock (zmq.Socket) – a preexisting zmq socket that will be passed to the returned transport.
- loop (asyncio.AbstractEventLoop) – optional event loop instance, None for the default event loop.
- high_read (int) – high-watermark for reading from the ZeroMQ socket. None by default (no limits).
- low_read (int) – low-watermark for reading from the ZeroMQ socket. None by default (no limits).
- high_write (int) – high-watermark for writing into the ZeroMQ socket. None by default (no limits).
- low_write (int) – low-watermark for writing into the ZeroMQ socket. None by default (no limits).
- events_backlog (int) – backlog size for monitoring events, 100 by default. It specifies the size of the event queue. If the count of unread events exceeds events_backlog, the oldest events are discarded. Use None for unlimited backlog size.
Returns: ZeroMQ stream object, ZmqStream instance.
New in version 0.7: events_backlog parameter
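The discard rule described for events_backlog behaves like a bounded queue that drops its oldest entry when full. The sketch below models that rule with collections.deque. It illustrates the documented semantics only and is not aiozmq's actual implementation; EventBacklog and its methods are hypothetical names.

```python
from collections import deque


class EventBacklog:
    """Hypothetical bounded event queue that drops the oldest events when full."""

    def __init__(self, backlog=100):
        # maxlen=None makes the deque unbounded, mirroring events_backlog=None.
        self._events = deque(maxlen=backlog)

    def feed(self, event):
        # Appending to a full deque silently discards the leftmost (oldest) item.
        self._events.append(event)

    def pop(self):
        # Return the oldest event that is still retained.
        return self._events.popleft()

    def __len__(self):
        return len(self._events)


backlog = EventBacklog(backlog=3)
for n in range(5):
    backlog.feed(n)   # events 0 and 1 are pushed out by 3 and 4
```

With a backlog of 3 and five events fed in, only the three most recent events remain available to the reader.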
ZmqStream
class aiozmq.ZmqStream
A class for sending and receiving ZeroMQ messages.
- transport
  ZmqTransport instance, used for the stream.
- at_closing()
  Return True if the buffer is empty and feed_closing() was called.
- drain()
  Wait until the write buffer of the underlying transport is flushed.
  The intended use is to write:
      w.write(data)
      yield from w.drain()
  When the transport buffer is full (the protocol is paused), block until the buffer is (partially) drained and the protocol is resumed. When there is nothing to wait for, the yield-from continues immediately.
  This method is a coroutine.
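The pause and resume behaviour that drain() waits on can be pictured as hysteresis between a high and a low watermark. The following is a minimal model, assuming that writing pauses once the buffered size exceeds the high mark and resumes once it falls to the low mark or below, as asyncio's own transports do. FlowControl and its methods are hypothetical names, not part of aiozmq.

```python
import asyncio


class FlowControl:
    """Hypothetical model of high/low watermark flow control."""

    def __init__(self, high, low):
        self.high = high
        self.low = low
        self.size = 0
        self._resumed = asyncio.Event()
        self._resumed.set()              # start in the unpaused state

    @property
    def paused(self):
        return not self._resumed.is_set()

    def write(self, nbytes):
        self.size += nbytes
        if self.size > self.high:
            self._resumed.clear()        # buffer too full: pause the writer

    def flushed(self, nbytes):
        self.size = max(0, self.size - nbytes)
        if self.size <= self.low:
            self._resumed.set()          # drained far enough: resume

    async def drain(self):
        # Returns immediately when not paused, otherwise waits for a resume.
        await self._resumed.wait()


async def demo():
    fc = FlowControl(high=100, low=20)
    fc.write(150)
    after_write = fc.paused              # above the high mark
    fc.flushed(100)
    between_marks = fc.paused            # 50 is still above the low mark
    # Flush the rest shortly afterwards, while drain() is waiting.
    asyncio.get_running_loop().call_later(0.01, fc.flushed, 40)
    await fc.drain()
    return after_write, between_marks, fc.paused, fc.size


result = asyncio.run(demo())
```

The gap between the two marks keeps the writer from flipping between paused and resumed on every small change in buffer size.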
- get_extra_info(name, default=None)
  Return optional transport information: see asyncio.BaseTransport.get_extra_info().
- read()
  Read one ZeroMQ message from the wire and return it.
  Raise ZmqStreamClosed if the stream was closed.
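Because read() signals a closed stream by raising, a consumer is typically a loop that ends on that exception. The sketch below shows the pattern with stand-ins so it runs without a ZeroMQ socket: StreamClosed plays the role of aiozmq.ZmqStreamClosed and FakeStream the role of a ZmqStream. Both are hypothetical, as is the helper read_until_closed.

```python
import asyncio


class StreamClosed(Exception):
    """Stand-in for aiozmq.ZmqStreamClosed."""


class FakeStream:
    """Hypothetical in-memory stream exposing only a read() coroutine."""

    def __init__(self, messages):
        self._messages = list(messages)

    async def read(self):
        # Simulate a close once the queued messages run out; a real
        # stream would wait for more data unless it had been closed.
        if not self._messages:
            raise StreamClosed()
        return self._messages.pop(0)


async def read_until_closed(stream, closed_exc=StreamClosed):
    """Collect messages until the stream reports that it was closed."""
    received = []
    try:
        while True:
            received.append(await stream.read())
    except closed_exc:
        pass
    return received


messages = asyncio.run(
    read_until_closed(FakeStream([[b'a', b'1'], [b'b', b'2']])))
```

With a real stream, the same helper would be called with closed_exc=aiozmq.ZmqStreamClosed.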
- read_event()
  Read one ZeroMQ monitoring event and return it.
  Raise ZmqStreamClosed if the stream was closed.
  Monitoring mode must first be enabled by calling ZmqTransport.enable_monitor():
      yield from stream.transport.enable_monitor()
  New in version 0.7.
- write(msg)
  Write message msg into the ZeroMQ socket.
  Parameters: msg – a sequence (tuple or list) containing multipart message data.
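The examples in this section pass write() a tuple of bytes objects, one per message frame. As a sketch, assuming every frame should be bytes, a small helper can normalise mixed values before writing. The helper to_frames is hypothetical and not part of aiozmq.

```python
def to_frames(*parts, encoding='utf-8'):
    """Hypothetical helper: convert mixed parts into a tuple of bytes frames."""
    frames = []
    for part in parts:
        if isinstance(part, (bytes, bytearray, memoryview)):
            frames.append(bytes(part))                 # already binary data
        elif isinstance(part, str):
            frames.append(part.encode(encoding))       # text is encoded
        else:
            frames.append(str(part).encode(encoding))  # e.g. integers
    return tuple(frames)


msg = to_frames(b'data', 'ask', 7)
```

The result could then be passed to a stream, for example dealer.write(msg).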
Internal API
- set_exception(exc)
  Set the exception to exc. The exception may be retrieved by an exception() call or raised by the next read(). This is a private method.
-