Skip to content

Commit

Permalink
Change sending commands logic, rely on transport state instead of cli…
Browse files Browse the repository at this point in the history
…ent connected state
  • Loading branch information
ThisIsEsh committed Apr 4, 2024
1 parent 04e4f7b commit 7ab5590
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 38 deletions.
43 changes: 18 additions & 25 deletions src/centrifuge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ export class UnauthorizedError extends Error {
/** Centrifuge is a Centrifuge/Centrifugo bidirectional client. */
export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<ClientEvents>) {
state: State;
private _transportIsOpen: boolean;
private _endpoint: string | Array<TransportEndpoint>;
private _emulation: boolean;
private _transports: any[];
Expand Down Expand Up @@ -111,6 +112,7 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
constructor(endpoint: string | Array<TransportEndpoint>, options?: Partial<Options>) {
super();
this.state = State.Disconnected;
this._transportIsOpen = false;
this._endpoint = endpoint;
this._emulation = false;
this._transports = [];
Expand Down Expand Up @@ -409,6 +411,10 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
})
}

isTransportOpen() {
return this._transportIsOpen;
}

private _debug(...args: any[]) {
if (!this._debugEnabled) {
return;
Expand Down Expand Up @@ -762,28 +768,11 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
const transportId = this._nextTransportId();
self._debug("id of transport", transportId);
let wasOpen = false;

let optimistic = true;
if (this._transport.name() === 'sse') {
// Avoid using optimistic subscriptions with SSE/EventSource as we are sending
// initial data in URL params. URL is recommended to be 2048 chars max – so adding
// subscription data may be risky.
optimistic = false;
}

const initialCommands: any[] = [];

if (this._transport.emulation()) {
const connectCommand = self._sendConnect(true);
initialCommands.push(connectCommand);
if (optimistic) {
const subscribeCommands: any[] = self._sendSubscribeCommands(true, true);
for (const i in subscribeCommands) {
if (subscribeCommands.hasOwnProperty(i)) {
initialCommands.push(subscribeCommands[i]);
}
}
}
}

this._setNetworkEvents();
Expand All @@ -810,15 +799,14 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
}
wasOpen = true;
self._debug(transport.subName(), 'transport open');
self._transportWasOpen = true;
if (transport.emulation()) {
return;
}
self._transportIsOpen = true;
self._transportWasOpen = true;
self.startBatching();
self._sendConnect(false);
if (optimistic) {
self._sendSubscribeCommands(true, false);
}
self._sendSubscribeCommands();
self.stopBatching();
},
onError: function (e: any) {
Expand All @@ -839,6 +827,7 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
}
self._debug(transport.subName(), 'transport closed');
self._transportClosed = true;
self._transportIsOpen = false;

let reason = 'connection closed';
let needReconnect = true;
Expand Down Expand Up @@ -1261,6 +1250,9 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
const transport = this._transport;
this._transport = null;
transport.close(); // Close only after setting this._transport to null to avoid recursion when calling transport close().
// Need to mark as closed here, because connect call may be sync called after disconnect,
// transport onClose callback will not be called yet
this._transportIsOpen = false;
this._transportClosed = true;
this._nextTransportId();
} else {
Expand Down Expand Up @@ -1379,7 +1371,7 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
}

protected _unsubscribe(sub: Subscription) {
if (!this._isConnected()) {
if (!this.isTransportOpen()) {
return;
}
const req = {
Expand Down Expand Up @@ -1415,7 +1407,7 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
return this._serverSubs[channel] !== undefined;
}

private _sendSubscribeCommands(optimistic: boolean, skipSending: boolean): any[] {
private _sendSubscribeCommands(): any[] {
const commands: any[] = [];
for (const channel in this._subs) {
if (!this._subs.hasOwnProperty(channel)) {
Expand All @@ -1428,7 +1420,7 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
}
if (sub.state === SubscriptionState.Subscribing) {
// @ts-ignore – we are hiding some symbols from public API autocompletion.
const cmd = sub._subscribe(optimistic, skipSending);
const cmd = sub._subscribe();
if (cmd) {
commands.push(cmd);
}
Expand All @@ -1438,6 +1430,7 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
}

private _connectResponse(result: any) {
this._transportIsOpen = true;
this._transportWasOpen = true;
this._reconnectAttempts = 0;
this._refreshRequired = false;
Expand All @@ -1460,7 +1453,7 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
this._node = result.node;

this.startBatching();
this._sendSubscribeCommands(false, false);
this._sendSubscribeCommands();
this.stopBatching();

const ctx: any = {
Expand Down
31 changes: 18 additions & 13 deletions src/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -254,14 +254,16 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
if (this._setState(SubscriptionState.Subscribing)) {
this.emit('subscribing', { channel: this.channel, code: code, reason: reason });
}
this._subscribe(false, false);
this._subscribe();
}

private _subscribe(optimistic: boolean, skipSending: boolean): any {
private _subscribe(): any {
// @ts-ignore – we are hiding some symbols from public API autocompletion.
this._centrifuge._debug('subscribing on', this.channel);

if (this._centrifuge.state !== State.Connected && !optimistic) {
// need to check transport readiness here, because there's no point for calling getData or getToken
// if transport is not ready yet
if (!this._centrifuge.isTransportOpen()) {
// @ts-ignore – we are hiding some symbols from public API autocompletion.
this._centrifuge._debug('delay subscribe on', this.channel, 'till connected');
// subscribe will be called later automatically.
Expand All @@ -280,16 +282,14 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
return;
}
self._data = data;
self._sendSubscribe(self._token, false);
self._sendSubscribe(self._token);
})
return null;
} else {
return self._sendSubscribe(self._token, skipSending);
return self._sendSubscribe(self._token);
}
}
if (optimistic) {
return null;
}

this._getSubscriptionToken().then(function (token) {
if (!self._isSubscribing()) {
return;
Expand All @@ -305,10 +305,10 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
return;
}
self._data = data;
self._sendSubscribe(token, false);
self._sendSubscribe(token);
})
} else {
self._sendSubscribe(token, false);
self._sendSubscribe(token);
}
}).catch(function (e) {
if (!self._isSubscribing()) {
Expand All @@ -331,7 +331,12 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
return null;
}

private _sendSubscribe(token: string, skipSending: boolean): any {
private _sendSubscribe(token: string): any {
// we also need to check for transport state before sending subscription
// because it may change for subscription with side effects (getData, getToken options)
if (!this._centrifuge.isTransportOpen()) {
return null;
}
const channel = this.channel;

const req: any = {
Expand Down Expand Up @@ -375,7 +380,7 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
this._inflight = true;

// @ts-ignore – we are hiding some symbols from public API autocompletion.
this._centrifuge._call(cmd, skipSending).then(resolveCtx => {
this._centrifuge._call(cmd).then(resolveCtx => {
this._inflight = false;
// @ts-ignore - improve later.
const result = resolveCtx.reply.subscribe;
Expand Down Expand Up @@ -492,7 +497,7 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
const delay = this._getResubscribeDelay();
this._resubscribeTimeout = setTimeout(function () {
if (self._isSubscribing()) {
self._subscribe(false, false);
self._subscribe();
}
}, delay);
}
Expand Down

0 comments on commit 7ab5590

Please sign in to comment.