Examples of aiozmq usage

The following is a list of examples from aiozmq/examples.

Every example is a small, complete, working Python program.

Simple DEALER-ROUTER pair implemented at the core level

import asyncio
import aiozmq
import zmq


class ZmqDealerProtocol(aiozmq.ZmqProtocol):
    """Protocol for the DEALER side: queue every received message.

    ``queue`` receives each message passed to ``msg_received``;
    ``on_close`` is a future resolved with the argument of
    ``connection_lost``.
    """

    # Placeholder until connection_made() stores the real transport.
    transport = None

    def __init__(self, queue, on_close):
        self.on_close = on_close
        self.queue = queue

    def connection_made(self, transport):
        """Remember the transport handed to the protocol."""
        self.transport = transport

    def connection_lost(self, exc):
        """Resolve ``on_close`` with ``exc``."""
        self.on_close.set_result(exc)

    def msg_received(self, msg):
        """Put ``msg`` on the queue without waiting."""
        self.queue.put_nowait(msg)


class ZmqRouterProtocol(aiozmq.ZmqProtocol):
    """Protocol for the ROUTER side: write every received message back.

    ``on_close`` is a future resolved with the argument of
    ``connection_lost``.
    """

    # Placeholder until connection_made() stores the real transport.
    transport = None

    def __init__(self, on_close):
        self.on_close = on_close

    def connection_made(self, transport):
        """Remember the transport so replies can be written to it."""
        self.transport = transport

    def connection_lost(self, exc):
        """Resolve ``on_close`` with ``exc``."""
        self.on_close.set_result(exc)

    def msg_received(self, msg):
        """Echo ``msg`` unchanged through the stored transport."""
        self.transport.write(msg)


async def go():
    """Connect a DEALER to a freshly bound ROUTER and echo ten messages."""
    dealer_closed = asyncio.Future()
    router_closed = asyncio.Future()

    def make_router_protocol():
        return ZmqRouterProtocol(router_closed)

    router, _ = await aiozmq.create_zmq_connection(
        make_router_protocol, zmq.ROUTER, bind="tcp://127.0.0.1:*"
    )

    # Connect the DEALER to the first endpoint the ROUTER transport reports.
    endpoint = list(router.bindings())[0]
    queue = asyncio.Queue()

    def make_dealer_protocol():
        return ZmqDealerProtocol(queue, dealer_closed)

    dealer, _ = await aiozmq.create_zmq_connection(
        make_dealer_protocol, zmq.DEALER, connect=endpoint
    )

    for number in range(10):
        payload = str(number).encode("utf-8")
        dealer.write((b"data", b"ask", payload))
        # The DEALER protocol queues whatever comes back from the ROUTER.
        print(await queue.get())

    # Close each side and wait for its connection_lost() notification.
    dealer.close()
    await dealer_closed
    router.close()
    await router_closed


def main():
    """Run the ``go`` coroutine to completion and report success."""
    asyncio.run(go())
    print("DONE")


# Run the example only when the file is executed as a script.
if __name__ == "__main__":
    main()

DEALER-ROUTER pair implemented with streams

import asyncio
import aiozmq
import zmq


async def go():
    """Echo ten messages between a DEALER stream and a ROUTER stream."""
    router = await aiozmq.create_zmq_stream(zmq.ROUTER, bind="tcp://127.0.0.1:*")

    # Connect the DEALER to the first endpoint the ROUTER transport reports.
    endpoint = list(router.transport.bindings())[0]
    dealer = await aiozmq.create_zmq_stream(zmq.DEALER, connect=endpoint)

    for number in range(10):
        request = (b"data", b"ask", str(number).encode("utf-8"))
        dealer.write(request)
        # The ROUTER writes back exactly what it has just read.
        incoming = await router.read()
        router.write(incoming)
        print(await dealer.read())

    dealer.close()
    router.close()


def main():
    """Drive the stream example to completion, then print a marker."""
    asyncio.run(go())
    print("DONE")


