diff --git a/CHANGELOG.md b/CHANGELOG.md index 33cabf3..39493c9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,11 +2,15 @@ Represents the **NuGet** versions. +## v1.0.9 +- *Enhancement:* Updated the `EventOoutboxEnqueueBase` to handle new `EventSendException` and enqueue each individual message as either sent or unsent within the outbox. +- *Fixed:* Updated to `CoreEx` version `1.0.5`. + ## v1.0.8 - *Fixed:* Updated to `CoreEx` version `1.0.3`. ## v1.0.7 -- *Fixed:* Previous version v1.0.6 fix was incorrect; Data import order should not have been reversed. This preivous change has been corrected. +- *Fixed:* Previous version `1.0.6` fix was incorrect; Data import order should not have been reversed. This previous change has been corrected. ## v1.0.6 - *Fixed:* [Issue 12](https://github.com/Avanade/DbEx/issues/12) fixed. Data import order has been reversed. diff --git a/src/DbEx/Console/MigratorConsoleBase.cs b/src/DbEx/Console/MigratorConsoleBase.cs index 4fea2f9..62409e9 100644 --- a/src/DbEx/Console/MigratorConsoleBase.cs +++ b/src/DbEx/Console/MigratorConsoleBase.cs @@ -22,7 +22,7 @@ namespace DbEx.Console /// /// The standard console command-line arguments/options can be controlled via the constructor using the flags. Additional capabilities can be added by inherting and overridding the /// , and . Changes to the console output can be achieved by overridding - /// , , and . + /// , , and . /// The underlying command line parsing is provided by . public abstract class MigratorConsoleBase { @@ -58,7 +58,7 @@ public abstract class MigratorConsoleBase protected ILogger? Logger => Args.Logger; /// - /// Indicates whether to bypass standard execution of , , and . + /// Indicates whether to bypass standard execution of , , and . /// protected bool BypassOnWrites { get; set; } @@ -273,7 +273,7 @@ private async Task RunRunawayAsync() /* Method name inspired by: Slade - Ru // Write footer and exit successfully. sw.Stop(); if (!BypassOnWrites) - OnWriteFooter(sw.ElapsedMilliseconds); + OnWriteFooter(sw.Elapsed.TotalMilliseconds); return 0; } @@ -345,11 +345,11 @@ public static void WriteStandardizedArgs(MigratorConsoleArgs args) /// /// Invoked to write the footer information to the . /// - /// The elapsed execution time in milliseconds. - protected virtual void OnWriteFooter(long elapsedMilliseconds) + /// The elapsed execution time in milliseconds. + protected virtual void OnWriteFooter(double totalMilliseconds) { Logger?.LogInformation("{Content}", string.Empty); - Logger?.LogInformation("{Content}", $"{AppName} Complete. [{elapsedMilliseconds}ms]"); + Logger?.LogInformation("{Content}", $"{AppName} Complete. [{totalMilliseconds}ms]"); Logger?.LogInformation("{Content}", string.Empty); } } diff --git a/src/DbEx/DbEx.csproj b/src/DbEx/DbEx.csproj index 81310d9..d0bd2d0 100644 --- a/src/DbEx/DbEx.csproj +++ b/src/DbEx/DbEx.csproj @@ -3,7 +3,7 @@ netstandard2.1 DbEx - 1.0.8 + 1.0.9 true DbEx Developers Avanade @@ -89,7 +89,7 @@ - + diff --git a/src/DbEx/Migration/DatabaseMigratorBase.cs b/src/DbEx/Migration/DatabaseMigratorBase.cs index 11482a4..b5f3501 100644 --- a/src/DbEx/Migration/DatabaseMigratorBase.cs +++ b/src/DbEx/Migration/DatabaseMigratorBase.cs @@ -208,7 +208,7 @@ protected async Task CommandExecuteAsync(string title, Func> ac sw.Stop(); Logger.LogInformation("{Content}", string.Empty); - Logger.LogInformation("{Content}", $"Complete. [{sw.ElapsedMilliseconds}ms{summary?.Invoke() ?? string.Empty}]"); + Logger.LogInformation("{Content}", $"Complete. [{sw.Elapsed.TotalMilliseconds}ms{summary?.Invoke() ?? string.Empty}]"); return true; } catch (Exception ex) diff --git a/src/DbEx/Migration/SqlServer/SqlServerMigrator.cs b/src/DbEx/Migration/SqlServer/SqlServerMigrator.cs index d4e16fd..fb6acd4 100644 --- a/src/DbEx/Migration/SqlServer/SqlServerMigrator.cs +++ b/src/DbEx/Migration/SqlServer/SqlServerMigrator.cs @@ -68,7 +68,7 @@ protected override async Task DatabaseSchemaAsync(List DatabaseDataAsync(IDatabase database, List().ConfigureAwait(false); - Logger.LogInformation($"Result: {rows} rows affected."); + Logger.LogInformation("{Message}", $"Result: {rows} rows affected."); } return true; @@ -154,7 +154,7 @@ public override async Task ExecuteScriptsAsync(IEnumerabl if (dur.Successful || dur.ErrorScript != null || i >= MaxRetries) return dur; - Logger.LogWarning($" Possible transient error (will try again in 500ms): {dur.Error.Message}"); + Logger.LogWarning("{Message}", $" Possible transient error (will try again in 500ms): {dur.Error.Message}"); await Task.Delay(500).ConfigureAwait(false); } } diff --git a/src/DbEx/SqlServer/EventOutboxDequeueBase.cs b/src/DbEx/SqlServer/EventOutboxDequeueBase.cs index a288ad5..4b1a84f 100644 --- a/src/DbEx/SqlServer/EventOutboxDequeueBase.cs +++ b/src/DbEx/SqlServer/EventOutboxDequeueBase.cs @@ -14,7 +14,7 @@ namespace DbEx.SqlServer { /// - /// Provides the base database outbox dequeue and corresponding . + /// Provides the base database outbox dequeue and corresponding . /// public abstract class EventOutboxDequeueBase : IDatabaseMapper { @@ -105,13 +105,13 @@ public async Task DequeueAndSendAsync(int maxDequeueSize = 50, string? part return 0; } - Logger.LogInformation("{EventCount} event(s) were dequeued. [Elapsed={Elapsed}ms]", events.Count(), sw.ElapsedMilliseconds); + Logger.LogInformation("{EventCount} event(s) were dequeued. [Elapsed={Elapsed}ms]", events.Count(), sw.Elapsed.TotalMilliseconds); // Send the events. sw = Stopwatch.StartNew(); - await EventSender.SendAsync(events.ToArray()).ConfigureAwait(false); + await EventSender.SendAsync(events.ToArray(), cancellationToken).ConfigureAwait(false); sw.Stop(); - Logger.LogInformation("{EventCount} event(s) were sent successfully. [Sender={Sender}, Elapsed={Elapsed}ms]", events.Count(), EventSender.GetType().Name, sw.ElapsedMilliseconds); + Logger.LogInformation("{EventCount} event(s) were sent successfully. [Sender={Sender}, Elapsed={Elapsed}ms]", events.Count(), EventSender.GetType().Name, sw.Elapsed.TotalMilliseconds); // Commit the transaction. txn.Complete(); diff --git a/src/DbEx/SqlServer/EventOutboxEnqueueBase.cs b/src/DbEx/SqlServer/EventOutboxEnqueueBase.cs index cf3221f..c7fd5d6 100644 --- a/src/DbEx/SqlServer/EventOutboxEnqueueBase.cs +++ b/src/DbEx/SqlServer/EventOutboxEnqueueBase.cs @@ -8,12 +8,13 @@ using System.Data; using System.Diagnostics; using System.Linq; +using System.Threading; using System.Threading.Tasks; namespace DbEx.SqlServer { /// - /// Provides the base database outbox enqueue . + /// Provides the base database outbox enqueue . /// /// By default the events are first sent/enqueued to the datatbase outbox, then a secondary out-of-process dequeues and sends. This can however introduce unwanted latency depending on the frequency in which the secondary process /// performs the dequeue and send, as this is essentially a polling-based operation. To improve (minimize) latency, the primary can be specified using . This will @@ -86,42 +87,58 @@ public void SetPrimaryEventSender(IEventSender eventSender) /// /// /// + /// /// Executes the to send / enqueue the to the underlying database outbox tables. - public async Task SendAsync(params EventSendData[] events) + public async Task SendAsync(IEnumerable events, CancellationToken cancellationToken = default) { if (events == null || !events.Any()) return; Stopwatch sw = Stopwatch.StartNew(); - var setEventsAsDequeued = _eventSender != null; - if (setEventsAsDequeued) + var unsentEvents = new List(events); + + if (_eventSender != null) { try { - await _eventSender!.SendAsync(events).ConfigureAwait(false); + await _eventSender!.SendAsync(events, cancellationToken).ConfigureAwait(false); + sw.Stop(); + unsentEvents.Clear(); + Logger.LogDebug("{EventCount} event(s) were sent successfully; will be forwarded (sent/enqueued) to the datatbase outbox as sent. [Sender={Sender}, Elapsed={Elapsed}ms]", + events.Count(), _eventSender.GetType().Name, sw.Elapsed.TotalMilliseconds); + } + catch (EventSendException esex) + { sw.Stop(); - Logger.LogDebug("{EventCount} event(s) were sent successfully. [Sender={Sender}, Elapsed={Elapsed}ms]", events.Length, _eventSender.GetType().Name, sw.ElapsedMilliseconds); + Logger.LogWarning(esex, "{UnsentCount} of {EventCount} event(s) were unable to be sent successfully; will be forwarded (sent/enqueued) to the datatbase outbox for an out-of-process send: {ErrorMessage} [Sender={Sender}, Elapsed={Elapsed}ms]", + esex.NotSentEvents?.Count() ?? unsentEvents.Count, events.Count(), esex.Message, _eventSender!.GetType().Name, sw.Elapsed.TotalMilliseconds); + + if (esex.NotSentEvents != null) + unsentEvents = esex.NotSentEvents.ToList(); } catch (Exception ex) { sw.Stop(); - setEventsAsDequeued = false; Logger.LogWarning(ex, "{EventCount} event(s) were unable to be sent successfully; will be forwarded (sent/enqueued) to the datatbase outbox for an out-of-process send: {ErrorMessage} [Sender={Sender}, Elapsed={Elapsed}ms]", - events.Length, ex.Message, _eventSender!.GetType().Name, sw.ElapsedMilliseconds); + events.Count(), ex.Message, _eventSender!.GetType().Name, sw.Elapsed.TotalMilliseconds); } } sw = Stopwatch.StartNew(); - await Database.StoredProcedure(EnqueueStoredProcedure, p => p.Param("@SetEventsAsDequeued", setEventsAsDequeued).AddTableValuedParameter("@EventList", CreateTableValuedParameter(events))).NonQueryAsync().ConfigureAwait(false); + await Database.StoredProcedure(EnqueueStoredProcedure, p => p.AddTableValuedParameter("@EventList", CreateTableValuedParameter(events, unsentEvents))).NonQueryAsync().ConfigureAwait(false); sw.Stop(); - Logger.LogDebug("{EventCount} event(s) were enqueued. [Sender={Sender}, SetEventsAsDequeued={SetAsDequeued}, Elapsed={Elapsed}ms]", events.Length, GetType().Name, setEventsAsDequeued, sw.ElapsedMilliseconds); + Logger.LogDebug("{EventCount} event(s) were enqueued; {SuccessCount} as sent, {ErrorCount} to be sent. [Sender={Sender}, Elapsed={Elapsed}ms]", + events.Count(), events.Count() - unsentEvents.Count, unsentEvents.Count, GetType().Name, sw.Elapsed.TotalMilliseconds); } - /// - public TableValuedParameter CreateTableValuedParameter(IEnumerable list) + /// + /// Creates the TVP from the list. + /// + private TableValuedParameter CreateTableValuedParameter(IEnumerable list, IEnumerable unsentList) { var dt = new DataTable(); dt.Columns.Add(EventIdColumnName, typeof(string)); + dt.Columns.Add("EventDequeued", typeof(bool)); dt.Columns.Add(nameof(EventSendData.Destination), typeof(string)); dt.Columns.Add(nameof(EventSendData.Subject), typeof(string)); dt.Columns.Add(nameof(EventSendData.Action), typeof(string)); @@ -139,7 +156,8 @@ public TableValuedParameter CreateTableValuedParameter(IEnumerable()) : JsonSerializer.Default.SerializeToBinaryData(item.Attributes); - tvp.AddRow(item.Id, item.Destination ?? DefaultDestination ?? throw new InvalidOperationException($"The {nameof(DefaultDestination)} must have a non-null value."), + tvp.AddRow(item.Id, !unsentList.Contains(item), + item.Destination ?? DefaultDestination ?? throw new InvalidOperationException($"The {nameof(DefaultDestination)} must have a non-null value."), item.Subject, item.Action, item.Type, item.Source, item.Timestamp, item.CorrelationId, item.TenantId, item.PartitionKey ?? DefaultPartitionKey ?? throw new InvalidOperationException($"The {nameof(DefaultPartitionKey)} must have a non-null value."), item.ETag, attributes.ToArray(), item.Data?.ToArray()); diff --git a/src/DbEx/Templates/SqlServer/SpEventOutboxEnqueue_sql.hbs b/src/DbEx/Templates/SqlServer/SpEventOutboxEnqueue_sql.hbs index dbd6b21..ff25c3d 100644 --- a/src/DbEx/Templates/SqlServer/SpEventOutboxEnqueue_sql.hbs +++ b/src/DbEx/Templates/SqlServer/SpEventOutboxEnqueue_sql.hbs @@ -20,16 +20,14 @@ BEGIN @dequeuedDate DATETIME SET @enqueuedDate = SYSUTCDATETIME() - IF (@SetEventsAsDequeued = 1) - BEGIN - SET @dequeuedDate = @enqueuedDate - END + SET @dequeuedDate = @enqueuedDate -- Enqueued outbox resultant identifier. DECLARE @enqueuedId TABLE([{{OutboxTable}}Id] BIGINT) -- Cursor output variables. DECLARE @eventId NVARCHAR(127), + @eventDequeued BIT, @destination NVARCHAR(127), @subject NVARCHAR(511), @action NVARCHAR(255), @@ -45,10 +43,10 @@ BEGIN -- Declare, open, and fetch first event from cursor. DECLARE c CURSOR FORWARD_ONLY - FOR SELECT [EventId], [Destination], [Subject], [Action], [Type], [Source], [Timestamp], [CorrelationId], [TenantId], [PartitionKey], [ETag], [Attributes], [Data] FROM @EventList + FOR SELECT [EventId], [EventDequeued], [Destination], [Subject], [Action], [Type], [Source], [Timestamp], [CorrelationId], [TenantId], [PartitionKey], [ETag], [Attributes], [Data] FROM @EventList OPEN c - FETCH NEXT FROM c INTO @eventId, @destination, @subject, @action, @type, @source, @timestamp, @correlationId, @tenantId, @partitionKey, @etag, @attributes, @data + FETCH NEXT FROM c INTO @eventId, @eventDequeued, @destination, @subject, @action, @type, @source, @timestamp, @correlationId, @tenantId, @partitionKey, @etag, @attributes, @data -- Iterate the event(s). WHILE @@FETCH_STATUS = 0 @@ -56,7 +54,7 @@ BEGIN -- Enqueue event into outbox INSERT INTO [{{OutboxSchema}}].[{{OutboxTable}}] ([EnqueuedDate], [PartitionKey], [Destination], [DequeuedDate]) OUTPUT inserted.{{OutboxTable}}Id INTO @enqueuedId - VALUES (@enqueuedDate, @partitionKey, @destination, @dequeuedDate) + VALUES (@enqueuedDate, @partitionKey, @destination, CASE WHEN @eventDequeued IS NULL OR @eventDequeued = 0 THEN NULL ELSE @dequeuedDate END) SELECT @{{camel OutboxTable}}Id = [{{OutboxTable}}Id] FROM @enqueuedId @@ -95,7 +93,7 @@ BEGIN ) -- Fetch the next event from the cursor. - FETCH NEXT FROM c INTO @eventId, @destination, @subject, @action, @type, @source, @timestamp, @correlationId, @tenantId, @partitionKey, @etag, @attributes, @data + FETCH NEXT FROM c INTO @eventId, @eventDequeued, @destination, @subject, @action, @type, @source, @timestamp, @correlationId, @tenantId, @partitionKey, @etag, @attributes, @data END -- Close the cursor. diff --git a/src/DbEx/Templates/SqlServer/UdtEventOutbox_sql.hbs b/src/DbEx/Templates/SqlServer/UdtEventOutbox_sql.hbs index cf0bf92..e5c9a89 100644 --- a/src/DbEx/Templates/SqlServer/UdtEventOutbox_sql.hbs +++ b/src/DbEx/Templates/SqlServer/UdtEventOutbox_sql.hbs @@ -5,7 +5,8 @@ CREATE TYPE [{{OutboxSchema}}].[udt{{OutboxTable}}List] AS TABLE ( */ [EventId] NVARCHAR(127), - [Destination] NVARCHAR(127) NULL, + [EventDequeued] BIT NULL, -- Indicates whether to set the event as dequeued; i.e. already sent/processed. + [Destination] NVARCHAR(127) NULL, -- For example, queue/topic name. [Subject] NVARCHAR(511) NULL, [Action] NVARCHAR(255) NULL, [Type] NVARCHAR(1023) NULL, diff --git a/tests/DbEx.Test.OutboxConsole/Schema/Outbox/Stored Procedures/Generated/spEventOutboxEnqueue.sql b/tests/DbEx.Test.OutboxConsole/Schema/Outbox/Stored Procedures/Generated/spEventOutboxEnqueue.sql index c4b6602..f6fad3e 100644 --- a/tests/DbEx.Test.OutboxConsole/Schema/Outbox/Stored Procedures/Generated/spEventOutboxEnqueue.sql +++ b/tests/DbEx.Test.OutboxConsole/Schema/Outbox/Stored Procedures/Generated/spEventOutboxEnqueue.sql @@ -19,16 +19,14 @@ BEGIN @dequeuedDate DATETIME SET @enqueuedDate = SYSUTCDATETIME() - IF (@SetEventsAsDequeued = 1) - BEGIN - SET @dequeuedDate = @enqueuedDate - END + SET @dequeuedDate = @enqueuedDate -- Enqueued outbox resultant identifier. DECLARE @enqueuedId TABLE([EventOutboxId] BIGINT) -- Cursor output variables. DECLARE @eventId NVARCHAR(127), + @eventDequeued BIT, @destination NVARCHAR(127), @subject NVARCHAR(511), @action NVARCHAR(255), @@ -44,10 +42,10 @@ BEGIN -- Declare, open, and fetch first event from cursor. DECLARE c CURSOR FORWARD_ONLY - FOR SELECT [EventId], [Destination], [Subject], [Action], [Type], [Source], [Timestamp], [CorrelationId], [TenantId], [PartitionKey], [ETag], [Attributes], [Data] FROM @EventList + FOR SELECT [EventId], [EventDequeued], [Destination], [Subject], [Action], [Type], [Source], [Timestamp], [CorrelationId], [TenantId], [PartitionKey], [ETag], [Attributes], [Data] FROM @EventList OPEN c - FETCH NEXT FROM c INTO @eventId, @destination, @subject, @action, @type, @source, @timestamp, @correlationId, @tenantId, @partitionKey, @etag, @attributes, @data + FETCH NEXT FROM c INTO @eventId, @eventDequeued, @destination, @subject, @action, @type, @source, @timestamp, @correlationId, @tenantId, @partitionKey, @etag, @attributes, @data -- Iterate the event(s). WHILE @@FETCH_STATUS = 0 @@ -55,7 +53,7 @@ BEGIN -- Enqueue event into outbox INSERT INTO [Outbox].[EventOutbox] ([EnqueuedDate], [PartitionKey], [Destination], [DequeuedDate]) OUTPUT inserted.EventOutboxId INTO @enqueuedId - VALUES (@enqueuedDate, @partitionKey, @destination, @dequeuedDate) + VALUES (@enqueuedDate, @partitionKey, @destination, CASE WHEN @eventDequeued IS NULL OR @eventDequeued = 0 THEN NULL ELSE @dequeuedDate END) SELECT @eventOutboxId = [EventOutboxId] FROM @enqueuedId @@ -94,7 +92,7 @@ BEGIN ) -- Fetch the next event from the cursor. - FETCH NEXT FROM c INTO @eventId, @destination, @subject, @action, @type, @source, @timestamp, @correlationId, @tenantId, @partitionKey, @etag, @attributes, @data + FETCH NEXT FROM c INTO @eventId, @eventDequeued, @destination, @subject, @action, @type, @source, @timestamp, @correlationId, @tenantId, @partitionKey, @etag, @attributes, @data END -- Close the cursor. diff --git a/tests/DbEx.Test.OutboxConsole/Schema/Outbox/Types/User-Defined Table Types/Generated/udtEventOutboxList.sql b/tests/DbEx.Test.OutboxConsole/Schema/Outbox/Types/User-Defined Table Types/Generated/udtEventOutboxList.sql index 5ff48df..fa4ccd0 100644 --- a/tests/DbEx.Test.OutboxConsole/Schema/Outbox/Types/User-Defined Table Types/Generated/udtEventOutboxList.sql +++ b/tests/DbEx.Test.OutboxConsole/Schema/Outbox/Types/User-Defined Table Types/Generated/udtEventOutboxList.sql @@ -4,7 +4,8 @@ CREATE TYPE [Outbox].[udtEventOutboxList] AS TABLE ( */ [EventId] NVARCHAR(127), - [Destination] NVARCHAR(127) NULL, + [EventDequeued] BIT NULL, -- Indicates whether to set the event as dequeued; i.e. already sent/processed. + [Destination] NVARCHAR(127) NULL, -- For example, queue/topic name. [Subject] NVARCHAR(511) NULL, [Action] NVARCHAR(255) NULL, [Type] NVARCHAR(1023) NULL, diff --git a/tests/DbEx.Test/SqlServerOutboxTest.cs b/tests/DbEx.Test/SqlServerOutboxTest.cs index 114ec2d..f15ffec 100644 --- a/tests/DbEx.Test/SqlServerOutboxTest.cs +++ b/tests/DbEx.Test/SqlServerOutboxTest.cs @@ -7,6 +7,8 @@ using System; using System.Collections.Generic; using DbEx.Migration.SqlServer; +using System.Threading; +using System.Linq; namespace DbEx.Test { @@ -19,6 +21,8 @@ public async Task Setup() var cs = UnitTest.GetConfig("DbEx_").GetConnectionString("ConsoleDb"); var l = UnitTest.GetLogger(); var m = new SqlServerMigrator(cs, Migration.MigrationCommand.DropAndAll, l, typeof(Console.Program).Assembly, typeof(DbEx.Test.OutboxConsole.Program).Assembly); + m.ParserArgs.Parameters.Add("DefaultName", "Bazza"); + m.ParserArgs.RefDataColumnDefaults.Add("SortOrder", i => 1); await m.MigrateAsync().ConfigureAwait(false); } @@ -49,7 +53,7 @@ public async Task A100_EnqueueDequeue() }; var eoe = new EventOutboxEnqueue(db, UnitTest.GetLogger()); - await eoe.SendAsync(esd).ConfigureAwait(false); + await eoe.SendAsync(new EventSendData[] { esd }).ConfigureAwait(false); var ims = new InMemorySender(); var eod = new EventOutboxDequeue(db, ims, UnitTest.GetLogger()); @@ -89,11 +93,13 @@ public async Task A110_EnqueueDequeue_MaxDequeueSize() await db.SqlStatement("DELETE FROM [Outbox].[EventOutbox]").NonQueryAsync().ConfigureAwait(false); var eoe = new EventOutboxEnqueue(db, UnitTest.GetLogger()); - await eoe.SendAsync( + await eoe.SendAsync(new EventSendData[] + { new EventSendData { Id = "1", PartitionKey = null, Destination = null }, new EventSendData { Id = "2", PartitionKey = "apples", Destination = null }, new EventSendData { Id = "3", PartitionKey = null, Destination = "queue" }, - new EventSendData { Id = "4", PartitionKey = "apples", Destination = "queue" }).ConfigureAwait(false); + new EventSendData { Id = "4", PartitionKey = "apples", Destination = "queue" } + }).ConfigureAwait(false); var ims = new InMemorySender(); var eod = new EventOutboxDequeue(db, ims, UnitTest.GetLogger()); @@ -127,11 +133,13 @@ public async Task A120_EnqueueDequeue_PartitionKey_Destination_Selection() await db.SqlStatement("DELETE FROM [Outbox].[EventOutbox]").NonQueryAsync().ConfigureAwait(false); var eoe = new EventOutboxEnqueue(db, UnitTest.GetLogger()); - await eoe.SendAsync( + await eoe.SendAsync(new EventSendData[] + { new EventSendData { Id = "1", PartitionKey = null, Destination = null }, new EventSendData { Id = "2", PartitionKey = "apples", Destination = null }, new EventSendData { Id = "3", PartitionKey = null, Destination = "queue" }, - new EventSendData { Id = "4", PartitionKey = "apples", Destination = "queue" }).ConfigureAwait(false); + new EventSendData { Id = "4", PartitionKey = "apples", Destination = "queue" } + }).ConfigureAwait(false); var ims = new InMemorySender(); var eod = new EventOutboxDequeue(db, ims, UnitTest.GetLogger()); @@ -182,7 +190,7 @@ public async Task B100_EnqueueDequeue_PrimarySender_Success() var eoe = new EventOutboxEnqueue(db, UnitTest.GetLogger()); var pims = new InMemorySender(); eoe.SetPrimaryEventSender(pims); - await eoe.SendAsync(new EventSendData { Id = "1", PartitionKey = null, Destination = null }).ConfigureAwait(false); + await eoe.SendAsync(new EventSendData[] { new EventSendData { Id = "1", PartitionKey = null, Destination = null } }).ConfigureAwait(false); var events = pims.GetEvents(); Assert.AreEqual(1, events.Length); @@ -208,7 +216,7 @@ public async Task B110_EnqueueDequeue_PrimarySender_Error() var eoe = new EventOutboxEnqueue(db, UnitTest.GetLogger()); eoe.SetPrimaryEventSender(new TestSender()); - await eoe.SendAsync(new EventSendData { Id = "1", PartitionKey = null, Destination = null }).ConfigureAwait(false); + await eoe.SendAsync(new EventSendData[] { new EventSendData { Id = "1", PartitionKey = null, Destination = null } }).ConfigureAwait(false); var ims = new InMemorySender(); var eod = new EventOutboxDequeue(db, ims, UnitTest.GetLogger()); @@ -220,9 +228,48 @@ public async Task B110_EnqueueDequeue_PrimarySender_Error() Assert.AreEqual("1", events[0].Id); } + [Test] + public async Task B120_EnqueueDequeue_PrimarySender_EventSendException() + { + var cs = UnitTest.GetConfig("DbEx_").GetConnectionString("ConsoleDb"); + var l = UnitTest.GetLogger(); + + using var db = new Database(() => new SqlConnection(cs)); + await db.SqlStatement("DELETE FROM [Outbox].[EventOutbox]").NonQueryAsync().ConfigureAwait(false); + + var eoe = new EventOutboxEnqueue(db, UnitTest.GetLogger()); + eoe.SetPrimaryEventSender(new TestSenderFail()); + await eoe.SendAsync(new EventSendData[] + { + new EventSendData { Id = "1", PartitionKey = null, Destination = "A" }, + new EventSendData { Id = "2", PartitionKey = null, Destination = "B" }, + new EventSendData { Id = "3", PartitionKey = null, Destination = "A" }, + new EventSendData { Id = "4", PartitionKey = null, Destination = "B" } + }).ConfigureAwait(false); + + var ims = new InMemorySender(); + var eod = new EventOutboxDequeue(db, ims, UnitTest.GetLogger()); + + // Should have updated as enqueued; therefore, need to be dequeued. + Assert.AreEqual(2, await eod.DequeueAndSendAsync(10, null, null)); + var events = ims.GetEvents(); + Assert.AreEqual(2, events.Length); + Assert.AreEqual("2", events[0].Id); + Assert.AreEqual("4", events[1].Id); + } + private class TestSender : IEventSender { - public Task SendAsync(params EventSendData[] events) => throw new NotImplementedException(); + public Task SendAsync(IEnumerable events, CancellationToken cancellationToken = default) => throw new NotImplementedException(); + } + + private class TestSenderFail : IEventSender + { + public Task SendAsync(IEnumerable events, CancellationToken cancellationToken = default) + { + var elist = events.ToArray(); + throw new EventSendException("Oh no that's not good.", new EventSendData[] { elist[1], elist[3] }); + } } } } \ No newline at end of file