Skip to content

Envoy core changes for reverse connections #37368

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 41 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
457e1e9
Envoy core changes for reverse connections
basundhara-c Nov 26, 2024
6ce719e
Pass cluster manager while creating tls registry
basundhara-c Dec 5, 2024
3d33358
Envoy core changes to use contrib/ extensions
basundhara-c Dec 20, 2024
3331617
1. Remove the getDispatcher API and instead pass the connection handl…
basundhara-c Dec 20, 2024
4104495
Formatting files
basundhara-c Dec 25, 2024
edd3b17
Nit: Remove more extraneous code
basundhara-c Dec 25, 2024
553fada
reverse_conn: resolve PR comments, add logic in cluster manager to ch…
basundhara-c Jan 8, 2025
5401791
reverse_conn: add mock methods, add tests for json utils
basundhara-c Jan 8, 2025
7c270c2
nit: Change listener_map_by_tag_ to use shared_ptr instead of unique_…
basundhara-c Jan 9, 2025
05ee7d6
Nit: Add mocks for Host functions
basundhara-c Jan 9, 2025
841dfea
Nit: Add listener proto field and minor nit to remove deleted functio…
basundhara-c Jan 12, 2025
ba4e0bf
Minor nits to reconcile missed code, and added a runtime guard for re…
basundhara-c Jan 13, 2025
ae3e00b
Nit: Correct/Add mocks for HostDescription
basundhara-c Jan 13, 2025
bc15de5
reverse_conn_core: Simplify connection close sequence, add a flag to …
basundhara-c Feb 3, 2025
0e26d53
rev_conn_core: Unifying flags to enable connection reuse
basundhara-c Feb 8, 2025
d10a90a
Merge remote-tracking branch 'upstream/main' into reverse_conn_envoy_…
basundhara-c Feb 11, 2025
d54a8d0
reverse_conn_core: add mocks for ListenerFilterWithDataFuzzer
basundhara-c Feb 17, 2025
9c953ae
Adding null check for socket_ and minor nits
basundhara-c Mar 10, 2025
d478775
Fixing issues in release build and formatting files
basundhara-c Mar 10, 2025
2b365bf
reverse_conn_core: Store unique ptr of connection socket instead of s…
basundhara-c Mar 12, 2025
611e8ba
reverse_conn_core: Formatting changes
basundhara-c Mar 12, 2025
86fe208
reverse_conn_core: Formatting files and using ProtobufWkt::Any instea…
basundhara-c Mar 13, 2025
13d6ba9
Adding unit tests for logical_host and listener_manager to fix covera…
basundhara-c Apr 1, 2025
b4c6fe1
Formatting files
basundhara-c Apr 1, 2025
6f5de2b
fix coverage tests
basundhara-c Apr 1, 2025
ce9edf6
Addressed Comments
agrawroh Apr 9, 2025
1919394
Merge pull request #6 from agrawroh/rev-tunn-2
basundhara-c Apr 9, 2025
7492552
Merge remote-tracking branch 'upstream/main' into reverse_conn_envoy_…
basundhara-c Apr 10, 2025
09c7245
Minor nits to LOG statements and fix formatting
basundhara-c Apr 10, 2025
a1dddbb
Added Unit Tests
agrawroh Apr 11, 2025
55e4ab4
Merge pull request #9 from agrawroh/rev-tunn-5
basundhara-c Apr 11, 2025
24ef333
Minor nits to MultiConnectionBaseImplTest
basundhara-c Apr 13, 2025
4592851
test commit to verify coverage
basundhara-c Apr 14, 2025
f2de792
Merge remote-tracking branch 'upstream/main' into reverse_conn_envoy_…
basundhara-c Apr 14, 2025
8f7ea97
Fixing coverage build for source/common/quic
basundhara-c Apr 15, 2025
b7d7049
Addressed comments from @botengyao
agrawroh Apr 16, 2025
2064812
Merge pull request #10 from agrawroh/rev-tunn-7
basundhara-c Apr 16, 2025
f82cce1
Addressed comments from @botengyao
agrawroh Apr 16, 2025
5a6086b
Merge pull request #11 from agrawroh/rev-tunn-8
basundhara-c Apr 16, 2025
a1b112b
Addressed comments from @botengyao
agrawroh Apr 18, 2025
5152af6
Merge pull request #12 from agrawroh/rev-tunn-9
basundhara-c Apr 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion api/envoy/config/listener/v3/listener.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import "envoy/config/listener/v3/listener_components.proto";
import "envoy/config/listener/v3/udp_listener_config.proto";

