diff --git a/multimodal/tarko/agent-interface/src/agent-event-stream.ts b/multimodal/tarko/agent-interface/src/agent-event-stream.ts index f8d37a00ad..b51a515893 100644 --- a/multimodal/tarko/agent-interface/src/agent-event-stream.ts +++ b/multimodal/tarko/agent-interface/src/agent-event-stream.ts @@ -385,6 +385,13 @@ export namespace AgentEventStream { type: 'codebase'; } + /** + * User interrupt metadata + */ + export interface UserInterruptMetadata extends BaseEnvironmentInputMetadata { + type: 'user_interrupt'; + } + /** * Generic metadata for other types */ @@ -399,6 +406,7 @@ export namespace AgentEventStream { | ScreenshotMetadata | TextMetadata | CodebaseMetadata + | UserInterruptMetadata | GenericMetadata; /** diff --git a/multimodal/tarko/agent-server/src/api/controllers/queries.ts b/multimodal/tarko/agent-server/src/api/controllers/queries.ts index 464b2572f9..390e771788 100644 --- a/multimodal/tarko/agent-server/src/api/controllers/queries.ts +++ b/multimodal/tarko/agent-server/src/api/controllers/queries.ts @@ -43,6 +43,23 @@ export async function executeQuery(req: Request, res: Response) { } try { + // Check if agent is currently running + const session = req.session!; + const isAgentRunning = session.getProcessingStatus(); + + if (isAgentRunning) { + // Agent is running - insert as environment input instead of starting new query + await session.insertEnvironmentInput(query, 'User interrupt message during agent execution'); + + // Return success response for environment input insertion + return res.status(200).json({ + success: true, + type: 'environment_input_inserted', + message: 'User input inserted as environment input during agent execution' + }); + } + + // Agent is not running - proceed with normal query // Get server instance to access workspace path const server = req.app.locals.server; const workspacePath = server.getCurrentWorkspace(); @@ -71,7 +88,7 @@ export async function executeQuery(req: Request, res: Response) { }; // Use enhanced error handling in runQuery with environment input - const response = await req.session!.runQuery(runOptions); + const response = await session.runQuery(runOptions); if (response.success) { res.status(200).json({ result: response.result }); @@ -97,6 +114,23 @@ export async function executeStreamingQuery(req: Request, res: Response) { } try { + // Check if agent is currently running + const session = req.session!; + const isAgentRunning = session.getProcessingStatus(); + + if (isAgentRunning) { + // Agent is running - insert as environment input instead of starting new query + await session.insertEnvironmentInput(query, 'User interrupt message during agent execution'); + + // Return success response for environment input insertion + return res.status(200).json({ + success: true, + type: 'environment_input_inserted', + message: 'User input inserted as environment input during agent execution' + }); + } + + // Agent is not running - proceed with normal streaming query // Set response headers for streaming res.setHeader('Content-Type', 'text/event-stream'); res.setHeader('Cache-Control', 'no-cache'); @@ -130,7 +164,7 @@ export async function executeStreamingQuery(req: Request, res: Response) { }; // Get streaming response with environment input - any errors will be returned as events - const eventStream = await req.session!.runQueryStreaming(runOptions); + const eventStream = await session.runQueryStreaming(runOptions); // Stream events one by one for await (const event of eventStream) { diff --git a/multimodal/tarko/agent-server/src/core/AgentSession.ts b/multimodal/tarko/agent-server/src/core/AgentSession.ts index a3f5f50368..eb38dd31c2 100644 --- a/multimodal/tarko/agent-server/src/core/AgentSession.ts +++ b/multimodal/tarko/agent-server/src/core/AgentSession.ts @@ -504,6 +504,51 @@ export class AgentSession { } } + /** + * Insert environment input while agent is running + * @param content Environment input content + * @param description Optional description of the input + * @returns Promise resolving when environment input is inserted + */ + async insertEnvironmentInput( + content: string | ChatCompletionContentPart[], + description?: string, + ): Promise { + try { + // Create environment input event + const environmentInputEvent = this.agent.getEventStream().createEvent('environment_input', { + content, + description: description || 'User interrupt message during agent execution', + metadata: { + type: 'user_interrupt', + }, + }); + + // Process the event through our handler for storage and AGIO + const handleEvent = this.createEventHandler(); + await handleEvent(environmentInputEvent); + + // Emit to client via event bridge + this.eventBridge.emit('environment_input', { + sessionId: this.id, + event: environmentInputEvent, + }); + + if (this.server.isDebug) { + console.log('[DEBUG] Environment input inserted', { + sessionId: this.id, + contentType: typeof content === 'string' ? 'string' : 'ContentPart', + description, + }); + } + } catch (error) { + this.eventBridge.emit('error', { + message: error instanceof Error ? error.message : String(error), + }); + throw error; + } + } + /** * Update session configuration (model and runtime settings) * The configuration will be used in subsequent queries diff --git a/multimodal/tarko/agent-server/tests/api/contextual-references.test.ts b/multimodal/tarko/agent-server/tests/api/contextual-references.test.ts index e82e20f222..2809772056 100644 --- a/multimodal/tarko/agent-server/tests/api/contextual-references.test.ts +++ b/multimodal/tarko/agent-server/tests/api/contextual-references.test.ts @@ -10,7 +10,12 @@ import { Request, Response } from 'express'; const { mockContextProcessor, mockImageProcessor, mockSession, mockServer } = vi.hoisted(() => ({ mockContextProcessor: { processContextualReferences: vi.fn() }, mockImageProcessor: { compressImagesInQuery: vi.fn() }, - mockSession: { runQuery: vi.fn(), runQueryStreaming: vi.fn() }, + mockSession: { + runQuery: vi.fn(), + runQueryStreaming: vi.fn(), + getProcessingStatus: vi.fn().mockReturnValue(false), + insertEnvironmentInput: vi.fn() + }, mockServer: { getCurrentWorkspace: vi.fn().mockReturnValue('/workspace') }, })); @@ -57,6 +62,8 @@ describe('Contextual References Bug Fix', () => { beforeEach(() => { vi.clearAllMocks(); mockImageProcessor.compressImagesInQuery.mockImplementation(q => Promise.resolve(q)); + // Reset to default: agent not running + mockSession.getProcessingStatus.mockReturnValue(false); }); describe('environmentInput conditional logic', () => { diff --git a/multimodal/tarko/agent-server/tests/api/queries.test.ts b/multimodal/tarko/agent-server/tests/api/queries.test.ts index 9811ac774d..89228c0eff 100644 --- a/multimodal/tarko/agent-server/tests/api/queries.test.ts +++ b/multimodal/tarko/agent-server/tests/api/queries.test.ts @@ -10,7 +10,13 @@ import { Request, Response } from 'express'; const { mockContextProcessor, mockImageProcessor, mockSession, mockServer } = vi.hoisted(() => ({ mockContextProcessor: { processContextualReferences: vi.fn() }, mockImageProcessor: { compressImagesInQuery: vi.fn() }, - mockSession: { runQuery: vi.fn(), runQueryStreaming: vi.fn(), abortQuery: vi.fn() }, + mockSession: { + runQuery: vi.fn(), + runQueryStreaming: vi.fn(), + abortQuery: vi.fn(), + getProcessingStatus: vi.fn().mockReturnValue(false), + insertEnvironmentInput: vi.fn() + }, mockServer: { getCurrentWorkspace: vi.fn().mockReturnValue('/test/workspace') }, })); @@ -53,9 +59,35 @@ const mockProcessing = (expandedContext: string | null, compressedQuery: any = n describe('Queries Controller', () => { beforeEach(() => { vi.clearAllMocks(); + // Reset to default: agent not running + mockSession.getProcessingStatus.mockReturnValue(false); }); describe('executeQuery', () => { + it('should insert environment input when agent is running', async () => { + const userQuery = 'User interrupt message'; + const req = createRequest(userQuery); + const res = createResponse(); + + mockSession.getProcessingStatus.mockReturnValue(true); + mockSession.insertEnvironmentInput.mockResolvedValue(undefined); + + await executeQuery(req as Request, res as Response); + + expect(mockSession.getProcessingStatus).toHaveBeenCalled(); + expect(mockSession.insertEnvironmentInput).toHaveBeenCalledWith( + userQuery, + 'User interrupt message during agent execution' + ); + expect(res.status).toHaveBeenCalledWith(200); + expect(res.json).toHaveBeenCalledWith({ + success: true, + type: 'environment_input_inserted', + message: 'User input inserted as environment input during agent execution' + }); + expect(mockSession.runQuery).not.toHaveBeenCalled(); + }); + it('should process context and pass environmentInput when references exist', async () => { const userQuery = 'Test query with @file reference'; const expandedContext = 'File content: function test() { return true; }'; @@ -164,6 +196,31 @@ describe('Queries Controller', () => { }); describe('executeStreamingQuery', () => { + it('should insert environment input when agent is running', async () => { + const userQuery = 'User interrupt streaming message'; + const req = createRequest(userQuery); + const res = createResponse(); + + mockSession.getProcessingStatus.mockReturnValue(true); + mockSession.insertEnvironmentInput.mockResolvedValue(undefined); + + await executeStreamingQuery(req as Request, res as Response); + + expect(mockSession.getProcessingStatus).toHaveBeenCalled(); + expect(mockSession.insertEnvironmentInput).toHaveBeenCalledWith( + userQuery, + 'User interrupt message during agent execution' + ); + expect(res.status).toHaveBeenCalledWith(200); + expect(res.json).toHaveBeenCalledWith({ + success: true, + type: 'environment_input_inserted', + message: 'User input inserted as environment input during agent execution' + }); + expect(mockSession.runQueryStreaming).not.toHaveBeenCalled(); + expect(res.setHeader).not.toHaveBeenCalled(); + }); + it('should process context and stream with environmentInput', async () => { const userQuery = 'Streaming query with @dir reference'; const expandedContext = 'Directory listing: file1.js, file2.ts'; diff --git a/multimodal/tarko/agent-ui/src/common/services/apiService.ts b/multimodal/tarko/agent-ui/src/common/services/apiService.ts index 87cf60a422..6870ba5838 100644 --- a/multimodal/tarko/agent-ui/src/common/services/apiService.ts +++ b/multimodal/tarko/agent-ui/src/common/services/apiService.ts @@ -243,6 +243,18 @@ class ApiService { throw new Error(`Failed to send query: ${response.statusText}`); } + // Check if response is JSON (environment input insertion) or streaming + const contentType = response.headers.get('content-type'); + if (contentType?.includes('application/json')) { + // This is an environment input insertion response + const result = await response.json(); + if (result.type === 'environment_input_inserted') { + console.log('User input inserted as environment input during agent execution'); + return; // No streaming for environment input insertion + } + } + + // Handle streaming response const reader = response.body?.getReader(); if (!reader) { throw new Error('ReadableStream not supported'); diff --git a/multimodal/tarko/agent-ui/src/standalone/chat/ChatPanel.tsx b/multimodal/tarko/agent-ui/src/standalone/chat/ChatPanel.tsx index 0019a3b6e7..dd27d06f6e 100644 --- a/multimodal/tarko/agent-ui/src/standalone/chat/ChatPanel.tsx +++ b/multimodal/tarko/agent-ui/src/standalone/chat/ChatPanel.tsx @@ -88,7 +88,6 @@ export const ChatPanel: React.FC = () => { isDisabled={ !currentSessionId || currentSessionId === 'creating' || - isProcessing || !connectionStatus.connected || isReplayMode } diff --git a/multimodal/tarko/agent-ui/src/standalone/chat/Message/Message.css b/multimodal/tarko/agent-ui/src/standalone/chat/Message/Message.css index e9769c2aad..190afbbdba 100644 --- a/multimodal/tarko/agent-ui/src/standalone/chat/Message/Message.css +++ b/multimodal/tarko/agent-ui/src/standalone/chat/Message/Message.css @@ -90,6 +90,24 @@ @apply px-2 text-gray-700 dark:text-gray-300 max-w-full mx-auto; } +.environment-message-minimal { + @apply bg-amber-50 dark:bg-amber-900/20 border border-amber-200 dark:border-amber-800/50 text-amber-800 dark:text-amber-200 px-4 py-3 rounded-lg mx-auto max-w-fit text-sm; + position: relative; +} + +.environment-message-minimal::before { + content: '💬'; + @apply mr-2 opacity-70; +} + +.environment-message-minimal .prose { + @apply text-amber-800 dark:text-amber-200 m-0; +} + +.environment-message-minimal .prose p { + @apply m-0 text-amber-800 dark:text-amber-200; +} + .message-gap { margin-bottom: 1rem; position: relative; diff --git a/multimodal/tarko/agent-ui/src/standalone/chat/Message/components/MessageGroup.tsx b/multimodal/tarko/agent-ui/src/standalone/chat/Message/components/MessageGroup.tsx index 634bacdd9c..bb053d0afa 100644 --- a/multimodal/tarko/agent-ui/src/standalone/chat/Message/components/MessageGroup.tsx +++ b/multimodal/tarko/agent-ui/src/standalone/chat/Message/components/MessageGroup.tsx @@ -24,16 +24,17 @@ interface MessageGroupProps { export const MessageGroup: React.FC = ({ messages, isThinking }) => { const isProcessing = useAtomValue(isProcessingAtom); - // Filter out environment messages - const filteredMessages = messages.filter((msg) => msg.role !== 'environment'); + // Keep all messages including environment messages + const filteredMessages = messages; // If no messages after filtering, render nothing if (filteredMessages.length === 0) { return null; } - // Get user messages and assistant messages + // Get user messages, environment messages, and assistant messages const userMessages = filteredMessages.filter((msg) => msg.role === 'user'); + const environmentMessages = filteredMessages.filter((msg) => msg.role === 'environment'); const assistantMessages = filteredMessages.filter( (msg) => msg.role === 'assistant' || msg.role === 'system', ); @@ -76,6 +77,11 @@ export const MessageGroup: React.FC = ({ messages, isThinking return ; })} + {/* Render environment messages (inserted messages) */} + {environmentMessages.map((envMsg) => ( + + ))} + {/* Render all assistant messages - each message renders independently, supporting streaming display */} {assistantMessages.map((message, index) => ( = ({ connectionStatus && !connectionStatus.connected ? 'Server disconnected...' : isProcessing - ? `${getAgentTitle()} is running...` + ? `Insert a message while ${getAgentTitle()} is running... (Ctrl+Enter to send)` : contextualSelectorEnabled ? `Ask ${getAgentTitle()} something... (Use @ to reference files/folders, Ctrl+Enter to send)` : `Ask ${getAgentTitle()} something... (Ctrl+Enter to send)`; @@ -532,64 +532,78 @@ export const ChatInput: React.FC = ({ )} {/* Action buttons */} - - {connectionStatus && !connectionStatus.connected ? ( - - - - ) : isProcessing ? ( - - {isAborting ? ( -
- ) : ( -
- )} - - ) : ( - - - - )} - +
+ {/* Reconnect button */} + + {connectionStatus && !connectionStatus.connected && ( + + + + )} + + + {/* Abort button - show when processing */} + + {isProcessing && ( + + {isAborting ? ( +
+ ) : ( +
+ )} + + )} + + + {/* Send button - always show when not reconnecting */} + + {!(connectionStatus && !connectionStatus.connected) && ( + + + + )} + +