Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into feature/better-suppor…
Browse files Browse the repository at this point in the history
…t-for-disposable-assets-in-tests
  • Loading branch information
sierpinskid committed Mar 1, 2024
2 parents f73bb4f + 02eaaee commit c8cce0b
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System;

namespace StreamVideo.Core.LowLevelClient
{
internal class DisposedDuringOperationException : Exception
{

}

internal static class AppDisposedDuringOperationExceptionExt
{
public static void ThrowDisposedDuringOperationIfNull(this StreamPeerConnection streamPeerConnection)
{
if (streamPeerConnection == null)
{
throw new DisposedDuringOperationException();
}
}
}
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 15 additions & 1 deletion Packages/StreamVideo/Runtime/Core/LowLevelClient/RtcSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -527,12 +527,15 @@ private async void OnSfuSubscriberOffer(SubscriberOffer subscriberOffer)
};

await _subscriber.SetRemoteDescriptionAsync(rtcSessionDescription);
_subscriber.ThrowDisposedDuringOperationIfNull();

var answer = await _subscriber.CreateAnswerAsync();
_subscriber.ThrowDisposedDuringOperationIfNull();

//StreamTodo: mangle SDP

await _subscriber.SetLocalDescriptionAsync(ref answer);
_subscriber.ThrowDisposedDuringOperationIfNull();

var sendAnswerRequest = new SendAnswerRequest
{
Expand All @@ -543,6 +546,10 @@ private async void OnSfuSubscriberOffer(SubscriberOffer subscriberOffer)

await RpcCallAsync(sendAnswerRequest, GeneratedAPI.SendAnswer, nameof(GeneratedAPI.SendAnswer),
preLog: true);
}
catch (DisposedDuringOperationException)
{

}
catch (Exception e)
{
Expand Down Expand Up @@ -765,7 +772,8 @@ private async void OnPublisherNegotiationNeeded()
}

var offer = await _publisher.CreateOfferAsync();

_publisher.ThrowDisposedDuringOperationIfNull();

//StreamTodo: ignored the _config.Audio.EnableRed because with current webRTC version this modification causes a crash
//We're also forcing the red codec in the StreamPeerConnection but atm this results in "InvalidModification"
//This is most likely issue with the webRTC lib
Expand All @@ -782,6 +790,7 @@ private async void OnPublisherNegotiationNeeded()
}

await _publisher.SetLocalDescriptionAsync(ref offer);
_publisher.ThrowDisposedDuringOperationIfNull();

// //StreamTodo: timeout + break if we're disconnecting/reconnecting
// while (_sfuWebSocket.ConnectionState != ConnectionState.Connected)
Expand Down Expand Up @@ -809,6 +818,7 @@ private async void OnPublisherNegotiationNeeded()
#endif

var result = await RpcCallAsync(request, GeneratedAPI.SetPublisher, nameof(GeneratedAPI.SetPublisher));
_publisher.ThrowDisposedDuringOperationIfNull();

#if STREAM_DEBUG_ENABLED
_logs.Warning($"[Publisher] RemoteDesc (SDP Answer):\n{result.Sdp}");
Expand All @@ -819,6 +829,10 @@ await _publisher.SetRemoteDescriptionAsync(new RTCSessionDescription()
type = RTCSdpType.Answer,
sdp = result.Sdp
});
}
catch (DisposedDuringOperationException)
{

}
catch (Exception e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public void Dispose()
WebsocketClient.ConnectionFailed -= OnConnectionFailed;
WebsocketClient.Disconnected -= OnDisconnected;

//StreamTodo: we're disposing the WS but we're not the owner, would be better to accept a factory method so we own the obj
//StreamTodo: we're disposing the WS but we're not the owner, would be better to receive a factory method so we own the obj
WebsocketClient.Dispose();
}

