Skip to content

Commit c572c06

Browse files
netroyOlegIvaniv
andauthored
fix(core): Fix support for multiple invocation of AI tools (#12141)
Co-authored-by: Oleg Ivaniv <[email protected]>
1 parent f4c2523 commit c572c06

10 files changed

+604
-225
lines changed

Diff for: packages/core/src/CreateNodeAsTool.ts

+33-40
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
import { DynamicStructuredTool } from '@langchain/core/tools';
22
import type {
33
IExecuteFunctions,
4+
INode,
45
INodeParameters,
56
INodeType,
67
ISupplyDataFunctions,
8+
ITaskDataConnections,
79
} from 'n8n-workflow';
810
import { jsonParse, NodeConnectionType, NodeOperationError } from 'n8n-workflow';
911
import { z } from 'zod';
@@ -16,22 +18,25 @@ interface FromAIArgument {
1618
defaultValue?: string | number | boolean | Record<string, unknown>;
1719
}
1820

21+
type ParserOptions = {
22+
node: INode;
23+
nodeType: INodeType;
24+
contextFactory: (runIndex: number, inputData: ITaskDataConnections) => ISupplyDataFunctions;
25+
};
26+
1927
/**
2028
* AIParametersParser
2129
*
2230
* This class encapsulates the logic for parsing node parameters, extracting $fromAI calls,
2331
* generating Zod schemas, and creating LangChain tools.
2432
*/
2533
class AIParametersParser {
26-
private ctx: ISupplyDataFunctions;
34+
private runIndex = 0;
2735

2836
/**
2937
* Constructs an instance of AIParametersParser.
30-
* @param ctx The execution context.
3138
*/
32-
constructor(ctx: ISupplyDataFunctions) {
33-
this.ctx = ctx;
34-
}
39+
constructor(private readonly options: ParserOptions) {}
3540

3641
/**
3742
* Generates a Zod schema based on the provided FromAIArgument placeholder.
@@ -162,14 +167,14 @@ class AIParametersParser {
162167
} catch (error) {
163168
// If parsing fails, throw an ApplicationError with details
164169
throw new NodeOperationError(
165-
this.ctx.getNode(),
170+
this.options.node,
166171
`Failed to parse $fromAI arguments: ${argsString}: ${error}`,
167172
);
168173
}
169174
} else {
170175
// Log an error if parentheses are unbalanced
171176
throw new NodeOperationError(
172-
this.ctx.getNode(),
177+
this.options.node,
173178
`Unbalanced parentheses while parsing $fromAI call: ${str.slice(startIndex)}`,
174179
);
175180
}
@@ -254,7 +259,7 @@ class AIParametersParser {
254259
const type = cleanArgs?.[2] || 'string';
255260

256261
if (!['string', 'number', 'boolean', 'json'].includes(type.toLowerCase())) {
257-
throw new NodeOperationError(this.ctx.getNode(), `Invalid type: ${type}`);
262+
throw new NodeOperationError(this.options.node, `Invalid type: ${type}`);
258263
}
259264

260265
return {
@@ -315,13 +320,12 @@ class AIParametersParser {
315320

316321
/**
317322
* Creates a DynamicStructuredTool from a node.
318-
* @param node The node type.
319-
* @param nodeParameters The parameters of the node.
320323
* @returns A DynamicStructuredTool instance.
321324
*/
322-
public createTool(node: INodeType, nodeParameters: INodeParameters): DynamicStructuredTool {
325+
public createTool(): DynamicStructuredTool {
326+
const { node, nodeType } = this.options;
323327
const collectedArguments: FromAIArgument[] = [];
324-
this.traverseNodeParameters(nodeParameters, collectedArguments);
328+
this.traverseNodeParameters(node.parameters, collectedArguments);
325329

326330
// Validate each collected argument
327331
const nameValidationRegex = /^[a-zA-Z0-9_-]{1,64}$/;
@@ -331,7 +335,7 @@ class AIParametersParser {
331335
const isEmptyError = 'You must specify a key when using $fromAI()';
332336
const isInvalidError = `Parameter key \`${argument.key}\` is invalid`;
333337
const error = new Error(argument.key.length === 0 ? isEmptyError : isInvalidError);
334-
throw new NodeOperationError(this.ctx.getNode(), error, {
338+
throw new NodeOperationError(node, error, {
335339
description:
336340
'Invalid parameter key, must be between 1 and 64 characters long and only contain letters, numbers, underscores, and hyphens',
337341
});
@@ -348,7 +352,7 @@ class AIParametersParser {
348352
) {
349353
// If not, throw an error for inconsistent duplicate keys
350354
throw new NodeOperationError(
351-
this.ctx.getNode(),
355+
node,
352356
`Duplicate key '${argument.key}' found with different description or type`,
353357
{
354358
description:
@@ -378,37 +382,38 @@ class AIParametersParser {
378382
}, {});
379383

380384
const schema = z.object(schemaObj).required();
381-
const description = this.getDescription(node, nodeParameters);
382-
const nodeName = this.ctx.getNode().name.replace(/ /g, '_');
383-
const name = nodeName || node.description.name;
385+
const description = this.getDescription(nodeType, node.parameters);
386+
const nodeName = node.name.replace(/ /g, '_');
387+
const name = nodeName || nodeType.description.name;
384388

385389
const tool = new DynamicStructuredTool({
386390
name,
387391
description,
388392
schema,
389-
func: async (functionArgs: z.infer<typeof schema>) => {
390-
const { index } = this.ctx.addInputData(NodeConnectionType.AiTool, [
391-
[{ json: functionArgs }],
392-
]);
393+
func: async (toolArgs: z.infer<typeof schema>) => {
394+
const context = this.options.contextFactory(this.runIndex, {});
395+
context.addInputData(NodeConnectionType.AiTool, [[{ json: toolArgs }]]);
393396

394397
try {
395398
// Execute the node with the proxied context
396-
const result = await node.execute?.bind(this.ctx as IExecuteFunctions)();
399+
const result = await nodeType.execute?.call(context as IExecuteFunctions);
397400

398401
// Process and map the results
399402
const mappedResults = result?.[0]?.flatMap((item) => item.json);
400403

401404
// Add output data to the context
402-
this.ctx.addOutputData(NodeConnectionType.AiTool, index, [
405+
context.addOutputData(NodeConnectionType.AiTool, this.runIndex, [
403406
[{ json: { response: mappedResults } }],
404407
]);
405408

406409
// Return the stringified results
407410
return JSON.stringify(mappedResults);
408411
} catch (error) {
409-
const nodeError = new NodeOperationError(this.ctx.getNode(), error as Error);
410-
this.ctx.addOutputData(NodeConnectionType.AiTool, index, nodeError);
412+
const nodeError = new NodeOperationError(this.options.node, error as Error);
413+
context.addOutputData(NodeConnectionType.AiTool, this.runIndex, nodeError);
411414
return 'Error during node execution: ' + nodeError.description;
415+
} finally {
416+
this.runIndex++;
412417
}
413418
},
414419
});
@@ -421,20 +426,8 @@ class AIParametersParser {
421426
* Converts node into LangChain tool by analyzing node parameters,
422427
* identifying placeholders using the $fromAI function, and generating a Zod schema. It then creates
423428
* a DynamicStructuredTool that can be used in LangChain workflows.
424-
*
425-
* @param ctx The execution context.
426-
* @param node The node type.
427-
* @param nodeParameters The parameters of the node.
428-
* @returns An object containing the DynamicStructuredTool instance.
429429
*/
430-
export function createNodeAsTool(
431-
ctx: ISupplyDataFunctions,
432-
node: INodeType,
433-
nodeParameters: INodeParameters,
434-
) {
435-
const parser = new AIParametersParser(ctx);
436-
437-
return {
438-
response: parser.createTool(node, nodeParameters),
439-
};
430+
export function createNodeAsTool(options: ParserOptions) {
431+
const parser = new AIParametersParser(options);
432+
return { response: parser.createTool() };
440433
}

Diff for: packages/core/src/NodeExecuteFunctions.ts

+74-67
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,9 @@ import type {
7777
DeduplicationScope,
7878
DeduplicationItemTypes,
7979
ICheckProcessedContextData,
80-
ISupplyDataFunctions,
8180
WebhookType,
8281
SchedulingFunctions,
82+
SupplyData,
8383
} from 'n8n-workflow';
8484
import {
8585
NodeConnectionType,
@@ -2023,9 +2023,9 @@ export async function getInputConnectionData(
20232023
this: IAllExecuteFunctions,
20242024
workflow: Workflow,
20252025
runExecutionData: IRunExecutionData,
2026-
runIndex: number,
2026+
parentRunIndex: number,
20272027
connectionInputData: INodeExecutionData[],
2028-
inputData: ITaskDataConnections,
2028+
parentInputData: ITaskDataConnections,
20292029
additionalData: IWorkflowExecuteAdditionalData,
20302030
executeData: IExecuteData,
20312031
mode: WorkflowExecuteMode,
@@ -2034,10 +2034,13 @@ export async function getInputConnectionData(
20342034
itemIndex: number,
20352035
abortSignal?: AbortSignal,
20362036
): Promise<unknown> {
2037-
const node = this.getNode();
2038-
const nodeType = workflow.nodeTypes.getByNameAndVersion(node.type, node.typeVersion);
2037+
const parentNode = this.getNode();
2038+
const parentNodeType = workflow.nodeTypes.getByNameAndVersion(
2039+
parentNode.type,
2040+
parentNode.typeVersion,
2041+
);
20392042

2040-
const inputs = NodeHelpers.getNodeInputs(workflow, node, nodeType.description);
2043+
const inputs = NodeHelpers.getNodeInputs(workflow, parentNode, parentNodeType.description);
20412044

20422045
let inputConfiguration = inputs.find((input) => {
20432046
if (typeof input === 'string') {
@@ -2048,7 +2051,7 @@ export async function getInputConnectionData(
20482051

20492052
if (inputConfiguration === undefined) {
20502053
throw new ApplicationError('Node does not have input of type', {
2051-
extra: { nodeName: node.name, connectionType },
2054+
extra: { nodeName: parentNode.name, connectionType },
20522055
});
20532056
}
20542057

@@ -2059,14 +2062,14 @@ export async function getInputConnectionData(
20592062
}
20602063

20612064
const connectedNodes = workflow
2062-
.getParentNodes(node.name, connectionType, 1)
2065+
.getParentNodes(parentNode.name, connectionType, 1)
20632066
.map((nodeName) => workflow.getNode(nodeName) as INode)
20642067
.filter((connectedNode) => connectedNode.disabled !== true);
20652068

20662069
if (connectedNodes.length === 0) {
20672070
if (inputConfiguration.required) {
20682071
throw new NodeOperationError(
2069-
node,
2072+
parentNode,
20702073
`A ${inputConfiguration?.displayName ?? connectionType} sub-node must be connected and enabled`,
20712074
);
20722075
}
@@ -2078,82 +2081,86 @@ export async function getInputConnectionData(
20782081
connectedNodes.length > inputConfiguration.maxConnections
20792082
) {
20802083
throw new NodeOperationError(
2081-
node,
2084+
parentNode,
20822085
`Only ${inputConfiguration.maxConnections} ${connectionType} sub-nodes are/is allowed to be connected`,
20832086
);
20842087
}
20852088

2086-
const constParentNodes = connectedNodes.map(async (connectedNode) => {
2087-
const nodeType = workflow.nodeTypes.getByNameAndVersion(
2089+
const nodes: SupplyData[] = [];
2090+
for (const connectedNode of connectedNodes) {
2091+
const connectedNodeType = workflow.nodeTypes.getByNameAndVersion(
20882092
connectedNode.type,
20892093
connectedNode.typeVersion,
20902094
);
2091-
const context = new SupplyDataContext(
2092-
workflow,
2093-
connectedNode,
2094-
additionalData,
2095-
mode,
2096-
runExecutionData,
2097-
runIndex,
2098-
connectionInputData,
2099-
inputData,
2100-
executeData,
2101-
closeFunctions,
2102-
abortSignal,
2103-
);
2095+
const contextFactory = (runIndex: number, inputData: ITaskDataConnections) =>
2096+
new SupplyDataContext(
2097+
workflow,
2098+
connectedNode,
2099+
additionalData,
2100+
mode,
2101+
runExecutionData,
2102+
runIndex,
2103+
connectionInputData,
2104+
inputData,
2105+
connectionType,
2106+
executeData,
2107+
closeFunctions,
2108+
abortSignal,
2109+
);
21042110

2105-
if (!nodeType.supplyData) {
2106-
if (nodeType.description.outputs.includes(NodeConnectionType.AiTool)) {
2107-
nodeType.supplyData = async function (this: ISupplyDataFunctions) {
2108-
return createNodeAsTool(this, nodeType, this.getNode().parameters);
2109-
};
2111+
if (!connectedNodeType.supplyData) {
2112+
if (connectedNodeType.description.outputs.includes(NodeConnectionType.AiTool)) {
2113+
const supplyData = createNodeAsTool({
2114+
node: connectedNode,
2115+
nodeType: connectedNodeType,
2116+
contextFactory,
2117+
});
2118+
nodes.push(supplyData);
21102119
} else {
21112120
throw new ApplicationError('Node does not have a `supplyData` method defined', {
21122121
extra: { nodeName: connectedNode.name },
21132122
});
21142123
}
2115-
}
2124+
} else {
2125+
const context = contextFactory(parentRunIndex, parentInputData);
2126+
try {
2127+
const supplyData = await connectedNodeType.supplyData.call(context, itemIndex);
2128+
if (supplyData.closeFunction) {
2129+
closeFunctions.push(supplyData.closeFunction);
2130+
}
2131+
nodes.push(supplyData);
2132+
} catch (error) {
2133+
// Propagate errors from sub-nodes
2134+
if (error.functionality === 'configuration-node') throw error;
2135+
if (!(error instanceof ExecutionBaseError)) {
2136+
error = new NodeOperationError(connectedNode, error, {
2137+
itemIndex,
2138+
});
2139+
}
21162140

2117-
try {
2118-
const response = await nodeType.supplyData.call(context, itemIndex);
2119-
if (response.closeFunction) {
2120-
closeFunctions.push(response.closeFunction);
2121-
}
2122-
return response;
2123-
} catch (error) {
2124-
// Propagate errors from sub-nodes
2125-
if (error.functionality === 'configuration-node') throw error;
2126-
if (!(error instanceof ExecutionBaseError)) {
2127-
error = new NodeOperationError(connectedNode, error, {
2141+
let currentNodeRunIndex = 0;
2142+
if (runExecutionData.resultData.runData.hasOwnProperty(parentNode.name)) {
2143+
currentNodeRunIndex = runExecutionData.resultData.runData[parentNode.name].length;
2144+
}
2145+
2146+
// Display the error on the node which is causing it
2147+
await context.addExecutionDataFunctions(
2148+
'input',
2149+
error,
2150+
connectionType,
2151+
parentNode.name,
2152+
currentNodeRunIndex,
2153+
);
2154+
2155+
// Display on the calling node which node has the error
2156+
throw new NodeOperationError(connectedNode, `Error in sub-node ${connectedNode.name}`, {
21282157
itemIndex,
2158+
functionality: 'configuration-node',
2159+
description: error.message,
21292160
});
21302161
}
2131-
2132-
let currentNodeRunIndex = 0;
2133-
if (runExecutionData.resultData.runData.hasOwnProperty(node.name)) {
2134-
currentNodeRunIndex = runExecutionData.resultData.runData[node.name].length;
2135-
}
2136-
2137-
// Display the error on the node which is causing it
2138-
await context.addExecutionDataFunctions(
2139-
'input',
2140-
error,
2141-
connectionType,
2142-
node.name,
2143-
currentNodeRunIndex,
2144-
);
2145-
2146-
// Display on the calling node which node has the error
2147-
throw new NodeOperationError(connectedNode, `Error in sub-node ${connectedNode.name}`, {
2148-
itemIndex,
2149-
functionality: 'configuration-node',
2150-
description: error.message,
2151-
});
21522162
}
2153-
});
2154-
2155-
// Validate the inputs
2156-
const nodes = await Promise.all(constParentNodes);
2163+
}
21572164

21582165
return inputConfiguration.maxConnections === 1
21592166
? (nodes || [])[0]?.response

Diff for: packages/core/src/node-execution-context/__tests__/supply-data-context.test.ts

+1
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ describe('SupplyDataContext', () => {
7272
runIndex,
7373
connectionInputData,
7474
inputData,
75+
connectionType,
7576
executeData,
7677
[closeFn],
7778
abortSignal,

0 commit comments

Comments
 (0)