Skip to content

Commit

Permalink
feat: segment can handle a single route sub-part of a pipeline (#94)
Browse files Browse the repository at this point in the history
  • Loading branch information
Seddryck authored Sep 21, 2024
1 parent e6e9461 commit 43ed489
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 1 deletion.
3 changes: 3 additions & 0 deletions Streamistry.Core/Fluent/BasePipeBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,7 @@ public ParserBuilder<TOutput, TNext> Parse<TNext>(ParserDelegate<TOutput, TNext>
=> new(this, parser);
public ParserBuilder<TOutput> Parse()
=> new(this);

public BinderBuilder<TOutput, TNext> Bind<TNext>(Segment<TOutput, TNext> segment)
=> new(this, segment);
}
23 changes: 23 additions & 0 deletions Streamistry.Core/Fluent/BinderBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Streamistry.Fluent;
public class BinderBuilder<TInput, TOutput> : PipeElementBuilder<TInput, TOutput>
{
protected Segment<TInput, TOutput> Segment { get; }

public BinderBuilder(IPipeBuilder<TInput> upstream, Segment<TInput, TOutput> segment)
: base(upstream)
=> (Segment) = (segment);

public override IChainablePort<TOutput> OnBuildPipeElement()
{
var upstream = Upstream.BuildPipeElement();
var (input, output) = Segment.Craft(upstream.Pipe.Pipeline!);
input.Bind(upstream);
return output;
}
}
65 changes: 65 additions & 0 deletions Streamistry.Core/Fluent/Segment.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Streamistry.Observability;

namespace Streamistry.Fluent;

public class Segment<TInput, TOutput> : IPipeBuilder<TInput>
{
private VirtualInput<TInput> Input { get; set; } = new();
internal Func<BasePipeBuilder<TInput>, BasePipeBuilder<TOutput>> Builder { get; }

public Segment(Func<BasePipeBuilder<TInput>, BasePipeBuilder<TOutput>> builder)
=> Builder = builder;

public IChainablePort<TInput> BuildPipeElement()
=> Input ??= (VirtualInput<TInput>)OnBuildPipeElement();

public IChainablePort<TInput> OnBuildPipeElement()
=> new VirtualInput<TInput>();

public (IBindablePipe<TInput> input, IChainablePort<TOutput>) Craft(Pipeline pipeline)
{
Input.Pipeline = pipeline;
var builder = Builder.Invoke(Input);
builder.Build();
return (Input.GetTarget(), builder.BuildPipeElement());
}

private class VirtualInput<T> : BasePipeBuilder<T>, IChainablePort<T>, IChainablePipe
{
private IBindablePipe<T>? Target { get; set; }

public IChainablePipe Pipe
=> this;

public Pipeline? Pipeline { get; set; }

public IBindablePipe<T> GetTarget()
=> Target ?? throw new InvalidOperationException();

public void RegisterDownstream(Action<T?> downstream)
=> Target ??= (IBindablePipe<T>)downstream.Target!;

public void UnregisterDownstream(Action<T?> downstream)
=> Target ??= (IBindablePipe<T>)downstream.Target!;


public void RegisterOnCompleted(Action? complete)
{
if (complete is not null)
Target ??= (IBindablePipe<T>)complete.Target!;
}

public override IChainablePort<T> OnBuildPipeElement()
=> this;

public void RegisterObservability(ObservabilityProvider? provider) => throw new NotImplementedException();
public ObservabilityProvider? GetObservabilityProvider() => null;
}
}


11 changes: 11 additions & 0 deletions Streamistry.SourceGenerator/BasePipeBuilder.scriban
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,15 @@
indexes | array.each @concat | array.join ", "
}})
=> new(this, upstream{{ indexes | array.join ", upstream" }});

public BranchesBuilder<TOutput, {{ generics | array.join ", " }}> Branch<{{ generics | array.join ", " }}>(
{{-
func concat; ret string.append "Segment<TOutput, T" $0 | string.append "> upstream" | string.append $0; end
indexes | array.each @concat | array.join ", "
}})
=> new(this,
{{-
func concat; ret string.append "upstream" $0 | string.append ".Builder"; end
indexes | array.each @concat | array.join ", "
}});
}
53 changes: 52 additions & 1 deletion Streamistry.Testing/Fluent/PipelineBuilderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ public void Build_InBranchOfBranchCheckpointForAllPortsTypedAllAsserted_Success(
day => day.Map(x => x.AddDays(1)).Pluck(x => x.Day).Branch(
day1 => day1.Map(x => x + 1)
, day2 => day2.Map(x => x + 2)
).Zip((x,y)=> x + y)
).Zip((x, y) => x + y)
, month => month.Map(x => x.ToString("MMMM", CultureInfo.InvariantCulture))
).Checkpoints(out var portDay, out var portMonth)
.Build();
Expand Down Expand Up @@ -556,4 +556,55 @@ public void Build_CombineFiveUpstreamsCheckpoint_Success()
Assert.That(output, Does.Contain(25));
Assert.That(output, Does.Contain(30));
}

[Test]
public void Build_WithSingleSegment_Success()
{
var segment = new Segment<int, int>(x => x.Map(y => y * 2).Filter(y => y % 5 == 3));

var pipeline = new PipelineBuilder()
.Source([1, 2, 3])
.Map(x => x * 3)
.Bind(segment)
.Map(x => x % 4).Checkpoint(out var sink)
.Build();

var output = sink.GetOutputs(pipeline.Start);
Assert.That(output, Does.Contain(2));
}

[Test]
public void Build_WithSingleSegmentChangeType_Success()
{
var segment = new Segment<int, string>(x => x.Map(y => y + 3).Map(y => new string('*', y)));

var pipeline = new PipelineBuilder()
.Source([1, 2, 3])
.Map(x => x - 2)
.Bind(segment)
.Map(x => x!.PadLeft(4, '-')).Checkpoint(out var sink)
.Build();

var output = sink.GetOutputs(pipeline.Start);
Assert.That(output, Does.Contain("--**"));
Assert.That(output, Does.Contain("-***"));
Assert.That(output, Does.Contain("****"));
}

[Test]
public void Build_WithManySegments_Success()
{
var odd = new Segment<int, int>(x => x.Filter(y => y % 2 == 1).Map(y => y * 2));
var even = new Segment<int, int>(x => x.Filter(y => y % 2 == 0).Map(y => y * 5));

var pipeline = new PipelineBuilder()
.Source([1, 2, 3, 6])
.Branch(odd, even)
.Zip((x, y) => x + y).Checkpoint(out var zip)
.Build();

var output = zip.GetOutputs(pipeline.Start);
Assert.That(output, Does.Contain(12));
Assert.That(output, Does.Contain(36));
}
}

0 comments on commit 43ed489

Please sign in to comment.