Skip to content

Commit

Permalink
Fix: webserver error (#497)
Browse files Browse the repository at this point in the history
* check exsit pair before access on offer/answer

* fix test

* check client exist when delete connection

* fix delete process

* fix and add test about ReNegotiationAfterReceivingFirstOffer

* wait while not stable on sendoffercorutine

* controll offer answer order

* wait to stable on manage
  • Loading branch information
kannan-xiao4 authored Jun 14, 2021
1 parent 2de1fde commit a6fc9d3
Show file tree
Hide file tree
Showing 10 changed files with 149 additions and 66 deletions.
2 changes: 1 addition & 1 deletion WebApp/public/bidirectional/js/peer.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ export default class Peer extends EventTarget {
async loopResendOffer() {

while (true) {
if(this.waitingAnswer) {
if(this.pc != null && this.waitingAnswer) {
this.dispatchEvent(new CustomEvent('sendoffer', { detail: { connectionId: this.connectionId, sdp: this.pc.localDescription.sdp } }));
}
await this.sleep(this.interval);
Expand Down
10 changes: 7 additions & 3 deletions WebApp/public/bidirectional/js/sendvideo.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export class SendVideo {

this.signaling.addEventListener('disconnect', async (e) => {
const data = e.detail;
if (_this.pc.connectionId == data.connectionId) {
if (_this.pc != null && _this.pc.connectionId == data.connectionId) {
_this.ondisconnect();
}
});
Expand All @@ -57,13 +57,17 @@ export class SendVideo {
this.signaling.addEventListener('answer', async (e) => {
const answer = e.detail;
const desc = new RTCSessionDescription({ sdp: answer.sdp, type: "answer" });
await _this.pc.onGotDescription(answer.connectionId, desc);
if (_this.pc != null) {
await _this.pc.onGotDescription(answer.connectionId, desc);
}
});

this.signaling.addEventListener('candidate', async (e) => {
const candidate = e.detail;
const iceCandidate = new RTCIceCandidate({ candidate: candidate.candidate, sdpMid: candidate.sdpMid, sdpMLineIndex: candidate.sdpMLineIndex });
await _this.pc.onGotCandidate(candidate.connectionId, iceCandidate);
if (_this.pc != null) {
await _this.pc.onGotCandidate(candidate.connectionId, iceCandidate);
}
});

await this.signaling.start();
Expand Down
9 changes: 6 additions & 3 deletions WebApp/public/js/signaling.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export default class Signaling extends EventTarget {
constructor() {
super();
this.interval = 3000;
this.running = false;
this.sleep = msec => new Promise(resolve => setTimeout(resolve, msec));
}

Expand All @@ -25,6 +26,7 @@ export default class Signaling extends EventTarget {
const createResponse = await fetch(this.url(''), { method: 'PUT', headers: this.headers() });
const session = await createResponse.json();
this.sessionId = session.sessionId;
this.running = true;

this.loopGetOffer();
this.loopGetAnswer();
Expand All @@ -34,7 +36,7 @@ export default class Signaling extends EventTarget {
async loopGetOffer() {
let lastTimeRequest = Date.now() - 30000;

while (true) {
while (this.running) {
const res = await this.getOffer(lastTimeRequest);
lastTimeRequest = Date.parse(res.headers.get('Date'));

Expand All @@ -54,7 +56,7 @@ export default class Signaling extends EventTarget {
// receive answer message from 30secs ago
let lastTimeRequest = Date.now() - 30000;

while (true) {
while (this.running) {
const res = await this.getAnswer(lastTimeRequest);
lastTimeRequest = Date.parse(res.headers.get('Date'));

Expand All @@ -74,7 +76,7 @@ export default class Signaling extends EventTarget {
// receive answer message from 30secs ago
let lastTimeRequest = Date.now() - 30000;

while (true) {
while (this.running) {
const res = await this.getCandidate(lastTimeRequest);
lastTimeRequest = Date.parse(res.headers.get('Date'));

Expand All @@ -93,6 +95,7 @@ export default class Signaling extends EventTarget {
}

async stop() {
this.running = false;
await fetch(this.url(''), { method: 'DELETE', headers: this.headers() });
this.sessionId = null;
}
Expand Down
34 changes: 21 additions & 13 deletions WebApp/src/signaling.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,14 +180,16 @@ router.delete('/connection', (req: Request, res: Response) => {
const pair = connectionPair.get(connectionId);
const otherSessionId = pair[0] == sessionId ? pair[1] : pair[0];
if (otherSessionId) {
clients.get(otherSessionId).delete(connectionId);
if (clients.has(otherSessionId)) {
clients.get(otherSessionId).delete(connectionId);
}
}
}
connectionPair.delete(connectionId);
offers.get(sessionId).delete(connectionId);
answers.get(sessionId).delete(connectionId);
candidates.get(sessionId).delete(connectionId);
res.sendStatus(200);
res.json({ connectionId: connectionId });
});

router.post('/offer', (req: Request, res: Response) => {
Expand All @@ -197,20 +199,21 @@ router.post('/offer', (req: Request, res: Response) => {
let polite = false;

if (res.app.get('isPrivate')) {
const pair = connectionPair.get(connectionId);
keySessionId = pair[0] == sessionId ? pair[1] : pair[0];
if (keySessionId == null) {
const err = new Error(`${connectionId}: This connection id is not ready other session.`);
console.log(err);
res.status(400).send({ error: err });
return;
if (connectionPair.has(connectionId)) {
const pair = connectionPair.get(connectionId);
keySessionId = pair[0] == sessionId ? pair[1] : pair[0];
if (keySessionId != null) {
polite = true;
const map = offers.get(keySessionId);
map.set(connectionId, new Offer(req.body.sdp, Date.now(), polite))
}
}
polite = true;
} else {
connectionPair.set(connectionId, [sessionId, null]);
keySessionId = sessionId;
res.sendStatus(200);
return;
}

connectionPair.set(connectionId, [sessionId, null]);
keySessionId = sessionId;
const map = offers.get(keySessionId);
map.set(connectionId, new Offer(req.body.sdp, Date.now(), polite))

Expand All @@ -223,6 +226,11 @@ router.post('/answer', (req: Request, res: Response) => {
const connectionIds = getOrCreateConnectionIds(sessionId);
connectionIds.add(connectionId);

if (!connectionPair.has(connectionId)) {
res.sendStatus(200);
return;
}

// add connectionPair
const pair = connectionPair.get(connectionId);
const otherSessionId = pair[0] == sessionId ? pair[1] : pair[0];
Expand Down
50 changes: 21 additions & 29 deletions WebApp/src/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,13 @@ export default class WSSignaling {
let newOffer = new Offer(message.sdp, Date.now(), false);

if (this.isPrivate) {
const pair = connectionPair.get(connectionId);
const otherSessionWs = pair[0] == ws ? pair[1] : pair[0];
if (otherSessionWs) {
newOffer.polite = true;
otherSessionWs.send(JSON.stringify({ from: connectionId, to: "", type: "offer", data: newOffer }));
} else {
ws.send(JSON.stringify({ type: "error", message: `${connectionId}: This connection id is not ready other session.` }));
if (connectionPair.has(connectionId)) {
const pair = connectionPair.get(connectionId);
const otherSessionWs = pair[0] == ws ? pair[1] : pair[0];
if (otherSessionWs) {
newOffer.polite = true;
otherSessionWs.send(JSON.stringify({ from: connectionId, to: "", type: "offer", data: newOffer }));
}
}
return;
}
Expand All @@ -160,39 +160,31 @@ export default class WSSignaling {
connectionIds.add(connectionId);
const newAnswer = new Answer(message.sdp, Date.now());

let otherSessionWs = null;

if (this.isPrivate) {
const pair = connectionPair.get(connectionId);
otherSessionWs = pair[0] == ws ? pair[1] : pair[0];
} else {
const pair = connectionPair.get(connectionId);
otherSessionWs = pair[0];
connectionPair.set(connectionId, [otherSessionWs, ws]);
if (!connectionPair.has(connectionId)) {
return;
}

if (this.isPrivate) {
otherSessionWs.send(JSON.stringify({ from: connectionId, to: "", type: "answer", data: newAnswer }));
return;
const pair = connectionPair.get(connectionId);
const otherSessionWs = pair[0] == ws ? pair[1] : pair[0];

if (!this.isPrivate) {
connectionPair.set(connectionId, [otherSessionWs, ws]);
}

clients.forEach((_v, k) => {
if (k == ws) {
return;
}
k.send(JSON.stringify({ from: connectionId, to: "", type: "answer", data: newAnswer }));
});
otherSessionWs.send(JSON.stringify({ from: connectionId, to: "", type: "answer", data: newAnswer }));
}

private onCandidate(ws: WebSocket, message: any) {
const connectionId = message.connectionId;
const candidate = new Candidate(message.candidate, message.sdpMLineIndex, message.sdpMid, Date.now());

if (this.isPrivate) {
const pair = connectionPair.get(connectionId);
const otherSessionWs = pair[0] == ws ? pair[1] : pair[0];
if (otherSessionWs) {
otherSessionWs.send(JSON.stringify({ from: connectionId, to: "", type: "candidate", data: candidate }));
if (connectionPair.has(connectionId)) {
const pair = connectionPair.get(connectionId);
const otherSessionWs = pair[0] == ws ? pair[1] : pair[0];
if (otherSessionWs) {
otherSessionWs.send(JSON.stringify({ from: connectionId, to: "", type: "candidate", data: candidate }));
}
}
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,13 +396,18 @@ PeerConnection CreatePeerConnection(string connectionId, bool polite)
{
onAddReceiver?.Invoke(connectionId, trackEvent.Receiver);
};
pc.OnNegotiationNeeded = () => OnNegotiationNeeded(connectionId);
pc.OnNegotiationNeeded = () => _startCoroutine(OnNegotiationNeeded(connectionId));
return peer;
}

void DeletePeerConnection(string connectionId)
{
_mapConnectionIdAndPeer[connectionId].Dispose();
if (!_mapConnectionIdAndPeer.TryGetValue(connectionId, out var peer))
{
return;
}

peer.Dispose();
_mapConnectionIdAndPeer.Remove(connectionId);
}

Expand All @@ -424,15 +429,16 @@ void OnIceConnectionChange(string connectionId, RTCIceConnectionState state)
}
}

void OnNegotiationNeeded(string connectionId)
IEnumerator OnNegotiationNeeded(string connectionId)
{
yield return new WaitWhile(() => !IsStable(connectionId));
SendOffer(connectionId);
}

IEnumerator SendOfferCoroutine(string connectionId, PeerConnection pc)
{
// waiting other setLocalDescription process
yield return new WaitWhile(() => pc.makingOffer || pc.makingAnswer);
yield return new WaitWhile(() => !IsStable(connectionId));

Assert.AreEqual(pc.peer.SignalingState, RTCSignalingState.Stable,
$"{pc} negotiationneeded always fires in stable state");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,9 @@ private void HTTPPooling()
try
{
HTTPGetConnections();
HTTPGetOffers();
//ToDo workaround: The processing order needs to be determined by the time stamp
HTTPGetAnswers();
HTTPGetOffers();
HTTPGetCandidates();

Thread.Sleep((int)(m_timeout * 1000));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,12 +288,6 @@ public IEnumerator OnOffer()
yield return new WaitUntil(() => !string.IsNullOrEmpty(connectionId1));

signaling2.OnOffer += (s, e) => { offerRaised2 = true; };

if (m_SignalingType != typeof(MockSignaling))
{
LogAssert.Expect(LogType.Error, new Regex("."));
}

signaling1.SendOffer(connectionId, m_DescOffer);
yield return new WaitForSeconds(3);
// Do not receive offer other signaling if not connected same sendoffer connectionId in private mode
Expand Down
Loading

0 comments on commit a6fc9d3

Please sign in to comment.