From baa3e85988173eb31bb5946fb94ea16f940e4b6b Mon Sep 17 00:00:00 2001 From: adwpc Date: Thu, 18 Feb 2021 01:48:35 +0800 Subject: [PATCH] add mediaengine; add ion-load-tool; fix to do more than 14*14 load testing --- client.go | 76 ++++++++++----------- conf.go | 1 + engine.go | 4 +- example/ion-sfu-load-tool/main.go | 98 +++++++++++++++++++++++++++ go.mod | 3 +- go.sum | 19 +++--- mediaengine.go | 107 ++++++++++++++++++++++++++++++ signal.go | 57 ++++++++-------- transport.go | 13 ++-- webmproducer.go | 8 ++- 10 files changed, 303 insertions(+), 83 deletions(-) create mode 100644 example/ion-sfu-load-tool/main.go create mode 100644 mediaengine.go diff --git a/client.go b/client.go index c74c595c..18ce1c02 100644 --- a/client.go +++ b/client.go @@ -53,7 +53,7 @@ type Client struct { func NewClient(addr, id string, cfg WebRTCTransportConfig) *Client { c := &Client{ ID: id, - signal: NewSignal(addr), + signal: NewSignal(addr, id), cfg: cfg, notify: make(chan struct{}), remoteStreamId: make(map[string]string), @@ -80,17 +80,17 @@ func NewClient(addr, id string, cfg WebRTCTransportConfig) *Client { func (c *Client) SetRemoteSDP(sdp webrtc.SessionDescription) error { err := c.pub.pc.SetRemoteDescription(sdp) if err != nil { - log.Errorf("err=%v", err) + log.Errorf("id=%v err=%v", c.ID, err) return err } // it's safe to add cand now after SetRemoteDescription if len(c.pub.RecvCandidates) > 0 { for _, candidate := range c.pub.RecvCandidates { - log.Infof("c.pub.pc.AddICECandidate candidate=%v", candidate) + log.Debugf("id=%v c.pub.pc.AddICECandidate candidate=%v", c.ID, candidate) err = c.pub.pc.AddICECandidate(candidate) if err != nil { - log.Errorf("c.pub.pc.AddICECandidate err=%v", err) + log.Errorf("id=%v c.pub.pc.AddICECandidate err=%v", c.ID, err) } } c.pub.RecvCandidates = []webrtc.ICECandidateInit{} @@ -99,7 +99,7 @@ func (c *Client) SetRemoteSDP(sdp webrtc.SessionDescription) error { // it's safe to send cand now after join ok if len(c.pub.SendCandidates) > 0 { for _, cand := range c.pub.SendCandidates { - log.Infof("sending c.pub.SendCandidates cand=%v", cand) + log.Debugf("id=%v sending c.pub.SendCandidates cand=%v", c.ID, cand) c.signal.Trickle(cand, PUBLISHER) } c.pub.SendCandidates = []*webrtc.ICECandidate{} @@ -109,12 +109,12 @@ func (c *Client) SetRemoteSDP(sdp webrtc.SessionDescription) error { // Join client join a session func (c *Client) Join(sid string) error { - log.Infof("[Client.Join] sid=%v", sid) + log.Debugf("[Client.Join] sid=%v id=%v", sid, c.ID) c.sub.pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { - log.Infof("[c.sub.pc.OnTrack] got track streamId=%v kind=%v ssrc=%v ", track.StreamID(), track.Kind(), track.SSRC()) + log.Debugf("[c.sub.pc.OnTrack] got track streamId=%v kind=%v ssrc=%v ", track.StreamID(), track.Kind(), track.SSRC()) c.streamLock.Lock() c.remoteStreamId[track.StreamID()] = track.StreamID() - log.Infof("c.remoteStreamId=%+v", c.remoteStreamId) + log.Debugf("id=%v len(c.remoteStreamId)=%+v", c.ID, len(c.remoteStreamId)) c.streamLock.Unlock() // user define if c.OnTrack != nil { @@ -129,10 +129,10 @@ func (c *Client) Join(sid string) error { pkt, _, err := track.ReadRTP() if err != nil { if err == io.EOF { - log.Errorf("track.ReadRTP err=%v", err) + log.Errorf("id=%v track.ReadRTP err=%v", c.ID, err) return } - log.Errorf("Error reading track rtp %s", err) + log.Errorf("id=%v Error reading track rtp %s", c.ID, err) continue } c.recvByte += len(pkt.Raw) @@ -142,22 +142,22 @@ func (c *Client) Join(sid string) error { }) c.sub.pc.OnDataChannel(func(dc *webrtc.DataChannel) { - log.Infof("[c.sub.pc.OnDataChannel] got dc %v", dc.Label()) + log.Debugf("id=%v [c.sub.pc.OnDataChannel] got dc %v", c.ID, dc.Label()) if dc.Label() == API_CHANNEL { - log.Infof("got dc %v", dc.Label()) + log.Debugf("%v got dc %v", c.ID, dc.Label()) c.sub.api = dc // send cmd after open c.sub.api.OnOpen(func() { if len(c.apiQueue) > 0 { for _, cmd := range c.apiQueue { - log.Infof("c.sub.api.OnOpen send cmd=%v", cmd) + log.Debugf("%v c.sub.api.OnOpen send cmd=%v", c.ID, cmd) marshalled, err := json.Marshal(cmd) if err != nil { continue } err = c.sub.api.Send(marshalled) if err != nil { - log.Errorf("err=%v", err) + log.Errorf("id=%v err=%v", c.ID, err) } time.Sleep(time.Millisecond * 10) } @@ -166,7 +166,7 @@ func (c *Client) Join(sid string) error { }) return } - log.Infof("got dc %v", dc.Label()) + log.Debugf("%v got dc %v", c.ID, dc.Label()) if c.OnDataChannel != nil { c.OnDataChannel(dc) } @@ -212,7 +212,7 @@ func (c *Client) UnPublish(t *webrtc.RTPTransceiver) error { // Close client close func (c *Client) Close() { - log.Infof("c=%v", c) + log.Debugf("id=%v", c.ID) close(c.notify) if c.pub != nil { c.pub.pc.Close() @@ -224,13 +224,13 @@ func (c *Client) Close() { // CreateDataChannel create a custom datachannel func (c *Client) CreateDataChannel(label string) (*webrtc.DataChannel, error) { - log.Infof("c=%v", c) + log.Debugf("id=%v CreateDataChannel %v", c.ID, label) return c.pub.pc.CreateDataChannel(label, &webrtc.DataChannelInit{}) } // Trickle receive candidate from sfu and add to pc func (c *Client) Trickle(candidate webrtc.ICECandidateInit, target int) { - log.Infof("candidate=%v target=%v", candidate, target) + log.Debugf("id=%v candidate=%v target=%v", c.ID, candidate, target) var t *Transport if target == SUBSCRIBER { t = c.sub @@ -243,7 +243,7 @@ func (c *Client) Trickle(candidate webrtc.ICECandidateInit, target int) { } else { err := t.pc.AddICECandidate(candidate) if err != nil { - log.Errorf("err=%v", err) + log.Errorf("id=%v err=%v", c.ID, err) } } @@ -251,18 +251,18 @@ func (c *Client) Trickle(candidate webrtc.ICECandidateInit, target int) { // Negotiate sub negotiate func (c *Client) Negotiate(sdp webrtc.SessionDescription) error { - log.Infof("Negotiate sdp=%v", sdp) + log.Debugf("id=%v Negotiate sdp=%v", c.ID, sdp) // 1.sub set remote sdp err := c.sub.pc.SetRemoteDescription(sdp) if err != nil { - log.Errorf("Negotiate c.sub.pc.SetRemoteDescription err=%v", err) + log.Errorf("id=%v Negotiate c.sub.pc.SetRemoteDescription err=%v", c.ID, err) return err } // 2. safe to send candiate to sfu after join ok if len(c.sub.SendCandidates) > 0 { for _, cand := range c.sub.SendCandidates { - log.Infof("send sub.SendCandidates c.signal.Trickle cand=%v", cand) + log.Debugf("id=%v send sub.SendCandidates c.ID, c.signal.Trickle cand=%v", c.ID, cand) c.signal.Trickle(cand, SUBSCRIBER) } c.sub.SendCandidates = []*webrtc.ICECandidate{} @@ -271,7 +271,7 @@ func (c *Client) Negotiate(sdp webrtc.SessionDescription) error { // 3. safe to add candidate after SetRemoteDescription if len(c.sub.RecvCandidates) > 0 { for _, candidate := range c.sub.RecvCandidates { - log.Infof("Negotiate c.sub.pc.AddICECandidate candidate=%v", candidate) + log.Debugf("id=%v Negotiate c.sub.pc.AddICECandidate candidate=%v", c.ID, candidate) _ = c.sub.pc.AddICECandidate(candidate) } c.sub.RecvCandidates = []webrtc.ICECandidateInit{} @@ -280,14 +280,14 @@ func (c *Client) Negotiate(sdp webrtc.SessionDescription) error { // 4. create answer after add ice candidate answer, err := c.sub.pc.CreateAnswer(nil) if err != nil { - log.Errorf("err=%v", err) + log.Errorf("id=%v err=%v", c.ID, err) return err } // 5. set local sdp(answer) err = c.sub.pc.SetLocalDescription(answer) if err != nil { - log.Errorf("err=%v", err) + log.Errorf("id=%v err=%v", c.ID, err) return err } @@ -302,23 +302,23 @@ func (c *Client) OnNegotiationNeeded() { // 1. pub create offer offer, err := c.pub.pc.CreateOffer(nil) if err != nil { - log.Errorf("err=%v", err) + log.Errorf("id=%v err=%v", c.ID, err) } // 2. pub set local sdp(offer) err = c.pub.pc.SetLocalDescription(offer) if err != nil { - log.Errorf("err=%v", err) + log.Errorf("id=%v err=%v", c.ID, err) } - log.Infof("OnNegotiationNeeded!! c.pub.pc.CreateOffer and send offer=%v", offer) + log.Debugf("id=%v OnNegotiationNeeded!! c.pub.pc.CreateOffer and send offer=%v", c.ID, offer) //3. send offer to sfu c.signal.Offer(offer) } // selectRemote select remote video/audio func (c *Client) selectRemote(streamId, video string, audio bool) error { - log.Infof("streamId=%v video=%v audio=%v", streamId, video, audio) + log.Debugf("id=%v streamId=%v video=%v audio=%v", c.ID, streamId, video, audio) call := Call{ StreamID: streamId, Video: video, @@ -327,7 +327,7 @@ func (c *Client) selectRemote(streamId, video string, audio bool) error { // cache cmd when dc not ready if c.sub.api == nil || c.sub.api.ReadyState() != webrtc.DataChannelStateOpen { - log.Infof("append to c.apiQueue call=%v", call) + log.Debugf("id=%v append to c.apiQueue call=%v", c.ID, call) c.apiQueue = append(c.apiQueue, call) return nil } @@ -335,7 +335,7 @@ func (c *Client) selectRemote(streamId, video string, audio bool) error { // send cached cmd if len(c.apiQueue) > 0 { for _, cmd := range c.apiQueue { - log.Infof("c.sub.api.Send cmd=%v", cmd) + log.Debugf("id=%v c.sub.api.Send cmd=%v", c.ID, cmd) marshalled, err := json.Marshal(cmd) if err != nil { continue @@ -350,14 +350,14 @@ func (c *Client) selectRemote(streamId, video string, audio bool) error { } // send this cmd - log.Infof("c.sub.api.Send call=%v", call) + log.Debugf("id=%v c.sub.api.Send call=%v", c.ID, call) marshalled, err := json.Marshal(call) if err != nil { return err } err = c.sub.api.Send(marshalled) if err != nil { - log.Errorf("err=%v", err) + log.Errorf("id=%v err=%v", c.ID, err) } return err } @@ -368,7 +368,7 @@ func (c *Client) UnSubscribeAll() { m := c.remoteStreamId c.streamLock.RUnlock() for streamId := range m { - log.Infof("UnSubscribe remote streamid=%v", streamId) + log.Debugf("id=%v UnSubscribe remote streamid=%v", c.ID, streamId) c.selectRemote(streamId, "none", false) } } @@ -379,7 +379,7 @@ func (c *Client) SubscribeAll(video string, audio bool) { m := c.remoteStreamId c.streamLock.RUnlock() for streamId := range m { - log.Infof("Subscribe remote streamid=%v", streamId) + log.Debugf("id=%v Subscribe remote streamid=%v", c.ID, streamId) c.selectRemote(streamId, video, audio) } } @@ -389,18 +389,18 @@ func (c *Client) PublishWebm(file string) error { ext := filepath.Ext(file) switch ext { case ".webm": - c.producer = NewWebMProducer(file, 0) + c.producer = NewWebMProducer(c.ID, file, 0) default: return errInvalidFile } _, err := c.producer.AddTrack(c.pub.pc, "video") if err != nil { - log.Infof("err=%v", err) + log.Debugf("err=%v", err) return err } _, err = c.producer.AddTrack(c.pub.pc, "audio") if err != nil { - log.Infof("err=%v", err) + log.Debugf("err=%v", err) return err } c.producer.Start() diff --git a/conf.go b/conf.go index 7ce1947f..37bb28ac 100644 --- a/conf.go +++ b/conf.go @@ -14,6 +14,7 @@ type Config struct { // WebRTCTransportConfig represents configuration options type WebRTCTransportConfig struct { + VideoMime string Configuration webrtc.Configuration Setting webrtc.SettingEngine } diff --git a/engine.go b/engine.go index a4019f14..1fceb898 100644 --- a/engine.go +++ b/engine.go @@ -17,13 +17,13 @@ type Engine struct { } // NewEngine create a engine -func NewEngine(addr string, cfg Config) *Engine { +func NewEngine(cfg Config) *Engine { e := &Engine{ clients: make(map[string]map[string]*Client), } e.cfg = cfg - fixByFile := []string{"asm_amd64.s", "proc.go", "icegatherer.go"} + fixByFile := []string{"asm_amd64.s", "proc.go", "icegatherer.go", "client.go", "signal.go"} fixByFunc := []string{} log.Init(cfg.Log.Level, fixByFile, fixByFunc) diff --git a/example/ion-sfu-load-tool/main.go b/example/ion-sfu-load-tool/main.go new file mode 100644 index 00000000..17e50bc5 --- /dev/null +++ b/example/ion-sfu-load-tool/main.go @@ -0,0 +1,98 @@ +package main + +import ( + "flag" + "fmt" + "time" + + log "github.com/pion/ion-log" + sdk "github.com/pion/ion-sdk-go" + "github.com/pion/webrtc/v3" +) + +func run(e *sdk.Engine, addr, session, file, role string, total, duration, cycle int) { + log.Infof("run session=%v file=%v role=%v total=%v duration=%v cycle=%v\n", session, file, role, total, duration, cycle) + timer := time.NewTimer(time.Duration(duration) * time.Second) + + go e.Stats(3) + for i := 0; i < total; i++ { + switch role { + case "pubsub": + cid := fmt.Sprintf("%s_pubsub_%d", session, i) + log.Infof("AddClient session=%v clientid=%v", session, cid) + c := e.AddClient(addr, session, cid) + if c == nil { + log.Errorf("c==nil") + break + } + c.Join(session) + c.PublishWebm(file) + case "sub": + cid := fmt.Sprintf("%s_sub_%d", session, i) + log.Infof("AddClient session=%v clientid=%v", session, cid) + c := e.AddClient(addr, session, cid) + if c == nil { + log.Errorf("c==nil") + break + } + c.Join(session) + default: + log.Errorf("invalid role! should be pubsub/sub") + } + + time.Sleep(time.Millisecond * time.Duration(cycle)) + } + + select { + case <-timer.C: + } +} + +func main() { + //init log + fixByFile := []string{"asm_amd64.s", "proc.go", "icegatherer.go"} + fixByFunc := []string{"AddProducer", "NewClient"} + + //get args + var session string + var addr, file string + var total, cycle, duration int + var role string + var loglevel string + // var video, audio bool + + flag.StringVar(&file, "file", "./file.webm", "Path to the file media") + flag.StringVar(&addr, "addr", "localhost:50051", "Ion-sfu grpc addr") + flag.StringVar(&session, "session", "test", "join session name") + flag.IntVar(&total, "clients", 1, "Number of clients to start") + flag.IntVar(&cycle, "cycle", 1000, "Run new client cycle in ms") + flag.IntVar(&duration, "duration", 3600, "Running duration in sencond") + flag.StringVar(&role, "role", "pubsub", "Run as pubsub/sub") + flag.StringVar(&loglevel, "log", "info", "Log level") + // flag.BoolVar(&video, "video", true, "Publish video stream from webm file") + // flag.BoolVar(&audio, "audio", true, "Publish audio stream from webm file") + flag.Parse() + log.Init(loglevel, fixByFile, fixByFunc) + + se := webrtc.SettingEngine{} + se.SetEphemeralUDPPortRange(10000, 15000) + webrtcCfg := webrtc.Configuration{ + ICEServers: []webrtc.ICEServer{ + webrtc.ICEServer{ + URLs: []string{"stun:stun.stunprotocol.org:3478", "stun:stun.l.google.com:19302"}, + }, + }, + } + config := sdk.Config{ + Log: log.Config{ + Level: loglevel, + }, + WebRTC: sdk.WebRTCTransportConfig{ + VideoMime: "video/vp8", + Setting: se, + Configuration: webrtcCfg, + }, + } + e := sdk.NewEngine(config) + run(e, addr, session, file, role, total, duration, cycle) +} diff --git a/go.mod b/go.mod index 4df64b2a..d30f93db 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/pion/ice/v2 v2.0.15 github.com/pion/ion-log v1.0.0 github.com/pion/ion-sfu v1.9.1-0.20210212123851-331f8471352c - github.com/pion/webrtc/v3 v3.0.4 + github.com/pion/sdp/v3 v3.0.4 + github.com/pion/webrtc/v3 v3.0.11 google.golang.org/grpc v1.35.0 ) diff --git a/go.sum b/go.sum index c6a96d45..54ee5bc3 100644 --- a/go.sum +++ b/go.sum @@ -136,7 +136,6 @@ github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OI github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.5 h1:kxhtnfFVi+rYdOALN0B3k9UT86zVJKfBimRaciULW4I= github.com/google/uuid v1.1.5/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -265,9 +264,10 @@ github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0 github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pion/datachannel v1.4.21 h1:3ZvhNyfmxsAqltQrApLPQMhSFNA+aT87RqyCq4OXmf0= github.com/pion/datachannel v1.4.21/go.mod h1:oiNyP4gHx2DIwRzX/MFyH0Rz/Gz05OgBlayAI2hAWjg= -github.com/pion/dtls/v2 v2.0.4 h1:WuUcqi6oYMu/noNTz92QrF1DaFj4eXbhQ6dzaaAwOiI= github.com/pion/dtls/v2 v2.0.4/go.mod h1:qAkFscX0ZHoI1E07RfYPoRw3manThveu+mlTDdOxoGI= -github.com/pion/ice/v2 v2.0.14 h1:FxXxauyykf89SWAtkQCfnHkno6G8+bhRkNguSh9zU+4= +github.com/pion/dtls/v2 v2.0.7 h1:PNcUs/G1l9hb4jzMEorgFMxIBdp7fRN4LIApOTMtCYs= +github.com/pion/dtls/v2 v2.0.7/go.mod h1:QuDII+8FVvk9Dp5t5vYIMTo7hh7uBkra+8QIm7QGm10= +github.com/pion/ice v0.7.18 h1:KbAWlzWRUdX9SmehBh3gYpIFsirjhSQsCw6K2MjYMK0= github.com/pion/ice/v2 v2.0.14/go.mod h1:wqaUbOq5ObDNU5ox1hRsEst0rWfsKuH1zXjQFEWiZwM= github.com/pion/ice/v2 v2.0.15 h1:KZrwa2ciL9od8+TUVJiYTNsCW9J5lktBjGwW1MacEnQ= github.com/pion/ice/v2 v2.0.15/go.mod h1:ZIiVGevpgAxF/cXiIVmuIUtCb3Xs4gCzCbXB6+nFkSI= @@ -275,8 +275,8 @@ github.com/pion/interceptor v0.0.9 h1:fk5hTdyLO3KURQsf/+RjMpEm4NE3yeTY9Kh97b5Bvw github.com/pion/interceptor v0.0.9/go.mod h1:dHgEP5dtxOTf21MObuBAjJeAayPxLUAZjerGH8Xr07c= github.com/pion/ion-log v1.0.0 h1:2lJLImCmfCWCR38hLWsjQfBWe6NFz/htbqiYHwvOP/Q= github.com/pion/ion-log v1.0.0/go.mod h1:jwcla9KoB9bB/4FxYDSRJPcPYSLp5XiUUMnOLaqwl4E= -github.com/pion/ion-sfu v1.7.8 h1:5DBi3m1eZVOQBcJiV7GPOWLHY56LpDhIBRa8ZmMhz/k= -github.com/pion/ion-sfu v1.7.8/go.mod h1:3VcBm7RSsSzB09HH4g5ka63pVvx7UHI4vL2XNFt1jA8= +github.com/pion/ion-sfu v1.9.0 h1:3bgaTS1AyGvBmrU8R8nt/qQRguhOv3aGgaTEKUFAoUQ= +github.com/pion/ion-sfu v1.9.0/go.mod h1:MKDh4JSLpBA/hFeYAYALb6nYpe6Ruknd3pmxPMbdpM8= github.com/pion/ion-sfu v1.9.1-0.20210212123851-331f8471352c h1:+tg7My3x1JEKhDWNo+o3aza7bBlkKI2v3LdhZnIrCeA= github.com/pion/ion-sfu v1.9.1-0.20210212123851-331f8471352c/go.mod h1:MKDh4JSLpBA/hFeYAYALb6nYpe6Ruknd3pmxPMbdpM8= github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= @@ -292,7 +292,7 @@ github.com/pion/rtp v1.6.2/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko github.com/pion/sctp v1.7.10/go.mod h1:EhpTUQu1/lcK3xI+eriS6/96fWetHGCvBi9MSsnaBN0= github.com/pion/sctp v1.7.11 h1:UCnj7MsobLKLuP/Hh+JMiI/6W5Bs/VF45lWKgHFjSIE= github.com/pion/sctp v1.7.11/go.mod h1:EhpTUQu1/lcK3xI+eriS6/96fWetHGCvBi9MSsnaBN0= -github.com/pion/sdp/v3 v3.0.3/go.mod h1:bNiSknmJE0HYBprTHXKPQ3+JjacTv5uap92ueJZKsRk= +github.com/pion/sdp v1.3.0 h1:21lpgEILHyolpsIrbCBagZaAPj4o057cFjzaFebkVOs= github.com/pion/sdp/v3 v3.0.4 h1:2Kf+dgrzJflNCSw3TV5v2VLeI0s/qkzy2r5jlR0wzf8= github.com/pion/sdp/v3 v3.0.4/go.mod h1:bNiSknmJE0HYBprTHXKPQ3+JjacTv5uap92ueJZKsRk= github.com/pion/srtp/v2 v2.0.1 h1:kgfh65ob3EcnFYA4kUBvU/menCp9u7qaJLXwWgpobzs= @@ -311,9 +311,9 @@ github.com/pion/turn/v2 v2.0.5/go.mod h1:APg43CFyt/14Uy7heYUOGWdkem/Wu4PhCO/bjyr github.com/pion/udp v0.1.0 h1:uGxQsNyrqG3GLINv36Ff60covYmfrLoxzwnCsIYspXI= github.com/pion/udp v0.1.0/go.mod h1:BPELIjbwE9PRbd/zxI/KYBnbo7B6+oA6YuEaNE8lths= github.com/pion/webrtc v1.2.0 h1:3LGGPQEMacwG2hcDfhdvwQPz315gvjZXOfY4vaF4+I4= -github.com/pion/webrtc/v3 v3.0.3/go.mod h1:DZonLDfkjMlsY/IGixAbcq8izHu0zJIk04DYx51KUvk= -github.com/pion/webrtc/v3 v3.0.4 h1:Tiw3H9fpfcwkvaxonB+Gv1DG9tmgYBQaM1vBagDHP40= github.com/pion/webrtc/v3 v3.0.4/go.mod h1:1TmFSLpPYFTFXFHPtoq9eGP1ASTa9LC6FBh7sUY8cd4= +github.com/pion/webrtc/v3 v3.0.11 h1:RIxUbkWJn6YvLVmHZSzc30yQLyME5vGDkpqrV7EHxz4= +github.com/pion/webrtc/v3 v3.0.11/go.mod h1:WEvXneGTeqNmiR59v5jTsxMc4yXQyOQcRsrdAbNwSEU= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -466,8 +466,9 @@ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81R golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201201195509-5d6afe98e0b7/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.0.0-20201224014010-6772e930b67b h1:iFwSg7t5GZmB/Q5TjiEAsdoLDrdJRC1RiF2WhuV29Qw= golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20210119194325-5f4716e94777 h1:003p0dJM77cxMSyCPFphvZf/Y5/NXf5fzg6ufd1/Oew= +golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= diff --git a/mediaengine.go b/mediaengine.go new file mode 100644 index 00000000..fb7a43c8 --- /dev/null +++ b/mediaengine.go @@ -0,0 +1,107 @@ +package engine + +import ( + "github.com/pion/sdp/v3" + "github.com/pion/webrtc/v3" +) + +const ( + mimeTypeH264 = "video/h264" + mimeTypeOpus = "audio/opus" + mimeTypeVP8 = "video/vp8" + mimeTypeVP9 = "video/vp9" +) + +var ( + videoRTCPFeedback = []webrtc.RTCPFeedback{{"goog-remb", ""}, {"ccm", "fir"}, {"nack", ""}, {"nack", "pli"}} + videoRTPCodecParameters = []webrtc.RTPCodecParameters{ + { + RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: mimeTypeVP8, ClockRate: 90000, RTCPFeedback: videoRTCPFeedback}, + PayloadType: 96, + }, + { + RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: mimeTypeVP9, ClockRate: 90000, SDPFmtpLine: "profile-id=0", RTCPFeedback: videoRTCPFeedback}, + PayloadType: 98, + }, + { + RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: mimeTypeVP9, ClockRate: 90000, SDPFmtpLine: "profile-id=1", RTCPFeedback: videoRTCPFeedback}, + PayloadType: 100, + }, + { + RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: mimeTypeH264, ClockRate: 90000, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f", RTCPFeedback: videoRTCPFeedback}, + PayloadType: 102, + }, + { + RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: mimeTypeH264, ClockRate: 90000, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42001f", RTCPFeedback: videoRTCPFeedback}, + PayloadType: 127, + }, + { + RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: mimeTypeH264, ClockRate: 90000, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f", RTCPFeedback: videoRTCPFeedback}, + PayloadType: 125, + }, + { + RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: mimeTypeH264, ClockRate: 90000, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42e01f", RTCPFeedback: videoRTCPFeedback}, + PayloadType: 108, + }, + { + RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: mimeTypeH264, ClockRate: 90000, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=640032", RTCPFeedback: videoRTCPFeedback}, + PayloadType: 123, + }, + } +) + +const frameMarking = "urn:ietf:params:rtp-hdrext:framemarking" + +func getPublisherMediaEngine(mime string) (*webrtc.MediaEngine, error) { + me := &webrtc.MediaEngine{} + if err := me.RegisterCodec(webrtc.RTPCodecParameters{ + RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: mimeTypeOpus, ClockRate: 48000, Channels: 2, SDPFmtpLine: "minptime=10;useinbandfec=1", RTCPFeedback: nil}, + PayloadType: 111, + }, webrtc.RTPCodecTypeAudio); err != nil { + return nil, err + } + + for _, codec := range videoRTPCodecParameters { + // register all if mime == "" + if mime == "" { + if err := me.RegisterCodec(codec, webrtc.RTPCodecTypeVideo); err != nil { + return nil, err + } + continue + } + // register the chosen mime + if codec.RTPCodecCapability.MimeType == mime { + if err := me.RegisterCodec(codec, webrtc.RTPCodecTypeVideo); err != nil { + return nil, err + } + } + } + + for _, extension := range []string{ + sdp.SDESMidURI, + sdp.SDESRTPStreamIDURI, + sdp.TransportCCURI, + frameMarking, + } { + if err := me.RegisterHeaderExtension(webrtc.RTPHeaderExtensionCapability{URI: extension}, webrtc.RTPCodecTypeVideo); err != nil { + return nil, err + } + } + for _, extension := range []string{ + sdp.SDESMidURI, + sdp.SDESRTPStreamIDURI, + sdp.AudioLevelURI, + } { + if err := me.RegisterHeaderExtension(webrtc.RTPHeaderExtensionCapability{URI: extension}, webrtc.RTPCodecTypeAudio); err != nil { + return nil, err + } + } + + return me, nil +} + +func getSubscriberMediaEngine() (*webrtc.MediaEngine, error) { + me := &webrtc.MediaEngine{} + me.RegisterDefaultCodecs() + return me, nil +} diff --git a/signal.go b/signal.go index 4dd304d8..5aae6bb6 100644 --- a/signal.go +++ b/signal.go @@ -5,6 +5,7 @@ import ( "encoding/json" "io" "sync" + "time" log "github.com/pion/ion-log" pb "github.com/pion/ion-sfu/cmd/signal/grpc/proto" @@ -16,6 +17,7 @@ import ( // Signal is a wrapper of grpc type Signal struct { + id string client pb.SFUClient stream pb.SFU_SignalClient @@ -31,15 +33,18 @@ type Signal struct { } // NewSignal create a grpc signaler -func NewSignal(addr string) *Signal { +func NewSignal(addr, id string) *Signal { s := &Signal{} - log.Infof("Connecting to sfu: %s", addr) + s.id = id // Set up a connection to the sfu server. - conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithBlock()) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) + defer cancel() + conn, err := grpc.DialContext(ctx, addr, grpc.WithInsecure()) if err != nil { - log.Errorf("did not connect: %v", err) + log.Errorf("[%v] Connecting to sfu:%s failed: %v", s.id, addr, err) return nil } + log.Infof("[%v] Connecting to sfu ok: %s", s.id, addr) s.ctx, s.cancel = context.WithCancel(context.Background()) s.client = pb.NewSFUClient(conn) @@ -67,9 +72,9 @@ func (s *Signal) onSignalHandle() error { res, err := s.stream.Recv() if err != nil { if err == io.EOF { - log.Infof("WebRTC Transport Closed") + log.Infof("[%v] WebRTC Transport Closed", s.id) if err := s.stream.CloseSend(); err != nil { - log.Errorf("error sending close: %s", err) + log.Errorf("[%v] error sending close: %s", s.id, err) } return err } @@ -77,55 +82,55 @@ func (s *Signal) onSignalHandle() error { errStatus, _ := status.FromError(err) if errStatus.Code() == codes.Canceled { if err := s.stream.CloseSend(); err != nil { - log.Errorf("error sending close: %s", err) + log.Errorf("[%v] error sending close: %s", s.id, err) } return err } - log.Errorf("Error receiving signal response: %v", err) + log.Errorf("[%v] Error receiving signal response: %v", s.id, err) return err } switch payload := res.Payload.(type) { case *pb.SignalReply_Join: // Set the remote SessionDescription - log.Infof("[join] got answer: %s", payload.Join.Description) + log.Infof("[%v] [join] got answer: %s", s.id, payload.Join.Description) var sdp webrtc.SessionDescription err := json.Unmarshal(payload.Join.Description, &sdp) if err != nil { - log.Errorf("[join] sdp unmarshal error: %v", err) + log.Errorf("[%v] [join] sdp unmarshal error: %v", s.id, err) return err } if err = s.OnSetRemoteSDP(sdp); err != nil { - log.Errorf("[join] s.OnSetRemoteSDP error %s", err) + log.Errorf("[%v] [join] s.OnSetRemoteSDP error %s", s.id, err) return err } case *pb.SignalReply_Description: var sdp webrtc.SessionDescription err := json.Unmarshal(payload.Description, &sdp) if err != nil { - log.Errorf("[description] sdp unmarshal error: %v", err) + log.Errorf("[%v] [description] sdp unmarshal error: %v", s.id, err) return err } if sdp.Type == webrtc.SDPTypeOffer { - log.Infof("[description] got offer call s.OnNegotiate sdp=%+v", sdp) + log.Infof("[%v] [description] got offer call s.OnNegotiate sdp=%+v", s.id, sdp) err := s.OnNegotiate(sdp) if err != nil { log.Errorf("err=%v", err) } } else if sdp.Type == webrtc.SDPTypeAnswer { - log.Infof("[description] got answer call s.OnSetRemoteSDP sdp=%+v", sdp) + log.Infof("[%v] [description] got answer call s.OnSetRemoteSDP sdp=%+v", s.id, sdp) err = s.OnSetRemoteSDP(sdp) if err != nil { - log.Errorf("[description] s.OnSetRemoteSDP err=%s", err) + log.Errorf("[%v] [description] s.OnSetRemoteSDP err=%s", s.id, err) } } case *pb.SignalReply_Trickle: var candidate webrtc.ICECandidateInit _ = json.Unmarshal([]byte(payload.Trickle.Init), &candidate) - log.Infof("[trickle] type=%v candidate=%v", payload.Trickle.Target, candidate) + log.Infof("[%v] [trickle] type=%v candidate=%v", s.id, payload.Trickle.Target, candidate) s.OnTrickle(candidate, int(payload.Trickle.Target)) default: // log.Errorf("Unknow signal type!!!!%v", payload) @@ -134,7 +139,7 @@ func (s *Signal) onSignalHandle() error { } func (s *Signal) Join(sid, uid string, offer webrtc.SessionDescription) error { - log.Infof("[Signal.Join] sid=%v uid=%v, offer=%v", sid, uid, offer) + log.Infof("[%v] [Signal.Join] sid=%v uid=%v, offer=%v", s.id, sid, uid, offer) marshalled, err := json.Marshal(offer) if err != nil { return err @@ -154,13 +159,13 @@ func (s *Signal) Join(sid, uid string, offer webrtc.SessionDescription) error { ) s.Unlock() if err != nil { - log.Errorf("err=%v", err) + log.Errorf("[%v] err=%v", s.id, err) } return err } func (s *Signal) Trickle(candidate *webrtc.ICECandidate, target int) { - log.Infof("[Signal.Trickle] candidate=%v target=%v", candidate, target) + log.Infof("[%v] [Signal.Trickle] candidate=%v target=%v", s.id, candidate, target) bytes, err := json.Marshal(candidate.ToJSON()) if err != nil { log.Errorf("err=%v", err) @@ -178,15 +183,15 @@ func (s *Signal) Trickle(candidate *webrtc.ICECandidate, target int) { }) s.Unlock() if err != nil { - log.Errorf("err=%v", err) + log.Errorf("[%v] err=%v", s.id, err) } } func (s *Signal) Offer(sdp webrtc.SessionDescription) { - log.Infof("[Signal.Offer] sdp=%v", sdp) + log.Infof("[%v] [Signal.Offer] sdp=%v", s.id, sdp) marshalled, err := json.Marshal(sdp) if err != nil { - log.Errorf("err=%v", err) + log.Errorf("[%v] err=%v", s.id, err) return } go s.onSignalHandleOnce() @@ -200,12 +205,12 @@ func (s *Signal) Offer(sdp webrtc.SessionDescription) { ) s.Unlock() if err != nil { - log.Errorf("err=%v", err) + log.Errorf("[%v] err=%v", s.id, err) } } func (s *Signal) Answer(sdp webrtc.SessionDescription) { - log.Infof("[Signal.Answer] sdp=%v", sdp) + log.Infof("[%v] [Signal.Answer] sdp=%v", s.id, sdp) marshalled, err := json.Marshal(sdp) if err != nil { log.Errorf("err=%v", err) @@ -221,12 +226,12 @@ func (s *Signal) Answer(sdp webrtc.SessionDescription) { ) s.Unlock() if err != nil { - log.Errorf("err=%v", err) + log.Errorf("[%v] err=%v", s.id, err) } } func (s *Signal) Close() { - log.Infof("[Signal.Close]") + log.Infof("[%v] [Signal.Close]", s.id) s.cancel() go s.onSignalHandleOnce() } diff --git a/transport.go b/transport.go index 71827631..a877eba1 100644 --- a/transport.go +++ b/transport.go @@ -19,17 +19,22 @@ type Transport struct { // NewTransport create a transport func NewTransport(role int, signal *Signal, cfg WebRTCTransportConfig) *Transport { - var err error t := &Transport{ role: role, signal: signal, config: cfg, } - me := &webrtc.MediaEngine{} - _ = me.RegisterDefaultCodecs() + var err error + var api *webrtc.API + var me *webrtc.MediaEngine cfg.Setting.SetICEMulticastDNSMode(ice.MulticastDNSModeDisabled) - api := webrtc.NewAPI(webrtc.WithMediaEngine(me), webrtc.WithSettingEngine(cfg.Setting)) + if role == PUBLISHER { + me, err = getPublisherMediaEngine(cfg.VideoMime) + } else { + me, err = getSubscriberMediaEngine() + } + api = webrtc.NewAPI(webrtc.WithMediaEngine(me), webrtc.WithSettingEngine(cfg.Setting)) t.pc, err = api.NewPeerConnection(cfg.Configuration) if err != nil { diff --git a/webmproducer.go b/webmproducer.go index bbbd11dd..2cadbcfe 100644 --- a/webmproducer.go +++ b/webmproducer.go @@ -34,10 +34,11 @@ type WebMProducer struct { videoCodec string file *os.File sendByte int + id string } // NewWebMProducer new a WebMProducer -func NewWebMProducer(name string, offset int) *WebMProducer { +func NewWebMProducer(id, name string, offset int) *WebMProducer { r, err := os.Open(name) if err != nil { log.Errorf("unable to open file %s", name) @@ -51,6 +52,7 @@ func NewWebMProducer(name string, offset int) *WebMProducer { } p := &WebMProducer{ + id: id, name: name, offsetSeconds: offset, reader: reader, @@ -234,9 +236,9 @@ func (t *WebMProducer) readLoop() { // Send samples if ivfErr := track.track.WriteSample(media.Sample{Data: pck.Data, Duration: time.Millisecond * 20}); ivfErr != nil { - log.Infof("Track write error=%v", ivfErr) + log.Errorf("Track write error=%v", ivfErr) } else { - // log.Infof("mime=%v kind=%v streamid=%v len=%v", track.track.Codec().MimeType, track.track.Kind(), track.track.StreamID(), len(pck.Data)) + log.Tracef("id=%v mime=%v kind=%v streamid=%v len=%v", t.id, track.track.Codec().MimeType, track.track.Kind(), track.track.StreamID(), len(pck.Data)) t.sendByte += len(pck.Data) } }