Skip to content

Commit

Permalink
In-process event send option introduction. (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
chullybun committed Mar 29, 2022
1 parent a139755 commit 2505ac5
Show file tree
Hide file tree
Showing 16 changed files with 194 additions and 62 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

Represents the **NuGet** versions.

## v1.0.5
- *Enhancement:* The `EventOutboxEnqueueBase` is the SQL Server event outbox enqueue `IEventSender`. To minimize send latency (increasing real-time event delivery) a primary (alternate) `IEventSender` can be specified (`SetPrimaryEventSender`). This changes the event behaviour whereby the events will be sent via the specified primary first, then enqueued only where the primary fails. The events will still be written to the event outbox but as sent for audit purposes.

## v1.0.4
- *Enhancement:* Integrated SQL Server-based `EventOutbox` code-generation (both database and C#) into _DbEx_ to enable re-use and consistency.

Expand Down
2 changes: 1 addition & 1 deletion DbEx.sln
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DbEx.Test.Console", "tests\DbEx.Test.Console\DbEx.Test.Console.csproj", "{117B6E86-2C88-446E-AC3A-AE1A2E84E2D8}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DbEx.Test.OutboxConsole", "tests\DbEx.Test.OutboxConsole\DbEx.Test.OutboxConsole.csproj", "{959DD5E1-530A-42BA-82B8-F17A657AC351}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DbEx.Test.OutboxConsole", "tests\DbEx.Test.OutboxConsole\DbEx.Test.OutboxConsole.csproj", "{959DD5E1-530A-42BA-82B8-F17A657AC351}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,14 @@ Template | Description | Example

<br/>

### Event Outbox Enqueue

The [`EventOutboxDequeueBase`](./src/DbEx/SqlServer/EventOutboxEnqueueBase.cs) provides the base [`IEventSender`](https://github.com/Avanade/CoreEx/blob/main/src/CoreEx/Events/IEventSender.cs) send/enqueue capabilities.

By default the events are first sent/enqueued to the datatbase outbox, then a secondary out-of-process dequeues and sends. This can however introduce unwanted latency depending on the frequency in which the secondary process performs the dequeue and send, as this is essentially a polling-based operation. To improve (minimize) latency, the primary `IEventSender` can be specified using the `SetPrimaryEventSender` method. This will then be used to send the events immediately, and where successful, they will be audited in the database as dequeued event(s); versus on error (as a backup), where they will be enqueued for the out-of-process dequeue and send (as per default).

<br/>

## Other repos

These other _Avanade_ repositories leverage _DbEx_:
Expand Down
32 changes: 16 additions & 16 deletions src/DbEx/Console/MigratorConsoleBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,8 @@ public async Task<int> RunAsync(string[] args)
}
catch (CommandParsingException cpex)
{
Args.Logger?.LogError(cpex.Message);
Args.Logger?.LogError(string.Empty);
Args.Logger?.LogError("{Content}", cpex.Message);
Args.Logger?.LogError("{Content}", string.Empty);
return 1;
}
}
Expand Down Expand Up @@ -281,11 +281,11 @@ private ValidationResult ValidateMultipleValue(string option, ValidationContext
{
if (gcex.Message != null)
{
Args.Logger?.LogError(gcex.Message);
Args.Logger?.LogError("{Content}", gcex.Message);
if (gcex.InnerException != null)
Args.Logger?.LogError(gcex.InnerException.Message);
Args.Logger?.LogError("{Content}", gcex.InnerException.Message);

Args.Logger?.LogError(string.Empty);
Args.Logger?.LogError("{Content}", string.Empty);
}

return 2;
Expand All @@ -304,7 +304,7 @@ private ValidationResult ValidateMultipleValue(string option, ValidationContext
protected virtual void OnWriteMasthead()
{
if (MastheadText != null)
Logger?.LogInformation(MastheadText);
Logger?.LogInformation("{Content}", MastheadText);
}

/// <summary>
Expand All @@ -313,8 +313,8 @@ protected virtual void OnWriteMasthead()
/// <remarks>Writes the <see cref="AppTitle"/>.</remarks>
protected virtual void OnWriteHeader()
{
Logger?.LogInformation(AppTitle);
Logger?.LogInformation(string.Empty);
Logger?.LogInformation("{Content}", AppTitle);
Logger?.LogInformation("{Content}", string.Empty);
}

/// <summary>
Expand All @@ -332,13 +332,13 @@ public static void WriteStandardizedArgs(MigratorConsoleArgs args)
if (args == null || args.Logger == null)
return;

args.Logger.LogInformation($"Command = {args.MigrationCommand}");
args.Logger.LogInformation($"SchemaOrder = {string.Join(", ", args.SchemaOrder.ToArray())}");
args.Logger.LogInformation($"OutDir = {args.OutputDirectory?.FullName}");
args.Logger.LogInformation($"Assemblies{(args.Assemblies.Count == 0 ? " = none" : ":")}");
args.Logger.LogInformation("{Content}", $"Command = {args.MigrationCommand}");
args.Logger.LogInformation("{Content}", $"SchemaOrder = {string.Join(", ", args.SchemaOrder.ToArray())}");
args.Logger.LogInformation("{Content}", $"OutDir = {args.OutputDirectory?.FullName}");
args.Logger.LogInformation("{Content}", $"Assemblies{(args.Assemblies.Count == 0 ? " = none" : ":")}");
foreach (var a in args.Assemblies)
{
args.Logger.LogInformation($" {a.FullName}");
args.Logger.LogInformation("{Content}", $" {a.FullName}");
}
}

Expand All @@ -348,9 +348,9 @@ public static void WriteStandardizedArgs(MigratorConsoleArgs args)
/// <param name="elapsedMilliseconds">The elapsed execution time in milliseconds.</param>
protected virtual void OnWriteFooter(long elapsedMilliseconds)
{
Logger?.LogInformation(string.Empty);
Logger?.LogInformation($"{AppName} Complete. [{elapsedMilliseconds}ms]");
Logger?.LogInformation(string.Empty);
Logger?.LogInformation("{Content}", string.Empty);
Logger?.LogInformation("{Content}", $"{AppName} Complete. [{elapsedMilliseconds}ms]");
Logger?.LogInformation("{Content}", string.Empty);
}
}
}
4 changes: 2 additions & 2 deletions src/DbEx/Console/SqlServerMigratorConsole.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ protected override async Task<bool> OnMigrateAsync()
if (!await migrator.MigrateAsync().ConfigureAwait(false))
return false;

Logger?.LogInformation(string.Empty);
Logger?.LogInformation(new string('-', 80));
Logger?.LogInformation("{Content}", string.Empty);
Logger?.LogInformation("{Content}", new string('-', 80));

return true;
}
Expand Down
2 changes: 1 addition & 1 deletion src/DbEx/DbEx.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<RootNamespace>DbEx</RootNamespace>
<Version>1.0.4</Version>
<Version>1.0.5</Version>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<Authors>DbEx Developers</Authors>
<Company>Avanade</Company>
Expand Down
10 changes: 5 additions & 5 deletions src/DbEx/SqlServer/EventOutboxDequeueBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,28 +34,28 @@ public EventOutboxDequeueBase(IDatabase database, IEventSender eventSender, ILog
/// <summary>
/// Gets the <see cref="IDatabase"/>.
/// </summary>
public IDatabase Database { get; }
protected IDatabase Database { get; }

/// <summary>
/// Gets the <see cref="IEventSender"/>.
/// </summary>
public IEventSender EventSender { get; }
protected IEventSender EventSender { get; }

/// <summary>
/// Gets the <see cref="ILogger"/>.
/// </summary>
public ILogger Logger { get; }
protected ILogger Logger { get; }

/// <summary>
/// Gets the event outbox <i>dequeue</i> stored procedure name.
/// </summary>
public abstract string DequeueStoredProcedure { get; }
protected abstract string DequeueStoredProcedure { get; }

/// <summary>
/// Gets the column name for the <see cref="EventDataBase.Id"/> property within the event outbox table.
/// </summary>
/// <remarks>Defaults to '<c>EventId</c>'.</remarks>
public virtual string EventIdColumnName => "EventId";
protected virtual string EventIdColumnName => "EventId";

/// <summary>
/// Gets or sets the default partition key.
Expand Down
88 changes: 69 additions & 19 deletions src/DbEx/SqlServer/EventOutboxEnqueueBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

using CoreEx.Events;
using CoreEx.Json;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Data;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

Expand All @@ -13,34 +15,49 @@ namespace DbEx.SqlServer
/// <summary>
/// Provides the base <see cref="EventSendData"/> <see cref="IDatabase">database</see> <i>outbox enqueue</i> <see cref="IEventSender.SendAsync(EventSendData[])"/>.
/// </summary>
/// <remarks>By default the events are first sent/enqueued to the datatbase outbox, then a secondary out-of-process dequeues and sends. This can however introduce unwanted latency depending on the frequency in which the secondary process
/// performs the dequeue and send, as this is essentially a polling-based operation. To improve (minimize) latency, the primary <see cref="IEventSender"/> can be specified using <see cref="SetPrimaryEventSender(IEventSender)"/>. This will
/// then be used to send the events immediately, and where successful, they will be audited in the database as dequeued event(s); versus on error (as a backup), where they will be enqueued for the out-of-process dequeue and send (as per default).</remarks>
public abstract class EventOutboxEnqueueBase : IEventSender
{
private IEventSender? _eventSender;

/// <summary>
/// Initializes a new instance of the <see cref="EventOutboxEnqueueBase"/> class.
/// </summary>
/// <param name="database">The <see cref="IDatabase"/>.</param>
public EventOutboxEnqueueBase(IDatabase database) => Database = database ?? throw new ArgumentNullException(nameof(database));
/// <param name="logger">The <see cref="ILogger"/>.</param>
public EventOutboxEnqueueBase(IDatabase database, ILogger<EventOutboxEnqueueBase> logger)
{
Database = database ?? throw new ArgumentNullException(nameof(database));
Logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

/// <summary>
/// Gets the <see cref="IDatabase"/>.
/// </summary>
public IDatabase Database { get; }
protected IDatabase Database { get; }

/// <summary>
/// Gets the <see cref="ILogger"/>.
/// </summary>
protected ILogger Logger { get; }

/// <summary>
/// Gets the database type name for the <see cref="TableValuedParameter"/>.
/// </summary>
public abstract string DbTvpTypeName { get; }
protected abstract string DbTvpTypeName { get; }

/// <summary>
/// Gets the event outbox <i>enqueue</i> stored procedure name.
/// </summary>
public abstract string EnqueueStoredProcedure { get; }
protected abstract string EnqueueStoredProcedure { get; }

/// <summary>
/// Gets the column name for the <see cref="EventDataBase.Id"/> property within the event outbox table.
/// </summary>
/// <remarks>Defaults to '<c>EventId</c>'.</remarks>
public virtual string EventIdColumnName => "EventId";
protected virtual string EventIdColumnName => "EventId";

/// <summary>
/// Gets or sets the default partition key.
Expand All @@ -54,6 +71,52 @@ public abstract class EventOutboxEnqueueBase : IEventSender
/// <remarks>Defaults to '<c>$default</c>'. This will ensure that there is always a value recorded in the database.</remarks>
public string DefaultDestination { get; set; } = "$default";

/// <summary>
/// Sets the optional <see cref="IEventSender"/> to act as the primary <see cref="IEventSender"/> where <i>outbox enqueue</i> is to provide backup/audit capabilities.
/// </summary>
public void SetPrimaryEventSender(IEventSender eventSender)
{
if (eventSender != null & eventSender is EventOutboxEnqueueBase)
throw new ArgumentException($"{nameof(SetPrimaryEventSender)} value must not be of Type {nameof(EventOutboxEnqueueBase)}.", nameof(eventSender));

_eventSender = eventSender;
}

/// <summary>
/// <inheritdoc/>
/// </summary>
/// <param name="events"><inheritdoc/></param>
/// <remarks>Executes the <see cref="EnqueueStoredProcedure"/> to <i>send / enqueue</i> the <paramref name="events"/> to the underlying database outbox tables.</remarks>
public async Task SendAsync(params EventSendData[] events)
{
if (events == null || !events.Any())
return;

Stopwatch sw = Stopwatch.StartNew();
var setEventsAsDequeued = _eventSender != null;
if (setEventsAsDequeued)
{
try
{
await _eventSender!.SendAsync(events).ConfigureAwait(false);
sw.Stop();
Logger.LogDebug("{EventCount} event(s) were sent successfully. [Sender={Sender}, Elapsed={Elapsed}ms]", events.Length, _eventSender.GetType().Name, sw.ElapsedMilliseconds);
}
catch (Exception ex)
{
sw.Stop();
setEventsAsDequeued = false;
Logger.LogWarning(ex, "{EventCount} event(s) were unable to be sent successfully; will be forwarded (sent/enqueued) to the datatbase outbox for an out-of-process send: {ErrorMessage} [Sender={Sender}, Elapsed={Elapsed}ms]",
events.Length, ex.Message, _eventSender!.GetType().Name, sw.ElapsedMilliseconds);
}
}

sw = Stopwatch.StartNew();
await Database.StoredProcedure(EnqueueStoredProcedure, p => p.Param("@SetEventsAsDequeued", setEventsAsDequeued).AddTableValuedParameter("@EventList", CreateTableValuedParameter(events))).NonQueryAsync().ConfigureAwait(false);
sw.Stop();
Logger.LogDebug("{EventCount} event(s) were enqueued. [Sender={Sender}, SetEventsAsDequeued={SetAsDequeued}, Elapsed={Elapsed}ms]", events.Length, GetType().Name, setEventsAsDequeued, sw.ElapsedMilliseconds);
}

/// <inheritdoc/>
public TableValuedParameter CreateTableValuedParameter(IEnumerable<EventSendData> list)
{
Expand All @@ -77,25 +140,12 @@ public TableValuedParameter CreateTableValuedParameter(IEnumerable<EventSendData
{
var attributes = item.Attributes == null || item.Attributes.Count == 0 ? new BinaryData(Array.Empty<byte>()) : JsonSerializer.Default.SerializeToBinaryData(item.Attributes);
tvp.AddRow(item.Id, item.Destination ?? DefaultDestination ?? throw new InvalidOperationException($"The {nameof(DefaultDestination)} must have a non-null value."),
item.Subject, item.Action, item.Type, item.Source, item.Timestamp, item.CorrelationId, item.TenantId,
item.Subject, item.Action, item.Type, item.Source, item.Timestamp, item.CorrelationId, item.TenantId,
item.PartitionKey ?? DefaultPartitionKey ?? throw new InvalidOperationException($"The {nameof(DefaultPartitionKey)} must have a non-null value."),
item.ETag, attributes.ToArray(), item.Data?.ToArray());
}

return tvp;
}

/// <summary>
/// <inheritdoc/>
/// </summary>
/// <param name="events"><inheritdoc/></param>
/// <remarks>Executes the <see cref="EnqueueStoredProcedure"/> to <i>send / enqueue</i> the <paramref name="events"/> to the underlying database outbox tables.</remarks>
public async Task SendAsync(params EventSendData[] events)
{
if (events == null || !events.Any())
return;

await Database.StoredProcedure(EnqueueStoredProcedure, p => p.AddTableValuedParameter("@EventList", CreateTableValuedParameter(events))).NonQueryAsync().ConfigureAwait(false);
}
}
}
2 changes: 1 addition & 1 deletion src/DbEx/Templates/SqlServer/EventOutboxDequeue_cs.hbs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace {{NamespaceOutbox}}.Data
public EventOutboxDequeue(IDatabase database, IEventSender eventSender, ILogger<EventOutboxDequeue> logger) : base(database, eventSender, logger) { }

/// <inheritdoc/>
public override string DequeueStoredProcedure => "[{{OutboxSchema}}].[{{OutboxDequeueStoredProcedure}}]";
protected override string DequeueStoredProcedure => "[{{OutboxSchema}}].[{{OutboxDequeueStoredProcedure}}]";
}
}

