Skip to content

Commit 1da9a3c

Browse files
committed
adding new prefetchCount config
Signed-off-by: Neil South <[email protected]>
1 parent 054a0e9 commit 1da9a3c

File tree

5 files changed

+23
-17
lines changed

5 files changed

+23
-17
lines changed

.github/workflows/ci.yml

100644100755
+1-1
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ jobs:
8989
run: license_finder -r
9090

9191
- name: Check License Header
92-
uses: apache/skywalking-eyes@v0.5.0
92+
uses: apache/skywalking-eyes@v0.4.0
9393

9494
unit-test:
9595
runs-on: ubuntu-latest

doc/dependency_decisions.yml

+3-3
Original file line numberDiff line numberDiff line change
@@ -172,14 +172,14 @@
172172
- :who: mocsharp
173173
:why: MIT ( https://licenses.nuget.org/MIT)
174174
:versions:
175-
- 8.2.0
175+
- 8.2.1
176176
:when: 2022-11-09 18:57:32.294717110 Z
177177
- - :approve
178178
- Polly.Core
179179
- :who: mocsharp
180180
:why: MIT ( https://licenses.nuget.org/MIT)
181181
:versions:
182-
- 8.2.0
182+
- 8.2.1
183183
:when: 2022-11-09 18:57:32.294717110 Z
184184
- - :approve
185185
- RabbitMQ.Client
@@ -803,5 +803,5 @@
803803
- :who: mocsharp
804804
:why: MIT ( https://licenses.nuget.org/MIT)
805805
:versions:
806-
- 2.5.5
806+
- 2.5.6
807807
:when: 2022-08-16 21:40:32.294717110 Z

src/Plugins/RabbitMQ/ConfigurationKeys.cs

100644100755
+3-2
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ internal static class ConfigurationKeys
3131
public static readonly string RequeueDelay = "requeueDelay";
3232
public static readonly string UseSSL = "useSSL";
3333
public static readonly string Port = "port";
34-
public static readonly string[] PublisherRequiredKeys = new[] { EndPoint, Username, Password, VirtualHost, Exchange };
35-
public static readonly string[] SubscriberRequiredKeys = new[] { EndPoint, Username, Password, VirtualHost, Exchange, DeadLetterExchange, DeliveryLimit, RequeueDelay };
34+
public static readonly string[] PublisherRequiredKeys = [EndPoint, Username, Password, VirtualHost, Exchange];
35+
public static readonly string[] SubscriberRequiredKeys = [EndPoint, Username, Password, VirtualHost, Exchange, DeadLetterExchange, DeliveryLimit, RequeueDelay];
36+
public static readonly string PrefetchCount = "prefetchCount";
3637
}
3738
}

src/Plugins/RabbitMQ/Subscriber/RabbitMqMessageSubscriberService.cs

+13-8
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public class RabbitMQMessageSubscriberService : IMessageBrokerSubscriberService
4949
private readonly string _portNumber;
5050
private IModel? _channel;
5151
private bool _disposedValue;
52+
private readonly ushort _prefetchCount = 1;
5253

5354
public event ConnectionErrorHandler? OnConnectionError;
5455

@@ -72,19 +73,23 @@ public RabbitMQMessageSubscriberService(IOptions<MessageBrokerServiceConfigurati
7273
_deadLetterExchange = configuration.SubscriberSettings[ConfigurationKeys.DeadLetterExchange];
7374
_deliveryLimit = int.Parse(configuration.SubscriberSettings[ConfigurationKeys.DeliveryLimit], NumberFormatInfo.InvariantInfo);
7475
_requeueDelay = int.Parse(configuration.SubscriberSettings[ConfigurationKeys.RequeueDelay], NumberFormatInfo.InvariantInfo);
76+
if (configuration.SubscriberSettings.TryGetValue(ConfigurationKeys.PrefetchCount, out var value))
77+
{
78+
_prefetchCount = ushort.Parse(value ?? "1", NumberFormatInfo.InvariantInfo);
79+
}
7580

76-
if (configuration.SubscriberSettings.ContainsKey(ConfigurationKeys.UseSSL))
81+
if (configuration.SubscriberSettings.TryGetValue(ConfigurationKeys.UseSSL, out var sslValue))
7782
{
78-
_useSSL = configuration.SubscriberSettings[ConfigurationKeys.UseSSL];
83+
_useSSL = sslValue;
7984
}
8085
else
8186
{
8287
_useSSL = string.Empty;
8388
}
8489

85-
if (configuration.SubscriberSettings.ContainsKey(ConfigurationKeys.Port))
90+
if (configuration.SubscriberSettings.TryGetValue(ConfigurationKeys.Port, out var portValue))
8691
{
87-
_portNumber = configuration.SubscriberSettings[ConfigurationKeys.Port];
92+
_portNumber = portValue;
8893
}
8994
else
9095
{
@@ -112,7 +117,7 @@ private void CreateChannel()
112117
_channel = _rabbitMqConnectionFactory.CreateChannel(ChannelType.Subscriber, _endpoint, _username, _password, _virtualHost, _useSSL, _portNumber) ?? throw new ServiceException("Failed to create a new channel to RabbitMQ");
113118
_channel.ExchangeDeclare(_exchange, ExchangeType.Topic, durable: true, autoDelete: false);
114119
_channel.ExchangeDeclare(_deadLetterExchange, ExchangeType.Topic, durable: true, autoDelete: false);
115-
_channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
120+
_channel.BasicQos(prefetchSize: 0, prefetchCount: _prefetchCount, global: false);
116121
_channel.ModelShutdown += Channel_ModelShutdown;
117122
_logger.ConnectedToRabbitMQ(Name, _endpoint, _virtualHost);
118123
});
@@ -234,15 +239,15 @@ private QueueDeclareOk DeclareQueues(string[] topics, string queue, ushort prefe
234239

235240
var queueDeclareResult = _channel!.QueueDeclare(queue: queue, durable: true, exclusive: false, autoDelete: false, arguments: arguments);
236241

237-
var deadLetterExists = QueueExists(deadLetterQueue);
238-
if (deadLetterExists.exists == false)
242+
var (exists, accessable) = QueueExists(deadLetterQueue);
243+
if (exists == false)
239244
{
240245
_channel.QueueDeclare(queue: deadLetterQueue, durable: true, exclusive: false, autoDelete: false);
241246
}
242247

243248
try
244249
{
245-
BindToRoutingKeys(topics, queueDeclareResult.QueueName, deadLetterExists.accessable ? deadLetterQueue : "");
250+
BindToRoutingKeys(topics, queueDeclareResult.QueueName, accessable ? deadLetterQueue : "");
246251
_channel.BasicQos(0, prefetchCount, false);
247252
}
248253
catch (OperationInterruptedException operationInterruptedException)

src/Plugins/RabbitMQ/Tests/Integration/ReliabilityTest.cs

+3-3
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,8 @@ public ReliabilityTest()
5454
_loggerSubscriber = new Mock<ILogger<RabbitMQMessageSubscriberService>>();
5555
_factory = new RabbitMQConnectionFactory(_logger.Object);
5656

57-
_publishers = new List<IMessageBrokerPublisherService>();
58-
_subscribers = new List<IMessageBrokerSubscriberService>();
57+
_publishers = [];
58+
_subscribers = [];
5959

6060
_options = Options.Create(new MessageBrokerServiceConfiguration());
6161
_options.Value.PublisherSettings[ConfigurationKeys.EndPoint] = RabbitMqConnection.HostName;
@@ -72,7 +72,7 @@ public ReliabilityTest()
7272
_options.Value.SubscriberSettings[ConfigurationKeys.DeliveryLimit] = RabbitMqConnection.DeliveryLimit;
7373
_options.Value.SubscriberSettings[ConfigurationKeys.RequeueDelay] = RabbitMqConnection.RequeueDelay;
7474

75-
_topics = new List<string>();
75+
_topics = [];
7676
_messages = new ConcurrentDictionary<string, int>();
7777
_random = new Random();
7878
SetupTopics();

0 commit comments

Comments
 (0)