Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions pkg/rtpfb/acknowledgement.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// SPDX-FileCopyrightText: 2025 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package rtpfb

import (
"time"

"github.com/pion/rtcp"
)

type acknowledgement struct {
sequenceNumber uint16
arrived bool
arrival time.Time
ecn rtcp.ECN
}
70 changes: 70 additions & 0 deletions pkg/rtpfb/ccfb_receiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// SPDX-FileCopyrightText: 2025 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package rtpfb

import (
"time"

"github.com/pion/interceptor/internal/ntp"
"github.com/pion/rtcp"
)

func convertCCFB(ts time.Time, feedback *rtcp.CCFeedbackReport) (time.Duration, map[uint32][]acknowledgement) {
if feedback == nil {
return 0, nil
}
result := map[uint32][]acknowledgement{}
referenceTime := ntp.ToTime32(feedback.ReportTimestamp, ts)
latestArrival := time.Time{}
for _, rb := range feedback.ReportBlocks {
var la time.Time
la, result[rb.MediaSSRC] = convertMetricBlock(referenceTime, rb.BeginSequence, rb.MetricBlocks)
if la.After(latestArrival) {
latestArrival = la
}
}

return referenceTime.Sub(latestArrival), result
}

func convertMetricBlock(
reference time.Time,
seqNrOffset uint16,
blocks []rtcp.CCFeedbackMetricBlock,
) (time.Time, []acknowledgement) {
reports := make([]acknowledgement, len(blocks))
latestArrival := time.Time{}
for i, mb := range blocks {
if mb.Received {
arrival := time.Time{}

// RFC 8888 states: If the measurement is unavailable or if the
// arrival time of the RTP packet is after the time represented by
// the RTS field, then an ATO value of 0x1FFF MUST be reported for
// the packet. In that case, we set a zero time.Time value.
if mb.ArrivalTimeOffset != 0x1FFF {
delta := time.Duration((float64(mb.ArrivalTimeOffset) / 1024.0) * float64(time.Second))
arrival = reference.Add(-delta)
if arrival.After(latestArrival) {
latestArrival = arrival
}
}
reports[i] = acknowledgement{
sequenceNumber: seqNrOffset + uint16(i), // nolint:gosec
arrived: true,
arrival: arrival,
ecn: mb.ECN,
}
} else {
reports[i] = acknowledgement{
sequenceNumber: seqNrOffset + uint16(i), // nolint:gosec
arrived: false,
arrival: time.Time{},
ecn: 0,
}
}
}

return latestArrival, reports
}
200 changes: 200 additions & 0 deletions pkg/rtpfb/ccfb_receiver_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
// SPDX-FileCopyrightText: 2025 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package rtpfb

import (
"fmt"
"testing"
"time"

"github.com/pion/interceptor/internal/ntp"
"github.com/pion/rtcp"
"github.com/stretchr/testify/assert"
)

func TestConvertCCFB(t *testing.T) {
timeZero := time.Now()
cases := []struct {
ts time.Time
feedback *rtcp.CCFeedbackReport
expect map[uint32][]acknowledgement
expectAckDelay time.Duration
}{
{},
{
ts: timeZero.Add(2 * time.Second),
feedback: &rtcp.CCFeedbackReport{
SenderSSRC: 1,
ReportBlocks: []rtcp.CCFeedbackReportBlock{
{
MediaSSRC: 2,
BeginSequence: 17,
MetricBlocks: []rtcp.CCFeedbackMetricBlock{
{
Received: true,
ECN: 0,
ArrivalTimeOffset: 512,
},
},
},
},
ReportTimestamp: ntp.ToNTP32(timeZero.Add(time.Second)),
},
expect: map[uint32][]acknowledgement{
2: {
{
sequenceNumber: 17,
arrived: true,
arrival: timeZero.Add(500 * time.Millisecond),
ecn: 0,
},
},
},
expectAckDelay: 500 * time.Millisecond,
},
}
for i, tc := range cases {
t.Run(fmt.Sprintf("%v", i), func(t *testing.T) {
ackDelay, res := convertCCFB(tc.ts, tc.feedback)

assert.Equal(t, tc.expectAckDelay, ackDelay)

// Can't directly check equality since arrival timestamp conversions
// may be slightly off due to ntp conversions.
assert.Equal(t, len(tc.expect), len(res))
for i, acks := range tc.expect {
for j, ack := range acks {
assert.Equal(t, ack.sequenceNumber, res[i][j].sequenceNumber)
assert.Equal(t, ack.arrived, res[i][j].arrived)
assert.Equal(t, ack.ecn, res[i][j].ecn)
assert.InDelta(t, ack.arrival.UnixNano(), res[i][j].arrival.UnixNano(), float64(time.Millisecond.Nanoseconds()))
}
}
})
}
}

func TestConvertMetricBlock(t *testing.T) {
cases := []struct {
ts time.Time
reference time.Time
seqNrOffset uint16
blocks []rtcp.CCFeedbackMetricBlock
expected []acknowledgement
expectedLatestArrival time.Time
}{
{
ts: time.Time{},
reference: time.Time{},
seqNrOffset: 0,
blocks: []rtcp.CCFeedbackMetricBlock{},
expected: []acknowledgement{},
},
{
ts: time.Time{}.Add(2 * time.Second),
reference: time.Time{}.Add(time.Second),
seqNrOffset: 3,
blocks: []rtcp.CCFeedbackMetricBlock{
{
Received: true,
ECN: 0,
ArrivalTimeOffset: 512,
},
{
Received: false,
ECN: 0,
ArrivalTimeOffset: 0,
},
{
Received: true,
ECN: 0,
ArrivalTimeOffset: 0,
},
},
expected: []acknowledgement{
{
sequenceNumber: 3,
arrived: true,
arrival: time.Time{}.Add(500 * time.Millisecond),
ecn: 0,
},
{
sequenceNumber: 4,
arrived: false,
arrival: time.Time{},
ecn: 0,
},
{
sequenceNumber: 5,
arrived: true,
arrival: time.Time{}.Add(time.Second),
ecn: 0,
},
},
expectedLatestArrival: time.Time{}.Add(time.Second),
},
{
ts: time.Time{}.Add(2 * time.Second),
reference: time.Time{}.Add(time.Second),
seqNrOffset: 3,
blocks: []rtcp.CCFeedbackMetricBlock{
{
Received: true,
ECN: 0,
ArrivalTimeOffset: 512,
},
{
Received: false,
ECN: 0,
ArrivalTimeOffset: 0,
},
{
Received: true,
ECN: 0,
ArrivalTimeOffset: 0,
},
{
Received: true,
ECN: 0,
ArrivalTimeOffset: 0x1FFF,
},
},
expected: []acknowledgement{
{
sequenceNumber: 3,
arrived: true,
arrival: time.Time{}.Add(500 * time.Millisecond),
ecn: 0,
},
{
sequenceNumber: 4,
arrived: false,
arrival: time.Time{},
ecn: 0,
},
{
sequenceNumber: 5,
arrived: true,
arrival: time.Time{}.Add(time.Second),
ecn: 0,
},
{
sequenceNumber: 6,
arrived: true,
arrival: time.Time{},
ecn: 0,
},
},
expectedLatestArrival: time.Time{}.Add(time.Second),
},
}

for i, tc := range cases {
t.Run(fmt.Sprintf("%v", i), func(t *testing.T) {
ela, res := convertMetricBlock(tc.reference, tc.seqNrOffset, tc.blocks)
assert.Equal(t, tc.expected, res)
assert.Equal(t, tc.expectedLatestArrival, ela)
})
}
}
Loading
Loading