
You're staring at a data pipeline that worked perfectly for months, until it didn't. Your customer analytics dashboard is showing stale data from three days ago, your CDC stream is producing duplicates, and the timestamp-based incremental load that seemed so elegant is now missing records. Sound familiar? You're experiencing the sharp edge of incremental loading at scale.
The promise of incremental loading is seductive: only process what's changed, save compute costs, reduce latency, and keep your data fresh. But as data volumes grow and systems become more distributed, the simple "last modified timestamp" approach starts to crack under pressure. Network delays create gaps in your watermarks, clock skew between systems causes phantom duplicates, and suddenly your "real-time" pipeline is neither real nor reliable.
This isn't just about picking the right tool – it's about understanding the fundamental trade-offs between consistency, performance, and operational complexity. By the end of this lesson, you'll understand not just how to implement these patterns, but when each approach breaks down and how to recover gracefully.
What you'll learn:
You should be comfortable with:
Let's start with what seems like the simplest approach: timestamp-based incremental loading. You've probably seen code like this:
-- The deceptively simple approach
SELECT * FROM customer_orders
WHERE updated_at > '2024-01-15 14:30:00'
AND updated_at <= '2024-01-15 15:30:00';
This works beautifully in development and early production. But in distributed systems with real load, this pattern has three critical failure modes that will eventually bite you.
Your application servers aren't perfectly synchronized. Server A might be 30 seconds ahead of Server B, and your database server might be somewhere in between. When you query for records with updated_at > last_processed_timestamp, you're making a dangerous assumption: that timestamps reflect the actual order of events.
Here's what really happens:
# What you think is happening:
# 14:30:00 - Record A created
# 14:30:05 - Record B created
# 14:30:10 - Your query runs, captures both
# What actually happens:
# 14:30:00 - Record A created (Server clock: 14:29:45)
# 14:30:05 - Record B created (Server clock: 14:30:20)
# 14:30:10 - Your query runs
# 14:30:15 - Record A finally commits to DB (due to network delay)
# Result: You miss Record A entirely
The solution is to build in a lag window and implement overlap detection:
import datetime as dt
from typing import Optional

class TimestampBasedLoader:
    def __init__(self, lag_seconds: int = 300, overlap_seconds: int = 60):
        self.lag_seconds = lag_seconds          # 5-minute safety window
        self.overlap_seconds = overlap_seconds  # 1-minute overlap for safety

    def get_next_window(self, last_processed: Optional[dt.datetime] = None) -> tuple:
        """
        Calculate the next processing window with built-in lag and overlap.
        """
        now = dt.datetime.utcnow()
        # Never process data newer than lag_seconds ago
        safe_end_time = now - dt.timedelta(seconds=self.lag_seconds)

        if last_processed is None:
            # First run - start from 24 hours ago
            start_time = safe_end_time - dt.timedelta(days=1)
        else:
            # Overlap with previous window to catch late arrivals
            start_time = last_processed - dt.timedelta(seconds=self.overlap_seconds)

        return start_time, safe_end_time

    def load_incremental(self, start_time: dt.datetime, end_time: dt.datetime):
        query = """
            SELECT order_id, customer_id, total_amount, updated_at,
                   -- Track the actual processing time for debugging
                   CURRENT_TIMESTAMP AS processed_at
            FROM customer_orders
            WHERE updated_at > %s
              AND updated_at <= %s
            ORDER BY updated_at, order_id  -- Deterministic ordering is crucial
        """
        return execute_query(query, (start_time, end_time))
Most timestamp columns aren't as precise as you think. Your database might store timestamps with second precision, but multiple records could have the exact same timestamp. If your last processed timestamp is 2024-01-15 14:30:00 and you query for updated_at > '2024-01-15 14:30:00', you'll miss any other records that were also updated at exactly 14:30:00.
-- Dangerous: misses records with the same timestamp
SELECT * FROM orders WHERE updated_at > '2024-01-15 14:30:00';

-- Better: use >= plus the last processed primary key (12345 here)
SELECT * FROM orders
WHERE updated_at >= '2024-01-15 14:30:00'
  AND NOT (updated_at = '2024-01-15 14:30:00' AND order_id <= 12345);
The robust solution uses a composite bookmark that tracks both timestamp and primary key:
class CompositeBookmark:
    def __init__(self, timestamp: dt.datetime, last_id: int):
        self.timestamp = timestamp
        self.last_id = last_id

    def to_dict(self):
        return {
            'timestamp': self.timestamp.isoformat(),
            'last_id': self.last_id
        }

    @classmethod
    def from_dict(cls, data):
        return cls(
            timestamp=dt.datetime.fromisoformat(data['timestamp']),
            last_id=data['last_id']
        )

def build_incremental_query(bookmark: CompositeBookmark):
    # Returns a (query, params) pair; the keyset condition picks up rows that
    # share the bookmark timestamp but have a higher primary key
    return """
        SELECT order_id, customer_id, total_amount, updated_at
        FROM customer_orders
        WHERE (updated_at > %(timestamp)s)
           OR (updated_at = %(timestamp)s AND order_id > %(last_id)s)
        ORDER BY updated_at, order_id
        LIMIT 10000
    """, {
        'timestamp': bookmark.timestamp,
        'last_id': bookmark.last_id
    }
Timestamp-based loading naturally captures inserts and updates, but what about deletes? The record is gone – there's no timestamp to query. You need a separate strategy:
-- Option 1: Soft deletes (if you control the schema)
ALTER TABLE customer_orders ADD COLUMN deleted_at TIMESTAMP NULL;

-- Option 2: Separate audit table
CREATE TABLE customer_orders_audit (
    order_id BIGINT,
    operation_type VARCHAR(10),  -- 'INSERT', 'UPDATE', 'DELETE'
    operation_timestamp TIMESTAMP,
    old_values JSONB,
    new_values JSONB
);
Pro tip: If you can't modify the source schema, consider implementing a "full vs. incremental reconciliation" pattern. Run incremental loads for speed, but periodically do full comparisons to catch deletes and fix any consistency issues.
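As a minimal sketch of that reconciliation pass (the key/timestamp structures here are illustrative, assuming you can pull primary keys and last-updated timestamps from both sides for a bounded window): compare key sets, treat target-only keys as upstream deletes, and re-queue anything missing or stale.

```python
import datetime as dt

def reconcile(source_rows: dict, target_rows: dict) -> dict:
    """Compare a window of source rows against the target copy.

    source_rows / target_rows map primary key -> last-updated timestamp.
    Returns the keys to delete from the target (deleted upstream) and the
    keys to reload (missing or stale in the target).
    """
    source_keys = set(source_rows)
    target_keys = set(target_rows)

    to_delete = target_keys - source_keys  # rows deleted at the source
    to_reload = source_keys - target_keys  # rows the incremental load missed
    # Rows present in both but older in the target are stale updates
    to_reload |= {k for k in source_keys & target_keys
                  if target_rows[k] < source_rows[k]}

    return {'delete': to_delete, 'reload': to_reload}

# Example: order 2 was deleted upstream, order 3 was missed, order 1 is stale
source = {1: dt.datetime(2024, 1, 15, 14, 35), 3: dt.datetime(2024, 1, 15, 14, 40)}
target = {1: dt.datetime(2024, 1, 15, 14, 30), 2: dt.datetime(2024, 1, 15, 14, 0)}
print(reconcile(source, target))
```

Run it over a trailing window (say, the last 48 hours) on a schedule; anything older is assumed settled.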
When timestamp-based loading isn't precise enough, Change Data Capture (CDC) gives you event-level accuracy. Instead of polling for changes, CDC streams every insert, update, and delete as it happens. But with great power comes great complexity.
Modern CDC tools like Debezium, AWS DMS, or cloud-native solutions like BigQuery Change Streams provide the event stream, but you still need to handle the events correctly:
from dataclasses import dataclass
from typing import Dict, Any, Optional
import json

@dataclass
class CDCEvent:
    table_name: str
    operation: str                    # 'INSERT', 'UPDATE', 'DELETE'
    primary_key: Dict[str, Any]
    before: Optional[Dict[str, Any]]  # Pre-change values
    after: Optional[Dict[str, Any]]   # Post-change values
    transaction_id: str
    lsn: int                          # Log Sequence Number for ordering
    timestamp: dt.datetime

class CDCProcessor:
    def __init__(self):
        self.processed_lsn = self.load_checkpoint()

    def process_event(self, event: CDCEvent) -> Optional[Dict[str, Any]]:
        """
        Convert CDC event into target table format.
        Returns None if event should be skipped.
        """
        # Ensure we don't process events out of order
        if event.lsn <= self.processed_lsn:
            print(f"Skipping already processed event: {event.lsn}")
            return None

        if event.operation == 'INSERT':
            record = self._handle_insert(event)
        elif event.operation == 'UPDATE':
            record = self._handle_update(event)
        elif event.operation == 'DELETE':
            record = self._handle_delete(event)
        else:
            raise ValueError(f"Unknown operation: {event.operation}")

        # Advance the checkpoint so replays of this event are skipped
        self.processed_lsn = event.lsn
        return record

    def _handle_insert(self, event: CDCEvent) -> Dict[str, Any]:
        record = event.after.copy()
        record['_cdc_operation'] = 'INSERT'
        record['_cdc_timestamp'] = event.timestamp
        record['_cdc_lsn'] = event.lsn
        return record

    def _handle_update(self, event: CDCEvent) -> Dict[str, Any]:
        # For updates, we typically want the final state
        record = event.after.copy()
        record['_cdc_operation'] = 'UPDATE'
        record['_cdc_timestamp'] = event.timestamp
        record['_cdc_lsn'] = event.lsn

        # Optionally track what changed
        changed_fields = [
            key for key in event.after
            if event.before.get(key) != event.after.get(key)
        ]
        record['_cdc_changed_fields'] = changed_fields
        return record

    def _handle_delete(self, event: CDCEvent) -> Dict[str, Any]:
        # For deletes, we only have the before state
        record = event.before.copy()
        record['_cdc_operation'] = 'DELETE'
        record['_cdc_timestamp'] = event.timestamp
        record['_cdc_lsn'] = event.lsn
        return record
CDC streams give you raw database events, but databases are complex. A single business transaction might generate dozens of CDC events across multiple tables. These events must be processed in the correct order to maintain consistency.
from collections import defaultdict
from typing import List

class TransactionAwareProcessor:
    def __init__(self):
        self.pending_transactions = defaultdict(list)
        self.committed_transactions = set()

    def process_event_stream(self, events: List[CDCEvent]):
        """
        Process a batch of CDC events, handling transaction boundaries.
        """
        for event in sorted(events, key=lambda e: e.lsn):
            if event.operation == 'BEGIN':
                # Transaction start marker
                self.pending_transactions[event.transaction_id] = []
            elif event.operation == 'COMMIT':
                # Process all events in this transaction
                tx_events = self.pending_transactions.pop(event.transaction_id)
                self._process_transaction(tx_events)
                self.committed_transactions.add(event.transaction_id)
            else:
                # Regular data change event
                self.pending_transactions[event.transaction_id].append(event)

    def _process_transaction(self, events: List[CDCEvent]):
        """
        Process all events in a transaction as an atomic unit.
        """
        # Group events by table to handle foreign key dependencies
        events_by_table = defaultdict(list)
        for event in events:
            events_by_table[event.table_name].append(event)

        # Process in dependency order (customers before orders)
        table_order = ['customers', 'orders', 'order_items']
        for table_name in table_order:
            if table_name in events_by_table:
                for event in events_by_table[table_name]:
                    processed_record = self.process_event(event)
                    if processed_record:
                        self._write_to_target(table_name, processed_record)
CDC streams can produce duplicates due to retries, network issues, or connector restarts. Your processing logic must be idempotent:
class IdempotentCDCProcessor:
    def __init__(self, target_db):
        self.target_db = target_db

    def upsert_record(self, table_name: str, record: Dict[str, Any]):
        """
        Idempotent upsert that handles duplicates gracefully.
        """
        # Extract primary key for conflict resolution
        pk_fields = self._get_primary_key_fields(table_name)
        pk_values = {field: record[field] for field in pk_fields}

        if record['_cdc_operation'] == 'DELETE':
            # For deletes, we might receive the same delete event multiple times
            query = f"""
                DELETE FROM {table_name}
                WHERE {' AND '.join(f'{k} = %({k})s' for k in pk_values.keys())}
                  AND (_cdc_lsn < %(lsn)s OR _cdc_lsn IS NULL)
            """
            params = {**pk_values, 'lsn': record['_cdc_lsn']}
        else:
            # For inserts/updates, use LSN to ensure we don't regress
            fields = [k for k in record.keys() if not k.startswith('_pk_')]
            query = f"""
                INSERT INTO {table_name} ({', '.join(fields)})
                VALUES ({', '.join(f'%({f})s' for f in fields)})
                ON CONFLICT ({', '.join(pk_fields)})
                DO UPDATE SET
                    {', '.join(f'{f} = EXCLUDED.{f}' for f in fields if f not in pk_fields)},
                    _cdc_lsn = EXCLUDED._cdc_lsn,
                    _cdc_timestamp = EXCLUDED._cdc_timestamp
                WHERE EXCLUDED._cdc_lsn > {table_name}._cdc_lsn
            """
            params = record

        return self.target_db.execute(query, params)
Watermarks represent a fundamentally different approach to incremental processing. Instead of asking "what changed since timestamp X?", watermarks ask "what can we safely say is complete up to timestamp Y?" This shift in thinking is crucial for handling late-arriving data in distributed systems.
A watermark is a promise: "No more events will arrive with timestamps earlier than this value." This promise lets you trigger computations, close time windows, and emit final results. But the key insight is that watermarks trade latency for completeness.
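To make that trade-off concrete, here is a minimal event-time watermark: the maximum observed event time minus a fixed allowed lateness (a common heuristic; the `WatermarkManager` below derives its watermark from processing time instead). The names and the 10-minute tolerance are illustrative.

```python
import datetime as dt

def event_time_watermark(seen_event_times, allowed_lateness: dt.timedelta):
    """Watermark = max observed event time minus the allowed lateness.

    Everything at or before the watermark is promised complete; an event
    with an older timestamp arriving afterwards counts as late.
    """
    if not seen_event_times:
        return None
    return max(seen_event_times) - allowed_lateness

events = [
    dt.datetime(2024, 1, 15, 14, 30),
    dt.datetime(2024, 1, 15, 14, 42),
    dt.datetime(2024, 1, 15, 14, 37),  # out of order, but within tolerance
]
wm = event_time_watermark(events, dt.timedelta(minutes=10))
print(wm)  # 2024-01-15 14:32:00
```

A larger `allowed_lateness` catches more stragglers but delays every window close by that much: that is the latency-for-completeness trade in one line.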
from dataclasses import dataclass
import heapq
import itertools
from typing import Dict, List, Optional

@dataclass
class DataEvent:
    event_time: dt.datetime       # When the event actually occurred
    processing_time: dt.datetime  # When we received it
    partition: str
    data: Dict[str, Any]

class WatermarkManager:
    def __init__(self, max_out_of_order_minutes: int = 10):
        self.max_out_of_order = dt.timedelta(minutes=max_out_of_order_minutes)
        self.partition_watermarks = {}
        self.pending_events = []  # Min-heap ordered by event_time
        # Tie-breaker so equal timestamps never force comparing DataEvents
        self._counter = itertools.count()

    def process_event(self, event: DataEvent) -> Optional[List[DataEvent]]:
        """
        Process an incoming event and return any events that are now
        safe to process based on updated watermarks.
        """
        # Update watermark for this partition
        processing_time = dt.datetime.utcnow()
        partition_watermark = processing_time - self.max_out_of_order
        self.partition_watermarks[event.partition] = partition_watermark

        # Add event to pending queue
        heapq.heappush(self.pending_events,
                       (event.event_time, next(self._counter), event))

        # Calculate global watermark (minimum across all partitions)
        if not self.partition_watermarks:
            return []
        global_watermark = min(self.partition_watermarks.values())

        # Release events that are now safe to process
        safe_events = []
        while (self.pending_events and
               self.pending_events[0][0] <= global_watermark):
            _, _, safe_event = heapq.heappop(self.pending_events)
            safe_events.append(safe_event)

        return safe_events if safe_events else None
Watermarks become most powerful when combined with time-windowed operations. Here's how to implement a sliding window aggregation that handles late arrivals gracefully:
from collections import defaultdict, deque
from typing import Dict, List, NamedTuple

class WindowedAggregator:
    def __init__(self, window_minutes: int = 60, slide_minutes: int = 15):
        self.window_size = dt.timedelta(minutes=window_minutes)
        self.slide_interval = dt.timedelta(minutes=slide_minutes)
        # Track active windows: timestamp -> {aggregation_state}
        self.active_windows = {}
        # Track events by window for late arrival handling
        self.window_events = defaultdict(list)

    def add_event(self, event: DataEvent, current_watermark: dt.datetime) -> List[Dict]:
        """
        Add event to appropriate windows and return any completed windows.
        """
        completed_windows = []

        # Determine which windows this event belongs to
        affected_windows = self._get_affected_windows(event.event_time)

        for window_start in affected_windows:
            window_end = window_start + self.window_size

            # Initialize window if needed
            if window_start not in self.active_windows:
                self.active_windows[window_start] = {
                    'start': window_start,
                    'end': window_end,
                    'count': 0,
                    'sum_amount': 0.0,
                    'unique_customers': set()
                }

            # Add event to window
            window = self.active_windows[window_start]
            window['count'] += 1
            window['sum_amount'] += event.data.get('amount', 0)
            window['unique_customers'].add(event.data.get('customer_id'))

            # Track event for potential late arrival adjustments
            self.window_events[window_start].append(event)

            # Check if window is complete (watermark has passed window end)
            if current_watermark >= window_end:
                completed_window = self._finalize_window(window_start)
                if completed_window:
                    completed_windows.append(completed_window)

        return completed_windows

    def _get_affected_windows(self, event_time: dt.datetime) -> List[dt.datetime]:
        """
        Calculate which sliding windows this event belongs to.
        """
        windows = []

        # Find the earliest window start that could still include this event
        earliest_window_start = self._floor_to_slide_boundary(
            event_time - self.window_size + self.slide_interval
        )

        # Generate all windows that include this event
        current_window = earliest_window_start
        while current_window <= event_time:
            if current_window + self.window_size > event_time:
                windows.append(current_window)
            current_window += self.slide_interval

        return windows

    def _floor_to_slide_boundary(self, timestamp: dt.datetime) -> dt.datetime:
        """
        Floor timestamp to slide interval boundary.
        """
        minutes_since_epoch = int(timestamp.timestamp() / 60)
        slide_minutes = int(self.slide_interval.total_seconds() / 60)
        floored_minutes = (minutes_since_epoch // slide_minutes) * slide_minutes
        return dt.datetime.fromtimestamp(floored_minutes * 60)

    def _finalize_window(self, window_start: dt.datetime) -> Optional[Dict]:
        """
        Finalize a window and return the aggregated result.
        """
        if window_start not in self.active_windows:
            return None

        window = self.active_windows.pop(window_start)

        # Clean up event tracking
        if window_start in self.window_events:
            del self.window_events[window_start]

        return {
            'window_start': window['start'].isoformat(),
            'window_end': window['end'].isoformat(),
            'event_count': window['count'],
            'total_amount': window['sum_amount'],
            'unique_customers': len(window['unique_customers']),
            'avg_amount': window['sum_amount'] / max(window['count'], 1)
        }
Real-world systems must handle the inevitable late arrivals. Here are three approaches, each with different trade-offs:
class LateDataStrategy:
    """Base class for late data handling strategies."""
    def handle_late_event(self, event: DataEvent,
                          affected_windows: List[dt.datetime]) -> Dict:
        raise NotImplementedError

class IgnoreLateData(LateDataStrategy):
    """Simply drop events that arrive after window completion."""
    def handle_late_event(self, event: DataEvent,
                          affected_windows: List[dt.datetime]) -> Dict:
        return {
            'action': 'dropped',
            'reason': 'arrived_after_watermark',
            'event': event,
            'affected_windows': len(affected_windows)
        }

class ReprocessWithCorrections(LateDataStrategy):
    """Recompute affected windows and emit corrections."""
    def __init__(self, correction_sink):
        self.correction_sink = correction_sink

    def handle_late_event(self, event: DataEvent,
                          affected_windows: List[dt.datetime]) -> Dict:
        corrections = []
        for window_start in affected_windows:
            # Recompute window with the late event
            corrected_result = self._recompute_window(window_start, event)

            # Emit correction
            correction = {
                'window_start': window_start.isoformat(),
                'correction_type': 'late_arrival',
                'original_timestamp': dt.datetime.utcnow().isoformat(),
                'new_values': corrected_result
            }
            self.correction_sink.emit(correction)
            corrections.append(correction)

        return {
            'action': 'corrected',
            'corrections_emitted': len(corrections),
            'affected_windows': affected_windows
        }

class SideOutputLateData(LateDataStrategy):
    """Send late data to a separate stream for manual handling."""
    def __init__(self, side_output_sink):
        self.side_output_sink = side_output_sink

    def handle_late_event(self, event: DataEvent,
                          affected_windows: List[dt.datetime]) -> Dict:
        # Send to side output with metadata
        side_output_record = {
            'event': event.__dict__,
            'lateness_minutes': self._calculate_lateness(event),
            'affected_windows': [w.isoformat() for w in affected_windows],
            'side_output_timestamp': dt.datetime.utcnow().isoformat()
        }
        self.side_output_sink.emit(side_output_record)

        return {
            'action': 'side_output',
            'sink': 'late_data_queue',
            'affected_windows': len(affected_windows)
        }
Let's put these concepts together by building a realistic order analytics pipeline that handles all three patterns. You'll process a stream of e-commerce orders with the following requirements:
First, let's create our source data structure:
-- Source: Orders table with CDC enabled
CREATE TABLE source_orders (
    order_id BIGINT PRIMARY KEY,
    customer_id BIGINT NOT NULL,
    order_amount DECIMAL(10,2) NOT NULL,
    order_status VARCHAR(20) NOT NULL,
    created_at TIMESTAMP NOT NULL,
    updated_at TIMESTAMP NOT NULL,
    deleted_at TIMESTAMP NULL
);

-- Target: Analytics-ready orders table
CREATE TABLE analytics_orders (
    order_id BIGINT PRIMARY KEY,
    customer_id BIGINT NOT NULL,
    order_amount DECIMAL(10,2) NOT NULL,
    order_status VARCHAR(20) NOT NULL,
    created_at TIMESTAMP NOT NULL,
    updated_at TIMESTAMP NOT NULL,
    -- CDC metadata
    _cdc_operation VARCHAR(10),
    _cdc_timestamp TIMESTAMP,
    _cdc_lsn BIGINT,
    -- Processing metadata
    _processing_method VARCHAR(20),  -- 'cdc', 'timestamp', 'reconciliation'
    _processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Sliding window aggregations
CREATE TABLE hourly_order_metrics (
    window_start TIMESTAMP,
    window_end TIMESTAMP,
    total_orders BIGINT,
    total_amount DECIMAL(12,2),
    unique_customers BIGINT,
    avg_order_amount DECIMAL(10,2),
    -- Watermark metadata
    _watermark_timestamp TIMESTAMP,
    _is_final BOOLEAN DEFAULT FALSE,
    _correction_count INT DEFAULT 0,
    PRIMARY KEY (window_start)
);
import asyncio
import logging
from typing import Dict, List, Optional, Union
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta

class UnifiedOrderPipeline:
    def __init__(self, db_connection, kafka_consumer):
        self.db = db_connection
        self.kafka = kafka_consumer

        # Initialize components
        self.cdc_processor = CDCProcessor()
        self.timestamp_loader = TimestampBasedLoader(lag_seconds=300)
        self.watermark_manager = WatermarkManager(max_out_of_order_minutes=15)
        self.window_aggregator = WindowedAggregator(
            window_minutes=60,
            slide_minutes=15
        )

        # State management
        self.last_timestamp_checkpoint = self._load_timestamp_checkpoint()
        self.last_cdc_offset = self._load_cdc_checkpoint()

    async def run_pipeline(self):
        """
        Main pipeline orchestration - runs all patterns concurrently.
        """
        try:
            await asyncio.gather(
                self._run_cdc_stream(),
                self._run_timestamp_reconciliation(),
                self._run_watermark_processing(),
                return_exceptions=True
            )
        except Exception as e:
            logging.error(f"Pipeline error: {e}")
            raise

    async def _run_cdc_stream(self):
        """
        Process real-time CDC events from Kafka.
        """
        async for message in self.kafka.consume('orders_cdc'):
            try:
                # Parse CDC event
                cdc_event = self._parse_cdc_message(message)

                # Process through CDC handler
                processed_record = self.cdc_processor.process_event(cdc_event)

                if processed_record:
                    # Write to analytics table
                    processed_record['_processing_method'] = 'cdc'
                    await self._upsert_order(processed_record)

                    # Forward to watermark processing for aggregations
                    if processed_record['_cdc_operation'] in ['INSERT', 'UPDATE']:
                        data_event = DataEvent(
                            event_time=processed_record['created_at'],
                            processing_time=datetime.utcnow(),
                            partition=str(processed_record['customer_id'] % 10),
                            data=processed_record
                        )
                        await self._process_for_aggregation(data_event)

                # Update checkpoint
                self._save_cdc_checkpoint(message.offset)

            except Exception as e:
                logging.error(f"CDC processing error: {e}")
                # Implement your error handling strategy here
                continue

    async def _run_timestamp_reconciliation(self):
        """
        Run periodic reconciliation to catch any missed CDC events.
        """
        while True:
            try:
                # Wait for next reconciliation interval (every hour)
                await asyncio.sleep(3600)

                # Get next window to process
                start_time, end_time = self.timestamp_loader.get_next_window(
                    self.last_timestamp_checkpoint
                )

                logging.info(f"Starting reconciliation: {start_time} to {end_time}")

                # Load data using timestamp approach
                reconciliation_records = self.timestamp_loader.load_incremental(
                    start_time, end_time
                )

                # Process each record
                reconciliation_count = 0
                for record in reconciliation_records:
                    # Check if we already have this record from CDC
                    existing = await self._get_existing_order(record['order_id'])

                    if not existing or existing['_cdc_lsn'] < record.get('_version', 0):
                        # Either missing or we have an older version
                        record['_processing_method'] = 'reconciliation'
                        await self._upsert_order(record)
                        reconciliation_count += 1

                # Update checkpoint
                self.last_timestamp_checkpoint = end_time
                self._save_timestamp_checkpoint(end_time)

                logging.info(f"Reconciliation complete: {reconciliation_count} records processed")

            except Exception as e:
                logging.error(f"Reconciliation error: {e}")
                continue

    async def _run_watermark_processing(self):
        """
        Process watermark advancement and window completions.
        """
        while True:
            try:
                # Check for watermark advancement every 30 seconds
                await asyncio.sleep(30)

                current_time = datetime.utcnow()

                # Advance watermarks for all partitions
                for partition in range(10):  # Assuming 10 partitions
                    partition_watermark = current_time - timedelta(minutes=15)
                    self.watermark_manager.partition_watermarks[str(partition)] = partition_watermark

                # Get global watermark
                if self.watermark_manager.partition_watermarks:
                    global_watermark = min(self.watermark_manager.partition_watermarks.values())

                    # Process any completed windows (assumes a get_completed_windows
                    # helper on the aggregator that finalizes every active window
                    # older than the watermark, analogous to _finalize_window)
                    completed_windows = self.window_aggregator.get_completed_windows(
                        global_watermark
                    )

                    for window_result in completed_windows:
                        await self._save_window_result(window_result)

                    if completed_windows:
                        logging.info(f"Completed {len(completed_windows)} windows")

            except Exception as e:
                logging.error(f"Watermark processing error: {e}")
                continue

    async def _process_for_aggregation(self, event: DataEvent):
        """
        Add event to windowed aggregations.
        """
        if self.watermark_manager.partition_watermarks:
            current_watermark = min(self.watermark_manager.partition_watermarks.values())
        else:
            current_watermark = datetime.min

        completed_windows = self.window_aggregator.add_event(event, current_watermark)
        for window_result in completed_windows:
            await self._save_window_result(window_result)

    async def _upsert_order(self, record: Dict):
        """
        Idempotent upsert of order record.
        """
        query = """
            INSERT INTO analytics_orders
                (order_id, customer_id, order_amount, order_status, created_at, updated_at,
                 _cdc_operation, _cdc_timestamp, _cdc_lsn, _processing_method)
            VALUES (%(order_id)s, %(customer_id)s, %(order_amount)s, %(order_status)s,
                    %(created_at)s, %(updated_at)s, %(_cdc_operation)s,
                    %(_cdc_timestamp)s, %(_cdc_lsn)s, %(_processing_method)s)
            ON CONFLICT (order_id) DO UPDATE SET
                customer_id = EXCLUDED.customer_id,
                order_amount = EXCLUDED.order_amount,
                order_status = EXCLUDED.order_status,
                updated_at = EXCLUDED.updated_at,
                _cdc_operation = EXCLUDED._cdc_operation,
                _cdc_timestamp = EXCLUDED._cdc_timestamp,
                _cdc_lsn = EXCLUDED._cdc_lsn,
                _processing_method = EXCLUDED._processing_method,
                _processed_at = CURRENT_TIMESTAMP
            WHERE EXCLUDED._cdc_lsn > analytics_orders._cdc_lsn
               OR analytics_orders._cdc_lsn IS NULL
        """
        await self.db.execute(query, record)

    async def _save_window_result(self, window_result: Dict):
        """
        Save completed window aggregation.
        """
        query = """
            INSERT INTO hourly_order_metrics
                (window_start, window_end, total_orders, total_amount,
                 unique_customers, avg_order_amount, _watermark_timestamp)
            VALUES (%(window_start)s, %(window_end)s, %(event_count)s, %(total_amount)s,
                    %(unique_customers)s, %(avg_amount)s, %(watermark_timestamp)s)
            ON CONFLICT (window_start) DO UPDATE SET
                total_orders = EXCLUDED.total_orders,
                total_amount = EXCLUDED.total_amount,
                unique_customers = EXCLUDED.unique_customers,
                avg_order_amount = EXCLUDED.avg_order_amount,
                _watermark_timestamp = EXCLUDED._watermark_timestamp,
                _correction_count = hourly_order_metrics._correction_count + 1,
                _is_final = FALSE
        """
        params = window_result.copy()
        params['watermark_timestamp'] = datetime.utcnow()
        await self.db.execute(query, params)
Create a comprehensive test that validates all three patterns work together:
import pytest
import asyncio
from unittest.mock import Mock, AsyncMock

class TestUnifiedPipeline:
    @pytest.fixture
    async def pipeline(self):
        db_mock = AsyncMock()
        kafka_mock = AsyncMock()
        return UnifiedOrderPipeline(db_mock, kafka_mock)

    async def test_cdc_to_aggregation_flow(self, pipeline):
        """
        Test that CDC events flow through to aggregations correctly.
        """
        # Simulate CDC event
        cdc_message = Mock()
        cdc_message.value = json.dumps({
            'op': 'c',  # Create
            'after': {
                'order_id': 12345,
                'customer_id': 789,
                'order_amount': 99.99,
                'order_status': 'pending',
                'created_at': '2024-01-15T14:30:00Z',
                'updated_at': '2024-01-15T14:30:00Z'
            },
            'ts_ms': int(datetime.utcnow().timestamp() * 1000),
            'lsn': 1001
        })
        cdc_message.offset = 42

        # Process the message
        await pipeline._run_cdc_stream()

        # Verify order was upserted
        pipeline.db.execute.assert_called()
        call_args = pipeline.db.execute.call_args
        assert 'analytics_orders' in call_args[0][0]
        assert call_args[0][1]['order_id'] == 12345
        assert call_args[0][1]['_processing_method'] == 'cdc'

    async def test_reconciliation_catches_missed_events(self, pipeline):
        """
        Test that timestamp-based reconciliation catches missed CDC events.
        """
        # Mock existing order (from CDC)
        pipeline._get_existing_order = AsyncMock(return_value={
            'order_id': 12345,
            '_cdc_lsn': 1000,
            '_processing_method': 'cdc'
        })

        # Mock timestamp loader returning newer version
        pipeline.timestamp_loader.load_incremental = Mock(return_value=[{
            'order_id': 12345,
            'customer_id': 789,
            'order_amount': 149.99,   # Updated amount
            'order_status': 'completed',
            '_version': 1002          # Newer than CDC version
        }])

        # Run reconciliation
        await pipeline._run_timestamp_reconciliation()

        # Verify the newer version was processed
        pipeline.db.execute.assert_called()
        call_args = pipeline.db.execute.call_args
        assert call_args[0][1]['_processing_method'] == 'reconciliation'
        assert call_args[0][1]['order_amount'] == 149.99

    async def test_watermark_window_completion(self, pipeline):
        """
        Test that watermark advancement triggers window completion.
        """
        # Add events to aggregator
        test_event = DataEvent(
            event_time=datetime(2024, 1, 15, 14, 30),
            processing_time=datetime.utcnow(),
            partition='0',
            data={'order_id': 12345, 'amount': 99.99, 'customer_id': 789}
        )

        # Process event
        await pipeline._process_for_aggregation(test_event)

        # Advance watermark to complete the window
        future_watermark = datetime(2024, 1, 15, 16, 0)  # 2 hours later
        pipeline.watermark_manager.partition_watermarks['0'] = future_watermark

        # Trigger watermark processing
        await pipeline._run_watermark_processing()

        # Verify window result was saved
        assert pipeline.db.execute.called
        # Check that a window result was written to hourly_order_metrics
The most dangerous mistake with incremental loading is assuming it's working when it's actually missing data. Unlike batch processing failures that crash loudly, incremental loading failures are often silent – your pipeline keeps running, but you're gradually losing data.
Symptoms:
Detection strategy:
class DataCompletenessMonitor:
    def __init__(self, source_db, target_db):
        self.source_db = source_db
        self.target_db = target_db

    async def check_completeness(self, hours_back: int = 24):
        """
        Compare source and target counts to detect missing data.
        """
        check_time = datetime.utcnow() - timedelta(hours=hours_back)

        # Count in source system
        source_query = """
            SELECT COUNT(*), MAX(updated_at), MIN(updated_at)
            FROM customer_orders
            WHERE updated_at >= %s
        """
        source_result = await self.source_db.fetchone(source_query, (check_time,))

        # Count in target system
        target_query = """
            SELECT COUNT(*), MAX(updated_at), MIN(updated_at)
            FROM analytics_orders
            WHERE updated_at >= %s AND _processing_method != 'deleted'
        """
        target_result = await self.target_db.fetchone(target_query, (check_time,))

        completeness_ratio = target_result[0] / max(source_result[0], 1)

        if completeness_ratio < 0.95:  # Alert if less than 95% complete
            return {
                'status': 'INCOMPLETE',
                'source_count': source_result[0],
                'target_count': target_result[0],
                'completeness_ratio': completeness_ratio,
                'recommended_action': 'run_reconciliation'
            }

        return {'status': 'OK', 'completeness_ratio': completeness_ratio}
Problem: Your application server is 2 minutes fast, your database server is 30 seconds slow, and your processing server is accurate. Chaos ensues.
Real-world example:
# This actually happened in production
# Application writes: updated_at = '2024-01-15 14:30:00' (server 2 min fast)
# Database stores: updated_at = '2024-01-15 14:29:30' (DB 30 sec slow)
# Pipeline queries: WHERE updated_at > '2024-01-15 14:29:45' (accurate time)
# Result: Record exists but query misses it
Solution: Always use database-generated timestamps for incremental loading:
-- Instead of this:
INSERT INTO orders (id, amount, updated_at)
VALUES (1, 100.00, '2024-01-15 14:30:00');
-- Do this:
INSERT INTO orders (id, amount, updated_at)
VALUES (1, 100.00, CURRENT_TIMESTAMP);
CDC systems can fall behind during high load periods. Monitor replication lag and implement backpressure:
from datetime import datetime, timedelta
from typing import Any, Dict

class CDCLagMonitor:
    def __init__(self, source_db, cdc_connector, alert_threshold_minutes=5):
        self.source_db = source_db
        self.cdc_connector = cdc_connector
        self.alert_threshold = timedelta(minutes=alert_threshold_minutes)

    async def check_replication_lag(self) -> Dict[str, Any]:
        """
        Check how far behind CDC replication is from the source database.
        """
        # Get latest LSN from source database
        source_lsn_query = "SELECT pg_current_wal_lsn();"
        source_lsn = await self.source_db.fetchval(source_lsn_query)

        # Get latest processed LSN from CDC connector
        connector_status = await self.cdc_connector.get_status()
        processed_lsn = connector_status['last_processed_lsn']
        processed_timestamp = connector_status['last_processed_timestamp']

        # Calculate lag
        current_time = datetime.utcnow()
        time_lag = current_time - processed_timestamp

        lag_status = {
            'source_lsn': source_lsn,
            'processed_lsn': processed_lsn,
            'time_lag_seconds': time_lag.total_seconds(),
            'is_healthy': time_lag < self.alert_threshold
        }

        if not lag_status['is_healthy']:
            await self._trigger_backpressure()

        return lag_status

    async def _trigger_backpressure(self):
        """
        Implement backpressure when CDC lag is too high.
        """
        # Reduce downstream processing rate
        # Pause non-critical jobs
        # Alert operations team
        pass
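The `_trigger_backpressure` hook above is deliberately left unimplemented. One hedged sketch of what it might delegate to is a controller that halves the downstream batch size while lag is unhealthy and ramps back up gradually once it recovers; `BackpressureController` and its parameters are illustrative assumptions, not part of any real CDC connector API:

```python
class BackpressureController:
    """Scales batch size down while CDC lag is unhealthy, back up on recovery."""

    def __init__(self, base_batch_size: int = 10_000, min_batch_size: int = 500):
        self.base_batch_size = base_batch_size
        self.min_batch_size = min_batch_size
        self.current_batch_size = base_batch_size

    def on_lag_report(self, lag_seconds: float, threshold_seconds: float) -> int:
        if lag_seconds > threshold_seconds:
            # Halve throughput on each unhealthy report, but never stall fully.
            self.current_batch_size = max(
                self.min_batch_size, self.current_batch_size // 2
            )
        else:
            # Recover gradually (1.5x per healthy report) to avoid oscillation.
            self.current_batch_size = min(
                self.base_batch_size, int(self.current_batch_size * 1.5)
            )
        return self.current_batch_size
```

Multiplicative decrease with slower additive-style recovery is the same intuition TCP congestion control uses: back off fast when the system is struggling, and probe capacity cautiously on the way back up.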
When watermarks aren't advancing properly, events pile up in memory and windows never complete. Here's how to debug:
from datetime import datetime, timedelta
from typing import Any, Dict

class WatermarkDebugger:
    def __init__(self, watermark_manager):
        self.watermark_manager = watermark_manager

    def diagnose_stalled_watermarks(self) -> Dict[str, Any]:
        """
        Identify why watermarks aren't advancing.
        """
        diagnosis = {
            'current_time': datetime.utcnow(),
            'partition_watermarks': dict(self.watermark_manager.partition_watermarks),
            'pending_events': len(self.watermark_manager.pending_events),
            'issues': []
        }

        # Check for inactive partitions
        current_time = datetime.utcnow()
        for partition, watermark in diagnosis['partition_watermarks'].items():
            staleness = current_time - watermark
            if staleness > timedelta(minutes=10):
                diagnosis['issues'].append({
                    'type': 'stale_partition',
                    'partition': partition,
                    'staleness_minutes': staleness.total_seconds() / 60,
                    'suggested_action': 'check_partition_health'
                })

        # Check for memory pressure from pending events
        if diagnosis['pending_events'] > 100000:
            diagnosis['issues'].append({
                'type': 'memory_pressure',
                'pending_count': diagnosis['pending_events'],
                'suggested_action': 'increase_max_out_of_order_time'
            })

        # Check for watermark spread (inconsistent progress)
        if diagnosis['partition_watermarks']:
            watermarks = list(diagnosis['partition_watermarks'].values())
            spread = max(watermarks) - min(watermarks)
            if spread > timedelta(hours=1):
                diagnosis['issues'].append({
                    'type': 'watermark_spread',
                    'spread_minutes': spread.total_seconds() / 60,
                    'suggested_action': 'investigate_slow_partitions'
                })

        return diagnosis
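To see the stale-partition rule in isolation, here's a self-contained version of that single check; the partition names and timestamps are made up for illustration:

```python
from datetime import datetime, timedelta

def find_stale_partitions(partition_watermarks, now,
                          max_staleness=timedelta(minutes=10)):
    """Return partitions whose watermark hasn't advanced within max_staleness."""
    return [
        partition
        for partition, watermark in partition_watermarks.items()
        if now - watermark > max_staleness
    ]

# Hypothetical snapshot: one healthy partition, one 30 minutes behind.
now = datetime(2024, 1, 15, 15, 0)
watermarks = {
    "orders-0": datetime(2024, 1, 15, 14, 58),  # 2 min behind: healthy
    "orders-1": datetime(2024, 1, 15, 14, 30),  # 30 min behind: stalled
}
```

A single stalled partition is enough to hold back the global watermark, which is why this check runs per partition rather than on the minimum alone.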
Incremental loading isn't just about choosing between timestamps, CDC, or watermarks – it's about understanding the fundamental trade-offs between consistency, latency, and operational complexity. Each pattern solves different problems and fails in different ways.
Timestamp-based loading is simple and works well for systems where you control the data model and can accept some lag. Use it when you need periodic reconciliation or batch processing with reasonable freshness requirements. The key insight is building in lag windows and overlap detection to handle the inevitable clock skew and network delays.
Change Data Capture gives you event-level precision and near real-time processing, but at the cost of operational complexity. CDC excels when you need to maintain strict consistency across systems or when downstream systems need to react immediately to changes. The critical success factor is handling transaction boundaries and implementing robust duplicate detection.
Watermarks provide a sophisticated framework for handling late-arriving data in distributed systems, but they require careful tuning of lateness bounds and completeness requirements. Use watermarks when you're doing complex time-windowed analytics or need to balance low latency with guaranteed completeness.
The most robust systems combine all three approaches: CDC for real-time processing, timestamp-based reconciliation to catch gaps, and watermarks for time-windowed analytics. But remember that complexity is a cost, not a feature – add it only when the business requirements justify the operational overhead.
Next steps to deepen your expertise:
The principles you've learned here apply beyond data engineering – they're fundamental patterns for building reliable distributed systems that process changing data at scale.