Skip to content

Commit

Permalink
#45 Added support for the ReaderDisposable* delegate overloads in Sql…
Browse files Browse the repository at this point in the history
…Batch.

Changed how the state of a DbBatchDataReader is handled. The previous way would of caused issues if the reader was closed/disposed before it had finished reading.
  • Loading branch information
billings7 committed Jun 1, 2017
1 parent 7c31553 commit c8e8abd
Show file tree
Hide file tree
Showing 9 changed files with 767 additions and 92 deletions.
138 changes: 107 additions & 31 deletions Database/DbBatchDataReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
using System.Data.Common;
using System.Data.SqlClient;
using System.Data.SqlTypes;
using System.Diagnostics;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
Expand All @@ -39,6 +40,7 @@
using WebApplications.Utilities.Annotations;
using WebApplications.Utilities.Database.Exceptions;
using WebApplications.Utilities.Logging;
using WebApplications.Utilities.Threading;

namespace WebApplications.Utilities.Database
{
Expand Down Expand Up @@ -95,49 +97,73 @@ private static InvalidOperationException FinishedError()
private static InvalidOperationException ClosedError([CallerMemberName] string name = null)
=> new InvalidOperationException($"Invalid attempt to call {name} when reader is closed.");

private int _state = (int)BatchReaderState.Open;
[Flags]
private enum BatchReaderState : byte
{
Open = 0,
Finished = 1,
Closed = 2
}

[NotNull]
private readonly object _stateLock = new object();
private BatchReaderState _state = BatchReaderState.Open;

/// <summary>
/// Gets or sets the state of the reader.
/// Sets the state of the reader.
/// </summary>
/// <value>
/// The state.
/// </value>
internal BatchReaderState State
/// <param name="state">The state.</param>
private void SetState(BatchReaderState state)
{
get => (BatchReaderState)_state;
set
Debug.Assert(state != BatchReaderState.Open);

if ((state & _state) == state) return;

lock (_stateLock)
{
int iv = (int)value;
int state;
do
{
state = _state;
if (iv <= state) return;
} while (Interlocked.CompareExchange(ref _state, iv, state) != state);
if ((state & _state) == state) return;
_state |= state;
}

Debug.Assert(_state != BatchReaderState.Open);

_finishedCompletionSource?.TrySetCompleted();
}

/// <summary>
/// Gets a value indicating whether the reader has finished reading data from the underlying reader.
/// </summary>
/// <value>
/// <see langword="true" /> if this instance is finished reading from the underlying reader; otherwise, <see langword="false" />.
/// </value>
/// <exception cref="System.ArgumentOutOfRangeException">value</exception>
protected internal bool IsFinishedReading => (_state & BatchReaderState.Finished) == BatchReaderState.Finished;

/// <summary>
/// Indicates that this reader has finished reading from the underlying reader.
/// </summary>
internal void SetFinished() => SetState(BatchReaderState.Finished);

/// <summary>
/// Gets a value indicating whether this <see cref="DbBatchDataReader"/> is open.
/// </summary>
/// <value>
/// <see langword="true" /> if open; otherwise, <see langword="false" />.
/// </value>
public bool IsOpen => State == BatchReaderState.Open && !_skipRows;
public bool IsOpen => _state == BatchReaderState.Open && !_skipRows;

/// <summary>
/// Gets a value indicating whether the reader has finished reading data.
/// Gets a value indicating whether the reader has finished reading data or the rest of the data should be skipped.
/// </summary>
/// <value>
/// <see langword="true" /> if this reader is finished; otherwise, <see langword="false" />.
/// </value>
protected bool IsFinished => State == BatchReaderState.Finished || (!IsClosed && _skipRows);
protected bool IsFinished => IsFinishedReading || (_state == BatchReaderState.Open && _skipRows);

/// <summary>Gets a value indicating whether the <see cref="T:System.Data.Common.DbDataReader" /> is closed.</summary>
/// <returns>true if the <see cref="T:System.Data.Common.DbDataReader" /> is closed; otherwise false.</returns>
/// <exception cref="T:System.InvalidOperationException">The <see cref="T:System.Data.SqlClient.SqlDataReader" /> is closed. </exception>
public override bool IsClosed => State == BatchReaderState.Closed;
public override bool IsClosed => (_state & BatchReaderState.Closed) == BatchReaderState.Closed;

/// <summary>
/// The command behavior.
Expand Down Expand Up @@ -371,25 +397,82 @@ internal async Task StartAsync(CancellationToken cancellationToken)
}

