Skip to content

Commit

Permalink
feat: add actions on completion for aggregators and sources (#35)
Browse files Browse the repository at this point in the history
  • Loading branch information
Seddryck authored Aug 27, 2024
1 parent 55dedd9 commit c70c985
Show file tree
Hide file tree
Showing 12 changed files with 124 additions and 30 deletions.
13 changes: 10 additions & 3 deletions Streamistry.Core/Aggregator.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;
using Streamistry.Observability;
Expand All @@ -19,13 +20,16 @@ public class Aggregator<TSource, TAccumulate, TResult> : ChainablePipe<TResult>,
{
public Func<TAccumulate?, TSource?, TAccumulate?> Accumulator { get; }
public Func<TAccumulate?, TResult?> Selector { get; }
private TAccumulate? State { get; set; }
public TAccumulate? State { get; set; }
private TAccumulate? Seed { get; }

public Aggregator(IChainablePipe<TSource> upstream, Func<TAccumulate?, TSource?, TAccumulate?> accumulator, Func<TAccumulate?, TResult?> selector, TAccumulate? seed = default)
public Aggregator(IChainablePipe<TSource> upstream, Func<TAccumulate?, TSource?, TAccumulate?> accumulator, Func<TAccumulate?, TResult?> selector, TAccumulate? seed = default, Expression<Action<Aggregator<TSource, TAccumulate, TResult>>>? completion = null)
: base(upstream.GetObservabilityProvider())
{
upstream.RegisterDownstream(Emit, PushComplete);
(Accumulator, Selector, State) = (accumulator, selector, seed);
(Accumulator, Selector, State, Seed) = (accumulator, selector, seed, seed);
if (completion is not null)
Completion += () => (completion!.Compile())(this);
}

[Meter]
Expand All @@ -38,4 +42,7 @@ public virtual void Emit(TSource? obj)
State = Accumulator.Invoke(State, obj);
return Selector.Invoke(State);
}

public virtual void Reset()
=> State = Seed;
}
9 changes: 7 additions & 2 deletions Streamistry.Core/ChainablePipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace Streamistry;
public abstract class ChainablePipe<T> : ObservablePipe, IChainablePipe<T>
{
private Action<T?>? Downstream { get; set; }
private Action? Completion { get; set; }
protected Action? Completion { get; set; }

protected ChainablePipe(ObservabilityProvider? observability)
: base(observability)
Expand All @@ -21,7 +21,12 @@ public void RegisterDownstream(Action<T?> downstream, Action? completion)
Downstream += downstream;
Completion += completion;
}


public void RegisterOnCompleted(Action? completion)
{
Completion += completion;
}

protected void PushDownstream(T? obj)
=> Downstream?.Invoke(obj);

Expand Down
8 changes: 7 additions & 1 deletion Streamistry.Core/IChainablePipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,13 @@
using System.Threading.Tasks;

namespace Streamistry;
public interface IChainablePipe<T> : IObservablePipe

public interface IChainablePipe : IObservablePipe
{
void RegisterOnCompleted(Action? completion);
}

public interface IChainablePipe<T> : IChainablePipe
{
void RegisterDownstream(Action<T?> action, Action? complete);
}
11 changes: 7 additions & 4 deletions Streamistry.Core/Pipes/Aggregators/Average.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Text;
using System.Threading.Tasks;
using System.Numerics;
using System.Linq.Expressions;

namespace Streamistry.Pipes.Aggregators;

Expand All @@ -27,22 +28,24 @@ public AverageState<T> Append(T? value)

public class Average<T> : Aggregator<T, AverageState<T>, T> where T : INumber<T>
{
public Average(IChainablePipe<T> upstream)
public Average(IChainablePipe<T> upstream, Expression<Action<Aggregator<T, AverageState<T>, T>>>? completion = null)
: base(upstream
, (x, y) => x.Append(y)
, (x) => x.Select()
, AverageState<T>.Default)
, AverageState<T>.Default
, completion)
{ }
}

public class Average<T, U> : Aggregator<T, AverageState<U>, U>
where T : INumber<T>
where U : INumber<U>
{
public Average(IChainablePipe<T> upstream)
public Average(IChainablePipe<T> upstream, Expression<Action<Aggregator<T, AverageState<U>, U>>>? completion = null)
: base(upstream
, (x, y) => x.Append(y is null ? default : U.CreateChecked(y))
, (x) => x.Select()
, AverageState<U>.Default)
, AverageState<U>.Default
, completion)
{ }
}
9 changes: 5 additions & 4 deletions Streamistry.Core/Pipes/Aggregators/Count.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,22 @@
using System.Text;
using System.Threading.Tasks;
using System.Numerics;
using System.Linq.Expressions;

