From Prototype to Production: Scaling LangChain and CrewAI Applications in Enterprise Environments

The LangChain prototype that took an afternoon to build will take six months to deploy.

That ratio is wrong, and it’s also predictable. The AI logic is the easy part - the model already exists, the framework wraps it, the demo runs on your laptop. What takes the six months is everything around it: API authentication that survives a security audit, PII redaction that survives a compliance review, observability that survives a 3 AM incident, and a Kubernetes deployment that survives traffic spikes without bankrupting you on token costs.

This post is the infrastructure side of that gap. We’ll cover FastAPI integration, Kubernetes deployment, Prometheus and Grafana observability, secrets management, PII protection, and cost controls - the parts that aren’t in the LangChain or CrewAI documentation because they’re not LangChain or CrewAI’s job. For LangChain Runnable patterns (with_retry(), with_fallbacks(), LCEL composition) see the official docs. For CrewAI’s Crew(...).kickoff_async() see crewai.com/docs.

A note on the success stories you’ve probably read. Klarna’s public case studies report their AI assistant handling roughly two-thirds of customer service chats in its first month, with significant resolution-time improvements - then Bloomberg reported in 2024 that they walked back full automation in favor of a hybrid model. Both outcomes are useful information. Real production AI deployments don’t end with the launch press release; they end with whatever architecture you can actually operate at 3 AM eight months in.

Here’s the architecture we land on, the security patterns that survive audits, and the operational practices that keep the on-call rotation sane.


Why prototypes fail in production #

Your prototype works because the development environment forgives everything: a single user, unlimited retries, manual error handling, no compliance constraints, no rate limits, and a tolerance for 30-second latencies because you’re the only one running it. Production has none of those affordances. The user count goes from one to ten thousand. The error tolerance goes from “I’ll fix it” to a 99.9% uptime SLA - 43 minutes of downtime per month, total. The data goes from sample PDFs to actual PII, financial records, and protected health information. The cost ceiling goes from “whatever it takes” to a budget your CFO has signed off on. And the deployment goes from git push to a multi-region Kubernetes cluster with zero-downtime rolling updates.

The pattern we see most often: a prototype processes a few sample documents flawlessly, the team moves it toward production, and the same code immediately hits four problems in sequence. OpenAI rate-limits the deployment within hours of going live. Compliance blocks the next push because PII is going out to the LLM provider unredacted. Security audits flag API keys living in environment variables. And the first real load test shows 30-second p95 latency that nobody noticed when one developer was the only user.

Most teams rebuild a substantial fraction of the infrastructure before the first real production deployment. The rest of this post is what to put in that rebuild.

The enterprise checklist #

What an enterprise needs before trusting an AI application in production breaks into three buckets: security and compliance (the audit enforces this), operational excellence (the on-call rotation enforces this), and developer experience (nobody enforces this, which is why it gets cut first when timelines slip).

Security and compliance is the non-negotiable pillar - the one that blocks the deployment if it’s missing. Authentication, key rotation, RBAC, encryption in transit and at rest, PII redaction before any external API call, data residency, audit logging, dependency and container scanning, and quarterly penetration testing. Skip any of these and the security review fails on the first pass.

security_requirements:
  authentication:
    - Multi-factor authentication for admin access
    - API key rotation policies (30-90 days)
    - OAuth 2.0 / SAML integration with corporate identity
    - Role-based access control (RBAC) for different user tiers

  data_protection:
    - End-to-end encryption (in transit and at rest)
    - PII detection and redaction before external API calls
    - Data residency compliance (EU data stays in EU)
    - Audit logging of all data access (GDPR/CCPA)

  compliance_frameworks:
    - SOC 2 Type II audit readiness
    - HIPAA compliance for healthcare data
    - GDPR compliance for European users
    - ISO 27001 information security standards

  vulnerability_management:
    - Dependency scanning (OWASP, Snyk, Dependabot)
    - Container image scanning before deployment
    - Penetration testing quarterly
    - Bug bounty program for critical systems
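
The PII-redaction line item above has a concrete shape: every payload passes through a redaction step before it reaches an external LLM API. Here is a minimal sketch - the `redact_pii` helper and its regex patterns are illustrative only; production systems typically use a trained detector such as Microsoft Presidio rather than hand-rolled regexes:

```python
import re

# Illustrative patterns for common PII shapes. A real deployment would use
# a dedicated PII detector; regexes miss names, addresses, and free-form PII.
PII_PATTERNS = {
    "EMAIL": re.compile(r"[\w.+-]+@[\w-]+\.[\w.]+"),
    "SSN": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
    "PHONE": re.compile(r"\b\d{3}[-.]\d{3}[-.]\d{4}\b"),
}

def redact_pii(text: str) -> str:
    """Replace detected PII spans with typed placeholders before any external call."""
    for label, pattern in PII_PATTERNS.items():
        text = pattern.sub(f"[{label}]", text)
    return text
```

Run this in the request path, before the prompt is assembled, so redaction cannot be bypassed by any individual agent or tool.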

Operations is the second pillar - alerting, reliability targets, autoscaling, and cost controls. The 99.9% SLA in particular has more constraints than people think: 43 minutes of downtime per month, total, across all causes including the LLM provider’s outages. That number forces decisions about graceful degradation that aren’t obvious in the prototype phase.

operational_requirements:
  monitoring:
    - Real-time alerting (PagerDuty, Opsgenie integration)
    - Custom dashboards for business metrics
    - Distributed tracing for multi-agent workflows
    - Log aggregation (ELK stack, Datadog, Splunk)

  reliability:
    - 99.9% uptime SLA (43 minutes downtime/month maximum)
    - Automated failover between regions
    - Circuit breakers for external API failures
    - Graceful degradation when AI services unavailable

  scalability:
    - Auto-scaling based on request volume
    - Load balancing across multiple instances
    - Caching strategies (Redis, CDN for static assets)
    - Database connection pooling and query optimization

  cost_management:
    - Token usage tracking per user/department
    - Budget alerts before overruns
    - Model selection based on cost/performance trade-offs
    - Request batching to reduce API calls
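
The cost_management items can start very small: a per-user spend ledger consulted before each call. The sketch below assumes hypothetical per-token prices - the `BudgetGuard` class and the `PRICE_PER_1K` table are illustrative, and real pricing changes frequently:

```python
from dataclasses import dataclass, field

# Hypothetical USD prices per 1K tokens; look these up from your provider.
PRICE_PER_1K = {"gpt-4o": 0.005, "gpt-4o-mini": 0.0006}

@dataclass
class BudgetGuard:
    """Track estimated spend per user and refuse calls past a monthly cap."""
    monthly_cap_usd: float
    spent: dict = field(default_factory=dict)  # user_id -> dollars spent

    def record(self, user_id: str, model: str, tokens: int) -> float:
        """Record a completed call's estimated cost; returns that cost."""
        cost = tokens / 1000 * PRICE_PER_1K[model]
        self.spent[user_id] = self.spent.get(user_id, 0.0) + cost
        return cost

    def allow(self, user_id: str) -> bool:
        """Check before each LLM call; alert (don't just block) near the cap."""
        return self.spent.get(user_id, 0.0) < self.monthly_cap_usd
```

In production the ledger lives in Redis or a database rather than process memory, but the control flow - record after every call, check before every call - stays the same.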

Developer experience is the third pillar, and the one that gets cut first when timelines slip. It shouldn’t. The teams that operate AI services well in production have one-command deployments, request replay for debugging, A/B testing infrastructure for prompt variations, and runbooks for the failure modes they’ve actually seen. The teams that don’t, page humans for problems that should self-heal.

developer_experience:
  deployment:
    - One-command deployment to staging and production
    - Automated rollback on deployment failures
    - Blue-green or canary deployment strategies
    - Infrastructure as Code (Terraform, Pulumi)

  debugging:
    - Request replay for reproducing issues
    - Detailed error context (not just stack traces)
    - A/B testing framework for prompt variations
    - Integration with developer tools (VS Code extensions)

  documentation:
    - API documentation auto-generated from code (OpenAPI)
    - Runbooks for common operational scenarios
    - Architecture diagrams (Mermaid, Lucidchart)
    - Onboarding guides for new team members
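
Request replay, the first debugging item above, needs surprisingly little machinery to start: persist each incoming payload under an ID, and let an engineer fetch it later. A minimal sketch - the `RequestRecorder` class is hypothetical, and a real deployment would key captures by trace ID in durable storage rather than a local JSONL file:

```python
import json
import time
import uuid
from pathlib import Path

class RequestRecorder:
    """Capture incoming request payloads so failures can be replayed locally."""

    def __init__(self, path: str = "requests.jsonl"):
        self.path = Path(path)

    def capture(self, payload: dict) -> str:
        """Persist an incoming request; returns the ID to log alongside errors."""
        request_id = str(uuid.uuid4())
        record = {"id": request_id, "ts": time.time(), "payload": payload}
        with self.path.open("a") as f:
            f.write(json.dumps(record) + "\n")
        return request_id

    def replay(self, request_id: str) -> dict:
        """Fetch the original payload for a given request ID."""
        with self.path.open() as f:
            for line in f:
                record = json.loads(line)
                if record["id"] == request_id:
                    return record["payload"]
        raise KeyError(request_id)
```

The payoff comes during incidents: instead of guessing what prompt triggered a failure, you replay the exact captured request against a staging deployment. Remember to run captures through your PII redaction first.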

If you’re missing any of these components, you’re not ready for enterprise production. The next sections walk through each one.


Production architecture patterns for LangChain and CrewAI #

Enterprise operations teams need an architecture they can actually deploy, monitor, and maintain at scale. Here’s the pattern we land on.

Core architecture principles #

Before we write any code, let’s establish the non-negotiable principles:

Principle 1: Separation of Concerns

Your AI logic should be completely isolated from your infrastructure concerns:

┌─────────────────────────────────────────────────────────────┐
│                     Load Balancer (AWS ALB)                  │
└─────────────────────┬───────────────────────────────────────┘
                      │
        ┌─────────────┴──────────────┐
        │                            │
┌───────▼────────┐          ┌────────▼───────┐
│  API Gateway   │          │  API Gateway   │
│  (FastAPI)     │          │  (FastAPI)     │
│  • Auth        │          │  • Auth        │
│  • Rate limits │          │  • Rate limits │
└───────┬────────┘          └────────┬───────┘
        │                            │
┌───────▼─────────────────────────────▼────────┐
│         Agent Orchestration Layer             │
│  (LangChain/CrewAI Business Logic)           │
│  • Stateless by design                       │
│  • Environment-agnostic                      │
│  • Testable in isolation                     │
└───────┬──────────────────────────────────────┘
        │
        ├─────────┬──────────┬──────────┬───────────┐
        │         │          │          │           │
┌───────▼──┐ ┌────▼────┐ ┌──▼────┐ ┌───▼────┐ ┌────▼────┐
│  Redis   │ │  Vector │ │  LLM  │ │ Tools  │ │  Queue  │
│  Cache   │ │  Store  │ │  APIs │ │ (MCP)  │ │ (Celery)│
└──────────┘ └─────────┘ └───────┘ └────────┘ └─────────┘

Why this matters: When your LangChain agent logic is entangled with FastAPI routes, Kubernetes configs, and monitoring code, you can’t test anything in isolation. Separate your concerns.
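
One way to make the separation concrete: keep the orchestration layer a plain async function with its dependencies injected, so it can be unit-tested with a stub LLM and no framework imports. The `run_agent_task` function below is an illustrative sketch of the pattern, not part of LangChain or CrewAI:

```python
from typing import Awaitable, Callable

async def run_agent_task(
    task: str,
    call_llm: Callable[[str], Awaitable[str]],  # injected LLM client
) -> dict:
    """Pure orchestration logic: no FastAPI, no globals, no env lookups.

    The HTTP layer passes in a real LLM client; tests pass in a stub.
    """
    answer = await call_llm(f"Task: {task}")
    return {"task": task, "answer": answer}
```

Because nothing here touches routes, configs, or monitoring, the same function runs unchanged in unit tests, in a Celery worker, or behind a FastAPI endpoint.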

Principle 2: Design for Failure

In production, everything fails. Plan for it:

# app/core/resilience.py
from typing import Optional
import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential
from circuitbreaker import circuit, CircuitBreakerError
import logging

logger = logging.getLogger(__name__)


class ResilientAIService:
    """
    Wrap AI service calls with production-grade resilience patterns.

    Implements:
    - Exponential backoff retry for transient failures
    - Circuit breaker to prevent cascade failures
    - Timeout enforcement to prevent hanging requests
    - Graceful degradation with fallback responses
    """

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        reraise=True
    )
    @circuit(failure_threshold=5, recovery_timeout=60)
    async def call_llm_with_resilience(
        self,
        prompt: str,
        model: str = "gpt-4o",
        timeout: int = 30,
        fallback_response: Optional[str] = None
    ) -> str:
        """
        Call LLM API with automatic retry, circuit breaking, and fallback.

        Retry policy:
        - 3 attempts with exponential backoff (waits bounded between 4s and 10s)
        - Circuit opens after 5 consecutive failures
        - Circuit recovers after 60 seconds

        Args:
            prompt: Input prompt for LLM
            model: Model identifier (e.g., "gpt-4o", "claude-haiku")
            timeout: Maximum seconds to wait for response
            fallback_response: Response to return if all retries fail

        Returns:
            LLM response or fallback response

        Raises:
            CircuitBreakerError: When circuit is open (too many failures)
            TimeoutError: When request exceeds timeout
        """
        try:
            # Implementation with timeout enforcement
            response = await asyncio.wait_for(
                self._call_llm(prompt, model),
                timeout=timeout
            )
            return response

        except asyncio.TimeoutError:
            logger.error(f"LLM call timeout after {timeout}s for model {model}")
            if fallback_response:
                logger.info(f"Using fallback response: {fallback_response[:100]}...")
                return fallback_response
            raise

        except Exception as e:
            logger.error(f"LLM call failed: {str(e)}", exc_info=True)
            if fallback_response:
                logger.info("Using fallback response after error")
                return fallback_response
            raise


async def handle_customer_query(prompt: str) -> str:
    resilient_service = ResilientAIService()

    try:
        return await resilient_service.call_llm_with_resilience(
            prompt=prompt,
            model="gpt-4o",
            timeout=30,
            fallback_response="I'm experiencing high load. Please try again in a moment."
        )
    except CircuitBreakerError:
        # Circuit is open - too many consecutive failures
        # Serve cached response or gracefully degrade
        return get_cached_response() or DEFAULT_RESPONSE

Why this matters: When OpenAI has a 30-second outage, your entire application shouldn’t go down with it. Circuit breakers prevent cascade failures.

Principle 3: Observability from Day One

If you can’t measure it, you can’t improve it:

# app/core/observability.py
from opentelemetry import trace
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from prometheus_client import Counter, Histogram, Gauge
import structlog
import time

# Structured logging configuration
logger = structlog.get_logger()

# Prometheus metrics for business and technical monitoring
llm_requests_total = Counter(
    'llm_requests_total',
    'Total number of LLM API requests',
    ['model', 'status', 'user_tier']
)

llm_request_duration = Histogram(
    'llm_request_duration_seconds',
    'Duration of LLM requests in seconds',
    ['model', 'operation']
)

llm_tokens_used = Counter(
    'llm_tokens_used_total',
    'Total tokens consumed across all LLM calls',
    ['model', 'user_id', 'operation']  # note: per-user labels can explode metric cardinality at scale
)

llm_cost_dollars = Counter(
    'llm_cost_dollars_total',
    'Estimated cost in dollars for LLM usage',
    ['model', 'department']
)

active_agent_workflows = Gauge(
    'active_agent_workflows',
    'Number of currently running agent workflows',
    ['agent_type', 'priority']
)


class ObservableAIAgent:
    """
    Wrap LangChain/CrewAI agents with comprehensive observability.

    Provides:
    - Distributed tracing (OpenTelemetry + Jaeger)
    - Structured logging (structlog)
    - Business metrics (Prometheus)
    - Performance profiling
    """

    def __init__(self, agent_name: str, agent_type: str):
        self.agent_name = agent_name
        self.agent_type = agent_type
        self.tracer = trace.get_tracer(__name__)

    async def execute_with_observability(
        self,
        task: str,
        user_id: str,
        department: str,
        priority: str = "normal"
    ):
        """
        Execute agent task with full observability.

        Automatically tracks:
        - Request duration and latency percentiles
        - Token usage and estimated costs
        - Success/failure rates by user tier
        - Active concurrent workflows
        """
        start_time = time.time()

        # Increment active workflows gauge
        active_agent_workflows.labels(
            agent_type=self.agent_type,
            priority=priority
        ).inc()

        # Start distributed trace span
        with self.tracer.start_as_current_span(
            f"agent_execution:{self.agent_name}",
            attributes={
                "agent.name": self.agent_name,
                "agent.type": self.agent_type,
                "user.id": user_id,
                "user.department": department,
                "task.priority": priority,
            }
        ) as span:
            try:
                # Structured logging with context
                logger.info(
                    "agent_execution_started",
                    agent_name=self.agent_name,
                    agent_type=self.agent_type,
                    user_id=user_id,
                    task_preview=task[:100],
                    priority=priority
                )

                # Execute actual agent work
                result = await self._execute_agent_task(task)

                # Track success metrics
                duration = time.time() - start_time
                llm_request_duration.labels(
                    model=result['model_used'],
                    operation=self.agent_type
                ).observe(duration)

                llm_requests_total.labels(
                    model=result['model_used'],
                    status='success',
                    user_tier=result.get('user_tier', 'standard')
                ).inc()

                # Track token usage and cost
                tokens = result.get('tokens_used', 0)
                llm_tokens_used.labels(
                    model=result['model_used'],
                    user_id=user_id,
                    operation=self.agent_type
                ).inc(tokens)

                estimated_cost = self._calculate_cost(
                    model=result['model_used'],
                    tokens=tokens
                )
                llm_cost_dollars.labels(
                    model=result['model_used'],
                    department=department
                ).inc(estimated_cost)

                # Add result metadata to span
                span.set_attribute("agent.tokens_used", tokens)
                span.set_attribute("agent.cost_dollars", estimated_cost)
                span.set_attribute("agent.duration_seconds", duration)
                span.set_status(trace.Status(trace.StatusCode.OK))

                logger.info(
                    "agent_execution_completed",
                    agent_name=self.agent_name,
                    duration_seconds=duration,
                    tokens_used=tokens,
                    estimated_cost_dollars=estimated_cost,
                    status="success"
                )

                return result

            except Exception as e:
                # Track failure metrics
                duration = time.time() - start_time
                llm_requests_total.labels(
                    model="unknown",
                    status='error',
                    user_tier='unknown'
                ).inc()

                # Record error in span
                span.record_exception(e)
                span.set_status(trace.Status(trace.StatusCode.ERROR))

                logger.error(
                    "agent_execution_failed",
                    agent_name=self.agent_name,
                    error=str(e),
                    duration_seconds=duration,
                    exc_info=True
                )
                raise

            finally:
                # Always decrement active workflows
                active_agent_workflows.labels(
                    agent_type=self.agent_type,
                    priority=priority
                ).dec()


# FastAPI integration for automatic instrumentation
def setup_observability(app):
    """Configure OpenTelemetry and Prometheus for FastAPI application."""

    # Configure Jaeger exporter for distributed tracing
    jaeger_exporter = JaegerExporter(
        agent_host_name="jaeger",
        agent_port=6831,
    )

    provider = TracerProvider()
    provider.add_span_processor(BatchSpanProcessor(jaeger_exporter))
    trace.set_tracer_provider(provider)

    # Auto-instrument FastAPI
    FastAPIInstrumentor.instrument_app(app)

    logger.info("observability_configured", exporters=["jaeger", "prometheus"])

Why this matters: When a customer reports “the AI is slow today,” you need data, not guesses. Observability tells you exactly which model, which prompt, and which infrastructure component is the bottleneck.

Production-ready FastAPI integration #

Now let’s build a FastAPI application that enterprises can actually deploy:

# app/main.py
from fastapi import FastAPI, HTTPException, Depends, Security, BackgroundTasks, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from prometheus_fastapi_instrumentator import Instrumentator
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
import uvicorn
from typing import Optional
import os

from app.core.observability import setup_observability, ObservableAIAgent, logger
from app.core.resilience import ResilientAIService
from app.core.security import verify_api_key, get_current_user
from app.models.requests import AgentRequest, AgentResponse
from app.agents.langchain_agent import LangChainAgentOrchestrator
from app.agents.crewai_agent import CrewAIAgentOrchestrator

# Initialize FastAPI with production configuration
app = FastAPI(
    title="Enterprise AI Agent API",
    description="Production-grade LangChain and CrewAI orchestration",
    version="1.0.0",
    docs_url="/api/docs" if os.getenv("ENV") != "production" else None,  # Disable docs in prod
    redoc_url="/api/redoc" if os.getenv("ENV") != "production" else None,
)

# Security middleware
security = HTTPBearer()

# Rate limiting configuration
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# CORS configuration for enterprise environments
# Note: browsers reject a wildcard origin combined with credentials,
# so set ALLOWED_ORIGINS to an explicit list in production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=os.getenv("ALLOWED_ORIGINS", "*").split(","),
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["*"],
)

# Compression for large responses
app.add_middleware(GZipMiddleware, minimum_size=1000)

# Setup observability (Prometheus + Jaeger)
setup_observability(app)
Instrumentator().instrument(app).expose(app, endpoint="/metrics")

# Initialize agent orchestrators
langchain_orchestrator = LangChainAgentOrchestrator()
crewai_orchestrator = CrewAIAgentOrchestrator()


@app.get("/health")
async def health_check():
    """
    Health check endpoint for load balancer and monitoring.

    Returns:
        - status: Service health status
        - dependencies: Health of critical dependencies
    """
    health_status = {
        "status": "healthy",
        "dependencies": {
            "redis": await check_redis_health(),
            "vector_store": await check_vector_store_health(),
            "llm_api": await check_llm_api_health(),
        }
    }

    # Return 503 if any critical dependency is down
    if any(status == "unhealthy" for status in health_status["dependencies"].values()):
        health_status["status"] = "degraded"
        raise HTTPException(status_code=503, detail=health_status)

    return health_status


@app.post(
    "/api/v1/agents/langchain/execute",
    response_model=AgentResponse,
    dependencies=[Depends(verify_api_key)]
)
@limiter.limit("100/minute")  # Per-IP rate limiting
async def execute_langchain_agent(
    request: Request,  # slowapi's limiter requires the raw starlette Request
    payload: AgentRequest,
    background_tasks: BackgroundTasks,
    credentials: HTTPAuthorizationCredentials = Security(security),
    current_user = Depends(get_current_user)
):
    """
    Execute LangChain agent workflow with enterprise-grade observability.

    Rate Limits:
        - 100 requests per minute per IP
        - Higher limits available for enterprise tier

    Authentication:
        - Bearer token required in Authorization header
        - API key validated against user database

    Args:
        request: Raw HTTP request (required by the rate limiter)
        payload: Agent execution request with task and configuration
        background_tasks: Background task queue for async operations
        credentials: HTTP bearer token from Authorization header
        current_user: Authenticated user object from JWT/API key

    Returns:
        AgentResponse with execution results, metrics, and trace ID
    """
    observable_agent = ObservableAIAgent(
        agent_name="langchain_executor",
        agent_type="langchain"
    )

    try:
        result = await observable_agent.execute_with_observability(
            task=payload.task,
            user_id=current_user.id,
            department=current_user.department,
            priority=payload.priority
        )

        # Queue background analytics update
        background_tasks.add_task(
            update_usage_analytics,
            user_id=current_user.id,
            tokens_used=result['tokens_used'],
            cost=result['estimated_cost']
        )

        return AgentResponse(**result)

    except Exception as e:
        logger.error(
            "agent_execution_error",
            agent_type="langchain",
            user_id=current_user.id,
            error=str(e),
            exc_info=True
        )
        raise HTTPException(
            status_code=500,
            detail=f"Agent execution failed: {str(e)}"
        )


@app.post(
    "/api/v1/agents/crewai/execute",
    response_model=AgentResponse,
    dependencies=[Depends(verify_api_key)]
)
@limiter.limit("50/minute")  # Lower limit for more expensive CrewAI workflows
async def execute_crewai_agent(
    request: Request,  # slowapi's limiter requires the raw starlette Request
    payload: AgentRequest,
    background_tasks: BackgroundTasks,
    credentials: HTTPAuthorizationCredentials = Security(security),
    current_user = Depends(get_current_user)
):
    """
    Execute CrewAI multi-agent workflow with team coordination.

    Rate Limits:
        - 50 requests per minute per IP (higher token usage than LangChain)

    CrewAI workflows consume more tokens due to multi-agent coordination,
    so rate limits are lower to prevent budget overruns.
    """
    observable_agent = ObservableAIAgent(
        agent_name="crewai_executor",
        agent_type="crewai"
    )

    try:
        result = await observable_agent.execute_with_observability(
            task=payload.task,
            user_id=current_user.id,
            department=current_user.department,
            priority=payload.priority
        )

        background_tasks.add_task(
            update_usage_analytics,
            user_id=current_user.id,
            tokens_used=result['tokens_used'],
            cost=result['estimated_cost']
        )

        return AgentResponse(**result)

    except Exception as e:
        logger.error(
            "agent_execution_error",
            agent_type="crewai",
            user_id=current_user.id,
            error=str(e),
            exc_info=True
        )
        raise HTTPException(
            status_code=500,
            detail=f"Agent execution failed: {str(e)}"
        )


@app.get("/api/v1/usage/summary")
async def get_usage_summary(current_user = Depends(get_current_user)):
    """
    Retrieve token usage and cost summary for current user.

    Returns:
        - tokens_used_today: Total tokens consumed today
        - cost_today: Estimated cost in dollars
        - remaining_budget: Monthly budget remaining
    """
    summary = await fetch_usage_summary(current_user.id)
    return summary


if __name__ == "__main__":
    # Note: uvicorn ignores the workers setting when reload is enabled,
    # so multi-worker mode only applies outside development.
    uvicorn.run(
        "app.main:app",
        host="0.0.0.0",
        port=8000,
        reload=os.getenv("ENV") == "development",
        workers=int(os.getenv("WORKERS", "4")),
        log_config="logging_config.yaml"
    )

What this gives you:

  • Security: API key authentication, rate limiting, CORS protection
  • Observability: Prometheus metrics, Jaeger tracing, structured logs
  • Reliability: Health checks, graceful degradation, background tasks
  • Scalability: Multi-worker support, compression, efficient routing

This isn’t a prototype anymore; it’s production infrastructure.


Security and compliance framework #

Here are the security patterns that pass enterprise audits.

API authentication and authorization #

Never trust incoming requests. Always verify, always authorize:

# app/core/security.py
from fastapi import HTTPException, Security, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from passlib.context import CryptContext
from datetime import datetime, timedelta
from typing import Optional
import os
import redis
from sqlalchemy.orm import Session

from app.models.database import User, APIKey
from app.core.database import get_db

# Security configuration
SECRET_KEY = os.getenv("JWT_SECRET_KEY")
if not SECRET_KEY:
    raise RuntimeError("JWT_SECRET_KEY environment variable must be set")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 30

# Password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# Redis for token blacklist and rate limiting
redis_client = redis.from_url(os.getenv("REDIS_URL", "redis://localhost:6379"))

security = HTTPBearer()


class AuthService:
    """
    Enterprise authentication service with JWT and API key support.

    Implements:
    - JWT token generation with refresh tokens
    - API key validation with rotation policies
    - Token blacklisting for logout
    - Role-based access control (RBAC)
    """

    @staticmethod
    def verify_password(plain_password: str, hashed_password: str) -> bool:
        """Verify password against bcrypt hash."""
        return pwd_context.verify(plain_password, hashed_password)

    @staticmethod
    def get_password_hash(password: str) -> str:
        """Generate bcrypt hash for password storage."""
        return pwd_context.hash(password)

    @staticmethod
    def create_access_token(
        data: dict,
        expires_delta: Optional[timedelta] = None
    ) -> str:
        """
        Create JWT access token with expiration.

        Args:
            data: Payload dictionary to encode in JWT
            expires_delta: Custom expiration time (default: 30 minutes)

        Returns:
            Encoded JWT token string
        """
        to_encode = data.copy()

        if expires_delta:
            expire = datetime.utcnow() + expires_delta
        else:
            expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

        to_encode.update({
            "exp": expire,
            "iat": datetime.utcnow(),
            "type": "access"
        })

        encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
        return encoded_jwt

    @staticmethod
    def create_refresh_token(user_id: str) -> str:
        """
        Create long-lived refresh token for token renewal.

        Refresh tokens have longer expiration (30 days) and can be used
        to generate new access tokens without re-authentication.
        """
        expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)

        to_encode = {
            "sub": user_id,
            "exp": expire,
            "iat": datetime.utcnow(),
            "type": "refresh"
        }

        encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
        return encoded_jwt

    @staticmethod
    async def verify_token(token: str) -> dict:
        """
        Verify JWT token and extract payload.

        Checks:
        - Token signature validity
        - Token expiration
        - Token not in blacklist (for logout)

        Raises:
            HTTPException: If token is invalid, expired, or blacklisted
        """
        try:
            # Check if token is blacklisted (user logged out)
            if redis_client.get(f"blacklist:{token}"):
                raise HTTPException(
                    status_code=401,
                    detail="Token has been revoked"
                )

            # Decode and verify token
            payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])

            # Verify token type
            if payload.get("type") != "access":
                raise HTTPException(
                    status_code=401,
                    detail="Invalid token type"
                )

            return payload

        except JWTError as e:
            raise HTTPException(
                status_code=401,
                detail=f"Token validation failed: {str(e)}"
            )

    @staticmethod
    async def blacklist_token(token: str, expire_seconds: int):
        """
        Add token to blacklist (for logout functionality).

        Blacklisted tokens are stored in Redis with expiration matching
        the token's original expiration time.
        """
        redis_client.setex(
            f"blacklist:{token}",
            expire_seconds,
            "1"
        )


async def verify_api_key(
    credentials: HTTPAuthorizationCredentials = Security(security),
    db: Session = Depends(get_db)
) -> dict:
    """
    Verify API key from Authorization header.

    Supports two authentication methods:
    1. JWT Bearer tokens (from login)
    2. API keys (for programmatic access)

    Args:
        credentials: HTTP Bearer token from Authorization header
        db: Database session for API key lookup

    Returns:
        User information dictionary

    Raises:
        HTTPException: If authentication fails
    """
    token = credentials.credentials

    # Try JWT token verification first
    try:
        payload = await AuthService.verify_token(token)
        user_id = payload.get("sub")

        if not user_id:
            raise HTTPException(
                status_code=401,
                detail="Invalid token payload"
            )

        user = db.query(User).filter(User.id == user_id).first()

        if not user:
            raise HTTPException(
                status_code=401,
                detail="User not found"
            )

        return {
            "user_id": user.id,
            "email": user.email,
            "department": user.department,
            "tier": user.subscription_tier,
            "roles": user.roles
        }

    except HTTPException:
        # Any failure on the JWT path (bad signature, blacklisted token,
        # unknown user) falls through to the API-key lookup below.
        # NOTE: keys are compared in plaintext here for brevity; in production,
        # store only a hash of each key and compare hashes.
        api_key = db.query(APIKey).filter(
            APIKey.key == token,
            APIKey.is_active == True,
            APIKey.expires_at > datetime.utcnow()
        ).first()

        if not api_key:
            raise HTTPException(
                status_code=401,
                detail="Invalid or expired API key"
            )

        # Check API key rotation policy (warn if > 60 days old)
        key_age = (datetime.utcnow() - api_key.created_at).days
        if key_age > 60:
            logger.warning(
                "api_key_rotation_warning",
                api_key_id=api_key.id,
                age_days=key_age,
                user_id=api_key.user_id
            )

        # Update last_used_at timestamp
        api_key.last_used_at = datetime.utcnow()
        db.commit()

        user = api_key.user

        return {
            "user_id": user.id,
            "email": user.email,
            "department": user.department,
            "tier": user.subscription_tier,
            "roles": user.roles,
            "api_key_id": api_key.id
        }


async def get_current_user(
    auth_data: dict = Depends(verify_api_key),
    db: Session = Depends(get_db)
):
    """
    Retrieve current authenticated user object.

    Use this dependency in route handlers to get full user object
    with all attributes and relationships.
    """
    user = db.query(User).filter(User.id == auth_data["user_id"]).first()

    if not user:
        raise HTTPException(
            status_code=401,
            detail="User not found"
        )

    return user


def require_role(required_role: str):
    """
    Dependency factory for role-based access control (RBAC).

    Usage:
        @app.get("/admin/users")
        async def list_users(current_user = require_role("admin")):
            # Only users with "admin" role can access
            pass
    """
    async def role_checker(current_user = Depends(get_current_user)):
        if required_role not in current_user.roles:
            raise HTTPException(
                status_code=403,
                detail=f"Insufficient permissions. Required role: {required_role}"
            )
        return current_user

    return Depends(role_checker)

Data privacy and PII protection #

Enterprises care deeply about data privacy. Here’s how to handle sensitive data correctly:

# app/core/privacy.py
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig
from typing import Any, Dict, List
import hashlib

# Initialize Presidio for PII detection
analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()


class PIIProtectionService:
    """
    Detect and redact PII before sending data to external APIs.

    Compliant with:
    - GDPR (European data protection)
    - CCPA (California privacy law)
    - HIPAA (healthcare data)
    - SOC 2 (security controls)
    """

    # Supported PII entity types
    PII_ENTITIES = [
        "PERSON",           # Names
        "EMAIL_ADDRESS",    # Email addresses
        "PHONE_NUMBER",     # Phone numbers
        "CREDIT_CARD",      # Credit card numbers
        "US_SSN",           # Social Security Numbers
        "IBAN_CODE",        # Bank account numbers
        "IP_ADDRESS",       # IP addresses
        "LOCATION",         # Physical addresses
        "DATE_TIME",        # Dates that could identify individuals
        "MEDICAL_LICENSE",  # Healthcare identifiers
        "US_DRIVER_LICENSE", # Driver's license numbers
    ]

    @staticmethod
    def detect_pii(text: str, language: str = "en") -> List[Dict]:
        """
        Detect PII entities in text using Presidio.

        Args:
            text: Input text to scan for PII
            language: Language code (default: English)

        Returns:
            List of detected PII entities with type, location, and score
        """
        results = analyzer.analyze(
            text=text,
            language=language,
            entities=PIIProtectionService.PII_ENTITIES
        )

        return [
            {
                "entity_type": result.entity_type,
                "start": result.start,
                "end": result.end,
                "score": result.score,
                "text": text[result.start:result.end]
            }
            for result in results
        ]

    @staticmethod
    def redact_pii(
        text: str,
        language: str = "en",
        redaction_char: str = "X"
    ) -> Dict[str, Any]:
        """
        Redact PII from text before sending to external APIs.

        Example (illustrative):
            Input: "Contact John Doe at john@example.com or 555-1234"
            Output: "Contact XXXXXXXX at XXXXXXXXXXXXXXXX or XXXXXXXX"

        Returns:
            - redacted_text: Text with PII masked by redaction characters
            - entities_found: List of redacted entities
            - original_hash: SHA256 hash for audit trail
        """
        # Detect PII first
        results = analyzer.analyze(
            text=text,
            language=language,
            entities=PIIProtectionService.PII_ENTITIES
        )

        if not results:
            # No PII found
            return {
                "redacted_text": text,
                "entities_found": [],
                "original_hash": hashlib.sha256(text.encode()).hexdigest(),
                "pii_detected": False
            }

        # Anonymize detected PII. The anonymizer expects OperatorConfig
        # instances (from presidio_anonymizer.entities); the "mask" operator
        # overwrites each entity with the redaction character.
        anonymized = anonymizer.anonymize(
            text=text,
            analyzer_results=results,
            operators={
                "DEFAULT": OperatorConfig("mask", {
                    "masking_char": redaction_char,
                    "chars_to_mask": 100,
                    "from_end": False
                })
            }
        )

        return {
            "redacted_text": anonymized.text,
            "entities_found": [
                {
                    "entity_type": result.entity_type,
                    "score": result.score
                }
                for result in results
            ],
            "original_hash": hashlib.sha256(text.encode()).hexdigest(),
            "pii_detected": True
        }

    @staticmethod
    def pseudonymize_pii(text: str, language: str = "en") -> Dict[str, Any]:
        """
        Replace PII with placeholder values (pseudonymization).

        Useful for:
        - Testing with production-like data
        - Demos without exposing real PII
        - GDPR-compliant analytics

        Example:
            Input: "Contact John Doe at john@example.com"
            Output: "Contact PERSON_PLACEHOLDER at email@example.com"
        """
        results = analyzer.analyze(
            text=text,
            language=language,
            entities=PIIProtectionService.PII_ENTITIES
        )

        if not results:
            return {
                "pseudonymized_text": text,
                "entities_replaced": 0,
                "pii_detected": False
            }

        # Replace each entity type with a static placeholder. (For realistic
        # fake values, plug a Faker-backed custom operator in here instead.)
        anonymized = anonymizer.anonymize(
            text=text,
            analyzer_results=results,
            operators={
                "PERSON": OperatorConfig("replace", {"new_value": "PERSON_PLACEHOLDER"}),
                "EMAIL_ADDRESS": OperatorConfig("replace", {"new_value": "email@example.com"}),
                "PHONE_NUMBER": OperatorConfig("replace", {"new_value": "555-0000"}),
                "CREDIT_CARD": OperatorConfig("replace", {"new_value": "XXXX-XXXX-XXXX-0000"}),
            }
        )

        return {
            "pseudonymized_text": anonymized.text,
            "entities_replaced": len(results),
            "pii_detected": True
        }


# Integration with LangChain agent execution
class PrivacyAwareLangChainAgent:
    """
    Wrap LangChain agents with automatic PII protection.

    Before sending any data to external LLM APIs:
    1. Detect PII in user prompts
    2. Redact or pseudonymize sensitive data
    3. Log PII detection for compliance audits
    4. Process with external API safely
    """

    def __init__(self, langchain_agent, enable_pii_protection: bool = True):
        self.agent = langchain_agent
        self.enable_pii_protection = enable_pii_protection
        self.pii_service = PIIProtectionService()

    async def execute_with_privacy(
        self,
        prompt: str,
        user_id: str,
        redaction_mode: str = "redact"  # "redact" or "pseudonymize"
    ):
        """
        Execute agent with automatic PII protection.

        Args:
            prompt: User input prompt (may contain PII)
            user_id: User identifier for audit trail
            redaction_mode: How to handle PII ("redact" or "pseudonymize")
        """
        if not self.enable_pii_protection:
            # PII protection disabled (only for testing)
            return await self.agent.execute(prompt)

        # Detect and redact PII
        if redaction_mode == "redact":
            protection_result = self.pii_service.redact_pii(prompt)
        else:
            protection_result = self.pii_service.pseudonymize_pii(prompt)

        if protection_result["pii_detected"]:
            # Log PII detection for compliance audit
            logger.warning(
                "pii_detected_and_protected",
                user_id=user_id,
                entities_found=protection_result.get("entities_found", []),
                original_hash=protection_result.get("original_hash"),
                redaction_mode=redaction_mode
            )

        # Execute agent with PII-protected prompt
        result = await self.agent.execute(
            protection_result["redacted_text"] if redaction_mode == "redact"
            else protection_result["pseudonymized_text"]
        )

        return {
            "result": result,
            "pii_protection_applied": protection_result["pii_detected"],
            "entities_protected": protection_result.get("entities_found", [])
        }


# Usage example
agent = PrivacyAwareLangChainAgent(langchain_agent, enable_pii_protection=True)

async def analyze_customer_record():
    response = await agent.execute_with_privacy(
        prompt="Analyze customer record: John Doe, SSN 123-45-6789, email john@example.com",
        user_id="user_12345",
        redaction_mode="redact"
    )
    return response

# Prompt sent to external API:
# "Analyze customer record: XXXX XXX, SSN XXXXXXXXXXX, email XXXXXXXXXXXXXXXXXXXXX"

Why this matters: When your AI application processes customer support tickets, employee records, or financial documents, you’re handling PII. A single breach can mean multimillion-dollar fines and lost customer trust. Automated PII protection isn’t optional; it’s mandatory.


Docker and Kubernetes deployment #

Containerize and orchestrate the AI application so it scales predictably under production load.

Production-grade Dockerfile #

# Dockerfile
# Multi-stage build for optimal image size and security

# Stage 1: Builder - Install dependencies and compile extensions
FROM python:3.11-slim AS builder

# Install build dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    gcc \
    g++ \
    git \
    && rm -rf /var/lib/apt/lists/*

# Create virtual environment for dependency isolation
RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"

# Copy and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip setuptools wheel && \
    pip install --no-cache-dir -r requirements.txt


# Stage 2: Runtime - Minimal production image
FROM python:3.11-slim

# Install runtime dependencies only (no build tools)
RUN apt-get update && apt-get install -y --no-install-recommends \
    curl \
    ca-certificates \
    && rm -rf /var/lib/apt/lists/*

# Create non-root user for security (never run as root in production).
# UID/GID 1000 must match runAsUser/fsGroup in the Kubernetes securityContext.
RUN groupadd -g 1000 appuser && useradd -u 1000 -g appuser -m appuser

# Copy Python virtual environment from builder stage
COPY --from=builder /opt/venv /opt/venv

# Set working directory
WORKDIR /app

# Copy application code
COPY --chown=appuser:appuser . /app

# Set environment variables
ENV PATH="/opt/venv/bin:$PATH" \
    PYTHONUNBUFFERED=1 \
    PYTHONDONTWRITEBYTECODE=1 \
    PIP_NO_CACHE_DIR=1

# Switch to non-root user
USER appuser

# Expose application port
EXPOSE 8000

# Health check for container orchestration
HEALTHCHECK --interval=30s --timeout=10s --start-period=40s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# Run application with production settings
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]

Kubernetes deployment configuration #

# kubernetes/deployment.yaml
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ai-agent-api
  namespace: production
  labels:
    app: ai-agent-api
    version: v1.0.0
    tier: backend
spec:
  replicas: 3  # Start with 3 pods for high availability
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1        # Allow 1 extra pod during deployment
      maxUnavailable: 0  # Zero downtime deployment
  selector:
    matchLabels:
      app: ai-agent-api
  template:
    metadata:
      labels:
        app: ai-agent-api
        version: v1.0.0
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "8000"
        prometheus.io/path: "/metrics"
    spec:
      # Anti-affinity for pod distribution across nodes
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
            - weight: 100
              podAffinityTerm:
                labelSelector:
                  matchExpressions:
                    - key: app
                      operator: In
                      values:
                        - ai-agent-api
                topologyKey: kubernetes.io/hostname

      # Service account for RBAC
      serviceAccountName: ai-agent-api

      # Pod security context (enforce non-root execution)
      securityContext:
        runAsNonRoot: true
        runAsUser: 1000
        fsGroup: 1000

      containers:
        - name: ai-agent-api
          image: your-registry.com/ai-agent-api:1.0.0
          imagePullPolicy: IfNotPresent

          ports:
            - name: http
              containerPort: 8000
              protocol: TCP

          # Environment variables from ConfigMap and Secrets
          env:
            - name: ENV
              value: "production"
            - name: LOG_LEVEL
              value: "INFO"
            - name: WORKERS
              value: "4"

            # Database configuration
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: ai-agent-secrets
                  key: database-url

            # Redis configuration
            - name: REDIS_URL
              valueFrom:
                secretKeyRef:
                  name: ai-agent-secrets
                  key: redis-url

            # OpenAI API key
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: ai-agent-secrets
                  key: openai-api-key

            # JWT secret for authentication
            - name: JWT_SECRET_KEY
              valueFrom:
                secretKeyRef:
                  name: ai-agent-secrets
                  key: jwt-secret-key

          # Resource requests and limits (important for cost optimization)
          resources:
            requests:
              memory: "512Mi"
              cpu: "500m"
            limits:
              memory: "2Gi"
              cpu: "2000m"

          # Liveness probe (restart if unhealthy)
          livenessProbe:
            httpGet:
              path: /health
              port: http
            initialDelaySeconds: 30
            periodSeconds: 10
            timeoutSeconds: 5
            successThreshold: 1
            failureThreshold: 3

          # Readiness probe (don't send traffic if not ready)
          readinessProbe:
            httpGet:
              path: /health
              port: http
            initialDelaySeconds: 10
            periodSeconds: 5
            timeoutSeconds: 3
            successThreshold: 1
            failureThreshold: 3

          # Volume mounts (if needed)
          volumeMounts:
            - name: cache
              mountPath: /tmp/cache
            - name: logs
              mountPath: /app/logs

      volumes:
        - name: cache
          emptyDir: {}
        - name: logs
          emptyDir: {}

---
# Horizontal Pod Autoscaler (HPA)
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: ai-agent-api-hpa
  namespace: production
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: ai-agent-api
  minReplicas: 3
  maxReplicas: 20  # Scale up to 20 pods under high load
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70  # Scale when CPU > 70%
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80  # Scale when memory > 80%
    # Custom metric: requests per second (requires prometheus-adapter or
    # similar to expose application metrics to the HPA)
    - type: Pods
      pods:
        metric:
          name: http_requests_per_second
        target:
          type: AverageValue
          averageValue: "100"  # Scale when > 100 req/s per pod

---
# Service for load balancing
apiVersion: v1
kind: Service
metadata:
  name: ai-agent-api
  namespace: production
  labels:
    app: ai-agent-api
spec:
  type: ClusterIP
  ports:
    - port: 80
      targetPort: http
      protocol: TCP
      name: http
  selector:
    app: ai-agent-api

---
# Ingress for external access
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: ai-agent-api
  namespace: production
  annotations:
    cert-manager.io/cluster-issuer: letsencrypt-prod
    nginx.ingress.kubernetes.io/limit-rps: "100"  # ~100 req/s per client IP
    nginx.ingress.kubernetes.io/ssl-redirect: "true"
spec:
  ingressClassName: nginx  # replaces the deprecated kubernetes.io/ingress.class annotation
  tls:
    - hosts:
        - api.yourcompany.com
      secretName: ai-agent-api-tls
  rules:
    - host: api.yourcompany.com
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: ai-agent-api
                port:
                  number: 80

Secrets and ConfigMaps #

# kubernetes/externalsecret.yaml
# Store real values in AWS Secrets Manager, HashiCorp Vault, or another backend.
---
apiVersion: external-secrets.io/v1beta1
kind: ExternalSecret
metadata:
  name: ai-agent-secrets
  namespace: production
spec:
  refreshInterval: 1h
  secretStoreRef:
    name: production-secrets
    kind: ClusterSecretStore
  target:
    name: ai-agent-secrets
    creationPolicy: Owner
  data:
    - secretKey: database-url
      remoteRef:
        key: ai-agent/database-url
    - secretKey: redis-url
      remoteRef:
        key: ai-agent/redis-url
    - secretKey: openai-api-key
      remoteRef:
        key: ai-agent/openai-api-key
    - secretKey: anthropic-api-key
      remoteRef:
        key: ai-agent/anthropic-api-key
    - secretKey: jwt-secret-key
      remoteRef:
        key: ai-agent/jwt-secret-key

---
# kubernetes/configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: ai-agent-config
  namespace: production
data:
  # Application configuration
  APP_NAME: "AI Agent API"
  LOG_LEVEL: "INFO"
  WORKERS: "4"

  # Feature flags
  ENABLE_PII_PROTECTION: "true"
  ENABLE_RATE_LIMITING: "true"
  ENABLE_OBSERVABILITY: "true"

  # Rate limiting configuration
  RATE_LIMIT_REQUESTS: "100"
  RATE_LIMIT_PERIOD: "60"

  # Model configuration
  DEFAULT_LLM_MODEL: "gpt-4o"
  DEFAULT_LLM_TEMPERATURE: "0.7"
  DEFAULT_MAX_TOKENS: "2000"

What this deployment gives you:

  • Zero-downtime deployments: Rolling updates with health checks
  • Auto-scaling: Horizontal scaling based on CPU, memory, and custom metrics
  • High availability: Pod anti-affinity spreads pods across nodes
  • Security: Non-root containers, secrets management, TLS encryption
  • Observability: Prometheus annotations for automatic metric collection

This is production-grade infrastructure that passes enterprise scrutiny.


Observability and monitoring setup #

This is the monitoring stack that tells you exactly what’s happening in production.

Prometheus and Grafana configuration #

# kubernetes/prometheus-config.yaml
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: prometheus-config
  namespace: monitoring
data:
  prometheus.yml: |
    global:
      scrape_interval: 15s
      evaluation_interval: 15s
      external_labels:
        cluster: 'production'
        environment: 'prod'

    # Alerting configuration
    alerting:
      alertmanagers:
        - static_configs:
            - targets:
                - alertmanager:9093

    # Load alerting rules
    rule_files:
      - '/etc/prometheus/rules/*.yml'

    # Scrape configurations
    scrape_configs:
      # AI Agent API metrics
      - job_name: 'ai-agent-api'
        kubernetes_sd_configs:
          - role: pod
            namespaces:
              names:
                - production
        relabel_configs:
          - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
            action: keep
            regex: true
          - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]
            action: replace
            target_label: __metrics_path__
            regex: (.+)
          - source_labels: [__address__, __meta_kubernetes_pod_annotation_prometheus_io_port]
            action: replace
            regex: ([^:]+)(?::\d+)?;(\d+)
            replacement: $1:$2
            target_label: __address__
          - action: labelmap
            regex: __meta_kubernetes_pod_label_(.+)
          - source_labels: [__meta_kubernetes_namespace]
            action: replace
            target_label: kubernetes_namespace
          - source_labels: [__meta_kubernetes_pod_name]
            action: replace
            target_label: kubernetes_pod_name

      # Node exporter for infrastructure metrics
      - job_name: 'node-exporter'
        kubernetes_sd_configs:
          - role: node
        relabel_configs:
          - source_labels: [__address__]
            regex: '(.*):10250'
            replacement: '${1}:9100'
            target_label: __address__

      # Kubernetes API server metrics
      - job_name: 'kubernetes-apiservers'
        kubernetes_sd_configs:
          - role: endpoints
        scheme: https
        tls_config:
          ca_file: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
        bearer_token_file: /var/run/secrets/kubernetes.io/serviceaccount/token
        relabel_configs:
          - source_labels: [__meta_kubernetes_namespace, __meta_kubernetes_service_name, __meta_kubernetes_endpoint_port_name]
            action: keep
            regex: default;kubernetes;https

---
# Prometheus alerting rules for AI agents
apiVersion: v1
kind: ConfigMap
metadata:
  name: prometheus-rules
  namespace: monitoring
data:
  ai_agent_alerts.yml: |
    groups:
      - name: ai_agent_alerts
        interval: 30s
        rules:
          # High error rate alert
          - alert: HighLLMErrorRate
            expr: |
              (
                sum(rate(llm_requests_total{status="error"}[5m]))
                /
                sum(rate(llm_requests_total[5m]))
              ) > 0.05
            for: 5m
            labels:
              severity: critical
              component: ai-agent
            annotations:
              summary: "High LLM error rate detected"
              description: "LLM error rate is {{ $value | humanizePercentage }} (threshold: 5%)"

          # High latency alert
          - alert: HighLLMLatency
            expr: |
              histogram_quantile(0.95,
                sum(rate(llm_request_duration_seconds_bucket[5m])) by (le, model)
              ) > 30
            for: 10m
            labels:
              severity: warning
              component: ai-agent
            annotations:
              summary: "High LLM request latency detected"
              description: "95th percentile latency is {{ $value }}s for model {{ $labels.model }}"

          # Budget overrun alert
          - alert: LLMCostBudgetOverrun
            expr: |
              sum(increase(llm_cost_dollars_total[1d])) > 1000
            labels:
              severity: critical
              component: billing
            annotations:
              summary: "Daily LLM cost budget exceeded"
              description: "Daily LLM costs: ${{ $value }} (threshold: $1000)"

          # Token usage spike alert
          - alert: UnusualTokenUsageSpike
            expr: |
              (
                sum(rate(llm_tokens_used_total[5m]))
                /
                sum(rate(llm_tokens_used_total[1h] offset 1d))
              ) > 3
            for: 10m
            labels:
              severity: warning
              component: ai-agent
            annotations:
              summary: "Unusual token usage spike detected"
              description: "Current token usage is {{ $value }}x higher than yesterday"

          # Circuit breaker open alert
          - alert: CircuitBreakerOpen
            expr: |
              circuit_breaker_state{state="open"} == 1
            for: 5m
            labels:
              severity: critical
              component: resilience
            annotations:
              summary: "Circuit breaker opened for {{ $labels.service }}"
              description: "Too many failures detected. Service may be degraded."

          # Low cache hit rate
          - alert: LowCacheHitRate
            expr: |
              (
                sum(rate(cache_hits_total[10m]))
                /
                sum(rate(cache_requests_total[10m]))
              ) < 0.7
            for: 15m
            labels:
              severity: warning
              component: caching
            annotations:
              summary: "Cache hit rate below 70%"
              description: "Current cache hit rate: {{ $value | humanizePercentage }}"

Grafana dashboard configuration #

{
  "dashboard": {
    "title": "AI Agent Production Metrics",
    "tags": ["ai", "langchain", "crewai", "production"],
    "timezone": "browser",
    "panels": [
      {
        "title": "Request Rate (req/s)",
        "type": "graph",
        "targets": [
          {
            "expr": "sum(rate(llm_requests_total[1m])) by (model)"
          }
        ],
        "yAxes": [{"label": "Requests/sec"}]
      },
      {
        "title": "Error Rate (%)",
        "type": "graph",
        "targets": [
          {
            "expr": "(sum(rate(llm_requests_total{status=\"error\"}[5m])) / sum(rate(llm_requests_total[5m]))) * 100"
          }
        ],
        "alert": {
          "conditions": [
            {"evaluator": {"params": [5], "type": "gt"}}
          ],
          "name": "High Error Rate"
        }
      },
      {
        "title": "Latency Percentiles (seconds)",
        "type": "graph",
        "targets": [
          {
            "expr": "histogram_quantile(0.50, sum(rate(llm_request_duration_seconds_bucket[5m])) by (le))",
            "legendFormat": "p50"
          },
          {
            "expr": "histogram_quantile(0.95, sum(rate(llm_request_duration_seconds_bucket[5m])) by (le))",
            "legendFormat": "p95"
          },
          {
            "expr": "histogram_quantile(0.99, sum(rate(llm_request_duration_seconds_bucket[5m])) by (le))",
            "legendFormat": "p99"
          }
        ]
      },
      {
        "title": "Token Usage (tokens/min)",
        "type": "graph",
        "targets": [
          {
            "expr": "sum(rate(llm_tokens_used_total[1m])) by (model, operation)"
          }
        ]
      },
      {
        "title": "Cost by Department ($/hour)",
        "type": "graph",
        "targets": [
          {
            "expr": "sum(rate(llm_cost_dollars_total[1h])) by (department) * 3600"
          }
        ]
      },
      {
        "title": "Active Workflows",
        "type": "stat",
        "targets": [
          {
            "expr": "sum(active_agent_workflows)"
          }
        ]
      },
      {
        "title": "Cache Hit Rate (%)",
        "type": "gauge",
        "targets": [
          {
            "expr": "(sum(rate(cache_hits_total[5m])) / sum(rate(cache_requests_total[5m]))) * 100"
          }
        ],
        "thresholds": "50,70,90"
      },
      {
        "title": "Top Users by Token Usage",
        "type": "table",
        "targets": [
          {
            "expr": "topk(10, sum by (user_id) (increase(llm_tokens_used_total[1h])))"
          }
        ]
      }
    ]
  }
}
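The cost panels above assume an `llm_cost_dollars_total` counter that something increments. The sketch below shows how each increment might be computed from token counts; the per-1K-token prices are placeholders, not current provider pricing:

```python
# Sketch of the per-request cost behind llm_cost_dollars_total. The rates
# below are placeholder values; substitute your provider's current pricing.
PRICE_PER_1K = {
    "gpt-4o": {"input": 0.005, "output": 0.015},        # placeholder rates
    "gpt-4o-mini": {"input": 0.0006, "output": 0.0024},  # placeholder rates
}


def request_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Dollar cost of one LLM call, to be added to the Prometheus counter."""
    rates = PRICE_PER_1K[model]
    return (input_tokens / 1000) * rates["input"] \
        + (output_tokens / 1000) * rates["output"]


# 2.0 * 0.005 + 0.5 * 0.015 = 0.0175
cost = request_cost("gpt-4o", input_tokens=2000, output_tokens=500)
```

Incrementing the counter with labels like `model` and `department` at this point is what makes the “Cost by Department” panel and the daily budget alert possible.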

Distributed tracing with Jaeger #

# app/core/tracing.py
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
import os
import structlog

logger = structlog.get_logger()


def setup_tracing(app, service_name: str = "ai-agent-api"):
    """
    Configure distributed tracing with Jaeger.

    Automatically instruments:
    - FastAPI HTTP endpoints
    - External HTTP requests (OpenAI, Anthropic APIs)
    - Redis cache operations
    - Database queries (SQLAlchemy)

    Traces propagate across service boundaries using W3C Trace Context.
    """
    # Configure Jaeger exporter (the thrift exporter is deprecated upstream;
    # newer deployments typically export OTLP to the Jaeger collector instead)
    jaeger_exporter = JaegerExporter(
        agent_host_name=os.getenv("JAEGER_AGENT_HOST", "jaeger"),
        agent_port=int(os.getenv("JAEGER_AGENT_PORT", "6831")),
    )

    # Create tracer provider
    provider = TracerProvider(
        resource=Resource.create({"service.name": service_name})
    )

    # Add batch span processor for efficient export
    provider.add_span_processor(BatchSpanProcessor(jaeger_exporter))

    # Set as global tracer provider
    trace.set_tracer_provider(provider)

    # Auto-instrument FastAPI
    FastAPIInstrumentor.instrument_app(app)

    # Auto-instrument external HTTP requests
    RequestsInstrumentor().instrument()

    # Auto-instrument Redis
    RedisInstrumentor().instrument()

    # Auto-instrument SQLAlchemy
    SQLAlchemyInstrumentor().instrument(
        enable_commenter=True,  # Add trace context to SQL comments
        commenter_options={"db_driver": True, "db_framework": True}
    )

    logger.info(
        "distributed_tracing_configured",
        service_name=service_name,
        jaeger_host=os.getenv("JAEGER_AGENT_HOST", "jaeger")
    )


# Example: Manual span creation for custom operations
tracer = trace.get_tracer(__name__)

async def process_complex_workflow(workflow_id: str):
    """Example of custom span creation for workflow tracking."""

    with tracer.start_as_current_span(
        "complex_workflow",
        attributes={"workflow.id": workflow_id}
    ) as workflow_span:

        # Step 1: Load workflow configuration
        with tracer.start_as_current_span("load_workflow_config"):
            config = await load_config(workflow_id)
            workflow_span.set_attribute("workflow.config", str(config))

        # Step 2: Execute agent tasks
        with tracer.start_as_current_span("execute_agent_tasks"):
            results = []
            for i, task in enumerate(config["tasks"]):
                with tracer.start_as_current_span(
                    f"execute_task_{i}",
                    attributes={"task.type": task["type"]}
                ):
                    result = await execute_task(task)
                    results.append(result)

        # Step 3: Aggregate results
        with tracer.start_as_current_span("aggregate_results"):
            final_result = aggregate(results)
            workflow_span.set_attribute("workflow.status", "completed")

        return final_result

What this observability stack gives you:

  • Prometheus: Time-series metrics with alerting
  • Grafana: Visual dashboards for real-time monitoring
  • Jaeger: Distributed tracing across multi-agent workflows
  • Structured logs: Context-rich logging with correlation IDs

When something breaks at 3 AM, you’ll know exactly what, where, and why.


What the prototype-to-production rewrite actually looks like #

The pattern below is what the same code looks like at the prototype stage versus what it looks like after surviving security review, compliance review, and the first three weeks of production load. The differences are mostly not in the LangChain part - they’re in everything around it.

Prototype version (works on a laptop, fails in production) #

# Initial prototype (worked on laptop, failed in production)
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

model = ChatOpenAI(api_key="sk-...", model="gpt-4o")  # Hardcoded API key (security issue!)
prompt = ChatPromptTemplate.from_template("Analyze this document: {text}")
parser = StrOutputParser()
chain = prompt | model | parser  # LCEL composition

def process_document(document_text):
    return chain.invoke({"text": document_text})  # No error handling, no retries, no logging

# Works fine on the test set. Fails the moment real load hits.

The prototype’s failure modes are the predictable ones. One transient API error fails an entire batch because there’s no retry. Rate limits get hit on the first real load test because there’s no backoff or queueing. Debugging is impossible because there are no structured logs. The hardcoded API key fails the security review on day one. And the moment someone uses the prototype for cost forecasting, the absence of cost tracking turns a $200 weekend bill into a four-figure one.

Production version (what survives the audit) #

# Production architecture with resilience and observability
from app.core.resilience import ResilientAIService
from app.core.observability import ObservableAIAgent
from app.core.security import PIIProtectionService
from app.core.caching import SmartCacheService
from asgiref.sync import async_to_sync
from celery import Celery
import redis
import structlog
import os

logger = structlog.get_logger()

# Initialize services
resilient_service = ResilientAIService()
observable_agent = ObservableAIAgent("document_analyzer", "langchain")
pii_service = PIIProtectionService()
cache_service = SmartCacheService(redis.Redis.from_url(os.getenv("REDIS_URL")))

# Celery task queue for async processing
celery_app = Celery('tasks', broker=os.getenv("CELERY_BROKER_URL"))


@celery_app.task(
    bind=True,
    max_retries=3,
    retry_backoff=True,
    retry_jitter=True
)
def process_document_production(self, document_id: str, user_id: str):
    """
    Production document processing with full resilience.

    Features:
    - Automatic retry with exponential backoff
    - PII detection and redaction
    - Smart caching (avoid reprocessing same documents)
    - Comprehensive observability
    - Cost tracking per department
    """
    try:
        # Load document from storage
        document = load_document(document_id)

        # Check cache first (avoid redundant LLM calls)
        cache_key = f"doc_analysis:{document.content_hash}"
        cached_result = cache_service.get(cache_key)

        if cached_result is not None:
            logger.info(
                "document_analysis_cache_hit",
                document_id=document_id,
                cache_key=cache_key
            )
            return cached_result

        # Detect and redact PII before sending to external API
        pii_result = pii_service.redact_pii(document.text)

        if pii_result["pii_detected"]:
            logger.warning(
                "pii_detected_in_document",
                document_id=document_id,
                entities=pii_result["entities_found"]
            )

        # Execute agent with observability
        result = async_to_sync(observable_agent.execute_with_observability)(
            task=f"Analyze financial document: {pii_result['redacted_text'][:500]}...",
            user_id=user_id,
            department=document.department,
            priority="high"
        )

        # Cache result for 24 hours (financial docs change daily)
        cache_service.set(
            cache_key,
            result,
            expire_seconds=86400
        )

        # Store analysis in database
        store_analysis_result(
            document_id=document_id,
            analysis=result,
            tokens_used=result['tokens_used'],
            cost=result['estimated_cost']
        )

        return result

    except Exception as e:
        logger.error(
            "document_processing_failed",
            document_id=document_id,
            error=str(e),
            exc_info=True
        )

        # Retry task with exponential backoff
        raise self.retry(exc=e, countdown=min(2 ** self.request.retries, 300))

The shape of the wins is consistent across the rewrites we do: an order-of-magnitude drop in p95 latency once a content-hash cache lands in front of the LLM (most production document workloads see a cache hit rate around 60-80% within a week of normal use), an order-of-magnitude drop in error rate once retries with exponential backoff and a circuit breaker land between the application and the LLM provider, and a meaningful drop in cost-per-request once a cheaper model handles the simple classifications instead of GPT-4 doing everything. Specific numbers depend on workload, model mix, and current provider pricing - but the direction is reliable.

The latency win in particular is almost entirely about caching. The LLM call itself is the slow part of the pipeline; serving it from a content-hash cache when the same document comes through twice removes the slow part for most requests. Once that’s in place, the remaining latency reduction comes from running the document parsing async instead of blocking, pooling database connections, and replacing manual retries with structured backoff.
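That last piece - structured backoff - is small enough to sketch inline. Here's a stdlib-only version (the `with_backoff` name and the retriable-exception tuple are ours, for illustration; a library like tenacity gives you the same shape with less code):

```python
import random
import time


def with_backoff(fn, max_retries=3, base_delay=1.0, max_delay=300.0,
                 retriable=(TimeoutError, ConnectionError)):
    """Call fn(), retrying retriable errors with exponential backoff plus jitter."""
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except retriable:
            if attempt == max_retries:
                raise  # out of retries: surface the error to the caller
            # 2^attempt growth, capped at max_delay, with jitter so that
            # clients that failed together don't all retry together
            delay = min(base_delay * (2 ** attempt), max_delay)
            time.sleep(delay * random.uniform(0.5, 1.0))
```

The jitter is the part teams forget: without it, every client that hit the same provider outage retries in lockstep and recreates the spike that caused the failures.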

Cost optimization is mandatory #

The most expensive failure mode for an early production deployment is the team that doesn’t put a budget alert in place. GPT-4-class models on the wrong workload, no caching, no per-tenant token tracking, no automatic kill switch when daily spend exceeds a threshold - this is the configuration that turns a routine deployment into a four-figure surprise on the first invoice.

The fix is a router that picks the cheapest model that meets the task’s complexity bar:

# app/core/cost_optimization.py
import structlog

logger = structlog.get_logger()

class CostOptimizedLLMRouter:
    """
    Route requests to most cost-effective model based on complexity.

    Model selection logic:
    - Simple tasks (classification, extraction): GPT-4o mini
    - Medium tasks (summarization, analysis): Claude 3 Haiku
    - Complex tasks (reasoning, multi-step): GPT-4o

    Estimated savings depend on workload mix and current provider pricing.
    """

    MODEL_COSTS = {
        "gpt-4o":         {"input_per_1m": 2.50, "output_per_1m": 10.00},
        "gpt-4o-mini":    {"input_per_1m": 0.15, "output_per_1m": 0.60},
        "claude-haiku":   {"input_per_1m": 0.25, "output_per_1m": 1.25},
    }

    @staticmethod
    def select_model(
        task_complexity: str,
        max_cost_per_request: float = 0.10
    ) -> str:
        """
        Select most cost-effective model for task complexity.

        Args:
            task_complexity: "simple", "medium", or "complex"
            max_cost_per_request: Budget constraint per request

        Returns:
            Model identifier
        """
        if task_complexity == "simple":
            # Use cheapest model for simple tasks
            return "gpt-4o-mini"

        elif task_complexity == "medium":
            # Use a low-cost model for medium complexity
            return "claude-haiku"

        else:  # complex
            # Use GPT-4o only when necessary
            if max_cost_per_request >= 0.15:
                return "gpt-4o"
            else:
                # Fallback to cheaper model if budget constrained
                logger.warning(
                    "cost_budget_constraint",
                    requested_model="gpt-4o",
                    fallback_model="gpt-4o-mini",
                    max_cost=max_cost_per_request
                )
                return "gpt-4o-mini"

    @staticmethod
    def estimate_cost(
        model: str,
        prompt_tokens: int,
        max_completion_tokens: int
    ) -> float:
        """Estimate total cost for request before execution."""
        pricing = CostOptimizedLLMRouter.MODEL_COSTS.get(
            model, {"input_per_1m": 2.50, "output_per_1m": 10.00}
        )
        input_cost = (prompt_tokens / 1_000_000) * pricing["input_per_1m"]
        output_cost = (max_completion_tokens / 1_000_000) * pricing["output_per_1m"]
        return input_cost + output_cost

    @staticmethod
    async def execute_with_budget_control(
        task: str,
        user_department: str,
        monthly_budget: float
    ):
        """
        Execute task with automatic budget enforcement.

        Prevents budget overruns by:
        - Checking department's monthly spend before execution
        - Estimating cost before making LLM call
        - Blocking requests if budget exceeded
        """
        # Check current month's spend for department
        current_spend = await get_monthly_spend(user_department)

        if current_spend >= monthly_budget:
            raise BudgetExceededError(
                f"Department {user_department} has exceeded monthly budget: "
                f"${current_spend:.2f} / ${monthly_budget:.2f}"
            )

        # Estimate cost for this request
        estimated_cost = CostOptimizedLLMRouter.estimate_cost(
            model="gpt-4o",
            prompt_tokens=len(task.split()) * 1.3,  # Rough estimate
            max_completion_tokens=1000
        )

        # Default to the full model; downgrade below if budget requires it
        model = "gpt-4o"

        # Check if request would exceed budget
        if (current_spend + estimated_cost) > monthly_budget:
            logger.warning(
                "request_would_exceed_budget",
                department=user_department,
                current_spend=current_spend,
                estimated_cost=estimated_cost,
                monthly_budget=monthly_budget
            )

            # Offer cheaper alternative
            cheaper_model = "gpt-4o-mini"
            cheaper_cost = CostOptimizedLLMRouter.estimate_cost(
                model=cheaper_model,
                prompt_tokens=len(task.split()) * 1.3,
                max_completion_tokens=1000
            )

            if (current_spend + cheaper_cost) <= monthly_budget:
                logger.info(
                    "using_cheaper_model_alternative",
                    original_model="gpt-4o",
                    alternative_model=cheaper_model,
                    cost_savings=estimated_cost - cheaper_cost
                )
                model = cheaper_model
            else:
                raise BudgetExceededError("No available model fits within budget")

        # Execute with selected model
        # ... (actual execution logic)


# Usage in production
router = CostOptimizedLLMRouter()

# Automatic model selection based on complexity
model = router.select_model(
    task_complexity="simple",  # Classification task
    max_cost_per_request=0.05
)
# Returns: "gpt-4o-mini" (cheapest option)

# Budget-controlled execution
async def analyze_with_budget_control():
    await router.execute_with_budget_control(
        task="Analyze document...",
        user_department="finance",
        monthly_budget=5000.00  # $5K monthly cap
    )

A router like this typically takes a workload’s monthly LLM spend down by half to three-quarters once it’s tuned, with the bigger win on workloads where most requests are simple classification or extraction tasks that don’t need a frontier model.
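The daily kill switch mentioned earlier deserves the same day-one treatment as the monthly check. An in-process sketch (a production version would keep the counter in Redis so all workers share it, and would raise the same `BudgetExceededError` the router uses):

```python
class DailySpendKillSwitch:
    """Block all LLM calls once daily spend crosses a hard ceiling."""

    def __init__(self, daily_limit: float):
        self.daily_limit = daily_limit
        self.spent_today = 0.0  # reset by a midnight cron in practice

    def record(self, cost: float) -> None:
        """Record the actual cost of a completed LLM call."""
        self.spent_today += cost

    def check(self) -> None:
        """Raise before an LLM call if the daily ceiling is already hit."""
        if self.spent_today >= self.daily_limit:
            # In the real service this would be BudgetExceededError
            raise RuntimeError(
                f"Daily LLM spend ${self.spent_today:.2f} exceeds "
                f"limit ${self.daily_limit:.2f}; blocking further calls"
            )
```

The point of the hard ceiling is that it fails closed: a runaway retry loop or a scraping incident stops costing money the moment the threshold trips, rather than when someone reads the invoice.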

Caching is the second mandatory pattern #

The second thing every production AI workload needs is content-hash caching. The same document, the same prompt, the same result - and yet without a cache, every request hits the LLM again. Most document analysis workloads deduplicate to a 60-80% cache hit rate within a week, because users re-upload the same files, multiple users in the same org analyze the same source documents, and the same prompts get reused across sessions. Each cache hit is one fewer paid LLM call.

A simple Redis content-hash cache:

# app/core/caching.py
import hashlib
import pickle
from typing import Optional, Any
import redis
import structlog
from functools import wraps

logger = structlog.get_logger()
class SmartCacheService:
    """
    Intelligent caching for LLM responses with content-based deduplication.

    Caching strategies:
    1. Content-based hashing (same content = same cache key)
    2. TTL-based expiration (financial data: 24h, news: 1h)
    3. Semantic similarity caching (similar prompts use cached results)
    4. Multi-tier caching (Redis for hot data, S3 for cold data)
    """

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    @staticmethod
    def generate_cache_key(content: str, operation: str) -> str:
        """
        Generate deterministic cache key from content hash.

        Same content always produces same key, enabling deduplication.
        """
        content_hash = hashlib.sha256(content.encode()).hexdigest()
        return f"cache:{operation}:{content_hash[:16]}"

    def get(self, key: str) -> Optional[Any]:
        """Retrieve cached value if exists."""
        cached = self.redis.get(key)

        if cached:
            logger.info("cache_hit", cache_key=key)
            return pickle.loads(cached)

        logger.info("cache_miss", cache_key=key)
        return None

    def set(
        self,
        key: str,
        value: Any,
        expire_seconds: int = 86400  # 24 hours default
    ):
        """Store value in cache with TTL."""
        serialized = pickle.dumps(value)
        self.redis.setex(key, expire_seconds, serialized)

        logger.info(
            "cache_set",
            cache_key=key,
            expire_seconds=expire_seconds,
            size_bytes=len(serialized)
        )

    def smart_cache(
        self,
        operation: str,
        expire_seconds: int = 86400
    ):
        """
        Decorator for automatic caching of expensive operations.

        Usage:
            @cache_service.smart_cache("document_analysis", expire_seconds=3600)
            async def analyze_document(document_text: str):
                # Expensive LLM call here
                return result
        """
        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                # Generate cache key from function arguments
                cache_key_data = f"{func.__name__}:{args}:{kwargs}"
                cache_key = SmartCacheService.generate_cache_key(
                    cache_key_data,
                    operation
                )

                # Try to get from cache (compare to None so falsy results
                # like "" or [] still count as cache hits)
                cached_result = self.get(cache_key)
                if cached_result is not None:
                    return cached_result

                # Cache miss - execute function
                result = await func(*args, **kwargs)

                # Store in cache
                self.set(cache_key, result, expire_seconds)

                return result

            return wrapper
        return decorator


# Usage example
cache_service = SmartCacheService(redis.Redis.from_url("redis://localhost:6379/0"))

@cache_service.smart_cache("document_analysis", expire_seconds=86400)
async def analyze_document_with_caching(document_text: str):
    """
    Analyze document with automatic caching.

    If same document analyzed before (within 24h), returns cached result.
    Avoids redundant LLM API calls.
    """
    result = await langchain_agent.analyze(document_text)
    return result


# Production usage
async def analyze_uploaded_document(document):
    result = await analyze_document_with_caching(document.text)
    return result

# First call: Cache miss → LLM API call → Store result
# Second call (same document): Cache hit → Instant response

A cache layer this simple turns most of the cost-control story into a one-time investment. Set a reasonable TTL based on how often the source documents actually change, hash on content not filename, and invalidate by document version when the upstream data updates.
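The version-based invalidation is simpler than it sounds: fold the document version into the cache key, and a new version is just a different key while the stale entry ages out on its own TTL. A sketch (the `versioned_cache_key` helper and its parameters are illustrative):

```python
import hashlib


def versioned_cache_key(operation: str, content: str, doc_version: int) -> str:
    """Content hash plus document version.

    A new version produces a new key, so no explicit delete is needed -
    entries for the old version simply expire via their TTL.
    """
    content_hash = hashlib.sha256(content.encode()).hexdigest()[:16]
    return f"cache:{operation}:v{doc_version}:{content_hash}"
```

This also keeps invalidation race-free: there's no window where a delete has happened but the new value hasn't been written, because the two versions never share a key.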

Observability is the third mandatory pattern #

The third pattern is observability, and it’s the one teams skip until the first 3 AM incident teaches them why they shouldn’t have. Without structured logging, distributed tracing, and per-request token tracking, an LLM error is just “something went wrong somewhere in a chain of three model calls and four tools.” With those three things in place, the same error tells you which step failed, which model was responsible, what the prompt was, and what it cost.

The minimum stack is structured logs (grep by request ID), distributed tracing (find the bottleneck step in a multi-agent chain), and a cost meter that tracks tokens per request, per user, and per department. Here’s the composition:

# app/core/incident_response.py
import structlog
from opentelemetry import trace

logger = structlog.get_logger()
tracer = trace.get_tracer(__name__)


class IncidentResponseToolkit:
    """
    Tools for rapid incident response and debugging.

    When production breaks, you need answers in seconds, not hours.
    """

    @staticmethod
    async def debug_slow_request(request_id: str):
        """
        Analyze why specific request was slow.

        Provides:
        - Full distributed trace with timing breakdowns
        - LLM model used and token counts
        - Cache hit/miss information
        - Database query performance
        - External API call latencies
        """
        # Query Jaeger for distributed trace
        trace_data = await fetch_trace(request_id)

        analysis = {
            "request_id": request_id,
            "total_duration_ms": trace_data["duration"],
            "bottlenecks": []
        }

        # Identify bottlenecks
        for span in trace_data["spans"]:
            if span["duration"] > 1000:  # > 1 second
                analysis["bottlenecks"].append({
                    "operation": span["operation_name"],
                    "duration_ms": span["duration"],
                    "percentage_of_total": (span["duration"] / trace_data["duration"]) * 100,
                    "attributes": span["tags"]
                })

        # Sort by duration
        analysis["bottlenecks"].sort(key=lambda x: x["duration_ms"], reverse=True)

        return analysis

    @staticmethod
    async def debug_high_error_rate(time_range: str = "5m"):
        """
        Analyze why error rate is high.

        Returns:
        - Top error types and frequencies
        - Affected models and operations
        - Correlation with external API status
        - Recent deployments (potential cause)
        """
        # Query Prometheus for error metrics
        error_metrics = await query_prometheus(
            f'sum by (error_type, model) (rate(llm_requests_total{{status="error"}}[{time_range}]))'
        )

        analysis = {
            "time_range": time_range,
            "total_errors": sum(m["value"] for m in error_metrics),
            "error_breakdown": []
        }

        for metric in error_metrics:
            analysis["error_breakdown"].append({
                "error_type": metric["labels"]["error_type"],
                "model": metric["labels"]["model"],
                "errors_per_second": metric["value"],
                "sample_logs": await fetch_error_logs(
                    error_type=metric["labels"]["error_type"],
                    limit=3
                )
            })

        return analysis

    @staticmethod
    async def debug_budget_overrun(department: str):
        """
        Analyze why department exceeded budget.

        Identifies:
        - Top users by token consumption
        - Most expensive operations
        - Model usage breakdown
        - Cost trends over time
        """
        usage_data = await query_prometheus(
            f'sum by (user_id, operation, model) (increase(llm_cost_dollars_total{{department="{department}"}}[7d]))'
        )

        analysis = {
            "department": department,
            "total_spend_7d": sum(m["value"] for m in usage_data),
            "top_users": [],
            "top_operations": [],
            "model_breakdown": []
        }

        # Aggregate by user
        user_costs = {}
        for metric in usage_data:
            user_id = metric["labels"]["user_id"]
            user_costs[user_id] = user_costs.get(user_id, 0) + metric["value"]

        analysis["top_users"] = [
            {"user_id": k, "spend_7d": v}
            for k, v in sorted(user_costs.items(), key=lambda x: x[1], reverse=True)[:10]
        ]

        return analysis


# Usage during production incident
incident_toolkit = IncidentResponseToolkit()

async def inspect_slow_request(request_id: str):
    debug_info = await incident_toolkit.debug_slow_request(request_id)

    print(f"Total duration: {debug_info['total_duration_ms']}ms")
    print(f"Top bottleneck: {debug_info['bottlenecks'][0]['operation']} ({debug_info['bottlenecks'][0]['duration_ms']}ms)")

# Example output:
# Total duration: 12450ms
# Top bottleneck: call_openai_api (11200ms)
# → Root cause identified: OpenAI API latency spike (90% of request time)

An observability stack this complete pays for itself the first time an LLM provider has a regional outage at 3 AM. The MTTR difference between “tail logs and guess” and “open a trace, see the slow span, ack the alert” is the difference between a one-hour incident and a five-minute one.

Production deployment checklist #

Before we deploy to production, we validate every item on this checklist:

Security & Compliance:

  • All secrets stored in AWS Secrets Manager (never in environment variables)
  • API authentication enforced on all endpoints
  • Rate limiting configured per user tier
  • PII detection and redaction enabled
  • Audit logging for all data access
  • TLS 1.3 encryption for all traffic
  • Dependency vulnerability scanning passed (Snyk, OWASP)
  • Penetration testing completed (external security firm)
  • SOC 2 compliance audit documentation prepared

Reliability & Resilience:

  • Circuit breakers configured for external APIs
  • Retry logic with exponential backoff implemented
  • Graceful degradation patterns for API failures
  • Health check endpoints respond correctly
  • Liveness and readiness probes configured
  • Horizontal pod autoscaling configured (HPA)
  • Multi-region failover tested
  • Disaster recovery runbook documented

Observability & Monitoring:

  • Prometheus metrics exposed at /metrics endpoint
  • Grafana dashboards created for all key metrics
  • Jaeger distributed tracing configured
  • Structured logging with correlation IDs
  • PagerDuty alerts configured for critical issues
  • Runbooks created for common incidents
  • Log retention policy configured (90 days minimum)
  • Cost tracking dashboards created

Performance & Cost:

  • Load testing completed (2x expected peak load)
  • Caching strategy implemented (Redis)
  • Database connection pooling configured
  • Prompt optimization completed (reduce tokens by 50%+)
  • Model selection logic for cost optimization
  • Budget alerts configured per department
  • Token usage monitoring and anomaly detection
  • CDN configured for static assets

Deployment & Operations:

  • Infrastructure as Code (Terraform) validated
  • CI/CD pipeline configured with automated tests
  • Blue-green or canary deployment strategy
  • Automated rollback on health check failures
  • Database migration scripts tested
  • Backup and restore procedures tested
  • Incident response procedures documented
  • On-call rotation schedule established

Documentation & Training:

  • API documentation published (OpenAPI/Swagger)
  • Architecture diagrams created (Mermaid, Lucidchart)
  • Operational runbooks completed
  • Developer onboarding guide written
  • Customer-facing documentation published
  • Training sessions completed for support team
  • Post-mortem process documented

Each item exists because we’ve watched a deployment fail without it.
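Two of those items - liveness and readiness probes - are worth pinning down, because teams routinely conflate them. Liveness should stay trivial ("is the process up?"); readiness is allowed to fail when a dependency is down, which tells Kubernetes to stop routing traffic without restarting the pod. The aggregation behind a readiness endpoint is only this (check names are illustrative; each value would come from pinging the real dependency with a short timeout):

```python
def readiness_status(checks: dict) -> tuple:
    """Map per-dependency check results to an HTTP status code + body.

    Any failed dependency makes the pod not-ready (503), so the load
    balancer drains it; all-green returns 200 and traffic resumes.
    """
    failed = [name for name, ok in checks.items() if not ok]
    if failed:
        return 503, {"status": "not_ready", "failed_checks": failed}
    return 200, {"status": "ready", "failed_checks": []}
```

Wire this into a `/ready` route and keep `/health` as a bare 200; a readiness endpoint that checks nothing is indistinguishable from no readiness endpoint at all.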


A 90-day path from prototype to production #

If your prototype works and you’re staring at the gap between that and a real deployment, here’s the order we’d do it in. Not because the order is sacred - because doing security and cost controls last is the failure mode we see most often.

The first month is the foundation: API authentication and authorization, retries with exponential backoff and a circuit breaker, PII detection and redaction (the Presidio library is a reasonable starting point), and secrets management out of environment variables and into AWS Secrets Manager or HashiCorp Vault. This block is what survives the security review.
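To make the PII step concrete: Presidio is the right tool for real entity recognition, but the interface your pipeline needs is small. Here's a deliberately naive regex sketch with the same return shape as the redaction service used earlier (these patterns are illustrative only and nowhere near sufficient for a compliance review - names, addresses, and context-dependent PII need a real analyzer):

```python
import re

# Illustrative patterns only - production should use Presidio's analyzer
PII_PATTERNS = {
    "EMAIL": re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+"),
    "US_SSN": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
    "PHONE": re.compile(r"\b\d{3}[-.]\d{3}[-.]\d{4}\b"),
}


def redact_pii(text: str) -> dict:
    """Replace matched PII with placeholder tags before any external API call."""
    entities_found = []
    for label, pattern in PII_PATTERNS.items():
        if pattern.search(text):
            entities_found.append(label)
            text = pattern.sub(f"<{label}>", text)
    return {
        "redacted_text": text,
        "entities_found": entities_found,
        "pii_detected": bool(entities_found),
    }
```

The key architectural decision is the same regardless of the detector behind it: redaction happens before the text crosses the network boundary to the LLM provider, never after.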

The second month is observability and cost control: Prometheus and Grafana for metrics, Jaeger for distributed tracing across multi-step agent chains, content-hash caching with Redis to deduplicate identical requests, and per-department token tracking with budget alerts that fire before the invoice does. This block is what keeps the on-call rotation sane.

The third month is deployment and scale: a production Dockerfile, Kubernetes with horizontal pod autoscaling, a CI/CD pipeline with automated rollback, and a load test that pushes past the steady-state target before you ship. This block is what handles a traffic spike without melting.

After launch, the recurring work is post-mortem reviews on incidents, monthly cost analysis with model-selection adjustments, quarterly security audits, and an annual architecture review. None of it’s glamorous; all of it compounds.

Further reading #

Official documentation worth bookmarking: LangChain, LangGraph, CrewAI, FastAPI, and the Kubernetes production patterns guide.

For the security and observability layers specifically: OWASP API Security Top 10, Presidio for PII detection, Prometheus monitoring, Grafana dashboards, and OpenTelemetry tracing.

Related posts on this blog: LangChain Architecture: Production-Ready AI Agent Systems covers resilient chain composition and safety patterns. CrewAI Multi-Agent Systems Orchestration covers agent collaboration. Cost Optimization for LLM Applications goes deeper on token-management strategy.


If you’re working on this and want a second pair of eyes - whether on a security review, an architecture decision, or a cost-control problem that won’t sit still - we work on production AI deployments at JetThoughts. We’re not going to claim a list of compliance certifications we don’t hold. What we do is run code through the same audit-and-monitoring loop described above, and we’re happy to do a 45-minute review for free if you want a written second opinion before you ship.

Keywords: langchain production, crewai enterprise, scaling ai applications, langchain deployment, crewai kubernetes, ai agent architecture, production ai systems, enterprise ai deployment, langchain security, crewai monitoring, ai observability, fastapi langchain, docker kubernetes ai, production machine learning