Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introducing Scopes #102

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions main/contracts/contracts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,26 @@ export interface IMessageBroker<T> {
* This RSVP function is used by responders and is analogous to the 'Get' function. Responders when invoked must return the required response value type.
*/
rsvp<K extends keyof RSVPOf<T>>(channelName: K, handler: RSVPHandler<T>): IResponderRef;

/**
* Creates a new scope with this instance of the MessageBroker as its parent.
* Messages from the scope will be passed to this instance if the child scope doesn't have a handler for it.
* @returns A new instance of the messagebroker
*/
createScope<K extends T>(): IMessageBroker<K>;

/*
* Disposes of all message channels on this instance.
* It also destroys the connection between this and its parent so that messages will no longer propogate up.
*/
destroy(): void;

/**
* Returns true if this is root node of the tree of MessageBrokers.
* The root MessageBroker will not have a parent MessageBroker.
* @returns A boolean indicating whether this is the root or not
*/
isRoot(): boolean;
}

/**
Expand Down
46 changes: 43 additions & 3 deletions main/core/messagebroker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ export function messagebroker<T = any>(): IMessageBroker<T> {
* Represents a messagebroker. Using the 'new' operator is discouraged, instead use the messagebroker() function or dependency injection.
*/
@Injectable()
export class MessageBroker<T = any> implements IMessageBroker<T> {
export class MessageBroker<T extends TParent = any, TParent = any> implements IMessageBroker<T> {
private channelLookup: ChannelModelLookup<T> = {};
private messagePublisher = new Subject<IMessage<any>>();

constructor(private rsvpMediator: RSVPMediator<T>) {}
constructor(private rsvpMediator: RSVPMediator<T>, private _parent?: MessageBroker<TParent>) {}

/**
* Creates a new channel with the provided channelName. An optional config object can be passed that specifies how many messages to cache.
Expand Down Expand Up @@ -99,6 +99,29 @@ export class MessageBroker<T = any> implements IMessageBroker<T> {
delete this.channelLookup[channelName];
}

/**
* Creates a new scope with this instance of the MessageBroker as its parent.
* Messages from the scope will be passed to this instance if the child scope doesn't have a handler for it.
* @returns A new instance of the messagebroker
*/
public createScope<K extends T>(): IMessageBroker<K> {
const instance = new MessageBroker<K, T>(this.rsvpMediator, this);
return instance;
}

/*
* Disposes of all message channels on this instance.
* It also destroys the connection between this and its parent so that messages will no longer propogate up.
*/
public destroy(): void {
type Channels = (keyof typeof this.channelLookup)[];
(Object.keys(this.channelLookup) as Channels).forEach((channelName) => this.dispose(channelName));

if (this._parent) {
this._parent = undefined;
}
}

/**
* Return a deferred observable as the channel config may have been updated before the subscription
* @param channelName name of channel to subscribe to
Expand Down Expand Up @@ -143,7 +166,16 @@ export class MessageBroker<T = any> implements IMessageBroker<T> {
}

const publishFunction = (data?: T[K], type?: string): void => {
this.messagePublisher.next(this.createMessage(channelName, data, type));
// If there is any registered subscriber for the channel on this broker, then let those handle the message.
// Otherwise, pass it up the chain to the parent to see if they can handle it.
if (this.messagePublisher.observed) {
this.messagePublisher.next(this.createMessage(channelName, data, type));
} else if (this._parent) {
// It is possible that this channel being published on does NOT exist on the parent.
// In that case, the message will simply be passed up and ignored
// since no one higher up the chain will be able to create a subscriber for this channel.
this._parent.create(channelName as any).publish(data);
}
};

// Stream should return a deferred observable
Expand Down Expand Up @@ -180,4 +212,12 @@ export class MessageBroker<T = any> implements IMessageBroker<T> {
): channel is RequiredPick<IChannelModel<T[K]>, 'config' | 'subscription'> {
return channel != null && channel.subscription != null;
}

public isRoot(): boolean {
return this._parent === undefined;
}

protected get parent(): MessageBroker<TParent> | undefined {
return this._parent;
}
}
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@morgan-stanley/message-broker",
"version": "1.0.1",
"version": "1.1.0",
"description": "Framework agnostic messagebroker for decoupled communication.",
"main": "dist/main/index.js",
"types": "dist/main/index.d.ts",
Expand Down
84 changes: 84 additions & 0 deletions site/content/documentation/1.0.0/scopes.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
---
order: 5
title: Scopes
---

Scopes are a mechanism for creating a tree of messagebrokers, where a message will be passed up the tree until either a handler is found for the message, or the root of the tree is reached.
You can create a new scope on a messagebroker by calling `createScope`.

```typescript
const parent: IMessageBroker<IContract>
= messagebroker<IContract>();

const child: IMessageBroker<IContract>
= parent.createScope();
```

A scope is just another instance of an `IMessageBroker` on which you can perform all of the same operations that you'd expect on the base messagebroker.
The main thing to note about this feature is how messages are handled when a scope doesn't have any subscribers for a channel.

### Scope Hierarchies

Any message that is published to a broker will be passed up the chain of scopes until a handler for the message is found.

```typescript
parent.get('x').subscribe(message => console.log('parent received'));

child.create('x').publish({});

// expect: parent received
```

However messages are **not** sent up the hierarchy if the channel in the child has been subscribed to.

```typescript
parent.get('x').subscribe(message => console.log('parent received'));
child.get('x').subscribe(message => console.log('child received'));

child.create('x').publish({});

// expect: child received
```

Messages are also not published to "sibling" scopes, where the brokers share a parent.

```typescript
const sibling: IMessageBroker<IContract>
= parent.createScope();

parent.get('x').subscribe(message => console.log('parent received'));
child.get('x').subscribe(message => console.log('child received'));
sibling.get('x').subscribe(message => console.log('sibling received'));

sibling.create('x').publish({});

// expect: sibling received
```

### Scope Depth

Scope hierarchies can be arbitrarily deep, and messages will make their way all the way to the top to find a handler.

```typescript
const distantChild = parent
.createScope()
.createScope()
...
.createScope();

parent.get('x').subscribe(message => console.log('parent received'));

distantChild.create('x').publish({});

// expect: parent received
```

### Destroy

A MessageBroker instance can be destroyed.
Destroying a MessageBroker will first destroy all of its children, it will dispose of all its channels, and finally remove itself from its parent.

```typescript
child.destroy();
parent.children.contains(child); // expect: false
```
115 changes: 114 additions & 1 deletion spec/core/messagebroker.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ describe('MessageBroker', () => {
mockRSVPMediator = Mock.create<RSVPMediator<any>>().setup(setupFunction('rsvp'));
});

function getInstance<T = any>(): MessageBroker {
function getInstance<T = any>(): MessageBroker<T> {
return new MessageBroker<T>(mockRSVPMediator.mock);
}

Expand Down Expand Up @@ -368,6 +368,119 @@ describe('MessageBroker', () => {
});
});

describe('Scopes', () => {
it('should return a new messagebroker instance for each new scope', () => {
const instance = getInstance<IMySampleBroker>();
const scope = instance.createScope();
const scope2 = instance.createScope();

expect(scope).not.toBe(instance);
expect(scope).not.toBe(scope2);
});

it('should publish messages from child to parent if there is no handler on child', () => {
const parentMessages: Array<IMessage<string>> = [];
const parent = getInstance();
const child = parent.createScope();

parent.get('channel').subscribe((message) => parentMessages.push(message));
child.create('channel').publish('parent should handle this');

expect(parentMessages.length).toEqual(1);
verifyMessage(parentMessages[0], 'parent should handle this');
});

it('should not publish messages from child to parent if there is a handler on child', () => {
const parentMessages: Array<IMessage<string>> = [];
const childMessages: Array<IMessage<string>> = [];
const parent = getInstance();
const child = parent.createScope();

parent.get('channel').subscribe((message) => parentMessages.push(message));
child.get('channel').subscribe((message) => childMessages.push(message));

child.create('channel').publish('child should handle this');

expect(parentMessages.length).toEqual(0);

expect(childMessages.length).toEqual(1);
verifyMessage(childMessages[0], 'child should handle this');
});

it('should not publish messages to "sibling" scopes', () => {
const brotherMessages: Array<IMessage<string>> = [];
const sisterMessages: Array<IMessage<string>> = [];
const parent = getInstance();
const brother = parent.createScope();
const sister = parent.createScope();

brother.get('channel').subscribe((message) => brotherMessages.push(message));
sister.get('channel').subscribe((message) => sisterMessages.push(message));

brother.create('channel').publish('brother should get this');
sister.create('channel').publish('sister should get this');

expect(brotherMessages.length).toEqual(1);
verifyMessage(brotherMessages[0], 'brother should get this');

expect(sisterMessages.length).toEqual(1);
verifyMessage(sisterMessages[0], 'sister should get this');
});

describe('Destroy', () => {
it('should dispose of all subscriptions on that instance and its child', () => {
const instance = getInstance();
const instanceChannel = instance.create('yourChannel');
const child = instance.createScope();
const childChannel = instance.create('yourChannel');

instance.destroy(); // destroy the PARENT

const postDisposeInstanceChannel = instance.create('yourChannel');
const postDisposeChildChannel = child.create('yourChannel');

expect(postDisposeInstanceChannel).not.toBe(instanceChannel);
expect(postDisposeChildChannel).not.toBe(childChannel);
});

it('should prevent message propagation from happening', () => {
const childMessages: Array<IMessage<string>> = [];
const parentMessages: Array<IMessage<string>> = [];
const parent = getInstance();
const child = parent.createScope();

parent.get('channel').subscribe((message) => parentMessages.push(message));

child.destroy();

child.create('channel').publish('message');

expect(childMessages.length).toEqual(0);
expect(parentMessages.length).toEqual(0);
});

it('should destroy all cached messages on parent as well', () => {
const parent = getInstance();
const child = parent.createScope();

const parentChannel = parent.create('channel', { replayCacheSize: 2 });
const childChannel = child.create('channel', { replayCacheSize: 2 });

childChannel.publish('message one');
childChannel.publish('message two');

child.destroy(); // this should cancel the existing caching subscriptions

const parentMessages: Array<IMessage<string>> = [];
parentChannel.stream.subscribe((message) => parentMessages.push(message));

childChannel.publish('message three');

expect(parentMessages.length).toEqual(0);
});
});
});

function verifyMessage<T>(message: IMessage<T>, expectedData: T, expectedType?: string) {
expect(message).toBeDefined();
expect(message.data).toEqual(expectedData);
Expand Down
3 changes: 2 additions & 1 deletion tsconfig.json
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
{
"compilerOptions": {
"target": "ES5",
"target": "ES2017",
"module": "commonjs",
"lib": [
"ES5",
"ES6",
"DOM",
"ES2017"
],
"declaration": true,
"sourceMap": true,
Expand Down