-
Notifications
You must be signed in to change notification settings - Fork 5.5k
[Prototype] feat(native): Add thrift server for exchange service #26477
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
Reviewer's GuideThis PR integrates a Thrift-based RPC server into the native execution layer for the exchange service by extending configuration support, embedding startup/shutdown hooks in PrestoServer, introducing standalone Thrift server and service handler classes, updating the Thrift IDL, and adjusting the build system accordingly. Sequence diagram for Thrift server startup and shutdown in PrestoServersequenceDiagram
participant PrestoServer
participant ThriftServer
participant ThriftConfig
participant PrestoThriftServiceHandler
PrestoServer->>ThriftConfig: Create ThriftConfig
PrestoServer->>ThriftServer: Create ThriftServer with config, pool, planValidator, taskManager
PrestoServer->>ThriftServer: startThriftServer()
ThriftServer->>PrestoThriftServiceHandler: setInterface(handler)
ThriftServer->>ThriftServer: serve()
Note over ThriftServer: Thrift server is running
PrestoServer->>ThriftServer: shutdownThriftServer()
ThriftServer->>ThriftServer: stop()
Note over ThriftServer: Thrift server stopped
Sequence diagram for getTaskResults Thrift RPC callsequenceDiagram
actor Client
participant ThriftServer
participant PrestoThriftServiceHandler
participant TaskManager
Client->>ThriftServer: getTaskResults(taskId, bufferId, token, ...)
ThriftServer->>PrestoThriftServiceHandler: future_getTaskResults(...)
PrestoThriftServiceHandler->>TaskManager: getResults(...)
TaskManager-->>PrestoThriftServiceHandler: Result
PrestoThriftServiceHandler-->>ThriftServer: TaskResult
ThriftServer-->>Client: TaskResult
Class diagram for new and updated Thrift server classesclassDiagram
class PrestoServer {
+void startThriftServer(...)
+void shutdownThriftServer()
-unique_ptr<ThriftServer> thriftServer_
-Future<Unit> thriftServerFuture_
-shared_ptr<TaskManager> taskManager_
-shared_ptr<VeloxPlanValidator> planValidator_
}
class ThriftConfig {
+getAddress() SocketAddress
+getCertPath() string
+getKeyPath() string
+getSupportedCiphers() string
+getTaskExpireTimeMs() int
+getStreamExpireTimeMs() int
+getMaxRequest() int
+getMaxConnections() int
+getIdleTimeout() int
-address_ SocketAddress
-certPath_ string
-keyPath_ string
-ciphers_ string
-supportedCiphers_ string
-taskExpireTimeMs_ int
-streamExpireTimeMs_ int
-maxRequest_ int
-maxConnections_ int
-idleTimeout_ int
}
class ThriftServer {
+ThriftServer(...)
+void start()
+void stop()
+address() SocketAddress
-unique_ptr<ThriftConfig> config_
-shared_ptr<IOThreadPoolExecutor> ioExecutor_
-unique_ptr<ThriftServer> server_
-shared_ptr<PrestoThriftServiceHandler> handler_
}
class PrestoThriftServiceHandler {
+future_getTaskResults(...)
+future_acknowledgeTaskResults(...)
+future_abortTaskResults(...)
-shared_ptr<MemoryPool> pool_
-shared_ptr<VeloxPlanValidator> planValidator_
-shared_ptr<TaskManager> taskManager_
}
PrestoServer --> ThriftServer
ThriftServer --> ThriftConfig
ThriftServer --> PrestoThriftServiceHandler
PrestoThriftServiceHandler --> TaskManager
PrestoThriftServiceHandler --> VeloxPlanValidator
PrestoThriftServiceHandler --> MemoryPool
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey there - I've reviewed your changes - here's some feedback:
- Consider extracting the repeated thrift-server config property keys (e.g. "presto.thrift-server.port") into constexpr constants to avoid duplication and typos.
- Centralize the default thrift-server values (ports, timeouts, etc.) in your Config class rather than duplicating them in both production code and tests so they remain in sync.
- Double-check the change from unique_ptr to shared_ptr for planValidator and taskManager to ensure their lifecycles and thread-safety guarantees remain correct.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Consider extracting the repeated thrift-server config property keys (e.g. "presto.thrift-server.port") into constexpr constants to avoid duplication and typos.
- Centralize the default thrift-server values (ports, timeouts, etc.) in your Config class rather than duplicating them in both production code and tests so they remain in sync.
- Double-check the change from unique_ptr to shared_ptr for planValidator and taskManager to ensure their lifecycles and thread-safety guarantees remain correct.
## Individual Comments
### Comment 1
<location> `presto-native-execution/presto_cpp/main/thrift/server/ThriftServer.h:85-86` </location>
<code_context>
+ const folly::SocketAddress address_;
+ const std::string certPath_;
+ const std::string keyPath_;
+ const std::string ciphers_;
+ const std::string supportedCiphers_;
+
+ int taskExpireTimeMs_;
</code_context>
<issue_to_address>
**suggestion:** Both 'ciphers_' and 'supportedCiphers_' are present, but only 'supportedCiphers_' is used.
Consider removing 'ciphers_' if it is unused to improve code clarity.
Suggested implementation:
```c
const std::string supportedCiphers_;
```
If there are any constructors, initializers, or other code in this file (or related .cpp files) that reference or initialize `ciphers_`, those should also be removed or updated accordingly.
</issue_to_address>
### Comment 2
<location> `presto-native-execution/presto_cpp/main/thrift/server/ThriftServer.cpp:91-97` </location>
<code_context>
+ std::chrono::milliseconds(config_->getStreamExpireTimeMs()));
+
+ // Configure SSL if cert path is provided
+ if (!config_->getCertPath().empty() && !config_->getKeyPath().empty()) {
+ wangle::SSLContextConfig sslCfg;
+ sslCfg.isDefault = true;
</code_context>
<issue_to_address>
**suggestion:** SSL configuration does not handle errors from setCertificate.
Wrap sslCfg.setCertificate in a try-catch block or validate certificate and key files beforehand to prevent startup failures due to invalid files.
```suggestion
sslCfg.isDefault = true;
sslCfg.clientVerification =
folly::SSLContext::VerifyClientCertificate::DO_NOT_REQUEST;
try {
sslCfg.setCertificate(config_->getCertPath(), config_->getKeyPath(), "");
} catch (const std::exception& ex) {
LOG(ERROR) << "Failed to set SSL certificate or key: " << ex.what();
throw; // or handle error as appropriate for your application
}
sslCfg.sslCiphers = config_->getSupportedCiphers();
sslCfg.setNextProtocols({"rs"});
server_->setSSLConfig(std::make_shared<wangle::SSLContextConfig>(sslCfg));
```
</issue_to_address>
### Comment 3
<location> `presto-native-execution/presto_cpp/main/common/tests/ConfigTest.cpp:194` </location>
<code_context>
ASSERT_EQ(config.discoveryUri(), "my uri");
}
+TEST_F(ConfigTest, thriftServerConfigs) {
+ SystemConfig config;
+
</code_context>
<issue_to_address>
**suggestion (testing):** Missing negative and invalid value tests for thrift server configs.
Please add tests for invalid or out-of-range thrift server config values, such as negative ports, zero/negative timeouts, and non-boolean strings for boolean options, to verify proper error handling and default behavior.
</issue_to_address>
### Comment 4
<location> `presto-native-execution/presto_cpp/main/common/tests/ConfigTest.cpp:197-198` </location>
<code_context>
+TEST_F(ConfigTest, thriftServerConfigs) {
+ SystemConfig config;
+
+ // Test default values (when no thrift server configs are provided)
+ init(config, {});
+
</code_context>
<issue_to_address>
**suggestion (testing):** No assertion for default values of thrift server configs.
Please add assertions to verify that each thrift server config option has the correct default value when no config is provided.
```suggestion
// Test default values (when no thrift server configs are provided)
init(config, {});
// Assert default values for thrift server configs
ASSERT_EQ(
config.optionalProperty<bool>(std::string_view("presto.thrift-server.enabled")).value_or(false),
false);
ASSERT_EQ(
config.optionalProperty<int32_t>(std::string_view("presto.thrift-server.port")).value_or(7777),
7777);
ASSERT_EQ(
config.optionalProperty<int32_t>(std::string_view("presto.thrift-server.max-worker-threads")).value_or(32),
32);
ASSERT_EQ(
config.optionalProperty<int32_t>(std::string_view("presto.thrift-server.max-task-threads")).value_or(128),
128);
ASSERT_EQ(
config.optionalProperty<int32_t>(std::string_view("presto.thrift-server.max-buffer-size")).value_or(1048576),
1048576);
```
</issue_to_address>
### Comment 5
<location> `presto-native-execution/presto_cpp/main/common/tests/ConfigTest.cpp:200-204` </location>
<code_context>
+ // Test default values (when no thrift server configs are provided)
+ init(config, {});
+
+ // Test with thrift server enabled
+ init(config, {{"presto.thrift-server.enabled", "true"}});
+ ASSERT_EQ(
+ config.optionalProperty<bool>(std::string_view("presto.thrift-server.enabled")).value_or(false),
+ true);
+
</code_context>
<issue_to_address>
**suggestion (testing):** No test for thrift server explicitly disabled.
Add a test with 'presto.thrift-server.enabled' set to 'false' and verify the config reflects the disabled state.
```suggestion
// Test with thrift server enabled
init(config, {{"presto.thrift-server.enabled", "true"}});
ASSERT_EQ(
config.optionalProperty<bool>(std::string_view("presto.thrift-server.enabled")).value_or(false),
true);
// Test with thrift server explicitly disabled
init(config, {{"presto.thrift-server.enabled", "false"}});
ASSERT_EQ(
config.optionalProperty<bool>(std::string_view("presto.thrift-server.enabled")).value_or(true),
false);
```
</issue_to_address>
### Comment 6
<location> `presto-native-execution/presto_cpp/main/thrift/server/PrestoThriftServiceHandler.cpp:24` </location>
<code_context>
+namespace facebook::presto::thrift {
+
+folly::Future<std::unique_ptr<::facebook::presto::thrift::TaskResult>>
+PrestoThriftServiceHandler::future_getTaskResults(
+ std::unique_ptr<std::string> taskId,
+ int64_t bufferId,
</code_context>
<issue_to_address>
**issue (review_instructions):** Function 'future_getTaskResults' does not use camelCase as required for functions.
The function 'future_getTaskResults' uses snake_case instead of camelCase. Please rename it to 'futureGetTaskResults' to comply with the naming convention for functions.
<details>
<summary>Review instructions:</summary>
**Path patterns:** `presto-native-execution/**/*.hpp,presto-native-execution/**/*.cpp`
**Instructions:**
Use camelCase for functions, member and local variables, and non-type template parameters.
</details>
</issue_to_address>
### Comment 7
<location> `presto-native-execution/presto_cpp/main/thrift/server/PrestoThriftServiceHandler.cpp:66` </location>
<code_context>
+}
+
+folly::Future<folly::Unit>
+PrestoThriftServiceHandler::future_acknowledgeTaskResults(
+ std::unique_ptr<std::string> taskId,
+ int64_t bufferId,
</code_context>
<issue_to_address>
**issue (review_instructions):** Function 'future_acknowledgeTaskResults' does not use camelCase as required for functions.
The function 'future_acknowledgeTaskResults' uses snake_case instead of camelCase. Please rename it to 'futureAcknowledgeTaskResults' to comply with the naming convention for functions.
<details>
<summary>Review instructions:</summary>
**Path patterns:** `presto-native-execution/**/*.hpp,presto-native-execution/**/*.cpp`
**Instructions:**
Use camelCase for functions, member and local variables, and non-type template parameters.
</details>
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| const std::string ciphers_; | ||
| const std::string supportedCiphers_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion: Both 'ciphers_' and 'supportedCiphers_' are present, but only 'supportedCiphers_' is used.
Consider removing 'ciphers_' if it is unused to improve code clarity.
Suggested implementation:
const std::string supportedCiphers_;If there are any constructors, initializers, or other code in this file (or related .cpp files) that reference or initialize ciphers_, those should also be removed or updated accordingly.
| sslCfg.isDefault = true; | ||
| sslCfg.clientVerification = | ||
| folly::SSLContext::VerifyClientCertificate::DO_NOT_REQUEST; | ||
| sslCfg.setCertificate(config_->getCertPath(), config_->getKeyPath(), ""); | ||
| sslCfg.sslCiphers = config_->getSupportedCiphers(); | ||
| sslCfg.setNextProtocols({"rs"}); | ||
| server_->setSSLConfig(std::make_shared<wangle::SSLContextConfig>(sslCfg)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion: SSL configuration does not handle errors from setCertificate.
Wrap sslCfg.setCertificate in a try-catch block or validate certificate and key files beforehand to prevent startup failures due to invalid files.
| sslCfg.isDefault = true; | |
| sslCfg.clientVerification = | |
| folly::SSLContext::VerifyClientCertificate::DO_NOT_REQUEST; | |
| sslCfg.setCertificate(config_->getCertPath(), config_->getKeyPath(), ""); | |
| sslCfg.sslCiphers = config_->getSupportedCiphers(); | |
| sslCfg.setNextProtocols({"rs"}); | |
| server_->setSSLConfig(std::make_shared<wangle::SSLContextConfig>(sslCfg)); | |
| sslCfg.isDefault = true; | |
| sslCfg.clientVerification = | |
| folly::SSLContext::VerifyClientCertificate::DO_NOT_REQUEST; | |
| try { | |
| sslCfg.setCertificate(config_->getCertPath(), config_->getKeyPath(), ""); | |
| } catch (const std::exception& ex) { | |
| LOG(ERROR) << "Failed to set SSL certificate or key: " << ex.what(); | |
| throw; // or handle error as appropriate for your application | |
| } | |
| sslCfg.sslCiphers = config_->getSupportedCiphers(); | |
| sslCfg.setNextProtocols({"rs"}); | |
| server_->setSSLConfig(std::make_shared<wangle::SSLContextConfig>(sslCfg)); |
| // Test with thrift server enabled | ||
| init(config, {{"presto.thrift-server.enabled", "true"}}); | ||
| ASSERT_EQ( | ||
| config.optionalProperty<bool>(std::string_view("presto.thrift-server.enabled")).value_or(false), | ||
| true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion (testing): No test for thrift server explicitly disabled.
Add a test with 'presto.thrift-server.enabled' set to 'false' and verify the config reflects the disabled state.
| // Test with thrift server enabled | |
| init(config, {{"presto.thrift-server.enabled", "true"}}); | |
| ASSERT_EQ( | |
| config.optionalProperty<bool>(std::string_view("presto.thrift-server.enabled")).value_or(false), | |
| true); | |
| // Test with thrift server enabled | |
| init(config, {{"presto.thrift-server.enabled", "true"}}); | |
| ASSERT_EQ( | |
| config.optionalProperty<bool>(std::string_view("presto.thrift-server.enabled")).value_or(false), | |
| true); | |
| // Test with thrift server explicitly disabled | |
| init(config, {{"presto.thrift-server.enabled", "false"}}); | |
| ASSERT_EQ( | |
| config.optionalProperty<bool>(std::string_view("presto.thrift-server.enabled")).value_or(true), | |
| false); |
| namespace facebook::presto::thrift { | ||
|
|
||
| folly::Future<std::unique_ptr<::facebook::presto::thrift::TaskResult>> | ||
| PrestoThriftServiceHandler::future_getTaskResults( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue (review_instructions): Function 'future_getTaskResults' does not use camelCase as required for functions.
The function 'future_getTaskResults' uses snake_case instead of camelCase. Please rename it to 'futureGetTaskResults' to comply with the naming convention for functions.
Review instructions:
Path patterns: presto-native-execution/**/*.hpp,presto-native-execution/**/*.cpp
Instructions:
Use camelCase for functions, member and local variables, and non-type template parameters.
| } | ||
|
|
||
| folly::Future<folly::Unit> | ||
| PrestoThriftServiceHandler::future_acknowledgeTaskResults( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue (review_instructions): Function 'future_acknowledgeTaskResults' does not use camelCase as required for functions.
The function 'future_acknowledgeTaskResults' uses snake_case instead of camelCase. Please rename it to 'futureAcknowledgeTaskResults' to comply with the naming convention for functions.
Review instructions:
Path patterns: presto-native-execution/**/*.hpp,presto-native-execution/**/*.cpp
Instructions:
Use camelCase for functions, member and local variables, and non-type template parameters.
|
If |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kewang1024 : Do you have an RFC for these changes ? I remember you mentioning in the working group last time that you could share a writeup from your internal design doc for it. Would really help us understand and set expectations for this work.
Yes, I'm still working on publishing the design doc, this is a quick prototype code so that open source community can also try out, let me reflect it in the title as well What is RFC? is it something like this? prestodb/rfcs#29, and it has to be agreed upon and merged? |
@kewang1024 : Yes. RFCs are available at https://github.com/prestodb/rfcs |
Uh oh!
There was an error while loading. Please reload this page.