Skip to content

Commit 30e0ee8

Browse files
committed
feat(native): Add thrift server for exchange service
1 parent 6a51bc1 commit 30e0ee8

File tree

14 files changed

+705
-13
lines changed

14 files changed

+705
-13
lines changed

presto-native-execution/presto_cpp/main/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ target_link_libraries(
6363
presto_function_metadata
6464
presto_connectors
6565
presto_http
66+
presto_thrift_server
6667
presto_operators
6768
presto_session_properties
6869
presto_velox_plan_conversion

presto-native-execution/presto_cpp/main/PrestoServer.cpp

Lines changed: 73 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
#include "presto_cpp/main/operators/ShuffleRead.h"
4444
#include "presto_cpp/main/types/PrestoToVeloxQueryPlan.h"
4545
#include "presto_cpp/main/types/VeloxPlanConversion.h"
46+
#include "thrift/server/ThriftServer.h"
4647
#include "velox/common/base/Counters.h"
4748
#include "velox/common/base/StatsReporter.h"
4849
#include "velox/common/caching/CacheTTLController.h"
@@ -260,9 +261,9 @@ void PrestoServer::run() {
260261
}
261262

262263
sslContext_ = util::createSSLContext(
263-
optionalClientCertPath.value(),
264-
ciphers,
265-
systemConfig->httpClientHttp2Enabled());
264+
optionalClientCertPath.value(), ciphers, util::SSLProtocol::HTTP_1_1);
265+
thriftSslContext_ = util::createSSLContext(
266+
optionalClientCertPath.value(), ciphers, util::SSLProtocol::THRIFT);
266267
}
267268

268269
if (systemConfig->internalCommunicationJwtEnabled()) {
@@ -619,6 +620,8 @@ void PrestoServer::run() {
619620
}
620621
};
621622

