Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fleet: Allow increasing the fleet replicas during a rolling update #3977

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions install/helm/agones/templates/crds/fleet.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ spec:
anyOf:
- type: integer
- type: string
allowReplicaSurge:
type: boolean
description: Allows the fleet to surge replicas beyond the specified limit.
default: false
priorities:
type: array
description: Configuration of Counters and Lists scale down logic -- which gameservers in the Fleet are most important to keep around.
Expand Down
4 changes: 4 additions & 0 deletions install/yaml/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,10 @@ spec:
anyOf:
- type: integer
- type: string
allowReplicaSurge:
type: boolean
description: Allows the fleet to surge replicas beyond the specified limit.
default: false
priorities:
type: array
description: Configuration of Counters and Lists scale down logic -- which gameservers in the Fleet are most important to keep around.
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/agones/v1/fleet.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ type FleetSpec struct {
Priorities []Priority `json:"priorities,omitempty"`
// Template the GameServer template to apply for this Fleet
Template GameServerTemplateSpec `json:"template"`
// Allows to surge the number of replicas above the desired replicas when scaling up.
AllowReplicaSurge bool `json:"allowReplicaSurge"`
}

// FleetStatus is the status of a Fleet
Expand Down
17 changes: 15 additions & 2 deletions pkg/fleets/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,12 +384,25 @@ func (c *Controller) applyDeploymentStrategy(ctx context.Context, fleet *agonesv
return fleet.Spec.Replicas, nil
}

