Skip to content

Commit

Permalink
* fixed using CancellationToken.None in BuildPipeline() even if a can…
Browse files Browse the repository at this point in the history
…cellationToken is supplied.

* Also flush events in AddBlock if the buffer size reaches 'BlockBufferSize'. This should ensure that no blocks with incomplete events are written.
  • Loading branch information
jaensen committed Sep 25, 2024
1 parent 73fb628 commit 1d10470
Showing 1 changed file with 9 additions and 15 deletions.
24 changes: 9 additions & 15 deletions Circles.Index/BlockIndexer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,8 @@ private async Task Sink((BlockWithReceipts, IEnumerable<IIndexEvent>) data)
CreateOptions(cancellationToken, 3, 3));

TransformBlock<Block, BlockWithReceipts> receiptsSourceBlock =
new(
block =>
new BlockWithReceipts(
block
, receiptFinder.Get(block))
, CreateOptions(cancellationToken, Environment.ProcessorCount, Environment.ProcessorCount));
new(block => new BlockWithReceipts(block, receiptFinder.Get(block))
, CreateOptions(cancellationToken, Environment.ProcessorCount, Environment.ProcessorCount));

sourceBlock.LinkTo(receiptsSourceBlock!, new DataflowLinkOptions { PropagateCompletion = true },
o => o != null);
Expand Down Expand Up @@ -151,20 +147,14 @@ private async Task Sink((BlockWithReceipts, IEnumerable<IIndexEvent>) data)

public async Task<Range<long>> Run(IAsyncEnumerable<long> blocksToIndex, CancellationToken? cancellationToken)
{
var (sourceBlock, sinkBlock) = BuildPipeline(CancellationToken.None);
var (sourceBlock, sinkBlock) = BuildPipeline(cancellationToken ?? CancellationToken.None);

long min = long.MaxValue;
long max = long.MinValue;

if (cancellationToken == null)
await foreach (var blockNo in blocksToIndex.WithCancellation(cancellationToken ?? CancellationToken.None))
{
CancellationTokenSource cts = new();
cancellationToken = cts.Token;
}

await foreach (var blockNo in blocksToIndex.WithCancellation(cancellationToken.Value))
{
await sourceBlock.SendAsync(blockNo, cancellationToken.Value);
await sourceBlock.SendAsync(blockNo, cancellationToken ?? CancellationToken.None);

min = Math.Min(min, blockNo);
max = Math.Max(max, blockNo);
Expand All @@ -187,6 +177,10 @@ private async Task AddBlock(BlockWithEventCounts block)

if (_blockBuffer.Length >= context.Settings.BlockBufferSize)
{
// FLush events
await context.Sink.Flush();

// Flush blocks
await FlushBlocks();
}
}
Expand Down

0 comments on commit 1d10470

Please sign in to comment.