Skip to content

Commit

Permalink
feat: define a route for each output ports (#108)
Browse files Browse the repository at this point in the history
  • Loading branch information
Seddryck authored Sep 28, 2024
1 parent c848cac commit 1225e4f
Show file tree
Hide file tree
Showing 14 changed files with 389 additions and 81 deletions.
11 changes: 11 additions & 0 deletions Streamistry.Core/DualRouterPipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,21 @@
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
using Streamistry.Observability;
using static System.Collections.Specialized.BitVector32;

namespace Streamistry;
public abstract class DualRouterPipe<TInput, TOutput> : ChainablePipe<TOutput>, IDualRoute<TOutput, TInput>, IBindablePipe<TInput>
{
public OutputPort<TInput> Alternate { get; }
public new OutputPort<TOutput> Main { get => base.Main; }

IChainablePort IDualRoute.Main => Main;
IChainablePort IDualRoute.Alternate => Alternate;

public DualRouterPipe(IChainablePort<TInput>? upstream)
: base(upstream?.Pipe)
{
Expand All @@ -29,8 +34,14 @@ public void Bind(IChainablePort<TInput> input)
Pipeline = input.Pipe.Pipeline;
}

public void Bind(IChainablePort input)
=> Bind(input as IChainablePort<TInput> ?? throw new InvalidCastException());

public void Unbind(IChainablePort<TInput> input)
{
input.UnregisterDownstream(Emit);
}

public void Unbind(IChainablePort input)
=> Unbind(input as IChainablePort<TInput> ?? throw new InvalidCastException());
}
52 changes: 39 additions & 13 deletions Streamistry.Core/Fluent/CasterBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,54 @@
using Streamistry.Pipes.Mappers;

namespace Streamistry.Fluent;
public class CasterBuilder<TInput, TOutput> : PipeElementBuilder<TInput, TOutput>, ISafeBuilder<CasterBuilder<TInput, TOutput>>
public class CasterBuilder<TInput, TOutput> : PipeElementBuilder<TInput, TOutput>
{
private bool IsSafe { get; set; } = false;

public CasterBuilder(IPipeBuilder<TInput> upstream)
: base(upstream)
{ }

public CasterBuilder<TInput, TOutput> Safe()
{
IsSafe = true;
return this;
}
public SafeCasterBuilder<TInput, TOutput> Safe()
=> new (Upstream);

public override IChainablePort<TOutput> OnBuildPipeElement()
=> IsSafe
? new SafeCaster<TInput, TOutput>(
Upstream.BuildPipeElement()
)
: new Caster<TInput, TOutput>(
=> new Caster<TInput, TOutput>(
Upstream.BuildPipeElement()
);
}

public class SafeCasterBuilder<TInput, TOutput> : CasterBuilder<TInput, TOutput>, IBuilder<IDualRoute>
{
protected new IDualRoute? Instance { get; set; }

public new IDualRoute BuildPipeElement()
=> Instance ??= OnBuildPipeElement() is IDualRoute dual ? dual : throw new InvalidCastException();

public override IChainablePort<TOutput> OnBuildPipeElement()
=> new SafeCaster<TInput, TOutput>(
Upstream.BuildPipeElement()
);

IDualRoute IBuilder<IDualRoute>.OnBuildPipeElement()
=> throw new InvalidOperationException();

public SafeCasterBuilder(IPipeBuilder<TInput> upstream)
: base(upstream)
{ }

public RoutesBuilder Route<TPort, TNext>(Func<IDualRoute, IChainablePort> port, Segment<TPort, TNext> segment)
{
var routeBuilder = new RoutesBuilder(this);
routeBuilder.Add(port, segment);
return routeBuilder;
}

public RoutesBuilder Route<TPort, TNext>(Func<IDualRoute, IChainablePort> port, Func<BasePipeBuilder<TPort>, BasePipeBuilder<TNext>> path)
{
var routeBuilder = new RoutesBuilder(this);
routeBuilder.Add(port, new Segment<TPort, TNext>(path));
return routeBuilder;
}

public ConvergerBuilder<TNext> Converge<TNext>()
=> new(this);
}
34 changes: 34 additions & 0 deletions Streamistry.Core/Fluent/ConvergerBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Streamistry.Fluent;
public class ConvergerBuilder<TNext> : BaseRoutesBuilder
{
protected interface IRouteBuilder<TIn, TOut> : IRouteBuilder
{
new IChainablePort<TOut> Build(IDualRoute dual);
}

public ConvergerBuilder(IBuilder<IDualRoute> upstream)
: base(upstream)
{ }

public ConvergerBuilder<TNext> Route(Func<IDualRoute, IChainablePort> port, ISegment segment)
{
Add(port, segment);
return this;
}

public ConvergerBuilder<TNext> Route<TOutput>(Func<IDualRoute, IChainablePort> port, Func<BasePipeBuilder<TOutput>, BasePipeBuilder<TNext>> path)
{
Add(port, new Segment<TOutput, TNext>(path));
return this;
}

public UnionBuilder<TNext> Union()
=> new(this);
}
50 changes: 39 additions & 11 deletions Streamistry.Core/Fluent/MapperBuilder.cs
Original file line number Diff line number Diff line change
@@ -1,34 +1,62 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Runtime.Serialization;
using System.Text;
using System.Threading.Tasks;

namespace Streamistry.Fluent;
public class MapperBuilder<TInput, TOutput> : PipeElementBuilder<TInput, TOutput>, ISafeBuilder<MapperBuilder<TInput, TOutput>>
public class MapperBuilder<TInput, TOutput> : PipeElementBuilder<TInput, TOutput>
{
protected Func<TInput, TOutput>? Function { get; set; }
private bool IsSafe { get; set; } = false;

public MapperBuilder(IPipeBuilder<TInput> upstream, Func<TInput, TOutput>? function)
: base(upstream)
=> (Function) = (function);

public MapperBuilder<TInput, TOutput> Safe()
{
IsSafe = true;
return this;
}
public SafeMapperBuilder<TInput, TOutput> Safe()
=> new (Upstream, Function);

public override IChainablePort<TOutput> OnBuildPipeElement()
=> IsSafe
? new ExceptionMapper<TInput, TOutput>(
=> new Mapper<TInput, TOutput>(
Upstream.BuildPipeElement()
, Function ?? throw new InvalidOperationException()
)
: new Mapper<TInput, TOutput>(
);
}

public class SafeMapperBuilder<TInput, TOutput> : MapperBuilder<TInput, TOutput>, IBuilder<IDualRoute>
{
protected new IDualRoute? Instance { get; set; }

public new IDualRoute BuildPipeElement()
=> Instance ??= OnBuildPipeElement();

public new IDualRoute OnBuildPipeElement()
=> new SafeMapper<TInput, TOutput>(
Upstream.BuildPipeElement()
, Function ?? throw new InvalidOperationException()
);

public SafeMapperBuilder(IPipeBuilder<TInput> upstream, Func<TInput, TOutput>? function)
: base(upstream, function)
{ }

public RoutesBuilder Route<TPort, TNext>(Func<IDualRoute, IChainablePort> port, Segment<TPort, TNext> segment)
{
var routeBuilder = new RoutesBuilder(this);
routeBuilder.Add(port, segment);
return routeBuilder;
}

public RoutesBuilder Route<TPort, TNext>(Func<IDualRoute, IChainablePort> port, Func<BasePipeBuilder<TPort>, BasePipeBuilder<TNext>> path)
{
var routeBuilder = new RoutesBuilder(this);
routeBuilder.Add(port, new Segment<TPort, TNext>(path));
return routeBuilder;
}

public ConvergerBuilder<TNext> Converge<TNext>()
=> new(this);
}
69 changes: 33 additions & 36 deletions Streamistry.Core/Fluent/ParserBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

namespace Streamistry.Fluent;

public class ParserBuilder<TInput, TOutput> : PipeElementBuilder<TInput, TOutput>
public class ParserBuilder<TInput, TOutput> : PipeElementBuilder<TInput, TOutput>, IBuilder<IDualRoute>
{
protected IFormatProvider? FormatProvider { get; set; }
protected ParserDelegate<TInput, TOutput> ParseFunction { get; }
Expand All @@ -25,6 +25,10 @@ public ParserBuilder<TInput, TOutput> WithFormatProvider(IFormatProvider formatP

public override IChainablePort<TOutput> OnBuildPipeElement()
=> new Parser<TInput, TOutput>(Upstream.BuildPipeElement(), ParseFunction);
IDualRoute IBuilder<IDualRoute>.BuildPipeElement()
=> base.BuildPipeElement().Pipe is IDualRoute dual ? dual : throw new InvalidCastException();
IDualRoute IBuilder<IDualRoute>.OnBuildPipeElement()
=> throw new NotImplementedException();
}

public class ParserBuilder<TInput>
Expand All @@ -47,7 +51,7 @@ public ParserBuilder<TInput> WithFormatProvider(IFormatProvider formatProvider)
}
}

public class SpecializedParserBuilder<TInput, TOutput> : PipeElementBuilder<TInput, TOutput>
public class SpecializedParserBuilder<TInput, TOutput> : PipeElementBuilder<TInput, TOutput>, IBuilder<IDualRoute>
{
protected Type Type { get; }
protected IFormatProvider? FormatProvider { get; set; }
Expand All @@ -57,45 +61,38 @@ public SpecializedParserBuilder(IPipeBuilder<TInput> upstream, Type type, IForma
=> (Type, FormatProvider) = (type, formatProvider);

public override IChainablePort<TOutput> OnBuildPipeElement()
{
return (IChainablePort<TOutput>)Activator.CreateInstance(Type, Upstream.BuildPipeElement(), FormatProvider)!;
}
=> (IChainablePort<TOutput>)Activator.CreateInstance(
Type
, Upstream.BuildPipeElement()
, FormatProvider)!;

IDualRoute IBuilder<IDualRoute>.BuildPipeElement()
=> base.BuildPipeElement().Pipe is IDualRoute dual ? dual : throw new InvalidCastException();
IDualRoute IBuilder<IDualRoute>.OnBuildPipeElement()
=> throw new NotImplementedException();

public SpecializedParserBuilder<TInput, TOutput> WithFormatProvider(IFormatProvider formatProvider)
{
FormatProvider = formatProvider;
return this;
}

public RoutesBuilder Route<TPort, TNext>(Func<IDualRoute, IChainablePort> port, Segment<TPort, TNext> segment)
{
var routeBuilder = new RoutesBuilder(this);
routeBuilder.Add(port, segment);
return routeBuilder;
}

public RoutesBuilder Route<TPort, TNext>(Func<IDualRoute, IChainablePort> port, Func<BasePipeBuilder<TPort>, BasePipeBuilder<TNext>> path)
{
var routeBuilder = new RoutesBuilder(this);
routeBuilder.Add(port, new Segment<TPort, TNext>(path));
return routeBuilder;
}

public ConvergerBuilder<TNext> Converge<TNext>()
=> new(this);
}

//internal class UniversalParserBuilder<TInput, TOutput> : PipeElementBuilder<TInput, TOutput>
//{
// protected Func<TAccumulate?, TInput?, TAccumulate?>? Accumulator { get; }
// protected Func<TAccumulate?, TOutput?>? Selector { get; set; } = x => (TOutput?)Convert.ChangeType(x, typeof(TOutput));
// protected TAccumulate? Seed { get; set; } = default;

// public UniversalParserBuilder(IPipeBuilder<TInput> upstream, Func<TAccumulate?, TInput?, TAccumulate?> accumulator)
// : base(upstream)
// => (Accumulator) = (accumulator);

// public UniversalParserBuilder<TInput, TAccumulate, TOutput> WithSelector(Func<TAccumulate?, TOutput?>? selector)
// {
// Selector = selector;
// return this;
// }

// public UniversalParserBuilder<TInput, TAccumulate, TOutput> WithSeed(TAccumulate? seed)
// {
// Seed = seed;
// return this;
// }

// public override IChainablePort<TOutput> OnBuildPipeElement()
// => new Parser<TInput, TAccumulate, TOutput>(
// Upstream.BuildPipeElement()
// , Accumulator ?? throw new InvalidOperationException()
// , Selector ?? throw new InvalidOperationException()
// , Seed
// );

//}

83 changes: 83 additions & 0 deletions Streamistry.Core/Fluent/RoutesBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Streamistry.Fluent;
public abstract class BaseRoutesBuilder : IBuilder<IChainablePort[]>
{
protected interface IRouteBuilder
{
IChainablePort Build(IDualRoute dual);
}

protected class RouteBuilder : IRouteBuilder
{
protected Func<IDualRoute, IChainablePort> Port { get; }
protected ISegment Segment { get; }

public RouteBuilder(Func<IDualRoute, IChainablePort> port, ISegment segment)
=> (Port, Segment) = (port, segment);

public IChainablePort Build(IDualRoute dual)
{
var port = Port(dual);
var (input, output) = Segment.Craft(port.Pipe.Pipeline!);
input.Bind(port);
return output;
}
}

protected IBuilder<IDualRoute> Upstream { get; }

protected IChainablePort[]? Instances { get; set; }
protected List<IRouteBuilder> RouteBuilders { get; } = [];

public BaseRoutesBuilder(IBuilder<IDualRoute> upstream)
=> (Upstream) = (upstream);

internal void Add(Func<IDualRoute, IChainablePort> port, ISegment segment)
=> RouteBuilders.Add(new RouteBuilder(port, segment));

public IChainablePort[] BuildPipeElement()
=> Instances ??= OnBuildPipeElement();

public IChainablePort[] OnBuildPipeElement()
{
var routes = new List<IChainablePort>();
var upstream = Upstream.BuildPipeElement();
foreach (var routeBuilder in RouteBuilders)
routes.Add(routeBuilder.Build(upstream));
return [.. routes];
}

public Pipeline Build()
{
BuildPipeElement();
return Instances![0].Pipe.Pipeline!;
}
}
public class RoutesBuilder : BaseRoutesBuilder
{
public RoutesBuilder(IBuilder<IDualRoute> upstream)
: base(upstream)
{ }

public RoutesBuilder Route<TInput, TOutput>(Func<IDualRoute, IChainablePort> port, Segment<TInput, TOutput> segment)
{
Add(port, segment);
return this;
}

public RoutesBuilder Route<TInput, TOutput>(Func<IDualRoute, IChainablePort> port, Func<BasePipeBuilder<TInput>, BasePipeBuilder<TOutput>> path)
{
Add(port, new Segment<TInput, TOutput>(path));
return this;
}

public UnionBuilder<TNext> Union<TNext>()
=> new(this);
}

Loading

0 comments on commit 1225e4f

Please sign in to comment.