Skip to content

Commit

Permalink
added npackage handking cancellation token
Browse files Browse the repository at this point in the history
  • Loading branch information
kerryjiang committed Apr 14, 2024
1 parent 7f5d67a commit ca9439b
Show file tree
Hide file tree
Showing 28 changed files with 122 additions and 64 deletions.
3 changes: 2 additions & 1 deletion samples/CommandServer/ADD.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using SuperSocket;
using SuperSocket.Server.Abstractions.Session;
Expand All @@ -11,7 +12,7 @@ namespace CommandServer
[Command("add")]
public class ADD : IAsyncCommand<StringPackageInfo>
{
public async ValueTask ExecuteAsync(IAppSession session, StringPackageInfo package)
public async ValueTask ExecuteAsync(IAppSession session, StringPackageInfo package, CancellationToken cancellationToken)
{
var result = package.Parameters
.Select(p => int.Parse(p))
Expand Down
3 changes: 2 additions & 1 deletion samples/CommandServer/MULT.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using SuperSocket;
using SuperSocket.Server.Abstractions.Session;
Expand All @@ -12,7 +13,7 @@ namespace CommandServer
public class MULT : IAsyncCommand<StringPackageInfo>
{

public async ValueTask ExecuteAsync(IAppSession session, StringPackageInfo package)
public async ValueTask ExecuteAsync(IAppSession session, StringPackageInfo package, CancellationToken cancellationToken)
{
var result = package.Parameters
.Select(p => int.Parse(p))
Expand Down
3 changes: 2 additions & 1 deletion samples/CommandServer/SUB.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using SuperSocket;
using SuperSocket.Server.Abstractions.Session;
Expand All @@ -14,7 +15,7 @@ public class SUB : IAsyncCommand<StringPackageInfo>

public string Name => Key;

public async ValueTask ExecuteAsync(IAppSession session, StringPackageInfo package)
public async ValueTask ExecuteAsync(IAppSession session, StringPackageInfo package, CancellationToken cancellationToken)
{
var result = package.Parameters
.Select(p => int.Parse(p))
Expand Down
3 changes: 3 additions & 0 deletions src/SuperSocket.Command/CommandExecutingContext.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading;
using SuperSocket.Server.Abstractions.Session;

namespace SuperSocket.Command
Expand Down Expand Up @@ -27,5 +28,7 @@ public struct CommandExecutingContext
/// The exception.
/// </value>
public Exception Exception { get; set; }

public CancellationToken CancellationToken { get; set; }
}
}
28 changes: 15 additions & 13 deletions src/SuperSocket.Command/CommandMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Options;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using SuperSocket.ProtoBase;
using SuperSocket.Server.Abstractions;
using SuperSocket.Server.Abstractions.Session;
using SuperSocket.Server.Abstractions.Middleware;
using Microsoft.Extensions.Logging;

namespace SuperSocket.Command
{
Expand Down Expand Up @@ -195,31 +196,31 @@ protected virtual IPackageMapper<TNetPackageInfo, TPackageInfo> CreatePackageMap
return serviceProvider.GetService<IPackageMapper<TNetPackageInfo, TPackageInfo>>();
}

protected virtual async ValueTask HandlePackage(IAppSession session, TPackageInfo package)
protected virtual async ValueTask HandlePackage(IAppSession session, TPackageInfo package, CancellationToken cancellationToken)
{
if (!_commands.TryGetValue(package.Key, out ICommandSet commandSet))
{
return;
}

await commandSet.ExecuteAsync(session, package);
await commandSet.ExecuteAsync(session, package, cancellationToken);
}

protected virtual async Task OnPackageReceived(IAppSession session, TPackageInfo package)
protected virtual async Task OnPackageReceived(IAppSession session, TPackageInfo package, CancellationToken cancellationToken)
{
await HandlePackage(session, package);
await HandlePackage(session, package, cancellationToken);
}

ValueTask IPackageHandler<TNetPackageInfo>.Handle(IAppSession session, TNetPackageInfo package)
ValueTask IPackageHandler<TNetPackageInfo>.Handle(IAppSession session, TNetPackageInfo package, CancellationToken cancellationToken)
{
return HandlePackage(session, PackageMapper.Map(package));
return HandlePackage(session, PackageMapper.Map(package), cancellationToken);
}

interface ICommandSet
{
TKey Key { get; }

ValueTask ExecuteAsync(IAppSession session, TPackageInfo package);
ValueTask ExecuteAsync(IAppSession session, TPackageInfo package, CancellationToken cancellationToken);
}

class CommandTypeInfo
Expand Down Expand Up @@ -392,11 +393,11 @@ public void Initialize(IServiceProvider serviceProvider, CommandTypeInfo command
Filters = filters;
}

public async ValueTask ExecuteAsync(IAppSession session, TPackageInfo package)
public async ValueTask ExecuteAsync(IAppSession session, TPackageInfo package, CancellationToken cancellationToken)
{
if (Filters.Count > 0)
{
await ExecuteAsyncWithFilter(session, package);
await ExecuteAsyncWithFilter(session, package, cancellationToken);
return;
}

Expand All @@ -406,18 +407,19 @@ public async ValueTask ExecuteAsync(IAppSession session, TPackageInfo package)

if (asyncCommand != null)
{
await asyncCommand.ExecuteAsync(appSession, package);
await asyncCommand.ExecuteAsync(appSession, package, cancellationToken);
return;
}

Command.Execute(appSession, package);
}

private async ValueTask ExecuteAsyncWithFilter(IAppSession session, TPackageInfo package)
private async ValueTask ExecuteAsyncWithFilter(IAppSession session, TPackageInfo package, CancellationToken cancellationToken)
{
var context = new CommandExecutingContext();
context.Package = package;
context.Session = session;
context.CancellationToken = cancellationToken;

var command = AsyncCommand != null ? (AsyncCommand as ICommand) : (Command as ICommand);

Expand Down Expand Up @@ -457,7 +459,7 @@ private async ValueTask ExecuteAsyncWithFilter(IAppSession session, TPackageInfo

if (asyncCommand != null)
{
await asyncCommand.ExecuteAsync(appSession, package);
await asyncCommand.ExecuteAsync(appSession, package, cancellationToken);
}
else
{
Expand Down
5 changes: 3 additions & 2 deletions src/SuperSocket.Command/CommandWrap.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using SuperSocket.Server.Abstractions.Session;
Expand Down Expand Up @@ -55,9 +56,9 @@ public AsyncCommandWrap(IServiceProvider serviceProvider)
InnerCommand = (TAsyncCommand)ActivatorUtilities.CreateInstance(serviceProvider, typeof(TAsyncCommand));
}

public async ValueTask ExecuteAsync(TAppSession session, TPackageInfo package)
public async ValueTask ExecuteAsync(TAppSession session, TPackageInfo package, CancellationToken cancellationToken)
{
await InnerCommand.ExecuteAsync(session, package);
await InnerCommand.ExecuteAsync(session, package, cancellationToken);
}

ICommand ICommandWrap.InnerCommand
Expand Down
3 changes: 2 additions & 1 deletion src/SuperSocket.Command/ICommand.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using SuperSocket.Server.Abstractions.Session;

Expand Down Expand Up @@ -28,6 +29,6 @@ public interface IAsyncCommand<TPackageInfo> : IAsyncCommand<IAppSession, TPacka
public interface IAsyncCommand<TAppSession, TPackageInfo> : ICommand
where TAppSession : IAppSession
{
ValueTask ExecuteAsync(TAppSession session, TPackageInfo package);
ValueTask ExecuteAsync(TAppSession session, TPackageInfo package, CancellationToken cancellationToken);
}
}
9 changes: 5 additions & 4 deletions src/SuperSocket.Command/JsonCommand.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
using System.Text.Json;
using SuperSocket.ProtoBase;
using SuperSocket.Server.Abstractions.Session;
Expand Down Expand Up @@ -55,17 +56,17 @@ public JsonAsyncCommand()
};
}

public virtual async ValueTask ExecuteAsync(TAppSession session, IStringPackage package)
public virtual async ValueTask ExecuteAsync(TAppSession session, IStringPackage package, CancellationToken cancellationToken)
{
var content = package.Body;
await ExecuteJsonAsync(session, string.IsNullOrEmpty(content) ? default(TJsonObject) : Deserialize(content));
await ExecuteJsonAsync(session, string.IsNullOrEmpty(content) ? default(TJsonObject) : Deserialize(content), cancellationToken);
}

protected virtual TJsonObject Deserialize(string content)
{
return JsonSerializer.Deserialize<TJsonObject>(content, JsonSerializerOptions);
}

protected abstract ValueTask ExecuteJsonAsync(TAppSession session, TJsonObject jsonObject);
protected abstract ValueTask ExecuteJsonAsync(TAppSession session, TJsonObject jsonObject, CancellationToken cancellationToken);
}
}
13 changes: 9 additions & 4 deletions src/SuperSocket.Server.Abstractions/DelegatePackageHandler.cs
Original file line number Diff line number Diff line change
@@ -1,22 +1,27 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using SuperSocket.Server.Abstractions.Session;

