Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reverse connection: code changes for a listener that initiates reverse connections #37820

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ proto_library(
name = "v3_protos",
visibility = ["//visibility:public"],
deps = [
"//contrib/envoy/extensions/reverse_connection/reverse_connection_listener_config/v3alpha:pkg",
"//contrib/envoy/extensions/compression/qatzip/compressor/v3alpha:pkg",
"//contrib/envoy/extensions/compression/qatzstd/compressor/v3alpha:pkg",
"//contrib/envoy/extensions/filters/http/checksum/v3alpha:pkg",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# DO NOT EDIT. This file is generated by tools/proto_format/proto_sync.py.

load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package")

licenses(["notice"]) # Apache 2

api_proto_package(
deps = ["@com_github_cncf_xds//udpa/annotations:pkg"],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
syntax = "proto3";

package envoy.extensions.reverse_connection.reverse_connection_listener_config.v3alpha;

import "google/protobuf/wrappers.proto";

import "udpa/annotations/status.proto";
import "validate/validate.proto";

option java_package = "io.envoyproxy.envoy.extensions.reverse_connection.reverse_connection_listener_config.v3alpha";
option java_outer_classname = "ReverseConnectionListenerConfigProto";
option java_multiple_files = true;
option go_package = "github.com/envoyproxy/go-control-plane/envoy/extensions/reverse_connection/reverse_connection_listener_config/v3alpha";
option (udpa.annotations.file_status).package_version_status = ACTIVE;

// [#protodoc-title: Listener Config for Reverse Connections]
// [#extension: envoy.reverse_connection.reverse_connection_listener_config]

message ReverseConnectionCluster {
// Name of the cluster to initiate reverse connections to.
string cluster_name = 1 [(validate.rules).string = {min_len: 1}];

// Number of reverse connections need to cluster. Atleast one reverse connection
// has to be requested.
google.protobuf.UInt32Value reverse_connection_count = 2 [(validate.rules).uint32 = {gte: 1}];
}

// Configuration for envoy reverse connection listener. All the future reverse connection listener features
// should be added here.
message ReverseConnectionListenerConfig {
// The unique name for the initiator envoy while initiating reverse connections. This is
// a required field for reverse connections.
string src_node_id = 1 [(validate.rules).string = {min_len: 1}];

// The cluster ID (optional) of the initiator envoy.
string src_cluster_id = 2;

// Tenant ID (optional) of the initiator envoy.
string src_tenant_id = 3;

repeated ReverseConnectionCluster remote_cluster_to_conn_count = 4;
}
1 change: 1 addition & 0 deletions api/versioning/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ proto_library(
name = "active_protos",
visibility = ["//visibility:public"],
deps = [
"//contrib/envoy/extensions/reverse_connection/reverse_connection_listener_config/v3alpha:pkg",
"//contrib/envoy/extensions/compression/qatzip/compressor/v3alpha:pkg",
"//contrib/envoy/extensions/compression/qatzstd/compressor/v3alpha:pkg",
"//contrib/envoy/extensions/config/v3alpha:pkg",
Expand Down
5 changes: 5 additions & 0 deletions contrib/contrib_build_config.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ CONTRIB_EXTENSIONS = {
"envoy.filters.network.rocketmq_proxy": "//contrib/rocketmq_proxy/filters/network/source:config",
"envoy.filters.network.golang": "//contrib/golang/filters/network/source:config",

#
# Reverse Connection
#
"envoy.reverse_connection.reverse_connection_listener_config": "//contrib/reverse_connection/reverse_connection_listener_config/source:reverse_connection_listener_config_lib",

#
# Sip proxy
#
Expand Down
7 changes: 7 additions & 0 deletions contrib/extensions_metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ envoy.tls.key_providers.qat:
- envoy.tls.key_providers
security_posture: robust_to_untrusted_downstream
status: alpha
envoy.reverse_connection.reverse_connection_listener_config:
categories:
- envoy.reverse_connection
security_posture: unknown
status: wip
type_urls:
- envoy.extensions.reverse_connection.reverse_connection_listener_config.v3alpha.ReverseConnectionListenerConfig
envoy.bootstrap.vcl:
categories:
- envoy.bootstrap
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
load(
"//bazel:envoy_build_system.bzl",
"envoy_cc_extension",
"envoy_cc_library",
"envoy_extension_package",
)

licenses(["notice"]) # Apache 2

envoy_extension_package()

envoy_cc_extension(
name = "reverse_connection_listener_config_lib",
srcs = ["reverse_connection_listener_config_impl.cc"],
hdrs = ["reverse_connection_listener_config_impl.h"],
visibility = ["//visibility:public"],
deps = [
"//envoy/network:listener_interface",
],
)

envoy_cc_library(
name = "active_reverse_connection_listener_lib",
srcs = ["active_reverse_connection_listener.cc"],
hdrs = [
"active_reverse_connection_listener.h",
],
visibility = ["//visibility:public"],
deps = [
"//contrib/reverse_connection/bootstrap/source:reverse_connection_includes",
"//envoy/event:dispatcher_interface",
"//envoy/network:filter_interface",
"//envoy/network:listener_interface",
"//source/common/listener_manager:active_stream_listener_base",
"//source/server:active_listener_base",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
#include "contrib/reverse_connection/reverse_connection_listener_config/source/active_reverse_connection_listener.h"

namespace Envoy {
namespace Extensions {
namespace ReverseConnection {

ActiveReverseConnectionListener::ActiveReverseConnectionListener(
Network::ConnectionHandler& conn_handler, Event::Dispatcher& dispatcher,
Network::ListenerConfig& config, Bootstrap::ReverseConnection::RCThreadLocalRegistry& local_registry)
: Server::OwnedActiveStreamListenerBase(
conn_handler, dispatcher, std::make_unique<NetworkReverseConnectionListener>(), config), local_registry_(local_registry) {
startRCWorkflow(dispatcher, conn_handler, config);
}

ActiveReverseConnectionListener::ActiveReverseConnectionListener(
Network::ConnectionHandler& conn_handler, Event::Dispatcher& dispatcher,
Network::ListenerPtr listener, Network::ListenerConfig& config, Bootstrap::ReverseConnection::RCThreadLocalRegistry& local_registry)
: Server::OwnedActiveStreamListenerBase(conn_handler, dispatcher, std::move(listener), config), local_registry_(local_registry) {
}

ActiveReverseConnectionListener::~ActiveReverseConnectionListener() {
is_deleting_ = true;
// Purge sockets that have not progressed to connections. This should only happen when
// a listener filter stops iteration and never resumes.
while (!sockets_.empty()) {
auto removed = sockets_.front()->removeFromList(sockets_);
dispatcher().deferredDelete(std::move(removed));
}

for (auto& [chain, active_connections] : connections_by_context_) {
ASSERT(active_connections != nullptr);
auto& connections = active_connections->connections_;
while (!connections.empty()) {
connections.front()->connection_->close(Network::ConnectionCloseType::NoFlush,
"reverse_conn_listener_draining");
}
}
dispatcher().clearDeferredDeleteList();
}

void ActiveReverseConnectionListener::startRCWorkflow(Event::Dispatcher& dispatcher,
Network::ConnectionHandler& conn_handler,
Network::ListenerConfig& config) {
ENVOY_LOG(debug, "Starting reverse conn workflow on worker: {} listener: {}", dispatcher.name(),
config.name());
local_registry_.getRCManager().registerRCInitiators(conn_handler, config);
}

void ActiveReverseConnectionListener::removeConnection(Server::ActiveTcpConnection& connection) {
// Remove the connection from all internal data structures maintained by the RCManager.
const std::string& connectionKey =
connection.connection_->getSocket()->connectionInfoProvider().localAddress()->asString();
ENVOY_LOG(
info,
"Connection ID :{} local address: {} remote address: {} closed ; Reporting to RCManager",
connection.connection_->id(), connectionKey,
connection.connection_->getSocket()
->connectionInfoProvider()
.remoteAddress()
->asStringView());

// Notify that an used reverse connection has been closed.
local_registry_.getRCManager().notifyConnectionClose(
connectionKey, true /* is_used */);

Server::OwnedActiveStreamListenerBase::removeConnection(connection);
}

void ActiveReverseConnectionListener::onAccept(Network::ConnectionSocketPtr&& socket) {
incNumConnections();
auto active_socket = std::make_unique<Server::ActiveTcpSocket>(
*this, std::move(socket), false);

onSocketAccepted(std::move(active_socket));
}

void ActiveReverseConnectionListener::onReject(RejectCause cause) {
switch (cause) {
case RejectCause::GlobalCxLimit:
stats_.downstream_global_cx_overflow_.inc();
break;
case RejectCause::OverloadAction:
stats_.downstream_cx_overload_reject_.inc();
break;
}
}

void ActiveReverseConnectionListener::recordConnectionsAcceptedOnSocketEvent(
uint32_t connections_accepted) {
stats_.connections_accepted_per_socket_event_.recordValue(connections_accepted);
}

void ActiveReverseConnectionListener::updateListenerConfig(Network::ListenerConfig& config) {
ENVOY_LOG(trace, "replacing listener ", config_->listenerTag(), " by ", config.listenerTag());
config_ = &config;
}

void ActiveReverseConnectionListener::newActiveConnection(
const Network::FilterChain& filter_chain, Network::ServerConnectionPtr server_conn_ptr,
std::unique_ptr<StreamInfo::StreamInfo> stream_info) {
auto& active_connections = getOrCreateActiveConnections(filter_chain);
auto active_connection = std::make_unique<Server::ActiveTcpConnection>(
active_connections, std::move(server_conn_ptr), dispatcher().timeSource(),
std::move(stream_info));
// If the connection is already closed, we can just let this connection immediately die.
if (active_connection->connection_->state() != Network::Connection::State::Closed) {
ENVOY_CONN_LOG(
debug, "new connection from {}", *active_connection->connection_,
active_connection->connection_->connectionInfoProvider().remoteAddress()->asString());
active_connection->connection_->addConnectionCallbacks(*active_connection);
LinkedList::moveIntoList(std::move(active_connection), active_connections.connections_);
}
}

} // namespace ReverseConnection
} // namespace Extensions
} // namespace Envoy
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#pragma once
#include <atomic>
#include <cstdint>
#include <list>
#include <memory>

#include "contrib/reverse_connection/bootstrap/source/reverse_conn_thread_local_registry.h"
#include "contrib/reverse_connection/bootstrap/source/reverse_connection_manager.h"
#include "contrib/reverse_connection/bootstrap/source/reverse_connection_handler.h"

#include "envoy/event/dispatcher.h"
#include "envoy/network/filter.h"
#include "envoy/network/listener.h"

#include "source/common/listener_manager/active_stream_listener_base.h"
#include "source/server/active_listener_base.h"


namespace Envoy {
namespace Extensions {
namespace ReverseConnection {
class ActiveReverseConnectionListener : public Server::OwnedActiveStreamListenerBase,
public Network::TcpListenerCallbacks,
public Network::ReverseConnectionListener {
public:
ActiveReverseConnectionListener(Network::ConnectionHandler& conn_handler,
Event::Dispatcher& dispatcher, Network::ListenerConfig& config, Bootstrap::ReverseConnection::RCThreadLocalRegistry& local_registry);
ActiveReverseConnectionListener(Network::ConnectionHandler& conn_handler,
Event::Dispatcher& dispatcher, Network::ListenerPtr listener,
Network::ListenerConfig& config, Bootstrap::ReverseConnection::RCThreadLocalRegistry& local_registry);
~ActiveReverseConnectionListener() override;

class NetworkReverseConnectionListener : public Network::Listener {

public:
// ReverseConnectionListener does not bind to port.
void disable() override {
ENVOY_LOG(debug, "Warning: the reverse connection listener cannot be disabled.");
}

void enable() override {
ENVOY_LOG(debug, "Warning: the reverse connection listener is always enabled.");
}

void setRejectFraction(UnitFloat) override {}
void configureLoadShedPoints(Server::LoadShedPointProvider&) override {}
bool shouldBypassOverloadManager() const override { return false; }
};

virtual void removeConnection(Server::ActiveTcpConnection& connection) override;

Network::BalancedConnectionHandlerOptRef
getBalancedHandlerByAddress(const Network::Address::Instance&) override {
// Reverse connection listener doesn't support migrate connection to another worker.
PANIC("not implemented");
}

void onAccept(Network::ConnectionSocketPtr&& socket) override;

// Network::TcpListenerCallbacks
void onReject(RejectCause) override;
void recordConnectionsAcceptedOnSocketEvent(uint32_t connections_accepted) override;

// ConnectionHandler::ActiveListener
uint64_t listenerTag() override { return OwnedActiveStreamListenerBase::listenerTag(); }
Network::Listener* listener() override { return listener_.get(); }
void pauseListening() override {
if (listener_ != nullptr) {
listener_->disable();
}
}
void resumeListening() override {
if (listener_ != nullptr) {
listener_->enable();
}
}
void shutdownListener(const Network::ExtraShutdownListenerOptions&) override {
listener_.reset();
}
void updateListenerConfig(Network::ListenerConfig& config) override;
void onFilterChainDraining(
const std::list<const Network::FilterChain*>& draining_filter_chains) override {
OwnedActiveStreamListenerBase::onFilterChainDraining(draining_filter_chains);
}

void startRCWorkflow(Event::Dispatcher& dispatcher, Network::ConnectionHandler& conn_handler, Network::ListenerConfig& config) override;

// ActiveStreamListenerBase
void incNumConnections() override { config_->openConnections().inc(); }
void decNumConnections() override { config_->openConnections().dec(); }

void newActiveConnection(const Network::FilterChain&, Network::ServerConnectionPtr,
std::unique_ptr<StreamInfo::StreamInfo>) override;

private:
Bootstrap::ReverseConnection::RCThreadLocalRegistry& local_registry_;
};
} // namespace ReverseConnection
} // namespace Extensions
} // namespace Envoy
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#include "contrib/reverse_connection/reverse_connection_listener_config/source/reverse_connection_listener_config_impl.h"

namespace Envoy {
namespace Extensions {
namespace ReverseConnection {
} // namespace ReverseConnection
} // namespace Extensions
} // namespace Envoy
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#pragma once
#include <atomic>
#include <cstdint>
#include <list>
#include <memory>

#include "envoy/network/listener.h"

namespace Envoy {
namespace Extensions {
namespace ReverseConnection {

class ReverseConnectionListenerConfigImpl : public Network::ReverseConnectionListenerConfig {
public:
ReverseConnectionListenerConfigImpl(ReverseConnParamsPtr params, Network::RevConnRegistry& registry)
: rc_local_params_(std::move(params)), registry_(registry) {}

ReverseConnParamsPtr& getReverseConnParams() override { return rc_local_params_; }

Network::RevConnRegistry& reverseConnRegistry() override { return registry_;}

private:
// Stores the parameters identifying the local envoy.
ReverseConnParamsPtr rc_local_params_;

// The global reverse connection registry.
Network::RevConnRegistry& registry_;
};

} // namespace ReverseConnection
} // namespace Extensions
} // namespace Envoy