Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix unsubscribe from optimistic inflight subscription #275

Closed
wants to merge 1 commit into from

Conversation

FZambia
Copy link
Member

@FZambia FZambia commented Apr 1, 2024

Alternative solution for #274

@ThisIsEsh
Copy link
Contributor

ThisIsEsh commented Apr 2, 2024

Hello!

Checked it out in my playground, it fixes optimistic inflight subscription problem.

But it also covers another situation, when subscription is sent, the client did not receive response yet and connection goes to connecting state. In this case, variable isSubOptimisticallySent could be lit bit misleading and it will also lead to situation when we will queue unsub frame, it does not affect anything right now, but maybe it can lead to some unexpected behaviour later if server will not just ignore such frames.

Also it may lead to some unexpected behaviour with such call chain, where client is Centrifuge instance
let test = client.newSubscription('test-channel'); test.subscribe(); client.disconnect(); client.connect(); test.unsubscribe();

It will lead to this error "Failed to execute 'send' on 'WebSocket': Still in CONNECTING state." (you can also see it on screenshot), because transport is not open yet.

image

@FZambia
Copy link
Member Author

FZambia commented Apr 2, 2024

Also it may lead to some unexpected behaviour with such call chain, where client is Centrifuge instance
let test = client.newSubscription('test-channel'); test.subscribe(); client.disconnect(); client.connect(); test.unsubscribe();

Tried to reproduce this with test case:

test.each(transportCases)("%s: unsubscribe inflight", async (transport, endpoint) => {
  const c = new Centrifuge([{
    transport: transport as TransportName,
    endpoint: endpoint,
  }], {
    websocket: WebSocket,
    fetch: fetch,
    eventsource: EventSource,
    readableStream: ReadableStream,
    emulationEndpoint: 'http://localhost:8000/emulation'
  });

  const sub = c.newSubscription('test');

  let unsubcribeCalled: any;
  const unsubscribedPromise = new Promise<UnsubscribedContext>((resolve, _) => {
    unsubcribeCalled = resolve;
  })
  sub.on('unsubscribed', (ctx) => {
    unsubcribeCalled(ctx);
  })

  sub.subscribe();
  c.connect();
  sub.unsubscribe();

  expect(sub.state).toBe(SubscriptionState.Unsubscribed);
  await unsubscribedPromise;

  let disconnectCalled: any;
  const disconnectedPromise = new Promise<DisconnectedContext>((resolve, _) => {
    disconnectCalled = resolve;
  })
  c.on('disconnected', (ctx) => {
    disconnectCalled(ctx);
  })

  c.disconnect();
  await disconnectedPromise;
  expect(c.state).toBe(State.Disconnected);
});

And it worked fine for me. In this case I am not sure how the error you've shown may be possible. Let's break down:

  • client.newSubscription('test-channel'); - no frames sent since we are disconnected
  • test.subscribe(); - no frames sent since we are disconnected
  • client.disconnect(); - not sure it's needed since it's noop in disconnected state
  • client.connect(); - initializes connection, client goes to Connecting state
  • test.unsubscribe(); - I think it can't cause the error you've shown because while WebSocket transport in CONNECTING state we are not sending connect command/subscription commands and sub._inflight should be false - thus no unsubscribe frame should be sent.

So for now I do not see how to reproduce the error WebSocket': Still in CONNECTING state. you've shown. Am I missing something?

@FZambia
Copy link
Member Author

FZambia commented Apr 2, 2024

I see! Looks like I missed that the client must be originally in Connected state to reproduce, that's why you called client.disconnect. So I can reproduce it with this test case:

