Skip to content

Commit

Permalink
added cancellation token for the methods SendAsync
Browse files Browse the repository at this point in the history
  • Loading branch information
kerryjiang committed Apr 15, 2024
1 parent ca9439b commit 24dcc13
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 38 deletions.
6 changes: 3 additions & 3 deletions src/SuperSocket.Connection/ConnectionBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ public abstract class ConnectionBase : IConnection
{
public abstract IAsyncEnumerable<TPackageInfo> RunAsync<TPackageInfo>(IPipelineFilter<TPackageInfo> pipelineFilter);

public abstract ValueTask SendAsync(ReadOnlyMemory<byte> buffer);
public abstract ValueTask SendAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default);

public abstract ValueTask SendAsync<TPackage>(IPackageEncoder<TPackage> packageEncoder, TPackage package);
public abstract ValueTask SendAsync<TPackage>(IPackageEncoder<TPackage> packageEncoder, TPackage package, CancellationToken cancellationToken = default);

public abstract ValueTask SendAsync(Action<PipeWriter> write);
public abstract ValueTask SendAsync(Action<PipeWriter> write, CancellationToken cancellationToken = default);

public bool IsClosed { get; private set; }

Expand Down
7 changes: 4 additions & 3 deletions src/SuperSocket.Connection/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using SuperSocket.ProtoBase;

Expand All @@ -11,11 +12,11 @@ public interface IConnection
{
IAsyncEnumerable<TPackageInfo> RunAsync<TPackageInfo>(IPipelineFilter<TPackageInfo> pipelineFilter);

ValueTask SendAsync(ReadOnlyMemory<byte> data);
ValueTask SendAsync(ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default);

ValueTask SendAsync<TPackage>(IPackageEncoder<TPackage> packageEncoder, TPackage package);
ValueTask SendAsync<TPackage>(IPackageEncoder<TPackage> packageEncoder, TPackage package, CancellationToken cancellationToken = default);

ValueTask SendAsync(Action<PipeWriter> write);
ValueTask SendAsync(Action<PipeWriter> write, CancellationToken cancellationToken = default);

ValueTask CloseAsync(CloseReason closeReason);

Expand Down
18 changes: 9 additions & 9 deletions src/SuperSocket.Connection/PipeConnectionBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,13 @@ private void CheckConnectionOpen()
}
}

public override async ValueTask SendAsync(ReadOnlyMemory<byte> buffer)
public override async ValueTask SendAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
{
try
{
await SendLock.WaitAsync().ConfigureAwait(false);
await SendLock.WaitAsync(cancellationToken).ConfigureAwait(false);
WriteBuffer(OutputWriter, buffer);
await OutputWriter.FlushAsync().ConfigureAwait(false);
await OutputWriter.FlushAsync(cancellationToken).ConfigureAwait(false);
}
finally
{
Expand All @@ -181,27 +181,27 @@ private void WriteBuffer(PipeWriter writer, ReadOnlyMemory<byte> buffer)
writer.Write(buffer.Span);
}

public override async ValueTask SendAsync<TPackage>(IPackageEncoder<TPackage> packageEncoder, TPackage package)
public override async ValueTask SendAsync<TPackage>(IPackageEncoder<TPackage> packageEncoder, TPackage package, CancellationToken cancellationToken = default)
{
try
{
await SendLock.WaitAsync().ConfigureAwait(false);
await SendLock.WaitAsync(cancellationToken).ConfigureAwait(false);
WritePackageWithEncoder<TPackage>(OutputWriter, packageEncoder, package);
await OutputWriter.FlushAsync().ConfigureAwait(false);
await OutputWriter.FlushAsync(cancellationToken).ConfigureAwait(false);
}
finally
{
SendLock.Release();
}
}

public override async ValueTask SendAsync(Action<PipeWriter> write)
public override async ValueTask SendAsync(Action<PipeWriter> write, CancellationToken cancellationToken)
{
try
{
await SendLock.WaitAsync().ConfigureAwait(false);
await SendLock.WaitAsync(cancellationToken).ConfigureAwait(false);
write(OutputWriter);
await OutputWriter.FlushAsync().ConfigureAwait(false);
await OutputWriter.FlushAsync(cancellationToken).ConfigureAwait(false);
}
finally
{
Expand Down
18 changes: 9 additions & 9 deletions src/SuperSocket.Connection/UdpPipeConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,31 +92,31 @@ private void MergeBuffer(ref ReadOnlySequence<byte> buffer, byte[] destBuffer)
}
}

public override async ValueTask SendAsync(ReadOnlyMemory<byte> buffer)
public override async ValueTask SendAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
{
if (_enableSendingPipe)
{
await base.SendAsync(buffer);
await base.SendAsync(buffer, cancellationToken);
return;
}

await SendOverIOAsync(new ReadOnlySequence<byte>(buffer), CancellationToken.None);
await SendOverIOAsync(new ReadOnlySequence<byte>(buffer), cancellationToken);
}

public override async ValueTask SendAsync<TPackage>(IPackageEncoder<TPackage> packageEncoder, TPackage package)
public override async ValueTask SendAsync<TPackage>(IPackageEncoder<TPackage> packageEncoder, TPackage package, CancellationToken cancellationToken)
{
if (_enableSendingPipe)
{
await base.SendAsync(packageEncoder, package);
await base.SendAsync(packageEncoder, package, cancellationToken);
return;
}

try
{
await SendLock.WaitAsync();
await SendLock.WaitAsync(cancellationToken);
var writer = OutputWriter;
WritePackageWithEncoder<TPackage>(writer, packageEncoder, package);
await writer.FlushAsync();
await writer.FlushAsync(cancellationToken);
await ProcessOutputRead(Output.Reader);
}
finally
Expand All @@ -125,11 +125,11 @@ public override async ValueTask SendAsync<TPackage>(IPackageEncoder<TPackage> pa
}
}

public override async ValueTask SendAsync(Action<PipeWriter> write)
public override async ValueTask SendAsync(Action<PipeWriter> write, CancellationToken cancellationToken)
{
if (_enableSendingPipe)
{
await base.SendAsync(write);
await base.SendAsync(write, cancellationToken);
return;
}

Expand Down
12 changes: 6 additions & 6 deletions src/SuperSocket.Kestrel/KestrelPipeConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,21 @@ protected override void OnInputPipeRead(ReadResult result)
}
}

