From e90fc696d7f11bea3d9daa6e74c67ccd2bce0bee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9dric=20L=2E=20Charlier?= Date: Sun, 1 Sep 2024 12:51:58 +0200 Subject: [PATCH] feat: add parsers supporting an exception route (#40) --- Streamistry.Core/Parser.cs | 59 ++++++++++++++ Streamistry.Core/Pipes/Parsers/DateParser.cs | 18 +++++ .../Pipes/Parsers/DateTimeParser.cs | 18 +++++ Streamistry.Testing/ParserTests.cs | 78 +++++++++++++++++++ 4 files changed, 173 insertions(+) create mode 100644 Streamistry.Core/Parser.cs create mode 100644 Streamistry.Core/Pipes/Parsers/DateParser.cs create mode 100644 Streamistry.Core/Pipes/Parsers/DateTimeParser.cs create mode 100644 Streamistry.Testing/ParserTests.cs diff --git a/Streamistry.Core/Parser.cs b/Streamistry.Core/Parser.cs new file mode 100644 index 0000000..415ed99 --- /dev/null +++ b/Streamistry.Core/Parser.cs @@ -0,0 +1,59 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Streamistry.Observability; + +namespace Streamistry; + +public interface IParser : IChainablePipe, IProcessablePipe +{ } + +public delegate bool ParserDelegate(TInput? input, out TOutput? value); + +/// +/// Represents a pipeline element that parses each element within a batch of this stream according to a specified set of rules or grammar. +/// The output stream is determined by the structured format resulting from the parsing process applied to the input elements. +/// +/// The type of the elements in the input stream before parsing. +/// The type of the elements in the output stream after parsing, typically a structured representation of the input. +public abstract class Parser : ChainablePipe, IParser, IDualRoute +{ + public ParserDelegate ParseFunction { get; init; } + public OutputPort Alternate { get; } + public new OutputPort Main { get => base.Main; } + + public Parser(IChainablePort upstream, ParserDelegate parseFunction) + : base(upstream.Pipe.GetObservabilityProvider()) + { + Alternate = new(this, "Alternate"); + upstream.RegisterDownstream(Emit); + upstream.Pipe.RegisterOnCompleted(Complete); + ParseFunction = parseFunction; + } + + [Meter] + public void Emit(TInput? obj) + { + if (TryInvoke(obj, out var value)) + PushDownstream(value); + else + Alternate.PushDownstream(obj); + } + + [Trace] + protected virtual bool TryInvoke(TInput? obj, out TOutput? value) + => ParseFunction.Invoke(obj, out value); +} + +public interface IStringParser : IParser +{ } + +public abstract class StringParser : Parser, IStringParser +{ + public StringParser(IChainablePipe upstream, ParserDelegate parseFunction) + : base(upstream, parseFunction) + { } +} diff --git a/Streamistry.Core/Pipes/Parsers/DateParser.cs b/Streamistry.Core/Pipes/Parsers/DateParser.cs new file mode 100644 index 0000000..f1de3ba --- /dev/null +++ b/Streamistry.Core/Pipes/Parsers/DateParser.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using System.Globalization; +using System.Linq; +using System.Text; +using System.Text.Json.Nodes; +using System.Threading.Tasks; + +namespace Streamistry.Pipes.Parsers; +internal class DateParser : StringParser +{ + public DateParser(IChainablePipe upstream, IFormatProvider? formatProvider = null) + : base(upstream, (string? x, out DateOnly y) => TryParse(x, formatProvider, out y)) + { } + + private static bool TryParse(string? text, IFormatProvider? formatProvider, out DateOnly value) + => DateOnly.TryParse(text, formatProvider ?? CultureInfo.InvariantCulture, out value); +} diff --git a/Streamistry.Core/Pipes/Parsers/DateTimeParser.cs b/Streamistry.Core/Pipes/Parsers/DateTimeParser.cs new file mode 100644 index 0000000..48ec701 --- /dev/null +++ b/Streamistry.Core/Pipes/Parsers/DateTimeParser.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using System.Globalization; +using System.Linq; +using System.Text; +using System.Text.Json.Nodes; +using System.Threading.Tasks; + +namespace Streamistry.Pipes.Parsers; +internal class DateTimeParser : StringParser +{ + public DateTimeParser(IChainablePipe upstream, IFormatProvider? formatProvider = null) + : base(upstream, (string? x, out DateTime y) => TryParse(x, formatProvider, out y)) + { } + + private static bool TryParse(string? text, IFormatProvider? formatProvider, out DateTime value) + => DateTime.TryParse(text, formatProvider ?? CultureInfo.InvariantCulture, out value); +} diff --git a/Streamistry.Testing/ParserTests.cs b/Streamistry.Testing/ParserTests.cs new file mode 100644 index 0000000..4ba4dc9 --- /dev/null +++ b/Streamistry.Testing/ParserTests.cs @@ -0,0 +1,78 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using NUnit.Framework; +using Streamistry.Pipes.Parsers; +using Streamistry.Pipes.Sinks; + +namespace Streamistry.Testing; +public class ParserTests +{ + [Test] + public void DateParserEmit_ValidData_MainRoute() + { + var pipeline = new Pipeline(); + var parser = new DateParser(pipeline); + var mainSink = new MemorySink(parser); + pipeline.Emit("2024-08-30"); + pipeline.Emit("2024-08-31"); + pipeline.Emit("2024-09-01"); + + Assert.That(mainSink.State, Has.Count.EqualTo(3)); + Assert.That(mainSink.State.First(), Is.EqualTo(new DateOnly(2024, 8, 30))); + Assert.That(mainSink.State.Last(), Is.EqualTo(new DateOnly(2024, 9, 1))); + } + + [Test] + public void DateParserEmit_MixedData_MainRouteAndExceptionRoute() + { + var pipeline = new Pipeline(); + var parser = new DateParser(pipeline); + var mainSink = new MemorySink(parser); + var exceptionSink = new MemorySink(parser.Alternate); + pipeline.Emit("2024-08-30"); + pipeline.Emit("2024-08-31"); + pipeline.Emit("2024-08-32"); + + Assert.That(mainSink.State, Has.Count.EqualTo(2)); + Assert.That(mainSink.State.First(), Is.EqualTo(new DateOnly(2024, 8, 30))); + Assert.That(mainSink.State.Last(), Is.EqualTo(new DateOnly(2024, 8, 31))); + Assert.That(exceptionSink.State, Has.Count.EqualTo(1)); + Assert.That(exceptionSink.State.First(), Is.EqualTo("2024-08-32")); + } + + [Test] + public void DateTimeParserEmit_ValidData_MainRoute() + { + var pipeline = new Pipeline(); + var parser = new DateTimeParser(pipeline); + var mainSink = new MemorySink(parser); + pipeline.Emit("2024-08-30 17:12:16"); + pipeline.Emit("2024-08-31T17:12:16"); + pipeline.Emit("2024-09-01 05:00AM"); + + Assert.That(mainSink.State, Has.Count.EqualTo(3)); + Assert.That(mainSink.State.First(), Is.EqualTo(new DateTime(2024, 8, 30, 17, 12, 16))); + Assert.That(mainSink.State.Last(), Is.EqualTo(new DateTime(2024, 9, 1, 5, 0, 0))); + } + + [Test] + public void DateTimeParserEmit_MixedData_MainRouteAndExceptionRoute() + { + var pipeline = new Pipeline(); + var parser = new DateTimeParser(pipeline); + var mainSink = new MemorySink(parser); + var exceptionSink = new MemorySink(parser.Alternate); + pipeline.Emit("2024-08-30 17:12:16"); + pipeline.Emit("2024-08-31 25:62:41"); + pipeline.Emit("2024-09-01 05:00AM"); + + Assert.That(mainSink.State, Has.Count.EqualTo(2)); + Assert.That(mainSink.State.First(), Is.EqualTo(new DateTime(2024, 8, 30, 17, 12, 16))); + Assert.That(mainSink.State.Last(), Is.EqualTo(new DateTime(2024, 9, 1, 5, 0, 0))); + Assert.That(exceptionSink.State, Has.Count.EqualTo(1)); + Assert.That(exceptionSink.State.First(), Is.EqualTo("2024-08-31 25:62:41")); + } +}