Skip to content

Commit

Permalink
Report WrongExpectedVersion properly in SQL Server scripts (#376)
Browse files Browse the repository at this point in the history
* Update check_stream
* Added a test
* Added try-catch to AppendEvents (SQL Server)
  • Loading branch information
alexeyzimarev authored Sep 10, 2024
1 parent f13666a commit 9f05a4e
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 26 deletions.
10 changes: 10 additions & 0 deletions src/Core/test/Eventuous.Tests.Persistence.Base/Fixtures/Helpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,14 @@ public static Task<AppendEventsResult> AppendEvent(

return fixture.EventStore.AppendEvents(stream, version, [streamEvent], default);
}


public static Task<AppendEventsResult> StoreChanges(
this StoreFixtureBase fixture,
StreamName stream,
object evt,
ExpectedStreamVersion version
) {
return fixture.EventStore.Store(stream, version, [evt]);
}
}
15 changes: 15 additions & 0 deletions src/Core/test/Eventuous.Tests.Persistence.Base/Store/Append.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,19 @@ public async Task ShouldFailOnWrongVersion() {
var task = () => _fixture.AppendEvent(stream, evt, new(3));
await task.Should().ThrowAsync<AppendToStreamException>();
}


[Fact]
[Trait("Category", "Store")]
public async Task ShouldFailOnWrongVersionWithOptimisticConcurrencyException() {
var evt = _fixture.CreateEvent();
var stream = _fixture.GetStreamName();

await _fixture.AppendEvent(stream, evt, ExpectedStreamVersion.NoStream);

evt = _fixture.CreateEvent();

var task = () => _fixture.StoreChanges(stream, evt, new(3));
await task.Should().ThrowAsync<OptimisticConcurrencyException>();
}
}
27 changes: 23 additions & 4 deletions src/SqlServer/src/Eventuous.SqlServer/Scripts/2_AppendEvents.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ AS
BEGIN
DECLARE @current_version INT,
@stream_id INT,
@position BIGINT
@position BIGINT,
@customErrorMessage NVARCHAR(200),
@newMessagesCount INT,
@expected_StreamVersionAfterUpdate INT,
@actual_StreamVersionAfterUpdate INT

if @created is null
BEGIN
Expand All @@ -16,10 +20,25 @@ BEGIN

EXEC [__schema__].[check_stream] @stream_name, @expected_version, @current_version = @current_version OUTPUT, @stream_id = @stream_id OUTPUT

INSERT INTO __schema__.Messages (MessageId, MessageType, StreamId, StreamPosition, JsonData, JsonMetadata, Created)
SELECT message_id, message_type, @stream_id, @current_version + (ROW_NUMBER() OVER(ORDER BY (SELECT NULL))), json_data, json_metadata, @created
FROM @messages
BEGIN TRY
INSERT INTO __schema__.Messages (MessageId, MessageType, StreamId, StreamPosition, JsonData, JsonMetadata, Created)
SELECT message_id, message_type, @stream_id, @current_version + (ROW_NUMBER() OVER(ORDER BY (SELECT NULL))), json_data, json_metadata, @created
FROM @messages
END TRY
BEGIN CATCH
IF (ERROR_NUMBER() = 2627 OR ERROR_NUMBER() = 2601) AND (SELECT CHARINDEX(N'UQ_StreamIdAndStreamPosition', ERROR_MESSAGE())) > 0
BEGIN
DECLARE @streamIdFromError nvarchar(20) = SUBSTRING(ERROR_MESSAGE(), PATINDEX(N'%[0-9]%,%', ERROR_MESSAGE()), PATINDEX(N'%, [0-9]%).', ERROR_MESSAGE()) - PATINDEX(N'%[0-9]%,%', ERROR_MESSAGE()))
DECLARE @streamPositionFromError nvarchar(20) = SUBSTRING(ERROR_MESSAGE(), (PATINDEX(N'%, [0-9]%).', ERROR_MESSAGE())) + 2, PATINDEX(N'%).', ERROR_MESSAGE()) - (PATINDEX(N'%, [0-9]%).', ERROR_MESSAGE()) + 2))

