Skip to content

Commit

Permalink
Added tests for server initiated sync
Browse files Browse the repository at this point in the history
  • Loading branch information
sacOO7 committed Nov 12, 2023
1 parent 8919bed commit 2c088ac
Show file tree
Hide file tree
Showing 2 changed files with 250 additions and 0 deletions.
12 changes: 12 additions & 0 deletions ably/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,18 @@ func (c *RealtimePresence) GetMembers() map[string]*PresenceMessage {
return c.members
}

func (c *RealtimePresence) SyncInitial() bool {
c.mtx.Lock()
defer c.mtx.Unlock()
return c.syncState == syncInitial
}

func (c *RealtimePresence) SyncInProgress() bool {
c.mtx.Lock()
defer c.mtx.Unlock()
return c.syncState == syncInProgress
}

func (c *Connection) ConnectionStateTTL() time.Duration {
return c.connectionStateTTL()
}
Expand Down
238 changes: 238 additions & 0 deletions ably/proto_presence_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ func Test_PresenceMap_RTP2(t *testing.T) {
Presence: []*ably.PresenceMessage{presenceMsg1, presenceMsg2, presenceMsg3},
}

assert.False(t, channel.Presence.SyncInitial())
assert.True(t, channel.Presence.SyncComplete())

in <- &ably.ProtocolMessage{
Expand All @@ -447,6 +448,8 @@ func Test_PresenceMap_RTP2(t *testing.T) {
}

ablytest.Instantly.Recv(t, nil, stateChanges, t.Fatalf)

assert.True(t, channel.Presence.SyncInitial())
assert.False(t, channel.Presence.SyncComplete())

in <- syncMessage
Expand All @@ -455,6 +458,7 @@ func Test_PresenceMap_RTP2(t *testing.T) {
ablytest.Instantly.Recv(t, nil, presenceMsgCh, t.Fatalf)

assert.True(t, channel.Presence.SyncComplete())
assert.False(t, channel.Presence.SyncInitial())
presenceMembers := channel.Presence.GetMembers()
assert.Equal(t, 1, len(presenceMembers))
})
Expand Down Expand Up @@ -606,16 +610,250 @@ func Test_PresenceMap_RTP2(t *testing.T) {
}

