Skip to content

Commit ceaeb98

Browse files
committed
Add interceptor to aggregate CCFB reports
1 parent 8492094 commit ceaeb98

File tree

10 files changed

+1323
-0
lines changed

10 files changed

+1323
-0
lines changed

pkg/rtpfb/acknowledgement.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
// SPDX-FileCopyrightText: 2025 The Pion community <https://pion.ly>
2+
// SPDX-License-Identifier: MIT
3+
4+
package rtpfb
5+
6+
import (
7+
"time"
8+
9+
"github.com/pion/rtcp"
10+
)
11+
12+
type acknowledgement struct {
13+
sequenceNumber uint16
14+
arrived bool
15+
arrival time.Time
16+
ecn rtcp.ECN
17+
}

pkg/rtpfb/ccfb_receiver.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
// SPDX-FileCopyrightText: 2025 The Pion community <https://pion.ly>
2+
// SPDX-License-Identifier: MIT
3+
4+
package rtpfb
5+
6+
import (
7+
"time"
8+
9+
"github.com/pion/interceptor/internal/ntp"
10+
"github.com/pion/rtcp"
11+
)
12+
13+
func convertCCFB(ts time.Time, feedback *rtcp.CCFeedbackReport) (time.Duration, map[uint32][]acknowledgement) {
14+
if feedback == nil {
15+
return 0, nil
16+
}
17+
result := map[uint32][]acknowledgement{}
18+
referenceTime := ntp.ToTime32(feedback.ReportTimestamp, ts)
19+
latestArrival := time.Time{}
20+
for _, rb := range feedback.ReportBlocks {
21+
var la time.Time
22+
la, result[rb.MediaSSRC] = convertMetricBlock(referenceTime, rb.BeginSequence, rb.MetricBlocks)
23+
if la.After(latestArrival) {
24+
latestArrival = la
25+
}
26+
}
27+
28+
return referenceTime.Sub(latestArrival), result
29+
}
30+
31+
func convertMetricBlock(
32+
reference time.Time,
33+
seqNrOffset uint16,
34+
blocks []rtcp.CCFeedbackMetricBlock,
35+
) (time.Time, []acknowledgement) {
36+
reports := make([]acknowledgement, len(blocks))
37+
latestArrival := time.Time{}
38+
for i, mb := range blocks {
39+
if mb.Received {
40+
arrival := time.Time{}
41+
42+
// RFC 8888 states: If the measurement is unavailable or if the
43+
// arrival time of the RTP packet is after the time represented by
44+
// the RTS field, then an ATO value of 0x1FFF MUST be reported for
45+
// the packet. In that case, we set a zero time.Time value.
46+
if mb.ArrivalTimeOffset != 0x1FFF {
47+
delta := time.Duration((float64(mb.ArrivalTimeOffset) / 1024.0) * float64(time.Second))
48+
arrival = reference.Add(-delta)
49+
if arrival.After(latestArrival) {
50+
latestArrival = arrival
51+
}
52+
}
53+
reports[i] = acknowledgement{
54+
sequenceNumber: seqNrOffset + uint16(i), // nolint:gosec
55+
arrived: true,
56+
arrival: arrival,
57+
ecn: mb.ECN,
58+
}
59+
} else {
60+
reports[i] = acknowledgement{
61+
sequenceNumber: seqNrOffset + uint16(i), // nolint:gosec
62+
arrived: false,
63+
arrival: time.Time{},
64+
ecn: 0,
65+
}
66+
}
67+
}
68+
69+
return latestArrival, reports
70+
}

