From 6e9f764c81adeeba7273d96a7870d69880bd0f86 Mon Sep 17 00:00:00 2001 From: wujun <8400684@qq.com> Date: Sun, 8 Dec 2024 11:42:19 +0800 Subject: [PATCH 1/5] Try making SuperSocket.Kestrel support Detach --- .../PipeConnectionBase.cs | 32 ++++++++++++++++--- .../KestrelPipeConnection.cs | 20 ++++++------ 2 files changed, 38 insertions(+), 14 deletions(-) diff --git a/src/SuperSocket.Connection/PipeConnectionBase.cs b/src/SuperSocket.Connection/PipeConnectionBase.cs index faa0d6c4d..cb7777683 100644 --- a/src/SuperSocket.Connection/PipeConnectionBase.cs +++ b/src/SuperSocket.Connection/PipeConnectionBase.cs @@ -47,6 +47,8 @@ IPipelineFilter IPipeConnection.PipelineFilter private bool _isDetaching = false; + private ISupplyController _supplyController; + protected PipeConnectionBase(PipeReader inputReader, PipeWriter outputWriter, ConnectionOptions options) { Options = options; @@ -68,9 +70,17 @@ protected void UpdateLastActiveTime() public async override IAsyncEnumerable RunAsync(IPipelineFilter pipelineFilter) { - var packagePipe = !Options.ReadAsDemand - ? new DefaultObjectPipe() - : new DefaultObjectPipeWithSupplyControl(); + IObjectPipe packagePipe; + if (Options.ReadAsDemand) + { + var defaultObjectPipe = new DefaultObjectPipeWithSupplyControl(); + _supplyController = defaultObjectPipe; + packagePipe = defaultObjectPipe; + } + else + { + packagePipe = new DefaultObjectPipe(); + } _packagePipe = packagePipe; _pipelineFilter = pipelineFilter; @@ -230,8 +240,9 @@ protected void WritePackageWithEncoder(IBufferWriter writer, IPa packageEncoder.Encode(writer, package); } - protected virtual void OnInputPipeRead(ReadResult result) + protected virtual Task OnInputPipeReadAsync(ReadResult result) { + return Task.CompletedTask; } protected async Task ReadPipeAsync(PipeReader reader, IObjectPipe packagePipe, CancellationToken cancellationToken) @@ -245,7 +256,7 @@ protected async Task ReadPipeAsync(PipeReader reader, IObjectPipe< try { result = await reader.ReadAsync(cancellationToken).ConfigureAwait(false); - OnInputPipeRead(result); + await OnInputPipeReadAsync(result); } catch (Exception e) { @@ -412,5 +423,16 @@ protected void OnError(string message, Exception e = null) else Logger?.LogError(message); } + + protected void SupplyEnd() + { + _supplyController?.SupplyEnd(); + } + + protected async Task SupplyRequiredAsync() + { + if (_supplyController != null) + await _supplyController.SupplyRequired().ConfigureAwait(false); + } } } diff --git a/src/SuperSocket.Kestrel/KestrelPipeConnection.cs b/src/SuperSocket.Kestrel/KestrelPipeConnection.cs index 8ed40dd93..d92b54fe5 100644 --- a/src/SuperSocket.Kestrel/KestrelPipeConnection.cs +++ b/src/SuperSocket.Kestrel/KestrelPipeConnection.cs @@ -31,11 +31,6 @@ protected override void OnClosed() base.OnClosed(); } - public override ValueTask DetachAsync() - { - throw new NotSupportedException($"Detach is not supported by {nameof(KestrelPipeConnection)}."); - } - protected override async void Close() { var context = _context; @@ -49,12 +44,12 @@ protected override async void Close() } } - protected override void OnInputPipeRead(ReadResult result) + protected override async Task OnInputPipeReadAsync(ReadResult result) { - if (!result.IsCanceled && !result.IsCompleted) - { + if (result is { IsCanceled: false, IsCompleted: false }) UpdateLastActiveTime(); - } + + await SupplyRequiredAsync(); } public override async ValueTask SendAsync(Action write, CancellationToken cancellationToken) @@ -94,4 +89,11 @@ private void OnConnectionClosed() { Cancel(); } + + protected override Task StartInputPipeTask(IObjectPipe packagePipe, + CancellationToken cancellationToken) + { + cancellationToken.Register(SupplyEnd); + return base.StartInputPipeTask(packagePipe, cancellationToken); + } } From 051e5ad55baa2bbeafe2c7d5e31b20555b2ccd4d Mon Sep 17 00:00:00 2001 From: wujun <8400684@qq.com> Date: Sun, 8 Dec 2024 18:49:43 +0800 Subject: [PATCH 2/5] Try making SuperSocket.Kestrel support Detach --- src/SuperSocket.Connection/IObjectPipe.cs | 4 +-- .../PipeConnectionBase.cs | 27 +++---------------- .../KestrelPipeConnection.cs | 15 ++++++++--- 3 files changed, 16 insertions(+), 30 deletions(-) diff --git a/src/SuperSocket.Connection/IObjectPipe.cs b/src/SuperSocket.Connection/IObjectPipe.cs index 5655eef90..35d2a8f27 100644 --- a/src/SuperSocket.Connection/IObjectPipe.cs +++ b/src/SuperSocket.Connection/IObjectPipe.cs @@ -20,10 +20,10 @@ public interface IObjectPipe : IObjectPipe ValueTask ReadAsync(); } - interface ISupplyController + public interface ISupplyController { ValueTask SupplyRequired(); void SupplyEnd(); } -} +} \ No newline at end of file diff --git a/src/SuperSocket.Connection/PipeConnectionBase.cs b/src/SuperSocket.Connection/PipeConnectionBase.cs index cb7777683..7c5dd4b40 100644 --- a/src/SuperSocket.Connection/PipeConnectionBase.cs +++ b/src/SuperSocket.Connection/PipeConnectionBase.cs @@ -47,8 +47,6 @@ IPipelineFilter IPipeConnection.PipelineFilter private bool _isDetaching = false; - private ISupplyController _supplyController; - protected PipeConnectionBase(PipeReader inputReader, PipeWriter outputWriter, ConnectionOptions options) { Options = options; @@ -70,17 +68,9 @@ protected void UpdateLastActiveTime() public async override IAsyncEnumerable RunAsync(IPipelineFilter pipelineFilter) { - IObjectPipe packagePipe; - if (Options.ReadAsDemand) - { - var defaultObjectPipe = new DefaultObjectPipeWithSupplyControl(); - _supplyController = defaultObjectPipe; - packagePipe = defaultObjectPipe; - } - else - { - packagePipe = new DefaultObjectPipe(); - } + var packagePipe = !Options.ReadAsDemand + ? new DefaultObjectPipe() + : new DefaultObjectPipeWithSupplyControl(); _packagePipe = packagePipe; _pipelineFilter = pipelineFilter; @@ -423,16 +413,5 @@ protected void OnError(string message, Exception e = null) else Logger?.LogError(message); } - - protected void SupplyEnd() - { - _supplyController?.SupplyEnd(); - } - - protected async Task SupplyRequiredAsync() - { - if (_supplyController != null) - await _supplyController.SupplyRequired().ConfigureAwait(false); - } } } diff --git a/src/SuperSocket.Kestrel/KestrelPipeConnection.cs b/src/SuperSocket.Kestrel/KestrelPipeConnection.cs index d92b54fe5..600c5600d 100644 --- a/src/SuperSocket.Kestrel/KestrelPipeConnection.cs +++ b/src/SuperSocket.Kestrel/KestrelPipeConnection.cs @@ -13,6 +13,7 @@ public class KestrelPipeConnection : PipeConnectionBase { private ConnectionContext _context; + private ISupplyController _supplyController; public KestrelPipeConnection(ConnectionContext context, ConnectionOptions options) : base(context.Transport.Input, context.Transport.Output, options) @@ -49,7 +50,8 @@ protected override async Task OnInputPipeReadAsync(ReadResult result) if (result is { IsCanceled: false, IsCompleted: false }) UpdateLastActiveTime(); - await SupplyRequiredAsync(); + if (_supplyController != null) + await _supplyController.SupplyRequired().ConfigureAwait(false); } public override async ValueTask SendAsync(Action write, CancellationToken cancellationToken) @@ -64,7 +66,8 @@ public override async ValueTask SendAsync(ReadOnlyMemory buffer, Cancellat UpdateLastActiveTime(); } - public override async ValueTask SendAsync(IPackageEncoder packageEncoder, TPackage package, CancellationToken cancellationToken) + public override async ValueTask SendAsync(IPackageEncoder packageEncoder, TPackage package, + CancellationToken cancellationToken) { await base.SendAsync(packageEncoder, package, cancellationToken); UpdateLastActiveTime(); @@ -93,7 +96,11 @@ private void OnConnectionClosed() protected override Task StartInputPipeTask(IObjectPipe packagePipe, CancellationToken cancellationToken) { - cancellationToken.Register(SupplyEnd); + _supplyController = packagePipe as ISupplyController; + + if (_supplyController != null) + cancellationToken.Register(() => _supplyController.SupplyEnd()); + return base.StartInputPipeTask(packagePipe, cancellationToken); } -} +} \ No newline at end of file From 96eaf4ae2ff50afebd8ec5fb4a65f0fe7a623b91 Mon Sep 17 00:00:00 2001 From: wujun <8400684@qq.com> Date: Mon, 9 Dec 2024 07:25:13 +0800 Subject: [PATCH 3/5] Using Reflection to create a kestrel client for unit testing --- test/SuperSocket.Tests/ClientTest.cs | 111 ++++++++++-------- .../ServiceCollectionExtensions.cs | 25 ++++ 2 files changed, 89 insertions(+), 47 deletions(-) create mode 100644 test/SuperSocket.Tests/ServiceCollectionExtensions.cs diff --git a/test/SuperSocket.Tests/ClientTest.cs b/test/SuperSocket.Tests/ClientTest.cs index c190f8174..7b2d05b74 100644 --- a/test/SuperSocket.Tests/ClientTest.cs +++ b/test/SuperSocket.Tests/ClientTest.cs @@ -14,7 +14,10 @@ using SuperSocket.Server.Abstractions; using SuperSocket.Server.Abstractions.Session; using System.Threading; +using Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging.Abstractions; +using SuperSocket.Kestrel; using SuperSocket.WebSocket; namespace SuperSocket.Tests @@ -256,78 +259,92 @@ public async Task TestCommandLine(Type hostConfiguratorType) } } - [Fact] +[Fact] [Trait("Category", "TestDetachableConnection")] public async Task TestDetachableConnection() { IHostConfigurator hostConfigurator = new RegularHostConfigurator(); - await TestDetachableConnectionInternal(hostConfigurator, (_, socket) => - new StreamPipeConnection( - hostConfigurator.GetClientStream(socket).Result, - socket.RemoteEndPoint, - socket.LocalEndPoint, - new ConnectionOptions - { - Logger = DefaultLoggerFactory.CreateLogger(nameof(TestDetachableConnection)), - ReadAsDemand = true - }) - ); - - /* KestrelPipeConnection doesn't support Detach right now. - await TestDetachableConnectionInternal(new KestralConnectionHostConfigurator(), (server, socket) => - new KestrelPipeConnection( - server.ServiceProvider.GetService().Create(socket), + using (var server = CreateSocketServerBuilder(hostConfigurator) + .UsePackageHandler(async (s, p) => + { + await s.SendAsync(Utf8Encoding.GetBytes("PRE-" + p.Text + "\r\n")); + }).BuildAsServer()) + { + Assert.Equal("TestServer", server.Name); + + Assert.True(await server.StartAsync()); + OutputHelper.WriteLine("Server started."); + + using (var socket = hostConfigurator.CreateClient()) + { + await TestDetachableConnectionInternal(hostConfigurator, server, ser => new StreamPipeConnection( + hostConfigurator.GetClientStream(socket).Result, + socket.RemoteEndPoint, + socket.LocalEndPoint, new ConnectionOptions { Logger = DefaultLoggerFactory.CreateLogger(nameof(TestDetachableConnection)), - ReadAsDemand = false - } - ) - ); - */ - } + ReadAsDemand = true + }), () => socket.Connected); + } + + await server.StopAsync(); + } - async Task TestDetachableConnectionInternal(IHostConfigurator hostConfigurator, Func connectionFactory) - { using (var server = CreateSocketServerBuilder(hostConfigurator) - .UsePackageHandler(async (s, p) => - { - await s.SendAsync(Utf8Encoding.GetBytes("PRE-" + p.Text + "\r\n")); - }).BuildAsServer()) + .ConfigureServices((ctx, services) => services.AddSocketConnectionFactory()) + .UsePackageHandler(async (s, p) => + { + await s.SendAsync(Utf8Encoding.GetBytes("PRE-" + p.Text + "\r\n")); + }).BuildAsServer()) { - Assert.Equal("TestServer", server.Name); Assert.True(await server.StartAsync()); OutputHelper.WriteLine("Server started."); - - using (var socket = hostConfigurator.CreateClient()) + var connectionFactory = server.ServiceProvider + .GetRequiredService(); + await using (var context = await connectionFactory.ConnectAsync(hostConfigurator.GetServerEndPoint())) { - var connection = connectionFactory(server, socket); + await TestDetachableConnectionInternal(hostConfigurator, server, ser => new KestrelPipeConnection( + context, + new ConnectionOptions + { + Logger = DefaultLoggerFactory.CreateLogger(nameof(TestDetachableConnection)), + ReadAsDemand = true + } + ), () => !context.ConnectionClosed.IsCancellationRequested); + } - await TestConnection(connection); + await server.StopAsync(); + } + } - OutputHelper.WriteLine("Before DetachAsync"); + async Task TestDetachableConnectionInternal(IHostConfigurator hostConfigurator, + IServer server, + Func connectionFactory, + Func checkConnectionFactory) + { + var connection = connectionFactory(server); - await connection.DetachAsync(); + await TestConnection(connection); - // the connection is still alive in the server - Assert.Equal(1, server.SessionCount); + OutputHelper.WriteLine("Before DetachAsync"); - // socket.Connected is is still connected - Assert.True(socket.Connected); + await connection.DetachAsync(); - // Attach the socket with another connection - connection = connectionFactory(server, socket); + // the connection is still alive in the server + Assert.Equal(1, server.SessionCount); - await TestConnection(connection); - } + // socket.Connected is is still connected + Assert.True(checkConnectionFactory()); - await server.StopAsync(); - } - } + // Attach the socket with another connection + connection = connectionFactory(server); + await TestConnection(connection); + } async Task TestConnection(IConnection connection) { var packagePipe = connection.RunAsync(new LinePipelineFilter()); diff --git a/test/SuperSocket.Tests/ServiceCollectionExtensions.cs b/test/SuperSocket.Tests/ServiceCollectionExtensions.cs new file mode 100644 index 000000000..bdc2932a7 --- /dev/null +++ b/test/SuperSocket.Tests/ServiceCollectionExtensions.cs @@ -0,0 +1,25 @@ +using System; +using Microsoft.AspNetCore.Connections; +using Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets; +using Microsoft.Extensions.DependencyInjection; + +namespace SuperSocket.Tests; + +public static class ServiceCollectionExtensions +{ + private const string SocketConnectionFactoryTypeName = + "Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.SocketConnectionFactory"; + + private static Type FindSocketConnectionFactory() + { + var assembly = typeof(SocketTransportOptions).Assembly; + var connectionFactoryType = assembly.GetType(SocketConnectionFactoryTypeName); + return connectionFactoryType ?? throw new NotSupportedException(SocketConnectionFactoryTypeName); + } + + public static IServiceCollection AddSocketConnectionFactory(this IServiceCollection services) + { + var factoryType = FindSocketConnectionFactory(); + return services.AddSingleton(typeof(IConnectionFactory), factoryType); + } +} \ No newline at end of file From 7ec3ad654329613377582ebfa4573039ac4f117f Mon Sep 17 00:00:00 2001 From: wujun <8400684@qq.com> Date: Mon, 9 Dec 2024 09:35:24 +0800 Subject: [PATCH 4/5] When kestrelconnection DetachAsync does not call reader.Complete() --- src/SuperSocket.Connection/PipeConnectionBase.cs | 7 ++++++- src/SuperSocket.Kestrel/KestrelPipeConnection.cs | 8 ++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/src/SuperSocket.Connection/PipeConnectionBase.cs b/src/SuperSocket.Connection/PipeConnectionBase.cs index 7c5dd4b40..16936a86b 100644 --- a/src/SuperSocket.Connection/PipeConnectionBase.cs +++ b/src/SuperSocket.Connection/PipeConnectionBase.cs @@ -305,7 +305,7 @@ protected async Task ReadPipeAsync(PipeReader reader, IObjectPipe< } } - reader.Complete(); + OnReaderComplete(reader, _isDetaching); WriteEOFPackage(); } @@ -413,5 +413,10 @@ protected void OnError(string message, Exception e = null) else Logger?.LogError(message); } + + protected virtual void OnReaderComplete(PipeReader reader, bool isDetaching) + { + reader.Complete(); + } } } diff --git a/src/SuperSocket.Kestrel/KestrelPipeConnection.cs b/src/SuperSocket.Kestrel/KestrelPipeConnection.cs index 600c5600d..06286dff3 100644 --- a/src/SuperSocket.Kestrel/KestrelPipeConnection.cs +++ b/src/SuperSocket.Kestrel/KestrelPipeConnection.cs @@ -103,4 +103,12 @@ protected override Task StartInputPipeTask(IObjectPipe Date: Tue, 10 Dec 2024 12:23:31 +0800 Subject: [PATCH 5/5] Try making SuperSocket.Kestrel support Detach --- src/SuperSocket.Connection/IObjectPipe.cs | 2 +- .../PipeConnectionBase.cs | 18 ++++---- .../KestrelPipeConnection.cs | 42 ++++++++----------- test/SuperSocket.Tests/ClientTest.cs | 4 +- 4 files changed, 31 insertions(+), 35 deletions(-) diff --git a/src/SuperSocket.Connection/IObjectPipe.cs b/src/SuperSocket.Connection/IObjectPipe.cs index 35d2a8f27..66b05aafb 100644 --- a/src/SuperSocket.Connection/IObjectPipe.cs +++ b/src/SuperSocket.Connection/IObjectPipe.cs @@ -20,7 +20,7 @@ public interface IObjectPipe : IObjectPipe ValueTask ReadAsync(); } - public interface ISupplyController + interface ISupplyController { ValueTask SupplyRequired(); diff --git a/src/SuperSocket.Connection/PipeConnectionBase.cs b/src/SuperSocket.Connection/PipeConnectionBase.cs index 16936a86b..51a26be93 100644 --- a/src/SuperSocket.Connection/PipeConnectionBase.cs +++ b/src/SuperSocket.Connection/PipeConnectionBase.cs @@ -66,11 +66,16 @@ protected void UpdateLastActiveTime() LastActiveTime = DateTimeOffset.Now; } - public async override IAsyncEnumerable RunAsync(IPipelineFilter pipelineFilter) + protected virtual IObjectPipe CreatePackagePipe(bool readAsDemand) { - var packagePipe = !Options.ReadAsDemand + return !readAsDemand ? new DefaultObjectPipe() : new DefaultObjectPipeWithSupplyControl(); + } + + public async override IAsyncEnumerable RunAsync(IPipelineFilter pipelineFilter) + { + var packagePipe = CreatePackagePipe(Options.ReadAsDemand); _packagePipe = packagePipe; _pipelineFilter = pipelineFilter; @@ -230,9 +235,8 @@ protected void WritePackageWithEncoder(IBufferWriter writer, IPa packageEncoder.Encode(writer, package); } - protected virtual Task OnInputPipeReadAsync(ReadResult result) + protected virtual void OnInputPipeRead(ReadResult result) { - return Task.CompletedTask; } protected async Task ReadPipeAsync(PipeReader reader, IObjectPipe packagePipe, CancellationToken cancellationToken) @@ -246,7 +250,7 @@ protected async Task ReadPipeAsync(PipeReader reader, IObjectPipe< try { result = await reader.ReadAsync(cancellationToken).ConfigureAwait(false); - await OnInputPipeReadAsync(result); + OnInputPipeRead(result); } catch (Exception e) { @@ -305,7 +309,7 @@ protected async Task ReadPipeAsync(PipeReader reader, IObjectPipe< } } - OnReaderComplete(reader, _isDetaching); + CompleteReader(reader, _isDetaching); WriteEOFPackage(); } @@ -414,7 +418,7 @@ protected void OnError(string message, Exception e = null) Logger?.LogError(message); } - protected virtual void OnReaderComplete(PipeReader reader, bool isDetaching) + protected virtual void CompleteReader(PipeReader reader, bool isDetaching) { reader.Complete(); } diff --git a/src/SuperSocket.Kestrel/KestrelPipeConnection.cs b/src/SuperSocket.Kestrel/KestrelPipeConnection.cs index 06286dff3..af5d9772e 100644 --- a/src/SuperSocket.Kestrel/KestrelPipeConnection.cs +++ b/src/SuperSocket.Kestrel/KestrelPipeConnection.cs @@ -1,4 +1,6 @@ -namespace SuperSocket.Kestrel; +using Microsoft.Extensions.Logging; + +namespace SuperSocket.Kestrel; using System; using System.IO; @@ -13,7 +15,6 @@ public class KestrelPipeConnection : PipeConnectionBase { private ConnectionContext _context; - private ISupplyController _supplyController; public KestrelPipeConnection(ConnectionContext context, ConnectionOptions options) : base(context.Transport.Input, context.Transport.Output, options) @@ -22,6 +23,19 @@ public KestrelPipeConnection(ConnectionContext context, ConnectionOptions option context.ConnectionClosed.Register(() => OnConnectionClosed()); LocalEndPoint = context.LocalEndPoint; RemoteEndPoint = context.RemoteEndPoint; + + if (options.ReadAsDemand) + { + Logger.LogWarning($"{nameof(KestrelPipeConnection)} doesn't support ReadAsDemand."); + } + } + + protected override void CompleteReader(PipeReader reader, bool isDetaching) + { + if (!isDetaching) + { + reader.Complete(); + } } protected override void OnClosed() @@ -45,13 +59,10 @@ protected override async void Close() } } - protected override async Task OnInputPipeReadAsync(ReadResult result) + protected override void OnInputPipeRead(ReadResult result) { if (result is { IsCanceled: false, IsCompleted: false }) UpdateLastActiveTime(); - - if (_supplyController != null) - await _supplyController.SupplyRequired().ConfigureAwait(false); } public override async ValueTask SendAsync(Action write, CancellationToken cancellationToken) @@ -92,23 +103,4 @@ private void OnConnectionClosed() { Cancel(); } - - protected override Task StartInputPipeTask(IObjectPipe packagePipe, - CancellationToken cancellationToken) - { - _supplyController = packagePipe as ISupplyController; - - if (_supplyController != null) - cancellationToken.Register(() => _supplyController.SupplyEnd()); - - return base.StartInputPipeTask(packagePipe, cancellationToken); - } - - protected override void OnReaderComplete(PipeReader reader, bool isDetaching) - { - if (isDetaching) - return; - - reader.Complete(); - } } \ No newline at end of file diff --git a/test/SuperSocket.Tests/ClientTest.cs b/test/SuperSocket.Tests/ClientTest.cs index 7b2d05b74..38116096a 100644 --- a/test/SuperSocket.Tests/ClientTest.cs +++ b/test/SuperSocket.Tests/ClientTest.cs @@ -297,7 +297,7 @@ public async Task TestDetachableConnection() .UsePackageHandler(async (s, p) => { await s.SendAsync(Utf8Encoding.GetBytes("PRE-" + p.Text + "\r\n")); - }).BuildAsServer()) + }).UseKestrelPipeConnection().BuildAsServer()) { Assert.Equal("TestServer", server.Name); @@ -312,7 +312,7 @@ public async Task TestDetachableConnection() new ConnectionOptions { Logger = DefaultLoggerFactory.CreateLogger(nameof(TestDetachableConnection)), - ReadAsDemand = true + ReadAsDemand = false } ), () => !context.ConnectionClosed.IsCancellationRequested); }