diff --git a/pkg/apis/v1/nodeclaim_status.go b/pkg/apis/v1/nodeclaim_status.go
index aca25a4e77..ec0bf2f102 100644
--- a/pkg/apis/v1/nodeclaim_status.go
+++ b/pkg/apis/v1/nodeclaim_status.go
@@ -28,6 +28,8 @@ const (
     ConditionTypeInitialized          = "Initialized"
     ConditionTypeConsolidatable       = "Consolidatable"
     ConditionTypeDrifted              = "Drifted"
+    ConditionTypeDrained              = "Drained"
+    ConditionTypeVolumesDetached      = "VolumesDetached"
     ConditionTypeInstanceTerminating  = "InstanceTerminating"
     ConditionTypeConsistentStateFound = "ConsistentStateFound"
 )
diff --git a/pkg/controllers/node/termination/controller.go b/pkg/controllers/node/termination/controller.go
index 67e12c6ebf..898524ffde 100644
--- a/pkg/controllers/node/termination/controller.go
+++ b/pkg/controllers/node/termination/controller.go
@@ -29,6 +29,7 @@ import (
     "k8s.io/apimachinery/pkg/api/errors"
     "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/client-go/util/workqueue"
+    "k8s.io/klog/v2"
     "k8s.io/utils/clock"
     controllerruntime "sigs.k8s.io/controller-runtime"
     "sigs.k8s.io/controller-runtime/pkg/builder"
@@ -76,6 +77,7 @@ func NewController(clk clock.Clock, kubeClient client.Client, cloudProvider clou
 func (c *Controller) Reconcile(ctx context.Context, n *corev1.Node) (reconcile.Result, error) {
     ctx = injection.WithControllerName(ctx, "node.termination")
+    ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("Node", klog.KRef(n.Namespace, n.Name)))
 
     if !n.GetDeletionTimestamp().IsZero() {
         return c.finalize(ctx, n)
@@ -92,20 +94,39 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile
         return reconcile.Result{}, nil
     }
 
-    nodeClaims, err := nodeutils.GetNodeClaims(ctx, c.kubeClient, node)
+    nodeClaim, err := nodeutils.NodeClaimForNode(ctx, c.kubeClient, node)
     if err != nil {
-        return reconcile.Result{}, fmt.Errorf("listing nodeclaims, %w", err)
+        if nodeutils.IsDuplicateNodeClaimError(err) || nodeutils.IsNodeClaimNotFoundError(err) {
+            log.FromContext(ctx).Error(err, "failed to terminate node")
+            return reconcile.Result{}, nil
+        }
+        return reconcile.Result{}, err
+    }
+    ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("NodeClaim", klog.KRef(nodeClaim.Namespace, nodeClaim.Name)))
+    if nodeClaim.DeletionTimestamp.IsZero() {
+        if err := c.kubeClient.Delete(ctx, nodeClaim); err != nil {
+            return reconcile.Result{}, client.IgnoreNotFound(fmt.Errorf("deleting nodeclaim, %w", err))
+        }
     }
-    if err = c.deleteAllNodeClaims(ctx, nodeClaims...); err != nil {
-        return reconcile.Result{}, fmt.Errorf("deleting nodeclaims, %w", err)
+    // If the underlying NodeClaim no longer exists, we want to finish deleting the node rather than try to
+    // gracefully drain a node that is no longer alive. We do a check on the Ready condition of the node since, even
+    // though the CloudProvider says the instance is not around, we know that the kubelet process is still running
+    // if the Node Ready condition is true.
+    // Similar logic to: https://github.com/kubernetes/kubernetes/blob/3a75a8c8d9e6a1ebd98d8572132e675d4980f184/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller.go#L144
+    if nodeutils.GetCondition(node, corev1.NodeReady).Status != corev1.ConditionTrue {
+        if _, err = c.cloudProvider.Get(ctx, node.Spec.ProviderID); err != nil {
+            if cloudprovider.IsNodeClaimNotFoundError(err) {
+                return reconcile.Result{}, c.removeFinalizer(ctx, node)
+            }
+            return reconcile.Result{}, fmt.Errorf("getting nodeclaim, %w", err)
+        }
     }
-    nodeTerminationTime, err := c.nodeTerminationTime(node, nodeClaims...)
+    nodeTerminationTime, err := c.nodeTerminationTime(node, nodeClaim)
     if err != nil {
         return reconcile.Result{}, err
     }
-
     if err = c.terminator.Taint(ctx, node, v1.DisruptedNoScheduleTaint); err != nil {
         if errors.IsConflict(err) {
             return reconcile.Result{Requeue: true}, nil
@@ -117,25 +138,43 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile
             return reconcile.Result{}, fmt.Errorf("draining node, %w", err)
         }
         c.recorder.Publish(terminatorevents.NodeFailedToDrain(node, err))
-        // If the underlying NodeClaim no longer exists, we want to delete to avoid trying to gracefully draining
-        // on nodes that are no longer alive. We do a check on the Ready condition of the node since, even
-        // though the CloudProvider says the instance is not around, we know that the kubelet process is still running
-        // if the Node Ready condition is true
-        // Similar logic to: https://github.com/kubernetes/kubernetes/blob/3a75a8c8d9e6a1ebd98d8572132e675d4980f184/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller.go#L144
-        if nodeutils.GetCondition(node, corev1.NodeReady).Status != corev1.ConditionTrue {
-            if _, err = c.cloudProvider.Get(ctx, node.Spec.ProviderID); err != nil {
-                if cloudprovider.IsNodeClaimNotFoundError(err) {
-                    return reconcile.Result{}, c.removeFinalizer(ctx, node)
+        stored := nodeClaim.DeepCopy()
+        if modified := nodeClaim.StatusConditions().SetFalse(v1.ConditionTypeDrained, "Draining", "Draining"); modified {
+            if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
+                if errors.IsConflict(err) {
+                    return reconcile.Result{Requeue: true}, nil
                 }
-                return reconcile.Result{}, fmt.Errorf("getting nodeclaim, %w", err)
+                return reconcile.Result{}, client.IgnoreNotFound(err)
             }
         }
-
         return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
     }
-    NodesDrainedTotal.Inc(map[string]string{
-        metrics.NodePoolLabel: node.Labels[v1.NodePoolLabelKey],
-    })
+    if !nodeClaim.StatusConditions().Get(v1.ConditionTypeDrained).IsTrue() {
+        stored := nodeClaim.DeepCopy()
+        _ = nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeDrained)
+        if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
+            if errors.IsConflict(err) {
+                return reconcile.Result{Requeue: true}, nil
+            }
+            return reconcile.Result{}, client.IgnoreNotFound(err)
+        }
+        NodesDrainedTotal.Inc(map[string]string{
+            metrics.NodePoolLabel: node.Labels[v1.NodePoolLabelKey],
+        })
+        // We sleep here after a patch operation since we want to ensure that we are able to read our own write before
+        // getting the NodeClaim again. This prevents conflict errors on subsequent writes.
+        // USE CAUTION when determining whether to increase this timeout or remove this line
+        time.Sleep(time.Second)
+        nodeClaim, err = nodeutils.NodeClaimForNode(ctx, c.kubeClient, node)
+        if err != nil {
+            if nodeutils.IsDuplicateNodeClaimError(err) || nodeutils.IsNodeClaimNotFoundError(err) {
+                log.FromContext(ctx).Error(err, "failed to terminate node")
+                return reconcile.Result{}, nil
+            }
+            return reconcile.Result{}, err
+        }
+    }
+
     // In order for Pods associated with PersistentVolumes to smoothly migrate from the terminating Node, we wait
     // for VolumeAttachments of drain-able Pods to be cleaned up before terminating Node and removing its finalizer.
     // However, if TerminationGracePeriod is configured for Node, and we are past that period, we will skip waiting.
@@ -145,29 +184,52 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile
             return reconcile.Result{}, fmt.Errorf("ensuring no volume attachments, %w", err)
         }
         if !areVolumesDetached {
+            c.recorder.Publish(terminatorevents.NodeAwaitingVolumeDetachmentEvent(node))
+            stored := nodeClaim.DeepCopy()
+            if modified := nodeClaim.StatusConditions().SetFalse(v1.ConditionTypeVolumesDetached, "AwaitingVolumeDetachment", "AwaitingVolumeDetachment"); modified {
+                if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
+                    if errors.IsConflict(err) {
+                        return reconcile.Result{Requeue: true}, nil
+                    }
+                    return reconcile.Result{}, client.IgnoreNotFound(err)
+                }
+            }
             return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
         }
     }
-    nodeClaims, err = nodeutils.GetNodeClaims(ctx, c.kubeClient, node)
-    if err != nil {
-        return reconcile.Result{}, fmt.Errorf("deleting nodeclaims, %w", err)
-    }
-    for _, nodeClaim := range nodeClaims {
-        isInstanceTerminated, err := termination.EnsureTerminated(ctx, c.kubeClient, nodeClaim, c.cloudProvider)
-        if err != nil {
-            // 404 = the nodeClaim no longer exists
-            if errors.IsNotFound(err) {
-                continue
-            }
-            // 409 - The nodeClaim exists, but its status has already been modified
+    if !nodeClaim.StatusConditions().Get(v1.ConditionTypeVolumesDetached).IsTrue() {
+        stored := nodeClaim.DeepCopy()
+        _ = nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeVolumesDetached)
+        if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
            if errors.IsConflict(err) {
                return reconcile.Result{Requeue: true}, nil
            }
-            return reconcile.Result{}, fmt.Errorf("ensuring instance termination, %w", err)
+            return reconcile.Result{}, client.IgnoreNotFound(err)
         }
-        if !isInstanceTerminated {
-            return reconcile.Result{RequeueAfter: 5 * time.Second}, nil
+        // We sleep here after a patch operation since we want to ensure that we are able to read our own write before
+        // getting the NodeClaim again. This prevents conflict errors on subsequent writes.
+        // USE CAUTION when determining whether to increase this timeout or remove this line
+        time.Sleep(time.Second)
+        nodeClaim, err = nodeutils.NodeClaimForNode(ctx, c.kubeClient, node)
+        if err != nil {
+            if nodeutils.IsDuplicateNodeClaimError(err) || nodeutils.IsNodeClaimNotFoundError(err) {
+                log.FromContext(ctx).Error(err, "failed to terminate node")
+                return reconcile.Result{}, nil
+            }
+            return reconcile.Result{}, err
+        }
+    }
+
+    isInstanceTerminated, err := termination.EnsureTerminated(ctx, c.kubeClient, nodeClaim, c.cloudProvider)
+    if client.IgnoreNotFound(err) != nil {
+        // 409 - The nodeClaim exists, but its status has already been modified
+        if errors.IsConflict(err) {
+            return reconcile.Result{Requeue: true}, nil
         }
+        return reconcile.Result{}, fmt.Errorf("ensuring instance termination, %w", err)
+    }
+    if !isInstanceTerminated {
+        return reconcile.Result{RequeueAfter: 5 * time.Second}, nil
     }
     if err := c.removeFinalizer(ctx, node); err != nil {
         return reconcile.Result{}, err
@@ -175,18 +237,6 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile
     return reconcile.Result{}, nil
 }
 
-func (c *Controller) deleteAllNodeClaims(ctx context.Context, nodeClaims ...*v1.NodeClaim) error {
-    for _, nodeClaim := range nodeClaims {
-        // If we still get the NodeClaim, but it's already marked as terminating, we don't need to call Delete again
-        if nodeClaim.DeletionTimestamp.IsZero() {
-            if err := c.kubeClient.Delete(ctx, nodeClaim); err != nil {
-                return client.IgnoreNotFound(err)
-            }
-        }
-    }
-    return nil
-}
-
 func (c *Controller) ensureVolumesDetached(ctx context.Context, node *corev1.Node) (volumesDetached bool, err error) {
     volumeAttachments, err := nodeutils.GetVolumeAttachments(ctx, c.kubeClient, node)
     if err != nil {
diff --git a/pkg/controllers/node/termination/suite_test.go b/pkg/controllers/node/termination/suite_test.go
index 5758579227..4b20df9055 100644
--- a/pkg/controllers/node/termination/suite_test.go
+++ b/pkg/controllers/node/termination/suite_test.go
@@ -532,6 +532,7 @@ var _ = Describe("Termination", func() {
         It("should not delete nodes until all pods are deleted", func() {
             pods := test.Pods(2, test.PodOptions{NodeName: node.Name, ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}})
             ExpectApplied(ctx, env.Client, node, nodeClaim, pods[0], pods[1])
+            Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrained).IsUnknown()).To(BeTrue())
 
             // Trigger Termination Controller
             Expect(env.Client.Delete(ctx, node)).To(Succeed())
@@ -540,6 +541,10 @@
             ExpectSingletonReconciled(ctx, queue)
             ExpectSingletonReconciled(ctx, queue)
+            nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
+            Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrained).IsFalse()).To(BeTrue())
+            Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrained).Reason).To(Equal("Draining"))
+
             // Expect the pods to be evicted
             EventuallyExpectTerminating(ctx, env.Client, pods[0], pods[1])
@@ -563,6 +568,8 @@ var _ = Describe("Termination", func() {
             ExpectObjectReconciled(ctx, env.Client, terminationController, node)
             ExpectObjectReconciled(ctx, env.Client, terminationController, node)
             ExpectNotFound(ctx, env.Client, node)
+            nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
+            Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrained).IsTrue()).To(BeTrue())
         })
         It("should delete nodes with no underlying instance even if not fully drained", func() {
             pods := test.Pods(2, test.PodOptions{NodeName: node.Name, ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}})
@@ -769,15 +776,23 @@ var _ = Describe("Termination", func() {
             })
             ExpectApplied(ctx, env.Client, node, nodeClaim, nodePool, va)
             Expect(env.Client.Delete(ctx, node)).To(Succeed())
+            ExpectExists(ctx, env.Client, nodeClaim)
+            Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeVolumesDetached).IsUnknown()).To(BeTrue())
 
             ExpectObjectReconciled(ctx, env.Client, terminationController, node)
             ExpectObjectReconciled(ctx, env.Client, terminationController, node)
             ExpectExists(ctx, env.Client, node)
+            nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
+            Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeVolumesDetached).IsFalse()).To(BeTrue())
+            Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeVolumesDetached).Reason).To(Equal("AwaitingVolumeDetachment"))
 
             ExpectDeleted(ctx, env.Client, va)
             ExpectObjectReconciled(ctx, env.Client, terminationController, node)
             ExpectObjectReconciled(ctx, env.Client, terminationController, node)
             ExpectNotFound(ctx, env.Client, node)
+
+            nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
+            Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeVolumesDetached).IsTrue()).To(BeTrue())
         })
         It("should only wait for volume attachments associated with drainable pods", func() {
             vaDrainable := test.VolumeAttachment(test.VolumeAttachmentOptions{
@@ -882,7 +897,7 @@ var _ = Describe("Termination", func() {
                 // Don't let any pod evict
                 MinAvailable: &minAvailable,
             })
-            ExpectApplied(ctx, env.Client, pdb, node)
+            ExpectApplied(ctx, env.Client, pdb, node, nodeClaim)
             pods := test.Pods(5, test.PodOptions{NodeName: node.Name, ObjectMeta: metav1.ObjectMeta{
                 OwnerReferences: defaultOwnerRefs,
                 Labels:          labelSelector,
diff --git a/pkg/controllers/node/termination/terminator/events/events.go b/pkg/controllers/node/termination/terminator/events/events.go
index d626173d56..c95b4f0206 100644
--- a/pkg/controllers/node/termination/terminator/events/events.go
+++ b/pkg/controllers/node/termination/terminator/events/events.go
@@ -56,6 +56,16 @@ func NodeFailedToDrain(node *corev1.Node, err error) events.Event {
     }
 }
 
+func NodeAwaitingVolumeDetachmentEvent(node *corev1.Node) events.Event {
+    return events.Event{
+        InvolvedObject: node,
+        Type:           corev1.EventTypeNormal,
+        Reason:         "AwaitingVolumeDetachment",
+        Message:        "Awaiting deletion of VolumeAttachments bound to node",
+        DedupeValues:   []string{node.Name},
+    }
+}
+
 func NodeTerminationGracePeriodExpiring(node *corev1.Node, terminationTime string) events.Event {
     return events.Event{
         InvolvedObject: node,
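Not part of the diff itself: for reviewers who want to poke at how the two new conditions surface on a NodeClaim, here is a minimal sketch of reading them back through a controller-runtime client. It only relies on what the change above shows — the `v1.ConditionType*` constants and the `StatusConditions()` accessor used by the controller and tests — and assumes the upstream `sigs.k8s.io/karpenter/pkg/apis/v1` import path for those types. The package name and the `InspectTerminationConditions` helper are illustrative, not part of this PR.

```go
// Package nodeclaimdebug is a hypothetical helper package, shown only to illustrate
// how the Drained / VolumesDetached conditions added in this PR appear on a NodeClaim.
package nodeclaimdebug

import (
	"context"
	"fmt"

	"sigs.k8s.io/controller-runtime/pkg/client"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// InspectTerminationConditions fetches a NodeClaim by name (NodeClaims are cluster-scoped,
// so no namespace is needed) and prints the termination-related conditions in the order
// the termination controller works through them.
func InspectTerminationConditions(ctx context.Context, kubeClient client.Client, name string) error {
	nodeClaim := &v1.NodeClaim{}
	if err := kubeClient.Get(ctx, client.ObjectKey{Name: name}, nodeClaim); err != nil {
		return fmt.Errorf("getting nodeclaim, %w", err)
	}
	for _, conditionType := range []string{
		v1.ConditionTypeDrained,
		v1.ConditionTypeVolumesDetached,
		v1.ConditionTypeInstanceTerminating,
	} {
		condition := nodeClaim.StatusConditions().Get(conditionType)
		if condition == nil {
			// Nothing has set this condition yet; the suite above asserts this state via IsUnknown().
			fmt.Printf("%s: not set\n", conditionType)
			continue
		}
		fmt.Printf("%s: %s (reason: %s)\n", conditionType, condition.Status, condition.Reason)
	}
	return nil
}
```

The loop ordering mirrors the flow in `finalize` after this change: the node is drained, then VolumeAttachments are waited on, and only then is instance termination ensured before the finalizer is removed.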