From 4cea48cc3d2563fc173338c90ca66bcb89909133 Mon Sep 17 00:00:00 2001 From: Kerry Jiang Date: Sun, 14 Apr 2024 17:43:26 -0700 Subject: [PATCH] added cancellation token for websocket send async method --- .../WebSocketPackageHandler.cs | 4 +- .../WebSocketSession.cs | 38 ++++++++++--------- 2 files changed, 23 insertions(+), 19 deletions(-) diff --git a/src/SuperSocket.WebSocket.Server/WebSocketPackageHandler.cs b/src/SuperSocket.WebSocket.Server/WebSocketPackageHandler.cs index 1f0966566..cf366a6de 100644 --- a/src/SuperSocket.WebSocket.Server/WebSocketPackageHandler.cs +++ b/src/SuperSocket.WebSocket.Server/WebSocketPackageHandler.cs @@ -129,7 +129,7 @@ public async ValueTask Handle(IAppSession session, WebSocketPackage package, Can try { - await websocketSession.SendAsync(package); + await websocketSession.SendAsync(package, cancellationToken); } catch (InvalidOperationException) { @@ -146,7 +146,7 @@ public async ValueTask Handle(IAppSession session, WebSocketPackage package, Can else if (package.OpCode == OpCode.Ping) { package.OpCode = OpCode.Pong; - await websocketSession.SendAsync(package); + await websocketSession.SendAsync(package, cancellationToken); return; } else if (package.OpCode == OpCode.Pong) diff --git a/src/SuperSocket.WebSocket.Server/WebSocketSession.cs b/src/SuperSocket.WebSocket.Server/WebSocketSession.cs index b3912348b..e34c9e52b 100644 --- a/src/SuperSocket.WebSocket.Server/WebSocketSession.cs +++ b/src/SuperSocket.WebSocket.Server/WebSocketSession.cs @@ -2,6 +2,7 @@ using System.Buffers; using System.Collections.Specialized; using System.Text; +using System.Threading; using System.Threading.Tasks; using SuperSocket.ProtoBase; using SuperSocket.Server; @@ -33,30 +34,32 @@ public string Path internal IPackageEncoder MessageEncoder { get; set; } - public virtual ValueTask SendAsync(WebSocketPackage message) + public virtual ValueTask SendAsync(WebSocketPackage message, CancellationToken cancellationToken = default) { - return this.Connection.SendAsync(MessageEncoder, message); + return this.Connection.SendAsync(MessageEncoder, message, cancellationToken); } - public virtual ValueTask SendAsync(string message) + public virtual ValueTask SendAsync(string message, CancellationToken cancellationToken = default) { return SendAsync(new WebSocketPackage - { - OpCode = OpCode.Text, - Message = message, - }); + { + OpCode = OpCode.Text, + Message = message, + }, + cancellationToken); } - public virtual ValueTask SendAsync(ReadOnlyMemory data) + public virtual ValueTask SendAsync(ReadOnlyMemory data, CancellationToken cancellationToken = default) { return SendAsync(new WebSocketPackage - { - OpCode = OpCode.Binary, - Data = new ReadOnlySequence(data), - }); + { + OpCode = OpCode.Binary, + Data = new ReadOnlySequence(data), + }, + cancellationToken); } - public ValueTask CloseAsync(CloseReason reason, string reasonText = null) + public ValueTask CloseAsync(CloseReason reason, string reasonText = null, CancellationToken cancellationToken = default) { var closeReasonCode = (short)reason; @@ -90,10 +93,11 @@ public ValueTask CloseAsync(CloseReason reason, string reasonText = null) OnCloseHandshakeStarted(); return SendAsync(new WebSocketPackage - { - OpCode = OpCode.Close, - Data = new ReadOnlySequence(buffer, 0, length) - }); + { + OpCode = OpCode.Close, + Data = new ReadOnlySequence(buffer, 0, length) + }, + cancellationToken); } private void OnCloseHandshakeStarted()