
You're leading a data science team at a mid-sized e-commerce company. Every morning, your Slack channels fill with requests: "Can you analyze why conversion rates dropped in the Northeast?" "We need the latest customer segmentation for the marketing campaign launching tomorrow." "The inventory predictions seem off—can you investigate?" Each request requires pulling data from multiple systems, running analyses, generating reports, and often following up with clarifying questions. Your team spends more time on routine analytical tasks than on strategic insights.
This scenario is exactly what AI agents are designed to solve. An AI agent isn't just a chatbot that answers questions—it's an autonomous system that can perceive its environment, make decisions, take actions, and adapt its behavior to achieve specific goals. In your case, that could be an agent that automatically detects anomalies in conversion rates, pulls relevant data from your warehouse, performs root cause analysis, and delivers actionable insights to stakeholders without any human intervention.
By the end of this lesson, you'll understand how to architect, build, and deploy production-ready AI agents that can transform how your organization handles complex, multi-step analytical workflows.
What you'll learn:
To get the most from this lesson, you should have:
You'll need Python 3.9+ installed with the ability to install packages via pip. We'll use OpenAI's API for some examples, but the patterns apply to any LLM provider.
Traditional automation follows predetermined workflows: if X happens, do Y. AI agents operate fundamentally differently—they observe their environment, reason about what they perceive, formulate plans, and execute actions that adapt based on outcomes. This isn't just a more sophisticated if-then-else statement; it's a qualitatively different approach to problem-solving.
Every effective AI agent implements four core subsystems that work in continuous loops:
- **Perception**: how the agent observes and interprets its environment
- **Planning**: how it decides what to do based on what it perceives
- **Action**: how it executes decisions in the real world
- **Memory**: how it learns from experience and maintains context
Let's examine each component through the lens of building a practical business intelligence agent.
An agent's perception system determines what information it can access and how it interprets that information. Unlike humans who perceive through senses, AI agents perceive through APIs, databases, files, and other digital interfaces.
import asyncio
import json
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
import pandas as pd
import httpx
@dataclass
class Observation:
    """A single piece of environmental information captured by the agent.

    Produced by the perception layer; ``confidence`` expresses how much the
    agent should trust the source (0.0 = unusable, 1.0 = fully trusted).
    """
    source: str              # which data source produced this (e.g. "sales_database")
    timestamp: datetime      # when the observation was captured
    data: Dict[str, Any]     # interpreted payload (insights, not raw rows)
    confidence: float        # reliability score in [0, 1]
    context: Dict[str, Any]  # provenance metadata (query text, errors, timings)
class PerceptionSystem:
    """Manages how the agent observes its environment.

    Each ``observe_*`` method pulls from one data source, interprets the raw
    payload, and returns an :class:`Observation` carrying a confidence score
    so downstream planning can reason about uncertainty.
    """

    def __init__(self, data_sources: Dict[str, Any]):
        # data_sources: connection info keyed by source name
        # (expects at least 'analytics_api' and 'api_key' for web analytics).
        self.data_sources = data_sources
        self.observation_history = []
        self.client = httpx.AsyncClient(timeout=30.0)

    async def observe_sales_metrics(self) -> Observation:
        """Perceive current sales performance metrics.

        Returns an Observation whose data holds trend insights (growth rate,
        regional breakdown, customer metrics) rather than raw rows.
        """
        # FIX: fetch 14 days, not 7 — _analyze_sales_trends compares the
        # current week against the previous week, so a 7-day window left the
        # "previous week" slice empty and pinned growth_rate at 0 forever.
        query = """
            SELECT
                DATE(order_date) as date,
                region,
                SUM(revenue) as daily_revenue,
                COUNT(DISTINCT customer_id) as unique_customers,
                AVG(order_value) as avg_order_value
            FROM sales_orders
            WHERE order_date >= CURRENT_DATE - INTERVAL 14 DAYS
            GROUP BY DATE(order_date), region
            ORDER BY date DESC, region
        """
        # Simulate database query (helper implemented elsewhere in the lesson)
        raw_data = await self._execute_query(query)
        # Transform raw data into meaningful observations
        processed_data = self._analyze_sales_trends(raw_data)
        return Observation(
            source="sales_database",
            timestamp=datetime.now(),
            data=processed_data,
            confidence=0.95,  # High confidence in first-party database data
            context={"query": query, "row_count": len(raw_data)},
        )

    async def observe_customer_behavior(self) -> Observation:
        """Perceive customer engagement patterns from the analytics API."""
        analytics_response = await self.client.get(
            f"{self.data_sources['analytics_api']}/engagement",
            headers={"Authorization": f"Bearer {self.data_sources['api_key']}"},
        )
        if analytics_response.status_code != 200:
            # Degrade gracefully: report zero confidence instead of raising,
            # so the planner can decide how to handle missing data.
            return Observation(
                source="web_analytics",
                timestamp=datetime.now(),
                data={},
                confidence=0.0,
                context={"error": f"API returned {analytics_response.status_code}"},
            )
        analytics_data = analytics_response.json()
        # Interpret the raw analytics data into named insights.
        # NOTE(review): _calculate_bounce_rate_trend / _analyze_conversion_funnel /
        # _segment_user_behavior are not defined in this excerpt.
        behavior_insights = {
            "bounce_rate_change": self._calculate_bounce_rate_trend(analytics_data),
            "conversion_funnel": self._analyze_conversion_funnel(analytics_data),
            "traffic_sources": analytics_data.get("traffic_sources", {}),
            "user_segments": self._segment_user_behavior(analytics_data),
        }
        return Observation(
            source="web_analytics",
            timestamp=datetime.now(),
            data=behavior_insights,
            confidence=0.85,  # Moderate confidence due to analytics sampling
            context={"api_response_time": analytics_response.elapsed.total_seconds()},
        )

    def _analyze_sales_trends(self, raw_data: List[Dict]) -> Dict[str, Any]:
        """Convert raw sales rows into week-over-week trend insights."""
        df = pd.DataFrame(raw_data)
        # Use one reference time so both window boundaries agree exactly
        # (repeated datetime.now() calls could straddle a boundary).
        now = datetime.now()
        current_week = df[df['date'] >= (now - timedelta(days=7))]
        previous_week = df[(df['date'] >= (now - timedelta(days=14))) &
                           (df['date'] < (now - timedelta(days=7)))]
        current_revenue = current_week['daily_revenue'].sum()
        previous_revenue = previous_week['daily_revenue'].sum()
        # Guard against division by zero when there is no prior-week data.
        growth_rate = ((current_revenue - previous_revenue) / previous_revenue) if previous_revenue > 0 else 0
        return {
            "revenue_trend": {
                "current_week": current_revenue,
                "previous_week": previous_revenue,
                "growth_rate": growth_rate,
                # A +/-5% band around zero is reported as "stable".
                "trend_direction": "increasing" if growth_rate > 0.05 else "decreasing" if growth_rate < -0.05 else "stable",
            },
            "regional_performance": df.groupby('region')['daily_revenue'].sum().to_dict(),
            "customer_metrics": {
                "total_customers": df['unique_customers'].sum(),
                "avg_order_value": df['avg_order_value'].mean(),
            },
        }
