Wicked Smart Data
LearnArticlesAbout
Sign InSign Up
LearnArticlesAboutContact
Sign InSign Up
Wicked Smart Data

The go-to platform for professionals who want to master data, automation, and AI — from Excel fundamentals to cutting-edge machine learning.

Platform

  • Learning Paths
  • Articles
  • About
  • Contact

Connect

  • Contact Us
  • RSS Feed

© 2026 Wicked Smart Data. All rights reserved.

Privacy PolicyTerms of Service
All Articles
Hero image for AI Agents: From Concept to Implementation

AI Agents: From Concept to Implementation

AI & Machine Learning🔥 Expert30 min readMar 23, 2026Updated Mar 24, 2026
Table of Contents
  • Prerequisites
  • The Agent Architecture: Beyond Simple Automation
  • Core Agent Components
  • Perception: Making Sense of Complex Environments
  • Planning: From Observations to Actions
  • Action Execution: Safe Operations in Production
  • Memory: Learning and Context Management
  • Building Your First Production Agent
  • Hands-On Exercise: Building a Customer Churn Prevention Agent
  • Exercise Requirements
  • Starter Code Structure
  • Your Implementation Tasks

AI Agents: From Concept to Implementation

You're leading a data science team at a mid-sized e-commerce company. Every morning, your Slack channels fill with requests: "Can you analyze why conversion rates dropped in the Northeast?" "We need the latest customer segmentation for the marketing campaign launching tomorrow." "The inventory predictions seem off—can you investigate?" Each request requires pulling data from multiple systems, running analyses, generating reports, and often following up with clarifying questions. Your team spends more time on routine analytical tasks than on strategic insights.

This scenario is exactly what AI agents are designed to solve. An AI agent isn't just a chatbot that answers questions—it's an autonomous system that can perceive its environment, make decisions, take actions, and adapt its behavior to achieve specific goals. In your case, that could be an agent that automatically detects anomalies in conversion rates, pulls relevant data from your warehouse, performs root cause analysis, and delivers actionable insights to stakeholders without any human intervention.

By the end of this lesson, you'll understand how to architect, build, and deploy production-ready AI agents that can transform how your organization handles complex, multi-step analytical workflows.

What you'll learn:

  • The fundamental architecture patterns that make AI agents different from traditional automation
  • How to design agent perception systems that integrate with real-world data sources and APIs
  • Planning and reasoning frameworks that enable agents to break down complex tasks autonomously
  • Action execution patterns for safe, reliable agent operations in production environments
  • Memory and learning systems that allow agents to improve performance over time
  • Production deployment strategies including monitoring, error handling, and human oversight mechanisms

Prerequisites

