diff --git a/Circles.Index/BlockIndexer.cs b/Circles.Index/BlockIndexer.cs index 797dab3..3fcbdc6 100644 --- a/Circles.Index/BlockIndexer.cs +++ b/Circles.Index/BlockIndexer.cs @@ -60,12 +60,8 @@ private async Task Sink((BlockWithReceipts, IEnumerable) data) CreateOptions(cancellationToken, 3, 3)); TransformBlock receiptsSourceBlock = - new( - block => - new BlockWithReceipts( - block - , receiptFinder.Get(block)) - , CreateOptions(cancellationToken, Environment.ProcessorCount, Environment.ProcessorCount)); + new(block => new BlockWithReceipts(block, receiptFinder.Get(block)) + , CreateOptions(cancellationToken, Environment.ProcessorCount, Environment.ProcessorCount)); sourceBlock.LinkTo(receiptsSourceBlock!, new DataflowLinkOptions { PropagateCompletion = true }, o => o != null); @@ -151,20 +147,14 @@ private async Task Sink((BlockWithReceipts, IEnumerable) data) public async Task> Run(IAsyncEnumerable blocksToIndex, CancellationToken? cancellationToken) { - var (sourceBlock, sinkBlock) = BuildPipeline(CancellationToken.None); + var (sourceBlock, sinkBlock) = BuildPipeline(cancellationToken ?? CancellationToken.None); long min = long.MaxValue; long max = long.MinValue; - if (cancellationToken == null) + await foreach (var blockNo in blocksToIndex.WithCancellation(cancellationToken ?? CancellationToken.None)) { - CancellationTokenSource cts = new(); - cancellationToken = cts.Token; - } - - await foreach (var blockNo in blocksToIndex.WithCancellation(cancellationToken.Value)) - { - await sourceBlock.SendAsync(blockNo, cancellationToken.Value); + await sourceBlock.SendAsync(blockNo, cancellationToken ?? CancellationToken.None); min = Math.Min(min, blockNo); max = Math.Max(max, blockNo); @@ -187,6 +177,10 @@ private async Task AddBlock(BlockWithEventCounts block) if (_blockBuffer.Length >= context.Settings.BlockBufferSize) { + // FLush events + await context.Sink.Flush(); + + // Flush blocks await FlushBlocks(); } }