Expand Down
8 changes: 5 additions & 3 deletions src/DbEx/Templates/SqlServer/EventOutboxEnqueue_cs.hbs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

using DbEx;
using DbEx.SqlServer;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Data;

Expand All @@ -22,13 +23,14 @@ namespace {{NamespaceOutbox}}.Data
/// Initializes a new instance of the <see cref="EventOutboxEnqueue"/> class.
/// </summary>
/// <param name="database">The <see cref="IDatabase"/>.</param>
public EventOutboxEnqueue(IDatabase database) : base(database) { }
/// <param name="logger">The <see cref="ILogger"/>.</param>
public EventOutboxEnqueue(IDatabase database, ILogger<EventOutboxEnqueue> logger) : base(database, logger) { }

/// <inheritdoc/>
public override string DbTvpTypeName => "[{{OutboxSchema}}].[udt{{OutboxTable}}List]";
protected override string DbTvpTypeName => "[{{OutboxSchema}}].[udt{{OutboxTable}}List]";

/// <inheritdoc/>
public override string EnqueueStoredProcedure => "[{{OutboxSchema}}].[{{OutboxEnqueueStoredProcedure}}]";
protected override string EnqueueStoredProcedure => "[{{OutboxSchema}}].[{{OutboxEnqueueStoredProcedure}}]";
}
}

