Skip to content

Commit

Permalink
Merge pull request #631 from ably/connection-read-limit-unlimited
Browse files Browse the repository at this point in the history
fix: keep client default client read limit when realtime doesn't specify
  • Loading branch information
AndyTWF authored Feb 2, 2024
2 parents cc9e260 + e85f788 commit 7a94a5a
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 5 deletions.
4 changes: 4 additions & 0 deletions ably/ably_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,10 @@ type interceptConn struct {
active *activeIntercept
}

func (c interceptConn) Unwrap() ably.Conn {
return c.Conn
}

func (c interceptConn) Receive(deadline time.Time) (*ably.ProtocolMessage, error) {
msg, err := c.Conn.Receive(deadline)
if err != nil {
Expand Down
25 changes: 25 additions & 0 deletions ably/realtime_channel_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,31 @@ func TestRealtimeChannel_ShouldSetProvidedReadLimit(t *testing.T) {
assert.Equal(t, int64(2048), client.Connection.ReadLimit())
}

func TestRealtimeChannel_SetDefaultReadLimitIfServerHasNoLimit(t *testing.T) {

dial := func(proto string, url *url.URL, timeout time.Duration) (ably.Conn, error) {
return ably.DialWebsocket(proto, url, timeout)
}
wrappedDialWebsocket, interceptMsg := DialIntercept(dial)

ctx, cancel := context.WithCancel(context.Background())
msgCh := interceptMsg(ctx, ably.ActionConnected)

app, client := ablytest.NewRealtime(ably.WithDial(wrappedDialWebsocket))
defer safeclose(t, ablytest.FullRealtimeCloser(client), app)
connectedWaiter := ablytest.ConnWaiter(client, nil, ably.ConnectionEventConnected)

connectedMsg := <-msgCh
connectedMsg.ConnectionDetails.MaxMessageSize = 0 // 0 represents limitless message size
cancel() // unblocks updated message to be processed

err := ablytest.Wait(connectedWaiter, nil)
assert.Nil(t, err)

// If server set limit is 0, value is set to default readlimit
assert.Equal(t, int64(65536), client.Connection.ReadLimit())
}

func TestRealtimeChannel_ShouldReturnErrorIfReadLimitExceeded(t *testing.T) {
app, client1 := ablytest.NewRealtime(ably.WithEchoMessages(false))
defer safeclose(t, ablytest.FullRealtimeCloser(client1), app)
Expand Down
6 changes: 5 additions & 1 deletion ably/realtime_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -826,7 +826,7 @@ func (c *Connection) eventloop() {
c.connStateTTL = connDetails.ConnectionStateTTL
// Spec RSA7b3, RSA7b4, RSA12a
c.auth.updateClientID(connDetails.ClientID)
if !c.isReadLimitSetExternally {
if !c.isReadLimitSetExternally && connDetails.MaxMessageSize > 0 {
c.readLimit = connDetails.MaxMessageSize // set MaxMessageSize limit as per TO3l8
}
}
Expand Down Expand Up @@ -988,6 +988,10 @@ func (vc verboseConn) Close() error {
return vc.conn.Close()
}

func (vc verboseConn) Unwrap() conn {
return vc.conn
}

func (c *Connection) setState(state ConnectionState, err error, retryIn time.Duration) error {
c.mtx.Lock()
defer c.mtx.Unlock()
Expand Down
15 changes: 11 additions & 4 deletions ably/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,19 @@ func dialWebsocketTimeout(uri, origin string, timeout time.Duration, agents map[
return c, nil
}

func setConnectionReadLimit(c conn, readLimit int64) error {
verboseConn, ok := c.(verboseConn)
func unwrapConn(c conn) conn {
u, ok := c.(interface {
Unwrap() conn
})
if !ok {
return errors.New("cannot set readlimit for connection, connection does not use verboseConn")
return c
}
websocketConn, ok := verboseConn.conn.(*websocketConn)
return unwrapConn(u.Unwrap())
}

func setConnectionReadLimit(c conn, readLimit int64) error {
unwrappedConn := unwrapConn(c)
websocketConn, ok := unwrappedConn.(*websocketConn)
if !ok {
return errors.New("cannot set readlimit for connection, connection does not use nhooyr.io/websocket")
}
Expand Down

0 comments on commit 7a94a5a

Please sign in to comment.