This library helps with building simple TPL Dataflow pipelines that enforce the following guarantees:
- If any dataflow block fails, all the other blocks will complete as soon as possible.
- When a pipeline as a whole completes, either successfully or with an error, all of its constituent dataflow blocks will also have completed.
- The `Completion` of the pipeline propagates all errors that may have occurred in all blocks, accumulated inside a flat `AggregateException`.
This StackOverflow question provides deeper insight into why this library exists.
The problem with building pipelines using the traditional `LinkTo` method, configured with the `PropagateCompletion` option set to `true`, is that it allows the possibility of deadlocks and leaked fire-and-forget dataflow blocks:
- A deadlock can occur when a producer is blocked, waiting for empty space in the input buffer of the first dataflow block of a bounded pipeline, and any block other than the first one fails. In this case the producer will never be unblocked, because the first block will postpone all incoming messages ad infinitum, never accepting or declining any offered message.
- A leaked fire-and-forget block can occur under similar circumstances. When any but the first dataflow block fails, the error is propagated downstream but not upstream. So the pipeline will soon signal its completion, while some blocks near the top may still be running. These blocks are leaked as fire-and-forget blocks, consuming resources and potentially modifying the state of the application in unpredictable ways. Or they can simply get stuck and become the source of a deadlock, as described previously.
- The standard approach for propagating errors, the `PropagateCompletion = true` option, results in deeply nested `AggregateException`s.
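To make the failure mode concrete, here is a minimal sketch (illustrative only; the block names and lambdas are assumptions, not part of this library) of the traditional linking pattern described above:

```csharp
// Traditional linking with PropagateCompletion = true. If block2 fails,
// the error propagates only downstream to block3. block1 keeps running,
// and a producer blocked on block1's bounded buffer is never released.
var options = new DataflowLinkOptions { PropagateCompletion = true };
var block1 = new BufferBlock<int>(
    new DataflowBlockOptions { BoundedCapacity = 1 });
var block2 = new TransformBlock<int, int>(x => x);
var block3 = new ActionBlock<int>(x => { });
block1.LinkTo(block2, options);
block2.LinkTo(block3, options);
```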
This library attempts to fix these problems.
At first you instantiate the individual dataflow blocks, as usual. Any built-in or custom dataflow block can be part of the pipeline. This library helps only with linking the blocks, not with creating them.
After all the dataflow blocks are created, you use the static `PipelineBuilder.BeginWith` method to start building the pipeline, specifying the first dataflow block. This will be the entry point of the pipeline. Then you call the `LinkTo` method to add more blocks to the pipeline. The `LinkTo` invocations should be chained, because they don't modify the current builder; instead they return a new builder each time. Finally, when all the dataflow blocks have been added, you call the `ToPipeline` method, which physically links the blocks and composes the pipeline. Example:
```csharp
var block1 = new TransformManyBlock<string, string>(
    folderPath => Directory.EnumerateFiles(folderPath));

var block2 = new TransformBlock<string, (string, int)>(
    filePath => (filePath, File.ReadLines(filePath).Count()));

var block3 = new ActionBlock<(string, int)>(
    e => Console.WriteLine($"{Path.GetFileName(e.Item1)} has {e.Item2} lines"));

var pipeline = PipelineBuilder // This pipeline is an ITargetBlock<string>
    .BeginWith(block1)
    .LinkTo(block2)
    .LinkTo(block3)
    .ToPipeline();

pipeline.Post(@"C:\Users\Public\Documents");
pipeline.Complete();
await pipeline.Completion;
```
After the pipeline has been created, it owns all the dataflow blocks from which it is composed. You don't need to interact with the individual blocks any longer; the pipeline represents them as a whole. The pipeline is an `ITargetBlock<T>` that can receive messages, and potentially also an `ISourceBlock<T>` that can emit messages. Whether it can emit messages depends on the type of the last block added to the pipeline.
When the pipeline is created, all the blocks are linked automatically with the built-in `LinkTo` method, configured with the `PropagateCompletion` option set to `false`. Then a continuation is attached to the `Completion` of each block, which takes an appropriate action depending on how the block completed. If the block completed successfully or was canceled, the completion is propagated to the next block by invoking the next block's `Complete` method.
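The success/cancellation path can be pictured with a rough sketch like the following (this is an illustration of the mechanism described above, not the library's actual code; `previousBlock` and `nextBlock` are hypothetical names):

```csharp
// A continuation on each block's Completion forwards non-faulted
// completion (success or cancellation) to the next block.
previousBlock.Completion.ContinueWith(t =>
{
    if (!t.IsFaulted) nextBlock.Complete();
}, TaskScheduler.Default);
```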
If the block completed in a faulted state, then all the other blocks are immediately and forcefully completed (faulted), and their output is discarded by linking them to a `NullTarget` block. Faulting the blocks is achieved by invoking their `Fault` method, passing a special `PipelineException` as argument. Faulting the blocks is required in order to empty their input and output buffers, so that the pipeline can complete ASAP. This special exception is not propagated through the `Completion` of the generated pipeline, but it can be observed by querying the `Completion` property of the individual blocks. Observing this exception just means that this block was not the first one that failed. It is possible for the `PipelineException` to coexist with other exceptions in the same dataflow block.
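A sketch of how a failure might be observed, assuming a `pipeline` and a `block1` like those in the earlier example (the error-handling shape is an assumption, based on the standard `Task.Exception` API):

```csharp
try
{
    await pipeline.Completion;
}
catch
{
    // Task.Exception holds the flat AggregateException that accumulates
    // the original errors from all the blocks that failed.
    foreach (Exception ex in pipeline.Completion.Exception.InnerExceptions)
        Console.WriteLine(ex.Message);
}

// The individual blocks can still be queried; their Completion may
// additionally surface the library's internal PipelineException.
Console.WriteLine(block1.Completion.Status);
```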
It might be helpful to compare the functionality offered by this library with the functionality offered by the `DataflowBlock.Encapsulate` method. The result of this method is similar to the result of the `ToPipeline` method: both return an `IPropagatorBlock<TInput, TOutput>` implementation (a block that is both a target and a source). The `DataflowBlock.Encapsulate` method accepts a target and a source block, and returns a propagator that delegates to these two blocks. The two blocks are not linked automatically in any way, and the completion of the propagator represents the completion of the second (the source) block only. On the contrary, the `ToPipeline` method returns a propagator that links all the dataflow blocks tightly in both directions, and its `Completion` represents the completion of all its constituent blocks, not just the last one.
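For comparison, a minimal `DataflowBlock.Encapsulate` sketch (the block names and lambdas are illustrative assumptions):

```csharp
var target = new TransformBlock<string, int>(s => s.Length);
var source = new TransformBlock<int, string>(n => $"Length: {n}");

// The caller must link the two blocks manually; Encapsulate does not.
target.LinkTo(source, new DataflowLinkOptions { PropagateCompletion = true });

// The propagator's Completion tracks only the source block.
IPropagatorBlock<string, string> propagator =
    DataflowBlock.Encapsulate(target, source);
```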
It should be noted that the `IPropagatorBlock<TInput, TOutput>` returned by both of these approaches also implements the `IReceivableSourceBlock<TOutput>` interface, in exactly the same way. Casting the propagator to this interface always succeeds (provided that the `TOutput` has the correct type). Invoking the `TryReceive`/`TryReceiveAll` methods returns `true` if the underlying dataflow block implements this interface, and also has at least one available message to emit. Example:
```csharp
var receivable = (IReceivableSourceBlock<string>)pipeline;
bool received = receivable.TryReceive(out string item);
```
Starting from version 1.2, the pipeline builder supports two additional methods beyond the `LinkTo`:

- The `AddUnlinked` method makes it possible to add a dataflow block to the pipeline without linking it with the previous block. The completion of the previous block will still be propagated automatically to the next block. This can be useful in case a block sends messages to the next block manually, by issuing `Post` or `SendAsync` commands.
- The `WithPostCompletionAction` method allows intercepting a synchronous or asynchronous action between the completion of a block and signaling the completion of the next block. Any exception thrown by this action will cause the failure of the whole pipeline, and the error will finally be surfaced through its `Completion`.
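A hypothetical sketch of how these two methods might be combined (the exact signatures and argument shapes are assumptions, not the library's documented API):

```csharp
var consumerBlock = new ActionBlock<int>(n => Console.WriteLine(n));

// The producer pushes messages to the consumer manually via Post,
// so the two blocks are added to the pipeline without linking.
var producerBlock = new ActionBlock<string>(s => consumerBlock.Post(s.Length));

var pipeline = PipelineBuilder
    .BeginWith(producerBlock)
    .WithPostCompletionAction(() => Console.WriteLine("Producer completed."))
    .AddUnlinked(consumerBlock)
    .ToPipeline();
```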
You can install the SimpleTplDataflowPipelines NuGet package.
You can also download the project and build it locally, or just embed the single code file `PipelineBuilder.cs` (~550 lines of code) into your project.
This library has been tested on the .NET 5 and the .NET Framework 4.6 platforms.
The pipelines created with the help of this library are neither slower nor faster than pipelines created manually using the `LinkTo` method. This library has not been micro-optimized regarding the allocation of the few, small, short-lived objects that are created during the construction of a pipeline. The emphasis has been put on simplicity, readability and correctness, rather than on writing the most GC-friendly code possible.