diff --git a/Streamistry.Core/Pipes/Sources/DataReaderSource.cs b/Streamistry.Core/Pipes/Sources/DataReaderSource.cs new file mode 100644 index 0000000..b410d28 --- /dev/null +++ b/Streamistry.Core/Pipes/Sources/DataReaderSource.cs @@ -0,0 +1,124 @@ +using System; +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Collections.Specialized; +using System.Data; +using System.Data.Common; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Text; +using System.Threading.Tasks; +using System.Xml; +using Streamistry.Observability; + +namespace Streamistry.Pipes.Sources; + +public abstract class DataReaderSource : Source +{ + protected IDataReader DataReader { get; set; } + + public DataReaderSource(IDataReader dataReader, ObservabilityProvider? provider = null) + : base(provider) + => DataReader = dataReader; + + public DataReaderSource(Pipeline upstream, IDataReader dataReader) + : base(upstream) + => DataReader = dataReader; + + protected override bool TryReadNext(out TOutput? item) + { + if (DataReader.Read()) + { + item = OnRead(); + return true; + } + item = default; + return false; + } + + protected abstract TOutput? OnRead(); +} + +public class DataReaderAsDictionarySource : DataReaderSource> +{ + public DataReaderAsDictionarySource(IDataReader dataReader, ObservabilityProvider? provider = null) + : base(dataReader, provider) + { } + + public DataReaderAsDictionarySource(Pipeline upstream, IDataReader dataReader) + : base(upstream, dataReader) + { } + + protected override IReadOnlyDictionary? OnRead() + { + var dictionary = new Dictionary(); + foreach (var i in Enumerable.Range(0, DataReader.FieldCount)) + dictionary.Add(string.Intern(DataReader.GetName(i)), DataReader.GetValue(i)); + return dictionary.AsReadOnly(); + } +} + +public class DataReaderAsArraySource : DataReaderSource> +{ + public DataReaderAsArraySource(IDataReader dataReader, ObservabilityProvider? provider = null) + : base(dataReader, provider) + { } + + public DataReaderAsArraySource(Pipeline upstream, IDataReader dataReader) + : base(upstream, dataReader) + { } + + protected override ImmutableArray OnRead() + { + var list = new List(); + foreach (var i in Enumerable.Range(0, DataReader.FieldCount)) + list.Add(DataReader.GetValue(i)); + return list.ToImmutableArray(); + } +} + +public class DataReaderAsValueSource : DataReaderSource +{ + public DataReaderAsValueSource(IDataReader dataReader, ObservabilityProvider? provider = null) + : base(dataReader, provider) + { } + + public DataReaderAsValueSource(Pipeline upstream, IDataReader dataReader) + : base(upstream, dataReader) + { } + + protected override TOutput? OnRead() + { + if (DataReader.IsDBNull(0)) + return default; + + if (typeof(TOutput) == typeof(object)) + return (TOutput)DataReader.GetValue(0); + else if (typeof(TOutput) == typeof(bool)) + return (TOutput)(object)DataReader.GetBoolean(0); + else if (typeof(TOutput) == typeof(byte)) + return (TOutput)(object)DataReader.GetByte(0); + else if (typeof(TOutput) == typeof(short)) + return (TOutput)(object)DataReader.GetInt16(0); + else if (typeof(TOutput) == typeof(int)) + return (TOutput)(object)DataReader.GetInt32(0); + else if (typeof(TOutput) == typeof(long)) + return (TOutput)(object)DataReader.GetInt64(0); + else if (typeof(TOutput) == typeof(float)) + return (TOutput)(object)DataReader.GetFloat(0); + else if (typeof(TOutput) == typeof(double)) + return (TOutput)(object)DataReader.GetDouble(0); + else if (typeof(TOutput) == typeof(decimal)) + return (TOutput)(object)DataReader.GetDecimal(0); + else if (typeof(TOutput) == typeof(Guid)) + return (TOutput)(object)DataReader.GetGuid(0); + else if (typeof(TOutput) == typeof(char)) + return (TOutput)(object)DataReader.GetChar(0); + else if (typeof(TOutput) == typeof(string)) + return (TOutput)(object)DataReader.GetString(0); + else if (typeof(TOutput) == typeof(DateTime)) + return (TOutput)(object)DataReader.GetDateTime(0); + else + throw new NotSupportedException($"Type {typeof(TOutput)} is not supported."); + } +} diff --git a/Streamistry.Testing/Pipes/Sources/DataReaderSourceTests.cs b/Streamistry.Testing/Pipes/Sources/DataReaderSourceTests.cs new file mode 100644 index 0000000..2cdaa7e --- /dev/null +++ b/Streamistry.Testing/Pipes/Sources/DataReaderSourceTests.cs @@ -0,0 +1,78 @@ +using System; +using System.Collections.Generic; +using System.Data; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Moq; +using NUnit.Framework; +using Streamistry.Pipes.Sources; +using Streamistry.Testability; +using static System.Runtime.InteropServices.JavaScript.JSType; + +namespace Streamistry.Testing.Pipes.Sources; +public class DataReaderSourceTests +{ + private class DataReaderHelper + { + public static Mock Create() + { + var dr = new Mock(); + dr.SetupSequence(m => m.Read()).Returns(true).Returns(true).Returns(false); + dr.Setup(m => m.FieldCount).Returns(2); + dr.SetupSequence(m => m.GetName(0)).Returns("Id").Returns("Id"); + dr.SetupSequence(m => m.GetName(1)).Returns("Name").Returns("Name"); + dr.SetupSequence(m => m.GetValue(0)).Returns(1).Returns(2); + dr.SetupSequence(m => m.GetValue(1)).Returns("Foo").Returns("Bar"); + dr.SetupSequence(m => m.GetInt32(0)).Returns(1).Returns(2); + dr.SetupSequence(m => m.GetString(1)).Returns("Foo").Returns("Bar"); + return dr; + } + } + + [Test] + public void Read_Array_Successful() + { + var dr = DataReaderHelper.Create(); + var source = new DataReaderAsArraySource(dr.Object); + var pipeline = new Pipeline(source); + Assert.That(source.GetOutputs(pipeline.Start), Is.EqualTo(new object[] + { new object[] { 1, "Foo" }, new object[] { 2, "Bar" } })); + } + + [Test] + public void Read_Dictionary_Successful() + { + var dr = DataReaderHelper.Create(); + var source = new DataReaderAsDictionarySource(dr.Object); + var pipeline = new Pipeline(source); + var data = source.GetOutputs(pipeline.Start); + Assert.That(data, Has.Length.EqualTo(2)); + Assert.Multiple(() => + { + Assert.That(data[0]!.ContainsKey("Id"), Is.True); + Assert.That(data[0]!.ContainsKey("Name"), Is.True); + Assert.That(data[0]!["Id"], Is.EqualTo(1)); + Assert.That(data[0]!["Name"], Is.EqualTo("Foo")); + Assert.That(data[1]!.ContainsKey("Id"), Is.True); + Assert.That(data[1]!.ContainsKey("Name"), Is.True); + Assert.That(data[1]!["Id"], Is.EqualTo(2)); + Assert.That(data[1]!["Name"], Is.EqualTo("Bar")); + }); + } + + [Test] + public void Read_Value_Successful() + { + var dr = DataReaderHelper.Create(); + var source = new DataReaderAsValueSource(dr.Object); + var pipeline = new Pipeline(source); + var data = source.GetOutputs(pipeline.Start); + Assert.That(data, Has.Length.EqualTo(2)); + Assert.Multiple(() => + { + Assert.That(data, Does.Contain(1)); + Assert.That(data, Does.Contain(2)); + }); + } +} diff --git a/Streamistry.Testing/Streamistry.Testing.csproj b/Streamistry.Testing/Streamistry.Testing.csproj index feff0ba..f00ce1b 100644 --- a/Streamistry.Testing/Streamistry.Testing.csproj +++ b/Streamistry.Testing/Streamistry.Testing.csproj @@ -1,6 +1,7 @@ + all