From c493707cbc3497fa8b34b90d0531e475fe1fc7b4 Mon Sep 17 00:00:00 2001 From: Andy Ford Date: Thu, 1 Feb 2024 12:00:27 +0000 Subject: [PATCH 1/8] fix: keep client default client read limit when realtime doesn't specify This is mostly for internal purposes, where we have rate limits of 0 (read: unlimited). When 0 is received from realtime, the SDK should continue with its existing defaults. --- ably/realtime_channel_integration_test.go | 41 +++++++++++++++++++++++ ably/realtime_conn.go | 2 +- 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/ably/realtime_channel_integration_test.go b/ably/realtime_channel_integration_test.go index c5a0ec0a..b1fec25e 100644 --- a/ably/realtime_channel_integration_test.go +++ b/ably/realtime_channel_integration_test.go @@ -294,6 +294,47 @@ func TestRealtimeChannel_ShouldSetProvidedReadLimit(t *testing.T) { assert.Equal(t, int64(2048), client.Connection.ReadLimit()) } +func TestRealtimeChannel_SetsNoLimitIfServerNoLimits(t *testing.T) { + // Mock out the dial + dial := DialFunc(func(p string, url *url.URL, timeout time.Duration) (ably.Conn, error) { + return connMock{ + SendFunc: func(m *ably.ProtocolMessage) error { + return nil + }, + ReceiveFunc: func(deadline time.Time) (*ably.ProtocolMessage, error) { + connDetails := ably.ConnectionDetails{ + ClientID: "id1", + ConnectionKey: "foo", + MaxFrameSize: 12, + MaxInboundRate: 14, + MaxMessageSize: 0, + ConnectionStateTTL: ably.DurationFromMsecs(time.Minute * 2), + MaxIdleInterval: ably.DurationFromMsecs(time.Second), + } + + return &ably.ProtocolMessage{ + Action: ably.ActionConnected, + ConnectionID: "connection-id-1", + ConnectionDetails: &connDetails, + }, nil + }, + CloseFunc: func() error { + return nil + }, + }, nil + }) + + _, c := ablytest.NewRealtime(ably.WithDial(dial)) + + // Wait for a little bit for things to settle + time.Sleep(1 * time.Second) + + // Check that the connection read limit is the default - due to websocket.go + // Only allowing the value to be set if the connection is a certain type + // We'll just check -1 here + assert.Equal(t, int64(-1), c.Connection.ReadLimit()) +} + func TestRealtimeChannel_ShouldReturnErrorIfReadLimitExceeded(t *testing.T) { app, client1 := ablytest.NewRealtime(ably.WithEchoMessages(false)) defer safeclose(t, ablytest.FullRealtimeCloser(client1), app) diff --git a/ably/realtime_conn.go b/ably/realtime_conn.go index e82eca03..d13e8eaa 100644 --- a/ably/realtime_conn.go +++ b/ably/realtime_conn.go @@ -826,7 +826,7 @@ func (c *Connection) eventloop() { c.connStateTTL = connDetails.ConnectionStateTTL // Spec RSA7b3, RSA7b4, RSA12a c.auth.updateClientID(connDetails.ClientID) - if !c.isReadLimitSetExternally { + if !c.isReadLimitSetExternally && connDetails.MaxMessageSize > 0 { c.readLimit = connDetails.MaxMessageSize // set MaxMessageSize limit as per TO3l8 } } From 514dd2f021fe4856cff16594153dfc6d936878e9 Mon Sep 17 00:00:00 2001 From: Andy Ford Date: Thu, 1 Feb 2024 13:58:43 +0000 Subject: [PATCH 2/8] test: use messagepipe --- ably/realtime_channel_integration_test.go | 48 +++++++++-------------- 1 file changed, 19 insertions(+), 29 deletions(-) diff --git a/ably/realtime_channel_integration_test.go b/ably/realtime_channel_integration_test.go index b1fec25e..206b6f11 100644 --- a/ably/realtime_channel_integration_test.go +++ b/ably/realtime_channel_integration_test.go @@ -295,36 +295,26 @@ func TestRealtimeChannel_ShouldSetProvidedReadLimit(t *testing.T) { } func TestRealtimeChannel_SetsNoLimitIfServerNoLimits(t *testing.T) { - // Mock out the dial - dial := DialFunc(func(p string, url *url.URL, timeout time.Duration) (ably.Conn, error) { - return connMock{ - SendFunc: func(m *ably.ProtocolMessage) error { - return nil - }, - ReceiveFunc: func(deadline time.Time) (*ably.ProtocolMessage, error) { - connDetails := ably.ConnectionDetails{ - ClientID: "id1", - ConnectionKey: "foo", - MaxFrameSize: 12, - MaxInboundRate: 14, - MaxMessageSize: 0, - ConnectionStateTTL: ably.DurationFromMsecs(time.Minute * 2), - MaxIdleInterval: ably.DurationFromMsecs(time.Second), - } - - return &ably.ProtocolMessage{ - Action: ably.ActionConnected, - ConnectionID: "connection-id-1", - ConnectionDetails: &connDetails, - }, nil - }, - CloseFunc: func() error { - return nil - }, - }, nil - }) + in := make(chan *ably.ProtocolMessage, 1) + out := make(chan *ably.ProtocolMessage, 16) + + _, c := ablytest.NewRealtime(ably.WithDial(MessagePipe(in, out))) + + connDetails := ably.ConnectionDetails{ + ClientID: "id1", + ConnectionKey: "foo", + MaxFrameSize: 12, + MaxInboundRate: 14, + MaxMessageSize: 0, + ConnectionStateTTL: ably.DurationFromMsecs(time.Minute * 2), + MaxIdleInterval: ably.DurationFromMsecs(time.Second), + } - _, c := ablytest.NewRealtime(ably.WithDial(dial)) + in <- &ably.ProtocolMessage{ + Action: ably.ActionConnected, + ConnectionID: "connection-id-1", + ConnectionDetails: &connDetails, + } // Wait for a little bit for things to settle time.Sleep(1 * time.Second) From 6ae052cdb19a27c6b18a63db6bfa086d56bc4a45 Mon Sep 17 00:00:00 2001 From: Andy Ford Date: Thu, 1 Feb 2024 14:14:49 +0000 Subject: [PATCH 3/8] Update ably/realtime_channel_integration_test.go Co-authored-by: sachin shinde --- ably/realtime_channel_integration_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ably/realtime_channel_integration_test.go b/ably/realtime_channel_integration_test.go index 206b6f11..68aec2d5 100644 --- a/ably/realtime_channel_integration_test.go +++ b/ably/realtime_channel_integration_test.go @@ -317,7 +317,8 @@ func TestRealtimeChannel_SetsNoLimitIfServerNoLimits(t *testing.T) { } // Wait for a little bit for things to settle - time.Sleep(1 * time.Second) + err := ablytest.Wait(ablytest.ConnWaiter(c, c.Connect, ably.ConnectionEventConnected), nil) + assert.NoError(t, err) // Check that the connection read limit is the default - due to websocket.go // Only allowing the value to be set if the connection is a certain type From f0184adda8a53c0be1c33400797b0655c22a902e Mon Sep 17 00:00:00 2001 From: Andy Ford Date: Thu, 1 Feb 2024 14:16:22 +0000 Subject: [PATCH 4/8] Update ably/realtime_channel_integration_test.go Co-authored-by: sachin shinde --- ably/realtime_channel_integration_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ably/realtime_channel_integration_test.go b/ably/realtime_channel_integration_test.go index 68aec2d5..359c38f2 100644 --- a/ably/realtime_channel_integration_test.go +++ b/ably/realtime_channel_integration_test.go @@ -294,7 +294,7 @@ func TestRealtimeChannel_ShouldSetProvidedReadLimit(t *testing.T) { assert.Equal(t, int64(2048), client.Connection.ReadLimit()) } -func TestRealtimeChannel_SetsNoLimitIfServerNoLimits(t *testing.T) { +func TestRealtimeChannel_SetDefaultLimitIfNoServerLimit(t *testing.T) { in := make(chan *ably.ProtocolMessage, 1) out := make(chan *ably.ProtocolMessage, 16) From 39e58564e7a303c5cc412694728ebf8dab636eae Mon Sep 17 00:00:00 2001 From: Andy Ford Date: Thu, 1 Feb 2024 14:16:29 +0000 Subject: [PATCH 5/8] Update ably/realtime_channel_integration_test.go Co-authored-by: sachin shinde --- ably/realtime_channel_integration_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ably/realtime_channel_integration_test.go b/ably/realtime_channel_integration_test.go index 359c38f2..1c8425f6 100644 --- a/ably/realtime_channel_integration_test.go +++ b/ably/realtime_channel_integration_test.go @@ -305,7 +305,7 @@ func TestRealtimeChannel_SetDefaultLimitIfNoServerLimit(t *testing.T) { ConnectionKey: "foo", MaxFrameSize: 12, MaxInboundRate: 14, - MaxMessageSize: 0, + MaxMessageSize: 0, // 0 represents no limit on message size ConnectionStateTTL: ably.DurationFromMsecs(time.Minute * 2), MaxIdleInterval: ably.DurationFromMsecs(time.Second), } From acf4f815495b15684d46733e39af8723cd9e2e93 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Fri, 2 Feb 2024 01:01:08 +0530 Subject: [PATCH 6/8] Implemented recursive unwrap for detecting websocketConn while setting connection readlimit --- ably/ably_test.go | 4 ++++ ably/realtime_conn.go | 4 ++++ ably/websocket.go | 15 +++++++++++---- 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/ably/ably_test.go b/ably/ably_test.go index 13ed5165..608598b6 100644 --- a/ably/ably_test.go +++ b/ably/ably_test.go @@ -492,6 +492,10 @@ type interceptConn struct { active *activeIntercept } +func (c interceptConn) Unwrap() ably.Conn { + return c.Conn +} + func (c interceptConn) Receive(deadline time.Time) (*ably.ProtocolMessage, error) { msg, err := c.Conn.Receive(deadline) if err != nil { diff --git a/ably/realtime_conn.go b/ably/realtime_conn.go index d13e8eaa..99cf7bc2 100644 --- a/ably/realtime_conn.go +++ b/ably/realtime_conn.go @@ -988,6 +988,10 @@ func (vc verboseConn) Close() error { return vc.conn.Close() } +func (vc verboseConn) Unwrap() conn { + return vc.conn +} + func (c *Connection) setState(state ConnectionState, err error, retryIn time.Duration) error { c.mtx.Lock() defer c.mtx.Unlock() diff --git a/ably/websocket.go b/ably/websocket.go index 9a9370a7..febfc6ae 100644 --- a/ably/websocket.go +++ b/ably/websocket.go @@ -114,12 +114,19 @@ func dialWebsocketTimeout(uri, origin string, timeout time.Duration, agents map[ return c, nil } -func setConnectionReadLimit(c conn, readLimit int64) error { - verboseConn, ok := c.(verboseConn) +func unwrapConn(c conn) conn { + u, ok := c.(interface { + Unwrap() conn + }) if !ok { - return errors.New("cannot set readlimit for connection, connection does not use verboseConn") + return c } - websocketConn, ok := verboseConn.conn.(*websocketConn) + return unwrapConn(u.Unwrap()) +} + +func setConnectionReadLimit(c conn, readLimit int64) error { + unwrappedConn := unwrapConn(c) + websocketConn, ok := unwrappedConn.(*websocketConn) if !ok { return errors.New("cannot set readlimit for connection, connection does not use nhooyr.io/websocket") } From 66a341bf0cf83805aeebfd1fc045d7f1af08c520 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Fri, 2 Feb 2024 01:05:38 +0530 Subject: [PATCH 7/8] Added test to check for default readlimit when server has no limit --- ably/realtime_channel_integration_test.go | 45 ++++++++++------------- 1 file changed, 19 insertions(+), 26 deletions(-) diff --git a/ably/realtime_channel_integration_test.go b/ably/realtime_channel_integration_test.go index 1c8425f6..3e9c86a9 100644 --- a/ably/realtime_channel_integration_test.go +++ b/ably/realtime_channel_integration_test.go @@ -294,36 +294,29 @@ func TestRealtimeChannel_ShouldSetProvidedReadLimit(t *testing.T) { assert.Equal(t, int64(2048), client.Connection.ReadLimit()) } -func TestRealtimeChannel_SetDefaultLimitIfNoServerLimit(t *testing.T) { - in := make(chan *ably.ProtocolMessage, 1) - out := make(chan *ably.ProtocolMessage, 16) - - _, c := ablytest.NewRealtime(ably.WithDial(MessagePipe(in, out))) - - connDetails := ably.ConnectionDetails{ - ClientID: "id1", - ConnectionKey: "foo", - MaxFrameSize: 12, - MaxInboundRate: 14, - MaxMessageSize: 0, // 0 represents no limit on message size - ConnectionStateTTL: ably.DurationFromMsecs(time.Minute * 2), - MaxIdleInterval: ably.DurationFromMsecs(time.Second), - } +func TestRealtimeChannel_SetDefaultReadLimitIfServerHasNoLimit(t *testing.T) { - in <- &ably.ProtocolMessage{ - Action: ably.ActionConnected, - ConnectionID: "connection-id-1", - ConnectionDetails: &connDetails, + dial := func(proto string, url *url.URL, timeout time.Duration) (ably.Conn, error) { + return ably.DialWebsocket(proto, url, timeout) } + wrappedDialWebsocket, interceptMsg := DialIntercept(dial) - // Wait for a little bit for things to settle - err := ablytest.Wait(ablytest.ConnWaiter(c, c.Connect, ably.ConnectionEventConnected), nil) - assert.NoError(t, err) + connMsgContext, cancel := context.WithCancel(context.Background()) + connMessageCh := interceptMsg(connMsgContext, ably.ActionConnected) + + app, client := ablytest.NewRealtime(ably.WithDial(wrappedDialWebsocket)) + defer safeclose(t, ablytest.FullRealtimeCloser(client), app) + connWaiter := ablytest.ConnWaiter(client, nil, ably.ConnectionEventConnected) + + msg := <-connMessageCh + msg.ConnectionDetails.MaxMessageSize = 0 // 0 represents limitless message size + cancel() + + err := ablytest.Wait(connWaiter, nil) + assert.Nil(t, err) - // Check that the connection read limit is the default - due to websocket.go - // Only allowing the value to be set if the connection is a certain type - // We'll just check -1 here - assert.Equal(t, int64(-1), c.Connection.ReadLimit()) + // If server set limit is 0, value is set to default readlimit + assert.Equal(t, int64(65536), client.Connection.ReadLimit()) } func TestRealtimeChannel_ShouldReturnErrorIfReadLimitExceeded(t *testing.T) { From e85f788c6e52e7abd189f2de3e85cab0d16711ca Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Fri, 2 Feb 2024 01:11:22 +0530 Subject: [PATCH 8/8] Refactored conn readlimit test when set by server --- ably/realtime_channel_integration_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/ably/realtime_channel_integration_test.go b/ably/realtime_channel_integration_test.go index 3e9c86a9..0deefa96 100644 --- a/ably/realtime_channel_integration_test.go +++ b/ably/realtime_channel_integration_test.go @@ -301,18 +301,18 @@ func TestRealtimeChannel_SetDefaultReadLimitIfServerHasNoLimit(t *testing.T) { } wrappedDialWebsocket, interceptMsg := DialIntercept(dial) - connMsgContext, cancel := context.WithCancel(context.Background()) - connMessageCh := interceptMsg(connMsgContext, ably.ActionConnected) + ctx, cancel := context.WithCancel(context.Background()) + msgCh := interceptMsg(ctx, ably.ActionConnected) app, client := ablytest.NewRealtime(ably.WithDial(wrappedDialWebsocket)) defer safeclose(t, ablytest.FullRealtimeCloser(client), app) - connWaiter := ablytest.ConnWaiter(client, nil, ably.ConnectionEventConnected) + connectedWaiter := ablytest.ConnWaiter(client, nil, ably.ConnectionEventConnected) - msg := <-connMessageCh - msg.ConnectionDetails.MaxMessageSize = 0 // 0 represents limitless message size - cancel() + connectedMsg := <-msgCh + connectedMsg.ConnectionDetails.MaxMessageSize = 0 // 0 represents limitless message size + cancel() // unblocks updated message to be processed - err := ablytest.Wait(connWaiter, nil) + err := ablytest.Wait(connectedWaiter, nil) assert.Nil(t, err) // If server set limit is 0, value is set to default readlimit