From c1c01c818f41b9eb363f5531f14e00e95e21dcc0 Mon Sep 17 00:00:00 2001 From: sukun Date: Thu, 21 Sep 2023 22:07:42 +0530 Subject: [PATCH] webrtc: fix flaky TestReadWriteDeadlines/WebRTC/SetDeadline/Write --- p2p/transport/webrtc/stream.go | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/p2p/transport/webrtc/stream.go b/p2p/transport/webrtc/stream.go index 7e873f5634..f777f5b8fd 100644 --- a/p2p/transport/webrtc/stream.go +++ b/p2p/transport/webrtc/stream.go @@ -123,7 +123,23 @@ func newStream( } } - s.maybeDeclareStreamDone() + if s.isDone() { + // onDone removes the stream from the connection and requires the connection lock. + // This callback(onBufferedAmountLow) is executing in the sctp readLoop goroutine. + // If the connection is closed concurrently, the closing goroutine will acquire + // the connection lock and wait for sctp readLoop to exit, the sctp readLoop will + // wait for the connection lock before exiting, causing a deadlock. + // Run this in a different goroutine to avoid the deadlock. + go func() { + s.mx.Lock() + defer s.mx.Unlock() + // TODO: we should be closing the underlying datachannel, but this resets the stream + // See https://github.com/libp2p/specs/issues/575 for details. + // _ = s.dataChannel.Close() + // TODO: write for the spawned reader to return + s.onDone() + }() + } select { case s.writeAvailable <- struct{}{}: @@ -188,9 +204,7 @@ func (s *stream) processIncomingFlag(flag *pb.Message_Flag) { // this is used to force reset a stream func (s *stream) maybeDeclareStreamDone() { - if (s.sendState == sendStateReset || s.sendState == sendStateDataSent) && - (s.receiveState == receiveStateReset || s.receiveState == receiveStateDataRead) && - len(s.controlMsgQueue) == 0 { + if s.isDone() { _ = s.SetReadDeadline(time.Now().Add(-1 * time.Hour)) // pion ignores zero times // TODO: we should be closing the underlying datachannel, but this resets the stream // See https://github.com/libp2p/specs/issues/575 for details. @@ -200,6 +214,12 @@ func (s *stream) maybeDeclareStreamDone() { } } +func (s *stream) isDone() bool { + return (s.sendState == sendStateReset || s.sendState == sendStateDataSent) && + (s.receiveState == receiveStateReset || s.receiveState == receiveStateDataRead) && + len(s.controlMsgQueue) == 0 +} + func (s *stream) setCloseError(e error) { s.mx.Lock() defer s.mx.Unlock()