Examples of aiozmq usage¶
Here is a list of examples from aiozmq/examples.
Every example is a small, complete, runnable Python program.
Simple DEALER-ROUTER pair implemented on Core level¶
import asyncio
import aiozmq
import zmq
class ZmqDealerProtocol(aiozmq.ZmqProtocol):
    """Protocol for the DEALER side of the example.

    Each received message is pushed onto ``queue``.  When the connection
    is lost, the ``on_close`` future is resolved with the ``exc`` argument
    handed to ``connection_lost``.
    """

    # Replaced by the real transport in connection_made().
    transport = None

    def __init__(self, queue, on_close):
        self.on_close = on_close
        self.queue = queue

    def connection_made(self, transport):
        """Remember the transport this protocol is attached to."""
        self.transport = transport

    def msg_received(self, msg):
        """Hand the incoming message over to the consumer queue."""
        inbox = self.queue
        inbox.put_nowait(msg)

    def connection_lost(self, exc):
        """Resolve the ``on_close`` future with ``exc``."""
        closed = self.on_close
        closed.set_result(exc)
class ZmqRouterProtocol(aiozmq.ZmqProtocol):
    """Protocol for the ROUTER side of the example: echo every message.

    Each received message is written straight back to the transport.
    When the connection is lost, the ``on_close`` future is resolved with
    the ``exc`` argument handed to ``connection_lost``.
    """

    # Replaced by the real transport in connection_made().
    transport = None

    def __init__(self, on_close):
        self.on_close = on_close

    def connection_made(self, transport):
        """Remember the transport this protocol is attached to."""
        self.transport = transport

    def msg_received(self, msg):
        """Send the received message back unchanged."""
        out = self.transport
        out.write(msg)

    def connection_lost(self, exc):
        """Resolve the ``on_close`` future with ``exc``."""
        closed = self.on_close
        closed.set_result(exc)
async def go():
    """Exchange ten messages between a DEALER and an echoing ROUTER.

    A ROUTER transport is bound to a random port on 127.0.0.1 using
    ``ZmqRouterProtocol``; a DEALER transport using ``ZmqDealerProtocol``
    connects to it.  Ten numbered messages are written to the dealer and
    each answer taken from the dealer's queue is printed.  Both
    transports are then closed and their ``connection_lost`` futures
    awaited.

    Written as a native coroutine: ``@asyncio.coroutine`` generator
    coroutines were removed in Python 3.11.
    """
    router_closed = asyncio.Future()
    dealer_closed = asyncio.Future()

    router, _ = await aiozmq.create_zmq_connection(
        lambda: ZmqRouterProtocol(router_closed),
        zmq.ROUTER,
        bind='tcp://127.0.0.1:*')

    # The bind address used a wildcard port, so ask the transport which
    # endpoint it ended up with.
    addr = next(iter(router.bindings()))

    queue = asyncio.Queue()
    dealer, _ = await aiozmq.create_zmq_connection(
        lambda: ZmqDealerProtocol(queue, dealer_closed),
        zmq.DEALER,
        connect=addr)

    for i in range(10):
        msg = (b'data', b'ask', str(i).encode('utf-8'))
        dealer.write(msg)
        answer = await queue.get()
        print(answer)

    dealer.close()
    await dealer_closed
    router.close()
    await router_closed
def main():
    """Run the example coroutine to completion and print ``DONE``."""
    # asyncio.run() creates and closes its own event loop.  Relying on
    # asyncio.get_event_loop() to create one implicitly is deprecated and
    # raises RuntimeError on Python 3.14.
    asyncio.run(go())
    print("DONE")


if __name__ == '__main__':
    main()
DEALER-ROUTER pair implemented with streams¶
import asyncio
import aiozmq
import zmq
async def go():
    """Echo ten messages through a ROUTER/DEALER pair of ZMQ streams.

    The router stream binds to a random port on 127.0.0.1 and the dealer
    stream connects to it.  For each of ten numbered messages the dealer
    writes, the router reads and writes the same frames back, and the
    dealer's answer is printed.  Both streams are closed at the end.

    Written as a native coroutine: ``@asyncio.coroutine`` generator
    coroutines were removed in Python 3.11.
    """
    router = await aiozmq.create_zmq_stream(
        zmq.ROUTER,
        bind='tcp://127.0.0.1:*')

    # Wildcard port: fetch the concrete endpoint from the transport.
    addr = next(iter(router.transport.bindings()))

    dealer = await aiozmq.create_zmq_stream(
        zmq.DEALER,
        connect=addr)

    for i in range(10):
        msg = (b'data', b'ask', str(i).encode('utf-8'))
        dealer.write(msg)
        data = await router.read()
        router.write(data)
        answer = await dealer.read()
        print(answer)

    dealer.close()
    router.close()
