Examples of aiozmq usage¶
Here is a list of examples from aiozmq/examples.
Every example is a correct, tiny Python program.
Simple DEALER-ROUTER pair implemented at the core level¶
import asyncio
import aiozmq
import zmq
class ZmqDealerProtocol(aiozmq.ZmqProtocol):
    """Client-side protocol: queue every received message, report the close.

    Each message passed to ``msg_received`` is put on ``queue``; ``on_close``
    is resolved with whatever argument ``connection_lost`` receives.
    """

    # Assigned in connection_made(); None until then.
    transport = None

    def __init__(self, queue, on_close):
        self.on_close = on_close
        self.queue = queue

    def connection_made(self, transport):
        # Remember the transport this protocol instance was attached to.
        self.transport = transport

    def msg_received(self, msg):
        # put_nowait() keeps this callback synchronous.
        inbox = self.queue
        inbox.put_nowait(msg)

    def connection_lost(self, exc):
        closed = self.on_close
        closed.set_result(exc)
class ZmqRouterProtocol(aiozmq.ZmqProtocol):
    """Server-side protocol: echo every message back, report the close.

    ``on_close`` is resolved with whatever argument ``connection_lost``
    receives.
    """

    # Assigned in connection_made(); None until then.
    transport = None

    def __init__(self, on_close):
        self.on_close = on_close

    def connection_made(self, transport):
        # Remember the transport so msg_received() can reply through it.
        self.transport = transport

    def msg_received(self, msg):
        # Write the message back unchanged through the stored transport.
        link = self.transport
        link.write(msg)

    def connection_lost(self, exc):
        closed = self.on_close
        closed.set_result(exc)
@asyncio.coroutine
def go():
    """Echo ten messages between a DEALER and a ROUTER built on protocols."""
    router_done = asyncio.Future()
    dealer_done = asyncio.Future()

    def make_router():
        return ZmqRouterProtocol(router_done)

    router, _ = yield from aiozmq.create_zmq_connection(
        make_router, zmq.ROUTER, bind='tcp://127.0.0.1:*')
    # The bind address has a '*' port, so read the bound endpoint back.
    endpoints = list(router.bindings())
    addr = endpoints[0]

    replies = asyncio.Queue()

    def make_dealer():
        return ZmqDealerProtocol(replies, dealer_done)

    dealer, _ = yield from aiozmq.create_zmq_connection(
        make_dealer, zmq.DEALER, connect=addr)

    for number in range(10):
        payload = str(number).encode('utf-8')
        dealer.write((b'data', b'ask', payload))
        # The dealer protocol queues each message it receives.
        answer = yield from replies.get()
        print(answer)

    # Close each side, then wait for its protocol to see connection_lost().
    dealer.close()
    yield from dealer_done
    router.close()
    yield from router_done
def main():
    """Drive ``go()`` to completion on the default event loop."""
    loop = asyncio.get_event_loop()
    loop.run_until_complete(go())
    print("DONE")


# Run the example only when executed as a script.
if __name__ == '__main__':
    main()
DEALER-ROUTER pair implemented with streams¶
import asyncio
import aiozmq
import zmq
@asyncio.coroutine
def go():
    """Echo ten messages between a DEALER stream and a ROUTER stream."""
    router = yield from aiozmq.create_zmq_stream(
        zmq.ROUTER, bind='tcp://127.0.0.1:*')
    # The bind address has a '*' port, so read the bound endpoint back.
    endpoints = list(router.transport.bindings())
    addr = endpoints[0]
    dealer = yield from aiozmq.create_zmq_stream(zmq.DEALER, connect=addr)

    for number in range(10):
        request = (b'data', b'ask', str(number).encode('utf-8'))
        dealer.write(request)
        # Router side: read what arrived and write it straight back.
        incoming = yield from router.read()
        router.write(incoming)
        answer = yield from dealer.read()
        print(answer)

    dealer.close()
    router.close()
def main():
    """Run the stream example's ``go()`` coroutine until it finishes."""
    loop = asyncio.get_event_loop()
    loop.run_until_complete(go())
    print("DONE")


