Skip to content

Commit

Permalink
Merge pull request #4786 from mahendraHegde/refactor-execution-detail
Browse files Browse the repository at this point in the history
refactor: use execution queue instead of writing directly to execution detail collection
  • Loading branch information
Cliftonz authored Nov 19, 2023
2 parents e943f69 + 99000c0 commit da64b14
Show file tree
Hide file tree
Showing 32 changed files with 738 additions and 164 deletions.
4 changes: 3 additions & 1 deletion apps/worker/src/app/workflow/services/cold-start.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import { INovuWorker, ReadinessService } from '@novu/application-generic';
import { StandardWorker } from './standard.worker';
import { SubscriberProcessWorker } from './subscriber-process.worker';
import { WorkflowWorker } from './workflow.worker';
import { ExecutionLogWorker } from './execution-log.worker';

const getWorkers = (app: INestApplication): INovuWorker[] => {
const standardWorker = app.get(StandardWorker, { strict: false });
const workflowWorker = app.get(WorkflowWorker, { strict: false });
const subscriberProcessWorker = app.get(SubscriberProcessWorker, { strict: false });
const executionLogWorker = app.get(ExecutionLogWorker, { strict: false });

const workers: INovuWorker[] = [standardWorker, workflowWorker, subscriberProcessWorker];
const workers: INovuWorker[] = [standardWorker, workflowWorker, subscriberProcessWorker, executionLogWorker];

return workers;
};
Expand Down
141 changes: 141 additions & 0 deletions apps/worker/src/app/workflow/services/execution-log.worker.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import { Test } from '@nestjs/testing';
import { expect } from 'chai';
import { setTimeout } from 'timers/promises';

import { TriggerEvent, ExecutionLogQueueService, CreateExecutionDetails } from '@novu/application-generic';

import { ExecutionLogWorker } from './execution-log.worker';

import { WorkflowModule } from '../workflow.module';

let executionLogQueueService: ExecutionLogQueueService;
let executionLogWorker: ExecutionLogWorker;

describe('ExecutionLog Worker', () => {
before(async () => {
process.env.IN_MEMORY_CLUSTER_MODE_ENABLED = 'false';
process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'false';

const moduleRef = await Test.createTestingModule({
imports: [WorkflowModule],
}).compile();

const createExecutionDetails = moduleRef.get<CreateExecutionDetails>(CreateExecutionDetails);

executionLogWorker = new ExecutionLogWorker(createExecutionDetails);

executionLogQueueService = new ExecutionLogQueueService();
await executionLogQueueService.queue.obliterate();
});

after(async () => {
await executionLogQueueService.queue.drain();
await executionLogWorker.gracefulShutdown();
});

it('should be initialised properly', async () => {
expect(executionLogWorker).to.be.ok;
expect(executionLogWorker).to.have.all.keys('DEFAULT_ATTEMPTS', 'instance', 'topic', 'createExecutionDetails');
expect(await executionLogWorker.bullMqService.getStatus()).to.deep.equal({
queueIsPaused: undefined,
queueName: undefined,
workerName: 'execution-logs',
workerIsPaused: false,
workerIsRunning: true,
});
expect(executionLogWorker.worker.opts).to.deep.include({
concurrency: 200,
lockDuration: 90000,
});
});

it('should be able to automatically pull a job from the queue', async () => {
const existingJobs = await executionLogQueueService.queue.getJobs();
expect(existingJobs.length).to.equal(0);

const jobId = 'execution-logs-queue-job-id';
const _environmentId = 'execution-logs-queue-environment-id';
const _organizationId = 'execution-logs-queue-organization-id';
const _userId = 'execution-logs-queue-user-id';
const jobData = {
_id: jobId,
test: 'execution-logs-queue-job-data',
_environmentId,
_organizationId,
_userId,
};

await executionLogQueueService.add(jobId, jobData, _organizationId);

expect(await executionLogQueueService.queue.getActiveCount()).to.equal(1);
expect(await executionLogQueueService.queue.getWaitingCount()).to.equal(0);

// When we arrive to pull the job it has been already pulled by the worker
const nextJob = await executionLogWorker.worker.getNextJob(jobId);
expect(nextJob).to.equal(undefined);

await setTimeout(100);

// No jobs left in queue
const queueJobs = await executionLogQueueService.queue.getJobs();
expect(queueJobs.length).to.equal(0);
});

it('should pause the worker', async () => {
const isPaused = await executionLogWorker.worker.isPaused();
expect(isPaused).to.equal(false);

const runningStatus = await executionLogWorker.bullMqService.getStatus();
expect(runningStatus).to.deep.equal({
queueIsPaused: undefined,
queueName: undefined,
workerName: 'execution-logs',
workerIsPaused: false,
workerIsRunning: true,
});

await executionLogWorker.pause();

const isNowPaused = await executionLogWorker.worker.isPaused();
expect(isNowPaused).to.equal(true);

const runningStatusChanged = await executionLogWorker.bullMqService.getStatus();
expect(runningStatusChanged).to.deep.equal({
queueIsPaused: undefined,
queueName: undefined,
workerName: 'execution-logs',
workerIsPaused: true,
workerIsRunning: true,
});
});

it('should resume the worker', async () => {
await executionLogWorker.pause();

const isPaused = await executionLogWorker.worker.isPaused();
expect(isPaused).to.equal(true);

const runningStatus = await executionLogWorker.bullMqService.getStatus();
expect(runningStatus).to.deep.equal({
queueIsPaused: undefined,
queueName: undefined,
workerName: 'execution-logs',
workerIsPaused: true,
workerIsRunning: true,
});

await executionLogWorker.resume();

const isNowPaused = await executionLogWorker.worker.isPaused();
expect(isNowPaused).to.equal(false);

const runningStatusChanged = await executionLogWorker.bullMqService.getStatus();
expect(runningStatusChanged).to.deep.equal({
queueIsPaused: undefined,
queueName: undefined,
workerName: 'execution-logs',
workerIsPaused: false,
workerIsRunning: true,
});
});
});
61 changes: 61 additions & 0 deletions apps/worker/src/app/workflow/services/execution-log.worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { Injectable, Logger } from '@nestjs/common';
const nr = require('newrelic');
import {
getExecutionLogWorkerOptions,
INovuWorker,
PinoLogger,
storage,
Store,
ExecutionLogWorkerService,
WorkerOptions,
WorkerProcessor,
CreateExecutionDetails,
CreateExecutionDetailsCommand,
} from '@novu/application-generic';
import { ObservabilityBackgroundTransactionEnum } from '@novu/shared';
const LOG_CONTEXT = 'ExecutionLogWorker';