def main():
    """Run the example coroutine to completion and print ``DONE``."""
    # asyncio.run() creates and closes its own event loop.  Relying on
    # asyncio.get_event_loop() to create one implicitly is deprecated and
    # raises RuntimeError on Python 3.14.
    asyncio.run(go())
    print("DONE")


if __name__ == '__main__':
    main()
Remote Procedure Call¶
import asyncio
import aiozmq.rpc
class ServerHandler(aiozmq.rpc.AttrHandler):
    """RPC handler exposing a single method that adds two integers."""

    @aiozmq.rpc.method
    def remote_func(self, a: int, b: int) -> int:
        """Return the sum of ``a`` and ``b``."""
        total = a + b
        return total
async def go():
    """Start an RPC server, call ``remote_func(1, 2)`` and check the result.

    The server is bound to a random port on the loopback interface and
    the client connects to the endpoint the server reports.  Both sides
    are closed and waited on before returning.

    Changes from the original example:

    * native coroutine (``@asyncio.coroutine`` was removed in Python 3.11);
    * binds to ``127.0.0.1`` instead of ``*``: the wildcard makes
      ``bindings()`` report a ``0.0.0.0`` endpoint, which is not a portable
      address to *connect* to, and exposes the example port on every
      interface.
    """
    server = await aiozmq.rpc.serve_rpc(
        ServerHandler(), bind='tcp://127.0.0.1:*')
    server_addr = next(iter(server.transport.bindings()))

    client = await aiozmq.rpc.connect_rpc(
        connect=server_addr)

    ret = await client.call.remote_func(1, 2)
    assert 3 == ret

    server.close()
    await server.wait_closed()
    client.close()
    await client.wait_closed()
def main():
    """Run the example coroutine to completion and print ``DONE``."""
    # asyncio.run() creates and closes its own event loop.  Relying on
    # asyncio.get_event_loop() to create one implicitly is deprecated and
    # raises RuntimeError on Python 3.14.
    asyncio.run(go())
    print("DONE")


if __name__ == '__main__':
    main()
Pipeline aka Notifier¶
import asyncio
import aiozmq.rpc
from itertools import count
class Handler(aiozmq.rpc.AttrHandler):
    """Pipeline handler that records when its first notification arrives."""

    def __init__(self):
        # Flipped to True by remote_func() on the first delivered call.
        self.connected = False

    @aiozmq.rpc.method
    def remote_func(self, step, a: int, b: int):
        """Print the received arguments and mark the handler as connected."""
        print("HANDLER", step, a, b)
        self.connected = True
async def go():
    """Send pipeline notifications until the listener has received one.

    A pipeline listener is served on a random loopback port and a
    notifier connects to it.  ``remote_func`` is notified repeatedly,
    with a 10 ms pause between attempts, until the handler's
    ``connected`` flag shows that a notification arrived.  Both sides are
    then closed and waited on.

    Changes from the original example:

    * native coroutine (``@asyncio.coroutine`` was removed in Python 3.11);
    * binds to ``127.0.0.1`` instead of ``*`` so the endpoint reported by
      ``bindings()`` is a portable address to connect to.
    """
    handler = Handler()
    listener = await aiozmq.rpc.serve_pipeline(
        handler, bind='tcp://127.0.0.1:*')
    listener_addr = next(iter(listener.transport.bindings()))

    notifier = await aiozmq.rpc.connect_pipeline(
        connect=listener_addr)

    for step in count(0):
        await notifier.notify.remote_func(step, 1, 2)
        if handler.connected:
            break
        # Nothing delivered yet: give the connection time to come up
        # before notifying again.
        await asyncio.sleep(0.01)

    listener.close()
    await listener.wait_closed()
    notifier.close()
    await notifier.wait_closed()
def main():
    """Run the example coroutine to completion and print ``DONE``."""
    # asyncio.run() creates and closes its own event loop.  Relying on
    # asyncio.get_event_loop() to create one implicitly is deprecated and
    # raises RuntimeError on Python 3.14.
    asyncio.run(go())
    print("DONE")


if __name__ == '__main__':
    main()
Publish-Subscribe¶
import asyncio
import aiozmq.rpc
from itertools import count
class Handler(aiozmq.rpc.AttrHandler):
    """Subscriber handler that records when its first message arrives."""

    def __init__(self):
        # Flipped to True by remote_func() on the first delivered call.
        self.connected = False

    @aiozmq.rpc.method
    def remote_func(self, step, a: int, b: int):
        """Print the received arguments and mark the handler as connected."""
        print("HANDLER", step, a, b)
        self.connected = True