This perception system doesn't just collect data—it interprets it. Notice how observe_sales_metrics() doesn't return raw database rows; it returns processed insights like growth rates and trend directions. The agent perceives meaning, not just data points.
Pro Tip: Always include confidence scores in observations. Real-world data sources have varying reliability, and your agent needs to reason about uncertainty. A database query might have 95% confidence, while a third-party API during maintenance might have much lower confidence.
Once an agent perceives its environment, it needs to decide what to do. This is where traditional automation breaks down—it can only follow predefined rules. AI agents use reasoning to formulate dynamic plans based on what they observe.
from enum import Enum
from typing import List, Tuple, Union
import openai
class ActionType(Enum):
    """Closed set of action kinds the agent can plan and execute."""

    INVESTIGATE = "investigate"  # run deeper analysis on a detected issue
    ALERT = "alert"              # notify stakeholders about a condition
    REPORT = "report"            # produce a summary document
    OPTIMIZE = "optimize"        # apply an improvement action
    ESCALATE = "escalate"        # hand off to a human decision-maker
    WAIT = "wait"                # deliberately take no action this cycle
@dataclass
class PlannedAction:
    """One concrete step in an agent's plan.

    Dependencies name other actions that must complete before this one may
    run; the executor/optimizer is responsible for honoring them.
    """
    action_type: ActionType
    target: str                # what or who the action applies to
    parameters: Dict[str, Any] # executor-specific details
    priority: int              # 1 = highest urgency, 5 = lowest
    estimated_duration: int    # expected runtime in seconds
    dependencies: List[str]    # identifiers of prerequisite actions
class PlanningSystem:
    """Handles agent decision-making and action planning.

    Combines deterministic business rules (clear-cut thresholds handled in
    _apply_business_rules) with LLM reasoning (nuanced scenarios) and then
    orders the combined actions by priority and dependencies.
    """

    def __init__(self, llm_client, business_rules: Dict[str, Any]):
        # llm_client: async OpenAI-compatible client (chat.completions API).
        self.llm_client = llm_client
        self.business_rules = business_rules
        self.active_plans = []

    async def formulate_plan(self, observations: List[Observation]) -> List[PlannedAction]:
        """Create a plan based on current observations."""
        # First, apply deterministic business rules
        rule_based_actions = self._apply_business_rules(observations)
        # Then, use LLM reasoning for complex scenarios.
        # NOTE(review): _summarize_observations and _parse_llm_response are
        # not defined in this excerpt — assumed implemented elsewhere.
        context_summary = self._summarize_observations(observations)
        llm_prompt = f"""
You are a business intelligence agent analyzing the following situation:
{context_summary}
Current business rules triggered: {[action.action_type.value for action in rule_based_actions]}
Based on this information, what additional actions should be taken? Consider:
1. Root cause analysis needs
2. Stakeholder notification requirements
3. Preventive measures
4. Follow-up investigations
Respond with a JSON array of actions, each with:
- action_type: one of {[e.value for e in ActionType]}
- target: what/who the action applies to
- parameters: specific details for execution
- priority: 1-5 (1=urgent)
- reasoning: why this action is needed
"""
        response = await self.llm_client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": llm_prompt}],
            temperature=0.3  # Lower temperature for more consistent planning
        )
        llm_actions = self._parse_llm_response(response.choices[0].message.content)
        # Combine rule-based and LLM-generated actions
        all_actions = rule_based_actions + llm_actions
        # Optimize the plan (remove conflicts, sequence dependencies)
        optimized_plan = self._optimize_action_sequence(all_actions)
        return optimized_plan

    def _apply_business_rules(self, observations: List[Observation]) -> List[PlannedAction]:
        """Apply deterministic business logic, returning triggered actions."""
        actions = []
        for obs in observations:
            if obs.source == "sales_database":
                revenue_data = obs.data.get("revenue_trend", {})
                growth_rate = revenue_data.get("growth_rate", 0)
                # Rule: Alert on significant revenue drops
                if growth_rate < -0.15:  # 15% decline
                    actions.append(PlannedAction(
                        action_type=ActionType.ALERT,
                        target="revenue_drop",
                        parameters={
                            "severity": "high",
                            "growth_rate": growth_rate,
                            "recipients": ["sales_director", "ceo"],
                            "message": f"Revenue declined {growth_rate:.1%} week-over-week"
                        },
                        priority=1,
                        estimated_duration=30,
                        dependencies=[]
                    ))
                    # Follow up with investigation.
                    # The dependency string matches the completion key built in
                    # _optimize_action_sequence: "{action_type.value}_{target}".
                    actions.append(PlannedAction(
                        action_type=ActionType.INVESTIGATE,
                        target="revenue_decline_causes",
                        parameters={
                            "focus_areas": ["regional_breakdown", "product_mix", "customer_segments"],
                            "time_window": "last_14_days"
                        },
                        priority=2,
                        estimated_duration=300,
                        dependencies=["alert_revenue_drop"]
                    ))
        return actions

    def _optimize_action_sequence(self, actions: List[PlannedAction]) -> List[PlannedAction]:
        """Optimize action ordering and remove conflicts."""
        # Sort by priority, then handle dependencies
        sorted_actions = sorted(actions, key=lambda x: x.priority)
        # Simple dependency resolution (in production, use a proper DAG)
        optimized = []
        completed = set()
        while sorted_actions:
            # Pick the highest-priority action whose dependencies are all met.
            for i, action in enumerate(sorted_actions):
                if all(dep in completed for dep in action.dependencies):
                    optimized.append(action)
                    completed.add(f"{action.action_type.value}_{action.target}")
                    sorted_actions.pop(i)
                    break
            else:
                # No action without unsatisfied dependencies - possible circular dependency
                optimized.extend(sorted_actions)  # Add remaining actions anyway
                break
        return optimized
The planning system combines deterministic business rules with LLM reasoning. Business rules handle clear-cut scenarios (revenue drops below threshold → send alert), while the LLM handles nuanced situations that require contextual understanding.
Notice how actions include dependencies. This prevents the agent from sending a detailed analysis report before it's actually performed the investigation. Real agents need to sequence their actions logically.
Planning means nothing without reliable execution. Agent actions often have real-world consequences—sending alerts to executives, making API calls to external systems, or triggering automated processes. The execution system must be robust, observable, and safe.
import asyncio
from typing import Callable, Optional
import logging
from contextlib import asynccontextmanager
class ActionExecutionError(Exception):
    """Raised when a planned action fails during execution."""
