Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for CancellationToken in publishers #202

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Silverback.Messaging.Messages;
Expand Down Expand Up @@ -96,45 +97,48 @@ public int ExecuteSaveTransaction(Func<int> saveChanges)
/// <param name="saveChangesAsync">
/// The delegate to the original <c>SaveChangesAsync</c> method.
/// </param>
/// <param name="cancellationToken">
/// A <see cref="CancellationToken" /> used to cancel the operation.
/// </param>
/// <returns>
/// A <see cref="Task{TResult}" /> representing the asynchronous operation. The task result contains the
/// number of entities saved to the database.
/// </returns>
public Task<int> ExecuteSaveTransactionAsync(Func<Task<int>> saveChangesAsync)
public Task<int> ExecuteSaveTransactionAsync(Func<Task<int>> saveChangesAsync, CancellationToken cancellationToken = default)
{
Check.NotNull(saveChangesAsync, nameof(saveChangesAsync));

return ExecuteSaveTransactionAsync(saveChangesAsync, true);
return ExecuteSaveTransactionAsync(saveChangesAsync, true, cancellationToken);
}

private async Task<int> ExecuteSaveTransactionAsync(Func<Task<int>> saveChanges, bool executeAsync)
private async Task<int> ExecuteSaveTransactionAsync(Func<Task<int>> saveChanges, bool executeAsync, CancellationToken cancellationToken = default)
{
await PublishEventAsync<TransactionStartedEvent>(executeAsync).ConfigureAwait(false);
await PublishEventAsync<TransactionStartedEvent>(executeAsync, cancellationToken).ConfigureAwait(false);

var saved = false;
try
{
await PublishDomainEventsAsync(executeAsync).ConfigureAwait(false);
await PublishDomainEventsAsync(executeAsync, cancellationToken).ConfigureAwait(false);

int result = await saveChanges().ConfigureAwait(false);

saved = true;

await PublishEventAsync<TransactionCompletedEvent>(executeAsync).ConfigureAwait(false);
await PublishEventAsync<TransactionCompletedEvent>(executeAsync, cancellationToken).ConfigureAwait(false);

return result;
}
catch
{
if (!saved)
await PublishEventAsync<TransactionAbortedEvent>(executeAsync).ConfigureAwait(false);
await PublishEventAsync<TransactionAbortedEvent>(executeAsync, cancellationToken).ConfigureAwait(false);

throw;
}
}

[SuppressMessage("", "VSTHRD103", Justification = Justifications.ExecutesSyncOrAsync)]
private async Task PublishDomainEventsAsync(bool executeAsync)
private async Task PublishDomainEventsAsync(bool executeAsync, CancellationToken cancellationToken)
{
var events = GetDomainEvents();

Expand All @@ -143,7 +147,7 @@ private async Task PublishDomainEventsAsync(bool executeAsync)
{
if (executeAsync)
{
await events.ForEachAsync(message => _publisher.PublishAsync(message))
await events.ForEachAsync(message => _publisher.PublishAsync(message, cancellationToken))
.ConfigureAwait(false);
}
else
Expand All @@ -168,11 +172,11 @@ private IReadOnlyCollection<object> GetDomainEvents() =>
}).ToList();

[SuppressMessage("", "VSTHRD103", Justification = Justifications.ExecutesSyncOrAsync)]
private async Task PublishEventAsync<TEvent>(bool executeAsync)
private async Task PublishEventAsync<TEvent>(bool executeAsync, CancellationToken cancellationToken)
where TEvent : new()
{
if (executeAsync)
await _publisher.PublishAsync(new TEvent()).ConfigureAwait(false);
await _publisher.PublishAsync(new TEvent(), cancellationToken).ConfigureAwait(false);
else
_publisher.Publish(new TEvent());
}
Expand Down
30 changes: 19 additions & 11 deletions src/Silverback.Core.Model/Messaging/Publishing/CommandPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// This code is licensed under MIT license (see LICENSE file for details)

using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Silverback.Messaging.Messages;

Expand Down Expand Up @@ -38,24 +39,31 @@ public TResult Execute<TResult>(ICommand<TResult> commandMessage) =>
public TResult Execute<TResult>(ICommand<TResult> commandMessage, bool throwIfUnhandled) =>
_publisher.Publish<TResult>(commandMessage, throwIfUnhandled).SingleOrDefault();

/// <inheritdoc cref="ICommandPublisher.ExecuteAsync(ICommand)" />
public Task ExecuteAsync(ICommand commandMessage) => _publisher.PublishAsync(commandMessage);
/// <inheritdoc cref="ICommandPublisher.ExecuteAsync(ICommand, CancellationToken)" />
public Task ExecuteAsync(ICommand commandMessage, CancellationToken cancellationToken = default) =>
_publisher.PublishAsync(commandMessage, cancellationToken);

