Skip to content

Commit

Permalink
feat(bullmq): add FlowProducer injectable
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Jan 5, 2023
1 parent 4b1abc6 commit 244e110
Show file tree
Hide file tree
Showing 14 changed files with 529 additions and 5 deletions.
207 changes: 206 additions & 1 deletion packages/bullmq/e2e/module.e2e-spec.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { MetadataScanner } from '@nestjs/core';
import { Test, TestingModule } from '@nestjs/testing';
import { Job, Queue, QueueEvents } from 'bullmq';
import { FlowProducer, Job, Queue, QueueEvents } from 'bullmq';
import {
BullModule,
getFlowProducerToken,
getQueueToken,
OnQueueEvent,
OnWorkerEvent,
Expand Down Expand Up @@ -80,6 +81,73 @@ describe('BullModule', () => {
});
});

describe('registerFlowProducer', () => {
let moduleRef: TestingModule;

describe('single configuration', () => {
beforeAll(async () => {
moduleRef = await Test.createTestingModule({
imports: [
BullModule.registerFlowProducer({
name: 'test',
connection: {
host: '0.0.0.0',
port: 6380,
},
}),
],
}).compile();
});
afterAll(async () => {
await moduleRef.close();
});
it('should inject the flowProducer with the given name', () => {
const flowProducer = moduleRef.get<FlowProducer>(getFlowProducerToken('test'));

expect(flowProducer).toBeDefined();
expect((flowProducer.opts as any).name).toEqual('test');
});
});

describe('multiple configurations', () => {
beforeAll(async () => {
moduleRef = await Test.createTestingModule({
imports: [
BullModule.registerFlowProducer(
{
name: 'test1',
connection: {
host: '0.0.0.0',
port: 6380,
},
},
{
name: 'test2',
connection: {
host: '0.0.0.0',
port: 6380,
},
},
),
],
}).compile();
});
afterAll(async () => {
await moduleRef.close();
});
it('should inject the flowProducer with name "test1"', () => {
const flowProducer: FlowProducer = moduleRef.get<FlowProducer>(getFlowProducerToken('test1'));
expect(flowProducer).toBeDefined();
expect((flowProducer.opts as any).name).toEqual('test1');
});
it('should inject the flowProducer with name "test2"', () => {
const flowProducer: FlowProducer = moduleRef.get<FlowProducer>(getFlowProducerToken('test2'));
expect(flowProducer).toBeDefined();
expect((flowProducer.opts as any).name).toEqual('test2');
});
});
});

describe('forRoot + registerQueue', () => {
let moduleRef: TestingModule;

Expand Down Expand Up @@ -142,6 +210,68 @@ describe('BullModule', () => {
});
});

describe('forRoot + registerFlowProducer', () => {
let moduleRef: TestingModule;

describe('single configuration', () => {
beforeAll(async () => {
moduleRef = await Test.createTestingModule({
imports: [
BullModule.forRoot({
connection: {
host: '0.0.0.0',
port: 6380,
},
}),
BullModule.registerFlowProducer({
name: 'test',
}),
],
}).compile();
});
afterAll(async () => {
await moduleRef.close();
});

it('should inject the flowProducer with the given name', () => {
const flowProducer: FlowProducer = moduleRef.get<FlowProducer>(getFlowProducerToken('test'));
expect(flowProducer).toBeDefined();
expect((flowProducer.opts as any).name).toEqual('test');
});
});

describe('multiple configurations', () => {
beforeAll(async () => {
moduleRef = await Test.createTestingModule({
imports: [
BullModule.forRoot({
connection: {
host: '0.0.0.0',
port: 6380,
},
}),
BullModule.registerFlowProducer({ name: 'test1' }, { name: 'test2' }),
],
}).compile();
});
afterAll(async () => {
await moduleRef.close();
});
it('should inject the flowProducer with name "test1"', () => {
const flowProducer = moduleRef.get<FlowProducer>(getFlowProducerToken('test1'));

expect(flowProducer).toBeDefined();
expect((flowProducer.opts as any).name).toEqual('test1');
});
it('should inject the flowProducer with name "test2"', () => {
const flowProducer = moduleRef.get<FlowProducer>(getFlowProducerToken('test2'));

expect(flowProducer).toBeDefined();
expect((flowProducer.opts as any).name).toEqual('test2');
});
});
});