623+
startThriftServer(bindToNodeInternalAddressOnly, certPath, keyPath, ciphers);
624+
622625
// Start everything. After the return from the following call we are shutting
623626
// down.
624627
httpServer_->start(getHttpServerFilters(), [&](proxygen::HTTPServer* server) {
@@ -679,6 +682,7 @@ void PrestoServer::run() {
679682
taskManager_.reset();
680683
PRESTO_SHUTDOWN_LOG(INFO) << "Destroying HTTP Server";
681684
httpServer_.reset();
685+
thriftServer_.reset();
682686

683687
unregisterFileReadersAndWriters();
684688
unregisterFileSystems();
@@ -1070,6 +1074,7 @@ void PrestoServer::stop() {
10701074
httpServer_->stop();
10711075
PRESTO_SHUTDOWN_LOG(INFO) << "HTTP Server stopped.";
10721076
}
1077+
shutdownThriftServer();
10731078
}
10741079

10751080
size_t PrestoServer::numDriverThreads() const {
@@ -1492,7 +1497,7 @@ void PrestoServer::enableWorkerStatsReporting() {
14921497

14931498
void PrestoServer::initVeloxPlanValidator() {
14941499
VELOX_CHECK_NULL(planValidator_);
1495-
planValidator_ = std::make_unique<VeloxPlanValidator>();
1500+
planValidator_ = std::make_shared<VeloxPlanValidator>();
14961501
}
14971502

14981503
VeloxPlanValidator* PrestoServer::getVeloxPlanValidator() {
@@ -1793,6 +1798,70 @@ void PrestoServer::createTaskManager() {
17931798
driverExecutor_.get(), httpSrvCpuExecutor_.get(), spillerExecutor_.get());
17941799
}
17951800

1801+
void PrestoServer::startThriftServer(
1802+
bool bindToNodeInternalAddressOnly,
1803+
const std::string& certPath,
1804+
const std::string& keyPath,
1805+
const std::string& ciphers) {
1806+
auto* systemConfig = SystemConfig::instance();
1807+
bool thriftServerEnabled = systemConfig->thriftServerEnabled();
1808+
1809+
if (thriftServerEnabled) {
1810+
std::unique_ptr<thrift::ThriftConfig> thriftConfig;
1811+
folly::SocketAddress thriftAddress;
1812+
int thriftPort = systemConfig->thriftServerPort();
1813+
if (bindToNodeInternalAddressOnly) {
1814+
thriftAddress.setFromHostPort(address_, thriftPort);
1815+
} else {
1816+
thriftAddress.setFromLocalPort(thriftPort);
1817+
}
1818+
thriftConfig = std::make_unique<thrift::ThriftConfig>(
1819+
thriftAddress, certPath, keyPath, ciphers);
1820+
thriftServer_ = std::make_unique<thrift::ThriftServer>(
1821+
std::move(thriftConfig),
1822+
httpSrvIoExecutor_,
1823+
pool_,
1824+
planValidator_,
1825+
taskManager_);
1826+
1827+
thriftServerFuture_ =
1828+
folly::via(folly::getGlobalCPUExecutor().get())
1829+
.thenTry([this](folly::Try<folly::Unit>) {
1830+
try {
1831+
PRESTO_STARTUP_LOG(INFO)
1832+
<< "Starting Thrift server asynchronously...";
1833+
thriftServer_->start();
1834+
PRESTO_STARTUP_LOG(INFO)
1835+
<< "Thrift server started successfully";
1836+
} catch (const std::exception& e) {
1837+
PRESTO_STARTUP_LOG(ERROR)
1838+
<< "Thrift server failed to start: " << e.what();
1839+
throw;
1840+
}
1841+
});
1842+
}
1843+
}
1844+
1845+
void PrestoServer::shutdownThriftServer() {
1846+
if (thriftServer_) {
1847+
PRESTO_SHUTDOWN_LOG(INFO) << "Stopping Thrift server";
1848+
thriftServer_->stop();
1849+
1850+
// Wait for Thrift server thread to complete with timeout
1851+
try {
1852+
std::move(thriftServerFuture_)
1853+
.within(std::chrono::seconds(5)) // 5-second timeout
1854+
.get();
1855+
PRESTO_SHUTDOWN_LOG(INFO) << "Thrift server stopped gracefully";
1856+
} catch (const std::exception& e) {
1857+
PRESTO_SHUTDOWN_LOG(WARNING)
1858+
<< "Thrift server shutdown timeout or error: " << e.what();
1859+
}
1860+
1861+
thriftServer_.reset();
1862+
}
1863+
}
1864+
17961865
void PrestoServer::reportNodeStats(proxygen::ResponseHandler* downstream) {
17971866
protocol::NodeStats nodeStats;
17981867

presto-native-execution/presto_cpp/main/PrestoServer.h

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ namespace facebook::presto::http {
4848
class HttpServer;
4949
}
5050

51+
namespace facebook::presto::thrift {
52+
class ThriftServer;
53+
}
54+
5155
namespace proxygen {
5256
class ResponseHandler;
5357
} // namespace proxygen
@@ -231,6 +235,16 @@ class PrestoServer {
231235

232236
virtual void createTaskManager();
233237

238+
/// Utility method to start the Thrift server if enabled
239+
void startThriftServer(
240+
bool bindToNodeInternalAddressOnly,
241+
const std::string& certPath,
242+
const std::string& keyPath,
243+
const std::string& ciphers);
244+
245+
/// Utility method to safely shutdown the Thrift server if running
246+
void shutdownThriftServer();
247+
234248
const std::string configDirectoryPath_;
235249

236250
std::shared_ptr<CoordinatorDiscoverer> coordinatorDiscoverer_;
@@ -269,20 +283,22 @@ class PrestoServer {
269283
// Executor for spilling.
270284
std::unique_ptr<folly::CPUThreadPoolExecutor> spillerExecutor_;
271285

272-
std::unique_ptr<VeloxPlanValidator> planValidator_;
286+
std::shared_ptr<VeloxPlanValidator> planValidator_;
273287

274288
std::unique_ptr<http::HttpClientConnectionPool> exchangeSourceConnectionPool_;
275289

276290
// If not null, the instance of AsyncDataCache used for in-memory file cache.
277291
std::shared_ptr<velox::cache::AsyncDataCache> cache_;
278292

279293
std::unique_ptr<http::HttpServer> httpServer_;
294+
std::unique_ptr<thrift::ThriftServer> thriftServer_;
295+
folly::Future<folly::Unit> thriftServerFuture_{folly::makeFuture()};
280296
std::unique_ptr<SignalHandler> signalHandler_;
281297
std::unique_ptr<Announcer> announcer_;
282298
std::unique_ptr<PeriodicHeartbeatManager> heartbeatManager_;
283299
std::shared_ptr<velox::memory::MemoryPool> pool_;
284300
std::shared_ptr<velox::memory::MemoryPool> nativeWorkerPool_;
285-
std::unique_ptr<TaskManager> taskManager_;
301+
std::shared_ptr<TaskManager> taskManager_;
286302
std::unique_ptr<TaskResource> taskResource_;
287303
std::atomic<NodeState> nodeState_{NodeState::kActive};
288304
folly::Synchronized<bool> shuttingDown_{false};
@@ -313,6 +329,7 @@ class PrestoServer {
313329
std::string nodeLocation_;
314330
std::string nodePoolType_;
315331
folly::SSLContextPtr sslContext_;
332+
folly::SSLContextPtr thriftSslContext_;
316333
std::string prestoBuiltinFunctionPrefix_;
317334
};
318335

presto-native-execution/presto_cpp/main/common/Configs.cpp

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,13 @@ SystemConfig::SystemConfig() {
134134
std::unordered_map<std::string, folly::Optional<std::string>>{
135135
BOOL_PROP(kMutableConfig, false),
136136
NONE_PROP(kPrestoVersion),
137+
BOOL_PROP(kThriftServerEnabled, true),
138+
NUM_PROP(kThriftServerPort, 9090),
139+
NUM_PROP(kThriftServerMaxConnections, 50000),
140+
NUM_PROP(kThriftServerMaxRequests, 200),
141+
NUM_PROP(kThriftServerIdleTimeout, 120000),
142+
NUM_PROP(kThriftServerTaskExpireTimeMs, 60000),
143+
NUM_PROP(kThriftServerStreamExpireTime, 60000),
137144
NONE_PROP(kHttpServerHttpPort),
138145
BOOL_PROP(kHttpServerReusePort, false),
139146
BOOL_PROP(kHttpServerBindToNodeInternalAddressOnlyEnabled, false),
@@ -279,6 +286,34 @@ SystemConfig* SystemConfig::instance() {
279286
return instance.get();
280287
}
281288

289+
bool SystemConfig::thriftServerEnabled() const {
290+
return optionalProperty<bool>(kThriftServerEnabled).value();
291+
}
292+
293+
int32_t SystemConfig::thriftServerPort() const {
294+
return optionalProperty<int32_t>(kThriftServerPort).value();
295+
}
296+
297+
int32_t SystemConfig::thriftServerMaxConnections() const {
298+
return optionalProperty<int32_t>(kThriftServerMaxConnections).value();
299+
}
300+
301+
int32_t SystemConfig::thriftServerMaxRequests() const {
302+
return optionalProperty<int32_t>(kThriftServerMaxRequests).value();
303+
}
304+
305+
int32_t SystemConfig::thriftServerIdleTimeout() const {
306+
return optionalProperty<int32_t>(kThriftServerIdleTimeout).value();
307+
}
308+
309+
int32_t SystemConfig::thriftServerTaskExpireTimeMs() const {
310+
return optionalProperty<int32_t>(kThriftServerTaskExpireTimeMs).value();
311+
}
312+
313+
int32_t SystemConfig::thriftServerStreamExpireTime() const {
314+
return optionalProperty<int32_t>(kThriftServerStreamExpireTime).value();
315+
}
316+
282317
int SystemConfig::httpServerHttpPort() const {
283318
return requiredProperty<int>(kHttpServerHttpPort);
284319
}

presto-native-execution/presto_cpp/main/common/Configs.h

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,30 @@ class ConfigBase {
159159
class SystemConfig : public ConfigBase {
160160
public:
161161
static constexpr std::string_view kPrestoVersion{"presto.version"};
162+
163+
/// Thrift server configuration
164+
static constexpr std::string_view kThriftServerEnabled{
165+
"presto.thrift-server.enabled"};
166+
167+
static constexpr std::string_view kThriftServerPort{
168+
"presto.thrift-server.port"};
169+
170+
static constexpr std::string_view kThriftServerMaxConnections{
171+
"presto.thrift-server.max-connections"};
172+
173+
static constexpr std::string_view kThriftServerMaxRequests{
174+
"presto.thrift-server.max-requests"};
175+
176+
static constexpr std::string_view kThriftServerIdleTimeout{
177+
"presto.thrift-server.idle-timeout"};
178+
179+
static constexpr std::string_view kThriftServerTaskExpireTimeMs{
180+
"presto.thrift-server.task-expire-time-ms"};
181+
182+
static constexpr std::string_view kThriftServerStreamExpireTime{
183+
"presto.thrift-server.stream-expire-time"};
184+
185+
/// HTTP server configuration
162186
static constexpr std::string_view kHttpServerHttpPort{
163187
"http-server.http.port"};
164188

@@ -808,6 +832,22 @@ class SystemConfig : public ConfigBase {
808832

809833
static SystemConfig* instance();
810834

835+
// Thrift server configuration
836+
bool thriftServerEnabled() const;
837+
838+
int32_t thriftServerPort() const;
839+
840+
int32_t thriftServerMaxConnections() const;
841+
842+
int32_t thriftServerMaxRequests() const;
843+
844+
int32_t thriftServerIdleTimeout() const;
845+
846+
int32_t thriftServerTaskExpireTimeMs() const;
847+
848+
int32_t thriftServerStreamExpireTime() const;
849+
850+
// HTTP server configuration
811851
int httpServerHttpPort() const;
812852

813853
bool httpServerReusePort() const;

presto-native-execution/presto_cpp/main/common/Utils.cpp

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,16 +32,25 @@ DateTime toISOTimestamp(uint64_t timeMilli) {
3232
std::shared_ptr<folly::SSLContext> createSSLContext(
3333
const std::string& clientCertAndKeyPath,
3434
const std::string& ciphers,
35-
bool http2Enabled) {
35+
SSLProtocol protocol) {
3636
try {
3737
auto sslContext = std::make_shared<folly::SSLContext>();
3838
sslContext->loadCertKeyPairFromFiles(
3939
clientCertAndKeyPath.c_str(), clientCertAndKeyPath.c_str());
4040
sslContext->setCiphersOrThrow(ciphers);
41-
if (http2Enabled) {
42-
sslContext->setAdvertisedNextProtocols({"h2", "http/1.1"});
43-
} else {
44-
sslContext->setAdvertisedNextProtocols({"http/1.1"});
41+
switch (protocol) {
42+
case SSLProtocol::THRIFT:
43+
sslContext->setVerificationOption(
44+
folly::SSLContext::SSLVerifyPeerEnum::NO_VERIFY);
45+
// Set ALPN for Rocket protocol
46+
sslContext->setAdvertisedNextProtocols({"rs"});
47+
break;
48+
case SSLProtocol::HTTP_1_1:
49+
sslContext->setAdvertisedNextProtocols({"http/1.1"});
50+
break;
51+
case SSLProtocol::HTTP_2:
52+
sslContext->setAdvertisedNextProtocols({"h2", "http/1.1"});
53+
break;
4554
}
4655
return sslContext;
4756
} catch (const std::exception& ex) {

presto-native-execution/presto_cpp/main/common/Utils.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,16 @@ namespace facebook::presto::util {
2828
using DateTime = std::string;
2929
DateTime toISOTimestamp(uint64_t timeMilli);
3030

31+
enum class SSLProtocol {
32+
THRIFT, // Rocket protocol (rs)
33+
HTTP_1_1, // HTTP/1.1
34+
HTTP_2 // HTTP/2 (h2)
35+
};
36+
3137
std::shared_ptr<folly::SSLContext> createSSLContext(
3238
const std::string& clientCertAndKeyPath,
3339
const std::string& ciphers,
34-
bool http2Enabled);
40+
SSLProtocol protocol);
3541

3642
/// Returns current process-wide CPU time in nanoseconds.
3743
long getProcessCpuTimeNs();

0 commit comments

Comments
 (0)