Skip to content

Commit

Permalink
Add exponential backoff and infinite restart attempts in case of state machine errors.
Browse files Browse the repository at this point in the history
  • Loading branch information
jaensen committed Aug 4, 2024
1 parent d868841 commit 4e587df
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 30 deletions.
2 changes: 1 addition & 1 deletion Circles.Index/BlockIndexer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ private async Task Sink((BlockWithReceipts, IEnumerable<IIndexEvent>) data)
receiptsSourceBlock.LinkTo(parserBlock, new DataflowLinkOptions { PropagateCompletion = true });

ActionBlock<(BlockWithReceipts, IEnumerable<IIndexEvent>)> sinkBlock = new(Sink,
CreateOptions(cancellationToken, 50000, 1));
CreateOptions(cancellationToken, 64 * 1024, 1));
parserBlock.LinkTo(sinkBlock, new DataflowLinkOptions { PropagateCompletion = true });

return (sourceBlock, sinkBlock);
Expand Down
58 changes: 29 additions & 29 deletions Circles.Index/StateMachine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,6 @@ private record LeaveState : IEvent;

private State CurrentState { get; set; } = State.New;

private bool _checkedForGaps;

/// <summary>
/// Block height used as the indexer's current position.
/// The first read prefers <c>context.Database.FirstGap()</c> and falls back to
/// <c>context.Database.LatestBlock()</c>, then to 0; every later read skips the
/// gap lookup and uses only the latest block (or 0).
/// </summary>
/// <remarks>
/// Reading this property has a side effect: it sets <c>_checkedForGaps</c> once the
/// lookups have returned, so the gap check is not repeated on later reads.
/// </remarks>
private long LastIndexHeight
{
    get
    {
        // The gap lookup is only part of the fallback chain until the first read has succeeded.
        var height = _checkedForGaps
            ? context.Database.LatestBlock() ?? 0
            : context.Database.FirstGap() ?? context.Database.LatestBlock() ?? 0;

        // Set the flag only after the lookups returned, so a throwing lookup leaves it unset.
        _checkedForGaps = true;
        return height;
    }
}

public async Task HandleEvent(IEvent e)
{
try
Expand All @@ -64,8 +51,11 @@ public async Task HandleEvent(IEvent e)
{
case EnterState:
{
// Initially delete all events for which no "System_Block" exists
await TransitionTo(State.Reorg, LastIndexHeight);
context.Logger.Info("Initializing: Finding the last persisted block...");
var lastPersistedBlock = context.Database.FirstGap() ?? context.Database.LatestBlock() ?? 0;

context.Logger.Info($"Initializing: Last persisted block is {lastPersistedBlock}. Deleting all events from this block onwards...");
await TransitionTo(State.Reorg, lastPersistedBlock);
return;
}
}
Expand All @@ -90,8 +80,8 @@ public async Task HandleEvent(IEvent e)
switch (e)
{
case NewHead newHead:
context.Logger.Info($"New head received: {newHead.Head}");
if (newHead.Head <= LastIndexHeight)
context.Logger.Debug($"New head received: {newHead.Head}");
if (newHead.Head <= context.Database.LatestBlock())
{
await TransitionTo(State.Reorg, newHead.Head);
return;
Expand All @@ -108,8 +98,8 @@ public async Task HandleEvent(IEvent e)
{
case EnterState<long> enterSyncing:
var importedBlockRange = await Sync(enterSyncing.Arg);
context.Logger.Info($"Imported blocks from {importedBlockRange.Min} " +
$"to {importedBlockRange.Max}");
context.Logger.Debug($"Imported blocks from {importedBlockRange.Min} " +
$"to {importedBlockRange.Max}");
Errors.Clear();

await TransitionTo(State.NotifySubscribers, importedBlockRange);
Expand Down Expand Up @@ -146,15 +136,25 @@ public async Task HandleEvent(IEvent e)
switch (e)
{
case EnterState:
if (Errors.Count >= 3)
// Quadratic backoff: the delay grows with the square of the number of errors (Errors.Count^2 seconds)
var delay = Errors.Count * Errors.Count * 1000;

// If the delay is larger than 60 sec, clear the oldest errors
if (delay > 60000)
{
// If we have tried 3 times, give up
await TransitionTo(State.End);
return;
Errors.RemoveAt(0);
}

// Otherwise, wait for a bit before retrying
await Task.Delay(Errors.Count * Errors.Count * 1000, cancellationToken);
// Add some jitter to the delay
var jitter = new Random((int)DateTime.Now.TimeOfDay.TotalSeconds).Next(0, 1000);
delay += jitter;

// Wait 'delay' ms
context.Logger.Info($"Waiting {delay} ms before retrying after an error...");
await Task.Delay(delay, cancellationToken);

// Retry
context.Logger.Info("Transitioning to 'Initial' state after an error...");
await TransitionTo(State.Initial);
return;
case LeaveState:
Expand All @@ -167,7 +167,7 @@ public async Task HandleEvent(IEvent e)
return;
}

context.Logger.Debug($"Unhandled event {e} in state {CurrentState}");
context.Logger.Trace($"Unhandled event {e} in state {CurrentState}");
}
catch (Exception ex)
{
Expand All @@ -180,7 +180,7 @@ public async Task HandleEvent(IEvent e)

private async Task TransitionTo<TArgument>(State newState, TArgument? argument)
{
context.Logger.Info($"Transitioning from {CurrentState} to {newState}");
context.Logger.Debug($"Transitioning from {CurrentState} to {newState}");
if (newState is not State.Error)
{
await HandleEvent(new LeaveState());
Expand All @@ -198,15 +198,15 @@ public async Task TransitionTo(State newState)

private async IAsyncEnumerable<long> GetBlocksToSync(long toBlock)
{
long lastIndexHeight = LastIndexHeight;
long lastIndexHeight = context.Database.LatestBlock() ?? 0;
if (lastIndexHeight == toBlock)
{
context.Logger.Info("No blocks to sync.");
yield break;
}

var nextBlock = lastIndexHeight + 1;
context.Logger.Info($"Enumerating blocks to sync from {nextBlock} (LastIndexHeight + 1) to {toBlock}");
context.Logger.Debug($"Enumerating blocks to sync from {nextBlock} (LastIndexHeight + 1) to {toBlock}");

for (long i = nextBlock; i <= toBlock; i++)
{
Expand Down

0 comments on commit 4e587df

Please sign in to comment.