Core API
create_zmq_connection
-
aiozmq.create_zmq_connection(protocol_factory, zmq_type, *, bind=None, connect=None, zmq_sock=None, loop=None)
Create a ZeroMQ connection.
This method is a coroutine.
If you don’t pass the bind or connect parameters, you can bind or connect later with ZmqTransport.bind() and ZmqTransport.connect() calls.
Parameters:
- protocol_factory (callable) – a factory that instantiates a ZmqProtocol object.
- zmq_type (int) – a type of ZeroMQ socket (zmq.REQ, zmq.REP, zmq.PUB, zmq.SUB, zmq.PAIR*, zmq.DEALER, zmq.ROUTER, zmq.PULL, zmq.PUSH, etc.)
- bind (str or iterable of strings) – endpoints specification. Every endpoint generates a call to ZmqTransport.bind() for accepting connections from the specified endpoint. The other side should use the connect parameter to connect to this transport.
- connect (str or iterable of strings) – endpoints specification. Every endpoint generates a call to ZmqTransport.connect() for connecting the transport to the specified endpoint. The other side should use the bind parameter to wait for incoming connections.
- zmq_sock (zmq.Socket) – a preexisting zmq socket that will be passed to the returned transport.
- loop (asyncio.AbstractEventLoop) – optional event loop instance, None for the default event loop.
Returns: a pair of (transport, protocol) where transport supports the ZmqTransport interface.
Return type:
New in version 0.5.
ZmqTransport
-
class aiozmq.ZmqTransport
Transport for ZeroMQ connections. Implements the asyncio.BaseTransport interface.
End users should never create ZmqTransport objects directly; they get one from a yield from aiozmq.create_zmq_connection() call.
-
get_extra_info(key, default=None)
Return optional transport information if key is present, otherwise return default.
ZmqTransport supports only one valid key: "zmq_socket". The value is a zmq.Socket instance.
Parameters:
- key (str) – name of info record.
- default – default value
-
close()
Close the transport.
Buffered data will be flushed asynchronously. No more data will be received. After all buffered data is flushed, the protocol’s connection_lost() method will (eventually) be called with None as its argument.
-
write(data)
Write message to the transport.
Parameters: data – iterable to send as multipart message.
This does not block; it buffers the data and arranges for it to be sent out asynchronously.
-
abort()
Close the transport immediately.
Buffered data will be lost. No more data will be received. The protocol’s connection_lost() method will (eventually) be called with None as its argument.
-
getsockopt(option)
Get ZeroMQ socket option.
Parameters: option (int) – a constant like zmq.SUBSCRIBE, zmq.UNSUBSCRIBE, zmq.TYPE etc.
For the list of available options please see: http://api.zeromq.org/master:zmq-getsockopt
Returns: option value
Raises: OSError – if the call to ZeroMQ was unsuccessful.
-
setsockopt(option, value)
Set ZeroMQ socket option.
Parameters:
- option (int) – a constant like zmq.SUBSCRIBE, zmq.UNSUBSCRIBE, zmq.TYPE etc.
- value – a new option value; its type depends on the option.
For the list of available options please see: http://api.zeromq.org/master:zmq-setsockopt
-
get_write_buffer_limits()
Get the high- and low-water limits for write flow control. Return a tuple (low, high) where low and high are positive numbers of bytes.
Use set_write_buffer_limits() to set the limits.
New in version 0.6.
-
set_write_buffer_limits(high=None, low=None)
Set the high- and low-water limits for write flow control.
Parameters:
These two values control when to call the protocol’s pause_writing() and resume_writing() methods. If specified, the low-water limit must be less than or equal to the high-water limit. Neither value can be negative.
The defaults are implementation-specific. If only the high-water limit is given, the low-water limit defaults to an implementation-specific value less than or equal to the high-water limit. Setting high to zero forces low to zero as well, and causes pause_writing() to be called whenever the buffer becomes non-empty. Setting low to zero causes resume_writing() to be called only once the buffer is empty. Use of zero for either limit is generally sub-optimal as it reduces opportunities for doing I/O and computation concurrently.
Use get_write_buffer_limits() to get the limits.
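The constraints described for set_write_buffer_limits() can be modelled in a few lines. This is only a sketch: resolve_limits is a hypothetical helper, and the fallback numbers (64 KiB for high, a quarter of high for low) are assumptions borrowed from asyncio's stock transports. The defaults aiozmq actually uses are implementation-specific and may differ.

```python
def resolve_limits(high=None, low=None):
    """Return (low, high) following the documented constraints.

    Hypothetical helper: the fallback numbers are assumptions borrowed
    from asyncio's stock transports, not values confirmed for aiozmq.
    """
    if high is None:
        # Nothing given: assume 64 KiB. Only low given: assume 4 * low.
        high = 64 * 1024 if low is None else 4 * low
    if low is None:
        # Only high given: pick a value less than or equal to high.
        low = high // 4
    if not high >= low >= 0:
        raise ValueError(
            "high (%r) must be >= low (%r) must be >= 0" % (high, low))
    return low, high


print(resolve_limits(high=1000))   # low falls back to a value <= high
print(resolve_limits(high=0))      # high == 0 forces low == 0
```

The tuple is returned in (low, high) order to match what get_write_buffer_limits() reports.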
-
pause_reading()
Pause the receiving end.
No data will be passed to the protocol’s ZmqProtocol.msg_received() method until ZmqTransport.resume_reading() is called.
See also the ZmqTransport.resume_reading() method.
-
resume_reading()
Resume the receiving end.
Data received will once again be passed to the protocol’s ZmqProtocol.msg_received() method.
See also the ZmqTransport.pause_reading() method.
-
bind(endpoint)
Bind transport to endpoint. See http://api.zeromq.org/master:zmq-bind for details.
This method is a coroutine.
Parameters: endpoint – a string in format transport://address as ZeroMQ requires.
Returns: bound endpoint, unwinding wildcards if needed.
Return type: Raises:
-
unbind(endpoint)
Unbind transport from endpoint.
This method is a coroutine.
Parameters: endpoint – a string in format transport://address as ZeroMQ requires.
Returns: None
Raises:
-
bindings()
Return immutable set of endpoints bound to transport.
Note
Returned endpoints include only ones that have been bound via ZmqTransport.bind() or create_zmq_connection() calls and do not include bindings that were made on zmq_sock before the create_zmq_connection() call.
-
connect(endpoint)
Connect transport to endpoint. See http://api.zeromq.org/master:zmq-connect for details.
This method is a coroutine.
Parameters: endpoint (str) – a string in format transport://address as ZeroMQ requires.
For tcp connections the endpoint should specify an IPv4 or IPv6 address, not a DNS name. Use yield from get_event_loop().getaddrinfo(host, port) for translating a DNS name into an IP address.
Returns: endpoint
Return type:
Raises:
- ValueError – if the endpoint is a tcp DNS address.
- OSError – on error from ZeroMQ layer
- TypeError – if endpoint is not a str
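The DNS translation step recommended for connect() might look like the sketch below. resolve_tcp_endpoint is a hypothetical helper, not part of aiozmq. It is written with async/await and asyncio.run (Python 3.7+) rather than the yield from style used in the text, and it restricts itself to IPv4 so that the resulting endpoint needs no bracket handling.

```python
import asyncio
import socket


async def resolve_tcp_endpoint(host, port):
    """Resolve host to an IPv4 address and format a ZeroMQ tcp endpoint.

    Hypothetical helper, not part of aiozmq.
    """
    loop = asyncio.get_running_loop()
    # Restrict the lookup to IPv4 stream sockets and take the first result.
    infos = await loop.getaddrinfo(
        host, port, family=socket.AF_INET, type=socket.SOCK_STREAM)
    family, socktype, proto, canonname, sockaddr = infos[0]
    ip, resolved_port = sockaddr
    return "tcp://{}:{}".format(ip, resolved_port)


# A numeric address passes through unchanged; a DNS name would be looked up.
endpoint = asyncio.run(resolve_tcp_endpoint("127.0.0.1", 5555))
print(endpoint)
```

The returned string is in the transport://address form that connect() expects.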
-
disconnect(endpoint)
Disconnect transport from endpoint.
This method is a coroutine.
Parameters: endpoint – a string in format transport://address as ZeroMQ requires.
Returns: None
Raises:
-
connections()
Return immutable set of endpoints connected to transport.
Note
Returned endpoints include only ones that have been connected via ZmqTransport.connect() or create_zmq_connection() calls and do not include connections that were made on zmq_sock before the create_zmq_connection() call.
-
subscribe(value)
Establish a new message filter on SUB transport.
Newly created SUB transports filter out all incoming messages, therefore you should call this method to establish an initial message filter.
An empty (b'') value subscribes to all incoming messages. A non-empty value subscribes to all messages beginning with the specified prefix. Multiple filters may be attached to a single SUB transport, in which case a message shall be accepted if it matches at least one filter.
Parameters: value (bytes) – a filter value to add to SUB filters.
Raises:
- NotImplementedError – the transport is not SUB.
- TypeError – when value is not bytes.
Warning
Unlike the ZeroMQ socket level, the call first checks for value in ZmqTransport.subscriptions() and does nothing if the transport is already subscribed to the value.
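The filter rule described for subscribe() (an empty filter matches everything, a non-empty filter matches by prefix, one matching filter is enough) can be illustrated in pure Python. accepts is a hypothetical helper used only to model the rule; the real filtering happens inside ZeroMQ.

```python
def accepts(message, filters):
    """Return True if a SUB transport with these filters would accept message.

    Hypothetical helper modelling the documented prefix rule.
    """
    # bytes.startswith(b"") is always True, so b"" matches every message.
    # With no filters at all, any() is False and everything is dropped.
    return any(message.startswith(prefix) for prefix in filters)


no_filters = accepts(b"weather.nyc 21C", set())
prefix_hit = accepts(b"weather.nyc 21C", {b"weather."})
prefix_miss = accepts(b"sports.nba 98:97", {b"weather.", b"news."})
match_all = accepts(b"sports.nba 98:97", {b""})
print(no_filters, prefix_hit, prefix_miss, match_all)
```

Keeping the filters in a set mirrors the behaviour in the warning: subscribing twice to the same value leaves a single filter in place.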
-
unsubscribe(value)
Remove an existing message filter on a SUB transport.
The filter specified must match an existing filter previously established with ZmqTransport.subscribe().
If the transport has several instances of the same filter attached, .unsubscribe() removes only one instance, leaving the rest in place and functional (if you use ZmqTransport.subscribe() for adding new filters that never happens, see difference between aiozmq and ZeroMQ raw sockets for details).
Parameters: value (bytes) – a filter value to remove from SUB filters.
Raises:
- NotImplementedError – the transport is not SUB.
- TypeError – when value is not bytes.
-
subscriptions()
Return immutable set of subscriptions (set of bytes) subscribed on transport.
Note
Returned subscriptions include only ones that have been subscribed via the ZmqTransport.subscribe() call and do not include subscriptions that were made on zmq_sock before the create_zmq_connection() call.
Raises: NotImplementedError – the transport is not SUB.
-
enable_monitor(events=None)
Enables socket events to be reported for this socket. Socket events are passed to the protocol’s ZmqProtocol.event_received() method.
The socket event monitor capability requires libzmq >= 4 and pyzmq >= 14.4.
This method is a coroutine.
Parameters: events – a bitmask of socket events to watch for. If no value is specified then all events will be monitored (i.e. zmq.EVENT_ALL). For the list of available events please see: http://api.zeromq.org/4-0:zmq-socket-monitor
Raises: NotImplementedError – if libzmq or pyzmq versions do not support socket monitoring.
New in version 0.7.
-
ZmqProtocol
-
class aiozmq.ZmqProtocol
Protocol for ZeroMQ connections. Derives from asyncio.BaseProtocol.
-
connection_made(transport)
Called when a connection is made.
Parameters: transport (ZmqTransport) – representing the pipe connection.
To receive data, wait for msg_received() calls. When the connection is closed, connection_lost() is called.
-
connection_lost(exc)
Called when the connection is lost or closed.
Parameters: exc (instance of Exception or derived class) – an exception object or None (the latter meaning the connection was aborted or closed).
-
pause_writing()
Called when the transport’s buffer goes over the high-water mark.
Pause and resume calls are paired: pause_writing() is called once when the buffer goes strictly over the high-water mark (even if subsequent writes increase the buffer size even more), and eventually resume_writing() is called once when the buffer size reaches the low-water mark.
Note that if the buffer size equals the high-water mark, pause_writing() is not called; it must go strictly over. Conversely, resume_writing() is called when the buffer size is equal to or lower than the low-water mark. These end conditions are important to ensure that things go as expected when either mark is zero.
Note
This is the only Protocol callback that is not called through asyncio.AbstractEventLoop.call_soon(). If it were, it would have no effect when it’s most needed (when the app keeps writing without yielding until pause_writing() is called).
-
resume_writing()
Called when the transport’s buffer drains below the low-water mark.
See pause_writing() for details.
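The pairing and end conditions of pause_writing() and resume_writing() are easy to get wrong, so a toy model may help. FlowControlModel below is hypothetical and is not aiozmq code; it only records which callback would fire as a byte counter crosses the two marks.

```python
class FlowControlModel:
    """Toy model of the high/low-water callbacks (not aiozmq code)."""

    def __init__(self, low, high):
        self.low, self.high = low, high
        self.size = 0          # bytes currently buffered
        self.paused = False
        self.events = []       # callbacks that would have fired, in order

    def buffer(self, nbytes):
        self.size += nbytes
        # Fires only when strictly over the high-water mark, and only once.
        if not self.paused and self.size > self.high:
            self.paused = True
            self.events.append("pause_writing")

    def drain(self, nbytes):
        self.size = max(0, self.size - nbytes)
        # Fires when the size is equal to or lower than the low-water mark.
        if self.paused and self.size <= self.low:
            self.paused = False
            self.events.append("resume_writing")


model = FlowControlModel(low=4, high=10)
model.buffer(10)   # exactly at the high-water mark: no callback
model.buffer(1)    # strictly over: pause_writing
model.buffer(5)    # still over: no second pause_writing
model.drain(12)    # size 4 equals the low-water mark: resume_writing
print(model.events)
```

With both marks set to zero the same rules give a pause on the first buffered byte and a resume only once the buffer is empty.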
-
Exception policy
Every call to a zmq.Socket method can raise a zmq.ZMQError exception. But all methods of ZmqEventLoop and ZmqTransport translate ZMQError into OSError (or a descendant) with errno and strerror borrowed from the underlying ZMQError values.
The reason for the translation is that Python 3.3 implements PEP 3151 (Reworking the OS and IO Exception Hierarchy), which gets rid of the exception zoo and uses OSError and its descendants for all exceptions generated by system function calls.
aiozmq implements the same pattern. Internally it looks like:
    try:
        return self._zmq_sock.getsockopt(option)
    except zmq.ZMQError as exc:
        raise OSError(exc.errno, exc.strerror)
Also, public methods of aiozmq will never raise InterruptedError (aka EINTR); they process interruption internally.
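The "or descendant" part of the translation comes for free from PEP 3151: constructing OSError with an errno and a message returns the matching subclass when one exists. The sketch below uses only the standard library; the errno and strerror values are stand-ins for what a ZMQError would carry, chosen for illustration.

```python
import errno

# Stand-in for the (errno, strerror) pair carried by a ZMQError.
exc = OSError(errno.EAGAIN, "Resource temporarily unavailable")

# PEP 3151: the OSError constructor picks a subclass based on errno.
print(type(exc).__name__)
print(isinstance(exc, OSError), exc.errno == errno.EAGAIN)

# Codes without a dedicated subclass stay plain OSError.
plain = OSError(errno.EADDRINUSE, "Address already in use")
print(type(plain).__name__)
```

Callers can therefore catch either the specific subclass or OSError itself and inspect errno and strerror.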
Getting aiozmq version
-
aiozmq.version
a text version of the library:
'0.1.0 , Python 3.3.2+ (default, Feb 28 2014, 00:52:16) \n[GCC 4.8.1]'
-
aiozmq.version_info
a named tuple with version information, useful for comparison:
VersionInfo(major=0, minor=1, micro=0, releaselevel='alpha', serial=0)
Python itself uses the same scheme (sys.version_info).
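A named tuple compares element by element against a plain tuple, which is what makes version_info useful for feature checks. The sketch below builds a stand-in with the documented fields so that it runs without aiozmq installed; real code would read aiozmq.version_info instead.

```python
from collections import namedtuple

# Stand-in mirroring the documented fields; real code would read
# aiozmq.version_info instead of building the tuple by hand.
VersionInfo = namedtuple(
    "VersionInfo", "major minor micro releaselevel serial")
version_info = VersionInfo(major=0, minor=1, micro=0,
                           releaselevel="alpha", serial=0)

# The module-level create_zmq_connection() is documented as new in 0.5.
has_module_level_create = version_info >= (0, 5)
print(version_info.major, version_info.minor)
print(has_module_level_create)
```

Comparing against a short tuple such as (0, 5) avoids parsing the text form in aiozmq.version.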
Installing ZeroMQ event loop
Deprecated since version 0.5: aiozmq works with any asyncio event loop; it doesn’t require a dedicated event loop policy.
To use the ZeroMQ layer you may install a proper event loop first.
The recommended way is to set up a global event loop policy:
import asyncio
import aiozmq
asyncio.set_event_loop_policy(aiozmq.ZmqEventLoopPolicy())
That installs ZmqEventLoopPolicy globally. After installing it you can get the event loop instance from the main thread with an asyncio.get_event_loop() call:
loop = asyncio.get_event_loop()
If you need to run the event loop in your own (not main) thread you have to set it up first:
import asyncio
import threading

def thread_func():
    # Create a loop for this thread and register it as the current one.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_forever()

thread = threading.Thread(target=thread_func)
thread.start()
ZmqEventLoopPolicy
Deprecated since version 0.5: aiozmq works with any asyncio event loop; it doesn’t require a dedicated event loop policy.
ZeroMQ policy implementation for accessing the event loop.
In this policy, each thread has its own event loop. However, we only automatically create an event loop by default for the main thread; other threads by default have no event loop.
ZmqEventLoopPolicy implements the asyncio.AbstractEventLoopPolicy interface.
-
class aiozmq.ZmqEventLoopPolicy
Create policy for ZeroMQ event loops.
Note
The policy should be installed, see Installing ZeroMQ event loop.
-
get_event_loop()
Get the event loop.
If the current thread is the main thread and there is no registered event loop for the current thread, then the call creates a new event loop and registers it.
Returns: an instance of ZmqEventLoop.
Raises: RuntimeError – if there is no registered event loop for the current thread.
-
new_event_loop()
Create a new event loop.
You must call ZmqEventLoopPolicy.set_event_loop() to make this the current event loop.
-
set_event_loop(loop)
Set the event loop.
As a side effect, if a child watcher was set before, then calling .set_event_loop() from the main thread will call asyncio.AbstractChildWatcher.attach_loop() on the child watcher.
Parameters: loop – an asyncio.AbstractEventLoop instance or None
Raises: TypeError – if loop is not an instance of asyncio.AbstractEventLoop
-
get_child_watcher()
Get the child watcher.
If not yet set, an asyncio.SafeChildWatcher object is automatically created.
Returns: an instance of asyncio.AbstractChildWatcher.
-
set_child_watcher(watcher)
Set the child watcher.
Parameters: watcher – an asyncio.AbstractChildWatcher instance or None
Raises: TypeError – if watcher is not an instance of asyncio.AbstractChildWatcher
-
ZmqEventLoop
Deprecated since version 0.5: aiozmq works with any asyncio event loop; it doesn’t require a dedicated event loop object.
Event loop with ZeroMQ support.
Follows the asyncio.AbstractEventLoop specification and has a create_zmq_connection() method for the ZeroMQ sockets layer.
-
class aiozmq.ZmqEventLoop(*, zmq_context=None)
Parameters: zmq_context (zmq.Context) – explicit context to use for ZeroMQ socket creation inside ZmqEventLoop.create_zmq_connection() calls. aiozmq shares the global context returned by the zmq.Context.instance() call if the zmq_context parameter is None.
-
create_zmq_connection(protocol_factory, zmq_type, *, bind=None, connect=None, zmq_sock=None)
Create a ZeroMQ connection.
If you don’t pass the bind or connect parameters, you can bind or connect later with ZmqTransport.bind() and ZmqTransport.connect() calls.
Parameters:
- protocol_factory (callable) – a factory that instantiates a ZmqProtocol object.
- zmq_type (int) – a type of ZeroMQ socket (zmq.REQ, zmq.REP, zmq.PUB, zmq.SUB, zmq.PAIR*, zmq.DEALER, zmq.ROUTER, zmq.PULL, zmq.PUSH, etc.)
- bind (str or iterable of strings) – endpoints specification. Every endpoint generates a call to ZmqTransport.bind() for accepting connections from the specified endpoint. The other side should use the connect parameter to connect to this transport.
- connect (str or iterable of strings) – endpoints specification. Every endpoint generates a call to ZmqTransport.connect() for connecting the transport to the specified endpoint. The other side should use the bind parameter to wait for incoming connections.
- zmq_sock (zmq.Socket) – a preexisting zmq socket that will be passed to the returned transport.
Returns: a pair of (transport, protocol) where transport supports the ZmqTransport interface.
Return type: