4 changes: 4 additions & 0 deletions src/ray/gcs/gcs_node_manager.cc
@@ -604,6 +604,10 @@ void GcsNodeManager::SetNodeDraining(
<< drain_request->DebugString();
iter->second = drain_request;
}

for (auto &listener : node_draining_listeners_) {
listener(node_id, true, drain_request->deadline_timestamp_ms());
}
}

std::shared_ptr<const rpc::GcsNodeInfo> GcsNodeManager::RemoveNode(
15 changes: 15 additions & 0 deletions src/ray/gcs/gcs_node_manager.h
@@ -195,6 +195,17 @@ class GcsNodeManager : public rpc::NodeInfoGcsServiceHandler {
node_added_listeners_.emplace_back(std::move(listener), io_context);
}

/// Add a listener to monitor when a node is set to draining.
/// The listener receives (node_id, is_draining, draining_deadline_timestamp_ms).
///
/// \param listener The handler which processes the draining state change.
void AddNodeDrainingListener(
std::function<void(const NodeID &, bool, int64_t)> listener) {
absl::MutexLock lock(&mutex_);
RAY_CHECK(listener);
node_draining_listeners_.emplace_back(std::move(listener));
}

/// Initialize with the gcs tables data synchronously.
/// This should be called when GCS server restarts after a failure.
///
@@ -368,6 +379,10 @@ class GcsNodeManager : public rpc::NodeInfoGcsServiceHandler {
std::vector<Postable<void(std::shared_ptr<const rpc::GcsNodeInfo>)>>
node_removed_listeners_ ABSL_GUARDED_BY(mutex_);

/// Listeners which monitor when nodes are set to draining.
std::vector<std::function<void(const NodeID &, bool, int64_t)>> node_draining_listeners_
ABSL_GUARDED_BY(mutex_);

/// A publisher for publishing gcs messages.
pubsub::GcsPublisher *gcs_publisher_;
/// Storage for GCS tables.
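Note that, unlike node_added_listeners_ above (Postable callbacks dispatched onto an io_context), the draining listeners are plain std::function callbacks invoked inline from GcsNodeManager::SetNodeDraining(). A minimal registration sketch, assuming a GcsNodeManager reference named node_manager; the logging body is illustrative only, and the actual wiring added by this PR is in GcsServer::InstallEventListeners() below:

// Hypothetical usage sketch, not part of this PR: register a draining
// listener. The callback runs synchronously inside SetNodeDraining(), so it
// should stay short and non-blocking.
node_manager.AddNodeDrainingListener(
    [](const NodeID &node_id, bool is_draining, int64_t deadline_timestamp_ms) {
      RAY_LOG(INFO) << "Node " << node_id << " draining=" << is_draining
                    << " deadline_ms=" << deadline_timestamp_ms;
    });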
7 changes: 7 additions & 0 deletions src/ray/gcs/gcs_resource_manager.cc
@@ -295,6 +295,13 @@ void GcsResourceManager::OnNodeDead(const NodeID &node_id) {
num_alive_nodes_--;
}

void GcsResourceManager::SetNodeDraining(const NodeID &node_id,
bool is_draining,
int64_t draining_deadline_timestamp_ms) {
cluster_resource_manager_.SetNodeDraining(
scheduling::NodeID(node_id.Binary()), is_draining, draining_deadline_timestamp_ms);
}

void GcsResourceManager::UpdatePlacementGroupLoad(
const std::shared_ptr<rpc::PlacementGroupLoad> placement_group_load) {
RAY_CHECK(placement_group_load != nullptr);
9 changes: 9 additions & 0 deletions src/ray/gcs/gcs_resource_manager.h
@@ -104,6 +104,15 @@ class GcsResourceManager : public rpc::NodeResourceInfoGcsServiceHandler,
/// \param node_id The specified node id.
void OnNodeDead(const NodeID &node_id);

/// Set the draining state of a node.
///
/// \param node_id The ID of the node.
/// \param is_draining Whether the node is draining.
/// \param draining_deadline_timestamp_ms The deadline of the drain operation, in milliseconds.
void SetNodeDraining(const NodeID &node_id,
bool is_draining,
int64_t draining_deadline_timestamp_ms);

/// Initialize with the gcs tables data synchronously.
/// This should be called when GCS server restarts after a failure.
///
5 changes: 5 additions & 0 deletions src/ray/gcs/gcs_server.cc
@@ -886,6 +886,11 @@ void GcsServer::InstallEventListeners() {
gcs_autoscaler_state_manager_->OnNodeDead(node_id);
},
io_context_provider_.GetDefaultIOContext());
gcs_node_manager_->AddNodeDrainingListener(
[this](const NodeID &node_id, bool is_draining, int64_t deadline_timestamp_ms) {
gcs_resource_manager_->SetNodeDraining(
node_id, is_draining, deadline_timestamp_ms);
});

// Install worker event listener.
gcs_worker_manager_->AddWorkerDeadListener(
39 changes: 39 additions & 0 deletions src/ray/gcs/tests/gcs_resource_manager_test.cc
@@ -227,4 +227,43 @@ TEST_F(GcsResourceManagerTest, TestGetDrainingNodes) {
ASSERT_EQ(reply.draining_nodes(0).node_id(), node1->node_id());
}

// Verify SetNodeDraining() immediately updates ClusterResourceManager.
TEST_F(GcsResourceManagerTest, DrainStateImmediatelyVisibleToScheduler) {
auto node = GenNodeInfo();
node->mutable_resources_total()->insert({"CPU", 10});
gcs_resource_manager_->OnNodeAdd(*node);

auto node_id = NodeID::FromBinary(node->node_id());
auto scheduling_node_id = scheduling::NodeID(node_id.Binary());

UpdateFromResourceViewSync(node_id,
{{"CPU", 10}},
{{"CPU", 10}},
/*idle_ms=*/0,
/*is_draining=*/false);

ASSERT_FALSE(cluster_resource_manager_.IsNodeDraining(scheduling_node_id));

gcs_node_manager_->AddNodeDrainingListener(
[this](const NodeID &node_id, bool is_draining, int64_t deadline_timestamp_ms) {
cluster_resource_manager_.SetNodeDraining(
scheduling::NodeID(node_id.Binary()), is_draining, deadline_timestamp_ms);
});

auto drain_request = std::make_shared<rpc::autoscaler::DrainNodeRequest>();
drain_request->set_node_id(node_id.Binary());
drain_request->set_reason(rpc::autoscaler::DRAIN_NODE_REASON_IDLE_TERMINATION);
drain_request->set_reason_message("idle termination");

gcs_node_manager_->AddNode(std::make_shared<rpc::GcsNodeInfo>(*node));
drain_request->set_deadline_timestamp_ms(12345);
gcs_node_manager_->SetNodeDraining(node_id, drain_request);

ASSERT_TRUE(cluster_resource_manager_.IsNodeDraining(scheduling_node_id));
const auto &node_resources =
cluster_resource_manager_.GetNodeResources(scheduling_node_id);
ASSERT_EQ(node_resources.draining_deadline_timestamp_ms,
drain_request->deadline_timestamp_ms());
}

} // namespace ray
28 changes: 25 additions & 3 deletions src/ray/raylet/scheduling/cluster_resource_manager.cc
@@ -101,9 +101,11 @@ bool ClusterResourceManager::UpdateNode(
// Last update time to the local node resources view.
local_view.last_resource_update_time = absl::Now();

local_view.is_draining = resource_view_sync_message.is_draining();
local_view.draining_deadline_timestamp_ms =
resource_view_sync_message.draining_deadline_timestamp_ms();
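// Only adopt the draining fields from the resource view broadcast when the
// node is not already marked as draining (e.g. via SetNodeDraining()); an
// existing draining state is not cleared here.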
if (!local_view.is_draining) {
local_view.is_draining = resource_view_sync_message.is_draining();
local_view.draining_deadline_timestamp_ms =
resource_view_sync_message.draining_deadline_timestamp_ms();
Contributor:
Maybe a tangent, but why does this class care about the draining_deadline anyway? I'm wondering if we're bloating the protocol somehow with this...

Contributor Author:
The scheduler only uses is_draining for scheduling decisions; the deadline is stored but only used for reporting in GcsResourceManager::HandleGetDrainingNodes(). We could potentially separate the scheduling-relevant fields (is_draining) from the reporting-only fields (the deadline), but that would require a larger refactoring of NodeResources. (A reporting-style read of the stored deadline is sketched after this file's diff.)

}

AddOrUpdateNode(node_id, local_view);
received_node_resources_[node_id] = std::move(local_view);
@@ -115,6 +117,26 @@ bool ClusterResourceManager::RemoveNode(scheduling::NodeID node_id) {
return nodes_.erase(node_id) != 0;
}

bool ClusterResourceManager::SetNodeDraining(const scheduling::NodeID &node_id,
bool is_draining,
int64_t draining_deadline_timestamp_ms) {
auto it = nodes_.find(node_id);
if (it == nodes_.end()) {
return false;
}

auto *local_view = it->second.GetMutableLocalView();
local_view->is_draining = is_draining;
local_view->draining_deadline_timestamp_ms = draining_deadline_timestamp_ms;
auto rnr_it = received_node_resources_.find(node_id);
if (rnr_it != received_node_resources_.end()) {
rnr_it->second.is_draining = is_draining;
rnr_it->second.draining_deadline_timestamp_ms = draining_deadline_timestamp_ms;
}

return true;
}

bool ClusterResourceManager::GetNodeResources(scheduling::NodeID node_id,
NodeResources *ret_resources) const {
auto it = nodes_.find(node_id);
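As the review thread above notes, only is_draining feeds scheduling decisions, while the deadline is kept for reporting. A minimal reporting-style read, assuming a ClusterResourceManager reference named cluster_resource_manager and an already-known node_id; the real reporting path, GcsResourceManager::HandleGetDrainingNodes(), is not part of this diff:

// Hypothetical reporting sketch; the surrounding names are assumptions, not
// code from this PR.
const auto scheduling_node_id = scheduling::NodeID(node_id.Binary());
if (cluster_resource_manager.IsNodeDraining(scheduling_node_id)) {
  const auto &resources =
      cluster_resource_manager.GetNodeResources(scheduling_node_id);
  // The deadline written by SetNodeDraining() is surfaced to callers for
  // reporting; it is not consulted when making scheduling decisions.
  RAY_LOG(INFO) << "Node " << node_id << " is draining, deadline_ms="
                << resources.draining_deadline_timestamp_ms;
}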
12 changes: 12 additions & 0 deletions src/ray/raylet/scheduling/cluster_resource_manager.h
@@ -132,6 +132,18 @@ class ClusterResourceManager {
return node.GetLocalView().is_draining;
}

/// Set the draining state of a node.
/// This is called when the autoscaler commits to draining a node, ensuring
/// the scheduler immediately sees the node as unavailable for scheduling.
///
/// \param node_id The ID of the node.
/// \param is_draining Whether the node is draining.
/// \param draining_deadline_timestamp_ms The deadline of the drain operation, in milliseconds.
/// \return true if the node exists and was updated, false otherwise.
bool SetNodeDraining(const scheduling::NodeID &node_id,
bool is_draining,
int64_t draining_deadline_timestamp_ms);

/// @param max_num_nodes_to_include Max number of nodes to include in the debug string.
/// If not specified, all nodes will be included.
std::string DebugString(
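For context on how the flag set here is consumed, a hedged sketch of the kind of feasibility check a scheduling policy can perform; IsNodeAvailableForScheduling is an assumed helper name, and Ray's actual scheduling policies are not changed by this PR:

// Hypothetical helper mirroring the doc comment above: once the autoscaler
// commits to a drain, SetNodeDraining() flips the flag immediately, without
// waiting for the next resource view sync, so new work can skip the node.
bool IsNodeAvailableForScheduling(ClusterResourceManager &manager,
                                  const scheduling::NodeID &node_id) {
  return !manager.IsNodeDraining(node_id);
}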