# Script entry point.
if __name__ == "__main__":
    main()

Remote Procedure Call

import asyncio
import aiozmq.rpc


class ServerHandler(aiozmq.rpc.AttrHandler):
    """RPC handler exposing a single addition method."""

    @aiozmq.rpc.method
    def remote_func(self, a: int, b: int) -> int:
        """Return the sum of ``a`` and ``b``."""
        total = a + b
        return total


async def go():
    """Serve ``ServerHandler`` over RPC, call it once and shut down."""
    server = await aiozmq.rpc.serve_rpc(ServerHandler(), bind="tcp://*:*")
    # Connect the client to the first endpoint the server transport reports.
    endpoint = list(server.transport.bindings())[0]
    client = await aiozmq.rpc.connect_rpc(connect=endpoint)

    total = await client.call.remote_func(1, 2)
    assert total == 3

    # Server first, then client; each is closed and awaited in turn.
    for service in (server, client):
        service.close()
        await service.wait_closed()


def main():
    """Run the RPC example and print ``DONE`` when it finishes."""
    asyncio.run(go())
    print("DONE")


# Script entry point.
if __name__ == "__main__":
    main()

Pipeline aka Notifier

import asyncio
import aiozmq.rpc
from itertools import count


class Handler(aiozmq.rpc.AttrHandler):
    """Pipeline handler that records when its first notification arrives."""

    def __init__(self):
        # Set to True by remote_func() on its first invocation.
        self.connected = False

    @aiozmq.rpc.method
    def remote_func(self, step, a: int, b: int):
        """Mark the handler as connected and print the received values."""
        print("HANDLER", step, a, b) if self._mark_connected() else None

    def _mark_connected(self):
        # Always returns True so the caller's print runs after the flag is set.
        self.connected = True
        return True


async def go():
    """Notify a pipeline listener repeatedly until it reports a delivery."""
    handler = Handler()
    listener = await aiozmq.rpc.serve_pipeline(handler, bind="tcp://*:*")
    # Connect the notifier to the first endpoint the listener reports.
    endpoint = list(listener.transport.bindings())[0]
    notifier = await aiozmq.rpc.connect_pipeline(connect=endpoint)

    # Keep notifying, with an increasing step number, until the handler
    # has seen at least one notification.
    step_numbers = count(0)
    while True:
        await notifier.notify.remote_func(next(step_numbers), 1, 2)
        if handler.connected:
            break
        await asyncio.sleep(0.01)

    # Listener first, then notifier; each is closed and awaited in turn.
    for endpoint_object in (listener, notifier):
        endpoint_object.close()
        await endpoint_object.wait_closed()


def main():
    """Run the pipeline example and print ``DONE`` afterwards."""
    asyncio.run(go())
    print("DONE")


# Script entry point.
if __name__ == "__main__":
    main()

Publish-Subscribe

import asyncio
import aiozmq.rpc
from itertools import count


class Handler(aiozmq.rpc.AttrHandler):
    """Subscriber-side handler that records when a publication arrives."""

    def __init__(self):
        # Flipped to True the first time remote_func() is invoked.
        self.connected = False

    @aiozmq.rpc.method
    def remote_func(self, step, a: int, b: int):
        """Flag the handler as connected, then print the received values."""
        setattr(self, "connected", True)
        print("HANDLER", step, a, b)


async def go():
    """Publish to ``"topic"`` until the subscriber's handler is invoked."""
    handler = Handler()
    subscriber = await aiozmq.rpc.serve_pubsub(
        handler, subscribe="topic", bind="tcp://127.0.0.1:*", log_exceptions=True
    )
    # Connect the publisher to the first endpoint the subscriber reports.
    endpoint = list(subscriber.transport.bindings())[0]
    print("SERVE", endpoint)

    publisher = await aiozmq.rpc.connect_pubsub(connect=endpoint)

    # Keep publishing, with an increasing step number, until the handler
    # has seen at least one message.
    step_numbers = count(0)
    while True:
        await publisher.publish("topic").remote_func(next(step_numbers), 1, 2)
        if handler.connected:
            break
        await asyncio.sleep(0.1)

    # Subscriber first, then publisher; each is closed and awaited in turn.
    for endpoint_object in (subscriber, publisher):
        endpoint_object.close()
        await endpoint_object.wait_closed()


