Skip to content
C Codeloom
FastAPI

FastAPI WebSockets Tutorial

Build real-time features with FastAPI WebSockets. Manage connections, broadcast messages, and handle disconnects cleanly.

·4 min read · By Codeloom
Intermediate 10 min read

What you'll learn

  • How WebSockets differ from HTTP
  • Accepting and closing connections
  • Broadcasting to many clients
  • Handling disconnects
  • Scaling across processes

Prerequisites

  • Comfortable with FastAPI and async Python

What and Why

HTTP is request-response. The client asks, the server answers, the connection closes. WebSockets flip that model: once the handshake completes, both sides can send messages whenever they want over a single long-lived TCP connection. That is what makes them ideal for chat, live dashboards, multiplayer games, and any feature where the server needs to push updates without polling.

FastAPI inherits its WebSocket support from Starlette, so you get a clean, async API with full access to typed routes, dependencies, and middleware.

Mental Model

A WebSocket lifecycle has three phases: handshake, message loop, and close. The handshake looks like an HTTP request with Upgrade: websocket headers. The server accepts, and from that moment on, both sides can call send and receive independently. Closing is symmetric: either side can initiate, and both should handle the disconnect gracefully.

In FastAPI, a WebSocket endpoint is just an async function decorated with @app.websocket. Inside, you await ws.accept(), then loop calling await ws.receive_text() or receive_json(), and finally exit when the client disconnects.

Hands-on Example

A minimal chat room that broadcasts every message to every connected client.

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

class ConnectionManager:
    def __init__(self):
        self.active: list[WebSocket] = []

    async def connect(self, ws: WebSocket):
        await ws.accept()
        self.active.append(ws)

    def disconnect(self, ws: WebSocket):
        if ws in self.active:
            self.active.remove(ws)

    async def broadcast(self, message: dict):
        dead = []
        for ws in self.active:
            try:
                await ws.send_json(message)
            except Exception:
                dead.append(ws)
        for ws in dead:
            self.disconnect(ws)

manager = ConnectionManager()

@app.websocket("/ws/chat")
async def chat(ws: WebSocket):
    await manager.connect(ws)
    try:
        while True:
            data = await ws.receive_json()
            await manager.broadcast({
                "user": data.get("user", "anon"),
                "text": data.get("text", ""),
            })
    except WebSocketDisconnect:
        manager.disconnect(ws)

A small browser client to test it.

<script>
  const ws = new WebSocket("ws://localhost:8000/ws/chat");
  ws.onopen = () => ws.send(JSON.stringify({ user: "ada", text: "hi" }));
  ws.onmessage = (e) => console.log(JSON.parse(e.data));
</script>
Client -> HTTP Upgrade -> Server accept
                              |
                              v
                        Open connection
                        |             |
                        v             v
                    send_json    receive_json (loop)
                        |             |
                        v             v
                     Close <- Disconnect / error -> cleanup
WebSocket lifecycle

Common Pitfalls

  • Forgetting to handle WebSocketDisconnect. Without it, every disconnect spams an error in the logs and the connection lingers in your manager list.
  • Sending to a dead socket. A client closing the tab is not always observed immediately. Wrap send_json in try/except and clean up.
  • Blocking the event loop. WebSocket handlers run inside the same loop. A sync time.sleep or heavy CPU work stalls every connection.
  • Treating one process as the limit. In-memory connection managers do not span workers. If you run Gunicorn with multiple workers, broadcasts only reach clients on the same worker.
  • Skipping authentication. WebSocket upgrade requests carry cookies and headers. Authenticate during accept and close with a 4001-style code on failure.

Practical Tips

  • Authenticate using a dependency. FastAPI allows dependencies on WebSocket endpoints, so you can validate tokens before accepting.
  • Use a backplane for multi-worker setups. Redis pub/sub is the simplest: every worker subscribes, broadcasts publish, and the message reaches all workers.
  • Send heartbeats. A ping every 30 seconds keeps proxies from killing idle connections.
  • Use JSON only when you need it. For binary data, prefer send_bytes and use a compact format.
  • Add backpressure. If a client is slow, drop or batch messages instead of letting the send buffer grow unbounded.
import asyncio, redis.asyncio as redis

r = redis.from_url("redis://localhost")

async def fanout():
    pubsub = r.pubsub()
    await pubsub.subscribe("chat")
    async for msg in pubsub.listen():
        if msg["type"] == "message":
            await manager.broadcast({"text": msg["data"].decode()})

@app.on_event("startup")
async def start_fanout():
    asyncio.create_task(fanout())

Wrap-up

WebSockets unlock the kind of UX users now expect: live updates without refresh. FastAPI makes the basics easy with @app.websocket and an async loop. Build a connection manager, handle disconnects, and reach for Redis pub/sub the moment you scale past one worker. Get those pieces right and you have a foundation for chat, dashboards, and collaborative tools that feel instantaneous.