Why AI Production Monitoring Is Critical

Deploying AI systems to production introduces unique challenges that traditional application monitoring doesn't address. Unlike deterministic software, AI models can hallucinate, degrade over time, consume unpredictable resources, and fail in subtle ways that only become apparent through careful observation.

The stakes are high:

  • A hallucinated response in a customer-facing chatbot damages trust
  • Unmonitored token usage can burn through budgets in hours
  • Gradual quality degradation goes unnoticed until users complain
  • Security vulnerabilities like prompt injection slip through without detection
  • Compliance violations occur when PII leakage isn't tracked

Production AI monitoring isn't optional—it's the foundation of reliable, cost-effective, and trustworthy AI applications.

Key Metrics to Track

Performance Metrics

Latency: Response time from request to completion. Critical for user experience.

  • Time to first token (TTFT): How long before the first response appears
  • Tokens per second: Throughput during streaming
  • Total request duration: End-to-end timing including preprocessing

import time
from openai import OpenAI

client = OpenAI()

def track_latency(prompt: str):
    start_time = time.time()
    first_token_time = None
    tokens_received = 0

    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )

    for chunk in response:
        # Count only chunks that carry content (the first chunk may hold just the role)
        delta = chunk.choices[0].delta if chunk.choices else None
        if delta and delta.content:
            if first_token_time is None:
                first_token_time = time.time()
            tokens_received += 1

    total_time = time.time() - start_time
    ttft = first_token_time - start_time if first_token_time else total_time

    return {
        "ttft_ms": ttft * 1000,
        "total_duration_ms": total_time * 1000,
        "tokens_per_second": tokens_received / total_time if total_time > 0 else 0
    }

Error Rates: Track failures by type and cause; a minimal tracking sketch follows the list below.

  • API errors: Rate limiting, timeouts, service unavailability
  • Validation errors: Malformed outputs, schema violations
  • Semantic errors: Responses that don't answer the question
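
A minimal sketch of counting failures by category and endpoint so each of the buckets above can be tracked and alerted on separately; the class name and category labels are illustrative assumptions, not a standard taxonomy:

from collections import Counter

class ErrorTracker:
    """Count failures by (endpoint, category) so per-bucket error rates can be computed."""

    def __init__(self):
        self.error_counts = Counter()
        self.request_counts = Counter()

    def record_request(self, endpoint: str):
        self.request_counts[endpoint] += 1

    def record_error(self, endpoint: str, category: str):
        # category: "api_error", "validation_error", or "semantic_error"
        self.error_counts[(endpoint, category)] += 1

    def error_rate(self, endpoint: str) -> float:
        total = self.request_counts[endpoint]
        errors = sum(count for (ep, _), count in self.error_counts.items() if ep == endpoint)
        return errors / total if total else 0.0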

Token Usage and Cost Metrics

Every AI request has a direct cost. Track usage granularly:

from dataclasses import dataclass
from typing import Optional

@dataclass
class TokenMetrics:
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int
    model: str
    cost_usd: float
    user_id: Optional[str] = None
    endpoint: Optional[str] = None

    @classmethod
    def from_response(cls, response, user_id: str, endpoint: str):
        usage = response.usage

        # Pricing as of 2025 (update regularly)
        pricing = {
            "gpt-4": {"input": 0.03 / 1000, "output": 0.06 / 1000},
            "gpt-4-turbo": {"input": 0.01 / 1000, "output": 0.03 / 1000},
            "gpt-3.5-turbo": {"input": 0.0005 / 1000, "output": 0.0015 / 1000}
        }

        model = response.model
        # Note: the API may return versioned model names (e.g. "gpt-4-0613"); normalize before lookup if needed
        prices = pricing.get(model, {"input": 0, "output": 0})

        cost = (usage.prompt_tokens * prices["input"] +
                usage.completion_tokens * prices["output"])

        return cls(
            prompt_tokens=usage.prompt_tokens,
            completion_tokens=usage.completion_tokens,
            total_tokens=usage.total_tokens,
            model=model,
            cost_usd=cost,
            user_id=user_id,
            endpoint=endpoint
        )

Critical cost metrics (an aggregation sketch follows this list):

  • Cost per request (average and P95)
  • Cost per user per day
  • Cost by endpoint/feature
  • Token efficiency (output tokens / input tokens)
  • Cache hit rate (if using caching)
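
A hedged sketch of computing several of these from the TokenMetrics records defined above; the grouping and percentile logic uses numpy, and the field names match the dataclass:

import numpy as np
from collections import defaultdict
from typing import Any, Dict, List

def summarize_costs(records: List[TokenMetrics]) -> Dict[str, Any]:
    """Aggregate per-request and per-user cost metrics from collected TokenMetrics."""
    costs = [r.cost_usd for r in records]

    cost_per_user = defaultdict(float)
    for r in records:
        if r.user_id:
            cost_per_user[r.user_id] += r.cost_usd

    total_prompt = sum(r.prompt_tokens for r in records)
    total_completion = sum(r.completion_tokens for r in records)

    return {
        "avg_cost_per_request": float(np.mean(costs)) if costs else 0.0,
        "p95_cost_per_request": float(np.percentile(costs, 95)) if costs else 0.0,
        "cost_per_user": dict(cost_per_user),
        "token_efficiency": total_completion / total_prompt if total_prompt else 0.0,
    }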

Quality Metrics

Quality monitoring is where AI monitoring diverges most from traditional software; a minimal automated-scoring sketch follows this list:

  • Semantic similarity: Compare responses to known good answers
  • User satisfaction: Track thumbs up/down, explicit feedback
  • Task success rate: Did the AI accomplish the intended goal?
  • Coherence scores: Automated evaluation of response quality
  • Hallucination detection: Flag factually incorrect or unsupported claims
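
One way to automate coherence scoring is to have a second model grade each response. This is a minimal LLM-as-judge sketch; the judge prompt, the 1-5 scale, and the choice of gpt-4-turbo as the grader are illustrative assumptions (it reuses the OpenAI client from the latency example above):

import re

JUDGE_PROMPT = """Rate the following answer to the user's question on a 1-5 scale for
coherence and relevance. Reply with a single integer only.

Question: {question}
Answer: {answer}"""

def score_coherence(question: str, answer: str) -> int:
    """Ask a grader model for a 1-5 coherence score (0 means the grade could not be parsed)."""
    result = client.chat.completions.create(
        model="gpt-4-turbo",
        messages=[{"role": "user", "content": JUDGE_PROMPT.format(question=question, answer=answer)}],
        temperature=0
    )
    match = re.search(r"[1-5]", result.choices[0].message.content)
    return int(match.group()) if match else 0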

Logging Best Practices

Comprehensive Request Logging

Log everything you need to debug, reproduce, and improve:

import json
import logging
from datetime import datetime
from typing import Any, Dict

class AIRequestLogger:
    def __init__(self, logger: logging.Logger):
        self.logger = logger

    def log_request(
        self,
        request_id: str,
        user_id: str,
        prompt: str,
        response: str,
        model: str,
        metrics: TokenMetrics,
        metadata: Dict[str, Any]
    ):
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "request_id": request_id,
            "user_id": user_id,
            "model": model,
            "prompt": prompt,
            "response": response,
            "prompt_tokens": metrics.prompt_tokens,
            "completion_tokens": metrics.completion_tokens,
            "cost_usd": metrics.cost_usd,
            "latency_ms": metadata.get("latency_ms"),
            "endpoint": metadata.get("endpoint"),
            "version": metadata.get("model_version"),
            "error": metadata.get("error"),
            "user_feedback": metadata.get("feedback")
        }

        # Use structured logging (JSON)
        self.logger.info(json.dumps(log_entry))

        # Also send to analytics platform
        self._send_to_analytics(log_entry)

    def _send_to_analytics(self, log_entry: Dict[str, Any]):
        # Integration with your monitoring platform
        # (e.g., PostHog, Mixpanel, custom time-series DB)
        pass

What to log:

  • Full prompts (sanitize PII first)
  • Complete responses
  • All metadata (model, version, temperature, etc.)
  • System prompts and few-shot examples
  • User context (session ID, user tier, A/B test variant)
  • Timing breakdowns (preprocessing, API call, postprocessing)

Privacy considerations:

  • Implement PII detection and redaction
  • Use separate retention policies for production logs vs. analytics
  • Ensure GDPR/CCPA compliance with user data deletion
  • Consider prompt anonymization for training data

User Feedback Logging

Capture explicit and implicit feedback:

class FeedbackTracker:
    def record_feedback(
        self,
        request_id: str,
        feedback_type: str,  # "thumbs_up", "thumbs_down", "report"
        feedback_text: Optional[str] = None,
        feedback_category: Optional[str] = None  # "incorrect", "harmful", "unhelpful"
    ):
        feedback_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "request_id": request_id,
            "feedback_type": feedback_type,
            "feedback_text": feedback_text,
            "feedback_category": feedback_category
        }

        # Join with original request for analysis
        self._correlate_with_request(request_id, feedback_entry)

    def record_implicit_feedback(
        self,
        request_id: str,
        user_action: str  # "copied_response", "regenerated", "edited_output"
    ):
        # Implicit signals about quality
        pass

Real-Time Alerting

Setting Intelligent Thresholds

AI systems need dynamic thresholds that account for natural variation:

from typing import List
import numpy as np

class AdaptiveAlertingSystem:
    def __init__(self, metric_name: str, window_minutes: int = 60):
        self.metric_name = metric_name
        self.window_minutes = window_minutes
        self.baseline_mean = None
        self.baseline_std = None

    def update_baseline(self, historical_values: List[float]):
        """Calculate baseline from historical data"""
        self.baseline_mean = np.mean(historical_values)
        self.baseline_std = np.std(historical_values)

    def check_anomaly(self, current_value: float, sensitivity: float = 3.0) -> bool:
        """Return True if value is anomalous (sensitivity = # of std devs)"""
        if self.baseline_mean is None or not self.baseline_std:
            return False  # no baseline yet, or zero variance

        z_score = abs((current_value - self.baseline_mean) / self.baseline_std)
        return z_score > sensitivity

    def trigger_alert(self, current_value: float, context: Dict[str, Any]):
        alert = {
            "metric": self.metric_name,
            "current_value": current_value,
            "baseline_mean": self.baseline_mean,
            "deviation": abs(current_value - self.baseline_mean),
            "timestamp": datetime.utcnow().isoformat(),
            "context": context
        }

        # Send to alerting system (PagerDuty, Opsgenie, etc.)
        self._send_alert(alert)

Metrics that should trigger alerts (a wiring sketch follows this list):

  • Latency P95 > 2x baseline
  • Error rate > 5% over 5-minute window
  • Cost per hour > 150% of daily average
  • User feedback thumbs down rate > 20%
  • Zero responses in last 5 minutes (complete failure)
  • Cache miss rate drops suddenly (cache failure)
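
A short sketch of combining the adaptive baselines above with the static rules in this list; the static values mirror the bullets, while the metric names are assumptions about your own pipeline:

from typing import Dict, List

def evaluate_alerts(current: Dict[str, float],
                    baselines: Dict[str, AdaptiveAlertingSystem]) -> List[str]:
    """Check static rules from the list above plus adaptive z-score anomalies."""
    alerts = []

    # Static rules (values taken from the list above)
    if current.get("error_rate", 0.0) > 0.05:
        alerts.append("error rate > 5%")
    if current.get("thumbs_down_rate", 0.0) > 0.20:
        alerts.append("thumbs down rate > 20%")
    if current.get("responses_last_5m", 1.0) == 0:
        alerts.append("zero responses in last 5 minutes")

    # Adaptive rules (e.g. latency or hourly cost vs. learned baseline)
    for name, detector in baselines.items():
        if detector.check_anomaly(current.get(name, 0.0)):
            alerts.append(f"{name} anomalous vs. baseline")

    return alerts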

Avoiding Alert Fatigue

Alert prioritization:

  • P0 (page immediately): Service down, cost runaway, security breach
  • P1 (urgent): High error rates, major quality degradation
  • P2 (important): Moderate latency increase, elevated costs
  • P3 (monitoring): Minor anomalies, trending issues

Escalation policies:

# Example escalation configuration
escalation_policies:
  - name: "AI Production Incidents"
    stages:
      - delay_minutes: 0
        notify: ["on-call-engineer"]
      - delay_minutes: 15
        notify: ["on-call-engineer", "team-lead"]
      - delay_minutes: 30
        notify: ["on-call-engineer", "team-lead", "engineering-manager"]

Quality Monitoring

Output Validation

Implement automated checks for response quality:

from typing import List, Dict
import re

class OutputValidator:
    def validate_response(self, response: str, context: Dict[str, Any]) -> Dict[str, bool]:
        """Run multiple validation checks"""
        checks = {
            "non_empty": len(response.strip()) > 0,
            "not_too_short": len(response.split()) >= 10,
            "not_too_long": len(response.split()) <= 1000,
            "no_repetition": not self._has_repetition(response),
            "proper_formatting": self._check_formatting(response),
            "on_topic": self._check_relevance(response, context),
            "no_harmful_content": not self._contains_harmful_content(response),
            "factually_grounded": self._check_citations(response, context)
        }

        return checks

    def _has_repetition(self, text: str) -> bool:
        """Detect if text repeats itself"""
        sentences = text.split(". ")
        if len(sentences) < 2:
            return False

        # Check if any sentence appears more than once
        return len(sentences) != len(set(sentences))

    def _check_formatting(self, text: str) -> bool:
        """Ensure response follows expected format"""
        # Check for balanced brackets, quotes, code blocks
        open_brackets = text.count("[") + text.count("(") + text.count("{")
        close_brackets = text.count("]") + text.count(")") + text.count("}")

        return open_brackets == close_brackets

    def _check_relevance(self, response: str, context: Dict[str, Any]) -> bool:
        """Use embedding similarity to check if response is on-topic"""
        # Implement semantic similarity check
        # Compare response embedding to question embedding
        return True  # Placeholder

    def _contains_harmful_content(self, text: str) -> bool:
        """Check for harmful, biased, or inappropriate content"""
        # Use moderation API or custom classifier
        return False  # Placeholder

    def _check_citations(self, response: str, context: Dict[str, Any]) -> bool:
        """Verify claims are supported by provided context"""
        # Advanced: Extract claims and verify against source documents
        return True  # Placeholder

Semantic Similarity Monitoring

Track whether responses are consistent over time:

from openai import OpenAI
import numpy as np
from scipy.spatial.distance import cosine

client = OpenAI()

class SemanticMonitor:
    def __init__(self):
        self.reference_embeddings = {}

    def get_embedding(self, text: str) -> List[float]:
        """Get embedding for text"""
        response = client.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        return response.data[0].embedding

    def set_reference_answer(self, question_id: str, reference_answer: str):
        """Store reference answer for a canonical question"""
        embedding = self.get_embedding(reference_answer)
        self.reference_embeddings[question_id] = embedding

    def check_similarity(self, question_id: str, current_answer: str) -> float:
        """Compare current answer to reference answer"""
        if question_id not in self.reference_embeddings:
            return 1.0  # No reference, assume OK

        current_embedding = self.get_embedding(current_answer)
        reference_embedding = self.reference_embeddings[question_id]

        # Cosine similarity (1 = identical, 0 = orthogonal)
        similarity = 1 - cosine(current_embedding, reference_embedding)

        if similarity < 0.8:  # Alert if answer has drifted significantly
            self._alert_drift(question_id, similarity)

        return similarity

User Satisfaction Tracking

Implement a satisfaction scoring system:

class SatisfactionTracker:
    def calculate_satisfaction_score(
        self,
        time_period: str = "24h"
    ) -> Dict[str, float]:
        """Calculate aggregate satisfaction metrics"""

        feedback_data = self._get_feedback_data(time_period)

        total_responses = len(feedback_data)
        if total_responses == 0:
            return {"nps_score": 0.0, "satisfaction_rate": 0.0,
                    "resolution_rate": 0.0, "total_responses": 0}

        positive_feedback = sum(1 for f in feedback_data if f["type"] == "thumbs_up")
        negative_feedback = sum(1 for f in feedback_data if f["type"] == "thumbs_down")

        # Net Promoter Score-style calculation
        nps_score = (positive_feedback - negative_feedback) / total_responses * 100

        # Satisfaction rate (% positive)
        satisfaction_rate = positive_feedback / total_responses * 100

        # Average resolution success (task completion)
        resolution_rate = self._calculate_resolution_rate(feedback_data)

        return {
            "nps_score": nps_score,
            "satisfaction_rate": satisfaction_rate,
            "resolution_rate": resolution_rate,
            "total_responses": total_responses
        }

Cost Monitoring and Optimization

Token Counting and Forecasting

Track token usage patterns to forecast costs:

class CostMonitor:
    def __init__(self):
        self.usage_history = []

    def record_usage(self, tokens: int, cost: float, user_id: str, endpoint: str):
        self.usage_history.append({
            "timestamp": datetime.utcnow(),
            "tokens": tokens,
            "cost": cost,
            "user_id": user_id,
            "endpoint": endpoint
        })

    def analyze_spending_patterns(self) -> Dict[str, Any]:
        """Identify cost trends and anomalies"""

        # Group by endpoint
        by_endpoint = {}
        for usage in self.usage_history:
            endpoint = usage["endpoint"]
            if endpoint not in by_endpoint:
                by_endpoint[endpoint] = {"tokens": 0, "cost": 0, "requests": 0}

            by_endpoint[endpoint]["tokens"] += usage["tokens"]
            by_endpoint[endpoint]["cost"] += usage["cost"]
            by_endpoint[endpoint]["requests"] += 1

        # Calculate cost per request for each endpoint
        for endpoint in by_endpoint:
            data = by_endpoint[endpoint]
            data["cost_per_request"] = data["cost"] / data["requests"]
            data["tokens_per_request"] = data["tokens"] / data["requests"]

        # Identify expensive endpoints
        sorted_endpoints = sorted(
            by_endpoint.items(),
            key=lambda x: x[1]["cost"],
            reverse=True
        )

        return {
            "total_cost": sum(u["cost"] for u in self.usage_history),
            "total_tokens": sum(u["tokens"] for u in self.usage_history),
            "by_endpoint": by_endpoint,
            "most_expensive_endpoints": sorted_endpoints[:5]
        }

    def forecast_monthly_cost(self) -> float:
        """Forecast monthly cost based on recent usage"""
        recent_usage = [u for u in self.usage_history
                       if (datetime.utcnow() - u["timestamp"]).days <= 7]

        if not recent_usage:
            return 0.0

        avg_daily_cost = sum(u["cost"] for u in recent_usage) / 7
        projected_monthly = avg_daily_cost * 30

        return projected_monthly

Caching Strategies

Implement semantic caching to reduce costs:

from hashlib import sha256
import json

class SemanticCache:
    def __init__(self, similarity_threshold: float = 0.95):
        self.cache = {}  # In production, use Redis or similar
        self.embeddings = {}
        self.similarity_threshold = similarity_threshold

    def get_cache_key(self, prompt: str, model: str) -> str:
        """Generate cache key from prompt"""
        key_data = {"prompt": prompt, "model": model}
        return sha256(json.dumps(key_data).encode()).hexdigest()

    def get_embedding(self, text: str) -> List[float]:
        """Embed the prompt for similarity comparison (reuses the OpenAI client from the examples above)"""
        response = client.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        return response.data[0].embedding

    def get_cached_response(self, prompt: str, model: str) -> Optional[str]:
        """Check if we have a cached response for similar prompt"""

        # Exact match check first
        exact_key = self.get_cache_key(prompt, model)
        if exact_key in self.cache:
            return self.cache[exact_key]["response"]

        # Semantic similarity check
        prompt_embedding = self.get_embedding(prompt)

        for cached_key, cached_data in self.cache.items():
            if cached_data["model"] != model:
                continue

            cached_embedding = self.embeddings.get(cached_key)
            if cached_embedding is None:
                continue

            similarity = 1 - cosine(prompt_embedding, cached_embedding)

            if similarity >= self.similarity_threshold:
                return cached_data["response"]

        return None

    def set_cached_response(self, prompt: str, model: str, response: str):
        """Cache a response"""
        cache_key = self.get_cache_key(prompt, model)
        self.cache[cache_key] = {
            "prompt": prompt,
            "model": model,
            "response": response,
            "timestamp": datetime.utcnow()
        }
        self.embeddings[cache_key] = self.get_embedding(prompt)

Caching effectiveness metrics (a tracking sketch follows this list):

  • Cache hit rate (% of requests served from cache)
  • Cost savings from caching
  • Average cache freshness
  • Cache eviction rate
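
A minimal sketch of tracking hit rate and estimated savings around the SemanticCache above; the average-cost figure used for savings is an assumption you would replace with your own measurements:

class CacheStats:
    """Track cache hit rate and rough cost savings."""

    def __init__(self, avg_cost_per_request_usd: float = 0.01):  # assumed average; measure your own
        self.hits = 0
        self.misses = 0
        self.avg_cost = avg_cost_per_request_usd

    def record(self, hit: bool):
        if hit:
            self.hits += 1
        else:
            self.misses += 1

    def hit_rate(self) -> float:
        total = self.hits + self.misses
        return self.hits / total if total else 0.0

    def estimated_savings_usd(self) -> float:
        # Each hit avoids roughly one model call
        return self.hits * self.avg_cost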

Model Switching Based on Complexity

Route requests to appropriate model tiers:

class SmartRouter:
    def __init__(self):
        # Optional: plug in a trained complexity classifier; the heuristics below are the fallback
        self.complexity_classifier = None

    def route_request(self, prompt: str) -> str:
        """Determine which model to use based on query complexity"""

        complexity = self._assess_complexity(prompt)

        if complexity == "simple":
            return "gpt-3.5-turbo"  # Fast and cheap
        elif complexity == "moderate":
            return "gpt-4-turbo"     # Balanced
        else:
            return "gpt-4"           # Most capable

    def _assess_complexity(self, prompt: str) -> str:
        """Assess query complexity"""

        # Simple heuristics
        word_count = len(prompt.split())
        has_code = "```" in prompt or any(kw in prompt.lower()
                                          for kw in ["code", "function", "debug"])
        requires_reasoning = any(kw in prompt.lower()
                                 for kw in ["analyze", "compare", "explain why"])

        if word_count < 20 and not has_code and not requires_reasoning:
            return "simple"
        elif word_count > 100 or (has_code and requires_reasoning):
            return "complex"
        else:
            return "moderate"

Security Monitoring

Prompt Injection Detection

Monitor for adversarial inputs attempting to manipulate the model:

import re
from typing import List, Tuple

class SecurityMonitor:
    def __init__(self):
        self.injection_patterns = [
            r"ignore\s+(previous|above|prior)\s+instructions",
            r"disregard\s+.+\s+instructions",
            r"system:\s+you\s+are",
            r"<\|im_start\|>",
            r"pretend\s+you\s+are",
            r"act\s+as\s+if",
            r"forget\s+(everything|all|previous)",
        ]

    def detect_prompt_injection(self, prompt: str) -> Tuple[bool, List[str]]:
        """Detect potential prompt injection attempts"""

        detected_patterns = []

        for pattern in self.injection_patterns:
            if re.search(pattern, prompt, re.IGNORECASE):
                detected_patterns.append(pattern)

        is_suspicious = len(detected_patterns) > 0

        if is_suspicious:
            self._log_security_event("prompt_injection_attempt", {
                "prompt": prompt,
                "patterns": detected_patterns
            })

        return is_suspicious, detected_patterns

    def check_response_for_leakage(self, response: str) -> bool:
        """Check if response leaked system prompt or internal info"""

        leakage_indicators = [
            "system prompt:",
            "my instructions are",
            "I was told to",
            "my system message",
        ]

        for indicator in leakage_indicators:
            if indicator.lower() in response.lower():
                self._log_security_event("prompt_leakage", {
                    "response": response,
                    "indicator": indicator
                })
                return True

        return False

PII Detection and Redaction

Prevent sensitive data from being logged or processed:

class PIIDetector:
    def __init__(self):
        self.patterns = {
            "email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
            "phone": r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b",
            "ssn": r"\b\d{3}-\d{2}-\d{4}\b",
            "credit_card": r"\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b",
            "ip_address": r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b",
        }

    def detect_pii(self, text: str) -> Dict[str, List[str]]:
        """Detect PII in text"""
        detected = {}

        for pii_type, pattern in self.patterns.items():
            matches = re.findall(pattern, text)
            if matches:
                detected[pii_type] = matches

        return detected

    def redact_pii(self, text: str) -> str:
        """Redact PII from text"""
        redacted = text

        for pii_type, pattern in self.patterns.items():
            redacted = re.sub(pattern, f"[REDACTED_{pii_type.upper()}]", redacted)

        return redacted

Abuse Detection

Monitor for users abusing the system:

class AbuseDetector:
    def __init__(self):
        self.user_request_counts = {}
        self.rate_limits = {
            "requests_per_minute": 60,
            "tokens_per_hour": 100000,
            "cost_per_day": 100.0
        }

    def check_rate_limit(self, user_id: str) -> Tuple[bool, Optional[str]]:
        """Check if user is within rate limits"""

        if user_id not in self.user_request_counts:
            self.user_request_counts[user_id] = {
                "requests_this_minute": 0,
                "tokens_this_hour": 0,
                "cost_today": 0.0
            }

        counts = self.user_request_counts[user_id]

        # Check each limit
        if counts["requests_this_minute"] >= self.rate_limits["requests_per_minute"]:
            return False, "requests_per_minute"

        if counts["tokens_this_hour"] >= self.rate_limits["tokens_per_hour"]:
            return False, "tokens_per_hour"

        if counts["cost_today"] >= self.rate_limits["cost_per_day"]:
            return False, "cost_per_day"

        return True, None
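
As written, the counters above are only read, never updated; a minimal companion sketch for recording usage after each completed request. Attaching it as a free function and the in-memory window handling are simplifying assumptions:

def record_usage_for_limits(detector: AbuseDetector, user_id: str, tokens: int, cost: float):
    """Increment the per-user counters that check_rate_limit reads."""
    counts = detector.user_request_counts.setdefault(user_id, {
        "requests_this_minute": 0,
        "tokens_this_hour": 0,
        "cost_today": 0.0
    })
    counts["requests_this_minute"] += 1
    counts["tokens_this_hour"] += tokens
    counts["cost_today"] += cost
    # In production, reset these buckets at minute/hour/day boundaries,
    # e.g. with expiring Redis keys, rather than keeping them in process memory.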

Tools and Platforms

LangSmith

LangChain's observability platform for AI applications:

from langsmith import Client
from langsmith.run_helpers import traceable
from openai import OpenAI

langsmith_client = Client()  # used for datasets/feedback APIs; tracing is configured via env vars
openai_client = OpenAI()

@traceable(run_type="llm", name="chat_completion")
def call_model(prompt: str, model: str = "gpt-4"):
    """LangSmith automatically traces this call"""
    response = openai_client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

LangSmith features:

  • Automatic tracing of LangChain applications
  • Visual debugging of chain execution
  • Dataset management for evaluations
  • Human-in-the-loop feedback collection
  • Cost and latency tracking per chain

Helicone

Open-source LLM observability focused on cost optimization:

# Add Helicone proxy to OpenAI calls
from openai import OpenAI

client = OpenAI(
    base_url="https://oai.hconeai.com/v1",
    default_headers={
        "Helicone-Auth": "Bearer sk-helicone-xxxxx",
        "Helicone-Cache-Enabled": "true",
        "Helicone-User-Id": user_id,
    }
)

Helicone features:

  • Zero-code integration via proxy
  • Built-in caching layer
  • Real-time cost tracking
  • Custom properties for segmentation
  • Prompt versioning

Phoenix (Arize AI)

Open-source tool for LLM observability and evaluation:

import phoenix as px
from phoenix.trace.openai import OpenAIInstrumentor

# Start Phoenix session
px.launch_app()

# Auto-instrument OpenAI
OpenAIInstrumentor().instrument()

# Your OpenAI calls are now automatically traced

Phoenix features:

  • Embedding visualizations
  • Retrieval evaluation (for RAG)
  • Drift detection
  • Performance analytics
  • Local-first (runs on your infrastructure)

Custom Solutions

For maximum control, build custom monitoring:

from prometheus_client import Counter, Histogram, Gauge
import time

# Define Prometheus metrics
ai_requests_total = Counter(
    "ai_requests_total",
    "Total AI requests",
    ["model", "endpoint", "status"]
)

ai_request_duration = Histogram(
    "ai_request_duration_seconds",
    "AI request duration",
    ["model", "endpoint"]
)

ai_token_usage = Counter(
    "ai_tokens_total",
    "Total tokens used",
    ["model", "token_type"]
)

ai_cost = Counter(
    "ai_cost_usd_total",
    "Total AI cost in USD",
    ["model", "endpoint"]
)

current_requests = Gauge(
    "ai_requests_in_progress",
    "Current requests in progress"
)

def monitored_ai_call(prompt: str, model: str, endpoint: str):
    """Wrapper that instruments AI calls with Prometheus metrics"""

    current_requests.inc()
    start_time = time.time()

    try:
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}]
        )

        # Record success metrics
        duration = time.time() - start_time
        ai_requests_total.labels(model=model, endpoint=endpoint, status="success").inc()
        ai_request_duration.labels(model=model, endpoint=endpoint).observe(duration)

        # Track tokens and cost
        usage = response.usage
        ai_token_usage.labels(model=model, token_type="prompt").inc(usage.prompt_tokens)
        ai_token_usage.labels(model=model, token_type="completion").inc(usage.completion_tokens)

        cost = calculate_cost(usage, model)  # pricing helper (see the TokenMetrics example earlier)
        ai_cost.labels(model=model, endpoint=endpoint).inc(cost)

        return response

    except Exception as e:
        ai_requests_total.labels(model=model, endpoint=endpoint, status="error").inc()
        raise
    finally:
        current_requests.dec()

Building Dashboards and Reports

Real-Time Dashboard Design

Essential dashboard components:

Overview panel:

  • Requests per minute (current vs. baseline)
  • Average latency (P50, P95, P99)
  • Error rate (last hour)
  • Current cost burn rate ($/hour)
  • Active users count

Quality panel:

  • User satisfaction score (last 24h)
  • Thumbs up/down ratio
  • Average response length
  • Top error types
  • Hallucination detection alerts

Cost panel:

  • Spend today vs. yesterday
  • Cost breakdown by model
  • Most expensive endpoints
  • Token efficiency trends
  • Projected monthly cost

Performance panel:

  • Latency percentiles over time
  • Throughput (requests/sec)
  • Cache hit rate
  • Model selection distribution

Creating Automated Reports

from datetime import datetime, timedelta
import pandas as pd

class ReportGenerator:
    def generate_daily_report(self, date: datetime) -> Dict[str, Any]:
        """Generate comprehensive daily report"""

        metrics = self._fetch_metrics(date, date + timedelta(days=1))

        report = {
            "date": date.strftime("%Y-%m-%d"),
            "summary": {
                "total_requests": metrics["request_count"],
                "total_cost": metrics["total_cost"],
                "avg_latency_ms": metrics["avg_latency"],
                "error_rate": metrics["error_rate"],
                "user_satisfaction": metrics["satisfaction_score"]
            },
            "highlights": self._identify_highlights(metrics),
            "concerns": self._identify_concerns(metrics),
            "recommendations": self._generate_recommendations(metrics)
        }

        return report

    def _identify_highlights(self, metrics: Dict) -> List[str]:
        """Identify positive trends"""
        highlights = []

        if metrics["satisfaction_score"] > 0.85:
            highlights.append("High user satisfaction (>85%)")

        if metrics["cache_hit_rate"] > 0.5:
            highlights.append(f"Excellent cache performance ({metrics['cache_hit_rate']:.1%} hit rate)")

        if metrics["cost_vs_yesterday"] < 0:
            highlights.append(f"Cost reduction: {abs(metrics['cost_vs_yesterday']):.1%}")

        return highlights

    def _identify_concerns(self, metrics: Dict) -> List[str]:
        """Identify issues requiring attention"""
        concerns = []

        if metrics["error_rate"] > 0.05:
            concerns.append(f"High error rate: {metrics['error_rate']:.1%}")

        if metrics["p95_latency"] > 5000:
            concerns.append(f"Slow P95 latency: {metrics['p95_latency']}ms")

        if metrics["cost_vs_yesterday"] > 0.5:
            concerns.append(f"Cost spike: +{metrics['cost_vs_yesterday']:.1%}")

        return concerns

    def _generate_recommendations(self, metrics: Dict) -> List[str]:
        """Generate actionable recommendations"""
        recommendations = []

        if metrics["cache_hit_rate"] < 0.3:
            recommendations.append("Consider implementing semantic caching to reduce costs")

        if metrics["gpt4_usage_rate"] > 0.8:
            recommendations.append("Review if all requests require GPT-4; consider routing simpler queries to GPT-3.5")

        return recommendations

Incident Response for AI Failures

Incident Classification

Critical (P0):

  • Complete service outage
  • Security breach (PII leak, prompt injection success)
  • Cost runaway (>10x normal spend)
  • Widespread harmful/incorrect responses

High (P1):

  • Elevated error rates (>10%)
  • Major quality degradation
  • Single high-value customer impacted

Medium (P2):

  • Latency spikes
  • Moderate cost increases
  • Specific feature/endpoint failing
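
A small sketch of mapping live metrics to the severity levels above so paging can be automated; the thresholds not stated in the lists (latency 2x, cost 1.5x) and the metric names are assumptions:

from typing import Dict

def classify_incident(metrics: Dict[str, float]) -> str:
    """Map current metrics to a severity level (P0-P3)."""
    if metrics.get("service_availability", 1.0) == 0.0 \
            or metrics.get("cost_vs_normal", 1.0) > 10 \
            or metrics.get("security_breach", 0.0) > 0:
        return "P0"
    if metrics.get("error_rate", 0.0) > 0.10 or metrics.get("major_quality_degradation", 0.0) > 0:
        return "P1"
    if metrics.get("latency_vs_baseline", 1.0) > 2 or metrics.get("cost_vs_normal", 1.0) > 1.5:
        return "P2"
    return "P3"  # below incident threshold; monitor only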

Incident Response Runbook

# AI Incident Response Runbook

## Initial Assessment (5 minutes)
1. Check dashboard for obvious anomalies
2. Review recent deployments/configuration changes
3. Verify external dependencies (OpenAI status page)
4. Determine impact scope (% of users affected)
5. Assign severity level

## Immediate Mitigation (15 minutes)
- **For cost runaway:** Implement emergency rate limiting
- **For quality issues:** Rollback to last known good model/prompt
- **For outage:** Activate fallback responses or degraded mode
- **For security:** Disable affected endpoint, enable stricter filtering

## Investigation (30-60 minutes)
1. Pull logs for affected timeframe
2. Sample failing requests
3. Compare to baseline successful requests
4. Identify root cause pattern
5. Test hypothesis in staging

## Resolution
1. Deploy fix to staging
2. Validate fix with test cases
3. Gradual rollout to production (10% → 50% → 100%)
4. Monitor metrics during rollout
5. Confirm resolution

## Post-Incident
1. Write incident report (root cause, timeline, resolution)
2. Update runbook with lessons learned
3. Implement preventive measures
4. Add new monitoring/alerts if gap identified

Post-Incident Analysis

class IncidentAnalyzer:
    def analyze_incident(
        self,
        incident_start: datetime,
        incident_end: datetime,
        incident_type: str
    ) -> Dict[str, Any]:
        """Analyze an incident to prevent recurrence"""

        # Gather data from incident timeframe
        incident_logs = self._fetch_logs(incident_start, incident_end)
        baseline_logs = self._fetch_logs(
            incident_start - timedelta(hours=24),
            incident_start
        )

        analysis = {
            "duration_minutes": (incident_end - incident_start).total_seconds() / 60,
            "affected_requests": len(incident_logs),
            "root_cause": self._identify_root_cause(incident_logs),
            "contributing_factors": self._find_contributing_factors(incident_logs),
            "similar_past_incidents": self._find_similar_incidents(incident_type),
            "prevention_recommendations": []
        }

        # Generate prevention recommendations
        if analysis["root_cause"] == "prompt_injection":
            analysis["prevention_recommendations"].append(
                "Implement stricter input validation"
            )
            analysis["prevention_recommendations"].append(
                "Add prompt injection detection to pre-processing"
            )

        return analysis

A/B Testing and Gradual Rollouts

Feature Flag System for AI

import hashlib

class AIFeatureFlags:
    def __init__(self):
        self.flags = {
            "new_system_prompt": {"enabled": False, "rollout_percentage": 0},
            "gpt4_turbo": {"enabled": True, "rollout_percentage": 50},
            "semantic_cache": {"enabled": True, "rollout_percentage": 100}
        }

    def is_enabled_for_user(self, flag_name: str, user_id: str) -> bool:
        """Determine if feature is enabled for user"""

        if flag_name not in self.flags:
            return False

        flag = self.flags[flag_name]

        if not flag["enabled"]:
            return False

        if flag["rollout_percentage"] == 100:
            return True

        # Consistent hashing to assign users to variants
        user_hash = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
        user_percentage = (user_hash % 100)

        return user_percentage < flag["rollout_percentage"]

A/B Test Framework

class ABTest:
    def __init__(self, test_name: str, variants: List[str]):
        self.test_name = test_name
        self.variants = variants
        self.results = {variant: [] for variant in variants}

    def assign_variant(self, user_id: str) -> str:
        """Assign user to variant"""
        user_hash = int(hashlib.md5(f"{user_id}{self.test_name}".encode()).hexdigest(), 16)
        variant_index = user_hash % len(self.variants)
        return self.variants[variant_index]

    def record_result(
        self,
        variant: str,
        user_id: str,
        success: bool,
        metrics: Dict[str, float]
    ):
        """Record test result"""
        self.results[variant].append({
            "user_id": user_id,
            "success": success,
            "metrics": metrics,
            "timestamp": datetime.utcnow()
        })

    def analyze_results(self) -> Dict[str, Any]:
        """Analyze A/B test results"""

        analysis = {}

        for variant, results in self.results.items():
            if not results:
                continue

            success_rate = sum(1 for r in results if r["success"]) / len(results)
            avg_latency = np.mean([r["metrics"]["latency"] for r in results])
            avg_cost = np.mean([r["metrics"]["cost"] for r in results])

            analysis[variant] = {
                "sample_size": len(results),
                "success_rate": success_rate,
                "avg_latency_ms": avg_latency,
                "avg_cost_usd": avg_cost
            }

        # Determine winner
        winner = max(analysis.items(), key=lambda x: x[1]["success_rate"])

        return {
            "variants": analysis,
            "winner": winner[0],
            "confidence": self._calculate_confidence(analysis)
        }

Gradual Rollout Strategy

class GradualRollout:
    def __init__(self, feature_name: str):
        self.feature_name = feature_name
        self.rollout_schedule = [
            {"percentage": 5, "duration_hours": 2, "success_threshold": 0.95},
            {"percentage": 25, "duration_hours": 4, "success_threshold": 0.95},
            {"percentage": 50, "duration_hours": 12, "success_threshold": 0.95},
            {"percentage": 100, "duration_hours": 0, "success_threshold": 0.95}
        ]
        self.current_stage = 0

    def should_proceed_to_next_stage(self, metrics: Dict[str, float]) -> bool:
        """Determine if rollout should continue to next stage"""

        if self.current_stage >= len(self.rollout_schedule):
            return False

        stage = self.rollout_schedule[self.current_stage]

        # Check success criteria
        if metrics["success_rate"] < stage["success_threshold"]:
            self._trigger_rollback("Success rate below threshold")
            return False

        if metrics["error_rate"] > 0.05:
            self._trigger_rollback("Error rate too high")
            return False

        return True

    def advance_rollout(self):
        """Move to next rollout stage"""
        if self.current_stage + 1 >= len(self.rollout_schedule):
            return  # already fully rolled out
        self.current_stage += 1
        new_percentage = self.rollout_schedule[self.current_stage]["percentage"]
        self._update_feature_flag(self.feature_name, new_percentage)

Compliance and Audit Trails

Complete Request Logging for Compliance

class ComplianceLogger:
    def log_request_for_compliance(
        self,
        request_id: str,
        user_id: str,
        prompt: str,
        response: str,
        model: str,
        metadata: Dict[str, Any]
    ):
        """Log request with compliance requirements in mind"""

        # Detect and flag PII
        pii_detector = PIIDetector()
        prompt_pii = pii_detector.detect_pii(prompt)
        response_pii = pii_detector.detect_pii(response)

        compliance_log = {
            "request_id": request_id,
            "timestamp": datetime.utcnow().isoformat(),
            "user_id": user_id,
            "prompt": prompt,  # Store original
            "prompt_redacted": pii_detector.redact_pii(prompt),
            "response": response,
            "response_redacted": pii_detector.redact_pii(response),
            "model": model,
            "model_version": metadata.get("model_version"),
            "prompt_version": metadata.get("prompt_version"),
            "pii_detected_in_prompt": list(prompt_pii.keys()) if prompt_pii else [],
            "pii_detected_in_response": list(response_pii.keys()) if response_pii else [],
            "content_moderation_flag": metadata.get("content_flag"),
            "user_consent": metadata.get("user_consent", False),
            "retention_policy": "90_days",
            "jurisdiction": metadata.get("user_jurisdiction", "US")
        }

        # Write to append-only compliance log
        self._write_to_compliance_store(compliance_log)

        # If PII detected, also log to security monitoring
        if prompt_pii or response_pii:
            self._alert_pii_processing(request_id, prompt_pii, response_pii)

    def generate_audit_report(
        self,
        start_date: datetime,
        end_date: datetime
    ) -> Dict[str, Any]:
        """Generate compliance audit report"""

        logs = self._fetch_compliance_logs(start_date, end_date)

        report = {
            "period": f"{start_date.date()} to {end_date.date()}",
            "total_requests": len(logs),
            "requests_with_pii": sum(1 for log in logs
                                    if log["pii_detected_in_prompt"] or
                                       log["pii_detected_in_response"]),
            "content_flags": sum(1 for log in logs
                                if log.get("content_moderation_flag")),
            "models_used": list(set(log["model"] for log in logs)),
            "jurisdictions": list(set(log["jurisdiction"] for log in logs)),
            "data_retention_compliance": self._check_retention_compliance(logs)
        }

        return report

Summary

Monitoring AI systems in production requires a multi-faceted approach that goes far beyond traditional application monitoring. The non-deterministic nature of AI, combined with its potential for subtle failures, high costs, and security risks, demands comprehensive observability across performance, quality, cost, and security dimensions.

Key takeaways:

  • Implement end-to-end request logging with PII protection
  • Track both traditional metrics (latency, errors) and AI-specific metrics (quality scores, token usage)
  • Build intelligent alerting that adapts to AI's variability
  • Monitor costs proactively and implement optimization strategies
  • Watch for security issues like prompt injection and data leakage
  • Use specialized AI observability tools (LangSmith, Helicone, Phoenix) or build custom solutions
  • Create comprehensive dashboards that give visibility into all aspects of your AI system
  • Prepare incident response procedures specific to AI failures
  • Use A/B testing and gradual rollouts to de-risk changes
  • Maintain compliance-ready audit trails

The investment in robust monitoring pays dividends through improved reliability, reduced costs, better user experiences, and the confidence to iterate quickly on your AI systems.