diff --git a/Streamistry.Core/Fluent/BranchesBuilder.cs b/Streamistry.Core/Fluent/BranchesBuilder.cs index fec4aec..a559465 100644 --- a/Streamistry.Core/Fluent/BranchesBuilder.cs +++ b/Streamistry.Core/Fluent/BranchesBuilder.cs @@ -25,3 +25,10 @@ public Pipeline Build() return Instances![0].Pipe.Pipeline!; } } + +public class InvalidUpstreamBranchException : InvalidOperationException +{ + public InvalidUpstreamBranchException(Type T1, Type T2) + : base($"Input branches of the Union operators must have the same type. Following distinct types were detected '{T1.Name}' and '{T2.Name}'") + { } +} diff --git a/Streamistry.Core/Fluent/UnionBuilder.cs b/Streamistry.Core/Fluent/UnionBuilder.cs new file mode 100644 index 0000000..acc44ed --- /dev/null +++ b/Streamistry.Core/Fluent/UnionBuilder.cs @@ -0,0 +1,27 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Streamistry.Fluent; +public class UnionBuilder : BasePipeBuilder +{ + protected IBuilder Upstream { get; } + + public UnionBuilder(IBuilder upstream) + : base() + => Upstream = upstream; + + public override IChainablePort OnBuildPipeElement() + { + var upstreams = Upstream.BuildPipeElement().Select(x => x.Pipe); + if (upstreams.Any(x => x is not IChainablePipe)) + throw new InvalidOperationException(); + + var pipes = upstreams.Cast>().ToArray(); + + return new Union(pipes); + } +} diff --git a/Streamistry.SourceGenerator/BranchesBuilder.scriban b/Streamistry.SourceGenerator/BranchesBuilder.scriban index 7596cf6..719c75d 100644 --- a/Streamistry.SourceGenerator/BranchesBuilder.scriban +++ b/Streamistry.SourceGenerator/BranchesBuilder.scriban @@ -30,6 +30,15 @@ public CombinatorBuilder<{{ generics | array.join ", " }}, TOutput> Zip(Func<{{ generics | array.join "?, " }}?, TOutput> function) => new(this, function); + public UnionBuilder Union() + { + {{~ for type in generics | array.offset 1 ~}} + if (typeof(T1) != typeof({{type}})) + throw new InvalidUpstreamBranchException(typeof(T1), typeof({{type}})); + {{~ end ~}} + return new(this); + } + public BranchesBuilder Checkpoints(out IChainablePort[] ports) { ports = BuildPipeElement(); diff --git a/Streamistry.Testing/Fluent/PipelineBuilderTests.cs b/Streamistry.Testing/Fluent/PipelineBuilderTests.cs index dad7228..1c35909 100644 --- a/Streamistry.Testing/Fluent/PipelineBuilderTests.cs +++ b/Streamistry.Testing/Fluent/PipelineBuilderTests.cs @@ -607,4 +607,71 @@ public void Build_WithManySegments_Success() Assert.That(output, Does.Contain(12)); Assert.That(output, Does.Contain(36)); } + + [Test] + public void Build_WithUnion_Success() + { + var odd = new Segment(x => x.Filter(y => y % 2 == 1).Map(y => y * 2)); + var even = new Segment(x => x.Filter(y => y % 2 == 0).Map(y => y * 5)); + + var pipeline = new PipelineBuilder() + .Source([1, 2, 3, 6]) + .Branch(odd, even) + .Union().Checkpoint(out var union) + .Build(); + + var output = union.GetOutputs(pipeline.Start); + Assert.That(output, Does.Contain(2)); + Assert.That(output, Does.Contain(10)); + Assert.That(output, Does.Contain(6)); + Assert.That(output, Does.Contain(30)); + } + + [Test] + public void Build_WithUnionButDifferentType_Failure() + { + var odd = new Segment(x => x.Filter(y => y % 2 == 1).Map(y => y * 2)); + var even = new Segment(x => x.Filter(y => y % 2 == 0).Map(y => new string('*', y))); + + var pipeline = new PipelineBuilder() + .Source([1, 2, 3, 6]) + .Branch(odd, even); + + var ex = Assert.Throws(() => pipeline.Union()); + Assert.That(ex.Message, Is.EqualTo("Input branches of the Union operators must have the same type. Following distinct types were detected 'Int32' and 'String'")); + } + + public record Animal(string Name); + public record Carnivore(string Name) : Animal(Name); + public record Frugivore(string Name) : Animal(Name); + + [Test] + public void Build_WithUnionButDifferentTypeLinkedByInheritance_Failure() + { + var notCarnivore = new Segment(x => x.Filter(y => y is not Carnivore).Map(y => y)); + var carnivore = new Segment(x => x.Filter(y => y is Carnivore).Map(y => (Carnivore)y!)); + + var pipeline = new PipelineBuilder() + .Source([new Animal("Bird"), new Carnivore("Dog")]) + .Branch(notCarnivore, carnivore); + + var ex = Assert.Throws(() => pipeline.Union()); + Assert.That(ex.Message, Is.EqualTo("Input branches of the Union operators must have the same type. Following distinct types were detected 'Animal' and 'Carnivore'")); + } + + [Test] + public void Build_WithMoreThanTwoUpstreamsUnion_Success() + { + var common = new Segment(x => x.Filter(y => y is not Carnivore && y is not Frugivore).Map(y => y)); + var carnivore = new Segment(x => x.Filter(y => y is Carnivore).Map(y => y)); + var frugivore = new Segment(x => x.Filter(y => y is Frugivore).Map(y => y)); + + var pipeline = new PipelineBuilder() + .Source([new Animal("Bird"), new Carnivore("Dog")]) + .Branch(common, carnivore, frugivore) + .Union().Checkpoint(out var union) + .Build(); + + Assert.That(union.GetOutputs(pipeline.Start), Has.Length.EqualTo(2)); + } }