Examples of aiozmq usage¶
This is a list of examples from aiozmq/examples.
Every example is a complete, runnable small Python program.
Simple DEALER-ROUTER pair implemented on Core level¶
import asyncio
import aiozmq
import zmq
class ZmqDealerProtocol(aiozmq.ZmqProtocol):
    """DEALER-side protocol: queue each received message and report closure."""

    # Replaced by the real transport in connection_made().
    transport = None

    def __init__(self, queue, on_close):
        # on_close: future resolved in connection_lost() with the close reason.
        # queue: receives every message handed to msg_received().
        self.on_close = on_close
        self.queue = queue

    def connection_made(self, transport):
        """Remember the transport handed over by aiozmq."""
        self.transport = transport

    def connection_lost(self, exc):
        """Resolve the ``on_close`` future with ``exc``."""
        self.on_close.set_result(exc)

    def msg_received(self, msg):
        """Push the incoming message onto the queue without blocking."""
        self.queue.put_nowait(msg)
class ZmqRouterProtocol(aiozmq.ZmqProtocol):
    """ROUTER-side protocol: echo each received message straight back."""

    # Replaced by the real transport in connection_made().
    transport = None

    def __init__(self, on_close):
        # on_close: future resolved in connection_lost() with the close reason.
        self.on_close = on_close

    def connection_made(self, transport):
        """Remember the transport so msg_received() can write replies."""
        self.transport = transport

    def connection_lost(self, exc):
        """Resolve the ``on_close`` future with ``exc``."""
        self.on_close.set_result(exc)

    def msg_received(self, msg):
        """Write the message back unchanged through the stored transport."""
        self.transport.write(msg)
async def go():
    """Send ten messages from a DEALER to an echoing ROUTER and print the replies."""
    dealer_closed = asyncio.Future()
    router_closed = asyncio.Future()

    def make_router():
        return ZmqRouterProtocol(router_closed)

    router, _ = await aiozmq.create_zmq_connection(
        make_router, zmq.ROUTER, bind="tcp://127.0.0.1:*"
    )

    # The router was bound to a wildcard port; ask it for the real endpoint.
    bound = list(router.bindings())
    addr = bound[0]

    replies = asyncio.Queue()

    def make_dealer():
        return ZmqDealerProtocol(replies, dealer_closed)

    dealer, _ = await aiozmq.create_zmq_connection(
        make_dealer, zmq.DEALER, connect=addr
    )

    for seq in range(10):
        frames = (b"data", b"ask", str(seq).encode("utf-8"))
        dealer.write(frames)
        print(await replies.get())

    # Close each side and wait for its connection_lost() notification.
    dealer.close()
    await dealer_closed
    router.close()
    await router_closed
def main():
    """Run the example coroutine to completion, then report success."""
    example = go()
    asyncio.run(example)
    print("DONE")


if __name__ == "__main__":
    # Run the demo only when executed as a script.
    main()
DEALER-ROUTER pair implemented with streams¶
import asyncio
import aiozmq
import zmq
async def go():
    """Round-trip ten messages between a DEALER stream and a ROUTER stream."""
    router = await aiozmq.create_zmq_stream(zmq.ROUTER, bind="tcp://127.0.0.1:*")

    # The router was bound to a wildcard port; ask it for the real endpoint.
    endpoints = list(router.transport.bindings())
    addr = endpoints[0]

    dealer = await aiozmq.create_zmq_stream(zmq.DEALER, connect=addr)

    for seq in range(10):
        request = (b"data", b"ask", str(seq).encode("utf-8"))
        dealer.write(request)
        # Echo whatever the router received back through the router.
        received = await router.read()
        router.write(received)
        print(await dealer.read())

    dealer.close()
    router.close()
def main():
    """Run the stream example to completion, then report success."""
    example = go()
    asyncio.run(example)
    print("DONE")


if __name__ == "__main__":
    # Run the demo only when executed as a script.
    main()
Remote Procedure Call¶
import asyncio
import aiozmq.rpc
class ServerHandler(aiozmq.rpc.AttrHandler):
    """RPC handler exposing a single addition method."""

    @aiozmq.rpc.method
    def remote_func(self, a: int, b: int) -> int:
        """Return ``a + b``."""
        total = a + b
        return total
