From 2a1d63ce30259d12ed9b654a60fa55963ee696e2 Mon Sep 17 00:00:00 2001 From: Omer Harel Date: Tue, 24 Jun 2025 15:46:26 +0300 Subject: [PATCH 1/9] refactor(workflows-service): remove unnecessary middleware and update port config - Remove MetricsAuthMiddleware configuration from WorkerAppModule - Change retrieved port from 'PORT' to 'WORKER_PORT' in main.worker.ts - Update BullBoardAuthMiddleware usage in WebhooksModule configuration --- services/workflows-service/src/app.worker.module.ts | 6 +----- services/workflows-service/src/main.worker.ts | 2 +- services/workflows-service/src/webhooks/webhooks.module.ts | 5 +---- 3 files changed, 3 insertions(+), 10 deletions(-) diff --git a/services/workflows-service/src/app.worker.module.ts b/services/workflows-service/src/app.worker.module.ts index b51eecd63d..9d026d3c83 100644 --- a/services/workflows-service/src/app.worker.module.ts +++ b/services/workflows-service/src/app.worker.module.ts @@ -37,8 +37,4 @@ import { MetricsAuthMiddleware } from './common/middlewares/metrics-auth.middlew AlertModule, ], }) -export class WorkerAppModule { - configure(consumer: MiddlewareConsumer) { - consumer.apply(MetricsAuthMiddleware).forRoutes('*'); - } -} +export class WorkerAppModule {} diff --git a/services/workflows-service/src/main.worker.ts b/services/workflows-service/src/main.worker.ts index 216781de30..917c66ef1f 100644 --- a/services/workflows-service/src/main.worker.ts +++ b/services/workflows-service/src/main.worker.ts @@ -33,7 +33,7 @@ const workerMain = async () => { process.once('SIGINT', () => closeApp('SIGINT')); const configService = app.get(ConfigService); - const port = configService.getOrThrow('PORT'); + const port = configService.getOrThrow('WORKER_PORT'); void app.listen(+port); logger.log(`Listening on port ${port}`); diff --git a/services/workflows-service/src/webhooks/webhooks.module.ts b/services/workflows-service/src/webhooks/webhooks.module.ts index 0d42b099f0..4ee16dba5f 100644 --- a/services/workflows-service/src/webhooks/webhooks.module.ts +++ b/services/workflows-service/src/webhooks/webhooks.module.ts @@ -8,7 +8,6 @@ import { QueueModule } from '@/common/queue/queue.module'; import { MonitoringModule } from '@/common/monitoring/monitoring.module'; import { BULLBOARD_INSTANCE_INJECTION_TOKEN } from '@/common/queue/types'; import type { BullBoardInjectedInstance } from '@/common/queue/types'; -import { BullBoardAuthMiddleware } from './bull-board.auth.middleware'; import { WebhooksService } from './webhooks.service'; @Module({ @@ -36,8 +35,6 @@ export class WebhooksModule { ) {} configure(consumer: MiddlewareConsumer) { - consumer - .apply(BullBoardAuthMiddleware, this.bullBoard.serverAdapter.getRouter()) - .forRoutes('/api/queues'); + consumer.apply(this.bullBoard.serverAdapter.getRouter()).forRoutes('/api/queues'); } } From c0dbd6464a86fb197531679babdb89160c700f78 Mon Sep 17 00:00:00 2001 From: Omer Harel Date: Tue, 24 Jun 2025 15:59:36 +0300 Subject: [PATCH 2/9] fix(queue): refactor redis connection handling in QueueService - Remove Redis connection initialization from constructor - Inject Redis client through dependency injection - Improve code structure by eliminating unnecessary methods --- .../src/alert/alert-queue.service.ts | 1 + .../src/common/queue/queue.module.ts | 5 ++- .../src/common/queue/queue.service.ts | 43 +++---------------- 3 files changed, 11 insertions(+), 38 deletions(-) diff --git a/services/workflows-service/src/alert/alert-queue.service.ts b/services/workflows-service/src/alert/alert-queue.service.ts index 1c277ed735..3c68125557 100644 --- a/services/workflows-service/src/alert/alert-queue.service.ts +++ b/services/workflows-service/src/alert/alert-queue.service.ts @@ -32,6 +32,7 @@ export class AlertQueueService implements OnModuleInit { if (!env.QUEUE_SYSTEM_ENABLED) { return; } + await this.setupAlertQueue(); } diff --git a/services/workflows-service/src/common/queue/queue.module.ts b/services/workflows-service/src/common/queue/queue.module.ts index 43d6ac2746..ce381b3412 100644 --- a/services/workflows-service/src/common/queue/queue.module.ts +++ b/services/workflows-service/src/common/queue/queue.module.ts @@ -2,9 +2,10 @@ import { Module } from '@nestjs/common'; import { QueueService } from './queue.service'; import { QueueBullboardService } from './queue-bullboard.service'; import { QueueOtelService } from './otel.service'; +import { redisProvider } from './redis.provider'; @Module({ - providers: [QueueService, QueueBullboardService, QueueOtelService], - exports: [QueueService, QueueBullboardService, QueueOtelService], + providers: [QueueService, QueueBullboardService, QueueOtelService, redisProvider], + exports: [QueueService, QueueBullboardService, QueueOtelService, redisProvider], }) export class QueueModule {} diff --git a/services/workflows-service/src/common/queue/queue.service.ts b/services/workflows-service/src/common/queue/queue.service.ts index f93456fcc0..84c729852d 100644 --- a/services/workflows-service/src/common/queue/queue.service.ts +++ b/services/workflows-service/src/common/queue/queue.service.ts @@ -1,9 +1,10 @@ -import { Injectable, OnModuleDestroy } from '@nestjs/common'; +import { Injectable, OnModuleDestroy, Inject } from '@nestjs/common'; import { Queue, Worker, Job } from 'bullmq'; import IORedis from 'ioredis'; import { AppLoggerService } from '@/common/app-logger/app-logger.service'; import { env } from '@/env'; import { QueueOtelService } from './otel.service'; +import { REDIS_CLIENT } from './redis.provider'; export type JobProcessor = (job: Job) => Promise; @@ -24,7 +25,7 @@ export interface QueueOptions { @Injectable() export class QueueService implements OnModuleDestroy { - private redisClient: IORedis | null = null; + private redisClient: IORedis | null; private queues: Map = new Map(); private workers: Map = new Map(); private readonly shouldProcessJobs: boolean; @@ -32,12 +33,11 @@ export class QueueService implements OnModuleDestroy { constructor( private readonly logger: AppLoggerService, private readonly queueOtelService: QueueOtelService, + @Inject(REDIS_CLIENT) redisClient: IORedis | null, ) { this.shouldProcessJobs = this.determineIfShouldProcessJobs(); this.logger.log(`Queue worker mode: ${this.shouldProcessJobs ? 'ENABLED' : 'DISABLED'}`); - if (env.QUEUE_SYSTEM_ENABLED) { - this.initRedisConnection(); - } + this.redisClient = redisClient; } private determineIfShouldProcessJobs(): boolean { @@ -56,37 +56,6 @@ export class QueueService implements OnModuleDestroy { return this.shouldProcessJobs; } - private initRedisConnection() { - try { - const redisConfig = { - host: env.REDIS_HOST || 'localhost', - port: env.REDIS_PORT || 6379, - password: env.REDIS_PASSWORD, - maxRetriesPerRequest: null, - }; - - this.redisClient = new IORedis({ - host: redisConfig.host, - port: redisConfig.port, - password: redisConfig.password, - maxRetriesPerRequest: redisConfig.maxRetriesPerRequest, - }); - - this.redisClient.on('error', error => { - this.logger.error('Redis connection error', { error }); - }); - - this.redisClient.on('connect', () => { - this.logger.log('Redis connected successfully'); - }); - - this.logger.log('Redis client initialized'); - } catch (error) { - this.logger.error('Failed to initialize Redis client', { error }); - throw error; - } - } - getQueue(options: QueueOptions): Queue { if (!this.redisClient) { throw new Error('Redis client not initialized'); @@ -131,6 +100,7 @@ export class QueueService implements OnModuleDestroy { this.logger.debug( `Skipping worker registration for queue ${queueName} (not a worker instance)`, ); + return null; } @@ -209,6 +179,7 @@ export class QueueService implements OnModuleDestroy { pattern: existingScheduler.pattern, every: existingScheduler.every, }); + return existingScheduler; } From 1b9e40a030e50ea79c45a3ebcbb9889aa06f10c1 Mon Sep 17 00:00:00 2001 From: Omer Harel Date: Tue, 24 Jun 2025 16:28:10 +0300 Subject: [PATCH 3/9] refactor(queue): streamline queue registration process and improve imports - Remove redundant queue registration lines in AlertQueueService - Integrate BullBoard instance creation within QueueModule - Simplify dependency handling in WebhooksService and related modules --- .../src/alert/alert-queue.service.ts | 6 ---- .../src/common/queue/queue.module.ts | 30 +++++++++++++++++-- .../src/common/queue/queue.service.ts | 14 +++++++++ .../src/common/queue/types.ts | 11 +++++++ .../src/webhooks/webhooks.module.ts | 19 ++---------- .../src/webhooks/webhooks.service.ts | 9 ++++-- 6 files changed, 61 insertions(+), 28 deletions(-) diff --git a/services/workflows-service/src/alert/alert-queue.service.ts b/services/workflows-service/src/alert/alert-queue.service.ts index 3c68125557..7dd242f26d 100644 --- a/services/workflows-service/src/alert/alert-queue.service.ts +++ b/services/workflows-service/src/alert/alert-queue.service.ts @@ -51,12 +51,6 @@ export class AlertQueueService implements OnModuleInit { }, }); - this.bullMQPrometheusService.registerQueue(queue); - - if (this.queueService.isWorkerEnabled()) { - this.queueBullboardService.registerQueue(this.bullBoard, queue); - } - await this.queueService.setupJobScheduler(queue, this.SCHEDULER_ID, { every: 60 * 60 * 1000, jobName: 'check-transaction-monitoring-alerts', diff --git a/services/workflows-service/src/common/queue/queue.module.ts b/services/workflows-service/src/common/queue/queue.module.ts index ce381b3412..b093c5f0b3 100644 --- a/services/workflows-service/src/common/queue/queue.module.ts +++ b/services/workflows-service/src/common/queue/queue.module.ts @@ -3,9 +3,35 @@ import { QueueService } from './queue.service'; import { QueueBullboardService } from './queue-bullboard.service'; import { QueueOtelService } from './otel.service'; import { redisProvider } from './redis.provider'; +import { MonitoringModule } from '@/common/monitoring/monitoring.module'; +import { BULLBOARD_INSTANCE_INJECTION_TOKEN } from './types'; +import { createBullBoard } from '@bull-board/api'; +import { ExpressAdapter } from '@bull-board/express'; @Module({ - providers: [QueueService, QueueBullboardService, QueueOtelService, redisProvider], - exports: [QueueService, QueueBullboardService, QueueOtelService, redisProvider], + imports: [MonitoringModule], + providers: [ + QueueService, + QueueBullboardService, + QueueOtelService, + redisProvider, + { + provide: BULLBOARD_INSTANCE_INJECTION_TOKEN, + useFactory: () => { + const serverAdapter = new ExpressAdapter(); + serverAdapter.setBasePath('/api/queues'); + const boardInstance = createBullBoard({ queues: [], serverAdapter }); + + return { boardInstance, serverAdapter }; + }, + }, + ], + exports: [ + QueueService, + QueueBullboardService, + QueueOtelService, + redisProvider, + BULLBOARD_INSTANCE_INJECTION_TOKEN, + ], }) export class QueueModule {} diff --git a/services/workflows-service/src/common/queue/queue.service.ts b/services/workflows-service/src/common/queue/queue.service.ts index 84c729852d..32827ea789 100644 --- a/services/workflows-service/src/common/queue/queue.service.ts +++ b/services/workflows-service/src/common/queue/queue.service.ts @@ -5,6 +5,9 @@ import { AppLoggerService } from '@/common/app-logger/app-logger.service'; import { env } from '@/env'; import { QueueOtelService } from './otel.service'; import { REDIS_CLIENT } from './redis.provider'; +import { BullMQPrometheusService } from '@/common/monitoring/bullmq-prometheus.service'; +import type { BullBoardInjectedInstance } from './types'; +import { QueueBullboardService } from './queue-bullboard.service'; export type JobProcessor = (job: Job) => Promise; @@ -34,6 +37,9 @@ export class QueueService implements OnModuleDestroy { private readonly logger: AppLoggerService, private readonly queueOtelService: QueueOtelService, @Inject(REDIS_CLIENT) redisClient: IORedis | null, + private readonly bullMQPrometheusService: BullMQPrometheusService, + @Inject('BULLBOARD_INSTANCE') private readonly bullBoard?: BullBoardInjectedInstance, + private readonly queueBullboardService?: QueueBullboardService, ) { this.shouldProcessJobs = this.determineIfShouldProcessJobs(); this.logger.log(`Queue worker mode: ${this.shouldProcessJobs ? 'ENABLED' : 'DISABLED'}`); @@ -81,6 +87,14 @@ export class QueueService implements OnModuleDestroy { this.queues.set(options.name, queue as Queue); this.logger.log(`Queue created: ${options.name}`); + if (this.bullMQPrometheusService) { + this.bullMQPrometheusService.registerQueue(queue); + } + + if (this.shouldProcessJobs && this.bullBoard && this.queueBullboardService) { + this.queueBullboardService.registerQueue(this.bullBoard, queue); + } + return queue; } diff --git a/services/workflows-service/src/common/queue/types.ts b/services/workflows-service/src/common/queue/types.ts index b8f17f12f0..81b9b76880 100644 --- a/services/workflows-service/src/common/queue/types.ts +++ b/services/workflows-service/src/common/queue/types.ts @@ -7,3 +7,14 @@ export interface BullBoardInjectedInstance { boardInstance: ReturnType; serverAdapter: ExpressAdapter; } + +export const bullBoardProvider = { + provide: BULLBOARD_INSTANCE_INJECTION_TOKEN, + useFactory: (): BullBoardInjectedInstance => { + const serverAdapter = new ExpressAdapter(); + serverAdapter.setBasePath('/api/queues'); + const boardInstance = createBullBoard({ queues: [], serverAdapter }); + + return { boardInstance, serverAdapter }; + }, +}; diff --git a/services/workflows-service/src/webhooks/webhooks.module.ts b/services/workflows-service/src/webhooks/webhooks.module.ts index 4ee16dba5f..d4137dd55b 100644 --- a/services/workflows-service/src/webhooks/webhooks.module.ts +++ b/services/workflows-service/src/webhooks/webhooks.module.ts @@ -1,5 +1,3 @@ -import { createBullBoard } from '@bull-board/api'; -import { ExpressAdapter } from '@bull-board/express'; import { HttpModule } from '@nestjs/axios'; import { Inject, MiddlewareConsumer, Module } from '@nestjs/common'; @@ -12,21 +10,8 @@ import { WebhooksService } from './webhooks.service'; @Module({ imports: [AppLoggerModule, HttpModule, QueueModule, MonitoringModule], - providers: [ - WebhooksService, - { - provide: BULLBOARD_INSTANCE_INJECTION_TOKEN, - useFactory: (): BullBoardInjectedInstance => { - const serverAdapter = new ExpressAdapter(); - serverAdapter.setBasePath('/api/queues'); - - const boardInstance = createBullBoard({ queues: [], serverAdapter }); - - return { boardInstance, serverAdapter }; - }, - }, - ], - exports: [WebhooksService, BULLBOARD_INSTANCE_INJECTION_TOKEN], + providers: [WebhooksService], + exports: [WebhooksService], }) export class WebhooksModule { constructor( diff --git a/services/workflows-service/src/webhooks/webhooks.service.ts b/services/workflows-service/src/webhooks/webhooks.service.ts index c34a89a3cc..c5e45e9e46 100644 --- a/services/workflows-service/src/webhooks/webhooks.service.ts +++ b/services/workflows-service/src/webhooks/webhooks.service.ts @@ -49,8 +49,10 @@ export class WebhooksService implements OnModuleInit { if (!env.QUEUE_SYSTEM_ENABLED) { this.logger.log('Queue system is disabled. Webhooks will be sent directly.'); + return; } + this.logger.log( `Setting up queue system. env.QUEUE_SYSTEM_ENABLED: ${env.QUEUE_SYSTEM_ENABLED}`, ); @@ -72,10 +74,7 @@ export class WebhooksService implements OnModuleInit { }, }); - this.bullMQPrometheusService.registerQueue(queue); - if (this.queueService.isWorkerEnabled()) { - this.queueBullboardService.registerQueue(this.bullBoard, queue); this.registerWorker(); } @@ -93,6 +92,7 @@ export class WebhooksService implements OnModuleInit { async (job: Job) => { try { const res = await this.httpService.axiosRef.request(job.data); + return res.data; } catch (error) { this.handleWebhookJobError(job, error); @@ -105,6 +105,7 @@ export class WebhooksService implements OnModuleInit { webhookError.headers = error.response?.headers; throw webhookError; } + throw error; } }, @@ -191,9 +192,11 @@ export class WebhooksService implements OnModuleInit { data, timeout: timeout ?? 15_000, }; + if (env.QUEUE_SYSTEM_ENABLED && this.queueInitialized && !forceDirect) { try { const queue = this.queueService.getQueue({ name: this.QUEUE_NAME }); + return await queue.add(name, requestData); } catch (error) { const enqueueErrorPayload = { From 1ac3cadc8d02fba412f009eb9cd79b44d8721c02 Mon Sep 17 00:00:00 2001 From: Omer Harel Date: Tue, 24 Jun 2025 18:09:30 +0300 Subject: [PATCH 4/9] refactor(queue): improve alert queue service and worker registration - Simplify dependency injection for services in AlertQueueService - Replace queue setup logic with createQueue method and refactor worker processing - Eliminate unused code and streamline error handling in webhook job processing --- .../src/alert/alert-queue.service.ts | 62 +++----- .../common/queue/queue-bullboard.service.ts | 10 -- .../src/common/queue/queue.module.ts | 2 + .../src/common/queue/queue.service.ts | 133 +++++------------- .../src/common/queue/types.ts | 11 -- .../src/webhooks/webhooks.service.ts | 65 +++------ 6 files changed, 81 insertions(+), 202 deletions(-) diff --git a/services/workflows-service/src/alert/alert-queue.service.ts b/services/workflows-service/src/alert/alert-queue.service.ts index 7dd242f26d..6782d56bc8 100644 --- a/services/workflows-service/src/alert/alert-queue.service.ts +++ b/services/workflows-service/src/alert/alert-queue.service.ts @@ -1,10 +1,7 @@ import { Inject, Injectable, OnModuleInit } from '@nestjs/common'; -import { Job } from 'bullmq'; import { AppLoggerService } from '@/common/app-logger/app-logger.service'; import { AlertService } from './alert.service'; -import { QueueService } from '@/common/queue/queue.service'; -import { QueueBullboardService } from '@/common/queue/queue-bullboard.service'; -import { BullMQPrometheusService } from '@/common/monitoring/bullmq-prometheus.service'; +import type { IQueueService } from '@/common/queue/queue.interface'; import { BULLBOARD_INSTANCE_INJECTION_TOKEN } from '@/common/queue/types'; import type { BullBoardInjectedInstance } from '@/common/queue/types'; import { env } from '@/env'; @@ -19,11 +16,9 @@ export class AlertQueueService implements OnModuleInit { private readonly SCHEDULER_ID = 'transaction-monitoring-alert-check'; constructor( - private readonly logger: AppLoggerService, - private readonly alertService: AlertService, - private readonly queueService: QueueService, - private readonly queueBullboardService: QueueBullboardService, - private readonly bullMQPrometheusService: BullMQPrometheusService, + @Inject(AppLoggerService) private readonly logger: AppLoggerService, + @Inject(AlertService) private readonly alertService: AlertService, + @Inject('IQueueService') private readonly queueService: IQueueService, @Inject(BULLBOARD_INSTANCE_INJECTION_TOKEN) private bullBoard: BullBoardInjectedInstance, ) {} @@ -38,32 +33,16 @@ export class AlertQueueService implements OnModuleInit { private async setupAlertQueue() { try { - const queue = this.queueService.getQueue({ + this.queueService.createQueue(this.QUEUE_NAME, { name: this.QUEUE_NAME, jobOptions: { attempts: 10, - backoff: { - type: 'exponential', - delay: 10_000, - }, + backoff: { type: 'exponential', delay: 10000 }, removeOnComplete: { count: 100, age: 3600 * 24 }, removeOnFail: false, }, }); - await this.queueService.setupJobScheduler(queue, this.SCHEDULER_ID, { - every: 60 * 60 * 1000, - jobName: 'check-transaction-monitoring-alerts', - data: { timestamp: Date.now() }, - jobOptions: { - attempts: 10, - backoff: { - type: 'exponential', - delay: 10_000, - }, - }, - }); - this.registerWorker(); this.logger.log('Alert queue system setup complete'); @@ -73,22 +52,21 @@ export class AlertQueueService implements OnModuleInit { } private registerWorker() { - this.queueService.registerWorker( - this.QUEUE_NAME, - async (job: Job) => { - this.logger.log('Processing transaction monitoring alerts check job', { - jobId: job.id, - }); - - await this.alertService.checkAllAlerts(); + this.queueService.registerWorker(this.QUEUE_NAME, this.processAlertCheckJob.bind(this), { + concurrency: 1, + }); + } - this.logger.log('Completed transaction monitoring alerts check', { - jobId: job.id, - }); + private async processAlertCheckJob(job: any) { + this.logger.log('Processing transaction monitoring alerts check job', { jobId: job.id }); + try { + await this.alertService.checkAllAlerts(); + this.logger.log('Completed transaction monitoring alerts check', { jobId: job.id }); - return { success: true, timestamp: Date.now() }; - }, - { concurrency: 1 }, - ); + return { success: true, timestamp: Date.now() }; + } catch (error) { + this.logger.error('Alert check job failed', { jobId: job.id, error }); + throw error; + } } } diff --git a/services/workflows-service/src/common/queue/queue-bullboard.service.ts b/services/workflows-service/src/common/queue/queue-bullboard.service.ts index abe1a3c5c9..e28844d65b 100644 --- a/services/workflows-service/src/common/queue/queue-bullboard.service.ts +++ b/services/workflows-service/src/common/queue/queue-bullboard.service.ts @@ -22,14 +22,4 @@ export class QueueBullboardService { this.logger.error(`Error registering queue ${queue.name} with BullBoard`, { error }); } } - - registerQueues(bullBoardInstance: any, queues: Queue[]) { - try { - const adapters = queues.map(queue => new BullMQAdapter(queue)); - bullBoardInstance.boardInstance.setQueues(adapters); - this.logger.log(`Registered ${queues.length} queues with BullBoard`); - } catch (error) { - this.logger.error('Error registering queues with BullBoard', { error }); - } - } } diff --git a/services/workflows-service/src/common/queue/queue.module.ts b/services/workflows-service/src/common/queue/queue.module.ts index b093c5f0b3..2fd036dfca 100644 --- a/services/workflows-service/src/common/queue/queue.module.ts +++ b/services/workflows-service/src/common/queue/queue.module.ts @@ -12,6 +12,7 @@ import { ExpressAdapter } from '@bull-board/express'; imports: [MonitoringModule], providers: [ QueueService, + { provide: 'IQueueService', useExisting: QueueService }, QueueBullboardService, QueueOtelService, redisProvider, @@ -28,6 +29,7 @@ import { ExpressAdapter } from '@bull-board/express'; ], exports: [ QueueService, + 'IQueueService', QueueBullboardService, QueueOtelService, redisProvider, diff --git a/services/workflows-service/src/common/queue/queue.service.ts b/services/workflows-service/src/common/queue/queue.service.ts index 32827ea789..c7342557dc 100644 --- a/services/workflows-service/src/common/queue/queue.service.ts +++ b/services/workflows-service/src/common/queue/queue.service.ts @@ -1,5 +1,5 @@ import { Injectable, OnModuleDestroy, Inject } from '@nestjs/common'; -import { Queue, Worker, Job } from 'bullmq'; +import { Queue, Worker } from 'bullmq'; import IORedis from 'ioredis'; import { AppLoggerService } from '@/common/app-logger/app-logger.service'; import { env } from '@/env'; @@ -9,8 +9,6 @@ import { BullMQPrometheusService } from '@/common/monitoring/bullmq-prometheus.s import type { BullBoardInjectedInstance } from './types'; import { QueueBullboardService } from './queue-bullboard.service'; -export type JobProcessor = (job: Job) => Promise; - export interface QueueOptions { name: string; concurrency?: number; @@ -62,16 +60,16 @@ export class QueueService implements OnModuleDestroy { return this.shouldProcessJobs; } - getQueue(options: QueueOptions): Queue { + private getQueue(options: QueueOptions): Queue { if (!this.redisClient) { throw new Error('Redis client not initialized'); } if (this.queues.has(options.name)) { - return this.queues.get(options.name) as unknown as Queue; + return this.queues.get(options.name) as unknown as Queue; } - const queue = new Queue(options.name, { + const queue = new Queue(options.name, { connection: this.redisClient as any, defaultJobOptions: { attempts: options.jobOptions?.attempts ?? 3, @@ -98,50 +96,32 @@ export class QueueService implements OnModuleDestroy { return queue; } - registerWorker( + registerWorker( queueName: string, - processor: JobProcessor, - options: { - concurrency?: number; - forceLocalProcessing?: boolean; - } = {}, - ): Worker | null { + processor: (job: any) => Promise, + options: { concurrency?: number } = {}, + ): void { if (!this.redisClient) { throw new Error('Redis client not initialized'); } - if (!this.shouldProcessJobs && !options.forceLocalProcessing) { + if (!this.shouldProcessJobs) { this.logger.debug( `Skipping worker registration for queue ${queueName} (not a worker instance)`, ); - return null; + return; } if (this.workers.has(queueName)) { - return this.workers.get(queueName) as unknown as Worker; + return; } - const worker = new Worker( - queueName, - async job => { - try { - return await processor(job); - } catch (error) { - this.logger.error(`Error processing job ${job.id} in queue ${queueName}`, { - error, - jobId: job.id, - queueName, - }); - throw error; - } - }, - { - connection: this.redisClient as any, - concurrency: options.concurrency ?? 1, - autorun: true, - }, - ); + const worker = new Worker(queueName, processor, { + connection: this.redisClient as any, + concurrency: options.concurrency ?? 1, + autorun: true, + }); worker.on('failed', (job, error) => { this.logger.error(`Job ${job?.id} failed in queue ${queueName}`, { @@ -161,69 +141,34 @@ export class QueueService implements OnModuleDestroy { }); }); - this.workers.set(queueName, worker as unknown as Worker); + this.workers.set(queueName, worker as unknown as Worker); this.logger.log(`Worker registered for queue: ${queueName}`); - - return worker; } - async setupJobScheduler( - queue: Queue, - schedulerId: string, - options: { - every: number; - data?: T; - jobName?: string; - jobOptions?: { - attempts?: number; - backoff?: { - type: 'exponential' | 'fixed'; - delay: number; - }; - }; - }, - ) { - try { - const schedulers = await queue.getJobSchedulers(); - const existingScheduler = schedulers.find(s => s.id === schedulerId); - - if (existingScheduler) { - this.logger.log(`Job scheduler already exists: ${schedulerId}`, { - schedulerId, - pattern: existingScheduler.pattern, - every: existingScheduler.every, - }); - - return existingScheduler; - } - - const jobName = options.jobName || 'scheduled-job'; - const firstJob = await queue.upsertJobScheduler( - schedulerId, - { every: options.every, jobId: schedulerId }, - { - name: jobName, - data: options.data || { timestamp: Date.now() }, - opts: { - attempts: options.jobOptions?.attempts || 10, - backoff: options.jobOptions?.backoff || { - type: 'exponential', - delay: 10000, - }, - }, - }, - ); + createQueue(queueName: string, options?: QueueOptions): void { + if (this.queues.has(queueName)) { + // Optionally update options if needed + return; + } - this.logger.log(`Created job scheduler: ${schedulerId}`, { - schedulerId, - every: options.every, - firstJobId: firstJob?.id, - }); + const queue = new Queue(queueName, { + connection: this.redisClient as any, + defaultJobOptions: options?.jobOptions ?? { + attempts: 3, + backoff: { type: 'exponential', delay: 5000 }, + removeOnComplete: { count: 100, age: 3600 * 24 }, + removeOnFail: false, + }, + }); + this.queues.set(queueName, queue as Queue); + this.logger.log(`Queue created: ${queueName}`); + + if (this.bullMQPrometheusService) { + this.bullMQPrometheusService.registerQueue(queue); + } - return firstJob; - } catch (error) { - this.logger.error(`Failed to set up job scheduler: ${schedulerId}`, { error }); - throw error; + if (this.shouldProcessJobs && this.bullBoard && this.queueBullboardService) { + this.queueBullboardService.registerQueue(this.bullBoard, queue); } } diff --git a/services/workflows-service/src/common/queue/types.ts b/services/workflows-service/src/common/queue/types.ts index 81b9b76880..b8f17f12f0 100644 --- a/services/workflows-service/src/common/queue/types.ts +++ b/services/workflows-service/src/common/queue/types.ts @@ -7,14 +7,3 @@ export interface BullBoardInjectedInstance { boardInstance: ReturnType; serverAdapter: ExpressAdapter; } - -export const bullBoardProvider = { - provide: BULLBOARD_INSTANCE_INJECTION_TOKEN, - useFactory: (): BullBoardInjectedInstance => { - const serverAdapter = new ExpressAdapter(); - serverAdapter.setBasePath('/api/queues'); - const boardInstance = createBullBoard({ queues: [], serverAdapter }); - - return { boardInstance, serverAdapter }; - }, -}; diff --git a/services/workflows-service/src/webhooks/webhooks.service.ts b/services/workflows-service/src/webhooks/webhooks.service.ts index c5e45e9e46..7c71d3641e 100644 --- a/services/workflows-service/src/webhooks/webhooks.service.ts +++ b/services/workflows-service/src/webhooks/webhooks.service.ts @@ -6,17 +6,11 @@ import { isAxiosError, RawAxiosRequestHeaders } from 'axios'; import { Job } from 'bullmq'; import { AppLoggerService } from '@/common/app-logger/app-logger.service'; -import { QueueService } from '@/common/queue/queue.service'; -import { QueueBullboardService } from '@/common/queue/queue-bullboard.service'; import { BULLBOARD_INSTANCE_INJECTION_TOKEN } from '@/common/queue/types'; import type { BullBoardInjectedInstance } from '@/common/queue/types'; -import { BullMQPrometheusService } from '@/common/monitoring/bullmq-prometheus.service'; import { env } from '@/env'; -import { - WebhookError, - type OutgoingWebhookJobData, - type OutgoingWebhookPayloads, -} from './types/webhook'; +import { type OutgoingWebhookJobData, type OutgoingWebhookPayloads } from './types/webhook'; +import type { IQueueService } from '@/common/queue/queue.interface'; const captureWebhookFailureWithSentry = (errorPayload: Record) => { Sentry.captureException( @@ -35,9 +29,7 @@ export class WebhooksService implements OnModuleInit { constructor( private readonly logger: AppLoggerService, private readonly httpService: HttpService, - private readonly queueService: QueueService, - private readonly queueBullboardService: QueueBullboardService, - private readonly bullMQPrometheusService: BullMQPrometheusService, + @Inject('IQueueService') private readonly queueService: IQueueService, @Inject(BULLBOARD_INSTANCE_INJECTION_TOKEN) private bullBoard: BullBoardInjectedInstance, ) { @@ -61,22 +53,17 @@ export class WebhooksService implements OnModuleInit { private async setupQueueSystem() { try { - const queue = this.queueService.getQueue({ + this.queueService.createQueue(this.QUEUE_NAME, { name: this.QUEUE_NAME, jobOptions: { attempts: 5, - backoff: { - type: 'exponential', - delay: 5_000, - }, + backoff: { type: 'exponential', delay: 5000 }, removeOnComplete: { count: 1000, age: 3600 * 24 * 7 }, removeOnFail: false, }, }); - if (this.queueService.isWorkerEnabled()) { - this.registerWorker(); - } + this.registerWorker(); this.queueInitialized = true; this.logger.log('Webhook queue system setup complete'); @@ -89,32 +76,22 @@ export class WebhooksService implements OnModuleInit { private registerWorker() { this.queueService.registerWorker( this.QUEUE_NAME, - async (job: Job) => { - try { - const res = await this.httpService.axiosRef.request(job.data); - - return res.data; - } catch (error) { - this.handleWebhookJobError(job, error); - - if (isAxiosError(error)) { - const webhookError = new WebhookError('Webhook request failed'); - webhookError.cause = error; - webhookError.statusCode = error.response?.status; - webhookError.responseData = error.response?.data; - webhookError.headers = error.response?.headers; - throw webhookError; - } - - throw error; - } - }, - { - concurrency: 10, - }, + this.processWebhookJob.bind(this), + { concurrency: 10 }, ); } + private async processWebhookJob(job: Job) { + try { + const res = await this.httpService.axiosRef.request(job.data); + + return res.data; + } catch (error) { + this.handleWebhookJobError(job, error); + throw error; + } + } + private handleWebhookJobError(job: Job, error: any) { const isLastAttempt = job.attemptsMade >= (job.opts.attempts || 1); @@ -195,9 +172,7 @@ export class WebhooksService implements OnModuleInit { if (env.QUEUE_SYSTEM_ENABLED && this.queueInitialized && !forceDirect) { try { - const queue = this.queueService.getQueue({ name: this.QUEUE_NAME }); - - return await queue.add(name, requestData); + return await this.queueService.addJob(this.QUEUE_NAME, requestData); } catch (error) { const enqueueErrorPayload = { message: 'Failed to add webhook job to the queue', From 58cbbeeda9f4facedd15173c42e800be6db68f52 Mon Sep 17 00:00:00 2001 From: Omer Harel Date: Wed, 25 Jun 2025 10:41:48 +0300 Subject: [PATCH 5/9] feat(queue): implement job scheduler in alert queue service - Add job scheduling setup for alert-check jobs with specific intervals - Update relevant imports and refactor the QueueService to utilize RedisService - Enhance IQueueService interface with additional documentation and job-related methods --- pnpm-lock.yaml | 8 +-- .../src/alert/alert-queue.service.ts | 10 +++- .../src/app.worker.module.ts | 5 +- .../src/common/queue/queue.module.ts | 6 +- .../src/common/queue/queue.service.ts | 58 +++++++++++++++---- .../src/common/queue/types.ts | 23 ++++++++ .../src/webhooks/webhooks.service.ts | 19 +++++- 7 files changed, 102 insertions(+), 27 deletions(-) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 97725a8a06..6f597627eb 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -79,10 +79,10 @@ importers: specifier: 0.9.118 version: link:../../packages/common '@ballerine/react-pdf-toolkit': - specifier: ^1.2.130 + specifier: ^1.2.131 version: link:../../packages/react-pdf-toolkit '@ballerine/ui': - specifier: 0.7.169 + specifier: 0.7.171 version: link:../../packages/ui '@ballerine/workflow-browser-sdk': specifier: 0.6.142 @@ -527,7 +527,7 @@ importers: specifier: ^0.9.118 version: link:../../packages/common '@ballerine/ui': - specifier: 0.7.169 + specifier: 0.7.171 version: link:../../packages/ui '@ballerine/workflow-browser-sdk': specifier: 0.6.142 @@ -1525,7 +1525,7 @@ importers: specifier: ^1.1.44 version: link:../config '@ballerine/ui': - specifier: 0.7.169 + specifier: 0.7.171 version: link:../ui '@react-pdf/renderer': specifier: ^3.1.14 diff --git a/services/workflows-service/src/alert/alert-queue.service.ts b/services/workflows-service/src/alert/alert-queue.service.ts index 6782d56bc8..a1498b7346 100644 --- a/services/workflows-service/src/alert/alert-queue.service.ts +++ b/services/workflows-service/src/alert/alert-queue.service.ts @@ -1,9 +1,8 @@ import { Inject, Injectable, OnModuleInit } from '@nestjs/common'; import { AppLoggerService } from '@/common/app-logger/app-logger.service'; import { AlertService } from './alert.service'; -import type { IQueueService } from '@/common/queue/queue.interface'; +import type { IQueueService, BullBoardInjectedInstance } from '@/common/queue/types'; import { BULLBOARD_INSTANCE_INJECTION_TOKEN } from '@/common/queue/types'; -import type { BullBoardInjectedInstance } from '@/common/queue/types'; import { env } from '@/env'; export interface AlertCheckJobData extends Record { @@ -43,6 +42,13 @@ export class AlertQueueService implements OnModuleInit { }, }); + const queue = (this.queueService as any).getQueue({ name: this.QUEUE_NAME }); + await (this.queueService as any).setupJobScheduler(queue, this.SCHEDULER_ID, { + every: 60 * 60 * 1000, + jobName: 'alert-check', + data: { timestamp: Date.now() }, + }); + this.registerWorker(); this.logger.log('Alert queue system setup complete'); diff --git a/services/workflows-service/src/app.worker.module.ts b/services/workflows-service/src/app.worker.module.ts index 9d026d3c83..863592d9e5 100644 --- a/services/workflows-service/src/app.worker.module.ts +++ b/services/workflows-service/src/app.worker.module.ts @@ -1,11 +1,10 @@ -import { MiddlewareConsumer, Module } from '@nestjs/common'; import { ConfigModule } from '@nestjs/config'; import { ClsModule } from 'nestjs-cls'; import { AnalyticsModule } from '@/common/analytics-logger/analytics.module'; import { AppLoggerModule } from '@/common/app-logger/app-logger.module'; import { QueueModule } from '@/common/queue/queue.module'; -import { configs, env } from '@/env'; +import { configs } from '@/env'; import { validate } from '@/env-validate'; import { SecretsManagerModule } from '@/secrets-manager/secrets-manager.module'; import { SentryModule } from '@/sentry/sentry.module'; @@ -13,7 +12,7 @@ import { WebhooksModule } from '@/webhooks/webhooks.module'; import { HealthModule } from './health/health.module'; import { PrismaModule } from './prisma/prisma.module'; import { AlertModule } from './alert/alert.module'; -import { MetricsAuthMiddleware } from './common/middlewares/metrics-auth.middleware'; +import { Module } from '@nestjs/common'; @Module({ imports: [ diff --git a/services/workflows-service/src/common/queue/queue.module.ts b/services/workflows-service/src/common/queue/queue.module.ts index 2fd036dfca..a8196c25d1 100644 --- a/services/workflows-service/src/common/queue/queue.module.ts +++ b/services/workflows-service/src/common/queue/queue.module.ts @@ -2,20 +2,19 @@ import { Module } from '@nestjs/common'; import { QueueService } from './queue.service'; import { QueueBullboardService } from './queue-bullboard.service'; import { QueueOtelService } from './otel.service'; -import { redisProvider } from './redis.provider'; import { MonitoringModule } from '@/common/monitoring/monitoring.module'; import { BULLBOARD_INSTANCE_INJECTION_TOKEN } from './types'; import { createBullBoard } from '@bull-board/api'; import { ExpressAdapter } from '@bull-board/express'; +import { RedisModule } from '../redis/redis.module'; @Module({ - imports: [MonitoringModule], + imports: [MonitoringModule, RedisModule], providers: [ QueueService, { provide: 'IQueueService', useExisting: QueueService }, QueueBullboardService, QueueOtelService, - redisProvider, { provide: BULLBOARD_INSTANCE_INJECTION_TOKEN, useFactory: () => { @@ -32,7 +31,6 @@ import { ExpressAdapter } from '@bull-board/express'; 'IQueueService', QueueBullboardService, QueueOtelService, - redisProvider, BULLBOARD_INSTANCE_INJECTION_TOKEN, ], }) diff --git a/services/workflows-service/src/common/queue/queue.service.ts b/services/workflows-service/src/common/queue/queue.service.ts index c7342557dc..52e48098e6 100644 --- a/services/workflows-service/src/common/queue/queue.service.ts +++ b/services/workflows-service/src/common/queue/queue.service.ts @@ -3,8 +3,7 @@ import { Queue, Worker } from 'bullmq'; import IORedis from 'ioredis'; import { AppLoggerService } from '@/common/app-logger/app-logger.service'; import { env } from '@/env'; -import { QueueOtelService } from './otel.service'; -import { REDIS_CLIENT } from './redis.provider'; +import { RedisService } from '../redis/redis.service'; import { BullMQPrometheusService } from '@/common/monitoring/bullmq-prometheus.service'; import type { BullBoardInjectedInstance } from './types'; import { QueueBullboardService } from './queue-bullboard.service'; @@ -33,15 +32,14 @@ export class QueueService implements OnModuleDestroy { constructor( private readonly logger: AppLoggerService, - private readonly queueOtelService: QueueOtelService, - @Inject(REDIS_CLIENT) redisClient: IORedis | null, + private readonly redisService: RedisService, private readonly bullMQPrometheusService: BullMQPrometheusService, @Inject('BULLBOARD_INSTANCE') private readonly bullBoard?: BullBoardInjectedInstance, private readonly queueBullboardService?: QueueBullboardService, ) { this.shouldProcessJobs = this.determineIfShouldProcessJobs(); this.logger.log(`Queue worker mode: ${this.shouldProcessJobs ? 'ENABLED' : 'DISABLED'}`); - this.redisClient = redisClient; + this.redisClient = this.redisService.client; } private determineIfShouldProcessJobs(): boolean { @@ -147,7 +145,6 @@ export class QueueService implements OnModuleDestroy { createQueue(queueName: string, options?: QueueOptions): void { if (this.queues.has(queueName)) { - // Optionally update options if needed return; } @@ -182,12 +179,51 @@ export class QueueService implements OnModuleDestroy { ); await Promise.all([...workerClosePromises, ...queueClosePromises]); + } + + async setupJobScheduler( + queue: Queue, + schedulerId: string, + options: { + every: number; + data?: T; + jobName?: string; + jobOptions?: { + attempts?: number; + backoff?: { + type: 'exponential' | 'fixed'; + delay: number; + }; + }; + }, + ) { + try { + const jobName = options.jobName || 'scheduled-job'; + const firstJob = await queue.upsertJobScheduler( + schedulerId, + { every: options.every, jobId: schedulerId }, + { + name: jobName, + data: options.data || { timestamp: Date.now() }, + opts: { + attempts: options.jobOptions?.attempts || 10, + backoff: options.jobOptions?.backoff || { + type: 'exponential', + delay: 10000, + }, + }, + }, + ); + this.logger.log(`Created job scheduler: ${schedulerId}`, { + schedulerId, + every: options.every, + jobId: firstJob?.id, + }); - if (this.redisClient) { - await this.redisClient - .quit() - .catch(err => this.logger.error(`Error closing Redis connection`, { err })); - this.redisClient = null; + return firstJob; + } catch (error) { + this.logger.error(`Failed to set up job scheduler: ${schedulerId}`, { error }); + throw error; } } } diff --git a/services/workflows-service/src/common/queue/types.ts b/services/workflows-service/src/common/queue/types.ts index b8f17f12f0..a7d4b6c256 100644 --- a/services/workflows-service/src/common/queue/types.ts +++ b/services/workflows-service/src/common/queue/types.ts @@ -1,5 +1,6 @@ import { createBullBoard } from '@bull-board/api'; import { ExpressAdapter } from '@bull-board/express'; +import type { QueueOptions } from './queue.service'; export const BULLBOARD_INSTANCE_INJECTION_TOKEN = 'BULLBOARD_INSTANCE'; @@ -7,3 +8,25 @@ export interface BullBoardInjectedInstance { boardInstance: ReturnType; serverAdapter: ExpressAdapter; } +export interface IQueueService { + /** + * Add a job to the queue. + */ + addJob(queueName: string, data: T, opts?: any): Promise; + + /** + * Register a worker for a queue. The processor should be a standalone function for testability and abstraction. + */ + registerWorker( + queueName: string, + processor: (job: any) => Promise, + options?: { concurrency?: number }, + ): void; + + isWorkerEnabled(): boolean; + + /** + * Explicitly create/configure a queue with options. + */ + createQueue(queueName: string, options?: QueueOptions): void; +} diff --git a/services/workflows-service/src/webhooks/webhooks.service.ts b/services/workflows-service/src/webhooks/webhooks.service.ts index 7c71d3641e..a07ef0477d 100644 --- a/services/workflows-service/src/webhooks/webhooks.service.ts +++ b/services/workflows-service/src/webhooks/webhooks.service.ts @@ -7,10 +7,13 @@ import { Job } from 'bullmq'; import { AppLoggerService } from '@/common/app-logger/app-logger.service'; import { BULLBOARD_INSTANCE_INJECTION_TOKEN } from '@/common/queue/types'; -import type { BullBoardInjectedInstance } from '@/common/queue/types'; +import type { BullBoardInjectedInstance, IQueueService } from '@/common/queue/types'; import { env } from '@/env'; -import { type OutgoingWebhookJobData, type OutgoingWebhookPayloads } from './types/webhook'; -import type { IQueueService } from '@/common/queue/queue.interface'; +import { + WebhookError, + type OutgoingWebhookJobData, + type OutgoingWebhookPayloads, +} from './types/webhook'; const captureWebhookFailureWithSentry = (errorPayload: Record) => { Sentry.captureException( @@ -88,6 +91,16 @@ export class WebhooksService implements OnModuleInit { return res.data; } catch (error) { this.handleWebhookJobError(job, error); + + if (isAxiosError(error)) { + const webhookError = new WebhookError('Webhook request failed'); + webhookError.cause = error; + webhookError.statusCode = error.response?.status; + webhookError.responseData = error.response?.data; + webhookError.headers = error.response?.headers; + throw webhookError; + } + throw error; } } From 9d128ef6a3b1f51c4fc80206e7a629f6c59ad255 Mon Sep 17 00:00:00 2001 From: Omer Harel Date: Wed, 25 Jun 2025 12:58:34 +0300 Subject: [PATCH 6/9] refactor(queue): simplify queue configuration and job setup - Remove unnecessary injection of BullBoard instance in AlertQueueService - Update job options to reduce default attempts for better performance - Enhance queue service methods for clearer job scheduling and worker registration --- .../src/alert/alert-queue.service.ts | 27 +++-- .../src/common/queue/queue.module.ts | 8 +- .../src/common/queue/queue.service.ts | 108 ++++++------------ .../src/common/queue/types.ts | 39 +++++-- .../src/common/redis/redis.module.ts | 10 ++ .../src/common/redis/redis.service.ts | 49 ++++++++ .../src/webhooks/webhooks.service.ts | 4 +- 7 files changed, 141 insertions(+), 104 deletions(-) create mode 100644 services/workflows-service/src/common/redis/redis.module.ts create mode 100644 services/workflows-service/src/common/redis/redis.service.ts diff --git a/services/workflows-service/src/alert/alert-queue.service.ts b/services/workflows-service/src/alert/alert-queue.service.ts index a1498b7346..1c3f67f13c 100644 --- a/services/workflows-service/src/alert/alert-queue.service.ts +++ b/services/workflows-service/src/alert/alert-queue.service.ts @@ -1,8 +1,7 @@ import { Inject, Injectable, OnModuleInit } from '@nestjs/common'; import { AppLoggerService } from '@/common/app-logger/app-logger.service'; import { AlertService } from './alert.service'; -import type { IQueueService, BullBoardInjectedInstance } from '@/common/queue/types'; -import { BULLBOARD_INSTANCE_INJECTION_TOKEN } from '@/common/queue/types'; +import type { IQueueService } from '@/common/queue/types'; import { env } from '@/env'; export interface AlertCheckJobData extends Record { @@ -15,11 +14,9 @@ export class AlertQueueService implements OnModuleInit { private readonly SCHEDULER_ID = 'transaction-monitoring-alert-check'; constructor( - @Inject(AppLoggerService) private readonly logger: AppLoggerService, - @Inject(AlertService) private readonly alertService: AlertService, + private readonly logger: AppLoggerService, + private readonly alertService: AlertService, @Inject('IQueueService') private readonly queueService: IQueueService, - @Inject(BULLBOARD_INSTANCE_INJECTION_TOKEN) - private bullBoard: BullBoardInjectedInstance, ) {} async onModuleInit() { @@ -35,19 +32,21 @@ export class AlertQueueService implements OnModuleInit { this.queueService.createQueue(this.QUEUE_NAME, { name: this.QUEUE_NAME, jobOptions: { - attempts: 10, + attempts: 3, backoff: { type: 'exponential', delay: 10000 }, removeOnComplete: { count: 100, age: 3600 * 24 }, removeOnFail: false, }, }); - - const queue = (this.queueService as any).getQueue({ name: this.QUEUE_NAME }); - await (this.queueService as any).setupJobScheduler(queue, this.SCHEDULER_ID, { - every: 60 * 60 * 1000, - jobName: 'alert-check', - data: { timestamp: Date.now() }, - }); + await this.queueService.setupJobScheduler( + this.QUEUE_NAME, + this.SCHEDULER_ID, + { every: 60 * 60 * 1000 }, + { + name: 'alert-check', + data: { timestamp: Date.now() }, + }, + ); this.registerWorker(); diff --git a/services/workflows-service/src/common/queue/queue.module.ts b/services/workflows-service/src/common/queue/queue.module.ts index a8196c25d1..89fdebdcec 100644 --- a/services/workflows-service/src/common/queue/queue.module.ts +++ b/services/workflows-service/src/common/queue/queue.module.ts @@ -1,5 +1,5 @@ import { Module } from '@nestjs/common'; -import { QueueService } from './queue.service'; +import { BullMQQueueService } from './queue.service'; import { QueueBullboardService } from './queue-bullboard.service'; import { QueueOtelService } from './otel.service'; import { MonitoringModule } from '@/common/monitoring/monitoring.module'; @@ -11,8 +11,8 @@ import { RedisModule } from '../redis/redis.module'; @Module({ imports: [MonitoringModule, RedisModule], providers: [ - QueueService, - { provide: 'IQueueService', useExisting: QueueService }, + BullMQQueueService, + { provide: 'IQueueService', useExisting: BullMQQueueService }, QueueBullboardService, QueueOtelService, { @@ -27,7 +27,7 @@ import { RedisModule } from '../redis/redis.module'; }, ], exports: [ - QueueService, + BullMQQueueService, 'IQueueService', QueueBullboardService, QueueOtelService, diff --git a/services/workflows-service/src/common/queue/queue.service.ts b/services/workflows-service/src/common/queue/queue.service.ts index 52e48098e6..4234dfbf00 100644 --- a/services/workflows-service/src/common/queue/queue.service.ts +++ b/services/workflows-service/src/common/queue/queue.service.ts @@ -5,31 +5,24 @@ import { AppLoggerService } from '@/common/app-logger/app-logger.service'; import { env } from '@/env'; import { RedisService } from '../redis/redis.service'; import { BullMQPrometheusService } from '@/common/monitoring/bullmq-prometheus.service'; -import type { BullBoardInjectedInstance } from './types'; +import type { BullBoardInjectedInstance, IQueueService, QueueOptions } from './types'; import { QueueBullboardService } from './queue-bullboard.service'; -export interface QueueOptions { - name: string; - concurrency?: number; - jobOptions?: { - attempts?: number; - backoff?: { - type: 'exponential' | 'fixed'; - delay: number; - }; - removeOnComplete?: boolean | number | { count: number; age: number }; - removeOnFail?: boolean | number | { count: number; age: number }; - priority?: number; - }; -} - @Injectable() -export class QueueService implements OnModuleDestroy { +export class BullMQQueueService implements OnModuleDestroy, IQueueService { private redisClient: IORedis | null; private queues: Map = new Map(); private workers: Map = new Map(); private readonly shouldProcessJobs: boolean; + async addJob(queueName: string, job_name: string, data: T, opts?: any): Promise { + const queue = this.getQueue(queueName); + + return queue.add(job_name, data, { + priority: opts?.priority, + }); + } + constructor( private readonly logger: AppLoggerService, private readonly redisService: RedisService, @@ -58,40 +51,16 @@ export class QueueService implements OnModuleDestroy { return this.shouldProcessJobs; } - private getQueue(options: QueueOptions): Queue { + public getQueue(queueName: string): Queue { if (!this.redisClient) { throw new Error('Redis client not initialized'); } - if (this.queues.has(options.name)) { - return this.queues.get(options.name) as unknown as Queue; - } - - const queue = new Queue(options.name, { - connection: this.redisClient as any, - defaultJobOptions: { - attempts: options.jobOptions?.attempts ?? 3, - backoff: options.jobOptions?.backoff ?? { - type: options.jobOptions?.backoff?.type ?? 'exponential', - delay: options.jobOptions?.backoff?.delay ?? 5000, - }, - removeOnComplete: options.jobOptions?.removeOnComplete ?? { count: 100, age: 3600 * 24 }, - removeOnFail: options.jobOptions?.removeOnFail ?? false, - }, - }); - - this.queues.set(options.name, queue as Queue); - this.logger.log(`Queue created: ${options.name}`); - - if (this.bullMQPrometheusService) { - this.bullMQPrometheusService.registerQueue(queue); - } - - if (this.shouldProcessJobs && this.bullBoard && this.queueBullboardService) { - this.queueBullboardService.registerQueue(this.bullBoard, queue); + if (this.queues.has(queueName)) { + return this.queues.get(queueName) as Queue; } - return queue; + throw new Error(`Queue with name '${queueName}' does not exist. Please create it first.`); } registerWorker( @@ -115,8 +84,8 @@ export class QueueService implements OnModuleDestroy { return; } - const worker = new Worker(queueName, processor, { - connection: this.redisClient as any, + const worker = new Worker(queueName, processor, { + connection: this.redisClient, concurrency: options.concurrency ?? 1, autorun: true, }); @@ -139,17 +108,17 @@ export class QueueService implements OnModuleDestroy { }); }); - this.workers.set(queueName, worker as unknown as Worker); + this.workers.set(queueName, worker); this.logger.log(`Worker registered for queue: ${queueName}`); } - createQueue(queueName: string, options?: QueueOptions): void { + createQueue(queueName: string, options?: QueueOptions): void { if (this.queues.has(queueName)) { return; } - const queue = new Queue(queueName, { - connection: this.redisClient as any, + const queue = new Queue(queueName, { + connection: this.redisClient as IORedis, defaultJobOptions: options?.jobOptions ?? { attempts: 3, backoff: { type: 'exponential', delay: 5000 }, @@ -157,7 +126,7 @@ export class QueueService implements OnModuleDestroy { removeOnFail: false, }, }); - this.queues.set(queueName, queue as Queue); + this.queues.set(queueName, queue); this.logger.log(`Queue created: ${queueName}`); if (this.bullMQPrometheusService) { @@ -182,41 +151,36 @@ export class QueueService implements OnModuleDestroy { } async setupJobScheduler( - queue: Queue, + queueName: string, schedulerId: string, - options: { - every: number; - data?: T; - jobName?: string; - jobOptions?: { - attempts?: number; - backoff?: { - type: 'exponential' | 'fixed'; - delay: number; - }; - }; + scheduleOpts: { every: number }, + jobOpts: { + name: string; + data: T; + opts?: any; }, - ) { + ): Promise { try { - const jobName = options.jobName || 'scheduled-job'; + const queue = this.getQueue(queueName); + const jobName = jobOpts.name; const firstJob = await queue.upsertJobScheduler( schedulerId, - { every: options.every, jobId: schedulerId }, + { every: scheduleOpts.every, jobId: schedulerId }, { name: jobName, - data: options.data || { timestamp: Date.now() }, + data: jobOpts.data || { timestamp: Date.now() }, opts: { - attempts: options.jobOptions?.attempts || 10, - backoff: options.jobOptions?.backoff || { + attempts: jobOpts.opts?.attempts || 3, + backoff: jobOpts.opts?.backoff || { type: 'exponential', - delay: 10000, + delay: 3000, }, }, }, ); this.logger.log(`Created job scheduler: ${schedulerId}`, { schedulerId, - every: options.every, + every: scheduleOpts.every, jobId: firstJob?.id, }); diff --git a/services/workflows-service/src/common/queue/types.ts b/services/workflows-service/src/common/queue/types.ts index a7d4b6c256..26c16471d9 100644 --- a/services/workflows-service/src/common/queue/types.ts +++ b/services/workflows-service/src/common/queue/types.ts @@ -1,6 +1,5 @@ import { createBullBoard } from '@bull-board/api'; import { ExpressAdapter } from '@bull-board/express'; -import type { QueueOptions } from './queue.service'; export const BULLBOARD_INSTANCE_INJECTION_TOKEN = 'BULLBOARD_INSTANCE'; @@ -8,15 +7,10 @@ export interface BullBoardInjectedInstance { boardInstance: ReturnType; serverAdapter: ExpressAdapter; } + export interface IQueueService { - /** - * Add a job to the queue. - */ - addJob(queueName: string, data: T, opts?: any): Promise; + addJob(queueName: string, job_name: string, data: T, opts?: any): Promise; - /** - * Register a worker for a queue. The processor should be a standalone function for testability and abstraction. - */ registerWorker( queueName: string, processor: (job: any) => Promise, @@ -24,9 +18,30 @@ export interface IQueueService { ): void; isWorkerEnabled(): boolean; - - /** - * Explicitly create/configure a queue with options. - */ createQueue(queueName: string, options?: QueueOptions): void; + setupJobScheduler( + queueName: string, + schedulerId: string, + scheduleOpts: { every: number }, + jobOpts: { + name: string; + data: T; + opts?: any; + }, + ): Promise; +} + +export interface QueueOptions { + name: string; + concurrency?: number; + jobOptions?: { + attempts?: number; + backoff?: { + type: 'exponential' | 'fixed'; + delay: number; + }; + removeOnComplete?: boolean | number | { count: number; age: number }; + removeOnFail?: boolean | number | { count: number; age: number }; + priority?: number; + }; } diff --git a/services/workflows-service/src/common/redis/redis.module.ts b/services/workflows-service/src/common/redis/redis.module.ts new file mode 100644 index 0000000000..2d3d7608df --- /dev/null +++ b/services/workflows-service/src/common/redis/redis.module.ts @@ -0,0 +1,10 @@ +import { Module } from '@nestjs/common'; +import { RedisService, redisProvider } from './redis.service'; +import { AppLoggerModule } from '@/common/app-logger/app-logger.module'; + +@Module({ + imports: [AppLoggerModule], + providers: [RedisService, redisProvider], + exports: [RedisService, redisProvider], +}) +export class RedisModule {} diff --git a/services/workflows-service/src/common/redis/redis.service.ts b/services/workflows-service/src/common/redis/redis.service.ts new file mode 100644 index 0000000000..b69631ae95 --- /dev/null +++ b/services/workflows-service/src/common/redis/redis.service.ts @@ -0,0 +1,49 @@ +import { Injectable, OnModuleDestroy } from '@nestjs/common'; +import IORedis from 'ioredis'; +import { env } from '@/env'; +import { AppLoggerService } from '@/common/app-logger/app-logger.service'; + +export const REDIS_CLIENT = Symbol('REDIS_CLIENT'); + +@Injectable() +export class RedisService implements OnModuleDestroy { + public readonly client: IORedis; + + constructor(private readonly logger: AppLoggerService) { + if (!env.QUEUE_SYSTEM_ENABLED) { + this.client = null as any; + + return; + } + + const redisConfig = { + host: env.REDIS_HOST, + port: env.REDIS_PORT, + password: env.REDIS_PASSWORD, + maxRetriesPerRequest: null, + ...(env.ENVIRONMENT_NAME !== 'local' ? { tls: {} } : {}), + }; + this.client = new IORedis(redisConfig); + + this.client.on('error', error => { + this.logger.error('Redis connection error', { error }); + }); + + this.client.on('connect', () => { + this.logger.log('Redis connected successfully'); + }); + + this.logger.log('Redis client initialized via RedisService.'); + } + + async onModuleDestroy() { + if (this.client) { + await this.client.quit(); + } + } +} + +export const redisProvider = { + provide: REDIS_CLIENT, + useExisting: RedisService, +}; diff --git a/services/workflows-service/src/webhooks/webhooks.service.ts b/services/workflows-service/src/webhooks/webhooks.service.ts index a07ef0477d..915eca9cec 100644 --- a/services/workflows-service/src/webhooks/webhooks.service.ts +++ b/services/workflows-service/src/webhooks/webhooks.service.ts @@ -59,7 +59,7 @@ export class WebhooksService implements OnModuleInit { this.queueService.createQueue(this.QUEUE_NAME, { name: this.QUEUE_NAME, jobOptions: { - attempts: 5, + attempts: 3, backoff: { type: 'exponential', delay: 5000 }, removeOnComplete: { count: 1000, age: 3600 * 24 * 7 }, removeOnFail: false, @@ -185,7 +185,7 @@ export class WebhooksService implements OnModuleInit { if (env.QUEUE_SYSTEM_ENABLED && this.queueInitialized && !forceDirect) { try { - return await this.queueService.addJob(this.QUEUE_NAME, requestData); + return await this.queueService.addJob(this.QUEUE_NAME, name, requestData); } catch (error) { const enqueueErrorPayload = { message: 'Failed to add webhook job to the queue', From f76dddf1286b22d85ec6741c8b3b2258431e0b34 Mon Sep 17 00:00:00 2001 From: Omer Harel Date: Thu, 26 Jun 2025 11:43:54 +0300 Subject: [PATCH 7/9] fix(webhook): correct forceDirect logic in webhook callers - Update forceDirect assignment to explicitly check for enabled state - Ensure consistent behavior for document and workflow state changes --- .../src/events/document-changed-webhook-caller.ts | 2 +- .../src/events/workflow-state-changed-webhook-caller.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/services/workflows-service/src/events/document-changed-webhook-caller.ts b/services/workflows-service/src/events/document-changed-webhook-caller.ts index 4ca20a4a76..487a21da4d 100644 --- a/services/workflows-service/src/events/document-changed-webhook-caller.ts +++ b/services/workflows-service/src/events/document-changed-webhook-caller.ts @@ -134,7 +134,7 @@ export class DocumentChangedWebhookCaller { oldDocuments, webhook, webhookSharedSecret, - forceDirect: !customer.features?.WEBHOOK_QUEUE_SYSTEM_ENABLED?.enabled, + forceDirect: customer.features?.WEBHOOK_QUEUE_SYSTEM_ENABLED?.enabled !== true, }); } } diff --git a/services/workflows-service/src/events/workflow-state-changed-webhook-caller.ts b/services/workflows-service/src/events/workflow-state-changed-webhook-caller.ts index 601192f0cd..7c37592c76 100644 --- a/services/workflows-service/src/events/workflow-state-changed-webhook-caller.ts +++ b/services/workflows-service/src/events/workflow-state-changed-webhook-caller.ts @@ -51,7 +51,7 @@ export class WorkflowStateChangedWebhookCaller { data, webhook, webhookSharedSecret, - forceDirect: !customer.features?.WEBHOOK_QUEUE_SYSTEM_ENABLED?.enabled, + forceDirect: customer.features?.WEBHOOK_QUEUE_SYSTEM_ENABLED?.enabled !== true, }); } } From 91380117f11f7d851abcc7493f4e3bf2880fc766 Mon Sep 17 00:00:00 2001 From: Omer Harel Date: Thu, 26 Jun 2025 17:06:27 +0300 Subject: [PATCH 8/9] add feautres --- .../src/events/document-changed-webhook-caller.ts | 1 + .../src/events/workflow-completed-webhook-caller.ts | 1 + .../src/events/workflow-state-changed-webhook-caller.ts | 1 + 3 files changed, 3 insertions(+) diff --git a/services/workflows-service/src/events/document-changed-webhook-caller.ts b/services/workflows-service/src/events/document-changed-webhook-caller.ts index 487a21da4d..4b4e87e23f 100644 --- a/services/workflows-service/src/events/document-changed-webhook-caller.ts +++ b/services/workflows-service/src/events/document-changed-webhook-caller.ts @@ -84,6 +84,7 @@ export class DocumentChangedWebhookCaller { select: { authenticationConfiguration: true, subscriptions: true, + features: true, }, }); diff --git a/services/workflows-service/src/events/workflow-completed-webhook-caller.ts b/services/workflows-service/src/events/workflow-completed-webhook-caller.ts index 53d31bd689..20f214e835 100644 --- a/services/workflows-service/src/events/workflow-completed-webhook-caller.ts +++ b/services/workflows-service/src/events/workflow-completed-webhook-caller.ts @@ -41,6 +41,7 @@ export class WorkflowCompletedWebhookCaller { select: { authenticationConfiguration: true, subscriptions: true, + features: true, }, }); diff --git a/services/workflows-service/src/events/workflow-state-changed-webhook-caller.ts b/services/workflows-service/src/events/workflow-state-changed-webhook-caller.ts index 7c37592c76..0fd1d756cd 100644 --- a/services/workflows-service/src/events/workflow-state-changed-webhook-caller.ts +++ b/services/workflows-service/src/events/workflow-state-changed-webhook-caller.ts @@ -33,6 +33,7 @@ export class WorkflowStateChangedWebhookCaller { select: { authenticationConfiguration: true, subscriptions: true, + features: true, }, }); From 89b2632b550a2c526e58b20b285f11f5441e5c36 Mon Sep 17 00:00:00 2001 From: Omer Harel Date: Sun, 29 Jun 2025 14:30:50 +0300 Subject: [PATCH 9/9] refactor(queue): streamline job options and improve queue management - Remove deprecated job option settings for consistency - Introduce default job options in queue creation function - Add validation to ensure valid queue names are provided --- .../src/alert/alert-queue.service.ts | 15 +++---- .../src/common/queue/queue.service.ts | 42 ++++++++++++------- .../src/common/queue/types.ts | 10 ++--- .../src/common/redis/redis.service.ts | 8 +++- .../src/webhooks/webhooks.service.ts | 18 ++------ 5 files changed, 47 insertions(+), 46 deletions(-) diff --git a/services/workflows-service/src/alert/alert-queue.service.ts b/services/workflows-service/src/alert/alert-queue.service.ts index 1c3f67f13c..441f26bd64 100644 --- a/services/workflows-service/src/alert/alert-queue.service.ts +++ b/services/workflows-service/src/alert/alert-queue.service.ts @@ -29,15 +29,6 @@ export class AlertQueueService implements OnModuleInit { private async setupAlertQueue() { try { - this.queueService.createQueue(this.QUEUE_NAME, { - name: this.QUEUE_NAME, - jobOptions: { - attempts: 3, - backoff: { type: 'exponential', delay: 10000 }, - removeOnComplete: { count: 100, age: 3600 * 24 }, - removeOnFail: false, - }, - }); await this.queueService.setupJobScheduler( this.QUEUE_NAME, this.SCHEDULER_ID, @@ -46,6 +37,12 @@ export class AlertQueueService implements OnModuleInit { name: 'alert-check', data: { timestamp: Date.now() }, }, + { + jobOptions: { + attempts: 2, + backoff: { type: 'exponential', delay: 10000 }, + }, + }, ); this.registerWorker(); diff --git a/services/workflows-service/src/common/queue/queue.service.ts b/services/workflows-service/src/common/queue/queue.service.ts index 4234dfbf00..bb8d273660 100644 --- a/services/workflows-service/src/common/queue/queue.service.ts +++ b/services/workflows-service/src/common/queue/queue.service.ts @@ -8,6 +8,13 @@ import { BullMQPrometheusService } from '@/common/monitoring/bullmq-prometheus.s import type { BullBoardInjectedInstance, IQueueService, QueueOptions } from './types'; import { QueueBullboardService } from './queue-bullboard.service'; +const defaultJobOptions = { + attempts: 3, + backoff: { type: 'exponential', delay: 2000 }, + removeOnComplete: { count: 100, age: 3600 * 24 * 7 }, + removeOnFail: false, +}; + @Injectable() export class BullMQQueueService implements OnModuleDestroy, IQueueService { private redisClient: IORedis | null; @@ -51,6 +58,12 @@ export class BullMQQueueService implements OnModuleDestroy, IQueueService { return this.shouldProcessJobs; } + private validateQueueName(queueName: string): void { + if (!queueName || typeof queueName !== 'string' || queueName.trim().length === 0) { + throw new Error('Queue name must be a non-empty string'); + } + } + public getQueue(queueName: string): Queue { if (!this.redisClient) { throw new Error('Redis client not initialized'); @@ -63,11 +76,13 @@ export class BullMQQueueService implements OnModuleDestroy, IQueueService { throw new Error(`Queue with name '${queueName}' does not exist. Please create it first.`); } - registerWorker( + registerWorker( queueName: string, processor: (job: any) => Promise, options: { concurrency?: number } = {}, ): void { + this.validateQueueName(queueName); + if (!this.redisClient) { throw new Error('Redis client not initialized'); } @@ -113,18 +128,16 @@ export class BullMQQueueService implements OnModuleDestroy, IQueueService { } createQueue(queueName: string, options?: QueueOptions): void { + this.validateQueueName(queueName); + if (this.queues.has(queueName)) { return; } + const mergedJobOptions = { ...defaultJobOptions, ...(options?.jobOptions || {}) }; const queue = new Queue(queueName, { connection: this.redisClient as IORedis, - defaultJobOptions: options?.jobOptions ?? { - attempts: 3, - backoff: { type: 'exponential', delay: 5000 }, - removeOnComplete: { count: 100, age: 3600 * 24 }, - removeOnFail: false, - }, + defaultJobOptions: mergedJobOptions, }); this.queues.set(queueName, queue); this.logger.log(`Queue created: ${queueName}`); @@ -157,11 +170,17 @@ export class BullMQQueueService implements OnModuleDestroy, IQueueService { jobOpts: { name: string; data: T; - opts?: any; }, + queueOptions?: QueueOptions, ): Promise { + this.validateQueueName(queueName); try { + if (!this.queues.has(queueName)) { + this.createQueue(queueName, queueOptions); + } + const queue = this.getQueue(queueName); + const jobName = jobOpts.name; const firstJob = await queue.upsertJobScheduler( schedulerId, @@ -169,13 +188,6 @@ export class BullMQQueueService implements OnModuleDestroy, IQueueService { { name: jobName, data: jobOpts.data || { timestamp: Date.now() }, - opts: { - attempts: jobOpts.opts?.attempts || 3, - backoff: jobOpts.opts?.backoff || { - type: 'exponential', - delay: 3000, - }, - }, }, ); this.logger.log(`Created job scheduler: ${schedulerId}`, { diff --git a/services/workflows-service/src/common/queue/types.ts b/services/workflows-service/src/common/queue/types.ts index 26c16471d9..9b80c2d9a0 100644 --- a/services/workflows-service/src/common/queue/types.ts +++ b/services/workflows-service/src/common/queue/types.ts @@ -11,14 +11,14 @@ export interface BullBoardInjectedInstance { export interface IQueueService { addJob(queueName: string, job_name: string, data: T, opts?: any): Promise; - registerWorker( + registerWorker( queueName: string, processor: (job: any) => Promise, options?: { concurrency?: number }, ): void; isWorkerEnabled(): boolean; - createQueue(queueName: string, options?: QueueOptions): void; + createQueue(queueName: string, options?: QueueOptions): void; setupJobScheduler( queueName: string, schedulerId: string, @@ -26,14 +26,12 @@ export interface IQueueService { jobOpts: { name: string; data: T; - opts?: any; }, + queueOptions?: QueueOptions, ): Promise; } -export interface QueueOptions { - name: string; - concurrency?: number; +export interface QueueOptions { jobOptions?: { attempts?: number; backoff?: { diff --git a/services/workflows-service/src/common/redis/redis.service.ts b/services/workflows-service/src/common/redis/redis.service.ts index b69631ae95..4792b368e5 100644 --- a/services/workflows-service/src/common/redis/redis.service.ts +++ b/services/workflows-service/src/common/redis/redis.service.ts @@ -7,11 +7,15 @@ export const REDIS_CLIENT = Symbol('REDIS_CLIENT'); @Injectable() export class RedisService implements OnModuleDestroy { - public readonly client: IORedis; + public readonly client!: IORedis; constructor(private readonly logger: AppLoggerService) { if (!env.QUEUE_SYSTEM_ENABLED) { - this.client = null as any; + Object.defineProperty(this, 'client', { + get: () => { + this.logger.warn('Redis client is not available when QUEUE_SYSTEM_ENABLED is false'); + }, + }); return; } diff --git a/services/workflows-service/src/webhooks/webhooks.service.ts b/services/workflows-service/src/webhooks/webhooks.service.ts index 915eca9cec..00dffbf3b5 100644 --- a/services/workflows-service/src/webhooks/webhooks.service.ts +++ b/services/workflows-service/src/webhooks/webhooks.service.ts @@ -56,15 +56,7 @@ export class WebhooksService implements OnModuleInit { private async setupQueueSystem() { try { - this.queueService.createQueue(this.QUEUE_NAME, { - name: this.QUEUE_NAME, - jobOptions: { - attempts: 3, - backoff: { type: 'exponential', delay: 5000 }, - removeOnComplete: { count: 1000, age: 3600 * 24 * 7 }, - removeOnFail: false, - }, - }); + this.queueService.createQueue(this.QUEUE_NAME); this.registerWorker(); @@ -77,11 +69,9 @@ export class WebhooksService implements OnModuleInit { } private registerWorker() { - this.queueService.registerWorker( - this.QUEUE_NAME, - this.processWebhookJob.bind(this), - { concurrency: 10 }, - ); + this.queueService.registerWorker(this.QUEUE_NAME, this.processWebhookJob.bind(this), { + concurrency: 10, + }); } private async processWebhookJob(job: Job) {