To get the most from this lesson, you should have:

  • Solid Python programming experience, particularly with async/await patterns
  • Familiarity with REST APIs and data integration concepts
  • Basic understanding of machine learning model inference (you don't need to train models, but should understand how to call them)
  • Experience with database operations and data pipeline concepts
  • Some exposure to LLMs (Large Language Models) and their capabilities, though we'll cover the agent-specific aspects

You'll need Python 3.9+ installed with the ability to install packages via pip. We'll use OpenAI's API for some examples, but the patterns apply to any LLM provider.

The Agent Architecture: Beyond Simple Automation

Traditional automation follows predetermined workflows: if X happens, do Y. AI agents operate fundamentally differently—they observe their environment, reason about what they perceive, formulate plans, and execute actions that adapt based on outcomes. This isn't just a more sophisticated if-then-else statement; it's a qualitatively different approach to problem-solving.

Core Agent Components

Every effective AI agent implements four core subsystems that work in continuous loops:

Perception: How the agent observes and interprets its environment Planning: How it decides what to do based on what it perceives Action: How it executes decisions in the real world Memory: How it learns from experience and maintains context

Let's examine each component through the lens of building a practical business intelligence agent.

Perception: Making Sense of Complex Environments

An agent's perception system determines what information it can access and how it interprets that information. Unlike humans who perceive through senses, AI agents perceive through APIs, databases, files, and other digital interfaces.

import asyncio
import json
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
import pandas as pd
import httpx

@dataclass
class Observation:
    """Represents a single piece of environmental information"""
    source: str
    timestamp: datetime
    data: Dict[str, Any]
    confidence: float
    context: Dict[str, Any]

class PerceptionSystem:
    """Manages how the agent observes its environment"""
    
    def __init__(self, data_sources: Dict[str, Any]):
        self.data_sources = data_sources
        self.observation_history = []
        self.client = httpx.AsyncClient(timeout=30.0)
    
    async def observe_sales_metrics(self) -> Observation:
        """Perceive current sales performance metrics"""
        query = """
        SELECT 
            DATE(order_date) as date,
            region,
            SUM(revenue) as daily_revenue,
            COUNT(DISTINCT customer_id) as unique_customers,
            AVG(order_value) as avg_order_value
        FROM sales_orders 
        WHERE order_date >= CURRENT_DATE - INTERVAL 7 DAYS
        GROUP BY DATE(order_date), region
        ORDER BY date DESC, region
        """
        
        # Simulate database query
        raw_data = await self._execute_query(query)
        
        # Transform raw data into meaningful observations
        processed_data = self._analyze_sales_trends(raw_data)
        
        return Observation(
            source="sales_database",
            timestamp=datetime.now(),
            data=processed_data,
            confidence=0.95,  # High confidence in database data
            context={"query": query, "row_count": len(raw_data)}
        )
    
    async def observe_customer_behavior(self) -> Observation:
        """Perceive customer engagement patterns"""
        # Check website analytics API
        analytics_response = await self.client.get(
            f"{self.data_sources['analytics_api']}/engagement",
            headers={"Authorization": f"Bearer {self.data_sources['api_key']}"}
        )
        
        if analytics_response.status_code != 200:
            return Observation(
                source="web_analytics",
                timestamp=datetime.now(),
                data={},
                confidence=0.0,  # No confidence when data unavailable
                context={"error": f"API returned {analytics_response.status_code}"}
            )
        
        analytics_data = analytics_response.json()
        
        # Interpret the raw analytics data
        behavior_insights = {
            "bounce_rate_change": self._calculate_bounce_rate_trend(analytics_data),
            "conversion_funnel": self._analyze_conversion_funnel(analytics_data),
            "traffic_sources": analytics_data.get("traffic_sources", {}),
            "user_segments": self._segment_user_behavior(analytics_data)
        }
        
        return Observation(
            source="web_analytics",
            timestamp=datetime.now(),
            data=behavior_insights,
            confidence=0.85,  # Moderate confidence due to sampling
            context={"api_response_time": analytics_response.elapsed.total_seconds()}
        )
    
    def _analyze_sales_trends(self, raw_data: List[Dict]) -> Dict[str, Any]:
        """Convert raw sales data into trend insights"""
        df = pd.DataFrame(raw_data)
        
        # Calculate week-over-week growth
        current_week = df[df['date'] >= (datetime.now() - timedelta(days=7))]
        previous_week = df[(df['date'] >= (datetime.now() - timedelta(days=14))) & 
                          (df['date'] < (datetime.now() - timedelta(days=7)))]
        
        current_revenue = current_week['daily_revenue'].sum()
        previous_revenue = previous_week['daily_revenue'].sum()
        
        growth_rate = ((current_revenue - previous_revenue) / previous_revenue) if previous_revenue > 0 else 0
        
        return {
            "revenue_trend": {
                "current_week": current_revenue,
                "previous_week": previous_revenue,
                "growth_rate": growth_rate,
                "trend_direction": "increasing" if growth_rate > 0.05 else "decreasing" if growth_rate < -0.05 else "stable"
            },
            "regional_performance": df.groupby('region')['daily_revenue'].sum().to_dict(),
            "customer_metrics": {
                "total_customers": df['unique_customers'].sum(),
                "avg_order_value": df['avg_order_value'].mean()
            }
        }

This perception system doesn't just collect data—it interprets it. Notice how observe_sales_metrics() doesn't return raw database rows; it returns processed insights like growth rates and trend directions. The agent perceives meaning, not just data points.

Pro Tip: Always include confidence scores in observations. Real-world data sources have varying reliability, and your agent needs to reason about uncertainty. A database query might have 95% confidence, while a third-party API during maintenance might have much lower confidence.

Planning: From Observations to Actions

Once an agent perceives its environment, it needs to decide what to do. This is where traditional automation breaks down—it can only follow predefined rules. AI agents use reasoning to formulate dynamic plans based on what they observe.

from enum import Enum
from typing import List, Tuple, Union
import openai

class ActionType(Enum):
    INVESTIGATE = "investigate"
    ALERT = "alert"
    REPORT = "report"
    OPTIMIZE = "optimize"
    ESCALATE = "escalate"
    WAIT = "wait"

@dataclass
class PlannedAction:
    action_type: ActionType
    target: str
    parameters: Dict[str, Any]
    priority: int  # 1=highest, 5=lowest
    estimated_duration: int  # seconds
    dependencies: List[str]  # other actions that must complete first

class PlanningSystem:
    """Handles agent decision-making and action planning"""
    
    def __init__(self, llm_client, business_rules: Dict[str, Any]):
        self.llm_client = llm_client
        self.business_rules = business_rules
        self.active_plans = []
    
    async def formulate_plan(self, observations: List[Observation]) -> List[PlannedAction]:
        """Create a plan based on current observations"""
        
        # First, apply deterministic business rules
        rule_based_actions = self._apply_business_rules(observations)
        
        # Then, use LLM reasoning for complex scenarios
        context_summary = self._summarize_observations(observations)
        
        llm_prompt = f"""
        You are a business intelligence agent analyzing the following situation:
        
        {context_summary}
        
        Current business rules triggered: {[action.action_type.value for action in rule_based_actions]}
        
        Based on this information, what additional actions should be taken? Consider:
        1. Root cause analysis needs
        2. Stakeholder notification requirements  
        3. Preventive measures
        4. Follow-up investigations
        
        Respond with a JSON array of actions, each with:
        - action_type: one of {[e.value for e in ActionType]}
        - target: what/who the action applies to
        - parameters: specific details for execution
        - priority: 1-5 (1=urgent)
        - reasoning: why this action is needed
        """
        
        response = await self.llm_client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": llm_prompt}],
            temperature=0.3  # Lower temperature for more consistent planning
        )
        
        llm_actions = self._parse_llm_response(response.choices[0].message.content)
        
        # Combine rule-based and LLM-generated actions
        all_actions = rule_based_actions + llm_actions
        
        # Optimize the plan (remove conflicts, sequence dependencies)
        optimized_plan = self._optimize_action_sequence(all_actions)
        
        return optimized_plan
    
    def _apply_business_rules(self, observations: List[Observation]) -> List[PlannedAction]:
        """Apply deterministic business logic"""
        actions = []
        
        for obs in observations:
            if obs.source == "sales_database":
                revenue_data = obs.data.get("revenue_trend", {})
                growth_rate = revenue_data.get("growth_rate", 0)
                
                # Rule: Alert on significant revenue drops
                if growth_rate < -0.15:  # 15% decline
                    actions.append(PlannedAction(
                        action_type=ActionType.ALERT,
                        target="revenue_drop",
                        parameters={
                            "severity": "high",
                            "growth_rate": growth_rate,
                            "recipients": ["sales_director", "ceo"],
                            "message": f"Revenue declined {growth_rate:.1%} week-over-week"
                        },
                        priority=1,
                        estimated_duration=30,
                        dependencies=[]
                    ))
                    
                    # Follow up with investigation
                    actions.append(PlannedAction(
                        action_type=ActionType.INVESTIGATE,
                        target="revenue_decline_causes",
                        parameters={
                            "focus_areas": ["regional_breakdown", "product_mix", "customer_segments"],
                            "time_window": "last_14_days"
                        },
                        priority=2,
                        estimated_duration=300,
                        dependencies=["alert_revenue_drop"]
                    ))
        
        return actions
    
    def _optimize_action_sequence(self, actions: List[PlannedAction]) -> List[PlannedAction]:
        """Optimize action ordering and remove conflicts"""
        # Sort by priority, then handle dependencies
        sorted_actions = sorted(actions, key=lambda x: x.priority)
        
        # Simple dependency resolution (in production, use a proper DAG)
        optimized = []
        completed = set()
        
        while sorted_actions:
            for i, action in enumerate(sorted_actions):
                if all(dep in completed for dep in action.dependencies):
                    optimized.append(action)
                    completed.add(f"{action.action_type.value}_{action.target}")
                    sorted_actions.pop(i)
                    break
            else:
                # No action without unsatisfied dependencies - possible circular dependency
                optimized.extend(sorted_actions)  # Add remaining actions anyway
                break
        
        return optimized

The planning system combines deterministic business rules with LLM reasoning. Business rules handle clear-cut scenarios (revenue drops below threshold → send alert), while the LLM handles nuanced situations that require contextual understanding.

Notice how actions include dependencies. This prevents the agent from sending a detailed analysis report before it's actually performed the investigation. Real agents need to sequence their actions logically.

Action Execution: Safe Operations in Production

Planning means nothing without reliable execution. Agent actions often have real-world consequences—sending alerts to executives, making API calls to external systems, or triggering automated processes. The execution system must be robust, observable, and safe.

import asyncio
from typing import Callable, Optional
import logging
from contextlib import asynccontextmanager

class ActionExecutionError(Exception):
    """Raised when action execution fails"""
    pass

