Skip to content

Commit 051e5ad

Browse files
committed
Try making SuperSocket.Kestrel support Detach
1 parent 4dd771d commit 051e5ad

File tree

3 files changed

+16
-30
lines changed

3 files changed

+16
-30
lines changed

src/SuperSocket.Connection/IObjectPipe.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@ public interface IObjectPipe<T> : IObjectPipe
2020
ValueTask<T> ReadAsync();
2121
}
2222

23-
interface ISupplyController
23+
public interface ISupplyController
2424
{
2525
ValueTask SupplyRequired();
2626

2727
void SupplyEnd();
2828
}
29-
}
29+
}

src/SuperSocket.Connection/PipeConnectionBase.cs

+3-24
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,6 @@ IPipelineFilter IPipeConnection.PipelineFilter
4747

4848
private bool _isDetaching = false;
4949

50-
private ISupplyController _supplyController;
51-
5250
protected PipeConnectionBase(PipeReader inputReader, PipeWriter outputWriter, ConnectionOptions options)
5351
{
5452
Options = options;
@@ -70,17 +68,9 @@ protected void UpdateLastActiveTime()
7068

7169
public async override IAsyncEnumerable<TPackageInfo> RunAsync<TPackageInfo>(IPipelineFilter<TPackageInfo> pipelineFilter)
7270
{
73-
IObjectPipe<TPackageInfo> packagePipe;
74-
if (Options.ReadAsDemand)
75-
{
76-
var defaultObjectPipe = new DefaultObjectPipeWithSupplyControl<TPackageInfo>();
77-
_supplyController = defaultObjectPipe;
78-
packagePipe = defaultObjectPipe;
79-
}
80-
else
81-
{
82-
packagePipe = new DefaultObjectPipe<TPackageInfo>();
83-
}
71+
var packagePipe = !Options.ReadAsDemand
72+
? new DefaultObjectPipe<TPackageInfo>()
73+
: new DefaultObjectPipeWithSupplyControl<TPackageInfo>();
8474

8575
_packagePipe = packagePipe;
8676
_pipelineFilter = pipelineFilter;
@@ -423,16 +413,5 @@ protected void OnError(string message, Exception e = null)
423413
else
424414
Logger?.LogError(message);
425415
}
426-
427-
protected void SupplyEnd()
428-
{
429-
_supplyController?.SupplyEnd();
430-
}
431-
432-
protected async Task SupplyRequiredAsync()
433-
{
434-
if (_supplyController != null)
435-
await _supplyController.SupplyRequired().ConfigureAwait(false);
436-
}
437416
}
438417
}

src/SuperSocket.Kestrel/KestrelPipeConnection.cs

+11-4
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
public class KestrelPipeConnection : PipeConnectionBase
1414
{
1515
private ConnectionContext _context;
16+
private ISupplyController _supplyController;
1617

1718
public KestrelPipeConnection(ConnectionContext context, ConnectionOptions options)
1819
: base(context.Transport.Input, context.Transport.Output, options)
@@ -49,7 +50,8 @@ protected override async Task OnInputPipeReadAsync(ReadResult result)
4950
if (result is { IsCanceled: false, IsCompleted: false })
5051
UpdateLastActiveTime();
5152

52-
await SupplyRequiredAsync();
53+
if (_supplyController != null)
54+
await _supplyController.SupplyRequired().ConfigureAwait(false);
5355
}
5456

5557
public override async ValueTask SendAsync(Action<PipeWriter> write, CancellationToken cancellationToken)
@@ -64,7 +66,8 @@ public override async ValueTask SendAsync(ReadOnlyMemory<byte> buffer, Cancellat
6466
UpdateLastActiveTime();
6567
}
6668

67-
public override async ValueTask SendAsync<TPackage>(IPackageEncoder<TPackage> packageEncoder, TPackage package, CancellationToken cancellationToken)
69+
public override async ValueTask SendAsync<TPackage>(IPackageEncoder<TPackage> packageEncoder, TPackage package,
70+
CancellationToken cancellationToken)
6871
{
6972
await base.SendAsync(packageEncoder, package, cancellationToken);
7073
UpdateLastActiveTime();
@@ -93,7 +96,11 @@ private void OnConnectionClosed()
9396
protected override Task StartInputPipeTask<TPackageInfo>(IObjectPipe<TPackageInfo> packagePipe,
9497
CancellationToken cancellationToken)
9598
{
96-
cancellationToken.Register(SupplyEnd);
99+
_supplyController = packagePipe as ISupplyController;
100+
101+
if (_supplyController != null)
102+
cancellationToken.Register(() => _supplyController.SupplyEnd());
103+
97104
return base.StartInputPipeTask(packagePipe, cancellationToken);
98105
}
99-
}
106+
}

0 commit comments

Comments
 (0)