
Commit eb17591

drain: re-cordon nodes if needed
1 parent b1595be · commit eb17591

File tree

2 files changed (+217, -1 lines)


pkg/controller/drain/drain_controller.go

Lines changed: 7 additions & 1 deletion
@@ -385,7 +385,13 @@ func (ctrl *Controller) drainNode(node *corev1.Node, drainer *drain.Helper) error {
 		break
 	}
 
-	if !isOngoingDrain {
+	// isOngoingDrain will only inform us of MCO managed cordons/drains. We should also check
+	// if an external actor has uncordoned this node, and force it back into a cordoned state if so.
+	if isOngoingDrain && !node.Spec.Unschedulable {
+		klog.Infof("External actor has unexpectedly uncordoned node %s, cordoning again...", node.Name)
+	}
+
+	if !isOngoingDrain || !node.Spec.Unschedulable {
 		ctrl.logNode(node, "cordoning")
 		// perform cordon
 		if err := ctrl.cordonOrUncordonNode(true, node, drainer); err != nil {
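
Taken together, the two conditionals in this hunk cover three cases: no MCO-managed drain in progress (cordon as before), a drain in progress but the node has been uncordoned by something outside the MCO (log and re-cordon), and a drain in progress with the node still cordoned (skip the cordon call). A minimal sketch of that decision, using a hypothetical shouldCordon helper that is not part of this change:

	// shouldCordon reports whether drainNode needs to (re-)apply the cordon.
	// isOngoingDrain mirrors the controller's ongoingDrains bookkeeping;
	// unschedulable mirrors node.Spec.Unschedulable.
	func shouldCordon(isOngoingDrain, unschedulable bool) bool {
		// No MCO-managed drain yet: cordon.
		// Ongoing drain but externally uncordoned: cordon again.
		// Ongoing drain and still cordoned: nothing to do.
		return !isOngoingDrain || !unschedulable
	}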
Lines changed: 210 additions & 0 deletions (new file)
@@ -0,0 +1,210 @@
package drain

import (
	"context"
	"encoding/json"
	"maps"
	"testing"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	kubeinformers "k8s.io/client-go/informers"
	k8sfake "k8s.io/client-go/kubernetes/fake"
	core "k8s.io/client-go/testing"

	configv1 "github.com/openshift/api/config/v1"
	mcfgv1 "github.com/openshift/api/machineconfiguration/v1"
	fakemcfgclientset "github.com/openshift/client-go/machineconfiguration/clientset/versioned/fake"
	mcfginformers "github.com/openshift/client-go/machineconfiguration/informers/externalversions"
	"github.com/openshift/library-go/pkg/operator/configobserver/featuregates"
	daemonconsts "github.com/openshift/machine-config-operator/pkg/daemon/constants"

	"github.com/stretchr/testify/assert"
)

type fakeFeatureGateHandler struct{}

func (f *fakeFeatureGateHandler) Connect(ctx context.Context) error { return nil }
func (f *fakeFeatureGateHandler) Enabled(featureName configv1.FeatureGateName) bool { return false }
func (f *fakeFeatureGateHandler) Exists(featureName configv1.FeatureGateName) bool { return false }
func (f *fakeFeatureGateHandler) KnownFeatures() []configv1.FeatureGateName { return nil }

const (
	testNodeName      = "test-node"
	testPoolName      = "worker"
	testDrainState    = "drain-test-hash"
	testUncordonState = "uncordon-test-hash"
)

func createTestNode(name string, unschedulable bool) *corev1.Node {
	node := &corev1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Name:        name,
			Annotations: make(map[string]string),
		},
		Spec: corev1.NodeSpec{
			Unschedulable: unschedulable,
		},
	}
	return node
}

func createTestNodeWithAnnotations(name string, unschedulable bool, annotations map[string]string) *corev1.Node {
	node := createTestNode(name, unschedulable)
	maps.Copy(node.Annotations, annotations)
	return node
}

func createTestMCP(name string) *mcfgv1.MachineConfigPool {
	return &mcfgv1.MachineConfigPool{
		ObjectMeta: metav1.ObjectMeta{
			Name: name,
		},
		Spec: mcfgv1.MachineConfigPoolSpec{
			NodeSelector: &metav1.LabelSelector{
				MatchLabels: map[string]string{
					"node-role.kubernetes.io/" + name: "",
				},
			},
		},
	}
}

func createTestController(nodes []*corev1.Node, mcps []*mcfgv1.MachineConfigPool) (*Controller, *k8sfake.Clientset, *fakemcfgclientset.Clientset) {
	kubeObjs := make([]runtime.Object, len(nodes))
	for i, node := range nodes {
		kubeObjs[i] = node
	}
	kubeClient := k8sfake.NewSimpleClientset(kubeObjs...)

	mcfgObjs := make([]runtime.Object, len(mcps))
	for i, mcp := range mcps {
		mcfgObjs[i] = mcp
	}
	mcfgClient := fakemcfgclientset.NewSimpleClientset(mcfgObjs...)

	kubeInformers := kubeinformers.NewSharedInformerFactory(kubeClient, 0)
	mcfgInformers := mcfginformers.NewSharedInformerFactory(mcfgClient, 0)

	nodeInformer := kubeInformers.Core().V1().Nodes()

	// Start informers to properly initialize them
	kubeInformers.Start(make(chan struct{}))
	mcfgInformers.Start(make(chan struct{}))

	// Add nodes to informer
	for _, node := range nodes {
		nodeInformer.Informer().GetIndexer().Add(node)
	}

	cfg := DefaultConfig()
	cfg.DrainTimeoutDuration = 10 * time.Minute
	cfg.DrainRequeueDelay = 1 * time.Minute
	cfg.DrainRequeueFailingDelay = 5 * time.Minute
	cfg.DrainRequeueFailingThreshold = 5 * time.Minute

	fgAccess := featuregates.NewHardcodedFeatureGateAccess([]configv1.FeatureGateName{"MachineConfigNodes", "PinnedImages"}, nil)

	ctrl := New(cfg, nodeInformer, kubeClient, mcfgClient, fgAccess)

	// Initialize ongoing drains map for testing
	ctrl.ongoingDrains = make(map[string]time.Time)

	return ctrl, kubeClient, mcfgClient
}

func createDrainTestNode(nodeName string, unschedulable bool, desiredState, lastAppliedState string) *corev1.Node {
	node := createTestNodeWithAnnotations(nodeName, unschedulable, map[string]string{
		daemonconsts.DesiredDrainerAnnotationKey:     desiredState,
		daemonconsts.LastAppliedDrainerAnnotationKey: lastAppliedState,
	})
	node.Labels = map[string]string{
		"node-role.kubernetes.io/" + testPoolName: "",
	}
	return node
}

func setupControllerAndSync(node *corev1.Node, ongoingDrains map[string]time.Time) (*Controller, *k8sfake.Clientset, error) {
	ctrl, kubeClient, _ := createTestController([]*corev1.Node{node}, []*mcfgv1.MachineConfigPool{createTestMCP(testPoolName)})

	if ongoingDrains != nil {
		ctrl.ongoingDrains = ongoingDrains
	}

	err := ctrl.syncNode(testNodeName)
	return ctrl, kubeClient, err
}

func verifyDrainPatches(t *testing.T, kubeClient *k8sfake.Clientset, expectedUnschedulable bool, expectedAnnotationValue string) {
	// Collect all patch actions
	patchActions := []core.PatchAction{}
	for _, action := range kubeClient.Actions() {
		if patchAction, ok := action.(core.PatchAction); ok {
			patchActions = append(patchActions, patchAction)
		}
	}

	// Verify exactly 2 patch operations occurred
	assert.Len(t, patchActions, 2, "should have made exactly two patch requests")

	// Verify the first patch sets the correct scheduling state
	firstPatchBytes := patchActions[0].GetPatch()
	var firstPatch map[string]any
	err := json.Unmarshal(firstPatchBytes, &firstPatch)
	assert.NoError(t, err, "unmarshalling first patch failed")
	if spec, ok := firstPatch["spec"].(map[string]any); ok {
		if unschedulable, ok := spec["unschedulable"].(bool); ok {
			assert.Equal(t, expectedUnschedulable, unschedulable, "first patch should set node schedulable state correctly")
		}
	}

	// Verify the second patch sets the LastAppliedDrainerAnnotationKey annotation
	secondPatchBytes := patchActions[1].GetPatch()
	var secondPatch map[string]any
	err = json.Unmarshal(secondPatchBytes, &secondPatch)
	assert.NoError(t, err, "unmarshalling second patch failed")
	if metadata, ok := secondPatch["metadata"].(map[string]any); ok {
		if annotations, ok := metadata["annotations"].(map[string]any); ok {
			if lastApplied, ok := annotations[daemonconsts.LastAppliedDrainerAnnotationKey].(string); ok {
				assert.Equal(t, expectedAnnotationValue, lastApplied, "LastAppliedDrainerAnnotationKey should be set correctly")
			}
		}
	}
}

func TestSyncNode(t *testing.T) {

	t.Run("uncordon requested", func(t *testing.T) {
		node := createDrainTestNode(testNodeName, true, testUncordonState, "")
		_, kubeClient, err := setupControllerAndSync(node, nil)
		assert.NoError(t, err, "syncNode should not error for uncordon action")

		// Verify patch operations: uncordon (schedulable=false) + completion annotation
		verifyDrainPatches(t, kubeClient, false, testUncordonState)

	})

	t.Run("drain requested", func(t *testing.T) {
		node := createDrainTestNode(testNodeName, false, testDrainState, "")
		_, kubeClient, err := setupControllerAndSync(node, nil)
		assert.NoError(t, err, "syncNode should not error for drain action")

		// Verify patch operations: cordon (unschedulable=true) + completion annotation
		verifyDrainPatches(t, kubeClient, true, testDrainState)
	})

	t.Run("re-cordon required", func(t *testing.T) {
		node := createDrainTestNode(testNodeName, false, testDrainState, "")
		// Simulate ongoing drain (but node is not cordoned - external uncordon)
		ongoingDrains := map[string]time.Time{
			testNodeName: time.Now().Add(-5 * time.Minute),
		}
		_, kubeClient, err := setupControllerAndSync(node, ongoingDrains)
		assert.NoError(t, err, "syncNode should not error for re-cordon action")

		// Verify patch operations: re-cordon (unschedulable=true) + completion annotation
		verifyDrainPatches(t, kubeClient, true, testDrainState)
	})
}
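
Assuming the new test file sits next to drain_controller.go in the drain package, the added subtests can be run on their own with the standard Go tooling, for example:

	go test ./pkg/controller/drain/... -run TestSyncNode -v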
