Skip to content

Commit

Permalink
feat: filters for null and non-null values (#106)
Browse files Browse the repository at this point in the history
* feat: filters for null and non-null values
* refactor: review nullability of many methods
* ci: calculate coverage on the tested library
  • Loading branch information
Seddryck authored Sep 26, 2024
1 parent 5257797 commit c848cac
Show file tree
Hide file tree
Showing 34 changed files with 236 additions and 121 deletions.
10 changes: 5 additions & 5 deletions Streamistry.Core/Aggregator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ namespace Streamistry;
/// <typeparam name="TResult">The type of the elements in the output stream, determined by applying a selection function to the accumulated state.</typeparam>
public class Aggregator<TSource, TAccumulate, TResult> : SingleRouterPipe<TSource, TResult>
{
public Func<TAccumulate?, TSource?, TAccumulate?> Accumulator { get; }
public Func<TAccumulate?, TResult?> Selector { get; }
public Func<TAccumulate, TSource, TAccumulate> Accumulator { get; }
public Func<TAccumulate, TResult> Selector { get; }
public TAccumulate? State { get; set; }
private TAccumulate? Seed { get; }

public Aggregator(IChainablePort<TSource>? upstream, Func<TAccumulate?, TSource?, TAccumulate?> accumulator, Func<TAccumulate?, TResult?> selector, TAccumulate? seed = default, Expression<Action<Aggregator<TSource, TAccumulate, TResult>>>? completion = null)
public Aggregator(IChainablePort<TSource>? upstream, Func<TAccumulate, TSource, TAccumulate> accumulator, Func<TAccumulate, TResult> selector, TAccumulate? seed = default, Expression<Action<Aggregator<TSource, TAccumulate, TResult>>>? completion = null)
: base(upstream)
{
(Accumulator, Selector, State, Seed) = (accumulator, selector, seed, seed);
Expand All @@ -32,9 +32,9 @@ public Aggregator(IChainablePort<TSource>? upstream, Func<TAccumulate?, TSource?
}

[Trace]
protected override TResult? Invoke(TSource? obj)
protected override TResult Invoke(TSource obj)
{
State = Accumulator.Invoke(State, obj);
State = Accumulator.Invoke(State!, obj);
return Selector.Invoke(State);
}

Expand Down
6 changes: 3 additions & 3 deletions Streamistry.Core/ChainablePipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ protected ChainablePipe(IChainablePipe? upstream)
Pipeline = upstream.Pipeline;
}

public void RegisterDownstream(Action<T?> downstream, Action? completion)
public void RegisterDownstream(Action<T> downstream, Action? completion)
{
RegisterDownstream(downstream);
RegisterOnCompleted(completion);
Expand All @@ -39,10 +39,10 @@ public void RegisterDownstream(Action<T?> downstream, Action? completion)
public void RegisterOnCompleted(Action? action)
=> Completion += action;

public void RegisterDownstream(Action<T?> action)
public void RegisterDownstream(Action<T> action)
=> Main.RegisterDownstream(action);

public void UnregisterDownstream(Action<T?> downstream)
public void UnregisterDownstream(Action<T> downstream)
=> Main.UnregisterDownstream(downstream);

protected void PushDownstream(T? obj)
Expand Down
2 changes: 1 addition & 1 deletion Streamistry.Core/DualRouterPipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public DualRouterPipe(IChainablePort<TInput>? upstream)
}

[Meter]
public abstract void Emit(TInput? obj);
public abstract void Emit(TInput obj);

public void Bind(IChainablePort<TInput> input)
{
Expand Down
10 changes: 5 additions & 5 deletions Streamistry.Core/ExceptionMapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,22 @@
namespace Streamistry;
public class ExceptionMapper<TInput, TOutput> : ExceptionRouterPipe<TInput, TOutput>
{
public Func<TInput?, TOutput?> Function { get; init; }
public Func<TInput, TOutput> Function { get; init; }

protected ExceptionMapper(Func<TInput?, TOutput?> function, IChainablePort<TInput> ? upstream)
protected ExceptionMapper(Func<TInput, TOutput> function, IChainablePort<TInput> ? upstream)
: base(upstream)
{
Function = function;
}

public ExceptionMapper(IChainablePort<TInput> upstream, Func<TInput?, TOutput?> function)
public ExceptionMapper(IChainablePort<TInput> upstream, Func<TInput, TOutput> function)
: this(function, upstream)
{ }

public ExceptionMapper(Func<TInput?, TOutput?> function)
public ExceptionMapper(Func<TInput, TOutput> function)
: this(function, null)
{ }

protected override TOutput? Invoke(TInput? obj)
protected override TOutput Invoke(TInput obj)
=> Function.Invoke(obj);
}
8 changes: 4 additions & 4 deletions Streamistry.Core/ExceptionRouterPipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public ExceptionRouterPipe(IChainablePort<TInput>? upstream)
{ }

[Trace]
public override void Emit(TInput? obj)
public override void Emit(TInput obj)
{
if (TryCatchInvoke(obj, out var value, out var exception))
PushDownstream(value);
Expand All @@ -23,9 +23,9 @@ public override void Emit(TInput? obj)
}

[Trace]
protected virtual bool TryCatchInvoke(TInput? obj, out TOutput? value, out Exception? ex)
protected virtual bool TryCatchInvoke(TInput obj, out TOutput value, out Exception? ex)
{
value = default;
value = default!;
ex = null;
try
{
Expand All @@ -40,5 +40,5 @@ protected virtual bool TryCatchInvoke(TInput? obj, out TOutput? value, out Excep
}

[Trace]
protected abstract TOutput? Invoke(TInput? obj);
protected abstract TOutput Invoke(TInput obj);
}
65 changes: 56 additions & 9 deletions Streamistry.Core/Filter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,78 @@ namespace Streamistry;
/// The output stream is composed of elements that satisfy the predicate; elements that do not satisfy the predicate are excluded from the downstream stream.
/// </summary>
/// <typeparam name="TInput">The type of the elements in both the input and output streams.</typeparam>
public class Filter<TInput> : BaseSingleRouterPipe<TInput, TInput>
public class Filter<TInput> : BaseFilter<TInput, TInput>
{
public Func<TInput?, bool> Predicate { get; init; }

public Filter(Func<TInput?, bool> predicate)
public Filter(Func<TInput, bool> predicate)
: this(predicate, null)
{ }

public Filter(IChainablePort<TInput> upstream, Func<TInput?, bool> predicate)
public Filter(IChainablePort<TInput> upstream, Func<TInput, bool> predicate)
: this(predicate, upstream)
{ }

public Filter(Func<TInput?, bool> predicate, IChainablePort<TInput>? upstream = null)
: base(upstream)
public Filter(Func<TInput, bool> predicate, IChainablePort<TInput>? upstream = null)
: base(predicate, upstream)
{
Predicate = predicate;
}

public override void Emit(TInput? obj)
[Meter]
public override void Emit(TInput obj)
{
if (Invoke(obj))
PushDownstream(obj);
}
}

public class FilterNull<TInput> : BaseFilter<TInput?, object?>
{
public FilterNull(IChainablePort<TInput?>? upstream = null)
: base((x) => x is null, upstream)
{ }

[Meter]
public override void Emit(TInput? obj)
{
if (Invoke(obj))
PushDownstream(default);
}
}

public class FilterNotNull<TInput, TOutput> : BaseFilter<TInput, TOutput>
where TOutput : notnull
{
public FilterNotNull(IChainablePort<TInput>? upstream = null)
: base((x) => x is not null, upstream)
{ }

[Meter]
public override void Emit(TInput obj)
{
if (Invoke(obj))
PushDownstream(obj is TOutput output ? output : throw new InvalidCastException());
}
}

public abstract class BaseFilter<TInput, TOutput> : BaseSingleRouterPipe<TInput, TOutput>
{
public Func<TInput, bool> Predicate { get; init; }

public BaseFilter(Func<TInput, bool> predicate)
: this(predicate, null)
{ }

public BaseFilter(IChainablePort<TInput> upstream, Func<TInput, bool> predicate)
: this(predicate, upstream)
{ }

public BaseFilter(Func<TInput, bool> predicate, IChainablePort<TInput>? upstream = null)
: base(upstream)
{
Predicate = predicate;
}

[Trace]
protected bool Invoke(TInput? input)
protected bool Invoke(TInput input)
=> Predicate.Invoke(input);
}
12 changes: 6 additions & 6 deletions Streamistry.Core/Fluent/AggregatorBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,21 @@ public override IChainablePort<TOutput> OnBuildPipeElement()

public class UniversalAggregatorBuilder<TInput, TAccumulate, TOutput> : PipeElementBuilder<TInput, TOutput>
{
protected Func<TAccumulate?, TInput?, TAccumulate?>? Accumulator { get; }
protected Func<TAccumulate?, TOutput?>? Selector { get; set; } = x => (TOutput?)Convert.ChangeType(x, typeof(TOutput));
protected TAccumulate? Seed { get; set; } = default;
protected Func<TAccumulate, TInput, TAccumulate>? Accumulator { get; }
protected Func<TAccumulate, TOutput>? Selector { get; set; } = x => (TOutput)Convert.ChangeType(x, typeof(TOutput))!;
protected TAccumulate Seed { get; set; } = default!;

public UniversalAggregatorBuilder(IPipeBuilder<TInput> upstream, Func<TAccumulate?, TInput?, TAccumulate?> accumulator)
public UniversalAggregatorBuilder(IPipeBuilder<TInput> upstream, Func<TAccumulate, TInput, TAccumulate> accumulator)
: base(upstream)
=> (Accumulator) = (accumulator);

public UniversalAggregatorBuilder<TInput, TAccumulate, TOutput> WithSelector(Func<TAccumulate?, TOutput?>? selector)
public UniversalAggregatorBuilder<TInput, TAccumulate, TOutput> WithSelector(Func<TAccumulate, TOutput>? selector)
{
Selector = selector;
return this;
}

public UniversalAggregatorBuilder<TInput, TAccumulate, TOutput> WithSeed(TAccumulate? seed)
public UniversalAggregatorBuilder<TInput, TAccumulate, TOutput> WithSeed(TAccumulate seed)
{
Seed = seed;
return this;
Expand Down
21 changes: 14 additions & 7 deletions Streamistry.Core/Fluent/BasePipeBuilder.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Data.SqlTypes;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
Expand Down Expand Up @@ -31,24 +32,29 @@ public BasePipeBuilder<TOutput> Checkpoint(out IChainablePort<TOutput> port)
public SinkBuilder<TOutput> Sink()
=> new(this);

public MapperBuilder<TOutput, TNext> Map<TNext>(Func<TOutput?, TNext?>? function)
public MapperBuilder<TOutput, TNext> Map<TNext>(Func<TOutput, TNext>? function)
=> new(this, function);
public FilterBuilder<TOutput> Filter(Func<TOutput?, bool>? function)
public FilterBuilder<TOutput> Filter(Func<TOutput, bool>? function)
=> new(this, function);
public PluckerBuilder<TOutput, TNext> Pluck<TNext>(Expression<Func<TOutput, TNext?>> expr)
public FilterNullBuilder<TOutput> IsNull()
=> new(this);
public FilterNotNullBuilder<TOutput, TNext> IsNotNull<TNext>()
where TNext : notnull
=> new(this);
public PluckerBuilder<TOutput, TNext> Pluck<TNext>(Expression<Func<TOutput, TNext>> expr)
=> new(this, expr);
public CasterBuilder<TOutput, TNext> Cast<TNext>()
=> new(this);
public ConstantBuilder<TOutput, TNext> Constant<TNext>(TNext value)
=> new(this, value);
public SplitterBuilder<TOutput, TNext> Split<TNext>(Func<TOutput?, TNext[]?>? function)
public SplitterBuilder<TOutput, TNext> Split<TNext>(Func<TOutput, TNext[]>? function)
=> new(this, function);

public UniversalAggregatorBuilder<TOutput, TAccumulate, TNext> Aggregate<TAccumulate, TNext>(Func<TAccumulate?, TOutput?, TAccumulate?> accumulator)
public UniversalAggregatorBuilder<TOutput, TAccumulate, TNext> Aggregate<TAccumulate, TNext>(Func<TAccumulate, TOutput, TAccumulate> accumulator)
=> new(this, accumulator);
public UniversalAggregatorBuilder<TOutput, TNext, TNext> Aggregate<TNext>(Func<TNext?, TOutput?, TNext?> accumulator)
public UniversalAggregatorBuilder<TOutput, TNext, TNext> Aggregate<TNext>(Func<TNext, TOutput, TNext> accumulator)
=> new(this, accumulator);
public UniversalAggregatorBuilder<TOutput, TOutput, TOutput> Aggregate(Func<TOutput?, TOutput?, TOutput?> accumulator)
public UniversalAggregatorBuilder<TOutput, TOutput, TOutput> Aggregate(Func<TOutput, TOutput, TOutput> accumulator)
=> new(this, accumulator);
public AggregatorBuilder<TOutput, TOutput, TOutput> Aggregate()
=> new(this);
Expand All @@ -61,3 +67,4 @@ public ParserBuilder<TOutput> Parse()
public BinderBuilder<TOutput, TNext> Bind<TNext>(Segment<TOutput, TNext> segment)
=> new(this, segment);
}

29 changes: 27 additions & 2 deletions Streamistry.Core/Fluent/FilterBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
namespace Streamistry.Fluent;
public class FilterBuilder<TInput> : PipeElementBuilder<TInput, TInput>, IPipeBuilder<TInput>
{
protected Func<TInput?, bool>? Function { get; }
protected Func<TInput, bool>? Function { get; }

public FilterBuilder(IPipeBuilder<TInput> upstream, Func<TInput?, bool>? function)
public FilterBuilder(IPipeBuilder<TInput> upstream, Func<TInput, bool>? function)
:base(upstream)
=> (Function) = (function);

Expand All @@ -19,3 +19,28 @@ public override IChainablePort<TInput> OnBuildPipeElement()
, Function ?? throw new InvalidOperationException()
);
}

public class FilterNullBuilder<TInput> : PipeElementBuilder<TInput, object?>, IPipeBuilder<object?>
{
public FilterNullBuilder(IPipeBuilder<TInput> upstream)
: base(upstream)
{ }

public override IChainablePort<object?> OnBuildPipeElement()
=> new FilterNull<TInput>(
Upstream.BuildPipeElement()!
);
}

public class FilterNotNullBuilder<TInput, TOutput> : PipeElementBuilder<TInput, TOutput>, IPipeBuilder<TOutput>
where TOutput: notnull
{
public FilterNotNullBuilder(IPipeBuilder<TInput> upstream)
: base(upstream)
{ }

public override IChainablePort<TOutput> OnBuildPipeElement()
=> new FilterNotNull<TInput, TOutput>(
Upstream.BuildPipeElement()
);
}
4 changes: 2 additions & 2 deletions Streamistry.Core/Fluent/MapperBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
namespace Streamistry.Fluent;
public class MapperBuilder<TInput, TOutput> : PipeElementBuilder<TInput, TOutput>, ISafeBuilder<MapperBuilder<TInput, TOutput>>
{
protected Func<TInput?, TOutput?>? Function { get; set; }
protected Func<TInput, TOutput>? Function { get; set; }
private bool IsSafe { get; set; } = false;

public MapperBuilder(IPipeBuilder<TInput> upstream, Func<TInput?, TOutput?>? function)
public MapperBuilder(IPipeBuilder<TInput> upstream, Func<TInput, TOutput>? function)
: base(upstream)
=> (Function) = (function);

Expand Down
4 changes: 2 additions & 2 deletions Streamistry.Core/Fluent/PluckerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
namespace Streamistry.Fluent;
public class PluckerBuilder<TInput, TOutput> : PipeElementBuilder<TInput, TOutput>
{
protected Expression<Func<TInput, TOutput?>> Expr { get; set; }
protected Expression<Func<TInput, TOutput>> Expr { get; set; }

public PluckerBuilder(IPipeBuilder<TInput> upstream, Expression<Func<TInput, TOutput?>> expr)
public PluckerBuilder(IPipeBuilder<TInput> upstream, Expression<Func<TInput, TOutput>> expr)
: base(upstream)
=> (Expr) = (expr);

Expand Down
4 changes: 2 additions & 2 deletions Streamistry.Core/Fluent/Segment.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ public IChainablePipe Pipe
public IBindablePipe<T> GetTarget()
=> Target ?? throw new InvalidOperationException();

public void RegisterDownstream(Action<T?> downstream)
public void RegisterDownstream(Action<T> downstream)
=> Target ??= (IBindablePipe<T>)downstream.Target!;

public void UnregisterDownstream(Action<T?> downstream)
public void UnregisterDownstream(Action<T> downstream)
=> Target ??= (IBindablePipe<T>)downstream.Target!;


Expand Down
4 changes: 2 additions & 2 deletions Streamistry.Core/Fluent/SplitterBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
namespace Streamistry.Fluent;
public class SplitterBuilder<TInput, TOutput> : PipeElementBuilder<TInput, TOutput>
{
protected Func<TInput?, TOutput[]?>? Function { get; set; }
protected Func<TInput, TOutput[]>? Function { get; set; }

public SplitterBuilder(IPipeBuilder<TInput> upstream, Func<TInput?, TOutput[]?>? function)
public SplitterBuilder(IPipeBuilder<TInput> upstream, Func<TInput, TOutput[]>? function)
: base(upstream)
=> (Function) = (function);

Expand Down
6 changes: 3 additions & 3 deletions Streamistry.Core/IChainablePipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
namespace Streamistry;
public interface IChainablePipe<T> : IChainablePort<T>, IChainablePipe
{
void RegisterDownstream(Action<T?> downstream, Action? complete);
void RegisterDownstream(Action<T> downstream, Action? complete);
}

public interface IChainablePipe : IObservablePipe
Expand All @@ -18,8 +18,8 @@ public interface IChainablePipe : IObservablePipe

public interface IChainablePort<T> : IChainablePort
{
void RegisterDownstream(Action<T?> action);
void UnregisterDownstream(Action<T?> downstream);
void RegisterDownstream(Action<T> action);
void UnregisterDownstream(Action<T> downstream);
}

public interface IChainablePort
Expand Down
2 changes: 1 addition & 1 deletion Streamistry.Core/IProcessablePipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ public interface IBindablePipe<T>

public interface IProcessablePipe<T>
{
void Emit(T? obj);
void Emit(T obj);
}
Loading

0 comments on commit c848cac

Please sign in to comment.