4 changes: 4 additions & 0 deletions src/ray/gcs/gcs_node_manager.cc
@@ -604,6 +604,10 @@ void GcsNodeManager::SetNodeDraining(
<< drain_request->DebugString();
iter->second = drain_request;
}

for (auto &listener : node_draining_listeners_) {
listener(node_id, true, drain_request->deadline_timestamp_ms());
}
}

std::shared_ptr<const rpc::GcsNodeInfo> GcsNodeManager::RemoveNode(
17 changes: 17 additions & 0 deletions src/ray/gcs/gcs_node_manager.h
@@ -195,6 +195,17 @@ class GcsNodeManager : public rpc::NodeInfoGcsServiceHandler {
node_added_listeners_.emplace_back(std::move(listener), io_context);
}

/// Add listener to monitor when a node is set to draining.
/// The listener receives (node_id, is_draining, draining_deadline_timestamp_ms).
///
/// \param listener The handler which processes the draining state change.
void AddNodeDrainingListener(
std::function<void(const NodeID &, bool, int64_t)> listener) {
absl::MutexLock lock(&mutex_);
RAY_CHECK(listener);
node_draining_listeners_.emplace_back(std::move(listener));
}

/// Initialize with the gcs tables data synchronously.
/// This should be called when GCS server restarts after a failure.
///
@@ -368,6 +379,12 @@ class GcsNodeManager : public rpc::NodeInfoGcsServiceHandler {
std::vector<Postable<void(std::shared_ptr<const rpc::GcsNodeInfo>)>>
node_removed_listeners_ ABSL_GUARDED_BY(mutex_);

/// Listeners which monitor when nodes are set to draining.
/// Uses std::function (not Postable) for synchronous invocation, so the
/// scheduler sees the draining state before HandleDrainNode returns.
std::vector<std::function<void(const NodeID &, bool, int64_t)>> node_draining_listeners_
ABSL_GUARDED_BY(mutex_);

/// A publisher for publishing gcs messages.
pubsub::GcsPublisher *gcs_publisher_;
/// Storage for GCS tables.
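For illustration, a minimal standalone sketch of the design note above (simplified names such as FakeSchedulerView are assumed here, not the actual Ray classes): because the draining listeners are plain std::function objects invoked inline, the caller observes the updated state before SetNodeDraining() returns, which a callback posted to an io_context would not guarantee.

#include <cstdint>
#include <functional>
#include <vector>

struct FakeSchedulerView {
  bool is_draining = false;
  int64_t draining_deadline_timestamp_ms = 0;
};

int main() {
  FakeSchedulerView view;
  std::vector<std::function<void(bool, int64_t)>> node_draining_listeners;
  node_draining_listeners.emplace_back([&view](bool is_draining, int64_t deadline_ms) {
    // Runs synchronously on the caller's thread.
    view.is_draining = is_draining;
    view.draining_deadline_timestamp_ms = deadline_ms;
  });

  // Equivalent of GcsNodeManager::SetNodeDraining notifying its listeners:
  // the update is visible as soon as the loop finishes, i.e. before the
  // surrounding RPC handler could reply. A posted callback would only run
  // once the event loop picked it up.
  for (auto &listener : node_draining_listeners) {
    listener(true, /*deadline_timestamp_ms=*/12345);
  }
  return view.is_draining ? 0 : 1;
}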
7 changes: 7 additions & 0 deletions src/ray/gcs/gcs_resource_manager.cc
@@ -295,6 +295,13 @@ void GcsResourceManager::OnNodeDead(const NodeID &node_id) {
num_alive_nodes_--;
}

void GcsResourceManager::SetNodeDraining(const NodeID &node_id,
bool is_draining,
int64_t draining_deadline_timestamp_ms) {
cluster_resource_manager_.SetNodeDraining(
scheduling::NodeID(node_id.Binary()), is_draining, draining_deadline_timestamp_ms);
}

void GcsResourceManager::UpdatePlacementGroupLoad(
const std::shared_ptr<rpc::PlacementGroupLoad> placement_group_load) {
RAY_CHECK(placement_group_load != nullptr);
9 changes: 9 additions & 0 deletions src/ray/gcs/gcs_resource_manager.h
@@ -104,6 +104,15 @@ class GcsResourceManager : public rpc::NodeResourceInfoGcsServiceHandler,
/// \param node_id The specified node id.
void OnNodeDead(const NodeID &node_id);

/// Set the draining state of a node.
///
/// \param node_id The ID of the node.
/// \param is_draining Whether the node is draining.
/// \param draining_deadline_timestamp_ms The deadline for the drain operation.
void SetNodeDraining(const NodeID &node_id,
bool is_draining,
int64_t draining_deadline_timestamp_ms);

/// Initialize with the gcs tables data synchronously.
/// This should be called when GCS server restarts after a failure.
///
5 changes: 5 additions & 0 deletions src/ray/gcs/gcs_server.cc
@@ -886,6 +886,11 @@ void GcsServer::InstallEventListeners() {
gcs_autoscaler_state_manager_->OnNodeDead(node_id);
},
io_context_provider_.GetDefaultIOContext());
gcs_node_manager_->AddNodeDrainingListener(
[this](const NodeID &node_id, bool is_draining, int64_t deadline_timestamp_ms) {
gcs_resource_manager_->SetNodeDraining(
node_id, is_draining, deadline_timestamp_ms);
});

// Install worker event listener.
gcs_worker_manager_->AddWorkerDeadListener(
39 changes: 39 additions & 0 deletions src/ray/gcs/tests/gcs_resource_manager_test.cc
@@ -227,4 +227,43 @@ TEST_F(GcsResourceManagerTest, TestGetDrainingNodes) {
ASSERT_EQ(reply.draining_nodes(0).node_id(), node1->node_id());
}

// Verify SetNodeDraining() immediately updates ClusterResourceManager.
TEST_F(GcsResourceManagerTest, DrainStateImmediatelyVisibleToScheduler) {
auto node = GenNodeInfo();
node->mutable_resources_total()->insert({"CPU", 10});
gcs_resource_manager_->OnNodeAdd(*node);

auto node_id = NodeID::FromBinary(node->node_id());
auto scheduling_node_id = scheduling::NodeID(node_id.Binary());

UpdateFromResourceViewSync(node_id,
{{"CPU", 10}},
{{"CPU", 10}},
/*idle_ms=*/0,
/*is_draining=*/false);

ASSERT_FALSE(cluster_resource_manager_.IsNodeDraining(scheduling_node_id));

gcs_node_manager_->AddNodeDrainingListener(
[this](const NodeID &node_id, bool is_draining, int64_t deadline_timestamp_ms) {
cluster_resource_manager_.SetNodeDraining(
scheduling::NodeID(node_id.Binary()), is_draining, deadline_timestamp_ms);
});

auto drain_request = std::make_shared<rpc::autoscaler::DrainNodeRequest>();
drain_request->set_node_id(node_id.Binary());
drain_request->set_reason(rpc::autoscaler::DRAIN_NODE_REASON_IDLE_TERMINATION);
drain_request->set_reason_message("idle termination");

gcs_node_manager_->AddNode(std::make_shared<rpc::GcsNodeInfo>(*node));
drain_request->set_deadline_timestamp_ms(12345);
gcs_node_manager_->SetNodeDraining(node_id, drain_request);

ASSERT_TRUE(cluster_resource_manager_.IsNodeDraining(scheduling_node_id));
const auto &node_resources =
cluster_resource_manager_.GetNodeResources(scheduling_node_id);
ASSERT_EQ(node_resources.draining_deadline_timestamp_ms,
drain_request->deadline_timestamp_ms());
}

} // namespace ray
28 changes: 25 additions & 3 deletions src/ray/raylet/scheduling/cluster_resource_manager.cc
@@ -101,9 +101,11 @@ bool ClusterResourceManager::UpdateNode(
// Last update time to the local node resources view.
local_view.last_resource_update_time = absl::Now();

local_view.is_draining = resource_view_sync_message.is_draining();
local_view.draining_deadline_timestamp_ms =
resource_view_sync_message.draining_deadline_timestamp_ms();
if (!local_view.is_draining) {
local_view.is_draining = resource_view_sync_message.is_draining();
local_view.draining_deadline_timestamp_ms =
resource_view_sync_message.draining_deadline_timestamp_ms();
Contributor:
Maybe a tangent, but why does this class care about the draining_deadline anyway? I'm wondering if we're bloating the protocol somehow with this...

Contributor (Author):
The scheduler only uses is_draining for scheduling decisions; the deadline is stored but only used for reporting in GcsResourceManager::HandleGetDrainingNodes(). We could potentially separate the scheduling-relevant field (is_draining) from the reporting-only field (the deadline), but that would require a larger refactoring of NodeResources.

}

AddOrUpdateNode(node_id, local_view);
received_node_resources_[node_id] = std::move(local_view);
@@ -115,6 +117,26 @@ bool ClusterResourceManager::RemoveNode(scheduling::NodeID node_id) {
return nodes_.erase(node_id) != 0;
}

bool ClusterResourceManager::SetNodeDraining(const scheduling::NodeID &node_id,
bool is_draining,
int64_t draining_deadline_timestamp_ms) {
auto it = nodes_.find(node_id);
if (it == nodes_.end()) {
return false;
}

auto *local_view = it->second.GetMutableLocalView();
local_view->is_draining = is_draining;
local_view->draining_deadline_timestamp_ms = draining_deadline_timestamp_ms;
auto rnr_it = received_node_resources_.find(node_id);
if (rnr_it != received_node_resources_.end()) {
rnr_it->second.is_draining = is_draining;
rnr_it->second.draining_deadline_timestamp_ms = draining_deadline_timestamp_ms;
}

return true;
}

bool ClusterResourceManager::GetNodeResources(scheduling::NodeID node_id,
NodeResources *ret_resources) const {
auto it = nodes_.find(node_id);
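Two notes on the UpdateNode() change and the review thread above. The new guard, if (!local_view.is_draining), presumably prevents a raylet resource-view sync that has not yet observed the drain from clearing a draining flag just set locally via SetNodeDraining(). On the reviewer's question, here is a minimal sketch of the split the author describes (simplified, assumed names such as FakeNodeView; not the actual Ray scheduling code): feasibility checks read only is_draining, while the deadline is surfaced only by reporting queries such as GcsResourceManager::HandleGetDrainingNodes().

#include <cstdint>
#include <unordered_map>

struct FakeNodeView {
  bool is_draining = false;
  int64_t draining_deadline_timestamp_ms = 0;
};

// Scheduler-style feasibility check: only the boolean matters here.
bool IsSchedulableOn(const FakeNodeView &node) { return !node.is_draining; }

// Reporting-style query: the only place the deadline is actually read.
std::unordered_map<int64_t, int64_t> GetDrainingDeadlines(
    const std::unordered_map<int64_t, FakeNodeView> &nodes) {
  std::unordered_map<int64_t, int64_t> deadlines;
  for (const auto &[node_id, view] : nodes) {
    if (view.is_draining) {
      deadlines.emplace(node_id, view.draining_deadline_timestamp_ms);
    }
  }
  return deadlines;
}

int main() {
  std::unordered_map<int64_t, FakeNodeView> nodes;
  nodes[1] = FakeNodeView{/*is_draining=*/true, /*deadline_ms=*/12345};
  nodes[2] = FakeNodeView{};
  bool ok = !IsSchedulableOn(nodes[1]) && IsSchedulableOn(nodes[2]) &&
            GetDrainingDeadlines(nodes).at(1) == 12345;
  return ok ? 0 : 1;
}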
12 changes: 12 additions & 0 deletions src/ray/raylet/scheduling/cluster_resource_manager.h
@@ -132,6 +132,18 @@ class ClusterResourceManager {
return node.GetLocalView().is_draining;
}

/// Set the draining state of a node.
/// This is called when the autoscaler commits to draining a node, ensuring
/// the scheduler immediately sees the node as unavailable for scheduling.
///
/// \param node_id The ID of the node.
/// \param is_draining Whether the node is draining.
/// \param draining_deadline_timestamp_ms The deadline for the drain operation.
/// \return true if the node exists and was updated, false otherwise.
bool SetNodeDraining(const scheduling::NodeID &node_id,
bool is_draining,
int64_t draining_deadline_timestamp_ms);

/// @param max_num_nodes_to_include Max number of nodes to include in the debug string.
/// If not specified, all nodes will be included.
std::string DebugString(