
When your data pipeline silently fails at 3 AM, how long does it take you to notice? If you're like most data professionals starting out, the answer is probably "whenever someone complains that their dashboard is empty." This reactive approach to pipeline management is a recipe for sleepless nights and frustrated stakeholders.
Effective logging, alerting, and observability transform you from a firefighter into a proactive guardian of your data infrastructure. Instead of discovering failures hours or days later, you'll know within minutes when something goes wrong—and often prevent problems before they impact your users. This isn't just about writing log statements; it's about building a comprehensive monitoring strategy that gives you confidence in your data systems.
What you'll learn: how to add structured, machine-parseable logging to a pipeline, collect performance and data quality metrics, build an alerting system with severity levels and cooldowns, and expose an overall health status for your pipeline.
Prerequisites: You should be comfortable with Python programming and have basic familiarity with data pipelines (ETL/ELT processes). No prior experience with monitoring tools is required—we'll build everything from fundamentals.
Before diving into implementation, let's establish what we mean by observability and how it differs from simple monitoring. Observability is your ability to understand what's happening inside your systems based on their external outputs. For data pipelines, this means having visibility into data flow, processing performance, and quality issues.
The observability stack consists of three pillars:
Logs are timestamped records of events that happened in your system. Think of them as a detailed diary of your pipeline's activities. When your pipeline processes a batch of customer data, logs capture what happened: "Started processing 10,000 customer records at 2023-10-15 09:30:15, encountered 3 invalid phone numbers, completed successfully in 45 seconds."
Metrics are numerical measurements of your system's behavior over time. Unlike logs, which are discrete events, metrics give you trends and patterns. Examples include records processed per minute, error rates, memory usage, and data freshness (how old is your most recent data).
Traces follow a single request or data batch through your entire pipeline, showing you the path it took and how long each step took. If your pipeline has multiple stages—extraction, transformation, validation, loading—traces show you the complete journey.
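To make traces concrete before we move on, here is a minimal, self-contained sketch of stage-level tracing. This is not a real tracing library; the `span` helper and the `trace_spans` list are illustrative names, and the `time.sleep` calls stand in for real pipeline work.

```python
import time
from contextlib import contextmanager

trace_spans = []  # collected spans for one pipeline run

@contextmanager
def span(stage: str):
    """Record how long one pipeline stage took, as a single span of a trace."""
    start = time.perf_counter()
    try:
        yield
    finally:
        trace_spans.append({"stage": stage,
                            "seconds": time.perf_counter() - start})

with span("extract"):
    time.sleep(0.01)  # stand-in for real extraction work
with span("transform"):
    time.sleep(0.01)  # stand-in for real transformation work

stages = [s["stage"] for s in trace_spans]
```

Real tracing systems add span IDs and parent/child relationships so one batch can be followed across services, but the core idea is exactly this: a timed record per stage, collected into one journey.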
Let's start building these capabilities into a realistic data pipeline.
The foundation of good observability is structured logging. Unlike casual print statements or basic logging that produces human-readable text, structured logging creates machine-parseable records with consistent fields.
Here's a customer data processing pipeline with proper structured logging:
import logging
import json
import time
from datetime import datetime
from typing import Dict, Any, List
import pandas as pd

# Configure structured logging
logging.basicConfig(
    level=logging.INFO,
    format='%(message)s',  # We'll format the JSON ourselves
    handlers=[
        logging.FileHandler('pipeline.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

class StructuredLogger:
    def __init__(self, pipeline_name: str, run_id: str):
        self.pipeline_name = pipeline_name
        self.run_id = run_id

    def log(self, level: str, message: str, **context):
        log_entry = {
            'timestamp': datetime.utcnow().isoformat(),
            'pipeline': self.pipeline_name,
            'run_id': self.run_id,
            'level': level,
            'message': message,
            **context
        }
        logger.info(json.dumps(log_entry, default=str))

class CustomerDataPipeline:
    def __init__(self):
        self.run_id = f"run_{int(time.time())}"
        self.log = StructuredLogger("customer_data_pipeline", self.run_id)
        self.metrics = {}

    def extract_data(self, source_path: str) -> pd.DataFrame:
        self.log.log("INFO", "Starting data extraction",
                     source=source_path, stage="extract")
        try:
            start_time = time.time()
            df = pd.read_csv(source_path)
            extraction_time = time.time() - start_time
            self.log.log("INFO", "Data extraction completed",
                         records_extracted=len(df),
                         extraction_time_seconds=round(extraction_time, 2),
                         columns=list(df.columns),
                         stage="extract")
            self.metrics['records_extracted'] = len(df)
            self.metrics['extraction_time'] = extraction_time
            return df
        except Exception as e:
            self.log.log("ERROR", "Data extraction failed",
                         error=str(e),
                         error_type=type(e).__name__,
                         stage="extract")
            raise

    def validate_data(self, df: pd.DataFrame) -> pd.DataFrame:
        self.log.log("INFO", "Starting data validation",
                     records_to_validate=len(df), stage="validate")
        # Cast numpy integers to plain ints so the results serialize as JSON
        validation_results = {
            'missing_customer_ids': int(df['customer_id'].isna().sum()),
            'invalid_emails': 0,
            'future_signup_dates': 0,
            'duplicate_customers': int(df['customer_id'].duplicated().sum())
        }
        # Email validation (na=False counts missing emails as non-matching)
        email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        validation_results['invalid_emails'] = int(
            (~df['email'].str.match(email_pattern, na=False)).sum()
        )
        # Date validation
        df['signup_date'] = pd.to_datetime(df['signup_date'], errors='coerce')
        validation_results['future_signup_dates'] = int(
            (df['signup_date'] > datetime.now()).sum()
        )
        total_issues = sum(validation_results.values())
        self.log.log("INFO", "Data validation completed",
                     validation_results=validation_results,
                     total_issues=total_issues,
                     stage="validate")
        if total_issues > 0:
            self.log.log("WARNING", "Data quality issues detected",
                         issue_count=total_issues,
                         issue_percentage=round(total_issues / len(df) * 100, 2),
                         stage="validate")
        self.metrics.update(validation_results)
        return df

    def process_data(self, df: pd.DataFrame) -> pd.DataFrame:
        self.log.log("INFO", "Starting data processing",
                     records_to_process=len(df), stage="transform")
        try:
            # Clean email addresses
            df['email'] = df['email'].str.lower().str.strip()
            # Calculate customer tenure
            df['signup_date'] = pd.to_datetime(df['signup_date'])
            df['days_since_signup'] = (datetime.now() - df['signup_date']).dt.days
            # Categorize customers (include_lowest so day-0 signups land in 'new')
            df['customer_segment'] = pd.cut(
                df['days_since_signup'],
                bins=[0, 30, 365, float('inf')],
                labels=['new', 'regular', 'loyal'],
                include_lowest=True
            )
            processed_records = len(df)
            self.log.log("INFO", "Data processing completed",
                         records_processed=processed_records,
                         new_customers=len(df[df['customer_segment'] == 'new']),
                         regular_customers=len(df[df['customer_segment'] == 'regular']),
                         loyal_customers=len(df[df['customer_segment'] == 'loyal']),
                         stage="transform")
            self.metrics['records_processed'] = processed_records
            return df
        except Exception as e:
            self.log.log("ERROR", "Data processing failed",
                         error=str(e),
                         error_type=type(e).__name__,
                         stage="transform")
            raise

    def load_data(self, df: pd.DataFrame, destination_path: str):
        self.log.log("INFO", "Starting data load",
                     records_to_load=len(df),
                     destination=destination_path,
                     stage="load")
        try:
            start_time = time.time()
            df.to_csv(destination_path, index=False)
            load_time = time.time() - start_time
            self.log.log("INFO", "Data load completed",
                         records_loaded=len(df),
                         load_time_seconds=round(load_time, 2),
                         stage="load")
            self.metrics['records_loaded'] = len(df)
            self.metrics['load_time'] = load_time
        except Exception as e:
            self.log.log("ERROR", "Data load failed",
                         error=str(e),
                         error_type=type(e).__name__,
                         stage="load")
            raise

    def run_pipeline(self, source_path: str, destination_path: str):
        pipeline_start = time.time()
        self.log.log("INFO", "Pipeline execution started",
                     source=source_path,
                     destination=destination_path)
        try:
            # Execute pipeline stages
            df = self.extract_data(source_path)
            df = self.validate_data(df)
            df = self.process_data(df)
            self.load_data(df, destination_path)
            # Calculate total metrics
            total_time = time.time() - pipeline_start
            self.metrics['total_execution_time'] = total_time
            self.log.log("INFO", "Pipeline execution completed successfully",
                         total_time_seconds=round(total_time, 2),
                         final_metrics=self.metrics)
        except Exception as e:
            total_time = time.time() - pipeline_start
            self.log.log("ERROR", "Pipeline execution failed",
                         total_time_seconds=round(total_time, 2),
                         error=str(e),
                         partial_metrics=self.metrics)
            raise
This structured approach provides several advantages over basic logging:
Every log entry carries consistent context fields (pipeline, run_id, and stage), making it easy to filter and search logs, and the JSON format means log aggregation tools can parse entries without custom regexes.
Tip: Always include a unique run_id for each pipeline execution. This makes it easy to trace all log entries for a specific run, especially when multiple pipeline instances are running concurrently.
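As a quick illustration of the payoff, structured entries can be filtered programmatically. This self-contained sketch uses made-up sample lines in the same shape our StructuredLogger emits:

```python
import json

# Hypothetical log lines, one JSON object per line (as StructuredLogger writes them)
log_lines = [
    '{"run_id": "run_1", "stage": "extract", "message": "started"}',
    '{"run_id": "run_2", "stage": "extract", "message": "started"}',
    '{"run_id": "run_1", "stage": "load", "message": "completed"}',
]

def entries_for_run(lines, run_id):
    """Parse JSON log lines and keep only the entries for one pipeline run."""
    return [entry for entry in (json.loads(line) for line in lines)
            if entry.get("run_id") == run_id]

run_1 = entries_for_run(log_lines, "run_1")  # both run_1 entries, in order
```

The same filter would take a regex and careful escaping against free-text logs; against structured logs it is three lines.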
While logs tell you what happened, metrics tell you how your system is behaving over time. Let's extend our pipeline to collect and expose metrics that we can monitor.
import time
from collections import defaultdict
from typing import Any, Dict, List
from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class MetricPoint:
    timestamp: datetime
    value: float
    tags: Dict[str, str]

class MetricsCollector:
    def __init__(self):
        self.metrics = defaultdict(list)

    def record_counter(self, name: str, value: float = 1.0, tags: Dict[str, str] = None):
        """Record a counter metric (cumulative value)"""
        self.metrics[name].append(MetricPoint(
            timestamp=datetime.utcnow(),
            value=value,
            tags=tags or {}
        ))

    def record_gauge(self, name: str, value: float, tags: Dict[str, str] = None):
        """Record a gauge metric (current value)"""
        self.metrics[name].append(MetricPoint(
            timestamp=datetime.utcnow(),
            value=value,
            tags=tags or {}
        ))

    def record_timing(self, name: str, duration_seconds: float, tags: Dict[str, str] = None):
        """Record a timing metric"""
        self.metrics[f"{name}.duration"].append(MetricPoint(
            timestamp=datetime.utcnow(),
            value=duration_seconds,
            tags=tags or {}
        ))

    def get_metrics_summary(self, hours: int = 1) -> Dict[str, Any]:
        """Get a summary of metrics from the last N hours"""
        cutoff = datetime.utcnow() - timedelta(hours=hours)
        summary = {}
        for metric_name, points in self.metrics.items():
            recent_points = [p for p in points if p.timestamp > cutoff]
            if recent_points:
                values = [p.value for p in recent_points]
                summary[metric_name] = {
                    'count': len(values),
                    'avg': sum(values) / len(values),
                    'min': min(values),
                    'max': max(values),
                    'latest': values[-1],
                    'latest_timestamp': recent_points[-1].timestamp.isoformat()
                }
        return summary

class MonitoredPipeline(CustomerDataPipeline):
    def __init__(self):
        super().__init__()
        self.metrics_collector = MetricsCollector()

    def extract_data(self, source_path: str) -> pd.DataFrame:
        start_time = time.time()
        try:
            df = super().extract_data(source_path)
            # Record success metrics
            duration = time.time() - start_time
            self.metrics_collector.record_timing("pipeline.extract", duration)
            self.metrics_collector.record_counter("pipeline.extract.success")
            self.metrics_collector.record_gauge("pipeline.records_extracted", len(df))
            return df
        except Exception as e:
            # Record failure metrics
            duration = time.time() - start_time
            self.metrics_collector.record_timing("pipeline.extract", duration,
                                                 tags={"status": "failed"})
            self.metrics_collector.record_counter("pipeline.extract.error",
                                                  tags={"error_type": type(e).__name__})
            raise

    def validate_data(self, df: pd.DataFrame) -> pd.DataFrame:
        df = super().validate_data(df)
        # Record data quality metrics
        total_records = len(df)
        missing_ids = int(df['customer_id'].isna().sum())
        duplicates = int(df['customer_id'].duplicated().sum())
        self.metrics_collector.record_gauge("data_quality.missing_customer_ids", missing_ids)
        self.metrics_collector.record_gauge("data_quality.duplicate_customers", duplicates)
        self.metrics_collector.record_gauge("data_quality.completeness_rate",
                                            (total_records - missing_ids) / total_records)
        # Alert on data quality issues
        if missing_ids > total_records * 0.05:  # More than 5% missing IDs
            self.log.log("CRITICAL", "High rate of missing customer IDs",
                         missing_count=missing_ids,
                         missing_rate=missing_ids / total_records,
                         threshold=0.05)
        return df

    def get_health_status(self) -> Dict[str, Any]:
        """Return current health status of the pipeline"""
        metrics_summary = self.metrics_collector.get_metrics_summary()
        # Calculate health indicators
        health_status = {
            'overall_status': 'healthy',
            'last_successful_run': None,
            'error_rate': 0.0,
            'avg_processing_time': None,
            'data_quality_score': 1.0,
            'metrics_summary': metrics_summary
        }
        # Check for recent errors
        if 'pipeline.extract.error' in metrics_summary:
            error_count = metrics_summary['pipeline.extract.error']['count']
            total_runs = metrics_summary.get('pipeline.extract.success', {}).get('count', 0) + error_count
            health_status['error_rate'] = error_count / max(total_runs, 1)
            if health_status['error_rate'] > 0.1:  # More than 10% failure rate
                health_status['overall_status'] = 'degraded'
        # Check processing time trends
        if 'pipeline.extract.duration' in metrics_summary:
            health_status['avg_processing_time'] = metrics_summary['pipeline.extract.duration']['avg']
        # Check data quality
        if 'data_quality.completeness_rate' in metrics_summary:
            completeness = metrics_summary['data_quality.completeness_rate']['latest']
            if completeness < 0.95:  # Less than 95% complete
                health_status['overall_status'] = 'degraded'
            health_status['data_quality_score'] = completeness
        return health_status
This metrics system provides several key capabilities:
Performance Monitoring: We track how long each stage takes, allowing us to identify performance degradation over time.
Data Quality Metrics: We monitor missing data, duplicates, and completeness rates—critical indicators for data pipeline health.
Error Tracking: We count different types of errors and calculate error rates, helping identify when problems are becoming systematic rather than isolated incidents.
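The error-rate logic behind get_health_status reduces to a small calculation worth seeing in isolation. A self-contained sketch (function names are illustrative; the 10% threshold mirrors the one above):

```python
def error_rate(success_count: int, error_count: int) -> float:
    """Fraction of runs that failed; guards against division by zero."""
    total = success_count + error_count
    return error_count / total if total else 0.0

def status_for(rate: float, threshold: float = 0.1) -> str:
    """'degraded' once the failure rate crosses the alert threshold."""
    return "degraded" if rate > threshold else "healthy"

rate = error_rate(success_count=18, error_count=2)  # 2 failures in 20 runs -> 0.1
```

Note that a rate exactly at the threshold still reads as healthy; whether the boundary counts as degraded is a policy choice you should make deliberately and document.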
The goal of alerting is to notify you of problems that require human intervention while avoiding alert fatigue from false positives or minor issues. Let's build an alerting system that follows best practices.
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from enum import Enum
from typing import Any, Callable, Dict, List
import json

class AlertSeverity(Enum):
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"

class AlertRule:
    def __init__(self, name: str, condition: Callable, severity: AlertSeverity,
                 message: str, cooldown_minutes: int = 60):
        self.name = name
        self.condition = condition  # Function that returns True if alert should fire
        self.severity = severity
        self.message = message
        self.cooldown_minutes = cooldown_minutes
        self.last_fired = None

    def should_fire(self, metrics: Dict[str, Any]) -> bool:
        """Check if this alert should fire given current metrics"""
        # Check cooldown period (timedelta has no total_minutes(), so convert seconds)
        if self.last_fired:
            minutes_since_last = (datetime.utcnow() - self.last_fired).total_seconds() / 60
            if minutes_since_last < self.cooldown_minutes:
                return False
        return self.condition(metrics)

    def fire(self):
        """Mark this alert as fired"""
        self.last_fired = datetime.utcnow()

class AlertManager:
    def __init__(self, email_config: Dict[str, str] = None):
        self.rules = []
        self.email_config = email_config
        self.alert_history = []

    def add_rule(self, rule: AlertRule):
        """Add an alert rule to the manager"""
        self.rules.append(rule)

    def check_alerts(self, metrics: Dict[str, Any], pipeline_logger: StructuredLogger):
        """Check all alert rules against current metrics"""
        for rule in self.rules:
            if rule.should_fire(metrics):
                self._send_alert(rule, metrics, pipeline_logger)
                rule.fire()

    def _send_alert(self, rule: AlertRule, metrics: Dict[str, Any], logger: StructuredLogger):
        """Send an alert notification"""
        alert_data = {
            'rule_name': rule.name,
            'severity': rule.severity.value,
            'message': rule.message,
            'metrics_snapshot': metrics,
            'timestamp': datetime.utcnow().isoformat()
        }
        # Log the alert
        logger.log("ALERT", f"Alert fired: {rule.name}",
                   severity=rule.severity.value,
                   alert_message=rule.message,
                   metrics=metrics)
        # Store in history
        self.alert_history.append(alert_data)
        # Send email if configured
        if self.email_config and rule.severity in [AlertSeverity.WARNING, AlertSeverity.CRITICAL]:
            self._send_email_alert(alert_data)
        # Print to console for immediate visibility
        print(f"🚨 ALERT [{rule.severity.value.upper()}]: {rule.name}")
        print(f"   {rule.message}")
        print(f"   Time: {alert_data['timestamp']}")

    def _send_email_alert(self, alert_data: Dict[str, Any]):
        """Send email notification (if configured)"""
        if not self.email_config:
            return
        try:
            msg = MIMEMultipart()
            msg['From'] = self.email_config['sender']
            msg['To'] = self.email_config['recipient']
            msg['Subject'] = f"Pipeline Alert: {alert_data['rule_name']} [{alert_data['severity'].upper()}]"
            body = f"""
Pipeline Alert Notification

Rule: {alert_data['rule_name']}
Severity: {alert_data['severity']}
Time: {alert_data['timestamp']}
Message: {alert_data['message']}

Metrics Snapshot:
{json.dumps(alert_data['metrics_snapshot'], indent=2, default=str)}
"""
            msg.attach(MIMEText(body, 'plain'))
            server = smtplib.SMTP(self.email_config['smtp_server'], self.email_config['smtp_port'])
            if self.email_config.get('username'):
                server.starttls()
                server.login(self.email_config['username'], self.email_config['password'])
            server.send_message(msg)
            server.quit()
        except Exception as e:
            print(f"Failed to send email alert: {e}")

# Define specific alert conditions
def high_error_rate(metrics: Dict[str, Any]) -> bool:
    """Alert if error rate exceeds 10%"""
    summary = metrics.get('metrics_summary', {})
    if 'pipeline.extract.error' in summary and 'pipeline.extract.success' in summary:
        errors = summary['pipeline.extract.error']['count']
        successes = summary['pipeline.extract.success']['count']
        error_rate = errors / (errors + successes)
        return error_rate > 0.1
    return False

def low_data_quality(metrics: Dict[str, Any]) -> bool:
    """Alert if data completeness drops below 95%"""
    summary = metrics.get('metrics_summary', {})
    if 'data_quality.completeness_rate' in summary:
        completeness = summary['data_quality.completeness_rate']['latest']
        return completeness < 0.95
    return False

def slow_processing(metrics: Dict[str, Any]) -> bool:
    """Alert if processing time exceeds normal range"""
    summary = metrics.get('metrics_summary', {})
    if 'pipeline.extract.duration' in summary:
        avg_duration = summary['pipeline.extract.duration']['avg']
        return avg_duration > 300  # More than 5 minutes
    return False

def no_recent_data(metrics: Dict[str, Any]) -> bool:
    """Alert if no data has been processed recently"""
    summary = metrics.get('metrics_summary', {})
    if 'pipeline.records_extracted' in summary:
        latest_timestamp = summary['pipeline.records_extracted']['latest_timestamp']
        latest_time = datetime.fromisoformat(latest_timestamp.replace('Z', '+00:00'))
        hours_since_last = (datetime.utcnow() - latest_time.replace(tzinfo=None)).total_seconds() / 3600
        return hours_since_last > 24  # No data for 24+ hours
    return False

class AlertingPipeline(MonitoredPipeline):
    def __init__(self, email_config: Dict[str, str] = None):
        super().__init__()
        self.alert_manager = AlertManager(email_config)
        # Set up alert rules
        self.alert_manager.add_rule(AlertRule(
            name="high_error_rate",
            condition=high_error_rate,
            severity=AlertSeverity.CRITICAL,
            message="Pipeline error rate exceeds 10% - immediate attention required",
            cooldown_minutes=30
        ))
        self.alert_manager.add_rule(AlertRule(
            name="low_data_quality",
            condition=low_data_quality,
            severity=AlertSeverity.WARNING,
            message="Data completeness below 95% - check data sources",
            cooldown_minutes=60
        ))
        self.alert_manager.add_rule(AlertRule(
            name="slow_processing",
            condition=slow_processing,
            severity=AlertSeverity.WARNING,
            message="Processing time significantly higher than normal",
            cooldown_minutes=120
        ))
        self.alert_manager.add_rule(AlertRule(
            name="no_recent_data",
            condition=no_recent_data,
            severity=AlertSeverity.CRITICAL,
            message="No data processed in the last 24 hours - pipeline may be down",
            cooldown_minutes=360  # 6 hours
        ))

    def run_pipeline(self, source_path: str, destination_path: str):
        """Run pipeline with alerting, checking alert rules even if a stage fails"""
        try:
            super().run_pipeline(source_path, destination_path)
        finally:
            # Check alerts after pipeline execution (success or failure)
            health_status = self.get_health_status()
            self.alert_manager.check_alerts(health_status, self.log)
This alerting system implements several best practices:
Severity Levels: Different types of issues get different severities, allowing you to prioritize responses appropriately.
Cooldown Periods: Alerts won't fire repeatedly for the same issue, preventing notification spam.
Contextual Information: Alerts include relevant metrics and snapshots, giving you the information needed to diagnose problems.
Multiple Channels: Alerts can go to logs, email, and console output, ensuring visibility across different contexts.
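The cooldown behavior is worth isolating, since it is the piece most people get wrong. This is a minimal self-contained sketch of the same idea AlertRule uses (the Cooldown class name is illustrative); fixed datetimes stand in for "now" so the behavior is easy to follow:

```python
from datetime import datetime, timedelta

class Cooldown:
    """Suppress repeat firings of the same alert within a time window."""
    def __init__(self, minutes: int):
        self.window = timedelta(minutes=minutes)
        self.last_fired = None

    def allow(self, now: datetime) -> bool:
        # Inside the window: suppress, and do NOT reset the timer
        if self.last_fired is not None and now - self.last_fired < self.window:
            return False
        self.last_fired = now
        return True

cd = Cooldown(minutes=30)
t0 = datetime(2023, 10, 15, 9, 0)
first = cd.allow(t0)                               # fires
suppressed = cd.allow(t0 + timedelta(minutes=10))  # within window: suppressed
later = cd.allow(t0 + timedelta(minutes=45))       # window elapsed: fires again
```

One subtlety: suppressed firings must not refresh last_fired, or a continuously failing condition would push the next notification out forever.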
Warning: Be careful with alert thresholds. Start with conservative values and adjust based on your actual system behavior. Too many false positives will train you to ignore alerts entirely.
Let's put everything together by creating a complete monitoring setup for a real-world scenario. You'll build a pipeline that processes e-commerce order data and implement comprehensive observability.
Create a sample dataset first:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta

# Generate sample e-commerce data
def create_sample_data(filename: str, num_records: int = 1000):
    np.random.seed(42)  # For reproducible results
    # Generate realistic e-commerce order data
    base_date = datetime.now() - timedelta(days=30)
    dates = [base_date + timedelta(days=np.random.randint(0, 30)) for _ in range(num_records)]
    data = {
        'order_id': [f"ORD-{1000 + i}" for i in range(num_records)],
        # list() so we can later overwrite entries with None (a NumPy string
        # array would silently coerce None to the string 'None')
        'customer_id': list(np.random.choice([f"CUST-{i}" for i in range(1, 201)], num_records)),
        'order_date': dates,
        'product_category': np.random.choice(['Electronics', 'Clothing', 'Books', 'Home'], num_records),
        'order_amount': np.random.lognormal(3, 1, num_records).round(2),
        'payment_method': np.random.choice(['credit_card', 'debit_card', 'paypal'], num_records),
        'shipping_address': [f"{np.random.randint(1, 9999)} Main St, City" for _ in range(num_records)]
    }
    # Introduce some data quality issues (5% of records)
    issues_count = int(num_records * 0.05)
    issue_indices = np.random.choice(num_records, issues_count, replace=False)
    for idx in issue_indices:
        if np.random.random() < 0.3:  # Missing customer ID
            data['customer_id'][idx] = None
        if np.random.random() < 0.2:  # Negative amount
            data['order_amount'][idx] = -abs(data['order_amount'][idx])
        if np.random.random() < 0.3:  # Future date
            data['order_date'][idx] = datetime.now() + timedelta(days=np.random.randint(1, 10))
    df = pd.DataFrame(data)
    df.to_csv(filename, index=False)
    print(f"Created sample data: {filename} with {num_records} records")

# Create the sample data
create_sample_data('ecommerce_orders.csv', 1000)
Now implement the complete monitoring pipeline:
class ECommerceOrderPipeline(AlertingPipeline):
    def __init__(self, email_config: Dict[str, str] = None):
        super().__init__(email_config)
        # Add e-commerce specific alert rules
        self.alert_manager.add_rule(AlertRule(
            name="high_negative_amounts",
            condition=self._check_negative_amounts,
            severity=AlertSeverity.WARNING,
            message="High percentage of negative order amounts detected",
            cooldown_minutes=60
        ))
        self.alert_manager.add_rule(AlertRule(
            name="unusual_order_volume",
            condition=self._check_order_volume,
            severity=AlertSeverity.INFO,
            message="Order volume significantly different from normal",
            cooldown_minutes=120
        ))

    def _check_negative_amounts(self, metrics: Dict[str, Any]) -> bool:
        summary = metrics.get('metrics_summary', {})
        if 'data_quality.negative_amounts' in summary:
            negative_count = summary['data_quality.negative_amounts']['latest']
            total_records = summary.get('pipeline.records_extracted', {}).get('latest', 1)
            return (negative_count / total_records) > 0.02  # More than 2%
        return False

    def _check_order_volume(self, metrics: Dict[str, Any]) -> bool:
        summary = metrics.get('metrics_summary', {})
        if 'pipeline.records_extracted' in summary:
            current_volume = summary['pipeline.records_extracted']['latest']
            # Simple anomaly detection: alert if volume is 50% higher/lower than average
            avg_volume = summary['pipeline.records_extracted']['avg']
            return abs(current_volume - avg_volume) > (avg_volume * 0.5)
        return False

    def validate_data(self, df: pd.DataFrame) -> pd.DataFrame:
        self.log.log("INFO", "Starting e-commerce order validation",
                     records_to_validate=len(df), stage="validate")
        # Standard validations (plain ints so the results serialize as JSON)
        validation_results = {
            'missing_order_ids': int(df['order_id'].isna().sum()),
            'missing_customer_ids': int(df['customer_id'].isna().sum()),
            'duplicate_orders': int(df['order_id'].duplicated().sum()),
            'negative_amounts': int((df['order_amount'] < 0).sum()),
            'future_orders': 0,
            'invalid_payment_methods': 0
        }
        # Date validation
        df['order_date'] = pd.to_datetime(df['order_date'], errors='coerce')
        validation_results['future_orders'] = int((df['order_date'] > datetime.now()).sum())
        # Payment method validation
        valid_methods = ['credit_card', 'debit_card', 'paypal']
        validation_results['invalid_payment_methods'] = int(
            (~df['payment_method'].isin(valid_methods)).sum()
        )
        # Record detailed metrics
        for metric_name, count in validation_results.items():
            self.metrics_collector.record_gauge(f"data_quality.{metric_name}", count)
        # Calculate business metrics
        total_amount = float(df['order_amount'][df['order_amount'] > 0].sum())
        avg_order_value = float(df['order_amount'][df['order_amount'] > 0].mean())
        self.metrics_collector.record_gauge("business.total_order_value", total_amount)
        self.metrics_collector.record_gauge("business.avg_order_value", avg_order_value)
        category_counts = df['product_category'].value_counts()
        for category, count in category_counts.items():
            self.metrics_collector.record_gauge("business.orders_by_category", int(count),
                                                tags={"category": category})
        total_issues = sum(validation_results.values())
        self.log.log("INFO", "E-commerce order validation completed",
                     validation_results=validation_results,
                     total_issues=total_issues,
                     total_order_value=total_amount,
                     avg_order_value=round(avg_order_value, 2),
                     stage="validate")
        return df

    def process_data(self, df: pd.DataFrame) -> pd.DataFrame:
        # The base class's process_data expects customer columns (email,
        # signup_date) that order data doesn't have, so pass orders through
        self.log.log("INFO", "Order data processing completed",
                     records_processed=len(df), stage="transform")
        self.metrics['records_processed'] = len(df)
        return df

# Example usage with monitoring
def run_monitored_pipeline():
    # Optional: email alert settings; pass email_config=email_config to the
    # pipeline below to enable email notifications
    email_config = {
        'smtp_server': 'smtp.gmail.com',
        'smtp_port': 587,
        'sender': 'your-pipeline@company.com',
        'recipient': 'data-team@company.com',
        'username': 'your-username',
        'password': 'your-password'
    }
    # Run pipeline with full monitoring (email alerts disabled for this example)
    pipeline = ECommerceOrderPipeline()
    try:
        pipeline.run_pipeline('ecommerce_orders.csv', 'processed_orders.csv')
        # Display health status
        health = pipeline.get_health_status()
        print("\n" + "=" * 50)
        print("PIPELINE HEALTH STATUS")
        print("=" * 50)
        print(f"Overall Status: {health['overall_status']}")
        print(f"Error Rate: {health['error_rate']:.2%}")
        print(f"Data Quality Score: {health['data_quality_score']:.2%}")
        if health['avg_processing_time']:
            print(f"Avg Processing Time: {health['avg_processing_time']:.2f} seconds")
        print("\nMetrics Summary:")
        for metric_name, stats in health['metrics_summary'].items():
            if 'business.' in metric_name or 'data_quality.' in metric_name:
                print(f"  {metric_name}: {stats['latest']}")
    except Exception as e:
        print(f"Pipeline failed: {e}")
        # Health status still shows partial metrics and error information
        health = pipeline.get_health_status()
        print(f"Error rate: {health['error_rate']:.2%}")

# Run the example
run_monitored_pipeline()
This exercise demonstrates a production-ready monitoring setup that tracks both technical metrics (performance, errors) and business metrics (order values, category distribution). The alerting system will notify you of both technical issues and business anomalies.
Mistake: Logging Too Much Detail. New practitioners often log every variable and operation, creating noise that obscures important information. Focus on logging state changes, errors, and key business events. Log at INFO level for normal operations and at DEBUG level for detailed troubleshooting information that you'll only need during development.
Solution: Create logging standards that specify what to log at each level: ERROR for failures that need human attention, WARNING for anomalies the pipeline recovered from, INFO for stage transitions and key business events, and DEBUG for detailed troubleshooting output used only during development.
Mistake: Alert Fatigue. Setting alert thresholds too aggressively results in constant notifications for minor issues, training operators to ignore alerts entirely. This is dangerous—when a real crisis occurs, the alert might be dismissed as another false positive.
Solution: Start with conservative thresholds and gradually tighten them based on actual system behavior. Use alert severity levels and ensure critical alerts are rare but always actionable.
Mistake: Not Monitoring Business Metrics. Technical monitoring (CPU, memory, execution time) is important, but ignoring business metrics means you might not notice when your pipeline is technically successful but producing incorrect results.
Solution: Always include business-relevant metrics like record counts, value ranges, and data quality scores. Monitor both the technical and business health of your pipelines.
Mistake: Missing Context in Alerts. Alerts that just say "Pipeline failed" without context require additional investigation time. In a crisis, every minute counts.
Solution: Include relevant metrics, recent changes, and diagnostic hints in alert messages. Make your alerts actionable by providing the information needed to start troubleshooting immediately.
Troubleshooting Tip: When logs aren't appearing as expected, check your logging configuration. Ensure the log level is set appropriately and that log handlers are configured correctly. A common issue is setting the logger level too high, filtering out important messages.
Troubleshooting Tip: If metrics seem inconsistent, verify that your metric collection points are in the correct locations in your code flow. Metrics should be recorded after operations complete successfully, and error metrics should be recorded in exception handlers.
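The first tip can be checked directly in code: Python's standard logging module reports whether a record at a given level would pass the logger's effective level, which is the quickest way to confirm why messages are being dropped.

```python
import logging

logger = logging.getLogger("pipeline.debug_check")
logger.setLevel(logging.WARNING)  # deliberately too high: INFO records are dropped

# isEnabledFor tells you whether a record at that level would get through
info_enabled_before = logger.isEnabledFor(logging.INFO)

logger.setLevel(logging.INFO)     # lower the level to let INFO through
info_enabled_after = logger.isEnabledFor(logging.INFO)
effective = logging.getLevelName(logger.getEffectiveLevel())
```

Remember that handlers have their own levels too: a message must pass both the logger's effective level and each handler's level to appear in that handler's output.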
You've now built a comprehensive observability system that provides visibility into your data pipeline's technical performance, data quality, and business impact. The structured logging gives you detailed insights into what happened during each pipeline run, while metrics tracking shows you trends over time. The alerting system ensures you'll know about problems quickly, with enough context to respond effectively.
The key principles you've implemented are:
Structured, contextual logging that captures both technical and business information in a machine-readable format. This foundation enables powerful log analysis and correlation across pipeline runs.
Multi-dimensional metrics that track performance, data quality, and business outcomes. These metrics provide the quantitative foundation for alerting and trend analysis.
Intelligent alerting that balances sensitivity with noise reduction, ensuring you're notified of real problems while avoiding alert fatigue.
Health monitoring that gives you a comprehensive view of pipeline status across multiple dimensions, making it easy to assess overall system health at a glance.
To continue building your observability expertise, consider these next steps:
Integrate with monitoring tools like Prometheus and Grafana for metrics visualization, or ELK stack (Elasticsearch, Logstash, Kibana) for log analysis. These tools provide powerful dashboards and advanced alerting capabilities.
Add distributed tracing to track data flow across multiple services or pipeline stages. This becomes essential as your data architecture grows more complex.
Implement anomaly detection using statistical methods or machine learning to automatically identify unusual patterns that static thresholds might miss.
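A simple statistical starting point is a z-score check against recent history. This self-contained sketch (the function name and the sample durations are illustrative) flags a value that sits far outside the spread of past observations, catching spikes a static threshold might miss:

```python
from statistics import mean, stdev

def is_anomalous(history, latest, z_threshold=3.0):
    """Flag latest if it lies more than z_threshold standard deviations
    from the mean of the historical values."""
    if len(history) < 2:
        return False  # not enough history to estimate the spread
    mu, sigma = mean(history), stdev(history)
    if sigma == 0:
        return latest != mu  # constant history: any change is anomalous
    return abs(latest - mu) / sigma > z_threshold

durations = [42.0, 45.0, 43.5, 44.0, 41.0, 46.0]  # past run times in seconds
normal = is_anomalous(durations, 44.0)    # within the usual spread
spike = is_anomalous(durations, 300.0)    # far outside it
```

In practice you would keep a rolling window of history and tune z_threshold the same way as any alert threshold: conservatively at first, tightening as you learn the system's normal variance.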
Build monitoring into your deployment process by including observability requirements in your pipeline development standards and deployment checklists.
Remember that observability is not a one-time setup—it's an ongoing practice that evolves with your systems. Start with the fundamentals you've learned here, then gradually add sophistication as your monitoring needs grow.