def main():
    """Run the publish-subscribe example, then print ``DONE``."""
    asyncio.run(go())
    print("DONE")


# Script entry point.
if __name__ == "__main__":
    main()

Translating RPC exceptions back to the client

import asyncio
import aiozmq.rpc


class CustomError(Exception):
    """Exception carrying one payload, available as ``val`` and ``args[0]``."""

    def __init__(self, val):
        super().__init__(val)
        self.val = val


# Dotted "<module>.<class>" name of CustomError, used as the lookup key.
exc_name = ".".join((CustomError.__module__, CustomError.__name__))
# Maps that dotted name back to the exception class.
error_table = {}
error_table[exc_name] = CustomError


class ServerHandler(aiozmq.rpc.AttrHandler):
    """RPC handler whose only method always fails with ``CustomError``."""

    @aiozmq.rpc.method
    def remote(self, val):
        """Raise ``CustomError`` carrying ``val``; never returns normally."""
        error = CustomError(val)
        raise error


async def go():
    """Call a failing RPC method and verify the exception arrives intact.

    The client is created with ``error_table`` so that the server-side
    ``CustomError`` is re-raised on the client as the same class.

    Raises:
        AssertionError: if the call raises nothing, or if the translated
            exception does not carry the value that was sent.
    """
    server = await aiozmq.rpc.serve_rpc(ServerHandler(), bind="tcp://*:*")
    server_addr = list(server.transport.bindings())[0]

    client = await aiozmq.rpc.connect_rpc(connect=server_addr, error_table=error_table)

    try:
        await client.call.remote("value")
    except CustomError as exc:
        # A bare comparison would be evaluated and discarded; assert it so
        # the example really checks the round trip.
        assert exc.val == "value"
    else:
        # The handler always raises, so a normal return means the
        # exception was lost on the way back.
        raise AssertionError("remote() did not raise CustomError")

    # Wait for both ends to finish closing, like the other RPC examples.
    server.close()
    await server.wait_closed()
    client.close()
    await client.wait_closed()


def main():
    """Run the exception-translation example, then print ``DONE``."""
    asyncio.run(go())
    print("DONE")


# Script entry point.
if __name__ == "__main__":
    main()

Translating instances of custom classes via RPC

import asyncio
import aiozmq.rpc
import msgpack


class Point:
    """Pair of coordinates that compares equal to points with the same x, y."""

    def __init__(self, x, y):
        self.x, self.y = x, y

    def __eq__(self, other):
        # Leave comparison with foreign types to the other operand.
        if not isinstance(other, Point):
            return NotImplemented
        mine = (self.x, self.y)
        theirs = (other.x, other.y)
        return mine == theirs


# Each entry is (class, packer, unpacker): the packer turns a Point into
# msgpack bytes of its (x, y) pair, the unpacker rebuilds a Point from them.
# NOTE(review): the key 0 is presumably the translation code for Point -
# confirm against the aiozmq.rpc translation_table documentation.
translation_table = {
    0: (
        Point,
        lambda point: msgpack.packb((point.x, point.y)),
        lambda payload: Point(*msgpack.unpackb(payload)),
    ),
}


class ServerHandler(aiozmq.rpc.AttrHandler):
    """RPC handler that echoes its argument."""

    @aiozmq.rpc.method
    def remote(self, val):
        """Return ``val`` unchanged."""
        echoed = val
        return echoed


