Delivery Policy with OrderedConsumer #425
Unanswered
codymullins
asked this question in
Q&A
Replies: 2 comments 1 reply
-
Looking at the JetStream docs, the issue here might actually be because I'm using an ordered consumer with this delivery policy. Is this correct? If so, does this delivery policy still apply? |
Beta Was this translation helpful? Give feedback.
1 reply
-
Here is an example for a generic message processing use case. (Apologies if I misunderstood your use case and this is way off. In that case I hope it may help someone else) // > rm ~\AppData\Local\Temp\nats
// > nats-server.exe -js
// > nats stream list
// > nats consumer report ORDERS
// > cls; while(1) { $Host.UI.RawUI.CursorPosition=@{x=0;y= 0}; nats stream list; nats consumer report ORDERS; sleep 1 }
// > dotnet add package NATS.Net
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
Console.OutputEncoding = System.Text.Encoding.UTF8;
await using var nats = new NatsConnection();
var js = new NatsJSContext(nats);
await js.CreateStreamAsync(new StreamConfig("ORDERS", ["order.>"]));
if (args.Length > 0 && args[0] == "publisher")
{
var count = 0;
while (true)
{
count++;
var id = $"{count:d4}";
var order =$"created: {DateTime.Now:HH:mm:ss}, id: {id}";
var ack = await js.PublishAsync($"order.new.{id}", order);
ack.EnsureSuccess();
Console.WriteLine($"Published order: {order} ➡️");
await Task.Delay(1_000);
}
}
else if (args.Length > 0 && args[0] == "consumer-ack")
{
// all success
var consumer = await js.CreateOrUpdateConsumerAsync("ORDERS", new ConsumerConfig("order_processor"));
await foreach (var msg in consumer.ConsumeAsync<string>())
{
Console.WriteLine($"Processing order: {msg.Data} ✅");
await msg.AckAsync();
}
}
else if (args.Length > 0 && args[0] == "consumer-rnd")
{
// simulating random failures
var consumer = await js.CreateOrUpdateConsumerAsync("ORDERS", new ConsumerConfig("order_processor"));
await foreach (var msg in consumer.ConsumeAsync<string>())
{
Console.Write($"Processing order: {msg.Data} ");
await Task.Delay(1_000);
// using random simulate failure one third of the time, no ack 1/3 and success 1/3
var result = Random.Shared.Next(3);
if (result == 0)
{
// e.g. consumer crashed
// in this case message will be redelivered by the server
Console.WriteLine("❌");
}
else if (result == 1)
{
// e.g. message is invalid
// in this case message will be marked as failed and not redelivered
Console.WriteLine("🚫");
await msg.NakAsync();
}
else
{
Console.WriteLine("✅");
await msg.AckAsync();
}
}
}
else
{
Console.WriteLine("usage: dotnet run -- [publisher|consumer-ack|consumer-rnd]");
} Recording.2024-03-02.004324.mp4 |
Beta Was this translation helpful? Give feedback.
0 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
I'm building multiple worker processes that are activated when messages are pushed to the stream. I'd like these processes to be able to start up and process messages in the stream. If they close, I'd like them to just pick up where they left off. If they weren't started when the messages were pushed to the stream, I want them to pick up as normal too.
I started building a way to track the consumer's position in the stream with the KV store, but then noticed
ConsumerConfigDeliverPolicy.LastPerSubject
- it appears this might already do what I'd like, but it doesn't. Every time I start the consumer, it appears to run from the beginning and even triggers multiple times.Can we get some clarity on how these delivery policies should be used, and if this is expected?
In summary, do we need to track the position of the consumers ourselves or should the delivery policies take care of this?
Update 1 - OrderedConsumer
I have confirmed that using the Ordered Consumer was the issue here, but the question above remains, and now some new observations:
DurableName
without settingName
on the consumer, I get a cryptic error message. SettingDurableName
should be sufficient or there should be a better validation message here. I only realized the issue from How can I create a durable consumer? #234Beta Was this translation helpful? Give feedback.
All reactions