Examples of aiozmq usage

There is a list of examples from aiozmq/examples

Every example is a correct tiny python program.

Simple DEALER-ROUTER pair implemented on Core level

import asyncio
import aiozmq
import zmq


class ZmqDealerProtocol(aiozmq.ZmqProtocol):

    transport = None

    def __init__(self, queue, on_close):
        self.queue = queue
        self.on_close = on_close

    def connection_made(self, transport):
        self.transport = transport

    def msg_received(self, msg):
        self.queue.put_nowait(msg)

    def connection_lost(self, exc):
        self.on_close.set_result(exc)


class ZmqRouterProtocol(aiozmq.ZmqProtocol):

    transport = None

    def __init__(self, on_close):
        self.on_close = on_close

    def connection_made(self, transport):
        self.transport = transport

    def msg_received(self, msg):
        self.transport.write(msg)

    def connection_lost(self, exc):
        self.on_close.set_result(exc)


@asyncio.coroutine
def go():
    router_closed = asyncio.Future()
    dealer_closed = asyncio.Future()
    router, _ = yield from aiozmq.create_zmq_connection(
        lambda: ZmqRouterProtocol(router_closed),
        zmq.ROUTER,
        bind='tcp://127.0.0.1:*')

    addr = list(router.bindings())[0]
    queue = asyncio.Queue()
    dealer, _ = yield from aiozmq.create_zmq_connection(
        lambda: ZmqDealerProtocol(queue, dealer_closed),
        zmq.DEALER,
        connect=addr)

    for i in range(10):
        msg = (b'data', b'ask', str(i).encode('utf-8'))
        dealer.write(msg)
        answer = yield from queue.get()
        print(answer)
    dealer.close()
    yield from dealer_closed
    router.close()
    yield from router_closed


def main():
    asyncio.get_event_loop().run_until_complete(go())
    print("DONE")


if __name__ == '__main__':
    main()

DEALER-ROUTER pair implemented with streams

import asyncio
import aiozmq
import zmq


@asyncio.coroutine
def go():
    router = yield from aiozmq.create_zmq_stream(
        zmq.ROUTER,
        bind='tcp://127.0.0.1:*')

    addr = list(router.transport.bindings())[0]
    dealer = yield from aiozmq.create_zmq_stream(
        zmq.DEALER,
        connect=addr)

    for i in range(10):
        msg = (b'data', b'ask', str(i).encode('utf-8'))
        dealer.write(msg)
        data = yield from router.read()
        router.write(data)
        answer = yield from dealer.read()
        print(answer)
    dealer.close()
    router.close()


def main():
    asyncio.get_event_loop().run_until_complete(go())
    print("DONE")


if __name__ == '__main__':
    main()

Remote Procedure Call

import asyncio
import aiozmq.rpc


class ServerHandler(aiozmq.rpc.AttrHandler):

    @aiozmq.rpc.method
    def remote_func(self, a: int, b: int) -> int:
        return a + b


@asyncio.coroutine
def go():
    server = yield from aiozmq.rpc.serve_rpc(
        ServerHandler(), bind='tcp://*:*')
    server_addr = list(server.transport.bindings())[0]

    client = yield from aiozmq.rpc.connect_rpc(
        connect=server_addr)

    ret = yield from client.call.remote_func(1, 2)
    assert 3 == ret

    server.close()
    yield from server.wait_closed()
    client.close()
    yield from client.wait_closed()


def main():
    asyncio.get_event_loop().run_until_complete(go())
    print("DONE")


if __name__ == '__main__':
    main()

Pipeline aka Notifier

import asyncio
import aiozmq.rpc
from itertools import count


class Handler(aiozmq.rpc.AttrHandler):

    def __init__(self):
        self.connected = False

    @aiozmq.rpc.method
    def remote_func(self, step, a: int, b: int):
        self.connected = True
        print("HANDLER", step, a, b)