/// <summary>
/// Reads the result sets while the reader <see cref="IsOpen"/>.
/// Reads the result sets while the reader is not <see cref="BatchReaderState.Finished"/>.
/// </summary>
/// <param name="cancellationToken">The cancellation instruction.</param>
/// <returns>A task representing the asynchronous operation.</returns>
[NotNull]
internal Task ReadTillClosedAsync(CancellationToken cancellationToken)
internal Task ReadTillFinishedAsync(CancellationToken cancellationToken)
{
return IsOpen ? DoAsync() : TaskResult.Completed;
return IsFinishedReading ? TaskResult.Completed : DoAsync();

async Task DoAsync()
{
// ReSharper disable once PossibleNullReferenceException
while (IsOpen && _hasResultSet)
while (!IsFinishedReading && _hasResultSet)
{
_hasResultSet.Value = await BaseReader.NextResultAsync(cancellationToken).ConfigureAwait(false);
}
}
}

private TaskCompletionSource _finishedCompletionSource;
internal TaskCompletionSource FinishedCompletionSource => _finishedCompletionSource;

/// <summary>
/// Gets an <see cref="IDisposable"/> which will close this reader and complete the <see cref="FinishedCompletionSource"/> when disposed.
/// </summary>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns></returns>
[NotNull]
internal IDisposable GetDisposer(CancellationToken cancellationToken)
{
// Create a new TCS if there isn't already one
if (_finishedCompletionSource == null)
{
TaskCompletionSource newSource = new TaskCompletionSource(SqlBatchResult.CompletionSourceOptions);
Interlocked.CompareExchange(ref _finishedCompletionSource, newSource, null);
}


TaskCompletionSource completionSource = _finishedCompletionSource;
CancellationTokenRegistration reg = default(CancellationTokenRegistration);

// If the reader is already over, we can just complete it
if (!IsOpen) completionSource.TrySetCompleted();

// If the token can be cancelled, then the source should be cancelled when the token is
else if (cancellationToken.CanBeCanceled)
reg = cancellationToken.Register(() => completionSource.TrySetCanceled());

return new Disposer(this, reg);
}

/// <summary>
/// Closes a reader on <see cref="Dispose"/>.
/// </summary>
private class Disposer : IDisposable
{
[NotNull]
private readonly DbBatchDataReader _reader;
private readonly CancellationTokenRegistration _cancellation;

public Disposer([NotNull] DbBatchDataReader reader, CancellationTokenRegistration cancellation)
{
_reader = reader;
_cancellation = cancellation;
}


/// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
public void Dispose()
{
// Closing the reader sets the State, which completes the TCS
_reader.Close();
_cancellation.Dispose();
}
}

/// <summary>Gets a value that indicates whether the column contains nonexistent or missing values.</summary>
/// <returns>true if the specified column is equivalent to <see cref="T:System.DBNull" />; otherwise false.</returns>
/// <param name="ordinal">The zero-based column ordinal.</param>
Expand All @@ -405,7 +488,7 @@ public override Task<bool> IsDBNullAsync(int ordinal, CancellationToken cancella
=> BaseReaderOpen().IsDBNullAsync(ordinal, cancellationToken);

/// <summary>Closes the <see cref="T:System.Data.Common.DbDataReader" /> object.</summary>
public override void Close() => State = BatchReaderState.Closed;
public override void Close() => SetState(BatchReaderState.Closed);

/// <summary>Releases the managed resources used by the <see cref="T:System.Data.Common.DbDataReader" /> and optionally releases the unmanaged resources.</summary>
/// <param name="disposing">true to release managed and unmanaged resources; false to release only unmanaged resources.</param>
Expand Down Expand Up @@ -638,13 +721,6 @@ public override IEnumerator GetEnumerator()
protected internal abstract XmlReader GetXmlReader();
}

