Source code for aiozmq.interface
from asyncio import BaseProtocol, BaseTransport
__all__ = ["ZmqTransport", "ZmqProtocol"]
[docs]class ZmqTransport(BaseTransport):
"""Interface for ZeroMQ transport."""
[docs] def write(self, data):
"""Write message to the transport.
data is iterable to send as multipart message.
This does not block; it buffers the data and arranges for it
to be sent out asynchronously.
"""
raise NotImplementedError
[docs] def abort(self):
"""Close the transport immediately.
Buffered data will be lost. No more data will be received.
The protocol's connection_lost() method will (eventually) be
called with None as its argument.
"""
raise NotImplementedError
[docs] def getsockopt(self, option):
"""Get ZeroMQ socket option.
option is a constant like zmq.SUBSCRIBE, zmq.UNSUBSCRIBE, zmq.TYPE etc.
For list of available options please see:
http://api.zeromq.org/master:zmq-getsockopt
"""
raise NotImplementedError
[docs] def setsockopt(self, option, value):
"""Set ZeroMQ socket option.
option is a constant like zmq.SUBSCRIBE, zmq.UNSUBSCRIBE, zmq.TYPE etc.
value is a new option value, it's type depend of option name.
For list of available options please see:
http://api.zeromq.org/master:zmq-setsockopt
"""
raise NotImplementedError
[docs] def set_write_buffer_limits(self, high=None, low=None):
"""Set the high- and low-water limits for write flow control.
These two values control when to call the protocol's
pause_writing() and resume_writing() methods. If specified,
the low-water limit must be less than or equal to the
high-water limit. Neither value can be negative.
The defaults are implementation-specific. If only the
high-water limit is given, the low-water limit defaults to a
implementation-specific value less than or equal to the
high-water limit. Setting high to zero forces low to zero as
well, and causes pause_writing() to be called whenever the
buffer becomes non-empty. Setting low to zero causes
resume_writing() to be called only once the buffer is empty.
Use of zero for either limit is generally sub-optimal as it
reduces opportunities for doing I/O and computation
concurrently.
"""
raise NotImplementedError
[docs] def get_write_buffer_limits(self):
raise NotImplementedError
[docs] def get_write_buffer_size(self):
"""Return the current size of the write buffer."""
raise NotImplementedError
[docs] def pause_reading(self):
"""Pause the receiving end.
No data will be passed to the protocol's msg_received()
method until resume_reading() is called.
"""
raise NotImplementedError
[docs] def resume_reading(self):
"""Resume the receiving end.
Data received will once again be passed to the protocol's
msg_received() method.
"""
raise NotImplementedError
[docs] def bind(self, endpoint):
"""Bind transpot to endpoint.
endpoint is a string in format transport://address as ZeroMQ requires.
Return bound endpoint, unwinding wildcards if needed.
"""
raise NotImplementedError
[docs] def unbind(self, endpoint):
"""Unbind transpot from endpoint."""
raise NotImplementedError
[docs] def bindings(self):
"""Return immutable set of endpoints bound to transport.
N.B. returned endpoints includes only ones that has been bound
via transport.bind or event_loop.create_zmq_connection calls
and does not includes bindings that has been done to zmq_sock
before create_zmq_connection has been called.
"""
raise NotImplementedError
[docs] def connect(self, endpoint):
"""Connect transport to endpoint.
endpoint is a string in format transport://address as ZeroMQ requires.
For TCP connections endpoint should specify IPv4 or IPv6 address,
not DNS name.
Use await get_event_loop().getaddrinfo(host, port)
for translating DNS into address.
Raise ValueError if endpoint is tcp DNS address.
Return bound connection, unwinding wildcards if needed.
"""
raise NotImplementedError
[docs] def disconnect(self, endpoint):
"""Disconnect transpot from endpoint."""
raise NotImplementedError
[docs] def connections(self):
"""Return immutable set of endpoints connected to transport.
N.B. returned endpoints includes only ones that has been connected
via transport.connect or event_loop.create_zmq_connection calls
and does not includes connections that has been done to zmq_sock
before create_zmq_connection has been called.
"""
raise NotImplementedError
[docs] def subscribe(self, value):
"""Establish a new message filter on SUB transport.
Newly created SUB transports filters out all incoming
messages, therefore you should to call this method to
establish an initial message filter.
Value should be bytes.
An empty (b'') value subscribes to all incoming messages. A
non-empty value subscribes to all messages beginning with the
specified prefix. Multiple filters may be attached to a single
SUB transport, in which case a message shall be accepted if it
matches at least one filter.
"""
raise NotImplementedError
[docs] def unsubscribe(self, value):
"""Remove an existing message filter on a SUB transport.
Value should be bytes.
The filter specified must match an existing filter previously
established with the .subscribe(). If the transport has
several instances of the same filter attached the
.unsubscribe() removes only one instance, leaving
the rest in place and functional.
"""
raise NotImplementedError
[docs] def subscriptions(self):
"""Return immutable set of subscriptions (bytes) subscribed on
transport.
N.B. returned subscriptions includes only ones that has been
subscribed via transport.subscribe call and does not includes
subscribtions that has been done to zmq_sock before
create_zmq_connection has been called.
"""
raise NotImplementedError
[docs] async def enable_monitor(self, events=None):
"""Enables socket events to be reported for this socket.
Socket events are passed to the protocol's ZmqProtocol's
event_received method.
This method is a coroutine.
The socket event monitor capability requires libzmq >= 4 and
pyzmq >= 14.4.
events is a bitmask (e.g zmq.EVENT_CONNECTED) defining the events
to monitor. Default is all events (i.e. zmq.EVENT_ALL).
For list of available events please see:
http://api.zeromq.org/4-0:zmq-socket-monitor
Raise NotImplementedError if libzmq or pyzmq versions do not support
socket monitoring.
"""
raise NotImplementedError
[docs] def disable_monitor(self):
"""Stop the socket event monitor."""
raise NotImplementedError
[docs]class ZmqProtocol(BaseProtocol):
"""Interface for ZeroMQ protocol."""
[docs] def msg_received(self, data):
"""Called when some ZeroMQ message is received.
data is the multipart tuple of bytes with at least one item.
"""
[docs] def event_received(self, event):
"""Called when a ZeroMQ socket event is received.
This method is only called when a socket monitor is enabled.
:param event: A namedtuple containing 3 items `event`, `value`, and
`endpoint`.
"""