Your team spent three months building an AI-powered customer support assistant. The demos were impressive, stakeholders were thrilled, and you shipped it. Six weeks later, you're getting reports that the system is confidently giving customers incorrect refund policy information, occasionally hallucinating product features that don't exist, and responding with a tone that's somewhere between a DMV clerk and a disappointed parent. The model hasn't changed. As far as anyone on the team knows, the prompts haven't either. But your production traffic has drifted, your product catalog grew by 400 SKUs, and someone quietly updated the system prompt to add a holiday greeting that accidentally broke the persona instructions.
This is the evaluation problem in its most painful form. Most teams treat LLM evaluation as something you do once, before launch, with a handful of manually crafted test cases and a vibe check from the PM. That approach fails in production — not because LLMs are unpredictable (though they can be), but because the gap between "works in demos" and "works reliably at scale across diverse real-world inputs" is enormous, and the only way to close that gap is a rigorous, systematic evaluation framework.
By the end of this lesson, you'll be able to design and implement a full-spectrum LLM evaluation system — from offline benchmarking pipelines to real-time production monitoring — that gives you actual signal about what your model is and isn't doing correctly. We're going deep into architecture, scoring methodology, tooling choices, and the statistical subtleties that make the difference between a dashboard that lies to you and one that actually tells the truth.
What you'll learn:
This lesson assumes you're comfortable with Python, have working familiarity with calling LLM APIs (OpenAI, Anthropic, or similar), understand basic statistical concepts like distributions and confidence intervals, and have some exposure to production ML or software systems. You don't need prior experience with formal ML evaluation pipelines, but you should know what a prompt template looks like and have shipped at least one LLM-powered feature.
Before we build anything, we need to understand precisely why informal evaluation breaks down — because if you don't internalize the failure modes, you'll be tempted to cut corners on the framework later.
The first failure mode is selection bias in manual review. When humans evaluate LLM outputs by spot-checking, they systematically over-sample confident-looking responses and under-sample the long tail of edge cases where failures actually cluster. A customer support model might handle 85% of queries gracefully but catastrophically fail on warranty-related questions involving third-party resellers. If your spot-check set doesn't include those, you'll never see it.
The second failure mode is metric proxy collapse. Teams often pick a single metric (ROUGE score, user thumbs-up rate, average response time) and optimize for it. The model gets better at the metric and worse at the underlying goal. A ROUGE-optimized summarization model learns to copy more source sentences verbatim. A thumbs-up optimized chatbot learns to be agreeable rather than accurate. This is Goodhart's law at work: once a measure becomes a target, it stops being a good measure of the thing you actually care about.
The third failure mode is temporal blindness. LLMs are sensitive to prompt changes, context length, temperature settings, and the distribution of inputs they receive. A model that performs well in week one can degrade significantly by week eight as your user base grows, your product evolves, and your prompts accumulate well-intentioned patches. Without time-series evaluation data, you have no way to detect this drift until users complain loudly enough.
Understanding these failure modes shapes every architectural decision in the framework we're about to build.
The most robust LLM evaluation systems are organized into three tiers that mirror how software testing works in engineering: unit tests, integration tests, and production monitoring. Each tier serves a different purpose and operates at a different timescale.
Tier 1: Offline Unit Evaluation runs before any code ships. It tests individual components — a single prompt template, a specific retrieval step, an output parser — against a curated dataset. Fast, deterministic when possible, and cheap enough to run on every commit.
Tier 2: Offline Integration Evaluation runs against end-to-end workflows using larger, more realistic datasets. This is where you evaluate multi-step chains, RAG pipelines, and agentic sequences as complete systems. Slower and more expensive than unit evaluation, so you run it on a cadence — daily, or before major releases.
Tier 3: Production Monitoring runs continuously against real traffic. It uses sampling, async evaluation, and anomaly detection to catch regressions without blocking user requests. This tier is where you close the loop between what you tested offline and what actually happens in the wild.
Here's the critical architectural insight: these tiers must be connected. When production monitoring catches a failure, you need a pipeline to extract that failure pattern into a new test case that feeds back into Tier 1 and Tier 2. Without this feedback loop, you're playing whack-a-mole forever.
     User Traffic
           │
           ▼
┌─────────────────────┐
│  Production System  │──── Async Sample ────► Tier 3 Monitor
│   (LLM Pipeline)    │                              │
└─────────────────────┘                              │ Failure Extraction
                                                     ▼
                                            Tier 1/2 Test Suite
                                                     │
                                                     ▼
                                               CI/CD Gating
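The failure-extraction arrow in the diagram can be sketched as a small function that turns a flagged production record into a regression test case. This is a minimal illustration rather than a prescribed schema: the record fields (`user_input`, `model_output`, `failure_reason`) and the function name `failure_to_regression_case` are assumptions made for the example.

```python
import hashlib
from datetime import datetime, timezone


def failure_to_regression_case(record: dict, root_cause: str) -> dict:
    """Convert a flagged production record into a regression test case.

    The field names read from `record` are hypothetical; adapt them to
    whatever your logging pipeline actually stores.
    """
    user_input = record["user_input"]
    # Derive the ID from the input text so the same failing input always
    # maps to the same test case and duplicates are easy to detect
    digest = hashlib.sha256(user_input.encode("utf-8")).hexdigest()[:12]
    return {
        "id": f"regression-{digest}",
        "partition": "regression",
        "input": {"user_input": user_input},
        "expected_output": None,  # filled in by a reviewer after triage
        "metadata": {
            "root_cause": root_cause,
            "bad_output": record.get("model_output", ""),
            "failure_reason": record.get("failure_reason"),
        },
        "created_at": datetime.now(timezone.utc).isoformat(),
        "tags": ["from_production"],
    }


case = failure_to_regression_case(
    {
        "user_input": "Can I return an item bought from a reseller?",
        "model_output": "Yes, within 90 days.",
        "failure_reason": "Hallucination detected",
    },
    root_cause="Reseller warranty policy missing from retrieval index",
)
print(case["id"], case["partition"])
```

Hashing the input gives a stable ID, so re-extracting the same failure twice produces the same case instead of a silent duplicate.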
Let's build each tier in detail.
The most important investment you'll make in your evaluation system isn't the scoring code — it's the dataset. Bad datasets give you false confidence. Great datasets give you genuine signal even when your metrics are imperfect.
A production-grade evaluation dataset has four distinct partitions, each with a different purpose:
The Golden Set is a small (50-200 example) hand-curated collection of high-quality input/output pairs that represent your absolute minimum bar. Every example in this set should have been reviewed by at least two domain experts. Changes to this set require explicit approval. This set is your north star — if your model fails on golden set examples, nothing else matters.
The Regression Set is a growing collection of past failures that have been fixed. Every time a bug reaches production, you extract the failure case, document the root cause, and add it to the regression set. This set grows over time and ensures you never ship the same failure twice.
The Adversarial Set is a collection of deliberately tricky inputs designed to probe the boundaries of your model's behavior. This includes edge cases (empty inputs, extremely long inputs, inputs in unexpected languages), jailbreak attempts if security matters for your use case, and distributional shift examples (queries that are slightly out of scope for your system).
The Synthetic Distribution Set is a large (500-5000 example) collection of programmatically generated or real-traffic-sampled examples that represent the statistical distribution of actual user inputs. This is where you get coverage.
import json
from collections import Counter
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Literal


@dataclass
class EvalExample:
    id: str
    partition: Literal["golden", "regression", "adversarial", "distribution"]
    input: dict  # Flexible to accommodate different task types
    expected_output: str | None  # None for reference-free evaluation
    metadata: dict = field(default_factory=dict)
    # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12)
    created_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
    tags: list[str] = field(default_factory=list)

    def to_dict(self) -> dict:
        return {
            "id": self.id,
            "partition": self.partition,
            "input": self.input,
            "expected_output": self.expected_output,
            "metadata": self.metadata,
            "created_at": self.created_at,
            "tags": self.tags,
        }


class EvalDataset:
    def __init__(self, name: str, task_type: str):
        self.name = name
        self.task_type = task_type
        self.examples: list[EvalExample] = []

    def add_example(self, example: EvalExample):
        # Enforce uniqueness by ID
        existing_ids = {e.id for e in self.examples}
        if example.id in existing_ids:
            raise ValueError(f"Duplicate example ID: {example.id}")
        self.examples.append(example)

    def get_partition(self, partition: str) -> list[EvalExample]:
        return [e for e in self.examples if e.partition == partition]

    def partition_stats(self) -> dict:
        # Number of examples per partition
        counts = Counter(e.partition for e in self.examples)
        return dict(counts)

    def save(self, path: str):
        with open(path, 'w') as f:
            json.dump({
                "name": self.name,
                "task_type": self.task_type,
                "examples": [e.to_dict() for e in self.examples]
            }, f, indent=2)
When building your golden set, resist the temptation to make examples too easy. Evaluators unconsciously select examples where the right answer is obvious. Force yourself to include examples where:
For the adversarial set, think about your actual threat model. A customer support assistant needs to handle users who are angry, confused, or trying to extract information they shouldn't have. A code generation assistant needs to handle ambiguous requirements and security-sensitive operations. Don't just test what you hope users do — test what they actually do.
Warning: The most common dataset construction mistake is building your evaluation set from the same distribution as your few-shot examples or training data. If you used 10 customer query examples to write your prompt, don't use those same 10 in your golden set. You're measuring memorization, not generalization.
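A quick way to catch the overlap described in the warning is to compare normalized text between the examples used to write the prompt and the inputs in the evaluation set. The sketch below is a minimal check under stated assumptions: `find_leaked_examples` and the sample strings are hypothetical, and exact matching after normalization only catches verbatim or near-verbatim reuse, not paraphrases.

```python
import re


def normalize(text: str) -> str:
    """Lowercase, strip punctuation, and collapse whitespace."""
    text = re.sub(r"[^\w\s]", "", text.lower())
    return " ".join(text.split())


def find_leaked_examples(prompt_examples: list[str], eval_inputs: list[str]) -> list[str]:
    """Return eval inputs that also appear (after normalization) among the
    few-shot examples used to write the prompt."""
    seen = {normalize(p) for p in prompt_examples}
    return [e for e in eval_inputs if normalize(e) in seen]


few_shot = ["How do I reset my password?", "Where is my order?"]
golden_inputs = ["how do I reset my password", "Can I change my shipping address?"]
leaked = find_leaked_examples(few_shot, golden_inputs)
print(leaked)  # ['how do I reset my password']
```

Running a check like this whenever the prompt or the golden set changes keeps the two from drifting into each other. Catching paraphrased overlap would need a fuzzier comparison, such as embedding similarity.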
Metric selection is where most frameworks go wrong. The right metric depends on your task type, your tolerance for false positives versus false negatives, and the compute budget you have for evaluation. Let's walk through the three major categories.
Reference-based metrics compare model output against a known correct answer. They're fast, deterministic, and easy to interpret — but they require you to have ground truth, which is expensive to collect and hard to maintain.
Exact Match (EM) is the simplest: 1 if the output exactly matches the expected answer, 0 otherwise. This is appropriate for structured outputs like extracted entities, classification labels, or JSON objects. Do not use it for free-text generation — you'll get artificially low scores because "The refund will be processed in 3-5 business days" and "Your refund takes 3-5 business days to process" are semantically equivalent but score 0.
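As a minimal sketch of exact match for structured outputs, the two helpers below compare labels after light normalization and compare JSON by parsed value rather than by raw string. The function names are illustrative, and how much normalization is appropriate (case, whitespace) is an assumption that depends on your task.

```python
import json


def exact_match(prediction: str, reference: str) -> int:
    """1 if the strings match after trimming whitespace and case-folding, else 0."""
    return int(prediction.strip().casefold() == reference.strip().casefold())


def json_exact_match(prediction: str, reference: str) -> int:
    """Compare JSON outputs by parsed value so key order and spacing
    don't cause spurious mismatches. Unparseable output scores 0."""
    try:
        return int(json.loads(prediction) == json.loads(reference))
    except json.JSONDecodeError:
        return 0


print(exact_match(" Refund ", "refund"))                       # 1
print(json_exact_match('{"a": 1, "b": 2}', '{"b":2,"a":1}'))   # 1
print(exact_match(
    "The refund will be processed in 3-5 business days",
    "Your refund takes 3-5 business days to process",
))                                                             # 0
```

The last call reproduces the free-text problem described above: two equivalent sentences score 0.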
Token-Level F1 is better for extraction tasks. Compute precision and recall at the token level:
from collections import Counter
import re


def normalize_text(text: str) -> str:
    """Normalize text for comparison: lowercase and strip punctuation."""
    text = text.lower()
    text = re.sub(r'[^\w\s]', '', text)  # Remove punctuation
    return text.strip()


def token_f1_score(prediction: str, reference: str) -> dict:
    """
    Token-level F1 score in the style of SQuAD evaluation.
    (The official SQuAD script additionally strips the articles a/an/the.)
    Better than exact match for extraction tasks.
    """
    pred_tokens = normalize_text(prediction).split()
    ref_tokens = normalize_text(reference).split()

    # Two empty strings count as a perfect match; one empty side scores zero
    if not pred_tokens and not ref_tokens:
        return {"f1": 1.0, "precision": 1.0, "recall": 1.0}
    if not pred_tokens or not ref_tokens:
        return {"f1": 0.0, "precision": 0.0, "recall": 0.0}

    pred_counter = Counter(pred_tokens)
    ref_counter = Counter(ref_tokens)

    # Common tokens (multiset intersection)
    common = sum((pred_counter & ref_counter).values())

    precision = common / len(pred_tokens)
    recall = common / len(ref_tokens)

    if precision + recall == 0:
        f1 = 0.0
    else:
        f1 = 2 * precision * recall / (precision + recall)

    return {"f1": f1, "precision": precision, "recall": recall}
Semantic Similarity using embedding models captures meaning rather than surface form. This is more appropriate for generation tasks where multiple correct phrasings exist:
from openai import OpenAI
import numpy as np

client = OpenAI()


def get_embedding(text: str, model: str = "text-embedding-3-small") -> list[float]:
    response = client.embeddings.create(input=text, model=model)
    return response.data[0].embedding


def cosine_similarity(vec_a: list[float], vec_b: list[float]) -> float:
    a = np.array(vec_a)
    b = np.array(vec_b)
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))


def semantic_similarity_score(
    prediction: str,
    reference: str,
    threshold: float = 0.85
) -> dict:
    """
    Compute semantic similarity between prediction and reference.
    Returns score and pass/fail based on threshold.
    """
    pred_embedding = get_embedding(prediction)
    ref_embedding = get_embedding(reference)
    similarity = cosine_similarity(pred_embedding, ref_embedding)
    return {
        "score": similarity,
        "passed": similarity >= threshold,
        "threshold": threshold
    }
Tip: A cosine similarity of 0.85 is a reasonable starting threshold for "semantically equivalent", but the right value depends on both the task and the embedding model, since different models produce differently scaled similarity scores. Run your golden set through both human raters and your semantic similarity metric to calibrate your threshold before trusting it for automated gating.
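The calibration step in the tip can be sketched as a simple sweep: score each golden example, collect a human equivalent/not-equivalent label for it, and keep the candidate threshold whose pass/fail decisions agree most often with the humans. The helper name `calibrate_threshold` and the numbers below are illustrative assumptions, not real measurements.

```python
def calibrate_threshold(
    scores: list[float],
    human_labels: list[bool],
    candidates: list[float],
) -> tuple[float, float]:
    """Return (threshold, agreement) for the candidate threshold whose
    pass/fail decisions agree most often with the human labels."""
    best_threshold, best_agreement = candidates[0], -1.0
    for t in candidates:
        agreement = sum(
            (s >= t) == label for s, label in zip(scores, human_labels)
        ) / len(scores)
        # Strict '>' keeps the lowest threshold when several tie
        if agreement > best_agreement:
            best_threshold, best_agreement = t, agreement
    return best_threshold, best_agreement


# Illustrative numbers only, not real measurements
scores = [0.95, 0.91, 0.88, 0.84, 0.80, 0.72]
human_labels = [True, True, True, False, False, False]
threshold, agreement = calibrate_threshold(
    scores, human_labels, candidates=[0.75, 0.80, 0.85, 0.90]
)
print(threshold, agreement)  # 0.85 1.0
```

With only a handful of labeled examples the chosen threshold will be unstable, so it is worth repeating the sweep as the golden set grows.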
Reference-free metrics evaluate model outputs without requiring ground truth. They're essential for open-ended generation tasks where defining a single correct answer is impossible or prohibitively expensive.
Hallucination Detection is one of the most important reference-free metrics for RAG and knowledge-intensive tasks. The core idea is to check whether every claim in the model's output is grounded in the provided context:
def build_hallucination_check_prompt(
    context: str,
    model_output: str
) -> str:
    return f"""You are a precise fact-checker. Your task is to determine whether
each claim in the MODEL OUTPUT is supported by the CONTEXT provided.

CONTEXT:
{context}

MODEL OUTPUT:
{model_output}

Instructions:
1. Identify each distinct factual claim in the MODEL OUTPUT.
2. For each claim, determine if it is:
   - SUPPORTED: Directly stated or clearly implied by the context
   - UNSUPPORTED: Not present in or contradicted by the context
   - NOT_CHECKABLE: Subjective, procedural, or not a factual claim

Return your analysis as a JSON object with this structure:
{{
  "claims": [
    {{
      "claim": "the specific claim text",
      "status": "SUPPORTED|UNSUPPORTED|NOT_CHECKABLE",
      "evidence": "the context passage that supports or refutes this claim, or null"
    }}
  ],
  "overall_faithfulness_score": <float between 0 and 1>,
  "has_hallucinations": <boolean>
}}"""


def check_hallucination(
    context: str,
    model_output: str,
    judge_model: str = "gpt-4o"
) -> dict:
    # Reuses `client` and `json` from the earlier snippets
    prompt = build_hallucination_check_prompt(context, model_output)
    response = client.chat.completions.create(
        model=judge_model,
        messages=[{"role": "user", "content": prompt}],
        response_format={"type": "json_object"},
        temperature=0  # Deterministic for evaluation
    )
    return json.loads(response.choices[0].message.content)
Format and Constraint Validation is often overlooked but critical for production systems. If your model is supposed to return JSON, does it? If it's supposed to stay under 150 words, does it? These are programmatic checks that should run on every evaluation:
from typing import Callable


class ConstraintChecker:
    def __init__(self):
        self.constraints: list[tuple[str, Callable[[str], bool]]] = []

    def add_constraint(self, name: str, check_fn: Callable[[str], bool]):
        self.constraints.append((name, check_fn))
        return self  # Allow chaining

    def check(self, output: str) -> dict:
        results = {}
        all_passed = True
        for name, check_fn in self.constraints:
            try:
                passed = check_fn(output)
                results[name] = {"passed": passed, "error": None}
                if not passed:
                    all_passed = False
            except Exception as e:
                # A check that crashes counts as a failure, with the error recorded
                results[name] = {"passed": False, "error": str(e)}
                all_passed = False
        return {"constraints": results, "all_passed": all_passed}


# Example: Constraints for a customer support response
def build_support_response_checker() -> ConstraintChecker:
    def is_valid_length(text: str) -> bool:
        word_count = len(text.split())
        return 20 <= word_count <= 200

    def no_competitor_mentions(text: str) -> bool:
        competitors = ["CompetitorA", "CompetitorB", "OtherBrand"]
        text_lower = text.lower()
        return not any(c.lower() in text_lower for c in competitors)

    def has_professional_closing(text: str) -> bool:
        closings = [
            "let me know", "feel free to", "happy to help",
            "please don't hesitate", "reach out"
        ]
        text_lower = text.lower()
        return any(c in text_lower for c in closings)

    def no_internal_jargon(text: str) -> bool:
        internal_terms = ["ticket escalation", "L1 support", "JIRA", "Salesforce case"]
        text_lower = text.lower()
        return not any(term.lower() in text_lower for term in internal_terms)

    checker = ConstraintChecker()
    checker.add_constraint("length_check", is_valid_length)
    checker.add_constraint("no_competitor_mentions", no_competitor_mentions)
    checker.add_constraint("professional_closing", has_professional_closing)
    checker.add_constraint("no_internal_jargon", no_internal_jargon)
    return checker
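The two checks mentioned in the prose, valid JSON and a word limit, fit the same `str -> bool` shape that `add_constraint` expects. A minimal sketch follows; the helper names `is_valid_json_object` and `max_words` are illustrative rather than part of the checker above.

```python
import json


def is_valid_json_object(text: str) -> bool:
    """True if the output parses as a JSON object (not a bare list or string)."""
    try:
        return isinstance(json.loads(text), dict)
    except json.JSONDecodeError:
        return False


def max_words(limit: int):
    """Build a predicate that passes when the output has at most `limit` words."""
    def check(text: str) -> bool:
        return len(text.split()) <= limit
    return check


under_150_words = max_words(150)
print(is_valid_json_object('{"status": "ok"}'))    # True
print(is_valid_json_object("Sure! Here is JSON"))  # False
print(under_150_words("word " * 151))              # False
```

Either predicate could be registered the same way as the others, for example `checker.add_constraint("valid_json", is_valid_json_object)`.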
LLM-as-judge is currently the most powerful approach for evaluating complex, subjective dimensions of model output — things like helpfulness, tone appropriateness, reasoning quality, and instruction following. The key insight is that large models can make reliable comparative and absolute judgments even when we can't specify the exact ground truth.
But LLM-as-judge has serious failure modes you need to design around:
Position bias: The judge model tends to prefer whichever response appears first in a pairwise comparison. Mitigate by always running comparisons in both orderings and averaging.
Verbosity bias: Judges tend to prefer longer, more detailed responses even when they're not more accurate. Mitigate by using prompts that explicitly penalize unnecessary length.
Self-similarity bias: When using GPT-4 to judge GPT-4 outputs, the judge model may favor outputs stylistically similar to what it would generate. Where possible, use a different model family as judge.
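The position-bias mitigation can be sketched as a wrapper that runs the comparison in both orders. This example counts votes instead of averaging scores, and it treats a verdict that flips when the order flips as a tie. The `PairwiseJudge` signature and the toy judges are assumptions for illustration; a real judge would wrap an LLM call.

```python
from typing import Callable

# A judge takes (user_input, first_response, second_response) and returns
# "first" or "second". In practice this would wrap an LLM call.
PairwiseJudge = Callable[[str, str, str], str]


def debiased_pairwise_preference(
    judge: PairwiseJudge, user_input: str, response_a: str, response_b: str
) -> str:
    """Run the comparison in both orders; return "a", "b", or "tie".

    A verdict counts only if it survives swapping the positions. If the
    judge picks whichever response is shown first both times, that is
    position bias, and the result is reported as a tie.
    """
    first_pass = judge(user_input, response_a, response_b)
    second_pass = judge(user_input, response_b, response_a)
    a_wins = (first_pass == "first") + (second_pass == "second")
    b_wins = (first_pass == "second") + (second_pass == "first")
    if a_wins > b_wins:
        return "a"
    if b_wins > a_wins:
        return "b"
    return "tie"


def always_first(user_input: str, first: str, second: str) -> str:
    """Toy judge with maximal position bias."""
    return "first"


def prefers_shorter(user_input: str, first: str, second: str) -> str:
    """Toy judge with a real, position-independent preference."""
    return "first" if len(first) <= len(second) else "second"


print(debiased_pairwise_preference(always_first, "q", "short", "much longer"))     # tie
print(debiased_pairwise_preference(prefers_shorter, "q", "short", "much longer"))  # a
```

The cost is two judge calls per comparison, which is usually acceptable for offline evaluation.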
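Here's an absolute-scoring judge implementation. Scoring one response at a time sidesteps position bias, the system prompt pushes back on verbosity bias, and `judge_model` is a parameter so the judge can differ from the model under test: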
from enum import IntEnum
import json

import numpy as np


class QualityScore(IntEnum):
    POOR = 1
    BELOW_AVERAGE = 2
    AVERAGE = 3
    GOOD = 4
    EXCELLENT = 5


JUDGE_SYSTEM_PROMPT = """You are a rigorous, objective evaluator assessing AI assistant responses.
Your evaluations must be:
- Consistent: Same quality = same score regardless of style or length
- Calibrated: Reserve 5 for truly exceptional responses; 1 for clearly harmful or wrong
- Evidence-based: Ground every judgment in specific aspects of the response
Do NOT favor responses that are merely longer or more verbose. Quality over quantity."""


def build_absolute_judge_prompt(
    task_description: str,
    user_input: str,
    model_response: str,
    evaluation_criteria: list[str]
) -> str:
    criteria_text = "\n".join(f"- {c}" for c in evaluation_criteria)
    return f"""Evaluate the following AI assistant response on a scale of 1-5.

TASK CONTEXT:
{task_description}

USER INPUT:
{user_input}

ASSISTANT RESPONSE:
{model_response}

EVALUATION CRITERIA:
{criteria_text}

Scoring Scale:
1 = Poor: Fails on multiple criteria, may be harmful or completely off-task
2 = Below Average: Partially addresses the task but has significant flaws
3 = Average: Adequately addresses the task with minor issues
4 = Good: Clearly addresses the task well with only minor improvements possible
5 = Excellent: Exceptional response that a human expert would be proud of

Return your evaluation as JSON:
{{
  "score": <integer 1-5>,
  "reasoning": "<2-3 sentences explaining the score with specific evidence>",
  "strengths": ["<specific strength>"],
  "weaknesses": ["<specific weakness, or empty list if score is 5>"]
}}"""


def judge_response(
    task_description: str,
    user_input: str,
    model_response: str,
    evaluation_criteria: list[str],
    judge_model: str = "gpt-4o",
    n_samples: int = 1  # Increase for higher-stakes evaluations
) -> dict:
    """
    Run LLM-as-judge evaluation with optional multi-sample averaging
    for more stable scores. Reuses `client` from the earlier snippet.
    """
    # The prompt is identical for every sample, so build it once
    prompt = build_absolute_judge_prompt(
        task_description, user_input, model_response, evaluation_criteria
    )
    scores = []
    reasonings = []
    for _ in range(n_samples):
        response = client.chat.completions.create(
            model=judge_model,
            messages=[
                {"role": "system", "content": JUDGE_SYSTEM_PROMPT},
                {"role": "user", "content": prompt}
            ],
            response_format={"type": "json_object"},
            temperature=0.3  # Slight temperature for variation across samples
        )
        result = json.loads(response.choices[0].message.content)
        scores.append(result["score"])
        reasonings.append(result["reasoning"])

    # Cast NumPy scalars to plain Python types so the result stays
    # JSON-serializable (np.bool_ is rejected by json.dumps)
    variance = float(np.var(scores)) if len(scores) > 1 else 0.0
    return {
        "mean_score": sum(scores) / len(scores),
        "scores": scores,
        "score_variance": variance,
        "reasonings": reasonings,
        "stable": variance < 0.5
    }
Warning: Never use LLM-as-judge as your sole evaluation metric for safety-critical dimensions. A judge model can be manipulated by the same adversarial patterns that affect the model under evaluation. Always pair LLM-as-judge with rule-based checks for safety and constraint validation.
Now let's wire these metrics together into a runnable evaluation pipeline. The pipeline needs to be fast enough to run in CI, produce reproducible results, and output results in a format that supports trend analysis over time.
import asyncio
import hashlib
import time
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Any

import numpy as np


@dataclass
class EvalResult:
    example_id: str
    partition: str
    model_name: str
    prompt_version: str
    model_output: str
    metrics: dict[str, Any]
    latency_ms: float
    timestamp: str
    passed: bool
    failure_reason: str | None = None


class EvaluationPipeline:
    def __init__(
        self,
        dataset: EvalDataset,
        model_caller,  # Callable that takes input dict and returns string
        metrics: list,  # Objects exposing `.name` and `.compute(output=..., example=...)`
        model_name: str,
        prompt_version: str
    ):
        self.dataset = dataset
        self.model_caller = model_caller
        self.metrics = metrics
        self.model_name = model_name
        self.prompt_version = prompt_version

    async def evaluate_example(self, example: EvalExample) -> EvalResult:
        """Run a single example through the model and all metrics."""
        start_time = time.perf_counter()
        try:
            # model_caller is synchronous, so run it in a worker thread
            # to avoid blocking the event loop
            output = await asyncio.to_thread(self.model_caller, example.input)
        except Exception as e:
            return EvalResult(
                example_id=example.id,
                partition=example.partition,
                model_name=self.model_name,
                prompt_version=self.prompt_version,
                model_output="",
                metrics={},
                latency_ms=(time.perf_counter() - start_time) * 1000,
                timestamp=datetime.now(timezone.utc).isoformat(),
                passed=False,
                failure_reason=f"Model call failed: {str(e)}"
            )
        latency_ms = (time.perf_counter() - start_time) * 1000

        # Run all metrics. Metrics such as the LLM judge make blocking
        # network calls, so they also run in worker threads.
        metric_results = {}
        for metric in self.metrics:
            try:
                metric_results[metric.name] = await asyncio.to_thread(
                    metric.compute,
                    output=output,
                    example=example
                )
            except Exception as e:
                metric_results[metric.name] = {"error": str(e)}

        # Determine overall pass/fail
        passed, failure_reason = self._evaluate_pass_fail(metric_results, example)
        return EvalResult(
            example_id=example.id,
            partition=example.partition,
            model_name=self.model_name,
            prompt_version=self.prompt_version,
            model_output=output,
            metrics=metric_results,
            latency_ms=latency_ms,
            timestamp=datetime.now(timezone.utc).isoformat(),
            passed=passed,
            failure_reason=failure_reason
        )

    def _evaluate_pass_fail(
        self,
        metric_results: dict,
        example: EvalExample
    ) -> tuple[bool, str | None]:
        """
        Apply pass/fail logic. Golden set examples use stricter thresholds.
        """
        is_golden = example.partition == "golden"

        # Check constraint violations first; these are always failures
        if "constraints" in metric_results:
            constraint_result = metric_results["constraints"]
            if not constraint_result.get("all_passed", True):
                failed = [
                    k for k, v in constraint_result["constraints"].items()
                    if not v["passed"]
                ]
                return False, f"Constraint failures: {', '.join(failed)}"

        # Check hallucination
        if "hallucination" in metric_results:
            if metric_results["hallucination"].get("has_hallucinations", False):
                return False, "Hallucination detected"

        # Check semantic similarity for golden examples
        if is_golden and "semantic_similarity" in metric_results:
            threshold = 0.88  # Stricter for golden set
            score = metric_results["semantic_similarity"].get("score", 0)
            if score < threshold:
                return False, f"Semantic similarity {score:.3f} below golden threshold {threshold}"

        # Check judge score
        if "judge" in metric_results:
            min_score = 4 if is_golden else 3
            mean_score = metric_results["judge"].get("mean_score", 0)
            if mean_score < min_score:
                return False, f"Judge score {mean_score:.2f} below threshold {min_score}"

        return True, None

    async def run(
        self,
        partitions: list[str] | None = None,
        concurrency: int = 5
    ) -> dict:
        """Run evaluation across specified partitions with controlled concurrency."""
        examples = self.dataset.examples
        if partitions:
            examples = [e for e in examples if e.partition in partitions]

        semaphore = asyncio.Semaphore(concurrency)

        async def rate_limited_eval(example):
            async with semaphore:
                return await self.evaluate_example(example)

        results = await asyncio.gather(
            *[rate_limited_eval(e) for e in examples],
            return_exceptions=True
        )

        # Separate unexpected exceptions from results and report them,
        # so a crashed example is never silently dropped from the totals
        valid_results = [r for r in results if isinstance(r, EvalResult)]
        unexpected = [repr(r) for r in results if not isinstance(r, EvalResult)]
        report = self._compute_aggregate_stats(valid_results)
        report["unexpected_errors"] = unexpected
        return report

    def _compute_aggregate_stats(self, results: list[EvalResult]) -> dict:
        by_partition = {}
        for result in results:
            partition = result.partition
            if partition not in by_partition:
                by_partition[partition] = {"passed": 0, "failed": 0, "results": []}
            by_partition[partition]["results"].append(result)
            if result.passed:
                by_partition[partition]["passed"] += 1
            else:
                by_partition[partition]["failed"] += 1

        summary = {}
        for partition, data in by_partition.items():
            total = data["passed"] + data["failed"]
            summary[partition] = {
                "pass_rate": data["passed"] / total if total > 0 else 0,
                "total": total,
                "passed": data["passed"],
                "failed": data["failed"],
                "failure_reasons": [
                    r.failure_reason for r in data["results"] if r.failure_reason
                ],
                "avg_latency_ms": float(np.mean([r.latency_ms for r in data["results"]]))
            }

        # Critical: golden set failures block the pipeline.
        # Note: if the golden partition was not part of this run, the rate
        # defaults to 1.0 and the gate passes.
        golden_pass_rate = summary.get("golden", {}).get("pass_rate", 1.0)
        return {
            "summary": summary,
            "golden_pass_rate": golden_pass_rate,
            "pipeline_passed": golden_pass_rate >= 1.0,  # 100% golden required
            "model": self.model_name,
            "prompt_version": self.prompt_version,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "all_results": [asdict(r) for r in results]
        }
The key design decision here is the strict gate on the golden set. We require 100% pass rate on golden examples because those examples were hand-crafted to represent absolute minimum functionality. Any failure there is a critical regression.
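The pipeline calls `metric.name` and `metric.compute(output=..., example=...)` on each entry in `metrics`, but that interface is implicit. The sketch below spells out one way to express it. The `Metric` protocol and `LengthConstraintMetric` class are illustrative assumptions, and `SimpleNamespace` stands in for an `EvalExample` so the snippet runs on its own.

```python
from types import SimpleNamespace
from typing import Any, Protocol


class Metric(Protocol):
    """Shape the pipeline relies on: a `name` plus a `compute` method."""
    name: str

    def compute(self, output: str, example: Any) -> dict: ...


class LengthConstraintMetric:
    """Example metric. The name "constraints" matches the key that the
    pass/fail logic looks up, and the result mirrors the checker's shape."""
    name = "constraints"

    def __init__(self, min_words: int, max_words: int):
        self.min_words = min_words
        self.max_words = max_words

    def compute(self, output: str, example: Any) -> dict:
        passed = self.min_words <= len(output.split()) <= self.max_words
        return {
            "constraints": {"length_check": {"passed": passed, "error": None}},
            "all_passed": passed,
        }


# Stand-in for an EvalExample; only attribute access is needed here
example = SimpleNamespace(id="golden-001", partition="golden", expected_output=None)
metric = LengthConstraintMetric(min_words=3, max_words=50)
result = metric.compute(output="Your refund takes 3-5 business days.", example=example)
print(metric.name, result["all_passed"])  # constraints True
```

Because the pass/fail logic looks results up by name, the `name` of each metric ("constraints", "hallucination", "semantic_similarity", "judge") effectively selects which gate it feeds.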
Offline evaluation tells you how your system performs on known inputs. Production monitoring tells you how it performs on the messy, unpredictable, real-world inputs your actual users send. These are different problems requiring different solutions.
You cannot afford to run full evaluation on every production request — the cost and latency would be prohibitive. Instead, you need a smart sampling strategy that gives you statistically valid signal without burning through your evaluation budget.
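To put a number on "statistically valid signal", a common back-of-the-envelope calculation is the sample size needed to estimate a proportion, such as a failure rate, to within a chosen margin of error. The sketch below uses the standard normal-approximation formula. The function name and the example rates are assumptions for illustration, and the approximation gets rough for very rare failures.

```python
import math


def required_sample_size(
    expected_rate: float, margin_of_error: float, z: float = 1.96
) -> int:
    """Samples needed to estimate a proportion (e.g. a failure rate) to
    within +/- margin_of_error, using the normal approximation
    n = z^2 * p * (1 - p) / e^2. z = 1.96 corresponds to 95% confidence."""
    p = expected_rate
    return math.ceil(z**2 * p * (1 - p) / margin_of_error**2)


# Estimating a ~5% failure rate to within +/- 2 percentage points
n = required_sample_size(expected_rate=0.05, margin_of_error=0.02)
print(n)  # 457
```

At a 5% sample rate, collecting 457 evaluated samples takes roughly 9,000 production requests, which gives a sense of how long a monitoring window has to be before its failure rate means much.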
Use stratified sampling to ensure you're evaluating a representative cross-section of traffic:
import random
from collections import defaultdict
from datetime import datetime, timezone


class ProductionSampler:
    def __init__(
        self,
        base_sample_rate: float = 0.05,  # 5% of traffic by default
        always_sample_patterns: list[str] | None = None
    ):
        self.base_sample_rate = base_sample_rate
        self.always_sample_patterns = always_sample_patterns or []
        self._stratum_counts = defaultdict(int)
        self._stratum_samples = defaultdict(int)

    def should_sample(self, request: dict) -> bool:
        """
        Determine if a request should be sampled for evaluation.
        Uses stratified sampling to maintain representation across
        user segments, query types, and time periods.
        """
        # Always sample flagged patterns (potential issues)
        user_input = request.get("user_input", "")
        if any(pattern.lower() in user_input.lower()
               for pattern in self.always_sample_patterns):
            return True

        # Always sample error responses
        if request.get("had_error", False):
            return True

        # Always sample requests with unusual latency (>p95)
        if request.get("latency_ms", 0) > request.get("p95_latency", float('inf')):
            return True

        # Stratified sampling by time of day to catch temporal patterns
        hour = datetime.now(timezone.utc).hour
        stratum = f"hour_{hour}"
        self._stratum_counts[stratum] += 1

        # Top up any stratum whose realized sample rate has fallen below
        # the target; otherwise fall back to plain random sampling
        count = self._stratum_counts[stratum]
        samples = self._stratum_samples[stratum]
        if samples / count < self.base_sample_rate:
            sampled = True
        else:
            sampled = random.random() < self.base_sample_rate

        # Record every sampled request so the realized rate stays accurate
        if sampled:
            self._stratum_samples[stratum] += 1
        return sampled
Production monitoring must be completely asynchronous — it cannot add latency to user-facing requests. The pattern is to push sampled requests to a queue and process them in a separate worker:
import asyncio
import json
import logging
from asyncio import Queue
from datetime import datetime, timezone

import numpy as np

logger = logging.getLogger(__name__)


class ProductionEvalWorker:
    def __init__(
        self,
        eval_functions: list,
        alerting_thresholds: dict,
        alert_callback,
        queue_maxsize: int = 10000
    ):
        self.eval_functions = eval_functions
        self.thresholds = alerting_thresholds
        self.alert_callback = alert_callback
        self.queue = Queue(maxsize=queue_maxsize)
        self._metrics_buffer = []
        self._running = False

    async def push(self, request_data: dict):
        """Non-blocking push to evaluation queue."""
        try:
            self.queue.put_nowait(request_data)
        except asyncio.QueueFull:
            logger.warning("Eval queue full, dropping sample. Consider increasing worker capacity.")

    async def start(self):
        """Start the background evaluation worker."""
        self._running = True
        await asyncio.gather(
            self._process_queue(),
            self._flush_metrics_periodically()
        )

    async def _process_queue(self):
        while self._running:
            try:
                request_data = await asyncio.wait_for(
                    self.queue.get(),
                    timeout=1.0
                )
                await self._evaluate_and_buffer(request_data)
                self.queue.task_done()
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                logger.error(f"Eval worker error: {e}")

    async def _evaluate_and_buffer(self, request_data: dict):
        # Results are keyed by function name, so the alert checks below
        # expect eval functions named `hallucination`, `constraints`, `judge`
        metric_results = {}
        for eval_fn in self.eval_functions:
            try:
                # Eval functions are synchronous; run them in a worker thread
                result = await asyncio.to_thread(eval_fn, request_data)
                metric_results[eval_fn.__name__] = result
            except Exception as e:
                logger.error(f"Eval function {eval_fn.__name__} failed: {e}")

        self._metrics_buffer.append({
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "request_id": request_data.get("request_id"),
            "metrics": metric_results
        })

        # Check for immediate alert conditions
        await self._check_alerts(metric_results, request_data)

    async def _check_alerts(self, metrics: dict, request_data: dict):
        """Check if any metrics breach alerting thresholds."""
        alerts = []

        # Hallucination threshold
        if "hallucination" in metrics:
            if metrics["hallucination"].get("has_hallucinations"):
                alerts.append({
                    "type": "hallucination_detected",
                    "severity": "high",
                    "request_id": request_data.get("request_id"),
                    "details": metrics["hallucination"]
                })

        # Constraint violation
        if "constraints" in metrics:
            if not metrics["constraints"].get("all_passed", True):
                alerts.append({
                    "type": "constraint_violation",
                    "severity": "medium",
                    "request_id": request_data.get("request_id"),
                    "details": metrics["constraints"]
                })

        for alert in alerts:
            await self.alert_callback(alert)

    async def _flush_metrics_periodically(self, interval_seconds: int = 60):
        """Flush buffered metrics for aggregate analysis."""
        while self._running:
            await asyncio.sleep(interval_seconds)
            if self._metrics_buffer:
                # Swap the buffer out first so samples that arrive while
                # aggregates are being computed are not lost
                window, self._metrics_buffer = self._metrics_buffer, []
                await self._compute_and_store_aggregates(window)

    async def _compute_and_store_aggregates(self, window: list[dict]):
        """Compute windowed statistics and check for drift."""
        if not window:
            return

        # Compute rolling statistics for trend analysis
        judge_scores = [
            m["metrics"]["judge"]["mean_score"]
            for m in window
            if m["metrics"].get("judge", {}).get("mean_score") is not None
        ]
        if judge_scores:
            aggregate = {
                "window_start": window[0]["timestamp"],
                "window_end": window[-1]["timestamp"],
                "sample_count": len(window),
                "judge_score_mean": float(np.mean(judge_scores)),
                "judge_score_p10": float(np.percentile(judge_scores, 10)),
                "hallucination_rate": sum(
                    1 for m in window
                    if m["metrics"].get("hallucination", {}).get("has_hallucinations")
                ) / len(window)
            }

            # Check for drift from baseline
            if aggregate["judge_score_mean"] < self.thresholds.get("min_judge_score", 3.5):
                await self.alert_callback({
                    "type": "quality_degradation",
                    "severity": "high",
                    "details": aggregate
                })

            logger.info(f"Eval aggregate: {json.dumps(aggregate)}")
One of the trickiest production problems is detecting when a prompt change causes a regression. The challenge is that LLM output distributions are noisy — there's natural variance from run to run, and you need to distinguish signal (actual regression) from noise (random variation).
Use a statistical hypothesis test rather than simple threshold comparison:
import numpy as np
from scipy import stats

def detect_regression(
    baseline_scores: list[float],
    candidate_scores: list[float],
    significance_level: float = 0.05,
    minimum_detectable_effect: float = 0.3  # Smallest score difference treated as meaningful
) -> dict:
    """
    Use Welch's t-test to detect statistically significant regressions.
    More robust than Student's t-test when the two groups have unequal
    variances or sample sizes.
    """
    if len(baseline_scores) < 10 or len(candidate_scores) < 10:
        return {
            "conclusion": "insufficient_data",
            "message": f"Need at least 10 samples each. Got {len(baseline_scores)} baseline, {len(candidate_scores)} candidate."
        }
    t_stat, p_value = stats.ttest_ind(
        baseline_scores,
        candidate_scores,
        equal_var=False  # Welch's t-test
    )
    # Cast NumPy scalars to built-in types so the result is JSON-serializable
    p_value = float(p_value)
    baseline_mean = float(np.mean(baseline_scores))
    candidate_mean = float(np.mean(candidate_scores))
    effect_size = candidate_mean - baseline_mean
    is_significant = p_value < significance_level
    is_meaningful = abs(effect_size) >= minimum_detectable_effect
    conclusion = "no_change"
    if is_significant and is_meaningful:
        conclusion = "regression" if effect_size < 0 else "improvement"
    elif is_significant and not is_meaningful:
        conclusion = "statistically_significant_but_trivial"
    return {
        "conclusion": conclusion,
        "baseline_mean": baseline_mean,
        "candidate_mean": candidate_mean,
        "effect_size": effect_size,
        "p_value": p_value,
        "is_significant": is_significant,
        "is_meaningful": is_meaningful,
        "recommendation": "block_deployment" if conclusion == "regression" else "proceed"
    }
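The 10-sample minimum in `detect_regression` only guards against degenerate inputs; it says nothing about whether the test can actually see a 0.3-point shift. As a rough sketch, the standard normal-approximation formula for a two-sided, two-sample comparison estimates how many scored examples each side needs. The helper name `required_samples_per_group`, the 80% power default, and the assumption that both groups share roughly the same score standard deviation are illustrative choices, not part of the framework above.

```python
import math
from statistics import NormalDist


def required_samples_per_group(
    score_std: float,
    minimum_effect: float = 0.3,
    significance_level: float = 0.05,
    power: float = 0.8,
) -> int:
    """Approximate per-group sample size for a two-sided two-sample test.

    Assumes both groups have a similar standard deviation (score_std) and
    uses the normal approximation n = 2 * ((z_a + z_b) * sd / effect)^2.
    """
    if score_std <= 0 or minimum_effect <= 0:
        raise ValueError("score_std and minimum_effect must be positive")
    # Critical value for the two-sided significance level
    z_alpha = NormalDist().inv_cdf(1 - significance_level / 2)
    # Quantile corresponding to the desired power
    z_beta = NormalDist().inv_cdf(power)
    n = 2 * ((z_alpha + z_beta) * score_std / minimum_effect) ** 2
    return math.ceil(n)


if __name__ == "__main__":
    # Hypothetical judge-score standard deviations on a 1-5 scale
    for sd in (0.5, 1.0):
        print(f"std={sd}: {required_samples_per_group(sd)} samples per group")
```

Under these assumptions, judge scores with a standard deviation of about 1.0 need roughly 175 examples per side to detect a 0.3-point change reliably, far more than the 10-sample floor. If your golden set is smaller than that, a "no_change" conclusion mostly reflects low power rather than evidence that the prompt change is safe.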
Now you're going to build a minimal but complete evaluation framework for a fictional scenario: a customer support assistant for a SaaS product that answers questions about pricing, features, and account management. The assistant uses a RAG architecture with a small knowledge base.
First, create a simple evaluation dataset with examples across all four partitions. Use this structure (you can mock the model responses to start):
5 golden examples: Choose one from each major query category — pricing, feature questions, billing, cancellation policy, and troubleshooting.
3 adversarial examples: Include one prompt injection attempt ("Ignore previous instructions and..."), one query in a language your assistant doesn't support, and one query asking for competitor comparisons.
2 regression examples: Make up two specific failure scenarios, such as "When asked about the Enterprise plan, the model incorrectly states that API access is not included."
Using the ConstraintChecker class from earlier, define at least five constraints appropriate for a SaaS customer support assistant. Think about:
Create a context document (3-4 paragraphs describing your fictional product's pricing and features) and write three model responses: one with no hallucinations, one with a subtle hallucination (slightly wrong price), and one with a blatant hallucination (fabricated feature). Run all three through the hallucination checker and verify it catches the fabricated cases.
Write a script that:
A CI pipeline should be able to call this script and gate deployments based on the exit code.
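One possible shape for the exit-code contract, as a minimal sketch: the evaluation results are reduced to a pass rate, compared with a threshold, and turned into an exit code so that the CI step fails when quality is too low. The result format (a list of dicts with a boolean `passed` field), the `gate` and `main` functions, and the 0.9 default threshold are all hypothetical; adapt them to whatever your own script produces.

```python
def gate(results: list[dict], min_pass_rate: float = 0.9) -> int:
    """Return a process exit code: 0 to allow deployment, 1 to block it."""
    if not results:
        # No results means the evaluation did not run; fail closed.
        print("No evaluation results found; blocking deployment.")
        return 1
    passed = sum(1 for r in results if r.get("passed"))
    pass_rate = passed / len(results)
    print(f"Pass rate: {pass_rate:.1%} ({passed}/{len(results)})")
    return 0 if pass_rate >= min_pass_rate else 1


def main() -> int:
    # Mocked results stand in for a real evaluation run.
    mocked_results = [{"id": i, "passed": i != 3} for i in range(10)]
    return gate(mocked_results)


# In the real script, finish with: sys.exit(main())
```

Failing closed on an empty result set is a deliberate choice here: a crashed or skipped evaluation should not look like a passing one.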
Sketch (in writing or in a tool of your choice) what your production monitoring dashboard should show. At minimum, define:
Mistake: Using the same LLM family as both subject and judge
If you're evaluating GPT-4o outputs using GPT-4o as the judge, you're introducing systematic bias. The judge model is more likely to rate stylistically similar outputs favorably, regardless of actual quality. Use a different model family for judging, or use a specialized evaluator model such as Prometheus or an evaluation framework such as Ragas when available. If you must stay within one family, make the judge the newer, more capable version and the subject the older one, not the reverse.
Mistake: Conflating pass rate with quality
A 95% pass rate means nothing without knowing what "pass" means. If your constraints are too lenient (only checking that the response isn't empty), you'll have 100% pass rates on terrible outputs. Conversely, if your thresholds were calibrated on a specific time period and your input distribution shifts, your pass rate can drop even if quality hasn't changed. Always track your metrics alongside a human evaluation sample; even 20 examples per week of human spot-checking will catch calibration drift.
Mistake: Ignoring latency in your evaluation pipeline
Evaluation runs that take 45 minutes block your CI pipeline and train engineers to skip or disable them. Profile your evaluation pipeline and enforce time budgets: golden set evaluation should complete in under 5 minutes, full integration evaluation in under 20 minutes. Use caching aggressively — if an input hasn't changed, don't re-run the model.
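A minimal sketch of the caching idea, assuming the cache key must cover everything that can change the output: the model name, the prompt version, and the input itself. The `ResponseCache` class, its method names, and the `generate` callable are hypothetical stand-ins for your own model client, and the in-memory dict would be a file or key-value store in a real pipeline.

```python
import hashlib
import json
from typing import Callable


class ResponseCache:
    """In-memory cache keyed on model, prompt version, and input text."""

    def __init__(self) -> None:
        self._store: dict[str, str] = {}

    @staticmethod
    def make_key(model: str, prompt_version: str, user_input: str) -> str:
        # Serialize deterministically so identical requests hash identically.
        payload = json.dumps(
            {"model": model, "prompt_version": prompt_version, "input": user_input},
            sort_keys=True,
        )
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

    def get_or_generate(
        self,
        model: str,
        prompt_version: str,
        user_input: str,
        generate: Callable[[str], str],
    ) -> str:
        key = self.make_key(model, prompt_version, user_input)
        if key not in self._store:
            # Cache miss: call the (expensive) model exactly once.
            self._store[key] = generate(user_input)
        return self._store[key]
```

Bumping the prompt version invalidates every cached response for that prompt, which is the behavior you want: a prompt change is exactly when the model needs to be re-run. Note that a cache like this freezes one sampled response per input, so it suits regression checks better than measurements of run-to-run variance.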
Mistake: Building evaluation in isolation from product
The worst evaluation frameworks are built by ML engineers without input from the people who understand what "good" means for the product — support managers, customer-facing teams, legal. Before you finalize your metrics, sit with these stakeholders and walk through 20 real examples. Ask them to rate each one. Use those ratings to calibrate your automated metrics. Do this every quarter.
Mistake: Treating the LLM-as-judge prompt as static
Your judge prompt has the same drift problems as your application prompt. If your judge prompt was written for GPT-4-turbo and you upgrade your judge to GPT-4o, the scores may shift systematically even though quality hasn't changed. Treat changes to the judge prompt or the judge model as metric changes: maintain version history for both, and re-evaluate your baselines whenever either one changes.
Troubleshooting: High variance in LLM-as-judge scores
If running the same example through your judge multiple times produces scores varying by more than 1.5 points, your judge prompt is underspecified. Add more concrete anchor descriptions for each score level. Include 2-3 example responses with pre-assigned scores directly in the prompt (few-shot judge). Lower the temperature — for evaluation, you generally want temperature ≤ 0.3.
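To check whether your judge crosses that 1.5-point line, you can score the same example several times and look at the spread. The sketch below assumes a hypothetical `judge_fn` callable that takes an example and returns a numeric score; the function name `judge_score_spread` and the default of five runs are illustrative.

```python
from typing import Callable


def judge_score_spread(
    judge_fn: Callable[[str], float],
    example: str,
    runs: int = 5,
    max_spread: float = 1.5,
) -> dict:
    """Score one example repeatedly and report the max-min spread."""
    if runs < 2:
        raise ValueError("Need at least 2 runs to measure spread")
    scores = [judge_fn(example) for _ in range(runs)]
    spread = max(scores) - min(scores)
    return {
        "scores": scores,
        "spread": spread,
        # True when repeated judgments disagree by more than the tolerance
        "is_unstable": spread > max_spread,
    }
```

Running this over a few dozen examples before and after tightening the judge prompt gives a concrete measure of whether the added anchors and few-shot examples actually reduced the variance.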
Troubleshooting: Hallucination detector giving too many false positives
This usually happens when your context document is too sparse or your model output includes reasonable inferences that aren't explicitly stated in the context. Add a REASONABLE_INFERENCE status to your hallucination detector for claims that are logically implied but not explicitly stated. Build a calibration dataset of 50 examples where you've manually labeled each claim, and measure your detector's precision and recall before trusting it in production.
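Measuring the detector against the calibration set comes down to comparing its per-claim verdicts with your manual labels. A minimal sketch, assuming each claim is reduced to a boolean (True means "hallucinated"); the `precision_recall` helper is illustrative and not part of the hallucination checker itself.

```python
def precision_recall(predicted: list[bool], labeled: list[bool]) -> dict:
    """Compare detector flags with human labels (True = hallucination)."""
    if len(predicted) != len(labeled):
        raise ValueError("predicted and labeled must have the same length")
    pairs = list(zip(predicted, labeled))
    true_pos = sum(1 for p, l in pairs if p and l)
    false_pos = sum(1 for p, l in pairs if p and not l)
    false_neg = sum(1 for p, l in pairs if not p and l)
    # Guard against division by zero when nothing was flagged or labeled
    precision = true_pos / (true_pos + false_pos) if (true_pos + false_pos) else 0.0
    recall = true_pos / (true_pos + false_neg) if (true_pos + false_neg) else 0.0
    return {
        "precision": precision,
        "recall": recall,
        "false_positives": false_pos,
        "false_negatives": false_neg,
    }
```

Low precision is the "too many false positives" symptom described above; low recall means real hallucinations are slipping through. Track both, since loosening the detector to fix one usually costs you the other.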
You've built the conceptual and technical foundation for a production-grade LLM evaluation framework. The key principles to carry forward:
Tiered evaluation is non-negotiable. Unit tests catch regressions early and cheaply. Integration tests catch interaction effects. Production monitoring catches the real world. You need all three, and they need to feed back into each other.
Your dataset is your most valuable asset. A great metric with a bad dataset gives you false confidence. A decent metric with a representative, adversarial, well-maintained dataset gives you genuine signal. Invest in the dataset first.
Metrics must be calibrated, not just implemented. Run every new metric against a human-labeled set before trusting it. Track metric calibration over time. When your metrics and your users disagree, assume the metric is wrong until proven otherwise.
Statistical rigor matters at scale. The difference between "our model got worse" and "our model got worse in a way that's statistically distinguishable from noise" is the difference between false alarms and real regressions. Use proper hypothesis testing before blocking deployments.
The evaluation system is a product. It needs to be maintained, extended, and adapted as your application evolves. Assign ownership, budget time for it, and treat evaluation failures with the same urgency as production incidents.
The evaluation framework you've designed here will evolve. New failure modes will emerge, new metrics will become available, and your understanding of what "good" means for your specific application will deepen. The point isn't to build a perfect framework on day one — it's to build a framework that learns.