@asyncio.coroutine
def go():
    handler = Handler()
    listener = yield from aiozmq.rpc.serve_pipeline(
        handler, bind='tcp://*:*')
    listener_addr = list(listener.transport.bindings())[0]

    notifier = yield from aiozmq.rpc.connect_pipeline(
        connect=listener_addr)

    for step in count(0):
        yield from notifier.notify.remote_func(step, 1, 2)
        if handler.connected:
            break
        else:
            yield from asyncio.sleep(0.01)

    listener.close()
    yield from listener.wait_closed()
    notifier.close()
    yield from notifier.wait_closed()


def main():
    asyncio.get_event_loop().run_until_complete(go())
    print("DONE")


if __name__ == '__main__':
    main()

Publish-Subscribe

import asyncio
import aiozmq.rpc
from itertools import count


class Handler(aiozmq.rpc.AttrHandler):

    def __init__(self):
        self.connected = False

    @aiozmq.rpc.method
    def remote_func(self, step, a: int, b: int):
        self.connected = True
        print("HANDLER", step, a, b)


@asyncio.coroutine
def go():
    handler = Handler()
    subscriber = yield from aiozmq.rpc.serve_pubsub(
        handler, subscribe='topic', bind='tcp://127.0.0.1:*',
        log_exceptions=True)
    subscriber_addr = list(subscriber.transport.bindings())[0]
    print("SERVE", subscriber_addr)

    publisher = yield from aiozmq.rpc.connect_pubsub(
        connect=subscriber_addr)

    for step in count(0):
        yield from publisher.publish('topic').remote_func(step, 1, 2)
        if handler.connected:
            break
        else:
            yield from asyncio.sleep(0.1)

    subscriber.close()
    yield from subscriber.wait_closed()
    publisher.close()
    yield from publisher.wait_closed()


def main():
    asyncio.get_event_loop().run_until_complete(go())
    print("DONE")


if __name__ == '__main__':
    main()

Translation RPC exceptions back to client

import asyncio
import aiozmq.rpc


class CustomError(Exception):

    def __init__(self, val):
        self.val = val
        super().__init__(val)


exc_name = CustomError.__module__+'.'+CustomError.__name__
error_table = {exc_name: CustomError}


class ServerHandler(aiozmq.rpc.AttrHandler):
    @aiozmq.rpc.method
    def remote(self, val):
        raise CustomError(val)


@asyncio.coroutine
def go():
    server = yield from aiozmq.rpc.serve_rpc(
        ServerHandler(), bind='tcp://*:*')
    server_addr = list(server.transport.bindings())[0]

    client = yield from aiozmq.rpc.connect_rpc(
        connect=server_addr,
        error_table=error_table)

    try:
        yield from client.call.remote('value')
    except CustomError as exc:
        exc.val == 'value'

    server.close()
    client.close()


def main():
    asyncio.get_event_loop().run_until_complete(go())
    print("DONE")


if __name__ == '__main__':
    main()

Translation instances of custom classes via RPC

import asyncio
import aiozmq.rpc
import msgpack


class Point:

    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __eq__(self, other):
        if isinstance(other, Point):
            return (self.x, self.y) == (other.x, other.y)
        return NotImplemented


translation_table = {
    0: (Point,
        lambda value: msgpack.packb((value.x, value.y)),
        lambda binary: Point(*msgpack.unpackb(binary))),
}


class ServerHandler(aiozmq.rpc.AttrHandler):
    @aiozmq.rpc.method
    def remote(self, val):
        return val


@asyncio.coroutine
def go():
    server = yield from aiozmq.rpc.serve_rpc(
        ServerHandler(), bind='tcp://*:*',
        translation_table=translation_table)
    server_addr = list(server.transport.bindings())[0]

    client = yield from aiozmq.rpc.connect_rpc(
        connect=server_addr,
        translation_table=translation_table)

    ret = yield from client.call.remote(Point(1, 2))
    assert ret == Point(1, 2)

    server.close()
    yield from server.wait_closed()
    client.close()
    yield from client.wait_closed()


def main():
    asyncio.get_event_loop().run_until_complete(go())
    print("DONE")


if __name__ == '__main__':
    main()

Validation of RPC methods

import asyncio
import aiozmq.rpc


class ServerHandler(aiozmq.rpc.AttrHandler):

    @aiozmq.rpc.method
    def remote_func(self, a: int, b: int) -> int:
        return a + b


