diff --git a/Streamistry.Core/Aggregator.cs b/Streamistry.Core/Aggregator.cs index c681291..88eee43 100644 --- a/Streamistry.Core/Aggregator.cs +++ b/Streamistry.Core/Aggregator.cs @@ -23,10 +23,12 @@ public class Aggregator : ChainablePipe, public TAccumulate? State { get; set; } private TAccumulate? Seed { get; } + public Aggregator(IChainablePipe upstream, Func accumulator, Func selector, TAccumulate? seed = default, Expression>>? completion = null) : base(upstream.GetObservabilityProvider()) { - upstream.RegisterDownstream(Emit, PushComplete); + upstream.RegisterDownstream(Emit); + upstream.Pipe.RegisterOnCompleted(PushComplete); (Accumulator, Selector, State, Seed) = (accumulator, selector, seed, seed); if (completion is not null) Completion += () => (completion!.Compile())(this); diff --git a/Streamistry.Core/ChainablePipe.cs b/Streamistry.Core/ChainablePipe.cs index 80991ba..ee21820 100644 --- a/Streamistry.Core/ChainablePipe.cs +++ b/Streamistry.Core/ChainablePipe.cs @@ -7,28 +7,32 @@ using Streamistry.Observability; namespace Streamistry; -public abstract class ChainablePipe : ObservablePipe, IChainablePipe +public abstract class ChainablePipe : ObservablePipe, IChainablePipe, IObservablePipe { - private Action? Downstream { get; set; } + public MainOutputPort Main { get; } protected Action? Completion { get; set; } + public IChainablePipe Pipe { get => this; } + protected ChainablePipe(ObservabilityProvider? observability) : base(observability) - { } - - public void RegisterDownstream(Action downstream, Action? completion) { - Downstream += downstream; - Completion += completion; + Main = new(this); } - public void RegisterOnCompleted(Action? completion) + public void RegisterDownstream(Action downstream, Action? completion) { - Completion += completion; + RegisterDownstream(downstream); + RegisterOnCompleted(completion); } + public void RegisterOnCompleted(Action? action) + => Completion += action; + public void RegisterDownstream(Action action) + => Main.RegisterDownstream(action); + protected void PushDownstream(T? obj) - => Downstream?.Invoke(obj); + => Main.PushDownstream(obj); public virtual void Complete() => PushComplete(); diff --git a/Streamistry.Core/Combinator.cs b/Streamistry.Core/Combinator.cs index 34140e7..9bb12ab 100644 --- a/Streamistry.Core/Combinator.cs +++ b/Streamistry.Core/Combinator.cs @@ -20,11 +20,13 @@ public abstract class Combinator : ChainablePipe Function { get; init; } protected int BranchesCompleted { get; set; } - public Combinator(IChainablePipe firstUpstream, IChainablePipe secondUpstream, Func function) - : base(firstUpstream.GetObservabilityProvider()) + public Combinator(IChainablePort firstUpstream, IChainablePipe secondUpstream, Func function) + : base(firstUpstream.Pipe.GetObservabilityProvider()) { - firstUpstream.RegisterDownstream(EmitFirst, Complete); - secondUpstream.RegisterDownstream(EmitSecond, Complete); + firstUpstream.RegisterDownstream(EmitFirst); + firstUpstream.Pipe.RegisterOnCompleted(Complete); + secondUpstream.RegisterDownstream(EmitSecond); + secondUpstream.Pipe.RegisterOnCompleted(Complete); Function = function; } diff --git a/Streamistry.Core/ExceptionRouterMapper.cs b/Streamistry.Core/ExceptionRouterMapper.cs new file mode 100644 index 0000000..cd4105a --- /dev/null +++ b/Streamistry.Core/ExceptionRouterMapper.cs @@ -0,0 +1,45 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Streamistry.Observability; + +namespace Streamistry; +public class ExceptionRouterMapper : Mapper, IDualRoute +{ + public OutputPort Alternate { get; } + public new OutputPort Main { get => base.Main; } + + public ExceptionRouterMapper(IChainablePort upstream, Func function) + : base(upstream, function) + { + Alternate = new(this, "Alternate"); + } + + [Trace] + public override void Emit(TInput? obj) + { + if (TryInvokeCatch(obj, out var value, out var exception)) + PushDownstream(value); + else + Alternate.PushDownstream(obj); + } + + [Meter] + protected virtual bool TryInvokeCatch(TInput? obj, out TOutput? value, out Exception? ex) + { + value = default; + ex = null; + try + { + value = Function.Invoke(obj); + return true; + } + catch (Exception e) + { + ex = e; + return false; + } + } +} diff --git a/Streamistry.Core/Filter.cs b/Streamistry.Core/Filter.cs index 248ec2e..0085166 100644 --- a/Streamistry.Core/Filter.cs +++ b/Streamistry.Core/Filter.cs @@ -17,10 +17,11 @@ public class Filter : ChainablePipe, IProcessablePipe { public Func Predicate { get; init; } - public Filter(IChainablePipe upstream, Func predicate) - : base(upstream.GetObservabilityProvider()) + public Filter(IChainablePort upstream, Func predicate) + : base(upstream.Pipe.GetObservabilityProvider()) { - upstream.RegisterDownstream(Emit, Complete); + upstream.RegisterDownstream(Emit); + upstream.Pipe.RegisterOnCompleted(Complete); Predicate = predicate; } diff --git a/Streamistry.Core/IChainablePipe.cs b/Streamistry.Core/IChainablePipe.cs index dc5c628..05f87a4 100644 --- a/Streamistry.Core/IChainablePipe.cs +++ b/Streamistry.Core/IChainablePipe.cs @@ -5,13 +5,18 @@ using System.Threading.Tasks; namespace Streamistry; +public interface IChainablePipe : IChainablePort, IChainablePipe +{ + void RegisterDownstream(Action downstream, Action? complete); +} public interface IChainablePipe : IObservablePipe { - void RegisterOnCompleted(Action? completion); + void RegisterOnCompleted(Action? complete); } -public interface IChainablePipe : IChainablePipe +public interface IChainablePort { - void RegisterDownstream(Action action, Action? complete); + void RegisterDownstream(Action action); + IChainablePipe Pipe { get; } } diff --git a/Streamistry.Core/IDualRoute.cs b/Streamistry.Core/IDualRoute.cs new file mode 100644 index 0000000..0da203b --- /dev/null +++ b/Streamistry.Core/IDualRoute.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Streamistry; +public interface IDualRoute : IChainablePort +{ + OutputPort Main { get; } + OutputPort Alternate { get; } +} diff --git a/Streamistry.Core/Mapper.cs b/Streamistry.Core/Mapper.cs index acb2b32..7cb1554 100644 --- a/Streamistry.Core/Mapper.cs +++ b/Streamistry.Core/Mapper.cs @@ -18,15 +18,16 @@ public class Mapper : ChainablePipe, IProcessablePipe< { public Func Function { get; init; } - public Mapper(IChainablePipe upstream, Func function) - : base(upstream.GetObservabilityProvider()) + public Mapper(IChainablePort upstream, Func function) + : base(upstream.Pipe.GetObservabilityProvider()) { - upstream.RegisterDownstream(Emit, Complete); + upstream.RegisterDownstream(Emit); + upstream.Pipe.RegisterOnCompleted(Complete); Function = function; } [Meter] - public void Emit(TInput? obj) + public virtual void Emit(TInput? obj) => PushDownstream(Invoke(obj)); [Trace] diff --git a/Streamistry.Core/OuputPort.cs b/Streamistry.Core/OuputPort.cs new file mode 100644 index 0000000..e13183f --- /dev/null +++ b/Streamistry.Core/OuputPort.cs @@ -0,0 +1,32 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Streamistry; +public class OutputPort : IChainablePort +{ + private Action? Downstream { get; set; } + + public void RegisterDownstream(Action downstream) + => Downstream += downstream; + + public string Name { get; init; } + + public IChainablePipe Pipe { get; init; } + + + public OutputPort(IChainablePipe pipe, string name) + => (Name, Pipe) = (name, pipe); + + public void PushDownstream(T? obj) + => Downstream?.Invoke(obj); +} + +public class MainOutputPort : OutputPort +{ + public MainOutputPort(IChainablePipe pipe) + : base(pipe, "Main") + { } +} diff --git a/Streamistry.Core/Pipes/Sinks/MemorySink.cs b/Streamistry.Core/Pipes/Sinks/MemorySink.cs index 5500710..ccb826e 100644 --- a/Streamistry.Core/Pipes/Sinks/MemorySink.cs +++ b/Streamistry.Core/Pipes/Sinks/MemorySink.cs @@ -11,10 +11,10 @@ public class MemorySink : Sink { public IList State { get; } - public MemorySink(IChainablePipe upstream) + public MemorySink(IChainablePort upstream) : this(upstream, []) { } - private MemorySink(IChainablePipe upstream, IList state) + private MemorySink(IChainablePort upstream, IList state) : base(upstream, state.Add) => State = state; diff --git a/Streamistry.Core/Sink.cs b/Streamistry.Core/Sink.cs index a8d7de7..98f65a9 100644 --- a/Streamistry.Core/Sink.cs +++ b/Streamistry.Core/Sink.cs @@ -11,10 +11,10 @@ public class Sink : IProcessablePipe, IObservablePipe public Action Function { get; } protected ObservabilityProvider? Observability { get; private set; } - public Sink(IChainablePipe upstream, Action function) + public Sink(IChainablePort upstream, Action function) { - upstream.RegisterDownstream(Emit, null); - RegisterObservability(upstream.GetObservabilityProvider()); + upstream.RegisterDownstream(Emit); + RegisterObservability(upstream.Pipe.GetObservabilityProvider()); Function = function; } diff --git a/Streamistry.Core/Splitter.cs b/Streamistry.Core/Splitter.cs index 7a959d7..9427313 100644 --- a/Streamistry.Core/Splitter.cs +++ b/Streamistry.Core/Splitter.cs @@ -13,14 +13,15 @@ namespace Streamistry; /// /// The type of the elements in the input stream. /// The type of the elements in the output stream after the function is applied. -internal class Splitter : ChainablePipe, IProcessablePipe +public class Splitter : ChainablePipe, IProcessablePipe { public Func Function { get; init; } - public Splitter(IChainablePipe upstream, Func function) - : base(upstream.GetObservabilityProvider()) + public Splitter(IChainablePort upstream, Func function) + : base(upstream.Pipe.GetObservabilityProvider()) { - upstream.RegisterDownstream(Emit, Complete); + upstream.RegisterDownstream(Emit); + upstream.Pipe.RegisterOnCompleted(Complete); Function = function; } diff --git a/Streamistry.Core/StreamBuffer.cs b/Streamistry.Core/StreamBuffer.cs index ca710c4..27fc64f 100644 --- a/Streamistry.Core/StreamBuffer.cs +++ b/Streamistry.Core/StreamBuffer.cs @@ -19,10 +19,11 @@ public class StreamBuffer : ChainablePipe, IProcessablePipe protected List Store { get; } = []; protected int? MaxCapacity { get; } - public StreamBuffer(IChainablePipe upstream, int? maxCapacity = null) - : base(upstream.GetObservabilityProvider()) + public StreamBuffer(IChainablePort upstream, int? maxCapacity = null) + : base(upstream.Pipe.GetObservabilityProvider()) { - upstream.RegisterDownstream(Emit, Complete); + upstream.RegisterDownstream(Emit); + upstream.Pipe.RegisterOnCompleted(Complete); MaxCapacity = maxCapacity; } diff --git a/Streamistry.Testing/ExceptionRouteMapperTests.cs b/Streamistry.Testing/ExceptionRouteMapperTests.cs new file mode 100644 index 0000000..b4691ed --- /dev/null +++ b/Streamistry.Testing/ExceptionRouteMapperTests.cs @@ -0,0 +1,94 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using NUnit.Framework; +using Streamistry; +using Streamistry.Pipes.Sinks; + +namespace Streamistry.Testing; +public class ExceptionRouterMapperTests +{ + [Test] + public void Emit_ValidData_MainOnly() + { + var pipeline = new Pipeline(); + var mapper = new ExceptionRouterMapper(pipeline, x => 60 / x); + var mainSink = new MemorySink(mapper); + var exceptionSink = new MemorySink(mapper.Alternate); + pipeline.Emit(10); + pipeline.Emit(20); + pipeline.Emit(6); + + Assert.That(mainSink.State.Count, Is.EqualTo(3)); + Assert.That(mainSink.State.First, Is.EqualTo(6)); + Assert.That(mainSink.State.Last, Is.EqualTo(10)); + Assert.That(exceptionSink.State.Count, Is.EqualTo(0)); + } + + [Test] + public void Emit_InvalidData_ExceptionOnly() + { + var pipeline = new Pipeline(); + var mapper = new ExceptionRouterMapper(pipeline, x => 60 / x); + var mainSink = new MemorySink(mapper); + var exceptionSink = new MemorySink(mapper.Alternate); + pipeline.Emit(0); + + Assert.That(mainSink.State.Count, Is.EqualTo(0)); + Assert.That(exceptionSink.State.Count, Is.EqualTo(1)); + Assert.That(exceptionSink.State.First, Is.EqualTo(0)); + } + + [Test] + public void Emit_MixedDataNoExceptionPath_DontFail() + { + var pipeline = new Pipeline(); + var mapper = new ExceptionRouterMapper(pipeline, x => 60 / x); + var mainSink = new MemorySink(mapper); + pipeline.Emit(10); + pipeline.Emit(0); + pipeline.Emit(3); + + Assert.That(mainSink.State.Count, Is.EqualTo(2)); + Assert.That(mainSink.State.First, Is.EqualTo(6)); + Assert.That(mainSink.State.Last, Is.EqualTo(20)); + } + + [Test] + public void Emit_MixedDataWithExceptionPath_DontFail() + { + var pipeline = new Pipeline(); + var mapper = new ExceptionRouterMapper(pipeline, x => 60 / x); + var mainSink = new MemorySink(mapper); + var exceptionSink = new MemorySink(mapper.Alternate); + pipeline.Emit(10); + pipeline.Emit(0); + pipeline.Emit(3); + + Assert.That(mainSink.State.Count, Is.EqualTo(2)); + Assert.That(mainSink.State.First, Is.EqualTo(6)); + Assert.That(mainSink.State.Last, Is.EqualTo(20)); + Assert.That(exceptionSink.State.Count, Is.EqualTo(1)); + Assert.That(exceptionSink.State.First, Is.EqualTo(0)); + } + + [Test] + public void Emit_MixedDataWithExceptionPathAndExplicitMainPath_DontFail() + { + var pipeline = new Pipeline(); + var mapper = new ExceptionRouterMapper(pipeline, x => 60 / x); + var mainSink = new MemorySink(mapper.Main); + var exceptionSink = new MemorySink(mapper.Alternate); + pipeline.Emit(10); + pipeline.Emit(0); + pipeline.Emit(3); + + Assert.That(mainSink.State.Count, Is.EqualTo(2)); + Assert.That(mainSink.State.First, Is.EqualTo(6)); + Assert.That(mainSink.State.Last, Is.EqualTo(20)); + Assert.That(exceptionSink.State.Count, Is.EqualTo(1)); + Assert.That(exceptionSink.State.First, Is.EqualTo(0)); + } +}