# Run the example only when executed as a script.
if __name__ == '__main__':
    main()
Remote Procedure Call¶
import asyncio
import aiozmq.rpc
class ServerHandler(aiozmq.rpc.AttrHandler):
    """RPC handler exposing a single addition method."""

    @aiozmq.rpc.method
    def remote_func(self, a: int, b: int) -> int:
        """Return ``a + b``."""
        total = a + b
        return total
@asyncio.coroutine
def go():
    """Serve ``ServerHandler`` over RPC and call ``remote_func`` once."""
    handler = ServerHandler()
    server = yield from aiozmq.rpc.serve_rpc(handler, bind='tcp://*:*')
    # The bind address uses wildcards, so read the bound endpoint back.
    endpoints = list(server.transport.bindings())
    server_addr = endpoints[0]
    client = yield from aiozmq.rpc.connect_rpc(connect=server_addr)

    expected = 3
    ret = yield from client.call.remote_func(1, 2)
    assert expected == ret

    # Shut down the server first, then the client, waiting for each.
    server.close()
    yield from server.wait_closed()
    client.close()
    yield from client.wait_closed()
def main():
    """Run the RPC example's ``go()`` coroutine until it finishes."""
    loop = asyncio.get_event_loop()
    loop.run_until_complete(go())
    print("DONE")


# Run the example only when executed as a script.
if __name__ == '__main__':
    main()
Pipeline aka Notifier¶
import asyncio
import aiozmq.rpc
from itertools import count
class Handler(aiozmq.rpc.AttrHandler):
    """Pipeline handler that records when a notification has arrived."""

    def __init__(self):
        # Set to True by remote_func(); go() checks it after each notify.
        self.connected = False

    @aiozmq.rpc.method
    def remote_func(self, step, a: int, b: int):
        """Mark the handler as reached and print the received arguments."""
        self.connected = True
        fields = ("HANDLER", step, a, b)
        print(*fields)
@asyncio.coroutine
def go():
    """Notify a pipeline listener until its handler reports a call."""
    handler = Handler()
    listener = yield from aiozmq.rpc.serve_pipeline(
        handler, bind='tcp://*:*')
    endpoints = list(listener.transport.bindings())
    listener_addr = endpoints[0]
    notifier = yield from aiozmq.rpc.connect_pipeline(
        connect=listener_addr)

    # Keep notifying until the handler has seen at least one call.
    for step in count():
        yield from notifier.notify.remote_func(step, 1, 2)
        if handler.connected:
            break
        yield from asyncio.sleep(0.01)

    listener.close()
    yield from listener.wait_closed()
    notifier.close()
    yield from notifier.wait_closed()
def main():
    """Run the pipeline example's ``go()`` coroutine until it finishes."""
    loop = asyncio.get_event_loop()
    loop.run_until_complete(go())
    print("DONE")


# Run the example only when executed as a script.
if __name__ == '__main__':
    main()
Publish-Subscribe¶
import asyncio
import aiozmq.rpc
from itertools import count
class Handler(aiozmq.rpc.AttrHandler):
    """Subscriber handler that records when a published call has arrived."""

    def __init__(self):
        # Set to True by remote_func(); go() checks it after each publish.
        self.connected = False

    @aiozmq.rpc.method
    def remote_func(self, step, a: int, b: int):
        """Mark the handler as reached and print the received arguments."""
        self.connected = True
        fields = ("HANDLER", step, a, b)
        print(*fields)
@asyncio.coroutine
def go():
    """Publish on 'topic' until the subscriber's handler reports a call."""
    handler = Handler()
    subscriber = yield from aiozmq.rpc.serve_pubsub(
        handler,
        subscribe='topic',
        bind='tcp://127.0.0.1:*',
        log_exceptions=True)
    endpoints = list(subscriber.transport.bindings())
    subscriber_addr = endpoints[0]
    print("SERVE", subscriber_addr)
    publisher = yield from aiozmq.rpc.connect_pubsub(
        connect=subscriber_addr)

    # Keep publishing until the handler has seen at least one call.
    for step in count():
        yield from publisher.publish('topic').remote_func(step, 1, 2)
        if handler.connected:
            break
        yield from asyncio.sleep(0.1)

    subscriber.close()
    yield from subscriber.wait_closed()
    publisher.close()
    yield from publisher.wait_closed()
