Skip to content

Commit

Permalink
Merge pull request #8 from pion/update
Browse files Browse the repository at this point in the history
feat( more than 14*14 load testing)
  • Loading branch information
adwpc authored Feb 17, 2021
2 parents d19a33a + baa3e85 commit 0ba66bf
Show file tree
Hide file tree
Showing 10 changed files with 303 additions and 83 deletions.
76 changes: 38 additions & 38 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type Client struct {
func NewClient(addr, id string, cfg WebRTCTransportConfig) *Client {
c := &Client{
ID: id,
signal: NewSignal(addr),
signal: NewSignal(addr, id),
cfg: cfg,
notify: make(chan struct{}),
remoteStreamId: make(map[string]string),
Expand All @@ -80,17 +80,17 @@ func NewClient(addr, id string, cfg WebRTCTransportConfig) *Client {
func (c *Client) SetRemoteSDP(sdp webrtc.SessionDescription) error {
err := c.pub.pc.SetRemoteDescription(sdp)
if err != nil {
log.Errorf("err=%v", err)
log.Errorf("id=%v err=%v", c.ID, err)
return err
}

// it's safe to add cand now after SetRemoteDescription
if len(c.pub.RecvCandidates) > 0 {
for _, candidate := range c.pub.RecvCandidates {
log.Infof("c.pub.pc.AddICECandidate candidate=%v", candidate)
log.Debugf("id=%v c.pub.pc.AddICECandidate candidate=%v", c.ID, candidate)
err = c.pub.pc.AddICECandidate(candidate)
if err != nil {
log.Errorf("c.pub.pc.AddICECandidate err=%v", err)
log.Errorf("id=%v c.pub.pc.AddICECandidate err=%v", c.ID, err)
}
}
c.pub.RecvCandidates = []webrtc.ICECandidateInit{}
Expand All @@ -99,7 +99,7 @@ func (c *Client) SetRemoteSDP(sdp webrtc.SessionDescription) error {
// it's safe to send cand now after join ok
if len(c.pub.SendCandidates) > 0 {
for _, cand := range c.pub.SendCandidates {
log.Infof("sending c.pub.SendCandidates cand=%v", cand)
log.Debugf("id=%v sending c.pub.SendCandidates cand=%v", c.ID, cand)
c.signal.Trickle(cand, PUBLISHER)
}
c.pub.SendCandidates = []*webrtc.ICECandidate{}
Expand All @@ -109,12 +109,12 @@ func (c *Client) SetRemoteSDP(sdp webrtc.SessionDescription) error {

// Join client join a session
func (c *Client) Join(sid string) error {
log.Infof("[Client.Join] sid=%v", sid)
log.Debugf("[Client.Join] sid=%v id=%v", sid, c.ID)
c.sub.pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
log.Infof("[c.sub.pc.OnTrack] got track streamId=%v kind=%v ssrc=%v ", track.StreamID(), track.Kind(), track.SSRC())
log.Debugf("[c.sub.pc.OnTrack] got track streamId=%v kind=%v ssrc=%v ", track.StreamID(), track.Kind(), track.SSRC())
c.streamLock.Lock()
c.remoteStreamId[track.StreamID()] = track.StreamID()
log.Infof("c.remoteStreamId=%+v", c.remoteStreamId)
log.Debugf("id=%v len(c.remoteStreamId)=%+v", c.ID, len(c.remoteStreamId))
c.streamLock.Unlock()
// user define
if c.OnTrack != nil {
Expand All @@ -129,10 +129,10 @@ func (c *Client) Join(sid string) error {
pkt, _, err := track.ReadRTP()
if err != nil {
if err == io.EOF {
log.Errorf("track.ReadRTP err=%v", err)
log.Errorf("id=%v track.ReadRTP err=%v", c.ID, err)
return
}
log.Errorf("Error reading track rtp %s", err)
log.Errorf("id=%v Error reading track rtp %s", c.ID, err)
continue
}
c.recvByte += len(pkt.Raw)
Expand All @@ -142,22 +142,22 @@ func (c *Client) Join(sid string) error {
})

c.sub.pc.OnDataChannel(func(dc *webrtc.DataChannel) {
log.Infof("[c.sub.pc.OnDataChannel] got dc %v", dc.Label())
log.Debugf("id=%v [c.sub.pc.OnDataChannel] got dc %v", c.ID, dc.Label())
if dc.Label() == API_CHANNEL {
log.Infof("got dc %v", dc.Label())
log.Debugf("%v got dc %v", c.ID, dc.Label())
c.sub.api = dc
// send cmd after open
c.sub.api.OnOpen(func() {
if len(c.apiQueue) > 0 {
for _, cmd := range c.apiQueue {
log.Infof("c.sub.api.OnOpen send cmd=%v", cmd)
log.Debugf("%v c.sub.api.OnOpen send cmd=%v", c.ID, cmd)
marshalled, err := json.Marshal(cmd)
if err != nil {
continue
}
err = c.sub.api.Send(marshalled)
if err != nil {
log.Errorf("err=%v", err)
log.Errorf("id=%v err=%v", c.ID, err)
}
time.Sleep(time.Millisecond * 10)
}
Expand All @@ -166,7 +166,7 @@ func (c *Client) Join(sid string) error {
})
return
}
log.Infof("got dc %v", dc.Label())
log.Debugf("%v got dc %v", c.ID, dc.Label())
if c.OnDataChannel != nil {
c.OnDataChannel(dc)
}
Expand Down Expand Up @@ -212,7 +212,7 @@ func (c *Client) UnPublish(t *webrtc.RTPTransceiver) error {

// Close client close
func (c *Client) Close() {
log.Infof("c=%v", c)
log.Debugf("id=%v", c.ID)
close(c.notify)
if c.pub != nil {
c.pub.pc.Close()
Expand All @@ -224,13 +224,13 @@ func (c *Client) Close() {

// CreateDataChannel create a custom datachannel
func (c *Client) CreateDataChannel(label string) (*webrtc.DataChannel, error) {
log.Infof("c=%v", c)
log.Debugf("id=%v CreateDataChannel %v", c.ID, label)
return c.pub.pc.CreateDataChannel(label, &webrtc.DataChannelInit{})
}

// Trickle receive candidate from sfu and add to pc
func (c *Client) Trickle(candidate webrtc.ICECandidateInit, target int) {
log.Infof("candidate=%v target=%v", candidate, target)
log.Debugf("id=%v candidate=%v target=%v", c.ID, candidate, target)
var t *Transport
if target == SUBSCRIBER {
t = c.sub
Expand All @@ -243,26 +243,26 @@ func (c *Client) Trickle(candidate webrtc.ICECandidateInit, target int) {
} else {
err := t.pc.AddICECandidate(candidate)
if err != nil {
log.Errorf("err=%v", err)
log.Errorf("id=%v err=%v", c.ID, err)
}
}

}

// Negotiate sub negotiate
func (c *Client) Negotiate(sdp webrtc.SessionDescription) error {
log.Infof("Negotiate sdp=%v", sdp)
log.Debugf("id=%v Negotiate sdp=%v", c.ID, sdp)
// 1.sub set remote sdp
err := c.sub.pc.SetRemoteDescription(sdp)
if err != nil {
log.Errorf("Negotiate c.sub.pc.SetRemoteDescription err=%v", err)
log.Errorf("id=%v Negotiate c.sub.pc.SetRemoteDescription err=%v", c.ID, err)
return err
}

// 2. safe to send candiate to sfu after join ok
if len(c.sub.SendCandidates) > 0 {
for _, cand := range c.sub.SendCandidates {
log.Infof("send sub.SendCandidates c.signal.Trickle cand=%v", cand)
log.Debugf("id=%v send sub.SendCandidates c.ID, c.signal.Trickle cand=%v", c.ID, cand)
c.signal.Trickle(cand, SUBSCRIBER)
}
c.sub.SendCandidates = []*webrtc.ICECandidate{}
Expand All @@ -271,7 +271,7 @@ func (c *Client) Negotiate(sdp webrtc.SessionDescription) error {
// 3. safe to add candidate after SetRemoteDescription
if len(c.sub.RecvCandidates) > 0 {
for _, candidate := range c.sub.RecvCandidates {
log.Infof("Negotiate c.sub.pc.AddICECandidate candidate=%v", candidate)
log.Debugf("id=%v Negotiate c.sub.pc.AddICECandidate candidate=%v", c.ID, candidate)
_ = c.sub.pc.AddICECandidate(candidate)
}
c.sub.RecvCandidates = []webrtc.ICECandidateInit{}
Expand All @@ -280,14 +280,14 @@ func (c *Client) Negotiate(sdp webrtc.SessionDescription) error {
// 4. create answer after add ice candidate
answer, err := c.sub.pc.CreateAnswer(nil)
if err != nil {
log.Errorf("err=%v", err)
log.Errorf("id=%v err=%v", c.ID, err)
return err
}

// 5. set local sdp(answer)
err = c.sub.pc.SetLocalDescription(answer)
if err != nil {
log.Errorf("err=%v", err)
log.Errorf("id=%v err=%v", c.ID, err)
return err
}

Expand All @@ -302,23 +302,23 @@ func (c *Client) OnNegotiationNeeded() {
// 1. pub create offer
offer, err := c.pub.pc.CreateOffer(nil)
if err != nil {
log.Errorf("err=%v", err)
log.Errorf("id=%v err=%v", c.ID, err)
}

// 2. pub set local sdp(offer)
err = c.pub.pc.SetLocalDescription(offer)
if err != nil {
log.Errorf("err=%v", err)
log.Errorf("id=%v err=%v", c.ID, err)
}

log.Infof("OnNegotiationNeeded!! c.pub.pc.CreateOffer and send offer=%v", offer)
log.Debugf("id=%v OnNegotiationNeeded!! c.pub.pc.CreateOffer and send offer=%v", c.ID, offer)
//3. send offer to sfu
c.signal.Offer(offer)
}

// selectRemote select remote video/audio
func (c *Client) selectRemote(streamId, video string, audio bool) error {
log.Infof("streamId=%v video=%v audio=%v", streamId, video, audio)
log.Debugf("id=%v streamId=%v video=%v audio=%v", c.ID, streamId, video, audio)
call := Call{
StreamID: streamId,
Video: video,
Expand All @@ -327,15 +327,15 @@ func (c *Client) selectRemote(streamId, video string, audio bool) error {

// cache cmd when dc not ready
if c.sub.api == nil || c.sub.api.ReadyState() != webrtc.DataChannelStateOpen {
log.Infof("append to c.apiQueue call=%v", call)
log.Debugf("id=%v append to c.apiQueue call=%v", c.ID, call)
c.apiQueue = append(c.apiQueue, call)
return nil
}

// send cached cmd
if len(c.apiQueue) > 0 {
for _, cmd := range c.apiQueue {
log.Infof("c.sub.api.Send cmd=%v", cmd)
log.Debugf("id=%v c.sub.api.Send cmd=%v", c.ID, cmd)
marshalled, err := json.Marshal(cmd)
if err != nil {
continue
Expand All @@ -350,14 +350,14 @@ func (c *Client) selectRemote(streamId, video string, audio bool) error {
}

// send this cmd
log.Infof("c.sub.api.Send call=%v", call)
log.Debugf("id=%v c.sub.api.Send call=%v", c.ID, call)
marshalled, err := json.Marshal(call)
if err != nil {
return err
}
err = c.sub.api.Send(marshalled)
if err != nil {
log.Errorf("err=%v", err)
log.Errorf("id=%v err=%v", c.ID, err)
}
return err
}
Expand All @@ -368,7 +368,7 @@ func (c *Client) UnSubscribeAll() {
m := c.remoteStreamId
c.streamLock.RUnlock()
for streamId := range m {
log.Infof("UnSubscribe remote streamid=%v", streamId)
log.Debugf("id=%v UnSubscribe remote streamid=%v", c.ID, streamId)
c.selectRemote(streamId, "none", false)
}
}
Expand All @@ -379,7 +379,7 @@ func (c *Client) SubscribeAll(video string, audio bool) {
m := c.remoteStreamId
c.streamLock.RUnlock()
for streamId := range m {
log.Infof("Subscribe remote streamid=%v", streamId)
log.Debugf("id=%v Subscribe remote streamid=%v", c.ID, streamId)
c.selectRemote(streamId, video, audio)
}
}
Expand All @@ -389,18 +389,18 @@ func (c *Client) PublishWebm(file string) error {
ext := filepath.Ext(file)
switch ext {
case ".webm":
c.producer = NewWebMProducer(file, 0)
c.producer = NewWebMProducer(c.ID, file, 0)
default:
return errInvalidFile
}
_, err := c.producer.AddTrack(c.pub.pc, "video")
if err != nil {
log.Infof("err=%v", err)
log.Debugf("err=%v", err)
return err
}
_, err = c.producer.AddTrack(c.pub.pc, "audio")
if err != nil {
log.Infof("err=%v", err)
log.Debugf("err=%v", err)
return err
}
c.producer.Start()
Expand Down
1 change: 1 addition & 0 deletions conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type Config struct {

// WebRTCTransportConfig represents configuration options
type WebRTCTransportConfig struct {
VideoMime string
Configuration webrtc.Configuration
Setting webrtc.SettingEngine
}
4 changes: 2 additions & 2 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ type Engine struct {
}

// NewEngine create a engine
func NewEngine(addr string, cfg Config) *Engine {
func NewEngine(cfg Config) *Engine {
e := &Engine{
clients: make(map[string]map[string]*Client),
}
e.cfg = cfg

fixByFile := []string{"asm_amd64.s", "proc.go", "icegatherer.go"}
fixByFile := []string{"asm_amd64.s", "proc.go", "icegatherer.go", "client.go", "signal.go"}
fixByFunc := []string{}

log.Init(cfg.Log.Level, fixByFile, fixByFunc)
Expand Down
Loading

0 comments on commit 0ba66bf

Please sign in to comment.