diff --git a/apps/api/src/workflow-triggers/controllers/chatbot.controller.ts b/apps/api/src/workflow-triggers/controllers/chatbot.controller.ts new file mode 100644 index 00000000..9057e68e --- /dev/null +++ b/apps/api/src/workflow-triggers/controllers/chatbot.controller.ts @@ -0,0 +1,244 @@ +import { XmtpMessageOutput } from '@app/definitions/integration-definitions/xmtp/xmtp.common' +import { BadRequestException, Body, Controller, Logger, Post, Req, UnauthorizedException } from '@nestjs/common' +import { Request } from 'express' +import { uniq } from 'lodash' +import { ObjectId } from 'mongodb' +import { Types } from 'mongoose' +import { RunnerService } from '../../../../runner/src/services/runner.service' +import { ContactService } from '../../contacts/services/contact.service' +import { IntegrationTrigger } from '../../integration-triggers/entities/integration-trigger' +import { IntegrationTriggerService } from '../../integration-triggers/services/integration-trigger.service' +import { Integration } from '../../integrations/entities/integration' +import { IntegrationService } from '../../integrations/services/integration.service' +import { WorkflowActionService } from '../../workflow-actions/services/workflow-action.service' +import { WorkflowRunStatus } from '../../workflow-runs/entities/workflow-run-status' +import { WorkflowSleep } from '../../workflow-runs/entities/workflow-sleep' +import { WorkflowRunService } from '../../workflow-runs/services/workflow-run.service' +import { WorkflowSleepService } from '../../workflow-runs/services/workflow-sleep.service' +import { Workflow } from '../../workflows/entities/workflow' +import { WorkflowService } from '../../workflows/services/workflow.service' +import { WorkflowTrigger } from '../entities/workflow-trigger' +import { WorkflowTriggerService } from '../services/workflow-trigger.service' +import { WorkflowUsedIdService } from '../services/workflow-used-id.service' + +@Controller('/chatbots') +export class ChatbotController { + private readonly logger = new Logger(ChatbotController.name) + + private chatbotIntegration: Integration + private chatbotIntegrationTrigger: IntegrationTrigger + private xmtpIntegration: Integration + private xmtpIntegrationTrigger: IntegrationTrigger + + constructor( + private readonly integrationService: IntegrationService, + private readonly integrationTriggerService: IntegrationTriggerService, + private readonly workflowService: WorkflowService, + private readonly workflowTriggerService: WorkflowTriggerService, + private readonly workflowActionService: WorkflowActionService, + private readonly workflowRunService: WorkflowRunService, + private readonly runnerService: RunnerService, + private workflowUsedIdService: WorkflowUsedIdService, + private workflowSleepService: WorkflowSleepService, + private contactService: ContactService, + ) {} + + async onModuleInit() { + this.chatbotIntegration = (await this.integrationService.findOne({ key: 'chatbot' })) as Integration + this.chatbotIntegrationTrigger = (await this.integrationTriggerService.findOne({ + integration: this.chatbotIntegration._id, + key: 'newChatbotMessage', + })) as IntegrationTrigger + + this.xmtpIntegration = (await this.integrationService.findOne({ key: 'xmtp' })) as Integration + this.xmtpIntegrationTrigger = (await this.integrationTriggerService.findOne({ + integration: this.xmtpIntegration._id, + key: 'newMessage', + })) as IntegrationTrigger + } + + @Post('/') + async received(@Body() body: Record, @Req() req: Request) { + if (req.headers?.authorization !== process.env.CHATBOT_SECRET) { + throw new UnauthorizedException() + } + if (!body.user || !body.message) { + throw new BadRequestException() + } + + const chatbotWorkflowTriggers = await this.workflowTriggerService.find({ + owner: new ObjectId(body.user), + integrationTrigger: this.chatbotIntegrationTrigger._id, + enabled: true, + planLimited: { $ne: true }, + }) + const chatbotPromises = chatbotWorkflowTriggers.map(async (workflowTrigger) => + this.processChatbotMessage(body.message, workflowTrigger), + ) + await Promise.all(chatbotPromises) + + const xmtpWorkflowTriggers = await this.workflowTriggerService.find({ + owner: new ObjectId(body.user), + integrationTrigger: this.xmtpIntegrationTrigger._id, + enabled: true, + planLimited: { $ne: true }, + }) + const xmtpPromises = xmtpWorkflowTriggers.map(async (workflowTrigger) => + this.processXmtpMessage(body.message, workflowTrigger), + ) + await Promise.all(xmtpPromises) + + return { ok: true } + } + + async processChatbotMessage(message: XmtpMessageOutput, workflowTrigger: WorkflowTrigger) { + await this.workflowUsedIdService.createOne({ + workflow: workflowTrigger.workflow, + triggerId: message.id, + }) + + const workflow = await this.workflowService.findOne({ _id: workflowTrigger.workflow }) + if (!workflow) { + return + } + + this.logger.log(`Processing chatbot message: ${message.id} for workflow: ${workflow._id}`) + + const workflowSleeps = await this.workflowSleepService.find({ + workflow: workflowTrigger.workflow, + uniqueGroup: message.conversation.id, + }) + + // continue previous conversation + if (workflowSleeps.length > 0) { + void this.continueConversation(workflow, workflowTrigger, workflowSleeps, message) + return + } + + const tags = workflowTrigger.inputs?.tags?.split(',').map((tag) => tag.trim()) ?? [] + const contact = await this.contactService.findOne({ + owner: workflow.owner, + address: message.senderAddress, + }) + if (!contact) { + await this.contactService.createOne({ + owner: workflow.owner, + address: message.senderAddress, + tags, + }) + } else if (workflowTrigger.inputs?.tags) { + const newTags = uniq([...contact.tags, ...tags]) + if (newTags.length !== contact.tags.length) { + await this.contactService.updateById(contact._id, { + tags: contact.tags, + }) + } + } + + const hookTriggerOutputs = { + id: message.id, + outputs: { + [workflowTrigger.id]: message as Record, + trigger: message as Record, + contact: { + address: message.senderAddress, + }, + }, + } + const rootActions = await this.workflowActionService.find({ workflow: workflow._id, isRootAction: true }) + const workflowRun = await this.workflowRunService.createOneByInstantTrigger( + this.chatbotIntegration, + this.chatbotIntegrationTrigger, + workflow, + workflowTrigger, + rootActions.length > 0, + ) + await this.workflowTriggerService.updateById(workflowTrigger._id, { + lastId: message.id, + lastItem: message, + }) + void this.runnerService.runWorkflowActions(rootActions, [hookTriggerOutputs], workflowRun) + } + + async continueConversation( + workflow: Workflow, + workflowTrigger: WorkflowTrigger, + workflowSleeps: WorkflowSleep[], + outputs: XmtpMessageOutput, + ) { + const workflowSleep = workflowSleeps[0] + + // clean up + await this.workflowSleepService.deleteManyNative({ + _id: { + $in: workflowSleeps.map((workflowSleep) => workflowSleep._id), + }, + }) + + this.logger.log(`Continuing chatbot conversation ${workflowSleep.id} for workflow ${workflowTrigger.workflow}`) + + const workflowAction = await this.workflowActionService.findById(workflowSleep.workflowAction.toString()) + const workflowRun = await this.workflowRunService.findById(workflowSleep.workflowRun.toString()) + + if (!workflowAction || !workflowRun) { + this.logger.error(`Missing workflow action or workflow run for workflow sleep ${workflowSleep.id}`) + await this.workflowRunService.updateById(workflowSleep._id, { status: WorkflowRunStatus.failed }) + return + } + + await this.workflowRunService.wakeUpWorkflowRun(workflowRun) + const nextActionInputs = { + ...(workflowSleep.nextActionInputs ?? {}), + [workflowAction.id]: { + ...((workflowSleep.nextActionInputs?.[workflowAction.id] as any) ?? {}), + responseId: outputs.id, + responseContent: outputs.content, + }, + } as Record> + const actions = await this.workflowActionService.findByIds( + workflowAction.nextActions.map((next) => next.action) as Types.ObjectId[], + ) + const promises = actions.map((action) => + this.runnerService.runWorkflowActionsTree(workflow, action, nextActionInputs, workflowRun, workflowSleep.itemId), + ) + void Promise.all(promises).then(() => { + return this.workflowRunService.markWorkflowRunAsCompleted(workflowRun._id) + }) + } + + async processXmtpMessage(message: XmtpMessageOutput, workflowTrigger: WorkflowTrigger) { + await this.workflowUsedIdService.createOne({ + workflow: workflowTrigger.workflow, + triggerId: message.id, + }) + + const workflow = await this.workflowService.findOne({ _id: workflowTrigger.workflow }) + if (!workflow) { + return + } + + this.logger.log(`Processing xmtp message: ${message.id} for workflow: ${workflow._id}`) + + const hookTriggerOutputs = { + id: message.id, + outputs: { + [workflowTrigger.id]: message as Record, + trigger: message as Record, + }, + } + + const rootActions = await this.workflowActionService.find({ workflow: workflow._id, isRootAction: true }) + const workflowRun = await this.workflowRunService.createOneByInstantTrigger( + this.xmtpIntegration, + this.xmtpIntegrationTrigger, + workflow, + workflowTrigger, + rootActions.length > 0, + ) + await this.workflowTriggerService.updateById(workflowTrigger._id, { + lastId: message.id, + lastItem: message, + }) + void this.runnerService.runWorkflowActions(rootActions, [hookTriggerOutputs], workflowRun) + } +} diff --git a/apps/api/src/workflow-triggers/workflow-triggers.module.ts b/apps/api/src/workflow-triggers/workflow-triggers.module.ts index c5f0eab9..0922667f 100644 --- a/apps/api/src/workflow-triggers/workflow-triggers.module.ts +++ b/apps/api/src/workflow-triggers/workflow-triggers.module.ts @@ -5,6 +5,7 @@ import { DefinitionsModule } from '../../../../libs/definitions/src' import { RunnerModule } from '../../../runner/src/runner.module' import { AccountCredentialsModule } from '../account-credentials/account-credentials.module' import { AuthModule } from '../auth/auth.module' +import { ContactsModule } from '../contacts/contacts.module' import { IntegrationAccountsModule } from '../integration-accounts/integration-accounts.module' import { IntegrationTriggersModule } from '../integration-triggers/integration-triggers.module' import { IntegrationsModule } from '../integrations/integrations.module' @@ -13,6 +14,7 @@ import { WorkflowActionsModule } from '../workflow-actions/workflow-actions.modu import { WorkflowRunsModule } from '../workflow-runs/workflow-runs.module' import { WorkflowsModule } from '../workflows/workflows.module' import { ChainJetBotController } from './controllers/chainjetbot.controller' +import { ChatbotController } from './controllers/chatbot.controller' import { HooksController } from './controllers/hooks.controller' import { WorkflowTrigger, WorkflowTriggerAuthorizer } from './entities/workflow-trigger' import { WorkflowUsedId } from './entities/workflow-used-id' @@ -42,9 +44,10 @@ import { WorkflowUsedIdService } from './services/workflow-used-id.service' // TODO remove forwardRef once Runner calls are replaced with queues forwardRef(() => RunnerModule), + ContactsModule, ], providers: [WorkflowTriggerResolver, WorkflowTriggerService, WorkflowTriggerAuthorizer, WorkflowUsedIdService], exports: [WorkflowTriggerService, WorkflowUsedIdService], - controllers: [HooksController, ChainJetBotController], + controllers: [HooksController, ChainJetBotController, ChatbotController], }) export class WorkflowTriggersModule {} diff --git a/apps/blockchain-listener/src/blockchain-listener.module.ts b/apps/blockchain-listener/src/blockchain-listener.module.ts index 65e702da..1056f21c 100644 --- a/apps/blockchain-listener/src/blockchain-listener.module.ts +++ b/apps/blockchain-listener/src/blockchain-listener.module.ts @@ -17,7 +17,6 @@ import { WorkflowTriggersModule } from 'apps/api/src/workflow-triggers/workflow- import { WorkflowsModule } from 'apps/api/src/workflows/workflows.module' import { RunnerModule } from 'apps/runner/src/runner.module' import { BlockchainListenerService } from './blockchain-listener.service' -import { ChatbotListenerService } from './chatbot-listener.service' import { XmtpListenerService } from './xmtp-listener.service' @Module({ @@ -40,6 +39,6 @@ import { XmtpListenerService } from './xmtp-listener.service' UserDatabaseModule, ContactsModule, ], - providers: [BlockchainListenerService, XmtpListenerService, ChatbotListenerService], + providers: [BlockchainListenerService, XmtpListenerService], }) export class BlockchainListenerModule {} diff --git a/apps/blockchain-listener/src/chatbot-listener.service.ts b/apps/blockchain-listener/src/chatbot-listener.service.ts deleted file mode 100644 index a7a0c31b..00000000 --- a/apps/blockchain-listener/src/chatbot-listener.service.ts +++ /dev/null @@ -1,251 +0,0 @@ -import { XmtpMessageOutput, mapXmtpMessageToOutput } from '@app/definitions/integration-definitions/xmtp/xmtp.common' -import { XmtpLib } from '@app/definitions/integration-definitions/xmtp/xmtp.lib' -import { Injectable, Logger } from '@nestjs/common' -import { Interval } from '@nestjs/schedule' -import { Client, DecodedMessage } from '@xmtp/xmtp-js' -import { AccountCredentialService } from 'apps/api/src/account-credentials/services/account-credentials.service' -import { ContactService } from 'apps/api/src/contacts/services/contact.service' -import { IntegrationAccount } from 'apps/api/src/integration-accounts/entities/integration-account' -import { IntegrationAccountService } from 'apps/api/src/integration-accounts/services/integration-account.service' -import { IntegrationTrigger } from 'apps/api/src/integration-triggers/entities/integration-trigger' -import { IntegrationTriggerService } from 'apps/api/src/integration-triggers/services/integration-trigger.service' -import { Integration } from 'apps/api/src/integrations/entities/integration' -import { IntegrationService } from 'apps/api/src/integrations/services/integration.service' -import { WorkflowActionService } from 'apps/api/src/workflow-actions/services/workflow-action.service' -import { WorkflowRunStatus } from 'apps/api/src/workflow-runs/entities/workflow-run-status' -import { WorkflowSleep } from 'apps/api/src/workflow-runs/entities/workflow-sleep' -import { WorkflowRunService } from 'apps/api/src/workflow-runs/services/workflow-run.service' -import { WorkflowSleepService } from 'apps/api/src/workflow-runs/services/workflow-sleep.service' -import { WorkflowTrigger } from 'apps/api/src/workflow-triggers/entities/workflow-trigger' -import { WorkflowTriggerService } from 'apps/api/src/workflow-triggers/services/workflow-trigger.service' -import { WorkflowUsedIdService } from 'apps/api/src/workflow-triggers/services/workflow-used-id.service' -import { Workflow } from 'apps/api/src/workflows/entities/workflow' -import { WorkflowService } from 'apps/api/src/workflows/services/workflow.service' -import { RunnerService } from 'apps/runner/src/services/runner.service' -import { shuffle, uniq } from 'lodash' -import { Types } from 'mongoose' - -@Injectable() -export class ChatbotListenerService { - private logger = new Logger(ChatbotListenerService.name) - - private listeners: { [key: string]: boolean } = {} - - private integration: Integration - private integrationTrigger: IntegrationTrigger - private integrationAccount: IntegrationAccount - - constructor( - private integrationService: IntegrationService, - private integrationTriggerService: IntegrationTriggerService, - private workflowService: WorkflowService, - private workflowTriggerService: WorkflowTriggerService, - private workflowActionService: WorkflowActionService, - private workflowRunService: WorkflowRunService, - private workflowUsedIdService: WorkflowUsedIdService, - private workflowSleepService: WorkflowSleepService, - private runnerService: RunnerService, - private accountCredentialService: AccountCredentialService, - private integrationAccountService: IntegrationAccountService, - private contactService: ContactService, - ) {} - - async onModuleInit() { - this.logger.log(`Starting XMTP events listener`) - await this.fetchIntegrationData() - this.startChatbotListener() - } - - async fetchIntegrationData() { - this.integration = (await this.integrationService.findOne({ key: 'chatbot', version: '1' })) as Integration - if (!this.integration) { - throw new Error(`Chatbot integration not found`) - } - this.integrationTrigger = (await this.integrationTriggerService.findOne({ - key: 'newChatbotMessage', - integration: this.integration.id, - })) as IntegrationTrigger - if (!this.integrationTrigger) { - throw new Error(`Chatbot integration not found`) - } - this.integrationAccount = (await this.integrationAccountService.findOne({ key: 'xmtp' })) as IntegrationAccount - if (!this.integrationAccount) { - throw new Error(`XMTP integration account not found`) - } - } - - // TODO we need the interval to start listening for new triggers after the server has started. - // it could be more efficient if the api notifies when this happens rather than polling every 30 seconds. - @Interval(30 * 1000) - async startChatbotListener() { - if (process.env.XMTP_LISTENER_DISABLED === 'true') { - return - } - - const workflowTriggers = await this.workflowTriggerService.find({ - integrationTrigger: this.integrationTrigger.id, - enabled: true, - planLimited: { $ne: true }, - numberOfActions: { $gt: 0 }, - }) - const triggersWithoutListener = workflowTriggers.filter((trigger) => !this.listeners[trigger.id]) - const shuffledTriggers = shuffle(triggersWithoutListener) - - this.logger.log(`Found ${triggersWithoutListener.length} XMTP new message triggers`) - - // Listen for new messages without blocking the event loop - for (const workflowTrigger of shuffledTriggers) { - this.listenForNewMessages(workflowTrigger) - } - } - - async listenForNewMessages(workflowTrigger: WorkflowTrigger) { - const workflow = await this.workflowService.findOne({ _id: workflowTrigger.workflow }) - if (!workflow) { - return - } - try { - const accountCredentials = await this.accountCredentialService.findOne({ - owner: workflow.owner, - integrationAccount: this.integrationAccount._id, - }) - if (!accountCredentials?.credentials?.keys) { - this.logger.error(`Missing keys for XMTP on workflow ${workflowTrigger.workflow}`) - return - } - const credentials = accountCredentials.credentials - const client = await XmtpLib.getClient(credentials.keys) - - this.listeners[workflowTrigger._id.toString()] = true - - const streams = await client.conversations.streamAllMessages() - - this.logger.log(`Streaming all messages for workflow ${workflowTrigger.workflow}`) - - for await (const message of streams) { - try { - await this.processMessage(workflow, workflowTrigger, message, client) - } catch (e) { - this.logger.error(`Error processing XMTP message for workflow ${workflowTrigger.workflow}: ${e.message}`) - } - } - } catch (e) { - this.logger.error(`Error starting XMTP listener for workflow ${workflowTrigger.workflow}: ${e.message}`) - } - } - - async processMessage(workflow: Workflow, workflowTrigger: WorkflowTrigger, message: DecodedMessage, client: Client) { - // Ignore messages sent by the current user - if (message.senderAddress === client.address) { - return - } - - await this.workflowUsedIdService.createOne({ - workflow: workflowTrigger.workflow, - triggerId: message.id, - }) - - const outputs = mapXmtpMessageToOutput(message) - - const workflowSleeps = await this.workflowSleepService.find({ - workflow: workflowTrigger.workflow, - uniqueGroup: outputs.conversation.id, - }) - - // continue previous conversation - if (workflowSleeps.length > 0) { - void this.continueConversation(workflow, workflowTrigger, workflowSleeps, outputs) - return - } - - const tags = workflowTrigger.inputs?.tags?.split(',').map((tag) => tag.trim()) ?? [] - const contact = await this.contactService.findOne({ - owner: workflow.owner, - address: outputs.senderAddress, - }) - if (!contact) { - await this.contactService.createOne({ - owner: workflow.owner, - address: outputs.senderAddress, - tags, - }) - } else if (workflowTrigger.inputs?.tags) { - const newTags = uniq([...contact.tags, ...tags]) - if (newTags.length !== contact.tags.length) { - await this.contactService.updateById(contact._id, { - tags: contact.tags, - }) - } - } - - const hookTriggerOutputs = { - id: outputs.id, - outputs: { - [workflowTrigger.id]: outputs as Record, - trigger: outputs as Record, - contact: { - address: outputs.senderAddress, - }, - }, - } - const rootActions = await this.workflowActionService.find({ workflow: workflow._id, isRootAction: true }) - const workflowRun = await this.workflowRunService.createOneByInstantTrigger( - this.integration, - this.integrationTrigger, - workflow, - workflowTrigger, - rootActions.length > 0, - ) - await this.workflowTriggerService.updateById(workflowTrigger._id, { - lastId: outputs.id, - lastItem: outputs, - }) - void this.runnerService.runWorkflowActions(rootActions, [hookTriggerOutputs], workflowRun) - } - - async continueConversation( - workflow: Workflow, - workflowTrigger: WorkflowTrigger, - workflowSleeps: WorkflowSleep[], - outputs: XmtpMessageOutput, - ) { - const workflowSleep = workflowSleeps[0] - - // clean up - await this.workflowSleepService.deleteManyNative({ - _id: { - $in: workflowSleeps.map((workflowSleep) => workflowSleep._id), - }, - }) - - this.logger.log(`Continuing chatbot conversation ${workflowSleep.id} for workflow ${workflowTrigger.workflow}`) - - const workflowAction = await this.workflowActionService.findById(workflowSleep.workflowAction.toString()) - const workflowRun = await this.workflowRunService.findById(workflowSleep.workflowRun.toString()) - - if (!workflowAction || !workflowRun) { - this.logger.error(`Missing workflow action or workflow run for workflow sleep ${workflowSleep.id}`) - await this.workflowRunService.updateById(workflowSleep._id, { status: WorkflowRunStatus.failed }) - return - } - - await this.workflowRunService.wakeUpWorkflowRun(workflowRun) - const nextActionInputs = { - ...(workflowSleep.nextActionInputs ?? {}), - [workflowAction.id]: { - ...((workflowSleep.nextActionInputs?.[workflowAction.id] as any) ?? {}), - responseId: outputs.id, - responseContent: outputs.content, - }, - } as Record> - const actions = await this.workflowActionService.findByIds( - workflowAction.nextActions.map((next) => next.action) as Types.ObjectId[], - ) - const promises = actions.map((action) => - this.runnerService.runWorkflowActionsTree(workflow, action, nextActionInputs, workflowRun, workflowSleep.itemId), - ) - void Promise.all(promises).then(() => { - return this.workflowRunService.markWorkflowRunAsCompleted(workflowRun._id) - }) - } -}