async def go():
    """Start an RPC server, call ``remote_func(1, 2)`` on it and check the result."""
    server = await aiozmq.rpc.serve_rpc(ServerHandler(), bind="tcp://*:*")

    # The server was bound to a wildcard address; fetch the real endpoint.
    endpoints = list(server.transport.bindings())
    server_addr = endpoints[0]

    client = await aiozmq.rpc.connect_rpc(connect=server_addr)

    result = await client.call.remote_func(1, 2)
    assert result == 3

    server.close()
    await server.wait_closed()
    client.close()
    await client.wait_closed()
def main():
    """Run the RPC example to completion, then report success."""
    example = go()
    asyncio.run(example)
    print("DONE")


if __name__ == "__main__":
    # Run the demo only when executed as a script.
    main()
Pipeline aka Notifier¶
import asyncio
import aiozmq.rpc
from itertools import count
class Handler(aiozmq.rpc.AttrHandler):
    """Pipeline handler that records when its first notification arrives."""

    def __init__(self):
        # Set to True by remote_func() once a notification has been handled.
        self.connected = False

    @aiozmq.rpc.method
    def remote_func(self, step, a: int, b: int):
        """Print the notification and mark the handler as connected."""
        print("HANDLER", step, a, b)
        self.connected = True
async def go():
    """Notify a pipeline listener repeatedly until its handler sees a message."""
    handler = Handler()
    listener = await aiozmq.rpc.serve_pipeline(handler, bind="tcp://*:*")

    # The listener was bound to a wildcard address; fetch the real endpoint.
    endpoints = list(listener.transport.bindings())
    listener_addr = endpoints[0]

    notifier = await aiozmq.rpc.connect_pipeline(connect=listener_addr)

    # Keep notifying, pausing briefly between attempts, until the handler
    # reports that one notification got through.
    for step in count(0):
        await notifier.notify.remote_func(step, 1, 2)
        if handler.connected:
            break
        await asyncio.sleep(0.01)

    listener.close()
    await listener.wait_closed()
    notifier.close()
    await notifier.wait_closed()
def main():
    """Run the pipeline example to completion, then report success."""
    example = go()
    asyncio.run(example)
    print("DONE")


if __name__ == "__main__":
    # Run the demo only when executed as a script.
    main()
Publish-Subscribe¶
import asyncio
import aiozmq.rpc
from itertools import count
class Handler(aiozmq.rpc.AttrHandler):
    """Subscriber handler that records when its first publication arrives."""

    def __init__(self):
        # Set to True by remote_func() once a publication has been handled.
        self.connected = False

    @aiozmq.rpc.method
    def remote_func(self, step, a: int, b: int):
        """Print the publication and mark the handler as connected."""
        print("HANDLER", step, a, b)
        self.connected = True
async def go():
    """Publish on ``"topic"`` repeatedly until the subscriber's handler sees a message."""
    handler = Handler()
    subscriber = await aiozmq.rpc.serve_pubsub(
        handler,
        subscribe="topic",
        bind="tcp://127.0.0.1:*",
        log_exceptions=True,
    )

    # The subscriber was bound to a wildcard port; fetch the real endpoint.
    endpoints = list(subscriber.transport.bindings())
    subscriber_addr = endpoints[0]
    print("SERVE", subscriber_addr)

    publisher = await aiozmq.rpc.connect_pubsub(connect=subscriber_addr)

    # Keep publishing, pausing between attempts, until the handler reports
    # that one publication got through.
    for step in count(0):
        await publisher.publish("topic").remote_func(step, 1, 2)
        if handler.connected:
            break
        await asyncio.sleep(0.1)

    subscriber.close()
    await subscriber.wait_closed()
    publisher.close()
    await publisher.wait_closed()
def main():
    """Run the publish-subscribe example to completion, then report success."""
    example = go()
    asyncio.run(example)
    print("DONE")


if __name__ == "__main__":
    # Run the demo only when executed as a script.
    main()
Translating RPC exceptions back to the client¶
import asyncio
import aiozmq.rpc
class CustomError(Exception):
    """Example exception carrying a single payload value in ``val``."""

    def __init__(self, val):
        # Pass the value to Exception so it also appears in ``args``.
        super().__init__(val)
        self.val = val
