Skip to content

Commit

Permalink
[core] Remove unused AsyncDrainNode (#51043)
Browse files Browse the repository at this point in the history
AsyncDrainNode is unused

---------

Signed-off-by: dayshah <[email protected]>
  • Loading branch information
dayshah authored Mar 9, 2025
1 parent e7268f2 commit 2f8684f
Show file tree
Hide file tree
Showing 5 changed files with 1 addition and 99 deletions.
4 changes: 0 additions & 4 deletions src/mock/ray/gcs/gcs_client/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,6 @@ class MockNodeInfoAccessor : public NodeInfoAccessor {
AsyncRegister,
(const rpc::GcsNodeInfo &node_info, const StatusCallback &callback),
(override));
MOCK_METHOD(Status,
AsyncDrainNode,
(const NodeID &node_id, const StatusCallback &callback),
(override));
MOCK_METHOD(Status,
AsyncCheckSelfAlive,
(const std::function<void(Status, bool)> &callback, int64_t timeout_ms),
Expand Down
17 changes: 0 additions & 17 deletions src/ray/gcs/gcs_client/accessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -595,23 +595,6 @@ Status NodeInfoAccessor::AsyncCheckAlive(const std::vector<std::string> &raylet_
return Status::OK();
}

Status NodeInfoAccessor::AsyncDrainNode(const NodeID &node_id,
const StatusCallback &callback) {
RAY_LOG(DEBUG).WithField(node_id) << "Draining node";
rpc::DrainNodeRequest request;
auto draining_request = request.add_drain_node_data();
draining_request->set_node_id(node_id.Binary());
client_impl_->GetGcsRpcClient().DrainNode(
request, [node_id, callback](const Status &status, rpc::DrainNodeReply &&reply) {
if (callback) {
callback(status);
}
RAY_LOG(DEBUG).WithField(node_id)
<< "Finished draining node, status = " << status;
});
return Status::OK();
}

Status NodeInfoAccessor::DrainNodes(const std::vector<NodeID> &node_ids,
int64_t timeout_ms,
std::vector<std::string> &drained_node_ids) {
Expand Down
10 changes: 0 additions & 10 deletions src/ray/gcs/gcs_client/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -366,16 +366,6 @@ class NodeInfoAccessor {
int64_t timeout_ms,
const MultiItemCallback<bool> &callback);

/// Drain (remove the information of the node from the cluster) the local node from GCS
/// asynchronously.
///
/// Check gcs_service.proto NodeInfoGcsService.DrainNode for the API spec.
///
/// \param node_id The ID of node that to be unregistered.
/// \param callback Callback that will be called when unregistration is complete.
/// \return Status
virtual Status AsyncDrainNode(const NodeID &node_id, const StatusCallback &callback);

/// Get information of all nodes from GCS asynchronously.
///
/// \param callback Callback that will be called after lookup finishes.
Expand Down
61 changes: 1 addition & 60 deletions src/ray/gcs/gcs_client/test/gcs_client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -356,13 +356,6 @@ class GcsClientTest : public ::testing::TestWithParam<bool> {
return nodes;
}

bool DrainNode(const NodeID &node_id) {
std::promise<bool> promise;
RAY_CHECK_OK(gcs_client_->Nodes().AsyncDrainNode(
node_id, [&promise](Status status) { promise.set_value(status.ok()); }));
return WaitReady(promise.get_future(), timeout_ms_);
}

std::vector<rpc::AvailableResources> GetAllAvailableResources() {
std::promise<bool> promise;
std::vector<rpc::AvailableResources> resources;
Expand Down Expand Up @@ -413,18 +406,6 @@ class GcsClientTest : public ::testing::TestWithParam<bool> {
ASSERT_TRUE(actor.state() == expected_state);
}

absl::flat_hash_set<NodeID> RegisterNodeAndMarkDead(int node_count) {
absl::flat_hash_set<NodeID> node_ids;
for (int index = 0; index < node_count; ++index) {
auto node_info = Mocker::GenNodeInfo();
auto node_id = NodeID::FromBinary(node_info->node_id());
EXPECT_TRUE(RegisterNode(*node_info));
EXPECT_TRUE(DrainNode(node_id));
node_ids.insert(node_id);
}
return node_ids;
}

// Test parameter, whether to use GCS without redis.
const bool no_redis_;

Expand Down Expand Up @@ -590,20 +571,8 @@ TEST_P(GcsClientTest, TestNodeInfo) {
std::vector<rpc::GcsNodeInfo> node_list = GetNodeInfoList();
EXPECT_EQ(node_list.size(), 2);
ASSERT_TRUE(gcs_client_->Nodes().Get(node1_id));
ASSERT_TRUE(gcs_client_->Nodes().Get(node2_id));
EXPECT_EQ(gcs_client_->Nodes().GetAll().size(), 2);

// Cancel registration of both nodes to GCS.
ASSERT_TRUE(DrainNode(node1_id));
ASSERT_TRUE(DrainNode(node2_id));
WaitForExpectedCount(unregister_count, 2);

// Get information of all nodes from GCS.
node_list = GetNodeInfoList();
EXPECT_EQ(node_list.size(), 2);
EXPECT_EQ(node_list[0].state(), rpc::GcsNodeInfo::DEAD);
EXPECT_EQ(node_list[1].state(), rpc::GcsNodeInfo::DEAD);
ASSERT_TRUE(gcs_client_->Nodes().IsRemoved(node1_id));
ASSERT_TRUE(gcs_client_->Nodes().IsRemoved(node2_id));
}

TEST_P(GcsClientTest, TestUnregisterNode) {
Expand Down Expand Up @@ -994,32 +963,6 @@ TEST_P(GcsClientTest, TestGcsAuth) {
EXPECT_TRUE(RegisterNode(*node_info));
}

TEST_P(GcsClientTest, TestEvictExpiredDeadNodes) {
RayConfig::instance().initialize(R"({"enable_cluster_auth": true})");
// Restart GCS.
RestartGcsServer();
if (RayConfig::instance().gcs_storage() == gcs::GcsServer::kInMemoryStorage) {
ReconnectClient();
}

// Simulate the scenario of node dead.
int node_count = RayConfig::instance().maximum_gcs_dead_node_cached_count();

const auto &node_ids = RegisterNodeAndMarkDead(node_count);

// Get all nodes.
auto condition = [this]() {
return GetNodeInfoList().size() ==
RayConfig::instance().maximum_gcs_dead_node_cached_count();
};
EXPECT_TRUE(WaitForCondition(condition, timeout_ms_.count()));

auto nodes = GetNodeInfoList();
for (const auto &node : nodes) {
EXPECT_TRUE(node_ids.contains(NodeID::FromBinary(node.node_id())));
}
}

TEST_P(GcsClientTest, TestRegisterHeadNode) {
// Test at most only one head node is alive in GCS server
auto head_node_info = Mocker::GenNodeInfo(1);
Expand Down Expand Up @@ -1081,8 +1024,6 @@ TEST_P(GcsClientTest, TestInternalKVDelByPrefix) {
ASSERT_EQ(value, "test_value3");
}

// TODO(sang): Add tests after adding asyncAdd

} // namespace ray

int main(int argc, char **argv) {
Expand Down
8 changes: 0 additions & 8 deletions src/ray/gcs/gcs_server/test/gcs_server_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -467,14 +467,6 @@ struct GcsServerMocker {
return Status::NotImplemented("");
}

Status AsyncDrainNode(const NodeID &node_id,
const gcs::StatusCallback &callback) override {
if (callback) {
callback(Status::OK());
}
return Status::OK();
}

Status AsyncGetAll(const gcs::MultiItemCallback<rpc::GcsNodeInfo> &callback,
int64_t timeout_ms,
std::optional<NodeID> node_id = std::nullopt) override {
Expand Down

0 comments on commit 2f8684f

Please sign in to comment.