A user asks about your enterprise pricing tier. The retriever pulls back three chunks about the free tier instead. The LLM dutifully generates a confident, well-structured answer about the wrong thing.
Nobody catches it. Your pipeline has no concept of "these documents don't actually answer the question." It retrieved, it generated, it's done.
In the RAG from scratch tutorial, we built the standard pipeline: chunk, embed, retrieve top-K, generate. That pipeline works right up until it doesn't. The retriever returns plausible-looking results, the LLM synthesizes them convincingly, and the user gets a wrong answer wrapped in false confidence. The gap isn't in generation. It's in the total absence of any quality check between retrieval and generation.
Here you'll build the machinery that fills that gap. A query router that decides how to answer before retrieving anything. A relevance grader that evaluates every retrieved document before the LLM sees it. A self-correction loop that rewrites the query and retries when the grader rejects too many documents. And circuit breakers that prevent the whole thing from spinning forever in production.
| Component | What it does | Why it matters |
|---|---|---|
| Query router | Classifies the question and picks a retrieval strategy | Simple questions skip retrieval entirely, complex ones get the full pipeline |
| Relevance grader | Scores each retrieved document against the original question | Catches the "wrong documents, confident answer" failure mode |
| Query rewriter | Reformulates the query when grading fails | Fixes the retrieval instead of generating from garbage |
| Circuit breaker | Caps retries and forces graceful fallback | Prevents infinite loops and runaway costs in production |
| Adaptive controller | Selects the full strategy based on query complexity | Matches retrieval cost to query difficulty |
Where does basic RAG actually fail?
Basic RAG fails in three predictable ways. Each one points to a specific component you need to add.
The most common is irrelevant retrieval: the vector search returns documents that are semantically similar to the query but don't contain the answer. The second is partial retrieval: the answer requires information from multiple documents but the pipeline only retrieves one piece. The third is unnecessary retrieval: the question is simple enough that the LLM already knows the answer, but the pipeline wastes time and money retrieving documents that add nothing.
Irrelevant retrieval
This is the most dangerous failure because it's invisible. The similarity scores look fine. The documents are topically related. But they don't contain the specific information needed.
Imagine a knowledge base about a SaaS product. A user asks "What happens if I exceed my API rate limit?" The retriever pulls back chunks about API authentication, API versioning, and API pricing. All API-related. All high similarity scores. None of them mention rate limits.
The LLM does what LLMs do: it synthesizes the available context into a plausible answer. "When you exceed your API usage, you may incur additional charges based on your pricing tier." It sounds right. It's completely fabricated from the wrong documents.
A basic RAG pipeline has zero machinery to catch this. The retriever's job is done after it returns top-K results. The generator's job starts when it receives them. There is no step in between that asks: "Do these documents actually help answer this question?"
Partial retrieval
Some questions need information that lives across multiple chunks or documents. "Compare the refund policies for enterprise and self-serve customers" requires finding both policies. Top-K retrieval grabs whichever chunks have the highest similarity to the query as a whole, which often means you get multiple chunks about the same subtopic rather than one from each.
We covered multi-hop retrieval in the agentic RAG deep dive. Self-corrective RAG complements that approach by catching cases where the first retrieval round comes back incomplete and triggering targeted follow-up searches.
Unnecessary retrieval
"What is the capital of France?" doesn't need a knowledge base lookup. Neither does "Explain what REST APIs are." When every query runs through the full retrieval pipeline regardless of complexity, you're paying for embeddings, vector search, and LLM context tokens on questions the model could answer directly.
Query routing solves this by classifying the question before retrieval starts.
How does query routing work?
Query routing classifies an incoming question and directs it to the most appropriate answering strategy before any retrieval happens. A router typically distinguishes between questions the LLM can answer directly from its training data, questions that need your vector store, questions that need a structured database query, and questions that need external web search. The classification itself is a cheap LLM call that saves expensive retrieval on simple questions and improves accuracy on complex ones.
The router is the first thing that runs. It looks at the question and decides: does this need retrieval at all? If so, what kind?
Here's a straightforward implementation. The router uses structured output to return a classification with reasoning.
TypeScript:
// Query router: classify the question before retrieval
import OpenAI from "openai";
const openai = new OpenAI();
type RouteType = "direct" | "vector_search" | "web_search";
interface RouteDecision {
route: RouteType;
reasoning: string;
rewritten_query?: string;
}
async function routeQuery(query: string): Promise<RouteDecision> {
const response = await openai.chat.completions.create({
model: "gpt-4o-mini",
temperature: 0,
response_format: { type: "json_object" },
messages: [
{
role: "system",
content: `You are a query router. Classify the user's question into one of three routes:
- "direct": General knowledge questions the LLM can answer without retrieval.
Examples: "What is RAG?", "Explain HTTP status codes"
- "vector_search": Questions about specific internal documents, products, or policies.
Examples: "What's our refund policy?", "How do I configure SSO?"
- "web_search": Questions about recent events, live data, or external information
not likely in the knowledge base.
Examples: "What did OpenAI announce yesterday?", "Current stock price of NVDA"
Return JSON: {"route": "...", "reasoning": "...", "rewritten_query": "..."}
The rewritten_query should optimize the question for the chosen retrieval method.
If route is "direct", omit rewritten_query.`,
},
{ role: "user", content: query },
],
});
return JSON.parse(response.choices[0].message.content ?? "{}");
}Python:
# Query router: classify the question before retrieval
import json
from openai import OpenAI
client = OpenAI()
def route_query(query: str) -> dict:
response = client.chat.completions.create(
model="gpt-4o-mini",
temperature=0,
response_format={"type": "json_object"},
messages=[
{
"role": "system",
"content": """You are a query router. Classify the user's question into one of three routes:
- "direct": General knowledge questions the LLM can answer without retrieval.
- "vector_search": Questions about specific internal documents, products, or policies.
- "web_search": Questions about recent events, live data, or external information.
Return JSON: {"route": "...", "reasoning": "...", "rewritten_query": "..."}
The rewritten_query should optimize the question for the chosen retrieval method.
If route is "direct", omit rewritten_query.""",
},
{"role": "user", "content": query},
],
)
return json.loads(response.choices[0].message.content or "{}")The router costs roughly $0.0002 per query with gpt-4o-mini. For a pipeline that processes 10,000 queries a day, if 40% of those are "direct" questions that skip retrieval, you save approximately $8-12 in embedding and generation costs daily while also returning faster responses.
Notice the rewritten_query field. When the router decides on vector search, it can also reformulate the query for better retrieval. "How do I get my money back?" becomes "refund policy return process." This is query expansion baked into the routing step, and it is one of the optimizations we mentioned at the end of the RAG from scratch article.
Now wire the router into a pipeline that dispatches to different strategies:
TypeScript:
// Dispatch based on route decision
async function handleQuery(query: string): Promise<string> {
const route = await routeQuery(query);
switch (route.route) {
case "direct":
// Skip retrieval entirely, answer from LLM knowledge
return generateDirectAnswer(query);
case "vector_search":
// Full self-corrective RAG pipeline (built in the next sections)
return selfCorrectiveRag(route.rewritten_query ?? query);
case "web_search":
// Web search fallback for current events
return webSearchAndGenerate(route.rewritten_query ?? query);
default:
return selfCorrectiveRag(query);
}
}
async function generateDirectAnswer(query: string): Promise<string> {
const response = await openai.chat.completions.create({
model: "gpt-4o-mini",
messages: [
{
role: "system",
content: "Answer the question directly and concisely. If you are not confident in the answer, say so.",
},
{ role: "user", content: query },
],
temperature: 0.3,
});
return response.choices[0].message.content ?? "I could not generate an answer.";
}What does a relevance grader actually do?
The relevance grader is the core innovation of self-corrective RAG. It sits between retrieval and generation, evaluating each retrieved document against the original question. Documents that pass go to the generator. Documents that fail get discarded. If too many fail, the query gets rewritten and retrieval runs again.
This is the component that catches "wrong documents, confident answer." Without it, every retrieved document gets treated as equally relevant regardless of whether it actually helps answer the question.
The grader needs to be fast and cheap because it runs on every retrieved document. A binary yes/no decision with a short explanation is the right granularity. You don't need a 1-5 scale here. You need a gate: is this document relevant to the question, or not?
TypeScript:
// Relevance grader: evaluate each document against the query
interface GradeResult {
relevant: boolean;
reasoning: string;
}
async function gradeDocument(
query: string,
document: string
): Promise<GradeResult> {
const response = await openai.chat.completions.create({
model: "gpt-4o-mini",
temperature: 0,
response_format: { type: "json_object" },
messages: [
{
role: "system",
content: `You are a relevance grader. Given a user question and a retrieved document,
determine if the document contains information relevant to answering the question.
Focus on whether the document provides facts, data, or context that would help
answer the question. Topical similarity alone is not enough. The document must
contain actual useful information.
Return JSON: {"relevant": true/false, "reasoning": "one sentence explanation"}`,
},
{
role: "user",
content: `Question: ${query}\n\nDocument: ${document}`,
},
],
});
return JSON.parse(response.choices[0].message.content ?? '{"relevant": false, "reasoning": "Parse error"}');
}
// Grade all retrieved documents, return only the relevant ones
async function gradeDocuments(
query: string,
documents: string[]
): Promise<{ relevant: string[]; irrelevant: number }> {
const grades = await Promise.all(
documents.map((doc) => gradeDocument(query, doc))
);
const relevant: string[] = [];
let irrelevant = 0;
for (let i = 0; i < documents.length; i++) {
if (grades[i].relevant) {
relevant.push(documents[i]);
} else {
irrelevant++;
}
}
return { relevant, irrelevant };
}Python:
# Relevance grader: evaluate each document against the query
import asyncio
def grade_document(query: str, document: str) -> dict:
response = client.chat.completions.create(
model="gpt-4o-mini",
temperature=0,
response_format={"type": "json_object"},
messages=[
{
"role": "system",
"content": """You are a relevance grader. Given a user question and a retrieved
document, determine if the document contains information relevant to answering
the question. Topical similarity alone is not enough.
Return JSON: {"relevant": true/false, "reasoning": "one sentence explanation"}""",
},
{
"role": "user",
"content": f"Question: {query}\n\nDocument: {document}",
},
],
)
return json.loads(response.choices[0].message.content or '{"relevant": false}')
def grade_documents(query: str, documents: list[str]) -> dict:
grades = [grade_document(query, doc) for doc in documents]
relevant = [
doc for doc, grade in zip(documents, grades) if grade["relevant"]
]
irrelevant = sum(1 for g in grades if not g["relevant"])
return {"relevant": relevant, "irrelevant": irrelevant}Grading five documents with gpt-4o-mini costs about $0.001 total. The documents are short (they're chunks, not full pages), and the prompt is minimal. In practice, grading adds 200-400ms of latency when you run the calls in parallel with Promise.all.
One important design choice: the grader evaluates each document independently. It doesn't see the other retrieved documents. This prevents the grader from making relative comparisons ("this one is better than that one") and keeps each decision focused on absolute relevance. A document is either useful for answering the question or it isn't.
How does the self-correction loop work?
Now we wire the grader into a retry loop. When the grader rejects too many documents, the system rewrites the query and tries retrieval again. This is the corrective RAG (CRAG) pattern from the research literature, and it's the single biggest accuracy improvement you can add to a basic RAG pipeline.
The loop works in three stages. First, retrieve documents normally. Second, grade them. If enough documents pass grading, proceed to generation. If most documents fail, rewrite the query and go back to step one. A hard cap on retries prevents infinite loops.
TypeScript:
// Self-corrective RAG: retrieve, grade, rewrite, retry
interface CragResult {
answer: string;
iterations: number;
totalRetrieved: number;
totalRelevant: number;
queryVersions: string[];
}
async function rewriteQuery(
originalQuery: string,
failedQuery: string,
irrelevantDocs: number
): Promise<string> {
const response = await openai.chat.completions.create({
model: "gpt-4o-mini",
temperature: 0.3,
messages: [
{
role: "system",
content: `The user asked a question and retrieval returned mostly irrelevant documents.
Rewrite the query to improve retrieval. Strategies:
- Use more specific terminology
- Break compound questions into focused searches
- Add context keywords that would appear in relevant documents
- Remove ambiguous terms
Return only the rewritten query, nothing else.`,
},
{
role: "user",
content: `Original question: ${originalQuery}
Query that failed: ${failedQuery}
${irrelevantDocs} out of 5 retrieved documents were irrelevant.
Rewrite this query for better retrieval:`,
},
],
});
return response.choices[0].message.content ?? failedQuery;
}
async function selfCorrectiveRag(
query: string,
maxRetries: number = 2,
relevanceThreshold: number = 0.6 // at least 60% of docs must be relevant
): Promise<CragResult> {
let currentQuery = query;
const queryVersions = [query];
let totalRetrieved = 0;
let totalRelevant = 0;
for (let i = 0; i <= maxRetries; i++) {
// Step 1: Retrieve
const documents = await vectorSearch(currentQuery, 5);
totalRetrieved += documents.length;
// Step 2: Grade
const { relevant, irrelevant } = await gradeDocuments(currentQuery, documents);
totalRelevant += relevant.length;
const relevanceRatio = relevant.length / documents.length;
// Step 3: Decide - generate or retry
if (relevanceRatio >= relevanceThreshold && relevant.length > 0) {
// Enough relevant documents, proceed to generation
const answer = await generate(query, relevant);
return {
answer,
iterations: i + 1,
totalRetrieved,
totalRelevant,
queryVersions,
};
}
// Not enough relevant docs. If we have retries left, rewrite and retry.
if (i < maxRetries) {
currentQuery = await rewriteQuery(query, currentQuery, irrelevant);
queryVersions.push(currentQuery);
}
}
// Exhausted retries. Generate with whatever relevant docs we accumulated,
// or return a graceful fallback.
const allRelevant = await gatherAllRelevantDocs(queryVersions);
if (allRelevant.length > 0) {
const answer = await generate(query, allRelevant);
return {
answer: answer + "\n\n(Note: I had limited relevant information available for this answer.)",
iterations: maxRetries + 1,
totalRetrieved,
totalRelevant,
queryVersions,
};
}
return {
answer: "I could not find enough relevant information to answer this question confidently. Could you rephrase or provide more context?",
iterations: maxRetries + 1,
totalRetrieved,
totalRelevant,
queryVersions,
};
}Python:
# Self-corrective RAG: retrieve, grade, rewrite, retry
def rewrite_query(original_query: str, failed_query: str, irrelevant_count: int) -> str:
response = client.chat.completions.create(
model="gpt-4o-mini",
temperature=0.3,
messages=[
{
"role": "system",
"content": """The user asked a question and retrieval returned mostly irrelevant
documents. Rewrite the query to improve retrieval. Use more specific terminology,
break compound questions into focused searches, and add context keywords.
Return only the rewritten query.""",
},
{
"role": "user",
"content": f"""Original question: {original_query}
Query that failed: {failed_query}
{irrelevant_count} out of 5 retrieved documents were irrelevant.
Rewrite this query for better retrieval:""",
},
],
)
return response.choices[0].message.content or failed_query
def self_corrective_rag(
query: str,
max_retries: int = 2,
relevance_threshold: float = 0.6,
) -> dict:
current_query = query
query_versions = [query]
total_retrieved = 0
total_relevant = 0
for i in range(max_retries + 1):
# Step 1: Retrieve
documents = vector_search(current_query, k=5)
total_retrieved += len(documents)
# Step 2: Grade
grading = grade_documents(current_query, documents)
relevant = grading["relevant"]
irrelevant = grading["irrelevant"]
total_relevant += len(relevant)
relevance_ratio = len(relevant) / len(documents) if documents else 0
# Step 3: Generate or retry
if relevance_ratio >= relevance_threshold and relevant:
answer = generate(query, relevant)
return {
"answer": answer,
"iterations": i + 1,
"total_retrieved": total_retrieved,
"total_relevant": total_relevant,
"query_versions": query_versions,
}
# Rewrite and retry if attempts remain
if i < max_retries:
current_query = rewrite_query(query, current_query, irrelevant)
query_versions.append(current_query)
# Exhausted retries: generate with whatever we have or return fallback
if total_relevant > 0:
answer = generate(query, relevant) + "\n\n(Limited relevant information available.)"
else:
answer = "I could not find enough relevant information to answer confidently."
return {
"answer": answer,
"iterations": max_retries + 1,
"total_retrieved": total_retrieved,
"total_relevant": total_relevant,
"query_versions": query_versions,
}Notice how the function tracks queryVersions and totalRetrieved vs totalRelevant. These are not cosmetic. In production, these metrics tell you exactly how your pipeline behaves. If a query consistently requires 2+ retries to find relevant documents, that's a signal that your knowledge base has a gap, your embeddings aren't capturing the right semantics for that topic, or your chunking strategy is splitting relevant information across too many pieces.
The relevanceThreshold of 0.6 means at least 3 out of 5 retrieved documents need to pass the grader. This is tunable. If you're in a domain where precision matters more than recall (medical, legal, financial), push it to 0.8. If your knowledge base is sparse and you'd rather have a partial answer than no answer, drop it to 0.4.
How do you prevent infinite loops in production?
Circuit breakers prevent self-corrective RAG from spinning indefinitely when the knowledge base simply doesn't contain the answer. The hard retry cap is the first layer. But production systems need more: timeout budgets, cost caps, and degradation tracking that feeds back into your eval pipeline.
The retry cap (maxRetries = 2) is a blunt instrument. It works, but it treats all queries equally. A more sophisticated approach budgets time and cost per query, degrading gracefully when either budget runs out.
TypeScript:
// Circuit breaker: budget-aware query execution
interface QueryBudget {
maxRetries: number;
maxLatencyMs: number;
maxCostUsd: number;
}
const DEFAULT_BUDGET: QueryBudget = {
maxRetries: 2,
maxLatencyMs: 8000, // 8 seconds total
maxCostUsd: 0.05, // 5 cents max per query
};
interface ExecutionMetrics {
retrieval_ms: number;
grading_ms: number;
generation_ms: number;
total_ms: number;
estimated_cost: number;
retries_used: number;
circuit_breaker_triggered: boolean;
trigger_reason?: "retries" | "latency" | "cost";
}
async function budgetAwareRag(
query: string,
budget: QueryBudget = DEFAULT_BUDGET
): Promise<{ answer: string; metrics: ExecutionMetrics }> {
const startTime = Date.now();
let estimatedCost = 0;
let retriesUsed = 0;
let currentQuery = query;
let relevantDocs: string[] = [];
let circuitBroken = false;
let triggerReason: ExecutionMetrics["trigger_reason"];
// Cost estimation per operation (approximate, based on gpt-4o-mini pricing)
const ROUTE_COST = 0.0002;
const GRADE_COST_PER_DOC = 0.0002;
const REWRITE_COST = 0.0003;
const GENERATE_COST = 0.002;
for (let i = 0; i <= budget.maxRetries; i++) {
// Check latency budget
if (Date.now() - startTime > budget.maxLatencyMs * 0.7) {
circuitBroken = true;
triggerReason = "latency";
break;
}
// Check cost budget
if (estimatedCost > budget.maxCostUsd * 0.8) {
circuitBroken = true;
triggerReason = "cost";
break;
}
const documents = await vectorSearch(currentQuery, 5);
const { relevant, irrelevant } = await gradeDocuments(currentQuery, documents);
estimatedCost += GRADE_COST_PER_DOC * documents.length;
if (relevant.length >= 3) {
relevantDocs = relevant;
break;
}
retriesUsed++;
if (i < budget.maxRetries) {
currentQuery = await rewriteQuery(query, currentQuery, irrelevant);
estimatedCost += REWRITE_COST;
} else {
circuitBroken = true;
triggerReason = "retries";
}
}
// Generate with whatever we have
estimatedCost += GENERATE_COST;
const answer = relevantDocs.length > 0
? await generate(query, relevantDocs)
: "I could not find sufficient information to answer this question.";
const totalMs = Date.now() - startTime;
return {
answer,
metrics: {
retrieval_ms: 0, // populated by actual timing in production
grading_ms: 0,
generation_ms: 0,
total_ms: totalMs,
estimated_cost: estimatedCost,
retries_used: retriesUsed,
circuit_breaker_triggered: circuitBroken,
trigger_reason: triggerReason,
},
};
}The metrics object is the most important part of this code. In production, you log these metrics for every query. When you see patterns, like a specific category of questions consistently triggering the circuit breaker, that tells you where to invest in your knowledge base or your chunking strategy.
If you're running evals on your RAG pipeline (and after reading the eval framework guide, you should be), the circuit breaker metrics become eval dimensions. "What percentage of queries trigger the circuit breaker?" is a top-level health metric for your retrieval system. Track it weekly. If it's climbing, your knowledge base isn't keeping pace with the questions your users are asking.
How do you combine routing, grading, and self-correction?
Adaptive RAG combines routing, grading, self-correction, and circuit breakers into a single pipeline that matches retrieval strategy to query complexity. Simple questions get fast, cheap answers. Complex questions get the full self-corrective loop. The controller decides which path to take based on the query classification.
This is the full architecture:
Here's the full controller that orchestrates everything:
TypeScript:
// Adaptive RAG controller: full pipeline
interface AdaptiveRagResult {
answer: string;
route: string;
iterations: number;
relevantDocCount: number;
totalLatencyMs: number;
queryVersions: string[];
}
async function adaptiveRag(query: string): Promise<AdaptiveRagResult> {
const startTime = Date.now();
// Step 1: Route the query
const route = await routeQuery(query);
// Step 2: Execute the appropriate strategy
let answer: string;
let iterations = 0;
let relevantDocCount = 0;
let queryVersions = [query];
switch (route.route) {
case "direct": {
answer = await generateDirectAnswer(query);
break;
}
case "vector_search": {
const result = await selfCorrectiveRag(
route.rewritten_query ?? query,
2, // max retries
0.6 // relevance threshold
);
answer = result.answer;
iterations = result.iterations;
relevantDocCount = result.totalRelevant;
queryVersions = result.queryVersions;
break;
}
case "web_search": {
// Web search with the same grading loop
const webResults = await webSearch(route.rewritten_query ?? query);
const { relevant } = await gradeDocuments(query, webResults);
relevantDocCount = relevant.length;
if (relevant.length > 0) {
answer = await generate(query, relevant);
} else {
// Fall back to vector search if web search yields nothing relevant
const fallback = await selfCorrectiveRag(query, 1, 0.4);
answer = fallback.answer;
iterations = fallback.iterations;
relevantDocCount = fallback.totalRelevant;
}
iterations = 1;
break;
}
default:
answer = await selfCorrectiveRag(query).then((r) => r.answer);
}
return {
answer,
route: route.route,
iterations,
relevantDocCount,
totalLatencyMs: Date.now() - startTime,
queryVersions,
};
}Python:
# Adaptive RAG controller: full pipeline
def adaptive_rag(query: str) -> dict:
import time
start = time.time()
# Step 1: Route
route = route_query(query)
# Step 2: Execute strategy
answer = ""
iterations = 0
relevant_count = 0
query_versions = [query]
if route["route"] == "direct":
answer = generate_direct_answer(query)
elif route["route"] == "vector_search":
search_query = route.get("rewritten_query", query)
result = self_corrective_rag(search_query, max_retries=2)
answer = result["answer"]
iterations = result["iterations"]
relevant_count = result["total_relevant"]
query_versions = result["query_versions"]
elif route["route"] == "web_search":
web_results = web_search(route.get("rewritten_query", query))
grading = grade_documents(query, web_results)
if grading["relevant"]:
answer = generate(query, grading["relevant"])
relevant_count = len(grading["relevant"])
else:
# Fall back to vector search
fallback = self_corrective_rag(query, max_retries=1)
answer = fallback["answer"]
relevant_count = fallback["total_relevant"]
iterations = 1
else:
result = self_corrective_rag(query)
answer = result["answer"]
return {
"answer": answer,
"route": route["route"],
"iterations": iterations,
"relevant_doc_count": relevant_count,
"total_latency_ms": int((time.time() - start) * 1000),
"query_versions": query_versions,
}How does LangGraph simplify this?
Everything above is framework-agnostic: plain functions, a while loop, if/else branches. That's intentional. You should understand the mechanics before adding abstraction. But once you're running self-corrective RAG in production, LangGraph offers three things that are genuinely hard to replicate with raw code: visual graph debugging, built-in state checkpointing, and a declarative way to express the control flow.
LangGraph models the pipeline as a state graph. Each node is a function that reads and writes to a shared state object. Edges define the transitions between nodes, including conditional edges that implement the "grade and decide" logic. The framework handles the loop mechanics, state persistence, and visualization.
Here's the same self-corrective pipeline expressed as a LangGraph graph:
Python (LangGraph):
# Self-corrective RAG as a LangGraph state graph
from typing import TypedDict
from langgraph.graph import StateGraph, END
class RagState(TypedDict):
query: str
original_query: str
documents: list[str]
relevant_documents: list[str]
generation: str
retries: int
max_retries: int
def retrieve_node(state: RagState) -> dict:
"""Retrieve documents from vector store."""
docs = vector_search(state["query"], k=5)
return {"documents": docs}
def grade_node(state: RagState) -> dict:
"""Grade each retrieved document for relevance."""
grading = grade_documents(state["query"], state["documents"])
return {"relevant_documents": grading["relevant"]}
def rewrite_node(state: RagState) -> dict:
"""Rewrite the query for better retrieval."""
irrelevant_count = len(state["documents"]) - len(state["relevant_documents"])
new_query = rewrite_query(
state["original_query"], state["query"], irrelevant_count
)
return {"query": new_query, "retries": state["retries"] + 1}
def generate_node(state: RagState) -> dict:
"""Generate answer from relevant documents."""
answer = generate(state["original_query"], state["relevant_documents"])
return {"generation": answer}
def fallback_node(state: RagState) -> dict:
"""Return fallback when retries exhausted."""
return {"generation": "I could not find sufficient information to answer."}
# Conditional edge: decide whether to generate, retry, or fall back
def should_retry(state: RagState) -> str:
relevance_ratio = (
len(state["relevant_documents"]) / len(state["documents"])
if state["documents"]
else 0
)
if relevance_ratio >= 0.6 and state["relevant_documents"]:
return "generate"
elif state["retries"] < state["max_retries"]:
return "rewrite"
else:
return "fallback"
# Build the graph
workflow = StateGraph(RagState)
workflow.add_node("retrieve", retrieve_node)
workflow.add_node("grade", grade_node)
workflow.add_node("rewrite", rewrite_node)
workflow.add_node("generate", generate_node)
workflow.add_node("fallback", fallback_node)
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "grade")
workflow.add_conditional_edges(
"grade",
should_retry,
{"generate": "generate", "rewrite": "rewrite", "fallback": "fallback"},
)
workflow.add_edge("rewrite", "retrieve") # retry loop
workflow.add_edge("generate", END)
workflow.add_edge("fallback", END)
app = workflow.compile()
# Run it
result = app.invoke({
"query": "What happens if I exceed my API rate limit?",
"original_query": "What happens if I exceed my API rate limit?",
"documents": [],
"relevant_documents": [],
"generation": "",
"retries": 0,
"max_retries": 2,
})
print(result["generation"])Compare this to the while-loop version. The logic is identical. The difference is structural:
| Aspect | Raw implementation | LangGraph |
|---|---|---|
| Control flow | While loop + if/else | Nodes + conditional edges |
| State | Local variables | Typed state dict, auto-persisted |
| Debugging | Print statements | Visual graph + step-by-step replay |
| Checkpointing | Manual (save state to DB yourself) | Built-in (resume from any node) |
| Adding nodes | Refactor the loop | Add a node and an edge |
LangGraph's real value shows up when the graph gets more complex. Adding a web search fallback, a hallucination checker after generation, or a human-in-the-loop approval step are each a single node plus edge. In the raw implementation, each addition makes the while loop harder to follow.
That said, if your pipeline is the simple retrieve-grade-retry-generate loop, the raw implementation is clearer and has zero dependencies. Use LangGraph when you need checkpointing, visualization, or more than four or five nodes.
What should you monitor in production?
Self-corrective RAG in production generates metrics that tell you whether your system is healthy. The most important ones aren't accuracy numbers (those come from your eval pipeline). They're operational signals that warn you when accuracy is degrading before users notice.
Five metrics to track
Retry rate. What percentage of queries require at least one retry? If this climbs above 30%, your retrieval quality is degrading. Either the knowledge base is stale, the question distribution has shifted, or your embeddings aren't capturing the right semantics for recent topics.
Circuit breaker trigger rate. What percentage of queries exhaust all retries without finding enough relevant documents? This is your "knowledge gap" metric. Every triggered circuit breaker represents a question your system cannot answer. Log the query text for these and review weekly.
Route distribution. What percentage of queries go to each route (direct, vector search, web search)? If 80% of queries are being routed to vector search but 40% of those trigger retries, your router might be misclassifying questions that should go to web search or direct answer.
Grading agreement. How often does the grader agree with the retriever? If your top-K results consistently score high similarity but the grader rejects them, you have an embedding-grading misalignment. Either recalibrate your similarity threshold or re-examine your grader prompt.
Latency by route. Direct answers should take 500ms or less. Single-pass vector search should take 1-2 seconds. Self-corrective queries with retries should stay under 6 seconds. If any route exceeds its budget, investigate.
Logging structure
Here's a practical logging schema you can adapt:
// Production telemetry for self-corrective RAG
interface RagTelemetry {
queryId: string;
timestamp: string;
query: string;
route: "direct" | "vector_search" | "web_search";
iterations: number;
documentsRetrieved: number;
documentsRelevant: number;
circuitBreakerTriggered: boolean;
circuitBreakerReason?: string;
queryVersions: string[];
latencyMs: number;
estimatedCostUsd: number;
}
function logRagTelemetry(result: AdaptiveRagResult, query: string): void {
const telemetry: RagTelemetry = {
queryId: crypto.randomUUID(),
timestamp: new Date().toISOString(),
query,
route: result.route as RagTelemetry["route"],
iterations: result.iterations,
documentsRetrieved: result.relevantDocCount, // simplified
documentsRelevant: result.relevantDocCount,
circuitBreakerTriggered: result.iterations > 2,
queryVersions: result.queryVersions,
latencyMs: result.totalLatencyMs,
estimatedCostUsd: 0, // populated by budget tracker
};
// Send to your observability stack
console.log(JSON.stringify(telemetry));
}When you're building production agents that use RAG alongside MCP tool calls, custom tools, and structured testing scenarios, these telemetry signals become the input to your scoring pipeline. A scorecard criterion like "retrieval quality" can pull directly from the grading metrics, giving you automated quality monitoring without manual review.
Where should you start adding self-correction?
If you already have a working RAG pipeline from the from-scratch tutorial, here's the order to add each component. Each step is independent and gives you measurable improvement before moving to the next.
Step 1: Add the relevance grader (2-3 hours). This is the highest-impact change. Insert the grader between retrieval and generation. Log the results but don't block on them yet. After a week of logs, you'll know exactly how often your retriever returns irrelevant documents.
Step 2: Add the self-correction loop (1-2 hours). Now use the grader results to trigger retries. Start with maxRetries = 1 and relevanceThreshold = 0.5. Monitor the retry rate. If retries are rare (under 10% of queries), your retrieval is already solid. If they're frequent, tighten the threshold.
Step 3: Add the query router (1-2 hours). Route simple questions to direct answers. This reduces latency and cost on the 30-50% of queries that don't need retrieval. The router also rewrites queries before they hit the vector store, which improves first-pass retrieval quality.
Step 4: Add circuit breakers and telemetry (2-3 hours). Budget-aware execution with structured logging. This is the production readiness step. Without it, a spike in complex queries can blow your cost budget or latency SLAs.
Step 5: Consider LangGraph for complex pipelines (half a day). If your pipeline grows beyond the basic loop, like adding web search fallback, hallucination checking, or human review, LangGraph's graph abstraction becomes worth the dependency. Start with the raw implementation, migrate to LangGraph when the while loop gets hard to follow.
Each step improves a specific metric:
| Step | Primary metric improved | Typical improvement |
|---|---|---|
| Relevance grader | Answer accuracy | +15-25% on queries with poor retrieval |
| Self-correction loop | Retrieval recall | Catches 60-80% of missed documents |
| Query router | Average latency, cost per query | -30-50% latency on simple queries |
| Circuit breakers | Tail latency (p99), cost variance | Eliminates runaway queries |
| LangGraph | Developer velocity, debuggability | Subjective, but significant at 5+ nodes |
The key insight across all of this: the generation step was never the problem. The LLM is good at synthesizing context into answers. What was missing is quality control on which context reaches the LLM. The relevance grader and self-correction loop provide that quality control, transforming a pipeline that sometimes works into one that consistently works.
Remember that enterprise pricing question from the top of this article? With a relevance grader in place, those three free-tier chunks get flagged as irrelevant. The query gets rewritten to "enterprise pricing tier limits features." The second retrieval pulls the right documents. The user gets the right answer. Nobody has to catch anything because the pipeline caught it itself.
Your prompt engineering constrains how the LLM uses context. Your eval framework measures whether it used context well. Self-corrective RAG ensures the context was worth using in the first place.
Build agents with self-correcting retrieval
Chanl connects your AI agents to knowledge bases with built-in retrieval monitoring, tool execution via MCP, and automated quality scorecards. Build the pipeline once, watch it correct itself in production.
Start buildingCo-founder
Building the platform for AI agents at Chanl — tools, testing, and observability for customer experience.
Aprende IA Agéntica
Una lección por semana: técnicas prácticas para construir, probar y lanzar agentes IA. Desde ingeniería de prompts hasta monitoreo en producción. Aprende haciendo.



