A lightweight job scheduler for NestJS
- Background
- Install
- Configure Agenda
- Job processors
- Job schedulers
- Event listeners
- Manually working with a queue
- License
Agenda Nest provides a NestJS module wrapper for Agenda, a lightweight job scheduling library. Heavily inspired by Nest's own Bull implementation, @nestjs/bull, Agenda Nest provides a fully featured implementation, complete with decorators for defining your jobs, processors, and queue event listeners. You may optionally use Agenda Nest's Express controller to interface with your queues through HTTP.
Agenda uses MongoDB to persist job data, so you'll need MongoDB (or mongoose) installed and available to your application.
npm install agenda-nest
As Agenda Nest is a wrapper for Agenda, it is configurable with the same properties as the Agenda instance. Refer to AgendaConfig for the complete configuration type.
import { Module } from '@nestjs/common';
import { AgendaModule } from 'agenda-nest';
@Module({
imports: [
AgendaModule.forRoot({
processEvery: '3 minutes',
db: {
address: 'mongodb://localhost:27017',
},
}),
],
providers: [Jobs],
})
export class AppModule {}
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { AgendaModule } from 'agenda-nest';
import configuration from './configuration';
@Module({
imports: [
ConfigModule.forRoot({
load: [configuration],
}),
AgendaModule.forRootAsync({
useFactory: (config: ConfigService) => ({
processEvery: config.get('queues.processInterval'),
db: {
address: config.get('database.connectionString'),
},
}),
inject: [ConfigService],
})
],
providers: [Jobs],
})
export class AppModule {}
Agenda Nest can manage multiple queues within your application. To configure a new queue, use AgendaModule.registerQueue(queueName: string, config: AgendaConfig). Queues inherit the configuration provided to AgendaModule.forRoot; any properties provided to the queue are merged in and override the inherited values.
import { Module } from '@nestjs/common';
import { AgendaModule } from 'agenda-nest';
@Module({
imports: [
AgendaModule.registerQueue('notifications', {
processEvery: '5 minutes',
autoStart: false, // default: true
collection: 'notificationsqueue', // default: notifications-queue (`${queueName}-queue`)
}),
],
})
export class NotificationsModule {}
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { AgendaModule } from 'agenda-nest';
@Module({
imports: [
AgendaModule.registerQueueAsync('notifications', {
useFactory: (config: ConfigService) => ({
processEvery: config.get('queues.notifications.processInterval'),
autoStart: config.get('queues.notifications.autoStart'),
}),
inject: [ConfigService],
}),
],
})
export class NotificationsModule {}
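The inheritance rule described above (queue settings layered over the root settings) can be pictured with a small standalone sketch. The `QueueConfig` interface and the `resolveQueueConfig` helper are hypothetical names made up for illustration, and the shallow merge is an assumption; the library's actual merge logic may differ, for example in how it treats nested objects such as `db`.

```typescript
// Simplified stand-in for the library's AgendaConfig type (assumption).
interface QueueConfig {
  processEvery?: string;
  autoStart?: boolean;
  collection?: string;
  db?: { address: string };
}

// Hypothetical helper: queue-level values override root-level values,
// and anything the queue does not set is inherited from the root.
function resolveQueueConfig(root: QueueConfig, queue: QueueConfig): QueueConfig {
  return { ...root, ...queue };
}

const rootConfig: QueueConfig = {
  processEvery: '3 minutes',
  db: { address: 'mongodb://localhost:27017' },
};

const notificationsConfig = resolveQueueConfig(rootConfig, {
  processEvery: '5 minutes',
  autoStart: false,
});

console.log(notificationsConfig);
```

In this sketch the notifications queue ends up with its own `processEvery` and `autoStart` values while reusing the root `db` settings.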
Job processors are methods defined on a class declared with the @Queue(name: string) decorator. The queue name is used to create the MongoDB collection for each queue, formatted as "{queueName}-queue". You can also specify your own collection name.
import { Queue } from 'agenda-nest';
// will use a "notifications-queue" collection
@Queue('notifications')
export class NotificationsQueue {}
// with custom collection name
@Queue('notifications', { collection: 'notificationsqueue' })
export class NotificationsQueue {}
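As a rough illustration of the naming rule, the sketch below derives a collection name from a queue name and an optional override. `QueueOptions` and `resolveCollectionName` are hypothetical names used only for this example; they are not part of Agenda Nest's API.

```typescript
// Hypothetical options shape: only the collection override matters here.
interface QueueOptions {
  collection?: string;
}

// Falls back to "{queueName}-queue" when no custom collection is given.
function resolveCollectionName(queueName: string, options: QueueOptions = {}): string {
  return options.collection ?? `${queueName}-queue`;
}

const defaultName = resolveCollectionName('notifications');
const customName = resolveCollectionName('notifications', {
  collection: 'notificationsqueue',
});

console.log(defaultName, customName);
```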
To define, but not schedule, a job on the queue, use the @Define() decorator as shown below. To define a scheduled job, see Job schedulers.
import { Define, Queue, Job } from 'agenda-nest';
@Queue('notifications')
export class NotificationsQueue {
@Define()
sendNotification(job: Job) {}
}
To define and schedule a job on the queue, use one of the @Every(), @Schedule(), or @Now() decorators. See Agenda's Creating Jobs documentation for an explanation of the behavior of each.
@Every() defines a job to run at the given interval.
import { Every, Queue, Job } from 'agenda-nest';
@Queue('notifications')
export class NotificationsQueue {
@Every({ name: 'send notifications', interval: '15 minutes' })
async sendNotifications(job: Job) {
const users = await User.doSomethingReallyIntensive();
sendNotification(users, "Welcome!");
}
}
@Queue('reports')
export class ReportsQueue {
@Every('15 minutes')
async printAnalyticsReport(job: Job) {
const users = await User.doSomethingReallyIntensive();
processUserData(users);
}
}
@Schedule() schedules a job to run once at the given time.
import { Schedule, Queue, Job } from 'agenda-nest';
@Queue('notifications')
export class NotificationsQueue {
@Schedule({ name: 'send notifications', when: 'tomorrow at noon' })
async sendNotifications(job: Job) {
const users = await User.doSomethingReallyIntensive();
sendNotification(users, "Welcome!");
}
}
@Queue('reports')
export class ReportsQueue {
@Schedule('tomorrow at noon')
async printAnalyticsReport(job: Job) {
const users = await User.doSomethingReallyIntensive();
processUserData(users);
}
}
@Now() schedules a job to run once immediately.
import { Now, Queue, Job } from 'agenda-nest';
@Queue('dance')
export class DanceQueue {
@Now()
async doTheHokeyPokey(job: Job) {
hokeyPokey();
}
@Now('do the cha-cha')
async doTheChaCha(job: Job) {
chaCha();
}
}
Agenda emits a set of useful events when queue or job state changes occur. Agenda Nest provides a set of decorators for subscribing to a core set of standard events.
Event listeners must be declared within an injectable class (i.e., within a class decorated with the @Queue() decorator). To listen for an event, use one of the decorators in the tables below to declare a handler for the event. For example, to listen for the event emitted when a queue is ready to process jobs, use the following construct:
import { OnQueueReady, Queue } from 'agenda-nest';
import { Job } from 'agenda';
@Queue()
export class JobsQueue {
@OnQueueReady()
onReady() {
console.log('Jobs queue is ready to run our jobs');
}
...
An Agenda instance emits the queue events listed below. Use the corresponding method decorator to listen for and handle each event.
Event | Listener | Description |
---|---|---|
ready | @OnQueueReady() | called when Agenda mongo connection is successfully opened and indices created |
error | @OnQueueError() | called when Agenda mongo connection process has thrown an error |
An Agenda instance emits the job events listed below. Use the corresponding method decorator to listen for and handle each event.
Event | Listener | Description |
---|---|---|
start or start:job name | @OnJobStart(name?: string) | called just before a job starts |
complete or complete:job name | @OnJobComplete(name?: string) | called when a job finishes, regardless of if it succeeds or fails |
success or success:job name | @OnJobSuccess(name?: string) | called when a job finishes successfully |
fail or fail:job name | @OnJobFail(name?: string) | called when a job throws an error |
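To make the relationship between the job events more concrete, here is a small standalone simulation of which event names a single job run would produce. It does not use Agenda itself: `eventsForRun` is a hypothetical helper, the handler is synchronous for simplicity, and the relative ordering of success or fail before complete is an assumption of this sketch rather than something stated above.

```typescript
type JobHandler = () => void;

// Hypothetical helper: lists the event names one job run would trigger.
function eventsForRun(jobName: string, handler: JobHandler): string[] {
  const emitted: string[] = [];
  // Each state change is published twice: generically and scoped to the job name.
  const emit = (event: string): void => {
    emitted.push(event, `${event}:${jobName}`);
  };

  emit('start'); // just before the job starts
  try {
    handler();
    emit('success'); // only when the handler returns normally
  } catch {
    emit('fail'); // only when the handler throws
  }
  emit('complete'); // always, regardless of the outcome
  return emitted;
}

const succeeded = eventsForRun('send notifications', () => {});
const failed = eventsForRun('send notifications', () => {
  throw new Error('mail server unavailable');
});

console.log(succeeded);
console.log(failed);
```

A handler registered with @OnJobComplete() would therefore run in both cases, while @OnJobSuccess() and @OnJobFail() handlers are mutually exclusive for a given run. Passing a name to a decorator corresponds to the scoped form, such as `start:send notifications`.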
You can access any registered queue using the @InjectQueue(queueName) decorator, which injects the Agenda instance for the given queue name. See Agenda's documentation for the available API.
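import { Injectable } from '@nestjs/common';
import { Agenda } from 'agenda';
import { InjectQueue } from 'agenda-nest';
@Injectable()
export class NotificationsService {
constructor(@InjectQueue('notifications') private queue: Agenda) {}
async scheduleNotification(sendAt: string) {
// Schedule the 'sendNotification' job for the requested time, e.g. 'tomorrow at noon'
await this.queue.schedule(sendAt, 'sendNotification', {
to: '[email protected]',
});
}
}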
Agenda Nest is MIT licensed.