# Fully qualified "<module>.<class>" name used as the lookup key for CustomError.
exc_name = f"{CustomError.__module__}.{CustomError.__name__}"
# Passed to connect_rpc() so the client can map that name back to the class.
error_table = {exc_name: CustomError}
class ServerHandler(aiozmq.rpc.AttrHandler):
    """RPC handler whose only method always fails with ``CustomError``."""

    @aiozmq.rpc.method
    def remote(self, val):
        """Raise ``CustomError`` carrying ``val``."""
        error = CustomError(val)
        raise error
async def go():
    """Show a server-side ``CustomError`` being re-raised on the client.

    The client is created with ``error_table`` so the exception raised by
    ``ServerHandler.remote`` arrives as a ``CustomError`` instance.

    Raises:
        AssertionError: if the translated exception carries the wrong value.
    """
    server = await aiozmq.rpc.serve_rpc(ServerHandler(), bind="tcp://*:*")
    server_addr = list(server.transport.bindings())[0]
    client = await aiozmq.rpc.connect_rpc(connect=server_addr, error_table=error_table)

    try:
        await client.call.remote("value")
    except CustomError as exc:
        # Must be an assert: a bare comparison would be evaluated and
        # thrown away, so a wrong value would go unnoticed.
        assert exc.val == "value"

    # Wait for both sides to finish closing, as the other RPC examples do,
    # so nothing is still shutting down when the event loop stops.
    server.close()
    await server.wait_closed()
    client.close()
    await client.wait_closed()
def main():
    """Run the exception-translation example, then report success."""
    example = go()
    asyncio.run(example)
    print("DONE")


if __name__ == "__main__":
    # Run the demo only when executed as a script.
    main()
Translating instances of custom classes via RPC¶
import asyncio
import aiozmq.rpc
import msgpack
class Point:
    """A 2-D point that compares equal to any other Point with the same coordinates."""

    def __init__(self, x, y):
        self.x, self.y = x, y

    def __eq__(self, other):
        # Let Python try the other operand for anything that is not a Point.
        if not isinstance(other, Point):
            return NotImplemented
        return (self.x, self.y) == (other.x, other.y)
def _pack_point(value):
    """Encode a Point as a msgpack-packed ``(x, y)`` pair."""
    return msgpack.packb((value.x, value.y))


def _unpack_point(binary):
    """Rebuild a Point from a payload produced by ``_pack_point``."""
    return Point(*msgpack.unpackb(binary))


# Keyed by an integer code (0 here); each entry is (class, packer, unpacker).
translation_table = {0: (Point, _pack_point, _unpack_point)}
class ServerHandler(aiozmq.rpc.AttrHandler):
    """RPC handler that echoes its argument back to the caller."""

    @aiozmq.rpc.method
    def remote(self, val):
        """Return ``val`` unchanged."""
        echoed = val
        return echoed
async def go():
    """Send a ``Point`` through RPC and check that an equal ``Point`` comes back."""
    server = await aiozmq.rpc.serve_rpc(
        ServerHandler(),
        bind="tcp://*:*",
        translation_table=translation_table,
    )

    # The server was bound to a wildcard address; fetch the real endpoint.
    endpoints = list(server.transport.bindings())
    server_addr = endpoints[0]

    # Both sides get the same table so Point can be packed and unpacked.
    client = await aiozmq.rpc.connect_rpc(
        connect=server_addr,
        translation_table=translation_table,
    )

    echoed = await client.call.remote(Point(1, 2))
    assert echoed == Point(1, 2)

    server.close()
    await server.wait_closed()
    client.close()
    await client.wait_closed()
def main():
    """Run the custom-class translation example, then report success."""
    example = go()
    asyncio.run(example)
    print("DONE")


if __name__ == "__main__":
    # Run the demo only when executed as a script.
    main()
Validation of RPC methods¶
import asyncio
import aiozmq.rpc
class ServerHandler(aiozmq.rpc.AttrHandler):
    """RPC handler with one annotated method used to demonstrate call validation."""

    @aiozmq.rpc.method
    def remote_func(self, a: int, b: int) -> int:
        """Return ``a + b``."""
        total = a + b
        return total
