
You're building an AI-powered customer support chatbot that can answer questions about your company's products. The model works perfectly on your laptop, but now you need to make it available to thousands of users simultaneously. How do you transform your local Python script into a robust, scalable service that can handle real-world traffic without breaking your budget or your sanity?
This is the challenge every data professional faces when moving from experimentation to production with Large Language Models (LLMs). Unlike traditional machine learning models that make quick predictions, LLMs are resource-intensive services that require careful architectural planning. You need to think about API design, infrastructure scaling, cost management, and reliability—all while keeping your service fast and responsive.
In this lesson, we'll build a complete LLM application deployment from scratch. You'll learn how to design clean APIs, choose the right infrastructure, handle concurrent requests, and monitor your system in production.
What you'll learn:

- How to design clean, versioned APIs for LLM services, including streaming and error handling
- How to choose infrastructure patterns, from a single server to load-balanced and microservice architectures
- How to manage concurrent requests with queuing
- How to track usage, control costs, and monitor your system in production

You should have basic familiarity with:

- Python and the basics of web APIs (the examples use Flask)
- Running commands in a terminal

No prior experience with deployment or infrastructure is required—we'll cover everything from the ground up.
Before diving into solutions, let's understand what makes LLM deployment unique. Traditional web applications serve static content or perform quick database lookups. LLM applications are fundamentally different beasts.
When a user asks your customer support bot "How do I return a product?", here's what happens behind the scenes:

1. Your API receives the request, validates it, and authenticates the user
2. The user's message is combined with the system prompt and conversation history
3. The assembled prompt is sent to the LLM for inference
4. The model generates a response token by token
5. The response is formatted, logged, and returned to the user
Each step introduces potential bottlenecks. The LLM inference is computationally expensive, memory-intensive, and time-consuming. Unlike a database query that returns in milliseconds, LLM responses take seconds. This means you need to think carefully about:

- Latency: how to keep users engaged while responses take seconds to generate
- Concurrency: how many requests you can serve simultaneously before queuing
- Cost: every request consumes paid tokens or GPU time
- Reliability: what happens when the model, the queue, or a dependency fails
Let's build a system that addresses each of these challenges.
A well-designed API is the foundation of any successful LLM application. Your API design affects everything from user experience to scaling costs. Let's build a customer support API that demonstrates best practices.
Start with a clear, intuitive endpoint structure:
from flask import Flask, request, jsonify
from datetime import datetime
import uuid
import json
app = Flask(__name__)
@app.route('/api/v1/chat/completions', methods=['POST'])
def chat_completion():
"""
Handle chat completion requests
Expected payload:
{
"messages": [
{"role": "user", "content": "How do I return a product?"}
],
"model": "gpt-3.5-turbo",
"max_tokens": 150,
"temperature": 0.7,
"stream": false
}
"""
try:
data = request.json
# Validate required fields
if not data or 'messages' not in data:
return jsonify({
'error': 'Missing required field: messages',
'code': 'INVALID_REQUEST'
}), 400
# Generate unique request ID for tracking
request_id = str(uuid.uuid4())
# Extract parameters with defaults
messages = data['messages']
model = data.get('model', 'gpt-3.5-turbo')
max_tokens = data.get('max_tokens', 150)
temperature = data.get('temperature', 0.7)
stream = data.get('stream', False)
# Process the request
if stream:
return handle_streaming_response(
messages, model, max_tokens, temperature, request_id
)
else:
return handle_complete_response(
messages, model, max_tokens, temperature, request_id
)
except Exception as e:
return jsonify({
'error': f'Internal server error: {str(e)}',
'code': 'INTERNAL_ERROR'
}), 500
@app.route('/api/v1/health', methods=['GET'])
def health_check():
"""Simple health check endpoint"""
return jsonify({
'status': 'healthy',
'timestamp': datetime.utcnow().isoformat(),
'version': '1.0.0'
})
@app.route('/api/v1/models', methods=['GET'])
def list_models():
"""List available models"""
return jsonify({
'models': [
{
'id': 'gpt-3.5-turbo',
'name': 'GPT-3.5 Turbo',
'description': 'Fast, cost-effective model for most tasks',
'max_tokens': 4096
},
{
'id': 'gpt-4',
'name': 'GPT-4',
'description': 'Most capable model for complex reasoning',
'max_tokens': 8192
}
]
})
This API structure follows OpenAI's format, making it familiar to developers while adding essential production features:

- Versioned endpoints (/api/v1/...) so you can evolve the API without breaking clients
- Unique request IDs for tracing requests through logs and queues
- Machine-readable error codes alongside human-readable messages
- Health check and model listing endpoints for operations tooling
Now let's implement the response handlers. The key decision here is whether to stream responses or return them complete:
import openai
import time
from flask import Response
def handle_complete_response(messages, model, max_tokens, temperature, request_id):
"""Handle non-streaming responses"""
start_time = time.time()
try:
# Call your LLM (this example uses OpenAI, but adapt for your model)
response = openai.ChatCompletion.create(
model=model,
messages=messages,
max_tokens=max_tokens,
temperature=temperature
)
processing_time = time.time() - start_time
# Structure response in OpenAI format
return jsonify({
'id': f'chatcmpl-{request_id}',
'object': 'chat.completion',
'created': int(time.time()),
'model': model,
'choices': [{
'index': 0,
'message': {
'role': 'assistant',
'content': response.choices[0].message.content
},
'finish_reason': response.choices[0].finish_reason
}],
'usage': {
'prompt_tokens': response.usage.prompt_tokens,
'completion_tokens': response.usage.completion_tokens,
'total_tokens': response.usage.total_tokens,
'processing_time_seconds': round(processing_time, 2)
}
})
except openai.error.RateLimitError:
return jsonify({
'error': 'Rate limit exceeded. Please try again later.',
'code': 'RATE_LIMIT_EXCEEDED'
}), 429
except openai.error.InvalidRequestError as e:
return jsonify({
'error': f'Invalid request: {str(e)}',
'code': 'INVALID_REQUEST'
}), 400
except Exception as e:
return jsonify({
'error': 'Model temporarily unavailable',
'code': 'MODEL_UNAVAILABLE'
}), 503
def handle_streaming_response(messages, model, max_tokens, temperature, request_id):
"""Handle streaming responses"""
def generate_stream():
try:
stream = openai.ChatCompletion.create(
model=model,
messages=messages,
max_tokens=max_tokens,
temperature=temperature,
stream=True
)
for chunk in stream:
if chunk.choices[0].delta.get('content'):
# Format as Server-Sent Events
data = {
'id': f'chatcmpl-{request_id}',
'object': 'chat.completion.chunk',
'created': int(time.time()),
'model': model,
'choices': [{
'index': 0,
'delta': {
'content': chunk.choices[0].delta.content
},
'finish_reason': chunk.choices[0].finish_reason
}]
}
yield f"data: {json.dumps(data)}\n\n"
# Send final message
yield "data: [DONE]\n\n"
except Exception as e:
error_data = {
'error': 'Stream interrupted',
'code': 'STREAM_ERROR'
}
yield f"data: {json.dumps(error_data)}\n\n"
return Response(
generate_stream(),
mimetype='text/event-stream',
headers={
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Access-Control-Allow-Origin': '*'
}
)
Choose streaming for:

- Chat interfaces where users watch the response appear in real time
- Long responses, where perceived latency matters more than total time
- Applications that can process partial output as it arrives

Choose complete responses for:

- Programmatic consumers that need the full payload in one piece
- Short responses, where streaming overhead isn't worth the complexity
- Responses you want to cache, validate, or post-process before returning
Tip: Start with complete responses for simplicity, then add streaming if user experience demands it. Streaming adds significant complexity to both server and client code.
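If you do adopt streaming, the client has to read the Server-Sent Events as they arrive. Here's a minimal consumer sketch using the requests library; the URL assumes you're running the API above locally, and a production client would add retries and better error handling:

```python
import json
import requests

# Assumption: the API above is running locally on port 8000
url = "http://localhost:8000/api/v1/chat/completions"
payload = {
    "messages": [{"role": "user", "content": "How do I return a product?"}],
    "stream": True
}

with requests.post(url, json=payload, stream=True) as resp:
    for line in resp.iter_lines(decode_unicode=True):
        # SSE events arrive as "data: ..." lines separated by blank lines
        if not line or not line.startswith("data: "):
            continue
        data = line[len("data: "):]
        if data == "[DONE]":
            break
        chunk = json.loads(data)
        if "error" in chunk:  # the server emits an error object if the stream breaks
            print(f"\nStream error: {chunk['error']}")
            break
        print(chunk["choices"][0]["delta"].get("content", ""), end="", flush=True)
```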
Now that we have an API, we need infrastructure to run it reliably at scale. LLM applications have unique infrastructure requirements that differ from typical web applications.
For small applications (under 100 concurrent users), start with a simple single-server deployment:
# docker-compose.yml
version: '3.8'
services:
llm-api:
build: .
ports:
- "8000:8000"
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- FLASK_ENV=production
- WORKERS=4
volumes:
- ./logs:/app/logs
restart: unless-stopped
nginx:
image: nginx:alpine
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
- ./ssl:/etc/ssl/certs
depends_on:
- llm-api
restart: unless-stopped
redis:
image: redis:alpine
ports:
- "6379:6379"
restart: unless-stopped
# Dockerfile
FROM python:3.9-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Create non-root user
RUN useradd --create-home --shell /bin/bash app
RUN chown -R app:app /app
USER app
# Use gunicorn for production
CMD ["gunicorn", "--bind", "0.0.0.0:8000", "--workers", "4", "--timeout", "120", "app:app"]
This setup includes:

- The Flask API served by gunicorn workers, behind an nginx reverse proxy (a minimal nginx.conf sketch follows below)
- nginx handling TLS termination and request buffering
- Redis for caching and request queuing (used later in this lesson)
- Restart policies so containers recover automatically from crashes
- A mounted logs volume so logs survive container restarts
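The ./nginx.conf that the compose file mounts isn't shown above. A minimal sketch might look like this (the upstream name and timeout values are assumptions, and TLS configuration is omitted for brevity):

```nginx
# nginx.conf — minimal reverse proxy sketch for the compose setup above
events {}

http {
    upstream llm_api {
        server llm-api:8000;  # the service name from docker-compose.yml
    }

    server {
        listen 80;

        location / {
            proxy_pass http://llm_api;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;

            # LLM responses are slow; raise timeouts well above the defaults
            proxy_read_timeout 120s;
            proxy_send_timeout 120s;

            # Required for streaming (SSE): don't buffer upstream responses
            proxy_buffering off;
        }
    }
}
```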
When you outgrow a single server, move to a load-balanced architecture:
# Load balancer health check endpoint
@app.route('/api/v1/ready', methods=['GET'])
def readiness_check():
"""
Readiness check for load balancer
Returns 200 only if the service can handle requests
"""
try:
# Check if we can reach the LLM service
start_time = time.time()
# Quick test request (adapt this for your LLM provider)
test_response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hi"}],
max_tokens=1
)
response_time = time.time() - start_time
# Fail health check if response is too slow
if response_time > 10: # 10 second threshold
return jsonify({
'status': 'not_ready',
'reason': 'slow_response',
'response_time': response_time
}), 503
return jsonify({
'status': 'ready',
'response_time': response_time,
'timestamp': datetime.utcnow().isoformat()
})
except Exception as e:
return jsonify({
'status': 'not_ready',
'reason': str(e),
'timestamp': datetime.utcnow().isoformat()
}), 503
Configure your load balancer (AWS ALB, Google Cloud Load Balancer, or nginx) to:

- Poll the /api/v1/ready endpoint every 30 seconds
- Remove instances from rotation when the readiness check fails
- Distribute requests with a least-connections policy, since LLM request durations vary widely
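If you load balance with open-source nginx, which only supports passive health checks, the configuration might look like the sketch below. The backend addresses are placeholders; managed load balancers like ALB handle the active /api/v1/ready polling for you:

```nginx
upstream llm_backends {
    # LLM request durations vary widely, so prefer the instance
    # with the fewest in-flight requests over round-robin
    least_conn;

    # Passive health checks: after 3 failures within 30s,
    # take the instance out of rotation for 30s
    server 10.0.1.10:8000 max_fails=3 fail_timeout=30s;
    server 10.0.1.11:8000 max_fails=3 fail_timeout=30s;
}

server {
    listen 80;

    location / {
        proxy_pass http://llm_backends;
        proxy_read_timeout 120s;
        proxy_buffering off;
    }
}
```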
For large-scale applications, consider separating concerns into microservices:
# Gateway service - handles routing and authentication
from flask import Flask, request, jsonify
import requests
import jwt
gateway_app = Flask(__name__)
@gateway_app.route('/api/v1/chat/completions', methods=['POST'])
def route_chat_completion():
"""Route requests to appropriate LLM service"""
# Authenticate request
auth_header = request.headers.get('Authorization')
if not auth_header or not validate_api_key(auth_header):
return jsonify({'error': 'Invalid API key'}), 401
# Extract model preference
data = request.json
model = data.get('model', 'gpt-3.5-turbo')
# Route to appropriate service
if model.startswith('gpt-4'):
service_url = 'http://gpt4-service:8000'
else:
service_url = 'http://gpt35-service:8000'
# Forward request
response = requests.post(
f'{service_url}/api/v1/chat/completions',
json=data,
timeout=120
)
return response.json(), response.status_code
def validate_api_key(auth_header):
"""Validate API key against your authentication system"""
try:
token = auth_header.split(' ')[1] # Bearer <token>
# Implement your authentication logic here
return True
except:
return False
This pattern allows you to:

- Scale GPT-4 and GPT-3.5 capacity independently based on demand
- Isolate failures so one model's outage doesn't take down the others
- Centralize authentication, routing, and rate limiting in the gateway
- Swap or add model backends without changing the client-facing API
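A docker-compose sketch of this topology might look like the following. The build paths are placeholders, but the service names match the URLs the gateway routes to; containers resolve each other by service name on the compose network:

```yaml
# docker-compose.microservices.yml — topology sketch; build paths are assumptions
version: '3.8'
services:
  gateway:
    build: ./gateway
    ports:
      - "8000:8000"   # only the gateway is exposed publicly

  gpt4-service:
    build: ./llm-service
    environment:
      - MODEL_NAME=gpt-4

  gpt35-service:
    build: ./llm-service
    environment:
      - MODEL_NAME=gpt-3.5-turbo
```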
LLMs can't handle unlimited concurrent requests like traditional APIs. A single GPU might handle only 4-8 simultaneous inference requests efficiently. Beyond that, you need queuing.
import redis
import json
import time
from threading import Thread
import queue
class LLMRequestQueue:
def __init__(self, redis_url='redis://localhost:6379', max_workers=4):
self.redis_client = redis.from_url(redis_url)
self.max_workers = max_workers
self.worker_queue = queue.Queue(maxsize=max_workers * 2)
self.start_workers()
def start_workers(self):
"""Start worker threads to process requests"""
for i in range(self.max_workers):
worker = Thread(target=self.worker_loop, daemon=True)
worker.start()
def worker_loop(self):
"""Worker thread that processes queued requests"""
while True:
try:
request_data = self.worker_queue.get(timeout=1)
self.process_request(request_data)
self.worker_queue.task_done()
except queue.Empty:
continue
except Exception as e:
print(f"Worker error: {e}")
def enqueue_request(self, request_id, messages, model, max_tokens, temperature):
"""Add request to queue"""
request_data = {
'id': request_id,
'messages': messages,
'model': model,
'max_tokens': max_tokens,
'temperature': temperature,
'queued_at': time.time()
}
# Store request details in Redis
self.redis_client.setex(
f"request:{request_id}",
300, # 5 minute expiry
json.dumps(request_data)
)
# Add to processing queue
try:
self.worker_queue.put(request_data, block=False)
return True
except queue.Full:
return False # Queue is full
def process_request(self, request_data):
"""Process a single request"""
request_id = request_data['id']
try:
# Update status to processing
self.redis_client.setex(
f"status:{request_id}",
300,
json.dumps({
'status': 'processing',
'started_at': time.time()
})
)
# Make LLM call
response = openai.ChatCompletion.create(
model=request_data['model'],
messages=request_data['messages'],
max_tokens=request_data['max_tokens'],
temperature=request_data['temperature']
)
# Store result
result = {
'status': 'completed',
'response': response.to_dict(),
'completed_at': time.time()
}
self.redis_client.setex(
f"result:{request_id}",
300,
json.dumps(result)
)
except Exception as e:
# Store error
error_result = {
'status': 'error',
'error': str(e),
'failed_at': time.time()
}
self.redis_client.setex(
f"result:{request_id}",
300,
json.dumps(error_result)
)
def get_request_status(self, request_id):
"""Get current status of a request"""
result = self.redis_client.get(f"result:{request_id}")
if result:
return json.loads(result)
status = self.redis_client.get(f"status:{request_id}")
if status:
return json.loads(status)
# Check if request exists
request_data = self.redis_client.get(f"request:{request_id}")
if request_data:
return {'status': 'queued'}
return {'status': 'not_found'}
# Initialize global queue
request_queue = LLMRequestQueue()
@app.route('/api/v1/chat/async', methods=['POST'])
def async_chat_completion():
"""Submit request for async processing"""
try:
data = request.json
request_id = str(uuid.uuid4())
success = request_queue.enqueue_request(
request_id,
data['messages'],
data.get('model', 'gpt-3.5-turbo'),
data.get('max_tokens', 150),
data.get('temperature', 0.7)
)
if success:
return jsonify({
'request_id': request_id,
'status': 'queued',
'status_url': f'/api/v1/requests/{request_id}'
}), 202
else:
return jsonify({
'error': 'Server too busy. Please try again later.',
'code': 'QUEUE_FULL'
}), 503
except Exception as e:
return jsonify({
'error': f'Failed to queue request: {str(e)}',
'code': 'QUEUE_ERROR'
}), 500
@app.route('/api/v1/requests/<request_id>', methods=['GET'])
def get_request_status(request_id):
"""Check status of async request"""
status = request_queue.get_request_status(request_id)
if status['status'] == 'not_found':
return jsonify({'error': 'Request not found'}), 404
return jsonify(status)
This queuing system provides:

- Backpressure: a bounded queue rejects new work instead of overloading the model
- Visibility: clients can poll for queued, processing, completed, or error states
- Resilience: request state lives in Redis with automatic expiry
- Fast acknowledgment: clients get a 202 and a status URL instead of a connection that hangs for seconds
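From the client's perspective, the async flow is submit, then poll the status URL until the work finishes. A minimal sketch (the base URL is an assumption; real clients should back off exponentially rather than sleeping a fixed second):

```python
import time
import requests

BASE_URL = "http://localhost:8000"  # assumption: wherever the API is deployed

# Submit the request for async processing; the server replies immediately with 202
resp = requests.post(f"{BASE_URL}/api/v1/chat/async", json={
    "messages": [{"role": "user", "content": "How do I return a product?"}]
})
resp.raise_for_status()
status_url = BASE_URL + resp.json()["status_url"]

# Poll until the request completes or fails
while True:
    status = requests.get(status_url).json()
    if status["status"] in ("completed", "error"):
        break
    time.sleep(1)

print(status)
```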
Monitor your queue health with these metrics:
@app.route('/api/v1/queue/metrics', methods=['GET'])
def queue_metrics():
"""Expose queue metrics for monitoring"""
# Count requests by status
queue_size = request_queue.worker_queue.qsize()
# Get Redis stats
redis_info = request_queue.redis_client.info()
# Count active requests
active_requests = len([
key for key in request_queue.redis_client.scan_iter("status:*")
])
return jsonify({
'queue_size': queue_size,
'max_queue_size': request_queue.worker_queue.maxsize,
'active_workers': request_queue.max_workers,
'active_requests': active_requests,
'redis_memory_usage': redis_info.get('used_memory_human', 'unknown'),
'redis_connected_clients': redis_info.get('connected_clients', 0)
})
Set up alerts when:

- Queue utilization stays above roughly 80% of capacity
- Requests sit in the processing state longer than your timeout budget
- Redis memory usage grows unexpectedly
- Worker errors start appearing in the logs
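Until you adopt a full monitoring stack, a simple watchdog that polls the metrics endpoint can drive these alerts. A sketch, where the URL and the alerting hook are assumptions:

```python
import time
import requests

METRICS_URL = "http://localhost:8000/api/v1/queue/metrics"  # assumption

def send_alert(message):
    # Assumption: wire this up to Slack, PagerDuty, email, etc.
    print(f"[ALERT] {message}")

while True:
    try:
        m = requests.get(METRICS_URL, timeout=5).json()
        utilization = m["queue_size"] / max(m["max_queue_size"], 1)
        if utilization > 0.8:
            send_alert(f"Queue at {utilization:.0%} of capacity")
        if m["active_requests"] > m["active_workers"] * 2:
            send_alert(f"{m['active_requests']} requests in flight "
                       f"for {m['active_workers']} workers")
    except requests.RequestException as e:
        send_alert(f"Metrics endpoint unreachable: {e}")
    time.sleep(60)  # check once a minute
```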
LLM inference is expensive. A single GPT-4 conversation can cost $0.10 or more. Without proper cost controls, your bill can spiral out of control quickly. Start by tracking every request's token usage and enforcing per-user limits:
import sqlite3
from datetime import datetime, timedelta
class UsageTracker:
def __init__(self, db_path='usage.db'):
self.db_path = db_path
self.init_database()
def init_database(self):
"""Initialize usage tracking database"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS usage (
id INTEGER PRIMARY KEY AUTOINCREMENT,
request_id TEXT NOT NULL,
user_id TEXT,
model TEXT NOT NULL,
prompt_tokens INTEGER NOT NULL,
completion_tokens INTEGER NOT NULL,
total_tokens INTEGER NOT NULL,
cost_usd REAL NOT NULL,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS rate_limits (
user_id TEXT PRIMARY KEY,
daily_requests INTEGER DEFAULT 0,
daily_tokens INTEGER DEFAULT 0,
last_reset DATE DEFAULT CURRENT_DATE
)
''')
conn.commit()
conn.close()
def track_usage(self, request_id, user_id, model, prompt_tokens,
completion_tokens, cost_usd):
"""Record usage for a request"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO usage
(request_id, user_id, model, prompt_tokens, completion_tokens,
total_tokens, cost_usd)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (
request_id, user_id, model, prompt_tokens, completion_tokens,
prompt_tokens + completion_tokens, cost_usd
))
conn.commit()
conn.close()
def check_rate_limit(self, user_id, max_daily_requests=100, max_daily_tokens=50000):
"""Check if user is within rate limits"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Get or create user rate limit record
cursor.execute('''
INSERT OR IGNORE INTO rate_limits (user_id) VALUES (?)
''', (user_id,))
cursor.execute('''
SELECT daily_requests, daily_tokens, last_reset
FROM rate_limits WHERE user_id = ?
''', (user_id,))
row = cursor.fetchone()
daily_requests, daily_tokens, last_reset = row
# Reset counters if it's a new day
today = datetime.now().date()
if last_reset != today.isoformat():
cursor.execute('''
UPDATE rate_limits
SET daily_requests = 0, daily_tokens = 0, last_reset = ?
WHERE user_id = ?
''', (today.isoformat(), user_id))
daily_requests = 0
daily_tokens = 0
conn.commit()
conn.close()
# Check limits
if daily_requests >= max_daily_requests:
return False, "Daily request limit exceeded"
if daily_tokens >= max_daily_tokens:
return False, "Daily token limit exceeded"
return True, None
def update_rate_limit(self, user_id, tokens_used):
"""Update user's rate limit counters"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
UPDATE rate_limits
SET daily_requests = daily_requests + 1,
daily_tokens = daily_tokens + ?
WHERE user_id = ?
''', (tokens_used, user_id))
conn.commit()
conn.close()
# Initialize tracker
usage_tracker = UsageTracker()
# Updated completion handler with usage tracking
def handle_complete_response_with_tracking(messages, model, max_tokens,
temperature, request_id, user_id):
"""Handle response with usage tracking and rate limiting"""
# Check rate limits first
allowed, reason = usage_tracker.check_rate_limit(user_id)
if not allowed:
return jsonify({
'error': reason,
'code': 'RATE_LIMIT_EXCEEDED'
}), 429
try:
response = openai.ChatCompletion.create(
model=model,
messages=messages,
max_tokens=max_tokens,
temperature=temperature
)
# Calculate cost (example pricing)
cost_per_1k_tokens = {
'gpt-3.5-turbo': 0.002,
'gpt-4': 0.03,
'gpt-4-32k': 0.06
}
base_cost = cost_per_1k_tokens.get(model, 0.002)
total_cost = (response.usage.total_tokens / 1000) * base_cost
# Track usage
usage_tracker.track_usage(
request_id, user_id, model,
response.usage.prompt_tokens,
response.usage.completion_tokens,
total_cost
)
# Update rate limits
usage_tracker.update_rate_limit(user_id, response.usage.total_tokens)
return jsonify({
'id': f'chatcmpl-{request_id}',
'object': 'chat.completion',
'created': int(time.time()),
'model': model,
'choices': [{
'index': 0,
'message': {
'role': 'assistant',
'content': response.choices[0].message.content
},
'finish_reason': response.choices[0].finish_reason
}],
'usage': {
'prompt_tokens': response.usage.prompt_tokens,
'completion_tokens': response.usage.completion_tokens,
'total_tokens': response.usage.total_tokens,
'estimated_cost_usd': round(total_cost, 4)
}
})
except Exception as e:
return jsonify({
'error': str(e),
'code': 'MODEL_ERROR'
}), 500
1. Model Selection Based on Task Complexity
def select_optimal_model(messages, user_tier='basic'):
"""Select the most cost-effective model for the task"""
# Analyze conversation complexity
total_length = sum(len(msg['content']) for msg in messages)
has_code = any('def ' in msg['content'] or 'import ' in msg['content']
for msg in messages)
# Simple heuristics for model selection
if user_tier == 'premium':
if has_code or total_length > 1000:
return 'gpt-4'
else:
return 'gpt-3.5-turbo'
else:
# Basic tier always uses cheapest model
return 'gpt-3.5-turbo'
# Use in your endpoint
@app.route('/api/v1/chat/smart', methods=['POST'])
def smart_chat_completion():
"""Automatically select optimal model"""
data = request.json
user_tier = get_user_tier(request.headers.get('Authorization'))
optimal_model = select_optimal_model(data['messages'], user_tier)
data['model'] = optimal_model
return handle_complete_response_with_tracking(
data['messages'], optimal_model,
data.get('max_tokens', 150),
data.get('temperature', 0.7),
str(uuid.uuid4()),
get_user_id_from_auth(request.headers.get('Authorization'))
)
2. Response Caching
import hashlib
class ResponseCache:
def __init__(self, redis_client, ttl_seconds=3600):
self.redis = redis_client
self.ttl = ttl_seconds
def get_cache_key(self, messages, model, temperature):
"""Generate cache key from request parameters"""
# Create deterministic hash of request
content = json.dumps({
'messages': messages,
'model': model,
'temperature': temperature
}, sort_keys=True)
return f"cache:{hashlib.md5(content.encode()).hexdigest()}"
def get_cached_response(self, messages, model, temperature):
"""Get cached response if available"""
cache_key = self.get_cache_key(messages, model, temperature)
cached = self.redis.get(cache_key)
if cached:
return json.loads(cached)
return None
def cache_response(self, messages, model, temperature, response):
"""Cache a response"""
# Only cache responses with low temperature (deterministic)
if temperature <= 0.3:
cache_key = self.get_cache_key(messages, model, temperature)
self.redis.setex(cache_key, self.ttl, json.dumps(response))
# Use caching in your handler
response_cache = ResponseCache(redis.from_url('redis://localhost:6379'))
def handle_cached_response(messages, model, max_tokens, temperature, request_id, user_id):
"""Handle response with caching"""
# Check cache first
cached_response = response_cache.get_cached_response(messages, model, temperature)
if cached_response:
# Still track usage for billing, but mark as cached
usage_tracker.track_usage(
request_id, user_id, f"{model}-cached",
0, 0, 0 # No cost for cached responses
)
cached_response['id'] = f'chatcmpl-{request_id}'
cached_response['cached'] = True
return jsonify(cached_response)
    # Not cached, make a real request
    result = handle_complete_response_with_tracking(
        messages, model, max_tokens, temperature, request_id, user_id
    )

    # The handler returns either a Response or a (Response, status) tuple on error
    response = result[0] if isinstance(result, tuple) else result

    # Cache successful responses only
    if response.status_code == 200:
        response_cache.cache_response(messages, model, temperature, response.get_json())

    return result
Proper monitoring is crucial for LLM applications. You need visibility into performance, costs, errors, and usage patterns.
import logging
import structlog
from datetime import datetime
# Configure structured logging
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.UnicodeDecoder(),
structlog.processors.JSONRenderer()
],
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)
logger = structlog.get_logger()
@app.before_request
def log_request_start():
"""Log incoming requests"""
logger.info(
"request_started",
method=request.method,
path=request.path,
user_agent=request.headers.get('User-Agent'),
client_ip=request.remote_addr,
request_id=getattr(request, 'id', None)
)
@app.after_request
def log_request_end(response):
"""Log request completion"""
logger.info(
"request_completed",
method=request.method,
path=request.path,
status_code=response.status_code,
response_size=len(response.get_data()),
request_id=getattr(request, 'id', None)
)
return response
def log_llm_request(request_id, model, messages, response_time, tokens_used, cost):
"""Log LLM-specific metrics"""
logger.info(
"llm_request_completed",
request_id=request_id,
model=model,
message_count=len(messages),
response_time_seconds=response_time,
tokens_used=tokens_used,
cost_usd=cost,
timestamp=datetime.utcnow().isoformat()
)
# Health check with detailed status
@app.route('/api/v1/health/detailed', methods=['GET'])
def detailed_health_check():
"""Comprehensive health check"""
health_status = {
'status': 'healthy',
'timestamp': datetime.utcnow().isoformat(),
'version': '1.0.0',
'checks': {}
}
# Check Redis connectivity
try:
redis_client.ping()
health_status['checks']['redis'] = 'healthy'
except:
health_status['checks']['redis'] = 'unhealthy'
health_status['status'] = 'degraded'
# Check LLM provider
try:
start_time = time.time()
openai.Model.list()
provider_time = time.time() - start_time
health_status['checks']['llm_provider'] = {
'status': 'healthy',
'response_time_seconds': round(provider_time, 2)
}
except:
health_status['checks']['llm_provider'] = 'unhealthy'
health_status['status'] = 'unhealthy'
# Check database
try:
conn = sqlite3.connect(usage_tracker.db_path)
cursor = conn.cursor()
cursor.execute('SELECT 1')
conn.close()
health_status['checks']['database'] = 'healthy'
except:
health_status['checks']['database'] = 'unhealthy'
health_status['status'] = 'degraded'
# Check queue status
queue_size = request_queue.worker_queue.qsize()
max_queue = request_queue.worker_queue.maxsize
health_status['checks']['queue'] = {
'status': 'healthy' if queue_size < max_queue * 0.8 else 'degraded',
'current_size': queue_size,
'max_size': max_queue,
'utilization_percent': round((queue_size / max_queue) * 100, 1)
}
status_code = 200 if health_status['status'] == 'healthy' else 503
return jsonify(health_status), status_code
For aggregation and alerting, expose Prometheus metrics alongside your structured logs:

from prometheus_client import Counter, Histogram, Gauge, generate_latest
# Define metrics
request_count = Counter('llm_requests_total', 'Total LLM requests', ['model', 'status'])
request_duration = Histogram('llm_request_duration_seconds', 'LLM request duration', ['model'])
tokens_used = Histogram('llm_tokens_used_total', 'Tokens used per request', ['model'])
cost_per_request = Histogram('llm_cost_per_request_usd', 'Cost per request', ['model'])
queue_size = Gauge('llm_queue_size', 'Current queue size')
active_requests = Gauge('llm_active_requests', 'Currently processing requests')
def record_metrics(model, status, duration, tokens, cost):
"""Record metrics for monitoring"""
request_count.labels(model=model, status=status).inc()
request_duration.labels(model=model).observe(duration)
tokens_used.labels(model=model).observe(tokens)
cost_per_request.labels(model=model).observe(cost)
@app.route('/metrics')
def metrics():
"""Prometheus metrics endpoint"""
# Update current queue size
queue_size.set(request_queue.worker_queue.qsize())
return generate_latest()
# Update your response handler to record metrics
def handle_complete_response_with_metrics(messages, model, max_tokens, temperature, request_id, user_id):
"""Handle response with full monitoring"""
start_time = time.time()
active_requests.inc()
try:
response = openai.ChatCompletion.create(
model=model,
messages=messages,
max_tokens=max_tokens,
temperature=temperature
)
duration = time.time() - start_time
tokens = response.usage.total_tokens
        cost = calculate_cost(model, tokens)  # assumes a pricing-lookup helper like the per-1K-token table shown earlier
# Record metrics
record_metrics(model, 'success', duration, tokens, cost)
# Log request
log_llm_request(request_id, model, messages, duration, tokens, cost)
return jsonify({
'id': f'chatcmpl-{request_id}',
'object': 'chat.completion',
'created': int(time.time()),
'model': model,
'choices': [{
'index': 0,
'message': {
'role': 'assistant',
'content': response.choices[0].message.content
},
'finish_reason': response.choices[0].finish_reason
}],
'usage': {
'prompt_tokens': response.usage.prompt_tokens,
'completion_tokens': response.usage.completion_tokens,
'total_tokens': response.usage.total_tokens,
'processing_time_seconds': round(duration, 2),
'estimated_cost_usd': round(cost, 4)
}
})
except Exception as e:
duration = time.time() - start_time
record_metrics(model, 'error', duration, 0, 0)
logger.error(
"llm_request_failed",
request_id=request_id,
model=model,
error=str(e),
duration=duration
)
return jsonify({
'error': 'Request failed',
'code': 'LLM_ERROR'
}), 500
finally:
active_requests.dec()
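To collect these metrics, point a Prometheus server at the /metrics endpoint. A minimal scrape configuration sketch; the target address assumes the compose service name from earlier:

```yaml
# prometheus.yml — minimal sketch; adjust the target to your deployment
scrape_configs:
  - job_name: 'llm-api'
    scrape_interval: 15s
    static_configs:
      - targets: ['llm-api:8000']  # scrapes http://llm-api:8000/metrics
```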
Let's build a complete customer support chatbot API with all the features we've discussed. This exercise will give you hands-on experience with the entire deployment pipeline.
Create a new directory and set up the basic structure:
mkdir llm-customer-support
cd llm-customer-support
# Create project structure
mkdir app config logs tests
touch app/__init__.py app/main.py app/models.py app/utils.py
touch config/development.py config/production.py
touch requirements.txt Dockerfile docker-compose.yml
Create app/main.py:
from flask import Flask, request, jsonify
import os
import uuid
import time
import openai
import redis
from datetime import datetime
import json
# Initialize Flask app
app = Flask(__name__)
# Configuration
openai.api_key = os.getenv('OPENAI_API_KEY')
redis_client = redis.from_url(os.getenv('REDIS_URL', 'redis://localhost:6379'))
# Customer support context
SYSTEM_PROMPT = """You are a helpful customer support assistant for TechCorp, an online electronics retailer.
You can help customers with:
- Order status and tracking
- Product information and recommendations
- Return and refund policies
- Technical support for products
- Account questions
Be friendly, professional, and helpful. If you cannot answer a question, politely direct the customer to human support.
Company policies:
- Free shipping on orders over $50
- 30-day return policy
- 1-year warranty on electronics
- Support hours: Mon-Fri 9AM-6PM EST
"""
class CustomerSupportBot:
def __init__(self):
        # NOTE: in-memory history is per-process; with multiple gunicorn workers,
        # each worker sees a different history. Move this to Redis (already in
        # the stack) before scaling past one worker.
        self.conversation_history = {}
def get_conversation(self, user_id):
"""Get or create conversation history"""
if user_id not in self.conversation_history:
self.conversation_history[user_id] = [
{"role": "system", "content": SYSTEM_PROMPT}
]
return self.conversation_history[user_id]
def add_message(self, user_id, role, content):
"""Add message to conversation"""
conversation = self.get_conversation(user_id)
conversation.append({"role": role, "content": content})
# Keep conversation under token limit
        if len(conversation) > 20:  # keep the system prompt + the 19 most recent messages
            conversation = [conversation[0]] + conversation[-19:]
self.conversation_history[user_id] = conversation
def generate_response(self, user_id, user_message):
"""Generate bot response"""
# Add user message
self.add_message(user_id, "user", user_message)
conversation = self.get_conversation(user_id)
try:
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=conversation,
max_tokens=200,
temperature=0.7
)
bot_message = response.choices[0].message.content
self.add_message(user_id, "assistant", bot_message)
return {
'success': True,
'message': bot_message,
'usage': response.usage.to_dict()
}
except Exception as e:
return {
'success': False,
'error': str(e)
}
# Initialize bot
support_bot = CustomerSupportBot()
@app.route('/api/v1/chat', methods=['POST'])
def chat():
"""Main chat endpoint"""
try:
data = request.json
if not data or 'message' not in data:
return jsonify({'error': 'Message is required'}), 400
user_message = data['message']
user_id = data.get('user_id', 'anonymous')
# Generate response
result = support_bot.generate_response(user_id, user_message)
if result['success']:
return jsonify({
'response': result['message'],
'user_id': user_id,
'timestamp': datetime.utcnow().isoformat(),
'usage': result['usage']
})
else:
return jsonify({
'error': 'Failed to generate response',
'details': result['error']
}), 500
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/v1/chat/history/<user_id>', methods=['GET'])
def get_chat_history(user_id):
"""Get conversation history for a user"""
conversation = support_bot.get_conversation(user_id)
# Remove system prompt from response
user_conversation = [
msg for msg in conversation
if msg['role'] != 'system'
]
return jsonify({
'user_id': user_id,
'messages': user_conversation,
'message_count': len(user_conversation)
})
@app.route('/api/v1/health', methods=['GET'])
def health_check():
"""Health check endpoint"""
try:
# Test Redis connection
redis_client.ping()
redis_status = 'healthy'
except:
redis_status = 'unhealthy'
try:
# Test OpenAI connection
openai.Model.list()
openai_status = 'healthy'
except:
openai_status = 'unhealthy'
return jsonify({
'status': 'healthy',
'services': {
'redis': redis_status,
'openai': openai_status
},
'timestamp': datetime.utcnow().isoformat()
})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True)
Create requirements.txt:
Flask==2.3.3
openai==0.27.8
redis==4.6.0
gunicorn==21.2.0
python-dotenv==1.0.0
requests==2.31.0
Create Dockerfile:
FROM python:3.9-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application
COPY . .
# Create non-root user
RUN useradd --create-home --shell /bin/bash app
RUN chown -R app:app /app
USER app
# Expose port
EXPOSE 5000
# Run with gunicorn
CMD ["gunicorn", "--bind", "0.0.0.0:5000", "--workers", "2", "--timeout", "120", "app.main:app"]
Create docker-compose.yml:
version: '3.8'
services:
web:
build: .
ports:
- "5000:5000"
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- REDIS_URL=redis://redis:6379
depends_on:
- redis
restart: unless-stopped
redis:
image: redis:7-alpine
ports:
- "6379:6379"
restart: unless-stopped
nginx:
image: nginx:alpine
ports:
- "80:80"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf:ro
depends_on:
- web
restart: unless-stopped
The nginx service mounts an ./nginx.conf like the one shown in the single-server setup earlier; point its upstream at web:5000 for this project. Set up environment variables:
export OPENAI_API_KEY="your-openai-key-here"
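Alternatively, docker-compose automatically reads a .env file in the project directory, which keeps the key out of your shell history:

```bash
# .env — docker-compose substitutes ${OPENAI_API_KEY} from this file
OPENAI_API_KEY=your-openai-key-here
```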
Run the application:
docker-compose up --build
Test the API:
# Health check
curl http://localhost/api/v1/health
# Send a message
curl -X POST http://localhost/api/v1/chat \
-H "Content-Type: application/json" \
-d '{"message": "How do I return a product?", "user_id": "test_user_1"}'
# Get conversation history
curl http://localhost/api/v1/chat/history/test_user_1
Load test your API:
# Install Apache Bench if not available
# apt-get install apache2-utils
# Test with 10 concurrent requests
ab -n 50 -c 10 -T application/json -p test_message.json http://localhost/api/v1/chat
Create test_message.json:
{"message": "What are your store hours?", "user_id": "load_test"}
Add basic monitoring to see how your application performs:
# Add this to your main.py
import time
from functools import wraps
def monitor_performance(f):
"""Decorator to monitor endpoint performance"""
@wraps(f)
def decorated_function(*args, **kwargs):
start_time = time.time()
try:
result = f(*args, **kwargs)
duration = time.time() - start_time
print(f"[MONITOR] {f.__name__} completed in {duration:.2f}s")
return result
except Exception as e:
duration = time.time() - start_time
print(f"[MONITOR] {f.__name__} failed in {duration:.2f}s: {str(e)}")
raise
return decorated_function
# Apply to your endpoints
@app.route('/api/v1/chat', methods=['POST'])
@monitor_performance
def chat():
# ... existing code ...
You now have a fully functional LLM application with proper API design, containerization, and basic monitoring!
Problem: LLM requests can take 30+ seconds, but default web server timeouts are often much shorter.
Symptoms:

- 502 or 504 errors from your load balancer or reverse proxy
- Responses that cut off mid-generation
- Connections dropped before the model finishes writing
Solution:
# Configure proper timeouts in your server
# gunicorn.conf.py
timeout = 120 # 2 minutes
keepalive = 2
max_requests = 1000
max_requests_jitter = 50
# In your Flask app, handle timeout gracefully
import signal
# Use a distinct name so we don't shadow Python's built-in TimeoutError
class RequestTimeoutError(Exception):
    pass

def timeout_handler(signum, frame):
    raise RequestTimeoutError("Request timed out")
@app.route('/api/v1/chat/completions', methods=['POST'])
def chat_completion():
# Set up timeout handler
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(90) # 90 second timeout
try:
# Your LLM call here
response = openai.ChatCompletion.create(...)
signal.alarm(0) # Cancel timeout
return jsonify(response)
    except RequestTimeoutError:
return jsonify({
'error': 'Request timed out. Please try again.',
'code': 'TIMEOUT'
}), 504
finally:
signal.alarm(0) # Always cancel timeout
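Gunicorn picks up the config file via its -c flag:

```bash
gunicorn -c gunicorn.conf.py --bind 0.0.0.0:8000 app:app
```

One caveat: signal.alarm only works in the worker process's main thread, so this pattern suits gunicorn's default sync workers, not threaded or async worker classes.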
Problem: Without rate limiting, your API can be overwhelmed by a single user or suffer from abuse.
Symptoms:

- A single user or script consuming most of your capacity
- Upstream 429 errors from your LLM provider
- Sudden, unexplained cost spikes
Solution:
from functools import wraps
from flask import request, jsonify
import time
def rate_limit(requests_per_minute=10):
"""Simple rate limiting decorator"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
# Get client identifier
client_id = request.headers.get('Authorization', request.remote_addr)
# Check rate limit
current_time = time.time()
key = f"rate_limit:{client_id}"
# Get request timestamps from Redis
timestamps = redis_client.lrange(key, 0, -1)
timestamps = [float(ts) for ts in timestamps]
# Remove old timestamps (older than 1 minute)
cutoff_time = current_time - 60
recent_timestamps = [ts for ts in timestamps if ts > cutoff_time]
if len(recent_timestamps) >= requests_per_minute:
return jsonify({
'error': f'Rate limit exceeded. Max {requests_per_minute} requests per minute.',
'retry_after': 60
}), 429
            # Add the current timestamp, cap the list so it can't grow unbounded,
            # and expire the key after the window passes
            redis_client.lpush(key, current_time)
            redis_client.ltrim(key, 0, requests_per_minute * 2 - 1)
            redis_client.expire(key, 60)
return f(*args, **kwargs)
return decorated_function
return decorator
# Apply to your endpoints
@app.route('/api/v1/chat/completions', methods=['POST'])
@rate_limit(requests_per_minute=20)
def chat_completion():
# ... your code ...
Problem: Generic error messages don't help users understand what went wrong or how to fix it.
Symptoms:

- Users see raw stack traces or an unhelpful "Internal Server Error"
- Clients can't distinguish retryable failures from permanent ones
- Support tickets that your logs can't explain
Solution:
class LLMError(Exception):
"""Base class for LLM-related errors"""
def __init__(self, message, code, status_code=500):
self.message = message
self.code = code
self.status_code = status_code
super().__init__(message)
class RateLimitError(LLMError):
def __init__(self):
super().__init__(
"Too many requests. Please slow down.",
"RATE_LIMIT_EXCEEDED",
429
)
class ModelUnavailableError(LLMError):
def __init__(self, model_name):
super().__init__(
f"Model {model_name} is temporarily unavailable.",
"MODEL_UNAVAILABLE",
503
)
@app.errorhandler(LLMError)
def handle_llm_error(error):
"""Handle custom LLM errors"""
return jsonify({
'error': error.message,
'code': error.code,
'timestamp': datetime.utcnow().isoformat()
}), error.status_code
@app.errorhandler(500)
def handle_internal_error(error):
"""Handle unexpected errors gracefully"""
return jsonify({
'error': 'An unexpected error occurred. Please try again.',
'code': 'INTERNAL_ERROR',
'timestamp': datetime.utcnow().isoformat()
}), 500
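With the handlers registered, route code can simply raise these exceptions and let Flask render consistent JSON errors. A sketch, assuming the openai module is imported as in the earlier examples:

```python
def call_model(model, messages):
    """Translate provider failures into our typed errors;
    the @app.errorhandler above turns them into JSON responses."""
    try:
        return openai.ChatCompletion.create(model=model, messages=messages)
    except openai.error.RateLimitError:
        raise RateLimitError()  # our class, not the provider's
    except openai.error.APIConnectionError:
        raise ModelUnavailableError(model)
```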
Problem: LLM costs can spiral out of control without proper optimization.
Warning Signs:

- Daily spend growing faster than your user count
- Full conversation histories sent with every request
- Expensive models answering questions a cheaper model could handle
- High max_tokens settings on short, simple queries
Solution:
def optimize_request(messages, max_tokens):
"""Optimize request to reduce costs"""
# Truncate very long conversations
if len(messages) > 10:
# Keep system message + last 8 messages
system_msg = messages[0] if messages[0]['role'] == 'system' else None
recent_messages = messages[-8:]
if system_msg and recent_messages[0]['role'] != 'system':
messages = [system_msg] + recent_messages
else:
messages = recent_messages
    # Reduce max_tokens for simple queries; match whole words so 'no'
    # doesn't match inside 'know' or 'hi' inside 'this'
    last_words = set(messages[-1]['content'].lower().split())
    if last_words & {'yes', 'no', 'thanks', 'hi', 'hello'}:
        max_tokens = min(max_tokens, 50)  # short responses for simple queries
return messages, max_tokens
# Use in your handler
def handle_optimized_response(messages, model, max_tokens, temperature, request_id):
"""Handle response with cost optimization"""
# Optimize request
optimized_messages, optimized_max_tokens = optimize_request(messages, max_tokens)
# Log optimization
if len(optimized_messages) < len(messages):
print(f"[OPTIMIZATION] Reduced messages from {len(messages)} to {len(optimized_messages)}")
if optimized_max_tokens < max_tokens:
print(f"[OPTIMIZATION] Reduced max_tokens from {max_tokens} to {optimized_max_tokens}")
return handle_complete_response(
optimized_messages, model, optimized_max_tokens,
temperature, request_id
)
When your LLM application isn't working properly, check these common issues:
API Key Issues:

- Confirm OPENAI_API_KEY is set in the environment the server actually runs in, not just your shell
- Check for expired or revoked keys and provider-side quota or billing limits

Network Connectivity:

- Verify the server can reach your LLM provider (try a curl from inside the container)
- Check DNS resolution, proxies, and firewall rules between services

Resource Limits:

- Watch memory and CPU usage; exhausted workers drop requests silently
- Check whether the request queue is full or Redis has hit its memory cap

Configuration Issues:

- Confirm environment variables are passed into containers (docker-compose config shows the resolved values)
- Verify timeouts are consistent across nginx, gunicorn, and application code

Performance Problems:

- Look for long conversations inflating token counts and latency
- Check whether a few slow model calls are blocking all available workers
Use this debugging endpoint to help diagnose issues (protect or disable it in production, since it exposes internal details):
@app.route('/api/v1/debug', methods=['GET'])
def debug_info():
"""Debug information endpoint"""
import psutil
import sys
return jsonify({
'environment': {
'python_version': sys.version,
'openai_key_set': bool(os.getenv('OPENAI_API_KEY')),
'redis_url': os.getenv('REDIS_URL', 'not_set')
},
'system': {
'memory_usage_mb': psutil.virtual_memory().used // 1024 // 1024,
'memory_percent': psutil.virtual_memory().percent,
'cpu_percent': psutil.cpu_percent(),
'disk_usage_percent': psutil.disk_usage('/').percent
},
'application': {
'active_conversations': len(support_bot.conversation_history),
            'queue_size': request_queue.worker_queue.qsize() if 'request_queue' in globals() else 'N/A'
}
})
Congratulations! You've learned how to deploy LLM applications from development to production. Let's recap the key concepts:
API Design: You learned to create clean, RESTful APIs that handle LLM-specific challenges like long response times, streaming, and proper error handling. Good API design is the foundation of any successful LLM application.
Infrastructure Patterns: We covered three deployment patterns—single server for simple applications, load-balanced architecture for scaling, and microservices for complex systems. Each pattern has its place depending on your scale and requirements.
Concurrency Management: LLMs require special handling for concurrent requests. You implemented queuing systems that prevent overload while maintaining good user experience.
Cost Control: LLM inference is expensive, but proper tracking, caching, and optimization can keep costs manageable while maintaining functionality.
Monitoring and Reliability: Production LLM applications need comprehensive monitoring, structured logging, and health checks to maintain reliability at scale.
Practice with Different Models: Try deploying the same architecture with different LLM providers (Anthropic Claude, local models with Hugging Face, etc.) to understand the differences in deployment requirements.
Add Authentication and Security: Implement proper API authentication, input validation, and security headers. Consider using JWT tokens or API keys with proper scoping.
Scale Testing: Use tools like Apache Bench, wrk, or Artillery to test how your deployment handles increasing load. Identify bottlenecks and optimize accordingly.
Advanced Features: Add features like conversation persistence, user management, custom model fine-tuning, and integration with customer support systems.
Production Deployment: Deploy your application to a cloud provider (AWS, GCP, Azure) using their managed services for databases, caching, and container orchestration.
Learn Container Orchestration: Explore Kubernetes or Docker Swarm for managing LLM applications at scale across multiple servers.
The fundamentals you've learned here apply to any LLM deployment scenario. Whether you're building customer support bots, content generation systems, or AI assistants, these patterns will serve as your foundation for reliable, scalable LLM applications.
Remember: start simple, monitor everything, and scale based on real user needs rather than theoretical requirements. LLM applications are powerful, but they require careful architectural thinking to deploy successfully.