Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions multimodal/tarko/agent-interface/src/agent-event-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,13 @@ export namespace AgentEventStream {
type: 'codebase';
}

/**
 * User interrupt metadata
 *
 * Discriminates environment_input events that carry a message the user sent
 * while the agent was still executing; the server inserts such messages as
 * environment input instead of starting a new query.
 */
export interface UserInterruptMetadata extends BaseEnvironmentInputMetadata {
type: 'user_interrupt';
}

/**
* Generic metadata for other types
*/
Expand All @@ -399,6 +406,7 @@ export namespace AgentEventStream {
| ScreenshotMetadata
| TextMetadata
| CodebaseMetadata
| UserInterruptMetadata
| GenericMetadata;

/**
Expand Down
38 changes: 36 additions & 2 deletions multimodal/tarko/agent-server/src/api/controllers/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,23 @@ export async function executeQuery(req: Request, res: Response) {
}

try {
// Check if agent is currently running
const session = req.session!;
const isAgentRunning = session.getProcessingStatus();

if (isAgentRunning) {
// Agent is running - insert as environment input instead of starting new query
await session.insertEnvironmentInput(query, 'User interrupt message during agent execution');

// Return success response for environment input insertion
return res.status(200).json({
success: true,
type: 'environment_input_inserted',
message: 'User input inserted as environment input during agent execution'
});
}

// Agent is not running - proceed with normal query
// Get server instance to access workspace path
const server = req.app.locals.server;
const workspacePath = server.getCurrentWorkspace();
Expand Down Expand Up @@ -71,7 +88,7 @@ export async function executeQuery(req: Request, res: Response) {
};

// Use enhanced error handling in runQuery with environment input
const response = await req.session!.runQuery(runOptions);
const response = await session.runQuery(runOptions);

if (response.success) {
res.status(200).json({ result: response.result });
Expand All @@ -97,6 +114,23 @@ export async function executeStreamingQuery(req: Request, res: Response) {
}

try {
// Check if agent is currently running
const session = req.session!;
const isAgentRunning = session.getProcessingStatus();

if (isAgentRunning) {
// Agent is running - insert as environment input instead of starting new query
await session.insertEnvironmentInput(query, 'User interrupt message during agent execution');

// Return success response for environment input insertion
return res.status(200).json({
success: true,
type: 'environment_input_inserted',
message: 'User input inserted as environment input during agent execution'
});
}

// Agent is not running - proceed with normal streaming query
// Set response headers for streaming
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
Expand Down Expand Up @@ -130,7 +164,7 @@ export async function executeStreamingQuery(req: Request, res: Response) {
};

// Get streaming response with environment input - any errors will be returned as events
const eventStream = await req.session!.runQueryStreaming(runOptions);
const eventStream = await session.runQueryStreaming(runOptions);

// Stream events one by one
for await (const event of eventStream) {
Expand Down
45 changes: 45 additions & 0 deletions multimodal/tarko/agent-server/src/core/AgentSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,51 @@ export class AgentSession {
}
}

/**
 * Insert environment input while the agent is running.
 *
 * Used when a user message arrives mid-execution: instead of starting a new
 * query, the message is appended to the session's event stream as an
 * `environment_input` event tagged with `user_interrupt` metadata so the
 * running agent can pick it up.
 *
 * @param content Environment input content (plain text or multimodal parts)
 * @param description Optional description of the input; defaults to a generic
 *   "user interrupt" description only when omitted/undefined
 * @returns Promise resolving when the event has been processed and emitted
 * @throws Re-throws any failure after reporting it on the event bridge
 */
async insertEnvironmentInput(
  content: string | ChatCompletionContentPart[],
  description?: string,
): Promise<void> {
  try {
    // Create the environment input event on the agent's event stream.
    // `??` (not `||`) so an intentionally empty-string description is kept.
    const environmentInputEvent = this.agent.getEventStream().createEvent('environment_input', {
      content,
      description: description ?? 'User interrupt message during agent execution',
      metadata: {
        type: 'user_interrupt',
      },
    });

    // Route through the shared event handler first so the event is handled
    // for storage and AGIO before clients are notified.
    const handleEvent = this.createEventHandler();
    await handleEvent(environmentInputEvent);

    // Notify connected clients via the event bridge.
    this.eventBridge.emit('environment_input', {
      sessionId: this.id,
      event: environmentInputEvent,
    });

    if (this.server.isDebug) {
      console.log('[DEBUG] Environment input inserted', {
        sessionId: this.id,
        contentType: typeof content === 'string' ? 'string' : 'ContentPart',
        description,
      });
    }
  } catch (error) {
    // Surface the failure to clients, then propagate to the caller.
    this.eventBridge.emit('error', {
      message: error instanceof Error ? error.message : String(error),
    });
    throw error;
  }
}

/**
* Update session configuration (model and runtime settings)
* The configuration will be used in subsequent queries
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@ import { Request, Response } from 'express';
const { mockContextProcessor, mockImageProcessor, mockSession, mockServer } = vi.hoisted(() => ({
mockContextProcessor: { processContextualReferences: vi.fn() },
mockImageProcessor: { compressImagesInQuery: vi.fn() },
mockSession: { runQuery: vi.fn(), runQueryStreaming: vi.fn() },
mockSession: {
runQuery: vi.fn(),
runQueryStreaming: vi.fn(),
getProcessingStatus: vi.fn().mockReturnValue(false),
insertEnvironmentInput: vi.fn()
},
mockServer: { getCurrentWorkspace: vi.fn().mockReturnValue('/workspace') },
}));

Expand Down Expand Up @@ -57,6 +62,8 @@ describe('Contextual References Bug Fix', () => {
beforeEach(() => {
vi.clearAllMocks();
mockImageProcessor.compressImagesInQuery.mockImplementation(q => Promise.resolve(q));
// Reset to default: agent not running
mockSession.getProcessingStatus.mockReturnValue(false);
});

describe('environmentInput conditional logic', () => {
Expand Down
59 changes: 58 additions & 1 deletion multimodal/tarko/agent-server/tests/api/queries.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@ import { Request, Response } from 'express';
const { mockContextProcessor, mockImageProcessor, mockSession, mockServer } = vi.hoisted(() => ({
mockContextProcessor: { processContextualReferences: vi.fn() },
mockImageProcessor: { compressImagesInQuery: vi.fn() },
mockSession: { runQuery: vi.fn(), runQueryStreaming: vi.fn(), abortQuery: vi.fn() },
mockSession: {
runQuery: vi.fn(),
runQueryStreaming: vi.fn(),
abortQuery: vi.fn(),
getProcessingStatus: vi.fn().mockReturnValue(false),
insertEnvironmentInput: vi.fn()
},
mockServer: { getCurrentWorkspace: vi.fn().mockReturnValue('/test/workspace') },
}));

Expand Down Expand Up @@ -53,9 +59,35 @@ const mockProcessing = (expandedContext: string | null, compressedQuery: any = n
describe('Queries Controller', () => {
beforeEach(() => {
vi.clearAllMocks();
// Reset to default: agent not running
mockSession.getProcessingStatus.mockReturnValue(false);
});

describe('executeQuery', () => {
it('should insert environment input when agent is running', async () => {
  // Simulate a user message arriving while the agent is busy.
  const interruptText = 'User interrupt message';
  const request = createRequest(interruptText);
  const response = createResponse();

  mockSession.getProcessingStatus.mockReturnValue(true);
  mockSession.insertEnvironmentInput.mockResolvedValue(undefined);

  await executeQuery(request as Request, response as Response);

  // The busy check must run and route the text into environment input.
  expect(mockSession.getProcessingStatus).toHaveBeenCalled();
  expect(mockSession.insertEnvironmentInput).toHaveBeenCalledWith(
    interruptText,
    'User interrupt message during agent execution'
  );

  // Caller receives a 200 describing the insertion; no new query starts.
  expect(response.status).toHaveBeenCalledWith(200);
  expect(response.json).toHaveBeenCalledWith({
    success: true,
    type: 'environment_input_inserted',
    message: 'User input inserted as environment input during agent execution'
  });
  expect(mockSession.runQuery).not.toHaveBeenCalled();
});

it('should process context and pass environmentInput when references exist', async () => {
const userQuery = 'Test query with @file reference';
const expandedContext = 'File content: function test() { return true; }';
Expand Down Expand Up @@ -164,6 +196,31 @@ describe('Queries Controller', () => {
});

describe('executeStreamingQuery', () => {
it('should insert environment input when agent is running', async () => {
  // Simulate a streaming request arriving while the agent is busy.
  const interruptText = 'User interrupt streaming message';
  const request = createRequest(interruptText);
  const response = createResponse();

  mockSession.getProcessingStatus.mockReturnValue(true);
  mockSession.insertEnvironmentInput.mockResolvedValue(undefined);

  await executeStreamingQuery(request as Request, response as Response);

  // The busy check must run and route the text into environment input.
  expect(mockSession.getProcessingStatus).toHaveBeenCalled();
  expect(mockSession.insertEnvironmentInput).toHaveBeenCalledWith(
    interruptText,
    'User interrupt message during agent execution'
  );

  // Caller receives a plain JSON 200 — no stream is opened at all.
  expect(response.status).toHaveBeenCalledWith(200);
  expect(response.json).toHaveBeenCalledWith({
    success: true,
    type: 'environment_input_inserted',
    message: 'User input inserted as environment input during agent execution'
  });
  expect(mockSession.runQueryStreaming).not.toHaveBeenCalled();
  expect(response.setHeader).not.toHaveBeenCalled();
});

it('should process context and stream with environmentInput', async () => {
const userQuery = 'Streaming query with @dir reference';
const expandedContext = 'Directory listing: file1.js, file2.ts';
Expand Down
12 changes: 12 additions & 0 deletions multimodal/tarko/agent-ui/src/common/services/apiService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,18 @@ class ApiService {
throw new Error(`Failed to send query: ${response.statusText}`);
}

// Check if response is JSON (environment input insertion) or streaming
const contentType = response.headers.get('content-type');
if (contentType?.includes('application/json')) {
// This is an environment input insertion response
const result = await response.json();
if (result.type === 'environment_input_inserted') {
console.log('User input inserted as environment input during agent execution');
return; // No streaming for environment input insertion
}
}

// Handle streaming response
const reader = response.body?.getReader();
if (!reader) {
throw new Error('ReadableStream not supported');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ export const ChatPanel: React.FC = () => {
isDisabled={
!currentSessionId ||
currentSessionId === 'creating' ||
isProcessing ||
!connectionStatus.connected ||
isReplayMode
}
Expand Down
18 changes: 18 additions & 0 deletions multimodal/tarko/agent-ui/src/standalone/chat/Message/Message.css
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,24 @@
@apply px-2 text-gray-700 dark:text-gray-300 max-w-full mx-auto;
}

/* Compact banner for environment messages (e.g. user input inserted while
   the agent is running): amber pill centered within the chat column. */
.environment-message-minimal {
@apply bg-amber-50 dark:bg-amber-900/20 border border-amber-200 dark:border-amber-800/50 text-amber-800 dark:text-amber-200 px-4 py-3 rounded-lg mx-auto max-w-fit text-sm;
/* NOTE(review): nothing in the visible rules positions against this box —
   confirm whether `position: relative` is still needed. */
position: relative;
}

/* Speech-bubble icon prefixed to the banner text. */
.environment-message-minimal::before {
content: '💬';
@apply mr-2 opacity-70;
}

/* Keep markdown-rendered (prose) content on the amber palette and flush. */
.environment-message-minimal .prose {
@apply text-amber-800 dark:text-amber-200 m-0;
}

.environment-message-minimal .prose p {
@apply m-0 text-amber-800 dark:text-amber-200;
}

.message-gap {
margin-bottom: 1rem;
position: relative;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,17 @@ interface MessageGroupProps {
export const MessageGroup: React.FC<MessageGroupProps> = ({ messages, isThinking }) => {
const isProcessing = useAtomValue(isProcessingAtom);

// Filter out environment messages
const filteredMessages = messages.filter((msg) => msg.role !== 'environment');
// Keep all messages including environment messages
const filteredMessages = messages;

// If no messages after filtering, render nothing
if (filteredMessages.length === 0) {
return null;
}

// Get user messages and assistant messages
// Get user messages, environment messages, and assistant messages
const userMessages = filteredMessages.filter((msg) => msg.role === 'user');
const environmentMessages = filteredMessages.filter((msg) => msg.role === 'environment');
const assistantMessages = filteredMessages.filter(
(msg) => msg.role === 'assistant' || msg.role === 'system',
);
Expand Down Expand Up @@ -76,6 +77,11 @@ export const MessageGroup: React.FC<MessageGroupProps> = ({ messages, isThinking
return <Message key={userMsg.id} message={userMsg} />;
})}

{/* Render environment messages (inserted messages) */}
{environmentMessages.map((envMsg) => (
<Message key={envMsg.id} message={envMsg} />
))}

{/* Render all assistant messages - each message renders independently, supporting streaming display */}
{assistantMessages.map((message, index) => (
<Message
Expand Down
Loading