41 commits
9db32d0
Upgrade masters last when upgrading ES clusters
naemono Oct 15, 2025
39b2702
Fix lint issue
naemono Oct 23, 2025
50b3954
Add e2e test for upgrade order.
naemono Oct 28, 2025
00555c2
unexport things in e2e tests
naemono Oct 29, 2025
88cb347
Also look at the current/target version while determining whether sts is
naemono Oct 29, 2025
790d3f1
Fix tests
naemono Oct 29, 2025
4b944d1
Merge branch 'fix-sts-upgrade-issue-recreation' of github.com:naemono…
naemono Oct 29, 2025
d9885ba
Fix the unit tests for master last upgrades
naemono Oct 29, 2025
efa8643
fix linter
naemono Oct 29, 2025
6914708
move closer to use.
naemono Oct 29, 2025
2dc664b
Ensure requeue
naemono Oct 29, 2025
46c726c
adjust comments
naemono Oct 29, 2025
fccf6c3
Adjust logging in e2e test
naemono Oct 29, 2025
8feef24
Don't compare masters against other masters or themselves.
naemono Oct 30, 2025
0f5a31a
Fix spelling
naemono Oct 30, 2025
030fe16
Also check the generation/observedGeneration.
naemono Nov 4, 2025
6c9e2c5
Merge branch 'fix-sts-upgrade-issue-recreation' of github.com:naemono…
naemono Nov 4, 2025
0db51d8
Debugging
naemono Nov 5, 2025
57c71a9
Remove useless if check.
naemono Nov 5, 2025
c89e872
More targeted debugging
naemono Nov 5, 2025
068fa54
More debugging
naemono Nov 5, 2025
0af6b85
Debugging pod upgrade logic.
naemono Nov 5, 2025
54f9775
Attempt fix for blocked sts upgrades
naemono Nov 6, 2025
5dfdd05
Bug fix adding new master role to existing non-master sts.
naemono Nov 6, 2025
edf3faf
More debugging
naemono Nov 7, 2025
a6d8edc
Adjust to logical or
naemono Nov 7, 2025
8cfa06c
Simplify logic in HandleUpscaleAndSpecChanges
naemono Nov 17, 2025
c2e1161
Remove debugging
naemono Nov 10, 2025
1dcef6c
comments
naemono Nov 10, 2025
eb963e5
Use expectations.
naemono Nov 17, 2025
518d69d
Remove duplicative check now that using expectations.
naemono Nov 17, 2025
16fd9ec
Attempt allow replicas update master (wip)
naemono Nov 17, 2025
8273152
Fix the code allowing replica updates during upgrades.
naemono Nov 18, 2025
11cc0e6
Add comment to test
naemono Nov 18, 2025
645e088
Apply review comments
naemono Dec 1, 2025
3e95b2d
More review comments
naemono Dec 1, 2025
7f00388
Ensure the StatefulSet controller has observed the latest generation.
naemono Dec 1, 2025
5142908
Use reconcileStatefulSet directly
naemono Dec 11, 2025
4afe2e4
Add comment for 404
naemono Dec 11, 2025
4b966db
Only attempt upscale of masters when there are non-masters
naemono Dec 11, 2025
6321a96
Adjust e2e test
naemono Dec 11, 2025
3 changes: 3 additions & 0 deletions pkg/controller/elasticsearch/driver/nodes.go
@@ -125,6 +125,9 @@ func (d *defaultDriver) reconcileNodeSpecs(
if reconcileState.HasPendingNewNodes() {
results.WithReconciliationState(defaultRequeue.WithReason("Upscale in progress"))
}
if reconcileState.HasPendingNonMasterSTSUpgrades() {
results.WithReconciliationState(defaultRequeue.WithReason("Non-master StatefulSets are still upgrading"))
}
actualStatefulSets = upscaleResults.ActualStatefulSets

// Once all the StatefulSets have been updated we can ensure that the former version of the transport certificates Secret is deleted.
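For readers less familiar with the surrounding reconciliation loop: the hunk above only records a named requeue reason when non-master StatefulSets still have pending upgrades. A minimal sketch of the same idea in plain controller-runtime terms follows; the helper name and the 10-second delay are assumptions for illustration, not ECK's actual results/reconciliation-state API.

package sketch

import (
	"time"

	ctrl "sigs.k8s.io/controller-runtime"
)

// requeueWhileNonMastersUpgrade is a hypothetical helper: while non-master
// StatefulSets still have pending upgrades, ask the controller to come back
// later instead of proceeding to the master StatefulSets in this pass.
func requeueWhileNonMastersUpgrade(pendingNonMasterUpgrades int) (ctrl.Result, error) {
	if pendingNonMasterUpgrades > 0 {
		// Delayed requeue: no error, just retry once the rollouts have progressed.
		return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
	}
	// Nothing pending: fall through to the master upgrade path.
	return ctrl.Result{}, nil
}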
6 changes: 6 additions & 0 deletions pkg/controller/elasticsearch/driver/upgrade.go
@@ -207,6 +207,12 @@ func isVersionUpgrade(es esv1.Elasticsearch) (bool, error) {
if err != nil {
return false, err
}

// If status version is empty, this is a new cluster, not an upgrade
if es.Status.Version == "" {
return false, nil
}

statusVersion, err := version.Parse(es.Status.Version)
if err != nil {
return false, err
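A self-contained sketch of the guard added above, assuming a simplified signature and github.com/Masterminds/semver/v3 in place of ECK's internal version package; an empty status version is treated as a brand-new cluster rather than a parse error.

package main

import (
	"fmt"

	"github.com/Masterminds/semver/v3"
)

// isVersionUpgradeSketch mirrors the intent of the guard in upgrade.go:
// an empty status version means the cluster has never reported a version,
// so this reconciliation is a fresh deployment, not an upgrade.
func isVersionUpgradeSketch(specVersion, statusVersion string) (bool, error) {
	if statusVersion == "" {
		return false, nil // new cluster, nothing to upgrade from
	}
	spec, err := semver.NewVersion(specVersion)
	if err != nil {
		return false, err
	}
	status, err := semver.NewVersion(statusVersion)
	if err != nil {
		return false, err
	}
	return spec.GreaterThan(status), nil
}

func main() {
	fmt.Println(isVersionUpgradeSketch("9.1.2", ""))      // false <nil>: new cluster
	fmt.Println(isVersionUpgradeSketch("9.1.2", "9.1.0")) // true <nil>: rolling upgrade
}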
242 changes: 220 additions & 22 deletions pkg/controller/elasticsearch/driver/upscale.go
@@ -7,21 +7,27 @@ package driver
import (
"context"
"fmt"
"slices"

appsv1 "k8s.io/api/apps/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/utils/ptr"

esv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/elasticsearch/v1"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/expectations"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/metadata"
sset "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/statefulset"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/version"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/label"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/nodespec"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/reconcile"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/settings"
es_sset "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/sset"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/version/zen1"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/version/zen2"
"github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s"
ulog "github.com/elastic/cloud-on-k8s/v3/pkg/utils/log"
)

type upscaleCtx struct {
Expand Down Expand Up @@ -66,35 +72,116 @@ func HandleUpscaleAndSpecChanges(
if err != nil {
return results, fmt.Errorf("adjust resources: %w", err)
}
// reconcile all resources
for _, res := range adjusted {
res := res
if err := settings.ReconcileConfig(ctx.parentCtx, ctx.k8sClient, ctx.es, res.StatefulSet.Name, res.Config, ctx.meta); err != nil {
return results, fmt.Errorf("reconcile config: %w", err)

// Check if this is a version upgrade
isVersionUpgrade, err := isVersionUpgrade(ctx.es)
if err != nil {
return results, fmt.Errorf("while checking for version upgrade: %w", err)
}

// If this is not a version upgrade, process all resources normally and return
if !isVersionUpgrade {
actualStatefulSets, requeue, err := reconcileResources(ctx, actualStatefulSets, adjusted)
if err != nil {
return results, fmt.Errorf("while reconciling resources: %w", err)
}
if _, err := common.ReconcileService(ctx.parentCtx, ctx.k8sClient, &res.HeadlessService, &ctx.es); err != nil {
return results, fmt.Errorf("reconcile service: %w", err)
results.Requeue = requeue
results.ActualStatefulSets = actualStatefulSets
return results, nil
}

// Version upgrade: separate master and non-master StatefulSets
var masterResources, nonMasterResources []nodespec.Resources
for _, res := range adjusted {
if label.IsMasterNodeSet(res.StatefulSet) {
masterResources = append(masterResources, res)
} else {
nonMasterResources = append(nonMasterResources, res)
}
if actualSset, exists := actualStatefulSets.GetByName(res.StatefulSet.Name); exists {
recreateSset, err := handleVolumeExpansion(ctx.parentCtx, ctx.k8sClient, ctx.es, res.StatefulSet, actualSset, ctx.validateStorageClass)
if err != nil {
return results, fmt.Errorf("handle volume expansion: %w", err)
}
if recreateSset {
// The StatefulSet is scheduled for recreation: let's requeue before attempting any further spec change.
results.Requeue = true
}

// The only adjustment we want to make to master statefulSets before ensuring that all non-master
// statefulSets have been reconciled is to scale up the replicas to the expected number.
// The only adjustment we want to make to master statefulSets before ensuring that all non-master
// statefulSets have been reconciled is to potentially scale up the replicas
// which should happen 1 at a time as we adjust the replicas early.
if err = maybeUpscaleMasterResources(ctx, masterResources); err != nil {
Contributor:
I just realized that calling this when len(nonMasterResources) == 0 (or more generally, when all non-master nodesets have already been upgraded?) can be slightly suboptimal.

Assuming that the initial state is:

apiVersion: elasticsearch.k8s.elastic.co/v1
kind: Elasticsearch
metadata:
  name: elasticsearch-sample
spec:
  version: 9.1.0
  nodeSets:
  - name: default
    config:
      node.roles: ["master", "data", "ingest", "ml"]
      node.store.allow_mmap: false
    count: 3

If we update and upgrade to:

apiVersion: elasticsearch.k8s.elastic.co/v1
kind: Elasticsearch
metadata:
  name: elasticsearch-sample
spec:
  version: 9.1.2
  nodeSets:
  - name: default
    config:
      node.roles: ["master", "data", "ingest", "ml"]
      node.store.allow_mmap: false
    count: 4

Then we are going to scale up the 9.1.0 statefulset, leading to the creation of elasticsearch-sample-es-default-3, but immediately in the next reconciliation we are going to delete elasticsearch-sample-es-default-3 to upgrade it to 9.1.2.

Contributor:
My previous comment made me wonder if !isVersionUpgrade is actually the only reason we might want to reconcile everything at once.

Contributor Author (@naemono, Dec 11, 2025):
Is this 4b966db what you were intending from this comment, @barkbay?

return results, fmt.Errorf("while scaling up master resources: %w", err)
}

// First, reconcile all non-master resources
actualStatefulSets, requeue, err := reconcileResources(ctx, actualStatefulSets, nonMasterResources)
if err != nil {
return results, fmt.Errorf("while reconciling non-master resources: %w", err)
}
results.ActualStatefulSets = actualStatefulSets

if requeue {
results.Requeue = true
return results, nil
}

targetVersion, err := version.Parse(ctx.es.Spec.Version)
if err != nil {
return results, fmt.Errorf("while parsing Elasticsearch upgrade target version: %w", err)
}

// Check if all non-master StatefulSets have completed their upgrades before proceeding with master StatefulSets
pendingNonMasterSTS, err := findPendingNonMasterStatefulSetUpgrades(
ctx.k8sClient,
actualStatefulSets,
expectedResources.StatefulSets(),
targetVersion,
expectations.NewExpectations(ctx.k8sClient),
)
if err != nil {
return results, fmt.Errorf("while checking non-master upgrade status: %w", err)
}

ctx.upscaleReporter.RecordPendingNonMasterSTSUpgrades(pendingNonMasterSTS)

if len(pendingNonMasterSTS) > 0 {
// Non-master StatefulSets are still upgrading, skipping master StatefulSets temporarily.
// This will cause a requeue in the caller, and master StatefulSets will attempt to be processed in the next reconciliation
return results, nil
}

// All non-master StatefulSets are upgraded, now process master StatefulSets
actualStatefulSets, results.Requeue, err = reconcileResources(ctx, actualStatefulSets, masterResources)
if err != nil {
return results, fmt.Errorf("while reconciling master resources: %w", err)
}

results.ActualStatefulSets = actualStatefulSets
return results, nil
}

func maybeUpscaleMasterResources(ctx upscaleCtx, masterResources []nodespec.Resources) error {
// Upscale master StatefulSets using the adjusted resources and read the current StatefulSet
// from k8s to get the latest state.
for _, res := range masterResources {
stsName := res.StatefulSet.Name

// Read the current StatefulSet from k8s to get the latest state
var actualSset appsv1.StatefulSet
if err := ctx.k8sClient.Get(ctx.parentCtx, k8s.ExtractNamespacedName(&res.StatefulSet), &actualSset); err != nil {
if apierrors.IsNotFound(err) {
continue
}
return fmt.Errorf("while getting master StatefulSet %s: %w", stsName, err)
}
reconciled, err := es_sset.ReconcileStatefulSet(ctx.parentCtx, ctx.k8sClient, ctx.es, res.StatefulSet, ctx.expectations)
if err != nil {
return results, fmt.Errorf("reconcile StatefulSet: %w", err)

actualReplicas := sset.GetReplicas(actualSset)
targetReplicas := sset.GetReplicas(res.StatefulSet)

if actualReplicas < targetReplicas {
actualSset.Spec.Replicas = ptr.To[int32](targetReplicas)
if err := ctx.k8sClient.Update(ctx.parentCtx, &actualSset); err != nil {
return fmt.Errorf("while upscaling master sts replicas: %w", err)
}
}
// update actual with the reconciled ones for next steps to work with up-to-date information
actualStatefulSets = actualStatefulSets.WithStatefulSet(reconciled)
}
results.ActualStatefulSets = actualStatefulSets
return results, nil
return nil
}

func podsToCreate(
Expand Down Expand Up @@ -166,3 +253,114 @@ func adjustStatefulSetReplicas(

return expected, nil
}

// reconcileResources handles the common StatefulSet reconciliation logic
// It returns:
// - the updated StatefulSets
// - whether a requeue is needed
// - any errors that occurred
func reconcileResources(
ctx upscaleCtx,
actualStatefulSets es_sset.StatefulSetList,
resources []nodespec.Resources,
) (es_sset.StatefulSetList, bool, error) {
requeue := false
ulog.FromContext(ctx.parentCtx).Info("Reconciling resources", "resource_size", len(resources))
for _, res := range resources {
res := res
if err := settings.ReconcileConfig(ctx.parentCtx, ctx.k8sClient, ctx.es, res.StatefulSet.Name, res.Config, ctx.meta); err != nil {
return actualStatefulSets, false, fmt.Errorf("reconcile config: %w", err)
}
if _, err := common.ReconcileService(ctx.parentCtx, ctx.k8sClient, &res.HeadlessService, &ctx.es); err != nil {
return actualStatefulSets, false, fmt.Errorf("reconcile service: %w", err)
}
if actualSset, exists := actualStatefulSets.GetByName(res.StatefulSet.Name); exists {
recreateSset, err := handleVolumeExpansion(ctx.parentCtx, ctx.k8sClient, ctx.es, res.StatefulSet, actualSset, ctx.validateStorageClass)
if err != nil {
return actualStatefulSets, false, fmt.Errorf("handle volume expansion: %w", err)
}
if recreateSset {
ulog.FromContext(ctx.parentCtx).Info("StatefulSet is scheduled for recreation, requeuing", "name", res.StatefulSet.Name)
// The StatefulSet is scheduled for recreation: let's requeue before attempting any further spec change.
requeue = true
continue
}
} else if !exists {
ulog.FromContext(ctx.parentCtx).Info("StatefulSet does not exist", "name", res.StatefulSet.Name)
}
ulog.FromContext(ctx.parentCtx).Info("Reconciling StatefulSet", "name", res.StatefulSet.Name)
reconciled, err := es_sset.ReconcileStatefulSet(ctx.parentCtx, ctx.k8sClient, ctx.es, res.StatefulSet, ctx.expectations)
if err != nil {
return actualStatefulSets, false, fmt.Errorf("reconcile StatefulSet: %w", err)
}
// update actual with the reconciled ones for next steps to work with up-to-date information
actualStatefulSets = actualStatefulSets.WithStatefulSet(reconciled)
}
ulog.FromContext(ctx.parentCtx).Info("Resources reconciled", "actualStatefulSets_size", len(actualStatefulSets), "requeue", requeue)
return actualStatefulSets, requeue, nil
}

// findPendingNonMasterStatefulSetUpgrades finds all non-master StatefulSets that have not completed their upgrades
func findPendingNonMasterStatefulSetUpgrades(
client k8s.Client,
actualStatefulSets es_sset.StatefulSetList,
expectedStatefulSets es_sset.StatefulSetList,
targetVersion version.Version,
expectations *expectations.Expectations,
) ([]appsv1.StatefulSet, error) {
pendingStatefulSet, err := expectations.ExpectedStatefulSetUpdates.PendingGenerations()
if err != nil {
return nil, err
}

pendingNonMasterSTS := make([]appsv1.StatefulSet, 0)
for _, actualStatefulSet := range actualStatefulSets {
expectedSset, _ := expectedStatefulSets.GetByName(actualStatefulSet.Name)

// Skip master StatefulSets. We check both here because the master role may have been added
// to a non-master StatefulSet during the upgrade spec change.
if label.IsMasterNodeSet(actualStatefulSet) || label.IsMasterNodeSet(expectedSset) {
continue
}

// If the expectations show this as a pending StatefulSet, add it to the list.
if slices.Contains(pendingStatefulSet, actualStatefulSet.Name) {
pendingNonMasterSTS = append(pendingNonMasterSTS, actualStatefulSet)
continue
}

// If the StatefulSet is not at the target version, it is not upgraded
// so don't even bother looking at the state/status of the StatefulSet.
actualVersion, err := es_sset.GetESVersion(actualStatefulSet)
if err != nil {
return pendingNonMasterSTS, err
}
if actualVersion.LT(targetVersion) {
pendingNonMasterSTS = append(pendingNonMasterSTS, actualStatefulSet)
continue
}

// Check if this StatefulSet has pending updates
if actualStatefulSet.Status.UpdatedReplicas != actualStatefulSet.Status.Replicas {
pendingNonMasterSTS = append(pendingNonMasterSTS, actualStatefulSet)
continue
}

// Check if there are any pods that need to be upgraded
pods, err := es_sset.GetActualPodsForStatefulSet(client, k8s.ExtractNamespacedName(&actualStatefulSet))
if err != nil {
return pendingNonMasterSTS, err
}

for _, pod := range pods {
// Check if pod revision matches StatefulSet update revision
if actualStatefulSet.Status.UpdateRevision != "" && sset.PodRevision(pod) != actualStatefulSet.Status.UpdateRevision {
// This pod still needs to be upgraded
pendingNonMasterSTS = append(pendingNonMasterSTS, actualStatefulSet)
continue
}
}
}

return pendingNonMasterSTS, nil
}
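A hedged, self-contained sketch of the rollout check that findPendingNonMasterStatefulSetUpgrades builds on, using plain Kubernetes API types; the helper name and the exact combination of conditions are illustrative assumptions rather than ECK code.

package sketch

import (
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
)

// rolloutComplete is a hypothetical helper: it reports whether a StatefulSet
// has fully converged on its latest spec, which is roughly what the pending
// upgrade detection above checks for each non-master StatefulSet.
func rolloutComplete(sts appsv1.StatefulSet, pods []corev1.Pod) bool {
	// The StatefulSet controller must have observed the latest spec change.
	if sts.Generation != sts.Status.ObservedGeneration {
		return false
	}
	// Every replica must already run the update revision.
	if sts.Status.UpdatedReplicas != sts.Status.Replicas {
		return false
	}
	// Every pod's controller-revision-hash label must match the update revision.
	for _, pod := range pods {
		if sts.Status.UpdateRevision != "" &&
			pod.Labels[appsv1.ControllerRevisionHashLabelKey] != sts.Status.UpdateRevision {
			return false
		}
	}
	return true
}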