feat: drain and volume detachment status conditions #1876

Open · wants to merge 1 commit into base: main
2 changes: 2 additions & 0 deletions pkg/apis/v1/nodeclaim_status.go
@@ -28,6 +28,8 @@ const (
ConditionTypeInitialized = "Initialized"
ConditionTypeConsolidatable = "Consolidatable"
ConditionTypeDrifted = "Drifted"
ConditionTypeDrained = "Drained"
ConditionTypeVolumesDetached = "VolumesDetached"
ConditionTypeInstanceTerminating = "InstanceTerminating"
ConditionTypeConsistentStateFound = "ConsistentStateFound"
)
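The two new conditions give observers a way to track where a NodeClaim is in the termination flow. As a rough sketch (assuming a *v1.NodeClaim whose conditions have already been initialized and fetched from the API server), a consumer might read them like this:

// Sketch: reading the termination-progress conditions added above.
drained := nodeClaim.StatusConditions().Get(v1.ConditionTypeDrained)
volumesDetached := nodeClaim.StatusConditions().Get(v1.ConditionTypeVolumesDetached)
switch {
case drained.IsUnknown():
	// Termination has not started draining this node yet.
case drained.IsFalse():
	// Eviction is in progress; Reason is "Draining".
case drained.IsTrue() && !volumesDetached.IsTrue():
	// Pods are gone; still waiting on VolumeAttachment cleanup.
default:
	// Drained and VolumesDetached are both true; instance termination follows.
}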
144 changes: 97 additions & 47 deletions pkg/controllers/node/termination/controller.go
@@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
@@ -76,6 +77,7 @@ func NewController(clk clock.Clock, kubeClient client.Client, cloudProvider clou

func (c *Controller) Reconcile(ctx context.Context, n *corev1.Node) (reconcile.Result, error) {
ctx = injection.WithControllerName(ctx, "node.termination")
ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("Node", klog.KRef(n.Namespace, n.Name)))

if !n.GetDeletionTimestamp().IsZero() {
return c.finalize(ctx, n)
@@ -92,20 +94,39 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile
return reconcile.Result{}, nil
}

nodeClaims, err := nodeutils.GetNodeClaims(ctx, c.kubeClient, node)
nodeClaim, err := nodeutils.NodeClaimForNode(ctx, c.kubeClient, node)
if err != nil {
return reconcile.Result{}, fmt.Errorf("listing nodeclaims, %w", err)
if nodeutils.IsDuplicateNodeClaimError(err) || nodeutils.IsNodeClaimNotFoundError(err) {
log.FromContext(ctx).Error(err, "failed to terminate node")
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
}
ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("NodeClaim", klog.KRef(nodeClaim.Namespace, nodeClaim.Name)))
if nodeClaim.DeletionTimestamp.IsZero() {
if err := c.kubeClient.Delete(ctx, nodeClaim); err != nil {
return reconcile.Result{}, client.IgnoreNotFound(fmt.Errorf("deleting nodeclaim, %w", err))
Comment on lines +107 to +108
Contributor: wouldn't we want to client.IgnoreNotFound the error handling block so that we continue to the rest of the controller?

Member Author (@jmdeal, Dec 18, 2024): If the NodeClaim isn't found, we shouldn't be able to proceed with the rest of the loop anyway. This is just a short-circuit. Same answer for anywhere else we short-circuit on NotFound.

Member Author: Discussed offline, we'll return and requeue if the NodeClaim isn't found. An appropriate error message will then be printed by the initial get call, and we'll return without requeue.

}
}

if err = c.deleteAllNodeClaims(ctx, nodeClaims...); err != nil {
return reconcile.Result{}, fmt.Errorf("deleting nodeclaims, %w", err)
// If the underlying NodeClaim no longer exists, we want to proceed with deletion to avoid trying to
// gracefully drain nodes that are no longer alive. We do a check on the Ready condition of the node since,
// even though the CloudProvider says the instance is not around, we know that the kubelet process is still
// running if the Node Ready condition is true.
// Similar logic to: https://github.com/kubernetes/kubernetes/blob/3a75a8c8d9e6a1ebd98d8572132e675d4980f184/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller.go#L144
if nodeutils.GetCondition(node, corev1.NodeReady).Status != corev1.ConditionTrue {
if _, err = c.cloudProvider.Get(ctx, node.Spec.ProviderID); err != nil {
if cloudprovider.IsNodeClaimNotFoundError(err) {
return reconcile.Result{}, c.removeFinalizer(ctx, node)
}
return reconcile.Result{}, fmt.Errorf("getting nodeclaim, %w", err)
}
}

nodeTerminationTime, err := c.nodeTerminationTime(node, nodeClaims...)
nodeTerminationTime, err := c.nodeTerminationTime(node, nodeClaim)
if err != nil {
return reconcile.Result{}, err
}

if err = c.terminator.Taint(ctx, node, v1.DisruptedNoScheduleTaint); err != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
@@ -117,25 +138,43 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile
return reconcile.Result{}, fmt.Errorf("draining node, %w", err)
}
c.recorder.Publish(terminatorevents.NodeFailedToDrain(node, err))
// If the underlying NodeClaim no longer exists, we want to proceed with deletion to avoid trying to
// gracefully drain nodes that are no longer alive. We do a check on the Ready condition of the node since,
// even though the CloudProvider says the instance is not around, we know that the kubelet process is still
// running if the Node Ready condition is true.
// Similar logic to: https://github.com/kubernetes/kubernetes/blob/3a75a8c8d9e6a1ebd98d8572132e675d4980f184/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller.go#L144
if nodeutils.GetCondition(node, corev1.NodeReady).Status != corev1.ConditionTrue {
if _, err = c.cloudProvider.Get(ctx, node.Spec.ProviderID); err != nil {
if cloudprovider.IsNodeClaimNotFoundError(err) {
return reconcile.Result{}, c.removeFinalizer(ctx, node)
stored := nodeClaim.DeepCopy()
if modified := nodeClaim.StatusConditions().SetFalse(v1.ConditionTypeDrained, "Draining", "Draining"); modified {
Contributor: Do we want both Reason and Message to be Draining? Any extra details we can add here?

Contributor: Scoping this to the drain error handling block also means that we're not going to be adding this status condition if the node was empty in the first place. From a functionality perspective this is fine, but it also makes it a bit confusing to trace the steps in history later. Thoughts on this?

Member Author:
> Do we want both Reason and Message to be Draining? Any extra details we can add here?
It would be nice, but it would result in a lot of additional writes to the resource. That's why I opted to leave the additional information on the event, where it can be appropriately deduped.
> Scoping this to the drain error handling block also means that we're not going to be adding this status condition if the node was empty in the first place.
Yeah, this is intentional. If there were no drainable pods on the Node in the first place, it wouldn't make sense to transition the status condition to false. We should transition from unknown -> true.

Contributor:
> It would be nice, but it would result in a lot of additional writes to the resource.
Maybe we can do this as a followup, but it'd be interesting to have our reason here call back to which group of pods we're currently draining (e.g. non-critical daemon, critical daemon, non-critical non-daemon, critical non-daemon). I agree with your second point.

Member Author: Discussed some additional follow-ups: we could set the reason based on the group of pods currently being evicted (e.g. critical, system-critical, etc.). This will result in up to 4 additional writes per Node. We're going to decouple for now, but I'll open an issue to track this as an additional feature once this has merged.
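A hypothetical sketch of that follow-up, for illustration only (the reason strings and the currentGroupReason helper are assumptions, not code from this PR):

// Hypothetical follow-up: scope the Drained=False reason to the pod group
// currently being evicted. currentGroupReason is an assumed helper mapping
// the active eviction group to a reason like "DrainingCriticalDaemonSets".
reason := currentGroupReason(pods)
stored := nodeClaim.DeepCopy()
if modified := nodeClaim.StatusConditions().SetFalse(v1.ConditionTypeDrained, reason, reason); modified {
	if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
		return reconcile.Result{}, client.IgnoreNotFound(err)
	}
}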

if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, fmt.Errorf("getting nodeclaim, %w", err)
return reconcile.Result{}, client.IgnoreNotFound(err)
Comment on lines +143 to +147
Contributor: Shouldn't we be doing ignore not found in this block and just continuing if it doesn't exist?

}
}

return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
}
NodesDrainedTotal.Inc(map[string]string{
metrics.NodePoolLabel: node.Labels[v1.NodePoolLabelKey],
})
if !nodeClaim.StatusConditions().Get(v1.ConditionTypeDrained).IsTrue() {
stored := nodeClaim.DeepCopy()
_ = nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeDrained)
if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, client.IgnoreNotFound(err)
}
NodesDrainedTotal.Inc(map[string]string{
metrics.NodePoolLabel: node.Labels[v1.NodePoolLabelKey],
})
Comment on lines +161 to +163
Contributor: Just an observation that we still emit this metric even if we didn't do any draining.

Member Author: I think this is what we want. I'm considering "drained" the end state, not the process. It would also be confusing / concerning to me as an operator if total nodes drained were less than total nodes terminated, since that would indicate to me that Karpenter is terminating nodes without draining them. We do need to check, though, that the node drained successfully and that we haven't passed over the drain block due to TGP expiration. If that's what you were calling out, you're right and I'll address that.

Member Author: Discussed offline, this isn't actually an issue with Drained, but it is an issue with VolumesDetached. We shouldn't set that condition to true if we proceeded due to TGP expiration.
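A hypothetical shape for that fix; tgpExpired is an assumed boolean derived from nodeTerminationTime, not code in this PR:

// Hypothetical: only report VolumesDetached=True when detachment actually
// completed, not when the wait was skipped because the node's termination
// grace period (TGP) elapsed.
if areVolumesDetached && !tgpExpired {
	stored := nodeClaim.DeepCopy()
	_ = nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeVolumesDetached)
	if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
		return reconcile.Result{}, client.IgnoreNotFound(err)
	}
}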

// We sleep here after a patch operation since we want to ensure that we are able to read our own write before
// getting the NodeClaim again. This prevents conflict errors on subsequent writes.
// USE CAUTION when determining whether to increase this timeout or remove this line
time.Sleep(time.Second)
nodeClaim, err = nodeutils.NodeClaimForNode(ctx, c.kubeClient, node)
Contributor: Do you think we can just return here and requeue the controller?

Member Author: The main reason I didn't do that was the additional testability burden. This would increase the number of reconciliations required for the termination controller. Requiring multiple reconciliations for instance termination can already be hard enough to reason about; I'd really rather not increase this further. Long-term I'm still tracking #1837, which will split these stages into individual controllers or subreconcilers and address this.

Member Author: Discussed offline, we'll return and requeue after 1 second rather than doing the sleep.
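The agreed-on shape would look roughly like this sketch, replacing the sleep above:

// Sketch: rather than sleeping to read our own write, finish this reconcile
// after the patch and requeue; the next pass re-fetches the NodeClaim fresh,
// avoiding conflict errors without blocking the worker.
if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
	return reconcile.Result{}, client.IgnoreNotFound(err)
}
return reconcile.Result{RequeueAfter: 1 * time.Second}, nil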

if err != nil {
if nodeutils.IsDuplicateNodeClaimError(err) || nodeutils.IsNodeClaimNotFoundError(err) {
log.FromContext(ctx).Error(err, "failed to terminate node")
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
}
}

// In order for Pods associated with PersistentVolumes to smoothly migrate from the terminating Node, we wait
// for VolumeAttachments of drain-able Pods to be cleaned up before terminating Node and removing its finalizer.
// However, if TerminationGracePeriod is configured for Node, and we are past that period, we will skip waiting.
@@ -145,48 +184,59 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile
return reconcile.Result{}, fmt.Errorf("ensuring no volume attachments, %w", err)
}
if !areVolumesDetached {
c.recorder.Publish(terminatorevents.NodeAwaitingVolumeDetachmentEvent(node))
stored := nodeClaim.DeepCopy()
if modified := nodeClaim.StatusConditions().SetFalse(v1.ConditionTypeVolumesDetached, "AwaitingVolumeDetachment", "AwaitingVolumeDetachment"); modified {
if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
Contributor: same here on ignoring not found here rather than L194

if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, client.IgnoreNotFound(err)
}
}
return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
}
}
nodeClaims, err = nodeutils.GetNodeClaims(ctx, c.kubeClient, node)
if err != nil {
return reconcile.Result{}, fmt.Errorf("deleting nodeclaims, %w", err)
}
for _, nodeClaim := range nodeClaims {
isInstanceTerminated, err := termination.EnsureTerminated(ctx, c.kubeClient, nodeClaim, c.cloudProvider)
if err != nil {
// 404 = the nodeClaim no longer exists
if errors.IsNotFound(err) {
continue
}
// 409 - The nodeClaim exists, but its status has already been modified
if !nodeClaim.StatusConditions().Get(v1.ConditionTypeVolumesDetached).IsTrue() {
stored := nodeClaim.DeepCopy()
_ = nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeVolumesDetached)
if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, fmt.Errorf("ensuring instance termination, %w", err)
return reconcile.Result{}, client.IgnoreNotFound(err)
}
if !isInstanceTerminated {
return reconcile.Result{RequeueAfter: 5 * time.Second}, nil
// We sleep here after a patch operation since we want to ensure that we are able to read our own write before
// getting the NodeClaim again. This prevents conflict errors on subsequent writes.
// USE CAUTION when determining whether to increase this timeout or remove this line
time.Sleep(time.Second)
nodeClaim, err = nodeutils.NodeClaimForNode(ctx, c.kubeClient, node)
Contributor: same here on just returning

if err != nil {
if nodeutils.IsDuplicateNodeClaimError(err) || nodeutils.IsNodeClaimNotFoundError(err) {
log.FromContext(ctx).Error(err, "failed to terminate node")
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
}
}

isInstanceTerminated, err := termination.EnsureTerminated(ctx, c.kubeClient, nodeClaim, c.cloudProvider)
if client.IgnoreNotFound(err) != nil {
// 409 - The nodeClaim exists, but its status has already been modified
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, fmt.Errorf("ensuring instance termination, %w", err)
}
if !isInstanceTerminated {
return reconcile.Result{RequeueAfter: 5 * time.Second}, nil
}
if err := c.removeFinalizer(ctx, node); err != nil {
return reconcile.Result{}, err
}
return reconcile.Result{}, nil
}

func (c *Controller) deleteAllNodeClaims(ctx context.Context, nodeClaims ...*v1.NodeClaim) error {
for _, nodeClaim := range nodeClaims {
// If we still get the NodeClaim, but it's already marked as terminating, we don't need to call Delete again
if nodeClaim.DeletionTimestamp.IsZero() {
if err := c.kubeClient.Delete(ctx, nodeClaim); err != nil {
return client.IgnoreNotFound(err)
}
}
}
return nil
}

func (c *Controller) ensureVolumesDetached(ctx context.Context, node *corev1.Node) (volumesDetached bool, err error) {
volumeAttachments, err := nodeutils.GetVolumeAttachments(ctx, c.kubeClient, node)
if err != nil {
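Every condition write in this controller follows the same snapshot, mutate, patch-with-optimistic-lock sequence. In isolation the pattern looks roughly like this (a sketch, not a verbatim excerpt from the PR):

stored := nodeClaim.DeepCopy() // snapshot before mutating status
if modified := nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeDrained); modified {
	// MergeFromWithOptimisticLock includes the resourceVersion in the patch,
	// so a concurrent writer surfaces as a 409 Conflict rather than a lost update.
	if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
		if errors.IsConflict(err) {
			return reconcile.Result{Requeue: true}, nil // retry with a fresh read
		}
		return reconcile.Result{}, client.IgnoreNotFound(err)
	}
}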
17 changes: 16 additions & 1 deletion pkg/controllers/node/termination/suite_test.go
@@ -532,6 +532,7 @@ var _ = Describe("Termination", func() {
It("should not delete nodes until all pods are deleted", func() {
pods := test.Pods(2, test.PodOptions{NodeName: node.Name, ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}})
ExpectApplied(ctx, env.Client, node, nodeClaim, pods[0], pods[1])
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrained).IsUnknown()).To(BeTrue())

// Trigger Termination Controller
Expect(env.Client.Delete(ctx, node)).To(Succeed())
@@ -540,6 +541,10 @@ var _ = Describe("Termination", func() {
ExpectSingletonReconciled(ctx, queue)
ExpectSingletonReconciled(ctx, queue)

nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrained).IsFalse()).To(BeTrue())
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrained).Reason).To(Equal("Draining"))

// Expect the pods to be evicted
EventuallyExpectTerminating(ctx, env.Client, pods[0], pods[1])

@@ -563,6 +568,8 @@ var _ = Describe("Termination", func() {
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectNotFound(ctx, env.Client, node)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrained).IsTrue()).To(BeTrue())
})
It("should delete nodes with no underlying instance even if not fully drained", func() {
pods := test.Pods(2, test.PodOptions{NodeName: node.Name, ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}})
@@ -769,15 +776,23 @@ var _ = Describe("Termination", func() {
})
ExpectApplied(ctx, env.Client, node, nodeClaim, nodePool, va)
Expect(env.Client.Delete(ctx, node)).To(Succeed())
ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeVolumesDetached).IsUnknown()).To(BeTrue())

ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectExists(ctx, env.Client, node)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeVolumesDetached).IsFalse()).To(BeTrue())
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeVolumesDetached).Reason).To(Equal("AwaitingVolumeDetachment"))

ExpectDeleted(ctx, env.Client, va)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectNotFound(ctx, env.Client, node)

nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeVolumesDetached).IsTrue()).To(BeTrue())
})
It("should only wait for volume attachments associated with drainable pods", func() {
vaDrainable := test.VolumeAttachment(test.VolumeAttachmentOptions{
@@ -882,7 +897,7 @@ var _ = Describe("Termination", func() {
// Don't let any pod evict
MinAvailable: &minAvailable,
})
ExpectApplied(ctx, env.Client, pdb, node)
ExpectApplied(ctx, env.Client, pdb, node, nodeClaim)
pods := test.Pods(5, test.PodOptions{NodeName: node.Name, ObjectMeta: metav1.ObjectMeta{
OwnerReferences: defaultOwnerRefs,
Labels: labelSelector,
10 changes: 10 additions & 0 deletions pkg/controllers/node/termination/terminator/events/events.go
@@ -56,6 +56,16 @@ func NodeFailedToDrain(node *corev1.Node, err error) events.Event {
}
}

func NodeAwaitingVolumeDetachmentEvent(node *corev1.Node) events.Event {
return events.Event{
InvolvedObject: node,
Type: corev1.EventTypeNormal,
Reason: "AwaitingVolumeDetachment",
Message: "Awaiting deletion of VolumeAttachments bound to node",
DedupeValues: []string{node.Name},
}
}
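For context, this event is published from the volume-detachment wait in controller.go; because DedupeValues is keyed on the node name, repeated requeues are deduplicated rather than re-emitted every second. A sketch of the call site:

// Call-site sketch (from the volume-detachment wait in controller.go):
if !areVolumesDetached {
	c.recorder.Publish(terminatorevents.NodeAwaitingVolumeDetachmentEvent(node))
	// ... set VolumesDetached=False on the NodeClaim, then requeue ...
	return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
}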

func NodeTerminationGracePeriodExpiring(node *corev1.Node, terminationTime string) events.Event {
return events.Event{
InvolvedObject: node,