diff --git a/src/NATS.Client.JetStream/NatsJSStream.cs b/src/NATS.Client.JetStream/NatsJSStream.cs index 6b45bf2f..dffc1289 100644 --- a/src/NATS.Client.JetStream/NatsJSStream.cs +++ b/src/NATS.Client.JetStream/NatsJSStream.cs @@ -1,5 +1,5 @@ +using System.Runtime.CompilerServices; using NATS.Client.Core; -using NATS.Client.JetStream.Internal; using NATS.Client.JetStream.Models; namespace NATS.Client.JetStream; @@ -202,17 +202,27 @@ public ValueTask GetAsync(StreamMsgGetRequest request, Can request: request, cancellationToken); - private IAsyncEnumerable> GetBatchDirectInternalAsync(StreamMsgBatchGetRequest request, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default) + private async IAsyncEnumerable> GetBatchDirectInternalAsync(StreamMsgBatchGetRequest request, INatsDeserialize? serializer = default, [EnumeratorCancellation] CancellationToken cancellationToken = default) { ValidateStream(); - return _context.Connection.RequestManyAsync( + var requestManyAsync = _context.Connection.RequestManyAsync( subject: $"{_context.Opts.Prefix}.DIRECT.GET.{_name}", data: request, requestSerializer: NatsJSJsonSerializer.Default, replySerializer: serializer, - replyOpts: new NatsSubOpts() { StopOnEmptyMsg = true, ThrowIfNoResponders = true }, + replyOpts: new NatsSubOpts { StopOnEmptyMsg = true, ThrowIfNoResponders = true }, cancellationToken: cancellationToken); + + await foreach (var msg in requestManyAsync.ConfigureAwait(false)) + { + if (msg.Error is { } error) + { + throw error; + } + + yield return msg; + } } private void ThrowIfDeleted() diff --git a/tests/NATS.Client.JetStream.Tests/DirectGetTest.cs b/tests/NATS.Client.JetStream.Tests/DirectGetTest.cs index 574cc093..458aaba8 100644 --- a/tests/NATS.Client.JetStream.Tests/DirectGetTest.cs +++ b/tests/NATS.Client.JetStream.Tests/DirectGetTest.cs @@ -9,7 +9,7 @@ public class DirectGetTest public async Task Direct_get_when_stream_disable() { await using var server = NatsServer.StartJS(); - var nats = server.CreateClientConnection(); + await using var nats = server.CreateClientConnection(); var js = new NatsJSContext(nats); var cts = new CancellationTokenSource(); var cancellationToken = cts.Token; @@ -33,7 +33,7 @@ public async Task Direct_get_when_stream_enable() { var testDataList = new List(); await using var server = NatsServer.StartJS(); - var nats = server.CreateClientConnection(); + await using var nats = server.CreateClientConnection(); var js = new NatsJSContext(nats); var cts = new CancellationTokenSource(TimeSpan.FromSeconds(100)); var cancellationToken = cts.Token; @@ -57,11 +57,11 @@ public async Task Direct_get_by_multi_last() { var testDataList = new List(); await using var server = NatsServer.StartJS(); - var nats = server.CreateClientConnection(); + await using var nats = server.CreateClientConnection(); var js = new NatsJSContext(nats); var cts = new CancellationTokenSource(TimeSpan.FromSeconds(100)); var cancellationToken = cts.Token; - var streamConfig = new StreamConfig("multiLast", new[] { "multiLast.*" }) { AllowDirect = true }; + var streamConfig = new StreamConfig("multiLast", ["multiLast.*"]) { AllowDirect = true }; var stream = await js.CreateStreamAsync(streamConfig, cancellationToken);