def main():
    """Run the pub-sub example's ``go()`` coroutine until it finishes."""
    loop = asyncio.get_event_loop()
    loop.run_until_complete(go())
    print("DONE")


# Run the example only when executed as a script.
if __name__ == '__main__':
    main()
Translating RPC exceptions back to the client¶
import asyncio
import aiozmq.rpc
class CustomError(Exception):
    """Example exception that carries a single value in ``val``."""

    def __init__(self, val):
        # Pass val to Exception so it also appears in args and str().
        super().__init__(val)
        self.val = val
# Fully qualified name ("module.ClassName") of CustomError, mapped to the
# class itself; go() passes this table to connect_rpc(error_table=...).
exc_name = '{}.{}'.format(CustomError.__module__, CustomError.__name__)
error_table = dict([(exc_name, CustomError)])
class ServerHandler(aiozmq.rpc.AttrHandler):
    """RPC handler whose only method always fails with ``CustomError``."""

    @aiozmq.rpc.method
    def remote(self, val):
        """Raise ``CustomError`` carrying ``val``."""
        failure = CustomError(val)
        raise failure
@asyncio.coroutine
def go():
    """Call a failing RPC method and check the exception reaches the client.

    The client is created with ``error_table`` and the call is expected to
    raise ``CustomError`` carrying the value that was sent.

    Raises:
        AssertionError: if the call raises nothing, or if the received
            exception does not carry ``'value'``.
    """
    server = yield from aiozmq.rpc.serve_rpc(
        ServerHandler(), bind='tcp://*:*')
    server_addr = list(server.transport.bindings())[0]
    client = yield from aiozmq.rpc.connect_rpc(
        connect=server_addr,
        error_table=error_table)
    try:
        yield from client.call.remote('value')
    except CustomError as exc:
        # The original line was a bare comparison and checked nothing.
        assert exc.val == 'value'
    else:
        # remote() always raises, so reaching here means translation failed.
        raise AssertionError("remote() did not raise CustomError")
    # Wait for both endpoints to close, as the other RPC examples do.
    server.close()
    yield from server.wait_closed()
    client.close()
    yield from client.wait_closed()
def main():
    """Run the exception example's ``go()`` coroutine until it finishes."""
    loop = asyncio.get_event_loop()
    loop.run_until_complete(go())
    print("DONE")


# Run the example only when executed as a script.
if __name__ == '__main__':
    main()
Translating instances of custom classes via RPC¶
import asyncio
import aiozmq.rpc
import msgpack
class Point:
    """Plain 2-D point compared by its ``x`` and ``y`` attributes."""

    def __init__(self, x, y):
        self.x, self.y = x, y

    def __eq__(self, other):
        # Defer to the other operand when it is not a Point.
        if not isinstance(other, Point):
            return NotImplemented
        mine = (self.x, self.y)
        theirs = (other.x, other.y)
        return mine == theirs
def _pack_point(value):
    """Serialize a Point as a msgpack-encoded ``(x, y)`` pair."""
    return msgpack.packb((value.x, value.y))


def _unpack_point(binary):
    """Rebuild a Point from the msgpack bytes made by ``_pack_point``."""
    return Point(*msgpack.unpackb(binary))


# Entry 0 holds (class, packer, unpacker) for Point; go() passes this table
# to both serve_rpc() and connect_rpc().
translation_table = {0: (Point, _pack_point, _unpack_point)}
class ServerHandler(aiozmq.rpc.AttrHandler):
    """RPC handler that echoes its argument back to the caller."""

    @aiozmq.rpc.method
    def remote(self, val):
        """Return ``val`` unchanged."""
        echoed = val
        return echoed
