Skip to content

Commit 00a138f

Browse files
committed
Envoy core changes for reverse connections
Commit Message: This commit collates the envoy core changes for reverse connections. Additional Description: Risk Level: Testing: Docs Changes: Release Notes: Platform Specific Features: [Optional Runtime guard:] [Optional Fixes #Issue] [Optional Fixes commit #PR or SHA] [Optional Deprecated:] [Optional [API Considerations](https://github.com/envoyproxy/envoy/blob/main/api/review_checklist.md):] Signed-off-by: Basundhara Chakrabarty <[email protected]> Co-authored-by: Arun Vasudevan <[email protected]> Co-authored-by: Tejas Sangol <[email protected]> Co-authored-by: Aditya Jaltade <[email protected]> Signed-off-by: Basundhara Chakrabarty <[email protected]>
1 parent 4aa1d1b commit 00a138f

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+896
-48
lines changed

envoy/event/dispatcher.h

+25
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@
3333
#include "absl/functional/any_invocable.h"
3434

3535
namespace Envoy {
36+
namespace Upstream {
37+
class ClusterManager;
38+
}
3639
namespace Event {
3740

3841
/**
@@ -286,6 +289,28 @@ class Dispatcher : public DispatcherBase, public ScopeTracker {
286289
* Shutdown the dispatcher by clear dispatcher thread deletable.
287290
*/
288291
virtual void shutdown() PURE;
292+
293+
/**
294+
* Provides filters access to connection handler to save outgoing connections as
295+
* incoming connections for reverse tunnels
296+
*/
297+
virtual void setConnectionHandler(Network::ConnectionHandler* connection_handler) PURE;
298+
299+
/**
300+
* @return the Connection Handler.
301+
*/
302+
virtual Network::ConnectionHandler* connectionHandler() PURE;
303+
304+
/**
305+
* Sets the dispatcher's cluster manager pointer.
306+
* @param cluster_manager the upstream cluster manager object.
307+
*/
308+
virtual void setClusterManager(Upstream::ClusterManager* cluster_manager) PURE;
309+
310+
/**
311+
* @return the cluster manager pointer.
312+
*/
313+
virtual Upstream::ClusterManager* getClusterManager() PURE;
289314
};
290315

291316
using DispatcherPtr = std::unique_ptr<Dispatcher>;

envoy/network/BUILD

+20
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ envoy_cc_library(
5252
":connection_balancer_interface",
5353
":listen_socket_interface",
5454
":listener_interface",
55+
":reverse_connection_handler_interface",
56+
":reverse_connection_manager_interface",
5557
"//envoy/common:random_generator_interface",
5658
"//envoy/runtime:runtime_interface",
5759
"//envoy/ssl:context_interface",
@@ -257,3 +259,21 @@ envoy_cc_library(
257259
":address_interface",
258260
],
259261
)
262+
263+
envoy_cc_library(
264+
name = "reverse_connection_manager_interface",
265+
hdrs = ["reverse_connection_manager.h"],
266+
deps = [
267+
"//envoy/network:connection_interface",
268+
"//envoy/network:listener_interface",
269+
],
270+
)
271+
272+
envoy_cc_library(
273+
name = "reverse_connection_handler_interface",
274+
hdrs = ["reverse_connection_handler.h"],
275+
deps = [
276+
":listen_socket_interface",
277+
"//envoy/event:timer_interface",
278+
],
279+
)

envoy/network/connection.h

+21
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,27 @@ class Connection : public Event::DeferredDeletable,
318318
*/
319319
virtual bool aboveHighWatermark() const PURE;
320320

321+
/**
322+
* @return ConnectionSocketPtr& To get socket from current connection.
323+
*/
324+
virtual const ConnectionSocketPtr& getSocket() const PURE;
325+
326+
/**
327+
* Set the flag connection_reused_ to value. The flag connection_reused_
328+
* indicates whether the client connection is reused.
329+
*/
330+
virtual void setConnectionReused(bool value) PURE;
331+
332+
/**
333+
* Set flag to convey active connection (listener) is reused.
334+
*/
335+
virtual void setActiveConnectionReused(bool value) PURE;
336+
337+
/**
338+
* return boolean telling if active connection (listener) is reused.
339+
*/
340+
virtual bool isActiveConnectionReused() PURE;
341+
321342
/**
322343
* Get the socket options set on this connection.
323344
*/

envoy/network/connection_handler.h

+42
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
#include "envoy/network/filter.h"
1111
#include "envoy/network/listen_socket.h"
1212
#include "envoy/network/listener.h"
13+
#include "envoy/network/reverse_connection_handler.h"
14+
#include "envoy/network/reverse_connection_manager.h"
1315
#include "envoy/runtime/runtime.h"
1416
#include "envoy/server/overload/thread_local_overload_state.h"
1517
#include "envoy/ssl/context.h"
@@ -19,6 +21,26 @@
1921
namespace Envoy {
2022
namespace Network {
2123

24+
// The thread local registry.
25+
class LocalRevConnRegistry {
26+
public:
27+
virtual ~LocalRevConnRegistry() = default;
28+
29+
virtual Network::ReverseConnectionManager& getRCManager() PURE;
30+
virtual Network::ReverseConnectionHandler& getRCHandler() PURE;
31+
};
32+
33+
// The central reverse conn registry interface providing the thread local accessor.
34+
class RevConnRegistry {
35+
public:
36+
virtual ~RevConnRegistry() = default;
37+
38+
/**
39+
* @return The thread local registry.
40+
*/
41+
virtual LocalRevConnRegistry* getLocalRegistry() PURE;
42+
};
43+
2244
// This interface allows for a listener to perform an alternative behavior when a
2345
// packet can't be routed correctly during draining; for example QUIC packets that
2446
// are not for an existing connection.
@@ -127,6 +149,26 @@ class ConnectionHandler {
127149
*/
128150
virtual const std::string& statPrefix() const PURE;
129151

152+
/**
153+
* Pass the reverse connection socket to the listener that initiated it.
154+
* @param upstream_socket the socket to be passed.
155+
* @param listener_tag the tag of the listener that initiated the reverse
156+
* connection.
157+
*/
158+
virtual void saveUpstreamConnection(Network::ConnectionSocketPtr&& upstream_socket,
159+
uint64_t listener_tag) PURE;
160+
161+
/**
162+
* Enable reverse connection entities on the current worker.
163+
* @param reverse_conn_registry the thread local registry that holds the reverse connection
164+
* entities.
165+
*/
166+
virtual void enableReverseConnections(Network::RevConnRegistry& reverse_conn_registry) PURE;
167+
/**
168+
* @return the thread local registry.
169+
*/
170+
virtual Network::LocalRevConnRegistry& reverseConnRegistry() const PURE;
171+
130172
/**
131173
* Used by ConnectionHandler to manage listeners.
132174
*/

envoy/network/filter.h

+6
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,12 @@ class ListenerFilter {
383383
*/
384384
virtual FilterStatus onData(Network::ListenerFilterBuffer& buffer) PURE;
385385

386+
/**
387+
* Called when the connection is closed. Only the current filter that has stopped filter
388+
* chain iteration will get the callback.
389+
*/
390+
virtual void onClose(){};
391+
386392
/**
387393
* Return the size of data the filter want to inspect from the connection.
388394
* The size can be increased after filter need to inspect more data.

envoy/network/listen_socket.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ class ConnectionSocket : public virtual Socket {
9494
virtual void dumpState(std::ostream& os, int indent_level = 0) const PURE;
9595
};
9696

97-
using ConnectionSocketPtr = std::unique_ptr<ConnectionSocket>;
97+
using ConnectionSocketPtr = std::shared_ptr<ConnectionSocket>;
9898

9999
} // namespace Network
100100
} // namespace Envoy

envoy/network/listener.h

+39
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,34 @@ class InternalListenerConfig {
134134

135135
using InternalListenerConfigOptRef = OptRef<InternalListenerConfig>;
136136

137+
class RevConnRegistry;
138+
139+
/**
140+
* Configuration for a reverse connection listener.
141+
*/
142+
class ReverseConnectionListenerConfig {
143+
public:
144+
virtual ~ReverseConnectionListenerConfig() = default;
145+
146+
// The source node, cluster and tenant IDs of the local envoy. This is used when the listener is
147+
// used to create reverse connections.
148+
struct ReverseConnParams {
149+
std::string src_node_id_;
150+
std::string src_cluster_id_;
151+
std::string src_tenant_id_;
152+
absl::flat_hash_map<std::string, uint32_t> remote_cluster_to_conn_count_map_;
153+
};
154+
using ReverseConnParamsPtr = std::unique_ptr<ReverseConnParams>;
155+
/**
156+
* @return the private ReverseConnParams object, containing
157+
* the params identifying the local envoy.
158+
*/
159+
virtual ReverseConnParamsPtr& getReverseConnParams() PURE;
160+
};
161+
162+
using ReverseConnectionListenerConfigPtr = std::unique_ptr<ReverseConnectionListenerConfig>;
163+
using ReverseConnectionListenerConfigOptRef = OptRef<ReverseConnectionListenerConfig>;
164+
137165
/**
138166
* Description of the listener.
139167
*/
@@ -260,6 +288,17 @@ class ListenerConfig {
260288
*/
261289
virtual InternalListenerConfigOptRef internalListenerConfig() PURE;
262290

291+
/**
292+
* @return the reverse connection configuration for the listener IFF it is an reverse connection
293+
* listener.
294+
*/
295+
virtual ReverseConnectionListenerConfigOptRef reverseConnectionListenerConfig() const PURE;
296+
297+
/**
298+
* @return the listener's version.
299+
*/
300+
virtual const std::string& versionInfo() const PURE;
301+
263302
/**
264303
* @param address is used for query the address specific connection balancer.
265304
* @return the connection balancer for this listener. All listeners have a connection balancer,
+131
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
#pragma once
2+
3+
#include <future>
4+
#include <string>
5+
6+
#include "envoy/common/pure.h"
7+
#include "envoy/event/timer.h"
8+
#include "envoy/network/listen_socket.h"
9+
#include "envoy/stats/scope.h"
10+
#include "envoy/stats/stats_macros.h"
11+
12+
#include "absl/container/flat_hash_map.h"
13+
14+
namespace Envoy {
15+
namespace Network {
16+
17+
/**
18+
* All ReverseConnectionHandler stats. @see stats_macros.h
19+
* This encompasses the stats for all accepted reverse connections by the responder envoy.
20+
* The initiated reverse connections by the initiator envoy are logged by the RCManager.
21+
*/
22+
#define ALL_RCHANDLER_STATS(GAUGE) \
23+
GAUGE(reverse_conn_cx_idle, NeverImport) \
24+
GAUGE(reverse_conn_cx_used, NeverImport) \
25+
GAUGE(reverse_conn_cx_total, NeverImport)
26+
27+
/**
28+
* Struct definition for all ReverseConnectionHandler stats. @see stats_macros.h
29+
*/
30+
struct RCHandlerStats {
31+
ALL_RCHANDLER_STATS(GENERATE_GAUGE_STRUCT)
32+
};
33+
34+
using RCHandlerStatsPtr = std::unique_ptr<RCHandlerStats>;
35+
using RCSocketPair = std::pair<Network::ConnectionSocketPtr, bool>;
36+
37+
/**
38+
* class to store reverse connection sockets.
39+
*/
40+
class ReverseConnectionHandler {
41+
public:
42+
virtual ~ReverseConnectionHandler() = default;
43+
44+
/** Add the accepted connection and remote cluster mapping to RCHandler maps.
45+
* @param node_id node_id of initiating node.
46+
* @param cluster_id cluster_id of receiving(acceptor) cluster.
47+
* @param socket the socket to be added.
48+
* @param expects_proxy_protocol whether the proxy protocol header is expected. This is used
49+
* in legacy versions.
50+
* @param ping_interval the interval at which ping keepalives are sent on accepted reverse conns.
51+
* @param rebalanced is true if we are adding to the socket after `pickMinHandler` is used
52+
* to pick the most appropriate thread.
53+
*/
54+
virtual void
55+
addConnectionSocket(const std::string& node_id, const std::string& cluster_id,
56+
Network::ConnectionSocketPtr socket, bool expects_proxy_protocol,
57+
const std::chrono::seconds& ping_interval = std::chrono::seconds::zero(),
58+
bool rebalanced = false) PURE;
59+
60+
/** Add the accepted connection and remote cluster mapping to RCHandler maps
61+
* through the thread local dispatcher.
62+
* @param node_id node_id of initiating node.
63+
* @param cluster_id cluster_id of receiving(acceptor) cluster.
64+
* @param socket the socket to be added.
65+
* @param expects_proxy_protocol whether the proxy protocol header is expected. This is used
66+
* in legacy versions.
67+
* @param ping_interval the interval at which ping keepalives are sent on accepted reverse conns.
68+
*/
69+
virtual void post(const std::string& node_id, const std::string& cluster_id,
70+
Network::ConnectionSocketPtr socket, bool expects_proxy_protocol,
71+
const std::chrono::seconds& ping_interval = std::chrono::seconds::zero()) PURE;
72+
73+
/** Called by the responder envoy when a request is received, that could be sent through a reverse
74+
* connection. This returns an accepted connection socket, if present.
75+
* @param key the remote cluster ID/ node ID.
76+
* @param rebalanced is true if we are calling the function after `pickTargetHandler` is used
77+
* to pick the most appropriate thread.
78+
*/
79+
virtual std::pair<Network::ConnectionSocketPtr, bool> getConnectionSocket(const std::string& key,
80+
bool rebalanced) PURE;
81+
82+
/** Called by the responder envoy when the local worker does not have any accepted reverse
83+
* connections for the key, to rebalance the request to a different worker and return the
84+
* connection socket.
85+
* @param key the remote cluster ID/ node ID.
86+
* @param rebalanced is true if we are calling the function after `pickTargetHandler` is used
87+
* to pick the most appropriate thread.
88+
* @param socket_promise the promise to be set with the connection socket.
89+
*/
90+
virtual void rebalanceGetConnectionSocket(
91+
const std::string& key, bool rebalanced,
92+
std::shared_ptr<std::promise<Network::RCSocketPair>> socket_promise) PURE;
93+
94+
/**
95+
* @return the number of reverse connections across all workers
96+
* for the given node id.
97+
*/
98+
virtual size_t getNumberOfSocketsByNode(const std::string& node_id) PURE;
99+
100+
/**
101+
* @return the number of reverse connections across all workers for
102+
* the given cluster id.
103+
*/
104+
virtual size_t getNumberOfSocketsByCluster(const std::string& cluster_id) PURE;
105+
106+
using SocketCountMap = absl::flat_hash_map<std::string, size_t>;
107+
/**
108+
*
109+
* @return the cluster -> reverse conn count mapping.
110+
*/
111+
virtual SocketCountMap getSocketCountMap() PURE;
112+
/**
113+
* Mark the connection socket dead and remove it from internal maps.
114+
* @param fd the FD for the socket to be marked dead.
115+
* @param used is true, when the connection the fd belongs to has been used by a cluster.
116+
*/
117+
virtual void markSocketDead(int fd, bool used) PURE;
118+
119+
/**
120+
* Sets the stats scope for logging stats for accepted reverse connections
121+
* with the local envoy as responder.
122+
* @param scope the base scope to be used.
123+
* @return the parent scope for RCHandler stats.
124+
*/
125+
virtual void initializeStats(Stats::Scope& scope) PURE;
126+
127+
virtual absl::flat_hash_map<std::string, size_t> getConnectionStats() PURE;
128+
};
129+
130+
} // namespace Network
131+
} // namespace Envoy

0 commit comments

Comments
 (0)