Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
298 changes: 219 additions & 79 deletions app-frontend/react/src/redux/Conversation/ConversationSlice.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { createAsyncThunkWrapper } from "../thunkUtil";
import client from "../../common/client";
import { notifications } from "@mantine/notifications";
import { CHAT_QNA_URL, DATA_PREP_URL } from "../../config";
// import { useState } from 'react';

export interface FileDataSource {
id: string;
Expand Down Expand Up @@ -172,9 +173,9 @@ export const isAgentSelector = (state: RootState) => state.conversationReducer.i

export default ConversationSlice.reducer;

let source: string[] = [];
let content: any[] = [];
let currentTool: string = "";
// let source: string[] = [];
// let content: any[] = [];
// let currentTool: string = "";
let isAgent: boolean = false;
let currentAgentSteps: AgentStep[] = []; // Temporary storage for steps during streaming

Expand All @@ -193,7 +194,7 @@ export const doConversation = (conversationRequest: ConversationRequest) => {
} else {
store.dispatch(addMessageToMessages(userPrompt));
}

const userPromptWithoutTime = {
role: userPrompt.role,
content: userPrompt.content,
Expand All @@ -206,17 +207,13 @@ export const doConversation = (conversationRequest: ConversationRequest) => {
stream: true,
};

function isJsonParsable(str: string): boolean {
try {
JSON.parse(str);
return true;
} catch (e) {
return false;
}
}

let result = "";
let result = ""; // Accumulates the final answer
let thinkBuffer = ""; // Accumulates data for think blocks
let postThinkBuffer = ""; // Accumulates plain text after last </think>
let isInThink = false; // Tracks if we're inside a <think> block
currentAgentSteps = []; // Reset steps for this message
isAgent = false; // Tracks if this is an agent message (set once, never reset)
let isMessageDispatched = false; // Tracks if the final message has been dispatched

try {
console.log("CHAT_QNA_URL", CHAT_QNA_URL);
Expand All @@ -239,59 +236,104 @@ export const doConversation = (conversationRequest: ConversationRequest) => {
}
},
onmessage(msg) {
if (msg?.data !== "[DONE]") {
// console.log ( "check is json", isJsonParsable(msg.data) )
if (isJsonParsable(msg.data)) {
if (msg?.data === "[DONE]") {
// Stream is done, finalize the message
if (isAgent && thinkBuffer) {
processThinkContent(thinkBuffer);
}
if (!isMessageDispatched) {
// Use postThinkBuffer as the final answer if present
if (postThinkBuffer.trim()) {
result = postThinkBuffer.trim();
}
store.dispatch(setOnGoingResult(result));
store.dispatch(
addMessageToMessages({
role: MessageRole.Assistant,
content: result,
time: getCurrentTimeStamp(),
agentSteps: isAgent ? [...currentAgentSteps] : [],
}),
);
isMessageDispatched = true;
}
currentAgentSteps = []; // Clear steps for next message
postThinkBuffer = "";
return;
}

const data = msg?.data || "";

// Handle think blocks and non-think content
if (data.includes("<think>")) {
if (!isAgent) {
isAgent = true;
store.dispatch(setIsAgent(true));
const currentMsg = JSON.parse(msg.data);
if (currentMsg.tool || currentMsg.source || currentMsg.content) {
currentAgentSteps.push({
tool: currentMsg.tool || currentTool,
content: currentMsg.content || [],
source: currentMsg.source || [],
});
}
currentTool = currentMsg.tool? currentMsg.tool: "";
source = currentMsg.source? currentMsg.source: "";
if (currentMsg.content) {
content = [...content, ...currentMsg.content];
result = currentMsg.content[0];
}
console.log(currentMsg);
console.log("currentTool", currentTool);
console.log("source", source);
console.log("content", content);
} else {
// isAgent
try {
// const match = msg.data.match(/b'([^']*)'/);
// if (match && match[1] !== "</s>") {
// const extractedText = match[1];
// if (extractedText.includes("\\x")) {
// const decodedText = decodeEscapedBytes(extractedText);
// result += decodedText;
// } else {
// result += extractedText;
// }
// } else if (!match) {
// result += msg?.data;
// }
// store.dispatch(setIsAgent(false));

result += msg?.data;

if (result) {
}
// Split on <think> to handle content before it
const parts = data.split("<think>");
for (let i = 0; i < parts.length; i++) {
const part = parts[i];
if (i === 0 && !isInThink && part) {
// Content before <think> (non-think)
postThinkBuffer += part;
if (isAgent) {
store.dispatch(setOnGoingResult(postThinkBuffer));
} else {
result += part;
store.dispatch(setOnGoingResult(result));
}
} catch (e) {
console.log("something wrong in msg", e);
throw e;
} else {
// Start or continue think block
isInThink = true;
thinkBuffer += part;
// Check if part contains </think>
if (part.includes("</think>")) {
const [thinkContent, afterThink] = part.split("</think>", 2);
thinkBuffer = thinkBuffer.substring(0, thinkBuffer.indexOf(part)) + thinkContent;
processThinkContent(thinkBuffer);
thinkBuffer = "";
isInThink = false;
if (afterThink) {
// Handle content after </think> as non-think
if (!afterThink.includes("<think>")) {
postThinkBuffer += afterThink;
store.dispatch(setOnGoingResult(postThinkBuffer));
} else {
thinkBuffer = afterThink;
isInThink = true;
}
}
}
}
}
} else if (isInThink) {
// Accumulate within think block
thinkBuffer += data;
if (data.includes("</think>")) {
const [thinkContent, afterThink] = data.split("</think>", 2);
thinkBuffer = thinkBuffer.substring(0, thinkBuffer.lastIndexOf(data)) + thinkContent;
processThinkContent(thinkBuffer);
thinkBuffer = "";
isInThink = false;
if (afterThink) {
// Handle content after </think>
if (!afterThink.includes("<think>")) {
postThinkBuffer += afterThink;
store.dispatch(setOnGoingResult(postThinkBuffer));
} else {
thinkBuffer = afterThink;
isInThink = true;
}
}
}
} else {
// Non-agent or post-think plain text
if (isAgent) {
console.log("final answer:", result);
postThinkBuffer += data;
store.dispatch(setOnGoingResult(postThinkBuffer));
} else {
result += data;
store.dispatch(setOnGoingResult(result));
}
}
Expand All @@ -302,32 +344,130 @@ export const doConversation = (conversationRequest: ConversationRequest) => {
throw err;
},
onclose() {
if (!isMessageDispatched && (result || postThinkBuffer || (isAgent && currentAgentSteps.length > 0))) {
// Use postThinkBuffer as the final answer if present
if (postThinkBuffer.trim()) {
result = postThinkBuffer.trim();
}
store.dispatch(setOnGoingResult(result));
store.dispatch(
addMessageToMessages({
role: MessageRole.Assistant,
content: result,
time: getCurrentTimeStamp(),
agentSteps: isAgent ? [...currentAgentSteps] : [],
}),
);
isMessageDispatched = true;
}
store.dispatch(setOnGoingResult(""));
console.log("onclose", result);
store.dispatch(
addMessageToMessages({
role: MessageRole.Assistant,
content: result,
time: getCurrentTimeStamp(),
agentSteps: [...currentAgentSteps], // Store steps with this message
}),
);
currentAgentSteps = []; // Clear steps for the next message
// isAgent = false;
// store.dispatch(setIsAgent(false));
currentAgentSteps = [];
postThinkBuffer = "";
},
});
} catch (err) {
console.log(err);
}
};

// function decodeEscapedBytes(str: string): string {
// const byteArray: number[] = str
// .split("\\x")
// .slice(1)
// .map((byte: string) => parseInt(byte, 16));
// return new TextDecoder("utf-8").decode(new Uint8Array(byteArray));
// }
// Helper function to process content within <think> tags
function processThinkContent(content: string) {
content = content.trim();
if (!content) return;

const toolCallRegex = /TOOL CALL: (\{.*?\})/g;
const finalAnswerRegex = /FINAL ANSWER: (\{.*?\})/;
let stepContent: string[] = []; // Collect all reasoning for this think block
let tool: string = "reasoning"; // Default tool
let source: string[] = []; // Tool output

// Split content by final answer (if present)
let remainingContent = content;
const finalAnswerMatch = content.match(finalAnswerRegex);
if (finalAnswerMatch) {
try {
const finalAnswer = JSON.parse(finalAnswerMatch[1].replace("FINAL ANSWER: ", ""));
if (finalAnswer.answer) {
result = finalAnswer.answer;
}
remainingContent = content.split(finalAnswerMatch[0])[0].trim(); // Content before FINAL ANSWER
tool = "final_answer";
} catch (e) {
console.error("Error parsing final answer:", finalAnswerMatch[1], e);
}
}

// Process tool calls within the remaining content
const toolMatches = remainingContent.match(toolCallRegex) || [];
let currentContent = remainingContent;

if (toolMatches.length > 0) {
// Handle content before and after tool calls
toolMatches.forEach((toolCallStr) => {
const [beforeTool, afterTool] = currentContent.split(toolCallStr, 2);
if (beforeTool.trim()) {
stepContent.push(beforeTool.trim());
}

try {
// Attempt to parse the tool call JSON
let toolCall;
try {
toolCall = JSON.parse(toolCallStr.replace("TOOL CALL: ", ""));
} catch (e) {
console.error("Error parsing tool call JSON, attempting recovery:", toolCallStr, e);
// Attempt to extract tool and content manually
const toolMatch = toolCallStr.match(/"tool":\s*"([^"]+)"/);
const contentMatch = toolCallStr.match(/"tool_content":\s*\["([^"]+)"\]/);
toolCall = {
tool: toolMatch ? toolMatch[1] : "unknown",
args: {
tool_content: contentMatch ? [contentMatch[1]] : [],
},
};
}

tool = toolCall.tool || tool;
source = toolCall.args?.tool_content || source;

// Clean up afterTool to remove invalid JSON fragments
if (afterTool.trim()) {
// Remove any trailing malformed JSON (e.g., "Chinook?"}})
const cleanAfterTool = afterTool.replace(/[\s\S]*?(\}\s*)$/, "").trim();
if (cleanAfterTool) {
stepContent.push(cleanAfterTool);
}
}

} catch (e) {
console.error("Failed to process tool call:", toolCallStr, e);
stepContent.push(`[Error parsing tool call: ${toolCallStr}]`);
}

currentContent = afterTool;
});
} else {
// No tool calls, treat as reasoning
if (remainingContent.trim()) {
stepContent.push(remainingContent.trim());
}
}

// Add the step for this think block
if (stepContent.length > 0 || source.length > 0) {
currentAgentSteps.push({
tool,
content: stepContent,
source,
});
}

// Update onGoingResult to trigger UI update with latest steps
if (isAgent) {
const latestContent = currentAgentSteps.flatMap(step => step.content).join(" ");
const latestSource = source.length > 0 ? source.join(" ") : "";
store.dispatch(setOnGoingResult(latestContent + (latestSource ? " " + latestSource : "") + (postThinkBuffer ? " " + postThinkBuffer : "")));
}
}
};

