From 9daaef75837aa9150e0a30fd2f6ca249ff606faf Mon Sep 17 00:00:00 2001 From: Henry Date: Fri, 11 Aug 2023 19:06:05 +0100 Subject: [PATCH] removing child mode --- CONTRIBUTING-ZH.md | 121 ++++++------ CONTRIBUTING.md | 3 +- docker/.env.example | 1 - docker/docker-compose.yml | 1 - packages/server/.env.example | 1 - packages/server/README-ZH.md | 59 +++--- packages/server/src/ChildProcess.ts | 253 ------------------------ packages/server/src/Interface.ts | 13 -- packages/server/src/commands/start.ts | 2 - packages/server/src/index.ts | 269 +++++++++----------------- packages/server/src/utils/index.ts | 2 +- 11 files changed, 182 insertions(+), 543 deletions(-) delete mode 100644 packages/server/src/ChildProcess.ts diff --git a/CONTRIBUTING-ZH.md b/CONTRIBUTING-ZH.md index b0176e4..bec081f 100644 --- a/CONTRIBUTING-ZH.md +++ b/CONTRIBUTING-ZH.md @@ -1,22 +1,22 @@ -# 贡献给Flowise +# 贡献给 Flowise -[English](<./CONTRIBUTING.md>) | 中文 +[English](./CONTRIBUTING.md) | 中文 我们欢迎任何形式的贡献。 ## ⭐ 点赞 -点赞并分享[Github仓库](https://github.com/FlowiseAI/Flowise)。 +点赞并分享[Github 仓库](https://github.com/FlowiseAI/Flowise)。 ## 🙋 问题和回答 在[问题和回答](https://github.com/FlowiseAI/Flowise/discussions/categories/q-a)部分搜索任何问题,如果找不到,可以毫不犹豫地创建一个。这可能会帮助到其他有类似问题的人。 -## 🙌 分享Chatflow +## 🙌 分享 Chatflow -是的!分享你如何使用Flowise是一种贡献方式。将你的Chatflow导出为JSON,附上截图并在[展示和分享](https://github.com/FlowiseAI/Flowise/discussions/categories/show-and-tell)部分分享。 +是的!分享你如何使用 Flowise 是一种贡献方式。将你的 Chatflow 导出为 JSON,附上截图并在[展示和分享](https://github.com/FlowiseAI/Flowise/discussions/categories/show-and-tell)部分分享。 ## 💡 想法 @@ -30,75 +30,75 @@ 不确定要贡献什么?一些想法: -- 从Langchain创建新组件 -- 更新现有组件,如扩展功能、修复错误 -- 添加新的Chatflow想法 +- 从 Langchain 创建新组件 +- 更新现有组件,如扩展功能、修复错误 +- 添加新的 Chatflow 想法 ### 开发人员 -Flowise在一个单一的单体存储库中有3个不同的模块。 +Flowise 在一个单一的单体存储库中有 3 个不同的模块。 -- `server`:用于提供API逻辑的Node后端 -- `ui`:React前端 -- `components`:Langchain组件 +- `server`:用于提供 API 逻辑的 Node 后端 +- `ui`:React 前端 +- `components`:Langchain 组件 #### 先决条件 -- 安装 [Yarn v1](https://classic.yarnpkg.com/en/docs/install) - ```bash - npm i -g yarn - ``` +- 安装 [Yarn v1](https://classic.yarnpkg.com/en/docs/install) + ```bash + npm i -g yarn + ``` #### 逐步指南 -1. Fork官方的[Flowise Github 仓库](https://github.com/FlowiseAI/Flowise)。 +1. Fork 官方的[Flowise Github 仓库](https://github.com/FlowiseAI/Flowise)。 -2. 克隆你fork的存储库。 +2. 克隆你 fork 的存储库。 3. 创建一个新的分支,参考[指南](https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/proposing-changes-to-your-work-with-pull-requests/creating-and-deleting-branches-within-your-repository)。命名约定: - - 对于功能分支:`feature/<你的新功能>` - - 对于bug修复分支:`bugfix/<你的新bug修复>`。 + - 对于功能分支:`feature/<你的新功能>` + - 对于 bug 修复分支:`bugfix/<你的新bug修复>`。 4. 切换到新创建的分支。 5. 进入存储库文件夹 - ```bash - cd Flowise - ``` + ```bash + cd Flowise + ``` 6. 安装所有模块的依赖项: - ```bash - yarn install - ``` + ```bash + yarn install + ``` 7. 构建所有代码: - ```bash - yarn build - ``` + ```bash + yarn build + ``` 8. 在[http://localhost:3000](http://localhost:3000)上启动应用程序 - ```bash - yarn start - ``` + ```bash + yarn start + ``` 9. 开发时: - - 在`packages/ui`中创建`.env`文件并指定`PORT`(参考`.env.example`) - - 在`packages/server`中创建`.env`文件并指定`PORT`(参考`.env.example`) - - 运行 + - 在`packages/ui`中创建`.env`文件并指定`PORT`(参考`.env.example`) + - 在`packages/server`中创建`.env`文件并指定`PORT`(参考`.env.example`) + - 运行 - ```bash - yarn dev - ``` + ```bash + yarn dev + ``` - 对`packages/ui`或`packages/server`进行的任何更改都将反映在[http://localhost:8080](http://localhost:8080)上 + 对`packages/ui`或`packages/server`进行的任何更改都将反映在[http://localhost:8080](http://localhost:8080)上 - 对于`packages/components`中进行的更改,再次运行`yarn build`以应用更改。 + 对于`packages/components`中进行的更改,再次运行`yarn build`以应用更改。 10. 做完所有的更改后,运行以下命令来确保在生产环境中一切正常: @@ -118,26 +118,25 @@ Flowise在一个单一的单体存储库中有3个不同的模块。 Flowise 支持不同的环境变量来配置您的实例。您可以在 `packages/server` 文件夹中的 `.env` 文件中指定以下变量。阅读[更多信息](https://docs.flowiseai.com/environment-variables) -| 变量名 | 描述 | 类型 | 默认值 | -| -------------------------- | ------------------------------------------------------------ | ------------------------------------------------- | ----------------------------------- | -| PORT | Flowise 运行的 HTTP 端口 | 数字 | 3000 | -| FLOWISE_USERNAME | 登录用户名 | 字符串 | | -| FLOWISE_PASSWORD | 登录密码 | 字符串 | | -| DEBUG | 打印组件的日志 | 布尔值 | | -| LOG_PATH | 存储日志文件的位置 | 字符串 | `your-path/Flowise/logs` | -| LOG_LEVEL | 日志的不同级别 | 枚举字符串: `error`, `info`, `verbose`, `debug` | `info` | -| APIKEY_PATH | 存储 API 密钥的位置 | 字符串 | `your-path/Flowise/packages/server` | -| EXECUTION_MODE | 预测是否在独立进程中运行还是在主进程中运行 | 枚举字符串: `child`, `main` | `main` | -| TOOL_FUNCTION_BUILTIN_DEP | 用于工具函数的 NodeJS 内置模块 | 字符串 | | -| TOOL_FUNCTION_EXTERNAL_DEP | 用于工具函数的外部模块 | 字符串 | | -| OVERRIDE_DATABASE | 是否使用默认值覆盖当前数据库 | 枚举字符串: `true`, `false` | `true` | -| DATABASE_TYPE | 存储 flowise 数据的数据库类型 | 枚举字符串: `sqlite`, `mysql`, `postgres` | `sqlite` | -| DATABASE_PATH | 数据库保存的位置(当 DATABASE_TYPE 是 sqlite 时) | 字符串 | `your-home-dir/.flowise` | -| DATABASE_HOST | 主机 URL 或 IP 地址(当 DATABASE_TYPE 不是 sqlite 时) | 字符串 | | -| DATABASE_PORT | 数据库端口(当 DATABASE_TYPE 不是 sqlite 时) | 字符串 | | -| DATABASE_USERNAME | 数据库用户名(当 DATABASE_TYPE 不是 sqlite 时) | 字符串 | | -| DATABASE_PASSWORD | 数据库密码(当 DATABASE_TYPE 不是 sqlite 时) | 字符串 | | -| DATABASE_NAME | 数据库名称(当 DATABASE_TYPE 不是 sqlite 时) | 字符串 | | +| 变量名 | 描述 | 类型 | 默认值 | +| -------------------------- | ------------------------------------------------------ | ----------------------------------------------- | ----------------------------------- | +| PORT | Flowise 运行的 HTTP 端口 | 数字 | 3000 | +| FLOWISE_USERNAME | 登录用户名 | 字符串 | | +| FLOWISE_PASSWORD | 登录密码 | 字符串 | | +| DEBUG | 打印组件的日志 | 布尔值 | | +| LOG_PATH | 存储日志文件的位置 | 字符串 | `your-path/Flowise/logs` | +| LOG_LEVEL | 日志的不同级别 | 枚举字符串: `error`, `info`, `verbose`, `debug` | `info` | +| APIKEY_PATH | 存储 API 密钥的位置 | 字符串 | `your-path/Flowise/packages/server` | +| TOOL_FUNCTION_BUILTIN_DEP | 用于工具函数的 NodeJS 内置模块 | 字符串 | | +| TOOL_FUNCTION_EXTERNAL_DEP | 用于工具函数的外部模块 | 字符串 | | +| OVERRIDE_DATABASE | 是否使用默认值覆盖当前数据库 | 枚举字符串: `true`, `false` | `true` | +| DATABASE_TYPE | 存储 flowise 数据的数据库类型 | 枚举字符串: `sqlite`, `mysql`, `postgres` | `sqlite` | +| DATABASE_PATH | 数据库保存的位置(当 DATABASE_TYPE 是 sqlite 时) | 字符串 | `your-home-dir/.flowise` | +| DATABASE_HOST | 主机 URL 或 IP 地址(当 DATABASE_TYPE 不是 sqlite 时) | 字符串 | | +| DATABASE_PORT | 数据库端口(当 DATABASE_TYPE 不是 sqlite 时) | 字符串 | | +| DATABASE_USERNAME | 数据库用户名(当 DATABASE_TYPE 不是 sqlite 时) | 字符串 | | +| DATABASE_PASSWORD | 数据库密码(当 DATABASE_TYPE 不是 sqlite 时) | 字符串 | | +| DATABASE_NAME | 数据库名称(当 DATABASE_TYPE 不是 sqlite 时) | 字符串 | | 您也可以在使用 `npx` 时指定环境变量。例如: @@ -153,4 +152,4 @@ npx flowise start --PORT=3000 --DEBUG=true 当您打开一个 Pull Request 时,FlowiseAI 团队的成员将自动收到通知/指派。您也可以在 [Discord](https://discord.gg/jbaHfsRVBW) 上联系我们。 -## \ No newline at end of file +## diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 74865d4..90ba549 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -2,7 +2,7 @@ # Contributing to Flowise -English | [中文](<./CONTRIBUTING-ZH.md>) +English | [中文](./CONTRIBUTING-ZH.md) We appreciate any form of contributions. @@ -129,7 +129,6 @@ Flowise support different environment variables to configure your instance. You | LOG_PATH | Location where log files are stored | String | `your-path/Flowise/logs` | | LOG_LEVEL | Different levels of logs | Enum String: `error`, `info`, `verbose`, `debug` | `info` | | APIKEY_PATH | Location where api keys are saved | String | `your-path/Flowise/packages/server` | -| EXECUTION_MODE | Whether predictions run in their own process or the main process | Enum String: `child`, `main` | `main` | | TOOL_FUNCTION_BUILTIN_DEP | NodeJS built-in modules to be used for Tool Function | String | | | TOOL_FUNCTION_EXTERNAL_DEP | External modules to be used for Tool Function | String | | | OVERRIDE_DATABASE | Override current database with default | Enum String: `true`, `false` | `true` | diff --git a/docker/.env.example b/docker/.env.example index 66b0910..16b19cd 100644 --- a/docker/.env.example +++ b/docker/.env.example @@ -17,7 +17,6 @@ LOG_PATH=/root/.flowise/logs # FLOWISE_PASSWORD=1234 # DEBUG=true # LOG_LEVEL=debug (error | warn | info | verbose | debug) -# EXECUTION_MODE=main (child | main) # TOOL_FUNCTION_BUILTIN_DEP=crypto,fs # TOOL_FUNCTION_EXTERNAL_DEP=moment,lodash diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index e9b2824..4a03bcf 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -15,7 +15,6 @@ services: - SECRETKEY_PATH=${SECRETKEY_PATH} - LOG_LEVEL=${LOG_LEVEL} - LOG_PATH=${LOG_PATH} - - EXECUTION_MODE=${EXECUTION_MODE} ports: - '${PORT}:${PORT}' volumes: diff --git a/packages/server/.env.example b/packages/server/.env.example index 7ab0551..bedbf63 100644 --- a/packages/server/.env.example +++ b/packages/server/.env.example @@ -17,7 +17,6 @@ PASSPHRASE=MYPASSPHRASE # Passphrase used to create encryption key # FLOWISE_PASSWORD=1234 # DEBUG=true # LOG_LEVEL=debug (error | warn | info | verbose | debug) -# EXECUTION_MODE=main (child | main) # TOOL_FUNCTION_BUILTIN_DEP=crypto,fs # TOOL_FUNCTION_EXTERNAL_DEP=moment,lodash diff --git a/packages/server/README-ZH.md b/packages/server/README-ZH.md index d32096d..e58f08b 100644 --- a/packages/server/README-ZH.md +++ b/packages/server/README-ZH.md @@ -1,20 +1,20 @@ -# Flowise - 低代码LLM应用程序构建器 +# Flowise - 低代码 LLM 应用程序构建器 -[English](<./README.md>) | 中文 +[English](./README.md) | 中文 ![Flowise](https://github.com/FlowiseAI/Flowise/blob/main/images/flowise.gif?raw=true) -拖放界面来构建自定义的LLM流程 +拖放界面来构建自定义的 LLM 流程 -## ⚡快速入门 +## ⚡ 快速入门 -1. 安装Flowise +1. 安装 Flowise ```bash npm install -g flowise ``` -2. 启动Flowise +2. 启动 Flowise ```bash npx flowise start @@ -33,28 +33,27 @@ FLOWISE_PASSWORD=1234 ## 🌱 环境变量 -Flowise支持不同的环境变量来配置您的实例。您可以在`packages/server`文件夹中的`.env`文件中指定以下变量。阅读[更多](https://docs.flowiseai.com/environment-variables) - -| 变量 | 描述 | 类型 | 默认值 | -| ---------------- | ---------------------------------------------------------------- | ------------------------------------------------ | ----------------------------------- | -| PORT | Flowise运行的HTTP端口 | 数字 | 3000 | -| FLOWISE_USERNAME | 登录的用户名 | 字符串 | | -| FLOWISE_PASSWORD | 登录的密码 | 字符串 | | -| DEBUG | 打印组件的日志 | 布尔值 | | -| LOG_PATH | 存储日志文件的位置 | 字符串 | `your-path/Flowise/logs` | -| LOG_LEVEL | 日志的不同级别 | 枚举字符串:`error`、`info`、`verbose`、`debug` | `info` | -| APIKEY_PATH | 存储API密钥的位置 | 字符串 | `your-path/Flowise/packages/server` | -| EXECUTION_MODE | 预测是在其自己的进程中运行还是在主进程中运行 | 枚举字符串:`child`、`main` | `main` | -| TOOL_FUNCTION_BUILTIN_DEP | 用于工具函数的NodeJS内置模块 | 字符串 | | -| TOOL_FUNCTION_EXTERNAL_DEP | 用于工具函数的外部模块 | 字符串 | | -| OVERRIDE_DATABASE | 使用默认值覆盖当前数据库 | 枚举字符串:`true`、`false` | `true` | -| DATABASE_TYPE | 存储flowise数据的数据库类型 | 枚举字符串:`sqlite`、`mysql`、`postgres` | `sqlite` | -| DATABASE_PATH | 数据库的保存位置(当DATABASE_TYPE为sqlite时) | 字符串 | `your-home-dir/.flowise` | -| DATABASE_HOST | 主机URL或IP地址(当DATABASE_TYPE不为sqlite时) | 字符串 | | -| DATABASE_PORT | 数据库端口(当DATABASE_TYPE不为sqlite时) | 字符串 | | -| DATABASE_USERNAME | 数据库用户名(当DATABASE_TYPE不为sqlite时) | 字符串 | | -| DATABASE_PASSWORD | 数据库密码(当DATABASE_TYPE不为sqlite时) | 字符串 | | -| DATABASE_NAME | 数据库名称(当DATABASE_TYPE不为sqlite时) | 字符串 | | +Flowise 支持不同的环境变量来配置您的实例。您可以在`packages/server`文件夹中的`.env`文件中指定以下变量。阅读[更多](https://docs.flowiseai.com/environment-variables) + +| 变量 | 描述 | 类型 | 默认值 | +| -------------------------- | ------------------------------------------------------ | ----------------------------------------------- | ----------------------------------- | +| PORT | Flowise 运行的 HTTP 端口 | 数字 | 3000 | +| FLOWISE_USERNAME | 登录的用户名 | 字符串 | | +| FLOWISE_PASSWORD | 登录的密码 | 字符串 | | +| DEBUG | 打印组件的日志 | 布尔值 | | +| LOG_PATH | 存储日志文件的位置 | 字符串 | `your-path/Flowise/logs` | +| LOG_LEVEL | 日志的不同级别 | 枚举字符串:`error`、`info`、`verbose`、`debug` | `info` | +| APIKEY_PATH | 存储 API 密钥的位置 | 字符串 | `your-path/Flowise/packages/server` | +| TOOL_FUNCTION_BUILTIN_DEP | 用于工具函数的 NodeJS 内置模块 | 字符串 | | +| TOOL_FUNCTION_EXTERNAL_DEP | 用于工具函数的外部模块 | 字符串 | | +| OVERRIDE_DATABASE | 使用默认值覆盖当前数据库 | 枚举字符串:`true`、`false` | `true` | +| DATABASE_TYPE | 存储 flowise 数据的数据库类型 | 枚举字符串:`sqlite`、`mysql`、`postgres` | `sqlite` | +| DATABASE_PATH | 数据库的保存位置(当 DATABASE_TYPE 为 sqlite 时) | 字符串 | `your-home-dir/.flowise` | +| DATABASE_HOST | 主机 URL 或 IP 地址(当 DATABASE_TYPE 不为 sqlite 时) | 字符串 | | +| DATABASE_PORT | 数据库端口(当 DATABASE_TYPE 不为 sqlite 时) | 字符串 | | +| DATABASE_USERNAME | 数据库用户名(当 DATABASE_TYPE 不为 sqlite 时) | 字符串 | | +| DATABASE_PASSWORD | 数据库密码(当 DATABASE_TYPE 不为 sqlite 时) | 字符串 | | +| DATABASE_NAME | 数据库名称(当 DATABASE_TYPE 不为 sqlite 时) | 字符串 | | 您还可以在使用`npx`时指定环境变量。例如: @@ -64,7 +63,7 @@ npx flowise start --PORT=3000 --DEBUG=true ## 📖 文档 -[Flowise文档](https://docs.flowiseai.com/) +[Flowise 文档](https://docs.flowiseai.com/) ## 🌐 自托管 @@ -98,4 +97,4 @@ npx flowise start --PORT=3000 --DEBUG=true ## 📄 许可证 -本仓库中的源代码在[MIT许可证](https://github.com/FlowiseAI/Flowise/blob/master/LICENSE.md)下提供。 \ No newline at end of file +本仓库中的源代码在[MIT 许可证](https://github.com/FlowiseAI/Flowise/blob/master/LICENSE.md)下提供。 diff --git a/packages/server/src/ChildProcess.ts b/packages/server/src/ChildProcess.ts deleted file mode 100644 index 0112f84..0000000 --- a/packages/server/src/ChildProcess.ts +++ /dev/null @@ -1,253 +0,0 @@ -import path from 'path' -import { IChildProcessMessage, IReactFlowNode, IReactFlowObject, IRunChatflowMessageValue, INodeData } from './Interface' -import { - buildLangchain, - checkMemorySessionId, - constructGraphs, - getEndingNode, - getStartingNodes, - getUserHome, - replaceInputsWithConfig, - resolveVariables, - databaseEntities -} from './utils' -import { DataSource } from 'typeorm' -import { ChatFlow } from './entity/ChatFlow' -import { ChatMessage } from './entity/ChatMessage' -import { Tool } from './entity/Tool' -import { Credential } from './entity/Credential' -import logger from './utils/logger' - -export class ChildProcess { - /** - * Stop child process when app is killed - */ - static async stopChildProcess() { - setTimeout(() => { - process.exit(0) - }, 50000) - } - - /** - * Process prediction - * @param {IRunChatflowMessageValue} messageValue - * @return {Promise} - */ - async runChildProcess(messageValue: IRunChatflowMessageValue): Promise { - process.on('SIGTERM', ChildProcess.stopChildProcess) - process.on('SIGINT', ChildProcess.stopChildProcess) - - await sendToParentProcess('start', '_') - - try { - const childAppDataSource = await initDB() - - // Create a Queue and add our initial node in it - const { endingNodeData, chatflow, chatId, incomingInput, componentNodes } = messageValue - - let nodeToExecuteData: INodeData - let addToChatFlowPool: any = {} - - /* Don't rebuild the flow (to avoid duplicated upsert, recomputation) when all these conditions met: - * - Node Data already exists in pool - * - Still in sync (i.e the flow has not been modified since) - * - Existing overrideConfig and new overrideConfig are the same - * - Flow doesn't start with nodes that depend on incomingInput.question - ***/ - if (endingNodeData) { - nodeToExecuteData = endingNodeData - } else { - /*** Get chatflows and prepare data ***/ - const flowData = chatflow.flowData - const parsedFlowData: IReactFlowObject = JSON.parse(flowData) - const nodes = parsedFlowData.nodes - const edges = parsedFlowData.edges - - /*** Get Ending Node with Directed Graph ***/ - const { graph, nodeDependencies } = constructGraphs(nodes, edges) - const directedGraph = graph - const endingNodeId = getEndingNode(nodeDependencies, directedGraph) - if (!endingNodeId) { - await sendToParentProcess('error', `Ending node ${endingNodeId} not found`) - return - } - - const endingNodeData = nodes.find((nd) => nd.id === endingNodeId)?.data - if (!endingNodeData) { - await sendToParentProcess('error', `Ending node ${endingNodeId} data not found`) - return - } - - if (endingNodeData && endingNodeData.category !== 'Chains' && endingNodeData.category !== 'Agents') { - await sendToParentProcess('error', `Ending node must be either a Chain or Agent`) - return - } - - if ( - endingNodeData.outputs && - Object.keys(endingNodeData.outputs).length && - !Object.values(endingNodeData.outputs).includes(endingNodeData.name) - ) { - await sendToParentProcess( - 'error', - `Output of ${endingNodeData.label} (${endingNodeData.id}) must be ${endingNodeData.label}, can't be an Output Prediction` - ) - return - } - - /*** Get Starting Nodes with Non-Directed Graph ***/ - const constructedObj = constructGraphs(nodes, edges, true) - const nonDirectedGraph = constructedObj.graph - const { startingNodeIds, depthQueue } = getStartingNodes(nonDirectedGraph, endingNodeId) - - logger.debug(`[server] [mode:child]: Start building chatflow ${chatflow.id}`) - /*** BFS to traverse from Starting Nodes to Ending Node ***/ - const reactFlowNodes = await buildLangchain( - startingNodeIds, - nodes, - graph, - depthQueue, - componentNodes, - incomingInput.question, - chatId, - childAppDataSource, - incomingInput?.overrideConfig - ) - - const nodeToExecute = reactFlowNodes.find((node: IReactFlowNode) => node.id === endingNodeId) - if (!nodeToExecute) { - await sendToParentProcess('error', `Node ${endingNodeId} not found`) - return - } - - if (incomingInput.overrideConfig) - nodeToExecute.data = replaceInputsWithConfig(nodeToExecute.data, incomingInput.overrideConfig) - const reactFlowNodeData: INodeData = resolveVariables(nodeToExecute.data, reactFlowNodes, incomingInput.question) - nodeToExecuteData = reactFlowNodeData - - const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.id)) - addToChatFlowPool = { - chatflowid: chatflow.id, - nodeToExecuteData, - startingNodes, - overrideConfig: incomingInput?.overrideConfig - } - } - - const nodeInstanceFilePath = componentNodes[nodeToExecuteData.name].filePath as string - const nodeModule = await import(nodeInstanceFilePath) - const nodeInstance = new nodeModule.nodeClass() - - logger.debug(`[server] [mode:child]: Running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`) - - if (nodeToExecuteData.instance) checkMemorySessionId(nodeToExecuteData.instance, chatId) - - const result = await nodeInstance.run(nodeToExecuteData, incomingInput.question, { - chatHistory: incomingInput.history, - appDataSource: childAppDataSource, - databaseEntities - }) - - logger.debug(`[server] [mode:child]: Finished running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`) - - await sendToParentProcess('finish', { result, addToChatFlowPool }) - } catch (e: any) { - await sendToParentProcess('error', e.message) - logger.error('[server] [mode:child]: Error:', e) - } - } -} - -/** - * Initialize DB in child process - * @returns {DataSource} - */ -async function initDB() { - let childAppDataSource - let homePath - const synchronize = process.env.OVERRIDE_DATABASE === 'false' ? false : true - switch (process.env.DATABASE_TYPE) { - case 'sqlite': - homePath = process.env.DATABASE_PATH ?? path.join(getUserHome(), '.flowise') - childAppDataSource = new DataSource({ - type: 'sqlite', - database: path.resolve(homePath, 'database.sqlite'), - synchronize, - entities: [ChatFlow, ChatMessage, Tool, Credential], - migrations: [] - }) - break - case 'mysql': - childAppDataSource = new DataSource({ - type: 'mysql', - host: process.env.DATABASE_HOST, - port: parseInt(process.env.DATABASE_PORT || '3306'), - username: process.env.DATABASE_USER, - password: process.env.DATABASE_PASSWORD, - database: process.env.DATABASE_NAME, - charset: 'utf8mb4', - synchronize, - entities: [ChatFlow, ChatMessage, Tool, Credential], - migrations: [] - }) - break - case 'postgres': - childAppDataSource = new DataSource({ - type: 'postgres', - host: process.env.DATABASE_HOST, - port: parseInt(process.env.DATABASE_PORT || '5432'), - username: process.env.DATABASE_USER, - password: process.env.DATABASE_PASSWORD, - database: process.env.DATABASE_NAME, - synchronize, - entities: [ChatFlow, ChatMessage, Tool, Credential], - migrations: [] - }) - break - default: - homePath = process.env.DATABASE_PATH ?? path.join(getUserHome(), '.flowise') - childAppDataSource = new DataSource({ - type: 'sqlite', - database: path.resolve(homePath, 'database.sqlite'), - synchronize, - entities: [ChatFlow, ChatMessage, Tool, Credential], - migrations: [] - }) - break - } - - return await childAppDataSource.initialize() -} - -/** - * Send data back to parent process - * @param {string} key Key of message - * @param {*} value Value of message - * @returns {Promise} - */ -async function sendToParentProcess(key: string, value: any): Promise { - // tslint:disable-line:no-any - return new Promise((resolve, reject) => { - process.send!( - { - key, - value - }, - (error: Error) => { - if (error) { - return reject(error) - } - resolve() - } - ) - }) -} - -const childProcess = new ChildProcess() - -process.on('message', async (message: IChildProcessMessage) => { - if (message.key === 'start') { - await childProcess.runChildProcess(message.value) - process.exit() - } -}) diff --git a/packages/server/src/Interface.ts b/packages/server/src/Interface.ts index 8feb427..92e3054 100644 --- a/packages/server/src/Interface.ts +++ b/packages/server/src/Interface.ts @@ -169,19 +169,6 @@ export interface IDatabaseExport { apikeys: ICommonObject[] } -export interface IRunChatflowMessageValue { - chatflow: IChatFlow - chatId: string - incomingInput: IncomingInput - componentNodes: IComponentNodes - endingNodeData?: INodeData -} - -export interface IChildProcessMessage { - key: string - value?: any -} - export type ICredentialDataDecrypted = ICommonObject // Plain credential object sent to server diff --git a/packages/server/src/commands/start.ts b/packages/server/src/commands/start.ts index 71459d1..4b58ae7 100644 --- a/packages/server/src/commands/start.ts +++ b/packages/server/src/commands/start.ts @@ -25,7 +25,6 @@ export default class Start extends Command { SECRETKEY_PATH: Flags.string(), LOG_PATH: Flags.string(), LOG_LEVEL: Flags.string(), - EXECUTION_MODE: Flags.string(), TOOL_FUNCTION_BUILTIN_DEP: Flags.string(), TOOL_FUNCTION_EXTERNAL_DEP: Flags.string(), OVERRIDE_DATABASE: Flags.string(), @@ -73,7 +72,6 @@ export default class Start extends Command { const { flags } = await this.parse(Start) if (flags.PORT) process.env.PORT = flags.PORT - if (flags.EXECUTION_MODE) process.env.EXECUTION_MODE = flags.EXECUTION_MODE if (flags.DEBUG) process.env.DEBUG = flags.DEBUG // Authorization diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index 803dd17..b728914 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -16,8 +16,6 @@ import { IReactFlowObject, INodeData, IDatabaseExport, - IRunChatflowMessageValue, - IChildProcessMessage, ICredentialReturnResponse } from './Interface' import { @@ -57,7 +55,6 @@ import { Credential } from './entity/Credential' import { Tool } from './entity/Tool' import { ChatflowPool } from './ChatflowPool' import { ICommonObject, INodeOptionsValue } from 'flowise-components' -import { fork } from 'child_process' export class App { app: express.Application @@ -764,68 +761,6 @@ export class App { } } - /** - * Start child process - * @param {ChatFlow} chatflow - * @param {IncomingInput} incomingInput - * @param {INodeData} endingNodeData - */ - async startChildProcess(chatflow: ChatFlow, chatId: string, incomingInput: IncomingInput, endingNodeData?: INodeData) { - try { - const controller = new AbortController() - const { signal } = controller - - let childpath = path.join(__dirname, '..', 'dist', 'ChildProcess.js') - if (!fs.existsSync(childpath)) childpath = 'ChildProcess.ts' - - const childProcess = fork(childpath, [], { signal }) - - const value = { - chatflow, - chatId, - incomingInput, - componentNodes: cloneDeep(this.nodesPool.componentNodes), - endingNodeData - } as IRunChatflowMessageValue - childProcess.send({ key: 'start', value } as IChildProcessMessage) - - let childProcessTimeout: NodeJS.Timeout - - return new Promise((resolve, reject) => { - childProcess.on('message', async (message: IChildProcessMessage) => { - if (message.key === 'finish') { - const { result, addToChatFlowPool } = message.value as ICommonObject - if (childProcessTimeout) { - clearTimeout(childProcessTimeout) - } - if (Object.keys(addToChatFlowPool).length) { - const { chatflowid, nodeToExecuteData, startingNodes, overrideConfig } = addToChatFlowPool - this.chatflowPool.add(chatflowid, nodeToExecuteData, startingNodes, overrideConfig) - } - resolve(result) - } - if (message.key === 'start') { - if (process.env.EXECUTION_TIMEOUT) { - childProcessTimeout = setTimeout(async () => { - childProcess.kill() - resolve(undefined) - }, parseInt(process.env.EXECUTION_TIMEOUT, 10)) - } - } - if (message.key === 'error') { - let errMessage = message.value as string - if (childProcessTimeout) { - clearTimeout(childProcessTimeout) - } - reject(errMessage) - } - }) - }) - } catch (err) { - logger.error('[server] [mode:child]: Error:', err) - } - } - /** * Process Prediction * @param {Request} req @@ -895,126 +830,104 @@ export class App { ) } - if (process.env.EXECUTION_MODE === 'child') { - if (isFlowReusable()) { - nodeToExecuteData = this.chatflowPool.activeChatflows[chatflowid].endingNodeData - logger.debug( - `[server] [mode:child]: Reuse existing chatflow ${chatflowid} with ending node ${nodeToExecuteData.label} (${nodeToExecuteData.id})` - ) - try { - const result = await this.startChildProcess(chatflow, chatId, incomingInput, nodeToExecuteData) - return res.json(result) - } catch (error) { - return res.status(500).send(error) - } - } else { - try { - const result = await this.startChildProcess(chatflow, chatId, incomingInput) - return res.json(result) - } catch (error) { - return res.status(500).send(error) - } - } - } else { - /*** Get chatflows and prepare data ***/ - const flowData = chatflow.flowData - const parsedFlowData: IReactFlowObject = JSON.parse(flowData) - const nodes = parsedFlowData.nodes - const edges = parsedFlowData.edges - - if (isFlowReusable()) { - nodeToExecuteData = this.chatflowPool.activeChatflows[chatflowid].endingNodeData - isStreamValid = isFlowValidForStream(nodes, nodeToExecuteData) - logger.debug( - `[server]: Reuse existing chatflow ${chatflowid} with ending node ${nodeToExecuteData.label} (${nodeToExecuteData.id})` - ) - } else { - /*** Get Ending Node with Directed Graph ***/ - const { graph, nodeDependencies } = constructGraphs(nodes, edges) - const directedGraph = graph - const endingNodeId = getEndingNode(nodeDependencies, directedGraph) - if (!endingNodeId) return res.status(500).send(`Ending node ${endingNodeId} not found`) + /*** Get chatflows and prepare data ***/ + const flowData = chatflow.flowData + const parsedFlowData: IReactFlowObject = JSON.parse(flowData) + const nodes = parsedFlowData.nodes + const edges = parsedFlowData.edges - const endingNodeData = nodes.find((nd) => nd.id === endingNodeId)?.data - if (!endingNodeData) return res.status(500).send(`Ending node ${endingNodeId} data not found`) + if (isFlowReusable()) { + nodeToExecuteData = this.chatflowPool.activeChatflows[chatflowid].endingNodeData + isStreamValid = isFlowValidForStream(nodes, nodeToExecuteData) + logger.debug( + `[server]: Reuse existing chatflow ${chatflowid} with ending node ${nodeToExecuteData.label} (${nodeToExecuteData.id})` + ) + } else { + /*** Get Ending Node with Directed Graph ***/ + const { graph, nodeDependencies } = constructGraphs(nodes, edges) + const directedGraph = graph + const endingNodeId = getEndingNode(nodeDependencies, directedGraph) + if (!endingNodeId) return res.status(500).send(`Ending node ${endingNodeId} not found`) - if (endingNodeData && endingNodeData.category !== 'Chains' && endingNodeData.category !== 'Agents') { - return res.status(500).send(`Ending node must be either a Chain or Agent`) - } + const endingNodeData = nodes.find((nd) => nd.id === endingNodeId)?.data + if (!endingNodeData) return res.status(500).send(`Ending node ${endingNodeId} data not found`) - if ( - endingNodeData.outputs && - Object.keys(endingNodeData.outputs).length && - !Object.values(endingNodeData.outputs).includes(endingNodeData.name) - ) { - return res - .status(500) - .send( - `Output of ${endingNodeData.label} (${endingNodeData.id}) must be ${endingNodeData.label}, can't be an Output Prediction` - ) - } + if (endingNodeData && endingNodeData.category !== 'Chains' && endingNodeData.category !== 'Agents') { + return res.status(500).send(`Ending node must be either a Chain or Agent`) + } - isStreamValid = isFlowValidForStream(nodes, endingNodeData) - - /*** Get Starting Nodes with Non-Directed Graph ***/ - const constructedObj = constructGraphs(nodes, edges, true) - const nonDirectedGraph = constructedObj.graph - const { startingNodeIds, depthQueue } = getStartingNodes(nonDirectedGraph, endingNodeId) - - logger.debug(`[server]: Start building chatflow ${chatflowid}`) - /*** BFS to traverse from Starting Nodes to Ending Node ***/ - const reactFlowNodes = await buildLangchain( - startingNodeIds, - nodes, - graph, - depthQueue, - this.nodesPool.componentNodes, - incomingInput.question, - chatId, - this.AppDataSource, - incomingInput?.overrideConfig - ) - - const nodeToExecute = reactFlowNodes.find((node: IReactFlowNode) => node.id === endingNodeId) - if (!nodeToExecute) return res.status(404).send(`Node ${endingNodeId} not found`) - - if (incomingInput.overrideConfig) - nodeToExecute.data = replaceInputsWithConfig(nodeToExecute.data, incomingInput.overrideConfig) - const reactFlowNodeData: INodeData = resolveVariables(nodeToExecute.data, reactFlowNodes, incomingInput.question) - nodeToExecuteData = reactFlowNodeData - - const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.id)) - this.chatflowPool.add(chatflowid, nodeToExecuteData, startingNodes, incomingInput?.overrideConfig) + if ( + endingNodeData.outputs && + Object.keys(endingNodeData.outputs).length && + !Object.values(endingNodeData.outputs).includes(endingNodeData.name) + ) { + return res + .status(500) + .send( + `Output of ${endingNodeData.label} (${endingNodeData.id}) must be ${endingNodeData.label}, can't be an Output Prediction` + ) } - const nodeInstanceFilePath = this.nodesPool.componentNodes[nodeToExecuteData.name].filePath as string - const nodeModule = await import(nodeInstanceFilePath) - const nodeInstance = new nodeModule.nodeClass() - - isStreamValid = isStreamValid && !isVectorStoreFaiss(nodeToExecuteData) - logger.debug(`[server]: Running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`) - - if (nodeToExecuteData.instance) checkMemorySessionId(nodeToExecuteData.instance, chatId) - - const result = isStreamValid - ? await nodeInstance.run(nodeToExecuteData, incomingInput.question, { - chatHistory: incomingInput.history, - socketIO, - socketIOClientId: incomingInput.socketIOClientId, - logger, - appDataSource: this.AppDataSource, - databaseEntities - }) - : await nodeInstance.run(nodeToExecuteData, incomingInput.question, { - chatHistory: incomingInput.history, - logger, - appDataSource: this.AppDataSource, - databaseEntities - }) - - logger.debug(`[server]: Finished running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`) - return res.json(result) + isStreamValid = isFlowValidForStream(nodes, endingNodeData) + + /*** Get Starting Nodes with Non-Directed Graph ***/ + const constructedObj = constructGraphs(nodes, edges, true) + const nonDirectedGraph = constructedObj.graph + const { startingNodeIds, depthQueue } = getStartingNodes(nonDirectedGraph, endingNodeId) + + logger.debug(`[server]: Start building chatflow ${chatflowid}`) + /*** BFS to traverse from Starting Nodes to Ending Node ***/ + const reactFlowNodes = await buildLangchain( + startingNodeIds, + nodes, + graph, + depthQueue, + this.nodesPool.componentNodes, + incomingInput.question, + chatId, + this.AppDataSource, + incomingInput?.overrideConfig + ) + + const nodeToExecute = reactFlowNodes.find((node: IReactFlowNode) => node.id === endingNodeId) + if (!nodeToExecute) return res.status(404).send(`Node ${endingNodeId} not found`) + + if (incomingInput.overrideConfig) + nodeToExecute.data = replaceInputsWithConfig(nodeToExecute.data, incomingInput.overrideConfig) + const reactFlowNodeData: INodeData = resolveVariables(nodeToExecute.data, reactFlowNodes, incomingInput.question) + nodeToExecuteData = reactFlowNodeData + + const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.id)) + this.chatflowPool.add(chatflowid, nodeToExecuteData, startingNodes, incomingInput?.overrideConfig) } + + const nodeInstanceFilePath = this.nodesPool.componentNodes[nodeToExecuteData.name].filePath as string + const nodeModule = await import(nodeInstanceFilePath) + const nodeInstance = new nodeModule.nodeClass() + + isStreamValid = isStreamValid && !isVectorStoreFaiss(nodeToExecuteData) + logger.debug(`[server]: Running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`) + + if (nodeToExecuteData.instance) checkMemorySessionId(nodeToExecuteData.instance, chatId) + + const result = isStreamValid + ? await nodeInstance.run(nodeToExecuteData, incomingInput.question, { + chatHistory: incomingInput.history, + socketIO, + socketIOClientId: incomingInput.socketIOClientId, + logger, + appDataSource: this.AppDataSource, + databaseEntities + }) + : await nodeInstance.run(nodeToExecuteData, incomingInput.question, { + chatHistory: incomingInput.history, + logger, + appDataSource: this.AppDataSource, + databaseEntities + }) + + logger.debug(`[server]: Finished running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`) + return res.json(result) } catch (e: any) { logger.error('[server]: Error:', e) return res.status(500).send(e.message) diff --git a/packages/server/src/utils/index.ts b/packages/server/src/utils/index.ts index 12dd654..cf4a9c3 100644 --- a/packages/server/src/utils/index.ts +++ b/packages/server/src/utils/index.ts @@ -791,7 +791,7 @@ export const isFlowValidForStream = (reactFlowNodes: IReactFlowNode[], endingNod isValidChainOrAgent = whitelistAgents.includes(endingNodeData.name) } - return isChatOrLLMsExist && isValidChainOrAgent && !isVectorStoreFaiss(endingNodeData) && process.env.EXECUTION_MODE !== 'child' + return isChatOrLLMsExist && isValidChainOrAgent && !isVectorStoreFaiss(endingNodeData) } /**