Building Production APIs with FastAPI for Data Services

Expose your data pipelines via REST APIs using FastAPI. Covers async patterns, Pydantic validation, authentication, and deployment strategies.



Data engineers increasingly own the “last mile” of data delivery — serving curated data through APIs. FastAPI has become the go-to framework for this in the Python ecosystem, and for good reason.

Why FastAPI

FastAPI is built on Starlette (async HTTP) and Pydantic (data validation). It’s fast in two senses: runtime performance (async, on par with Node.js for I/O-bound workloads) and development speed (automatic request validation, auto-generated OpenAPI docs, excellent type hint integration).

For data services — endpoints that query BigQuery, read from a cache, or serve ML predictions — FastAPI’s async support means you can handle many concurrent requests without blocking on I/O.

A Data Service Skeleton

from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel
from google.cloud import bigquery
from typing import Optional
import asyncio
from concurrent.futures import ThreadPoolExecutor

app = FastAPI(title="Analytics API", version="1.0.0")
bq_client = bigquery.Client()
executor = ThreadPoolExecutor(max_workers=4)

class MetricResponse(BaseModel):
    metric_name: str
    value: float
    period: str
    segment: Optional[str] = None

class MetricsListResponse(BaseModel):
    data: list[MetricResponse]
    count: int

def _query_bq(query: str, params: list) -> list[dict]:
    """Synchronous BQ call — runs in thread pool."""
    job_config = bigquery.QueryJobConfig(query_parameters=params)
    results = bq_client.query(query, job_config=job_config).result()
    return [dict(row) for row in results]

@app.get("/metrics/revenue", response_model=MetricsListResponse)
async def get_revenue(
    period: str = Query(..., regex="^(daily|weekly|monthly)$"),
    segment: Optional[str] = Query(None),
):
    query = """
        SELECT metric_name, value, period, segment
        FROM `my_project.serving.revenue_metrics`
        WHERE period = @period
    """
    params = [bigquery.ScalarQueryParameter("period", "STRING", period)]
    if segment:
        query += " AND segment = @segment"
        params.append(
            bigquery.ScalarQueryParameter("segment", "STRING", segment)
        )
    loop = asyncio.get_running_loop()
    rows = await loop.run_in_executor(executor, _query_bq, query, params)
    if not rows:
        raise HTTPException(status_code=404, detail="No data found")
    return MetricsListResponse(
        data=[MetricResponse(**r) for r in rows],
        count=len(rows),
    )

Key Patterns

Pydantic models for request and response validation. Define your response schema as a Pydantic model. FastAPI validates outgoing data automatically and generates accurate OpenAPI documentation. Consumers of your API get a self-documenting contract.
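
To see what that validation buys you, here is a minimal sketch using the `MetricResponse` model from the skeleton above, run directly through Pydantic (assuming Pydantic v2's default lax coercion; the same checks run automatically on every FastAPI response):

```python
from typing import Optional
from pydantic import BaseModel, ValidationError

class MetricResponse(BaseModel):
    metric_name: str
    value: float
    period: str
    segment: Optional[str] = None

# A valid payload: the numeric string "3.14" is coerced to float
ok = MetricResponse(metric_name="revenue", value="3.14", period="daily")
print(ok.value)  # 3.14

# An invalid payload is rejected before it ever reaches a consumer
try:
    MetricResponse(metric_name="revenue", value="not-a-number", period="daily")
except ValidationError as e:
    print(len(e.errors()))  # 1
```

In a request context FastAPI turns the same `ValidationError` into a structured 422 response, so clients get machine-readable error details for free.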

Async with thread pool for blocking clients. The BigQuery Python client is synchronous. Wrapping it in run_in_executor lets you handle concurrent HTTP requests without blocking the event loop. This is the pragmatic pattern when your downstream clients aren’t async-native.
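
On Python 3.9+, `asyncio.to_thread` is a lighter-weight alternative to managing your own `ThreadPoolExecutor` for occasional blocking calls. A self-contained sketch, where `fetch_rows` is a hypothetical stand-in for a blocking client call like `bq_client.query(...).result()`:

```python
import asyncio
import time

def fetch_rows(query: str) -> list[dict]:
    # Stand-in for a synchronous client call that blocks on I/O
    time.sleep(0.05)
    return [{"metric_name": "revenue", "value": 100.0}]

async def handler() -> list[dict]:
    # Runs the blocking function in the default thread pool,
    # keeping the event loop free for other requests
    return await asyncio.to_thread(fetch_rows, "SELECT 1")

rows = asyncio.run(handler())
print(rows[0]["value"])  # 100.0
```

Reach for an explicit `ThreadPoolExecutor` (as in the skeleton) when you want to cap concurrency against a downstream system; `to_thread` uses the loop's default pool.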

Parameterized queries always. Never interpolate user input into SQL strings. BigQuery’s QueryJobConfig with ScalarQueryParameter prevents injection and handles type coercion.
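
The same principle applies to any SQL client. A quick illustration with the stdlib's `sqlite3` (not BigQuery, but the binding mechanism is analogous): a bound parameter is treated as a value, so an injection attempt matches nothing.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE metrics (period TEXT, value REAL)")
conn.execute("INSERT INTO metrics VALUES ('daily', 42.0)")

# User-controlled input goes in as a bound parameter,
# never via f-strings or string concatenation
user_input = "daily' OR '1'='1"
rows = conn.execute(
    "SELECT value FROM metrics WHERE period = ?", (user_input,)
).fetchall()
print(rows)  # [] — the malicious string is just a literal that matches no row
```

Had the query been built with string interpolation, the `OR '1'='1'` fragment would have become part of the SQL and returned every row.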

Health Checks and Middleware

Production APIs need health endpoints and observability hooks:

from fastapi import Request
import time
import logging

logger = logging.getLogger(__name__)

@app.middleware("http")
async def log_requests(request: Request, call_next):
    start = time.perf_counter()
    response = await call_next(request)
    duration = time.perf_counter() - start
    logger.info(
        f"{request.method} {request.url.path} "
        f"status={response.status_code} duration={duration:.3f}s"
    )
    return response

@app.get("/health")
async def health():
    return {"status": "healthy"}

@app.get("/health/ready")
async def readiness():
    # Check downstream dependencies
    try:
        bq_client.query("SELECT 1").result()
        return {"status": "ready"}
    except Exception:
        raise HTTPException(status_code=503, detail="BigQuery unavailable")

The /health endpoint is for liveness probes (is the process alive?). The /health/ready endpoint is for readiness probes (can it serve traffic?). Kubernetes uses both to manage pod lifecycle.

Containerizing and Deploying

FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY app/ ./app/
USER nobody
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8080"]

Deploy to Cloud Run for serverless scaling, or to GKE if you need persistent connections or sidecar containers. Cloud Run is usually simpler for stateless data APIs.

Takeaway: FastAPI gives you a production-grade API framework with minimal boilerplate. Pair it with Pydantic models, async patterns, health checks, and a container, and you have a complete data service ready for Kubernetes or Cloud Run.

