Patterns#
Here are typical patterns for processing messages in a WebSocket server or client. You will certainly implement some of them in your application.
This page gives examples of connection handlers for a server. However, they’re
also applicable to a client, simply by assuming that websocket
is a
connection created with connect()
.
WebSocket connections are long-lived. You will usually write a loop to process several messages during the lifetime of a connection.
Consumer#
To receive messages from the WebSocket connection:
async def consumer_handler(websocket):
async for message in websocket:
await consumer(message)
In this example, consumer()
is a coroutine implementing your business
logic for processing a message received on the WebSocket connection. Each
message may be str
or bytes
.
Iteration terminates when the client disconnects.
Producer#
To send messages to the WebSocket connection:
async def producer_handler(websocket):
while True:
message = await producer()
await websocket.send(message)
In this example, producer()
is a coroutine implementing your business
logic for generating the next message to send on the WebSocket connection.
Each message must be str
or bytes
.
Iteration terminates when the client disconnects
because send()
raises a
ConnectionClosed
exception,
which breaks out of the while True
loop.
Consumer and producer#
You can receive and send messages on the same WebSocket connection by combining the consumer and producer patterns. This requires running two tasks in parallel:
async def handler(websocket):
await asyncio.gather(
consumer_handler(websocket),
producer_handler(websocket),
)
If a task terminates, gather()
doesn’t cancel the other task.
This can result in a situation where the producer keeps running after the
consumer finished, which may leak resources.
Here’s a way to exit and close the WebSocket connection as soon as a task terminates, after canceling the other task:
async def handler(websocket):
consumer_task = asyncio.create_task(consumer_handler(websocket))
producer_task = asyncio.create_task(producer_handler(websocket))
done, pending = await asyncio.wait(
[consumer_task, producer_task],
return_when=asyncio.FIRST_COMPLETED,
)
for task in pending:
task.cancel()
Registration#
To keep track of currently connected clients, you can register them when they connect and unregister them when they disconnect:
connected = set()
async def handler(websocket):
# Register.
connected.add(websocket)
try:
# Broadcast a message to all connected clients.
websockets.broadcast(connected, "Hello!")
await asyncio.sleep(10)
finally:
# Unregister.
connected.remove(websocket)
This example maintains the set of connected clients in memory. This works as long as you run a single process. It doesn’t scale to multiple processes.
Publish–subscribe#
If you plan to run multiple processes and you want to communicate updates between processes, then you must deploy a messaging system. You may find publish-subscribe functionality useful.
A complete implementation of this idea with Redis is described in the Django integration guide.