Skip to content

Commit

Permalink
Merge pull request #661 from ably/feature/server-initiated-auth
Browse files Browse the repository at this point in the history
[ECO-4330] Feature - server initiated auth
  • Loading branch information
sacOO7 authored Sep 6, 2024
2 parents 8b38951 + bd39bc3 commit 21b1fd4
Show file tree
Hide file tree
Showing 7 changed files with 323 additions and 30 deletions.
3 changes: 0 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -447,9 +447,6 @@ See [jwt auth issue](https://github.com/ably/ably-go/issues/569) for more detail

### Realtime API

- Inband reauthentication is not supported; expiring tokens will trigger a disconnection and resume of a realtime
connection. See [server initiated auth](https://github.com/ably/ably-go/issues/228) for more details.

- Channel suspended state is partially implemented. See [suspended channel state](https://github.com/ably/ably-go/issues/568).

- Realtime Ping function is not implemented.
Expand Down
19 changes: 19 additions & 0 deletions ably/ably_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,25 @@ func (rec *MessageRecorder) CheckIfSent(action ably.ProtoAction, times int) func
}
}

func (rec *MessageRecorder) CheckIfReceived(action ably.ProtoAction, times int) func() bool {
return func() bool {
counter := 0
for _, m := range rec.Received() {
if m.Action == action {
counter++
if counter == times {
return true
}
}
}
// Check if no msg of given action type received
if times == 0 && counter == 0 {
return true
}
return false
}
}

func (rec *MessageRecorder) FindFirst(action ably.ProtoAction) *ably.ProtocolMessage {
for _, m := range rec.Sent() {
if m.Action == action {
Expand Down
9 changes: 6 additions & 3 deletions ably/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type Auth struct {

// onExplicitAuthorize is the callback that Realtime sets to reauthorize with the
// server when Authorize is explicitly called.
onExplicitAuthorize func(context.Context, *TokenDetails)
onExplicitAuthorize func(context.Context, *TokenDetails) error

serverTimeOffset time.Duration

Expand All @@ -92,7 +92,7 @@ type Auth struct {
func newAuth(client *REST) (*Auth, error) {
a := &Auth{
client: client,
onExplicitAuthorize: func(context.Context, *TokenDetails) {},
onExplicitAuthorize: func(context.Context, *TokenDetails) error { return nil },
}
method, err := detectAuthMethod(a.opts())
if err != nil {
Expand Down Expand Up @@ -313,7 +313,10 @@ func (a *Auth) Authorize(ctx context.Context, params *TokenParams, setOpts ...Au
if err != nil {
return nil, err
}
a.onExplicitAuthorize(ctx, token)
err = a.onExplicitAuthorize(ctx, token)
if err != nil {
return nil, err
}
return token, nil
}

Expand Down
5 changes: 5 additions & 0 deletions ably/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,10 @@ func (c *Connection) ConnectionStateTTL() time.Duration {
return c.connectionStateTTL()
}

func (r *Realtime) Logger() logger {
return r.log()
}

func NewInternalLogger(l Logger) logger {
return logger{l: l}
}
Expand Down Expand Up @@ -319,6 +323,7 @@ const (
ActionPresence = actionPresence
ActionMessage = actionMessage
ActionSync = actionSync
ActionAuth = actionAuth

FlagHasPresence = flagHasPresence
FlagHasBacklog = flagHasBacklog
Expand Down
3 changes: 3 additions & 0 deletions ably/proto_protocol_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ func (msg *protocolMessage) String() string {
case actionMessage:
return fmt.Sprintf("(action=%q, id=%q, messages=%v)", msg.Action,
msg.ConnectionID, msg.Messages)
case actionAuth:
return fmt.Sprintf("(action=%q, id=%q, auth=%v)", msg.Action,
msg.ConnectionID, msg.Auth)
default:
return fmt.Sprintf("%#v", msg)
}
Expand Down
95 changes: 77 additions & 18 deletions ably/realtime_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func newConn(opts *clientOptions, auth *Auth, callbacks connCallbacks, client *R
readLimit: maxMessageSize,
recover: opts.Recover,
}
auth.onExplicitAuthorize = c.onClientAuthorize
auth.onExplicitAuthorize = c.onExplicitAuthorize
c.queue = newMsgQueue(c)
if !opts.NoConnect {
c.setState(ConnectionStateConnecting, nil, 0)
Expand Down Expand Up @@ -780,7 +780,7 @@ func (c *Connection) eventloop() {
c.mtx.Unlock()
return
}
// RTN23a
// RTN23a, RTN15a
c.lockSetState(ConnectionStateDisconnected, err, 0)
c.mtx.Unlock()
arg := connArgs{
Expand Down Expand Up @@ -830,6 +830,7 @@ func (c *Connection) eventloop() {
c.mtx.Unlock()

c.failedConnSideEffects(msg.Error)
return
case actionConnected:
c.mtx.Lock()

Expand Down Expand Up @@ -887,33 +888,40 @@ func (c *Connection) eventloop() {
c.callbacks.onReconnected(failedResumeOrRecover)
}
c.queue.Flush()
case actionDisconnected:
if !isTokenError(msg.Error) {
// The spec doesn't say what to do in this case, so do nothing.
// Ably is supposed to then close the transport, which will
// trigger a transition to DISCONNECTED.
continue
}

if !c.auth.isTokenRenewable() {
case actionDisconnected: // RTN15h
if isTokenError(msg.Error) {
// RTN15h1
c.failedConnSideEffects(msg.Error)
if !c.auth.isTokenRenewable() {
c.failedConnSideEffects(msg.Error)
return
}
// RTN15h2, RTN22a
c.setState(ConnectionStateDisconnected, newErrorFromProto(msg.Error), 0)
c.reauthorize(connArgs{
lastActivityAt: lastActivityAt,
connDetails: connDetails,
})
return
}

// RTN15h2
c.reauthorize(connArgs{
// RTN15h3
c.setState(ConnectionStateDisconnected, newErrorFromProto(msg.Error), 0)
c.reconnect(connArgs{
lastActivityAt: lastActivityAt,
connDetails: connDetails,
})
return

case actionClosed:
c.mtx.Lock()
c.lockSetState(ConnectionStateClosed, nil, 0)
c.mtx.Unlock()
if c.conn != nil {
c.conn.Close()
}
case actionAuth: // RTN22
canceledCtx, cancel := context.WithCancel(context.Background())
cancel() // Cancel context to unblock current eventloop to receieve new messages
c.auth.Authorize(canceledCtx, c.auth.params)
default:
c.callbacks.onChannelMsg(msg)
}
Expand Down Expand Up @@ -952,8 +960,30 @@ func (c *Connection) reauthorize(arg connArgs) {
c.reconnect(arg)
}

func (c *Connection) onClientAuthorize(ctx context.Context, token *TokenDetails) {
switch c.State() {
func (c *Connection) onExplicitAuthorize(ctx context.Context, token *TokenDetails) error {
switch state := c.State(); state {
case ConnectionStateConnecting:
// RTC8b says: "all current connection attempts should be halted, and
// after obtaining a new token the library should immediately initiate a
// connection attempt using the new token". But the WebSocket library
// doesn't really allow us to halt the connection attempt. Instead, once
// the connection transitions out of CONNECTING (either to CONNECTED or
// to a failure state), we attempt to connect again, which will use
// the new token.
c.log().Info("client-requested authorization while CONNECTING. Will reconnect with new token.")
done := make(chan error)

c.internalEmitter.OnceAll(func(_ ConnectionStateChange) {
done <- c.onExplicitAuthorize(ctx, token)
})

select {
case <-ctx.Done():
return ctx.Err()
case err := <-done:
return err
}

case ConnectionStateConnected:
c.log().Verbosef("starting client-requested reauthorization with token: %+v", token)

Expand All @@ -974,9 +1004,38 @@ func (c *Connection) onClientAuthorize(ctx context.Context, token *TokenDetails)

select {
case <-ctx.Done():
case <-changes:
return ctx.Err()
case change := <-changes:
return change.Reason.unwrapNil()
}

case
ConnectionStateDisconnected,
ConnectionStateSuspended,
ConnectionStateFailed,
ConnectionStateClosed:
c.log().Infof("client-requested authorization while %s: connecting with new token", state)

done := make(chan error)
c.internalEmitter.OnceAll(func(change ConnectionStateChange) {
if change.Current == ConnectionStateConnecting {
done <- c.onExplicitAuthorize(ctx, token)
} else {
done <- change.Reason.unwrapNil()
}
})

c.Connect()

select {
case <-ctx.Done():
return ctx.Err()
case err := <-done:
return err
}
}

return nil
}

func (c *Connection) lockedReauthorizationFailed(err error) {
Expand Down
Loading

0 comments on commit 21b1fd4

Please sign in to comment.