namespace Streamistry.Pipes.Aggregators;
public class Count<T> : Aggregator<T, int, int>
{
public Count(IChainablePipe<T> upstream)
public Count(IChainablePipe<T> upstream, Expression<Action<Aggregator<T, int, int>>>? completion = null)
: base(upstream
, (x, y) => y is null ? x : ++x, (x) => x)
, (x, y) => y is null ? x : ++x, (x) => x, 0, completion)
{ }
}

public class Count<T, U> : Aggregator<T, U, U>
where U : INumber<U>
{
public Count(IChainablePipe<T> upstream)
public Count(IChainablePipe<T> upstream, Expression<Action<Aggregator<T, U, U>>>? completion = null)
: base(upstream
, (x, y) => y is null ? x : (x is null ? U.One : ++x), (x) => x)
, (x, y) => y is null ? x : (x is null ? U.One : ++x), (x) => x, U.Zero, completion)
{ }
}
6 changes: 4 additions & 2 deletions Streamistry.Core/Pipes/Aggregators/Max.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Text;
using System.Threading.Tasks;
using System.Numerics;
using System.Linq.Expressions;

namespace Streamistry.Pipes.Aggregators;

Expand All @@ -28,10 +29,11 @@ public MaxState<T> Append(T? value)

public class Max<TInput> : Aggregator<TInput, MaxState<TInput>, TInput> where TInput : INumber<TInput>
{
public Max(IChainablePipe<TInput> upstream)
public Max(IChainablePipe<TInput> upstream, Expression<Action<Aggregator<TInput, MaxState<TInput>, TInput>>>? completion = null)
: base(upstream
, (x, y) => x.Append(y)
, (x) => x.Select()
, MaxState<TInput>.Default)
, MaxState<TInput>.Default
, completion)
{ }
}
11 changes: 7 additions & 4 deletions Streamistry.Core/Pipes/Aggregators/Median.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Text;
using System.Threading.Tasks;
using System.Numerics;
using System.Linq.Expressions;

namespace Streamistry.Pipes.Aggregators;

Expand Down Expand Up @@ -45,23 +46,25 @@ public MedianState<T> Append(T? value)

public class Median<TInput> : Aggregator<TInput, MedianState<TInput>, TInput> where TInput : INumber<TInput>
{
public Median(IChainablePipe<TInput> upstream)
public Median(IChainablePipe<TInput> upstream, Expression<Action<Aggregator<TInput, MedianState<TInput>, TInput>>>? completion = null)
: base(upstream
, (x, y) => x.Append(y)
, (x) => x.Select()
, MedianState<TInput>.Default)
, MedianState<TInput>.Default
, completion)
{ }
}

public class Median<TInput, TOuput> : Aggregator<TInput, MedianState<TOuput>, TOuput>
where TInput : INumber<TInput>
where TOuput : INumber<TOuput>
{
public Median(IChainablePipe<TInput> upstream)
public Median(IChainablePipe<TInput> upstream, Expression<Action<Aggregator<TInput, MedianState<TOuput>, TOuput>>>? completion = null)
: base(upstream
, (x, y) => x.Append(y is null ? default : TOuput.CreateChecked(y))
, (x) => x.Select()
, MedianState<TOuput>.Default)
, MedianState<TOuput>.Default
, completion)
{ }
}

6 changes: 4 additions & 2 deletions Streamistry.Core/Pipes/Aggregators/Min.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Text;
using System.Threading.Tasks;
using System.Numerics;
using System.Linq.Expressions;

namespace Streamistry.Pipes.Aggregators;

Expand All @@ -28,10 +29,11 @@ public MinState<T> Append(T? value)

public class Min<TInput> : Aggregator<TInput, MinState<TInput>, TInput> where TInput : INumber<TInput>
{
public Min(IChainablePipe<TInput> upstream)
public Min(IChainablePipe<TInput> upstream, Expression<Action<Aggregator<TInput, MinState<TInput>, TInput>>>? completion = null)
: base(upstream
, (x, y) => x.Append(y)
, (x) => x.Select()
, MinState<TInput>.Default)
, MinState<TInput>.Default
, completion)
{ }
}
17 changes: 9 additions & 8 deletions Streamistry.Core/Pipes/Aggregators/Sum.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,28 @@
using System.Text;
using System.Threading.Tasks;
using System.Numerics;
using System.Linq.Expressions;

