Skip to content

Commit dad28b3

Browse files
authored
Implement StreamResponse (#110)
Implements the `StreamResponse` class that provides direct access to the response content stream.
1 parent eb5416e commit dad28b3

File tree

3 files changed

+56
-5
lines changed

3 files changed

+56
-5
lines changed

src/Elastic.Transport/Components/Pipeline/DefaultResponseBuilder.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@
1212
using System.Text.Json;
1313
using System.Threading;
1414
using System.Threading.Tasks;
15+
1516
using Elastic.Transport.Diagnostics;
1617
using Elastic.Transport.Extensions;
18+
1719
using static Elastic.Transport.ResponseBuilderDefaults;
1820

1921
namespace Elastic.Transport;
@@ -26,7 +28,6 @@ internal static class ResponseBuilderDefaults
2628
{
2729
typeof(StringResponse), typeof(BytesResponse), typeof(VoidResponse), typeof(DynamicResponse)
2830
};
29-
3031
}
3132

3233
/// <summary>
@@ -225,7 +226,7 @@ private async ValueTask<TResponse> SetBodyCoreAsync<TResponse>(bool isAsync,
225226

226227
using (responseStream)
227228
{
228-
if (SetSpecialTypes<TResponse>(mimeType, bytes, requestData.MemoryStreamFactory, out var r)) return r;
229+
if (SetSpecialTypes<TResponse>(mimeType, bytes, responseStream, requestData.MemoryStreamFactory, out var r)) return r;
229230

230231
if (details.HttpStatusCode.HasValue &&
231232
requestData.SkipDeserializationForStatusCodes.Contains(details.HttpStatusCode.Value))
@@ -288,7 +289,7 @@ private async ValueTask<TResponse> SetBodyCoreAsync<TResponse>(bool isAsync,
288289
}
289290
}
290291

291-
private static bool SetSpecialTypes<TResponse>(string mimeType, byte[] bytes,
292+
private static bool SetSpecialTypes<TResponse>(string mimeType, byte[] bytes, Stream responseStream,
292293
MemoryStreamFactory memoryStreamFactory, out TResponse cs)
293294
where TResponse : TransportResponse, new()
294295
{
@@ -298,6 +299,8 @@ private static bool SetSpecialTypes<TResponse>(string mimeType, byte[] bytes,
298299

299300
if (responseType == typeof(StringResponse))
300301
cs = new StringResponse(bytes.Utf8String()) as TResponse;
302+
else if (responseType == typeof(StreamResponse))
303+
cs = new StreamResponse(responseStream, mimeType) as TResponse;
301304
else if (responseType == typeof(BytesResponse))
302305
cs = new BytesResponse(bytes) as TResponse;
303306
else if (responseType == typeof(VoidResponse))

src/Elastic.Transport/Components/TransportClient/HttpRequestInvoker.cs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, Req
118118
responseMessage = client.SendAsync(requestMessage, HttpCompletionOption.ResponseHeadersRead, cancellationToken).GetAwaiter().GetResult();
119119
#endif
120120

121+
receive = responseMessage;
121122
statusCode = (int)responseMessage.StatusCode;
122123
}
123124

@@ -152,8 +153,11 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, Req
152153
{
153154
ex = e;
154155
}
155-
using (receive)
156-
using (responseStream ??= Stream.Null)
156+
157+
var isStreamResponse = typeof(TResponse) == typeof(StreamResponse);
158+
159+
using (isStreamResponse ? DiagnosticSources.SingletonDisposable : receive)
160+
using (isStreamResponse ? Stream.Null : responseStream ??= Stream.Null)
157161
{
158162
TResponse response;
159163

@@ -165,6 +169,10 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, Req
165169
response = requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponse<TResponse>
166170
(requestData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats);
167171

172+
// Defer disposal of the response message
173+
if (response is StreamResponse sr)
174+
sr.Finalizer = () => receive.Dispose();
175+
168176
if (!OpenTelemetry.CurrentSpanIsElasticTransportOwnedAndHasListeners || (!(Activity.Current?.IsAllDataRequested ?? false)))
169177
return response;
170178

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Licensed to Elasticsearch B.V under one or more agreements.
2+
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
3+
// See the LICENSE file in the project root for more information
4+
5+
using System;
6+
using System.IO;
7+
8+
namespace Elastic.Transport;
9+
10+
/// <summary>
11+
/// A response that exposes the response <see cref="TransportResponse{T}.Body"/> as <see cref="Stream"/>.
12+
/// <para>
13+
/// Must be disposed after use.
14+
/// </para>
15+
/// </summary>
16+
public sealed class StreamResponse :
17+
TransportResponse<Stream>,
18+
IDisposable
19+
{
20+
internal Action? Finalizer { get; set; }
21+
22+
/// <summary>
23+
/// The MIME type of the response, if present.
24+
/// </summary>
25+
public string MimeType { get; }
26+
27+
/// <inheritdoc cref="StreamResponse"/>
28+
public StreamResponse(Stream body, string? mimeType)
29+
{
30+
Body = body;
31+
MimeType = mimeType ?? string.Empty;
32+
}
33+
34+
/// <inheritdoc cref="IDisposable.Dispose"/>
35+
public void Dispose()
36+
{
37+
Body.Dispose();
38+
Finalizer?.Invoke();
39+
}
40+
}

0 commit comments

Comments
 (0)