Skip to content

Commit

Permalink
Support LastActiveTime for KestrelPipeConnection
Browse files Browse the repository at this point in the history
  • Loading branch information
kerryjiang committed Mar 30, 2024
1 parent 9f8ae75 commit 8eb89fe
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 12 deletions.
16 changes: 12 additions & 4 deletions src/SuperSocket.Connection/PipeConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
using System.Threading.Tasks;
using System.Net.Sockets;
using System.Buffers;
using System.Collections.Generic;
using SuperSocket.ProtoBase;
using System.IO.Pipelines;
using System.Runtime.InteropServices;

namespace SuperSocket.Connection
{
Expand Down Expand Up @@ -109,7 +108,7 @@ internal virtual async Task FillPipeAsync(PipeWriter writer, ISupplyController s
break;
}

LastActiveTime = DateTimeOffset.Now;
UpdateLastActiveTime();

// Tell the PipeWriter how much was read
writer.Advance(bytesRead);
Expand Down Expand Up @@ -164,7 +163,7 @@ protected async ValueTask<bool> ProcessOutputRead(PipeReader reader)
try
{
await SendOverIOAsync(buffer, CancellationToken.None).ConfigureAwait(false); ;
LastActiveTime = DateTimeOffset.Now;
UpdateLastActiveTime();
}
catch (Exception e)
{
Expand All @@ -184,6 +183,15 @@ protected async ValueTask<bool> ProcessOutputRead(PipeReader reader)

protected abstract ValueTask<int> SendOverIOAsync(ReadOnlySequence<byte> buffer, CancellationToken cancellationToken);

protected internal ArraySegment<T> GetArrayByMemory<T>(ReadOnlyMemory<T> memory)
{
if (!MemoryMarshal.TryGetArray(memory, out var result))
{
throw new InvalidOperationException("Buffer backed by array was expected");
}

return result;
}
protected override bool IsIgnorableException(Exception e)
{
if (base.IsIgnorableException(e))
Expand Down
15 changes: 7 additions & 8 deletions src/SuperSocket.Connection/PipeConnectionBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using System.Threading.Tasks;
using System.IO.Pipelines;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using Microsoft.Extensions.Logging;
using SuperSocket.ProtoBase;

Expand Down Expand Up @@ -60,6 +59,11 @@ protected virtual Task StartTask<TPackageInfo>(IObjectPipe<TPackageInfo> package
return StartInputPipeTask(packagePipe, _cts.Token);
}

protected void UpdateLastActiveTime()
{
LastActiveTime = DateTimeOffset.Now;
}

public async override IAsyncEnumerable<TPackageInfo> RunAsync<TPackageInfo>(IPipelineFilter<TPackageInfo> pipelineFilter)
{
var packagePipe = !Options.ReadAsDemand
Expand Down Expand Up @@ -211,14 +215,8 @@ protected void WritePackageWithEncoder<TPackage>(IBufferWriter<byte> writer, IPa
packageEncoder.Encode(writer, package);
}

protected internal ArraySegment<T> GetArrayByMemory<T>(ReadOnlyMemory<T> memory)
protected virtual void OnInputPipeRead(ReadResult result)
{
if (!MemoryMarshal.TryGetArray(memory, out var result))
{
throw new InvalidOperationException("Buffer backed by array was expected");
}

return result;
}

protected async Task ReadPipeAsync<TPackageInfo>(PipeReader reader, IObjectPipe<TPackageInfo> packagePipe, CancellationToken cancellationToken)
Expand All @@ -232,6 +230,7 @@ protected async Task ReadPipeAsync<TPackageInfo>(PipeReader reader, IObjectPipe<
try
{
result = await reader.ReadAsync(cancellationToken).ConfigureAwait(false);
OnInputPipeRead(result);
}
catch (Exception e)
{
Expand Down
28 changes: 28 additions & 0 deletions src/SuperSocket.Kestrel/KestrelPipeConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
using System;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using SuperSocket.Connection;
using SuperSocket.ProtoBase;

public class KestrelPipeConnection : PipeConnectionBase
{
Expand Down Expand Up @@ -39,4 +41,30 @@ protected override void OnClosed()

base.OnClosed();
}

protected override void OnInputPipeRead(ReadResult result)
{
if (!result.IsCanceled && !result.IsCompleted)
{
UpdateLastActiveTime();
}
}

public override async ValueTask SendAsync(Action<PipeWriter> write)
{
await base.SendAsync(write);
UpdateLastActiveTime();
}

public override async ValueTask SendAsync(ReadOnlyMemory<byte> buffer)
{
await base.SendAsync(buffer);
UpdateLastActiveTime();
}

public override async ValueTask SendAsync<TPackage>(IPackageEncoder<TPackage> packageEncoder, TPackage package)
{
await base.SendAsync(packageEncoder, package);
UpdateLastActiveTime();
}
}

0 comments on commit 8eb89fe

Please sign in to comment.