func Test_Presence_server_initiated_sync_RTP18(t *testing.T) {

const channelRetryTimeout = 123 * time.Millisecond
const realtimeRequestTimeout = 2 * time.Second

setup := func(t *testing.T) (
in, out chan *ably.ProtocolMessage,
c *ably.Realtime,
channel *ably.RealtimeChannel,
stateChanges ably.ChannelStateChanges,
afterCalls chan ablytest.AfterCall,
presenceMsgCh chan *ably.PresenceMessage,
) {
in = make(chan *ably.ProtocolMessage, 1)
out = make(chan *ably.ProtocolMessage, 16)
presenceMsgCh = make(chan *ably.PresenceMessage, 16)

afterCalls = make(chan ablytest.AfterCall, 1)
now, after := ablytest.TimeFuncs(afterCalls)

c, _ = ably.NewRealtime(
ably.WithToken("fake:token"),
ably.WithAutoConnect(false),
ably.WithChannelRetryTimeout(channelRetryTimeout),
ably.WithRealtimeRequestTimeout(realtimeRequestTimeout),
ably.WithDial(MessagePipe(in, out)),
ably.WithNow(now),
ably.WithAfter(after),
)

in <- &ably.ProtocolMessage{
Action: ably.ActionConnected,
ConnectionID: "connection-id",
ConnectionDetails: &ably.ConnectionDetails{},
}

err := ablytest.Wait(ablytest.ConnWaiter(c, c.Connect, ably.ConnectionEventConnected), nil)
assert.NoError(t, err)

channel = c.Channels.Get("test")
stateChanges = make(ably.ChannelStateChanges, 10)
channel.OnAll(stateChanges.Receive)

in <- &ably.ProtocolMessage{
Action: ably.ActionAttached,
Channel: channel.Name,
}

var change ably.ChannelStateChange

ablytest.Instantly.Recv(t, &change, stateChanges, t.Fatalf)
assert.Equal(t, ably.ChannelStateAttached, change.Current,
"expected %v; got %v (event: %+v)", ably.ChannelStateAttached, change.Current)

channel.Presence.SubscribeAll(context.Background(), func(message *ably.PresenceMessage) {
presenceMsgCh <- message
})
return
}

t.Run("RTP18a: client determines a new sync started with <sync sequence id>:<cursor value>", func(t *testing.T) {
in, _, _, channel, _, _, presenceMsgCh := setup(t)

initialMembers := channel.Presence.GetMembers()
assert.Empty(t, initialMembers)

presenceMsg1 := &ably.PresenceMessage{
Action: ably.PresenceActionPresent,
Message: ably.Message{
ID: "987:12:5",
Timestamp: 125,
ConnectionID: "987",
ClientID: "999",
},
}

presenceMsg2 := &ably.PresenceMessage{
Action: ably.PresenceActionPresent,
Message: ably.Message{
ID: "987:12:0",
Timestamp: 128,
ConnectionID: "987",
ClientID: "999",
},
}

presenceMsg3 := &ably.PresenceMessage{
Action: ably.PresenceActionPresent,
Message: ably.Message{
ID: "987:12:7",
Timestamp: 128,
ConnectionID: "987",
ClientID: "999",
},
}

syncMessage := &ably.ProtocolMessage{
Action: ably.ActionSync,
Channel: channel.Name,
ChannelSerial: "abcdefg:12",
Presence: []*ably.PresenceMessage{presenceMsg1, presenceMsg2, presenceMsg3},
}

assert.False(t, channel.Presence.SyncInitial())
assert.False(t, channel.Presence.SyncInProgress())
assert.True(t, channel.Presence.SyncComplete())

in <- syncMessage

ablytest.Instantly.Recv(t, nil, presenceMsgCh, t.Fatalf)
ablytest.Instantly.Recv(t, nil, presenceMsgCh, t.Fatalf)

assert.False(t, channel.Presence.SyncInitial())
assert.True(t, channel.Presence.SyncInProgress())
assert.False(t, channel.Presence.SyncComplete())

presenceMembers := channel.Presence.GetMembers()
assert.Equal(t, 1, len(presenceMembers))
})

t.Run("RTP18b: client determines sync ended with <sync sequence id>:", func(t *testing.T) {
in, _, _, channel, _, _, presenceMsgCh := setup(t)

initialMembers := channel.Presence.GetMembers()
assert.Empty(t, initialMembers)

presenceMsg1 := &ably.PresenceMessage{
Action: ably.PresenceActionPresent,
Message: ably.Message{
ID: "987:12:5",
Timestamp: 125,
ConnectionID: "987",
ClientID: "999",
},
}

syncMessage := &ably.ProtocolMessage{
Action: ably.ActionSync,
Channel: channel.Name,
ChannelSerial: "abcdefg:12",
Presence: []*ably.PresenceMessage{presenceMsg1},
}

assert.False(t, channel.Presence.SyncInitial())
assert.False(t, channel.Presence.SyncInProgress())
assert.True(t, channel.Presence.SyncComplete())

in <- syncMessage

ablytest.Instantly.Recv(t, nil, presenceMsgCh, t.Fatalf)

assert.False(t, channel.Presence.SyncInitial())
assert.True(t, channel.Presence.SyncInProgress())
assert.False(t, channel.Presence.SyncComplete())

presenceMembers := channel.Presence.GetMembers()
assert.Equal(t, 1, len(presenceMembers))

presenceMsg2 := &ably.PresenceMessage{
Action: ably.PresenceActionPresent,
Message: ably.Message{
ID: "987:12:12",
Timestamp: 128,
ConnectionID: "980",
ClientID: "999",
},
}

// RTP18b
syncMessage = &ably.ProtocolMessage{
Action: ably.ActionSync,
Channel: channel.Name,
ChannelSerial: "abcdefg:",
Presence: []*ably.PresenceMessage{presenceMsg2},
}
in <- syncMessage

ablytest.Instantly.Recv(t, nil, presenceMsgCh, t.Fatalf)

assert.False(t, channel.Presence.SyncInitial())
assert.False(t, channel.Presence.SyncInProgress())
assert.True(t, channel.Presence.SyncComplete())

presenceMembers = channel.Presence.GetMembers()
assert.Equal(t, 2, len(presenceMembers))
})

t.Run("RTP18: client determines sync started and ended with <sync sequence id>:", func(t *testing.T) {
in, _, _, channel, _, _, presenceMsgCh := setup(t)

initialMembers := channel.Presence.GetMembers()
assert.Empty(t, initialMembers)

presenceMsg1 := &ably.PresenceMessage{
Action: ably.PresenceActionPresent,
Message: ably.Message{
ID: "987:12:5",
Timestamp: 125,
ConnectionID: "987",
ClientID: "999",
},
}

presenceMsg2 := &ably.PresenceMessage{
Action: ably.PresenceActionPresent,
Message: ably.Message{
ID: "987:12:0",
Timestamp: 128,
ConnectionID: "987",
ClientID: "999",
},
}

presenceMsg3 := &ably.PresenceMessage{
Action: ably.PresenceActionPresent,
Message: ably.Message{
ID: "987:12:7",
Timestamp: 128,
ConnectionID: "987",
ClientID: "999",
},
}

syncMessage := &ably.ProtocolMessage{
Action: ably.ActionSync,
Channel: channel.Name,
ChannelSerial: "abcdefg:",
Presence: []*ably.PresenceMessage{presenceMsg1, presenceMsg2, presenceMsg3},
}

assert.False(t, channel.Presence.SyncInitial())
assert.False(t, channel.Presence.SyncInProgress())
assert.True(t, channel.Presence.SyncComplete())

in <- syncMessage

ablytest.Instantly.Recv(t, nil, presenceMsgCh, t.Fatalf)
ablytest.Instantly.Recv(t, nil, presenceMsgCh, t.Fatalf)

assert.False(t, channel.Presence.SyncInitial())
assert.False(t, channel.Presence.SyncInProgress())
assert.True(t, channel.Presence.SyncComplete())

presenceMembers := channel.Presence.GetMembers()
assert.Equal(t, 1, len(presenceMembers))
})
}

Expand Down

0 comments on commit 2c088ac

Please sign in to comment.