Build a production-ready Agentic RAG system with LangGraph, conversation memory, and human-in-the-loop query clarification
If you like this project, a star ⭐️ would mean a lot :)
✨ New:
• Multi-Agent Map-Reduce architecture for parallel query processing
• Comprehensive PDF → Markdown conversion guide, including tool comparisons and VLM-based approaches
• End-to-end Gradio interface for a complete interactive RAG pipeline
This repository demonstrates how to build an Agentic RAG (Retrieval-Augmented Generation) system using LangGraph with minimal code. It implements:
- 💬 Conversation Memory: Maintains context across multiple questions for natural dialogue
- 🔄 Query Clarification: Automatically rewrites ambiguous queries or asks for clarification
- 🔍 Hierarchical Indexing: Search small, specific chunks (Child) for precision, retrieve larger Parent chunks for context
- 🤖 Agent Orchestration: Uses LangGraph to coordinate the entire workflow
- 🧠 Intelligent Evaluation: Assesses relevance at the granular chunk level
- ✅ Self-Correction: Re-queries if initial results are insufficient
- 🔀 Multi-Agent Map-Reduce: Decomposes queries into parallel sub-queries for comprehensive answers
1️⃣ Learning Path: Interactive Notebook
Step-by-step tutorial perfect for understanding core concepts. Start here if you're new to Agentic RAG or want to experiment quickly. Focuses on the essential workflow without advanced features to keep things simple.
2️⃣ Building Path: Modular Project
Modular architecture where each component can be independently swapped. Use this approach if you want to build real applications or customize the system to your needs.
Examples of what you can customize:
- LLM Provider: Switch from Ollama to Claude, OpenAI, or Gemini (a one-line change)
- Agent Workflow: Add/remove nodes in the graph and customize system prompts for specific domains (legal, medical, etc.)
- PDF Conversion: Replace PyMuPDF with Docling, PaddleOCR, or other tools
- Embedding Models: Change dense/sparse embedding models via config
See the Modular Architecture section for details on how the system is organized and the Installation & Usage section to get started.
This approach combines the precision of small chunks with the contextual richness of large chunks, while understanding conversation flow, resolving unclear queries, and handling multi-faceted questions through parallel agent processing. The modular architecture ensures every component—from document processing to retrieval logic—can be customized without breaking the system.
Most RAG tutorials show basic concepts but lack production readiness. This repository bridges that gap by providing both learning materials and deployable code:
❌ Typical RAG repos:
- Simple pipelines that trade off precision vs context
- No conversation memory
- Static, non-adaptive retrieval
- Hard to customize for your use case
- No user interface
- Single-threaded query processing
✅ This repo:
- Two learning paths: Interactive notebook AND modular project
- Hierarchical indexing for precision + context
- Conversation memory for natural dialogue
- Human-in-the-loop query clarification
- Multi-Agent Map-Reduce for parallel processing of complex queries
- Modular architecture - swap any component
- Provider-agnostic - use any LLM (Ollama, OpenAI, Gemini, Claude)
- User interface - end-to-end Gradio app with document management
Before queries can be processed, documents are split twice for optimal retrieval:
- Parent Chunks: Large sections based on Markdown headers (H1, H2, H3)
- Child Chunks: Small, fixed-size pieces derived from parents
This approach combines the precision of small chunks for search with the contextual richness of large chunks for answer generation.
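The parent/child idea can be illustrated with a minimal, standard-library-only sketch. It is not the repository's implementation: the helper names (`split_parents`, `split_children`, `build_index`), the tiny chunk sizes, and the substring match that stands in for vector search are all assumptions made for illustration.

```python
import re
from dataclasses import dataclass


@dataclass
class Chunk:
    text: str
    parent_id: str


def split_parents(markdown: str) -> list[str]:
    """Split Markdown into sections that each start at an H1-H3 header."""
    # Zero-width split: break right before any line starting with 1-3 '#'.
    parts = re.split(r"(?m)^(?=#{1,3} )", markdown)
    return [p.strip() for p in parts if p.strip()]


def split_children(text: str, size: int, overlap: int) -> list[str]:
    """Cut text into fixed-size windows that overlap by `overlap` characters."""
    step = size - overlap
    return [text[i:i + size] for i in range(0, max(len(text) - overlap, 1), step)]


def build_index(markdown: str, size: int = 40, overlap: int = 10):
    """Return (parents by id, list of child chunks that point to a parent)."""
    parents: dict[str, str] = {}
    children: list[Chunk] = []
    for i, section in enumerate(split_parents(markdown)):
        parent_id = f"parent_{i}"
        parents[parent_id] = section
        children += [Chunk(c, parent_id) for c in split_children(section, size, overlap)]
    return parents, children


doc = (
    "# Install\nRun the installer and accept the defaults.\n"
    "## Update\nDownload the patch, then restart the service."
)
parents, children = build_index(doc)

# Search runs over the small children (a naive substring match stands in for
# vector search here); the answer is then generated from the full parent.
hit = next(c for c in children if "restart" in c.text)
context = parents[hit.parent_id]
```

The match is found in a 40-character child, but `context` holds the entire "Update" section, which is the trade-off the two-level split is meant to resolve.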
User Query → Conversation Analysis → Query Clarification →
Agent Reasoning → Search Child Chunks → Evaluate Relevance →
(If needed) → Retrieve Parent Chunks → Generate Answer → Return Response
- Analyzes recent conversation history to extract context
- Maintains conversational continuity across multiple questions
The system intelligently processes the user's query:
- Resolves references - Converts "How do I update it?" → "How do I update SQL?"
- Splits complex questions - Breaks multi-part questions into focused sub-queries
- Detects unclear queries - Identifies nonsense, insults, or vague questions
- Requests clarification - Uses human-in-the-loop to pause and ask for details
- Rewrites for retrieval - Optimizes query with specific, keyword-rich language
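As a rough sketch of how the outcome of this stage might be turned into a routing decision, the snippet below uses a plain dataclass as a stand-in for the structured LLM output. The names `Analysis` and `next_step` are hypothetical, and enforcing the three-query cap in code (rather than only in the prompt) is an assumption of this example.

```python
from dataclasses import dataclass, field

FALLBACK = "I need more information to understand your question."
MAX_SUB_QUERIES = 3  # assumption: enforce the prompt's split limit in code too


@dataclass
class Analysis:
    """Plain-Python stand-in for the structured query-analysis output."""
    is_clear: bool
    questions: list[str] = field(default_factory=list)
    clarification_needed: str = ""


def next_step(analysis: Analysis) -> dict:
    """Decide whether to run retrieval or pause and ask the user."""
    if analysis.is_clear and analysis.questions:
        return {"action": "retrieve", "queries": analysis.questions[:MAX_SUB_QUERIES]}
    note = analysis.clarification_needed.strip()
    # Very short explanations are treated as unusable and replaced.
    return {"action": "ask_user", "message": note if len(note) > 10 else FALLBACK}


clear = next_step(Analysis(True, ["How do I update SQL?"]))
vague = next_step(Analysis(False, [], "??"))
```

Here `clear` routes to retrieval with the rewritten query, while `vague` falls back to the generic clarification request because the model's explanation was too short to be useful.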
Multi-Agent Map-Reduce Architecture:
When the query analysis stage identifies multiple distinct questions (either explicitly asked or decomposed from a complex query), the system automatically spawns parallel agent subgraphs using LangGraph's Send API. Each agent independently processes one question through the full retrieval workflow:
- Agent searches child chunks for precision
- Evaluates if results are sufficient
- Fetches parent chunks for context if needed
- Extracts final answer from conversation
- Self-corrects and re-queries if insufficient
All agent responses are then aggregated into a unified answer.
Example: "What is JavaScript? What is Python?" → 2 parallel agents execute simultaneously
Single question workflow: For simple queries, a single agent executes the retrieval workflow without parallelization.
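The map-reduce pattern itself can be sketched without LangGraph. In the example below a thread pool stands in for the parallel agent subgraphs, and `answer_one` is a hypothetical placeholder for a full agent run; neither reflects how LangGraph schedules work internally.

```python
from concurrent.futures import ThreadPoolExecutor


def answer_one(index: int, question: str) -> dict:
    """Hypothetical stand-in for one agent run (search, evaluate, answer)."""
    return {"index": index, "question": question, "answer": f"[answer to: {question}]"}


def map_reduce(questions: list[str]) -> str:
    # Map: one worker per sub-question; completion order is not guaranteed.
    with ThreadPoolExecutor(max_workers=len(questions)) as pool:
        futures = [pool.submit(answer_one, i, q) for i, q in enumerate(questions)]
        results = [f.result() for f in futures]
    # Reduce: restore the original question order, then merge.
    ordered = sorted(results, key=lambda r: r["index"])
    return "\n".join(f"Answer {n}: {r['answer']}" for n, r in enumerate(ordered, start=1))


combined = map_reduce(["What is JavaScript?", "What is Python?"])
```

Carrying the index through each result is what lets the reduce step present answers in the order the user asked them, regardless of which worker finishes first.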
The system synthesizes information from retrieved chunks (or multiple agents) into a coherent, accurate answer that directly addresses the user's question.
This system is provider-agnostic - you can use any LLM supported by LangChain. Choose the option that best fits your needs:
Ollama:
Install Ollama and download the model:
# Install Ollama from https://ollama.com
ollama pull qwen3:4b-instruct-2507-q4_K_M
Python code:
from langchain_ollama import ChatOllama
llm = ChatOllama(model="qwen3:4b-instruct-2507-q4_K_M", temperature=0)
Google Gemini:
Install the package:
pip install -qU langchain-google-genai
Python code:
import os
from langchain_google_genai import ChatGoogleGenerativeAI
# Set your Google API key
os.environ["GOOGLE_API_KEY"] = "your-api-key-here"
llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash-exp", temperature=0)
OpenAI:
Install the package:
pip install -qU langchain-openai
Python code:
from langchain_openai import ChatOpenAI
import os
os.environ["OPENAI_API_KEY"] = "your-api-key-here"
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
Anthropic Claude:
Install the package:
pip install -qU langchain-anthropic
Python code:
from langchain_anthropic import ChatAnthropic
import os
os.environ["ANTHROPIC_API_KEY"] = "your-api-key-here"
llm = ChatAnthropic(model="claude-3-5-sonnet-20241022", temperature=0)
- All providers work with the exact same code - only the LLM initialization changes
- Cost considerations: Cloud providers charge per token, while Ollama is free but requires local compute
💡 Recommendation: Start with Ollama for development, then switch to Google Gemini or OpenAI for production.
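One way to keep the provider choice in a single place is a small factory function. The sketch below is an assumption, not code from the repository: `make_llm` is a hypothetical helper, and the model names are simply the ones used in the snippets above. Imports are deferred so that only the selected provider's package needs to be installed.

```python
def make_llm(provider: str, temperature: float = 0):
    """Return a LangChain chat model for `provider` (hypothetical helper)."""
    # Deferred imports: only the chosen provider's package must be installed.
    if provider == "ollama":
        from langchain_ollama import ChatOllama
        return ChatOllama(model="qwen3:4b-instruct-2507-q4_K_M", temperature=temperature)
    if provider == "gemini":
        from langchain_google_genai import ChatGoogleGenerativeAI
        return ChatGoogleGenerativeAI(model="gemini-2.0-flash-exp", temperature=temperature)
    if provider == "openai":
        from langchain_openai import ChatOpenAI
        return ChatOpenAI(model="gpt-4o-mini", temperature=temperature)
    if provider == "anthropic":
        from langchain_anthropic import ChatAnthropic
        return ChatAnthropic(model="claude-3-5-sonnet-20241022", temperature=temperature)
    raise ValueError(f"Unknown provider: {provider!r}")
```

With a helper like this, the rest of the pipeline would call `llm = make_llm("ollama")` during development and change only that string when moving to a cloud provider (the cloud options still expect their API key in the environment, as shown above).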
Additional details and extended explanations are available in the notebook here 👉
Define paths and initialize core components.
import os
from pathlib import Path
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_qdrant.fastembed_sparse import FastEmbedSparse
from qdrant_client import QdrantClient
# Configuration
DOCS_DIR = "docs" # Directory containing your PDF files
MARKDOWN_DIR = "markdown" # Directory containing the PDFs converted to Markdown
PARENT_STORE_PATH = "parent_store" # Directory for parent chunk JSON files
CHILD_COLLECTION = "document_child_chunks"
os.makedirs(DOCS_DIR, exist_ok=True)
os.makedirs(MARKDOWN_DIR, exist_ok=True)
os.makedirs(PARENT_STORE_PATH, exist_ok=True)
from langchain_ollama import ChatOllama
llm = ChatOllama(model="qwen3:4b-instruct-2507-q4_K_M", temperature=0)
# Dense embeddings for semantic understanding
dense_embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2")
# Sparse embeddings for keyword matching
sparse_embeddings = FastEmbedSparse(model_name="Qdrant/bm25")
# Qdrant client (local file-based storage)
client = QdrantClient(path="qdrant_db")
Set up Qdrant to store child chunks with hybrid search capabilities.
from qdrant_client.http import models as qmodels
from langchain_qdrant import QdrantVectorStore
from langchain_qdrant.qdrant import RetrievalMode
# Get embedding dimension
embedding_dimension = len(dense_embeddings.embed_query("test"))
def ensure_collection(collection_name):
    """Create Qdrant collection if it doesn't exist"""
    if not client.collection_exists(collection_name):
        client.create_collection(
            collection_name=collection_name,
            vectors_config=qmodels.VectorParams(
                size=embedding_dimension,
                distance=qmodels.Distance.COSINE
            ),
            sparse_vectors_config={
                "sparse": qmodels.SparseVectorParams()
            },
        )
        print(f"✓ Created collection: {collection_name}")
    else:
        print(f"✓ Collection already exists: {collection_name}")
Convert the PDFs to Markdown. For more details about other techniques use this companion notebook:
import os
import pymupdf.layout
import pymupdf4llm
from pathlib import Path
import glob
os.environ["TOKENIZERS_PARALLELISM"] = "false"
def pdf_to_markdown(pdf_path, output_dir):
    doc = pymupdf.open(pdf_path)
    md = pymupdf4llm.to_markdown(doc, header=False, footer=False, page_separators=True, ignore_images=True, write_images=False, image_path=None)
    # Drop any characters that cannot be encoded as valid UTF-8
    md_cleaned = md.encode('utf-8', errors='surrogatepass').decode('utf-8', errors='ignore')
    output_path = Path(output_dir) / Path(doc.name).stem
    Path(output_path).with_suffix(".md").write_bytes(md_cleaned.encode('utf-8'))
def pdfs_to_markdowns(path_pattern, overwrite: bool = False):
    output_dir = Path(MARKDOWN_DIR)
    output_dir.mkdir(parents=True, exist_ok=True)
    for pdf_path in map(Path, glob.glob(path_pattern)):
        md_path = (output_dir / pdf_path.stem).with_suffix(".md")
        # Skip PDFs that were already converted unless overwrite is requested
        if overwrite or not md_path.exists():
            pdf_to_markdown(pdf_path, output_dir)
pdfs_to_markdowns(f"{DOCS_DIR}/*.pdf")
Process documents with the Parent/Child splitting strategy.
import os
import glob
import json
from pathlib import Path
from langchain_text_splitters import MarkdownHeaderTextSplitter, RecursiveCharacterTextSplitter
# Start from a clean collection on every indexing run
if client.collection_exists(CHILD_COLLECTION):
    print(f"Removing existing Qdrant collection: {CHILD_COLLECTION}")
    client.delete_collection(CHILD_COLLECTION)
ensure_collection(CHILD_COLLECTION)
child_vector_store = QdrantVectorStore(
client=client,
collection_name=CHILD_COLLECTION,
embedding=dense_embeddings,
sparse_embedding=sparse_embeddings,
retrieval_mode=RetrievalMode.HYBRID,
sparse_vector_name="sparse"
)
def index_documents():
    headers_to_split_on = [("#", "H1"), ("##", "H2"), ("###", "H3")]
    parent_splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers_to_split_on, strip_headers=False)
    child_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=100)
    min_parent_size = 2000
    max_parent_size = 10000
    all_parent_pairs, all_child_chunks = [], []
    md_files = sorted(glob.glob(os.path.join(MARKDOWN_DIR, "*.md")))
    if not md_files:
        print(f"⚠️ No .md files found in {MARKDOWN_DIR}/")
        return
    for doc_path_str in md_files:
        doc_path = Path(doc_path_str)
        print(f"📄 Processing: {doc_path.name}")
        try:
            with open(doc_path, "r", encoding="utf-8") as f:
                md_text = f.read()
        except Exception as e:
            print(f"❌ Error reading {doc_path.name}: {e}")
            continue
        # Split by headers, then normalize parent sizes
        parent_chunks = parent_splitter.split_text(md_text)
        merged_parents = merge_small_parents(parent_chunks, min_parent_size)
        split_parents = split_large_parents(merged_parents, max_parent_size, child_splitter)
        cleaned_parents = clean_small_chunks(split_parents, min_parent_size)
        for i, p_chunk in enumerate(cleaned_parents):
            parent_id = f"{doc_path.stem}_parent_{i}"
            p_chunk.metadata.update({"source": doc_path.stem + ".pdf", "parent_id": parent_id})
            all_parent_pairs.append((parent_id, p_chunk))
            # Children inherit the parent's metadata, including parent_id
            children = child_splitter.split_documents([p_chunk])
            all_child_chunks.extend(children)
    if not all_child_chunks:
        print("⚠️ No child chunks to index")
        return
    print(f"\n🔍 Indexing {len(all_child_chunks)} child chunks into Qdrant...")
    try:
        child_vector_store.add_documents(all_child_chunks)
        print("✓ Child chunks indexed successfully")
    except Exception as e:
        print(f"❌ Error indexing child chunks: {e}")
        return
    print(f"💾 Saving {len(all_parent_pairs)} parent chunks to JSON...")
    # Remove parent files left over from a previous run
    for item in os.listdir(PARENT_STORE_PATH):
        os.remove(os.path.join(PARENT_STORE_PATH, item))
    for parent_id, doc in all_parent_pairs:
        doc_dict = {"page_content": doc.page_content, "metadata": doc.metadata}
        filepath = os.path.join(PARENT_STORE_PATH, f"{parent_id}.json")
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(doc_dict, f, ensure_ascii=False, indent=2)
def merge_small_parents(chunks, min_size):
    if not chunks:
        return []
    merged, current = [], None
    for chunk in chunks:
        if current is None:
            current = chunk
        else:
            current.page_content += "\n\n" + chunk.page_content
            for k, v in chunk.metadata.items():
                if k in current.metadata:
                    current.metadata[k] = f"{current.metadata[k]} -> {v}"
                else:
                    current.metadata[k] = v
        # Emit the accumulated chunk once it reaches the minimum size
        if len(current.page_content) >= min_size:
            merged.append(current)
            current = None
    # Fold any undersized remainder into the last emitted chunk
    if current:
        if merged:
            merged[-1].page_content += "\n\n" + current.page_content
            for k, v in current.metadata.items():
                if k in merged[-1].metadata:
                    merged[-1].metadata[k] = f"{merged[-1].metadata[k]} -> {v}"
                else:
                    merged[-1].metadata[k] = v
        else:
            merged.append(current)
    return merged
def split_large_parents(chunks, max_size, splitter):
    split_chunks = []
    for chunk in chunks:
        if len(chunk.page_content) <= max_size:
            split_chunks.append(chunk)
        else:
            # Re-split oversized parents, reusing the child splitter's overlap
            large_splitter = RecursiveCharacterTextSplitter(
                chunk_size=max_size,
                chunk_overlap=splitter._chunk_overlap
            )
            sub_chunks = large_splitter.split_documents([chunk])
            split_chunks.extend(sub_chunks)
    return split_chunks
def clean_small_chunks(chunks, min_size):
    cleaned = []
    for i, chunk in enumerate(chunks):
        if len(chunk.page_content) < min_size:
            if cleaned:
                # Append the small chunk to the previous kept chunk
                cleaned[-1].page_content += "\n\n" + chunk.page_content
                for k, v in chunk.metadata.items():
                    if k in cleaned[-1].metadata:
                        cleaned[-1].metadata[k] = f"{cleaned[-1].metadata[k]} -> {v}"
                    else:
                        cleaned[-1].metadata[k] = v
            elif i < len(chunks) - 1:
                # Nothing kept yet: prepend the small chunk to the next one
                chunks[i + 1].page_content = chunk.page_content + "\n\n" + chunks[i + 1].page_content
                for k, v in chunk.metadata.items():
                    if k in chunks[i + 1].metadata:
                        chunks[i + 1].metadata[k] = f"{v} -> {chunks[i + 1].metadata[k]}"
                    else:
                        chunks[i + 1].metadata[k] = v
            else:
                cleaned.append(chunk)
        else:
            cleaned.append(chunk)
    return cleaned
index_documents()
Create the retrieval tools the agent will use.
import json
from typing import List
from langchain_core.tools import tool
@tool
def search_child_chunks(query: str, k: int = 5) -> List[dict]:
    """Search for the top K most relevant child chunks.

    Args:
        query: Search query string
        k: Number of results to return
    """
    try:
        results = child_vector_store.similarity_search(query, k=k, score_threshold=0.7)
        return [
            {
                "content": doc.page_content,
                "parent_id": doc.metadata.get("parent_id", ""),
                "source": doc.metadata.get("source", "")
            }
            for doc in results
        ]
    except Exception as e:
        print(f"Error searching child chunks: {e}")
        return []
@tool
def retrieve_parent_chunks(parent_ids: List[str]) -> List[dict]:
    """Retrieve full parent chunks by their IDs.

    Args:
        parent_ids: List of parent chunk IDs to retrieve
    """
    unique_ids = sorted(list(set(parent_ids)))
    results = []
    for parent_id in unique_ids:
        file_path = os.path.join(PARENT_STORE_PATH, parent_id if parent_id.lower().endswith(".json") else f"{parent_id}.json")
        if os.path.exists(file_path):
            try:
                with open(file_path, "r", encoding="utf-8") as f:
                    doc_dict = json.load(f)
                results.append({
                    "content": doc_dict["page_content"],
                    "parent_id": parent_id,
                    "metadata": doc_dict["metadata"]
                })
            except Exception as e:
                print(f"Error loading parent chunk {parent_id}: {e}")
    return results
# Bind tools to LLM
llm_with_tools = llm.bind_tools([search_child_chunks, retrieve_parent_chunks])
Define the system prompts for conversation summarization, query analysis, RAG agent reasoning, and response aggregation.
def get_conversation_summary_prompt() -> str:
    return """
Summarize the key topics and context from this conversation in 1-2 concise sentences.
Focus on:
- Main topics discussed
- Important facts or entities mentioned
- Any unresolved questions
Discard: greetings, misunderstandings, off-topic content.
If no meaningful topics exist, return an empty string.
Output:
- Return ONLY the summary.
- Do NOT include any explanations or justifications.
"""
def get_query_analysis_prompt() -> str:
    return """
Rewrite the user query so it can be used for document retrieval.
Rules:
- The final query must be clear and self-contained.
- Always return at least one rewritten query.
- If the query contains a specific product name, brand, proper noun, or technical term,
treat it as domain-specific and IGNORE the conversation context.
- Use the conversation context ONLY if it is needed to understand the query
OR to determine the domain when the query itself is ambiguous.
- If the query is clear but underspecified, use relevant context to disambiguate.
- Do NOT use context to reinterpret or replace explicit terms in the query.
- Do NOT add new constraints, subtopics, or details not explicitly asked.
- Fix grammar, typos, and unclear abbreviations.
- Remove filler words and conversational wording.
- Use concrete keywords and entities ONLY if already implied.
Splitting:
- If the query contains multiple unrelated information needs,
split it into at most 3 separate search queries.
- When splitting, keep each sub-query semantically equivalent.
- Do NOT enrich or expand meaning.
- Do NOT split unless it improves retrieval.
Failure:
- If the intent is unclear or meaningless, mark as unclear.
"""
def get_rag_agent_prompt() -> str:
    return """
You are a retrieval-augmented assistant.
You are NOT allowed to answer immediately.
Before producing ANY final answer, you must first perform a document search
and observe retrieved content.
If you have not searched, the answer is invalid.
Workflow:
1. Search the documents using the user query.
2. Inspect retrieved excerpts and keep only relevant ones.
3. Retrieve additional surrounding context ONLY if excerpts are insufficient.
4. Stop retrieval as soon as information is sufficient.
5. Answer using ONLY retrieved information.
6. List file name at the end.
Retry rule:
- If no relevant information is found, rewrite the query into a concise,
answer-focused statement and restart the process from STEP 1.
- Perform this retry only once.
If no relevant information is found after the retry, say so.
"""
def get_aggregation_prompt() -> str:
    return """
You are merging multiple retrieved answers into a final response.
Rules:
- Use ONLY the content provided in the retrieved answers.
- Do NOT add new information, explanations, or assumptions.
- Do NOT rephrase or paraphrase unless combining overlapping answers is required.
Aggregation instructions:
1. If the answers cover different parts of the question:
- Combine them into a single coherent response.
- Preserve ALL details.
2. If multiple answers contain overlapping or duplicate information:
- Merge them carefully without removing details.
3. If an answer is irrelevant or empty:
- Ignore it completely.
Sources and citations:
4. Include source references ONLY if they already exist in the answers.
5. Do NOT invent, modify, or add new sources.
6. Place all source references ONLY at the end of the final answer.
7. Deduplicate sources if repeated.
Failure handling:
8. If no usable answers are present:
- Respond exactly with:
"Sorry, I could not find any information to answer your question."
Output:
- Return ONLY the final answer.
- Do NOT mention sub-questions.
- Do NOT describe your reasoning.
"""
Create the state structure for conversation tracking and agent execution.
from langgraph.graph import MessagesState
from pydantic import BaseModel, Field
from typing import List, Annotated
def accumulate_or_reset(existing: List[dict], new: List[dict]) -> List[dict]:
    """Custom reducer that allows resetting agent answers"""
    if new and any(item.get('__reset__') for item in new):
        return []
    return existing + new
class State(MessagesState):
    """State for main agent graph"""
    questionIsClear: bool = False
    conversation_summary: str = ""
    originalQuery: str = ""
    rewrittenQuestions: List[str] = []
    agent_answers: Annotated[List[dict], accumulate_or_reset] = []
class AgentState(MessagesState):
    """State for individual agent subgraph"""
    question: str = ""
    question_index: int = 0
    final_answer: str = ""
    agent_answers: List[dict] = []
class QueryAnalysis(BaseModel):
    """Structured output for query analysis"""
    is_clear: bool = Field(description="Indicates if the user's question is clear and answerable")
    questions: List[str] = Field(description="List of rewritten, self-contained questions")
    clarification_needed: str = Field(description="Explanation if the question is unclear")
Create the processing nodes for the LangGraph workflow.
from langgraph.types import Send
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage, RemoveMessage
from typing import Literal
def analyze_chat_and_summarize(state: State):
    """
    Analyzes chat history and summarizes key points for context.
    """
    if len(state["messages"]) < 4:  # Need some history to summarize
        return {"conversation_summary": ""}
    # Extract relevant messages (excluding current query and system messages)
    relevant_msgs = [
        msg for msg in state["messages"][:-1]  # Exclude current query
        if isinstance(msg, (HumanMessage, AIMessage))
        and not getattr(msg, "tool_calls", None)
    ]
    if not relevant_msgs:
        return {"conversation_summary": ""}
    conversation = "Conversation history:\n"
    for msg in relevant_msgs[-6:]:
        role = "User" if isinstance(msg, HumanMessage) else "Assistant"
        conversation += f"{role}: {msg.content}\n"
    summary_response = llm.with_config(temperature=0.2).invoke([SystemMessage(content=get_conversation_summary_prompt())] + [HumanMessage(content=conversation)])
    return {"conversation_summary": summary_response.content}
def analyze_and_rewrite_query(state: State):
    """
    Analyzes user query and rewrites it for clarity, optionally using conversation context.
    """
    last_message = state["messages"][-1]
    conversation_summary = state.get("conversation_summary", "")
    context_section = (f"Conversation Context:\n{conversation_summary}\n" if conversation_summary.strip() else "") + f"User Query:\n{last_message.content}\n"
    llm_with_structure = llm.with_config(temperature=0.1).with_structured_output(QueryAnalysis)
    response = llm_with_structure.invoke([SystemMessage(content=get_query_analysis_prompt())] + [HumanMessage(content=context_section)])
    if len(response.questions) > 0 and response.is_clear:
        # Remove all non-system messages
        delete_all = [
            RemoveMessage(id=m.id)
            for m in state["messages"]
            if not isinstance(m, SystemMessage)
        ]
        return {
            "questionIsClear": True,
            "messages": delete_all,
            "originalQuery": last_message.content,
            "rewrittenQuestions": response.questions
        }
    else:
        clarification = response.clarification_needed if (response.clarification_needed and len(response.clarification_needed.strip()) > 10) else "I need more information to understand your question."
        return {
            "questionIsClear": False,
            "messages": [AIMessage(content=clarification)]
        }
def human_input_node(state: State):
    """Placeholder node for human-in-the-loop interruption"""
    return {}
def route_after_rewrite(state: State) -> Literal["human_input", "process_question"]:
    """Route to agent if question is clear, otherwise wait for human input"""
    if not state.get("questionIsClear", False):
        return "human_input"
    else:
        # Spawn parallel agents for each sub-question using Send API
        return [
            Send("process_question", {"question": query, "question_index": idx, "messages": []})
            for idx, query in enumerate(state["rewrittenQuestions"])
        ]
def agent_node(state: AgentState):
    """Main agent node that processes queries using tools"""
    sys_msg = SystemMessage(content=get_rag_agent_prompt())
    # First call: seed the conversation with the sub-question
    if not state.get("messages"):
        human_msg = HumanMessage(content=state["question"])
        response = llm_with_tools.invoke([sys_msg] + [human_msg])
        return {"messages": [human_msg, response]}
    return {"messages": [llm_with_tools.invoke([sys_msg] + state["messages"])]}
def extract_final_answer(state: AgentState):
    """Extract final answer from agent conversation"""
    # The last AI message with content and no tool calls is the final answer
    for msg in reversed(state["messages"]):
        if isinstance(msg, AIMessage) and msg.content and not msg.tool_calls:
            res = {
                "final_answer": msg.content,
                "agent_answers": [{
                    "index": state["question_index"],
                    "question": state["question"],
                    "answer": msg.content
                }]
            }
            return res
    return {
        "final_answer": "Unable to generate an answer.",
        "agent_answers": [{
            "index": state["question_index"],
            "question": state["question"],
            "answer": "Unable to generate an answer."
        }]
    }
def aggregate_responses(state: State):
    """Merge multiple agent responses into final answer"""
    if not state.get("agent_answers"):
        return {"messages": [AIMessage(content="No answers were generated.")]}
    # Restore the original sub-question order before merging
    sorted_answers = sorted(state["agent_answers"], key=lambda x: x["index"])
    formatted_answers = ""
    for i, ans in enumerate(sorted_answers, start=1):
        formatted_answers += f"\nAnswer {i}:\n{ans['answer']}\n"
    user_message = HumanMessage(content=f"""Original user question: {state["originalQuery"]}\nRetrieved answers:{formatted_answers}""")
    synthesis_response = llm.invoke([SystemMessage(content=get_aggregation_prompt())] + [user_message])
    return {"messages": [AIMessage(content=synthesis_response.content)]}
Why this architecture?
- Summarization maintains conversational context without overwhelming the LLM
- Query rewriting ensures search queries are precise and unambiguous, using context intelligently
- Human-in-the-loop catches unclear queries before wasting retrieval resources
- Parallel execution with Send API spawns independent agent subgraphs for each sub-question
- Answer extraction ensures we get clean final answers from agent tool-calling conversations
- Aggregation merges all parallel results into a coherent single response
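To see how answers from parallel branches end up in one list, the reducer can be exercised on its own, outside LangGraph. The snippet below repeats the `accumulate_or_reset` logic with built-in type hints and applies it by hand; the sample answers and the manual folding are illustrative assumptions, since in the real graph LangGraph calls the reducer for each state update.

```python
def accumulate_or_reset(existing: list[dict], new: list[dict]) -> list[dict]:
    """Append new answers, or clear the list when a reset marker is present."""
    if new and any(item.get("__reset__") for item in new):
        return []
    return existing + new


# Two parallel branches finish in arbitrary order; each update is folded in.
state: list[dict] = []
state = accumulate_or_reset(state, [{"index": 1, "answer": "Python is ..."}])
state = accumulate_or_reset(state, [{"index": 0, "answer": "JavaScript is ..."}])

# Aggregation sorts by index so answers follow the original question order.
ordered = [a["index"] for a in sorted(state, key=lambda a: a["index"])]

# A reset marker empties the list, e.g. before handling the next user turn.
cleared = accumulate_or_reset(state, [{"__reset__": True}])
```

Without a reducer, concurrent writes to the same state key would conflict; appending keeps every branch's answer, and the reset marker offers a way to avoid carrying old answers into a new query.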
Assemble the complete workflow graph with conversation memory and multi-agent architecture.
from langgraph.graph import START, END, StateGraph
from langgraph.prebuilt import ToolNode, tools_condition
from langgraph.checkpoint.memory import InMemorySaver
from IPython.display import Image, display
# Initialize checkpointer for conversation memory
checkpointer = InMemorySaver()
# Build agent subgraph (handles individual questions)
agent_builder = StateGraph(AgentState)
agent_builder.add_node("agent", agent_node)
agent_builder.add_node("tools", ToolNode([search_child_chunks, retrieve_parent_chunks]))
agent_builder.add_node("extract_answer", extract_final_answer)
agent_builder.add_edge(START, "agent")
agent_builder.add_conditional_edges("agent", tools_condition, {"tools": "tools", END: "extract_answer"})
agent_builder.add_edge("tools", "agent")
agent_builder.add_edge("extract_answer", END)
agent_subgraph = agent_builder.compile()
# Build main graph (orchestrates workflow)
graph_builder = StateGraph(State)
# Add nodes
graph_builder.add_node("summarize", analyze_chat_and_summarize)
graph_builder.add_node("analyze_rewrite", analyze_and_rewrite_query)
graph_builder.add_node("human_input", human_input_node)
graph_builder.add_node("process_question", agent_subgraph)
graph_builder.add_node("aggregate", aggregate_responses)
# Define edges
graph_builder.add_edge(START, "summarize")
graph_builder.add_edge("summarize", "analyze_rewrite")
graph_builder.add_conditional_edges("analyze_rewrite", route_after_rewrite)
graph_builder.add_edge("human_input", "analyze_rewrite")
graph_builder.add_edge(["process_question"], "aggregate")
graph_builder.add_edge("aggregate", END)
# Compile graph with checkpointer and interruption
agent_graph = graph_builder.compile(
    checkpointer=checkpointer,
    interrupt_before=["human_input"]
)
Graph architecture explained:
Agent Subgraph (processes individual questions):
- START → agent (invoke LLM with tools)
- agent → tools (if tool calls needed) OR extract_answer (if done)
- tools → agent (return tool results)
- extract_answer → END (clean final answer)
Main Graph (orchestrates complete workflow):
- START → summarize (extract conversation context from history)
- summarize → analyze_rewrite (rewrite query with context, check clarity)
- analyze_rewrite → human_input (if unclear) OR spawn parallel process_question agents (if clear)
- human_input → analyze_rewrite (after user provides clarification)
- All process_question agents → aggregate (merge all responses)
- aggregate → END (return final synthesized answer)
Key features:
- Parallel execution: Multiple agent subgraphs run simultaneously using LangGraph's Send API
- Human-in-the-loop: Graph pauses at human_input node when queries are unclear
- Conversation memory: InMemorySaver checkpointer maintains state across interactions
The architecture flow diagram can be viewed here
Build a Gradio interface with conversation persistence and human-in-the-loop support. For a complete end-to-end pipeline Gradio interface, including document ingestion, please refer to the project folder
import gradio as gr
import uuid
def create_thread_id():
    """Generate a unique thread ID for each conversation"""
    return {"configurable": {"thread_id": str(uuid.uuid4())}}
def clear_session():
    """Clear thread for new conversation"""
    global config
    agent_graph.checkpointer.delete_thread(config["configurable"]["thread_id"])
    config = create_thread_id()
def chat_with_agent(message, history):
    current_state = agent_graph.get_state(config)
    if current_state.next:
        # Resume interrupted conversation
        agent_graph.update_state(config, {"messages": [HumanMessage(content=message.strip())]})
        result = agent_graph.invoke(None, config)
    else:
        # Start new query
        result = agent_graph.invoke({"messages": [HumanMessage(content=message.strip())]}, config)
    return result['messages'][-1].content
# Initialize thread configuration
config = create_thread_id()
# Create Gradio interface
with gr.Blocks() as demo:
    chatbot = gr.Chatbot(
        height=600,
        placeholder="<strong>Ask me anything!</strong><br><em>I'll search, reason, and act to give you the best answer :)</em>"
    )
    chatbot.clear(clear_session)
    gr.ChatInterface(fn=chat_with_agent, chatbot=chatbot)
demo.launch(theme=gr.themes.Citrus())
You're done! You now have a fully functional Agentic RAG system with conversation memory and query clarification.
The app (project/ folder) is organized in modular components that can be easily customized:
project/
├── app.py # Main Gradio application entry point
├── config.py # Configuration hub (models, chunk sizes, providers)
├── util.py # PDF to markdown conversion
├── document_chunker.py # Chunking strategy
├── core/ # Core RAG components orchestration
│ ├── chat_interface.py
│ ├── document_manager.py
│ └── rag_system.py
├── db/ # Storage management
│ ├── parent_store_manager.py # Parent chunks storage (JSON)
│ └── vector_db_manager.py # Qdrant vector database setup
├── rag_agent/ # LangGraph agent workflow
│ ├── edges.py # Conditional routing logic
│ ├── graph.py # Graph construction and compilation
│ ├── graph_state.py # State definitions
│ ├── nodes.py # Processing nodes (summarize, rewrite, agent)
│ ├── prompts.py # System prompts
│ ├── schemas.py # Pydantic data models
│ └── tools.py # Retrieval tools
└── ui/ # User interface
└── gradio_app.py # Gradio interface components
- LLM Provider & Model: Switch between Ollama, Claude, OpenAI, or Gemini
- Embedding Model: Configure embedding model for vector representations
- Chunk Sizes: Adjust child and parent chunk dimensions for optimal retrieval
- Workflow Customization: Add or remove nodes and edges to modify the agent flow
- System Prompts: Tailor prompts in prompts.py for domain-specific applications
- Retrieval Tools: Extend or modify tools in tools.py to enhance retrieval capabilities
- Graph Logic: Customize conditional routing in edges.py and node processing in nodes.py
- Markdown Conversion (util.py): Replace PDF conversion tools with alternatives (e.g., Docling, PaddleOCR). More details here
- Chunking Strategy (document_chunker.py): Implement custom chunking algorithms (e.g., semantic or hybrid approaches)
This modular design ensures flexibility for experimenting with different RAG techniques, LLM providers, and document processing pipelines.
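To make the Chunk Sizes and Chunking Strategy options more concrete, here is a minimal sketch of parent/child splitting in plain Python. It is not the code in `document_chunker.py`: the names `split_fixed`, `split_parent_child`, and `ChildChunk`, the character-based sizes, and the `parent_id` field are all assumptions made for illustration.

```python
from dataclasses import dataclass


@dataclass
class ChildChunk:
    parent_id: int  # index of the parent chunk this child was cut from (hypothetical field)
    text: str


def split_fixed(text: str, size: int) -> list[str]:
    """Cut text into consecutive pieces of at most `size` characters."""
    if size <= 0:
        raise ValueError("size must be positive")
    return [text[i:i + size] for i in range(0, len(text), size)]


def split_parent_child(text: str, parent_size: int, child_size: int):
    """Return (parents, children); each child records its parent's index."""
    if child_size > parent_size:
        raise ValueError("child_size should not exceed parent_size")
    parents = split_fixed(text, parent_size)
    children = [
        ChildChunk(parent_id=pid, text=piece)
        for pid, parent in enumerate(parents)
        for piece in split_fixed(parent, child_size)
    ]
    return parents, children


text = "abcdefghij" * 3
parents, children = split_parent_child(text, parent_size=12, child_size=5)

hit = children[4]                 # pretend the vector search matched this child
context = parents[hit.parent_id]  # pass the larger parent chunk to the LLM
```

Small children keep the search precise, while the `parent_id` lookup restores the surrounding context. A real chunker would more likely split on Markdown headings or token counts than on raw character offsets.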
Sample PDF files can be found here: javascript, blockchain, microservices, fortinet
The easiest way to get started:
Running in Google Colab:
- Click the Open in Colab badge at the top of this README
- Create a `docs/` folder in the file browser
- Upload your PDF files to the `docs/` folder
- Run all cells from top to bottom
- The chat interface will appear at the end
Running Locally (Jupyter/VSCode):
- Install dependencies first: `pip install -r requirements.txt`
- Open the notebook in your preferred environment
- Add your PDF files to the `docs/` folder
- Run all cells from top to bottom
- The chat interface will appear at the end
# Clone the repository
git clone <repo-url>
cd agentic-rag-for-dummies
# Create virtual environment (recommended)
python -m venv venv
# Activate it
# On macOS/Linux:
source venv/bin/activate
# On Windows:
.\venv\Scripts\activate
# Install packages
pip install -r requirements.txt
python app.py
Open the local URL (e.g., http://127.0.0.1:7860) to start chatting.
⚠️ System Requirements: Docker deployment requires at least 8GB of RAM allocated to Docker. The Ollama model (qwen3:4b-instruct-2507-q4_K_M) needs approximately 3.3GB of memory to run.
- Docker installed on your system (Get Docker)
- Docker Desktop configured with at least 8GB of RAM (Settings → Resources → Memory)
docker build -f project/Dockerfile -t agentic-rag .
docker run --name rag-assistant -p 7860:7860 agentic-rag
⚠️ Performance Note: Docker deployment may be 20-50% slower than running Python locally, especially on Windows/Mac, due to virtualization overhead and I/O operations. This is normal and expected. For maximum performance during development, consider using Option 2 (Full Python Project).
Optional: Enable GPU acceleration (NVIDIA GPU only):
If you have an NVIDIA GPU and NVIDIA Container Toolkit installed:
docker run --gpus all --name rag-assistant -p 7860:7860 agentic-rag
Common Docker commands:
# Stop the container
docker stop rag-assistant
# Start an existing container
docker start rag-assistant
# View logs in real-time
docker logs -f rag-assistant
# Remove the container
docker rm rag-assistant
# Remove the container forcefully (if running)
docker rm -f rag-assistant
Once the container is running and you see:
🚀 Launching RAG Assistant...
* Running on local URL: http://0.0.0.0:7860
Open your browser and navigate to:
http://localhost:7860
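If you would rather script the wait than watch the logs, the sketch below polls the URL until the app responds. It is an optional helper, not part of the project: the function names `is_up` and `wait_until_ready` and the retry defaults are assumptions, and it uses only the Python standard library.

```python
import time
import urllib.error
import urllib.request


def is_up(url: str) -> bool:
    """Return True if the URL answers with HTTP 200 within two seconds."""
    try:
        with urllib.request.urlopen(url, timeout=2) as resp:
            return resp.status == 200
    except (urllib.error.URLError, OSError):
        # Connection refused, timeout, or HTTP error: not ready yet.
        return False


def wait_until_ready(url, attempts=30, delay=2.0, probe=is_up, sleep=time.sleep):
    """Call probe(url) up to `attempts` times, sleeping `delay` seconds between tries."""
    for _ in range(attempts):
        if probe(url):
            return True
        sleep(delay)
    return False
```

Calling `wait_until_ready("http://localhost:7860")` returns `True` as soon as the interface is reachable, or `False` after the last attempt. The `probe` and `sleep` parameters exist only so the loop can be exercised without a running container.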
With Conversation Memory:
User: "How do I install SQL?"
Agent: [Provides installation steps from documentation]
User: "How do I update it?"
Agent: [Understands "it" = SQL, provides update instructions]
With Query Clarification:
User: "Tell me about that thing"
Agent: "I need more information. What specific topic are you asking about?"
User: "The installation process for PostgreSQL"
Agent: [Retrieves and answers with specific information]
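The two dialogues above come down to one routing decision: answer when the question can be resolved, otherwise ask the user. The following sketch shows that decision in plain Python without LangGraph. The state keys (`history`, `question`, `question_is_clear`) and the node names `"agent"` and `"ask_user"` are hypothetical and are not taken from `edges.py`; in the real system the clarity flag would be set by an LLM-backed rewrite step.

```python
from typing import TypedDict


class ChatState(TypedDict):
    history: list[str]        # earlier user turns, oldest first
    question: str             # latest user message
    question_is_clear: bool   # assumed to be set by an upstream rewrite step


def route_after_rewrite(state: ChatState) -> str:
    """Pick the next node: answer if the query is clear, otherwise ask the user."""
    return "agent" if state["question_is_clear"] else "ask_user"


follow_up: ChatState = {
    "history": ["How do I install SQL?"],
    "question": "How do I update it?",
    "question_is_clear": True,   # "it" can be resolved from the history
}
vague: ChatState = {
    "history": [],
    "question": "Tell me about that thing",
    "question_is_clear": False,  # nothing to resolve "that thing" against
}

print(route_after_rewrite(follow_up))  # agent
print(route_after_rewrite(vague))      # ask_user
```

In a LangGraph graph, a function of this shape is typically passed to `add_conditional_edges`, which maps the returned string to the next node.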
| Area | Common Problems | Suggested Solutions |
|---|---|---|
| Model Selection | - Responses ignore instructions<br>- Tools (retrieval/search) used incorrectly<br>- Poor context understanding<br>- Hallucinations or incomplete aggregation | - Use more capable LLMs<br>- Prefer models 7B+ for better reasoning<br>- Consider cloud-based models if local models are limited |
| System Prompt Behavior | - Model answers without retrieving documents<br>- Query rewriting loses context<br>- Aggregation introduces hallucinations | - Make retrieval explicit in system prompts<br>- Keep query rewriting close to user intent<br>- Enforce strict aggregation rules |
| Retrieval Configuration | - Relevant documents not retrieved<br>- Too much irrelevant information | - Increase retrieved chunks (k) or lower similarity thresholds to improve recall<br>- Reduce k or increase thresholds to improve precision |
| Chunk Size / Document Splitting | - Answers lack context or feel fragmented<br>- Retrieval is slow or embedding costs are high | - Increase chunk & parent sizes for more context<br>- Decrease chunk sizes to improve speed and reduce costs |
| Temperature & Consistency | - Responses inconsistent or overly creative<br>- Responses too rigid or repetitive | - Set temperature to 0 for factual, consistent output<br>- Slightly increase temperature for summarization or analysis tasks |
| Embedding Model Quality | - Poor semantic search<br>- Weak performance on domain-specific or multilingual docs | - Use higher-quality or domain-specific embeddings<br>- Re-index all documents after changing embeddings |
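The Retrieval Configuration row describes a trade-off between k and the similarity threshold. Here is a small sketch of that trade-off in plain Python. The function `select_chunks` and the scores are made up for illustration; in the project the filtering would happen in the vector database query rather than in application code.

```python
def select_chunks(scored: list[tuple[str, float]], k: int, min_score: float = 0.0) -> list[str]:
    """Keep the k highest-scoring chunk ids whose score is at least min_score."""
    kept = [(cid, score) for cid, score in scored if score >= min_score]
    kept.sort(key=lambda pair: pair[1], reverse=True)
    return [cid for cid, _ in kept[:k]]


# Hypothetical (chunk_id, similarity) pairs returned by a vector search.
scored = [("a", 0.91), ("b", 0.78), ("c", 0.64), ("d", 0.52), ("e", 0.31)]

# Larger k and lower threshold: more chunks reach the LLM (better recall).
high_recall = select_chunks(scored, k=5, min_score=0.5)     # ['a', 'b', 'c', 'd']

# Smaller k and higher threshold: only the strongest matches (better precision).
high_precision = select_chunks(scored, k=2, min_score=0.7)  # ['a', 'b']
```

If relevant passages are missing from answers, move toward the first setting. If answers are diluted by off-topic text, move toward the second.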
MIT License - Feel free to use this for learning and building your own projects!
Contributions are welcome! Open an issue or submit a pull request!