@asyncio.coroutine
def go():
    server = yield from aiozmq.rpc.serve_rpc(
        ServerHandler(), bind='tcp://*:*')
    server_addr = list(server.transport.bindings())[0]

    client = yield from aiozmq.rpc.connect_rpc(
        connect=server_addr)

    try:
        yield from client.call.unknown_function()
    except aiozmq.rpc.NotFoundError as exc:
        print("client.rpc.unknown_function(): {}".format(exc))

    try:
        yield from client.call.remote_func(bad_arg=1)
    except aiozmq.rpc.ParametersError as exc:
        print("client.rpc.remote_func(bad_arg=1): {}".format(exc))

    try:
        yield from client.call.remote_func(1)
    except aiozmq.rpc.ParametersError as exc:
        print("client.rpc.remote_func(1): {}".format(exc))

    try:
        yield from client.call.remote_func('a', 'b')
    except aiozmq.rpc.ParametersError as exc:
        print("client.rpc.remote_func('a', 'b'): {}".format(exc))

    server.close()
    yield from server.wait_closed()
    client.close()
    yield from client.wait_closed()


def main():
    asyncio.get_event_loop().run_until_complete(go())
    print("DONE")


if __name__ == '__main__':
    main()

RPC lookup in nested namespaces

import asyncio
import aiozmq.rpc


class Handler(aiozmq.rpc.AttrHandler):

    def __init__(self, ident):
        self.ident = ident
        self.subhandler = SubHandler(self.ident, 'subident')

    @aiozmq.rpc.method
    def a(self):
        return (self.ident, 'a')


class SubHandler(aiozmq.rpc.AttrHandler):

    def __init__(self, ident, subident):
        self.ident = ident
        self.subident = subident

    @aiozmq.rpc.method
    def b(self):
        return (self.ident, self.subident, 'b')


@asyncio.coroutine
def go():
    server = yield from aiozmq.rpc.serve_rpc(
        Handler('ident'), bind='tcp://*:*')
    server_addr = list(server.transport.bindings())[0]

    client = yield from aiozmq.rpc.connect_rpc(
        connect=server_addr)

    ret = yield from client.call.a()
    assert ('ident', 'a') == ret

    ret = yield from client.call.subhandler.b()
    assert ('ident', 'subident', 'b') == ret

    server.close()
    yield from server.wait_closed()
    client.close()
    yield from client.wait_closed()


def main():
    asyncio.get_event_loop().run_until_complete(go())
    print("DONE")


if __name__ == '__main__':
    main()

Use dict as RPC lookup table

import asyncio
import aiozmq.rpc


@aiozmq.rpc.method
def a():
    return 'a'


@aiozmq.rpc.method
def b():
    return 'b'


handlers_dict = {'a': a,
                 'subnamespace': {'b': b}}


@asyncio.coroutine
def go():
    server = yield from aiozmq.rpc.serve_rpc(
        handlers_dict, bind='tcp://*:*')
    server_addr = list(server.transport.bindings())[0]

    client = yield from aiozmq.rpc.connect_rpc(
        connect=server_addr)

    ret = yield from client.call.a()
    assert 'a' == ret

    ret = yield from client.call.subnamespace.b()
    assert 'b' == ret

    server.close()
    yield from server.wait_closed()
    client.close()
    yield from client.wait_closed()


def main():
    asyncio.get_event_loop().run_until_complete(go())
    print("DONE")


if __name__ == '__main__':
    main()

Use dynamic RPC lookup

import asyncio
import aiozmq.rpc


class DynamicHandler(aiozmq.rpc.AttrHandler):

    def __init__(self, namespace=()):
        self.namespace = namespace

    def __getitem__(self, key):
        try:
            return getattr(self, key)
        except AttributeError:
            return DynamicHandler(self.namespace + (key,))

    @aiozmq.rpc.method
    def func(self):
        return (self.namespace, 'val')


