Skip to content

Commit

Permalink
feat: add a splitter applying a specified function to each element to…
Browse files Browse the repository at this point in the history
… create multiple elements (#20)
  • Loading branch information
Seddryck authored Aug 21, 2024
1 parent 5f0b0e9 commit b0cdc2c
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 0 deletions.
34 changes: 34 additions & 0 deletions Streamistry.Core/Splitter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Streamistry;

/// <summary>
/// Represents a pipeline element that applies a specified function to each element within a batch of this stream to create multiple new elements.
/// The output stream is determined by the result of the function applied to the input elements.
/// </summary>
/// <typeparam name="TInput">The type of the elements in the input stream.</typeparam>
/// <typeparam name="TOutput">The type of the elements in the output stream after the function is applied.</typeparam>
internal class Splitter<TInput, TOutput> : ChainablePipe<TOutput>, IProcessablePipe<TInput>
{
public Func<TInput?, TOutput[]?> Function { get; init; }

public Splitter(IChainablePipe<TInput> upstream, Func<TInput?, TOutput[]?> function)
{
upstream.RegisterDownstream(Emit);
Function = function;
}

public void Emit(TInput? obj)
{
var results = Function.Invoke(obj);
if (results is null)
PushDownstream(default);
else
foreach (var result in results)
PushDownstream(result);
}
}
27 changes: 27 additions & 0 deletions Streamistry.Testing/SplitterTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NUnit.Framework;
using Streamistry.Pipes.Sinks;

namespace Streamistry.Testing;
public class SplitterTests
{
[Test]
public void Mapper_InlineIncrement_Successful()
{
var pipeline = new Pipeline<string>();
var mapper = new Splitter<string, string>(pipeline, x => x?.Split(';') ?? null);
var sink = new MemorySink<string>(mapper);
mapper.Emit("foo;bar;quark");

Assert.That(sink.State, Has.Count.EqualTo(3));
Assert.Multiple(() =>
{
Assert.That(sink.State.First(), Is.EqualTo("foo"));
Assert.That(sink.State.Last(), Is.EqualTo("quark"));
});
}
}

0 comments on commit b0cdc2c

Please sign in to comment.