Skip to content

Commit

Permalink
per subscription _optimisticallySent flag
Browse files Browse the repository at this point in the history
  • Loading branch information
alexander-emelin committed Apr 3, 2024
1 parent 2610388 commit 4ffc99a
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 17 deletions.
141 changes: 135 additions & 6 deletions src/centrifuge.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import { Centrifuge } from './centrifuge'
import {
DisconnectedContext, Error as CentrifugeError,
PublicationContext, TransportName, UnsubscribedContext, State, SubscriptionState
DisconnectedContext,
Error as CentrifugeError,
PublicationContext,
TransportName,
UnsubscribedContext,
State,
SubscriptionState,
SubscribedContext
} from './types';
import { disconnectedCodes, unsubscribedCodes, connectingCodes } from './codes';

Expand All @@ -15,14 +21,18 @@ test('invalid endpoint', () => {
});

test('no websocket constructor', async () => {
const c = new Centrifuge('ws://localhost:8000/connection/websocket?cf_protocol_version=v2');
const c = new Centrifuge('ws://localhost:8000/connection/websocket');
expect(() => { c.connect() }).toThrowError();
});

const transportCases = [
['websocket', 'ws://localhost:8000/connection/websocket?cf_protocol_version=v2'],
['http_stream', 'http://localhost:8000/connection/http_stream?cf_protocol_version=v2'],
['sse', 'http://localhost:8000/connection/sse?cf_protocol_version=v2'],
['websocket', 'ws://localhost:8000/connection/websocket'],
['http_stream', 'http://localhost:8000/connection/http_stream'],
['sse', 'http://localhost:8000/connection/sse'],
]

const websocketOnly = [
['websocket', 'ws://localhost:8000/connection/websocket'],
]

