Skip to content

Commit

Permalink
feat: add parsers supporting an exception route (#40)
Browse files Browse the repository at this point in the history
  • Loading branch information
Seddryck authored Sep 1, 2024
1 parent f18d9ff commit e90fc69
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 0 deletions.
59 changes: 59 additions & 0 deletions Streamistry.Core/Parser.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Streamistry.Observability;

namespace Streamistry;

public interface IParser<TInput, TOutput> : IChainablePipe<TOutput>, IProcessablePipe<TInput>
{ }

public delegate bool ParserDelegate<TInput, TOutput>(TInput? input, out TOutput? value);

/// <summary>
/// Represents a pipeline element that parses each element within a batch of this stream according to a specified set of rules or grammar.
/// The output stream is determined by the structured format resulting from the parsing process applied to the input elements.
/// </summary>
/// <typeparam name="TInput">The type of the elements in the input stream before parsing.</typeparam>
/// <typeparam name="TOutput">The type of the elements in the output stream after parsing, typically a structured representation of the input.</typeparam>
public abstract class Parser<TInput, TOutput> : ChainablePipe<TOutput>, IParser<TInput, TOutput>, IDualRoute<TOutput, TInput>
{
public ParserDelegate<TInput, TOutput> ParseFunction { get; init; }
public OutputPort<TInput> Alternate { get; }
public new OutputPort<TOutput> Main { get => base.Main; }

public Parser(IChainablePort<TInput> upstream, ParserDelegate<TInput, TOutput> parseFunction)
: base(upstream.Pipe.GetObservabilityProvider())
{
Alternate = new(this, "Alternate");
upstream.RegisterDownstream(Emit);
upstream.Pipe.RegisterOnCompleted(Complete);
ParseFunction = parseFunction;
}

[Meter]
public void Emit(TInput? obj)
{
if (TryInvoke(obj, out var value))
PushDownstream(value);
else
Alternate.PushDownstream(obj);
}

[Trace]
protected virtual bool TryInvoke(TInput? obj, out TOutput? value)
=> ParseFunction.Invoke(obj, out value);
}

public interface IStringParser<TOutput> : IParser<string, TOutput>
{ }

public abstract class StringParser<TOutput> : Parser<string, TOutput>, IStringParser<TOutput>
{
public StringParser(IChainablePipe<string> upstream, ParserDelegate<string, TOutput> parseFunction)
: base(upstream, parseFunction)
{ }
}
18 changes: 18 additions & 0 deletions Streamistry.Core/Pipes/Parsers/DateParser.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Text;
using System.Text.Json.Nodes;
using System.Threading.Tasks;

namespace Streamistry.Pipes.Parsers;
internal class DateParser : StringParser<DateOnly>
{
public DateParser(IChainablePipe<string> upstream, IFormatProvider? formatProvider = null)
: base(upstream, (string? x, out DateOnly y) => TryParse(x, formatProvider, out y))
{ }

private static bool TryParse(string? text, IFormatProvider? formatProvider, out DateOnly value)
=> DateOnly.TryParse(text, formatProvider ?? CultureInfo.InvariantCulture, out value);
}
18 changes: 18 additions & 0 deletions Streamistry.Core/Pipes/Parsers/DateTimeParser.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Text;
using System.Text.Json.Nodes;
using System.Threading.Tasks;

namespace Streamistry.Pipes.Parsers;
internal class DateTimeParser : StringParser<DateTime>
{
public DateTimeParser(IChainablePipe<string> upstream, IFormatProvider? formatProvider = null)
: base(upstream, (string? x, out DateTime y) => TryParse(x, formatProvider, out y))
{ }

private static bool TryParse(string? text, IFormatProvider? formatProvider, out DateTime value)
=> DateTime.TryParse(text, formatProvider ?? CultureInfo.InvariantCulture, out value);
}
78 changes: 78 additions & 0 deletions Streamistry.Testing/ParserTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NUnit.Framework;
using Streamistry.Pipes.Parsers;
using Streamistry.Pipes.Sinks;

namespace Streamistry.Testing;
public class ParserTests
{
[Test]
public void DateParserEmit_ValidData_MainRoute()
{
var pipeline = new Pipeline<string>();
var parser = new DateParser(pipeline);
var mainSink = new MemorySink<DateOnly>(parser);
pipeline.Emit("2024-08-30");
pipeline.Emit("2024-08-31");
pipeline.Emit("2024-09-01");

Assert.That(mainSink.State, Has.Count.EqualTo(3));
Assert.That(mainSink.State.First(), Is.EqualTo(new DateOnly(2024, 8, 30)));
Assert.That(mainSink.State.Last(), Is.EqualTo(new DateOnly(2024, 9, 1)));
}

[Test]
public void DateParserEmit_MixedData_MainRouteAndExceptionRoute()
{
var pipeline = new Pipeline<string>();
var parser = new DateParser(pipeline);
var mainSink = new MemorySink<DateOnly>(parser);
var exceptionSink = new MemorySink<string>(parser.Alternate);
pipeline.Emit("2024-08-30");
pipeline.Emit("2024-08-31");
pipeline.Emit("2024-08-32");

Assert.That(mainSink.State, Has.Count.EqualTo(2));
Assert.That(mainSink.State.First(), Is.EqualTo(new DateOnly(2024, 8, 30)));
Assert.That(mainSink.State.Last(), Is.EqualTo(new DateOnly(2024, 8, 31)));
Assert.That(exceptionSink.State, Has.Count.EqualTo(1));
Assert.That(exceptionSink.State.First(), Is.EqualTo("2024-08-32"));
}

[Test]
public void DateTimeParserEmit_ValidData_MainRoute()
{
var pipeline = new Pipeline<string>();
var parser = new DateTimeParser(pipeline);
var mainSink = new MemorySink<DateTime>(parser);
pipeline.Emit("2024-08-30 17:12:16");
pipeline.Emit("2024-08-31T17:12:16");
pipeline.Emit("2024-09-01 05:00AM");

Assert.That(mainSink.State, Has.Count.EqualTo(3));
Assert.That(mainSink.State.First(), Is.EqualTo(new DateTime(2024, 8, 30, 17, 12, 16)));
Assert.That(mainSink.State.Last(), Is.EqualTo(new DateTime(2024, 9, 1, 5, 0, 0)));
}

[Test]
public void DateTimeParserEmit_MixedData_MainRouteAndExceptionRoute()
{
var pipeline = new Pipeline<string>();
var parser = new DateTimeParser(pipeline);
var mainSink = new MemorySink<DateTime>(parser);
var exceptionSink = new MemorySink<string>(parser.Alternate);
pipeline.Emit("2024-08-30 17:12:16");
pipeline.Emit("2024-08-31 25:62:41");
pipeline.Emit("2024-09-01 05:00AM");

Assert.That(mainSink.State, Has.Count.EqualTo(2));
Assert.That(mainSink.State.First(), Is.EqualTo(new DateTime(2024, 8, 30, 17, 12, 16)));
Assert.That(mainSink.State.Last(), Is.EqualTo(new DateTime(2024, 9, 1, 5, 0, 0)));
Assert.That(exceptionSink.State, Has.Count.EqualTo(1));
Assert.That(exceptionSink.State.First(), Is.EqualTo("2024-08-31 25:62:41"));
}
}

0 comments on commit e90fc69

Please sign in to comment.