From 2505ac5a44be87433d99626cccada2cc4f37ad15 Mon Sep 17 00:00:00 2001 From: "Eric Sibly [chullybun]" Date: Mon, 28 Mar 2022 19:21:22 -0700 Subject: [PATCH] In-process event send option introduction. (#11) --- CHANGELOG.md | 3 + DbEx.sln | 2 +- README.md | 8 ++ src/DbEx/Console/MigratorConsoleBase.cs | 32 +++---- src/DbEx/Console/SqlServerMigratorConsole.cs | 4 +- src/DbEx/DbEx.csproj | 2 +- src/DbEx/SqlServer/EventOutboxDequeueBase.cs | 10 +-- src/DbEx/SqlServer/EventOutboxEnqueueBase.cs | 88 +++++++++++++++---- .../SqlServer/EventOutboxDequeue_cs.hbs | 2 +- .../SqlServer/EventOutboxEnqueue_cs.hbs | 8 +- .../SqlServer/SpEventOutboxEnqueue_sql.hbs | 12 ++- .../Generated/EventOutboxDequeue.cs | 2 +- .../Generated/EventOutboxEnqueue.cs | 8 +- .../Generated/spEventOutboxEnqueue.sql | 12 ++- tests/DbEx.Test/SqlServerMigratorTest.cs | 2 +- tests/DbEx.Test/SqlServerOutboxTest.cs | 61 ++++++++++++- 16 files changed, 194 insertions(+), 62 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 61f3b4d..6948ed5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ Represents the **NuGet** versions. +## v1.0.5 +- *Enhancement:* The `EventOutboxEnqueueBase` is the SQL Server event outbox enqueue `IEventSender`. To minimize send latency (increasing real-time event delivery) a primary (alternate) `IEventSender` can be specified (`SetPrimaryEventSender`). This changes the event behaviour whereby the events will be sent via the specified primary first, then enqueued only where the primary fails. The events will still be written to the event outbox but as sent for audit purposes. + ## v1.0.4 - *Enhancement:* Integrated SQL Server-based `EventOutbox` code-generation (both database and C#) into _DbEx_ to enable re-use and consistency. diff --git a/DbEx.sln b/DbEx.sln index 58ec455..b209b88 100644 --- a/DbEx.sln +++ b/DbEx.sln @@ -27,7 +27,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DbEx.Test.Console", "tests\DbEx.Test.Console\DbEx.Test.Console.csproj", "{117B6E86-2C88-446E-AC3A-AE1A2E84E2D8}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DbEx.Test.OutboxConsole", "tests\DbEx.Test.OutboxConsole\DbEx.Test.OutboxConsole.csproj", "{959DD5E1-530A-42BA-82B8-F17A657AC351}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DbEx.Test.OutboxConsole", "tests\DbEx.Test.OutboxConsole\DbEx.Test.OutboxConsole.csproj", "{959DD5E1-530A-42BA-82B8-F17A657AC351}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution diff --git a/README.md b/README.md index dd730e1..cf144b4 100644 --- a/README.md +++ b/README.md @@ -322,6 +322,14 @@ Template | Description | Example
+### Event Outbox Enqueue + +The [`EventOutboxDequeueBase`](./src/DbEx/SqlServer/EventOutboxEnqueueBase.cs) provides the base [`IEventSender`](https://github.com/Avanade/CoreEx/blob/main/src/CoreEx/Events/IEventSender.cs) send/enqueue capabilities. + +By default the events are first sent/enqueued to the datatbase outbox, then a secondary out-of-process dequeues and sends. This can however introduce unwanted latency depending on the frequency in which the secondary process performs the dequeue and send, as this is essentially a polling-based operation. To improve (minimize) latency, the primary `IEventSender` can be specified using the `SetPrimaryEventSender` method. This will then be used to send the events immediately, and where successful, they will be audited in the database as dequeued event(s); versus on error (as a backup), where they will be enqueued for the out-of-process dequeue and send (as per default). + +
+ ## Other repos These other _Avanade_ repositories leverage _DbEx_: diff --git a/src/DbEx/Console/MigratorConsoleBase.cs b/src/DbEx/Console/MigratorConsoleBase.cs index 2da024b..4fea2f9 100644 --- a/src/DbEx/Console/MigratorConsoleBase.cs +++ b/src/DbEx/Console/MigratorConsoleBase.cs @@ -187,8 +187,8 @@ public async Task RunAsync(string[] args) } catch (CommandParsingException cpex) { - Args.Logger?.LogError(cpex.Message); - Args.Logger?.LogError(string.Empty); + Args.Logger?.LogError("{Content}", cpex.Message); + Args.Logger?.LogError("{Content}", string.Empty); return 1; } } @@ -281,11 +281,11 @@ private async Task RunRunawayAsync() /* Method name inspired by: Slade - Ru { if (gcex.Message != null) { - Args.Logger?.LogError(gcex.Message); + Args.Logger?.LogError("{Content}", gcex.Message); if (gcex.InnerException != null) - Args.Logger?.LogError(gcex.InnerException.Message); + Args.Logger?.LogError("{Content}", gcex.InnerException.Message); - Args.Logger?.LogError(string.Empty); + Args.Logger?.LogError("{Content}", string.Empty); } return 2; @@ -304,7 +304,7 @@ private async Task RunRunawayAsync() /* Method name inspired by: Slade - Ru protected virtual void OnWriteMasthead() { if (MastheadText != null) - Logger?.LogInformation(MastheadText); + Logger?.LogInformation("{Content}", MastheadText); } /// @@ -313,8 +313,8 @@ protected virtual void OnWriteMasthead() /// Writes the . protected virtual void OnWriteHeader() { - Logger?.LogInformation(AppTitle); - Logger?.LogInformation(string.Empty); + Logger?.LogInformation("{Content}", AppTitle); + Logger?.LogInformation("{Content}", string.Empty); } /// @@ -332,13 +332,13 @@ public static void WriteStandardizedArgs(MigratorConsoleArgs args) if (args == null || args.Logger == null) return; - args.Logger.LogInformation($"Command = {args.MigrationCommand}"); - args.Logger.LogInformation($"SchemaOrder = {string.Join(", ", args.SchemaOrder.ToArray())}"); - args.Logger.LogInformation($"OutDir = {args.OutputDirectory?.FullName}"); - args.Logger.LogInformation($"Assemblies{(args.Assemblies.Count == 0 ? " = none" : ":")}"); + args.Logger.LogInformation("{Content}", $"Command = {args.MigrationCommand}"); + args.Logger.LogInformation("{Content}", $"SchemaOrder = {string.Join(", ", args.SchemaOrder.ToArray())}"); + args.Logger.LogInformation("{Content}", $"OutDir = {args.OutputDirectory?.FullName}"); + args.Logger.LogInformation("{Content}", $"Assemblies{(args.Assemblies.Count == 0 ? " = none" : ":")}"); foreach (var a in args.Assemblies) { - args.Logger.LogInformation($" {a.FullName}"); + args.Logger.LogInformation("{Content}", $" {a.FullName}"); } } @@ -348,9 +348,9 @@ public static void WriteStandardizedArgs(MigratorConsoleArgs args) /// The elapsed execution time in milliseconds. protected virtual void OnWriteFooter(long elapsedMilliseconds) { - Logger?.LogInformation(string.Empty); - Logger?.LogInformation($"{AppName} Complete. [{elapsedMilliseconds}ms]"); - Logger?.LogInformation(string.Empty); + Logger?.LogInformation("{Content}", string.Empty); + Logger?.LogInformation("{Content}", $"{AppName} Complete. [{elapsedMilliseconds}ms]"); + Logger?.LogInformation("{Content}", string.Empty); } } } \ No newline at end of file diff --git a/src/DbEx/Console/SqlServerMigratorConsole.cs b/src/DbEx/Console/SqlServerMigratorConsole.cs index 8b1e6ca..ca8c50f 100644 --- a/src/DbEx/Console/SqlServerMigratorConsole.cs +++ b/src/DbEx/Console/SqlServerMigratorConsole.cs @@ -58,8 +58,8 @@ protected override async Task OnMigrateAsync() if (!await migrator.MigrateAsync().ConfigureAwait(false)) return false; - Logger?.LogInformation(string.Empty); - Logger?.LogInformation(new string('-', 80)); + Logger?.LogInformation("{Content}", string.Empty); + Logger?.LogInformation("{Content}", new string('-', 80)); return true; } diff --git a/src/DbEx/DbEx.csproj b/src/DbEx/DbEx.csproj index e0ccd42..75394a9 100644 --- a/src/DbEx/DbEx.csproj +++ b/src/DbEx/DbEx.csproj @@ -3,7 +3,7 @@ netstandard2.1 DbEx - 1.0.4 + 1.0.5 true DbEx Developers Avanade diff --git a/src/DbEx/SqlServer/EventOutboxDequeueBase.cs b/src/DbEx/SqlServer/EventOutboxDequeueBase.cs index 6af5956..a288ad5 100644 --- a/src/DbEx/SqlServer/EventOutboxDequeueBase.cs +++ b/src/DbEx/SqlServer/EventOutboxDequeueBase.cs @@ -34,28 +34,28 @@ public EventOutboxDequeueBase(IDatabase database, IEventSender eventSender, ILog /// /// Gets the . /// - public IDatabase Database { get; } + protected IDatabase Database { get; } /// /// Gets the . /// - public IEventSender EventSender { get; } + protected IEventSender EventSender { get; } /// /// Gets the . /// - public ILogger Logger { get; } + protected ILogger Logger { get; } /// /// Gets the event outbox dequeue stored procedure name. /// - public abstract string DequeueStoredProcedure { get; } + protected abstract string DequeueStoredProcedure { get; } /// /// Gets the column name for the property within the event outbox table. /// /// Defaults to 'EventId'. - public virtual string EventIdColumnName => "EventId"; + protected virtual string EventIdColumnName => "EventId"; /// /// Gets or sets the default partition key. diff --git a/src/DbEx/SqlServer/EventOutboxEnqueueBase.cs b/src/DbEx/SqlServer/EventOutboxEnqueueBase.cs index c761c4f..cf3221f 100644 --- a/src/DbEx/SqlServer/EventOutboxEnqueueBase.cs +++ b/src/DbEx/SqlServer/EventOutboxEnqueueBase.cs @@ -2,9 +2,11 @@ using CoreEx.Events; using CoreEx.Json; +using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; using System.Data; +using System.Diagnostics; using System.Linq; using System.Threading.Tasks; @@ -13,34 +15,49 @@ namespace DbEx.SqlServer /// /// Provides the base database outbox enqueue . /// + /// By default the events are first sent/enqueued to the datatbase outbox, then a secondary out-of-process dequeues and sends. This can however introduce unwanted latency depending on the frequency in which the secondary process + /// performs the dequeue and send, as this is essentially a polling-based operation. To improve (minimize) latency, the primary can be specified using . This will + /// then be used to send the events immediately, and where successful, they will be audited in the database as dequeued event(s); versus on error (as a backup), where they will be enqueued for the out-of-process dequeue and send (as per default). public abstract class EventOutboxEnqueueBase : IEventSender { + private IEventSender? _eventSender; + /// /// Initializes a new instance of the class. /// /// The . - public EventOutboxEnqueueBase(IDatabase database) => Database = database ?? throw new ArgumentNullException(nameof(database)); + /// The . + public EventOutboxEnqueueBase(IDatabase database, ILogger logger) + { + Database = database ?? throw new ArgumentNullException(nameof(database)); + Logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } /// /// Gets the . /// - public IDatabase Database { get; } + protected IDatabase Database { get; } + + /// + /// Gets the . + /// + protected ILogger Logger { get; } /// /// Gets the database type name for the . /// - public abstract string DbTvpTypeName { get; } + protected abstract string DbTvpTypeName { get; } /// /// Gets the event outbox enqueue stored procedure name. /// - public abstract string EnqueueStoredProcedure { get; } + protected abstract string EnqueueStoredProcedure { get; } /// /// Gets the column name for the property within the event outbox table. /// /// Defaults to 'EventId'. - public virtual string EventIdColumnName => "EventId"; + protected virtual string EventIdColumnName => "EventId"; /// /// Gets or sets the default partition key. @@ -54,6 +71,52 @@ public abstract class EventOutboxEnqueueBase : IEventSender /// Defaults to '$default'. This will ensure that there is always a value recorded in the database. public string DefaultDestination { get; set; } = "$default"; + /// + /// Sets the optional to act as the primary where outbox enqueue is to provide backup/audit capabilities. + /// + public void SetPrimaryEventSender(IEventSender eventSender) + { + if (eventSender != null & eventSender is EventOutboxEnqueueBase) + throw new ArgumentException($"{nameof(SetPrimaryEventSender)} value must not be of Type {nameof(EventOutboxEnqueueBase)}.", nameof(eventSender)); + + _eventSender = eventSender; + } + + /// + /// + /// + /// + /// Executes the to send / enqueue the to the underlying database outbox tables. + public async Task SendAsync(params EventSendData[] events) + { + if (events == null || !events.Any()) + return; + + Stopwatch sw = Stopwatch.StartNew(); + var setEventsAsDequeued = _eventSender != null; + if (setEventsAsDequeued) + { + try + { + await _eventSender!.SendAsync(events).ConfigureAwait(false); + sw.Stop(); + Logger.LogDebug("{EventCount} event(s) were sent successfully. [Sender={Sender}, Elapsed={Elapsed}ms]", events.Length, _eventSender.GetType().Name, sw.ElapsedMilliseconds); + } + catch (Exception ex) + { + sw.Stop(); + setEventsAsDequeued = false; + Logger.LogWarning(ex, "{EventCount} event(s) were unable to be sent successfully; will be forwarded (sent/enqueued) to the datatbase outbox for an out-of-process send: {ErrorMessage} [Sender={Sender}, Elapsed={Elapsed}ms]", + events.Length, ex.Message, _eventSender!.GetType().Name, sw.ElapsedMilliseconds); + } + } + + sw = Stopwatch.StartNew(); + await Database.StoredProcedure(EnqueueStoredProcedure, p => p.Param("@SetEventsAsDequeued", setEventsAsDequeued).AddTableValuedParameter("@EventList", CreateTableValuedParameter(events))).NonQueryAsync().ConfigureAwait(false); + sw.Stop(); + Logger.LogDebug("{EventCount} event(s) were enqueued. [Sender={Sender}, SetEventsAsDequeued={SetAsDequeued}, Elapsed={Elapsed}ms]", events.Length, GetType().Name, setEventsAsDequeued, sw.ElapsedMilliseconds); + } + /// public TableValuedParameter CreateTableValuedParameter(IEnumerable list) { @@ -77,25 +140,12 @@ public TableValuedParameter CreateTableValuedParameter(IEnumerable()) : JsonSerializer.Default.SerializeToBinaryData(item.Attributes); tvp.AddRow(item.Id, item.Destination ?? DefaultDestination ?? throw new InvalidOperationException($"The {nameof(DefaultDestination)} must have a non-null value."), - item.Subject, item.Action, item.Type, item.Source, item.Timestamp, item.CorrelationId, item.TenantId, + item.Subject, item.Action, item.Type, item.Source, item.Timestamp, item.CorrelationId, item.TenantId, item.PartitionKey ?? DefaultPartitionKey ?? throw new InvalidOperationException($"The {nameof(DefaultPartitionKey)} must have a non-null value."), item.ETag, attributes.ToArray(), item.Data?.ToArray()); } return tvp; } - - /// - /// - /// - /// - /// Executes the to send / enqueue the to the underlying database outbox tables. - public async Task SendAsync(params EventSendData[] events) - { - if (events == null || !events.Any()) - return; - - await Database.StoredProcedure(EnqueueStoredProcedure, p => p.AddTableValuedParameter("@EventList", CreateTableValuedParameter(events))).NonQueryAsync().ConfigureAwait(false); - } } } \ No newline at end of file diff --git a/src/DbEx/Templates/SqlServer/EventOutboxDequeue_cs.hbs b/src/DbEx/Templates/SqlServer/EventOutboxDequeue_cs.hbs index 518436b..4e4277c 100644 --- a/src/DbEx/Templates/SqlServer/EventOutboxDequeue_cs.hbs +++ b/src/DbEx/Templates/SqlServer/EventOutboxDequeue_cs.hbs @@ -29,7 +29,7 @@ namespace {{NamespaceOutbox}}.Data public EventOutboxDequeue(IDatabase database, IEventSender eventSender, ILogger logger) : base(database, eventSender, logger) { } /// - public override string DequeueStoredProcedure => "[{{OutboxSchema}}].[{{OutboxDequeueStoredProcedure}}]"; + protected override string DequeueStoredProcedure => "[{{OutboxSchema}}].[{{OutboxDequeueStoredProcedure}}]"; } } diff --git a/src/DbEx/Templates/SqlServer/EventOutboxEnqueue_cs.hbs b/src/DbEx/Templates/SqlServer/EventOutboxEnqueue_cs.hbs index a04000f..92c05b1 100644 --- a/src/DbEx/Templates/SqlServer/EventOutboxEnqueue_cs.hbs +++ b/src/DbEx/Templates/SqlServer/EventOutboxEnqueue_cs.hbs @@ -8,6 +8,7 @@ using DbEx; using DbEx.SqlServer; +using Microsoft.Extensions.Logging; using System.Collections.Generic; using System.Data; @@ -22,13 +23,14 @@ namespace {{NamespaceOutbox}}.Data /// Initializes a new instance of the class. /// /// The . - public EventOutboxEnqueue(IDatabase database) : base(database) { } + /// The . + public EventOutboxEnqueue(IDatabase database, ILogger logger) : base(database, logger) { } /// - public override string DbTvpTypeName => "[{{OutboxSchema}}].[udt{{OutboxTable}}List]"; + protected override string DbTvpTypeName => "[{{OutboxSchema}}].[udt{{OutboxTable}}List]"; /// - public override string EnqueueStoredProcedure => "[{{OutboxSchema}}].[{{OutboxEnqueueStoredProcedure}}]"; + protected override string EnqueueStoredProcedure => "[{{OutboxSchema}}].[{{OutboxEnqueueStoredProcedure}}]"; } } diff --git a/src/DbEx/Templates/SqlServer/SpEventOutboxEnqueue_sql.hbs b/src/DbEx/Templates/SqlServer/SpEventOutboxEnqueue_sql.hbs index e3e3d72..dbd6b21 100644 --- a/src/DbEx/Templates/SqlServer/SpEventOutboxEnqueue_sql.hbs +++ b/src/DbEx/Templates/SqlServer/SpEventOutboxEnqueue_sql.hbs @@ -1,5 +1,6 @@ {{! Copyright (c) Avanade. Licensed under the MIT License. See https://github.com/Avanade/NTangle }} CREATE PROCEDURE [{{OutboxSchema}}].[sp{{OutboxTable}}Enqueue] + @SetEventsAsDequeued AS BIT = 0, @EventList AS [{{OutboxSchema}}].[udt{{OutboxTable}}List] READONLY AS BEGIN @@ -15,9 +16,14 @@ BEGIN -- Working variables. DECLARE @{{camel OutboxTable}}Id BIGINT, - @enqueuedDate DATETIME + @enqueuedDate DATETIME, + @dequeuedDate DATETIME SET @enqueuedDate = SYSUTCDATETIME() + IF (@SetEventsAsDequeued = 1) + BEGIN + SET @dequeuedDate = @enqueuedDate + END -- Enqueued outbox resultant identifier. DECLARE @enqueuedId TABLE([{{OutboxTable}}Id] BIGINT) @@ -48,9 +54,9 @@ BEGIN WHILE @@FETCH_STATUS = 0 BEGIN -- Enqueue event into outbox - INSERT INTO [{{OutboxSchema}}].[{{OutboxTable}}] ([EnqueuedDate], [PartitionKey], [Destination]) + INSERT INTO [{{OutboxSchema}}].[{{OutboxTable}}] ([EnqueuedDate], [PartitionKey], [Destination], [DequeuedDate]) OUTPUT inserted.{{OutboxTable}}Id INTO @enqueuedId - VALUES (@enqueuedDate, @partitionKey, @destination) + VALUES (@enqueuedDate, @partitionKey, @destination, @dequeuedDate) SELECT @{{camel OutboxTable}}Id = [{{OutboxTable}}Id] FROM @enqueuedId diff --git a/tests/DbEx.Test.OutboxConsole/Generated/EventOutboxDequeue.cs b/tests/DbEx.Test.OutboxConsole/Generated/EventOutboxDequeue.cs index d1da780..05337ec 100644 --- a/tests/DbEx.Test.OutboxConsole/Generated/EventOutboxDequeue.cs +++ b/tests/DbEx.Test.OutboxConsole/Generated/EventOutboxDequeue.cs @@ -28,7 +28,7 @@ public sealed class EventOutboxDequeue : EventOutboxDequeueBase public EventOutboxDequeue(IDatabase database, IEventSender eventSender, ILogger logger) : base(database, eventSender, logger) { } /// - public override string DequeueStoredProcedure => "[Outbox].[spEventOutboxDequeue]"; + protected override string DequeueStoredProcedure => "[Outbox].[spEventOutboxDequeue]"; } } diff --git a/tests/DbEx.Test.OutboxConsole/Generated/EventOutboxEnqueue.cs b/tests/DbEx.Test.OutboxConsole/Generated/EventOutboxEnqueue.cs index 25add2c..ddd15ac 100644 --- a/tests/DbEx.Test.OutboxConsole/Generated/EventOutboxEnqueue.cs +++ b/tests/DbEx.Test.OutboxConsole/Generated/EventOutboxEnqueue.cs @@ -7,6 +7,7 @@ using DbEx; using DbEx.SqlServer; +using Microsoft.Extensions.Logging; using System.Collections.Generic; using System.Data; @@ -21,13 +22,14 @@ public sealed class EventOutboxEnqueue : EventOutboxEnqueueBase /// Initializes a new instance of the class. /// /// The . - public EventOutboxEnqueue(IDatabase database) : base(database) { } + /// The . + public EventOutboxEnqueue(IDatabase database, ILogger logger) : base(database, logger) { } /// - public override string DbTvpTypeName => "[Outbox].[udtEventOutboxList]"; + protected override string DbTvpTypeName => "[Outbox].[udtEventOutboxList]"; /// - public override string EnqueueStoredProcedure => "[Outbox].[spEventOutboxEnqueue]"; + protected override string EnqueueStoredProcedure => "[Outbox].[spEventOutboxEnqueue]"; } } diff --git a/tests/DbEx.Test.OutboxConsole/Schema/Outbox/Stored Procedures/Generated/spEventOutboxEnqueue.sql b/tests/DbEx.Test.OutboxConsole/Schema/Outbox/Stored Procedures/Generated/spEventOutboxEnqueue.sql index 0bf8a31..c4b6602 100644 --- a/tests/DbEx.Test.OutboxConsole/Schema/Outbox/Stored Procedures/Generated/spEventOutboxEnqueue.sql +++ b/tests/DbEx.Test.OutboxConsole/Schema/Outbox/Stored Procedures/Generated/spEventOutboxEnqueue.sql @@ -1,4 +1,5 @@ CREATE PROCEDURE [Outbox].[spEventOutboxEnqueue] + @SetEventsAsDequeued AS BIT = 0, @EventList AS [Outbox].[udtEventOutboxList] READONLY AS BEGIN @@ -14,9 +15,14 @@ BEGIN -- Working variables. DECLARE @eventOutboxId BIGINT, - @enqueuedDate DATETIME + @enqueuedDate DATETIME, + @dequeuedDate DATETIME SET @enqueuedDate = SYSUTCDATETIME() + IF (@SetEventsAsDequeued = 1) + BEGIN + SET @dequeuedDate = @enqueuedDate + END -- Enqueued outbox resultant identifier. DECLARE @enqueuedId TABLE([EventOutboxId] BIGINT) @@ -47,9 +53,9 @@ BEGIN WHILE @@FETCH_STATUS = 0 BEGIN -- Enqueue event into outbox - INSERT INTO [Outbox].[EventOutbox] ([EnqueuedDate], [PartitionKey], [Destination]) + INSERT INTO [Outbox].[EventOutbox] ([EnqueuedDate], [PartitionKey], [Destination], [DequeuedDate]) OUTPUT inserted.EventOutboxId INTO @enqueuedId - VALUES (@enqueuedDate, @partitionKey, @destination) + VALUES (@enqueuedDate, @partitionKey, @destination, @dequeuedDate) SELECT @eventOutboxId = [EventOutboxId] FROM @enqueuedId diff --git a/tests/DbEx.Test/SqlServerMigratorTest.cs b/tests/DbEx.Test/SqlServerMigratorTest.cs index c8620a6..7a4979f 100644 --- a/tests/DbEx.Test/SqlServerMigratorTest.cs +++ b/tests/DbEx.Test/SqlServerMigratorTest.cs @@ -122,7 +122,7 @@ public async Task A120_MigrateAll_Console() Assert.IsNull(row.GenderId); } - private async Task<(string cs, ILogger l, SqlServerMigrator m)> CreateConsoleDb() + private static async Task<(string cs, ILogger l, SqlServerMigrator m)> CreateConsoleDb() { var cs = UnitTest.GetConfig("DbEx_").GetConnectionString("ConsoleDb"); var l = UnitTest.GetLogger(); diff --git a/tests/DbEx.Test/SqlServerOutboxTest.cs b/tests/DbEx.Test/SqlServerOutboxTest.cs index 7cc4bb9..114ec2d 100644 --- a/tests/DbEx.Test/SqlServerOutboxTest.cs +++ b/tests/DbEx.Test/SqlServerOutboxTest.cs @@ -48,7 +48,7 @@ public async Task A100_EnqueueDequeue() Attributes = new Dictionary { { "key", "value" } } }; - var eoe = new EventOutboxEnqueue(db); + var eoe = new EventOutboxEnqueue(db, UnitTest.GetLogger()); await eoe.SendAsync(esd).ConfigureAwait(false); var ims = new InMemorySender(); @@ -88,7 +88,7 @@ public async Task A110_EnqueueDequeue_MaxDequeueSize() using var db = new Database(() => new SqlConnection(cs)); await db.SqlStatement("DELETE FROM [Outbox].[EventOutbox]").NonQueryAsync().ConfigureAwait(false); - var eoe = new EventOutboxEnqueue(db); + var eoe = new EventOutboxEnqueue(db, UnitTest.GetLogger()); await eoe.SendAsync( new EventSendData { Id = "1", PartitionKey = null, Destination = null }, new EventSendData { Id = "2", PartitionKey = "apples", Destination = null }, @@ -126,7 +126,7 @@ public async Task A120_EnqueueDequeue_PartitionKey_Destination_Selection() using var db = new Database(() => new SqlConnection(cs)); await db.SqlStatement("DELETE FROM [Outbox].[EventOutbox]").NonQueryAsync().ConfigureAwait(false); - var eoe = new EventOutboxEnqueue(db); + var eoe = new EventOutboxEnqueue(db, UnitTest.GetLogger()); await eoe.SendAsync( new EventSendData { Id = "1", PartitionKey = null, Destination = null }, new EventSendData { Id = "2", PartitionKey = "apples", Destination = null }, @@ -169,5 +169,60 @@ await eoe.SendAsync( Assert.AreEqual(1, events.Length); Assert.AreEqual("1", events[0].Id); } + + [Test] + public async Task B100_EnqueueDequeue_PrimarySender_Success() + { + var cs = UnitTest.GetConfig("DbEx_").GetConnectionString("ConsoleDb"); + var l = UnitTest.GetLogger(); + + using var db = new Database(() => new SqlConnection(cs)); + await db.SqlStatement("DELETE FROM [Outbox].[EventOutbox]").NonQueryAsync().ConfigureAwait(false); + + var eoe = new EventOutboxEnqueue(db, UnitTest.GetLogger()); + var pims = new InMemorySender(); + eoe.SetPrimaryEventSender(pims); + await eoe.SendAsync(new EventSendData { Id = "1", PartitionKey = null, Destination = null }).ConfigureAwait(false); + + var events = pims.GetEvents(); + Assert.AreEqual(1, events.Length); + Assert.AreEqual("1", events[0].Id); + + var ims = new InMemorySender(); + var eod = new EventOutboxDequeue(db, ims, UnitTest.GetLogger()); + + // Should have updated as dequeued already; therefore, nothing to dequeue. + Assert.AreEqual(0, await eod.DequeueAndSendAsync(10, null, null)); + events = ims.GetEvents(); + Assert.AreEqual(0, events.Length); + } + + [Test] + public async Task B110_EnqueueDequeue_PrimarySender_Error() + { + var cs = UnitTest.GetConfig("DbEx_").GetConnectionString("ConsoleDb"); + var l = UnitTest.GetLogger(); + + using var db = new Database(() => new SqlConnection(cs)); + await db.SqlStatement("DELETE FROM [Outbox].[EventOutbox]").NonQueryAsync().ConfigureAwait(false); + + var eoe = new EventOutboxEnqueue(db, UnitTest.GetLogger()); + eoe.SetPrimaryEventSender(new TestSender()); + await eoe.SendAsync(new EventSendData { Id = "1", PartitionKey = null, Destination = null }).ConfigureAwait(false); + + var ims = new InMemorySender(); + var eod = new EventOutboxDequeue(db, ims, UnitTest.GetLogger()); + + // Should have updated as enqueued; therefore, need to be dequeued. + Assert.AreEqual(1, await eod.DequeueAndSendAsync(10, null, null)); + var events = ims.GetEvents(); + Assert.AreEqual(1, events.Length); + Assert.AreEqual("1", events[0].Id); + } + + private class TestSender : IEventSender + { + public Task SendAsync(params EventSendData[] events) => throw new NotImplementedException(); + } } } \ No newline at end of file