@asyncio.coroutine
def go():
    """Send a ``Point`` through RPC and check it comes back equal."""
    server = yield from aiozmq.rpc.serve_rpc(
        ServerHandler(),
        bind='tcp://*:*',
        translation_table=translation_table)
    endpoints = list(server.transport.bindings())
    server_addr = endpoints[0]
    # Both ends are given the same translation table.
    client = yield from aiozmq.rpc.connect_rpc(
        connect=server_addr,
        translation_table=translation_table)

    sent = Point(1, 2)
    ret = yield from client.call.remote(sent)
    assert ret == Point(1, 2)

    server.close()
    yield from server.wait_closed()
    client.close()
    yield from client.wait_closed()
def main():
    """Run the translation example's ``go()`` coroutine until it finishes."""
    loop = asyncio.get_event_loop()
    loop.run_until_complete(go())
    print("DONE")


# Run the example only when executed as a script.
if __name__ == '__main__':
    main()
Validation of RPC methods¶
import asyncio
import aiozmq.rpc
class ServerHandler(aiozmq.rpc.AttrHandler):
    """RPC handler with one annotated method used to trigger validation."""

    @aiozmq.rpc.method
    def remote_func(self, a: int, b: int) -> int:
        """Return ``a + b``."""
        total = a + b
        return total
@asyncio.coroutine
def go():
    """Make four invalid RPC calls and print the error each one produces.

    The calls are: an unknown method, an unexpected keyword argument, a
    missing positional argument, and string arguments for ``int``-annotated
    parameters. Nothing is printed for a call that raises no error.
    """
    server = yield from aiozmq.rpc.serve_rpc(
        ServerHandler(), bind='tcp://*:*')
    server_addr = list(server.transport.bindings())[0]
    client = yield from aiozmq.rpc.connect_rpc(
        connect=server_addr)
    # The printed labels name the expression actually evaluated
    # (``client.call...``); they previously said ``client.rpc...``.
    try:
        yield from client.call.unknown_function()
    except aiozmq.rpc.NotFoundError as exc:
        print("client.call.unknown_function(): {}".format(exc))
    try:
        yield from client.call.remote_func(bad_arg=1)
    except aiozmq.rpc.ParametersError as exc:
        print("client.call.remote_func(bad_arg=1): {}".format(exc))
    try:
        yield from client.call.remote_func(1)
    except aiozmq.rpc.ParametersError as exc:
        print("client.call.remote_func(1): {}".format(exc))
    try:
        yield from client.call.remote_func('a', 'b')
    except aiozmq.rpc.ParametersError as exc:
        print("client.call.remote_func('a', 'b'): {}".format(exc))
    server.close()
    yield from server.wait_closed()
    client.close()
    yield from client.wait_closed()
def main():
    """Run the validation example's ``go()`` coroutine until it finishes."""
    loop = asyncio.get_event_loop()
    loop.run_until_complete(go())
    print("DONE")


# Run the example only when executed as a script.
if __name__ == '__main__':
    main()
RPC lookup in nested namespaces¶
import asyncio
import aiozmq.rpc
class Handler(aiozmq.rpc.AttrHandler):
    """Top-level RPC handler holding a nested ``subhandler`` attribute."""

    def __init__(self, ident):
        self.ident = ident
        # The nested handler receives the same ident plus a fixed sub-ident.
        self.subhandler = SubHandler(ident, 'subident')

    @aiozmq.rpc.method
    def a(self):
        """Return ``(ident, 'a')``."""
        return self.ident, 'a'
class SubHandler(aiozmq.rpc.AttrHandler):
    """Nested RPC handler identified by an ident and a sub-ident."""

    def __init__(self, ident, subident):
        self.ident, self.subident = ident, subident

    @aiozmq.rpc.method
    def b(self):
        """Return ``(ident, subident, 'b')``."""
        return self.ident, self.subident, 'b'
@asyncio.coroutine
def go():
    """Call a top-level RPC method and one on the nested ``subhandler``."""
    root = Handler('ident')
    server = yield from aiozmq.rpc.serve_rpc(root, bind='tcp://*:*')
    endpoints = list(server.transport.bindings())
    server_addr = endpoints[0]
    client = yield from aiozmq.rpc.connect_rpc(connect=server_addr)

    ret = yield from client.call.a()
    assert ('ident', 'a') == ret

    # Attribute access on client.call selects the nested handler.
    ret = yield from client.call.subhandler.b()
    assert ('ident', 'subident', 'b') == ret

    server.close()
    yield from server.wait_closed()
    client.close()
    yield from client.wait_closed()
