Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion agents-api/src/domains/run/agents/Agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,19 @@ export class Agent {
this.ctx.approvedToolCalls = approvedToolCalls;
}

getPendingDurableApproval(): { toolCallId: string; toolName: string; args: unknown } | undefined {
getPendingDurableApproval():
| {
toolCallId: string;
toolName: string;
args: unknown;
delegatedApproval?: {
toolCallId: string;
toolName: string;
args: unknown;
subAgentId: string;
};
}
| undefined {
return this.ctx.pendingDurableApproval;
}

Expand Down
18 changes: 17 additions & 1 deletion agents-api/src/domains/run/agents/agent-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -270,5 +270,21 @@ export interface AgentRunContext {
string,
Array<{ approved: boolean; reason?: string; originalToolCallId?: string }>
>;
pendingDurableApproval?: { toolCallId: string; toolName: string; args: unknown };
pendingDurableApproval?: {
toolCallId: string;
toolName: string;
args: unknown;
delegatedApproval?: {
toolCallId: string;
toolName: string;
args: unknown;
subAgentId: string;
};
};
delegatedToolApproval?: {
toolCallId: string;
toolName: string;
approved: boolean;
reason?: string;
};
}
69 changes: 49 additions & 20 deletions agents-api/src/domains/run/agents/generateTaskHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,41 @@ export interface TaskHandlerConfig {
userId?: string;
}

// Returns a TaskState.Completed result with a `durable-approval-required` data artifact.
// We use Completed (not InputRequired) because the parent agent's tool-wrapper parses
// the artifact from the A2A response — using a different state would require changes to
// the A2A result handling pipeline. The artifact's `type` field distinguishes it.
function buildDurableApprovalResult(pendingApproval: {
toolCallId: string;
toolName: string;
args: unknown;
}): A2ATaskResult {
logger.info(
{ toolCallId: pendingApproval.toolCallId, toolName: pendingApproval.toolName },
'Returning durable-approval-required artifact'
);
return {
status: { state: TaskState.Completed },
artifacts: [
{
artifactId: generateId(),
parts: [
{
kind: 'data' as const,
data: {
type: 'durable-approval-required',
toolCallId: pendingApproval.toolCallId,
toolName: pendingApproval.toolName,
args: pendingApproval.args,
},
},
],
createdAt: new Date().toISOString(),
},
],
};
}

export const createTaskHandler = (
config: TaskHandlerConfig,
credentialStoreRegistry?: CredentialStoreRegistry
Expand Down Expand Up @@ -429,26 +464,7 @@ export const createTaskHandler = (

const pendingApproval = agent.getPendingDurableApproval();
if (pendingApproval) {
return {
status: { state: TaskState.Completed },
artifacts: [
{
artifactId: generateId(),
parts: [
{
kind: 'data' as const,
data: {
type: 'durable-approval-required',
toolCallId: pendingApproval.toolCallId,
toolName: pendingApproval.toolName,
args: pendingApproval.args,
},
},
],
createdAt: new Date().toISOString(),
},
],
};
return buildDurableApprovalResult(pendingApproval);
}

const stepContents =
Expand Down Expand Up @@ -599,6 +615,19 @@ export const createTaskHandler = (
],
};
} catch (error) {
const pendingApproval = agent?.getPendingDurableApproval();
if (pendingApproval) {
logger.warn(
{
toolCallId: pendingApproval.toolCallId,
toolName: pendingApproval.toolName,
error,
},
'Task handler caught error during durable approval flow, returning approval artifact'
);
return buildDurableApprovalResult(pendingApproval);
}

console.error('Task handler error:', error);

const errorMessage = error instanceof Error ? error.message : 'Unknown error occurred';
Expand Down
36 changes: 30 additions & 6 deletions agents-api/src/domains/run/agents/relationTools.ts
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ export function createDelegateToAgentTool({
metadata,
sessionId,
credentialStoreRegistry,
agentRunContext,
}: {
delegateConfig: DelegateRelation;
callingAgentId: string;
Expand All @@ -296,6 +297,7 @@ export function createDelegateToAgentTool({
metadata: DelegationMetadata;
sessionId?: string;
credentialStoreRegistry?: CredentialStoreRegistry;
agentRunContext?: import('./agent-types').AgentRunContext;
}) {
const { tenantId, projectId, agentId, project } = executionContext;

Expand Down Expand Up @@ -424,18 +426,40 @@ export function createDelegateToAgentTool({
...(isInternal || isTeam ? { fetchFn: getInProcessFetch() } : {}),
});

const baseDelegationMeta = getDelegationMetadata({
isInternal,
callingAgentId,
delegationId,
metadata,
});

const delegationMeta = {
...baseDelegationMeta,
...(agentRunContext?.durableWorkflowRunId
? { durable_workflow_run_id: agentRunContext.durableWorkflowRunId }
: {}),
...(agentRunContext?.delegatedToolApproval
? {
approved_tool_calls: JSON.stringify({
[agentRunContext.delegatedToolApproval.toolName]: [
{
approved: agentRunContext.delegatedToolApproval.approved,
reason: agentRunContext.delegatedToolApproval.reason,
originalToolCallId: agentRunContext.delegatedToolApproval.toolCallId,
},
],
}),
}
: {}),
};

const messageToSend = {
role: 'agent' as const,
parts: [{ text: input.message, kind: 'text' as const }],
messageId: generateId(),
kind: 'message' as const,
contextId,
metadata: getDelegationMetadata({
isInternal,
callingAgentId,
delegationId,
metadata,
}),
metadata: delegationMeta,
};
logger.info({ messageToSend }, 'messageToSend');

Expand Down
1 change: 1 addition & 0 deletions agents-api/src/domains/run/agents/tools/relation-tools.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ export function getRelationTools(
},
sessionId,
credentialStoreRegistry: ctx.credentialStoreRegistry,
agentRunContext: ctx,
}),
runtimeContext?.metadata?.streamRequestId,
'delegation'
Expand Down
12 changes: 0 additions & 12 deletions agents-api/src/domains/run/agents/tools/tool-approval.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,18 +99,6 @@ export async function waitForToolApproval(
toolName,
input: args as Record<string, unknown>,
});
} else if (ctx.isDelegatedAgent) {
const currentStreamRequestId = ctx.streamRequestId ?? '';
if (currentStreamRequestId) {
await toolApprovalUiBus.publish(currentStreamRequestId, {
type: 'approval-needed',
toolCallId,
toolName,
input: args,
providerMetadata,
approvalId: `aitxt-${toolCallId}`,
});
}
}

ctx.pendingDurableApproval = { toolCallId, toolName, args };
Expand Down
78 changes: 77 additions & 1 deletion agents-api/src/domains/run/agents/tools/tool-wrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ export function wrapToolWithStreaming(
const isInternalToolForUi =
isInternalTool || toolName.startsWith('delegate_to_') || toolName === 'load_skill';

// In durable workflows, delegate_to_ tool results must be stored in
// conversation history so the next callLlmStep sees the delegation outcome
// and doesn't re-delegate in a loop.
const isDurableDelegation = !!ctx.durableWorkflowRunId && toolName.startsWith('delegate_to_');
const skipHistoryStorage = isInternalToolForUi && !isDurableDelegation;

const needsApproval = options?.needsApproval || false;

const preApprovedEntry = ctx.durableWorkflowRunId
Expand Down Expand Up @@ -183,13 +189,83 @@ export function wrapToolWithStreaming(
const result = await originalExecute(resolvedArgs, context);
const duration = Date.now() - startTime;

if (ctx.durableWorkflowRunId && result && typeof result === 'object') {
const resultObj = result as Record<string, unknown>;
const taskResult = resultObj?.result as Record<string, unknown> | undefined;

const findApprovalRequired = (
parts: Array<Record<string, unknown>> | undefined
): Record<string, unknown> | undefined => {
if (!Array.isArray(parts)) return undefined;
for (const part of parts) {
if (part?.kind === 'data') {
const data = part.data as Record<string, unknown> | undefined;
if (data?.type === 'durable-approval-required') return data;
}
}
return undefined;
};

const findApprovalInArtifacts = (
artifacts: Array<Record<string, unknown>> | undefined
): Record<string, unknown> | undefined => {
if (!Array.isArray(artifacts)) return undefined;
for (const artifact of artifacts) {
const found = findApprovalRequired(
artifact?.parts as Array<Record<string, unknown>> | undefined
);
if (found) return found;
}
return undefined;
};

const approvalData =
findApprovalRequired(taskResult?.parts as Array<Record<string, unknown>> | undefined) ??
findApprovalInArtifacts(
taskResult?.artifacts as Array<Record<string, unknown>> | undefined
);

if (approvalData) {
const delegatedToolCallId = approvalData.toolCallId;
const delegatedToolName = approvalData.toolName;

if (typeof delegatedToolCallId !== 'string' || !delegatedToolCallId) {
logger.error(
{ approvalData, parentToolName: toolName },
'Malformed durable-approval-required artifact: invalid toolCallId'
);
return result;
}
if (typeof delegatedToolName !== 'string' || !delegatedToolName) {
logger.error(
{ approvalData, parentToolName: toolName },
'Malformed durable-approval-required artifact: invalid toolName'
);
return result;
}

ctx.pendingDurableApproval = {
toolCallId: effectiveToolCallId,
toolName,
args: resolvedArgs,
delegatedApproval: {
toolCallId: delegatedToolCallId,
toolName: delegatedToolName,
args: approvalData.args,
subAgentId: toolName.replace(/^delegate_to_/, ''),
},
};
return result;
}
}

if (ctx.pendingDurableApproval) {
return result;
}

const toolResultConversationId = ctx.conversationId;

if (streamRequestId && !isInternalToolForUi && toolResultConversationId) {
if (streamRequestId && !skipHistoryStorage && toolResultConversationId) {
try {
const messageId = generateId();
const messageContent = await buildToolResultForConversationHistory(
Expand Down
31 changes: 30 additions & 1 deletion agents-api/src/domains/run/workflow/functions/agentExecution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ async function _agentExecutionWorkflow(payload: AgentExecutionPayload) {
let currentSubAgentId = defaultSubAgentId;
let iterations = 0;
let approvalRound = 0;
let isPostApproval = false;

try {
while (iterations < maxTransfers) {
Expand All @@ -61,10 +62,12 @@ async function _agentExecutionWorkflow(payload: AgentExecutionPayload) {
workflowRunId,
streamNamespace,
taskId,
isPostApproval,
});

if (llmResult.type === 'transfer') {
currentSubAgentId = llmResult.targetSubAgentId;
isPostApproval = false;
continue;
}

Expand All @@ -78,7 +81,23 @@ async function _agentExecutionWorkflow(payload: AgentExecutionPayload) {
continuationStreamNamespace: continuationNs,
});

const token = `tool-approval:${payload.conversationId}:${workflowRunId}:${toolCall.toolCallId}`;
const hookToolCallId = llmResult.delegatedApproval?.toolCallId ?? toolCall.toolCallId;
const token = `tool-approval:${payload.conversationId}:${workflowRunId}:${hookToolCallId}`;

console.info(
JSON.stringify({
msg: '[agentExecution] Creating tool approval hook',
hookToolCallId,
parentToolCallId: toolCall.toolCallId,
isDelegated: !!llmResult.delegatedApproval,
workflowRunId,
})
);
Comment on lines +87 to +95
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Minor: Use structured logger instead of console.info

Issue: Using console.info(JSON.stringify({...})) bypasses the structured logging infrastructure used consistently throughout the workflow code.

Why: The sibling file agentExecutionSteps.ts uses getLogger('agentExecutionSteps') for consistent log formatting, redaction support, and correlation IDs. Using console.info creates inconsistent log output that won't appear in log aggregation with proper metadata.

Fix: Add import and logger at module scope, then use structured logging:

import { getLogger } from '../../../../logger';
const logger = getLogger('agentExecution');

// Then replace console.info with:
logger.info(
  {
    hookToolCallId,
    parentToolCallId: toolCall.toolCallId,
    isDelegated: !!llmResult.delegatedApproval,
    workflowRunId,
  },
  '[agentExecution] Creating tool approval hook'
);

Refs:


// The hook suspends the workflow until an external system resumes it.
// Unlike the in-process PendingToolApprovalManager (10-min timeout), durable
// hooks persist across restarts. Stale suspended workflows should be cleaned
// up by an external job that queries workflow_executions with status='suspended'.
const hook = toolApprovalHook.create({ token });
const approvalResult = await hook;
approvalRound++;
Expand All @@ -100,8 +119,18 @@ async function _agentExecutionWorkflow(payload: AgentExecutionPayload) {
taskId,
preApproved: approvalResult.approved,
approvalReason: approvalResult.reason,
...(llmResult.delegatedApproval
? {
delegatedApproval: llmResult.delegatedApproval,
delegatedApprovalDecision: {
approved: approvalResult.approved,
reason: approvalResult.reason,
},
}
: {}),
});
}
isPostApproval = true;
continue;
}

Expand Down
Loading