From b0cdc2c121ceeddbeae32aa616486d59c4568430 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9dric=20L=2E=20Charlier?= Date: Wed, 21 Aug 2024 22:59:46 +0200 Subject: [PATCH] feat: add a splitter applying a specified function to each element to create multiple elements (#20) --- Streamistry.Core/Splitter.cs | 34 ++++++++++++++++++++++++++++ Streamistry.Testing/SplitterTests.cs | 27 ++++++++++++++++++++++ 2 files changed, 61 insertions(+) create mode 100644 Streamistry.Core/Splitter.cs create mode 100644 Streamistry.Testing/SplitterTests.cs diff --git a/Streamistry.Core/Splitter.cs b/Streamistry.Core/Splitter.cs new file mode 100644 index 0000000..185b6a5 --- /dev/null +++ b/Streamistry.Core/Splitter.cs @@ -0,0 +1,34 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Streamistry; + +/// +/// Represents a pipeline element that applies a specified function to each element within a batch of this stream to create multiple new elements. +/// The output stream is determined by the result of the function applied to the input elements. +/// +/// The type of the elements in the input stream. +/// The type of the elements in the output stream after the function is applied. +internal class Splitter : ChainablePipe, IProcessablePipe +{ + public Func Function { get; init; } + + public Splitter(IChainablePipe upstream, Func function) + { + upstream.RegisterDownstream(Emit); + Function = function; + } + + public void Emit(TInput? obj) + { + var results = Function.Invoke(obj); + if (results is null) + PushDownstream(default); + else + foreach (var result in results) + PushDownstream(result); + } +} diff --git a/Streamistry.Testing/SplitterTests.cs b/Streamistry.Testing/SplitterTests.cs new file mode 100644 index 0000000..8bd1650 --- /dev/null +++ b/Streamistry.Testing/SplitterTests.cs @@ -0,0 +1,27 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using NUnit.Framework; +using Streamistry.Pipes.Sinks; + +namespace Streamistry.Testing; +public class SplitterTests +{ + [Test] + public void Mapper_InlineIncrement_Successful() + { + var pipeline = new Pipeline(); + var mapper = new Splitter(pipeline, x => x?.Split(';') ?? null); + var sink = new MemorySink(mapper); + mapper.Emit("foo;bar;quark"); + + Assert.That(sink.State, Has.Count.EqualTo(3)); + Assert.Multiple(() => + { + Assert.That(sink.State.First(), Is.EqualTo("foo")); + Assert.That(sink.State.Last(), Is.EqualTo("quark")); + }); + } +}