diff --git a/packages/bullmq/e2e/module.e2e-spec.ts b/packages/bullmq/e2e/module.e2e-spec.ts index 379f2a73..44487ae3 100644 --- a/packages/bullmq/e2e/module.e2e-spec.ts +++ b/packages/bullmq/e2e/module.e2e-spec.ts @@ -1,8 +1,9 @@ import { MetadataScanner } from '@nestjs/core'; import { Test, TestingModule } from '@nestjs/testing'; -import { Job, Queue, QueueEvents } from 'bullmq'; +import { FlowProducer, Job, Queue, QueueEvents } from 'bullmq'; import { BullModule, + getFlowProducerToken, getQueueToken, OnQueueEvent, OnWorkerEvent, @@ -82,6 +83,73 @@ describe('BullModule', () => { }); }); + describe('registerFlowProducer', () => { + let moduleRef: TestingModule; + + describe('single configuration', () => { + beforeAll(async () => { + moduleRef = await Test.createTestingModule({ + imports: [ + BullModule.registerFlowProducer({ + name: 'test', + connection: { + host: '0.0.0.0', + port: 6380, + }, + }), + ], + }).compile(); + }); + afterAll(async () => { + await moduleRef.close(); + }); + it('should inject the flowProducer with the given name', () => { + const flowProducer = moduleRef.get(getFlowProducerToken('test')); + + expect(flowProducer).toBeDefined(); + expect((flowProducer.opts as any).name).toEqual('test'); + }); + }); + + describe('multiple configurations', () => { + beforeAll(async () => { + moduleRef = await Test.createTestingModule({ + imports: [ + BullModule.registerFlowProducer( + { + name: 'test1', + connection: { + host: '0.0.0.0', + port: 6380, + }, + }, + { + name: 'test2', + connection: { + host: '0.0.0.0', + port: 6380, + }, + }, + ), + ], + }).compile(); + }); + afterAll(async () => { + await moduleRef.close(); + }); + it('should inject the flowProducer with name "test1"', () => { + const flowProducer: FlowProducer = moduleRef.get(getFlowProducerToken('test1')); + expect(flowProducer).toBeDefined(); + expect((flowProducer.opts as any).name).toEqual('test1'); + }); + it('should inject the flowProducer with name "test2"', () => { + const flowProducer: FlowProducer = moduleRef.get(getFlowProducerToken('test2')); + expect(flowProducer).toBeDefined(); + expect((flowProducer.opts as any).name).toEqual('test2'); + }); + }); + }); + describe('forRoot + registerQueue', () => { let moduleRef: TestingModule; @@ -144,6 +212,68 @@ describe('BullModule', () => { }); }); + describe('forRoot + registerFlowProducer', () => { + let moduleRef: TestingModule; + + describe('single configuration', () => { + beforeAll(async () => { + moduleRef = await Test.createTestingModule({ + imports: [ + BullModule.forRoot({ + connection: { + host: '0.0.0.0', + port: 6380, + }, + }), + BullModule.registerFlowProducer({ + name: 'test', + }), + ], + }).compile(); + }); + afterAll(async () => { + await moduleRef.close(); + }); + + it('should inject the flowProducer with the given name', () => { + const flowProducer: FlowProducer = moduleRef.get(getFlowProducerToken('test')); + expect(flowProducer).toBeDefined(); + expect((flowProducer.opts as any).name).toEqual('test'); + }); + }); + + describe('multiple configurations', () => { + beforeAll(async () => { + moduleRef = await Test.createTestingModule({ + imports: [ + BullModule.forRoot({ + connection: { + host: '0.0.0.0', + port: 6380, + }, + }), + BullModule.registerFlowProducer({ name: 'test1' }, { name: 'test2' }), + ], + }).compile(); + }); + afterAll(async () => { + await moduleRef.close(); + }); + it('should inject the flowProducer with name "test1"', () => { + const flowProducer = moduleRef.get(getFlowProducerToken('test1')); + + expect(flowProducer).toBeDefined(); + expect((flowProducer.opts as any).name).toEqual('test1'); + }); + it('should inject the flowProducer with name "test2"', () => { + const flowProducer = moduleRef.get(getFlowProducerToken('test2')); + + expect(flowProducer).toBeDefined(); + expect((flowProducer.opts as any).name).toEqual('test2'); + }); + }); + }); + describe('registerQueueAsync', () => { let moduleRef: TestingModule; @@ -219,6 +349,81 @@ describe('BullModule', () => { }); }); + describe('registerFlowProducerAsync', () => { + let moduleRef: TestingModule; + + describe('single configuration', () => { + describe('useFactory', () => { + beforeAll(async () => { + moduleRef = await Test.createTestingModule({ + imports: [ + BullModule.registerFlowProducerAsync({ + name: 'test', + useFactory: () => ({ + connection: { + host: '0.0.0.0', + port: 6380, + }, + }), + }), + ], + }).compile(); + }); + afterAll(async () => { + await moduleRef.close(); + }); + it('should inject the flowProducer with the given name', () => { + const flowProducer: FlowProducer = moduleRef.get(getFlowProducerToken('test')); + expect(flowProducer).toBeDefined(); + expect((flowProducer.opts as any).name).toEqual('test'); + }); + }); + }); + describe('multiple configurations', () => { + describe('useFactory', () => { + beforeAll(async () => { + moduleRef = await Test.createTestingModule({ + imports: [ + BullModule.registerFlowProducerAsync( + { + name: 'test1', + useFactory: () => ({ + connection: { + host: '0.0.0.0', + port: 6380, + }, + }), + }, + { + name: 'test2', + useFactory: () => ({ + connection: { + host: '0.0.0.0', + port: 6380, + }, + }), + }, + ), + ], + }).compile(); + }); + afterAll(async () => { + await moduleRef.close(); + }); + it('should inject the flowProducer with name "test1"', () => { + const flowProducer: FlowProducer = moduleRef.get(getFlowProducerToken('test1')); + expect(flowProducer).toBeDefined(); + expect((flowProducer.opts as any).name).toEqual('test1'); + }); + it('should inject the flowProducer with name "test2"', () => { + const flowProducer: FlowProducer = moduleRef.get(getFlowProducerToken('test2')); + expect(flowProducer).toBeDefined(); + expect((flowProducer.opts as any).name).toEqual('test2'); + }); + }); + }); + }); + describe('forRootAsync + registerQueueAsync', () => { let moduleRef: TestingModule; diff --git a/packages/bullmq/lib/bull.explorer.ts b/packages/bullmq/lib/bull.explorer.ts index dfe570ae..6ac3b4c3 100644 --- a/packages/bullmq/lib/bull.explorer.ts +++ b/packages/bullmq/lib/bull.explorer.ts @@ -10,6 +10,8 @@ import { Injector } from '@nestjs/core/injector/injector'; import { InstanceWrapper } from '@nestjs/core/injector/instance-wrapper'; import { Module } from '@nestjs/core/injector/module'; import { + FlowOpts, + FlowProducer, Processor, Queue, QueueEvents, @@ -19,6 +21,7 @@ import { } from 'bullmq'; import { BullMetadataAccessor } from './bull-metadata.accessor'; import { OnQueueEventMetadata, OnWorkerEventMetadata } from './decorators'; +import { NO_FLOW_PRODUCER_FOUND } from './bull.messages'; import { InvalidProcessorClassError, InvalidQueueEventsListenerClassError, @@ -121,6 +124,23 @@ export class BullExplorer implements OnModuleInit { } } + getFlowProducerOptions(flowProducerToken: string, name: string, configKey?: string) { + try { + const flowProducerRef = this.moduleRef.get(flowProducerToken, { strict: false }); + return flowProducerRef.opts ?? {}; + } catch (err) { + const sharedConfigToken = getSharedConfigToken(configKey); + try { + return this.moduleRef.get(sharedConfigToken, { + strict: false, + }); + } catch (err) { + this.logger.error(NO_FLOW_PRODUCER_FOUND(name)); + throw err; + } + } + } + handleProcessor( instance: T, queueName: string, diff --git a/packages/bullmq/lib/bull.messages.ts b/packages/bullmq/lib/bull.messages.ts index c172f0d2..a06d7ce0 100644 --- a/packages/bullmq/lib/bull.messages.ts +++ b/packages/bullmq/lib/bull.messages.ts @@ -2,3 +2,8 @@ export const NO_QUEUE_FOUND = (name?: string) => name ? `No Queue was found with the given name (${name}). Check your configuration.` : 'No Queue was found. Check your configuration.'; + +export const NO_FLOW_PRODUCER_FOUND = (name?: string) => + name + ? `No Flow Producer was found with the given name (${name}). Check your configuration.` + : 'No Flow Producer was found. Check your configuration.'; diff --git a/packages/bullmq/lib/bull.module.ts b/packages/bullmq/lib/bull.module.ts index b03305b9..85656f6a 100644 --- a/packages/bullmq/lib/bull.module.ts +++ b/packages/bullmq/lib/bull.module.ts @@ -4,14 +4,19 @@ import { } from '@nestjs/bull-shared'; import { DynamicModule, Module, Provider, Type } from '@nestjs/common'; import { DiscoveryModule } from '@nestjs/core'; -import { Queue, QueueOptions, Worker } from 'bullmq'; +import { FlowProducer, Queue, QueueBaseOptions, QueueOptions, Worker } from 'bullmq'; import { BullMetadataAccessor } from './bull-metadata.accessor'; import { BullExplorer } from './bull.explorer'; import { + createFlowProducerOptionProviders, + createFlowProducerProviders, createQueueOptionProviders, createQueueProviders, } from './bull.providers'; import { + RegisterFlowProducerAsyncOptions, + RegisterFlowProducerOptions, + RegisterFlowProducerOptionsFactory, SharedBullAsyncConfiguration, SharedBullConfigurationFactory, } from './interfaces'; @@ -22,6 +27,7 @@ import { } from './interfaces/register-queue-options.interface'; import { BULL_CONFIG_DEFAULT_TOKEN, + getFlowProducerOptionsToken, getQueueOptionsToken, getSharedConfigToken, } from './utils'; @@ -29,6 +35,7 @@ import { @Module({}) export class BullModule { private static _queueClass: Type = Queue; + private static _flowProducerClass: Type = FlowProducer; private static _workerClass: Type = Worker; /** @@ -41,6 +48,16 @@ export class BullModule { this._queueClass = cls; } + /** + * Class to be used to create Bull flow producers. + * This configuration property can be used to instruct the "@nestjs/bullmq" + * package to use, for example, "FlowProducerPro" class (from "BullMQ Pro"). + * @default FlowProducer + */ + static set flowProducerClass(cls: Type) { + this._flowProducerClass = cls; + } + /** * Class to be used to create Bull workers. * This configuration property can be used to instruct the "@nestjs/bullmq" @@ -251,6 +268,112 @@ export class BullModule { }; } + static registerFlowProducer(...options: RegisterFlowProducerOptions[]): DynamicModule { + const optionsArr = [].concat(options); + const flowProducerProviders = createFlowProducerProviders( + optionsArr, + this._flowProducerClass, + ); + const flowProducerOptionProviders = createFlowProducerOptionProviders(optionsArr); + + return { + module: BullModule, + imports: [BullModule.registerCore()], + providers: [...flowProducerOptionProviders, ...flowProducerProviders], + exports: flowProducerProviders, + }; + } + + static registerFlowProducerAsync( + ...options: RegisterFlowProducerAsyncOptions[] + ): DynamicModule { + const optionsArr = [].concat(options); + const flowProducerProviders = createFlowProducerProviders( + optionsArr, + this._flowProducerClass, + ); + + const imports = this.getUniqImports(optionsArr); + const asyncFlowProducerOptionsProviders = options + .map((flowProducerOptions) => this.createAsyncFlowProducerProviders(flowProducerOptions)) + .reduce((a, b) => a.concat(b), []); + + return { + imports: imports.concat(BullModule.registerCore()), + module: BullModule, + providers: [...asyncFlowProducerOptionsProviders, ...flowProducerProviders], + exports: flowProducerProviders, + }; + } + + private static createAsyncFlowProducerProviders( + options: RegisterFlowProducerAsyncOptions, + ): Provider[] { + const optionalSharedConfigHolder = createConditionalDepHolder( + getSharedConfigToken(options.configKey), + BULL_CONFIG_DEFAULT_TOKEN, + ); + + if (options.useExisting || options.useFactory) { + return [ + optionalSharedConfigHolder, + this.createAsyncFlowProducerOptionsProvider(options, optionalSharedConfigHolder), + ]; + } + if (!options.useClass) { + // fallback to the "registerFlowProducer" in case someone accidentally used the "registerFlowProducerAsync" instead + return createFlowProducerOptionProviders([options]); + } + const useClass = options.useClass as Type; + return [ + optionalSharedConfigHolder, + this.createAsyncFlowProducerOptionsProvider(options, optionalSharedConfigHolder), + { + provide: useClass, + useClass, + }, + ]; + } + + private static createAsyncFlowProducerOptionsProvider( + asyncOptions: RegisterFlowProducerAsyncOptions, + optionalSharedConfigHolderRef: Type>, + ): Provider { + if (asyncOptions.useFactory) { + return { + provide: getFlowProducerOptionsToken(asyncOptions.name), + useFactory: async ( + optionalDepHolder: IConditionalDepHolder, + ...factoryArgs: unknown[] + ) => { + return { + ...optionalDepHolder.getDependencyRef(asyncOptions.name), + ...(await asyncOptions.useFactory(...factoryArgs)), + }; + }, + inject: [optionalSharedConfigHolderRef, ...(asyncOptions.inject || [])], + }; + } + // `as Type` is a workaround for microsoft/TypeScript#31603 + const inject = [ + (asyncOptions.useClass || + asyncOptions.useExisting) as Type, + ]; + return { + provide: getFlowProducerOptionsToken(asyncOptions.name), + useFactory: async ( + optionalDepHolder: IConditionalDepHolder, + optionsFactory: RegisterFlowProducerOptionsFactory, + ) => { + return { + ...optionalDepHolder.getDependencyRef(asyncOptions.name), + ...(await optionsFactory.createRegisterQueueOptions()), + }; + }, + inject: [optionalSharedConfigHolderRef, ...inject], + }; + } + private static createAsyncSharedConfigurationProviders( configKey: string | undefined, options: SharedBullAsyncConfiguration, diff --git a/packages/bullmq/lib/bull.providers.ts b/packages/bullmq/lib/bull.providers.ts index 217bab16..cc48c6de 100644 --- a/packages/bullmq/lib/bull.providers.ts +++ b/packages/bullmq/lib/bull.providers.ts @@ -4,11 +4,14 @@ import { IConditionalDepHolder, } from '@nestjs/bull-shared'; import { flatten, OnApplicationShutdown, Provider, Type } from '@nestjs/common'; -import { Queue, Worker } from 'bullmq'; +import { FlowProducer, Queue, Worker } from 'bullmq'; import { BullQueueProcessor } from './bull.types'; +import { RegisterFlowProducerOptions } from './interfaces'; import { RegisterQueueOptions } from './interfaces/register-queue-options.interface'; import { BULL_CONFIG_DEFAULT_TOKEN, + getFlowProducerOptionsToken, + getFlowProducerToken, getQueueOptionsToken, getSharedConfigToken, } from './utils'; @@ -70,6 +73,19 @@ function createQueueAndWorkers( return queue; } +function createFlowProducers( + options: RegisterFlowProducerOptions, + flowProducerClass: Type +): TFlowProducer { + const flowProducer = new flowProducerClass(options); + + (flowProducer as unknown as OnApplicationShutdown).onApplicationShutdown = + async function (this: FlowProducer) { + return this.close(); + }; + return flowProducer; +} + export function createQueueOptionProviders( options: RegisterQueueOptions[], ): Provider[] { @@ -95,6 +111,31 @@ export function createQueueOptionProviders( return flatten(providers); } +export function createFlowProducerOptionProviders( + options: RegisterFlowProducerOptions[], +): Provider[] { + const providers = options.map((option) => { + const optionalSharedConfigHolder = createConditionalDepHolder( + getSharedConfigToken(option.configKey), + BULL_CONFIG_DEFAULT_TOKEN, + ); + return [ + optionalSharedConfigHolder, + { + provide: getFlowProducerOptionsToken(option.name), + useFactory: (optionalDepHolder: IConditionalDepHolder) => { + return { + ...optionalDepHolder.getDependencyRef(option.name), + ...option, + }; + }, + inject: [optionalSharedConfigHolder], + }, + ]; + }); + return flatten(providers); +} + export function createQueueProviders< TQueue = Queue, TWorker extends Worker = Worker, @@ -117,3 +158,23 @@ export function createQueueProviders< })); return queueProviders; } + +export function createFlowProducerProviders< + TFlowProducer = FlowProducer, +>( + options: RegisterFlowProducerOptions[], + flowProducerClass: Type, +): Provider[] { + const flowProducerProviders = options.map((item) => ({ + provide: getFlowProducerToken(item.name), + useFactory: (flowProducerOptions: RegisterFlowProducerOptions) => { + const flowProducerName = flowProducerOptions.name || item.name; + return createFlowProducers( + { ...flowProducerOptions, name: flowProducerName }, + flowProducerClass + ); + }, + inject: [getFlowProducerOptionsToken(item.name)], + })); + return flowProducerProviders; +} diff --git a/packages/bullmq/lib/decorators/index.ts b/packages/bullmq/lib/decorators/index.ts index 99338411..ba7b52e6 100644 --- a/packages/bullmq/lib/decorators/index.ts +++ b/packages/bullmq/lib/decorators/index.ts @@ -1,3 +1,4 @@ +export * from './inject-flow-producer.decorator'; export * from './inject-queue.decorator'; export * from './on-queue-event.decorator'; export * from './on-worker-event.decorator'; diff --git a/packages/bullmq/lib/decorators/inject-flow-producer.decorator.ts b/packages/bullmq/lib/decorators/inject-flow-producer.decorator.ts new file mode 100644 index 00000000..355c475a --- /dev/null +++ b/packages/bullmq/lib/decorators/inject-flow-producer.decorator.ts @@ -0,0 +1,9 @@ +import { Inject } from '@nestjs/common'; +import { getFlowProducerToken } from '../utils'; + +/** + * Injects Bull's flow producer instance with the given name + * @param name flow producer name + */ +export const InjectFlowProducer = (name?: string): ParameterDecorator => + Inject(getFlowProducerToken(name)); \ No newline at end of file diff --git a/packages/bullmq/lib/interfaces/index.ts b/packages/bullmq/lib/interfaces/index.ts index 1c5926d8..43748527 100644 --- a/packages/bullmq/lib/interfaces/index.ts +++ b/packages/bullmq/lib/interfaces/index.ts @@ -1,3 +1,4 @@ export * from './bull-processor.interfaces'; +export * from './register-flow-options.interface'; export * from './register-queue-options.interface'; export * from './shared-bull-config.interface'; diff --git a/packages/bullmq/lib/interfaces/register-flow-options.interface.ts b/packages/bullmq/lib/interfaces/register-flow-options.interface.ts new file mode 100644 index 00000000..2bd9ae50 --- /dev/null +++ b/packages/bullmq/lib/interfaces/register-flow-options.interface.ts @@ -0,0 +1,61 @@ +import { FactoryProvider, ModuleMetadata, Type } from '@nestjs/common'; +import { QueueBaseOptions } from 'bullmq'; + +export interface RegisterFlowProducerOptions extends QueueBaseOptions { + /** + * Flow name + * + * @default default + */ + name?: string; + + /** + * Shared configuration key + * + * @default default + */ + configKey?: string; +} + +export interface RegisterFlowProducerOptionsFactory { + createRegisterQueueOptions(): + | Promise + | RegisterFlowProducerOptions; +} + +export interface RegisterFlowProducerAsyncOptions + extends Pick { + /** + * Flow name. + * + * @default default + */ + name?: string; + + /** + * Shared configuration key. + */ + configKey?: string; + + /** + * Existing Provider to be used. + */ + useExisting?: Type; + + /** + * Type (class name) of provider (instance to be registered and injected). + */ + useClass?: Type; + + /** + * Factory function that returns an instance of the provider to be injected. + */ + useFactory?: ( + ...args: any[] + ) => Promise | RegisterFlowProducerOptions; + + /** + * Optional list of providers to be injected into the context of the Factory function. + */ + inject?: FactoryProvider['inject']; +} diff --git a/packages/bullmq/lib/interfaces/register-queue-options.interface.ts b/packages/bullmq/lib/interfaces/register-queue-options.interface.ts index afc676ca..fc8a4b86 100644 --- a/packages/bullmq/lib/interfaces/register-queue-options.interface.ts +++ b/packages/bullmq/lib/interfaces/register-queue-options.interface.ts @@ -1,8 +1,8 @@ import { FactoryProvider, ModuleMetadata, Type } from '@nestjs/common'; -import * as Bull from 'bullmq'; +import { QueueOptions } from 'bullmq'; import { BullQueueProcessor } from '../bull.types'; -export interface RegisterQueueOptions extends Bull.QueueOptions { +export interface RegisterQueueOptions extends QueueOptions { /** * Queue name * diff --git a/packages/bullmq/lib/test/bull.explorer.spec.ts b/packages/bullmq/lib/test/bull.explorer.spec.ts index 9a55dbb1..98d6140c 100644 --- a/packages/bullmq/lib/test/bull.explorer.spec.ts +++ b/packages/bullmq/lib/test/bull.explorer.spec.ts @@ -11,6 +11,7 @@ import { import { BullMetadataAccessor } from '../bull-metadata.accessor'; import { BullExplorer } from '../bull.explorer'; import { BullModule } from '../bull.module'; +import { getFlowProducerToken } from '../utils'; const workerCtorSpy = jest.fn(); const queueEventsSpy = jest.fn(); @@ -176,4 +177,33 @@ describe('BullExplorer', () => { await moduleRef.close(); }); }); + + describe('getFlowProducerOptions', () => { + it('should return options associated with the given flowProducer', async () => { + const flowProducerToken = getFlowProducerToken('test'); + const flowProducerOptions = { + connection: { + host: 'localhost', + port: 65793, + }, + sharedConnection: true, + }; + const moduleRef = await Test.createTestingModule({ + imports: [BullModule.registerFlowProducer({ name: 'test', ...flowProducerOptions })], + }) + .overrideProvider(flowProducerToken) + .useValue({ + opts: flowProducerOptions, + }) + .compile(); + + const explorer = moduleRef.get(BullExplorer); + + const flowProducerOpts = explorer.getFlowProducerOptions(flowProducerToken, 'test'); + expect(flowProducerOpts).toBeDefined(); + expect(flowProducerOpts).toEqual(flowProducerOptions); + + await moduleRef.close(); + }); + }); }); diff --git a/packages/bullmq/lib/utils/get-flow-producer-options-token.util.ts b/packages/bullmq/lib/utils/get-flow-producer-options-token.util.ts new file mode 100644 index 00000000..dc8d33f0 --- /dev/null +++ b/packages/bullmq/lib/utils/get-flow-producer-options-token.util.ts @@ -0,0 +1,3 @@ +export function getFlowProducerOptionsToken(name?: string): string { + return name ? `BullMQFlowProducerOptions_${name}` : 'BullMQFlowProducerOptions_default'; +} diff --git a/packages/bullmq/lib/utils/get-flow-producer-token.util.ts b/packages/bullmq/lib/utils/get-flow-producer-token.util.ts new file mode 100644 index 00000000..2504b7f7 --- /dev/null +++ b/packages/bullmq/lib/utils/get-flow-producer-token.util.ts @@ -0,0 +1,3 @@ +export function getFlowProducerToken(name?: string): string { + return name ? `BullFlowProducer_${name}` : 'BullFlowProducer_default'; +} diff --git a/packages/bullmq/lib/utils/index.ts b/packages/bullmq/lib/utils/index.ts index a7662d92..a01e60b1 100644 --- a/packages/bullmq/lib/utils/index.ts +++ b/packages/bullmq/lib/utils/index.ts @@ -1,2 +1,4 @@ +export * from './get-flow-producer-token.util'; +export * from './get-flow-producer-options-token.util'; export * from './get-queue-options-token.util'; export * from './get-shared-config-token.util';