Using the OpenAI Python SDK End-to-End
A working tour of the OpenAI Python SDK: chat completions, streaming, structured output, embeddings, tool calls, and production-grade error handling.
What you'll learn
- Make chat, streaming, and JSON-mode calls
- Generate embeddings for retrieval
- Wire up function calling with tool schemas
- Handle retries, timeouts, and rate limits
- Stream responses through a FastAPI endpoint
Prerequisites
- Python 3.10 or newer
- Read [What is an LLM](/blog/what-is-an-llm)
- Optional: [LLM Tool Use and Function Calling](/blog/llm-tool-use-and-function-calling)
In your code, the OpenAI Python SDK comes down to one thing: a client object. The trick is picking the right method for each job and handling failure modes properly.
Setup
pip install openai
export OPENAI_API_KEY=sk-...
from openai import OpenAI
client = OpenAI()
The client reads OPENAI_API_KEY from the environment. Never hardcode keys; load them from os.environ or a secret manager.
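A startup check is one way to fail fast when the key is missing instead of discovering it on the first request. The sketch below is a minimal illustration; `require_api_key` is a hypothetical helper, not part of the SDK.

```python
import os


def require_api_key(env=None, name="OPENAI_API_KEY"):
    """Return the API key from the environment or raise a clear error.

    Hypothetical helper for illustration; not part of the OpenAI SDK.
    """
    env = os.environ if env is None else env
    key = env.get(name, "").strip()
    if not key:
        raise RuntimeError(
            f"{name} is not set; export it or load it from your secret manager"
        )
    return key


# Exercised with an explicit mapping so no real key is needed.
print(require_api_key({"OPENAI_API_KEY": "sk-test"}))  # prints sk-test
```

The client already reads the variable on its own, so a check like this mainly buys a clearer error message at boot time.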
A first chat call
resp = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[
        {"role": "system", "content": "You are concise."},
        {"role": "user", "content": "Explain HTTP/2 in one sentence."},
    ],
    temperature=0.2,
)
print(resp.choices[0].message.content)
Set temperature=0 for deterministic-ish output on classification and extraction tasks. Use higher values for creative work.
Streaming
Streaming reduces time to first token, which is what users perceive as latency in chat UIs.
stream = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Count to ten."}],
    stream=True,
)
for chunk in stream:
    delta = chunk.choices[0].delta.content or ""
    print(delta, end="", flush=True)
Each chunk carries a small delta of the reply. Concatenate the deltas server-side if you need the full text for logging.
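Collecting the deltas can be sketched as a small helper that forwards each piece and returns the joined text. The `fake_chunk` stand-in below only mimics the attribute shape of a streamed chunk so the example runs offline; `collect_text` and `fake_chunk` are illustrative names, not SDK functions.

```python
from types import SimpleNamespace


def collect_text(stream, on_delta=None):
    """Join streamed deltas into the full reply, optionally forwarding each piece."""
    parts = []
    for chunk in stream:
        if not chunk.choices:  # a chunk may carry no choices (e.g. a usage-only chunk)
            continue
        delta = chunk.choices[0].delta.content or ""
        if delta:
            parts.append(delta)
            if on_delta is not None:
                on_delta(delta)
    return "".join(parts)


def fake_chunk(text):
    """Stand-in with the same attribute shape as a streamed chunk (demo assumption)."""
    return SimpleNamespace(
        choices=[SimpleNamespace(delta=SimpleNamespace(content=text))]
    )


demo_stream = [fake_chunk("1, "), fake_chunk("2, "), fake_chunk(None), fake_chunk("3")]
full_text = collect_text(demo_stream)
print(full_text)  # 1, 2, 3
```

In real use you would pass the object returned by `create(..., stream=True)` and an `on_delta` callback that writes to the client, then log the returned string.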
Structured output
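When you need JSON, use response_format. If you pass a Pydantic model through the parse helper, the SDK derives a JSON schema from it and validates the reply into an instance of that model. Older SDK releases expose the helper as client.beta.chat.completions.parse.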
from pydantic import BaseModel

class Ticket(BaseModel):
    title: str
    priority: str
    tags: list[str]

resp = client.chat.completions.parse(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Login button broken on Safari"}],
    response_format=Ticket,
)
ticket = resp.choices[0].message.parsed
print(ticket.title, ticket.priority)
This eliminates the regex-on-LLM-output era. Use it.
Embeddings
Embeddings are vectors you compare with cosine similarity. They are the basis of retrieval.
emb = client.embeddings.create(
    model="text-embedding-3-small",
    input=["LangChain is a framework", "Pinecone is a vector DB"],
)
vectors = [d.embedding for d in emb.data]
print(len(vectors[0]))  # 1536
Batch up to a few hundred inputs per call to amortize latency. For storage and search, see RAG Vector Databases Overview.
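The cosine comparison and the batching advice above can be sketched in plain Python. The toy two-dimensional vectors stand in for real embeddings, and `cosine_similarity`, `rank`, and `batched` are illustrative helpers rather than SDK functions; a vector database or NumPy would replace them at scale.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def rank(query_vec, doc_vecs):
    """Return document indices sorted from most to least similar to the query."""
    scores = [cosine_similarity(query_vec, v) for v in doc_vecs]
    return sorted(range(len(doc_vecs)), key=lambda i: scores[i], reverse=True)


def batched(items, size):
    """Yield consecutive slices of at most `size` items, one slice per embeddings call."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


docs = [[0.0, 1.0], [1.0, 0.1], [0.5, 0.5]]
print(rank([1.0, 0.0], docs))         # [1, 2, 0]
print(list(batched(list(range(5)), 2)))  # [[0, 1], [2, 3], [4]]
```

Each slice from `batched` would become the `input` list of one `client.embeddings.create` call.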
Tool calling
You describe functions with JSON Schema. The model returns a structured call that your code executes.
tools = [{
    "type": "function",
    "function": {
        "name": "lookup_order",
        "description": "Find an order by ID.",
        "parameters": {
            "type": "object",
            "properties": {"order_id": {"type": "string"}},
            "required": ["order_id"],
        },
    },
}]
resp = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Where is order 88321?"}],
    tools=tools,
)
message = resp.choices[0].message
if message.tool_calls:  # None when the model answers directly instead of calling a tool
    call = message.tool_calls[0]
    # arguments is a JSON string; parse it with json.loads before use
    print(call.function.name, call.function.arguments)
Append the assistant message that contains the tool call to the conversation, then feed the result back as a tool message with the same tool_call_id and call the model again. That second call produces the user-facing answer.
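The round trip can be sketched as a function that runs the requested tool and builds the message list for the second call. This is a minimal sketch: `lookup_order` here is a hypothetical backend with made-up data, `HANDLERS` and `tool_followup_messages` are illustrative names, and `demo_call` is a stand-in with the attribute shape of a returned tool call so the example runs offline.

```python
import json
from types import SimpleNamespace


def lookup_order(order_id):
    """Hypothetical backend lookup; a real one would query your order system."""
    return {"order_id": order_id, "status": "shipped"}


HANDLERS = {"lookup_order": lookup_order}


def tool_followup_messages(messages, call):
    """Run the requested tool and return the message list for the second model call."""
    args = json.loads(call.function.arguments)  # arguments arrive as a JSON string
    result = HANDLERS[call.function.name](**args)
    return messages + [
        # Echo the assistant turn that requested the tool...
        {
            "role": "assistant",
            "tool_calls": [{
                "id": call.id,
                "type": "function",
                "function": {
                    "name": call.function.name,
                    "arguments": call.function.arguments,
                },
            }],
        },
        # ...then answer it with a tool message carrying the same id.
        {"role": "tool", "tool_call_id": call.id, "content": json.dumps(result)},
    ]


# Stand-in for a returned tool call (the id value is made up for the demo).
demo_call = SimpleNamespace(
    id="call_demo",
    function=SimpleNamespace(name="lookup_order", arguments='{"order_id": "88321"}'),
)
followup = tool_followup_messages(
    [{"role": "user", "content": "Where is order 88321?"}], demo_call
)
print(followup[-1]["content"])
```

Passing `followup` as `messages`, along with the same `tools`, to a second `client.chat.completions.create` call would yield the final answer. If the model requests several tools in one turn, each call needs its own tool message.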
Retries, timeouts, and errors
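Networks fail. Rate limits exist. The SDK handles both with built-in features: a per-request timeout and automatic retries with backoff, both configurable on the client. By default it retries twice and times out after 10 minutes. The exceptions below surface only after those retries are exhausted.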
from openai import OpenAI, RateLimitError, APITimeoutError

client = OpenAI(timeout=20.0, max_retries=3)
try:
    client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "hi"}],
    )
except RateLimitError:
    print("Backoff and retry later.")
except APITimeoutError:
    print("Slow upstream.")
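The SDK's own retries cover short blips. When an error still reaches your code, for example in a queue worker, a second layer of exponential backoff with jitter is one common approach. The sketch below is generic and uses the built-in `TimeoutError` so it runs offline; in real code `retry_on` would be something like `(RateLimitError, APITimeoutError)`. `backoff_delays` and `call_with_backoff` are illustrative names, not SDK functions.

```python
import random
import time


def backoff_delays(count, base=0.5, cap=8.0, rng=random.random):
    """Yield `count` delays with full jitter: uniform in [0, min(cap, base * 2**n)]."""
    for n in range(count):
        yield rng() * min(cap, base * (2 ** n))


def call_with_backoff(fn, retry_on, attempts=4, sleep=time.sleep, rng=random.random):
    """Call fn(); on a retryable exception wait and retry, re-raising after the last attempt."""
    delays = list(backoff_delays(attempts - 1, rng=rng))
    for attempt in range(attempts):
        try:
            return fn()
        except retry_on:
            if attempt == attempts - 1:
                raise
            sleep(delays[attempt])


calls = {"n": 0}


def flaky():
    """Simulated upstream that fails twice, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise TimeoutError("simulated slow upstream")
    return "ok"


waits = []  # record delays instead of actually sleeping
result = call_with_backoff(flaky, TimeoutError, sleep=waits.append)
print(result, len(waits))  # ok 2
```

Jitter spreads retries from many workers over time so they do not all hit the API again at the same instant.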
For batch workloads that do not need an immediate answer, use the Batch API instead of looping over requests. It runs asynchronously, is cheaper, and keeps bulk traffic from tripping your regular rate limits.
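A batch job takes a JSONL file with one request per line. The sketch below builds such a file body for chat completions; the prompts are made up, and `batch_lines` is an illustrative helper rather than an SDK function.

```python
import json


def batch_lines(prompts, model="gpt-4o-mini"):
    """Build one JSONL line per prompt in the Batch API request format."""
    lines = []
    for i, prompt in enumerate(prompts):
        request = {
            "custom_id": f"req-{i}",  # your own id, used to match results to inputs
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": model,
                "messages": [{"role": "user", "content": prompt}],
            },
        }
        lines.append(json.dumps(request))
    return "".join(line + "\n" for line in lines)


jsonl = batch_lines(["Classify: login broken", "Classify: invoice wrong"])
print(jsonl, end="")
```

From there, the usual flow is to write the text to a file, upload it with `client.files.create(..., purpose="batch")`, and start the job with `client.batches.create`, passing the uploaded file's id, the same endpoint, and a completion window.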
Async client
For high-concurrency servers, use AsyncOpenAI with asyncio. This pairs well with FastAPI; see What is FastAPI.
from openai import AsyncOpenAI
import asyncio

aclient = AsyncOpenAI()

async def ask(q: str) -> str:
    r = await aclient.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": q}],
    )
    return r.choices[0].message.content

print(asyncio.run(ask("What is gRPC?")))
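The async client pays off when many requests are in flight at once, but unbounded fan-out tends to hit rate limits. One way to cap concurrency is a semaphore around each call. In this sketch `ask_many` is an illustrative helper and `fake_ask` is a stand-in for the real `ask` coroutine so the example runs offline.

```python
import asyncio


async def ask_many(questions, ask, limit=5):
    """Run ask(q) for every question with at most `limit` calls in flight.

    Results come back in the same order as the questions.
    """
    semaphore = asyncio.Semaphore(limit)

    async def bounded(q):
        async with semaphore:
            return await ask(q)

    return await asyncio.gather(*(bounded(q) for q in questions))


async def fake_ask(q):
    """Stand-in for the real `ask` coroutine (demo assumption)."""
    await asyncio.sleep(0.01)
    return f"answer to {q!r}"


answers = asyncio.run(ask_many(["What is gRPC?", "What is HTTP/2?"], fake_ask, limit=2))
print(answers)
```

Swapping `fake_ask` for `ask` gives the same behavior against the API, with `limit` tuned to your rate limits.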
Streaming through FastAPI
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

# aclient is the AsyncOpenAI instance created in the previous section.
@app.post("/chat")
async def chat(q: str):  # q arrives as a query parameter
    async def gen():
        stream = await aclient.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": q}],
            stream=True,
        )
        async for chunk in stream:
            yield chunk.choices[0].delta.content or ""
    return StreamingResponse(gen(), media_type="text/plain")
That is a production-shaped endpoint in under twenty lines.
Wrap up
The OpenAI SDK rewards a small mental model: client, messages, tools, stream. Combine that with Pydantic for structured output and the async client for scale, and you have everything you need to ship.