From f81a67450406885f31b10b9b362ad2b5e494ca47 Mon Sep 17 00:00:00 2001 From: Yutaka Takeda Date: Thu, 10 Nov 2022 23:43:50 -0800 Subject: [PATCH] Fix race and lint errors Relates to #239 --- association.go | 19 ++++++++++--------- stream.go | 30 +++++++++++++++++------------- 2 files changed, 27 insertions(+), 22 deletions(-) diff --git a/association.go b/association.go index ae3827bb..bf8f21d1 100644 --- a/association.go +++ b/association.go @@ -2090,20 +2090,14 @@ func (a *Association) popPendingDataChunksToSend() ([]*chunkPayloadData, []uint1 dataLen := uint32(len(c.userData)) if dataLen == 0 { sisToReset = append(sisToReset, c.streamIdentifier) - err := a.pendingQueue.pop(c) - if err != nil { - a.log.Errorf("failed to pop from pending queue: %s", err.Error()) - } + a.popPendingDataChunksToDrop(c) continue } s, ok := a.streams[c.streamIdentifier] - if !ok || s.state > StreamStateOpen || s.version != c.streamVersion { - err := a.pendingQueue.pop(c) - if err != nil { - a.log.Errorf("failed to pop from pending queue: %s", err.Error()) - } + if !ok || s.State() > StreamStateOpen || s.version != c.streamVersion { + a.popPendingDataChunksToDrop(c) continue } @@ -2135,6 +2129,13 @@ func (a *Association) popPendingDataChunksToSend() ([]*chunkPayloadData, []uint1 return chunks, sisToReset } +func (a *Association) popPendingDataChunksToDrop(c *chunkPayloadData) { + err := a.pendingQueue.pop(c) + if err != nil { + a.log.Errorf("failed to pop from pending queue: %s", err.Error()) + } +} + // bundleDataChunksIntoPackets packs DATA chunks into packets. It tries to bundle // DATA chunks into a packet so long as the resulting packet size does not exceed // the path MTU. diff --git a/stream.go b/stream.go index 84e8cdc3..c1fbece3 100644 --- a/stream.go +++ b/stream.go @@ -24,7 +24,7 @@ const ( // StreamState is an enum for SCTP Stream state field // This field identifies the state of stream. -type StreamState int +type StreamState int32 // StreamState enums const ( @@ -340,17 +340,18 @@ func (s *Stream) Close() error { s.lock.Lock() defer s.lock.Unlock() - s.log.Debugf("[%s] Close: state=%s", s.name, s.state.String()) + state := s.State() + s.log.Debugf("[%s] Close: state=%s", s.name, state.String()) - switch s.state { + switch state { case StreamStateOpen: - s.state = StreamStateClosed + s.SetState(StreamStateClosed) s.log.Debugf("[%s] state change: open => closed", s.name) s.readErr = io.EOF s.readNotifier.Broadcast() return s.streamIdentifier, true case StreamStateClosing: - s.state = StreamStateClosed + s.SetState(StreamStateClosed) s.log.Debugf("[%s] state change: closing => closed", s.name) return s.streamIdentifier, true case StreamStateClosed: @@ -441,7 +442,8 @@ func (s *Stream) onInboundStreamReset() { s.lock.Lock() defer s.lock.Unlock() - s.log.Debugf("[%s] onInboundStreamReset: state=%s", s.name, s.state.String()) + state := s.State() + s.log.Debugf("[%s] onInboundStreamReset: state=%s", s.name, state.String()) // No more inbound data to read. Unblock the read with io.EOF. // This should cause DCEP layer (datachannel package) to call Close() which @@ -452,19 +454,21 @@ func (s *Stream) onInboundStreamReset() { // outgoing stream. When the peer sees that an incoming stream was // reset, it also resets its corresponding outgoing stream. Once this // is completed, the data channel is closed. - if s.state == StreamStateOpen { + if state == StreamStateOpen { s.log.Debugf("[%s] state change: open => closing", s.name) - s.state = StreamStateClosing + s.SetState(StreamStateClosing) } s.readErr = io.EOF s.readNotifier.Broadcast() - } -// State return the stream state. +// State atomically returns the stream state. func (s *Stream) State() StreamState { - s.lock.RLock() - defer s.lock.RUnlock() - return s.state + return StreamState(atomic.LoadInt32((*int32)(&s.state))) +} + +// SetState atomically sets the stream state. +func (s *Stream) SetState(newState StreamState) { + atomic.StoreInt32((*int32)(&s.state), int32(newState)) }