Skip to content

Commit

Permalink
feat: use any IDataReader as a data source (#97)
Browse files Browse the repository at this point in the history
  • Loading branch information
Seddryck authored Sep 21, 2024
1 parent 0551710 commit 1a8ab6d
Show file tree
Hide file tree
Showing 3 changed files with 203 additions and 0 deletions.
124 changes: 124 additions & 0 deletions Streamistry.Core/Pipes/Sources/DataReaderSource.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Collections.Specialized;
using System.Data;
using System.Data.Common;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading.Tasks;
using System.Xml;
using Streamistry.Observability;

namespace Streamistry.Pipes.Sources;

public abstract class DataReaderSource<TOutput> : Source<TOutput>
{
protected IDataReader DataReader { get; set; }

public DataReaderSource(IDataReader dataReader, ObservabilityProvider? provider = null)
: base(provider)
=> DataReader = dataReader;

public DataReaderSource(Pipeline upstream, IDataReader dataReader)
: base(upstream)
=> DataReader = dataReader;

protected override bool TryReadNext(out TOutput? item)
{
if (DataReader.Read())
{
item = OnRead();
return true;
}
item = default;
return false;
}

protected abstract TOutput? OnRead();
}

public class DataReaderAsDictionarySource : DataReaderSource<IReadOnlyDictionary<string, object>>
{
public DataReaderAsDictionarySource(IDataReader dataReader, ObservabilityProvider? provider = null)
: base(dataReader, provider)
{ }

public DataReaderAsDictionarySource(Pipeline upstream, IDataReader dataReader)
: base(upstream, dataReader)
{ }

protected override IReadOnlyDictionary<string, object>? OnRead()
{
var dictionary = new Dictionary<string, object>();
foreach (var i in Enumerable.Range(0, DataReader.FieldCount))
dictionary.Add(string.Intern(DataReader.GetName(i)), DataReader.GetValue(i));
return dictionary.AsReadOnly();
}
}

public class DataReaderAsArraySource : DataReaderSource<ImmutableArray<object>>
{
public DataReaderAsArraySource(IDataReader dataReader, ObservabilityProvider? provider = null)
: base(dataReader, provider)
{ }

public DataReaderAsArraySource(Pipeline upstream, IDataReader dataReader)
: base(upstream, dataReader)
{ }

protected override ImmutableArray<object> OnRead()
{
var list = new List<object>();
foreach (var i in Enumerable.Range(0, DataReader.FieldCount))
list.Add(DataReader.GetValue(i));
return list.ToImmutableArray();
}
}

public class DataReaderAsValueSource<TOutput> : DataReaderSource<TOutput>
{
public DataReaderAsValueSource(IDataReader dataReader, ObservabilityProvider? provider = null)
: base(dataReader, provider)
{ }

public DataReaderAsValueSource(Pipeline upstream, IDataReader dataReader)
: base(upstream, dataReader)
{ }

protected override TOutput? OnRead()
{
if (DataReader.IsDBNull(0))
return default;

if (typeof(TOutput) == typeof(object))
return (TOutput)DataReader.GetValue(0);
else if (typeof(TOutput) == typeof(bool))
return (TOutput)(object)DataReader.GetBoolean(0);
else if (typeof(TOutput) == typeof(byte))
return (TOutput)(object)DataReader.GetByte(0);
else if (typeof(TOutput) == typeof(short))
return (TOutput)(object)DataReader.GetInt16(0);
else if (typeof(TOutput) == typeof(int))
return (TOutput)(object)DataReader.GetInt32(0);
else if (typeof(TOutput) == typeof(long))
return (TOutput)(object)DataReader.GetInt64(0);
else if (typeof(TOutput) == typeof(float))
return (TOutput)(object)DataReader.GetFloat(0);
else if (typeof(TOutput) == typeof(double))
return (TOutput)(object)DataReader.GetDouble(0);
else if (typeof(TOutput) == typeof(decimal))
return (TOutput)(object)DataReader.GetDecimal(0);
else if (typeof(TOutput) == typeof(Guid))
return (TOutput)(object)DataReader.GetGuid(0);
else if (typeof(TOutput) == typeof(char))
return (TOutput)(object)DataReader.GetChar(0);
else if (typeof(TOutput) == typeof(string))
return (TOutput)(object)DataReader.GetString(0);
else if (typeof(TOutput) == typeof(DateTime))
return (TOutput)(object)DataReader.GetDateTime(0);
else
throw new NotSupportedException($"Type {typeof(TOutput)} is not supported.");
}
}
78 changes: 78 additions & 0 deletions Streamistry.Testing/Pipes/Sources/DataReaderSourceTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Moq;
using NUnit.Framework;
using Streamistry.Pipes.Sources;
using Streamistry.Testability;
using static System.Runtime.InteropServices.JavaScript.JSType;

namespace Streamistry.Testing.Pipes.Sources;
public class DataReaderSourceTests
{
private class DataReaderHelper
{
public static Mock<IDataReader> Create()
{
var dr = new Mock<IDataReader>();
dr.SetupSequence(m => m.Read()).Returns(true).Returns(true).Returns(false);
dr.Setup(m => m.FieldCount).Returns(2);
dr.SetupSequence(m => m.GetName(0)).Returns("Id").Returns("Id");
dr.SetupSequence(m => m.GetName(1)).Returns("Name").Returns("Name");
dr.SetupSequence(m => m.GetValue(0)).Returns(1).Returns(2);
dr.SetupSequence(m => m.GetValue(1)).Returns("Foo").Returns("Bar");
dr.SetupSequence(m => m.GetInt32(0)).Returns(1).Returns(2);
dr.SetupSequence(m => m.GetString(1)).Returns("Foo").Returns("Bar");
return dr;
}
}

[Test]
public void Read_Array_Successful()
{
var dr = DataReaderHelper.Create();
var source = new DataReaderAsArraySource(dr.Object);
var pipeline = new Pipeline(source);
Assert.That(source.GetOutputs(pipeline.Start), Is.EqualTo(new object[]
{ new object[] { 1, "Foo" }, new object[] { 2, "Bar" } }));
}

[Test]
public void Read_Dictionary_Successful()
{
var dr = DataReaderHelper.Create();
var source = new DataReaderAsDictionarySource(dr.Object);
var pipeline = new Pipeline(source);
var data = source.GetOutputs(pipeline.Start);
Assert.That(data, Has.Length.EqualTo(2));
Assert.Multiple(() =>
{
Assert.That(data[0]!.ContainsKey("Id"), Is.True);
Assert.That(data[0]!.ContainsKey("Name"), Is.True);
Assert.That(data[0]!["Id"], Is.EqualTo(1));
Assert.That(data[0]!["Name"], Is.EqualTo("Foo"));
Assert.That(data[1]!.ContainsKey("Id"), Is.True);
Assert.That(data[1]!.ContainsKey("Name"), Is.True);
Assert.That(data[1]!["Id"], Is.EqualTo(2));
Assert.That(data[1]!["Name"], Is.EqualTo("Bar"));
});
}

[Test]
public void Read_Value_Successful()
{
var dr = DataReaderHelper.Create();
var source = new DataReaderAsValueSource<int>(dr.Object);
var pipeline = new Pipeline(source);
var data = source.GetOutputs(pipeline.Start);
Assert.That(data, Has.Length.EqualTo(2));
Assert.Multiple(() =>
{
Assert.That(data, Does.Contain(1));
Assert.That(data, Does.Contain(2));
});
}
}
1 change: 1 addition & 0 deletions Streamistry.Testing/Streamistry.Testing.csproj
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.11.1" />
<PackageReference Include="Moq" Version="4.20.72" />
<PackageReference Include="NUnit" Version="4.2.2" />
<PackageReference Include="NUnit.Analyzers" Version="4.3.0">
<PrivateAssets>all</PrivateAssets>
Expand Down

0 comments on commit 1a8ab6d

Please sign in to comment.