Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
Merge pull request #69 from SoCreate/feature/throttle-processing-duri…
Browse files Browse the repository at this point in the history
…ng-failure

Throttle processing during failure
  • Loading branch information
lurock authored Apr 1, 2019
2 parents 3543510 + c9f0c28 commit a164ba3
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 27 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
# 9.1.0 (2019-03-26)
# 9.1.0 (2019-04-01)
### Features
* **Subscribe Retry**: Added a retry strategy for when a subscriber fails to subscribe because the Broker doesn't exist yet.
* **BrokerServiceUri**: We recently lost the ability to specify the BrokerUri when publishing/subscribing. Added the ability to pass the BrokerUri to the BrokerServiceLocator class.
* **Throttle on Failure**: Added a config to slow down the processing loop when errors occur.
* **Filter Broker Stats**: Added ability to filter on time, Service name, or message type using query parameters in the GET broker/stats API in the demo app.

# 9.0.0 (2019-03-21)
### Features
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
Expand All @@ -23,11 +24,14 @@ public BrokerController(IBrokerClient brokerClient)

// GET api/broker/stats
[HttpGet("stats")]
public async Task<ActionResult<Dictionary<string, List<QueueStats>>>> Get()
public async Task<ActionResult<Dictionary<string, IEnumerable<QueueStats>>>> Get([FromQuery] BrokerStatsQueryParams queryParams)
{
try
{
return await _brokerClient.GetBrokerStatsAsync();
return (await _brokerClient.GetBrokerStatsAsync())
.Where(item => queryParams.MessageType == null || item.Key.Contains(queryParams.MessageType))
.ToDictionary(item => item.Key, item => item.Value.Where(i => i.Time > queryParams.FromTime &&
(queryParams.ServiceName == null || i.ServiceName.Contains(queryParams.ServiceName))));
}
catch (Exception ex)
{
Expand Down Expand Up @@ -111,4 +115,11 @@ public async Task<IActionResult> Delete(string queueName)
}
}
}

public class BrokerStatsQueryParams
{
public DateTime FromTime { get; set; }
public string ServiceName { get; set; }
public string MessageType { get; set; }
}
}
26 changes: 17 additions & 9 deletions src/SoCreate.ServiceFabric.PubSub/BrokerClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class BrokerClient : IBrokerClient
/// <summary>
/// A list of QueueStats for each queue on the Broker Service. Hold up to <see cref="QueueStatCapacity"/> items at a time.
/// </summary>
private Dictionary<string, List<QueueStats>> _queueStats = new Dictionary<string, List<QueueStats>>();
private readonly Dictionary<string, List<QueueStats>> _queueStats = new Dictionary<string, List<QueueStats>>();

/// <summary>
/// A dictionary of Reference Wrappers, keyed by queue name, representing all queues on the Broker Service.
Expand Down Expand Up @@ -85,11 +85,7 @@ public Task ProcessMessageAsync(MessageWrapper messageWrapper)
/// <inheritdoc />
public async Task<Dictionary<string, List<QueueStats>>> GetBrokerStatsAsync()
{
var tasks = from brokerService in await _brokerServiceLocator.GetBrokerServicesForAllPartitionsAsync()
select brokerService.GetBrokerStatsAsync();

var allStats = await Task.WhenAll(tasks.ToList());
foreach (var stat in allStats.SelectMany(stat => stat.Stats))
foreach (var stat in await GetAllBrokerStatsAsync())
{
if (!_queueStats.ContainsKey(stat.QueueName))
{
Expand All @@ -102,8 +98,6 @@ public async Task<Dictionary<string, List<QueueStats>>> GetBrokerStatsAsync()
}
}

_subscriberReferences = allStats.SelectMany(stat => stat.Queues).ToDictionary(i => i.Key, i => i.Value);

return _queueStats;
}

Expand All @@ -112,16 +106,30 @@ public async Task UnsubscribeByQueueNameAsync(string queueName)
{
if (!_subscriberReferences.ContainsKey(queueName))
{
await GetBrokerStatsAsync();
await GetAllBrokerStatsAsync();
}

if (_subscriberReferences.TryGetValue(queueName, out var referenceWrapper))
{
var messageType = queueName.Split('_')[0];
var brokerService = await _brokerServiceLocator.GetBrokerServiceForMessageAsync(messageType);
await brokerService.UnsubscribeAsync(referenceWrapper, messageType);
_subscriberReferences.Remove(queueName);
_queueStats.Remove(queueName);
}
}

private async Task<IEnumerable<QueueStats>> GetAllBrokerStatsAsync()
{
var tasks = from brokerService in await _brokerServiceLocator.GetBrokerServicesForAllPartitionsAsync()
select brokerService.GetBrokerStatsAsync();

var allStats = await Task.WhenAll(tasks.ToList());

_subscriberReferences = allStats.SelectMany(stat => stat.Queues).ToDictionary(i => i.Key, i => i.Value);

return allStats.SelectMany(stat => stat.Stats);
}
}

