From 6b96b8e4e9ccc110cf36c47a29f504551f81121a Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Mon, 30 Oct 2023 18:46:55 +0530 Subject: [PATCH] Added implementation for resending acks without msgserial change for successful resume --- ably/realtime_client.go | 13 ++++++------- ably/realtime_conn.go | 11 ++++++++++- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/ably/realtime_client.go b/ably/realtime_client.go index 752fdfbe..92369f65 100644 --- a/ably/realtime_client.go +++ b/ably/realtime_client.go @@ -73,7 +73,7 @@ func (c *Realtime) onChannelMsg(msg *protocolMessage) { c.Channels.Get(msg.Channel).notify(msg) } -func (c *Realtime) onReconnected(isNewID bool) { +func (c *Realtime) onReconnected(failedResumeOrRecover bool) { for _, ch := range c.Channels.Iterate() { switch ch.State() { // RTN15g3, RTN15c6, RTN15c7, RTN16l, RTN19b @@ -84,20 +84,19 @@ func (c *Realtime) onReconnected(isNewID bool) { } } - if !isNewID /* RTN15c3, RTN15g3 */ { + if failedResumeOrRecover /* RTN15c3, RTN15g3 */ { // No need to reattach: state is preserved. We just need to flush the // queue of pending messages. // TODO - Once channel is attached, channel queue will be flushed // for _, ch := range c.Channels.Iterate() { // ch.queue.Flush() // } - //RTN19a + //RTN19a1 c.Connection.resendPending() - return + } else { + //RTN19a2 - successful resume, msgSerial doesn't change + c.Connection.resendAcks() } - - //RTN19a - c.Connection.resendPending() } func (c *Realtime) onReconnectionFailed(err *errorInfo) { diff --git a/ably/realtime_conn.go b/ably/realtime_conn.go index e835fc4b..9137a772 100644 --- a/ably/realtime_conn.go +++ b/ably/realtime_conn.go @@ -90,7 +90,7 @@ type connCallbacks struct { // move this up because some implementation details for (RTN15c) requires // access to Channels, and we don't have it here, so we let RealtimeClient do the // work. - onReconnected func(isNewID bool) + onReconnected func(failedResumeOrRecover bool) // onReconnectionFailed is called when we get a FAILED response from a // reconnection request. onReconnectionFailed func(*errorInfo) @@ -705,6 +705,15 @@ func (c *Connection) log() logger { return c.auth.log() } +func (c *Connection) resendAcks() { + c.mtx.Lock() + defer c.mtx.Unlock() + c.log().Debugf("resending %d messages waiting for ACK/NACK", len(c.pending.queue)) + for _, v := range c.pending.queue { + c.conn.Send(v.msg) + } +} + func (c *Connection) resendPending() { c.mtx.Lock() cx := c.pending.Dismiss()