/// <inheritdoc cref="ICommandPublisher.ExecuteAsync(ICommand, bool)" />
public Task ExecuteAsync(ICommand commandMessage, bool throwIfUnhandled) =>
_publisher.PublishAsync(commandMessage, throwIfUnhandled);
/// <inheritdoc cref="ICommandPublisher.ExecuteAsync(ICommand, bool, CancellationToken)" />
public Task ExecuteAsync(
ICommand commandMessage,
bool throwIfUnhandled,
CancellationToken cancellationToken = default) =>
_publisher.PublishAsync(commandMessage, throwIfUnhandled, cancellationToken);

/// <inheritdoc cref="ICommandPublisher.ExecuteAsync{TResult}(ICommand{TResult})" />
public async Task<TResult> ExecuteAsync<TResult>(ICommand<TResult> commandMessage) =>
(await _publisher.PublishAsync<TResult>(commandMessage)
/// <inheritdoc cref="ICommandPublisher.ExecuteAsync{TResult}(ICommand{TResult}, CancellationToken)" />
public async Task<TResult> ExecuteAsync<TResult>(
ICommand<TResult> commandMessage,
CancellationToken cancellationToken = default) =>
(await _publisher.PublishAsync<TResult>(commandMessage, cancellationToken)
.ConfigureAwait(false))
.SingleOrDefault();

/// <inheritdoc cref="ICommandPublisher.ExecuteAsync{TResult}(ICommand{TResult}, bool)" />
/// <inheritdoc cref="ICommandPublisher.ExecuteAsync{TResult}(ICommand{TResult}, bool, CancellationToken)" />
public async Task<TResult> ExecuteAsync<TResult>(
ICommand<TResult> commandMessage,
bool throwIfUnhandled) =>
(await _publisher.PublishAsync<TResult>(commandMessage, throwIfUnhandled)
bool throwIfUnhandled,
CancellationToken cancellationToken = default) =>
(await _publisher.PublishAsync<TResult>(commandMessage, throwIfUnhandled, cancellationToken)
.ConfigureAwait(false))
.SingleOrDefault();
}
Expand Down
15 changes: 10 additions & 5 deletions src/Silverback.Core.Model/Messaging/Publishing/EventPublisher.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) 2020 Sergio Aquilini
// This code is licensed under MIT license (see LICENSE file for details)

using System.Threading;
using System.Threading.Tasks;
using Silverback.Messaging.Messages;

Expand Down Expand Up @@ -29,11 +30,15 @@ public EventPublisher(IPublisher publisher)
public void Publish(IEvent eventMessage, bool throwIfUnhandled) =>
_publisher.Publish(eventMessage, throwIfUnhandled);

/// <inheritdoc cref="IEventPublisher.PublishAsync(IEvent)" />
public Task PublishAsync(IEvent eventMessage) => _publisher.PublishAsync(eventMessage);
/// <inheritdoc cref="IEventPublisher.PublishAsync(IEvent, CancellationToken)" />
public Task PublishAsync(IEvent eventMessage, CancellationToken cancellationToken = default) =>
_publisher.PublishAsync(eventMessage, cancellationToken);

/// <inheritdoc cref="IEventPublisher.PublishAsync(IEvent, bool)" />
public Task PublishAsync(IEvent eventMessage, bool throwIfUnhandled) =>
_publisher.PublishAsync(eventMessage, throwIfUnhandled);
/// <inheritdoc cref="IEventPublisher.PublishAsync(IEvent, bool, CancellationToken)" />
public Task PublishAsync(
IEvent eventMessage,
bool throwIfUnhandled,
CancellationToken cancellationToken = default) =>
_publisher.PublishAsync(eventMessage, throwIfUnhandled, cancellationToken);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) 2020 Sergio Aquilini
// This code is licensed under MIT license (see LICENSE file for details)

using System.Threading;
using System.Threading.Tasks;
using Silverback.Messaging.Messages;

Expand Down Expand Up @@ -80,10 +81,13 @@ public interface ICommandPublisher
/// <param name="commandMessage">
/// The command to be executed.
/// </param>
/// <param name="cancellationToken">
/// A <see cref="CancellationToken" /> used to cancel the operation.
/// </param>
/// <returns>
/// A <see cref="Task" /> representing the asynchronous operation.
/// </returns>
Task ExecuteAsync(ICommand commandMessage);
Task ExecuteAsync(ICommand commandMessage, CancellationToken cancellationToken = default);

