Wicked Smart Data

Logging, Alerting, and Observability for Data Pipelines

Data Engineering · 🌱 Foundation · 18 min read · Apr 8, 2026 · Updated Apr 8, 2026
Table of Contents
  • Prerequisites
  • Understanding the Observability Stack
  • Implementing Structured Logging
  • Building Metrics and Monitoring
  • Designing Effective Alerts
  • Hands-On Exercise
  • Common Mistakes & Troubleshooting
  • Summary & Next Steps

When your data pipeline silently fails at 3 AM, how long does it take you to notice? If you're like most data professionals starting out, the answer is probably "whenever someone complains that their dashboard is empty." This reactive approach to pipeline management is a recipe for sleepless nights and frustrated stakeholders.

Effective logging, alerting, and observability transform you from a firefighter into a proactive guardian of your data infrastructure. Instead of discovering failures hours or days later, you'll know within minutes when something goes wrong—and often prevent problems before they impact your users. This isn't just about writing log statements; it's about building a comprehensive monitoring strategy that gives you confidence in your data systems.

What you'll learn:

  • How to implement structured logging that provides actionable insights into pipeline behavior
  • The difference between metrics, logs, and traces, and when to use each approach
  • How to design effective alerts that notify you of real problems without creating noise
  • Practical techniques for monitoring data quality, pipeline performance, and system health
  • How to build observability into your pipelines from the ground up using Python and common tools

Prerequisites

You should be comfortable with Python programming and have basic familiarity with data pipelines (ETL/ELT processes). No prior experience with monitoring tools is required—we'll build everything from fundamentals.

Understanding the Observability Stack

Before diving into implementation, let's establish what we mean by observability and how it differs from simple monitoring. Observability is your ability to understand what's happening inside your systems based on their external outputs. For data pipelines, this means having visibility into data flow, processing performance, and quality issues.

The observability stack consists of three pillars:

Logs are timestamped records of events that happened in your system. Think of them as a detailed diary of your pipeline's activities. When your pipeline processes a batch of customer data, logs capture what happened: "Started processing 10,000 customer records at 2023-10-15 09:30:15, encountered 3 invalid phone numbers, completed successfully in 45 seconds."

Metrics are numerical measurements of your system's behavior over time. Unlike logs, which are discrete events, metrics give you trends and patterns. Examples include records processed per minute, error rates, memory usage, and data freshness (how old is your most recent data).

Traces follow a single request or data batch through your entire pipeline, showing the path it followed and how long each stage took. If your pipeline has multiple stages—extraction, transformation, validation, loading—traces show you the complete journey.
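This article builds out logs and metrics in detail; traces aren't implemented below, but a minimal sketch conveys the idea. The `span` helper here is a hypothetical illustration (not part of the pipeline code that follows): each stage records its duration under a shared trace ID.

```python
import time
import uuid
from contextlib import contextmanager

@contextmanager
def span(trace, stage):
    """Record how long one pipeline stage takes under a shared trace ID."""
    start = time.time()
    try:
        yield
    finally:
        trace["spans"].append({"stage": stage,
                               "seconds": round(time.time() - start, 4)})

trace = {"trace_id": uuid.uuid4().hex, "spans": []}
with span(trace, "extract"):
    time.sleep(0.01)   # stand-in for real extraction work
with span(trace, "transform"):
    time.sleep(0.01)   # stand-in for real transformation work

print([s["stage"] for s in trace["spans"]])  # ['extract', 'transform']
```

A real tracing backend (OpenTelemetry, Jaeger, etc.) adds propagation and storage, but the core data model is just this: named spans with durations, grouped by a trace ID.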

Let's start building these capabilities into a realistic data pipeline.

Implementing Structured Logging

The foundation of good observability is structured logging. Unlike casual print statements or basic logging that produces human-readable text, structured logging creates machine-parseable records with consistent fields.
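To make the contrast concrete, here is the same fact expressed both ways (the values are illustrative):

```python
import json

# Unstructured: one opaque string. A human can read it, but a tool can't
# reliably extract the record count or duration from it.
unstructured = "Processed 10000 records in 45 seconds"

# Structured: the same fact as consistent, machine-parseable fields.
structured = json.dumps({"event": "batch_processed", "records": 10000, "seconds": 45})

parsed = json.loads(structured)
print(parsed["records"])  # 10000
```

Any log aggregator can filter, group, and aggregate the structured form; the unstructured form requires brittle regex parsing.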

Here's a customer data processing pipeline with proper structured logging:

import logging
import json
import time
from datetime import datetime
from typing import Dict, Any, List
import pandas as pd

# Configure structured logging
logging.basicConfig(
    level=logging.INFO,
    format='%(message)s',  # We'll format the JSON ourselves
    handlers=[
        logging.FileHandler('pipeline.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)

class StructuredLogger:
    def __init__(self, pipeline_name: str, run_id: str):
        self.pipeline_name = pipeline_name
        self.run_id = run_id
        
    def log(self, level: str, message: str, **context):
        log_entry = {
            'timestamp': datetime.utcnow().isoformat(),
            'pipeline': self.pipeline_name,
            'run_id': self.run_id,
            'level': level,
            'message': message,
            **context
        }
        # Emit at the matching stdlib level so severity-based filtering still
        # works; unknown levels (e.g. "ALERT") fall back to INFO.
        logger.log(getattr(logging, level, logging.INFO), json.dumps(log_entry))

class CustomerDataPipeline:
    def __init__(self):
        self.run_id = f"run_{int(time.time())}"
        self.log = StructuredLogger("customer_data_pipeline", self.run_id)
        self.metrics = {}
        
    def extract_data(self, source_path: str) -> pd.DataFrame:
        self.log.log("INFO", "Starting data extraction", 
                     source=source_path, stage="extract")
        
        try:
            start_time = time.time()
            df = pd.read_csv(source_path)
            extraction_time = time.time() - start_time
            
            self.log.log("INFO", "Data extraction completed",
                        records_extracted=len(df),
                        extraction_time_seconds=round(extraction_time, 2),
                        columns=list(df.columns),
                        stage="extract")
            
            self.metrics['records_extracted'] = len(df)
            self.metrics['extraction_time'] = extraction_time
            
            return df
            
        except Exception as e:
            self.log.log("ERROR", "Data extraction failed",
                        error=str(e),
                        error_type=type(e).__name__,
                        stage="extract")
            raise
    
    def validate_data(self, df: pd.DataFrame) -> pd.DataFrame:
        self.log.log("INFO", "Starting data validation", 
                     records_to_validate=len(df), stage="validate")
        
        validation_results = {
            'missing_customer_ids': df['customer_id'].isna().sum(),
            'invalid_emails': 0,
            'future_signup_dates': 0,
            'duplicate_customers': df['customer_id'].duplicated().sum()
        }
        
        # Email validation (na=False counts missing emails as invalid)
        email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        validation_results['invalid_emails'] = (~df['email'].str.match(email_pattern, na=False)).sum()
        
        # Date validation  
        df['signup_date'] = pd.to_datetime(df['signup_date'], errors='coerce')
        validation_results['future_signup_dates'] = (
            df['signup_date'] > datetime.now()
        ).sum()
        
        total_issues = sum(validation_results.values())
        
        self.log.log("INFO", "Data validation completed",
                    validation_results=validation_results,
                    total_issues=total_issues,
                    stage="validate")
        
        if total_issues > 0:
            self.log.log("WARNING", "Data quality issues detected",
                        issue_count=total_issues,
                        issue_percentage=round(total_issues/len(df)*100, 2),
                        stage="validate")
        
        self.metrics.update(validation_results)
        return df
    
    def process_data(self, df: pd.DataFrame) -> pd.DataFrame:
        self.log.log("INFO", "Starting data processing", 
                     records_to_process=len(df), stage="transform")
        
        try:
            # Clean email addresses
            df['email'] = df['email'].str.lower().str.strip()
            
            # Calculate customer tenure
            df['signup_date'] = pd.to_datetime(df['signup_date'])
            df['days_since_signup'] = (datetime.now() - df['signup_date']).dt.days
            
            # Categorize customers (include_lowest keeps day-0 signups in 'new')
            df['customer_segment'] = pd.cut(
                df['days_since_signup'], 
                bins=[0, 30, 365, float('inf')], 
                labels=['new', 'regular', 'loyal'],
                include_lowest=True
            )
            
            processed_records = len(df)
            
            self.log.log("INFO", "Data processing completed",
                        records_processed=processed_records,
                        new_customers=len(df[df['customer_segment'] == 'new']),
                        regular_customers=len(df[df['customer_segment'] == 'regular']),
                        loyal_customers=len(df[df['customer_segment'] == 'loyal']),
                        stage="transform")
            
            self.metrics['records_processed'] = processed_records
            return df
            
        except Exception as e:
            self.log.log("ERROR", "Data processing failed",
                        error=str(e),
                        error_type=type(e).__name__,
                        stage="transform")
            raise
    
    def load_data(self, df: pd.DataFrame, destination_path: str):
        self.log.log("INFO", "Starting data load", 
                     records_to_load=len(df),
                     destination=destination_path,
                     stage="load")
        
        try:
            start_time = time.time()
            df.to_csv(destination_path, index=False)
            load_time = time.time() - start_time
            
            self.log.log("INFO", "Data load completed",
                        records_loaded=len(df),
                        load_time_seconds=round(load_time, 2),
                        stage="load")
            
            self.metrics['records_loaded'] = len(df)
            self.metrics['load_time'] = load_time
            
        except Exception as e:
            self.log.log("ERROR", "Data load failed",
                        error=str(e),
                        error_type=type(e).__name__,
                        stage="load")
            raise
    
    def run_pipeline(self, source_path: str, destination_path: str):
        pipeline_start = time.time()
        self.log.log("INFO", "Pipeline execution started",
                    source=source_path,
                    destination=destination_path)
        
        try:
            # Execute pipeline stages
            df = self.extract_data(source_path)
            df = self.validate_data(df)
            df = self.process_data(df)
            self.load_data(df, destination_path)
            
            # Calculate total metrics
            total_time = time.time() - pipeline_start
            self.metrics['total_execution_time'] = total_time
            
            self.log.log("INFO", "Pipeline execution completed successfully",
                        total_time_seconds=round(total_time, 2),
                        final_metrics=self.metrics)
            
        except Exception as e:
            total_time = time.time() - pipeline_start
            self.log.log("ERROR", "Pipeline execution failed",
                        total_time_seconds=round(total_time, 2),
                        error=str(e),
                        partial_metrics=self.metrics)
            raise

This structured approach provides several advantages over basic logging:

  1. Searchable Context: Each log entry includes consistent fields like pipeline, run_id, and stage, making it easy to filter and search logs.
  2. Machine Parseable: The JSON format allows log analysis tools to automatically extract and analyze log data.
  3. Rich Context: Beyond just error messages, we capture metrics, execution times, and business-relevant information.

Tip: Always include a unique run_id for each pipeline execution. This makes it easy to trace all log entries for a specific run, especially when multiple pipeline instances are running concurrently.
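Because each line in pipeline.log is a self-contained JSON object, isolating one run is a few lines of Python (the sample lines below are illustrative, in the shape StructuredLogger emits):

```python
import json

# Two sample lines in the shape StructuredLogger emits; in practice you
# would read these from pipeline.log.
lines = [
    '{"timestamp": "2026-04-08T09:30:15", "pipeline": "customer_data_pipeline", "run_id": "run_1", "level": "INFO", "message": "Started"}',
    '{"timestamp": "2026-04-08T09:31:00", "pipeline": "customer_data_pipeline", "run_id": "run_2", "level": "ERROR", "message": "Load failed"}',
]

def entries_for_run(lines, run_id):
    """Parse JSON log lines and keep only those belonging to one run."""
    return [e for e in map(json.loads, lines) if e["run_id"] == run_id]

print(entries_for_run(lines, "run_2")[0]["level"])  # ERROR
```

The same filter is a one-liner in log tooling (e.g. `jq 'select(.run_id == "run_2")'`), which is exactly the payoff of the consistent fields.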

Building Metrics and Monitoring

While logs tell you what happened, metrics tell you how your system is behaving over time. Let's extend our pipeline to collect and expose metrics that we can monitor.

import time
from collections import defaultdict
from typing import Dict, List, Any
from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class MetricPoint:
    timestamp: datetime
    value: float
    tags: Dict[str, str]

class MetricsCollector:
    def __init__(self):
        self.metrics = defaultdict(list)
        
    def record_counter(self, name: str, value: float = 1.0, tags: Dict[str, str] = None):
        """Record a counter metric (cumulative value)"""
        self.metrics[name].append(MetricPoint(
            timestamp=datetime.utcnow(),
            value=value,
            tags=tags or {}
        ))
    
    def record_gauge(self, name: str, value: float, tags: Dict[str, str] = None):
        """Record a gauge metric (current value)"""
        self.metrics[name].append(MetricPoint(
            timestamp=datetime.utcnow(),
            value=value,
            tags=tags or {}
        ))
    
    def record_timing(self, name: str, duration_seconds: float, tags: Dict[str, str] = None):
        """Record a timing metric"""
        self.metrics[f"{name}.duration"].append(MetricPoint(
            timestamp=datetime.utcnow(),
            value=duration_seconds,
            tags=tags or {}
        ))
    
    def get_metrics_summary(self, hours: int = 1) -> Dict[str, Any]:
        """Get a summary of metrics from the last N hours"""
        cutoff = datetime.utcnow() - timedelta(hours=hours)
        summary = {}
        
        for metric_name, points in self.metrics.items():
            recent_points = [p for p in points if p.timestamp > cutoff]
            if recent_points:
                values = [p.value for p in recent_points]
                summary[metric_name] = {
                    'count': len(values),
                    'avg': sum(values) / len(values),
                    'min': min(values),
                    'max': max(values),
                    'latest': values[-1],
                    'latest_timestamp': recent_points[-1].timestamp.isoformat()
                }
        
        return summary

class MonitoredPipeline(CustomerDataPipeline):
    def __init__(self):
        super().__init__()
        self.metrics_collector = MetricsCollector()
        
    def extract_data(self, source_path: str) -> pd.DataFrame:
        start_time = time.time()
        
        try:
            df = super().extract_data(source_path)
            
            # Record success metrics
            duration = time.time() - start_time
            self.metrics_collector.record_timing("pipeline.extract", duration)
            self.metrics_collector.record_counter("pipeline.extract.success")
            self.metrics_collector.record_gauge("pipeline.records_extracted", len(df))
            
            return df
            
        except Exception as e:
            # Record failure metrics
            duration = time.time() - start_time
            self.metrics_collector.record_timing("pipeline.extract", duration, 
                                               tags={"status": "failed"})
            self.metrics_collector.record_counter("pipeline.extract.error", 
                                                tags={"error_type": type(e).__name__})
            raise
    
    def validate_data(self, df: pd.DataFrame) -> pd.DataFrame:
        df = super().validate_data(df)
        
        # Record data quality metrics
        total_records = len(df)
        missing_ids = df['customer_id'].isna().sum()
        duplicates = df['customer_id'].duplicated().sum()
        
        self.metrics_collector.record_gauge("data_quality.missing_customer_ids", missing_ids)
        self.metrics_collector.record_gauge("data_quality.duplicate_customers", duplicates)
        self.metrics_collector.record_gauge("data_quality.completeness_rate", 
                                          (total_records - missing_ids) / max(total_records, 1))
        
        # Alert on data quality issues
        if missing_ids > total_records * 0.05:  # More than 5% missing IDs
            self.log.log("CRITICAL", "High rate of missing customer IDs",
                        missing_count=missing_ids,
                        missing_rate=missing_ids/total_records,
                        threshold=0.05)
        
        return df
    
    def get_health_status(self) -> Dict[str, Any]:
        """Return current health status of the pipeline"""
        metrics_summary = self.metrics_collector.get_metrics_summary()
        
        # Calculate health indicators
        health_status = {
            'overall_status': 'healthy',
            'last_successful_run': None,
            'error_rate': 0.0,
            'avg_processing_time': None,
            'data_quality_score': 1.0,
            'metrics_summary': metrics_summary
        }
        
        # Check for recent errors
        if 'pipeline.extract.error' in metrics_summary:
            error_count = metrics_summary['pipeline.extract.error']['count']
            total_runs = metrics_summary.get('pipeline.extract.success', {}).get('count', 0) + error_count
            health_status['error_rate'] = error_count / max(total_runs, 1)
            
            if health_status['error_rate'] > 0.1:  # More than 10% failure rate
                health_status['overall_status'] = 'degraded'
        
        # Check processing time trends
        if 'pipeline.extract.duration' in metrics_summary:
            health_status['avg_processing_time'] = metrics_summary['pipeline.extract.duration']['avg']
        
        # Check data quality
        if 'data_quality.completeness_rate' in metrics_summary:
            completeness = metrics_summary['data_quality.completeness_rate']['latest']
            if completeness < 0.95:  # Less than 95% complete
                health_status['overall_status'] = 'degraded'
                health_status['data_quality_score'] = completeness
        
        return health_status

This metrics system provides several key capabilities:

Performance Monitoring: We track how long each stage takes, allowing us to identify performance degradation over time.

Data Quality Metrics: We monitor missing data, duplicates, and completeness rates—critical indicators for data pipeline health.

Error Tracking: We count different types of errors and calculate error rates, helping identify when problems are becoming systematic rather than isolated incidents.
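The heart of `get_metrics_summary` is windowing the raw points before aggregating. A stripped-down illustration of that windowing, with synthetic values and no class wrapper:

```python
from datetime import datetime, timedelta

# Timestamped metric points; the first is outside the 1-hour window.
points = [
    (datetime.utcnow() - timedelta(hours=3), 120.0),    # too old, excluded
    (datetime.utcnow() - timedelta(minutes=30), 95.0),
    (datetime.utcnow() - timedelta(minutes=5), 105.0),
]

# Keep only points newer than the cutoff, then aggregate.
cutoff = datetime.utcnow() - timedelta(hours=1)
recent = [value for ts, value in points if ts > cutoff]
summary = {
    "count": len(recent),
    "avg": sum(recent) / len(recent),
    "min": min(recent),
    "max": max(recent),
    "latest": recent[-1],
}
print(summary["count"], summary["avg"])  # 2 100.0
```

Windowing matters: an error spike from last week shouldn't drag down today's health score, so every summary question is always "over the last N hours."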

Designing Effective Alerts

The goal of alerting is to notify you of problems that require human intervention while avoiding alert fatigue from false positives or minor issues. Let's build an alerting system that follows best practices.

import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from enum import Enum
from typing import Dict, Any, List, Callable
import json

class AlertSeverity(Enum):
    INFO = "info"
    WARNING = "warning" 
    CRITICAL = "critical"

class AlertRule:
    def __init__(self, name: str, condition: Callable, severity: AlertSeverity, 
                 message: str, cooldown_minutes: int = 60):
        self.name = name
        self.condition = condition  # Function that returns True if alert should fire
        self.severity = severity
        self.message = message
        self.cooldown_minutes = cooldown_minutes
        self.last_fired = None
    
    def should_fire(self, metrics: Dict[str, Any]) -> bool:
        """Check if this alert should fire given current metrics"""
        # Check cooldown period (timedelta has no total_minutes(); convert from seconds)
        if self.last_fired:
            minutes_since_last = (datetime.utcnow() - self.last_fired).total_seconds() / 60
            if minutes_since_last < self.cooldown_minutes:
                return False
        
        return self.condition(metrics)
    
    def fire(self):
        """Mark this alert as fired"""
        self.last_fired = datetime.utcnow()

class AlertManager:
    def __init__(self, email_config: Dict[str, str] = None):
        self.rules = []
        self.email_config = email_config
        self.alert_history = []
        
    def add_rule(self, rule: AlertRule):
        """Add an alert rule to the manager"""
        self.rules.append(rule)
    
    def check_alerts(self, metrics: Dict[str, Any], pipeline_logger: StructuredLogger):
        """Check all alert rules against current metrics"""
        for rule in self.rules:
            if rule.should_fire(metrics):
                self._send_alert(rule, metrics, pipeline_logger)
                rule.fire()
    
    def _send_alert(self, rule: AlertRule, metrics: Dict[str, Any], logger: StructuredLogger):
        """Send an alert notification"""
        alert_data = {
            'rule_name': rule.name,
            'severity': rule.severity.value,
            'message': rule.message,
            'metrics_snapshot': metrics,
            'timestamp': datetime.utcnow().isoformat()
        }
        
        # Log the alert
        logger.log("ALERT", f"Alert fired: {rule.name}",
                  severity=rule.severity.value,
                  alert_message=rule.message,
                  metrics=metrics)
        
        # Store in history
        self.alert_history.append(alert_data)
        
        # Send email if configured
        if self.email_config and rule.severity in [AlertSeverity.WARNING, AlertSeverity.CRITICAL]:
            self._send_email_alert(alert_data)
        
        # Print to console for immediate visibility
        print(f"🚨 ALERT [{rule.severity.value.upper()}]: {rule.name}")
        print(f"   {rule.message}")
        print(f"   Time: {alert_data['timestamp']}")
    
    def _send_email_alert(self, alert_data: Dict[str, Any]):
        """Send email notification (if configured)"""
        if not self.email_config:
            return
            
        try:
            msg = MIMEMultipart()
            msg['From'] = self.email_config['sender']
            msg['To'] = self.email_config['recipient']
            msg['Subject'] = f"Pipeline Alert: {alert_data['rule_name']} [{alert_data['severity'].upper()}]"
            
            body = f"""
Pipeline Alert Notification

Rule: {alert_data['rule_name']}
Severity: {alert_data['severity']}
Time: {alert_data['timestamp']}
Message: {alert_data['message']}

Metrics Snapshot:
{json.dumps(alert_data['metrics_snapshot'], indent=2)}
            """
            
            msg.attach(MIMEText(body, 'plain'))
            
            server = smtplib.SMTP(self.email_config['smtp_server'], self.email_config['smtp_port'])
            if self.email_config.get('username'):
                server.starttls()
                server.login(self.email_config['username'], self.email_config['password'])
            
            server.send_message(msg)
            server.quit()
            
        except Exception as e:
            print(f"Failed to send email alert: {e}")

# Define specific alert conditions
def high_error_rate(metrics: Dict[str, Any]) -> bool:
    """Alert if error rate exceeds 10%"""
    summary = metrics.get('metrics_summary', {})
    if 'pipeline.extract.error' in summary and 'pipeline.extract.success' in summary:
        errors = summary['pipeline.extract.error']['count']
        successes = summary['pipeline.extract.success']['count']
        error_rate = errors / (errors + successes)
        return error_rate > 0.1
    return False

def low_data_quality(metrics: Dict[str, Any]) -> bool:
    """Alert if data completeness drops below 95%"""
    summary = metrics.get('metrics_summary', {})
    if 'data_quality.completeness_rate' in summary:
        completeness = summary['data_quality.completeness_rate']['latest']
        return completeness < 0.95
    return False

def slow_processing(metrics: Dict[str, Any]) -> bool:
    """Alert if processing time exceeds normal range"""
    summary = metrics.get('metrics_summary', {})
    if 'pipeline.extract.duration' in summary:
        avg_duration = summary['pipeline.extract.duration']['avg']
        return avg_duration > 300  # More than 5 minutes
    return False

def no_recent_data(metrics: Dict[str, Any]) -> bool:
    """Alert if no data has been processed recently"""
    summary = metrics.get('metrics_summary', {})
    if 'pipeline.records_extracted' in summary:
        latest_timestamp = summary['pipeline.records_extracted']['latest_timestamp']
        latest_time = datetime.fromisoformat(latest_timestamp.replace('Z', '+00:00'))
        hours_since_last = (datetime.utcnow() - latest_time.replace(tzinfo=None)).total_seconds() / 3600
        return hours_since_last > 24  # No data for 24+ hours
    return False

class AlertingPipeline(MonitoredPipeline):
    def __init__(self, email_config: Dict[str, str] = None):
        super().__init__()
        self.alert_manager = AlertManager(email_config)
        
        # Set up alert rules
        self.alert_manager.add_rule(AlertRule(
            name="high_error_rate",
            condition=high_error_rate,
            severity=AlertSeverity.CRITICAL,
            message="Pipeline error rate exceeds 10% - immediate attention required",
            cooldown_minutes=30
        ))
        
        self.alert_manager.add_rule(AlertRule(
            name="low_data_quality", 
            condition=low_data_quality,
            severity=AlertSeverity.WARNING,
            message="Data completeness below 95% - check data sources",
            cooldown_minutes=60
        ))
        
        self.alert_manager.add_rule(AlertRule(
            name="slow_processing",
            condition=slow_processing, 
            severity=AlertSeverity.WARNING,
            message="Processing time significantly higher than normal",
            cooldown_minutes=120
        ))
        
        self.alert_manager.add_rule(AlertRule(
            name="no_recent_data",
            condition=no_recent_data,
            severity=AlertSeverity.CRITICAL,
            message="No data processed in the last 24 hours - pipeline may be down",
            cooldown_minutes=360  # 6 hours
        ))
    
    def run_pipeline(self, source_path: str, destination_path: str):
        """Run pipeline with alerting"""
        super().run_pipeline(source_path, destination_path)
        
        # Check alerts after pipeline execution
        health_status = self.get_health_status()
        self.alert_manager.check_alerts(health_status, self.log)

This alerting system implements several best practices:

Severity Levels: Different types of issues get different severities, allowing you to prioritize responses appropriately.

Cooldown Periods: Alerts won't fire repeatedly for the same issue, preventing notification spam.

Contextual Information: Alerts include relevant metrics and snapshots, giving you the information needed to diagnose problems.

Multiple Channels: Alerts can go to logs, email, and console output, ensuring visibility across different contexts.

Warning: Be careful with alert thresholds. Start with conservative values and adjust based on your actual system behavior. Too many false positives will train you to ignore alerts entirely.
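The cooldown check is the piece that prevents notification spam, so it's worth seeing in isolation. This is a compressed, function-only sketch of the same logic AlertRule uses (no class, fixed clock for clarity):

```python
from datetime import datetime, timedelta

def should_fire(condition_met, last_fired, cooldown_minutes, now):
    """Fire only if the condition holds AND the cooldown window has elapsed."""
    if last_fired is not None:
        minutes_since = (now - last_fired).total_seconds() / 60
        if minutes_since < cooldown_minutes:
            return False  # still cooling down, suppress the repeat alert
    return condition_met

now = datetime(2026, 4, 8, 12, 0)
print(should_fire(True, None, 60, now))                          # True: never fired
print(should_fire(True, now - timedelta(minutes=10), 60, now))   # False: in cooldown
print(should_fire(True, now - timedelta(minutes=90), 60, now))   # True: cooldown elapsed
```

Note that the cooldown suppresses the notification, not the underlying condition: when the window elapses and the problem persists, the alert fires again.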

Hands-On Exercise

Let's put everything together by creating a complete monitoring setup for a real-world scenario. You'll build a pipeline that processes e-commerce order data and implement comprehensive observability.

Create a sample dataset first:

import pandas as pd
import numpy as np
from datetime import datetime, timedelta

# Generate sample e-commerce data
def create_sample_data(filename: str, num_records: int = 1000):
    np.random.seed(42)  # For reproducible results
    
    # Generate realistic e-commerce order data
    base_date = datetime.now() - timedelta(days=30)
    dates = [base_date + timedelta(days=np.random.randint(0, 30)) for _ in range(num_records)]
    
    data = {
        'order_id': [f"ORD-{1000 + i}" for i in range(num_records)],
        'customer_id': list(np.random.choice([f"CUST-{i}" for i in range(1, 201)], num_records)),  # plain list so None can be injected below
        'order_date': dates,
        'product_category': np.random.choice(['Electronics', 'Clothing', 'Books', 'Home'], num_records),
        'order_amount': np.random.lognormal(3, 1, num_records).round(2),
        'payment_method': np.random.choice(['credit_card', 'debit_card', 'paypal'], num_records),
        'shipping_address': [f"{np.random.randint(1, 9999)} Main St, City" for _ in range(num_records)]
    }
    
    # Introduce some data quality issues (5% of records)
    issues_count = int(num_records * 0.05)
    issue_indices = np.random.choice(num_records, issues_count, replace=False)
    
    for idx in issue_indices:
        if np.random.random() < 0.3:  # Missing customer ID
            data['customer_id'][idx] = None
        if np.random.random() < 0.2:  # Negative amount
            data['order_amount'][idx] = -abs(data['order_amount'][idx])
        if np.random.random() < 0.3:  # Future date
            data['order_date'][idx] = datetime.now() + timedelta(days=np.random.randint(1, 10))
    
    df = pd.DataFrame(data)
    df.to_csv(filename, index=False)
    print(f"Created sample data: {filename} with {num_records} records")

# Create the sample data
create_sample_data('ecommerce_orders.csv', 1000)

Now implement the complete monitoring pipeline:

class ECommerceOrderPipeline(AlertingPipeline):
    def __init__(self, email_config: Dict[str, str] = None):
        super().__init__(email_config)
        
        # Add e-commerce specific alert rules
        self.alert_manager.add_rule(AlertRule(
            name="high_negative_amounts",
            condition=self._check_negative_amounts,
            severity=AlertSeverity.WARNING,
            message="High percentage of negative order amounts detected",
            cooldown_minutes=60
        ))
        
        self.alert_manager.add_rule(AlertRule(
            name="unusual_order_volume",
            condition=self._check_order_volume,
            severity=AlertSeverity.INFO,
            message="Order volume significantly different from normal",
            cooldown_minutes=120
        ))
    
    def _check_negative_amounts(self, metrics: Dict[str, Any]) -> bool:
        summary = metrics.get('metrics_summary', {})
        if 'data_quality.negative_amounts' in summary:
            negative_count = summary['data_quality.negative_amounts']['latest']
            total_records = summary.get('pipeline.records_extracted', {}).get('latest', 1)
            return (negative_count / total_records) > 0.02  # More than 2%
        return False
    
    def _check_order_volume(self, metrics: Dict[str, Any]) -> bool:
        summary = metrics.get('metrics_summary', {})
        if 'pipeline.records_extracted' in summary:
            current_volume = summary['pipeline.records_extracted']['latest']
            # Simple anomaly detection: alert if volume is 50% higher/lower than average
            avg_volume = summary['pipeline.records_extracted']['avg']
            return abs(current_volume - avg_volume) > (avg_volume * 0.5)
        return False
    
    def validate_data(self, df: pd.DataFrame) -> pd.DataFrame:
        self.log.log("INFO", "Starting e-commerce order validation", 
                     records_to_validate=len(df), stage="validate")
        
        # Standard validations
        validation_results = {
            'missing_order_ids': df['order_id'].isna().sum(),
            'missing_customer_ids': df['customer_id'].isna().sum(),
            'duplicate_orders': df['order_id'].duplicated().sum(),
            'negative_amounts': (df['order_amount'] < 0).sum(),
            'future_orders': 0,
            'invalid_payment_methods': 0
        }
        
        # Date validation
        df['order_date'] = pd.to_datetime(df['order_date'], errors='coerce')
        validation_results['future_orders'] = (df['order_date'] > datetime.now()).sum()
        
        # Payment method validation
        valid_methods = ['credit_card', 'debit_card', 'paypal']
        validation_results['invalid_payment_methods'] = (
            ~df['payment_method'].isin(valid_methods)
        ).sum()
        
        # Record detailed metrics
        for metric_name, count in validation_results.items():
            self.metrics_collector.record_gauge(f"data_quality.{metric_name}", count)
        
        # Calculate business metrics
        total_amount = df['order_amount'][df['order_amount'] > 0].sum()
        avg_order_value = df['order_amount'][df['order_amount'] > 0].mean()
        
        self.metrics_collector.record_gauge("business.total_order_value", total_amount)
        self.metrics_collector.record_gauge("business.avg_order_value", avg_order_value)
        
        category_counts = df['product_category'].value_counts()
        for category, count in category_counts.items():
            self.metrics_collector.record_gauge("business.orders_by_category", count, 
                                               tags={"category": category})
        
        total_issues = sum(validation_results.values())
        self.log.log("INFO", "E-commerce order validation completed",
                    validation_results=validation_results,
                    total_issues=total_issues,
                    total_order_value=total_amount,
                    avg_order_value=round(avg_order_value, 2),
                    stage="validate")
        
        return df

# Example usage with monitoring
def run_monitored_pipeline():
    # Optional: configure email alerts
    email_config = {
        'smtp_server': 'smtp.gmail.com',
        'smtp_port': 587,
        'sender': 'your-pipeline@company.com',
        'recipient': 'data-team@company.com',
        'username': 'your-username',
        'password': 'your-password'
    }
    
    # Run pipeline with full monitoring; pass email_config to the
    # constructor to enable email alerts (console alerts only here)
    pipeline = ECommerceOrderPipeline()
    
    try:
        pipeline.run_pipeline('ecommerce_orders.csv', 'processed_orders.csv')
        
        # Display health status
        health = pipeline.get_health_status()
        print("\n" + "="*50)
        print("PIPELINE HEALTH STATUS")
        print("="*50)
        print(f"Overall Status: {health['overall_status']}")
        print(f"Error Rate: {health['error_rate']:.2%}")
        print(f"Data Quality Score: {health['data_quality_score']:.2%}")
        
        if health['avg_processing_time']:
            print(f"Avg Processing Time: {health['avg_processing_time']:.2f} seconds")
        
        print("\nMetrics Summary:")
        for metric_name, stats in health['metrics_summary'].items():
            if 'business.' in metric_name or 'data_quality.' in metric_name:
                print(f"  {metric_name}: {stats['latest']}")
        
    except Exception as e:
        print(f"Pipeline failed: {e}")
        # Health status would still show partial metrics and error information
        health = pipeline.get_health_status()
        print(f"Error rate: {health['error_rate']:.2%}")

# Run the example
if __name__ == "__main__":
    run_monitored_pipeline()

This exercise demonstrates a production-ready monitoring setup that tracks both technical metrics (performance, errors) and business metrics (order values, category distribution). The alerting system will notify you of both technical issues and business anomalies.

Common Mistakes & Troubleshooting

Mistake: Logging Too Much Detail

New practitioners often log every variable and operation, creating noise that obscures important information. Focus on logging state changes, errors, and key business events. Log at INFO level for normal operations and use DEBUG for detailed troubleshooting information you'll only need during development.

Solution: Create logging standards that specify what to log at each level:

  • ERROR: Failures that stop processing
  • WARNING: Issues that don't stop processing but need attention
  • INFO: Major state changes and business events
  • DEBUG: Detailed technical information for troubleshooting
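A minimal sketch of these standards using Python's built-in `logging` module (the handler and format shown are illustrative choices, not requirements):

```python
import logging

# Emit INFO and above in production; switch to logging.DEBUG
# only during development troubleshooting.
logger = logging.getLogger("pipeline")
logger.setLevel(logging.INFO)

handler = logging.StreamHandler()
handler.setFormatter(
    logging.Formatter("%(asctime)s %(levelname)s %(name)s %(message)s")
)
logger.addHandler(handler)

logger.debug("Row-level transform detail")              # filtered out at INFO
logger.info("Extract stage completed, 10000 rows")      # major state change
logger.warning("3 rows had missing customer_id")        # needs attention, not fatal
logger.error("Load stage failed: connection refused")   # stops processing
```

Because the logger level is INFO, the `debug` call produces no output: the standard is enforced by configuration, not discipline.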

Mistake: Alert Fatigue

Setting alert thresholds too aggressively results in constant notifications for minor issues, training operators to ignore alerts entirely. This is dangerous: when a real crisis occurs, the alert might be dismissed as another false positive.

Solution: Start with conservative thresholds and gradually tighten them based on actual system behavior. Use alert severity levels and ensure critical alerts are rare but always actionable.
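One way to keep alerts rare and actionable is a per-rule cooldown, as the exercise's `AlertRule` does with `cooldown_minutes`. A stripped-down sketch of just the gating logic (class and method names here are illustrative):

```python
from datetime import datetime, timedelta

class CooldownGate:
    """Suppress repeat alerts for the same rule within a cooldown window."""
    def __init__(self):
        self._last_fired = {}  # rule name -> datetime of last alert

    def should_fire(self, rule_name: str, cooldown_minutes: int, now=None) -> bool:
        now = now or datetime.now()
        last = self._last_fired.get(rule_name)
        if last is not None and now - last < timedelta(minutes=cooldown_minutes):
            return False  # still cooling down; swallow the repeat alert
        self._last_fired[rule_name] = now
        return True

gate = CooldownGate()
t0 = datetime(2026, 4, 8, 3, 0)
print(gate.should_fire("negative_amounts", 60, now=t0))                          # True
print(gate.should_fire("negative_amounts", 60, now=t0 + timedelta(minutes=30)))  # False
print(gate.should_fire("negative_amounts", 60, now=t0 + timedelta(minutes=61)))  # True
```

Note that a suppressed alert does not reset the clock: the window is measured from the last alert actually delivered.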

Mistake: Not Monitoring Business Metrics

Technical monitoring (CPU, memory, execution time) is important, but ignoring business metrics means you might not notice when your pipeline is technically successful yet producing incorrect results.

Solution: Always include business-relevant metrics like record counts, value ranges, and data quality scores. Monitor both the technical and business health of your pipelines.
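As a concrete illustration (the thresholds below are invented for the example), a pipeline can pass every technical check yet emit zero rows or an implausible total; a couple of cheap business-level checks catch that:

```python
def check_business_health(record_count: int, total_order_value: float,
                          expected_min_records: int = 1000,
                          expected_min_value: float = 10_000.0) -> list:
    """Return a list of business-level issues; an empty list means healthy."""
    issues = []
    if record_count < expected_min_records:
        issues.append(f"Only {record_count} records processed "
                      f"(expected at least {expected_min_records})")
    if total_order_value < expected_min_value:
        issues.append(f"Total order value {total_order_value:.2f} is "
                      f"below the expected floor of {expected_min_value:.2f}")
    return issues

print(check_business_health(record_count=5000, total_order_value=250_000.0))  # []
print(check_business_health(record_count=12, total_order_value=300.0))        # two issues
```

In practice the expected floors would come from historical metrics rather than hard-coded constants.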

Mistake: Missing Context in Alerts

Alerts that just say "Pipeline failed" without context require additional investigation time. In a crisis, every minute counts.

Solution: Include relevant metrics, recent changes, and diagnostic hints in alert messages. Make your alerts actionable by providing the information needed to start troubleshooting immediately.
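A sketch of what an actionable message can carry (the field names are illustrative):

```python
def build_alert_message(pipeline: str, stage: str, error: str,
                        records_processed: int, last_success: str) -> str:
    """Assemble an actionable alert instead of a bare 'Pipeline failed'."""
    return (
        f"[ALERT] {pipeline} failed at stage '{stage}'\n"
        f"  Error: {error}\n"
        f"  Records processed before failure: {records_processed}\n"
        f"  Last successful run: {last_success}\n"
        f"  First step: check source availability and recent config changes."
    )

msg = build_alert_message(
    pipeline="ecommerce_orders",
    stage="load",
    error="connection refused by warehouse",
    records_processed=8421,
    last_success="2026-04-08 02:00 UTC",
)
print(msg)
```

The responder can now see where the run died, how far it got, and when things last worked, before opening a single log file.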

Troubleshooting Tip: When logs aren't appearing as expected, check your logging configuration. Ensure the log level is set appropriately and that log handlers are configured correctly. A common issue is setting the logger level too high, filtering out important messages.

Troubleshooting Tip: If metrics seem inconsistent, verify that your metric collection points are in the correct locations in your code flow. Metrics should be recorded after operations complete successfully, and error metrics should be recorded in exception handlers.
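The placement rule from the second tip, sketched with a plain dict standing in for a metrics backend: success counters belong after the operation, error counters inside the handler.

```python
counters = {"pipeline.success": 0, "pipeline.errors": 0}

def run_stage(stage_fn, *args):
    """Record success metrics only after the operation completes,
    and error metrics only inside the exception handler."""
    try:
        result = stage_fn(*args)
        counters["pipeline.success"] += 1   # reached only on success
        return result
    except Exception:
        counters["pipeline.errors"] += 1    # reached only on failure
        raise

run_stage(lambda x: x * 2, 21)
try:
    run_stage(lambda: 1 / 0)
except ZeroDivisionError:
    pass
print(counters)  # {'pipeline.success': 1, 'pipeline.errors': 1}
```

If the success increment were placed before `stage_fn` ran, a failing stage would count as both a success and an error, which is exactly the kind of inconsistency this tip describes.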

Summary & Next Steps

You've now built a comprehensive observability system that provides visibility into your data pipeline's technical performance, data quality, and business impact. The structured logging gives you detailed insights into what happened during each pipeline run, while metrics tracking shows you trends over time. The alerting system ensures you'll know about problems quickly, with enough context to respond effectively.

The key principles you've implemented are:

Structured, contextual logging that captures both technical and business information in a machine-readable format. This foundation enables powerful log analysis and correlation across pipeline runs.

Multi-dimensional metrics that track performance, data quality, and business outcomes. These metrics provide the quantitative foundation for alerting and trend analysis.

Intelligent alerting that balances sensitivity with noise reduction, ensuring you're notified of real problems while avoiding alert fatigue.

Health monitoring that gives you a comprehensive view of pipeline status across multiple dimensions, making it easy to assess overall system health at a glance.

To continue building your observability expertise, consider these next steps:

Integrate with monitoring tools like Prometheus and Grafana for metrics visualization, or ELK stack (Elasticsearch, Logstash, Kibana) for log analysis. These tools provide powerful dashboards and advanced alerting capabilities.

Add distributed tracing to track data flow across multiple services or pipeline stages. This becomes essential as your data architecture grows more complex.

Implement anomaly detection using statistical methods or machine learning to automatically identify unusual patterns that static thresholds might miss.
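A minimal statistical version uses a rolling z-score; a threshold of 3 standard deviations is a common starting point, not a rule:

```python
from statistics import mean, stdev

def is_anomalous(history: list, current: float, z_threshold: float = 3.0) -> bool:
    """Flag the current value if it sits more than z_threshold
    standard deviations from the historical mean."""
    if len(history) < 2:
        return False  # not enough data to estimate spread
    mu = mean(history)
    sigma = stdev(history)
    if sigma == 0:
        return current != mu
    return abs(current - mu) / sigma > z_threshold

volumes = [10_000, 10_250, 9_800, 10_100, 9_950, 10_050]
print(is_anomalous(volumes, 10_200))  # typical day: False
print(is_anomalous(volumes, 25_000))  # sudden spike: True
```

Unlike the fixed 50% rule in the exercise's `_check_order_volume`, this adapts to how noisy the metric actually is: a volatile metric earns wide bands, a stable one earns tight bands.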

Build monitoring into your deployment process by including observability requirements in your pipeline development standards and deployment checklists.

Remember that observability is not a one-time setup—it's an ongoing practice that evolves with your systems. Start with the fundamentals you've learned here, then gradually add sophistication as your monitoring needs grow.
