Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ECO-4327] Feature - realtime fallbacks #657

Merged
merged 30 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
f0c9b1c
Added separate struct for realtimeHosts and hostcache to handle realt…
sacOO7 Aug 5, 2024
afe392a
Added code to options to check for active internet connection
sacOO7 Aug 5, 2024
db45ce8
Removed REST specific (hostcache) implementation from hosts.go
sacOO7 Aug 6, 2024
e04ed9d
Updated export test methods for realtimeHost
sacOO7 Aug 6, 2024
9e25796
Added unit tests for realtime host fallbacks
sacOO7 Aug 6, 2024
a5964df
Moved timeoutOrDnsError check to error.go file
sacOO7 Aug 7, 2024
ef8bcee
Added test for checking active internet connection
sacOO7 Aug 7, 2024
7960515
Added hosts as a separate property to connection struct, removed unus…
sacOO7 Aug 9, 2024
06ceed1
Updated websocketConn to include http resp field that stores handshak…
sacOO7 Aug 12, 2024
e449475
Added method to extract http response from conn interface
sacOO7 Aug 12, 2024
b67165f
Updated realtime implementation to connect using fallback hosts
sacOO7 Aug 12, 2024
823d0fc
Updated realtime url to return url based on given host
sacOO7 Aug 12, 2024
982e5cd
Fixed failing tests responsible for checking agent headers
sacOO7 Aug 12, 2024
edeb38d
Fixed realtimeUrl method on clientOptions
sacOO7 Aug 12, 2024
a22202b
Added integration tests for realtime host fallback
sacOO7 Aug 13, 2024
2168f83
Added RTN17e host fallback implementation for active realtime host,
sacOO7 Aug 13, 2024
2a8d98e
Simplified realtime fallbacks, Removed use of separate hosts struct a…
sacOO7 Aug 13, 2024
15ad3b6
Added type for websocketErr, implemented passing integration test for…
sacOO7 Aug 14, 2024
d240ee3
Added integration test for host fallback timeout and server error
sacOO7 Aug 15, 2024
d368039
Reverted sandbox clientOptions changes, refactored opts param
sacOO7 Aug 15, 2024
b5b9f4b
Updated README, removed limitation for connection failure handling
sacOO7 Aug 15, 2024
918eade
Merge branch 'main' into feature/realtime-fallbacks
sacOO7 Aug 21, 2024
992a9e3
Using errors.Is instead of errors.As for checking dns error
sacOO7 Aug 22, 2024
a0242d4
Refactored realtime host fallback tests as per review comment
sacOO7 Aug 22, 2024
f484445
Removed use of deprecated io/ioutil, replaced with io package instead
sacOO7 Aug 22, 2024
a202171
Refactored implementation for realtime host fallback with proper erro…
sacOO7 Aug 22, 2024
049151c
Revert "Using errors.Is instead of errors.As for checking dns error"
sacOO7 Aug 22, 2024
5254cae
Refactored realtime host tests and internet check code as per review …
sacOO7 Aug 22, 2024
22d35c9
Updated code as per spec for checking active internet connection
sacOO7 Aug 23, 2024
63cb110
Updated realtime test for fallback hosts, removed code for possible f…
sacOO7 Aug 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -450,8 +450,6 @@ See [jwt auth issue](https://github.com/ably/ably-go/issues/569) for more detail
- Inband reauthentication is not supported; expiring tokens will trigger a disconnection and resume of a realtime
connection. See [server initiated auth](https://github.com/ably/ably-go/issues/228) for more details.

- Realtime connection failure handling is partially implemented. See [host fallback](https://github.com/ably/ably-go/issues/225) for more details.

- Channel suspended state is partially implemented. See [suspended channel state](https://github.com/ably/ably-go/issues/568).

- Realtime Ping function is not implemented.
Expand Down
11 changes: 11 additions & 0 deletions ably/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,17 @@ func errFromUnprocessableBody(resp *http.Response) error {
return &ErrorInfo{Code: ErrBadRequest, StatusCode: resp.StatusCode, err: err}
}

func isTimeoutOrDnsErr(err error) bool {
var netErr net.Error
amnonbc marked this conversation as resolved.
Show resolved Hide resolved
if errors.As(err, &netErr) {
if netErr.Timeout() { // RSC15l2
return true
}
}
var dnsErr *net.DNSError
return errors.As(err, &dnsErr) // RSC15l1
amnonbc marked this conversation as resolved.
Show resolved Hide resolved
}

func checkValidHTTPResponse(resp *http.Response) error {
type errorBody struct {
Error errorInfo `json:"error,omitempty" codec:"error,omitempty"`
Expand Down
16 changes: 12 additions & 4 deletions ably/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,6 @@ func (opts *clientOptions) RestURL() string {
return opts.restURL()
}

func (opts *clientOptions) RealtimeURL() string {
return opts.realtimeURL()
}

func (c *REST) Post(ctx context.Context, path string, in, out interface{}) (*http.Response, error) {
return c.post(ctx, path, in, out)
}
Expand Down Expand Up @@ -93,6 +89,10 @@ func (c *REST) GetCachedFallbackHost() string {
return c.hostCache.get()
}

func (c *REST) ActiveRealtimeHost() string {
return c.activeRealtimeHost
}

func (c *RealtimeChannel) GetChannelSerial() string {
c.mtx.Lock()
defer c.mtx.Unlock()
Expand Down Expand Up @@ -121,6 +121,10 @@ func (opts *clientOptions) GetFallbackRetryTimeout() time.Duration {
return opts.fallbackRetryTimeout()
}

func (opts *clientOptions) HasActiveInternetConnection() bool {
return opts.hasActiveInternetConnection()
amnonbc marked this conversation as resolved.
Show resolved Hide resolved
}

func NewErrorInfo(code ErrorCode, err error) *ErrorInfo {
return newError(code, err)
}
Expand Down Expand Up @@ -222,6 +226,10 @@ func (c *Connection) SetKey(key string) {
c.key = key
}

func (r *Realtime) Rest() *REST {
return r.rest
}

func (c *RealtimePresence) Members() map[string]*PresenceMessage {
c.mtx.Lock()
defer c.mtx.Unlock()
Expand Down
23 changes: 20 additions & 3 deletions ably/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io/ioutil"
sacOO7 marked this conversation as resolved.
Show resolved Hide resolved
"log"
"net"
"net/http"
Expand All @@ -28,6 +29,9 @@ const (
Port = 80
TLSPort = 443
maxMessageSize = 65536 // 64kb, default value TO3l8

internetCheckUrl = "https://internet-up.ably-realtime.com/is-the-internet-up.txt"
internetCheckOk = "yes\n"
)

var defaultOptions = clientOptions{
Expand Down Expand Up @@ -482,10 +486,10 @@ func (opts *clientOptions) restURL() (restUrl string) {
return "https://" + baseUrl
}

func (opts *clientOptions) realtimeURL() (realtimeUrl string) {
baseUrl := opts.getRealtimeHost()
func (opts *clientOptions) realtimeURL(realtimeHost string) (realtimeUrl string) {
baseUrl := realtimeHost
_, _, err := net.SplitHostPort(baseUrl)
if err != nil { // set port if not set in baseUrl
if err != nil { // set port if not set in provided realtimeHost
port, _ := opts.activePort()
baseUrl = net.JoinHostPort(baseUrl, strconv.Itoa(port))
}
Expand Down Expand Up @@ -595,6 +599,19 @@ func (opts *clientOptions) idempotentRESTPublishing() bool {
return opts.IdempotentRESTPublishing
}

func (opts *clientOptions) hasActiveInternetConnection() bool {
res, err := opts.httpclient().Get(internetCheckUrl)
if err != nil {
return false
}
data, err := ioutil.ReadAll(res.Body)
sacOO7 marked this conversation as resolved.
Show resolved Hide resolved
res.Body.Close()
if err != nil {
return false
}
return string(data) == internetCheckOk
amnonbc marked this conversation as resolved.
Show resolved Hide resolved
}

type ScopeParams struct {
Start time.Time
End time.Time
Expand Down
45 changes: 23 additions & 22 deletions ably/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,32 @@ import (
)

func TestDefaultFallbacks_RSC15h(t *testing.T) {
t.Run("with env should return environment fallback hosts", func(t *testing.T) {
expectedFallBackHosts := []string{
"a.ably-realtime.com",
"b.ably-realtime.com",
"c.ably-realtime.com",
"d.ably-realtime.com",
"e.ably-realtime.com",
}
hosts := ably.DefaultFallbackHosts()
assert.Equal(t, expectedFallBackHosts, hosts)
})
expectedFallBackHosts := []string{
"a.ably-realtime.com",
"b.ably-realtime.com",
"c.ably-realtime.com",
"d.ably-realtime.com",
"e.ably-realtime.com",
}
hosts := ably.DefaultFallbackHosts()
assert.Equal(t, expectedFallBackHosts, hosts)
}

func TestEnvFallbackHosts_RSC15i(t *testing.T) {
t.Run("with env should return environment fallback hosts", func(t *testing.T) {
expectedFallBackHosts := []string{
"sandbox-a-fallback.ably-realtime.com",
"sandbox-b-fallback.ably-realtime.com",
"sandbox-c-fallback.ably-realtime.com",
"sandbox-d-fallback.ably-realtime.com",
"sandbox-e-fallback.ably-realtime.com",
}
hosts := ably.GetEnvFallbackHosts("sandbox")
assert.Equal(t, expectedFallBackHosts, hosts)
})
expectedFallBackHosts := []string{
"sandbox-a-fallback.ably-realtime.com",
"sandbox-b-fallback.ably-realtime.com",
"sandbox-c-fallback.ably-realtime.com",
"sandbox-d-fallback.ably-realtime.com",
"sandbox-e-fallback.ably-realtime.com",
}
hosts := ably.GetEnvFallbackHosts("sandbox")
assert.Equal(t, expectedFallBackHosts, hosts)
}

func TestInternetConnectionCheck_RTN17c(t *testing.T) {
clientOptions := ably.NewClientOptions()
assert.True(t, clientOptions.HasActiveInternetConnection())
amnonbc marked this conversation as resolved.
Show resolved Hide resolved
}

func TestFallbackHosts_RSC15b(t *testing.T) {
Expand Down
179 changes: 179 additions & 0 deletions ably/realtime_client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/ably/ably-go/ably"
"github.com/ably/ably-go/ably/internal/ablyutil"
"github.com/ably/ably-go/ablytest"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -132,6 +133,184 @@ func TestRealtime_RSC7_AblyAgent(t *testing.T) {
})
}

func TestRealtime_RTN17_HostFallback(t *testing.T) {
t.Parallel()

getDNSErr := func() *net.DNSError {
return &net.DNSError{
IsTimeout: false,
}
}

getTimeoutErr := func() error {
return &errTimeout{}
}

setUpWithError := func(err error, opts ...ably.ClientOption) (visitedHosts []string) {
client, _ := ably.NewRealtime(append(opts, ably.WithAutoConnect(false), ably.WithKey("fake:key"),
ably.WithDial(func(protocol string, u *url.URL, timeout time.Duration) (ably.Conn, error) {
visitedHosts = append(visitedHosts, u.Hostname())
return nil, err
}))...)
sacOO7 marked this conversation as resolved.
Show resolved Hide resolved
ablytest.Wait(ablytest.ConnWaiter(client, client.Connect, ably.ConnectionEventDisconnected), nil)
return
}

t.Run("RTN17a: First attempt should be on default host first", func(t *testing.T) {
visitedHosts := setUpWithError(fmt.Errorf("host url is wrong"))
sacOO7 marked this conversation as resolved.
Show resolved Hide resolved
expectedHost := "realtime.ably.io"

assert.Equal(t, expectedHost, visitedHosts[0])
sacOO7 marked this conversation as resolved.
Show resolved Hide resolved
})

t.Run("RTN17b: Fallback behaviour", func(t *testing.T) {
t.Parallel()

t.Run("apply when default realtime endpoint is not overridden, port/tlsport not set", func(t *testing.T) {
visitedHosts := setUpWithError(getTimeoutErr())
expectedPrimaryHost := "realtime.ably.io"
expectedFallbackHosts := ably.DefaultFallbackHosts()

assert.Equal(t, 6, len(visitedHosts))
assert.Equal(t, expectedPrimaryHost, visitedHosts[0])
assert.ElementsMatch(t, expectedFallbackHosts, visitedHosts[1:])
})

t.Run("does not apply when the custom realtime endpoint is used", func(t *testing.T) {
visitedHosts := setUpWithError(getTimeoutErr(), ably.WithRealtimeHost("custom-realtime.ably.io"))
expectedHost := "custom-realtime.ably.io"

assert.Equal(t, 1, len(visitedHosts))
sacOO7 marked this conversation as resolved.
Show resolved Hide resolved
assert.Equal(t, expectedHost, visitedHosts[0])
})

t.Run("apply when fallbacks are provided", func(t *testing.T) {
fallbacks := []string{"fallback0", "fallback1", "fallback2"}
visitedHosts := setUpWithError(getTimeoutErr(), ably.WithFallbackHosts(fallbacks))
expectedPrimaryHost := "realtime.ably.io"

assert.Equal(t, 4, len(visitedHosts))
assert.Equal(t, expectedPrimaryHost, visitedHosts[0])
assert.ElementsMatch(t, fallbacks, visitedHosts[1:])
})

t.Run("apply when fallbackHostUseDefault is true, even if env. or host is set", func(t *testing.T) {
visitedHosts := setUpWithError(
getTimeoutErr(),
ably.WithFallbackHostsUseDefault(true),
ably.WithEnvironment("custom"),
ably.WithRealtimeHost("custom-ably.realtime.com"))

expectedPrimaryHost := "custom-ably.realtime.com"
expectedFallbackHosts := ably.DefaultFallbackHosts()

assert.Equal(t, 6, len(visitedHosts))
assert.Equal(t, expectedPrimaryHost, visitedHosts[0])
assert.ElementsMatch(t, expectedFallbackHosts, visitedHosts[1:])
})
})

t.Run("RTN17c: Verifies internet connection is active in case of error necessitating use of an alternative host", func(t *testing.T) {
t.Parallel()
const internetCheckUrl = "https://internet-up.ably-realtime.com/is-the-internet-up.txt"
rec, optn := ablytest.NewHttpRecorder()
visitedHosts := setUpWithError(getDNSErr(), optn...)
assert.Equal(t, 6, len(visitedHosts)) // including primary host
assert.Equal(t, 5, len(rec.Requests()))
for _, request := range rec.Requests() {
assert.Equal(t, request.URL.String(), internetCheckUrl)
}
})
coderabbitai[bot] marked this conversation as resolved.
Show resolved Hide resolved

t.Run("RTN17d: Check for compatible errors before attempting to reconnect to a fallback host", func(t *testing.T) {
visitedHosts := setUpWithError(fmt.Errorf("host url is wrong")) // non-dns or non-timeout error
assert.Equal(t, 1, len(visitedHosts))
visitedHosts = setUpWithError(getDNSErr())
assert.Equal(t, 6, len(visitedHosts))
visitedHosts = setUpWithError(getTimeoutErr())
assert.Equal(t, 6, len(visitedHosts))
})

t.Run("RTN17e: Same fallback host should be used for REST as Realtime Fallback Host for a given active connection", func(t *testing.T) {
errCh := make(chan error, 1)
errCh <- getTimeoutErr()
realtimeMsgRecorder := NewMessageRecorder() // websocket recorder
restMsgRecorder, optn := ablytest.NewHttpRecorder() // http recorder
_, client := ablytest.NewRealtime(ably.WithAutoConnect(false),
ably.WithDial(func(protocol string, u *url.URL, timeout time.Duration) (ably.Conn, error) {
err, ok := <-errCh
if ok {
close(errCh)
return nil, err // return timeout error for primary host
}
return realtimeMsgRecorder.Dial(protocol, u, timeout) // return dial for subsequent dials
}), optn[0])
defer client.Close()

err := ablytest.Wait(ablytest.ConnWaiter(client, client.Connect, ably.ConnectionEventConnected), nil)
if err != nil {
t.Fatalf("Error connecting host with error %v", err)
}
realtimeSuccessHost := realtimeMsgRecorder.URLs()[0].Hostname()
fallbackHosts := ably.GetEnvFallbackHosts("sandbox")
if !ablyutil.SliceContains(fallbackHosts, realtimeSuccessHost) {
t.Fatalf("realtime host must be one of fallback hosts, received %v", realtimeSuccessHost)
}

client.Time(context.Background()) // make a rest request
restSuccessHost := restMsgRecorder.Request(1).URL.Hostname() // second request is to get the time, first for internet connection
assert.Equal(t, realtimeSuccessHost, restSuccessHost)
})
}

func TestRealtime_RTN17_Integration_HostFallback_Internal_Server_Error(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
}))
defer server.Close()
serverURL, err := url.Parse(server.URL)
assert.NoError(t, err)

app, realtime := ablytest.NewRealtime(
ably.WithAutoConnect(false),
ably.WithTLS(false),
ably.WithUseTokenAuth(true),
ably.WithFallbackHosts([]string{"sandbox-a-fallback.ably-realtime.com"}),
ably.WithRealtimeHost(serverURL.Host))

defer safeclose(t, ablytest.FullRealtimeCloser(realtime), app)

err = ablytest.Wait(ablytest.ConnWaiter(realtime, realtime.Connect, ably.ConnectionEventConnected), nil)
assert.Nil(t, err)
sacOO7 marked this conversation as resolved.
Show resolved Hide resolved

assert.Equal(t, "sandbox-a-fallback.ably-realtime.com", realtime.Rest().ActiveRealtimeHost())
}

func TestRealtime_RTN17_Integration_HostFallback_Timeout(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(3 * time.Second)
w.WriteHeader(http.StatusSwitchingProtocols)
}))
defer server.Close()
serverURL, err := url.Parse(server.URL)
assert.NoError(t, err)

app, realtime := ablytest.NewRealtime(
ably.WithAutoConnect(false),
ably.WithTLS(false),
ably.WithUseTokenAuth(true),
ably.WithFallbackHosts([]string{"sandbox-a-fallback.ably-realtime.com"}),
ably.WithRealtimeRequestTimeout(2*time.Second),
sacOO7 marked this conversation as resolved.
Show resolved Hide resolved
ably.WithRealtimeHost(serverURL.Host))

defer safeclose(t, ablytest.FullRealtimeCloser(realtime), app)

err = ablytest.Wait(ablytest.ConnWaiter(realtime, realtime.Connect, ably.ConnectionEventConnected), nil)
assert.Nil(t, err)
sacOO7 marked this conversation as resolved.
Show resolved Hide resolved

assert.Equal(t, "sandbox-a-fallback.ably-realtime.com", realtime.Rest().ActiveRealtimeHost())
}

func checkUnique(ch chan string, typ string, n int) error {
close(ch)
uniq := make(map[string]struct{}, n)
Expand Down
Loading
Loading