// if we do have `rest` but all their spec.replicas is zero, we can just do subtraction against whatever is allocated in `rest`.
if agonesv1.SumSpecReplicas(rest) == 0 {
blocked := agonesv1.SumGameServerSets(rest, func(gsSet *agonesv1.GameServerSet) int32 {
return gsSet.Status.ReservedReplicas + gsSet.Status.AllocatedReplicas
})
replicas := fleet.Spec.Replicas - blocked
var replicas int32
if !fleet.Spec.AllowReplicaSurge {
// if we do have `rest` but all their spec.replicas is zero, we can just do subtraction against whatever is allocated in `rest`.
replicas = fleet.Spec.Replicas - blocked
} else if fleet.Spec.Strategy.Type == appsv1.RollingUpdateDeploymentStrategyType {
// Allow to temporarily increase the number of replicas when doing a rolling update.
r, err := intstr.GetValueFromIntOrPercent(fleet.Spec.Strategy.RollingUpdate.MaxSurge, int(fleet.Spec.Replicas), true)
if err != nil {
return 0, errors.Wrapf(err, "error parsing MaxSurge value: %s", fleet.ObjectMeta.Name)
}
replicas = fleet.Spec.Replicas + int32(r) - blocked
if replicas > int32(r) {
replicas = int32(r)
}
}
if replicas < 0 {
replicas = 0
}
Expand Down
222 changes: 222 additions & 0 deletions test/e2e/fleet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package e2e
import (
"context"
"fmt"
"slices"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -580,6 +581,227 @@ func TestFleetRollingUpdate(t *testing.T) {
}
}

// TestFleetRollingUpdateOverflow checks that the fleet replica count can scale up while some of the initial replicas are in allocated mode
// when using setting the AllowReplicaSurge to true.
func TestFleetRollingUpdateOverflow(t *testing.T) {
t.Parallel()
ctx := context.Background()
nInitialReplicas := 3
fixtures := []struct {
// RollingUpdate maxSurge parameter.
maxSurge string
// The value of AllowReplicaSurge in the fleet spec.
allowReplicaSurge bool
// The number of initial replicas to put in allocated mode before starting the RollingUpdate.
nInitialAllocatedReplicas int
}{
{
maxSurge: "25%",
allowReplicaSurge: false,
nInitialAllocatedReplicas: nInitialReplicas,
},
{
maxSurge: "25%",
allowReplicaSurge: true,
nInitialAllocatedReplicas: nInitialReplicas,
},
{
maxSurge: "99%",
allowReplicaSurge: true,
nInitialAllocatedReplicas: nInitialReplicas - 2,
},
{
maxSurge: "99%",
allowReplicaSurge: true,
nInitialAllocatedReplicas: nInitialReplicas - 1,
},
}
for i := range fixtures {
fixture := fixtures[i]
t.Run(fmt.Sprintf("Use fleet maxSurge %s allowReplicaSurge %t nInitialAllocatedReplicas %d", fixture.maxSurge, fixture.allowReplicaSurge, fixture.nInitialAllocatedReplicas), func(t *testing.T) {
assert.LessOrEqual(t, fixture.nInitialAllocatedReplicas, nInitialReplicas)
assert.GreaterOrEqual(t, nInitialReplicas, 3)
assert.Greater(t, fixture.nInitialAllocatedReplicas, 0)
client := framework.AgonesClient.AgonesV1()

val := intstr.FromString(fixture.maxSurge)
expectedReplicaSurge, err := intstr.GetValueFromIntOrPercent(&val, nInitialReplicas, true)
require.NoError(t, err)
if !fixture.allowReplicaSurge {
expectedReplicaSurge = 0
}
flt := defaultFleet(framework.Namespace)
flt.ApplyDefaults()
flt.Spec.Replicas = int32(nInitialReplicas)
rollingUpdatePercent := intstr.FromString(fixture.maxSurge)
flt.Spec.Strategy.RollingUpdate.MaxSurge = &rollingUpdatePercent
flt.Spec.Strategy.RollingUpdate.MaxUnavailable = &rollingUpdatePercent
flt.Spec.AllowReplicaSurge = fixture.allowReplicaSurge
log := e2e.TestLogger(t).WithField("fleet", flt.ObjectMeta.Name)

flt, err = client.Fleets(framework.Namespace).Create(ctx, flt, metav1.CreateOptions{})
require.NoError(t, err)
defer client.Fleets(framework.Namespace).Delete(ctx, flt.ObjectMeta.Name, metav1.DeleteOptions{}) // nolint:errcheck

assert.Equal(t, int32(nInitialReplicas), flt.Spec.Replicas)
assert.Equal(t, fixture.maxSurge, flt.Spec.Strategy.RollingUpdate.MaxSurge.StrVal)
assert.Equal(t, fixture.maxSurge, flt.Spec.Strategy.RollingUpdate.MaxUnavailable.StrVal)
assert.Equal(t, fixture.allowReplicaSurge, flt.Spec.AllowReplicaSurge)

flt, err = client.Fleets(framework.Namespace).Get(ctx, flt.ObjectMeta.GetName(), metav1.GetOptions{})
require.NoError(t, err)

// Wait for all the GS in the fleet to be ready.
err = framework.WaitForFleetCondition(t, flt, func(entry *logrus.Entry, fleet *agonesv1.Fleet) bool {
return fleet.Status.ReadyReplicas == int32(nInitialReplicas)
})
assert.NoError(t, err)

// Put some of the replicas in allocated mode. This can simulation scenarios similar to what's described in #3970, where the fleet cannot create new
// replicas when performing a RollingUpdate when all the initial replicas are in allocated mode.
allocatedGs := map[string]bool{}
for i := 0; i < fixture.nInitialAllocatedReplicas; i++ {
allocation := framework.CreateAndApplyAllocation(t, flt)
allocatedGs[allocation.Status.GameServerName] = true
}
assert.Equal(t, len(allocatedGs), fixture.nInitialAllocatedReplicas)
err = framework.WaitForFleetCondition(t, flt, func(entry *logrus.Entry, fleet *agonesv1.Fleet) bool {
return fleet.Status.ReadyReplicas == int32(nInitialReplicas-fixture.nInitialAllocatedReplicas) && fleet.Status.AllocatedReplicas == int32(fixture.nInitialAllocatedReplicas)
})
assert.NoError(t, err)

// Change ContainerPort to trigger creating a new GSSet. Retry in case of a conflict. This simulations a redeployment.
fltName := flt.GetName()
require.Eventually(t, func() bool {
flt, err = client.Fleets(framework.Namespace).Get(ctx, fltName, metav1.GetOptions{})
require.NoError(t, err)
fltCopy := flt.DeepCopy()
fltCopy.Spec.Template.Spec.Ports[0].ContainerPort++
flt, err = client.Fleets(framework.Namespace).Update(ctx, fltCopy, metav1.UpdateOptions{})
if err != nil {
log.WithError(err).Info("Failed to update Fleet")
}
return err == nil
}, time.Minute, time.Second)

selector := labels.SelectorFromSet(labels.Set{agonesv1.FleetNameLabel: flt.ObjectMeta.Name})
// We now should have two GSSets.
err = wait.PollUntilContextTimeout(context.Background(), 1*time.Second, 30*time.Second, true, func(ctx context.Context) (bool, error) {
gssList, err := framework.AgonesClient.AgonesV1().GameServerSets(framework.Namespace).List(ctx,
metav1.ListOptions{LabelSelector: selector.String()})
if err != nil {
return false, err
}
return len(gssList.Items) == 2, nil
})
assert.NoError(t, err)

// The new GSSet should have scaled up based on whether the fleet allows for replica surge.
err = framework.WaitForFleetCondition(t, flt, func(entry *logrus.Entry, fleet *agonesv1.Fleet) bool {
return fleet.Status.ReadyReplicas == int32(expectedReplicaSurge)
})
assert.NoError(t, err)

// Check that total number of gameservers in the system does not exceed the RollingUpdate
// parameters (creating no more than maxSurge, deleting maxUnavailable servers at a time)
// Wait for old GSSet to be deleted
err = wait.PollUntilContextTimeout(context.Background(), 1*time.Second, 5*time.Minute, true, func(ctx context.Context) (bool, error) {
list, err := framework.AgonesClient.AgonesV1().GameServers(framework.Namespace).List(ctx,
metav1.ListOptions{LabelSelector: selector.String()})
if err != nil {
return false, err
}

maxSurge, err := intstr.GetScaledValueFromIntOrPercent(flt.Spec.Strategy.RollingUpdate.MaxSurge, int(flt.Spec.Replicas), true)
require.NoError(t, err)

roundUp := false
maxUnavailable, err := intstr.GetScaledValueFromIntOrPercent(flt.Spec.Strategy.RollingUpdate.MaxUnavailable, int(flt.Spec.Replicas), roundUp)

if maxUnavailable == 0 {
maxUnavailable = 1
}
// This difference is inevitable, also could be seen with Deployments and ReplicaSets
shift := maxUnavailable
require.NoError(t, err)

// Ignore any GameServers that are shutting down (resulting from Allocation cycling).
shuttingDown := 0
for _, gs := range list.Items {
if gs.IsBeingDeleted() {
shuttingDown++
}
}
expectedTotal := nInitialReplicas + maxSurge + maxUnavailable + shift + shuttingDown
if len(list.Items) > expectedTotal {
err = fmt.Errorf("new replicas should be less than target + maxSurge + maxUnavailable + shift + shuttingDown. Replicas: %d, Expected: %d", len(list.Items), expectedTotal)
}
if err != nil {
return false, err
}
gssList, err := framework.AgonesClient.AgonesV1().GameServerSets(framework.Namespace).List(ctx,
metav1.ListOptions{LabelSelector: selector.String()})
if err != nil {
return false, err
}

// There must be two GSSets.
if len(gssList.Items) != 2 {
return false, nil
}

// The old GSSet must have zero ready replicas (i.e. the controller should have got rid of the ones in ready mode).
// The new GSSet must hold the expected number of ready replicas based on the surge options.
readyReplicas := []int{0, 0}
for i, v := range gssList.Items {
readyReplicas[i] = int(v.Status.ReadyReplicas)
}
slices.Sort(readyReplicas)

return readyReplicas[0] == 0 && readyReplicas[1] == expectedReplicaSurge, nil
})
assert.NoError(t, err)

// Delete the initial GSS one by one and check how the fleet replica count is distributed across the GSSets.
// This simulates the scenario in which the old GSSet GameServers are being slowly deleted after a redeployment.
gssList, err := framework.AgonesClient.AgonesV1().GameServerSets(framework.Namespace).List(ctx,
metav1.ListOptions{LabelSelector: selector.String()})
require.NoError(t, err)
found := false

for _, gss := range gssList.Items {
if gss.Status.AllocatedReplicas == 0 {
// This is the new GSSet.
continue
}
assert.False(t, found)
found = true

for gs := range allocatedGs {
err := framework.AgonesClient.AgonesV1().GameServers(framework.Namespace).Delete(ctx, gs, metav1.DeleteOptions{})
assert.NoError(t, err)

// The old GSSet is scaling down and the new one is scaling up according to the surge settings. Make sure that the amount
// of ready replicas is kept in check.
err = framework.WaitForFleetCondition(t, flt, func(entry *logrus.Entry, fleet *agonesv1.Fleet) bool {
return fleet.Status.ReadyReplicas == int32(expectedReplicaSurge)
})
require.NoError(t, err)
}
err = framework.AgonesClient.AgonesV1().GameServerSets(framework.Namespace).Delete(ctx, gss.Name, metav1.DeleteOptions{})
require.NoError(t, err)
}
assert.True(t, found)

// After the old GSSet is deleted the new one should have all the replicas.
err = framework.WaitForFleetCondition(t, flt, func(entry *logrus.Entry, fleet *agonesv1.Fleet) bool {
return fleet.Status.AllocatedReplicas == 0 && fleet.Status.ReadyReplicas == int32(nInitialReplicas)
})
require.NoError(t, err)
})
}
}

func TestUpdateFleetReplicaAndSpec(t *testing.T) {
t.Parallel()

Expand Down
Loading