Skip to content

Commit

Permalink
improved the code about DetachableConnection test
Browse files Browse the repository at this point in the history
  • Loading branch information
kerryjiang committed Mar 30, 2024
1 parent 8eb89fe commit 1d47795
Showing 1 changed file with 67 additions and 51 deletions.
118 changes: 67 additions & 51 deletions test/SuperSocket.Tests/ClientTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@
using SuperSocket.Server.Host;
using SuperSocket.Tests.Command;
using SuperSocket.Connection;
using SuperSocket.Server.Abstractions;
using SuperSocket.Server.Abstractions.Session;
using System.Threading;
using Microsoft.Extensions.Logging.Abstractions;
using SuperSocket.WebSocket;
using SuperSocket.Kestrel;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets;

namespace SuperSocket.Tests
{
Expand Down Expand Up @@ -269,12 +272,40 @@ public async Task TestCommandLine(Type hostConfiguratorType)
}
}

[Theory]
[InlineData(typeof(RegularHostConfigurator))]
[Fact]
[Trait("Category", "TestDetachableChannel")]
public async Task TestDetachableChannel(Type hostConfiguratorType)
public async Task TestDetachableChannel()
{
IHostConfigurator hostConfigurator = new RegularHostConfigurator();

await TestDetachableChannelInternal(hostConfigurator, (_, socket) =>
new StreamPipeConnection(
hostConfigurator.GetClientStream(socket).Result,
socket.RemoteEndPoint,
socket.LocalEndPoint,
new ConnectionOptions
{
Logger = DefaultLoggerFactory.CreateLogger(nameof(TestDetachableChannel)),
ReadAsDemand = true
})
);

/* KestrelPipeConnection doesn't support Detach right now.
await TestDetachableChannelInternal(new KestralConnectionHostConfigurator(), (server, socket) =>
new KestrelPipeConnection(
server.ServiceProvider.GetService<SocketConnectionContextFactory>().Create(socket),
new ConnectionOptions
{
Logger = DefaultLoggerFactory.CreateLogger(nameof(TestDetachableChannel)),
ReadAsDemand = false
}
)
);
*/
}

async Task TestDetachableChannelInternal(IHostConfigurator hostConfigurator, Func<IServer, Socket, IConnection> connectionFactory)
{
var hostConfigurator = CreateObject<IHostConfigurator>(hostConfiguratorType);
using (var server = CreateSocketServerBuilder<TextPackageInfo, LinePipelineFilter>(hostConfigurator)
.UsePackageHandler(async (s, p) =>
{
Expand All @@ -287,67 +318,52 @@ public async Task TestDetachableChannel(Type hostConfiguratorType)
Assert.True(await server.StartAsync());
OutputHelper.WriteLine("Server started.");

var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await socket.ConnectAsync(hostConfigurator.GetServerEndPoint());
var stream = await hostConfigurator.GetClientStream(socket);

var connection = new StreamPipeConnection(stream, socket.RemoteEndPoint, socket.LocalEndPoint, new ConnectionOptions
using (var socket = hostConfigurator.CreateClient())
{
Logger = DefaultLoggerFactory.CreateLogger(nameof(TestDetachableChannel)),
ReadAsDemand = true
});
var connection = connectionFactory(server, socket);

var packagePipe = connection.RunAsync(new LinePipelineFilter());
await TestConnection(connection);

var msg = Guid.NewGuid().ToString();
await connection.SendAsync(Utf8Encoding.GetBytes(msg + "\r\n"));
OutputHelper.WriteLine("Before DetachAsync");

var round = 0;
await connection.DetachAsync();

await foreach (var package in packagePipe)
{
Assert.NotNull(package);
Assert.Equal("PRE-" + msg, package.Text);
round++;
// the connection is still alive in the server
Assert.Equal(1, server.SessionCount);

if (round >= 10)
break;
// socket.Connected is is still connected
Assert.True(socket.Connected);

msg = Guid.NewGuid().ToString();
await connection.SendAsync(Utf8Encoding.GetBytes(msg + "\r\n"));
}
// Attach the socket with another connection
connection = connectionFactory(server, socket);

await TestConnection(connection);
}

OutputHelper.WriteLine("Before DetachAsync");
await server.StopAsync();
}
}

await connection.DetachAsync();

// the connection is still alive in the server
Assert.Equal(1, server.SessionCount);
async Task TestConnection(IConnection connection)
{
var packagePipe = connection.RunAsync(new LinePipelineFilter());

// socket.Connected is is still connected
Assert.True(socket.Connected);
var msg = Guid.NewGuid().ToString();
await connection.SendAsync(Utf8Encoding.GetBytes(msg + "\r\n"));

var ns = stream as DerivedNetworkStream;
Assert.True(ns.Socket.Connected);
var round = 0;

// the stream is still usable
using (var streamReader = new StreamReader(stream, Utf8Encoding, true))
using (var streamWriter = new StreamWriter(stream, Utf8Encoding, 1024 * 1024 * 4))
{
for (var i = 0; i < 10; i++)
{
var txt = Guid.NewGuid().ToString();
await streamWriter.WriteAsync(txt + "\r\n");
await streamWriter.FlushAsync();
OutputHelper.WriteLine($"Sent {(i + 1)} message over the detached network stream");
var line = await streamReader.ReadLineAsync();
Assert.Equal("PRE-" + txt, line);
OutputHelper.WriteLine($"Received {(i + 1)} message over the detached network stream");
}
}
await foreach (var package in packagePipe)
{
Assert.NotNull(package);
Assert.Equal("PRE-" + msg, package.Text);
round++;

await server.StopAsync();
if (round >= 10)
break;

msg = Guid.NewGuid().ToString();
await connection.SendAsync(Utf8Encoding.GetBytes(msg + "\r\n"));
}
}
}
Expand Down

0 comments on commit 1d47795

Please sign in to comment.