Skip to content

Commit

Permalink
Reconnect faster in a more robust way when connection is failed/closed
Browse files Browse the repository at this point in the history
  • Loading branch information
mekya committed Nov 21, 2024
1 parent 404c515 commit 2322394
Show file tree
Hide file tree
Showing 2 changed files with 168 additions and 33 deletions.
98 changes: 65 additions & 33 deletions src/main/js/webrtc_adaptor.js
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,11 @@ export class WebRTCAdaptor {
* This is the time info for the last reconnection attempt
*/
this.lastReconnectiontionTrialTime = 0;

/**
* TimerId for the pending try again call
*/
this.pendingTryAgainTimerId = -1;

/**
* All media management works for teh local stream are made by @MediaManager class.
Expand Down Expand Up @@ -613,7 +618,7 @@ export class WebRTCAdaptor {
this.iceConnectionState(streamId) != "connected" &&
this.iceConnectionState(streamId) != "completed") {
//if it is not connected, try to reconnect
this.reconnectIfRequired(0);
this.reconnectIfRequired(0, false);
}
}, 3000);
}
Expand All @@ -623,25 +628,36 @@ export class WebRTCAdaptor {
* @param {number} [delayMs]
* @returns
*/
reconnectIfRequired(delayMs = 3000) {
reconnectIfRequired(delayMs = 3000, forceReconnect = false) {
if (this.reconnectIfRequiredFlag) {
//It's important to run the following methods after 3000 ms because the stream may be stopped by the user in the meantime
if (delayMs > 0) {
setTimeout(() => {
this.tryAgain();
}, delayMs);

//clear the previous timer if exists
clearTimeout(this.pendingTryAgainTimerId);

//set a timer to prevent too many trial from different paths
this.pendingTryAgainTimerId = setTimeout(() =>
{
this.tryAgain(forceReconnect);
},
delayMs);

}
else {
this.tryAgain()
this.tryAgain(forceReconnect)
}
}
}

tryAgain() {
tryAgain(forceReconnect) {

const now = Date.now();
//to prevent too many trial from different paths
if (now - this.lastReconnectiontionTrialTime < 3000) {
//check again 3 seconds later if it is not stopped on purpose
Logger.debug("Reconnection is tried before 3 seconds. It will check/try again after 3 seconds");
this.reconnectIfRequired(3000, forceReconnect);
return;
}
this.lastReconnectiontionTrialTime = now;
Expand All @@ -650,10 +666,10 @@ export class WebRTCAdaptor {
//if remotePeerConnection has a peer connection for the stream id, it means that it is not stopped on purpose

if (this.remotePeerConnection[this.publishStreamId] != null &&
(forceReconnect ||
//check connection status to not stop streaming an active stream
this.iceConnectionState(this.publishStreamId) != "checking" &&
this.iceConnectionState(this.publishStreamId) != "connected" &&
this.iceConnectionState(this.publishStreamId) != "completed") {
["checking", "connected", "completed"].indexOf(this.iceConnectionState(this.publishStreamId)) === -1)
) {
// notify that reconnection process started for publish
this.notifyEventListeners("reconnection_attempt_for_publisher", this.publishStreamId);

Expand All @@ -669,11 +685,12 @@ export class WebRTCAdaptor {
//reconnect play
for (var index in this.playStreamId) {
let streamId = this.playStreamId[index];
if (this.remotePeerConnection[streamId] != "null" &&
//check connection status to not stop streaming an active stream
this.iceConnectionState(streamId) != "checking" &&
this.iceConnectionState(streamId) != "connected" &&
this.iceConnectionState(streamId) != "completed") {
if (this.remotePeerConnection[streamId] != null &&
(forceReconnect ||
//check connection status to not stop streaming an active stream
["checking", "connected", "completed"].indexOf(this.iceConnectionState(streamId)) === -1
)
) {
// notify that reconnection process started for play
this.notifyEventListeners("reconnection_attempt_for_player", streamId);

Expand Down Expand Up @@ -1136,27 +1153,36 @@ export class WebRTCAdaptor {

this.remotePeerConnection[streamId].oniceconnectionstatechange = event => {
var obj = { state: this.remotePeerConnection[streamId].iceConnectionState, streamId: streamId };
if (obj.state == "failed" || obj.state == "disconnected" || obj.state == "closed") {
this.reconnectIfRequired(3000);
}
this.notifyEventListeners("ice_connection_state_changed", obj);

//
if (!this.isPlayMode && !this.playStreamId.includes(streamId)) {
if (this.remotePeerConnection[streamId].iceConnectionState == "connected") {

this.mediaManager.changeBandwidth(this.mediaManager.bandwidth, streamId).then(() => {
Logger.debug("Bandwidth is changed to " + this.mediaManager.bandwidth);
})
.catch(e => Logger.warn(e));
}
}
this.oniceconnectionstatechangeCallback(obj);

Check warning on line 1157 in src/main/js/webrtc_adaptor.js

View check run for this annotation

Codecov / codecov/patch

src/main/js/webrtc_adaptor.js#L1157

Added line #L1157 was not covered by tests
}

}

return this.remotePeerConnection[streamId];
}

oniceconnectionstatechangeCallback(obj)
{
if (obj.state == "failed" || obj.state == "disconnected" || obj.state == "closed") {
//try immediately
this.reconnectIfRequired(0, false);
}
this.notifyEventListeners("ice_connection_state_changed", obj);

//
if (!this.isPlayMode && !this.playStreamId.includes(obj.streamId)) {
if (this.remotePeerConnection[streamId].iceConnectionState == "connected") {

Check warning on line 1175 in src/main/js/webrtc_adaptor.js

View check run for this annotation

Codecov / codecov/patch

src/main/js/webrtc_adaptor.js#L1175

Added line #L1175 was not covered by tests

this.mediaManager.changeBandwidth(this.mediaManager.bandwidth, obj.streamId).then(() => {
Logger.debug("Bandwidth is changed to " + this.mediaManager.bandwidth);

Check warning on line 1178 in src/main/js/webrtc_adaptor.js

View check run for this annotation

Codecov / codecov/patch

src/main/js/webrtc_adaptor.js#L1177-L1178

Added lines #L1177 - L1178 were not covered by tests
})
.catch(e => Logger.warn(e));

Check warning on line 1180 in src/main/js/webrtc_adaptor.js

View check run for this annotation

Codecov / codecov/patch

src/main/js/webrtc_adaptor.js#L1180

Added line #L1180 was not covered by tests
}
}
}



/**
* Called internally to close PeerConnection.
Expand Down Expand Up @@ -1745,10 +1771,7 @@ export class WebRTCAdaptor {
websocket_url: this.websocketURL,
webrtcadaptor: this,
callback: (info, obj) => {
if (info == "closed") {
this.reconnectIfRequired();
}
this.notifyEventListeners(info, obj);
this.websocketCallback(info, obj)
},
callbackError: (error, message) => {
this.notifyErrorEventListeners(error, message)
Expand All @@ -1757,6 +1780,15 @@ export class WebRTCAdaptor {
});
}
}

websocketCallback(info, obj) {
if (info == "closed") {
Logger.info("Websocket is closed. It will reconnect if required.")
//try with forcing reconnect because webrtc will be closed as well
this.reconnectIfRequired(0, true);
}
this.notifyEventListeners(info, obj);
}

/**
* Called to stop Web Socket connection
Expand Down
103 changes: 103 additions & 0 deletions src/test/js/webrtc_adaptor.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -262,13 +262,116 @@ describe("WebRTCAdaptor", function() {
expect(webSocketAdaptor.connecting).to.be.false;

});


it("reconnectIfRequired", async function() {
var adaptor = new WebRTCAdaptor({
websocketURL: "ws://example.com",
isPlayMode: true
});

let tryAgain = sinon.replace(adaptor, "tryAgain", sinon.fake());


adaptor.reconnectIfRequired(100);
adaptor.reconnectIfRequired(200);
clock.tick(300);

expect(tryAgain.calledOnce).to.be.true;



});

it("oniceconnectionstatechangeCallback", async function() {
var adaptor = new WebRTCAdaptor({
websocketURL: "ws://example.com",
isPlayMode: true
});

let reconnectIfRequired = sinon.replace(adaptor, "reconnectIfRequired", sinon.fake());
var obj = { state: "failed", streamId: "streamId" };

var stopFake = sinon.replace(adaptor, "stop", sinon.fake());
adaptor.oniceconnectionstatechangeCallback(obj);
expect(reconnectIfRequired.calledOnce).to.be.true;
expect(reconnectIfRequired.calledWithExactly(0, false)).to.be.true;

obj = { state: "closed", streamId: "streamId" };

adaptor.oniceconnectionstatechangeCallback(obj);
expect(reconnectIfRequired.calledTwice).to.be.true;
expect(reconnectIfRequired.calledWithExactly(0, false)).to.be.true;

obj = { state: "disconnected", streamId: "streamId" };
adaptor.oniceconnectionstatechangeCallback(obj);
expect(reconnectIfRequired.callCount).to.be.equal(3);


obj = { state: "connected", streamId: "streamId" };
adaptor.oniceconnectionstatechangeCallback(obj);
expect(reconnectIfRequired.callCount).to.be.equal(3);


});

it("websocketCallback", async function() {
var adaptor = new WebRTCAdaptor({
websocketURL: "ws://example.com",
isPlayMode: true
});

let reconnectIfRequired = sinon.replace(adaptor, "reconnectIfRequired", sinon.fake());

var stopFake = sinon.replace(adaptor, "stop", sinon.fake());
adaptor.websocketCallback("closed");

expect(reconnectIfRequired.calledOnce).to.be.true;
expect(reconnectIfRequired.calledWithExactly(0, true)).to.be.true;


adaptor.websocketCallback("anyOtherThing");

//it should be still once
expect(reconnectIfRequired.calledOnce).to.be.true;


});

it("tryAgainForceReconnect", async function() {

var adaptor = new WebRTCAdaptor({
websocketURL: "ws://example.com",
isPlayMode: true
});
var streamId = "streamId";
adaptor.publishStreamId = streamId;

let stop = sinon.replace(adaptor, "stop", sinon.fake());

var mockPC = sinon.mock(RTCPeerConnection);
adaptor.remotePeerConnection[streamId] = mockPC
mockPC.iceConnectionState = "connected";

adaptor.tryAgain(false);

expect(stop.calledOnce).to.be.false;

adaptor.tryAgain(true);
expect(stop.calledOnce).to.be.false;


});



it("Frequent try again call", async function() {
var adaptor = new WebRTCAdaptor({
websocketURL: "ws://example.com",
isPlayMode: true
});

expect(adaptor.pendingTryAgainTimerId).to.be.equal(-1);
let webSocketAdaptor = sinon.mock(adaptor.webSocketAdaptor);
let closeExpectation = webSocketAdaptor.expects("close");

Expand Down

0 comments on commit 2322394

Please sign in to comment.