class ActionExecutor:
    """Handles safe execution of planned actions"""
    
    def __init__(self, integrations: Dict[str, Any], dry_run: bool = False):
        self.integrations = integrations
        self.dry_run = dry_run
        self.execution_history = []
        self.logger = logging.getLogger(__name__)
    
    async def execute_action(self, action: PlannedAction) -> Dict[str, Any]:
        """Execute a single planned action with full error handling"""
        
        execution_id = f"{action.action_type.value}_{action.target}_{datetime.now().isoformat()}"
        
        self.logger.info(f"Starting execution: {execution_id}")
        
        if self.dry_run:
            return await self._simulate_action(action)
        
        try:
            # Pre-execution validation
            await self._validate_action(action)
            
            # Execute based on action type
            result = await self._dispatch_action(action)
            
            # Post-execution verification
            verification_result = await self._verify_action_result(action, result)
            
            execution_record = {
                "execution_id": execution_id,
                "action": action,
                "result": result,
                "verification": verification_result,
                "status": "success",
                "timestamp": datetime.now(),
                "duration": verification_result.get("duration", 0)
            }
            
            self.execution_history.append(execution_record)
            self.logger.info(f"Successfully executed: {execution_id}")
            
            return execution_record
            
        except Exception as e:
            error_record = {
                "execution_id": execution_id,
                "action": action,
                "error": str(e),
                "status": "failed",
                "timestamp": datetime.now()
            }
            
            self.execution_history.append(error_record)
            self.logger.error(f"Failed to execute {execution_id}: {e}")
            
            # Attempt recovery
            await self._handle_execution_failure(action, e)
            
            raise ActionExecutionError(f"Action execution failed: {e}")
    
    async def _dispatch_action(self, action: PlannedAction) -> Dict[str, Any]:
        """Route action to appropriate executor based on type"""
        
        dispatch_map = {
            ActionType.ALERT: self._execute_alert,
            ActionType.INVESTIGATE: self._execute_investigation,
            ActionType.REPORT: self._execute_report,
            ActionType.OPTIMIZE: self._execute_optimization,
            ActionType.ESCALATE: self._execute_escalation
        }
        
        executor = dispatch_map.get(action.action_type)
        if not executor:
            raise ActionExecutionError(f"No executor for action type: {action.action_type}")
        
        return await executor(action)
    
    async def _execute_alert(self, action: PlannedAction) -> Dict[str, Any]:
        """Execute alert actions (notifications, messages)"""
        params = action.parameters
        
        # Send to Slack
        if "slack_channel" in params or "recipients" in params:
            slack_result = await self._send_slack_alert(
                message=params.get("message", "Alert triggered"),
                severity=params.get("severity", "medium"),
                recipients=params.get("recipients", []),
                channel=params.get("slack_channel", "#alerts")
            )
        
        # Send email for high-severity alerts
        if params.get("severity") == "high":
            email_result = await self._send_email_alert(
                subject=f"High Priority Alert: {action.target}",
                body=params.get("message", ""),
                recipients=params.get("recipients", [])
            )
        
        return {
            "alert_type": "multi_channel",
            "channels_used": ["slack", "email"] if params.get("severity") == "high" else ["slack"],
            "recipients_notified": len(params.get("recipients", [])),
            "timestamp": datetime.now().isoformat()
        }
    
    async def _execute_investigation(self, action: PlannedAction) -> Dict[str, Any]:
        """Execute investigation actions (data analysis, root cause analysis)"""
        params = action.parameters
        focus_areas = params.get("focus_areas", [])
        time_window = params.get("time_window", "last_7_days")
        
        investigation_results = {}
        
        for area in focus_areas:
            if area == "regional_breakdown":
                investigation_results[area] = await self._analyze_regional_performance(time_window)
            elif area == "product_mix":
                investigation_results[area] = await self._analyze_product_performance(time_window)
            elif area == "customer_segments":
                investigation_results[area] = await self._analyze_customer_segments(time_window)
        
        # Generate summary insights using LLM
        summary = await self._generate_investigation_summary(investigation_results)
        
        return {
            "investigation_id": f"inv_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
            "focus_areas": focus_areas,
            "detailed_results": investigation_results,
            "summary_insights": summary,
            "confidence_score": self._calculate_investigation_confidence(investigation_results)
        }
    
    async def _analyze_regional_performance(self, time_window: str) -> Dict[str, Any]:
        """Specific analysis for regional performance issues"""
        # This would connect to your actual data warehouse
        query = f"""
        SELECT 
            region,
            SUM(revenue) as total_revenue,
            COUNT(DISTINCT order_id) as order_count,
            AVG(order_value) as avg_order_value,
            COUNT(DISTINCT customer_id) as unique_customers
        FROM sales_orders 
        WHERE order_date >= CURRENT_DATE - INTERVAL '{time_window}' 
        GROUP BY region
        ORDER BY total_revenue DESC
        """
        
        # Simulate database results
        regional_data = [
            {"region": "Northeast", "total_revenue": 125000, "order_count": 1250, "avg_order_value": 100, "unique_customers": 800},
            {"region": "Southeast", "total_revenue": 98000, "order_count": 1100, "avg_order_value": 89, "unique_customers": 750},
            {"region": "West", "total_revenue": 156000, "order_count": 1400, "avg_order_value": 111, "unique_customers": 950},
        ]
        
        # Calculate insights
        total_revenue = sum(r["total_revenue"] for r in regional_data)
        
        insights = {
            "regional_breakdown": regional_data,
            "underperforming_regions": [
                r for r in regional_data 
                if r["total_revenue"] < (total_revenue / len(regional_data)) * 0.8
            ],
            "key_findings": [
                "Northeast region showing 23% decline in average order value",
                "West region outperforming with highest customer acquisition"
            ]
        }
        
        return insights
    
    async def _send_slack_alert(self, message: str, severity: str, recipients: List[str], channel: str) -> Dict[str, Any]:
        """Send alert to Slack workspace"""
        
        # Format message based on severity
        severity_emoji = {"high": "🚨", "medium": "⚠️", "low": "ℹ️"}
        formatted_message = f"{severity_emoji.get(severity, 'ℹ️')} **{severity.upper()} ALERT**\n\n{message}"
        
        # In production, use actual Slack API
        slack_payload = {
            "channel": channel,
            "text": formatted_message,
            "username": "BusinessIntelligenceAgent",
            "icon_emoji": ":robot_face:"
        }
        
        # Simulate API call
        await asyncio.sleep(0.1)  # Simulate network delay
        
        return {
            "slack_channel": channel,
            "message_sent": True,
            "recipients_mentioned": recipients,
            "timestamp": datetime.now().isoformat()
        }

    @asynccontextmanager
    async def execution_context(self, action: PlannedAction):
        """Context manager for safe action execution with cleanup"""
        execution_start = datetime.now()
        temp_resources = []
        
        try:
            # Pre-execution setup
            if action.action_type == ActionType.INVESTIGATE:
                # Set up temporary analysis workspace
                temp_workspace = f"/tmp/investigation_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
                temp_resources.append(temp_workspace)
            
            yield temp_resources
            
        finally:
            # Cleanup regardless of success/failure
            execution_duration = (datetime.now() - execution_start).total_seconds()
            self.logger.info(f"Action {action.action_type.value} completed in {execution_duration:.2f}s")
            
            # Clean up temporary resources
            for resource in temp_resources:
                try:
                    # Cleanup logic here (delete temp files, close connections, etc.)
                    pass
                except Exception as cleanup_error:
                    self.logger.warning(f"Failed to cleanup {resource}: {cleanup_error}")

The execution system includes several critical production features:

Circuit breakers: The _validate_action() method (implementation omitted for brevity) checks whether it's safe to execute an action. For example, it might prevent sending alerts if too many have been sent recently.

Idempotency: Actions should be safe to retry. The execution_id helps track whether an action was already executed.

Resource management: The execution_context ensures cleanup happens even if actions fail partway through.

Observability: Every action execution is logged with structured data, making it easy to debug issues and analyze agent behavior.

Memory: Learning and Context Management

