Skip to content

Commit

Permalink
per sub _optimisticallySent, disable optimistic for http_stream
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Apr 4, 2024
1 parent 2610388 commit 2cec5b9
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 16 deletions.
141 changes: 136 additions & 5 deletions src/centrifuge.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import { Centrifuge } from './centrifuge'
import {
DisconnectedContext, Error as CentrifugeError,
PublicationContext, TransportName, UnsubscribedContext, State, SubscriptionState
DisconnectedContext,
Error as CentrifugeError,
PublicationContext,
TransportName,
UnsubscribedContext,
State,
SubscriptionState,
SubscribedContext,
} from './types';
import { disconnectedCodes, unsubscribedCodes, connectingCodes } from './codes';

Expand All @@ -20,9 +26,13 @@ test('no websocket constructor', async () => {
});

const transportCases = [
['websocket', 'ws://localhost:8000/connection/websocket?cf_protocol_version=v2'],
['http_stream', 'http://localhost:8000/connection/http_stream?cf_protocol_version=v2'],
['sse', 'http://localhost:8000/connection/sse?cf_protocol_version=v2'],
['websocket', 'ws://localhost:8000/connection/websocket'],
['http_stream', 'http://localhost:8000/connection/http_stream'],
['sse', 'http://localhost:8000/connection/sse'],
]

const websocketOnly = [
['websocket', 'ws://localhost:8000/connection/websocket'],
]

