Skip to content

Commit

Permalink
feat: add distinct routes out of a pipe (#38)
Browse files Browse the repository at this point in the history
  • Loading branch information
Seddryck authored Sep 1, 2024
1 parent c70c985 commit f18d9ff
Show file tree
Hide file tree
Showing 14 changed files with 237 additions and 37 deletions.
4 changes: 3 additions & 1 deletion Streamistry.Core/Aggregator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ public class Aggregator<TSource, TAccumulate, TResult> : ChainablePipe<TResult>,
public TAccumulate? State { get; set; }
private TAccumulate? Seed { get; }


public Aggregator(IChainablePipe<TSource> upstream, Func<TAccumulate?, TSource?, TAccumulate?> accumulator, Func<TAccumulate?, TResult?> selector, TAccumulate? seed = default, Expression<Action<Aggregator<TSource, TAccumulate, TResult>>>? completion = null)
: base(upstream.GetObservabilityProvider())
{
upstream.RegisterDownstream(Emit, PushComplete);
upstream.RegisterDownstream(Emit);
upstream.Pipe.RegisterOnCompleted(PushComplete);
(Accumulator, Selector, State, Seed) = (accumulator, selector, seed, seed);
if (completion is not null)
Completion += () => (completion!.Compile())(this);
Expand Down
24 changes: 14 additions & 10 deletions Streamistry.Core/ChainablePipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,32 @@
using Streamistry.Observability;

namespace Streamistry;
public abstract class ChainablePipe<T> : ObservablePipe, IChainablePipe<T>
public abstract class ChainablePipe<T> : ObservablePipe, IChainablePipe<T>, IObservablePipe
{
private Action<T?>? Downstream { get; set; }
public MainOutputPort<T> Main { get; }
protected Action? Completion { get; set; }
public IChainablePipe Pipe { get => this; }


protected ChainablePipe(ObservabilityProvider? observability)
: base(observability)
{ }

public void RegisterDownstream(Action<T?> downstream, Action? completion)
{
Downstream += downstream;
Completion += completion;
Main = new(this);
}

public void RegisterOnCompleted(Action? completion)
public void RegisterDownstream(Action<T?> downstream, Action? completion)
{
Completion += completion;
RegisterDownstream(downstream);
RegisterOnCompleted(completion);
}

public void RegisterOnCompleted(Action? action)
=> Completion += action;
public void RegisterDownstream(Action<T?> action)
=> Main.RegisterDownstream(action);

protected void PushDownstream(T? obj)
=> Downstream?.Invoke(obj);
=> Main.PushDownstream(obj);

public virtual void Complete()
=> PushComplete();
Expand Down
10 changes: 6 additions & 4 deletions Streamistry.Core/Combinator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ public abstract class Combinator<TFirst, TSecond, TResult> : ChainablePipe<TResu
public Func<TFirst?, TSecond?, TResult?> Function { get; init; }
protected int BranchesCompleted { get; set; }

public Combinator(IChainablePipe<TFirst> firstUpstream, IChainablePipe<TSecond> secondUpstream, Func<TFirst?, TSecond?, TResult?> function)
: base(firstUpstream.GetObservabilityProvider())
public Combinator(IChainablePort<TFirst> firstUpstream, IChainablePipe<TSecond> secondUpstream, Func<TFirst?, TSecond?, TResult?> function)
: base(firstUpstream.Pipe.GetObservabilityProvider())
{
firstUpstream.RegisterDownstream(EmitFirst, Complete);
secondUpstream.RegisterDownstream(EmitSecond, Complete);
firstUpstream.RegisterDownstream(EmitFirst);
firstUpstream.Pipe.RegisterOnCompleted(Complete);
secondUpstream.RegisterDownstream(EmitSecond);
secondUpstream.Pipe.RegisterOnCompleted(Complete);

Function = function;
}
Expand Down
45 changes: 45 additions & 0 deletions Streamistry.Core/ExceptionRouterMapper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Streamistry.Observability;

namespace Streamistry;
public class ExceptionRouterMapper<TInput, TOutput> : Mapper<TInput, TOutput>, IDualRoute<TOutput, TInput>
{
public OutputPort<TInput> Alternate { get; }
public new OutputPort<TOutput> Main { get => base.Main; }

public ExceptionRouterMapper(IChainablePort<TInput> upstream, Func<TInput?, TOutput?> function)
: base(upstream, function)
{
Alternate = new(this, "Alternate");
}

[Trace]
public override void Emit(TInput? obj)
{
if (TryInvokeCatch(obj, out var value, out var exception))
PushDownstream(value);
else
Alternate.PushDownstream(obj);
}

[Meter]
protected virtual bool TryInvokeCatch(TInput? obj, out TOutput? value, out Exception? ex)
{
value = default;
ex = null;
try
{
value = Function.Invoke(obj);
return true;
}
catch (Exception e)
{
ex = e;
return false;
}
}
}
7 changes: 4 additions & 3 deletions Streamistry.Core/Filter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ public class Filter<TInput> : ChainablePipe<TInput>, IProcessablePipe<TInput>
{
public Func<TInput?, bool> Predicate { get; init; }

public Filter(IChainablePipe<TInput> upstream, Func<TInput?, bool> predicate)
: base(upstream.GetObservabilityProvider())
public Filter(IChainablePort<TInput> upstream, Func<TInput?, bool> predicate)
: base(upstream.Pipe.GetObservabilityProvider())
{
upstream.RegisterDownstream(Emit, Complete);
upstream.RegisterDownstream(Emit);
upstream.Pipe.RegisterOnCompleted(Complete);
Predicate = predicate;
}

Expand Down
11 changes: 8 additions & 3 deletions Streamistry.Core/IChainablePipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,18 @@
using System.Threading.Tasks;

namespace Streamistry;
public interface IChainablePipe<T> : IChainablePort<T>, IChainablePipe
{
void RegisterDownstream(Action<T?> downstream, Action? complete);
}

public interface IChainablePipe : IObservablePipe
{
void RegisterOnCompleted(Action? completion);
void RegisterOnCompleted(Action? complete);
}

public interface IChainablePipe<T> : IChainablePipe
public interface IChainablePort<T>
{
void RegisterDownstream(Action<T?> action, Action? complete);
void RegisterDownstream(Action<T?> action);
IChainablePipe Pipe { get; }
}
12 changes: 12 additions & 0 deletions Streamistry.Core/IDualRoute.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Streamistry;
public interface IDualRoute<TMain, TAlternate> : IChainablePort<TMain>
{
OutputPort<TMain> Main { get; }
OutputPort<TAlternate> Alternate { get; }
}
9 changes: 5 additions & 4 deletions Streamistry.Core/Mapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@ public class Mapper<TInput, TOutput> : ChainablePipe<TOutput>, IProcessablePipe<
{
public Func<TInput?, TOutput?> Function { get; init; }

public Mapper(IChainablePipe<TInput> upstream, Func<TInput?, TOutput?> function)
: base(upstream.GetObservabilityProvider())
public Mapper(IChainablePort<TInput> upstream, Func<TInput?, TOutput?> function)
: base(upstream.Pipe.GetObservabilityProvider())
{
upstream.RegisterDownstream(Emit, Complete);
upstream.RegisterDownstream(Emit);
upstream.Pipe.RegisterOnCompleted(Complete);
Function = function;
}

[Meter]
public void Emit(TInput? obj)
public virtual void Emit(TInput? obj)
=> PushDownstream(Invoke(obj));

[Trace]
Expand Down
32 changes: 32 additions & 0 deletions Streamistry.Core/OuputPort.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Streamistry;
public class OutputPort<T> : IChainablePort<T>
{
private Action<T?>? Downstream { get; set; }

public void RegisterDownstream(Action<T?> downstream)
=> Downstream += downstream;

public string Name { get; init; }

public IChainablePipe Pipe { get; init; }


public OutputPort(IChainablePipe pipe, string name)
=> (Name, Pipe) = (name, pipe);

public void PushDownstream(T? obj)
=> Downstream?.Invoke(obj);
}

public class MainOutputPort<T> : OutputPort<T>
{
public MainOutputPort(IChainablePipe pipe)
: base(pipe, "Main")
{ }
}
4 changes: 2 additions & 2 deletions Streamistry.Core/Pipes/Sinks/MemorySink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ public class MemorySink<T> : Sink<T>
{
public IList<T?> State { get; }

public MemorySink(IChainablePipe<T> upstream)
public MemorySink(IChainablePort<T> upstream)
: this(upstream, []) { }

private MemorySink(IChainablePipe<T> upstream, IList<T?> state)
private MemorySink(IChainablePort<T> upstream, IList<T?> state)
: base(upstream, state.Add)
=> State = state;

Expand Down
6 changes: 3 additions & 3 deletions Streamistry.Core/Sink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ public class Sink<T> : IProcessablePipe<T>, IObservablePipe
public Action<T?> Function { get; }
protected ObservabilityProvider? Observability { get; private set; }

public Sink(IChainablePipe<T> upstream, Action<T?> function)
public Sink(IChainablePort<T> upstream, Action<T?> function)
{
upstream.RegisterDownstream(Emit, null);
RegisterObservability(upstream.GetObservabilityProvider());
upstream.RegisterDownstream(Emit);
RegisterObservability(upstream.Pipe.GetObservabilityProvider());
Function = function;
}

Expand Down
9 changes: 5 additions & 4 deletions Streamistry.Core/Splitter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ namespace Streamistry;
/// </summary>
/// <typeparam name="TInput">The type of the elements in the input stream.</typeparam>
/// <typeparam name="TOutput">The type of the elements in the output stream after the function is applied.</typeparam>
internal class Splitter<TInput, TOutput> : ChainablePipe<TOutput>, IProcessablePipe<TInput>
public class Splitter<TInput, TOutput> : ChainablePipe<TOutput>, IProcessablePipe<TInput>
{
public Func<TInput?, TOutput[]?> Function { get; init; }

public Splitter(IChainablePipe<TInput> upstream, Func<TInput?, TOutput[]?> function)
: base(upstream.GetObservabilityProvider())
public Splitter(IChainablePort<TInput> upstream, Func<TInput?, TOutput[]?> function)
: base(upstream.Pipe.GetObservabilityProvider())
{
upstream.RegisterDownstream(Emit, Complete);
upstream.RegisterDownstream(Emit);
upstream.Pipe.RegisterOnCompleted(Complete);
Function = function;
}

Expand Down
7 changes: 4 additions & 3 deletions Streamistry.Core/StreamBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ public class StreamBuffer<T> : ChainablePipe<T>, IProcessablePipe<T>
protected List<T?> Store { get; } = [];
protected int? MaxCapacity { get; }

public StreamBuffer(IChainablePipe<T> upstream, int? maxCapacity = null)
: base(upstream.GetObservabilityProvider())
public StreamBuffer(IChainablePort<T> upstream, int? maxCapacity = null)
: base(upstream.Pipe.GetObservabilityProvider())
{
upstream.RegisterDownstream(Emit, Complete);
upstream.RegisterDownstream(Emit);
upstream.Pipe.RegisterOnCompleted(Complete);
MaxCapacity = maxCapacity;
}

Expand Down
94 changes: 94 additions & 0 deletions Streamistry.Testing/ExceptionRouteMapperTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NUnit.Framework;
using Streamistry;
using Streamistry.Pipes.Sinks;

namespace Streamistry.Testing;
public class ExceptionRouterMapperTests
{
[Test]
public void Emit_ValidData_MainOnly()
{
var pipeline = new Pipeline<int>();
var mapper = new ExceptionRouterMapper<int, int>(pipeline, x => 60 / x);
var mainSink = new MemorySink<int>(mapper);
var exceptionSink = new MemorySink<int>(mapper.Alternate);
pipeline.Emit(10);
pipeline.Emit(20);
pipeline.Emit(6);

Assert.That(mainSink.State.Count, Is.EqualTo(3));
Assert.That(mainSink.State.First, Is.EqualTo(6));
Assert.That(mainSink.State.Last, Is.EqualTo(10));
Assert.That(exceptionSink.State.Count, Is.EqualTo(0));
}

[Test]
public void Emit_InvalidData_ExceptionOnly()
{
var pipeline = new Pipeline<int>();
var mapper = new ExceptionRouterMapper<int, int>(pipeline, x => 60 / x);
var mainSink = new MemorySink<int>(mapper);
var exceptionSink = new MemorySink<int>(mapper.Alternate);
pipeline.Emit(0);

Assert.That(mainSink.State.Count, Is.EqualTo(0));
Assert.That(exceptionSink.State.Count, Is.EqualTo(1));
Assert.That(exceptionSink.State.First, Is.EqualTo(0));
}

[Test]
public void Emit_MixedDataNoExceptionPath_DontFail()
{
var pipeline = new Pipeline<int>();
var mapper = new ExceptionRouterMapper<int, int>(pipeline, x => 60 / x);
var mainSink = new MemorySink<int>(mapper);
pipeline.Emit(10);
pipeline.Emit(0);
pipeline.Emit(3);

Assert.That(mainSink.State.Count, Is.EqualTo(2));
Assert.That(mainSink.State.First, Is.EqualTo(6));
Assert.That(mainSink.State.Last, Is.EqualTo(20));
}

[Test]
public void Emit_MixedDataWithExceptionPath_DontFail()
{
var pipeline = new Pipeline<int>();
var mapper = new ExceptionRouterMapper<int, int>(pipeline, x => 60 / x);
var mainSink = new MemorySink<int>(mapper);
var exceptionSink = new MemorySink<int>(mapper.Alternate);
pipeline.Emit(10);
pipeline.Emit(0);
pipeline.Emit(3);

Assert.That(mainSink.State.Count, Is.EqualTo(2));
Assert.That(mainSink.State.First, Is.EqualTo(6));
Assert.That(mainSink.State.Last, Is.EqualTo(20));
Assert.That(exceptionSink.State.Count, Is.EqualTo(1));
Assert.That(exceptionSink.State.First, Is.EqualTo(0));
}

[Test]
public void Emit_MixedDataWithExceptionPathAndExplicitMainPath_DontFail()
{
var pipeline = new Pipeline<int>();
var mapper = new ExceptionRouterMapper<int, int>(pipeline, x => 60 / x);
var mainSink = new MemorySink<int>(mapper.Main);
var exceptionSink = new MemorySink<int>(mapper.Alternate);
pipeline.Emit(10);
pipeline.Emit(0);
pipeline.Emit(3);

Assert.That(mainSink.State.Count, Is.EqualTo(2));
Assert.That(mainSink.State.First, Is.EqualTo(6));
Assert.That(mainSink.State.Last, Is.EqualTo(20));
Assert.That(exceptionSink.State.Count, Is.EqualTo(1));
Assert.That(exceptionSink.State.First, Is.EqualTo(0));
}
}

0 comments on commit f18d9ff

Please sign in to comment.