class ActionExecutor:
    """Handles safe execution of planned actions.

    Wraps every action in validation, type-based dispatch, post-execution
    verification, and structured logging. With dry_run=True, actions are
    simulated rather than executed against real systems.
    """

    def __init__(self, integrations: Dict[str, Any], dry_run: bool = False):
        # integrations: handles/clients for external systems (Slack, email, DB).
        self.integrations = integrations
        # When True, _simulate_action replaces real side effects.
        self.dry_run = dry_run
        # Structured record of every attempted execution, success or failure.
        self.execution_history = []
        self.logger = logging.getLogger(__name__)

    async def execute_action(self, action: PlannedAction) -> Dict[str, Any]:
        """Execute a single planned action with full error handling.

        Returns a structured execution record on success; on failure the
        error is recorded in execution_history, recovery is attempted, and
        ActionExecutionError is raised.
        """
        execution_id = f"{action.action_type.value}_{action.target}_{datetime.now().isoformat()}"
        self.logger.info(f"Starting execution: {execution_id}")
        if self.dry_run:
            return await self._simulate_action(action)
        try:
            # Pre-execution validation.
            # NOTE(review): _validate_action / _verify_action_result /
            # _simulate_action / _handle_execution_failure are not defined in
            # this excerpt — assumed implemented elsewhere.
            await self._validate_action(action)
            # Execute based on action type
            result = await self._dispatch_action(action)
            # Post-execution verification
            verification_result = await self._verify_action_result(action, result)
            execution_record = {
                "execution_id": execution_id,
                "action": action,
                "result": result,
                "verification": verification_result,
                "status": "success",
                "timestamp": datetime.now(),
                "duration": verification_result.get("duration", 0)
            }
            self.execution_history.append(execution_record)
            self.logger.info(f"Successfully executed: {execution_id}")
            return execution_record
        except Exception as e:
            error_record = {
                "execution_id": execution_id,
                "action": action,
                "error": str(e),
                "status": "failed",
                "timestamp": datetime.now()
            }
            self.execution_history.append(error_record)
            self.logger.error(f"Failed to execute {execution_id}: {e}")
            # Attempt recovery before surfacing the failure to the caller.
            await self._handle_execution_failure(action, e)
            raise ActionExecutionError(f"Action execution failed: {e}")

    async def _dispatch_action(self, action: PlannedAction) -> Dict[str, Any]:
        """Route action to appropriate executor based on type."""
        # NOTE(review): ActionType.WAIT has no entry here, so a WAIT action
        # reaching dispatch raises ActionExecutionError — confirm intended.
        dispatch_map = {
            ActionType.ALERT: self._execute_alert,
            ActionType.INVESTIGATE: self._execute_investigation,
            ActionType.REPORT: self._execute_report,
            ActionType.OPTIMIZE: self._execute_optimization,
            ActionType.ESCALATE: self._execute_escalation
        }
        executor = dispatch_map.get(action.action_type)
        if not executor:
            raise ActionExecutionError(f"No executor for action type: {action.action_type}")
        return await executor(action)

    async def _execute_alert(self, action: PlannedAction) -> Dict[str, Any]:
        """Execute alert actions (notifications, messages)."""
        params = action.parameters
        # Send to Slack
        if "slack_channel" in params or "recipients" in params:
            # NOTE(review): slack_result is captured but never used below.
            slack_result = await self._send_slack_alert(
                message=params.get("message", "Alert triggered"),
                severity=params.get("severity", "medium"),
                recipients=params.get("recipients", []),
                channel=params.get("slack_channel", "#alerts")
            )
        # Send email for high-severity alerts
        if params.get("severity") == "high":
            # NOTE(review): email_result is captured but never used below.
            email_result = await self._send_email_alert(
                subject=f"High Priority Alert: {action.target}",
                body=params.get("message", ""),
                recipients=params.get("recipients", [])
            )
        return {
            "alert_type": "multi_channel",
            "channels_used": ["slack", "email"] if params.get("severity") == "high" else ["slack"],
            "recipients_notified": len(params.get("recipients", [])),
            "timestamp": datetime.now().isoformat()
        }

    async def _execute_investigation(self, action: PlannedAction) -> Dict[str, Any]:
        """Execute investigation actions (data analysis, root cause analysis)."""
        params = action.parameters
        focus_areas = params.get("focus_areas", [])
        time_window = params.get("time_window", "last_7_days")
        investigation_results = {}
        # Each known focus area maps to a dedicated analysis routine;
        # unrecognized areas are silently skipped.
        for area in focus_areas:
            if area == "regional_breakdown":
                investigation_results[area] = await self._analyze_regional_performance(time_window)
            elif area == "product_mix":
                investigation_results[area] = await self._analyze_product_performance(time_window)
            elif area == "customer_segments":
                investigation_results[area] = await self._analyze_customer_segments(time_window)
        # Generate summary insights using LLM
        summary = await self._generate_investigation_summary(investigation_results)
        return {
            "investigation_id": f"inv_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
            "focus_areas": focus_areas,
            "detailed_results": investigation_results,
            "summary_insights": summary,
            "confidence_score": self._calculate_investigation_confidence(investigation_results)
        }

    async def _analyze_regional_performance(self, time_window: str) -> Dict[str, Any]:
        """Specific analysis for regional performance issues."""
        # This would connect to your actual data warehouse.
        # NOTE(review): time_window values like "last_7_days" are not valid
        # SQL INTERVAL literals — harmless here because the query is only
        # simulated, but would need mapping in a real implementation.
        query = f"""
SELECT
region,
SUM(revenue) as total_revenue,
COUNT(DISTINCT order_id) as order_count,
AVG(order_value) as avg_order_value,
COUNT(DISTINCT customer_id) as unique_customers
FROM sales_orders
WHERE order_date >= CURRENT_DATE - INTERVAL '{time_window}'
GROUP BY region
ORDER BY total_revenue DESC
"""
        # Simulate database results
        regional_data = [
            {"region": "Northeast", "total_revenue": 125000, "order_count": 1250, "avg_order_value": 100, "unique_customers": 800},
            {"region": "Southeast", "total_revenue": 98000, "order_count": 1100, "avg_order_value": 89, "unique_customers": 750},
            {"region": "West", "total_revenue": 156000, "order_count": 1400, "avg_order_value": 111, "unique_customers": 950},
        ]
        # Calculate insights
        total_revenue = sum(r["total_revenue"] for r in regional_data)
        insights = {
            "regional_breakdown": regional_data,
            # A region "underperforms" when its revenue falls below 80% of
            # the mean regional revenue.
            "underperforming_regions": [
                r for r in regional_data
                if r["total_revenue"] < (total_revenue / len(regional_data)) * 0.8
            ],
            "key_findings": [
                "Northeast region showing 23% decline in average order value",
                "West region outperforming with highest customer acquisition"
            ]
        }
        return insights

    async def _send_slack_alert(self, message: str, severity: str, recipients: List[str], channel: str) -> Dict[str, Any]:
        """Send alert to Slack workspace."""
        # Format message based on severity
        severity_emoji = {"high": "🚨", "medium": "⚠️", "low": "ℹ️"}
        formatted_message = f"{severity_emoji.get(severity, 'ℹ️')} **{severity.upper()} ALERT**\n\n{message}"
        # In production, use actual Slack API.
        # NOTE(review): slack_payload is built but never sent anywhere — the
        # API call below is simulated with a sleep.
        slack_payload = {
            "channel": channel,
            "text": formatted_message,
            "username": "BusinessIntelligenceAgent",
            "icon_emoji": ":robot_face:"
        }
        # Simulate API call
        await asyncio.sleep(0.1)  # Simulate network delay
        return {
            "slack_channel": channel,
            "message_sent": True,
            "recipients_mentioned": recipients,
            "timestamp": datetime.now().isoformat()
        }

    @asynccontextmanager
    async def execution_context(self, action: PlannedAction):
        """Context manager for safe action execution with cleanup.

        Yields a list of temporary resources allocated for the action; the
        finally block guarantees cleanup and duration logging whether or not
        the body raised.
        """
        execution_start = datetime.now()
        temp_resources = []
        try:
            # Pre-execution setup
            if action.action_type == ActionType.INVESTIGATE:
                # Set up temporary analysis workspace
                temp_workspace = f"/tmp/investigation_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
                temp_resources.append(temp_workspace)
            yield temp_resources
        finally:
            # Cleanup regardless of success/failure
            execution_duration = (datetime.now() - execution_start).total_seconds()
            self.logger.info(f"Action {action.action_type.value} completed in {execution_duration:.2f}s")
            # Clean up temporary resources
            for resource in temp_resources:
                try:
                    # Cleanup logic here (delete temp files, close connections, etc.)
                    pass
                except Exception as cleanup_error:
                    self.logger.warning(f"Failed to cleanup {resource}: {cleanup_error}")