namespace SuperSocket.Server.Abstractions
{
public class DelegatePackageHandler<TReceivePackageInfo> : IPackageHandler<TReceivePackageInfo>
{

Func<IAppSession, TReceivePackageInfo, ValueTask> _func;
Func<IAppSession, TReceivePackageInfo, CancellationToken, ValueTask> _func;

public DelegatePackageHandler(Func<IAppSession, TReceivePackageInfo, ValueTask> func)
{
_func = (session, package, cancellationToken) => func(session, package);
}

public DelegatePackageHandler(Func<IAppSession, TReceivePackageInfo, CancellationToken, ValueTask> func)
{
_func = func;
}

public async ValueTask Handle(IAppSession session, TReceivePackageInfo package)
public async ValueTask Handle(IAppSession session, TReceivePackageInfo package, CancellationToken cancellationToken)
{
await _func(session, package);
await _func(session, package, cancellationToken);
}
}
}
3 changes: 2 additions & 1 deletion src/SuperSocket.Server.Abstractions/IPackageHandler.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using SuperSocket.Server.Abstractions.Session;

namespace SuperSocket.Server.Abstractions
{
public interface IPackageHandler<TReceivePackageInfo>
{
ValueTask Handle(IAppSession session, TReceivePackageInfo package);
ValueTask Handle(IAppSession session, TReceivePackageInfo package, CancellationToken cancellationToken);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using SuperSocket.Connection;
using SuperSocket.Server.Abstractions.Session;
Expand All @@ -11,6 +12,6 @@ public interface IPackageHandlingScheduler<TPackageInfo>
{
void Initialize(IPackageHandler<TPackageInfo> packageHandler, Func<IAppSession, PackageHandlingException<TPackageInfo>, ValueTask<bool>> errorHandler);

ValueTask HandlePackage(IAppSession session, TPackageInfo package);
ValueTask HandlePackage(IAppSession session, TPackageInfo package, CancellationToken cancellationToken);
}
}
5 changes: 5 additions & 0 deletions src/SuperSocket.Server.Abstractions/ServerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,10 @@ public class ServerOptions : ConnectionOptions
public int ClearIdleSessionInterval { get; set; } = 120;

public int IdleSessionTimeOut { get; set; } = 300;

/// <summary>
/// In seconds.
/// </summary>
public int PackageHandlingTimeOut { get; set; } = 30;
}
}
5 changes: 3 additions & 2 deletions src/SuperSocket.Server/ConcurrentPackageHandlingScheduler.cs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
using System;
using System.Collections.Generic;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using SuperSocket.Server.Abstractions.Session;

