From f2d4eeb104cdc99a54e26ec33278194ed28391cc Mon Sep 17 00:00:00 2001 From: justinsb Date: Mon, 13 Jan 2025 17:47:48 -0500 Subject: [PATCH] reconcile: wait for apiserver to respond before trying rolling-update The rolling-update requires the apiserver (when called without --cloudonly), so reconcile should wait for apiserver to start responding. Implement this by reusing "validate cluster", but filtering to only the instance groups and pods that we expect to be online. --- cmd/kops/delete_instance.go | 2 +- cmd/kops/reconcile_cluster.go | 26 ++++ cmd/kops/rolling-update_cluster.go | 2 +- cmd/kops/validate_cluster.go | 24 ++++-- pkg/instancegroups/instancegroups.go | 2 +- pkg/instancegroups/rollingupdate_test.go | 22 +++-- .../rollingupdate_warmpool_test.go | 2 +- pkg/validation/validate_cluster.go | 83 ++++++++++++++----- pkg/validation/validate_cluster_test.go | 13 ++- 9 files changed, 125 insertions(+), 51 deletions(-) diff --git a/cmd/kops/delete_instance.go b/cmd/kops/delete_instance.go index 4b3628323f35b..f83d4fd9b8b4b 100644 --- a/cmd/kops/delete_instance.go +++ b/cmd/kops/delete_instance.go @@ -258,7 +258,7 @@ func RunDeleteInstance(ctx context.Context, f *util.Factory, out io.Writer, opti var clusterValidator validation.ClusterValidator if !options.CloudOnly { - clusterValidator, err = validation.NewClusterValidator(cluster, cloud, list, restConfig, k8sClient) + clusterValidator, err = validation.NewClusterValidator(cluster, cloud, list, nil, nil, restConfig, k8sClient) if err != nil { return fmt.Errorf("cannot create cluster validator: %v", err) } diff --git a/cmd/kops/reconcile_cluster.go b/cmd/kops/reconcile_cluster.go index 7ec9db74450a5..92d8fbb8ba509 100644 --- a/cmd/kops/reconcile_cluster.go +++ b/cmd/kops/reconcile_cluster.go @@ -20,8 +20,10 @@ import ( "context" "fmt" "io" + "time" "github.com/spf13/cobra" + v1 "k8s.io/api/core/v1" "k8s.io/kops/cmd/kops/util" "k8s.io/kops/pkg/apis/kops" "k8s.io/kops/pkg/commands/commandutils" @@ -134,6 +136,30 
@@ func RunReconcileCluster(ctx context.Context, f *util.Factory, out io.Writer, c } } + // Particularly for a new cluster, we need to wait for the control plane to be answering requests + // before we can do a rolling update. + fmt.Fprintf(out, "Waiting for the kubernetes API to be served\n") + { + opt := &ValidateClusterOptions{} + opt.InitDefaults() + opt.ClusterName = c.ClusterName + opt.wait = 10 * time.Minute + + // filter the instance group to only include the control plane + opt.filterInstanceGroups = func(ig *kops.InstanceGroup) bool { + return ig.Spec.Role == kops.InstanceGroupRoleAPIServer || ig.Spec.Role == kops.InstanceGroupRoleControlPlane + } + + // Ignore all pods, we just want to check the control plane is responding + opt.filterPodsForValidation = func(pod *v1.Pod) bool { + return false + } + + if _, err := RunValidateCluster(ctx, f, out, opt); err != nil { + return fmt.Errorf("waiting for kubernetes API to be served: %w", err) + } + } + fmt.Fprintf(out, "Doing rolling-update for control plane\n") { opt := &RollingUpdateOptions{} diff --git a/cmd/kops/rolling-update_cluster.go b/cmd/kops/rolling-update_cluster.go index 5c0ab84905f8b..ad3fb0e0b92ec 100644 --- a/cmd/kops/rolling-update_cluster.go +++ b/cmd/kops/rolling-update_cluster.go @@ -453,7 +453,7 @@ func RunRollingUpdateCluster(ctx context.Context, f *util.Factory, out io.Writer return fmt.Errorf("getting rest config: %w", err) } - clusterValidator, err = validation.NewClusterValidator(cluster, cloud, list, restConfig, k8sClient) + clusterValidator, err = validation.NewClusterValidator(cluster, cloud, list, nil, nil, restConfig, k8sClient) if err != nil { return fmt.Errorf("cannot create cluster validator: %v", err) } diff --git a/cmd/kops/validate_cluster.go b/cmd/kops/validate_cluster.go index 01c842f961cf1..9fcc45e401d71 100644 --- a/cmd/kops/validate_cluster.go +++ b/cmd/kops/validate_cluster.go @@ -36,6 +36,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" 
"k8s.io/kops/cmd/kops/util" + "k8s.io/kops/pkg/apis/kops" kopsapi "k8s.io/kops/pkg/apis/kops" "k8s.io/kops/pkg/validation" "k8s.io/kops/util/pkg/tables" @@ -61,12 +62,19 @@ var ( ) type ValidateClusterOptions struct { - ClusterName string - output string - wait time.Duration - count int - interval time.Duration - kubeconfig string + ClusterName string + InstanceGroupRoles []kops.InstanceGroupRole + output string + wait time.Duration + count int + interval time.Duration + kubeconfig string + + // filterInstanceGroups is a function that returns true if the instance group should be validated + filterInstanceGroups func(ig *kops.InstanceGroup) bool + + // filterPodsForValidation is a function that returns true if the pod should be validated + filterPodsForValidation func(pod *v1.Pod) bool } func (o *ValidateClusterOptions) InitDefaults() { @@ -164,7 +172,7 @@ func RunValidateCluster(ctx context.Context, f *util.Factory, out io.Writer, opt timeout := time.Now().Add(options.wait) - validator, err := validation.NewClusterValidator(cluster, cloud, list, restConfig, k8sClient) + validator, err := validation.NewClusterValidator(cluster, cloud, list, options.filterInstanceGroups, options.filterPodsForValidation, restConfig, k8sClient) if err != nil { return nil, fmt.Errorf("unexpected error creating validatior: %v", err) } @@ -175,7 +183,7 @@ func RunValidateCluster(ctx context.Context, f *util.Factory, out io.Writer, opt return nil, fmt.Errorf("wait time exceeded during validation") } - result, err := validator.Validate() + result, err := validator.Validate(ctx) if err != nil { consecutive = 0 if options.wait > 0 { diff --git a/pkg/instancegroups/instancegroups.go b/pkg/instancegroups/instancegroups.go index 4d8c623fcd16b..813ecd085f45f 100644 --- a/pkg/instancegroups/instancegroups.go +++ b/pkg/instancegroups/instancegroups.go @@ -537,7 +537,7 @@ func (c *RollingUpdateCluster) validateClusterWithTimeout(validateCount int, gro for { // Note that we validate at least once 
before checking the timeout, in case the cluster is healthy with a short timeout - result, err := c.ClusterValidator.Validate() + result, err := c.ClusterValidator.Validate(ctx) if err == nil && !hasFailureRelevantToGroup(result.Failures, group) { successCount++ if successCount >= validateCount { diff --git a/pkg/instancegroups/rollingupdate_test.go b/pkg/instancegroups/rollingupdate_test.go index 0eb2e6b10dda7..d559378a78e6d 100644 --- a/pkg/instancegroups/rollingupdate_test.go +++ b/pkg/instancegroups/rollingupdate_test.go @@ -84,13 +84,13 @@ func getTestSetup() (*RollingUpdateCluster, *awsup.MockAWSCloud) { type successfulClusterValidator struct{} -func (*successfulClusterValidator) Validate() (*validation.ValidationCluster, error) { +func (*successfulClusterValidator) Validate(ctx context.Context) (*validation.ValidationCluster, error) { return &validation.ValidationCluster{}, nil } type failingClusterValidator struct{} -func (*failingClusterValidator) Validate() (*validation.ValidationCluster, error) { +func (*failingClusterValidator) Validate(ctx context.Context) (*validation.ValidationCluster, error) { return &validation.ValidationCluster{ Failures: []*validation.ValidationError{ { @@ -104,7 +104,7 @@ func (*failingClusterValidator) Validate() (*validation.ValidationCluster, error type erroringClusterValidator struct{} -func (*erroringClusterValidator) Validate() (*validation.ValidationCluster, error) { +func (*erroringClusterValidator) Validate(ctx context.Context) (*validation.ValidationCluster, error) { return nil, errors.New("testing validation error") } @@ -113,7 +113,7 @@ type instanceGroupNodeSpecificErrorClusterValidator struct { InstanceGroup *kopsapi.InstanceGroup } -func (igErrorValidator *instanceGroupNodeSpecificErrorClusterValidator) Validate() (*validation.ValidationCluster, error) { +func (igErrorValidator *instanceGroupNodeSpecificErrorClusterValidator) Validate(ctx context.Context) (*validation.ValidationCluster, error) { return 
&validation.ValidationCluster{ Failures: []*validation.ValidationError{ { @@ -130,7 +130,7 @@ type assertNotCalledClusterValidator struct { T *testing.T } -func (v *assertNotCalledClusterValidator) Validate() (*validation.ValidationCluster, error) { +func (v *assertNotCalledClusterValidator) Validate(ctx context.Context) (*validation.ValidationCluster, error) { v.T.Fatal("validator called unexpectedly") return nil, errors.New("validator called unexpectedly") } @@ -425,8 +425,7 @@ type failAfterOneNodeClusterValidator struct { ReturnError bool } -func (v *failAfterOneNodeClusterValidator) Validate() (*validation.ValidationCluster, error) { - ctx := context.TODO() +func (v *failAfterOneNodeClusterValidator) Validate(ctx context.Context) (*validation.ValidationCluster, error) { asgGroups, _ := v.Cloud.Autoscaling().DescribeAutoScalingGroups(ctx, &autoscaling.DescribeAutoScalingGroupsInput{ AutoScalingGroupNames: []string{v.Group}, }) @@ -648,8 +647,7 @@ type flappingClusterValidator struct { invocationCount int } -func (v *flappingClusterValidator) Validate() (*validation.ValidationCluster, error) { - ctx := context.TODO() +func (v *flappingClusterValidator) Validate(ctx context.Context) (*validation.ValidationCluster, error) { asgGroups, _ := v.Cloud.Autoscaling().DescribeAutoScalingGroups(ctx, &autoscaling.DescribeAutoScalingGroupsInput{ AutoScalingGroupNames: []string{"master-1"}, }) @@ -706,7 +704,7 @@ type failThreeTimesClusterValidator struct { invocationCount int } -func (v *failThreeTimesClusterValidator) Validate() (*validation.ValidationCluster, error) { +func (v *failThreeTimesClusterValidator) Validate(ctx context.Context) (*validation.ValidationCluster, error) { v.invocationCount++ if v.invocationCount <= 3 { return &validation.ValidationCluster{ @@ -1060,7 +1058,7 @@ type concurrentTest struct { detached map[string]bool } -func (c *concurrentTest) Validate() (*validation.ValidationCluster, error) { +func (c *concurrentTest) Validate(ctx context.Context) 
(*validation.ValidationCluster, error) { c.mutex.Lock() defer c.mutex.Unlock() @@ -1441,7 +1439,7 @@ type alreadyDetachedTest struct { detached map[string]bool } -func (t *alreadyDetachedTest) Validate() (*validation.ValidationCluster, error) { +func (t *alreadyDetachedTest) Validate(ctx context.Context) (*validation.ValidationCluster, error) { t.mutex.Lock() defer t.mutex.Unlock() diff --git a/pkg/instancegroups/rollingupdate_warmpool_test.go b/pkg/instancegroups/rollingupdate_warmpool_test.go index 927753122a9d3..17b4d767896d2 100644 --- a/pkg/instancegroups/rollingupdate_warmpool_test.go +++ b/pkg/instancegroups/rollingupdate_warmpool_test.go @@ -75,7 +75,7 @@ type countingValidator struct { numValidations int } -func (c *countingValidator) Validate() (*validation.ValidationCluster, error) { +func (c *countingValidator) Validate(ctx context.Context) (*validation.ValidationCluster, error) { c.numValidations++ return &validation.ValidationCluster{}, nil } diff --git a/pkg/validation/validate_cluster.go b/pkg/validation/validate_cluster.go index b21e93b308217..0194a1f5e52b1 100644 --- a/pkg/validation/validate_cluster.go +++ b/pkg/validation/validate_cluster.go @@ -56,15 +56,23 @@ type ValidationError struct { type ClusterValidator interface { // Validate validates a k8s cluster - Validate() (*ValidationCluster, error) + Validate(ctx context.Context) (*ValidationCluster, error) } type clusterValidatorImpl struct { - cluster *kops.Cluster - cloud fi.Cloud - instanceGroups []*kops.InstanceGroup - restConfig *rest.Config - k8sClient kubernetes.Interface + cluster *kops.Cluster + cloud fi.Cloud + restConfig *rest.Config + k8sClient kubernetes.Interface + + // allInstanceGroups is the list of all instance groups in the cluster + allInstanceGroups []*kops.InstanceGroup + + // filterInstanceGroups is a function that returns true if the instance group should be validated + filterInstanceGroups func(ig *kops.InstanceGroup) bool + + // filterPodsForValidation is a function 
that returns true if the pod should be validated + filterPodsForValidation func(pod *v1.Pod) bool } func (v *ValidationCluster) addError(failure *ValidationError) { @@ -101,30 +109,44 @@ func hasPlaceHolderIP(host string) (string, error) { return "", nil } -func NewClusterValidator(cluster *kops.Cluster, cloud fi.Cloud, instanceGroupList *kops.InstanceGroupList, restConfig *rest.Config, k8sClient kubernetes.Interface) (ClusterValidator, error) { - var instanceGroups []*kops.InstanceGroup +func NewClusterValidator(cluster *kops.Cluster, cloud fi.Cloud, instanceGroupList *kops.InstanceGroupList, filterInstanceGroups func(ig *kops.InstanceGroup) bool, filterPodsForValidation func(pod *v1.Pod) bool, restConfig *rest.Config, k8sClient kubernetes.Interface) (ClusterValidator, error) { + var allInstanceGroups []*kops.InstanceGroup for i := range instanceGroupList.Items { ig := &instanceGroupList.Items[i] - instanceGroups = append(instanceGroups, ig) + allInstanceGroups = append(allInstanceGroups, ig) } - if len(instanceGroups) == 0 { + if len(allInstanceGroups) == 0 { return nil, fmt.Errorf("no InstanceGroup objects found") } + // If no filter is provided, validate all instance groups + if filterInstanceGroups == nil { + filterInstanceGroups = func(ig *kops.InstanceGroup) bool { + return true + } + } + + // If no filter is provided, validate all pods + if filterPodsForValidation == nil { + filterPodsForValidation = func(pod *v1.Pod) bool { + return true + } + } + return &clusterValidatorImpl{ - cluster: cluster, - cloud: cloud, - instanceGroups: instanceGroups, - restConfig: restConfig, - k8sClient: k8sClient, + cluster: cluster, + cloud: cloud, + allInstanceGroups: allInstanceGroups, + restConfig: restConfig, + k8sClient: k8sClient, + filterInstanceGroups: filterInstanceGroups, + filterPodsForValidation: filterPodsForValidation, }, nil } -func (v *clusterValidatorImpl) Validate() (*ValidationCluster, error) { - ctx := context.TODO() - +func (v *clusterValidatorImpl) 
Validate(ctx context.Context) (*ValidationCluster, error) { validation := &ValidationCluster{} // Do not use if we are running gossip or without dns @@ -161,13 +183,14 @@ func (v *clusterValidatorImpl) Validate() (*ValidationCluster, error) { } warnUnmatched := false - cloudGroups, err := v.cloud.GetCloudGroups(v.cluster, v.instanceGroups, warnUnmatched, nodeList.Items) + cloudGroups, err := v.cloud.GetCloudGroups(v.cluster, v.allInstanceGroups, warnUnmatched, nodeList.Items) if err != nil { return nil, err } - readyNodes, nodeInstanceGroupMapping := validation.validateNodes(cloudGroups, v.instanceGroups) - if err := validation.collectPodFailures(ctx, v.k8sClient, readyNodes, nodeInstanceGroupMapping); err != nil { + readyNodes, nodeInstanceGroupMapping := validation.validateNodes(cloudGroups, v.allInstanceGroups, v.filterInstanceGroups) + + if err := validation.collectPodFailures(ctx, v.k8sClient, readyNodes, nodeInstanceGroupMapping, v.filterPodsForValidation); err != nil { return nil, fmt.Errorf("cannot get pod health for %q: %v", v.cluster.Name, err) } @@ -181,7 +204,7 @@ var masterStaticPods = []string{ } func (v *ValidationCluster) collectPodFailures(ctx context.Context, client kubernetes.Interface, nodes []v1.Node, - nodeInstanceGroupMapping map[string]*kops.InstanceGroup, + nodeInstanceGroupMapping map[string]*kops.InstanceGroup, podValidationFilter func(pod *v1.Pod) bool, ) error { masterWithoutPod := map[string]map[string]bool{} nodeByAddress := map[string]string{} @@ -210,10 +233,16 @@ func (v *ValidationCluster) collectPodFailures(ctx context.Context, client kuber delete(masterWithoutPod[nodeByAddress[pod.Status.HostIP]], app) } + // Ignore pods that we don't want to validate + if !podValidationFilter(pod) { + return nil + } + priority := pod.Spec.PriorityClassName if priority != "system-cluster-critical" && priority != "system-node-critical" { return nil } + if pod.Status.Phase == v1.PodSucceeded { return nil } @@ -275,12 +304,16 @@ func (v 
*ValidationCluster) collectPodFailures(ctx context.Context, client kuber return nil } -func (v *ValidationCluster) validateNodes(cloudGroups map[string]*cloudinstances.CloudInstanceGroup, groups []*kops.InstanceGroup) ([]v1.Node, map[string]*kops.InstanceGroup) { +func (v *ValidationCluster) validateNodes(cloudGroups map[string]*cloudinstances.CloudInstanceGroup, groups []*kops.InstanceGroup, shouldValidateInstanceGroup func(ig *kops.InstanceGroup) bool) ([]v1.Node, map[string]*kops.InstanceGroup) { var readyNodes []v1.Node groupsSeen := map[string]bool{} nodeInstanceGroupMapping := map[string]*kops.InstanceGroup{} for _, cloudGroup := range cloudGroups { + if cloudGroup.InstanceGroup != nil && !shouldValidateInstanceGroup(cloudGroup.InstanceGroup) { + continue + } + var allMembers []*cloudinstances.CloudInstance allMembers = append(allMembers, cloudGroup.Ready...) allMembers = append(allMembers, cloudGroup.NeedUpdate...) @@ -372,6 +405,10 @@ func (v *ValidationCluster) validateNodes(cloudGroups map[string]*cloudinstances } for _, ig := range groups { + if !shouldValidateInstanceGroup(ig) { + continue + } + if !groupsSeen[ig.Name] { v.addError(&ValidationError{ Kind: "InstanceGroup", diff --git a/pkg/validation/validate_cluster_test.go b/pkg/validation/validate_cluster_test.go index 821c692b55c6a..d747ef900a933 100644 --- a/pkg/validation/validate_cluster_test.go +++ b/pkg/validation/validate_cluster_test.go @@ -17,6 +17,7 @@ limitations under the License. 
package validation import ( + "context" "fmt" "testing" @@ -70,6 +71,8 @@ func (c *MockCloud) GetCloudGroups(cluster *kopsapi.Cluster, instancegroups []*k } func testValidate(t *testing.T, groups map[string]*cloudinstances.CloudInstanceGroup, objects []runtime.Object) (*ValidationCluster, error) { + ctx := context.TODO() + cluster := &kopsapi.Cluster{ ObjectMeta: metav1.ObjectMeta{Name: "testcluster.k8s.local"}, Spec: kopsapi.ClusterSpec{ @@ -130,14 +133,16 @@ func testValidate(t *testing.T, groups map[string]*cloudinstances.CloudInstanceG restConfig := &rest.Config{ Host: "https://api.testcluster.k8s.local", } - validator, err := NewClusterValidator(cluster, mockcloud, &kopsapi.InstanceGroupList{Items: instanceGroups}, restConfig, fake.NewSimpleClientset(objects...)) + validator, err := NewClusterValidator(cluster, mockcloud, &kopsapi.InstanceGroupList{Items: instanceGroups}, nil, nil, restConfig, fake.NewSimpleClientset(objects...)) if err != nil { return nil, err } - return validator.Validate() + return validator.Validate(ctx) } func Test_ValidateCloudGroupMissing(t *testing.T) { + ctx := context.TODO() + cluster := &kopsapi.Cluster{ ObjectMeta: metav1.ObjectMeta{Name: "testcluster.k8s.local"}, Spec: kopsapi.ClusterSpec{ @@ -163,9 +168,9 @@ func Test_ValidateCloudGroupMissing(t *testing.T) { restConfig := &rest.Config{ Host: "https://api.testcluster.k8s.local", } - validator, err := NewClusterValidator(cluster, mockcloud, &kopsapi.InstanceGroupList{Items: instanceGroups}, restConfig, fake.NewSimpleClientset()) + validator, err := NewClusterValidator(cluster, mockcloud, &kopsapi.InstanceGroupList{Items: instanceGroups}, nil, nil, restConfig, fake.NewSimpleClientset()) require.NoError(t, err) - v, err := validator.Validate() + v, err := validator.Validate(ctx) require.NoError(t, err) if !assert.Len(t, v.Failures, 1) || !assert.Equal(t, &ValidationError{