Skip to content
Open
8 changes: 4 additions & 4 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

78 changes: 28 additions & 50 deletions services/workflows-service/src/alert/alert-queue.service.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
import { Inject, Injectable, OnModuleInit } from '@nestjs/common';
import { Job } from 'bullmq';
import { AppLoggerService } from '@/common/app-logger/app-logger.service';
import { AlertService } from './alert.service';
import { QueueService } from '@/common/queue/queue.service';
import { QueueBullboardService } from '@/common/queue/queue-bullboard.service';
import { BullMQPrometheusService } from '@/common/monitoring/bullmq-prometheus.service';
import { BULLBOARD_INSTANCE_INJECTION_TOKEN } from '@/common/queue/types';
import type { BullBoardInjectedInstance } from '@/common/queue/types';
import type { IQueueService } from '@/common/queue/types';
import { env } from '@/env';

export interface AlertCheckJobData extends Record<string, unknown> {
Expand All @@ -21,53 +16,37 @@ export class AlertQueueService implements OnModuleInit {
constructor(
private readonly logger: AppLoggerService,
private readonly alertService: AlertService,
private readonly queueService: QueueService,
private readonly queueBullboardService: QueueBullboardService,
private readonly bullMQPrometheusService: BullMQPrometheusService,
@Inject(BULLBOARD_INSTANCE_INJECTION_TOKEN)
private bullBoard: BullBoardInjectedInstance,
@Inject('IQueueService') private readonly queueService: IQueueService,
) {}

async onModuleInit() {
if (!env.QUEUE_SYSTEM_ENABLED) {
return;
}

await this.setupAlertQueue();
}

private async setupAlertQueue() {
try {
const queue = this.queueService.getQueue<AlertCheckJobData>({
this.queueService.createQueue<AlertCheckJobData>(this.QUEUE_NAME, {
name: this.QUEUE_NAME,
jobOptions: {
attempts: 10,
backoff: {
type: 'exponential',
delay: 10_000,
},
attempts: 3,
backoff: { type: 'exponential', delay: 10000 },
removeOnComplete: { count: 100, age: 3600 * 24 },
removeOnFail: false,
},
});

this.bullMQPrometheusService.registerQueue(queue);

if (this.queueService.isWorkerEnabled()) {
this.queueBullboardService.registerQueue(this.bullBoard, queue);
}

await this.queueService.setupJobScheduler(queue, this.SCHEDULER_ID, {
every: 60 * 60 * 1000,
jobName: 'check-transaction-monitoring-alerts',
data: { timestamp: Date.now() },
jobOptions: {
attempts: 10,
backoff: {
type: 'exponential',
delay: 10_000,
},
await this.queueService.setupJobScheduler(
this.QUEUE_NAME,
this.SCHEDULER_ID,
{ every: 60 * 60 * 1000 },
{
name: 'alert-check',
data: { timestamp: Date.now() },
},
});
);

this.registerWorker();

Expand All @@ -78,22 +57,21 @@ export class AlertQueueService implements OnModuleInit {
}

private registerWorker() {
this.queueService.registerWorker<AlertCheckJobData>(
this.QUEUE_NAME,
async (job: Job<AlertCheckJobData>) => {
this.logger.log('Processing transaction monitoring alerts check job', {
jobId: job.id,
});

await this.alertService.checkAllAlerts();
this.queueService.registerWorker(this.QUEUE_NAME, this.processAlertCheckJob.bind(this), {
concurrency: 1,
});
}

this.logger.log('Completed transaction monitoring alerts check', {
jobId: job.id,
});
private async processAlertCheckJob(job: any) {
this.logger.log('Processing transaction monitoring alerts check job', { jobId: job.id });
try {
await this.alertService.checkAllAlerts();
this.logger.log('Completed transaction monitoring alerts check', { jobId: job.id });

return { success: true, timestamp: Date.now() };
},
{ concurrency: 1 },
);
return { success: true, timestamp: Date.now() };
} catch (error) {
this.logger.error('Alert check job failed', { jobId: job.id, error });
throw error;
}
}
}
11 changes: 3 additions & 8 deletions services/workflows-service/src/app.worker.module.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
import { MiddlewareConsumer, Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { ClsModule } from 'nestjs-cls';

import { AnalyticsModule } from '@/common/analytics-logger/analytics.module';
import { AppLoggerModule } from '@/common/app-logger/app-logger.module';
import { QueueModule } from '@/common/queue/queue.module';
import { configs, env } from '@/env';
import { configs } from '@/env';
import { validate } from '@/env-validate';
import { SecretsManagerModule } from '@/secrets-manager/secrets-manager.module';
import { SentryModule } from '@/sentry/sentry.module';
import { WebhooksModule } from '@/webhooks/webhooks.module';
import { HealthModule } from './health/health.module';
import { PrismaModule } from './prisma/prisma.module';
import { AlertModule } from './alert/alert.module';
import { MetricsAuthMiddleware } from './common/middlewares/metrics-auth.middleware';
import { Module } from '@nestjs/common';

@Module({
imports: [
Expand All @@ -37,8 +36,4 @@ import { MetricsAuthMiddleware } from './common/middlewares/metrics-auth.middlew
AlertModule,
],
})
export class WorkerAppModule {
configure(consumer: MiddlewareConsumer) {
consumer.apply(MetricsAuthMiddleware).forRoutes('*');
}
}
export class WorkerAppModule {}
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,4 @@ export class QueueBullboardService {
this.logger.error(`Error registering queue ${queue.name} with BullBoard`, { error });
}
}

registerQueues(bullBoardInstance: any, queues: Queue[]) {
try {
const adapters = queues.map(queue => new BullMQAdapter(queue));
bullBoardInstance.boardInstance.setQueues(adapters);
this.logger.log(`Registered ${queues.length} queues with BullBoard`);
} catch (error) {
this.logger.error('Error registering queues with BullBoard', { error });
}
}
}
33 changes: 30 additions & 3 deletions services/workflows-service/src/common/queue/queue.module.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,37 @@
import { Module } from '@nestjs/common';
import { QueueService } from './queue.service';
import { BullMQQueueService } from './queue.service';
import { QueueBullboardService } from './queue-bullboard.service';
import { QueueOtelService } from './otel.service';
import { MonitoringModule } from '@/common/monitoring/monitoring.module';
import { BULLBOARD_INSTANCE_INJECTION_TOKEN } from './types';
import { createBullBoard } from '@bull-board/api';
import { ExpressAdapter } from '@bull-board/express';
import { RedisModule } from '../redis/redis.module';

@Module({
providers: [QueueService, QueueBullboardService, QueueOtelService],
exports: [QueueService, QueueBullboardService, QueueOtelService],
imports: [MonitoringModule, RedisModule],
providers: [
BullMQQueueService,
{ provide: 'IQueueService', useExisting: BullMQQueueService },
QueueBullboardService,
QueueOtelService,
{
provide: BULLBOARD_INSTANCE_INJECTION_TOKEN,
useFactory: () => {
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/api/queues');
const boardInstance = createBullBoard({ queues: [], serverAdapter });

return { boardInstance, serverAdapter };
},
},
],
exports: [
BullMQQueueService,
'IQueueService',
QueueBullboardService,
QueueOtelService,
BULLBOARD_INSTANCE_INJECTION_TOKEN,
],
})
export class QueueModule {}
Loading
Loading