Skip to content

Commit

Permalink
fix a potential concurrency issue when handling new heads
Browse files Browse the repository at this point in the history
  • Loading branch information
jaensen committed Sep 25, 2024
1 parent 5271423 commit 73fb628
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 16 deletions.
1 change: 1 addition & 0 deletions Circles.Index/Circles.Index.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@


<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks" Version="8.0.8" />
<PackageReference Include="Nethermind.Numerics.Int256" Version="1.2.0" />
<PackageReference Include="Nethermind.ReferenceAssemblies" Version="1.28.0" />
</ItemGroup>
Expand Down
48 changes: 32 additions & 16 deletions Circles.Index/Plugin.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class Plugin : INethermindPlugin

private StateMachine? _indexerMachine;
private Context? _indexerContext;
private Task? _currentMachineExecution;
private int _isProcessing;
private int _newItemsArrived;
private long _latestHeadToIndex = -1;

Expand Down Expand Up @@ -88,8 +88,6 @@ public async Task Init(INethermindApi nethermindApi)

await _indexerMachine.TransitionTo(StateMachine.State.Initial);

_currentMachineExecution = Task.CompletedTask;

nethermindApi.BlockTree!.NewHeadBlock += (_, args) =>
{
var fullSyncInfo = nethermindApi.EthSyncingInfo?.GetFullInfo();
Expand Down Expand Up @@ -191,28 +189,46 @@ private static void InitCaches(InterfaceLogger logger, IDatabase database)

private void HandleNewHead()
{
if (_currentMachineExecution is { IsCompleted: false })
// Start the processing task if not already running
if (Interlocked.CompareExchange(ref _isProcessing, 1, 0) == 0)
{
// If there is an ongoing execution, we don't need to start a new one
return;
Task.Run(ProcessBlocksAsync, _cancellationTokenSource.Token);
}

_currentMachineExecution = Task.Run(ProcessBlocksAsync, _cancellationTokenSource.Token);
}

private async Task ProcessBlocksAsync()
{
// This loop is to ensure that we process all the new heads that arrive while we are processing the current head
do
try
{
long toIndex = Interlocked.Exchange(ref _latestHeadToIndex, -1);
if (toIndex == -1)
do
{
continue;
}
_cancellationTokenSource.Token.ThrowIfCancellationRequested();

long toIndex = Interlocked.Exchange(ref _latestHeadToIndex, -1);
if (toIndex == -1)
{
continue;
}

await _indexerMachine!.HandleEvent(new StateMachine.NewHead(toIndex));

await _indexerMachine!.HandleEvent(new StateMachine.NewHead(toIndex));
} while (Interlocked.CompareExchange(ref _newItemsArrived, 0, 1) == 1);
// As long as new items arrive, keep processing
} while (Interlocked.CompareExchange(ref _newItemsArrived, 0, 1) == 1);
}
catch (OperationCanceledException)
{
_indexerContext!.Logger.Info("Processing was canceled.");
}
catch (Exception e)
{
_indexerContext!.Logger.Error("Error processing blocks", e);
throw;
}
finally
{
// Mark processing as complete
Interlocked.Exchange(ref _isProcessing, 0);
}
}

public Task InitNetworkProtocol()
Expand Down

0 comments on commit 73fb628

Please sign in to comment.