async def go():
    """Publish on ``'topic'`` until the subscriber has received a message.

    A pub-sub subscriber for ``'topic'`` is served on a random loopback
    port and a publisher connects to it.  ``remote_func`` is published
    repeatedly, with a 100 ms pause between attempts, until the handler's
    ``connected`` flag shows that a message arrived.  Both sides are then
    closed and waited on.

    Written as a native coroutine: ``@asyncio.coroutine`` generator
    coroutines were removed in Python 3.11.
    """
    handler = Handler()
    subscriber = await aiozmq.rpc.serve_pubsub(
        handler, subscribe='topic', bind='tcp://127.0.0.1:*',
        log_exceptions=True)
    subscriber_addr = next(iter(subscriber.transport.bindings()))
    print("SERVE", subscriber_addr)

    publisher = await aiozmq.rpc.connect_pubsub(
        connect=subscriber_addr)

    for step in count(0):
        await publisher.publish('topic').remote_func(step, 1, 2)
        if handler.connected:
            break
        # Nothing delivered yet: wait a little before publishing again.
        await asyncio.sleep(0.1)

    subscriber.close()
    await subscriber.wait_closed()
    publisher.close()
    await publisher.wait_closed()
def main():
    """Run the example coroutine to completion and print ``DONE``."""
    # asyncio.run() creates and closes its own event loop.  Relying on
    # asyncio.get_event_loop() to create one implicitly is deprecated and
    # raises RuntimeError on Python 3.14.
    asyncio.run(go())
    print("DONE")


if __name__ == '__main__':
    main()
Translating RPC exceptions back to the client¶
import asyncio
import aiozmq.rpc
class CustomError(Exception):
    """Application error carrying a single value.

    The value is stored both as ``val`` and as the only entry of the
    standard exception ``args``.
    """

    def __init__(self, val):
        super().__init__(val)
        self.val = val
# Fully qualified "module.ClassName" of the exception, used as the key
# under which the client looks up the class to raise.
exc_name = '{}.{}'.format(CustomError.__module__, CustomError.__name__)

error_table = {exc_name: CustomError}
class ServerHandler(aiozmq.rpc.AttrHandler):
    """RPC handler whose only method always fails with ``CustomError``."""

    @aiozmq.rpc.method
    def remote(self, val):
        """Raise ``CustomError`` carrying ``val``."""
        error = CustomError(val)
        raise error
async def go():
    """Check that a server-side ``CustomError`` is re-raised on the client.

    The client is created with ``error_table`` and calls ``remote``,
    which always raises ``CustomError`` on the server.  The example
    verifies that the client sees a ``CustomError`` whose ``val`` is the
    value that was sent.

    Changes from the original example:

    * ``exc.val == 'value'`` was a bare comparison whose result was thrown
      away; it is now an ``assert``, and a call that raises nothing is
      reported as a failure too;
    * both endpoints are closed in ``finally`` and waited on, matching the
      other RPC examples;
    * native coroutine (``@asyncio.coroutine`` was removed in Python 3.11);
    * binds to ``127.0.0.1`` instead of ``*`` so the endpoint reported by
      ``bindings()`` is a portable address to connect to.
    """
    server = await aiozmq.rpc.serve_rpc(
        ServerHandler(), bind='tcp://127.0.0.1:*')
    server_addr = next(iter(server.transport.bindings()))

    client = await aiozmq.rpc.connect_rpc(
        connect=server_addr,
        error_table=error_table)

    try:
        try:
            await client.call.remote('value')
        except CustomError as exc:
            assert exc.val == 'value'
        else:
            raise AssertionError('CustomError was not raised')
    finally:
        server.close()
        await server.wait_closed()
        client.close()
        await client.wait_closed()
def main():
    """Run the example coroutine to completion and print ``DONE``."""
    # asyncio.run() creates and closes its own event loop.  Relying on
    # asyncio.get_event_loop() to create one implicitly is deprecated and
    # raises RuntimeError on Python 3.14.
    asyncio.run(go())
    print("DONE")


if __name__ == '__main__':
    main()
Translating instances of custom classes via RPC¶
import asyncio
import aiozmq.rpc
import msgpack
class Point:
    """A 2-D point that compares by value.

    Two points are equal when their ``x`` and ``y`` are equal; comparing
    with anything that is not a ``Point`` returns ``NotImplemented``.
    Defining ``__eq__`` leaves the class unhashable, as in the original.
    """

    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __eq__(self, other):
        if isinstance(other, Point):
            return (self.x, self.y) == (other.x, other.y)
        return NotImplemented

    def __repr__(self):
        # Without this, a failing ``assert ret == Point(1, 2)`` only shows
        # opaque ``<Point object at 0x...>`` addresses.
        return '{}({!r}, {!r})'.format(type(self).__name__, self.x, self.y)
def _pack_point(value):
    """Serialize a ``Point`` as a msgpack-encoded ``(x, y)`` pair."""
    return msgpack.packb((value.x, value.y))


def _unpack_point(binary):
    """Rebuild a ``Point`` from the bytes produced by ``_pack_point``."""
    return Point(*msgpack.unpackb(binary))


# Maps a numeric code to (class, packer, unpacker).
translation_table = {
    0: (Point, _pack_point, _unpack_point),
}
class ServerHandler(aiozmq.rpc.AttrHandler):
    """RPC handler that returns its argument unchanged."""

    @aiozmq.rpc.method
    def remote(self, val):
        """Return ``val`` as received."""
        result = val
        return result
