diff --git a/src/ray/gcs/gcs_node_manager.cc b/src/ray/gcs/gcs_node_manager.cc
index 302f1d6dcfb5..fcf44aae34ee 100644
--- a/src/ray/gcs/gcs_node_manager.cc
+++ b/src/ray/gcs/gcs_node_manager.cc
@@ -604,6 +604,10 @@ void GcsNodeManager::SetNodeDraining(
         << drain_request->DebugString();
     iter->second = drain_request;
   }
+
+  for (auto &listener : node_draining_listeners_) {
+    listener(node_id, true, drain_request->deadline_timestamp_ms());
+  }
 }
 
 std::shared_ptr<rpc::GcsNodeInfo> GcsNodeManager::RemoveNode(
diff --git a/src/ray/gcs/gcs_node_manager.h b/src/ray/gcs/gcs_node_manager.h
index 7e7dfd5d1643..199910b4a617 100644
--- a/src/ray/gcs/gcs_node_manager.h
+++ b/src/ray/gcs/gcs_node_manager.h
@@ -195,6 +195,17 @@ class GcsNodeManager : public rpc::NodeInfoGcsServiceHandler {
     node_added_listeners_.emplace_back(std::move(listener), io_context);
   }
 
+  /// Add listener to monitor when a node is set to draining.
+  /// The listener receives (node_id, is_draining, draining_deadline_timestamp_ms).
+  ///
+  /// \param listener The handler which processes the draining state change.
+  void AddNodeDrainingListener(
+      std::function<void(const NodeID &, bool, int64_t)> listener) {
+    absl::MutexLock lock(&mutex_);
+    RAY_CHECK(listener);
+    node_draining_listeners_.emplace_back(std::move(listener));
+  }
+
   /// Initialize with the gcs tables data synchronously.
   /// This should be called when GCS server restarts after a failure.
   ///
@@ -368,6 +379,10 @@ class GcsNodeManager : public rpc::NodeInfoGcsServiceHandler {
   std::vector<Postable<void(std::shared_ptr<rpc::GcsNodeInfo>)>>
       node_removed_listeners_ ABSL_GUARDED_BY(mutex_);
 
+  /// Listeners which monitor when nodes are set to draining.
+  std::vector<std::function<void(const NodeID &, bool, int64_t)>> node_draining_listeners_
+      ABSL_GUARDED_BY(mutex_);
+
   /// A publisher for publishing gcs messages.
   pubsub::GcsPublisher *gcs_publisher_;
   /// Storage for GCS tables.
diff --git a/src/ray/gcs/gcs_resource_manager.cc b/src/ray/gcs/gcs_resource_manager.cc
index aa5f8decfc7c..47ea1a426530 100644
--- a/src/ray/gcs/gcs_resource_manager.cc
+++ b/src/ray/gcs/gcs_resource_manager.cc
@@ -295,6 +295,13 @@ void GcsResourceManager::OnNodeDead(const NodeID &node_id) {
   num_alive_nodes_--;
 }
 
+void GcsResourceManager::SetNodeDraining(const NodeID &node_id,
+                                         bool is_draining,
+                                         int64_t draining_deadline_timestamp_ms) {
+  cluster_resource_manager_.SetNodeDraining(
+      scheduling::NodeID(node_id.Binary()), is_draining, draining_deadline_timestamp_ms);
+}
+
 void GcsResourceManager::UpdatePlacementGroupLoad(
     const std::shared_ptr<rpc::PlacementGroupLoad> placement_group_load) {
   RAY_CHECK(placement_group_load != nullptr);
diff --git a/src/ray/gcs/gcs_resource_manager.h b/src/ray/gcs/gcs_resource_manager.h
index 7ff87766d04d..e7972a4d8988 100644
--- a/src/ray/gcs/gcs_resource_manager.h
+++ b/src/ray/gcs/gcs_resource_manager.h
@@ -104,6 +104,15 @@ class GcsResourceManager : public rpc::NodeResourceInfoGcsServiceHandler,
   /// \param node_id The specified node id.
   void OnNodeDead(const NodeID &node_id);
 
+  /// Set the draining state of a node.
+  ///
+  /// \param node_id The ID of the node.
+  /// \param is_draining Whether the node is draining.
+  /// \param draining_deadline_timestamp_ms The deadline for the drain operation.
+  void SetNodeDraining(const NodeID &node_id,
+                       bool is_draining,
+                       int64_t draining_deadline_timestamp_ms);
+
   /// Initialize with the gcs tables data synchronously.
   /// This should be called when GCS server restarts after a failure.
   ///
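The hunks above add the listener hook itself. As a rough usage sketch (the surrounding caller and the logging are hypothetical, not part of this diff), a component holding a GcsNodeManager reference would register a callback like this; per the gcs_node_manager.cc hunk, the callback runs synchronously inside GcsNodeManager::SetNodeDraining():

    // Hypothetical registration site, e.g. during GCS startup.
    gcs_node_manager_->AddNodeDrainingListener(
        [](const NodeID &node_id, bool is_draining, int64_t deadline_timestamp_ms) {
          // Invoked when the GCS commits to draining this node.
          RAY_LOG(INFO) << "Node " << node_id << " draining=" << is_draining
                        << ", deadline_ms=" << deadline_timestamp_ms;
        });

The production wiring added by this change (forwarding into GcsResourceManager::SetNodeDraining) is in the gcs_server.cc hunk below.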
diff --git a/src/ray/gcs/gcs_server.cc b/src/ray/gcs/gcs_server.cc
index 3326e656c935..3378c81505ad 100644
--- a/src/ray/gcs/gcs_server.cc
+++ b/src/ray/gcs/gcs_server.cc
@@ -886,6 +886,11 @@ void GcsServer::InstallEventListeners() {
         gcs_autoscaler_state_manager_->OnNodeDead(node_id);
       },
       io_context_provider_.GetDefaultIOContext());
+  gcs_node_manager_->AddNodeDrainingListener(
+      [this](const NodeID &node_id, bool is_draining, int64_t deadline_timestamp_ms) {
+        gcs_resource_manager_->SetNodeDraining(
+            node_id, is_draining, deadline_timestamp_ms);
+      });
 
   // Install worker event listener.
   gcs_worker_manager_->AddWorkerDeadListener(
diff --git a/src/ray/gcs/tests/gcs_resource_manager_test.cc b/src/ray/gcs/tests/gcs_resource_manager_test.cc
index f67530006b42..dd85d72335d1 100644
--- a/src/ray/gcs/tests/gcs_resource_manager_test.cc
+++ b/src/ray/gcs/tests/gcs_resource_manager_test.cc
@@ -227,4 +227,43 @@ TEST_F(GcsResourceManagerTest, TestGetDrainingNodes) {
   ASSERT_EQ(reply.draining_nodes(0).node_id(), node1->node_id());
 }
 
+// Verify SetNodeDraining() immediately updates ClusterResourceManager.
+TEST_F(GcsResourceManagerTest, DrainStateImmediatelyVisibleToScheduler) {
+  auto node = GenNodeInfo();
+  node->mutable_resources_total()->insert({"CPU", 10});
+  gcs_resource_manager_->OnNodeAdd(*node);
+
+  auto node_id = NodeID::FromBinary(node->node_id());
+  auto scheduling_node_id = scheduling::NodeID(node_id.Binary());
+
+  UpdateFromResourceViewSync(node_id,
+                             {{"CPU", 10}},
+                             {{"CPU", 10}},
+                             /*idle_ms=*/0,
+                             /*is_draining=*/false);
+
+  ASSERT_FALSE(cluster_resource_manager_.IsNodeDraining(scheduling_node_id));
+
+  gcs_node_manager_->AddNodeDrainingListener(
+      [this](const NodeID &node_id, bool is_draining, int64_t deadline_timestamp_ms) {
+        cluster_resource_manager_.SetNodeDraining(
+            scheduling::NodeID(node_id.Binary()), is_draining, deadline_timestamp_ms);
+      });
+
+  auto drain_request = std::make_shared<rpc::autoscaler::DrainNodeRequest>();
+  drain_request->set_node_id(node_id.Binary());
+  drain_request->set_reason(rpc::autoscaler::DRAIN_NODE_REASON_IDLE_TERMINATION);
+  drain_request->set_reason_message("idle termination");
+
+  gcs_node_manager_->AddNode(std::make_shared<rpc::GcsNodeInfo>(*node));
+  drain_request->set_deadline_timestamp_ms(12345);
+  gcs_node_manager_->SetNodeDraining(node_id, drain_request);
+
+  ASSERT_TRUE(cluster_resource_manager_.IsNodeDraining(scheduling_node_id));
+  const auto &node_resources =
+      cluster_resource_manager_.GetNodeResources(scheduling_node_id);
+  ASSERT_EQ(node_resources.draining_deadline_timestamp_ms,
+            drain_request->deadline_timestamp_ms());
+}
+
 }  // namespace ray
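The next hunk guards ClusterResourceManager::UpdateNode() so that a resource-view sync message from the raylet cannot clear a drain flag the GCS has already committed to. A minimal sketch of how that could additionally be asserted, reusing the variables and the UpdateFromResourceViewSync helper from the test above (this extra check is an assumption for illustration and is not part of this diff):

    // A stale sync that still reports is_draining=false must not undo the
    // drain decision made via GcsNodeManager::SetNodeDraining() above.
    UpdateFromResourceViewSync(node_id,
                               {{"CPU", 10}},
                               {{"CPU", 10}},
                               /*idle_ms=*/0,
                               /*is_draining=*/false);
    ASSERT_TRUE(cluster_resource_manager_.IsNodeDraining(scheduling_node_id));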
diff --git a/src/ray/raylet/scheduling/cluster_resource_manager.cc b/src/ray/raylet/scheduling/cluster_resource_manager.cc
index 06bfafb994c6..3a03c5d8c961 100644
--- a/src/ray/raylet/scheduling/cluster_resource_manager.cc
+++ b/src/ray/raylet/scheduling/cluster_resource_manager.cc
@@ -101,9 +101,11 @@ bool ClusterResourceManager::UpdateNode(
   // Last update time to the local node resources view.
   local_view.last_resource_update_time = absl::Now();
 
-  local_view.is_draining = resource_view_sync_message.is_draining();
-  local_view.draining_deadline_timestamp_ms =
-      resource_view_sync_message.draining_deadline_timestamp_ms();
+  if (!local_view.is_draining) {
+    local_view.is_draining = resource_view_sync_message.is_draining();
+    local_view.draining_deadline_timestamp_ms =
+        resource_view_sync_message.draining_deadline_timestamp_ms();
+  }
 
   AddOrUpdateNode(node_id, local_view);
   received_node_resources_[node_id] = std::move(local_view);
@@ -115,6 +117,26 @@ bool ClusterResourceManager::RemoveNode(scheduling::NodeID node_id) {
   return nodes_.erase(node_id) != 0;
 }
 
+bool ClusterResourceManager::SetNodeDraining(const scheduling::NodeID &node_id,
+                                             bool is_draining,
+                                             int64_t draining_deadline_timestamp_ms) {
+  auto it = nodes_.find(node_id);
+  if (it == nodes_.end()) {
+    return false;
+  }
+
+  auto *local_view = it->second.GetMutableLocalView();
+  local_view->is_draining = is_draining;
+  local_view->draining_deadline_timestamp_ms = draining_deadline_timestamp_ms;
+  auto rnr_it = received_node_resources_.find(node_id);
+  if (rnr_it != received_node_resources_.end()) {
+    rnr_it->second.is_draining = is_draining;
+    rnr_it->second.draining_deadline_timestamp_ms = draining_deadline_timestamp_ms;
+  }
+
+  return true;
+}
+
 bool ClusterResourceManager::GetNodeResources(scheduling::NodeID node_id,
                                               NodeResources *ret_resources) const {
   auto it = nodes_.find(node_id);
diff --git a/src/ray/raylet/scheduling/cluster_resource_manager.h b/src/ray/raylet/scheduling/cluster_resource_manager.h
index 533c9194ef13..ab9e28c88ce5 100644
--- a/src/ray/raylet/scheduling/cluster_resource_manager.h
+++ b/src/ray/raylet/scheduling/cluster_resource_manager.h
@@ -132,6 +132,18 @@ class ClusterResourceManager {
     return node.GetLocalView().is_draining;
   }
 
+  /// Set the draining state of a node.
+  /// This is called when the autoscaler commits to draining a node, ensuring
+  /// the scheduler immediately sees the node as unavailable for scheduling.
+  ///
+  /// \param node_id The ID of the node.
+  /// \param is_draining Whether the node is draining.
+  /// \param draining_deadline_timestamp_ms The deadline for the drain operation.
+  /// \return true if the node exists and was updated, false otherwise.
+  bool SetNodeDraining(const scheduling::NodeID &node_id,
+                       bool is_draining,
+                       int64_t draining_deadline_timestamp_ms);
+
   /// @param max_num_nodes_to_include Max number of nodes to include in the debug string.
   /// If not specified, all nodes will be included.
   std::string DebugString(
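For reference, a minimal direct-use sketch of the new ClusterResourceManager::SetNodeDraining() API (the variable names and the 30-second deadline are assumptions for illustration; in this change the real caller is GcsResourceManager::SetNodeDraining, wired up through the GcsServer listener above):

    // Mark a known node as draining with a deadline 30s from now, then confirm
    // the scheduler-visible view reflects it immediately.
    const auto sched_node_id = scheduling::NodeID(raylet_node_id.Binary());
    const int64_t deadline_ms = absl::ToUnixMillis(absl::Now() + absl::Seconds(30));
    if (cluster_resource_manager.SetNodeDraining(sched_node_id,
                                                 /*is_draining=*/true,
                                                 deadline_ms)) {
      RAY_CHECK(cluster_resource_manager.IsNodeDraining(sched_node_id));
    } else {
      // The node is not (or no longer) tracked by the resource manager.
    }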