Exception in ReadChannelOnceAsync method unexpectedly pauses the consumer. #243

@jheisong

Problem Description
A System.Threading.Channels.ChannelClosedException was thrown in the ReadChannelOnceAsync method of the Silverback.Messaging.Broker.Kafka.ConsumerChannelsManager class. The exception was logged as a fatal error, and the consumer stopped processing messages. We have error policies configured through the OnError method, using a specialization of RetryableErrorPolicyBase, but this exception never reached the configured policy.

We would like to understand how this exception can be externally captured so we can implement an action to handle it.

This behavior has caused unexpected interruptions in the message flow and requires manual intervention to restart the consumer.
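
For reference, here is a minimal sketch of how this exception type arises in System.Threading.Channels in general, independent of Silverback. It only illustrates the .NET behavior: reading from a channel whose writer has been completed and that holds no more items throws ChannelClosedException. We have not confirmed that this is the sequence that occurs inside ConsumerChannelsManager, and the helper name `ReadFromCompletedChannelThrowsAsync` is hypothetical.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper: returns true if reading from a completed,
// empty channel throws ChannelClosedException.
static async Task<bool> ReadFromCompletedChannelThrowsAsync()
{
    Channel<int> channel = Channel.CreateBounded<int>(1);

    // Mark the channel as complete: no more items will ever be written.
    channel.Writer.Complete();

    try
    {
        // Nothing is buffered and the writer is complete, so this read cannot succeed.
        await channel.Reader.ReadAsync();
        return false;
    }
    catch (ChannelClosedException)
    {
        return true;
    }
}

Console.WriteLine(await ReadFromCompletedChannelThrowsAsync());
```

By contrast, `ChannelReader<T>.WaitToReadAsync` returns `false` on a completed channel instead of throwing, which is why a reader that races with channel completion can behave differently depending on which of the two calls it uses.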

Steps to Reproduce
Currently, we are unable to consistently reproduce the issue. However, this behavior has been observed multiple times in the production environment.

Expected
The consumer should automatically restart and continue processing messages even after such an exception. Alternatively, we should be able to capture the exception within the error policies.

Actual
The consumer is paused after the exception, and no automatic recovery occurs, even with error policies configured.

Versions Used:
Silverback.Integration.HealthChecks: 4.5.1
Silverback.Integration.Newtonsoft: 4.5.1
Framework: .NET 8.0

Error Policy Configuration:
.OnError(policy)
Using a specialization of RetryableErrorPolicyBase.

Logs:
{
  "attributes": {
    "MessageTemplate": "Fatal error occurred processing the consumed message. The consumer will be stopped. | consumerId: {consumerId}, endpointName: {endpointName}",
    "Level": "Fatal",
    "Properties": {
      "ExceptionDetail": {
        "Type": "System.Threading.Channels.ChannelClosedException",
        "Message": "The channel has been closed.",
        "HResult": -2146233079,
        "TargetSite": "Void Throw()",
        "Source": "System.Private.CoreLib"
      },
      "ApplicationName": "",
      "MachineName": "",
      "ThreadId": 67,
      "consumerId": "",
      "endpointName": "*",
      "EventId": {
        "Id": 1023,
        "Name": "Silverback.Integration_ConsumerFatalError"
      },
      "SourceContext": "Silverback.Messaging.Broker.KafkaConsumer"
    },
    "error": {
      "stack": "System.Threading.Channels.ChannelClosedException: The channel has been closed.\n at System.Threading.Channels.AsyncOperation1.GetResult(Int16 token)\n at Silverback.Messaging.Broker.Kafka.ConsumerChannelsManager.ReadChannelOnceAsync(Int32 channelIndex, CancellationToken cancellationToken)\n at Silverback.Messaging.Broker.Kafka.ConsumerChannelsManager.ReadChannelAsync(Int32 channelIndex, CancellationToken cancellationToken)",
      "kind": "System.Threading.Channels.ChannelClosedException",
      "message": "The channel has been closed."
    },
    "Timestamp": "2025-01-06T23:58:00.0751329+00:00",
    "Exception": "System.Threading.Channels.ChannelClosedException: The channel has been closed.\n at System.Threading.Channels.AsyncOperation1.GetResult(Int16 token)\n at Silverback.Messaging.Broker.Kafka.ConsumerChannelsManager.ReadChannelOnceAsync(Int32 channelIndex, CancellationToken cancellationToken)\n at Silverback.Messaging.Broker.Kafka.ConsumerChannelsManager.ReadChannelAsync(Int32 channelIndex, CancellationToken cancellationToken)"
  }
}

Questions

  1. Is this the expected behavior for this exception (ChannelClosedException)?
  2. How can we configure Silverback to capture this exception and automatically restart the consumer, so that it does not remain paused?
  3. Are there any configurations or best practices to prevent consumer interruptions caused by uncaptured exceptions?

Thank you for your attention and support! ;)

Labels: bug