def main():
    """Run the nested-namespace example's ``go()`` until it finishes."""
    loop = asyncio.get_event_loop()
    loop.run_until_complete(go())
    print("DONE")


# Run the example only when executed as a script.
if __name__ == '__main__':
    main()
Use dict as RPC lookup table¶
import asyncio
import aiozmq.rpc
@aiozmq.rpc.method
def a():
    """RPC method that always returns the string ``'a'``."""
    reply = 'a'
    return reply
@aiozmq.rpc.method
def b():
    """RPC method that always returns the string ``'b'``."""
    reply = 'b'
    return reply
# Lookup table passed to serve_rpc(): 'a' at the top level and 'b' inside
# the nested 'subnamespace' dict.
handlers_dict = dict(a=a, subnamespace=dict(b=b))
@asyncio.coroutine
def go():
    """Serve ``handlers_dict`` over RPC and call both registered functions."""
    server = yield from aiozmq.rpc.serve_rpc(handlers_dict, bind='tcp://*:*')
    endpoints = list(server.transport.bindings())
    server_addr = endpoints[0]
    client = yield from aiozmq.rpc.connect_rpc(connect=server_addr)

    ret = yield from client.call.a()
    assert 'a' == ret

    # Attribute access on client.call selects the nested dict.
    ret = yield from client.call.subnamespace.b()
    assert 'b' == ret

    server.close()
    yield from server.wait_closed()
    client.close()
    yield from client.wait_closed()
def main():
    """Run the dict-lookup example's ``go()`` coroutine until it finishes."""
    loop = asyncio.get_event_loop()
    loop.run_until_complete(go())
    print("DONE")


# Run the example only when executed as a script.
if __name__ == '__main__':
    main()
Use dynamic RPC lookup¶
import asyncio
import aiozmq.rpc
class DynamicHandler(aiozmq.rpc.AttrHandler):
    """Handler that invents a sub-handler for every unknown name.

    ``namespace`` is the tuple of names looked up so far to reach this
    handler.
    """

    def __init__(self, namespace=()):
        self.namespace = namespace

    def __getitem__(self, key):
        # Real attributes (such as func) win; any other key yields a new
        # handler whose namespace is extended by that key.
        try:
            found = getattr(self, key)
        except AttributeError:
            return DynamicHandler(self.namespace + (key,))
        else:
            return found

    @aiozmq.rpc.method
    def func(self):
        """Return ``(namespace, 'val')``."""
        return self.namespace, 'val'
@asyncio.coroutine
def go():
    """Call ``func`` at three namespace depths of a ``DynamicHandler``."""
    root = DynamicHandler()
    server = yield from aiozmq.rpc.serve_rpc(root, bind='tcp://*:*')
    endpoints = list(server.transport.bindings())
    server_addr = endpoints[0]
    client = yield from aiozmq.rpc.connect_rpc(connect=server_addr)

    ret = yield from client.call.func()
    assert ((), 'val') == ret, ret

    # Names 'a' and 'b' are not attributes of the handler.
    ret = yield from client.call.a.func()
    assert (('a',), 'val') == ret, ret
    ret = yield from client.call.a.b.func()
    assert (('a', 'b'), 'val') == ret, ret

    server.close()
    yield from server.wait_closed()
    client.close()
    yield from client.wait_closed()
def main():
    """Run the dynamic-lookup example's ``go()`` until it finishes."""
    loop = asyncio.get_event_loop()
    loop.run_until_complete(go())
    print("DONE")


# Run the example only when executed as a script.
if __name__ == '__main__':
    main()