Expand Down
12 changes: 9 additions & 3 deletions src/DbEx/Templates/SqlServer/SpEventOutboxEnqueue_sql.hbs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{{! Copyright (c) Avanade. Licensed under the MIT License. See https://github.com/Avanade/NTangle }}
CREATE PROCEDURE [{{OutboxSchema}}].[sp{{OutboxTable}}Enqueue]
@SetEventsAsDequeued AS BIT = 0,
@EventList AS [{{OutboxSchema}}].[udt{{OutboxTable}}List] READONLY
AS
BEGIN
Expand All @@ -15,9 +16,14 @@ BEGIN

-- Working variables.
DECLARE @{{camel OutboxTable}}Id BIGINT,
@enqueuedDate DATETIME
@enqueuedDate DATETIME,
@dequeuedDate DATETIME

SET @enqueuedDate = SYSUTCDATETIME()
IF (@SetEventsAsDequeued = 1)
BEGIN
SET @dequeuedDate = @enqueuedDate
END

-- Enqueued outbox resultant identifier.
DECLARE @enqueuedId TABLE([{{OutboxTable}}Id] BIGINT)
Expand Down Expand Up @@ -48,9 +54,9 @@ BEGIN
WHILE @@FETCH_STATUS = 0
BEGIN
-- Enqueue event into outbox
INSERT INTO [{{OutboxSchema}}].[{{OutboxTable}}] ([EnqueuedDate], [PartitionKey], [Destination])
INSERT INTO [{{OutboxSchema}}].[{{OutboxTable}}] ([EnqueuedDate], [PartitionKey], [Destination], [DequeuedDate])
OUTPUT inserted.{{OutboxTable}}Id INTO @enqueuedId
VALUES (@enqueuedDate, @partitionKey, @destination)
VALUES (@enqueuedDate, @partitionKey, @destination, @dequeuedDate)

SELECT @{{camel OutboxTable}}Id = [{{OutboxTable}}Id] FROM @enqueuedId

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public sealed class EventOutboxDequeue : EventOutboxDequeueBase
public EventOutboxDequeue(IDatabase database, IEventSender eventSender, ILogger<EventOutboxDequeue> logger) : base(database, eventSender, logger) { }

/// <inheritdoc/>
public override string DequeueStoredProcedure => "[Outbox].[spEventOutboxDequeue]";
protected override string DequeueStoredProcedure => "[Outbox].[spEventOutboxDequeue]";
}
}

Expand Down
Loading

0 comments on commit 2505ac5

Please sign in to comment.