Skip to content

Commit

Permalink
Merge pull request davidyaha#65 from AlexLeung/master
Browse files Browse the repository at this point in the history
Implemented Generic AsyncIterator
  • Loading branch information
davidyaha authored Jun 14, 2017
2 parents 2f5c979 + 9b7262c commit e490c63
Show file tree
Hide file tree
Showing 7 changed files with 400 additions and 68 deletions.
11 changes: 5 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"pretest": "npm run compile",
"test": "npm run testonly -- && npm run integration --",
"posttest": "npm run lint",
"lint": "tslint ./src/**/*.ts",
"lint": "tslint --type-check --project ./tsconfig.json ./src/**/*.ts",
"watch": "tsc -w",
"testonly": "mocha --reporter spec --full-trace ./dist/test/tests.js ",
"integration": "npm run compile && mocha --reporter spec --full-trace ./dist/test/integration-tests.js ",
Expand All @@ -34,12 +34,11 @@
"prepublish": "npm run test"
},
"dependencies": {
"async": "^2.0.1",
"graphql-subscriptions": "^0.3.1",
"graphql-subscriptions": "^0.4.2",
"iterall": "^1.1.1",
"redis": "^2.6.3"
},
"devDependencies": {
"@types/async": "^2.0.35",
"@types/chai": "^3.4.34",
"@types/chai-as-promised": "0.0.30",
"@types/graphql": "^0.9.0",
Expand All @@ -49,13 +48,13 @@
"@types/simple-mock": "0.0.27",
"chai": "^3.5.0",
"chai-as-promised": "^6.0.0",
"graphql": "^0.9.6",
"graphql": "^0.10.1",
"istanbul": "1.0.0-alpha.2",
"mocha": "^3.0.0",
"remap-istanbul": "^0.9.5",
"simple-mock": "^0.7.0",
"tslint": "^5.2.0",
"typescript": "^2.1.4"
"typescript": "^2.3.4"
},
"typings": "dist/index.d.ts",
"typescript": {
Expand Down
111 changes: 111 additions & 0 deletions src/pubsub-async-iterator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import { $$asyncIterator } from 'iterall';
import {PubSubEngine} from 'graphql-subscriptions/dist/pubsub-engine';

/**
* A class for digesting PubSubEngine events via the new AsyncIterator interface.
* This implementation is a generic version of the one located at
* https://github.com/apollographql/graphql-subscriptions/blob/master/src/event-emitter-to-async-iterator.ts
* @class
*
* @constructor
*
* @property pullQueue @type {Function[]}
* A queue of resolve functions waiting for an incoming event which has not yet arrived.
* This queue expands as next() calls are made without PubSubEngine events occurring in between.
*
* @property pushQueue @type {any[]}
* A queue of PubSubEngine events waiting for next() calls to be made.
* This queue expands as PubSubEngine events arrice without next() calls occurring in between.
*
* @property eventsArray @type {string[]}
* An array of PubSubEngine event names which this PubSubAsyncIterator should watch.
*
* @property allSubscribed @type {Promise<number[]>}
* A promise of a list of all subscription ids to the passed PubSubEngine.
*
* @property listening @type {boolean}
* Whether or not the PubSubAsynIterator is in listening mode (responding to incoming PubSubEngine events and next() calls).
* Listening begins as true and turns to false once the return method is called.
*
* @property pubsub @type {PubSubEngine}
* The PubSubEngine whose events will be observed.
*/
export class PubSubAsyncIterator<T> implements AsyncIterator<T> {

constructor(pubsub: PubSubEngine, eventNames: string | string[]) {
this.pubsub = pubsub;
this.pullQueue = [];
this.pushQueue = [];
this.listening = true;
this.eventsArray = typeof eventNames === 'string' ? [eventNames] : eventNames;
this.allSubscribed = this.subscribeAll();
}

public async next() {
await this.allSubscribed;
return this.listening ? this.pullValue() : this.return();
}

public async return() {
this.emptyQueue(await this.allSubscribed);
return { value: undefined, done: true };
}

public async throw(error) {
this.emptyQueue(await this.allSubscribed);
return Promise.reject(error);
}

public [$$asyncIterator]() {
return this;
}

private pullQueue: Function[];
private pushQueue: any[];
private eventsArray: string[];
private allSubscribed: Promise<number[]>;
private listening: boolean;
private pubsub: PubSubEngine;

private async pushValue(event) {
await this.allSubscribed;
if (this.pullQueue.length !== 0) {
this.pullQueue.shift()({ value: event, done: false });
} else {
this.pushQueue.push(event);
}
}

private pullValue() {
return new Promise((resolve => {
if (this.pushQueue.length !== 0) {
resolve({ value: this.pushQueue.shift(), done: false });
} else {
this.pullQueue.push(resolve);
}
}).bind(this));
}

private emptyQueue(subscriptionIds: number[]) {
if (this.listening) {
this.listening = false;
this.unsubscribeAll(subscriptionIds);
this.pullQueue.forEach(resolve => resolve({ value: undefined, done: true }));
this.pullQueue.length = 0;
this.pushQueue.length = 0;
}
}

private subscribeAll() {
return Promise.all(this.eventsArray.map(
eventName => this.pubsub.subscribe(eventName, this.pushValue.bind(this), {}),
));
}

private unsubscribeAll(subscriptionIds: number[]) {
for (const subscriptionId of subscriptionIds) {
this.pubsub.unsubscribe(subscriptionId);
}
}

}
16 changes: 9 additions & 7 deletions src/redis-pubsub.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {PubSubEngine} from 'graphql-subscriptions/dist/pubsub';
import {PubSubEngine} from 'graphql-subscriptions/dist/pubsub-engine';
import {createClient, RedisClient, ClientOpts as RedisOptions} from 'redis';
import {each} from 'async';
import {PubSubAsyncIterator} from './pubsub-async-iterator';

export interface PubSubRedisOptions {
connection?: RedisOptions;
Expand Down Expand Up @@ -87,6 +87,10 @@ export class RedisPubSub implements PubSubEngine {
delete this.subscriptionMap[subId];
}

public asyncIterator<T>(triggers: string | string[]): AsyncIterator<T> {
return new PubSubAsyncIterator<T>(this, triggers);
}

private onMessage(channel: string, message: string) {
const subscribers = this.subsRefsMap[channel];

Expand All @@ -100,12 +104,10 @@ export class RedisPubSub implements PubSubEngine {
parsedMessage = message;
}

each(subscribers, (subId, cb) => {
// TODO Support pattern based subscriptions
const [triggerName, listener] = this.subscriptionMap[subId];
for (const subId of subscribers) {
const listener = this.subscriptionMap[subId][1];
listener(parsedMessage);
cb();
});
}
}

private triggerTransform: TriggerTransform;
Expand Down
Loading

0 comments on commit e490c63

Please sign in to comment.