async def go():
    """Round-trip a ``Point`` through RPC using a translation table.

    Server and client are both created with ``translation_table``.  The
    client sends ``Point(1, 2)`` to ``remote``, which returns its
    argument, and the example asserts that an equal ``Point`` comes back.
    Both sides are closed and waited on before returning.

    Changes from the original example:

    * native coroutine (``@asyncio.coroutine`` was removed in Python 3.11);
    * binds to ``127.0.0.1`` instead of ``*`` so the endpoint reported by
      ``bindings()`` is a portable address to connect to.
    """
    server = await aiozmq.rpc.serve_rpc(
        ServerHandler(), bind='tcp://127.0.0.1:*',
        translation_table=translation_table)
    server_addr = next(iter(server.transport.bindings()))

    client = await aiozmq.rpc.connect_rpc(
        connect=server_addr,
        translation_table=translation_table)

    ret = await client.call.remote(Point(1, 2))
    assert ret == Point(1, 2)

    server.close()
    await server.wait_closed()
    client.close()
    await client.wait_closed()
def main():
    """Run the example coroutine to completion and print ``DONE``."""
    # asyncio.run() creates and closes its own event loop.  Relying on
    # asyncio.get_event_loop() to create one implicitly is deprecated and
    # raises RuntimeError on Python 3.14.
    asyncio.run(go())
    print("DONE")


if __name__ == '__main__':
    main()
Validation of RPC methods¶
import asyncio
import aiozmq.rpc
class ServerHandler(aiozmq.rpc.AttrHandler):
    """RPC handler exposing one annotated method that adds two integers."""

    @aiozmq.rpc.method
    def remote_func(self, a: int, b: int) -> int:
        """Return the sum of ``a`` and ``b``."""
        total = a + b
        return total
async def go():
    """Show the errors raised for invalid RPC calls.

    Four bad calls are made against ``ServerHandler`` and the resulting
    error is printed for each:

    * an unknown method name (``NotFoundError``);
    * an unexpected keyword argument (``ParametersError``);
    * a missing positional argument (``ParametersError``);
    * arguments that do not satisfy the ``int`` annotations
      (``ParametersError``).

    Changes from the original example:

    * the printed labels said ``client.rpc.…`` although the calls go
      through ``client.call.…``; the labels now match the code;
    * native coroutine (``@asyncio.coroutine`` was removed in Python 3.11);
    * binds to ``127.0.0.1`` instead of ``*`` so the endpoint reported by
      ``bindings()`` is a portable address to connect to.
    """
    server = await aiozmq.rpc.serve_rpc(
        ServerHandler(), bind='tcp://127.0.0.1:*')
    server_addr = next(iter(server.transport.bindings()))

    client = await aiozmq.rpc.connect_rpc(
        connect=server_addr)

    try:
        await client.call.unknown_function()
    except aiozmq.rpc.NotFoundError as exc:
        print("client.call.unknown_function(): {}".format(exc))

    try:
        await client.call.remote_func(bad_arg=1)
    except aiozmq.rpc.ParametersError as exc:
        print("client.call.remote_func(bad_arg=1): {}".format(exc))

    try:
        await client.call.remote_func(1)
    except aiozmq.rpc.ParametersError as exc:
        print("client.call.remote_func(1): {}".format(exc))

    try:
        await client.call.remote_func('a', 'b')
    except aiozmq.rpc.ParametersError as exc:
        print("client.call.remote_func('a', 'b'): {}".format(exc))

    server.close()
    await server.wait_closed()
    client.close()
    await client.wait_closed()
def main():
    """Run the example coroutine to completion and print ``DONE``."""
    # asyncio.run() creates and closes its own event loop.  Relying on
    # asyncio.get_event_loop() to create one implicitly is deprecated and
    # raises RuntimeError on Python 3.14.
    asyncio.run(go())
    print("DONE")


if __name__ == '__main__':
    main()
RPC lookup in nested namespaces¶
import asyncio
import aiozmq.rpc
class Handler(aiozmq.rpc.AttrHandler):
    """Top-level RPC handler that owns a nested ``SubHandler``.

    The nested handler is stored as the ``subhandler`` attribute, which
    is the name clients use to reach it (``client.call.subhandler.b()``).
    """

    def __init__(self, ident):
        self.ident = ident
        nested = SubHandler(self.ident, 'subident')
        self.subhandler = nested

    @aiozmq.rpc.method
    def a(self):
        """Return ``(ident, 'a')``."""
        result = (self.ident, 'a')
        return result
class SubHandler(aiozmq.rpc.AttrHandler):
    """Nested RPC handler identified by its parent's ident and its own."""

    def __init__(self, ident, subident):
        self.subident = subident
        self.ident = ident

    @aiozmq.rpc.method
    def b(self):
        """Return ``(ident, subident, 'b')``."""
        result = (self.ident, self.subident, 'b')
        return result