/// <summary>
/// Executes the specified command publishing it to the internal bus. The message will be forwarded to
Expand All @@ -98,10 +102,13 @@ public interface ICommandPublisher
/// A boolean value indicating whether an exception must be thrown if no subscriber is handling the
/// message.
/// </param>
/// <param name="cancellationToken">
/// A <see cref="CancellationToken" /> used to cancel the operation.
/// </param>
/// <returns>
/// A <see cref="Task" /> representing the asynchronous operation.
/// </returns>
Task ExecuteAsync(ICommand commandMessage, bool throwIfUnhandled);
Task ExecuteAsync(ICommand commandMessage, bool throwIfUnhandled, CancellationToken cancellationToken = default);

/// <summary>
/// Executes the specified command publishing it to the internal bus. The message will be forwarded to
Expand All @@ -115,11 +122,14 @@ public interface ICommandPublisher
/// <param name="commandMessage">
/// The command to be executed.
/// </param>
/// <param name="cancellationToken">
/// A <see cref="CancellationToken" /> used to cancel the operation.
/// </param>
/// <returns>
/// A <see cref="Task{TResult}" /> representing the asynchronous operation. The task result contains the
/// command result.
/// </returns>
Task<TResult> ExecuteAsync<TResult>(ICommand<TResult> commandMessage);
Task<TResult> ExecuteAsync<TResult>(ICommand<TResult> commandMessage, CancellationToken cancellationToken = default);

/// <summary>
/// Executes the specified command publishing it to the internal bus. The message will be forwarded to
Expand All @@ -137,10 +147,13 @@ public interface ICommandPublisher
/// A boolean value indicating whether an exception must be thrown if no subscriber is handling the
/// message.
/// </param>
/// <param name="cancellationToken">
/// A <see cref="CancellationToken" /> used to cancel the operation.
/// </param>
/// <returns>
/// A <see cref="Task{TResult}" /> representing the asynchronous operation. The task result contains the
/// command result.
/// </returns>
Task<TResult> ExecuteAsync<TResult>(ICommand<TResult> commandMessage, bool throwIfUnhandled);
Task<TResult> ExecuteAsync<TResult>(ICommand<TResult> commandMessage, bool throwIfUnhandled, CancellationToken cancellationToken = default);
}
}
14 changes: 12 additions & 2 deletions src/Silverback.Core.Model/Messaging/Publishing/IEventPublisher.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) 2020 Sergio Aquilini
// This code is licensed under MIT license (see LICENSE file for details)

using System.Threading;
using System.Threading.Tasks;
using Silverback.Messaging.Messages;

Expand Down Expand Up @@ -43,10 +44,13 @@ public interface IEventPublisher
/// <param name="eventMessage">
/// The event to be executed.
/// </param>
/// <param name="cancellationToken">
/// A <see cref="CancellationToken" /> used to cancel the operation.
/// </param>
/// <returns>
/// A <see cref="Task" /> representing the asynchronous operation.
/// </returns>
Task PublishAsync(IEvent eventMessage);
Task PublishAsync(IEvent eventMessage, CancellationToken cancellationToken = default);

/// <summary>
/// Publishes the specified event to the internal bus. The message will be forwarded to its subscribers
Expand All @@ -60,9 +64,15 @@ public interface IEventPublisher
/// A boolean value indicating whether an exception must be thrown if no subscriber is handling the
/// message.
/// </param>
/// <param name="cancellationToken">
/// A <see cref="CancellationToken" /> used to cancel the operation.
/// </param>
/// <returns>
/// A <see cref="Task" /> representing the asynchronous operation.
/// </returns>
Task PublishAsync(IEvent eventMessage, bool throwIfUnhandled);
Task PublishAsync(
IEvent eventMessage,
bool throwIfUnhandled,
CancellationToken cancellationToken = default);
}
}
16 changes: 14 additions & 2 deletions src/Silverback.Core.Model/Messaging/Publishing/IQueryPublisher.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) 2020 Sergio Aquilini
// This code is licensed under MIT license (see LICENSE file for details)

using System.Threading;
using System.Threading.Tasks;
using Silverback.Messaging.Messages;

Expand Down Expand Up @@ -58,11 +59,16 @@ public interface IQueryPublisher
/// <param name="queryMessage">
/// The query to be executed.
/// </param>
/// <param name="cancellationToken">
/// A <see cref="CancellationToken" /> used to cancel the operation.
/// </param>
/// <returns>
/// A <see cref="Task{TResult}" /> representing the asynchronous operation. The task result contains the
/// query result.
/// </returns>
Task<TResult> ExecuteAsync<TResult>(IQuery<TResult> queryMessage);
Task<TResult> ExecuteAsync<TResult>(
IQuery<TResult> queryMessage,
CancellationToken cancellationToken = default);

