FastAPI WebSockets Tutorial
Build real-time features with FastAPI WebSockets. Manage connections, broadcast messages, and handle disconnects cleanly.
What you'll learn
- ✓How WebSockets differ from HTTP
- ✓Accepting and closing connections
- ✓Broadcasting to many clients
- ✓Handling disconnects
- ✓Scaling across processes
Prerequisites
- •Comfortable with FastAPI and async Python
What and Why
HTTP is request-response. The client asks, the server answers, the connection closes. WebSockets flip that model: once the handshake completes, both sides can send messages whenever they want over a single long-lived TCP connection. That is what makes them ideal for chat, live dashboards, multiplayer games, and any feature where the server needs to push updates without polling.
FastAPI inherits its WebSocket support from Starlette, so you get a clean, async API with full access to typed routes, dependencies, and middleware.
Mental Model
A WebSocket lifecycle has three phases: handshake, message loop, and close. The handshake looks like an HTTP request with Upgrade: websocket headers. The server accepts, and from that moment on, both sides can call send and receive independently. Closing is symmetric: either side can initiate, and both should handle the disconnect gracefully.
In FastAPI, a WebSocket endpoint is just an async function decorated with @app.websocket. Inside, you await ws.accept(), then loop calling await ws.receive_text() or receive_json(), and finally exit when the client disconnects.
Hands-on Example
A minimal chat room that broadcasts every message to every connected client.
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
app = FastAPI()
class ConnectionManager:
def __init__(self):
self.active: list[WebSocket] = []
async def connect(self, ws: WebSocket):
await ws.accept()
self.active.append(ws)
def disconnect(self, ws: WebSocket):
if ws in self.active:
self.active.remove(ws)
async def broadcast(self, message: dict):
dead = []
for ws in self.active:
try:
await ws.send_json(message)
except Exception:
dead.append(ws)
for ws in dead:
self.disconnect(ws)
manager = ConnectionManager()
@app.websocket("/ws/chat")
async def chat(ws: WebSocket):
await manager.connect(ws)
try:
while True:
data = await ws.receive_json()
await manager.broadcast({
"user": data.get("user", "anon"),
"text": data.get("text", ""),
})
except WebSocketDisconnect:
manager.disconnect(ws)
A small browser client to test it.
<script>
const ws = new WebSocket("ws://localhost:8000/ws/chat");
ws.onopen = () => ws.send(JSON.stringify({ user: "ada", text: "hi" }));
ws.onmessage = (e) => console.log(JSON.parse(e.data));
</script>
Client -> HTTP Upgrade -> Server accept
|
v
Open connection
| |
v v
send_json receive_json (loop)
| |
v v
Close <- Disconnect / error -> cleanup Common Pitfalls
- Forgetting to handle
WebSocketDisconnect. Without it, every disconnect spams an error in the logs and the connection lingers in your manager list. - Sending to a dead socket. A client closing the tab is not always observed immediately. Wrap
send_jsonin try/except and clean up. - Blocking the event loop. WebSocket handlers run inside the same loop. A sync
time.sleepor heavy CPU work stalls every connection. - Treating one process as the limit. In-memory connection managers do not span workers. If you run Gunicorn with multiple workers, broadcasts only reach clients on the same worker.
- Skipping authentication. WebSocket upgrade requests carry cookies and headers. Authenticate during
acceptand close with a 4001-style code on failure.
Practical Tips
- Authenticate using a dependency. FastAPI allows dependencies on WebSocket endpoints, so you can validate tokens before accepting.
- Use a backplane for multi-worker setups. Redis pub/sub is the simplest: every worker subscribes, broadcasts publish, and the message reaches all workers.
- Send heartbeats. A
pingevery 30 seconds keeps proxies from killing idle connections. - Use JSON only when you need it. For binary data, prefer
send_bytesand use a compact format. - Add backpressure. If a client is slow, drop or batch messages instead of letting the send buffer grow unbounded.
import asyncio, redis.asyncio as redis
r = redis.from_url("redis://localhost")
async def fanout():
pubsub = r.pubsub()
await pubsub.subscribe("chat")
async for msg in pubsub.listen():
if msg["type"] == "message":
await manager.broadcast({"text": msg["data"].decode()})
@app.on_event("startup")
async def start_fanout():
asyncio.create_task(fanout())
Wrap-up
WebSockets unlock the kind of UX users now expect: live updates without refresh. FastAPI makes the basics easy with @app.websocket and an async loop. Build a connection manager, handle disconnects, and reach for Redis pub/sub the moment you scale past one worker. Get those pieces right and you have a foundation for chat, dashboards, and collaborative tools that feel instantaneous.
Related articles
- FastAPI FastAPI Streaming Responses Tutorial
Stream large files, generated text, and Server-Sent Events from FastAPI without loading everything into memory.
- FastAPI FastAPI: Async Routes and Dependency Injection
A practical guide to async path operations and Depends() in FastAPI — when async actually helps, per-request DB sessions, auth dependencies, and how sub-dependencies compose.
- FastAPI FastAPI Authentication with JWT
Implement JWT-based authentication in FastAPI with OAuth2 password flow, secure token signing, and a reusable get_current_user dependency.
- FastAPI FastAPI CORS: A Practical Tutorial
Configure CORS in FastAPI without security holes: how the browser preflight works, which origins and headers to allow, credentials and cookies, and the most common misconfigurations to avoid.