async def go():
    """Call a top-level RPC method and one in a nested handler.

    ``client.call.a()`` is served by ``Handler`` and
    ``client.call.subhandler.b()`` by the ``SubHandler`` stored on it;
    both return values are asserted.  Server and client are closed and
    waited on before returning.

    Changes from the original example:

    * native coroutine (``@asyncio.coroutine`` was removed in Python 3.11);
    * binds to ``127.0.0.1`` instead of ``*`` so the endpoint reported by
      ``bindings()`` is a portable address to connect to.
    """
    server = await aiozmq.rpc.serve_rpc(
        Handler('ident'), bind='tcp://127.0.0.1:*')
    server_addr = next(iter(server.transport.bindings()))

    client = await aiozmq.rpc.connect_rpc(
        connect=server_addr)

    ret = await client.call.a()
    assert ('ident', 'a') == ret

    ret = await client.call.subhandler.b()
    assert ('ident', 'subident', 'b') == ret

    server.close()
    await server.wait_closed()
    client.close()
    await client.wait_closed()
def main():
    """Run the example coroutine to completion and print ``DONE``."""
    # asyncio.run() creates and closes its own event loop.  Relying on
    # asyncio.get_event_loop() to create one implicitly is deprecated and
    # raises RuntimeError on Python 3.14.
    asyncio.run(go())
    print("DONE")


if __name__ == '__main__':
    main()
Use dict as RPC lookup table¶
import asyncio
import aiozmq.rpc
@aiozmq.rpc.method
def a():
    """RPC method registered under the top-level key ``'a'``."""
    result = 'a'
    return result
@aiozmq.rpc.method
def b():
    """RPC method registered under ``'subnamespace'`` as ``'b'``."""
    result = 'b'
    return result
# Lookup table passed to serve_rpc(): keys are the names clients call,
# and a nested dict acts as a sub-namespace.
handlers_dict = {
    'a': a,
    'subnamespace': {
        'b': b,
    },
}
async def go():
    """Serve RPC methods from a plain dict and call them.

    ``handlers_dict`` is passed to ``serve_rpc`` in place of a handler
    object.  The client calls ``a()`` and ``subnamespace.b()`` and
    asserts both results.  Server and client are closed and waited on
    before returning.

    Changes from the original example:

    * native coroutine (``@asyncio.coroutine`` was removed in Python 3.11);
    * binds to ``127.0.0.1`` instead of ``*`` so the endpoint reported by
      ``bindings()`` is a portable address to connect to.
    """
    server = await aiozmq.rpc.serve_rpc(
        handlers_dict, bind='tcp://127.0.0.1:*')
    server_addr = next(iter(server.transport.bindings()))

    client = await aiozmq.rpc.connect_rpc(
        connect=server_addr)

    ret = await client.call.a()
    assert 'a' == ret

    ret = await client.call.subnamespace.b()
    assert 'b' == ret

    server.close()
    await server.wait_closed()
    client.close()
    await client.wait_closed()
def main():
    """Run the example coroutine to completion and print ``DONE``."""
    # asyncio.run() creates and closes its own event loop.  Relying on
    # asyncio.get_event_loop() to create one implicitly is deprecated and
    # raises RuntimeError on Python 3.14.
    asyncio.run(go())
    print("DONE")


if __name__ == '__main__':
    main()
Use dynamic RPC lookup¶
import asyncio
import aiozmq.rpc
class DynamicHandler(aiozmq.rpc.AttrHandler):
    """RPC handler that invents sub-namespaces on demand.

    Looking up a key that is not an attribute of the handler returns a
    new ``DynamicHandler`` whose ``namespace`` is extended by that key,
    so ``func`` is reachable under any path (``a.b.func`` etc.).
    """

    def __init__(self, namespace=()):
        self.namespace = namespace

    def __getitem__(self, key):
        """Return the attribute named ``key`` or a handler one level deeper."""
        missing = object()
        found = getattr(self, key, missing)
        if found is not missing:
            return found
        deeper = self.namespace + (key,)
        return DynamicHandler(deeper)

    @aiozmq.rpc.method
    def func(self):
        """Return ``(namespace, 'val')`` for the handler that was reached."""
        result = (self.namespace, 'val')
        return result
async def go():
    """Call ``func`` at three namespace depths of a ``DynamicHandler``.

    ``func`` is called as ``func()``, ``a.func()`` and ``a.b.func()``;
    each call is expected to return the namespace path it was reached
    through together with ``'val'``.  Server and client are closed and
    waited on before returning.

    Changes from the original example:

    * native coroutine (``@asyncio.coroutine`` was removed in Python 3.11);
    * binds to ``127.0.0.1`` instead of ``*`` so the endpoint reported by
      ``bindings()`` is a portable address to connect to.
    """
    server = await aiozmq.rpc.serve_rpc(
        DynamicHandler(), bind='tcp://127.0.0.1:*')
    server_addr = next(iter(server.transport.bindings()))

    client = await aiozmq.rpc.connect_rpc(
        connect=server_addr)

    ret = await client.call.func()
    assert ((), 'val') == ret, ret

    ret = await client.call.a.func()
    assert (('a',), 'val') == ret, ret

    ret = await client.call.a.b.func()
    assert (('a', 'b'), 'val') == ret, ret

    server.close()
    await server.wait_closed()
    client.close()
    await client.wait_closed()