@Injectable()
export class ExecutionLogWorker extends ExecutionLogWorkerService implements INovuWorker {
constructor(private createExecutionDetails: CreateExecutionDetails) {
super();
this.initWorker(this.getWorkerProcessor(), this.getWorkerOptions());
}
gracefulShutdown: () => Promise<void>;
onModuleDestroy: () => Promise<void>;
pause: () => Promise<void>;
resume: () => Promise<void>;

private getWorkerOptions(): WorkerOptions {
return getExecutionLogWorkerOptions();
}

private getWorkerProcessor(): WorkerProcessor {
return async ({ data }: { data: CreateExecutionDetailsCommand }) => {
return await new Promise(async (resolve, reject) => {
// eslint-disable-next-line @typescript-eslint/no-this-alias
const _this = this;

Logger.verbose(`Job ${data.jobId} is being inserted into execution details collection`, LOG_CONTEXT);

nr.startBackgroundTransaction(
ObservabilityBackgroundTransactionEnum.EXECUTION_LOG_QUEUE,
'Trigger Engine',
function () {
const transaction = nr.getTransaction();

storage.run(new Store(PinoLogger.root), () => {
_this.createExecutionDetails
.execute(data)
.then(resolve)
.catch(reject)
.finally(() => {
transaction.end();
});
});
}
);
});
};
}
}
1 change: 1 addition & 0 deletions apps/worker/src/app/workflow/services/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ export * from './active-jobs-metric.service';
export * from './completed-jobs-metric.service';
export * from './standard.worker';
export * from './workflow.worker';
export * from './execution-log.worker';
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ import { Injectable, Logger } from '@nestjs/common';
import { JobRepository } from '@novu/dal';
import { ExecutionDetailsSourceEnum, ExecutionDetailsStatusEnum } from '@novu/shared';
import {
CreateExecutionDetails,
CreateExecutionDetailsCommand,
DetailEnum,
ExecutionLogQueueService,
InstrumentUsecase,
} from '@novu/application-generic';

Expand All @@ -19,7 +19,7 @@ const LOG_CONTEXT = 'HandleLastFailedJob';
@Injectable()
export class HandleLastFailedJob {
constructor(
private createExecutionDetails: CreateExecutionDetails,
private executionLogQueueService: ExecutionLogQueueService,
private queueNextJob: QueueNextJob,
private jobRepository: JobRepository
) {}
Expand All @@ -40,17 +40,20 @@ export class HandleLastFailedJob {
Logger.error(message, new NotFoundError(message), LOG_CONTEXT);
throw new PlatformException(message);
}

await this.createExecutionDetails.execute(
const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata();
await this.executionLogQueueService.add(
metadata._id,
CreateExecutionDetailsCommand.create({
...metadata,
...CreateExecutionDetailsCommand.getDetailsFromJob(job),
detail: DetailEnum.WEBHOOK_FILTER_FAILED_LAST_RETRY,
source: ExecutionDetailsSourceEnum.WEBHOOK,
status: ExecutionDetailsStatusEnum.PENDING,
isTest: false,
isRetry: true,
raw: JSON.stringify({ message: JSON.parse(error.message).message }),
})
}),
job._organizationId
);