test.each(transportCases)("%s: unsubscribe inflight", async (transport, endpoint) => {
  const c = new Centrifuge([{
    transport: transport as TransportName,
    endpoint: endpoint,
  }], {
    websocket: WebSocket,
    fetch: fetch,
    eventsource: EventSource,
    readableStream: ReadableStream,
    emulationEndpoint: 'http://localhost:8000/emulation'
  });

  c.connect();
  await c.ready(5000);

  const sub = c.newSubscription('test');

  let unsubcribeCalled: any;
  const unsubscribedPromise = new Promise<UnsubscribedContext>((resolve, _) => {
    unsubcribeCalled = resolve;
  })
  let subcribeCalled: any;
  const subscribedPromise = new Promise<SubscribedContext>((resolve, _) => {
    subcribeCalled = resolve;
  })
  sub.on('subscribed', (ctx) => {
    subcribeCalled(ctx);
  })
  sub.on('unsubscribed', (ctx) => {
    unsubcribeCalled(ctx);
  })

  sub.subscribe();
  c.disconnect();
  c.connect();
  sub.unsubscribe();

  expect(sub.state).toBe(SubscriptionState.Unsubscribed);
  await unsubscribedPromise;

  sub.subscribe();
  await subscribedPromise;

  let disconnectCalled: any;
  const disconnectedPromise = new Promise<DisconnectedContext>((resolve, _) => {
    disconnectCalled = resolve;
  })
  c.on('disconnected', (ctx) => {
    disconnectCalled(ctx);
  })

  c.disconnect();
  await disconnectedPromise;
  expect(c.state).toBe(State.Disconnected);
});

Is this correct assumption?

@ThisIsEsh
Copy link
Contributor

ThisIsEsh commented Apr 2, 2024

Yep, you are right connection must be connected before calls in my example, sorry that I did not mention that at first, here's short example:

const MIN_RETRY = 4000;
const MAX_RETRY = 4.611686018427388e21;

const client = new Centrifuge(centrifugeUrl, {
  minReconnectDelay: MIN_RETRY,
  maxReconnectDelay: MAX_RETRY,
  token: centrifugoToken
});

client.once('connected', () => {
  let test = client.newSubscription('test-channel33');
  test.subscribe();
  client.disconnect();
  client.connect();
  test.unsubscribe();
})
client.connect()

@FZambia
Copy link
Member Author

FZambia commented Apr 3, 2024

Yep, I see now that this alternative solution is worse than what you originally suggested. I'll come with another PR tomorrow - using your approach as base but slightly modified – to manage optimisticallySent flag on per-subscription basis.

@ThisIsEsh
Copy link
Contributor

ThisIsEsh commented Apr 3, 2024

I have been thinking about per subscription basis, but will there be use for it?

All subs, that will be created after connection + optimistic subscribe frames has been sent, will be sent after connection open, if we unsub from them before connection is opened they will be just removed from queue.

Also it may make centrifuge and subscription entity more codependent between each other, because client will need to know that subs may be different.

In my approach all logic about optiomistic frames are bound inside centrifuge entity. It just knows that optimistic frames has been sent and it stops early returning in _unsubscribe calls. In it's order _unsubscribe calls are controlled inside subscription and will be called only if subscrption frames has been sent before.

P.S. Just sharing my point of view to the problem, after hours spent in researches)

@FZambia
Copy link
Member Author

FZambia commented Apr 4, 2024

Thanks, it's definitely more complexity. But having per-subscription flags seems natural to me because not all subscriptions are optimistic in centrifuge-js (this depends on configuration and current token state - here is a single branch of code in which optimistic subscription will be sent - it's the common path in general I believe, so most subs are optimistic, but other paths around can happen too). So when you call unsubscribe for some subscription – it's not necessarily sent as part of optimistic subscription batch. That's why in this case I think it's better to fallback to the current logic which seems to work correctly for non-optimistic case.

Does it make sense?

UPD. Opened PR with what I mean - #276

@ThisIsEsh
Copy link
Contributor

ThisIsEsh commented Apr 4, 2024

Thank you! Yeah, that's exact what I have been thinking about, when have been trying per sub approach)

But it looks like it's already controlled by sub itself, because it will never call this._centrifuge._unsubscribe, if subscription is not in subscribed | subscribing state, so _optimisticallySent flag will be some kind of guard logic over it and not cover any additional case, or I am missing something and there's case where it works another way?

What’s also a little scary (from debugging side of view) is that the unsubscribe logic will be more spread between the client and the subscription entity, which will complicate future debugging.

@ThisIsEsh
Copy link
Contributor

ThisIsEsh commented Apr 4, 2024