export const getCurrentAgentSteps = () => currentAgentSteps; // Export for use in Conversation.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ spec:
command: ["/bin/sh", "-c"]
args:
- |
TOOLS_GIT_URL="https://github.com/wanhakim/GenAIStudio/tree/main/app-backend/templates/tools"
TOOLS_GIT_URL="https://github.com/opea-project/GenAIStudio/tree/main/studio-backend/app/templates/tools"
OWNER=$(echo ${TOOLS_GIT_URL} | sed -E 's|https://github.com/([^/]+)/([^/]+)/tree/([^/]+)/.*|\1|')
REPO=$(echo ${TOOLS_GIT_URL} | sed -E 's|https://github.com/([^/]+)/([^/]+)/tree/([^/]+)/.*|\2|')
BRANCH=$(echo ${TOOLS_GIT_URL} | sed -E 's|https://github.com/[^/]+/[^/]+/tree/([^/]+)/.*|\1|')
TOOLS_DIR=$(echo ${TOOLS_GIT_URL} | sed -E 's|https://github.com/[^/]+/[^/]+/tree/[^/]+/(.*?)/?$|\1|')
if [[ "${TOOLS_DIR: -1}" == "/" ]]; then TOOLS_DIR="${TOOLS_DIR%/}"; fi
DOWNLOAD_URL="https://codeload.github.com/${OWNER}/${REPO}/tar.gz/${BRANCH}"
curl "${DOWNLOAD_URL}" | tar -xz --strip-components=4 -C /home/user/tools/ "${REPO}-${BRANCH}/${TOOLS_DIR}"
curl "${DOWNLOAD_URL}" | tar -xz --strip-components=5 -C /home/user/tools/ "${REPO}-${BRANCH}/${TOOLS_DIR}"

