Skip to content

Commit

Permalink
feat: add Union pipe to combine the output of many pipes as a single …
Browse files Browse the repository at this point in the history
…pipe (#34)

feat: add Union pipe to combine the output of many pipes and present them as single pipe
  • Loading branch information
Seddryck authored Aug 27, 2024
1 parent 3ff4f4e commit 55dedd9
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 0 deletions.
31 changes: 31 additions & 0 deletions Streamistry.Core/Union.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Streamistry.Observability;

namespace Streamistry;

/// <summary>
/// Represents a pipeline element that merges two or more upstream streams into a single downstream stream by emetting each values from each upstream.
/// </summary>
/// <typeparam name="TInput">The type of the elements in any input stream.</typeparam>
public class Union<TInput> : ChainablePipe<TInput>
{
public Union(IChainablePipe<TInput>[] upstreams)
: base(upstreams[0]?.GetObservabilityProvider())
{
foreach (var upstream in upstreams)
upstream.RegisterDownstream(Emit, Complete);
}

[Meter]
public void Emit(TInput? obj)
=> PushDownstream(Invoke(obj));

[Trace]
protected TInput? Invoke(TInput? obj)
=> obj;
}
32 changes: 32 additions & 0 deletions Streamistry.Testing/UnionTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading.Tasks;
using NUnit.Framework;
using Streamistry.Pipes.Sinks;
using Streamistry.Pipes.Sources;

namespace Streamistry.Testing;
public class UnionTests
{
[Test]
public void Union_ManyPipes_AsSinglePipe()
{

var firstSource = new EnumerableSource<int>([1, 2, 3]);
var secondSource = new EnumerableSource<int>([10, 20, 30]);
var union = new Union<int>([firstSource, secondSource]);
var sink = new MemorySink<int>(union);
var pipeline = new Pipeline([firstSource, secondSource]);
pipeline.Start();

Assert.Multiple(() =>
{
Assert.That(sink.State, Has.Count.EqualTo(6));
Assert.That(sink.State.First(), Is.EqualTo(1));
Assert.That(sink.State.Last(), Is.EqualTo(30));
});
}
}

0 comments on commit 55dedd9

Please sign in to comment.