Expand Down Expand Up @@ -294,7 +294,7 @@ private void OnConnectionFailed()
private void OnDisconnected()
{
#if STREAM_DEBUG_ENABLED
Logs.Warning($"{LogsPrefix} Websocket Disconnected");
Logs.Warning($"{LogsPrefix} Websocket Disconnected. Messages left: {WebsocketClient.QueuedMessagesCount}");
#endif
ConnectionState = ConnectionState.Disconnected;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ protected override void ProcessMessages()
var sfuEvent = SfuEvent.Parser.ParseFrom(msg);

#if STREAM_DEBUG_ENABLED

DebugLogEvent(sfuEvent);
#endif

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public interface IWebsocketClient : IDisposable
event Action Connected;
event Action Disconnected;
event Action ConnectionFailed;
int QueuedMessagesCount { get; }

bool TryDequeueMessage(out byte[] message);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ public class NativeWebSocketWrapper : IWebsocketClient
public event Action Connected;
public event Action Disconnected;
public event Action ConnectionFailed;

public int QueuedMessagesCount => _messages.Count;

public NativeWebSocketWrapper(ILogs logs, bool isDebugMode)
{
Expand Down Expand Up @@ -82,6 +84,7 @@ public async Task DisconnectAsync()

UnsubscribeFromEvents();
await _webSocket.Close();
_messages.Clear();
Disconnected?.Invoke();
}

Expand Down
32 changes: 24 additions & 8 deletions Packages/StreamVideo/Runtime/Libs/Websockets/WebsocketClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ public class WebsocketClient : IWebsocketClient

public WebSocketState State => _internalClient?.State ?? WebSocketState.None;

public int QueuedMessagesCount => _receiveQueue.Count;

/// <param name="isDebugMode">Additional logs will be printed</param>
public WebsocketClient(ILogs logs, Encoding encoding = default, bool isDebugMode = false)
{
Expand Down Expand Up @@ -90,7 +92,7 @@ public void Send(string message)

_sendQueue.Add(messageSegment);
}

public void Send(byte[] message)
{
var messageSegment = new ArraySegment<byte>(message);
Expand Down Expand Up @@ -125,6 +127,11 @@ public async Task DisconnectAsync(WebSocketCloseStatus closeStatus, string close
{
LogInfoIfDebugMode("Disconnect");
await TryDisposeResourcesAsync(closeStatus, closeMessage);

_receiveQueue.Clear();
while (_sendQueue.TryTake(out _))
{
}

Disconnected?.Invoke();
}
Expand All @@ -135,7 +142,7 @@ public void Dispose()
DisconnectAsync(WebSocketCloseStatus.NormalClosure, "WebSocket client is disposed")
.ContinueWith(_ => LogExceptionIfDebugMode(_.Exception), TaskContinuationOptions.OnlyOnFaulted);
}

private const int UpdatesPerSecond = 20;
private const int UpdatePeriod = 1000 / UpdatesPerSecond;
private const int UpdatePeriodOffset = UpdatePeriod / 2;
Expand Down Expand Up @@ -255,10 +262,15 @@ private async Task TryDisposeResourcesAsync(WebSocketCloseStatus closeStatus, st
{
try
{
_backgroundReceiveTimer?.Dispose();
_backgroundReceiveTimer = null;
_backgroundSendTimer?.Dispose();
_backgroundSendTimer = null;
if (_backgroundReceiveTimer != null)
{
await _backgroundReceiveTimer.DisposeAsync();
}

if (_backgroundSendTimer != null)
{
await _backgroundSendTimer.DisposeAsync();
}
}
catch (Exception e)
{
Expand Down Expand Up @@ -297,8 +309,12 @@ private async Task TryDisposeResourcesAsync(WebSocketCloseStatus closeStatus, st
}
finally
{
_internalClient.Dispose();
_internalClient = null;
//StreamTOdo: this fixes possible null ref if Dispose was called multiple times but perhaps this logic should not be called multiple times
if (_internalClient == null)
{
_internalClient.Dispose();
_internalClient = null;
}
}
}

Expand Down

0 comments on commit c8cce0b

Please sign in to comment.