-- TODO: There are multiple causes of OptimisticConcurrencyExceptions, but current client code is hard-coded to check for 'WrongExpectedVersion' in message and 50000 as error number.
SELECT @customErrorMessage = FORMATMESSAGE(N'WrongExpectedVersion, another message has already been written at stream position %s on stream %s.', @streamIdFromError, @streamPositionFromError);
THROW 50000, @customErrorMessage, 1;
END
ELSE
THROW
END CATCH

SELECT TOP 1 @current_version = StreamPosition, @position = GlobalPosition
FROM __schema__.Messages
WHERE StreamId = @stream_id
Expand Down
56 changes: 35 additions & 21 deletions src/SqlServer/src/Eventuous.SqlServer/Scripts/3_CheckStream.sql
Original file line number Diff line number Diff line change
@@ -1,29 +1,43 @@
CREATE OR ALTER PROCEDURE __schema__.check_stream
@stream_name NVARCHAR(850),
@expected_version int,
@current_version INT OUTPUT,
@stream_id INT OUTPUT
AS
CREATE OR ALTER PROCEDURE __schema__.check_stream @stream_name NVARCHAR(850),
@expected_version int,
@current_version INT OUTPUT,
@stream_id INT OUTPUT
AS
BEGIN
DECLARE @customErrorMessage NVARCHAR(200)

SELECT @current_version = [Version], @stream_id =StreamId
FROM __schema__.Streams
SELECT @current_version = [Version], @stream_id = StreamId
FROM [__schema__].Streams
WHERE StreamName = @stream_name

IF @stream_id is null
BEGIN
IF @expected_version = -2 -- Any
OR @expected_version = -1 -- NoStream
IF @stream_id is null
BEGIN
INSERT INTO __schema__.Streams (StreamName, Version) VALUES (@stream_name, -1);
SELECT @current_version = Version, @stream_id = StreamId
FROM __schema__.Streams
WHERE StreamName = @stream_name
IF @expected_version = -2 -- Any
OR @expected_version = -1 -- NoStream
BEGIN
BEGIN TRY
INSERT INTO [__schema__].Streams (StreamName, Version) VALUES (@stream_name, -1);
SELECT @current_version = Version, @stream_id = StreamId
FROM [__schema__].Streams
WHERE StreamName = @stream_name
END TRY
BEGIN CATCH
IF (ERROR_NUMBER() = 2627 OR ERROR_NUMBER() = 2601) AND (SELECT CHARINDEX(N'UQ_StreamName', ERROR_MESSAGE())) > 0
BEGIN
SELECT @customErrorMessage = FORMATMESSAGE(N'WrongExpectedVersion %i, stream already exists', @expected_version);
THROW 50000, @customErrorMessage, 1;
END
ELSE
THROW
END CATCH
END
ELSE
THROW 50001, N'StreamNotFound', 1;
END
ELSE
THROW 50001, 'StreamNotFound', 1;
END
ELSE IF @expected_version != -2 and @expected_version != @current_version
THROW 50000, 'WrongExpectedVersion %, current version %', 1;

IF @expected_version != -2 and @expected_version != @current_version
BEGIN
SELECT @customErrorMessage = FORMATMESSAGE(N'WrongExpectedVersion %i, current version %i', @expected_version, @current_version);
THROW 50000, @customErrorMessage, 1;
END
END
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ namespace Eventuous.Tests.SqlServer.Fixtures;

public static class SqlContainer {
public static SqlEdgeContainer Create() => new SqlEdgeBuilder()
.WithImage("mcr.microsoft.com/azure-sql-edge:latest")
// .WithImage("mcr.microsoft.com/azure-sql-edge:1.0.7")
.WithImage("mcr.microsoft.com/mssql/server:2022-latest")
.Build();
}

0 comments on commit 9f05a4e

Please sign in to comment.