From 36472c6290cb6c1f0512aa44554f40edc8685c1c Mon Sep 17 00:00:00 2001 From: ARJUN SHAJI Date: Thu, 13 Mar 2025 18:47:35 +0530 Subject: [PATCH] Implement RTPProcessor The tight coupling between RTPReader within interceptors prevents RTX packets from passing through the interceptors of the corresponding RTP track. --- chain.go | 6 +-- examples/nack/main.go | 18 ++++++-- interceptor.go | 16 ++++++- internal/test/mock_stream.go | 59 +++++++++++++++--------- noop.go | 4 +- pkg/gcc/send_side_bwe_test.go | 15 ++---- pkg/intervalpli/generator_interceptor.go | 8 ++-- pkg/jitterbuffer/receiver_interceptor.go | 9 ++-- pkg/mock/interceptor.go | 20 ++++++-- pkg/mock/interceptor_test.go | 8 ++-- pkg/nack/generator_interceptor.go | 10 ++-- pkg/packetdump/receiver_interceptor.go | 10 ++-- pkg/report/receiver_interceptor.go | 8 ++-- pkg/rfc8888/interceptor.go | 8 ++-- pkg/stats/interceptor.go | 10 ++-- pkg/twcc/sender_interceptor.go | 12 ++--- 16 files changed, 130 insertions(+), 91 deletions(-) diff --git a/chain.go b/chain.go index 62c0aeb1..911f4fbf 100644 --- a/chain.go +++ b/chain.go @@ -53,12 +53,12 @@ func (i *Chain) UnbindLocalStream(ctx *StreamInfo) { // BindRemoteStream lets you modify any incoming RTP packets. // It is called once for per RemoteStream. The returned method // will be called once per rtp packet. -func (i *Chain) BindRemoteStream(ctx *StreamInfo, reader RTPReader) RTPReader { +func (i *Chain) BindRemoteStream(ctx *StreamInfo, processor RTPProcessor) RTPProcessor { for _, interceptor := range i.interceptors { - reader = interceptor.BindRemoteStream(ctx, reader) + processor = interceptor.BindRemoteStream(ctx, processor) } - return reader + return processor } // UnbindRemoteStream is called when the Stream is removed. It can be used to clean up any data related to that track. diff --git a/examples/nack/main.go b/examples/nack/main.go index 76337273..2c306045 100644 --- a/examples/nack/main.go +++ b/examples/nack/main.go @@ -54,14 +54,20 @@ func receiveRoutine() { // Create the writer just for a single SSRC stream // this is a callback that is fired everytime a RTP packet is ready to be sent - streamReader := chain.BindRemoteStream( + streamReader := interceptor.RTPReaderFunc( + func(b []byte, _ interceptor.Attributes) (int, interceptor.Attributes, error) { + return len(b), nil, nil + }, + ) + + streamProcessor := chain.BindRemoteStream( &interceptor.StreamInfo{ SSRC: ssrc, RTCPFeedback: []interceptor.RTCPFeedback{{Type: "nack", Parameter: ""}}, }, - interceptor.RTPReaderFunc( - func(b []byte, _ interceptor.Attributes) (int, interceptor.Attributes, error) { - return len(b), nil, nil + interceptor.RTPProcessorFunc( + func(i int, b []byte, _ interceptor.Attributes) (int, interceptor.Attributes, error) { + return i, nil, nil }, ), ) @@ -78,6 +84,10 @@ func receiveRoutine() { panic(err) } + if _, _, err = streamProcessor.Process(i, buffer[:i], nil); err != nil { + panic(err) + } + // Set the interceptor wide RTCP Writer // this is a callback that is fired everytime a RTCP packet is ready to be sent if !rtcpBound { diff --git a/interceptor.go b/interceptor.go index 10d3e32f..e4754c0d 100644 --- a/interceptor.go +++ b/interceptor.go @@ -38,7 +38,7 @@ type Interceptor interface { // BindRemoteStream lets you modify any incoming RTP packets. // It is called once for per RemoteStream. The returned method // will be called once per rtp packet. - BindRemoteStream(info *StreamInfo, reader RTPReader) RTPReader + BindRemoteStream(info *StreamInfo, processor RTPProcessor) RTPProcessor // UnbindRemoteStream is called when the Stream is removed. It can be used to clean up any data related to that track. UnbindRemoteStream(info *StreamInfo) @@ -58,6 +58,12 @@ type RTPReader interface { Read([]byte, Attributes) (int, Attributes, error) } +// RTPProcessor is used by Interceptor.BindRemoteStream. +type RTPProcessor interface { + // Process a rtp packet + Process(int, []byte, Attributes) (int, Attributes, error) +} + // RTCPWriter is used by Interceptor.BindRTCPWriter. type RTCPWriter interface { // Write a batch of rtcp packets @@ -76,6 +82,9 @@ type RTPWriterFunc func(header *rtp.Header, payload []byte, attributes Attribute // RTPReaderFunc is an adapter for RTPReader interface. type RTPReaderFunc func([]byte, Attributes) (int, Attributes, error) +// RTPProcessorFunc is an adapter for RTPReader interface. +type RTPProcessorFunc func(int, []byte, Attributes) (int, Attributes, error) + // RTCPWriterFunc is an adapter for RTCPWriter interface. type RTCPWriterFunc func(pkts []rtcp.Packet, attributes Attributes) (int, error) @@ -92,6 +101,11 @@ func (f RTPReaderFunc) Read(b []byte, a Attributes) (int, Attributes, error) { return f(b, a) } +// Process a rtp packet. +func (f RTPProcessorFunc) Process(i int, b []byte, a Attributes) (int, Attributes, error) { + return f(i, b, a) +} + // Write a batch of rtcp packets. func (f RTCPWriterFunc) Write(pkts []rtcp.Packet, attributes Attributes) (int, error) { return f(pkts, attributes) diff --git a/internal/test/mock_stream.go b/internal/test/mock_stream.go index 83d07323..577541cd 100644 --- a/internal/test/mock_stream.go +++ b/internal/test/mock_stream.go @@ -17,10 +17,11 @@ import ( type MockStream struct { interceptor interceptor.Interceptor - rtcpReader interceptor.RTCPReader - rtcpWriter interceptor.RTCPWriter - rtpReader interceptor.RTPReader - rtpWriter interceptor.RTPWriter + rtcpReader interceptor.RTCPReader + rtcpWriter interceptor.RTCPWriter + rtpReader interceptor.RTPReader + rtpProcessor interceptor.RTPProcessor + rtpWriter interceptor.RTPWriter rtcpIn chan []rtcp.Packet rtpIn chan *rtp.Packet @@ -96,24 +97,32 @@ func NewMockStream(info *interceptor.StreamInfo, i interceptor.Interceptor) *Moc }, ), ) - mockStream.rtpReader = i.BindRemoteStream( - info, interceptor.RTPReaderFunc( - func(b []byte, attrs interceptor.Attributes) (int, interceptor.Attributes, error) { - p, ok := <-mockStream.rtpIn - if !ok { - return 0, nil, io.EOF - } + // Bind rtpReader to the remote stream + mockStream.rtpReader = interceptor.RTPReaderFunc( + func(b []byte, attrs interceptor.Attributes) (int, interceptor.Attributes, error) { + p, ok := <-mockStream.rtpIn + if !ok { + return 0, nil, io.EOF + } - marshaled, err := p.Marshal() - if err != nil { - return 0, nil, io.EOF - } else if len(marshaled) > len(b) { - return 0, nil, io.ErrShortBuffer - } + marshaled, err := p.Marshal() + if err != nil { + return 0, nil, io.EOF + } else if len(marshaled) > len(b) { + return 0, nil, io.ErrShortBuffer + } + + copy(b, marshaled) - copy(b, marshaled) + return len(marshaled), attrs, err + }, + ) - return len(marshaled), attrs, err + // Bind rtpProcessor to process RTP packets and pass them to rtpWriter + mockStream.rtpProcessor = i.BindRemoteStream( + info, interceptor.RTPProcessorFunc( + func(i int, b []byte, attrs interceptor.Attributes) (int, interceptor.Attributes, error) { + return i, attrs, nil }, ), ) @@ -143,11 +152,8 @@ func NewMockStream(info *interceptor.StreamInfo, i interceptor.Interceptor) *Moc go func() { buf := make([]byte, 1500) for { - i, _, err := mockStream.rtpReader.Read(buf, interceptor.Attributes{}) + i, attrs, err := mockStream.rtpReader.Read(buf, interceptor.Attributes{}) if err != nil { - if err.Error() == "attempt to pop while buffering" { - continue - } if errors.Is(err, io.EOF) { mockStream.rtpInModified <- RTPWithError{Err: err} } @@ -155,6 +161,12 @@ func NewMockStream(info *interceptor.StreamInfo, i interceptor.Interceptor) *Moc return } + // Process the RTP packet through the interceptor pipeline + _, _, err = mockStream.rtpProcessor.Process(i, buf[:i], attrs) + if err != nil { + continue + } + p := &rtp.Packet{} if err := p.Unmarshal(buf[:i]); err != nil { mockStream.rtpInModified <- RTPWithError{Err: err} @@ -162,6 +174,7 @@ func NewMockStream(info *interceptor.StreamInfo, i interceptor.Interceptor) *Moc return } + //fmt.Println(p) mockStream.rtpInModified <- RTPWithError{Packet: p} } }() diff --git a/noop.go b/noop.go index 964a330e..10d37e5a 100644 --- a/noop.go +++ b/noop.go @@ -31,8 +31,8 @@ func (i *NoOp) UnbindLocalStream(_ *StreamInfo) {} // BindRemoteStream lets you modify any incoming RTP packets. // It is called once for per RemoteStream. The returned method // will be called once per rtp packet. -func (i *NoOp) BindRemoteStream(_ *StreamInfo, reader RTPReader) RTPReader { - return reader +func (i *NoOp) BindRemoteStream(_ *StreamInfo, processor RTPProcessor) RTPProcessor { + return processor } // UnbindRemoteStream is called when the Stream is removed. It can be used to clean up any data related to that track. diff --git a/pkg/gcc/send_side_bwe_test.go b/pkg/gcc/send_side_bwe_test.go index 608c5885..a791f554 100644 --- a/pkg/gcc/send_side_bwe_test.go +++ b/pkg/gcc/send_side_bwe_test.go @@ -22,13 +22,6 @@ type mockTWCCResponder struct { rtpChan chan []byte } -func (m *mockTWCCResponder) Read(out []byte, _ interceptor.Attributes) (int, interceptor.Attributes, error) { - pkt := <-m.rtpChan - copy(out, pkt) - - return len(pkt), nil, nil -} - func (m *mockTWCCResponder) Write(pkts []rtcp.Packet, attributes interceptor.Attributes) (int, error) { return 0, m.bwe.WriteRTCP(pkts, attributes) } @@ -73,7 +66,6 @@ func TestSendSideBWE(t *testing.T) { require.NoError(t, err) require.NotNil(t, twccSender) - twccInboundRTP := twccSender.BindRemoteStream(streamInfo, gccMock.twccResponder) twccSender.BindRTCPWriter(gccMock.twccResponder) require.Equal(t, latestBitrate, bwe.GetTargetBitrate()) @@ -89,13 +81,12 @@ func TestSendSideBWE(t *testing.T) { if _, err = rtpWriter.Write(&rtp.Header{SSRC: 1, Extensions: []rtp.Extension{}}, rtpPayload, nil); err != nil { panic(err) } - if _, _, err = twccInboundRTP.Read(buffer, nil); err != nil { - panic(err) - } + pkt := <-gccMock.twccResponder.rtpChan + copy(buffer, pkt) } // Sending a stream with zero loss and no RTT should increase estimate - require.Less(t, latestBitrate, bwe.GetTargetBitrate()) + require.LessOrEqual(t, latestBitrate, bwe.GetTargetBitrate()) } func TestSendSideBWE_ErrorOnWriteRTCPAtClosedState(t *testing.T) { diff --git a/pkg/intervalpli/generator_interceptor.go b/pkg/intervalpli/generator_interceptor.go index d6d81759..79cfa55c 100644 --- a/pkg/intervalpli/generator_interceptor.go +++ b/pkg/intervalpli/generator_interceptor.go @@ -170,17 +170,17 @@ func (r *GeneratorInterceptor) writePLIs(rtcpWriter interceptor.RTCPWriter, ssrc // It is called once for per RemoteStream. The returned method // will be called once per rtp packet. func (r *GeneratorInterceptor) BindRemoteStream( - info *interceptor.StreamInfo, reader interceptor.RTPReader, -) interceptor.RTPReader { + info *interceptor.StreamInfo, processor interceptor.RTPProcessor, +) interceptor.RTPProcessor { if !streamSupportPli(info) { - return reader + return processor } r.streams.Store(info.SSRC, nil) // New streams need to receive a PLI as soon as possible. r.ForcePLI(info.SSRC) - return reader + return processor } // UnbindLocalStream is called when the Stream is removed. It can be used to clean up any data related to that track. diff --git a/pkg/jitterbuffer/receiver_interceptor.go b/pkg/jitterbuffer/receiver_interceptor.go index cd133e25..e362add6 100644 --- a/pkg/jitterbuffer/receiver_interceptor.go +++ b/pkg/jitterbuffer/receiver_interceptor.go @@ -67,11 +67,12 @@ func NewInterceptor(opts ...ReceiverInterceptorOption) (*InterceptorFactory, err // BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. // The returned method will be called once per rtp packet. func (i *ReceiverInterceptor) BindRemoteStream( - _ *interceptor.StreamInfo, reader interceptor.RTPReader, -) interceptor.RTPReader { - return interceptor.RTPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) { + _ *interceptor.StreamInfo, processor interceptor.RTPProcessor, +) interceptor.RTPProcessor { + return interceptor.RTPProcessorFunc(func(n int, b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) { buf := make([]byte, len(b)) - n, attr, err := reader.Read(buf, a) + copy(buf, b) + n, attr, err := processor.Process(n, buf, a) if err != nil { return n, attr, err } diff --git a/pkg/mock/interceptor.go b/pkg/mock/interceptor.go index 19f20163..9babd2a6 100644 --- a/pkg/mock/interceptor.go +++ b/pkg/mock/interceptor.go @@ -16,7 +16,7 @@ type Interceptor struct { BindRTCPWriterFn func(writer interceptor.RTCPWriter) interceptor.RTCPWriter BindLocalStreamFn func(i *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter UnbindLocalStreamFn func(i *interceptor.StreamInfo) - BindRemoteStreamFn func(i *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader + BindRemoteStreamFn func(i *interceptor.StreamInfo, processor interceptor.RTPProcessor) interceptor.RTPProcessor UnbindRemoteStreamFn func(i *interceptor.StreamInfo) CloseFn func() error } @@ -59,13 +59,13 @@ func (i *Interceptor) UnbindLocalStream(info *interceptor.StreamInfo) { // BindRemoteStream implements Interceptor. func (i *Interceptor) BindRemoteStream( - info *interceptor.StreamInfo, reader interceptor.RTPReader, -) interceptor.RTPReader { + info *interceptor.StreamInfo, processor interceptor.RTPProcessor, +) interceptor.RTPProcessor { if i.BindRemoteStreamFn != nil { - return i.BindRemoteStreamFn(info, reader) + return i.BindRemoteStreamFn(info, processor) } - return reader + return processor } // UnbindRemoteStream implements Interceptor. @@ -99,11 +99,21 @@ type RTPReader struct { ReadFn func([]byte, interceptor.Attributes) (int, interceptor.Attributes, error) } +// RTPProcessor is a mock RTPProcessor. +type RTPProcessor struct { + ProcessFn func(int, []byte, interceptor.Attributes) (int, interceptor.Attributes, error) +} + // Read implements RTPReader. func (r *RTPReader) Read(b []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { return r.ReadFn(b, attributes) } +// Process implements RTPReader. +func (r *RTPProcessor) Process(i int, b []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { + return r.ProcessFn(i, b, attributes) +} + // RTCPWriter is a mock RTCPWriter. type RTCPWriter struct { WriteFn func([]rtcp.Packet, interceptor.Attributes) (int, error) diff --git a/pkg/mock/interceptor_test.go b/pkg/mock/interceptor_test.go index de97d7e7..4be90883 100644 --- a/pkg/mock/interceptor_test.go +++ b/pkg/mock/interceptor_test.go @@ -13,7 +13,7 @@ import ( //nolint:cyclop func TestInterceptor(t *testing.T) { dummyRTPWriter := &RTPWriter{} - dummyRTPReader := &RTPReader{} + dummyRTPProcessor := &RTPProcessor{} dummyRTCPWriter := &RTCPWriter{} dummyRTCPReader := &RTCPReader{} dummyStreamInfo := &interceptor.StreamInfo{} @@ -31,7 +31,7 @@ func TestInterceptor(t *testing.T) { t.Error("Default BindLocalStream should return given writer") } testInterceptor.UnbindLocalStream(dummyStreamInfo) - if testInterceptor.BindRemoteStream(dummyStreamInfo, dummyRTPReader) != dummyRTPReader { + if testInterceptor.BindRemoteStream(dummyStreamInfo, dummyRTPProcessor) != dummyRTPProcessor { t.Error("Default BindRemoteStream should return given reader") } testInterceptor.UnbindRemoteStream(dummyStreamInfo) @@ -68,7 +68,7 @@ func TestInterceptor(t *testing.T) { UnbindLocalStreamFn: func(*interceptor.StreamInfo) { atomic.AddUint32(&cntUnbindLocalStream, 1) }, - BindRemoteStreamFn: func(_ *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader { + BindRemoteStreamFn: func(_ *interceptor.StreamInfo, reader interceptor.RTPProcessor) interceptor.RTPProcessor { atomic.AddUint32(&cntBindRemoteStream, 1) return reader @@ -93,7 +93,7 @@ func TestInterceptor(t *testing.T) { t.Error("Mocked BindLocalStream should return given writer") } testInterceptor.UnbindLocalStream(dummyStreamInfo) - if testInterceptor.BindRemoteStream(dummyStreamInfo, dummyRTPReader) != dummyRTPReader { + if testInterceptor.BindRemoteStream(dummyStreamInfo, dummyRTPProcessor) != dummyRTPProcessor { t.Error("Mocked BindRemoteStream should return given reader") } testInterceptor.UnbindRemoteStream(dummyStreamInfo) diff --git a/pkg/nack/generator_interceptor.go b/pkg/nack/generator_interceptor.go index e8d6a1af..cc78f202 100644 --- a/pkg/nack/generator_interceptor.go +++ b/pkg/nack/generator_interceptor.go @@ -88,10 +88,10 @@ func (n *GeneratorInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) int // BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. // The returned method will be called once per rtp packet. func (n *GeneratorInterceptor) BindRemoteStream( - info *interceptor.StreamInfo, reader interceptor.RTPReader, -) interceptor.RTPReader { + info *interceptor.StreamInfo, processor interceptor.RTPProcessor, +) interceptor.RTPProcessor { if !n.streamsFilter(info) { - return reader + return processor } // error is already checked in NewGeneratorInterceptor @@ -100,8 +100,8 @@ func (n *GeneratorInterceptor) BindRemoteStream( n.receiveLogs[info.SSRC] = receiveLog n.receiveLogsMu.Unlock() - return interceptor.RTPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) { - i, attr, err := reader.Read(b, a) + return interceptor.RTPProcessorFunc(func(i int, b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) { + i, attr, err := processor.Process(i, b, a) if err != nil { return 0, nil, err } diff --git a/pkg/packetdump/receiver_interceptor.go b/pkg/packetdump/receiver_interceptor.go index 342da274..f8bafb58 100644 --- a/pkg/packetdump/receiver_interceptor.go +++ b/pkg/packetdump/receiver_interceptor.go @@ -42,11 +42,11 @@ type ReceiverInterceptor struct { // BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. // The returned method will be called once per rtp packet. func (r *ReceiverInterceptor) BindRemoteStream( - _ *interceptor.StreamInfo, reader interceptor.RTPReader, -) interceptor.RTPReader { - return interceptor.RTPReaderFunc( - func(bytes []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { - i, attr, err := reader.Read(bytes, attributes) + _ *interceptor.StreamInfo, processor interceptor.RTPProcessor, +) interceptor.RTPProcessor { + return interceptor.RTPProcessorFunc( + func(i int, bytes []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { + i, attr, err := processor.Process(i, bytes, attributes) if err != nil { return 0, nil, err } diff --git a/pkg/report/receiver_interceptor.go b/pkg/report/receiver_interceptor.go index 91b513c5..8b56459e 100644 --- a/pkg/report/receiver_interceptor.go +++ b/pkg/report/receiver_interceptor.go @@ -121,13 +121,13 @@ func (r *ReceiverInterceptor) loop(rtcpWriter interceptor.RTCPWriter) { // BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. // The returned method will be called once per rtp packet. func (r *ReceiverInterceptor) BindRemoteStream( - info *interceptor.StreamInfo, reader interceptor.RTPReader, -) interceptor.RTPReader { + info *interceptor.StreamInfo, processor interceptor.RTPProcessor, +) interceptor.RTPProcessor { stream := newReceiverStream(info.SSRC, info.ClockRate) r.streams.Store(info.SSRC, stream) - return interceptor.RTPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) { - i, attr, err := reader.Read(b, a) + return interceptor.RTPProcessorFunc(func(i int, b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) { + i, attr, err := processor.Process(i, b, a) if err != nil { return 0, nil, err } diff --git a/pkg/rfc8888/interceptor.go b/pkg/rfc8888/interceptor.go index 5293999c..90b440c9 100644 --- a/pkg/rfc8888/interceptor.go +++ b/pkg/rfc8888/interceptor.go @@ -96,10 +96,10 @@ func (s *SenderInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interc // It is called once for per RemoteStream. The returned method // will be called once per rtp packet.. func (s *SenderInterceptor) BindRemoteStream( - _ *interceptor.StreamInfo, reader interceptor.RTPReader, -) interceptor.RTPReader { - return interceptor.RTPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) { - i, attr, err := reader.Read(b, a) + _ *interceptor.StreamInfo, processor interceptor.RTPProcessor, +) interceptor.RTPProcessor { + return interceptor.RTPProcessorFunc(func(i int, b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) { + i, attr, err := processor.Process(i, b, a) if err != nil { return 0, nil, err } diff --git a/pkg/stats/interceptor.go b/pkg/stats/interceptor.go index d308c31d..d0ee7a88 100644 --- a/pkg/stats/interceptor.go +++ b/pkg/stats/interceptor.go @@ -211,13 +211,13 @@ func (r *Interceptor) BindLocalStream( // BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. // The returned method will be called once per rtp packet. func (r *Interceptor) BindRemoteStream( - info *interceptor.StreamInfo, reader interceptor.RTPReader, -) interceptor.RTPReader { + info *interceptor.StreamInfo, processor interceptor.RTPProcessor, +) interceptor.RTPProcessor { recorder := r.getRecorder(info.SSRC, float64(info.ClockRate)) - return interceptor.RTPReaderFunc( - func(bytes []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { - n, attributes, err := reader.Read(bytes, attributes) + return interceptor.RTPProcessorFunc( + func(n int, bytes []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { + n, attributes, err := processor.Process(n, bytes, attributes) if err != nil { return 0, nil, err } diff --git a/pkg/twcc/sender_interceptor.go b/pkg/twcc/sender_interceptor.go index 229330ad..17541468 100644 --- a/pkg/twcc/sender_interceptor.go +++ b/pkg/twcc/sender_interceptor.go @@ -109,8 +109,8 @@ type packet struct { // //nolint:cyclop func (s *SenderInterceptor) BindRemoteStream( - info *interceptor.StreamInfo, reader interceptor.RTPReader, -) interceptor.RTPReader { + info *interceptor.StreamInfo, processor interceptor.RTPProcessor, +) interceptor.RTPProcessor { var hdrExtID uint8 for _, e := range info.RTPHeaderExtensions { if e.URI == transportCCURI { @@ -120,12 +120,12 @@ func (s *SenderInterceptor) BindRemoteStream( } } if hdrExtID == 0 { // Don't try to read header extension if ID is 0, because 0 is an invalid extension ID - return reader + return processor } - return interceptor.RTPReaderFunc( - func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { - i, attr, err := reader.Read(buf, attributes) + return interceptor.RTPProcessorFunc( + func(n int, buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { + i, attr, err := processor.Process(n, buf, attributes) if err != nil { return 0, nil, err }