fix: ZonalAllocationFailure handling (and general cleanup of zone name handling) (#637)

* fix: consolidate and streamline all zone handling

(as a side effect, this also fixes ZonalAllocationFailure handling)

* fix: cleanup and fix existing zone-related tests

* fix: GetZone implementation and test

* fix: capture MockedFunction/LRO input on error

* test: should handle ZonalAllocationFailed on creating the VM

* chore: make presubmit

---------

Co-authored-by: Bryce Soghigian <[email protected]>
tallaxes and Bryce-Soghigian authored Jan 13, 2025
1 parent cf7daeb commit 15e1a82
Showing 9 changed files with 206 additions and 129 deletions.
12 changes: 1 addition & 11 deletions pkg/cloudprovider/cloudprovider.go
@@ -328,11 +328,9 @@ func (c *CloudProvider) instanceToNodeClaim(ctx context.Context, vm *armcompute.
nodeClaim.Status.Allocatable = lo.PickBy(instanceType.Allocatable(), func(_ v1.ResourceName, v resource.Quantity) bool { return !resources.IsZero(v) })
}

// TODO: review logic for determining zone (AWS uses Zone from subnet resolved and available from NodeClass conditions ...)
if zoneID, err := instance.GetZoneID(vm); err != nil {
if zone, err := utils.GetZone(vm); err != nil {
logging.FromContext(ctx).Warnf("Failed to get zone for VM %s, %v", *vm.Name, err)
} else {
zone := makeZone(*vm.Location, zoneID)
// aks-node-validating-webhook protects v1.LabelTopologyZone, will be set elsewhere, so we use a different label
labels[v1alpha2.AlternativeLabelTopologyZone] = zone
}
@@ -369,14 +367,6 @@ func GenerateNodeClaimName(vmName string) string {
return strings.TrimLeft("aks-", vmName)
}

// makeZone returns the zone value in format of <region>-<zone-id>.
func makeZone(location string, zoneID string) string {
if zoneID == "" {
return ""
}
return fmt.Sprintf("%s-%s", strings.ToLower(location), zoneID)
}

// newTerminatingNodeClassError returns a NotFound error for handling by
func newTerminatingNodeClassError(name string) *errors.StatusError {
qualifiedResource := schema.GroupResource{Group: apis.Group, Resource: "aksnodeclasses"}
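Note: the `makeZone` helper removed above and the `GetZoneID` helper removed from the instance provider (further down) are consolidated into `pkg/utils`, which this diff does not show. The following is only a sketch of what the consolidated `utils.MakeZone`/`utils.GetZone` presumably look like, inferred from the removed code and from the call sites that now expect zone values in `<region>-<zone-id>` form:

```go
// Hypothetical sketch of the consolidated helpers in pkg/utils (not shown in this diff);
// names and exact behavior are inferred from the removed code and the call sites above.
package utils

import (
	"fmt"
	"strings"

	"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute"
)

// MakeZone returns the zone in "<region>-<zone-id>" form, or "" for a non-zonal offering.
func MakeZone(location string, zoneID string) string {
	if zoneID == "" {
		return ""
	}
	return fmt.Sprintf("%s-%s", strings.ToLower(location), zoneID)
}

// GetZone returns the "<region>-<zone-id>" zone of a VM, or an error if the VM
// is malformed or carries no usable zone information.
func GetZone(vm *armcompute.VirtualMachine) (string, error) {
	if vm == nil {
		return "", fmt.Errorf("cannot pass in a nil virtual machine")
	}
	if vm.Name == nil {
		return "", fmt.Errorf("virtual machine is missing name")
	}
	if vm.Zones == nil {
		return "", fmt.Errorf("virtual machine %v zones are nil", *vm.Name)
	}
	if len(vm.Zones) > 1 {
		return "", fmt.Errorf("virtual machine %v has multiple zones", *vm.Name)
	}
	if len(vm.Zones) == 0 {
		return "", fmt.Errorf("virtual machine %v does not have any zones specified", *vm.Name)
	}
	if vm.Location == nil {
		return "", fmt.Errorf("virtual machine %v is missing location", *vm.Name)
	}
	return MakeZone(*vm.Location, *vm.Zones[0]), nil
}
```

Either way, the value written to `v1alpha2.AlternativeLabelTopologyZone` above is now the full `<region>-<zone-id>` string, assembled in one place rather than at each call site.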
7 changes: 3 additions & 4 deletions pkg/fake/types.go
@@ -45,13 +45,12 @@ func (m *MockedFunction[I, O]) Reset() {
}

func (m *MockedFunction[I, O]) Invoke(input *I, defaultTransformer func(*I) (O, error)) (O, error) {
m.CalledWithInput.Add(input)
err := m.Error.Get()
if err != nil {
m.failedCalls.Add(1)
return *new(O), err
}
m.CalledWithInput.Add(input)

if !m.Output.IsNil() {
m.successfulCalls.Add(1)
return *m.Output.Clone(), nil
@@ -94,6 +93,8 @@ func (m *MockedLRO[I, O]) Reset() {
}

func (m *MockedLRO[I, O]) Invoke(input *I, defaultTransformer func(*I) (*O, error)) (*runtime.Poller[O], error) {
m.CalledWithInput.Add(input)

if err := m.BeginError.Get(); err != nil {
m.failedCalls.Add(1)
return nil, err
@@ -103,8 +104,6 @@ func (m *MockedLRO[I, O]) Invoke(input *I, defaultTransformer func(*I) (*O, erro
return newMockPoller[O](nil, err)
}

m.CalledWithInput.Add(input)

if !m.Output.IsNil() {
m.successfulCalls.Add(1)
return newMockPoller(m.Output.Clone(), nil)
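Note: recording `CalledWithInput` before the error/poller-error checks means the mocks now capture the request even when they are configured to fail. That is what lets the new ZonalAllocationFailed test (see suite_test.go below) inspect the VM payload of a create call that was forced to error. An illustrative fragment, not a complete test, using the same fake APIs as that test:

```go
// Illustrative fragment: force the create call to fail, then read back the
// captured input even though the call returned an error.
azureEnv.VirtualMachinesAPI.VirtualMachineCreateOrUpdateBehavior.Error.Set(
	&azcore.ResponseError{ErrorCode: sdkerrors.ZoneAllocationFailed},
)
// ... provision a pod, which attempts (and fails) to create a VM ...

// Before this change CalledWithInput would be empty here; now the failed
// request is recorded, so the attempted zone can be asserted on.
vm := azureEnv.VirtualMachinesAPI.VirtualMachineCreateOrUpdateBehavior.CalledWithInput.Pop().VM
zone, err := utils.GetZone(&vm)
Expect(err).ToNot(HaveOccurred())
Expect(zone).ToNot(BeEmpty())
```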
30 changes: 3 additions & 27 deletions pkg/providers/instance/instance.go
@@ -37,6 +37,7 @@ import (
"github.com/Azure/karpenter-provider-azure/pkg/providers/instancetype"
"github.com/Azure/karpenter-provider-azure/pkg/providers/launchtemplate"
"github.com/Azure/karpenter-provider-azure/pkg/providers/loadbalancer"
"github.com/Azure/karpenter-provider-azure/pkg/utils"

corecloudprovider "sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/scheduling"
@@ -140,7 +141,7 @@ func (p *DefaultProvider) Create(ctx context.Context, nodeClass *v1alpha2.AKSNod
}
return nil, err
}
zone, err := GetZoneID(vm)
zone, err := utils.GetZone(vm)
if err != nil {
logging.FromContext(ctx).Error(err)
}
@@ -375,7 +376,7 @@ func newVMObject(
CapacityTypeToPriority[capacityType]),
),
},
Zones: lo.Ternary(len(zone) > 0, []*string{&zone}, []*string{}),
Zones: utils.MakeVMZone(zone),
Tags: launchTemplate.Tags,
}
setVMPropertiesOSDiskType(vm.Properties, launchTemplate.StorageProfile)
@@ -628,11 +629,6 @@ func (p *DefaultProvider) pickSkuSizePriorityAndZone(ctx context.Context, nodeCl
})
zonesWithPriority := lo.Map(priorityOfferings, func(o corecloudprovider.Offering, _ int) string { return getOfferingZone(o) })
if zone, ok := sets.New(zonesWithPriority...).PopAny(); ok {
if len(zone) > 0 {
// Zones in zonal Offerings have <region>-<number> format; the zone returned from here will be used for VM instantiation,
// which expects just the zone number, without region
zone = string(zone[len(zone)-1])
}
return instanceType, priority, zone
}
return nil, "", ""
@@ -752,26 +748,6 @@ func (p *DefaultProvider) getCSExtension(cse string, isWindows bool) *armcompute
}
}

// GetZoneID returns the zone ID for the given virtual machine, or an empty string if there is no zone specified
func GetZoneID(vm *armcompute.VirtualMachine) (string, error) {
if vm == nil {
return "", fmt.Errorf("cannot pass in a nil virtual machine")
}
if vm.Name == nil {
return "", fmt.Errorf("virtual machine is missing name")
}
if vm.Zones == nil {
return "", nil
}
if len(vm.Zones) == 1 {
return *(vm.Zones)[0], nil
}
if len(vm.Zones) > 1 {
return "", fmt.Errorf("virtual machine %v has multiple zones", *vm.Name)
}
return "", nil
}

func GetListQueryBuilder(rg string) *kql.Builder {
return kql.New(`Resources`).
AddLiteral(` | where type == "microsoft.compute/virtualmachines"`).
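Note: `newVMObject` now delegates to `utils.MakeVMZone` instead of wrapping the raw zone string, and `pickSkuSizePriorityAndZone` no longer strips the offering zone down to its numeric suffix, so the conversion from the `<region>-<zone-id>` form to the bare zone ID the VM API expects presumably lives in the new helper. A hedged sketch of what it might look like (the real implementation is in `pkg/utils`, not shown in this diff):

```go
// Hypothetical sketch of utils.MakeVMZone, inferred from its call site in newVMObject;
// the actual implementation in pkg/utils may differ.
package utils

import "strings"

// MakeVMZone converts a "<region>-<zone-id>" zone (or "" for non-zonal) into the
// Zones slice expected by the ARM VM API, which takes just the zone ID.
func MakeVMZone(zone string) []*string {
	if zone == "" {
		return []*string{}
	}
	// e.g. "westus-2" -> "2"
	zoneID := zone[strings.LastIndex(zone, "-")+1:]
	return []*string{&zoneID}
}
```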
57 changes: 1 addition & 56 deletions pkg/providers/instance/instance_test.go
@@ -20,7 +20,6 @@ import (
"context"
"testing"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute"
"github.com/Azure/karpenter-provider-azure/pkg/cache"
"github.com/stretchr/testify/assert"
@@ -79,7 +78,7 @@ func TestGetPriorityCapacityAndInstanceType(t *testing.T) {
},
nodeClaim: &karpv1.NodeClaim{},
expectedInstanceType: "Standard_D2s_v3",
expectedZone: "2",
expectedZone: "westus-2",
expectedPriority: karpv1.CapacityTypeOnDemand,
},
}
@@ -101,60 +100,6 @@
}
}

func TestGetZone(t *testing.T) {
testVMName := "silly-armcompute"
tc := []struct {
testName string
input *armcompute.VirtualMachine
expectedZone string
expectedError string
}{
{
testName: "missing name",
input: &armcompute.VirtualMachine{
Name: nil,
},
expectedError: "virtual machine is missing name",
},
{
testName: "invalid virtual machine struct",
input: nil,
expectedError: "cannot pass in a nil virtual machine",
},
{
testName: "invalid zones field in virtual machine struct",
input: &armcompute.VirtualMachine{
Name: &testVMName,
},
expectedError: "virtual machine silly-armcompute zones are nil",
},
{
testName: "happy case",
input: &armcompute.VirtualMachine{
Name: &testVMName,
Zones: []*string{to.Ptr("poland-central")},
},
expectedZone: "poland-central",
},
{
testName: "emptyZones",
input: &armcompute.VirtualMachine{
Name: &testVMName,
Zones: []*string{},
},
expectedError: "virtual machine silly-armcompute does not have any zones specified",
},
}

for _, c := range tc {
zone, err := GetZoneID(c.input)
assert.Equal(t, c.expectedZone, zone, c.testName)
if err != nil {
assert.Equal(t, c.expectedError, err.Error(), c.testName)
}
}
}

// Currently tested: ID, Name, Tags, Zones
// TODO: Add the below attributes for Properties if needed:
// Priority, InstanceView.HyperVGeneration, TimeCreated
2 changes: 1 addition & 1 deletion pkg/providers/instancetype/instancetypes.go
@@ -172,7 +172,7 @@ func instanceTypeZones(sku *skewer.SKU, region string) sets.Set[string] {
skuZones := lo.Keys(sku.AvailabilityZones(region))
if hasZonalSupport(region) && len(skuZones) > 0 {
return sets.New(lo.Map(skuZones, func(zone string, _ int) string {
return fmt.Sprintf("%s-%s", region, zone)
return utils.MakeZone(region, zone)
})...)
}
return sets.New("") // empty string means non-zonal offering
77 changes: 49 additions & 28 deletions pkg/providers/instancetype/suite_test.go
@@ -76,6 +76,8 @@ var coreProvisioner, coreProvisionerNonZonal *provisioning.Provisioner
var cluster, clusterNonZonal *state.Cluster
var cloudProvider, cloudProviderNonZonal *cloudprovider.CloudProvider

var fakeZone1 = utils.MakeZone(fake.Region, "1")

func TestAzure(t *testing.T) {
ctx = TestContextWithLogger(t)
RegisterFailHandler(Fail)
@@ -589,8 +591,8 @@ var _ = Describe("InstanceType Provider", func() {

Context("Unavailable Offerings", func() {
It("should not allocate a vm in a zone marked as unavailable", func() {
azureEnv.UnavailableOfferingsCache.MarkUnavailable(ctx, "ZonalAllocationFailure", "Standard_D2_v2", fmt.Sprintf("%s-1", fake.Region), karpv1.CapacityTypeSpot)
azureEnv.UnavailableOfferingsCache.MarkUnavailable(ctx, "ZonalAllocationFailure", "Standard_D2_v2", fmt.Sprintf("%s-1", fake.Region), karpv1.CapacityTypeOnDemand)
azureEnv.UnavailableOfferingsCache.MarkUnavailable(ctx, "ZonalAllocationFailure", "Standard_D2_v2", fakeZone1, karpv1.CapacityTypeSpot)
azureEnv.UnavailableOfferingsCache.MarkUnavailable(ctx, "ZonalAllocationFailure", "Standard_D2_v2", fakeZone1, karpv1.CapacityTypeOnDemand)
coretest.ReplaceRequirements(nodePool, karpv1.NodeSelectorRequirementWithMinValues{
NodeSelectorRequirement: v1.NodeSelectorRequirement{
Key: v1.LabelInstanceTypeStable,
@@ -599,19 +601,38 @@
}})

ExpectApplied(ctx, env.Client, nodePool, nodeClass)
// Try this 100 times to make sure we don't get a node in eastus-1,
// we pick from 3 zones so the likelihood of this test passing by chance is 1/3^100
for i := 0; i < 100; i++ {
pod := coretest.UnschedulablePod()
ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, coreProvisioner, pod)
ExpectScheduled(ctx, env.Client, pod)
nodes := &v1.NodeList{}
Expect(env.Client.List(ctx, nodes)).To(Succeed())
for _, node := range nodes.Items {
Expect(node.Labels["karpenter.kubernetes.azure/zone"]).ToNot(Equal(fmt.Sprintf("%s-1", fake.Region)))
Expect(node.Labels["node.kubernetes.io/instance-type"]).To(Equal("Standard_D2_v2"))
}
}
pod := coretest.UnschedulablePod()
ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, coreProvisioner, pod)
node := ExpectScheduled(ctx, env.Client, pod)
Expect(node.Labels[v1alpha2.AlternativeLabelTopologyZone]).ToNot(Equal(fakeZone1))
Expect(node.Labels[v1.LabelInstanceTypeStable]).To(Equal("Standard_D2_v2"))
})
It("should handle ZonalAllocationFailed on creating the VM", func() {
azureEnv.VirtualMachinesAPI.VirtualMachinesBehavior.VirtualMachineCreateOrUpdateBehavior.Error.Set(
&azcore.ResponseError{ErrorCode: sdkerrors.ZoneAllocationFailed},
)
coretest.ReplaceRequirements(nodePool, karpv1.NodeSelectorRequirementWithMinValues{
NodeSelectorRequirement: v1.NodeSelectorRequirement{
Key: v1.LabelInstanceTypeStable,
Operator: v1.NodeSelectorOpIn,
Values: []string{"Standard_D2_v2"},
}})

ExpectApplied(ctx, env.Client, nodePool, nodeClass)
pod := coretest.UnschedulablePod()
ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, coreProvisioner, pod)
ExpectNotScheduled(ctx, env.Client, pod)

By("marking whatever zone was picked as unavailable - for both spot and on-demand")
zone, err := utils.GetZone(&azureEnv.VirtualMachinesAPI.VirtualMachineCreateOrUpdateBehavior.CalledWithInput.Pop().VM)
Expect(err).ToNot(HaveOccurred())
Expect(azureEnv.UnavailableOfferingsCache.IsUnavailable("Standard_D2_v2", zone, karpv1.CapacityTypeSpot)).To(BeTrue())
Expect(azureEnv.UnavailableOfferingsCache.IsUnavailable("Standard_D2_v2", zone, karpv1.CapacityTypeOnDemand)).To(BeTrue())

By("successfully scheduling in a different zone on retry")
ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, coreProvisioner, pod)
node := ExpectScheduled(ctx, env.Client, pod)
Expect(node.Labels[v1alpha2.AlternativeLabelTopologyZone]).ToNot(Equal(zone))
})

DescribeTable("Should not return unavailable offerings", func(azEnv *test.Environment) {
@@ -641,8 +662,8 @@ var _ = Describe("InstanceType Provider", func() {
)

It("should launch instances in a different zone than preferred", func() {
azureEnv.UnavailableOfferingsCache.MarkUnavailable(ctx, "ZonalAllocationFailure", "Standard_D2_v2", fmt.Sprintf("%s-1", fake.Region), karpv1.CapacityTypeOnDemand)
azureEnv.UnavailableOfferingsCache.MarkUnavailable(ctx, "ZonalAllocationFailure", "Standard_D2_v2", fmt.Sprintf("%s-1", fake.Region), karpv1.CapacityTypeSpot)
azureEnv.UnavailableOfferingsCache.MarkUnavailable(ctx, "ZonalAllocationFailure", "Standard_D2_v2", fakeZone1, karpv1.CapacityTypeOnDemand)
azureEnv.UnavailableOfferingsCache.MarkUnavailable(ctx, "ZonalAllocationFailure", "Standard_D2_v2", fakeZone1, karpv1.CapacityTypeSpot)

ExpectApplied(ctx, env.Client, nodeClass, nodePool)
pod := coretest.UnschedulablePod(coretest.PodOptions{
@@ -651,18 +672,18 @@
pod.Spec.Affinity = &v1.Affinity{NodeAffinity: &v1.NodeAffinity{PreferredDuringSchedulingIgnoredDuringExecution: []v1.PreferredSchedulingTerm{
{
Weight: 1, Preference: v1.NodeSelectorTerm{MatchExpressions: []v1.NodeSelectorRequirement{
{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{fmt.Sprintf("%s-1", fake.Region)}},
{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{fakeZone1}},
}},
},
}}}
ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, coreProvisioner, pod)
node := ExpectScheduled(ctx, env.Client, pod)
Expect(node.Labels["karpenter.kubernetes.azure/zone"]).ToNot(Equal(fmt.Sprintf("%s-1", fake.Region)))
Expect(node.Labels["node.kubernetes.io/instance-type"]).To(Equal("Standard_D2_v2"))
Expect(node.Labels[v1alpha2.AlternativeLabelTopologyZone]).ToNot(Equal(fakeZone1))
Expect(node.Labels[v1.LabelInstanceTypeStable]).To(Equal("Standard_D2_v2"))
})
It("should launch smaller instances than optimal if larger instance launch results in Insufficient Capacity Error", func() {
azureEnv.UnavailableOfferingsCache.MarkUnavailable(ctx, "SubscriptionQuotaReached", "Standard_F16s_v2", fmt.Sprintf("%s-1", fake.Region), karpv1.CapacityTypeOnDemand)
azureEnv.UnavailableOfferingsCache.MarkUnavailable(ctx, "SubscriptionQuotaReached", "Standard_F16s_v2", fmt.Sprintf("%s-1", fake.Region), karpv1.CapacityTypeSpot)
azureEnv.UnavailableOfferingsCache.MarkUnavailable(ctx, "SubscriptionQuotaReached", "Standard_F16s_v2", fakeZone1, karpv1.CapacityTypeOnDemand)
azureEnv.UnavailableOfferingsCache.MarkUnavailable(ctx, "SubscriptionQuotaReached", "Standard_F16s_v2", fakeZone1, karpv1.CapacityTypeSpot)
coretest.ReplaceRequirements(nodePool, karpv1.NodeSelectorRequirementWithMinValues{
NodeSelectorRequirement: v1.NodeSelectorRequirement{
Key: v1.LabelInstanceTypeStable,
@@ -676,7 +697,7 @@
Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1")},
},
NodeSelector: map[string]string{
v1.LabelTopologyZone: fmt.Sprintf("%s-1", fake.Region),
v1.LabelTopologyZone: fakeZone1,
},
}))
}
@@ -731,8 +752,8 @@ var _ = Describe("InstanceType Provider", func() {
pod := coretest.UnschedulablePod()
ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, coreProvisioner, pod)
ExpectNotScheduled(ctx, env.Client, pod)
for _, zone := range []string{"1", "2", "3"} {
ExpectUnavailable(azureEnv, sku, zone, capacityType)
for _, zoneID := range []string{"1", "2", "3"} {
ExpectUnavailable(azureEnv, sku, utils.MakeZone(fake.Region, zoneID), capacityType)
}
}

@@ -793,7 +814,7 @@ var _ = Describe("InstanceType Provider", func() {
// Well known
v1.LabelTopologyRegion: fake.Region,
karpv1.NodePoolLabelKey: nodePool.Name,
v1.LabelTopologyZone: fmt.Sprintf("%s-1", fake.Region),
v1.LabelTopologyZone: fakeZone1,
v1.LabelInstanceTypeStable: "Standard_NC24ads_A100_v4",
v1.LabelOSStable: "linux",
v1.LabelArchStable: "amd64",
@@ -814,11 +835,11 @@
v1alpha2.LabelSKUAccelerator: "A100",
// Deprecated Labels
v1.LabelFailureDomainBetaRegion: fake.Region,
v1.LabelFailureDomainBetaZone: fmt.Sprintf("%s-1", fake.Region),
v1.LabelFailureDomainBetaZone: fakeZone1,
"beta.kubernetes.io/arch": "amd64",
"beta.kubernetes.io/os": "linux",
v1.LabelInstanceType: "Standard_NC24ads_A100_v4",
"topology.disk.csi.azure.com/zone": fmt.Sprintf("%s-1", fake.Region),
"topology.disk.csi.azure.com/zone": fakeZone1,
v1.LabelWindowsBuild: "window",
// Cluster Label
v1alpha2.AKSLabelCluster: "test-cluster",
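Note: the new test relies on the provider translating a `ZoneAllocationFailed` response into unavailable-offering cache entries for the attempted zone, for both capacity types, so that the retry lands in a different zone. That handling is part of the consolidated zone logic and is not visible in this diff; roughly, it takes a shape like the following sketch (field and helper names are illustrative, not the actual implementation):

```go
// Illustrative sketch only; the real logic lives in the instance provider's
// create-error handling, which is not part of this diff.
func (p *DefaultProvider) handleVMCreateError(ctx context.Context, sku, zone string, err error) error {
	var respErr *azcore.ResponseError
	if errors.As(err, &respErr) && respErr.ErrorCode == sdkerrors.ZoneAllocationFailed {
		// The attempted zone has no capacity for this SKU; mark the offering unavailable
		// for both spot and on-demand so the next round schedules into a different zone.
		p.unavailableOfferings.MarkUnavailable(ctx, sdkerrors.ZoneAllocationFailed, sku, zone, karpv1.CapacityTypeSpot)
		p.unavailableOfferings.MarkUnavailable(ctx, sdkerrors.ZoneAllocationFailed, sku, zone, karpv1.CapacityTypeOnDemand)
	}
	return err
}
```

The zone passed to the cache here is the full `<region>-<zone-id>` string, matching what `utils.GetZone` returns and what the test asserts with `IsUnavailable`.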
3 changes: 1 addition & 2 deletions pkg/test/expectations/expectations.go
@@ -24,13 +24,12 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/Azure/karpenter-provider-azure/pkg/fake"
"github.com/Azure/karpenter-provider-azure/pkg/test"
)

func ExpectUnavailable(env *test.Environment, instanceType string, zone string, capacityType string) {
GinkgoHelper()
Expect(env.UnavailableOfferingsCache.IsUnavailable(instanceType, fmt.Sprintf("%s-%s", fake.Region, zone), capacityType)).To(BeTrue())
Expect(env.UnavailableOfferingsCache.IsUnavailable(instanceType, zone, capacityType)).To(BeTrue())
}

func ExpectKubeletFlags(env *test.Environment, customData string, expectedFlags map[string]string) {