diff --git a/Streamistry.Core/Aggregator.cs b/Streamistry.Core/Aggregator.cs
index d04b17b..327a67b 100644
--- a/Streamistry.Core/Aggregator.cs
+++ b/Streamistry.Core/Aggregator.cs
@@ -18,12 +18,12 @@ namespace Streamistry;
/// The type of the elements in the output stream, determined by applying a selection function to the accumulated state.
public class Aggregator : SingleRouterPipe
{
- public Func Accumulator { get; }
- public Func Selector { get; }
+ public Func Accumulator { get; }
+ public Func Selector { get; }
public TAccumulate? State { get; set; }
private TAccumulate? Seed { get; }
- public Aggregator(IChainablePort? upstream, Func accumulator, Func selector, TAccumulate? seed = default, Expression>>? completion = null)
+ public Aggregator(IChainablePort? upstream, Func accumulator, Func selector, TAccumulate? seed = default, Expression>>? completion = null)
: base(upstream)
{
(Accumulator, Selector, State, Seed) = (accumulator, selector, seed, seed);
@@ -32,9 +32,9 @@ public Aggregator(IChainablePort? upstream, Func downstream, Action? completion)
+ public void RegisterDownstream(Action downstream, Action? completion)
{
RegisterDownstream(downstream);
RegisterOnCompleted(completion);
@@ -39,10 +39,10 @@ public void RegisterDownstream(Action downstream, Action? completion)
public void RegisterOnCompleted(Action? action)
=> Completion += action;
- public void RegisterDownstream(Action action)
+ public void RegisterDownstream(Action action)
=> Main.RegisterDownstream(action);
- public void UnregisterDownstream(Action downstream)
+ public void UnregisterDownstream(Action downstream)
=> Main.UnregisterDownstream(downstream);
protected void PushDownstream(T? obj)
diff --git a/Streamistry.Core/DualRouterPipe.cs b/Streamistry.Core/DualRouterPipe.cs
index e7f1103..28ea4fc 100644
--- a/Streamistry.Core/DualRouterPipe.cs
+++ b/Streamistry.Core/DualRouterPipe.cs
@@ -21,7 +21,7 @@ public DualRouterPipe(IChainablePort? upstream)
}
[Meter]
- public abstract void Emit(TInput? obj);
+ public abstract void Emit(TInput obj);
public void Bind(IChainablePort input)
{
diff --git a/Streamistry.Core/ExceptionMapper.cs b/Streamistry.Core/ExceptionMapper.cs
index b49e77f..f7ba853 100644
--- a/Streamistry.Core/ExceptionMapper.cs
+++ b/Streamistry.Core/ExceptionMapper.cs
@@ -9,22 +9,22 @@
namespace Streamistry;
public class ExceptionMapper : ExceptionRouterPipe
{
- public Func Function { get; init; }
+ public Func Function { get; init; }
- protected ExceptionMapper(Func function, IChainablePort ? upstream)
+ protected ExceptionMapper(Func function, IChainablePort ? upstream)
: base(upstream)
{
Function = function;
}
- public ExceptionMapper(IChainablePort upstream, Func function)
+ public ExceptionMapper(IChainablePort upstream, Func function)
: this(function, upstream)
{ }
- public ExceptionMapper(Func function)
+ public ExceptionMapper(Func function)
: this(function, null)
{ }
- protected override TOutput? Invoke(TInput? obj)
+ protected override TOutput Invoke(TInput obj)
=> Function.Invoke(obj);
}
diff --git a/Streamistry.Core/ExceptionRouterPipe.cs b/Streamistry.Core/ExceptionRouterPipe.cs
index fb35334..44c28ee 100644
--- a/Streamistry.Core/ExceptionRouterPipe.cs
+++ b/Streamistry.Core/ExceptionRouterPipe.cs
@@ -14,7 +14,7 @@ public ExceptionRouterPipe(IChainablePort? upstream)
{ }
[Trace]
- public override void Emit(TInput? obj)
+ public override void Emit(TInput obj)
{
if (TryCatchInvoke(obj, out var value, out var exception))
PushDownstream(value);
@@ -23,9 +23,9 @@ public override void Emit(TInput? obj)
}
[Trace]
- protected virtual bool TryCatchInvoke(TInput? obj, out TOutput? value, out Exception? ex)
+ protected virtual bool TryCatchInvoke(TInput obj, out TOutput value, out Exception? ex)
{
- value = default;
+ value = default!;
ex = null;
try
{
@@ -40,5 +40,5 @@ protected virtual bool TryCatchInvoke(TInput? obj, out TOutput? value, out Excep
}
[Trace]
- protected abstract TOutput? Invoke(TInput? obj);
+ protected abstract TOutput Invoke(TInput obj);
}
diff --git a/Streamistry.Core/Filter.cs b/Streamistry.Core/Filter.cs
index eebc8e7..34a3916 100644
--- a/Streamistry.Core/Filter.cs
+++ b/Streamistry.Core/Filter.cs
@@ -13,31 +13,78 @@ namespace Streamistry;
/// The output stream is composed of elements that satisfy the predicate; elements that do not satisfy the predicate are excluded from the downstream stream.
///
/// The type of the elements in both the input and output streams.
-public class Filter : BaseSingleRouterPipe
+public class Filter : BaseFilter
{
- public Func Predicate { get; init; }
-
- public Filter(Func predicate)
+ public Filter(Func predicate)
: this(predicate, null)
{ }
- public Filter(IChainablePort upstream, Func predicate)
+ public Filter(IChainablePort upstream, Func predicate)
: this(predicate, upstream)
{ }
- public Filter(Func predicate, IChainablePort? upstream = null)
- : base(upstream)
+ public Filter(Func predicate, IChainablePort? upstream = null)
+ : base(predicate, upstream)
{
Predicate = predicate;
}
- public override void Emit(TInput? obj)
+ [Meter]
+ public override void Emit(TInput obj)
{
if (Invoke(obj))
PushDownstream(obj);
}
+}
+
+public class FilterNull : BaseFilter
+{
+ public FilterNull(IChainablePort? upstream = null)
+ : base((x) => x is null, upstream)
+ { }
+
+ [Meter]
+ public override void Emit(TInput? obj)
+ {
+ if (Invoke(obj))
+ PushDownstream(default);
+ }
+}
+
+public class FilterNotNull : BaseFilter
+ where TOutput : notnull
+{
+ public FilterNotNull(IChainablePort? upstream = null)
+ : base((x) => x is not null, upstream)
+ { }
+
+ [Meter]
+ public override void Emit(TInput obj)
+ {
+ if (Invoke(obj))
+ PushDownstream(obj is TOutput output ? output : throw new InvalidCastException());
+ }
+}
+
+public abstract class BaseFilter : BaseSingleRouterPipe
+{
+ public Func Predicate { get; init; }
+
+ public BaseFilter(Func predicate)
+ : this(predicate, null)
+ { }
+
+ public BaseFilter(IChainablePort upstream, Func predicate)
+ : this(predicate, upstream)
+ { }
+
+ public BaseFilter(Func predicate, IChainablePort? upstream = null)
+ : base(upstream)
+ {
+ Predicate = predicate;
+ }
[Trace]
- protected bool Invoke(TInput? input)
+ protected bool Invoke(TInput input)
=> Predicate.Invoke(input);
}
diff --git a/Streamistry.Core/Fluent/AggregatorBuilder.cs b/Streamistry.Core/Fluent/AggregatorBuilder.cs
index 0865f7f..28dbe20 100644
--- a/Streamistry.Core/Fluent/AggregatorBuilder.cs
+++ b/Streamistry.Core/Fluent/AggregatorBuilder.cs
@@ -45,21 +45,21 @@ public override IChainablePort OnBuildPipeElement()
public class UniversalAggregatorBuilder : PipeElementBuilder
{
- protected Func? Accumulator { get; }
- protected Func? Selector { get; set; } = x => (TOutput?)Convert.ChangeType(x, typeof(TOutput));
- protected TAccumulate? Seed { get; set; } = default;
+ protected Func? Accumulator { get; }
+ protected Func? Selector { get; set; } = x => (TOutput)Convert.ChangeType(x, typeof(TOutput))!;
+ protected TAccumulate Seed { get; set; } = default!;
- public UniversalAggregatorBuilder(IPipeBuilder upstream, Func accumulator)
+ public UniversalAggregatorBuilder(IPipeBuilder upstream, Func accumulator)
: base(upstream)
=> (Accumulator) = (accumulator);
- public UniversalAggregatorBuilder WithSelector(Func? selector)
+ public UniversalAggregatorBuilder WithSelector(Func? selector)
{
Selector = selector;
return this;
}
- public UniversalAggregatorBuilder WithSeed(TAccumulate? seed)
+ public UniversalAggregatorBuilder WithSeed(TAccumulate seed)
{
Seed = seed;
return this;
diff --git a/Streamistry.Core/Fluent/BasePipeBuilder.cs b/Streamistry.Core/Fluent/BasePipeBuilder.cs
index 762cf71..35000e5 100644
--- a/Streamistry.Core/Fluent/BasePipeBuilder.cs
+++ b/Streamistry.Core/Fluent/BasePipeBuilder.cs
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
+using System.Data.SqlTypes;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
@@ -31,24 +32,29 @@ public BasePipeBuilder Checkpoint(out IChainablePort port)
public SinkBuilder Sink()
=> new(this);
- public MapperBuilder Map(Func? function)
+ public MapperBuilder Map(Func? function)
=> new(this, function);
- public FilterBuilder Filter(Func? function)
+ public FilterBuilder Filter(Func? function)
=> new(this, function);
- public PluckerBuilder Pluck(Expression> expr)
+ public FilterNullBuilder IsNull()
+ => new(this);
+ public FilterNotNullBuilder IsNotNull()
+ where TNext : notnull
+ => new(this);
+ public PluckerBuilder Pluck(Expression> expr)
=> new(this, expr);
public CasterBuilder Cast()
=> new(this);
public ConstantBuilder Constant(TNext value)
=> new(this, value);
- public SplitterBuilder Split(Func? function)
+ public SplitterBuilder Split(Func? function)
=> new(this, function);
- public UniversalAggregatorBuilder Aggregate(Func accumulator)
+ public UniversalAggregatorBuilder Aggregate(Func accumulator)
=> new(this, accumulator);
- public UniversalAggregatorBuilder Aggregate(Func accumulator)
+ public UniversalAggregatorBuilder Aggregate(Func accumulator)
=> new(this, accumulator);
- public UniversalAggregatorBuilder Aggregate(Func accumulator)
+ public UniversalAggregatorBuilder Aggregate(Func accumulator)
=> new(this, accumulator);
public AggregatorBuilder Aggregate()
=> new(this);
@@ -61,3 +67,4 @@ public ParserBuilder Parse()
public BinderBuilder Bind(Segment segment)
=> new(this, segment);
}
+
diff --git a/Streamistry.Core/Fluent/FilterBuilder.cs b/Streamistry.Core/Fluent/FilterBuilder.cs
index 90651fa..ca90c91 100644
--- a/Streamistry.Core/Fluent/FilterBuilder.cs
+++ b/Streamistry.Core/Fluent/FilterBuilder.cs
@@ -7,9 +7,9 @@
namespace Streamistry.Fluent;
public class FilterBuilder : PipeElementBuilder, IPipeBuilder
{
- protected Func? Function { get; }
+ protected Func? Function { get; }
- public FilterBuilder(IPipeBuilder upstream, Func? function)
+ public FilterBuilder(IPipeBuilder upstream, Func? function)
:base(upstream)
=> (Function) = (function);
@@ -19,3 +19,28 @@ public override IChainablePort OnBuildPipeElement()
, Function ?? throw new InvalidOperationException()
);
}
+
+public class FilterNullBuilder : PipeElementBuilder, IPipeBuilder