@@ -18,7 +18,7 @@ internal partial class RemoteJSRuntime : JSRuntime
18
18
private readonly CircuitOptions _options ;
19
19
private readonly ILogger < RemoteJSRuntime > _logger ;
20
20
private CircuitClientProxy _clientProxy ;
21
- private readonly ConcurrentDictionary < long , DotNetStreamReference > _pendingDotNetToJSStreams = new ( ) ;
21
+ private readonly ConcurrentDictionary < long , CancelableDotNetStreamReference > _pendingDotNetToJSStreams = new ( ) ;
22
22
private bool _permanentlyDisconnected ;
23
23
private readonly long _maximumIncomingBytes ;
24
24
private int _byteArraysToBeRevivedTotalBytes ;
@@ -152,21 +152,27 @@ protected override void ReceiveByteArray(int id, byte[] data)
152
152
153
153
protected override async Task TransmitStreamAsync ( long streamId , DotNetStreamReference dotNetStreamReference )
154
154
{
155
- if ( ! _pendingDotNetToJSStreams . TryAdd ( streamId , dotNetStreamReference ) )
155
+ var cancelableStreamReference = new CancelableDotNetStreamReference ( dotNetStreamReference ) ;
156
+ if ( ! _pendingDotNetToJSStreams . TryAdd ( streamId , cancelableStreamReference ) )
156
157
{
157
158
throw new ArgumentException ( $ "The stream { streamId } is already pending.") ;
158
159
}
159
160
160
161
// SignalR only supports streaming being initiated from the JS side, so we have to ask it to
161
162
// start the stream. We'll give it a maximum of 10 seconds to do so, after which we give up
162
163
// and discard it.
163
- var cancellationToken = new CancellationTokenSource ( TimeSpan . FromSeconds ( 10 ) ) . Token ;
164
- cancellationToken . Register ( ( ) =>
164
+ CancellationTokenSource cancellationTokenSource = new ( TimeSpan . FromSeconds ( 10 ) ) ;
165
+
166
+ // Store CTS to dispose later.
167
+ cancelableStreamReference . CancellationTokenSource = cancellationTokenSource ;
168
+
169
+ cancellationTokenSource . Token . Register ( ( ) =>
165
170
{
166
171
// If by now the stream hasn't been claimed for sending, stop tracking it
167
- if ( _pendingDotNetToJSStreams . TryRemove ( streamId , out var timedOutStream ) && ! timedOutStream . LeaveOpen )
172
+ if ( _pendingDotNetToJSStreams . TryRemove ( streamId , out var timedOutCancelableStreamReference ) )
168
173
{
169
- timedOutStream . Stream . Dispose ( ) ;
174
+ timedOutCancelableStreamReference . StreamReference . Dispose ( ) ;
175
+ timedOutCancelableStreamReference . CancellationTokenSource ? . Dispose ( ) ;
170
176
}
171
177
} ) ;
172
178
@@ -175,8 +181,13 @@ protected override async Task TransmitStreamAsync(long streamId, DotNetStreamRef
175
181
176
182
public bool TryClaimPendingStreamForSending ( long streamId , out DotNetStreamReference pendingStream )
177
183
{
178
- if ( _pendingDotNetToJSStreams . TryRemove ( streamId , out pendingStream ) )
184
+ if ( _pendingDotNetToJSStreams . TryRemove ( streamId , out var cancelableStreamReference ) )
179
185
{
186
+ pendingStream = cancelableStreamReference . StreamReference ;
187
+
188
+ // Dispose CTS for claimed Stream.
189
+ cancelableStreamReference . CancellationTokenSource ? . Dispose ( ) ;
190
+
180
191
return true ;
181
192
}
182
193
@@ -193,6 +204,18 @@ public void MarkPermanentlyDisconnected()
193
204
protected override async Task < Stream > ReadJSDataAsStreamAsync ( IJSStreamReference jsStreamReference , long totalLength , CancellationToken cancellationToken = default )
194
205
=> await RemoteJSDataStream . CreateRemoteJSDataStreamAsync ( this , jsStreamReference , totalLength , _maximumIncomingBytes , _options . JSInteropDefaultCallTimeout , cancellationToken ) ;
195
206
207
+ private class CancelableDotNetStreamReference
208
+ {
209
+ public CancelableDotNetStreamReference ( DotNetStreamReference streamReference )
210
+ {
211
+ StreamReference = streamReference ;
212
+ }
213
+
214
+ public CancellationTokenSource ? CancellationTokenSource { get ; set ; }
215
+
216
+ public DotNetStreamReference StreamReference { get ; }
217
+ }
218
+
196
219
public static partial class Log
197
220
{
198
221
[ LoggerMessage ( 1 , LogLevel . Debug , "Begin invoke JS interop '{AsyncHandle}': '{FunctionIdentifier}'" , EventName = "BeginInvokeJS" ) ]
0 commit comments