From 55dedd934c19e130696ca097ea5622885351c337 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9dric=20L=2E=20Charlier?= Date: Tue, 27 Aug 2024 08:45:01 +0200 Subject: [PATCH] feat: add Union pipe to combine the output of many pipes as a single pipe (#34) feat: add Union pipe to combine the output of many pipes and present them as single pipe --- Streamistry.Core/Union.cs | 31 ++++++++++++++++++++++++++++++ Streamistry.Testing/UnionTests.cs | 32 +++++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+) create mode 100644 Streamistry.Core/Union.cs create mode 100644 Streamistry.Testing/UnionTests.cs diff --git a/Streamistry.Core/Union.cs b/Streamistry.Core/Union.cs new file mode 100644 index 0000000..68d4021 --- /dev/null +++ b/Streamistry.Core/Union.cs @@ -0,0 +1,31 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Streamistry.Observability; + +namespace Streamistry; + +/// +/// Represents a pipeline element that merges two or more upstream streams into a single downstream stream by emetting each values from each upstream. +/// +/// The type of the elements in any input stream. +public class Union : ChainablePipe +{ + public Union(IChainablePipe[] upstreams) + : base(upstreams[0]?.GetObservabilityProvider()) + { + foreach (var upstream in upstreams) + upstream.RegisterDownstream(Emit, Complete); + } + + [Meter] + public void Emit(TInput? obj) + => PushDownstream(Invoke(obj)); + + [Trace] + protected TInput? Invoke(TInput? obj) + => obj; +} diff --git a/Streamistry.Testing/UnionTests.cs b/Streamistry.Testing/UnionTests.cs new file mode 100644 index 0000000..e5109f7 --- /dev/null +++ b/Streamistry.Testing/UnionTests.cs @@ -0,0 +1,32 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Security.Cryptography.X509Certificates; +using System.Text; +using System.Threading.Tasks; +using NUnit.Framework; +using Streamistry.Pipes.Sinks; +using Streamistry.Pipes.Sources; + +namespace Streamistry.Testing; +public class UnionTests +{ + [Test] + public void Union_ManyPipes_AsSinglePipe() + { + + var firstSource = new EnumerableSource([1, 2, 3]); + var secondSource = new EnumerableSource([10, 20, 30]); + var union = new Union([firstSource, secondSource]); + var sink = new MemorySink(union); + var pipeline = new Pipeline([firstSource, secondSource]); + pipeline.Start(); + + Assert.Multiple(() => + { + Assert.That(sink.State, Has.Count.EqualTo(6)); + Assert.That(sink.State.First(), Is.EqualTo(1)); + Assert.That(sink.State.Last(), Is.EqualTo(30)); + }); + } +}