Skip to content

Commit

Permalink
fix(buffer): fix nack request (#336)
Browse files Browse the repository at this point in the history
  • Loading branch information
OrlandoCo authored Dec 11, 2020
1 parent 2c0b6c7 commit b3f1f2f
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 5 deletions.
4 changes: 4 additions & 0 deletions pkg/buffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,3 +255,7 @@ func (b *Buffer) onTransportWideCC(fn func(sn uint16, timeNS int64, marker bool)
func (b *Buffer) onFeedback(fn func(fb []rtcp.Packet)) {
b.feedbackCB = fn
}

func (b *Buffer) onNack(fn func(fb *rtcp.TransportLayerNack)) {
b.pktQueue.onLost = fn
}
17 changes: 15 additions & 2 deletions pkg/buffer/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"sync"
"sync/atomic"

log "github.com/pion/ion-log"

"github.com/pion/interceptor"
"github.com/pion/rtcp"
"github.com/pion/rtp"
Expand Down Expand Up @@ -84,7 +86,9 @@ func (i *Interceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.

func (i *Interceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter {
i.twcc.onFeedback = func(pkts []rtcp.Packet) {
writer.Write(pkts, nil)
if _, err := writer.Write(pkts, nil); err != nil {
log.Errorf("Writing buffer twcc rtcp err: %v", err)
}
}
i.rtcpWriter.Store(writer)
return writer
Expand Down Expand Up @@ -127,12 +131,21 @@ func (i *Interceptor) newBuffer(info *interceptor.StreamInfo) *Buffer {
buffer := NewBuffer(info, Options{})
buffer.onFeedback(func(pkts []rtcp.Packet) {
if p, ok := i.rtcpWriter.Load().(interceptor.RTCPWriter); ok {
p.Write(pkts, nil)
if _, err := p.Write(pkts, nil); err != nil {
log.Errorf("Writing buffer rtcp err: %v", err)
}
}
})
buffer.onTransportWideCC(func(sn uint16, timeNS int64, marker bool) {
i.twcc.push(sn, timeNS, marker)
})
buffer.onNack(func(fb *rtcp.TransportLayerNack) {
if p, ok := i.rtcpWriter.Load().(interceptor.RTCPWriter); ok {
if _, err := p.Write([]rtcp.Packet{fb}, nil); err != nil {
log.Errorf("Writing buffer rtcp err: %v", err)
}
}
})
i.Lock()
i.buffers = append(i.buffers, buffer)
i.twcc.mSSRC = info.SSRC
Expand Down
4 changes: 2 additions & 2 deletions pkg/sfu/nack.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ func newNACKList() *nackList {
}
}

func (n *nackList) getNACKSeqNo(seqno []uint16) []uint16 {
func (n *nackList) getNACKSeqNo(seqNo []uint16) []uint16 {
packets := make([]uint16, 0, 17)
for _, sn := range seqno {
for _, sn := range seqNo {
if nack, ok := n.nacks[sn]; !ok {
n.nacks[sn] = n.ll.PushBack(NACK{sn, time.Now().UnixNano()})
packets = append(packets, sn)
Expand Down
14 changes: 13 additions & 1 deletion pkg/sfu/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sfu

import (
"io"
"math"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -173,6 +174,7 @@ func (s *Subscriber) Close() error {
func (s *Subscriber) downTracksReports() {
for {
time.Sleep(5 * time.Second)

var r []rtcp.Packet
var sd []rtcp.SourceDescriptionChunk
s.RLock()
Expand Down Expand Up @@ -210,14 +212,24 @@ func (s *Subscriber) downTracksReports() {
}
}
s.RUnlock()
if len(r) > 0 {
i := math.Ceil(float64(len(sd)) / float64(20))
j := 0
for i > 0 {
if i > 1 {
sd = sd[j*20 : (j+1)*20-1]
} else {
sd = sd[j*20 : cap(sd)]
}
r = append(r, &rtcp.SourceDescription{Chunks: sd})
if err := s.pc.WriteRTCP(r); err != nil {
if err == io.EOF || err == io.ErrClosedPipe {
return
}
log.Errorf("Sending downtrack reports err: %v", err)
}
r = r[:0]
i--
j++
}
}
}
Expand Down

0 comments on commit b3f1f2f

Please sign in to comment.