async def go():
    """Send a ``Point`` through RPC and check an equal ``Point`` returns."""
    # Both ends receive the same translation table.
    server = await aiozmq.rpc.serve_rpc(
        ServerHandler(), bind="tcp://*:*", translation_table=translation_table
    )
    endpoint = list(server.transport.bindings())[0]
    client = await aiozmq.rpc.connect_rpc(
        connect=endpoint, translation_table=translation_table
    )

    echoed = await client.call.remote(Point(1, 2))
    assert echoed == Point(1, 2)

    # Server first, then client; each is closed and awaited in turn.
    for service in (server, client):
        service.close()
        await service.wait_closed()


def main():
    """Run the custom-class translation example, then print ``DONE``."""
    asyncio.run(go())
    print("DONE")


# Script entry point.
if __name__ == "__main__":
    main()

Validation of RPC methods

import asyncio
import aiozmq.rpc


class ServerHandler(aiozmq.rpc.AttrHandler):
    """RPC handler with one annotated method used by the validation demo."""

    @aiozmq.rpc.method
    def remote_func(self, a: int, b: int) -> int:
        """Return the sum of ``a`` and ``b``."""
        result = a + b
        return result


async def go():
    """Make four invalid RPC calls and print the error each one raises."""
    server = await aiozmq.rpc.serve_rpc(ServerHandler(), bind="tcp://*:*")
    endpoint = list(server.transport.bindings())[0]
    client = await aiozmq.rpc.connect_rpc(connect=endpoint)
    call = client.call

    # A name the handler does not define.
    try:
        await call.unknown_function()
    except aiozmq.rpc.NotFoundError as err:
        print("client.rpc.unknown_function(): {}".format(err))

    # A keyword argument the method does not accept.
    try:
        await call.remote_func(bad_arg=1)
    except aiozmq.rpc.ParametersError as err:
        print("client.rpc.remote_func(bad_arg=1): {}".format(err))

    # One positional argument where the method takes two.
    try:
        await call.remote_func(1)
    except aiozmq.rpc.ParametersError as err:
        print("client.rpc.remote_func(1): {}".format(err))

    # Strings passed for parameters annotated as int.
    try:
        await call.remote_func("a", "b")
    except aiozmq.rpc.ParametersError as err:
        print("client.rpc.remote_func('a', 'b'): {}".format(err))

    # Server first, then client; each is closed and awaited in turn.
    for service in (server, client):
        service.close()
        await service.wait_closed()


def main():
    """Run the validation example, then print ``DONE``."""
    asyncio.run(go())
    print("DONE")


# Script entry point.
if __name__ == "__main__":
    main()

RPC lookup in nested namespaces

import asyncio
import aiozmq.rpc


class Handler(aiozmq.rpc.AttrHandler):
    """Top-level RPC handler with a nested handler under ``subhandler``."""

    def __init__(self, ident):
        self.ident = ident
        # The nested handler receives the same ident plus a fixed subident.
        nested = SubHandler(ident, "subident")
        self.subhandler = nested

    @aiozmq.rpc.method
    def a(self):
        """Return ``(ident, "a")``."""
        marker = "a"
        return (self.ident, marker)


class SubHandler(aiozmq.rpc.AttrHandler):
    """Nested RPC handler identified by an ``ident``/``subident`` pair."""

    def __init__(self, ident, subident):
        self.ident, self.subident = ident, subident

    @aiozmq.rpc.method
    def b(self):
        """Return ``(ident, subident, "b")``."""
        marker = "b"
        return (self.ident, self.subident, marker)


async def go():
    """Call a top-level method and one reached through ``subhandler``."""
    server = await aiozmq.rpc.serve_rpc(Handler("ident"), bind="tcp://*:*")
    endpoint = list(server.transport.bindings())[0]
    client = await aiozmq.rpc.connect_rpc(connect=endpoint)

    top_level = await client.call.a()
    assert ("ident", "a") == top_level

    # The attribute path on the client selects the nested handler.
    nested = await client.call.subhandler.b()
    assert ("ident", "subident", "b") == nested

    # Server first, then client; each is closed and awaited in turn.
    for service in (server, client):
        service.close()
        await service.wait_closed()


