aiozmq
ZeroMQ integration with asyncio (PEP 3156).
Features
- Implements the create_zmq_connection() coroutine for making 0MQ connections.
- Provides ZmqTransport and ZmqProtocol.
- Provides RPC Request-Reply, Push-Pull and Publish-Subscribe patterns for remote calls.
Note
The library works on Linux, macOS and Windows. However, Windows is a second-class citizen in the ZeroMQ world, so aiozmq has only limited support for it.
Limitations are:
- You cannot use the ipc://name scheme for endpoints.
- aiozmq's loop, aiozmq.ZmqEventLoop, is built on top of the select system call, so it is slow compared to asyncio.ProactorEventLoop and it does not support subprocesses.
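Since ipc:// endpoints are unavailable on Windows, code meant to run on every platform needs a fallback. The following is a minimal sketch of one way to choose an endpoint. `pick_endpoint` and its parameters are hypothetical names used for illustration and are not part of aiozmq.

```python
import sys


def pick_endpoint(name, platform=sys.platform):
    """Return an endpoint string suitable for the given platform.

    Hypothetical helper (not part of aiozmq): falls back to a loopback
    TCP endpoint with a wildcard port where ipc:// is not available.
    """
    if platform.startswith("win"):
        # ipc:// cannot be used on Windows, so use a random local TCP port.
        return "tcp://127.0.0.1:*"
    # On other platforms a named ipc:// endpoint can be used.
    return "ipc://" + name
```

The returned string could then be passed as the bind or connect argument when creating a connection.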
Library Installation
The core requires only pyzmq and can be installed (with pyzmq as a dependency) by executing:
    pip3 install aiozmq
You will probably also want to use aiozmq.rpc.
The RPC module is optional and requires msgpack. You can install msgpack by executing:
    pip3 install msgpack
Note
aiozmq runs on Python 3 only. Most Linux distributions use pip3 for installing Python 3 libraries. If your system uses Python 3 by default, try pip instead of pip3. The same may be true for virtualenv, the Travis continuous integration system, etc.
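To confirm that the installation steps above took effect, you can check which modules are importable before running any aiozmq code. This is only a sketch using the standard library. `missing_modules` is a hypothetical helper name, and the module list assumes that pyzmq is imported as `zmq`.

```python
import importlib.util


def missing_modules(names):
    """Return the top-level module names that cannot be found."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    # pyzmq is imported as "zmq"; msgpack is only needed for aiozmq.rpc.
    missing = missing_modules(["zmq", "aiozmq", "msgpack"])
    if missing:
        print("Missing modules:", ", ".join(missing))
    else:
        print("All modules needed for aiozmq.rpc are importable.")
```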
Source code
The project is hosted on GitHub.
Please feel free to file an issue on the bug tracker if you have found a bug or have a suggestion for improving the library.
The library uses GitHub Actions for Continuous Integration.
Dependencies
Getting Started
Low-level request-reply example:
    import asyncio
    import aiozmq
    import zmq

    async def go():
        # Bind a ROUTER socket to a random free port on localhost.
        router = await aiozmq.create_zmq_stream(
            zmq.ROUTER,
            bind='tcp://127.0.0.1:*')

        # Look up the address the ROUTER was actually bound to.
        addr = list(router.transport.bindings())[0]
        dealer = await aiozmq.create_zmq_stream(
            zmq.DEALER,
            connect=addr)

        for i in range(10):
            # Send a multipart message and echo it back through the ROUTER.
            msg = (b'data', b'ask', str(i).encode('utf-8'))
            dealer.write(msg)
            data = await router.read()
            router.write(data)
            answer = await dealer.read()
            print(answer)
        dealer.close()
        router.close()

    asyncio.run(go())
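In the example above, the message read from the ROUTER socket carries one more frame than the DEALER sent: ZeroMQ ROUTER sockets prepend the sender's identity frame, and writing the message back unchanged is what routes the reply to the right peer. The sketch below shows how that envelope could be handled explicitly. `split_router_message` and `build_reply` are hypothetical helpers, not part of aiozmq.

```python
def split_router_message(frames):
    """Split frames read from a ROUTER socket into (identity, payload)."""
    if len(frames) < 2:
        raise ValueError("expected an identity frame followed by payload frames")
    identity, *payload = frames
    return identity, payload


def build_reply(frames, transform):
    """Keep the identity frame and apply `transform` to each payload frame."""
    identity, payload = split_router_message(frames)
    return [identity] + [transform(frame) for frame in payload]
```

Under these assumptions, replacing `router.write(data)` with `router.write(build_reply(data, bytes.upper))` should send the payload back upper-cased while leaving the routing frame intact.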
Example of RPC usage:
    import asyncio
    import aiozmq.rpc

    class ServerHandler(aiozmq.rpc.AttrHandler):

        # Expose this method to RPC clients.
        @aiozmq.rpc.method
        def remote_func(self, a: int, b: int) -> int:
            return a + b

    async def go():
        server = await aiozmq.rpc.serve_rpc(
            ServerHandler(), bind='tcp://127.0.0.1:5555')
        client = await aiozmq.rpc.connect_rpc(
            connect='tcp://127.0.0.1:5555')

        # Call the remote method as if it were a local coroutine.
        ret = await client.call.remote_func(1, 2)
        assert 3 == ret

        server.close()
        client.close()

    asyncio.run(go())
Note
To execute the last example you need to install msgpack first.
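The `client.call.remote_func(1, 2)` syntax in the RPC example reads like a local call even though the method name is resolved dynamically. As a rough illustration of that general pattern only, the sketch below builds a dotted method name from attribute access and dispatches it locally. `CallProxy` and `local_dispatch` are hypothetical names. This is not how aiozmq.rpc is implemented, and no networking or msgpack is involved.

```python
class CallProxy:
    """Collect attribute names into a dotted path and dispatch on call.

    Hypothetical illustration only; a real RPC client would send the
    method name and arguments to a server instead of calling locally.
    """

    def __init__(self, dispatch, path=()):
        self._dispatch = dispatch
        self._path = path

    def __getattr__(self, name):
        # Each attribute access extends the dotted method name.
        return CallProxy(self._dispatch, self._path + (name,))

    def __call__(self, *args, **kwargs):
        return self._dispatch(".".join(self._path), args, kwargs)


def local_dispatch(name, args, kwargs):
    """Stand-in for the network round trip: look the name up in a dict."""
    table = {"remote_func": lambda a, b: a + b}
    return table[name](*args, **kwargs)


call = CallProxy(local_dispatch)
```

With this sketch, `call.remote_func(1, 2)` looks up `"remote_func"` in the table and calls it with the given arguments.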
Indices and tables
- Streams API
- create_zmq_stream
- ZmqStream
  - ZmqStream.transport
  - ZmqStream.at_closing()
  - ZmqStream.close()
  - ZmqStream.drain()
  - ZmqStream.exception()
  - ZmqStream.get_extra_info()
  - ZmqStream.read()
  - ZmqStream.read_event()
  - ZmqStream.write()
  - ZmqStream.set_exception()
  - ZmqStream.set_transport()
  - ZmqStream.set_read_buffer_limits()
  - ZmqStream.feed_closing()
  - ZmqStream.feed_msg()
  - ZmqStream.feed_event()
- Exceptions
- Remote Procedure Calls
- Core API
- create_zmq_connection
- ZmqTransport
  - ZmqTransport.get_extra_info()
  - ZmqTransport.close()
  - ZmqTransport.write()
  - ZmqTransport.abort()
  - ZmqTransport.getsockopt()
  - ZmqTransport.setsockopt()
  - ZmqTransport.get_write_buffer_limits()
  - ZmqTransport.set_write_buffer_limits()
  - ZmqTransport.get_write_buffer_size()
  - ZmqTransport.pause_reading()
  - ZmqTransport.resume_reading()
  - ZmqTransport.bind()
  - ZmqTransport.unbind()
  - ZmqTransport.bindings()
  - ZmqTransport.connect()
  - ZmqTransport.disconnect()
  - ZmqTransport.connections()
  - ZmqTransport.subscribe()
  - ZmqTransport.unsubscribe()
  - ZmqTransport.subscriptions()
  - ZmqTransport.enable_monitor()
  - ZmqTransport.disable_monitor()
- ZmqProtocol
- Exception policy
- Getting aiozmq version
- Installing ZeroMQ event loop
- ZmqEventLoopPolicy
- ZmqEventLoop
- Examples of aiozmq usage
- Simple DEALER-ROUTER pair implemented on Core level
- DEALER-ROUTER pair implemented with streams
- Remote Procedure Call
- Pipeline aka Notifier
- Publish-Subscribe
- Translation RPC exceptions back to client
- Translation instances of custom classes via RPC
- Validation of RPC methods
- RPC lookup in nested namespaces
- Use dict as RPC lookup table
- Use dynamic RPC lookup
- Socket event monitor
- Stream socket event monitor
- Synchronous and asynchronous code works together
- Glossary