From c70c98535f38c7cb964909549c0bb79c8e3fd60f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9dric=20L=2E=20Charlier?= Date: Tue, 27 Aug 2024 17:24:09 +0200 Subject: [PATCH] feat: add actions on completion for aggregators and sources (#35) --- Streamistry.Core/Aggregator.cs | 13 ++++- Streamistry.Core/ChainablePipe.cs | 9 ++- Streamistry.Core/IChainablePipe.cs | 8 ++- Streamistry.Core/Pipes/Aggregators/Average.cs | 11 ++-- Streamistry.Core/Pipes/Aggregators/Count.cs | 9 +-- Streamistry.Core/Pipes/Aggregators/Max.cs | 6 +- Streamistry.Core/Pipes/Aggregators/Median.cs | 11 ++-- Streamistry.Core/Pipes/Aggregators/Min.cs | 6 +- Streamistry.Core/Pipes/Aggregators/Sum.cs | 17 +++--- Streamistry.Core/Pipes/Sinks/MemorySink.cs | 3 + Streamistry.Core/Source.cs | 5 ++ Streamistry.Testing/AggregatorTests.cs | 56 +++++++++++++++++++ 12 files changed, 124 insertions(+), 30 deletions(-) create mode 100644 Streamistry.Testing/AggregatorTests.cs diff --git a/Streamistry.Core/Aggregator.cs b/Streamistry.Core/Aggregator.cs index 19bf01e..c681291 100644 --- a/Streamistry.Core/Aggregator.cs +++ b/Streamistry.Core/Aggregator.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Linq.Expressions; using System.Text; using System.Threading.Tasks; using Streamistry.Observability; @@ -19,13 +20,16 @@ public class Aggregator : ChainablePipe, { public Func Accumulator { get; } public Func Selector { get; } - private TAccumulate? State { get; set; } + public TAccumulate? State { get; set; } + private TAccumulate? Seed { get; } - public Aggregator(IChainablePipe upstream, Func accumulator, Func selector, TAccumulate? seed = default) + public Aggregator(IChainablePipe upstream, Func accumulator, Func selector, TAccumulate? seed = default, Expression>>? completion = null) : base(upstream.GetObservabilityProvider()) { upstream.RegisterDownstream(Emit, PushComplete); - (Accumulator, Selector, State) = (accumulator, selector, seed); + (Accumulator, Selector, State, Seed) = (accumulator, selector, seed, seed); + if (completion is not null) + Completion += () => (completion!.Compile())(this); } [Meter] @@ -38,4 +42,7 @@ public virtual void Emit(TSource? obj) State = Accumulator.Invoke(State, obj); return Selector.Invoke(State); } + + public virtual void Reset() + => State = Seed; } diff --git a/Streamistry.Core/ChainablePipe.cs b/Streamistry.Core/ChainablePipe.cs index 7fe71e6..80991ba 100644 --- a/Streamistry.Core/ChainablePipe.cs +++ b/Streamistry.Core/ChainablePipe.cs @@ -10,7 +10,7 @@ namespace Streamistry; public abstract class ChainablePipe : ObservablePipe, IChainablePipe { private Action? Downstream { get; set; } - private Action? Completion { get; set; } + protected Action? Completion { get; set; } protected ChainablePipe(ObservabilityProvider? observability) : base(observability) @@ -21,7 +21,12 @@ public void RegisterDownstream(Action downstream, Action? completion) Downstream += downstream; Completion += completion; } - + + public void RegisterOnCompleted(Action? completion) + { + Completion += completion; + } + protected void PushDownstream(T? obj) => Downstream?.Invoke(obj); diff --git a/Streamistry.Core/IChainablePipe.cs b/Streamistry.Core/IChainablePipe.cs index b3e6fae..dc5c628 100644 --- a/Streamistry.Core/IChainablePipe.cs +++ b/Streamistry.Core/IChainablePipe.cs @@ -5,7 +5,13 @@ using System.Threading.Tasks; namespace Streamistry; -public interface IChainablePipe : IObservablePipe + +public interface IChainablePipe : IObservablePipe +{ + void RegisterOnCompleted(Action? completion); +} + +public interface IChainablePipe : IChainablePipe { void RegisterDownstream(Action action, Action? complete); } diff --git a/Streamistry.Core/Pipes/Aggregators/Average.cs b/Streamistry.Core/Pipes/Aggregators/Average.cs index 83391d7..7410cbb 100644 --- a/Streamistry.Core/Pipes/Aggregators/Average.cs +++ b/Streamistry.Core/Pipes/Aggregators/Average.cs @@ -4,6 +4,7 @@ using System.Text; using System.Threading.Tasks; using System.Numerics; +using System.Linq.Expressions; namespace Streamistry.Pipes.Aggregators; @@ -27,11 +28,12 @@ public AverageState Append(T? value) public class Average : Aggregator, T> where T : INumber { - public Average(IChainablePipe upstream) + public Average(IChainablePipe upstream, Expression, T>>>? completion = null) : base(upstream , (x, y) => x.Append(y) , (x) => x.Select() - , AverageState.Default) + , AverageState.Default + , completion) { } } @@ -39,10 +41,11 @@ public class Average : Aggregator, U> where T : INumber where U : INumber { - public Average(IChainablePipe upstream) + public Average(IChainablePipe upstream, Expression, U>>>? completion = null) : base(upstream , (x, y) => x.Append(y is null ? default : U.CreateChecked(y)) , (x) => x.Select() - , AverageState.Default) + , AverageState.Default + , completion) { } } diff --git a/Streamistry.Core/Pipes/Aggregators/Count.cs b/Streamistry.Core/Pipes/Aggregators/Count.cs index 2bedc82..5ee353d 100644 --- a/Streamistry.Core/Pipes/Aggregators/Count.cs +++ b/Streamistry.Core/Pipes/Aggregators/Count.cs @@ -4,21 +4,22 @@ using System.Text; using System.Threading.Tasks; using System.Numerics; +using System.Linq.Expressions; namespace Streamistry.Pipes.Aggregators; public class Count : Aggregator { - public Count(IChainablePipe upstream) + public Count(IChainablePipe upstream, Expression>>? completion = null) : base(upstream - , (x, y) => y is null ? x : ++x, (x) => x) + , (x, y) => y is null ? x : ++x, (x) => x, 0, completion) { } } public class Count : Aggregator where U : INumber { - public Count(IChainablePipe upstream) + public Count(IChainablePipe upstream, Expression>>? completion = null) : base(upstream - , (x, y) => y is null ? x : (x is null ? U.One : ++x), (x) => x) + , (x, y) => y is null ? x : (x is null ? U.One : ++x), (x) => x, U.Zero, completion) { } } diff --git a/Streamistry.Core/Pipes/Aggregators/Max.cs b/Streamistry.Core/Pipes/Aggregators/Max.cs index 0e978ae..52c87ae 100644 --- a/Streamistry.Core/Pipes/Aggregators/Max.cs +++ b/Streamistry.Core/Pipes/Aggregators/Max.cs @@ -4,6 +4,7 @@ using System.Text; using System.Threading.Tasks; using System.Numerics; +using System.Linq.Expressions; namespace Streamistry.Pipes.Aggregators; @@ -28,10 +29,11 @@ public MaxState Append(T? value) public class Max : Aggregator, TInput> where TInput : INumber { - public Max(IChainablePipe upstream) + public Max(IChainablePipe upstream, Expression, TInput>>>? completion = null) : base(upstream , (x, y) => x.Append(y) , (x) => x.Select() - , MaxState.Default) + , MaxState.Default + , completion) { } } diff --git a/Streamistry.Core/Pipes/Aggregators/Median.cs b/Streamistry.Core/Pipes/Aggregators/Median.cs index 0e1f246..b11604b 100644 --- a/Streamistry.Core/Pipes/Aggregators/Median.cs +++ b/Streamistry.Core/Pipes/Aggregators/Median.cs @@ -4,6 +4,7 @@ using System.Text; using System.Threading.Tasks; using System.Numerics; +using System.Linq.Expressions; namespace Streamistry.Pipes.Aggregators; @@ -45,11 +46,12 @@ public MedianState Append(T? value) public class Median : Aggregator, TInput> where TInput : INumber { - public Median(IChainablePipe upstream) + public Median(IChainablePipe upstream, Expression, TInput>>>? completion = null) : base(upstream , (x, y) => x.Append(y) , (x) => x.Select() - , MedianState.Default) + , MedianState.Default + , completion) { } } @@ -57,11 +59,12 @@ public class Median : Aggregator, TO where TInput : INumber where TOuput : INumber { - public Median(IChainablePipe upstream) + public Median(IChainablePipe upstream, Expression, TOuput>>>? completion = null) : base(upstream , (x, y) => x.Append(y is null ? default : TOuput.CreateChecked(y)) , (x) => x.Select() - , MedianState.Default) + , MedianState.Default + , completion) { } } diff --git a/Streamistry.Core/Pipes/Aggregators/Min.cs b/Streamistry.Core/Pipes/Aggregators/Min.cs index 60d4a94..53fd1aa 100644 --- a/Streamistry.Core/Pipes/Aggregators/Min.cs +++ b/Streamistry.Core/Pipes/Aggregators/Min.cs @@ -4,6 +4,7 @@ using System.Text; using System.Threading.Tasks; using System.Numerics; +using System.Linq.Expressions; namespace Streamistry.Pipes.Aggregators; @@ -28,10 +29,11 @@ public MinState Append(T? value) public class Min : Aggregator, TInput> where TInput : INumber { - public Min(IChainablePipe upstream) + public Min(IChainablePipe upstream, Expression, TInput>>>? completion = null) : base(upstream , (x, y) => x.Append(y) , (x) => x.Select() - , MinState.Default) + , MinState.Default + , completion) { } } diff --git a/Streamistry.Core/Pipes/Aggregators/Sum.cs b/Streamistry.Core/Pipes/Aggregators/Sum.cs index d42eaad..7188684 100644 --- a/Streamistry.Core/Pipes/Aggregators/Sum.cs +++ b/Streamistry.Core/Pipes/Aggregators/Sum.cs @@ -4,15 +4,16 @@ using System.Text; using System.Threading.Tasks; using System.Numerics; +using System.Linq.Expressions; namespace Streamistry.Pipes.Aggregators; public class Sum : Aggregator where TInput : INumber { - public Sum(IChainablePipe upstream) - : this(upstream, default) { } + public Sum(IChainablePipe upstream, Expression>>? completion = null) + : this(upstream, default, completion) { } - private Sum(IChainablePipe upstream, TInput? seed = default) - : base(upstream, (x, y) => x is null || y is null ? default : x += y, (x) => x, seed) + private Sum(IChainablePipe upstream, TInput? seed = default, Expression>>? completion = null) + : base(upstream, (x, y) => x is null || y is null ? default : x += y, (x) => x, seed, completion) { } } @@ -20,11 +21,11 @@ public class Sum : Aggregator where TInput : INumber where TOutput : INumber { - public Sum(IChainablePipe upstream) - : this(upstream, default) { } + public Sum(IChainablePipe upstream, Expression>>? completion = null) + : this(upstream, default, completion) { } - private Sum(IChainablePipe upstream, TOutput? seed = default) + private Sum(IChainablePipe upstream, TOutput? seed = default, Expression>>? completion = null) : base(upstream - , (x, y) => x is null || y is null ? default : x += TOutput.CreateChecked(y), (x) => x, seed) + , (x, y) => x is null || y is null ? default : x += TOutput.CreateChecked(y), (x) => x, seed, completion) { } } diff --git a/Streamistry.Core/Pipes/Sinks/MemorySink.cs b/Streamistry.Core/Pipes/Sinks/MemorySink.cs index 38df724..5500710 100644 --- a/Streamistry.Core/Pipes/Sinks/MemorySink.cs +++ b/Streamistry.Core/Pipes/Sinks/MemorySink.cs @@ -17,4 +17,7 @@ public MemorySink(IChainablePipe upstream) private MemorySink(IChainablePipe upstream, IList state) : base(upstream, state.Add) => State = state; + + public void Clear() + => State.Clear(); } diff --git a/Streamistry.Core/Source.cs b/Streamistry.Core/Source.cs index 7c17d47..b44bcfe 100644 --- a/Streamistry.Core/Source.cs +++ b/Streamistry.Core/Source.cs @@ -23,6 +23,8 @@ protected Source(ObservabilityProvider? provider) public void Start() { + if (IsStarted) + return; IsStarted = true; Read(); } @@ -43,4 +45,7 @@ protected virtual void Read() public void WaitOnPrepared(IPreparablePipe pipe) => pipe.RegisterOnPrepared(Start); + + public void WaitOnCompleted(IChainablePipe pipe) + => pipe.RegisterOnCompleted(Start); } diff --git a/Streamistry.Testing/AggregatorTests.cs b/Streamistry.Testing/AggregatorTests.cs new file mode 100644 index 0000000..399c4f8 --- /dev/null +++ b/Streamistry.Testing/AggregatorTests.cs @@ -0,0 +1,56 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Newtonsoft.Json.Converters; +using NUnit.Framework; +using Streamistry.Pipes.Aggregators; +using Streamistry.Pipes.Sinks; +using Streamistry.Pipes.Sources; + +namespace Streamistry.Testing; +public class AggregatorTests +{ + [Test] + public void Completion_ResetItself_AggregatorReset() + { + var firstSource = new EnumerableSource([1, 2, 3]); + var secondSource = new EnumerableSource([-1, -2, 5]); + + var pipeline = new Pipeline(firstSource); + var union = new Union([firstSource, secondSource]); + var aggregator = new Sum(union, x => x.Reset()); + secondSource.WaitOnCompleted(aggregator); + var sink = new MemorySink(aggregator); + pipeline.Start(); + + Assert.That(sink.State, Has.Count.EqualTo(6)); + Assert.Multiple(() => + { + foreach (var value in (int[])[1, 3, 6, -1, -3, 2]) + Assert.That(sink.State, Does.Contain(value)); + }); + } + + [Test] + public void Completion_AdditionalEmit_EmitInThePipeline() + { + var firstSource = new EnumerableSource([1, 2, 3]); + var secondSource = new EnumerableSource([-1, -2, 5]); + + var pipeline = new Pipeline(firstSource); + var union = new Union([firstSource, secondSource]); + var aggregator = new Sum(union, x => x.Emit(-x.State)); + secondSource.WaitOnCompleted(aggregator); + var sink = new MemorySink(aggregator); + pipeline.Start(); + + Assert.That(sink.State, Has.Count.EqualTo(8)); + Assert.Multiple(() => + { + foreach (var value in (int[])[1, 3, 6, 0, -1, -3, 2, 0]) + Assert.That(sink.State, Does.Contain(value)); + }); + } +}