def main():
    """Run the nested-namespace example, then print ``DONE``."""
    asyncio.run(go())
    print("DONE")


# Script entry point.
if __name__ == "__main__":
    main()

Use dict as RPC lookup table

import asyncio
import aiozmq.rpc


@aiozmq.rpc.method
def a():
    """Return the constant string ``"a"``."""
    return "a"


@aiozmq.rpc.method
def b():
    """Return the constant string ``"b"``."""
    return "b"


# Lookup table: "a" at the top level, "b" inside the "subnamespace" dict.
handlers_dict = {
    "a": a,
    "subnamespace": {
        "b": b,
    },
}


async def go():
    """Serve ``handlers_dict`` over RPC and call both registered functions."""
    server = await aiozmq.rpc.serve_rpc(handlers_dict, bind="tcp://*:*")
    endpoint = list(server.transport.bindings())[0]
    client = await aiozmq.rpc.connect_rpc(connect=endpoint)

    top_level = await client.call.a()
    assert "a" == top_level

    # The attribute path on the client mirrors the nested dict keys.
    nested = await client.call.subnamespace.b()
    assert "b" == nested

    # Server first, then client; each is closed and awaited in turn.
    for service in (server, client):
        service.close()
        await service.wait_closed()


def main():
    """Run the dict-lookup example, then print ``DONE``."""
    asyncio.run(go())
    print("DONE")


# Script entry point.
if __name__ == "__main__":
    main()

Use dynamic RPC lookup

import asyncio
import aiozmq.rpc


class DynamicHandler(aiozmq.rpc.AttrHandler):
    """Handler that invents a nested handler for every unknown key.

    ``namespace`` is the tuple of keys that led to this handler.
    """

    def __init__(self, namespace=()):
        self.namespace = namespace

    def __getitem__(self, key):
        """Return attribute ``key`` if it exists, else a deeper handler."""
        try:
            found = getattr(self, key)
        except AttributeError:
            # Unknown name: descend one level, extending the namespace path.
            found = DynamicHandler(self.namespace + (key,))
        return found

    @aiozmq.rpc.method
    def func(self):
        """Return ``(namespace, "val")`` for this handler."""
        path = self.namespace
        return (path, "val")


async def go():
    """Call ``func`` at three namespace depths and check the reported path."""
    server = await aiozmq.rpc.serve_rpc(DynamicHandler(), bind="tcp://*:*")
    endpoint = list(server.transport.bindings())[0]
    client = await aiozmq.rpc.connect_rpc(connect=endpoint)

    # Root handler: empty namespace.
    answer = await client.call.func()
    assert ((), "val") == answer, answer

    # One level down.
    answer = await client.call.a.func()
    assert (("a",), "val") == answer, answer

    # Two levels down.
    answer = await client.call.a.b.func()
    assert (("a", "b"), "val") == answer, answer

    # Server first, then client; each is closed and awaited in turn.
    for service in (server, client):
        service.close()
        await service.wait_closed()


def main():
    """Run the dynamic-lookup example, then print ``DONE``."""
    asyncio.run(go())
    print("DONE")


# Script entry point.
if __name__ == "__main__":
    main()

Socket event monitor

"""
This example demonstrates how to use the ZMQ socket monitor to receive
socket events.

The socket event monitor capability requires libzmq >= 4 and pyzmq >= 14.4.

"""

import asyncio
import aiozmq
import zmq


# Maps the value of every ``zmq.EVENT_*`` attribute to a label derived from
# its name: "EVENT_" removed, lower-cased, underscores turned into spaces.
ZMQ_EVENTS = {
    getattr(zmq, attr): attr.replace("EVENT_", "").lower().replace("_", " ")
    for attr in dir(zmq)
    if attr.startswith("EVENT_")
}


