package drain

import (
	"context"
	"encoding/json"
	"maps"
	"testing"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	kubeinformers "k8s.io/client-go/informers"
	k8sfake "k8s.io/client-go/kubernetes/fake"
	core "k8s.io/client-go/testing"

	configv1 "github.com/openshift/api/config/v1"
	mcfgv1 "github.com/openshift/api/machineconfiguration/v1"
	fakemcfgclientset "github.com/openshift/client-go/machineconfiguration/clientset/versioned/fake"
	mcfginformers "github.com/openshift/client-go/machineconfiguration/informers/externalversions"
	"github.com/openshift/library-go/pkg/operator/configobserver/featuregates"
	daemonconsts "github.com/openshift/machine-config-operator/pkg/daemon/constants"

	"github.com/stretchr/testify/assert"
)

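// fakeFeatureGateHandler is a stub feature-gate handler for tests: it reports
// every gate as unknown and disabled.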
type fakeFeatureGateHandler struct{}

func (f *fakeFeatureGateHandler) Connect(ctx context.Context) error { return nil }
func (f *fakeFeatureGateHandler) Enabled(featureName configv1.FeatureGateName) bool { return false }
func (f *fakeFeatureGateHandler) Exists(featureName configv1.FeatureGateName) bool { return false }
func (f *fakeFeatureGateHandler) KnownFeatures() []configv1.FeatureGateName { return nil }

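// Fixtures shared by the syncNode test cases below.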
const (
	testNodeName      = "test-node"
	testPoolName      = "worker"
	testDrainState    = "drain-test-hash"
	testUncordonState = "uncordon-test-hash"
)

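// createTestNode returns a minimal Node with the given name and unschedulable
// flag and an empty annotations map.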
func createTestNode(name string, unschedulable bool) *corev1.Node {
	node := &corev1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Name:        name,
			Annotations: make(map[string]string),
		},
		Spec: corev1.NodeSpec{
			Unschedulable: unschedulable,
		},
	}
	return node
}

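// createTestNodeWithAnnotations returns a test Node with the supplied
// annotations merged into the default empty annotation map.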
func createTestNodeWithAnnotations(name string, unschedulable bool, annotations map[string]string) *corev1.Node {
	node := createTestNode(name, unschedulable)
	maps.Copy(node.Annotations, annotations)
	return node
}

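// createTestMCP returns a MachineConfigPool whose node selector matches the
// standard node-role label for the given pool name.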
func createTestMCP(name string) *mcfgv1.MachineConfigPool {
	return &mcfgv1.MachineConfigPool{
		ObjectMeta: metav1.ObjectMeta{
			Name: name,
		},
		Spec: mcfgv1.MachineConfigPoolSpec{
			NodeSelector: &metav1.LabelSelector{
				MatchLabels: map[string]string{
					"node-role.kubernetes.io/" + name: "",
				},
			},
		},
	}
}

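// createTestController wires a drain Controller to fake kube and
// machine-config clientsets whose informer caches are pre-populated with the
// supplied nodes and pools.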
func createTestController(nodes []*corev1.Node, mcps []*mcfgv1.MachineConfigPool) (*Controller, *k8sfake.Clientset, *fakemcfgclientset.Clientset) {
	kubeObjs := make([]runtime.Object, len(nodes))
	for i, node := range nodes {
		kubeObjs[i] = node
	}
	kubeClient := k8sfake.NewSimpleClientset(kubeObjs...)

	mcfgObjs := make([]runtime.Object, len(mcps))
	for i, mcp := range mcps {
		mcfgObjs[i] = mcp
	}
	mcfgClient := fakemcfgclientset.NewSimpleClientset(mcfgObjs...)

	kubeInformers := kubeinformers.NewSharedInformerFactory(kubeClient, 0)
	mcfgInformers := mcfginformers.NewSharedInformerFactory(mcfgClient, 0)

	nodeInformer := kubeInformers.Core().V1().Nodes()
	mcpInformer := mcfgInformers.Machineconfiguration().V1().MachineConfigPools()

	// Start the informer factories so the shared informers are initialized
	kubeInformers.Start(make(chan struct{}))
	mcfgInformers.Start(make(chan struct{}))

	// Seed the informer caches directly so the listers see the objects without waiting for a cache sync
	for _, node := range nodes {
		nodeInformer.Informer().GetIndexer().Add(node)
	}
	for _, mcp := range mcps {
		mcpInformer.Informer().GetIndexer().Add(mcp)
	}

	cfg := DefaultConfig()
	cfg.DrainTimeoutDuration = 10 * time.Minute
	cfg.DrainRequeueDelay = 1 * time.Minute
	cfg.DrainRequeueFailingDelay = 5 * time.Minute
	cfg.DrainRequeueFailingThreshold = 5 * time.Minute

	fgAccess := featuregates.NewHardcodedFeatureGateAccess([]configv1.FeatureGateName{"MachineConfigNodes", "PinnedImages"}, nil)

	ctrl := New(cfg, nodeInformer, mcpInformer, kubeClient, mcfgClient, fgAccess)

	// Initialize the ongoing drains map for testing
	ctrl.ongoingDrains = make(map[string]time.Time)

	return ctrl, kubeClient, mcfgClient
}

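// createDrainTestNode returns a Node labelled into the test pool with the
// desired and last-applied drainer annotations set.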
func createDrainTestNode(nodeName string, unschedulable bool, desiredState, lastAppliedState string) *corev1.Node {
	node := createTestNodeWithAnnotations(nodeName, unschedulable, map[string]string{
		daemonconsts.DesiredDrainerAnnotationKey:     desiredState,
		daemonconsts.LastAppliedDrainerAnnotationKey: lastAppliedState,
	})
	node.Labels = map[string]string{
		"node-role.kubernetes.io/" + testPoolName: "",
	}
	return node
}

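// setupControllerAndSync builds a controller around a single node, optionally
// seeding its ongoing-drains map, and runs one syncNode pass against it.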
func setupControllerAndSync(node *corev1.Node, ongoingDrains map[string]time.Time) (*Controller, *k8sfake.Clientset, error) {
	ctrl, kubeClient, _ := createTestController([]*corev1.Node{node}, []*mcfgv1.MachineConfigPool{createTestMCP(testPoolName)})

	if ongoingDrains != nil {
		ctrl.ongoingDrains = ongoingDrains
	}

	err := ctrl.syncNode(testNodeName)
	return ctrl, kubeClient, err
}

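// verifyDrainPatches asserts that exactly two patches were issued against the
// fake client: one setting spec.unschedulable and one recording the
// last-applied drainer annotation.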
func verifyDrainPatches(t *testing.T, kubeClient *k8sfake.Clientset, expectedUnschedulable bool, expectedAnnotationValue string) {
	// Collect all patch actions issued against the fake client
	patchActions := []core.PatchAction{}
	for _, action := range kubeClient.Actions() {
		if patchAction, ok := action.(core.PatchAction); ok {
			patchActions = append(patchActions, patchAction)
		}
	}

	// Verify exactly two patch operations occurred; bail out early so the
	// index accesses below cannot panic on failure
	if !assert.Len(t, patchActions, 2, "should have made exactly two patch requests") {
		return
	}

	// Verify the first patch sets the expected scheduling state
	var firstPatch map[string]any
	err := json.Unmarshal(patchActions[0].GetPatch(), &firstPatch)
	assert.NoError(t, err, "unmarshalling first patch failed")
	spec, ok := firstPatch["spec"].(map[string]any)
	if assert.True(t, ok, "first patch should contain a spec section") {
		assert.Equal(t, expectedUnschedulable, spec["unschedulable"], "first patch should set the node's unschedulable field correctly")
	}

	// Verify the second patch records the LastAppliedDrainerAnnotationKey annotation
	var secondPatch map[string]any
	err = json.Unmarshal(patchActions[1].GetPatch(), &secondPatch)
	assert.NoError(t, err, "unmarshalling second patch failed")
	metadata, ok := secondPatch["metadata"].(map[string]any)
	if assert.True(t, ok, "second patch should contain a metadata section") {
		annotations, ok := metadata["annotations"].(map[string]any)
		if assert.True(t, ok, "second patch should contain annotations") {
			assert.Equal(t, expectedAnnotationValue, annotations[daemonconsts.LastAppliedDrainerAnnotationKey], "LastAppliedDrainerAnnotationKey should be set correctly")
		}
	}
}

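// TestSyncNode covers the uncordon, drain, and re-cordon paths of syncNode.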
func TestSyncNode(t *testing.T) {
	t.Run("uncordon requested", func(t *testing.T) {
		node := createDrainTestNode(testNodeName, true, testUncordonState, "")
		_, kubeClient, err := setupControllerAndSync(node, nil)
		assert.NoError(t, err, "syncNode should not error for uncordon action")

		// Verify patch operations: uncordon (unschedulable=false) + completion annotation
		verifyDrainPatches(t, kubeClient, false, testUncordonState)
	})

	t.Run("drain requested", func(t *testing.T) {
		node := createDrainTestNode(testNodeName, false, testDrainState, "")
		_, kubeClient, err := setupControllerAndSync(node, nil)
		assert.NoError(t, err, "syncNode should not error for drain action")

		// Verify patch operations: cordon (unschedulable=true) + completion annotation
		verifyDrainPatches(t, kubeClient, true, testDrainState)
	})

	t.Run("re-cordon required", func(t *testing.T) {
		node := createDrainTestNode(testNodeName, false, testDrainState, "")
		// Simulate an ongoing drain on a node that is no longer cordoned (e.g. uncordoned externally)
		ongoingDrains := map[string]time.Time{
			testNodeName: time.Now().Add(-5 * time.Minute),
		}
		_, kubeClient, err := setupControllerAndSync(node, ongoingDrains)
		assert.NoError(t, err, "syncNode should not error for re-cordon action")

		// Verify patch operations: re-cordon (unschedulable=true) + completion annotation
		verifyDrainPatches(t, kubeClient, true, testDrainState)
	})
}