
You're leading a team tasked with integrating large language models into your organization's data pipeline. The CFO wants cost efficiency, your security team demands on-premises deployment options, your product manager needs multilingual capabilities, and your engineering team is concerned about vendor lock-in. Meanwhile, three distinct LLM ecosystems compete for your attention: OpenAI's polished commercial offerings, Anthropic's safety-focused models, and the rapidly evolving open-source landscape.
This isn't just about picking the "best" model—it's about architecting a decision framework that balances technical performance, operational constraints, and strategic business objectives. The choice you make will influence everything from your monthly cloud bills to your team's development velocity, and potentially even your company's competitive positioning.
By the end of this lesson, you'll have a systematic approach to LLM selection that goes far beyond benchmark scores and marketing claims.
What you'll learn:
Before diving in, you should have:
OpenAI's ecosystem centers around GPT models accessed through their API, with recent additions like GPT-4 Turbo, GPT-4o, and specialized models for specific tasks. Their approach prioritizes ease of use, consistent performance, and rapid feature deployment.
The technical architecture follows a simple pattern: your application makes HTTP requests to OpenAI's endpoints, receives structured responses, and handles the results. This simplicity masks sophisticated infrastructure—OpenAI manages model hosting, scaling, and optimization behind the scenes.
```python
import time

from openai import OpenAI


class OpenAIEvaluator:
    def __init__(self, api_key, model="gpt-4-turbo"):
        self.client = OpenAI(api_key=api_key)
        self.model = model
        self.request_count = 0
        self.total_tokens = 0

    def evaluate_response_quality(self, prompt, expected_format=None):
        """Evaluate response quality with timing and token tracking."""
        start_time = time.time()

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.1,  # Lower temperature for consistent evaluation
            max_tokens=1000
        )

        end_time = time.time()

        # Track usage metrics
        self.request_count += 1
        self.total_tokens += response.usage.total_tokens

        return {
            'response': response.choices[0].message.content,
            'latency': end_time - start_time,
            'input_tokens': response.usage.prompt_tokens,
            'output_tokens': response.usage.completion_tokens,
            'total_tokens': response.usage.total_tokens
        }
```
OpenAI's strength lies in consistent performance and comprehensive tooling. Their models typically excel at following complex instructions, maintaining context over long conversations, and generating high-quality text across diverse domains. The API includes advanced features like function calling, JSON mode, and vision capabilities that reduce the engineering overhead for common use cases.
However, this convenience comes with constraints. You have no control over model updates—OpenAI can deprecate versions or change behavior without notice. Data locality is limited to their approved regions, and you're dependent on their infrastructure availability and pricing decisions.
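One practical mitigation for the update risk is to pin snapshot versions behind a small indirection layer, so a deprecation becomes a one-line config change rather than a scattered code hunt. A minimal sketch (the registry structure and task names are hypothetical; the snapshot identifiers shown are real OpenAI version strings):

```python
# Map logical task roles to pinned model snapshots instead of floating aliases
# like "gpt-4-turbo", whose behavior can change underneath you.
MODEL_REGISTRY = {
    "summarization": "gpt-4-turbo-2024-04-09",
    "classification": "gpt-3.5-turbo-0125",
}

def resolve_model(task, overrides=None):
    """Resolve a logical task name to a pinned model version."""
    registry = {**MODEL_REGISTRY, **(overrides or {})}
    if task not in registry:
        raise KeyError(f"No model pinned for task: {task}")
    return registry[task]
```

Application code then asks for `resolve_model("summarization")` rather than hardcoding a model string, and a forced migration touches only the registry.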
Anthropic's Claude models represent a different philosophical approach, prioritizing AI safety and controllable behavior. Their Constitutional AI training methodology aims to create models that are helpful, harmless, and honest—often resulting in different response characteristics than OpenAI's models.
```python
import time

import anthropic


class AnthropicEvaluator:
    def __init__(self, api_key, model="claude-3-5-sonnet-20241022"):
        self.client = anthropic.Anthropic(api_key=api_key)
        self.model = model
        self.request_metrics = []

    def evaluate_safety_compliance(self, prompts):
        """Evaluate how models handle edge cases and safety constraints."""
        results = []

        for prompt in prompts:
            try:
                start_time = time.time()
                message = self.client.messages.create(
                    model=self.model,
                    max_tokens=1000,
                    temperature=0.1,
                    messages=[{"role": "user", "content": prompt}]
                )
                end_time = time.time()

                result = {
                    'prompt': prompt,
                    'response': message.content[0].text,
                    'latency': end_time - start_time,
                    'input_tokens': message.usage.input_tokens,
                    'output_tokens': message.usage.output_tokens,
                    'refused': False
                }
            except anthropic.BadRequestError as e:
                # The API rejected the request outright; note that soft
                # refusals usually arrive as normal text responses instead.
                result = {
                    'prompt': prompt,
                    'response': None,
                    'error': str(e),
                    'refused': True,
                    'latency': None
                }

            results.append(result)
            self.request_metrics.append(result)

        return results
```
Claude models often demonstrate superior performance on tasks requiring nuanced reasoning, ethical considerations, or handling of ambiguous instructions. Their longer context windows (up to 200K tokens for Claude-3) enable novel use cases like analyzing entire codebases or processing lengthy documents.
Anthropic's safety-first approach can be both an advantage and a limitation. While Claude is less likely to generate harmful content, it may also be more conservative in borderline cases, potentially refusing legitimate requests that other models would handle.
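To quantify how conservative a model actually is on your workload, you can aggregate the `refused` flags that an evaluator like the one above records. A minimal sketch, assuming result dicts with a boolean `refused` key:

```python
def summarize_refusals(results):
    """Aggregate refusal behavior from evaluator result dicts."""
    total = len(results)
    refused = sum(1 for r in results if r.get('refused'))
    return {
        'total': total,
        'refused': refused,
        'refusal_rate': refused / total if total else 0.0,
    }

batch = [{'refused': False}, {'refused': True}, {'refused': False}, {'refused': False}]
print(summarize_refusals(batch))  # {'total': 4, 'refused': 1, 'refusal_rate': 0.25}
```

Tracking this rate on a fixed set of borderline-but-legitimate prompts gives you a concrete number to compare across providers, rather than an anecdotal impression.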
The open-source LLM landscape has exploded with options ranging from Meta's Llama series to specialized models like Code Llama, Mistral, and Phi. These models offer unprecedented control but require significantly more operational sophistication.
```python
import time

import psutil
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig


class OpenSourceEvaluator:
    def __init__(self, model_name="microsoft/Phi-3.5-mini-instruct", quantization=True):
        self.model_name = model_name
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Configure quantization for memory efficiency
        if quantization and torch.cuda.is_available():
            quantization_config = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_compute_dtype=torch.float16,
                bnb_4bit_quant_type="nf4",
                bnb_4bit_use_double_quant=True
            )
        else:
            quantization_config = None

        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            quantization_config=quantization_config,
            torch_dtype=torch.float16,
            trust_remote_code=True
        )

        # Quantized models are placed on the GPU at load time; only move
        # the model manually when no quantization config was applied.
        if quantization_config is None:
            self.model.to(self.device)

    def evaluate_with_monitoring(self, prompt, max_tokens=512):
        """Evaluate with detailed resource monitoring."""
        # Monitor system resources
        process = psutil.Process()
        memory_before = process.memory_info().rss / 1024 / 1024  # MB

        if torch.cuda.is_available():
            torch.cuda.reset_peak_memory_stats()
            gpu_memory_before = torch.cuda.memory_allocated() / 1024 / 1024  # MB

        start_time = time.time()

        # Tokenize and generate
        inputs = self.tokenizer(prompt, return_tensors="pt").to(self.device)

        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=max_tokens,
                temperature=0.1,
                do_sample=True,
                pad_token_id=self.tokenizer.eos_token_id
            )

        # Decode only the newly generated tokens
        response = self.tokenizer.decode(
            outputs[0][inputs.input_ids.shape[1]:],
            skip_special_tokens=True
        )

        end_time = time.time()

        # Calculate resource usage
        memory_after = process.memory_info().rss / 1024 / 1024
        memory_delta = memory_after - memory_before

        metrics = {
            'response': response,
            'latency': end_time - start_time,
            'cpu_memory_mb': memory_delta,
            'input_length': inputs.input_ids.shape[1],
            'output_length': len(outputs[0]) - inputs.input_ids.shape[1]
        }

        if torch.cuda.is_available():
            gpu_memory_after = torch.cuda.memory_allocated() / 1024 / 1024
            metrics['gpu_memory_peak_mb'] = torch.cuda.max_memory_allocated() / 1024 / 1024
            metrics['gpu_memory_delta_mb'] = gpu_memory_after - gpu_memory_before

        return metrics
```
Open-source models provide complete control over the inference pipeline. You can modify model weights, implement custom sampling strategies, run models on your own hardware, and ensure data never leaves your infrastructure. This control comes with responsibility—you must handle model hosting, scaling, monitoring, and updates.
The performance gap between open-source and commercial models continues to narrow. Models like Llama 3.1 405B compete directly with GPT-4 on many benchmarks, while smaller models like Phi-3.5 offer compelling performance per parameter.
Most public benchmarks focus on general capabilities, but your specific use case likely requires different evaluation criteria. Here's a framework for building custom benchmarks:
```python
import json
import re

import pandas as pd


class LLMBenchmarkSuite:
    def __init__(self):
        self.evaluators = {
            'openai': None,
            'anthropic': None,
            'opensource': None
        }
        self.test_cases = []
        self.results = []

    def add_test_case(self, prompt, expected_answer=None, evaluation_criteria=None, category="general"):
        """Add a test case with custom evaluation criteria."""
        test_case = {
            'prompt': prompt,
            'expected_answer': expected_answer,
            'evaluation_criteria': evaluation_criteria,
            'category': category,
            'id': len(self.test_cases)
        }
        self.test_cases.append(test_case)

    def evaluate_structured_output(self, response, expected_schema):
        """Evaluate whether a response follows the expected JSON schema."""
        try:
            parsed = json.loads(response.strip())

            # Check required fields
            required_fields = expected_schema.get('required', [])
            missing_fields = [field for field in required_fields if field not in parsed]

            if missing_fields:
                return {
                    'valid_json': True,
                    'schema_valid': False,
                    'missing_fields': missing_fields,
                    'score': 0.0
                }

            # Check field types
            properties = expected_schema.get('properties', {})
            type_errors = []

            for field, field_schema in properties.items():
                if field in parsed:
                    expected_type = field_schema.get('type')
                    actual_value = parsed[field]

                    if expected_type == 'string' and not isinstance(actual_value, str):
                        type_errors.append(f"{field}: expected string, got {type(actual_value)}")
                    elif expected_type == 'number' and not isinstance(actual_value, (int, float)):
                        type_errors.append(f"{field}: expected number, got {type(actual_value)}")

            schema_valid = len(type_errors) == 0
            score = 1.0 if schema_valid else 0.5  # Partial credit for valid JSON

            return {
                'valid_json': True,
                'schema_valid': schema_valid,
                'type_errors': type_errors,
                'score': score
            }
        except json.JSONDecodeError as e:
            return {
                'valid_json': False,
                'schema_valid': False,
                'json_error': str(e),
                'score': 0.0
            }

    def evaluate_factual_accuracy(self, response, expected_facts):
        """Evaluate factual accuracy by checking for specific facts in the response."""
        response_lower = response.lower()
        facts_found = 0

        for fact in expected_facts:
            # Substring match for plain strings
            if isinstance(fact, str):
                if fact.lower() in response_lower:
                    facts_found += 1
            elif isinstance(fact, dict) and 'pattern' in fact:
                # Support regex patterns for flexible matching
                if re.search(fact['pattern'], response_lower, re.IGNORECASE):
                    facts_found += 1

        accuracy = facts_found / len(expected_facts) if expected_facts else 1.0

        return {
            'facts_found': facts_found,
            'total_facts': len(expected_facts),
            'accuracy': accuracy
        }

    def run_comprehensive_benchmark(self):
        """Run all test cases against all available evaluators."""
        results = []

        for test_case in self.test_cases:
            case_results = {'test_case_id': test_case['id'], 'category': test_case['category']}

            # Test each available evaluator
            for provider_name, evaluator in self.evaluators.items():
                if evaluator is None:
                    continue

                try:
                    # Get response from model
                    if provider_name == 'openai':
                        result = evaluator.evaluate_response_quality(test_case['prompt'])
                        response = result['response']
                        latency = result['latency']
                        tokens = result['total_tokens']
                    elif provider_name == 'anthropic':
                        result = evaluator.evaluate_safety_compliance([test_case['prompt']])[0]
                        response = result['response'] if not result['refused'] else ""
                        latency = result['latency']
                        tokens = result.get('input_tokens', 0) + result.get('output_tokens', 0)
                    elif provider_name == 'opensource':
                        result = evaluator.evaluate_with_monitoring(test_case['prompt'])
                        response = result['response']
                        latency = result['latency']
                        tokens = result['input_length'] + result['output_length']

                    # Apply evaluation criteria
                    evaluation_score = self._evaluate_response(response, test_case)

                    case_results[f'{provider_name}_response'] = response
                    case_results[f'{provider_name}_latency'] = latency
                    case_results[f'{provider_name}_tokens'] = tokens
                    case_results[f'{provider_name}_score'] = evaluation_score
                except Exception as e:
                    case_results[f'{provider_name}_error'] = str(e)
                    case_results[f'{provider_name}_score'] = 0.0

            results.append(case_results)

        return pd.DataFrame(results)

    def _evaluate_response(self, response, test_case):
        """Apply custom evaluation criteria to a response."""
        if not test_case['evaluation_criteria']:
            return 1.0  # Default score if no criteria specified

        criteria = test_case['evaluation_criteria']

        if criteria['type'] == 'structured_output':
            result = self.evaluate_structured_output(response, criteria['schema'])
            return result['score']
        elif criteria['type'] == 'factual_accuracy':
            result = self.evaluate_factual_accuracy(response, criteria['expected_facts'])
            return result['accuracy']
        elif criteria['type'] == 'similarity':
            # Could integrate with semantic similarity models
            from difflib import SequenceMatcher
            similarity = SequenceMatcher(None, response.lower(), test_case['expected_answer'].lower()).ratio()
            return similarity

        return 1.0  # Default fallback
```
This benchmarking framework allows you to evaluate models on your specific criteria rather than relying solely on public benchmarks. For example, if you're building a financial analysis tool, you might test models on their ability to parse earnings reports, extract key metrics, and maintain consistent formatting.
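The structured-output scorer can be exercised on its own before any model is involved. Here is a standalone sketch of the same scoring logic (0.0 for invalid JSON or missing required fields, 0.5 for type errors, 1.0 for a fully valid response) applied to a hypothetical earnings-extraction schema:

```python
import json

def score_structured_output(response, schema):
    """Score a model response against a simple JSON schema."""
    try:
        parsed = json.loads(response.strip())
    except json.JSONDecodeError:
        return 0.0  # Not even valid JSON
    if any(field not in parsed for field in schema.get('required', [])):
        return 0.0  # Required field missing
    type_map = {'string': str, 'number': (int, float)}
    for field, spec in schema.get('properties', {}).items():
        expected = type_map.get(spec.get('type'))
        if expected and field in parsed and not isinstance(parsed[field], expected):
            return 0.5  # Valid JSON, wrong field type
    return 1.0

schema = {
    'required': ['ticker', 'revenue'],
    'properties': {'ticker': {'type': 'string'}, 'revenue': {'type': 'number'}},
}
print(score_structured_output('{"ticker": "ACME", "revenue": 45.2}', schema))    # 1.0
print(score_structured_output('{"ticker": "ACME", "revenue": "45.2"}', schema))  # 0.5
print(score_structured_output('not json', schema))                               # 0.0
```

Running scorers like this against hand-written responses first is a cheap way to validate your evaluation criteria before spending tokens on real model calls.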
Context window size directly impacts your application's architecture. Here's how to evaluate and optimize for different context lengths:
```python
class ContextWindowAnalyzer:
    def __init__(self, tokenizer):
        self.tokenizer = tokenizer

    def analyze_token_efficiency(self, texts):
        """Analyze how efficiently different models use their context windows."""
        results = []

        for text in texts:
            tokens = self.tokenizer.encode(text)

            result = {
                'text_length': len(text),
                'token_count': len(tokens),
                'chars_per_token': len(text) / len(tokens),
                'efficiency_score': len(text) / len(tokens) / 4.0  # Normalize to ~4 chars/token baseline
            }
            results.append(result)

        return results

    def test_context_retention(self, model_evaluator, context_sizes=[1000, 5000, 10000, 20000]):
        """Test how well models maintain context at different lengths."""
        results = []

        for size in context_sizes:
            # Create a context of the specified size
            context = self._generate_context_with_facts(size)

            # Ask about information from early in the context
            question = "What was mentioned about the Q1 revenue figures in the beginning of this document?"
            full_prompt = f"{context}\n\nQuestion: {question}"

            # Evaluate response
            response = model_evaluator.evaluate_response_quality(full_prompt)

            # Check if the model correctly retrieved early context
            contains_q1_info = self._check_q1_retrieval(response['response'])

            results.append({
                'context_size': size,
                'token_count': len(self.tokenizer.encode(full_prompt)),
                'retrieved_early_context': contains_q1_info,
                'latency': response['latency'],
                'total_tokens': response['total_tokens']
            })

        return results

    def _generate_context_with_facts(self, target_size):
        """Generate a context of the specified size containing verifiable facts."""
        base_content = """
        Q1 Financial Results: Revenue increased 23% to $45.2 million, with particular strength
        in the enterprise segment showing 31% growth year-over-year.
        """

        # Pad with additional content to reach the target size
        filler_content = "The company continues to focus on operational efficiency and market expansion. " * 100

        context = base_content
        while len(context) < target_size:
            context += filler_content

        return context[:target_size]

    def _check_q1_retrieval(self, response):
        """Check if the response contains the Q1 revenue information."""
        indicators = ['45.2 million', '23%', 'Q1', 'enterprise segment', '31%']
        return any(indicator.lower() in response.lower() for indicator in indicators)
```
Long context windows enable new architectural patterns. Instead of implementing complex retrieval-augmented generation (RAG) systems, you might simply include all relevant documents in the prompt. However, this approach has cost implications and may suffer from "lost in the middle" problems where models perform poorly on information in the middle of very long contexts.
**Pro tip:** Test context retention with your actual data patterns. Models may handle structured documents differently than conversational text or code.
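Before wiring in a real tokenizer, the efficiency math above can be dry-run with a naive whitespace tokenizer stub. This is purely illustrative: real subword tokenizers (BPE, SentencePiece) produce more tokens, and therefore fewer characters per token, than whitespace splitting suggests.

```python
class WhitespaceTokenizer:
    """Stand-in tokenizer for dry runs: splits on whitespace only."""
    def encode(self, text):
        return text.split()

def chars_per_token(tokenizer, text):
    """Average characters per token; higher means denser context usage."""
    tokens = tokenizer.encode(text)
    return len(text) / len(tokens) if tokens else 0.0

sample = "Revenue increased 23% to $45.2 million in the first quarter."
print(chars_per_token(WhitespaceTokenizer(), sample))  # 6.0
```

Because the stub exposes the same `encode` method, it can also be passed to `ContextWindowAnalyzer` to sanity-check the analysis pipeline without loading a model.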
LLM costs extend far beyond API fees. Here's a comprehensive cost modeling framework:
```python
from datetime import datetime


class LLMCostAnalyzer:
    def __init__(self):
        # Current pricing (update regularly); USD per 1K tokens for APIs
        self.pricing = {
            'openai': {
                'gpt-4-turbo': {'input': 0.01, 'output': 0.03},
                'gpt-4o': {'input': 0.005, 'output': 0.015},
                'gpt-3.5-turbo': {'input': 0.001, 'output': 0.002}
            },
            'anthropic': {
                'claude-3-5-sonnet': {'input': 0.003, 'output': 0.015},
                'claude-3-haiku': {'input': 0.00025, 'output': 0.00125}
            },
            'opensource': {
                # Infrastructure costs vary significantly
                'hosting': {
                    # Indicative cloud GPU rates (USD/hr); check your provider
                    'gpu_hourly': {
                        'h100': 4.90,
                        'a100': 3.20,
                        'v100': 1.50
                    }
                }
            }
        }

    def calculate_api_costs(self, usage_data, provider, model):
        """Calculate costs for API-based providers."""
        if provider not in self.pricing:
            raise ValueError(f"Unknown provider: {provider}")
        if model not in self.pricing[provider]:
            raise ValueError(f"Unknown model: {model}")

        pricing = self.pricing[provider][model]
        total_cost = 0
        cost_breakdown = []

        for usage in usage_data:
            input_cost = (usage['input_tokens'] / 1000) * pricing['input']
            output_cost = (usage['output_tokens'] / 1000) * pricing['output']
            request_cost = input_cost + output_cost
            total_cost += request_cost

            cost_breakdown.append({
                'timestamp': usage.get('timestamp', datetime.now()),
                'input_tokens': usage['input_tokens'],
                'output_tokens': usage['output_tokens'],
                'input_cost': input_cost,
                'output_cost': output_cost,
                'total_cost': request_cost
            })

        return {
            'total_cost': total_cost,
            'breakdown': cost_breakdown,
            'average_cost_per_request': total_cost / len(usage_data) if usage_data else 0
        }

    def calculate_infrastructure_costs(self, hardware_config, usage_hours, utilization_rate=0.7):
        """Calculate costs for self-hosted open-source models."""
        # Hardware costs
        gpu_type = hardware_config.get('gpu_type', 'a100')
        gpu_count = hardware_config.get('gpu_count', 1)

        if gpu_type not in self.pricing['opensource']['hosting']['gpu_hourly']:
            raise ValueError(f"Unknown GPU type: {gpu_type}")

        gpu_hourly_cost = self.pricing['opensource']['hosting']['gpu_hourly'][gpu_type]
        hardware_cost = gpu_hourly_cost * gpu_count * usage_hours

        # Additional infrastructure costs
        storage_cost = hardware_config.get('storage_gb', 100) * 0.10 * (usage_hours / 24 / 30)  # $0.10/GB/month
        network_cost = hardware_config.get('network_gb', 1000) * 0.05  # $0.05/GB transfer

        # Operational overhead (monitoring, maintenance, etc.)
        operational_overhead = hardware_cost * 0.2  # 20% overhead

        # Utilization adjustment: idle hours still cost money
        effective_cost = hardware_cost / utilization_rate

        return {
            'hardware_cost': hardware_cost,
            'effective_cost': effective_cost,
            'storage_cost': storage_cost,
            'network_cost': network_cost,
            'operational_overhead': operational_overhead,
            'total_cost': effective_cost + storage_cost + network_cost + operational_overhead,
            'cost_per_hour': (effective_cost + storage_cost + network_cost + operational_overhead) / usage_hours
        }

    def project_costs(self, current_usage, growth_scenarios, time_horizon_months=12):
        """Project costs under different growth scenarios."""
        projections = {}

        for scenario_name, scenario_config in growth_scenarios.items():
            monthly_costs = []
            current_monthly_cost = scenario_config['base_monthly_cost']

            for month in range(time_horizon_months):
                # Apply compound growth
                growth_factor = (1 + scenario_config['monthly_growth_rate']) ** month
                projected_cost = current_monthly_cost * growth_factor

                # Apply any scaling discounts (e.g., volume pricing)
                if 'volume_discount_tiers' in scenario_config:
                    for tier in scenario_config['volume_discount_tiers']:
                        if projected_cost >= tier['minimum']:
                            projected_cost *= (1 - tier['discount'])
                            break

                monthly_costs.append(projected_cost)

            projections[scenario_name] = {
                'monthly_costs': monthly_costs,
                'total_cost': sum(monthly_costs),
                'final_monthly_cost': monthly_costs[-1]
            }

        return projections

    def compare_providers(self, usage_profile, scenarios):
        """Compare total costs across providers for given usage scenarios."""
        comparison = {}

        for provider_config in scenarios:
            provider = provider_config['provider']

            if provider in ['openai', 'anthropic']:
                costs = self.calculate_api_costs(
                    usage_profile,
                    provider,
                    provider_config['model']
                )
                hidden_costs = self._calculate_hidden_costs(provider_config)
                total_cost = costs['total_cost'] + hidden_costs['total']
            elif provider == 'opensource':
                costs = self.calculate_infrastructure_costs(
                    provider_config['hardware'],
                    provider_config['usage_hours'],
                    provider_config.get('utilization', 0.7)
                )
                hidden_costs = self._calculate_hidden_costs(provider_config)
                total_cost = costs['total_cost'] + hidden_costs['total']
            else:
                continue  # Skip unknown providers rather than crash

            comparison[provider_config['name']] = {
                'direct_costs': costs,
                'hidden_costs': hidden_costs,
                'total_cost': total_cost,
                'cost_per_request': total_cost / len(usage_profile) if usage_profile else 0
            }

        return comparison

    def _calculate_hidden_costs(self, provider_config):
        """Estimate often-overlooked costs (illustrative figures)."""
        hidden_costs = {
            'development_time': 0,
            'integration_complexity': 0,
            'monitoring_tools': 0,
            'compliance_overhead': 0,
            'vendor_risk': 0
        }

        provider = provider_config['provider']

        if provider == 'openai':
            # API-based: lower dev costs but vendor risk
            hidden_costs['development_time'] = 2000  # $2K for integration
            hidden_costs['vendor_risk'] = 5000  # Risk premium
        elif provider == 'anthropic':
            # Similar to OpenAI but potentially different API patterns
            hidden_costs['development_time'] = 2500
            hidden_costs['vendor_risk'] = 3000  # Lower risk premium
        elif provider == 'opensource':
            # Higher operational complexity
            hidden_costs['development_time'] = 15000  # Significant dev investment
            hidden_costs['integration_complexity'] = 5000  # Custom infrastructure
            hidden_costs['monitoring_tools'] = 1200  # Annual monitoring costs
            hidden_costs['compliance_overhead'] = 3000  # Security auditing

        hidden_costs['total'] = sum(hidden_costs.values())
        return hidden_costs
```
This cost analysis reveals several insights that simple comparisons often miss:

- **Open source isn't always cheaper.** While you avoid per-token fees, infrastructure and operational costs can exceed API costs at lower volumes.
- **Utilization matters enormously.** If your GPU runs at 30% utilization, your effective cost is more than triple the advertised hourly rate.
- **Hidden costs are substantial.** Development time, integration complexity, and operational overhead often dwarf direct model costs.
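The break-even volume between API pricing and self-hosting falls out of the figures above directly. A rough sketch (illustrative rates and a single always-on GPU assumed; substitute your own numbers):

```python
def breakeven_tokens_per_month(gpu_hourly_usd, api_cost_per_1k_tokens, utilization=0.7):
    """Monthly token volume at which self-hosting matches API spend.
    Assumes one GPU billed 24/7, with idle time amortized via utilization."""
    monthly_gpu_cost = gpu_hourly_usd * 24 * 30 / utilization
    return monthly_gpu_cost / api_cost_per_1k_tokens * 1000

# Example: A100 at $3.20/hr vs a blended API rate of $0.01 per 1K tokens
tokens = breakeven_tokens_per_month(3.20, 0.01)
print(f"{tokens:,.0f} tokens/month")
```

With these inputs the break-even lands in the hundreds of millions of tokens per month; below that volume the API is cheaper on direct costs alone, before hidden costs are even counted.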
```python
class LLMCostOptimizer:
    def __init__(self, cost_analyzer):
        self.cost_analyzer = cost_analyzer

    def optimize_model_selection(self, tasks, performance_requirements):
        """Select the most cost-effective model for each task category."""
        optimizations = {}

        for task_category, requirements in performance_requirements.items():
            candidates = self._get_model_candidates(requirements)

            # Benchmark each candidate
            best_model = None
            best_cost_performance_ratio = float('inf')

            for candidate in candidates:
                # Simulate costs for this model
                cost_per_request = self._estimate_cost_per_request(candidate, task_category)

                # Get performance score (you'd implement actual benchmarking)
                performance_score = self._get_performance_score(candidate, task_category)

                # Track the best cost-performance ratio among qualifying models
                if performance_score >= requirements['min_performance']:
                    ratio = cost_per_request / performance_score
                    if ratio < best_cost_performance_ratio:
                        best_cost_performance_ratio = ratio
                        best_model = candidate

            optimizations[task_category] = {
                'recommended_model': best_model,
                'cost_performance_ratio': best_cost_performance_ratio,
                'estimated_savings': self._calculate_savings(task_category, best_model)
            }

        return optimizations

    def implement_request_batching(self, requests, batch_size=10):
        """Optimize costs through request batching where possible."""
        if len(requests) <= 1:
            return requests

        batched_requests = []
        current_batch = []
        current_batch_tokens = 0
        max_batch_tokens = 4000  # Conservative limit

        for request in requests:
            request_tokens = len(request['prompt'].split()) * 1.3  # Rough estimation

            if (current_batch_tokens + request_tokens > max_batch_tokens or
                    len(current_batch) >= batch_size):
                # Close out the current batch
                if current_batch:
                    batched_requests.append(self._create_batch_request(current_batch))
                current_batch = [request]
                current_batch_tokens = request_tokens
            else:
                current_batch.append(request)
                current_batch_tokens += request_tokens

        # Don't forget the last batch
        if current_batch:
            batched_requests.append(self._create_batch_request(current_batch))

        return batched_requests

    def _create_batch_request(self, requests):
        """Combine multiple requests into a single batch request."""
        combined_prompt = "Process the following requests:\n\n"

        for i, request in enumerate(requests, 1):
            combined_prompt += f"Request {i}: {request['prompt']}\n"
            combined_prompt += "---\n"

        combined_prompt += "\nProvide responses for each request separately, clearly labeled."

        return {
            'prompt': combined_prompt,
            'original_requests': requests,
            'type': 'batch'
        }

    def implement_caching_strategy(self, request_patterns):
        """Design a caching strategy based on request patterns."""
        cache_analysis = {}

        # Analyze request similarity
        similar_requests = self._find_similar_requests(request_patterns)

        for pattern_group in similar_requests:
            cache_hit_rate = len(pattern_group) / len(request_patterns)

            if cache_hit_rate > 0.1:  # 10% hit-rate threshold
                potential_savings = self._calculate_cache_savings(pattern_group, cache_hit_rate)
                cache_analysis[pattern_group[0]['pattern']] = {
                    'hit_rate': cache_hit_rate,
                    'potential_savings': potential_savings,
                    'recommended_ttl': self._recommend_cache_ttl(pattern_group)
                }

        return cache_analysis

    def _find_similar_requests(self, requests, similarity_threshold=0.8):
        """Group similar requests to find caching opportunities."""
        from difflib import SequenceMatcher

        groups = []
        processed = set()

        for i, request_a in enumerate(requests):
            if i in processed:
                continue

            similar_group = [request_a]
            processed.add(i)

            for j, request_b in enumerate(requests[i + 1:], i + 1):
                if j in processed:
                    continue

                similarity = SequenceMatcher(
                    None,
                    request_a['prompt'],
                    request_b['prompt']
                ).ratio()

                if similarity >= similarity_threshold:
                    similar_group.append(request_b)
                    processed.add(j)

            if len(similar_group) > 1:
                groups.append(similar_group)

        return groups
```
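The similarity grouping can be verified on a toy request log before pointing it at production traffic. A standalone check using the same `SequenceMatcher` logic, reduced to plain strings for illustration:

```python
from difflib import SequenceMatcher

def group_similar(prompts, threshold=0.8):
    """Group near-duplicate prompts; mirrors the grouping pass above."""
    groups, seen = [], set()
    for i, a in enumerate(prompts):
        if i in seen:
            continue
        group = [a]
        seen.add(i)
        for j in range(i + 1, len(prompts)):
            if j not in seen and SequenceMatcher(None, a, prompts[j]).ratio() >= threshold:
                group.append(prompts[j])
                seen.add(j)
        if len(group) > 1:
            groups.append(group)
    return groups

log = [
    "Summarize the Q1 earnings report",
    "Summarize the Q2 earnings report",
    "Translate this contract to German",
]
print(group_similar(log))
# [['Summarize the Q1 earnings report', 'Summarize the Q2 earnings report']]
```

The two summarization prompts differ by a single character and group together; the translation prompt stays ungrouped and would always miss the cache.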
Different deployment models have dramatically different security implications:
import hashlib
import json
from enum import Enum
from dataclasses import dataclass
from typing import List, Dict, Any
class DataClassification(Enum):
PUBLIC = "public"
INTERNAL = "internal"
CONFIDENTIAL = "confidential"
RESTRICTED = "restricted"
class DeploymentModel(Enum):
CLOUD_API = "cloud_api"
PRIVATE_CLOUD = "private_cloud"
ON_PREMISE = "on_premise"
HYBRID = "hybrid"
@dataclass
class ComplianceRequirement:
name: str
applies_to: List[DataClassification]
requirements: Dict[str, Any]
deployment_constraints: List[DeploymentModel]
class LLMSecurityAnalyzer:
def __init__(self):
self.compliance_frameworks = {
'gdpr': ComplianceRequirement(
name="GDPR",
applies_to=[DataClassification.CONFIDENTIAL, DataClassification.RESTRICTED],
requirements={
'data_residency': ['EU'],
'encryption_at_rest': True,
'encryption_in_transit': True,
'right_to_erasure': True,
'data_processing_agreement': True
},
deployment_constraints=[DeploymentModel.ON_PREMISE, DeploymentModel.PRIVATE_CLOUD]
),
'hipaa': ComplianceRequirement(
name="HIPAA",
applies_to=[DataClassification.RESTRICTED],
requirements={
'baa_required': True,
'access_logging': True,
'encryption_at_rest': True,
'encryption_in_transit': True,
'audit_trail': True
},
deployment_constraints=[DeploymentModel.ON_PREMISE, DeploymentModel.PRIVATE_CLOUD]
),
'sox': ComplianceRequirement(
name="SOX",
applies_to=[DataClassification.CONFIDENTIAL, DataClassification.RESTRICTED],
requirements={
'audit_trail': True,
'change_management': True,
'access_controls': True,
'data_retention': True
},
deployment_constraints=list(DeploymentModel) # All models can work with proper controls
)
}
def assess_deployment_compliance(self, data_classification, required_frameworks, proposed_deployment):
"""Assess whether a deployment model meets compliance requirements."""
compliance_assessment = {
'compliant': True,
'violations': [],
'recommendations': [],
'risk_score': 0
}
for framework_name in required_frameworks:
if framework_name not in self.compliance_frameworks:
compliance_assessment['violations'].append(f"Unknown compliance framework: {framework_name}")
continue
framework = self.compliance_frameworks[framework_name]
# Check if framework applies to this data classification
if data_classification not in framework.applies_to:
continue
# Check deployment model compatibility
if proposed_deployment not in framework.deployment_constraints:
compliance_assessment['compliant'] = False
compliance_assessment['violations'].append(
f"{framework_name} requires deployment in {framework.deployment_constraints}, "
f"but {proposed_deployment} was proposed"
)
compliance_assessment['risk_score'] += 3
# Check specific requirements
for requirement, value in framework.requirements.items():
violation = self._check_requirement_compliance(
requirement, value, proposed_deployment, data_classification
)
if violation:
compliance_assessment['violations'].append(violation)
compliance_assessment['compliant'] = False
compliance_assessment['risk_score'] += 1
# Generate recommendations
if not compliance_assessment['compliant']:
recommendations = self._generate_compliance_recommendations(
data_classification, required_frameworks, proposed_deployment
)
compliance_assessment['recommendations'] = recommendations
return compliance_assessment
def _check_requirement_compliance(self, requirement, expected_value, deployment, data_classification):
"""Check specific compliance requirement."""
# This would integrate with your actual security controls
# For demo purposes, we'll simulate some checks
if requirement == 'data_residency' and deployment == DeploymentModel.CLOUD_API:
return f"Data residency requirement ({expected_value}) cannot be guaranteed with cloud API deployment"
if requirement == 'baa_required' and deployment == DeploymentModel.CLOUD_API:
# Check if the cloud provider offers BAA
providers_with_baa = ['openai_enterprise', 'anthropic_enterprise']
# This would be configured based on actual provider choice
return "BAA required but standard cloud API may not provide adequate coverage"
if requirement == 'audit_trail' and deployment == DeploymentModel.CLOUD_API:
return "Comprehensive audit trails may be limited with third-party API providers"
return None # No violation
def _generate_compliance_recommendations(self, data_classification, frameworks, deployment):
"""Generate recommendations to achieve compliance."""
recommendations = []
if deployment == DeploymentModel.CLOUD_API:
recommendations.extend([
"Consider upgrading to enterprise tier with cloud provider for enhanced compliance features",
"Implement additional logging and monitoring at the application layer",
"Evaluate private cloud or on-premise deployment for sensitive data",
"Implement data anonymization/pseudonymization before sending to external APIs"
])
elif deployment == DeploymentModel.ON_PREMISE:
recommendations.extend([
"Ensure proper encryption key management",
"Implement comprehensive access logging",
"Establish regular security auditing procedures",
"Document data processing procedures for compliance officers"
])
return recommendations
def implement_data_sanitization(self, text, classification):
"""Sanitize data based on classification level."""
if classification in [DataClassification.CONFIDENTIAL, DataClassification.RESTRICTED]:
# Implement PII detection and masking
sanitized_text = self._mask_pii(text)
# Track what was sanitized for potential reconstruction
sanitization_log = {
'original_hash': hashlib.sha256(text.encode()).hexdigest(),
'sanitized_hash': hashlib.sha256(sanitized_text.encode()).hexdigest(),
'classification': classification.value,
'timestamp': datetime.now().isoformat()
}
return sanitized_text, sanitization_log
return text, None
def _mask_pii(self, text):
"""Basic PII masking implementation."""
import re
# Email addresses
text = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', '[EMAIL]', text)
# Phone numbers (US format)
text = re.sub(r'\b\d{3}-\d{3}-\d{4}\b', '[PHONE]', text)
text = re.sub(r'\(\d{3}\)\s*\d{3}-\d{4}', '[PHONE]', text)
# Social Security Numbers
text = re.sub(r'\b\d{3}-\d{2}-\d{4}\b', '[SSN]', text)
# Credit card numbers (basic pattern)
text = re.sub(r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b', '[CARD]', text)
return text
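As a quick sanity check, equivalent masking patterns can be exercised standalone (the sample contact details below are made up):

```python
import re

def mask_pii(text):
    """Mask common PII patterns, mirroring _mask_pii above."""
    text = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', '[EMAIL]', text)
    text = re.sub(r'\b\d{3}-\d{3}-\d{4}\b', '[PHONE]', text)
    text = re.sub(r'\b\d{3}-\d{2}-\d{4}\b', '[SSN]', text)
    text = re.sub(r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b', '[CARD]', text)
    return text

print(mask_pii("Email jane.doe@example.com, call 555-867-5309, SSN 078-05-1120"))
# → Email [EMAIL], call [PHONE], SSN [SSN]
```

Note that regex-based masking only catches well-formatted identifiers; production systems typically layer an NER-based PII detector on top of patterns like these.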
Large organizations need LLM integration patterns that support governance, monitoring, and control:
class EnterpriseLLMGateway:
"""Enterprise gateway for LLM requests with governance controls."""
def __init__(self, config):
self.config = config
self.audit_logger = AuditLogger()
self.rate_limiter = RateLimiter()
self.data_classifier = DataClassifier()
self.security_analyzer = LLMSecurityAnalyzer()
async def process_request(self, request, user_context):
"""Process LLM request through enterprise controls."""
# Step 1: Authenticate and authorize
auth_result = await self._authenticate_request(request, user_context)
if not auth_result['authorized']:
return self._create_error_response("Unauthorized", 403)
# Step 2: Classify data and check compliance
data_classification = self.data_classifier.classify(request['prompt'])
compliance_check = self._check_compliance(data_classification, user_context)
if not compliance_check['compliant']:
self.audit_logger.log_compliance_violation(request, compliance_check)
return self._create_error_response("Compliance violation", 400)
# Step 3: Apply rate limiting
rate_limit_result = await self.rate_limiter.check_limit(user_context['user_id'])
if rate_limit_result['limited']:
return self._create_error_response("Rate limit exceeded", 429)
# Step 4: Sanitize data if needed
sanitized_prompt, sanitization_log = self.security_analyzer.implement_data_sanitization(
request['prompt'],
data_classification
)
# Step 5: Route to appropriate model
model_config = self._select_model(data_classification, request.get('model_preference'))
# Step 6: Execute request
try:
response = await self._execute_llm_request(sanitized_prompt, model_config)
# Step 7: Post-process response
processed_response = self._post_process_response(response, sanitization_log)
# Step 8: Audit logging
self.audit_logger.log_request({
'user_id': user_context['user_id'],
'classification': data_classification.value,
'model_used': model_config['model'],
'tokens_used': response.get('tokens', 0),
'sanitized': sanitization_log is not None
})
return processed_response
except Exception as e:
self.audit_logger.log_error(request, str(e))
return self._create_error_response("Processing failed", 500)
def _select_model(self, classification, preference=None):
"""Select appropriate model based on data classification and governance rules."""
# High-security data must use on-premise models
if classification in [DataClassification.CONFIDENTIAL, DataClassification.RESTRICTED]:
return {
'provider': 'opensource',
'model': 'llama-3.1-70b-instruct',
'deployment': 'on_premise'
}
# Internal data can use private cloud
elif classification == DataClassification.INTERNAL:
return {
'provider': 'anthropic',
'model': 'claude-3-5-sonnet',
'deployment': 'private_cloud'
}
# Public data can use any provider
else:
preferred_provider = preference or 'openai'
return {
'provider': preferred_provider,
'model': 'gpt-4o',
'deployment': 'cloud_api'
}
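Stripped of the gateway plumbing, `_select_model` reduces to a lookup table keyed by data classification; a standalone sketch of the same governance rule (model names mirror the ones above):

```python
# Stricter data classifications force more controlled deployments.
ROUTES = {
    'restricted':   {'provider': 'opensource', 'model': 'llama-3.1-70b-instruct', 'deployment': 'on_premise'},
    'confidential': {'provider': 'opensource', 'model': 'llama-3.1-70b-instruct', 'deployment': 'on_premise'},
    'internal':     {'provider': 'anthropic', 'model': 'claude-3-5-sonnet', 'deployment': 'private_cloud'},
    'public':       {'provider': 'openai', 'model': 'gpt-4o', 'deployment': 'cloud_api'},
}

def select_model(classification, preference=None):
    """User preference is honored only for public data."""
    route = dict(ROUTES[classification])
    if classification == 'public' and preference:
        route['provider'] = preference
    return route

print(select_model('confidential')['deployment'])       # → on_premise
print(select_model('public', 'anthropic')['provider'])  # → anthropic
```

Keeping the rule in data rather than branching logic makes it easy for compliance officers to review and amend the routing table without code changes.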
Complex applications often benefit from orchestrating multiple models:
import asyncio
from typing import List, Dict, Any
from enum import Enum
class TaskType(Enum):
SUMMARIZATION = "summarization"
CODE_GENERATION = "code_generation"
REASONING = "reasoning"
CREATIVE_WRITING = "creative_writing"
DATA_ANALYSIS = "data_analysis"
class LLMOrchestrator:
"""Orchestrate multiple LLMs for complex tasks."""
def __init__(self):
self.model_registry = {
# 'quality' tiers are illustrative; they are consumed by _select_optimal_model below
TaskType.SUMMARIZATION: [
{'provider': 'anthropic', 'model': 'claude-3-haiku', 'speed': 'fast', 'cost': 'low', 'quality': 'medium'},
{'provider': 'openai', 'model': 'gpt-4o-mini', 'speed': 'fast', 'cost': 'low', 'quality': 'medium'}
],
TaskType.CODE_GENERATION: [
{'provider': 'opensource', 'model': 'code-llama-34b', 'speed': 'medium', 'cost': 'medium', 'quality': 'medium'},
{'provider': 'openai', 'model': 'gpt-4o', 'speed': 'fast', 'cost': 'high', 'quality': 'high'}
],
TaskType.REASONING: [
{'provider': 'openai', 'model': 'gpt-4-turbo', 'speed': 'slow', 'cost': 'high', 'quality': 'high'},
{'provider': 'anthropic', 'model': 'claude-3-5-sonnet', 'speed': 'medium', 'cost': 'high', 'quality': 'high'}
]
}
async def execute_complex_task(self, task_description, optimization_target='quality'):
"""Break down complex task and orchestrate multiple models."""
# Step 1: Analyze task and break down into subtasks
subtasks = await self._decompose_task(task_description)
# Step 2: Plan execution strategy
execution_plan = self._create_execution_plan(subtasks, optimization_target)
# Step 3: Execute subtasks (potentially in parallel)
results = await self._execute_plan(execution_plan)
# Step 4: Synthesize final result
final_result = await self._synthesize_results(results, task_description)
return final_result
async def _decompose_task(self, task_description):
"""Use a reasoning model to break down complex tasks."""
decomposition_prompt = f"""
Analyze this task and break it down into specific, actionable subtasks:
Task: {task_description}
For each subtask, specify:
1. The specific action needed
2. The type of task (summarization, reasoning, code_generation, etc.)
3. Dependencies on other subtasks
4. Expected output format
Return as structured JSON.
"""
# Use a strong reasoning model for task decomposition
response = await self._call_model('openai', 'gpt-4-turbo', decomposition_prompt)
try:
import json
subtasks = json.loads(response['content'])
return subtasks
except json.JSONDecodeError:
# Fallback to simple task execution
return [{'task': task_description, 'type': TaskType.REASONING.value}]
def _create_execution_plan(self, subtasks, optimization_target):
"""Create execution plan optimizing for quality, speed, or cost."""
plan = {
'subtasks': [],
'parallel_groups': [],
'total_estimated_cost': 0,
'total_estimated_time': 0
}
for subtask in subtasks:
task_type = TaskType(subtask['type'])
# Select best model for this subtask based on optimization target
best_model = self._select_optimal_model(task_type, optimization_target)
planned_subtask = {
'subtask': subtask,
'model': best_model,
'dependencies': subtask.get('dependencies', [])
}
plan['subtasks'].append(planned_subtask)
# Identify which subtasks can run in parallel
plan['parallel_groups'] = self._identify_parallel_groups(plan['subtasks'])
return plan
def _select_optimal_model(self, task_type, optimization_target):
"""Select the optimal model for a task type and optimization target."""
candidates = self.model_registry.get(task_type, [])
if not candidates:
# Fallback to general-purpose model
return {'provider': 'openai', 'model': 'gpt-4o', 'speed': 'fast', 'cost': 'high'}
if optimization_target == 'cost':
return min(candidates, key=lambda x: {'low': 1, 'medium': 2, 'high': 3}[x['cost']])
elif optimization_target == 'speed':
return min(candidates, key=lambda x: {'fast': 1, 'medium': 2, 'slow': 3}[x['speed']])
else: # quality
return max(candidates, key=lambda x: {'low': 1, 'medium': 2, 'high': 3}[x.get('quality', 'medium')])
async def _execute_plan(self, plan):
"""Execute the planned subtasks with optimal parallelization."""
results = {}
for parallel_group in plan['parallel_groups']:
# Execute all tasks in this group in parallel
group_tasks = []
for subtask_id in parallel_group:
subtask_plan = plan['subtasks'][subtask_id]
task_coroutine = self._execute_subtask(subtask_plan, results)
group_tasks.append(task_coroutine)
# Wait for all tasks in this group to complete
group_results = await asyncio.gather(*group_tasks)
# Update results
for subtask_id, result in zip(parallel_group, group_results):
results[subtask_id] = result
return results
async def _execute_subtask(self, subtask_plan, previous_results):
"""Execute a single subtask."""
subtask = subtask_plan['subtask']
model = subtask_plan['model']
# Build context from dependency results
context = ""
for dep_id in subtask.get('dependencies', []):
if dep_id in previous_results:
context += f"Result from step {dep_id}: {previous_results[dep_id]['content']}\n\n"
# Build final prompt
prompt = f"{context}Task: {subtask['task']}"
# Execute
result = await self._call_model(model['provider'], model['model'], prompt)
return {
'subtask_id': subtask.get('id'),
'content': result['content'],
'model_used': f"{model['provider']}/{model['model']}",
'execution_time': result.get('execution_time', 0)
}
async def _call_model(self, provider, model, prompt):
"""Abstract method to call any model provider."""
# This would integrate with your actual model calling logic
# For now, simulate a response
await asyncio.sleep(0.1) # Simulate API call
return {
'content': f"Simulated response from {provider}/{model} for: {prompt[:50]}...",
'execution_time': 0.1
}
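`_create_execution_plan` relies on `_identify_parallel_groups`, which isn't shown; one plausible sketch is topological leveling — assuming dependencies are expressed as subtask indices, each wave contains every subtask whose dependencies have already completed in an earlier wave:

```python
def identify_parallel_groups(subtasks):
    """Group subtask indices into waves that can run concurrently.

    Each subtask is a dict whose 'dependencies' list holds indices
    of subtasks that must finish first.
    """
    remaining = set(range(len(subtasks)))
    done, groups = set(), []
    while remaining:
        # A subtask is ready once all of its dependencies are done.
        wave = [i for i in sorted(remaining)
                if set(subtasks[i].get('dependencies', [])) <= done]
        if not wave:
            raise ValueError("Cyclic dependencies between subtasks")
        groups.append(wave)
        done.update(wave)
        remaining.difference_update(wave)
    return groups

plan = [{'dependencies': []}, {'dependencies': []}, {'dependencies': [0, 1]}]
print(identify_parallel_groups(plan))  # → [[0, 1], [2]]
```

The first wave (subtasks 0 and 1) can be dispatched with `asyncio.gather`, matching the group-by-group execution in `_execute_plan`.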
Production LLM systems need robust fallback mechanisms:
import asyncio
import random
from typing import List, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
@dataclass
class ModelEndpoint:
provider: str
model: str
priority: int # Lower number = higher priority
rate_limit: int # requests per minute
cost_per_token: float
max_tokens: int
availability_sla: float # 0.0 to 1.0
class LLMResilienceManager:
"""Manage fallbacks, retries, and circuit breakers for LLM calls."""
def __init__(self):
self.endpoints = []
self.circuit_breakers = {}
self.request_history = {}
self.rate_limiters = {}
def add_endpoint(self, endpoint: ModelEndpoint):
"""Add a model endpoint to the available pool."""
self.endpoints.append(endpoint)
self.circuit_breakers[f"{endpoint.provider}/{endpoint.model}"] = CircuitBreaker()
# Sort by priority
self.endpoints.sort(key=lambda x: x.priority)
async def resilient_request(self, prompt, requirements=None):
"""Make a request with automatic fallbacks and retry logic."""
requirements = requirements or {}
max_retries = requirements.get('max_retries', 3)
timeout_seconds = requirements.get('timeout', 30)
max_cost = requirements.get('max_cost_per_token', float('inf'))
suitable_endpoints = self._filter_suitable_endpoints(prompt, requirements, max_cost)
if not suitable_endpoints:
raise Exception("No suitable endpoints available for request")
last_error = None
for endpoint in suitable_endpoints:
circuit_breaker = self.circuit_breakers[f"{endpoint.provider}/{endpoint.model}"]
# Skip if circuit breaker is open
if circuit_breaker.is_open():
continue
# Check rate limiting
if self._is_rate_limited(endpoint):
continue
# Attempt request with retries
for attempt in range(max_retries):
try:
result = await asyncio.wait_for(
self._make_request(endpoint, prompt),
timeout=timeout_seconds
)
# Success - update circuit breaker and return
circuit_breaker.record_success()
self._update_rate_limiter(endpoint)
return {
'content': result['content'],
'endpoint_used': f"{endpoint.provider}/{endpoint.model}",
'attempt_number': attempt + 1,
'cost': result.get('tokens', 0) * endpoint.cost_per_token
}
except asyncio.TimeoutError:
last_error = f"Timeout after {timeout_seconds}s"
circuit_breaker.record_failure()
except Exception as e:
last_error = str(e)
circuit_breaker.record_failure()
# Wait before retry (exponential backoff)
if attempt < max_retries - 1:
await asyncio.sleep(2 ** attempt + random.uniform(0, 1))
# All endpoints failed
raise Exception(f"All endpoints failed. Last error: {last_error}")
def _filter_suitable_endpoints(self, prompt, requirements, max_cost):
"""Filter endpoints based on requirements."""
suitable = []
prompt_tokens = len(prompt.split()) * 1.3 # Rough estimation
min_performance = requirements.get('min_performance_score', 0)
for endpoint in self.endpoints:
# Check token limits
if prompt_tokens > endpoint.max_tokens * 0.8: # Leave room for response
continue
# Check cost constraints
if endpoint.cost_per_token > max_cost:
continue
# Check performance requirements (you'd implement actual scoring)
performance_score = self._get_performance_score(endpoint, requirements)
if performance_score < min_performance:
continue
suitable.append(endpoint)
return suitable
def _is_rate_limited(self, endpoint):
"""Check if endpoint is currently rate limited."""
key = f"{endpoint.provider}/{endpoint.model}"
if key not in self.rate_limiters:
self.rate_limiters[key] = {
'requests': [],
'limit': endpoint.rate_limit
}
rate_limiter = self.rate_limiters[key]
now = datetime.now()
# Remove requests older than 1 minute
rate_limiter['requests'] = [
req_time for req_time in rate_limiter['requests']
if now - req_time < timedelta(minutes=1)
]
return len(rate_limiter['requests']) >= rate_limiter['limit']
def _update_rate_limiter(self, endpoint):
"""Update rate limiter after successful request."""
key = f"{endpoint.provider}/{endpoint.model}"
self.rate_limiters[key]['requests'].append(datetime.now())
async def _make_request(self, endpoint, prompt):
"""Make the actual API request to the endpoint."""
# This would integrate with your actual API calling logic
# Simulate different failure modes for testing
if random.random() < 0.05: # 5% chance of timeout
await asyncio.sleep(35) # Will trigger timeout
if random.random() < 0.02: # 2% chance of API error
raise Exception("API Error: Rate limit exceeded")
# Simulate successful response
await asyncio.sleep(random.uniform(0.5, 2.0)) # Simulate API latency
return {
'content': f"Response from {endpoint.provider}/{endpoint.model}",
'tokens': random.randint(50, 200)
}
def _get_performance_score(self, endpoint, requirements):
"""Get performance score for endpoint (implement with actual benchmarking)."""
# Placeholder - you'd implement actual performance scoring
base_scores = {
('openai', 'gpt-4-turbo'): 0.95,
('anthropic', 'claude-3-5-sonnet'): 0.93,
('opensource', 'llama-3.1-70b'): 0.88
}
return base_scores.get((endpoint.provider, endpoint.model), 0.8)
class CircuitBreaker:
"""Circuit breaker pattern for LLM endpoints."""
def __init__(self, failure_threshold=5, timeout_duration=60):
self.failure_threshold = failure_threshold
self.timeout_duration = timeout_duration # seconds
self.failure_count = 0
self.last_failure_time = None
self.state = 'closed' # closed, open, half-open
def record_success(self):
"""Record a successful request."""
self.failure_count = 0
self.state = 'closed'
def record_failure(self):
"""Record a failed request."""
self.failure_count += 1
self.last_failure_time = datetime.now()
if self.failure_count >= self.failure_threshold:
self.state = 'open'
def is_open(self):
"""Check if circuit breaker is open."""
if self.state == 'closed':
return False
if self.state == 'open':
# Check if timeout period has elapsed
if (datetime.now() - self.last_failure_time).total_seconds() >= self.timeout_duration:
self.state = 'half-open'
return False
return True
return False # half-open state allows one request through
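The per-endpoint rate limiting above is a sliding one-minute window over request timestamps; isolated, the mechanism looks like this (here pruning and recording happen in a single call, whereas the manager above records successes separately in `_update_rate_limiter`):

```python
from datetime import datetime, timedelta

class SlidingWindowLimiter:
    """Allow at most `limit` requests in any rolling window."""
    def __init__(self, limit, window_seconds=60):
        self.limit = limit
        self.window = timedelta(seconds=window_seconds)
        self.requests = []

    def allow(self, now=None):
        now = now or datetime.now()
        # Drop timestamps that have aged out of the window.
        self.requests = [t for t in self.requests if now - t < self.window]
        if len(self.requests) >= self.limit:
            return False
        self.requests.append(now)
        return True

limiter = SlidingWindowLimiter(limit=2)
print(limiter.allow(), limiter.allow(), limiter.allow())  # → True True False
```

Unlike a fixed-window counter, the sliding window never admits a burst of 2× the limit across a window boundary, which matters when provider-side limits are enforced strictly.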
Now let's apply these concepts with a realistic scenario: building an LLM selection system for a financial services company.
Scenario: You're architecting an LLM system for a mid-size investment firm. The system needs to handle:
- Client communication (confidential data; SOX and FINRA apply)
- Research analysis (public data; quality-critical)
- Code assistance (confidential data; latency-sensitive)
- Compliance monitoring (restricted data; SOX and FINRA apply)
Exercise Steps:
# Define your requirements framework
requirements = {
'client_communication': {
'data_classification': DataClassification.CONFIDENTIAL,
'compliance_frameworks': ['sox', 'finra'],
'latency_requirements': 'medium', # < 5 seconds
'quality_requirements': 'high',
'cost_sensitivity': 'medium',
'volume': 500 # requests per day
},
'research_analysis': {
'data_classification': DataClassification.PUBLIC,
'compliance_frameworks': [],
'latency_requirements': 'low', # < 10 seconds acceptable
'quality_requirements': 'very_high',
'cost_sensitivity': 'high',
'volume': 2000 # requests per day
},
'code_assistance': {
'data_classification': DataClassification.CONFIDENTIAL,
'compliance_frameworks': ['sox'],
'latency_requirements': 'high', # < 2 seconds
'quality_requirements': 'high',
'cost_sensitivity': 'low',
'volume': 300 # requests per day
},
'compliance_monitoring': {
'data_classification': DataClassification.RESTRICTED,
'compliance_frameworks': ['sox', 'finra'],
'latency_requirements': 'low',
'quality_requirements': 'very_high',
'cost_sensitivity': 'very_low',
'volume': 1000 # requests per day
}
}
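Before benchmarking individual models, it helps to size the workload these requirements imply; a quick back-of-the-envelope check (assuming a 30-day month) shows research analysis dominating volume, which is exactly where cost sensitivity is highest:

```python
# Daily request volumes from the requirements framework above.
daily_volumes = {
    'client_communication': 500,
    'research_analysis': 2000,
    'code_assistance': 300,
    'compliance_monitoring': 1000,
}

total_daily = sum(daily_volumes.values())
print(total_daily)       # → 3800 requests per day
print(total_daily * 30)  # → 114000 requests per 30-day month
```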
# Create evaluation framework
evaluator = LLMBenchmarkSuite()
cost_analyzer = LLMCostAnalyzer()
security_analyzer = LLMSecurityAnalyzer()
# Test cases for each use case
test_cases = {
'client_communication': [
{
'prompt': 'Draft a professional email to a client explaining why their portfolio underperformed the market this quarter.',
'evaluation_criteria': {
'type': 'professional_tone',
'required_elements': ['acknowledgment', 'explanation', 'next_steps']
}
}
],
'research_analysis': [
{
'prompt': 'Summarize the key points from Apple\'s Q3 2024 earnings report.'
}
]
}