[Host.RabbitMQ] Add ability to configure connection retry attempts and delay #310

Open
robert94p opened this issue Sep 23, 2024 · 14 comments
@robert94p

It is necessary to add the ability to configure the number of connection attempts (including infinite) and the delay between them. This is required because the connection is established when the application starts, but at that moment RabbitMQ may be unavailable for various reasons and could remain unavailable for an extended period. Alternatively, the ability to detect successful bus creation should be added so that in case of failure, the application can be restarted.
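A minimal sketch of what is being asked for, assuming a plain retry wrapper around whatever call creates the connection. `ConnectionRetryOptions` and `ConnectionRetry` are hypothetical names used for illustration and are not part of SlimMessageBus or RabbitMQ.Client; `MaxAttempts = null` stands for infinite retries.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical settings; these names do not exist in SlimMessageBus.
// MaxAttempts == null means "retry forever".
public sealed record ConnectionRetryOptions(int? MaxAttempts, TimeSpan Delay);

public static class ConnectionRetry
{
    // Runs `connect` until it succeeds, the attempt budget is used up,
    // or cancellation is requested.
    public static async Task<T> ConnectAsync<T>(
        Func<T> connect,
        ConnectionRetryOptions options,
        CancellationToken cancellationToken = default)
    {
        for (var attempt = 1; ; attempt++)
        {
            cancellationToken.ThrowIfCancellationRequested();
            try
            {
                return connect();
            }
            catch (Exception) when (options.MaxAttempts is null || attempt < options.MaxAttempts)
            {
                // Attempts remain: wait for the configured delay, then try again.
                // On the last allowed attempt the filter is false and the exception propagates.
                await Task.Delay(options.Delay, cancellationToken);
            }
        }
    }
}
```

With such a helper, the caller decides what happens after the last attempt: the exception reaches the application, which can then restart or keep running without the bus.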

@zarusz
Owner

zarusz commented Sep 23, 2024

It makes sense to add the connection retries.

Also check #282: while not directly related to this issue, it could help in some scenarios.

Are you looking to contribute or rather asking for the feature to be built?

@robert94p
Author

robert94p commented Sep 24, 2024

It's rather a suggestion to add the feature, as I don't have time to contribute myself.

zarusz self-assigned this Sep 24, 2024

@robert94p
Author

How can multiple consumers be used on a single bus, each with different parallelism limits? Currently, this isn't possible and isn't planned, is it?

@zarusz
Owner

zarusz commented Dec 11, 2024

@robert94p do you need 2 separate consumers on the same RabbitMQ queue?

With the current implementation, the consumer declarations are grouped by the queue name (see RabbitMqMessageBus.cs). If you want to have 2 distinct consumer instances on the same queue, then a new feature would be required (I could look into this).

Ideally, you could provide me with a sample bus setup so that I can better understand the need.

@robert94p
Author

Apologies, I wrote this under the wrong issue; it should have been under #205.

I need 2 or more consumers. I have one producer, one exchange, and three queues for three different consumers. When publishing a message, the target queue is determined via the routingKey.

One of the consumers performs a resource-intensive operation, so it needs to be limited to processing only 2 messages in parallel. The other consumers can handle more messages simultaneously.

I would like to configure the number of messages that can be processed concurrently per consumer, similar to how it works in MemoryMessageBus. Currently, messages are processed based on the ConsumerDispatchConcurrency setting, which applies globally to all consumers.
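As a rough illustration of a per-consumer limit, the sketch below wraps a handler in a `SemaphoreSlim` so that at most N invocations run at once. `ConcurrencyLimitedHandler` is a hypothetical helper, not a SlimMessageBus type, and it only throttles handler execution inside the process; it does not change how many messages the broker delivers to the client.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical wrapper for illustration; not part of SlimMessageBus.
// Allows at most `maxConcurrency` handler invocations to run at the same time.
public sealed class ConcurrencyLimitedHandler<TMessage> : IDisposable
{
    private readonly SemaphoreSlim _gate;
    private readonly Func<TMessage, CancellationToken, Task> _inner;

    public ConcurrencyLimitedHandler(int maxConcurrency, Func<TMessage, CancellationToken, Task> inner)
    {
        _gate = new SemaphoreSlim(maxConcurrency, maxConcurrency);
        _inner = inner;
    }

    public async Task HandleAsync(TMessage message, CancellationToken cancellationToken = default)
    {
        // Wait for a free slot; callers beyond the limit queue up here.
        await _gate.WaitAsync(cancellationToken);
        try
        {
            await _inner(message, cancellationToken);
        }
        finally
        {
            // Always free the slot, even if the handler throws.
            _gate.Release();
        }
    }

    public void Dispose() => _gate.Dispose();
}
```

In the scenario described above, the resource-intensive consumer would get a limit of 2, while the other consumers would use a higher limit or no wrapper at all.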

@zarusz
Owner

zarusz commented Dec 17, 2024

I think I can sort this out. It makes sense not to group the consumer declarations if they each have a different routing key. Let me look into this and get back to you.

@zarusz
Owner

zarusz commented Dec 21, 2024

There is already some built-in retry via AutomaticRecoveryEnabled.
There could be more added on the SMB side: https://www.rabbitmq.com/client-libraries/dotnet-api-guide#recovery-triggers

@robert94p
Author

Yes, that functionality exists, but the retry feature is needed for establishing a new connection. The built-in RabbitMQ recovery does not retry when the initial connection attempt fails.

@Jan-Olof

I like this idea. Wouldn't it also be a good idea to throw an exception once all the retry attempts are exhausted? Right now there only seems to be an error message in the log. Or am I missing something? I'm quite new to SMB.

@zarusz
Owner

zarusz commented Dec 30, 2024

@Jan-Olof as far as I know, the initial connection is already retried (3 times, with some delay). After the 3rd attempt, the error is rethrown, which makes publishing to the bus impossible. Link to the retry logic.

I tested this with version 2.7.0 and corrupted my URL to emulate an unreachable broker. Here is the result:

 Message: 
SlimMessageBus.ProducerMessageBusException : The Channel is not available at this time

 Stack Trace: 
RabbitMqMessageBus.ProduceToTransportBulk[T](IReadOnlyCollection`1 envelopes, String path, IMessageBusTarget targetBus, CancellationToken cancellationToken) line 143
MessageBusBase.ProduceToTransport(Object message, Type messageType, String path, IDictionary`2 messageHeaders, IMessageBusTarget targetBus, CancellationToken cancellationToken) line 418
MessageBusBase.SendInternal[TResponseMessage](Object request, String path, Type requestType, Type responseType, ProducerSettings producerSettings, DateTimeOffset created, DateTimeOffset expires, String requestId, IDictionary`2 requestHeaders, IMessageBusTarget targetBus, CancellationToken cancellationToken) line 572
<<BasicReqResp>b__1>d.MoveNext() line 252
--- End of stack trace from previous location ---
RabbitMqMessageBusIt.BasicReqResp() line 255
RabbitMqMessageBusIt.BasicReqRespOnTopic(RabbitMqMessageAcknowledgementMode acknowledgementMode) line 231
--- End of stack trace from previous location ---

 Standard Output: 
[14:23:45 INF] SlimMessageBus.Host.RabbitMQ.RabbitMqMessageBus Retrying 0 of 3 connection to RabbitMQ...
RabbitMQ.Client.Exceptions.BrokerUnreachableException: None of the specified endpoints were reachable
---> System.AggregateException: One or more errors occurred. (Connection failed)
---> RabbitMQ.Client.Exceptions.ConnectFailureException: Connection failed
---> System.Net.Sockets.SocketException (11001): No such host is known.
  at System.Net.NameResolutionPal.ProcessResult(SocketError errorCode, GetAddrInfoExContext* context)
  at System.Net.NameResolutionPal.GetAddressInfoExCallback(Int32 error, Int32 bytes, NativeOverlapped* overlapped)
--- End of stack trace from previous location ---
  at RabbitMQ.Client.TcpClientAdapter.ConnectAsync(String host, Int32 port)
  at RabbitMQ.Client.Impl.TaskExtensions.TimeoutAfter(Task task, TimeSpan timeout)
  at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectOrFail(ITcpClient socket, AmqpTcpEndpoint endpoint, TimeSpan timeout)
  --- End of inner exception stack trace ---
  at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectOrFail(ITcpClient socket, AmqpTcpEndpoint endpoint, TimeSpan timeout)
  at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectUsingAddressFamily(AmqpTcpEndpoint endpoint, Func`2 socketFactory, TimeSpan timeout, AddressFamily family)
  at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectUsingIPv4(AmqpTcpEndpoint endpoint, Func`2 socketFactory, TimeSpan timeout)
  at RabbitMQ.Client.Impl.SocketFrameHandler..ctor(AmqpTcpEndpoint endpoint, Func`2 socketFactory, TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout)
  at RabbitMQ.Client.Framing.Impl.IProtocolExtensions.CreateFrameHandler(IProtocol protocol, AmqpTcpEndpoint endpoint, ArrayPool`1 pool, Func`2 socketFactory, TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout)
  at RabbitMQ.Client.ConnectionFactory.CreateFrameHandler(AmqpTcpEndpoint endpoint)
  at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func`2 selector)
  --- End of inner exception stack trace ---
  at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func`2 selector)
  at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.Init(IEndpointResolver endpoints)
  at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
  --- End of inner exception stack trace ---
  at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
  at RabbitMQ.Client.ConnectionFactory.CreateConnection(String clientProvidedName)
  at RabbitMQ.Client.ConnectionFactory.CreateConnection()
  at SlimMessageBus.Host.RabbitMQ.RabbitMqMessageBus.<CreateConnection>b__13_0(CancellationToken cancellationTask) in /_/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs:line 73
  at SlimMessageBus.Host.Retry.WithDelay(Func`2 operation, Func`3 shouldRetry, Nullable`1 delay, Nullable`1 jitter, CancellationToken cancellationToken) in /_/src/SlimMessageBus.Host/Retry.cs:line 28
[14:24:12 INF] SlimMessageBus.Host.RabbitMQ.RabbitMqMessageBus Retrying 1 of 3 connection to RabbitMQ...
RabbitMQ.Client.Exceptions.BrokerUnreachableException: None of the specified endpoints were reachable
---> System.AggregateException: One or more errors occurred. (Connection failed)
---> RabbitMQ.Client.Exceptions.ConnectFailureException: Connection failed
---> System.Net.Sockets.SocketException (11001): No such host is known.
  at System.Net.NameResolutionPal.ProcessResult(SocketError errorCode, GetAddrInfoExContext* context)
  at System.Net.NameResolutionPal.GetAddressInfoExCallback(Int32 error, Int32 bytes, NativeOverlapped* overlapped)
--- End of stack trace from previous location ---
  at RabbitMQ.Client.TcpClientAdapter.ConnectAsync(String host, Int32 port)
  at RabbitMQ.Client.Impl.TaskExtensions.TimeoutAfter(Task task, TimeSpan timeout)
  at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectOrFail(ITcpClient socket, AmqpTcpEndpoint endpoint, TimeSpan timeout)
  --- End of inner exception stack trace ---
  at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectOrFail(ITcpClient socket, AmqpTcpEndpoint endpoint, TimeSpan timeout)
  at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectUsingAddressFamily(AmqpTcpEndpoint endpoint, Func`2 socketFactory, TimeSpan timeout, AddressFamily family)
  at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectUsingIPv4(AmqpTcpEndpoint endpoint, Func`2 socketFactory, TimeSpan timeout)
  at RabbitMQ.Client.Impl.SocketFrameHandler..ctor(AmqpTcpEndpoint endpoint, Func`2 socketFactory, TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout)
  at RabbitMQ.Client.Framing.Impl.IProtocolExtensions.CreateFrameHandler(IProtocol protocol, AmqpTcpEndpoint endpoint, ArrayPool`1 pool, Func`2 socketFactory, TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout)
  at RabbitMQ.Client.ConnectionFactory.CreateFrameHandler(AmqpTcpEndpoint endpoint)
  at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func`2 selector)
  --- End of inner exception stack trace ---
  at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func`2 selector)
  at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.Init(IEndpointResolver endpoints)
  at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
  --- End of inner exception stack trace ---
  at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
  at RabbitMQ.Client.ConnectionFactory.CreateConnection(String clientProvidedName)
  at RabbitMQ.Client.ConnectionFactory.CreateConnection()
  at SlimMessageBus.Host.RabbitMQ.RabbitMqMessageBus.<CreateConnection>b__13_0(CancellationToken cancellationTask) in /_/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs:line 73
  at SlimMessageBus.Host.Retry.WithDelay(Func`2 operation, Func`3 shouldRetry, Nullable`1 delay, Nullable`1 jitter, CancellationToken cancellationToken) in /_/src/SlimMessageBus.Host/Retry.cs:line 28
[14:24:39 INF] SlimMessageBus.Host.RabbitMQ.RabbitMqMessageBus Retrying 2 of 3 connection to RabbitMQ...
RabbitMQ.Client.Exceptions.BrokerUnreachableException: None of the specified endpoints were reachable
---> System.AggregateException: One or more errors occurred. (Connection failed)
---> RabbitMQ.Client.Exceptions.ConnectFailureException: Connection failed
---> System.Net.Sockets.SocketException (11001): No such host is known.
  at System.Net.NameResolutionPal.ProcessResult(SocketError errorCode, GetAddrInfoExContext* context)
  at System.Net.NameResolutionPal.GetAddressInfoExCallback(Int32 error, Int32 bytes, NativeOverlapped* overlapped)
--- End of stack trace from previous location ---
  at RabbitMQ.Client.TcpClientAdapter.ConnectAsync(String host, Int32 port)
  at RabbitMQ.Client.Impl.TaskExtensions.TimeoutAfter(Task task, TimeSpan timeout)
  at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectOrFail(ITcpClient socket, AmqpTcpEndpoint endpoint, TimeSpan timeout)
  --- End of inner exception stack trace ---
  at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectOrFail(ITcpClient socket, AmqpTcpEndpoint endpoint, TimeSpan timeout)
  at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectUsingAddressFamily(AmqpTcpEndpoint endpoint, Func`2 socketFactory, TimeSpan timeout, AddressFamily family)
  at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectUsingIPv4(AmqpTcpEndpoint endpoint, Func`2 socketFactory, TimeSpan timeout)
  at RabbitMQ.Client.Impl.SocketFrameHandler..ctor(AmqpTcpEndpoint endpoint, Func`2 socketFactory, TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout)
  at RabbitMQ.Client.Framing.Impl.IProtocolExtensions.CreateFrameHandler(IProtocol protocol, AmqpTcpEndpoint endpoint, ArrayPool`1 pool, Func`2 socketFactory, TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout)
  at RabbitMQ.Client.ConnectionFactory.CreateFrameHandler(AmqpTcpEndpoint endpoint)
  at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func`2 selector)
  --- End of inner exception stack trace ---
  at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func`2 selector)
  at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.Init(IEndpointResolver endpoints)
  at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
  --- End of inner exception stack trace ---
  at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
  at RabbitMQ.Client.ConnectionFactory.CreateConnection(String clientProvidedName)
  at RabbitMQ.Client.ConnectionFactory.CreateConnection()
  at SlimMessageBus.Host.RabbitMQ.RabbitMqMessageBus.<CreateConnection>b__13_0(CancellationToken cancellationTask) in /_/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs:line 73
  at SlimMessageBus.Host.Retry.WithDelay(Func`2 operation, Func`3 shouldRetry, Nullable`1 delay, Nullable`1 jitter, CancellationToken cancellationToken) in /_/src/SlimMessageBus.Host/Retry.cs:line 28
[14:25:06 ERR] SlimMessageBus.Host.RabbitMQ.RabbitMqMessageBus Could not initialize RabbitMQ connection: None of the specified endpoints were reachable
RabbitMQ.Client.Exceptions.BrokerUnreachableException: None of the specified endpoints were reachable
---> System.AggregateException: One or more errors occurred. (Connection failed)
---> RabbitMQ.Client.Exceptions.ConnectFailureException: Connection failed
---> System.Net.Sockets.SocketException (11001): No such host is known.
  at System.Net.NameResolutionPal.ProcessResult(SocketError errorCode, GetAddrInfoExContext* context)
  at System.Net.NameResolutionPal.GetAddressInfoExCallback(Int32 error, Int32 bytes, NativeOverlapped* overlapped)
--- End of stack trace from previous location ---
  at RabbitMQ.Client.TcpClientAdapter.ConnectAsync(String host, Int32 port)
  at RabbitMQ.Client.Impl.TaskExtensions.TimeoutAfter(Task task, TimeSpan timeout)
  at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectOrFail(ITcpClient socket, AmqpTcpEndpoint endpoint, TimeSpan timeout)
  --- End of inner exception stack trace ---
  at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectOrFail(ITcpClient socket, AmqpTcpEndpoint endpoint, TimeSpan timeout)
  at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectUsingAddressFamily(AmqpTcpEndpoint endpoint, Func`2 socketFactory, TimeSpan timeout, AddressFamily family)
  at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectUsingIPv4(AmqpTcpEndpoint endpoint, Func`2 socketFactory, TimeSpan timeout)
  at RabbitMQ.Client.Impl.SocketFrameHandler..ctor(AmqpTcpEndpoint endpoint, Func`2 socketFactory, TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout)
  at RabbitMQ.Client.Framing.Impl.IProtocolExtensions.CreateFrameHandler(IProtocol protocol, AmqpTcpEndpoint endpoint, ArrayPool`1 pool, Func`2 socketFactory, TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout)
  at RabbitMQ.Client.ConnectionFactory.CreateFrameHandler(AmqpTcpEndpoint endpoint)
  at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func`2 selector)
  --- End of inner exception stack trace ---
  at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func`2 selector)
  at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.Init(IEndpointResolver endpoints)
  at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
  --- End of inner exception stack trace ---
  at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
  at RabbitMQ.Client.ConnectionFactory.CreateConnection(String clientProvidedName)
  at RabbitMQ.Client.ConnectionFactory.CreateConnection()
  at SlimMessageBus.Host.RabbitMQ.RabbitMqMessageBus.<CreateConnection>b__13_0(CancellationToken cancellationTask) in /_/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs:line 73
  at SlimMessageBus.Host.Retry.WithDelay(Func`2 operation, Func`3 shouldRetry, Nullable`1 delay, Nullable`1 jitter, CancellationToken cancellationToken) in /_/src/SlimMessageBus.Host/Retry.cs:line 28
  at SlimMessageBus.Host.Retry.WithDelay(Func`2 operation, Func`3 shouldRetry, Nullable`1 delay, Nullable`1 jitter, CancellationToken cancellationToken) in /_/src/SlimMessageBus.Host/Retry.cs:line 39
  at SlimMessageBus.Host.RabbitMQ.RabbitMqMessageBus.CreateConnection() in /_/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs:line 67
[14:25:06 INF] SlimMessageBus.Host.MessageBusBase Starting consumers for Main bus...
[14:25:06 INF] SlimMessageBus.Host.MessageBusBase Creating consumers for Main bus...
[14:25:06 ERR] SlimMessageBus.Host.MessageBusBase Could not auto start consumers

Do you expect something else here?
Perhaps other types of Rabbit exceptions should be accounted for.

@Jan-Olof

Thank you for the quick reply. It was the error in the last line I was thinking of:

[14:25:06 ERR] SlimMessageBus.Host.MessageBusBase Could not auto start consumers

For my purposes it would be better if it threw an exception. But really, it would be enough to have something like a property that shows whether there is an active connection to RabbitMQ, i.e. an easy way to know the connection status. Or would you have to make a "ping" publish to find out?

@zarusz
Owner

zarusz commented Dec 31, 2024

Thanks for the clarification.

  1. We could add something like consumer start retries (endless, with some delay).

  2. Other than that one could check Consumer start/stop status via https://github.com/zarusz/SlimMessageBus/blob/master/docs/intro.md#start-or-stop-message-consumption using the IConsumerControl interface.

  3. Also in v3, there will be a consumer circuit breaker ([Host] Consumer Circuit breaker #353), which would stop the consumers and resume them depending on whether some downstream system is up. While not directly related here, I think it could also help to monitor the health of the broker's network accessibility.

Let me know your thoughts. Specifically if 2) does what you asked for, and if 1) would be useful.
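Points 1) and 2) could be combined roughly as follows. This is only a sketch: `IConsumerStarter` is a hypothetical stand-in that exposes the two members the loop needs, and the real `IConsumerControl` in SlimMessageBus may differ in shape. Because the log above shows the failed auto start being logged rather than thrown, the loop checks the started flag instead of relying on an exception.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for illustration only; the actual SlimMessageBus
// interface may have different members or signatures.
public interface IConsumerStarter
{
    bool IsStarted { get; }
    Task Start();
}

public static class ConsumerStartRetry
{
    // Keeps calling Start() until IsStarted is true or cancellation is requested.
    public static async Task StartUntilRunningAsync(
        IConsumerStarter consumers,
        TimeSpan delay,
        CancellationToken cancellationToken)
    {
        while (!consumers.IsStarted)
        {
            cancellationToken.ThrowIfCancellationRequested();
            try
            {
                await consumers.Start();
            }
            catch (Exception)
            {
                // Start failed (for example, the broker is unreachable); retry after the delay.
            }

            if (!consumers.IsStarted)
            {
                await Task.Delay(delay, cancellationToken);
            }
        }
    }
}
```

Cancelling the token is the way to tell the loop to stop, which covers the "endless, until told otherwise" case.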

@Jan-Olof

Jan-Olof commented Jan 2, 2025

Thanks again for your answer. I had missed the IsStarted in IConsumerControl. It is indeed what I asked for. Also, 1) would be very useful, as I would like connection retries to be endless (or at least to continue until you tell them to stop).

Returning to the producer (though this may be off topic): I don't suppose you use the RabbitMQ Publisher Confirms feature?

https://www.rabbitmq.com/docs/confirms#publisher-confirms

https://rianjs.net/2013/12/publisher-confirms-with-rabbitmq-and-c-sharp

@zarusz
Owner

zarusz commented Jan 2, 2025

@Jan-Olof correct, we're not using Publisher Confirms right now. This could be added, and I've created a request for it: #360

I will look at adding the endless consumer start retries as part of this ticket (at some point):

We could add something like consumer start retries (endless, with some delay).