The execution system includes several critical production features:
Circuit breakers: The _validate_action() method (implementation omitted for brevity) checks whether it's safe to execute an action. For example, it might prevent sending alerts if too many have been sent recently.
Idempotency: Actions should be safe to retry. The execution_id helps track whether an action was already executed.
Resource management: The execution_context ensures cleanup happens even if actions fail partway through.
Observability: Every action execution is logged with structured data, making it easy to debug issues and analyze agent behavior.
The most sophisticated planning and execution systems are limited without memory. Agents need to remember what they've learned, what actions they've taken, and how those actions performed. Memory isn't just storage—it's the foundation for improvement and contextual decision-making.
from datetime import datetime, timedelta
import sqlite3
import json
from typing import Dict, List, Any, Optional, Tuple
import numpy as np
from collections import defaultdict, deque
class MemorySystem:
    """Manages agent memory, learning, and context.

    Two tiers: a bounded in-process working memory (deque) for fast access
    to recent items, and a SQLite database for persistent observations,
    action outcomes, learned patterns, and context states.
    """

    def __init__(self, db_path: str = "agent_memory.db", max_working_memory: int = 100):
        self.db_path = db_path
        self.max_working_memory = max_working_memory
        # Working memory for recent, frequently accessed information;
        # maxlen bounds growth — oldest entries are evicted automatically.
        self.working_memory = deque(maxlen=max_working_memory)
        # Initialize persistent storage
        self._initialize_database()
        # Performance tracking for continuous learning
        # (maps action_type value -> list of outcome records).
        self.action_outcomes = defaultdict(list)

    def _initialize_database(self):
        """Set up persistent memory storage (idempotent: CREATE IF NOT EXISTS)."""
        with sqlite3.connect(self.db_path) as conn:
            conn.executescript("""
CREATE TABLE IF NOT EXISTS observations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
source TEXT NOT NULL,
data TEXT NOT NULL,
confidence REAL NOT NULL,
context TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS actions_taken (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
action_type TEXT NOT NULL,
target TEXT NOT NULL,
parameters TEXT NOT NULL,
outcome TEXT NOT NULL,
effectiveness_score REAL
);
CREATE TABLE IF NOT EXISTS learned_patterns (
id INTEGER PRIMARY KEY AUTOINCREMENT,
pattern_type TEXT NOT NULL,
pattern_data TEXT NOT NULL,
confidence REAL NOT NULL,
last_updated TEXT NOT NULL,
usage_count INTEGER DEFAULT 0
);
CREATE TABLE IF NOT EXISTS context_states (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
state_data TEXT NOT NULL,
triggers TEXT NOT NULL,
outcomes TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_observations_timestamp ON observations(timestamp);
CREATE INDEX IF NOT EXISTS idx_actions_timestamp ON actions_taken(timestamp);
CREATE INDEX IF NOT EXISTS idx_patterns_type ON learned_patterns(pattern_type);
""")

    async def store_observation(self, observation: Observation) -> None:
        """Store observation in both working and persistent memory."""
        # Add to working memory (the Observation object itself is kept).
        self.working_memory.append({
            "type": "observation",
            "timestamp": observation.timestamp,
            "data": observation
        })
        # Store in persistent memory (dict payloads serialized to JSON text).
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
INSERT INTO observations (timestamp, source, data, confidence, context)
VALUES (?, ?, ?, ?, ?)
""", (
                observation.timestamp.isoformat(),
                observation.source,
                json.dumps(observation.data),
                observation.confidence,
                json.dumps(observation.context)
            ))

    async def store_action_outcome(self, action: PlannedAction, outcome: Dict[str, Any], effectiveness_score: float) -> None:
        """Store action results for learning."""
        # Update working memory
        self.working_memory.append({
            "type": "action_outcome",
            "timestamp": datetime.now(),
            "action": action,
            "outcome": outcome,
            "effectiveness": effectiveness_score
        })
        # Track for pattern learning (in-process only; see learn_patterns).
        self.action_outcomes[action.action_type.value].append({
            "parameters": action.parameters,
            "outcome": outcome,
            "effectiveness": effectiveness_score,
            "timestamp": datetime.now()
        })
        # Persistent storage
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
INSERT INTO actions_taken (timestamp, action_type, target, parameters, outcome, effectiveness_score)
VALUES (?, ?, ?, ?, ?, ?)
""", (
                datetime.now().isoformat(),
                action.action_type.value,
                action.target,
                json.dumps(action.parameters),
                json.dumps(outcome),
                effectiveness_score
            ))

    async def learn_patterns(self) -> Dict[str, Any]:
        """Analyze stored experiences to learn patterns."""
        patterns_learned = {}
        # Learn from action effectiveness
        for action_type, outcomes in self.action_outcomes.items():
            if len(outcomes) >= 5:  # Minimum samples for pattern learning
                pattern = self._analyze_action_patterns(action_type, outcomes)
                if pattern["confidence"] > 0.7:  # Only store high-confidence patterns
                    patterns_learned[f"action_effectiveness_{action_type}"] = pattern
                    # NOTE(review): _store_learned_pattern is not defined in
                    # this excerpt — assumed implemented elsewhere.
                    await self._store_learned_pattern(f"action_effectiveness_{action_type}", pattern)
        # Learn situational context patterns.
        # NOTE(review): _analyze_context_patterns is also not defined here.
        context_patterns = await self._analyze_context_patterns()
        patterns_learned.update(context_patterns)
        return patterns_learned

    def _analyze_action_patterns(self, action_type: str, outcomes: List[Dict]) -> Dict[str, Any]:
        """Analyze effectiveness patterns for specific action types.

        Returns a pattern dict with a "confidence" key; callers treat
        confidence 0.0 as "no pattern found".
        """
        # Group by similar parameters
        parameter_groups = defaultdict(list)
        for outcome in outcomes:
            # Create a simplified parameter signature for grouping
            param_signature = self._create_parameter_signature(outcome["parameters"])
            parameter_groups[param_signature].append(outcome["effectiveness"])
        # Find the most effective parameter patterns
        best_patterns = {}
        for signature, effectiveness_scores in parameter_groups.items():
            if len(effectiveness_scores) >= 3:  # Minimum samples
                avg_effectiveness = np.mean(effectiveness_scores)
                std_effectiveness = np.std(effectiveness_scores)
                best_patterns[signature] = {
                    "avg_effectiveness": avg_effectiveness,
                    "consistency": 1.0 / (1.0 + std_effectiveness),  # Lower std = higher consistency
                    "sample_count": len(effectiveness_scores)
                }
        # Find the overall best pattern (effectiveness weighted by consistency)
        if best_patterns:
            best_signature = max(best_patterns.keys(),
                                 key=lambda x: best_patterns[x]["avg_effectiveness"] * best_patterns[x]["consistency"])
            return {
                "action_type": action_type,
                "best_parameters": best_signature,
                "effectiveness": best_patterns[best_signature]["avg_effectiveness"],
                # Confidence caps at 0.95 and scales with sample count up to 10.
                "confidence": min(0.95, best_patterns[best_signature]["consistency"] *
                                  (best_patterns[best_signature]["sample_count"] / 10)),
                "learned_at": datetime.now().isoformat()
            }
        return {"confidence": 0.0}

    def _create_parameter_signature(self, parameters: Dict[str, Any]) -> str:
        """Create a simplified signature for parameter grouping.

        Extracts coarse characteristics (severity label, list sizes) rather
        than exact values so similar actions group together.
        """
        signature_parts = []
        if "severity" in parameters:
            signature_parts.append(f"severity_{parameters['severity']}")
        if "recipients" in parameters:
            signature_parts.append(f"recipient_count_{len(parameters['recipients'])}")
        if "focus_areas" in parameters:
            signature_parts.append(f"focus_areas_{len(parameters['focus_areas'])}")
        return "_".join(signature_parts) if signature_parts else "default"

    async def retrieve_relevant_context(self, current_situation: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Retrieve relevant past experiences for current situation.

        Checks working memory first, then the last 30 days of persisted
        observations; results are ranked by relevance * recency.
        """
        relevant_memories = []
        # Check working memory first (fastest access)
        for memory_item in reversed(self.working_memory):  # Most recent first
            relevance_score = self._calculate_relevance(memory_item, current_situation)
            if relevance_score > 0.5:
                relevant_memories.append({
                    "memory": memory_item,
                    "relevance": relevance_score,
                    "recency": 1.0  # Working memory is recent by definition
                })
        # Query persistent memory for similar situations
        with sqlite3.connect(self.db_path) as conn:
            # Look for similar observations
            recent_observations = conn.execute("""
SELECT timestamp, source, data, confidence, context
FROM observations
WHERE timestamp > datetime('now', '-30 days')
ORDER BY timestamp DESC
LIMIT 50
""").fetchall()
            for obs_row in recent_observations:
                obs_data = json.loads(obs_row[2])
                relevance = self._calculate_data_relevance(obs_data, current_situation)
                if relevance > 0.4:
                    age_days = (datetime.now() - datetime.fromisoformat(obs_row[0])).days
                    recency_factor = max(0.1, 1.0 - (age_days / 30))  # Decay over 30 days
                    relevant_memories.append({
                        "memory": {
                            "type": "historical_observation",
                            "timestamp": obs_row[0],
                            "source": obs_row[1],
                            "data": obs_data,
                            "confidence": obs_row[3]
                        },
                        "relevance": relevance,
                        "recency": recency_factor
                    })
        # Sort by combined relevance and recency
        relevant_memories.sort(key=lambda x: x["relevance"] * x["recency"], reverse=True)
        return relevant_memories[:10]  # Return top 10 most relevant memories

    def _calculate_relevance(self, memory_item: Dict[str, Any], current_situation: Dict[str, Any]) -> float:
        """Calculate how relevant a working-memory item is to the situation."""
        if memory_item["type"] == "observation":
            # "data" holds the Observation dataclass stored by
            # store_observation; its .data attribute is the payload dict.
            return self._calculate_data_relevance(memory_item["data"].data, current_situation)
        elif memory_item["type"] == "action_outcome":
            # Check if similar action might be relevant
            action = memory_item["action"]
            if action.action_type.value in current_situation.get("potential_actions", []):
                return 0.8
        # Fallback for non-matching outcomes and unknown memory types.
        return 0.0

    def _calculate_data_relevance(self, memory_data: Dict[str, Any], current_data: Dict[str, Any]) -> float:
        """Calculate similarity between two data dicts.

        Simple heuristic: average per-key similarity over keys present in
        both dicts (numeric ratio or string match); 0.0 when nothing shared.
        """
        shared_keys = set(memory_data.keys()) & set(current_data.keys())
        if not shared_keys:
            return 0.0
        relevance_scores = []
        for key in shared_keys:
            if isinstance(memory_data[key], (int, float)) and isinstance(current_data[key], (int, float)):
                # Numerical similarity (relative difference, clamped at 0)
                if memory_data[key] != 0:
                    similarity = 1.0 - abs(memory_data[key] - current_data[key]) / abs(memory_data[key])
                    relevance_scores.append(max(0, similarity))
            elif isinstance(memory_data[key], str) and isinstance(current_data[key], str):
                # String similarity (simple approach: exact, then substring)
                if memory_data[key] == current_data[key]:
                    relevance_scores.append(1.0)
                elif memory_data[key].lower() in current_data[key].lower() or current_data[key].lower() in memory_data[key].lower():
                    relevance_scores.append(0.5)
        return np.mean(relevance_scores) if relevance_scores else 0.0

    async def get_performance_insights(self) -> Dict[str, Any]:
        """Analyze agent performance over the last 30 days of actions."""
        with sqlite3.connect(self.db_path) as conn:
            # Analyze action effectiveness trends
            recent_actions = conn.execute("""
SELECT action_type, effectiveness_score, timestamp
FROM actions_taken
WHERE timestamp > datetime('now', '-30 days')
ORDER BY timestamp DESC
""").fetchall()
        # Calculate trends by action type
        action_trends = defaultdict(list)
        for action_type, effectiveness, timestamp in recent_actions:
            action_trends[action_type].append({
                "effectiveness": effectiveness,
                "timestamp": datetime.fromisoformat(timestamp)
            })
        # Calculate improvement rates
        insights = {}
        for action_type, records in action_trends.items():
            if len(records) >= 5:
                # Sort by timestamp
                records.sort(key=lambda x: x["timestamp"])
                # Calculate trend
                effectiveness_scores = [r["effectiveness"] for r in records]
                time_indices = list(range(len(effectiveness_scores)))
                # Simple linear trend via Pearson correlation with time index
                correlation = np.corrcoef(time_indices, effectiveness_scores)[0, 1] if len(effectiveness_scores) > 1 else 0
                insights[action_type] = {
                    "average_effectiveness": np.mean(effectiveness_scores),
                    "improvement_trend": correlation,  # Positive = improving
                    # NOTE(review): this goes negative when std > 1 — confirm
                    # effectiveness scores are bounded to [0, 1].
                    "consistency": 1.0 - np.std(effectiveness_scores),
                    "sample_count": len(records)
                }
        return {
            "performance_by_action": insights,
            "overall_trend": np.mean([insight["improvement_trend"] for insight in insights.values()]) if insights else 0,
            "total_actions": len(recent_actions),
            "analysis_period": "30_days"
        }
This memory system provides several key capabilities:
Working Memory: Fast access to recent information for immediate context.
Pattern Learning: The system automatically identifies which parameters lead to more effective actions.
Context Retrieval: When facing a new situation, the agent can recall similar past experiences.
Performance Tracking: The agent monitors its own effectiveness and can identify areas for improvement.
Important: Memory systems can become performance bottlenecks. The working memory with a maximum size prevents unbounded growth, while the relevance scoring ensures the agent focuses on the most pertinent past experiences rather than everything it has ever seen.
Now that we understand the core components, let's build a complete business intelligence agent that can autonomously monitor sales performance, investigate issues, and alert stakeholders.
import asyncio
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
import logging
class BusinessIntelligenceAgent:
"""Complete autonomous business intelligence agent"""
def __init__(self, config: Dict[str, Any]):
    """Wire up the four agent subsystems from a single config dict.

    Required config keys: data_sources, llm_client, business_rules,
    integrations. Optional: dry_run (default False), db_path, cycle_interval.
    """
    self.config = config
    self.logger = logging.getLogger(__name__)
    # Initialize subsystems
    self.perception = PerceptionSystem(config["data_sources"])
    self.planning = PlanningSystem(config["llm_client"], config["business_rules"])
    self.execution = ActionExecutor(config["integrations"], dry_run=config.get("dry_run", False))
    self.memory = MemorySystem(config.get("db_path", "agent_memory.db"))
    # Agent state
    self.running = False          # main-loop flag; set False to stop the agent
    self.cycle_count = 0          # number of completed perception->action cycles
    self.last_full_analysis = None
async def start(self) -> None:
"""Start the agent's main execution loop"""
self.running = True
self.logger.info("Business Intelligence Agent starting...")
try:
while self.running:
cycle_start = datetime.now()
# Execute one complete agent cycle
await self.execute_cycle()
# Learn from recent experiences
if self.cycle_count % 10 == 0: # Learn every 10 cycles
learned_patterns = await self.memory.learn_patterns()
self.logger.info(f"Learned {len(learned_patterns)} new patterns")
# Calculate sleep time to maintain cycle frequency
cycle_duration = (datetime.now() - cycle_start).total_seconds()
sleep_time = max(0, self.config.get("cycle_interval", 300) - cycle_duration)
self.logger.info(f"Cycle {self.cycle_count} completed in {cycle_duration:.2f}s, sleeping {sleep_time:.2f}s")
if sleep_time > 0:
await asyncio.sleep(sleep_time)
self.cycle_count += 1
except Exception as e:
self.logger.error(f"Agent failed: {e}")
raise
finally:
self.logger.info("Business Intelligence Agent stopped")
async def execute_cycle(self) -> Dict[str, Any]:
"""Execute one complete perception -> planning -> action cycle"""
cycle_results = {
"cycle_id": self.cycle_count,
"timestamp": datetime.now(),
"observations": [],
"plans": [],
"actions": [],
"errors": []
}
try:
# PERCEPTION PHASE
observations = await self.gather_observations()
cycle_results["observations"] = observations
# Store observations in memory
for obs in observations:
await self.memory.store_observation(obs)
# PLANNING PHASE
# Retrieve relevant context from memory
current_situation = self._summarize_current_situation(observations)
relevant_context = await self.memory.retrieve_relevant_context(current_situation)
# Create plan based on observations and context
planned_actions = await self.planning.formulate_plan(observations)
# Enhance plan with learned patterns
enhanced_actions = await self._enhance_plan_with_memory(planned_actions, relevant_context)
cycle_results["plans"] = enhanced_actions
# ACTION PHASE
action_results = []
for action in enhanced_actions[:3]: # Execute top 3 priority actions per cycle
try:
result = await self.execution.execute_action(action)
action_results.append(result)
# Calculate effectiveness score
effectiveness = self._calculate_action_effectiveness(action, result)
await self.memory.store_action_outcome(action, result, effectiveness)
except Exception as action_error:
self.logger.error(f"Action failed: {action_error}")
cycle_results["errors"].append({
"action": action,
"error": str(action_error)
})
cycle_results["actions"] = action_results
return cycle_results
except Exception as e:
self.logger.error(f"Cycle {self.cycle_count} failed: {e}")
cycle_results["errors"].append({"cycle_error": str(e)})
return cycle_results
async def gather_observations(self) -> List[Observation]:
"""Gather all environmental observations"""
observation_tasks = [
self.perception.observe_sales_metrics(),
self.perception.observe_customer_behavior(),
]
# Add conditional observations based on schedule or triggers
current_hour = datetime.now().hour
if current_hour == 9: # Morning deep dive
observation_tasks.append(self._observe_competitive_landscape())
if datetime.now().weekday() == 0: # Monday weekly summary
observation_tasks.append(self._observe_weekly_summary())
# Execute all observations concurrently
observations = []
results = await asyncio.gather(*observation_tasks, return_exceptions=True)
for result in results:
if isinstance(result, Observation):
observations.append(result)
elif isinstance(result, Exception):
self.logger.warning(f"Observation failed: {result}")
return observations
def _summarize_current_situation(self, observations: List[Observation]) -> Dict[str, Any]:
"""Create a summary of current situation for context retrieval"""
situation = {
"timestamp": datetime.now().isoformat(),
"observation_sources": [obs.source for obs in observations],
"confidence_levels": [obs.confidence for obs in observations],
}
# Extract key metrics for similarity matching
for obs in observations:
if obs.source == "sales_database":
revenue_data = obs.data.get("revenue_trend", {})
situation.update({
"revenue_growth_rate": revenue_data.get("growth_rate", 0),
"revenue_trend_direction": revenue_data.get("trend_direction", "stable")
})
elif obs.source == "web_analytics":
behavior_data = obs.data
situation.update({
"bounce_rate_trend": behavior_data.get("bounce_rate_change", 0),
"conversion_rate": behavior_data.get("conversion_funnel", {}).get("overall_rate", 0)
})
return situation
async def _enhance_plan_with_memory(self, planned_actions: List[PlannedAction],
relevant_context: List[Dict[str, Any]]) -> List[PlannedAction]:
"""Enhance planned actions using learned patterns and past experiences"""
enhanced_actions = []
for action in planned_actions:
# Check if we have learned patterns for this action type
pattern_key = f"action_effectiveness_{action.action_type.value}"
# Look for learned patterns in memory
with sqlite3.connect(self.memory.db_path) as conn:
pattern_result = conn.execute("""
SELECT pattern_data FROM learned_patterns
WHERE pattern_type = ? AND confidence > 0.7
ORDER BY last_updated DESC LIMIT 1
""", (pattern_key,)).fetchone()
if pattern_result:
pattern_data = json.loads(pattern_result[0])
# Adjust action parameters based on learned effectiveness
enhanced_params = action.parameters.copy()
if "best_parameters" in pattern_data:
best_params = pattern_data["best_parameters"]
# Apply learned parameter optimizations
if "severity_high" in best_params and action.action_type == ActionType.ALERT:
enhanced_params["severity"] = "high"
if "focus_areas_3" in best_params and action.action_type == ActionType.INVESTIGATE:
# Ensure we have optimal number of focus areas
current_areas = enhanced_params.get("focus_areas", [])
if len(current_areas) < 3:
enhanced_params["focus_areas"].extend(["customer_segments", "product_mix"][:3-len(current_areas)])
# Create enhanced action
enhanced_action = PlannedAction(
action_type=action.action_type,
target=action.target,
parameters=enhanced_params,
priority=action.priority,
estimated_duration=action.estimated_duration,
dependencies=action.dependencies
)
enhanced_actions.append(enhanced_action)
else:
enhanced_actions.append(action) # No learned patterns, use original
return enhanced_actions
def _calculate_action_effectiveness(self, action: PlannedAction, result: Dict[str, Any]) -> float:
"""Calculate how effective an action was"""
if result.get("status") == "failed":
return 0.0
effectiveness_score = 0.5 # Baseline for successful completion
# Action-type specific effectiveness calculation
if action.action_type == ActionType.ALERT:
# Measure by recipient engagement (simplified)
recipients_notified = result.get("recipients_notified", 0)
if recipients_notified > 0:
effectiveness_score += 0.3
# Higher severity alerts that completed successfully are more effective
if action.parameters.get("severity") == "high":
effectiveness_score += 0.2
elif action.action_type == ActionType.INVESTIGATE:
# Measure by insight quality
confidence_score = result.get("confidence_score", 0)
effectiveness_score += confidence_score * 0.4
# More focus areas generally means more thorough investigation
focus_areas_count = len(action.parameters.get("focus_areas", []))
effectiveness_score += min(0.1 * focus_areas_count, 0.3)
elif action.action_type == ActionType.REPORT:
# Measure by completeness and timeliness
if result.get("report_generated"):
effectiveness_score += 0.3
execution_time = result.get("duration", float('inf'))
if execution_time < action.estimated_duration:
effectiveness_score += 0.2 # Bonus for completing faster than estimated
return min(1.0, effectiveness_score) # Cap at 1.0
async def stop(self) -> None:
"""Gracefully stop the agent"""
self.logger.info("Stopping Business Intelligence Agent...")
self.running = False
# Get final performance report
performance_insights = await self.memory.get_performance_insights()
self.logger.info(f"Final performance report: {performance_insights}")
This complete agent implementation demonstrates several production-ready patterns:
Autonomous Operation: The agent runs continuously, making decisions without human intervention while providing full observability.
Graceful Error Handling: Individual failures don't crash the entire system. The agent logs errors and continues operation.
Learning Integration: The agent actively uses its memory system to improve performance over time.
Configurable Behavior: Business rules, data sources, and behavior parameters are externalized in configuration.
Let's build a practical agent that demonstrates the concepts we've covered. This agent will monitor customer behavior patterns, predict potential churn, and take proactive actions to retain at-risk customers.
Build an agent with these capabilities:
# customer_churn_agent.py
import asyncio
import json
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from enum import Enum
class ChurnRiskLevel(Enum):
    """Ordered buckets describing how likely a customer is to churn."""

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"
@dataclass
class CustomerRiskProfile:
    """Point-in-time churn risk assessment for a single customer."""

    customer_id: str
    risk_level: ChurnRiskLevel
    risk_factors: List[str]                    # labels explaining the assessment
    confidence_score: float                    # scorer's confidence in this profile
    last_activity: datetime
    predicted_churn_date: Optional[datetime]   # None when no prediction is available
    recommended_actions: List[str]             # suggested retention steps
class CustomerChurnAgent:
    """Agent focused on preventing customer churn through proactive intervention."""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        # TODO: Initialize perception, planning, execution, memory systems
        # TODO: Set up customer data sources
        # TODO: Configure retention action integrations

    async def assess_churn_risk(self, customer_id: str) -> CustomerRiskProfile:
        """Analyze a single customer's churn risk."""
        # TODO: Gather customer behavioral data
        # TODO: Apply churn risk scoring model
        # TODO: Generate risk profile with recommended actions

    async def execute_retention_action(self, customer_profile: CustomerRiskProfile, action: str) -> Dict[str, Any]:
        """Execute specific retention action for at-risk customer."""
        # TODO: Implement retention actions:
        #   - Send personalized discount offer
        #   - Schedule proactive support call
        #   - Alert account manager
        #   - Trigger win-back email sequence

    async def learn_from_outcomes(self, customer_id: str, actions_taken: List[str], outcome: str) -> None:
        """Update models based on retention campaign results."""
        # TODO: Track which actions were most effective
        # TODO: Update churn prediction models
        # TODO: Adjust risk scoring parameters
# Example usage and test-data setup for the churn agent.
async def main():
    """Wire up a CustomerChurnAgent with sample config and run one assessment."""
    data_sources = {
        "customer_db": "postgresql://localhost/customers",
        "support_api": "https://support.company.com/api/v1",
        "analytics_api": "https://analytics.company.com/api",
    }
    integrations = {
        "email_service": {"api_key": "email_key", "endpoint": "https://email.service.com"},
        "crm_system": {"api_key": "crm_key", "endpoint": "https://crm.company.com/api"},
        "slack_webhook": "https://hooks.slack.com/services/xxx",
    }
    business_rules = {
        "high_risk_threshold": 0.7,
        "critical_risk_threshold": 0.85,
        "max_retention_spend": 500.0,
        "min_customer_value": 1000.0,
    }
    agent = CustomerChurnAgent({
        "data_sources": data_sources,
        "integrations": integrations,
        "business_rules": business_rules,
    })
    # Test with sample customer
    risk_profile = await agent.assess_churn_risk("customer_12345")
    print(f"Risk assessment: {risk_profile}")
    # Only intervene for the riskiest customers.
    if risk_profile.risk_level in (ChurnRiskLevel.HIGH, ChurnRiskLevel.CRITICAL):
        for action in risk_profile.recommended_actions:
            result = await agent.execute_retention_action(risk_profile, action)
            print(f"Action {action} result: {result}")


if __name__ == "__main__":
    asyncio.run(main())
Implement the perception system that gathers customer behavioral indicators:
Build the churn risk assessment logic combining:
Create retention action executors for:
Implement learning mechanisms that:
Your completed agent should:
Here's one effective approach to structuring your solution:
class ChurnPerceptionSystem:
    """Collects the behavioural signals used for churn scoring."""

    async def gather_customer_data(self, customer_id: str) -> Dict[str, Any]:
        """Collect all relevant customer behavioral data.

        Look-back windows: 90 days for transactions and support history,
        30 days for product-usage metrics.
        """
        return {
            "transactions": await self._get_transaction_history(customer_id, days=90),
            "support": await self._get_support_history(customer_id, days=90),
            "usage": await self._get_usage_metrics(customer_id, days=30),
        }
class ChurnRiskScorer:
    """Combines rule-based behavioural signals into a churn risk score.

    Scores three dimensions -- recency, frequency, and support load --
    then applies weights learned from past retention outcomes.
    """

    def __init__(self, business_rules: Dict[str, Any]):
        self.rules = business_rules
        # NOTE(review): _load_learned_weights is defined elsewhere in the
        # project -- confirm it exists before instantiating.
        self.learned_weights = self._load_learned_weights()

    def calculate_risk_score(self, customer_data: Dict[str, Any]) -> tuple[float, list[str]]:
        """Calculate churn risk score and identify contributing factors.

        Returns:
            (score, risk_factors): score clamped to at most 1.0, plus the
            list of factor labels that contributed to it.

        Fix: the return annotation now uses builtin generics (PEP 585,
        Python 3.9+); the original annotated ``Tuple`` without importing
        it from ``typing``, raising NameError at definition time.
        """
        risk_factors = []
        score = 0.0
        # Recency: a month without a transaction is a strong churn signal.
        last_transaction = self._get_last_transaction_date(customer_data["transactions"])
        days_since_activity = (datetime.now() - last_transaction).days
        if days_since_activity > 30:
            risk_factors.append("no_recent_transactions")
            score += 0.3
        # Frequency: compare recent transaction count with the historical norm.
        transaction_count = len(customer_data["transactions"])
        historical_avg = customer_data.get("historical_transaction_frequency", 10)
        if transaction_count < historical_avg * 0.5:
            risk_factors.append("decreased_transaction_frequency")
            score += 0.2
        # Support load: a burst of recent tickets often precedes churn.
        recent_tickets = [t for t in customer_data["support"]
                          if t["created"] > datetime.now() - timedelta(days=30)]
        if len(recent_tickets) > 3:
            risk_factors.append("increased_support_requests")
            score += 0.15
        # Apply learned weights before clamping.
        score = self._apply_learned_adjustments(score, risk_factors)
        return min(1.0, score), risk_factors
Focus on making your implementation realistic—use proper error handling, model plausible customer data patterns, and make sure your agent could actually prevent churn in a real business environment.
Building production AI agents introduces unique challenges that don't exist in traditional software systems. Here are the most critical mistakes to avoid, based on real implementation experience:
The Mistake: Expecting agents to behave identically given the same inputs, or building brittle systems that break when agents make unexpected decisions.
# WRONG: Fragile agent integration (deliberately broken example)
async def process_sales_alert(agent_response):
    # This breaks when the agent returns unexpected format
    if agent_response["action"] == "send_alert":
        recipients = agent_response["recipients"]  # KeyError if missing!
        send_email(recipients, agent_response["message"])
# WRONG: Assuming deterministic behavior (deliberately broken example)
def test_agent_planning():
    observations = [revenue_decline_observation]
    plan = agent.plan(observations)
    # An LLM-backed planner is probabilistic, so asserting one exact
    # first action fails intermittently.
    assert plan[0].action_type == ActionType.ALERT  # This will randomly fail!
Why it fails: AI agents make probabilistic decisions. The same inputs can generate different outputs due to LLM randomness, learned behaviors, or environmental changes. Systems that assume deterministic behavior will fail unpredictably.
The Fix: Build robust interfaces that handle variability gracefully:
# CORRECT: Robust agent integration
async def process_agent_response(agent_response: Dict[str, Any]) -> bool:
    """Process agent response with proper error handling.

    Returns True only when the response was well-formed and its action
    was carried out; malformed or unknown responses are logged and
    rejected instead of raising.
    """
    # Reject malformed responses up front instead of KeyError-ing later.
    missing = [f for f in ("action_type", "confidence", "parameters") if f not in agent_response]
    if missing:
        logger.warning(f"Invalid agent response structure: {agent_response}")
        return False

    action_type = agent_response.get("action_type")
    parameters = agent_response.get("parameters", {})

    if action_type == "send_alert":
        recipients = parameters.get("recipients", ["fallback@company.com"])
        message = parameters.get("message", "Agent alert triggered")
        # Validate recipients before sending.
        valid_recipients = [addr for addr in recipients if "@" in addr]
        if not valid_recipients:
            logger.error(f"No valid recipients in agent response: {recipients}")
            return False
        await send_email(valid_recipients, message)
        return True

    if action_type == "investigate":
        # Handle investigation actions.
        return await execute_investigation(parameters)

    logger.warning(f"Unknown action type from agent: {action_type}")
    return False
# CORRECT: Probabilistic testing
def test_agent_planning_distribution():
    """Test agent behavior statistically rather than deterministically"""
    observations = [revenue_decline_observation]
    # Sample the planner repeatedly to observe its output distribution.
    action_types = []
    for _ in range(20):
        if (plan := agent.plan(observations)):
            action_types.append(plan[0].action_type)
    # Test statistical properties
    assert ActionType.ALERT in action_types, "Agent should sometimes choose alerts"
    assert len(set(action_types)) > 1, "Agent should show some variability"
    # Test that critical situations consistently trigger high-priority actions
    alert_ratio = action_types.count(ActionType.ALERT) / len(action_types)
    assert alert_ratio > 0.6, "Agent should favor alerts for revenue decline"
The Mistake: Allowing agents to execute actions without proper validation, leading to costly mistakes or security vulnerabilities.
# WRONG: No validation on agent actions (deliberately unsafe example)
async def execute_agent_action(action):
    # Blindly trust the agent's decisions
    if action.type == "send_promotional_email":
        for customer_id in action.parameters["customer_list"]:
            await send_email(customer_id, action.parameters["offer"])
# WRONG: No cost controls (deliberately unsafe example)
async def execute_retention_offer(customer_id, discount_amount):
    # Agent could authorize unlimited discounts
    await create_discount_code(customer_id, discount_amount)
Why it fails: Agents can make mistakes, be manipulated through prompt injection, or optimize for metrics in unintended ways. Without validation, these mistakes can be expensive or dangerous.
The Fix: Implement comprehensive validation and safety rails:
class ActionValidator:
"""Validates agent actions before execution"""
def __init__(self, business_rules: Dict[str, Any]):
    # Business-rule thresholds (e.g. max_alerts_per_hour) consulted by
    # the per-action validators.
    self.rules = business_rules
    # NOTE(review): defaultdict requires `from collections import defaultdict`,
    # which this listing does not show -- confirm it is imported.
    self.recent_actions = defaultdict(list)  # Track recent actions per customer
async def validate_action(self, action: PlannedAction) -> Tuple[bool, str]:
    """Validate action against business rules and safety constraints.

    Returns (ok, reason): ok is False when the action is malformed or
    fails its type-specific validation; unrestricted types pass through.
    """
    # Structural sanity check first.
    if not self._is_action_well_formed(action):
        return False, "Action missing required fields"
    # Dispatch to the type-specific validator.
    if action.action_type == ActionType.ALERT:
        return await self._validate_alert_action(action)
    if action.action_type == ActionType.INVESTIGATE:
        return await self._validate_investigation_action(action)
    return True, "Action type not restricted"
async def _validate_alert_action(self, action: PlannedAction) -> Tuple[bool, str]:
    """Validate alert actions for safety and appropriateness.

    Checks, in order: rate limit, recipient presence and validity,
    severity level, and message content; on success the alert is
    recorded for future rate-limiting.
    """
    params = action.parameters
    # Rate limiting: prevent spam.
    cutoff = datetime.now() - timedelta(hours=1)
    recent_alerts = [entry for entry in self.recent_actions["alerts"]
                     if entry["timestamp"] > cutoff]
    if len(recent_alerts) >= self.rules.get("max_alerts_per_hour", 5):
        return False, "Alert rate limit exceeded"
    # Recipient validation.
    recipients = params.get("recipients", [])
    if not recipients:
        return False, "Alert requires at least one recipient"
    valid_recipients = [addr for addr in recipients if self._is_valid_email(addr)]
    if not valid_recipients:
        return False, "No valid recipients found"
    # Severity validation.
    severity = params.get("severity", "medium")
    if severity not in ("low", "medium", "high"):
        return False, f"Invalid severity level: {severity}"
    # Content validation (prevent sensitive data leakage).
    message = params.get("message", "")
    if self._contains_sensitive_data(message):
        return False, "Alert message contains potentially sensitive information"
    # Record this validation for rate limiting.
    self.recent_actions["alerts"].append({
        "timestamp": datetime.now(),
        "recipients": recipients,
        "severity": severity,
    })
    return True, "Alert action validated"
def _contains_sensitive_data(self, text: str) -> bool:
Learning Path: RAG & AI Agents