Skip to content

Commit

Permalink
Add server error event
Browse files Browse the repository at this point in the history
  • Loading branch information
mtmk committed Feb 18, 2025
1 parent 0e2e280 commit 665a73d
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 0 deletions.
5 changes: 5 additions & 0 deletions src/NATS.Client.Core/INatsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ public interface INatsConnection : INatsClient
/// </summary>
public event AsyncEventHandler<NatsLameDuckModeActivatedEventArgs>? LameDuckModeActivated;

/// <summary>
/// Event that is raised when server sends an error message ('-ERR').
/// </summary>
public event AsyncEventHandler<NatsServerErrorEventArgs>? ServerError;

/// <summary>
/// Server information received from the NATS server.
/// </summary>
Expand Down
2 changes: 2 additions & 0 deletions src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -377,13 +377,15 @@ private async ValueTask<ReadOnlySequence<byte>> DispatchCommandAsync(int code, R
var newPosition = newBuffer.PositionOf((byte)'\n');
var error = ParseError(newBuffer.Slice(0, buffer.GetOffset(newPosition!.Value) - 1));
_logger.LogError(NatsLogEvents.Protocol, "Server error {Error}", error);
_connection.PushEvent(NatsEvent.ServerError, new NatsServerErrorEventArgs(error));
_waitForPongOrErrorSignal.TrySetException(new NatsServerException(error));
return newBuffer.Slice(newBuffer.GetPosition(1, newPosition!.Value));
}
else
{
var error = ParseError(buffer.Slice(0, buffer.GetOffset(position.Value) - 1));
_logger.LogError(NatsLogEvents.Protocol, "Server error {Error}", error);
_connection.PushEvent(NatsEvent.ServerError, new NatsServerErrorEventArgs(error));
_waitForPongOrErrorSignal.TrySetException(new NatsServerException(error));
return buffer.Slice(buffer.GetPosition(1, position.Value));
}
Expand Down
9 changes: 9 additions & 0 deletions src/NATS.Client.Core/NatsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ internal enum NatsEvent
ReconnectFailed,
MessageDropped,
LameDuckModeActivated,
ServerError,
}

public partial class NatsConnection : INatsConnection
Expand Down Expand Up @@ -111,6 +112,8 @@ public NatsConnection(NatsOpts opts)

public event AsyncEventHandler<NatsLameDuckModeActivatedEventArgs>? LameDuckModeActivated;

public event AsyncEventHandler<NatsServerErrorEventArgs>? ServerError;

public INatsConnection Connection => this;

public NatsOpts Opts { get; }
Expand Down Expand Up @@ -304,6 +307,9 @@ internal ValueTask UnsubscribeAsync(int sid)
return default;
}

internal void PushEvent(NatsEvent @event, NatsEventArgs args)
=> _eventChannel.Writer.TryWrite((@event, args));

private async ValueTask InitialConnectAsync()
{
Debug.Assert(ConnectionState == NatsConnectionState.Connecting, "Connection state");
Expand Down Expand Up @@ -784,6 +790,9 @@ private async Task PublishEventsAsync()
case NatsEvent.LameDuckModeActivated when LameDuckModeActivated != null && args is NatsLameDuckModeActivatedEventArgs uri:
await LameDuckModeActivated.InvokeAsync(this, uri).ConfigureAwait(false);
break;
case NatsEvent.ServerError when ServerError != null && args is NatsServerErrorEventArgs error:
await ServerError.InvokeAsync(this, error).ConfigureAwait(false);
break;
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions src/NATS.Client.Core/NatsEventArgs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,11 @@ public NatsLameDuckModeActivatedEventArgs(Uri uri)

public Uri Uri { get; }
}

public class NatsServerErrorEventArgs : NatsEventArgs
{
public NatsServerErrorEventArgs(string error)
: base($"Server error {error}") => Error = error;

public string Error { get; }
}
119 changes: 119 additions & 0 deletions tests/NATS.Client.Core2.Tests/ErrorHandlerTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
using Microsoft.Extensions.Logging;
using NATS.Client.Core.Tests;
using NATS.Client.TestUtilities;

namespace NATS.Client.Core2.Tests;

[Collection("nats-server-restricted-user")]
public class ErrorHandlerTest
{
private readonly ITestOutputHelper _output;
private readonly NatsServerRestrictedUserFixture _server;

public ErrorHandlerTest(ITestOutputHelper output, NatsServerRestrictedUserFixture server)
{
_output = output;
_server = server;
}

[Fact]
public async Task Handle_permissions_violation()
{
var logger = new InMemoryTestLoggerFactory(LogLevel.Error);

var proxy = new NatsProxy(_server.Port);
await using var nats = new NatsConnection(new NatsOpts
{
Url = $"nats://127.0.0.1:{proxy.Port}",
LoggerFactory = logger,
AuthOpts = new NatsAuthOpts { Username = "u" },
});

var errors = new List<NatsServerErrorEventArgs>();

nats.ServerError += (_, args) =>
{
lock (errors)
{
errors.Add(args);
}

return default;
};

var prefix = _server.GetNextId();

await nats.PublishAsync("x", $"_{prefix}_published_1_");

await nats.PingAsync();

await Retry.Until(
"published and pinged 1",
() =>
{
var published = false;
foreach (var frame in proxy.AllFrames)
{
if (frame.Origin == "C" && frame.Message.Contains($"_{prefix}_published_1_"))
{
published = true;
continue;
}

if (published && frame.Origin == "S" && frame.Message == "PONG")
{
return true;
}
}

return false;
});

await nats.PublishAsync("y", $"_{prefix}_published_2_");

await nats.PingAsync();

await Retry.Until(
"published and pinged 2",
() =>
{
var published = false;
foreach (var frame in proxy.AllFrames)
{
if (frame.Origin == "C" && frame.Message.Contains($"_{prefix}_published_2_"))
{
published = true;
continue;
}

if (published && frame.Origin == "S" && frame.Message == "PONG")
{
return true;
}
}

return false;
});

await Task.Delay(TimeSpan.FromSeconds(2));

Assert.Contains(proxy.AllFrames, f => f.Origin == "S" && f.Message == "-ERR 'Permissions Violation for Publish to \"y\"'");

await Retry.Until(
"error is logged",
() =>
{
return logger.Logs.Any(x => x.LogLevel == LogLevel.Error && x.Message == "Server error Permissions Violation for Publish to \"y\"");
});

await Retry.Until(
"server error event",
() =>
{
lock (errors)
{
return errors.Any(e => e.Error == "Permissions Violation for Publish to \"y\"");
}
});
}
}

0 comments on commit 665a73d

Please sign in to comment.