diff --git a/Streamistry.Core/Union.cs b/Streamistry.Core/Union.cs
new file mode 100644
index 0000000..68d4021
--- /dev/null
+++ b/Streamistry.Core/Union.cs
@@ -0,0 +1,31 @@
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Streamistry.Observability;
+
+namespace Streamistry;
+
+///
+/// Represents a pipeline element that merges two or more upstream streams into a single downstream stream by emetting each values from each upstream.
+///
+/// The type of the elements in any input stream.
+public class Union : ChainablePipe
+{
+ public Union(IChainablePipe[] upstreams)
+ : base(upstreams[0]?.GetObservabilityProvider())
+ {
+ foreach (var upstream in upstreams)
+ upstream.RegisterDownstream(Emit, Complete);
+ }
+
+ [Meter]
+ public void Emit(TInput? obj)
+ => PushDownstream(Invoke(obj));
+
+ [Trace]
+ protected TInput? Invoke(TInput? obj)
+ => obj;
+}
diff --git a/Streamistry.Testing/UnionTests.cs b/Streamistry.Testing/UnionTests.cs
new file mode 100644
index 0000000..e5109f7
--- /dev/null
+++ b/Streamistry.Testing/UnionTests.cs
@@ -0,0 +1,32 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Security.Cryptography.X509Certificates;
+using System.Text;
+using System.Threading.Tasks;
+using NUnit.Framework;
+using Streamistry.Pipes.Sinks;
+using Streamistry.Pipes.Sources;
+
+namespace Streamistry.Testing;
+public class UnionTests
+{
+ [Test]
+ public void Union_ManyPipes_AsSinglePipe()
+ {
+
+ var firstSource = new EnumerableSource([1, 2, 3]);
+ var secondSource = new EnumerableSource([10, 20, 30]);
+ var union = new Union([firstSource, secondSource]);
+ var sink = new MemorySink(union);
+ var pipeline = new Pipeline([firstSource, secondSource]);
+ pipeline.Start();
+
+ Assert.Multiple(() =>
+ {
+ Assert.That(sink.State, Has.Count.EqualTo(6));
+ Assert.That(sink.State.First(), Is.EqualTo(1));
+ Assert.That(sink.State.Last(), Is.EqualTo(30));
+ });
+ }
+}