# Conditional wait for remote service based on llm_engine
if [ "$llm_engine" = "tgi" ]; then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ spec:
command: ["/bin/sh", "-c"]
args:
- |
TOOLS_GIT_URL="https://github.com/wanhakim/GenAIStudio/tree/main/app-backend/templates/tools"
TOOLS_GIT_URL="https://github.com/opea-project/GenAIStudio/tree/main/studio-backend/app/templates/tools"
OWNER=$(echo ${TOOLS_GIT_URL} | sed -E 's|https://github.com/([^/]+)/([^/]+)/tree/([^/]+)/.*|\1|')
REPO=$(echo ${TOOLS_GIT_URL} | sed -E 's|https://github.com/([^/]+)/([^/]+)/tree/([^/]+)/.*|\2|')
BRANCH=$(echo ${TOOLS_GIT_URL} | sed -E 's|https://github.com/[^/]+/[^/]+/tree/([^/]+)/.*|\1|')
TOOLS_DIR=$(echo ${TOOLS_GIT_URL} | sed -E 's|https://github.com/[^/]+/[^/]+/tree/[^/]+/(.*?)/?$|\1|')
if [[ "${TOOLS_DIR: -1}" == "/" ]]; then TOOLS_DIR="${TOOLS_DIR%/}"; fi
DOWNLOAD_URL="https://codeload.github.com/${OWNER}/${REPO}/tar.gz/${BRANCH}"
curl "${DOWNLOAD_URL}" | tar -xz --strip-components=4 -C /home/user/tools/ "${REPO}-${BRANCH}/${TOOLS_DIR}"
curl "${DOWNLOAD_URL}" | tar -xz --strip-components=5 -C /home/user/tools/ "${REPO}-${BRANCH}/${TOOLS_DIR}"

# Conditional wait for remote service based on llm_engine
if [ "$llm_engine" = "tgi" ]; then
Expand Down
Loading
Loading