Socket event monitor¶
'''
This example demonstrates how to use the ZMQ socket monitor to receive
socket events.
The socket event monitor capability requires libzmq >= 4 and pyzmq >= 14.4.
'''
import asyncio
import aiozmq
import zmq
# Maps the value of every ``zmq.EVENT_*`` attribute to a readable label:
# the attribute name without 'EVENT_', lowercased, underscores as spaces.
ZMQ_EVENTS = {
    getattr(zmq, attr): attr.replace('EVENT_', '').lower().replace('_', ' ')
    for attr in dir(zmq)
    if attr.startswith('EVENT_')
}
def event_description(event):
    '''Return the readable label for a socket event code.

    The label comes from ZMQ_EVENTS; 'unknown' is returned for a code that
    is not in that table.
    '''
    try:
        return ZMQ_EVENTS[event]
    except KeyError:
        return 'unknown'
class Protocol(aiozmq.ZmqProtocol):
    '''Protocol shared by the Router and Dealer sockets of this example.

    Three futures expose its progress:

    * ``wait_ready`` resolves in ``connection_made``.
    * ``wait_done`` resolves once four b'World' replies were counted.
    * ``wait_closed`` resolves in ``connection_lost`` with its argument.
    '''

    def __init__(self):
        self.wait_ready = asyncio.Future()
        self.wait_done = asyncio.Future()
        self.wait_closed = asyncio.Future()
        # Number of b'World' replies seen so far.
        self.count = 0

    def connection_made(self, transport):
        self.transport = transport
        self.wait_ready.set_result(True)

    def connection_lost(self, exc):
        self.wait_closed.set_result(exc)

    def msg_received(self, data):
        # This protocol is used by both the Router and Dealer sockets in
        # this example. Router sockets prefix messages with the identity
        # of the sender and hence contain two frames in this simple test
        # protocol.
        if len(data) == 2:
            identity, msg = data
            assert msg == b'Hello'
            self.transport.write([identity, b'World'])
        else:
            msg = data[0]
            assert msg == b'World'
            self.count += 1
            # Guard with done(): resolving the future a second time (a
            # fifth reply) would raise InvalidStateError.
            if self.count >= 4 and not self.wait_done.done():
                self.wait_done.set_result(True)

    def event_received(self, event):
        # Print the raw event fields together with a readable label.
        print(
            'event:{}, value:{}, endpoint:{}, description:{}'.format(
                event.event, event.value, event.endpoint,
                event_description(event.event)))
@asyncio.coroutine
def go():
    '''Reconnect a monitored Dealer four times, sending b'Hello' each time.'''
    server_transport, server_proto = yield from aiozmq.create_zmq_connection(
        Protocol, zmq.ROUTER, bind='tcp://127.0.0.1:*')
    yield from server_proto.wait_ready
    endpoints = list(server_transport.bindings())
    addr = endpoints[0]

    client_transport, client_proto = yield from aiozmq.create_zmq_connection(
        Protocol, zmq.DEALER, connect=addr)
    yield from client_proto.wait_ready

    # Enable the socket monitor on the client socket. Socket events
    # are passed to the 'event_received' method on the client protocol.
    yield from client_transport.enable_monitor()

    # Trigger some socket events while also sending a message to the
    # server. When the client protocol has received 4 responses it
    # fires the wait_done future.
    pause = 0.1
    for _ in range(4):
        yield from asyncio.sleep(pause)
        yield from client_transport.disconnect(addr)
        yield from asyncio.sleep(pause)
        yield from client_transport.connect(addr)
        yield from asyncio.sleep(pause)
        client_transport.write([b'Hello'])
    yield from client_proto.wait_done

    # The socket monitor can be explicitly disabled if necessary.
    # yield from client_transport.disable_monitor()
    # If a socket monitor is left enabled on a socket being closed,
    # the socket monitor will be closed automatically.
    client_transport.close()
    yield from client_proto.wait_closed
    server_transport.close()
    yield from server_proto.wait_closed
def main():
    '''Run the socket-monitor example's go() coroutine until it finishes.'''
    loop = asyncio.get_event_loop()
    loop.run_until_complete(go())
    print("DONE")