namespace SuperSocket.Server
{
public class ConcurrentPackageHandlingScheduler<TPackageInfo> : PackageHandlingSchedulerBase<TPackageInfo>
{
public override ValueTask HandlePackage(IAppSession session, TPackageInfo package)
public override ValueTask HandlePackage(IAppSession session, TPackageInfo package, CancellationToken cancellationToken)
{
HandlePackageInternal(session, package).DoNotAwait();
HandlePackageInternal(session, package, cancellationToken).DoNotAwait();
return new ValueTask();
}
}
Expand Down
7 changes: 4 additions & 3 deletions src/SuperSocket.Server/PackageHandlingSchedulerBase.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using SuperSocket.Connection;
using SuperSocket.Server.Abstractions;
Expand All @@ -14,23 +15,23 @@ public abstract class PackageHandlingSchedulerBase<TPackageInfo> : IPackageHandl

public Func<IAppSession, PackageHandlingException<TPackageInfo>, ValueTask<bool>> ErrorHandler { get; private set; }

public abstract ValueTask HandlePackage(IAppSession session, TPackageInfo package);
public abstract ValueTask HandlePackage(IAppSession session, TPackageInfo package, CancellationToken cancellationToken);

public virtual void Initialize(IPackageHandler<TPackageInfo> packageHandler, Func<IAppSession, PackageHandlingException<TPackageInfo>, ValueTask<bool>> errorHandler)
{
PackageHandler = packageHandler;
ErrorHandler = errorHandler;
}

protected async ValueTask HandlePackageInternal(IAppSession session, TPackageInfo package)
protected async ValueTask HandlePackageInternal(IAppSession session, TPackageInfo package, CancellationToken cancellationToken)
{
var packageHandler = PackageHandler;
var errorHandler = ErrorHandler;

try
{
if (packageHandler != null)
await packageHandler.Handle(session, package);
await packageHandler.Handle(session, package, cancellationToken);
}
catch (Exception e)
{
Expand Down
5 changes: 3 additions & 2 deletions src/SuperSocket.Server/SerialPackageHandlingScheduler.cs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
using System;
using System.Collections.Generic;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using SuperSocket.Server.Abstractions.Session;

namespace SuperSocket.Server
{
public class SerialPackageHandlingScheduler<TPackageInfo> : PackageHandlingSchedulerBase<TPackageInfo>
{
public override async ValueTask HandlePackage(IAppSession session, TPackageInfo package)
public override async ValueTask HandlePackage(IAppSession session, TPackageInfo package, CancellationToken cancellationToken)
{
await HandlePackageInternal(session, package);
await HandlePackageInternal(session, package, cancellationToken);
}
}
}
Loading

0 comments on commit ca9439b

Please sign in to comment.