Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 118 additions & 4 deletions agentscope-core/src/main/java/io/agentscope/core/ReActAgent.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import io.agentscope.core.tool.ToolExecutionContext;
import io.agentscope.core.tool.ToolResultMessageBuilder;
import io.agentscope.core.tool.Toolkit;
import io.agentscope.core.util.ExceptionUtils;
import io.agentscope.core.util.MessageUtils;
import java.util.ArrayList;
import java.util.Comparator;
Expand Down Expand Up @@ -254,9 +255,92 @@ protected Mono<Msg> doCall(List<Msg> msgs) {
return executeIteration(0);
}

// Has pending tools -> validate and add tool results
validateAndAddToolResults(msgs, pendingIds);
return hasPendingToolUse() ? acting(0) : executeIteration(0);
// Has pending tools but no input -> resume (execute pending tools directly)
if (msgs == null || msgs.isEmpty()) {
return hasPendingToolUse() ? acting(0) : executeIteration(0);
}

// Has pending tools + input -> check if user provided tool results
List<ToolResultBlock> providedResults =
msgs.stream()
.flatMap(m -> m.getContentBlocks(ToolResultBlock.class).stream())
.toList();

if (!providedResults.isEmpty()) {
// User provided tool results -> validate and add
validateAndAddToolResults(msgs, pendingIds);
return hasPendingToolUse() ? acting(0) : executeIteration(0);
}

// User sent a new message without tool results -> auto-recover from orphaned pending state
log.warn(
"Pending tool calls detected without results, auto-generating error results."
+ " Pending IDs: {}",
pendingIds);
return generateAndAddErrorToolResults(pendingIds)
.then(
Mono.defer(
() -> {
addToMemory(msgs);
return executeIteration(0);
}));
}

/**
 * Creates the {@link ToolResultBlock} used to report a failed tool call.
 *
 * <p>The result carries the given tool call id and a single {@link TextBlock} whose text is the
 * error message prefixed with {@code "[ERROR] "}.
 *
 * @param toolId the id of the tool call this result answers
 * @param errorMessage the human-readable description of the failure
 * @return a {@link ToolResultBlock} whose only output block is the prefixed error text
 */
private static ToolResultBlock buildErrorToolResult(String toolId, String errorMessage) {
    // Build the single text payload first so the result assembly below reads as one step.
    TextBlock errorText = TextBlock.builder().text("[ERROR] " + errorMessage).build();
    return ToolResultBlock.builder().id(toolId).output(List.of(errorText)).build();
}

/**
 * Produces an error tool result for every pending tool call found on the last assistant
 * message and hands each (tool call, error result) pair to {@code notifyPostActingHook}, one
 * at a time and in the order the tool calls appear on that message.
 *
 * <p>Nothing is emitted when there is no last assistant message, or when none of its
 * {@link ToolUseBlock}s has an id contained in {@code pendingIds}.
 *
 * <p>NOTE(review): this method itself only invokes {@code notifyPostActingHook}; storing the
 * result in memory is presumably done inside that hook pipeline - confirm against its
 * implementation.
 *
 * @param pendingIds The set of pending tool use IDs
 * @return Mono that completes once every generated error result has been passed through
 *     {@code notifyPostActingHook}; completes immediately when there is nothing to recover
 */
private Mono<Void> generateAndAddErrorToolResults(Set<String> pendingIds) {
    Msg assistantMsg = findLastAssistantMsg();
    if (assistantMsg == null) {
        return Mono.empty();
    }

    // Keep only the tool calls that are still waiting for a result.
    List<ToolUseBlock> orphanedCalls = new ArrayList<>();
    for (ToolUseBlock candidate : assistantMsg.getContentBlocks(ToolUseBlock.class)) {
        if (pendingIds.contains(candidate.getId())) {
            orphanedCalls.add(candidate);
        }
    }

    if (orphanedCalls.isEmpty()) {
        return Mono.empty();
    }

    // concatMap keeps the hook notifications sequential, one tool call after another.
    return Flux.fromIterable(orphanedCalls)
            .concatMap(
                    pendingCall -> {
                        String failureText =
                                "Previous tool execution failed or was interrupted."
                                        + " Tool: "
                                        + pendingCall.getName();
                        ToolResultBlock failureResult =
                                buildErrorToolResult(pendingCall.getId(), failureText);
                        log.info(
                                "Auto-generated error result for pending tool call: {} ({})",
                                pendingCall.getName(),
                                pendingCall.getId());
                        return notifyPostActingHook(Map.entry(pendingCall, failureResult));
                    })
            .then();
}

