diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 97725a8a06..6f597627eb 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -79,10 +79,10 @@ importers: specifier: 0.9.118 version: link:../../packages/common '@ballerine/react-pdf-toolkit': - specifier: ^1.2.130 + specifier: ^1.2.131 version: link:../../packages/react-pdf-toolkit '@ballerine/ui': - specifier: 0.7.169 + specifier: 0.7.171 version: link:../../packages/ui '@ballerine/workflow-browser-sdk': specifier: 0.6.142 @@ -527,7 +527,7 @@ importers: specifier: ^0.9.118 version: link:../../packages/common '@ballerine/ui': - specifier: 0.7.169 + specifier: 0.7.171 version: link:../../packages/ui '@ballerine/workflow-browser-sdk': specifier: 0.6.142 @@ -1525,7 +1525,7 @@ importers: specifier: ^1.1.44 version: link:../config '@ballerine/ui': - specifier: 0.7.169 + specifier: 0.7.171 version: link:../ui '@react-pdf/renderer': specifier: ^3.1.14 diff --git a/services/workflows-service/src/alert/alert-queue.service.ts b/services/workflows-service/src/alert/alert-queue.service.ts index 1c277ed735..441f26bd64 100644 --- a/services/workflows-service/src/alert/alert-queue.service.ts +++ b/services/workflows-service/src/alert/alert-queue.service.ts @@ -1,12 +1,7 @@ import { Inject, Injectable, OnModuleInit } from '@nestjs/common'; -import { Job } from 'bullmq'; import { AppLoggerService } from '@/common/app-logger/app-logger.service'; import { AlertService } from './alert.service'; -import { QueueService } from '@/common/queue/queue.service'; -import { QueueBullboardService } from '@/common/queue/queue-bullboard.service'; -import { BullMQPrometheusService } from '@/common/monitoring/bullmq-prometheus.service'; -import { BULLBOARD_INSTANCE_INJECTION_TOKEN } from '@/common/queue/types'; -import type { BullBoardInjectedInstance } from '@/common/queue/types'; +import type { IQueueService } from '@/common/queue/types'; import { env } from '@/env'; export interface AlertCheckJobData extends Record { @@ -21,53 +16,34 @@ export class AlertQueueService implements OnModuleInit { constructor( private readonly logger: AppLoggerService, private readonly alertService: AlertService, - private readonly queueService: QueueService, - private readonly queueBullboardService: QueueBullboardService, - private readonly bullMQPrometheusService: BullMQPrometheusService, - @Inject(BULLBOARD_INSTANCE_INJECTION_TOKEN) - private bullBoard: BullBoardInjectedInstance, + @Inject('IQueueService') private readonly queueService: IQueueService, ) {} async onModuleInit() { if (!env.QUEUE_SYSTEM_ENABLED) { return; } + await this.setupAlertQueue(); } private async setupAlertQueue() { try { - const queue = this.queueService.getQueue({ - name: this.QUEUE_NAME, - jobOptions: { - attempts: 10, - backoff: { - type: 'exponential', - delay: 10_000, - }, - removeOnComplete: { count: 100, age: 3600 * 24 }, - removeOnFail: false, + await this.queueService.setupJobScheduler( + this.QUEUE_NAME, + this.SCHEDULER_ID, + { every: 60 * 60 * 1000 }, + { + name: 'alert-check', + data: { timestamp: Date.now() }, }, - }); - - this.bullMQPrometheusService.registerQueue(queue); - - if (this.queueService.isWorkerEnabled()) { - this.queueBullboardService.registerQueue(this.bullBoard, queue); - } - - await this.queueService.setupJobScheduler(queue, this.SCHEDULER_ID, { - every: 60 * 60 * 1000, - jobName: 'check-transaction-monitoring-alerts', - data: { timestamp: Date.now() }, - jobOptions: { - attempts: 10, - backoff: { - type: 'exponential', - delay: 10_000, + { + jobOptions: { + attempts: 2, + backoff: { type: 'exponential', delay: 10000 }, }, }, - }); + ); this.registerWorker(); @@ -78,22 +54,21 @@ export class AlertQueueService implements OnModuleInit { } private registerWorker() { - this.queueService.registerWorker( - this.QUEUE_NAME, - async (job: Job) => { - this.logger.log('Processing transaction monitoring alerts check job', { - jobId: job.id, - }); - - await this.alertService.checkAllAlerts(); + this.queueService.registerWorker(this.QUEUE_NAME, this.processAlertCheckJob.bind(this), { + concurrency: 1, + }); + } - this.logger.log('Completed transaction monitoring alerts check', { - jobId: job.id, - }); + private async processAlertCheckJob(job: any) { + this.logger.log('Processing transaction monitoring alerts check job', { jobId: job.id }); + try { + await this.alertService.checkAllAlerts(); + this.logger.log('Completed transaction monitoring alerts check', { jobId: job.id }); - return { success: true, timestamp: Date.now() }; - }, - { concurrency: 1 }, - ); + return { success: true, timestamp: Date.now() }; + } catch (error) { + this.logger.error('Alert check job failed', { jobId: job.id, error }); + throw error; + } } } diff --git a/services/workflows-service/src/app.worker.module.ts b/services/workflows-service/src/app.worker.module.ts index b51eecd63d..863592d9e5 100644 --- a/services/workflows-service/src/app.worker.module.ts +++ b/services/workflows-service/src/app.worker.module.ts @@ -1,11 +1,10 @@ -import { MiddlewareConsumer, Module } from '@nestjs/common'; import { ConfigModule } from '@nestjs/config'; import { ClsModule } from 'nestjs-cls'; import { AnalyticsModule } from '@/common/analytics-logger/analytics.module'; import { AppLoggerModule } from '@/common/app-logger/app-logger.module'; import { QueueModule } from '@/common/queue/queue.module'; -import { configs, env } from '@/env'; +import { configs } from '@/env'; import { validate } from '@/env-validate'; import { SecretsManagerModule } from '@/secrets-manager/secrets-manager.module'; import { SentryModule } from '@/sentry/sentry.module'; @@ -13,7 +12,7 @@ import { WebhooksModule } from '@/webhooks/webhooks.module'; import { HealthModule } from './health/health.module'; import { PrismaModule } from './prisma/prisma.module'; import { AlertModule } from './alert/alert.module'; -import { MetricsAuthMiddleware } from './common/middlewares/metrics-auth.middleware'; +import { Module } from '@nestjs/common'; @Module({ imports: [ @@ -37,8 +36,4 @@ import { MetricsAuthMiddleware } from './common/middlewares/metrics-auth.middlew AlertModule, ], }) -export class WorkerAppModule { - configure(consumer: MiddlewareConsumer) { - consumer.apply(MetricsAuthMiddleware).forRoutes('*'); - } -} +export class WorkerAppModule {} diff --git a/services/workflows-service/src/common/queue/queue-bullboard.service.ts b/services/workflows-service/src/common/queue/queue-bullboard.service.ts index abe1a3c5c9..e28844d65b 100644 --- a/services/workflows-service/src/common/queue/queue-bullboard.service.ts +++ b/services/workflows-service/src/common/queue/queue-bullboard.service.ts @@ -22,14 +22,4 @@ export class QueueBullboardService { this.logger.error(`Error registering queue ${queue.name} with BullBoard`, { error }); } } - - registerQueues(bullBoardInstance: any, queues: Queue[]) { - try { - const adapters = queues.map(queue => new BullMQAdapter(queue)); - bullBoardInstance.boardInstance.setQueues(adapters); - this.logger.log(`Registered ${queues.length} queues with BullBoard`); - } catch (error) { - this.logger.error('Error registering queues with BullBoard', { error }); - } - } } diff --git a/services/workflows-service/src/common/queue/queue.module.ts b/services/workflows-service/src/common/queue/queue.module.ts index 43d6ac2746..89fdebdcec 100644 --- a/services/workflows-service/src/common/queue/queue.module.ts +++ b/services/workflows-service/src/common/queue/queue.module.ts @@ -1,10 +1,37 @@ import { Module } from '@nestjs/common'; -import { QueueService } from './queue.service'; +import { BullMQQueueService } from './queue.service'; import { QueueBullboardService } from './queue-bullboard.service'; import { QueueOtelService } from './otel.service'; +import { MonitoringModule } from '@/common/monitoring/monitoring.module'; +import { BULLBOARD_INSTANCE_INJECTION_TOKEN } from './types'; +import { createBullBoard } from '@bull-board/api'; +import { ExpressAdapter } from '@bull-board/express'; +import { RedisModule } from '../redis/redis.module'; @Module({ - providers: [QueueService, QueueBullboardService, QueueOtelService], - exports: [QueueService, QueueBullboardService, QueueOtelService], + imports: [MonitoringModule, RedisModule], + providers: [ + BullMQQueueService, + { provide: 'IQueueService', useExisting: BullMQQueueService }, + QueueBullboardService, + QueueOtelService, + { + provide: BULLBOARD_INSTANCE_INJECTION_TOKEN, + useFactory: () => { + const serverAdapter = new ExpressAdapter(); + serverAdapter.setBasePath('/api/queues'); + const boardInstance = createBullBoard({ queues: [], serverAdapter }); + + return { boardInstance, serverAdapter }; + }, + }, + ], + exports: [ + BullMQQueueService, + 'IQueueService', + QueueBullboardService, + QueueOtelService, + BULLBOARD_INSTANCE_INJECTION_TOKEN, + ], }) export class QueueModule {} diff --git a/services/workflows-service/src/common/queue/queue.service.ts b/services/workflows-service/src/common/queue/queue.service.ts index f93456fcc0..bb8d273660 100644 --- a/services/workflows-service/src/common/queue/queue.service.ts +++ b/services/workflows-service/src/common/queue/queue.service.ts @@ -1,43 +1,45 @@ -import { Injectable, OnModuleDestroy } from '@nestjs/common'; -import { Queue, Worker, Job } from 'bullmq'; +import { Injectable, OnModuleDestroy, Inject } from '@nestjs/common'; +import { Queue, Worker } from 'bullmq'; import IORedis from 'ioredis'; import { AppLoggerService } from '@/common/app-logger/app-logger.service'; import { env } from '@/env'; -import { QueueOtelService } from './otel.service'; - -export type JobProcessor = (job: Job) => Promise; - -export interface QueueOptions { - name: string; - concurrency?: number; - jobOptions?: { - attempts?: number; - backoff?: { - type: 'exponential' | 'fixed'; - delay: number; - }; - removeOnComplete?: boolean | number | { count: number; age: number }; - removeOnFail?: boolean | number | { count: number; age: number }; - priority?: number; - }; -} +import { RedisService } from '../redis/redis.service'; +import { BullMQPrometheusService } from '@/common/monitoring/bullmq-prometheus.service'; +import type { BullBoardInjectedInstance, IQueueService, QueueOptions } from './types'; +import { QueueBullboardService } from './queue-bullboard.service'; + +const defaultJobOptions = { + attempts: 3, + backoff: { type: 'exponential', delay: 2000 }, + removeOnComplete: { count: 100, age: 3600 * 24 * 7 }, + removeOnFail: false, +}; @Injectable() -export class QueueService implements OnModuleDestroy { - private redisClient: IORedis | null = null; +export class BullMQQueueService implements OnModuleDestroy, IQueueService { + private redisClient: IORedis | null; private queues: Map = new Map(); private workers: Map = new Map(); private readonly shouldProcessJobs: boolean; + async addJob(queueName: string, job_name: string, data: T, opts?: any): Promise { + const queue = this.getQueue(queueName); + + return queue.add(job_name, data, { + priority: opts?.priority, + }); + } + constructor( private readonly logger: AppLoggerService, - private readonly queueOtelService: QueueOtelService, + private readonly redisService: RedisService, + private readonly bullMQPrometheusService: BullMQPrometheusService, + @Inject('BULLBOARD_INSTANCE') private readonly bullBoard?: BullBoardInjectedInstance, + private readonly queueBullboardService?: QueueBullboardService, ) { this.shouldProcessJobs = this.determineIfShouldProcessJobs(); this.logger.log(`Queue worker mode: ${this.shouldProcessJobs ? 'ENABLED' : 'DISABLED'}`); - if (env.QUEUE_SYSTEM_ENABLED) { - this.initRedisConnection(); - } + this.redisClient = this.redisService.client; } private determineIfShouldProcessJobs(): boolean { @@ -56,108 +58,52 @@ export class QueueService implements OnModuleDestroy { return this.shouldProcessJobs; } - private initRedisConnection() { - try { - const redisConfig = { - host: env.REDIS_HOST || 'localhost', - port: env.REDIS_PORT || 6379, - password: env.REDIS_PASSWORD, - maxRetriesPerRequest: null, - }; - - this.redisClient = new IORedis({ - host: redisConfig.host, - port: redisConfig.port, - password: redisConfig.password, - maxRetriesPerRequest: redisConfig.maxRetriesPerRequest, - }); - - this.redisClient.on('error', error => { - this.logger.error('Redis connection error', { error }); - }); - - this.redisClient.on('connect', () => { - this.logger.log('Redis connected successfully'); - }); - - this.logger.log('Redis client initialized'); - } catch (error) { - this.logger.error('Failed to initialize Redis client', { error }); - throw error; + private validateQueueName(queueName: string): void { + if (!queueName || typeof queueName !== 'string' || queueName.trim().length === 0) { + throw new Error('Queue name must be a non-empty string'); } } - getQueue(options: QueueOptions): Queue { + public getQueue(queueName: string): Queue { if (!this.redisClient) { throw new Error('Redis client not initialized'); } - if (this.queues.has(options.name)) { - return this.queues.get(options.name) as unknown as Queue; + if (this.queues.has(queueName)) { + return this.queues.get(queueName) as Queue; } - const queue = new Queue(options.name, { - connection: this.redisClient as any, - defaultJobOptions: { - attempts: options.jobOptions?.attempts ?? 3, - backoff: options.jobOptions?.backoff ?? { - type: options.jobOptions?.backoff?.type ?? 'exponential', - delay: options.jobOptions?.backoff?.delay ?? 5000, - }, - removeOnComplete: options.jobOptions?.removeOnComplete ?? { count: 100, age: 3600 * 24 }, - removeOnFail: options.jobOptions?.removeOnFail ?? false, - }, - }); - - this.queues.set(options.name, queue as Queue); - this.logger.log(`Queue created: ${options.name}`); - - return queue; + throw new Error(`Queue with name '${queueName}' does not exist. Please create it first.`); } - registerWorker( + registerWorker( queueName: string, - processor: JobProcessor, - options: { - concurrency?: number; - forceLocalProcessing?: boolean; - } = {}, - ): Worker | null { + processor: (job: any) => Promise, + options: { concurrency?: number } = {}, + ): void { + this.validateQueueName(queueName); + if (!this.redisClient) { throw new Error('Redis client not initialized'); } - if (!this.shouldProcessJobs && !options.forceLocalProcessing) { + if (!this.shouldProcessJobs) { this.logger.debug( `Skipping worker registration for queue ${queueName} (not a worker instance)`, ); - return null; + + return; } if (this.workers.has(queueName)) { - return this.workers.get(queueName) as unknown as Worker; + return; } - const worker = new Worker( - queueName, - async job => { - try { - return await processor(job); - } catch (error) { - this.logger.error(`Error processing job ${job.id} in queue ${queueName}`, { - error, - jobId: job.id, - queueName, - }); - throw error; - } - }, - { - connection: this.redisClient as any, - concurrency: options.concurrency ?? 1, - autorun: true, - }, - ); + const worker = new Worker(queueName, processor, { + connection: this.redisClient, + concurrency: options.concurrency ?? 1, + autorun: true, + }); worker.on('failed', (job, error) => { this.logger.error(`Job ${job?.id} failed in queue ${queueName}`, { @@ -177,62 +123,77 @@ export class QueueService implements OnModuleDestroy { }); }); - this.workers.set(queueName, worker as unknown as Worker); + this.workers.set(queueName, worker); this.logger.log(`Worker registered for queue: ${queueName}`); + } + + createQueue(queueName: string, options?: QueueOptions): void { + this.validateQueueName(queueName); + + if (this.queues.has(queueName)) { + return; + } + + const mergedJobOptions = { ...defaultJobOptions, ...(options?.jobOptions || {}) }; + const queue = new Queue(queueName, { + connection: this.redisClient as IORedis, + defaultJobOptions: mergedJobOptions, + }); + this.queues.set(queueName, queue); + this.logger.log(`Queue created: ${queueName}`); + + if (this.bullMQPrometheusService) { + this.bullMQPrometheusService.registerQueue(queue); + } - return worker; + if (this.shouldProcessJobs && this.bullBoard && this.queueBullboardService) { + this.queueBullboardService.registerQueue(this.bullBoard, queue); + } + } + + async onModuleDestroy() { + const workerClosePromises = Array.from(this.workers.values()).map(worker => + worker.close().catch(err => this.logger.error(`Error closing worker`, { err })), + ); + + const queueClosePromises = Array.from(this.queues.values()).map(queue => + queue.close().catch(err => this.logger.error(`Error closing queue`, { err })), + ); + + await Promise.all([...workerClosePromises, ...queueClosePromises]); } async setupJobScheduler( - queue: Queue, + queueName: string, schedulerId: string, - options: { - every: number; - data?: T; - jobName?: string; - jobOptions?: { - attempts?: number; - backoff?: { - type: 'exponential' | 'fixed'; - delay: number; - }; - }; + scheduleOpts: { every: number }, + jobOpts: { + name: string; + data: T; }, - ) { + queueOptions?: QueueOptions, + ): Promise { + this.validateQueueName(queueName); try { - const schedulers = await queue.getJobSchedulers(); - const existingScheduler = schedulers.find(s => s.id === schedulerId); - - if (existingScheduler) { - this.logger.log(`Job scheduler already exists: ${schedulerId}`, { - schedulerId, - pattern: existingScheduler.pattern, - every: existingScheduler.every, - }); - return existingScheduler; + if (!this.queues.has(queueName)) { + this.createQueue(queueName, queueOptions); } - const jobName = options.jobName || 'scheduled-job'; + const queue = this.getQueue(queueName); + + const jobName = jobOpts.name; const firstJob = await queue.upsertJobScheduler( schedulerId, - { every: options.every, jobId: schedulerId }, + { every: scheduleOpts.every, jobId: schedulerId }, { name: jobName, - data: options.data || { timestamp: Date.now() }, - opts: { - attempts: options.jobOptions?.attempts || 10, - backoff: options.jobOptions?.backoff || { - type: 'exponential', - delay: 10000, - }, - }, + data: jobOpts.data || { timestamp: Date.now() }, }, ); - this.logger.log(`Created job scheduler: ${schedulerId}`, { schedulerId, - every: options.every, - firstJobId: firstJob?.id, + every: scheduleOpts.every, + jobId: firstJob?.id, }); return firstJob; @@ -241,23 +202,4 @@ export class QueueService implements OnModuleDestroy { throw error; } } - - async onModuleDestroy() { - const workerClosePromises = Array.from(this.workers.values()).map(worker => - worker.close().catch(err => this.logger.error(`Error closing worker`, { err })), - ); - - const queueClosePromises = Array.from(this.queues.values()).map(queue => - queue.close().catch(err => this.logger.error(`Error closing queue`, { err })), - ); - - await Promise.all([...workerClosePromises, ...queueClosePromises]); - - if (this.redisClient) { - await this.redisClient - .quit() - .catch(err => this.logger.error(`Error closing Redis connection`, { err })); - this.redisClient = null; - } - } } diff --git a/services/workflows-service/src/common/queue/types.ts b/services/workflows-service/src/common/queue/types.ts index b8f17f12f0..9b80c2d9a0 100644 --- a/services/workflows-service/src/common/queue/types.ts +++ b/services/workflows-service/src/common/queue/types.ts @@ -7,3 +7,39 @@ export interface BullBoardInjectedInstance { boardInstance: ReturnType; serverAdapter: ExpressAdapter; } + +export interface IQueueService { + addJob(queueName: string, job_name: string, data: T, opts?: any): Promise; + + registerWorker( + queueName: string, + processor: (job: any) => Promise, + options?: { concurrency?: number }, + ): void; + + isWorkerEnabled(): boolean; + createQueue(queueName: string, options?: QueueOptions): void; + setupJobScheduler( + queueName: string, + schedulerId: string, + scheduleOpts: { every: number }, + jobOpts: { + name: string; + data: T; + }, + queueOptions?: QueueOptions, + ): Promise; +} + +export interface QueueOptions { + jobOptions?: { + attempts?: number; + backoff?: { + type: 'exponential' | 'fixed'; + delay: number; + }; + removeOnComplete?: boolean | number | { count: number; age: number }; + removeOnFail?: boolean | number | { count: number; age: number }; + priority?: number; + }; +} diff --git a/services/workflows-service/src/common/redis/redis.module.ts b/services/workflows-service/src/common/redis/redis.module.ts new file mode 100644 index 0000000000..2d3d7608df --- /dev/null +++ b/services/workflows-service/src/common/redis/redis.module.ts @@ -0,0 +1,10 @@ +import { Module } from '@nestjs/common'; +import { RedisService, redisProvider } from './redis.service'; +import { AppLoggerModule } from '@/common/app-logger/app-logger.module'; + +@Module({ + imports: [AppLoggerModule], + providers: [RedisService, redisProvider], + exports: [RedisService, redisProvider], +}) +export class RedisModule {} diff --git a/services/workflows-service/src/common/redis/redis.service.ts b/services/workflows-service/src/common/redis/redis.service.ts new file mode 100644 index 0000000000..4792b368e5 --- /dev/null +++ b/services/workflows-service/src/common/redis/redis.service.ts @@ -0,0 +1,53 @@ +import { Injectable, OnModuleDestroy } from '@nestjs/common'; +import IORedis from 'ioredis'; +import { env } from '@/env'; +import { AppLoggerService } from '@/common/app-logger/app-logger.service'; + +export const REDIS_CLIENT = Symbol('REDIS_CLIENT'); + +@Injectable() +export class RedisService implements OnModuleDestroy { + public readonly client!: IORedis; + + constructor(private readonly logger: AppLoggerService) { + if (!env.QUEUE_SYSTEM_ENABLED) { + Object.defineProperty(this, 'client', { + get: () => { + this.logger.warn('Redis client is not available when QUEUE_SYSTEM_ENABLED is false'); + }, + }); + + return; + } + + const redisConfig = { + host: env.REDIS_HOST, + port: env.REDIS_PORT, + password: env.REDIS_PASSWORD, + maxRetriesPerRequest: null, + ...(env.ENVIRONMENT_NAME !== 'local' ? { tls: {} } : {}), + }; + this.client = new IORedis(redisConfig); + + this.client.on('error', error => { + this.logger.error('Redis connection error', { error }); + }); + + this.client.on('connect', () => { + this.logger.log('Redis connected successfully'); + }); + + this.logger.log('Redis client initialized via RedisService.'); + } + + async onModuleDestroy() { + if (this.client) { + await this.client.quit(); + } + } +} + +export const redisProvider = { + provide: REDIS_CLIENT, + useExisting: RedisService, +}; diff --git a/services/workflows-service/src/events/document-changed-webhook-caller.ts b/services/workflows-service/src/events/document-changed-webhook-caller.ts index 4ca20a4a76..4b4e87e23f 100644 --- a/services/workflows-service/src/events/document-changed-webhook-caller.ts +++ b/services/workflows-service/src/events/document-changed-webhook-caller.ts @@ -84,6 +84,7 @@ export class DocumentChangedWebhookCaller { select: { authenticationConfiguration: true, subscriptions: true, + features: true, }, }); @@ -134,7 +135,7 @@ export class DocumentChangedWebhookCaller { oldDocuments, webhook, webhookSharedSecret, - forceDirect: !customer.features?.WEBHOOK_QUEUE_SYSTEM_ENABLED?.enabled, + forceDirect: customer.features?.WEBHOOK_QUEUE_SYSTEM_ENABLED?.enabled !== true, }); } } diff --git a/services/workflows-service/src/events/workflow-completed-webhook-caller.ts b/services/workflows-service/src/events/workflow-completed-webhook-caller.ts index 53d31bd689..20f214e835 100644 --- a/services/workflows-service/src/events/workflow-completed-webhook-caller.ts +++ b/services/workflows-service/src/events/workflow-completed-webhook-caller.ts @@ -41,6 +41,7 @@ export class WorkflowCompletedWebhookCaller { select: { authenticationConfiguration: true, subscriptions: true, + features: true, }, }); diff --git a/services/workflows-service/src/events/workflow-state-changed-webhook-caller.ts b/services/workflows-service/src/events/workflow-state-changed-webhook-caller.ts index 601192f0cd..0fd1d756cd 100644 --- a/services/workflows-service/src/events/workflow-state-changed-webhook-caller.ts +++ b/services/workflows-service/src/events/workflow-state-changed-webhook-caller.ts @@ -33,6 +33,7 @@ export class WorkflowStateChangedWebhookCaller { select: { authenticationConfiguration: true, subscriptions: true, + features: true, }, }); @@ -51,7 +52,7 @@ export class WorkflowStateChangedWebhookCaller { data, webhook, webhookSharedSecret, - forceDirect: !customer.features?.WEBHOOK_QUEUE_SYSTEM_ENABLED?.enabled, + forceDirect: customer.features?.WEBHOOK_QUEUE_SYSTEM_ENABLED?.enabled !== true, }); } } diff --git a/services/workflows-service/src/main.worker.ts b/services/workflows-service/src/main.worker.ts index 216781de30..917c66ef1f 100644 --- a/services/workflows-service/src/main.worker.ts +++ b/services/workflows-service/src/main.worker.ts @@ -33,7 +33,7 @@ const workerMain = async () => { process.once('SIGINT', () => closeApp('SIGINT')); const configService = app.get(ConfigService); - const port = configService.getOrThrow('PORT'); + const port = configService.getOrThrow('WORKER_PORT'); void app.listen(+port); logger.log(`Listening on port ${port}`); diff --git a/services/workflows-service/src/webhooks/webhooks.module.ts b/services/workflows-service/src/webhooks/webhooks.module.ts index 0d42b099f0..d4137dd55b 100644 --- a/services/workflows-service/src/webhooks/webhooks.module.ts +++ b/services/workflows-service/src/webhooks/webhooks.module.ts @@ -1,5 +1,3 @@ -import { createBullBoard } from '@bull-board/api'; -import { ExpressAdapter } from '@bull-board/express'; import { HttpModule } from '@nestjs/axios'; import { Inject, MiddlewareConsumer, Module } from '@nestjs/common'; @@ -8,26 +6,12 @@ import { QueueModule } from '@/common/queue/queue.module'; import { MonitoringModule } from '@/common/monitoring/monitoring.module'; import { BULLBOARD_INSTANCE_INJECTION_TOKEN } from '@/common/queue/types'; import type { BullBoardInjectedInstance } from '@/common/queue/types'; -import { BullBoardAuthMiddleware } from './bull-board.auth.middleware'; import { WebhooksService } from './webhooks.service'; @Module({ imports: [AppLoggerModule, HttpModule, QueueModule, MonitoringModule], - providers: [ - WebhooksService, - { - provide: BULLBOARD_INSTANCE_INJECTION_TOKEN, - useFactory: (): BullBoardInjectedInstance => { - const serverAdapter = new ExpressAdapter(); - serverAdapter.setBasePath('/api/queues'); - - const boardInstance = createBullBoard({ queues: [], serverAdapter }); - - return { boardInstance, serverAdapter }; - }, - }, - ], - exports: [WebhooksService, BULLBOARD_INSTANCE_INJECTION_TOKEN], + providers: [WebhooksService], + exports: [WebhooksService], }) export class WebhooksModule { constructor( @@ -36,8 +20,6 @@ export class WebhooksModule { ) {} configure(consumer: MiddlewareConsumer) { - consumer - .apply(BullBoardAuthMiddleware, this.bullBoard.serverAdapter.getRouter()) - .forRoutes('/api/queues'); + consumer.apply(this.bullBoard.serverAdapter.getRouter()).forRoutes('/api/queues'); } } diff --git a/services/workflows-service/src/webhooks/webhooks.service.ts b/services/workflows-service/src/webhooks/webhooks.service.ts index c34a89a3cc..00dffbf3b5 100644 --- a/services/workflows-service/src/webhooks/webhooks.service.ts +++ b/services/workflows-service/src/webhooks/webhooks.service.ts @@ -6,11 +6,8 @@ import { isAxiosError, RawAxiosRequestHeaders } from 'axios'; import { Job } from 'bullmq'; import { AppLoggerService } from '@/common/app-logger/app-logger.service'; -import { QueueService } from '@/common/queue/queue.service'; -import { QueueBullboardService } from '@/common/queue/queue-bullboard.service'; import { BULLBOARD_INSTANCE_INJECTION_TOKEN } from '@/common/queue/types'; -import type { BullBoardInjectedInstance } from '@/common/queue/types'; -import { BullMQPrometheusService } from '@/common/monitoring/bullmq-prometheus.service'; +import type { BullBoardInjectedInstance, IQueueService } from '@/common/queue/types'; import { env } from '@/env'; import { WebhookError, @@ -35,9 +32,7 @@ export class WebhooksService implements OnModuleInit { constructor( private readonly logger: AppLoggerService, private readonly httpService: HttpService, - private readonly queueService: QueueService, - private readonly queueBullboardService: QueueBullboardService, - private readonly bullMQPrometheusService: BullMQPrometheusService, + @Inject('IQueueService') private readonly queueService: IQueueService, @Inject(BULLBOARD_INSTANCE_INJECTION_TOKEN) private bullBoard: BullBoardInjectedInstance, ) { @@ -49,8 +44,10 @@ export class WebhooksService implements OnModuleInit { if (!env.QUEUE_SYSTEM_ENABLED) { this.logger.log('Queue system is disabled. Webhooks will be sent directly.'); + return; } + this.logger.log( `Setting up queue system. env.QUEUE_SYSTEM_ENABLED: ${env.QUEUE_SYSTEM_ENABLED}`, ); @@ -59,25 +56,9 @@ export class WebhooksService implements OnModuleInit { private async setupQueueSystem() { try { - const queue = this.queueService.getQueue({ - name: this.QUEUE_NAME, - jobOptions: { - attempts: 5, - backoff: { - type: 'exponential', - delay: 5_000, - }, - removeOnComplete: { count: 1000, age: 3600 * 24 * 7 }, - removeOnFail: false, - }, - }); + this.queueService.createQueue(this.QUEUE_NAME); - this.bullMQPrometheusService.registerQueue(queue); - - if (this.queueService.isWorkerEnabled()) { - this.queueBullboardService.registerQueue(this.bullBoard, queue); - this.registerWorker(); - } + this.registerWorker(); this.queueInitialized = true; this.logger.log('Webhook queue system setup complete'); @@ -88,30 +69,30 @@ export class WebhooksService implements OnModuleInit { } private registerWorker() { - this.queueService.registerWorker( - this.QUEUE_NAME, - async (job: Job) => { - try { - const res = await this.httpService.axiosRef.request(job.data); - return res.data; - } catch (error) { - this.handleWebhookJobError(job, error); - - if (isAxiosError(error)) { - const webhookError = new WebhookError('Webhook request failed'); - webhookError.cause = error; - webhookError.statusCode = error.response?.status; - webhookError.responseData = error.response?.data; - webhookError.headers = error.response?.headers; - throw webhookError; - } - throw error; - } - }, - { - concurrency: 10, - }, - ); + this.queueService.registerWorker(this.QUEUE_NAME, this.processWebhookJob.bind(this), { + concurrency: 10, + }); + } + + private async processWebhookJob(job: Job) { + try { + const res = await this.httpService.axiosRef.request(job.data); + + return res.data; + } catch (error) { + this.handleWebhookJobError(job, error); + + if (isAxiosError(error)) { + const webhookError = new WebhookError('Webhook request failed'); + webhookError.cause = error; + webhookError.statusCode = error.response?.status; + webhookError.responseData = error.response?.data; + webhookError.headers = error.response?.headers; + throw webhookError; + } + + throw error; + } } private handleWebhookJobError(job: Job, error: any) { @@ -191,10 +172,10 @@ export class WebhooksService implements OnModuleInit { data, timeout: timeout ?? 15_000, }; + if (env.QUEUE_SYSTEM_ENABLED && this.queueInitialized && !forceDirect) { try { - const queue = this.queueService.getQueue({ name: this.QUEUE_NAME }); - return await queue.add(name, requestData); + return await this.queueService.addJob(this.QUEUE_NAME, name, requestData); } catch (error) { const enqueueErrorPayload = { message: 'Failed to add webhook job to the queue',