From 075982d496ee9e6d6114bbd0598fcea74c501243 Mon Sep 17 00:00:00 2001 From: Simon Esposito Date: Wed, 13 Aug 2025 13:28:57 +0100 Subject: [PATCH 1/5] Gracefully handle socket closure. --- server/session_ws.go | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/server/session_ws.go b/server/session_ws.go index 525cc742f..c37c46f30 100644 --- a/server/session_ws.go +++ b/server/session_ws.go @@ -188,6 +188,8 @@ func (s *sessionWS) Consume() { s.maybeResetPingTimer() return nil }) + // Disable the close handler so that the server can handle the close message itself. + s.conn.SetCloseHandler(func(code int, text string) error { return nil }) // Start a routine to process outbound messages. go s.processOutgoing() @@ -210,6 +212,7 @@ IncomingLoop: } break } + if messageType != s.wsMessageType { // Expected text but received binary, or expected binary but received text. // Disconnect client if it attempts to use this kind of mixed protocol mode. @@ -514,12 +517,31 @@ func (s *sessionWS) Close(msg string, reason runtime.PresenceReason, envelopes . // This may not be possible if the socket was already fully closed by an error. s.logger.Debug("Could not send close message", zap.Error(err)) } + if msg != "" { + // Server initiated close, await for client close response. + if err := s.conn.SetReadDeadline(time.Now().Add(s.pongWaitDuration)); err != nil { + s.logger.Warn("Failed to set read deadline", zap.Error(err)) + } else { + for { + msgType, _, readErr := s.conn.ReadMessage() + if readErr != nil { + // If any error occurs, close message likely won't be delivered, just close the socket. + break + } + if msgType == websocket.CloseMessage { + // We only care about the close message at this point, if the server initiated a close then something went wrong. + break + } + } + } + } + // Close WebSocket. if err := s.conn.Close(); err != nil { s.logger.Debug("Could not close", zap.Error(err)) } - s.logger.Info("Closed client connection") + s.logger.Debug("Closed client connection") // Fire an event for session end. if fn := s.runtime.EventSessionEnd(); fn != nil { From 8964d899b427c713621c12b5fcbe94b688c724ae Mon Sep 17 00:00:00 2001 From: Simon Esposito Date: Wed, 20 Aug 2025 16:59:19 +0100 Subject: [PATCH 2/5] Improve socket disconnect debug logging. --- server/session_ws.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/server/session_ws.go b/server/session_ws.go index c37c46f30..7e76bd54e 100644 --- a/server/session_ws.go +++ b/server/session_ws.go @@ -541,7 +541,11 @@ func (s *sessionWS) Close(msg string, reason runtime.PresenceReason, envelopes . s.logger.Debug("Could not close", zap.Error(err)) } - s.logger.Debug("Closed client connection") + if msg != "" { + s.logger.Debug("Closed client connection", zap.String("reason", msg)) + } else { + s.logger.Debug("Closed client connection") + } // Fire an event for session end. if fn := s.runtime.EventSessionEnd(); fn != nil { From 7e6fa575fcc6598a280b9c12a51ffc9c66c172fb Mon Sep 17 00:00:00 2001 From: Simon Esposito Date: Wed, 20 Aug 2025 17:41:06 +0100 Subject: [PATCH 3/5] Add timeout to await for close message. --- server/session_ws.go | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/server/session_ws.go b/server/session_ws.go index 7e76bd54e..705eebf2b 100644 --- a/server/session_ws.go +++ b/server/session_ws.go @@ -519,19 +519,23 @@ func (s *sessionWS) Close(msg string, reason runtime.PresenceReason, envelopes . } if msg != "" { // Server initiated close, await for client close response. - if err := s.conn.SetReadDeadline(time.Now().Add(s.pongWaitDuration)); err != nil { - s.logger.Warn("Failed to set read deadline", zap.Error(err)) - } else { - for { - msgType, _, readErr := s.conn.ReadMessage() - if readErr != nil { - // If any error occurs, close message likely won't be delivered, just close the socket. - break - } - if msgType == websocket.CloseMessage { - // We only care about the close message at this point, if the server initiated a close then something went wrong. - break - } + t := time.NewTimer(10 * time.Second) + defer t.Stop() + closeMessageWait: + for { + msgType, _, readErr := s.conn.ReadMessage() + if readErr != nil || msgType == websocket.CloseMessage { + // The server initiated close, so something went wrong. Thus, we only care about the close message at this point. + // If any error occurs, close message likely won't be delivered, just close the socket. + break + } + + select { + case <-t.C: + // If the client doesn't respond within 10 seconds, close the connection anyway. + break closeMessageWait + default: + // Otherwise, continue reading messages until we get a close message or timeout. } } } From b82a816691c744ac9ba5e9f8bfb77563e4a76d3a Mon Sep 17 00:00:00 2001 From: Simon Esposito Date: Tue, 26 Aug 2025 20:07:07 +0100 Subject: [PATCH 4/5] Rework logic --- server/session_ws.go | 47 +++++++++++++++++++++++--------------------- 1 file changed, 25 insertions(+), 22 deletions(-) diff --git a/server/session_ws.go b/server/session_ws.go index 705eebf2b..8b42bafa3 100644 --- a/server/session_ws.go +++ b/server/session_ws.go @@ -67,6 +67,8 @@ type sessionWS struct { runtime *Runtime stopped bool + closeSent *atomic.Bool + closeWaitCh chan struct{} conn *websocket.Conn receivedMessageCounter int pingTimer *time.Timer @@ -188,8 +190,18 @@ func (s *sessionWS) Consume() { s.maybeResetPingTimer() return nil }) + s.closeSent = atomic.NewBool(false) + s.closeWaitCh = make(chan struct{}, 1) // Disable the close handler so that the server can handle the close message itself. - s.conn.SetCloseHandler(func(code int, text string) error { return nil }) + s.conn.SetCloseHandler(func(code int, text string) error { + if !s.closeSent.Load() { + message := websocket.FormatCloseMessage(code, "") + _ = s.conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(s.pongWaitDuration)) + } else { + s.closeWaitCh <- struct{}{} + } + return nil + }) // Start a routine to process outbound messages. go s.processOutgoing() @@ -512,30 +524,21 @@ func (s *sessionWS) Close(msg string, reason runtime.PresenceReason, envelopes . s.Unlock() } - // Send close message. - if err := s.conn.WriteControl(websocket.CloseMessage, []byte{}, time.Now().Add(s.writeWaitDuration)); err != nil { - // This may not be possible if the socket was already fully closed by an error. - s.logger.Debug("Could not send close message", zap.Error(err)) - } if msg != "" { - // Server initiated close, await for client close response. - t := time.NewTimer(10 * time.Second) - defer t.Stop() - closeMessageWait: - for { - msgType, _, readErr := s.conn.ReadMessage() - if readErr != nil || msgType == websocket.CloseMessage { - // The server initiated close, so something went wrong. Thus, we only care about the close message at this point. - // If any error occurs, close message likely won't be delivered, just close the socket. - break - } - + // Server initiated close, attempt to send a close control message. + reasonMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, msg) + if err := s.conn.WriteControl(websocket.CloseMessage, reasonMsg, time.Now().Add(s.writeWaitDuration)); err != nil { + // This may not be possible if the socket was already fully closed by an error. + s.logger.Debug("Could not send close message", zap.Error(err)) + } else { + s.closeSent.Store(true) + t := time.NewTimer(10 * time.Second) + defer t.Stop() select { + case <-s.closeWaitCh: + s.logger.Debug("socket close ack received") case <-t.C: - // If the client doesn't respond within 10 seconds, close the connection anyway. - break closeMessageWait - default: - // Otherwise, continue reading messages until we get a close message or timeout. + s.logger.Debug("socket close ack not received within 10 seconds") } } } From 9bf3fd6ba3803a7e12e6f4f00bf92ac32d9e456c Mon Sep 17 00:00:00 2001 From: Andrei Mihu Date: Wed, 27 Aug 2025 10:52:17 +0100 Subject: [PATCH 5/5] Use compare and swap. --- server/session_ws.go | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/server/session_ws.go b/server/session_ws.go index 8b42bafa3..10f4a8040 100644 --- a/server/session_ws.go +++ b/server/session_ws.go @@ -67,7 +67,7 @@ type sessionWS struct { runtime *Runtime stopped bool - closeSent *atomic.Bool + closeSentCAS *atomic.Uint32 closeWaitCh chan struct{} conn *websocket.Conn receivedMessageCounter int @@ -122,6 +122,8 @@ func NewSessionWS(logger *zap.Logger, config Config, format SessionFormat, sessi runtime: runtime, stopped: false, + closeSentCAS: atomic.NewUint32(0), + closeWaitCh: make(chan struct{}), conn: conn, receivedMessageCounter: config.GetSocket().PingBackoffThreshold, pingTimer: time.NewTimer(time.Duration(config.GetSocket().PingPeriodMs) * time.Millisecond), @@ -190,15 +192,12 @@ func (s *sessionWS) Consume() { s.maybeResetPingTimer() return nil }) - s.closeSent = atomic.NewBool(false) - s.closeWaitCh = make(chan struct{}, 1) - // Disable the close handler so that the server can handle the close message itself. s.conn.SetCloseHandler(func(code int, text string) error { - if !s.closeSent.Load() { + if s.closeSentCAS.CompareAndSwap(0, 1) { message := websocket.FormatCloseMessage(code, "") _ = s.conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(s.pongWaitDuration)) } else { - s.closeWaitCh <- struct{}{} + close(s.closeWaitCh) } return nil }) @@ -525,20 +524,21 @@ func (s *sessionWS) Close(msg string, reason runtime.PresenceReason, envelopes . } if msg != "" { - // Server initiated close, attempt to send a close control message. - reasonMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, msg) - if err := s.conn.WriteControl(websocket.CloseMessage, reasonMsg, time.Now().Add(s.writeWaitDuration)); err != nil { - // This may not be possible if the socket was already fully closed by an error. - s.logger.Debug("Could not send close message", zap.Error(err)) - } else { - s.closeSent.Store(true) - t := time.NewTimer(10 * time.Second) - defer t.Stop() - select { - case <-s.closeWaitCh: - s.logger.Debug("socket close ack received") - case <-t.C: - s.logger.Debug("socket close ack not received within 10 seconds") + if s.closeSentCAS.CompareAndSwap(0, 1) { + // Server initiated close, attempt to send a close control message. + reasonMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, msg) + if err := s.conn.WriteControl(websocket.CloseMessage, reasonMsg, time.Now().Add(s.writeWaitDuration)); err != nil { + // This may not be possible if the socket was already fully closed by an error. + s.logger.Debug("Could not send close message", zap.Error(err)) + } else { + t := time.NewTimer(10 * time.Second) + defer t.Stop() + select { + case <-s.closeWaitCh: + s.logger.Debug("socket close ack received") + case <-t.C: + s.logger.Debug("socket close ack not received within 10 seconds") + } } } }