Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send commands based on transport state instead of connected state #278

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 14 additions & 25 deletions src/centrifuge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ export class UnauthorizedError extends Error {
/** Centrifuge is a Centrifuge/Centrifugo bidirectional client. */
export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<ClientEvents>) {
state: State;
private _transportIsOpen: boolean;
private _endpoint: string | Array<TransportEndpoint>;
private _emulation: boolean;
private _transports: any[];
Expand Down Expand Up @@ -111,6 +112,7 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
constructor(endpoint: string | Array<TransportEndpoint>, options?: Partial<Options>) {
super();
this.state = State.Disconnected;
this._transportIsOpen = false;
this._endpoint = endpoint;
this._emulation = false;
this._transports = [];
Expand Down Expand Up @@ -762,28 +764,11 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
const transportId = this._nextTransportId();
self._debug("id of transport", transportId);
let wasOpen = false;

let optimistic = true;
if (this._transport.name() === 'sse') {
// Avoid using optimistic subscriptions with SSE/EventSource as we are sending
// initial data in URL params. URL is recommended to be 2048 chars max – so adding
// subscription data may be risky.
optimistic = false;
}

const initialCommands: any[] = [];

if (this._transport.emulation()) {
const connectCommand = self._sendConnect(true);
initialCommands.push(connectCommand);
if (optimistic) {
const subscribeCommands: any[] = self._sendSubscribeCommands(true, true);
for (const i in subscribeCommands) {
if (subscribeCommands.hasOwnProperty(i)) {
initialCommands.push(subscribeCommands[i]);
}
}
}
}

this._setNetworkEvents();
Expand All @@ -810,15 +795,14 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
}
wasOpen = true;
self._debug(transport.subName(), 'transport open');
self._transportWasOpen = true;
if (transport.emulation()) {
return;
}
self._transportIsOpen = true;
self._transportWasOpen = true;
self.startBatching();
self._sendConnect(false);
if (optimistic) {
self._sendSubscribeCommands(true, false);
}
self._sendSubscribeCommands();
self.stopBatching();
},
onError: function (e: any) {
Expand All @@ -839,6 +823,7 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
}
self._debug(transport.subName(), 'transport closed');
self._transportClosed = true;
self._transportIsOpen = false;

let reason = 'connection closed';
let needReconnect = true;
Expand Down Expand Up @@ -1261,6 +1246,9 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
const transport = this._transport;
this._transport = null;
transport.close(); // Close only after setting this._transport to null to avoid recursion when calling transport close().
// Need to mark as closed here, because connect call may be sync called after disconnect,
// transport onClose callback will not be called yet
this._transportIsOpen = false;
FZambia marked this conversation as resolved.
Show resolved Hide resolved
this._transportClosed = true;
this._nextTransportId();
} else {
Expand Down Expand Up @@ -1379,7 +1367,7 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
}

protected _unsubscribe(sub: Subscription) {
if (!this._isConnected()) {
if (!this._transportIsOpen) {
return;
}
const req = {
Expand Down Expand Up @@ -1415,7 +1403,7 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
return this._serverSubs[channel] !== undefined;
}

private _sendSubscribeCommands(optimistic: boolean, skipSending: boolean): any[] {
private _sendSubscribeCommands(): any[] {
const commands: any[] = [];
for (const channel in this._subs) {
if (!this._subs.hasOwnProperty(channel)) {
Expand All @@ -1428,7 +1416,7 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
}
if (sub.state === SubscriptionState.Subscribing) {
// @ts-ignore – we are hiding some symbols from public API autocompletion.
const cmd = sub._subscribe(optimistic, skipSending);
const cmd = sub._subscribe();
if (cmd) {
commands.push(cmd);
}
Expand All @@ -1438,6 +1426,7 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
}

private _connectResponse(result: any) {
this._transportIsOpen = true;
this._transportWasOpen = true;
this._reconnectAttempts = 0;
this._refreshRequired = false;
Expand All @@ -1460,7 +1449,7 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
this._node = result.node;

this.startBatching();
this._sendSubscribeCommands(false, false);
this._sendSubscribeCommands();
this.stopBatching();

const ctx: any = {
Expand Down
35 changes: 22 additions & 13 deletions src/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -254,14 +254,19 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
if (this._setState(SubscriptionState.Subscribing)) {
this.emit('subscribing', { channel: this.channel, code: code, reason: reason });
}
this._subscribe(false, false);
if (code === subscribingCodes.subscribeCalled) {
FZambia marked this conversation as resolved.
Show resolved Hide resolved
this._subscribe();
}
}

private _subscribe(optimistic: boolean, skipSending: boolean): any {
private _subscribe(): any {
// @ts-ignore – we are hiding some symbols from public API autocompletion.
this._centrifuge._debug('subscribing on', this.channel);

if (this._centrifuge.state !== State.Connected && !optimistic) {
// need to check transport readiness here, because there's no point for calling getData or getToken
// if transport is not ready yet
// @ts-ignore – we are hiding some symbols from public API autocompletion.
if (!this._centrifuge._transportIsOpen) {
// @ts-ignore – we are hiding some symbols from public API autocompletion.
this._centrifuge._debug('delay subscribe on', this.channel, 'till connected');
// subscribe will be called later automatically.
Expand All @@ -280,16 +285,14 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
return;
}
self._data = data;
self._sendSubscribe(self._token, false);
self._sendSubscribe(self._token);
})
return null;
} else {
return self._sendSubscribe(self._token, skipSending);
return self._sendSubscribe(self._token);
}
}
if (optimistic) {
return null;
}

this._getSubscriptionToken().then(function (token) {
if (!self._isSubscribing()) {
return;
Expand All @@ -305,10 +308,10 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
return;
}
self._data = data;
self._sendSubscribe(token, false);
self._sendSubscribe(token);
})
} else {
self._sendSubscribe(token, false);
self._sendSubscribe(token);
}
}).catch(function (e) {
if (!self._isSubscribing()) {
Expand All @@ -331,7 +334,13 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
return null;
}

private _sendSubscribe(token: string, skipSending: boolean): any {
private _sendSubscribe(token: string): any {
// we also need to check for transport state before sending subscription
// because it may change for subscription with side effects (getData, getToken options)
// @ts-ignore – we are hiding some symbols from public API autocompletion.
if (!this._centrifuge._transportIsOpen) {
return null;
}
const channel = this.channel;

const req: any = {
Expand Down Expand Up @@ -375,7 +384,7 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
this._inflight = true;

// @ts-ignore – we are hiding some symbols from public API autocompletion.
this._centrifuge._call(cmd, skipSending).then(resolveCtx => {
this._centrifuge._call(cmd).then(resolveCtx => {
this._inflight = false;
// @ts-ignore - improve later.
const result = resolveCtx.reply.subscribe;
Expand Down Expand Up @@ -492,7 +501,7 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
const delay = this._getResubscribeDelay();
this._resubscribeTimeout = setTimeout(function () {
if (self._isSubscribing()) {
self._subscribe(false, false);
self._subscribe();
}
}, delay);
}
Expand Down