Skip to content

Commit

Permalink
feat: fast reconnection implementation (#549)
Browse files Browse the repository at this point in the history
* fast reconnection implementation

* reconnecting status

---------

Co-authored-by: Kanat Kiialbaev <[email protected]>
Co-authored-by: Deven Joshi <[email protected]>
  • Loading branch information
3 people authored Dec 20, 2023
1 parent ddc8c23 commit 1b90a73
Show file tree
Hide file tree
Showing 16 changed files with 289 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,16 @@ class CallSessionStart extends LifecycleStage {
}

class CallConnecting extends LifecycleStage {
const CallConnecting(this.attempt);
const CallConnecting(this.attempt, {this.isFastReconnectAttempt = false});

factory CallConnecting.fastReconnect() =>
const CallConnecting(1, isFastReconnectAttempt: true);

final int attempt;
final bool isFastReconnectAttempt;

@override
List<Object?> get props => [attempt];
List<Object?> get props => [attempt, isFastReconnectAttempt];
}

class CallConnected extends LifecycleStage {
Expand Down
98 changes: 83 additions & 15 deletions packages/stream_video/lib/src/call/call.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import 'dart:async';

import 'package:connectivity_plus/connectivity_plus.dart';
import 'package:flutter/foundation.dart';
import 'package:internet_connection_checker/internet_connection_checker.dart';
import 'package:meta/meta.dart';
import 'package:rxdart/rxdart.dart';

Expand Down Expand Up @@ -44,6 +47,9 @@ const _idAwait = 7;

const _tag = 'SV:Call';

const _reconnectTimeout = Duration(seconds: 30);
const _fastReconnectTimeout = Duration(seconds: kDebugMode ? 10 : 3);

int _callSeq = 1;

/// Represents a [Call] in which you can connect to.
Expand Down Expand Up @@ -558,6 +564,7 @@ class Call {
sessionId: sessionId,
credentials: credentials,
stateManager: _stateManager,
onFullReconnectNeeded: () => _fullReconnect(null),
);
_logger.v(() => '[startSession] session created: $session');
_session = session;
Expand Down Expand Up @@ -602,16 +609,76 @@ class Call {
}

Future<void> _reconnect(dynamic reason) async {
//one reconnect attempt at a time
if (_status.value == _ConnectionStatus.reconnecting) return;
_status.value = _ConnectionStatus.reconnecting;

_stateManager.lifecycleCallConnectingAction(CallConnecting.fastReconnect());

var tryFastReconnect = true;
_logger.w(() => '[reconnect] starting timer');
final timer = Timer(_fastReconnectTimeout, () {
_logger.w(() => '[reconnect] too late for fast reconnect');
tryFastReconnect = false;
});

final connectionStatus = await InternetConnectionChecker.createInstance(
checkInterval: const Duration(seconds: 1),
)
.onStatusChange
.firstWhere((status) => status == InternetConnectionStatus.connected)
.timeout(
_reconnectTimeout,
onTimeout: () {
_logger.w(() => '[reconnect] timeout');

return InternetConnectionStatus.disconnected;
},
);

//no internet connection after _reconnectTimeout, leave the call
if (connectionStatus != InternetConnectionStatus.connected) {
timer.cancel();
await leave();
return;
}

if (_session != null && tryFastReconnect) {
_logger.w(() => '[reconnect] trying fast reconnect');
timer.cancel();

final result = await _session!.fastReconnect();
if (!result.isSuccess) {
_logger.w(
() =>
'[reconnect] fast reconnect failed, doing full reconnect: ${result.fold(success: (success) => '', failure: (failure) => failure.error.message)}',
);
await _fullReconnect(reason);
} else {
_logger.w(
() => '[reconnect] fast reconnect successful',
);

_stateManager.lifecycleCallConnected(const CallConnected());
_status.value = _ConnectionStatus.connected;
}
} else {
_logger.w(() => '[reconnect] doing full reconnect');
await _fullReconnect(reason);
}
}

Future<void> _fullReconnect(dynamic reason) async {
if (_status.value == _ConnectionStatus.disconnected) {
_logger.w(() => '[reconnect] rejected (disconnected)');
_logger.w(() => '[fullReconnect] rejected (disconnected)');
return;
}
if (_status.value == _ConnectionStatus.connecting) {
_logger.w(() => '[reconnect] rejected (connecting)');
_logger.w(() => '[fullReconnect] rejected (connecting)');
return;
}
_status.value = _ConnectionStatus.connecting;
_logger.w(() => '[reconnect] >>>>>>>>>>>>>>>> reason: $reason');
_logger.w(() => '[fullReconnect] >>>>>>>>>>>>>>>> reason: $reason');
_subscriptions.cancel(_idSessionEvents);
await _session?.dispose();
_session = null;
Expand All @@ -625,52 +692,52 @@ class Call {
if (_status.value == _ConnectionStatus.disconnected) {
_logger.w(
() =>
'[reconnect] attempt($_reconnectAttempt) rejected (disconnected)',
'[fullReconnect] attempt($_reconnectAttempt) rejected (disconnected)',
);
_logger.v(() => '[reconnect] <<<<<<<<<<<<<<< rejected');
_logger.v(() => '[fullReconnect] <<<<<<<<<<<<<<< rejected');
return;
}
final elapsed = DateTime.now().toUtc().millisecondsSinceEpoch - startTime;
final retryPolicy = _retryPolicy;
if (elapsed > retryPolicy.config.callRejoinTimeout.inMilliseconds) {
_logger.w(() => '[reconnect] timeout exceed');
_logger.w(() => '[fullReconnect] timeout exceed');
result = Result.error('was unable to reconnect in 15 seconds');
break;
}
final delay = retryPolicy.backoff(_reconnectAttempt);
_logger.v(
() => '[reconnect] attempt: $_reconnectAttempt, '
() => '[fullReconnect] attempt: $_reconnectAttempt, '
'elapsed: $elapsed, delay: $delay',
);
await Future<void>.delayed(delay);
_logger.v(() => '[reconnect] joining to coordinator');
_logger.v(() => '[fullReconnect] joining to coordinator');
final joinedResult = await _joinIfNeeded();
if (joinedResult is! Success<CallCredentials>) {
_logger.e(() => '[reconnect] joining failed: $joinedResult');
_logger.e(() => '[fullReconnect] joining failed: $joinedResult');
continue;
}
_logger.v(() => '[reconnect] starting session');
_logger.v(() => '[fullReconnect] starting session');
result = await _startSession(joinedResult.data);
if (result is! Success<None>) {
_logger.w(() => '[reconnect] session start failed: $result');
_logger.w(() => '[fullReconnect] session start failed: $result');
continue;
}
_logger.v(() => '[reconnect] session started');
_logger.v(() => '[fullReconnect] session started');
break;
}
_reconnectAttempt = 0;
if (result.isFailure) {
_logger.e(() => '[reconnect] <<<<<<<<<<<<<<< failed: $result');
_logger.e(() => '[fullReconnect] <<<<<<<<<<<<<<< failed: $result');
_status.value = _ConnectionStatus.disconnected;
final error = (result as Failure).error;
_stateManager.lifecycleCallConnectFailed(ConnectFailed(error));
return;
}
_logger.v(() => '[reconnect] <<<<<<<<<<<<<<< completed');
_logger.v(() => '[fullReconnect] <<<<<<<<<<<<<<< completed');
_stateManager.lifecycleCallConnected(const CallConnected());
_status.value = _ConnectionStatus.connected;
await _applyConnectOptions();
_logger.v(() => '[reconnect] <<<<<<<<<<<<<<< side effects applied');
_logger.v(() => '[fullReconnect] <<<<<<<<<<<<<<< side effects applied');
}

Future<Result<None>> _awaitIfNeeded() async {
Expand Down Expand Up @@ -1591,6 +1658,7 @@ extension on CallStateNotifier {
enum _ConnectionStatus {
disconnected,
connecting,
reconnecting,
connected;

@override
Expand Down
113 changes: 113 additions & 0 deletions packages/stream_video/lib/src/call/session/call_session.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ import 'dart:convert';

import 'package:collection/collection.dart';
import 'package:flutter_webrtc/flutter_webrtc.dart' as rtc;
import 'package:webrtc_interface/webrtc_interface.dart';

import '../../../protobuf/video/sfu/event/events.pb.dart' as sfu_events;
import '../../../protobuf/video/sfu/models/models.pb.dart' as sfu_models;
import '../../../protobuf/video/sfu/signal_rpc/signal.pb.dart' as sfu;
import '../../../stream_video.dart';
import '../../action/internal/rtc_action.dart';
import '../../disposable.dart';
import '../../errors/video_error.dart';
import '../../errors/video_error_composer.dart';
import '../../sfu/data/events/sfu_events.dart';
import '../../sfu/data/models/sfu_model_mapper_extensions.dart';
Expand All @@ -33,6 +35,9 @@ import 'call_session_config.dart';
const _tag = 'SV:CallSession';

const _debounceDuration = Duration(milliseconds: 200);
const _pcReconnectTimeout = Duration(seconds: 4);

typedef OnFullReconnectNeeded = void Function();

class CallSession extends Disposable {
CallSession({
Expand All @@ -41,6 +46,7 @@ class CallSession extends Disposable {
required this.sessionId,
required this.config,
required this.stateManager,
required this.onFullReconnectNeeded,
required SdpEditor sdpEditor,
}) : sfuClient = SfuClientImpl(
baseUrl: config.sfuUrl,
Expand Down Expand Up @@ -71,8 +77,11 @@ class CallSession extends Disposable {
final SfuClient sfuClient;
final SfuWebSocket sfuWS;
final RtcManagerFactory rtcManagerFactory;
final OnFullReconnectNeeded onFullReconnectNeeded;

RtcManager? rtcManager;
StreamSubscription<SfuEvent>? eventsSubscription;
Timer? _peerConnectionCheckTimer;

SharedEmitter<CallStats> get stats => _stats;
late final _stats = MutableSharedEmitterImpl<CallStats>();
Expand Down Expand Up @@ -130,6 +139,8 @@ class CallSession extends Disposable {
)
..onPublisherIceCandidate = _onLocalIceCandidate
..onSubscriberIceCandidate = _onLocalIceCandidate
..onSubscriberDisconnectedOrFailed = _onSubsciberDisconnectedOrFailed
..onPublisherDisconnectedOrFailed = _onPublisherDisconnectedOrFailed
..onLocalTrackMuted = _onLocalTrackMuted
..onLocalTrackPublished = _onLocalTrackPublished
..onRenegotiationNeeded = _onRenegotiationNeeded
Expand All @@ -147,6 +158,57 @@ class CallSession extends Disposable {
}
}

Future<Result<None>> fastReconnect() async {
try {
_logger.d(() => '[fastReconnect] no args');

final genericSdp = await RtcManager.getGenericSdp();
_logger.v(() => '[fastReconnect] genericSdp.len: ${genericSdp.length}');

await eventsSubscription?.cancel();
eventsSubscription = sfuWS.events.listen(_onSfuEvent);
await sfuWS.connect();

sfuWS.send(
sfu_events.SfuRequest(
joinRequest: sfu_events.JoinRequest(
token: config.sfuToken,
sessionId: sessionId,
subscriberSdp: genericSdp,
fastReconnect: true,
),
),
);

_logger.v(() => '[fastReconnect] wait for SfuJoinResponseEvent');
final event = await sfuWS.events.waitFor<SfuJoinResponseEvent>(
timeLimit: const Duration(seconds: 30),
);

if (event.isReconnected) {
_logger.v(() =>
'[fastReconnect] fast-reconnect possible - requesting ICE restarts');
final iceResult = await sfuClient.restartIce(
sfu.ICERestartRequest(
sessionId: sessionId,
peerType: sfu_models.PeerType.PEER_TYPE_SUBSCRIBER,
),
);

_logger.v(() => '[fastReconnect] completed');
return iceResult.map((data) => none);
} else {
_logger.v(() => '[fastReconnect] fast-reconnect not possible');
return const Result.failure(
VideoError(message: 'Fast reconnect not possible'),
);
}
} catch (e, stk) {
_logger.e(() => '[fastReconnect] failed: $e');
return Result.failure(VideoErrors.compose(e, stk));
}
}

@override
Future<void> dispose() async {
_logger.d(() => '[dispose] no args');
Expand All @@ -157,6 +219,7 @@ class CallSession extends Disposable {
await sfuWS.disconnect();
await rtcManager?.dispose();
rtcManager = null;
_peerConnectionCheckTimer?.cancel();
return await super.dispose();
}

Expand Down Expand Up @@ -402,6 +465,56 @@ class CallSession extends Disposable {
_logger.v(() => '[onLocalIceCandidate] result: $result');
}

Future<void> _onSubsciberDisconnectedOrFailed(
StreamPeerConnection pc,
rtc.RTCIceConnectionState state,
) async {
_logger.d(
() => '[_onSubsciberDisconnectedOrFaild] type: ${pc.type}, state: $state',
);

_peerConnectionCheckTimer?.cancel();
_peerConnectionCheckTimer = Timer(_pcReconnectTimeout, () {
if (pc.pc.connectionState !=
RTCPeerConnectionState.RTCPeerConnectionStateConnected) {
_peerConnectionCheckTimer = null;
onFullReconnectNeeded();
}
});

final iceResult = await sfuClient.restartIce(
sfu.ICERestartRequest(
sessionId: sessionId,
peerType: sfu_models.PeerType.PEER_TYPE_SUBSCRIBER,
),
);

_logger.v(() => '[_onSubsciberDisconnectedOrFailed] result: $iceResult');
}

Future<void> _onPublisherDisconnectedOrFailed(
StreamPeerConnection pc,
rtc.RTCIceConnectionState state,
) async {
_logger.d(
() =>
'[_onPublisherDisconnectedOrFailed] type: ${pc.type}, state: $state',
);

_peerConnectionCheckTimer?.cancel();
_peerConnectionCheckTimer = Timer(_pcReconnectTimeout, () {
if (pc.pc.connectionState !=
RTCPeerConnectionState.RTCPeerConnectionStateConnected) {
_peerConnectionCheckTimer = null;
onFullReconnectNeeded();
}
});

await pc.pc.restartIce();

_logger.v(() => '[_onPublisherDisconnectedOrFailed] ice restarted');
}

Future<void> _onRenegotiationNeeded(StreamPeerConnection pc) async {
_logger.d(() => '[negotiate] type: ${pc.type}');

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class CallSessionFactory {
String? sessionId,
required CallCredentials credentials,
required CallStateNotifier stateManager,
required OnFullReconnectNeeded onFullReconnectNeeded,
}) async {
final finalSessionId = sessionId ?? const Uuid().v4();
_logger.d(() => '[makeCallSession] sessionId: $finalSessionId($sessionId)');
Expand All @@ -49,6 +50,7 @@ class CallSessionFactory {
config: sessionConfig,
stateManager: stateManager,
sdpEditor: sdpEditor,
onFullReconnectNeeded: onFullReconnectNeeded,
);
}

Expand Down
Loading

0 comments on commit 1b90a73

Please sign in to comment.