if (!job?.step?.shouldStopOnFail) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ import { MessageMatcher } from './message-matcher.usecase';
import type { SendMessageCommand } from '../send-message/send-message.command';

describe('Message filter matcher', function () {
const createExecutionDetails = {
execute: sinon.stub(),
const executionLogQueueService = {
add: sinon.stub(),
};
const messageMatcher = new MessageMatcher(
undefined as any,
createExecutionDetails as any,
executionLogQueueService as any,
undefined as any,
undefined as any,
undefined as any,
Expand Down Expand Up @@ -708,7 +708,7 @@ describe('Message filter matcher', function () {
it('allows to process multiple filter parts', async () => {
const matcher = new MessageMatcher(
{ findOne: () => Promise.resolve(getSubscriber()) } as any,
createExecutionDetails as any,
executionLogQueueService as any,
undefined as any,
undefined as any,
undefined as any,
Expand Down Expand Up @@ -745,7 +745,7 @@ describe('Message filter matcher', function () {
lastName: 'Doe',
}),
} as any,
createExecutionDetails as any,
executionLogQueueService as any,
undefined as any,
undefined as any,
undefined as any,
Expand Down Expand Up @@ -776,7 +776,7 @@ describe('Message filter matcher', function () {
lastName: 'Doe',
}),
} as any,
createExecutionDetails as any,
executionLogQueueService as any,
undefined as any,
undefined as any,
undefined as any,
Expand All @@ -801,7 +801,7 @@ describe('Message filter matcher', function () {
it('allows to process if the subscriber is online', async () => {
const matcher = new MessageMatcher(
{ findOne: () => Promise.resolve(getSubscriber()) } as any,
createExecutionDetails as any,
executionLogQueueService as any,
undefined as any,
undefined as any,
undefined as any,
Expand All @@ -826,7 +826,7 @@ describe('Message filter matcher', function () {
it("doesn't allow to process if the subscriber is not online", async () => {
const matcher = new MessageMatcher(
{ findOne: () => Promise.resolve(getSubscriber({ isOnline: false })) } as any,
createExecutionDetails as any,
executionLogQueueService as any,
undefined as any,
undefined as any,
undefined as any,
Expand Down Expand Up @@ -855,7 +855,7 @@ describe('Message filter matcher', function () {
{
findOne: () => Promise.resolve(getSubscriber({ isOnline: true }, { subDuration: { minutes: 3 } })),
} as any,
createExecutionDetails as any,
executionLogQueueService as any,
undefined as any,
undefined as any,
undefined as any,
Expand Down Expand Up @@ -893,7 +893,7 @@ describe('Message filter matcher', function () {
lastName: 'Doe',
}),
} as any,
createExecutionDetails as any,
executionLogQueueService as any,
undefined as any,
undefined as any,
undefined as any,
Expand Down Expand Up @@ -921,7 +921,7 @@ describe('Message filter matcher', function () {
{
findOne: () => Promise.resolve(getSubscriber({ isOnline: true }, { subDuration: { minutes: 10 } })),
} as any,
createExecutionDetails as any,
executionLogQueueService as any,
undefined as any,
undefined as any,
undefined as any,
Expand Down Expand Up @@ -949,7 +949,7 @@ describe('Message filter matcher', function () {
{
findOne: () => Promise.resolve(getSubscriber({ isOnline: false }, { subDuration: { minutes: 4 } })),
} as any,
createExecutionDetails as any,
executionLogQueueService as any,
undefined as any,
undefined as any,
undefined as any,
Expand Down Expand Up @@ -977,7 +977,7 @@ describe('Message filter matcher', function () {
{
findOne: () => Promise.resolve(getSubscriber({ isOnline: false }, { subDuration: { minutes: 6 } })),
} as any,
createExecutionDetails as any,
executionLogQueueService as any,
undefined as any,
undefined as any,
undefined as any,
Expand Down Expand Up @@ -1005,7 +1005,7 @@ describe('Message filter matcher', function () {
{
findOne: () => Promise.resolve(getSubscriber({ isOnline: false }, { subDuration: { minutes: 30 } })),
} as any,
createExecutionDetails as any,
executionLogQueueService as any,
undefined as any,
undefined as any,
undefined as any,
Expand Down Expand Up @@ -1033,7 +1033,7 @@ describe('Message filter matcher', function () {
{
findOne: () => Promise.resolve(getSubscriber({ isOnline: false }, { subDuration: { hours: 23 } })),
} as any,
createExecutionDetails as any,
executionLogQueueService as any,
undefined as any,
undefined as any,
undefined as any,
Expand Down
Loading

0 comments on commit da64b14

Please sign in to comment.