I mean that calling optimistic subscribe will already change state of subscription and only such subscriptions will be calling _unsubscribe in client before connection is fully established.

All other subscriptions will just not call sub/unsub before connection is established and after connection is established, there's no cases when subs need to know if they are optimistic or not. So it becomes logic inside subscription that is used only inside client.

@FZambia
Copy link
Member Author

FZambia commented Apr 4, 2024

it will never call this._centrifuge._unsubscribe, if subscription is not in subscribed | subscribing state

Subscription goes to subscribing state right after .subscribe() has been called. It may be done even before calling client.connect() for the first time.

At any point after that .unsubscribe() may be called. Let's assume that at this point client is in the process of connecting with other optimistic subscriptions. But this particular subscription in subscribing state may be not included into optimistic batch (if subscription uses token and have not received it for the first time, or if it uses dynamic per subscription data) – in that case unnecessary unsubscribe command will be sent to server.

Also I think more logic may be added in the future which may result into unnecessary unsubscribe sent. I am not happy from tight coupling of Client and Subscription – but it's sth we already have and it's hard to change without loosing efficiency.

What do you think? Maybe I am still missing sth which ruins my concerns?

@FZambia
Copy link
Member Author

FZambia commented Apr 4, 2024

I suppose it's worth writing a test case to check this for sure.

@ThisIsEsh
Copy link
Contributor

Yeah, but even the subs with getData will send their frame, the only difference it will not be in batch with connection, so it looks like unsub frame is also required for such subs.

Here's short representer, which leads to such frames order in open WS connection, you need to turn off/on your internet connection

const MIN_RETRY = 4000;
const MAX_RETRY = 4.611686018427388e21;

const client = new Centrifuge(centrifugeUrl, {
  minReconnectDelay: MIN_RETRY,
  maxReconnectDelay: MAX_RETRY,
  token: centrifugoToken
});

window.addEventListener("offline", (event) => {
  console.log("centrifugo went to connecting state, but did not receive connect response yet, removing sub")
  let getDataPromiseResolver;
  const offlineSub  = client.newSubscription('test-channel22', {
    getData: () => {
      return new Promise((resolve) => {
        getDataPromiseResolver = resolve
      })
    }
  });
  offlineSub.subscribe();
  // You need to delay connection response or add such custom  event in the end of onOpen callback document.dispatchEvent(new CustomEvent('centrifugo-connect-sent'))
  document.addEventListener('centrifugo-connect-sent', () => {
    setTimeout(() => {
      getDataPromiseResolver({data: '123'});
      setTimeout(() => {
        client.removeSubscription(offlineSub);
      }, 1)
    }, 2)
  })

});
client.connect();
image

@ThisIsEsh
Copy link
Contributor

ThisIsEsh commented Apr 4, 2024

Just got another thought, maybe it's better to introduce some kind of flag, that will indicate that transport is open and rely only on it when sending sub/unsub frames?

Looks like it will close and issue and also allow not only subs with getData to send their frame during connection process, which can speed up subscription process lit bit for some cases.

But I did not deep dive server code yet and not sure if it's ready for it, but from first glance, based on screenshot in my prev comment, looks like yes.

upd: I can prepare the PR if this idea makes sense

@FZambia
Copy link
Member Author

FZambia commented Apr 4, 2024

Yeah, but even the subs with getData will send their frame

May be a bug - they should not be included to optimistic batch. At least in my understanding. Let me check why this happens and come back.

Just got another thought, maybe it's better to introduce some kind of flag, that will indicate that transport is open and rely only on it when sending sub/unsub frames?

I like the idea, may even simplify some things in current implementation. One note - it's not possible to use in emulation (transport.emulation() === true) scenario since we need to wait for connect reply with a node identifier in it. Maybe sth else will araise. If you want you can try making this, I will come back to this soon also - checking current getData behaviour as a first step.

@ThisIsEsh
Copy link
Contributor

Well, it's not included in optimistic batch, it will be sent after it as a separate frame and looks like it works, based on responses from the server

@FZambia
Copy link
Member Author

FZambia commented Apr 4, 2024

Well, it's not included in optimistic batch, it will be sent after it as a separate frame and looks like it works, based on responses from the server