async def go():
    """Make four invalid RPC calls and print the error each one produces."""
    server = await aiozmq.rpc.serve_rpc(ServerHandler(), bind="tcp://*:*")

    # The server was bound to a wildcard address; fetch the real endpoint.
    endpoints = list(server.transport.bindings())
    server_addr = endpoints[0]

    client = await aiozmq.rpc.connect_rpc(connect=server_addr)

    # Each probe: (message template, expected exception type, call factory).
    probes = (
        (
            "client.rpc.unknown_function(): {}",
            aiozmq.rpc.NotFoundError,
            lambda: client.call.unknown_function(),
        ),
        (
            "client.rpc.remote_func(bad_arg=1): {}",
            aiozmq.rpc.ParametersError,
            lambda: client.call.remote_func(bad_arg=1),
        ),
        (
            "client.rpc.remote_func(1): {}",
            aiozmq.rpc.ParametersError,
            lambda: client.call.remote_func(1),
        ),
        (
            "client.rpc.remote_func('a', 'b'): {}",
            aiozmq.rpc.ParametersError,
            lambda: client.call.remote_func("a", "b"),
        ),
    )
    for template, expected_error, make_call in probes:
        try:
            await make_call()
        except expected_error as exc:
            print(template.format(exc))

    server.close()
    await server.wait_closed()
    client.close()
    await client.wait_closed()
def main():
    """Run the validation example to completion, then report success."""
    example = go()
    asyncio.run(example)
    print("DONE")


if __name__ == "__main__":
    # Run the demo only when executed as a script.
    main()
RPC lookup in nested namespaces¶
import asyncio
import aiozmq.rpc
class Handler(aiozmq.rpc.AttrHandler):
    """Top-level RPC handler that also exposes a nested ``subhandler`` namespace."""

    def __init__(self, ident):
        self.ident = ident
        # The nested handler is created with this handler's ident.
        self.subhandler = SubHandler(ident, "subident")

    @aiozmq.rpc.method
    def a(self):
        """Return ``(ident, "a")``."""
        return self.ident, "a"
class SubHandler(aiozmq.rpc.AttrHandler):
    """Nested RPC handler identified by an ``ident`` and a ``subident``."""

    def __init__(self, ident, subident):
        self.ident, self.subident = ident, subident

    @aiozmq.rpc.method
    def b(self):
        """Return ``(ident, subident, "b")``."""
        return self.ident, self.subident, "b"
async def go():
    """Call one top-level RPC method and one method in the nested namespace."""
    server = await aiozmq.rpc.serve_rpc(Handler("ident"), bind="tcp://*:*")

    # The server was bound to a wildcard address; fetch the real endpoint.
    endpoints = list(server.transport.bindings())
    server_addr = endpoints[0]

    client = await aiozmq.rpc.connect_rpc(connect=server_addr)

    top_level = await client.call.a()
    assert top_level == ("ident", "a")

    nested = await client.call.subhandler.b()
    assert nested == ("ident", "subident", "b")

    server.close()
    await server.wait_closed()
    client.close()
    await client.wait_closed()
def main():
    """Run the nested-namespace example to completion, then report success."""
    example = go()
    asyncio.run(example)
    print("DONE")


if __name__ == "__main__":
    # Run the demo only when executed as a script.
    main()
Use dict as RPC lookup table¶
import asyncio
import aiozmq.rpc
@aiozmq.rpc.method
def a():
    """Return the constant string ``"a"``."""
    result = "a"
    return result
@aiozmq.rpc.method
def b():
    """Return the constant string ``"b"``."""
    result = "b"
    return result
# RPC lookup table: "a" is served at the top level, "b" under "subnamespace".
handlers_dict = {"a": a, "subnamespace": {"b": b}}
async def go():
    """Serve the dict-based lookup table and call both of its functions."""
    server = await aiozmq.rpc.serve_rpc(handlers_dict, bind="tcp://*:*")

    # The server was bound to a wildcard address; fetch the real endpoint.
    endpoints = list(server.transport.bindings())
    server_addr = endpoints[0]

    client = await aiozmq.rpc.connect_rpc(connect=server_addr)

    top_level = await client.call.a()
    assert top_level == "a"

    nested = await client.call.subnamespace.b()
    assert nested == "b"

    server.close()
    await server.wait_closed()
    client.close()
    await client.wait_closed()
def main():
    """Run the dict-lookup example to completion, then report success."""
    example = go()
    asyncio.run(example)
    print("DONE")


if __name__ == "__main__":
    # Run the demo only when executed as a script.
    main()
Use dynamic RPC lookup¶
import asyncio
import aiozmq.rpc
class DynamicHandler(aiozmq.rpc.AttrHandler):
    """RPC handler that creates a sub-handler for any name it does not define."""

    def __init__(self, namespace=()):
        # Tuple of the names looked up so far to reach this handler.
        self.namespace = namespace

    def __getitem__(self, key):
        """Return the attribute called ``key``, or a new handler one level deeper."""
        missing = object()
        found = getattr(self, key, missing)
        if found is missing:
            # Unknown name: treat it as a nested namespace.
            return DynamicHandler(self.namespace + (key,))
        return found

    @aiozmq.rpc.method
    def func(self):
        """Return ``(namespace, "val")`` for this handler."""
        return self.namespace, "val"
async def go():
    """Call ``func`` at three namespace depths and check the reported path."""
    server = await aiozmq.rpc.serve_rpc(DynamicHandler(), bind="tcp://*:*")

    # The server was bound to a wildcard address; fetch the real endpoint.
    endpoints = list(server.transport.bindings())
    server_addr = endpoints[0]

    client = await aiozmq.rpc.connect_rpc(connect=server_addr)

    ret = await client.call.func()
    assert ret == ((), "val"), ret

    ret = await client.call.a.func()
    assert ret == (("a",), "val"), ret

    ret = await client.call.a.b.func()
    assert ret == (("a", "b"), "val"), ret

    server.close()
    await server.wait_closed()
    client.close()
    await client.wait_closed()
def main():
    """Run the dynamic-lookup example to completion, then report success."""
    example = go()
    asyncio.run(example)
    print("DONE")


if __name__ == "__main__":
    # Run the demo only when executed as a script.
    main()
Socket event monitor¶
"""
This example demonstrates how to use the ZMQ socket monitor to receive
socket events.
The socket event monitor capability requires libzmq >= 4 and pyzmq >= 14.4.
"""
import asyncio
import aiozmq
import zmq
# Maps each zmq.EVENT_* constant to a readable label,
# e.g. the name "EVENT_CONNECT_DELAYED" becomes "connect delayed".
ZMQ_EVENTS = {
    getattr(zmq, name): name.replace("EVENT_", "").lower().replace("_", " ")
    for name in dir(zmq)
    if name.startswith("EVENT_")
}
def event_description(event):
    """Return the readable label for ``event``, or ``"unknown"`` if it has none."""
    try:
        return ZMQ_EVENTS[event]
    except KeyError:
        return "unknown"
class Protocol(aiozmq.ZmqProtocol):
    """Hello/World test protocol shared by the Router and Dealer sockets."""

    def __init__(self):
        # Number of b"World" replies seen so far.
        self.count = 0
        # Resolved in connection_made(), after the 4th reply, and in
        # connection_lost() respectively.
        self.wait_ready = asyncio.Future()
        self.wait_done = asyncio.Future()
        self.wait_closed = asyncio.Future()

    def connection_made(self, transport):
        """Store the transport and signal readiness."""
        self.transport = transport
        self.wait_ready.set_result(True)

    def connection_lost(self, exc):
        """Resolve ``wait_closed`` with the close reason."""
        self.wait_closed.set_result(exc)

    def msg_received(self, data):
        """Answer a two-frame b"Hello", or count any other message as a b"World" reply."""
        # Router sockets prefix messages with the sender's identity, so a
        # two-frame message is a request arriving on the Router side.
        # Anything else is a reply arriving on the Dealer side.
        if len(data) != 2:
            reply = data[0]
            assert reply == b"World"
            self.count += 1
            if self.count >= 4:
                self.wait_done.set_result(True)
            return

        identity, msg = data
        assert msg == b"Hello"
        self.transport.write([identity, b"World"])

    def event_received(self, event):
        """Print the fields of a socket monitor event with a readable description."""
        description = event_description(event.event)
        line = "event:{}, value:{}, endpoint:{}, description:{}".format(
            event.event, event.value, event.endpoint, description
        )
        print(line)
