From 37a790a4dbfa7a2fe06d3ec73ca8131283dcff7f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E4=BF=8A?= <39239954+wj8400684@users.noreply.github.com> Date: Thu, 18 Apr 2024 01:16:15 +0800 Subject: [PATCH] Add CancellationTokenSourcePool (#710) * Add CancellationTokenSourcePool * Add CancellationTokenSourcePool --- .../CancellationTokenSourcePool.cs | 76 +++++++++++++++++++ .../Connection/TcpConnectionListener.cs | 4 + src/SuperSocket.Server/SuperSocketService.cs | 8 ++ src/SuperSocket.Udp/UdpConnectionListener.cs | 4 + 4 files changed, 92 insertions(+) create mode 100644 src/SuperSocket.Primitives/CancellationTokenSourcePool.cs diff --git a/src/SuperSocket.Primitives/CancellationTokenSourcePool.cs b/src/SuperSocket.Primitives/CancellationTokenSourcePool.cs new file mode 100644 index 000000000..db8f28733 --- /dev/null +++ b/src/SuperSocket.Primitives/CancellationTokenSourcePool.cs @@ -0,0 +1,76 @@ +using System; +using System.Collections.Concurrent; +using System.Threading; + +namespace SuperSocket; + +#if NET6_0_OR_GREATER + +public sealed class CancellationTokenSourcePool +{ + private const int MaxQueueSize = 1024; + + private readonly ConcurrentQueue _queue = new(); + private int _count; + + public static readonly CancellationTokenSourcePool Shared = new(); + + public PooledCancellationTokenSource Rent() + { + if (_queue.TryDequeue(out var cts)) + { + Interlocked.Decrement(ref _count); + cts.CancelAfter(Timeout.Infinite); + return cts; + } + + return new PooledCancellationTokenSource(this); + } + + public PooledCancellationTokenSource Rent(TimeSpan delay) + { + var token = Rent(); + token.CancelAfter(delay); + return token; + } + + private bool Return(PooledCancellationTokenSource cts) + { + if (Interlocked.Increment(ref _count) > MaxQueueSize || !cts.TryReset()) + { + Interlocked.Decrement(ref _count); + return false; + } + + _queue.Enqueue(cts); + return true; + } + + /// + /// A with a back pointer to the pool it came from. + /// Dispose will return it to the pool. + /// + public sealed class PooledCancellationTokenSource + : CancellationTokenSource + { + private readonly CancellationTokenSourcePool _pool; + + public PooledCancellationTokenSource(CancellationTokenSourcePool pool) + { + _pool = pool; + } + + protected override void Dispose(bool disposing) + { + if (!disposing) + return; + // If we failed to return to the pool then dispose + if (!_pool.Return(this)) + { + base.Dispose(disposing); + } + } + } +} + +#endif \ No newline at end of file diff --git a/src/SuperSocket.Server/Connection/TcpConnectionListener.cs b/src/SuperSocket.Server/Connection/TcpConnectionListener.cs index d5aa8d831..74a214663 100644 --- a/src/SuperSocket.Server/Connection/TcpConnectionListener.cs +++ b/src/SuperSocket.Server/Connection/TcpConnectionListener.cs @@ -107,7 +107,11 @@ private async void OnNewClientAccept(Socket socket) try { +#if NET6_0_OR_GREATER + using var cts = CancellationTokenSourcePool.Shared.Rent(Options.ConnectionAcceptTimeOut); +#else using var cts = new CancellationTokenSource(Options.ConnectionAcceptTimeOut); +#endif connection = await ConnectionFactory.CreateConnection(socket, cts.Token); } catch (Exception e) diff --git a/src/SuperSocket.Server/SuperSocketService.cs b/src/SuperSocket.Server/SuperSocketService.cs index 17787e3db..cd1b57cc8 100644 --- a/src/SuperSocket.Server/SuperSocketService.cs +++ b/src/SuperSocket.Server/SuperSocketService.cs @@ -179,7 +179,11 @@ private ValueTask AcceptNewConnection(IConnection connection) async Task IConnectionRegister.RegisterConnection(object connectionSource) { var connectionListener = _connectionListeners.FirstOrDefault(); +#if NET6_0_OR_GREATER + using var cts = CancellationTokenSourcePool.Shared.Rent(connectionListener.Options.ConnectionAcceptTimeOut); +#else using var cts = new CancellationTokenSource(connectionListener.Options.ConnectionAcceptTimeOut); +#endif var connection = await connectionListener.ConnectionFactory.CreateConnection(connectionSource, cts.Token); await AcceptNewConnection(connection); } @@ -420,9 +424,13 @@ private async ValueTask HandleSession(AppSession session, IConnection connection protected virtual CancellationTokenSource GetPackageHandlingCancellationTokenSource(CancellationToken cancellationToken) { +#if NET6_0_OR_GREATER + return CancellationTokenSourcePool.Shared.Rent(TimeSpan.FromSeconds(Options.PackageHandlingTimeOut)); +#else var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); cancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(Options.PackageHandlingTimeOut)); return cancellationTokenSource; +#endif } protected virtual ValueTask OnSessionErrorAsync(IAppSession session, PackageHandlingException exception) diff --git a/src/SuperSocket.Udp/UdpConnectionListener.cs b/src/SuperSocket.Udp/UdpConnectionListener.cs index c9db3a650..85cc46266 100644 --- a/src/SuperSocket.Udp/UdpConnectionListener.cs +++ b/src/SuperSocket.Udp/UdpConnectionListener.cs @@ -177,7 +177,11 @@ private async ValueTask CreateConnection(Socket socket, IPEndPoint { try { +#if NET6_0_OR_GREATER + using var cts = CancellationTokenSourcePool.Shared.Rent(Options.ConnectionAcceptTimeOut); +#else using var cts = new CancellationTokenSource(Options.ConnectionAcceptTimeOut); +#endif return await ConnectionFactory.CreateConnection(new UdpConnectionInfo { Socket = socket,