Sorry, I've meant why it's sent before connect reply received. Also wondering, is it the same now for subscription with token case (when no Subscription token provided but Subscription getToken function is used - i.e. no initial token scenario), or only getData behaves like this.

@ThisIsEsh
Copy link
Contributor

ThisIsEsh commented Apr 4, 2024

Looks like not really same, all optimistic subs will early return here

image

And all non optimistic will end up here during connection
image

But there is possible to reach same behaviour if we call subscribe for such subscriptions during connected state, but getToken fn will be resolved during connecting state later, for example during reconnect

const MIN_RETRY = 4000;
const MAX_RETRY = 4.611686018427388e21;

const client = new Centrifuge(centrifugeUrl, {
  minReconnectDelay: MIN_RETRY,
  maxReconnectDelay: MAX_RETRY,
  token: centrifugoToken
});

let getTokenPromiseResolver;
let getTokenSub

client.once('connected', () => {
  getTokenSub = client.newSubscription('test-channel22', {
    getToken: () => {
      return new Promise((resolve) => {
        getTokenPromiseResolver = resolve
      })
    }
  });
  getTokenSub.subscribe();
})
window.addEventListener("offline", (event) => {
  console.log("centrifugo went to connecting state, but did not receive connect response yet, removing sub")
  // You need to delay connection response or add such custom  event in the end of onOpen callback document.dispatchEvent(new CustomEvent('centrifugo-connect-sent'))
  document.addEventListener('centrifugo-connect-sent', () => {
    setTimeout(() => {
      getTokenPromiseResolver(centrifugoToken);
      setTimeout(() => {
        client.removeSubscription(getTokenSub);
      }, 1)
    }, 2)
  })

});
client.connect();
image

UPD: Also, it looks like if promise will be resolved and transport will not be opened at that time, it will lead to an error

@FZambia
Copy link
Member Author

FZambia commented Apr 4, 2024

But there is possible to reach same behaviour if we call subscribe for such subscriptions during connected state, but getToken fn will be resolved during connecting state later, for example during reconnect

Looks like it's also a buggy current behaviour – it fails with unsubscribe in the same manner as optimistic subs. Right?

@FZambia
Copy link
Member Author

FZambia commented Apr 4, 2024

UPD: Also, it looks like if promise will be resolved and transport will not be opened at that time, it will lead to an error

You mean token promise may be resolved and _sendSubscribe will be called (here) without connection check? Seems true. Need to check connection status also.

@ThisIsEsh
Copy link
Contributor

Looks like it's also a buggy current behaviour – it fails with unsubscribe in the same manner as optimistic subs. Right?

Yep, it will lead to same behaviour

Looks like transport state could resolve all this problems. Already made draft and it works perfectly with transport that has separated logic for opening and sending first batch of commands. But if it's emulated transport, looks like it may require some kind of queue for unsub commands, that is called during opening process, if we sent optimistic commands. And separation of subscribe and unsubscribe commands, which will lead to situation where we will need to make one more call of sending missing commands.

I have seen you made a change in your PR which will set optimistic false for all emulation transport, looks like in such case we will never have to call _unsubscribe before such transport is really opened and it will not be a problem.

If its not really required to support optimistic subs for emulation, it will be easier to prepare proposal.

P.S. Just deployed fork to production, already see significant drop of unsubscribe errors from 100-200 to 20-40 per minute, but clients are still updating

@FZambia
Copy link
Member Author

FZambia commented Apr 4, 2024

Looks like transport state could resolve all this problems. Already made draft and it works perfectly with transport that has separated logic for opening and sending first batch of commands.

Yep, sounds great

But if it's emulated transport, looks like it may require some kind of queue for unsub commands, that is called during opening process, if we sent optimistic commands.

I think we can skip optimistic subs for emulation transports for now, it's a bit unfortunate, but I think it will be right to implement it as a separate step later if needed.

@ThisIsEsh
Copy link
Contributor

Made new PR - #278 , left some notes inside of it, can you check it out plz?

@FZambia
Copy link
Member Author

FZambia commented Apr 6, 2024

Closing in favour of #278

@FZambia FZambia closed this Apr 6, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants