How to fit a multistep query decomposition with a custom chat engine built with a query pipeline #15117
Replies: 1 comment 4 replies
-
To fit multistep query decomposition into your custom chat engine built with a query pipeline, you can integrate a MultiStepQueryEngine (wrapping your pipeline) into the engine's achat method.
Here is the updated code:

from typing import AsyncGenerator, Generator, List, Optional, Union

from llama_index.core.memory import BaseMemory, ChatMemoryBuffer
from llama_index.core.query_engine import MultiStepQueryEngine
from llama_index.core.schema import QueryBundle
from llama_index.core.storage.chat_store import SimpleChatStore
from llama_index.legacy.chat_engine.types import (
    AgentChatResponse,
    BaseChatEngine,
    StreamingAgentChatResponse,
    ChatMessage,
)
from llama_index.legacy.indices.query.query_transform.base import StepDecomposeQueryTransform
from llama_index.legacy.response_synthesizers import BaseSynthesizer
class CustomChatEngine(BaseChatEngine):
    """Chat engine that answers each user message via multistep query
    decomposition over an existing query pipeline.

    Every message is wrapped in a ``QueryBundle`` and routed through a
    ``MultiStepQueryEngine``, which decomposes it into sequential
    sub-questions and synthesizes one final answer. Conversation turns
    are persisted in the supplied memory's chat store.

    NOTE(review): the legacy ``BaseChatEngine`` declares further abstract
    members (e.g. ``chat``, ``stream_chat``, ``astream_chat``) — confirm
    they are satisfied elsewhere, otherwise instantiation will fail.
    """

    def __init__(self, query_pipeline, memory: BaseMemory, *args, **kwargs):
        """Wrap *query_pipeline* in a multistep query engine.

        Args:
            query_pipeline: The query pipeline used to answer each
                decomposed sub-question.
            memory: Chat memory whose ``chat_store`` persists the
                conversation history.
        """
        super().__init__(*args, **kwargs)
        self.query_pipeline = query_pipeline
        self._memory = memory
        self.multi_step_query_engine = MultiStepQueryEngine(
            query_engine=self.query_pipeline,
            query_transform=StepDecomposeQueryTransform(),
            # BUG FIX: BaseSynthesizer is abstract and cannot be
            # instantiated directly; passing None lets
            # MultiStepQueryEngine construct its default synthesizer.
            response_synthesizer=None,
            num_steps=3,  # Maximum number of decomposition steps.
            early_stopping=True,  # Stop early when stop_fn says so.
            index_summary="Used to answer questions about the author",
            stop_fn=None,  # Optional custom stop function.
        )

    def reset(self) -> None:
        """Clear the stored conversation history for this session."""
        self._memory.chat_store.set_messages(self._memory.chat_store_key, [])

    async def achat(
        self, message: str, chat_history: Optional[List[ChatMessage]] = None
    ):
        """Answer *message* with the multistep engine and record the turn.

        Args:
            message: The new user message.
            chat_history: Optional explicit history; when omitted, the
                history stored in memory is used.

        Returns:
            The engine's response, or an error string on failure.
        """
        try:
            # BUG FIX: honor an explicitly supplied chat_history instead
            # of unconditionally overwriting it with the stored one.
            if chat_history is None:
                chat_history = self._memory.get()
            chat_history.append(ChatMessage(role="user", content=message))
            query_bundle = QueryBundle(query_str=message)
            # BUG FIX: use the public aquery() entry point rather than
            # the private _aquery() (public path also runs callbacks).
            chat_response = await self.multi_step_query_engine.aquery(query_bundle)
            # BUG FIX: ChatMessage content expects a string; the engine
            # returns a Response object, so stringify it before storing.
            chat_history.append(
                ChatMessage(role="assistant", content=str(chat_response))
            )
            # Persist the updated conversation.
            self._memory.chat_store.set_messages(
                self._memory.chat_store_key, chat_history
            )
            return chat_response
        except Exception as e:
            # Best-effort: surface the error as the chat reply instead of
            # propagating and crashing the caller.
            return "An error occurred while processing your query: {}".format(e)
# --- Wire up the query pipeline ---------------------------------------
# Declare the pipeline modules up front, then connect them.
pipeline_modules = {
    "input": InputComponent(),
    "vn_component": create_vn_component(),
    "response_component": create_response_component(),
}
qp = QP(modules=pipeline_modules, verbose=True)
# input -> vn_component -> response_component
qp.add_link("input", "vn_component", dest_key="input_str")
qp.add_link("vn_component", "response_component", dest_key="response")

# --- Conversation memory ----------------------------------------------
# In-memory chat store, capped at 3000 tokens, keyed per user.
memory = ChatMemoryBuffer.from_defaults(
    token_limit=3000,
    chat_store=SimpleChatStore(),
    chat_store_key="user1",
)

# --- Assemble the chat engine -----------------------------------------
chat_engine = CustomChatEngine(qp, memory)
print("chat_engine", chat_engine)

This code integrates the multistep query decomposition into your custom chat engine built with a query pipeline.
Beta Was this translation helpful? Give feedback.
-
How can I fit multistep query decomposition like this one into my custom chat engine:
this is my code:
response_llm = await st.session_state.chat_engine.achat(user_input)
Beta Was this translation helpful? Give feedback.
All reactions