
You've built a data pipeline that ingests raw sales transactions from an S3 bucket, transforms them, and loads the results into a Snowflake data warehouse. It works beautifully in development. Then your team asks you to deploy it to staging, and suddenly you're hunting through the codebase swapping out bucket names, connection strings, schema names, and batch size limits. You get staging working, and then production needs a slightly different configuration — different credentials, a larger batch size, an additional alerting webhook. Before long, you have three copies of the same pipeline, diverging slowly and silently, each one a maintenance liability.
This is the parameterization problem, and it's one of the most underestimated challenges in production data engineering. The solution isn't exotic — it's disciplined design. A well-parameterized pipeline separates what the pipeline does from where and how it runs. You define behavior once, then drive it with configuration at runtime. The result is a codebase that's easier to test, safer to deploy, and dramatically easier to maintain as your infrastructure scales.
By the end of this lesson, you'll be able to design and implement config-driven pipelines that work cleanly across multiple environments without code duplication or fragile search-and-replace deployments.
What you'll learn:
You should be comfortable writing Python ETL scripts, familiar with the concept of environment variables, and have at least some experience with a pipeline orchestration tool (Airflow, Prefect, or similar). You don't need to be an expert in any of these — but this lesson won't explain what a DAG is or why you'd use Snowflake.
Before writing a single config file, you need to develop a sharp instinct for what should be a parameter. The rule of thumb is this: if it changes between environments, or if changing it changes where the pipeline runs rather than what the pipeline does, it's a parameter.
Let's get concrete. Imagine a pipeline that pulls customer orders from a Postgres database, enriches them with product metadata from an API, and writes the results to a data warehouse. Here's a first pass at what's environment-specific:
Infrastructure identifiers:
Schema names (e.g., orders_dev, not orders)
Operational settings:
Log level (e.g., DEBUG in dev, WARNING in production)
Business logic knobs (the trickier category):
Secrets (a special subcategory):
What shouldn't be a parameter? The core transformation logic. If you're computing gross_revenue = quantity * unit_price - discount, that formula belongs in code. If you're filtering out test accounts by checking a boolean flag in the source data, that's logic. Parameters control where and at what scale your pipeline runs; they don't replace business rules.
A mistake practitioners often make is over-parameterizing. If you find yourself with a config key called filter_test_accounts_column_name, you've probably gone too far — that's embedding business logic in config instead of code, which makes the system harder to reason about, not easier.
Once you've identified your parameters, you need a home for them. The most practical structure for multi-environment pipelines is a layered config system: a base configuration that defines defaults, with environment-specific files that override only what's different.
Here's a realistic directory layout for a pipeline project:
pipeline_project/
├── config/
│ ├── base.yaml
│ ├── dev.yaml
│ ├── staging.yaml
│ └── production.yaml
├── pipeline/
│ ├── __init__.py
│ ├── extract.py
│ ├── transform.py
│ ├── load.py
│ └── config_loader.py
├── tests/
│ └── test_config.py
├── .env.example
└── run_pipeline.py
The base.yaml defines the full shape of your configuration — every key your pipeline might look for, with reasonable defaults:
# config/base.yaml
pipeline:
  name: "customer_orders_etl"
  version: "2.1.0"
  batch_size: 10000
  max_retries: 3
  retry_delay_seconds: 30
  log_level: "INFO"
source:
  type: "postgres"
  host: "localhost"
  port: 5432
  database: "orders_db"
  schema: "public"
  table: "raw_orders"
  lookback_days: 7
enrichment:
  api_base_url: "https://api.products.internal"
  request_timeout_seconds: 10
  rate_limit_calls_per_minute: 60
destination:
  type: "snowflake"
  account: "my_org.us-east-1"
  database: "ANALYTICS"
  schema: "ORDERS"
  table: "CUSTOMER_ORDERS"
  warehouse: "TRANSFORM_XS"
  write_mode: "append"
notifications:
  slack_webhook_enabled: false
  alert_on_row_count_below: 100
Now, dev.yaml only contains what's different from base:
# config/dev.yaml
pipeline:
  batch_size: 1000
  log_level: "DEBUG"
source:
  host: "dev-postgres.internal"
  schema: "orders_dev"
destination:
  schema: "ORDERS_DEV"
  write_mode: "overwrite"
And production.yaml overrides for the live environment:
# config/production.yaml
pipeline:
  batch_size: 500000
  max_retries: 5
  retry_delay_seconds: 60
source:
  host: "prod-postgres.internal"
  schema: "orders_prod"
destination:
  schema: "ORDERS_PROD"
  warehouse: "TRANSFORM_L"
notifications:
  slack_webhook_enabled: true
  alert_on_row_count_below: 10000
Notice that production.yaml doesn't touch api_base_url. The enrichment API is the same across all environments in this case. Only override what's actually different. This matters because copying the entire base config into each environment file throws away the benefit of layering. Changing a shared default then means finding and updating it in every environment file.
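Keeping overrides minimal is easy to state and easy to let slip. One cheap guard is a lint that walks an environment override alongside base and reports any leaf value that merely repeats the base value. The sketch below assumes both files have already been parsed into dicts. redundant_overrides is a hypothetical helper, not part of the lesson's loader.

```python
def redundant_overrides(base: dict, override: dict, prefix: str = "") -> list[str]:
    """Return dotted key paths where override repeats the base value unchanged."""
    found: list[str] = []
    for key, value in override.items():
        path = f"{prefix}{key}"
        if key not in base:
            # Not in base: not redundant (though it may be a typo worth flagging separately)
            continue
        base_value = base[key]
        if isinstance(value, dict) and isinstance(base_value, dict):
            # Recurse so nested sections are compared leaf by leaf
            found.extend(redundant_overrides(base_value, value, prefix=f"{path}."))
        elif type(value) is type(base_value) and value == base_value:
            # Compare types too, so False and 0 are not treated as the same value
            found.append(path)
    return found


base = {
    "destination": {"schema": "ORDERS", "warehouse": "TRANSFORM_XS"},
    "notifications": {"slack_webhook_enabled": False},
}
dev_override = {
    "destination": {"schema": "ORDERS_DEV", "warehouse": "TRANSFORM_XS"},
    "notifications": {"slack_webhook_enabled": False},
}
print(redundant_overrides(base, dev_override))
# ['destination.warehouse', 'notifications.slack_webhook_enabled']
```

You could run a check like this in CI against each environment file and fail the build on a non-empty result.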
The loader is the piece that reads your files and merges them. All you need is PyYAML (a third-party package) and a recursive dictionary merge function:
# pipeline/config_loader.py
from pathlib import Path
from typing import Optional

import yaml


def deep_merge(base: dict, override: dict) -> dict:
    """
    Recursively merge override into base.
    Keys in override take precedence; nested dicts are merged, not replaced.
    """
    result = base.copy()
    for key, value in override.items():
        if key in result and isinstance(result[key], dict) and isinstance(value, dict):
            result[key] = deep_merge(result[key], value)
        else:
            result[key] = value
    return result


def load_config(environment: str, config_dir: Optional[str] = None) -> dict:
    """
    Load and merge base config with environment-specific overrides.
    Args:
        environment: One of 'dev', 'staging', 'production'
        config_dir: Path to config directory. Defaults to <project root>/config.
    Returns:
        Merged configuration dictionary.
    Raises:
        FileNotFoundError: If base.yaml or the environment config file is missing.
        ValueError: If environment is not a recognized value.
    """
    valid_environments = {"dev", "staging", "production"}
    if environment not in valid_environments:
        raise ValueError(
            f"Unknown environment '{environment}'. "
            f"Must be one of: {', '.join(sorted(valid_environments))}"
        )
    if config_dir is None:
        # Default: the config/ directory at the project root (the parent of pipeline/)
        config_dir = Path(__file__).parent.parent / "config"
    else:
        config_dir = Path(config_dir)
    base_path = config_dir / "base.yaml"
    env_path = config_dir / f"{environment}.yaml"
    if not base_path.exists():
        raise FileNotFoundError(f"Base config not found at: {base_path}")
    if not env_path.exists():
        raise FileNotFoundError(
            f"Environment config for '{environment}' not found at: {env_path}"
        )
    # safe_load returns None for an empty file, so fall back to an empty dict
    with open(base_path) as f:
        base_config = yaml.safe_load(f) or {}
    with open(env_path) as f:
        env_config = yaml.safe_load(f) or {}
    return deep_merge(base_config, env_config)
The deep_merge function is worth understanding carefully. A naive {**base, **override} merge would replace entire nested dictionaries. If base has 10 keys under destination and your dev.yaml overrides just schema, a shallow merge would wipe out the other 9 keys. Deep merge handles this correctly — it descends into nested dicts and only replaces individual leaf values.
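One consequence of this design is that only dicts are merged. Lists and all other values count as leaves, so an override list replaces the base list wholesale instead of extending it. The sketch below reuses the same deep_merge. The channels key is hypothetical and exists only to illustrate list handling, because the lesson's base.yaml contains no lists.

```python
def deep_merge(base: dict, override: dict) -> dict:
    """Same recursive merge as pipeline/config_loader.py."""
    result = base.copy()
    for key, value in override.items():
        if key in result and isinstance(result[key], dict) and isinstance(value, dict):
            result[key] = deep_merge(result[key], value)
        else:
            result[key] = value
    return result


base = {
    "destination": {"schema": "ORDERS", "write_mode": "append"},
    # Hypothetical list-valued key; the lesson's base.yaml has no lists
    "notifications": {"channels": ["#data-alerts", "#oncall"]},
}
override = {
    "destination": {"write_mode": "overwrite"},
    "notifications": {"channels": ["#dev-alerts"]},
}

merged = deep_merge(base, override)
# Nested dicts are merged key by key, so schema survives
print(merged["destination"])    # {'schema': 'ORDERS', 'write_mode': 'overwrite'}
# Lists are leaf values, so the override list replaces the base list entirely
print(merged["notifications"])  # {'channels': ['#dev-alerts']}
```

If you ever need additive list semantics, make that an explicit, documented rule rather than a surprise.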
Config files handle structural settings, but you have three additional mechanisms for getting values into a pipeline at runtime: environment variables, CLI arguments, and secrets managers. Each has a specific role.
Passwords, API keys, and connection tokens should never live in YAML files, especially not ones that get committed to a repository. The standard approach is to read sensitive values from environment variables at runtime, and store them in a secrets manager or CI/CD secret store.
# pipeline/config_loader.py (continued)
import copy
import os


def inject_secrets(config: dict) -> dict:
    """
    Overlay secret values from environment variables into the config.
    Environment variables take precedence over any config file values.
    Expected env vars:
        ORDERS_DB_PASSWORD - Postgres connection password
        SNOWFLAKE_PASSWORD - Snowflake user password
        PRODUCTS_API_KEY   - Bearer token for enrichment API
        SLACK_WEBHOOK_URL  - Slack notification URL (optional)
    """
    secrets_map = {
        ("source", "password"): "ORDERS_DB_PASSWORD",
        ("destination", "password"): "SNOWFLAKE_PASSWORD",
        ("enrichment", "api_key"): "PRODUCTS_API_KEY",
        ("notifications", "slack_webhook_url"): "SLACK_WEBHOOK_URL",
    }
    # Deep copy so the caller's config (including its nested dicts) is never mutated
    result = copy.deepcopy(config)
    for key_path, env_var in secrets_map.items():
        value = os.environ.get(env_var)
        if value is not None:
            # Navigate to the correct nested key and set it
            d = result
            for part in key_path[:-1]:
                d = d.setdefault(part, {})
            d[key_path[-1]] = value
    return result
Security tip: Add a startup check that verifies required secrets are present before the pipeline does any real work. A missing SNOWFLAKE_PASSWORD that surfaces 45 minutes into a pipeline run is much more painful than one that fails immediately at launch.
def validate_required_secrets(config: dict) -> None:
    """Fail fast if required secrets are missing or empty."""
    required = [
        (("source", "password"), "ORDERS_DB_PASSWORD"),
        (("destination", "password"), "SNOWFLAKE_PASSWORD"),
        (("enrichment", "api_key"), "PRODUCTS_API_KEY"),
    ]
    missing = []
    for key_path, env_var in required:
        d = config
        try:
            for part in key_path:
                d = d[part]
        except (KeyError, TypeError):
            missing.append(env_var)
            continue
        if not d:  # present but empty, e.g. ORDERS_DB_PASSWORD=""
            missing.append(env_var)
    if missing:
        raise EnvironmentError(
            f"Required environment variables not set: {', '.join(missing)}"
        )
The pipeline entry point should accept the environment as a command-line argument, not read it from another environment variable (that gets confusing fast). Use Python's argparse for this:
# run_pipeline.py
import argparse
import logging

from pipeline.config_loader import load_config, inject_secrets, validate_required_secrets
from pipeline.extract import extract_orders
from pipeline.transform import transform_orders
from pipeline.load import load_to_warehouse


def parse_args():
    parser = argparse.ArgumentParser(
        description="Customer Orders ETL Pipeline",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument(
        "--env",
        required=True,
        choices=["dev", "staging", "production"],
        help="Target environment for this pipeline run",
    )
    parser.add_argument(
        "--lookback-days",
        type=int,
        default=None,
        help="Override the number of lookback days (useful for backfills)",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Run extraction and transformation but skip the load step",
    )
    return parser.parse_args()


def main():
    args = parse_args()

    # Load and merge config
    config = load_config(args.env)
    config = inject_secrets(config)
    validate_required_secrets(config)

    # Apply CLI overrides on top of file-based config
    if args.lookback_days is not None:
        config["source"]["lookback_days"] = args.lookback_days

    # Configure logging from config
    logging.basicConfig(
        level=getattr(logging, config["pipeline"]["log_level"]),
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    )
    logger = logging.getLogger("pipeline")
    logger.info(
        f"Starting {config['pipeline']['name']} v{config['pipeline']['version']} "
        f"in {args.env} environment"
    )

    # Run the pipeline
    raw_data = extract_orders(config)
    transformed_data = transform_orders(raw_data, config)
    if args.dry_run:
        logger.info(f"Dry run complete. {len(transformed_data)} rows would be loaded.")
    else:
        load_to_warehouse(transformed_data, config)


if __name__ == "__main__":
    main()
This layering — base config, environment overrides, secrets injection, CLI overrides — gives you a predictable precedence chain. The most specific source wins. That's a mental model your whole team can learn quickly.
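The whole precedence chain can be expressed as a left fold over the deep_merge function from earlier. In the minimal sketch below, all four layers are plain dicts standing in for the YAML files, inject_secrets, and the CLI flags. resolve is a hypothetical helper name, and the password value is a placeholder.

```python
from functools import reduce


def deep_merge(base: dict, override: dict) -> dict:
    """Same recursive merge as pipeline/config_loader.py."""
    result = base.copy()
    for key, value in override.items():
        if key in result and isinstance(result[key], dict) and isinstance(value, dict):
            result[key] = deep_merge(result[key], value)
        else:
            result[key] = value
    return result


def resolve(*layers: dict) -> dict:
    """Merge layers left to right, so later (more specific) layers win."""
    return reduce(deep_merge, layers, {})


base = {"source": {"host": "localhost", "lookback_days": 7}}   # base.yaml
env = {"source": {"host": "prod-postgres.internal"}}           # production.yaml
secrets = {"source": {"password": "placeholder-from-env"}}     # inject_secrets
cli = {"source": {"lookback_days": 30}}                        # --lookback-days 30

config = resolve(base, env, secrets, cli)
print(config["source"])
# {'host': 'prod-postgres.internal', 'lookback_days': 30, 'password': 'placeholder-from-env'}
```

run_pipeline.py applies the CLI override by direct assignment instead. The effect is the same; the fold just makes the ordering explicit in one place.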
Silently ignoring a missing or mistyped config key is one of the most frustrating bugs to track down in production. Your pipeline runs fine until it hits the code path that reads the bad key, and by then you may have already spent 20 minutes on partially processed data.
The cleanest way to enforce your config schema in Python is with pydantic. Define your configuration as typed models and validate on load:
# pipeline/config_schema.py
from enum import Enum
from typing import Literal

from pydantic import BaseModel, Field, field_validator


class WriteMode(str, Enum):
    append = "append"
    overwrite = "overwrite"
    merge = "merge"


class PipelineConfig(BaseModel):
    name: str
    version: str
    batch_size: int = Field(gt=0, le=1_000_000)
    max_retries: int = Field(ge=0, le=10)
    retry_delay_seconds: int = Field(ge=0)
    log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO"


class SourceConfig(BaseModel):
    type: Literal["postgres", "mysql"]
    host: str
    port: int = Field(gt=0, lt=65536)
    database: str
    schema: str  # pydantic v2 warns that this shadows BaseModel.schema(); it still works
    table: str
    lookback_days: int = Field(gt=0, le=365)
    password: str = ""  # Populated at runtime from env vars


class EnrichmentConfig(BaseModel):
    api_base_url: str
    request_timeout_seconds: int = Field(gt=0, le=300)
    rate_limit_calls_per_minute: int = Field(gt=0, le=6000)
    api_key: str = ""  # Populated at runtime from env vars

    @field_validator("api_base_url")
    @classmethod
    def must_be_https(cls, v: str) -> str:
        if not v.startswith("https://"):
            raise ValueError(f"api_base_url must use HTTPS. Got: {v}")
        return v


class DestinationConfig(BaseModel):
    type: Literal["snowflake", "bigquery", "redshift"]
    account: str
    database: str
    schema: str  # same shadowing warning as SourceConfig.schema
    table: str
    warehouse: str
    write_mode: WriteMode = WriteMode.append
    password: str = ""  # Populated at runtime from env vars


class NotificationConfig(BaseModel):
    slack_webhook_enabled: bool = False
    slack_webhook_url: str = ""
    alert_on_row_count_below: int = Field(ge=0)


class AppConfig(BaseModel):
    pipeline: PipelineConfig
    source: SourceConfig
    enrichment: EnrichmentConfig
    destination: DestinationConfig
    notifications: NotificationConfig


def validate_config(raw_config: dict) -> AppConfig:
    """
    Parse and validate raw config dict against the schema.
    Raises pydantic.ValidationError with detailed messages on failure.
    """
    return AppConfig.model_validate(raw_config)
Now update run_pipeline.py to validate before running:
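from pipeline.config_schema import validate_config, AppConfig

# Inside main(), replacing the dict-based setup:
config_dict = load_config(args.env)
config_dict = inject_secrets(config_dict)
validate_required_secrets(config_dict)
if args.lookback_days is not None:
    config_dict["source"]["lookback_days"] = args.lookback_days  # validated below too
config: AppConfig = validate_config(config_dict)  # Raises on bad config

# Access with type safety (the logging setup becomes config.pipeline.log_level)
batch_size = config.pipeline.batch_size  # int, guaranteed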
The payoff here is significant. Suppose someone accidentally sets batch_size: "ten thousand" in a YAML file. Pydantic catches it immediately, reporting pipeline.batch_size with a message like "Input should be a valid integer, unable to parse string as an integer". Suppose instead a new team member adds a staging config that points to an HTTP (not HTTPS) enrichment endpoint. The must_be_https validator fires before a single byte of data is processed.
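To see those failures concretely, the trimmed-down sketch below copies two of the lesson's models, keeping only the fields it exercises. For each input it reports pydantic v2's error type code. error_types is a hypothetical helper; the type strings (int_parsing, greater_than, value_error) are the codes that pydantic v2 puts in ValidationError.errors().

```python
from pydantic import BaseModel, Field, ValidationError, field_validator


class PipelineConfig(BaseModel):
    # Trimmed copy of the lesson's PipelineConfig: only the field we exercise
    batch_size: int = Field(gt=0, le=1_000_000)


class EnrichmentConfig(BaseModel):
    # Trimmed copy of the lesson's EnrichmentConfig
    api_base_url: str

    @field_validator("api_base_url")
    @classmethod
    def must_be_https(cls, v: str) -> str:
        if not v.startswith("https://"):
            raise ValueError(f"api_base_url must use HTTPS. Got: {v}")
        return v


def error_types(model: type[BaseModel], data: dict) -> list[str]:
    """Return the pydantic error type for each failing field (empty if valid)."""
    try:
        model.model_validate(data)
    except ValidationError as exc:
        return [err["type"] for err in exc.errors()]
    return []


print(error_types(PipelineConfig, {"batch_size": "ten thousand"}))  # ['int_parsing']
print(error_types(PipelineConfig, {"batch_size": 0}))               # ['greater_than']
print(error_types(EnrichmentConfig, {"api_base_url": "http://api.products.internal"}))  # ['value_error']
# Default (lax) mode coerces numeric strings, so this one passes
print(PipelineConfig.model_validate({"batch_size": "10000"}).batch_size)  # 10000
```

Note the last line. In its default lax mode, pydantic converts the numeric string "10000" to an int, so only genuinely non-numeric values fail.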
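Practical note: The code above targets pydantic v2. If your project is still on pydantic v1, use the v1 syntax (for example, @validator instead of @field_validator). Which syntax you need depends on the installed pydantic version, not on your Python version.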
The config object needs to flow through your pipeline cleanly. The most practical pattern is to pass the config (or the relevant sub-config) explicitly to each component function. Avoid module-level globals or singletons — they make testing painful.
# pipeline/extract.py
import logging
from datetime import datetime, timedelta

import pandas as pd
import psycopg2

from pipeline.config_schema import AppConfig

logger = logging.getLogger(__name__)


def extract_orders(config: AppConfig) -> pd.DataFrame:
    """
    Extract recent orders from the source Postgres database.
    Batch size and lookback window are driven entirely by config.
    """
    src = config.source
    pipeline = config.pipeline
    cutoff_date = datetime.utcnow() - timedelta(days=src.lookback_days)
    logger.info(
        f"Extracting orders from {src.host}/{src.database}.{src.schema}.{src.table} "
        f"since {cutoff_date.strftime('%Y-%m-%d')}, "
        f"batch size: {pipeline.batch_size:,}"
    )
    # Schema and table names can't be bound as query parameters, so they are
    # formatted in. They come from validated config, never from user input.
    query = """
        SELECT
            order_id,
            customer_id,
            product_id,
            quantity,
            unit_price,
            discount_amount,
            order_timestamp,
            status
        FROM {schema}.{table}
        WHERE order_timestamp >= %(cutoff)s
          AND status != 'TEST'
        ORDER BY order_timestamp
        LIMIT %(batch_size)s
    """.format(schema=src.schema, table=src.table)
    conn = psycopg2.connect(
        host=src.host,
        port=src.port,
        dbname=src.database,
        user="pipeline_user",  # username is not secret; could also be in config
        password=src.password,
    )
    try:
        df = pd.read_sql(
            query,
            conn,
            params={"cutoff": cutoff_date, "batch_size": pipeline.batch_size},
        )
    finally:
        # Close the connection even if the query fails
        conn.close()
    logger.info(f"Extracted {len(df):,} rows")
    return df
Notice that status != 'TEST' is hardcoded in the SQL — that's intentional. Filtering test orders is business logic, not a configurable behavior. The schema name, table name, lookback window, and batch size are all drawn from config. This is the boundary we talked about earlier.
If you're running pipelines in Airflow, you have an additional configuration layer to think about: Airflow Variables and Connections. The pattern that works best in practice is to use Airflow's environment-specific deployment (separate Airflow instances for dev/staging/prod, or namespaced Variables), and pass the environment name into your DAG at runtime.
# dags/customer_orders_dag.py
from datetime import datetime, timedelta, timezone

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator

# Pull environment from Airflow Variables, not hardcoded
ENVIRONMENT = Variable.get("pipeline_environment", default_var="dev")


def run_extraction(**context):
    # Import inside the callable so parsing the DAG file stays lightweight
    from pipeline.config_loader import load_config, inject_secrets
    from pipeline.config_schema import validate_config
    from pipeline.extract import extract_orders

    config_dict = load_config(ENVIRONMENT)
    config_dict = inject_secrets(config_dict)
    config = validate_config(config_dict)

    # Handle backfill: use the run's logical date as the reference point.
    # logical_date is timezone-aware, so compare it against an aware "now".
    logical_date = context["logical_date"]
    days_since_run = (datetime.now(timezone.utc) - logical_date).days
    if days_since_run > config.source.lookback_days:
        config.source.lookback_days = days_since_run + 1

    df = extract_orders(config)

    # Store row count for downstream tasks to validate
    context["ti"].xcom_push(key="extracted_row_count", value=len(df))

    # Serialize to parquet and store path in XCom for the next task
    temp_path = f"/tmp/orders_{logical_date.strftime('%Y%m%d')}.parquet"
    df.to_parquet(temp_path, index=False)
    context["ti"].xcom_push(key="staging_file_path", value=temp_path)


default_args = {
    "owner": "data-engineering",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "email_on_failure": True,
    "email": ["data-alerts@yourcompany.com"],
}

with DAG(
    dag_id="customer_orders_etl",
    default_args=default_args,
    schedule="0 6 * * *",  # 6 AM UTC daily (Airflow 2.4+; older versions use schedule_interval)
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["orders", "etl", ENVIRONMENT],
) as dag:
    # Airflow 2 passes the context to **context automatically; provide_context is gone
    extract_task = PythonOperator(
        task_id="extract_orders",
        python_callable=run_extraction,
    )
    # ... additional tasks for transform and load
Airflow tip: Don't call Variable.get() at module level in production DAGs on large Airflow deployments, because it queries the metadata database every time the scheduler parses the DAG file. Instead, call it inside the callable function, or set the value in the DAG's params and access it via the context. The example above is simplified for readability.
Let's put this together. You're going to build a parameterized pipeline that processes daily website clickstream data. It reads from a CSV file (simulating an S3 download), filters and aggregates the data, and writes a summary to a SQLite database (simulating a data warehouse).
Setup: Create the project structure shown above. Install dependencies:
pip install pyyaml pydantic pandas
Step 1: Create your base config.
# config/base.yaml
pipeline:
  name: "clickstream_aggregator"
  version: "1.0.0"
  batch_size: 50000
  log_level: "INFO"
source:
  type: "csv"
  input_directory: "./data/raw"
  file_pattern: "clicks_*.csv"
  lookback_days: 1
destination:
  type: "sqlite"
  database_path: "./data/warehouse.db"
  table: "daily_page_views"
  write_mode: "append"
filtering:
  exclude_bot_traffic: true
  minimum_session_duration_seconds: 5
Step 2: Create dev and production overrides.
# config/dev.yaml
pipeline:
  log_level: "DEBUG"
  batch_size: 1000
source:
  input_directory: "./data/dev/raw"
  lookback_days: 3
destination:
  database_path: "./data/dev/warehouse.db"
  table: "daily_page_views_dev"
  write_mode: "overwrite"
# config/production.yaml
source:
  input_directory: "/mnt/data/clickstream/raw"
destination:
  database_path: "/mnt/data/warehouse/analytics.db"
Step 3: Implement the config loader using the load_config and deep_merge functions from earlier in this lesson.
Step 4: Write the pipeline components. Your extract.py should read CSV files from config.source.input_directory matching config.source.file_pattern. Your transform.py should apply the filtering settings from config.filtering. Your load.py should write to the SQLite path in config.destination.database_path.
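If you'd like a starting point for transform.py, here is one possible sketch. The exercise doesn't define the CSV layout, so the column names (event_date, page_url, session_duration_seconds, is_bot) are hypothetical. The sketch also reads the filtering section as a plain dict, because the AppConfig schema from earlier has no filtering model; for the clickstream project you would add one.

```python
import pandas as pd


def transform_clicks(raw: pd.DataFrame, config: dict) -> pd.DataFrame:
    """Apply config["filtering"], then count page views per day and page.

    Assumes hypothetical columns: event_date, page_url,
    session_duration_seconds, is_bot (bool).
    """
    filtering = config["filtering"]
    df = raw
    if filtering["exclude_bot_traffic"]:
        df = df[~df["is_bot"]]
    # The threshold comes from config; the aggregation itself is business logic in code
    df = df[df["session_duration_seconds"] >= filtering["minimum_session_duration_seconds"]]
    return (
        df.groupby(["event_date", "page_url"], as_index=False)
        .size()
        .rename(columns={"size": "page_views"})
    )


raw = pd.DataFrame(
    {
        "event_date": ["2024-01-01"] * 4,
        "page_url": ["/home", "/home", "/pricing", "/home"],
        "session_duration_seconds": [30, 2, 12, 40],
        "is_bot": [False, False, False, True],
    }
)
summary = transform_clicks(
    raw, {"filtering": {"exclude_bot_traffic": True, "minimum_session_duration_seconds": 5}}
)
print(summary)
```

Because the filter thresholds come in through the config argument, your tests can hand in any combination directly without touching YAML files.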
Step 5: Test your config isolation. Run:
python run_pipeline.py --env dev
python run_pipeline.py --env production --dry-run
python run_pipeline.py --env dev --lookback-days 7
Verify that dev runs write to the dev database, that the dry run skips the load step, and that the lookback override changes the date range of your query.
Challenge extension: Add a --config-override CLI argument that accepts key=value pairs in dot notation (e.g., --config-override pipeline.batch_size=2000) and applies them on top of everything else. This is genuinely useful for one-off backfills.
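Here is one way to approach the challenge, sketched under a few assumptions. Overrides are collected with argparse's action="append". Each value is parsed as JSON when possible (so 2000 becomes an int and true a bool) and otherwise kept as a raw string. Missing intermediate sections are created on the fly. apply_override is a hypothetical name.

```python
import argparse
import json


def apply_override(config: dict, assignment: str) -> None:
    """Apply one dotted 'section.key=value' assignment to config, in place."""
    key_path, sep, raw_value = assignment.partition("=")
    if not sep or not key_path:
        raise ValueError(f"Expected KEY=VALUE, got: {assignment!r}")
    # Parse as JSON so 2000 -> int and true -> bool; otherwise keep the raw string
    try:
        value = json.loads(raw_value)
    except json.JSONDecodeError:
        value = raw_value
    *parents, leaf = key_path.split(".")
    node = config
    for part in parents:
        node = node.setdefault(part, {})  # create missing sections as needed
    node[leaf] = value


parser = argparse.ArgumentParser()
parser.add_argument(
    "--config-override",
    action="append",
    default=[],
    metavar="KEY=VALUE",
    help="Dotted config override, e.g. pipeline.batch_size=2000 (repeatable)",
)
args = parser.parse_args(
    ["--config-override", "pipeline.batch_size=2000",
     "--config-override", "destination.write_mode=overwrite"]
)

config = {"pipeline": {"batch_size": 50000}, "destination": {"write_mode": "append"}}
for assignment in args.config_override:
    apply_override(config, assignment)
print(config)
# {'pipeline': {'batch_size': 2000}, 'destination': {'write_mode': 'overwrite'}}
```

Apply these overrides to the raw dict before schema validation. That way a bad override such as pipeline.batch_size=-1 is rejected like any other config error.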
The mistake: Using {**base_config, **env_config} to merge configs. If your environment YAML overrides a single nested key, the entire parent dictionary gets replaced.
The symptom: You add write_mode: overwrite to dev.yaml under destination, and suddenly your pipeline can't find destination.schema anymore.
The fix: Always use a recursive deep merge, as shown in the deep_merge function above. Write a unit test that confirms a single nested key override doesn't destroy sibling keys.
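Below is a minimal pytest-style version of that test. deep_merge is inlined so the file runs standalone; in the project you would import it from pipeline.config_loader instead. The second test documents the failure mode itself, which makes the intent obvious to whoever reads the suite later.

```python
def deep_merge(base: dict, override: dict) -> dict:
    """Inlined copy of pipeline.config_loader.deep_merge so this file runs standalone."""
    result = base.copy()
    for key, value in override.items():
        if key in result and isinstance(result[key], dict) and isinstance(value, dict):
            result[key] = deep_merge(result[key], value)
        else:
            result[key] = value
    return result


def test_nested_override_keeps_sibling_keys():
    base = {"destination": {"schema": "ORDERS", "table": "CUSTOMER_ORDERS", "write_mode": "append"}}
    override = {"destination": {"write_mode": "overwrite"}}
    merged = deep_merge(base, override)
    # The overridden leaf changes; its siblings survive
    assert merged["destination"] == {
        "schema": "ORDERS",
        "table": "CUSTOMER_ORDERS",
        "write_mode": "overwrite",
    }
    # The merge must not modify the base dict
    assert base["destination"]["write_mode"] == "append"


def test_shallow_merge_drops_sibling_keys():
    base = {"destination": {"schema": "ORDERS", "write_mode": "append"}}
    override = {"destination": {"write_mode": "overwrite"}}
    shallow = {**base, **override}
    # A shallow merge replaces the whole destination dict, losing schema
    assert shallow["destination"] == {"write_mode": "overwrite"}
```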
The mistake: Putting a real password in dev.yaml because "it's just dev." Dev configs get committed, repos get cloned to new machines, and secrets proliferate.
The fix: Use environment variables for all secrets without exception. Check your .gitignore to make sure .env files are excluded. Scan your repo with a tool like git-secrets or truffleHog as part of your CI pipeline.
The mistake: Using config.get("batch_size", 10000) everywhere instead of a schema validator. When batch_size is misspelled as batch-size in a YAML file, you silently use the default and your pipeline runs at a tenth of the expected speed. No errors, just slow and inexplicable behavior.
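The fix: Validate your config against a schema immediately after loading. By default, pydantic ignores unknown keys. Setting extra="forbid" in the model config makes it reject them, so a misspelled batch-size fails loudly. (Pydantic's strict mode is a different feature: it controls type coercion, not unknown keys.) If a key is optional with a default, make that explicit in the schema rather than scattering dict.get() calls throughout the codebase.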
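Here is a short sketch of the extra="forbid" approach in pydantic v2. In the lesson's schema, batch_size has no default, so a misspelling there would already fail as a missing field. The risk described above applies to fields that do have defaults, which is what this sketch models. StrictModel is a hypothetical shared base class. Subclasses inherit its model_config, so every section model built on it rejects unknown keys.

```python
from pydantic import BaseModel, ConfigDict, Field, ValidationError


class StrictModel(BaseModel):
    """Shared base: unknown keys are an error, not silently ignored."""
    model_config = ConfigDict(extra="forbid")


class PipelineConfig(StrictModel):
    batch_size: int = Field(default=10000, gt=0)


class LenientPipelineConfig(BaseModel):
    # Default pydantic behaviour (extra="ignore"), for comparison
    batch_size: int = Field(default=10000, gt=0)


raw = {"batch-size": 500000}  # typo: hyphen instead of underscore

# The lenient model silently drops the typo and falls back to the default
print(LenientPipelineConfig.model_validate(raw).batch_size)  # 10000

try:
    PipelineConfig.model_validate(raw)
except ValidationError as exc:
    err = exc.errors()[0]
    print(err["type"], err["loc"])  # extra_forbidden ('batch-size',)
```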
The mistake: Your test suite reads from environment variables or the current ENV setting, which means tests behave differently on a developer's laptop versus CI.
The fix: In your tests, always construct config explicitly:
# tests/test_transform.py
import pandas as pd

from pipeline.config_schema import validate_config
from pipeline.transform import transform_orders


def get_test_config():
    """Build a minimal config for testing; never reads from disk or env vars."""
    return {
        "pipeline": {
            "name": "test_pipeline",
            "batch_size": 100,
            "log_level": "WARNING",
            # ... other required fields
        },
        # ...
    }


def test_transform_excludes_bot_traffic():
    config = validate_config(get_test_config())
    input_df = pd.DataFrame({...})  # synthetic test data
    result = transform_orders(input_df, config)
    assert not result["is_bot"].any()
The mistake: Relying on YAML's implicit typing for unquoted values. Writing log_level: DEBUG (no quotes) is harmless, because YAML parses it as the string "DEBUG". The same habit bites when someone writes port: 05432, which YAML 1.1 parsers such as PyYAML read as the octal number 2842.
The fix: Quote strings that could be ambiguous. Port numbers, version strings, and anything that looks like it could be a number should be unambiguous. Pydantic validation will catch type errors downstream, but explicit YAML is easier to read.
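You can see the implicit typing directly with PyYAML, which follows YAML 1.1 rules. Other parsers (YAML 1.2-based ones in particular) may resolve some of these values differently, which is one more reason to quote.

```python
import yaml

# Unquoted scalars get implicit types; quoted ones stay strings.
doc = """
port: 05432
version: 2.10
enabled: yes
quoted_port: "05432"
quoted_version: "2.10"
"""

data = yaml.safe_load(doc)
print(data)
# {'port': 2842, 'version': 2.1, 'enabled': True, 'quoted_port': '05432', 'quoted_version': '2.10'}
```

Note that version: 2.10 silently becomes the float 2.1, which is exactly the kind of drift a quoted "2.10" avoids.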
The mistake: Over time, your environment configs diverge. Staging has settings that production doesn't, production has tuning parameters that nobody documented. Eventually, "works in staging" stops meaning anything.
The fix: Keep environment config files minimal — only genuine overrides from base. Review them in code review the same way you'd review application code. Add a CI check that validates all environment configs can be successfully loaded and pass schema validation.
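Here is a minimal sketch of that CI check. It takes the loader and validator as arguments, so in the real job you would pass the lesson's load_config and validate_config. The stand-ins below just let the sketch run on its own. check_environments, fake_load, and fake_validate are hypothetical names.

```python
from typing import Callable, Iterable


def check_environments(
    environments: Iterable[str],
    load: Callable[[str], dict],
    validate: Callable[[dict], object],
) -> dict[str, str]:
    """Load and validate each environment; return {env: error} for every failure."""
    failures: dict[str, str] = {}
    for env in environments:
        try:
            validate(load(env))
        except Exception as exc:  # broad on purpose: report every broken env, not just the first
            failures[env] = f"{type(exc).__name__}: {exc}"
    return failures


# Stand-ins for load_config / validate_config so the sketch runs on its own
def fake_load(env: str) -> dict:
    if env == "staging":
        raise FileNotFoundError("config/staging.yaml not found")
    return {"batch_size": 1000 if env == "dev" else -1}


def fake_validate(config: dict) -> dict:
    if config["batch_size"] <= 0:
        raise ValueError("batch_size must be > 0")
    return config


print(check_environments(["dev", "staging", "production"], fake_load, fake_validate))
# {'staging': 'FileNotFoundError: config/staging.yaml not found', 'production': 'ValueError: batch_size must be > 0'}
```

In CI you would call check_environments(["dev", "staging", "production"], load_config, validate_config) and exit non-zero when the result is non-empty. Because secrets default to empty strings in the schema, this check doesn't need real credentials.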
You now have a complete mental model and implementation pattern for config-driven pipelines. Let's recap the key principles:
Separate concerns cleanly. Pipeline logic lives in code. Environment-specific behavior lives in config. Secrets live in environment variables and secrets managers, never in files.
Layer your configuration. Base defaults, environment overrides, runtime injection. Each layer only specifies what it owns. The most specific source wins.
Validate early and loudly. A schema validator like Pydantic transforms mysterious runtime failures into clear, immediate error messages at startup. This pays dividends every single time a config file is edited.
Pass config explicitly. Thread your config object through your pipeline components as a function argument. It makes testing trivial and makes the data flow obvious.
Keep environment files minimal. The more you override in an environment file, the harder it is to understand how that environment actually differs from base. Override only what genuinely changes.
With parameterized pipelines under your belt, the natural next topics to explore are:
Secrets management at scale: Integrating with HashiCorp Vault, AWS Secrets Manager, or GCP Secret Manager instead of relying on environment variables set manually. The config injection pattern you learned here makes this a straightforward swap.
Dynamic configuration in Airflow: Using Airflow's Params API to pass runtime configuration through to task operators without hardcoding it in the DAG file.
Configuration versioning and auditing: Storing a hash or snapshot of the config used for each pipeline run alongside the run's output, so you can always answer "what config produced this data?"
Feature flags for data pipelines: Using a lightweight feature flag system (LaunchDarkly, Unleash, or even a simple database table) to control pipeline behavior without deployments — particularly useful for gradual rollouts of new transformation logic.
The ability to confidently promote a pipeline from dev through staging to production — knowing that the only things changing are the config, not the code — is one of the most tangible signs of a mature data engineering practice. You've now got the foundation to build exactly that.
Learning Path: Data Pipeline Fundamentals