Skip to content

Commit

Permalink
feat: add StreamBuffer to temporarily stores elements in a buffer (#29)
Browse files Browse the repository at this point in the history
  • Loading branch information
Seddryck authored Aug 25, 2024
1 parent e373dd3 commit 806409e
Show file tree
Hide file tree
Showing 14 changed files with 210 additions and 61 deletions.
7 changes: 4 additions & 3 deletions Streamistry.Core/Aggregator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,16 @@ public class Aggregator<TSource, TAccumulate, TResult> : ChainablePipe<TResult>,
public Aggregator(IChainablePipe<TSource> upstream, Func<TAccumulate?, TSource?, TAccumulate?> accumulator, Func<TAccumulate?, TResult?> selector, TAccumulate? seed = default)
: base(upstream.GetObservabilityProvider())
{
upstream.RegisterDownstream(Emit);
upstream.RegisterDownstream(Emit, PushComplete);
(Accumulator, Selector, State) = (accumulator, selector, seed);
}

public void Emit(TSource? obj)
[Meter]
public virtual void Emit(TSource? obj)
=> PushDownstream(Invoke(obj));

[Trace]
protected TResult? Invoke(TSource? obj)
protected virtual TResult? Invoke(TSource? obj)
{
State = Accumulator.Invoke(State, obj);
return Selector.Invoke(State);
Expand Down
17 changes: 14 additions & 3 deletions Streamistry.Core/ChainablePipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,15 @@ namespace Streamistry;
public abstract class ChainablePipe<T> : IChainablePipe<T>, IObservablePipe
{
private Action<T?>? Downstream { get; set; }
private Action? Completion { get; set; }

protected ObservabilityProvider? Observability { get; private set; }

public void RegisterDownstream(Action<T?> action)
=> Downstream += action;
public void RegisterDownstream(Action<T?> downstream, Action? completion)
{
Downstream += downstream;
Completion += completion;
}

protected ChainablePipe(ObservabilityProvider? observability)
=> RegisterObservability(observability);
Expand All @@ -24,6 +29,12 @@ public void RegisterObservability(ObservabilityProvider? observability)
public ObservabilityProvider? GetObservabilityProvider()
=> Observability;

protected virtual void PushDownstream(T? obj)
protected void PushDownstream(T? obj)
=> Downstream?.Invoke(obj);

public virtual void Complete()
=> PushComplete();

protected void PushComplete()
=> Completion?.Invoke();
}
15 changes: 13 additions & 2 deletions Streamistry.Core/Combinator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ namespace Streamistry;
public abstract class Combinator<TFirst, TSecond, TResult> : ChainablePipe<TResult>
{
public Func<TFirst?, TSecond?, TResult?> Function { get; init; }
protected int BranchesCompleted { get; set; }

public Combinator(IChainablePipe<TFirst> firstUpstream, IChainablePipe<TSecond> secondUpstream, Func<TFirst?, TSecond?, TResult?> function)
: base(firstUpstream.GetObservabilityProvider())
{
firstUpstream.RegisterDownstream(EmitFirst);
secondUpstream.RegisterDownstream(EmitSecond);
firstUpstream.RegisterDownstream(EmitFirst, Complete);
secondUpstream.RegisterDownstream(EmitSecond, Complete);

Function = function;
}
Expand All @@ -44,6 +45,16 @@ public void EmitSecond(TSecond? second)
Queue(second);
}

public override void Complete()
{
BranchesCompleted += 1;
if (BranchesCompleted > 1)
{
BranchesCompleted = 0;
PushComplete();
}
}

[Trace]
protected TResult? Invoke(TFirst? first, TSecond? second)
=> Function.Invoke(first, second);
Expand Down
2 changes: 1 addition & 1 deletion Streamistry.Core/Filter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class Filter<TInput> : ChainablePipe<TInput>, IProcessablePipe<TInput>
public Filter(IChainablePipe<TInput> upstream, Func<TInput?, bool> predicate)
: base(upstream.GetObservabilityProvider())
{
upstream.RegisterDownstream(Emit);
upstream.RegisterDownstream(Emit, Complete);
Predicate = predicate;
}

Expand Down
2 changes: 1 addition & 1 deletion Streamistry.Core/IChainablePipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@
namespace Streamistry;
public interface IChainablePipe<T> : IObservablePipe
{
void RegisterDownstream(Action<T?> action);
void RegisterDownstream(Action<T?> action, Action? complete);
}
2 changes: 1 addition & 1 deletion Streamistry.Core/Mapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class Mapper<TInput, TOutput> : ChainablePipe<TOutput>, IProcessablePipe<
public Mapper(IChainablePipe<TInput> upstream, Func<TInput?, TOutput?> function)
: base(upstream.GetObservabilityProvider())
{
upstream.RegisterDownstream(Emit);
upstream.RegisterDownstream(Emit, Complete);
Function = function;
}

Expand Down
3 changes: 2 additions & 1 deletion Streamistry.Core/Sink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ public class Sink<T> : IProcessablePipe<T>, IObservablePipe

public Sink(IChainablePipe<T> upstream, Action<T?> function)
{
upstream.RegisterDownstream(Emit);
upstream.RegisterDownstream(Emit, null);
RegisterObservability(upstream.GetObservabilityProvider());
Function = function;
}

[Meter]
public void Emit(T? obj)
=> Invoke(obj);

Expand Down
1 change: 1 addition & 0 deletions Streamistry.Core/Source.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ protected virtual void Read()
{
while (IsStarted && TryReadNext(out var item))
PushDownstream(item);
PushComplete();
}

protected abstract bool TryReadNext(out TOutput? item);
Expand Down
3 changes: 2 additions & 1 deletion Streamistry.Core/Splitter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ internal class Splitter<TInput, TOutput> : ChainablePipe<TOutput>, IProcessableP
public Splitter(IChainablePipe<TInput> upstream, Func<TInput?, TOutput[]?> function)
: base(upstream.GetObservabilityProvider())
{
upstream.RegisterDownstream(Emit);
upstream.RegisterDownstream(Emit, Complete);
Function = function;
}

[Meter]
public void Emit(TInput? obj)
{
var results = Invoke(obj);
Expand Down
50 changes: 50 additions & 0 deletions Streamistry.Core/StreamBuffer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Streamistry.Observability;

namespace Streamistry;

/// <summary>
/// Represents a pipeline element that temporarily stores elements in a buffer as they pass through the stream.
/// The buffer can accumulate a specified number of elements or a batch of elements over time, allowing for more efficient processing or delayed downstream transmission.
/// Once the buffer reaches a certain threshold, such as a maximum size or time limit, its contents are released downstream for further processing.
/// </summary>
/// <typeparam name="T">The type of the elements stored in the buffer.</typeparam>
public class StreamBuffer<T> : ChainablePipe<T>, IProcessablePipe<T>
{
protected List<T?> Store { get; } = [];
protected int? MaxCapacity { get; }

public StreamBuffer(IChainablePipe<T> upstream, int? maxCapacity = null)
: base(upstream.GetObservabilityProvider())
{
upstream.RegisterDownstream(Emit, Complete);
MaxCapacity = maxCapacity;
}

[Meter]
public void Emit(T? obj)
{
Invoke(obj);
if (MaxCapacity.HasValue && Store.Count >= MaxCapacity.Value)
{
Complete();
Store.Clear();
}
}

[Trace]
protected void Invoke(T? obj)
=> Store.Add(obj);

public override void Complete()
{
foreach (var item in Store)
PushDownstream(item);
PushComplete();
}
}
36 changes: 36 additions & 0 deletions Streamistry.Testing/ConsoleOutput.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Streamistry.Testing;
internal class ConsoleOutput : IDisposable
{
private readonly StringWriter stringWriter = new();
private readonly TextWriter originalOutput = Console.Out;

public ConsoleOutput()
=> Console.SetOut(stringWriter);

public string GetOuput()
=> stringWriter.ToString();

public int CountSubstring(string value)
{
var text = GetOuput();
int count = 0, minIndex = text.IndexOf(value, 0);
while (minIndex != -1)
{
minIndex = text.IndexOf(value, minIndex + value.Length);
count++;
}
return count;
}

public void Dispose()
{
Console.SetOut(originalOutput);
stringWriter.Dispose();
}
}
30 changes: 0 additions & 30 deletions Streamistry.Testing/Observability/MeterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,36 +13,6 @@
namespace Streamistry.Testing.Observability;
public class MeterTests
{
private class ConsoleOutput : IDisposable
{
private readonly StringWriter stringWriter = new();
private readonly TextWriter originalOutput = Console.Out;

public ConsoleOutput()
=> Console.SetOut(stringWriter);

public string GetOuput()
=> stringWriter.ToString();

public int CountSubstring(string value)
{
var text = GetOuput();
int count = 0, minIndex = text.IndexOf(value, 0);
while (minIndex != -1)
{
minIndex = text.IndexOf(value, minIndex + value.Length);
count++;
}
return count;
}

public void Dispose()
{
Console.SetOut(originalOutput);
stringWriter.Dispose();
}
}

[Test]
public void Counter_Mapper_ReturnCount()
{
Expand Down
18 changes: 0 additions & 18 deletions Streamistry.Testing/Pipes/Sinks/DebugOutputSinkTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,6 @@ namespace Streamistry.Testing.Pipes.Sinks;

public class DebugOutputSinkTests
{
private class ConsoleOutput : IDisposable
{
private readonly StringWriter stringWriter = new();
private readonly TextWriter originalOutput = Console.Out;

public ConsoleOutput()
=> Console.SetOut(stringWriter);

public string GetOuput()
=> stringWriter.ToString();

public void Dispose()
{
Console.SetOut(originalOutput);
stringWriter.Dispose();
}
}

[Test]
public void Emit_DisplayOneElement_Successful()
{
Expand Down
85 changes: 85 additions & 0 deletions Streamistry.Testing/StreamBufferTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading.Tasks;
using NUnit.Framework;
using Streamistry.Pipes.Sinks;
using Streamistry.Pipes.Sources;

namespace Streamistry.Testing;
public class StreamBufferTests
{
[Test]
public void WithoutBuffer_TwoSinks_Overlaps()
{
var pipeline = new Pipeline<int>();
var mapper = new Mapper<int, int>(pipeline, x => x + 1);
var firstSink = new DebugOutputSink<int>(mapper);
var secondSink = new DebugOutputSink<int>(mapper);

using var output = new ConsoleOutput();

pipeline.Emit(0);
pipeline.Emit(1);
pipeline.Emit(2);
Assert.That(output.GetOuput(), Is.EqualTo(">>> 1\r\n>>> 1\r\n>>> 2\r\n>>> 2\r\n>>> 3\r\n>>> 3\r\n"));
}

[Test]
public void WithBuffer_TwoSinks_NoOverlap()
{
var pipeline = new Pipeline<int>();
var mapper = new Mapper<int, int>(pipeline, x => x + 1);
var firstSink = new DebugOutputSink<int>(mapper);
var buffer = new StreamBuffer<int>(mapper);
var secondSink = new DebugOutputSink<int>(buffer);

using var output = new ConsoleOutput();

pipeline.Emit(0);
pipeline.Emit(1);
pipeline.Emit(2);
Assert.That(output.GetOuput(), Is.EqualTo(">>> 1\r\n>>> 2\r\n>>> 3\r\n"));

pipeline.Complete();
Assert.That(output.GetOuput(), Is.EqualTo(">>> 1\r\n>>> 2\r\n>>> 3\r\n>>> 1\r\n>>> 2\r\n>>> 3\r\n"));
}

[Test]
public void WithBufferMaxCapacity_TwoSinks_NoOverlap()
{
var pipeline = new Pipeline<int>();
var mapper = new Mapper<int, int>(pipeline, x => x + 1);
var firstSink = new DebugOutputSink<int>(mapper);
var buffer = new StreamBuffer<int>(mapper, 3);
var secondSink = new DebugOutputSink<int>(buffer);

using var output = new ConsoleOutput();

pipeline.Emit(0);
pipeline.Emit(1);
Assert.That(output.GetOuput(), Is.EqualTo(">>> 1\r\n>>> 2\r\n"));
pipeline.Emit(2);
Assert.That(output.GetOuput(), Is.EqualTo(">>> 1\r\n>>> 2\r\n>>> 3\r\n>>> 1\r\n>>> 2\r\n>>> 3\r\n"));
pipeline.Emit(3);
Assert.That(output.GetOuput(), Is.EqualTo(">>> 1\r\n>>> 2\r\n>>> 3\r\n>>> 1\r\n>>> 2\r\n>>> 3\r\n>>> 4\r\n"));
pipeline.Complete();
Assert.That(output.GetOuput(), Is.EqualTo(">>> 1\r\n>>> 2\r\n>>> 3\r\n>>> 1\r\n>>> 2\r\n>>> 3\r\n>>> 4\r\n>>> 4\r\n"));
}

[Test]
public void WithBufferMaxCapacity_TwoSources_NoOverlapSourcePushCompletion()
{
var source = new EnumerableSource<int>(Enumerable.Range(0, 4));
var mapper = new Mapper<int, int>(source, x => x + 1);
var firstSink = new DebugOutputSink<int>(mapper);
var buffer = new StreamBuffer<int>(mapper, 3);
var secondSink = new DebugOutputSink<int>(buffer);

using var output = new ConsoleOutput();
source.Start();
Assert.That(output.GetOuput(), Is.EqualTo(">>> 1\r\n>>> 2\r\n>>> 3\r\n>>> 1\r\n>>> 2\r\n>>> 3\r\n>>> 4\r\n>>> 4\r\n"));
}
}

0 comments on commit 806409e

Please sign in to comment.