describe('registerQueueAsync', () => {
let moduleRef: TestingModule;

Expand Down Expand Up @@ -217,6 +347,81 @@ describe('BullModule', () => {
});
});

describe('registerFlowProducerAsync', () => {
let moduleRef: TestingModule;

describe('single configuration', () => {
describe('useFactory', () => {
beforeAll(async () => {
moduleRef = await Test.createTestingModule({
imports: [
BullModule.registerFlowProducerAsync({
name: 'test',
useFactory: () => ({
connection: {
host: '0.0.0.0',
port: 6380,
},
}),
}),
],
}).compile();
});
afterAll(async () => {
await moduleRef.close();
});
it('should inject the flowProducer with the given name', () => {
const flowProducer: FlowProducer = moduleRef.get<FlowProducer>(getFlowProducerToken('test'));
expect(flowProducer).toBeDefined();
expect((flowProducer.opts as any).name).toEqual('test');
});
});
});
describe('multiple configurations', () => {
describe('useFactory', () => {
beforeAll(async () => {
moduleRef = await Test.createTestingModule({
imports: [
BullModule.registerFlowProducerAsync(
{
name: 'test1',
useFactory: () => ({
connection: {
host: '0.0.0.0',
port: 6380,
},
}),
},
{
name: 'test2',
useFactory: () => ({
connection: {
host: '0.0.0.0',
port: 6380,
},
}),
},
),
],
}).compile();
});
afterAll(async () => {
await moduleRef.close();
});
it('should inject the flowProducer with name "test1"', () => {
const flowProducer: FlowProducer = moduleRef.get<FlowProducer>(getFlowProducerToken('test1'));
expect(flowProducer).toBeDefined();
expect((flowProducer.opts as any).name).toEqual('test1');
});
it('should inject the flowProducer with name "test2"', () => {
const flowProducer: FlowProducer = moduleRef.get<FlowProducer>(getFlowProducerToken('test2'));
expect(flowProducer).toBeDefined();
expect((flowProducer.opts as any).name).toEqual('test2');
});
});
});
});

describe('forRootAsync + registerQueueAsync', () => {
let moduleRef: TestingModule;

Expand Down
20 changes: 20 additions & 0 deletions packages/bullmq/lib/bull.explorer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import { Injector } from '@nestjs/core/injector/injector';
import { InstanceWrapper } from '@nestjs/core/injector/instance-wrapper';
import { Module } from '@nestjs/core/injector/module';
import {
FlowOpts,
FlowProducer,
Processor,
Queue,
QueueEvents,
Expand All @@ -19,6 +21,7 @@ import {
} from 'bullmq';
import { BullMetadataAccessor } from './bull-metadata.accessor';
import { OnQueueEventMetadata, OnWorkerEventMetadata } from './decorators';
import { NO_FLOW_PRODUCER_FOUND } from './bull.messages';
import {
InvalidProcessorClassError,
InvalidQueueEventsListenerClassError,
Expand Down Expand Up @@ -121,6 +124,23 @@ export class BullExplorer implements OnModuleInit {
}
}

getFlowProducerOptions(flowProducerToken: string, name: string, configKey?: string) {
try {
const flowProducerRef = this.moduleRef.get<FlowProducer>(flowProducerToken, { strict: false });
return flowProducerRef.opts ?? {};
} catch (err) {
const sharedConfigToken = getSharedConfigToken(configKey);
try {
return this.moduleRef.get<FlowOpts>(sharedConfigToken, {
strict: false,
});
} catch (err) {
this.logger.error(NO_FLOW_PRODUCER_FOUND(name));
throw err;
}
}
}

handleProcessor<T extends WorkerHost>(
instance: T,
queueName: string,
Expand Down
5 changes: 5 additions & 0 deletions packages/bullmq/lib/bull.messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,8 @@ export const NO_QUEUE_FOUND = (name?: string) =>
name
? `No Queue was found with the given name (${name}). Check your configuration.`
: 'No Queue was found. Check your configuration.';

export const NO_FLOW_PRODUCER_FOUND = (name?: string) =>
name
? `No Flow Producer was found with the given name (${name}). Check your configuration.`
: 'No Flow Producer was found. Check your configuration.';
Loading

0 comments on commit 244e110

Please sign in to comment.