namespace Streamistry.Pipes.Aggregators;
public class Sum<TInput> : Aggregator<TInput, TInput, TInput> where TInput : INumber<TInput>
{
public Sum(IChainablePipe<TInput> upstream)
: this(upstream, default) { }
public Sum(IChainablePipe<TInput> upstream, Expression<Action<Aggregator<TInput, TInput, TInput>>>? completion = null)
: this(upstream, default, completion) { }

private Sum(IChainablePipe<TInput> upstream, TInput? seed = default)
: base(upstream, (x, y) => x is null || y is null ? default : x += y, (x) => x, seed)
private Sum(IChainablePipe<TInput> upstream, TInput? seed = default, Expression<Action<Aggregator<TInput, TInput, TInput>>>? completion = null)
: base(upstream, (x, y) => x is null || y is null ? default : x += y, (x) => x, seed, completion)
{ }
}

public class Sum<TInput, TOutput> : Aggregator<TInput, TOutput, TOutput>
where TInput : INumber<TInput>
where TOutput : INumber<TOutput>
{
public Sum(IChainablePipe<TInput> upstream)
: this(upstream, default) { }
public Sum(IChainablePipe<TInput> upstream, Expression<Action<Aggregator<TInput, TOutput, TOutput>>>? completion = null)
: this(upstream, default, completion) { }

private Sum(IChainablePipe<TInput> upstream, TOutput? seed = default)
private Sum(IChainablePipe<TInput> upstream, TOutput? seed = default, Expression<Action<Aggregator<TInput, TOutput, TOutput>>>? completion = null)
: base(upstream
, (x, y) => x is null || y is null ? default : x += TOutput.CreateChecked(y), (x) => x, seed)
, (x, y) => x is null || y is null ? default : x += TOutput.CreateChecked(y), (x) => x, seed, completion)
{ }
}
3 changes: 3 additions & 0 deletions Streamistry.Core/Pipes/Sinks/MemorySink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,7 @@ public MemorySink(IChainablePipe<T> upstream)
private MemorySink(IChainablePipe<T> upstream, IList<T?> state)
: base(upstream, state.Add)
=> State = state;

public void Clear()
=> State.Clear();
}
5 changes: 5 additions & 0 deletions Streamistry.Core/Source.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ protected Source(ObservabilityProvider? provider)

public void Start()
{
if (IsStarted)
return;
IsStarted = true;
Read();
}
Expand All @@ -43,4 +45,7 @@ protected virtual void Read()

public void WaitOnPrepared(IPreparablePipe pipe)
=> pipe.RegisterOnPrepared(Start);

public void WaitOnCompleted(IChainablePipe pipe)
=> pipe.RegisterOnCompleted(Start);
}
56 changes: 56 additions & 0 deletions Streamistry.Testing/AggregatorTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Newtonsoft.Json.Converters;
using NUnit.Framework;
using Streamistry.Pipes.Aggregators;
using Streamistry.Pipes.Sinks;
using Streamistry.Pipes.Sources;

namespace Streamistry.Testing;
public class AggregatorTests
{
[Test]
public void Completion_ResetItself_AggregatorReset()
{
var firstSource = new EnumerableSource<int>([1, 2, 3]);
var secondSource = new EnumerableSource<int>([-1, -2, 5]);

var pipeline = new Pipeline(firstSource);
var union = new Union<int>([firstSource, secondSource]);
var aggregator = new Sum<int>(union, x => x.Reset());
secondSource.WaitOnCompleted(aggregator);
var sink = new MemorySink<int>(aggregator);
pipeline.Start();

Assert.That(sink.State, Has.Count.EqualTo(6));
Assert.Multiple(() =>
{
foreach (var value in (int[])[1, 3, 6, -1, -3, 2])
Assert.That(sink.State, Does.Contain(value));
});
}

[Test]
public void Completion_AdditionalEmit_EmitInThePipeline()
{
var firstSource = new EnumerableSource<int>([1, 2, 3]);
var secondSource = new EnumerableSource<int>([-1, -2, 5]);

var pipeline = new Pipeline(firstSource);
var union = new Union<int>([firstSource, secondSource]);
var aggregator = new Sum<int>(union, x => x.Emit(-x.State));
secondSource.WaitOnCompleted(aggregator);
var sink = new MemorySink<int>(aggregator);
pipeline.Start();

Assert.That(sink.State, Has.Count.EqualTo(8));
Assert.Multiple(() =>
{
foreach (var value in (int[])[1, 3, 6, 0, -1, -3, 2, 0])
Assert.That(sink.State, Does.Contain(value));
});
}
}

0 comments on commit c70c985

Please sign in to comment.