def main():
    """Run the example coroutine to completion and print ``DONE``."""
    # asyncio.run() creates and closes its own event loop.  Relying on
    # asyncio.get_event_loop() to create one implicitly is deprecated and
    # raises RuntimeError on Python 3.14.
    asyncio.run(go())
    print("DONE")


if __name__ == '__main__':
    main()
Socket event monitor¶
'''
This example demonstrates how to use the ZMQ socket monitor to receive
socket events.
The socket event monitor capability requires libzmq >= 4 and pyzmq >= 14.4.
'''
import asyncio
import aiozmq
import zmq
# Maps each ``zmq.EVENT_*`` constant to a lower-case, space-separated
# description derived from its name (e.g. EVENT_CONNECT_DELAYED ->
# "connect delayed").
ZMQ_EVENTS = {
    getattr(zmq, attr): attr.replace('EVENT_', '').lower().replace('_', ' ')
    for attr in dir(zmq)
    if attr.startswith('EVENT_')
}
def event_description(event):
    """Return the human-readable description of ``event``.

    Events that are not present in ``ZMQ_EVENTS`` are described as
    ``'unknown'``.
    """
    try:
        return ZMQ_EVENTS[event]
    except KeyError:
        return 'unknown'
class Protocol(aiozmq.ZmqProtocol):
    """Protocol shared by the ROUTER and DEALER sockets of this example.

    Futures exposed to the driver coroutine:

    * ``wait_ready``  - resolved with ``True`` in ``connection_made``;
    * ``wait_done``   - resolved with ``True`` once four ``b'World'``
      replies have been counted;
    * ``wait_closed`` - resolved with ``exc`` in ``connection_lost``.

    Socket monitor events delivered to ``event_received`` are printed.
    """

    # Defined up front so the attribute exists before connection_made().
    transport = None

    def __init__(self):
        self.wait_ready = asyncio.Future()
        self.wait_done = asyncio.Future()
        self.wait_closed = asyncio.Future()
        self.count = 0

    def connection_made(self, transport):
        self.transport = transport
        self.wait_ready.set_result(True)

    def connection_lost(self, exc):
        self.wait_closed.set_result(exc)

    def msg_received(self, data):
        # This protocol is used by both the Router and Dealer sockets in
        # this example. Router sockets prefix messages with the identity
        # of the sender and hence contain two frames in this simple test
        # protocol.
        if len(data) == 2:
            identity, msg = data
            assert msg == b'Hello'
            self.transport.write([identity, b'World'])
        else:
            msg = data[0]
            assert msg == b'World'
            self.count += 1
            # The ``>=`` test stays true for any further reply; resolving
            # an already-resolved future raises InvalidStateError, so only
            # set the result once.
            if self.count >= 4 and not self.wait_done.done():
                self.wait_done.set_result(True)

    def event_received(self, event):
        """Print the monitor event together with its description."""
        print(
            'event:{}, value:{}, endpoint:{}, description:{}'.format(
                event.event, event.value, event.endpoint,
                event_description(event.event)))
async def go():
    """Generate socket events on a monitored DEALER socket.

    A ROUTER is bound to a random loopback port and a DEALER connects to
    it, both using ``Protocol``.  With the monitor enabled on the client
    transport, the client is disconnected, reconnected and made to send
    ``b'Hello'`` four times.  Once the client protocol signals
    ``wait_done``, both transports are closed and their ``wait_closed``
    futures awaited.

    Written as a native coroutine: ``@asyncio.coroutine`` generator
    coroutines were removed in Python 3.11.
    """
    st, sp = await aiozmq.create_zmq_connection(
        Protocol, zmq.ROUTER, bind='tcp://127.0.0.1:*')
    await sp.wait_ready
    addr = next(iter(st.bindings()))

    ct, cp = await aiozmq.create_zmq_connection(
        Protocol, zmq.DEALER, connect=addr)
    await cp.wait_ready

    # Enable the socket monitor on the client socket. Socket events
    # are passed to the 'event_received' method on the client protocol.
    await ct.enable_monitor()

    # Trigger some socket events while also sending a message to the
    # server. When the client protocol has received 4 responses it will
    # fire the wait_done future.
    for i in range(4):
        await asyncio.sleep(0.1)
        await ct.disconnect(addr)
        await asyncio.sleep(0.1)
        await ct.connect(addr)
        await asyncio.sleep(0.1)
        ct.write([b'Hello'])

    await cp.wait_done

    # The socket monitor can be explicitly disabled if necessary.
    # await ct.disable_monitor()

    # If a socket monitor is left enabled on a socket being closed,
    # the socket monitor will be closed automatically.
    ct.close()
    await cp.wait_closed

    st.close()
    await sp.wait_closed
