Building Production APIs with FastAPI for Data Services
Expose your data pipelines via REST APIs using FastAPI. Covers async patterns, Pydantic validation, authentication, and deployment strategies.
· projects · 3 minutes
Data engineers increasingly own the “last mile” of data delivery — serving curated data through APIs. FastAPI has become the go-to framework for this in the Python ecosystem, and for good reason.
Why FastAPI
FastAPI is built on Starlette (async HTTP) and Pydantic (data validation). It’s fast in two senses: runtime performance (async, on par with Node.js for I/O-bound workloads) and development speed (automatic request validation, auto-generated OpenAPI docs, excellent type hint integration).
For data services — endpoints that query BigQuery, read from a cache, or serve ML predictions — FastAPI’s async support means you can handle many concurrent requests without blocking on I/O.
A Data Service Skeleton
```python
from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel
from google.cloud import bigquery
from typing import Optional
import asyncio
from concurrent.futures import ThreadPoolExecutor

app = FastAPI(title="Analytics API", version="1.0.0")
bq_client = bigquery.Client()
executor = ThreadPoolExecutor(max_workers=4)


class MetricResponse(BaseModel):
    metric_name: str
    value: float
    period: str
    segment: Optional[str] = None


class MetricsListResponse(BaseModel):
    data: list[MetricResponse]
    count: int


def _query_bq(query: str, params: list) -> list[dict]:
    """Synchronous BQ call — runs in thread pool."""
    job_config = bigquery.QueryJobConfig(query_parameters=params)
    results = bq_client.query(query, job_config=job_config).result()
    return [dict(row) for row in results]


@app.get("/metrics/revenue", response_model=MetricsListResponse)
async def get_revenue(
    period: str = Query(..., regex="^(daily|weekly|monthly)$"),
    segment: Optional[str] = Query(None),
):
    query = """
        SELECT metric_name, value, period, segment
        FROM `my_project.serving.revenue_metrics`
        WHERE period = @period
    """
    params = [bigquery.ScalarQueryParameter("period", "STRING", period)]

    if segment:
        query += " AND segment = @segment"
        params.append(
            bigquery.ScalarQueryParameter("segment", "STRING", segment)
        )

    loop = asyncio.get_running_loop()
    rows = await loop.run_in_executor(executor, _query_bq, query, params)

    if not rows:
        raise HTTPException(status_code=404, detail="No data found")

    return MetricsListResponse(
        data=[MetricResponse(**r) for r in rows],
        count=len(rows),
    )
```
Key Patterns
Pydantic models for request and response validation. Define your response schema as a Pydantic model. FastAPI validates outgoing data automatically and generates accurate OpenAPI documentation. Consumers of your API get a self-documenting contract.
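The contract cuts both ways: malformed rows surface as errors at the boundary instead of reaching consumers. A standalone sketch of the same MetricResponse model, runnable without FastAPI, shows both sides:

```python
from typing import Optional

from pydantic import BaseModel, ValidationError


class MetricResponse(BaseModel):
    metric_name: str
    value: float
    period: str
    segment: Optional[str] = None


# Well-formed data passes, and numeric strings are coerced to float.
row = MetricResponse(metric_name="revenue", value="42.5", period="daily")
assert row.value == 42.5
assert row.segment is None  # optional field gets its default

# A payload missing a required field fails loudly instead of leaking
# malformed data to API consumers.
try:
    MetricResponse(metric_name="revenue", period="daily")
except ValidationError as exc:
    print(f"rejected: {len(exc.errors())} validation error(s)")
```

The same failure mode inside an endpoint becomes a 500 on the server side rather than silently bad JSON on the client side, which is usually what you want for a data contract.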
Async with thread pool for blocking clients. The BigQuery Python client is synchronous. Wrapping it in run_in_executor lets you handle concurrent HTTP requests without blocking the event loop. This is the pragmatic pattern when your downstream clients aren’t async-native.
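The event-loop mechanics are easier to see in isolation. A minimal sketch with a stand-in blocking function (blocking_fetch is hypothetical and plays the role of the synchronous BigQuery call):

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)


def blocking_fetch(n: int) -> int:
    """Stand-in for a synchronous client call (e.g. a BigQuery query)."""
    time.sleep(0.1)  # simulate network I/O
    return n * 2


async def handler(n: int) -> int:
    loop = asyncio.get_running_loop()
    # The blocking call runs in a worker thread; the event loop stays
    # free to accept and schedule other requests meanwhile.
    return await loop.run_in_executor(executor, blocking_fetch, n)


async def main() -> list[int]:
    # Ten concurrent "requests" complete in roughly 3 batches of 4
    # workers (~0.3 s total) instead of ten serial 0.1 s calls.
    return await asyncio.gather(*(handler(i) for i in range(10)))


results = asyncio.run(main())
print(results)
```

On Python 3.9+, `asyncio.to_thread(blocking_fetch, n)` does the same job without managing an executor yourself; the explicit pool is useful when you want to cap concurrency against the downstream service.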
Parameterized queries always. Never interpolate user input into SQL strings. BigQuery’s QueryJobConfig with ScalarQueryParameter prevents injection and handles type coercion.
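The risk is easy to demonstrate with the standard library's sqlite3 driver, used here only because it runs anywhere; in the BigQuery client, ScalarQueryParameter plays the role of the ? placeholder:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE metrics (period TEXT, value REAL)")
conn.execute("INSERT INTO metrics VALUES ('daily', 100.0), ('weekly', 700.0)")

# Malicious input: harmless as a bound parameter, catastrophic when
# interpolated into the SQL string.
user_input = "daily' OR '1'='1"

# UNSAFE: string interpolation lets the input rewrite the query.
unsafe = conn.execute(
    f"SELECT value FROM metrics WHERE period = '{user_input}'"
).fetchall()

# SAFE: the driver binds the value; it can never become SQL syntax.
safe = conn.execute(
    "SELECT value FROM metrics WHERE period = ?", (user_input,)
).fetchall()

print(unsafe)  # [(100.0,), (700.0,)]: every row leaked
print(safe)    # []: no period literally equals the malicious string
```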
Health Checks and Middleware
Production APIs need health endpoints and observability hooks:
```python
from fastapi import Request
import time
import logging

logger = logging.getLogger(__name__)


@app.middleware("http")
async def log_requests(request: Request, call_next):
    start = time.perf_counter()
    response = await call_next(request)
    duration = time.perf_counter() - start
    logger.info(
        f"{request.method} {request.url.path} "
        f"status={response.status_code} duration={duration:.3f}s"
    )
    return response


@app.get("/health")
async def health():
    return {"status": "healthy"}


@app.get("/health/ready")
def readiness():
    # Plain def: FastAPI runs sync endpoints in its thread pool, so the
    # blocking BigQuery call doesn't stall the event loop.
    # Check downstream dependencies
    try:
        bq_client.query("SELECT 1").result()
        return {"status": "ready"}
    except Exception:
        raise HTTPException(status_code=503, detail="BigQuery unavailable")
```
The /health endpoint is for liveness probes (is the process alive?). The /health/ready endpoint is for readiness probes (can it serve traffic?). Kubernetes uses both to manage pod lifecycle.
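Wired into a pod spec, the two endpoints map onto the two probe types. A sketch, with periods and thresholds that are illustrative rather than recommendations:

```yaml
# Container spec fragment; port and paths match the uvicorn config below.
livenessProbe:
  httpGet:
    path: /health
    port: 8080
  periodSeconds: 10
  failureThreshold: 3        # restart the container after 3 failed probes
readinessProbe:
  httpGet:
    path: /health/ready
    port: 8080
  periodSeconds: 15
  timeoutSeconds: 5          # the BigQuery check may be slow; allow headroom
```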
Containerizing and Deploying
```dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY app/ ./app/
USER nobody
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8080"]
```
Deploy to Cloud Run for serverless scaling, or to GKE if you need persistent connections or sidecar containers. Cloud Run is usually simpler for stateless data APIs.
Takeaway: FastAPI gives you a production-grade API framework with minimal boilerplate. Pair it with Pydantic models, async patterns, health checks, and a container, and you have a complete data service ready for Kubernetes or Cloud Run.