Skip to content

Commit

Permalink
fix(llc): fix for anonymous user joining the call (#827)
Browse files Browse the repository at this point in the history
* fix for anonymous user joining the call

* wip

* fix(llc): fixed enabling/disabling cloned tracks (#828)

* fixed cloned track disabling/enabling

* tweak
  • Loading branch information
Brazol authored Jan 16, 2025
1 parent 2fd8ccc commit d81f476
Show file tree
Hide file tree
Showing 10 changed files with 219 additions and 82 deletions.
2 changes: 2 additions & 0 deletions packages/stream_video/lib/src/call/call.dart
Original file line number Diff line number Diff line change
Expand Up @@ -970,6 +970,8 @@ class Call {
_logger.v(() => '[startSession] applying connect options');
await _applyConnectOptions();
},
isAnonymousUser:
_streamVideo.state.currentUser.type == UserType.anonymous,
);

_subscriptions.add(
Expand Down
58 changes: 36 additions & 22 deletions packages/stream_video/lib/src/call/session/call_session.dart
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,13 @@ class CallSession extends Disposable {
})>> start({
sfu_events.ReconnectDetails? reconnectDetails,
FutureOr<void> Function(RtcManager)? onRtcManagerCreatedCallback,
bool isAnonymousUser = false,
}) async {
try {
_logger.d(() => '[start] no args');
_logger.d(
() => '[start] reconnectDetails: $reconnectDetails, '
'isAnonymousUser: $isAnonymousUser',
);

await _ensureClientDetails();

Expand Down Expand Up @@ -304,25 +308,35 @@ class CallSession extends Disposable {
);

_logger.v(() => '[start] sfu joined: $event');
final currentUserId = stateManager.callState.currentUserId;
final localParticipant = event.callState.participants.firstWhere(
(it) => it.userId == currentUserId,
);
final localTrackId = localParticipant.trackLookupPrefix;

_logger.v(() => '[start] localTrackId: $localTrackId');
rtcManager = await rtcManagerFactory.makeRtcManager(
publisherId: localTrackId,
publishOptions: event.publishOptions,
)
..onPublisherIceCandidate = _onLocalIceCandidate
..onSubscriberIceCandidate = _onLocalIceCandidate
..onPublisherIssue = onPeerConnectionIssue
..onSubscriberIssue = onPeerConnectionIssue
..onLocalTrackMuted = _onLocalTrackMuted
..onLocalTrackPublished = _onLocalTrackPublished
..onRenegotiationNeeded = _onRenegotiationNeeded
..onRemoteTrackReceived = _onRemoteTrackReceived;

if (isAnonymousUser) {
rtcManager = await rtcManagerFactory.makeRtcManager()
..onSubscriberIceCandidate = _onLocalIceCandidate
..onSubscriberIssue = onPeerConnectionIssue
..onRenegotiationNeeded = _onRenegotiationNeeded
..onRemoteTrackReceived = _onRemoteTrackReceived;
} else {
final currentUserId = stateManager.callState.currentUserId;
final localParticipant = event.callState.participants.firstWhere(
(it) => it.userId == currentUserId,
);
final localTrackId = localParticipant.trackLookupPrefix;

_logger.v(() => '[start] localTrackId: $localTrackId');

rtcManager = await rtcManagerFactory.makeRtcManager(
publisherId: localTrackId,
publishOptions: event.publishOptions,
)
..onPublisherIceCandidate = _onLocalIceCandidate
..onSubscriberIceCandidate = _onLocalIceCandidate
..onPublisherIssue = onPeerConnectionIssue
..onSubscriberIssue = onPeerConnectionIssue
..onLocalTrackMuted = _onLocalTrackMuted
..onLocalTrackPublished = _onLocalTrackPublished
..onRenegotiationNeeded = _onRenegotiationNeeded
..onRemoteTrackReceived = _onRemoteTrackReceived;
}

await onRtcManagerCreatedCallback?.call(rtcManager!);
_rtcManagerSubject!.add(rtcManager!);
Expand Down Expand Up @@ -404,13 +418,13 @@ class CallSession extends Disposable {
'[fastReconnect] fast-reconnect possible - requesting ICE restarts',
);

await rtcManager?.publisher.pc.restartIce();
await rtcManager?.publisher?.pc.restartIce();

final remoteTracks =
rtcManager!.tracks.values.whereType<RtcRemoteTrack>().toList();

for (final track in remoteTracks) {
await _onRemoteTrackReceived(rtcManager!.publisher, track);
await _onRemoteTrackReceived(rtcManager!.subscriber, track);
}

_logger.d(() => '[fastReconnect] completed');
Expand Down
2 changes: 1 addition & 1 deletion packages/stream_video/lib/src/call/sfu_stats_reporter.dart
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class SfuStatsReporter {
SfuReconnectionStrategy? reconnectionStrategy,
}) async {
final publisherStatsBundle =
await callSession.rtcManager?.publisher.getStats();
await callSession.rtcManager?.publisher?.getStats();
final subscriberStatsBundle =
await callSession.rtcManager?.subscriber.getStats();

Expand Down
10 changes: 6 additions & 4 deletions packages/stream_video/lib/src/call/stats_reporter.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import '../webrtc/model/stats/rtc_codec.dart';
import '../webrtc/model/stats/rtc_ice_candidate_pair.dart';
import '../webrtc/model/stats/rtc_inbound_rtp_video_stream.dart';
import '../webrtc/model/stats/rtc_outbound_rtp_video_stream.dart';
import '../webrtc/model/stats/rtc_printable_stats.dart';
import '../webrtc/peer_type.dart';
import '../webrtc/rtc_manager.dart';
import 'state/call_state_notifier.dart';
Expand All @@ -33,14 +34,15 @@ class StatsReporter {

Future<({CallStats publisherStats, CallStats subscriberStats})>
collectStats() async {
final publisherStatsBundle = await rtcManager.publisher.getStats();
final publisherStatsBundle = await rtcManager.publisher?.getStats();
final subscriberStatsBundle = await rtcManager.subscriber.getStats();

final publisherStats = CallStats(
peerType: StreamPeerType.publisher,
stats: publisherStatsBundle.rtcStats,
printable: publisherStatsBundle.printable,
raw: publisherStatsBundle.rawStats,
stats: publisherStatsBundle?.rtcStats ?? [],
printable: publisherStatsBundle?.printable ??
const RtcPrintableStats(local: '', remote: ''),
raw: publisherStatsBundle?.rawStats ?? [],
);

final subscriberStats = CallStats(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class CoordinatorClientOpenApi extends CoordinatorClient {
required TokenManager tokenManager,
required LatencyService latencyService,
required RetryPolicy retryPolicy,
this.isAnonymous = false,
}) : _rpcUrl = rpcUrl,
_wsUrl = wsUrl,
_apiKey = apiKey,
Expand All @@ -54,6 +55,8 @@ class CoordinatorClientOpenApi extends CoordinatorClient {
final LatencyService _latencyService;
final RetryPolicy _retryPolicy;

final bool isAnonymous;

late final open.ApiClient _apiClient = open.ApiClient(
basePath: _rpcUrl,
authentication: _Authentication(
Expand Down Expand Up @@ -142,7 +145,14 @@ class CoordinatorClientOpenApi extends CoordinatorClient {
}

Future<Result<None>> _waitUntilConnected() async {
_logger.w(() => '[waitUntilConnected] user.id: ${_user?.id}');
if (isAnonymous) {
_logger.d(
() => '[waitUntilConnected] anonymous user does not require connection',
);
return const Result.success(none);
}

_logger.d(() => '[waitUntilConnected] user.id: ${_user?.id}');
return _connectionState
.firstWhere(
(it) => it.isConnected,
Expand Down
9 changes: 9 additions & 0 deletions packages/stream_video/lib/src/stream_video.dart
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ class StreamVideo extends Disposable {
}) : _options = options,
_state = MutableClientState(user) {
_client = buildCoordinatorClient(
user: user,
apiKey: apiKey,
tokenManager: _tokenManager,
latencySettings: _options.latencySettings,
Expand Down Expand Up @@ -270,10 +271,12 @@ class StreamVideo extends Disposable {
'Cannot connect anonymous user to the WS due to Missing Permissions',
);
}

_connectOperation ??= _connect(
includeUserDetails: includeUserDetails,
registerPushDevice: registerPushDevice,
).asCancelable();

return _connectOperation!
.valueOrDefault(Result.error('connect was cancelled'))
.whenComplete(() {
Expand All @@ -298,6 +301,7 @@ class StreamVideo extends Disposable {
bool registerPushDevice = true,
}) async {
_logger.i(() => '[connect] currentUser.id: ${_state.currentUser.id}');

if (_connectionState.isConnected) {
_logger.w(() => '[connect] rejected (already connected)');
final token = _tokenManager.getCachedToken();
Expand All @@ -306,9 +310,11 @@ class StreamVideo extends Disposable {
}
return Result.success(token);
}

_connectionState = ConnectionState.connecting(
_state.currentUser.id,
);

// guest user will be updated when token gets fetched
final tokenResult = await _tokenManager.getToken();
if (tokenResult is! Success<UserToken>) {
Expand All @@ -319,6 +325,7 @@ class StreamVideo extends Disposable {
);
return tokenResult;
}

final user = _state.user.value;
_logger.v(() => '[connect] currentUser.id : ${user.id}');
try {
Expand Down Expand Up @@ -857,6 +864,7 @@ class StreamVideo extends Disposable {
}

CoordinatorClient buildCoordinatorClient({
required User user,
required String rpcUrl,
required String wsUrl,
required String apiKey,
Expand All @@ -876,6 +884,7 @@ CoordinatorClient buildCoordinatorClient({
retryPolicy: retryPolicy,
rpcUrl: rpcUrl,
wsUrl: wsUrl,
isAnonymous: user.type == UserType.anonymous,
),
);
}
Expand Down
Loading

0 comments on commit d81f476

Please sign in to comment.