diff --git a/src/centrifuge.test.ts b/src/centrifuge.test.ts index 4de736a1..74b367c2 100644 --- a/src/centrifuge.test.ts +++ b/src/centrifuge.test.ts @@ -1,7 +1,13 @@ import { Centrifuge } from './centrifuge' import { - DisconnectedContext, Error as CentrifugeError, - PublicationContext, TransportName, UnsubscribedContext, State, SubscriptionState + DisconnectedContext, + Error as CentrifugeError, + PublicationContext, + TransportName, + UnsubscribedContext, + State, + SubscriptionState, + SubscribedContext, } from './types'; import { disconnectedCodes, unsubscribedCodes, connectingCodes } from './codes'; @@ -20,9 +26,13 @@ test('no websocket constructor', async () => { }); const transportCases = [ - ['websocket', 'ws://localhost:8000/connection/websocket?cf_protocol_version=v2'], - ['http_stream', 'http://localhost:8000/connection/http_stream?cf_protocol_version=v2'], - ['sse', 'http://localhost:8000/connection/sse?cf_protocol_version=v2'], + ['websocket', 'ws://localhost:8000/connection/websocket'], + ['http_stream', 'http://localhost:8000/connection/http_stream'], + ['sse', 'http://localhost:8000/connection/sse'], +] + +const websocketOnly = [ + ['websocket', 'ws://localhost:8000/connection/websocket'], ] test.each(transportCases)("%s: connects and disconnects", async (transport, endpoint) => { @@ -364,3 +374,124 @@ test.each(transportCases)("%s: subscribe and unsubscribe loop", async (transport await disconnectedPromise; expect(c.state).toBe(State.Disconnected); }); + +// Make sure we can unsubscribe right after connect called and connect/subscribe +// frames not sent yet. +test.each(transportCases)("%s: unsubscribe right after connect", async (transport, endpoint) => { + const c = new Centrifuge([{ + transport: transport as TransportName, + endpoint: endpoint, + }], { + websocket: WebSocket, + fetch: fetch, + eventsource: EventSource, + readableStream: ReadableStream, + emulationEndpoint: 'http://localhost:8000/emulation' + }); + + c.connect(); + await c.ready(5000); + + const sub = c.newSubscription('test'); + + let unsubcribeCalled: any; + const unsubscribedPromise = new Promise((resolve, _) => { + unsubcribeCalled = resolve; + }) + let subcribeCalled: any; + const subscribedPromise = new Promise((resolve, _) => { + subcribeCalled = resolve; + }) + + sub.on('subscribed', (ctx) => { + subcribeCalled(ctx); + }) + sub.on('unsubscribed', (ctx) => { + unsubcribeCalled(ctx); + }) + + sub.subscribe(); + c.disconnect(); + c.connect(); + sub.unsubscribe(); + + expect(sub.state).toBe(SubscriptionState.Unsubscribed); + await unsubscribedPromise; + + sub.subscribe(); + await subscribedPromise; + + let disconnectCalled: any; + const disconnectedPromise = new Promise((resolve, _) => { + disconnectCalled = resolve; + }) + c.on('disconnected', (ctx) => { + disconnectCalled(ctx); + }) + + c.disconnect(); + await disconnectedPromise; + expect(c.state).toBe(State.Disconnected); +}); + +// Make sure we can unsubscribe right after connect frame sent but reply has not been yet received. +// This is important to cover bug described in https://github.com/centrifugal/centrifuge-js/pull/274. +test.each(websocketOnly)("%s: unsubscribe in between connect command and reply", async (transport, endpoint) => { + const c = new Centrifuge([{ + transport: transport as TransportName, + endpoint: endpoint, + }], { + websocket: WebSocket, + fetch: fetch, + eventsource: EventSource, + readableStream: ReadableStream, + emulationEndpoint: 'http://localhost:8000/emulation' + }); + + const sub = c.newSubscription('test'); + + let unsubcribeCalled: any; + const unsubscribedPromise = new Promise((resolve, _) => { + unsubcribeCalled = resolve; + }) + let subcribeCalled: any; + const subscribedPromise = new Promise((resolve, _) => { + subcribeCalled = resolve; + }) + + // @ts-ignore this is only for test purposes. + c.on('__centrifuge_debug:connect_frame_sent', () => { + sub.unsubscribe(); + unsubcribeCalled() + }) + + sub.on('subscribed', (ctx) => { + subcribeCalled(ctx); + }) + sub.on('unsubscribed', (ctx) => { + unsubcribeCalled(ctx); + }) + + sub.subscribe(); + c.connect(); + + await unsubscribedPromise; + await c.ready() + + await new Promise(r => setTimeout(r, 2000)); + sub.subscribe(); + + await subscribedPromise; + + let disconnectCalled: any; + const disconnectedPromise = new Promise((resolve, _) => { + disconnectCalled = resolve; + }) + c.on('disconnected', (ctx) => { + disconnectCalled(ctx); + }) + + c.disconnect(); + await disconnectedPromise; + expect(c.state).toBe(State.Disconnected); +}); \ No newline at end of file diff --git a/src/centrifuge.ts b/src/centrifuge.ts index a141a833..e56ed353 100644 --- a/src/centrifuge.ts +++ b/src/centrifuge.ts @@ -71,7 +71,6 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter TypedEventEmitter TypedEventEmitter TypedEventEmitter TypedEventEmitter TypedEventEmitter TypedEventEmitter TypedEventEmitter) { @@ -59,6 +61,7 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter TypedEventEmitter TypedEventEmitter { this._inflight = false; + this._optimisticallySent = false; // @ts-ignore - improve later. const result = resolveCtx.reply.subscribe; this._handleSubscribeResponse( @@ -389,6 +396,7 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter { this._inflight = false; + this._optimisticallySent = false; this._handleSubscribeError(rejectCtx.error); if (rejectCtx.next) { rejectCtx.next(); @@ -437,6 +445,7 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter