Building Your First Data Pipeline with Python

Data Engineering · Expert · 25 min read · Mar 29, 2026 · Updated Mar 29, 2026
Table of Contents
  • Prerequisites
  • Understanding Data Pipeline Architecture
  • Building the Pipeline Foundation
  • Advanced Data Ingestion Strategies
  • Implementing Data Quality Validation
  • Advanced Data Transformation Engine
  • Production Data Storage and Output Management

You're managing a growing e-commerce platform, and data is scattered across multiple systems: user activity streams from your web analytics, transaction records from your payment processor, product catalog updates from your inventory management system, and customer service tickets from your support platform. Your executive team needs daily reports, your data science team needs clean datasets for machine learning models, and your engineering team needs real-time alerts when system performance degrades.

This is the reality for most data professionals today—valuable insights locked away in siloed systems, waiting to be unlocked through robust data pipelines. By the end of this lesson, you'll have built a production-ready data pipeline that can ingest, transform, and deliver data across your organization with the reliability and scalability that enterprise systems demand.

What you'll learn:

  • Design and implement a multi-stage data pipeline architecture with proper error handling and monitoring
  • Build resilient data ingestion systems that handle various data sources and formats
  • Implement advanced transformation logic using pandas and custom Python functions
  • Create robust data quality validation and testing frameworks
  • Deploy monitoring, alerting, and recovery mechanisms for production reliability
  • Apply performance optimization techniques for processing large datasets
  • Integrate with external systems and databases using best practices

Prerequisites

This lesson assumes you have solid Python programming experience, familiarity with pandas for data manipulation, basic understanding of SQL databases, and experience with object-oriented programming concepts. You should also be comfortable with command-line operations and have Python 3.8+ installed with pip package management.

Understanding Data Pipeline Architecture

Before diving into code, we need to understand what makes a data pipeline robust at the architectural level. A production data pipeline isn't just a script that moves data from point A to point B—it's a sophisticated system that handles failures gracefully, provides visibility into operations, and scales with your organization's needs.

The pipeline we're building follows the Extract-Transform-Load (ETL) pattern, but with modern enhancements. Our architecture includes five key stages: ingestion (handling multiple data sources), validation (ensuring data quality), transformation (business logic and enrichment), storage (multiple output destinations), and monitoring (operational visibility).

import logging
import json
import pandas as pd
import sqlite3
import requests
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from pathlib import Path
import time
import hashlib
from concurrent.futures import ThreadPoolExecutor, as_completed
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

# Configure logging for production monitoring
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('pipeline.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

The logging configuration above isn't just good practice—it's essential for production operations. We're writing to both file and console, with timestamps and structured formatting that makes debugging easier when things go wrong at 2 AM.
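
One refinement worth knowing about, though it is not part of the pipeline built in this lesson, is log rotation, so that pipeline.log does not grow without bound on a long-running host. A minimal sketch using the standard library's RotatingFileHandler (the size and backup count here are arbitrary choices):

from logging.handlers import RotatingFileHandler

# Optional alternative to the plain FileHandler above: cap the log size and keep a few backups.
# Drop this into the handlers list in place of logging.FileHandler('pipeline.log').
rotating_handler = RotatingFileHandler(
    'pipeline.log',
    maxBytes=10 * 1024 * 1024,  # roll over to a new file after ~10 MB
    backupCount=5               # keep pipeline.log.1 through pipeline.log.5
)
rotating_handler.setFormatter(
    logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
)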

Building the Pipeline Foundation

Our pipeline foundation centers around a configurable, extensible base class that handles common concerns like error handling, retry logic, and state management. This approach allows us to focus on business logic while ensuring operational robustness.

@dataclass
class PipelineConfig:
    """Configuration container for pipeline settings"""
    source_db_path: str = "source_data.db"
    target_db_path: str = "analytics.db"
    api_base_url: str = "https://api.example.com"
    api_key: str = ""
    batch_size: int = 1000
    max_retries: int = 3
    retry_delay: int = 5
    quality_threshold: float = 0.95
    email_alerts: bool = True
    smtp_server: str = "smtp.gmail.com"
    alert_recipients: Optional[List[str]] = None
    
    def __post_init__(self):
        if self.alert_recipients is None:
            self.alert_recipients = []

class PipelineStage:
    """Base class for pipeline stages with error handling and monitoring"""
    
    def __init__(self, name: str, config: PipelineConfig):
        self.name = name
        self.config = config
        self.logger = logging.getLogger(f"pipeline.{name}")
        self.metrics = {
            'records_processed': 0,
            'errors': 0,
            'start_time': None,
            'end_time': None
        }
    
    def execute(self, data: Any = None) -> Any:
        """Execute the pipeline stage with monitoring and error handling"""
        self.metrics['start_time'] = datetime.now()
        self.logger.info(f"Starting {self.name} stage")
        
        try:
            result = self._execute_with_retry(data)
            self.metrics['end_time'] = datetime.now()
            duration = (self.metrics['end_time'] - self.metrics['start_time']).total_seconds()
            
            self.logger.info(
                f"Completed {self.name} stage - "
                f"Processed: {self.metrics['records_processed']}, "
                f"Errors: {self.metrics['errors']}, "
                f"Duration: {duration:.2f}s"
            )
            return result
            
        except Exception as e:
            self.metrics['end_time'] = datetime.now()
            self.logger.error(f"Stage {self.name} failed: {str(e)}")
            if self.config.email_alerts:
                self._send_alert(f"Pipeline stage {self.name} failed", str(e))
            raise
    
    def _execute_with_retry(self, data: Any) -> Any:
        """Execute with retry logic for transient failures"""
        last_exception = None
        
        for attempt in range(self.config.max_retries):
            try:
                return self._execute(data)
            except Exception as e:
                last_exception = e
                self.logger.warning(f"Attempt {attempt + 1} failed: {str(e)}")
                if attempt < self.config.max_retries - 1:
                    time.sleep(self.config.retry_delay * (2 ** attempt))  # Exponential backoff
                    
        raise last_exception
    
    def _execute(self, data: Any) -> Any:
        """Override this method in concrete stages"""
        raise NotImplementedError("Subclasses must implement _execute method")
    
    def _send_alert(self, subject: str, message: str):
        """Send email alert for critical failures"""
        try:
            msg = MIMEMultipart()
            msg['From'] = "data-pipeline@company.com"
            msg['To'] = ", ".join(self.config.alert_recipients)
            msg['Subject'] = f"[DATA PIPELINE ALERT] {subject}"
            
            body = f"""
            Pipeline Alert Details:
            Stage: {self.name}
            Time: {datetime.now().isoformat()}
            Error: {message}
            
            Metrics:
            Records Processed: {self.metrics['records_processed']}
            Errors: {self.metrics['errors']}
            """
            
            msg.attach(MIMEText(body, 'plain'))
            
            # Note: In production, use proper SMTP authentication
            # This is a simplified example
            self.logger.info(f"Alert would be sent: {subject}")
            
        except Exception as e:
            self.logger.error(f"Failed to send alert: {str(e)}")

This foundation provides several critical capabilities. The retry mechanism with exponential backoff handles transient failures gracefully. The metrics collection gives us operational visibility. The alert system ensures that failures don't go unnoticed in production environments.
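
To see how the shared execute() contract pays off, here is a sketch of how the stages could eventually be wired together. The concrete stage classes (DataIngestionStage, DataValidationStage, DataTransformationStage, DataOutputStage) are built in the sections that follow; the chaining function below is an illustration of the pattern, not part of the lesson's code:

def run_pipeline(config: PipelineConfig) -> Dict[str, Any]:
    """Chain the pipeline stages end to end (sketch; stages are defined later in this lesson)."""
    stages = [
        DataIngestionStage(config),       # pulls raw data from database, API, and files
        DataValidationStage(config),      # enforces quality rules before transformation
        DataTransformationStage(config),  # applies business logic and enrichment
        DataOutputStage(config),          # writes analytics-ready datasets to their destinations
    ]

    data = None
    for stage in stages:
        # Each stage receives the previous stage's output and returns its own
        data = stage.execute(data)

    return data


if __name__ == "__main__":
    config = PipelineConfig(api_key="your-api-key-here")
    run_pipeline(config)

Because every stage shares the same error handling, retry, and metrics behavior, the orchestration layer stays this small even as individual stages grow more sophisticated.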

Advanced Data Ingestion Strategies

Data ingestion is where most pipelines fail in production. Systems go down, APIs change, file formats evolve, and network connections drop. Our ingestion stage needs to handle all of these scenarios while maintaining data integrity and providing clear visibility into what's happening.

class DataIngestionStage(PipelineStage):
    """Handles multiple data sources with robust error handling"""
    
    def __init__(self, config: PipelineConfig):
        super().__init__("ingestion", config)
        self.session = requests.Session()
        self.session.headers.update({'Authorization': f'Bearer {config.api_key}'})
        
    def _execute(self, data: Any = None) -> Dict[str, pd.DataFrame]:
        """Ingest data from multiple sources concurrently"""
        ingestion_tasks = [
            ('transactions', self._ingest_database_data),
            ('user_events', self._ingest_api_data),
            ('product_catalog', self._ingest_file_data)
        ]
        
        datasets = {}
        
        # Use thread pool for concurrent ingestion
        with ThreadPoolExecutor(max_workers=3) as executor:
            future_to_source = {
                executor.submit(task_func): source_name 
                for source_name, task_func in ingestion_tasks
            }
            
            for future in as_completed(future_to_source):
                source_name = future_to_source[future]
                try:
                    datasets[source_name] = future.result()
                    self.logger.info(f"Successfully ingested {source_name}")
                except Exception as e:
                    self.logger.error(f"Failed to ingest {source_name}: {str(e)}")
                    self.metrics['errors'] += 1
                    # Continue with other sources even if one fails
        
        total_records = sum(len(df) for df in datasets.values())
        self.metrics['records_processed'] = total_records
        
        if not datasets:
            raise Exception("Failed to ingest data from any source")
            
        return datasets
    
    def _ingest_database_data(self) -> pd.DataFrame:
        """Ingest transaction data from source database"""
        try:
            conn = sqlite3.connect(self.config.source_db_path)
            
            # Use incremental loading based on last processed timestamp
            last_processed = self._get_last_processed_timestamp('transactions')
            
            query = """
            SELECT 
                transaction_id,
                user_id,
                product_id,
                amount,
                currency,
                transaction_date,
                payment_method,
                status
            FROM transactions 
            WHERE transaction_date > ? 
            ORDER BY transaction_date
            """
            
            df = pd.read_sql_query(query, conn, params=[last_processed])
            conn.close()
            
            # Validate critical fields
            if df.empty:
                self.logger.info("No new transactions to process")
                return pd.DataFrame()
            
            # Data type validation and conversion
            df['transaction_date'] = pd.to_datetime(df['transaction_date'])
            df['amount'] = pd.to_numeric(df['amount'], errors='coerce')
            
            # Remove records with invalid amounts
            invalid_amounts = df['amount'].isna().sum()
            if invalid_amounts > 0:
                self.logger.warning(f"Removing {invalid_amounts} records with invalid amounts")
                df = df.dropna(subset=['amount'])
            
            self.logger.info(f"Ingested {len(df)} transaction records")
            return df
            
        except Exception as e:
            self.logger.error(f"Database ingestion failed: {str(e)}")
            raise
    
    def _ingest_api_data(self) -> pd.DataFrame:
        """Ingest user event data from API with pagination"""
        try:
            all_events = []
            page = 1
            last_processed = self._get_last_processed_timestamp('user_events')
            
            while True:
                params = {
                    'page': page,
                    'limit': 1000,
                    'since': last_processed.isoformat() if last_processed else None
                }
                
                response = self.session.get(
                    f"{self.config.api_base_url}/events",
                    params=params,
                    timeout=30
                )
                
                if response.status_code == 404 and page == 1:
                    # API endpoint might not have data yet
                    self.logger.info("No user events available from API")
                    return pd.DataFrame()
                
                response.raise_for_status()
                data = response.json()
                
                events = data.get('events', [])
                if not events:
                    break
                
                all_events.extend(events)
                
                # Check if there are more pages
                if not data.get('has_next_page', False):
                    break
                    
                page += 1
                
                # Rate limiting courtesy
                time.sleep(0.1)
            
            if not all_events:
                self.logger.info("No new user events to process")
                return pd.DataFrame()
            
            df = pd.DataFrame(all_events)
            
            # Normalize nested JSON data
            if 'properties' in df.columns:
                properties_df = pd.json_normalize(df['properties'])
                df = pd.concat([df.drop('properties', axis=1), properties_df], axis=1)
            
            # Convert timestamp strings to datetime
            df['event_time'] = pd.to_datetime(df['event_time'])
            
            self.logger.info(f"Ingested {len(df)} user event records")
            return df
            
        except requests.RequestException as e:
            self.logger.error(f"API ingestion failed: {str(e)}")
            raise
        except Exception as e:
            self.logger.error(f"API data processing failed: {str(e)}")
            raise
    
    def _ingest_file_data(self) -> pd.DataFrame:
        """Ingest product catalog from file with format detection"""
        try:
            # Look for the most recent product catalog file
            data_dir = Path("data/incoming")
            catalog_files = list(data_dir.glob("product_catalog_*.csv"))
            
            if not catalog_files:
                self.logger.warning("No product catalog files found")
                return pd.DataFrame()
            
            # Get the most recent file
            latest_file = max(catalog_files, key=lambda x: x.stat().st_mtime)
            
            # Detect file encoding
            import chardet
            with open(latest_file, 'rb') as f:
                raw_data = f.read(10000)  # Read first 10KB for detection
                encoding_result = chardet.detect(raw_data)
                encoding = encoding_result['encoding']
            
            # Read with detected encoding
            df = pd.read_csv(latest_file, encoding=encoding)
            
            # Validate required columns
            required_columns = ['product_id', 'product_name', 'category', 'price']
            missing_columns = set(required_columns) - set(df.columns)
            
            if missing_columns:
                raise ValueError(f"Missing required columns: {missing_columns}")
            
            # Clean and validate data
            df['price'] = pd.to_numeric(df['price'], errors='coerce')
            df = df.dropna(subset=['product_id', 'price'])
            
            # Remove duplicates, keeping the last occurrence
            df = df.drop_duplicates(subset=['product_id'], keep='last')
            
            self.logger.info(f"Ingested {len(df)} product catalog records from {latest_file.name}")
            return df
            
        except Exception as e:
            self.logger.error(f"File ingestion failed: {str(e)}")
            raise
    
    def _get_last_processed_timestamp(self, source: str) -> Optional[datetime]:
        """Get the last processed timestamp for incremental loading"""
        try:
            conn = sqlite3.connect(self.config.target_db_path)
            cursor = conn.cursor()
            
            cursor.execute("""
                SELECT MAX(last_processed_at) 
                FROM pipeline_state 
                WHERE source_name = ?
            """, (source,))
            
            result = cursor.fetchone()[0]
            conn.close()
            
            if result:
                return datetime.fromisoformat(result)
            else:
                # Default to 30 days ago for initial run
                return datetime.now() - timedelta(days=30)
                
        except Exception:
            # If state table doesn't exist or other error, start from 30 days ago
            return datetime.now() - timedelta(days=30)

The ingestion stage demonstrates several advanced patterns. Concurrent processing speeds up data collection from multiple sources. Incremental loading prevents reprocessing the same data. Robust error handling ensures one failing source doesn't break the entire pipeline. Format detection and encoding handling prevent common file processing failures.
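
Incremental loading only works if the watermark read by _get_last_processed_timestamp is written back after each successful run. The output stage handles this later in the lesson via _update_pipeline_state; as a reference for what that bookkeeping roughly involves, here is a hedged sketch of the pipeline_state table and the update. The table and column names follow the query above; everything else is an assumption:

def update_last_processed_timestamp(target_db_path: str, source: str, processed_at: datetime) -> None:
    """Record the high-water mark for one source so the next run can load incrementally (sketch)."""
    conn = sqlite3.connect(target_db_path)
    try:
        # Matches the table queried by _get_last_processed_timestamp
        conn.execute("""
            CREATE TABLE IF NOT EXISTS pipeline_state (
                source_name TEXT,
                last_processed_at TEXT
            )
        """)
        conn.execute(
            "INSERT INTO pipeline_state (source_name, last_processed_at) VALUES (?, ?)",
            (source, processed_at.isoformat())
        )
        conn.commit()
    finally:
        conn.close()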

Implementing Data Quality Validation

Data quality issues are silent killers in analytics systems. Bad data leads to wrong conclusions, which lead to poor business decisions. Our validation stage implements comprehensive checks that catch quality issues early and provide detailed reporting on data health.

class DataQualityValidator:
    """Comprehensive data quality validation with detailed reporting"""
    
    def __init__(self, config: PipelineConfig):
        self.config = config
        self.logger = logging.getLogger("pipeline.quality_validator")
        self.quality_rules = self._load_quality_rules()
    
    def validate_datasets(self, datasets: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
        """Validate all datasets and return comprehensive quality report"""
        quality_report = {
            'overall_score': 0.0,
            'datasets': {},
            'critical_issues': [],
            'warnings': [],
            'timestamp': datetime.now().isoformat()
        }
        
        total_score = 0.0
        dataset_count = 0
        
        for dataset_name, df in datasets.items():
            if df.empty:
                self.logger.warning(f"Skipping validation for empty dataset: {dataset_name}")
                continue
                
            dataset_report = self._validate_dataset(dataset_name, df)
            quality_report['datasets'][dataset_name] = dataset_report
            
            total_score += dataset_report['quality_score']
            dataset_count += 1
            
            # Collect critical issues
            if dataset_report['critical_issues']:
                quality_report['critical_issues'].extend(
                    [f"{dataset_name}: {issue}" for issue in dataset_report['critical_issues']]
                )
            
            # Collect warnings
            if dataset_report['warnings']:
                quality_report['warnings'].extend(
                    [f"{dataset_name}: {warning}" for warning in dataset_report['warnings']]
                )
        
        if dataset_count > 0:
            quality_report['overall_score'] = total_score / dataset_count
        
        # Log summary
        self.logger.info(
            f"Data quality validation complete - "
            f"Overall score: {quality_report['overall_score']:.2f}, "
            f"Critical issues: {len(quality_report['critical_issues'])}, "
            f"Warnings: {len(quality_report['warnings'])}"
        )
        
        # Check if quality meets threshold
        if quality_report['overall_score'] < self.config.quality_threshold:
            raise Exception(
                f"Data quality score {quality_report['overall_score']:.2f} "
                f"below threshold {self.config.quality_threshold}"
            )
        
        return quality_report
    
    def _validate_dataset(self, dataset_name: str, df: pd.DataFrame) -> Dict[str, Any]:
        """Validate individual dataset with specific rules"""
        report = {
            'quality_score': 1.0,
            'record_count': len(df),
            'column_count': len(df.columns),
            'checks': {},
            'critical_issues': [],
            'warnings': [],
            'column_profiles': {}
        }
        
        rules = self.quality_rules.get(dataset_name, {})
        total_checks = 0
        passed_checks = 0
        
        # Basic structural checks
        total_checks += 1
        if len(df) > 0:
            passed_checks += 1
            report['checks']['non_empty'] = True
        else:
            report['checks']['non_empty'] = False
            report['critical_issues'].append("Dataset is empty")
        
        # Required columns check
        required_columns = rules.get('required_columns', [])
        if required_columns:
            total_checks += 1
            missing_columns = set(required_columns) - set(df.columns)
            if not missing_columns:
                passed_checks += 1
                report['checks']['required_columns'] = True
            else:
                report['checks']['required_columns'] = False
                report['critical_issues'].append(f"Missing required columns: {missing_columns}")
        
        # Data type validation
        expected_types = rules.get('column_types', {})
        for column, expected_type in expected_types.items():
            if column in df.columns:
                total_checks += 1
                actual_type = df[column].dtype
                
                if self._types_compatible(actual_type, expected_type):
                    passed_checks += 1
                    report['checks'][f'{column}_type'] = True
                else:
                    report['checks'][f'{column}_type'] = False
                    report['warnings'].append(
                        f"Column {column} has type {actual_type}, expected {expected_type}"
                    )
        
        # Uniqueness constraints
        unique_columns = rules.get('unique_columns', [])
        for column in unique_columns:
            if column in df.columns:
                total_checks += 1
                duplicates = df[column].duplicated().sum()
                
                if duplicates == 0:
                    passed_checks += 1
                    report['checks'][f'{column}_unique'] = True
                else:
                    report['checks'][f'{column}_unique'] = False
                    report['critical_issues'].append(
                        f"Column {column} has {duplicates} duplicate values"
                    )
        
        # Null value constraints
        non_null_columns = rules.get('non_null_columns', [])
        for column in non_null_columns:
            if column in df.columns:
                total_checks += 1
                null_count = df[column].isna().sum()
                null_percentage = (null_count / len(df)) * 100
                
                if null_count == 0:
                    passed_checks += 1
                    report['checks'][f'{column}_non_null'] = True
                else:
                    report['checks'][f'{column}_non_null'] = False
                    if null_percentage > 5:  # More than 5% nulls is critical
                        report['critical_issues'].append(
                            f"Column {column} has {null_percentage:.1f}% null values"
                        )
                    else:
                        report['warnings'].append(
                            f"Column {column} has {null_percentage:.1f}% null values"
                        )
        
        # Range validations
        ranges = rules.get('value_ranges', {})
        for column, (min_val, max_val) in ranges.items():
            if column in df.columns and df[column].dtype in ['int64', 'float64']:
                total_checks += 1
                out_of_range = ((df[column] < min_val) | (df[column] > max_val)).sum()
                
                if out_of_range == 0:
                    passed_checks += 1
                    report['checks'][f'{column}_range'] = True
                else:
                    report['checks'][f'{column}_range'] = False
                    report['warnings'].append(
                        f"Column {column} has {out_of_range} values outside range [{min_val}, {max_val}]"
                    )
        
        # Pattern validations (e.g., email formats, phone numbers)
        patterns = rules.get('patterns', {})
        for column, pattern in patterns.items():
            if column in df.columns:
                total_checks += 1
                import re
                valid_pattern = df[column].astype(str).str.match(pattern, na=False)
                invalid_count = (~valid_pattern).sum()
                
                if invalid_count == 0:
                    passed_checks += 1
                    report['checks'][f'{column}_pattern'] = True
                else:
                    report['checks'][f'{column}_pattern'] = False
                    report['warnings'].append(
                        f"Column {column} has {invalid_count} values not matching expected pattern"
                    )
        
        # Generate column profiles for monitoring
        for column in df.columns:
            if df[column].dtype in ['int64', 'float64']:
                report['column_profiles'][column] = {
                    'mean': float(df[column].mean()),
                    'median': float(df[column].median()),
                    'std': float(df[column].std()),
                    'min': float(df[column].min()),
                    'max': float(df[column].max()),
                    'null_count': int(df[column].isna().sum())
                }
            else:
                report['column_profiles'][column] = {
                    'unique_count': int(df[column].nunique()),
                    'null_count': int(df[column].isna().sum()),
                    'most_common': df[column].mode().iloc[0] if not df[column].mode().empty else None
                }
        
        # Calculate final quality score
        if total_checks > 0:
            report['quality_score'] = passed_checks / total_checks
        
        return report
    
    def _types_compatible(self, actual_type, expected_type: str) -> bool:
        """Check if data types are compatible"""
        type_mappings = {
            'int': ['int64', 'int32', 'int16', 'int8'],
            'float': ['float64', 'float32', 'int64', 'int32'],
            'string': ['object', 'string'],
            'datetime': ['datetime64[ns]', 'datetime64'],
            'bool': ['bool']
        }
        
        compatible_types = type_mappings.get(expected_type, [expected_type])
        return str(actual_type) in compatible_types
    
    def _load_quality_rules(self) -> Dict[str, Dict]:
        """Load data quality rules from configuration"""
        return {
            'transactions': {
                'required_columns': ['transaction_id', 'user_id', 'amount', 'transaction_date'],
                'column_types': {
                    'transaction_id': 'string',
                    'user_id': 'string',
                    'amount': 'float',
                    'transaction_date': 'datetime'
                },
                'unique_columns': ['transaction_id'],
                'non_null_columns': ['transaction_id', 'user_id', 'amount'],
                'value_ranges': {
                    'amount': (0, 1000000)  # $0 to $1M
                }
            },
            'user_events': {
                'required_columns': ['user_id', 'event_type', 'event_time'],
                'column_types': {
                    'user_id': 'string',
                    'event_type': 'string',
                    'event_time': 'datetime'
                },
                'non_null_columns': ['user_id', 'event_type', 'event_time']
            },
            'product_catalog': {
                'required_columns': ['product_id', 'product_name', 'price'],
                'column_types': {
                    'product_id': 'string',
                    'product_name': 'string',
                    'price': 'float'
                },
                'unique_columns': ['product_id'],
                'non_null_columns': ['product_id', 'product_name', 'price'],
                'value_ranges': {
                    'price': (0, 100000)  # $0 to $100K
                }
            }
        }

class DataValidationStage(PipelineStage):
    """Pipeline stage for data quality validation"""
    
    def __init__(self, config: PipelineConfig):
        super().__init__("validation", config)
        self.validator = DataQualityValidator(config)
    
    def _execute(self, datasets: Dict[str, pd.DataFrame]) -> Dict[str, pd.DataFrame]:
        """Execute data quality validation"""
        quality_report = self.validator.validate_datasets(datasets)
        
        # Store quality report for monitoring
        self._store_quality_report(quality_report)
        
        # Log quality metrics
        self.metrics['records_processed'] = sum(
            report['record_count'] for report in quality_report['datasets'].values()
        )
        
        return datasets  # Pass through the data unchanged
    
    def _store_quality_report(self, quality_report: Dict[str, Any]):
        """Store quality report for trend analysis"""
        try:
            conn = sqlite3.connect(self.config.target_db_path)
            cursor = conn.cursor()
            
            # Create quality reports table if it doesn't exist
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS quality_reports (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT,
                    overall_score REAL,
                    critical_issues INTEGER,
                    warnings INTEGER,
                    report_json TEXT
                )
            """)
            
            cursor.execute("""
                INSERT INTO quality_reports 
                (timestamp, overall_score, critical_issues, warnings, report_json)
                VALUES (?, ?, ?, ?, ?)
            """, (
                quality_report['timestamp'],
                quality_report['overall_score'],
                len(quality_report['critical_issues']),
                len(quality_report['warnings']),
                json.dumps(quality_report)
            ))
            
            conn.commit()
            conn.close()
            
        except Exception as e:
            self.logger.error(f"Failed to store quality report: {str(e)}")

This validation framework provides enterprise-grade data quality monitoring. It checks structural integrity, data types, business rules, and statistical properties. The detailed reporting helps identify data quality trends over time, enabling proactive quality management.
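
Because every run appends a row to the quality_reports table, trend analysis is a straightforward query. Here is a minimal sketch of pulling recent history for a dashboard or a weekly review; the schema matches what _store_quality_report creates above, while the function itself is an illustration rather than part of the lesson's pipeline:

def load_recent_quality_scores(target_db_path: str, days: int = 30) -> pd.DataFrame:
    """Return one row per pipeline run with its overall quality score and issue counts (sketch)."""
    conn = sqlite3.connect(target_db_path)
    try:
        cutoff = (datetime.now() - timedelta(days=days)).isoformat()
        query = """
            SELECT timestamp, overall_score, critical_issues, warnings
            FROM quality_reports
            WHERE timestamp >= ?
            ORDER BY timestamp
        """
        return pd.read_sql_query(query, conn, params=[cutoff])
    finally:
        conn.close()

# Example: flag runs whose score dipped below the configured 0.95 threshold
# history = load_recent_quality_scores("analytics.db")
# print(history[history['overall_score'] < 0.95])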

Advanced Data Transformation Engine

Data transformation is where raw data becomes valuable business insights. Our transformation engine handles complex business logic, enrichment operations, and performance optimizations that scale with data volume.

class DataTransformationEngine:
    """Advanced data transformation with optimized operations"""
    
    def __init__(self, config: PipelineConfig):
        self.config = config
        self.logger = logging.getLogger("pipeline.transformation")
        
    def transform_datasets(self, datasets: Dict[str, pd.DataFrame]) -> Dict[str, pd.DataFrame]:
        """Apply comprehensive transformations to create analytics-ready datasets"""
        transformed = {}
        
        # Create base transformed datasets
        if 'transactions' in datasets:
            transformed['transactions_clean'] = self._transform_transactions(datasets['transactions'])
        
        if 'user_events' in datasets:
            transformed['user_events_clean'] = self._transform_user_events(datasets['user_events'])
        
        if 'product_catalog' in datasets:
            transformed['products_clean'] = self._transform_products(datasets['product_catalog'])
        
        # Create enriched analytical datasets
        if 'transactions_clean' in transformed and 'products_clean' in transformed:
            transformed['sales_analytics'] = self._create_sales_analytics(
                transformed['transactions_clean'], 
                transformed['products_clean']
            )
        
        if 'user_events_clean' in transformed:
            transformed['user_behavior_summary'] = self._create_user_behavior_summary(
                transformed['user_events_clean']
            )
        
        # Create cross-dataset insights
        available_datasets = set(transformed.keys())
        required_for_360 = {'transactions_clean', 'user_events_clean', 'products_clean'}
        
        if required_for_360.issubset(available_datasets):
            transformed['customer_360'] = self._create_customer_360_view(
                transformed['transactions_clean'],
                transformed['user_events_clean'],
                transformed['products_clean']
            )
        
        return transformed
    
    def _transform_transactions(self, df: pd.DataFrame) -> pd.DataFrame:
        """Clean and enrich transaction data"""
        if df.empty:
            return df
        
        df = df.copy()
        
        # Standardize currency amounts to USD
        df['amount_usd'] = df.apply(self._convert_to_usd, axis=1)
        
        # Create derived fields
        df['transaction_hour'] = df['transaction_date'].dt.hour
        df['transaction_day_of_week'] = df['transaction_date'].dt.day_name()
        df['transaction_month'] = df['transaction_date'].dt.to_period('M').astype(str)
        
        # Categorize transaction amounts
        df['amount_category'] = pd.cut(
            df['amount_usd'],
            bins=[0, 10, 50, 100, 500, float('inf')],
            labels=['micro', 'small', 'medium', 'large', 'enterprise']
        )
        
        # Calculate days since first transaction per user
        first_transaction = df.groupby('user_id')['transaction_date'].min().reset_index()
        first_transaction.columns = ['user_id', 'first_transaction_date']
        
        df = df.merge(first_transaction, on='user_id', how='left')
        df['days_since_first_transaction'] = (
            df['transaction_date'] - df['first_transaction_date']
        ).dt.days
        
        # Add running totals and counts per user
        df = df.sort_values(['user_id', 'transaction_date'])
        df['user_transaction_count'] = df.groupby('user_id').cumcount() + 1
        df['user_running_total'] = df.groupby('user_id')['amount_usd'].cumsum()
        
        # Flag potential fraudulent transactions
        df['is_potentially_fraudulent'] = self._flag_potential_fraud(df)
        
        # Remove sensitive data that shouldn't be in analytics
        columns_to_keep = [
            'transaction_id', 'user_id', 'product_id', 'amount_usd', 
            'transaction_date', 'payment_method', 'status',
            'transaction_hour', 'transaction_day_of_week', 'transaction_month',
            'amount_category', 'days_since_first_transaction',
            'user_transaction_count', 'user_running_total', 'is_potentially_fraudulent'
        ]
        
        df = df[columns_to_keep]
        
        self.logger.info(f"Transformed {len(df)} transaction records")
        return df
    
    def _transform_user_events(self, df: pd.DataFrame) -> pd.DataFrame:
        """Clean and structure user event data"""
        if df.empty:
            return df
            
        df = df.copy()
        
        # Standardize event types
        event_mapping = {
            'page_view': 'pageview',
            'page-view': 'pageview',
            'product_view': 'product_view',
            'add_to_cart': 'add_to_cart',
            'purchase': 'purchase',
            'sign_up': 'signup',
            'login': 'login',
            'logout': 'logout'
        }
        
        df['event_type'] = df['event_type'].str.lower().replace(event_mapping)
        
        # Extract session information
        df = df.sort_values(['user_id', 'event_time'])
        
        # Create session IDs based on 30-minute inactivity gaps
        df['time_diff'] = df.groupby('user_id')['event_time'].diff()
        df['new_session'] = (df['time_diff'].isna() | (df['time_diff'] > pd.Timedelta(minutes=30)))
        df['session_id'] = df.groupby('user_id')['new_session'].cumsum()
        df['session_id'] = df['user_id'].astype(str) + '_session_' + df['session_id'].astype(str)
        
        # Add session sequence numbers
        df['event_sequence'] = df.groupby('session_id').cumcount() + 1
        
        # Extract time-based features
        df['event_hour'] = df['event_time'].dt.hour
        df['event_day_of_week'] = df['event_time'].dt.day_name()
        
        # Clean up columns
        columns_to_keep = [
            'user_id', 'event_type', 'event_time', 'session_id', 
            'event_sequence', 'event_hour', 'event_day_of_week'
        ]
        
        # Add any product_id if it exists in the events
        if 'product_id' in df.columns:
            columns_to_keep.append('product_id')
        
        df = df[columns_to_keep]
        
        self.logger.info(f"Transformed {len(df)} user event records into {df['session_id'].nunique()} sessions")
        return df
    
    def _transform_products(self, df: pd.DataFrame) -> pd.DataFrame:
        """Clean and enrich product catalog data"""
        if df.empty:
            return df
            
        df = df.copy()
        
        # Standardize category names
        df['category'] = df['category'].str.lower().str.strip()
        
        # Create price tiers
        df['price_tier'] = pd.cut(
            df['price'],
            bins=[0, 20, 50, 100, 500, float('inf')],
            labels=['budget', 'economy', 'standard', 'premium', 'luxury']
        )
        
        # Extract features from product names (simplified NLP)
        df['name_word_count'] = df['product_name'].str.split().str.len()
        df['name_length'] = df['product_name'].str.len()
        
        # Create product slugs for URLs
        df['product_slug'] = (
            df['product_name']
            .str.lower()
            .str.replace(r'[^a-z0-9\s]', '', regex=True)
            .str.replace(r'\s+', '-', regex=True)
        )
        
        self.logger.info(f"Transformed {len(df)} product records")
        return df
    
    def _create_sales_analytics(self, transactions: pd.DataFrame, products: pd.DataFrame) -> pd.DataFrame:
        """Create enriched sales analytics dataset"""
        if transactions.empty or products.empty:
            return pd.DataFrame()
        
        # Join transactions with product information
        sales = transactions.merge(
            products[['product_id', 'product_name', 'category', 'price', 'price_tier']], 
            on='product_id', 
            how='left'
        )
        
        # Calculate profit margins (simplified - assuming cost is 60% of price)
        sales['estimated_cost'] = sales['price'] * 0.6
        sales['estimated_profit'] = sales['amount_usd'] - sales['estimated_cost']
        sales['profit_margin'] = sales['estimated_profit'] / sales['amount_usd']
        
        # Add discount information
        sales['discount_amount'] = sales['price'] - sales['amount_usd']
        sales['discount_percentage'] = (sales['discount_amount'] / sales['price']) * 100
        sales['is_discounted'] = sales['discount_percentage'] > 0
        
        # Create time-based aggregations
        sales['revenue'] = sales['amount_usd']
        
        self.logger.info(f"Created sales analytics with {len(sales)} enriched records")
        return sales
    
    def _create_user_behavior_summary(self, events: pd.DataFrame) -> pd.DataFrame:
        """Create user behavior summary for analytics"""
        if events.empty:
            return pd.DataFrame()
        
        # Session-level metrics
        session_metrics = events.groupby(['user_id', 'session_id']).agg({
            'event_time': ['min', 'max', 'count'],
            'event_sequence': 'max'
        }).round(2)
        
        session_metrics.columns = ['session_start', 'session_end', 'total_events', 'max_sequence']
        session_metrics['session_duration_minutes'] = (
            (session_metrics['session_end'] - session_metrics['session_start']).dt.total_seconds() / 60
        )
        session_metrics.reset_index(inplace=True)
        
        # User-level aggregations
        user_summary = session_metrics.groupby('user_id').agg({
            'session_id': 'count',
            'total_events': ['sum', 'mean'],
            'session_duration_minutes': ['sum', 'mean']
        }).round(2)
        
        user_summary.columns = [
            'total_sessions', 'total_events', 'avg_events_per_session',
            'total_session_time', 'avg_session_duration'
        ]
        
        # Add event type distributions
        event_types = events.groupby(['user_id', 'event_type']).size().unstack(fill_value=0)
        
        user_summary = user_summary.join(event_types, how='left')
        user_summary = user_summary.fillna(0)
        
        user_summary.reset_index(inplace=True)
        
        self.logger.info(f"Created behavior summary for {len(user_summary)} users")
        return user_summary
    
    def _create_customer_360_view(self, transactions: pd.DataFrame, events: pd.DataFrame, products: pd.DataFrame) -> pd.DataFrame:
        """Create comprehensive customer 360-degree view"""
        if transactions.empty:
            return pd.DataFrame()
        
        # Transaction aggregations per user
        transaction_summary = transactions.groupby('user_id').agg({
            'transaction_id': 'count',
            'amount_usd': ['sum', 'mean', 'std'],
            'transaction_date': ['min', 'max'],
            'days_since_first_transaction': 'max',
            'is_potentially_fraudulent': 'sum'
        }).round(2)
        
        transaction_summary.columns = [
            'total_transactions', 'total_spent', 'avg_transaction_amount', 'transaction_amount_std',
            'first_transaction_date', 'last_transaction_date', 'customer_lifetime_days', 'fraud_flags'
        ]
        
        # Calculate customer lifetime value and frequency
        transaction_summary['avg_days_between_transactions'] = (
            transaction_summary['customer_lifetime_days'] / transaction_summary['total_transactions']
        )
        transaction_summary['clv_estimate'] = (
            transaction_summary['avg_transaction_amount'] * 
            (365 / transaction_summary['avg_days_between_transactions'].clip(lower=1))
        )
        
        # Behavioral data (if available)
        if not events.empty:
            behavior_summary = events.groupby('user_id').agg({
                'session_id': 'nunique',
                'event_type': 'count'
            })
            behavior_summary.columns = ['total_sessions', 'total_events']
            
            transaction_summary = transaction_summary.join(behavior_summary, how='left')
        
        # Customer segmentation based on RFM analysis
        transaction_summary['recency_days'] = (
            datetime.now() - pd.to_datetime(transaction_summary['last_transaction_date'])
        ).dt.days
        
        # Create RFM segments
        transaction_summary['recency_score'] = pd.qcut(
            transaction_summary['recency_days'].rank(method='first'), 
            5, labels=[5,4,3,2,1]
        ).astype(int)
        
        transaction_summary['frequency_score'] = pd.qcut(
            transaction_summary['total_transactions'].rank(method='first'), 
            5, labels=[1,2,3,4,5]
        ).astype(int)
        
        transaction_summary['monetary_score'] = pd.qcut(
            transaction_summary['total_spent'].rank(method='first'), 
            5, labels=[1,2,3,4,5]
        ).astype(int)
        
        transaction_summary['rfm_segment'] = (
            transaction_summary['recency_score'].astype(str) + 
            transaction_summary['frequency_score'].astype(str) + 
            transaction_summary['monetary_score'].astype(str)
        )
        
        # Add customer category labels
        def categorize_customer(row):
            r, f, m = row['recency_score'], row['frequency_score'], row['monetary_score']
            if r >= 4 and f >= 4 and m >= 4:
                return 'Champions'
            elif r >= 3 and f >= 3 and m >= 3:
                return 'Loyal Customers'
            elif r >= 4 and f >= 2 and m >= 2:
                return 'Potential Loyalists'
            elif r >= 4 and f <= 2 and m <= 2:
                return 'New Customers'
            elif r <= 2 and f >= 3 and m >= 3:
                return 'At Risk'
            elif r <= 2 and f <= 2 and m >= 3:
                return 'Cannot Lose Them'
            else:
                return 'Others'
        
        transaction_summary['customer_category'] = transaction_summary.apply(categorize_customer, axis=1)
        
        transaction_summary.reset_index(inplace=True)
        
        self.logger.info(f"Created 360-degree view for {len(transaction_summary)} customers")
        return transaction_summary
    
    def _convert_to_usd(self, row) -> float:
        """Convert currency amounts to USD (simplified conversion)"""
        # In production, you'd use real-time exchange rates
        exchange_rates = {
            'USD': 1.0,
            'EUR': 1.1,
            'GBP': 1.25,
            'JPY': 0.0091,
            'CAD': 0.78
        }
        
        currency = row.get('currency', 'USD')
        amount = row.get('amount', 0)
        
        return amount * exchange_rates.get(currency, 1.0)
    
    def _flag_potential_fraud(self, df: pd.DataFrame) -> pd.Series:
        """Simple fraud detection flags"""
        fraud_flags = pd.Series(False, index=df.index)
        
        # Flag unusually large transactions (> 99th percentile)
        if len(df) > 100:  # Need sufficient data for percentiles
            high_amount_threshold = df['amount_usd'].quantile(0.99)
            fraud_flags |= (df['amount_usd'] > high_amount_threshold)
        
        # Flag multiple transactions in short time windows
        df_sorted = df.sort_values(['user_id', 'transaction_date'])
        time_diff = df_sorted.groupby('user_id')['transaction_date'].diff()
        rapid_transactions = time_diff < pd.Timedelta(minutes=1)
        fraud_flags |= rapid_transactions
        
        return fraud_flags

class DataTransformationStage(PipelineStage):
    """Pipeline stage for data transformation"""
    
    def __init__(self, config: PipelineConfig):
        super().__init__("transformation", config)
        self.transformation_engine = DataTransformationEngine(config)
    
    def _execute(self, datasets: Dict[str, pd.DataFrame]) -> Dict[str, pd.DataFrame]:
        """Execute data transformations"""
        transformed_datasets = self.transformation_engine.transform_datasets(datasets)
        
        # Calculate metrics
        self.metrics['records_processed'] = sum(
            len(df) for df in transformed_datasets.values()
        )
        
        return transformed_datasets

The transformation engine demonstrates production-grade data processing patterns. It handles currency conversion, session analysis, customer segmentation, and fraud detection. The modular design makes it easy to add new transformations as business requirements evolve.
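
As a concrete illustration of that modularity, a new derived dataset is usually just one more private method plus a hook in transform_datasets. The sketch below adds a hypothetical daily revenue rollup on top of the sales_analytics dataset; the method name and aggregation choices are assumptions, not part of the lesson's code:

    # Hypothetical addition to DataTransformationEngine
    def _create_daily_revenue_summary(self, sales: pd.DataFrame) -> pd.DataFrame:
        """Aggregate enriched sales into one row per calendar day (sketch of an extension)."""
        if sales.empty:
            return pd.DataFrame()

        daily = (
            sales
            .assign(transaction_day=sales['transaction_date'].dt.date)
            .groupby('transaction_day')
            .agg(
                total_revenue=('revenue', 'sum'),
                order_count=('transaction_id', 'count'),
                avg_order_value=('revenue', 'mean')
            )
            .reset_index()
        )
        return daily

    # Hooked into transform_datasets with one extra block:
    # if 'sales_analytics' in transformed:
    #     transformed['daily_revenue_summary'] = self._create_daily_revenue_summary(
    #         transformed['sales_analytics']
    #     )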

Production Data Storage and Output Management

Data storage in production pipelines requires careful consideration of performance, reliability, and accessibility. Our output stage supports multiple storage formats and destinations while maintaining data lineage and providing rollback capabilities.

class DataOutputStage(PipelineStage):
    """Handles multiple output destinations with transaction safety"""
    
    def __init__(self, config: PipelineConfig):
        super().__init__("output", config)
        self.output_configs = self._load_output_configurations()
        
    def _execute(self, datasets: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
        """Execute data output to multiple destinations"""
        output_results = {}
        
        for dataset_name, df in datasets.items():
            if df.empty:
                self.logger.info(f"Skipping empty dataset: {dataset_name}")
                continue
            
            dataset_outputs = self.output_configs.get(dataset_name, [])
            output_results[dataset_name] = {}
            
            for output_config in dataset_outputs:
                try:
                    result = self._write_dataset(df, output_config)
                    output_results[dataset_name][output_config['name']] = result
                    self.logger.info(f"Successfully wrote {dataset_name} to {output_config['name']}")
                    
                except Exception as e:
                    self.logger.error(f"Failed to write {dataset_name} to {output_config['name']}: {str(e)}")
                    self.metrics['errors'] += 1
                    
                    # Continue with other outputs even if one fails
                    output_results[dataset_name][output_config['name']] = {
                        'success': False, 
                        'error': str(e)
                    }
        
        # Update pipeline state tracking
        self._update_pipeline_state(datasets)
        
        # Calculate metrics
        self.metrics['records_processed'] = sum(len(df) for df in datasets.values())
        
        return output_results
    
    def _write_dataset(self, df: pd.DataFrame, output_config: Dict[str, Any]) -> Dict[str, Any]:
        """Write dataset to specified output destination"""
        output_type = output_config['type']
        
        if output_type == 'sqlite':
            return self._write_to_sqlite(df, output_config)
        elif output_type == 'csv':
            return self._write_to_csv(df, output_config)
        elif output_type == 'parquet':
            return self._write_to_parquet(df, output_config)
        elif output_type == 'json':
            return self._write_to_json(df, output_config)
        elif output_type == 'api':
            return self._write_to_api(df, output_config)
        else:
            raise ValueError(f"Unsupported output type: {output_type}")
    
    def _write_to_sqlite(self, df: pd.DataFrame, config: Dict[str, Any]) -> Dict[str, Any]:
        """Write to SQLite database with transaction safety"""
        db_path = config['path']
        table_name = config['table']
        write_mode = config.get('mode', 'replace')  # replace, append, or upsert
        
        # Create backup of existing data if replacing
        if write_mode == 'replace':
            self._backup_sqlite_table(db_path, table_name)
        
        try:
            conn = sqlite3.connect(db_path)
            
            if write_mode == 'upsert':
                # For upsert, we need to handle conflicts
                self._upsert_to_sqlite(df, conn, table_name, config.get('key_columns', []))
            else:
                # Standard pandas to_sql
                df.to_sql(
                    table_name, 
                    conn, 
                    if_exists='replace' if write_mode == 'replace' else 'append',
                    index=False,
                    method='multi',  # Faster bulk inserts
                    chunksize=self.config.batch_size
                )
            
            # Create indexes for performance
            indexes = config.get('indexes', [])
            for index_config in indexes:
                self._create_index(conn, table_name, index_config)
            
            conn.close()
            
            return {
                'success': True,
                'records_written': len(df),
                'destination': f"{db_path}:{table_name}"
            }
            
        except Exception as e:
            # Restore backup if replace operation failed
            if write_mode == 'replace':
                self._restore_sqlite_backup(db_path, table_name)
            raise e
    
    def _write_to_parquet(self, df: pd.DataFrame, config: Dict[str, Any]) -> Dict[str, Any]:
        """Write to Parquet format with partitioning support"""
        file_path = config['path']
        partition_columns = config.get('partition_columns', [])
        
        # Ensure directory exists
        Path(file_path).parent.mkdir(parents=True, exist_ok=True)
        
        if partition_columns:
            # Write with partitioning
            df.to_parquet(
                file_path,
                partition_cols=partition_columns,
                engine='pyarrow',
                compression='snappy'
            )
        else:
            # Single file output
            df.to_parquet(
                file_path,
                engine='pyarrow',
                compression='snappy',
                index=False
            )
        
        # Calculate file size for monitoring
        if Path(file_path).is_file():
            file_size = Path(file_path).stat().st_size
        else:
            # For partitioned datasets, calculate total size
            file_size = sum(f.stat().st_size for f in Path(file_path).rglob('*.parquet'))
        
        return {
            'success': True,
            'records_written': len(df),
            'file_size_bytes': file_size,
            'destination': file_path
        }
    
    def _write_to_csv(self, df: pd.DataFrame, config: Dict[str, Any]) -> Dict[str, Any]:
        """Write to CSV with proper encoding and formatting"""
        file_path = config['path']
        
        # Add timestamp to filename if configured
        if config.get('timestamped', False):
            path_obj = Path(file_path)
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            file_path = path_obj.parent / f"{path_obj.stem}_{timestamp}{path_obj.suffix}"
        
        # Ensure directory exists
        Path(file_path).parent.mkdir(parents=True, exist_ok=True)
        
        df.to_csv(
            file_path,
            index=False,
            encoding='utf-8',
            date_format='%Y-%m-%d %H:%M:%S'
        )
        
        file_size = Path(file_path).stat().st_size
        
        return {
            'success': True,
            'records_written': len(df),
            'file_size_bytes': file_size,
            'destination': str(file_path)
        }
    
    def _write_to_json(self, df: pd.DataFrame, config: Dict[str, Any]) -> Dict[str, Any]:
        """Write to JSON format with proper serialization"""
        file_path = config['path']
        
        # Ensure directory exists
        Path(file_path).parent.mkdir(parents=True, exist_ok=True)
        
        # Convert timestamps to ISO format for JSON serialization
        df_json = df.copy()
        for col in df_json.columns:
            if df_json[col].dtype == 'datetime64[ns]':
                df_json[col] = df_json[col].dt.strftime('%Y-%m-%dT%H:%M:%S')
        
        # Write as JSON lines format for better streaming support
        if config.get('format') == 'jsonl':
            df_json.to_json(file_path, orient='records', lines=True, date_format='iso')
        else:
            df_json.to_json(file_path, orient='records', indent=2, date_format='iso')
        
        file_size = Path(file_path).stat().st_size
        
        return {
            'success': True,
            'records_written': len(df),
            'file_size_bytes': file_size,
            'destination': file_path
        }
    
    def _write_to_api(self, df: pd.DataFrame, config: Dict[str, Any]) -> Dict[str, Any]:
        """Write to API endpoint with batching and retry logic"""
        endpoint_url = config['url']
        batch_size = config.get('batch_size', 100)
        headers = config.get('headers', {})
        
        total_sent = 0
        session = requests.Session()
        session.headers.update(headers)
        
        # Process in batches
        for i in range(0, len(df), batch_size):
            batch_df = df.iloc[i:i+batch_size]
            
            # Convert batch to JSON
            batch_data = batch_df.to_dict(orient='records')
            
            # Send batch with retry logic
            for attempt in range(self.config.max_retries):
                try:
                    response = session.post(
                        endpoint_url,
                        json={'data': batch_data, 'batch_id': i // batch_size},
                        timeout=30
                    )
                    response.raise_for_status()
                    total_sent += len(batch_data)
                    break
                    
                except requests.RequestException as e:
                    if attempt == self.config.max_retries - 1:
                        raise e
                    time.sleep(self.config.retry_delay * (2 ** attempt))
        
        return {
            'success': True,
            'records_written': total_sent,
            'destination': endpoint_url
        }
    
    def _upsert_to_sqlite(self, df: pd.DataFrame, conn: sqlite3.Connection, table_name: str, key_columns: List[str]):
        """Perform upsert operation (insert or update) to SQLite"""
        if not key_columns:
            # If no key columns specified, just append
            df.to_sql(table_name, conn, if_exists='append', index=False)
            return
        
        # Create temporary table
        temp_table = f"{table_name}_temp"
        df.to_sql(temp_table, conn, if_exists='replace', index=False)
        
        # Get column names
        columns = df.columns.tolist()
        key_columns_str = ', '.join(key_columns)
        
        # Build the upsert query. ON CONFLICT only works if the key columns are
        # covered by a PRIMARY KEY or UNIQUE index on the target table, and the
        # SELECT needs a WHERE clause (even a trivial 'WHERE true') to avoid a
        # parsing ambiguity in SQLite.
        set_clauses = [f"{col} = excluded.{col}" for col in columns if col not in key_columns]
        
        if set_clauses:
            conflict_action = f"DO UPDATE SET {', '.join(set_clauses)}"
        else:
            # Every column is part of the key, so there is nothing to update
            conflict_action = "DO NOTHING"
        
        upsert_query = f"""
        INSERT INTO {table_name} ({', '.join(columns)})
        SELECT {', '.join(columns)} FROM {temp_table} WHERE true
        ON CONFLICT({key_columns_str}) {conflict_action}
        """
        
        conn.execute(upsert_query)
        conn.execute(f"DROP TABLE {temp_table}")
        conn.commit()
    
    def _backup_sqlite_table(self, db_path: str, table_name: str):
        """Create backup of SQLite table before replacement"""
        try:
            conn = sqlite3.connect(db_path)
            cursor = conn.cursor()
            
            # Check if table exists
            cursor.execute("""
                SELECT name FROM sqlite_master 
                WHERE type='table' AND name=?
            """, (table_name,))
            
            if cursor.fetchone():
                backup_table = f"{table_name}_backup_{int(time.time())}"
                cursor.execute(f"""
                    CREATE TABLE {backup_table} AS 
                    SELECT * FROM {table_name}
                """)
                self.logger.info(f"Created backup table: {backup_table}")
            
            conn.close()
            
        except Exception as e:
            self.logger.warning(f"Failed to create backup: {str(e)}")
    
    def _restore_sqlite_backup(self, db_path: str, table_name: str):
        """Restore SQLite table from most recent backup"""
        try:
            conn = sqlite3.connect(db_path)
            cursor = conn.cursor()
            
            # Find most recent backup
            cursor.execute("""
                SELECT name FROM sqlite_master 
                WHERE type='table' AND name LIKE ?
                ORDER BY name DESC
                LIMIT 1
            """, (f"{table_name}_backup_%",))
            
            backup_table = cursor.fetchone()
            if backup_table:
                backup_name = backup_table[0]
                cursor.execute(f"DROP TABLE IF EXISTS {table_name}")
                cursor.execute(f"ALTER TABLE {backup_name} RENAME TO {table_name}")
                self.logger.info(f"Restored table {table_name} from {backup_name}")
            
            conn.close()
            
        except Exception as e:
            self.logger.error(f"Failed to restore backup: {str(e)}")
    
    def _create_index(self, conn: sqlite3.Connection, table_name: str, index_config: Dict[str, Any]):
        """Create database index for performance"""
        try:
            index_name = index_config['name']
            columns = index_config['columns']
            unique = index_config.get('unique', False)
            
            unique_clause = "UNIQUE " if unique else ""
            columns_str = ', '.join(columns)
            
            create_index_sql = f"""
            CREATE {unique_clause}INDEX IF NOT EXISTS {index_name} 
            ON {table_name} ({columns_str})
            """
            
            conn.execute(create_index_sql)
            
        except Exception as e:
            self.logger.warning(f"Failed to create index: {str(e)}")
    
    def _update_pipeline_state(self, datasets: Dict[str, pd.DataFrame]):
        """Update pipeline state for incremental processing"""
        try:
            conn = sqlite3.connect(self.config.target_db_path)
            cursor = conn.cursor()
            
            # Create state table if it doesn't exist
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS pipeline_state (
                    source_name TEXT PRIMARY KEY,
                    last_processed_at TEXT,
                    records_processed INTEGER,
                    updated_at TEXT
                )
            """)
            
            current_time = datetime.now().isoformat()
            
            for dataset_name, df in datasets.items():
                if df.empty:
                    continue
                
                # Get the latest timestamp from the dataset
                timestamp_columns = [col for col in df.columns if 'date' in col.lower() or 'time' in col.lower()]
                
                if timestamp_columns:
                    latest_timestamp = df[timestamp_columns[0]].max()
                    if pd.isna(latest_timestamp):
                        latest_timestamp = current_time
                    elif hasattr(latest_timestamp, 'isoformat'):
                        latest_timestamp = latest_timestamp.isoformat()
                    else:
                        # Column matched by name but isn't datetime-typed; store its string form
                        latest_timestamp = str(latest_timestamp)
                else:
                    latest_timestamp = current_time
                
                cursor.execute("""
                    INSERT OR REPLACE INTO pipeline_state 
                    (source_name, last_processed_at, records_processed, updated_at)
                    VALUES (?, ?, ?, ?)
                """, (dataset_name, latest_timestamp, len(df), current_time))
            
            conn.commit()
            conn.close()
            
        except Exception as e:
            self.logger.error(f"Failed to update pipeline state: {str(e)}")
    
    def _load_output_configurations(self) -> Dict[str, List[Dict[str, Any]]]:
        """Load output configurations for each dataset"""
        return {
            'transactions_clean': [
                {
                    'name': 'analytics_db',
                    'type': 'sqlite',
                    'path': self.config.target_db_path,
                    'table': 'transactions',
                    'mode': 'replace',
                    'indexes': [
                        {'name': 'idx_user_id', 'columns': ['user_id']},
                        {'name': 'idx_transaction_date', 'columns': ['transaction_date']}
                    ]
                },
                {
                    'name': 'parquet_archive',
                    'type': 'parquet',
                    # Note: the original listing is truncated at this point; the path and
                    # partition column below are illustrative assumptions
                    'path': 'output/archive/transactions.parquet',
                    'partition_columns': ['transaction_date']
                }
            ]
        }
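
The upsert path above hinges on SQLite's ON CONFLICT clause, which only fires when the key columns are covered by a PRIMARY KEY or UNIQUE index on the target table; if no matching unique index exists, SQLite rejects the statement outright. The sketch below shows the same staging-table pattern in isolation, outside the pipeline class. The transactions table, its columns, and the sample rows are illustrative assumptions, not part of the pipeline itself.

import sqlite3
import pandas as pd

# Illustrative target table: the name, columns, and seed row are assumptions
conn = sqlite3.connect(':memory:')
conn.execute("""
    CREATE TABLE transactions (
        transaction_id TEXT PRIMARY KEY,
        user_id TEXT,
        amount REAL
    )
""")
conn.execute("INSERT INTO transactions VALUES ('t1', 'u1', 10.0)")
conn.commit()

# New batch: 't1' already exists (will be updated), 't2' is new (will be inserted)
batch = pd.DataFrame([
    {'transaction_id': 't1', 'user_id': 'u1', 'amount': 12.5},
    {'transaction_id': 't2', 'user_id': 'u2', 'amount': 30.0},
])

# Stage the batch, then upsert into the target table. The SELECT needs a
# WHERE clause (even a trivial 'WHERE true') to avoid SQLite's parsing
# ambiguity when combined with ON CONFLICT.
batch.to_sql('transactions_temp', conn, if_exists='replace', index=False)
conn.execute("""
    INSERT INTO transactions (transaction_id, user_id, amount)
    SELECT transaction_id, user_id, amount FROM transactions_temp WHERE true
    ON CONFLICT(transaction_id) DO UPDATE SET
        user_id = excluded.user_id,
        amount = excluded.amount
""")
conn.execute("DROP TABLE transactions_temp")
conn.commit()

print(pd.read_sql_query("SELECT * FROM transactions ORDER BY transaction_id", conn))
conn.close()

When a table is rebuilt in replace mode, confirm that the key columns are still covered by a primary key or by one of the unique indexes created through _create_index; otherwise the ON CONFLICT target has nothing to match against.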