From 4337e81806bc66b689617568df4a30f7a73383d4 Mon Sep 17 00:00:00 2001 From: "al.bash" Date: Tue, 8 Jul 2025 13:25:56 +0300 Subject: [PATCH 1/2] Add negotiated codec parameters to StreamInfo --- interceptor.go | 19 +++++++++++++++++++ interceptor_test.go | 44 +++++++++++++++++++++++++++++++++++++++++--- peerconnection.go | 1 + rtpreceiver.go | 3 ++- rtpsender.go | 1 + 5 files changed, 64 insertions(+), 4 deletions(-) diff --git a/interceptor.go b/interceptor.go index 5d7dea4c89c..40a5f9fe848 100644 --- a/interceptor.go +++ b/interceptor.go @@ -222,6 +222,7 @@ func createStreamInfo( payloadType, payloadTypeRTX, payloadTypeFEC PayloadType, codec RTPCodecCapability, webrtcHeaderExtensions []RTPHeaderExtensionParameter, + codecs []RTPCodecParameters, ) *interceptor.StreamInfo { headerExtensions := make([]interceptor.RTPHeaderExtension, 0, len(webrtcHeaderExtensions)) for _, h := range webrtcHeaderExtensions { @@ -233,6 +234,23 @@ func createStreamInfo( feedbacks = append(feedbacks, interceptor.RTCPFeedback{Type: f.Type, Parameter: f.Parameter}) } + streamCodecs := make([]interceptor.RTPCodecParameters, 0, len(codecs)) + for _, c := range codecs { + feedbacks := make([]interceptor.RTCPFeedback, 0, len(c.RTCPFeedback)) + for _, f := range c.RTCPFeedback { + feedbacks = append(feedbacks, interceptor.RTCPFeedback{Type: f.Type, Parameter: f.Parameter}) + } + + streamCodecs = append(streamCodecs, interceptor.RTPCodecParameters{ + MimeType: c.MimeType, + ClockRate: c.ClockRate, + Channels: c.Channels, + SDPFmtpLine: c.SDPFmtpLine, + RTCPFeedback: feedbacks, + PayloadType: uint8(c.PayloadType), + }) + } + return &interceptor.StreamInfo{ ID: id, Attributes: interceptor.Attributes{}, @@ -248,5 +266,6 @@ func createStreamInfo( Channels: codec.Channels, SDPFmtpLine: codec.SDPFmtpLine, RTCPFeedback: feedbacks, + Codecs: streamCodecs, } } diff --git a/interceptor_test.go b/interceptor_test.go index 335ee0f368a..dc0afec7c1f 100644 --- a/interceptor_test.go +++ b/interceptor_test.go @@ -28,6 +28,7 @@ import ( // * Assert an extension can be set on an outbound packet // * Assert an extension can be read on an outbound packet // * Assert that attributes set by an interceptor are returned to the Reader. +// * Assert that StreamInfo.Codecs contain capabilities of all negotiated codecs for track's SSRC func TestPeerConnection_Interceptor(t *testing.T) { to := test.TimeOut(time.Second * 20) defer to.Stop() @@ -36,11 +37,47 @@ func TestPeerConnection_Interceptor(t *testing.T) { defer report() createPC := func() *PeerConnection { + videoRTCPFeedback := []RTCPFeedback{{"goog-remb", ""}, {"ccm", "fir"}, {"nack", ""}, {"nack", "pli"}} + videoCodecs := []RTPCodecParameters{ + { + RTPCodecCapability: RTPCodecCapability{MimeTypeVP8, 90000, 0, "", videoRTCPFeedback}, + PayloadType: 96, + }, + { + RTPCodecCapability: RTPCodecCapability{MimeTypeVP9, 90000, 0, "profile-id=0", videoRTCPFeedback}, + PayloadType: 98, + }, + } + + me := &MediaEngine{} + for _, videoCodec := range videoCodecs { + err := me.RegisterCodec(videoCodec, RTPCodecTypeVideo) + assert.NoError(t, err) + } + + streamInfoCodecs := make([]interceptor.RTPCodecParameters, 0, len(videoCodecs)) + for _, c := range videoCodecs { + feedbacks := make([]interceptor.RTCPFeedback, 0, len(c.RTCPFeedback)) + for _, f := range c.RTCPFeedback { + feedbacks = append(feedbacks, interceptor.RTCPFeedback{Type: f.Type, Parameter: f.Parameter}) + } + + streamInfoCodecs = append(streamInfoCodecs, interceptor.RTPCodecParameters{ + MimeType: c.MimeType, + ClockRate: c.ClockRate, + Channels: c.Channels, + SDPFmtpLine: c.SDPFmtpLine, + RTCPFeedback: feedbacks, + PayloadType: uint8(c.PayloadType), + }) + } + ir := &interceptor.Registry{} ir.Add(&mock_interceptor.Factory{ NewInterceptorFn: func(_ string) (interceptor.Interceptor, error) { return &mock_interceptor.Interceptor{ - BindLocalStreamFn: func(_ *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter { + BindLocalStreamFn: func(streamInfo *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter { + assert.Equal(t, streamInfoCodecs, streamInfo.Codecs) return interceptor.RTPWriterFunc( func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { // set extension on outgoing packet @@ -52,7 +89,8 @@ func TestPeerConnection_Interceptor(t *testing.T) { }, ) }, - BindRemoteStreamFn: func(_ *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader { + BindRemoteStreamFn: func(streamInfo *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader { + assert.Equal(t, streamInfoCodecs, streamInfo.Codecs) return interceptor.RTPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) { if a == nil { a = interceptor.Attributes{} @@ -67,7 +105,7 @@ func TestPeerConnection_Interceptor(t *testing.T) { }, }) - pc, err := NewAPI(WithInterceptorRegistry(ir)).NewPeerConnection(Configuration{}) + pc, err := NewAPI(WithInterceptorRegistry(ir), WithMediaEngine(me)).NewPeerConnection(Configuration{}) assert.NoError(t, err) return pc diff --git a/peerconnection.go b/peerconnection.go index 50b05cd058b..b3c1ee86e14 100644 --- a/peerconnection.go +++ b/peerconnection.go @@ -1778,6 +1778,7 @@ func (pc *PeerConnection) handleIncomingSSRC(rtpStream io.Reader, ssrc SSRC) err 0, 0, params.Codecs[0].RTPCodecCapability, params.HeaderExtensions, + params.Codecs, ) readStream, interceptor, rtcpReadStream, rtcpInterceptor, err := pc.dtlsTransport.streamsForSSRC(ssrc, *streamInfo) if err != nil { diff --git a/rtpreceiver.go b/rtpreceiver.go index 0e85b2d958c..6527ff2a441 100644 --- a/rtpreceiver.go +++ b/rtpreceiver.go @@ -224,6 +224,7 @@ func (r *RTPReceiver) startReceive(parameters RTPReceiveParameters) error { //no 0, 0, 0, 0, 0, codec, globalParams.HeaderExtensions, + globalParams.Codecs, ) var err error @@ -233,7 +234,7 @@ func (r *RTPReceiver) startReceive(parameters RTPReceiveParameters) error { //no } if rtxSsrc := parameters.Encodings[i].RTX.SSRC; rtxSsrc != 0 { - streamInfo := createStreamInfo("", rtxSsrc, 0, 0, 0, 0, 0, codec, globalParams.HeaderExtensions) + streamInfo := createStreamInfo("", rtxSsrc, 0, 0, 0, 0, 0, codec, globalParams.HeaderExtensions, globalParams.Codecs) rtpReadStream, rtpInterceptor, rtcpReadStream, rtcpInterceptor, err := r.transport.streamsForSSRC( rtxSsrc, *streamInfo, diff --git a/rtpsender.go b/rtpsender.go index dd4107ecbc0..e70d3cd9754 100644 --- a/rtpsender.go +++ b/rtpsender.go @@ -358,6 +358,7 @@ func (r *RTPSender) Send(parameters RTPSendParameters) error { findFECPayloadType(rtpParameters.Codecs), codec.RTPCodecCapability, parameters.HeaderExtensions, + parameters.Codecs, ) rtpInterceptor := r.api.interceptor.BindLocalStream( From 801af074064ca8f5aa7cc8aaa04ab5e966ba2665 Mon Sep 17 00:00:00 2001 From: "al.bash" Date: Tue, 8 Jul 2025 17:00:20 +0300 Subject: [PATCH 2/2] Add only necessary information to StreamInfo --- interceptor.go | 18 +++--------------- interceptor_test.go | 20 ++++---------------- 2 files changed, 7 insertions(+), 31 deletions(-) diff --git a/interceptor.go b/interceptor.go index 40a5f9fe848..0695538e7e5 100644 --- a/interceptor.go +++ b/interceptor.go @@ -234,21 +234,9 @@ func createStreamInfo( feedbacks = append(feedbacks, interceptor.RTCPFeedback{Type: f.Type, Parameter: f.Parameter}) } - streamCodecs := make([]interceptor.RTPCodecParameters, 0, len(codecs)) + payloadToMimeType := make(map[uint8]string) for _, c := range codecs { - feedbacks := make([]interceptor.RTCPFeedback, 0, len(c.RTCPFeedback)) - for _, f := range c.RTCPFeedback { - feedbacks = append(feedbacks, interceptor.RTCPFeedback{Type: f.Type, Parameter: f.Parameter}) - } - - streamCodecs = append(streamCodecs, interceptor.RTPCodecParameters{ - MimeType: c.MimeType, - ClockRate: c.ClockRate, - Channels: c.Channels, - SDPFmtpLine: c.SDPFmtpLine, - RTCPFeedback: feedbacks, - PayloadType: uint8(c.PayloadType), - }) + payloadToMimeType[uint8(c.PayloadType)] = c.MimeType } return &interceptor.StreamInfo{ @@ -266,6 +254,6 @@ func createStreamInfo( Channels: codec.Channels, SDPFmtpLine: codec.SDPFmtpLine, RTCPFeedback: feedbacks, - Codecs: streamCodecs, + PayloadToMimeType: payloadToMimeType, } } diff --git a/interceptor_test.go b/interceptor_test.go index dc0afec7c1f..241135e3ae8 100644 --- a/interceptor_test.go +++ b/interceptor_test.go @@ -55,21 +55,9 @@ func TestPeerConnection_Interceptor(t *testing.T) { assert.NoError(t, err) } - streamInfoCodecs := make([]interceptor.RTPCodecParameters, 0, len(videoCodecs)) + payloadToMimeType := make(map[uint8]string) for _, c := range videoCodecs { - feedbacks := make([]interceptor.RTCPFeedback, 0, len(c.RTCPFeedback)) - for _, f := range c.RTCPFeedback { - feedbacks = append(feedbacks, interceptor.RTCPFeedback{Type: f.Type, Parameter: f.Parameter}) - } - - streamInfoCodecs = append(streamInfoCodecs, interceptor.RTPCodecParameters{ - MimeType: c.MimeType, - ClockRate: c.ClockRate, - Channels: c.Channels, - SDPFmtpLine: c.SDPFmtpLine, - RTCPFeedback: feedbacks, - PayloadType: uint8(c.PayloadType), - }) + payloadToMimeType[uint8(c.PayloadType)] = c.MimeType } ir := &interceptor.Registry{} @@ -77,7 +65,7 @@ func TestPeerConnection_Interceptor(t *testing.T) { NewInterceptorFn: func(_ string) (interceptor.Interceptor, error) { return &mock_interceptor.Interceptor{ BindLocalStreamFn: func(streamInfo *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter { - assert.Equal(t, streamInfoCodecs, streamInfo.Codecs) + assert.Equal(t, payloadToMimeType, streamInfo.PayloadToMimeType) return interceptor.RTPWriterFunc( func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { // set extension on outgoing packet @@ -90,7 +78,7 @@ func TestPeerConnection_Interceptor(t *testing.T) { ) }, BindRemoteStreamFn: func(streamInfo *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader { - assert.Equal(t, streamInfoCodecs, streamInfo.Codecs) + assert.Equal(t, payloadToMimeType, streamInfo.PayloadToMimeType) return interceptor.RTPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) { if a == nil { a = interceptor.Attributes{}