pkg/rtpfb/ccfb_receiver_test.go

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
// SPDX-FileCopyrightText: 2025 The Pion community <https://pion.ly>
2+
// SPDX-License-Identifier: MIT
3+
4+
package rtpfb
5+
6+
import (
7+
"fmt"
8+
"testing"
9+
"time"
10+
11+
"github.com/pion/interceptor/internal/ntp"
12+
"github.com/pion/rtcp"
13+
"github.com/stretchr/testify/assert"
14+
)
15+
16+
func TestConvertCCFB(t *testing.T) {
17+
timeZero := time.Now()
18+
cases := []struct {
19+
ts time.Time
20+
feedback *rtcp.CCFeedbackReport
21+
expect map[uint32][]acknowledgement
22+
expectAckDelay time.Duration
23+
}{
24+
{},
25+
{
26+
ts: timeZero.Add(2 * time.Second),
27+
feedback: &rtcp.CCFeedbackReport{
28+
SenderSSRC: 1,
29+
ReportBlocks: []rtcp.CCFeedbackReportBlock{
30+
{
31+
MediaSSRC: 2,
32+
BeginSequence: 17,
33+
MetricBlocks: []rtcp.CCFeedbackMetricBlock{
34+
{
35+
Received: true,
36+
ECN: 0,
37+
ArrivalTimeOffset: 512,
38+
},
39+
},
40+
},
41+
},
42+
ReportTimestamp: ntp.ToNTP32(timeZero.Add(time.Second)),
43+
},
44+
expect: map[uint32][]acknowledgement{
45+
2: {
46+
{
47+
sequenceNumber: 17,
48+
arrived: true,
49+
arrival: timeZero.Add(500 * time.Millisecond),
50+
ecn: 0,
51+
},
52+
},
53+
},
54+
expectAckDelay: 500 * time.Millisecond,
55+
},
56+
}
57+
for i, tc := range cases {
58+
t.Run(fmt.Sprintf("%v", i), func(t *testing.T) {
59+
ackDelay, res := convertCCFB(tc.ts, tc.feedback)
60+
61+
assert.Equal(t, tc.expectAckDelay, ackDelay)
62+
63+
// Can't directly check equality since arrival timestamp conversions
64+
// may be slightly off due to ntp conversions.
65+
assert.Equal(t, len(tc.expect), len(res))
66+
for i, acks := range tc.expect {
67+
for j, ack := range acks {
68+
assert.Equal(t, ack.sequenceNumber, res[i][j].sequenceNumber)
69+
assert.Equal(t, ack.arrived, res[i][j].arrived)
70+
assert.Equal(t, ack.ecn, res[i][j].ecn)
71+
assert.InDelta(t, ack.arrival.UnixNano(), res[i][j].arrival.UnixNano(), float64(time.Millisecond.Nanoseconds()))
72+
}
73+
}
74+
})
75+
}
76+
}
77+
78+
func TestConvertMetricBlock(t *testing.T) {
79+
cases := []struct {
80+
ts time.Time
81+
reference time.Time
82+
seqNrOffset uint16
83+
blocks []rtcp.CCFeedbackMetricBlock
84+
expected []acknowledgement
85+
expectedLatestArrival time.Time
86+
}{
87+
{
88+
ts: time.Time{},
89+
reference: time.Time{},
90+
seqNrOffset: 0,
91+
blocks: []rtcp.CCFeedbackMetricBlock{},
92+
expected: []acknowledgement{},
93+
},
94+
{
95+
ts: time.Time{}.Add(2 * time.Second),
96+
reference: time.Time{}.Add(time.Second),
97+
seqNrOffset: 3,
98+
blocks: []rtcp.CCFeedbackMetricBlock{
99+
{
100+
Received: true,
101+
ECN: 0,
102+
ArrivalTimeOffset: 512,
103+
},
104+
{
105+
Received: false,
106+
ECN: 0,
107+
ArrivalTimeOffset: 0,
108+
},
109+
{
110+
Received: true,
111+
ECN: 0,
112+
ArrivalTimeOffset: 0,
113+
},
114+
},
115+
expected: []acknowledgement{
116+
{
117+
sequenceNumber: 3,
118+
arrived: true,
119+
arrival: time.Time{}.Add(500 * time.Millisecond),
120+
ecn: 0,
121+
},
122+
{
123+
sequenceNumber: 4,
124+
arrived: false,
125+
arrival: time.Time{},
126+
ecn: 0,
127+
},
128+
{
129+
sequenceNumber: 5,
130+
arrived: true,
131+
arrival: time.Time{}.Add(time.Second),
132+
ecn: 0,
133+
},
134+
},
135+
expectedLatestArrival: time.Time{}.Add(time.Second),
136+
},
137+
{
138+
ts: time.Time{}.Add(2 * time.Second),
139+
reference: time.Time{}.Add(time.Second),
140+
seqNrOffset: 3,
141+
blocks: []rtcp.CCFeedbackMetricBlock{
142+
{
143+
Received: true,
144+
ECN: 0,
145+
ArrivalTimeOffset: 512,
146+
},
147+
{
148+
Received: false,
149+
ECN: 0,
150+
ArrivalTimeOffset: 0,
151+
},
152+
{
153+
Received: true,
154+
ECN: 0,
155+
ArrivalTimeOffset: 0,
156+
},
157+
{
158+
Received: true,
159+
ECN: 0,
160+
ArrivalTimeOffset: 0x1FFF,
161+
},
162+
},
163+
expected: []acknowledgement{
164+
{
165+
sequenceNumber: 3,
166+
arrived: true,
167+
arrival: time.Time{}.Add(500 * time.Millisecond),
168+
ecn: 0,
169+
},
170+
{
171+
sequenceNumber: 4,
172+
arrived: false,
173+
arrival: time.Time{},
174+
ecn: 0,
175+
},
176+
{
177+
sequenceNumber: 5,
178+
arrived: true,
179+
arrival: time.Time{}.Add(time.Second),
180+
ecn: 0,
181+
},
182+
{
183+
sequenceNumber: 6,
184+
arrived: true,
185+
arrival: time.Time{},
186+
ecn: 0,
187+
},
188+
},
189+
expectedLatestArrival: time.Time{}.Add(time.Second),
190+
},
191+
}
192+
193+
for i, tc := range cases {
194+
t.Run(fmt.Sprintf("%v", i), func(t *testing.T) {
195+
ela, res := convertMetricBlock(tc.reference, tc.seqNrOffset, tc.blocks)
196+
assert.Equal(t, tc.expected, res)
197+
assert.Equal(t, tc.expectedLatestArrival, ela)
198+
})
199+
}
200+
}

0 commit comments

Comments
 (0)