From d4cfa22de3594ed31b4f4c6f964411c568d43512 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Bra=C5=BCewicz?= Date: Wed, 15 Jan 2025 12:03:24 +0100 Subject: [PATCH] feat: report connection times to SFU stats (#818) * Video filters * tweak * fix * custom filter added * fix * tweaks * Codec negotiation * client publish options added * force TF build * dep fix * replace duplicate resource bundle name * fixed target name * use xcode 16 to distribte * podfile test * remove duplicate bundle * remove resource bundle 2 * podfile fix * fixes * tweaks * don't log stats request * setting publish options when reconnecting * log tweaks * dogfooding: disable autocorrect in call id input * announce tracks change for reconnect * enable env switcher * log tweak * added codec to track info * added muted to track info * revert temp changes * Report connection times * publish option id added to trackinfo * fmtp line * fix * fix for race between negotiation and initial track creation * cleanup * tweaks * fix telemetry reporting * stopwatch used instead of datetime * comment removed --- packages/stream_video/lib/src/call/call.dart | 188 +++++------------- .../lib/src/call/session/call_session.dart | 120 +---------- .../lib/src/call/sfu_stats_reporter.dart | 157 +++++++++++++++ .../state/mixins/state_lifecycle_mixin.dart | 4 - .../lib/src/call/stats_reporter.dart | 172 ++++++++++++++++ .../lib/src/models/call_preferences.dart | 5 + .../stream_video/lib/src/sfu/sfu_client.dart | 2 +- .../lib/src/webrtc/peer_connection.dart | 100 +++------- .../lib/src/webrtc/rtc_manager.dart | 21 -- .../call_diagnostics_content.dart | 22 +- 10 files changed, 418 insertions(+), 373 deletions(-) create mode 100644 packages/stream_video/lib/src/call/sfu_stats_reporter.dart create mode 100644 packages/stream_video/lib/src/call/stats_reporter.dart diff --git a/packages/stream_video/lib/src/call/call.dart b/packages/stream_video/lib/src/call/call.dart index a98cdad7d..e21bca229 100644 --- a/packages/stream_video/lib/src/call/call.dart +++ b/packages/stream_video/lib/src/call/call.dart @@ -25,10 +25,6 @@ import '../utils/cancelables.dart'; import '../utils/extensions.dart'; import '../utils/future.dart'; import '../utils/standard.dart'; -import '../webrtc/model/stats/rtc_codec.dart'; -import '../webrtc/model/stats/rtc_ice_candidate_pair.dart'; -import '../webrtc/model/stats/rtc_inbound_rtp_video_stream.dart'; -import '../webrtc/model/stats/rtc_outbound_rtp_video_stream.dart'; import '../webrtc/rtc_manager.dart'; import '../webrtc/sdp/editor/sdp_editor_impl.dart'; import '../webrtc/sdp/policy/sdp_policy.dart'; @@ -36,7 +32,9 @@ import '../ws/ws.dart'; import 'permissions/permissions_manager.dart'; import 'session/call_session.dart'; import 'session/call_session_factory.dart'; +import 'sfu_stats_reporter.dart'; import 'state/call_state_notifier.dart'; +import 'stats_reporter.dart'; typedef OnCallPermissionRequest = void Function( CoordinatorCallPermissionRequestEvent, @@ -224,6 +222,9 @@ class Call { CallSession? _session; CallSession? _previousSession; + int? _statsReportingIntervalMs; + SfuStatsReporter? _sfuStatsReporter; + int _reconnectAttempts = 0; Duration _fastReconnectDeadline = Duration.zero; SfuReconnectionStrategy _reconnectStrategy = @@ -241,8 +242,10 @@ class Call { StateEmitter get state => _stateManager.callStateStream; - SharedEmitter get stats => _stats; - late final _stats = MutableSharedEmitterImpl(); + SharedEmitter<({CallStats publisherStats, CallStats subscriberStats})> + get stats => _stats; + late final _stats = MutableSharedEmitterImpl< + ({CallStats publisherStats, CallStats subscriberStats})>(); SharedEmitter get callEvents => _callEvents; final _callEvents = MutableSharedEmitterImpl(); @@ -578,6 +581,7 @@ class Call { return _callJoinLock.synchronized(() async { _logger.d(() => '[join] options: $_connectOptions'); + final connectionTimeStopwatch = Stopwatch()..start(); final validation = await _stateManager.validateUserId(_streamVideo.currentUser.id); @@ -624,7 +628,6 @@ class Call { } _credentials = joinedResult.data; - _previousSession = _session; final isWsHealthy = _previousSession?.sfuWS.isConnected ?? false; @@ -704,6 +707,15 @@ class Call { ); } + // make sure we only track connection timing if we are not calling this method as part of a migration flow + connectionTimeStopwatch.stop(); + if (!performingMigration) { + await _sfuStatsReporter?.sendSfuStats( + reconnectionStrategy: _reconnectStrategy, + connectionTimeMs: connectionTimeStopwatch.elapsedMilliseconds, + ); + } + if (performingRejoin) { _logger.v(() => '[join] leaving previous session'); _previousSession?.leave( @@ -726,6 +738,10 @@ class Call { } _logger.v(() => '[join] completed'); + + // reset the reconnect strategy to unspecified after a successful reconnection + _reconnectStrategy = SfuReconnectionStrategy.unspecified; + return const Result.success(none); }); } @@ -742,6 +758,7 @@ class Call { final prevState = _stateManager.callState; if (credentials == null || + _statsReportingIntervalMs == null || _reconnectStrategy == SfuReconnectionStrategy.rejoin || _reconnectStrategy == SfuReconnectionStrategy.migrate) { _logger.w(() => '[joinIfNeeded] joining'); @@ -757,8 +774,7 @@ class Call { return joinedResult.fold( success: (success) { _credentials = success.data.credentials; - _session?.rtcManager - ?.updateReportingInterval(success.data.reportingIntervalMs); + _statsReportingIntervalMs = success.data.reportingIntervalMs; return Result.success(success.data.credentials); }, @@ -874,29 +890,18 @@ class Call { _session = session; - _subscriptions.cancel(_idSessionEvents); + _sfuStatsReporter?.stop(); _subscriptions.cancel(_idSessionStats); + _subscriptions.cancel(_idSessionEvents); _subscriptions.add( _idSessionEvents, session.events.listen((event) { - // _logger.log( - // event.logPriority, - // () => '[listenSfuEvent] event.type: ${event.runtimeType}', - // ); event.mapToCallEvent(state.value).emitIfNotNull(_callEvents); _onSfuEvent(event); }), ); - _subscriptions.add( - _idSessionStats, - session.stats.listen((stats) { - _stats.emit(stats); - _processStats(stats); - }), - ); - var localStats = state.value.localStats ?? LocalStats.empty(); localStats = localStats.copyWith( sfu: session.config.sfuUrl, @@ -921,6 +926,23 @@ class Call { }, ); + _subscriptions.add( + _idSessionStats, + StatsReporter( + rtcManager: session.rtcManager!, + stateManager: _stateManager, + ).run(interval: _preferences.callStatsReportingInterval).listen((stats) { + _stats.emit(stats); + }), + ); + + if (_statsReportingIntervalMs != null) { + _sfuStatsReporter = SfuStatsReporter( + callSession: session, + stateManager: _stateManager, + )..run(interval: Duration(milliseconds: _statsReportingIntervalMs!)); + } + return result.fold( success: (success) { _logger.v(() => '[startSession] success: $success'); @@ -934,119 +956,6 @@ class Call { ); } - void _processStats(CallStats stats) { - var publisherStats = - state.value.publisherStats ?? PeerConnectionStats.empty(); - var subscriberStats = - state.value.subscriberStats ?? PeerConnectionStats.empty(); - - if (stats.peerType == StreamPeerType.publisher) { - final allStats = stats.stats - .whereType() - .map(MediaStatsInfo.fromRtcOutboundRtpVideoStream); - - final mediaStats = allStats.firstWhereOrNull( - (s) => s.width != null && s.height != null && s.fps != null, - ); - - final jitterInMs = ((mediaStats?.jitter ?? 0) * 1000).toInt(); - final resolution = mediaStats != null - ? '${mediaStats.width} x ${mediaStats.height} @ ${mediaStats.fps}fps' - : null; - - var activeOutbound = allStats.toList(); - - if (publisherStats.outboundMediaStats.isNotEmpty) { - activeOutbound = activeOutbound - .where( - (s) => - publisherStats.outboundMediaStats.none((i) => s.id == i.id) || - publisherStats.outboundMediaStats - .firstWhere((i) => i.id == s.id) - .bytesSent != - s.bytesSent, - ) - .toList(); - } - - final codec = stats.stats - .whereType() - .where((c) => c.mimeType?.startsWith('video') ?? false) - .where((c) => activeOutbound.any((s) => s.videoCodecId == c.id)) - .map((c) => c.mimeType?.replaceFirst('video/', '')) - .where((c) => c != null) - .cast() - .toList(); - - publisherStats = publisherStats.copyWith( - resolution: resolution, - qualityDropReason: mediaStats?.qualityLimit, - jitterInMs: jitterInMs, - videoCodec: codec, - outboundMediaStats: allStats.toList(), - ); - } - - final inboudRtpVideo = - stats.stats.whereType().firstOrNull; - - if (stats.peerType == StreamPeerType.subscriber && inboudRtpVideo != null) { - final jitterInMs = ((inboudRtpVideo.jitter ?? 0) * 1000).toInt(); - final resolution = inboudRtpVideo.frameWidth != null && - inboudRtpVideo.frameHeight != null && - inboudRtpVideo.framesPerSecond != null - ? '${inboudRtpVideo.frameWidth} x ${inboudRtpVideo.frameHeight} @ ${inboudRtpVideo.framesPerSecond}fps' - : null; - - final codecStats = stats.stats - .whereType() - .where((c) => c.mimeType?.startsWith('video') ?? false) - .firstOrNull; - - final codec = codecStats?.mimeType?.replaceFirst('video/', ''); - - subscriberStats = subscriberStats.copyWith( - resolution: resolution, - jitterInMs: jitterInMs, - videoCodec: codec != null ? [codec] : [], - ); - } - - final candidatePair = - stats.stats.whereType().firstOrNull; - if (candidatePair != null) { - final latency = candidatePair.currentRoundTripTime; - final outgoingBitrate = candidatePair.availableOutgoingBitrate; - final incomingBitrate = candidatePair.availableIncomingBitrate; - - if (stats.peerType == StreamPeerType.publisher) { - publisherStats = publisherStats.copyWith( - latency: latency != null ? (latency * 1000).toInt() : null, - bitrateKbps: outgoingBitrate != null ? outgoingBitrate / 1000 : null, - ); - } else { - subscriberStats = subscriberStats.copyWith( - bitrateKbps: incomingBitrate != null ? incomingBitrate / 1000 : null, - ); - } - } - - var latencyHistory = state.value.latencyHistory; - if (stats.peerType == StreamPeerType.publisher && - publisherStats.latency != null) { - latencyHistory = [ - ...state.value.latencyHistory.reversed.take(19).toList().reversed, - publisherStats.latency!, - ]; - } - - _stateManager.lifecycleCallStats( - publisherStats: publisherStats, - subscriberStats: subscriberStats, - latencyHistory: latencyHistory, - ); - } - Future _onSfuEvent(SfuEvent sfuEvent) async { if (sfuEvent is SfuParticipantLeftEvent) { final callParticipants = [...state.value.callParticipants]..removeWhere( @@ -1188,6 +1097,8 @@ class Call { } Future _reconnectMigrate() async { + final migrateTimeStopwatch = Stopwatch()..start(); + _reconnectStrategy = SfuReconnectionStrategy.migrate; await _join(); final result = await _session?.waitForMigrationComplete(); @@ -1202,6 +1113,12 @@ class Call { _reconnectStrategy = SfuReconnectionStrategy.rejoin; }, ); + + migrateTimeStopwatch.stop(); + await _sfuStatsReporter?.sendSfuStats( + connectionTimeMs: migrateTimeStopwatch.elapsedMilliseconds, + reconnectionStrategy: _reconnectStrategy, + ); } Future _awaitNetworkAvailable() async { @@ -1292,6 +1209,7 @@ class Call { timer.cancel(); } + _sfuStatsReporter?.stop(); _subscriptions.cancelAll(); _cancelables.cancelAll(); await _session?.dispose(); diff --git a/packages/stream_video/lib/src/call/session/call_session.dart b/packages/stream_video/lib/src/call/session/call_session.dart index 9e29ba3e3..b9dcbb5d9 100644 --- a/packages/stream_video/lib/src/call/session/call_session.dart +++ b/packages/stream_video/lib/src/call/session/call_session.dart @@ -1,14 +1,12 @@ import 'dart:async'; import 'dart:convert'; -import 'package:battery_plus/battery_plus.dart'; import 'package:collection/collection.dart'; import 'package:device_info_plus/device_info_plus.dart'; import 'package:rxdart/rxdart.dart'; import 'package:stream_webrtc_flutter/stream_webrtc_flutter.dart' as rtc; import 'package:synchronized/synchronized.dart'; import 'package:system_info2/system_info2.dart'; -import 'package:thermal/thermal.dart'; import '../../../protobuf/video/sfu/event/events.pb.dart' as sfu_events; import '../../../protobuf/video/sfu/models/models.pb.dart' as sfu_models; @@ -19,7 +17,6 @@ import '../../../version.g.dart'; import '../../disposable.dart'; import '../../errors/video_error.dart'; import '../../errors/video_error_composer.dart'; -import '../../extensions/thermal_status_ext.dart'; import '../../sfu/data/events/sfu_events.dart'; import '../../sfu/data/models/sfu_call_state.dart'; import '../../sfu/data/models/sfu_error.dart'; @@ -31,8 +28,6 @@ import '../../shared_emitter.dart'; import '../../utils/debounce_buffer.dart'; import '../../webrtc/model/rtc_model_mapper_extensions.dart'; import '../../webrtc/model/rtc_tracks_info.dart'; -import '../../webrtc/model/stats/rtc_printable_stats.dart'; -import '../../webrtc/model/stats/rtc_stats.dart'; import '../../webrtc/peer_connection.dart'; import '../../webrtc/rtc_manager.dart'; import '../../webrtc/rtc_manager_factory.dart'; @@ -77,26 +72,6 @@ class CallSession extends Disposable { sdpEditor: sdpEditor, ) { _logger.i(() => ' callCid: $callCid, sessionId: $sessionId'); - - _thermalStatusSubscription = - Thermal().onThermalStatusChanged.listen((ThermalStatus status) { - _thermalStatus = status; - }); - - _mediaDeviceSubscription = - RtcMediaDeviceNotifier.instance.onDeviceChange.listen( - (devices) { - _availableAudioInputs = devices - .where((device) => device.kind == RtcMediaDeviceKind.audioInput) - .map((device) => device.label) - .toList(); - - _availableVideoInputs = devices - .where((device) => device.kind == RtcMediaDeviceKind.videoInput) - .map((device) => device.label) - .toList(); - }, - ); } late final _logger = taggedLogger(tag: '$_tag-$sessionSeq'); @@ -121,21 +96,11 @@ class CallSession extends Disposable { RtcManager? rtcManager; BehaviorSubject? _rtcManagerSubject; StreamSubscription? _eventsSubscription; - StreamSubscription>? _statsSubscription; - StreamSubscription>? _mediaDeviceSubscription; - StreamSubscription? _thermalStatusSubscription; - - List? _availableAudioInputs; - List? _availableVideoInputs; - ThermalStatus? _thermalStatus; Timer? _peerConnectionCheckTimer; sfu_models.ClientDetails? _clientDetails; - SharedEmitter get stats => _stats; - late final _stats = MutableSharedEmitterImpl(); - SharedEmitter get events => sfuWS.events; late final _vvBuffer = DebounceBuffer>( @@ -347,14 +312,11 @@ class CallSession extends Disposable { ..onLocalTrackMuted = _onLocalTrackMuted ..onLocalTrackPublished = _onLocalTrackPublished ..onRenegotiationNeeded = _onRenegotiationNeeded - ..onRemoteTrackReceived = _onRemoteTrackReceived - ..onStatsReceived = _onStatsReceived; + ..onRemoteTrackReceived = _onRemoteTrackReceived; await onRtcManagerCreatedCallback?.call(rtcManager!); _rtcManagerSubject!.add(rtcManager!); - await observePeerConnectionStats(); - _logger.d(() => '[start] completed'); return Result.success( ( @@ -471,11 +433,7 @@ class CallSession extends Disposable { }) async { _logger.d(() => '[close] code: $code, closeReason: $closeReason'); - await _stats.close(); await _eventsSubscription?.cancel(); - await _statsSubscription?.cancel(); - await _mediaDeviceSubscription?.cancel(); - await _thermalStatusSubscription?.cancel(); await sfuWS.disconnect( code.value, @@ -816,22 +774,6 @@ class CallSession extends Disposable { } } - void _onStatsReceived( - StreamPeerConnection pc, - List rtcStats, - RtcPrintableStats rtcPrintableStats, - List> rtcRawStats, - ) { - _stats.emit( - CallStats( - peerType: pc.type, - stats: rtcStats, - printable: rtcPrintableStats, - raw: rtcRawStats, - ), - ); - } - Future> setParticipantPinned({ required String sessionId, required String userId, @@ -965,66 +907,6 @@ class CallSession extends Disposable { return result.map((_) => none); } - Future observePeerConnectionStats() async { - await _statsSubscription?.cancel(); - - _statsSubscription = rtcManager?.statsStream.listen((rawStats) async { - final lowPowerMode = await Battery().isInBatterySaveMode; - - sfu_models.AndroidState? androidState; - sfu_models.AppleState? appleState; - - final audioInputDevices = sfu_models.InputDevices( - availableDevices: _availableAudioInputs, - currentDevice: stateManager.callState.audioInputDevice?.label, - isPermitted: stateManager.callState.audioInputDevice != null && - stateManager.callState.ownCapabilities - .contains(CallPermission.sendAudio), - ); - - final videoInputDevices = sfu_models.InputDevices( - availableDevices: _availableVideoInputs, - currentDevice: stateManager.callState.videoInputDevice?.label, - isPermitted: stateManager.callState.videoInputDevice != null && - stateManager.callState.ownCapabilities - .contains(CallPermission.sendVideo), - ); - - if (CurrentPlatform.isAndroid) { - androidState = sfu_models.AndroidState( - thermalState: _thermalStatus?.toAndroidThermalState(), - isPowerSaverMode: lowPowerMode, - ); - } else if (CurrentPlatform.isIos) { - appleState = sfu_models.AppleState( - thermalState: _thermalStatus?.toAppleThermalState(), - isLowPowerModeEnabled: lowPowerMode, - ); - } - - final request = sfu.SendStatsRequest( - sessionId: sessionId, - publisherStats: jsonEncode(rawStats['publisherStats']), - subscriberStats: jsonEncode(rawStats['subscriberStats']), - sdkVersion: streamVideoVersion, - sdk: streamSdkName, - android: androidState, - apple: appleState, - audioDevices: audioInputDevices, - videoDevices: videoInputDevices, - webrtcVersion: switch (CurrentPlatform.type) { - PlatformType.android => androidWebRTCVersion, - PlatformType.ios => iosWebRTCVersion, - _ => null, - }, - ); - - await sfuClient.sendStats( - request, - ); - }); - } - @override String toString() => 'CallSession{seq: $sessionSeq, id: $sessionId}'; } diff --git a/packages/stream_video/lib/src/call/sfu_stats_reporter.dart b/packages/stream_video/lib/src/call/sfu_stats_reporter.dart new file mode 100644 index 000000000..b9f8266c0 --- /dev/null +++ b/packages/stream_video/lib/src/call/sfu_stats_reporter.dart @@ -0,0 +1,157 @@ +import 'dart:async'; +import 'dart:convert'; + +import 'package:battery_plus/battery_plus.dart'; +import 'package:thermal/thermal.dart'; + +import '../../../protobuf/video/sfu/signal_rpc/signal.pb.dart' as sfu; +import '../../protobuf/video/sfu/models/models.pb.dart' as sfu_models; +import '../../stream_video.dart'; +import '../../version.g.dart'; +import '../extensions/thermal_status_ext.dart'; +import '../sfu/data/models/sfu_error.dart'; +import 'session/call_session.dart'; +import 'state/call_state_notifier.dart'; + +class SfuStatsReporter { + SfuStatsReporter({ + required this.callSession, + required this.stateManager, + }) { + _thermalStatusSubscription = + Thermal().onThermalStatusChanged.listen((ThermalStatus status) { + _thermalStatus = status; + }); + + _mediaDeviceSubscription = + RtcMediaDeviceNotifier.instance.onDeviceChange.listen( + (devices) { + _availableAudioInputs = devices + .where((device) => device.kind == RtcMediaDeviceKind.audioInput) + .map((device) => device.label) + .toList(); + + _availableVideoInputs = devices + .where((device) => device.kind == RtcMediaDeviceKind.videoInput) + .map((device) => device.label) + .toList(); + }, + ); + } + + final CallSession callSession; + final CallStateNotifier stateManager; + + StreamSubscription>? _mediaDeviceSubscription; + StreamSubscription? _thermalStatusSubscription; + + List? _availableAudioInputs; + List? _availableVideoInputs; + ThermalStatus? _thermalStatus; + + Timer? _timer; + + void run({Duration interval = const Duration(seconds: 10)}) { + _timer?.cancel(); + _timer = Timer.periodic(interval, (_) { + sendSfuStats(); + }); + } + + Future sendSfuStats({ + int? connectionTimeMs, + SfuReconnectionStrategy? reconnectionStrategy, + }) async { + final publisherStatsBundle = + await callSession.rtcManager?.publisher.getStats(); + final subscriberStatsBundle = + await callSession.rtcManager?.subscriber.getStats(); + + if (publisherStatsBundle == null || subscriberStatsBundle == null) { + return; + } + + final lowPowerMode = await Battery().isInBatterySaveMode; + + sfu_models.AndroidState? androidState; + sfu_models.AppleState? appleState; + + final audioInputDevices = sfu_models.InputDevices( + availableDevices: _availableAudioInputs, + currentDevice: stateManager.callState.audioInputDevice?.label, + isPermitted: stateManager.callState.audioInputDevice != null && + stateManager.callState.ownCapabilities + .contains(CallPermission.sendAudio), + ); + + final videoInputDevices = sfu_models.InputDevices( + availableDevices: _availableVideoInputs, + currentDevice: stateManager.callState.videoInputDevice?.label, + isPermitted: stateManager.callState.videoInputDevice != null && + stateManager.callState.ownCapabilities + .contains(CallPermission.sendVideo), + ); + + if (CurrentPlatform.isAndroid) { + androidState = sfu_models.AndroidState( + thermalState: _thermalStatus?.toAndroidThermalState(), + isPowerSaverMode: lowPowerMode, + ); + } else if (CurrentPlatform.isIos) { + appleState = sfu_models.AppleState( + thermalState: _thermalStatus?.toAppleThermalState(), + isLowPowerModeEnabled: lowPowerMode, + ); + } + + final request = sfu.SendStatsRequest( + sessionId: callSession.sessionId, + publisherStats: jsonEncode(publisherStatsBundle.rawStats), + subscriberStats: jsonEncode(subscriberStatsBundle.rawStats), + sdkVersion: streamVideoVersion, + sdk: streamSdkName, + android: androidState, + apple: appleState, + audioDevices: audioInputDevices, + videoDevices: videoInputDevices, + webrtcVersion: switch (CurrentPlatform.type) { + PlatformType.android => androidWebRTCVersion, + PlatformType.ios => iosWebRTCVersion, + _ => null, + }, + telemetry: _calculateTelemetry(connectionTimeMs, reconnectionStrategy), + ); + + await callSession.sfuClient.sendStats( + request, + ); + } + + sfu.Telemetry? _calculateTelemetry( + int? connectionTimeMs, + SfuReconnectionStrategy? reconnectionStrategy, + ) { + if (connectionTimeMs == null) return null; + + final timeSeconds = connectionTimeMs / 1000; + if (reconnectionStrategy != null && + reconnectionStrategy != SfuReconnectionStrategy.unspecified) { + return sfu.Telemetry( + reconnection: sfu.Reconnection( + timeSeconds: timeSeconds, + strategy: reconnectionStrategy.toDto(), + ), + ); + } else { + return sfu.Telemetry( + connectionTimeSeconds: timeSeconds, + ); + } + } + + void stop() { + _timer?.cancel(); + _mediaDeviceSubscription?.cancel(); + _thermalStatusSubscription?.cancel(); + } +} diff --git a/packages/stream_video/lib/src/call/state/mixins/state_lifecycle_mixin.dart b/packages/stream_video/lib/src/call/state/mixins/state_lifecycle_mixin.dart index fd5552249..75ae1dc78 100644 --- a/packages/stream_video/lib/src/call/state/mixins/state_lifecycle_mixin.dart +++ b/packages/stream_video/lib/src/call/state/mixins/state_lifecycle_mixin.dart @@ -259,10 +259,6 @@ mixin StateLifecycleMixin on StateNotifier { PeerConnectionStats? publisherStats, PeerConnectionStats? subscriberStats, }) { - _logger.d( - () => - '[lifecycleCallStats] publisherStats: $publisherStats, subscriberStats: $subscriberStats, state: $state', - ); state = state.copyWith( publisherStats: publisherStats, subscriberStats: subscriberStats, diff --git a/packages/stream_video/lib/src/call/stats_reporter.dart b/packages/stream_video/lib/src/call/stats_reporter.dart new file mode 100644 index 000000000..60443324e --- /dev/null +++ b/packages/stream_video/lib/src/call/stats_reporter.dart @@ -0,0 +1,172 @@ +import 'dart:async'; + +import 'package:collection/collection.dart'; + +import '../../stream_video.dart'; +import '../webrtc/model/stats/rtc_codec.dart'; +import '../webrtc/model/stats/rtc_ice_candidate_pair.dart'; +import '../webrtc/model/stats/rtc_inbound_rtp_video_stream.dart'; +import '../webrtc/model/stats/rtc_outbound_rtp_video_stream.dart'; +import '../webrtc/rtc_manager.dart'; +import 'state/call_state_notifier.dart'; + +class StatsReporter { + StatsReporter({ + required this.rtcManager, + required this.stateManager, + }); + + final RtcManager rtcManager; + final CallStateNotifier stateManager; + + Stream<({CallStats publisherStats, CallStats subscriberStats})> run({ + Duration interval = const Duration(seconds: 10), + }) { + return Stream.periodic(interval, (_) => collectStats()) + .asyncMap((event) async { + final stats = await event; + _processStats(stats); + return event; + }); + } + + Future<({CallStats publisherStats, CallStats subscriberStats})> + collectStats() async { + final publisherStatsBundle = await rtcManager.publisher.getStats(); + final subscriberStatsBundle = await rtcManager.subscriber.getStats(); + + final publisherStats = CallStats( + peerType: StreamPeerType.publisher, + stats: publisherStatsBundle.rtcStats, + printable: publisherStatsBundle.printable, + raw: publisherStatsBundle.rawStats, + ); + + final subscriberStats = CallStats( + peerType: StreamPeerType.subscriber, + stats: subscriberStatsBundle.rtcStats, + printable: subscriberStatsBundle.printable, + raw: subscriberStatsBundle.rawStats, + ); + + return (publisherStats: publisherStats, subscriberStats: subscriberStats); + } + + void _processStats( + ({CallStats publisherStats, CallStats subscriberStats}) stats, + ) { + final state = stateManager.callState; + + var publisherStats = state.publisherStats ?? PeerConnectionStats.empty(); + var subscriberStats = state.subscriberStats ?? PeerConnectionStats.empty(); + + final allStats = stats.publisherStats.stats + .whereType() + .map(MediaStatsInfo.fromRtcOutboundRtpVideoStream); + + final mediaStats = allStats.firstWhereOrNull( + (s) => s.width != null && s.height != null && s.fps != null, + ); + + final jitterInMs = ((mediaStats?.jitter ?? 0) * 1000).toInt(); + final resolution = mediaStats != null + ? '${mediaStats.width} x ${mediaStats.height} @ ${mediaStats.fps}fps' + : null; + + var activeOutbound = allStats.toList(); + + if (publisherStats.outboundMediaStats.isNotEmpty) { + activeOutbound = activeOutbound + .where( + (s) => + publisherStats.outboundMediaStats.none((i) => s.id == i.id) || + publisherStats.outboundMediaStats + .firstWhere((i) => i.id == s.id) + .bytesSent != + s.bytesSent, + ) + .toList(); + } + + final codec = stats.publisherStats.stats + .whereType() + .where((c) => c.mimeType?.startsWith('video') ?? false) + .where((c) => activeOutbound.any((s) => s.videoCodecId == c.id)) + .map((c) => c.mimeType?.replaceFirst('video/', '')) + .where((c) => c != null) + .cast() + .toList(); + + publisherStats = publisherStats.copyWith( + resolution: resolution, + qualityDropReason: mediaStats?.qualityLimit, + jitterInMs: jitterInMs, + videoCodec: codec, + outboundMediaStats: allStats.toList(), + ); + + final inboudRtpVideo = stats.subscriberStats.stats + .whereType() + .firstOrNull; + + if (inboudRtpVideo != null) { + final jitterInMs = ((inboudRtpVideo.jitter ?? 0) * 1000).toInt(); + final resolution = inboudRtpVideo.frameWidth != null && + inboudRtpVideo.frameHeight != null && + inboudRtpVideo.framesPerSecond != null + ? '${inboudRtpVideo.frameWidth} x ${inboudRtpVideo.frameHeight} @ ${inboudRtpVideo.framesPerSecond}fps' + : null; + + final codecStats = stats.subscriberStats.stats + .whereType() + .where((c) => c.mimeType?.startsWith('video') ?? false) + .firstOrNull; + + final codec = codecStats?.mimeType?.replaceFirst('video/', ''); + + subscriberStats = subscriberStats.copyWith( + resolution: resolution, + jitterInMs: jitterInMs, + videoCodec: codec != null ? [codec] : [], + ); + } + + final subscriberCandidatePair = stats.subscriberStats.stats + .whereType() + .firstOrNull; + if (subscriberCandidatePair != null) { + final incomingBitrate = subscriberCandidatePair.availableIncomingBitrate; + + subscriberStats = subscriberStats.copyWith( + bitrateKbps: incomingBitrate != null ? incomingBitrate / 1000 : null, + ); + } + + final publisherCandidatePair = stats.subscriberStats.stats + .whereType() + .firstOrNull; + if (publisherCandidatePair != null) { + final latency = publisherCandidatePair.currentRoundTripTime; + final outgoingBitrate = publisherCandidatePair.availableOutgoingBitrate; + + publisherStats = publisherStats.copyWith( + latency: latency != null ? (latency * 1000).toInt() : null, + bitrateKbps: outgoingBitrate != null ? outgoingBitrate / 1000 : null, + ); + } + + var latencyHistory = state.latencyHistory; + if (publisherStats.latency != null) { + latencyHistory = [ + ...state.latencyHistory.reversed.take(19).toList().reversed, + publisherStats.latency!, + ]; + } + + stateManager.lifecycleCallStats( + publisherStats: publisherStats, + subscriberStats: subscriberStats, + latencyHistory: latencyHistory, + ); + } +} diff --git a/packages/stream_video/lib/src/models/call_preferences.dart b/packages/stream_video/lib/src/models/call_preferences.dart index 8c986051a..14a21dc39 100644 --- a/packages/stream_video/lib/src/models/call_preferences.dart +++ b/packages/stream_video/lib/src/models/call_preferences.dart @@ -3,6 +3,7 @@ import 'call_client_publish_options.dart'; abstract class CallPreferences { Duration get connectTimeout; Duration get reactionAutoDismissTime; + Duration get callStatsReportingInterval; bool get dropIfAloneInRingingFlow; ClientPublishOptions? get clientPublishOptions; } @@ -11,6 +12,7 @@ class DefaultCallPreferences implements CallPreferences { DefaultCallPreferences({ this.connectTimeout = const Duration(seconds: 60), this.reactionAutoDismissTime = const Duration(seconds: 5), + this.callStatsReportingInterval = const Duration(seconds: 2), this.dropIfAloneInRingingFlow = true, this.clientPublishOptions, }); @@ -21,6 +23,9 @@ class DefaultCallPreferences implements CallPreferences { @override final Duration reactionAutoDismissTime; + @override + final Duration callStatsReportingInterval; + @override final bool dropIfAloneInRingingFlow; diff --git a/packages/stream_video/lib/src/sfu/sfu_client.dart b/packages/stream_video/lib/src/sfu/sfu_client.dart index c934fe305..9575c58cb 100644 --- a/packages/stream_video/lib/src/sfu/sfu_client.dart +++ b/packages/stream_video/lib/src/sfu/sfu_client.dart @@ -69,7 +69,7 @@ class SfuClient { try { _logger.d(() => '[setPublisher] request: ${request.stringify()}'); final response = await _client.setPublisher(_withAuthHeaders(), request); - _logger.v(() => '[setPublisher] response: ${response.stringify()}'); + _logger.d(() => '[setPublisher] response: ${response.stringify()}'); return Result.success(response); } catch (e, stk) { return Result.failure(VideoErrors.compose(e, stk)); diff --git a/packages/stream_video/lib/src/webrtc/peer_connection.dart b/packages/stream_video/lib/src/webrtc/peer_connection.dart index 92ed43dd6..1c46d8151 100644 --- a/packages/stream_video/lib/src/webrtc/peer_connection.dart +++ b/packages/stream_video/lib/src/webrtc/peer_connection.dart @@ -1,6 +1,5 @@ import 'dart:async'; -import 'package:rxdart/rxdart.dart'; import 'package:stream_webrtc_flutter/stream_webrtc_flutter.dart' as rtc; import '../disposable.dart'; @@ -50,16 +49,6 @@ typedef OnTrack = void Function( rtc.RTCTrackEvent, ); -/// {@template onTrack} -/// Handler whenever we receive [RtcPrintableStats]. -/// {@endtemplate} -typedef OnStats = void Function( - StreamPeerConnection, - List, - RtcPrintableStats, - List>, -); - /// Wrapper around the WebRTC connection that contains tracks. class StreamPeerConnection extends Disposable { /// Creates [StreamPeerConnection] instance. @@ -80,7 +69,6 @@ class StreamPeerConnection extends Disposable { final StreamPeerType type; final rtc.RTCPeerConnection pc; final SdpEditor sdpEditor; - int _reportingIntervalMs = 2000; /// {@macro onStreamAdded} OnStreamAdded? onStreamAdded; @@ -96,22 +84,8 @@ class StreamPeerConnection extends Disposable { /// {@macro onTrack} OnTrack? onTrack; - /// {@macro onTrack} - OnStats? onStats; - - Stream>> get statsStream => - _statsController.stream.startWith([]); - final _pendingCandidates = []; - set reportingIntervalMs(int interval) { - _reportingIntervalMs = interval; - if (_statsTimer != null) { - _stopObservingStats(); - } - _startObservingStats(); - } - /// Creates an offer and sets it as the local description. Future> createOffer([ Map mediaConstraints = const {}, @@ -319,10 +293,6 @@ class StreamPeerConnection extends Disposable { _logger.v(() => '[onIceConnectionState] state: $state'); switch (state) { - case rtc.RTCIceConnectionState.RTCIceConnectionStateConnected: - return _startObservingStats(); - case rtc.RTCIceConnectionState.RTCIceConnectionStateClosed: - return _stopObservingStats(); case rtc.RTCIceConnectionState.RTCIceConnectionStateFailed: case rtc.RTCIceConnectionState.RTCIceConnectionStateDisconnected: onIssue?.call(this); @@ -331,72 +301,48 @@ class StreamPeerConnection extends Disposable { } } - Timer? _statsTimer; - final StreamController>> _statsController = - StreamController.broadcast(); - - void _startObservingStats() { - // Stop previous timer if any. - _stopObservingStats(); - - // Start new timer. - _statsTimer = Timer.periodic( - Duration(milliseconds: _reportingIntervalMs), - (_) async { - try { - if (_statsController.isClosed) return; - - final stats = await pc.getStats(); - final rtcPrintableStats = stats.toPrintableRtcStats(); - final rawStats = stats.toRawStats(); - final rtcStats = stats - .map((report) => report.toRtcStats()) - .where((element) => element != null) - .cast() - .toList(); - - onStats?.call(this, rtcStats, rtcPrintableStats, rawStats); - _statsController.add(rawStats); - } catch (e, stk) { - _logger.e(() => '[getStats] failed: $e; $stk'); - } - }, - ); - } - - void _stopObservingStats() { - _statsTimer?.cancel(); - _statsTimer = null; - } - void _onRenegotiationNeeded() { _logger.i(() => '[onRenegotiationNeeded] no args'); onRenegotiationNeeded?.call(this); } + Future< + ({ + List rtcStats, + RtcPrintableStats printable, + List> rawStats, + })> getStats() async { + final stats = await pc.getStats(); + + final rtcPrintableStats = stats.toPrintableRtcStats(); + final rawStats = stats.toRawStats(); + final rtcStats = stats + .map((report) => report.toRtcStats()) + .where((element) => element != null) + .cast() + .toList(); + + return ( + rtcStats: rtcStats, + printable: rtcPrintableStats, + rawStats: rawStats, + ); + } + @override Future dispose() async { _logger.d(() => '[dispose] no args'); _dropRtcCallbacks(); - _stopObservingStats(); onStreamAdded = null; onRenegotiationNeeded = null; onIceCandidate = null; onTrack = null; _pendingCandidates.clear(); - await _statsController.close(); await pc.dispose(); return await super.dispose(); } } -extension on rtc.StatsReport { - // ignore: unused_element - String stringify() { - return 'ts: $timestamp, id: $id, type: $type, values: $values'; - } -} - extension on rtc.RTCSessionDescription { rtc.RTCSessionDescription copyWith({ String? type, diff --git a/packages/stream_video/lib/src/webrtc/rtc_manager.dart b/packages/stream_video/lib/src/webrtc/rtc_manager.dart index 61d6cb414..b70bbb624 100644 --- a/packages/stream_video/lib/src/webrtc/rtc_manager.dart +++ b/packages/stream_video/lib/src/webrtc/rtc_manager.dart @@ -1,6 +1,5 @@ import 'package:collection/collection.dart'; import 'package:flutter/widgets.dart'; -import 'package:rxdart/rxdart.dart'; import 'package:sdp_transform/sdp_transform.dart'; import 'package:stream_webrtc_flutter/stream_webrtc_flutter.dart' as rtc; @@ -85,20 +84,6 @@ class RtcManager extends Disposable { publisher.onRenegotiationNeeded = cb; } - set onStatsReceived(OnStats? cb) { - subscriber.onStats = cb; - publisher.onStats = cb; - } - - Stream> get statsStream => CombineLatestStream.combine2( - subscriber.statsStream, - publisher.statsStream, - (subscriber, publisher) => { - 'subscriberStats': subscriber, - 'publisherStats': publisher, - }, - ).asBroadcastStream(); - OnLocalTrackMuted? onLocalTrackMuted; OnLocalTrackPublished? onLocalTrackPublished; OnRemoteTrackReceived? onRemoteTrackReceived; @@ -424,7 +409,6 @@ class RtcManager extends Disposable { onLocalTrackMuted = null; onLocalTrackPublished = null; onRemoteTrackReceived = null; - onStatsReceived = null; await publisher.dispose(); await subscriber.dispose(); @@ -1261,11 +1245,6 @@ extension RtcManagerTrackHelper on RtcManager { return Result.failure(VideoErrors.compose(e, stk)); } } - - void updateReportingInterval(int reportingIntervalMs) { - publisher.reportingIntervalMs = reportingIntervalMs; - subscriber.reportingIntervalMs = reportingIntervalMs; - } } extension on RtcLocalTrack { diff --git a/packages/stream_video_flutter/lib/src/call_screen/call_diagnostics_content/call_diagnostics_content.dart b/packages/stream_video_flutter/lib/src/call_screen/call_diagnostics_content/call_diagnostics_content.dart index 9f068e08a..8d624b09a 100644 --- a/packages/stream_video_flutter/lib/src/call_screen/call_diagnostics_content/call_diagnostics_content.dart +++ b/packages/stream_video_flutter/lib/src/call_screen/call_diagnostics_content/call_diagnostics_content.dart @@ -33,27 +33,17 @@ class _CallDiagnosticsContentState extends State { CallStats? _subscriberStats; /// Controls the subscription to the stats updates. - StreamSubscription? _subscription; + StreamSubscription<({CallStats publisherStats, CallStats subscriberStats})>? + _subscription; @override void initState() { super.initState(); _subscription = widget.call.stats.listen((stats) { - // final local = stats.printable.local.substring(0, 28); - // final remote = stats.printable.remote.substring(0, 28); - // _logger.v( - // () => '[listenStats] #${stats.peerType}; ' - // 'local: $local, remote: $remote <<<<<<<<<<<<<<<<<<<<<<<<<<', - // ); - switch (stats.peerType) { - case StreamPeerType.publisher: - _publisherStats = stats; - break; - case StreamPeerType.subscriber: - _subscriberStats = stats; - break; - } - setState(() {}); + setState(() { + _publisherStats = stats.publisherStats; + _subscriberStats = stats.subscriberStats; + }); }); }