@asyncio.coroutine
def go():
    server = yield from aiozmq.rpc.serve_rpc(
        DynamicHandler(), bind='tcp://*:*')
    server_addr = list(server.transport.bindings())[0]

    client = yield from aiozmq.rpc.connect_rpc(
        connect=server_addr)

    ret = yield from client.call.func()
    assert ((), 'val') == ret, ret

    ret = yield from client.call.a.func()
    assert (('a',), 'val') == ret, ret

    ret = yield from client.call.a.b.func()
    assert (('a', 'b'), 'val') == ret, ret

    server.close()
    yield from server.wait_closed()
    client.close()
    yield from client.wait_closed()


def main():
    asyncio.get_event_loop().run_until_complete(go())
    print("DONE")


if __name__ == '__main__':
    main()

Socket event monitor


'''
This example demonstrates how to use the ZMQ socket monitor to receive
socket events.

The socket event monitor capability requires libzmq >= 4 and pyzmq >= 14.4.

'''

import asyncio
import aiozmq
import zmq


ZMQ_EVENTS = {
    getattr(zmq, name): name.replace('EVENT_', '').lower().replace('_', ' ')
    for name in [i for i in dir(zmq) if i.startswith('EVENT_')]}


def event_description(event):
    ''' Return a human readable description of the event '''
    return ZMQ_EVENTS.get(event, 'unknown')


class Protocol(aiozmq.ZmqProtocol):

    def __init__(self):
        self.wait_ready = asyncio.Future()
        self.wait_done = asyncio.Future()
        self.wait_closed = asyncio.Future()
        self.count = 0

    def connection_made(self, transport):
        self.transport = transport
        self.wait_ready.set_result(True)

    def connection_lost(self, exc):
        self.wait_closed.set_result(exc)

    def msg_received(self, data):
        # This protocol is used by both the Router and Dealer sockets in
        # this example. Router sockets prefix messages with the identity
        # of the sender and hence contain two frames in this simple test
        # protocol.
        if len(data) == 2:
            identity, msg = data
            assert msg == b'Hello'
            self.transport.write([identity, b'World'])
        else:
            msg = data[0]
            assert msg == b'World'
            self.count += 1
            if self.count >= 4:
                self.wait_done.set_result(True)

    def event_received(self, event):
        print(
            'event:{}, value:{}, endpoint:{}, description:{}'.format(
                event.event, event.value, event.endpoint,
                event_description(event.event)))


@asyncio.coroutine
def go():

    st, sp = yield from aiozmq.create_zmq_connection(
        Protocol, zmq.ROUTER, bind='tcp://127.0.0.1:*')
    yield from sp.wait_ready
    addr = list(st.bindings())[0]

    ct, cp = yield from aiozmq.create_zmq_connection(
        Protocol, zmq.DEALER, connect=addr)
    yield from cp.wait_ready

    # Enable the socket monitor on the client socket. Socket events
    # are passed to the 'event_received' method on the client protocol.
    yield from ct.enable_monitor()

    # Trigger some socket events while also sending a message to the
    # server. When the client protocol receives 4 response it will
    # fire the wait_done future.
    for i in range(4):
        yield from asyncio.sleep(0.1)
        yield from ct.disconnect(addr)
        yield from asyncio.sleep(0.1)
        yield from ct.connect(addr)
        yield from asyncio.sleep(0.1)
        ct.write([b'Hello'])

    yield from cp.wait_done

    # The socket monitor can be explicitly disabled if necessary.
    # yield from ct.disable_monitor()

    # If a socket monitor is left enabled on a socket being closed,
    # the socket monitor will be closed automatically.
    ct.close()
    yield from cp.wait_closed

    st.close()
    yield from sp.wait_closed


def main():
    asyncio.get_event_loop().run_until_complete(go())
    print("DONE")


if __name__ == '__main__':
    # import logging
    # logging.basicConfig(level=logging.DEBUG)

    if (zmq.zmq_version_info() < (4,) or
            zmq.pyzmq_version_info() < (14, 4,)):
        raise NotImplementedError(
            "Socket monitor requires libzmq >= 4 and pyzmq >= 14.4, "
            "have libzmq:{}, pyzmq:{}".format(
                zmq.zmq_version(), zmq.pyzmq_version()))

    main()

Stream socket event monitor

import asyncio
import aiozmq
import zmq


