diff --git a/src/NATS.Client.Core/NatsHeaderParser.cs b/src/NATS.Client.Core/NatsHeaderParser.cs index e26077aa5..01f76ef48 100644 --- a/src/NATS.Client.Core/NatsHeaderParser.cs +++ b/src/NATS.Client.Core/NatsHeaderParser.cs @@ -119,6 +119,11 @@ private bool TryParseHeaderLine(ReadOnlySpan headerLine, NatsHeaders heade headers.Message = NatsHeaders.Messages.MessageSizeExceedsMaxBytes; headers.MessageText = NatsHeaders.MessageMessageSizeExceedsMaxBytesStr; } + else if (headerLine.SequenceEqual(NatsHeaders.MessageEobCode)) + { + headers.Message = NatsHeaders.Messages.EobCode; + headers.MessageText = NatsHeaders.MessageEobCodeStr; + } else { headers.Message = NatsHeaders.Messages.Text; diff --git a/src/NATS.Client.Core/NatsHeaders.cs b/src/NATS.Client.Core/NatsHeaders.cs index f7bec3f67..0521d431a 100644 --- a/src/NATS.Client.Core/NatsHeaders.cs +++ b/src/NATS.Client.Core/NatsHeaders.cs @@ -25,6 +25,7 @@ public enum Messages NoMessages, RequestTimeout, MessageSizeExceedsMaxBytes, + EobCode, } // Uses C# compiler's optimization for static byte[] data @@ -56,6 +57,10 @@ public enum Messages internal static ReadOnlySpan MessageMessageSizeExceedsMaxBytes => new byte[] { 77, 101, 115, 115, 97, 103, 101, 32, 83, 105, 122, 101, 32, 69, 120, 99, 101, 101, 100, 115, 32, 77, 97, 120, 66, 121, 116, 101, 115 }; internal static readonly string MessageMessageSizeExceedsMaxBytesStr = "Message Size Exceeds MaxBytes"; + // EOB + internal static ReadOnlySpan MessageEobCode => new byte[] { 69, 79, 66 }; + internal static readonly string MessageEobCodeStr = "EOB"; + private static readonly string[] EmptyKeys = Array.Empty(); private static readonly StringValues[] EmptyValues = Array.Empty(); diff --git a/src/NATS.Client.JetStream/INatsJSStream.cs b/src/NATS.Client.JetStream/INatsJSStream.cs index 9fd3734df..79317c2e9 100644 --- a/src/NATS.Client.JetStream/INatsJSStream.cs +++ b/src/NATS.Client.JetStream/INatsJSStream.cs @@ -112,5 +112,43 @@ ValueTask UpdateAsync( ValueTask> GetDirectAsync(StreamMsgGetRequest request, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default); + /// + /// Request a direct batch message + /// + /// Batch message request. + /// Serializer to use for the message type. + /// A used to cancel the API call. + /// Message type to deserialize. + /// There was an issue, stream must have allow direct set. + IAsyncEnumerable> GetBatchDirectAsync(StreamMsgBatchGetRequest request, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default); + + /// + /// Request a direct batch message + /// + /// Return last messages matching the subjects + /// The maximum amount of messages to be returned + /// Serializer to use for the message type. + /// A used to cancel the API call. + /// Message type to deserialize. + /// + /// Get up to batch number of messages for subject + /// + /// There was an issue, stream must have allow direct set. + IAsyncEnumerable> GetBatchDirectAsync(string[] multiLastBySubjects, ulong batch, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default); + + /// + /// Request a direct batch message + /// + /// The subject used filter messages that should be returned + /// The maximum amount of messages to be returned + /// Serializer to use for the message type. + /// A used to cancel the API call. + /// Message type to deserialize. + /// + /// Get the last message for each subject in the list up to the batch size + /// + /// There was an issue, stream must have allow direct set. + IAsyncEnumerable> GetBatchDirectAsync(string nextBySubject, ulong batch, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default); + ValueTask GetAsync(StreamMsgGetRequest request, CancellationToken cancellationToken = default); } diff --git a/src/NATS.Client.JetStream/Models/StreamMsgBatchGetRequest.cs b/src/NATS.Client.JetStream/Models/StreamMsgBatchGetRequest.cs new file mode 100644 index 000000000..af0f9ff1e --- /dev/null +++ b/src/NATS.Client.JetStream/Models/StreamMsgBatchGetRequest.cs @@ -0,0 +1,76 @@ +using System.Text.Json.Serialization; + +namespace NATS.Client.JetStream.Models; + +/// +/// A request to the JetStream $JS.API.STREAM.MSG.GET API +/// +public record StreamMsgBatchGetRequest +{ + /// + /// The maximum amount of messages to be returned for this request + /// + [JsonPropertyName("batch")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + [System.ComponentModel.DataAnnotations.Range(ulong.MinValue, ulong.MaxValue)] + public ulong Batch { get; set; } + + /// + /// The maximum amount of returned bytes for this request. + /// + [JsonPropertyName("max_bytes")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + [System.ComponentModel.DataAnnotations.Range(ulong.MinValue, long.MaxValue)] + public ulong MaxBytes { get; set; } + + /// + /// The minimum sequence for returned message + /// + [JsonPropertyName("seq")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + [System.ComponentModel.DataAnnotations.Range(ulong.MinValue, ulong.MaxValue)] + public ulong Seq { get; set; } + + /// + /// The minimum start time for returned message + /// + [JsonPropertyName("start_time")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public DateTimeOffset StartTime { get; set; } + + /// + /// The subject used filter messages that should be returned + /// + [JsonPropertyName("next_by_subj")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] +#if NET6_0 + public string? NextBySubject { get; set; } = default!; +#else +#pragma warning disable SA1206 + public string? NextBySubject { get; set; } + +#pragma warning restore SA1206 +#endif + + /// + /// Return last messages mathing the subjects + /// + [JsonPropertyName("multi_last")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string[]? MultiLastBySubjects { get; set; } + + /// + /// Return message after sequence + /// + [JsonPropertyName("up_to_seq")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + [System.ComponentModel.DataAnnotations.Range(ulong.MinValue, ulong.MaxValue)] + public ulong UpToSequence { get; set; } + + /// + /// Return message after time + /// + [JsonPropertyName("up_to_time")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public DateTimeOffset UpToTime { get; set; } +} diff --git a/src/NATS.Client.JetStream/NatsJSJsonSerializer.cs b/src/NATS.Client.JetStream/NatsJSJsonSerializer.cs index 375be743f..4994ebb10 100644 --- a/src/NATS.Client.JetStream/NatsJSJsonSerializer.cs +++ b/src/NATS.Client.JetStream/NatsJSJsonSerializer.cs @@ -78,6 +78,7 @@ public static class NatsJSJsonSerializer [JsonSerializable(typeof(StreamMsgDeleteResponse))] [JsonSerializable(typeof(StreamMsgGetRequest))] [JsonSerializable(typeof(StreamMsgGetResponse))] +[JsonSerializable(typeof(StreamMsgBatchGetRequest))] [JsonSerializable(typeof(StreamNamesRequest))] [JsonSerializable(typeof(StreamNamesResponse))] [JsonSerializable(typeof(StreamPurgeRequest))] diff --git a/src/NATS.Client.JetStream/NatsJSStream.cs b/src/NATS.Client.JetStream/NatsJSStream.cs index 54f8b63e0..dffc12891 100644 --- a/src/NATS.Client.JetStream/NatsJSStream.cs +++ b/src/NATS.Client.JetStream/NatsJSStream.cs @@ -1,5 +1,5 @@ +using System.Runtime.CompilerServices; using NATS.Client.Core; -using NATS.Client.JetStream.Internal; using NATS.Client.JetStream.Models; namespace NATS.Client.JetStream; @@ -181,15 +181,61 @@ public ValueTask> GetDirectAsync(StreamMsgGetRequest request, INat cancellationToken: cancellationToken); } + public IAsyncEnumerable> GetBatchDirectAsync(StreamMsgBatchGetRequest request, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default) + => GetBatchDirectInternalAsync(request, serializer, cancellationToken); + + public IAsyncEnumerable> GetBatchDirectAsync(string[] multiLastBySubjects, ulong batch, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default) + { + var request = new StreamMsgBatchGetRequest { MultiLastBySubjects = multiLastBySubjects, Batch = batch }; + return GetBatchDirectInternalAsync(request, serializer, cancellationToken); + } + + public IAsyncEnumerable> GetBatchDirectAsync(string nextBySubject, ulong batch, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default) + { + var request = new StreamMsgBatchGetRequest { NextBySubject = nextBySubject, Batch = batch }; + return GetBatchDirectInternalAsync(request, serializer, cancellationToken); + } + public ValueTask GetAsync(StreamMsgGetRequest request, CancellationToken cancellationToken = default) => _context.JSRequestResponseAsync( subject: $"{_context.Opts.Prefix}.STREAM.MSG.GET.{_name}", request: request, cancellationToken); + private async IAsyncEnumerable> GetBatchDirectInternalAsync(StreamMsgBatchGetRequest request, INatsDeserialize? serializer = default, [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + ValidateStream(); + + var requestManyAsync = _context.Connection.RequestManyAsync( + subject: $"{_context.Opts.Prefix}.DIRECT.GET.{_name}", + data: request, + requestSerializer: NatsJSJsonSerializer.Default, + replySerializer: serializer, + replyOpts: new NatsSubOpts { StopOnEmptyMsg = true, ThrowIfNoResponders = true }, + cancellationToken: cancellationToken); + + await foreach (var msg in requestManyAsync.ConfigureAwait(false)) + { + if (msg.Error is { } error) + { + throw error; + } + + yield return msg; + } + } + private void ThrowIfDeleted() { if (_deleted) throw new NatsJSException($"Stream '{_name}' is deleted"); } + + private void ValidateStream() + { + if (!Info.Config.AllowDirect) + { + throw new InvalidOperationException("StreamMsgBatchGetRequest is not permitted when AllowDirect on stream disable"); + } + } } diff --git a/tests/NATS.Client.CoreUnit.Tests/NatsHeaderTest.cs b/tests/NATS.Client.CoreUnit.Tests/NatsHeaderTest.cs index 240a0700e..9d12ffb4b 100644 --- a/tests/NATS.Client.CoreUnit.Tests/NatsHeaderTest.cs +++ b/tests/NATS.Client.CoreUnit.Tests/NatsHeaderTest.cs @@ -96,6 +96,7 @@ public void ParserTests() [InlineData("Request Timeout", NatsHeaders.Messages.RequestTimeout)] [InlineData("Message Size Exceeds MaxBytes", NatsHeaders.Messages.MessageSizeExceedsMaxBytes)] [InlineData("test message", NatsHeaders.Messages.Text)] + [InlineData("EOB", NatsHeaders.Messages.EobCode)] public void ParserMessageEnumTests(string message, NatsHeaders.Messages result) { var parser = new NatsHeaderParser(Encoding.UTF8); diff --git a/tests/NATS.Client.JetStream.Tests/DirectGetTest.cs b/tests/NATS.Client.JetStream.Tests/DirectGetTest.cs new file mode 100644 index 000000000..458aaba8b --- /dev/null +++ b/tests/NATS.Client.JetStream.Tests/DirectGetTest.cs @@ -0,0 +1,78 @@ +using NATS.Client.Core.Tests; +using NATS.Client.JetStream.Models; + +namespace NATS.Client.JetStream.Tests; + +public class DirectGetTest +{ + [SkipIfNatsServer(versionEarlierThan: "2.11")] + public async Task Direct_get_when_stream_disable() + { + await using var server = NatsServer.StartJS(); + await using var nats = server.CreateClientConnection(); + var js = new NatsJSContext(nats); + var cts = new CancellationTokenSource(); + var cancellationToken = cts.Token; + var streamConfig = new StreamConfig("stream_disable", new[] { "stream_disable.x" }); + + var stream = await js.CreateStreamAsync(streamConfig, cancellationToken); + + async Task GetBatchAction() + { + var streamMsgBatchGetRequest = new StreamMsgBatchGetRequest { NextBySubject = "stream_disable.x" }; + await foreach (var unused in stream.GetBatchDirectAsync(streamMsgBatchGetRequest, cancellationToken: cancellationToken)) + { + } + } + + await Assert.ThrowsAsync(GetBatchAction); + } + + [SkipIfNatsServer(versionEarlierThan: "2.11")] + public async Task Direct_get_when_stream_enable() + { + var testDataList = new List(); + await using var server = NatsServer.StartJS(); + await using var nats = server.CreateClientConnection(); + var js = new NatsJSContext(nats); + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(100)); + var cancellationToken = cts.Token; + var streamConfig = new StreamConfig("stream_enable", new[] { "stream_enable.x" }) { AllowDirect = true }; + + var stream = await js.CreateStreamAsync(streamConfig, cancellationToken); + + await js.PublishAsync("stream_enable.x", new TestData { Test = 1 }, TestDataJsonSerializer.Default, cancellationToken: cancellationToken); + + var streamMsgBatchGetRequest = new StreamMsgBatchGetRequest { NextBySubject = "stream_enable.x", Batch = 3 }; + await foreach (var msg in stream.GetBatchDirectAsync(streamMsgBatchGetRequest, TestDataJsonSerializer.Default, cancellationToken: cancellationToken)) + { + testDataList.Add(msg.Data); + } + + Assert.Single(testDataList); + } + + [SkipIfNatsServer(versionEarlierThan: "2.11")] + public async Task Direct_get_by_multi_last() + { + var testDataList = new List(); + await using var server = NatsServer.StartJS(); + await using var nats = server.CreateClientConnection(); + var js = new NatsJSContext(nats); + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(100)); + var cancellationToken = cts.Token; + var streamConfig = new StreamConfig("multiLast", ["multiLast.*"]) { AllowDirect = true }; + + var stream = await js.CreateStreamAsync(streamConfig, cancellationToken); + + await js.PublishAsync("multiLast.x", new TestData { Test = 1 }, TestDataJsonSerializer.Default, cancellationToken: cancellationToken); + await js.PublishAsync("multiLast.y", new TestData { Test = 2 }, TestDataJsonSerializer.Default, cancellationToken: cancellationToken); + + await foreach (var msg in stream.GetBatchDirectAsync(["multiLast.x", "multiLast.y"], 4, TestDataJsonSerializer.Default, cancellationToken: cancellationToken)) + { + testDataList.Add(msg.Data); + } + + Assert.Equal(2, testDataList.Count); + } +}