aiozmq¶
ZeroMQ integration with asyncio (PEP 3156).
Features¶
- Implements the create_zmq_connection() coroutine for making 0MQ connections.
- Provides ZmqTransport and ZmqProtocol.
- Provides RPC Request-Reply, Push-Pull and Publish-Subscribe patterns for remote calls.
Note
The library works on Linux, MacOS X and Windows.
But Windows is a second-class citizen in the ZeroMQ world, sorry.
Thus aiozmq also has only limited support for Windows.
Limitations are:
- You cannot use the ipc://name scheme for endpoints.
- aiozmq’s loop aiozmq.ZmqEventLoop is built on top of the select system call, so it is not fast compared to asyncio.ProactorEventLoop, and it doesn’t support subprocesses.
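Because ipc:// endpoints are unavailable on Windows, code that has to run on every supported platform may want to pick the endpoint at runtime. The following is a minimal sketch of that idea; `choose_endpoint` and its parameters are hypothetical names introduced here for illustration and are not part of aiozmq.

```python
import sys


def choose_endpoint(name, port, platform=sys.platform):
    """Hypothetical helper: prefer ipc://, fall back to tcp:// on Windows."""
    if platform.startswith("win"):
        # ipc:// cannot be used on Windows, so use TCP on the loopback interface.
        return "tcp://127.0.0.1:{}".format(port)
    return "ipc://{}".format(name)


endpoint = choose_endpoint("name", 5555)
```

The resulting string can then be passed as the bind or connect argument, as in the examples in the Getting Started section.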
Library Installation¶
The core requires only pyzmq and can be installed (with pyzmq as a dependency) by executing:
pip3 install aiozmq
You will probably also want to use aiozmq.rpc.
The RPC module is optional and requires msgpack. You can install msgpack by executing:
pip3 install msgpack
Note
aiozmq runs on Python 3 only. Most Linux distributions use pip3 for installing Python 3 libraries. If your system uses Python 3 by default, try just pip instead of pip3. The same may be true for virtualenv, the Travis continuous integration system, etc.
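Since msgpack is an optional dependency, it can be useful to check for it before relying on aiozmq.rpc. The sketch below uses only the standard library; `has_module` is a hypothetical helper name, not something aiozmq provides.

```python
import importlib.util


def has_module(name):
    """Return True if a top-level module can be found, without importing it."""
    return importlib.util.find_spec(name) is not None


if not has_module("msgpack"):
    # aiozmq.rpc needs msgpack; the core of aiozmq does not.
    print("msgpack is not installed; run: pip3 install msgpack")
```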
Source code¶
The project is hosted on GitHub.
Please feel free to file an issue on the bug tracker if you have found a bug or have a suggestion for improving the library.
The library uses Travis for Continuous Integration.
Dependencies¶
Authors and License¶
The aiozmq package was initially written by Nikolay Kim and is now maintained by Andrew Svetlov. It’s BSD licensed and freely available.
Feel free to improve this package and send a pull request to GitHub.
Getting Started¶
Low-level request-reply example:
import asyncio
import aiozmq
import zmq


async def go():
    # Bind a ROUTER socket to a random free TCP port.
    router = await aiozmq.create_zmq_stream(
        zmq.ROUTER,
        bind='tcp://127.0.0.1:*')

    # Ask the transport which address was actually bound.
    addr = list(router.transport.bindings())[0]
    dealer = await aiozmq.create_zmq_stream(
        zmq.DEALER,
        connect=addr)

    for i in range(10):
        msg = (b'data', b'ask', str(i).encode('utf-8'))
        dealer.write(msg)
        # Echo the request back to the dealer through the router.
        data = await router.read()
        router.write(data)
        answer = await dealer.read()
        print(answer)
    dealer.close()
    router.close()


asyncio.run(go())
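In the example above, each message is a sequence of byte frames. In ZeroMQ, a ROUTER socket prepends the sender's identity frame to every message it receives, which is why writing the received data straight back routes the reply to the right DEALER. The sketch below illustrates that framing in plain Python, without opening any sockets; `make_request`, `split_router_frames` and the `b'peer-id'` identity are hypothetical and used only for illustration.

```python
def make_request(i):
    """Build the three-frame message used in the request-reply example."""
    return (b'data', b'ask', str(i).encode('utf-8'))


def split_router_frames(frames):
    """Split frames read from a ROUTER socket into (identity, payload)."""
    identity, *payload = frames
    return identity, tuple(payload)


# Simulated ROUTER read: the identity frame is a made-up placeholder.
received = [b'peer-id', *make_request(7)]
identity, payload = split_router_frames(received)
```

A real identity is assigned by ZeroMQ (or set on the DEALER socket) and is usually opaque binary data, so code should treat it as a routing token rather than parse it.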
Example of RPC usage:
import asyncio
import aiozmq.rpc


class ServerHandler(aiozmq.rpc.AttrHandler):

    # Expose the method to remote callers.
    @aiozmq.rpc.method
    def remote_func(self, a: int, b: int) -> int:
        return a + b


async def go():
    server = await aiozmq.rpc.serve_rpc(
        ServerHandler(), bind='tcp://127.0.0.1:5555')
    client = await aiozmq.rpc.connect_rpc(
        connect='tcp://127.0.0.1:5555')

    # Call the remote method as if it were a local coroutine.
    ret = await client.call.remote_func(1, 2)
    assert 3 == ret

    server.close()
    client.close()


asyncio.run(go())
Note
To execute the last example you need to install msgpack first.
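The handler in the RPC example annotates its parameters and return value with int. As far as the aiozmq.rpc documentation describes it, such annotations are treated as callables that validate the received arguments and the returned value. The sketch below imitates that general idea in plain Python with inspect.signature; it is an illustration under that assumption, not aiozmq's implementation, and `call_validated` is a hypothetical helper.

```python
import inspect


def call_validated(func, *args, **kwargs):
    """Hypothetical helper: pass each argument through its annotation."""
    sig = inspect.signature(func)
    bound = sig.bind(*args, **kwargs)
    for name, value in list(bound.arguments.items()):
        converter = sig.parameters[name].annotation
        if converter is not inspect.Parameter.empty:
            # The annotation is called like a function, e.g. int("1").
            bound.arguments[name] = converter(value)
    result = func(*bound.args, **bound.kwargs)
    if sig.return_annotation is not inspect.Signature.empty:
        result = sig.return_annotation(result)
    return result


def remote_func(a: int, b: int) -> int:
    return a + b


total = call_validated(remote_func, "1", 2)
```

With this approach a value that the annotation rejects, such as a non-numeric string for an int parameter, raises an exception before the function body runs.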
Indices and tables¶
- Streams API
- Remote Procedure Calls
- Core API
- Examples of aiozmq usage
  - Simple DEALER-ROUTER pair implemented on Core level
  - DEALER-ROUTER pair implemented with streams
  - Remote Procedure Call
  - Pipeline aka Notifier
  - Publish-Subscribe
  - Translation RPC exceptions back to client
  - Translation instances of custom classes via RPC
  - Validation of RPC methods
  - RPC lookup in nested namespaces
  - Use dict as RPC lookup table
  - Use dynamic RPC lookup
  - Socket event monitor
  - Stream socket event monitor
  - Synchronous and asynchronous code works together
- Glossary