Monitoring AI Systems in Production
Enterprise-grade monitoring, alerting, and observability for production AI systems. Learn to track performance, costs, quality, and security at scale.
Why AI Production Monitoring is Critical
Deploying AI systems to production introduces unique challenges that traditional application monitoring doesn't address. Unlike deterministic software, AI models can hallucinate, degrade over time, consume unpredictable resources, and fail in subtle ways that only become apparent through careful observation.
The stakes are high:
- A hallucinated response in a customer-facing chatbot damages trust
- Unmonitored token usage can burn through budgets in hours
- Gradual quality degradation goes unnoticed until users complain
- Security vulnerabilities like prompt injection slip through without detection
- Compliance violations occur when PII leakage isn't tracked
Production AI monitoring isn't optional—it's the foundation of reliable, cost-effective, and trustworthy AI applications.
Key Metrics to Track
Performance Metrics
Latency: Response time from request to completion. Critical for user experience.
- Time to first token (TTFT): How long before the first response appears
- Tokens per second: Throughput during streaming
- Total request duration: End-to-end timing including preprocessing
import time
from openai import OpenAI
client = OpenAI()
def track_latency(prompt: str):
start_time = time.time()
first_token_time = None
tokens_received = 0
response = client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}],
stream=True
)
for chunk in response:
if first_token_time is None:
first_token_time = time.time()
tokens_received += 1
total_time = time.time() - start_time
ttft = first_token_time - start_time if first_token_time else total_time
return {
"ttft_ms": ttft * 1000,
"total_duration_ms": total_time * 1000,
"tokens_per_second": tokens_received / total_time if total_time > 0 else 0
}
Error Rates: Track failures by type and cause.
- API errors: Rate limiting, timeouts, service unavailability
- Validation errors: Malformed outputs, schema violations
- Semantic errors: Responses that don't answer the question
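As a minimal sketch (the record_error helper and the three category names below are illustrative, not from any specific library), failures can be counted per category so that each error rate can be tracked separately:
from collections import Counter

ERROR_CATEGORIES = {"api_error", "validation_error", "semantic_error"}
error_counts = Counter()

def record_error(category: str):
    """Count one failure under the categories listed above."""
    error_counts[category if category in ERROR_CATEGORIES else "unknown"] += 1

def error_rate(total_requests: int) -> float:
    """Overall error rate across all categories."""
    return sum(error_counts.values()) / total_requests if total_requests else 0.0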
Token Usage and Cost Metrics
Every AI request has a direct cost. Track usage granularly:
from dataclasses import dataclass
from typing import Optional
@dataclass
class TokenMetrics:
prompt_tokens: int
completion_tokens: int
total_tokens: int
model: str
cost_usd: float
user_id: Optional[str] = None
endpoint: Optional[str] = None
@classmethod
def from_response(cls, response, user_id: str, endpoint: str):
usage = response.usage
# Pricing as of 2025 (update regularly)
pricing = {
"gpt-4": {"input": 0.03 / 1000, "output": 0.06 / 1000},
"gpt-4-turbo": {"input": 0.01 / 1000, "output": 0.03 / 1000},
"gpt-3.5-turbo": {"input": 0.0005 / 1000, "output": 0.0015 / 1000}
}
model = response.model
prices = pricing.get(model, {"input": 0, "output": 0})
cost = (usage.prompt_tokens * prices["input"] +
usage.completion_tokens * prices["output"])
return cls(
prompt_tokens=usage.prompt_tokens,
completion_tokens=usage.completion_tokens,
total_tokens=usage.total_tokens,
model=model,
cost_usd=cost,
user_id=user_id,
endpoint=endpoint
)
Critical cost metrics:
- Cost per request (average and P95)
- Cost per user per day
- Cost by endpoint/feature
- Token efficiency (output tokens / input tokens)
- Cache hit rate (if using caching)
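A rough sketch of aggregating the per-request TokenMetrics records above into these views; the grouping and percentile logic is illustrative rather than any particular platform's API:
from collections import defaultdict
from typing import List
import numpy as np

def summarize_costs(records: List[TokenMetrics]) -> dict:
    """Aggregate TokenMetrics records into the cost views listed above."""
    costs = [r.cost_usd for r in records]
    cost_by_user = defaultdict(float)
    cost_by_endpoint = defaultdict(float)
    for r in records:
        cost_by_user[r.user_id] += r.cost_usd
        cost_by_endpoint[r.endpoint] += r.cost_usd
    total_input = sum(r.prompt_tokens for r in records)
    total_output = sum(r.completion_tokens for r in records)
    return {
        "avg_cost_per_request": float(np.mean(costs)) if costs else 0.0,
        "p95_cost_per_request": float(np.percentile(costs, 95)) if costs else 0.0,
        "cost_by_user": dict(cost_by_user),
        "cost_by_endpoint": dict(cost_by_endpoint),
        "token_efficiency": total_output / total_input if total_input else 0.0,
    }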
Quality Metrics
Quality monitoring is where AI monitoring diverges most from traditional software:
- Semantic similarity: Compare responses to known good answers
- User satisfaction: Track thumbs up/down, explicit feedback
- Task success rate: Did the AI accomplish the intended goal?
- Coherence scores: Automated evaluation of response quality
- Hallucination detection: Flag factually incorrect or unsupported claims
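These signals are most useful when recorded together per request so they can be aggregated later. A minimal sketch of such a record (field names are illustrative):
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class QualityRecord:
    """Per-request quality signals for later aggregation."""
    request_id: str
    similarity_to_reference: Optional[float] = None  # semantic similarity to a known good answer
    user_rating: Optional[str] = None                # "thumbs_up" / "thumbs_down"
    task_succeeded: Optional[bool] = None            # did the AI accomplish the intended goal?
    coherence_score: Optional[float] = None          # automated evaluation result
    hallucination_flagged: bool = False              # unsupported or incorrect claims detected

def task_success_rate(records: List[QualityRecord]) -> float:
    """Share of requests with a known outcome that succeeded."""
    known = [r for r in records if r.task_succeeded is not None]
    return sum(r.task_succeeded for r in known) / len(known) if known else 0.0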
Logging Best Practices
Comprehensive Request Logging
Log everything you need to debug, reproduce, and improve:
import json
import logging
from datetime import datetime
from typing import Any, Dict
class AIRequestLogger:
def __init__(self, logger: logging.Logger):
self.logger = logger
def log_request(
self,
request_id: str,
user_id: str,
prompt: str,
response: str,
model: str,
metrics: TokenMetrics,
metadata: Dict[str, Any]
):
log_entry = {
"timestamp": datetime.utcnow().isoformat(),
"request_id": request_id,
"user_id": user_id,
"model": model,
"prompt": prompt,
"response": response,
"prompt_tokens": metrics.prompt_tokens,
"completion_tokens": metrics.completion_tokens,
"cost_usd": metrics.cost_usd,
"latency_ms": metadata.get("latency_ms"),
"endpoint": metadata.get("endpoint"),
"version": metadata.get("model_version"),
"error": metadata.get("error"),
"user_feedback": metadata.get("feedback")
}
# Use structured logging (JSON)
self.logger.info(json.dumps(log_entry))
# Also send to analytics platform
self._send_to_analytics(log_entry)
def _send_to_analytics(self, log_entry: Dict[str, Any]):
# Integration with your monitoring platform
# (e.g., PostHog, Mixpanel, custom time-series DB)
pass
What to log:
- Full prompts (sanitize PII first)
- Complete responses
- All metadata (model, version, temperature, etc.)
- System prompts and few-shot examples
- User context (session ID, user tier, A/B test variant)
- Timing breakdowns (preprocessing, API call, postprocessing)
Privacy considerations:
- Implement PII detection and redaction
- Use separate retention policies for production logs vs. analytics
- Ensure GDPR/CCPA compliance with user data deletion
- Consider prompt anonymization for training data
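A minimal sketch of enforcing retention and deletion obligations; the log_store interface here is hypothetical, standing in for whatever database actually holds your request logs:
from datetime import datetime, timedelta, timezone
from typing import Optional

RETENTION = {
    "production_logs": timedelta(days=30),    # raw prompts/responses, shortest retention
    "analytics_events": timedelta(days=365),  # aggregated, PII-redacted metrics
}

def purge_expired(log_store, now: Optional[datetime] = None):
    """Delete records older than their category's retention window."""
    now = now or datetime.now(timezone.utc)
    for category, ttl in RETENTION.items():
        log_store.delete_where(category=category, older_than=now - ttl)

def delete_user_data(log_store, user_id: str):
    """Honor a GDPR/CCPA deletion request across all log categories."""
    for category in RETENTION:
        log_store.delete_where(category=category, user_id=user_id)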
User Feedback Logging
Capture explicit and implicit feedback:
class FeedbackTracker:
def record_feedback(
self,
request_id: str,
feedback_type: str, # "thumbs_up", "thumbs_down", "report"
feedback_text: Optional[str] = None,
feedback_category: Optional[str] = None # "incorrect", "harmful", "unhelpful"
):
feedback_entry = {
"timestamp": datetime.utcnow().isoformat(),
"request_id": request_id,
"feedback_type": feedback_type,
"feedback_text": feedback_text,
"feedback_category": feedback_category
}
# Join with original request for analysis
self._correlate_with_request(request_id, feedback_entry)
def record_implicit_feedback(
self,
request_id: str,
user_action: str # "copied_response", "regenerated", "edited_output"
):
# Implicit signals about quality
pass
Real-Time Alerting
Setting Intelligent Thresholds
AI systems need dynamic thresholds that account for natural variation:
from typing import List
import numpy as np
class AdaptiveAlertingSystem:
def __init__(self, metric_name: str, window_minutes: int = 60):
self.metric_name = metric_name
self.window_minutes = window_minutes
self.baseline_mean = None
self.baseline_std = None
def update_baseline(self, historical_values: List[float]):
"""Calculate baseline from historical data"""
self.baseline_mean = np.mean(historical_values)
self.baseline_std = np.std(historical_values)
def check_anomaly(self, current_value: float, sensitivity: float = 3.0) -> bool:
"""Return True if value is anomalous (sensitivity = # of std devs)"""
        if self.baseline_mean is None or not self.baseline_std:
            return False
z_score = abs((current_value - self.baseline_mean) / self.baseline_std)
return z_score > sensitivity
def trigger_alert(self, current_value: float, context: Dict[str, Any]):
alert = {
"metric": self.metric_name,
"current_value": current_value,
"baseline_mean": self.baseline_mean,
"deviation": abs(current_value - self.baseline_mean),
"timestamp": datetime.utcnow().isoformat(),
"context": context
}
# Send to alerting system (PagerDuty, Opsgenie, etc.)
self._send_alert(alert)
Metrics that should trigger alerts:
- Latency P95 > 2x baseline
- Error rate > 5% over 5-minute window
- Cost per hour > 150% of the average hourly cost over the past day
- User feedback thumbs down rate > 20%
- Zero responses in last 5 minutes (complete failure)
- Cache miss rate drops suddenly (cache failure)
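Alongside the adaptive baseline above, these hard limits can be expressed as simple rules evaluated against each metrics window. The rule set below is a sketch of the list above, with metric key names chosen for illustration:
from typing import Any, Callable, Dict, List, Tuple

ALERT_RULES: List[Tuple[str, Callable[[Dict[str, Any]], bool]]] = [
    ("latency_p95_2x_baseline", lambda m: m["latency_p95_ms"] > 2 * m["baseline_latency_p95_ms"]),
    ("error_rate_over_5pct", lambda m: m["error_rate_5m"] > 0.05),
    ("cost_per_hour_spike", lambda m: m["cost_per_hour"] > 1.5 * m["avg_hourly_cost"]),
    ("thumbs_down_over_20pct", lambda m: m["thumbs_down_rate"] > 0.20),
    ("no_responses_5m", lambda m: m["responses_last_5m"] == 0),
]

def evaluate_rules(metrics: Dict[str, Any]) -> List[str]:
    """Return the names of all rules that fired for this metrics window."""
    return [name for name, fired in ALERT_RULES if fired(metrics)]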
Avoiding Alert Fatigue
Alert prioritization:
- P0 (page immediately): Service down, cost runaway, security breach
- P1 (urgent): High error rates, major quality degradation
- P2 (important): Moderate latency increase, elevated costs
- P3 (monitoring): Minor anomalies, trending issues
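One way to keep these priorities consistent is a static mapping from alert name to priority and notification route; the channel names below are placeholders:
ALERT_PRIORITIES = {
    "no_responses_5m": ("P0", "pagerduty"),           # complete failure
    "cost_per_hour_spike": ("P0", "pagerduty"),       # potential cost runaway
    "error_rate_over_5pct": ("P1", "pagerduty"),
    "thumbs_down_over_20pct": ("P1", "slack-oncall"),
    "latency_p95_2x_baseline": ("P2", "slack-oncall"),
    "cache_miss_rate_spike": ("P2", "slack-team"),
}

def route_alert(alert_name: str) -> tuple:
    """Look up priority and channel, defaulting to low-priority monitoring."""
    return ALERT_PRIORITIES.get(alert_name, ("P3", "dashboard-only"))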
Escalation policies:
# Example escalation configuration
escalation_policies:
- name: "AI Production Incidents"
stages:
- delay_minutes: 0
notify: ["on-call-engineer"]
- delay_minutes: 15
notify: ["on-call-engineer", "team-lead"]
- delay_minutes: 30
notify: ["on-call-engineer", "team-lead", "engineering-manager"]
Quality Monitoring
Output Validation
Implement automated checks for response quality:
from typing import List, Dict
import re
class OutputValidator:
def validate_response(self, response: str, context: Dict[str, Any]) -> Dict[str, bool]:
"""Run multiple validation checks"""
checks = {
"non_empty": len(response.strip()) > 0,
"not_too_short": len(response.split()) >= 10,
"not_too_long": len(response.split()) <= 1000,
"no_repetition": not self._has_repetition(response),
"proper_formatting": self._check_formatting(response),
"on_topic": self._check_relevance(response, context),
"no_harmful_content": not self._contains_harmful_content(response),
"factually_grounded": self._check_citations(response, context)
}
return checks
def _has_repetition(self, text: str) -> bool:
"""Detect if text repeats itself"""
sentences = text.split(". ")
if len(sentences) < 2:
return False
# Check if any sentence appears more than once
return len(sentences) != len(set(sentences))
def _check_formatting(self, text: str) -> bool:
"""Ensure response follows expected format"""
        # Check that each bracket type and code fences are balanced
        pairs = [("[", "]"), ("(", ")"), ("{", "}")]
        if any(text.count(open_b) != text.count(close_b) for open_b, close_b in pairs):
            return False
        return text.count("```") % 2 == 0
def _check_relevance(self, response: str, context: Dict[str, Any]) -> bool:
"""Use embedding similarity to check if response is on-topic"""
# Implement semantic similarity check
# Compare response embedding to question embedding
return True # Placeholder
def _contains_harmful_content(self, text: str) -> bool:
"""Check for harmful, biased, or inappropriate content"""
# Use moderation API or custom classifier
return False # Placeholder
def _check_citations(self, response: str, context: Dict[str, Any]) -> bool:
"""Verify claims are supported by provided context"""
# Advanced: Extract claims and verify against source documents
return True # Placeholder
Semantic Similarity Monitoring
Track whether responses are consistent over time:
from openai import OpenAI
import numpy as np
from scipy.spatial.distance import cosine
client = OpenAI()
class SemanticMonitor:
def __init__(self):
self.reference_embeddings = {}
def get_embedding(self, text: str) -> List[float]:
"""Get embedding for text"""
response = client.embeddings.create(
model="text-embedding-3-small",
input=text
)
return response.data[0].embedding
def set_reference_answer(self, question_id: str, reference_answer: str):
"""Store reference answer for a canonical question"""
embedding = self.get_embedding(reference_answer)
self.reference_embeddings[question_id] = embedding
def check_similarity(self, question_id: str, current_answer: str) -> float:
"""Compare current answer to reference answer"""
if question_id not in self.reference_embeddings:
return 1.0 # No reference, assume OK
current_embedding = self.get_embedding(current_answer)
reference_embedding = self.reference_embeddings[question_id]
# Cosine similarity (1 = identical, 0 = orthogonal)
similarity = 1 - cosine(current_embedding, reference_embedding)
if similarity < 0.8: # Alert if answer has drifted significantly
self._alert_drift(question_id, similarity)
return similarity
User Satisfaction Tracking
Implement a satisfaction scoring system:
class SatisfactionTracker:
def calculate_satisfaction_score(
self,
time_period: str = "24h"
) -> Dict[str, float]:
"""Calculate aggregate satisfaction metrics"""
feedback_data = self._get_feedback_data(time_period)
        total_responses = len(feedback_data)
        if total_responses == 0:
            return {"nps_score": 0.0, "satisfaction_rate": 0.0, "resolution_rate": 0.0, "total_responses": 0}
positive_feedback = sum(1 for f in feedback_data if f["type"] == "thumbs_up")
negative_feedback = sum(1 for f in feedback_data if f["type"] == "thumbs_down")
# Net Promoter Score-style calculation
nps_score = (positive_feedback - negative_feedback) / total_responses * 100
# Satisfaction rate (% positive)
satisfaction_rate = positive_feedback / total_responses * 100
# Average resolution success (task completion)
resolution_rate = self._calculate_resolution_rate(feedback_data)
return {
"nps_score": nps_score,
"satisfaction_rate": satisfaction_rate,
"resolution_rate": resolution_rate,
"total_responses": total_responses
}
Cost Monitoring and Optimization
Token Counting and Forecasting
Track token usage patterns to forecast costs:
class CostMonitor:
def __init__(self):
self.usage_history = []
def record_usage(self, tokens: int, cost: float, user_id: str, endpoint: str):
self.usage_history.append({
"timestamp": datetime.utcnow(),
"tokens": tokens,
"cost": cost,
"user_id": user_id,
"endpoint": endpoint
})
def analyze_spending_patterns(self) -> Dict[str, Any]:
"""Identify cost trends and anomalies"""
# Group by endpoint
by_endpoint = {}
for usage in self.usage_history:
endpoint = usage["endpoint"]
if endpoint not in by_endpoint:
by_endpoint[endpoint] = {"tokens": 0, "cost": 0, "requests": 0}
by_endpoint[endpoint]["tokens"] += usage["tokens"]
by_endpoint[endpoint]["cost"] += usage["cost"]
by_endpoint[endpoint]["requests"] += 1
# Calculate cost per request for each endpoint
for endpoint in by_endpoint:
data = by_endpoint[endpoint]
data["cost_per_request"] = data["cost"] / data["requests"]
data["tokens_per_request"] = data["tokens"] / data["requests"]
# Identify expensive endpoints
sorted_endpoints = sorted(
by_endpoint.items(),
key=lambda x: x[1]["cost"],
reverse=True
)
return {
"total_cost": sum(u["cost"] for u in self.usage_history),
"total_tokens": sum(u["tokens"] for u in self.usage_history),
"by_endpoint": by_endpoint,
"most_expensive_endpoints": sorted_endpoints[:5]
}
def forecast_monthly_cost(self) -> float:
"""Forecast monthly cost based on recent usage"""
recent_usage = [u for u in self.usage_history
if (datetime.utcnow() - u["timestamp"]).days <= 7]
if not recent_usage:
return 0.0
avg_daily_cost = sum(u["cost"] for u in recent_usage) / 7
projected_monthly = avg_daily_cost * 30
return projected_monthly
Caching Strategies
Implement semantic caching to reduce costs:
from hashlib import sha256
import json
class SemanticCache:
    def __init__(self, similarity_threshold: float = 0.95):
        self.cache = {}  # In production, use Redis or similar
        self.embeddings = {}
        self.similarity_threshold = similarity_threshold
    def get_embedding(self, text: str) -> List[float]:
        """Embed text for similarity comparison (same embedding client as SemanticMonitor above)"""
        response = client.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        return response.data[0].embedding
def get_cache_key(self, prompt: str, model: str) -> str:
"""Generate cache key from prompt"""
key_data = {"prompt": prompt, "model": model}
return sha256(json.dumps(key_data).encode()).hexdigest()
def get_cached_response(self, prompt: str, model: str) -> Optional[str]:
"""Check if we have a cached response for similar prompt"""
# Exact match check first
exact_key = self.get_cache_key(prompt, model)
if exact_key in self.cache:
return self.cache[exact_key]
# Semantic similarity check
prompt_embedding = self.get_embedding(prompt)
for cached_key, cached_data in self.cache.items():
if cached_data["model"] != model:
continue
cached_embedding = self.embeddings.get(cached_key)
if cached_embedding is None:
continue
similarity = 1 - cosine(prompt_embedding, cached_embedding)
if similarity >= self.similarity_threshold:
return cached_data["response"]
return None
def set_cached_response(self, prompt: str, model: str, response: str):
"""Cache a response"""
cache_key = self.get_cache_key(prompt, model)
self.cache[cache_key] = {
"prompt": prompt,
"model": model,
"response": response,
"timestamp": datetime.utcnow()
}
self.embeddings[cache_key] = self.get_embedding(prompt)
Caching effectiveness metrics:
- Cache hit rate (% of requests served from cache)
- Cost savings from caching
- Average cache freshness
- Cache eviction rate
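A small sketch for tracking these effectiveness metrics around the SemanticCache above; the saving recorded on a hit is the estimated cost of the model call that was avoided:
class CacheMetrics:
    def __init__(self):
        self.hits = 0
        self.misses = 0
        self.saved_usd = 0.0

    def record_lookup(self, hit: bool, avoided_cost_usd: float = 0.0):
        """Call once per cache lookup; pass the avoided call's estimated cost on a hit."""
        if hit:
            self.hits += 1
            self.saved_usd += avoided_cost_usd
        else:
            self.misses += 1

    @property
    def hit_rate(self) -> float:
        total = self.hits + self.misses
        return self.hits / total if total else 0.0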
Model Switching Based on Complexity
Route requests to appropriate model tiers:
class SmartRouter:
def __init__(self):
self.complexity_classifier = self._load_classifier()
def route_request(self, prompt: str) -> str:
"""Determine which model to use based on query complexity"""
complexity = self._assess_complexity(prompt)
if complexity == "simple":
return "gpt-3.5-turbo" # Fast and cheap
elif complexity == "moderate":
return "gpt-4-turbo" # Balanced
else:
return "gpt-4" # Most capable
def _assess_complexity(self, prompt: str) -> str:
"""Assess query complexity"""
# Simple heuristics
word_count = len(prompt.split())
has_code = "```" in prompt or any(kw in prompt.lower()
for kw in ["code", "function", "debug"])
requires_reasoning = any(kw in prompt.lower()
for kw in ["analyze", "compare", "explain why"])
if word_count < 20 and not has_code and not requires_reasoning:
return "simple"
elif word_count > 100 or (has_code and requires_reasoning):
return "complex"
else:
return "moderate"
Security Monitoring
Prompt Injection Detection
Monitor for adversarial inputs attempting to manipulate the model:
import re
from typing import List, Tuple
class SecurityMonitor:
def __init__(self):
self.injection_patterns = [
r"ignore\s+(previous|above|prior)\s+instructions",
r"disregard\s+.+\s+instructions",
r"system:\s+you\s+are",
r"<\|im_start\|>",
r"pretend\s+you\s+are",
r"act\s+as\s+if",
r"forget\s+(everything|all|previous)",
]
def detect_prompt_injection(self, prompt: str) -> Tuple[bool, List[str]]:
"""Detect potential prompt injection attempts"""
detected_patterns = []
for pattern in self.injection_patterns:
if re.search(pattern, prompt, re.IGNORECASE):
detected_patterns.append(pattern)
is_suspicious = len(detected_patterns) > 0
if is_suspicious:
self._log_security_event("prompt_injection_attempt", {
"prompt": prompt,
"patterns": detected_patterns
})
return is_suspicious, detected_patterns
def check_response_for_leakage(self, response: str) -> bool:
"""Check if response leaked system prompt or internal info"""
leakage_indicators = [
"system prompt:",
"my instructions are",
"I was told to",
"my system message",
]
for indicator in leakage_indicators:
if indicator.lower() in response.lower():
self._log_security_event("prompt_leakage", {
"response": response,
"indicator": indicator
})
return True
return False
PII Detection and Redaction
Prevent sensitive data from being logged or processed:
class PIIDetector:
def __init__(self):
self.patterns = {
"email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
"phone": r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b",
"ssn": r"\b\d{3}-\d{2}-\d{4}\b",
"credit_card": r"\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b",
"ip_address": r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b",
}
def detect_pii(self, text: str) -> Dict[str, List[str]]:
"""Detect PII in text"""
detected = {}
for pii_type, pattern in self.patterns.items():
matches = re.findall(pattern, text)
if matches:
detected[pii_type] = matches
return detected
def redact_pii(self, text: str) -> str:
"""Redact PII from text"""
redacted = text
for pii_type, pattern in self.patterns.items():
redacted = re.sub(pattern, f"[REDACTED_{pii_type.upper()}]", redacted)
return redacted
Abuse Detection
Monitor for users abusing the system:
class AbuseDetector:
def __init__(self):
self.user_request_counts = {}
self.rate_limits = {
"requests_per_minute": 60,
"tokens_per_hour": 100000,
"cost_per_day": 100.0
}
def check_rate_limit(self, user_id: str) -> Tuple[bool, Optional[str]]:
"""Check if user is within rate limits"""
if user_id not in self.user_request_counts:
self.user_request_counts[user_id] = {
"requests_this_minute": 0,
"tokens_this_hour": 0,
"cost_today": 0.0
}
counts = self.user_request_counts[user_id]
# Check each limit
if counts["requests_this_minute"] >= self.rate_limits["requests_per_minute"]:
return False, "requests_per_minute"
if counts["tokens_this_hour"] >= self.rate_limits["tokens_per_hour"]:
return False, "tokens_per_hour"
if counts["cost_today"] >= self.rate_limits["cost_per_day"]:
return False, "cost_per_day"
return True, None
Tools and Platforms
LangSmith
LangChain's observability platform for AI applications:
from langsmith.run_helpers import traceable
from openai import OpenAI
openai_client = OpenAI()
@traceable(run_type="llm", name="chat_completion")
def call_model(prompt: str, model: str = "gpt-4"):
    """LangSmith automatically traces this call"""
    response = openai_client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content
LangSmith features:
- Automatic tracing of LangChain applications
- Visual debugging of chain execution
- Dataset management for evaluations
- Human-in-the-loop feedback collection
- Cost and latency tracking per chain
Helicone
Open-source LLM observability focused on cost optimization:
# Add Helicone proxy to OpenAI calls
from openai import OpenAI
client = OpenAI(
base_url="https://oai.hconeai.com/v1",
default_headers={
"Helicone-Auth": "Bearer sk-helicone-xxxxx",
"Helicone-Cache-Enabled": "true",
"Helicone-User-Id": user_id,
}
)
Helicone features:
- Zero-code integration via proxy
- Built-in caching layer
- Real-time cost tracking
- Custom properties for segmentation
- Prompt versioning
Phoenix (Arize AI)
Open-source tool for LLM observability and evaluation:
import phoenix as px
from phoenix.trace.openai import OpenAIInstrumentor
# Start Phoenix session
px.launch_app()
# Auto-instrument OpenAI
OpenAIInstrumentor().instrument()
# Your OpenAI calls are now automatically traced
Phoenix features:
- Embedding visualizations
- Retrieval evaluation (for RAG)
- Drift detection
- Performance analytics
- Local-first (runs on your infrastructure)
Custom Solutions
For maximum control, build custom monitoring:
from prometheus_client import Counter, Histogram, Gauge
import time
# Define Prometheus metrics
ai_requests_total = Counter(
"ai_requests_total",
"Total AI requests",
["model", "endpoint", "status"]
)
ai_request_duration = Histogram(
"ai_request_duration_seconds",
"AI request duration",
["model", "endpoint"]
)
ai_token_usage = Counter(
"ai_tokens_total",
"Total tokens used",
["model", "token_type"]
)
ai_cost = Counter(
"ai_cost_usd_total",
"Total AI cost in USD",
["model", "endpoint"]
)
current_requests = Gauge(
"ai_requests_in_progress",
"Current requests in progress"
)
def monitored_ai_call(prompt: str, model: str, endpoint: str):
"""Wrapper that instruments AI calls with Prometheus metrics"""
current_requests.inc()
start_time = time.time()
try:
response = client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}]
)
# Record success metrics
duration = time.time() - start_time
ai_requests_total.labels(model=model, endpoint=endpoint, status="success").inc()
ai_request_duration.labels(model=model, endpoint=endpoint).observe(duration)
# Track tokens and cost
usage = response.usage
ai_token_usage.labels(model=model, token_type="prompt").inc(usage.prompt_tokens)
ai_token_usage.labels(model=model, token_type="completion").inc(usage.completion_tokens)
cost = calculate_cost(usage, model)
ai_cost.labels(model=model, endpoint=endpoint).inc(cost)
return response
except Exception as e:
ai_requests_total.labels(model=model, endpoint=endpoint, status="error").inc()
raise
finally:
current_requests.dec()
Building Dashboards and Reports
Real-Time Dashboard Design
Essential dashboard components:
Overview panel:
- Requests per minute (current vs. baseline)
- Average latency (P50, P95, P99)
- Error rate (last hour)
- Current cost burn rate ($/hour)
- Active users count
Quality panel:
- User satisfaction score (last 24h)
- Thumbs up/down ratio
- Average response length
- Top error types
- Hallucination detection alerts
Cost panel:
- Spend today vs. yesterday
- Cost breakdown by model
- Most expensive endpoints
- Token efficiency trends
- Projected monthly cost
Performance panel:
- Latency percentiles over time
- Throughput (requests/sec)
- Cache hit rate
- Model selection distribution
Creating Automated Reports
from datetime import datetime, timedelta
import pandas as pd
class ReportGenerator:
def generate_daily_report(self, date: datetime) -> Dict[str, Any]:
"""Generate comprehensive daily report"""
metrics = self._fetch_metrics(date, date + timedelta(days=1))
report = {
"date": date.strftime("%Y-%m-%d"),
"summary": {
"total_requests": metrics["request_count"],
"total_cost": metrics["total_cost"],
"avg_latency_ms": metrics["avg_latency"],
"error_rate": metrics["error_rate"],
"user_satisfaction": metrics["satisfaction_score"]
},
"highlights": self._identify_highlights(metrics),
"concerns": self._identify_concerns(metrics),
"recommendations": self._generate_recommendations(metrics)
}
return report
def _identify_highlights(self, metrics: Dict) -> List[str]:
"""Identify positive trends"""
highlights = []
if metrics["satisfaction_score"] > 0.85:
highlights.append("High user satisfaction (>85%)")
if metrics["cache_hit_rate"] > 0.5:
highlights.append(f"Excellent cache performance ({metrics['cache_hit_rate']:.1%} hit rate)")
if metrics["cost_vs_yesterday"] < 0:
highlights.append(f"Cost reduction: {abs(metrics['cost_vs_yesterday']):.1%}")
return highlights
def _identify_concerns(self, metrics: Dict) -> List[str]:
"""Identify issues requiring attention"""
concerns = []
if metrics["error_rate"] > 0.05:
concerns.append(f"High error rate: {metrics['error_rate']:.1%}")
if metrics["p95_latency"] > 5000:
concerns.append(f"Slow P95 latency: {metrics['p95_latency']}ms")
if metrics["cost_vs_yesterday"] > 0.5:
concerns.append(f"Cost spike: +{metrics['cost_vs_yesterday']:.1%}")
return concerns
def _generate_recommendations(self, metrics: Dict) -> List[str]:
"""Generate actionable recommendations"""
recommendations = []
if metrics["cache_hit_rate"] < 0.3:
recommendations.append("Consider implementing semantic caching to reduce costs")
if metrics["gpt4_usage_rate"] > 0.8:
recommendations.append("Review if all requests require GPT-4; consider routing simpler queries to GPT-3.5")
return recommendations
Incident Response for AI Failures
Incident Classification
Critical (P0):
- Complete service outage
- Security breach (PII leak, prompt injection success)
- Cost runaway (>10x normal spend)
- Widespread harmful/incorrect responses
High (P1):
- Elevated error rates (>10%)
- Major quality degradation
- Single high-value customer impacted
Medium (P2):
- Latency spikes
- Moderate cost increases
- Specific feature/endpoint failing
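A sketch of applying this classification automatically so severity is assigned consistently; the input is assumed to be a dict of signals gathered during initial assessment:
from typing import Any, Dict

def classify_incident(signals: Dict[str, Any]) -> str:
    """Map observed incident signals to a severity level."""
    if (signals.get("service_down")
            or signals.get("security_breach")
            or signals.get("spend_multiple", 1.0) > 10  # >10x normal spend
            or signals.get("widespread_bad_responses")):
        return "P0"
    if signals.get("error_rate", 0.0) > 0.10 or signals.get("major_quality_degradation"):
        return "P1"
    return "P2"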
Incident Response Runbook
# AI Incident Response Runbook
## Initial Assessment (5 minutes)
1. Check dashboard for obvious anomalies
2. Review recent deployments/configuration changes
3. Verify external dependencies (OpenAI status page)
4. Determine impact scope (% of users affected)
5. Assign severity level
## Immediate Mitigation (15 minutes)
- **For cost runaway:** Implement emergency rate limiting
- **For quality issues:** Rollback to last known good model/prompt
- **For outage:** Activate fallback responses or degraded mode
- **For security:** Disable affected endpoint, enable stricter filtering
## Investigation (30-60 minutes)
1. Pull logs for affected timeframe
2. Sample failing requests
3. Compare to baseline successful requests
4. Identify root cause pattern
5. Test hypothesis in staging
## Resolution
1. Deploy fix to staging
2. Validate fix with test cases
3. Gradual rollout to production (10% → 50% → 100%)
4. Monitor metrics during rollout
5. Confirm resolution
## Post-Incident
1. Write incident report (root cause, timeline, resolution)
2. Update runbook with lessons learned
3. Implement preventive measures
4. Add new monitoring/alerts if gap identified
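The mitigation steps above are much easier to execute under pressure if the levers already exist in code. A minimal sketch of emergency controls, with persistence and enforcement left abstract:
class EmergencyControls:
    """Kill switches an on-call engineer can flip during an incident."""
    def __init__(self):
        self.hard_rate_limit_per_minute = None  # None = no emergency limit
        self.degraded_mode = False              # serve static fallback responses
        self.disabled_endpoints = set()

    def enable_cost_brake(self, requests_per_minute: int):
        """Mitigation for cost runaway: clamp throughput immediately."""
        self.hard_rate_limit_per_minute = requests_per_minute

    def enable_degraded_mode(self):
        """Mitigation for outage: answer with fallback responses instead of model calls."""
        self.degraded_mode = True

    def disable_endpoint(self, endpoint: str):
        """Mitigation for security issues on a specific endpoint."""
        self.disabled_endpoints.add(endpoint)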
Post-Incident Analysis
class IncidentAnalyzer:
def analyze_incident(
self,
incident_start: datetime,
incident_end: datetime,
incident_type: str
) -> Dict[str, Any]:
"""Analyze an incident to prevent recurrence"""
# Gather data from incident timeframe
incident_logs = self._fetch_logs(incident_start, incident_end)
baseline_logs = self._fetch_logs(
incident_start - timedelta(hours=24),
incident_start
)
analysis = {
"duration_minutes": (incident_end - incident_start).total_seconds() / 60,
"affected_requests": len(incident_logs),
"root_cause": self._identify_root_cause(incident_logs),
"contributing_factors": self._find_contributing_factors(incident_logs),
"similar_past_incidents": self._find_similar_incidents(incident_type),
"prevention_recommendations": []
}
# Generate prevention recommendations
if analysis["root_cause"] == "prompt_injection":
analysis["prevention_recommendations"].append(
"Implement stricter input validation"
)
analysis["prevention_recommendations"].append(
"Add prompt injection detection to pre-processing"
)
return analysis
A/B Testing and Gradual Rollouts
Feature Flag System for AI
import hashlib
class AIFeatureFlags:
def __init__(self):
self.flags = {
"new_system_prompt": {"enabled": False, "rollout_percentage": 0},
"gpt4_turbo": {"enabled": True, "rollout_percentage": 50},
"semantic_cache": {"enabled": True, "rollout_percentage": 100}
}
def is_enabled_for_user(self, flag_name: str, user_id: str) -> bool:
"""Determine if feature is enabled for user"""
if flag_name not in self.flags:
return False
flag = self.flags[flag_name]
if not flag["enabled"]:
return False
if flag["rollout_percentage"] == 100:
return True
# Consistent hashing to assign users to variants
user_hash = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
user_percentage = (user_hash % 100)
return user_percentage < flag["rollout_percentage"]
A/B Test Framework
class ABTest:
def __init__(self, test_name: str, variants: List[str]):
self.test_name = test_name
self.variants = variants
self.results = {variant: [] for variant in variants}
def assign_variant(self, user_id: str) -> str:
"""Assign user to variant"""
user_hash = int(hashlib.md5(f"{user_id}{self.test_name}".encode()).hexdigest(), 16)
variant_index = user_hash % len(self.variants)
return self.variants[variant_index]
def record_result(
self,
variant: str,
user_id: str,
success: bool,
metrics: Dict[str, float]
):
"""Record test result"""
self.results[variant].append({
"user_id": user_id,
"success": success,
"metrics": metrics,
"timestamp": datetime.utcnow()
})
def analyze_results(self) -> Dict[str, Any]:
"""Analyze A/B test results"""
analysis = {}
for variant, results in self.results.items():
if not results:
continue
success_rate = sum(1 for r in results if r["success"]) / len(results)
avg_latency = np.mean([r["metrics"]["latency"] for r in results])
avg_cost = np.mean([r["metrics"]["cost"] for r in results])
analysis[variant] = {
"sample_size": len(results),
"success_rate": success_rate,
"avg_latency_ms": avg_latency,
"avg_cost_usd": avg_cost
}
# Determine winner
winner = max(analysis.items(), key=lambda x: x[1]["success_rate"])
return {
"variants": analysis,
"winner": winner[0],
"confidence": self._calculate_confidence(analysis)
}
Gradual Rollout Strategy
class GradualRollout:
def __init__(self, feature_name: str):
self.feature_name = feature_name
self.rollout_schedule = [
{"percentage": 5, "duration_hours": 2, "success_threshold": 0.95},
{"percentage": 25, "duration_hours": 4, "success_threshold": 0.95},
{"percentage": 50, "duration_hours": 12, "success_threshold": 0.95},
{"percentage": 100, "duration_hours": 0, "success_threshold": 0.95}
]
self.current_stage = 0
def should_proceed_to_next_stage(self, metrics: Dict[str, float]) -> bool:
"""Determine if rollout should continue to next stage"""
if self.current_stage >= len(self.rollout_schedule):
return False
stage = self.rollout_schedule[self.current_stage]
# Check success criteria
if metrics["success_rate"] < stage["success_threshold"]:
self._trigger_rollback("Success rate below threshold")
return False
if metrics["error_rate"] > 0.05:
self._trigger_rollback("Error rate too high")
return False
return True
    def advance_rollout(self):
        """Move to next rollout stage"""
        if self.current_stage + 1 >= len(self.rollout_schedule):
            return  # already at the final stage
        self.current_stage += 1
        new_percentage = self.rollout_schedule[self.current_stage]["percentage"]
        self._update_feature_flag(self.feature_name, new_percentage)
Compliance and Audit Trails
Complete Request Logging for Compliance
class ComplianceLogger:
def log_request_for_compliance(
self,
request_id: str,
user_id: str,
prompt: str,
response: str,
model: str,
metadata: Dict[str, Any]
):
"""Log request with compliance requirements in mind"""
# Detect and flag PII
pii_detector = PIIDetector()
prompt_pii = pii_detector.detect_pii(prompt)
response_pii = pii_detector.detect_pii(response)
compliance_log = {
"request_id": request_id,
"timestamp": datetime.utcnow().isoformat(),
"user_id": user_id,
"prompt": prompt, # Store original
"prompt_redacted": pii_detector.redact_pii(prompt),
"response": response,
"response_redacted": pii_detector.redact_pii(response),
"model": model,
"model_version": metadata.get("model_version"),
"prompt_version": metadata.get("prompt_version"),
"pii_detected_in_prompt": list(prompt_pii.keys()) if prompt_pii else [],
"pii_detected_in_response": list(response_pii.keys()) if response_pii else [],
"content_moderation_flag": metadata.get("content_flag"),
"user_consent": metadata.get("user_consent", False),
"retention_policy": "90_days",
"jurisdiction": metadata.get("user_jurisdiction", "US")
}
# Write to append-only compliance log
self._write_to_compliance_store(compliance_log)
# If PII detected, also log to security monitoring
if prompt_pii or response_pii:
self._alert_pii_processing(request_id, prompt_pii, response_pii)
def generate_audit_report(
self,
start_date: datetime,
end_date: datetime
) -> Dict[str, Any]:
"""Generate compliance audit report"""
logs = self._fetch_compliance_logs(start_date, end_date)
report = {
"period": f"{start_date.date()} to {end_date.date()}",
"total_requests": len(logs),
"requests_with_pii": sum(1 for log in logs
if log["pii_detected_in_prompt"] or
log["pii_detected_in_response"]),
"content_flags": sum(1 for log in logs
if log.get("content_moderation_flag")),
"models_used": list(set(log["model"] for log in logs)),
"jurisdictions": list(set(log["jurisdiction"] for log in logs)),
"data_retention_compliance": self._check_retention_compliance(logs)
}
return report
Summary
Monitoring AI systems in production requires a multi-faceted approach that goes far beyond traditional application monitoring. The non-deterministic nature of AI, combined with its potential for subtle failures, high costs, and security risks, demands comprehensive observability across performance, quality, cost, and security dimensions.
Key takeaways:
- Implement end-to-end request logging with PII protection
- Track both traditional metrics (latency, errors) and AI-specific metrics (quality scores, token usage)
- Build intelligent alerting that adapts to AI's variability
- Monitor costs proactively and implement optimization strategies
- Watch for security issues like prompt injection and data leakage
- Use specialized AI observability tools (LangSmith, Helicone, Phoenix) or build custom solutions
- Create comprehensive dashboards that give visibility into all aspects of your AI system
- Prepare incident response procedures specific to AI failures
- Use A/B testing and gradual rollouts to de-risk changes
- Maintain compliance-ready audit trails
The investment in robust monitoring pays dividends through improved reliability, reduced costs, better user experiences, and the confidence to iterate quickly on your AI systems.