Hardware: any laptop. Python: 3.11+. Docker if you want to containerise (optional).

From Toy to Product

A Python script that calls an LLM is a toy. A FastAPI service that exposes that LLM behind validated endpoints, with auth and rate limiting, is a product. The gap is smaller than people think.

What we'll build in this part:

  • POST /complete — non-streaming completion
  • POST /stream — Server-Sent Events streaming
  • GET /health — liveness check
  • API-key auth via header
  • Per-key rate limiting
  • A Dockerfile that builds in under a minute

Why FastAPI

Framework     Async    Auto docs              Pydantic native  Verdict
FastAPI       yes      yes (Swagger + ReDoc)  yes              default for LLM services
Flask         partial  no                     no               fine if you already know it
Django        partial  no (admin yes)         no               overkill for an LLM API
http.server   no       no                     no               do not

LLM calls are I/O-bound. FastAPI's native async support means a single uvicorn worker can have hundreds of in-flight LLM calls without breaking a sweat. That's the whole game for an LLM proxy.
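
A toy illustration of the point, not part of the service: asyncio.sleep stands in for a network-bound LLM call, and one event loop drives 100 concurrent "calls" in roughly one second.

import asyncio


async def fake_llm_call(i: int) -> str:
    await asyncio.sleep(1.0)  # stands in for waiting on the provider's API
    return f"response {i}"


async def main() -> None:
    # 100 one-second calls finish in ~1 second total, because every
    # await hands the loop back instead of blocking a thread.
    results = await asyncio.gather(*(fake_llm_call(i) for i in range(100)))
    print(f"{len(results)} responses")


asyncio.run(main())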

Install

uv add fastapi==0.118.0 uvicorn==0.34.0 slowapi==0.1.9 python-dotenv==1.1.0
uv add httpx==0.28.0  # already added in Part 2 but keep it pinned

The Service: main.py

Save as src/text_tool/server.py. It imports the Part 2 client wrapper, validates requests with Pydantic, supports streaming, and gates everything behind an API key.

"""FastAPI LLM service. Wraps the Part 2 client wrapper."""
from __future__ import annotations

import asyncio
import logging
import os
from contextlib import asynccontextmanager
from typing import AsyncIterator

from fastapi import Depends, FastAPI, HTTPException, Request, Security, status
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.security.api_key import APIKeyHeader
from pydantic import BaseModel, Field
from slowapi import Limiter
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

from text_tool.llm_client import get_client, session_cost

log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")


# ----- Auth -----
API_KEYS = {k.strip() for k in os.environ.get("API_KEYS", "").split(",") if k.strip()}
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)


def get_caller_key(api_key: str | None = Security(api_key_header)) -> str:
    if not api_key or api_key not in API_KEYS:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="invalid api key")
    return api_key


# ----- Rate limiting (per API key) -----
def key_for_limit(request: Request) -> str:
    return request.headers.get("X-API-Key") or get_remote_address(request)


limiter = Limiter(key_func=key_for_limit)


# ----- Lifespan: warm the LLM client -----
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    log.info("starting up; provider=%s", os.environ.get("LLM_PROVIDER", "openai"))
    app.state.client = get_client()
    yield
    log.info("shutting down; %s", session_cost().report())


app = FastAPI(title="text-tool LLM service", version="0.1.0", lifespan=lifespan)
app.state.limiter = limiter


@app.exception_handler(RateLimitExceeded)
async def rate_limit_handler(request: Request, exc: RateLimitExceeded):
    # Let JSONResponse do the escaping; a hand-built JSON string breaks
    # if exc.detail ever contains a quote.
    return JSONResponse(status_code=429, content={"error": f"rate limit exceeded: {exc.detail}"})


# ----- Schemas -----
class CompleteRequest(BaseModel):
    prompt: str = Field(..., min_length=1, max_length=20000)
    max_tokens: int = Field(512, ge=1, le=4096)
    temperature: float = Field(0.0, ge=0.0, le=2.0)


class CompleteResponse(BaseModel):
    text: str
    model: str
    input_tokens: int
    output_tokens: int
    cost_usd: float


# ----- Endpoints -----
@app.get("/health")
async def health() -> dict[str, str]:
    return {"status": "ok"}


@app.post("/complete", response_model=CompleteResponse)
@limiter.limit("60/minute")
async def complete(request: Request, body: CompleteRequest, _key: str = Depends(get_caller_key)):
    client = request.app.state.client
    result = await run_in_thread(client.complete, body.prompt,
                                  max_tokens=body.max_tokens, temperature=body.temperature)
    return CompleteResponse(
        text=result.text, model=result.model,
        input_tokens=result.input_tokens, output_tokens=result.output_tokens,
        cost_usd=result.cost_usd,
    )


@app.post("/stream")
@limiter.limit("60/minute")
async def stream(request: Request, body: CompleteRequest, _key: str = Depends(get_caller_key)):
    client = request.app.state.client

    async def event_source() -> AsyncIterator[str]:
        # The SDK stream is a sync iterator; create it and pull each
        # chunk in a worker thread so the event loop stays free.
        chunks = await asyncio.to_thread(
            lambda: iter(client.stream(body.prompt,
                                       max_tokens=body.max_tokens,
                                       temperature=body.temperature)))
        done = object()
        while (chunk := await asyncio.to_thread(next, chunks, done)) is not done:
            yield f"data: {chunk}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(event_source(), media_type="text/event-stream")


# ----- Helper: run sync SDK in a thread so the event loop stays free -----
async def run_in_thread(fn, /, *args, **kwargs):
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, lambda: fn(*args, **kwargs))

Why run_in_thread

The OpenAI and Anthropic Python SDKs both ship async clients (AsyncOpenAI, AsyncAnthropic). For this series I'm using the sync clients in the wrapper so there is a single interface to maintain across providers. To keep the FastAPI event loop unblocked, the sync call runs in a worker thread via loop.run_in_executor (asyncio.to_thread does the same job in the streaming path). Once you commit fully to async, swap the wrapper for the async SDK and drop this helper.
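
For contrast, here is a hedged sketch of the fully-async path, calling the OpenAI SDK's AsyncOpenAI client directly instead of the Part 2 wrapper. The function and model name are illustrative, not part of the series' code:

from openai import AsyncOpenAI

aclient = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment


async def complete_async(prompt: str, max_tokens: int = 512) -> str:
    # One await, no worker thread: the SDK does the async I/O itself.
    resp = await aclient.chat.completions.create(
        model="gpt-4o-mini",  # illustrative; use whatever the wrapper pins
        messages=[{"role": "user", "content": prompt}],
        max_tokens=max_tokens,
    )
    return resp.choices[0].message.content or ""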

Run It

export OPENAI_API_KEY=sk-...
export API_KEYS="dev-key-001,dev-key-002"

uv run uvicorn text_tool.server:app --host 0.0.0.0 --port 8000 --reload

Visit http://localhost:8000/docs for the auto-generated Swagger UI — that's free with FastAPI.

Hit it with curl:

curl -X POST http://localhost:8000/complete \
  -H "X-API-Key: dev-key-001" \
  -H "Content-Type: application/json" \
  -d '{"prompt":"Write one haiku about Nairobi.","max_tokens":80}'

And the streaming endpoint:

curl -N -X POST http://localhost:8000/stream \
  -H "X-API-Key: dev-key-001" \
  -H "Content-Type: application/json" \
  -d '{"prompt":"Tell me a story in 3 sentences.","max_tokens":200}'

The -N flag tells curl not to buffer — you'll see tokens print as they arrive.
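
On the wire, each yield from event_source becomes one SSE frame; the chunk boundaries below are illustrative:

data: Once upon a time,

data: a matatu driver in Nairobi

data: retired undefeated.

data: [DONE]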

Health Checks: Liveness vs Readiness

A distinction that bites every team that skips it:

  • Liveness = "is the process alive?" Always returns 200 unless the process is wedged. Used by Kubernetes / Docker / load balancer to decide whether to kill and restart you
  • Readiness = "can I take traffic?" Returns 503 during warmup or when downstream is dead. Used to decide whether to send requests

The /health above is liveness. A real readiness check would do a tiny LLM call (or check the cached client status). For most LLM services, liveness alone is fine because the LLM provider is the dependency that matters, and you can't usefully probe it from inside a request without paying for it.
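
If you do want one, here is a minimal /ready sketch that checks the cached client rather than paying for a probe call, assuming it lives in server.py next to /health:

@app.get("/ready")
async def ready(request: Request):
    # 503 until lifespan has constructed the client, 200 afterwards.
    if getattr(request.app.state, "client", None) is None:
        return JSONResponse(status_code=503, content={"status": "warming up"})
    return {"status": "ready"}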

Dockerfile (Multi-Stage)

Save as Dockerfile in the repo root:

FROM python:3.11-slim AS build
RUN pip install --no-cache-dir uv==0.5.10
WORKDIR /app
COPY pyproject.toml uv.lock ./
# Dependencies only; this layer caches as long as the lockfile is unchanged.
RUN uv sync --frozen --no-dev --no-install-project
COPY src ./src
# Now install the project itself into the venv.
RUN uv sync --frozen --no-dev

FROM python:3.11-slim
WORKDIR /app
COPY --from=build /app/.venv /app/.venv
COPY --from=build /app/src /app/src
ENV PATH="/app/.venv/bin:$PATH" PYTHONPATH=/app/src
EXPOSE 8000
CMD ["uvicorn", "text_tool.server:app", "--host", "0.0.0.0", "--port", "8000"]

Build and run:

docker build -t text-tool:0.1 .
docker run --rm -p 8000:8000 \
  -e OPENAI_API_KEY=sk-... \
  -e API_KEYS=dev-key-001 \
  text-tool:0.1
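
One habit that keeps the build fast and the image clean: a .dockerignore so the local venv, caches, and your .env never enter the build context. A minimal version (adjust to your repo):

.venv
__pycache__/
*.pyc
.git
.env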

One-Liner Deploy

Both Fly.io and Railway accept the same Dockerfile:

# Fly.io
fly launch --image text-tool:0.1 --name my-llm-api

# Railway
railway up

Set OPENAI_API_KEY and API_KEYS as secrets in their dashboards. The endpoint is live in <5 minutes. Don't deploy production traffic to a free tier — but for "give the API to one collaborator", this is enough.
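
On Fly, the CLI can set those secrets without touching the dashboard:

fly secrets set OPENAI_API_KEY=sk-... API_KEYS=dev-key-001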

Errors as Clean JSON

FastAPI's defaults emit reasonable JSON for HTTPException. For internal errors you don't want stack traces leaking to callers. Add this catch-all (JSONResponse is already imported at the top of server.py):

@app.exception_handler(Exception)
async def unhandled_exception(request: Request, exc: Exception):
    log.exception("unhandled error")
    return JSONResponse(status_code=500, content={"error": "internal server error"})
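
With this registered, an unexpected exception returns {"error": "internal server error"} to the caller, while log.exception keeps the full traceback in your logs rather than in the response.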

Key Takeaways

  • FastAPI is the default for LLM services because async + Pydantic + auto-docs lines up exactly with what an LLM API needs
  • Validate every request body with a Pydantic schema. Bad input never reaches your business logic
  • API-key auth is one Security dependency away. Store keys in env or a secret manager — never in code
  • Rate-limit per API key, not per IP. Per IP punishes shared clients (corporate proxies, classrooms)
  • Streaming uses StreamingResponse + an async generator. The SDK's sync stream wraps fine by pulling each chunk through a worker thread (asyncio.to_thread)
  • Multi-stage Dockerfile keeps the image small. Final stage is just the venv and the source — ~150 MB
  • Use lifespan to construct expensive clients once at startup; reuse via app.state

Next Up

Part 4 adds embeddings and vector search to this service so it can do more than completion. By the end of Part 4 the service has a /search endpoint over a sample corpus, ready for Part 5 to combine with retrieval-augmented generation.

Next: Part 4 — Embeddings and Vector Search