diff --git a/pkg/router.go b/pkg/router.go index 178fbd954..654cba975 100644 --- a/pkg/router.go +++ b/pkg/router.go @@ -202,6 +202,15 @@ func (r *router) addSender(p *WebRTCTransport, rr *receiverRouter) error { log.Errorf("Error closing sender: %s", err) } }) + for _, t := range p.pc.GetTransceivers() { + if t.Sender() != nil && t.Sender().Track().SSRC() == ssrc { + p.pendingSenders.PushBack(&pendingSender{ + transceiver: t, + sender: sender, + }) + break + } + } p.AddSender(rr.stream, sender) recv.AddSender(sender) return nil diff --git a/pkg/webrtctransport.go b/pkg/webrtctransport.go index 1ed2d5e69..5d055444f 100644 --- a/pkg/webrtctransport.go +++ b/pkg/webrtctransport.go @@ -7,6 +7,8 @@ import ( "sync" "time" + "github.com/gammazero/deque" + "github.com/bep/debounce" "github.com/lucsky/cuid" log "github.com/pion/ion-log" @@ -24,21 +26,26 @@ type WebRTCTransportConfig struct { // WebRTCTransport represents a sfu peer connection type WebRTCTransport struct { id string - ctx context.Context - cancel context.CancelFunc pc *webrtc.PeerConnection me MediaEngine mu sync.RWMutex - candidates []webrtc.ICECandidateInit + ctx context.Context + cancel context.CancelFunc + router Router session *Session - mids map[string]Sender senders map[string][]Sender - router Router + candidates []webrtc.ICECandidateInit + pendingSenders deque.Deque onTrackHandler func(*webrtc.Track, *webrtc.RTPReceiver) subOnce sync.Once } +type pendingSender struct { + transceiver *webrtc.RTPTransceiver + sender Sender +} + // NewWebRTCTransport creates a new WebRTCTransport func NewWebRTCTransport(ctx context.Context, session *Session, me MediaEngine, cfg WebRTCTransportConfig) (*WebRTCTransport, error) { api := webrtc.NewAPI(webrtc.WithMediaEngine(me.MediaEngine), webrtc.WithSettingEngine(cfg.setting)) @@ -59,9 +66,9 @@ func NewWebRTCTransport(ctx context.Context, session *Session, me MediaEngine, c me: me, session: session, router: newRouter(pc, id, cfg.router), - mids: make(map[string]Sender), senders: make(map[string][]Sender), } + p.pendingSenders.SetMinCapacity(2) // Add transport to the session session.AddTransport(p) @@ -124,58 +131,12 @@ func NewWebRTCTransport(ctx context.Context, session *Session, me MediaEngine, c // CreateOffer generates the localDescription func (p *WebRTCTransport) CreateOffer() (webrtc.SessionDescription, error) { - offer, err := p.pc.CreateOffer(nil) - if err != nil { - return webrtc.SessionDescription{}, err - } - parsed := sdp.SessionDescription{} - if err := parsed.Unmarshal([]byte(offer.SDP)); err == nil { - for _, md := range parsed.MediaDescriptions { - if md.MediaName.Media != mediaNameAudio && md.MediaName.Media != mediaNameVideo { - continue - } - var msid, mid string - - for _, att := range md.Attributes { - switch att.Key { - case sdp.AttrKeyMID: - mid = att.Value - if len(msid) > 0 { - break - } - case sdp.AttrKeyMsid: - msid = att.Value - if len(mid) > 0 { - break - } - } - } - if len(msid) > 0 && len(mid) > 0 { - split := strings.Split(msid, " ") - sid := split[0] - tid := split[1] - // find sender for mid - for _, sender := range p.senders[sid] { - if sender.Track().ID() == tid { - p.mids[mid] = sender - } - } - } - } - } - - return offer, nil + return p.pc.CreateOffer(nil) } // SetLocalDescription sets the SessionDescription of the remote peer func (p *WebRTCTransport) SetLocalDescription(desc webrtc.SessionDescription) error { - err := p.pc.SetLocalDescription(desc) - if err != nil { - log.Errorf("SetLocalDescription error: %v", err) - return err - } - - return nil + return p.pc.SetLocalDescription(desc) } // CreateAnswer generates the localDescription @@ -212,42 +173,50 @@ func (p *WebRTCTransport) SetRemoteDescription(desc webrtc.SessionDescription) e p.candidates = nil } - for _, md := range pd.MediaDescriptions { - if md.MediaName.Media != mediaNameAudio && md.MediaName.Media != mediaNameVideo { - continue - } - var ( - ext int - id string - ) - - for _, att := range md.Attributes { - if att.Key == sdp.AttrKeyMID { - if p.mids[att.Value] != nil { - p.mids[att.Value].Start() - // remove mid mapping in case transceiver is reused later - p.mids[att.Value] = nil + switch desc.Type { + case webrtc.SDPTypeAnswer: + if p.pendingSenders.Len() != 0 { + for _, md := range pd.MediaDescriptions { + if mid, ok := md.Attribute(sdp.AttrKeyMID); ok { + for i := 0; i < p.pendingSenders.Len(); i++ { + ps := p.pendingSenders.PopFront().(*pendingSender) + if ps.transceiver.Mid() == mid { + ps.sender.Start() + } else { + p.pendingSenders.PushBack(ps) + } + } } } - - if att.Key == sdp.AttrKeyExtMap && strings.HasSuffix(att.Value, sdp.TransportCCURI) { - ext, _ = strconv.Atoi(att.Value[:1]) - if len(id) > 0 { - break - } + } + case webrtc.SDPTypeOffer: + for _, md := range pd.MediaDescriptions { + if md.MediaName.Media != mediaNameAudio && md.MediaName.Media != mediaNameVideo { + continue } - if att.Key == sdp.AttrKeyMsid { - v := strings.Split(att.Value, " ") - id = v[len(v)-1] - if ext != 0 { - break + var ( + ext int + id string + ) + for _, att := range md.Attributes { + if att.Key == sdp.AttrKeyExtMap && strings.HasSuffix(att.Value, sdp.TransportCCURI) { + ext, _ = strconv.Atoi(att.Value[:1]) + if len(id) > 0 { + break + } + } + if att.Key == sdp.AttrKeyMsid { + v := strings.Split(att.Value, " ") + id = v[len(v)-1] + if ext != 0 { + break + } } } - } - p.router.AddTWCCExt(id, ext) + p.router.AddTWCCExt(id, ext) + } } - return nil }