test.each(transportCases)("%s: connects and disconnects", async (transport, endpoint) => {
Expand Down Expand Up @@ -364,3 +374,122 @@ test.each(transportCases)("%s: subscribe and unsubscribe loop", async (transport
await disconnectedPromise;
expect(c.state).toBe(State.Disconnected);
});

// Make sure we can unsubscribe right after connect called and connect/subscribe
// frames not sent yet.
test.each(transportCases)("%s: unsubscribe right after connect", async (transport, endpoint) => {
const c = new Centrifuge([{
transport: transport as TransportName,
endpoint: endpoint,
}], {
websocket: WebSocket,
fetch: fetch,
eventsource: EventSource,
readableStream: ReadableStream,
emulationEndpoint: 'http://localhost:8000/emulation'
});

c.connect();
await c.ready(5000);

const sub = c.newSubscription('test');

let unsubcribeCalled: any;
const unsubscribedPromise = new Promise<UnsubscribedContext>((resolve, _) => {
unsubcribeCalled = resolve;
})
let subcribeCalled: any;
const subscribedPromise = new Promise<SubscribedContext>((resolve, _) => {
subcribeCalled = resolve;
})

sub.on('subscribed', (ctx) => {
subcribeCalled(ctx);
})
sub.on('unsubscribed', (ctx) => {
unsubcribeCalled(ctx);
})

sub.subscribe();
c.disconnect();
c.connect();
sub.unsubscribe();

expect(sub.state).toBe(SubscriptionState.Unsubscribed);
await unsubscribedPromise;

sub.subscribe();
await subscribedPromise;

let disconnectCalled: any;
const disconnectedPromise = new Promise<DisconnectedContext>((resolve, _) => {
disconnectCalled = resolve;
})
c.on('disconnected', (ctx) => {
disconnectCalled(ctx);
})

c.disconnect();
await disconnectedPromise;
expect(c.state).toBe(State.Disconnected);
});

test.each(websocketOnly)("%s: unsubscribe in between connect command and reply", async (transport, endpoint) => {
const c = new Centrifuge([{
transport: transport as TransportName,
endpoint: endpoint,
}], {
websocket: WebSocket,
fetch: fetch,
eventsource: EventSource,
readableStream: ReadableStream,
emulationEndpoint: 'http://localhost:8000/emulation'
});

const sub = c.newSubscription('test');

let unsubcribeCalled: any;
const unsubscribedPromise = new Promise<UnsubscribedContext>((resolve, _) => {
unsubcribeCalled = resolve;
})
let subcribeCalled: any;
const subscribedPromise = new Promise<SubscribedContext>((resolve, _) => {
subcribeCalled = resolve;
})

// @ts-ignore this is only for test purposes.
c.on('__centrifuge_debug:connect_frame_sent', () => {
sub.unsubscribe();
unsubcribeCalled()
})

sub.on('subscribed', (ctx) => {
subcribeCalled(ctx);
})
sub.on('unsubscribed', (ctx) => {
unsubcribeCalled(ctx);
})

sub.subscribe();
c.connect();

await unsubscribedPromise;
await c.ready()

await new Promise(r => setTimeout(r, 2000));
sub.subscribe();

await subscribedPromise;

let disconnectCalled: any;
const disconnectedPromise = new Promise<DisconnectedContext>((resolve, _) => {
disconnectCalled = resolve;
})
c.on('disconnected', (ctx) => {
disconnectCalled(ctx);
})

c.disconnect();
await disconnectedPromise;
expect(c.state).toBe(State.Disconnected);
});
24 changes: 13 additions & 11 deletions src/centrifuge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
private _currentTransportIndex: number;
private _triedAllTransports: boolean;
private _transportWasOpen: boolean;
private _sentOptimisticCommands: boolean;
private _transport?: any;
private _transportId: number;
private _deviceWentOffline: boolean;
Expand Down Expand Up @@ -118,7 +117,6 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
this._currentTransportIndex = 0;
this._triedAllTransports = false;
this._transportWasOpen = false;
this._sentOptimisticCommands = false;
this._transport = null;
this._transportId = 0;
this._deviceWentOffline = false;
Expand Down Expand Up @@ -766,10 +764,13 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
let wasOpen = false;

let optimistic = true;
if (this._transport.name() === 'sse') {
// Avoid using optimistic subscriptions with SSE/EventSource as we are sending
// initial data in URL params. URL is recommended to be 2048 chars max – so adding
// subscription data may be risky.
if (this._transport.emulation()) {
// Avoid using optimistic subscriptions for emulation transport because calling
// unsubscribe before receiving node identifier in the connect reply results into
// "node not found" error on the server side.
// Another reason to avoid using optimistic subscriptions with SSE/EventSource as
// we are sending initial data in URL params. URL is recommended to be 2048 chars
// max – so adding subscription data may be risky.
optimistic = false;
}

Expand Down Expand Up @@ -819,10 +820,11 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
self.startBatching();
self._sendConnect(false);
if (optimistic) {
const commands = self._sendSubscribeCommands(true, false);
self._sentOptimisticCommands = commands.length !== 0;
self._sendSubscribeCommands(true, false);
}
self.stopBatching();
//@ts-ignore only for debug and test purposes.
self.emit('__centrifuge_debug:connect_frame_sent', {})
},
onError: function (e: any) {
if (self._transportId != transportId) {
Expand All @@ -842,7 +844,6 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
}
self._debug(transport.subName(), 'transport closed');
self._transportClosed = true;
self._sentOptimisticCommands = false;

let reason = 'connection closed';
let needReconnect = true;
Expand Down Expand Up @@ -1383,7 +1384,9 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
}

protected _unsubscribe(sub: Subscription) {
if (!this._isConnected() && !this._sentOptimisticCommands) {
// @ts-ignore

Check failure on line 1387 in src/centrifuge.ts

View workflow job for this annotation

GitHub Actions / test (18)

Include a description after the "@ts-ignore" directive to explain why the @ts-ignore is necessary. The description must be 3 characters or longer

Check failure on line 1387 in src/centrifuge.ts

View workflow job for this annotation

GitHub Actions / test (20)

Include a description after the "@ts-ignore" directive to explain why the @ts-ignore is necessary. The description must be 3 characters or longer
const isOptimisticallySent = sub._optimisticallySent;
if (!this._isConnected() && !isOptimisticallySent) {
return;
}
const req = {
Expand Down Expand Up @@ -1445,7 +1448,6 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
this._transportWasOpen = true;
this._reconnectAttempts = 0;
this._refreshRequired = false;
this._sentOptimisticCommands = false;

if (this._isConnected()) {
return;
Expand Down
9 changes: 9 additions & 0 deletions src/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
private _epoch: string | null;
private _resubscribeAttempts: number;
private _promiseId: number;
// @ts-ignore - this is used by a client in centrifuge.ts.
private _optimisticallySent: boolean;

private _token: string;
private _data: any | null;
Expand Down Expand Up @@ -60,6 +62,7 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
this._promiseId = 0;
this._inflight = false;
this._refreshTimeout = null;
this._optimisticallySent = false;
this._setOptions(options);
// @ts-ignore – we are hiding some symbols from public API autocompletion.
if (this._centrifuge._debugEnabled) {
Expand Down Expand Up @@ -284,6 +287,9 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
})
return null;
} else {
if (optimistic) {
self._optimisticallySent = true;
}
return self._sendSubscribe(self._token, skipSending);
}
}
Expand Down Expand Up @@ -377,6 +383,7 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
// @ts-ignore – we are hiding some symbols from public API autocompletion.
this._centrifuge._call(cmd, skipSending).then(resolveCtx => {
this._inflight = false;
this._optimisticallySent = false;
// @ts-ignore - improve later.
const result = resolveCtx.reply.subscribe;
this._handleSubscribeResponse(
Expand All @@ -389,6 +396,7 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
}
}, rejectCtx => {
this._inflight = false;
this._optimisticallySent = false;
this._handleSubscribeError(rejectCtx.error);
if (rejectCtx.next) {
rejectCtx.next();
Expand Down Expand Up @@ -438,6 +446,7 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
this.emit('unsubscribed', { channel: this.channel, code: code, reason: reason });
}
this._rejectPromises({ code: errorCodes.subscriptionUnsubscribed, message: this.state });
this._optimisticallySent = false;
}

private _handlePublication(pub: any) {
Expand Down

0 comments on commit 4ffc99a

Please sign in to comment.