
You're three weeks into a new role as a data engineer when your manager drops a Slack message that ruins your afternoon: "Hey, the daily revenue report shows $4.2M yesterday but finance says it should be $2.1M. Can you look into it?" You dig through the logs and find it immediately — the ETL job failed halfway through at 2 AM, the scheduler retried it, and the second run inserted the same transactions again. Every row got written twice. Exactly doubled. You fix it in an hour, but the damage is done: a finance team that no longer trusts the pipeline, an incident report to write, and a nagging feeling that this could happen again anywhere else in the system.
This scenario plays out in production data engineering constantly. The cause is almost always the same: pipelines designed around the happy path, where retries, partial failures, and out-of-order processing were treated as edge cases instead of core design requirements. The technical term for what went wrong is a violation of exactly-once semantics — the guarantee that each record in a pipeline is processed and delivered exactly one time, regardless of retries, restarts, or partial failures. The architectural pattern that makes exactly-once semantics achievable is idempotency — designing your operations so that applying them multiple times produces the same result as applying them once.
By the end of this lesson, you'll understand how exactly-once semantics actually work at an engineering level, why "at-least-once with idempotent consumers" is often a better practical guarantee than true exactly-once, and how to design every layer of a production pipeline — ingestion, transformation, and load — so that retries are safe, partial failures are recoverable, and your on-call rotation doesn't get paged at 2 AM because a scheduler retried at the wrong moment.
What you'll learn:
You should be comfortable with:
Before we write a single line of code, we need to be precise about what we're trying to guarantee, because the term "exactly-once" is notoriously abused in marketing material and engineering blogs alike.
There are three standard delivery guarantees (usually called delivery semantics) in distributed systems:
At-most-once: A message is delivered zero or one times. You prioritize availability and low latency over completeness. If something goes wrong, you skip it and move on. This is appropriate for telemetry you're sampling anyway — if you lose 0.1% of click events in an A/B test, the statistical significance probably doesn't change. It is catastrophically inappropriate for financial transactions.
At-least-once: A message is delivered one or more times. You guarantee nothing gets lost, but you accept that duplicates may occur. This is the default behavior of almost every message queue and most ETL frameworks when you implement basic retry logic. The practical implication: your consumers must handle duplicates, which means they need to be idempotent.
Exactly-once: A message is delivered exactly one time. No loss, no duplicates. This sounds like the obvious goal, but achieving it requires coordination across systems that may not support it, and that coordination has a real cost in latency and complexity.
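Here's the uncomfortable truth: exactly-once delivery over a network is impossible in the strict sense. A sender can never tell whether a missing acknowledgment means the message was lost or only the acknowledgment was. What is achievable is exactly-once effects. That requires coordination such as two-phase commit or an equivalent transactional protocol, which carries significant performance and operational costs. What modern systems like Kafka call "exactly-once" is more precisely "effectively-once". Three mechanisms combine to emulate exactly-once processing: idempotent producers, transactions that commit output records and consumer offsets together, and consumers reading with read_committed isolation. The distinction matters when you're debugging.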
The pragmatic approach taken by most production data engineering teams is: implement at-least-once delivery at the infrastructure layer, and guarantee idempotency at the application layer. The net effect is exactly-once semantics from the perspective of your data consumers, at a fraction of the complexity cost.
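The difference between these guarantees comes down to when a consumer commits its position relative to when it writes output. A toy Python simulation makes that concrete. It is a sketch under simplifying assumptions, not any real broker's API, and the log contents and function names are hypothetical. The consumer crashes once at a chosen offset and restarts from its last committed offset. It writes either to an append-only sink (plain INSERT semantics) or to an upsert sink keyed by event ID.

```python
from typing import Callable, Optional

# Toy log of (offset, event_id, amount); contents are illustrative
LOG = [(0, "txn-1", 100), (1, "txn-2", 250), (2, "txn-3", 75)]


def consume(commit_first: bool, crash_at: Optional[int],
            write: Callable[[str, int], None]) -> None:
    """Replay LOG, simulating one crash-and-restart at offset crash_at."""
    committed = 0   # durable consumer offset: survives the simulated crash
    crashed = False
    while committed < len(LOG):
        offset, event_id, amount = LOG[committed]
        crash_now = offset == crash_at and not crashed
        if commit_first:
            committed = offset + 1        # at-most-once: commit, then process
            if crash_now:
                crashed = True            # crash before writing: the message is lost
                continue
            write(event_id, amount)
        else:
            write(event_id, amount)       # at-least-once: process, then commit
            if crash_now:
                crashed = True            # crash before committing: redelivered on restart
                continue
            committed = offset + 1


def total_revenue(commit_first: bool, crash_at: Optional[int], idempotent: bool) -> int:
    appended: list[tuple[str, int]] = []  # plain INSERT semantics
    upserted: dict[str, int] = {}         # upsert keyed by event_id

    def write(event_id: str, amount: int) -> None:
        if idempotent:
            upserted[event_id] = amount
        else:
            appended.append((event_id, amount))

    consume(commit_first, crash_at, write)
    return sum(upserted.values()) if idempotent else sum(a for _, a in appended)


print(total_revenue(commit_first=True, crash_at=1, idempotent=False))   # 175: at-most-once loses txn-2
print(total_revenue(commit_first=False, crash_at=1, idempotent=False))  # 675: at-least-once double-counts txn-2
print(total_revenue(commit_first=False, crash_at=1, idempotent=True))   # 425: idempotent sink absorbs the retry
```

With no crash, every variant totals 425. The crash is what separates them, and only at-least-once delivery paired with an idempotent sink matches the exactly-once total.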
An operation is idempotent if applying it multiple times produces the same result as applying it once. This is a mathematical property borrowed from abstract algebra, but its practical meaning in data engineering is straightforward: if your pipeline runs twice, your data should look like it ran once.
The canonical example of a non-idempotent operation is an INSERT:
```sql
-- Non-idempotent: running this twice doubles your data
INSERT INTO daily_revenue (date, product_id, revenue)
VALUES ('2024-01-15', 'PROD-8821', 14235.50);
```
Run that statement twice, you get two rows. Run it ten times during debugging, you get ten rows. The operation doesn't know its own history.
The canonical example of an idempotent operation is an UPSERT (also called MERGE):
```sql
-- Idempotent: running this ten times produces one row
MERGE INTO daily_revenue AS target
USING (
    SELECT '2024-01-15' AS date, 'PROD-8821' AS product_id, 14235.50 AS revenue
) AS source
ON target.date = source.date AND target.product_id = source.product_id
WHEN MATCHED THEN
    UPDATE SET revenue = source.revenue
WHEN NOT MATCHED THEN
    INSERT (date, product_id, revenue)
    VALUES (source.date, source.product_id, source.revenue);
```
Run that ten times. You get one row with the correct value. This is what idempotency looks like in practice.
But idempotency is subtle. Consider aggregations. If you're calculating revenue by summing transaction amounts, an upsert on the pre-computed aggregate is idempotent — you're replacing the computed value. But an UPDATE daily_revenue SET revenue = revenue + 14235.50 is absolutely not idempotent, because each execution compounds. The distinction between replacing a value and accumulating a value is one of the most common sources of bugs in pipeline design.
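The replace-versus-accumulate distinction is easy to check mechanically: run the same step several times and compare. Here is a minimal sketch with a Python dict standing in for the daily_revenue table. The function names are illustrative, not from any library.

```python
# Toy table keyed by (date, product_id); function names are illustrative
def upsert_revenue(table: dict, date: str, product_id: str, revenue: float) -> None:
    table[(date, product_id)] = revenue  # replace: idempotent


def accumulate_revenue(table: dict, date: str, product_id: str, amount: float) -> None:
    key = (date, product_id)
    table[key] = table.get(key, 0.0) + amount  # accumulate: NOT idempotent


replaced: dict = {}
accumulated: dict = {}
for _ in range(3):  # the scheduler retries the same step three times
    upsert_revenue(replaced, "2024-01-15", "PROD-8821", 14235.50)
    accumulate_revenue(accumulated, "2024-01-15", "PROD-8821", 14235.50)

print(replaced[("2024-01-15", "PROD-8821")])     # 14235.5
print(accumulated[("2024-01-15", "PROD-8821")])  # 42706.5
```

A cheap unit test for any pipeline step follows the same shape: apply it once, apply it twice, and assert that the two resulting states are equal.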
To implement any upsert or deduplication strategy, you need a key that uniquely identifies a logical record. Sometimes this key exists naturally in your source data. A transaction ID from a payment processor, an order ID from an e-commerce platform, or a device serial number from an IoT sensor are all natural idempotency keys — they uniquely identify the business event, not just the row in your database.
When natural keys don't exist or aren't reliable, you need to construct a surrogate idempotency key — a deterministic hash of the record's business content. "Deterministic" is the operative word: given the same input data, you must always produce the same key.
```python
import hashlib
import json


def generate_idempotency_key(record: dict, key_fields: list[str]) -> str:
    """
    Generate a deterministic idempotency key from specific fields.

    We sort both the key_fields and the resulting dict to ensure
    consistent ordering across different Python versions and environments.
    The key must be stable: the same logical event must always
    produce the same hash.
    """
    key_data = {field: record[field] for field in sorted(key_fields)}
    # JSON serialize with sorted keys for determinism.
    # separators=(',', ':') eliminates whitespace variation;
    # default=str handles non-JSON types such as dates and Decimals.
    canonical_form = json.dumps(key_data, sort_keys=True, separators=(',', ':'), default=str)
    return hashlib.sha256(canonical_form.encode('utf-8')).hexdigest()


# Example: an e-commerce order line item
order_line = {
    "order_id": "ORD-2024-88821",
    "product_sku": "WIDGET-PRO-XL",
    "quantity": 3,
    "unit_price": 49.99,
    "warehouse_id": "WH-EAST-02",
    "created_at": "2024-01-15T14:32:11Z",
    "customer_id": "CUST-44892"
}

# Key fields are the fields that uniquely identify the business event
idempotency_key = generate_idempotency_key(
    order_line,
    key_fields=["order_id", "product_sku", "warehouse_id"]
)
print(idempotency_key)
# A 64-character hex SHA-256 digest, identical on every run for this input
```
A few critical design decisions here that are easy to get wrong:
Don't include mutable fields in your key. If unit_price can be updated after the order is placed (due to price corrections), including it in the key means the same logical event produces different keys before and after the correction. Your key should uniquely identify the event identity, not its current state.
Don't include timestamps generated at processing time. inserted_at, processed_at, or pipeline_run_id fields should never be part of an idempotency key. They're different every time you run, which defeats the entire purpose.
Do include source system identifiers. If you're ingesting from multiple order management systems into a unified warehouse, your key should include a source system identifier. ORD-88821 from System A and ORD-88821 from System B are different events.
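The three rules above can be verified directly. This sketch mirrors the canonicalization of generate_idempotency_key from earlier. The key-field choice, the source_system field, and the record values are hypothetical, chosen to show a price correction and a reprocessing timestamp leaving the key unchanged while a different source system changes it.

```python
import hashlib
import json


def idempotency_key(record: dict, key_fields: list[str]) -> str:
    # Same canonicalization as generate_idempotency_key: sorted keys, no whitespace
    key_data = {f: record[f] for f in sorted(key_fields)}
    canonical = json.dumps(key_data, sort_keys=True, separators=(",", ":"), default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


# Hypothetical key choice: source system + business identifiers only.
# No mutable fields (unit_price) and no processing-time fields (processed_at).
KEY_FIELDS = ["source_system", "order_id", "product_sku"]

original = {
    "source_system": "oms-a", "order_id": "ORD-88821", "product_sku": "WIDGET-PRO-XL",
    "unit_price": 49.99, "processed_at": "2024-01-15T02:00:00Z",
}
# Same logical event after a price correction, reprocessed later
corrected = {**original, "unit_price": 44.99, "processed_at": "2024-01-15T02:07:00Z"}
# Same order ID, different source system: a different event
other_system = {**original, "source_system": "oms-b"}

print(idempotency_key(original, KEY_FIELDS) == idempotency_key(corrected, KEY_FIELDS))     # True
print(idempotency_key(original, KEY_FIELDS) == idempotency_key(other_system, KEY_FIELDS))  # False
```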
For batch pipelines writing to columnar data warehouses (Snowflake, BigQuery, Redshift, Delta Lake), the most robust idempotency pattern isn't upserts — it's partition-based overwrite. The idea is simple and powerful: define your pipeline's output in terms of discrete, non-overlapping partitions, and always fully replace a partition rather than merging into it.
The mental model: instead of thinking "add today's new records to the table," think "compute today's complete state and replace yesterday's computation entirely."
```python
from datetime import date, timedelta

from google.cloud import bigquery


def run_daily_revenue_pipeline(
    processing_date: date,
    client: bigquery.Client,
    source_dataset: str,
    target_table: str
) -> None:
    """
    Computes daily revenue aggregates and atomically replaces the partition.

    This function is safe to run multiple times for the same processing_date.
    The partition overwrite guarantees idempotency at the storage layer.
    """
    partition_decorator = processing_date.strftime('%Y%m%d')
    target_partition = f"{target_table}${partition_decorator}"

    query = f"""
        SELECT
            DATE(transaction_timestamp) AS revenue_date,
            product_category,
            warehouse_region,
            SUM(net_amount_usd) AS total_revenue,
            COUNT(DISTINCT transaction_id) AS transaction_count,
            COUNT(DISTINCT customer_id) AS unique_customers
        FROM `{source_dataset}.transactions`
        WHERE
            DATE(transaction_timestamp) = '{processing_date.isoformat()}'
            AND status NOT IN ('REFUNDED', 'CANCELLED', 'DISPUTED')
        GROUP BY 1, 2, 3
    """

    job_config = bigquery.QueryJobConfig(
        destination=target_partition,
        # WRITE_TRUNCATE is the key: it fully replaces the partition
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field="revenue_date"
        )
    )

    job = client.query(query, job_config=job_config)
    job.result()  # Wait for completion
    print(f"Successfully wrote partition {partition_decorator} to {target_table}")
```
The WRITE_TRUNCATE disposition is the entire mechanism. When the job runs, BigQuery atomically truncates the target partition and replaces it with the new query results. If the job fails halfway through, BigQuery does not partially update the partition — it either fully replaces it or leaves the old data intact. This is the atomic, idempotent property you want.
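WRITE_TRUNCATE is BigQuery-specific. If your partitions are plain files, you can approximate the same all-or-nothing replacement with the write-to-temp-then-rename pattern. This is a hypothetical local analog, not how BigQuery implements it. The directory layout and the overwrite_partition name are assumptions. It relies on os.replace being an atomic rename on POSIX filesystems when the temp file and the target live on the same filesystem.

```python
import json
import os
import tempfile
from pathlib import Path


def overwrite_partition(table_dir: Path, partition: str, rows: list[dict]) -> Path:
    """Replace table_dir/partition=<partition>/data.json with rows, all or nothing."""
    partition_dir = table_dir / f"partition={partition}"
    partition_dir.mkdir(parents=True, exist_ok=True)
    target = partition_dir / "data.json"

    # Write the complete new partition to a temp file in the same directory...
    fd, tmp_path = tempfile.mkstemp(dir=partition_dir, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(rows, f)
            f.flush()
            os.fsync(f.fileno())
        # ...then swap it in with one rename: readers see the old file or the new one
        os.replace(tmp_path, target)
    except BaseException:
        os.unlink(tmp_path)  # a failed run leaves the previous partition untouched
        raise
    return target


with tempfile.TemporaryDirectory() as d:
    rows = [{"product_category": "widgets", "total_revenue": 14235.5}]
    for _ in range(2):  # a rerun replaces the partition instead of appending to it
        path = overwrite_partition(Path(d), "2024-01-15", rows)
    print(json.loads(path.read_text()) == rows)           # True
    print(sorted(p.name for p in path.parent.iterdir()))  # ['data.json']
```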
Warning: Partition-based overwrite only works when your partition boundaries align precisely with your pipeline's logical processing units. If you're running a daily job on date partitions, this is clean. If you have late-arriving data that can land in multiple partitions, or your partition granularity doesn't match your retry granularity, upserts are more appropriate.
Late-arriving data is the scenario that breaks naive partition overwrite strategies. Suppose a transaction from January 15th doesn't arrive in your data warehouse until January 20th because of a delay in the upstream system. Your January 15th partition has already been computed and closed. What now?
The standard industry approach is watermarking with a lookback window. Instead of processing only today's partition, your pipeline always reprocesses the last N days:
```python
def run_revenue_pipeline_with_lookback(
    run_date: date,
    client: bigquery.Client,
    lookback_days: int = 3,
    **kwargs
) -> None:
    """
    Reprocesses the last N days to capture late-arriving data.

    This is still idempotent because each partition is fully replaced.
    The cost is proportional to lookback_days, so choose carefully.
    """
    for days_back in range(lookback_days + 1):
        processing_date = run_date - timedelta(days=days_back)
        print(f"Processing partition: {processing_date.isoformat()}")
        run_daily_revenue_pipeline(
            processing_date=processing_date,
            client=client,
            **kwargs
        )
    print(f"Completed pipeline run for {run_date}, lookback={lookback_days} days")
```
The key insight: this is still idempotent. Running the whole pipeline twice for run_date=2024-01-20 will produce identical output because we're replacing partitions, not accumulating into them. The lookback window determines the maximum acceptable lateness for your data. If a transaction arrives more than 3 days late, it'll be missing from your aggregates. This is a business decision, not a technical one — define it explicitly and document it.
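Because the lookback window is a business rule, it helps to be able to answer "would this late record have been captured?" with a quick calculation. Here is a small sketch. It assumes one run per day, and that a record becomes visible to the run dated on its arrival day. Both function names are hypothetical.

```python
from datetime import date, timedelta


def partitions_to_reprocess(run_date: date, lookback_days: int) -> list[date]:
    # Same range as run_revenue_pipeline_with_lookback: run_date plus lookback_days earlier days
    return [run_date - timedelta(days=d) for d in range(lookback_days + 1)]


def captured_by_lookback(event_date: date, arrival_date: date, lookback_days: int) -> bool:
    # Assumes one run per day and that a record is visible to the run dated on its arrival day.
    # The latest run that still reprocesses event_date is event_date + lookback_days.
    return event_date <= arrival_date <= event_date + timedelta(days=lookback_days)


print([d.isoformat() for d in partitions_to_reprocess(date(2024, 1, 20), 3)])
# ['2024-01-20', '2024-01-19', '2024-01-18', '2024-01-17']
print(captured_by_lookback(date(2024, 1, 15), date(2024, 1, 17), 3))  # True
print(captured_by_lookback(date(2024, 1, 15), date(2024, 1, 20), 3))  # False
```

The January 15th transaction that lands on January 20th from the earlier example is five days late. It is missed with a 3-day lookback and caught with a 5-day one.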
Idempotent writes handle the destination side of your pipeline. But what about long-running pipelines that need to be resilient on the processing side? If you're extracting 10 million rows from a source system and your pipeline fails at row 7.3 million, you don't want to restart from the beginning — both because it's slow and because the source system's rate limits might kick in.
The solution is checkpointing: persisting the pipeline's progress at regular intervals so it can resume from the last known good state rather than from scratch.
```python
import json
import logging
import os
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable, Iterator

logger = logging.getLogger(__name__)


@dataclass
class PipelineCheckpoint:
    pipeline_id: str
    run_date: str
    last_processed_offset: int
    last_processed_key: str | None
    records_processed: int
    checkpoint_timestamp: str

    def save(self, checkpoint_dir: Path) -> None:
        checkpoint_dir.mkdir(parents=True, exist_ok=True)
        checkpoint_file = checkpoint_dir / f"{self.pipeline_id}_{self.run_date}.json"
        # Write to a temp file, then rename: a crash mid-write never leaves a
        # truncated checkpoint (os.replace is atomic on POSIX filesystems)
        tmp_file = checkpoint_file.with_suffix('.json.tmp')
        with open(tmp_file, 'w') as f:
            json.dump(asdict(self), f, indent=2)
        os.replace(tmp_file, checkpoint_file)
        logger.info(f"Checkpoint saved: offset={self.last_processed_offset}")

    @classmethod
    def load(cls, pipeline_id: str, run_date: str, checkpoint_dir: Path) -> 'PipelineCheckpoint | None':
        checkpoint_file = checkpoint_dir / f"{pipeline_id}_{run_date}.json"
        if not checkpoint_file.exists():
            return None
        with open(checkpoint_file, 'r') as f:
            data = json.load(f)
        return cls(**data)


def extract_with_checkpointing(
    pipeline_id: str,
    run_date: str,
    source_extractor: Iterator[dict],
    transformer: Callable[[dict], dict],
    loader: Callable[[list[dict]], None],
    checkpoint_dir: Path,
    checkpoint_every_n_records: int = 10_000
) -> int:
    """
    Runs extraction with periodic checkpointing for resumability.

    On restart, picks up from the last checkpoint offset rather
    than reprocessing from the beginning.

    loader(batch) runs before the checkpoint is saved, so a crash between
    the two reloads that batch on restart: loader must be idempotent
    (for example, a MERGE on an idempotency key).
    """
    checkpoint = PipelineCheckpoint.load(pipeline_id, run_date, checkpoint_dir)

    if checkpoint:
        records_to_skip = checkpoint.last_processed_offset
        records_processed = checkpoint.records_processed
        logger.info(
            f"Resuming from checkpoint: skipping {records_to_skip} records, "
            f"{records_processed} already processed"
        )
    else:
        records_to_skip = 0
        records_processed = 0
        logger.info(f"Starting fresh pipeline run for {pipeline_id} on {run_date}")

    batch = []
    current_offset = 0

    for record in source_extractor:
        # Skip records we've already processed
        if current_offset < records_to_skip:
            current_offset += 1
            continue

        transformed = transformer(record)
        batch.append(transformed)
        current_offset += 1
        records_processed += 1

        if len(batch) >= checkpoint_every_n_records:
            loader(batch)
            new_checkpoint = PipelineCheckpoint(
                pipeline_id=pipeline_id,
                run_date=run_date,
                last_processed_offset=current_offset,
                last_processed_key=record.get('id'),
                records_processed=records_processed,
                checkpoint_timestamp=datetime.now(timezone.utc).isoformat()
            )
            new_checkpoint.save(checkpoint_dir)
            batch = []

    # Flush remaining batch
    if batch:
        loader(batch)

    # Clear checkpoint on successful completion
    checkpoint_file = checkpoint_dir / f"{pipeline_id}_{run_date}.json"
    if checkpoint_file.exists():
        checkpoint_file.unlink()
    logger.info("Pipeline completed successfully, checkpoint cleared")

    return records_processed
```
Tip: In production, store checkpoints in a durable, shared location — not on local disk. Use object storage (S3, GCS) or a database. Local disk disappears when your container restarts, which is often exactly when you need the checkpoint.
A subtle issue with offset-based checkpointing: if your source system doesn't guarantee stable ordering between runs (many APIs don't), skipping by offset count is unreliable. After a restart, record 7,300,001 in the second run might not be the same record as 7,300,001 in the first run. Use cursor-based pagination instead — store the last processed key (a timestamp, an ID, or a cursor token) and resume from that key in the next run.
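A small sketch shows why offset-based resumption breaks when ordering is unstable, and how keyset (cursor) resumption avoids it. The records and the resume_after_cursor helper are hypothetical. A real source would push the filter down, for example as `WHERE id > :cursor ORDER BY id`. The in-memory sort only keeps the example self-contained.

```python
from typing import Iterable, Iterator, Optional


def resume_after_cursor(records: Iterable[dict], last_key: Optional[str],
                        key_field: str = "id") -> Iterator[dict]:
    """Yield records whose key sorts strictly after last_key, in key order."""
    # Stand-in for a source query like "WHERE id > :cursor ORDER BY id"
    for record in sorted(records, key=lambda r: r[key_field]):
        if last_key is None or record[key_field] > last_key:
            yield record


# First run processed t-001 and t-002, then crashed; the checkpoint stored last_key="t-002".
# On the second run the (unordered) source returns the same records in a different order.
second_run = [{"id": "t-003"}, {"id": "t-001"}, {"id": "t-004"}, {"id": "t-002"}]

offset_resume = [r["id"] for r in second_run[2:]]  # "skip the first 2 records"
cursor_resume = [r["id"] for r in resume_after_cursor(second_run, "t-002")]
print(offset_resume)  # ['t-004', 't-002']  (t-003 lost, t-002 processed twice)
print(cursor_resume)  # ['t-003', 't-004']
```

The cursor key must be unique and totally ordered. If you resume on a timestamp, add a tiebreaker such as (updated_at, id) so that records sharing a timestamp are not skipped.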
Batch pipelines have it relatively easy. Streaming pipelines are where exactly-once semantics get genuinely hard, because you're dealing with continuous, potentially out-of-order data flows with no natural "run boundaries."
Kafka's transactional API, introduced in version 0.11, allows producers to write to multiple partitions atomically and consumers to commit offsets atomically with their processing. Together, they provide what Kafka calls "exactly-once stream processing."
The mechanics work like this: a transactional producer assigns itself a stable transactional.id. When it begins a transaction, writes to multiple Kafka topics within that transaction are either all committed or all aborted — consumers configured with isolation.level=read_committed only see committed data. The broker tracks the producer's state using a Producer ID (PID) and epoch number, which allows it to deduplicate retried writes at the broker level.
```python
import json
import logging

from confluent_kafka import Consumer, Producer, TopicPartition

logger = logging.getLogger(__name__)


class ExactlyOnceStreamProcessor:
    """
    Demonstrates Kafka's exactly-once transactional pattern for
    a read-process-write pipeline (consume from one topic, produce to another).

    This pattern is commonly used in streaming ETL: consume raw events,
    transform them, and write enriched events to a downstream topic,
    with exactly-once guarantees.
    """

    def __init__(
        self,
        bootstrap_servers: str,
        source_topic: str,
        target_topic: str,
        consumer_group: str,
        transactional_id: str
    ):
        self.source_topic = source_topic
        self.target_topic = target_topic

        self.consumer = Consumer({
            'bootstrap.servers': bootstrap_servers,
            'group.id': consumer_group,
            'enable.auto.commit': False,  # We manage commits inside transactions
            'isolation.level': 'read_committed',  # Only see committed messages
            'auto.offset.reset': 'earliest'
        })

        self.producer = Producer({
            'bootstrap.servers': bootstrap_servers,
            'transactional.id': transactional_id,  # Enables exactly-once
            'enable.idempotence': True,  # Required for transactional producers
            'acks': 'all',
            'retries': 2147483647  # INT_MAX: retry forever
        })

        # Initialize transactions: must be called once before any transactions
        self.producer.init_transactions()

    def transform(self, raw_event: dict) -> dict:
        """Your business logic goes here."""
        return {
            **raw_event,
            'processed_at': '2024-01-15T14:32:11Z',
            'revenue_tier': 'high' if raw_event.get('amount', 0) > 1000 else 'standard',
            'pipeline_version': '2.1.0'
        }

    def process_batch(self, messages: list) -> None:
        """
        Process a batch of messages within a single transaction.

        The offset commit for the consumer group is included in the
        producer transaction: this is what makes it exactly-once.
        """
        if not messages:
            return

        self.producer.begin_transaction()
        try:
            offsets_to_commit = {}
            for message in messages:
                raw_event = json.loads(message.value())
                transformed = self.transform(raw_event)
                self.producer.produce(
                    topic=self.target_topic,
                    key=message.key(),
                    value=json.dumps(transformed).encode('utf-8')
                )
                # Track the next offset to consume for this partition
                offsets_to_commit[(message.topic(), message.partition())] = message.offset() + 1

            # Flush all produce calls before committing
            self.producer.flush()

            # Send consumer offsets AS PART OF the transaction.
            # This is the critical step: offset and output commit are atomic.
            self.producer.send_offsets_to_transaction(
                [TopicPartition(topic, partition, offset)
                 for (topic, partition), offset in offsets_to_commit.items()],
                self.consumer.consumer_group_metadata()
            )

            self.producer.commit_transaction()
            logger.info(f"Committed transaction for {len(messages)} messages")

        except Exception as e:
            # Abort on any failure (Kafka errors, bad JSON, transform bugs) so the
            # transaction is never left open; re-raise so the process restarts
            # from the last committed offsets
            logger.error(f"Transaction failed, aborting: {e}")
            self.producer.abort_transaction()
            raise

    def run(self, batch_size: int = 100) -> None:
        self.consumer.subscribe([self.source_topic])
        try:
            while True:
                messages = self.consumer.consume(
                    num_messages=batch_size,
                    timeout=5.0
                )
                valid_messages = [m for m in messages if m.error() is None]
                if valid_messages:
                    self.process_batch(valid_messages)
        finally:
            self.consumer.close()
```
The critical line is send_offsets_to_transaction(). By including the consumer's offset commit inside the producer's transaction, we create an atomic unit: either the transformed messages are written to the output topic AND the source offsets are advanced, or neither happens. This eliminates the gap between "messages processed" and "offsets committed" that causes duplicates in at-least-once systems.
Warning: The transactional.id must be unique per producer instance AND stable across restarts of the same logical producer. If two producer instances share a transactional.id, one will fence the other; this is intentional. In Kubernetes, derive the transactional ID from a stable pod identifier, not a randomly generated UUID.
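The broker-side behavior described earlier (a Producer ID and epoch, with retried writes deduplicated) and the fencing in this warning can be pictured with a deliberately simplified model. For each producer, the toy broker remembers the current epoch and the last sequence number appended. It drops a retry whose sequence it has already seen, and rejects writes from an older epoch. This is an illustrative toy, not Kafka's implementation: real Kafka tracks metadata for several recent batches per producer and partition, and has more error cases. All names here are hypothetical.

```python
from dataclasses import dataclass, field


class ProducerFenced(Exception):
    """Raised when a write arrives from an older epoch of the same producer."""


@dataclass
class ToyPartitionLog:
    records: list[str] = field(default_factory=list)
    # producer_id -> (current epoch, last appended sequence number)
    producer_state: dict[int, tuple[int, int]] = field(default_factory=dict)

    def append(self, producer_id: int, epoch: int, sequence: int, value: str) -> bool:
        """Append value; return False if it is a retry of an already-appended write."""
        known_epoch, last_seq = self.producer_state.get(producer_id, (epoch, -1))
        if epoch < known_epoch:
            raise ProducerFenced(f"epoch {epoch} is older than current epoch {known_epoch}")
        if epoch > known_epoch:
            last_seq = -1  # a new epoch restarts sequence numbering
        if sequence <= last_seq:
            return False  # duplicate retry: acknowledged but not appended again
        if sequence != last_seq + 1:
            raise ValueError(f"out-of-order sequence {sequence}, expected {last_seq + 1}")
        self.records.append(value)
        self.producer_state[producer_id] = (epoch, sequence)
        return True


log = ToyPartitionLog()
log.append(producer_id=7, epoch=0, sequence=0, value="txn-1")
log.append(producer_id=7, epoch=0, sequence=1, value="txn-2")
log.append(producer_id=7, epoch=0, sequence=1, value="txn-2")  # network retry: dropped
log.append(producer_id=7, epoch=1, sequence=0, value="txn-3")  # restarted instance, new epoch
try:
    log.append(producer_id=7, epoch=0, sequence=2, value="zombie")  # old instance wakes up
except ProducerFenced as exc:
    print("fenced:", exc)  # fenced: epoch 0 is older than current epoch 1
print(log.records)  # ['txn-1', 'txn-2', 'txn-3']
```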
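Streaming pipelines must distinguish between two notions of time. Event time is when the event actually occurred at the source. Processing time is when your pipeline observes the event.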
A payment processed at 11:59:50 PM might arrive in Kafka at 12:00:05 AM — it's a January 15th transaction that landed in your January 16th processing window. If you're aggregating by processing time, it goes in the wrong bucket. If you're aggregating by event time (correct for most analytical use cases), you need to handle late arrivals.
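The bucketing difference in that example is plain date arithmetic. This minimal sketch assumes daily buckets aligned to UTC midnight, since the text doesn't name a time zone; the helper name is hypothetical.

```python
from datetime import datetime, timezone

# The example from the text; timestamps assumed to be UTC
event_time = datetime(2024, 1, 15, 23, 59, 50, tzinfo=timezone.utc)   # when the payment happened
arrival_time = datetime(2024, 1, 16, 0, 0, 5, tzinfo=timezone.utc)    # when the pipeline saw it


def daily_bucket(ts: datetime) -> str:
    # Daily windows aligned to UTC midnight (an assumption; use your business time zone)
    return ts.astimezone(timezone.utc).date().isoformat()


print(daily_bucket(event_time))    # 2024-01-15  (event-time bucket: correct day)
print(daily_bucket(arrival_time))  # 2024-01-16  (processing-time bucket: wrong day)
print((arrival_time - event_time).total_seconds())  # 15.0
```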
Apache Flink's watermarking mechanism is the industry standard for this. A watermark is a signal that says "we believe all events up to timestamp T have now arrived." When the watermark advances past a window's end time, Flink closes that window and emits results. The watermark lag — how far behind event time your watermark trails — is your late-arrival tolerance.
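```python
# Apache Flink Python API (PyFlink) example
from pyflink.common.time import Duration, Time
from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import ProcessWindowFunction
from pyflink.datastream.window import TumblingEventTimeWindows


class TransactionTimestampAssigner(TimestampAssigner):
    # PyFlink expects a TimestampAssigner instance here, not a bare lambda
    def extract_timestamp(self, value, record_timestamp) -> int:
        return value['transaction_timestamp_ms']


class RevenueAggregationFunction(ProcessWindowFunction):
    # Sums revenue per product_category for each fired (or late-updated) window
    def process(self, key, context, elements):
        total = sum(event['net_amount_usd'] for event in elements)
        yield {
            'product_category': key,
            'window_end_ms': context.window().end,
            'total_revenue': total,
        }


def build_revenue_aggregation_pipeline():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(4)

    # Configure watermark strategy:
    # - BoundedOutOfOrderness: tolerate up to 5 minutes of late arrivals
    # - TimestampAssigner: extract event time from the record
    watermark_strategy = (
        WatermarkStrategy
        .for_bounded_out_of_orderness(Duration.of_minutes(5))
        .with_timestamp_assigner(TransactionTimestampAssigner())
    )

    source = env.from_source(
        source=build_kafka_source(),  # Your Kafka source connector (not shown); assumed to emit dict records
        watermark_strategy=watermark_strategy,
        source_name="transactions"
    )

    revenue_by_category = (
        source
        .key_by(lambda event: event['product_category'])
        .window(TumblingEventTimeWindows.of(Time.hours(1)))
        # allowed_lateness gives events an additional grace period (in milliseconds)
        # after the watermark passes the window end, before the window is discarded
        .allowed_lateness(10 * 60 * 1000)
        .process(RevenueAggregationFunction())
    )

    revenue_by_category.add_sink(build_bigquery_sink())  # Your sink (not shown); should upsert per window
    env.execute("Revenue Aggregation Pipeline")
```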
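The combination of for_bounded_out_of_orderness(5 minutes) and allowed_lateness(10 minutes) works in two stages. A window first emits its result once the watermark (the highest event time seen, minus 5 minutes) passes the window's end. It then keeps accepting late events until the watermark is 10 minutes past that end. In effect, an event is still counted if it arrives before the stream has seen event times roughly 15 minutes beyond the end of its window. Each late event accepted during the allowed-lateness period re-fires the window with an updated result, so the sink must upsert per (category, window) rather than append. Events arriving after that point are either dropped or routed to a side output for separate handling.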
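To see how the two settings interact, here is a deliberately simplified, single-key Python model that loosely follows Flink's behavior. The bounded-out-of-orderness watermark is the maximum event timestamp seen, minus the bound, minus 1 ms. A window's late events are discarded once the watermark reaches the window's last timestamp plus the allowed lateness. This is not PyFlink code. Real Flink emits watermarks periodically and per source split, whereas this toy advances the watermark on every event. All names are illustrative.

```python
from dataclasses import dataclass

MINUTE_MS = 60 * 1000
HOUR_MS = 60 * MINUTE_MS


@dataclass
class ToyEventTimeWindows:
    out_of_orderness_ms: int = 5 * MINUTE_MS
    allowed_lateness_ms: int = 10 * MINUTE_MS
    window_ms: int = HOUR_MS
    max_ts: int = -(2 ** 62)  # highest event timestamp seen so far

    def watermark(self) -> int:
        # Bounded out-of-orderness: max timestamp seen, minus the bound, minus 1 ms
        return self.max_ts - self.out_of_orderness_ms - 1

    def observe(self, ts: int) -> str:
        """Classify an event against the current watermark, then advance it."""
        window_max_ts = ts - ts % self.window_ms + self.window_ms - 1  # last ms of its window
        wm = self.watermark()
        if window_max_ts + self.allowed_lateness_ms <= wm:
            outcome = "dropped"       # beyond allowed lateness (or sent to a side output)
        elif window_max_ts <= wm:
            outcome = "late_update"   # window already fired; it re-fires with an updated result
        else:
            outcome = "on_time"       # window still open
        self.max_ts = max(self.max_ts, ts)
        return outcome


def at(hour: int, minute: int) -> int:
    return (hour * 60 + minute) * MINUTE_MS


windows = ToyEventTimeWindows()
arrivals = [at(12, 30), at(13, 4), at(12, 50), at(13, 6), at(12, 55), at(13, 20), at(12, 58)]
print([windows.observe(ts) for ts in arrivals])
# ['on_time', 'on_time', 'on_time', 'on_time', 'late_update', 'on_time', 'dropped']
```

The 12:50 event is 14 minutes behind the newest event yet still on time, because its window has not fired. What matters is lateness relative to the window's end, not relative to the latest event.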
Once your pipeline processes millions of events per hour, naive deduplication strategies break down. You can't maintain an in-memory set of every processed ID — it'll exhaust memory in hours. You can't query a database for every record — latency becomes unacceptable. You need probabilistic or partitioned deduplication strategies.
A Bloom filter is a probabilistic data structure that answers the question "have I seen this ID before?" with the property that false negatives are impossible (it will never say "unseen" for something it has seen) but false positives are possible (it might say "seen" for something it hasn't). For deduplication, false positives mean you occasionally drop a record you should have kept — acceptable if the false positive rate is tuned appropriately.
```python
import pickle

import redis
from pybloom_live import ScalableBloomFilter


class DistributedDeduplicator:
    """
    Redis-backed Bloom filter for high-throughput stream deduplication.

    Uses a Scalable Bloom Filter that automatically expands as the
    cardinality of seen IDs grows. Persistence in Redis ensures
    the filter survives process restarts.

    Appropriate for: high-volume event streams where a <0.1% false positive
    rate is acceptable and memory is constrained.

    NOT appropriate for: financial transactions, compliance data, or any
    use case where silently dropping a legitimate record (a false positive)
    is unacceptable.

    Assumes a single writer per filter_key: the filter is loaded once and
    overwritten on every persist, so concurrent writers would lose each
    other's updates. Only unpickle data from a Redis instance you trust.
    """

    def __init__(
        self,
        redis_client: redis.Redis,
        filter_key: str,
        initial_capacity: int = 1_000_000,
        error_rate: float = 0.001  # 0.1% false positive rate
    ):
        self.redis = redis_client
        self.filter_key = filter_key
        self.error_rate = error_rate
        self.initial_capacity = initial_capacity
        self._filter = self._load_or_create()

    def _load_or_create(self) -> ScalableBloomFilter:
        serialized = self.redis.get(self.filter_key)
        if serialized:
            return pickle.loads(serialized)
        return ScalableBloomFilter(
            initial_capacity=self.initial_capacity,
            error_rate=self.error_rate,
            mode=ScalableBloomFilter.LARGE_SET_GROWTH
        )

    def _persist(self) -> None:
        self.redis.set(self.filter_key, pickle.dumps(self._filter))

    def is_duplicate(self, event_id: str) -> bool:
        return event_id in self._filter

    def mark_seen(self, event_id: str) -> None:
        self._filter.add(event_id)

    def process_and_deduplicate(self, events: list[dict], id_field: str = 'event_id') -> list[dict]:
        """
        Filter duplicates from a batch. Persists the filter after each batch.
        """
        unique_events = []
        for event in events:
            event_id = event[id_field]
            if not self.is_duplicate(event_id):
                unique_events.append(event)
                self.mark_seen(event_id)

        # Persist after batch to survive restarts
        self._persist()

        duplicate_count = len(events) - len(unique_events)
        if duplicate_count > 0:
            print(f"Filtered {duplicate_count} duplicates from batch of {len(events)}")

        return unique_events
```
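To see what pybloom_live is doing under the hood, here is a minimal fixed-size Bloom filter in plain Python. It is an illustrative sketch, not the library's implementation: it does not grow the way ScalableBloomFilter does, and the double-hashing scheme and names are choices made for this example. The standard sizing formulas also show what a 0.1% false positive rate costs: about 14.4 bits per expected item, with about 10 hash functions.

```python
import hashlib
import math


def optimal_parameters(expected_items: int, false_positive_rate: float) -> tuple[int, int]:
    # Standard sizing: m = -n ln(p) / (ln 2)^2 bits, k = (m / n) ln 2 hash functions
    m = math.ceil(-expected_items * math.log(false_positive_rate) / (math.log(2) ** 2))
    k = max(1, round(m / expected_items * math.log(2)))
    return m, k


class TinyBloomFilter:
    """Minimal fixed-size Bloom filter, for illustration only."""

    def __init__(self, num_bits: int, num_hashes: int):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, item: str) -> list[int]:
        # Derive k bit positions from one SHA-256 digest (double hashing: h1 + i*h2)
        digest = hashlib.sha256(item.encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1
        return [(h1 + i * h2) % self.num_bits for i in range(self.num_hashes)]

    def add(self, item: str) -> None:
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def __contains__(self, item: str) -> bool:
        # Any bit clear -> definitely never added; all bits set -> probably added
        return all(self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(item))


m, k = optimal_parameters(1_000_000, 0.001)
print(m, k)  # roughly 14.4 million bits (about 1.8 MB) and 10 hash functions

bf = TinyBloomFilter(*optimal_parameters(1_000, 0.01))
bf.add("evt-123")
print("evt-123" in bf)  # True: an added item is never reported unseen
```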
For financial-grade deduplication where you cannot tolerate false positives, use a partitioned key-value store approach: hash the event ID to a shard, and maintain an exact set per shard with a TTL equal to your maximum retry window. If a transaction retry window is 24 hours, keep IDs for 48 hours to be safe.
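Here is a minimal sketch of that sharded exact approach. In-memory dicts stand in for the shards of a real partitioned key-value store. The class name, shard count, and injectable clock are illustrative choices, not any specific product's API. The 48-hour TTL mirrors the rule of thumb above.

```python
import hashlib
import time
from typing import Callable


class ShardedExactDeduplicator:
    """Exact dedup (no false positives): event IDs are hashed to shards, and each
    shard maps event_id -> expiry time. In-memory dicts stand in for the shards of
    a partitioned key-value store."""

    def __init__(self, num_shards: int, ttl_seconds: float,
                 clock: Callable[[], float] = time.monotonic):
        self.shards: list[dict[str, float]] = [{} for _ in range(num_shards)]
        self.ttl_seconds = ttl_seconds
        self.clock = clock

    def _shard_for(self, event_id: str) -> dict[str, float]:
        # Stable hash: Python's built-in hash() of str is randomized per process,
        # so it would route the same ID to different shards after a restart
        digest = hashlib.sha256(event_id.encode("utf-8")).digest()
        return self.shards[int.from_bytes(digest[:8], "big") % len(self.shards)]

    def first_time_seen(self, event_id: str) -> bool:
        """Return True the first time event_id is seen within the TTL, else False."""
        shard = self._shard_for(event_id)
        now = self.clock()
        expiry = shard.get(event_id)
        if expiry is not None and expiry > now:
            return False  # duplicate within the retention window
        # A real store would expire keys itself (e.g. Redis SET with NX and EX);
        # this sketch simply overwrites expired entries and never evicts them
        shard[event_id] = now + self.ttl_seconds
        return True


fake_now = [0.0]  # controllable clock for the example
dedup = ShardedExactDeduplicator(num_shards=16, ttl_seconds=48 * 3600, clock=lambda: fake_now[0])
print(dedup.first_time_seen("txn-1"))  # True
print(dedup.first_time_seen("txn-1"))  # False: retry within 48 hours
fake_now[0] += 49 * 3600
print(dedup.first_time_seen("txn-1"))  # True: TTL has expired
```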
The most reliable deduplication pattern for data warehouse loads is the staging table with MERGE pattern:
```sql
-- Step 1: Load all incoming data (including potential duplicates)
-- into a staging table, no constraints enforced.
-- EXTERNAL_STAGE is a placeholder for your warehouse's external-table or bulk-load mechanism.
CREATE OR REPLACE TABLE staging.raw_transactions_20240115 AS
SELECT * FROM EXTERNAL_STAGE('s3://data-lake/raw/transactions/2024/01/15/');

-- Step 2: Deduplicate within the staging table using ROW_NUMBER
-- This handles duplicates that arrived in the same batch
CREATE OR REPLACE TABLE staging.deduped_transactions_20240115 AS
SELECT *
FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY transaction_id
            ORDER BY updated_at DESC, ingested_at DESC  -- Keep the most recent version
        ) AS row_num
    FROM staging.raw_transactions_20240115
) AS ranked
WHERE row_num = 1;

-- Step 3: MERGE into production, handling cross-batch duplicates
MERGE INTO production.transactions AS target
USING staging.deduped_transactions_20240115 AS source
ON target.transaction_id = source.transaction_id
WHEN MATCHED AND source.updated_at > target.updated_at THEN
    UPDATE SET
        status = source.status,
        net_amount_usd = source.net_amount_usd,
        updated_at = source.updated_at,
        last_pipeline_run = CURRENT_TIMESTAMP()
WHEN NOT MATCHED THEN
    INSERT (transaction_id, order_id, customer_id, product_sku,
            net_amount_usd, status, created_at, updated_at, last_pipeline_run)
    VALUES (source.transaction_id, source.order_id, source.customer_id,
            source.product_sku, source.net_amount_usd, source.status,
            source.created_at, source.updated_at, CURRENT_TIMESTAMP());

-- Step 4: Audit
-- MERGE has no portable per-row "action" column; Snowflake and BigQuery report
-- inserted/updated counts in the MERGE result or job statistics. A portable
-- reconciliation check: every staged ID must now exist in production.
SELECT
    COUNT(*) AS staged_rows,
    COUNT(target.transaction_id) AS staged_rows_in_production
FROM staging.deduped_transactions_20240115 AS source
LEFT JOIN production.transactions AS target
    ON target.transaction_id = source.transaction_id;
```
This three-step pattern — raw load, in-batch deduplication, cross-batch MERGE — handles both within-batch and cross-batch duplicates, and it's idempotent. Run it twice for the same date, you get the same result.
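The same rules can be expressed in a few lines of Python, which is handy for unit-testing merge logic before it touches a warehouse. In this sketch, plain dicts stand in for the staging and production tables, and field names follow the SQL above. The in-memory model is a testing aid under those assumptions, not a replacement for the warehouse MERGE. Ties on updated_at keep the first row seen, whereas the SQL breaks ties by ingested_at.

```python
from datetime import datetime


def dedupe_batch(rows: list[dict]) -> dict[str, dict]:
    """Step 2 in miniature: one row per transaction_id, keeping the latest updated_at."""
    latest: dict[str, dict] = {}
    for row in rows:
        current = latest.get(row["transaction_id"])
        if current is None or row["updated_at"] > current["updated_at"]:
            latest[row["transaction_id"]] = row
    return latest


def merge_into(production: dict[str, dict], staged: dict[str, dict]) -> None:
    """Step 3 in miniature: insert unseen IDs, update only when the staged row is newer."""
    for txn_id, row in staged.items():
        target = production.get(txn_id)
        if target is None or row["updated_at"] > target["updated_at"]:
            production[txn_id] = dict(row)


batch = [
    {"transaction_id": "T1", "status": "PENDING", "updated_at": datetime(2024, 1, 15, 9)},
    {"transaction_id": "T1", "status": "SETTLED", "updated_at": datetime(2024, 1, 15, 11)},
    {"transaction_id": "T2", "status": "SETTLED", "updated_at": datetime(2024, 1, 15, 10)},
    {"transaction_id": "T1", "status": "SETTLED", "updated_at": datetime(2024, 1, 15, 11)},  # exact duplicate
]

production: dict[str, dict] = {}
merge_into(production, dedupe_batch(batch))
snapshot = {k: dict(v) for k, v in production.items()}
merge_into(production, dedupe_batch(batch))  # rerun for the same date

print(production == snapshot)                           # True
print({k: v["status"] for k, v in production.items()})  # {'T1': 'SETTLED', 'T2': 'SETTLED'}
```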
Large-scale pipelines need a control layer that tracks run state, prevents concurrent execution, and enables safe reruns. This is often implemented as a pipeline metadata table — a simple database table that records what ran, when, and what the outcome was.
```python
from contextlib import contextmanager
from datetime import datetime, timezone
from enum import Enum

import psycopg2


class RunStatus(Enum):
    RUNNING = 'RUNNING'
    SUCCESS = 'SUCCESS'
    FAILED = 'FAILED'
    SKIPPED = 'SKIPPED'


def _utcnow() -> datetime:
    # Naive UTC timestamp; assumes started_at is a TIMESTAMP (without time zone) column storing UTC
    return datetime.now(timezone.utc).replace(tzinfo=None)


@contextmanager
def pipeline_run_guard(
    conn: psycopg2.extensions.connection,
    pipeline_name: str,
    logical_date: str,
    allow_rerun: bool = False
):
    """
    Context manager that:
    1. Prevents concurrent runs of the same pipeline for the same logical_date
    2. Records run start, outcome, and duration
    3. Allows safe reruns of failed pipelines
    4. Skips reruns of successful pipelines (unless allow_rerun=True)

    Usage:
        with pipeline_run_guard(conn, 'daily_revenue', '2024-01-15') as run_id:
            if run_id is not None:  # None means "already succeeded; skip"
                run_my_pipeline()
    """
    cur = conn.cursor()
    run_id = None

    try:
        # Serialize the check-then-insert below across processes; the advisory
        # lock is released when this transaction commits or rolls back
        cur.execute(
            "SELECT pg_advisory_xact_lock(hashtext(%s))",
            (f"{pipeline_name}/{logical_date}",)
        )

        # Check for existing runs
        cur.execute("""
            SELECT run_id, status, started_at
            FROM pipeline_metadata.pipeline_runs
            WHERE pipeline_name = %s
              AND logical_date = %s
            ORDER BY started_at DESC
            LIMIT 1
        """, (pipeline_name, logical_date))
        existing = cur.fetchone()

        if existing:
            existing_run_id, existing_status, started_at = existing

            if existing_status == RunStatus.RUNNING.value:
                # Check if the run is stale (started more than 4 hours ago)
                staleness_hours = (_utcnow() - started_at).total_seconds() / 3600
                if staleness_hours < 4:
                    raise RuntimeError(
                        f"Pipeline {pipeline_name}/{logical_date} is already running "
                        f"(run_id={existing_run_id}). Aborting to prevent concurrent execution."
                    )
                # Stale run detected, mark it as failed and proceed
                cur.execute("""
                    UPDATE pipeline_metadata.pipeline_runs
                    SET status = 'FAILED', error_message = 'Marked stale by subsequent run'
                    WHERE run_id = %s
                """, (existing_run_id,))

            elif existing_status == RunStatus.SUCCESS.value and not allow_rerun:
                conn.rollback()  # End the transaction, releasing the advisory lock
                yield None  # Signal that we're skipping
                return

        # Register this run
        cur.execute("""
            INSERT INTO pipeline_metadata.pipeline_runs
                (pipeline_name, logical_date, status, started_at)
            VALUES (%s, %s, %s, %s)
            RETURNING run_id
        """, (pipeline_name, logical_date, RunStatus.RUNNING.value, _utcnow()))
        run_id = cur.fetchone()[0]
        conn.commit()

        # Yield control to the caller
        yield run_id

        # Mark success
        cur.execute("""
            UPDATE pipeline_metadata.pipeline_runs
            SET status = %s, completed_at = %s
            WHERE run_id = %s
        """, (RunStatus.SUCCESS.value, _utcnow(), run_id))
        conn.commit()

    except Exception as e:
        # The transaction may be aborted (e.g. a failed statement in the caller's
        # code on this connection); roll back before recording the failure
        conn.rollback()
        if run_id is not None:
            cur.execute("""
                UPDATE pipeline_metadata.pipeline_runs
                SET status = %s, completed_at = %s, error_message = %s
                WHERE run_id = %s
            """, (RunStatus.FAILED.value, _utcnow(), str(e)[:2000], run_id))
            conn.commit()
        raise

    finally:
        cur.close()
```
This guard prevents two major classes of failure: concurrent execution (two schedulers both firing the same job) and silent reruns (a job marked SUCCESS getting run again and writing duplicate data). The staleness check handles the case where a job died without updating its status — a container crash, OOM kill, or network partition.
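An application-level check-then-insert is easy to race. An alternative, or an extra safety net, is to let the database itself enforce "at most one RUNNING row per pipeline and date" with a partial unique index. PostgreSQL supports the same partial unique index syntax. SQLite is used here only so the sketch runs with the standard library. The table layout and the try_start_run name are hypothetical, and stale-run handling is still needed on top.

```python
import sqlite3
from typing import Optional

SCHEMA = """
CREATE TABLE pipeline_runs (
    run_id        INTEGER PRIMARY KEY,
    pipeline_name TEXT NOT NULL,
    logical_date  TEXT NOT NULL,
    status        TEXT NOT NULL
);
-- At most one RUNNING row per (pipeline_name, logical_date), enforced by the database
CREATE UNIQUE INDEX one_running_run_per_date
    ON pipeline_runs (pipeline_name, logical_date)
    WHERE status = 'RUNNING';
"""


def try_start_run(conn: sqlite3.Connection, pipeline: str, logical_date: str) -> Optional[int]:
    """Insert a RUNNING row; return its run_id, or None if another run holds the slot."""
    try:
        with conn:  # commits on success, rolls back on error
            cur = conn.execute(
                "INSERT INTO pipeline_runs (pipeline_name, logical_date, status) "
                "VALUES (?, ?, 'RUNNING')",
                (pipeline, logical_date),
            )
        return cur.lastrowid
    except sqlite3.IntegrityError:
        return None


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)

first = try_start_run(conn, "daily_revenue", "2024-01-15")
second = try_start_run(conn, "daily_revenue", "2024-01-15")  # concurrent attempt
print(first, second)  # 1 None

with conn:
    conn.execute("UPDATE pipeline_runs SET status = 'SUCCESS' WHERE run_id = ?", (first,))
third = try_start_run(conn, "daily_revenue", "2024-01-15")  # slot is free again
print(third)  # 2
```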
Build an end-to-end idempotent pipeline for the following scenario:
Scenario: You're ingesting customer support ticket data from a REST API into BigQuery. The API returns tickets paginated by cursor. Tickets can be updated after creation (status changes, agent reassignment). Your pipeline runs every 4 hours. The source API has a rate limit of 100 requests per minute.
Requirements:
Implementation Tasks:
Task 1: Design the BigQuery schema for the target table. Decide on partitioning strategy, clustering keys, and which fields will serve as your idempotency key. Justify each decision.
Task 2: Implement a cursor-based checkpoint mechanism that stores the last processed cursor token in GCS. Handle the edge case where the checkpoint was written but the corresponding batch write to BigQuery failed.
Task 3: Implement the BigQuery write as a MERGE operation using a staging table. Your MERGE should handle three cases: new tickets, status updates to existing tickets, and tickets that arrived in the same batch with different versions (keep the latest by updated_at).
Task 4: Add a pipeline metadata guard that prevents concurrent executions and records run outcomes. Include a health check endpoint that reports the last successful run time and whether the pipeline is currently running.
Task 5 (Challenge): The API sometimes returns the same ticket ID in different cursor pages (a known bug in the source system). Add a within-batch deduplication step that keeps the most recent version of each ticket ID before writing to the staging table.
Test your implementation by simulating a mid-run failure at different stages and verifying that a restart produces the correct final state.
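For the restart tests, one option is a failure-injection harness: run the pipeline with a crash injected at a chosen page and stage, retry without the fault, and compare the result with a clean run. A minimal sketch, assuming an in-memory dict stands in for the BigQuery table and another dict for the GCS checkpoint; PAGES, run_pipeline, and InjectedFailure are hypothetical. It deliberately advances the checkpoint only after the write, so it does not by itself cover Task 2's checkpoint-before-write edge case.

```python
from typing import Optional, Tuple


class InjectedFailure(Exception):
    """Raised on purpose to simulate a crash at a chosen point in the run."""


# Hypothetical source: three cursor pages; ticket 2 is updated on the last page.
PAGES = [
    [{"id": 1, "status": "open", "updated_at": 1}, {"id": 2, "status": "open", "updated_at": 1}],
    [{"id": 3, "status": "open", "updated_at": 2}],
    [{"id": 2, "status": "closed", "updated_at": 3}],
]
STAGES = ("before_write", "after_write", "after_checkpoint")


def run_pipeline(table: dict, checkpoint: dict,
                 fail_at: Optional[Tuple[int, str]] = None) -> None:
    """Process pages after the checkpointed cursor; fail_at=(page, stage) injects a crash."""
    for page_no in range(checkpoint.get("next_page", 0), len(PAGES)):
        if fail_at == (page_no, "before_write"):
            raise InjectedFailure
        for row in PAGES[page_no]:
            # Idempotent upsert: keep the newest version of each ticket.
            current = table.get(row["id"])
            if current is None or row["updated_at"] >= current["updated_at"]:
                table[row["id"]] = dict(row)
        if fail_at == (page_no, "after_write"):
            raise InjectedFailure  # data written, checkpoint not yet advanced
        checkpoint["next_page"] = page_no + 1  # advance only after the write succeeded
        if fail_at == (page_no, "after_checkpoint"):
            raise InjectedFailure


def restart_matches_clean_run(page_no: int, stage: str) -> bool:
    expected_table, expected_checkpoint = {}, {}
    run_pipeline(expected_table, expected_checkpoint)
    table, checkpoint = {}, {}
    try:
        run_pipeline(table, checkpoint, fail_at=(page_no, stage))
    except InjectedFailure:
        pass
    run_pipeline(table, checkpoint)  # the scheduler's retry, with no fault injected
    return table == expected_table


results = {(p, s): restart_matches_clean_run(p, s) for p in range(len(PAGES)) for s in STAGES}
print(all(results.values()))  # True
```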
-- WRONG: This is different every time you run it
INSERT INTO pipeline_runs (pipeline_id, run_date, loaded_at)
SELECT pipeline_id, run_date, CURRENT_TIMESTAMP() as loaded_at -- Not idempotent!
FROM staging_table;
-- RIGHT: Use a parameterized, fixed timestamp passed in from your orchestrator
INSERT INTO pipeline_runs (pipeline_id, run_date, loaded_at)
SELECT pipeline_id, run_date, '{{ run_timestamp }}' as loaded_at
FROM staging_table;
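CURRENT_TIMESTAMP() in a transformation means every run produces different output for the same input, so a rerun writes different data than the original run did, which violates the idempotency contract. Always pass timestamps into the pipeline as parameters from your orchestrator. Note that a deterministic SELECT is necessary but not sufficient: a plain INSERT like the one above still appends a second copy on rerun, so pair it with a MERGE or a partition overwrite.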
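The same rule applies to transformations written in Python. A minimal sketch, assuming the orchestrator hands the run's logical timestamp to the task (in Airflow, for example, the built-in {{ ts }} template variable renders it); build_rows and RUN_TS are hypothetical names.

```python
from datetime import datetime, timezone


def build_rows(staging_rows: list, run_timestamp: datetime) -> list:
    """Stamp rows with the orchestrator-supplied logical timestamp, never the wall clock."""
    return [{**row, "loaded_at": run_timestamp.isoformat()} for row in staging_rows]


# Assumption: the orchestrator passes the same logical timestamp to every retry of a run.
RUN_TS = datetime(2024, 3, 1, 2, 0, tzinfo=timezone.utc)
staging = [{"pipeline_id": "revenue_daily", "run_date": "2024-03-01"}]

first_attempt = build_rows(staging, RUN_TS)
retry = build_rows(staging, RUN_TS)
print(first_attempt == retry)         # True: same input and parameters, same output
print(first_attempt[0]["loaded_at"])  # 2024-03-01T02:00:00+00:00
```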
If your pipeline processes a date partition in chunks (multiple separate jobs writing to the same partition), partition-based overwrite doesn't work cleanly — you need the full partition to be computed in one atomic operation. Either:
Mixing chunked writes with WRITE_TRUNCATE means each chunk wipes out the previous chunks' work.
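A toy model makes the chunking failure concrete. In this sketch a dict stands in for a date-partitioned table, and write_truncate is a hypothetical stand-in for a partition-scoped WRITE_TRUNCATE load, not a BigQuery client call. Assembling the full partition before a single overwrite is one way out; staging the chunks and swapping the partition in one step is another.

```python
def write_truncate(table: dict, partition: str, rows: list) -> None:
    """Toy stand-in for a partition-scoped WRITE_TRUNCATE: replaces the whole partition."""
    table[partition] = list(rows)


PARTITION = "2024-03-01"
chunks = [["a", "b"], ["c"], ["d", "e"]]  # hypothetical rows computed by three separate jobs

# WRONG: every chunk truncates the partition, so only the last chunk survives.
broken = {}
for chunk in chunks:
    write_truncate(broken, PARTITION, chunk)

# RIGHT: assemble the complete partition first, then overwrite it in one operation.
fixed = {}
full_partition = [row for chunk in chunks for row in chunk]
write_truncate(fixed, PARTITION, full_partition)
write_truncate(fixed, PARTITION, full_partition)  # a rerun leaves the same result

print(broken[PARTITION])  # ['d', 'e']
print(fixed[PARTITION])   # ['a', 'b', 'c', 'd', 'e']
```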
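Committing Kafka consumer offsets and writing to a database are two separate operations. A crash between them creates duplicates (write succeeded, offset not committed) or message loss (offset committed, write failed). Kafka's transactional API closes this gap only when the output also goes to Kafka: a transactional producer can commit the consumed offsets and the produced records atomically. It cannot enlist an external database in that transaction, so for a database sink either store the consumed offsets in the same database transaction as the data, or implement idempotent consumers with a local deduplication table.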
If your pipeline maintains in-memory state (running totals, lookup caches), checkpointing only the processing offset means you'll resume from the right record but with the wrong state. Either:
Flink handles this correctly with its distributed snapshotting algorithm. Custom pipelines often get it wrong.
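A single-process sketch of checkpointing offset and state together: both go into one file that is written to a temporary path and swapped in with os.replace, so a restart can never see an offset without its matching totals. The file layout and function names are assumptions for illustration; this is far simpler than Flink's distributed snapshots and ignores concerns such as checkpoint retention.

```python
import json
import os
import tempfile


def save_checkpoint(path: str, offset: int, state: dict) -> None:
    """Persist offset and state in one file; os.replace swaps it in atomically."""
    fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(os.path.abspath(path)), suffix=".tmp")
    with os.fdopen(fd, "w") as f:
        json.dump({"offset": offset, "state": state}, f)
        f.flush()
        os.fsync(f.fileno())
    os.replace(tmp_path, path)  # readers see the old checkpoint or the new one, never half


def load_checkpoint(path: str):
    if not os.path.exists(path):
        return 0, {}
    with open(path) as f:
        data = json.load(f)
    return data["offset"], data["state"]


def run(records: list, path: str, every: int = 2, crash_at=None) -> dict:
    """Running totals per customer; state and offset are restored together."""
    offset, totals = load_checkpoint(path)
    for i in range(offset, len(records)):
        if i == crash_at:
            raise RuntimeError(f"simulated crash at record {i}")
        customer, amount = records[i]
        totals[customer] = totals.get(customer, 0) + amount
        if (i + 1) % every == 0:
            save_checkpoint(path, i + 1, totals)
    save_checkpoint(path, len(records), totals)
    return totals


records = [("a", 10), ("b", 5), ("a", 1), ("b", 2), ("a", 3)]
with tempfile.TemporaryDirectory() as d:
    path = os.path.join(d, "checkpoint.json")
    try:
        run(records, path, crash_at=3)
    except RuntimeError:
        pass
    print(run(records, path))  # {'a': 14, 'b': 7}
```

Had the restart reloaded only the offset and started from empty totals, it would have reported a = 4 and b = 2.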
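import uuid
record = {'event_id': 'evt-123'}  # example input record
# WRONG: UUID4 is random, so the same input produces a different key on each run
record['idempotency_key'] = str(uuid.uuid4())
# WRONG: uuid5 with a random namespace is still different on each run
record['idempotency_key'] = str(uuid.uuid5(uuid.uuid4(), record['event_id']))
# RIGHT: uuid5 with a fixed, application-specific namespace. The literal
# 6ba7b810-9dad-11d1-80b4-00c04fd430c8 is the standard uuid.NAMESPACE_DNS value shared by
# everyone, so derive a namespace from a name you control (hypothetical domain below)
APP_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, 'pipelines.example.com')
record['idempotency_key'] = str(uuid.uuid5(APP_NAMESPACE, record['event_id']))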
UUID5 is deterministic — given the same namespace and name, it always produces the same UUID. This makes it suitable for surrogate idempotency keys. Store the namespace UUID as a constant in your codebase, not a runtime value.
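When the natural key spans several fields, joining them with a separator can collide: ('a|b', 'c') and ('a', 'b|c') joined with '|' both yield 'a|b|c'. A minimal sketch that hashes a canonical JSON encoding instead; the namespace domain and the idempotency_key helper are hypothetical.

```python
import json
import uuid

# Hypothetical namespace, derived once from a name you control and treated as a constant.
APP_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "pipelines.example.com")


def idempotency_key(source: str, natural_key: dict) -> str:
    """Deterministic UUID5 over a canonical encoding of a composite natural key."""
    # sort_keys and fixed separators give one canonical string per logical key,
    # so {"a": 1, "b": 2} and {"b": 2, "a": 1} map to the same UUID.
    canonical = json.dumps({"source": source, "key": natural_key},
                           sort_keys=True, separators=(",", ":"))
    return str(uuid.uuid5(APP_NAMESPACE, canonical))


k1 = idempotency_key("tickets", {"ticket_id": 7, "version": 3})
k2 = idempotency_key("tickets", {"version": 3, "ticket_id": 7})
print(k1 == k2)  # True: field order does not change the key
```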
When your source system adds a new column, your MERGE statement needs to handle it. A common failure mode: the MERGE was written with an explicit column list that doesn't include the new column. New data comes in, the MERGE runs, and the new column is silently dropped. Audit your MERGE statements whenever source schemas change, and consider either generating the column lists from the table schema at run time or using forms that don't enumerate columns, such as BigQuery's INSERT ROW clause or Delta Lake's UPDATE SET * and INSERT *.
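One way to act on that advice is to generate the MERGE from the current schema on every run. A minimal sketch; build_merge_sql and the table names are hypothetical, and in BigQuery the column names could come from INFORMATION_SCHEMA.COLUMNS or from the Python client's get_table(...).schema.

```python
def build_merge_sql(target: str, staging: str, key_cols: list, columns: list) -> str:
    """Build a MERGE whose column lists come from the live schema, not a hand-written list.

    Assumes trusted identifiers; quote them for your SQL dialect before real use.
    """
    non_key = [c for c in columns if c not in key_cols]
    on_clause = " AND ".join(f"T.{c} = S.{c}" for c in key_cols)
    lines = [f"MERGE {target} T", f"USING {staging} S", f"ON {on_clause}"]
    if non_key:  # a key-only table has nothing to update
        set_clause = ", ".join(f"{c} = S.{c}" for c in non_key)
        lines.append(f"WHEN MATCHED THEN UPDATE SET {set_clause}")
    insert_cols = ", ".join(columns)
    insert_vals = ", ".join(f"S.{c}" for c in columns)
    lines.append(f"WHEN NOT MATCHED THEN INSERT ({insert_cols}) VALUES ({insert_vals})")
    return "\n".join(lines)


# In practice `columns` would be read at run time from the staging table's schema.
print(build_merge_sql("support.tickets", "support.tickets_staging",
                      key_cols=["ticket_id"],
                      columns=["ticket_id", "status", "assigned_agent", "priority"]))
```

Because the column list is derived from the staging table, a column that exists in staging but not in the target makes the MERGE fail loudly instead of dropping data silently, which is usually the failure you want.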
Let's consolidate what you've built in this lesson.
The core principle: Design for failure as the default, not the exception. Every pipeline that runs in production will be restarted, retried, or rerun. The question isn't whether — it's when. Idempotency guarantees that retries are safe.
The practical approach: True exactly-once delivery is expensive and complex. The production-proven approach is at-least-once delivery + idempotent consumers = effectively-once semantics for your data consumers.
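That formula can be checked with a small simulation: a delivery channel that sometimes redelivers, feeding a sink that ignores message IDs it has already applied. A minimal in-memory sketch; the names are hypothetical, and a production sink would persist its seen-ID set in the same transaction as the effect it guards.

```python
import random


def deliver_at_least_once(messages, redelivery_rate: float, rng: random.Random):
    """Yield every message at least once; some are redelivered, as after a retry."""
    for msg in messages:
        yield msg
        if rng.random() < redelivery_rate:
            yield msg  # e.g. the ack was lost and the broker resent it


class IdempotentSink:
    """Applies each message's effect once, keyed on a stable message ID."""

    def __init__(self) -> None:
        self.seen = set()  # in production, persist this with the effect, in one transaction
        self.total = 0

    def apply(self, msg: dict) -> None:
        if msg["id"] in self.seen:
            return  # duplicate delivery: no effect
        self.seen.add(msg["id"])
        self.total += msg["amount"]


messages = [{"id": f"txn-{i}", "amount": 100} for i in range(1_000)]
rng = random.Random(7)
sink, deliveries = IdempotentSink(), 0
for m in deliver_at_least_once(messages, redelivery_rate=0.3, rng=rng):
    deliveries += 1
    sink.apply(m)
print(f"{deliveries} deliveries, total = {sink.total}")  # total is 100000 regardless of redeliveries
```

This is the opening incident in miniature: without the seen-ID check, every redelivered transaction would be counted twice.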
The layered strategy:
Where to go next:
Delta Lake / Apache Iceberg ACID transactions — If your lakehouse architecture uses Delta Lake or Iceberg, these formats bring ACID transactions to object storage, enabling atomic partition swaps and row-level deletes that make idempotent pipeline design substantially easier.
Temporal tables and bi-temporal data modeling — Once your pipelines are idempotent, the next frontier is tracking not just the current state of data but its full history: both when events occurred and when they were recorded in your system. This is bi-temporal modeling, and it's essential for regulatory compliance and accurate historical reporting.
dbt's incremental models — If you use dbt, study its incremental materialization strategy and unique_key configuration. It implements several of the patterns from this lesson (MERGE, partition overwrite) with a declarative interface, but understanding the underlying mechanics lets you make better decisions about when to use which strategy.
Apache Flink's stateful stream processing and checkpointing — Flink's distributed snapshot algorithm (based on the Chandy-Lamport algorithm) is the gold standard for streaming pipeline idempotency. Deep-diving into Flink's state backends, checkpoint storage, and recovery mechanisms will give you the most rigorous understanding of exactly-once streaming semantics available in open-source software.