Skip to content

Commit e70e706

Browse files
authored
enhance: [2.5] skip adding stopping node to resource group in handleNodeUp (#45969) (#45982)
Cherry-pick from master PR: #45969
Related to #45960
Follow-up to #45961

After #45961 ensured that handleNodeUp is always called for nodes discovered during rewatchNodes (including stopping nodes), this change adds a safeguard in ResourceManager.handleNodeUp to skip adding stopping nodes to resource groups.

1. **resource_manager.go**: Add a check for IsStoppingState() in handleNodeUp to prevent stopping nodes from being added to the incomingNode set and assigned to resource groups.
2. **server.go**:
   - Delete processed nodes from sessionMap to avoid duplicate processing in the subsequent loop.
   - Add warning logs for stopping-state transitions during rewatch.

Signed-off-by: Congqi Xia <[email protected]>
1 parent 61c8023 commit e70e706

File tree

2 files changed

+15
-5
lines changed

2 files changed

+15
-5
lines changed

internal/querycoordv2/meta/resource_manager.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -469,7 +469,12 @@ func (rm *ResourceManager) HandleNodeUp(ctx context.Context, node int64) {
469469
}
470470

471471
func (rm *ResourceManager) handleNodeUp(ctx context.Context, node int64) {
472-
if nodeInfo := rm.nodeMgr.Get(node); nodeInfo == nil || nodeInfo.IsEmbeddedQueryNodeInStreamingNode() {
472+
nodeInfo := rm.nodeMgr.Get(node)
473+
if nodeInfo == nil || nodeInfo.IsEmbeddedQueryNodeInStreamingNode() {
474+
return
475+
}
476+
if nodeInfo.IsStoppingState() {
477+
log.Warn("node is stopping, skip handle node up in resource manager", zap.Int64("node", node))
473478
return
474479
}
475480
rm.incomingNode.Insert(node)

internal/querycoordv2/server.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -799,10 +799,14 @@ func (s *Server) rewatchNodes(sessions map[string]*sessionutil.Session) error {
799799
// node in node manager but session not exist, means it's offline
800800
s.nodeMgr.Remove(node.ID())
801801
s.handleNodeDown(node.ID())
802-
} else if nodeSession.Stopping && !node.IsStoppingState() {
803-
// node in node manager but session is stopping, means it's stopping
804-
s.nodeMgr.Stopping(node.ID())
805-
s.handleNodeStopping(node.ID())
802+
} else {
803+
if nodeSession.Stopping && !node.IsStoppingState() {
804+
// node in node manager but session is stopping, means it's stopping
805+
log.Warn("rewatch found old querynode in stopping state", zap.Int64("nodeID", nodeSession.ServerID))
806+
s.nodeMgr.Stopping(node.ID())
807+
s.handleNodeStopping(node.ID())
808+
}
809+
delete(sessionMap, node.ID())
806810
}
807811
}
808812

@@ -823,6 +827,7 @@ func (s *Server) rewatchNodes(sessions map[string]*sessionutil.Session) error {
823827
s.handleNodeUp(nodeSession.GetServerID())
824828

825829
if nodeSession.Stopping {
830+
log.Warn("rewatch found new querynode in stopping state", zap.Int64("nodeID", nodeSession.ServerID))
826831
s.nodeMgr.Stopping(nodeSession.ServerID)
827832
s.handleNodeStopping(nodeSession.ServerID)
828833
}

0 commit comments

Comments
 (0)