Streams API

New in version 0.6.

aiozmq provides a high-level, stream-oriented API on top of the low-level API (ZmqTransport and ZmqProtocol), which is often more convenient to use.

Here’s an example:

import asyncio
import aiozmq
import zmq

@asyncio.coroutine
def go():
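    # Bind a ROUTER socket to a random free port on localhost.
    router = yield from aiozmq.create_zmq_stream(
        zmq.ROUTER,
        bind='tcp://127.0.0.1:*')

    # Look up the endpoint that was actually bound and connect a DEALER to it.
    addr = list(router.transport.bindings())[0]
    dealer = yield from aiozmq.create_zmq_stream(
        zmq.DEALER,
        connect=addr)

    for i in range(10):
        msg = (b'data', b'ask', str(i).encode('utf-8'))
        dealer.write(msg)
        # The ROUTER receives the message prefixed with the DEALER identity,
        # so echoing it back routes the reply to the same DEALER.
        data = yield from router.read()
        router.write(data)
        answer = yield from dealer.read()
        print(answer)
    dealer.close()
    router.close()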

asyncio.get_event_loop().run_until_complete(go())

The code creates two streams, one for the request side and one for the response side of a ZeroMQ connection, then sends each message over the wire and waits for the response.

Socket events can also be monitored when using streams.

import asyncio
import aiozmq
import zmq


@asyncio.coroutine
def monitor_stream(stream):
    try:
        while True:
            event = yield from stream.read_event()
            print(event)
    except aiozmq.ZmqStreamClosed:
        pass


@asyncio.coroutine
def go():
    router = yield from aiozmq.create_zmq_stream(
        zmq.ROUTER,
        bind='tcp://127.0.0.1:*')
    addr = list(router.transport.bindings())[0]

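    dealer = yield from aiozmq.create_zmq_stream(
        zmq.DEALER)

    # Enable monitoring before connecting so connection events are captured.
    yield from dealer.transport.enable_monitor()

    # Print monitoring events in the background until the stream is closed.
    asyncio.Task(monitor_stream(dealer))

    yield from dealer.transport.connect(addr)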

    for i in range(10):
        msg = (b'data', b'ask', str(i).encode('utf-8'))
        dealer.write(msg)
        data = yield from router.read()
        router.write(data)
        answer = yield from dealer.read()
        print(answer)

    router.close()
    dealer.close()


def main():
    asyncio.get_event_loop().run_until_complete(go())
    print("DONE")


if __name__ == '__main__':
    main()

create_zmq_stream

aiozmq.create_zmq_stream(zmq_type, *, bind=None, connect=None, loop=None, zmq_sock=None, high_read=None, low_read=None, high_write=None, low_write=None, events_backlog=100)

A wrapper for create_zmq_connection() returning a ZeroMQ stream (ZmqStream instance).

The arguments are all the usual arguments to create_zmq_connection() plus high and low watermarks for reading and writing messages.

This function is a coroutine.

Parameters:
  • zmq_type (int) – a type of ZeroMQ socket (zmq.REQ, zmq.REP, zmq.PUB, zmq.SUB, zmq.PAIR*, zmq.DEALER, zmq.ROUTER, zmq.PULL, zmq.PUSH, etc.)
  • bind (str or iterable of strings) –

    Endpoint specification.

    Each endpoint generates a call to ZmqTransport.bind() to accept connections on the specified endpoint.

    The other side should use the connect parameter to connect to this transport.

  • connect (str or iterable of strings) –

    Endpoint specification.

    Each endpoint generates a call to ZmqTransport.connect() to connect the transport to the specified endpoint.

    The other side should use the bind parameter to wait for incoming connections.

  • zmq_sock (zmq.Socket) – a preexisting zmq socket that will be passed to the returned transport.
  • loop (asyncio.AbstractEventLoop) – optional event loop instance, None for default event loop.
  • high_read (int) – high watermark for reading from the ZeroMQ socket. None by default (no limit).
  • low_read (int) – low watermark for reading from the ZeroMQ socket. None by default (no limit).
  • high_write (int) – high watermark for writing into the ZeroMQ socket. None by default (no limit).
  • low_write (int) – low watermark for writing into the ZeroMQ socket. None by default (no limit).
  • events_backlog (int) –

    Backlog size for monitoring events, 100 by default. It specifies the size of the event queue. If the number of unread events exceeds events_backlog, the oldest events are discarded.

    Use None for unlimited backlog size.

Returns:

ZeroMQ stream object, ZmqStream instance.

New in version 0.7: events_backlog parameter
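
The drop-oldest behaviour described for events_backlog can be pictured with a bounded queue. The following is only an illustrative sketch built on the standard library, not aiozmq's actual implementation; make_event_backlog is a hypothetical helper and the event names are made up.

```python
from collections import deque


def make_event_backlog(events_backlog=100):
    """Return a queue that keeps at most `events_backlog` unread events.

    Hypothetical helper for illustration only. deque(maxlen=None) is
    unbounded, which mirrors events_backlog=None.
    """
    return deque(maxlen=events_backlog)


backlog = make_event_backlog(events_backlog=3)
for n in range(5):
    # Appending to a full deque silently discards the oldest entry.
    backlog.append("event-%d" % n)

# Only the three newest events remain unread.
print(list(backlog))  # ['event-2', 'event-3', 'event-4']
```

A reader that consumes events more slowly than they arrive therefore sees the most recent events rather than the earliest ones.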

ZmqStream

class aiozmq.ZmqStream

A class for sending and receiving ZeroMQ messages.

transport

ZmqTransport instance, used for the stream.

at_closing()

Return True if the buffer is empty and feed_closing() was called.

close()

Close the stream and underlying ZeroMQ socket.

drain()

Wait until the write buffer of the underlying transport is flushed.

The intended use is to write:

w.write(data)
yield from w.drain()

When the transport buffer is full (the protocol is paused), block until the buffer is (partially) drained and the protocol is resumed. When there is nothing to wait for, the yield-from continues immediately.

This method is a coroutine.
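
The write-then-drain pattern can be wrapped in a small helper. This is a minimal sketch, assuming a stream object that exposes write() and an awaitable drain() as described above; send_with_backpressure is a hypothetical name, and the sketch uses async/await syntax rather than the yield from style used elsewhere on this page.

```python
async def send_with_backpressure(stream, messages):
    """Write each multipart message, pausing whenever the transport is full.

    `stream` is assumed to behave like ZmqStream: write() queues a
    message and drain() waits only while writing is paused.
    Returns the number of messages written.
    """
    sent = 0
    for msg in messages:
        stream.write(msg)
        # Returns immediately when there is nothing to wait for.
        await stream.drain()
        sent += 1
    return sent
```

Draining after every write keeps the write buffer from growing without bound when the producer is faster than the peer.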

exception()

Get the stream exception.

get_extra_info(name, default=None)

Return optional transport information: see asyncio.BaseTransport.get_extra_info().

read()

Read one ZeroMQ message from the wire and return it.

Raise ZmqStreamClosed if the stream was closed.
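
A reader loop usually treats the closed-stream exception as its normal exit condition. The sketch below assumes a stream with an awaitable read(); collect_messages is a hypothetical helper, and the exception class is passed in as a parameter only so that the sketch does not depend on aiozmq being importable. With aiozmq you would pass aiozmq.ZmqStreamClosed.

```python
async def collect_messages(stream, closed_error):
    """Read messages until the stream is closed and return them as a list.

    `closed_error` is the exception class that read() raises on a closed
    stream (aiozmq.ZmqStreamClosed for a ZmqStream).
    """
    messages = []
    while True:
        try:
            messages.append(await stream.read())
        except closed_error:
            # The stream was closed: stop reading and hand back what we got.
            return messages
```

For example, `await collect_messages(router, aiozmq.ZmqStreamClosed)` would gather everything received until router.close() is called elsewhere.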

read_event()

Read one ZeroMQ monitoring event and return it.

Raise ZmqStreamClosed if the stream was closed.

Monitoring mode must first be enabled by calling ZmqTransport.enable_monitor():

yield from stream.transport.enable_monitor()

New in version 0.7.

write(msg)

Write message msg into the ZeroMQ socket.

Parameters: msg – a sequence (tuple or list) containing the multipart message data.
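
The examples on this page pass every frame of a multipart message as a bytes object. The sketch below shows one way to build such a sequence from mixed input; make_message is a hypothetical helper, not part of aiozmq, and it assumes that frames must be bytes-like.

```python
def make_message(*parts, encoding="utf-8"):
    """Build a multipart message tuple, encoding str parts to bytes.

    Hypothetical helper: bytes-like parts are kept as they are, str
    parts are encoded, and anything else is rejected.
    """
    frames = []
    for part in parts:
        if isinstance(part, str):
            part = part.encode(encoding)
        elif not isinstance(part, (bytes, bytearray, memoryview)):
            raise TypeError("unsupported frame type: %r" % type(part))
        frames.append(part)
    return tuple(frames)


msg = make_message(b"data", "ask", str(7))
print(msg)  # (b'data', b'ask', b'7')
```

The resulting tuple can be passed directly to write(), for example `dealer.write(make_message("data", "ask", str(i)))`.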

Internal API

set_exception(exc)

Set the exception to exc. The exception may be retrieved by calling exception() or raised by the next read() call. This is a private method.

set_transport(transport)

Set the transport to transport. This is a private method.

set_read_buffer_limits(high=None, low=None)

Set read buffer limits. This is a private method.

feed_closing()

Feed the socket closing signal. This is a private method.

feed_msg(msg)

Feed the message msg to the stream’s internal buffer. Any operations waiting for the data will be resumed.

This is a private method.

feed_event(event)

Feed a socket event message to the stream’s internal buffer.

This is a private method.

Exceptions

exception aiozmq.ZmqStreamClosed

Raised by read operations on a closed stream.