Wicked Smart Data
Incremental Loading Patterns: Timestamps, CDC, and Watermarks

Data Engineering · ⚡ Practitioner · 23 min read · Apr 5, 2026 (Updated Apr 5, 2026)
Table of Contents
  • Prerequisites
  • The Timestamp Trap: Beyond Simple Last Modified
  • Problem 1: Clock Skew and Late Arrivals
  • Problem 2: The Boundary Precision Problem
  • Problem 3: Updates vs. Deletes
  • Change Data Capture: Event-Driven Precision
  • CDC Implementation Patterns
  • Handling CDC Complexity: Transactions and Ordering
  • CDC Deduplication Strategies
  • Watermarks: Balancing Completeness and Latency
  • Understanding Watermark Semantics
  • Implementing Sliding Window Aggregations with Watermarks
  • Late Data Handling Strategies
  • Hands-On Exercise: Building a Robust Order Analytics Pipeline
  • Setting Up the Exercise Environment
  • Implementing the Unified Pipeline

You're staring at a data pipeline that worked perfectly for months, until it didn't. Your customer analytics dashboard is showing stale data from three days ago, your CDC stream is producing duplicates, and the timestamp-based incremental load that seemed so elegant is now missing records. Sound familiar? You're experiencing the sharp edge of incremental loading at scale.

The promise of incremental loading is seductive: only process what's changed, save compute costs, reduce latency, and keep your data fresh. But as data volumes grow and systems become more distributed, the simple "last modified timestamp" approach starts to crack under pressure. Network delays create gaps in your watermarks, clock skew between systems causes phantom duplicates, and suddenly your "real-time" pipeline is neither real nor reliable.

This isn't just about picking the right tool; it's about understanding the fundamental trade-offs between consistency, performance, and operational complexity. By the end of this lesson, you'll understand not just how to implement these patterns, but when each approach breaks down and how to recover gracefully.

What you'll learn:

  • Design timestamp-based incremental loading that handles clock skew and late-arriving data
  • Implement Change Data Capture (CDC) patterns with proper duplicate handling and ordering guarantees
  • Build watermark systems that balance latency with completeness
  • Troubleshoot common failure modes in each pattern
  • Choose the right approach based on your consistency and latency requirements

Prerequisites

You should be comfortable with:

  • SQL and basic data warehousing concepts
  • Stream processing fundamentals (Kafka, event ordering)
  • Understanding of ACID properties and eventual consistency
  • Basic Python or SQL for data pipeline scripting

The Timestamp Trap: Beyond Simple Last Modified

Let's start with what seems like the simplest approach: timestamp-based incremental loading. You've probably seen code like this:

-- The deceptively simple approach
SELECT * FROM customer_orders 
WHERE updated_at > '2024-01-15 14:30:00'
  AND updated_at <= '2024-01-15 15:30:00';

This works beautifully in development and early production. But in distributed systems with real load, this pattern has three critical failure modes that will eventually bite you.

Problem 1: Clock Skew and Late Arrivals

Your application servers aren't perfectly synchronized. Server A might be 30 seconds ahead of Server B, and your database server might be somewhere in between. When you query for records with updated_at > last_processed_timestamp, you're making a dangerous assumption: that timestamps reflect the actual order of events.

Here's what really happens:

# What you think is happening:
# 14:30:00 - Record A created
# 14:30:05 - Record B created  
# 14:30:10 - Your query runs, captures both

# What actually happens:
# 14:30:00 - Record A created (Server clock: 14:29:45)
# 14:30:05 - Record B created (Server clock: 14:30:20)
# 14:30:10 - Your query runs
# 14:30:15 - Record A finally commits to DB (due to network delay)
# Result: You miss Record A entirely
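
The failure mode sketched in those comments can be reproduced in a few lines. This is a toy simulation (the row data and the `naive_load` helper are invented for illustration), which separates the timestamp an app server stamped on a row from the moment the row actually became visible in the database:

```python
import datetime as dt

# Invented rows mirroring the scenario above: "updated_at" is the app-server
# clock stamp; "committed" is when the row became visible in the database.
rows = [
    {"id": "A", "updated_at": dt.datetime(2024, 1, 15, 14, 29, 45),
     "committed": dt.datetime(2024, 1, 15, 14, 30, 15)},
    {"id": "B", "updated_at": dt.datetime(2024, 1, 15, 14, 30, 20),
     "committed": dt.datetime(2024, 1, 15, 14, 30, 5)},
]

def naive_load(window_start, window_end, query_time):
    """Rows visible at query_time whose stamp falls in (window_start, window_end]."""
    return [r["id"] for r in rows
            if r["committed"] <= query_time
            and window_start < r["updated_at"] <= window_end]

run1 = naive_load(dt.datetime(2024, 1, 15, 14, 25),
                  dt.datetime(2024, 1, 15, 14, 30, 10),
                  dt.datetime(2024, 1, 15, 14, 30, 10))
# run1 == []: A has not committed yet, and B's stamp is past the window end

run2 = naive_load(dt.datetime(2024, 1, 15, 14, 30, 10),
                  dt.datetime(2024, 1, 15, 14, 35, 10),
                  dt.datetime(2024, 1, 15, 14, 35, 10))
# run2 == ["B"]: the window has moved past A's stamp, so A is lost for good
```

Record A is never captured: it wasn't visible during the first window, and its stamp no longer qualifies for the second.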

The solution is to build in a lag window and implement overlap detection:

import datetime as dt
from typing import Optional

class TimestampBasedLoader:
    def __init__(self, lag_seconds: int = 300, overlap_seconds: int = 60):
        self.lag_seconds = lag_seconds  # 5-minute safety window
        self.overlap_seconds = overlap_seconds  # 1-minute overlap for safety
        
    def get_next_window(self, last_processed: Optional[dt.datetime] = None) -> tuple:
        """
        Calculate the next processing window with built-in lag and overlap.
        """
        now = dt.datetime.utcnow()
        # Never process data newer than lag_seconds ago
        safe_end_time = now - dt.timedelta(seconds=self.lag_seconds)
        
        if last_processed is None:
            # First run - start from 24 hours ago
            start_time = safe_end_time - dt.timedelta(days=1)
        else:
            # Overlap with previous window to catch late arrivals
            start_time = last_processed - dt.timedelta(seconds=self.overlap_seconds)
            
        return start_time, safe_end_time
    
    def load_incremental(self, start_time: dt.datetime, end_time: dt.datetime):
        query = """
        SELECT order_id, customer_id, total_amount, updated_at,
               -- Track the actual processing time for debugging
               CURRENT_TIMESTAMP as processed_at
        FROM customer_orders 
        WHERE updated_at > %s 
          AND updated_at <= %s
        ORDER BY updated_at, order_id  -- Deterministic ordering crucial
        """
        
        # execute_query is a placeholder for your DB client's parameterized
        # query call (e.g. a cursor.execute + fetchall wrapper)
        return execute_query(query, (start_time, end_time))

Problem 2: The Boundary Precision Problem

Most timestamp columns aren't as precise as you think. Your database might store timestamps with second precision, but multiple records could have the exact same timestamp. If your last processed timestamp is 2024-01-15 14:30:00 and you query for updated_at > '2024-01-15 14:30:00', you'll miss any other records that were also updated at exactly 14:30:00.

-- Dangerous: misses records with same timestamp
SELECT * FROM orders WHERE updated_at > '2024-01-15 14:30:00';

-- Better: use >= with additional logic
SELECT * FROM orders 
WHERE updated_at >= '2024-01-15 14:30:00'
  AND NOT (updated_at = '2024-01-15 14:30:00' AND order_id <= 12345);

The robust solution uses a composite bookmark that tracks both timestamp and primary key:

class CompositeBookmark:
    def __init__(self, timestamp: dt.datetime, last_id: int):
        self.timestamp = timestamp
        self.last_id = last_id
        
    def to_dict(self):
        return {
            'timestamp': self.timestamp.isoformat(),
            'last_id': self.last_id
        }
    
    @classmethod
    def from_dict(cls, data):
        return cls(
            timestamp=dt.datetime.fromisoformat(data['timestamp']),
            last_id=data['last_id']
        )

def build_incremental_query(bookmark: CompositeBookmark):
    return """
    SELECT order_id, customer_id, total_amount, updated_at
    FROM customer_orders 
    WHERE (updated_at > %(timestamp)s)
       OR (updated_at = %(timestamp)s AND order_id > %(last_id)s)
    ORDER BY updated_at, order_id
    LIMIT 10000
    """, {
        'timestamp': bookmark.timestamp,
        'last_id': bookmark.last_id
    }
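
A sketch of how the bookmark advances between pages. The rows here are invented, and the bookmark is shown as a plain (timestamp, id) tuple for brevity; the ORDER BY in the query above is what guarantees the last row of a page is the correct resume point:

```python
import datetime as dt

def advance_bookmark(page_rows, bookmark):
    """Move the (updated_at, order_id) bookmark to the last row of a page.

    Relies on the page being ordered by (updated_at, order_id), so the next
    query resumes exactly after the last row seen, even mid-timestamp.
    """
    if not page_rows:
        return bookmark  # empty page: nothing newer yet, keep the old bookmark
    last = page_rows[-1]
    return (last["updated_at"], last["order_id"])

ts = dt.datetime(2024, 1, 15, 14, 30)
page = [
    {"order_id": 12346, "updated_at": ts},
    {"order_id": 12347, "updated_at": ts},  # same second as the previous row
]
bookmark = advance_bookmark(page, (ts, 12345))
# bookmark == (ts, 12347): the next page starts strictly after order 12347
```

Even though both rows share the same second, the id component lets the next page resume without skipping or re-reading either row.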

Problem 3: Updates vs. Deletes

Timestamp-based loading naturally captures inserts and updates, but what about deletes? The record is gone; there's no timestamp left to query. You need a separate strategy:

-- Option 1: Soft deletes (if you control the schema)
ALTER TABLE customer_orders ADD COLUMN deleted_at TIMESTAMP NULL;

-- Option 2: Separate audit table
CREATE TABLE customer_orders_audit (
    order_id BIGINT,
    operation_type VARCHAR(10), -- 'INSERT', 'UPDATE', 'DELETE' 
    operation_timestamp TIMESTAMP,
    old_values JSONB,
    new_values JSONB
);

Pro tip: If you can't modify the source schema, consider implementing a "full vs. incremental reconciliation" pattern. Run incremental loads for speed, but periodically do full comparisons to catch deletes and fix any consistency issues.
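
A minimal sketch of the delete-detection half of that reconciliation, assuming you can pull the full primary-key set from both sides (the function and variable names are invented for illustration):

```python
def find_hard_deletes(source_ids, target_ids):
    """IDs still present in the target but gone from the source: hard deletes."""
    return sorted(set(target_ids) - set(source_ids))

# The source lost order 103 since the last full comparison
source_ids = [101, 102, 104]
target_ids = [101, 102, 103, 104]
missing = find_hard_deletes(source_ids, target_ids)
# missing == [103]; issue a delete (or set a soft-delete flag) in the target
```

Pulling full key sets is expensive, which is why this usually runs on a much slower cadence (nightly or weekly) than the incremental loads it backstops.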

Change Data Capture: Event-Driven Precision

When timestamp-based loading isn't precise enough, Change Data Capture (CDC) gives you event-level accuracy. Instead of polling for changes, CDC streams every insert, update, and delete as it happens. But with great power comes great complexity.

CDC Implementation Patterns

Modern CDC tools like Debezium, AWS DMS, or cloud-native solutions like BigQuery Change Streams provide the event stream, but you still need to handle the events correctly:

from dataclasses import dataclass
from typing import Dict, Any, Optional
import json

@dataclass
class CDCEvent:
    table_name: str
    operation: str  # 'INSERT', 'UPDATE', 'DELETE'
    primary_key: Dict[str, Any]
    before: Optional[Dict[str, Any]]  # Pre-change values
    after: Optional[Dict[str, Any]]   # Post-change values
    transaction_id: str
    lsn: int  # Log Sequence Number for ordering
    timestamp: dt.datetime

class CDCProcessor:
    def __init__(self):
        # load_checkpoint is assumed to restore the last committed LSN from
        # durable storage; have it return -1 (or 0) on a first run
        self.processed_lsn = self.load_checkpoint()
        
    def process_event(self, event: CDCEvent) -> Optional[Dict[str, Any]]:
        """
        Convert CDC event into target table format.
        Returns None if event should be skipped.
        """
        # Ensure we don't process events out of order or twice
        if event.lsn <= self.processed_lsn:
            print(f"Skipping already processed event: {event.lsn}")
            return None

        if event.operation == 'INSERT':
            record = self._handle_insert(event)
        elif event.operation == 'UPDATE':
            record = self._handle_update(event)
        elif event.operation == 'DELETE':
            record = self._handle_delete(event)
        else:
            raise ValueError(f"Unknown operation: {event.operation}")

        # Advance the checkpoint only after the event has been handled
        self.processed_lsn = event.lsn
        return record
    
    def _handle_insert(self, event: CDCEvent) -> Dict[str, Any]:
        record = event.after.copy()
        record['_cdc_operation'] = 'INSERT'
        record['_cdc_timestamp'] = event.timestamp
        record['_cdc_lsn'] = event.lsn
        return record
    
    def _handle_update(self, event: CDCEvent) -> Dict[str, Any]:
        # For updates, we typically want the final state
        record = event.after.copy()
        record['_cdc_operation'] = 'UPDATE'
        record['_cdc_timestamp'] = event.timestamp
        record['_cdc_lsn'] = event.lsn
        
        # Optionally track what changed (event.before can be None if the
        # connector was configured without before-images)
        changed_fields = []
        if event.before:
            for key in event.after:
                if event.before.get(key) != event.after.get(key):
                    changed_fields.append(key)
        record['_cdc_changed_fields'] = changed_fields
        
        return record
    
    def _handle_delete(self, event: CDCEvent) -> Dict[str, Any]:
        # For deletes, we only have the before state
        record = event.before.copy()
        record['_cdc_operation'] = 'DELETE'
        record['_cdc_timestamp'] = event.timestamp
        record['_cdc_lsn'] = event.lsn
        return record

Handling CDC Complexity: Transactions and Ordering

CDC streams give you raw database events, but databases are complex. A single business transaction might generate dozens of CDC events across multiple tables. These events must be processed in the correct order to maintain consistency.

from collections import defaultdict
from typing import List

class TransactionAwareProcessor(CDCProcessor):
    def __init__(self):
        super().__init__()  # CDCProcessor provides process_event and the LSN checkpoint
        self.pending_transactions = defaultdict(list)
        self.committed_transactions = set()
        
    def process_event_stream(self, events: List[CDCEvent]):
        """
        Process a batch of CDC events, handling transaction boundaries.
        """
        for event in sorted(events, key=lambda e: e.lsn):
            if event.operation == 'BEGIN':
                # Transaction start marker
                self.pending_transactions[event.transaction_id] = []
            elif event.operation == 'COMMIT':
                # Process all events in this transaction
                tx_events = self.pending_transactions.pop(event.transaction_id)
                self._process_transaction(tx_events)
                self.committed_transactions.add(event.transaction_id)
            else:
                # Regular data change event
                self.pending_transactions[event.transaction_id].append(event)
    
    def _process_transaction(self, events: List[CDCEvent]):
        """
        Process all events in a transaction as an atomic unit.
        """
        # Group events by table to handle foreign key dependencies
        events_by_table = defaultdict(list)
        for event in events:
            events_by_table[event.table_name].append(event)
        
        # Process in dependency order (customers before orders)
        table_order = ['customers', 'orders', 'order_items']
        
        for table_name in table_order:
            if table_name in events_by_table:
                for event in events_by_table[table_name]:
                    processed_record = self.process_event(event)
                    if processed_record:
                        self._write_to_target(table_name, processed_record)

CDC Deduplication Strategies

CDC streams can produce duplicates due to retries, network issues, or connector restarts. Your processing logic must be idempotent:

class IdempotentCDCProcessor:
    def __init__(self, target_db):
        self.target_db = target_db
        
    def upsert_record(self, table_name: str, record: Dict[str, Any]):
        """
        Idempotent upsert that handles duplicates gracefully.
        """
        # Extract primary key for conflict resolution
        pk_fields = self._get_primary_key_fields(table_name)
        pk_values = {field: record[field] for field in pk_fields}
        
        if record['_cdc_operation'] == 'DELETE':
            # For deletes, we might receive the same delete event multiple times
            query = f"""
            DELETE FROM {table_name} 
            WHERE {' AND '.join(f'{k} = %({k})s' for k in pk_values.keys())}
              AND (_cdc_lsn < %(lsn)s OR _cdc_lsn IS NULL)
            """
            params = {**pk_values, 'lsn': record['_cdc_lsn']}
            
        else:
            # For inserts/updates, use LSN to ensure we don't regress
            fields = [k for k in record.keys() if not k.startswith('_pk_')]
            
            query = f"""
            INSERT INTO {table_name} ({', '.join(fields)})
            VALUES ({', '.join(f'%({f})s' for f in fields)})
            ON CONFLICT ({', '.join(pk_fields)}) 
            DO UPDATE SET
                {', '.join(f'{f} = EXCLUDED.{f}' for f in fields if f not in pk_fields)},
                _cdc_lsn = EXCLUDED._cdc_lsn,
                _cdc_timestamp = EXCLUDED._cdc_timestamp
            WHERE EXCLUDED._cdc_lsn > {table_name}._cdc_lsn
            """
            params = record
            
        return self.target_db.execute(query, params)

Watermarks: Balancing Completeness and Latency

Watermarks represent a fundamentally different approach to incremental processing. Instead of asking "what changed since timestamp X?", watermarks ask "what can we safely say is complete up to timestamp Y?" This shift in thinking is crucial for handling late-arriving data in distributed systems.

Understanding Watermark Semantics

A watermark is a promise: "No more events will arrive with timestamps earlier than this value." This promise lets you trigger computations, close time windows, and emit final results. But the key insight is that watermarks trade latency for completeness.
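
The core rule is simple: the global watermark is the minimum of the per-partition watermarks, so a single slow partition holds everything back. A toy illustration of that rule, with invented partition names and times:

```python
import datetime as dt

partition_watermarks = {
    "p0": dt.datetime(2024, 1, 15, 14, 30),
    "p1": dt.datetime(2024, 1, 15, 14, 27),  # the slowest partition
    "p2": dt.datetime(2024, 1, 15, 14, 29),
}

# The promise only holds if it holds for every partition, hence min()
global_watermark = min(partition_watermarks.values())
# global_watermark == 14:27, so a window ending at 14:26 may close,
# but one ending at 14:30 must stay open until p1 catches up
```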

from dataclasses import dataclass
import heapq
from typing import Dict, List, Optional

@dataclass
class DataEvent:
    event_time: dt.datetime  # When the event actually occurred
    processing_time: dt.datetime  # When we received it
    partition: str
    data: Dict[str, Any]

class WatermarkManager:
    def __init__(self, max_out_of_order_minutes: int = 10):
        self.max_out_of_order = dt.timedelta(minutes=max_out_of_order_minutes)
        self.partition_watermarks = {}
        self.pending_events = []  # Min-heap of (event_time, seq, event) tuples
        self._seq = 0  # Tie-breaker: DataEvent instances are not orderable
        
    def process_event(self, event: DataEvent) -> Optional[List[DataEvent]]:
        """
        Process an incoming event and return any events that are now
        safe to process based on updated watermarks.
        """
        # Update the watermark for this partition
        processing_time = dt.datetime.utcnow()
        partition_watermark = processing_time - self.max_out_of_order
        self.partition_watermarks[event.partition] = partition_watermark
        
        # Add event to the pending queue; the seq counter breaks timestamp
        # ties so heapq never tries to compare two DataEvent objects
        self._seq += 1
        heapq.heappush(self.pending_events, (event.event_time, self._seq, event))
        
        # Calculate the global watermark (minimum across all partitions)
        global_watermark = min(self.partition_watermarks.values())
        
        # Release events that are now safe to process
        safe_events = []
        while (self.pending_events and 
               self.pending_events[0][0] <= global_watermark):
            _, _, safe_event = heapq.heappop(self.pending_events)
            safe_events.append(safe_event)
            
        return safe_events if safe_events else None

Implementing Sliding Window Aggregations with Watermarks

Watermarks become most powerful when combined with time-windowed operations. Here's how to implement a sliding window aggregation that handles late arrivals gracefully:

from collections import defaultdict, deque
from typing import Dict, List, NamedTuple

class WindowedAggregator:
    def __init__(self, window_minutes: int = 60, slide_minutes: int = 15):
        self.window_size = dt.timedelta(minutes=window_minutes)
        self.slide_interval = dt.timedelta(minutes=slide_minutes)
        
        # Track active windows: timestamp -> {aggregation_state}
        self.active_windows = {}
        
        # Track events by window for late arrival handling
        self.window_events = defaultdict(list)
        
    def add_event(self, event: DataEvent, current_watermark: dt.datetime) -> List[Dict]:
        """
        Add event to appropriate windows and return any completed windows.
        """
        completed_windows = []
        
        # Determine which windows this event belongs to
        affected_windows = self._get_affected_windows(event.event_time)
        
        for window_start in affected_windows:
            window_end = window_start + self.window_size
            
            # Initialize window if needed
            if window_start not in self.active_windows:
                self.active_windows[window_start] = {
                    'start': window_start,
                    'end': window_end,
                    'count': 0,
                    'sum_amount': 0.0,
                    'unique_customers': set()
                }
            
            # Add event to window
            window = self.active_windows[window_start]
            window['count'] += 1
            window['sum_amount'] += event.data.get('amount', 0)
            window['unique_customers'].add(event.data.get('customer_id'))
            
            # Track event for potential late arrival adjustments
            self.window_events[window_start].append(event)
            
            # Check if window is complete (watermark has passed window end)
            if current_watermark >= window_end:
                completed_window = self._finalize_window(window_start)
                if completed_window:
                    completed_windows.append(completed_window)
        
        return completed_windows
    
    def _get_affected_windows(self, event_time: dt.datetime) -> List[dt.datetime]:
        """
        Calculate which sliding windows this event belongs to.
        """
        windows = []
        
        # Find the latest window start that includes this event
        latest_window_start = self._floor_to_slide_boundary(
            event_time - self.window_size + self.slide_interval
        )
        
        # Generate all windows that include this event
        current_window = latest_window_start
        while current_window <= event_time:
            if current_window + self.window_size > event_time:
                windows.append(current_window)
            current_window += self.slide_interval
            
        return windows
    
    def _floor_to_slide_boundary(self, timestamp: dt.datetime) -> dt.datetime:
        """
        Floor a (naive-UTC) timestamp to the slide interval boundary.
        """
        # Compute against an explicit epoch rather than timestamp()/fromtimestamp(),
        # which would silently apply the local timezone to naive datetimes
        epoch = dt.datetime(1970, 1, 1)
        minutes_since_epoch = int((timestamp - epoch).total_seconds() // 60)
        slide_minutes = int(self.slide_interval.total_seconds() // 60)
        floored_minutes = (minutes_since_epoch // slide_minutes) * slide_minutes
        return epoch + dt.timedelta(minutes=floored_minutes)
    
    def get_completed_windows(self, watermark: dt.datetime) -> List[Dict]:
        """
        Finalize and return every window whose end the watermark has passed.
        """
        done = [ws for ws, w in self.active_windows.items() if watermark >= w['end']]
        results = [self._finalize_window(ws) for ws in sorted(done)]
        return [r for r in results if r]

    def _finalize_window(self, window_start: dt.datetime) -> Optional[Dict]:
        """
        Finalize a window and return the aggregated result.
        """
        if window_start not in self.active_windows:
            return None
            
        window = self.active_windows.pop(window_start)
        
        # Clean up event tracking
        if window_start in self.window_events:
            del self.window_events[window_start]
        
        return {
            'window_start': window['start'].isoformat(),
            'window_end': window['end'].isoformat(),
            'event_count': window['count'],
            'total_amount': window['sum_amount'],
            'unique_customers': len(window['unique_customers']),
            'avg_amount': window['sum_amount'] / max(window['count'], 1)
        }
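
To sanity-check the window-membership math above, here is a standalone version that works in whole minutes since an epoch (a deliberate simplification; the function name is invented for illustration):

```python
def windows_for_minute(event_min, window_min=60, slide_min=15):
    """Start minutes of every sliding window that contains event_min.

    The latest eligible window starts at the slide boundary at or before the
    event; earlier starts follow at slide_min steps back until the window no
    longer covers the event. A 60-min window with a 15-min slide always
    yields window_min / slide_min = 4 windows.
    """
    latest_start = (event_min // slide_min) * slide_min
    earliest_start = latest_start - window_min + slide_min
    return list(range(earliest_start, latest_start + 1, slide_min))

# An event at minute 877 (e.g. 14:37) falls in the windows starting at
# minutes 825, 840, 855, 870 (13:45, 14:00, 14:15, 14:30)
starts = windows_for_minute(877)
```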

Late Data Handling Strategies

Real-world systems must handle the inevitable late arrivals. Here are three approaches, each with different trade-offs:

class LateDataStrategy:
    """Base class for late data handling strategies."""
    
    def handle_late_event(self, event: DataEvent, 
                         affected_windows: List[dt.datetime]) -> Dict:
        raise NotImplementedError

class IgnoreLateData(LateDataStrategy):
    """Simply drop events that arrive after window completion."""
    
    def handle_late_event(self, event: DataEvent, 
                         affected_windows: List[dt.datetime]) -> Dict:
        return {
            'action': 'dropped',
            'reason': 'arrived_after_watermark',
            'event': event,
            'affected_windows': len(affected_windows)
        }

class ReprocessWithCorrections(LateDataStrategy):
    """Recompute affected windows and emit corrections."""
    
    def __init__(self, correction_sink):
        self.correction_sink = correction_sink
        
    def handle_late_event(self, event: DataEvent, 
                         affected_windows: List[dt.datetime]) -> Dict:
        corrections = []
        
        for window_start in affected_windows:
            # Recompute window with the late event
            corrected_result = self._recompute_window(window_start, event)
            
            # Emit correction
            correction = {
                'window_start': window_start.isoformat(),
                'correction_type': 'late_arrival',
                'original_timestamp': dt.datetime.utcnow().isoformat(),
                'new_values': corrected_result
            }
            
            self.correction_sink.emit(correction)
            corrections.append(correction)
            
        return {
            'action': 'corrected',
            'corrections_emitted': len(corrections),
            'affected_windows': affected_windows
        }

class SideOutputLateData(LateDataStrategy):
    """Send late data to a separate stream for manual handling."""
    
    def __init__(self, side_output_sink):
        self.side_output_sink = side_output_sink
        
    def handle_late_event(self, event: DataEvent, 
                         affected_windows: List[dt.datetime]) -> Dict:
        # Send to side output with metadata
        side_output_record = {
            'event': event.__dict__,
            'lateness_minutes': self._calculate_lateness(event),
            'affected_windows': [w.isoformat() for w in affected_windows],
            'side_output_timestamp': dt.datetime.utcnow().isoformat()
        }
        
        self.side_output_sink.emit(side_output_record)
        
        return {
            'action': 'side_output',
            'sink': 'late_data_queue',
            'affected_windows': len(affected_windows)
        }
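
Which of the three you pick usually depends on how late the event is and what corrections cost downstream. One plausible routing rule (the threshold and strategy names here are assumptions for illustration, not from any specific framework):

```python
def route_late_event(lateness_minutes, allowed_lateness_minutes=60):
    """Name the handling strategy based on how late the event arrived."""
    if lateness_minutes <= 0:
        return "on_time"        # not late: normal window processing
    if lateness_minutes <= allowed_lateness_minutes:
        return "corrected"      # recent enough that recomputing the window is worth it
    return "side_output"        # too stale: park it for manual review
```

Dropping late data entirely then becomes the degenerate case of `allowed_lateness_minutes = 0` with no side output configured.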

Hands-On Exercise: Building a Robust Order Analytics Pipeline

Let's put these concepts together by building a realistic order analytics pipeline that handles all three patterns. You'll process a stream of e-commerce orders with the following requirements:

  • Real-time: Process orders as they arrive (CDC)
  • Hourly batches: Reconcile any missed data (timestamp-based)
  • Daily reports: Generate sliding window analytics with late data handling (watermarks)

Setting Up the Exercise Environment

First, let's create our source data structure:

-- Source: Orders table with CDC enabled
CREATE TABLE source_orders (
    order_id BIGINT PRIMARY KEY,
    customer_id BIGINT NOT NULL,
    order_amount DECIMAL(10,2) NOT NULL,
    order_status VARCHAR(20) NOT NULL,
    created_at TIMESTAMP NOT NULL,
    updated_at TIMESTAMP NOT NULL,
    deleted_at TIMESTAMP NULL
);

-- Target: Analytics-ready orders table
CREATE TABLE analytics_orders (
    order_id BIGINT PRIMARY KEY,
    customer_id BIGINT NOT NULL,
    order_amount DECIMAL(10,2) NOT NULL,
    order_status VARCHAR(20) NOT NULL,
    created_at TIMESTAMP NOT NULL,
    updated_at TIMESTAMP NOT NULL,
    
    -- CDC metadata
    _cdc_operation VARCHAR(10),
    _cdc_timestamp TIMESTAMP,
    _cdc_lsn BIGINT,
    
    -- Processing metadata
    _processing_method VARCHAR(20), -- 'cdc', 'timestamp', 'reconciliation'
    _processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Sliding window aggregations
CREATE TABLE hourly_order_metrics (
    window_start TIMESTAMP,
    window_end TIMESTAMP,
    total_orders BIGINT,
    total_amount DECIMAL(12,2),
    unique_customers BIGINT,
    avg_order_amount DECIMAL(10,2),
    
    -- Watermark metadata
    _watermark_timestamp TIMESTAMP,
    _is_final BOOLEAN DEFAULT FALSE,
    _correction_count INT DEFAULT 0,
    
    PRIMARY KEY (window_start)
);

Implementing the Unified Pipeline

import asyncio
import logging
from typing import Dict, List, Optional, Union
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta

class UnifiedOrderPipeline:
    def __init__(self, db_connection, kafka_consumer):
        self.db = db_connection
        self.kafka = kafka_consumer
        
        # Initialize components
        self.cdc_processor = CDCProcessor()
        self.timestamp_loader = TimestampBasedLoader(lag_seconds=300)
        self.watermark_manager = WatermarkManager(max_out_of_order_minutes=15)
        self.window_aggregator = WindowedAggregator(
            window_minutes=60, 
            slide_minutes=15
        )
        
        # State management
        self.last_timestamp_checkpoint = self._load_timestamp_checkpoint()
        self.last_cdc_offset = self._load_cdc_checkpoint()
        
    async def run_pipeline(self):
        """
        Main pipeline orchestration - runs all patterns concurrently.
        """
        try:
            await asyncio.gather(
                self._run_cdc_stream(),
                self._run_timestamp_reconciliation(),
                self._run_watermark_processing(),
                return_exceptions=True
            )
        except Exception as e:
            logging.error(f"Pipeline error: {e}")
            raise
    
    async def _run_cdc_stream(self):
        """
        Process real-time CDC events from Kafka.
        """
        async for message in self.kafka.consume('orders_cdc'):
            try:
                # Parse CDC event
                cdc_event = self._parse_cdc_message(message)
                
                # Process through CDC handler
                processed_record = self.cdc_processor.process_event(cdc_event)
                
                if processed_record:
                    # Write to analytics table
                    processed_record['_processing_method'] = 'cdc'
                    await self._upsert_order(processed_record)
                    
                    # Forward to watermark processing for aggregations
                    if processed_record['_cdc_operation'] in ['INSERT', 'UPDATE']:
                        data_event = DataEvent(
                            event_time=processed_record['created_at'],
                            processing_time=datetime.utcnow(),
                            partition=str(processed_record['customer_id'] % 10),
                            data=processed_record
                        )
                        
                        await self._process_for_aggregation(data_event)
                
                # Update checkpoint
                self._save_cdc_checkpoint(message.offset)
                
            except Exception as e:
                logging.error(f"CDC processing error: {e}")
                # Implement your error handling strategy here
                continue
    
    async def _run_timestamp_reconciliation(self):
        """
        Run periodic reconciliation to catch any missed CDC events.
        """
        while True:
            try:
                # Wait for next reconciliation interval (every hour)
                await asyncio.sleep(3600)
                
                # Get next window to process
                start_time, end_time = self.timestamp_loader.get_next_window(
                    self.last_timestamp_checkpoint
                )
                
                logging.info(f"Starting reconciliation: {start_time} to {end_time}")
                
                # Load data using timestamp approach
                reconciliation_records = self.timestamp_loader.load_incremental(
                    start_time, end_time
                )
                
                # Process each record
                reconciliation_count = 0
                for record in reconciliation_records:
                    # Check if we already have this record from CDC
                    existing = await self._get_existing_order(record['order_id'])
                    
                    if not existing or existing['_cdc_lsn'] < record.get('_version', 0):
                        # Either missing or we have an older version
                        record['_processing_method'] = 'reconciliation'
                        await self._upsert_order(record)
                        reconciliation_count += 1
                
                # Update checkpoint
                self.last_timestamp_checkpoint = end_time
                self._save_timestamp_checkpoint(end_time)
                
                logging.info(f"Reconciliation complete: {reconciliation_count} records processed")
                
            except Exception as e:
                logging.error(f"Reconciliation error: {e}")
                continue
    
    async def _run_watermark_processing(self):
        """
        Process watermark advancement and window completions.
        """
        while True:
            try:
                # Check for watermark advancement every 30 seconds
                await asyncio.sleep(30)
                
                current_time = datetime.utcnow()
                
                # Advance watermarks for all partitions
                for partition in range(10):  # Assuming 10 partitions
                    partition_watermark = current_time - timedelta(minutes=15)
                    self.watermark_manager.partition_watermarks[str(partition)] = partition_watermark
                
                # Get global watermark
                if self.watermark_manager.partition_watermarks:
                    global_watermark = min(self.watermark_manager.partition_watermarks.values())
                    
                    # Process any completed windows
                    completed_windows = self.window_aggregator.get_completed_windows(
                        global_watermark
                    )
                    
                    for window_result in completed_windows:
                        await self._save_window_result(window_result)
                        
                    if completed_windows:
                        logging.info(f"Completed {len(completed_windows)} windows")
                
            except Exception as e:
                logging.error(f"Watermark processing error: {e}")
                continue
    
    async def _process_for_aggregation(self, event: DataEvent):
        """
        Add event to windowed aggregations.
        """
        current_watermark = min(self.watermark_manager.partition_watermarks.values()) \
                          if self.watermark_manager.partition_watermarks else datetime.min
        
        completed_windows = self.window_aggregator.add_event(event, current_watermark)
        
        for window_result in completed_windows:
            await self._save_window_result(window_result)
    
    async def _upsert_order(self, record: Dict):
        """
        Idempotent upsert of order record.
        """
        query = """
        INSERT INTO analytics_orders 
        (order_id, customer_id, order_amount, order_status, created_at, updated_at,
         _cdc_operation, _cdc_timestamp, _cdc_lsn, _processing_method)
        VALUES (%(order_id)s, %(customer_id)s, %(order_amount)s, %(order_status)s, 
                %(created_at)s, %(updated_at)s, %(_cdc_operation)s, 
                %(_cdc_timestamp)s, %(_cdc_lsn)s, %(_processing_method)s)
        ON CONFLICT (order_id) DO UPDATE SET
            customer_id = EXCLUDED.customer_id,
            order_amount = EXCLUDED.order_amount,
            order_status = EXCLUDED.order_status,
            updated_at = EXCLUDED.updated_at,
            _cdc_operation = EXCLUDED._cdc_operation,
            _cdc_timestamp = EXCLUDED._cdc_timestamp,
            _cdc_lsn = EXCLUDED._cdc_lsn,
            _processing_method = EXCLUDED._processing_method,
            _processed_at = CURRENT_TIMESTAMP
        WHERE EXCLUDED._cdc_lsn > analytics_orders._cdc_lsn 
           OR analytics_orders._cdc_lsn IS NULL
        """
        
        await self.db.execute(query, record)
    
    async def _save_window_result(self, window_result: Dict):
        """
        Save completed window aggregation.
        """
        query = """
        INSERT INTO hourly_order_metrics 
        (window_start, window_end, total_orders, total_amount, 
         unique_customers, avg_order_amount, _watermark_timestamp)
        VALUES (%(window_start)s, %(window_end)s, %(event_count)s, %(total_amount)s,
                %(unique_customers)s, %(avg_amount)s, %(watermark_timestamp)s)
        ON CONFLICT (window_start) DO UPDATE SET
            total_orders = EXCLUDED.total_orders,
            total_amount = EXCLUDED.total_amount,
            unique_customers = EXCLUDED.unique_customers,
            avg_order_amount = EXCLUDED.avg_order_amount,
            _watermark_timestamp = EXCLUDED._watermark_timestamp,
            _correction_count = hourly_order_metrics._correction_count + 1,
            _is_final = FALSE
        """
        
        params = window_result.copy()
        params['watermark_timestamp'] = datetime.utcnow()
        
        await self.db.execute(query, params)

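With the three loops in place, the pipeline's coroutines can be supervised together. A minimal launcher sketch, assuming the `UnifiedOrderPipeline` class shown above (construction of the database and Kafka consumer objects is omitted):

```python
import asyncio

async def run_pipeline(pipeline):
    """Run the CDC, reconciliation, and watermark loops concurrently.

    If any loop raises despite its internal error handling, cancel the
    others so the process can restart cleanly rather than run degraded.
    """
    tasks = [
        asyncio.create_task(pipeline._run_cdc_stream()),
        asyncio.create_task(pipeline._run_timestamp_reconciliation()),
        asyncio.create_task(pipeline._run_watermark_processing()),
    ]
    try:
        await asyncio.gather(*tasks)
    finally:
        for task in tasks:
            task.cancel()

# Entry point (db and kafka_consumer construction omitted):
# asyncio.run(run_pipeline(UnifiedOrderPipeline(db, kafka_consumer)))
```

In production you would typically add per-task restart logic rather than letting one failure bring down all three loops, but supervising them from a single coroutine keeps shutdown behavior predictable.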
Testing the Pipeline

Create a comprehensive test that validates all three patterns work together:

import asyncio
import json
from datetime import datetime
from unittest.mock import Mock, AsyncMock

import pytest

# Async tests require the pytest-asyncio plugin
pytestmark = pytest.mark.asyncio

class TestUnifiedPipeline:
    @pytest.fixture
    async def pipeline(self):
        db_mock = AsyncMock()
        kafka_mock = AsyncMock()
        return UnifiedOrderPipeline(db_mock, kafka_mock)
    
    async def test_cdc_to_aggregation_flow(self, pipeline):
        """
        Test that CDC events flow through to aggregations correctly.
        """
        # Simulate CDC event
        cdc_message = Mock()
        cdc_message.value = json.dumps({
            'op': 'c',  # Create
            'after': {
                'order_id': 12345,
                'customer_id': 789,
                'order_amount': 99.99,
                'order_status': 'pending',
                'created_at': '2024-01-15T14:30:00Z',
                'updated_at': '2024-01-15T14:30:00Z'
            },
            'ts_ms': int(datetime.utcnow().timestamp() * 1000),
            'lsn': 1001
        })
        cdc_message.offset = 42
        
        # Process the message through a single-message handler; calling
        # _run_cdc_stream() here would block forever in its polling loop
        # (the handler name is an assumption about the pipeline class)
        await pipeline._process_cdc_message(cdc_message)
        
        # Verify order was upserted
        pipeline.db.execute.assert_called()
        call_args = pipeline.db.execute.call_args
        assert 'analytics_orders' in call_args[0][0]
        assert call_args[0][1]['order_id'] == 12345
        assert call_args[0][1]['_processing_method'] == 'cdc'
    
    async def test_reconciliation_catches_missed_events(self, pipeline):
        """
        Test that timestamp-based reconciliation catches missed CDC events.
        """
        # Mock existing order (from CDC)
        pipeline._get_existing_order = AsyncMock(return_value={
            'order_id': 12345,
            '_cdc_lsn': 1000,
            '_processing_method': 'cdc'
        })
        
        # Mock timestamp loader returning newer version
        pipeline.timestamp_loader.load_incremental = Mock(return_value=[{
            'order_id': 12345,
            'customer_id': 789,
            'order_amount': 149.99,  # Updated amount
            'order_status': 'completed',
            '_version': 1002  # Newer than CDC version
        }])
        
        # Run a single reconciliation pass; _run_timestamp_reconciliation()
        # loops forever and sleeps an hour per pass, so tests should exercise
        # a single-pass helper (the name is an assumption about the class)
        await pipeline._reconcile_once()
        
        # Verify the newer version was processed
        pipeline.db.execute.assert_called()
        call_args = pipeline.db.execute.call_args
        assert call_args[0][1]['_processing_method'] == 'reconciliation'
        assert call_args[0][1]['order_amount'] == 149.99
    
    async def test_watermark_window_completion(self, pipeline):
        """
        Test that watermark advancement triggers window completion.
        """
        # Add events to aggregator
        test_event = DataEvent(
            event_time=datetime(2024, 1, 15, 14, 30),
            processing_time=datetime.utcnow(),
            partition='0',
            data={'order_id': 12345, 'amount': 99.99, 'customer_id': 789}
        )
        
        # Process event
        await pipeline._process_for_aggregation(test_event)
        
        # Advance watermark to complete the window
        future_watermark = datetime(2024, 1, 15, 16, 0)  # 2 hours later
        pipeline.watermark_manager.partition_watermarks['0'] = future_watermark
        
        # Drive window completion directly; _run_watermark_processing() loops
        # forever and would overwrite the test watermark from the wall clock
        global_watermark = min(pipeline.watermark_manager.partition_watermarks.values())
        completed = pipeline.window_aggregator.get_completed_windows(global_watermark)
        for window_result in completed:
            await pipeline._save_window_result(window_result)
        
        # Verify window result was saved
        # Verify a window result was written to hourly_order_metrics
        assert pipeline.db.execute.called
        call_args = pipeline.db.execute.call_args
        assert 'hourly_order_metrics' in call_args[0][0]

Common Mistakes & Troubleshooting

The "Everything is Fine" Syndrome

The most dangerous mistake with incremental loading is assuming it's working when it's actually missing data. Unlike batch processing failures that crash loudly, incremental loading failures are often silent – your pipeline keeps running, but you're gradually losing data.

Symptoms:

  • Downstream reports show declining metrics despite business growth
  • Occasional gaps in time-series data
  • Customer complaints about missing recent transactions
  • Reconciliation jobs finding significant differences

Detection strategy:

class DataCompletenessMonitor:
    def __init__(self, source_db, target_db):
        self.source_db = source_db
        self.target_db = target_db
        
    async def check_completeness(self, hours_back: int = 24):
        """
        Compare source and target counts to detect missing data.
        """
        check_time = datetime.utcnow() - timedelta(hours=hours_back)
        
        # Count in source system
        source_query = """
        SELECT COUNT(*), MAX(updated_at), MIN(updated_at)
        FROM customer_orders 
        WHERE updated_at >= %s
        """
        source_result = await self.source_db.fetchone(source_query, (check_time,))
        
        # Count in target system
        target_query = """
        SELECT COUNT(*), MAX(updated_at), MIN(updated_at)
        FROM analytics_orders 
        WHERE updated_at >= %s AND _processing_method != 'deleted'
        """
        target_result = await self.target_db.fetchone(target_query, (check_time,))
        
        completeness_ratio = target_result[0] / max(source_result[0], 1)
        
        if completeness_ratio < 0.95:  # Alert if less than 95% complete
            return {
                'status': 'INCOMPLETE',
                'source_count': source_result[0],
                'target_count': target_result[0],
                'completeness_ratio': completeness_ratio,
                'recommended_action': 'run_reconciliation'
            }
        
        return {'status': 'OK', 'completeness_ratio': completeness_ratio}
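A sketch of wiring the monitor into a periodic check. The `alert` callable is a placeholder for whatever paging or messaging integration you use, and the 15-minute interval is an assumption:

```python
import asyncio

async def check_and_alert(monitor, alert):
    """Run one completeness check; fire the alert callable on missing data."""
    result = await monitor.check_completeness(hours_back=24)
    if result['status'] == 'INCOMPLETE':
        # Surface the ratio so responders can judge severity at a glance
        alert(
            f"Incremental load incomplete: "
            f"{result['completeness_ratio']:.1%} of source rows present; "
            f"recommended action: {result['recommended_action']}"
        )
    return result

async def completeness_watchdog(monitor, alert, interval_seconds=900):
    """Run the check on a schedule (every 15 minutes by default)."""
    while True:
        await check_and_alert(monitor, alert)
        await asyncio.sleep(interval_seconds)
```

Keeping the single check in its own function makes the alerting path easy to unit-test without running the scheduling loop.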

Clock Skew Horror Stories

Problem: Your application server is 2 minutes fast, your database server is 30 seconds slow, and your processing server is accurate. Chaos ensues.

Real-world example:

# This actually happened in production (timestamps simplified):
# App server (2 min fast) stamps a row:   updated_at = '2024-01-15 14:30:00'
# Pipeline checkpoint advances to the max seen: '2024-01-15 14:30:00'
# DB server (30 sec slow) later defaults a row: updated_at = '2024-01-15 14:29:30'
# Pipeline queries: WHERE updated_at > '2024-01-15 14:30:00'
# Result: the 14:29:30 row exists but the query never picks it up

Solution: make a single clock authoritative. Generate timestamps in the database rather than in the application, and derive checkpoint boundaries from that same clock:

-- Instead of this:
INSERT INTO orders (id, amount, updated_at) 
VALUES (1, 100.00, '2024-01-15 14:30:00');

-- Do this:
INSERT INTO orders (id, amount, updated_at) 
VALUES (1, 100.00, CURRENT_TIMESTAMP);
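On the query side, the same principle applies: derive the upper boundary from the database's clock, not the pipeline host's. A sketch of a skew-tolerant incremental query; the two-minute lag window and the `customer_orders` table name are assumptions, so size the window to your observed skew plus maximum transaction duration:

```python
from datetime import timedelta

def build_incremental_query(lag_window=timedelta(minutes=2)):
    """Build a query whose upper bound comes from the database clock.

    The lag window excludes rows so recent that an in-flight transaction
    (or a slightly slow writer clock) could still insert behind them.
    """
    lag_seconds = int(lag_window.total_seconds())
    return f"""
    SELECT *
    FROM customer_orders
    WHERE updated_at > %(checkpoint)s
      AND updated_at <= CURRENT_TIMESTAMP - INTERVAL '{lag_seconds} seconds'
    ORDER BY updated_at
    """
```

Because both the stored timestamps and the query boundary now come from the database clock, clock skew between application, database, and pipeline hosts can no longer cause missed records.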

CDC Replication Lag Monitoring

CDC systems can fall behind during high load periods. Monitor replication lag and implement backpressure:

class CDCLagMonitor:
    def __init__(self, source_db, cdc_connector, alert_threshold_minutes=5):
        self.source_db = source_db
        self.cdc_connector = cdc_connector
        self.alert_threshold = timedelta(minutes=alert_threshold_minutes)
        
    async def check_replication_lag(self) -> Dict[str, Any]:
        """
        Check how far behind CDC replication is from the source database.
        """
        # Get latest LSN from source database
        source_lsn_query = "SELECT pg_current_wal_lsn();"
        source_lsn = await self.source_db.fetchval(source_lsn_query)
        
        # Get latest processed LSN from CDC connector
        connector_status = await self.cdc_connector.get_status()
        processed_lsn = connector_status['last_processed_lsn']
        processed_timestamp = connector_status['last_processed_timestamp']
        
        # Calculate lag
        current_time = datetime.utcnow()
        time_lag = current_time - processed_timestamp
        
        lag_status = {
            'source_lsn': source_lsn,
            'processed_lsn': processed_lsn,
            'time_lag_seconds': time_lag.total_seconds(),
            'is_healthy': time_lag < self.alert_threshold
        }
        
        if not lag_status['is_healthy']:
            await self._trigger_backpressure()
            
        return lag_status
    
    async def _trigger_backpressure(self):
        """
        Implement backpressure when CDC lag is too high.
        """
        # Reduce downstream processing rate
        # Pause non-critical jobs
        # Alert operations team
        pass
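The `_trigger_backpressure` stub above can be filled in many ways. One minimal sketch is a shared pause flag that non-critical consumers poll before each batch; the consumer wiring is an assumption, not part of the monitor shown above:

```python
import asyncio

class BackpressureController:
    """Shared flag that non-critical consumers check before doing work."""

    def __init__(self):
        self._paused = asyncio.Event()

    def pause_non_critical(self):
        # Called by the lag monitor when replication falls too far behind
        self._paused.set()

    def resume(self):
        self._paused.clear()

    @property
    def is_paused(self):
        return self._paused.is_set()

    async def wait_if_paused(self, poll_seconds=1):
        # Non-critical consumers call this before each batch; it returns
        # immediately when not paused and blocks while backpressure is on
        while self._paused.is_set():
            await asyncio.sleep(poll_seconds)
```

The lag monitor would call `pause_non_critical()` from `_trigger_backpressure` and `resume()` once lag drops back under the threshold, letting the CDC connector catch up before discretionary work resumes.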

Watermark Debugging

When watermarks aren't advancing properly, events pile up in memory and windows never complete. Here's how to debug:

class WatermarkDebugger:
    def __init__(self, watermark_manager):
        self.watermark_manager = watermark_manager
        
    def diagnose_stalled_watermarks(self) -> Dict[str, Any]:
        """
        Identify why watermarks aren't advancing.
        """
        diagnosis = {
            'current_time': datetime.utcnow(),
            'partition_watermarks': dict(self.watermark_manager.partition_watermarks),
            'pending_events': len(self.watermark_manager.pending_events),
            'issues': []
        }
        
        # Check for inactive partitions
        current_time = datetime.utcnow()
        for partition, watermark in diagnosis['partition_watermarks'].items():
            staleness = current_time - watermark
            if staleness > timedelta(minutes=10):
                diagnosis['issues'].append({
                    'type': 'stale_partition',
                    'partition': partition,
                    'staleness_minutes': staleness.total_seconds() / 60,
                    'suggested_action': 'check_partition_health'
                })
        
        # Check for memory pressure from pending events
        if diagnosis['pending_events'] > 100000:
            diagnosis['issues'].append({
                'type': 'memory_pressure',
                'pending_count': diagnosis['pending_events'],
                'suggested_action': 'increase_max_out_of_order_time'
            })
        
        # Check for watermark spread (inconsistent progress)
        if diagnosis['partition_watermarks']:
            watermarks = list(diagnosis['partition_watermarks'].values())
            spread = max(watermarks) - min(watermarks)
            if spread > timedelta(hours=1):
                diagnosis['issues'].append({
                    'type': 'watermark_spread',
                    'spread_minutes': spread.total_seconds() / 60,
                    'suggested_action': 'investigate_slow_partitions'
                })
        
        return diagnosis
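A usage sketch: run the diagnosis on a schedule and log each issue. The `issues` shape matches the debugger above; the logging destination is up to you:

```python
import logging

def report_watermark_issues(debugger):
    """Log each diagnosed issue; return True if watermarks look healthy."""
    diagnosis = debugger.diagnose_stalled_watermarks()
    for issue in diagnosis['issues']:
        logging.warning(
            "Watermark issue %s: suggested action %s",
            issue['type'], issue['suggested_action']
        )
    return not diagnosis['issues']
```

Wiring this into the same scheduler as your completeness checks gives you a single health report covering both missing data and stalled windows.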

Summary & Next Steps

Incremental loading isn't just about choosing between timestamps, CDC, or watermarks – it's about understanding the fundamental trade-offs between consistency, latency, and operational complexity. Each pattern solves different problems and fails in different ways.

Timestamp-based loading is simple and works well for systems where you control the data model and can accept some lag. Use it when you need periodic reconciliation or batch processing with reasonable freshness requirements. The key insight is building in lag windows and overlap detection to handle the inevitable clock skew and network delays.

Change Data Capture gives you event-level precision and near real-time processing, but at the cost of operational complexity. CDC excels when you need to maintain strict consistency across systems or when downstream systems need to react immediately to changes. The critical success factor is handling transaction boundaries and implementing robust duplicate detection.

Watermarks provide a sophisticated framework for handling late-arriving data in distributed systems, but they require careful tuning of lateness bounds and completeness requirements. Use watermarks when you're doing complex time-windowed analytics or need to balance low latency with guaranteed completeness.

The most robust systems combine all three approaches: CDC for real-time processing, timestamp-based reconciliation to catch gaps, and watermarks for time-windowed analytics. But remember that complexity is a cost, not a feature: add it only when the business requirements justify the operational overhead.

Next steps to deepen your expertise:

  1. Implement schema evolution handling – Learn how each pattern handles upstream schema changes without breaking downstream consumers
  2. Master exactly-once processing – Dive into the subtle differences between at-least-once and exactly-once semantics in distributed systems
  3. Study conflict resolution strategies – Understand how to handle conflicting updates when multiple systems can modify the same data
  4. Explore cross-region replication – Learn how these patterns behave when data must be replicated across geographic regions with different consistency requirements

The principles you've learned here apply beyond data engineering – they're fundamental patterns for building reliable distributed systems that process changing data at scale.

Learning Path: Data Pipeline Fundamentals

Previous

Working with APIs: REST, Pagination, and Rate Limiting for Data Engineers

Next

Data Pipeline Error Handling and Recovery Strategies