def event_description(event):
    """Return the ``ZMQ_EVENTS`` label for ``event``, or ``"unknown"``."""
    if event in ZMQ_EVENTS:
        return ZMQ_EVENTS[event]
    return "unknown"


class Protocol(aiozmq.ZmqProtocol):
    """Protocol shared by the ROUTER and the DEALER socket of this example.

    Progress is exposed through three futures: ``wait_ready`` (connection
    made), ``wait_done`` (four ``b"World"`` replies counted) and
    ``wait_closed`` (connection lost).
    """

    def __init__(self):
        # Number of b"World" replies seen so far.
        self.count = 0
        self.wait_ready = asyncio.Future()
        self.wait_done = asyncio.Future()
        self.wait_closed = asyncio.Future()

    def connection_made(self, transport):
        """Store the transport and signal readiness."""
        self.transport = transport
        self.wait_ready.set_result(True)

    def connection_lost(self, exc):
        """Resolve ``wait_closed`` with ``exc``."""
        self.wait_closed.set_result(exc)

    def msg_received(self, data):
        """Answer a two-frame ``Hello`` or count a one-frame ``World``."""
        # Both sockets use this protocol. Messages seen by the Router are
        # prefixed with the sender's identity, so in this simple test
        # protocol they have two frames.
        if len(data) == 2:
            identity, msg = data
            assert msg == b"Hello"
            self.transport.write([identity, b"World"])
            return

        msg = data[0]
        assert msg == b"World"
        self.count += 1
        if self.count >= 4:
            self.wait_done.set_result(True)

    def event_received(self, event):
        """Print the fields of a socket monitor event with a description."""
        description = event_description(event.event)
        line = "event:{}, value:{}, endpoint:{}, description:{}".format(
            event.event, event.value, event.endpoint, description
        )
        print(line)


async def go():
    """Exercise the socket monitor on a DEALER talking to a ROUTER."""
    server_transport, server_protocol = await aiozmq.create_zmq_connection(
        Protocol, zmq.ROUTER, bind="tcp://127.0.0.1:*"
    )
    await server_protocol.wait_ready
    endpoint = list(server_transport.bindings())[0]

    client_transport, client_protocol = await aiozmq.create_zmq_connection(
        Protocol, zmq.DEALER, connect=endpoint
    )
    await client_protocol.wait_ready

    # Turn on the socket monitor for the client socket. Its events are
    # delivered to the client protocol's 'event_received' method.
    await client_transport.enable_monitor()

    # Disconnect and reconnect to trigger socket events, sending one
    # message to the server per round. Once the client protocol has
    # received 4 responses it resolves the wait_done future.
    for _ in range(4):
        await asyncio.sleep(0.1)
        await client_transport.disconnect(endpoint)
        await asyncio.sleep(0.1)
        await client_transport.connect(endpoint)
        await asyncio.sleep(0.1)
        client_transport.write([b"Hello"])

    await client_protocol.wait_done

    # The monitor could be switched off explicitly at this point:
    # await client_transport.disable_monitor()

    # A monitor that is still enabled when its socket is closed is
    # closed automatically.
    client_transport.close()
    await client_protocol.wait_closed

    server_transport.close()
    await server_protocol.wait_closed


def main():
    """Run the socket monitor example, then print ``DONE``."""
    asyncio.run(go())
    print("DONE")


if __name__ == "__main__":
    # Uncomment for verbose output:
    # import logging
    # logging.basicConfig(level=logging.DEBUG)

    # Refuse to run when either library is older than the socket monitor
    # requires.
    libzmq_too_old = zmq.zmq_version_info() < (4,)
    if libzmq_too_old or zmq.pyzmq_version_info() < (14, 4):
        message = (
            "Socket monitor requires libzmq >= 4 and pyzmq >= 14.4, "
            "have libzmq:{}, pyzmq:{}".format(zmq.zmq_version(), zmq.pyzmq_version())
        )
        raise NotImplementedError(message)

    main()

Stream socket event monitor