/**
Expand Down Expand Up @@ -595,6 +679,10 @@ private Msg buildSuspendedMsg(List<Map.Entry<ToolUseBlock, ToolResultBlock>> pen
/**
* Execute tool calls and return paired results.
*
* <p>If tool execution fails with an {@link Exception} (timeout, tool error, etc.), this method
* returns an error tool result for every tool call in {@code toolCalls} instead of propagating
* the failure, so the agent can continue processing and the model receives proper error
* feedback. {@link Error}s (e.g. {@link OutOfMemoryError}) are not caught and still propagate.
*
* @param toolCalls The list of tool calls (potentially modified by PreActingEvent hooks)
* @return Mono containing list of (ToolUseBlock, ToolResultBlock) pairs
*/
Expand All @@ -605,7 +693,33 @@ private Mono<List<Map.Entry<ToolUseBlock, ToolResultBlock>>> executeToolCalls(
results ->
IntStream.range(0, toolCalls.size())
.mapToObj(i -> Map.entry(toolCalls.get(i), results.get(i)))
.toList());
.toList())
.onErrorResume(
Exception.class,
error -> {
// Generate error tool results for all pending tool calls.
// Only catch Exception subclasses; critical JVM errors
// (e.g. OutOfMemoryError) are left to propagate.
String errorMsg = ExceptionUtils.getErrorMessage(error);
log.error(
"Tool execution failed, generating error results for {} tool"
+ " calls",
toolCalls.size(),
error);
List<Map.Entry<ToolUseBlock, ToolResultBlock>> errorResults =
toolCalls.stream()
.map(
toolCall -> {
ToolResultBlock errorResult =
buildErrorToolResult(
toolCall.getId(),
"Tool execution failed: "
+ errorMsg);
return Map.entry(toolCall, errorResult);
})
.toList();
return Mono.just(errorResults);
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

/**
* Comprehensive tests for the Hook Stop Agent feature.
Expand Down Expand Up @@ -346,10 +345,15 @@ void testResumeWithToolResultMsg() {
}

@Test
@DisplayName("New message with pending tool calls throws error")
@DisplayName("New message with pending tool calls auto-recovers")
void testNewMsgWithPendingToolUseContinuesActing() {
Msg toolUseMsg = createToolUseMsg("tool1", "test_tool", Map.of());
setupModelToReturnToolUse(toolUseMsg);
Msg textResponse =
createAssistantTextMsg("Recovered after auto-generated error results");

when(mockModel.stream(anyList(), anyList(), any()))
.thenReturn(createFluxFromMsg(toolUseMsg))
.thenReturn(createFluxFromMsg(textResponse));

Hook stopHook = createPostReasoningStopHook();

Expand All @@ -369,15 +373,51 @@ void testNewMsgWithPendingToolUseContinuesActing() {
result1.hasContentBlocks(ToolUseBlock.class),
"First call should return ToolUse message");

// Send a new regular message - should throw error due to pending tool calls
// Send a new regular message - should auto-recover by generating error results
Msg newMsg = createUserMsg("new message");
Msg result2 = agent.call(newMsg).block(TEST_TIMEOUT);

assertNotNull(result2, "Agent should auto-recover and return a result");

// Verify the model was invoked a second time (the follow-up reasoning call)
verify(mockModel, times(2)).stream(anyList(), anyList(), any());

StepVerifier.create(agent.call(newMsg))
.expectErrorMatches(
e ->
e instanceof IllegalStateException
&& e.getMessage().contains("pending tool calls"))
.verify();
// Verify the follow-up response content is the expected text
assertTrue(
result2.hasContentBlocks(TextBlock.class),
"Recovery result should contain text content");
String resultText =
result2.getContentBlocks(TextBlock.class).stream()
.map(TextBlock::getText)
.findFirst()
.orElse("");
assertEquals(
"Recovered after auto-generated error results",
resultText,
"Recovery result should match the model's follow-up response");

// Verify that an error ToolResultBlock was written into memory for the
// pending tool call id, proving the pending state was actually cleared
List<Msg> memoryMsgs = memory.getMessages();
boolean hasErrorToolResult =
memoryMsgs.stream()
.flatMap(m -> m.getContentBlocks(ToolResultBlock.class).stream())
.anyMatch(
tr ->
"tool1".equals(tr.getId())
&& tr.getOutput().stream()
.anyMatch(
cb ->
cb instanceof TextBlock
&& ((TextBlock)
cb)
.getText()
.contains(
"[ERROR]")));
assertTrue(
hasErrorToolResult,
"Memory should contain an error ToolResultBlock for the pending tool call"
+ " id='tool1'");
}
}

Expand Down Expand Up @@ -643,10 +683,14 @@ void testNormalCallAfterCompletion() {
}

@Test
@DisplayName("Agent throws error when adding regular message with pending tool calls")
@DisplayName("Agent auto-recovers when adding regular message with pending tool calls")
void testAgentHandlesPendingToolCallsGracefully() {
Msg toolUseMsg = createToolUseMsg("tool1", "test_tool", Map.of());
setupModelToReturnToolUse(toolUseMsg);
Msg textResponse = createAssistantTextMsg("Recovered");

when(mockModel.stream(anyList(), anyList(), any()))
.thenReturn(createFluxFromMsg(toolUseMsg))
.thenReturn(createFluxFromMsg(textResponse));

Hook stopHook = createPostReasoningStopHook();

Expand All @@ -662,14 +706,10 @@ void testAgentHandlesPendingToolCallsGracefully() {

agent.call(createUserMsg("test")).block(TEST_TIMEOUT);

// With new design, agent will throw error when adding regular message
// with pending tool calls
StepVerifier.create(agent.call(createUserMsg("new")))
.expectErrorMatches(
e ->
e instanceof IllegalStateException
&& e.getMessage().contains("pending tool calls"))
.verify();
// With new design, agent will auto-recover by generating error results
// for pending tool calls and continue processing
Msg result = agent.call(createUserMsg("new")).block(TEST_TIMEOUT);
assertNotNull(result, "Agent should auto-recover and return a result");
}
}

Expand Down
Loading