def main():
    """Run the example coroutine to completion and print ``DONE``."""
    # asyncio.run() creates and closes its own event loop.  Relying on
    # asyncio.get_event_loop() to create one implicitly is deprecated and
    # raises RuntimeError on Python 3.14.
    asyncio.run(go())
    print("DONE")


if __name__ == '__main__':
    # import logging
    # logging.basicConfig(level=logging.DEBUG)

    # Refuse to start when the installed libzmq/pyzmq are older than the
    # versions the socket monitor needs.
    if (zmq.zmq_version_info() < (4,) or
            zmq.pyzmq_version_info() < (14, 4,)):
        raise NotImplementedError(
            "Socket monitor requires libzmq >= 4 and pyzmq >= 14.4, "
            "have libzmq:{}, pyzmq:{}".format(
                zmq.zmq_version(), zmq.pyzmq_version()))

    main()
Stream socket event monitor¶
import asyncio
import aiozmq
import zmq
async def monitor_stream(stream):
    """Print every monitor event read from ``stream`` until it is closed.

    The loop ends quietly when ``read_event`` raises
    ``aiozmq.ZmqStreamClosed``.

    Written as a native coroutine: ``@asyncio.coroutine`` generator
    coroutines were removed in Python 3.11.
    """
    try:
        while True:
            event = await stream.read_event()
            print(event)
    except aiozmq.ZmqStreamClosed:
        pass
async def go():
    """Echo ten messages over streams while printing dealer socket events.

    The dealer stream is created unconnected so that its monitor can be
    enabled, and a ``monitor_stream`` task started, before it connects to
    the router.  Ten numbered messages are then echoed through the
    router and each answer printed.

    Changes from the original example:

    * the monitor task was created with a bare ``asyncio.Task(...)`` whose
      result was discarded, so nothing kept it alive and it was never
      awaited; it is now created with ``asyncio.ensure_future``, kept in
      a variable, and cancelled and awaited once the streams are closed;
    * native coroutine (``@asyncio.coroutine`` was removed in Python 3.11).
    """
    router = await aiozmq.create_zmq_stream(
        zmq.ROUTER,
        bind='tcp://127.0.0.1:*')
    addr = next(iter(router.transport.bindings()))

    dealer = await aiozmq.create_zmq_stream(
        zmq.DEALER)

    # Enable monitoring before connecting so the connection events
    # themselves are reported.
    await dealer.transport.enable_monitor()
    monitor_task = asyncio.ensure_future(monitor_stream(dealer))

    await dealer.transport.connect(addr)

    for i in range(10):
        msg = (b'data', b'ask', str(i).encode('utf-8'))
        dealer.write(msg)
        data = await router.read()
        router.write(data)
        answer = await dealer.read()
        print(answer)

    router.close()
    dealer.close()

    # Do not leave the monitor task pending when the loop shuts down.
    # return_exceptions=True absorbs the resulting CancelledError.
    monitor_task.cancel()
    await asyncio.gather(monitor_task, return_exceptions=True)
def main():
    """Run the example coroutine to completion and print ``DONE``."""
    # asyncio.run() creates and closes its own event loop.  Relying on
    # asyncio.get_event_loop() to create one implicitly is deprecated and
    # raises RuntimeError on Python 3.14.
    asyncio.run(go())
    print("DONE")


if __name__ == '__main__':
    main()