import asyncio
import aiozmq
import zmq


async def monitor_stream(stream):
    """Print every event read from ``stream`` until the stream is closed."""
    while True:
        try:
            event = await stream.read_event()
        except aiozmq.ZmqStreamClosed:
            # The stream was closed: stop monitoring quietly.
            return
        print(event)


async def go():
    """Echo ten messages while a background task prints DEALER socket events.

    The monitor task is kept in a local variable and awaited after the
    streams are closed, so it cannot be garbage-collected while running
    and any error it raises surfaces here.
    """
    router = await aiozmq.create_zmq_stream(zmq.ROUTER, bind="tcp://127.0.0.1:*")
    addr = list(router.transport.bindings())[0]

    # Create the DEALER unconnected so the monitor is enabled before the
    # connection is made.
    dealer = await aiozmq.create_zmq_stream(zmq.DEALER)

    await dealer.transport.enable_monitor()

    # The event loop holds only a weak reference to tasks; keep a strong
    # one for as long as the monitor should run.
    monitor = asyncio.create_task(monitor_stream(dealer))

    await dealer.transport.connect(addr)

    for i in range(10):
        msg = (b"data", b"ask", str(i).encode("utf-8"))
        dealer.write(msg)
        data = await router.read()
        router.write(data)
        answer = await dealer.read()
        print(answer)

    router.close()
    dealer.close()

    # monitor_stream() returns once the closed stream stops yielding events.
    # The timeout is a safety net: wait_for() cancels the task if it has not
    # finished by then, so the example can never hang here.
    try:
        await asyncio.wait_for(monitor, timeout=1.0)
    except asyncio.TimeoutError:
        pass


def main():
    """Run the stream monitor example, then print ``DONE``."""
    asyncio.run(go())
    print("DONE")


# Script entry point.
if __name__ == "__main__":
    main()

Synchronous and asynchronous code working together

import asyncio
import aiozmq
import zmq
import argparse
import time


