-
-
Notifications
You must be signed in to change notification settings - Fork 144
Making LookupClient disposable and improved TCP client pool #220
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,18 +11,22 @@ namespace DnsClient | |
| { | ||
| internal class DnsTcpMessageHandler : DnsMessageHandler | ||
| { | ||
| private bool _disposedValue = false; | ||
| private readonly ConcurrentDictionary<IPEndPoint, ClientPool> _pools = new ConcurrentDictionary<IPEndPoint, ClientPool>(); | ||
|
|
||
| public override DnsMessageHandleType Type { get; } = DnsMessageHandleType.TCP; | ||
|
|
||
| public override DnsResponseMessage Query(IPEndPoint server, DnsRequestMessage request, TimeSpan timeout) | ||
| { | ||
| CancellationToken cancellationToken = default; | ||
| if (_disposedValue) | ||
| { | ||
| throw new ObjectDisposedException(nameof(DnsTcpMessageHandler)); | ||
| } | ||
|
|
||
| using var cts = timeout.TotalMilliseconds != Timeout.Infinite && timeout.TotalMilliseconds < int.MaxValue ? | ||
| new CancellationTokenSource(timeout) : null; | ||
|
|
||
| cancellationToken = cts?.Token ?? default; | ||
| var cancellationToken = cts?.Token ?? default; | ||
|
|
||
| ClientPool pool; | ||
| while (!_pools.TryGetValue(server, out pool)) | ||
|
|
@@ -32,7 +36,7 @@ public override DnsResponseMessage Query(IPEndPoint server, DnsRequestMessage re | |
|
|
||
| cancellationToken.ThrowIfCancellationRequested(); | ||
|
|
||
| var entry = pool.GetNextClient(); | ||
| var entry = pool.GetNextClient(cancellationToken); | ||
|
|
||
| using var cancelCallback = cancellationToken.Register(() => | ||
| { | ||
|
|
@@ -69,6 +73,11 @@ public override async Task<DnsResponseMessage> QueryAsync( | |
| DnsRequestMessage request, | ||
| CancellationToken cancellationToken) | ||
| { | ||
| if (_disposedValue) | ||
| { | ||
| throw new ObjectDisposedException(nameof(DnsTcpMessageHandler)); | ||
| } | ||
|
|
||
| cancellationToken.ThrowIfCancellationRequested(); | ||
|
|
||
| ClientPool pool; | ||
|
|
@@ -77,7 +86,7 @@ public override async Task<DnsResponseMessage> QueryAsync( | |
| _pools.TryAdd(server, new ClientPool(true, server)); | ||
| } | ||
|
|
||
| var entry = await pool.GetNextClientAsync().ConfigureAwait(false); | ||
| var entry = await pool.GetNextClientAsync(cancellationToken).ConfigureAwait(false); | ||
|
|
||
| using var cancelCallback = cancellationToken.Register(() => | ||
| { | ||
|
|
@@ -281,6 +290,30 @@ private async Task<DnsResponseMessage> QueryAsyncInternal(TcpClient client, DnsR | |
| return DnsResponseMessage.Combine(responses); | ||
| } | ||
|
|
||
|
|
||
| protected virtual void Dispose(bool disposing) | ||
| { | ||
| if (!_disposedValue) | ||
| { | ||
| if (disposing) | ||
| { | ||
| foreach (var entry in _pools) | ||
| { | ||
| entry.Value.Dispose(); | ||
| } | ||
|
|
||
| _pools.Clear(); | ||
MichaCo marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| _disposedValue = true; | ||
MichaCo marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
| } | ||
|
|
||
| public override void Dispose() | ||
| { | ||
| Dispose(true); | ||
| } | ||
|
|
||
| private class ClientPool : IDisposable | ||
| { | ||
| private bool _disposedValue = false; | ||
|
|
@@ -294,7 +327,7 @@ public ClientPool(bool enablePool, IPEndPoint endpoint) | |
| _endpoint = endpoint; | ||
| } | ||
|
|
||
| public ClientEntry GetNextClient() | ||
| public ClientEntry GetNextClient(CancellationToken cancellationToken) | ||
| { | ||
| if (_disposedValue) | ||
| { | ||
|
|
@@ -306,20 +339,54 @@ public ClientEntry GetNextClient() | |
| { | ||
| while (entry == null && !TryDequeue(out entry)) | ||
| { | ||
| entry = new ClientEntry(new TcpClient(_endpoint.AddressFamily) { LingerState = new LingerOption(true, 0) }, _endpoint); | ||
| entry.Client.Connect(_endpoint.Address, _endpoint.Port); | ||
| entry = ConnectNew(); | ||
| } | ||
| } | ||
| else | ||
| { | ||
| entry = new ClientEntry(new TcpClient(_endpoint.AddressFamily), _endpoint); | ||
| entry.Client.Connect(_endpoint.Address, _endpoint.Port); | ||
| entry = ConnectNew(); | ||
| } | ||
|
|
||
| return entry; | ||
|
|
||
| ClientEntry ConnectNew() | ||
MichaCo marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| { | ||
| var newClient = new TcpClient(_endpoint.AddressFamily) | ||
| { | ||
| LingerState = new LingerOption(true, 0) | ||
| }; | ||
|
|
||
| bool gotCanceled = false; | ||
| cancellationToken.Register(() => | ||
| { | ||
| gotCanceled = true; | ||
| newClient.Dispose(); | ||
| }); | ||
|
|
||
| try | ||
| { | ||
| newClient.Connect(_endpoint.Address, _endpoint.Port); | ||
| } | ||
| catch (Exception) when (gotCanceled) | ||
| { | ||
| throw new TimeoutException("Connection timed out."); | ||
|
||
| } | ||
| catch (Exception) | ||
| { | ||
| try | ||
| { | ||
| newClient.Dispose(); | ||
| } | ||
| catch { } | ||
|
|
||
| throw; | ||
| } | ||
|
|
||
| return new ClientEntry(newClient, _endpoint); | ||
| } | ||
| } | ||
|
|
||
| public async Task<ClientEntry> GetNextClientAsync() | ||
| public async Task<ClientEntry> GetNextClientAsync(CancellationToken cancellationToken) | ||
| { | ||
| if (_disposedValue) | ||
| { | ||
|
|
@@ -331,17 +398,55 @@ public async Task<ClientEntry> GetNextClientAsync() | |
| { | ||
| while (entry == null && !TryDequeue(out entry)) | ||
| { | ||
| entry = new ClientEntry(new TcpClient(_endpoint.AddressFamily) { LingerState = new LingerOption(true, 0) }, _endpoint); | ||
| await entry.Client.ConnectAsync(_endpoint.Address, _endpoint.Port).ConfigureAwait(false); | ||
| entry = await ConnectNew().ConfigureAwait(false); | ||
| } | ||
| } | ||
| else | ||
| { | ||
| entry = new ClientEntry(new TcpClient(_endpoint.AddressFamily), _endpoint); | ||
| await entry.Client.ConnectAsync(_endpoint.Address, _endpoint.Port).ConfigureAwait(false); | ||
| entry = await ConnectNew().ConfigureAwait(false); | ||
| } | ||
|
|
||
| return entry; | ||
|
|
||
| async Task<ClientEntry> ConnectNew() | ||
| { | ||
| var newClient = new TcpClient(_endpoint.AddressFamily) | ||
| { | ||
| LingerState = new LingerOption(true, 0) | ||
| }; | ||
|
|
||
| #if NET6_0_OR_GREATER | ||
| await newClient.ConnectAsync(_endpoint.Address, _endpoint.Port, cancellationToken).ConfigureAwait(false); | ||
| #else | ||
|
|
||
| bool gotCanceled = false; | ||
| cancellationToken.Register(() => | ||
| { | ||
| gotCanceled = true; | ||
| newClient.Dispose(); | ||
| }); | ||
|
|
||
| try | ||
| { | ||
| await newClient.ConnectAsync(_endpoint.Address, _endpoint.Port).ConfigureAwait(false); | ||
| } | ||
| catch (Exception) when (gotCanceled) | ||
| { | ||
| throw new TimeoutException("Connection timed out."); | ||
|
||
| } | ||
| catch (Exception) | ||
| { | ||
| try | ||
| { | ||
| newClient.Dispose(); | ||
| } | ||
| catch { } | ||
|
|
||
| throw; | ||
| } | ||
| #endif | ||
| return new ClientEntry(newClient, _endpoint); | ||
| } | ||
| } | ||
|
|
||
| public void Enqueue(ClientEntry entry) | ||
|
|
@@ -432,11 +537,7 @@ public void DisposeClient() | |
| { | ||
| try | ||
| { | ||
| #if !NET45 | ||
| Client.Dispose(); | ||
| #else | ||
| Client.Close(); | ||
| #endif | ||
| } | ||
| catch { } | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.