feat: drain and volume detachment status conditions
jmdeal committed Dec 11, 2024
1 parent 651605e commit fb3ac47
Showing 4 changed files with 125 additions and 48 deletions.
2 changes: 2 additions & 0 deletions pkg/apis/v1/nodeclaim_status.go
@@ -28,6 +28,8 @@ const (
ConditionTypeInitialized = "Initialized"
ConditionTypeConsolidatable = "Consolidatable"
ConditionTypeDrifted = "Drifted"
ConditionTypeDrained = "Drained"
ConditionTypeVolumesDetached = "VolumesDetached"
ConditionTypeInstanceTerminating = "InstanceTerminating"
ConditionTypeConsistentStateFound = "ConsistentStateFound"
)
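The two new conditions surface termination progress on the NodeClaim itself: Drained and VolumesDetached begin Unknown, are set False while work is in progress, and flip True as each phase completes (the suite_test.go changes below assert exactly this progression). As a hypothetical read-side sketch, assuming only the StatusConditions() accessor and the nil-safe condition helpers already used elsewhere in this diff:

package main

import (
	"fmt"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// describeTermination is a hypothetical helper (not part of this commit) that
// summarizes where a terminating NodeClaim sits in the drain/detach flow.
func describeTermination(nc *v1.NodeClaim) string {
	drained := nc.StatusConditions().Get(v1.ConditionTypeDrained)
	detached := nc.StatusConditions().Get(v1.ConditionTypeVolumesDetached)
	switch {
	case drained.IsUnknown():
		return "termination not yet started"
	case drained.IsFalse():
		return fmt.Sprintf("draining: %s", drained.Message)
	case !detached.IsTrue():
		return "drained; awaiting volume detachment"
	default:
		return "drained and volumes detached; instance termination can proceed"
	}
}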
144 changes: 97 additions & 47 deletions pkg/controllers/node/termination/controller.go
@@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
@@ -76,6 +77,7 @@ func NewController(clk clock.Clock, kubeClient client.Client, cloudProvider clou

func (c *Controller) Reconcile(ctx context.Context, n *corev1.Node) (reconcile.Result, error) {
ctx = injection.WithControllerName(ctx, "node.termination")
ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("Node", klog.KRef(n.Namespace, n.Name)))

if !n.GetDeletionTimestamp().IsZero() {
return c.finalize(ctx, n)
@@ -92,20 +94,39 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile
return reconcile.Result{}, nil
}

nodeClaims, err := nodeutils.GetNodeClaims(ctx, c.kubeClient, node)
nodeClaim, err := nodeutils.NodeClaimForNode(ctx, c.kubeClient, node)
if err != nil {
return reconcile.Result{}, fmt.Errorf("listing nodeclaims, %w", err)
if nodeutils.IsDuplicateNodeClaimError(err) || nodeutils.IsNodeClaimNotFoundError(err) {
log.FromContext(ctx).Error(err, "failed to terminate node")
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
}
ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("NodeClaim", klog.KRef(nodeClaim.Namespace, nodeClaim.Name)))
if nodeClaim.DeletionTimestamp.IsZero() {
if err := c.kubeClient.Delete(ctx, nodeClaim); err != nil {
return reconcile.Result{}, client.IgnoreNotFound(fmt.Errorf("deleting nodeclaim, %w", err))
}
}

if err = c.deleteAllNodeClaims(ctx, nodeClaims...); err != nil {
return reconcile.Result{}, fmt.Errorf("deleting nodeclaims, %w", err)
// If the underlying instance no longer exists, we want to remove the finalizer to avoid trying to gracefully drain
// nodes that are no longer alive. We do a check on the Ready condition of the node since, even
// though the CloudProvider says the instance is not around, we know that the kubelet process is still running
// if the Node Ready condition is true.
// Similar logic to: https://github.com/kubernetes/kubernetes/blob/3a75a8c8d9e6a1ebd98d8572132e675d4980f184/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller.go#L144
if nodeutils.GetCondition(node, corev1.NodeReady).Status != corev1.ConditionTrue {
if _, err = c.cloudProvider.Get(ctx, node.Spec.ProviderID); err != nil {
if cloudprovider.IsNodeClaimNotFoundError(err) {
return reconcile.Result{}, c.removeFinalizer(ctx, node)
}
return reconcile.Result{}, fmt.Errorf("getting nodeclaim, %w", err)
}
}

nodeTerminationTime, err := c.nodeTerminationTime(node, nodeClaims...)
nodeTerminationTime, err := c.nodeTerminationTime(node, nodeClaim)
if err != nil {
return reconcile.Result{}, err
}

if err = c.terminator.Taint(ctx, node, v1.DisruptedNoScheduleTaint); err != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
@@ -117,25 +138,43 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile
return reconcile.Result{}, fmt.Errorf("draining node, %w", err)
}
c.recorder.Publish(terminatorevents.NodeFailedToDrain(node, err))
// If the underlying instance no longer exists, we want to remove the finalizer to avoid trying to gracefully drain
// nodes that are no longer alive. We do a check on the Ready condition of the node since, even
// though the CloudProvider says the instance is not around, we know that the kubelet process is still running
// if the Node Ready condition is true.
// Similar logic to: https://github.com/kubernetes/kubernetes/blob/3a75a8c8d9e6a1ebd98d8572132e675d4980f184/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller.go#L144
if nodeutils.GetCondition(node, corev1.NodeReady).Status != corev1.ConditionTrue {
if _, err = c.cloudProvider.Get(ctx, node.Spec.ProviderID); err != nil {
if cloudprovider.IsNodeClaimNotFoundError(err) {
return reconcile.Result{}, c.removeFinalizer(ctx, node)
stored := nodeClaim.DeepCopy()
if modified := nodeClaim.StatusConditions().SetFalse(v1.ConditionTypeDrained, "Draining", "Draining"); modified {
if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, fmt.Errorf("getting nodeclaim, %w", err)
return reconcile.Result{}, client.IgnoreNotFound(err)
}
}

return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
}
NodesDrainedTotal.Inc(map[string]string{
metrics.NodePoolLabel: node.Labels[v1.NodePoolLabelKey],
})
if !nodeClaim.StatusConditions().Get(v1.ConditionTypeDrained).IsTrue() {
stored := nodeClaim.DeepCopy()
_ = nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeDrained)
if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, client.IgnoreNotFound(err)
}
NodesDrainedTotal.Inc(map[string]string{
metrics.NodePoolLabel: node.Labels[v1.NodePoolLabelKey],
})
// We sleep here after a patch operation since we want to ensure that we are able to read our own write before
// getting the NodeClaim again. This prevents conflict errors on subsequent writes.
// USE CAUTION when determining whether to increase this timeout or remove this line
time.Sleep(time.Second)
nodeClaim, err = nodeutils.NodeClaimForNode(ctx, c.kubeClient, node)
if err != nil {
if nodeutils.IsDuplicateNodeClaimError(err) || nodeutils.IsNodeClaimNotFoundError(err) {
log.FromContext(ctx).Error(err, "failed to terminate node")
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
}
}

// In order for Pods associated with PersistentVolumes to smoothly migrate from the terminating Node, we wait
// for VolumeAttachments of drainable Pods to be cleaned up before terminating the Node and removing its finalizer.
// However, if a TerminationGracePeriod is configured for the Node and we are past that period, we will skip waiting.
@@ -145,48 +184,59 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile
return reconcile.Result{}, fmt.Errorf("ensuring no volume attachments, %w", err)
}
if !areVolumesDetached {
c.recorder.Publish(terminatorevents.NodeAwaitingVolumeDetachmentEvent(node))
stored := nodeClaim.DeepCopy()
if modified := nodeClaim.StatusConditions().SetFalse(v1.ConditionTypeVolumesDetached, "AwaitingVolumeDetachment", "AwaitingVolumeDetachment"); modified {
if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, client.IgnoreNotFound(err)
}
}
return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
}
}
nodeClaims, err = nodeutils.GetNodeClaims(ctx, c.kubeClient, node)
if err != nil {
return reconcile.Result{}, fmt.Errorf("deleting nodeclaims, %w", err)
}
for _, nodeClaim := range nodeClaims {
isInstanceTerminated, err := termination.EnsureTerminated(ctx, c.kubeClient, nodeClaim, c.cloudProvider)
if err != nil {
// 404 = the nodeClaim no longer exists
if errors.IsNotFound(err) {
continue
}
// 409 - The nodeClaim exists, but its status has already been modified
if !nodeClaim.StatusConditions().Get(v1.ConditionTypeVolumesDetached).IsTrue() {
stored := nodeClaim.DeepCopy()
_ = nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeVolumesDetached)
if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, fmt.Errorf("ensuring instance termination, %w", err)
return reconcile.Result{}, client.IgnoreNotFound(err)
}
if !isInstanceTerminated {
return reconcile.Result{RequeueAfter: 5 * time.Second}, nil
// We sleep here after a patch operation since we want to ensure that we are able to read our own write before
// getting the NodeClaim again. This prevents conflict errors on subsequent writes.
// USE CAUTION when determining whether to increase this timeout or remove this line
time.Sleep(time.Second)
nodeClaim, err = nodeutils.NodeClaimForNode(ctx, c.kubeClient, node)
if err != nil {
if nodeutils.IsDuplicateNodeClaimError(err) || nodeutils.IsNodeClaimNotFoundError(err) {
log.FromContext(ctx).Error(err, "failed to terminate node")
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
}
}

isInstanceTerminated, err := termination.EnsureTerminated(ctx, c.kubeClient, nodeClaim, c.cloudProvider)
if client.IgnoreNotFound(err) != nil {
// 409 - The nodeClaim exists, but its status has already been modified
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, fmt.Errorf("ensuring instance termination, %w", err)
}
if !isInstanceTerminated {
return reconcile.Result{RequeueAfter: 5 * time.Second}, nil
}
if err := c.removeFinalizer(ctx, node); err != nil {
return reconcile.Result{}, err
}
return reconcile.Result{}, nil
}

func (c *Controller) deleteAllNodeClaims(ctx context.Context, nodeClaims ...*v1.NodeClaim) error {
for _, nodeClaim := range nodeClaims {
// If we still get the NodeClaim, but it's already marked as terminating, we don't need to call Delete again
if nodeClaim.DeletionTimestamp.IsZero() {
if err := c.kubeClient.Delete(ctx, nodeClaim); err != nil {
return client.IgnoreNotFound(err)
}
}
}
return nil
}

func (c *Controller) ensureVolumesDetached(ctx context.Context, node *corev1.Node) (volumesDetached bool, err error) {
volumeAttachments, err := nodeutils.GetVolumeAttachments(ctx, c.kubeClient, node)
if err != nil {
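One pattern in the controller.go changes above is worth spelling out: every condition write deep-copies the NodeClaim, mutates the condition set, and patches status under an optimistic lock, so a stale read surfaces as a 409 conflict and a requeue rather than a lost update; after a successful True transition the controller also sleeps for a second so it can read its own write before re-fetching the NodeClaim (the "USE CAUTION" comments). A condensed, hypothetical extraction of the SetFalse path, with the helper name being mine rather than the commit's:

package main

import (
	"context"
	"time"

	"k8s.io/apimachinery/pkg/api/errors"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// markDraining is a hypothetical extraction of the Drained=False write in
// finalize(): set the condition, patch status under an optimistic lock, and
// requeue while the drain continues on subsequent reconciles.
func markDraining(ctx context.Context, kubeClient client.Client, nodeClaim *v1.NodeClaim) (reconcile.Result, error) {
	stored := nodeClaim.DeepCopy()
	if modified := nodeClaim.StatusConditions().SetFalse(v1.ConditionTypeDrained, "Draining", "Draining"); modified {
		// Optimistic lock: a concurrent writer turns this patch into a 409
		// conflict, which becomes a requeue instead of a silent overwrite.
		if err := kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
			if errors.IsConflict(err) {
				return reconcile.Result{Requeue: true}, nil
			}
			return reconcile.Result{}, client.IgnoreNotFound(err)
		}
	}
	// The drain itself is re-evaluated on the next reconcile.
	return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
}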
17 changes: 16 additions & 1 deletion pkg/controllers/node/termination/suite_test.go
@@ -532,6 +532,7 @@ var _ = Describe("Termination", func() {
It("should not delete nodes until all pods are deleted", func() {
pods := test.Pods(2, test.PodOptions{NodeName: node.Name, ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}})
ExpectApplied(ctx, env.Client, node, nodeClaim, pods[0], pods[1])
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrained).IsUnknown()).To(BeTrue())

// Trigger Termination Controller
Expect(env.Client.Delete(ctx, node)).To(Succeed())
@@ -540,6 +541,10 @@ var _ = Describe("Termination", func() {
ExpectSingletonReconciled(ctx, queue)
ExpectSingletonReconciled(ctx, queue)

nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrained).IsFalse()).To(BeTrue())
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrained).Reason).To(Equal("Draining"))

// Expect the pods to be evicted
EventuallyExpectTerminating(ctx, env.Client, pods[0], pods[1])

@@ -563,6 +568,8 @@ var _ = Describe("Termination", func() {
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectNotFound(ctx, env.Client, node)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrained).IsTrue()).To(BeTrue())
})
It("should delete nodes with no underlying instance even if not fully drained", func() {
pods := test.Pods(2, test.PodOptions{NodeName: node.Name, ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}})
@@ -769,15 +776,23 @@ var _ = Describe("Termination", func() {
})
ExpectApplied(ctx, env.Client, node, nodeClaim, nodePool, va)
Expect(env.Client.Delete(ctx, node)).To(Succeed())
ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeVolumesDetached).IsUnknown()).To(BeTrue())

ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectExists(ctx, env.Client, node)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeVolumesDetached).IsFalse()).To(BeTrue())
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeVolumesDetached).Reason).To(Equal("AwaitingVolumeDetachment"))

ExpectDeleted(ctx, env.Client, va)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectNotFound(ctx, env.Client, node)

nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeVolumesDetached).IsTrue()).To(BeTrue())
})
It("should only wait for volume attachments associated with drainable pods", func() {
vaDrainable := test.VolumeAttachment(test.VolumeAttachmentOptions{
@@ -882,7 +897,7 @@ var _ = Describe("Termination", func() {
// Don't let any pod evict
MinAvailable: &minAvailable,
})
ExpectApplied(ctx, env.Client, pdb, node)
ExpectApplied(ctx, env.Client, pdb, node, nodeClaim)
pods := test.Pods(5, test.PodOptions{NodeName: node.Name, ObjectMeta: metav1.ObjectMeta{
OwnerReferences: defaultOwnerRefs,
Labels: labelSelector,
10 changes: 10 additions & 0 deletions pkg/controllers/node/termination/terminator/events/events.go
@@ -56,6 +56,16 @@ func NodeFailedToDrain(node *corev1.Node, err error) events.Event {
}
}

func NodeAwaitingVolumeDetachmentEvent(node *corev1.Node) events.Event {
return events.Event{
InvolvedObject: node,
Type: corev1.EventTypeNormal,
Reason: "AwaitingVolumeDetachment",
Message: "Awaiting deletion of VolumeAttachments bound to node",
DedupeValues: []string{node.Name},
}
}

func NodeTerminationGracePeriodExpiring(node *corev1.Node, terminationTime string) events.Event {
return events.Event{
InvolvedObject: node,
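This new event fires from the volume-detachment wait in controller.go (the c.recorder.Publish call above); since DedupeValues is keyed on the node name, the once-per-reconcile publishes collapse into a single visible event per node. A minimal, hypothetical call site:

package main

import (
	corev1 "k8s.io/api/core/v1"

	terminatorevents "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator/events"
	"sigs.k8s.io/karpenter/pkg/events"
)

// notifyAwaitingDetachment is a hypothetical wrapper showing how the event is
// published while VolumeAttachments remain bound to a draining node.
func notifyAwaitingDetachment(recorder events.Recorder, node *corev1.Node) {
	recorder.Publish(terminatorevents.NodeAwaitingVolumeDetachmentEvent(node))
}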