public override async ValueTask SendAsync(Action<PipeWriter> write)
public override async ValueTask SendAsync(Action<PipeWriter> write, CancellationToken cancellationToken)
{
await base.SendAsync(write);
await base.SendAsync(write, cancellationToken);
UpdateLastActiveTime();
}

public override async ValueTask SendAsync(ReadOnlyMemory<byte> buffer)
public override async ValueTask SendAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
{
await base.SendAsync(buffer);
await base.SendAsync(buffer, cancellationToken);
UpdateLastActiveTime();
}

public override async ValueTask SendAsync<TPackage>(IPackageEncoder<TPackage> packageEncoder, TPackage package)
public override async ValueTask SendAsync<TPackage>(IPackageEncoder<TPackage> packageEncoder, TPackage package, CancellationToken cancellationToken)
{
await base.SendAsync(packageEncoder, package);
await base.SendAsync(packageEncoder, package, cancellationToken);
UpdateLastActiveTime();
}
}
5 changes: 3 additions & 2 deletions src/SuperSocket.Server.Abstractions/Session/IAppSession.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using SuperSocket.Connection;
using SuperSocket.ProtoBase;
Expand All @@ -21,9 +22,9 @@ public interface IAppSession

EndPoint LocalEndPoint { get; }

ValueTask SendAsync(ReadOnlyMemory<byte> data);
ValueTask SendAsync(ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default);

ValueTask SendAsync<TPackage>(IPackageEncoder<TPackage> packageEncoder, TPackage package);
ValueTask SendAsync<TPackage>(IPackageEncoder<TPackage> packageEncoder, TPackage package, CancellationToken cancellationToken = default);

ValueTask CloseAsync(CloseReason reason);

Expand Down
9 changes: 5 additions & 4 deletions src/SuperSocket.Server/AppSession.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using SuperSocket.Connection;
Expand Down Expand Up @@ -143,14 +144,14 @@ internal async ValueTask FireSessionConnectedAsync()
await connectedEventHandler.Invoke(this, EventArgs.Empty);
}

ValueTask IAppSession.SendAsync(ReadOnlyMemory<byte> data)
ValueTask IAppSession.SendAsync(ReadOnlyMemory<byte> data, CancellationToken cancellationToken)
{
return _connection.SendAsync(data);
return _connection.SendAsync(data, cancellationToken);
}

ValueTask IAppSession.SendAsync<TPackage>(IPackageEncoder<TPackage> packageEncoder, TPackage package)
ValueTask IAppSession.SendAsync<TPackage>(IPackageEncoder<TPackage> packageEncoder, TPackage package, CancellationToken cancellationToken)
{
return _connection.SendAsync(packageEncoder, package);
return _connection.SendAsync(packageEncoder, package, cancellationToken);
}

void IAppSession.Reset()
Expand Down
4 changes: 2 additions & 2 deletions test/SuperSocket.Tests/UdpConnectionStream.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
using System;
using System.IO;
using System.Net.Sockets;
using System.Threading;
using SuperSocket.Connection;
using SuperSocket.ProtoBase;

namespace SuperSocket.Tests
{
Expand Down Expand Up @@ -51,7 +51,7 @@ public override int Read(byte[] buffer, int offset, int count)
public override void Write(byte[] buffer, int offset, int count)
{
Connection
.SendAsync((new ArraySegment<byte>(buffer, offset, count)).AsMemory())
.SendAsync((new ArraySegment<byte>(buffer, offset, count)).AsMemory(), CancellationToken.None)
.GetAwaiter()
.GetResult();
}
Expand Down

0 comments on commit 24dcc13

Please sign in to comment.