
You've seen the ChatGPT interface, played with GPT models, and maybe even tried some prompt engineering. But when your manager asks you to integrate AI capabilities into your company's customer support system, or when you need to process 10,000 product descriptions for sentiment analysis, the web interface won't cut it. You need programmatic access through the OpenAI API.
The difference between experimenting with AI and building production systems lies in understanding how to architect reliable, scalable integrations. This means handling rate limits gracefully, managing costs effectively, implementing proper error handling, and structuring your code for maintainability. Whether you're automating content generation for marketing campaigns, building intelligent data analysis pipelines, or creating custom AI-powered features for your applications, mastering the OpenAI API with Python is essential for any data professional working with AI.
Prerequisites: You should have intermediate Python experience, including working with classes, exception handling, and external libraries. Basic familiarity with REST APIs and JSON is helpful. You'll need Python 3.8+ installed with the ability to install packages via pip.
If you're new to language models or prompt engineering concepts, quickly review OpenAI's documentation on model capabilities and basic prompting techniques before diving into the API implementation.
Before writing your first line of code, you need to establish a secure, organized foundation for your API integration. This isn't just about getting your first API call to work—it's about building a setup that will scale with your projects and keep your credentials secure.
Start by obtaining your API key from the OpenAI platform. But here's what the documentation doesn't emphasize enough: never hardcode your API key directly in your scripts. Even in development environments, this creates security risks and makes collaboration difficult.
Create a dedicated project directory and set up environment variable management:
# requirements.txt
openai>=1.3.0
python-dotenv>=0.19.0
requests>=2.25.0

# .env (never commit this file to version control)
OPENAI_API_KEY=sk-your-actual-key-here
OPENAI_ORG_ID=org-your-org-id-if-applicable

# config.py
import os
from dotenv import load_dotenv

load_dotenv()

class Config:
    OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
    OPENAI_ORG_ID = os.getenv('OPENAI_ORG_ID')

    # Model configurations
    DEFAULT_MODEL = 'gpt-3.5-turbo'
    FALLBACK_MODEL = 'gpt-3.5-turbo'
    MAX_TOKENS_DEFAULT = 150
    TEMPERATURE_DEFAULT = 0.7

    # Rate limiting
    MAX_REQUESTS_PER_MINUTE = 3000
    MAX_TOKENS_PER_MINUTE = 90000

if not Config.OPENAI_API_KEY:
    raise ValueError("OPENAI_API_KEY environment variable is required")
This configuration approach separates concerns and makes your code portable across different environments (development, staging, production).
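One way to make that portability concrete is to keep per-environment overrides alongside a shared base. The sketch below is illustrative, not part of the article's Config class: `APP_ENV`, `ENV_OVERRIDES`, and `resolve_config` are hypothetical names, and the override values are examples only.

```python
import os

# Hypothetical per-environment overrides; names and values are illustrative
ENV_OVERRIDES = {
    "development": {"DEFAULT_MODEL": "gpt-3.5-turbo", "MAX_TOKENS_DEFAULT": 150},
    "production":  {"DEFAULT_MODEL": "gpt-4",         "MAX_TOKENS_DEFAULT": 300},
}

def resolve_config(env_name=None):
    """Merge base settings with environment-specific overrides."""
    base = {
        "DEFAULT_MODEL": "gpt-3.5-turbo",
        "MAX_TOKENS_DEFAULT": 150,
        "TEMPERATURE_DEFAULT": 0.7,
    }
    # Fall back to an APP_ENV environment variable, then to development
    env_name = env_name or os.getenv("APP_ENV", "development")
    base.update(ENV_OVERRIDES.get(env_name, {}))
    return base

print(resolve_config("production")["DEFAULT_MODEL"])  # gpt-4
```

The same idea works with multiple dotenv files (e.g. `.env.staging`), selected by the same environment variable before calling load_dotenv.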
The OpenAI Python library has evolved significantly. The current version (v1.x) uses a different initialization pattern than earlier versions you might see in older tutorials:
# openai_client.py
from openai import OpenAI
from config import Config
import logging

# Set up logging for debugging API interactions
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class OpenAIClient:
    def __init__(self):
        self.client = OpenAI(
            api_key=Config.OPENAI_API_KEY,
            organization=Config.OPENAI_ORG_ID
        )

    def test_connection(self):
        """Test API connectivity and authentication"""
        try:
            models = self.client.models.list()
            logger.info(f"Successfully connected. Available models: {len(models.data)}")
            return True
        except Exception as e:
            logger.error(f"Connection failed: {e}")
            return False

# Initialize and test
if __name__ == "__main__":
    client = OpenAIClient()
    client.test_connection()
When you run this script, you should see output like:
INFO:__main__:Successfully connected. Available models: 45
Pro Tip: Always test your connection before building complex workflows. The test_connection() method will catch authentication issues early and help you verify which models are available to your account.
One of the most critical decisions you'll make is selecting the appropriate model for your use case. This isn't just about capabilities—it's about balancing performance, cost, and latency for production applications.
Here's a realistic breakdown of when to use each major model family:
# model_selector.py
class ModelSelector:
    MODEL_CONFIGS = {
        'gpt-4-turbo': {
            'cost_per_1k_tokens': {'input': 0.01, 'output': 0.03},
            'context_window': 128000,
            'best_for': ['complex reasoning', 'code generation', 'detailed analysis'],
            'latency': 'high'
        },
        'gpt-4': {
            'cost_per_1k_tokens': {'input': 0.03, 'output': 0.06},
            'context_window': 8192,
            'best_for': ['high-quality content', 'complex tasks', 'accuracy-critical work'],
            'latency': 'high'
        },
        'gpt-3.5-turbo': {
            'cost_per_1k_tokens': {'input': 0.0015, 'output': 0.002},
            'context_window': 16385,
            'best_for': ['general tasks', 'high-volume processing', 'quick responses'],
            'latency': 'low'
        }
    }

    @classmethod
    def recommend_model(cls, task_type, budget_priority=False, volume='medium'):
        """Recommend optimal model based on task requirements"""
        recommendations = {
            'content_generation': 'gpt-3.5-turbo' if budget_priority else 'gpt-4',
            'code_review': 'gpt-4',
            'data_analysis': 'gpt-4-turbo',
            'customer_support': 'gpt-3.5-turbo',
            'translation': 'gpt-3.5-turbo',
            'summarization': 'gpt-3.5-turbo' if volume == 'high' else 'gpt-4'
        }
        return recommendations.get(task_type, 'gpt-3.5-turbo')

    @classmethod
    def estimate_cost(cls, model, input_tokens, output_tokens):
        """Calculate estimated cost for a request"""
        config = cls.MODEL_CONFIGS.get(model, cls.MODEL_CONFIGS['gpt-3.5-turbo'])
        input_cost = (input_tokens / 1000) * config['cost_per_1k_tokens']['input']
        output_cost = (output_tokens / 1000) * config['cost_per_1k_tokens']['output']
        return round(input_cost + output_cost, 6)

# Example usage
selector = ModelSelector()
recommended = selector.recommend_model('data_analysis', budget_priority=False)
cost = selector.estimate_cost('gpt-4', 1000, 500)
print(f"Recommended model: {recommended}")
print(f"Estimated cost for 1000 input + 500 output tokens: ${cost}")
Output:
Recommended model: gpt-4-turbo
Estimated cost for 1000 input + 500 output tokens: $0.06
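Per-request costs look negligible until you multiply by volume. As a quick sketch using the same per-1K-token rates as MODEL_CONFIGS above, here is what a 10,000-document batch costs at typical token counts (the helper name `batch_cost` is ours, not from the article):

```python
# Per-1K-token prices mirroring the MODEL_CONFIGS table above
PRICES = {
    'gpt-4':         {'input': 0.03,   'output': 0.06},
    'gpt-3.5-turbo': {'input': 0.0015, 'output': 0.002},
}

def batch_cost(model, docs, input_tokens_each, output_tokens_each):
    """Estimated dollar cost of processing `docs` documents."""
    p = PRICES[model]
    per_doc = (input_tokens_each / 1000) * p['input'] \
            + (output_tokens_each / 1000) * p['output']
    return round(per_doc * docs, 2)

# 10,000 documents at ~500 input / 150 output tokens each
print(batch_cost('gpt-3.5-turbo', 10_000, 500, 150))  # 10.5
print(batch_cost('gpt-4', 10_000, 500, 150))          # 240.0
```

A ~23x gap per batch is why model selection deserves a deliberate decision rather than a default.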
Understanding tokens is crucial for cost control and avoiding errors. Tokens aren't just words—they include punctuation, spaces, and special characters. Here's how to work with them effectively:
# token_manager.py
import tiktoken

class TokenManager:
    def __init__(self, model_name='gpt-3.5-turbo'):
        self.model_name = model_name
        self.encoding = tiktoken.encoding_for_model(model_name)

    def count_tokens(self, text):
        """Count tokens in text string"""
        return len(self.encoding.encode(text))

    def estimate_tokens_for_messages(self, messages):
        """Estimate tokens for a list of chat messages"""
        tokens_per_message = 3  # Message overhead
        tokens_per_name = 1     # If name is present
        num_tokens = 0
        for message in messages:
            num_tokens += tokens_per_message
            for key, value in message.items():
                num_tokens += len(self.encoding.encode(value))
                if key == "name":
                    num_tokens += tokens_per_name
        num_tokens += 3  # Reply primer
        return num_tokens

    def truncate_text(self, text, max_tokens):
        """Truncate text to fit within token limit"""
        tokens = self.encoding.encode(text)
        if len(tokens) <= max_tokens:
            return text
        truncated_tokens = tokens[:max_tokens]
        return self.encoding.decode(truncated_tokens)

# Example with realistic business content
token_manager = TokenManager()

customer_email = """
Dear Support Team,

I've been experiencing significant issues with your platform over the past week.
The dashboard keeps crashing when I try to generate monthly reports, and the
data export feature isn't working properly. This is affecting our entire
quarterly review process.

I've tried clearing my browser cache and using different browsers, but the
problems persist. Can someone please help resolve this urgently?

Best regards,
Sarah Chen
Operations Manager
TechCorp Solutions
"""

token_count = token_manager.count_tokens(customer_email)
print(f"Customer email tokens: {token_count}")

# If we need to fit this into a smaller context
truncated = token_manager.truncate_text(customer_email, 50)
print(f"Truncated version: {truncated}")
Output:
Customer email tokens: 89
Truncated version: Dear Support Team,
I've been experiencing significant issues with your platform over the past week.
The dashboard keeps crashing when I try to generate
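Truncation discards information; for inputs longer than the context window you usually want to split into token-sized chunks and process each one. A minimal sketch of that idea (this `chunk_by_tokens` helper is an addition, not from the article; in practice `encode`/`decode` would be tiktoken's `encoding.encode`/`encoding.decode`, but a whitespace tokenizer stands in here so the logic is easy to follow):

```python
def chunk_by_tokens(text, max_tokens, encode, decode):
    """Split text into pieces of at most max_tokens tokens each."""
    tokens = encode(text)
    # Slice the token list into fixed-size windows and decode each back to text
    return [decode(tokens[i:i + max_tokens])
            for i in range(0, len(tokens), max_tokens)]

# With tiktoken you would pass enc.encode / enc.decode; a toy whitespace
# "tokenizer" keeps the example self-contained:
encode = lambda s: s.split()
decode = lambda toks: " ".join(toks)
print(chunk_by_tokens("one two three four five", 2, encode, decode))
# ['one two', 'three four', 'five']
```

Each chunk can then be summarized or analyzed independently, with the per-chunk results merged in a final pass.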
Now that you have proper authentication and understand model selection, let's build your first production-ready API integration. We'll start with the chat completions endpoint, which handles most modern use cases.
The chat completions API uses a conversation format with roles: system, user, and assistant. This structure is crucial for creating reliable AI interactions:
# basic_chat.py
from openai_client import OpenAIClient
from model_selector import ModelSelector
from config import Config
import json

class ChatCompletionHandler:
    def __init__(self):
        self.client = OpenAIClient().client

    def create_completion(self, messages, model=None, **kwargs):
        """Create a chat completion with sensible defaults"""
        completion_params = {
            'model': model or Config.DEFAULT_MODEL,
            'messages': messages,
            'max_tokens': kwargs.get('max_tokens', Config.MAX_TOKENS_DEFAULT),
            'temperature': kwargs.get('temperature', Config.TEMPERATURE_DEFAULT),
        }
        try:
            response = self.client.chat.completions.create(**completion_params)
            return response
        except Exception as e:
            print(f"API call failed: {e}")
            raise

# Realistic business use case: customer support ticket analysis
chat_handler = ChatCompletionHandler()

support_ticket = """
Customer: John Martinez
Issue: Cannot access premium features after upgrading account
Account Type: Business Pro
Date: 2024-01-15
Description: Upgraded to Business Pro yesterday but still seeing "upgrade required"
messages when trying to use advanced analytics. Payment went through successfully.
"""

messages = [
    {
        "role": "system",
        "content": "You are a customer support analyst. Analyze support tickets and provide: 1) Issue category, 2) Priority level (Low/Medium/High/Critical), 3) Suggested resolution steps, 4) Estimated resolution time. Be specific and actionable."
    },
    {
        "role": "user",
        "content": f"Analyze this support ticket:\n\n{support_ticket}"
    }
]

response = chat_handler.create_completion(
    messages=messages,
    temperature=0.3,  # Lower temperature for more consistent analysis
    max_tokens=300
)

print("Support Ticket Analysis:")
print(response.choices[0].message.content)
print(f"\nTokens used: {response.usage.total_tokens}")
print(f"Cost estimate: ${ModelSelector.estimate_cost('gpt-3.5-turbo', response.usage.prompt_tokens, response.usage.completion_tokens)}")
Output:
Support Ticket Analysis:
**Issue Category:** Account Management - Feature Access
**Priority Level:** High
- Customer has paid for upgrade but cannot access purchased features
- Business account affected, likely impacting operations
**Suggested Resolution Steps:**
1. Verify payment processing and account upgrade status in admin panel
2. Check for system synchronization delays between billing and feature access
3. Manually refresh account permissions if needed
4. Provide temporary workaround if available
**Estimated Resolution Time:** 2-4 hours
- Immediate: Account verification (15 minutes)
- Follow-up: System sync or manual intervention (1-3 hours)
Tokens used: 187
Cost estimate: $0.000467
Different business scenarios require different response structures. Here's how to design for various output formats:
class FormattedResponseHandler(ChatCompletionHandler):
    def analyze_sales_data(self, sales_data, format_type='structured'):
        """Analyze sales data with different output formats"""
        base_prompt = f"Analyze this sales data: {sales_data}"
        format_instructions = {
            'json': "Respond only with valid JSON containing: total_revenue, top_product, growth_rate, key_insights",
            'executive_summary': "Provide a 2-paragraph executive summary suitable for C-level presentation",
            'structured': "Use clear headers and bullet points for detailed analysis",
            'action_items': "Focus on specific, actionable recommendations with priorities"
        }
        messages = [
            {"role": "system", "content": f"You are a sales analyst. {format_instructions[format_type]}"},
            {"role": "user", "content": base_prompt}
        ]
        response = self.create_completion(messages, temperature=0.2)
        return response.choices[0].message.content

# Test with realistic sales data
sales_data = {
    "q4_2023": {
        "total_revenue": 2400000,
        "units_sold": 15680,
        "top_products": ["CRM Pro", "Analytics Suite", "Mobile App"],
        "regions": {"North": 45, "South": 30, "East": 15, "West": 10}
    },
    "q3_2023": {
        "total_revenue": 2100000,
        "units_sold": 14200
    }
}

formatter = FormattedResponseHandler()

# Get JSON response for dashboard integration
json_analysis = formatter.analyze_sales_data(sales_data, 'json')
print("JSON Format:")
print(json_analysis)
print("\n" + "="*50 + "\n")

# Get executive summary for presentation
exec_summary = formatter.analyze_sales_data(sales_data, 'executive_summary')
print("Executive Summary:")
print(exec_summary)
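Whenever you ask for "JSON only" (as the 'json' format instruction above does), validate the response before feeding it to a dashboard; models occasionally wrap the JSON in prose or a markdown fence. A defensive parse might look like this sketch (the `parse_model_json` helper is ours; the required keys follow the 'json' instruction above):

```python
import json

def parse_model_json(raw, required_keys=("total_revenue", "top_product")):
    """Extract and validate a JSON object from model output that may
    include surrounding prose or markdown fences."""
    # Take the outermost {...} span so leading/trailing text is ignored
    start, end = raw.find("{"), raw.rfind("}")
    if start == -1 or end <= start:
        return None
    try:
        data = json.loads(raw[start:end + 1])
    except json.JSONDecodeError:
        return None
    # Reject responses missing the keys the prompt asked for
    return data if all(k in data for k in required_keys) else None

sample = 'Here you go:\n{"total_revenue": 2400000, "top_product": "CRM Pro"}'
print(parse_model_json(sample))
# {'total_revenue': 2400000, 'top_product': 'CRM Pro'}
```

Returning None on malformed output lets the caller decide whether to retry the request or fall back to a default.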
Real-world API integrations fail in predictable ways: rate limits, network timeouts, service outages, and malformed requests. Building robust error handling isn't optional—it's what separates experimental code from production systems.
# error_handling.py
import time
import random
from typing import Optional, Dict, Any
from openai import OpenAI, APIStatusError, RateLimitError, APITimeoutError
from config import Config

class ProductionChatHandler:
    def __init__(self, max_retries=3, base_delay=1.0):
        self.client = OpenAI(api_key=Config.OPENAI_API_KEY)
        self.max_retries = max_retries
        self.base_delay = base_delay

    def exponential_backoff(self, attempt: int) -> float:
        """Calculate exponential backoff with jitter"""
        delay = self.base_delay * (2 ** attempt)
        jitter = random.uniform(0, delay * 0.1)
        return delay + jitter

    def create_completion_with_retry(self, messages, **kwargs) -> Optional[Dict[Any, Any]]:
        """Create completion with comprehensive error handling"""
        last_exception = None
        for attempt in range(self.max_retries + 1):
            try:
                response = self.client.chat.completions.create(
                    messages=messages,
                    **kwargs
                )
                # Validate response structure
                if not response.choices or not response.choices[0].message.content:
                    raise ValueError("Empty response received from API")
                return {
                    'success': True,
                    'content': response.choices[0].message.content,
                    'usage': response.usage,
                    'model': response.model,
                    'attempts': attempt + 1
                }
            except RateLimitError as e:
                last_exception = e
                if attempt < self.max_retries:
                    delay = self.exponential_backoff(attempt)
                    print(f"Rate limit hit. Waiting {delay:.2f}s before retry {attempt + 1}")
                    time.sleep(delay)
                    continue
            except APITimeoutError as e:
                last_exception = e
                if attempt < self.max_retries:
                    delay = self.exponential_backoff(attempt)
                    print(f"Request timeout. Retrying in {delay:.2f}s")
                    time.sleep(delay)
                    continue
            except APIStatusError as e:
                # APIStatusError carries the HTTP status code; client errors
                # (e.g., invalid request, bad credentials) shouldn't be retried
                last_exception = e
                if e.status_code in [400, 401, 403, 404]:
                    break
                if attempt < self.max_retries:
                    delay = self.exponential_backoff(attempt)
                    print(f"API error {e.status_code}. Retrying in {delay:.2f}s")
                    time.sleep(delay)
                    continue
            except Exception as e:
                last_exception = e
                print(f"Unexpected error on attempt {attempt + 1}: {e}")
                break
        return {
            'success': False,
            'error': str(last_exception),
            'error_type': type(last_exception).__name__,
            'attempts': self.max_retries + 1
        }
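The retry schedule is worth sanity-checking in isolation, without touching the API. This sketch reproduces the same formula as exponential_backoff (base delay doubling per attempt, plus up to 10% jitter) as a standalone function and verifies its bounds:

```python
import random

def backoff_delay(attempt, base_delay=1.0):
    """Same schedule as ProductionChatHandler.exponential_backoff:
    base * 2**attempt, plus uniform jitter up to 10% of the delay."""
    delay = base_delay * (2 ** attempt)
    jitter = random.uniform(0, delay * 0.1)
    return delay + jitter

for attempt in range(4):
    d = backoff_delay(attempt)
    # Attempt n waits at least base * 2**n and at most 10% more
    assert 2 ** attempt <= d <= (2 ** attempt) * 1.1
    print(f"attempt {attempt}: {d:.2f}s")
```

Jitter matters in practice: without it, many clients that hit a rate limit at the same moment would all retry at the same moment, too.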
Rate limiting is one of the most common production issues. Here's how to handle it proactively:
# rate_limiter.py
import time
from collections import deque
from threading import Lock

from config import Config
from error_handling import ProductionChatHandler
from token_manager import TokenManager

class RateLimiter:
    def __init__(self, max_requests_per_minute=3000, max_tokens_per_minute=90000):
        self.max_requests_per_minute = max_requests_per_minute
        self.max_tokens_per_minute = max_tokens_per_minute
        self.request_times = deque()
        self.token_usage = deque()
        self.lock = Lock()

    def wait_if_needed(self, estimated_tokens=0):
        """Wait if necessary to avoid rate limits"""
        with self.lock:
            current_time = time.time()
            minute_ago = current_time - 60

            # Clean old entries
            while self.request_times and self.request_times[0] < minute_ago:
                self.request_times.popleft()
            while self.token_usage and self.token_usage[0][0] < minute_ago:
                self.token_usage.popleft()

            # Check request rate limit
            if len(self.request_times) >= self.max_requests_per_minute:
                sleep_time = 60 - (current_time - self.request_times[0])
                if sleep_time > 0:
                    print(f"Request rate limit reached. Waiting {sleep_time:.1f}s")
                    time.sleep(sleep_time)

            # Check token rate limit
            current_token_usage = sum(tokens for _, tokens in self.token_usage)
            if current_token_usage + estimated_tokens > self.max_tokens_per_minute:
                sleep_time = 60 - (current_time - self.token_usage[0][0])
                if sleep_time > 0:
                    print(f"Token rate limit would be exceeded. Waiting {sleep_time:.1f}s")
                    time.sleep(sleep_time)

            # Record this request
            self.request_times.append(current_time)
            if estimated_tokens > 0:
                self.token_usage.append((current_time, estimated_tokens))

# Enhanced production handler with rate limiting
class EnhancedProductionHandler(ProductionChatHandler):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.rate_limiter = RateLimiter()
        self.token_manager = TokenManager()

    def safe_completion(self, messages, **kwargs):
        """Create completion with rate limiting and error handling"""
        # Estimate tokens for rate limiting
        estimated_tokens = self.token_manager.estimate_tokens_for_messages(messages)
        estimated_tokens += kwargs.get('max_tokens', Config.MAX_TOKENS_DEFAULT)

        # Wait if needed to avoid rate limits
        self.rate_limiter.wait_if_needed(estimated_tokens)

        # Use the requested model, with a fallback in case it fails
        primary_model = kwargs.get('model', Config.DEFAULT_MODEL)
        kwargs['model'] = primary_model
        result = self.create_completion_with_retry(messages, **kwargs)

        # If the primary model fails, try the fallback
        if not result['success'] and 'model' in str(result['error']):
            print(f"Primary model {primary_model} failed. Trying fallback.")
            kwargs['model'] = Config.FALLBACK_MODEL
            result = self.create_completion_with_retry(messages, **kwargs)

        return result

# Example usage with a realistic high-volume scenario
handler = EnhancedProductionHandler()

# Process multiple customer inquiries
customer_inquiries = [
    "How do I reset my password?",
    "What's included in the premium plan?",
    "I'm having trouble with the mobile app",
    "Can I export my data to Excel?",
    "When is my next billing date?"
]

results = []
for inquiry in customer_inquiries:
    messages = [
        {"role": "system", "content": "You are a helpful customer support assistant. Provide clear, concise answers."},
        {"role": "user", "content": inquiry}
    ]
    result = handler.safe_completion(messages, max_tokens=100, temperature=0.3)
    results.append(result)
    if result['success']:
        print(f"Q: {inquiry}")
        print(f"A: {result['content'][:100]}...")
        print(f"Attempts: {result['attempts']}, Tokens: {result['usage'].total_tokens}\n")
    else:
        print(f"Failed to process: {inquiry}")
        print(f"Error: {result['error']}\n")
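Sliding-window bookkeeping is easy to get subtly wrong, so it pays to exercise the pruning logic in isolation. This sketch replays the same deque cleanup that wait_if_needed performs, with fake timestamps instead of time.time() so nothing actually sleeps (`prune_and_count` is a test helper of ours, not part of the RateLimiter API):

```python
from collections import deque

def prune_and_count(request_times, now, window=60.0):
    """Drop entries older than `window` seconds and return how many remain —
    the same cleanup RateLimiter.wait_if_needed performs on its deques."""
    while request_times and request_times[0] < now - window:
        request_times.popleft()
    return len(request_times)

times = deque([0.0, 10.0, 59.0, 61.0, 65.0])
# At t=70, only entries at t >= 10 are still inside the 60s window
print(prune_and_count(times, now=70.0))  # 4
```

Keeping timestamps in a deque makes this cleanup O(1) amortized per request, since only expired entries at the front are ever examined.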
For production applications, you'll often need responses to appear in real-time (streaming) or want the model to interact with your existing systems (function calling). These features transform static API calls into dynamic, interactive experiences.
Streaming is crucial for user-facing applications where you want to display responses as they're generated, rather than waiting for the complete response:
# streaming_handler.py
from openai import OpenAI
from config import Config

class StreamingChatHandler:
    def __init__(self):
        self.client = OpenAI(api_key=Config.OPENAI_API_KEY)

    def stream_completion(self, messages, **kwargs):
        """Stream response tokens as they're generated"""
        try:
            stream = self.client.chat.completions.create(
                messages=messages,
                stream=True,
                **kwargs
            )
            for chunk in stream:
                if chunk.choices[0].delta.content is not None:
                    yield chunk.choices[0].delta.content
        except Exception as e:
            yield f"Error: {str(e)}"

    def stream_with_status_updates(self, messages, **kwargs):
        """Stream with periodic status updates for long responses"""
        token_count = 0
        status_interval = 50  # Update every 50 tokens
        print("Starting response generation...")
        for token in self.stream_completion(messages, **kwargs):
            if token.startswith("Error:"):
                print(f"\n{token}")
                break
            print(token, end='', flush=True)
            token_count += 1
            if token_count % status_interval == 0:
                print(f"\n[Generated {token_count} tokens so far...]")

# Real-world example: generate a detailed product analysis
streaming_handler = StreamingChatHandler()

product_data = """
Product: CloudSync Pro Business Suite
Q4 Sales: $2.4M revenue, 1,247 units sold
Customer Feedback: 4.2/5 stars (1,890 reviews)
Top Complaints: Integration complexity (23%), Mobile app bugs (18%), Pricing concerns (15%)
Competitor Analysis: 15% market share, trailing behind SyncMaster (28%) and DataFlow Pro (22%)
"""

analysis_messages = [
    {
        "role": "system",
        "content": "You are a senior product analyst. Provide comprehensive analysis including market position, customer satisfaction insights, competitive advantages, areas for improvement, and strategic recommendations."
    },
    {
        "role": "user",
        "content": f"Analyze this product performance data and provide detailed strategic recommendations:\n\n{product_data}"
    }
]

print("Generating comprehensive product analysis...\n")
streaming_handler.stream_with_status_updates(
    messages=analysis_messages,
    model='gpt-4',
    max_tokens=800,
    temperature=0.4
)
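Because stream_completion is a plain generator, the assembly logic around it can be exercised without calling the API at all by faking the chunk stream. A sketch (the `fake_chunks`/`assemble` names are ours; SimpleNamespace objects stand in for the SDK's chunk objects, mirroring their `choices[0].delta.content` shape):

```python
from types import SimpleNamespace

def fake_chunks(tokens):
    """Yield objects shaped like the SDK's streaming chunks."""
    for t in tokens:
        delta = SimpleNamespace(content=t)
        yield SimpleNamespace(choices=[SimpleNamespace(delta=delta)])

def assemble(stream):
    """Collect streamed deltas into the full response text —
    the same accumulation a UI layer performs while rendering."""
    full = ""
    for chunk in stream:
        token = chunk.choices[0].delta.content
        if token is not None:
            full += token
    return full

print(assemble(fake_chunks(["Cloud", "Sync", " Pro"])))  # CloudSync Pro
```

This kind of fake stream is also handy for testing UI rendering and the periodic status updates shown above without incurring API costs.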
Function calling allows the model to interact with your existing systems—databases, APIs, calculation engines. This is where AI becomes truly powerful for business applications:
# function_calling.py
import json
import sqlite3
from openai import OpenAI
from config import Config

class BusinessFunctionHandler:
    def __init__(self, db_path="business_data.db"):
        self.client = OpenAI(api_key=Config.OPENAI_API_KEY)
        self.db_path = db_path
        self.setup_database()

    def setup_database(self):
        """Set up sample business database"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute('''CREATE TABLE IF NOT EXISTS sales_data
                          (date TEXT, product TEXT, revenue REAL, units INTEGER, region TEXT)''')
        cursor.execute('''CREATE TABLE IF NOT EXISTS customer_data
                          (customer_id TEXT, name TEXT, plan TEXT, mrr REAL, signup_date TEXT)''')

        # Insert sample data
        sample_sales = [
            ('2024-01-15', 'CRM Pro', 45000, 150, 'North'),
            ('2024-01-16', 'Analytics Suite', 38000, 95, 'South'),
            ('2024-01-17', 'Mobile App', 22000, 440, 'East'),
        ]
        sample_customers = [
            ('CUST001', 'TechCorp Solutions', 'Enterprise', 2500, '2023-06-15'),
            ('CUST002', 'StartupXYZ', 'Pro', 299, '2024-01-10'),
            ('CUST003', 'Global Industries', 'Enterprise', 5000, '2023-03-22'),
        ]
        cursor.executemany('INSERT OR REPLACE INTO sales_data VALUES (?, ?, ?, ?, ?)', sample_sales)
        cursor.executemany('INSERT OR REPLACE INTO customer_data VALUES (?, ?, ?, ?, ?)', sample_customers)
        conn.commit()
        conn.close()

    def get_sales_data(self, start_date=None, end_date=None, product=None):
        """Retrieve sales data from database"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        query = "SELECT * FROM sales_data WHERE 1=1"
        params = []
        if start_date:
            query += " AND date >= ?"
            params.append(start_date)
        if end_date:
            query += " AND date <= ?"
            params.append(end_date)
        if product:
            query += " AND product LIKE ?"
            params.append(f"%{product}%")
        cursor.execute(query, params)
        results = cursor.fetchall()
        conn.close()
        return [{"date": r[0], "product": r[1], "revenue": r[2],
                 "units": r[3], "region": r[4]} for r in results]

    def get_customer_analytics(self, plan_type=None):
        """Get customer analytics data"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        if plan_type:
            cursor.execute("SELECT * FROM customer_data WHERE plan = ?", (plan_type,))
        else:
            cursor.execute("SELECT * FROM customer_data")
        results = cursor.fetchall()
        conn.close()
        return [{"customer_id": r[0], "name": r[1], "plan": r[2],
                 "mrr": r[3], "signup_date": r[4]} for r in results]

    def calculate_metrics(self, data_type, period="7d"):
        """Calculate business metrics"""
        if data_type == "revenue":
            sales_data = self.get_sales_data()
            total_revenue = sum(item["revenue"] for item in sales_data)
            return {"total_revenue": total_revenue, "period": period}
        elif data_type == "customer_ltv":
            customers = self.get_customer_analytics()
            avg_mrr = sum(c["mrr"] for c in customers) / len(customers) if customers else 0
            estimated_ltv = avg_mrr * 24  # Simplified LTV calculation
            return {"avg_ltv": estimated_ltv, "customer_count": len(customers)}
        return {"error": "Unknown data type"}

    def chat_with_functions(self, user_query):
        """Chat with function calling capabilities"""
        # Define available functions (the newer "tools" parameter supersedes
        # "functions", but the legacy form still works in the v1 SDK)
        functions = [
            {
                "name": "get_sales_data",
                "description": "Retrieve sales data from the database",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "start_date": {"type": "string", "description": "Start date (YYYY-MM-DD)"},
                        "end_date": {"type": "string", "description": "End date (YYYY-MM-DD)"},
                        "product": {"type": "string", "description": "Product name to filter by"}
                    }
                }
            },
            {
                "name": "get_customer_analytics",
                "description": "Get customer analytics and subscription data",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "plan_type": {"type": "string", "description": "Filter by plan type (Pro, Enterprise, etc.)"}
                    }
                }
            },
            {
                "name": "calculate_metrics",
                "description": "Calculate business metrics like revenue, LTV, etc.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "data_type": {"type": "string", "description": "Type of metric: revenue, customer_ltv"},
                        "period": {"type": "string", "description": "Time period for calculation"}
                    },
                    "required": ["data_type"]
                }
            }
        ]

        messages = [
            {"role": "system", "content": "You are a business analyst assistant. Use the available functions to retrieve and analyze business data to answer user questions accurately."},
            {"role": "user", "content": user_query}
        ]

        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=messages,
            functions=functions,
            function_call="auto"
        )

        # Check if a function was called
        if response.choices[0].message.function_call:
            function_name = response.choices[0].message.function_call.name
            function_args = json.loads(response.choices[0].message.function_call.arguments)

            # Execute the function
            if function_name == "get_sales_data":
                function_result = self.get_sales_data(**function_args)
            elif function_name == "get_customer_analytics":
                function_result = self.get_customer_analytics(**function_args)
            elif function_name == "calculate_metrics":
                function_result = self.calculate_metrics(**function_args)
            else:
                function_result = {"error": "Unknown function"}

            # Send the assistant's function call and the result back to the model
            messages.append(response.choices[0].message)
            messages.append({
                "role": "function",
                "name": function_name,
                "content": json.dumps(function_result)
            })

            # Get the final response
            final_response = self.client.chat.completions.create(
                model="gpt-4",
                messages=messages
            )
            return final_response.choices[0].message.content

        return response.choices[0].message.content

# Example usage
business_handler = BusinessFunctionHandler()

queries = [
    "What's our total revenue from the sales data?",
    "How many Enterprise customers do we have and what's their average MRR?",
    "Calculate the estimated customer lifetime value based on our current data",
    "Show me all sales data for CRM Pro product"
]

for query in queries:
    print(f"Query: {query}")
    response = business_handler.chat_with_functions(query)
    print(f"Response: {response}\n")
    print("-" * 80)
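As the list of callable functions grows, the if/elif dispatch in chat_with_functions becomes error-prone. One common refactor is a registry dict mapping function names to callables. A sketch (the `dispatch` helper and the stub `get_sales_data` are illustrative; in BusinessFunctionHandler the registry values would be bound methods like self.get_sales_data):

```python
import json

# Stub standing in for BusinessFunctionHandler.get_sales_data
def get_sales_data(product=None, **kwargs):
    return {"rows": 3, "product": product}

# Registry mapping the names advertised to the model onto local callables
FUNCTION_REGISTRY = {
    "get_sales_data": get_sales_data,
}

def dispatch(function_name, arguments_json):
    """Look up and invoke a registered function with the model's arguments."""
    fn = FUNCTION_REGISTRY.get(function_name)
    if fn is None:
        return {"error": f"Unknown function: {function_name}"}
    return fn(**json.loads(arguments_json))

print(dispatch("get_sales_data", '{"product": "CRM Pro"}'))
# {'rows': 3, 'product': 'CRM Pro'}
```

Adding a new capability then means adding one registry entry plus one schema, with no changes to the dispatch logic itself.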
Now let's put together everything you've learned into a practical system that a real business could use. You'll build a customer insight generator that analyzes support tickets, generates responses, and provides business intelligence.
Build a system that:
# customer_insight_generator.py
import json
import csv
from dataclasses import dataclass
from typing import List, Dict, Any, Optional

from rate_limiter import EnhancedProductionHandler

@dataclass
class SupportTicket:
    ticket_id: str
    customer_name: str
    customer_tier: str
    subject: str
    message: str
    date_created: str
    priority: Optional[str] = None
    category: Optional[str] = None
    response_draft: Optional[str] = None
    resolution_estimate: Optional[str] = None

class CustomerInsightGenerator:
    def __init__(self):
        self.handler = EnhancedProductionHandler()
        self.processed_tickets = []
        self.insights = {}

    def load_tickets_from_csv(self, file_path: str) -> List[SupportTicket]:
        """Load support tickets from CSV file"""
        tickets = []
        try:
            with open(file_path, 'r', newline='', encoding='utf-8') as file:
                reader = csv.DictReader(file)
                for row in reader:
                    ticket = SupportTicket(**row)
                    tickets.append(ticket)
        except FileNotFoundError:
            print(f"File {file_path} not found. Creating sample data...")
            tickets = self.create_sample_tickets()
            self.save_sample_tickets_to_csv(tickets, file_path)
        return tickets

    def create_sample_tickets(self) -> List[SupportTicket]:
        """Create sample tickets for testing"""
        return [
            SupportTicket(
                ticket_id="TK001",
                customer_name="Sarah Chen",
                customer_tier="Enterprise",
                subject="Dashboard not loading",
                message="The main dashboard hasn't been loading for the past 2 hours. Getting timeout errors.",
                date_created="2024-01-15 14:30:00"
            ),
            SupportTicket(
                ticket_id="TK002",
                customer_name="Mike Rodriguez",
                customer_tier="Pro",
                subject="Data export issue",
                message="Cannot export monthly reports to Excel. The export button is grayed out.",
                date_created="2024-01-15 16:45:00"
            ),
            SupportTicket(
                ticket_id="TK003",
                customer_name="Global Corp Inc",
                customer_tier="Enterprise",
                subject="API integration failing",
                message="Our API integration stopped working yesterday. Getting 401 errors on all endpoints.",
                date_created="2024-01-16 09:15:00"
            )
        ]

    def save_sample_tickets_to_csv(self, tickets: List[SupportTicket], file_path: str):
        """Save sample tickets to CSV file"""
        with open(file_path, 'w', newline='', encoding='utf-8') as file:
            writer = csv.DictWriter(file, fieldnames=[
                'ticket_id', 'customer_name', 'customer_tier',
                'subject', 'message', 'date_created'
            ])
            writer.writeheader()
            for ticket in tickets:
                writer.writerow({
                    'ticket_id': ticket.ticket_id,
                    'customer_name': ticket.customer_name,
                    'customer_tier': ticket.customer_tier,
                    'subject': ticket.subject,
                    'message': ticket.message,
                    'date_created': ticket.date_created
                })

    def analyze_ticket(self, ticket: SupportTicket) -> Dict[str, Any]:
        """Analyze individual ticket and generate insights"""
        analysis_prompt = f"""
        Analyze this customer support ticket:

        Customer: {ticket.customer_name} ({ticket.customer_tier})
        Subject: {ticket.subject}
        Message: {ticket.message}

        Provide analysis in this exact JSON format:
        {{
            "priority": "Low|Medium|High|Critical",
            "category": "Technical|Billing|Account|Feature Request|Bug Report",
            "severity_reason": "Brief explanation of priority assignment",
            "estimated_resolution_time": "X hours/days",
            "requires_escalation": true/false
        }}
        """
        messages = [
            {"role": "system", "content": "You are an expert customer support analyst. Analyze tickets accurately and respond only with valid JSON."},
            {"role": "user", "content": analysis_prompt}
        ]
        result = self.handler.safe_completion(
            messages=messages,
            temperature=0.2,
            max_tokens=200
        )
        if result['success']:
            try:
                analysis = json.loads(result['content'])
                return analysis
            except json.JSONDecodeError:
                return {"error": "Invalid JSON response"}
        else:
            return {"error": result['error']}
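analyze_ticket trusts that the model picks one of the four priority values, but in practice models occasionally invent variants like "Urgent". A small validation pass keeps bad labels out of your data (the `validate_analysis` helper is an addition of ours; the allowed values come directly from the prompt above):

```python
# Allowed values, taken from the JSON format in the analysis prompt
VALID_PRIORITIES = {"Low", "Medium", "High", "Critical"}
VALID_CATEGORIES = {"Technical", "Billing", "Account", "Feature Request", "Bug Report"}

def validate_analysis(analysis, default_priority="Medium", default_category="Technical"):
    """Coerce model-produced labels into the prompt's allowed values."""
    out = dict(analysis)
    if out.get("priority") not in VALID_PRIORITIES:
        out["priority"] = default_priority
    if out.get("category") not in VALID_CATEGORIES:
        out["category"] = default_category
    return out

print(validate_analysis({"priority": "Urgent", "category": "Billing"}))
# {'priority': 'Medium', 'category': 'Billing'}
```

Whether an out-of-vocabulary label should fall back to a default or trigger a re-request is a policy decision; the point is to catch it before it reaches your ticket database.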
Your task: complete the CustomerInsightGenerator class by implementing these methods:

generate_response_draft(self, ticket: SupportTicket, analysis: Dict) -> str
process_tickets_batch(self, tickets: List[SupportTicket]) -> List[SupportTicket]
generate_executive_summary(self, processed_tickets: List[SupportTicket]) -> str
export_results(self, filename: str = "processed_tickets.json")

A reference implementation for the first two methods follows.
    def generate_response_draft(self, ticket: SupportTicket, analysis: Dict) -> str:
        """Generate professional response draft"""
        tier_greeting = {
            "Enterprise": f"Dear {ticket.customer_name},\n\nThank you for contacting us. As a valued Enterprise customer, this issue is our top priority.",
            "Pro": f"Hello {ticket.customer_name},\n\nThank you for reaching out. We appreciate your Pro subscription and will resolve this promptly.",
            "Basic": f"Hi {ticket.customer_name},\n\nThanks for contacting support. We're here to help!"
        }

        response_prompt = f"""
        Generate a professional customer support response draft:

        Customer: {ticket.customer_name} ({ticket.customer_tier})
        Issue: {ticket.subject} - {ticket.message}
        Priority: {analysis.get('priority', 'Medium')}
        Category: {analysis.get('category', 'Technical')}

        Requirements:
        - Start with this greeting: {tier_greeting.get(ticket.customer_tier, tier_greeting['Basic'])}
        - Acknowledge the specific issue
        - Provide clear next steps
        - Include timeline: {analysis.get('estimated_resolution_time', '24 hours')}
        - Professional but friendly tone
        - If escalation needed: {analysis.get('requires_escalation', False)}
        """

        messages = [
            {"role": "system", "content": "You are a senior customer support manager writing response drafts. Be specific, empathetic, and solution-focused."},
            {"role": "user", "content": response_prompt}
        ]

        result = self.handler.safe_completion(messages=messages, temperature=0.4, max_tokens=300)
        return result['content'] if result['success'] else "Error generating response"
    def process_tickets_batch(self, tickets: List[SupportTicket]) -> List[SupportTicket]:
        """Process multiple tickets with per-ticket error handling"""
        processed = []
        print(f"Processing {len(tickets)} tickets...")

        for i, ticket in enumerate(tickets, 1):
            print(f"Processing ticket {i}/{len(tickets)}: {ticket.ticket_id}")

            # Analyze ticket
            analysis = self.analyze_ticket(ticket)

            if 'error' not in analysis:
                # Update ticket with analysis
                ticket.priority = analysis.get('priority')
                ticket.category = analysis.get('category')
                ticket.resolution_estimate = analysis.get('estimated_resolution_time')

                # Generate response draft
                ticket.response_draft = self.generate_response_draft(ticket, analysis)
                processed.append(ticket)
                print(f"  ✓ Priority: {ticket.priority}, Category: {ticket.category}")
            else:
                print(f"  ✗ Error: {analysis['error']}")
                processed.append(ticket)  # Still add to list but with error info

        self.processed_tickets = processed
        return processed
    def generate_executive_summary(self, processed_tickets: List[SupportTicket]) -> str:
        """Generate executive summary of support trends"""
        # Calculate metrics
        total_tickets = len(processed_tickets)
        priority_counts = {}
        category_counts = {}
        tier_counts = {}

        for ticket in processed_tickets:
            if ticket.priority:
                priority_counts[ticket.priority] = priority_counts.get(ticket.priority, 0) + 1
            if ticket.category:
                category_counts[ticket.category] = category_counts.get(ticket.category, 0) + 1
            tier_counts[ticket.customer_tier] = tier_counts.get(ticket.customer_tier, 0) + 1

        summary_data = f"""
        Support Ticket Analysis Summary
        Period: {datetime.now().strftime('%Y-%m-%d')}
        Total Tickets Processed: {total_tickets}
        Priority Distribution: {priority_counts}
        Category Breakdown: {category_counts}
        Customer Tier Distribution: {tier_counts}

        Generate an executive summary highlighting:
        1. Key trends and patterns
        2. Areas of concern requiring attention
        3. Recommendations for support team optimization
        4. Customer satisfaction impact analysis
        """

        messages = [
            {"role": "system", "content": "You are a VP of Customer Success creating an executive summary. Focus on actionable insights and business impact."},
            {"role": "user", "content": summary_data}
        ]

        result = self.handler.safe_completion(messages=messages, temperature=0.3, max_tokens=500)
        return result['content'] if result['success'] else "Error generating summary"
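The exercise also calls for cost tracking in the batch loop. As a hedged sketch, you can estimate spend from token usage and a small price table; the per-1K-token prices below are illustrative placeholders, so verify them against OpenAI's current pricing page before relying on the numbers:

```python
# Illustrative per-1K-token prices in USD -- NOT authoritative, check OpenAI's pricing page
PRICES = {
    "gpt-3.5-turbo": {"input": 0.0005, "output": 0.0015},
}

def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Estimate the dollar cost of one completion call from its token usage."""
    p = PRICES[model]
    return (input_tokens / 1000) * p["input"] + (output_tokens / 1000) * p["output"]
```

Accumulating estimate_cost over each call in process_tickets_batch gives a running total you can print or log per run.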
# Test the complete system
if __name__ == "__main__":
    generator = CustomerInsightGenerator()

    # Load and process tickets
    tickets = generator.load_tickets_from_csv("support_tickets.csv")
    processed = generator.process_tickets_batch(tickets)

    # Generate executive summary
    summary = generator.generate_executive_summary(processed)

    # Display results
    print("\n" + "="*60)
    print("EXECUTIVE SUMMARY")
    print("="*60)
    print(summary)

    print("\n" + "="*60)
    print("PROCESSED TICKETS")
    print("="*60)

    for ticket in processed:
        print(f"\nTicket ID: {ticket.ticket_id}")
        print(f"Customer: {ticket.customer_name} ({ticket.customer_tier})")
        print(f"Priority: {ticket.priority} | Category: {ticket.category}")
        # Guard against tickets whose draft failed to generate
        draft = ticket.response_draft or "(no draft generated)"
        print(f"Response Draft: {draft[:200]}...")
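The fourth exercise method, export_results, never appears in this listing. Here is a minimal module-level sketch, assuming SupportTicket is a dataclass; the abbreviated class below is a stand-in for illustration, not the full definition from earlier:

```python
import json
from dataclasses import dataclass, asdict
from typing import List, Optional

@dataclass
class SupportTicket:
    # Abbreviated stand-in for the full class defined earlier
    ticket_id: str
    customer_name: str
    customer_tier: str
    subject: str
    message: str
    priority: Optional[str] = None
    category: Optional[str] = None
    response_draft: Optional[str] = None

def export_results(tickets: List[SupportTicket], filename: str = "processed_tickets.json") -> None:
    """Serialize processed tickets to JSON for downstream reporting."""
    with open(filename, "w", encoding="utf-8") as f:
        json.dump([asdict(t) for t in tickets], f, indent=2, ensure_ascii=False)
```

As a method, the same body would read self.processed_tickets instead of taking a tickets parameter.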
Even experienced developers make predictable mistakes when working with the OpenAI API. Here are the most common issues and how to fix them:
Mistake: Hardcoding API keys or mixing up environment variables
# DON'T do this
client = OpenAI(api_key="sk-your-key-here") # Security risk
# DON'T do this either
api_key = os.getenv('OPENAI_KEY') # Wrong variable name
Fix: Use proper environment variable management
# DO this
from dotenv import load_dotenv
import os
load_dotenv()
api_key = os.getenv('OPENAI_API_KEY')
if not api_key:
    raise ValueError("OPENAI_API_KEY environment variable is required")
client = OpenAI(api_key=api_key)
Mistake: Not validating input size before making API calls
# DON'T do this - will fail with large inputs
def analyze_document(document_text):
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": document_text}]
    )
    return response.choices[0].message.content
Fix: Implement proper token management
# DO this
def analyze_document(document_text, max_input_tokens=3000):
    token_manager = TokenManager()
    token_count = token_manager.count_tokens(document_text)

    if token_count > max_input_tokens:
        document_text = token_manager.truncate_text(document_text, max_input_tokens)
        token_count = token_manager.count_tokens(document_text)  # Recount after truncation
        print(f"Document truncated to fit {max_input_tokens} token limit")

    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": document_text}],
        max_tokens=min(4096 - token_count, 1000)  # Leave room for response
    )
    return response.choices[0].message.content
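If the TokenManager class isn't handy, a rough rule of thumb (about 4 characters per token for English text) gives a cheap pre-flight check. This is only an approximation; use a tokenizer library such as tiktoken when you need exact counts:

```python
def rough_token_count(text: str) -> int:
    """Approximate token count: ~4 characters per token for English text.
    Only a cheap pre-flight estimate -- use a real tokenizer for billing-grade counts."""
    return max(1, len(text) // 4)
```

This is good enough for deciding whether an input needs truncation, but not for precise cost accounting.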
Mistake: Making rapid-fire API calls without considering rate limits
# DON'T do this - will hit rate limits
results = []
for item in large_dataset:  # 1000+ items
    response = client.chat.completions.create(...)
    results.append(response)
Fix: Implement batching and rate limiting
# DO this
import time
from typing import List, Optional, Generator

def process_items_with_rate_limiting(items: List[str], batch_size=10, delay=1.0) -> Generator[List[Optional[str]], None, None]:
    """Process items in batches with rate limiting"""
    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        batch_results = []

        for item in batch:
            try:
                response = client.chat.completions.create(
                    model="gpt-3.5-turbo",
                    messages=[{"role": "user", "content": item}]
                )
                batch_results.append(response.choices[0].message.content)
            except Exception as e:
                print(f"Error processing item: {e}")
                batch_results.append(None)

        yield batch_results

        # Delay between batches
        if i + batch_size < len(items):
            time.sleep(delay)

# Usage
all_results = []
for batch_results in process_items_with_rate_limiting(large_dataset):
    all_results.extend(batch_results)
    print(f"Processed {len(all_results)} items so far...")
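To test this batching pattern without spending tokens, you can decouple the loop from the API by injecting the per-item call as a parameter. The sketch below mirrors the function above; process_in_batches and process_fn are hypothetical names introduced for illustration:

```python
import time
from typing import Callable, Generator, List, Optional

def process_in_batches(
    items: List[str],
    process_fn: Callable[[str], str],
    batch_size: int = 10,
    delay: float = 0.0,
) -> Generator[List[Optional[str]], None, None]:
    """Batch driver decoupled from the API: the per-item call is injected as process_fn."""
    for i in range(0, len(items), batch_size):
        batch_results: List[Optional[str]] = []
        for item in items[i:i + batch_size]:
            try:
                batch_results.append(process_fn(item))
            except Exception as e:
                print(f"Error processing item: {e}")
                batch_results.append(None)
        yield batch_results
        # Pause between batches only when a delay is configured
        if i + batch_size < len(items) and delay:
            time.sleep(delay)
```

In production, pass a lambda wrapping client.chat.completions.create; in tests, pass any plain function (even str.upper) and assert on the yielded batches.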
Mistake: Not handling specific error types appropriately
# DON'T do this - generic error handling
try:
    response = client.chat.completions.create(...)
except Exception as e:
    print(f"Something went wrong: {e}")
    return None
Fix: Handle specific error scenarios
# DO this
import time

from openai import APIStatusError, RateLimitError, APITimeoutError  # APIStatusError exposes .status_code

def robust_api_call(messages, max_retries=3):
    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=messages,
                timeout=30  # Set explicit timeout
            )
            return response.choices[0].message.content
        except RateLimitError as e:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt  # Exponential backoff
                print(f"Rate limited. Waiting {wait_time}s before retry...")
                time.sleep(wait_time)
                continue
            else:
                raise Exception("Max retries exceeded for rate limit")
        except APITimeoutError:
            if attempt < max_retries - 1:
                print(f"Request timeout. Retrying attempt {attempt + 2}...")
                continue
            else:
                raise Exception("Request timeout after max retries")
        except APIStatusError as e:
            if e.status_code in [400, 401, 403]:
                # Don't retry client errors
                raise Exception(f"Client error: {e}")
            elif attempt < max_retries - 1:
                print(f"API error {e.status_code}. Retrying...")
                continue
            else:
                raise Exception(f"API error after max retries: {e}")
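The retry logic above uses plain exponential backoff (2 ** attempt). When many workers retry in lockstep, adding jitter spreads the retries out and avoids a thundering herd; a common "full jitter" variant, sketched here:

```python
import random

def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Full-jitter backoff: a random delay in [0, min(cap, base * 2**attempt)]."""
    return random.uniform(0, min(cap, base * (2 ** attempt)))
```

Substituting time.sleep(backoff_delay(attempt)) for the fixed wait_time keeps the same average growth while desynchronizing concurrent clients.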
Mistake: Using expensive models for simple tasks
# DON'T do this - using GPT-4 for simple classification
def classify_sentiment(text):
    response = client.chat.completions.create(
        model="gpt-4",  # Expensive for simple task
        messages=[{"role": "user", "content": f"Classify sentiment: {text}"}],
        max_tokens=1000  # Way too many tokens for "positive/negative/neutral"
    )
    return response.choices[0].message.content
Fix: Choose appropriate models and token limits
# DO this
def classify_sentiment(text, use_budget_model=True):
    model = "gpt-3.5-turbo" if use_budget_model else "gpt-4"

    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": "Classify sentiment as: positive, negative, or neutral. Respond with only one word."},
            {"role": "user", "content": text}
        ],
        max_tokens=3,  # A few tokens is plenty for a one-word label
        temperature=0  # Deterministic for classification
    )
    return response.choices[0].message.content.strip().lower()
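Even with a tight max_tokens cap, classifiers occasionally return variants like "Positive." or a short sentence. A small normalization step (normalize_label is a hypothetical helper, not part of any library) maps replies onto the allowed label set with a safe fallback:

```python
ALLOWED_LABELS = frozenset({"positive", "negative", "neutral"})

def normalize_label(raw: str, allowed=ALLOWED_LABELS, default: str = "neutral") -> str:
    """Map a model reply onto an allowed label set, falling back to a default."""
    cleaned = raw.strip().lower().rstrip(".")
    if cleaned in allowed:
        return cleaned
    # Tolerate short sentences such as "The sentiment is positive"
    for label in allowed:
        if label in cleaned:
            return label
    return default
```

Wrapping the classify_sentiment return value in normalize_label guarantees downstream code only ever sees one of the three expected labels.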
Mistake: Blindly trusting API responses without validation
# DON'T do this
def extract_email(text):
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": f"Extract email from: {text}"}]
    )
    return response.choices[0].message.content  # Could be anything
Fix: Implement response validation
import re

def extract_email(text):
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "system", "content": "Extract email address. If no valid email found, respond with 'NONE'."},
            {"role": "user", "content": text}
        ],
        max_tokens=50
    )

    extracted = response.choices[0].message.content.strip()

    # Honor the explicit "no email" sentinel
    if extracted.upper() == "NONE":
        return None

    # Validate the full string, not just a matching prefix
    email_pattern = r'[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}'
    if re.fullmatch(email_pattern, extracted):
        return extracted
    else:
        print(f"Invalid email format returned: {extracted}")
        return None
You've built a comprehensive foundation for using the OpenAI API in production environments. You now understand how to properly authenticate and configure your API access, implement robust error handling with retry logic, manage costs through intelligent model selection and token management, and create scalable solutions that handle real-world challenges like rate limiting and response validation.
The key insight that separates successful API integrations from failed experiments is this: the API call itself is usually the easiest part. The real work lies in architecting reliable systems around those calls—systems that gracefully handle failures, manage costs effectively, and scale with your business needs. The patterns you've learned here (configuration management, error handling, token management, and response validation) apply to virtually every production API integration you'll build.
Your customer insight generator exercise demonstrated how these individual components work together to create business value. By combining multiple API calls with proper error handling and rate limiting, you built something that could genuinely improve a support team's efficiency and provide actionable business intelligence.
Next Steps to Expand Your Skills:
Explore Advanced Prompting Techniques - Learn about few-shot prompting, chain-of-thought reasoning, and prompt engineering strategies that can dramatically improve response quality without changing your code architecture. Understanding how to craft better prompts often yields bigger improvements than switching to more expensive models.
Build Custom Fine-Tuned Models - For specialized use cases with consistent data patterns, fine-tuning can provide better results at lower costs than using general-purpose models. This is particularly valuable for domain-specific applications like legal document analysis or technical support classification.
Integrate Vector Databases and RAG (Retrieval-Augmented Generation) - Combine your API skills with vector databases like Pinecone or Weaviate to build systems that can reference your company's specific knowledge base. This enables AI applications that understand your products, policies, and procedures while maintaining the flexibility of general-purpose language models.