Picture this: you've built a RAG pipeline for your company's internal knowledge base. It retrieves documents, passes them to an LLM, and generates answers. It works beautifully in your demos. Then it hits production, and users start asking questions that span multiple documents, require synthesizing conflicting information, or demand follow-up clarifications before a real answer is even possible. Your pipeline confidently returns garbage — or worse, it returns a plausible-sounding answer that's subtly wrong because the retrieval step pulled the wrong chunk.
Standard RAG is a single-pass architecture: query in, documents out, answer generated. It has no mechanism for recognizing when its own retrieval was inadequate, no way to ask a clarifying sub-question, no ability to say "wait, this evidence contradicts itself — let me look harder." That's the fundamental ceiling of naive RAG, and if you're working on anything beyond simple FAQ lookup, you've probably already hit it.
Agentic RAG breaks that ceiling by treating retrieval as a reasoning loop rather than a lookup step. In this lesson, we're going to build a self-correcting retrieval pipeline from scratch — one that retrieves, reflects on the quality of what it found, decides whether to retry with a different strategy, and synthesizes a final answer only when it's confident the evidence is actually good enough. By the end, you'll understand the architecture at a design level and have working Python code implementing the core patterns.
What you'll learn:
You should be comfortable with:
You'll need: langchain, langgraph, openai, chromadb, and pydantic installed. We'll use OpenAI models throughout, but the patterns translate to any capable LLM.
Before we write a single line of agentic code, let's be precise about what fails in standard RAG and why. This matters because agentic RAG adds significant complexity — you should only reach for it when you understand the specific failure mode it's solving.
Standard RAG fails in four distinct ways:
Retrieval failure (wrong documents). The embedding similarity between a question and the relevant passage is lower than the similarity to an irrelevant but topically adjacent passage. This is particularly common with technical jargon, acronyms, or domain-specific phrasing. You retrieve documents about the right topic that don't actually answer the question.
Coverage failure (partial documents). The answer requires synthesizing information from three separate sections of a codebase, or combining a policy document with a specific amendment that supersedes it. Single-pass retrieval with a fixed top_k can't guarantee coverage of all required pieces.
Ambiguity failure (wrong interpretation). The query "how do I handle errors in the pipeline?" could mean exception handling in code, error recovery in data processing, or alerting and escalation policy. Without disambiguation, the retrieval is a coin flip.
Generation failure (hallucination despite good retrieval). Even with perfect documents in context, the LLM can confabulate. The model says something that isn't in any of the retrieved documents, or synthesizes a plausible-but-wrong conclusion.
Each of these failures calls for a different corrective mechanism:
| Failure Type | Agentic Mechanism |
|---|---|
| Wrong documents | Relevance grading + query rewriting |
| Partial coverage | Iterative sub-question decomposition |
| Ambiguous query | Query clarification / expansion |
| Hallucination | Post-generation grounding check |
Agentic RAG adds these mechanisms as nodes in a reasoning graph. The pipeline can traverse multiple paths through this graph before committing to a final answer.
The mental model shift here is crucial. Standard RAG is a chain: each step executes once, in sequence, and passes its output to the next step. Agentic RAG is a graph: nodes represent operations (retrieve, grade, rewrite, generate, check), and edges represent conditional routing decisions.
Here's the full graph we'll build:
             [Start]
                │
                ▼
         [Query Rewriter] ─────────────────────────────────────────────┐
                │                                                      │ (rewrite on failure)
                ▼                                                      │
           [Retriever]                                                 │
                │                                                      │
                ▼                                                      │
        [Relevance Grader] ──── (all docs irrelevant) ──► [Rewrite?] ──┘
                │ (sufficient relevant docs)                  │
                │                                     (max retries hit)
                ▼                                             │
           [Generator]                                        ▼
                │                                    [Fallback Response]
                ▼
     [Hallucination Checker]
         │             │
    (grounded)  (hallucination detected)
         │             │
         ▼             ▼
     [Answer]   [Retry with augmented context]
This graph has cycles (the rewrite loop), conditional branches (relevance grade routing, hallucination routing), and a termination guard (max retries). Getting these right is the difference between a useful self-correcting pipeline and an infinite loop that burns your OpenAI budget.
Let's build it piece by piece.
In LangGraph, the entire pipeline shares a single state object that flows through nodes and gets updated at each step. Define it thoughtfully — it's the backbone of every decision in your pipeline.
from typing import Optional

from pydantic import BaseModel, Field


class AgenticRAGState(BaseModel):
    """
    Shared state for the agentic RAG pipeline.
    All nodes read from and write to this object.
    """

    # The original question, never modified
    original_query: str

    # The current working query (may be rewritten)
    current_query: str

    # Retrieved document chunks with metadata
    retrieved_documents: list[dict] = Field(default_factory=list)

    # Documents that passed the relevance grader
    relevant_documents: list[dict] = Field(default_factory=list)

    # The generated answer (if any)
    generation: Optional[str] = None

    # Number of query rewrites attempted
    rewrite_count: int = 0

    # Maximum rewrites before giving up
    max_rewrites: int = 3

    # Did the hallucination checker pass the final generation?
    grounded: Optional[bool] = None

    # Trace of decisions made (useful for debugging and logging)
    reasoning_trace: list[str] = Field(default_factory=list)

    # Final answer, set only when the pipeline commits
    final_answer: Optional[str] = None

    # Failure message if the pipeline exhausts all retries
    failure_reason: Optional[str] = None
The reasoning_trace field is genuinely important in production — it's how you audit why the pipeline made each decision. When an answer is wrong, you don't want to guess; you want a record of every routing decision.
This is your standard retrieval step, but we're making it explicit as a node so the graph can route back to it after a rewrite.
import os

import chromadb
from chromadb.utils.embedding_functions import OpenAIEmbeddingFunction
from openai import OpenAI

client = OpenAI()  # Reads OPENAI_API_KEY from the environment

# Initialize ChromaDB with OpenAI embeddings
embedding_fn = OpenAIEmbeddingFunction(
    api_key=os.environ["OPENAI_API_KEY"],  # Avoid hardcoding secrets in source
    model_name="text-embedding-3-small"
)

chroma_client = chromadb.PersistentClient(path="./knowledge_base")
collection = chroma_client.get_collection(
    name="engineering_docs",
    embedding_function=embedding_fn
)


def retrieve(state: AgenticRAGState) -> AgenticRAGState:
    """
    Query the vector store with the current working query.
    Returns top-k documents with their distances and metadata.
    """
    results = collection.query(
        query_texts=[state.current_query],
        n_results=6,  # Retrieve more than you need; the grader will filter
        include=["documents", "metadatas", "distances"]
    )

    # Flatten ChromaDB's nested result format into a usable list
    documents = []
    for doc, meta, dist in zip(
        results["documents"][0],
        results["metadatas"][0],
        results["distances"][0]
    ):
        meta = meta or {}  # Chunks stored without metadata come back as None
        documents.append({
            "content": doc,
            "metadata": meta,
            "distance": dist,  # Lower = more similar in ChromaDB
            "source": meta.get("source", "unknown")
        })

    state.retrieved_documents = documents
    state.reasoning_trace.append(
        f"Retrieved {len(documents)} documents for query: '{state.current_query}'"
    )
    return state
Notice we retrieve n_results=6 even though we might only need 3-4. We're deliberately over-fetching because the relevance grader will filter. You want the grader to have options to work with.
This is where agentic RAG starts to diverge from standard RAG. Instead of passing everything retrieved directly to the generator, we stop and ask: are these documents actually relevant to what was asked?
The grader uses structured output to give us a machine-readable decision plus a reasoning trace.
from typing import Literal

from pydantic import BaseModel, Field


class RelevanceGrade(BaseModel):
    """Structured output for document relevance grading."""
    relevant: bool = Field(
        description="Whether this document is relevant to answering the query"
    )
    relevance_score: Literal["high", "medium", "low"] = Field(
        description="Qualitative relevance score"
    )
    reasoning: str = Field(
        description="One sentence explaining why this document is or isn't relevant"
    )


def grade_document_relevance(document: dict, query: str) -> RelevanceGrade:
    """
    Use an LLM to grade whether a retrieved document is relevant
    to the user's query. This is a binary decision with reasoning.
    """
    system_prompt = """You are a precise relevance grader for a technical knowledge base.
Your job is to assess whether a retrieved document chunk contains information
that would help answer the user's question. Be strict — a document that is
topically related but doesn't actually address the question should be marked
NOT relevant.
Consider: Does this document contain facts, procedures, or explanations that
a reader would need to answer this specific question? If the question asks
about error handling in Python and the document discusses Python decorators
without mentioning errors, mark it not relevant."""

    user_prompt = f"""Query: {query}
Document chunk:
---
{document['content']}
---
Source: {document['source']}
Is this document relevant to answering the query?"""

    response = client.beta.chat.completions.parse(
        model="gpt-4o-mini",  # Use mini here — this is a cheap classification call
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ],
        response_format=RelevanceGrade,
        temperature=0  # Deterministic grading
    )
    return response.choices[0].message.parsed


def grade_relevance(state: AgenticRAGState) -> AgenticRAGState:
    """
    Grade all retrieved documents and filter to only the relevant ones.
    The routing function that follows decides whether enough documents
    passed or whether the query needs a rewrite.
    """
    graded_docs = []
    relevant_docs = []
    for doc in state.retrieved_documents:
        grade = grade_document_relevance(doc, state.current_query)
        # Keep the grade on every document, including rejected ones, so the
        # query rewriter can see why retrieval failed
        graded_doc = {
            **doc,
            "relevance_score": grade.relevance_score,
            "grade_reasoning": grade.reasoning
        }
        graded_docs.append(graded_doc)
        if grade.relevant and grade.relevance_score in ("high", "medium"):
            relevant_docs.append(graded_doc)
        state.reasoning_trace.append(
            f"Document from {doc['source']}: {grade.relevance_score} relevance. "
            f"Reason: {grade.reasoning}"
        )

    state.retrieved_documents = graded_docs
    state.relevant_documents = relevant_docs
    state.reasoning_trace.append(
        f"Grading complete: {len(relevant_docs)}/{len(state.retrieved_documents)} "
        f"documents passed relevance check."
    )
    return state
Cost optimization note: We use gpt-4o-mini for grading, not gpt-4o. Grading is a simple binary classification task, so it doesn't need a powerful model. Save your expensive model calls for generation and hallucination checking. In production, you might even fine-tune a small classifier for this step and eliminate the LLM call entirely.
After grading, the graph needs to decide: do we have enough good documents to generate an answer, or do we need to rewrite the query and try again?
def route_after_grading(state: AgenticRAGState) -> str:
    """
    Conditional edge function. Returns the name of the next node.
    Rules:
    - If we have 2+ relevant documents: proceed to generation
    - If we have fewer and haven't hit max retries: rewrite query
    - If we've hit max retries: go to fallback
    """
    MIN_RELEVANT_DOCS = 2

    if len(state.relevant_documents) >= MIN_RELEVANT_DOCS:
        state.reasoning_trace.append(
            "Routing decision: Sufficient relevant documents found. Proceeding to generation."
        )
        return "generate"

    if state.rewrite_count >= state.max_rewrites:
        state.reasoning_trace.append(
            f"Routing decision: Max rewrites ({state.max_rewrites}) reached. "
            f"Routing to fallback."
        )
        return "fallback"

    state.reasoning_trace.append(
        f"Routing decision: Insufficient relevant docs ({len(state.relevant_documents)}). "
        f"Routing to query rewriter (attempt {state.rewrite_count + 1}/{state.max_rewrites})."
    )
    return "rewrite_query"
The MIN_RELEVANT_DOCS threshold is a parameter you'll tune based on your use case. For factual lookups, 2 might be enough. For complex synthesis tasks, you might want 4 or 5. Make it configurable.
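One way to make the threshold configurable is a small router factory that bakes the value into a closure. The sketch below is illustrative only: `make_grading_router` and `RouterState` are hypothetical names, and `RouterState` is a minimal stand-in for AgenticRAGState so the example runs on its own.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class RouterState:
    """Minimal stand-in for AgenticRAGState (only the fields the router reads)."""
    relevant_documents: list = field(default_factory=list)
    rewrite_count: int = 0
    max_rewrites: int = 3


def make_grading_router(min_relevant_docs: int = 2) -> Callable[[RouterState], str]:
    """Build a routing function with the relevance threshold baked in."""
    if min_relevant_docs < 1:
        raise ValueError("min_relevant_docs must be at least 1")

    def route(state: RouterState) -> str:
        # Same three-way decision as route_after_grading, minus the logging
        if len(state.relevant_documents) >= min_relevant_docs:
            return "generate"
        if state.rewrite_count >= state.max_rewrites:
            return "fallback"
        return "rewrite_query"

    return route


# Different thresholds for different workloads (values are illustrative)
route_factual = make_grading_router(min_relevant_docs=2)
route_synthesis = make_grading_router(min_relevant_docs=4)

state = RouterState(relevant_documents=[{"source": "a"}, {"source": "b"}, {"source": "c"}])
print(route_factual(state))    # generate
print(route_synthesis(state))  # rewrite_query
```

The returned function has the same signature as a conditional edge function, so it could be passed to add_conditional_edges in place of a hardcoded router.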
When retrieval fails, the question is why it failed and how to fix it. A naive rewriter just rephrases the question slightly. A good rewriter looks at what was retrieved, understands why it was irrelevant, and produces a fundamentally different query strategy.
class RewrittenQuery(BaseModel):
    """Structured output for the query rewriter."""
    rewritten_query: str = Field(
        description="The new, improved query to try"
    )
    rewrite_strategy: Literal[
        "expand_terms",
        "narrow_focus",
        "rephrase_intent",
        "decompose_to_subquestion",
        "add_context"
    ] = Field(
        description="What strategy was used to rewrite the query"
    )
    reasoning: str = Field(
        description="Why this rewrite should perform better than the original"
    )


def rewrite_query(state: AgenticRAGState) -> AgenticRAGState:
    """
    Rewrite the current query based on what was retrieved and why it failed.
    We give the LLM the original query, the current query (if different),
    the retrieved documents, and the grading decisions — all the context
    it needs to understand what went wrong and how to fix it.
    """
    # Build context about what was retrieved and why it failed
    retrieval_context = []
    for doc in state.retrieved_documents[:3]:  # Show top 3 to save tokens
        grade_info = f" (Grade: {doc.get('relevance_score', 'not graded')}, Reason: {doc.get('grade_reasoning', 'N/A')})"
        retrieval_context.append(
            f"Source: {doc['source']}{grade_info}\n"
            f"Content preview: {doc['content'][:200]}..."
        )
    context_str = "\n\n".join(retrieval_context) if retrieval_context else "No documents were retrieved."

    system_prompt = """You are an expert at query optimization for vector search retrieval systems.
When a query fails to retrieve relevant documents, your job is to diagnose why
and produce a better query. Common failure modes:
1. Query uses jargon the knowledge base doesn't — try expanding with synonyms
2. Query is too broad — narrow it to the specific aspect needed
3. Query asks multiple things at once — focus on one sub-question
4. Query describes the problem without naming the concept — reframe using
the conceptual terminology likely to appear in documentation
Your rewritten query should be substantively different, not just a synonym swap."""

    user_prompt = f"""Original user question: {state.original_query}
Current query being used for retrieval: {state.current_query}
This query retrieved documents that were not relevant. Here's what was retrieved:
{context_str}
Previous rewrite attempts: {state.rewrite_count}
Analyze why the retrieval failed and produce a better query."""

    response = client.beta.chat.completions.parse(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ],
        response_format=RewrittenQuery,
        temperature=0.3  # Slight temperature to encourage creative rewrites
    )
    result = response.choices[0].message.parsed

    state.current_query = result.rewritten_query
    state.rewrite_count += 1
    state.reasoning_trace.append(
        f"Query rewrite #{state.rewrite_count}: Strategy='{result.rewrite_strategy}'. "
        f"New query: '{result.rewritten_query}'. "
        f"Reasoning: {result.reasoning}"
    )
    return state
The rewrite_strategy field in the structured output is not just documentation — it's data. In production, you should log these to a database and analyze which strategies succeed most often for your knowledge base. This data can drive improvements to your chunking strategy, metadata tagging, or even your embedding model choice.
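As a rough illustration of that logging, the sketch below records each rewrite in SQLite and computes a per-strategy success rate. The table name, the helper functions, and the definition of "succeeded" (the retrieval after the rewrite passed the grading threshold) are assumptions made for this example, not part of the pipeline above.

```python
import sqlite3


def init_rewrite_log(conn: sqlite3.Connection) -> None:
    """Create the (hypothetical) rewrite_log table if it does not exist yet."""
    conn.execute(
        """CREATE TABLE IF NOT EXISTS rewrite_log (
               strategy  TEXT    NOT NULL,
               succeeded INTEGER NOT NULL
           )"""
    )


def log_rewrite(conn: sqlite3.Connection, strategy: str, succeeded: bool) -> None:
    """Record one rewrite attempt and whether the follow-up retrieval passed grading."""
    conn.execute(
        "INSERT INTO rewrite_log (strategy, succeeded) VALUES (?, ?)",
        (strategy, int(succeeded)),
    )


def strategy_success_rates(conn: sqlite3.Connection) -> dict[str, float]:
    """Return the fraction of successful rewrites for each strategy."""
    rows = conn.execute(
        "SELECT strategy, AVG(succeeded) FROM rewrite_log GROUP BY strategy"
    ).fetchall()
    return {strategy: rate for strategy, rate in rows}


# In-memory database for the demo; a real deployment would use a persistent store
conn = sqlite3.connect(":memory:")
init_rewrite_log(conn)
log_rewrite(conn, "rephrase_intent", True)
log_rewrite(conn, "rephrase_intent", False)
log_rewrite(conn, "narrow_focus", True)

rates = strategy_success_rates(conn)
print(rates)
```

A strategy with a persistently low success rate is a hint that the rewriter prompt, or the underlying chunking, needs attention for that class of query.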
Once we have relevant documents, we generate an answer. Notice that we pass the graded relevance scores to the generator — this gives it a signal about which documents to weight more heavily.
def generate_answer(state: AgenticRAGState) -> AgenticRAGState:
    """
    Generate an answer using only the relevant, graded documents.
    We explicitly tell the generator which documents were high-relevance
    and instruct it to stay strictly within the provided context.
    """
    # Format documents with their relevance scores
    context_parts = []
    for i, doc in enumerate(state.relevant_documents, 1):
        relevance_label = doc.get("relevance_score", "medium")
        context_parts.append(
            f"[Document {i} | Relevance: {relevance_label} | Source: {doc['source']}]\n"
            f"{doc['content']}"
        )
    context = "\n\n---\n\n".join(context_parts)

    system_prompt = """You are a precise technical assistant. Answer questions using
ONLY the information provided in the context documents below.
Critical rules:
1. If the context doesn't contain enough information to answer the question
completely, say so explicitly — do not fill gaps with general knowledge.
2. Cite the source documents when making specific claims (e.g., "According to
Document 2...").
3. If documents contradict each other, acknowledge the contradiction and explain
what each source says.
4. High-relevance documents should be prioritized over medium-relevance ones."""

    user_prompt = f"""Question: {state.original_query}
Context documents:
{context}
Answer the question based strictly on the provided context."""

    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ],
        temperature=0.1
    )

    state.generation = response.choices[0].message.content
    state.reasoning_trace.append(
        f"Generated answer using {len(state.relevant_documents)} relevant documents."
    )
    return state
This is the most underimplemented component in most RAG pipelines. Post-generation grounding verification asks: does the answer we just generated actually come from the retrieved documents, or did the model confabulate?
class GroundingCheck(BaseModel):
    """Structured output for the hallucination checker."""
    is_grounded: bool = Field(
        description="Whether the answer is fully supported by the provided documents"
    )
    ungrounded_claims: list[str] = Field(
        description="List of specific claims in the answer that cannot be verified "
                    "in the source documents. Empty list if fully grounded.",
        default_factory=list
    )
    confidence: Literal["high", "medium", "low"] = Field(
        description="Confidence in the grounding assessment"
    )
    reasoning: str = Field(
        description="Brief explanation of the grounding assessment"
    )


def check_hallucination(state: AgenticRAGState) -> AgenticRAGState:
    """
    Verify that every factual claim in the generated answer
    can be traced back to a source document.
    """
    # Build the full context used for generation
    context = "\n\n".join([
        f"[{doc['source']}]: {doc['content']}"
        for doc in state.relevant_documents
    ])

    system_prompt = """You are a meticulous fact-checker for AI-generated content.
Your job is to verify whether a generated answer is fully supported by the
provided source documents. Check every specific factual claim:
- Numbers, dates, version numbers, thresholds
- Procedural steps and their ordering
- Names, identifiers, configurations
- Causal relationships and logical conclusions
A claim is ungrounded if it:
1. States something not mentioned in any source document
2. Contradicts what the source documents say
3. Makes an inference that isn't directly supported (even if plausible)
Be thorough. A confident-sounding wrong answer is worse than "I don't know."
"""

    user_prompt = f"""Source documents used to generate this answer:
---
{context}
---
Generated answer to verify:
---
{state.generation}
---
Is every factual claim in this answer supported by the source documents?"""

    response = client.beta.chat.completions.parse(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ],
        response_format=GroundingCheck,
        temperature=0
    )
    check = response.choices[0].message.parsed

    state.grounded = check.is_grounded
    # Log against the same flag the router uses, so the trace and the
    # routing decision can never disagree
    if not check.is_grounded:
        state.reasoning_trace.append(
            f"Hallucination check: FAILED. Found {len(check.ungrounded_claims)} "
            f"ungrounded claims: {'; '.join(check.ungrounded_claims[:3])}"
        )
    else:
        state.reasoning_trace.append(
            f"Hallucination check: PASSED with {check.confidence} confidence. "
            f"{check.reasoning}"
        )
    return state


def route_after_hallucination_check(state: AgenticRAGState) -> str:
    """
    If grounded: finalize the answer.
    If not grounded and retries remain: trigger a re-retrieval with augmented context.
    If not grounded and no retries: return answer with a caveat.
    """
    if state.grounded:
        return "finalize"

    if state.rewrite_count < state.max_rewrites:
        state.reasoning_trace.append(
            "Routing: Answer not grounded. Attempting retrieval augmentation."
        )
        return "augment_and_retry"

    state.reasoning_trace.append(
        "Routing: Answer not grounded but max retries reached. "
        "Will finalize with caveat."
    )
    return "finalize_with_caveat"
Now we assemble all nodes into a LangGraph StateGraph. This is where the routing logic becomes executable.
from langgraph.graph import StateGraph, END


def fallback_response(state: AgenticRAGState) -> AgenticRAGState:
    """Called when max retries are exhausted without finding relevant docs."""
    state.final_answer = (
        f"I was unable to find relevant information in the knowledge base to "
        f"answer your question: '{state.original_query}'. "
        f"I attempted {state.rewrite_count} different query formulations "
        f"without finding sufficient evidence. Please try rephrasing your "
        f"question or check if this topic is covered in the knowledge base."
    )
    state.failure_reason = "max_retries_exhausted_no_relevant_docs"
    return state


def finalize_answer(state: AgenticRAGState) -> AgenticRAGState:
    """Commit the generated answer as the final answer."""
    state.final_answer = state.generation
    return state


def finalize_with_caveat(state: AgenticRAGState) -> AgenticRAGState:
    """
    Commit the generated answer but prepend a caveat about unverified claims.
    This is better than suppressing the answer entirely when retries are exhausted.
    """
    caveat = (
        "⚠️ Note: The following answer may contain claims that could not be "
        "fully verified against the source documents. Please verify critical "
        "details independently.\n\n"
    )
    state.final_answer = caveat + (state.generation or "No answer was generated.")
    return state


def augment_and_retry(state: AgenticRAGState) -> AgenticRAGState:
    """
    When hallucination is detected, formulate a targeted follow-up query
    to retrieve the specific facts that were confabulated.
    """
    # In a more sophisticated implementation, this would analyze the
    # ungrounded claims and generate targeted retrieval queries.
    # For now, we fall back to query rewriting with hallucination context.
    state.current_query = (
        f"{state.original_query} - specifically looking for "
        f"factual details and specific values"
    )
    state.rewrite_count += 1
    state.generation = None  # Clear the failed generation
    state.grounded = None
    return state


# Build the graph
workflow = StateGraph(AgenticRAGState)

# Add all nodes
workflow.add_node("retrieve", retrieve)
workflow.add_node("grade_relevance", grade_relevance)
workflow.add_node("rewrite_query", rewrite_query)
workflow.add_node("generate", generate_answer)
workflow.add_node("check_hallucination", check_hallucination)
workflow.add_node("fallback", fallback_response)
workflow.add_node("finalize", finalize_answer)
workflow.add_node("finalize_with_caveat", finalize_with_caveat)
workflow.add_node("augment_and_retry", augment_and_retry)

# Set entry point
workflow.set_entry_point("retrieve")

# Add edges
workflow.add_edge("retrieve", "grade_relevance")

# Conditional routing after grading
workflow.add_conditional_edges(
    "grade_relevance",
    route_after_grading,
    {
        "generate": "generate",
        "rewrite_query": "rewrite_query",
        "fallback": "fallback"
    }
)

# After rewrite, go back to retrieval
workflow.add_edge("rewrite_query", "retrieve")

# After generation, check for hallucinations
workflow.add_edge("generate", "check_hallucination")

# Conditional routing after hallucination check
workflow.add_conditional_edges(
    "check_hallucination",
    route_after_hallucination_check,
    {
        "finalize": "finalize",
        "augment_and_retry": "augment_and_retry",
        "finalize_with_caveat": "finalize_with_caveat"
    }
)

# After augmentation, go back to retrieval
workflow.add_edge("augment_and_retry", "retrieve")

# Terminal nodes
workflow.add_edge("fallback", END)
workflow.add_edge("finalize", END)
workflow.add_edge("finalize_with_caveat", END)

# Compile the graph
app = workflow.compile()
Here's how you invoke it and inspect the full reasoning trace:
def run_agentic_rag(question: str, max_rewrites: int = 3) -> dict:
    """
    Run the agentic RAG pipeline and return the result with full trace.
    """
    initial_state = AgenticRAGState(
        original_query=question,
        current_query=question,
        max_rewrites=max_rewrites
    )

    # Run the graph
    result = app.invoke(initial_state)

    return {
        "question": result["original_query"],
        "answer": result["final_answer"],
        "queries_attempted": [result["original_query"]] + [
            # Extract rewritten queries from trace for display
        ],
        "documents_used": len(result["relevant_documents"]),
        "rewrites": result["rewrite_count"],
        "grounded": result["grounded"],
        "failure_reason": result.get("failure_reason"),
        "reasoning_trace": result["reasoning_trace"]
    }


# Example usage
result = run_agentic_rag(
    "What's the retry backoff strategy for the ingestion pipeline when "
    "the data warehouse connection times out?"
)

print(f"Answer: {result['answer']}\n")
print(f"Documents used: {result['documents_used']}")
print(f"Query rewrites: {result['rewrites']}")
print(f"Grounded: {result['grounded']}")
print("\n--- Reasoning Trace ---")
for i, step in enumerate(result["reasoning_trace"], 1):
    print(f"{i}. {step}")
A typical trace for a successful multi-attempt retrieval looks like:
1. Retrieved 6 documents for query: 'retry backoff strategy ingestion pipeline...'
2. Document from pipeline/ingestion.py: low relevance. Reason: Document covers...
3. Document from ops/runbook.md: low relevance. Reason: General runbook...
4. Grading complete: 0/6 documents passed relevance check.
5. Routing decision: Insufficient relevant docs (0). Routing to query rewriter.
6. Query rewrite #1: Strategy='rephrase_intent'. New query: 'exponential backoff...
7. Retrieved 6 documents for query: 'exponential backoff connection timeout...'
8. Document from config/pipeline_config.yaml: high relevance. Reason: Contains...
9. Document from src/retry_handler.py: high relevance. Reason: Implements...
10. Grading complete: 4/6 documents passed relevance check.
11. Routing decision: Sufficient relevant documents found. Proceeding to generation.
12. Generated answer using 4 relevant documents.
13. Hallucination check: PASSED with high confidence. All claims traceable...
Now that you understand the full pipeline, your task is to extend it with one of the most powerful agentic RAG patterns: iterative sub-question decomposition.
Some questions can't be answered by a single retrieval pass because they require synthesizing information that lives in genuinely separate parts of the knowledge base. For example: "Compare the error handling strategies used in our Python data pipelines vs. our Go microservices." This requires at least two separate retrieval passes, each targeting different content.
Your exercise is to add a query decomposition node that sits before the retriever for complex queries:
Implement a QueryDecomposer class that uses an LLM with structured output to detect whether a query is "complex" (requires multiple sub-questions) or "simple" (single retrieval pass). For complex queries, produce a list of 2-4 sub-questions.
Add a decompose_query node to the graph that runs before the first retrieval. Route simple queries directly to retrieve, complex queries through the decomposer.
Modify the AgenticRAGState to include a sub_questions: list[str] field and a current_sub_question_index: int field. The retriever should use sub_questions[current_sub_question_index] as the query when sub-questions exist.
Add a loop that iterates through all sub-questions, accumulating relevant documents from each pass. Route back to the retriever if there are remaining sub-questions, or forward to generation once all sub-questions have been processed.
Test it with a question that clearly requires synthesis across multiple topics in your knowledge base. Verify (via the reasoning trace) that the pipeline actually retrieves documents for each sub-question separately.
Hint on structured output for the decomposer:
class DecompositionDecision(BaseModel):
    is_complex: bool
    sub_questions: list[str] = Field(
        description="List of 2-4 specific sub-questions. Empty if not complex.",
        default_factory=list
    )
    reasoning: str
The most dangerous bug in agentic RAG is a loop that never terminates. This typically happens when max_rewrites is checked inconsistently — for example, if your routing function increments the counter in one place and checks it in another, or if the augment_and_retry node bypasses the rewrite counter.
Fix: Give the counter exactly one kind of owner. Increment it only in the action nodes that consume a retry (rewrite_query and augment_and_retry), and only read it in the routing functions.
# BAD: Incrementing in the router. Routers should only read state and return
# a node name; updates made here may not be written back to the graph state,
# so the counter can silently fail to advance.
def route_after_grading(state):
    if ...:
        state.rewrite_count += 1  # Don't do this in routers
        return "rewrite_query"


# GOOD: Increment in the action node
def rewrite_query(state):
    state.rewrite_count += 1  # Increment here
    ...
    return state
Running gpt-4o on every document for every grading call will make your pipeline 3-5x more expensive than necessary. Relevance grading is a classification task, not a reasoning task.
Fix: Use gpt-4o-mini for grading. In high-volume scenarios, consider a fine-tuned classifier or a cross-encoder re-ranker (like cross-encoder/ms-marco-MiniLM-L-6-v2 from Sentence Transformers) that runs locally and costs nothing per call.
If you have 6 relevant documents totaling 12,000 tokens and then pass all of that plus the generated answer to the hallucination checker, you're burning expensive context. The hallucination checker doesn't need to see every nuance of every document — it needs to verify specific claims.
Fix: For the hallucination check, pass only the document excerpts that are specifically cited in the generated answer, not the full context. You can extract cited sources from the answer text or from the structured metadata your generator logs.
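A minimal sketch of that filtering step follows. It assumes the generator cites sources in the "Document N" form requested by the generation prompt, with N matching the 1-based position in relevant_documents; the helper names are hypothetical.

```python
import re


def cited_document_indices(answer: str) -> set[int]:
    """Return the 1-based document numbers cited as 'Document N' in the answer."""
    return {int(n) for n in re.findall(r"\bDocument\s+(\d+)\b", answer)}


def select_cited_documents(answer: str, relevant_documents: list[dict]) -> list[dict]:
    """Keep only the documents the answer actually cites."""
    cited = cited_document_indices(answer)
    selected = [
        doc for i, doc in enumerate(relevant_documents, 1) if i in cited
    ]
    # If nothing was cited (or the citation format was unexpected), fall back
    # to the full context rather than checking the answer against nothing
    return selected or list(relevant_documents)


docs = [
    {"source": "a.md", "content": "A"},
    {"source": "b.md", "content": "B"},
    {"source": "c.md", "content": "C"},
]
answer = (
    "According to Document 2, retries use exponential backoff. "
    "Document 3 sets the cap."
)
cited_docs = select_cited_documents(answer, docs)
print([d["source"] for d in cited_docs])  # ['b.md', 'c.md']
```

The fallback to the full context matters: an answer with no citations at all is exactly the kind of answer the checker should scrutinize most closely.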
If your ChromaDB collection is empty, collection.query() will return empty lists. Downstream nodes expecting state.retrieved_documents to be a non-empty list will fail unexpectedly.
Fix: Always validate the retrieval result before proceeding:
def retrieve(state: AgenticRAGState) -> AgenticRAGState:
    results = collection.query(...)

    if not results["documents"] or not results["documents"][0]:
        state.retrieved_documents = []
        state.reasoning_trace.append(
            "WARNING: No documents returned from vector store. "
            "Check collection population and query format."
        )
        return state

    # ... rest of the function
If the rewriter always produces semantically similar reformulations, you'll waste retries on queries that all retrieve the same irrelevant documents. This happens when the rewriter doesn't get enough context about what failed.
Fix: Pass the rewriter the actual content snippets of the failed documents, not just their count. Seeing "this is what I retrieved and it's about X" gives the rewriter the signal it needs to try a genuinely different direction.
Not all ungrounded claims are hallucinations — some are the model correctly reporting general knowledge that happens to supplement the retrieved documents. A finalize_with_caveat path (as we implemented) is almost always better than a hard failure.
Fix: Use ungrounded_claims to drive the routing decision instead of relying on the boolean alone. If the only ungrounded claims are stylistic additions rather than factual statements, treat the answer as grounded. Reserve the caveat path for cases where specific named entities, numbers, or procedures appear in the answer without source support.
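One crude way to separate "hard" factual claims from stylistic ones is a pattern-based filter. The sketch below is a heuristic under stated assumptions, not a reliable classifier: it treats a claim as factual if it contains a digit, inline code, a snake_case identifier, or a file name, and both helper names are hypothetical. A second LLM call or a stricter GroundingCheck schema would likely be more accurate.

```python
import re

# Heuristic markers of a "hard" factual claim (assumption: tune for your corpus)
_HARD_CLAIM = re.compile(
    r"\d"                                # numbers, versions, thresholds
    r"|`[^`]+`"                          # inline code
    r"|\b\w+_\w+\b"                      # snake_case identifiers
    r"|\b\w+\.(?:py|ya?ml|json|md)\b"    # file names
)


def hard_ungrounded_claims(claims: list[str]) -> list[str]:
    """Return the ungrounded claims that look like specific factual statements."""
    return [claim for claim in claims if _HARD_CLAIM.search(claim)]


def route_on_claims(claims: list[str]) -> str:
    """Send the answer down the caveat path only when hard claims are unsupported."""
    return "finalize_with_caveat" if hard_ungrounded_claims(claims) else "finalize"


claims = ["The tone is reassuring.", "The timeout is 30 seconds."]
print(hard_ungrounded_claims(claims))  # ['The timeout is 30 seconds.']
print(route_on_claims(claims))         # finalize_with_caveat
print(route_on_claims(["This is generally considered good practice."]))  # finalize
```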
Latency budget: A full agentic RAG cycle — retrieve, grade 6 docs, generate, check hallucination — takes approximately 4-8 seconds with gpt-4o. Each retry adds 3-6 seconds. Design your UX around this: streaming intermediate status updates ("Searching knowledge base...", "Verifying answer quality...") dramatically improves perceived performance.
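To make the budget concrete, here is a small worked calculation using the figures quoted above (4-8 seconds for the base cycle, 3-6 seconds per retry). The function name latency_range is hypothetical, and the numbers are this lesson's estimates, not measurements of your system.

```python
def latency_range(
    retries: int,
    base: tuple[float, float] = (4.0, 8.0),
    per_retry: tuple[float, float] = (3.0, 6.0),
) -> tuple[float, float]:
    """Return the (best case, worst case) end-to-end latency in seconds."""
    low = base[0] + retries * per_retry[0]
    high = base[1] + retries * per_retry[1]
    return (low, high)


no_retry = latency_range(0)
two_retries = latency_range(2)
```

With two retries the worst case reaches 20 seconds, which is why a retry cap and streamed status updates matter for the user experience.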
Parallelizing the grading step: Grading each document sequentially is the biggest latency bottleneck after the LLM calls themselves. Use asyncio.gather() to grade all retrieved documents in parallel:
import asyncio

async def grade_document_relevance_async(document, query):
    # Async version of grade_document_relevance
    ...

async def grade_relevance_parallel(state: AgenticRAGState) -> AgenticRAGState:
    tasks = [
        grade_document_relevance_async(doc, state.current_query)
        for doc in state.retrieved_documents
    ]
    grades = await asyncio.gather(*tasks)
    # Process grades...
    return state
This alone can cut grading latency by 60-70% for 6 documents.
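If your existing grader is a blocking function, one possible way to get an async version without rewriting it is asyncio.to_thread (Python 3.9+). The sketch below is runnable on its own. It assumes a grade_document_relevance(document, query) signature and replaces the LLM call with a hypothetical keyword check so that no API key is needed.

```python
import asyncio


def grade_document_relevance(document: str, query: str) -> bool:
    # Stand-in for the blocking LLM grading call (hypothetical keyword check).
    return any(term in document.lower() for term in query.lower().split())


async def grade_document_relevance_async(document: str, query: str) -> bool:
    # Run the blocking grader in a worker thread so the event loop stays free.
    return await asyncio.to_thread(grade_document_relevance, document, query)


async def grade_all(documents: list[str], query: str) -> list[bool]:
    # gather() returns results in the same order as the input documents.
    tasks = [grade_document_relevance_async(doc, query) for doc in documents]
    return await asyncio.gather(*tasks)


grades = asyncio.run(
    grade_all(["Reset your password in settings.", "Office lunch menu"], "password reset")
)
```

If your LLM client already exposes native async methods, calling those directly is usually preferable to threads.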
Caching: Query rewriting produces structured queries that are often semantically equivalent. Implement a semantic cache keyed on query embeddings — if the cosine similarity between a new query and a cached query is above 0.97, return the cached results. This is particularly effective for high-traffic production systems where many users ask similar questions.
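A minimal in-memory sketch of such a cache is shown below. The SemanticCache class is hypothetical, embeddings are plain lists of floats, and lookup is a linear scan. A real deployment would get embeddings from your embedding model and would probably use a vector index for lookup.

```python
import math
from typing import Optional


def cosine_similarity(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


class SemanticCache:
    def __init__(self, threshold: float = 0.97) -> None:
        self.threshold = threshold
        self._entries: list[tuple[list[float], list[str]]] = []

    def put(self, embedding: list[float], results: list[str]) -> None:
        self._entries.append((embedding, results))

    def get(self, embedding: list[float]) -> Optional[list[str]]:
        """Return results of the most similar cached query above the threshold."""
        best_results = None
        best_similarity = self.threshold
        for cached_embedding, results in self._entries:
            similarity = cosine_similarity(embedding, cached_embedding)
            if similarity > best_similarity:
                best_similarity = similarity
                best_results = results
        return best_results


cache = SemanticCache()
cache.put([1.0, 0.0], ["doc-a"])
hit = cache.get([0.99, 0.05])   # nearly identical direction
miss = cache.get([0.0, 1.0])    # orthogonal query
```

The 0.97 threshold is the value suggested above; it is worth tuning against real traffic, since a threshold that is too low returns cached results for queries that only look similar.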
Monitoring what matters: Instrument these specific metrics:
rewrite_strategy distribution
A high rewrite rate combined with narrow_focus as the dominant strategy tells you your queries are too broad; improve your chunking. A high hallucination detection rate tells you your generator prompt isn't strict enough.
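As an illustration, the rates discussed here could be tracked with a small in-process collector like the one below. The PipelineMetrics class and its field names are hypothetical, and the "broaden" strategy label is made up for the example. In production you would more likely export these counters to your existing metrics system.

```python
from collections import Counter
from dataclasses import dataclass, field


@dataclass
class PipelineMetrics:
    total_queries: int = 0
    rewritten_queries: int = 0
    hallucination_flags: int = 0
    rewrite_strategies: Counter = field(default_factory=Counter)

    def record(self, strategies: list[str], hallucination_detected: bool) -> None:
        """Record one finished pipeline run."""
        self.total_queries += 1
        if strategies:
            self.rewritten_queries += 1
        self.rewrite_strategies.update(strategies)
        self.hallucination_flags += int(hallucination_detected)

    @property
    def rewrite_rate(self) -> float:
        return self.rewritten_queries / self.total_queries if self.total_queries else 0.0

    @property
    def hallucination_rate(self) -> float:
        return self.hallucination_flags / self.total_queries if self.total_queries else 0.0


metrics = PipelineMetrics()
metrics.record(["narrow_focus"], hallucination_detected=False)
metrics.record([], hallucination_detected=True)
metrics.record(["narrow_focus", "broaden"], hallucination_detected=False)
dominant_strategy = metrics.rewrite_strategies.most_common(1)[0]
```

Reading rewrite_rate together with the dominant strategy gives the kind of diagnosis described above.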
This is an expert lesson, so let's be honest about the cost-benefit. Agentic RAG adds latency, cost, and operational complexity. It is not the right choice when:
The right trigger for adopting agentic RAG is a specific, measured failure mode in a production system — not architectural enthusiasm. Build the simpler version first, measure where it fails, and add agentic components surgically.
You've now built a production-grade agentic RAG pipeline with four core self-correcting mechanisms: relevance grading to filter bad retrievals before they corrupt your context, query rewriting to reformulate failed queries with diagnostic context, a generation step that grounds answers in graded evidence, and hallucination checking that verifies every factual claim post-generation.
The architectural pattern we've implemented — a directed graph with conditional edges, shared state, and termination guards — is the right abstraction for any agentic pipeline. The specific nodes and routing logic are application-specific, but the structure scales to much more complex orchestration.
What to explore next:
Multi-agent RAG: split the rewriter, grader, and checker into separate specialized agents with their own context and memory, coordinated by a meta-agent. This improves specialization at the cost of orchestration complexity.
Self-RAG paper implementation: the original Self-RAG paper (Asai et al., 2023) introduces special reflection tokens that the model generates inline, eliminating the need for separate grading calls. Understanding it will deepen your intuition for where the field is heading.
Tool-augmented RAG: give your agentic pipeline access to structured data queries (SQL), API calls, and web search in addition to vector retrieval. The graph patterns we've built here extend naturally to multi-tool orchestration.
Evaluation frameworks: building the pipeline is half the work. Implement RAGAS metrics (faithfulness, answer relevancy, context precision, context recall) to measure whether your agentic improvements actually improve outcomes — or just add latency.
Production hardening: add Redis-based caching for repeated queries, distributed tracing with OpenTelemetry, and a feedback loop that lets users flag bad answers to automatically trigger knowledge base updates.
The hardest part of agentic RAG isn't writing the code — it's knowing when to trust the pipeline's decisions and when to override them. Build the observability layer (reasoning traces, metrics, human-in-the-loop flagging) from day one, and let the data tell you where your agents need more help.