Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reverse connection: code changes for reverse connection listener filter #37821

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ proto_library(
"//contrib/envoy/extensions/filters/http/language/v3alpha:pkg",
"//contrib/envoy/extensions/filters/http/squash/v3:pkg",
"//contrib/envoy/extensions/filters/http/sxg/v3alpha:pkg",
"//contrib/envoy/extensions/filters/listener/reverse_connection/v3alpha:pkg",
"//contrib/envoy/extensions/filters/network/client_ssl_auth/v3:pkg",
"//contrib/envoy/extensions/filters/network/generic_proxy/codecs/kafka/v3:pkg",
"//contrib/envoy/extensions/filters/network/golang/v3alpha:pkg",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# DO NOT EDIT. This file is generated by tools/proto_format/proto_sync.py.

load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package")

licenses(["notice"]) # Apache 2

api_proto_package(
deps = ["@com_github_cncf_xds//udpa/annotations:pkg"],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
syntax = "proto3";

package envoy.extensions.filters.listener.reverse_connection.v3alpha;

import "google/protobuf/wrappers.proto";

import "udpa/annotations/status.proto";
import "udpa/annotations/versioning.proto";

option java_package = "io.envoyproxy.envoy.extensions.filters.listener.reverse_connection.v3alpha";
option java_outer_classname = "ReverseConnectionProto";
option java_multiple_files = true;
option go_package = "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/listener/reverse_connection/v3alpha";
option (udpa.annotations.file_status).package_version_status = ACTIVE;

// [#protodoc-title: Reverse Connection Filter]
// PROXY protocol listener filter.
// [#extension: envoy.filters.listener.reverse_connection]

message ReverseConnection {
option (udpa.annotations.versioning).previous_message_type =
"envoy.config.filter.listener.reverse_connection.v2.ReverseConnection";

google.protobuf.UInt32Value ping_wait_timeout = 1;
}
1 change: 1 addition & 0 deletions api/versioning/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ proto_library(
"//contrib/envoy/extensions/filters/http/language/v3alpha:pkg",
"//contrib/envoy/extensions/filters/http/squash/v3:pkg",
"//contrib/envoy/extensions/filters/http/sxg/v3alpha:pkg",
"//contrib/envoy/extensions/filters/listener/reverse_connection/v3alpha:pkg",
"//contrib/envoy/extensions/filters/network/client_ssl_auth/v3:pkg",
"//contrib/envoy/extensions/filters/network/generic_proxy/codecs/kafka/v3:pkg",
"//contrib/envoy/extensions/filters/network/golang/v3alpha:pkg",
Expand Down
6 changes: 6 additions & 0 deletions contrib/contrib_build_config.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ CONTRIB_EXTENSIONS = {
"envoy.filters.network.rocketmq_proxy": "//contrib/rocketmq_proxy/filters/network/source:config",
"envoy.filters.network.golang": "//contrib/golang/filters/network/source:config",

#
# Listener filters
#

"envoy.filters.listener.reverse_connection": "//contrib/reverse_connection/filters/listener/source:config",

#
# Sip proxy
#
Expand Down
7 changes: 7 additions & 0 deletions contrib/extensions_metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ envoy.filters.http.sxg:
- envoy.filters.http
security_posture: robust_to_untrusted_downstream
status: alpha
envoy.filters.listener.reverse_connection:
categories:
- envoy.filters.listener
security_posture: robust_to_untrusted_downstream
status: alpha
type_urls:
- envoy.extensions.filters.listener.reverse_connection.v3alpha.ReverseConnection
envoy.filters.network.client_ssl_auth:
categories:
- envoy.filters.network
Expand Down
55 changes: 55 additions & 0 deletions contrib/reverse_connection/filters/listener/source/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
load(
"//bazel:envoy_build_system.bzl",
"envoy_cc_extension",
"envoy_cc_library",
"envoy_extension_package",
)

licenses(["notice"]) # Apache 2

envoy_extension_package()

envoy_cc_library(
name = "reverse_connection_lib",
srcs = ["reverse_connection.cc"],
hdrs = ["reverse_connection.h"],
deps = [
":config_lib",
"//envoy/event:dispatcher_interface",
"//envoy/network:filter_interface",
"//envoy/network:listen_socket_interface",
"//source/common/api:os_sys_calls_lib",
"//source/common/buffer:buffer_lib",
"//source/common/common:assert_lib",
"//source/common/common:minimal_logger_lib",
"//contrib/reverse_connection/bootstrap/source:reverse_conn_global_registry",
],
)

envoy_cc_library(
name = "config_lib",
srcs = ["config.cc"],
hdrs = ["config.h"],
deps = [
"//source/common/protobuf:utility_lib",
"@envoy_api//contrib/envoy/extensions/filters/listener/reverse_connection/v3alpha:pkg_cc_proto",
],
)

envoy_cc_extension(
name = "config", # The extension build system requires a library named config
srcs = ["config_factory.cc"],
hdrs = ["config_factory.h"],
# category = "envoy.filters.listener",
# security_posture = "robust_to_untrusted_downstream",
extra_visibility = [
"//test/integration:__subpackages__",
],
deps = [
":config_lib",
":reverse_connection_lib",
"//envoy/registry",
"//envoy/server:filter_config_interface",
"@envoy_api//contrib/envoy/extensions/filters/listener/reverse_connection/v3alpha:pkg_cc_proto",
],
)
18 changes: 18 additions & 0 deletions contrib/reverse_connection/filters/listener/source/config.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#include "contrib/reverse_connection//filters/listener/source/config.h"

#include "source/common/protobuf/utility.h"

namespace Envoy {
namespace Extensions {
namespace ListenerFilters {
namespace ReverseConnection {

Config::Config(
const envoy::extensions::filters::listener::reverse_connection::v3alpha::ReverseConnection& config)
: ping_wait_timeout_(
std::chrono::seconds(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, ping_wait_timeout, 10))) {}

} // namespace ReverseConnection
} // namespace ListenerFilters
} // namespace Extensions
} // namespace Envoy
24 changes: 24 additions & 0 deletions contrib/reverse_connection/filters/listener/source/config.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#pragma once

#include "contrib/envoy/extensions/filters/listener/reverse_connection/v3alpha/reverse_connection.pb.h"

namespace Envoy {
namespace Extensions {
namespace ListenerFilters {
namespace ReverseConnection {

class Config {
public:
Config(const envoy::extensions::filters::listener::reverse_connection::v3alpha::ReverseConnection&
config);

std::chrono::seconds pingWaitTimeout() const { return ping_wait_timeout_; }

private:
const std::chrono::seconds ping_wait_timeout_;
};

} // namespace ReverseConnection
} // namespace ListenerFilters
} // namespace Extensions
} // namespace Envoy
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#include "contrib/reverse_connection//filters/listener/source/config_factory.h"

#include "contrib/envoy/extensions/filters/listener/reverse_connection/v3alpha/reverse_connection.pb.h"
#include "contrib/envoy/extensions/filters/listener/reverse_connection/v3alpha/reverse_connection.pb.validate.h"

#include "contrib/reverse_connection//filters/listener/source/config.h"
#include "contrib/reverse_connection//filters/listener/source/reverse_connection.h"

namespace Envoy {
namespace Extensions {
namespace ListenerFilters {
namespace ReverseConnection {

Network::ListenerFilterFactoryCb
ReverseConnectionConfigFactory::createListenerFilterFactoryFromProto(
const Protobuf::Message& message,
const Network::ListenerFilterMatcherSharedPtr& listener_filter_matcher,
Server::Configuration::ListenerFactoryContext& context) {
auto proto_config = MessageUtil::downcastAndValidate<
const envoy::extensions::filters::listener::reverse_connection::v3alpha::ReverseConnection&>(
message, context.messageValidationVisitor());
// Retrieve the ReverseConnRegistry singleton and acecss the thread local slot
std::shared_ptr<ReverseConnection::ReverseConnRegistry> reverse_conn_registry =
context.serverFactoryContext().singletonManager().getTyped<ReverseConnection::ReverseConnRegistry>("reverse_conn_registry_singleton");
if (reverse_conn_registry == nullptr) {
throw EnvoyException(
"Cannot create reverse connection listener filter. Reverse connection registry not found");
}
// ReverseConnection::RCThreadLocalRegistry* thread_local_registry = reverse_conn_registry->getLocalRegistry();
// if (thread_local_registry == nullptr) {
// throw EnvoyException("Cannot create reverse connection listener filter. Thread local reverse connection registry is null");
// }
Config config(proto_config);
return [listener_filter_matcher, config, reverse_conn_registry](Network::ListenerFilterManager& filter_manager) -> void {
filter_manager.addAcceptFilter(listener_filter_matcher, std::make_unique<Filter>(config, reverse_conn_registry));
};
}

ProtobufTypes::MessagePtr ReverseConnectionConfigFactory::createEmptyConfigProto() {
return std::make_unique<
envoy::extensions::filters::listener::reverse_connection::v3alpha::ReverseConnection>();
}

REGISTER_FACTORY(ReverseConnectionConfigFactory,
Server::Configuration::NamedListenerFilterConfigFactory);

} // namespace ReverseConnection
} // namespace ListenerFilters
} // namespace Extensions
} // namespace Envoy
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#pragma once

#include "envoy/registry/registry.h"
#include "envoy/server/filter_config.h"

namespace Envoy {
namespace Extensions {
namespace ListenerFilters {
namespace ReverseConnection {

class ReverseConnectionConfigFactory
: public Server::Configuration::NamedListenerFilterConfigFactory {
public:
Network::ListenerFilterFactoryCb createListenerFilterFactoryFromProto(
const Protobuf::Message& config,
const Network::ListenerFilterMatcherSharedPtr& listener_filter_matcher,
Server::Configuration::ListenerFactoryContext& context) override;

ProtobufTypes::MessagePtr createEmptyConfigProto() override;

std::string name() const override { return "envoy.listener.reverse_connection"; }
};

} // namespace ReverseConnection
} // namespace ListenerFilters
} // namespace Extensions
} // namespace Envoy
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
#include "contrib/reverse_connection//filters/listener/source/reverse_connection.h"

#include <string.h>
#include <sys/ioctl.h>
#include <unistd.h>

#include <memory>
#include <string>

#include "envoy/common/exception.h"
#include "envoy/network/listen_socket.h"

#include "source/common/api/os_sys_calls_impl.h"
#include "source/common/buffer/buffer_impl.h"
#include "source/common/common/assert.h"

// #include "source/common/network/io_socket_handle_impl.h"

namespace Envoy {
namespace Extensions {
namespace ListenerFilters {
namespace ReverseConnection {

const absl::string_view Filter::RPING_MSG = "RPING";
const absl::string_view Filter::PROXY_MSG = "PROXY";

Filter::Filter(const Config& config, std::shared_ptr<ReverseConnection::ReverseConnRegistry> reverse_conn_registry) : config_(config), reverse_conn_registry_(reverse_conn_registry) {
ENVOY_LOG(debug, "reverse_connection: ping_wait_timeout is {}",
config_.pingWaitTimeout().count());
}

int Filter::fd() { return cb_->socket().ioHandle().fdDoNotUse(); }

Filter::~Filter() {}

void Filter::onClose() {
ENVOY_LOG(debug, "reverse_connection: close");

const std::string& connectionKey =
cb_->socket().connectionInfoProvider().localAddress()->asString();

// The rc filter responds to pings until data is received, so if onClose() is invoked,
// then an idle reverse connection has been closed.
reverseConnectionManager().notifyConnectionClose(
connectionKey, false /* is_used */);
// Marking the connection as used here to avoid marking the socket dead again in the destructor.
}

Network::FilterStatus Filter::onAccept(Network::ListenerFilterCallbacks& cb) {
ENVOY_LOG(debug, "reverse_connection: New connection accepted");
cb_ = &cb;
ping_wait_timer_ = cb.dispatcher().createTimer([this]() { onPingWaitTimeout(); });
ping_wait_timer_->enableTimer(config_.pingWaitTimeout());

// Wait for data.
return Network::FilterStatus::StopIteration;
}

size_t Filter::maxReadBytes() const { return RPING_MSG.length(); }

void Filter::onPingWaitTimeout() {
ENVOY_LOG(debug, "reverse_connection: timed out waiting for ping request");
const std::string& connectionKey =
cb_->socket().connectionInfoProvider().localAddress()->asString();
ENVOY_LOG(debug,
"Connection socket FD: {} local address: {} remote address: {} closed; Reporting to "
"RCManager.",
fd(), connectionKey,
cb_->socket().connectionInfoProvider().remoteAddress()->asStringView());
reverseConnectionManager().notifyConnectionClose(
connectionKey, false);
cb_->continueFilterChain(false);
}

Network::FilterStatus Filter::onData(Network::ListenerFilterBuffer& buffer) {
const ReadOrParseState read_state = parseBuffer(buffer);
switch (read_state) {
case ReadOrParseState::Error:
return Network::FilterStatus::StopIteration;
case ReadOrParseState::TryAgainLater:
return Network::FilterStatus::StopIteration;
case ReadOrParseState::Done:
ENVOY_LOG(debug, "reverse_connection: marking the socket ready for use, fd {}", fd());
// Call the RC Manager to update the RCManager Stats and log the connection used.
const std::string& connectionKey =
cb_->socket().connectionInfoProvider().localAddress()->asString();
reverseConnectionManager().markConnUsed(connectionKey);
return Network::FilterStatus::Continue;
}
return Network::FilterStatus::Continue;
}

ReadOrParseState Filter::parseBuffer(Network::ListenerFilterBuffer& buffer) {
auto raw_slice = buffer.rawSlice();
auto buf = absl::string_view(static_cast<const char*>(raw_slice.mem_), raw_slice.len_);

ENVOY_LOG(debug, "reverse_connection: Data received, len: {}", buf.length());
if (buf.length() == 0) {
// Remote closed.
return ReadOrParseState::Error;
}

// We will compare the received bytes with the expected "RPING" msg. If,
// we found that the received bytes are not "RPING", this means, that peer
// socket is assigned to an upstream cluster. Otherwise, we will send "RPING"
// as a response.
if (!memcmp(buf.data(), RPING_MSG.data(), RPING_MSG.length())) {
ENVOY_LOG(debug, "reverse_connection: Revceived {} msg on fd {}", RPING_MSG, fd());
if (!buffer.drain(RPING_MSG.length())) {
ENVOY_LOG(error, "reverse_connection: could not drain buffer for ping message");
}

// Echo the RPING message back.
Buffer::OwnedImpl rping_buf(RPING_MSG);
const Api::IoCallUint64Result write_result = cb_->socket().ioHandle().write(rping_buf);
if (write_result.ok()) {
ENVOY_LOG(trace, "reverse_connection: fd {} send ping response rc:{}", fd(),
write_result.return_value_);
} else {
ENVOY_LOG(trace, "reverse_connection: fd {} send ping response rc:{} errno {}", fd(),
write_result.return_value_, write_result.err_->getErrorDetails());
}
ping_wait_timer_->enableTimer(config_.pingWaitTimeout());
// Return a status to wait for data.
return ReadOrParseState::TryAgainLater;
}

ENVOY_LOG(debug, "reverse_connection: fd {} received data, stopping RPINGs", fd());
return ReadOrParseState::Done;
}

} // namespace ReverseConnection
} // namespace ListenerFilters
} // namespace Extensions
} // namespace Envoy
Loading