Skip to content

Commit

Permalink
🧹 Refectoring Servers.
Browse files Browse the repository at this point in the history
  • Loading branch information
Texnomic committed Jul 3, 2024
1 parent bc7938c commit 10c8c2d
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 235 deletions.
21 changes: 21 additions & 0 deletions Texnomic.SecureDNS.Servers/GlobalUsings.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Global using directives

global using System.Buffers.Binary;
global using System.Diagnostics;
global using System.Net;
global using System.Net.Sockets;
global using System.Runtime.InteropServices;
global using System.Text.Json.Serialization;
global using System.Threading.Tasks.Dataflow;
global using Microsoft.Extensions.Hosting;
global using Microsoft.Extensions.Options;
global using Nethereum.Util;
global using PipelineNet.MiddlewareResolver;
global using Serilog;
global using Texnomic.SecureDNS.Abstractions;
global using Texnomic.SecureDNS.Abstractions.Enums;
global using Texnomic.SecureDNS.Core;
global using Texnomic.SecureDNS.Extensions;
global using Texnomic.SecureDNS.Serialization;
global using Texnomic.SecureDNS.Servers.Proxy.Options;
global using Texnomic.SecureDNS.Servers.Proxy.ResponsibilityChain;
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using PipelineNet.Middleware;
using Texnomic.SecureDNS.Abstractions;

namespace Texnomic.SecureDNS.Servers.Proxy.Options;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
using System.Net;
using System.Text.Json.Serialization;

namespace Texnomic.SecureDNS.Servers.Proxy.Options;
namespace Texnomic.SecureDNS.Servers.Proxy.Options;

