Skip to content

Commit

Permalink
Handle provider id changing in cluster state (#166)
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathan-innis authored Jan 18, 2023
1 parent df9dab2 commit c38b8b0
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 19 deletions.
6 changes: 0 additions & 6 deletions pkg/apis/v1alpha5/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,6 @@ const (
TerminationFinalizer = Group + "/termination"
)

// Tags for infrastructure resources deployed into Cloud Provider's accounts
const (
DiscoveryTagKey = Group + "/discovery"
ManagedByTagKey = Group + "/managed-by"
)

var (
// RestrictedLabelDomains are either prohibited by the kubelet or reserved by karpenter
RestrictedLabelDomains = sets.NewString(
Expand Down
41 changes: 28 additions & 13 deletions pkg/controllers/state/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,19 +165,18 @@ func (c *Cluster) UpdateNode(ctx context.Context, node *v1.Node) error {
return nil
}

func (c *Cluster) DeleteNode(nodeName string) {
func (c *Cluster) DeleteNode(name string) {
c.mu.Lock()
defer c.mu.Unlock()

if id := c.nameToProviderID[nodeName]; id != "" {
delete(c.nodes, id)
delete(c.nameToProviderID, nodeName)
c.RecordConsolidationChange()
}
c.cleanupNode(name)
}

// UpdatePod is called every time the pod is reconciled
func (c *Cluster) UpdatePod(ctx context.Context, pod *v1.Pod) error {
c.mu.Lock()
defer c.mu.Unlock()

var err error
if podutils.IsTerminal(pod) {
c.updateNodeUsageFromPodCompletion(client.ObjectKeyFromObject(pod))
Expand All @@ -190,6 +189,9 @@ func (c *Cluster) UpdatePod(ctx context.Context, pod *v1.Pod) error {

// DeletePod is called when the pod has been deleted
func (c *Cluster) DeletePod(podKey types.NamespacedName) {
c.mu.Lock()
defer c.mu.Unlock()

c.antiAffinityPods.Delete(podKey)
c.updateNodeUsageFromPodCompletion(podKey)
c.RecordConsolidationChange()
Expand Down Expand Up @@ -224,6 +226,11 @@ func (c *Cluster) Reset() {
c.antiAffinityPods = sync.Map{}
}

// WARNING
// Everything under this section of code assumes that you have already held a lock when you are calling into these functions
// and explicitly modifying the cluster state. If you do not hold the cluster state lock before calling any of these helpers
// you will hit race conditions and data corruption

func (c *Cluster) newStateFromNode(ctx context.Context, node *v1.Node, oldNode *Node) (*Node, error) {
if oldNode == nil {
oldNode = &Node{
Expand All @@ -250,10 +257,24 @@ func (c *Cluster) newStateFromNode(ctx context.Context, node *v1.Node, oldNode *
); err != nil {
return nil, err
}
// Cleanup the old node with its old providerID if its providerID changes
// This can happen since nodes don't get created with providerIDs. Rather, CCM picks up the
// created node and injects the providerID into the spec.providerID
if id, ok := c.nameToProviderID[node.Name]; ok && id != node.Spec.ProviderID {
c.cleanupNode(node.Name)
}
c.triggerConsolidationOnChange(oldNode, n)
return n, nil
}

func (c *Cluster) cleanupNode(name string) {
if id, ok := c.nameToProviderID[name]; ok {
delete(c.nodes, id)
delete(c.nameToProviderID, name)
c.RecordConsolidationChange()
}
}

func (c *Cluster) populateStartupTaints(ctx context.Context, n *Node) error {
if !n.Owned() {
return nil
Expand Down Expand Up @@ -315,7 +336,7 @@ func (c *Cluster) populateResourceRequests(ctx context.Context, n *Node) error {
}
c.cleanupOldBindings(pod)
n.updateForPod(ctx, pod)
c.bindings[client.ObjectKeyFromObject(pod)] = pod.Spec.NodeName // TODO @joinnis: Potentially change this later
c.bindings[client.ObjectKeyFromObject(pod)] = pod.Spec.NodeName
}
return nil
}
Expand All @@ -328,9 +349,6 @@ func (c *Cluster) updateNodeUsageFromPod(ctx context.Context, pod *v1.Pod) error
return nil
}

c.mu.Lock()
defer c.mu.Unlock()

n, ok := c.nodes[c.nameToProviderID[pod.Spec.NodeName]]
if !ok {
// the node must exist for us to update the resource requests on the node
Expand All @@ -343,9 +361,6 @@ func (c *Cluster) updateNodeUsageFromPod(ctx context.Context, pod *v1.Pod) error
}

func (c *Cluster) updateNodeUsageFromPodCompletion(podKey types.NamespacedName) {
c.mu.Lock()
defer c.mu.Unlock()

nodeName, bindingKnown := c.bindings[podKey]
if !bindingKnown {
// we didn't think the pod was bound, so we weren't tracking it and don't need to do anything
Expand Down
26 changes: 26 additions & 0 deletions pkg/controllers/state/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,22 @@ var _ = Describe("Node Resource Level", func() {
time.Sleep(time.Second * 6) // past 10s, node should no longer be nominated
Expect(ExpectStateNodeExists(node).Nominated()).To(BeFalse())
})
It("should handle a node changing from no providerID to registering a providerID", func() {
node := test.Node()
ExpectApplied(ctx, env.Client, node)
ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node))

ExpectStateNodeCount("==", 1)
ExpectStateNodeExists(node)

// Change the providerID; this mocks CCM adding the providerID onto the node after registration
node.Spec.ProviderID = fmt.Sprintf("fake://%s", node.Name)
ExpectApplied(ctx, env.Client, node)
ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node))

ExpectStateNodeCount("==", 1)
ExpectStateNodeExists(node)
})
})

var _ = Describe("Pod Anti-Affinity", func() {
Expand Down Expand Up @@ -802,6 +818,16 @@ var _ = Describe("Provisioner Spec Updates", func() {
})
})

func ExpectStateNodeCount(comparator string, count int) int {
c := 0
cluster.ForEachNode(func(n *state.Node) bool {
c++
return true
})
ExpectWithOffset(1, count).To(BeNumerically(comparator, count))
return c
}

func ExpectStateNodeExistsWithOffset(offset int, node *v1.Node) *state.Node {
var ret *state.Node
cluster.ForEachNode(func(n *state.Node) bool {
Expand Down

0 comments on commit c38b8b0

Please sign in to comment.