async def go():
    """Exchange four Hello/World messages while printing client socket events."""
    server_transport, server_proto = await aiozmq.create_zmq_connection(
        Protocol, zmq.ROUTER, bind="tcp://127.0.0.1:*"
    )
    await server_proto.wait_ready

    # The server was bound to a wildcard port; fetch the real endpoint.
    endpoints = list(server_transport.bindings())
    addr = endpoints[0]

    client_transport, client_proto = await aiozmq.create_zmq_connection(
        Protocol, zmq.DEALER, connect=addr
    )
    await client_proto.wait_ready

    # Turn on the socket monitor for the client socket. Its events are
    # delivered to the client protocol's 'event_received' method.
    await client_transport.enable_monitor()

    # Disconnect and reconnect to generate socket events, sending one
    # message per round. After 4 responses the client protocol resolves
    # its wait_done future.
    for _ in range(4):
        await asyncio.sleep(0.1)
        await client_transport.disconnect(addr)
        await asyncio.sleep(0.1)
        await client_transport.connect(addr)
        await asyncio.sleep(0.1)
        client_transport.write([b"Hello"])

    await client_proto.wait_done

    # The socket monitor can be explicitly disabled if necessary:
    # await client_transport.disable_monitor()

    # A monitor left enabled on a socket being closed is closed
    # automatically.
    client_transport.close()
    await client_proto.wait_closed
    server_transport.close()
    await server_proto.wait_closed
def main():
    """Run the socket-monitor example to completion, then report success."""
    example = go()
    asyncio.run(example)
    print("DONE")
if __name__ == "__main__":
    # Uncomment for verbose diagnostics:
    # import logging
    # logging.basicConfig(level=logging.DEBUG)

    # Refuse to run when either library is too old for the socket monitor.
    libzmq_too_old = zmq.zmq_version_info() < (4,)
    pyzmq_too_old = zmq.pyzmq_version_info() < (14, 4)
    if libzmq_too_old or pyzmq_too_old:
        message = (
            "Socket monitor requires libzmq >= 4 and pyzmq >= 14.4, "
            "have libzmq:{}, pyzmq:{}".format(zmq.zmq_version(), zmq.pyzmq_version())
        )
        raise NotImplementedError(message)

    main()
Stream socket event monitor¶
import asyncio
import aiozmq
import zmq
async def monitor_stream(stream):
    """Print every event read from ``stream`` until the stream is closed."""
    while True:
        try:
            event = await stream.read_event()
        except aiozmq.ZmqStreamClosed:
            # The stream was closed; there is nothing more to report.
            return
        print(event)
async def go():
    """Round-trip ten messages while a background task prints dealer socket events.

    The dealer's monitor is enabled before it connects, and
    ``monitor_stream`` runs as a background task for the whole exchange.
    """
    router = await aiozmq.create_zmq_stream(zmq.ROUTER, bind="tcp://127.0.0.1:*")
    addr = list(router.transport.bindings())[0]

    dealer = await aiozmq.create_zmq_stream(zmq.DEALER)
    await dealer.transport.enable_monitor()

    # Keep a reference to the task. The event loop holds tasks only weakly,
    # so an unreferenced task can be garbage-collected before it finishes.
    monitor_task = asyncio.ensure_future(monitor_stream(dealer))

    await dealer.transport.connect(addr)

    for i in range(10):
        msg = (b"data", b"ask", str(i).encode("utf-8"))
        dealer.write(msg)
        data = await router.read()
        router.write(data)
        answer = await dealer.read()
        print(answer)

    router.close()
    dealer.close()

    # Give the monitor a bounded chance to see the stream close and exit
    # by itself. asyncio.wait() does not raise on timeout, so a slow
    # shutdown cannot turn into a failure here.
    await asyncio.wait([monitor_task], timeout=1.0)
def main():
    """Run the stream-monitor example to completion, then report success."""
    example = go()
    asyncio.run(example)
    print("DONE")


if __name__ == "__main__":
    # Run the demo only when executed as a script.
    main()