def main():
    """Parse the command line and run the selected example.

    Exactly one of ``--sync`` / ``--async`` must be given; argparse reports
    a usage error and exits otherwise. ``--client`` and ``--server`` choose
    which parts run and may be combined.

    Returns:
        Whatever the selected runner (``sync_main`` or ``async_main``)
        returns.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument(
        "--addr",
        default="tcp://127.0.0.1:7777",
        help="Address to use, default `%(default)s`",
    )
    # required=True: without a mode there is nothing to call, and
    # ``options.mode`` would be None.
    gr = ap.add_mutually_exclusive_group(required=True)
    gr.add_argument(
        "--sync",
        action="store_const",
        dest="mode",
        const=sync_main,
        default=None,
        help="Run synchronous example",
    )
    gr.add_argument(
        "--async",
        action="store_const",
        dest="mode",
        const=async_main,
        help="Run asynchronous example",
    )

    ap.add_argument(
        "--client", action="store_true", default=False, help="Run client part"
    )
    ap.add_argument(
        "--server", action="store_true", default=False, help="Run server part"
    )

    options = ap.parse_args()
    return options.mode(options)


def read_data():
    """Prompt for a phrase and return its words as UTF-8 encoded frames.

    The prompt is repeated until at least one word is entered, because an
    empty list is not a usable multipart message.

    Returns:
        A non-empty list of ``bytes``, one item per whitespace-separated
        word.
    """
    while True:
        frames = input("Enter some phrase: ").encode("utf-8").split()
        if frames:
            return frames
        print("Please enter at least one word.")


def sync_main(options):
    """Run the blocking (plain pyzmq) variant of the echo example.

    With ``options.server`` a ROUTER socket is bound to ``options.addr``
    and echoes every message it receives. With ``options.client`` a DEALER
    socket connects, sends one phrase read from the user and returns once
    the echo arrives. A server-only run loops until interrupted.

    Args:
        options: object with ``addr``, ``server`` and ``client`` attributes.
    """
    print("Running sync at {!r}".format(options.addr))

    if not (options.server or options.client):
        # Neither part was requested: the polling loop below would spin
        # forever without a socket to look at.
        return

    ctx = zmq.Context()
    srv_sock = cl_sock = None
    try:
        if options.server:
            srv_sock = ctx.socket(zmq.ROUTER)
            srv_sock.bind(options.addr)

        if options.client:
            cl_sock = ctx.socket(zmq.DEALER)
            cl_sock.connect(options.addr)
            data = read_data()
            cl_sock.send_multipart(data)
            print("Sync client write: {!r}".format(data))

        while True:
            if srv_sock is not None:
                try:
                    data = srv_sock.recv_multipart(zmq.NOBLOCK)
                except zmq.Again:
                    # Nothing to read yet; any other ZMQError is a real
                    # failure and is allowed to propagate.
                    pass
                else:
                    print("Sync server read: {!r}".format(data))
                    srv_sock.send_multipart(data)
                    print("Sync server write: {!r}".format(data))
            if cl_sock is not None:
                try:
                    data = cl_sock.recv_multipart(zmq.NOBLOCK)
                except zmq.Again:
                    pass
                else:
                    print("Sync client read: {!r}".format(data))
                    return
            time.sleep(0.1)
    finally:
        # Release the sockets and the context on every exit path,
        # including Ctrl-C in a server-only run. linger=0 keeps term()
        # from blocking on messages that can no longer be delivered.
        for sock in (cl_sock, srv_sock):
            if sock is not None:
                sock.close(linger=0)
        ctx.term()


def async_main(options):
    """Run the asyncio (aiozmq) variant of the echo example.

    With ``options.server`` a ROUTER stream bound to ``options.addr``
    echoes every message. With ``options.client`` a DEALER stream sends
    one phrase read from the user and stops the run once the echo
    arrives. A server-only run lasts until interrupted with Ctrl-C.

    Args:
        options: object with ``addr``, ``server`` and ``client`` attributes.

    Raises:
        Exception: whatever the server or client task failed with.
    """
    print("Running async at {!r}".format(options.addr))
    # Use a private loop: asyncio.get_event_loop() is deprecated when no
    # loop is running, and this loop is always closed before returning.
    loop = asyncio.new_event_loop()

    # Resolved by the client when its echo has arrived.
    stop = loop.create_future()

    async def server():
        router = await aiozmq.create_zmq_stream(zmq.ROUTER, bind=options.addr)
        while True:
            try:
                data = await router.read()
            except asyncio.CancelledError:
                break
            print("Async server read: {!r}".format(data))
            router.write(data)
            print("Async server write: {!r}".format(data))
        router.close()

    async def client():
        dealer = await aiozmq.create_zmq_stream(zmq.DEALER, connect=options.addr)
        data = read_data()
        dealer.write(data)
        print("Async client write: {!r}".format(data))
        echo = await dealer.read()
        print("Async client read: {!r}".format(echo))
        stop.set_result(None)

    def propagate_failure(task):
        # A crashed task would otherwise leave ``stop`` pending forever.
        if task.cancelled() or stop.done():
            return
        error = task.exception()
        if error is not None:
            stop.set_exception(error)

    tasks = []
    try:
        if options.server:
            tasks.append(loop.create_task(server()))
        if options.client:
            tasks.append(loop.create_task(client()))
        for task in tasks:
            task.add_done_callback(propagate_failure)

        if tasks:
            try:
                loop.run_until_complete(stop)
            except KeyboardInterrupt:
                # Ctrl-C is the normal way to end a server-only run.
                pass
    finally:
        # Cancel whatever is still running (the server loop closes its
        # router on cancellation) and let the tasks finish before the
        # loop is closed.
        for task in tasks:
            task.cancel()
        if tasks:
            loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
        loop.close()


# Parse the command line and run the chosen example only when the file is
# executed as a script.
if __name__ == "__main__":
    main()