
You're standing in the conference room, whiteboard covered in boxes and arrows, as your CEO asks the question that makes every data engineer's pulse quicken: "How do we build a data platform that can handle everything we throw at it for the next five years?" The marketing team wants real-time customer journey analytics. Finance needs daily P&L reports with zero downtime. Product wants to A/B test features with millisecond precision. Engineering wants to ship ML models that actually work in production.
The modern data stack isn't just about choosing the right tools—it's about architecting a system that can evolve from handling gigabytes to petabytes while maintaining reliability, governance, and cost efficiency. This lesson will guide you through building a production-ready data platform from the ground up, making architectural decisions that matter, and implementing patterns that scale.
By the end of this deep dive, you'll have hands-on experience with every layer of a modern data stack and understand the subtle engineering decisions that separate functional systems from exceptional ones. This is an expert-level lesson: it assumes solid, hands-on experience with the kinds of tools it builds on, including cloud infrastructure, SQL, and Python.
Before writing a single line of code, we need to make foundational architectural decisions. The modern data stack is typically organized into these core layers:
Ingestion Layer: Handles data collection from various sources with proper schema evolution and error recovery. We'll use Apache Kafka for streaming data and Airbyte for batch extraction, chosen for their robust connector ecosystems and operational maturity.
Storage Layer: Raw data lands in object storage (S3), with processed data in a cloud data warehouse. We're selecting Snowflake for its separation of compute and storage, automatic scaling, and mature ecosystem integration.
Transformation Layer: dbt handles all data transformations, chosen for its software engineering best practices, testing framework, and deployment capabilities.
Serving Layer: Multiple consumption patterns require different tools. We'll implement Metabase for self-service analytics, Apache Superset for advanced visualization, and direct SQL access for data scientists.
Orchestration Layer: Airflow manages workflow dependencies and monitoring, selected for its flexibility and extensive operator library.
Here's our target architecture with realistic data flows:
# architecture-overview.yaml
ingestion:
  streaming:
    - kafka_cluster: 3_brokers_multi_az
    - topics:
        user_events: 10_partitions
        transaction_events: 20_partitions
        system_metrics: 5_partitions
  batch:
    - airbyte_connectors:
        salesforce: daily_full_refresh
        postgres_replica: hourly_incremental
        stripe_api: hourly_incremental
storage:
  raw_layer: s3://company-data-lake/raw/
  processed_layer: snowflake.analytics_db
transformation:
  tool: dbt_cloud
  models: 200+_tables
  tests: 500+_data_quality_checks
serving:
  dashboards: metabase_cloud
  adhoc_queries: snowflake_worksheets
  ml_features: feature_store_api
The critical architectural decision here is embracing the ELT pattern over ETL. Raw data lands in the lake with minimal transformation, then gets processed in the warehouse. This provides maximum flexibility for future use cases and takes advantage of modern compute elasticity.
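To make the ELT split concrete, here's a minimal Python sketch of the "load" half: records are serialized untouched into a date-partitioned raw-layer key, and all transformation is deferred to SQL in the warehouse. The `raw_landing_key` helper and its path convention are illustrative assumptions, not part of any library.

```python
from datetime import datetime, timezone
import json


def raw_landing_key(source: str, table: str, extracted_at: datetime) -> str:
    """Build a date-partitioned raw-layer object key (path convention is an assumption)."""
    d = extracted_at.astimezone(timezone.utc)
    return (
        f"raw/{source}/{table}/"
        f"year={d:%Y}/month={d:%m}/day={d:%d}/"
        f"{table}_{d:%Y%m%dT%H%M%S}.json"
    )


def land_raw_records(records: list) -> bytes:
    """ELT step 1: serialize records as-is (newline-delimited JSON), no transformation."""
    return ("\n".join(json.dumps(r, sort_keys=True) for r in records)).encode("utf-8")


key = raw_landing_key("stripe_api", "charges",
                      datetime(2024, 3, 5, 12, 30, tzinfo=timezone.utc))
body = land_raw_records([{"id": "ch_1", "amount": 1200}])
# Transformation then happens inside the warehouse with SQL (dbt), not in this pipeline.
```

Because the raw payload is preserved byte-for-byte, a new downstream use case never requires re-extracting from the source system.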
Modern data platforms require infrastructure as code for repeatability and disaster recovery. We'll build our foundation using Terraform with proper state management and secrets handling.
First, let's establish our Terraform structure with environment separation:
# terraform/environments/prod/main.tf
terraform {
  required_version = ">= 1.0"

  backend "s3" {
    bucket         = "company-terraform-state"
    key            = "data-platform/prod/terraform.tfstate"
    region         = "us-west-2"
    encrypt        = true
    dynamodb_table = "terraform-state-lock"
  }

  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
    snowflake = {
      source  = "Snowflake-Labs/snowflake"
      version = "~> 0.87"
    }
  }
}

module "data_platform" {
  source      = "../../modules/data-platform"
  environment = "prod"

  # Kafka configuration
  kafka_instance_type = "kafka.m5.large"
  kafka_broker_count  = 3
  kafka_storage_size  = 1000

  # Snowflake configuration
  snowflake_warehouse_size = "X-LARGE"
  snowflake_auto_suspend   = 60
  snowflake_auto_resume    = true

  # S3 configuration
  data_lake_retention_days = 2555 # 7 years

  tags = {
    Environment = "prod"
    Platform    = "data"
    Owner       = "data-engineering"
  }
}
The core infrastructure module handles the complex networking and security requirements:
# terraform/modules/data-platform/kafka.tf
resource "aws_msk_cluster" "data_platform" {
  cluster_name           = "${var.environment}-data-kafka"
  kafka_version          = "2.8.1"
  number_of_broker_nodes = var.kafka_broker_count

  broker_node_group_info {
    instance_type   = var.kafka_instance_type
    client_subnets  = aws_subnet.private[*].id
    security_groups = [aws_security_group.kafka.id]

    # AWS provider v5 moved broker storage under storage_info
    storage_info {
      ebs_storage_info {
        volume_size = var.kafka_storage_size
      }
    }
  }

  configuration_info {
    arn      = aws_msk_configuration.kafka_config.arn
    revision = aws_msk_configuration.kafka_config.latest_revision
  }

  encryption_info {
    encryption_in_transit {
      client_broker = "TLS"
      in_cluster    = true
    }
    encryption_at_rest_kms_key_id = aws_kms_key.kafka.arn
  }

  logging_info {
    broker_logs {
      cloudwatch_logs {
        enabled   = true
        log_group = aws_cloudwatch_log_group.kafka.name
      }
    }
  }

  tags = var.tags
}

resource "aws_msk_configuration" "kafka_config" {
  kafka_versions = ["2.8.1"]
  name           = "${var.environment}-kafka-config"

  server_properties = <<PROPERTIES
auto.create.topics.enable=false
delete.topic.enable=true
default.replication.factor=3
min.insync.replicas=2
num.partitions=10
log.retention.hours=168
log.segment.bytes=104857600
log.retention.check.interval.ms=300000
compression.type=snappy
PROPERTIES
}
Critical infrastructure considerations include:
Security: All data in transit uses TLS encryption. Data at rest uses customer-managed KMS keys with proper key rotation. Network access follows least-privilege with security groups restricting traffic to necessary ports.
High Availability: Kafka brokers span multiple availability zones. Snowflake provides built-in redundancy. S3 offers 99.999999999% durability.
Monitoring: CloudWatch integration captures infrastructure metrics. Custom alarms trigger on broker failures, unusual traffic patterns, and storage thresholds.
Cost Optimization: Auto-suspend Snowflake warehouses after 60 seconds of inactivity. S3 lifecycle policies move older data to cheaper storage classes.
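As a concrete example of the lifecycle policies mentioned above, here's a sketch that builds an S3 lifecycle configuration as a plain dict. The rule ID, tiering thresholds, and `raw/` prefix are assumptions; in practice you'd apply it with boto3's `put_bucket_lifecycle_configuration` or manage it in Terraform alongside the rest of the platform.

```python
def data_lake_lifecycle_policy(retention_days: int = 2555) -> dict:
    """Tier raw data: Standard -> Standard-IA at 90 days -> Glacier at 365 days,
    then expire at the platform's retention horizon (7 years by default)."""
    return {
        "Rules": [
            {
                "ID": "raw-layer-tiering",
                "Status": "Enabled",
                "Filter": {"Prefix": "raw/"},
                "Transitions": [
                    {"Days": 90, "StorageClass": "STANDARD_IA"},
                    {"Days": 365, "StorageClass": "GLACIER"},
                ],
                "Expiration": {"Days": retention_days},
            }
        ]
    }


policy = data_lake_lifecycle_policy()
# Applied with (requires boto3 and AWS credentials):
# boto3.client("s3").put_bucket_lifecycle_configuration(
#     Bucket="company-data-lake", LifecycleConfiguration=policy)
```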
Streaming data ingestion requires careful consideration of throughput, latency, and fault tolerance. Our Kafka implementation handles multiple data sources with different characteristics.
Let's implement a robust producer for user event data:
# kafka_producers/user_events.py
import json
import logging
import hashlib
from typing import Dict, Any
from datetime import datetime
from kafka import KafkaProducer


class UserEventProducer:
    def __init__(self, bootstrap_servers: str,
                 security_protocol: str = "SSL",
                 compression_type: str = "snappy"):
        self.logger = logging.getLogger(__name__)
        producer_config = {
            'bootstrap_servers': bootstrap_servers,
            'security_protocol': security_protocol,
            'compression_type': compression_type,
            'acks': 'all',  # Wait for all in-sync replicas
            'retries': 10,
            'retry_backoff_ms': 300,
            'batch_size': 16384,
            'linger_ms': 5,  # Small batching delay
            'buffer_memory': 33554432,
            # Keep ordering under retries: kafka-python has no idempotent
            # producer, so more than one in-flight request can reorder messages
            'max_in_flight_requests_per_connection': 1,
            'value_serializer': lambda x: json.dumps(x).encode('utf-8'),
            'key_serializer': lambda x: x.encode('utf-8') if x else None
        }
        self.producer = KafkaProducer(**producer_config)
        self.topic = "user_events"

    def produce_event(self, user_id: str, event_type: str,
                      event_data: Dict[str, Any]) -> bool:
        """
        Produces a user event with proper partitioning and error handling
        """
        try:
            # Create standardized event payload
            event_payload = {
                'user_id': user_id,
                'event_type': event_type,
                'event_data': event_data,
                'timestamp': datetime.utcnow().isoformat(),
                'schema_version': '1.0'
            }
            # Partition by user_id hash for even distribution
            # while maintaining user session ordering
            partition_key = self._get_partition_key(user_id)
            future = self.producer.send(
                topic=self.topic,
                key=partition_key,
                value=event_payload,
                partition=None  # Let Kafka choose based on key
            )
            # Non-blocking send with callbacks
            future.add_callback(self._on_send_success)
            future.add_errback(self._on_send_error)
            return True
        except Exception as e:
            self.logger.error(f"Failed to produce event: {e}")
            return False

    def _get_partition_key(self, user_id: str) -> str:
        """Generate consistent partition key from user_id"""
        return hashlib.md5(user_id.encode()).hexdigest()[:8]

    def _on_send_success(self, record_metadata):
        self.logger.debug(
            f"Message sent to {record_metadata.topic} "
            f"partition {record_metadata.partition} "
            f"offset {record_metadata.offset}"
        )

    def _on_send_error(self, exception):
        self.logger.error(f"Failed to send message: {exception}")
        # Implement dead letter queue logic here

    def flush_and_close(self):
        """Ensure all messages are sent before closing"""
        self.producer.flush(timeout=30)
        self.producer.close(timeout=30)
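To see why keying on a hash of `user_id` preserves per-user ordering, here's a standalone sketch of keyed partitioning. Kafka's default partitioner actually uses murmur2 over the key bytes; this sketch reuses the producer's md5-based `_get_partition_key` scheme purely to stay dependency-free, so the exact partition numbers are illustrative.

```python
import hashlib

NUM_PARTITIONS = 10  # matches the user_events topic configuration above


def partition_for(user_id: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a user_id to a partition: same key -> same partition, every time."""
    key = hashlib.md5(user_id.encode()).hexdigest()[:8]
    return int(key, 16) % num_partitions


# Stability: every event for a given user lands on one partition,
# so per-user event order is preserved within that partition.
assert partition_for("user-42") == partition_for("user-42")

# Distribution: hashing spreads many users roughly evenly across partitions.
counts = [0] * NUM_PARTITIONS
for i in range(10_000):
    counts[partition_for(f"user-{i}")] += 1
```

The trade-off is that repartitioning the topic remaps keys, so partition counts should be sized generously up front.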
For high-volume transaction data, we need a more sophisticated consumer with proper offset management and backpressure handling:
# kafka_consumers/transaction_processor.py
import json
import logging
import time
from typing import Dict, List
from concurrent.futures import ThreadPoolExecutor, as_completed
from kafka import KafkaConsumer
from kafka.errors import CommitFailedError


class TransactionProcessor:
    def __init__(self, bootstrap_servers: str, consumer_group: str = "transaction_processor"):
        self.logger = logging.getLogger(__name__)
        self.consumer_group = consumer_group
        consumer_config = {
            'bootstrap_servers': bootstrap_servers,
            'group_id': consumer_group,
            'security_protocol': 'SSL',
            'auto_offset_reset': 'earliest',
            'enable_auto_commit': False,  # Manual offset management
            'max_poll_records': 500,  # Batch processing
            'session_timeout_ms': 30000,
            'heartbeat_interval_ms': 10000,
            'value_deserializer': lambda x: json.loads(x.decode('utf-8')),
            'consumer_timeout_ms': 10000
        }
        self.consumer = KafkaConsumer('transaction_events', **consumer_config)
        self.executor = ThreadPoolExecutor(max_workers=4)
        self.processing_times = []

    def process_batch(self, messages: List) -> bool:
        """Process a batch of transaction messages with parallelization"""
        start_time = time.time()
        try:
            # Group messages by partition for ordered processing
            partition_groups = {}
            for message in messages:
                partition = message.partition
                if partition not in partition_groups:
                    partition_groups[partition] = []
                partition_groups[partition].append(message)

            # Process partitions in parallel
            futures = []
            for partition, partition_messages in partition_groups.items():
                future = self.executor.submit(
                    self._process_partition_messages,
                    partition_messages
                )
                futures.append(future)

            # Wait for all partitions to complete
            success_count = 0
            for future in as_completed(futures):
                if future.result():
                    success_count += 1

            processing_time = time.time() - start_time
            self.processing_times.append(processing_time)

            # Log performance metrics
            if len(self.processing_times) >= 100:
                avg_time = sum(self.processing_times) / len(self.processing_times)
                self.logger.info(f"Average batch processing time: {avg_time:.2f}s")
                self.processing_times = []

            return success_count == len(partition_groups)
        except Exception as e:
            self.logger.error(f"Batch processing failed: {e}")
            return False

    def _process_partition_messages(self, messages: List) -> bool:
        """Process messages from a single partition in order"""
        try:
            for message in messages:
                transaction_data = message.value

                # Validate message schema
                if not self._validate_transaction_schema(transaction_data):
                    self.logger.error(f"Invalid schema: {message.offset}")
                    continue

                # Process individual transaction
                success = self._process_transaction(transaction_data)
                if not success:
                    self.logger.error(f"Processing failed: {message.offset}")
                    return False
            return True
        except Exception as e:
            self.logger.error(f"Partition processing failed: {e}")
            return False

    def _validate_transaction_schema(self, data: Dict) -> bool:
        """Validate transaction data against expected schema"""
        required_fields = ['transaction_id', 'user_id', 'amount', 'currency', 'timestamp']
        return all(field in data for field in required_fields)

    def _process_transaction(self, transaction: Dict) -> bool:
        """Process individual transaction with fraud detection and enrichment"""
        try:
            # Fraud detection logic
            if self._detect_fraud(transaction):
                self.logger.warning(f"Fraudulent transaction: {transaction['transaction_id']}")
                # Send to fraud queue for manual review
                return True  # Don't fail batch for fraud detection

            # Enrich transaction data
            enriched_transaction = self._enrich_transaction(transaction)

            # Write to data lake
            success = self._write_to_data_lake(enriched_transaction)
            return success
        except Exception as e:
            self.logger.error(f"Transaction processing error: {e}")
            return False

    def run(self):
        """Main consumer loop with proper error handling"""
        self.logger.info("Starting transaction processor")
        try:
            while True:
                message_batch = self.consumer.poll(timeout_ms=10000)
                if not message_batch:
                    continue

                # Flatten messages from all partitions
                all_messages = []
                for partition_messages in message_batch.values():
                    all_messages.extend(partition_messages)

                if not all_messages:
                    continue

                self.logger.info(f"Processing batch of {len(all_messages)} messages")
                success = self.process_batch(all_messages)

                if success:
                    try:
                        self.consumer.commit()
                        self.logger.debug("Offset committed successfully")
                    except CommitFailedError as e:
                        self.logger.error(f"Offset commit failed: {e}")
                        # Don't break - consumer will reprocess messages
                else:
                    self.logger.error("Batch processing failed, not committing offsets")
        except KeyboardInterrupt:
            self.logger.info("Shutdown signal received")
        except Exception as e:
            self.logger.error(f"Consumer error: {e}")
        finally:
            self.consumer.close()
            self.executor.shutdown(wait=True)
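Both the producer's error callback and the consumer's failed-batch path above leave dead letter queue handling as an exercise. A minimal sketch of the DLQ payload follows; the field names and the `transaction_events.dlq` topic convention are assumptions, but the principle is fixed: capture enough source context (topic, partition, offset) to replay the message after the defect is fixed.

```python
import json
from datetime import datetime, timezone


def build_dlq_record(original: dict, topic: str, partition: int,
                     offset: int, error: str) -> dict:
    """Wrap a failed message with enough context to triage and replay it later."""
    return {
        "original_value": original,
        "source_topic": topic,
        "source_partition": partition,
        "source_offset": offset,
        "error": error,
        "failed_at": datetime.now(timezone.utc).isoformat(),
    }


dlq = build_dlq_record(
    original={"transaction_id": "t-1"},
    topic="transaction_events",
    partition=3,
    offset=1024,
    error="schema validation failed: missing 'amount'",
)
# In production this payload would be produced to a dedicated topic,
# e.g. 'transaction_events.dlq', and the main batch would still commit.
```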
While Kafka handles streaming data, batch sources require different tooling. Airbyte provides a robust framework for extract-and-load operations with proper error handling and incremental sync capabilities.
Let's configure Airbyte connectors for our key data sources:
# airbyte_configs/salesforce_connection.yaml
apiVersion: airbyte.com/v1alpha1
kind: SourceDefinition
metadata:
  name: salesforce-prod
spec:
  sourceDefinitionId: "b117307c-14b6-41aa-9422-947e34922962"
  dockerRepository: "airbyte/source-salesforce"
  dockerImageTag: "2.0.12"
  documentationUrl: "https://docs.airbyte.com/integrations/sources/salesforce"
  connectionSpecification:
    type: object
    properties:
      client_id:
        type: string
        description: "Salesforce client ID"
        airbyte_secret: true
      client_secret:
        type: string
        description: "Salesforce client secret"
        airbyte_secret: true
      refresh_token:
        type: string
        description: "Salesforce refresh token"
        airbyte_secret: true
      domain:
        type: string
        description: "Salesforce domain"
        default: "login"
      is_sandbox:
        type: boolean
        description: "Whether to use Salesforce sandbox"
        default: false
      start_date:
        type: string
        description: "Start date for incremental sync"
        format: date-time
        pattern: "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$"
---
apiVersion: airbyte.com/v1alpha1
kind: Connection
metadata:
  name: salesforce-to-s3
spec:
  sourceId: "salesforce-prod"
  destinationId: "s3-data-lake"
  syncCatalog:
    streams:
      - stream:
          name: "Account"
          jsonSchema:
            type: object
            properties:
              Id: { type: string }
              Name: { type: string }
              Industry: { type: string }
              AnnualRevenue: { type: number }
              CreatedDate: { type: string, format: date-time }
              LastModifiedDate: { type: string, format: date-time }
        config:
          syncMode: "incremental"
          cursorField: ["LastModifiedDate"]
          destinationSyncMode: "append_dedup"
          primaryKey: [["Id"]]
      - stream:
          name: "Opportunity"
        config:
          syncMode: "incremental"
          cursorField: ["LastModifiedDate"]
          destinationSyncMode: "append_dedup"
          primaryKey: [["Id"]]
  schedule:
    timeUnit: "hours"
    units: 6
  prefix: "salesforce"
For database sources, we need careful handling of incremental extraction:
# custom_connectors/postgres_extractor.py
import json
import logging
from typing import Optional, Dict, Any
from datetime import datetime

import boto3
import pandas as pd
import psycopg2


class PostgresExtractor:
    def __init__(self, connection_params: Dict[str, Any],
                 s3_bucket: str, s3_prefix: str):
        self.connection_params = connection_params
        self.s3_bucket = s3_bucket
        self.s3_prefix = s3_prefix
        self.s3_client = boto3.client('s3')
        self.logger = logging.getLogger(__name__)

    def extract_incremental(self, table_name: str,
                            cursor_field: str,
                            last_cursor_value: Optional[str] = None,
                            chunk_size: int = 10000) -> str:
        """
        Extract data incrementally with proper cursor management
        """
        try:
            conn = psycopg2.connect(**self.connection_params)

            # Get table metadata
            table_info = self._get_table_info(conn, table_name)

            # Determine extraction query
            if last_cursor_value:
                query = f"""
                    SELECT * FROM {table_name}
                    WHERE {cursor_field} > %s
                    ORDER BY {cursor_field}
                """
                params = (last_cursor_value,)
            else:
                # Initial full extraction
                query = f"""
                    SELECT * FROM {table_name}
                    ORDER BY {cursor_field}
                """
                params = ()

            self.logger.info(f"Extracting {table_name} with cursor: {last_cursor_value}")

            # Process in chunks to handle large tables
            chunk_number = 0
            total_rows = 0
            max_cursor_value = last_cursor_value

            # Named cursor = server-side cursor, so results stream in chunks
            with conn.cursor(name=f'{table_name}_cursor') as cursor:
                cursor.execute(query, params)
                while True:
                    rows = cursor.fetchmany(chunk_size)
                    if not rows:
                        break

                    # Convert to DataFrame for easier manipulation
                    columns = [desc[0] for desc in cursor.description]
                    df = pd.DataFrame(rows, columns=columns)

                    # Track maximum cursor value for next extraction.
                    # Cast to str first: ISO-8601 timestamps compare correctly
                    # as strings, and comparing a Timestamp against the stored
                    # string cursor would raise a TypeError.
                    if cursor_field in df.columns:
                        chunk_max_cursor = str(df[cursor_field].max())
                        if max_cursor_value is None or chunk_max_cursor > max_cursor_value:
                            max_cursor_value = chunk_max_cursor

                    # Write chunk to S3
                    self._write_chunk_to_s3(table_name, df, chunk_number)
                    chunk_number += 1
                    total_rows += len(df)
                    self.logger.info(f"Processed chunk {chunk_number}: {len(df)} rows")

            # Update cursor state
            self._update_cursor_state(table_name, max_cursor_value)
            self.logger.info(f"Extraction complete: {total_rows} rows, cursor: {max_cursor_value}")
            return max_cursor_value
        except Exception as e:
            self.logger.error(f"Extraction failed for {table_name}: {e}")
            raise
        finally:
            if 'conn' in locals():
                conn.close()

    def _get_table_info(self, conn, table_name: str) -> Dict:
        """Get table metadata for validation and optimization"""
        with conn.cursor() as cursor:
            cursor.execute("""
                SELECT column_name, data_type, is_nullable
                FROM information_schema.columns
                WHERE table_name = %s
                ORDER BY ordinal_position
            """, (table_name,))
            columns = cursor.fetchall()
            return {
                'columns': [{'name': col[0], 'type': col[1], 'nullable': col[2]}
                            for col in columns],
                'column_count': len(columns)
            }

    def _write_chunk_to_s3(self, table_name: str, df: pd.DataFrame, chunk_number: int):
        """Write DataFrame chunk to S3 in Parquet format"""
        timestamp = datetime.utcnow().strftime("%Y-%m-%d-%H-%M-%S")
        s3_key = (
            f"{self.s3_prefix}/{table_name}/"
            f"year={timestamp[:4]}/month={timestamp[5:7]}/day={timestamp[8:10]}/"
            f"{table_name}_{timestamp}_{chunk_number:06d}.parquet"
        )
        try:
            # to_parquet with no path returns the Parquet file as bytes
            parquet_buffer = df.to_parquet(index=False, engine='pyarrow')

            # Upload to S3
            self.s3_client.put_object(
                Bucket=self.s3_bucket,
                Key=s3_key,
                Body=parquet_buffer,
                ContentType='application/octet-stream',
                Metadata={
                    'table_name': table_name,
                    'chunk_number': str(chunk_number),
                    'row_count': str(len(df)),
                    'extraction_timestamp': timestamp
                }
            )
            self.logger.debug(f"Chunk written to s3://{self.s3_bucket}/{s3_key}")
        except Exception as e:
            self.logger.error(f"Failed to write chunk to S3: {e}")
            raise

    def _update_cursor_state(self, table_name: str, cursor_value: str):
        """Update cursor state in S3 for next incremental extraction"""
        state_key = f"{self.s3_prefix}/_airbyte_state/{table_name}_state.json"
        state_data = {
            'cursor_field': 'updated_at',  # Adjust based on your schema
            'cursor_value': cursor_value,
            'last_updated': datetime.utcnow().isoformat()
        }
        try:
            self.s3_client.put_object(
                Bucket=self.s3_bucket,
                Key=state_key,
                Body=json.dumps(state_data),
                ContentType='application/json'
            )
        except Exception as e:
            self.logger.error(f"Failed to update cursor state: {e}")
            raise
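The extractor persists its cursor as a small JSON object in S3; the read side that the next run would use is just as important. A sketch, following the state layout written by `_update_cursor_state` above (the `parse_cursor_state` helper is illustrative, not part of the class):

```python
import json
from typing import Optional


def parse_cursor_state(raw: bytes) -> Optional[str]:
    """Return the stored cursor value, or None for a first run / missing state."""
    if not raw:
        return None
    state = json.loads(raw)
    return state.get("cursor_value")


# Simulate the state object the extractor wrote on its previous run:
raw_state = json.dumps({
    "cursor_field": "updated_at",
    "cursor_value": "2024-03-05T12:30:00",
    "last_updated": "2024-03-05T13:00:00",
}).encode()

cursor = parse_cursor_state(raw_state)
# This value feeds extract_incremental(last_cursor_value=cursor), so only
# rows modified after 2024-03-05T12:30:00 are pulled on the next run.
```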
dbt transforms raw data into analytics-ready models using SQL and software engineering best practices. Our implementation includes comprehensive testing, documentation, and deployment strategies.
First, let's establish our dbt project structure with proper environment configuration:
# dbt_project.yml
name: 'company_analytics'
version: '1.0.0'
config-version: 2
profile: 'company_analytics'

model-paths: ["models"]
analysis-paths: ["analysis"]
test-paths: ["tests"]
seed-paths: ["data"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

target-path: "target"
clean-targets:
  - "target"
  - "dbt_packages"

models:
  company_analytics:
    staging:
      +materialized: view
      +docs:
        node_color: "#F7A343"
    intermediate:
      +materialized: ephemeral
      +docs:
        node_color: "#B565A7"
    marts:
      +materialized: table
      +docs:
        node_color: "#009639"
      finance:
        +materialized: table
        +post-hook: "grant select on {{ this }} to role analyst"
      marketing:
        +materialized: incremental
        +unique_key: user_id
        +on_schema_change: "append_new_columns"

vars:
  # dbt_utils configurations
  surrogate_key_treat_nulls_as_empty_strings: true
  # Date configurations
  start_date: '2020-01-01'
  # Feature flags
  enable_customer_360: true
  enable_real_time_features: false

snapshots:
  company_analytics:
    +target_schema: snapshots
    +strategy: timestamp
    +updated_at: updated_at
Our staging models clean and standardize raw data:
-- models/staging/salesforce/stg_salesforce__accounts.sql
{{ config(
    materialized='view',
    docs={'node_color': '#F7A343'}
) }}

with source as (
    select * from {{ source('salesforce', 'account') }}
),

cleaned as (
    select
        id as account_id,
        name as account_name,
        type as account_type,
        industry,

        -- Clean and standardize revenue data
        case
            when annual_revenue is not null and annual_revenue > 0
                then annual_revenue
            else null
        end as annual_revenue,

        -- Standardize country codes
        {{ standardize_country_code('billing_country') }} as billing_country_code,
        billing_state,
        billing_city,
        billing_postal_code,

        -- Parse and validate dates
        {{ safe_cast_timestamp('created_date') }} as created_at,
        {{ safe_cast_timestamp('last_modified_date') }} as updated_at,

        -- Data quality flags
        case when name is null or name = '' then true else false end as is_missing_name,
        case when industry is null or industry = '' then true else false end as is_missing_industry,

        -- Metadata
        _airbyte_extracted_at,
        _airbyte_raw_id
    from source
),

final as (
    select
        *,
        -- Generate surrogate key for downstream joins
        -- (generate_surrogate_key replaces the deprecated surrogate_key macro)
        {{ dbt_utils.generate_surrogate_key(['account_id']) }} as account_sk
    from cleaned
)

select * from final
Intermediate models handle complex business logic:
-- models/intermediate/int_customer_lifetime_value.sql
{{ config(
    materialized='ephemeral'
) }}

with customer_orders as (
    select
        customer_id,
        order_date,
        order_total,
        -- Calculate days between orders for frequency analysis
        lag(order_date) over (partition by customer_id order by order_date) as previous_order_date,
        row_number() over (partition by customer_id order by order_date) as order_sequence
    from {{ ref('stg_orders__orders') }}
    where order_status = 'completed'
),

customer_metrics as (
    select
        customer_id,

        -- Monetary metrics
        sum(order_total) as total_revenue,
        avg(order_total) as avg_order_value,
        count(*) as total_orders,

        -- Frequency metrics
        min(order_date) as first_order_date,
        max(order_date) as last_order_date,

        -- Calculate average days between orders (excluding first order)
        avg(
            case
                when previous_order_date is not null
                    then order_date - previous_order_date
            end
        ) as avg_days_between_orders,

        -- Recency in days from current date
        {{ dbt.current_timestamp() }}::date - max(order_date) as days_since_last_order
    from customer_orders
    group by customer_id
),

clv_calculation as (
    select
        *,

        -- Simple CLV calculation: (AOV * Purchase Frequency * Gross Margin) / Churn Rate
        -- Assuming 20% gross margin and estimating churn rate from recency
        case
            when avg_days_between_orders > 0 then
                (avg_order_value * (365.0 / avg_days_between_orders) * 0.20) /
                greatest(0.01, least(1.0, days_since_last_order / 365.0))
            else
                avg_order_value * 0.20  -- Single purchase customers
        end as estimated_clv,

        -- Customer segmentation based on RFM
        case
            when days_since_last_order <= 30 and total_orders >= 5 and total_revenue >= 1000 then 'VIP'
            when days_since_last_order <= 90 and total_orders >= 3 then 'Active'
            when days_since_last_order <= 180 and total_orders >= 2 then 'At Risk'
            when days_since_last_order > 180 then 'Churned'
            else 'New'
        end as customer_segment
    from customer_metrics
)

select * from clv_calculation
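The CLV arithmetic above is easy to misread in SQL, so a Python mirror of the same formula is handy for spot-checking model output. The 20% gross margin and the recency-based churn proxy come straight from the model; the function itself is illustrative, not part of the dbt project.

```python
def estimated_clv(avg_order_value: float, avg_days_between_orders: float,
                  days_since_last_order: int, gross_margin: float = 0.20) -> float:
    """(AOV * purchase frequency * gross margin) / churn proxy,
    matching the clv_calculation CTE term for term."""
    if avg_days_between_orders and avg_days_between_orders > 0:
        purchase_frequency = 365.0 / avg_days_between_orders
        # Churn proxy: recency as a fraction of a year, clamped to [0.01, 1.0]
        churn_proxy = max(0.01, min(1.0, days_since_last_order / 365.0))
        return (avg_order_value * purchase_frequency * gross_margin) / churn_proxy
    return avg_order_value * gross_margin  # single-purchase customers


# A customer spending $100 every 30 days, last seen 36 days ago:
clv = estimated_clv(100.0, 30.0, 36)
```

Note how the clamp behaves at the extremes: a just-active customer (recency near zero) hits the 0.01 floor and gets a large CLV, while anyone inactive for over a year is treated as fully churned.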
Our mart models serve specific business use cases with comprehensive testing:
-- models/marts/finance/fct_monthly_revenue.sql
{{ config(
    materialized='incremental',
    unique_key='revenue_month',
    on_schema_change='append_new_columns',
    post_hook="grant select on {{ this }} to role finance_analyst"
) }}

with revenue_base as (
    select
        date_trunc('month', order_date) as revenue_month,

        -- Revenue metrics
        sum(case when order_status = 'completed' then order_total else 0 end) as gross_revenue,
        sum(case when order_status = 'refunded' then order_total else 0 end) as refunded_revenue,
        sum(case when order_status = 'completed' then order_total
                 when order_status = 'refunded' then -order_total
                 else 0 end) as net_revenue,

        -- Order metrics
        count(case when order_status = 'completed' then 1 end) as completed_orders,
        count(case when order_status = 'refunded' then 1 end) as refunded_orders,
        count(distinct customer_id) as unique_customers,

        -- Customer acquisition
        count(distinct case when customer_order_sequence = 1 then customer_id end) as new_customers,

        -- Product metrics
        sum(case when order_status = 'completed' then total_quantity else 0 end) as units_sold
    from {{ ref('fct_orders') }}
    {% if is_incremental() %}
        -- Only process new/updated data
        where date_trunc('month', order_date) >= (
            select max(revenue_month)
            from {{ this }}
        )
    {% endif %}
    group by revenue_month
),

revenue_with_growth as (
    select
        *,

        -- Month-over-month growth calculations
        lag(net_revenue) over (order by revenue_month) as previous_month_revenue,
        case
            when lag(net_revenue) over (order by revenue_month) > 0 then
                round(
                    100.0 * (net_revenue - lag(net_revenue) over (order by revenue_month)) /
                    lag(net_revenue) over (order by revenue_month),
                    2
                )
            else null
        end as revenue_growth_pct,

        -- Calculate average order value
        case
            when completed_orders > 0 then gross_revenue / completed_orders
            else 0
        end as avg_order_value,

        -- Customer metrics
        case
            when unique_customers > 0 then net_revenue / unique_customers
            else 0
        end as revenue_per_customer
    from revenue_base
)

select * from revenue_with_growth
Comprehensive testing ensures data quality:
-- tests/assert_revenue_consistency.sql
-- Test that monthly revenue aggregation matches daily revenue sum
with monthly_totals as (
    select
        date_trunc('month', revenue_date) as revenue_month,
        sum(net_revenue) as daily_sum_revenue
    from {{ ref('fct_daily_revenue') }}
    group by revenue_month
),

monthly_table as (
    select
        revenue_month,
        net_revenue as monthly_net_revenue
    from {{ ref('fct_monthly_revenue') }}
)

select
    m.revenue_month,
    m.daily_sum_revenue,
    t.monthly_net_revenue,
    abs(m.daily_sum_revenue - t.monthly_net_revenue) as revenue_difference
from monthly_totals m
join monthly_table t using (revenue_month)
where abs(m.daily_sum_revenue - t.monthly_net_revenue) > 0.01
Advanced macros handle complex transformation logic:
-- macros/standardize_country_code.sql
{% macro standardize_country_code(column_name) %}
    case
        when upper({{ column_name }}) in ('US', 'USA', 'UNITED STATES', 'UNITED STATES OF AMERICA') then 'US'
        when upper({{ column_name }}) in ('UK', 'UNITED KINGDOM', 'GB', 'GREAT BRITAIN') then 'GB'
        when upper({{ column_name }}) in ('CA', 'CANADA') then 'CA'
        when upper({{ column_name }}) in ('AU', 'AUSTRALIA') then 'AU'
        when upper({{ column_name }}) in ('DE', 'GERMANY', 'DEUTSCHLAND') then 'DE'
        when upper({{ column_name }}) in ('FR', 'FRANCE') then 'FR'
        when {{ column_name }} is not null and length({{ column_name }}) = 2 then upper({{ column_name }})
        else null
    end
{% endmacro %}
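Mapping logic like this is worth unit-testing outside dbt, where edge cases are cheap to enumerate. A Python equivalent with the same decision order (alias table first, then any two-letter code passed through) is sketched below; the `COUNTRY_ALIASES` table mirrors the macro above, nothing more.

```python
from typing import Optional

# Same alias groups as the SQL macro, keyed by the ISO code they map to
COUNTRY_ALIASES = {
    "US": {"US", "USA", "UNITED STATES", "UNITED STATES OF AMERICA"},
    "GB": {"UK", "UNITED KINGDOM", "GB", "GREAT BRITAIN"},
    "CA": {"CA", "CANADA"},
    "AU": {"AU", "AUSTRALIA"},
    "DE": {"DE", "GERMANY", "DEUTSCHLAND"},
    "FR": {"FR", "FRANCE"},
}


def standardize_country_code(value: Optional[str]) -> Optional[str]:
    """Alias lookup first, then pass through any two-letter code, else None."""
    if value is None:
        return None
    upper = value.upper()
    for iso_code, aliases in COUNTRY_ALIASES.items():
        if upper in aliases:
            return iso_code
    if len(value) == 2:
        return upper
    return None
```

Keeping the alias table in one place (a dbt seed works well) avoids the SQL and Python versions drifting apart.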
Snowflake serves as our analytical engine, handling both batch and streaming workloads. Proper warehouse sizing, clustering, and query optimization are critical for performance and cost management.
Let's configure our Snowflake environment with multiple warehouses for different workloads:
-- snowflake_setup/warehouses.sql
-- ETL warehouse for dbt transformations
CREATE OR REPLACE WAREHOUSE DBT_TRANSFORM_WH WITH
    WAREHOUSE_SIZE = 'X-LARGE'
    AUTO_SUSPEND = 60
    AUTO_RESUME = TRUE
    MIN_CLUSTER_COUNT = 1
    MAX_CLUSTER_COUNT = 4
    SCALING_POLICY = 'STANDARD'
    COMMENT = 'Warehouse for dbt transformations and heavy ETL jobs';

-- Analytics warehouse for dashboard queries
CREATE OR REPLACE WAREHOUSE ANALYTICS_WH WITH
    WAREHOUSE_SIZE = 'LARGE'
    AUTO_SUSPEND = 300  -- 5 minutes for dashboard caching
    AUTO_RESUME = TRUE
    MIN_CLUSTER_COUNT = 1
    MAX_CLUSTER_COUNT = 8
    SCALING_POLICY = 'ECONOMY'  -- Cost-optimized scaling
    COMMENT = 'Warehouse for dashboard and analytical queries';

-- Data science warehouse for ML workloads
CREATE OR REPLACE WAREHOUSE DATA_SCIENCE_WH WITH
    WAREHOUSE_SIZE = 'XX-LARGE'
    AUTO_SUSPEND = 180
    AUTO_RESUME = TRUE
    MIN_CLUSTER_COUNT = 1
    MAX_CLUSTER_COUNT = 2
    SCALING_POLICY = 'STANDARD'
    COMMENT = 'Warehouse for ML training and feature engineering';
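Auto-suspend is the main cost lever in this setup. A back-of-the-envelope sketch using Snowflake's published credits-per-hour by warehouse size makes the stakes concrete; the $3-per-credit price is an assumption (actual rates depend on edition, region, and contract).

```python
# Standard Snowflake credit consumption rates by warehouse size
CREDITS_PER_HOUR = {"X-SMALL": 1, "SMALL": 2, "MEDIUM": 4, "LARGE": 8,
                    "X-LARGE": 16, "XX-LARGE": 32}


def monthly_compute_cost(size: str, active_hours_per_day: float,
                         price_per_credit: float = 3.0, days: int = 30) -> float:
    """Rough monthly compute cost: with auto-suspend you only pay for active hours."""
    return CREDITS_PER_HOUR[size] * active_hours_per_day * days * price_per_credit


# The X-Large transform warehouse, busy ~4h/day vs. left running 24/7:
with_suspend = monthly_compute_cost("X-LARGE", 4)    # 16 * 4  * 30 * 3 = 5760.0
always_on = monthly_compute_cost("X-LARGE", 24)      # 16 * 24 * 30 * 3 = 34560.0
```

A 60-second auto-suspend on a bursty ETL warehouse routinely cuts the bill by 5-6x compared to an always-on cluster, which is why each workload gets its own warehouse with its own suspend policy.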
Database structure with proper clustering and data retention:
-- snowflake_setup/databases.sql
CREATE OR REPLACE DATABASE RAW_DATA
    DATA_RETENTION_TIME_IN_DAYS = 90
    COMMENT = 'Raw data from ingestion pipelines';

CREATE OR REPLACE DATABASE ANALYTICS
    DATA_RETENTION_TIME_IN_DAYS = 365
    COMMENT = 'Transformed data for analytics and reporting';

-- Create schemas with appropriate access controls
USE DATABASE RAW_DATA;
CREATE OR REPLACE SCHEMA KAFKA_STREAMS
    COMMENT = 'Real-time data from Kafka topics';
CREATE OR REPLACE SCHEMA BATCH_EXTRACTS
    COMMENT = 'Batch data from Airbyte and custom extractors';

USE DATABASE ANALYTICS;
CREATE OR REPLACE SCHEMA STAGING
    COMMENT = 'Cleaned and standardized source data';
CREATE OR REPLACE SCHEMA MARTS
    COMMENT = 'Business-ready analytical models';
CREATE OR REPLACE SCHEMA SNAPSHOTS
    COMMENT = 'Historical snapshots for slowly changing dimensions';
For high-performance analytical queries, we need properly clustered tables:
-- snowflake_optimization/clustered_tables.sql
-- Customer events table with multi-column clustering
CREATE OR REPLACE TABLE analytics.marts.fct_customer_events (
    event_id STRING,
    user_id STRING,
    event_type STRING,
    event_timestamp TIMESTAMP_NTZ,
    session_id STRING,
    page_url STRING,
    referrer STRING,
    device_type STRING,
    country_code STRING,
    event_properties VARIANT,
    created_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
)
CLUSTER BY (DATE(event_timestamp), user_id, event_type)
COMMENT = 'Customer behavioral events with optimized clustering for time-series and user analysis';

-- Large transaction table with automatic clustering
CREATE OR REPLACE TABLE analytics.marts.fct_transactions (
    transaction_id STRING PRIMARY KEY,
    user_id STRING,
    transaction_date DATE,
    transaction_timestamp TIMESTAMP_NTZ,
    amount DECIMAL(10,2),
    currency STRING,
    payment_method STRING,
    merchant_category STRING,
    status STRING,
    created_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
)
CLUSTER BY (transaction_date, user_id)
COMMENT = 'Financial transactions with date and user clustering';

-- Enable automatic clustering for maintenance-free optimization
ALTER TABLE analytics.marts.fct_transactions RESUME RECLUSTER;
ALTER TABLE analytics.marts.fct_customer_events RESUME RECLUSTER;
Materialized views provide automatically maintained aggregations. Be aware of Snowflake's restrictions: a materialized view can reference only a single table and supports a limited set of aggregates (COUNT, SUM, AVG, MIN, MAX, APPROX_COUNT_DISTINCT, and a few others); DISTINCT aggregates, ARRAY_AGG, and non-deterministic functions such as CURRENT_DATE() are not allowed, so time-window filters belong in the consuming query:
-- snowflake_optimization/materialized_views.sql
-- Revenue dashboard view (apply the "last 7 days" filter in the query
-- that reads this view; CURRENT_DATE() cannot appear inside an MV)
CREATE OR REPLACE MATERIALIZED VIEW analytics.marts.mv_real_time_revenue AS
SELECT
    DATE(transaction_timestamp) AS transaction_date,
    HOUR(transaction_timestamp) AS transaction_hour,
    COUNT(*) AS transaction_count,
    SUM(CASE WHEN status = 'completed' THEN amount ELSE 0 END) AS completed_revenue,
    SUM(CASE WHEN status = 'failed' THEN amount ELSE 0 END) AS failed_revenue,
    APPROX_COUNT_DISTINCT(user_id) AS unique_customers,  -- COUNT(DISTINCT ...) is not supported in MVs
    AVG(CASE WHEN status = 'completed' THEN amount END) AS avg_transaction_value
FROM analytics.marts.fct_transactions
GROUP BY transaction_date, transaction_hour;

-- Customer activity summary for real-time personalization
CREATE OR REPLACE MATERIALIZED VIEW analytics.marts.mv_customer_activity AS
SELECT
    user_id,
    DATE(event_timestamp) AS activity_date,
    COUNT(*) AS total_events,
    APPROX_COUNT_DISTINCT(session_id) AS session_count,
    MAX(event_timestamp) AS last_activity_timestamp,
    COUNT(CASE WHEN event_type = 'purchase' THEN 1 END) AS purchase_events,
    COUNT(CASE WHEN event_type = 'page_view' THEN 1 END) AS page_views
FROM analytics.marts.fct_customer_events
GROUP BY user_id, activity_date;
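The completed/failed revenue split above is just conditional aggregation, and it is worth sanity-checking the logic outside the warehouse. A pure-Python sketch of the same bucketing (field names mirror fct_transactions; the rows are made-up sample data):

```python
from collections import defaultdict
from datetime import datetime

def hourly_revenue(transactions):
    """Aggregate transactions into (date, hour) buckets, splitting revenue by
    status the same way the view's CASE expressions do."""
    buckets = defaultdict(lambda: {"transaction_count": 0,
                                   "completed_revenue": 0.0,
                                   "failed_revenue": 0.0})
    for t in transactions:
        ts = t["transaction_timestamp"]
        b = buckets[(ts.date(), ts.hour)]
        b["transaction_count"] += 1
        if t["status"] == "completed":
            b["completed_revenue"] += t["amount"]
        elif t["status"] == "failed":
            b["failed_revenue"] += t["amount"]
    return dict(buckets)

rows = [
    {"transaction_timestamp": datetime(2024, 5, 1, 9, 15), "amount": 20.0, "status": "completed"},
    {"transaction_timestamp": datetime(2024, 5, 1, 9, 40), "amount": 5.0, "status": "failed"},
    {"transaction_timestamp": datetime(2024, 5, 1, 10, 5), "amount": 12.5, "status": "completed"},
]
result = hourly_revenue(rows)
print(result[(datetime(2024, 5, 1).date(), 9)])
# {'transaction_count': 2, 'completed_revenue': 20.0, 'failed_revenue': 5.0}
```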
Advanced query optimization with search optimization service:
-- snowflake_optimization/search_optimization.sql
-- Enable search optimization for frequently filtered columns
ALTER TABLE analytics.marts.fct_customer_events
ADD SEARCH OPTIMIZATION ON EQUALITY(user_id, event_type, country_code);
ALTER TABLE analytics.marts.fct_transactions
ADD SEARCH OPTIMIZATION ON EQUALITY(user_id, merchant_category, payment_method);
-- Create secure views for sensitive data access
CREATE OR REPLACE SECURE VIEW analytics.marts.vw_customer_pii AS
SELECT
    user_id,
    CASE
        WHEN CURRENT_ROLE() IN ('ADMIN', 'DATA_SCIENTIST') THEN email
        ELSE REGEXP_REPLACE(email, '(.{2}).*@', '\\1***@')
    END AS email,
    CASE
        WHEN CURRENT_ROLE() IN ('ADMIN', 'DATA_SCIENTIST') THEN full_name
        ELSE REGEXP_REPLACE(full_name, '(.{2}).*', '\\1***')
    END AS full_name,
    created_at,
    last_login_at
FROM analytics.staging.stg_users__users;
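The masking expressions in the secure view are ordinary regular-expression substitutions, so they can be unit-tested anywhere. The same patterns in Python (Snowflake's REGEXP_REPLACE behaves equivalently for these patterns, with `\\1` as the backreference):

```python
import re

def mask_email(email: str) -> str:
    """Keep the first two characters of the local part, mask the rest."""
    return re.sub(r"(.{2}).*@", r"\1***@", email)

def mask_name(full_name: str) -> str:
    """Keep the first two characters of the name, mask the rest."""
    return re.sub(r"(.{2}).*", r"\1***", full_name)

print(mask_email("alice@example.com"))  # al***@example.com
print(mask_name("Alice Smith"))         # Al***
```

Testing the masking logic in isolation like this catches edge cases (one-character names, emails with multiple `@`-like substrings) before they surface as governance incidents.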
Comprehensive data governance ensures reliability, compliance, and trustworthiness across the platform. We'll implement automated quality monitoring, lineage tracking, and access controls.
First, let's establish data quality monitoring with Great Expectations:
# data_quality/expectations_suite.py
# Note: written against the pre-1.0 (0.x) Great Expectations API.
import logging

import great_expectations as gx
from great_expectations.checkpoint import Checkpoint
from great_expectations.core.expectation_configuration import ExpectationConfiguration
from great_expectations.data_context import DataContext


class DataQualityMonitor:
    def __init__(self, data_context_path: str):
        self.context = DataContext(data_context_path)
        self.logger = logging.getLogger(__name__)

    def create_customer_events_suite(self) -> str:
        """Create comprehensive expectations for customer events data."""
        suite_name = "customer_events_quality_suite"

        # Create the expectation suite if it does not already exist
        try:
            suite = self.context.get_expectation_suite(suite_name)
        except gx.exceptions.DataContextError:
            suite = self.context.create_expectation_suite(suite_name)

        # Core data expectations
        expectations = [
            # Completeness checks
            ExpectationConfiguration(
                expectation_type="expect_column_to_exist",
                kwargs={"column": "event_id"},
            ),
            ExpectationConfiguration(
                expectation_type="expect_column_to_exist",
                kwargs={"column": "user_id"},
            ),
            ExpectationConfiguration(
                expectation_type="expect_column_values_to_not_be_null",
                kwargs={"column": "event_id"},
            ),
            ExpectationConfiguration(
                expectation_type="expect_column_values_to_not_be_null",
                kwargs={"column": "user_id"},
            ),
            ExpectationConfiguration(
                expectation_type="expect_column_values_to_not_be_null",
                kwargs={"column": "event_timestamp"},
            ),
            # Format and type checks
            ExpectationConfiguration(
                expectation_type="expect_column_values_to_match_regex",
                kwargs={
                    "column": "event_id",
                    "regex": r"^[a-f0-9-]{36}$",  # UUID format
                },
            ),
            ExpectationConfiguration(
                expectation_type="expect_column_values_to_be_in_set",
                kwargs={
                    "column": "event_type",
                    "value_set": [
                        "page_view", "button_click", "form_submit",
                        "purchase", "cart_add", "cart_remove", "login", "logout",
                    ],
                },
            ),
            # Business logic checks
            ExpectationConfiguration(
                expectation_type="expect_column_values_to_be_between",
                kwargs={
                    "column": "event_timestamp",
                    "min_value": "2020-01-01T00:00:00Z",
                    "max_value": None,  # no future limit
                    "parse_strings_as_datetimes": True,
                },
            ),
            # Statistical checks
            ExpectationConfiguration(
                expectation_type="expect_column_unique_value_count_to_be_between",
                kwargs={
                    "column": "user_id",
                    "min_value": 1000,  # expect at least 1,000 unique users daily
                    "max_value": None,
                },
            ),
            # Custom business rule: session integrity. Assumes the validated
            # batch carries derived session_start_events / session_end_events
            # columns that should cancel out (+1 per start, -1 per end).
            ExpectationConfiguration(
                expectation_type="expect_multicolumn_sum_to_equal",
                kwargs={
                    "column_list": ["session_start_events", "session_end_events"],
                    "sum_total": 0,
                    "ignore_row_if": "any_value_is_missing",
                },
            ),
        ]

        # Add all expectations to the suite; alerting is handled by the
        # checkpoint's action list, not per expectation
        for expectation in expectations:
            suite.add_expectation(expectation)

        self.context.save_expectation_suite(suite)
        return suite_name
    def create_revenue_data_suite(self) -> str:
        """Create financial data quality expectations with strict validation."""
        suite_name = "revenue_data_quality_suite"

        try:
            suite = self.context.get_expectation_suite(suite_name)
        except gx.exceptions.DataContextError:
            suite = self.context.create_expectation_suite(suite_name)

        # Financial data requires stricter validation
        financial_expectations = [
            # Revenue amount validation
            ExpectationConfiguration(
                expectation_type="expect_column_values_to_be_between",
                kwargs={
                    "column": "transaction_amount",
                    "min_value": 0.01,  # no zero or negative transactions
                    "max_value": 1000000.00,  # reasonable upper limit
                    "mostly": 0.999,  # allow for 0.1% outliers
                },
            ),
            # Currency code validation
            ExpectationConfiguration(
                expectation_type="expect_column_values_to_be_in_set",
                kwargs={
                    "column": "currency_code",
                    "value_set": ["USD", "EUR", "GBP", "CAD", "AUD"],
                },
            ),
            # Transaction ID uniqueness
            ExpectationConfiguration(
                expectation_type="expect_column_values_to_be_unique",
                kwargs={"column": "transaction_id"},
            ),
            # Status validation
            ExpectationConfiguration(
                expectation_type="expect_column_values_to_be_in_set",
                kwargs={
                    "column": "transaction_status",
                    "value_set": ["pending", "completed", "failed", "refunded"],
                },
            ),
            # Date consistency check
            ExpectationConfiguration(
                expectation_type="expect_column_pair_values_A_to_be_greater_than_B",
                kwargs={
                    "column_A": "updated_at",
                    "column_B": "created_at",
                    "or_equal": True,
                    "ignore_row_if": "either_value_is_missing",
                },
            ),
        ]

        for expectation in financial_expectations:
            suite.add_expectation(expectation)

        self.context.save_expectation_suite(suite)
        return suite_name
    def run_validation_checkpoint(self, suite_name: str,
                                  datasource_name: str,
                                  data_asset_name: str) -> bool:
        """Execute a validation checkpoint and return success status."""
        checkpoint_config = {
            "name": f"{suite_name}_checkpoint",
            "config_version": 1.0,
            "template_name": None,
            "module_name": "great_expectations.checkpoint",
            "class_name": "Checkpoint",
            "run_name_template": "%Y%m%d-%H%M%S-" + suite_name,
            "expectation_suite_name": suite_name,
            "batch_request": {
                "datasource_name": datasource_name,
                "data_connector_name": "default_inferred_data_connector_name",
                "data_asset_name": data_asset_name,
            },
            "action_list": [
                {
                    "name": "store_validation_result",
                    "action": {"class_name": "StoreValidationResultAction"},
                },
                {
                    "name": "store_evaluation_params",
                    "action": {"class_name": "StoreEvaluationParametersAction"},
                },
                {
                    "name": "update_data_docs",
                    "action": {
                        "class_name": "UpdateDataDocsAction",
                        "site_names": [],
                    },
                },
            ],
        }

        try:
            # Checkpoint requires a reference to the data context it runs against
            checkpoint = Checkpoint(data_context=self.context, **checkpoint_config)
            results = checkpoint.run()
            success = results["success"]
            if not success:
                self.logger.error(f"Data validation failed for {suite_name}")
                # Send alert to monitoring system
                self._send_validation_alert(suite_name, results)
            else:
                self.logger.info(f"Data validation passed for {suite_name}")
            return success
        except Exception as e:
            self.logger.error(f"Checkpoint execution failed: {e}")
            return False

    def _send_validation_alert(self, suite_name: str, results: dict):
        """Send a data quality alert to monitoring systems."""
        # Implementation would integrate with Slack, PagerDuty, etc.
        pass
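Conceptually, each expectation is just a predicate evaluated over a column, optionally with a `mostly` tolerance that permits a fraction of failing values. A dependency-free sketch of that idea (illustrative only, not the Great Expectations implementation):

```python
import re

EVENT_TYPES = {"page_view", "button_click", "form_submit", "purchase",
               "cart_add", "cart_remove", "login", "logout"}
UUID_RE = re.compile(r"^[a-f0-9-]{36}$")  # same 36-char check as the suite

def expect_values(rows, column, predicate, mostly=1.0):
    """Return True if at least `mostly` of the column's values satisfy the predicate."""
    values = [r[column] for r in rows]
    if not values:
        return True  # vacuously true on an empty batch
    passed = sum(1 for v in values if predicate(v))
    return passed / len(values) >= mostly

rows = [
    {"event_id": "a" * 8 + "-" + "b" * 27, "event_type": "page_view"},  # 36 chars, valid
    {"event_id": "not-a-uuid", "event_type": "login"},                   # too short, invalid
]
print(expect_values(rows, "event_id", lambda v: bool(UUID_RE.match(v))))              # False (1 of 2 pass)
print(expect_values(rows, "event_id", lambda v: bool(UUID_RE.match(v)), mostly=0.5))  # True
print(expect_values(rows, "event_type", lambda v: v in EVENT_TYPES))                  # True
```

The `mostly` knob is what makes the framework practical on real data: hard failures for structural columns, tolerances for noisy ones.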
Data lineage tracking with custom metadata collection:
# data_governance/lineage_tracker.py
import json
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional

import networkx as nx
import requests


@dataclass
class DataLineageNode:
    node_id: str
    node_type: str  # 'source', 'transformation', 'destination'
    name: str
    schema_name: Optional[str] = None
    table_name: Optional[str] = None
    transformation_logic: Optional[str] = None
    created_at: Optional[datetime] = None


@dataclass
class DataLineageEdge:
    source_node_id: str
    target_node_id: str
    edge_type: str  # 'reads_from', 'writes_to', 'transforms'
    created_at: Optional[datetime] = None


class DataLineageTracker:
    def __init__(self, metadata_store_url: str):
        self.metadata_store_url = metadata_store_url
        self.graph = nx.DiGraph()

    def track_dbt_lineage(self, manifest_path: str, run_results_path: str):
        """Extract lineage information from dbt artifacts."""
        with open(manifest_path, 'r') as f:
            manifest = json.load(f)
        with open(run_results_path, 'r') as f:
            run_results = json.load(f)  # reserved for run-status enrichment

        # Process dbt models
        for model_id, model_data in manifest.get('nodes', {}).items():
            if model_data['resource_type'] != 'model':
                continue

            # Create a node for the model
            model_node = DataLineageNode(
                node_id=model_id,
                node_type='transformation',
                name=model_data['name'],
                schema_name=model_data['schema'],
                table_name=model_data['name'],
                transformation_logic=model_data.get('raw_code', ''),
                created_at=datetime.utcnow(),
            )
            self._add_lineage_node(model_node)

            # Track dependencies
            for dep in model_data.get('depends_on', {}).get('nodes', []):
                edge = DataLineageEdge(
                    source_node_id=dep,
                    target_node_id=model_id,
                    edge_type='transforms',
                    created_at=datetime.utcnow(),
                )
                self._add_lineage_edge(edge)

        # Process sources
        for source_id, source_data in manifest.get('sources', {}).items():
            source_node = DataLineageNode(
                node_id=source_id,
                node_type='source',
                name=source_data['name'],
                schema_name=source_data['schema'],
                table_name=source_data['name'],
                created_at=datetime.utcnow(),
            )
            self._add_lineage_node(source_node)

    def track_airflow_lineage(self, dag_id: str, task_metadata: Dict):
        """Track lineage from an Airflow DAG execution."""
        for task_id, task_info in task_metadata.items():
            # Create a transformation node for each task
            task_node = DataLineageNode(
                node_id=f"{dag_id}.{task_id}",
                node_type='transformation',
                name=task_id,
                transformation_logic=task_info.get('operator_class', ''),
                created_at=datetime.utcnow(),
            )
            self._add_lineage_node(task_node)

            # Track input datasets
            for input_dataset in task_info.get('inputs', []):
                input_node = DataLineageNode(
                    node_id=input_dataset,
                    node_type='source' if 'source' in input_dataset else 'transformation',
                    name=input_dataset.split('.')[-1],
                    created_at=datetime.utcnow(),
                )
                self._add_lineage_node(input_node)
                edge = DataLineageEdge(
                    source_node_id=input_dataset,
                    target_node_id=f"{dag_id}.{task_id}",
                    edge_type='reads_from',
                    created_at=datetime.utcnow(),
                )
                self._add_lineage_edge(edge)
    def get_upstream_dependencies(self, node_id: str, max_depth: int = 5) -> List[str]:
        """Get all upstream dependencies for a given node."""
        if node_id not in self.graph:
            return []

        upstream_nodes = []
        visited = set()

        def traverse_upstream(current_node, depth):
            if depth >= max_depth or current_node in visited:
                return
            visited.add(current_node)
            for predecessor in self.graph.predecessors(current_node):
                upstream_nodes.append(predecessor)
                traverse_upstream(predecessor, depth + 1)

        traverse_upstream(node_id, 0)
        return list(set(upstream_nodes))

    def get_downstream_impact(self, node_id: str, max_depth: int = 5) -> List[str]:
        """Get all downstream nodes that would be impacted by changes."""
        if node_id not in self.graph:
            return []

        downstream_nodes = []
        visited = set()

        def traverse_downstream(current_node, depth):
            if depth >= max_depth or current_node in visited:
                return
            visited.add(current_node)
            for successor in self.graph.successors(current_node):
                downstream_nodes.append(successor)
                traverse_downstream(successor, depth + 1)

        traverse_downstream(node_id, 0)
        return list(set(downstream_nodes))

    def _add_lineage_node(self, node: DataLineageNode):
        """Add or update a lineage node in the graph and metadata store."""
        self.graph.add_node(node.node_id, **{
            'name': node.name,
            'node_type': node.node_type,
            'schema_name': node.schema_name,
            'table_name': node.table_name,
            'created_at': node.created_at.isoformat() if node.created_at else None,
        })
        # Store in the metadata system
        self._persist_to_metadata_store('nodes', node.__dict__)

    def _add_lineage_edge(self, edge: DataLineageEdge):
        """Add a lineage edge to the graph and metadata store."""
        self.graph.add_edge(
            edge.source_node_id,
            edge.target_node_id,
            edge_type=edge.edge_type,
            created_at=edge.created_at.isoformat() if edge.created_at else None,
        )
        # Store in the metadata system
        self._persist_to_metadata_store('edges', edge.__dict__)

    def _persist_to_metadata_store(self, entity_type: str, data: Dict):
        """Persist lineage data to an external metadata store."""
        try:
            response = requests.post(
                f"{self.metadata_store_url}/api/v1/{entity_type}",
                json=data,
                headers={'Content-Type': 'application/json'},
                timeout=30,
            )
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            print(f"Failed to persist {entity_type} to metadata store: {e}")
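The traversal methods are ordinary depth-limited graph walks. The same upstream logic over a plain adjacency dict (stdlib only, no networkx; the model names are hypothetical) is handy for testing the traversal rules in isolation:

```python
def upstream(edges, node, max_depth=5):
    """Collect ancestors of `node` up to `max_depth` hops away.

    `edges` maps each node to the list of nodes it reads from,
    i.e. its direct upstream dependencies.
    """
    seen, result = set(), set()

    def walk(current, depth):
        if depth >= max_depth or current in seen:
            return  # stop on depth limit or cycles
        seen.add(current)
        for parent in edges.get(current, []):
            result.add(parent)
            walk(parent, depth + 1)

    walk(node, 0)
    return result

# source -> staging -> mart: the mart transitively depends on both layers
deps = {
    "marts.fct_transactions": ["staging.stg_payments"],
    "staging.stg_payments": ["source.raw_payments"],
}
print(sorted(upstream(deps, "marts.fct_transactions")))
# ['source.raw_payments', 'staging.stg_payments']
```

Downstream impact analysis is the same walk over the reversed edge map; in production the networkx version above gets both directions from one DiGraph.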
Apache Airflow orchestrates our data pipelines with proper error handling, monitoring, and dependency management. Our implementation includes custom operators and comprehensive observability.
First, let's establish our Airflow configuration with production-ready settings:
# airflow/dags/config/dag_config.py
from datetime import timedelta

from airflow.models import Variable


class DAGConfig:
    """Centralized configuration for all data platform DAGs"""

    # Default DAG arguments
    DEFAULT_ARGS = {
        'owner': 'data-engineering',
        'depends_on_past': False,
        'email_on_failure': True,
        'email_on_retry': False,
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
        'email': ['data-alerts@company.com'],
        'sla': timedelta(hours=2),
    }

    # Environment-specific configurations (note: Variable.get here runs at
    # DAG parse time, so keep such lookups cheap)
    ENVIRONMENT = Variable.get("environment", default_var="dev")
    SNOWFLAKE_CONN_ID = f"snowflake_{ENVIRONMENT}"
    KAFKA_CONN_ID = f"kafka_{ENVIRONMENT}"
    S3_CONN_ID = f"s3_{ENVIRONMENT}"

    # Data quality settings: minimum fraction of tests that must pass
    DATA_QUALITY_FAILURE_THRESHOLD = 0.95

    # Resource limits
    KUBERNETES_RESOURCE_LIMITS = {
        'memory': '4Gi',
        'cpu': '2000m',
    }
    KUBERNETES_RESOURCE_REQUESTS = {
        'memory': '2Gi',
        'cpu': '1000m',
    }

    # Monitoring and alerting
    SLACK_CONN_ID = 'slack_data_alerts'
    PAGERDUTY_CONN_ID = 'pagerduty_critical'

    @staticmethod
    def get_warehouse_config(workload_type: str) -> dict:
        """Get Snowflake warehouse configuration based on workload type."""
        warehouse_configs = {
            'etl': {
                'warehouse': 'DBT_TRANSFORM_WH',
                'database': 'ANALYTICS',