public class ProxyServerOptions
{
Expand All @@ -10,7 +7,7 @@ public class ProxyServerOptions
public int Port { get; set; } = 53;

[JsonIgnore]
public IPEndPoint IPEndPoint => new IPEndPoint(IPAddress.Parse(Address), Port);
public IPEndPoint IPEndPoint => new(IPAddress.Parse(Address), Port);

public static int Threads { get; set; } = Environment.ProcessorCount;
public int Threads { get; set; } = Environment.ProcessorCount;
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
using Microsoft.Extensions.Options;
using PipelineNet.ChainsOfResponsibility;
using PipelineNet.MiddlewareResolver;
using Texnomic.SecureDNS.Abstractions;
using Texnomic.SecureDNS.Abstractions.Enums;
using Texnomic.SecureDNS.Servers.Proxy.Options;
using PipelineNet.ChainsOfResponsibility;

namespace Texnomic.SecureDNS.Servers.Proxy.ResponsibilityChain;

Expand Down
84 changes: 19 additions & 65 deletions Texnomic.SecureDNS.Servers/Proxy/TCPServer.cs
Original file line number Diff line number Diff line change
@@ -1,78 +1,33 @@
using System.Buffers.Binary;
using System.Net.Sockets;
using System.Threading.Tasks.Dataflow;

using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Options;

using Nethereum.Util;

using PipelineNet.MiddlewareResolver;

using Serilog;

using Texnomic.SecureDNS.Abstractions;
using Texnomic.SecureDNS.Abstractions.Enums;
using Texnomic.SecureDNS.Core;
using Texnomic.SecureDNS.Extensions;
using Texnomic.SecureDNS.Serialization;
using Texnomic.SecureDNS.Servers.Proxy.Options;
using Texnomic.SecureDNS.Servers.Proxy.ResponsibilityChain;

namespace Texnomic.SecureDNS.Servers.Proxy;

public sealed class TCPServer : IHostedService, IDisposable
namespace Texnomic.SecureDNS.Servers.Proxy;

public sealed class TCPServer(
IOptionsMonitor<ProxyResponsibilityChainOptions> ProxyResponsibilityChainOptions,
IOptionsMonitor<ProxyServerOptions> ProxyServerOptions,
IMiddlewareResolver MiddlewareResolver,
ILogger Logger)
: IHostedService, IDisposable
{
private readonly ILogger Logger;
private readonly List<Task> Workers;
private readonly IOptionsMonitor<ProxyServerOptions> Options;
private readonly IMiddlewareResolver MiddlewareResolver;
private readonly IOptionsMonitor<ProxyResponsibilityChainOptions> ProxyResponsibilityChainOptions;
private readonly BufferBlock<(IMessage, Socket)> IncomingQueue;
private readonly BufferBlock<(IMessage, Socket)> OutgoingQueue;
private readonly List<Task> Workers = [];
private readonly BufferBlock<(IMessage, Socket)> IncomingQueue = new();
private readonly BufferBlock<(IMessage, Socket)> OutgoingQueue = new();
private readonly TcpListener TcpListener = new(ProxyServerOptions.CurrentValue.IPEndPoint);

private CancellationToken CancellationToken;

private TcpListener TcpListener;

public TCPServer(IOptionsMonitor<ProxyResponsibilityChainOptions> ProxyResponsibilityChainOptions,
IOptionsMonitor<ProxyServerOptions> ProxyServerOptions,
IMiddlewareResolver MiddlewareResolver,
ILogger Logger)
{
Options = ProxyServerOptions;

this.MiddlewareResolver = MiddlewareResolver;

this.ProxyResponsibilityChainOptions = ProxyResponsibilityChainOptions;

this.Logger = Logger;

Workers = new List<Task>();

IncomingQueue = new BufferBlock<(IMessage, Socket)>();

OutgoingQueue = new BufferBlock<(IMessage, Socket)>();
}

public async Task StartAsync(CancellationToken Token)
{
CancellationToken = Token;

TcpListener = new TcpListener(Options.CurrentValue.IPEndPoint);

//TcpListener.Server.Bind(Options.CurrentValue.IPEndPoint);

TcpListener.Start();

for (var I = 0; I < ProxyServerOptions.Threads; I++)
for (var I = 0; I < ProxyServerOptions.CurrentValue.Threads; I++)
{
Workers.Add(Task.Factory.StartNew(ReceiveAsync, CancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default).Unwrap());
Workers.Add(Task.Factory.StartNew(ResolveAsync, CancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default).Unwrap());
Workers.Add(Task.Factory.StartNew(SendAsync, CancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default).Unwrap());
}

Logger?.Information("TCP Server Started with {@Threads} Threads. Listening On {@IPEndPoint}", ProxyServerOptions.Threads, Options.CurrentValue.IPEndPoint.ToString());
Logger?.Information("TCP Server Started with {@Threads} Threads. Listening On {@IPEndPoint}", ProxyServerOptions.CurrentValue.Threads, ProxyServerOptions.CurrentValue.IPEndPoint.ToString());

await Task.Yield();
}
Expand Down Expand Up @@ -135,12 +90,11 @@ private async Task ReceiveAsync()
{
Thread.CurrentThread.Name = "Receiver";


while (!CancellationToken.IsCancellationRequested)
{
try
{
var ClientSocket = await TcpListener.AcceptSocketAsync();
var ClientSocket = await TcpListener.AcceptSocketAsync(CancellationToken);

var Prefix = new byte[2];

Expand Down Expand Up @@ -183,7 +137,7 @@ private async Task ResolveAsync()
{
try
{
(Query, ClientSocket) = await IncomingQueue.ReceiveAsync(CancellationToken).WithCancellation(CancellationToken);
(Query, ClientSocket) = await IncomingQueue.ReceiveAsync(CancellationToken);

var Answer = await ResponsibilityChain.Execute(Query);

Expand Down Expand Up @@ -220,7 +174,7 @@ private async Task SendAsync()
{
try
{
var (Answer, ClientSocket) = await OutgoingQueue.ReceiveAsync(CancellationToken).WithCancellation(CancellationToken);
var (Answer, ClientSocket) = await OutgoingQueue.ReceiveAsync(CancellationToken);

var Bytes = Serialize(Answer);

Expand All @@ -245,7 +199,7 @@ private async Task SendAsync()
}
}

private IMessage Handle(Exception Error, ushort ID, string Stage, ResponseCode Response)
private Message Handle(Exception Error, ushort ID, string Stage, ResponseCode Response)
{
Logger?.Error(Error, $"{@Error} Occurred While {Stage} Message.", Error);

Expand All @@ -257,7 +211,7 @@ private IMessage Handle(Exception Error, ushort ID, string Stage, ResponseCode R
};
}

private IMessage Handle(Exception Error, byte[] Bytes, string Stage, ResponseCode Response)
private Message Handle(Exception Error, byte[] Bytes, string Stage, ResponseCode Response)
{
Logger?.Error(Error, $"{@Error} Occurred While {Stage} {@Bytes}.", Error, BitConverter.ToString(Bytes).Replace("-", ", 0x"));

Expand Down
59 changes: 11 additions & 48 deletions Texnomic.SecureDNS.Servers/Proxy/TCPServer2.cs
Original file line number Diff line number Diff line change
@@ -1,64 +1,27 @@
using System.Buffers.Binary;
using System.Net.Sockets;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Options;
using PipelineNet.MiddlewareResolver;
namespace Texnomic.SecureDNS.Servers.Proxy;

using Serilog;
using Texnomic.SecureDNS.Abstractions.Enums;
using Texnomic.SecureDNS.Core;
using Texnomic.SecureDNS.Extensions;
using Texnomic.SecureDNS.Serialization;
using Texnomic.SecureDNS.Servers.Proxy.Options;
using Texnomic.SecureDNS.Servers.Proxy.ResponsibilityChain;


namespace Texnomic.SecureDNS.Servers.Proxy;

public sealed class TCPServer2 : IHostedService, IDisposable
public sealed class TCPServer2(IOptionsMonitor<ProxyResponsibilityChainOptions> ProxyResponsibilityChainOptions,
IOptionsMonitor<ProxyServerOptions> ProxyServerOptions,
IMiddlewareResolver MiddlewareResolver,
ILogger Logger) : IHostedService, IDisposable
{
private readonly ILogger Logger;
private readonly List<Task> Threads;
private readonly IOptionsMonitor<ProxyServerOptions> Options;
private readonly IMiddlewareResolver MiddlewareResolver;
private readonly IOptionsMonitor<ProxyResponsibilityChainOptions> ProxyResponsibilityChainOptions;
private readonly TcpListener TcpListener;
private readonly List<Task> Threads = [];
private readonly TcpListener TcpListener = new(ProxyServerOptions.CurrentValue.IPEndPoint);

private CancellationToken CancellationToken;


public TCPServer2(IOptionsMonitor<ProxyResponsibilityChainOptions> ProxyResponsibilityChainOptions,
IOptionsMonitor<ProxyServerOptions> ProxyServerOptions,
IMiddlewareResolver MiddlewareResolver,
ILogger Logger)
{
Options = ProxyServerOptions;

this.MiddlewareResolver = MiddlewareResolver;

this.ProxyResponsibilityChainOptions = ProxyResponsibilityChainOptions;

this.Logger = Logger;

Threads = new List<Task>();

TcpListener = new TcpListener(Options.CurrentValue.IPEndPoint);
}

public async Task StartAsync(CancellationToken Token)
{
CancellationToken = Token;

//TcpListener.Server.Listen();

TcpListener.Start();

for (var I = 0; I < ProxyServerOptions.Threads; I++)
for (var I = 0; I < ProxyServerOptions.CurrentValue.Threads; I++)
{
Threads.Add(Task.Factory.StartNew(ResolveAsync, CancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default).Unwrap());
}

Logger?.Information("TCP Server Started with {@Threads} Threads. Listening On {@IPEndPoint}", ProxyServerOptions.Threads, Options.CurrentValue.IPEndPoint.ToString());
Logger?.Information("TCP Server Started with {@Threads} Threads. Listening On {@IPEndPoint}", ProxyServerOptions.CurrentValue.Threads, ProxyServerOptions.CurrentValue.IPEndPoint.ToString());

await Task.Yield();
}
Expand Down Expand Up @@ -92,7 +55,7 @@ private async Task ResolveAsync()
Prefix = new byte[2];

await Client.ReceiveAsync(Prefix, SocketFlags.None);

var Size = BinaryPrimitives.ReadUInt16BigEndian(Prefix);

Buffer = new byte[Size];
Expand Down Expand Up @@ -135,7 +98,7 @@ private async Task ResolveAsync()

var RawMessage = DnSerializer.Serialize(Message);

BinaryPrimitives.WriteUInt16BigEndian(Prefix, (ushort) RawMessage.Length);
BinaryPrimitives.WriteUInt16BigEndian(Prefix, (ushort)RawMessage.Length);

await Client.SendAsync(Prefix, SocketFlags.None);

Expand Down
Loading

0 comments on commit 10c8c2d

Please sign in to comment.