Skip to content

Commit 1e87f20

Browse files
authored
Add cancellation token support to CompleteTaskOnCloseStream (#163)
This update enhances CompleteTaskOnCloseStream by incorporating an optional CancellationTokenSource. The token ensures proper resource management and cleanup by disposing of the source upon stream disposal. This improves the handling of async operations with better cancellation support.
1 parent a07eb36 commit 1e87f20

File tree

1 file changed

+5
-2
lines changed

1 file changed

+5
-2
lines changed

src/Elastic.Transport/Components/TransportClient/Content/RequestDataContent.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ async Task SerializeToStreamAsync(Stream stream, TransportContext context, Cance
123123
{
124124
var source = CancellationTokenSource.CreateLinkedTokenSource(_token, cancellationToken);
125125
var serializeToStreamTask = new TaskCompletionSource<bool>();
126-
var wrappedStream = new CompleteTaskOnCloseStream(stream, serializeToStreamTask);
126+
var wrappedStream = new CompleteTaskOnCloseStream(stream, serializeToStreamTask, source);
127127
await _onStreamAvailableAsync(_boundConfiguration, _postData, wrappedStream, this, context, source.Token).ConfigureAwait(false);
128128
await serializeToStreamTask.Task.ConfigureAwait(false);
129129
}
@@ -153,18 +153,21 @@ protected override bool TryComputeLength(out long length)
153153
internal class CompleteTaskOnCloseStream : DelegatingStream
154154
{
155155
private readonly TaskCompletionSource<bool> _serializeToStreamTask;
156+
private readonly CancellationTokenSource? _source;
156157

157-
public CompleteTaskOnCloseStream(Stream innerStream, TaskCompletionSource<bool> serializeToStreamTask)
158+
public CompleteTaskOnCloseStream(Stream innerStream, TaskCompletionSource<bool> serializeToStreamTask, CancellationTokenSource? source = null)
158159
: base(innerStream)
159160
{
160161
Contract.Assert(serializeToStreamTask != null);
161162
_serializeToStreamTask = serializeToStreamTask;
163+
_source = source;
162164
}
163165

164166
protected override void Dispose(bool disposing)
165167
{
166168
_serializeToStreamTask.TrySetResult(true);
167169
base.Dispose();
170+
_source?.Dispose();
168171
}
169172

170173
public override void Close() => _serializeToStreamTask.TrySetResult(true);

0 commit comments

Comments
 (0)