/// <summary>
/// Executes the specified query publishing it to the internal bus. The message will be forwarded to its
Expand All @@ -79,10 +85,16 @@ public interface IQueryPublisher
/// A boolean value indicating whether an exception must be thrown if no subscriber is handling the
/// message.
/// </param>
/// <param name="cancellationToken">
/// A <see cref="CancellationToken" /> used to cancel the operation.
/// </param>
/// <returns>
/// A <see cref="Task{TResult}" /> representing the asynchronous operation. The task result contains the
/// query result.
/// </returns>
Task<TResult> ExecuteAsync<TResult>(IQuery<TResult> queryMessage, bool throwIfUnhandled);
Task<TResult> ExecuteAsync<TResult>(
IQuery<TResult> queryMessage,
bool throwIfUnhandled,
CancellationToken cancellationToken = default);
}
}
16 changes: 10 additions & 6 deletions src/Silverback.Core.Model/Messaging/Publishing/QueryPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// This code is licensed under MIT license (see LICENSE file for details)

using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Silverback.Messaging.Messages;

Expand Down Expand Up @@ -31,17 +32,20 @@ public TResult Execute<TResult>(IQuery<TResult> queryMessage) =>
public TResult Execute<TResult>(IQuery<TResult> queryMessage, bool throwIfUnhandled) =>
_publisher.Publish<TResult>(queryMessage, throwIfUnhandled).SingleOrDefault();

/// <inheritdoc cref="IQueryPublisher.ExecuteAsync{TResult}(IQuery{TResult})" />
public async Task<TResult> ExecuteAsync<TResult>(IQuery<TResult> queryMessage) =>
(await _publisher.PublishAsync<TResult>(queryMessage)
/// <inheritdoc cref="IQueryPublisher.ExecuteAsync{TResult}(IQuery{TResult}, CancellationToken)" />
public async Task<TResult> ExecuteAsync<TResult>(
IQuery<TResult> queryMessage,
CancellationToken cancellationToken = default) =>
(await _publisher.PublishAsync<TResult>(queryMessage, cancellationToken)
.ConfigureAwait(false))
.SingleOrDefault();

/// <inheritdoc cref="IQueryPublisher.ExecuteAsync{TResult}(IQuery{TResult}, bool)" />
/// <inheritdoc cref="IQueryPublisher.ExecuteAsync{TResult}(IQuery{TResult}, bool, CancellationToken)" />
public async Task<TResult> ExecuteAsync<TResult>(
IQuery<TResult> queryMessage,
bool throwIfUnhandled) =>
(await _publisher.PublishAsync<TResult>(queryMessage, throwIfUnhandled)
bool throwIfUnhandled,
CancellationToken cancellationToken = default) =>
(await _publisher.PublishAsync<TResult>(queryMessage, throwIfUnhandled, cancellationToken)
.ConfigureAwait(false))
.SingleOrDefault();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using Silverback.Messaging.Configuration;
using Silverback.Messaging.Publishing;
Expand Down Expand Up @@ -35,7 +36,7 @@ public ObservableMessagesReturnValueHandler(IPublisher publisher, IBusOptions bu
}

/// <inheritdoc cref="IReturnValueHandler.CanHandle" />
public bool CanHandle(object returnValue) =>
public bool CanHandle(object? returnValue) =>
returnValue != null &&
returnValue.GetType().GetInterfaces().Any(
i => i.IsGenericType &&
Expand All @@ -45,11 +46,11 @@ public bool CanHandle(object returnValue) =>
messageType.IsAssignableFrom(i.GenericTypeArguments[0])));

/// <inheritdoc cref="IReturnValueHandler.Handle" />
public void Handle(object returnValue) =>
_publisher.Publish<object>(((IObservable<object>)returnValue).ToEnumerable());
public void Handle(object? returnValue) =>
_publisher.Publish<object?>(((IObservable<object?>)returnValue!).ToEnumerable());

/// <inheritdoc cref="IReturnValueHandler.HandleAsync" />
public Task HandleAsync(object returnValue) =>
_publisher.PublishAsync<object>(((IObservable<object>)returnValue).ToEnumerable());
public Task HandleAsync(object? returnValue, CancellationToken cancellationToken = default) =>
_publisher.PublishAsync<object>(((IObservable<object?>)returnValue!).ToEnumerable(), cancellationToken);
}
}
Loading