Synchronous and asynchronous code working together¶
import asyncio
import aiozmq
import zmq
import argparse
import time
def main():
    """Parse the command line and run the selected example.

    Exactly one of ``--sync`` / ``--async`` must be given, together with
    at least one of ``--server`` / ``--client``. Invalid combinations end
    with an argparse usage error.

    Returns:
        Whatever the selected ``sync_main`` / ``async_main`` returns.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument(
        "--addr",
        default="tcp://127.0.0.1:7777",
        help="Address to use, default `%(default)s`",
    )
    # required=True: without a mode, options.mode would be None and calling
    # it would fail with an unhelpful TypeError.
    gr = ap.add_mutually_exclusive_group(required=True)
    gr.add_argument(
        "--sync",
        action="store_const",
        dest="mode",
        const=sync_main,
        default=None,
        help="Run synchronous example",
    )
    gr.add_argument(
        "--async",
        action="store_const",
        dest="mode",
        const=async_main,
        help="Run asynchronous example",
    )
    ap.add_argument(
        "--client", action="store_true", default=False, help="Run client part"
    )
    ap.add_argument(
        "--server", action="store_true", default=False, help="Run server part"
    )
    options = ap.parse_args()
    if not (options.server or options.client):
        # With neither role there is nothing to run, and the synchronous
        # variant would poll forever.
        ap.error("nothing to do: pass --server, --client, or both")
    return options.mode(options)
def read_data():
    """Prompt for a phrase and return its whitespace-separated words as UTF-8 bytes."""
    phrase = input("Enter some phrase: ")
    encoded = phrase.encode("utf-8")
    return encoded.split()
def sync_main(options):
    """Run the echo example with blocking pyzmq sockets.

    With ``options.server`` a ROUTER socket is bound to ``options.addr``
    and echoes every message it receives. With ``options.client`` a DEALER
    socket sends one phrase read from the user and the function returns
    once the echo arrives. A server without a client polls until
    interrupted.

    Args:
        options: parsed arguments providing ``addr``, ``server`` and ``client``.
    """
    print("Running sync at {!r}".format(options.addr))
    ctx = zmq.Context()
    srv_sock = cl_sock = None
    try:
        if options.server:
            srv_sock = ctx.socket(zmq.ROUTER)
            srv_sock.bind(options.addr)
        if options.client:
            cl_sock = ctx.socket(zmq.DEALER)
            cl_sock.connect(options.addr)
            data = read_data()
            cl_sock.send_multipart(data)
            print("Sync client write: {!r}".format(data))
        if srv_sock is None and cl_sock is None:
            # Nothing to poll; the loop below would spin forever.
            return
        while True:
            if srv_sock is not None:
                try:
                    data = srv_sock.recv_multipart(zmq.NOBLOCK)
                except zmq.Again:
                    # No message waiting yet; real ZMQ errors still propagate.
                    pass
                else:
                    print("Sync server read: {!r}".format(data))
                    srv_sock.send_multipart(data)
                    print("Sync server write: {!r}".format(data))
            if cl_sock is not None:
                try:
                    data = cl_sock.recv_multipart(zmq.NOBLOCK)
                except zmq.Again:
                    pass
                else:
                    print("Sync client read: {!r}".format(data))
                    return
            time.sleep(0.1)
    finally:
        # Close every socket and terminate the context on any exit path.
        # linger=0 keeps shutdown from blocking on unsent messages.
        ctx.destroy(linger=0)
def async_main(options):
    """Run the echo example with aiozmq streams on an asyncio event loop.

    With ``options.server`` a ROUTER stream is bound to ``options.addr``
    and echoes every message it reads. With ``options.client`` a DEALER
    stream sends one phrase read from the user and the run ends once the
    echo arrives. A server without a client runs until interrupted with
    Ctrl-C.

    Args:
        options: parsed arguments providing ``addr``, ``server`` and ``client``.
    """
    print("Running async at {!r}".format(options.addr))

    async def server():
        router = await aiozmq.create_zmq_stream(zmq.ROUTER, bind=options.addr)
        try:
            while True:
                data = await router.read()
                print("Async server read: {!r}".format(data))
                router.write(data)
                print("Async server write: {!r}".format(data))
        finally:
            # Runs when the task is cancelled, so the socket is always released.
            router.close()

    async def client():
        dealer = await aiozmq.create_zmq_stream(zmq.DEALER, connect=options.addr)
        try:
            # NOTE: read_data() calls input(), which blocks the event loop
            # while the user types; that is acceptable for this demo.
            data = read_data()
            dealer.write(data)
            print("Async client write: {!r}".format(data))
            echo = await dealer.read()
            print("Async client read: {!r}".format(echo))
        finally:
            dealer.close()

    async def run():
        # Tasks are created inside the running loop rather than against an
        # implicit global loop.
        server_task = asyncio.ensure_future(server()) if options.server else None
        try:
            if options.client:
                # The client's echo marks the end of the run.
                await client()
            elif server_task is not None:
                await server_task
        finally:
            if server_task is not None:
                # Stop the server and wait until it has closed its socket.
                server_task.cancel()
                try:
                    await server_task
                except asyncio.CancelledError:
                    pass

    if not (options.server or options.client):
        return
    try:
        asyncio.run(run())
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop a server-only run.
        pass
# Run the command-line entry point only when executed as a script.
if __name__ == "__main__":
    main()