test.each(transportCases)("%s: connects and disconnects", async (transport, endpoint) => {
Expand Down Expand Up @@ -364,3 +374,124 @@ test.each(transportCases)("%s: subscribe and unsubscribe loop", async (transport
await disconnectedPromise;
expect(c.state).toBe(State.Disconnected);
});

// Make sure we can unsubscribe right after connect called and connect/subscribe
// frames not sent yet.
test.each(transportCases)("%s: unsubscribe right after connect", async (transport, endpoint) => {
const c = new Centrifuge([{
transport: transport as TransportName,
endpoint: endpoint,
}], {
websocket: WebSocket,
fetch: fetch,
eventsource: EventSource,
readableStream: ReadableStream,
emulationEndpoint: 'http://localhost:8000/emulation'
});

c.connect();
await c.ready(5000);

const sub = c.newSubscription('test');

let unsubcribeCalled: any;
const unsubscribedPromise = new Promise<UnsubscribedContext>((resolve, _) => {
unsubcribeCalled = resolve;
})
let subcribeCalled: any;
const subscribedPromise = new Promise<SubscribedContext>((resolve, _) => {
subcribeCalled = resolve;
})

sub.on('subscribed', (ctx) => {
subcribeCalled(ctx);
})
sub.on('unsubscribed', (ctx) => {
unsubcribeCalled(ctx);
})

sub.subscribe();
c.disconnect();
c.connect();
sub.unsubscribe();

expect(sub.state).toBe(SubscriptionState.Unsubscribed);
await unsubscribedPromise;

sub.subscribe();
await subscribedPromise;

let disconnectCalled: any;
const disconnectedPromise = new Promise<DisconnectedContext>((resolve, _) => {
disconnectCalled = resolve;
})
c.on('disconnected', (ctx) => {
disconnectCalled(ctx);
})

c.disconnect();
await disconnectedPromise;
expect(c.state).toBe(State.Disconnected);
});

// Make sure we can unsubscribe right after connect frame sent but reply has not been yet received.
// This is important to cover bug described in https://github.com/centrifugal/centrifuge-js/pull/274.
test.each(websocketOnly)("%s: unsubscribe in between connect command and reply", async (transport, endpoint) => {
const c = new Centrifuge([{
transport: transport as TransportName,
endpoint: endpoint,
}], {
websocket: WebSocket,
fetch: fetch,
eventsource: EventSource,
readableStream: ReadableStream,
emulationEndpoint: 'http://localhost:8000/emulation'
});

const sub = c.newSubscription('test');

let unsubcribeCalled: any;
const unsubscribedPromise = new Promise<UnsubscribedContext>((resolve, _) => {
unsubcribeCalled = resolve;
})
let subcribeCalled: any;
const subscribedPromise = new Promise<SubscribedContext>((resolve, _) => {
subcribeCalled = resolve;
})

// @ts-ignore this is only for test purposes.
c.on('__centrifuge_debug:connect_frame_sent', () => {
sub.unsubscribe();
unsubcribeCalled()
})

sub.on('subscribed', (ctx) => {
subcribeCalled(ctx);
})
sub.on('unsubscribed', (ctx) => {
unsubcribeCalled(ctx);
})

sub.subscribe();
c.connect();

await unsubscribedPromise;
await c.ready()

await new Promise(r => setTimeout(r, 2000));
sub.subscribe();

await subscribedPromise;

let disconnectCalled: any;
const disconnectedPromise = new Promise<DisconnectedContext>((resolve, _) => {
disconnectCalled = resolve;
})
c.on('disconnected', (ctx) => {
disconnectCalled(ctx);
})

c.disconnect();
await disconnectedPromise;
expect(c.state).toBe(State.Disconnected);
});
26 changes: 15 additions & 11 deletions src/centrifuge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
private _currentTransportIndex: number;
private _triedAllTransports: boolean;
private _transportWasOpen: boolean;
private _sentOptimisticCommands: boolean;
private _transport?: any;
private _transportId: number;
private _deviceWentOffline: boolean;
Expand Down Expand Up @@ -118,7 +117,6 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
this._currentTransportIndex = 0;
this._triedAllTransports = false;
this._transportWasOpen = false;
this._sentOptimisticCommands = false;
this._transport = null;
this._transportId = 0;
this._deviceWentOffline = false;
Expand Down Expand Up @@ -766,10 +764,15 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
let wasOpen = false;

let optimistic = true;
if (this._transport.name() === 'sse') {
// Avoid using optimistic subscriptions with SSE/EventSource as we are sending
// initial data in URL params. URL is recommended to be 2048 chars max – so adding
// subscription data may be risky.
if (this._transport.emulation()) {
// Avoid using optimistic subscriptions for emulation transport because calling
// unsubscribe before receiving node identifier in the connect reply results into
// "node not found" error on the server side. It's theoretically possible to still
// have optimistic subscriptions in emulation scenario – but in this case we need
// to delay unsubscribe frames till we get the node identifier.
// Another reason to avoid using optimistic subscriptions with SSE/EventSource as
// we are sending initial data in URL params. URL is recommended to be 2048 chars
// max – so adding subscription data may be risky.
optimistic = false;
}

Expand Down Expand Up @@ -819,10 +822,11 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
self.startBatching();
self._sendConnect(false);
if (optimistic) {
const commands = self._sendSubscribeCommands(true, false);
self._sentOptimisticCommands = commands.length !== 0;
self._sendSubscribeCommands(true, false);
}
self.stopBatching();
//@ts-ignore only for debug and test purposes.
self.emit('__centrifuge_debug:connect_frame_sent', {})
},
onError: function (e: any) {
if (self._transportId != transportId) {
Expand All @@ -842,7 +846,6 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
}
self._debug(transport.subName(), 'transport closed');
self._transportClosed = true;
self._sentOptimisticCommands = false;

let reason = 'connection closed';
let needReconnect = true;
Expand Down Expand Up @@ -1383,7 +1386,9 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
}

protected _unsubscribe(sub: Subscription) {
if (!this._isConnected() && !this._sentOptimisticCommands) {
// @ts-ignore we need to get _optimisticallySent but it's not part of public API.
const isOptimisticallySent = sub._optimisticallySent;
if (!this._isConnected() && !isOptimisticallySent) {
return;
}
const req = {
Expand Down Expand Up @@ -1445,7 +1450,6 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
this._transportWasOpen = true;
this._reconnectAttempts = 0;
this._refreshRequired = false;
this._sentOptimisticCommands = false;

if (this._isConnected()) {
return;
Expand Down
9 changes: 9 additions & 0 deletions src/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
private _joinLeave: boolean;
// @ts-ignore – this is used by a client in centrifuge.ts.
private _inflight: boolean;
// @ts-ignore - this is used by a client in centrifuge.ts.
private _optimisticallySent: boolean;

/** Subscription constructor should not be used directly, create subscriptions using Client method. */
constructor(centrifuge: Centrifuge, channel: string, options?: Partial<SubscriptionOptions>) {
Expand All @@ -59,6 +61,7 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
this._promises = {};
this._promiseId = 0;
this._inflight = false;
this._optimisticallySent = false;
this._refreshTimeout = null;
this._setOptions(options);
// @ts-ignore – we are hiding some symbols from public API autocompletion.
Expand Down Expand Up @@ -284,6 +287,9 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
})
return null;
} else {
if (optimistic) {
self._optimisticallySent = true;
}
return self._sendSubscribe(self._token, skipSending);
}
}
Expand Down Expand Up @@ -377,6 +383,7 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
// @ts-ignore – we are hiding some symbols from public API autocompletion.
this._centrifuge._call(cmd, skipSending).then(resolveCtx => {
this._inflight = false;
this._optimisticallySent = false;
// @ts-ignore - improve later.
const result = resolveCtx.reply.subscribe;
this._handleSubscribeResponse(
Expand All @@ -389,6 +396,7 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
}
}, rejectCtx => {
this._inflight = false;
this._optimisticallySent = false;
this._handleSubscribeError(rejectCtx.error);
if (rejectCtx.next) {
rejectCtx.next();
Expand Down Expand Up @@ -437,6 +445,7 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
if (this._setState(SubscriptionState.Unsubscribed)) {
this.emit('unsubscribed', { channel: this.channel, code: code, reason: reason });
}
this._optimisticallySent = false;
this._rejectPromises({ code: errorCodes.subscriptionUnsubscribed, message: this.state });
}

Expand Down

0 comments on commit 2cec5b9

Please sign in to comment.