diff --git a/Streamistry.Core/Aggregator.cs b/Streamistry.Core/Aggregator.cs index 9d6c69a..19bf01e 100644 --- a/Streamistry.Core/Aggregator.cs +++ b/Streamistry.Core/Aggregator.cs @@ -24,15 +24,16 @@ public class Aggregator : ChainablePipe, public Aggregator(IChainablePipe upstream, Func accumulator, Func selector, TAccumulate? seed = default) : base(upstream.GetObservabilityProvider()) { - upstream.RegisterDownstream(Emit); + upstream.RegisterDownstream(Emit, PushComplete); (Accumulator, Selector, State) = (accumulator, selector, seed); } - public void Emit(TSource? obj) + [Meter] + public virtual void Emit(TSource? obj) => PushDownstream(Invoke(obj)); [Trace] - protected TResult? Invoke(TSource? obj) + protected virtual TResult? Invoke(TSource? obj) { State = Accumulator.Invoke(State, obj); return Selector.Invoke(State); diff --git a/Streamistry.Core/ChainablePipe.cs b/Streamistry.Core/ChainablePipe.cs index 02afa89..f1c9438 100644 --- a/Streamistry.Core/ChainablePipe.cs +++ b/Streamistry.Core/ChainablePipe.cs @@ -10,10 +10,15 @@ namespace Streamistry; public abstract class ChainablePipe : IChainablePipe, IObservablePipe { private Action? Downstream { get; set; } + private Action? Completion { get; set; } + protected ObservabilityProvider? Observability { get; private set; } - public void RegisterDownstream(Action action) - => Downstream += action; + public void RegisterDownstream(Action downstream, Action? completion) + { + Downstream += downstream; + Completion += completion; + } protected ChainablePipe(ObservabilityProvider? observability) => RegisterObservability(observability); @@ -24,6 +29,12 @@ public void RegisterObservability(ObservabilityProvider? observability) public ObservabilityProvider? GetObservabilityProvider() => Observability; - protected virtual void PushDownstream(T? obj) + protected void PushDownstream(T? obj) => Downstream?.Invoke(obj); + + public virtual void Complete() + => PushComplete(); + + protected void PushComplete() + => Completion?.Invoke(); } diff --git a/Streamistry.Core/Combinator.cs b/Streamistry.Core/Combinator.cs index daee73c..34140e7 100644 --- a/Streamistry.Core/Combinator.cs +++ b/Streamistry.Core/Combinator.cs @@ -18,12 +18,13 @@ namespace Streamistry; public abstract class Combinator : ChainablePipe { public Func Function { get; init; } + protected int BranchesCompleted { get; set; } public Combinator(IChainablePipe firstUpstream, IChainablePipe secondUpstream, Func function) : base(firstUpstream.GetObservabilityProvider()) { - firstUpstream.RegisterDownstream(EmitFirst); - secondUpstream.RegisterDownstream(EmitSecond); + firstUpstream.RegisterDownstream(EmitFirst, Complete); + secondUpstream.RegisterDownstream(EmitSecond, Complete); Function = function; } @@ -44,6 +45,16 @@ public void EmitSecond(TSecond? second) Queue(second); } + public override void Complete() + { + BranchesCompleted += 1; + if (BranchesCompleted > 1) + { + BranchesCompleted = 0; + PushComplete(); + } + } + [Trace] protected TResult? Invoke(TFirst? first, TSecond? second) => Function.Invoke(first, second); diff --git a/Streamistry.Core/Filter.cs b/Streamistry.Core/Filter.cs index 21c5f26..248ec2e 100644 --- a/Streamistry.Core/Filter.cs +++ b/Streamistry.Core/Filter.cs @@ -20,7 +20,7 @@ public class Filter : ChainablePipe, IProcessablePipe public Filter(IChainablePipe upstream, Func predicate) : base(upstream.GetObservabilityProvider()) { - upstream.RegisterDownstream(Emit); + upstream.RegisterDownstream(Emit, Complete); Predicate = predicate; } diff --git a/Streamistry.Core/IChainablePipe.cs b/Streamistry.Core/IChainablePipe.cs index abdd7cd..b3e6fae 100644 --- a/Streamistry.Core/IChainablePipe.cs +++ b/Streamistry.Core/IChainablePipe.cs @@ -7,5 +7,5 @@ namespace Streamistry; public interface IChainablePipe : IObservablePipe { - void RegisterDownstream(Action action); + void RegisterDownstream(Action action, Action? complete); } diff --git a/Streamistry.Core/Mapper.cs b/Streamistry.Core/Mapper.cs index 5a3e5f4..2fb9cfa 100644 --- a/Streamistry.Core/Mapper.cs +++ b/Streamistry.Core/Mapper.cs @@ -21,7 +21,7 @@ public class Mapper : ChainablePipe, IProcessablePipe< public Mapper(IChainablePipe upstream, Func function) : base(upstream.GetObservabilityProvider()) { - upstream.RegisterDownstream(Emit); + upstream.RegisterDownstream(Emit, Complete); Function = function; } diff --git a/Streamistry.Core/Sink.cs b/Streamistry.Core/Sink.cs index 5830e29..a8d7de7 100644 --- a/Streamistry.Core/Sink.cs +++ b/Streamistry.Core/Sink.cs @@ -13,11 +13,12 @@ public class Sink : IProcessablePipe, IObservablePipe public Sink(IChainablePipe upstream, Action function) { - upstream.RegisterDownstream(Emit); + upstream.RegisterDownstream(Emit, null); RegisterObservability(upstream.GetObservabilityProvider()); Function = function; } + [Meter] public void Emit(T? obj) => Invoke(obj); diff --git a/Streamistry.Core/Source.cs b/Streamistry.Core/Source.cs index 9a36fb2..b32db53 100644 --- a/Streamistry.Core/Source.cs +++ b/Streamistry.Core/Source.cs @@ -36,6 +36,7 @@ protected virtual void Read() { while (IsStarted && TryReadNext(out var item)) PushDownstream(item); + PushComplete(); } protected abstract bool TryReadNext(out TOutput? item); diff --git a/Streamistry.Core/Splitter.cs b/Streamistry.Core/Splitter.cs index 13bf42c..7a959d7 100644 --- a/Streamistry.Core/Splitter.cs +++ b/Streamistry.Core/Splitter.cs @@ -20,10 +20,11 @@ internal class Splitter : ChainablePipe, IProcessableP public Splitter(IChainablePipe upstream, Func function) : base(upstream.GetObservabilityProvider()) { - upstream.RegisterDownstream(Emit); + upstream.RegisterDownstream(Emit, Complete); Function = function; } + [Meter] public void Emit(TInput? obj) { var results = Invoke(obj); diff --git a/Streamistry.Core/StreamBuffer.cs b/Streamistry.Core/StreamBuffer.cs new file mode 100644 index 0000000..4ad8e91 --- /dev/null +++ b/Streamistry.Core/StreamBuffer.cs @@ -0,0 +1,50 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Streamistry.Observability; + +namespace Streamistry; + +/// +/// Represents a pipeline element that temporarily stores elements in a buffer as they pass through the stream. +/// The buffer can accumulate a specified number of elements or a batch of elements over time, allowing for more efficient processing or delayed downstream transmission. +/// Once the buffer reaches a certain threshold, such as a maximum size or time limit, its contents are released downstream for further processing. +/// +/// The type of the elements stored in the buffer. +public class StreamBuffer : ChainablePipe, IProcessablePipe +{ + protected List Store { get; } = []; + protected int? MaxCapacity { get; } + + public StreamBuffer(IChainablePipe upstream, int? maxCapacity = null) + : base(upstream.GetObservabilityProvider()) + { + upstream.RegisterDownstream(Emit, Complete); + MaxCapacity = maxCapacity; + } + + [Meter] + public void Emit(T? obj) + { + Invoke(obj); + if (MaxCapacity.HasValue && Store.Count >= MaxCapacity.Value) + { + Complete(); + Store.Clear(); + } + } + + [Trace] + protected void Invoke(T? obj) + => Store.Add(obj); + + public override void Complete() + { + foreach (var item in Store) + PushDownstream(item); + PushComplete(); + } +} diff --git a/Streamistry.Testing/ConsoleOutput.cs b/Streamistry.Testing/ConsoleOutput.cs new file mode 100644 index 0000000..47bb18a --- /dev/null +++ b/Streamistry.Testing/ConsoleOutput.cs @@ -0,0 +1,36 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Streamistry.Testing; +internal class ConsoleOutput : IDisposable +{ + private readonly StringWriter stringWriter = new(); + private readonly TextWriter originalOutput = Console.Out; + + public ConsoleOutput() + => Console.SetOut(stringWriter); + + public string GetOuput() + => stringWriter.ToString(); + + public int CountSubstring(string value) + { + var text = GetOuput(); + int count = 0, minIndex = text.IndexOf(value, 0); + while (minIndex != -1) + { + minIndex = text.IndexOf(value, minIndex + value.Length); + count++; + } + return count; + } + + public void Dispose() + { + Console.SetOut(originalOutput); + stringWriter.Dispose(); + } +} diff --git a/Streamistry.Testing/Observability/MeterTests.cs b/Streamistry.Testing/Observability/MeterTests.cs index d995b83..96bad45 100644 --- a/Streamistry.Testing/Observability/MeterTests.cs +++ b/Streamistry.Testing/Observability/MeterTests.cs @@ -13,36 +13,6 @@ namespace Streamistry.Testing.Observability; public class MeterTests { - private class ConsoleOutput : IDisposable - { - private readonly StringWriter stringWriter = new(); - private readonly TextWriter originalOutput = Console.Out; - - public ConsoleOutput() - => Console.SetOut(stringWriter); - - public string GetOuput() - => stringWriter.ToString(); - - public int CountSubstring(string value) - { - var text = GetOuput(); - int count = 0, minIndex = text.IndexOf(value, 0); - while (minIndex != -1) - { - minIndex = text.IndexOf(value, minIndex + value.Length); - count++; - } - return count; - } - - public void Dispose() - { - Console.SetOut(originalOutput); - stringWriter.Dispose(); - } - } - [Test] public void Counter_Mapper_ReturnCount() { diff --git a/Streamistry.Testing/Pipes/Sinks/DebugOutputSinkTests.cs b/Streamistry.Testing/Pipes/Sinks/DebugOutputSinkTests.cs index 0091aca..a87a4c6 100644 --- a/Streamistry.Testing/Pipes/Sinks/DebugOutputSinkTests.cs +++ b/Streamistry.Testing/Pipes/Sinks/DebugOutputSinkTests.cs @@ -11,24 +11,6 @@ namespace Streamistry.Testing.Pipes.Sinks; public class DebugOutputSinkTests { - private class ConsoleOutput : IDisposable - { - private readonly StringWriter stringWriter = new(); - private readonly TextWriter originalOutput = Console.Out; - - public ConsoleOutput() - => Console.SetOut(stringWriter); - - public string GetOuput() - => stringWriter.ToString(); - - public void Dispose() - { - Console.SetOut(originalOutput); - stringWriter.Dispose(); - } - } - [Test] public void Emit_DisplayOneElement_Successful() { diff --git a/Streamistry.Testing/StreamBufferTests.cs b/Streamistry.Testing/StreamBufferTests.cs new file mode 100644 index 0000000..8c4e0e6 --- /dev/null +++ b/Streamistry.Testing/StreamBufferTests.cs @@ -0,0 +1,85 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Security.Cryptography.X509Certificates; +using System.Text; +using System.Threading.Tasks; +using NUnit.Framework; +using Streamistry.Pipes.Sinks; +using Streamistry.Pipes.Sources; + +namespace Streamistry.Testing; +public class StreamBufferTests +{ + [Test] + public void WithoutBuffer_TwoSinks_Overlaps() + { + var pipeline = new Pipeline(); + var mapper = new Mapper(pipeline, x => x + 1); + var firstSink = new DebugOutputSink(mapper); + var secondSink = new DebugOutputSink(mapper); + + using var output = new ConsoleOutput(); + + pipeline.Emit(0); + pipeline.Emit(1); + pipeline.Emit(2); + Assert.That(output.GetOuput(), Is.EqualTo(">>> 1\r\n>>> 1\r\n>>> 2\r\n>>> 2\r\n>>> 3\r\n>>> 3\r\n")); + } + + [Test] + public void WithBuffer_TwoSinks_NoOverlap() + { + var pipeline = new Pipeline(); + var mapper = new Mapper(pipeline, x => x + 1); + var firstSink = new DebugOutputSink(mapper); + var buffer = new StreamBuffer(mapper); + var secondSink = new DebugOutputSink(buffer); + + using var output = new ConsoleOutput(); + + pipeline.Emit(0); + pipeline.Emit(1); + pipeline.Emit(2); + Assert.That(output.GetOuput(), Is.EqualTo(">>> 1\r\n>>> 2\r\n>>> 3\r\n")); + + pipeline.Complete(); + Assert.That(output.GetOuput(), Is.EqualTo(">>> 1\r\n>>> 2\r\n>>> 3\r\n>>> 1\r\n>>> 2\r\n>>> 3\r\n")); + } + + [Test] + public void WithBufferMaxCapacity_TwoSinks_NoOverlap() + { + var pipeline = new Pipeline(); + var mapper = new Mapper(pipeline, x => x + 1); + var firstSink = new DebugOutputSink(mapper); + var buffer = new StreamBuffer(mapper, 3); + var secondSink = new DebugOutputSink(buffer); + + using var output = new ConsoleOutput(); + + pipeline.Emit(0); + pipeline.Emit(1); + Assert.That(output.GetOuput(), Is.EqualTo(">>> 1\r\n>>> 2\r\n")); + pipeline.Emit(2); + Assert.That(output.GetOuput(), Is.EqualTo(">>> 1\r\n>>> 2\r\n>>> 3\r\n>>> 1\r\n>>> 2\r\n>>> 3\r\n")); + pipeline.Emit(3); + Assert.That(output.GetOuput(), Is.EqualTo(">>> 1\r\n>>> 2\r\n>>> 3\r\n>>> 1\r\n>>> 2\r\n>>> 3\r\n>>> 4\r\n")); + pipeline.Complete(); + Assert.That(output.GetOuput(), Is.EqualTo(">>> 1\r\n>>> 2\r\n>>> 3\r\n>>> 1\r\n>>> 2\r\n>>> 3\r\n>>> 4\r\n>>> 4\r\n")); + } + + [Test] + public void WithBufferMaxCapacity_TwoSources_NoOverlapSourcePushCompletion() + { + var source = new EnumerableSource(Enumerable.Range(0, 4)); + var mapper = new Mapper(source, x => x + 1); + var firstSink = new DebugOutputSink(mapper); + var buffer = new StreamBuffer(mapper, 3); + var secondSink = new DebugOutputSink(buffer); + + using var output = new ConsoleOutput(); + source.Start(); + Assert.That(output.GetOuput(), Is.EqualTo(">>> 1\r\n>>> 2\r\n>>> 3\r\n>>> 1\r\n>>> 2\r\n>>> 3\r\n>>> 4\r\n>>> 4\r\n")); + } +}