The most sophisticated planning and execution systems are limited without memory. Agents need to remember what they've learned, what actions they've taken, and how those actions performed. Memory isn't just storage—it's the foundation for improvement and contextual decision-making.

from datetime import datetime, timedelta
import sqlite3
import json
from typing import Dict, List, Any, Optional, Tuple
import numpy as np
from collections import defaultdict, deque

class MemorySystem:
    """Manages agent memory, learning, and context"""
    
    def __init__(self, db_path: str = "agent_memory.db", max_working_memory: int = 100):
        self.db_path = db_path
        self.max_working_memory = max_working_memory
        
        # Working memory for recent, frequently accessed information
        self.working_memory = deque(maxlen=max_working_memory)
        
        # Initialize persistent storage
        self._initialize_database()
        
        # Performance tracking for continuous learning
        self.action_outcomes = defaultdict(list)
        
    def _initialize_database(self):
        """Set up persistent memory storage"""
        with sqlite3.connect(self.db_path) as conn:
            conn.executescript("""
                CREATE TABLE IF NOT EXISTS observations (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT NOT NULL,
                    source TEXT NOT NULL,
                    data TEXT NOT NULL,
                    confidence REAL NOT NULL,
                    context TEXT NOT NULL
                );
                
                CREATE TABLE IF NOT EXISTS actions_taken (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT NOT NULL,
                    action_type TEXT NOT NULL,
                    target TEXT NOT NULL,
                    parameters TEXT NOT NULL,
                    outcome TEXT NOT NULL,
                    effectiveness_score REAL
                );
                
                CREATE TABLE IF NOT EXISTS learned_patterns (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    pattern_type TEXT NOT NULL,
                    pattern_data TEXT NOT NULL,
                    confidence REAL NOT NULL,
                    last_updated TEXT NOT NULL,
                    usage_count INTEGER DEFAULT 0
                );
                
                CREATE TABLE IF NOT EXISTS context_states (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT NOT NULL,
                    state_data TEXT NOT NULL,
                    triggers TEXT NOT NULL,
                    outcomes TEXT NOT NULL
                );
                
                CREATE INDEX IF NOT EXISTS idx_observations_timestamp ON observations(timestamp);
                CREATE INDEX IF NOT EXISTS idx_actions_timestamp ON actions_taken(timestamp);
                CREATE INDEX IF NOT EXISTS idx_patterns_type ON learned_patterns(pattern_type);
            """)
    
    async def store_observation(self, observation: Observation) -> None:
        """Store observation in both working and persistent memory"""
        
        # Add to working memory
        self.working_memory.append({
            "type": "observation",
            "timestamp": observation.timestamp,
            "data": observation
        })
        
        # Store in persistent memory
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                INSERT INTO observations (timestamp, source, data, confidence, context)
                VALUES (?, ?, ?, ?, ?)
            """, (
                observation.timestamp.isoformat(),
                observation.source,
                json.dumps(observation.data),
                observation.confidence,
                json.dumps(observation.context)
            ))
    
    async def store_action_outcome(self, action: PlannedAction, outcome: Dict[str, Any], effectiveness_score: float) -> None:
        """Store action results for learning"""
        
        # Update working memory
        self.working_memory.append({
            "type": "action_outcome",
            "timestamp": datetime.now(),
            "action": action,
            "outcome": outcome,
            "effectiveness": effectiveness_score
        })
        
        # Track for pattern learning
        self.action_outcomes[action.action_type.value].append({
            "parameters": action.parameters,
            "outcome": outcome,
            "effectiveness": effectiveness_score,
            "timestamp": datetime.now()
        })
        
        # Persistent storage
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                INSERT INTO actions_taken (timestamp, action_type, target, parameters, outcome, effectiveness_score)
                VALUES (?, ?, ?, ?, ?, ?)
            """, (
                datetime.now().isoformat(),
                action.action_type.value,
                action.target,
                json.dumps(action.parameters),
                json.dumps(outcome),
                effectiveness_score
            ))
    
    async def learn_patterns(self) -> Dict[str, Any]:
        """Analyze stored experiences to learn patterns"""
        
        patterns_learned = {}
        
        # Learn from action effectiveness
        for action_type, outcomes in self.action_outcomes.items():
            if len(outcomes) >= 5:  # Minimum samples for pattern learning
                pattern = self._analyze_action_patterns(action_type, outcomes)
                if pattern["confidence"] > 0.7:  # Only store high-confidence patterns
                    patterns_learned[f"action_effectiveness_{action_type}"] = pattern
                    await self._store_learned_pattern(f"action_effectiveness_{action_type}", pattern)
        
        # Learn situational context patterns
        context_patterns = await self._analyze_context_patterns()
        patterns_learned.update(context_patterns)
        
        return patterns_learned
    
    def _analyze_action_patterns(self, action_type: str, outcomes: List[Dict]) -> Dict[str, Any]:
        """Analyze effectiveness patterns for specific action types"""
        
        # Group by similar parameters
        parameter_groups = defaultdict(list)
        
        for outcome in outcomes:
            # Create a simplified parameter signature for grouping
            param_signature = self._create_parameter_signature(outcome["parameters"])
            parameter_groups[param_signature].append(outcome["effectiveness"])
        
        # Find the most effective parameter patterns
        best_patterns = {}
        for signature, effectiveness_scores in parameter_groups.items():
            if len(effectiveness_scores) >= 3:  # Minimum samples
                avg_effectiveness = np.mean(effectiveness_scores)
                std_effectiveness = np.std(effectiveness_scores)
                
                best_patterns[signature] = {
                    "avg_effectiveness": avg_effectiveness,
                    "consistency": 1.0 / (1.0 + std_effectiveness),  # Lower std = higher consistency
                    "sample_count": len(effectiveness_scores)
                }
        
        # Find the overall best pattern
        if best_patterns:
            best_signature = max(best_patterns.keys(), 
                               key=lambda x: best_patterns[x]["avg_effectiveness"] * best_patterns[x]["consistency"])
            
            return {
                "action_type": action_type,
                "best_parameters": best_signature,
                "effectiveness": best_patterns[best_signature]["avg_effectiveness"],
                "confidence": min(0.95, best_patterns[best_signature]["consistency"] * 
                                (best_patterns[best_signature]["sample_count"] / 10)),
                "learned_at": datetime.now().isoformat()
            }
        
        return {"confidence": 0.0}
    
    def _create_parameter_signature(self, parameters: Dict[str, Any]) -> str:
        """Create a simplified signature for parameter grouping"""
        # Extract key characteristics rather than exact values
        signature_parts = []
        
        if "severity" in parameters:
            signature_parts.append(f"severity_{parameters['severity']}")
        
        if "recipients" in parameters:
            signature_parts.append(f"recipient_count_{len(parameters['recipients'])}")
        
        if "focus_areas" in parameters:
            signature_parts.append(f"focus_areas_{len(parameters['focus_areas'])}")
        
        return "_".join(signature_parts) if signature_parts else "default"
    
    async def retrieve_relevant_context(self, current_situation: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Retrieve relevant past experiences for current situation"""
        
        relevant_memories = []
        
        # Check working memory first (fastest access)
        for memory_item in reversed(self.working_memory):  # Most recent first
            relevance_score = self._calculate_relevance(memory_item, current_situation)
            if relevance_score > 0.5:
                relevant_memories.append({
                    "memory": memory_item,
                    "relevance": relevance_score,
                    "recency": 1.0  # Working memory is recent by definition
                })
        
        # Query persistent memory for similar situations
        with sqlite3.connect(self.db_path) as conn:
            # Look for similar observations
            recent_observations = conn.execute("""
                SELECT timestamp, source, data, confidence, context
                FROM observations 
                WHERE timestamp > datetime('now', '-30 days')
                ORDER BY timestamp DESC
                LIMIT 50
            """).fetchall()
            
            for obs_row in recent_observations:
                obs_data = json.loads(obs_row[2])
                relevance = self._calculate_data_relevance(obs_data, current_situation)
                
                if relevance > 0.4:
                    age_days = (datetime.now() - datetime.fromisoformat(obs_row[0])).days
                    recency_factor = max(0.1, 1.0 - (age_days / 30))  # Decay over 30 days
                    
                    relevant_memories.append({
                        "memory": {
                            "type": "historical_observation",
                            "timestamp": obs_row[0],
                            "source": obs_row[1],
                            "data": obs_data,
                            "confidence": obs_row[3]
                        },
                        "relevance": relevance,
                        "recency": recency_factor
                    })
        
        # Sort by combined relevance and recency
        relevant_memories.sort(key=lambda x: x["relevance"] * x["recency"], reverse=True)
        
        return relevant_memories[:10]  # Return top 10 most relevant memories
    
    def _calculate_relevance(self, memory_item: Dict[str, Any], current_situation: Dict[str, Any]) -> float:
        """Calculate how relevant a memory is to current situation"""
        
        if memory_item["type"] == "observation":
            return self._calculate_data_relevance(memory_item["data"].data, current_situation)
        elif memory_item["type"] == "action_outcome":
            # Check if similar action might be relevant
            action = memory_item["action"]
            if action.action_type.value in current_situation.get("potential_actions", []):
                return 0.8
        
        return 0.0
    
    def _calculate_data_relevance(self, memory_data: Dict[str, Any], current_data: Dict[str, Any]) -> float:
        """Calculate similarity between data structures"""
        
        # Simple relevance based on shared keys and similar values
        shared_keys = set(memory_data.keys()) & set(current_data.keys())
        if not shared_keys:
            return 0.0
        
        relevance_scores = []
        
        for key in shared_keys:
            if isinstance(memory_data[key], (int, float)) and isinstance(current_data[key], (int, float)):
                # Numerical similarity
                if memory_data[key] != 0:
                    similarity = 1.0 - abs(memory_data[key] - current_data[key]) / abs(memory_data[key])
                    relevance_scores.append(max(0, similarity))
            elif isinstance(memory_data[key], str) and isinstance(current_data[key], str):
                # String similarity (simple approach)
                if memory_data[key] == current_data[key]:
                    relevance_scores.append(1.0)
                elif memory_data[key].lower() in current_data[key].lower() or current_data[key].lower() in memory_data[key].lower():
                    relevance_scores.append(0.5)
        
        return np.mean(relevance_scores) if relevance_scores else 0.0

    async def get_performance_insights(self) -> Dict[str, Any]:
        """Analyze agent performance over time"""
        
        with sqlite3.connect(self.db_path) as conn:
            # Analyze action effectiveness trends
            recent_actions = conn.execute("""
                SELECT action_type, effectiveness_score, timestamp
                FROM actions_taken 
                WHERE timestamp > datetime('now', '-30 days')
                ORDER BY timestamp DESC
            """).fetchall()
            
            # Calculate trends by action type
            action_trends = defaultdict(list)
            for action_type, effectiveness, timestamp in recent_actions:
                action_trends[action_type].append({
                    "effectiveness": effectiveness,
                    "timestamp": datetime.fromisoformat(timestamp)
                })
            
            # Calculate improvement rates
            insights = {}
            for action_type, records in action_trends.items():
                if len(records) >= 5:
                    # Sort by timestamp
                    records.sort(key=lambda x: x["timestamp"])
                    
                    # Calculate trend
                    effectiveness_scores = [r["effectiveness"] for r in records]
                    time_indices = list(range(len(effectiveness_scores)))
                    
                    # Simple linear trend
                    correlation = np.corrcoef(time_indices, effectiveness_scores)[0, 1] if len(effectiveness_scores) > 1 else 0
                    
                    insights[action_type] = {
                        "average_effectiveness": np.mean(effectiveness_scores),
                        "improvement_trend": correlation,  # Positive = improving
                        "consistency": 1.0 - np.std(effectiveness_scores),
                        "sample_count": len(records)
                    }
            
            return {
                "performance_by_action": insights,
                "overall_trend": np.mean([insight["improvement_trend"] for insight in insights.values()]) if insights else 0,
                "total_actions": len(recent_actions),
                "analysis_period": "30_days"
            }

This memory system provides several key capabilities:

Working Memory: Fast access to recent information for immediate context.

Pattern Learning: The system automatically identifies which parameters lead to more effective actions.

Context Retrieval: When facing a new situation, the agent can recall similar past experiences.

Performance Tracking: The agent monitors its own effectiveness and can identify areas for improvement.

Important: Memory systems can become performance bottlenecks. The working memory with a maximum size prevents unbounded growth, while the relevance scoring ensures the agent focuses on the most pertinent past experiences rather than everything it has ever seen.

Building Your First Production Agent

Now that we understand the core components, let's build a complete business intelligence agent that can autonomously monitor sales performance, investigate issues, and alert stakeholders.

import asyncio
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
import logging

class BusinessIntelligenceAgent:
    """Complete autonomous business intelligence agent"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.logger = logging.getLogger(__name__)
        
        # Initialize subsystems
        self.perception = PerceptionSystem(config["data_sources"])
        self.planning = PlanningSystem(config["llm_client"], config["business_rules"])
        self.execution = ActionExecutor(config["integrations"], dry_run=config.get("dry_run", False))
        self.memory = MemorySystem(config.get("db_path", "agent_memory.db"))
        
        # Agent state
        self.running = False
        self.cycle_count = 0
        self.last_full_analysis = None
        
    async def start(self) -> None:
        """Start the agent's main execution loop"""
        self.running = True
        self.logger.info("Business Intelligence Agent starting...")
        
        try:
            while self.running:
                cycle_start = datetime.now()
                
                # Execute one complete agent cycle
                await self.execute_cycle()
                
                # Learn from recent experiences
                if self.cycle_count % 10 == 0:  # Learn every 10 cycles
                    learned_patterns = await self.memory.learn_patterns()
                    self.logger.info(f"Learned {len(learned_patterns)} new patterns")
                
                # Calculate sleep time to maintain cycle frequency
                cycle_duration = (datetime.now() - cycle_start).total_seconds()
                sleep_time = max(0, self.config.get("cycle_interval", 300) - cycle_duration)
                
                self.logger.info(f"Cycle {self.cycle_count} completed in {cycle_duration:.2f}s, sleeping {sleep_time:.2f}s")
                
                if sleep_time > 0:
                    await asyncio.sleep(sleep_time)
                
                self.cycle_count += 1
                
        except Exception as e:
            self.logger.error(f"Agent failed: {e}")
            raise
        finally:
            self.logger.info("Business Intelligence Agent stopped")
    
    async def execute_cycle(self) -> Dict[str, Any]:
        """Execute one complete perception -> planning -> action cycle"""
        
        cycle_results = {
            "cycle_id": self.cycle_count,
            "timestamp": datetime.now(),
            "observations": [],
            "plans": [],
            "actions": [],
            "errors": []
        }
        
        try:
            # PERCEPTION PHASE
            observations = await self.gather_observations()
            cycle_results["observations"] = observations
            
            # Store observations in memory
            for obs in observations:
                await self.memory.store_observation(obs)
            
            # PLANNING PHASE
            # Retrieve relevant context from memory
            current_situation = self._summarize_current_situation(observations)
            relevant_context = await self.memory.retrieve_relevant_context(current_situation)
            
            # Create plan based on observations and context
            planned_actions = await self.planning.formulate_plan(observations)
            
            # Enhance plan with learned patterns
            enhanced_actions = await self._enhance_plan_with_memory(planned_actions, relevant_context)
            cycle_results["plans"] = enhanced_actions
            
            # ACTION PHASE
            action_results = []
            for action in enhanced_actions[:3]:  # Execute top 3 priority actions per cycle
                try:
                    result = await self.execution.execute_action(action)
                    action_results.append(result)
                    
                    # Calculate effectiveness score
                    effectiveness = self._calculate_action_effectiveness(action, result)
                    await self.memory.store_action_outcome(action, result, effectiveness)
                    
                except Exception as action_error:
                    self.logger.error(f"Action failed: {action_error}")
                    cycle_results["errors"].append({
                        "action": action,
                        "error": str(action_error)
                    })
            
            cycle_results["actions"] = action_results
            
            return cycle_results
            
        except Exception as e:
            self.logger.error(f"Cycle {self.cycle_count} failed: {e}")
            cycle_results["errors"].append({"cycle_error": str(e)})
            return cycle_results
    
    async def gather_observations(self) -> List[Observation]:
        """Gather all environmental observations"""
        
        observation_tasks = [
            self.perception.observe_sales_metrics(),
            self.perception.observe_customer_behavior(),
        ]
        
        # Add conditional observations based on schedule or triggers
        current_hour = datetime.now().hour
        if current_hour == 9:  # Morning deep dive
            observation_tasks.append(self._observe_competitive_landscape())
        
        if datetime.now().weekday() == 0:  # Monday weekly summary
            observation_tasks.append(self._observe_weekly_summary())
        
        # Execute all observations concurrently
        observations = []
        results = await asyncio.gather(*observation_tasks, return_exceptions=True)
        
        for result in results:
            if isinstance(result, Observation):
                observations.append(result)
            elif isinstance(result, Exception):
                self.logger.warning(f"Observation failed: {result}")
        
        return observations
    
    def _summarize_current_situation(self, observations: List[Observation]) -> Dict[str, Any]:
        """Create a summary of current situation for context retrieval"""
        
        situation = {
            "timestamp": datetime.now().isoformat(),
            "observation_sources": [obs.source for obs in observations],
            "confidence_levels": [obs.confidence for obs in observations],
        }
        
        # Extract key metrics for similarity matching
        for obs in observations:
            if obs.source == "sales_database":
                revenue_data = obs.data.get("revenue_trend", {})
                situation.update({
                    "revenue_growth_rate": revenue_data.get("growth_rate", 0),
                    "revenue_trend_direction": revenue_data.get("trend_direction", "stable")
                })
            
            elif obs.source == "web_analytics":
                behavior_data = obs.data
                situation.update({
                    "bounce_rate_trend": behavior_data.get("bounce_rate_change", 0),
                    "conversion_rate": behavior_data.get("conversion_funnel", {}).get("overall_rate", 0)
                })
        
        return situation
    
    async def _enhance_plan_with_memory(self, planned_actions: List[PlannedAction], 
                                      relevant_context: List[Dict[str, Any]]) -> List[PlannedAction]:
        """Enhance planned actions using learned patterns and past experiences"""
        
        enhanced_actions = []
        
        for action in planned_actions:
            # Check if we have learned patterns for this action type
            pattern_key = f"action_effectiveness_{action.action_type.value}"
            
            # Look for learned patterns in memory
            with sqlite3.connect(self.memory.db_path) as conn:
                pattern_result = conn.execute("""
                    SELECT pattern_data FROM learned_patterns 
                    WHERE pattern_type = ? AND confidence > 0.7
                    ORDER BY last_updated DESC LIMIT 1
                """, (pattern_key,)).fetchone()
                
                if pattern_result:
                    pattern_data = json.loads(pattern_result[0])
                    
                    # Adjust action parameters based on learned effectiveness
                    enhanced_params = action.parameters.copy()
                    
                    if "best_parameters" in pattern_data:
                        best_params = pattern_data["best_parameters"]
                        
                        # Apply learned parameter optimizations
                        if "severity_high" in best_params and action.action_type == ActionType.ALERT:
                            enhanced_params["severity"] = "high"
                        
                        if "focus_areas_3" in best_params and action.action_type == ActionType.INVESTIGATE:
                            # Ensure we have optimal number of focus areas
                            current_areas = enhanced_params.get("focus_areas", [])
                            if len(current_areas) < 3:
                                enhanced_params["focus_areas"].extend(["customer_segments", "product_mix"][:3-len(current_areas)])
                    
                    # Create enhanced action
                    enhanced_action = PlannedAction(
                        action_type=action.action_type,
                        target=action.target,
                        parameters=enhanced_params,
                        priority=action.priority,
                        estimated_duration=action.estimated_duration,
                        dependencies=action.dependencies
                    )
                    
                    enhanced_actions.append(enhanced_action)
                else:
                    enhanced_actions.append(action)  # No learned patterns, use original
        
        return enhanced_actions
    
    def _calculate_action_effectiveness(self, action: PlannedAction, result: Dict[str, Any]) -> float:
        """Calculate how effective an action was"""
        
        if result.get("status") == "failed":
            return 0.0
        
        effectiveness_score = 0.5  # Baseline for successful completion
        
        # Action-type specific effectiveness calculation
        if action.action_type == ActionType.ALERT:
            # Measure by recipient engagement (simplified)
            recipients_notified = result.get("recipients_notified", 0)
            if recipients_notified > 0:
                effectiveness_score += 0.3
            
            # Higher severity alerts that completed successfully are more effective
            if action.parameters.get("severity") == "high":
                effectiveness_score += 0.2
        
        elif action.action_type == ActionType.INVESTIGATE:
            # Measure by insight quality
            confidence_score = result.get("confidence_score", 0)
            effectiveness_score += confidence_score * 0.4
            
            # More focus areas generally means more thorough investigation
            focus_areas_count = len(action.parameters.get("focus_areas", []))
            effectiveness_score += min(0.1 * focus_areas_count, 0.3)
        
        elif action.action_type == ActionType.REPORT:
            # Measure by completeness and timeliness
            if result.get("report_generated"):
                effectiveness_score += 0.3
            
            execution_time = result.get("duration", float('inf'))
            if execution_time < action.estimated_duration:
                effectiveness_score += 0.2  # Bonus for completing faster than estimated
        
        return min(1.0, effectiveness_score)  # Cap at 1.0
    
    async def stop(self) -> None:
        """Gracefully stop the agent"""
        self.logger.info("Stopping Business Intelligence Agent...")
        self.running = False
        
        # Get final performance report
        performance_insights = await self.memory.get_performance_insights()
        self.logger.info(f"Final performance report: {performance_insights}")

This complete agent implementation demonstrates several production-ready patterns:

Autonomous Operation: The agent runs continuously, making decisions without human intervention while providing full observability.

Graceful Error Handling: Individual failures don't crash the entire system. The agent logs errors and continues operation.

Learning Integration: The agent actively uses its memory system to improve performance over time.

Configurable Behavior: Business rules, data sources, and behavior parameters are externalized in configuration.

Hands-On Exercise: Building a Customer Churn Prevention Agent

Let's build a practical agent that demonstrates the concepts we've covered. This agent will monitor customer behavior patterns, predict potential churn, and take proactive actions to retain at-risk customers.

Exercise Requirements

Build an agent with these capabilities:

  1. Monitor customer engagement metrics from multiple sources (transaction data, support tickets, website activity)
  2. Identify churn risk patterns using both rule-based logic and learned behaviors
  3. Execute retention actions like personalized offers, proactive support outreach, or account manager alerts
  4. Learn and improve its churn prediction accuracy over time

Starter Code Structure

# customer_churn_agent.py

import asyncio
import json
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from enum import Enum

class ChurnRiskLevel(Enum):
    LOW = "low"
    MEDIUM = "medium"  
    HIGH = "high"
    CRITICAL = "critical"

@dataclass
class CustomerRiskProfile:
    customer_id: str
    risk_level: ChurnRiskLevel
    risk_factors: List[str]
    confidence_score: float
    last_activity: datetime
    predicted_churn_date: Optional[datetime]
    recommended_actions: List[str]

class CustomerChurnAgent:
    """Agent focused on preventing customer churn through proactive intervention"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        # TODO: Initialize perception, planning, execution, memory systems
        # TODO: Set up customer data sources
        # TODO: Configure retention action integrations
    
    async def assess_churn_risk(self, customer_id: str) -> CustomerRiskProfile:
        """Analyze a single customer's churn risk"""
        # TODO: Gather customer behavioral data
        # TODO: Apply churn risk scoring model
        # TODO: Generate risk profile with recommended actions
        pass
    
    async def execute_retention_action(self, customer_profile: CustomerRiskProfile, action: str) -> Dict[str, Any]:
        """Execute specific retention action for at-risk customer"""
        # TODO: Implement retention actions:
        # - Send personalized discount offer
        # - Schedule proactive support call  
        # - Alert account manager
        # - Trigger win-back email sequence
        pass
    
    async def learn_from_outcomes(self, customer_id: str, actions_taken: List[str], outcome: str) -> None:
        """Update models based on retention campaign results"""
        # TODO: Track which actions were most effective
        # TODO: Update churn prediction models
        # TODO: Adjust risk scoring parameters
        pass

# Example usage and test data setup
async def main():
    config = {
        "data_sources": {
            "customer_db": "postgresql://localhost/customers",
            "support_api": "https://support.company.com/api/v1",
            "analytics_api": "https://analytics.company.com/api"
        },
        "integrations": {
            "email_service": {"api_key": "email_key", "endpoint": "https://email.service.com"},
            "crm_system": {"api_key": "crm_key", "endpoint": "https://crm.company.com/api"},
            "slack_webhook": "https://hooks.slack.com/services/xxx"
        },
        "business_rules": {
            "high_risk_threshold": 0.7,
            "critical_risk_threshold": 0.85,
            "max_retention_spend": 500.0,
            "min_customer_value": 1000.0
        }
    }
    
    agent = CustomerChurnAgent(config)
    
    # Test with sample customer
    risk_profile = await agent.assess_churn_risk("customer_12345")
    print(f"Risk assessment: {risk_profile}")
    
    if risk_profile.risk_level in [ChurnRiskLevel.HIGH, ChurnRiskLevel.CRITICAL]:
        for action in risk_profile.recommended_actions:
            result = await agent.execute_retention_action(risk_profile, action)
            print(f"Action {action} result: {result}")

if __name__ == "__main__":
    asyncio.run(main())

Your Implementation Tasks

  1. Implement the perception system that gathers customer behavioral indicators:

    • Recent transaction frequency and amounts
    • Support ticket volume and sentiment
    • Website/app engagement metrics
    • Account usage patterns
  2. Build the churn risk assessment logic combining:

    • Rule-based scoring (no activity for X days = +risk)
    • Pattern matching against known churn behaviors
    • Statistical models for churn probability
  3. Create retention action executors for:

    • Personalized email campaigns
    • Automated discount generation
    • CRM task creation for account managers
    • Slack notifications to customer success team
  4. Implement learning mechanisms that:

    • Track action effectiveness by customer segment
    • Adjust risk scoring based on false positives/negatives
    • Optimize retention offer amounts and timing

Success Criteria

Your completed agent should:

  • Successfully identify high-risk customers from simulated data
  • Execute appropriate retention actions without errors
  • Show evidence of learning from action outcomes
  • Handle edge cases gracefully (missing data, API failures, etc.)
  • Provide clear logging and observability into its decision-making

Solution Approach

Here's one effective approach to structuring your solution:

class ChurnPerceptionSystem:
    async def gather_customer_data(self, customer_id: str) -> Dict[str, Any]:
        """Collect all relevant customer behavioral data"""
        data = {}
        
        # Transactional behavior
        data["transactions"] = await self._get_transaction_history(customer_id, days=90)
        
        # Support interactions  
        data["support"] = await self._get_support_history(customer_id, days=90)
        
        # Product usage
        data["usage"] = await self._get_usage_metrics(customer_id, days=30)
        
        return data

class ChurnRiskScorer:
    def __init__(self, business_rules: Dict[str, Any]):
        self.rules = business_rules
        self.learned_weights = self._load_learned_weights()
    
    def calculate_risk_score(self, customer_data: Dict[str, Any]) -> Tuple[float, List[str]]:
        """Calculate churn risk score and identify contributing factors"""
        risk_factors = []
        score = 0.0
        
        # Recency scoring
        last_transaction = self._get_last_transaction_date(customer_data["transactions"])
        days_since_activity = (datetime.now() - last_transaction).days
        
        if days_since_activity > 30:
            risk_factors.append("no_recent_transactions")
            score += 0.3
        
        # Frequency scoring
        transaction_count = len(customer_data["transactions"])
        historical_avg = customer_data.get("historical_transaction_frequency", 10)
        
        if transaction_count < historical_avg * 0.5:
            risk_factors.append("decreased_transaction_frequency")  
            score += 0.2
        
        # Support interaction patterns
        recent_tickets = [t for t in customer_data["support"] 
                         if t["created"] > datetime.now() - timedelta(days=30)]
        
        if len(recent_tickets) > 3:
            risk_factors.append("increased_support_requests")
            score += 0.15
        
        # Apply learned weights
        score = self._apply_learned_adjustments(score, risk_factors)
        
        return min(1.0, score), risk_factors

Focus on making your implementation realistic—use proper error handling, include realistic data patterns, and make sure your agent could actually prevent churn in a real business environment.

Common Mistakes & Troubleshooting

Building production AI agents introduces unique challenges that don't exist in traditional software systems. Here are the most critical mistakes to avoid, based on real implementation experience:

1. Treating Agents Like Deterministic Systems

The Mistake: Expecting agents to behave identically given the same inputs, or building brittle systems that break when agents make unexpected decisions.

# WRONG: Fragile agent integration
async def process_sales_alert(agent_response):
    # This breaks when the agent returns unexpected format
    if agent_response["action"] == "send_alert":
        recipients = agent_response["recipients"]  # KeyError if missing!
        send_email(recipients, agent_response["message"])

# WRONG: Assuming deterministic behavior  
def test_agent_planning():
    observations = [revenue_decline_observation]
    plan = agent.plan(observations)
    assert plan[0].action_type == ActionType.ALERT  # This will randomly fail!

Why it fails: AI agents make probabilistic decisions. The same inputs can generate different outputs due to LLM randomness, learned behaviors, or environmental changes. Systems that assume deterministic behavior will fail unpredictably.

The Fix: Build robust interfaces that handle variability gracefully:

# CORRECT: Robust agent integration
async def process_agent_response(agent_response: Dict[str, Any]) -> bool:
    """Process agent response with proper error handling"""
    
    # Validate response structure
    required_fields = ["action_type", "confidence", "parameters"]
    if not all(field in agent_response for field in required_fields):
        logger.warning(f"Invalid agent response structure: {agent_response}")
        return False
    
    # Handle different action types safely
    action_type = agent_response.get("action_type")
    parameters = agent_response.get("parameters", {})
    
    if action_type == "send_alert":
        recipients = parameters.get("recipients", ["fallback@company.com"])
        message = parameters.get("message", "Agent alert triggered")
        
        # Validate recipients before sending
        valid_recipients = [r for r in recipients if "@" in r]
        if valid_recipients:
            await send_email(valid_recipients, message)
            return True
        else:
            logger.error(f"No valid recipients in agent response: {recipients}")
            return False
    
    elif action_type == "investigate":
        # Handle investigation actions...
        return await execute_investigation(parameters)
    
    else:
        logger.warning(f"Unknown action type from agent: {action_type}")
        return False

# CORRECT: Probabilistic testing
def test_agent_planning_distribution():
    """Test agent behavior statistically rather than deterministically"""
    observations = [revenue_decline_observation]
    
    # Run multiple times to capture distribution
    action_types = []
    for _ in range(20):
        plan = agent.plan(observations)
        if plan:
            action_types.append(plan[0].action_type)
    
    # Test statistical properties
    assert ActionType.ALERT in action_types, "Agent should sometimes choose alerts"
    assert len(set(action_types)) > 1, "Agent should show some variability"
    
    # Test that critical situations consistently trigger high-priority actions
    alert_ratio = action_types.count(ActionType.ALERT) / len(action_types)
    assert alert_ratio > 0.6, "Agent should favor alerts for revenue decline"

2. Insufficient Action Validation and Safety Rails

The Mistake: Allowing agents to execute actions without proper validation, leading to costly mistakes or security vulnerabilities.

# WRONG: No validation on agent actions
async def execute_agent_action(action):
    # Blindly trust the agent's decisions
    if action.type == "send_promotional_email":
        for customer_id in action.parameters["customer_list"]:
            await send_email(customer_id, action.parameters["offer"])

# WRONG: No cost controls
async def execute_retention_offer(customer_id, discount_amount):
    # Agent could authorize unlimited discounts
    await create_discount_code(customer_id, discount_amount)

Why it fails: Agents can make mistakes, be manipulated through prompt injection, or optimize for metrics in unintended ways. Without validation, these mistakes can be expensive or dangerous.

The Fix: Implement comprehensive validation and safety rails:

class ActionValidator:
    """Validates agent actions before execution"""
    
    def __init__(self, business_rules: Dict[str, Any]):
        self.rules = business_rules
        self.recent_actions = defaultdict(list)  # Track recent actions per customer
    
    async def validate_action(self, action: PlannedAction) -> Tuple[bool, str]:
        """Validate action against business rules and safety constraints"""
        
        # Check basic action structure
        if not self._is_action_well_formed(action):
            return False, "Action missing required fields"
        
        # Validate based on action type
        if action.action_type == ActionType.ALERT:
            return await self._validate_alert_action(action)
        elif action.action_type == ActionType.INVESTIGATE:
            return await self._validate_investigation_action(action)
        else:
            return True, "Action type not restricted"
    
    async def _validate_alert_action(self, action: PlannedAction) -> Tuple[bool, str]:
        """Validate alert actions for safety and appropriateness"""
        
        params = action.parameters
        
        # Rate limiting: prevent spam
        recent_alerts = [a for a in self.recent_actions["alerts"] 
                        if a["timestamp"] > datetime.now() - timedelta(hours=1)]
        
        if len(recent_alerts) >= self.rules.get("max_alerts_per_hour", 5):
            return False, "Alert rate limit exceeded"
        
        # Recipient validation
        recipients = params.get("recipients", [])
        if not recipients:
            return False, "Alert requires at least one recipient"
        
        # Check for valid email addresses
        valid_recipients = [r for r in recipients if self._is_valid_email(r)]
        if not valid_recipients:
            return False, "No valid recipients found"
        
        # Severity validation
        severity = params.get("severity", "medium")
        if severity not in ["low", "medium", "high"]:
            return False, f"Invalid severity level: {severity}"
        
        # Content validation (prevent sensitive data leakage)
        message = params.get("message", "")
        if self._contains_sensitive_data(message):
            return False, "Alert message contains potentially sensitive information"
        
        # Record this validation for rate limiting
        self.recent_actions["alerts"].append({
            "timestamp": datetime.now(),
            "recipients": recipients,
            "severity": severity
        })
        
        return True, "Alert action validated"
    
    def _contains_sensitive_data(self, text: str) -> bool:

Learning Path: RAG & AI Agents

Previous

Building Your First RAG Pipeline

Related Articles

AI & Machine Learning🔥 Expert

OpenAI vs Anthropic vs Open Source: Choosing the Right LLM

28 min
AI & Machine Learning🌱 Foundation

Building Your First AI App with the Claude API: Complete Beginner's Guide

15 min
AI & Machine Learning⚡ Practitioner

Building Custom GPTs and Claude Projects for Your Team

27 min

On this page

  • Prerequisites
  • The Agent Architecture: Beyond Simple Automation
  • Core Agent Components
  • Perception: Making Sense of Complex Environments
  • Planning: From Observations to Actions
  • Action Execution: Safe Operations in Production
  • Memory: Learning and Context Management
  • Building Your First Production Agent
  • Hands-On Exercise: Building a Customer Churn Prevention Agent
  • Exercise Requirements
  • Success Criteria
  • Solution Approach
  • Common Mistakes & Troubleshooting
  • 1. Treating Agents Like Deterministic Systems
  • 2. Insufficient Action Validation and Safety Rails
  • Starter Code Structure
  • Your Implementation Tasks
  • Success Criteria
  • Solution Approach
  • Common Mistakes & Troubleshooting
  • 1. Treating Agents Like Deterministic Systems
  • 2. Insufficient Action Validation and Safety Rails