Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/ray/gcs/gcs_node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,13 @@ void GcsNodeManager::SetNodeDraining(
<< drain_request->DebugString();
iter->second = drain_request;
}

for (auto &listener : node_draining_listeners_) {
listener.Post("NodeManager.SetNodeDrainingCallback",
node_id,
true,
drain_request->deadline_timestamp_ms());
}
}

std::shared_ptr<const rpc::GcsNodeInfo> GcsNodeManager::RemoveNode(
Expand Down
17 changes: 17 additions & 0 deletions src/ray/gcs/gcs_node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,19 @@ class GcsNodeManager : public rpc::NodeInfoGcsServiceHandler {
node_added_listeners_.emplace_back(std::move(listener), io_context);
}

/// Register a callback that is notified when a node is set to draining.
/// The callback is invoked with
/// (node_id, is_draining, draining_deadline_timestamp_ms).
///
/// \param listener The callback that handles the draining state change. It must
/// be non-empty; an empty callback fails the RAY_CHECK below.
/// \param io_context The context the callback is posted to.
void AddNodeDrainingListener(
    std::function<void(const NodeID &, bool, int64_t)> listener,
    instrumented_io_context &io_context) {
  // Validating the callback does not touch guarded state, so do it first.
  RAY_CHECK(listener);
  absl::MutexLock guard(&mutex_);
  node_draining_listeners_.emplace_back(std::move(listener), io_context);
}

/// Initialize with the gcs tables data synchronously.
/// This should be called when GCS server restarts after a failure.
///
Expand Down Expand Up @@ -368,6 +381,10 @@ class GcsNodeManager : public rpc::NodeInfoGcsServiceHandler {
std::vector<Postable<void(std::shared_ptr<const rpc::GcsNodeInfo>)>>
node_removed_listeners_ ABSL_GUARDED_BY(mutex_);

/// Listeners that are notified when a node is set to draining. Each listener
/// receives (node_id, is_draining, draining_deadline_timestamp_ms) and is
/// registered through AddNodeDrainingListener().
std::vector<Postable<void(const NodeID &, bool, int64_t)>> node_draining_listeners_
ABSL_GUARDED_BY(mutex_);

/// A publisher for publishing gcs messages.
pubsub::GcsPublisher *gcs_publisher_;
/// Storage for GCS tables.
Expand Down
10 changes: 9 additions & 1 deletion src/ray/gcs/gcs_resource_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,15 @@ GcsResourceManager::GcsResourceManager(instrumented_io_context &io_context,
cluster_resource_manager_(cluster_resource_manager),
gcs_node_manager_(gcs_node_manager),
local_node_id_(std::move(local_node_id)),
cluster_lease_manager_(cluster_lease_manager) {}
cluster_lease_manager_(cluster_lease_manager) {
gcs_node_manager_.AddNodeDrainingListener(
[this](const NodeID &node_id, bool is_draining, int64_t deadline_timestamp_ms) {
scheduling::NodeID scheduling_node_id(node_id.Binary());
cluster_resource_manager_.SetNodeDraining(
scheduling_node_id, is_draining, deadline_timestamp_ms);
},
io_context_);
}

void GcsResourceManager::ConsumeSyncMessage(
std::shared_ptr<const rpc::syncer::RaySyncMessage> message) {
Expand Down
34 changes: 34 additions & 0 deletions src/ray/gcs/tests/gcs_resource_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -227,4 +227,38 @@ TEST_F(GcsResourceManagerTest, TestGetDrainingNodes) {
ASSERT_EQ(reply.draining_nodes(0).node_id(), node1->node_id());
}

// Checks that a drain request accepted by GcsNodeManager::SetNodeDraining() is
// reflected in ClusterResourceManager (draining flag and deadline) as soon as
// the handlers queued on io_service_ have been run.
TEST_F(GcsResourceManagerTest, DrainStateImmediatelyVisibleToScheduler) {
  constexpr int64_t kDrainDeadlineMs = 12345;

  // Make the node known to the resource manager with 10 CPUs.
  auto node_info = GenNodeInfo();
  node_info->mutable_resources_total()->insert({"CPU", 10});
  gcs_resource_manager_->OnNodeAdd(*node_info);

  auto drained_node_id = NodeID::FromBinary(node_info->node_id());
  auto sched_node_id = scheduling::NodeID(drained_node_id.Binary());

  // Report a resource view in which the node is not draining.
  UpdateFromResourceViewSync(drained_node_id,
                             {{"CPU", 10}},
                             {{"CPU", 10}},
                             /*idle_ms=*/0,
                             /*is_draining=*/false);

  ASSERT_FALSE(cluster_resource_manager_.IsNodeDraining(sched_node_id));

  // The node manager has to know the node before it can be marked as draining.
  gcs_node_manager_->AddNode(std::make_shared<rpc::GcsNodeInfo>(*node_info));

  auto request = std::make_shared<rpc::autoscaler::DrainNodeRequest>();
  request->set_node_id(drained_node_id.Binary());
  request->set_reason(rpc::autoscaler::DRAIN_NODE_REASON_IDLE_TERMINATION);
  request->set_reason_message("idle termination");
  request->set_deadline_timestamp_ms(kDrainDeadlineMs);

  gcs_node_manager_->SetNodeDraining(drained_node_id, request);
  // Run whatever SetNodeDraining() queued on the io service.
  io_service_.poll();

  ASSERT_TRUE(cluster_resource_manager_.IsNodeDraining(sched_node_id));
  const auto &resources = cluster_resource_manager_.GetNodeResources(sched_node_id);
  ASSERT_EQ(resources.draining_deadline_timestamp_ms,
            request->deadline_timestamp_ms());
}

} // namespace ray
14 changes: 14 additions & 0 deletions src/ray/raylet/scheduling/cluster_resource_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,20 @@ bool ClusterResourceManager::RemoveNode(scheduling::NodeID node_id) {
return nodes_.erase(node_id) != 0;
}

/// Overwrite the draining flag and draining deadline stored in the local view
/// of `node_id`.
///
/// \return true if the node is present in `nodes_` and was updated; false if
/// it is unknown, in which case nothing is modified.
bool ClusterResourceManager::SetNodeDraining(const scheduling::NodeID &node_id,
                                             bool is_draining,
                                             int64_t draining_deadline_timestamp_ms) {
  const auto entry = nodes_.find(node_id);
  const bool node_known = entry != nodes_.end();
  if (node_known) {
    auto *view = entry->second.GetMutableLocalView();
    view->is_draining = is_draining;
    view->draining_deadline_timestamp_ms = draining_deadline_timestamp_ms;
  }
  return node_known;
}

bool ClusterResourceManager::GetNodeResources(scheduling::NodeID node_id,
NodeResources *ret_resources) const {
auto it = nodes_.find(node_id);
Expand Down
12 changes: 12 additions & 0 deletions src/ray/raylet/scheduling/cluster_resource_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,18 @@ class ClusterResourceManager {
return node.GetLocalView().is_draining;
}

/// Set the draining state of a node.
/// This is called when the autoscaler commits to draining a node, ensuring
/// the scheduler immediately sees the node as unavailable for scheduling.
/// Both the draining flag and the deadline are written to the node's local
/// view; a node that this manager does not track is left untouched.
///
/// \param node_id The ID of the node.
/// \param is_draining Whether the node is draining.
/// \param draining_deadline_timestamp_ms The deadline for the drain operation
/// (a millisecond timestamp, going by the parameter name).
/// \return true if the node exists and was updated, false otherwise.
bool SetNodeDraining(const scheduling::NodeID &node_id,
bool is_draining,
int64_t draining_deadline_timestamp_ms);

/// @param max_num_nodes_to_include Max number of nodes to include in the debug string.
/// If not specified, all nodes will be included.
std::string DebugString(
Expand Down