The InboxOutboxModule
is solution designed for NestJS to tackle the challenges of dual write and reliable event delivery in distributed systems. It addresses scenarios where one module emits an integration event, and another module must receive and process this information to maintain system-wide data consistency, which is not possible with a in-memory event bus.
-
Dual Write Consistency: Ensures that database updates and event emissions are atomic, preventing scenarios where data is updated but the corresponding event fails to emit (or vice versa).
-
Reliable Event Delivery: Guarantees that events are delivered to their intended recipients, even in the face of temporary network issues, errors or service downtime/crash.
-
Cross-Module Consistency: Facilitates keeping data consistent across different modules or microservices by ensuring that all relevant parts of the system are updated based on emitted events.
npm i @nestixis/nestjs-inbox-outbox
import { InboxOutboxEvent } from '@nestixis/inbox-outbox';
export class UserApplicationAssignedEvent implements InboxOutboxEvent {
public readonly name = UserApplicationAssignedEvent.name;
constructor(
public readonly userToken: string,
public readonly applicationToken: string,
public readonly userApplicationToken: string,
) {}
}
Decorator Usage: To listen for an event, apply the @Listener
decorator to a class method, referencing the specific event's name.
import { Listener } from '@nestixis/inbox-outbox';
@Listener(UserApplicationAssignedEvent.name)
Interface Implementation: Implement the IListener<T>
interface to define the listener's behavior for the corresponding event.
import { IListener } from '@nestixis/inbox-outbox';
class EmitIntegrationEventOnUserApplicationUpdateListener implements IListener<UserApplicationAssignedEvent> {
// Implementation details...
}
Or in case when you want to listen to multiple events:
import { Listener, IListener } from '@nestixis/inbox-outbox';
@Listener([
UserApplicationAssignedEvent.name,
UserApplicationAssigningEvent.name,
])
export class EmitIntegrationEventOnUserApplicationUpdateListener
implements
IListener<
| UserApplicationAssignedEvent
| UserApplicationAssigningEvent
>
{
constructor(
private eventEmitter: EventEmitter2,
private queryBus: QueryBus,
) {}
async handle(
event:
| UserApplicationAssignedEvent
| UserApplicationAssigningEvent
): Promise<void> {
// Implementation details...
}
}
Note: You should only group events that are related to each other (By case and data) in the same listener. If you have events that are not related to each other, you should create a separate listener for each event.
The module uses a TransactionalEventEmitter
for reliable event emission. This component is designed to work similarly to eventemitter2, but with added transactional capabilities.
-
Transactional Emission: The
TransactionalEventEmitter
takes two arguments:- The event to be emitted
- An array of entities to be saved or removed in the transaction
-
Immediate Delivery Attempt: Upon emission, the system immediately tries to deliver the event to registered listeners.
-
Fallback Mechanism: If immediate delivery fails (due to network issues, service unavailability, etc.), a built-in polling mechanism ensures eventual delivery.
-
Events are only emitted if the associated database transaction succeeds.
-
Even if immediate delivery fails, the event will eventually be processed.
-
Emitting an Event: To emit an event, use the
emit
method of thetransactionalEventEmitter
, providing the event object and associated transactional entities. Operation has to be awaited.import { TransactionalEventEmitterOperations, transactionalEventEmitter } from '@nestixis/inbox-outbox'; constructor(private readonly transactionalEventEmitter: TransactionalEventEmitter) {} await this.transactionalEventEmitter.emit( new UserApplicationAssignedEvent(user.token, application.token, userApplication.token), [{ entity: userApplication, operation: TransactionalEventEmitterOperations.persist, }] );
-
Event Contract: Ensure that your event classes implement the
InboxOutboxEvent
interface for consistency and clarity.
- expiresAtTTL: This is how long the event will be stored in the database and will be retried
- maxExecutionTimeTTL: This is how long it will wait for the listener to process the event, if it takes longer than this, it will be retried
- readyToRetryAfterTTL: This is how long it will wait before retrying the event listeners
- retryEveryMilliseconds: This is how often it will check for events that need to be retried
- maxInboxOutboxTransportEventPerRetry: This is how many events it will retry at a time
- Register the
InboxOutboxModule
within your application's bootstrap process, specifying global accessibility and event configurations.InboxOutboxModule.registerAsync({ isGlobal: true, imports: [ TypeOrmModule.forFeature([TypeOrmInboxOutboxTransportEvent, Cat]), ], useFactory: (dataSource: DataSource) => { const driverFactory = new TypeORMDatabaseDriverFactory(dataSource); return { driverFactory: driverFactory, events: [ { name: UserApplicationAssignedEvent.name, listeners: { expiresAtTTL: 1000 * 60 * 60 * 24, maxExecutionTimeTTL: 1000 * 15, readyToRetryAfterTTL: 10000, }, }, ], retryEveryMilliseconds: 30_000, maxInboxOutboxTransportEventPerRetry: 10, }; }, inject: [DataSource], }),
To extend the InboxOutboxModule with support for additional ORMs or databases, you can create a new driver. Follow these steps to implement and integrate your custom driver:
Begin by forking the main InboxOutboxModule repository to your own GitHub account.
Use Lerna to create a new package in the packages
folder:
lerna create @nestixis/your-orm-driver
Alternatively, you can copy an existing driver package and modify it.
Develop your driver by implementing the DatabaseDriver
interface. Pay special attention to:
- Transaction handling
- Pessimistic locking mechanisms
- Persist and flush operations
These aspects are crucial for maintaining data consistency and performance.
Create a factory class that implements the DatabaseDriverFactory
interface. This factory will be responsible for instantiating your custom driver.
Create a model that can be persisted in your target database. This model shall implement the InboxOutboxTransportEvent
interface.
Create a demo application that utilizes your new driver. This PoC will serve as both a testing ground and an example for other developers.
You have two options for making your driver available:
- Create a Pull Request to the main InboxOutboxModule repository for inclusion in the official release.
- Publish your driver to npm under your own namespace.
- Ensure comprehensive test coverage for your driver.
- Document any database-specific considerations or configurations.
- Follow the coding standards and conventions established in the existing drivers.
- Consider performance implications, especially for high-throughput systems.