Integrate with Django#
If you’re looking at adding real-time capabilities to a Django project with WebSocket, you have two main options.
Using Django Channels, a project adding WebSocket to Django, among other features. This approach is fully supported by Django. However, it requires switching to a new deployment architecture.
Deploying a separate WebSocket server next to your Django project. This technique is well suited when you need to add a small set of real-time features — maybe a notification service — to an HTTP application.
This guide shows how to implement the second technique with websockets. It assumes familiarity with Django.
Authenticate connections#
Since the websockets server runs outside of Django, we need to integrate it
with django.contrib.auth
.
We will generate authentication tokens in the Django project. Then we will send them to the websockets server, where they will authenticate the user.
Generating a token for the current user and making it available in the browser is up to you. You could render the token in a template or fetch it with an API call.
Refer to the topic guide on authentication for details on this design.
Generate tokens#
We want secure, short-lived tokens containing the user ID. We’ll rely on django-sesame, a small library designed exactly for this purpose.
Add django-sesame to the dependencies of your Django project, install it, and configure it in the settings of the project:
AUTHENTICATION_BACKENDS = [
"django.contrib.auth.backends.ModelBackend",
"sesame.backends.ModelBackend",
]
(If your project already uses another authentication backend than the default
"django.contrib.auth.backends.ModelBackend"
, adjust accordingly.)
You don’t need "sesame.middleware.AuthenticationMiddleware"
. It is for
authenticating users in the Django server, while we’re authenticating them in
the websockets server.
We’d like our tokens to be valid for 30 seconds. We expect web pages to load and to establish the WebSocket connection within this delay. Configure django-sesame accordingly in the settings of your Django project:
SESAME_MAX_AGE = 30
If you expect your web site to load faster for all clients, a shorter lifespan is possible. However, in the context of this document, it would make manual testing more difficult.
You could also enable single-use tokens. However, this would update the last login date of the user every time a WebSocket connection is established. This doesn’t seem like a good idea, both in terms of behavior and in terms of performance.
Now you can generate tokens in a django-admin shell
as follows:
>>> from django.contrib.auth import get_user_model
>>> User = get_user_model()
>>> user = User.objects.get(username="<your username>")
>>> from sesame.utils import get_token
>>> get_token(user)
'<your token>'
Keep this console open: since tokens expire after 30 seconds, you’ll have to generate a new token every time you want to test connecting to the server.
Validate tokens#
Let’s move on to the websockets server.
Add websockets to the dependencies of your Django project and install it. Indeed, we’re going to reuse the environment of the Django project, so we can call its APIs in the websockets server.
Now here’s how to implement authentication.
#!/usr/bin/env python
import asyncio
import django
import websockets
django.setup()
from sesame.utils import get_user
async def handler(websocket):
sesame = await websocket.recv()
user = await asyncio.to_thread(get_user, sesame)
if user is None:
await websocket.close(1011, "authentication failed")
return
await websocket.send(f"Hello {user}!")
async def main():
async with websockets.serve(handler, "localhost", 8888):
await asyncio.Future() # run forever
if __name__ == "__main__":
asyncio.run(main())
Let’s unpack this code.
We’re calling django.setup()
before doing anything with Django because
we’re using Django in a standalone script. This assumes that the
DJANGO_SETTINGS_MODULE
environment variable is set to the Python path to
your settings module.
The connection handler reads the first message received from the client, which
is expected to contain a django-sesame token. Then it authenticates the user
with get_user()
, the API for authentication outside a view. If
authentication fails, it closes the connection and exits.
When we call an API that makes a database query such as get_user()
, we
wrap the call in to_thread()
. Indeed, the Django ORM doesn’t
support asynchronous I/O. It would block the event loop if it didn’t run in a
separate thread. to_thread()
is available since Python 3.9. In
earlier versions, use run_in_executor()
instead.
Finally, we start a server with serve()
.
We’re ready to test!
Save this code to a file called authentication.py
, make sure the
DJANGO_SETTINGS_MODULE
environment variable is set properly, and start the
websockets server:
$ python authentication.py
Generate a new token — remember, they’re only valid for 30 seconds — and use it to connect to your server. Paste your token and press Enter when you get a prompt:
$ python -m websockets ws://localhost:8888/
Connected to ws://localhost:8888/
> <your token>
< Hello <your username>!
Connection closed: 1000 (OK).
It works!
If you enter an expired or invalid token, authentication fails and the server closes the connection:
$ python -m websockets ws://localhost:8888/
Connected to ws://localhost:8888.
> not a token
Connection closed: 1011 (unexpected error) authentication failed.
You can also test from a browser by generating a new token and running the following code in the JavaScript console of the browser:
websocket = new WebSocket("ws://localhost:8888/");
websocket.onopen = (event) => websocket.send("<your token>");
websocket.onmessage = (event) => console.log(event.data);
If you don’t want to import your entire Django project into the websockets
server, you can build a separate Django project with django.contrib.auth
,
django-sesame
, a suitable User
model, and a subset of the settings of
the main project.
Stream events#
We can connect and authenticate but our server doesn’t do anything useful yet!
Let’s send a message every time a user makes an action in the admin. This message will be broadcast to all users who can access the model on which the action was made. This may be used for showing notifications to other users.
Many use cases for WebSocket with Django follow a similar pattern.
Set up event bus#
We need a event bus to enable communications between Django and websockets. Both sides connect permanently to the bus. Then Django writes events and websockets reads them. For the sake of simplicity, we’ll rely on Redis Pub/Sub.
The easiest way to add Redis to a Django project is by configuring a cache backend with django-redis. This library manages connections to Redis efficiently, persisting them between requests, and provides an API to access the Redis connection directly.
Install Redis, add django-redis to the dependencies of your Django project, install it, and configure it in the settings of the project:
CACHES = {
"default": {
"BACKEND": "django_redis.cache.RedisCache",
"LOCATION": "redis://127.0.0.1:6379/1",
},
}
If you already have a default cache, add a new one with a different name and
change get_redis_connection("default")
in the code below to the same name.
Publish events#
Now let’s write events to the bus.
Add the following code to a module that is imported when your Django project
starts. Typically, you would put it in a signals.py
module, which you
would import in the AppConfig.ready()
method of one of your apps:
import json
from django.contrib.admin.models import LogEntry
from django.db.models.signals import post_save
from django.dispatch import receiver
from django_redis import get_redis_connection
@receiver(post_save, sender=LogEntry)
def publish_event(instance, **kwargs):
event = {
"model": instance.content_type.name,
"object": instance.object_repr,
"message": instance.get_change_message(),
"timestamp": instance.action_time.isoformat(),
"user": str(instance.user),
"content_type_id": instance.content_type_id,
"object_id": instance.object_id,
}
connection = get_redis_connection("default")
payload = json.dumps(event)
connection.publish("events", payload)
This code runs every time the admin saves a LogEntry
object to keep track
of a change. It extracts interesting data, serializes it to JSON, and writes
an event to Redis.
Let’s check that it works:
$ redis-cli
127.0.0.1:6379> SELECT 1
OK
127.0.0.1:6379[1]> SUBSCRIBE events
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "events"
3) (integer) 1
Leave this command running, start the Django development server and make
changes in the admin: add, modify, or delete objects. You should see
corresponding events published to the "events"
stream.
Broadcast events#
Now let’s turn to reading events and broadcasting them to connected clients. We need to add several features:
Keep track of connected clients so we can broadcast messages.
Tell which content types the user has permission to view or to change.
Connect to the message bus and read events.
Broadcast these events to users who have corresponding permissions.
Here’s a complete implementation.
#!/usr/bin/env python
import asyncio
import json
import aioredis
import django
import websockets
django.setup()
from django.contrib.contenttypes.models import ContentType
from sesame.utils import get_user
CONNECTIONS = {}
def get_content_types(user):
"""Return the set of IDs of content types visible by user."""
# This does only three database queries because Django caches
# all permissions on the first call to user.has_perm(...).
return {
ct.id
for ct in ContentType.objects.all()
if user.has_perm(f"{ct.app_label}.view_{ct.model}")
or user.has_perm(f"{ct.app_label}.change_{ct.model}")
}
async def handler(websocket):
"""Authenticate user and register connection in CONNECTIONS."""
sesame = await websocket.recv()
user = await asyncio.to_thread(get_user, sesame)
if user is None:
await websocket.close(1011, "authentication failed")
return
ct_ids = await asyncio.to_thread(get_content_types, user)
CONNECTIONS[websocket] = {"content_type_ids": ct_ids}
try:
await websocket.wait_closed()
finally:
del CONNECTIONS[websocket]
async def process_events():
"""Listen to events in Redis and process them."""
redis = aioredis.from_url("redis://127.0.0.1:6379/1")
pubsub = redis.pubsub()
await pubsub.subscribe("events")
async for message in pubsub.listen():
if message["type"] != "message":
continue
payload = message["data"].decode()
# Broadcast event to all users who have permissions to see it.
event = json.loads(payload)
recipients = (
websocket
for websocket, connection in CONNECTIONS.items()
if event["content_type_id"] in connection["content_type_ids"]
)
websockets.broadcast(recipients, payload)
async def main():
async with websockets.serve(handler, "localhost", 8888):
await process_events() # runs forever
if __name__ == "__main__":
asyncio.run(main())
Since the get_content_types()
function makes a database query, it is
wrapped inside asyncio.to_thread()
. It runs once when each WebSocket
connection is open; then its result is cached for the lifetime of the
connection. Indeed, running it for each message would trigger database queries
for all connected users at the same time, which would hurt the database.
The connection handler merely registers the connection in a global variable, associated to the list of content types for which events should be sent to that connection, and waits until the client disconnects.
The process_events()
function reads events from Redis and broadcasts them
to all connections that should receive them. We don’t care much if a sending a
notification fails — this happens when a connection drops between the moment
we iterate on connections and the moment the corresponding message is sent —
so we start a task with for each message and forget about it. Also, this means
we’re immediately ready to process the next event, even if it takes time to
send a message to a slow client.
Since Redis can publish a message to multiple subscribers, multiple instances of this server can safely run in parallel.
Does it scale?#
In theory, given enough servers, this design can scale to a hundred million clients, since Redis can handle ten thousand servers and each server can handle ten thousand clients. In practice, you would need a more scalable message bus before reaching that scale, due to the volume of messages.