Skip to content

Commit

Permalink
Add CancellationTokenSourcePool (#710)
Browse files Browse the repository at this point in the history
* Add CancellationTokenSourcePool

* Add CancellationTokenSourcePool
  • Loading branch information
wj8400684 authored Apr 17, 2024
1 parent 4cea48c commit 37a790a
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 0 deletions.
76 changes: 76 additions & 0 deletions src/SuperSocket.Primitives/CancellationTokenSourcePool.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
using System;
using System.Collections.Concurrent;
using System.Threading;

namespace SuperSocket;

#if NET6_0_OR_GREATER

public sealed class CancellationTokenSourcePool
{
private const int MaxQueueSize = 1024;

private readonly ConcurrentQueue<PooledCancellationTokenSource> _queue = new();
private int _count;

public static readonly CancellationTokenSourcePool Shared = new();

public PooledCancellationTokenSource Rent()
{
if (_queue.TryDequeue(out var cts))
{
Interlocked.Decrement(ref _count);
cts.CancelAfter(Timeout.Infinite);
return cts;
}

return new PooledCancellationTokenSource(this);
}

public PooledCancellationTokenSource Rent(TimeSpan delay)
{
var token = Rent();
token.CancelAfter(delay);
return token;
}

private bool Return(PooledCancellationTokenSource cts)
{
if (Interlocked.Increment(ref _count) > MaxQueueSize || !cts.TryReset())
{
Interlocked.Decrement(ref _count);
return false;
}

_queue.Enqueue(cts);
return true;
}

/// <summary>
/// A <see cref="CancellationTokenSource"/> with a back pointer to the pool it came from.
/// Dispose will return it to the pool.
/// </summary>
public sealed class PooledCancellationTokenSource
: CancellationTokenSource
{
private readonly CancellationTokenSourcePool _pool;

public PooledCancellationTokenSource(CancellationTokenSourcePool pool)
{
_pool = pool;
}

protected override void Dispose(bool disposing)
{
if (!disposing)
return;
// If we failed to return to the pool then dispose
if (!_pool.Return(this))
{
base.Dispose(disposing);
}
}
}
}

#endif
4 changes: 4 additions & 0 deletions src/SuperSocket.Server/Connection/TcpConnectionListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,11 @@ private async void OnNewClientAccept(Socket socket)

try
{
#if NET6_0_OR_GREATER
using var cts = CancellationTokenSourcePool.Shared.Rent(Options.ConnectionAcceptTimeOut);
#else
using var cts = new CancellationTokenSource(Options.ConnectionAcceptTimeOut);
#endif
connection = await ConnectionFactory.CreateConnection(socket, cts.Token);
}
catch (Exception e)
Expand Down
8 changes: 8 additions & 0 deletions src/SuperSocket.Server/SuperSocketService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,11 @@ private ValueTask AcceptNewConnection(IConnection connection)
async Task IConnectionRegister.RegisterConnection(object connectionSource)
{
var connectionListener = _connectionListeners.FirstOrDefault();
#if NET6_0_OR_GREATER
using var cts = CancellationTokenSourcePool.Shared.Rent(connectionListener.Options.ConnectionAcceptTimeOut);
#else
using var cts = new CancellationTokenSource(connectionListener.Options.ConnectionAcceptTimeOut);
#endif
var connection = await connectionListener.ConnectionFactory.CreateConnection(connectionSource, cts.Token);
await AcceptNewConnection(connection);
}
Expand Down Expand Up @@ -420,9 +424,13 @@ private async ValueTask HandleSession(AppSession session, IConnection connection

protected virtual CancellationTokenSource GetPackageHandlingCancellationTokenSource(CancellationToken cancellationToken)
{
#if NET6_0_OR_GREATER
return CancellationTokenSourcePool.Shared.Rent(TimeSpan.FromSeconds(Options.PackageHandlingTimeOut));
#else
var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(Options.PackageHandlingTimeOut));
return cancellationTokenSource;
#endif
}

protected virtual ValueTask<bool> OnSessionErrorAsync(IAppSession session, PackageHandlingException<TReceivePackageInfo> exception)
Expand Down
4 changes: 4 additions & 0 deletions src/SuperSocket.Udp/UdpConnectionListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,11 @@ private async ValueTask<IConnection> CreateConnection(Socket socket, IPEndPoint
{
try
{
#if NET6_0_OR_GREATER
using var cts = CancellationTokenSourcePool.Shared.Rent(Options.ConnectionAcceptTimeOut);
#else
using var cts = new CancellationTokenSource(Options.ConnectionAcceptTimeOut);
#endif
return await ConnectionFactory.CreateConnection(new UdpConnectionInfo
{
Socket = socket,
Expand Down

0 comments on commit 37a790a

Please sign in to comment.