Skip to content

Commit

Permalink
nats-io#636 - JetStream Batch Get Client support
Browse files Browse the repository at this point in the history
* Added throw exception
  • Loading branch information
Ivandemidov00 committed Feb 15, 2025
1 parent edb6b4f commit 4b49424
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 8 deletions.
18 changes: 14 additions & 4 deletions src/NATS.Client.JetStream/NatsJSStream.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
using System.Runtime.CompilerServices;
using NATS.Client.Core;
using NATS.Client.JetStream.Internal;
using NATS.Client.JetStream.Models;

namespace NATS.Client.JetStream;
Expand Down Expand Up @@ -202,17 +202,27 @@ public ValueTask<StreamMsgGetResponse> GetAsync(StreamMsgGetRequest request, Can
request: request,
cancellationToken);

private IAsyncEnumerable<NatsMsg<T>> GetBatchDirectInternalAsync<T>(StreamMsgBatchGetRequest request, INatsDeserialize<T>? serializer = default, CancellationToken cancellationToken = default)
private async IAsyncEnumerable<NatsMsg<T>> GetBatchDirectInternalAsync<T>(StreamMsgBatchGetRequest request, INatsDeserialize<T>? serializer = default, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
ValidateStream();

return _context.Connection.RequestManyAsync<StreamMsgBatchGetRequest, T>(
var requestManyAsync = _context.Connection.RequestManyAsync(
subject: $"{_context.Opts.Prefix}.DIRECT.GET.{_name}",
data: request,
requestSerializer: NatsJSJsonSerializer<StreamMsgBatchGetRequest>.Default,
replySerializer: serializer,
replyOpts: new NatsSubOpts() { StopOnEmptyMsg = true, ThrowIfNoResponders = true },
replyOpts: new NatsSubOpts { StopOnEmptyMsg = true, ThrowIfNoResponders = true },
cancellationToken: cancellationToken);

await foreach (var msg in requestManyAsync.ConfigureAwait(false))
{
if (msg.Error is { } error)
{
throw error;
}

yield return msg;
}
}

private void ThrowIfDeleted()
Expand Down
8 changes: 4 additions & 4 deletions tests/NATS.Client.JetStream.Tests/DirectGetTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public class DirectGetTest
public async Task Direct_get_when_stream_disable()
{
await using var server = NatsServer.StartJS();
var nats = server.CreateClientConnection();
await using var nats = server.CreateClientConnection();
var js = new NatsJSContext(nats);
var cts = new CancellationTokenSource();
var cancellationToken = cts.Token;
Expand All @@ -33,7 +33,7 @@ public async Task Direct_get_when_stream_enable()
{
var testDataList = new List<TestData?>();
await using var server = NatsServer.StartJS();
var nats = server.CreateClientConnection();
await using var nats = server.CreateClientConnection();
var js = new NatsJSContext(nats);
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(100));
var cancellationToken = cts.Token;
Expand All @@ -57,11 +57,11 @@ public async Task Direct_get_by_multi_last()
{
var testDataList = new List<TestData?>();
await using var server = NatsServer.StartJS();
var nats = server.CreateClientConnection();
await using var nats = server.CreateClientConnection();
var js = new NatsJSContext(nats);
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(100));
var cancellationToken = cts.Token;
var streamConfig = new StreamConfig("multiLast", new[] { "multiLast.*" }) { AllowDirect = true };
var streamConfig = new StreamConfig("multiLast", ["multiLast.*"]) { AllowDirect = true };

var stream = await js.CreateStreamAsync(streamConfig, cancellationToken);

Expand Down

0 comments on commit 4b49424

Please sign in to comment.