import "google/protobuf/duration.proto";
import "google/protobuf/any.proto";
import "google/protobuf/wrappers.proto";

import "xds/annotations/v3/status.proto";
Expand Down Expand Up @@ -53,7 +54,7 @@ message ListenerCollection {
repeated xds.core.v3.CollectionEntry entries = 1;
}

// [#next-free-field: 36]
// [#next-free-field: 37]
message Listener {
option (udpa.annotations.versioning).previous_message_type = "envoy.api.v2.Listener";

Expand Down Expand Up @@ -378,6 +379,11 @@ message Listener {
// * :ref:`freebind <envoy_v3_api_field_config.listener.v3.Listener.freebind>`
// * :ref:`transparent <envoy_v3_api_field_config.listener.v3.Listener.transparent>`
InternalListenerConfig internal_listener = 27;

// Used to represent a reverse connection listener which, instead of binding to a port and listening,
// initiates reverse connections to a remote endpoint. The used sockets are cached on the remote
// endpoint and can be used to send request to local services.
google.protobuf.Any reverse_connection_listener_config = 36;
}

// Enable MPTCP (multi-path TCP) on this listener. Clients will be allowed to establish
Expand Down
21 changes: 21 additions & 0 deletions envoy/network/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,27 @@ class Connection : public Event::DeferredDeletable,
*/
virtual bool aboveHighWatermark() const PURE;

/**
* @return ConnectionSocketPtr& To get socket from current connection.
*/
virtual const ConnectionSocketPtr& getSocket() const PURE;

/**
* Set the flag connection_reused_ to value. The flag connection_reused_
* indicates whether the client connection is reused.
*/
virtual void setConnectionReused(bool value) PURE;

/**
* Set flag to convey active connection (listener) is reused.
*/
virtual void setActiveConnectionReused(bool value) PURE;

/**
* return boolean telling if active connection (listener) is reused.
*/
virtual bool isActiveConnectionReused() PURE;

/**
* Get the socket options set on this connection.
*/
Expand Down
68 changes: 68 additions & 0 deletions envoy/network/connection_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
namespace Envoy {
namespace Network {

class LocalRevConnRegistry;

// This interface allows for a listener to perform an alternative behavior when a
// packet can't be routed correctly during draining; for example QUIC packets that
// are not for an existing connection.
Expand Down Expand Up @@ -127,6 +129,20 @@ class ConnectionHandler {
*/
virtual const std::string& statPrefix() const PURE;

/**
* Pass the reverse connection socket to the listener that initiated it.
* @param upstream_socket the socket to be passed.
* @param listener_tag the tag of the listener that initiated the reverse
* connection.
*/
virtual void saveUpstreamConnection(Network::ConnectionSocketPtr&& upstream_socket,
uint64_t listener_tag) PURE;

/**
* @return the thread local registry.
*/
virtual Network::LocalRevConnRegistry& reverseConnRegistry() const PURE;

/**
* Used by ConnectionHandler to manage listeners.
*/
Expand Down Expand Up @@ -303,6 +319,28 @@ class InternalListener : public virtual ConnectionHandler::ActiveListener {
using InternalListenerPtr = std::unique_ptr<InternalListener>;
using InternalListenerOptRef = OptRef<InternalListener>;

/**
* Reverse connection listener callbacks.
*/
class ReverseConnectionListener : public virtual ConnectionHandler::ActiveListener {
public:

/**
* Helper method that triggers the reverse connection workflow.
* @param dispatcher the thread local dispatcher.
* @param conn_handler the thread local connection handler.
* @param config the listener config that triggers the reverse connection.
*/
virtual void startRCWorkflow(Event::Dispatcher& dispatcher, Network::ConnectionHandler& conn_handler, Network::ListenerConfig& config) PURE;

/**
* Called when a new connection is accepted.
* @param socket supplies the socket that is moved into the callee.
*/
virtual void onAccept(ConnectionSocketPtr&& socket) PURE;
};
using ReverseConnectionListenerPtr = std::unique_ptr<ReverseConnectionListener>;

/**
* The query interface of the registered internal listener callbacks.
*/
Expand Down Expand Up @@ -351,5 +389,35 @@ class InternalListenerRegistry {
virtual LocalInternalListenerRegistry* getLocalRegistry() PURE;
};

// The thread local registry.
class LocalRevConnRegistry {
public:
virtual ~LocalRevConnRegistry() = default;

virtual Network::ReverseConnectionListenerPtr
createActiveReverseConnectionListener(Network::ConnectionHandler& conn_handler,
Event::Dispatcher& dispatcher,
Network::ListenerConfig& config) PURE;
};

// The central reverse conn registry interface providing the thread local accessor.
class RevConnRegistry {
public:
virtual ~RevConnRegistry() = default;

/**
* @return The thread local registry.
*/
virtual LocalRevConnRegistry* getLocalRegistry() PURE;

/**
* Helper function to create a ReverseConnectionListenerConfig from a google.protobuf.Any.
* @param config is the reverse connection listener config.
* @return the ReverseConnectionListenerConfig object.
*/
virtual absl::StatusOr<Network::ReverseConnectionListenerConfigPtr>
fromAnyConfig(const google::protobuf::Any& config) PURE;
};

} // namespace Network
} // namespace Envoy
6 changes: 6 additions & 0 deletions envoy/network/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,12 @@ class ListenerFilter {
*/
virtual FilterStatus onData(Network::ListenerFilterBuffer& buffer) PURE;

/**
* Called when the connection is closed. Only the current filter that has stopped filter
* chain iteration will get the callback.
*/
virtual void onClose(){};

/**
* Return the size of data the filter want to inspect from the connection.
* The size can be increased after filter need to inspect more data.
Expand Down
2 changes: 1 addition & 1 deletion envoy/network/listen_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class ConnectionSocket : public virtual Socket {
virtual void dumpState(std::ostream& os, int indent_level = 0) const PURE;
};

using ConnectionSocketPtr = std::unique_ptr<ConnectionSocket>;
using ConnectionSocketPtr = std::shared_ptr<ConnectionSocket>;

} // namespace Network
} // namespace Envoy
47 changes: 47 additions & 0 deletions envoy/network/listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,42 @@ class InternalListenerConfig {

using InternalListenerConfigOptRef = OptRef<InternalListenerConfig>;

class RevConnRegistry;

/**
* Configuration for a reverse connection listener.
*/
class ReverseConnectionListenerConfig {
public:
virtual ~ReverseConnectionListenerConfig() = default;

/**
* Encapsulates the source node, cluster and tenant IDs of the local envoy. These are used when
* the listener creates reverse connections.
*/
struct ReverseConnParams {
std::string src_node_id_;
std::string src_cluster_id_;
std::string src_tenant_id_;
absl::flat_hash_map<std::string, uint32_t> remote_cluster_to_conn_count_map_;
};
using ReverseConnParamsPtr = std::unique_ptr<ReverseConnParams>;
/**
* @return the private ReverseConnParams object, containing
* the params identifying the local envoy.
*/
virtual ReverseConnParamsPtr& getReverseConnParams() PURE;

/**
* @return the global reverse conn registry.
*/
virtual RevConnRegistry& reverseConnRegistry() PURE;
};

using ReverseConnectionListenerConfigOptRef = OptRef<ReverseConnectionListenerConfig>;
using ReverseConnectionListenerConfigPtr = std::unique_ptr<ReverseConnectionListenerConfig>;
using ReverseConnParamsPtr = Network::ReverseConnectionListenerConfig::ReverseConnParamsPtr;

/**
* Description of the listener.
*/
Expand Down Expand Up @@ -260,6 +296,17 @@ class ListenerConfig {
*/
virtual InternalListenerConfigOptRef internalListenerConfig() PURE;

/**
* @return the reverse connection configuration for the listener IFF it is an reverse connection
* listener.
*/
virtual ReverseConnectionListenerConfigOptRef reverseConnectionListenerConfig() const PURE;

/**
* @return the listener's version.
*/
virtual const std::string& versionInfo() const PURE;

/**
* @param address is used for query the address specific connection balancer.
* @return the connection balancer for this listener. All listeners have a connection balancer,
Expand Down
11 changes: 11 additions & 0 deletions envoy/upstream/host_description.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,17 @@ class HostDescription {
*/
virtual absl::optional<MonotonicTime> lastHcPassTime() const PURE;

/**
* @return host-id to be used to retrieve reverse connection sockets from
* reverse connection handler.
*/
virtual const absl::string_view getHostId() const PURE;

/*
* Set the current host-id.
*/
virtual void setHostId(const std::string host_id) PURE;

/**
* Set the timestamp of when the host has transitioned from unhealthy to healthy state via an
* active health checking.
Expand Down
2 changes: 2 additions & 0 deletions source/common/http/codec_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ class CodecClient : protected Logger::Loggable<Logger::Id::client>,
*/
void connect();

Network::ClientConnectionPtr& connection() { return connection_; }

bool connectCalled() const { return connect_called_; }

protected:
Expand Down
4 changes: 2 additions & 2 deletions source/common/http/conn_pool_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,10 @@ uint64_t MultiplexedActiveClientBase::maxStreamsPerConnection(uint64_t max_strea
MultiplexedActiveClientBase::MultiplexedActiveClientBase(
HttpConnPoolImplBase& parent, uint32_t effective_concurrent_streams,
uint32_t max_configured_concurrent_streams, Stats::Counter& cx_total,
OptRef<Upstream::Host::CreateConnectionData> data)
OptRef<Upstream::Host::CreateConnectionData> data, CreateConnectionDataFn connection_fn)
: Envoy::Http::ActiveClient(
parent, maxStreamsPerConnection(parent.host()->cluster().maxRequestsPerConnection()),
effective_concurrent_streams, max_configured_concurrent_streams, data) {
effective_concurrent_streams, max_configured_concurrent_streams, data, connection_fn) {
codec_client_->setCodecClientCallbacks(*this);
codec_client_->setCodecConnectionCallbacks(*this);
cx_total.inc();
Expand Down
27 changes: 17 additions & 10 deletions source/common/http/conn_pool_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,25 +102,31 @@ class HttpConnPoolImplBase : public Envoy::ConnectionPool::ConnPoolImplBase,
absl::optional<HttpServerPropertiesCache::Origin> origin_;
};

using CreateConnectionDataFn =
std::function<Upstream::Host::CreateConnectionData(HttpConnPoolImplBase& pool)>;
// An implementation of Envoy::ConnectionPool::ActiveClient for HTTP/1.1 and HTTP/2
class ActiveClient : public Envoy::ConnectionPool::ActiveClient {
public:
ActiveClient(HttpConnPoolImplBase& parent, uint64_t lifetime_stream_limit,
uint64_t effective_concurrent_stream_limit,
uint64_t configured_concurrent_stream_limit,
OptRef<Upstream::Host::CreateConnectionData> opt_data)
ActiveClient(
HttpConnPoolImplBase& parent, uint64_t lifetime_stream_limit,
uint64_t effective_concurrent_stream_limit, uint64_t configured_concurrent_stream_limit,
OptRef<Upstream::Host::CreateConnectionData> opt_data,
CreateConnectionDataFn connection_fn =
[](HttpConnPoolImplBase& parent) {
return static_cast<Envoy::ConnectionPool::ConnPoolImplBase*>(&parent)
->host()
->createConnection(parent.dispatcher(), parent.socketOptions(),
parent.transportSocketOptions());
})
: Envoy::ConnectionPool::ActiveClient(parent, lifetime_stream_limit,
effective_concurrent_stream_limit,
configured_concurrent_stream_limit) {
if (opt_data.has_value()) {
initialize(opt_data.value(), parent);
return;
}
// The static cast makes sure we call the base class host() and not
// HttpConnPoolImplBase::host which is of a different type.
Upstream::Host::CreateConnectionData data =
static_cast<Envoy::ConnectionPool::ConnPoolImplBase*>(&parent)->host()->createConnection(
parent.dispatcher(), parent.socketOptions(), parent.transportSocketOptions());
ENVOY_LOG(debug, "Creating CreateConnectionData");
Upstream::Host::CreateConnectionData data = connection_fn(parent);
initialize(data, parent);
}

Expand Down Expand Up @@ -204,7 +210,8 @@ class MultiplexedActiveClientBase : public CodecClientCallbacks,
public:
MultiplexedActiveClientBase(HttpConnPoolImplBase& parent, uint32_t effective_concurrent_streams,
uint32_t max_configured_concurrent_streams, Stats::Counter& cx_total,
OptRef<Upstream::Host::CreateConnectionData> data);
OptRef<Upstream::Host::CreateConnectionData> data,
CreateConnectionDataFn connection_fn = nullptr);
~MultiplexedActiveClientBase() override = default;
// Caps max streams per connection below 2^31 to prevent overflow.
static uint64_t maxStreamsPerConnection(uint64_t max_streams_config);
Expand Down
6 changes: 4 additions & 2 deletions source/common/http/filter_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1000,10 +1000,12 @@ void DownstreamFilterManager::sendLocalReply(
// route refreshment in the response filter chain.
cb->route(nullptr);
}


bool reverse_conn_force_local_reply =
Runtime::runtimeFeatureEnabled("envoy.reloadable_features.reverse_conn_force_local_reply");
// We only prepare a local reply to execute later if we're actively
// invoking filters to avoid re-entrant in filters.
if (state_.filter_call_state_ & FilterCallState::IsDecodingMask) {
if (!reverse_conn_force_local_reply && state_.filter_call_state_ & FilterCallState::IsDecodingMask) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually a runtime guard has a 6 month deletion policy, and it is usually used to guard the data plane behavior change. I think here you want a parameter to tell if this is going to be a local reply on a reverse connection. Could you point me where you detect this use case, and I assume it is in your HTTP filter extension.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is related to the http filter. Basically, the reverse connection changes are dependent on the guard http_filter_avoid_reentrant_local_reply, which is now deprecated. Without it, when the responder accepts the reverse connection and calls sendLocalReply() to ACK the connection, the filter manager prepares a local reply to be sent after the filter execution returns. This results in a segfault as the connection is closed. To allow the reverse conn filter to send a local reply similar to http_filter_avoid_reentrant_local_reply, I added the runtime guard. Would you suggest adding a flag instead?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you really need the direct sendLocalReply() behavior, we should not use the runtime guard here, the original introduce of this runtime flag is to fix the reentrant issue, and all other normal users will not see the difference. We need to make this code path only work for the reverse connection with some flag like you did for connection to indicate this is the filter_manager that is used on reverse connection.

prepareLocalReplyViaFilterChain(is_grpc_request, code, body, modify_headers, is_head_request,
grpc_status, details);
} else {
Expand Down
2 changes: 2 additions & 0 deletions source/common/http/headers.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,8 @@ class HeaderValues {
const LowerCaseString XContentTypeOptions{"x-content-type-options"};
const LowerCaseString XSquashDebug{"x-squash-debug"};
const LowerCaseString EarlyData{"early-data"};
const LowerCaseString EnvoyDstNodeUUID{"x-remote-node-id"};
const LowerCaseString EnvoyDstClusterUUID{"x-dst-cluster-uuid"};

struct {
const std::string Close{"close"};
Expand Down
1 change: 1 addition & 0 deletions source/common/http/http2/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ envoy_cc_library(
srcs = ["conn_pool.cc"],
hdrs = ["conn_pool.h"],
deps = [
"//envoy/config:typed_config_interface",
"//envoy/event:dispatcher_interface",
"//envoy/upstream:upstream_interface",
"//source/common/http:codec_client_lib",
Expand Down
7 changes: 4 additions & 3 deletions source/common/http/http2/conn_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

#include <cstdint>

#include "envoy/config/typed_config.h"
#include "envoy/event/dispatcher.h"
#include "envoy/upstream/upstream.h"

#include "source/common/http/http2/codec_impl.h"
#include "source/common/runtime/runtime_features.h"
Expand Down Expand Up @@ -37,11 +37,12 @@ uint32_t ActiveClient::calculateInitialStreamsLimit(
}

ActiveClient::ActiveClient(HttpConnPoolImplBase& parent,
OptRef<Upstream::Host::CreateConnectionData> data)
OptRef<Upstream::Host::CreateConnectionData> data,
CreateConnectionDataFn connection_fn)
: MultiplexedActiveClientBase(
parent, calculateInitialStreamsLimit(parent.cache(), parent.origin(), parent.host()),
parent.host()->cluster().http2Options().max_concurrent_streams().value(),
parent.host()->cluster().trafficStats()->upstream_cx_http2_total_, data) {}
parent.host()->cluster().trafficStats()->upstream_cx_http2_total_, data, connection_fn) {}

ConnectionPool::InstancePtr
allocateConnPool(Event::Dispatcher& dispatcher, Random::RandomGenerator& random_generator,
Expand Down
Loading
Loading