Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions presto-native-execution/presto_cpp/main/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ target_link_libraries(
presto_function_metadata
presto_connectors
presto_http
presto_thrift_server
presto_operators
presto_session_properties
presto_velox_plan_conversion
Expand Down
77 changes: 73 additions & 4 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include "presto_cpp/main/operators/ShuffleRead.h"
#include "presto_cpp/main/types/PrestoToVeloxQueryPlan.h"
#include "presto_cpp/main/types/VeloxPlanConversion.h"
#include "thrift/server/ThriftServer.h"
#include "velox/common/base/Counters.h"
#include "velox/common/base/StatsReporter.h"
#include "velox/common/caching/CacheTTLController.h"
Expand Down Expand Up @@ -260,9 +261,9 @@ void PrestoServer::run() {
}

sslContext_ = util::createSSLContext(
optionalClientCertPath.value(),
ciphers,
systemConfig->httpClientHttp2Enabled());
optionalClientCertPath.value(), ciphers, util::SSLProtocol::HTTP_1_1);
thriftSslContext_ = util::createSSLContext(
optionalClientCertPath.value(), ciphers, util::SSLProtocol::THRIFT);
}

if (systemConfig->internalCommunicationJwtEnabled()) {
Expand Down Expand Up @@ -619,6 +620,8 @@ void PrestoServer::run() {
}
};

startThriftServer(bindToNodeInternalAddressOnly, certPath, keyPath, ciphers);