Synchronous and asynchronous code working together¶
import asyncio
import aiozmq
import zmq
import argparse
import time
def main():
    """Parse the command line and run the selected example.

    Exactly one of ``--sync`` / ``--async`` selects the implementation
    (``sync_main`` or ``async_main``), and ``--client`` / ``--server``
    select which roles to run.  Returns whatever the selected
    implementation returns.

    Changes from the original example:

    * the mode group is now required: without ``--sync`` or ``--async``
      ``options.mode`` was ``None`` and calling it raised ``TypeError``;
    * running with neither ``--client`` nor ``--server`` is rejected with
      a usage error instead of starting a run that has nothing to do.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument('--addr', default='tcp://127.0.0.1:7777',
                    help="Address to use, default `%(default)s`")

    gr = ap.add_mutually_exclusive_group(required=True)
    gr.add_argument('--sync', action='store_const', dest='mode',
                    const=sync_main, default=None,
                    help="Run synchronous example")
    gr.add_argument('--async', action='store_const', dest='mode',
                    const=async_main,
                    help="Run asynchronous example")

    ap.add_argument('--client', action='store_true', default=False,
                    help="Run client part")
    ap.add_argument('--server', action='store_true', default=False,
                    help="Run server part")

    options = ap.parse_args()
    if not (options.client or options.server):
        ap.error("nothing to run: pass --client and/or --server")
    return options.mode(options)
def read_data():
    """Prompt for a phrase and return its words as UTF-8 byte strings.

    The phrase is split on whitespace and each word becomes one frame of
    the multipart message sent by the client.  A blank line would give
    an empty frame list, which cannot be sent as a message, so the
    prompt is repeated until at least one word is entered.

    Raises:
        EOFError: if standard input is exhausted (raised by ``input``).
    """
    while True:
        frames = input("Enter some phrase: ").encode('utf-8').split()
        if frames:
            return frames
def sync_main(options):
    """Run the echo example with blocking pyzmq sockets.

    With ``options.server`` a ROUTER socket is bound to ``options.addr``
    and echoes every message it receives.  With ``options.client`` a
    DEALER socket connects to the same address, sends the phrase read
    from the user and returns as soon as the echo arrives.  Both sockets
    are polled without blocking, with a 0.1 s sleep between rounds.

    Changes from the original example:

    * sockets and context are released in ``finally``; before, they were
      never closed;
    * only ``zmq.Again`` ("no message yet") is ignored while polling; the
      original ``except zmq.ZMQError: pass`` also hid real errors;
    * the loop ends at once when neither role was requested instead of
      sleeping forever.
    """
    print("Running sync at {!r}".format(options.addr))

    ctx = zmq.Context()
    srv_sock = cl_sock = None
    try:
        if options.server:
            srv_sock = ctx.socket(zmq.ROUTER)
            srv_sock.bind(options.addr)
        if options.client:
            cl_sock = ctx.socket(zmq.DEALER)
            cl_sock.connect(options.addr)
            data = read_data()
            cl_sock.send_multipart(data)
            print("Sync client write: {!r}".format(data))

        while srv_sock is not None or cl_sock is not None:
            if srv_sock is not None:
                try:
                    data = srv_sock.recv_multipart(zmq.NOBLOCK)
                except zmq.Again:
                    pass
                else:
                    print("Sync server read: {!r}".format(data))
                    srv_sock.send_multipart(data)
                    print("Sync server write: {!r}".format(data))
            if cl_sock is not None:
                try:
                    data = cl_sock.recv_multipart(zmq.NOBLOCK)
                except zmq.Again:
                    pass
                else:
                    print("Sync client read: {!r}".format(data))
                    return
            time.sleep(.1)
    finally:
        # Closes every socket created from this context, then terminates
        # it; linger=0 keeps shutdown from blocking on unsent messages.
        ctx.destroy(linger=0)
def async_main(options):
    """Run the echo example on asyncio with aiozmq streams.

    With ``options.server`` a ROUTER stream bound to ``options.addr``
    echoes every message it reads.  With ``options.client`` a DEALER
    stream connects to the same address, sends the phrase read from the
    user and prints the echo.  The function returns when the client has
    its echo; in server-only mode it runs until interrupted with Ctrl-C.

    Changes from the original example:

    * native coroutines driven by ``asyncio.run``: ``@asyncio.coroutine``
      was removed in Python 3.11, and creating ``asyncio.Future()`` or
      calling ``asyncio.get_event_loop()`` outside a running loop is
      deprecated;
    * the server task is cancelled and awaited on shutdown; before,
      nothing cancelled it, so its ``CancelledError`` handler never ran
      and the router was never closed;
    * the dealer stream is closed when the client finishes;
    * a failure in either role now propagates instead of leaving the
      program waiting on a future that is never resolved.
    """
    print("Running async at {!r}".format(options.addr))

    async def server():
        router = await aiozmq.create_zmq_stream(
            zmq.ROUTER, bind=options.addr)
        try:
            while True:
                data = await router.read()
                print("Async server read: {!r}".format(data))
                router.write(data)
                print("Async server write: {!r}".format(data))
        except asyncio.CancelledError:
            # Cancellation is how the echo loop is asked to stop.
            pass
        finally:
            router.close()

    async def client():
        dealer = await aiozmq.create_zmq_stream(
            zmq.DEALER, connect=options.addr)
        try:
            # NOTE: read_data() blocks the event loop while waiting for
            # the user; nothing else needs to run at that point.
            data = read_data()
            dealer.write(data)
            print("Async client write: {!r}".format(data))
            echo = await dealer.read()
            print("Async client read: {!r}".format(echo))
        finally:
            dealer.close()

    async def run():
        server_task = None
        if options.server:
            server_task = asyncio.ensure_future(server())
        try:
            if options.client:
                await client()
            elif server_task is not None:
                await server_task
        finally:
            if server_task is not None:
                server_task.cancel()
                try:
                    # Re-raises a genuine server failure (e.g. the bind
                    # failed); a plain cancellation is expected.
                    await server_task
                except asyncio.CancelledError:
                    pass

    try:
        asyncio.run(run())
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop a server-only run.
        pass


if __name__ == '__main__':
    main()