From 9db32d0d4008c8209761a5a67a416bc6e8b4b28e Mon Sep 17 00:00:00 2001 From: Michael Montgomery Date: Wed, 15 Oct 2025 16:36:19 -0500 Subject: [PATCH 01/39] Upgrade masters last when upgrading ES clusters Signed-off-by: Michael Montgomery --- .../elasticsearch/driver/upgrade.go | 6 + .../elasticsearch/driver/upscale.go | 148 ++++++++++++-- .../elasticsearch/driver/upscale_test.go | 181 ++++++++++++++++++ 3 files changed, 315 insertions(+), 20 deletions(-) diff --git a/pkg/controller/elasticsearch/driver/upgrade.go b/pkg/controller/elasticsearch/driver/upgrade.go index 5790f61fd23..6141419a209 100644 --- a/pkg/controller/elasticsearch/driver/upgrade.go +++ b/pkg/controller/elasticsearch/driver/upgrade.go @@ -207,6 +207,12 @@ func isVersionUpgrade(es esv1.Elasticsearch) (bool, error) { if err != nil { return false, err } + + // If status version is empty, this is a new cluster, not an upgrade + if es.Status.Version == "" { + return false, nil + } + statusVersion, err := version.Parse(es.Status.Version) if err != nil { return false, err diff --git a/pkg/controller/elasticsearch/driver/upscale.go b/pkg/controller/elasticsearch/driver/upscale.go index 763f3a5ac6b..722e10c22d4 100644 --- a/pkg/controller/elasticsearch/driver/upscale.go +++ b/pkg/controller/elasticsearch/driver/upscale.go @@ -15,6 +15,7 @@ import ( "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/expectations" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/metadata" sset "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/statefulset" + "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/label" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/nodespec" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/reconcile" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/settings" @@ -66,33 +67,66 @@ func HandleUpscaleAndSpecChanges( if err != nil { return results, fmt.Errorf("adjust resources: %w", err) } - // reconcile all resources + + // Check if this is a version upgrade + isVersionUpgrade, err := isVersionUpgrade(ctx.es) + if err != nil { + return results, fmt.Errorf("while checking for version upgrade: %w", err) + } + + // If this is not a version upgrade, process all resources normally and return + if !isVersionUpgrade { + actualStatefulSets, requeue, err := reconcileResources(ctx, actualStatefulSets, adjusted) + if err != nil { + return results, fmt.Errorf("while reconciling resources: %w", err) + } + results.Requeue = requeue + results.ActualStatefulSets = actualStatefulSets + return results, nil + } + + // Version upgrade: separate master and non-master StatefulSets + var masterResources, nonMasterResources []nodespec.Resources for _, res := range adjusted { - res := res - if err := settings.ReconcileConfig(ctx.parentCtx, ctx.k8sClient, ctx.es, res.StatefulSet.Name, res.Config, ctx.meta); err != nil { - return results, fmt.Errorf("reconcile config: %w", err) + if label.IsMasterNodeSet(res.StatefulSet) { + masterResources = append(masterResources, res) + } else { + nonMasterResources = append(nonMasterResources, res) } - if _, err := common.ReconcileService(ctx.parentCtx, ctx.k8sClient, &res.HeadlessService, &ctx.es); err != nil { - return results, fmt.Errorf("reconcile service: %w", err) + } + + // First, reconcile all non-master resources + actualStatefulSets, requeue, err := reconcileResources(ctx, actualStatefulSets, nonMasterResources) + if err != nil { + return results, fmt.Errorf("while reconciling non-master resources: %w", err) + } + if requeue { + results.Requeue = true + results.ActualStatefulSets = actualStatefulSets + return results, nil + } + + // Check if all non-master StatefulSets have completed their upgrades before proceeding with master StatefulSets + if len(masterResources) > 0 { + allNonMastersUpgraded, err := areAllNonMasterStatefulSetsUpgraded(ctx.k8sClient, actualStatefulSets) + if err != nil { + return results, fmt.Errorf("while checking non-master upgrade status: %w", err) } - if actualSset, exists := actualStatefulSets.GetByName(res.StatefulSet.Name); exists { - recreateSset, err := handleVolumeExpansion(ctx.parentCtx, ctx.k8sClient, ctx.es, res.StatefulSet, actualSset, ctx.validateStorageClass) - if err != nil { - return results, fmt.Errorf("handle volume expansion: %w", err) - } - if recreateSset { - // The StatefulSet is scheduled for recreation: let's requeue before attempting any further spec change. - results.Requeue = true - continue - } + + if !allNonMastersUpgraded { + // Non-master StatefulSets are still upgrading, skipping master StatefulSets temporarily. + // This will cause a requeue, and master StatefulSets will attempt to be processed in the next reconciliation + results.ActualStatefulSets = actualStatefulSets + return results, nil } - reconciled, err := es_sset.ReconcileStatefulSet(ctx.parentCtx, ctx.k8sClient, ctx.es, res.StatefulSet, ctx.expectations) + + // All non-master StatefulSets are upgraded, now process master StatefulSets + actualStatefulSets, results.Requeue, err = reconcileResources(ctx, actualStatefulSets, masterResources) if err != nil { - return results, fmt.Errorf("reconcile StatefulSet: %w", err) + return results, fmt.Errorf("while reconciling master resources: %w", err) } - // update actual with the reconciled ones for next steps to work with up-to-date information - actualStatefulSets = actualStatefulSets.WithStatefulSet(reconciled) } + results.ActualStatefulSets = actualStatefulSets return results, nil } @@ -166,3 +200,77 @@ func adjustStatefulSetReplicas( return expected, nil } + +// reconcileResources handles the common StatefulSet reconciliation logic +// It returns: +// - the updated StatefulSets +// - whether a requeue is needed +// - any errors that occurred +func reconcileResources( + ctx upscaleCtx, + actualStatefulSets es_sset.StatefulSetList, + resources []nodespec.Resources, +) (es_sset.StatefulSetList, bool, error) { + requeue := false + for _, res := range resources { + res := res + if err := settings.ReconcileConfig(ctx.parentCtx, ctx.k8sClient, ctx.es, res.StatefulSet.Name, res.Config, ctx.meta); err != nil { + return actualStatefulSets, false, fmt.Errorf("reconcile config: %w", err) + } + if _, err := common.ReconcileService(ctx.parentCtx, ctx.k8sClient, &res.HeadlessService, &ctx.es); err != nil { + return actualStatefulSets, false, fmt.Errorf("reconcile service: %w", err) + } + if actualSset, exists := actualStatefulSets.GetByName(res.StatefulSet.Name); exists { + recreateSset, err := handleVolumeExpansion(ctx.parentCtx, ctx.k8sClient, ctx.es, res.StatefulSet, actualSset, ctx.validateStorageClass) + if err != nil { + return actualStatefulSets, false, fmt.Errorf("handle volume expansion: %w", err) + } + if recreateSset { + // The StatefulSet is scheduled for recreation: let's requeue before attempting any further spec change. + requeue = true + continue + } + } + reconciled, err := es_sset.ReconcileStatefulSet(ctx.parentCtx, ctx.k8sClient, ctx.es, res.StatefulSet, ctx.expectations) + if err != nil { + return actualStatefulSets, false, fmt.Errorf("reconcile StatefulSet: %w", err) + } + // update actual with the reconciled ones for next steps to work with up-to-date information + actualStatefulSets = actualStatefulSets.WithStatefulSet(reconciled) + } + return actualStatefulSets, requeue, nil +} + +// areAllNonMasterStatefulSetsUpgraded checks if all non-master StatefulSets have completed their upgrades +func areAllNonMasterStatefulSetsUpgraded( + client k8s.Client, + actualStatefulSets es_sset.StatefulSetList, +) (bool, error) { + for _, statefulSet := range actualStatefulSets { + // Skip master StatefulSets + if label.IsMasterNodeSet(statefulSet) { + continue + } + + // Check if this StatefulSet has pending updates + if statefulSet.Status.UpdatedReplicas != statefulSet.Status.Replicas { + return false, nil + } + + // Check if there are any pods that need to be upgraded + pods, err := es_sset.GetActualPodsForStatefulSet(client, k8s.ExtractNamespacedName(&statefulSet)) + if err != nil { + return false, err + } + + for _, pod := range pods { + // Check if pod revision matches StatefulSet update revision + if statefulSet.Status.UpdateRevision != "" && sset.PodRevision(pod) != statefulSet.Status.UpdateRevision { + // This pod still needs to be upgraded + return false, nil + } + } + } + + return true, nil +} diff --git a/pkg/controller/elasticsearch/driver/upscale_test.go b/pkg/controller/elasticsearch/driver/upscale_test.go index 7120fea7ea4..29cc3ccfeaa 100644 --- a/pkg/controller/elasticsearch/driver/upscale_test.go +++ b/pkg/controller/elasticsearch/driver/upscale_test.go @@ -569,3 +569,184 @@ func Test_adjustResources(t *testing.T) { }) } } + +func TestHandleUpscaleAndSpecChanges_VersionUpgradeMasterFirstFlow(t *testing.T) { + // Test the complete upgrade flow: data nodes upgrade first, then master nodes + es := esv1.Elasticsearch{ + ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "es"}, + Spec: esv1.ElasticsearchSpec{Version: "8.17.1"}, + Status: esv1.ElasticsearchStatus{Version: "8.16.2"}, // This makes it a version upgrade + } + k8sClient := k8s.NewFakeClient(&es) + ctx := upscaleCtx{ + k8sClient: k8sClient, + es: es, + esState: nil, + expectations: expectations.NewExpectations(k8sClient), + parentCtx: context.Background(), + } + + // Create expected resources with both master and data StatefulSets + expectedResources := nodespec.ResourcesList{ + { + StatefulSet: appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "master-sset", + Labels: map[string]string{ + "elasticsearch.k8s.elastic.co/node-master": "true", + }, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: ptr.To[int32](3), + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "elasticsearch.k8s.elastic.co/node-master": "true", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "elasticsearch", + Image: "docker.elastic.co/elasticsearch/elasticsearch:8.17.1", + }, + }, + }, + }, + }, + }, + HeadlessService: corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "master-sset", + }, + }, + Config: settings.CanonicalConfig{}, + }, + { + StatefulSet: appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "data-sset", + Labels: map[string]string{ + "elasticsearch.k8s.elastic.co/node-data": "true", + }, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: ptr.To[int32](2), + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "elasticsearch.k8s.elastic.co/node-data": "true", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "elasticsearch", + Image: "docker.elastic.co/elasticsearch/elasticsearch:8.17.1", + }, + }, + }, + }, + }, + }, + HeadlessService: corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "data-sset", + }, + }, + Config: settings.CanonicalConfig{}, + }, + } + + // Set template hash labels + expectedResources[0].StatefulSet.Labels = hash.SetTemplateHashLabel(expectedResources[0].StatefulSet.Labels, expectedResources[0].StatefulSet.Spec) + expectedResources[1].StatefulSet.Labels = hash.SetTemplateHashLabel(expectedResources[1].StatefulSet.Labels, expectedResources[1].StatefulSet.Spec) + + // Step 1: Initial call - both StatefulSets should be created + actualStatefulSets := es_sset.StatefulSetList{} + res, err := HandleUpscaleAndSpecChanges(ctx, actualStatefulSets, expectedResources) + require.NoError(t, err) + require.Len(t, res.ActualStatefulSets, 2) + + // Verify both StatefulSets were created + var masterSset appsv1.StatefulSet + require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "master-sset"}, &masterSset)) + require.Equal(t, ptr.To[int32](3), masterSset.Spec.Replicas) + require.Equal(t, "docker.elastic.co/elasticsearch/elasticsearch:8.17.1", masterSset.Spec.Template.Spec.Containers[0].Image) + + var dataSset appsv1.StatefulSet + require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "data-sset"}, &dataSset)) + require.Equal(t, ptr.To[int32](2), dataSset.Spec.Replicas) + require.Equal(t, "docker.elastic.co/elasticsearch/elasticsearch:8.17.1", dataSset.Spec.Template.Spec.Containers[0].Image) + + // Step 2: Simulate that data StatefulSet is still upgrading (UpdatedReplicas < Replicas) + dataSset.Status.UpdatedReplicas = 1 // Still upgrading + require.NoError(t, k8sClient.Update(context.Background(), &dataSset)) + + // Step 3: Call HandleUpscaleAndSpecChanges again - master should NOT be updated yet + // Get fresh copy of ES resource to avoid modification conflicts + require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "es"}, &es)) + ctx.es = es + actualStatefulSets = res.ActualStatefulSets + res, err = HandleUpscaleAndSpecChanges(ctx, actualStatefulSets, expectedResources) + require.NoError(t, err) + + // Verify master StatefulSet is still not updated (should have old image) + require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "master-sset"}, &masterSset)) + // The master StatefulSet should still have the old image since data nodes are still upgrading + require.Equal(t, "docker.elastic.co/elasticsearch/elasticsearch:8.17.1", masterSset.Spec.Template.Spec.Containers[0].Image) + + // Step 4: Simulate that data StatefulSet upgrade is complete + dataSset.Status.UpdatedReplicas = 2 // Upgrade complete + dataSset.Status.UpdateRevision = "data-sset-12345" // Set update revision + require.NoError(t, k8sClient.Update(context.Background(), &dataSset)) + + // Create pods for the data StatefulSet with the new revision + dataPods := []corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "data-sset-0", + Namespace: "ns", + Labels: map[string]string{ + "elasticsearch.k8s.elastic.co/node-data": "true", + "controller-revision-hash": "data-sset-12345", + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "data-sset-1", + Namespace: "ns", + Labels: map[string]string{ + "elasticsearch.k8s.elastic.co/node-data": "true", + "controller-revision-hash": "data-sset-12345", + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + }, + } + for _, pod := range dataPods { + require.NoError(t, k8sClient.Create(context.Background(), &pod)) + } + + // Step 5: Call HandleUpscaleAndSpecChanges again - now master should be updated + // Get fresh copy of ES resource to avoid modification conflicts + require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "es"}, &es)) + ctx.es = es + actualStatefulSets = res.ActualStatefulSets + res, err = HandleUpscaleAndSpecChanges(ctx, actualStatefulSets, expectedResources) + require.NoError(t, err) + + // Verify master StatefulSet is now updated + require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "master-sset"}, &masterSset)) + require.Equal(t, "docker.elastic.co/elasticsearch/elasticsearch:8.17.1", masterSset.Spec.Template.Spec.Containers[0].Image) +} From 39b27027baade93a6a1a6773768c636441c0fb87 Mon Sep 17 00:00:00 2001 From: Michael Montgomery Date: Thu, 23 Oct 2025 14:03:37 -0500 Subject: [PATCH 02/39] Fix lint issue Signed-off-by: Michael Montgomery --- pkg/controller/elasticsearch/driver/upscale_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller/elasticsearch/driver/upscale_test.go b/pkg/controller/elasticsearch/driver/upscale_test.go index 29cc3ccfeaa..9bb0723105f 100644 --- a/pkg/controller/elasticsearch/driver/upscale_test.go +++ b/pkg/controller/elasticsearch/driver/upscale_test.go @@ -743,7 +743,7 @@ func TestHandleUpscaleAndSpecChanges_VersionUpgradeMasterFirstFlow(t *testing.T) require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "es"}, &es)) ctx.es = es actualStatefulSets = res.ActualStatefulSets - res, err = HandleUpscaleAndSpecChanges(ctx, actualStatefulSets, expectedResources) + _, err = HandleUpscaleAndSpecChanges(ctx, actualStatefulSets, expectedResources) require.NoError(t, err) // Verify master StatefulSet is now updated From 50b395450c54f0c82293af048cac31f24ec091b9 Mon Sep 17 00:00:00 2001 From: Michael Montgomery Date: Tue, 28 Oct 2025 14:49:03 -0500 Subject: [PATCH 03/39] Add e2e test for upgrade order. Signed-off-by: Michael Montgomery --- test/e2e/es/non_master_first_upgrade_test.go | 99 ++++++++++++++++++++ 1 file changed, 99 insertions(+) create mode 100644 test/e2e/es/non_master_first_upgrade_test.go diff --git a/test/e2e/es/non_master_first_upgrade_test.go b/test/e2e/es/non_master_first_upgrade_test.go new file mode 100644 index 00000000000..fd19ccd9caa --- /dev/null +++ b/test/e2e/es/non_master_first_upgrade_test.go @@ -0,0 +1,99 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +//go:build es || e2e + +package es + +import ( + "fmt" + "testing" + "time" + + esv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/elasticsearch/v1" + "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/label" + essset "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/sset" + "github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s" + "github.com/elastic/cloud-on-k8s/v3/test/e2e/test" + "github.com/elastic/cloud-on-k8s/v3/test/e2e/test/elasticsearch" +) + +// NewNonMasterFirstUpgradeWatcher creates a watcher that monitors StatefulSet upgrade order +// and ensures non-master StatefulSets upgrade before master StatefulSets +func NewNonMasterFirstUpgradeWatcher(es esv1.Elasticsearch) test.Watcher { + var violations []string + + return test.NewWatcher( + "watch StatefulSet upgrade order: non-master StatefulSets should upgrade before master StatefulSets", + 2*time.Second, + func(k *test.K8sClient, t *testing.T) { + statefulSets, err := essset.RetrieveActualStatefulSets(k.Client, k8s.ExtractNamespacedName(&es)) + if err != nil { + t.Logf("failed to get StatefulSets: %v", err) + return + } + + // Check if any master StatefulSet has it's version higher than any non-master StatefulSet + // which indicates that the master StatefulSet is upgrading before the non-master StatefulSets + for _, sset := range statefulSets { + masterSTSVersion, err := essset.GetESVersion(sset) + if err != nil { + t.Logf("failed to get StatefulSet version: %v", err) + continue + } + if !label.IsMasterNodeSet(sset) { + continue + } + // Ensure that the master StatefulSet never has a version higher than any non-master StatefulSet. + for _, otherSset := range statefulSets { + otherSsetVersion, err := essset.GetESVersion(otherSset) + if err != nil { + t.Logf("failed to get StatefulSet version: %v", err) + continue + } + if masterSTSVersion.GT(otherSsetVersion) { + violations = append(violations, fmt.Sprintf("master StatefulSet %s has a version higher than non-master StatefulSet %s", sset.Name, otherSset.Name)) + } + } + } + }, + func(k *test.K8sClient, t *testing.T) { + if len(violations) > 0 { + t.Errorf("%d non-master first upgrade order violations detected", len(violations)) + } + }) +} + +// runNonMasterFirstUpgradeTest runs the complete test for non-master first upgrade behavior +func runNonMasterFirstUpgradeTest(t *testing.T, initial, mutated elasticsearch.Builder) { + watcher := NewNonMasterFirstUpgradeWatcher(initial.Elasticsearch) + + test.RunMutationsWhileWatching( + t, + []test.Builder{initial}, + []test.Builder{mutated}, + []test.Watcher{watcher}, + ) +} + +// TestNonMasterFirstUpgradeComplexTopology tests the non-master first upgrade behavior with a complex topology +func TestNonMasterFirstUpgradeComplexTopology(t *testing.T) { + srcVersion, dstVersion := test.GetUpgradePathTo8x(test.Ctx().ElasticStackVersion) + + test.SkipInvalidUpgrade(t, srcVersion, dstVersion) + + initial := elasticsearch.NewBuilder("test-non-master-first-complex"). + WithVersion(srcVersion). + WithESMasterNodes(3, elasticsearch.DefaultResources). + WithESDataNodes(2, elasticsearch.DefaultResources). + WithESCoordinatingNodes(1, elasticsearch.DefaultResources) + + mutated := initial.WithNoESTopology(). + WithVersion(dstVersion). + WithESMasterNodes(3, elasticsearch.DefaultResources). + WithESDataNodes(2, elasticsearch.DefaultResources). + WithESCoordinatingNodes(1, elasticsearch.DefaultResources) + + runNonMasterFirstUpgradeTest(t, initial, mutated) +} From 00555c21004e36c4d528bf9d94d6a0f296c523a7 Mon Sep 17 00:00:00 2001 From: Michael Montgomery Date: Tue, 28 Oct 2025 21:00:21 -0500 Subject: [PATCH 04/39] unexport things in e2e tests Signed-off-by: Michael Montgomery --- test/e2e/es/non_master_first_upgrade_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/e2e/es/non_master_first_upgrade_test.go b/test/e2e/es/non_master_first_upgrade_test.go index fd19ccd9caa..46da1bcad25 100644 --- a/test/e2e/es/non_master_first_upgrade_test.go +++ b/test/e2e/es/non_master_first_upgrade_test.go @@ -19,9 +19,9 @@ import ( "github.com/elastic/cloud-on-k8s/v3/test/e2e/test/elasticsearch" ) -// NewNonMasterFirstUpgradeWatcher creates a watcher that monitors StatefulSet upgrade order +// newNonMasterFirstUpgradeWatcher creates a watcher that monitors StatefulSet upgrade order // and ensures non-master StatefulSets upgrade before master StatefulSets -func NewNonMasterFirstUpgradeWatcher(es esv1.Elasticsearch) test.Watcher { +func newNonMasterFirstUpgradeWatcher(es esv1.Elasticsearch) test.Watcher { var violations []string return test.NewWatcher( @@ -67,7 +67,7 @@ func NewNonMasterFirstUpgradeWatcher(es esv1.Elasticsearch) test.Watcher { // runNonMasterFirstUpgradeTest runs the complete test for non-master first upgrade behavior func runNonMasterFirstUpgradeTest(t *testing.T, initial, mutated elasticsearch.Builder) { - watcher := NewNonMasterFirstUpgradeWatcher(initial.Elasticsearch) + watcher := newNonMasterFirstUpgradeWatcher(initial.Elasticsearch) test.RunMutationsWhileWatching( t, From 88cb347c03c786d94f6c58ba3deb2faa6ad0f59e Mon Sep 17 00:00:00 2001 From: Michael Montgomery Date: Wed, 29 Oct 2025 09:47:54 -0500 Subject: [PATCH 05/39] Also look at the current/target version while determining whether sts is updated. Signed-off-by: Michael Montgomery --- .../elasticsearch/driver/upscale.go | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/pkg/controller/elasticsearch/driver/upscale.go b/pkg/controller/elasticsearch/driver/upscale.go index 722e10c22d4..4c6e7fdadca 100644 --- a/pkg/controller/elasticsearch/driver/upscale.go +++ b/pkg/controller/elasticsearch/driver/upscale.go @@ -15,6 +15,7 @@ import ( "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/expectations" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/metadata" sset "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/statefulset" + "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/version" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/label" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/nodespec" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/reconcile" @@ -85,6 +86,11 @@ func HandleUpscaleAndSpecChanges( return results, nil } + targetVersion, err := version.Parse(ctx.es.Spec.Version) + if err != nil { + return results, fmt.Errorf("while parsing Elasticsearch upgrade target version: %w", err) + } + // Version upgrade: separate master and non-master StatefulSets var masterResources, nonMasterResources []nodespec.Resources for _, res := range adjusted { @@ -108,7 +114,7 @@ func HandleUpscaleAndSpecChanges( // Check if all non-master StatefulSets have completed their upgrades before proceeding with master StatefulSets if len(masterResources) > 0 { - allNonMastersUpgraded, err := areAllNonMasterStatefulSetsUpgraded(ctx.k8sClient, actualStatefulSets) + allNonMastersUpgraded, err := areAllNonMasterStatefulSetsUpgraded(ctx.k8sClient, actualStatefulSets, targetVersion) if err != nil { return results, fmt.Errorf("while checking non-master upgrade status: %w", err) } @@ -245,6 +251,7 @@ func reconcileResources( func areAllNonMasterStatefulSetsUpgraded( client k8s.Client, actualStatefulSets es_sset.StatefulSetList, + targetVersion version.Version, ) (bool, error) { for _, statefulSet := range actualStatefulSets { // Skip master StatefulSets @@ -252,6 +259,16 @@ func areAllNonMasterStatefulSetsUpgraded( continue } + // If the StatefulSet is not at the target version, it is not upgraded + // so don't even bother looking at the state/status of the StatefulSet. + actualVersion, err := es_sset.GetESVersion(statefulSet) + if err != nil { + return false, err + } + if actualVersion.LT(targetVersion) { + return false, nil + } + // Check if this StatefulSet has pending updates if statefulSet.Status.UpdatedReplicas != statefulSet.Status.Replicas { return false, nil From 790d3f19f4a50cecc9645c7ea71710c2903ac98d Mon Sep 17 00:00:00 2001 From: Michael Montgomery Date: Wed, 29 Oct 2025 13:46:53 -0500 Subject: [PATCH 06/39] Fix tests Signed-off-by: Michael Montgomery --- .../elasticsearch/driver/upscale_test.go | 29 ++++++++++++++----- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/pkg/controller/elasticsearch/driver/upscale_test.go b/pkg/controller/elasticsearch/driver/upscale_test.go index 9bb0723105f..94f32f84c4a 100644 --- a/pkg/controller/elasticsearch/driver/upscale_test.go +++ b/pkg/controller/elasticsearch/driver/upscale_test.go @@ -197,7 +197,6 @@ func TestHandleUpscaleAndSpecChanges(t *testing.T) { comparison.RequireEqual(t, &res.ActualStatefulSets[1], &sset2) // expectations should have been set require.NotEmpty(t, ctx.expectations.GetGenerations()) - // apply a spec change actualStatefulSets = es_sset.StatefulSetList{sset1, sset2} expectedResources[1].StatefulSet.Spec.Template.Labels = map[string]string{"a": "b"} @@ -573,9 +572,16 @@ func Test_adjustResources(t *testing.T) { func TestHandleUpscaleAndSpecChanges_VersionUpgradeMasterFirstFlow(t *testing.T) { // Test the complete upgrade flow: data nodes upgrade first, then master nodes es := esv1.Elasticsearch{ - ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "es"}, - Spec: esv1.ElasticsearchSpec{Version: "8.17.1"}, - Status: esv1.ElasticsearchStatus{Version: "8.16.2"}, // This makes it a version upgrade + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "es", + Annotations: map[string]string{ + "elasticsearch.k8s.elastic.co/initial-master-nodes": "node-1,node-2,node-3", + bootstrap.ClusterUUIDAnnotationName: "uuid", + }, + }, + Spec: esv1.ElasticsearchSpec{Version: "8.17.1"}, + Status: esv1.ElasticsearchStatus{Version: "8.16.2"}, // This makes it a version upgrade } k8sClient := k8s.NewFakeClient(&es) ctx := upscaleCtx{ @@ -595,6 +601,7 @@ func TestHandleUpscaleAndSpecChanges_VersionUpgradeMasterFirstFlow(t *testing.T) Name: "master-sset", Labels: map[string]string{ "elasticsearch.k8s.elastic.co/node-master": "true", + "elasticsearch.k8s.elastic.co/version": "8.17.1", }, }, Spec: appsv1.StatefulSetSpec{ @@ -603,6 +610,7 @@ func TestHandleUpscaleAndSpecChanges_VersionUpgradeMasterFirstFlow(t *testing.T) ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ "elasticsearch.k8s.elastic.co/node-master": "true", + "elasticsearch.k8s.elastic.co/version": "8.17.1", }, }, Spec: corev1.PodSpec{ @@ -622,7 +630,7 @@ func TestHandleUpscaleAndSpecChanges_VersionUpgradeMasterFirstFlow(t *testing.T) Name: "master-sset", }, }, - Config: settings.CanonicalConfig{}, + Config: settings.NewCanonicalConfig(), }, { StatefulSet: appsv1.StatefulSet{ @@ -631,6 +639,7 @@ func TestHandleUpscaleAndSpecChanges_VersionUpgradeMasterFirstFlow(t *testing.T) Name: "data-sset", Labels: map[string]string{ "elasticsearch.k8s.elastic.co/node-data": "true", + "elasticsearch.k8s.elastic.co/version": "8.17.1", }, }, Spec: appsv1.StatefulSetSpec{ @@ -639,6 +648,7 @@ func TestHandleUpscaleAndSpecChanges_VersionUpgradeMasterFirstFlow(t *testing.T) ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ "elasticsearch.k8s.elastic.co/node-data": "true", + "elasticsearch.k8s.elastic.co/version": "8.17.1", }, }, Spec: corev1.PodSpec{ @@ -658,7 +668,7 @@ func TestHandleUpscaleAndSpecChanges_VersionUpgradeMasterFirstFlow(t *testing.T) Name: "data-sset", }, }, - Config: settings.CanonicalConfig{}, + Config: settings.NewCanonicalConfig(), }, } @@ -675,12 +685,15 @@ func TestHandleUpscaleAndSpecChanges_VersionUpgradeMasterFirstFlow(t *testing.T) // Verify both StatefulSets were created var masterSset appsv1.StatefulSet require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "master-sset"}, &masterSset)) - require.Equal(t, ptr.To[int32](3), masterSset.Spec.Replicas) + require.NotNil(t, masterSset.Spec.Replicas) + // Master nodes/pods are limited to 1 creation at a time regardless of the replicas setting. + require.Equal(t, int32(1), *masterSset.Spec.Replicas) require.Equal(t, "docker.elastic.co/elasticsearch/elasticsearch:8.17.1", masterSset.Spec.Template.Spec.Containers[0].Image) var dataSset appsv1.StatefulSet require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "data-sset"}, &dataSset)) - require.Equal(t, ptr.To[int32](2), dataSset.Spec.Replicas) + require.NotNil(t, dataSset.Spec.Replicas) + require.Equal(t, int32(2), *dataSset.Spec.Replicas) require.Equal(t, "docker.elastic.co/elasticsearch/elasticsearch:8.17.1", dataSset.Spec.Template.Spec.Containers[0].Image) // Step 2: Simulate that data StatefulSet is still upgrading (UpdatedReplicas < Replicas) From d9885ba73b95b7772b44af207e2aa5221aa81887 Mon Sep 17 00:00:00 2001 From: Michael Montgomery Date: Wed, 29 Oct 2025 15:22:39 -0500 Subject: [PATCH 07/39] Fix the unit tests for master last upgrades Signed-off-by: Michael Montgomery --- .../elasticsearch/driver/upscale_test.go | 265 +++++++++++++++--- 1 file changed, 222 insertions(+), 43 deletions(-) diff --git a/pkg/controller/elasticsearch/driver/upscale_test.go b/pkg/controller/elasticsearch/driver/upscale_test.go index 94f32f84c4a..88e89e77d12 100644 --- a/pkg/controller/elasticsearch/driver/upscale_test.go +++ b/pkg/controller/elasticsearch/driver/upscale_test.go @@ -265,7 +265,8 @@ func TestHandleUpscaleAndSpecChanges_PVCResize(t *testing.T) { Spec: appsv1.StatefulSetSpec{ Replicas: ptr.To[int32](4), VolumeClaimTemplates: []corev1.PersistentVolumeClaim{ - {ObjectMeta: metav1.ObjectMeta{Name: "elasticsearch-data"}, + { + ObjectMeta: metav1.ObjectMeta{Name: "elasticsearch-data"}, Spec: corev1.PersistentVolumeClaimSpec{ Resources: corev1.VolumeResourceRequirements{ Requests: corev1.ResourceList{ @@ -569,7 +570,7 @@ func Test_adjustResources(t *testing.T) { } } -func TestHandleUpscaleAndSpecChanges_VersionUpgradeMasterFirstFlow(t *testing.T) { +func TestHandleUpscaleAndSpecChanges_VersionUpgradeDataFirstFlow(t *testing.T) { // Test the complete upgrade flow: data nodes upgrade first, then master nodes es := esv1.Elasticsearch{ ObjectMeta: metav1.ObjectMeta{ @@ -580,8 +581,8 @@ func TestHandleUpscaleAndSpecChanges_VersionUpgradeMasterFirstFlow(t *testing.T) bootstrap.ClusterUUIDAnnotationName: "uuid", }, }, - Spec: esv1.ElasticsearchSpec{Version: "8.17.1"}, - Status: esv1.ElasticsearchStatus{Version: "8.16.2"}, // This makes it a version upgrade + Spec: esv1.ElasticsearchSpec{Version: "8.16.2"}, // Start at 8.16.2 + Status: esv1.ElasticsearchStatus{Version: "8.16.2"}, // Start at 8.16.2 } k8sClient := k8s.NewFakeClient(&es) ctx := upscaleCtx{ @@ -592,7 +593,7 @@ func TestHandleUpscaleAndSpecChanges_VersionUpgradeMasterFirstFlow(t *testing.T) parentCtx: context.Background(), } - // Create expected resources with both master and data StatefulSets + // Create expected resources with both master and data StatefulSets at 8.16.2 expectedResources := nodespec.ResourcesList{ { StatefulSet: appsv1.StatefulSet{ @@ -601,7 +602,7 @@ func TestHandleUpscaleAndSpecChanges_VersionUpgradeMasterFirstFlow(t *testing.T) Name: "master-sset", Labels: map[string]string{ "elasticsearch.k8s.elastic.co/node-master": "true", - "elasticsearch.k8s.elastic.co/version": "8.17.1", + "elasticsearch.k8s.elastic.co/version": "8.16.2", }, }, Spec: appsv1.StatefulSetSpec{ @@ -610,14 +611,14 @@ func TestHandleUpscaleAndSpecChanges_VersionUpgradeMasterFirstFlow(t *testing.T) ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ "elasticsearch.k8s.elastic.co/node-master": "true", - "elasticsearch.k8s.elastic.co/version": "8.17.1", + "elasticsearch.k8s.elastic.co/version": "8.16.2", }, }, Spec: corev1.PodSpec{ Containers: []corev1.Container{ { Name: "elasticsearch", - Image: "docker.elastic.co/elasticsearch/elasticsearch:8.17.1", + Image: "docker.elastic.co/elasticsearch/elasticsearch:8.16.2", }, }, }, @@ -639,7 +640,7 @@ func TestHandleUpscaleAndSpecChanges_VersionUpgradeMasterFirstFlow(t *testing.T) Name: "data-sset", Labels: map[string]string{ "elasticsearch.k8s.elastic.co/node-data": "true", - "elasticsearch.k8s.elastic.co/version": "8.17.1", + "elasticsearch.k8s.elastic.co/version": "8.16.2", }, }, Spec: appsv1.StatefulSetSpec{ @@ -648,14 +649,14 @@ func TestHandleUpscaleAndSpecChanges_VersionUpgradeMasterFirstFlow(t *testing.T) ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ "elasticsearch.k8s.elastic.co/node-data": "true", - "elasticsearch.k8s.elastic.co/version": "8.17.1", + "elasticsearch.k8s.elastic.co/version": "8.16.2", }, }, Spec: corev1.PodSpec{ Containers: []corev1.Container{ { Name: "elasticsearch", - Image: "docker.elastic.co/elasticsearch/elasticsearch:8.17.1", + Image: "docker.elastic.co/elasticsearch/elasticsearch:8.16.2", }, }, }, @@ -676,49 +677,88 @@ func TestHandleUpscaleAndSpecChanges_VersionUpgradeMasterFirstFlow(t *testing.T) expectedResources[0].StatefulSet.Labels = hash.SetTemplateHashLabel(expectedResources[0].StatefulSet.Labels, expectedResources[0].StatefulSet.Spec) expectedResources[1].StatefulSet.Labels = hash.SetTemplateHashLabel(expectedResources[1].StatefulSet.Labels, expectedResources[1].StatefulSet.Spec) - // Step 1: Initial call - both StatefulSets should be created + // Call HandleUpscaleAndSpecChanges and check things are created properly actualStatefulSets := es_sset.StatefulSetList{} res, err := HandleUpscaleAndSpecChanges(ctx, actualStatefulSets, expectedResources) require.NoError(t, err) require.Len(t, res.ActualStatefulSets, 2) - // Verify both StatefulSets were created + // Verify both StatefulSets were created at 8.16.2 var masterSset appsv1.StatefulSet require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "master-sset"}, &masterSset)) require.NotNil(t, masterSset.Spec.Replicas) // Master nodes/pods are limited to 1 creation at a time regardless of the replicas setting. require.Equal(t, int32(1), *masterSset.Spec.Replicas) - require.Equal(t, "docker.elastic.co/elasticsearch/elasticsearch:8.17.1", masterSset.Spec.Template.Spec.Containers[0].Image) + require.Equal(t, "docker.elastic.co/elasticsearch/elasticsearch:8.16.2", masterSset.Spec.Template.Spec.Containers[0].Image) + + // Set master StatefulSet status to show it's fully deployed at 8.16.2 + // Also update the replicas to 3 to simulate full rollout at 8.16.2 + masterSset.Spec.Replicas = ptr.To[int32](3) + masterSset.Status.Replicas = 3 + masterSset.Status.UpdatedReplicas = 3 + masterSset.Status.CurrentRevision = "master-sset-old" + masterSset.Status.UpdateRevision = "master-sset-old" + require.NoError(t, k8sClient.Status().Update(context.Background(), &masterSset)) var dataSset appsv1.StatefulSet require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "data-sset"}, &dataSset)) require.NotNil(t, dataSset.Spec.Replicas) require.Equal(t, int32(2), *dataSset.Spec.Replicas) - require.Equal(t, "docker.elastic.co/elasticsearch/elasticsearch:8.17.1", dataSset.Spec.Template.Spec.Containers[0].Image) + require.Equal(t, "docker.elastic.co/elasticsearch/elasticsearch:8.16.2", dataSset.Spec.Template.Spec.Containers[0].Image) - // Step 2: Simulate that data StatefulSet is still upgrading (UpdatedReplicas < Replicas) - dataSset.Status.UpdatedReplicas = 1 // Still upgrading - require.NoError(t, k8sClient.Update(context.Background(), &dataSset)) + // Set data StatefulSet status to show it's fully deployed at 8.16.2 + dataSset.Status.Replicas = 2 + dataSset.Status.UpdatedReplicas = 2 + dataSset.Status.CurrentRevision = "data-sset-old" + dataSset.Status.UpdateRevision = "data-sset-old" + require.NoError(t, k8sClient.Status().Update(context.Background(), &dataSset)) - // Step 3: Call HandleUpscaleAndSpecChanges again - master should NOT be updated yet - // Get fresh copy of ES resource to avoid modification conflicts - require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "es"}, &es)) - ctx.es = es - actualStatefulSets = res.ActualStatefulSets - res, err = HandleUpscaleAndSpecChanges(ctx, actualStatefulSets, expectedResources) - require.NoError(t, err) - - // Verify master StatefulSet is still not updated (should have old image) - require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "master-sset"}, &masterSset)) - // The master StatefulSet should still have the old image since data nodes are still upgrading - require.Equal(t, "docker.elastic.co/elasticsearch/elasticsearch:8.17.1", masterSset.Spec.Template.Spec.Containers[0].Image) - - // Step 4: Simulate that data StatefulSet upgrade is complete - dataSset.Status.UpdatedReplicas = 2 // Upgrade complete - dataSset.Status.UpdateRevision = "data-sset-12345" // Set update revision - require.NoError(t, k8sClient.Update(context.Background(), &dataSset)) + // Create pods for both StatefulSets with the old revision + masterPods := []corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "master-sset-0", + Namespace: "ns", + Labels: map[string]string{ + "elasticsearch.k8s.elastic.co/node-master": "true", + "controller-revision-hash": "master-sset-old", + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "master-sset-1", + Namespace: "ns", + Labels: map[string]string{ + "elasticsearch.k8s.elastic.co/node-master": "true", + "controller-revision-hash": "master-sset-old", + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "master-sset-2", + Namespace: "ns", + Labels: map[string]string{ + "elasticsearch.k8s.elastic.co/node-master": "true", + "controller-revision-hash": "master-sset-old", + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + }, + } + for _, pod := range masterPods { + require.NoError(t, k8sClient.Create(context.Background(), &pod)) + } - // Create pods for the data StatefulSet with the new revision dataPods := []corev1.Pod{ { ObjectMeta: metav1.ObjectMeta{ @@ -726,7 +766,7 @@ func TestHandleUpscaleAndSpecChanges_VersionUpgradeMasterFirstFlow(t *testing.T) Namespace: "ns", Labels: map[string]string{ "elasticsearch.k8s.elastic.co/node-data": "true", - "controller-revision-hash": "data-sset-12345", + "controller-revision-hash": "data-sset-old", }, }, Status: corev1.PodStatus{ @@ -739,7 +779,7 @@ func TestHandleUpscaleAndSpecChanges_VersionUpgradeMasterFirstFlow(t *testing.T) Namespace: "ns", Labels: map[string]string{ "elasticsearch.k8s.elastic.co/node-data": "true", - "controller-revision-hash": "data-sset-12345", + "controller-revision-hash": "data-sset-old", }, }, Status: corev1.PodStatus{ @@ -751,15 +791,154 @@ func TestHandleUpscaleAndSpecChanges_VersionUpgradeMasterFirstFlow(t *testing.T) require.NoError(t, k8sClient.Create(context.Background(), &pod)) } - // Step 5: Call HandleUpscaleAndSpecChanges again - now master should be updated - // Get fresh copy of ES resource to avoid modification conflicts - require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "es"}, &es)) + // Update the ES object to 8.17.1 in k8s + es.Spec.Version = "8.17.1" + require.NoError(t, k8sClient.Update(context.Background(), &es)) ctx.es = es + + // Update actualStatefulSets to reflect the current state with status + require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "master-sset"}, &masterSset)) + require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "data-sset"}, &dataSset)) + actualStatefulSets = es_sset.StatefulSetList{masterSset, dataSset} + + // Update expected resources to 8.17.1 for the upgrade + expectedResourcesUpgrade := nodespec.ResourcesList{ + { + StatefulSet: appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "master-sset", + Labels: map[string]string{ + "elasticsearch.k8s.elastic.co/node-master": "true", + "elasticsearch.k8s.elastic.co/version": "8.17.1", + }, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: ptr.To[int32](3), + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "elasticsearch.k8s.elastic.co/node-master": "true", + "elasticsearch.k8s.elastic.co/version": "8.17.1", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "elasticsearch", + Image: "docker.elastic.co/elasticsearch/elasticsearch:8.17.1", + }, + }, + }, + }, + }, + }, + HeadlessService: corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "master-sset", + }, + }, + Config: settings.NewCanonicalConfig(), + }, + { + StatefulSet: appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "data-sset", + Labels: map[string]string{ + "elasticsearch.k8s.elastic.co/node-data": "true", + "elasticsearch.k8s.elastic.co/version": "8.17.1", + }, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: ptr.To[int32](2), + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "elasticsearch.k8s.elastic.co/node-data": "true", + "elasticsearch.k8s.elastic.co/version": "8.17.1", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "elasticsearch", + Image: "docker.elastic.co/elasticsearch/elasticsearch:8.17.1", + }, + }, + }, + }, + }, + }, + HeadlessService: corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "data-sset", + }, + }, + Config: settings.NewCanonicalConfig(), + }, + } + + // Set template hash labels for upgrade resources + expectedResourcesUpgrade[0].StatefulSet.Labels = hash.SetTemplateHashLabel(expectedResourcesUpgrade[0].StatefulSet.Labels, expectedResourcesUpgrade[0].StatefulSet.Spec) + expectedResourcesUpgrade[1].StatefulSet.Labels = hash.SetTemplateHashLabel(expectedResourcesUpgrade[1].StatefulSet.Labels, expectedResourcesUpgrade[1].StatefulSet.Spec) + + // Manually set the data StatefulSet status to show it's NOT fully upgraded + // This simulates the state after the StatefulSet controller has updated the spec but before the pods are updated + require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "data-sset"}, &dataSset)) + dataSset.Status.UpdatedReplicas = 0 // No replicas updated yet + dataSset.Status.Replicas = 2 // Total replicas + dataSset.Status.UpdateRevision = "data-sset-12345" // New revision (different from old) + require.NoError(t, k8sClient.Status().Update(context.Background(), &dataSset)) + + // Call HandleUpscaleAndSpecChanges and verify that both data upgrade has begun and master STS is not updated + res, err = HandleUpscaleAndSpecChanges(ctx, actualStatefulSets, expectedResourcesUpgrade) + require.NoError(t, err) + + // Update actualStatefulSets to reflect the current state + require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "data-sset"}, &dataSset)) + require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "master-sset"}, &masterSset)) + actualStatefulSets = es_sset.StatefulSetList{masterSset, dataSset} + + // Call HandleUpscaleAndSpecChanges - data STS should be updated, but master should NOT + res, err = HandleUpscaleAndSpecChanges(ctx, actualStatefulSets, expectedResourcesUpgrade) + require.NoError(t, err) + + // Verify data StatefulSet is updated to 8.17.1 + require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "data-sset"}, &dataSset)) + require.Equal(t, "docker.elastic.co/elasticsearch/elasticsearch:8.17.1", dataSset.Spec.Template.Spec.Containers[0].Image) + + // Verify master StatefulSet version hasn't changed yet (should still be 8.16.2) + // This is the key test - master should NOT be updated until data is fully upgraded + require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "master-sset"}, &masterSset)) + require.Equal(t, "docker.elastic.co/elasticsearch/elasticsearch:8.16.2", masterSset.Spec.Template.Spec.Containers[0].Image) + + // Update data STS and associated pods to show they are completely upgraded + require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "data-sset"}, &dataSset)) + dataSset.Status.UpdatedReplicas = 2 // All replicas updated + dataSset.Status.Replicas = 2 // Total replicas + dataSset.Status.UpdateRevision = "data-sset-12345" // Set update revision + require.NoError(t, k8sClient.Status().Update(context.Background(), &dataSset)) + + // Update the existing data pods to have the new revision + var pod0 corev1.Pod + require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "data-sset-0"}, &pod0)) + pod0.Labels["controller-revision-hash"] = "data-sset-12345" + require.NoError(t, k8sClient.Update(context.Background(), &pod0)) + + var pod1 corev1.Pod + require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "data-sset-1"}, &pod1)) + pod1.Labels["controller-revision-hash"] = "data-sset-12345" + require.NoError(t, k8sClient.Update(context.Background(), &pod1)) + + // Call HandleUpscaleAndSpecChanges and verify that master STS is now set to be upgraded actualStatefulSets = res.ActualStatefulSets - _, err = HandleUpscaleAndSpecChanges(ctx, actualStatefulSets, expectedResources) + _, err = HandleUpscaleAndSpecChanges(ctx, actualStatefulSets, expectedResourcesUpgrade) require.NoError(t, err) - // Verify master StatefulSet is now updated + // Verify master StatefulSet is now updated to 8.17.1 require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "master-sset"}, &masterSset)) require.Equal(t, "docker.elastic.co/elasticsearch/elasticsearch:8.17.1", masterSset.Spec.Template.Spec.Containers[0].Image) } From efa8643e7836d3a5a20d63cbc6319c123627fefc Mon Sep 17 00:00:00 2001 From: Michael Montgomery Date: Wed, 29 Oct 2025 18:03:24 -0500 Subject: [PATCH 08/39] fix linter Signed-off-by: Michael Montgomery --- pkg/controller/elasticsearch/driver/upscale_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller/elasticsearch/driver/upscale_test.go b/pkg/controller/elasticsearch/driver/upscale_test.go index 88e89e77d12..76e2e8ecce8 100644 --- a/pkg/controller/elasticsearch/driver/upscale_test.go +++ b/pkg/controller/elasticsearch/driver/upscale_test.go @@ -894,7 +894,7 @@ func TestHandleUpscaleAndSpecChanges_VersionUpgradeDataFirstFlow(t *testing.T) { require.NoError(t, k8sClient.Status().Update(context.Background(), &dataSset)) // Call HandleUpscaleAndSpecChanges and verify that both data upgrade has begun and master STS is not updated - res, err = HandleUpscaleAndSpecChanges(ctx, actualStatefulSets, expectedResourcesUpgrade) + _, err = HandleUpscaleAndSpecChanges(ctx, actualStatefulSets, expectedResourcesUpgrade) require.NoError(t, err) // Update actualStatefulSets to reflect the current state From 69147089ec9dc2e5c5b7cd3016f24906ac72e57e Mon Sep 17 00:00:00 2001 From: Michael Montgomery Date: Wed, 29 Oct 2025 18:05:36 -0500 Subject: [PATCH 09/39] move closer to use. Signed-off-by: Michael Montgomery --- pkg/controller/elasticsearch/driver/upscale.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/controller/elasticsearch/driver/upscale.go b/pkg/controller/elasticsearch/driver/upscale.go index 4c6e7fdadca..5ac3c2bc6a8 100644 --- a/pkg/controller/elasticsearch/driver/upscale.go +++ b/pkg/controller/elasticsearch/driver/upscale.go @@ -86,11 +86,6 @@ func HandleUpscaleAndSpecChanges( return results, nil } - targetVersion, err := version.Parse(ctx.es.Spec.Version) - if err != nil { - return results, fmt.Errorf("while parsing Elasticsearch upgrade target version: %w", err) - } - // Version upgrade: separate master and non-master StatefulSets var masterResources, nonMasterResources []nodespec.Resources for _, res := range adjusted { @@ -112,6 +107,11 @@ func HandleUpscaleAndSpecChanges( return results, nil } + targetVersion, err := version.Parse(ctx.es.Spec.Version) + if err != nil { + return results, fmt.Errorf("while parsing Elasticsearch upgrade target version: %w", err) + } + // Check if all non-master StatefulSets have completed their upgrades before proceeding with master StatefulSets if len(masterResources) > 0 { allNonMastersUpgraded, err := areAllNonMasterStatefulSetsUpgraded(ctx.k8sClient, actualStatefulSets, targetVersion) From 2dc664b2f66e8bc52abf6b78a19522ffde996f05 Mon Sep 17 00:00:00 2001 From: Michael Montgomery Date: Wed, 29 Oct 2025 18:07:39 -0500 Subject: [PATCH 10/39] Ensure requeue Signed-off-by: Michael Montgomery --- pkg/controller/elasticsearch/driver/upscale.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/controller/elasticsearch/driver/upscale.go b/pkg/controller/elasticsearch/driver/upscale.go index 5ac3c2bc6a8..cd9a5bb7a3c 100644 --- a/pkg/controller/elasticsearch/driver/upscale.go +++ b/pkg/controller/elasticsearch/driver/upscale.go @@ -123,6 +123,7 @@ func HandleUpscaleAndSpecChanges( // Non-master StatefulSets are still upgrading, skipping master StatefulSets temporarily. // This will cause a requeue, and master StatefulSets will attempt to be processed in the next reconciliation results.ActualStatefulSets = actualStatefulSets + results.Requeue = true return results, nil } From 46c726cea0de1cbdbaca334f5ed9172c29cfee2e Mon Sep 17 00:00:00 2001 From: Michael Montgomery Date: Wed, 29 Oct 2025 18:10:18 -0500 Subject: [PATCH 11/39] adjust comments Signed-off-by: Michael Montgomery --- pkg/controller/elasticsearch/driver/upscale_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/controller/elasticsearch/driver/upscale_test.go b/pkg/controller/elasticsearch/driver/upscale_test.go index 76e2e8ecce8..3bf905cc014 100644 --- a/pkg/controller/elasticsearch/driver/upscale_test.go +++ b/pkg/controller/elasticsearch/driver/upscale_test.go @@ -572,6 +572,7 @@ func Test_adjustResources(t *testing.T) { func TestHandleUpscaleAndSpecChanges_VersionUpgradeDataFirstFlow(t *testing.T) { // Test the complete upgrade flow: data nodes upgrade first, then master nodes + // starting at 8.16.2 and upgrading to 8.17.1 es := esv1.Elasticsearch{ ObjectMeta: metav1.ObjectMeta{ Namespace: "ns", @@ -581,8 +582,8 @@ func TestHandleUpscaleAndSpecChanges_VersionUpgradeDataFirstFlow(t *testing.T) { bootstrap.ClusterUUIDAnnotationName: "uuid", }, }, - Spec: esv1.ElasticsearchSpec{Version: "8.16.2"}, // Start at 8.16.2 - Status: esv1.ElasticsearchStatus{Version: "8.16.2"}, // Start at 8.16.2 + Spec: esv1.ElasticsearchSpec{Version: "8.16.2"}, + Status: esv1.ElasticsearchStatus{Version: "8.16.2"}, } k8sClient := k8s.NewFakeClient(&es) ctx := upscaleCtx{ From fccf6c38c84835358501beadfc03ead507773e59 Mon Sep 17 00:00:00 2001 From: Michael Montgomery Date: Wed, 29 Oct 2025 18:18:13 -0500 Subject: [PATCH 12/39] Adjust logging in e2e test Signed-off-by: Michael Montgomery --- test/e2e/es/non_master_first_upgrade_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/e2e/es/non_master_first_upgrade_test.go b/test/e2e/es/non_master_first_upgrade_test.go index 46da1bcad25..62191b01bfb 100644 --- a/test/e2e/es/non_master_first_upgrade_test.go +++ b/test/e2e/es/non_master_first_upgrade_test.go @@ -30,7 +30,7 @@ func newNonMasterFirstUpgradeWatcher(es esv1.Elasticsearch) test.Watcher { func(k *test.K8sClient, t *testing.T) { statefulSets, err := essset.RetrieveActualStatefulSets(k.Client, k8s.ExtractNamespacedName(&es)) if err != nil { - t.Logf("failed to get StatefulSets: %v", err) + t.Logf("failed to get StatefulSets: %s", err.Error()) return } @@ -39,7 +39,7 @@ func newNonMasterFirstUpgradeWatcher(es esv1.Elasticsearch) test.Watcher { for _, sset := range statefulSets { masterSTSVersion, err := essset.GetESVersion(sset) if err != nil { - t.Logf("failed to get StatefulSet version: %v", err) + t.Logf("failed to get StatefulSet version: %s", err.Error()) continue } if !label.IsMasterNodeSet(sset) { @@ -49,7 +49,7 @@ func newNonMasterFirstUpgradeWatcher(es esv1.Elasticsearch) test.Watcher { for _, otherSset := range statefulSets { otherSsetVersion, err := essset.GetESVersion(otherSset) if err != nil { - t.Logf("failed to get StatefulSet version: %v", err) + t.Logf("failed to get StatefulSet version: %s", err.Error()) continue } if masterSTSVersion.GT(otherSsetVersion) { From 8feef2469bac887b79af20f105f4a2434c962144 Mon Sep 17 00:00:00 2001 From: Michael Montgomery Date: Thu, 30 Oct 2025 15:21:39 -0500 Subject: [PATCH 13/39] Don't compare masters against other masters or themselves. Signed-off-by: Michael Montgomery --- test/e2e/es/non_master_first_upgrade_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/test/e2e/es/non_master_first_upgrade_test.go b/test/e2e/es/non_master_first_upgrade_test.go index 62191b01bfb..0b23dc3efe8 100644 --- a/test/e2e/es/non_master_first_upgrade_test.go +++ b/test/e2e/es/non_master_first_upgrade_test.go @@ -47,6 +47,10 @@ func newNonMasterFirstUpgradeWatcher(es esv1.Elasticsearch) test.Watcher { } // Ensure that the master StatefulSet never has a version higher than any non-master StatefulSet. for _, otherSset := range statefulSets { + // don't compare master against master. + if label.IsMasterNodeSet(otherSset) { + continue + } otherSsetVersion, err := essset.GetESVersion(otherSset) if err != nil { t.Logf("failed to get StatefulSet version: %s", err.Error()) From 0f5a31a2f54134e86a639663826f1f3b5262c749 Mon Sep 17 00:00:00 2001 From: Michael Montgomery Date: Thu, 30 Oct 2025 15:25:45 -0500 Subject: [PATCH 14/39] Fix spelling Signed-off-by: Michael Montgomery --- test/e2e/es/non_master_first_upgrade_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/e2e/es/non_master_first_upgrade_test.go b/test/e2e/es/non_master_first_upgrade_test.go index 0b23dc3efe8..a3979ca6ad9 100644 --- a/test/e2e/es/non_master_first_upgrade_test.go +++ b/test/e2e/es/non_master_first_upgrade_test.go @@ -34,7 +34,7 @@ func newNonMasterFirstUpgradeWatcher(es esv1.Elasticsearch) test.Watcher { return } - // Check if any master StatefulSet has it's version higher than any non-master StatefulSet + // Check if any master StatefulSet has its version higher than any non-master StatefulSet // which indicates that the master StatefulSet is upgrading before the non-master StatefulSets for _, sset := range statefulSets { masterSTSVersion, err := essset.GetESVersion(sset) From 030fe16ebef68f6a60d887cfcbd5a00f6d5e5f43 Mon Sep 17 00:00:00 2001 From: Michael Montgomery Date: Tue, 4 Nov 2025 11:03:02 -0600 Subject: [PATCH 15/39] Also check the generation/observedGeneration. Signed-off-by: Michael Montgomery --- pkg/controller/elasticsearch/driver/upscale.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/controller/elasticsearch/driver/upscale.go b/pkg/controller/elasticsearch/driver/upscale.go index 4c6e7fdadca..d4696e1d47a 100644 --- a/pkg/controller/elasticsearch/driver/upscale.go +++ b/pkg/controller/elasticsearch/driver/upscale.go @@ -269,6 +269,12 @@ func areAllNonMasterStatefulSetsUpgraded( return false, nil } + // If the StatefulSet observedGeneration is not in sync with the generation, + // then a change is in progress, and we should not consider it as upgraded. + if statefulSet.Generation != statefulSet.Status.ObservedGeneration { + return false, nil + } + // Check if this StatefulSet has pending updates if statefulSet.Status.UpdatedReplicas != statefulSet.Status.Replicas { return false, nil From 0db51d87fd5921bcfe5fef020ea54d07209770f9 Mon Sep 17 00:00:00 2001 From: Michael Montgomery Date: Wed, 5 Nov 2025 08:37:16 -0600 Subject: [PATCH 16/39] Debugging Signed-off-by: Michael Montgomery --- pkg/controller/elasticsearch/driver/upscale.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/pkg/controller/elasticsearch/driver/upscale.go b/pkg/controller/elasticsearch/driver/upscale.go index baed40b3992..4e50c68c9dd 100644 --- a/pkg/controller/elasticsearch/driver/upscale.go +++ b/pkg/controller/elasticsearch/driver/upscale.go @@ -24,6 +24,7 @@ import ( "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/version/zen1" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/version/zen2" "github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s" + ulog "github.com/elastic/cloud-on-k8s/v3/pkg/utils/log" ) type upscaleCtx struct { @@ -97,11 +98,15 @@ func HandleUpscaleAndSpecChanges( } // First, reconcile all non-master resources + ulog.FromContext(ctx.parentCtx).Info("Reconciling non-master resources", "resources", nonMasterResources) actualStatefulSets, requeue, err := reconcileResources(ctx, actualStatefulSets, nonMasterResources) if err != nil { + ulog.FromContext(ctx.parentCtx).Error(err, "while reconciling non-master resources") return results, fmt.Errorf("while reconciling non-master resources: %w", err) } + ulog.FromContext(ctx.parentCtx).Info("Non-master resources reconciled", "actualStatefulSets", actualStatefulSets, "requeue", requeue) if requeue { + ulog.FromContext(ctx.parentCtx).Info("Requeuing non-master resources", "actualStatefulSets", actualStatefulSets, "requeue", requeue) results.Requeue = true results.ActualStatefulSets = actualStatefulSets return results, nil @@ -109,17 +114,21 @@ func HandleUpscaleAndSpecChanges( targetVersion, err := version.Parse(ctx.es.Spec.Version) if err != nil { + ulog.FromContext(ctx.parentCtx).Error(err, "while parsing Elasticsearch upgrade target version") return results, fmt.Errorf("while parsing Elasticsearch upgrade target version: %w", err) } // Check if all non-master StatefulSets have completed their upgrades before proceeding with master StatefulSets if len(masterResources) > 0 { + ulog.FromContext(ctx.parentCtx).Info("Checking if all non-master StatefulSets have completed their upgrades", "masterResources", masterResources, "targetVersion", targetVersion) allNonMastersUpgraded, err := areAllNonMasterStatefulSetsUpgraded(ctx.k8sClient, actualStatefulSets, targetVersion) if err != nil { + ulog.FromContext(ctx.parentCtx).Error(err, "while checking non-master upgrade status") return results, fmt.Errorf("while checking non-master upgrade status: %w", err) } if !allNonMastersUpgraded { + ulog.FromContext(ctx.parentCtx).Info("Non-master StatefulSets are still upgrading, skipping master StatefulSets temporarily", "actualStatefulSets", actualStatefulSets, "requeue", true) // Non-master StatefulSets are still upgrading, skipping master StatefulSets temporarily. // This will cause a requeue, and master StatefulSets will attempt to be processed in the next reconciliation results.ActualStatefulSets = actualStatefulSets @@ -128,13 +137,17 @@ func HandleUpscaleAndSpecChanges( } // All non-master StatefulSets are upgraded, now process master StatefulSets + ulog.FromContext(ctx.parentCtx).Info("Reconciling master resources", "masterResources", masterResources) actualStatefulSets, results.Requeue, err = reconcileResources(ctx, actualStatefulSets, masterResources) if err != nil { + ulog.FromContext(ctx.parentCtx).Error(err, "while reconciling master resources") return results, fmt.Errorf("while reconciling master resources: %w", err) } + ulog.FromContext(ctx.parentCtx).Info("Master resources reconciled", "actualStatefulSets", actualStatefulSets, "requeue", results.Requeue) } results.ActualStatefulSets = actualStatefulSets + ulog.FromContext(ctx.parentCtx).Info("Upscale completed", "results", results) return results, nil } From 57c71a9a735a07a0359f1548bdd0caccb052ec5a Mon Sep 17 00:00:00 2001 From: Michael Montgomery Date: Wed, 5 Nov 2025 08:37:42 -0600 Subject: [PATCH 17/39] Remove useless if check. Signed-off-by: Michael Montgomery --- .../elasticsearch/driver/upscale.go | 44 +++++++++---------- 1 file changed, 21 insertions(+), 23 deletions(-) diff --git a/pkg/controller/elasticsearch/driver/upscale.go b/pkg/controller/elasticsearch/driver/upscale.go index 4e50c68c9dd..00a4bc8547d 100644 --- a/pkg/controller/elasticsearch/driver/upscale.go +++ b/pkg/controller/elasticsearch/driver/upscale.go @@ -119,32 +119,30 @@ func HandleUpscaleAndSpecChanges( } // Check if all non-master StatefulSets have completed their upgrades before proceeding with master StatefulSets - if len(masterResources) > 0 { - ulog.FromContext(ctx.parentCtx).Info("Checking if all non-master StatefulSets have completed their upgrades", "masterResources", masterResources, "targetVersion", targetVersion) - allNonMastersUpgraded, err := areAllNonMasterStatefulSetsUpgraded(ctx.k8sClient, actualStatefulSets, targetVersion) - if err != nil { - ulog.FromContext(ctx.parentCtx).Error(err, "while checking non-master upgrade status") - return results, fmt.Errorf("while checking non-master upgrade status: %w", err) - } + ulog.FromContext(ctx.parentCtx).Info("Checking if all non-master StatefulSets have completed their upgrades", "masterResources", masterResources, "targetVersion", targetVersion) + allNonMastersUpgraded, err := areAllNonMasterStatefulSetsUpgraded(ctx.k8sClient, actualStatefulSets, targetVersion) + if err != nil { + ulog.FromContext(ctx.parentCtx).Error(err, "while checking non-master upgrade status") + return results, fmt.Errorf("while checking non-master upgrade status: %w", err) + } - if !allNonMastersUpgraded { - ulog.FromContext(ctx.parentCtx).Info("Non-master StatefulSets are still upgrading, skipping master StatefulSets temporarily", "actualStatefulSets", actualStatefulSets, "requeue", true) - // Non-master StatefulSets are still upgrading, skipping master StatefulSets temporarily. - // This will cause a requeue, and master StatefulSets will attempt to be processed in the next reconciliation - results.ActualStatefulSets = actualStatefulSets - results.Requeue = true - return results, nil - } + if !allNonMastersUpgraded { + ulog.FromContext(ctx.parentCtx).Info("Non-master StatefulSets are still upgrading, skipping master StatefulSets temporarily", "actualStatefulSets", actualStatefulSets, "requeue", true) + // Non-master StatefulSets are still upgrading, skipping master StatefulSets temporarily. + // This will cause a requeue, and master StatefulSets will attempt to be processed in the next reconciliation + results.ActualStatefulSets = actualStatefulSets + results.Requeue = true + return results, nil + } - // All non-master StatefulSets are upgraded, now process master StatefulSets - ulog.FromContext(ctx.parentCtx).Info("Reconciling master resources", "masterResources", masterResources) - actualStatefulSets, results.Requeue, err = reconcileResources(ctx, actualStatefulSets, masterResources) - if err != nil { - ulog.FromContext(ctx.parentCtx).Error(err, "while reconciling master resources") - return results, fmt.Errorf("while reconciling master resources: %w", err) - } - ulog.FromContext(ctx.parentCtx).Info("Master resources reconciled", "actualStatefulSets", actualStatefulSets, "requeue", results.Requeue) + // All non-master StatefulSets are upgraded, now process master StatefulSets + ulog.FromContext(ctx.parentCtx).Info("Reconciling master resources", "masterResources", masterResources) + actualStatefulSets, results.Requeue, err = reconcileResources(ctx, actualStatefulSets, masterResources) + if err != nil { + ulog.FromContext(ctx.parentCtx).Error(err, "while reconciling master resources") + return results, fmt.Errorf("while reconciling master resources: %w", err) } + ulog.FromContext(ctx.parentCtx).Info("Master resources reconciled", "actualStatefulSets", actualStatefulSets, "requeue", results.Requeue) results.ActualStatefulSets = actualStatefulSets ulog.FromContext(ctx.parentCtx).Info("Upscale completed", "results", results) From c89e872d301e3e14cb9b3257700096438eb70f6a Mon Sep 17 00:00:00 2001 From: Michael Montgomery Date: Wed, 5 Nov 2025 08:51:38 -0600 Subject: [PATCH 18/39] More targeted debugging Signed-off-by: Michael Montgomery --- pkg/controller/elasticsearch/driver/upscale.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/controller/elasticsearch/driver/upscale.go b/pkg/controller/elasticsearch/driver/upscale.go index 00a4bc8547d..88b5ff86513 100644 --- a/pkg/controller/elasticsearch/driver/upscale.go +++ b/pkg/controller/elasticsearch/driver/upscale.go @@ -98,15 +98,15 @@ func HandleUpscaleAndSpecChanges( } // First, reconcile all non-master resources - ulog.FromContext(ctx.parentCtx).Info("Reconciling non-master resources", "resources", nonMasterResources) + ulog.FromContext(ctx.parentCtx).Info("Reconciling non-master resources") actualStatefulSets, requeue, err := reconcileResources(ctx, actualStatefulSets, nonMasterResources) if err != nil { ulog.FromContext(ctx.parentCtx).Error(err, "while reconciling non-master resources") return results, fmt.Errorf("while reconciling non-master resources: %w", err) } - ulog.FromContext(ctx.parentCtx).Info("Non-master resources reconciled", "actualStatefulSets", actualStatefulSets, "requeue", requeue) + ulog.FromContext(ctx.parentCtx).Info("Non-master resources reconciled", "requeue", requeue) if requeue { - ulog.FromContext(ctx.parentCtx).Info("Requeuing non-master resources", "actualStatefulSets", actualStatefulSets, "requeue", requeue) + ulog.FromContext(ctx.parentCtx).Info("Requeuing non-master resources", "requeue", requeue) results.Requeue = true results.ActualStatefulSets = actualStatefulSets return results, nil @@ -119,7 +119,7 @@ func HandleUpscaleAndSpecChanges( } // Check if all non-master StatefulSets have completed their upgrades before proceeding with master StatefulSets - ulog.FromContext(ctx.parentCtx).Info("Checking if all non-master StatefulSets have completed their upgrades", "masterResources", masterResources, "targetVersion", targetVersion) + ulog.FromContext(ctx.parentCtx).Info("Checking if all non-master StatefulSets have completed their upgrades", "targetVersion", targetVersion) allNonMastersUpgraded, err := areAllNonMasterStatefulSetsUpgraded(ctx.k8sClient, actualStatefulSets, targetVersion) if err != nil { ulog.FromContext(ctx.parentCtx).Error(err, "while checking non-master upgrade status") @@ -127,7 +127,7 @@ func HandleUpscaleAndSpecChanges( } if !allNonMastersUpgraded { - ulog.FromContext(ctx.parentCtx).Info("Non-master StatefulSets are still upgrading, skipping master StatefulSets temporarily", "actualStatefulSets", actualStatefulSets, "requeue", true) + ulog.FromContext(ctx.parentCtx).Info("Non-master StatefulSets are still upgrading, skipping master StatefulSets temporarily", "requeue", true) // Non-master StatefulSets are still upgrading, skipping master StatefulSets temporarily. // This will cause a requeue, and master StatefulSets will attempt to be processed in the next reconciliation results.ActualStatefulSets = actualStatefulSets @@ -136,13 +136,13 @@ func HandleUpscaleAndSpecChanges( } // All non-master StatefulSets are upgraded, now process master StatefulSets - ulog.FromContext(ctx.parentCtx).Info("Reconciling master resources", "masterResources", masterResources) + ulog.FromContext(ctx.parentCtx).Info("Reconciling master resources") actualStatefulSets, results.Requeue, err = reconcileResources(ctx, actualStatefulSets, masterResources) if err != nil { ulog.FromContext(ctx.parentCtx).Error(err, "while reconciling master resources") return results, fmt.Errorf("while reconciling master resources: %w", err) } - ulog.FromContext(ctx.parentCtx).Info("Master resources reconciled", "actualStatefulSets", actualStatefulSets, "requeue", results.Requeue) + ulog.FromContext(ctx.parentCtx).Info("Master resources reconciled", "requeue", results.Requeue) results.ActualStatefulSets = actualStatefulSets ulog.FromContext(ctx.parentCtx).Info("Upscale completed", "results", results) From 068fa54be020941a773f3e604d6211a5ec9506ea Mon Sep 17 00:00:00 2001 From: Michael Montgomery Date: Wed, 5 Nov 2025 09:03:53 -0600 Subject: [PATCH 19/39] More debugging Signed-off-by: Michael Montgomery --- pkg/controller/elasticsearch/driver/upscale.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/controller/elasticsearch/driver/upscale.go b/pkg/controller/elasticsearch/driver/upscale.go index 88b5ff86513..38493295a63 100644 --- a/pkg/controller/elasticsearch/driver/upscale.go +++ b/pkg/controller/elasticsearch/driver/upscale.go @@ -230,6 +230,7 @@ func reconcileResources( resources []nodespec.Resources, ) (es_sset.StatefulSetList, bool, error) { requeue := false + ulog.FromContext(ctx.parentCtx).Info("Reconciling resources", "resource_size", len(resources)) for _, res := range resources { res := res if err := settings.ReconcileConfig(ctx.parentCtx, ctx.k8sClient, ctx.es, res.StatefulSet.Name, res.Config, ctx.meta); err != nil { @@ -244,11 +245,15 @@ func reconcileResources( return actualStatefulSets, false, fmt.Errorf("handle volume expansion: %w", err) } if recreateSset { + ulog.FromContext(ctx.parentCtx).Info("StatefulSet is scheduled for recreation, requeuing", "name", res.StatefulSet.Name) // The StatefulSet is scheduled for recreation: let's requeue before attempting any further spec change. requeue = true continue } + } else if !exists { + ulog.FromContext(ctx.parentCtx).Info("StatefulSet does not exist", "name", res.StatefulSet.Name) } + ulog.FromContext(ctx.parentCtx).Info("Reconciling StatefulSet", "name", res.StatefulSet.Name) reconciled, err := es_sset.ReconcileStatefulSet(ctx.parentCtx, ctx.k8sClient, ctx.es, res.StatefulSet, ctx.expectations) if err != nil { return actualStatefulSets, false, fmt.Errorf("reconcile StatefulSet: %w", err) @@ -256,6 +261,7 @@ func reconcileResources( // update actual with the reconciled ones for next steps to work with up-to-date information actualStatefulSets = actualStatefulSets.WithStatefulSet(reconciled) } + ulog.FromContext(ctx.parentCtx).Info("Resources reconciled", "actualStatefulSets_size", len(actualStatefulSets), "requeue", requeue) return actualStatefulSets, requeue, nil } From 0af6b8516fa0bb6304769c2fbfe4c5cb3a15a6f4 Mon Sep 17 00:00:00 2001 From: Michael Montgomery Date: Wed, 5 Nov 2025 09:46:12 -0600 Subject: [PATCH 20/39] Debugging pod upgrade logic. Signed-off-by: Michael Montgomery --- pkg/controller/elasticsearch/driver/nodes.go | 2 +- pkg/controller/elasticsearch/driver/upgrade.go | 7 ++++++- pkg/controller/elasticsearch/driver/upgrade_forced.go | 2 +- pkg/controller/elasticsearch/driver/upgrade_test.go | 2 +- 4 files changed, 9 insertions(+), 4 deletions(-) diff --git a/pkg/controller/elasticsearch/driver/nodes.go b/pkg/controller/elasticsearch/driver/nodes.go index 65199fc48a8..7dee52d2e34 100644 --- a/pkg/controller/elasticsearch/driver/nodes.go +++ b/pkg/controller/elasticsearch/driver/nodes.go @@ -254,7 +254,7 @@ func (d *defaultDriver) isNodeSpecsReconciled(ctx context.Context, actualStatefu } // all pods should have been upgraded - pods, err := podsToUpgrade(client, actualStatefulSets) + pods, err := podsToUpgrade(ctx, client, actualStatefulSets) if err != nil { return false } diff --git a/pkg/controller/elasticsearch/driver/upgrade.go b/pkg/controller/elasticsearch/driver/upgrade.go index 6141419a209..2eee7d5d7ce 100644 --- a/pkg/controller/elasticsearch/driver/upgrade.go +++ b/pkg/controller/elasticsearch/driver/upgrade.go @@ -53,7 +53,7 @@ func (d *defaultDriver) handleUpgrades( if err != nil { return results.WithError(err) } - podsToUpgrade, err := podsToUpgrade(d.Client, statefulSets) + podsToUpgrade, err := podsToUpgrade(ctx, d.Client, statefulSets) if err != nil { return results.WithError(err) } @@ -249,6 +249,7 @@ func healthyPods( // podsToUpgrade returns all Pods of all StatefulSets where the controller-revision-hash label compared to the sset's // .status.updateRevision indicates that the Pod still needs to be deleted to be recreated with the new spec. func podsToUpgrade( + ctx context.Context, client k8s.Client, statefulSets es_sset.StatefulSetList, ) ([]corev1.Pod, error) { @@ -256,8 +257,10 @@ func podsToUpgrade( for _, statefulSet := range statefulSets { if statefulSet.Status.UpdateRevision == "" { // no upgrade scheduled + ulog.FromContext(ctx).Info("No upgrade scheduled as updateRevision is empty", "sset_name", statefulSet.Name) continue } + ulog.FromContext(ctx).Info("Inspecting pods for upgrade", "sset_name", statefulSet.Name) // Inspect each pod, starting from the highest ordinal, and decrement the idx to allow // pod upgrades to go through, controlled by the StatefulSet controller. for idx := sset.GetReplicas(statefulSet) - 1; idx >= 0; idx-- { @@ -271,9 +274,11 @@ func podsToUpgrade( return toUpgrade, err } if apierrors.IsNotFound(err) { + ulog.FromContext(ctx).Info("Pod does not exist, continuing loop", "sset_name", statefulSet.Name, "pod_name", podName) // Pod does not exist, continue the loop as the absence will be accounted by the deletion driver continue } + ulog.FromContext(ctx).Info("Pod exists, checking pod revision comparing to sts.updateRevision", "sset_name", statefulSet.Name, "pod_name", podName, "pod_revision", sset.PodRevision(pod), "sts_update_revision", statefulSet.Status.UpdateRevision) if sset.PodRevision(pod) != statefulSet.Status.UpdateRevision { toUpgrade = append(toUpgrade, pod) } diff --git a/pkg/controller/elasticsearch/driver/upgrade_forced.go b/pkg/controller/elasticsearch/driver/upgrade_forced.go index 506a602fe73..cf384f2ae6d 100644 --- a/pkg/controller/elasticsearch/driver/upgrade_forced.go +++ b/pkg/controller/elasticsearch/driver/upgrade_forced.go @@ -20,7 +20,7 @@ import ( func (d *defaultDriver) MaybeForceUpgrade(ctx context.Context, statefulSets sset.StatefulSetList) (bool, error) { // Get the pods to upgrade - podsToUpgrade, err := podsToUpgrade(d.Client, statefulSets) + podsToUpgrade, err := podsToUpgrade(ctx, d.Client, statefulSets) if err != nil { return false, err } diff --git a/pkg/controller/elasticsearch/driver/upgrade_test.go b/pkg/controller/elasticsearch/driver/upgrade_test.go index 3b3d15a9fd1..bf2ff8ae286 100644 --- a/pkg/controller/elasticsearch/driver/upgrade_test.go +++ b/pkg/controller/elasticsearch/driver/upgrade_test.go @@ -137,7 +137,7 @@ func Test_podsToUpgrade(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { client := k8s.NewFakeClient(tt.args.pods...) - got, err := podsToUpgrade(client, tt.args.statefulSets) + got, err := podsToUpgrade(context.Background(), client, tt.args.statefulSets) if (err != nil) != tt.wantErr { t.Errorf("podsToUpgrade() error = %v, wantErr %v", err, tt.wantErr) return From 54f97758ad5711dd945f74f1b01973097d06235a Mon Sep 17 00:00:00 2001 From: Michael Montgomery Date: Thu, 6 Nov 2025 10:06:16 -0600 Subject: [PATCH 21/39] Attempt fix for blocked sts upgrades Signed-off-by: Michael Montgomery --- pkg/controller/elasticsearch/driver/nodes.go | 3 ++ .../elasticsearch/driver/upscale.go | 33 +++++++++++-------- .../elasticsearch/reconcile/status.go | 14 ++++++++ 3 files changed, 36 insertions(+), 14 deletions(-) diff --git a/pkg/controller/elasticsearch/driver/nodes.go b/pkg/controller/elasticsearch/driver/nodes.go index 7dee52d2e34..39879ac1c53 100644 --- a/pkg/controller/elasticsearch/driver/nodes.go +++ b/pkg/controller/elasticsearch/driver/nodes.go @@ -125,6 +125,9 @@ func (d *defaultDriver) reconcileNodeSpecs( if reconcileState.HasPendingNewNodes() { results.WithReconciliationState(defaultRequeue.WithReason("Upscale in progress")) } + if reconcileState.HasPendingNonMasterSTSUpgrades() { + results.WithReconciliationState(defaultRequeue.WithReason("Non-master StatefulSets are still upgrading")) + } actualStatefulSets = upscaleResults.ActualStatefulSets // Once all the StatefulSets have been updated we can ensure that the former version of the transport certificates Secret is deleted. diff --git a/pkg/controller/elasticsearch/driver/upscale.go b/pkg/controller/elasticsearch/driver/upscale.go index 38493295a63..91a91ba4e7a 100644 --- a/pkg/controller/elasticsearch/driver/upscale.go +++ b/pkg/controller/elasticsearch/driver/upscale.go @@ -120,18 +120,18 @@ func HandleUpscaleAndSpecChanges( // Check if all non-master StatefulSets have completed their upgrades before proceeding with master StatefulSets ulog.FromContext(ctx.parentCtx).Info("Checking if all non-master StatefulSets have completed their upgrades", "targetVersion", targetVersion) - allNonMastersUpgraded, err := areAllNonMasterStatefulSetsUpgraded(ctx.k8sClient, actualStatefulSets, targetVersion) + pendingNonMasterSTS, err := findPendingNonMasterStatefulSetUpgrades(ctx.k8sClient, actualStatefulSets, targetVersion) if err != nil { ulog.FromContext(ctx.parentCtx).Error(err, "while checking non-master upgrade status") return results, fmt.Errorf("while checking non-master upgrade status: %w", err) } - if !allNonMastersUpgraded { + if len(pendingNonMasterSTS) > 0 { ulog.FromContext(ctx.parentCtx).Info("Non-master StatefulSets are still upgrading, skipping master StatefulSets temporarily", "requeue", true) // Non-master StatefulSets are still upgrading, skipping master StatefulSets temporarily. - // This will cause a requeue, and master StatefulSets will attempt to be processed in the next reconciliation + // This will cause a requeue in the caller, and master StatefulSets will attempt to be processed in the next reconciliation + ctx.upscaleReporter.RecordPendingNonMasterSTSUpgrades(pendingNonMasterSTS) results.ActualStatefulSets = actualStatefulSets - results.Requeue = true return results, nil } @@ -265,12 +265,13 @@ func reconcileResources( return actualStatefulSets, requeue, nil } -// areAllNonMasterStatefulSetsUpgraded checks if all non-master StatefulSets have completed their upgrades -func areAllNonMasterStatefulSetsUpgraded( +// findPendingNonMasterStatefulSetUpgrades finds all non-master StatefulSets that have not completed their upgrades +func findPendingNonMasterStatefulSetUpgrades( client k8s.Client, actualStatefulSets es_sset.StatefulSetList, targetVersion version.Version, -) (bool, error) { +) ([]appsv1.StatefulSet, error) { + pendingNonMasterSTS := make([]appsv1.StatefulSet, 0) for _, statefulSet := range actualStatefulSets { // Skip master StatefulSets if label.IsMasterNodeSet(statefulSet) { @@ -281,37 +282,41 @@ func areAllNonMasterStatefulSetsUpgraded( // so don't even bother looking at the state/status of the StatefulSet. actualVersion, err := es_sset.GetESVersion(statefulSet) if err != nil { - return false, err + return pendingNonMasterSTS, err } if actualVersion.LT(targetVersion) { - return false, nil + pendingNonMasterSTS = append(pendingNonMasterSTS, statefulSet) + continue } // If the StatefulSet observedGeneration is not in sync with the generation, // then a change is in progress, and we should not consider it as upgraded. if statefulSet.Generation != statefulSet.Status.ObservedGeneration { - return false, nil + pendingNonMasterSTS = append(pendingNonMasterSTS, statefulSet) + continue } // Check if this StatefulSet has pending updates if statefulSet.Status.UpdatedReplicas != statefulSet.Status.Replicas { - return false, nil + pendingNonMasterSTS = append(pendingNonMasterSTS, statefulSet) + continue } // Check if there are any pods that need to be upgraded pods, err := es_sset.GetActualPodsForStatefulSet(client, k8s.ExtractNamespacedName(&statefulSet)) if err != nil { - return false, err + return pendingNonMasterSTS, err } for _, pod := range pods { // Check if pod revision matches StatefulSet update revision if statefulSet.Status.UpdateRevision != "" && sset.PodRevision(pod) != statefulSet.Status.UpdateRevision { // This pod still needs to be upgraded - return false, nil + pendingNonMasterSTS = append(pendingNonMasterSTS, statefulSet) + continue } } } - return true, nil + return pendingNonMasterSTS, nil } diff --git a/pkg/controller/elasticsearch/reconcile/status.go b/pkg/controller/elasticsearch/reconcile/status.go index 77e5e224382..b00a0ecbd21 100644 --- a/pkg/controller/elasticsearch/reconcile/status.go +++ b/pkg/controller/elasticsearch/reconcile/status.go @@ -8,6 +8,7 @@ import ( "reflect" "sort" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/utils/ptr" @@ -60,6 +61,8 @@ func (s *StatusReporter) ReportCondition( type UpscaleReporter struct { // Expected nodes to be upscaled nodes map[string]esv1.NewNode + // Number of non-master StatefulSets that are still upgrading + nonMasterSTSUpgrades int } // RecordNewNodes records pending node creations. @@ -103,6 +106,17 @@ func (u *UpscaleReporter) HasPendingNewNodes() bool { return len(u.nodes) > 0 } +func (u *UpscaleReporter) HasPendingNonMasterSTSUpgrades() bool { + return u.nonMasterSTSUpgrades > 0 +} + +func (u *UpscaleReporter) RecordPendingNonMasterSTSUpgrades(pendingNonMasterSTS []appsv1.StatefulSet) { + if u == nil { + return + } + u.nonMasterSTSUpgrades = len(pendingNonMasterSTS) +} + // Merge creates a new upscale status using the reported upscale status and an existing upscale status. func (u *UpscaleReporter) Merge(other esv1.UpscaleOperation) esv1.UpscaleOperation { upscaleOperation := other.DeepCopy() From 5dfdd0599d170fa0e2f7f13d23caab0810af8e6e Mon Sep 17 00:00:00 2001 From: Michael Montgomery Date: Thu, 6 Nov 2025 14:26:18 -0600 Subject: [PATCH 22/39] Bug fix adding new master role to existing non-master sts. Signed-off-by: Michael Montgomery --- .../elasticsearch/driver/upscale.go | 32 +++++++++++-------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/pkg/controller/elasticsearch/driver/upscale.go b/pkg/controller/elasticsearch/driver/upscale.go index 91a91ba4e7a..c7fbf185f1f 100644 --- a/pkg/controller/elasticsearch/driver/upscale.go +++ b/pkg/controller/elasticsearch/driver/upscale.go @@ -119,8 +119,8 @@ func HandleUpscaleAndSpecChanges( } // Check if all non-master StatefulSets have completed their upgrades before proceeding with master StatefulSets - ulog.FromContext(ctx.parentCtx).Info("Checking if all non-master StatefulSets have completed their upgrades", "targetVersion", targetVersion) - pendingNonMasterSTS, err := findPendingNonMasterStatefulSetUpgrades(ctx.k8sClient, actualStatefulSets, targetVersion) + ulog.FromContext(ctx.parentCtx).Info("Checking if all expected non-master StatefulSets have completed their upgrades", "targetVersion", targetVersion) + pendingNonMasterSTS, err := findPendingNonMasterStatefulSetUpgrades(ctx.k8sClient, actualStatefulSets, expectedResources.StatefulSets(), targetVersion) if err != nil { ulog.FromContext(ctx.parentCtx).Error(err, "while checking non-master upgrade status") return results, fmt.Errorf("while checking non-master upgrade status: %w", err) @@ -269,50 +269,54 @@ func reconcileResources( func findPendingNonMasterStatefulSetUpgrades( client k8s.Client, actualStatefulSets es_sset.StatefulSetList, + expectedStatefulSets es_sset.StatefulSetList, targetVersion version.Version, ) ([]appsv1.StatefulSet, error) { pendingNonMasterSTS := make([]appsv1.StatefulSet, 0) - for _, statefulSet := range actualStatefulSets { - // Skip master StatefulSets - if label.IsMasterNodeSet(statefulSet) { + for _, actualStatefulSet := range actualStatefulSets { + expectedSset, _ := expectedStatefulSets.GetByName(actualStatefulSet.Name) + + // Skip master StatefulSets. We check both here because the master role may have been added + // to a non-master StatefulSet during the upgrade spec change. + if label.IsMasterNodeSet(actualStatefulSet) && label.IsMasterNodeSet(expectedSset) { continue } // If the StatefulSet is not at the target version, it is not upgraded // so don't even bother looking at the state/status of the StatefulSet. - actualVersion, err := es_sset.GetESVersion(statefulSet) + actualVersion, err := es_sset.GetESVersion(actualStatefulSet) if err != nil { return pendingNonMasterSTS, err } if actualVersion.LT(targetVersion) { - pendingNonMasterSTS = append(pendingNonMasterSTS, statefulSet) + pendingNonMasterSTS = append(pendingNonMasterSTS, actualStatefulSet) continue } // If the StatefulSet observedGeneration is not in sync with the generation, // then a change is in progress, and we should not consider it as upgraded. - if statefulSet.Generation != statefulSet.Status.ObservedGeneration { - pendingNonMasterSTS = append(pendingNonMasterSTS, statefulSet) + if actualStatefulSet.Generation != actualStatefulSet.Status.ObservedGeneration { + pendingNonMasterSTS = append(pendingNonMasterSTS, actualStatefulSet) continue } // Check if this StatefulSet has pending updates - if statefulSet.Status.UpdatedReplicas != statefulSet.Status.Replicas { - pendingNonMasterSTS = append(pendingNonMasterSTS, statefulSet) + if actualStatefulSet.Status.UpdatedReplicas != actualStatefulSet.Status.Replicas { + pendingNonMasterSTS = append(pendingNonMasterSTS, actualStatefulSet) continue } // Check if there are any pods that need to be upgraded - pods, err := es_sset.GetActualPodsForStatefulSet(client, k8s.ExtractNamespacedName(&statefulSet)) + pods, err := es_sset.GetActualPodsForStatefulSet(client, k8s.ExtractNamespacedName(&actualStatefulSet)) if err != nil { return pendingNonMasterSTS, err } for _, pod := range pods { // Check if pod revision matches StatefulSet update revision - if statefulSet.Status.UpdateRevision != "" && sset.PodRevision(pod) != statefulSet.Status.UpdateRevision { + if actualStatefulSet.Status.UpdateRevision != "" && sset.PodRevision(pod) != actualStatefulSet.Status.UpdateRevision { // This pod still needs to be upgraded - pendingNonMasterSTS = append(pendingNonMasterSTS, statefulSet) + pendingNonMasterSTS = append(pendingNonMasterSTS, actualStatefulSet) continue } } From edf3faffc9fd49bb7be7bd9f296bd6eb6a000557 Mon Sep 17 00:00:00 2001 From: Michael Montgomery Date: Fri, 7 Nov 2025 10:12:02 -0600 Subject: [PATCH 23/39] More debugging Signed-off-by: Michael Montgomery --- .../elasticsearch/driver/upscale.go | 23 ++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/pkg/controller/elasticsearch/driver/upscale.go b/pkg/controller/elasticsearch/driver/upscale.go index c7fbf185f1f..77e95256232 100644 --- a/pkg/controller/elasticsearch/driver/upscale.go +++ b/pkg/controller/elasticsearch/driver/upscale.go @@ -7,6 +7,7 @@ package driver import ( "context" "fmt" + "strings" appsv1 "k8s.io/api/apps/v1" @@ -96,6 +97,20 @@ func HandleUpscaleAndSpecChanges( nonMasterResources = append(nonMasterResources, res) } } + ulog.FromContext(ctx.parentCtx).Info("Master resources", "resources", func() string { + names := make([]string, 0, len(masterResources)) + for _, res := range masterResources { + names = append(names, res.StatefulSet.Name) + } + return strings.Join(names, ", ") + }()) + ulog.FromContext(ctx.parentCtx).Info("Non-master resources", "resources", func() string { + names := make([]string, 0, len(nonMasterResources)) + for _, res := range nonMasterResources { + names = append(names, res.StatefulSet.Name) + } + return strings.Join(names, ", ") + }()) // First, reconcile all non-master resources ulog.FromContext(ctx.parentCtx).Info("Reconciling non-master resources") @@ -127,7 +142,13 @@ func HandleUpscaleAndSpecChanges( } if len(pendingNonMasterSTS) > 0 { - ulog.FromContext(ctx.parentCtx).Info("Non-master StatefulSets are still upgrading, skipping master StatefulSets temporarily", "requeue", true) + ulog.FromContext(ctx.parentCtx).Info("Non-master StatefulSets are still upgrading, skipping master StatefulSets temporarily", "requeue", true, "pendingNonMasterSTS", func() string { + names := make([]string, 0, len(pendingNonMasterSTS)) + for _, sst := range pendingNonMasterSTS { + names = append(names, sst.Name) + } + return strings.Join(names, ", ") + }()) // Non-master StatefulSets are still upgrading, skipping master StatefulSets temporarily. // This will cause a requeue in the caller, and master StatefulSets will attempt to be processed in the next reconciliation ctx.upscaleReporter.RecordPendingNonMasterSTSUpgrades(pendingNonMasterSTS) From a6d8edc50070f05ecba8aed50162f4afe07c58f3 Mon Sep 17 00:00:00 2001 From: Michael Montgomery Date: Fri, 7 Nov 2025 11:04:03 -0600 Subject: [PATCH 24/39] Adjust to logical or Signed-off-by: Michael Montgomery --- pkg/controller/elasticsearch/driver/upscale.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller/elasticsearch/driver/upscale.go b/pkg/controller/elasticsearch/driver/upscale.go index 77e95256232..19918cb3d57 100644 --- a/pkg/controller/elasticsearch/driver/upscale.go +++ b/pkg/controller/elasticsearch/driver/upscale.go @@ -299,7 +299,7 @@ func findPendingNonMasterStatefulSetUpgrades( // Skip master StatefulSets. We check both here because the master role may have been added // to a non-master StatefulSet during the upgrade spec change. - if label.IsMasterNodeSet(actualStatefulSet) && label.IsMasterNodeSet(expectedSset) { + if label.IsMasterNodeSet(actualStatefulSet) || label.IsMasterNodeSet(expectedSset) { continue } From 8cfa06c48ce81c08c612047f327f75175d8c7373 Mon Sep 17 00:00:00 2001 From: Michael Montgomery Date: Mon, 17 Nov 2025 09:06:46 -0600 Subject: [PATCH 25/39] Simplify logic in HandleUpscaleAndSpecChanges Signed-off-by: Michael Montgomery --- pkg/controller/elasticsearch/driver/upscale.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/controller/elasticsearch/driver/upscale.go b/pkg/controller/elasticsearch/driver/upscale.go index 19918cb3d57..179e807a687 100644 --- a/pkg/controller/elasticsearch/driver/upscale.go +++ b/pkg/controller/elasticsearch/driver/upscale.go @@ -119,11 +119,11 @@ func HandleUpscaleAndSpecChanges( ulog.FromContext(ctx.parentCtx).Error(err, "while reconciling non-master resources") return results, fmt.Errorf("while reconciling non-master resources: %w", err) } - ulog.FromContext(ctx.parentCtx).Info("Non-master resources reconciled", "requeue", requeue) + results.ActualStatefulSets = actualStatefulSets + if requeue { ulog.FromContext(ctx.parentCtx).Info("Requeuing non-master resources", "requeue", requeue) results.Requeue = true - results.ActualStatefulSets = actualStatefulSets return results, nil } @@ -141,6 +141,8 @@ func HandleUpscaleAndSpecChanges( return results, fmt.Errorf("while checking non-master upgrade status: %w", err) } + ctx.upscaleReporter.RecordPendingNonMasterSTSUpgrades(pendingNonMasterSTS) + if len(pendingNonMasterSTS) > 0 { ulog.FromContext(ctx.parentCtx).Info("Non-master StatefulSets are still upgrading, skipping master StatefulSets temporarily", "requeue", true, "pendingNonMasterSTS", func() string { names := make([]string, 0, len(pendingNonMasterSTS)) @@ -151,8 +153,6 @@ func HandleUpscaleAndSpecChanges( }()) // Non-master StatefulSets are still upgrading, skipping master StatefulSets temporarily. // This will cause a requeue in the caller, and master StatefulSets will attempt to be processed in the next reconciliation - ctx.upscaleReporter.RecordPendingNonMasterSTSUpgrades(pendingNonMasterSTS) - results.ActualStatefulSets = actualStatefulSets return results, nil } From c2e116179eff83192b15ee98fadc82272b63215e Mon Sep 17 00:00:00 2001 From: Michael Montgomery Date: Mon, 10 Nov 2025 08:33:11 -0600 Subject: [PATCH 26/39] Remove debugging Signed-off-by: Michael Montgomery --- pkg/controller/elasticsearch/driver/nodes.go | 2 +- .../elasticsearch/driver/upgrade.go | 7 +--- .../elasticsearch/driver/upgrade_forced.go | 2 +- .../elasticsearch/driver/upgrade_test.go | 2 +- .../elasticsearch/driver/upscale.go | 32 ------------------- 5 files changed, 4 insertions(+), 41 deletions(-) diff --git a/pkg/controller/elasticsearch/driver/nodes.go b/pkg/controller/elasticsearch/driver/nodes.go index 39879ac1c53..545c6033484 100644 --- a/pkg/controller/elasticsearch/driver/nodes.go +++ b/pkg/controller/elasticsearch/driver/nodes.go @@ -257,7 +257,7 @@ func (d *defaultDriver) isNodeSpecsReconciled(ctx context.Context, actualStatefu } // all pods should have been upgraded - pods, err := podsToUpgrade(ctx, client, actualStatefulSets) + pods, err := podsToUpgrade(client, actualStatefulSets) if err != nil { return false } diff --git a/pkg/controller/elasticsearch/driver/upgrade.go b/pkg/controller/elasticsearch/driver/upgrade.go index 2eee7d5d7ce..6141419a209 100644 --- a/pkg/controller/elasticsearch/driver/upgrade.go +++ b/pkg/controller/elasticsearch/driver/upgrade.go @@ -53,7 +53,7 @@ func (d *defaultDriver) handleUpgrades( if err != nil { return results.WithError(err) } - podsToUpgrade, err := podsToUpgrade(ctx, d.Client, statefulSets) + podsToUpgrade, err := podsToUpgrade(d.Client, statefulSets) if err != nil { return results.WithError(err) } @@ -249,7 +249,6 @@ func healthyPods( // podsToUpgrade returns all Pods of all StatefulSets where the controller-revision-hash label compared to the sset's // .status.updateRevision indicates that the Pod still needs to be deleted to be recreated with the new spec. func podsToUpgrade( - ctx context.Context, client k8s.Client, statefulSets es_sset.StatefulSetList, ) ([]corev1.Pod, error) { @@ -257,10 +256,8 @@ func podsToUpgrade( for _, statefulSet := range statefulSets { if statefulSet.Status.UpdateRevision == "" { // no upgrade scheduled - ulog.FromContext(ctx).Info("No upgrade scheduled as updateRevision is empty", "sset_name", statefulSet.Name) continue } - ulog.FromContext(ctx).Info("Inspecting pods for upgrade", "sset_name", statefulSet.Name) // Inspect each pod, starting from the highest ordinal, and decrement the idx to allow // pod upgrades to go through, controlled by the StatefulSet controller. for idx := sset.GetReplicas(statefulSet) - 1; idx >= 0; idx-- { @@ -274,11 +271,9 @@ func podsToUpgrade( return toUpgrade, err } if apierrors.IsNotFound(err) { - ulog.FromContext(ctx).Info("Pod does not exist, continuing loop", "sset_name", statefulSet.Name, "pod_name", podName) // Pod does not exist, continue the loop as the absence will be accounted by the deletion driver continue } - ulog.FromContext(ctx).Info("Pod exists, checking pod revision comparing to sts.updateRevision", "sset_name", statefulSet.Name, "pod_name", podName, "pod_revision", sset.PodRevision(pod), "sts_update_revision", statefulSet.Status.UpdateRevision) if sset.PodRevision(pod) != statefulSet.Status.UpdateRevision { toUpgrade = append(toUpgrade, pod) } diff --git a/pkg/controller/elasticsearch/driver/upgrade_forced.go b/pkg/controller/elasticsearch/driver/upgrade_forced.go index cf384f2ae6d..506a602fe73 100644 --- a/pkg/controller/elasticsearch/driver/upgrade_forced.go +++ b/pkg/controller/elasticsearch/driver/upgrade_forced.go @@ -20,7 +20,7 @@ import ( func (d *defaultDriver) MaybeForceUpgrade(ctx context.Context, statefulSets sset.StatefulSetList) (bool, error) { // Get the pods to upgrade - podsToUpgrade, err := podsToUpgrade(ctx, d.Client, statefulSets) + podsToUpgrade, err := podsToUpgrade(d.Client, statefulSets) if err != nil { return false, err } diff --git a/pkg/controller/elasticsearch/driver/upgrade_test.go b/pkg/controller/elasticsearch/driver/upgrade_test.go index bf2ff8ae286..3b3d15a9fd1 100644 --- a/pkg/controller/elasticsearch/driver/upgrade_test.go +++ b/pkg/controller/elasticsearch/driver/upgrade_test.go @@ -137,7 +137,7 @@ func Test_podsToUpgrade(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { client := k8s.NewFakeClient(tt.args.pods...) - got, err := podsToUpgrade(context.Background(), client, tt.args.statefulSets) + got, err := podsToUpgrade(client, tt.args.statefulSets) if (err != nil) != tt.wantErr { t.Errorf("podsToUpgrade() error = %v, wantErr %v", err, tt.wantErr) return diff --git a/pkg/controller/elasticsearch/driver/upscale.go b/pkg/controller/elasticsearch/driver/upscale.go index 179e807a687..b5df20f85ba 100644 --- a/pkg/controller/elasticsearch/driver/upscale.go +++ b/pkg/controller/elasticsearch/driver/upscale.go @@ -7,7 +7,6 @@ package driver import ( "context" "fmt" - "strings" appsv1 "k8s.io/api/apps/v1" @@ -97,76 +96,45 @@ func HandleUpscaleAndSpecChanges( nonMasterResources = append(nonMasterResources, res) } } - ulog.FromContext(ctx.parentCtx).Info("Master resources", "resources", func() string { - names := make([]string, 0, len(masterResources)) - for _, res := range masterResources { - names = append(names, res.StatefulSet.Name) - } - return strings.Join(names, ", ") - }()) - ulog.FromContext(ctx.parentCtx).Info("Non-master resources", "resources", func() string { - names := make([]string, 0, len(nonMasterResources)) - for _, res := range nonMasterResources { - names = append(names, res.StatefulSet.Name) - } - return strings.Join(names, ", ") - }()) // First, reconcile all non-master resources - ulog.FromContext(ctx.parentCtx).Info("Reconciling non-master resources") actualStatefulSets, requeue, err := reconcileResources(ctx, actualStatefulSets, nonMasterResources) if err != nil { - ulog.FromContext(ctx.parentCtx).Error(err, "while reconciling non-master resources") return results, fmt.Errorf("while reconciling non-master resources: %w", err) } results.ActualStatefulSets = actualStatefulSets if requeue { - ulog.FromContext(ctx.parentCtx).Info("Requeuing non-master resources", "requeue", requeue) results.Requeue = true return results, nil } targetVersion, err := version.Parse(ctx.es.Spec.Version) if err != nil { - ulog.FromContext(ctx.parentCtx).Error(err, "while parsing Elasticsearch upgrade target version") return results, fmt.Errorf("while parsing Elasticsearch upgrade target version: %w", err) } // Check if all non-master StatefulSets have completed their upgrades before proceeding with master StatefulSets - ulog.FromContext(ctx.parentCtx).Info("Checking if all expected non-master StatefulSets have completed their upgrades", "targetVersion", targetVersion) pendingNonMasterSTS, err := findPendingNonMasterStatefulSetUpgrades(ctx.k8sClient, actualStatefulSets, expectedResources.StatefulSets(), targetVersion) if err != nil { - ulog.FromContext(ctx.parentCtx).Error(err, "while checking non-master upgrade status") return results, fmt.Errorf("while checking non-master upgrade status: %w", err) } ctx.upscaleReporter.RecordPendingNonMasterSTSUpgrades(pendingNonMasterSTS) if len(pendingNonMasterSTS) > 0 { - ulog.FromContext(ctx.parentCtx).Info("Non-master StatefulSets are still upgrading, skipping master StatefulSets temporarily", "requeue", true, "pendingNonMasterSTS", func() string { - names := make([]string, 0, len(pendingNonMasterSTS)) - for _, sst := range pendingNonMasterSTS { - names = append(names, sst.Name) - } - return strings.Join(names, ", ") - }()) // Non-master StatefulSets are still upgrading, skipping master StatefulSets temporarily. // This will cause a requeue in the caller, and master StatefulSets will attempt to be processed in the next reconciliation return results, nil } // All non-master StatefulSets are upgraded, now process master StatefulSets - ulog.FromContext(ctx.parentCtx).Info("Reconciling master resources") actualStatefulSets, results.Requeue, err = reconcileResources(ctx, actualStatefulSets, masterResources) if err != nil { - ulog.FromContext(ctx.parentCtx).Error(err, "while reconciling master resources") return results, fmt.Errorf("while reconciling master resources: %w", err) } - ulog.FromContext(ctx.parentCtx).Info("Master resources reconciled", "requeue", results.Requeue) results.ActualStatefulSets = actualStatefulSets - ulog.FromContext(ctx.parentCtx).Info("Upscale completed", "results", results) return results, nil } From 1dcef6c9e8aa1f2651d872ded89fb17b7eb84955 Mon Sep 17 00:00:00 2001 From: Michael Montgomery Date: Mon, 10 Nov 2025 08:36:48 -0600 Subject: [PATCH 27/39] comments Signed-off-by: Michael Montgomery --- pkg/controller/elasticsearch/reconcile/status.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/controller/elasticsearch/reconcile/status.go b/pkg/controller/elasticsearch/reconcile/status.go index b00a0ecbd21..d7416b5c3f4 100644 --- a/pkg/controller/elasticsearch/reconcile/status.go +++ b/pkg/controller/elasticsearch/reconcile/status.go @@ -106,10 +106,12 @@ func (u *UpscaleReporter) HasPendingNewNodes() bool { return len(u.nodes) > 0 } +// HasPendingNonMasterSTSUpgrades returns true if at least one non-master StatefulSet is still upgrading. func (u *UpscaleReporter) HasPendingNonMasterSTSUpgrades() bool { return u.nonMasterSTSUpgrades > 0 } +// RecordPendingNonMasterSTSUpgrades records the number of non-master StatefulSets that have upgrades pending. func (u *UpscaleReporter) RecordPendingNonMasterSTSUpgrades(pendingNonMasterSTS []appsv1.StatefulSet) { if u == nil { return From eb963e5f64133957368b265f22cc6ab9bbcbc97e Mon Sep 17 00:00:00 2001 From: Michael Montgomery Date: Mon, 17 Nov 2025 09:43:45 -0600 Subject: [PATCH 28/39] Use expectations. Signed-off-by: Michael Montgomery --- .../elasticsearch/driver/upscale.go | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/pkg/controller/elasticsearch/driver/upscale.go b/pkg/controller/elasticsearch/driver/upscale.go index b5df20f85ba..919ad2c94a9 100644 --- a/pkg/controller/elasticsearch/driver/upscale.go +++ b/pkg/controller/elasticsearch/driver/upscale.go @@ -7,6 +7,7 @@ package driver import ( "context" "fmt" + "slices" appsv1 "k8s.io/api/apps/v1" @@ -115,7 +116,13 @@ func HandleUpscaleAndSpecChanges( } // Check if all non-master StatefulSets have completed their upgrades before proceeding with master StatefulSets - pendingNonMasterSTS, err := findPendingNonMasterStatefulSetUpgrades(ctx.k8sClient, actualStatefulSets, expectedResources.StatefulSets(), targetVersion) + pendingNonMasterSTS, err := findPendingNonMasterStatefulSetUpgrades( + ctx.k8sClient, + actualStatefulSets, + expectedResources.StatefulSets(), + targetVersion, + expectations.NewExpectations(ctx.k8sClient), + ) if err != nil { return results, fmt.Errorf("while checking non-master upgrade status: %w", err) } @@ -260,7 +267,13 @@ func findPendingNonMasterStatefulSetUpgrades( actualStatefulSets es_sset.StatefulSetList, expectedStatefulSets es_sset.StatefulSetList, targetVersion version.Version, + expectations *expectations.Expectations, ) ([]appsv1.StatefulSet, error) { + pendingStatefulSet, err := expectations.ExpectedStatefulSetUpdates.PendingGenerations() + if err != nil { + return nil, err + } + pendingNonMasterSTS := make([]appsv1.StatefulSet, 0) for _, actualStatefulSet := range actualStatefulSets { expectedSset, _ := expectedStatefulSets.GetByName(actualStatefulSet.Name) @@ -271,6 +284,12 @@ func findPendingNonMasterStatefulSetUpgrades( continue } + // If the expectations show this as a pending StatefulSet, add it to the list. + if slices.Contains(pendingStatefulSet, actualStatefulSet.Name) { + pendingNonMasterSTS = append(pendingNonMasterSTS, actualStatefulSet) + continue + } + // If the StatefulSet is not at the target version, it is not upgraded // so don't even bother looking at the state/status of the StatefulSet. actualVersion, err := es_sset.GetESVersion(actualStatefulSet) From 518d69d0f47b4b9a1fda8cb203bb48da170dbd24 Mon Sep 17 00:00:00 2001 From: Michael Montgomery Date: Mon, 17 Nov 2025 09:44:53 -0600 Subject: [PATCH 29/39] Remove duplicative check now that using expectations. Signed-off-by: Michael Montgomery --- pkg/controller/elasticsearch/driver/upscale.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/pkg/controller/elasticsearch/driver/upscale.go b/pkg/controller/elasticsearch/driver/upscale.go index 919ad2c94a9..a3fbb77797a 100644 --- a/pkg/controller/elasticsearch/driver/upscale.go +++ b/pkg/controller/elasticsearch/driver/upscale.go @@ -301,13 +301,6 @@ func findPendingNonMasterStatefulSetUpgrades( continue } - // If the StatefulSet observedGeneration is not in sync with the generation, - // then a change is in progress, and we should not consider it as upgraded. - if actualStatefulSet.Generation != actualStatefulSet.Status.ObservedGeneration { - pendingNonMasterSTS = append(pendingNonMasterSTS, actualStatefulSet) - continue - } - // Check if this StatefulSet has pending updates if actualStatefulSet.Status.UpdatedReplicas != actualStatefulSet.Status.Replicas { pendingNonMasterSTS = append(pendingNonMasterSTS, actualStatefulSet) From 16fd9ece0ea7f34140f3437343922859823503a3 Mon Sep 17 00:00:00 2001 From: Michael Montgomery Date: Mon, 17 Nov 2025 13:51:56 -0600 Subject: [PATCH 30/39] Attempt allow replicas update master (wip) Signed-off-by: Michael Montgomery --- .../elasticsearch/driver/upscale.go | 29 +++++++++++++++++++ .../elasticsearch/driver/upscale_test.go | 21 ++++++++------ 2 files changed, 41 insertions(+), 9 deletions(-) diff --git a/pkg/controller/elasticsearch/driver/upscale.go b/pkg/controller/elasticsearch/driver/upscale.go index a3fbb77797a..d616387caf5 100644 --- a/pkg/controller/elasticsearch/driver/upscale.go +++ b/pkg/controller/elasticsearch/driver/upscale.go @@ -10,6 +10,7 @@ import ( "slices" appsv1 "k8s.io/api/apps/v1" + "k8s.io/utils/ptr" esv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/elasticsearch/v1" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common" @@ -98,6 +99,12 @@ func HandleUpscaleAndSpecChanges( } } + // The only adjustment we want to make to master statefulSets before ensuring that all non-master + // statefulSets have been reconciled is to scale up the replicas to the expected number. + if err = maybeUpscaleMasterResources(ctx, actualStatefulSets, masterResources); err != nil { + return results, fmt.Errorf("while scaling up master resources: %w", err) + } + // First, reconcile all non-master resources actualStatefulSets, requeue, err := reconcileResources(ctx, actualStatefulSets, nonMasterResources) if err != nil { @@ -145,6 +152,28 @@ func HandleUpscaleAndSpecChanges( return results, nil } +func maybeUpscaleMasterResources(ctx upscaleCtx, actualStatefulSets es_sset.StatefulSetList, masterResources []nodespec.Resources) error { + for _, sts := range masterResources { + fmt.Println("Attempting upscale of master StatefulSet", sts.StatefulSet.Name) + actualSset, found := actualStatefulSets.GetByName(sts.StatefulSet.Name) + if !found { + fmt.Println("Master StatefulSet not found in actualStatefulSets", sts.StatefulSet.Name) + continue + } + actualReplicas := sset.GetReplicas(actualSset) + targetReplicas := sset.GetReplicas(sts.StatefulSet) + fmt.Println("Actual replicas", actualReplicas, "Target replicas", targetReplicas) + if actualReplicas < targetReplicas { + actualSset.Spec.Replicas = ptr.To[int32](targetReplicas) + fmt.Println("Updating master StatefulSet replicas", actualSset.Spec.Replicas, "to", targetReplicas) + if err := ctx.k8sClient.Update(ctx.parentCtx, &actualSset); err != nil { + return fmt.Errorf("while upscaling master sts replicas: %w", err) + } + } + } + return nil +} + func podsToCreate( actualStatefulSets, expectedStatefulSets es_sset.StatefulSetList, ) []string { diff --git a/pkg/controller/elasticsearch/driver/upscale_test.go b/pkg/controller/elasticsearch/driver/upscale_test.go index 3bf905cc014..3337d961ed8 100644 --- a/pkg/controller/elasticsearch/driver/upscale_test.go +++ b/pkg/controller/elasticsearch/driver/upscale_test.go @@ -6,6 +6,7 @@ package driver import ( "context" + "fmt" "reflect" "sort" "sync" @@ -699,7 +700,7 @@ func TestHandleUpscaleAndSpecChanges_VersionUpgradeDataFirstFlow(t *testing.T) { masterSset.Status.UpdatedReplicas = 3 masterSset.Status.CurrentRevision = "master-sset-old" masterSset.Status.UpdateRevision = "master-sset-old" - require.NoError(t, k8sClient.Status().Update(context.Background(), &masterSset)) + require.NoError(t, k8sClient.Update(context.Background(), &masterSset)) var dataSset appsv1.StatefulSet require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "data-sset"}, &dataSset)) @@ -815,7 +816,7 @@ func TestHandleUpscaleAndSpecChanges_VersionUpgradeDataFirstFlow(t *testing.T) { }, }, Spec: appsv1.StatefulSetSpec{ - Replicas: ptr.To[int32](3), + Replicas: ptr.To[int32](4), // also upscale the master replicas to ensure that an upscale during an upgrade can happen in parallel with non-masters. Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ @@ -895,26 +896,28 @@ func TestHandleUpscaleAndSpecChanges_VersionUpgradeDataFirstFlow(t *testing.T) { require.NoError(t, k8sClient.Status().Update(context.Background(), &dataSset)) // Call HandleUpscaleAndSpecChanges and verify that both data upgrade has begun and master STS is not updated + fmt.Println("Calling HandleUpscaleAndSpecChanges which should upscale the master replicas to 4") _, err = HandleUpscaleAndSpecChanges(ctx, actualStatefulSets, expectedResourcesUpgrade) require.NoError(t, err) // Update actualStatefulSets to reflect the current state - require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "data-sset"}, &dataSset)) - require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "master-sset"}, &masterSset)) - actualStatefulSets = es_sset.StatefulSetList{masterSset, dataSset} + // require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "data-sset"}, &dataSset)) + // require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "master-sset"}, &masterSset)) + // actualStatefulSets = es_sset.StatefulSetList{masterSset, dataSset} - // Call HandleUpscaleAndSpecChanges - data STS should be updated, but master should NOT - res, err = HandleUpscaleAndSpecChanges(ctx, actualStatefulSets, expectedResourcesUpgrade) - require.NoError(t, err) + // Call HandleUpscaleAndSpecChanges - data STS should be updated, + // master version should not, but the replicas should be scaled up to 4. + // res, err = HandleUpscaleAndSpecChanges(ctx, actualStatefulSets, expectedResourcesUpgrade) + // require.NoError(t, err) // Verify data StatefulSet is updated to 8.17.1 require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "data-sset"}, &dataSset)) require.Equal(t, "docker.elastic.co/elasticsearch/elasticsearch:8.17.1", dataSset.Spec.Template.Spec.Containers[0].Image) // Verify master StatefulSet version hasn't changed yet (should still be 8.16.2) - // This is the key test - master should NOT be updated until data is fully upgraded require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "master-sset"}, &masterSset)) require.Equal(t, "docker.elastic.co/elasticsearch/elasticsearch:8.16.2", masterSset.Spec.Template.Spec.Containers[0].Image) + require.Equal(t, int32(4), *masterSset.Spec.Replicas) // Update data STS and associated pods to show they are completely upgraded require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "data-sset"}, &dataSset)) From 82731527253dbddfad8f1505313d7b32cb3dcc26 Mon Sep 17 00:00:00 2001 From: Michael Montgomery Date: Mon, 17 Nov 2025 18:36:12 -0600 Subject: [PATCH 31/39] Fix the code allowing replica updates during upgrades. Signed-off-by: Michael Montgomery --- .../elasticsearch/driver/upscale.go | 32 ++++++++++++------- .../elasticsearch/driver/upscale_test.go | 11 +------ 2 files changed, 22 insertions(+), 21 deletions(-) diff --git a/pkg/controller/elasticsearch/driver/upscale.go b/pkg/controller/elasticsearch/driver/upscale.go index d616387caf5..6c25e2e5a9b 100644 --- a/pkg/controller/elasticsearch/driver/upscale.go +++ b/pkg/controller/elasticsearch/driver/upscale.go @@ -10,6 +10,7 @@ import ( "slices" appsv1 "k8s.io/api/apps/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/utils/ptr" esv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/elasticsearch/v1" @@ -101,7 +102,10 @@ func HandleUpscaleAndSpecChanges( // The only adjustment we want to make to master statefulSets before ensuring that all non-master // statefulSets have been reconciled is to scale up the replicas to the expected number. - if err = maybeUpscaleMasterResources(ctx, actualStatefulSets, masterResources); err != nil { + // The only adjustment we want to make to master statefulSets before ensuring that all non-master + // statefulSets have been reconciled is to potentially scale up the replicas + // which should happen 1 at a time as we adjust the replicas early. + if err = maybeUpscaleMasterResources(ctx, masterResources); err != nil { return results, fmt.Errorf("while scaling up master resources: %w", err) } @@ -152,20 +156,26 @@ func HandleUpscaleAndSpecChanges( return results, nil } -func maybeUpscaleMasterResources(ctx upscaleCtx, actualStatefulSets es_sset.StatefulSetList, masterResources []nodespec.Resources) error { - for _, sts := range masterResources { - fmt.Println("Attempting upscale of master StatefulSet", sts.StatefulSet.Name) - actualSset, found := actualStatefulSets.GetByName(sts.StatefulSet.Name) - if !found { - fmt.Println("Master StatefulSet not found in actualStatefulSets", sts.StatefulSet.Name) - continue +func maybeUpscaleMasterResources(ctx upscaleCtx, masterResources []nodespec.Resources) error { + // Upscale master StatefulSets using the adjusted resources and read the current StatefulSet + // from k8s to get the latest state. + for _, res := range masterResources { + stsName := res.StatefulSet.Name + + // Read the current StatefulSet from k8s to get the latest state + var actualSset appsv1.StatefulSet + if err := ctx.k8sClient.Get(ctx.parentCtx, k8s.ExtractNamespacedName(&res.StatefulSet), &actualSset); err != nil { + if apierrors.IsNotFound(err) { + continue + } + return fmt.Errorf("while getting master StatefulSet %s: %w", stsName, err) } + actualReplicas := sset.GetReplicas(actualSset) - targetReplicas := sset.GetReplicas(sts.StatefulSet) - fmt.Println("Actual replicas", actualReplicas, "Target replicas", targetReplicas) + targetReplicas := sset.GetReplicas(res.StatefulSet) + if actualReplicas < targetReplicas { actualSset.Spec.Replicas = ptr.To[int32](targetReplicas) - fmt.Println("Updating master StatefulSet replicas", actualSset.Spec.Replicas, "to", targetReplicas) if err := ctx.k8sClient.Update(ctx.parentCtx, &actualSset); err != nil { return fmt.Errorf("while upscaling master sts replicas: %w", err) } diff --git a/pkg/controller/elasticsearch/driver/upscale_test.go b/pkg/controller/elasticsearch/driver/upscale_test.go index 3337d961ed8..4db720f398c 100644 --- a/pkg/controller/elasticsearch/driver/upscale_test.go +++ b/pkg/controller/elasticsearch/driver/upscale_test.go @@ -701,6 +701,7 @@ func TestHandleUpscaleAndSpecChanges_VersionUpgradeDataFirstFlow(t *testing.T) { masterSset.Status.CurrentRevision = "master-sset-old" masterSset.Status.UpdateRevision = "master-sset-old" require.NoError(t, k8sClient.Update(context.Background(), &masterSset)) + require.NoError(t, k8sClient.Status().Update(context.Background(), &masterSset)) var dataSset appsv1.StatefulSet require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "data-sset"}, &dataSset)) @@ -900,16 +901,6 @@ func TestHandleUpscaleAndSpecChanges_VersionUpgradeDataFirstFlow(t *testing.T) { _, err = HandleUpscaleAndSpecChanges(ctx, actualStatefulSets, expectedResourcesUpgrade) require.NoError(t, err) - // Update actualStatefulSets to reflect the current state - // require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "data-sset"}, &dataSset)) - // require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "master-sset"}, &masterSset)) - // actualStatefulSets = es_sset.StatefulSetList{masterSset, dataSset} - - // Call HandleUpscaleAndSpecChanges - data STS should be updated, - // master version should not, but the replicas should be scaled up to 4. - // res, err = HandleUpscaleAndSpecChanges(ctx, actualStatefulSets, expectedResourcesUpgrade) - // require.NoError(t, err) - // Verify data StatefulSet is updated to 8.17.1 require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "data-sset"}, &dataSset)) require.Equal(t, "docker.elastic.co/elasticsearch/elasticsearch:8.17.1", dataSset.Spec.Template.Spec.Containers[0].Image) From 11cc0e612051230287f48b096199ffd686325bc5 Mon Sep 17 00:00:00 2001 From: Michael Montgomery Date: Mon, 17 Nov 2025 18:40:38 -0600 Subject: [PATCH 32/39] Add comment to test Signed-off-by: Michael Montgomery --- pkg/controller/elasticsearch/driver/upscale_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/controller/elasticsearch/driver/upscale_test.go b/pkg/controller/elasticsearch/driver/upscale_test.go index 4db720f398c..5c38eb4160a 100644 --- a/pkg/controller/elasticsearch/driver/upscale_test.go +++ b/pkg/controller/elasticsearch/driver/upscale_test.go @@ -908,6 +908,7 @@ func TestHandleUpscaleAndSpecChanges_VersionUpgradeDataFirstFlow(t *testing.T) { // Verify master StatefulSet version hasn't changed yet (should still be 8.16.2) require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "master-sset"}, &masterSset)) require.Equal(t, "docker.elastic.co/elasticsearch/elasticsearch:8.16.2", masterSset.Spec.Template.Spec.Containers[0].Image) + // Verify master StatefulSet replicas have been scaled up to 4 require.Equal(t, int32(4), *masterSset.Spec.Replicas) // Update data STS and associated pods to show they are completely upgraded From 645e088e23cde9af5fcbf27cbd779eb6cf341eb3 Mon Sep 17 00:00:00 2001 From: Michael Montgomery Date: Sun, 30 Nov 2025 19:35:16 -0600 Subject: [PATCH 33/39] Apply review comments Signed-off-by: Michael Montgomery --- .../elasticsearch/driver/upscale.go | 43 +++++++++---------- 1 file changed, 20 insertions(+), 23 deletions(-) diff --git a/pkg/controller/elasticsearch/driver/upscale.go b/pkg/controller/elasticsearch/driver/upscale.go index 6c25e2e5a9b..350b3ebbcab 100644 --- a/pkg/controller/elasticsearch/driver/upscale.go +++ b/pkg/controller/elasticsearch/driver/upscale.go @@ -81,12 +81,10 @@ func HandleUpscaleAndSpecChanges( // If this is not a version upgrade, process all resources normally and return if !isVersionUpgrade { - actualStatefulSets, requeue, err := reconcileResources(ctx, actualStatefulSets, adjusted) + results, err = reconcileResources(ctx, actualStatefulSets, adjusted) if err != nil { return results, fmt.Errorf("while reconciling resources: %w", err) } - results.Requeue = requeue - results.ActualStatefulSets = actualStatefulSets return results, nil } @@ -100,8 +98,6 @@ func HandleUpscaleAndSpecChanges( } } - // The only adjustment we want to make to master statefulSets before ensuring that all non-master - // statefulSets have been reconciled is to scale up the replicas to the expected number. // The only adjustment we want to make to master statefulSets before ensuring that all non-master // statefulSets have been reconciled is to potentially scale up the replicas // which should happen 1 at a time as we adjust the replicas early. @@ -110,14 +106,13 @@ func HandleUpscaleAndSpecChanges( } // First, reconcile all non-master resources - actualStatefulSets, requeue, err := reconcileResources(ctx, actualStatefulSets, nonMasterResources) + results, err = reconcileResources(ctx, actualStatefulSets, nonMasterResources) if err != nil { return results, fmt.Errorf("while reconciling non-master resources: %w", err) } results.ActualStatefulSets = actualStatefulSets - if requeue { - results.Requeue = true + if results.Requeue { return results, nil } @@ -132,7 +127,7 @@ func HandleUpscaleAndSpecChanges( actualStatefulSets, expectedResources.StatefulSets(), targetVersion, - expectations.NewExpectations(ctx.k8sClient), + ctx.expectations, ) if err != nil { return results, fmt.Errorf("while checking non-master upgrade status: %w", err) @@ -147,7 +142,7 @@ func HandleUpscaleAndSpecChanges( } // All non-master StatefulSets are upgraded, now process master StatefulSets - actualStatefulSets, results.Requeue, err = reconcileResources(ctx, actualStatefulSets, masterResources) + results, err = reconcileResources(ctx, actualStatefulSets, masterResources) if err != nil { return results, fmt.Errorf("while reconciling master resources: %w", err) } @@ -175,7 +170,7 @@ func maybeUpscaleMasterResources(ctx upscaleCtx, masterResources []nodespec.Reso targetReplicas := sset.GetReplicas(res.StatefulSet) if actualReplicas < targetReplicas { - actualSset.Spec.Replicas = ptr.To[int32](targetReplicas) + actualSset.Spec.Replicas = ptr.To(targetReplicas) if err := ctx.k8sClient.Update(ctx.parentCtx, &actualSset); err != nil { return fmt.Errorf("while upscaling master sts replicas: %w", err) } @@ -263,41 +258,43 @@ func reconcileResources( ctx upscaleCtx, actualStatefulSets es_sset.StatefulSetList, resources []nodespec.Resources, -) (es_sset.StatefulSetList, bool, error) { - requeue := false +) (UpscaleResults, error) { + results := UpscaleResults{ + ActualStatefulSets: actualStatefulSets, + } ulog.FromContext(ctx.parentCtx).Info("Reconciling resources", "resource_size", len(resources)) for _, res := range resources { res := res if err := settings.ReconcileConfig(ctx.parentCtx, ctx.k8sClient, ctx.es, res.StatefulSet.Name, res.Config, ctx.meta); err != nil { - return actualStatefulSets, false, fmt.Errorf("reconcile config: %w", err) + return results, fmt.Errorf("reconcile config: %w", err) } if _, err := common.ReconcileService(ctx.parentCtx, ctx.k8sClient, &res.HeadlessService, &ctx.es); err != nil { - return actualStatefulSets, false, fmt.Errorf("reconcile service: %w", err) + return results, fmt.Errorf("reconcile service: %w", err) } if actualSset, exists := actualStatefulSets.GetByName(res.StatefulSet.Name); exists { recreateSset, err := handleVolumeExpansion(ctx.parentCtx, ctx.k8sClient, ctx.es, res.StatefulSet, actualSset, ctx.validateStorageClass) if err != nil { - return actualStatefulSets, false, fmt.Errorf("handle volume expansion: %w", err) + return results, fmt.Errorf("handle volume expansion: %w", err) } if recreateSset { ulog.FromContext(ctx.parentCtx).Info("StatefulSet is scheduled for recreation, requeuing", "name", res.StatefulSet.Name) // The StatefulSet is scheduled for recreation: let's requeue before attempting any further spec change. - requeue = true + results.Requeue = true continue } - } else if !exists { + } else { ulog.FromContext(ctx.parentCtx).Info("StatefulSet does not exist", "name", res.StatefulSet.Name) } ulog.FromContext(ctx.parentCtx).Info("Reconciling StatefulSet", "name", res.StatefulSet.Name) reconciled, err := es_sset.ReconcileStatefulSet(ctx.parentCtx, ctx.k8sClient, ctx.es, res.StatefulSet, ctx.expectations) if err != nil { - return actualStatefulSets, false, fmt.Errorf("reconcile StatefulSet: %w", err) + return results, fmt.Errorf("reconcile StatefulSet: %w", err) } // update actual with the reconciled ones for next steps to work with up-to-date information - actualStatefulSets = actualStatefulSets.WithStatefulSet(reconciled) + results.ActualStatefulSets = results.ActualStatefulSets.WithStatefulSet(reconciled) } - ulog.FromContext(ctx.parentCtx).Info("Resources reconciled", "actualStatefulSets_size", len(actualStatefulSets), "requeue", requeue) - return actualStatefulSets, requeue, nil + ulog.FromContext(ctx.parentCtx).Info("Resources reconciled", "actualStatefulSets_size", len(results.ActualStatefulSets), "requeue", results.Requeue) + return results, nil } // findPendingNonMasterStatefulSetUpgrades finds all non-master StatefulSets that have not completed their upgrades @@ -357,7 +354,7 @@ func findPendingNonMasterStatefulSetUpgrades( if actualStatefulSet.Status.UpdateRevision != "" && sset.PodRevision(pod) != actualStatefulSet.Status.UpdateRevision { // This pod still needs to be upgraded pendingNonMasterSTS = append(pendingNonMasterSTS, actualStatefulSet) - continue + break } } } From 3e95b2d7ce41dedd480203953ca945d6230691c6 Mon Sep 17 00:00:00 2001 From: Michael Montgomery Date: Sun, 30 Nov 2025 19:40:33 -0600 Subject: [PATCH 34/39] More review comments Signed-off-by: Michael Montgomery --- pkg/controller/elasticsearch/driver/upscale.go | 5 +++-- .../elasticsearch/driver/upscale_test.go | 17 ++++++----------- 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/pkg/controller/elasticsearch/driver/upscale.go b/pkg/controller/elasticsearch/driver/upscale.go index 350b3ebbcab..dd11016b615 100644 --- a/pkg/controller/elasticsearch/driver/upscale.go +++ b/pkg/controller/elasticsearch/driver/upscale.go @@ -174,6 +174,7 @@ func maybeUpscaleMasterResources(ctx upscaleCtx, masterResources []nodespec.Reso if err := ctx.k8sClient.Update(ctx.parentCtx, &actualSset); err != nil { return fmt.Errorf("while upscaling master sts replicas: %w", err) } + ctx.expectations.ExpectGeneration(actualSset) } } return nil @@ -305,7 +306,7 @@ func findPendingNonMasterStatefulSetUpgrades( targetVersion version.Version, expectations *expectations.Expectations, ) ([]appsv1.StatefulSet, error) { - pendingStatefulSet, err := expectations.ExpectedStatefulSetUpdates.PendingGenerations() + pendingStatefulSets, err := expectations.ExpectedStatefulSetUpdates.PendingGenerations() if err != nil { return nil, err } @@ -321,7 +322,7 @@ func findPendingNonMasterStatefulSetUpgrades( } // If the expectations show this as a pending StatefulSet, add it to the list. - if slices.Contains(pendingStatefulSet, actualStatefulSet.Name) { + if slices.Contains(pendingStatefulSets, actualStatefulSet.Name) { pendingNonMasterSTS = append(pendingNonMasterSTS, actualStatefulSet) continue } diff --git a/pkg/controller/elasticsearch/driver/upscale_test.go b/pkg/controller/elasticsearch/driver/upscale_test.go index 5c38eb4160a..9322d6fdd56 100644 --- a/pkg/controller/elasticsearch/driver/upscale_test.go +++ b/pkg/controller/elasticsearch/driver/upscale_test.go @@ -897,7 +897,6 @@ func TestHandleUpscaleAndSpecChanges_VersionUpgradeDataFirstFlow(t *testing.T) { require.NoError(t, k8sClient.Status().Update(context.Background(), &dataSset)) // Call HandleUpscaleAndSpecChanges and verify that both data upgrade has begun and master STS is not updated - fmt.Println("Calling HandleUpscaleAndSpecChanges which should upscale the master replicas to 4") _, err = HandleUpscaleAndSpecChanges(ctx, actualStatefulSets, expectedResourcesUpgrade) require.NoError(t, err) @@ -919,16 +918,12 @@ func TestHandleUpscaleAndSpecChanges_VersionUpgradeDataFirstFlow(t *testing.T) { require.NoError(t, k8sClient.Status().Update(context.Background(), &dataSset)) // Update the existing data pods to have the new revision - var pod0 corev1.Pod - require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "data-sset-0"}, &pod0)) - pod0.Labels["controller-revision-hash"] = "data-sset-12345" - require.NoError(t, k8sClient.Update(context.Background(), &pod0)) - - var pod1 corev1.Pod - require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "data-sset-1"}, &pod1)) - pod1.Labels["controller-revision-hash"] = "data-sset-12345" - require.NoError(t, k8sClient.Update(context.Background(), &pod1)) - + for i := 0; i < int(dataSset.Status.Replicas); i++ { + var pod corev1.Pod + require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: fmt.Sprintf("data-sset-%d", i)}, &pod)) + pod.Labels["controller-revision-hash"] = "data-sset-12345" + require.NoError(t, k8sClient.Update(context.Background(), &pod)) + } // Call HandleUpscaleAndSpecChanges and verify that master STS is now set to be upgraded actualStatefulSets = res.ActualStatefulSets _, err = HandleUpscaleAndSpecChanges(ctx, actualStatefulSets, expectedResourcesUpgrade) From 7f003887329217048bc725a61060bd5efb3d066a Mon Sep 17 00:00:00 2001 From: Michael Montgomery Date: Sun, 30 Nov 2025 19:47:37 -0600 Subject: [PATCH 35/39] Ensure the StatefulSet controller has observed the latest generation. Signed-off-by: Michael Montgomery --- pkg/controller/elasticsearch/driver/upscale.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/controller/elasticsearch/driver/upscale.go b/pkg/controller/elasticsearch/driver/upscale.go index dd11016b615..3bb66cf9295 100644 --- a/pkg/controller/elasticsearch/driver/upscale.go +++ b/pkg/controller/elasticsearch/driver/upscale.go @@ -338,6 +338,12 @@ func findPendingNonMasterStatefulSetUpgrades( continue } + if actualStatefulSet.Status.ObservedGeneration < actualStatefulSet.Generation { + // The StatefulSet controller has not yet observed the latest generation. + pendingNonMasterSTS = append(pendingNonMasterSTS, actualStatefulSet) + continue + } + // Check if this StatefulSet has pending updates if actualStatefulSet.Status.UpdatedReplicas != actualStatefulSet.Status.Replicas { pendingNonMasterSTS = append(pendingNonMasterSTS, actualStatefulSet) From 5142908ff05098170fd00b6b2852321f52daacae Mon Sep 17 00:00:00 2001 From: Michael Montgomery Date: Thu, 11 Dec 2025 10:14:18 -0600 Subject: [PATCH 36/39] Use reconcileStatefulSet directly Update upscaleResults with results of upscale. Signed-off-by: Michael Montgomery --- .../elasticsearch/driver/upscale.go | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/pkg/controller/elasticsearch/driver/upscale.go b/pkg/controller/elasticsearch/driver/upscale.go index 3bb66cf9295..71674331078 100644 --- a/pkg/controller/elasticsearch/driver/upscale.go +++ b/pkg/controller/elasticsearch/driver/upscale.go @@ -101,9 +101,11 @@ func HandleUpscaleAndSpecChanges( // The only adjustment we want to make to master statefulSets before ensuring that all non-master // statefulSets have been reconciled is to potentially scale up the replicas // which should happen 1 at a time as we adjust the replicas early. - if err = maybeUpscaleMasterResources(ctx, masterResources); err != nil { + results, err = maybeUpscaleMasterResources(ctx, actualStatefulSets, masterResources) + if err != nil { return results, fmt.Errorf("while scaling up master resources: %w", err) } + actualStatefulSets = results.ActualStatefulSets // First, reconcile all non-master resources results, err = reconcileResources(ctx, actualStatefulSets, nonMasterResources) @@ -151,7 +153,10 @@ func HandleUpscaleAndSpecChanges( return results, nil } -func maybeUpscaleMasterResources(ctx upscaleCtx, masterResources []nodespec.Resources) error { +func maybeUpscaleMasterResources(ctx upscaleCtx, actualStatefulSets es_sset.StatefulSetList, masterResources []nodespec.Resources) (UpscaleResults, error) { + results := UpscaleResults{ + ActualStatefulSets: actualStatefulSets, + } // Upscale master StatefulSets using the adjusted resources and read the current StatefulSet // from k8s to get the latest state. for _, res := range masterResources { @@ -163,21 +168,22 @@ func maybeUpscaleMasterResources(ctx upscaleCtx, masterResources []nodespec.Reso if apierrors.IsNotFound(err) { continue } - return fmt.Errorf("while getting master StatefulSet %s: %w", stsName, err) + return results, fmt.Errorf("while getting master StatefulSet %s: %w", stsName, err) } actualReplicas := sset.GetReplicas(actualSset) targetReplicas := sset.GetReplicas(res.StatefulSet) if actualReplicas < targetReplicas { - actualSset.Spec.Replicas = ptr.To(targetReplicas) - if err := ctx.k8sClient.Update(ctx.parentCtx, &actualSset); err != nil { - return fmt.Errorf("while upscaling master sts replicas: %w", err) + nodespec.UpdateReplicas(&actualSset, ptr.To(targetReplicas)) + reconciled, err := es_sset.ReconcileStatefulSet(ctx.parentCtx, ctx.k8sClient, ctx.es, actualSset, ctx.expectations) + if err != nil { + return results, fmt.Errorf("while reconciling master StatefulSet %s: %w", stsName, err) } - ctx.expectations.ExpectGeneration(actualSset) + results.ActualStatefulSets = results.ActualStatefulSets.WithStatefulSet(reconciled) } } - return nil + return results, nil } func podsToCreate( From 4afe2e4dbe825b4710ffb6b7a2cc55e2125ac259 Mon Sep 17 00:00:00 2001 From: Michael Montgomery Date: Thu, 11 Dec 2025 10:20:13 -0600 Subject: [PATCH 37/39] Add comment for 404 Signed-off-by: Michael Montgomery --- pkg/controller/elasticsearch/driver/upscale.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/controller/elasticsearch/driver/upscale.go b/pkg/controller/elasticsearch/driver/upscale.go index 71674331078..f6e4bc9c101 100644 --- a/pkg/controller/elasticsearch/driver/upscale.go +++ b/pkg/controller/elasticsearch/driver/upscale.go @@ -165,6 +165,9 @@ func maybeUpscaleMasterResources(ctx upscaleCtx, actualStatefulSets es_sset.Stat // Read the current StatefulSet from k8s to get the latest state var actualSset appsv1.StatefulSet if err := ctx.k8sClient.Get(ctx.parentCtx, k8s.ExtractNamespacedName(&res.StatefulSet), &actualSset); err != nil { + // If the StatefulSet is not found, it means that it has not been created yet, so we can skip it. + // This should only happen when a user is upscaling the master nodes with a new NodeSet/StatefulSet. + // We are only interested in scaling up the existing master StatefulSets in this case. if apierrors.IsNotFound(err) { continue } From 4b966db0cfc4d8ae414c0631cfb4bed412ae480b Mon Sep 17 00:00:00 2001 From: Michael Montgomery Date: Thu, 11 Dec 2025 10:24:52 -0600 Subject: [PATCH 38/39] Only attempt upscale of masters when there are non-masters Signed-off-by: Michael Montgomery --- pkg/controller/elasticsearch/driver/upscale.go | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/pkg/controller/elasticsearch/driver/upscale.go b/pkg/controller/elasticsearch/driver/upscale.go index f6e4bc9c101..84f81d6fcca 100644 --- a/pkg/controller/elasticsearch/driver/upscale.go +++ b/pkg/controller/elasticsearch/driver/upscale.go @@ -98,14 +98,18 @@ func HandleUpscaleAndSpecChanges( } } - // The only adjustment we want to make to master statefulSets before ensuring that all non-master - // statefulSets have been reconciled is to potentially scale up the replicas - // which should happen 1 at a time as we adjust the replicas early. - results, err = maybeUpscaleMasterResources(ctx, actualStatefulSets, masterResources) - if err != nil { - return results, fmt.Errorf("while scaling up master resources: %w", err) + // Only attempt this upscale of master StatefulSets if there are any non-master StatefulSets to reconcile, + // otherwise you will immediately upscale the masters, and then delete the new node you added to upgrade. + if len(nonMasterResources) > 0 { + // The only adjustment we want to make to master statefulSets before ensuring that all non-master + // statefulSets have been reconciled is to potentially scale up the replicas + // which should happen 1 at a time as we adjust the replicas early. + results, err = maybeUpscaleMasterResources(ctx, actualStatefulSets, masterResources) + if err != nil { + return results, fmt.Errorf("while scaling up master resources: %w", err) + } + actualStatefulSets = results.ActualStatefulSets } - actualStatefulSets = results.ActualStatefulSets // First, reconcile all non-master resources results, err = reconcileResources(ctx, actualStatefulSets, nonMasterResources) From 6321a96999acfcec183fbb60ed1c4235d8dbd15f Mon Sep 17 00:00:00 2001 From: Michael Montgomery Date: Thu, 11 Dec 2025 10:30:28 -0600 Subject: [PATCH 39/39] Adjust e2e test Signed-off-by: Michael Montgomery --- test/e2e/es/non_master_first_upgrade_test.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/test/e2e/es/non_master_first_upgrade_test.go b/test/e2e/es/non_master_first_upgrade_test.go index a3979ca6ad9..3a8ff4ff321 100644 --- a/test/e2e/es/non_master_first_upgrade_test.go +++ b/test/e2e/es/non_master_first_upgrade_test.go @@ -93,11 +93,7 @@ func TestNonMasterFirstUpgradeComplexTopology(t *testing.T) { WithESDataNodes(2, elasticsearch.DefaultResources). WithESCoordinatingNodes(1, elasticsearch.DefaultResources) - mutated := initial.WithNoESTopology(). - WithVersion(dstVersion). - WithESMasterNodes(3, elasticsearch.DefaultResources). - WithESDataNodes(2, elasticsearch.DefaultResources). - WithESCoordinatingNodes(1, elasticsearch.DefaultResources) + mutated := initial.WithVersion(dstVersion).WithMutatedFrom(&initial) runNonMasterFirstUpgradeTest(t, initial, mutated) }