// Start everything. After the return from the following call we are shutting
// down.
httpServer_->start(getHttpServerFilters(), [&](proxygen::HTTPServer* server) {
Expand Down Expand Up @@ -679,6 +682,7 @@ void PrestoServer::run() {
taskManager_.reset();
PRESTO_SHUTDOWN_LOG(INFO) << "Destroying HTTP Server";
httpServer_.reset();
thriftServer_.reset();

unregisterFileReadersAndWriters();
unregisterFileSystems();
Expand Down Expand Up @@ -1070,6 +1074,7 @@ void PrestoServer::stop() {
httpServer_->stop();
PRESTO_SHUTDOWN_LOG(INFO) << "HTTP Server stopped.";
}
shutdownThriftServer();
}

size_t PrestoServer::numDriverThreads() const {
Expand Down Expand Up @@ -1492,7 +1497,7 @@ void PrestoServer::enableWorkerStatsReporting() {

void PrestoServer::initVeloxPlanValidator() {
VELOX_CHECK_NULL(planValidator_);
planValidator_ = std::make_unique<VeloxPlanValidator>();
planValidator_ = std::make_shared<VeloxPlanValidator>();
}

VeloxPlanValidator* PrestoServer::getVeloxPlanValidator() {
Expand Down Expand Up @@ -1793,6 +1798,70 @@ void PrestoServer::createTaskManager() {
driverExecutor_.get(), httpSrvCpuExecutor_.get(), spillerExecutor_.get());
}

void PrestoServer::startThriftServer(
bool bindToNodeInternalAddressOnly,
const std::string& certPath,
const std::string& keyPath,
const std::string& ciphers) {
auto* systemConfig = SystemConfig::instance();
bool thriftServerEnabled = systemConfig->thriftServerEnabled();

if (thriftServerEnabled) {
std::unique_ptr<thrift::ThriftConfig> thriftConfig;
folly::SocketAddress thriftAddress;
int thriftPort = systemConfig->thriftServerPort();
if (bindToNodeInternalAddressOnly) {
thriftAddress.setFromHostPort(address_, thriftPort);
} else {
thriftAddress.setFromLocalPort(thriftPort);
}
thriftConfig = std::make_unique<thrift::ThriftConfig>(
thriftAddress, certPath, keyPath, ciphers);
thriftServer_ = std::make_unique<thrift::ThriftServer>(
std::move(thriftConfig),
httpSrvIoExecutor_,
pool_,
planValidator_,
taskManager_);

thriftServerFuture_ =
folly::via(folly::getGlobalCPUExecutor().get())
.thenTry([this](folly::Try<folly::Unit>) {
try {
PRESTO_STARTUP_LOG(INFO)
<< "Starting Thrift server asynchronously...";
thriftServer_->start();
PRESTO_STARTUP_LOG(INFO)
<< "Thrift server started successfully";
} catch (const std::exception& e) {
PRESTO_STARTUP_LOG(ERROR)
<< "Thrift server failed to start: " << e.what();
throw;
}
});
}
}

void PrestoServer::shutdownThriftServer() {
if (thriftServer_) {
PRESTO_SHUTDOWN_LOG(INFO) << "Stopping Thrift server";
thriftServer_->stop();

// Wait for Thrift server thread to complete with timeout
try {
std::move(thriftServerFuture_)
.within(std::chrono::seconds(5)) // 5-second timeout
.get();
PRESTO_SHUTDOWN_LOG(INFO) << "Thrift server stopped gracefully";
} catch (const std::exception& e) {
PRESTO_SHUTDOWN_LOG(WARNING)
<< "Thrift server shutdown timeout or error: " << e.what();
}

thriftServer_.reset();
}
}

void PrestoServer::reportNodeStats(proxygen::ResponseHandler* downstream) {
protocol::NodeStats nodeStats;

Expand Down
21 changes: 19 additions & 2 deletions presto-native-execution/presto_cpp/main/PrestoServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ namespace facebook::presto::http {
class HttpServer;
}

namespace facebook::presto::thrift {
class ThriftServer;
}

namespace proxygen {
class ResponseHandler;
} // namespace proxygen
Expand Down Expand Up @@ -231,6 +235,16 @@ class PrestoServer {

virtual void createTaskManager();

/// Utility method to start the Thrift server if enabled
void startThriftServer(
bool bindToNodeInternalAddressOnly,
const std::string& certPath,
const std::string& keyPath,
const std::string& ciphers);

/// Utility method to safely shutdown the Thrift server if running
void shutdownThriftServer();

const std::string configDirectoryPath_;

std::shared_ptr<CoordinatorDiscoverer> coordinatorDiscoverer_;
Expand Down Expand Up @@ -269,20 +283,22 @@ class PrestoServer {
// Executor for spilling.
std::unique_ptr<folly::CPUThreadPoolExecutor> spillerExecutor_;

std::unique_ptr<VeloxPlanValidator> planValidator_;
std::shared_ptr<VeloxPlanValidator> planValidator_;

std::unique_ptr<http::HttpClientConnectionPool> exchangeSourceConnectionPool_;

// If not null, the instance of AsyncDataCache used for in-memory file cache.
std::shared_ptr<velox::cache::AsyncDataCache> cache_;

std::unique_ptr<http::HttpServer> httpServer_;
std::unique_ptr<thrift::ThriftServer> thriftServer_;
folly::Future<folly::Unit> thriftServerFuture_{folly::makeFuture()};
std::unique_ptr<SignalHandler> signalHandler_;
std::unique_ptr<Announcer> announcer_;
std::unique_ptr<PeriodicHeartbeatManager> heartbeatManager_;
std::shared_ptr<velox::memory::MemoryPool> pool_;
std::shared_ptr<velox::memory::MemoryPool> nativeWorkerPool_;
std::unique_ptr<TaskManager> taskManager_;
std::shared_ptr<TaskManager> taskManager_;
std::unique_ptr<TaskResource> taskResource_;
std::atomic<NodeState> nodeState_{NodeState::kActive};
folly::Synchronized<bool> shuttingDown_{false};
Expand Down Expand Up @@ -313,6 +329,7 @@ class PrestoServer {
std::string nodeLocation_;
std::string nodePoolType_;
folly::SSLContextPtr sslContext_;
folly::SSLContextPtr thriftSslContext_;
std::string prestoBuiltinFunctionPrefix_;
};

Expand Down
35 changes: 35 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Configs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,13 @@ SystemConfig::SystemConfig() {
std::unordered_map<std::string, folly::Optional<std::string>>{
BOOL_PROP(kMutableConfig, false),
NONE_PROP(kPrestoVersion),
BOOL_PROP(kThriftServerEnabled, true),
NUM_PROP(kThriftServerPort, 9090),
NUM_PROP(kThriftServerMaxConnections, 50000),
NUM_PROP(kThriftServerMaxRequests, 200),
NUM_PROP(kThriftServerIdleTimeout, 120000),
NUM_PROP(kThriftServerTaskExpireTimeMs, 60000),
NUM_PROP(kThriftServerStreamExpireTime, 60000),
NONE_PROP(kHttpServerHttpPort),
BOOL_PROP(kHttpServerReusePort, false),
BOOL_PROP(kHttpServerBindToNodeInternalAddressOnlyEnabled, false),
Expand Down Expand Up @@ -279,6 +286,34 @@ SystemConfig* SystemConfig::instance() {
return instance.get();
}

bool SystemConfig::thriftServerEnabled() const {
return optionalProperty<bool>(kThriftServerEnabled).value();
}

int32_t SystemConfig::thriftServerPort() const {
return optionalProperty<int32_t>(kThriftServerPort).value();
}

int32_t SystemConfig::thriftServerMaxConnections() const {
return optionalProperty<int32_t>(kThriftServerMaxConnections).value();
}

int32_t SystemConfig::thriftServerMaxRequests() const {
return optionalProperty<int32_t>(kThriftServerMaxRequests).value();
}

int32_t SystemConfig::thriftServerIdleTimeout() const {
return optionalProperty<int32_t>(kThriftServerIdleTimeout).value();
}

int32_t SystemConfig::thriftServerTaskExpireTimeMs() const {
return optionalProperty<int32_t>(kThriftServerTaskExpireTimeMs).value();
}

int32_t SystemConfig::thriftServerStreamExpireTime() const {
return optionalProperty<int32_t>(kThriftServerStreamExpireTime).value();
}

int SystemConfig::httpServerHttpPort() const {
return requiredProperty<int>(kHttpServerHttpPort);
}
Expand Down
40 changes: 40 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Configs.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,30 @@ class ConfigBase {
class SystemConfig : public ConfigBase {
public:
static constexpr std::string_view kPrestoVersion{"presto.version"};

/// Thrift server configuration
static constexpr std::string_view kThriftServerEnabled{
"presto.thrift-server.enabled"};

static constexpr std::string_view kThriftServerPort{
"presto.thrift-server.port"};

static constexpr std::string_view kThriftServerMaxConnections{
"presto.thrift-server.max-connections"};

static constexpr std::string_view kThriftServerMaxRequests{
"presto.thrift-server.max-requests"};

static constexpr std::string_view kThriftServerIdleTimeout{
"presto.thrift-server.idle-timeout"};

static constexpr std::string_view kThriftServerTaskExpireTimeMs{
"presto.thrift-server.task-expire-time-ms"};

static constexpr std::string_view kThriftServerStreamExpireTime{
"presto.thrift-server.stream-expire-time"};

/// HTTP server configuration
static constexpr std::string_view kHttpServerHttpPort{
"http-server.http.port"};

Expand Down Expand Up @@ -808,6 +832,22 @@ class SystemConfig : public ConfigBase {

static SystemConfig* instance();

// Thrift server configuration
bool thriftServerEnabled() const;

int32_t thriftServerPort() const;

int32_t thriftServerMaxConnections() const;

int32_t thriftServerMaxRequests() const;

int32_t thriftServerIdleTimeout() const;

int32_t thriftServerTaskExpireTimeMs() const;

int32_t thriftServerStreamExpireTime() const;

// HTTP server configuration
int httpServerHttpPort() const;

bool httpServerReusePort() const;
Expand Down
19 changes: 14 additions & 5 deletions presto-native-execution/presto_cpp/main/common/Utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,25 @@ DateTime toISOTimestamp(uint64_t timeMilli) {
std::shared_ptr<folly::SSLContext> createSSLContext(
const std::string& clientCertAndKeyPath,
const std::string& ciphers,
bool http2Enabled) {
SSLProtocol protocol) {
try {
auto sslContext = std::make_shared<folly::SSLContext>();
sslContext->loadCertKeyPairFromFiles(
clientCertAndKeyPath.c_str(), clientCertAndKeyPath.c_str());
sslContext->setCiphersOrThrow(ciphers);
if (http2Enabled) {
sslContext->setAdvertisedNextProtocols({"h2", "http/1.1"});
} else {
sslContext->setAdvertisedNextProtocols({"http/1.1"});
switch (protocol) {
case SSLProtocol::THRIFT:
sslContext->setVerificationOption(
folly::SSLContext::SSLVerifyPeerEnum::NO_VERIFY);
// Set ALPN for Rocket protocol
sslContext->setAdvertisedNextProtocols({"rs"});
break;
case SSLProtocol::HTTP_1_1:
sslContext->setAdvertisedNextProtocols({"http/1.1"});
break;
case SSLProtocol::HTTP_2:
sslContext->setAdvertisedNextProtocols({"h2", "http/1.1"});
break;
}
return sslContext;
} catch (const std::exception& ex) {
Expand Down
8 changes: 7 additions & 1 deletion presto-native-execution/presto_cpp/main/common/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,16 @@ namespace facebook::presto::util {
using DateTime = std::string;
DateTime toISOTimestamp(uint64_t timeMilli);

enum class SSLProtocol {
THRIFT, // Rocket protocol (rs)
HTTP_1_1, // HTTP/1.1
HTTP_2 // HTTP/2 (h2)
};

std::shared_ptr<folly::SSLContext> createSSLContext(
const std::string& clientCertAndKeyPath,
const std::string& ciphers,
bool http2Enabled);
SSLProtocol protocol);

/// Returns current process-wide CPU time in nanoseconds.
long getProcessCpuTimeNs();
Expand Down
Loading
Loading