@asyncio.coroutine
def monitor_stream(stream):
    try:
        while True:
            event = yield from stream.read_event()
            print(event)
    except aiozmq.ZmqStreamClosed:
        pass


@asyncio.coroutine
def go():
    router = yield from aiozmq.create_zmq_stream(
        zmq.ROUTER,
        bind='tcp://127.0.0.1:*')
    addr = list(router.transport.bindings())[0]

    dealer = yield from aiozmq.create_zmq_stream(
        zmq.DEALER)

    yield from dealer.transport.enable_monitor()

    asyncio.Task(monitor_stream(dealer))

    yield from dealer.transport.connect(addr)

    for i in range(10):
        msg = (b'data', b'ask', str(i).encode('utf-8'))
        dealer.write(msg)
        data = yield from router.read()
        router.write(data)
        answer = yield from dealer.read()
        print(answer)

    router.close()
    dealer.close()


def main():
    asyncio.get_event_loop().run_until_complete(go())
    print("DONE")


if __name__ == '__main__':
    main()

Synchronous and asynchronous code works together

import asyncio
import aiozmq
import zmq
import argparse
import time


def main():
    ap = argparse.ArgumentParser()
    ap.add_argument('--addr', default='tcp://127.0.0.1:7777',
                    help="Address to use, default `%(default)s`")
    gr = ap.add_mutually_exclusive_group()
    gr.add_argument('--sync', action='store_const', dest='mode',
                    const=sync_main, default=None,
                    help="Run synchronous example")
    gr.add_argument('--async', action='store_const', dest='mode',
                    const=async_main,
                    help="Run asynchronous example")

    ap.add_argument('--client', action='store_true', default=False,
                    help="Run client part")
    ap.add_argument('--server', action='store_true', default=False,
                    help="Run server part")

    options = ap.parse_args()
    return options.mode(options)


def read_data():
    return input("Enter some phrase: ").encode('utf-8').split()


def sync_main(options):
    print("Running sync at {!r}".format(options.addr))

    ctx = zmq.Context()
    srv_sock = cl_sock = None
    if options.server:
        srv_sock = ctx.socket(zmq.ROUTER)
        srv_sock.bind(options.addr)

    if options.client:
        cl_sock = ctx.socket(zmq.DEALER)
        cl_sock.connect(options.addr)
        data = read_data()
        cl_sock.send_multipart(data)
        print("Sync client write: {!r}".format(data))

    while True:
        if srv_sock:
            try:
                data = srv_sock.recv_multipart(zmq.NOBLOCK)
                print("Sync server read: {!r}".format(data))
                srv_sock.send_multipart(data)
                print("Sync server write: {!r}".format(data))
            except zmq.ZMQError:
                pass
        if cl_sock:
            try:
                data = cl_sock.recv_multipart(zmq.NOBLOCK)
                print("Sync client read: {!r}".format(data))
                return
            except zmq.ZMQError:
                pass
        time.sleep(.1)


def async_main(options):
    print("Running async at {!r}".format(options.addr))
    loop = asyncio.get_event_loop()

    stop = asyncio.Future()

    @asyncio.coroutine
    def server():
        router = yield from aiozmq.create_zmq_stream(
            zmq.ROUTER, bind=options.addr)
        while True:
            try:
                data = yield from router.read()
            except asyncio.CancelledError:
                break
            print("Async server read: {!r}".format(data))
            router.write(data)
            print("Async server write: {!r}".format(data))
        router.close()

    @asyncio.coroutine
    def client():
        dealer = yield from aiozmq.create_zmq_stream(
            zmq.DEALER, connect=options.addr)
        data = read_data()
        dealer.write(data)
        print("Async client write: {!r}".format(data))
        echo = yield from dealer.read()
        print("Async client read: {!r}".format(echo))
        stop.set_result(None)

    tasks = []
    if options.server:
        tasks.append(asyncio.ensure_future(server()))

    if options.client:
        tasks.append(asyncio.ensure_future(client()))

    if tasks:
        try:
            loop.run_until_complete(stop)
        except KeyboardInterrupt:
            loop.call_soon(loop.stop)
            loop.run_forever()
            loop.close()


if __name__ == '__main__':
    main()