public static class BrokerClientExtensions
Expand Down
2 changes: 1 addition & 1 deletion src/SoCreate.ServiceFabric.PubSub/BrokerService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ await TimeoutRetryHelper.ExecuteInTransaction(StateManager, async (tx, token, st
}
catch (Exception ex)
{
await BrokerEventsManager.OnMessageDeliveryFailedAsync(queueName, subscriber, message.Value, ex);
await BrokerEventsManager.OnMessageDeliveryFailedAsync(queueName, subscriber, message.Value, ex, ThrottleFactor);
throw;
}
}
Expand Down
26 changes: 14 additions & 12 deletions src/SoCreate.ServiceFabric.PubSub/BrokerServiceBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,14 @@ public abstract class BrokerServiceBase : StatefulService, IBrokerService
protected TimeSpan DueTime { get; set; } = TimeSpan.FromSeconds(5);

/// <summary>
/// Gets or sets the interval to wait between batches of publishing messages. (Default: 5s)
/// Gets or sets the interval to wait between batches of publishing messages. (Default: 1s)
/// </summary>
protected TimeSpan Period { get; set; } = TimeSpan.FromSeconds(5);
protected TimeSpan Period { get; set; } = TimeSpan.FromSeconds(1);

/// <summary>
/// Gets or sets the amount to throttle queue processing when deliveries are failing. Slow down queue processing by a factor of X. (Default 10) (Default: 10)
/// </summary>
protected int ThrottleFactor { get; set; } = 10;

/// <summary>
/// Get or Sets the maximum period to process messages before allowing enqueuing
Expand Down Expand Up @@ -252,18 +257,15 @@ protected override async Task RunAsync(CancellationToken cancellationToken)
//process messages for given time, then allow other transactions to enqueue messages
var cts = new CancellationTokenSource(MaxProcessingPeriod);
var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cts.Token, cancellationToken);
var timeoutCancellationToken = linkedTokenSource.Token;
try
{
var elements = _queues.ToArray();
var tasks = new List<Task>(elements.Length);

foreach (var element in elements)
{
var subscriber = element.Value;
string queueName = element.Key;
tasks.Add(ProcessQueues(linkedTokenSource.Token, subscriber, queueName));
}
await Task.WhenAll(tasks);
await Task.WhenAll(
from subscription in _queues
let queueName = subscription.Key
let subscriber = subscription.Value
where subscriber.ShouldProcessMessages()
select ProcessQueues(timeoutCancellationToken, subscriber, queueName));
}
catch (TaskCanceledException)
{//swallow and move on..
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,14 @@ public async Task OnMessageDeliveredAsync(string queueName, ReferenceWrapper sub
_stats[queueName].TotalDelivered++;
}

public async Task OnMessageDeliveryFailedAsync(string queueName, ReferenceWrapper subscriber, MessageWrapper messageWrapper, Exception exception)
public async Task OnMessageDeliveryFailedAsync(string queueName, ReferenceWrapper subscriber, MessageWrapper messageWrapper, Exception exception, int throttleFactor)
{
if (MessageDeliveryFailed != null)
{
await MessageDeliveryFailed.Invoke(queueName, subscriber, messageWrapper, exception);
}

subscriber.SkipCount = throttleFactor;
_stats[queueName].TotalDeliveryFailures++;
}

Expand Down
2 changes: 1 addition & 1 deletion src/SoCreate.ServiceFabric.PubSub/Events/IBrokerEvents.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public interface IBrokerEventsManager : IBrokerEvents
Task OnUnsubscribedAsync(string queueName, ReferenceWrapper subscriber, string messageTypeName);
Task OnMessageReceivedAsync(string queueName, ReferenceWrapper subscriber, MessageWrapper messageWrapper);
Task OnMessageDeliveredAsync(string queueName, ReferenceWrapper subscriber, MessageWrapper messageWrapper);
Task OnMessageDeliveryFailedAsync(string queueName, ReferenceWrapper subscriber, MessageWrapper messageWrapper, Exception exception);
Task OnMessageDeliveryFailedAsync(string queueName, ReferenceWrapper subscriber, MessageWrapper messageWrapper, Exception exception, int throttleFactor = 0);
Task<List<QueueStats>> GetStatsAsync();
}
}
12 changes: 12 additions & 0 deletions src/SoCreate.ServiceFabric.PubSub/State/ReferenceWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ protected IHashingHelper HashingHelper
[DataMember]
public string RoutingKey { get; private set; }

public int SkipCount { get; set; }

/// <inheritdoc />
public abstract bool Equals(ReferenceWrapper other);

Expand Down Expand Up @@ -98,5 +100,15 @@ public bool ShouldDeliverMessage(MessageWrapper message)

return string.Equals(_routingKeyValue[1], value, StringComparison.InvariantCultureIgnoreCase);
}

public bool ShouldProcessMessages()
{
if (SkipCount > 0)
{
SkipCount--;
return false;
}
return true;
}
}
}

0 comments on commit a164ba3

Please sign in to comment.