Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion agents-api/src/domains/run/agents/Agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,19 @@ export class Agent {
this.ctx.approvedToolCalls = approvedToolCalls;
}

getPendingDurableApproval(): { toolCallId: string; toolName: string; args: unknown } | undefined {
getPendingDurableApproval():
| {
toolCallId: string;
toolName: string;
args: unknown;
delegatedApproval?: {
toolCallId: string;
toolName: string;
args: unknown;
subAgentId: string;
};
}
| undefined {
return this.ctx.pendingDurableApproval;
}

Expand Down
18 changes: 17 additions & 1 deletion agents-api/src/domains/run/agents/agent-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -270,5 +270,21 @@ export interface AgentRunContext {
string,
Array<{ approved: boolean; reason?: string; originalToolCallId?: string }>
>;
pendingDurableApproval?: { toolCallId: string; toolName: string; args: unknown };
pendingDurableApproval?: {
toolCallId: string;
toolName: string;
args: unknown;
delegatedApproval?: {
toolCallId: string;
toolName: string;
args: unknown;
subAgentId: string;
};
};
delegatedToolApproval?: {
toolCallId: string;
toolName: string;
approved: boolean;
reason?: string;
};
}
69 changes: 49 additions & 20 deletions agents-api/src/domains/run/agents/generateTaskHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,41 @@ export interface TaskHandlerConfig {
userId?: string;
}

// Returns a TaskState.Completed result with a `durable-approval-required` data artifact.
// We use Completed (not InputRequired) because the parent agent's tool-wrapper parses
// the artifact from the A2A response — using a different state would require changes to
// the A2A result handling pipeline. The artifact's `type` field distinguishes it.
function buildDurableApprovalResult(pendingApproval: {
toolCallId: string;
toolName: string;
args: unknown;
}): A2ATaskResult {
logger.info(
{ toolCallId: pendingApproval.toolCallId, toolName: pendingApproval.toolName },
'Returning durable-approval-required artifact'
);
return {
status: { state: TaskState.Completed },
artifacts: [
{
artifactId: generateId(),
parts: [
{
kind: 'data' as const,
data: {
type: 'durable-approval-required',
toolCallId: pendingApproval.toolCallId,
toolName: pendingApproval.toolName,
args: pendingApproval.args,
},
},
],
createdAt: new Date().toISOString(),
},
],
};
}

export const createTaskHandler = (
config: TaskHandlerConfig,
credentialStoreRegistry?: CredentialStoreRegistry
Expand Down Expand Up @@ -429,26 +464,7 @@ export const createTaskHandler = (

const pendingApproval = agent.getPendingDurableApproval();
if (pendingApproval) {
return {
status: { state: TaskState.Completed },
artifacts: [
{
artifactId: generateId(),
parts: [
{
kind: 'data' as const,
data: {
type: 'durable-approval-required',
toolCallId: pendingApproval.toolCallId,
toolName: pendingApproval.toolName,
args: pendingApproval.args,
},
},
],
createdAt: new Date().toISOString(),
},
],
};
return buildDurableApprovalResult(pendingApproval);
}

const stepContents =
Expand Down Expand Up @@ -599,6 +615,19 @@ export const createTaskHandler = (
],
};
} catch (error) {
const pendingApproval = agent?.getPendingDurableApproval();
if (pendingApproval) {
logger.info(
{
toolCallId: pendingApproval.toolCallId,
toolName: pendingApproval.toolName,
error: error instanceof Error ? error.message : String(error),
},
'Task handler caught error during durable approval flow, returning approval artifact'
);
return buildDurableApprovalResult(pendingApproval);
}

console.error('Task handler error:', error);

const errorMessage = error instanceof Error ? error.message : 'Unknown error occurred';
Expand Down
36 changes: 30 additions & 6 deletions agents-api/src/domains/run/agents/relationTools.ts
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ export function createDelegateToAgentTool({
metadata,
sessionId,
credentialStoreRegistry,
agentRunContext,
}: {
delegateConfig: DelegateRelation;
callingAgentId: string;
Expand All @@ -296,6 +297,7 @@ export function createDelegateToAgentTool({
metadata: DelegationMetadata;
sessionId?: string;
credentialStoreRegistry?: CredentialStoreRegistry;
agentRunContext?: import('./agent-types').AgentRunContext;
}) {
const { tenantId, projectId, agentId, project } = executionContext;

Expand Down Expand Up @@ -424,18 +426,40 @@ export function createDelegateToAgentTool({
...(isInternal || isTeam ? { fetchFn: getInProcessFetch() } : {}),
});

const baseDelegationMeta = getDelegationMetadata({
isInternal,
callingAgentId,
delegationId,
metadata,
});

const delegationMeta = {
...baseDelegationMeta,
...(agentRunContext?.durableWorkflowRunId
? { durable_workflow_run_id: agentRunContext.durableWorkflowRunId }
: {}),
...(agentRunContext?.delegatedToolApproval
? {
approved_tool_calls: JSON.stringify({
[agentRunContext.delegatedToolApproval.toolName]: [
{
approved: agentRunContext.delegatedToolApproval.approved,
reason: agentRunContext.delegatedToolApproval.reason,
originalToolCallId: agentRunContext.delegatedToolApproval.toolCallId,
},
],
}),
}
: {}),
};

const messageToSend = {
role: 'agent' as const,
parts: [{ text: input.message, kind: 'text' as const }],
messageId: generateId(),
kind: 'message' as const,
contextId,
metadata: getDelegationMetadata({
isInternal,
callingAgentId,
delegationId,
metadata,
}),
metadata: delegationMeta,
};
logger.info({ messageToSend }, 'messageToSend');

Expand Down
1 change: 1 addition & 0 deletions agents-api/src/domains/run/agents/tools/relation-tools.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ export function getRelationTools(
},
sessionId,
credentialStoreRegistry: ctx.credentialStoreRegistry,
agentRunContext: ctx,
}),
runtimeContext?.metadata?.streamRequestId,
'delegation'
Expand Down
12 changes: 0 additions & 12 deletions agents-api/src/domains/run/agents/tools/tool-approval.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,18 +99,6 @@ export async function waitForToolApproval(
toolName,
input: args as Record<string, unknown>,
});
} else if (ctx.isDelegatedAgent) {
const currentStreamRequestId = ctx.streamRequestId ?? '';
if (currentStreamRequestId) {
await toolApprovalUiBus.publish(currentStreamRequestId, {
type: 'approval-needed',
toolCallId,
toolName,
input: args,
providerMetadata,
approvalId: `aitxt-${toolCallId}`,
});
}
}

ctx.pendingDurableApproval = { toolCallId, toolName, args };
Expand Down
78 changes: 77 additions & 1 deletion agents-api/src/domains/run/agents/tools/tool-wrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ export function wrapToolWithStreaming(
const isInternalToolForUi =
isInternalTool || toolName.startsWith('delegate_to_') || toolName === 'load_skill';

// In durable workflows, delegate_to_ tool results must be stored in
// conversation history so the next callLlmStep sees the delegation outcome
// and doesn't re-delegate in a loop.
const isDurableDelegation = !!ctx.durableWorkflowRunId && toolName.startsWith('delegate_to_');
const skipHistoryStorage = isInternalToolForUi && !isDurableDelegation;

const needsApproval = options?.needsApproval || false;

const preApprovedEntry = ctx.durableWorkflowRunId
Expand Down Expand Up @@ -183,13 +189,83 @@ export function wrapToolWithStreaming(
const result = await originalExecute(resolvedArgs, context);
const duration = Date.now() - startTime;

if (ctx.durableWorkflowRunId && result && typeof result === 'object') {
const resultObj = result as Record<string, unknown>;
const taskResult = resultObj?.result as Record<string, unknown> | undefined;

const findApprovalRequired = (
parts: Array<Record<string, unknown>> | undefined
): Record<string, unknown> | undefined => {
if (!Array.isArray(parts)) return undefined;
for (const part of parts) {
if (part?.kind === 'data') {
const data = part.data as Record<string, unknown> | undefined;
if (data?.type === 'durable-approval-required') return data;
}
}
return undefined;
};

const findApprovalInArtifacts = (
artifacts: Array<Record<string, unknown>> | undefined
): Record<string, unknown> | undefined => {
if (!Array.isArray(artifacts)) return undefined;
for (const artifact of artifacts) {
const found = findApprovalRequired(
artifact?.parts as Array<Record<string, unknown>> | undefined
);
if (found) return found;
}
return undefined;
};

const approvalData =
findApprovalRequired(taskResult?.parts as Array<Record<string, unknown>> | undefined) ??
findApprovalInArtifacts(
taskResult?.artifacts as Array<Record<string, unknown>> | undefined
);

if (approvalData) {
const delegatedToolCallId = approvalData.toolCallId;
const delegatedToolName = approvalData.toolName;

if (typeof delegatedToolCallId !== 'string' || !delegatedToolCallId) {
logger.error(
{ approvalData, parentToolName: toolName },
'Malformed durable-approval-required artifact: invalid toolCallId'
);
return result;
}
if (typeof delegatedToolName !== 'string' || !delegatedToolName) {
logger.error(
{ approvalData, parentToolName: toolName },
'Malformed durable-approval-required artifact: invalid toolName'
);
return result;
}

ctx.pendingDurableApproval = {
toolCallId: effectiveToolCallId,
toolName,
args: resolvedArgs,
delegatedApproval: {
toolCallId: delegatedToolCallId,
toolName: delegatedToolName,
args: approvalData.args,
subAgentId: toolName.replace(/^delegate_to_/, ''),
},
};
return result;
}
}

if (ctx.pendingDurableApproval) {
return result;
}

const toolResultConversationId = ctx.conversationId;

if (streamRequestId && !isInternalToolForUi && toolResultConversationId) {
if (streamRequestId && !skipHistoryStorage && toolResultConversationId) {
try {
const messageId = generateId();
const messageContent = await buildToolResultForConversationHistory(
Expand Down
28 changes: 27 additions & 1 deletion agents-api/src/domains/run/workflow/functions/agentExecution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ async function _agentExecutionWorkflow(payload: AgentExecutionPayload) {
let currentSubAgentId = defaultSubAgentId;
let iterations = 0;
let approvalRound = 0;
let isPostApproval = false;

try {
while (iterations < maxTransfers) {
Expand All @@ -61,10 +62,12 @@ async function _agentExecutionWorkflow(payload: AgentExecutionPayload) {
workflowRunId,
streamNamespace,
taskId,
isPostApproval,
});

if (llmResult.type === 'transfer') {
currentSubAgentId = llmResult.targetSubAgentId;
isPostApproval = false;
continue;
}

Expand All @@ -78,7 +81,20 @@ async function _agentExecutionWorkflow(payload: AgentExecutionPayload) {
continuationStreamNamespace: continuationNs,
});

const token = `tool-approval:${payload.conversationId}:${workflowRunId}:${toolCall.toolCallId}`;
const hookToolCallId = llmResult.delegatedApproval?.toolCallId ?? toolCall.toolCallId;
const token = `tool-approval:${payload.conversationId}:${workflowRunId}:${hookToolCallId}`;

console.info('[agentExecution] Creating tool approval hook', {
hookToolCallId,
parentToolCallId: toolCall.toolCallId,
isDelegated: !!llmResult.delegatedApproval,
workflowRunId,
});

// The hook suspends the workflow until an external system resumes it.
// Unlike the in-process PendingToolApprovalManager (10-min timeout), durable
// hooks persist across restarts. Stale suspended workflows should be cleaned
// up by an external job that queries workflow_executions with status='suspended'.
const hook = toolApprovalHook.create({ token });
const approvalResult = await hook;
approvalRound++;
Expand All @@ -100,8 +116,18 @@ async function _agentExecutionWorkflow(payload: AgentExecutionPayload) {
taskId,
preApproved: approvalResult.approved,
approvalReason: approvalResult.reason,
...(llmResult.delegatedApproval
? {
delegatedApproval: llmResult.delegatedApproval,
delegatedApprovalDecision: {
approved: approvalResult.approved,
reason: approvalResult.reason,
},
}
: {}),
});
}
isPostApproval = true;
continue;
}

Expand Down
Loading