Skip to content

Commit 3352233

Browse files
authored
refactor/request data v3 (#130)
This PR continues #128 and ensures `RequestPipeline` can be shared over many requests unless a local configuration is provided. Similar to `RequestData` being shared in #128. This includes further refactorings: - `DateTimeProvider` is only provided externally once (on `NodePool`) and that instance is used everywhere. Before it could be set seperately on `NodePool` and `TransportConfiguration`, not by design but by necessity. - `RequestPipeline` is no longer disposable (we didn't actually disposed anything). - A new type exists `Auditor` this is now explicitly passed to the methods that need it on `RequestPipeline`. It implements `IReadOnlyCollection<Audit>` and is exposed as such to users. - This PR also merges `DefaultRequestPipeline` and `RequestPipeline` into one.
1 parent 0813129 commit 3352233

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+830
-978
lines changed

src/Elastic.Transport.VirtualizedCluster/Audit/Auditor.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ private Auditor(Components.VirtualizedCluster cluster, Components.VirtualizedClu
3434

3535
public IEnumerable<Diagnostics.Auditing.Audit> AsyncAuditTrail { get; set; }
3636
public IEnumerable<Diagnostics.Auditing.Audit> AuditTrail { get; set; }
37-
public Func<Components.VirtualizedCluster> Cluster { get; set; }
37+
public Func<Components.VirtualizedCluster> Cluster { get; }
3838

3939
public TransportResponse Response { get; internal set; }
4040
public TransportResponse ResponseAsync { get; internal set; }

src/Elastic.Transport.VirtualizedCluster/Components/ExposingPipelineFactory.cs

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,21 +11,16 @@ namespace Elastic.Transport.VirtualizedCluster.Components;
1111
public sealed class ExposingPipelineFactory<TConfiguration> : RequestPipelineFactory
1212
where TConfiguration : class, ITransportConfiguration
1313
{
14-
public ExposingPipelineFactory(TConfiguration configuration, DateTimeProvider dateTimeProvider)
14+
public ExposingPipelineFactory(TConfiguration configuration)
1515
{
16-
DateTimeProvider = dateTimeProvider;
1716
Configuration = configuration;
18-
Pipeline = Create(new RequestData(Configuration, null), DateTimeProvider);
19-
RequestHandler = new DistributedTransport<TConfiguration>(Configuration, this, DateTimeProvider);
17+
Transport = new DistributedTransport<TConfiguration>(Configuration);
2018
}
2119

22-
// ReSharper disable once MemberCanBePrivate.Global
23-
public RequestPipeline Pipeline { get; }
24-
private DateTimeProvider DateTimeProvider { get; }
2520
private TConfiguration Configuration { get; }
26-
public ITransport<TConfiguration> RequestHandler { get; }
21+
public ITransport<TConfiguration> Transport { get; }
2722

28-
public override RequestPipeline Create(RequestData requestData, DateTimeProvider dateTimeProvider) =>
29-
new DefaultRequestPipeline(requestData, DateTimeProvider);
23+
public override RequestPipeline Create(RequestData requestData) =>
24+
new RequestPipeline(requestData);
3025
}
3126
#nullable restore

src/Elastic.Transport.VirtualizedCluster/Components/SealedVirtualCluster.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,20 +31,21 @@ internal SealedVirtualCluster(VirtualCluster cluster, NodePool pool, TestableDat
3131
private TransportConfigurationDescriptor CreateSettings() =>
3232
new(_nodePool, _requestInvoker, serializer: null, _productRegistration.ProductRegistration);
3333

34+
3435
/// <summary> Create the cluster using all defaults on <see cref="TransportConfigurationDescriptor"/> </summary>
3536
public VirtualizedCluster AllDefaults() =>
36-
new(_dateTimeProvider, CreateSettings());
37+
new(CreateSettings());
3738

3839
/// <summary> Create the cluster using <paramref name="selector"/> to provide configuration changes </summary>
3940
/// <param name="selector">Provide custom configuration options</param>
4041
public VirtualizedCluster Settings(Func<TransportConfigurationDescriptor, TransportConfigurationDescriptor> selector) =>
41-
new(_dateTimeProvider, selector(CreateSettings()));
42+
new(selector(CreateSettings()));
4243

4344
/// <summary>
4445
/// Allows you to create an instance of `<see cref="VirtualClusterConnection"/> using the DSL provided by <see cref="Virtual"/>
4546
/// </summary>
4647
/// <param name="selector">Provide custom configuration options</param>
4748
public VirtualClusterRequestInvoker VirtualClusterConnection(Func<TransportConfigurationDescriptor, TransportConfigurationDescriptor> selector = null) =>
48-
new VirtualizedCluster(_dateTimeProvider, selector == null ? CreateSettings() : selector(CreateSettings()))
49+
new VirtualizedCluster(selector == null ? CreateSettings() : selector(CreateSettings()))
4950
.Connection;
5051
}

src/Elastic.Transport.VirtualizedCluster/Components/VirtualCluster.cs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,14 @@ protected VirtualCluster(IEnumerable<Node> nodes, MockProductRegistration produc
1919
InternalNodes = nodes.ToList();
2020
}
2121

22-
public List<IClientCallRule> ClientCallRules { get; } = new List<IClientCallRule>();
23-
public TestableDateTimeProvider DateTimeProvider { get; } = new TestableDateTimeProvider();
22+
public List<IClientCallRule> ClientCallRules { get; } = new();
23+
private TestableDateTimeProvider TestDateTimeProvider { get; } = new();
2424

2525
protected List<Node> InternalNodes { get; }
2626
public IReadOnlyList<Node> Nodes => InternalNodes;
27-
public List<IRule> PingingRules { get; } = new List<IRule>();
27+
public List<IRule> PingingRules { get; } = new();
2828

29-
public List<ISniffRule> SniffingRules { get; } = new List<ISniffRule>();
29+
public List<ISniffRule> SniffingRules { get; } = new();
3030
internal string PublishAddressOverride { get; private set; }
3131

3232
internal bool SniffShouldReturnFqnd { get; private set; }
@@ -73,32 +73,34 @@ public VirtualCluster ClientCalls(Func<ClientCallRule, IClientCallRule> selector
7373
public SealedVirtualCluster SingleNodeConnection(Func<IList<Node>, IEnumerable<Node>> seedNodesSelector = null)
7474
{
7575
var nodes = seedNodesSelector?.Invoke(InternalNodes) ?? InternalNodes;
76-
return new SealedVirtualCluster(this, new SingleNodePool(nodes.First().Uri), DateTimeProvider, ProductRegistration);
76+
return new SealedVirtualCluster(this, new SingleNodePool(nodes.First().Uri), TestDateTimeProvider, ProductRegistration);
7777
}
7878

7979
public SealedVirtualCluster StaticNodePool(Func<IList<Node>, IEnumerable<Node>> seedNodesSelector = null)
8080
{
8181
var nodes = seedNodesSelector?.Invoke(InternalNodes) ?? InternalNodes;
82-
return new SealedVirtualCluster(this, new StaticNodePool(nodes, false, DateTimeProvider), DateTimeProvider, ProductRegistration);
82+
var dateTimeProvider = TestDateTimeProvider;
83+
var nodePool = new StaticNodePool(nodes, false) { DateTimeProvider = dateTimeProvider };
84+
return new SealedVirtualCluster(this, nodePool , TestDateTimeProvider, ProductRegistration);
8385
}
8486

8587
public SealedVirtualCluster SniffingNodePool(Func<IList<Node>, IEnumerable<Node>> seedNodesSelector = null)
8688
{
8789
var nodes = seedNodesSelector?.Invoke(InternalNodes) ?? InternalNodes;
88-
return new SealedVirtualCluster(this, new SniffingNodePool(nodes, false, DateTimeProvider), DateTimeProvider, ProductRegistration);
90+
return new SealedVirtualCluster(this, new SniffingNodePool(nodes, false) { DateTimeProvider = TestDateTimeProvider }, TestDateTimeProvider, ProductRegistration);
8991
}
9092

9193
public SealedVirtualCluster StickyNodePool(Func<IList<Node>, IEnumerable<Node>> seedNodesSelector = null)
9294
{
9395
var nodes = seedNodesSelector?.Invoke(InternalNodes) ?? InternalNodes;
94-
return new SealedVirtualCluster(this, new StickyNodePool(nodes, DateTimeProvider), DateTimeProvider, ProductRegistration);
96+
return new SealedVirtualCluster(this, new StickyNodePool(nodes) { DateTimeProvider = TestDateTimeProvider}, TestDateTimeProvider, ProductRegistration);
9597
}
9698

9799
public SealedVirtualCluster StickySniffingNodePool(Func<Node, float> sorter = null,
98100
Func<IList<Node>, IEnumerable<Node>> seedNodesSelector = null
99101
)
100102
{
101103
var nodes = seedNodesSelector?.Invoke(InternalNodes) ?? InternalNodes;
102-
return new SealedVirtualCluster(this, new StickySniffingNodePool(nodes, sorter, DateTimeProvider), DateTimeProvider, ProductRegistration);
104+
return new SealedVirtualCluster(this, new StickySniffingNodePool(nodes, sorter) { DateTimeProvider = TestDateTimeProvider }, TestDateTimeProvider, ProductRegistration);
103105
}
104106
}

src/Elastic.Transport.VirtualizedCluster/Components/VirtualizedCluster.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,12 @@ private class VirtualResponse : TransportResponse;
2222

2323
private static readonly EndpointPath RootPath = new(HttpMethod.GET, "/");
2424

25-
internal VirtualizedCluster(TestableDateTimeProvider dateTimeProvider, TransportConfigurationDescriptor settings)
25+
internal VirtualizedCluster(TransportConfigurationDescriptor settings)
2626
{
27-
_dateTimeProvider = dateTimeProvider;
2827
_settings = settings;
29-
_exposingRequestPipeline = new ExposingPipelineFactory<ITransportConfiguration>(settings, _dateTimeProvider);
28+
_dateTimeProvider = ((ITransportConfiguration)_settings).DateTimeProvider as TestableDateTimeProvider
29+
?? throw new ArgumentException("DateTime provider is not a TestableDateTimeProvider", nameof(_dateTimeProvider));
30+
_exposingRequestPipeline = new ExposingPipelineFactory<ITransportConfiguration>(settings);
3031

3132
_syncCall = (t, r) => t.Request<VirtualResponse>(
3233
path: RootPath,
@@ -50,7 +51,7 @@ internal VirtualizedCluster(TestableDateTimeProvider dateTimeProvider, Transport
5051

5152
public VirtualClusterRequestInvoker Connection => RequestHandler.Configuration.RequestInvoker as VirtualClusterRequestInvoker;
5253
public NodePool ConnectionPool => RequestHandler.Configuration.NodePool;
53-
public ITransport<ITransportConfiguration> RequestHandler => _exposingRequestPipeline?.RequestHandler;
54+
public ITransport<ITransportConfiguration> RequestHandler => _exposingRequestPipeline?.Transport;
5455

5556
public VirtualizedCluster TransportProxiesTo(
5657
Func<ITransport<ITransportConfiguration>, Func<RequestConfigurationDescriptor, IRequestConfiguration>, TransportResponse> sync,

src/Elastic.Transport/Components/NodePool/CloudNodePool.cs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,10 @@ public sealed class CloudNodePool : SingleNodePool
3636
/// <para> Read more here: https://www.elastic.co/guide/en/cloud/current/ec-cloud-id.html</para>
3737
/// </param>
3838
/// <param name="credentials"></param>
39-
/// <param name="dateTimeProvider">Optionally inject an instance of <see cref="DateTimeProvider"/> used to set <see cref="NodePool.LastUpdate"/></param>
40-
public CloudNodePool(string cloudId, AuthorizationHeader credentials, DateTimeProvider dateTimeProvider = null) : this(ParseCloudId(cloudId), dateTimeProvider) =>
39+
public CloudNodePool(string cloudId, AuthorizationHeader credentials) : this(ParseCloudId(cloudId)) =>
4140
AuthenticationHeader = credentials;
4241

43-
private CloudNodePool(ParsedCloudId parsedCloudId, DateTimeProvider dateTimeProvider = null) : base(parsedCloudId.Uri, dateTimeProvider) =>
42+
private CloudNodePool(ParsedCloudId parsedCloudId) : base(parsedCloudId.Uri) =>
4443
ClusterName = parsedCloudId.Name;
4544

4645
//TODO implement debugger display for NodePool implementations and display it there and its ToString()
@@ -92,7 +91,4 @@ private static ParsedCloudId ParseCloudId(string cloudId)
9291

9392
return new ParsedCloudId(clusterName, new Uri($"https://{elasticsearchUuid}.{domainName}"));
9493
}
95-
96-
/// <inheritdoc />
97-
protected override void Dispose(bool disposing) => base.Dispose(disposing);
9894
}

src/Elastic.Transport/Components/NodePool/NodePool.cs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,13 @@ public abstract class NodePool : IDisposable
2020
{
2121
private bool _disposed;
2222

23-
internal NodePool() { }
24-
2523
/// <summary>
2624
/// The last time that this instance was updated.
2725
/// </summary>
28-
public abstract DateTimeOffset LastUpdate { get; protected set; }
26+
public abstract DateTimeOffset? LastUpdate { get; protected set; }
27+
28+
/// <inheritdoc cref="DateTimeProvider"/>>
29+
public DateTimeProvider DateTimeProvider { get; set; } = DefaultDateTimeProvider.Default;
2930

3031
/// <summary>
3132
/// Returns the default maximum retries for the connection pool implementation.
@@ -82,18 +83,15 @@ public void Dispose()
8283
/// <param name="disposing"></param>
8384
protected virtual void Dispose(bool disposing)
8485
{
85-
if (!_disposed)
86-
{
87-
_disposed = true;
88-
}
86+
if (!_disposed) _disposed = true;
8987
}
9088

9189
/// <summary>
9290
/// Creates a view over the nodes, with changing starting positions, that wraps over on each call
9391
/// e.g Thread A might get 1,2,3,4,5 and thread B will get 2,3,4,5,1.
9492
/// if there are no live nodes yields a different dead node to try once
9593
/// </summary>
96-
public abstract IEnumerable<Node> CreateView(Action<AuditEvent, Node> audit = null);
94+
public abstract IEnumerable<Node> CreateView(Auditor? auditor = null);
9795

9896
/// <summary>
9997
/// Reseeds the nodes. The implementation is responsible for thread safety.

src/Elastic.Transport/Components/NodePool/SingleNodePool.cs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,15 @@ namespace Elastic.Transport;
1212
public class SingleNodePool : NodePool
1313
{
1414
/// <inheritdoc cref="SingleNodePool"/>
15-
public SingleNodePool(Uri uri, DateTimeProvider dateTimeProvider = null)
15+
public SingleNodePool(Uri uri)
1616
{
1717
var node = new Node(uri);
1818
UsingSsl = node.Uri.Scheme == "https";
1919
Nodes = new List<Node> { node };
20-
LastUpdate = (dateTimeProvider ?? DefaultDateTimeProvider.Default).Now();
2120
}
2221

2322
/// <inheritdoc />
24-
public override DateTimeOffset LastUpdate { get; protected set; }
23+
public override DateTimeOffset? LastUpdate { get; protected set; }
2524

2625
/// <inheritdoc />
2726
public override int MaxRetries => 0;
@@ -39,11 +38,8 @@ public SingleNodePool(Uri uri, DateTimeProvider dateTimeProvider = null)
3938
public override bool UsingSsl { get; protected set; }
4039

4140
/// <inheritdoc />
42-
public override IEnumerable<Node> CreateView(Action<AuditEvent, Node> audit = null) => Nodes;
41+
public override IEnumerable<Node> CreateView(Auditor? auditor) => Nodes;
4342

4443
/// <inheritdoc />
4544
public override void Reseed(IEnumerable<Node> nodes) { } //ignored
46-
47-
/// <inheritdoc />
48-
protected override void Dispose(bool disposing) => base.Dispose(disposing);
4945
}

src/Elastic.Transport/Components/NodePool/SniffingNodePool.cs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,13 @@ public class SniffingNodePool : StaticNodePool
2222
private readonly ReaderWriterLockSlim _readerWriter = new();
2323

2424
/// <inheritdoc cref="SniffingNodePool"/>>
25-
public SniffingNodePool(IEnumerable<Uri> uris, bool randomize = true, DateTimeProvider dateTimeProvider = null)
26-
: base(uris, randomize, dateTimeProvider) { }
25+
public SniffingNodePool(IEnumerable<Uri> uris, bool randomize = true) : base(uris, randomize) { }
2726

2827
/// <inheritdoc cref="SniffingNodePool"/>>
29-
public SniffingNodePool(IEnumerable<Node> nodes, bool randomize = true, DateTimeProvider dateTimeProvider = null)
30-
: base(nodes, randomize, dateTimeProvider) { }
28+
public SniffingNodePool(IEnumerable<Node> nodes, bool randomize = true) : base(nodes, randomize) { }
3129

3230
/// <inheritdoc cref="SniffingNodePool"/>>
33-
public SniffingNodePool(IEnumerable<Node> nodes, Func<Node, float> nodeScorer, DateTimeProvider dateTimeProvider = null)
34-
: base(nodes, nodeScorer, dateTimeProvider) { }
31+
public SniffingNodePool(IEnumerable<Node> nodes, Func<Node, float> nodeScorer) : base(nodes, nodeScorer) { }
3532

3633
/// <inheritdoc />
3734
public override IReadOnlyCollection<Node> Nodes
@@ -81,12 +78,12 @@ public override void Reseed(IEnumerable<Node> nodes)
8178
}
8279

8380
/// <inheritdoc />
84-
public override IEnumerable<Node> CreateView(Action<AuditEvent, Node> audit = null)
81+
public override IEnumerable<Node> CreateView(Auditor? auditor)
8582
{
8683
_readerWriter.EnterReadLock();
8784
try
8885
{
89-
return base.CreateView(audit);
86+
return base.CreateView(auditor);
9087
}
9188
finally
9289
{

0 commit comments

Comments
 (0)