if __name__ == '__main__':
    # Uncomment the next two lines to enable debug logging.
    # import logging
    # logging.basicConfig(level=logging.DEBUG)

    # Refuse to run on library versions below the stated minimums.
    libzmq_ok = zmq.zmq_version_info() >= (4,)
    pyzmq_ok = libzmq_ok and zmq.pyzmq_version_info() >= (14, 4,)
    if not pyzmq_ok:
        raise NotImplementedError(
            "Socket monitor requires libzmq >= 4 and pyzmq >= 14.4, "
            "have libzmq:{}, pyzmq:{}".format(
                zmq.zmq_version(), zmq.pyzmq_version()))
    main()
Stream socket event monitor¶
import asyncio
import aiozmq
import zmq
@asyncio.coroutine
def monitor_stream(stream):
    """Print every event read from ``stream`` until the stream is closed.

    Returns normally once ``read_event()`` raises
    ``aiozmq.ZmqStreamClosed``.
    """
    while True:
        try:
            event = yield from stream.read_event()
        except aiozmq.ZmqStreamClosed:
            break
        print(event)
@asyncio.coroutine
def go():
    """Echo ten messages while a background task prints dealer events."""
    router = yield from aiozmq.create_zmq_stream(
        zmq.ROUTER, bind='tcp://127.0.0.1:*')
    endpoints = list(router.transport.bindings())
    addr = endpoints[0]

    # The dealer is created unconnected so the monitor can be enabled
    # before connect() is called.
    dealer = yield from aiozmq.create_zmq_stream(zmq.DEALER)
    yield from dealer.transport.enable_monitor()
    # NOTE(review): the task is neither kept nor awaited, so it may still
    # be pending when go() returns - confirm this is intended.
    asyncio.Task(monitor_stream(dealer))
    yield from dealer.transport.connect(addr)

    for number in range(10):
        request = (b'data', b'ask', str(number).encode('utf-8'))
        dealer.write(request)
        # Router side: read what arrived and write it straight back.
        incoming = yield from router.read()
        router.write(incoming)
        answer = yield from dealer.read()
        print(answer)

    router.close()
    dealer.close()
def main():
    """Run the stream-monitor example's ``go()`` until it finishes."""
    loop = asyncio.get_event_loop()
    loop.run_until_complete(go())
    print("DONE")


# Run the example only when executed as a script.
if __name__ == '__main__':
    main()
