41 commits
9db32d0
Upgrade masters last when upgrading ES clusters
naemono Oct 15, 2025
39b2702
Fix lint issue
naemono Oct 23, 2025
50b3954
Add e2e test for upgrade order.
naemono Oct 28, 2025
00555c2
unexport things in e2e tests
naemono Oct 29, 2025
88cb347
Also look at the current/target version while determining whether sts is
naemono Oct 29, 2025
790d3f1
Fix tests
naemono Oct 29, 2025
4b944d1
Merge branch 'fix-sts-upgrade-issue-recreation' of github.com:naemono…
naemono Oct 29, 2025
d9885ba
Fix the unit tests for master last upgrades
naemono Oct 29, 2025
efa8643
fix linter
naemono Oct 29, 2025
6914708
move closer to use.
naemono Oct 29, 2025
2dc664b
Ensure requeue
naemono Oct 29, 2025
46c726c
adjust comments
naemono Oct 29, 2025
fccf6c3
Adjust logging in e2e test
naemono Oct 29, 2025
8feef24
Don't compare masters against other masters or themselves.
naemono Oct 30, 2025
0f5a31a
Fix spelling
naemono Oct 30, 2025
030fe16
Also check the generation/observedGeneration.
naemono Nov 4, 2025
6c9e2c5
Merge branch 'fix-sts-upgrade-issue-recreation' of github.com:naemono…
naemono Nov 4, 2025
0db51d8
Debugging
naemono Nov 5, 2025
57c71a9
Remove useless if check.
naemono Nov 5, 2025
c89e872
More targeted debugging
naemono Nov 5, 2025
068fa54
More debugging
naemono Nov 5, 2025
0af6b85
Debugging pod upgrade logic.
naemono Nov 5, 2025
54f9775
Attempt fix for blocked sts upgrades
naemono Nov 6, 2025
5dfdd05
Bug fix adding new master role to existing non-master sts.
naemono Nov 6, 2025
edf3faf
More debugging
naemono Nov 7, 2025
a6d8edc
Adjust to logical or
naemono Nov 7, 2025
8cfa06c
Simplify logic in HandleUpscaleAndSpecChanges
naemono Nov 17, 2025
c2e1161
Remove debugging
naemono Nov 10, 2025
1dcef6c
comments
naemono Nov 10, 2025
eb963e5
Use expectations.
naemono Nov 17, 2025
518d69d
Remove duplicative check now that using expectations.
naemono Nov 17, 2025
16fd9ec
Attempt allow replicas update master (wip)
naemono Nov 17, 2025
8273152
Fix the code allowing replica updates during upgrades.
naemono Nov 18, 2025
11cc0e6
Add comment to test
naemono Nov 18, 2025
645e088
Apply review comments
naemono Dec 1, 2025
3e95b2d
More review comments
naemono Dec 1, 2025
7f00388
Ensure the StatefulSet controller has observed the latest generation.
naemono Dec 1, 2025
5142908
Use reconcileStatefulSet directly
naemono Dec 11, 2025
4afe2e4
Add comment for 404
naemono Dec 11, 2025
4b966db
Only attempt upscale of masters when there are non-masters
naemono Dec 11, 2025
6321a96
Adjust e2e test
naemono Dec 11, 2025
3 changes: 3 additions & 0 deletions pkg/controller/elasticsearch/driver/nodes.go
@@ -125,6 +125,9 @@ func (d *defaultDriver) reconcileNodeSpecs(
    if reconcileState.HasPendingNewNodes() {
        results.WithReconciliationState(defaultRequeue.WithReason("Upscale in progress"))
    }
    if reconcileState.HasPendingNonMasterSTSUpgrades() {
        results.WithReconciliationState(defaultRequeue.WithReason("Non-master StatefulSets are still upgrading"))
    }
    actualStatefulSets = upscaleResults.ActualStatefulSets

    // Once all the StatefulSets have been updated we can ensure that the former version of the transport certificates Secret is deleted.
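The HasPendingNonMasterSTSUpgrades accessor itself is not part of this hunk. A minimal sketch of how such a flag on the reconcile state could look, with hypothetical names (the real implementation lives in pkg/controller/elasticsearch/reconcile and records full StatefulSet objects rather than names):

package reconcile

// State is a stand-in for the reconcile state; only the pending-upgrade
// bookkeeping is sketched here.
type State struct {
    pendingNonMasterSTSUpgrades []string // names recorded during upscale
}

// RecordPendingNonMasterSTSUpgrades remembers which non-master StatefulSets
// still have an upgrade in flight (assumed shape, not the actual API).
func (s *State) RecordPendingNonMasterSTSUpgrades(names ...string) {
    s.pendingNonMasterSTSUpgrades = names
}

// HasPendingNonMasterSTSUpgrades reports whether any non-master StatefulSet
// upgrade is still pending, triggering the requeue seen in nodes.go above.
func (s *State) HasPendingNonMasterSTSUpgrades() bool {
    return len(s.pendingNonMasterSTSUpgrades) > 0
}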
6 changes: 6 additions & 0 deletions pkg/controller/elasticsearch/driver/upgrade.go
@@ -207,6 +207,12 @@ func isVersionUpgrade(es esv1.Elasticsearch) (bool, error) {
    if err != nil {
        return false, err
    }

    // If status version is empty, this is a new cluster, not an upgrade
    if es.Status.Version == "" {
        return false, nil
    }

    statusVersion, err := version.Parse(es.Status.Version)
    if err != nil {
        return false, err
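The hunk above only shows the middle of the helper. The full function plausibly reads as in this sketch; the opening parse of es.Spec.Version and the final comparison are assumptions inferred from the visible fragment and the function name, not shown in the diff:

// Sketch, assuming the spec version is parsed first and the result is a
// simple "spec is newer than status" comparison.
func isVersionUpgrade(es esv1.Elasticsearch) (bool, error) {
    specVersion, err := version.Parse(es.Spec.Version)
    if err != nil {
        return false, err
    }
    // If status version is empty, this is a new cluster, not an upgrade.
    if es.Status.Version == "" {
        return false, nil
    }
    statusVersion, err := version.Parse(es.Status.Version)
    if err != nil {
        return false, err
    }
    return specVersion.GT(statusVersion), nil
}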
255 changes: 235 additions & 20 deletions pkg/controller/elasticsearch/driver/upscale.go
@@ -7,21 +7,27 @@ package driver
import (
    "context"
    "fmt"
    "slices"

    appsv1 "k8s.io/api/apps/v1"
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/utils/ptr"

    esv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/elasticsearch/v1"
    "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common"
    "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/expectations"
    "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/metadata"
    sset "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/statefulset"
    "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/version"
    "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/label"
    "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/nodespec"
    "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/reconcile"
    "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/settings"
    es_sset "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/sset"
    "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/version/zen1"
    "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/version/zen2"
    "github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s"
    ulog "github.com/elastic/cloud-on-k8s/v3/pkg/utils/log"
)

type upscaleCtx struct {
@@ -66,34 +72,124 @@ func HandleUpscaleAndSpecChanges(
    if err != nil {
        return results, fmt.Errorf("adjust resources: %w", err)
    }

    // Check if this is a version upgrade
    isVersionUpgrade, err := isVersionUpgrade(ctx.es)
    if err != nil {
        return results, fmt.Errorf("while checking for version upgrade: %w", err)
    }

    // If this is not a version upgrade, process all resources normally and return
    if !isVersionUpgrade {
        results, err = reconcileResources(ctx, actualStatefulSets, adjusted)
        if err != nil {
            return results, fmt.Errorf("while reconciling resources: %w", err)
        }
        return results, nil
    }

    // Version upgrade: separate master and non-master StatefulSets
    var masterResources, nonMasterResources []nodespec.Resources
    for _, res := range adjusted {
        if label.IsMasterNodeSet(res.StatefulSet) {
            masterResources = append(masterResources, res)
        } else {
            nonMasterResources = append(nonMasterResources, res)
        }
    }

    // Only attempt this upscale of master StatefulSets if there are any non-master StatefulSets to reconcile,
    // otherwise you will immediately upscale the masters, and then delete the new node you added to upgrade.
    if len(nonMasterResources) > 0 {
        // The only adjustment we want to make to master StatefulSets before ensuring that all non-master
        // StatefulSets have been reconciled is to potentially scale up the replicas,
        // which should happen one at a time as we adjust the replicas early.
        results, err = maybeUpscaleMasterResources(ctx, actualStatefulSets, masterResources)
        if err != nil {
            return results, fmt.Errorf("while scaling up master resources: %w", err)
        }
        actualStatefulSets = results.ActualStatefulSets
    }

    // First, reconcile all non-master resources
    results, err = reconcileResources(ctx, actualStatefulSets, nonMasterResources)
    if err != nil {
        return results, fmt.Errorf("while reconciling non-master resources: %w", err)
    }

    if results.Requeue {
        return results, nil
    }

    targetVersion, err := version.Parse(ctx.es.Spec.Version)
    if err != nil {
        return results, fmt.Errorf("while parsing Elasticsearch upgrade target version: %w", err)
    }

    // Check if all non-master StatefulSets have completed their upgrades before proceeding with master StatefulSets
    pendingNonMasterSTS, err := findPendingNonMasterStatefulSetUpgrades(
        ctx.k8sClient,
        actualStatefulSets,
        expectedResources.StatefulSets(),
        targetVersion,
        ctx.expectations,
    )
    if err != nil {
        return results, fmt.Errorf("while checking non-master upgrade status: %w", err)
    }

    ctx.upscaleReporter.RecordPendingNonMasterSTSUpgrades(pendingNonMasterSTS)

    if len(pendingNonMasterSTS) > 0 {
        // Non-master StatefulSets are still upgrading: skip master StatefulSets for now.
        // This causes a requeue in the caller, and master StatefulSets will be processed in the next reconciliation.
        return results, nil
    }

    // All non-master StatefulSets are upgraded, now process master StatefulSets
    results, err = reconcileResources(ctx, actualStatefulSets, masterResources)
    if err != nil {
        return results, fmt.Errorf("while reconciling master resources: %w", err)
    }

    return results, nil
}

func maybeUpscaleMasterResources(ctx upscaleCtx, actualStatefulSets es_sset.StatefulSetList, masterResources []nodespec.Resources) (UpscaleResults, error) {
    results := UpscaleResults{
        ActualStatefulSets: actualStatefulSets,
    }
    // Upscale master StatefulSets using the adjusted resources, reading the current StatefulSet
    // from k8s to get the latest state.
    for _, res := range masterResources {
        stsName := res.StatefulSet.Name

        // Read the current StatefulSet from k8s to get the latest state
        var actualSset appsv1.StatefulSet
        if err := ctx.k8sClient.Get(ctx.parentCtx, k8s.ExtractNamespacedName(&res.StatefulSet), &actualSset); err != nil {
            // If the StatefulSet is not found, it means that it has not been created yet, so we can skip it.
            // This should only happen when a user is upscaling the master nodes with a new NodeSet/StatefulSet.
            // We are only interested in scaling up the existing master StatefulSets in this case.
            if apierrors.IsNotFound(err) {
                continue
            }
            return results, fmt.Errorf("while getting master StatefulSet %s: %w", stsName, err)
        }

        actualReplicas := sset.GetReplicas(actualSset)
        targetReplicas := sset.GetReplicas(res.StatefulSet)

        if actualReplicas < targetReplicas {
            nodespec.UpdateReplicas(&actualSset, ptr.To(targetReplicas))
            reconciled, err := es_sset.ReconcileStatefulSet(ctx.parentCtx, ctx.k8sClient, ctx.es, actualSset, ctx.expectations)
            if err != nil {
                return results, fmt.Errorf("while reconciling master StatefulSet %s: %w", stsName, err)
            }
            results.ActualStatefulSets = results.ActualStatefulSets.WithStatefulSet(reconciled)
        }
    }
    return results, nil
}
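The net effect of the branch above: during a version upgrade, non-master node sets are reconciled (and therefore upgraded) first, and master node sets are only touched once nothing non-master is pending. A self-contained toy illustration of the ordering, with a hypothetical resource type standing in for nodespec.Resources:

package main

import "fmt"

// resource is a stand-in for nodespec.Resources; only the role matters here.
type resource struct {
    name     string
    isMaster bool
}

// splitByRole mirrors the role-splitting loop in HandleUpscaleAndSpecChanges.
func splitByRole(all []resource) (masters, nonMasters []resource) {
    for _, r := range all {
        if r.isMaster {
            masters = append(masters, r)
        } else {
            nonMasters = append(nonMasters, r)
        }
    }
    return masters, nonMasters
}

func main() {
    masters, nonMasters := splitByRole([]resource{
        {name: "es-master", isMaster: true},
        {name: "es-data", isMaster: false},
        {name: "es-ingest", isMaster: false},
    })
    // Non-masters are reconciled first; masters wait until those finish upgrading.
    fmt.Println("phase 1 (non-masters):", nonMasters)
    fmt.Println("phase 2 (masters):", masters)
}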

@@ -166,3 +262,122 @@ func adjustStatefulSetReplicas(

    return expected, nil
}

// reconcileResources handles the common StatefulSet reconciliation logic.
// Via its UpscaleResults return value it reports:
//   - the updated StatefulSets
//   - whether a requeue is needed
// along with any error that occurred.
func reconcileResources(
    ctx upscaleCtx,
    actualStatefulSets es_sset.StatefulSetList,
    resources []nodespec.Resources,
) (UpscaleResults, error) {
    results := UpscaleResults{
        ActualStatefulSets: actualStatefulSets,
    }
    ulog.FromContext(ctx.parentCtx).Info("Reconciling resources", "resource_size", len(resources))
    for _, res := range resources {
        res := res
        if err := settings.ReconcileConfig(ctx.parentCtx, ctx.k8sClient, ctx.es, res.StatefulSet.Name, res.Config, ctx.meta); err != nil {
            return results, fmt.Errorf("reconcile config: %w", err)
        }
        if _, err := common.ReconcileService(ctx.parentCtx, ctx.k8sClient, &res.HeadlessService, &ctx.es); err != nil {
            return results, fmt.Errorf("reconcile service: %w", err)
        }
        if actualSset, exists := actualStatefulSets.GetByName(res.StatefulSet.Name); exists {
            recreateSset, err := handleVolumeExpansion(ctx.parentCtx, ctx.k8sClient, ctx.es, res.StatefulSet, actualSset, ctx.validateStorageClass)
            if err != nil {
                return results, fmt.Errorf("handle volume expansion: %w", err)
            }
            if recreateSset {
                ulog.FromContext(ctx.parentCtx).Info("StatefulSet is scheduled for recreation, requeuing", "name", res.StatefulSet.Name)
                // The StatefulSet is scheduled for recreation: let's requeue before attempting any further spec change.
                results.Requeue = true
                continue
            }
        } else {
            ulog.FromContext(ctx.parentCtx).Info("StatefulSet does not exist", "name", res.StatefulSet.Name)
        }
        ulog.FromContext(ctx.parentCtx).Info("Reconciling StatefulSet", "name", res.StatefulSet.Name)
        reconciled, err := es_sset.ReconcileStatefulSet(ctx.parentCtx, ctx.k8sClient, ctx.es, res.StatefulSet, ctx.expectations)
        if err != nil {
            return results, fmt.Errorf("reconcile StatefulSet: %w", err)
        }
        // update actual with the reconciled ones for next steps to work with up-to-date information
        results.ActualStatefulSets = results.ActualStatefulSets.WithStatefulSet(reconciled)
    }
    ulog.FromContext(ctx.parentCtx).Info("Resources reconciled", "actualStatefulSets_size", len(results.ActualStatefulSets), "requeue", results.Requeue)
    return results, nil
}
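The pending check below first consults the operator's expectations mechanism (expectations.ExpectedStatefulSetUpdates.PendingGenerations()), which guards against acting on a stale cache: if the operator has written a StatefulSet generation that its informer cache has not yet reflected, that StatefulSet must still count as pending. A plain-Go sketch of the idea, with illustrative names rather than the real ECK API:

package main

import "fmt"

// expectedUpdates records, per StatefulSet name, the generation the operator
// last wrote. Until the cached object reaches at least that generation, the
// StatefulSet counts as pending. Names are illustrative, not the ECK API.
type expectedUpdates map[string]int64

// pendingGenerations returns the names whose cached generation still lags
// behind what the operator last wrote.
func (e expectedUpdates) pendingGenerations(cached map[string]int64) []string {
    var pending []string
    for name, want := range e {
        if cached[name] < want {
            pending = append(pending, name)
        }
    }
    return pending
}

func main() {
    e := expectedUpdates{"es-data": 5, "es-ingest": 2}
    cached := map[string]int64{"es-data": 4, "es-ingest": 2}
    fmt.Println(e.pendingGenerations(cached)) // [es-data]
}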

// findPendingNonMasterStatefulSetUpgrades finds all non-master StatefulSets that have not completed their upgrades.
func findPendingNonMasterStatefulSetUpgrades(
    client k8s.Client,
    actualStatefulSets es_sset.StatefulSetList,
    expectedStatefulSets es_sset.StatefulSetList,
    targetVersion version.Version,
    expectations *expectations.Expectations,
) ([]appsv1.StatefulSet, error) {
    pendingStatefulSets, err := expectations.ExpectedStatefulSetUpdates.PendingGenerations()
    if err != nil {
        return nil, err
    }

    pendingNonMasterSTS := make([]appsv1.StatefulSet, 0)
    for _, actualStatefulSet := range actualStatefulSets {
        expectedSset, _ := expectedStatefulSets.GetByName(actualStatefulSet.Name)

        // Skip master StatefulSets. We check both here because the master role may have been added
        // to a non-master StatefulSet during the upgrade spec change.
        if label.IsMasterNodeSet(actualStatefulSet) || label.IsMasterNodeSet(expectedSset) {
            continue
        }

        // If the expectations show this as a pending StatefulSet, add it to the list.
        if slices.Contains(pendingStatefulSets, actualStatefulSet.Name) {
            pendingNonMasterSTS = append(pendingNonMasterSTS, actualStatefulSet)
            continue
        }

        // If the StatefulSet is not at the target version, it is not upgraded,
        // so don't even bother looking at the state/status of the StatefulSet.
        actualVersion, err := es_sset.GetESVersion(actualStatefulSet)
        if err != nil {
            return pendingNonMasterSTS, err
        }
        if actualVersion.LT(targetVersion) {
            pendingNonMasterSTS = append(pendingNonMasterSTS, actualStatefulSet)
            continue
        }

        if actualStatefulSet.Status.ObservedGeneration < actualStatefulSet.Generation {
            // The StatefulSet controller has not yet observed the latest generation.
            pendingNonMasterSTS = append(pendingNonMasterSTS, actualStatefulSet)
            continue
        }

        // Check if this StatefulSet has pending updates
        if actualStatefulSet.Status.UpdatedReplicas != actualStatefulSet.Status.Replicas {
            pendingNonMasterSTS = append(pendingNonMasterSTS, actualStatefulSet)
            continue
        }

        // Check if there are any pods that need to be upgraded
        pods, err := es_sset.GetActualPodsForStatefulSet(client, k8s.ExtractNamespacedName(&actualStatefulSet))
        if err != nil {
            return pendingNonMasterSTS, err
        }

        for _, pod := range pods {
            // Check if the pod revision matches the StatefulSet update revision
            if actualStatefulSet.Status.UpdateRevision != "" && sset.PodRevision(pod) != actualStatefulSet.Status.UpdateRevision {
                // This pod still needs to be upgraded
                pendingNonMasterSTS = append(pendingNonMasterSTS, actualStatefulSet)
                break
            }
        }
    }

    return pendingNonMasterSTS, nil
}
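Two of the per-StatefulSet staleness checks above use only core Kubernetes status fields, so they can be exercised in isolation. A minimal sketch against plain k8s.io/api types (the ECK-specific label, version, expectation, and pod-revision checks are omitted):

package main

import (
    "fmt"

    appsv1 "k8s.io/api/apps/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// hasPendingUpdates mirrors the generation and replica-count conditions in
// findPendingNonMasterStatefulSetUpgrades.
func hasPendingUpdates(sts appsv1.StatefulSet) bool {
    if sts.Status.ObservedGeneration < sts.Generation {
        // The StatefulSet controller has not observed the latest spec yet.
        return true
    }
    // Some pods are still on the old revision.
    return sts.Status.UpdatedReplicas != sts.Status.Replicas
}

func main() {
    sts := appsv1.StatefulSet{
        ObjectMeta: metav1.ObjectMeta{Name: "es-data", Generation: 3},
        Status: appsv1.StatefulSetStatus{
            ObservedGeneration: 3,
            Replicas:           3,
            UpdatedReplicas:    2, // one pod still pending the new revision
        },
    }
    fmt.Println(hasPendingUpdates(sts)) // true
}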