Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ordered consumer creation #746

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/NATS.Client.Core/NatsOpts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using NATS.Client.Core.Internal;
#if NETSTANDARD
using Random = NATS.Client.Core.Internal.NetStandardExtensions.Random;
#endif

namespace NATS.Client.Core;

Expand Down Expand Up @@ -186,3 +189,21 @@ internal NatsOpts ReadUserInfoFromConnectionString()
}
}
}

public static class NatsOptsExtensions
{
/// <summary>
/// Applies an exponential backoff delay with an added random jitter for attempts.
/// </summary>
/// <param name="opts">The NatsOpts instance containing configuration settings for intervals and jitter.</param>
/// <param name="iter">The current attempt iteration, used to calculate the exponential delay.</param>
/// <returns>A task that completes after the calculated delay time has elapsed.</returns>
public static Task BackoffWithJitterAsync(this NatsOpts opts, int iter)
{
var baseDelay = opts.ReconnectWaitMin.TotalMilliseconds * Math.Pow(2, iter - 1);
var jitter = opts.ReconnectJitter.TotalMilliseconds * Random.Shared.NextDouble();

var delay = Math.Min(baseDelay + jitter, opts.ReconnectWaitMax.TotalMilliseconds);
return Task.Delay(TimeSpan.FromMilliseconds(delay));
}
}
8 changes: 8 additions & 0 deletions src/NATS.Client.JetStream/NatsJSOrderedConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using NATS.Client.JetStream.Models;
#if NETSTANDARD
using Random = NATS.Client.Core.Internal.NetStandardExtensions.Random;
#endif

namespace NATS.Client.JetStream;

Expand Down Expand Up @@ -314,11 +317,16 @@ private async Task<NatsJSConsumer> RecreateConsumer(string consumer, ulong seq,
catch (NatsJSApiNoResponseException)
{
}
catch (NatsJSApiException apiException) when (apiException.Error.Code == 503)
{
}

if (i == _opts.MaxResetAttempts)
{
throw new NatsJSException("Maximum number of create attempts reached.");
}

await _context.Connection.Opts.BackoffWithJitterAsync(i);
}

Info = info;
Expand Down
Loading