From 7d3686594756cf8923363aadeea1f91deb475591 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Tue, 14 Nov 2023 08:49:47 +0530 Subject: [PATCH] Updated failing tests due to locking mechanism --- ably/proto_presence_message_test.go | 101 +++++++++++++++++++++------- ably/realtime_channel.go | 8 +-- ably/realtime_presence.go | 2 +- 3 files changed, 81 insertions(+), 30 deletions(-) diff --git a/ably/proto_presence_message_test.go b/ably/proto_presence_message_test.go index dd3e5761..03ade9aa 100644 --- a/ably/proto_presence_message_test.go +++ b/ably/proto_presence_message_test.go @@ -401,6 +401,21 @@ func Test_PresenceMap_RTP2(t *testing.T) { initialMembers := channel.Presence.GetMembers() assert.Empty(t, initialMembers) + assert.False(t, channel.Presence.SyncInitial()) + assert.True(t, channel.Presence.SyncComplete()) + + in <- &ably.ProtocolMessage{ + Action: ably.ActionAttached, + Flags: ably.FlagHasPresence, + Channel: channel.Name, + } + + ablytest.Instantly.Recv(t, nil, stateChanges, t.Fatalf) + + assert.False(t, channel.Presence.SyncInitial()) + assert.True(t, channel.Presence.SyncInProgress()) + assert.False(t, channel.Presence.SyncComplete()) + presenceMsg1 := &ably.PresenceMessage{ Action: ably.PresenceActionPresent, Message: ably.Message{ @@ -438,20 +453,6 @@ func Test_PresenceMap_RTP2(t *testing.T) { Presence: []*ably.PresenceMessage{presenceMsg1, presenceMsg2, presenceMsg3}, } - assert.False(t, channel.Presence.SyncInitial()) - assert.True(t, channel.Presence.SyncComplete()) - - in <- &ably.ProtocolMessage{ - Action: ably.ActionAttached, - Flags: ably.FlagHasPresence, - Channel: channel.Name, - } - - ablytest.Instantly.Recv(t, nil, stateChanges, t.Fatalf) - - assert.True(t, channel.Presence.SyncInitial()) - assert.False(t, channel.Presence.SyncComplete()) - in <- syncMessage ablytest.Instantly.Recv(t, nil, presenceMsgCh, t.Fatalf) @@ -602,11 +603,6 @@ func Test_PresenceMap_RTP2(t *testing.T) { assert.Equal(t, ably.PresenceActionPresent, pm.Action) } }) - - t.Run("RTP2f: when presence msg with LEAVE action arrives, if sync in progress, store as absent and remove it later", func(t *testing.T) { - - }) - } func Test_Presence_server_initiated_sync_RTP18(t *testing.T) { @@ -947,7 +943,7 @@ func Test_internal_presencemap_RTP17(t *testing.T) { assert.NoError(t, err) channel = c.Channels.Get("test") - stateChanges = make(ably.ChannelStateChanges, 10) + stateChanges = make(ably.ChannelStateChanges, 20) channel.OnAll(stateChanges.Receive) in <- &ably.ProtocolMessage{ @@ -957,7 +953,7 @@ func Test_internal_presencemap_RTP17(t *testing.T) { var change ably.ChannelStateChange - ablytest.Instantly.Recv(t, &change, stateChanges, t.Fatalf) + ablytest.Soon.Recv(t, &change, stateChanges, t.Fatalf) assert.Equal(t, ably.ChannelStateAttached, change.Current, "expected %v; got %v (event: %+v)", ably.ChannelStateAttached, change.Current) @@ -1133,7 +1129,6 @@ func Test_internal_presencemap_RTP17(t *testing.T) { internalMembers = channel.Presence.InternalMembers() assert.Equal(t, 2, len(internalMembers)) - }) t.Run("RTP17h: presencemap should be keyed by clientId", func(t *testing.T) { @@ -1175,11 +1170,67 @@ func Test_internal_presencemap_RTP17(t *testing.T) { } }) - t.Run("RTP17f, RTP17g: automatic re-entry whenever channel moves into ATTACHED state", func(t *testing.T) { + t.Run("RTP17f, RTP17g, RTP17e: automatic re-entry whenever channel moves into ATTACHED state", func(t *testing.T) { + in, _, client, channel, stateChanges, presenceMsgCh := setup(t) - }) + presenceMsg1 := &ably.PresenceMessage{ + Action: ably.PresenceActionPresent, + Message: ably.Message{ + ID: "987:12:0", + Timestamp: 125, + ConnectionID: client.Connection.ID(), + ClientID: "999", + }, + } + + presenceMsg2 := &ably.PresenceMessage{ + Action: ably.PresenceActionPresent, + Message: ably.Message{ + ID: "987:12:1", + Timestamp: 128, + ConnectionID: client.Connection.ID(), + ClientID: "234", + }, + } + + presenceMsg3 := &ably.PresenceMessage{ + Action: ably.PresenceActionPresent, + Message: ably.Message{ + ID: "987:12:2", + Timestamp: 128, + ConnectionID: "3435", + ClientID: "345", + }, + } + + msg := &ably.ProtocolMessage{ + Action: ably.ActionPresence, + Channel: channel.Name, + Presence: []*ably.PresenceMessage{presenceMsg1, presenceMsg2, presenceMsg3}, + } + + in <- msg + ablytest.Instantly.Recv(t, nil, presenceMsgCh, t.Fatalf) + ablytest.Instantly.Recv(t, nil, presenceMsgCh, t.Fatalf) + ablytest.Instantly.Recv(t, nil, presenceMsgCh, t.Fatalf) + + members := channel.Presence.GetMembers() + assert.Equal(t, 3, len(members)) + + internalMembers := channel.Presence.InternalMembers() + assert.Equal(t, 2, len(internalMembers)) + + in <- &ably.ProtocolMessage{ + Action: ably.ActionAttached, + Flags: ably.FlagResumed, + Channel: channel.Name, + } - t.Run("RTP17e: publish error if automatic re-enter failed", func(t *testing.T) { + var chanChange ably.ChannelStateChange + ablytest.Instantly.Recv(t, &chanChange, stateChanges, t.Fatalf) + // Enter from internal map + // ablytest.Soon.Recv(t, nil, out, t.Fatalf) + // ablytest.Instantly.Recv(t, nil, out, t.Fatalf) }) } diff --git a/ably/realtime_channel.go b/ably/realtime_channel.go index dcd8cc8a..513b81fc 100644 --- a/ably/realtime_channel.go +++ b/ably/realtime_channel.go @@ -1003,15 +1003,15 @@ func (c *RealtimeChannel) lockSetAttachResume(state ChannelState) { } } -func (channel *RealtimeChannel) emitErrorUpdate(err *ErrorInfo, resumed bool) { +func (c *RealtimeChannel) emitErrorUpdate(err *ErrorInfo, resumed bool) { change := ChannelStateChange{ - Current: channel.state, - Previous: channel.state, + Current: c.state, + Previous: c.state, Reason: err, Resumed: resumed, Event: ChannelEventUpdate, } - channel.emitter.Emit(change.Event, change) + c.emitter.Emit(change.Event, change) } func (c *RealtimeChannel) lockSetState(state ChannelState, err error, resumed bool) error { diff --git a/ably/realtime_presence.go b/ably/realtime_presence.go index fe567d4a..b90b3985 100644 --- a/ably/realtime_presence.go +++ b/ably/realtime_presence.go @@ -173,7 +173,7 @@ func (pres *RealtimePresence) onAttach(msg *protocolMessage, isAttachWithoutMess pres.queue.Flush() // RTP5b // RTP17f if isAttachWithoutMessageLoss { - pres.enterMembersFromInternalPresenceMap() + go pres.enterMembersFromInternalPresenceMap() } }