Synchronous and asynchronous code working together¶
import asyncio
import aiozmq
import zmq
import argparse
import time
def main():
    """Parse the command line and run the selected example.

    Exactly one of ``--sync`` / ``--async`` must be given; it selects
    ``sync_main`` or ``async_main``. ``--client`` and ``--server`` choose
    which parts run and may be combined.

    Returns:
        Whatever the selected ``sync_main`` / ``async_main`` call returns.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument('--addr', default='tcp://127.0.0.1:7777',
                    help="Address to use, default `%(default)s`")
    # required=True: without it a missing mode left options.mode as None
    # and the call below failed with "'NoneType' object is not callable".
    gr = ap.add_mutually_exclusive_group(required=True)
    gr.add_argument('--sync', action='store_const', dest='mode',
                    const=sync_main, default=None,
                    help="Run synchronous example")
    gr.add_argument('--async', action='store_const', dest='mode',
                    const=async_main,
                    help="Run asynchronous example")
    ap.add_argument('--client', action='store_true', default=False,
                    help="Run client part")
    ap.add_argument('--server', action='store_true', default=False,
                    help="Run server part")
    options = ap.parse_args()
    return options.mode(options)
def read_data():
    """Prompt for a phrase and return it as a list of UTF-8 encoded words.

    The line is encoded first and then split on whitespace.
    """
    phrase = input("Enter some phrase: ")
    encoded = phrase.encode('utf-8')
    return encoded.split()
def sync_main(options):
    """Run the blocking (plain pyzmq) echo demo on ``options.addr``.

    With ``options.server`` a ROUTER socket is bound and echoes every
    message it receives. With ``options.client`` a DEALER socket sends one
    phrase read from the user and the function returns once a reply is
    read. A server-only run polls until interrupted. When neither role is
    selected the function returns immediately, as ``async_main`` does.

    Args:
        options: parsed arguments providing ``addr``, ``server``, ``client``.
    """
    print("Running sync at {!r}".format(options.addr))
    ctx = zmq.Context()
    srv_sock = cl_sock = None
    try:
        if options.server:
            srv_sock = ctx.socket(zmq.ROUTER)
            srv_sock.bind(options.addr)
        if options.client:
            cl_sock = ctx.socket(zmq.DEALER)
            cl_sock.connect(options.addr)
            data = read_data()
            cl_sock.send_multipart(data)
            print("Sync client write: {!r}".format(data))
        if srv_sock is None and cl_sock is None:
            # Nothing to poll; avoid sleeping forever.
            return
        while True:
            if srv_sock is not None:
                # Only "no message yet" is expected from a non-blocking
                # read; any other error, including a failed send, now
                # propagates instead of being swallowed.
                try:
                    data = srv_sock.recv_multipart(zmq.NOBLOCK)
                except zmq.Again:
                    pass
                else:
                    print("Sync server read: {!r}".format(data))
                    srv_sock.send_multipart(data)
                    print("Sync server write: {!r}".format(data))
            if cl_sock is not None:
                try:
                    data = cl_sock.recv_multipart(zmq.NOBLOCK)
                except zmq.Again:
                    pass
                else:
                    print("Sync client read: {!r}".format(data))
                    return
            time.sleep(.1)
    finally:
        # Close every socket of this context and terminate it; linger=0
        # keeps shutdown from blocking on unsent messages.
        ctx.destroy(linger=0)
def async_main(options):
    """Run the asyncio (aiozmq) echo demo on ``options.addr``.

    With ``options.server`` a ROUTER stream echoes every message it reads.
    With ``options.client`` a DEALER stream sends one phrase read from the
    user and waits for the reply. The loop runs until the client finishes
    or Ctrl-C is pressed. Remaining tasks are then cancelled and awaited so
    the streams are closed. Nothing runs when neither role is selected.

    Args:
        options: parsed arguments providing ``addr``, ``server``, ``client``.

    Raises:
        Exception: whatever made the client part fail; it is re-raised
            here instead of leaving the loop waiting forever.
    """
    print("Running async at {!r}".format(options.addr))
    loop = asyncio.get_event_loop()
    # Resolved (or failed) by client(); run_until_complete() waits on it.
    stop = asyncio.Future()

    @asyncio.coroutine
    def server():
        router = yield from aiozmq.create_zmq_stream(
            zmq.ROUTER, bind=options.addr)
        try:
            while True:
                try:
                    data = yield from router.read()
                except asyncio.CancelledError:
                    # Cancellation is the normal shutdown request.
                    break
                print("Async server read: {!r}".format(data))
                router.write(data)
                print("Async server write: {!r}".format(data))
        finally:
            router.close()

    @asyncio.coroutine
    def client():
        dealer = None
        try:
            dealer = yield from aiozmq.create_zmq_stream(
                zmq.DEALER, connect=options.addr)
            data = read_data()
            dealer.write(data)
            print("Async client write: {!r}".format(data))
            echo = yield from dealer.read()
            print("Async client read: {!r}".format(echo))
        except asyncio.CancelledError:
            # Cancelled during shutdown: do not report it through `stop`.
            raise
        except Exception as exc:
            # Report the failure so the waiting loop stops and re-raises.
            stop.set_exception(exc)
        else:
            stop.set_result(None)
        finally:
            if dealer is not None:
                dealer.close()

    tasks = []
    if options.server:
        tasks.append(asyncio.ensure_future(server()))
    if options.client:
        tasks.append(asyncio.ensure_future(client()))
    if not tasks:
        return

    interrupted = False
    try:
        loop.run_until_complete(stop)
    except KeyboardInterrupt:
        interrupted = True
    finally:
        # Cancel whatever is still running (typically the server) and let
        # it finish its cleanup before returning.
        for task in tasks:
            task.cancel()
        loop.run_until_complete(
            asyncio.gather(*tasks, return_exceptions=True))
        if interrupted:
            # As before, the loop is closed only after Ctrl-C.
            loop.close()
# Run the command-line entry point only when executed as a script.
if __name__ == '__main__':
    main()