internal enum BatchReaderState : byte
{
Open,
Finished,
Closed
}

/// <summary>
/// Base class for reading a forward-only stream of rows from a SQL Server database for a single <see cref="SqlBatchCommand"/>.
/// </summary>
Expand Down
116 changes: 116 additions & 0 deletions Database/Generated/SqlProgramGeneric.Generated.tt
Original file line number Diff line number Diff line change
Expand Up @@ -1230,6 +1230,66 @@ namespace WebApplications.Utilities.Database
return this.AddExecuteReader<TOut>(program, resultFunc, out result, behavior, c => c.SetParameters(<#= Strings.Parameters #>, constraintMode ?? program.ConstraintMode), suppressErrors, exceptionHandler);
}

/// <summary>
/// Adds the specified program to the batch.
/// </summary>
<#= Strings.TypeComments #>
/// <param name="program">The program to add to the batch.</param>
/// <param name="resultAction">The action used to process the result.</param>
/// <param name="result">A <see cref="SqlBatchResult" /> which can be used to wait for the program to finish executing.</param>
<#= Strings.ParameterComments #>
/// <param name="behavior">The query's effect on the database.</param>
/// <param name="suppressErrors">if set to <see langword="true" /> any errors that occur within this program
/// wont cause an exception to be thrown for the whole batch, unless an <paramref name="exceptionHandler"/>
/// is specified and doesn't suppress the error. The program itself will still throw an exception.</param>
/// <param name="exceptionHandler">The optional exception handler.</param>
/// <param name="constraintMode">The constraint mode, if set will override the configured default for the program.</param>
/// <returns>This <see cref="SqlBatch"/> instance.</returns>
[NotNull]
public SqlBatch AddExecuteReader<<#= Strings.TypeParams#>>(
[NotNull] SqlProgram<<#= Strings.TypeParams#>> program,
[NotNull] ResultDisposableDelegateAsync resultAction,
[NotNull] out SqlBatchResult result,
<#= Strings.InputParametersTypedDefault #>,
CommandBehavior behavior = CommandBehavior.Default,
bool suppressErrors = false,
ExceptionHandler exceptionHandler = null,
TypeConstraintMode? constraintMode = null)
{
return this.AddExecuteReader(program, resultAction, out result, behavior, c => c.SetParameters(<#= Strings.Parameters #>, constraintMode ?? program.ConstraintMode), suppressErrors, exceptionHandler);
}

/// <summary>
/// Adds the specified program to the batch.
/// The value returned by the <paramref name="resultFunc"/> will be returned by the <see cref="SqlBatchResult{T}"/>.
/// </summary>
<#= Strings.TypeComments #>
/// <typeparam name="TOut">The type of the result.</typeparam>
/// <param name="program">The program to add to the batch.</param>
/// <param name="resultFunc">The function used to process the result.</param>
/// <param name="result">A <see cref="SqlBatchResult" /> which can be used to get the value returned by the <paramref name="resultFunc"/>.</param>
<#= Strings.ParameterComments #>
/// <param name="behavior">The query's effect on the database.</param>
/// <param name="suppressErrors">if set to <see langword="true" /> any errors that occur within this program
/// wont cause an exception to be thrown for the whole batch, unless an <paramref name="exceptionHandler"/>
/// is specified and doesn't suppress the error. The program itself will still throw an exception.</param>
/// <param name="exceptionHandler">The optional exception handler.</param>
/// <param name="constraintMode">The constraint mode, if set will override the configured default for the program.</param>
/// <returns>This <see cref="SqlBatch"/> instance.</returns>
[NotNull]
public SqlBatch AddExecuteReader<<#= Strings.TypeParams#>, TOut>(
[NotNull] SqlProgram<<#= Strings.TypeParams#>> program,
[NotNull] ResultDisposableDelegateAsync<TOut> resultFunc,
[NotNull] out SqlBatchResult<TOut> result,
<#= Strings.InputParametersTypedDefault #>,
CommandBehavior behavior = CommandBehavior.Default,
bool suppressErrors = false,
ExceptionHandler exceptionHandler = null,
TypeConstraintMode? constraintMode = null)
{
return this.AddExecuteReader<TOut>(program, resultFunc, out result, behavior, c => c.SetParameters(<#= Strings.Parameters #>, constraintMode ?? program.ConstraintMode), suppressErrors, exceptionHandler);
}

/// <summary>
/// Adds the specified program to the batch.
/// </summary>
Expand Down Expand Up @@ -1285,6 +1345,62 @@ namespace WebApplications.Utilities.Database
{
return this.AddExecuteXmlReader<TOut>(program, resultFunc, out result, c => c.SetParameters(<#= Strings.Parameters #>, constraintMode ?? program.ConstraintMode), suppressErrors, exceptionHandler);
}

/// <summary>
/// Adds the specified program to the batch.
/// </summary>
<#= Strings.TypeComments #>
/// <param name="program">The program to add to the batch.</param>
/// <param name="resultAction">The action used to process the result.</param>
/// <param name="result">A <see cref="SqlBatchResult" /> which can be used to wait for the program to finish executing.</param>
<#= Strings.ParameterComments #>
/// <param name="suppressErrors">if set to <see langword="true" /> any errors that occur within this program
/// wont cause an exception to be thrown for the whole batch, unless an <paramref name="exceptionHandler"/>
/// is specified and doesn't suppress the error. The program itself will still throw an exception.</param>
/// <param name="exceptionHandler">The optional exception handler.</param>
/// <param name="constraintMode">The constraint mode, if set will override the configured default for the program.</param>
/// <returns>This <see cref="SqlBatch"/> instance.</returns>
[NotNull]
public SqlBatch AddExecuteXmlReader<<#= Strings.TypeParams#>>(
[NotNull] SqlProgram<<#= Strings.TypeParams#>> program,
[NotNull] XmlResultDisposableDelegateAsync resultAction,
[NotNull] out SqlBatchResult result,
<#= Strings.InputParametersTypedDefault #>,
bool suppressErrors = false,
ExceptionHandler exceptionHandler = null,
TypeConstraintMode? constraintMode = null)
{
return this.AddExecuteXmlReader(program, resultAction, out result, c => c.SetParameters(<#= Strings.Parameters #>, constraintMode ?? program.ConstraintMode), suppressErrors, exceptionHandler);
}

/// <summary>
/// Adds the specified program to the batch.
/// The value returned by the <paramref name="resultFunc"/> will be returned by the <see cref="SqlBatchResult{T}"/>.
/// </summary>
<#= Strings.TypeComments #>
/// <typeparam name="TOut">The type of the result.</typeparam>
/// <param name="program">The program to add to the batch.</param>
/// <param name="resultFunc">The function used to process the result.</param>
/// <param name="result">A <see cref="SqlBatchResult" /> which can be used to get the value returned by the <paramref name="resultFunc"/>.</param>
<#= Strings.ParameterComments #>
/// <param name="suppressErrors">if set to <see langword="true" /> any errors that occur within this program
/// wont cause an exception to be thrown for the whole batch, unless an <paramref name="exceptionHandler"/>
/// is specified and doesn't suppress the error. The program itself will still throw an exception.</param>
/// <param name="exceptionHandler">The optional exception handler.</param>
/// <param name="constraintMode">The constraint mode, if set will override the configured default for the program.</param>
/// <returns>This <see cref="SqlBatch"/> instance.</returns>
[NotNull]
public SqlBatch AddExecuteXmlReader<<#= Strings.TypeParams#>, TOut>(
[NotNull] SqlProgram<<#= Strings.TypeParams#>> program,
[NotNull] XmlResultDisposableDelegateAsync<TOut> resultFunc,
[NotNull] out SqlBatchResult<TOut> result,
<#= Strings.InputParametersTypedDefault #>,
bool suppressErrors = false,
ExceptionHandler exceptionHandler = null,
TypeConstraintMode? constraintMode = null)
{
return this.AddExecuteXmlReader<TOut>(program, resultFunc, out result, c => c.SetParameters(<#= Strings.Parameters #>, constraintMode ?? program.ConstraintMode), suppressErrors, exceptionHandler);
}
}
#endregion

Expand Down
Loading

0 comments on commit c8e8abd

Please sign in to comment.