
It's 2:47 AM and your on-call phone rings. The data pipeline that feeds your company's revenue dashboard has been down for nearly three hours. You dig into the logs and find it: a database password that expired at midnight, hardcoded in a config file that nobody remembered to update. The pipeline was silently failing, writing nothing, while leadership prepared for a board meeting that starts in four hours. You've just learned, at significant personal cost, why secrets management is a first-class engineering concern, not an afterthought.
This scenario plays out constantly in data engineering teams of every size. Credentials get committed to Git repositories, stored in plaintext environment variables, shared over Slack, or rotated manually on a calendar reminder that someone inevitably misses. The result is a class of production incidents that are entirely preventable but remain stubbornly common, because the tooling and patterns for doing this correctly aren't always obvious when you're moving fast.
By the end of this lesson, you will have a production-grade approach to managing credentials in data pipelines — from the initial storage of a secret through automated rotation, to integration with orchestration systems like Airflow and dbt. You'll build real code that reads secrets dynamically at runtime, handles rotation gracefully without downtime, and alerts you when something goes wrong before it becomes an incident.
What you'll learn:
You should be comfortable with:
Before we get into tooling, let's be precise about what we're actually protecting against. The security concerns are well-documented elsewhere, so we'll focus on the operational failures that secrets mismanagement causes in data pipelines specifically.
Rotation friction causes outages. When credentials live in config files or environment variables, rotating a password means coordinating a deployment. Someone has to update the secret value, redeploy the service, and verify everything works — ideally without downtime. In practice, teams avoid this friction, so passwords stay active far beyond their intended lifespan. When a security team eventually forces rotation, the pipeline breaks.
Sprawl makes incident response painful. A single database password might exist in your Airflow connection config, a dbt profiles.yml, three Glue job scripts, a Lambda function, and a local .env file on two developers' laptops. When that password needs to rotate, you're doing archaeology. The more copies a secret has, the larger your blast radius.
Static secrets can't be audited meaningfully. When every pipeline run uses the same credential, you can't answer "which pipeline accessed the warehouse at 3 AM and ran that expensive query?" because they all look identical. Dynamic, short-lived credentials solve this — each pipeline run gets a unique credential, and your audit log becomes meaningful.
The goal of modern secrets management is not just security — it's operational agility: the ability to rotate credentials at any time without downtime, understand who accessed what, and reduce the operational cost of credential lifecycle management.
You have three realistic choices for production data pipelines, and the right answer depends on where your infrastructure lives.
If your pipelines run on AWS (ECS, EKS, Lambda, Glue, or even EC2), AWS Secrets Manager is the path of least resistance. It integrates natively with IAM, supports automatic rotation for RDS databases, and has a straightforward API.
Strengths:
Weaknesses:
Vault is the gold standard for multi-cloud or on-premises environments. Its killer feature is dynamic secrets: Vault connects to your database and generates a unique username and password for each pipeline run, with a configurable TTL. After the TTL expires, the credential is automatically revoked.
Strengths:
Weaknesses:
The GCP equivalent of AWS Secrets Manager. If you're on BigQuery, Dataflow, or Cloud Composer, this is the natural choice.
Strengths:
Weaknesses:
For the rest of this lesson, we'll use AWS Secrets Manager for static-credential examples and Vault dynamic secrets for the rotation patterns, since these represent the two most common real-world scenarios. The patterns translate directly to GCP Secret Manager and other backends.
Before writing code, let's establish what a well-structured secret looks like. A common mistake is storing a single password string as the secret value. Instead, store a JSON object with all the connection details:
{
"engine": "postgresql",
"host": "analytics-db.cluster-abc123.us-east-1.rds.amazonaws.com",
"port": 5432,
"dbname": "analytics",
"username": "pipeline_user",
"password": "s3cr3t-value-here",
"ssl_mode": "require"
}
Storing the full connection spec in one secret gives you several advantages: you can update the host or port alongside the password during migrations without touching pipeline code, and your secret-fetching code has a clean, stable interface regardless of what changes underneath.
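To make the "clean, stable interface" concrete, here is a minimal sketch of parsing and validating that JSON layout before a pipeline uses it. The `WarehouseSecret` class and `REQUIRED_FIELDS` tuple are hypothetical names introduced for illustration; only the field names come from the example secret above.

```python
import json
from dataclasses import dataclass

# Field names mirror the example secret; the class itself is hypothetical.
REQUIRED_FIELDS = ("engine", "host", "port", "dbname", "username", "password")


@dataclass(frozen=True, repr=False)
class WarehouseSecret:
    engine: str
    host: str
    port: int
    dbname: str
    username: str
    password: str
    ssl_mode: str = "require"

    @classmethod
    def from_json(cls, secret_string: str) -> "WarehouseSecret":
        data = json.loads(secret_string)
        missing = [name for name in REQUIRED_FIELDS if name not in data]
        if missing:
            # Report field names only, never field values
            raise ValueError(f"Secret is missing fields: {', '.join(missing)}")
        return cls(
            engine=data["engine"],
            host=data["host"],
            port=int(data["port"]),
            dbname=data["dbname"],
            username=data["username"],
            password=data["password"],
            ssl_mode=data.get("ssl_mode", "require"),
        )

    def __repr__(self) -> str:
        # Keep the password out of logs and tracebacks
        return (
            f"WarehouseSecret(host={self.host!r}, "
            f"dbname={self.dbname!r}, username={self.username!r})"
        )
```

Failing fast on a missing field turns a confusing connection error deep inside a task into an obvious configuration error at fetch time.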
In AWS Secrets Manager, create this secret via the CLI:
aws secretsmanager create-secret \
--name "prod/analytics-pipeline/warehouse-db" \
--description "PostgreSQL credentials for the analytics data warehouse" \
--secret-string '{
"engine": "postgresql",
"host": "analytics-db.cluster-abc123.us-east-1.rds.amazonaws.com",
"port": 5432,
"dbname": "analytics",
"username": "pipeline_user",
"password": "s3cr3t-value-here",
"ssl_mode": "require"
}' \
--region us-east-1
Name your secrets with a consistent hierarchy: {environment}/{service}/{resource}. This isn't just organizational tidiness — it lets you write IAM policies that grant access to all secrets in prod/analytics-pipeline/* without listing individual secret ARNs.
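As a rough illustration of how the naming hierarchy pays off, the sketch below builds an IAM policy document that grants read access to everything under one prefix. The function name `build_read_policy` and the 12-digit account ID are placeholders, not values from a real account.

```python
import json


def build_read_policy(region: str, account_id: str, prefix: str) -> dict:
    """Return an IAM policy document allowing reads of every secret under prefix."""
    resource = f"arn:aws:secretsmanager:{region}:{account_id}:secret:{prefix}/*"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "secretsmanager:GetSecretValue",
                    "secretsmanager:DescribeSecret",
                ],
                "Resource": resource,
            }
        ],
    }


# Placeholder account ID; substitute your own
policy = build_read_policy("us-east-1", "123456789012", "prod/analytics-pipeline")
print(json.dumps(policy, indent=2))
```

A new secret created under prod/analytics-pipeline/ is covered automatically, so adding a data source does not require a policy change.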
Rather than calling boto3 inline everywhere a pipeline needs a credential, build a dedicated secret client module. This gives you a single place to add caching, error handling, and fallback logic.
# secrets_client.py
import json
import logging
import time
from typing import Any
from urllib.parse import quote_plus

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class SecretsManagerClient:
    """
    A caching wrapper around AWS Secrets Manager that handles
    version-aware retrieval and provides connection-dict output
    suitable for direct use with SQLAlchemy or psycopg2.
    """

    def __init__(
        self,
        region_name: str = "us-east-1",
        cache_ttl_seconds: int = 300,
    ):
        self._client = boto3.client("secretsmanager", region_name=region_name)
        # Maps "name:stage" to (secret value, time it was cached)
        self._cache: dict[str, tuple[Any, float]] = {}
        self._cache_ttl = cache_ttl_seconds

    def get_secret(
        self,
        secret_name: str,
        version_stage: str = "AWSCURRENT",
        force_refresh: bool = False,
    ) -> dict:
        """
        Retrieve a secret by name. Results are cached for cache_ttl_seconds
        to avoid hammering the Secrets Manager API on every pipeline task.
        During credential rotation, pass version_stage="AWSPENDING" to
        retrieve the new credential before it becomes AWSCURRENT.
        """
        cache_key = f"{secret_name}:{version_stage}"
        if not force_refresh and cache_key in self._cache:
            cached_value, cached_at = self._cache[cache_key]
            if time.monotonic() - cached_at < self._cache_ttl:
                logger.debug("Returning cached secret for %s", secret_name)
                return cached_value
        try:
            logger.info("Fetching secret %s (stage=%s)", secret_name, version_stage)
            response = self._client.get_secret_value(
                SecretId=secret_name,
                VersionStage=version_stage,
            )
        except ClientError as e:
            error_code = e.response["Error"]["Code"]
            if error_code == "ResourceNotFoundException":
                raise ValueError(f"Secret '{secret_name}' not found") from e
            elif error_code == "AccessDeniedException":
                raise PermissionError(
                    f"IAM role lacks access to secret '{secret_name}'"
                ) from e
            else:
                # For transient errors, log and re-raise so the caller
                # can implement retry logic at the orchestration layer
                logger.error(
                    "Unexpected error fetching secret %s: %s",
                    secret_name,
                    error_code,
                )
                raise
        secret_value = json.loads(response["SecretString"])
        self._cache[cache_key] = (secret_value, time.monotonic())
        return secret_value

    def get_connection_url(self, secret_name: str) -> str:
        """
        Returns a SQLAlchemy-compatible connection URL built from secret fields.
        Useful for direct use with pandas.read_sql or SQLAlchemy create_engine.
        """
        s = self.get_secret(secret_name)
        # URL-encode the username and password so characters such as
        # "@", ":" or "/" cannot corrupt the URL
        user = quote_plus(s["username"])
        password = quote_plus(s["password"])
        return (
            f"{s['engine']}+psycopg2://{user}:{password}"
            f"@{s['host']}:{s['port']}/{s['dbname']}"
            f"?sslmode={s.get('ssl_mode', 'require')}"
        )

    def invalidate_cache(self, secret_name: str) -> None:
        """Call this when you know a rotation has occurred."""
        # Match on "name:" so that invalidating "prod/db" does not also
        # drop entries for a different secret such as "prod/db-replica"
        prefix = f"{secret_name}:"
        keys_to_remove = [k for k in self._cache if k.startswith(prefix)]
        for key in keys_to_remove:
            del self._cache[key]
        logger.info("Invalidated cache for %s", secret_name)


# Module-level singleton: import this in your pipeline code
secrets = SecretsManagerClient()
A few design decisions worth explaining:
The TTL cache matters more than you think. In a complex Airflow DAG with 50 tasks, each task that needs a database connection will call get_secret. Without caching, that's at least 50 API calls per DAG run. AWS Secrets Manager charges per API call, and more importantly, it has rate limits. Keep in mind that this cache lives in process memory, so it only saves calls within a single process; tasks that run in separate worker processes each make their own first fetch. A 5-minute TTL is usually safe: short enough that a rotation is picked up within one DAG run interval, long enough to avoid rate limit issues.
force_refresh is your escape hatch. When your rotation Lambda explicitly tells your pipeline "I just rotated, invalidate your cache," the force_refresh=True parameter ensures the pipeline picks up the new credential immediately rather than serving stale data for up to 5 minutes.
Distinguish error types explicitly. ResourceNotFoundException and AccessDeniedException are configuration errors — retrying won't fix them, and they need different remediation. Raising them as ValueError and PermissionError respectively lets your orchestration layer handle them differently from transient network failures.
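One way a caller might act on that distinction is sketched below: a small retry helper that backs off on transient failures but re-raises ValueError and PermissionError immediately. The helper name `fetch_with_retry` and its parameters are hypothetical; the `sleep` argument exists only so the delay can be stubbed out in tests.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

# Configuration errors: retrying cannot fix these
NON_RETRYABLE = (ValueError, PermissionError)


def fetch_with_retry(
    fetch: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fetch(), retrying with exponential backoff on transient errors only."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return fetch()
        except NON_RETRYABLE:
            # Missing secret or missing permission: fail immediately
            raise
        except Exception:
            if attempt == attempts:
                raise
            # Delays grow as base_delay, 2 * base_delay, 4 * base_delay, ...
            sleep(base_delay * 2 ** (attempt - 1))
```

With the client from this lesson, a call might look like `fetch_with_retry(lambda: secrets.get_secret("prod/analytics-pipeline/warehouse-db"))`. In Airflow you may prefer to rely on task-level retries instead and keep the fetch itself simple.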
Airflow has its own secrets backend system, and wiring it to AWS Secrets Manager means your connections and variables are fetched dynamically at task execution time — no Airflow metadata database, no plaintext in your DAG code.
In your airflow.cfg or via environment variables:
[secrets]
backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
backend_kwargs = {"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables", "region_name": "us-east-1"}
With this configuration, when your DAG references conn_id="warehouse_postgres", Airflow looks up airflow/connections/warehouse_postgres in Secrets Manager and constructs the connection object dynamically. The secret format needs to match Airflow's expected schema:
{
"conn_type": "postgres",
"host": "analytics-db.cluster-abc123.us-east-1.rds.amazonaws.com",
"schema": "analytics",
"login": "pipeline_user",
"password": "s3cr3t-value-here",
"port": 5432,
"extra": "{\"sslmode\": \"require\"}"
}
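If you already store the warehouse credential in the connection-spec layout from earlier in this lesson, a small adapter can produce the Airflow-shaped JSON. This is a sketch under stated assumptions: the function name `to_airflow_connection` and the `ENGINE_TO_CONN_TYPE` mapping are hypothetical, and the output fields simply mirror the example above.

```python
import json

# Assumed mapping from the "engine" field to Airflow's conn_type value
ENGINE_TO_CONN_TYPE = {"postgresql": "postgres"}


def to_airflow_connection(secret: dict) -> dict:
    """Map the warehouse secret layout onto the connection fields shown above."""
    extra = {"sslmode": secret.get("ssl_mode", "require")}
    return {
        "conn_type": ENGINE_TO_CONN_TYPE[secret["engine"]],
        "host": secret["host"],
        "schema": secret["dbname"],
        "login": secret["username"],
        "password": secret["password"],
        "port": int(secret["port"]),
        # The example stores "extra" as a JSON-encoded string
        "extra": json.dumps(extra),
    }


warehouse_secret = {
    "engine": "postgresql",
    "host": "analytics-db.cluster-abc123.us-east-1.rds.amazonaws.com",
    "port": 5432,
    "dbname": "analytics",
    "username": "pipeline_user",
    "password": "s3cr3t-value-here",
    "ssl_mode": "require",
}
airflow_conn = to_airflow_connection(warehouse_secret)
```

Keeping two copies of one credential reintroduces sprawl, so treat this as a migration aid rather than a permanent arrangement.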
Here's a realistic DAG that loads Stripe payment data into a PostgreSQL warehouse. Notice how credentials are never touched directly in the DAG code:
# dags/stripe_to_warehouse.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import task
import logging

logger = logging.getLogger(__name__)

default_args = {
    "owner": "data-engineering",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,
}

with DAG(
    dag_id="stripe_payments_to_warehouse",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@hourly",
    default_args=default_args,
    catchup=False,
    tags=["payments", "stripe", "ingestion"],
) as dag:

    @task()
    def extract_stripe_events(execution_date=None):
        """
        Fetch Stripe payment events. The Stripe API key is stored in
        Secrets Manager at prod/stripe/api-key and fetched here at
        task runtime, not at DAG parse time.
        """
        from secrets_client import secrets
        import stripe

        stripe_config = secrets.get_secret("prod/stripe/api-key")
        stripe.api_key = stripe_config["api_key"]
        start_ts = int(execution_date.timestamp())
        end_ts = int((execution_date + timedelta(hours=1)).timestamp())
        events = stripe.Event.list(
            type="payment_intent.succeeded",
            created={"gte": start_ts, "lt": end_ts},
            limit=100,
        )
        return [e.to_dict() for e in events.auto_paging_iter()]

    @task()
    def load_events_to_staging(events: list):
        """
        Load extracted events into the staging table.
        Connection 'warehouse_postgres' is resolved by Airflow's
        SecretsManagerBackend, so there are no credentials in this code.
        """
        if not events:
            logger.info("No events to load for this execution window")
            return 0
        hook = PostgresHook(postgres_conn_id="warehouse_postgres")
        records = [
            (
                event["id"],
                event["data"]["object"]["amount"],
                event["data"]["object"]["currency"],
                event["created"],
                event["data"]["object"].get("customer"),
            )
            for event in events
        ]
        hook.insert_rows(
            table="staging.stripe_payment_events",
            rows=records,
            target_fields=["event_id", "amount_cents", "currency", "event_ts", "customer_id"],
            replace=True,
            replace_index="event_id",
        )
        logger.info("Loaded %d events to staging", len(records))
        return len(records)

    events = extract_stripe_events()
    load_events_to_staging(events)
Important: Notice that from secrets_client import secrets happens inside the task function body, not at the module level. This is critical. Airflow parses DAG files frequently to discover tasks. If you import and call secrets.get_secret() at module level, every DAG parse triggers an API call, often hundreds of times per minute. Import and use secrets only inside task callables.
dbt reads database credentials from profiles.yml, which many teams check into Git. Don't do this. Instead, use environment variables that are populated from your secrets manager at runtime.
The correct profiles.yml uses Jinja template syntax to reference environment variables:
# profiles.yml (safe to commit to Git)
analytics:
  target: "{{ env_var('DBT_TARGET', 'dev') }}"
  outputs:
    prod:
      type: postgres
      host: "{{ env_var('DBT_HOST') }}"
      port: "{{ env_var('DBT_PORT', '5432') | int }}"
      user: "{{ env_var('DBT_USER') }}"
      password: "{{ env_var('DBT_PASSWORD') }}"
      dbname: "{{ env_var('DBT_DBNAME') }}"
      schema: "{{ env_var('DBT_SCHEMA', 'public') }}"
      sslmode: require
      threads: 4
    dev:
      type: postgres
      host: localhost
      port: 5432
      user: "{{ env_var('DBT_USER', 'dev_user') }}"
      password: "{{ env_var('DBT_PASSWORD', 'dev_password') }}"
      dbname: analytics_dev
      schema: "{{ env_var('DBT_USER', 'dev') }}"
      threads: 2
Then write a wrapper script that populates those environment variables from Secrets Manager before invoking dbt:
#!/usr/bin/env python3
# run_dbt.py: used by CI/CD and Airflow to execute dbt with live credentials
import json
import os
import subprocess
import sys

import boto3
from botocore.exceptions import ClientError


def inject_secrets_to_env(secret_name: str, region: str = "us-east-1") -> None:
    """
    Fetch warehouse credentials from Secrets Manager and export them
    as environment variables for the dbt process.
    """
    client = boto3.client("secretsmanager", region_name=region)
    try:
        response = client.get_secret_value(SecretId=secret_name)
        creds = json.loads(response["SecretString"])
    except ClientError as e:
        print(f"ERROR: Failed to fetch secret '{secret_name}': {e}", file=sys.stderr)
        sys.exit(1)

    # Map secret fields to the env vars dbt profiles.yml expects
    env_mapping = {
        "host": "DBT_HOST",
        "port": "DBT_PORT",
        "username": "DBT_USER",
        "password": "DBT_PASSWORD",
        "dbname": "DBT_DBNAME",
    }
    for secret_key, env_var in env_mapping.items():
        if secret_key in creds:
            os.environ[env_var] = str(creds[secret_key])
    os.environ["DBT_TARGET"] = os.getenv("ENVIRONMENT", "prod")
    os.environ["DBT_SCHEMA"] = os.getenv("DBT_SCHEMA", "public")


def main():
    secret_name = os.environ.get(
        "WAREHOUSE_SECRET_NAME",
        "prod/analytics-pipeline/warehouse-db"
    )
    print(f"Fetching credentials from: {secret_name}")
    inject_secrets_to_env(secret_name)

    # Pass all remaining arguments directly to dbt
    dbt_command = ["dbt"] + sys.argv[1:]
    print(f"Running: {' '.join(dbt_command)}")
    result = subprocess.run(dbt_command, check=False)
    sys.exit(result.returncode)


if __name__ == "__main__":
    main()
Usage in CI/CD or an Airflow BashOperator:
python run_dbt.py run --select staging.+ --target prod
The credentials live in memory only for the duration of the process and are never written to disk or logs.
This is where most teams get it wrong. Naive rotation looks like this:
This approach has a window of failure. If any pipeline reads the old password from cache between the database update and the Secrets Manager update, it fails. The correct approach is dual-credential rotation, which AWS Secrets Manager's built-in rotation implements natively.
AWS Secrets Manager rotation Lambdas implement this protocol, and understanding it helps you build robust pipelines:
Phase 1 (createSecret): Generate a new credential. Write it to the secret as AWSPENDING, alongside the existing AWSCURRENT. The database still only accepts the old credential.
Phase 2 (setSecret): Apply the new credential to the database. Whether the old credential keeps working depends on the rotation strategy. A PostgreSQL role holds a single password, so ALTER USER ... PASSWORD replaces the old one immediately. To keep both credentials valid at once, rotate between two database users (the alternating-users strategy), so the new password is set on the second user while the first keeps working.
Phase 3 (testSecret): Verify the AWSPENDING credential actually works by opening a real database connection with it.
Phase 4 (finishSecret): Promote AWSPENDING to AWSCURRENT. The old version becomes AWSPREVIOUS. With alternating users, its credential remains valid until the next rotation reuses that user.
With alternating users, running pipelines that read AWSCURRENT continue working during this entire process: they get the old credential until Phase 4, and the new credential after. Pipelines that cache credentials for up to 5 minutes have a grace period during which AWSPREVIOUS is still valid. With single-user rotation there is no such grace period, so a pipeline holding the old password must refresh its credential and retry after an authentication failure.
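The movement of staging labels across the four phases can be easier to follow in a toy model. The sketch below is an in-memory simplification, not the real service: the `SecretVersions` class and its method names are hypothetical, and it tracks only which version each label points to.

```python
class SecretVersions:
    """Toy in-memory model of staging labels; not the real Secrets Manager."""

    def __init__(self, initial_value: str):
        self.values = {"v1": initial_value}
        self.stages = {"AWSCURRENT": "v1"}

    def put_pending(self, version_id: str, value: str) -> None:
        # createSecret: a retry with the same version ID must not
        # overwrite the value that was already generated
        if self.stages.get("AWSPENDING") == version_id:
            return
        self.values[version_id] = value
        self.stages["AWSPENDING"] = version_id

    def promote_pending(self) -> None:
        # finishSecret: move AWSCURRENT to the pending version and
        # keep the old version reachable as AWSPREVIOUS
        pending = self.stages.pop("AWSPENDING", None)
        if pending is None:
            return  # already promoted; safe to call again on retry
        self.stages["AWSPREVIOUS"] = self.stages["AWSCURRENT"]
        self.stages["AWSCURRENT"] = pending

    def get(self, stage: str = "AWSCURRENT") -> str:
        return self.values[self.stages[stage]]


versions = SecretVersions("old-password")
versions.put_pending("v2", "new-password")   # Phase 1
during_rotation = versions.get()             # readers still see the old value
versions.promote_pending()                   # Phase 4
after_rotation = versions.get()
```

Readers that ask for AWSCURRENT never see a half-finished state: the label moves in a single step once the pending credential has been applied and tested.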
For databases that aren't natively supported by AWS Secrets Manager (Snowflake, for example), you write your own rotation Lambda:
# rotation_lambda.py: deploys as a Lambda function triggered by Secrets Manager
import json
import logging

import boto3
import snowflake.connector
from botocore.exceptions import ClientError

logger = logging.getLogger()
logger.setLevel(logging.INFO)

secrets_client = boto3.client("secretsmanager")


def lambda_handler(event, context):
    """
    Entry point for AWS Secrets Manager rotation.
    The event contains the secret ARN, the rotation step, and a token
    identifying this specific rotation attempt.
    """
    arn = event["SecretId"]
    token = event["ClientRequestToken"]
    step = event["Step"]

    # Verify the secret exists and the token matches a pending version
    metadata = secrets_client.describe_secret(SecretId=arn)
    if not metadata.get("RotationEnabled"):
        raise ValueError(f"Secret {arn} is not configured for rotation")
    versions = metadata.get("VersionIdsToStages", {})
    if token not in versions:
        raise ValueError(f"Token {token} not found in secret versions")
    if "AWSCURRENT" in versions[token]:
        logger.info("Token %s is already AWSCURRENT: rotation complete", token)
        return
    if "AWSPENDING" not in versions[token]:
        raise ValueError(f"Token {token} is not AWSPENDING")

    # Dispatch to the appropriate phase handler
    handlers = {
        "createSecret": create_secret,
        "setSecret": set_secret,
        "testSecret": test_secret,
        "finishSecret": finish_secret,
    }
    if step not in handlers:
        raise ValueError(f"Unknown rotation step: {step}")
    handlers[step](arn, token)


def create_secret(arn: str, token: str) -> None:
    """Phase 1: Generate a new password and write it as AWSPENDING."""
    try:
        # If AWSPENDING already exists (e.g., Lambda retried), use it as-is
        secrets_client.get_secret_value(SecretId=arn, VersionStage="AWSPENDING")
        logger.info("AWSPENDING already exists, skipping creation")
        return
    except ClientError as e:
        if e.response["Error"]["Code"] != "ResourceNotFoundException":
            raise

    current = json.loads(
        secrets_client.get_secret_value(SecretId=arn, VersionStage="AWSCURRENT")[
            "SecretString"
        ]
    )
    # Generate a new password with Secrets Manager's password generator
    # to get a cryptographically secure, policy-compliant password
    new_password = secrets_client.get_random_password(
        PasswordLength=32,
        ExcludeCharacters='/@"\' ',  # Exclude chars that break connection strings
        RequireEachIncludedType=True,
    )["RandomPassword"]
    pending_secret = {**current, "password": new_password}
    secrets_client.put_secret_value(
        SecretId=arn,
        ClientRequestToken=token,
        SecretString=json.dumps(pending_secret),
        VersionStages=["AWSPENDING"],
    )
    logger.info("Created AWSPENDING secret for %s", arn)


def set_secret(arn: str, token: str) -> None:
    """Phase 2: Apply the new password to Snowflake."""
    current = _get_secret(arn, "AWSCURRENT")
    pending = _get_secret(arn, "AWSPENDING")

    # Connect using the CURRENT credential (we know it works)
    conn = snowflake.connector.connect(
        account=current["account"],
        user=current["admin_user"],  # A separate admin user with ALTER USER privilege
        password=current["admin_password"],
        warehouse=current["warehouse"],
    )
    try:
        cursor = conn.cursor()
        cursor.execute(
            f"ALTER USER {pending['username']} SET PASSWORD = %s",
            (pending["password"],),
        )
        logger.info("Updated Snowflake password for user %s", pending["username"])
    finally:
        conn.close()


def test_secret(arn: str, token: str) -> None:
    """Phase 3: Verify the AWSPENDING credential actually connects."""
    pending = _get_secret(arn, "AWSPENDING")
    try:
        conn = snowflake.connector.connect(
            account=pending["account"],
            user=pending["username"],
            password=pending["password"],
            warehouse=pending["warehouse"],
            database=pending["database"],
        )
        conn.cursor().execute("SELECT CURRENT_USER()")
        conn.close()
        logger.info("AWSPENDING credential test passed")
    except Exception as e:
        logger.error("AWSPENDING credential test FAILED: %s", e)
        raise


def finish_secret(arn: str, token: str) -> None:
    """Phase 4: Promote AWSPENDING to AWSCURRENT."""
    metadata = secrets_client.describe_secret(SecretId=arn)
    current_version = next(
        version_id
        for version_id, stages in metadata["VersionIdsToStages"].items()
        if "AWSCURRENT" in stages
    )
    if current_version == token:
        logger.info("Version %s is already AWSCURRENT", token)
        return
    secrets_client.update_secret_version_stage(
        SecretId=arn,
        VersionStage="AWSCURRENT",
        MoveToVersionId=token,
        RemoveFromVersionId=current_version,
    )
    logger.info("Promoted %s to AWSCURRENT", token)


def _get_secret(arn: str, stage: str) -> dict:
    response = secrets_client.get_secret_value(SecretId=arn, VersionStage=stage)
    return json.loads(response["SecretString"])
Key insight: The rotation Lambda must be idempotent. Secrets Manager may call each phase multiple times if the Lambda times out or errors. That's why create_secret checks whether AWSPENDING already exists before generating a new password; otherwise, a retry would create a second new password and invalidate the one already set in the database.
Short-lived tasks (< 5 minutes) are generally fine: they read the credential at start and usually finish before a rotation can affect them. Long-running pipelines (bulk historical loads, large Spark jobs) need explicit rotation awareness.
The pattern is to use a connection factory rather than a single long-lived connection:
# connection_factory.py
import logging
import time

import psycopg2
from psycopg2.extras import execute_values

from secrets_client import secrets

logger = logging.getLogger(__name__)


class RotationAwareConnectionFactory:
    """
    Creates new database connections that automatically refresh credentials
    if a rotation has occurred since the last connection was opened.
    Use this for batch jobs that run longer than the rotation schedule.
    """

    def __init__(
        self,
        secret_name: str,
        connection_refresh_interval: int = 3600,  # Refresh connection every hour
    ):
        self._secret_name = secret_name
        self._refresh_interval = connection_refresh_interval
        self._conn = None
        self._conn_opened_at = 0.0

    def get_connection(self) -> psycopg2.extensions.connection:
        """
        Return an active connection. If the connection is older than
        refresh_interval, close it and open a new one with fresh credentials.
        """
        now = time.monotonic()
        age = now - self._conn_opened_at
        if self._conn is None or self._conn.closed or age > self._refresh_interval:
            if self._conn and not self._conn.closed:
                self._conn.close()
                logger.info("Closed connection aged %.0f seconds", age)
            # Force-refresh the secret cache to pick up any rotation
            secrets.invalidate_cache(self._secret_name)
            cred = secrets.get_secret(self._secret_name)
            self._conn = psycopg2.connect(
                host=cred["host"],
                port=cred["port"],
                dbname=cred["dbname"],
                user=cred["username"],
                password=cred["password"],
                sslmode=cred.get("ssl_mode", "require"),
                connect_timeout=10,
            )
            self._conn_opened_at = now
            logger.info("Opened fresh connection with credentials from %s", self._secret_name)
        return self._conn

    def _run_batch(self, sql: str, batch: list) -> int:
        """Open (or reuse) a connection, run one batch, and commit."""
        conn = self.get_connection()
        with conn.cursor() as cur:
            execute_values(cur, sql, batch)
            rowcount = cur.rowcount
        conn.commit()
        return rowcount

    def execute_batch(self, sql: str, batch: list) -> int:
        """Execute a batch with automatic retry on authentication failure."""
        try:
            # Connecting happens inside the try block, because that is
            # where an authentication failure would be raised
            return self._run_batch(sql, batch)
        except psycopg2.OperationalError as e:
            message = str(e).lower()
            if "authentication" in message or "password" in message:
                logger.warning(
                    "Authentication error: credential may have rotated. "
                    "Forcing refresh and retrying."
                )
                # Drop the connection so the retry reconnects
                self._conn = None
                self._conn_opened_at = 0.0
                # Retry once with fresh credentials
                return self._run_batch(sql, batch)
            raise
You're going to build a complete credential rotation test harness for a hypothetical analytics pipeline. This exercise can run locally with LocalStack or against real AWS if you have an account.
Install dependencies:
pip install boto3 localstack awscli-local psycopg2-binary
Start LocalStack (Docker required):
docker run --rm -d \
-p 4566:4566 \
-e SERVICES=secretsmanager \
--name localstack \
localstack/localstack
Step 1: Create a secret in LocalStack:
awslocal secretsmanager create-secret \
--name "prod/exercise/warehouse-db" \
--secret-string '{
"engine": "postgresql",
"host": "localhost",
"port": 5432,
"dbname": "analytics",
"username": "pipeline_user",
"password": "initial-password-abc123",
"ssl_mode": "disable"
}'
Step 2: Write a script that:
# exercise_rotation.py
import json
import boto3
import time

# Point the client at LocalStack
client = boto3.client(
    "secretsmanager",
    region_name="us-east-1",
    endpoint_url="http://localhost:4566",
    aws_access_key_id="test",
    aws_secret_access_key="test",
)

SECRET_NAME = "prod/exercise/warehouse-db"


def read_current_password() -> str:
    response = client.get_secret_value(SecretId=SECRET_NAME)
    return json.loads(response["SecretString"])["password"]


def simulate_rotation(new_password: str) -> None:
    """
    Simulate what the rotation Lambda does:
    write new value as AWSPENDING, then promote to AWSCURRENT.
    """
    # Get current secret content
    current = json.loads(
        client.get_secret_value(
            SecretId=SECRET_NAME, VersionStage="AWSCURRENT"
        )["SecretString"]
    )

    # Write pending version
    pending_secret = {**current, "password": new_password}
    put_response = client.put_secret_value(
        SecretId=SECRET_NAME,
        SecretString=json.dumps(pending_secret),
        VersionStages=["AWSPENDING"],
    )
    pending_version_id = put_response["VersionId"]
    print(f"Wrote AWSPENDING version: {pending_version_id}")

    # Simulate test phase (in reality you'd connect to the DB here)
    print("Testing pending credential... (simulated) OK")

    # Promote AWSPENDING to AWSCURRENT
    metadata = client.describe_secret(SecretId=SECRET_NAME)
    current_version_id = next(
        vid
        for vid, stages in metadata["VersionIdsToStages"].items()
        if "AWSCURRENT" in stages
    )
    client.update_secret_version_stage(
        SecretId=SECRET_NAME,
        VersionStage="AWSCURRENT",
        MoveToVersionId=pending_version_id,
        RemoveFromVersionId=current_version_id,
    )
    print(f"Promoted {pending_version_id} to AWSCURRENT")


if __name__ == "__main__":
    print("=== Credential Rotation Exercise ===")
    password_before = read_current_password()
    print(f"Password before rotation: {password_before}")
    simulate_rotation("rotated-password-xyz789")
    password_after = read_current_password()
    print(f"Password after rotation: {password_after}")
    assert password_before != password_after, "Rotation failed: password unchanged!"
    print("\n✓ Rotation successful. Your pipeline client would now fetch the new credential.")
Step 3: Modify the SecretsManagerClient from earlier to point at LocalStack by passing endpoint_url="http://localhost:4566", then verify that:
invalidate_cache(), the new credential is returned
This exercise mirrors exactly what happens during production rotation. Implement it correctly here, and the production version is just a matter of removing the LocalStack endpoint.
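If you want to rehearse the cache checks before wiring up LocalStack, the behavior can be sketched with a fake backend. Everything here is a stand-in: `TTLSecretCache` is a hypothetical, stripped-down version of the caching logic, and the `backend` dict plays the role of Secrets Manager.

```python
import time
from typing import Callable


class TTLSecretCache:
    """Minimal TTL cache with an injectable fetch function and clock."""

    def __init__(
        self,
        fetch: Callable[[str], dict],
        ttl_seconds: float = 300.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: dict = {}

    def get(self, name: str) -> dict:
        entry = self._entries.get(name)
        if entry is not None and self._clock() - entry[1] < self._ttl:
            return entry[0]  # still fresh: serve the cached copy
        value = self._fetch(name)
        self._entries[name] = (value, self._clock())
        return value

    def invalidate(self, name: str) -> None:
        self._entries.pop(name, None)


NAME = "prod/exercise/warehouse-db"
# Fake backend standing in for Secrets Manager
backend = {NAME: {"password": "initial-password-abc123"}}
cache = TTLSecretCache(lambda name: dict(backend[name]))

before = cache.get(NAME)["password"]
backend[NAME]["password"] = "rotated-password-xyz789"  # simulated rotation
stale = cache.get(NAME)["password"]   # within the TTL: still the old value
cache.invalidate(NAME)
fresh = cache.get(NAME)["password"]   # after invalidation: the new value
```

The same three reads (before rotation, after rotation within the TTL, after invalidation) are what you would repeat against LocalStack with the real client.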
Symptom: Airflow scheduler CPU spikes. Secrets Manager rate limit errors in logs (ThrottlingException).
Cause: secrets.get_secret() called at module level in DAG files. Airflow parses every DAG file every 30 seconds by default.
Fix: Move all secret fetches inside task callables. If you need the secret to construct an operator argument, use a deferred approach or store it as an Airflow Variable (which is also backed by Secrets Manager if configured correctly).
Symptom: The database password was updated, but Secrets Manager still shows the old value as AWSCURRENT. Pipelines eventually fail when the DBA manually cleans up the orphaned database user.
Cause: The rotation Lambda's testSecret phase failed (usually a network timeout), but the database password was already changed in setSecret. The rotation was rolled back at the Secrets Manager level, but the database doesn't know that.
Fix: Always implement testSecret with comprehensive validation. Log failures verbosely. Consider adding a CloudWatch alarm on rotation Lambda error rates. For critical pipelines, keep the AWSPREVIOUS credential valid for at least 24 hours by not removing it until you've confirmed the AWSCURRENT credential has been used successfully.
Symptom: AccessDeniedException in pipeline logs, but the IAM policy shows secretsmanager:GetSecretValue is allowed.
Cause: The IAM policy grants access to the exact secret name, arn:aws:secretsmanager:us-east-1:123456789:secret:prod/analytics-pipeline/warehouse-db, but the actual secret ARN includes a random 6-character suffix: arn:aws:secretsmanager:us-east-1:123456789:secret:prod/analytics-pipeline/warehouse-db-AbCdEf. The resource ARN in the policy must account for this suffix.
Fix: End the resource ARN with a wildcard that covers the suffix, for example arn:aws:secretsmanager:us-east-1:123456789:secret:prod/analytics-pipeline/warehouse-db-?????? for a single secret, or arn:aws:secretsmanager:us-east-1:123456789:secret:prod/* for the whole prefix. Alternatively, use a condition key that matches on the secret name rather than the full ARN.
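To see why the suffix matters, the sketch below checks a few resource patterns against a full secret ARN. It uses Python's fnmatch as a rough stand-in for IAM's `*` and `?` wildcards, which is an approximation rather than the real policy evaluator, and the 12-digit account ID is a placeholder.

```python
from fnmatch import fnmatchcase

ACCOUNT = "123456789012"  # placeholder account ID
BASE = f"arn:aws:secretsmanager:us-east-1:{ACCOUNT}:secret:"

# Full ARN as the service reports it, including the random suffix
actual_arn = BASE + "prod/analytics-pipeline/warehouse-db-AbCdEf"

patterns = {
    "exact name, no suffix": BASE + "prod/analytics-pipeline/warehouse-db",
    "name plus six-character suffix": BASE + "prod/analytics-pipeline/warehouse-db-??????",
    "whole prefix": BASE + "prod/*",
}

# fnmatchcase approximates IAM wildcard matching for these simple patterns
results = {
    label: fnmatchcase(actual_arn, pattern)
    for label, pattern in patterns.items()
}

for label, matched in results.items():
    print(f"{label}: {'match' if matched else 'no match'}")
```

Only the pattern without any wildcard fails to match, which is the situation that produces the AccessDeniedException described above.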
Symptom: Passwords visible in CloudWatch Logs, Datadog, or Airflow task logs.
Cause: logger.debug("Fetched secret: %s", secret_dict) — the entire dict gets serialized.
Fix: Never log secret values. Log the secret name and version ID only. Add a pre-commit hook that scans for patterns like password, secret_string, or api_key appearing as log arguments.
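A complementary safeguard is to mask sensitive fields before a secret dict ever reaches a logger. The sketch below is one possible approach; the `redact` helper and the `SENSITIVE_KEYS` set are hypothetical, and the key list would need to match your own secret layouts.

```python
import logging

logger = logging.getLogger(__name__)

# Assumed set of field names that must never appear in logs
SENSITIVE_KEYS = {"password", "admin_password", "api_key", "secret_string", "token"}
MASK = "***REDACTED***"


def redact(secret: dict) -> dict:
    """Return a copy that is safe to log: sensitive values are masked."""
    return {
        key: MASK if key.lower() in SENSITIVE_KEYS else value
        for key, value in secret.items()
    }


secret_dict = {
    "host": "analytics-db.cluster-abc123.us-east-1.rds.amazonaws.com",
    "username": "pipeline_user",
    "password": "s3cr3t-value-here",
}
safe = redact(secret_dict)
logger.debug("Fetched secret fields: %s", safe)
```

The original dict is left untouched, so the pipeline can still connect with it; only the copy passed to the logger is masked.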
Symptom: You haven't rotated credentials in 18 months because you're not sure if the rotation Lambda works, and the security team is now requiring 90-day rotation.
Fix: Treat credential rotation like a database migration — it needs to be tested in staging before running in production. Use the LocalStack exercise above to build confidence. Then run a manual rotation in your staging environment during business hours with engineers watching, before you enable automated rotation on the production schedule.
You've covered a substantial amount of ground. Let's consolidate what you now know how to do:
You can evaluate and choose a secrets backend based on your infrastructure: AWS Secrets Manager for AWS-native pipelines, Vault for multi-cloud or on-premises, and GCP Secret Manager for GCP workloads. The choice isn't just about features — it's about where your IAM lives and who manages the operational overhead.
You've built a production-grade Python secrets client with TTL caching, explicit error handling, and a cache invalidation mechanism. This pattern reduces API calls, handles transient errors gracefully, and gives you a single place to add observability.
You understand the four-phase rotation protocol and why it matters. Zero-downtime rotation isn't magic — it's a carefully sequenced set of operations that keeps both old and new credentials valid simultaneously during the transition window. You've seen how to implement this for a non-native database like Snowflake.
You've wired secrets into Airflow and dbt in a way that avoids credentials in DAG code, config files, or Git history. Your pipelines now fetch credentials dynamically at runtime, which means rotation happens transparently.
You have a troubleshooting playbook for the five most common secrets management failures — the kinds of things that cause 3 AM incidents.
Dynamic secrets with Vault: If you're ready to move beyond static credential rotation to genuinely ephemeral credentials, explore Vault's database secrets engine. Your pipeline gets a unique username/password that expires in 15 minutes, making credential compromise nearly meaningless.
IRSA and Workload Identity: For Kubernetes-based pipelines, investigate IAM Roles for Service Accounts (AWS) or Workload Identity Federation (GCP) to eliminate static credentials entirely. The pipeline's identity is its Kubernetes service account, and credentials are never stored at all.
Secrets scanning in CI/CD: Tools like detect-secrets, truffleHog, and GitHub's built-in secret scanning prevent credentials from ever reaching Git. Add this to your pipeline CI as a mandatory gate.
Centralized secrets audit logging: With credentials managed centrally, you can build a meaningful audit trail. CloudTrail logs every GetSecretValue call with the IAM principal, timestamp, and source IP. Build a dashboard that surfaces anomalies: unexpected principals, unusual hours, high call volumes from a single pipeline run.
The 3 AM incident at the beginning of this lesson is preventable. The tools exist, the patterns are proven, and you now have the knowledge to implement them.