Skip to content

Commit 04d5cd0

Browse files
authored
Fix nack locking buffer writes (#261)
1 parent aa741aa commit 04d5cd0

File tree

5 files changed

+42
-32
lines changed

5 files changed

+42
-32
lines changed

pkg/buffer.go

+13-27
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package sfu
22

33
import (
44
"sync"
5+
"sync/atomic"
56
"time"
67

78
log "github.com/pion/ion-log"
@@ -99,8 +100,6 @@ func NewBuffer(track *webrtc.Track, o BufferOptions) *Buffer {
99100

100101
// push adds a RTP Packet, out of order, new packet may be arrived later
101102
func (b *Buffer) push(p *rtp.Packet) {
102-
b.mu.Lock()
103-
defer b.mu.Unlock()
104103
b.lastPacketTime = time.Now().UnixNano()
105104
b.totalByte += uint64(p.MarshalSize())
106105
if b.packetCount == 0 {
@@ -186,8 +185,11 @@ func (b *Buffer) buildReceptionReport() rtcp.ReceptionReport {
186185
fracLost = uint8((lostInterval << 8) / expectedInterval)
187186
}
188187
var dlsr uint32
189-
if b.lastSRRecv != 0 {
190-
delayMS := uint32((time.Now().UnixNano() - b.lastSRRecv) / 1e6)
188+
lastSRRecv := atomic.LoadInt64(&b.lastSRRecv)
189+
lastSRNTPTime := atomic.LoadUint64(&b.lastSRNTPTime)
190+
191+
if lastSRRecv != 0 {
192+
delayMS := uint32((time.Now().UnixNano() - lastSRRecv) / 1e6)
191193
dlsr = (delayMS / 1e3) << 16
192194
dlsr |= (delayMS % 1e3) * 65536 / 1000
193195
}
@@ -198,18 +200,16 @@ func (b *Buffer) buildReceptionReport() rtcp.ReceptionReport {
198200
TotalLost: lost,
199201
LastSequenceNumber: extMaxSeq,
200202
Jitter: uint32(b.jitter),
201-
LastSenderReport: uint32(b.lastSRNTPTime >> 16),
203+
LastSenderReport: uint32(lastSRNTPTime >> 16),
202204
Delay: dlsr,
203205
}
204206
return rr
205207
}
206208

207209
func (b *Buffer) setSenderReportData(rtpTime uint32, ntpTime uint64) {
208-
b.mu.Lock()
209-
defer b.mu.Unlock()
210-
b.lastSRRTPTime = rtpTime
211-
b.lastSRNTPTime = ntpTime
212-
b.lastSRRecv = time.Now().UnixNano()
210+
atomic.StoreUint32(&b.lastSRRTPTime, rtpTime)
211+
atomic.StoreUint64(&b.lastSRNTPTime, ntpTime)
212+
atomic.StoreInt64(&b.lastSRRecv, time.Now().UnixNano())
213213
}
214214

215215
func (b *Buffer) getRTCP() []rtcp.Packet {
@@ -226,25 +226,11 @@ func (b *Buffer) getRTCP() []rtcp.Packet {
226226
return pkts
227227
}
228228

229-
// WritePacket write buffer packet to requested track. and modify headers
230-
func (b *Buffer) WritePacket(sn uint16, track *webrtc.Track, snOffset uint16, tsOffset, ssrc uint32) error {
231-
b.mu.RLock()
232-
defer b.mu.RUnlock()
229+
func (b *Buffer) getPacket(sn uint16) (rtp.Header, []byte, error) {
233230
if bufferPkt := b.pktQueue.GetPacket(sn); bufferPkt != nil {
234-
bSsrc := bufferPkt.SSRC
235-
bPT := bufferPkt.PayloadType
236-
bufferPkt.PayloadType = track.PayloadType()
237-
bufferPkt.SequenceNumber -= snOffset
238-
bufferPkt.Timestamp -= tsOffset
239-
bufferPkt.SSRC = ssrc
240-
err := track.WriteRTP(bufferPkt)
241-
bufferPkt.PayloadType = bPT
242-
bufferPkt.Timestamp += tsOffset
243-
bufferPkt.SequenceNumber += snOffset
244-
bufferPkt.SSRC = bSsrc
245-
return err
231+
return bufferPkt.Header, bufferPkt.Payload, nil
246232
}
247-
return errPacketNotFound
233+
return rtp.Header{}, nil, errPacketNotFound
248234
}
249235

250236
func (b *Buffer) onLostHandler(fn func(nack *rtcp.TransportLayerNack)) {

pkg/packetqueue.go

+8
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
package sfu
22

33
import (
4+
"sync"
5+
46
log "github.com/pion/ion-log"
57
"github.com/pion/rtcp"
68
"github.com/pion/rtp"
79
)
810

911
type queue struct {
12+
sync.Mutex
1013
pkts []*rtp.Packet
1114
ssrc uint32
1215
head int
@@ -19,6 +22,9 @@ type queue struct {
1922
}
2023

2124
func (q *queue) AddPacket(pkt *rtp.Packet, latest bool) {
25+
q.Lock()
26+
defer q.Unlock()
27+
2228
if !latest {
2329
q.set(int(q.headSN-pkt.SequenceNumber), pkt)
2430
return
@@ -44,6 +50,8 @@ func (q *queue) AddPacket(pkt *rtp.Packet, latest bool) {
4450
}
4551

4652
func (q *queue) GetPacket(sn uint16) *rtp.Packet {
53+
q.Lock()
54+
defer q.Unlock()
4755
return q.get(int(q.headSN - sn))
4856
}
4957

pkg/receiver.go

+19-3
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ type Receiver interface {
2828
OnTransportWideCC(fn func(sn uint16, timeNS int64, marker bool))
2929
SendRTCP(p []rtcp.Packet)
3030
SetRTCPCh(ch chan []rtcp.Packet)
31-
WriteBufferedPacket(sn uint16, track *webrtc.Track, snOffset uint16, tsOffset, ssrc uint32) error
31+
WriteBufferedPacket(sn []uint16, track *webrtc.Track, snOffset uint16, tsOffset, ssrc uint32) error
3232
Close()
3333
}
3434

@@ -133,11 +133,27 @@ func (w *WebRTCReceiver) Track() *webrtc.Track {
133133
}
134134

135135
// WriteBufferedPacket writes buffered packet to track, return error if packet not found
136-
func (w *WebRTCReceiver) WriteBufferedPacket(sn uint16, track *webrtc.Track, snOffset uint16, tsOffset, ssrc uint32) error {
136+
func (w *WebRTCReceiver) WriteBufferedPacket(sn []uint16, track *webrtc.Track, snOffset uint16, tsOffset, ssrc uint32) error {
137137
if w.buffer == nil || w.ctx.Err() != nil {
138138
return nil
139139
}
140-
return w.buffer.WritePacket(sn, track, snOffset, tsOffset, ssrc)
140+
for _, seq := range sn {
141+
h, p, err := w.buffer.getPacket(seq + snOffset)
142+
if err != nil {
143+
continue
144+
}
145+
h.PayloadType = track.PayloadType()
146+
h.SequenceNumber -= snOffset
147+
h.Timestamp -= tsOffset
148+
h.SSRC = ssrc
149+
if err := track.WriteRTP(&rtp.Packet{
150+
Header: h,
151+
Payload: p,
152+
}); err != nil {
153+
return err
154+
}
155+
}
156+
return nil
141157
}
142158

143159
func (w *WebRTCReceiver) SetRTCPCh(ch chan []rtcp.Packet) {

pkg/simplesender.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ func (s *SimpleSender) receiveRTCP() {
222222
log.Tracef("sender got nack: %+v", pkt)
223223
for _, pair := range pkt.Nacks {
224224
if err := recv.WriteBufferedPacket(
225-
pair.PacketID,
225+
pair.PacketList(),
226226
s.track,
227227
s.snOffset,
228228
s.tsOffset,

pkg/simulcastsender.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ func (s *SimulcastSender) receiveRTCP() {
306306
log.Tracef("sender got nack: %+v", pkt)
307307
for _, pair := range pkt.Nacks {
308308
if err := recv.WriteBufferedPacket(
309-
pair.PacketID,
309+
pair.PacketList(),
310310
s.track,
311311
s.snOffset,
312312
s.tsOffset,

0 commit comments

Comments
 (0)