diff --git a/main/contracts/contracts.ts b/main/contracts/contracts.ts index e6e83f7..31d838e 100644 --- a/main/contracts/contracts.ts +++ b/main/contracts/contracts.ts @@ -96,6 +96,26 @@ export interface IMessageBroker { * This RSVP function is used by responders and is analogous to the 'Get' function. Responders when invoked must return the required response value type. */ rsvp>(channelName: K, handler: RSVPHandler): IResponderRef; + + /** + * Creates a new scope with this instance of the MessageBroker as its parent. + * Messages from the scope will be passed to this instance if the child scope doesn't have a handler for it. + * @returns A new instance of the messagebroker + */ + createScope(): IMessageBroker; + + /* + * Disposes of all message channels on this instance. + * It also destroys the connection between this and its parent so that messages will no longer propogate up. + */ + destroy(): void; + + /** + * Returns true if this is root node of the tree of MessageBrokers. + * The root MessageBroker will not have a parent MessageBroker. + * @returns A boolean indicating whether this is the root or not + */ + isRoot(): boolean; } /** diff --git a/main/core/messagebroker.ts b/main/core/messagebroker.ts index f1d723c..be7dd43 100644 --- a/main/core/messagebroker.ts +++ b/main/core/messagebroker.ts @@ -33,11 +33,11 @@ export function messagebroker(): IMessageBroker { * Represents a messagebroker. Using the 'new' operator is discouraged, instead use the messagebroker() function or dependency injection. */ @Injectable() -export class MessageBroker implements IMessageBroker { +export class MessageBroker implements IMessageBroker { private channelLookup: ChannelModelLookup = {}; private messagePublisher = new Subject>(); - constructor(private rsvpMediator: RSVPMediator) {} + constructor(private rsvpMediator: RSVPMediator, private _parent?: MessageBroker) {} /** * Creates a new channel with the provided channelName. An optional config object can be passed that specifies how many messages to cache. @@ -99,6 +99,29 @@ export class MessageBroker implements IMessageBroker { delete this.channelLookup[channelName]; } + /** + * Creates a new scope with this instance of the MessageBroker as its parent. + * Messages from the scope will be passed to this instance if the child scope doesn't have a handler for it. + * @returns A new instance of the messagebroker + */ + public createScope(): IMessageBroker { + const instance = new MessageBroker(this.rsvpMediator, this); + return instance; + } + + /* + * Disposes of all message channels on this instance. + * It also destroys the connection between this and its parent so that messages will no longer propogate up. + */ + public destroy(): void { + type Channels = (keyof typeof this.channelLookup)[]; + (Object.keys(this.channelLookup) as Channels).forEach((channelName) => this.dispose(channelName)); + + if (this._parent) { + this._parent = undefined; + } + } + /** * Return a deferred observable as the channel config may have been updated before the subscription * @param channelName name of channel to subscribe to @@ -143,7 +166,16 @@ export class MessageBroker implements IMessageBroker { } const publishFunction = (data?: T[K], type?: string): void => { - this.messagePublisher.next(this.createMessage(channelName, data, type)); + // If there is any registered subscriber for the channel on this broker, then let those handle the message. + // Otherwise, pass it up the chain to the parent to see if they can handle it. + if (this.messagePublisher.observed) { + this.messagePublisher.next(this.createMessage(channelName, data, type)); + } else if (this._parent) { + // It is possible that this channel being published on does NOT exist on the parent. + // In that case, the message will simply be passed up and ignored + // since no one higher up the chain will be able to create a subscriber for this channel. + this._parent.create(channelName as any).publish(data); + } }; // Stream should return a deferred observable @@ -180,4 +212,12 @@ export class MessageBroker implements IMessageBroker { ): channel is RequiredPick, 'config' | 'subscription'> { return channel != null && channel.subscription != null; } + + public isRoot(): boolean { + return this._parent === undefined; + } + + protected get parent(): MessageBroker | undefined { + return this._parent; + } } diff --git a/package.json b/package.json index ae4fba4..04536dd 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@morgan-stanley/message-broker", - "version": "1.0.1", + "version": "1.1.0", "description": "Framework agnostic messagebroker for decoupled communication.", "main": "dist/main/index.js", "types": "dist/main/index.d.ts", diff --git a/site/content/documentation/1.0.0/scopes.mdx b/site/content/documentation/1.0.0/scopes.mdx new file mode 100644 index 0000000..1101dc2 --- /dev/null +++ b/site/content/documentation/1.0.0/scopes.mdx @@ -0,0 +1,84 @@ +--- +order: 5 +title: Scopes +--- + +Scopes are a mechanism for creating a tree of messagebrokers, where a message will be passed up the tree until either a handler is found for the message, or the root of the tree is reached. +You can create a new scope on a messagebroker by calling `createScope`. + +```typescript +const parent: IMessageBroker + = messagebroker(); + +const child: IMessageBroker + = parent.createScope(); +``` + +A scope is just another instance of an `IMessageBroker` on which you can perform all of the same operations that you'd expect on the base messagebroker. +The main thing to note about this feature is how messages are handled when a scope doesn't have any subscribers for a channel. + +### Scope Hierarchies + +Any message that is published to a broker will be passed up the chain of scopes until a handler for the message is found. + +```typescript +parent.get('x').subscribe(message => console.log('parent received')); + +child.create('x').publish({}); + +// expect: parent received +``` + +However messages are **not** sent up the hierarchy if the channel in the child has been subscribed to. + +```typescript +parent.get('x').subscribe(message => console.log('parent received')); +child.get('x').subscribe(message => console.log('child received')); + +child.create('x').publish({}); + +// expect: child received +``` + +Messages are also not published to "sibling" scopes, where the brokers share a parent. + +```typescript +const sibling: IMessageBroker + = parent.createScope(); + +parent.get('x').subscribe(message => console.log('parent received')); +child.get('x').subscribe(message => console.log('child received')); +sibling.get('x').subscribe(message => console.log('sibling received')); + +sibling.create('x').publish({}); + +// expect: sibling received +``` + +### Scope Depth + +Scope hierarchies can be arbitrarily deep, and messages will make their way all the way to the top to find a handler. + +```typescript +const distantChild = parent + .createScope() + .createScope() + ... + .createScope(); + +parent.get('x').subscribe(message => console.log('parent received')); + +distantChild.create('x').publish({}); + +// expect: parent received +``` + +### Destroy + +A MessageBroker instance can be destroyed. +Destroying a MessageBroker will first destroy all of its children, it will dispose of all its channels, and finally remove itself from its parent. + +```typescript +child.destroy(); +parent.children.contains(child); // expect: false +``` \ No newline at end of file diff --git a/spec/core/messagebroker.spec.ts b/spec/core/messagebroker.spec.ts index 8ed9988..9334dff 100644 --- a/spec/core/messagebroker.spec.ts +++ b/spec/core/messagebroker.spec.ts @@ -24,7 +24,7 @@ describe('MessageBroker', () => { mockRSVPMediator = Mock.create>().setup(setupFunction('rsvp')); }); - function getInstance(): MessageBroker { + function getInstance(): MessageBroker { return new MessageBroker(mockRSVPMediator.mock); } @@ -368,6 +368,119 @@ describe('MessageBroker', () => { }); }); + describe('Scopes', () => { + it('should return a new messagebroker instance for each new scope', () => { + const instance = getInstance(); + const scope = instance.createScope(); + const scope2 = instance.createScope(); + + expect(scope).not.toBe(instance); + expect(scope).not.toBe(scope2); + }); + + it('should publish messages from child to parent if there is no handler on child', () => { + const parentMessages: Array> = []; + const parent = getInstance(); + const child = parent.createScope(); + + parent.get('channel').subscribe((message) => parentMessages.push(message)); + child.create('channel').publish('parent should handle this'); + + expect(parentMessages.length).toEqual(1); + verifyMessage(parentMessages[0], 'parent should handle this'); + }); + + it('should not publish messages from child to parent if there is a handler on child', () => { + const parentMessages: Array> = []; + const childMessages: Array> = []; + const parent = getInstance(); + const child = parent.createScope(); + + parent.get('channel').subscribe((message) => parentMessages.push(message)); + child.get('channel').subscribe((message) => childMessages.push(message)); + + child.create('channel').publish('child should handle this'); + + expect(parentMessages.length).toEqual(0); + + expect(childMessages.length).toEqual(1); + verifyMessage(childMessages[0], 'child should handle this'); + }); + + it('should not publish messages to "sibling" scopes', () => { + const brotherMessages: Array> = []; + const sisterMessages: Array> = []; + const parent = getInstance(); + const brother = parent.createScope(); + const sister = parent.createScope(); + + brother.get('channel').subscribe((message) => brotherMessages.push(message)); + sister.get('channel').subscribe((message) => sisterMessages.push(message)); + + brother.create('channel').publish('brother should get this'); + sister.create('channel').publish('sister should get this'); + + expect(brotherMessages.length).toEqual(1); + verifyMessage(brotherMessages[0], 'brother should get this'); + + expect(sisterMessages.length).toEqual(1); + verifyMessage(sisterMessages[0], 'sister should get this'); + }); + + describe('Destroy', () => { + it('should dispose of all subscriptions on that instance and its child', () => { + const instance = getInstance(); + const instanceChannel = instance.create('yourChannel'); + const child = instance.createScope(); + const childChannel = instance.create('yourChannel'); + + instance.destroy(); // destroy the PARENT + + const postDisposeInstanceChannel = instance.create('yourChannel'); + const postDisposeChildChannel = child.create('yourChannel'); + + expect(postDisposeInstanceChannel).not.toBe(instanceChannel); + expect(postDisposeChildChannel).not.toBe(childChannel); + }); + + it('should prevent message propagation from happening', () => { + const childMessages: Array> = []; + const parentMessages: Array> = []; + const parent = getInstance(); + const child = parent.createScope(); + + parent.get('channel').subscribe((message) => parentMessages.push(message)); + + child.destroy(); + + child.create('channel').publish('message'); + + expect(childMessages.length).toEqual(0); + expect(parentMessages.length).toEqual(0); + }); + + it('should destroy all cached messages on parent as well', () => { + const parent = getInstance(); + const child = parent.createScope(); + + const parentChannel = parent.create('channel', { replayCacheSize: 2 }); + const childChannel = child.create('channel', { replayCacheSize: 2 }); + + childChannel.publish('message one'); + childChannel.publish('message two'); + + child.destroy(); // this should cancel the existing caching subscriptions + + const parentMessages: Array> = []; + parentChannel.stream.subscribe((message) => parentMessages.push(message)); + + childChannel.publish('message three'); + + expect(parentMessages.length).toEqual(0); + }); + }); + }); + function verifyMessage(message: IMessage, expectedData: T, expectedType?: string) { expect(message).toBeDefined(); expect(message.data).toEqual(expectedData); diff --git a/tsconfig.json b/tsconfig.json index fa00f2b..089fe2c 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,11 +1,12 @@ { "compilerOptions": { - "target": "ES5", + "target": "ES2017", "module": "commonjs", "lib": [ "ES5", "ES6", "DOM", + "ES2017" ], "declaration": true, "sourceMap": true,