Skip to content

Commit

Permalink
Updated failing tests due to locking mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
sacOO7 committed Nov 14, 2023
1 parent f4e5c8a commit 7d36865
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 30 deletions.
101 changes: 76 additions & 25 deletions ably/proto_presence_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,21 @@ func Test_PresenceMap_RTP2(t *testing.T) {
initialMembers := channel.Presence.GetMembers()
assert.Empty(t, initialMembers)

assert.False(t, channel.Presence.SyncInitial())
assert.True(t, channel.Presence.SyncComplete())

in <- &ably.ProtocolMessage{
Action: ably.ActionAttached,
Flags: ably.FlagHasPresence,
Channel: channel.Name,
}

ablytest.Instantly.Recv(t, nil, stateChanges, t.Fatalf)

assert.False(t, channel.Presence.SyncInitial())
assert.True(t, channel.Presence.SyncInProgress())
assert.False(t, channel.Presence.SyncComplete())

presenceMsg1 := &ably.PresenceMessage{
Action: ably.PresenceActionPresent,
Message: ably.Message{
Expand Down Expand Up @@ -438,20 +453,6 @@ func Test_PresenceMap_RTP2(t *testing.T) {
Presence: []*ably.PresenceMessage{presenceMsg1, presenceMsg2, presenceMsg3},
}

assert.False(t, channel.Presence.SyncInitial())
assert.True(t, channel.Presence.SyncComplete())

in <- &ably.ProtocolMessage{
Action: ably.ActionAttached,
Flags: ably.FlagHasPresence,
Channel: channel.Name,
}

ablytest.Instantly.Recv(t, nil, stateChanges, t.Fatalf)

assert.True(t, channel.Presence.SyncInitial())
assert.False(t, channel.Presence.SyncComplete())

in <- syncMessage

ablytest.Instantly.Recv(t, nil, presenceMsgCh, t.Fatalf)
Expand Down Expand Up @@ -602,11 +603,6 @@ func Test_PresenceMap_RTP2(t *testing.T) {
assert.Equal(t, ably.PresenceActionPresent, pm.Action)
}
})

t.Run("RTP2f: when presence msg with LEAVE action arrives, if sync in progress, store as absent and remove it later", func(t *testing.T) {

})

}

func Test_Presence_server_initiated_sync_RTP18(t *testing.T) {
Expand Down Expand Up @@ -947,7 +943,7 @@ func Test_internal_presencemap_RTP17(t *testing.T) {
assert.NoError(t, err)

channel = c.Channels.Get("test")
stateChanges = make(ably.ChannelStateChanges, 10)
stateChanges = make(ably.ChannelStateChanges, 20)
channel.OnAll(stateChanges.Receive)

in <- &ably.ProtocolMessage{
Expand All @@ -957,7 +953,7 @@ func Test_internal_presencemap_RTP17(t *testing.T) {

var change ably.ChannelStateChange

ablytest.Instantly.Recv(t, &change, stateChanges, t.Fatalf)
ablytest.Soon.Recv(t, &change, stateChanges, t.Fatalf)
assert.Equal(t, ably.ChannelStateAttached, change.Current,
"expected %v; got %v (event: %+v)", ably.ChannelStateAttached, change.Current)

Expand Down Expand Up @@ -1133,7 +1129,6 @@ func Test_internal_presencemap_RTP17(t *testing.T) {

internalMembers = channel.Presence.InternalMembers()
assert.Equal(t, 2, len(internalMembers))

})

t.Run("RTP17h: presencemap should be keyed by clientId", func(t *testing.T) {
Expand Down Expand Up @@ -1175,11 +1170,67 @@ func Test_internal_presencemap_RTP17(t *testing.T) {
}
})

t.Run("RTP17f, RTP17g: automatic re-entry whenever channel moves into ATTACHED state", func(t *testing.T) {
t.Run("RTP17f, RTP17g, RTP17e: automatic re-entry whenever channel moves into ATTACHED state", func(t *testing.T) {
in, _, client, channel, stateChanges, presenceMsgCh := setup(t)

})
presenceMsg1 := &ably.PresenceMessage{
Action: ably.PresenceActionPresent,
Message: ably.Message{
ID: "987:12:0",
Timestamp: 125,
ConnectionID: client.Connection.ID(),
ClientID: "999",
},
}

presenceMsg2 := &ably.PresenceMessage{
Action: ably.PresenceActionPresent,
Message: ably.Message{
ID: "987:12:1",
Timestamp: 128,
ConnectionID: client.Connection.ID(),
ClientID: "234",
},
}

presenceMsg3 := &ably.PresenceMessage{
Action: ably.PresenceActionPresent,
Message: ably.Message{
ID: "987:12:2",
Timestamp: 128,
ConnectionID: "3435",
ClientID: "345",
},
}

msg := &ably.ProtocolMessage{
Action: ably.ActionPresence,
Channel: channel.Name,
Presence: []*ably.PresenceMessage{presenceMsg1, presenceMsg2, presenceMsg3},
}

in <- msg
ablytest.Instantly.Recv(t, nil, presenceMsgCh, t.Fatalf)
ablytest.Instantly.Recv(t, nil, presenceMsgCh, t.Fatalf)
ablytest.Instantly.Recv(t, nil, presenceMsgCh, t.Fatalf)

members := channel.Presence.GetMembers()
assert.Equal(t, 3, len(members))

internalMembers := channel.Presence.InternalMembers()
assert.Equal(t, 2, len(internalMembers))

in <- &ably.ProtocolMessage{
Action: ably.ActionAttached,
Flags: ably.FlagResumed,
Channel: channel.Name,
}

t.Run("RTP17e: publish error if automatic re-enter failed", func(t *testing.T) {
var chanChange ably.ChannelStateChange
ablytest.Instantly.Recv(t, &chanChange, stateChanges, t.Fatalf)

// Enter from internal map
// ablytest.Soon.Recv(t, nil, out, t.Fatalf)
// ablytest.Instantly.Recv(t, nil, out, t.Fatalf)
})
}
8 changes: 4 additions & 4 deletions ably/realtime_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -1003,15 +1003,15 @@ func (c *RealtimeChannel) lockSetAttachResume(state ChannelState) {
}
}

func (channel *RealtimeChannel) emitErrorUpdate(err *ErrorInfo, resumed bool) {
func (c *RealtimeChannel) emitErrorUpdate(err *ErrorInfo, resumed bool) {
change := ChannelStateChange{
Current: channel.state,
Previous: channel.state,
Current: c.state,
Previous: c.state,
Reason: err,
Resumed: resumed,
Event: ChannelEventUpdate,
}
channel.emitter.Emit(change.Event, change)
c.emitter.Emit(change.Event, change)
}

func (c *RealtimeChannel) lockSetState(state ChannelState, err error, resumed bool) error {
Expand Down
2 changes: 1 addition & 1 deletion ably/realtime_presence.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (pres *RealtimePresence) onAttach(msg *protocolMessage, isAttachWithoutMess
pres.queue.Flush() // RTP5b
// RTP17f
if isAttachWithoutMessageLoss {
pres.enterMembersFromInternalPresenceMap()
go pres.enterMembersFromInternalPresenceMap()
}
}

Expand Down

0 comments on commit 7d36865

Please sign in to comment.