feat: add events for do-not-evict blocking deprovisioning (#224)
* feat: add events for do-not-evict blocking deprovisioning
* added suite test
* removed suite test
* moved canBeTerminated
* simplified
njtran authored Feb 24, 2023
1 parent 2fe0c76 commit 3e5ec9b
Showing 7 changed files with 67 additions and 28 deletions.
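
For context: these events surface why a node is being skipped by deprovisioning, most notably when one of its pods carries Karpenter's do-not-evict annotation. Below is a minimal sketch of such a pod; the "karpenter.sh/do-not-evict" key is taken from the Karpenter documentation and is an assumption here, not something defined in this commit.

    package example

    import (
        v1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    )

    // doNotEvictPod builds a pod that the new events would report as blocking
    // deprovisioning of its node. The annotation key is assumed from the docs,
    // not taken from this commit.
    func doNotEvictPod() *v1.Pod {
        return &v1.Pod{
            ObjectMeta: metav1.ObjectMeta{
                Name:      "critical-batch-job",
                Namespace: "default",
                Annotations: map[string]string{
                    "karpenter.sh/do-not-evict": "true",
                },
            },
        }
    }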
13 changes: 11 additions & 2 deletions pkg/controllers/deprovisioning/consolidation.go
@@ -80,8 +80,17 @@ func (c *consolidation) sortAndFilterCandidates(ctx context.Context, nodes []Can

 	// filter out nodes that can't be terminated
 	nodes = lo.Filter(nodes, func(cn CandidateNode, _ int) bool {
-		if reason, canTerminate := canBeTerminated(cn, pdbs); !canTerminate {
-			c.recorder.Publish(deprovisioningevents.UnconsolidatableReason(cn.Node, reason))
+		if !cn.DeletionTimestamp.IsZero() {
+			c.recorder.Publish(deprovisioningevents.UnconsolidatableReason(cn.Node, "in the process of deletion"))
+			return false
+		}
+		if pdb, ok := pdbs.CanEvictPods(cn.pods); !ok {
+			c.recorder.Publish(deprovisioningevents.UnconsolidatableReason(cn.Node, fmt.Sprintf("pdb %s prevents pod evictions", pdb)))
+			return false
+		}
+		if p, ok := hasDoNotEvictPod(cn); ok {
+			c.recorder.Publish(deprovisioningevents.UnconsolidatableReason(cn.Node,
+				fmt.Sprintf("pod %s/%s has do not evict annotation", p.Namespace, p.Name)))
 			return false
 		}
 		return true
4 changes: 2 additions & 2 deletions pkg/controllers/deprovisioning/controller.go
@@ -81,9 +81,9 @@ func NewController(clk clock.Clock, kubeClient client.Client, provisioner *provi
 		cloudProvider: cp,
 		deprovisioners: []Deprovisioner{
 			// Expire any nodes that must be deleted, allowing their pods to potentially land on currently
-			NewExpiration(clk, kubeClient, cluster, provisioner),
+			NewExpiration(clk, kubeClient, cluster, provisioner, recorder),
 			// Terminate any nodes that have drifted from provisioning specifications, allowing the pods to reschedule.
-			NewDrift(kubeClient, cluster, provisioner),
+			NewDrift(kubeClient, cluster, provisioner, recorder),
 			// Delete any remaining empty nodes as there is zero cost in terms of disruption. Emptiness and
 			// emptyNodeConsolidation are mutually exclusive, only one of these will operate
 			NewEmptiness(clk),
24 changes: 20 additions & 4 deletions pkg/controllers/deprovisioning/drift.go
@@ -26,8 +26,10 @@ import (

 	"github.com/aws/karpenter-core/pkg/apis/settings"
 	"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
+	deprovisioningevents "github.com/aws/karpenter-core/pkg/controllers/deprovisioning/events"
 	"github.com/aws/karpenter-core/pkg/controllers/provisioning"
 	"github.com/aws/karpenter-core/pkg/controllers/state"
+	"github.com/aws/karpenter-core/pkg/events"
 	"github.com/aws/karpenter-core/pkg/metrics"
 )

@@ -37,13 +39,15 @@ type Drift struct {
 	kubeClient  client.Client
 	cluster     *state.Cluster
 	provisioner *provisioning.Provisioner
+	recorder    events.Recorder
 }

-func NewDrift(kubeClient client.Client, cluster *state.Cluster, provisioner *provisioning.Provisioner) *Drift {
+func NewDrift(kubeClient client.Client, cluster *state.Cluster, provisioner *provisioning.Provisioner, recorder events.Recorder) *Drift {
 	return &Drift{
 		kubeClient:  kubeClient,
 		cluster:     cluster,
 		provisioner: provisioner,
+		recorder:    recorder,
 	}
 }

@@ -62,9 +66,21 @@ func (d *Drift) ComputeCommand(ctx context.Context, candidates ...CandidateNode)
 	if err != nil {
 		return Command{}, fmt.Errorf("tracking PodDisruptionBudgets, %w", err)
 	}
-	candidates = lo.Filter(candidates, func(n CandidateNode, _ int) bool {
-		_, canTerminate := canBeTerminated(n, pdbs)
-		return canTerminate
+	// filter out nodes that can't be terminated
+	candidates = lo.Filter(candidates, func(cn CandidateNode, _ int) bool {
+		if !cn.DeletionTimestamp.IsZero() {
+			return false
+		}
+		if pdb, ok := pdbs.CanEvictPods(cn.pods); !ok {
+			d.recorder.Publish(deprovisioningevents.BlockedDeprovisioning(cn.Node, fmt.Sprintf("pdb %s prevents pod evictions", pdb)))
+			return false
+		}
+		if p, ok := hasDoNotEvictPod(cn); ok {
+			d.recorder.Publish(deprovisioningevents.BlockedDeprovisioning(cn.Node,
+				fmt.Sprintf("pod %s/%s has do not evict annotation", p.Namespace, p.Name)))
+			return false
+		}
+		return true
 	})

 	for _, candidate := range candidates {
3 changes: 1 addition & 2 deletions pkg/controllers/deprovisioning/emptiness.go
@@ -66,8 +66,7 @@ func (e *Emptiness) ShouldDeprovision(ctx context.Context, n *state.Node, provis
 // ComputeCommand generates a deprovisioning command given deprovisionable nodes
 func (e *Emptiness) ComputeCommand(_ context.Context, nodes ...CandidateNode) (Command, error) {
 	emptyNodes := lo.Filter(nodes, func(n CandidateNode, _ int) bool {
-		_, canTerminate := canBeTerminated(n, nil)
-		return len(n.pods) == 0 && canTerminate
+		return n.DeletionTimestamp.IsZero() && len(n.pods) == 0
 	})

 	if len(emptyNodes) == 0 {
10 changes: 10 additions & 0 deletions pkg/controllers/deprovisioning/events/events.go
@@ -23,6 +23,16 @@ import (
 	"github.com/aws/karpenter-core/pkg/events"
 )

+func BlockedDeprovisioning(node *v1.Node, reason string) events.Event {
+	return events.Event{
+		InvolvedObject: node,
+		Type:           v1.EventTypeNormal,
+		Reason:         "BlockedDeprovisioning",
+		Message:        fmt.Sprintf("Cannot deprovision node due to %s", reason),
+		DedupeValues:   []string{node.Name, reason},
+	}
+}
+
 func TerminatingNode(node *v1.Node, reason string) events.Event {
 	return events.Event{
 		InvolvedObject: node,
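
A usage note: drift.go and expiration.go above pass the result of this constructor straight to the shared recorder. A minimal sketch of that pattern follows, assuming the events.Recorder interface exposes the single-argument Publish call used throughout this diff; reportBlocked and its parameters are illustrative names, not part of the change.

    package example

    import (
        "fmt"

        v1 "k8s.io/api/core/v1"

        deprovisioningevents "github.com/aws/karpenter-core/pkg/controllers/deprovisioning/events"
        "github.com/aws/karpenter-core/pkg/events"
    )

    // reportBlocked mirrors the pattern in drift.go and expiration.go: build the
    // event from the node and a human-readable reason, then publish it.
    func reportBlocked(recorder events.Recorder, node *v1.Node, blocker *v1.Pod) {
        recorder.Publish(deprovisioningevents.BlockedDeprovisioning(node,
            fmt.Sprintf("pod %s/%s has do not evict annotation", blocker.Namespace, blocker.Name)))
    }

Given the Message template above, the resulting node event would read along the lines of "Cannot deprovision node due to pod default/critical-batch-job has do not evict annotation", deduplicated on node name and reason.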
24 changes: 20 additions & 4 deletions pkg/controllers/deprovisioning/expiration.go
@@ -30,8 +30,10 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/client"

 	"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
+	deprovisioningevents "github.com/aws/karpenter-core/pkg/controllers/deprovisioning/events"
 	"github.com/aws/karpenter-core/pkg/controllers/provisioning"
 	"github.com/aws/karpenter-core/pkg/controllers/state"
+	"github.com/aws/karpenter-core/pkg/events"
 	"github.com/aws/karpenter-core/pkg/metrics"
 )

@@ -42,14 +44,16 @@ type Expiration struct {
 	kubeClient  client.Client
 	cluster     *state.Cluster
 	provisioner *provisioning.Provisioner
+	recorder    events.Recorder
 }

-func NewExpiration(clk clock.Clock, kubeClient client.Client, cluster *state.Cluster, provisioner *provisioning.Provisioner) *Expiration {
+func NewExpiration(clk clock.Clock, kubeClient client.Client, cluster *state.Cluster, provisioner *provisioning.Provisioner, recorder events.Recorder) *Expiration {
 	return &Expiration{
 		clock:       clk,
 		kubeClient:  kubeClient,
 		cluster:     cluster,
 		provisioner: provisioner,
+		recorder:    recorder,
 	}
 }

@@ -73,9 +77,21 @@ func (e *Expiration) ComputeCommand(ctx context.Context, candidates ...Candidate
 	if err != nil {
 		return Command{}, fmt.Errorf("tracking PodDisruptionBudgets, %w", err)
 	}
-	candidates = lo.Filter(candidates, func(n CandidateNode, _ int) bool {
-		_, canTerminate := canBeTerminated(n, pdbs)
-		return canTerminate
+	// filter out nodes that can't be terminated
+	candidates = lo.Filter(candidates, func(cn CandidateNode, _ int) bool {
+		if !cn.DeletionTimestamp.IsZero() {
+			return false
+		}
+		if pdb, ok := pdbs.CanEvictPods(cn.pods); !ok {
+			e.recorder.Publish(deprovisioningevents.BlockedDeprovisioning(cn.Node, fmt.Sprintf("pdb %s prevents pod evictions", pdb)))
+			return false
+		}
+		if p, ok := hasDoNotEvictPod(cn); ok {
+			e.recorder.Publish(deprovisioningevents.BlockedDeprovisioning(cn.Node,
+				fmt.Sprintf("pod %s/%s has do not evict annotation", p.Namespace, p.Name)))
+			return false
+		}
+		return true
 	})

 	for _, candidate := range candidates {
17 changes: 3 additions & 14 deletions pkg/controllers/deprovisioning/helpers.go
@@ -325,22 +325,11 @@ func mapNodes(nodes []*v1.Node, candidateNodes []CandidateNode) []CandidateNode
 	return ret
 }

-func canBeTerminated(node CandidateNode, pdbs *PDBLimits) (string, bool) {
-	if !node.DeletionTimestamp.IsZero() {
-		return "in the process of deletion", false
-	}
-	if pdbs != nil {
-		if pdb, ok := pdbs.CanEvictPods(node.pods); !ok {
-			return fmt.Sprintf("pdb %s prevents pod evictions", pdb), false
-		}
-	}
-	if p, ok := lo.Find(node.pods, func(p *v1.Pod) bool {
+func hasDoNotEvictPod(cn CandidateNode) (*v1.Pod, bool) {
+	return lo.Find(cn.pods, func(p *v1.Pod) bool {
 		if pod.IsTerminating(p) || pod.IsTerminal(p) || pod.IsOwnedByNode(p) {
 			return false
 		}
 		return pod.HasDoNotEvict(p)
-	}); ok {
-		return fmt.Sprintf("pod %s/%s has do not evict annotation", p.Namespace, p.Name), false
-	}
-	return "", true
+	})
 }
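
The renamed helper still leans on pod.HasDoNotEvict, which is not part of this diff; per the Karpenter documentation it amounts to an annotation check roughly like the sketch below, where both the key and the value handling are assumptions rather than something this commit confirms.

    package example

    import v1 "k8s.io/api/core/v1"

    // hasDoNotEvictAnnotation approximates what pod.HasDoNotEvict is assumed to do:
    // report whether the pod has opted out of voluntary eviction via annotation.
    func hasDoNotEvictAnnotation(p *v1.Pod) bool {
        return p.Annotations["karpenter.sh/do-not-evict"] == "true"
    }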
