Skip to content

Commit

Permalink
Remove SubnetPort/Pod finalizer
Browse files Browse the repository at this point in the history
Signed-off-by: Yanjun Zhou <[email protected]>
  • Loading branch information
yanjunz97 committed Oct 12, 2024
1 parent 762863a commit 9b9bb82
Show file tree
Hide file tree
Showing 7 changed files with 222 additions and 203 deletions.
95 changes: 49 additions & 46 deletions pkg/controllers/pod/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,17 @@ import (
"context"
"fmt"
"os"
"time"

"github.com/vmware/vsphere-automation-sdk-go/services/nsxt/model"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
apimachineryruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"

"github.com/vmware-tanzu/nsx-operator/pkg/controllers/common"
"github.com/vmware-tanzu/nsx-operator/pkg/logger"
Expand All @@ -45,14 +44,25 @@ type PodReconciler struct {
}

func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
pod := &v1.Pod{}
log.Info("reconciling pod", "pod", req.NamespacedName)
startTime := time.Now()
defer func() {
log.Info("finished reconciling Pod", "Pod", req.NamespacedName, "duration", time.Since(startTime))
}()

metrics.CounterInc(r.SubnetPortService.NSXConfig, metrics.ControllerSyncTotal, MetricResTypePod)

pod := &v1.Pod{}
if err := r.Client.Get(ctx, req.NamespacedName, pod); err != nil {
log.Error(err, "unable to fetch pod", "req", req.NamespacedName)
return common.ResultNormal, client.IgnoreNotFound(err)
if apierrors.IsNotFound(err) {
if err := r.deleteSubnetPortByPodName(ctx, req.Namespace, req.Name); err != nil {
log.Error(err, "failed to delete NSX SubnetPort", "SubnetPort", req.NamespacedName)
return common.ResultRequeue, err
}
return common.ResultNormal, nil
}
log.Error(err, "unable to fetch Pod", "Pod", req.NamespacedName)
return common.ResultRequeue, err
}
if pod.Spec.HostNetwork {
log.Info("skipping handling hostnetwork pod", "pod", req.NamespacedName)
Expand All @@ -65,16 +75,6 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R

if !podIsDeleted(pod) {
metrics.CounterInc(r.SubnetPortService.NSXConfig, metrics.ControllerUpdateTotal, MetricResTypePod)
if !controllerutil.ContainsFinalizer(pod, servicecommon.PodFinalizerName) {
controllerutil.AddFinalizer(pod, servicecommon.PodFinalizerName)
if err := r.Client.Update(ctx, pod); err != nil {
log.Error(err, "add finalizer", "pod", req.NamespacedName)
updateFail(r, ctx, pod, &err)
return common.ResultRequeue, err
}
log.Info("added finalizer on pod", "pod", req.NamespacedName)
}

nsxSubnetPath, err := r.GetSubnetPathForPod(ctx, pod)
if err != nil {
log.Error(err, "failed to get NSX resource path from subnet", "pod.Name", pod.Name, "pod.UID", pod.UID)
Expand Down Expand Up @@ -105,25 +105,13 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
}
updateSuccess(r, ctx, pod)
} else {
if controllerutil.ContainsFinalizer(pod, servicecommon.PodFinalizerName) {
metrics.CounterInc(r.SubnetPortService.NSXConfig, metrics.ControllerDeleteTotal, MetricResTypePod)
subnetPortID := r.SubnetPortService.BuildSubnetPortId(&pod.ObjectMeta)
if err := r.SubnetPortService.DeleteSubnetPort(subnetPortID); err != nil {
log.Error(err, "deletion failed, would retry exponentially", "pod", req.NamespacedName)
deleteFail(r, ctx, pod, &err)
return common.ResultRequeue, err
}
controllerutil.RemoveFinalizer(pod, servicecommon.PodFinalizerName)
if err := r.Client.Update(ctx, pod); err != nil {
log.Error(err, "deletion failed, would retry exponentially", "pod", req.NamespacedName)
deleteFail(r, ctx, pod, &err)
return common.ResultRequeue, err
}
log.Info("removed finalizer", "pod", req.NamespacedName)
deleteSuccess(r, ctx, pod)
} else {
log.Info("finalizers cannot be recognized", "pod", req.NamespacedName)
subnetPortID := r.SubnetPortService.BuildSubnetPortId(&pod.ObjectMeta)
if err := r.SubnetPortService.DeleteSubnetPortById(subnetPortID); err != nil {
log.Error(err, "deletion failed, would retry exponentially", "pod", req.NamespacedName)
deleteFail(r, ctx, pod, &err)
return common.ResultRequeue, err
}
deleteSuccess(r, ctx, pod)
}
return ctrl.Result{}, nil
}
Expand All @@ -147,14 +135,6 @@ func (r *PodReconciler) GetNodeByName(nodeName string) (*model.HostTransportNode
func (r *PodReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&v1.Pod{}).
WithEventFilter(
predicate.Funcs{
DeleteFunc: func(e event.DeleteEvent) bool {
// Suppress Delete events to avoid filtering them out in the Reconcile function
return false
},
},
).
WithOptions(
controller.Options{
MaxConcurrentReconciles: common.NumReconcile(),
Expand Down Expand Up @@ -212,7 +192,7 @@ func (r *PodReconciler) CollectGarbage(ctx context.Context) {
for elem := range diffSet {
log.V(1).Info("GC collected Pod", "NSXSubnetPortID", elem)
metrics.CounterInc(r.SubnetPortService.NSXConfig, metrics.ControllerDeleteTotal, MetricResTypePod)
err = r.SubnetPortService.DeleteSubnetPort(elem)
err = r.SubnetPortService.DeleteSubnetPortById(elem)
if err != nil {
metrics.CounterInc(r.SubnetPortService.NSXConfig, metrics.ControllerDeleteFailTotal, MetricResTypePod)
} else {
Expand All @@ -221,17 +201,17 @@ func (r *PodReconciler) CollectGarbage(ctx context.Context) {
}
}

func updateFail(r *PodReconciler, c context.Context, o *v1.Pod, e *error) {
func updateFail(r *PodReconciler, _ context.Context, o *v1.Pod, e *error) {
r.Recorder.Event(o, v1.EventTypeWarning, common.ReasonFailUpdate, fmt.Sprintf("%v", *e))
metrics.CounterInc(r.SubnetPortService.NSXConfig, metrics.ControllerUpdateFailTotal, MetricResTypePod)
}

func deleteFail(r *PodReconciler, c context.Context, o *v1.Pod, e *error) {
func deleteFail(r *PodReconciler, _ context.Context, o *v1.Pod, e *error) {
r.Recorder.Event(o, v1.EventTypeWarning, common.ReasonFailDelete, fmt.Sprintf("%v", *e))
metrics.CounterInc(r.SubnetPortService.NSXConfig, metrics.ControllerDeleteFailTotal, MetricResTypePod)
}

func updateSuccess(r *PodReconciler, c context.Context, o *v1.Pod) {
func updateSuccess(r *PodReconciler, _ context.Context, o *v1.Pod) {
r.Recorder.Event(o, v1.EventTypeNormal, common.ReasonSuccessfulUpdate, "Pod CR has been successfully updated")
metrics.CounterInc(r.SubnetPortService.NSXConfig, metrics.ControllerUpdateSuccessTotal, MetricResTypePod)
}
Expand Down Expand Up @@ -264,3 +244,26 @@ func (r *PodReconciler) GetSubnetPathForPod(ctx context.Context, pod *v1.Pod) (s
// podIsDeleted reports whether the Pod should be handled as deleted: it
// returns true when the Pod carries a non-zero deletion timestamp, or when its
// status phase is "Succeeded" or "Failed".
func podIsDeleted(pod *v1.Pod) bool {
	// A non-zero deletion timestamp alone is enough, regardless of phase.
	if !pod.ObjectMeta.DeletionTimestamp.IsZero() {
		return true
	}
	// Otherwise only the two finished phases count as deleted.
	switch pod.Status.Phase {
	case "Succeeded", "Failed":
		return true
	default:
		return false
	}
}

// deleteSubnetPortByPodName deletes the NSX SubnetPorts that
// SubnetPortService.ListSubnetPortByPodName returns for the given namespace
// and Pod name.
//
// A SubnetPort whose ID is present in the set returned by
// SubnetPortService.ListSubnetPortIDsFromCRs is left untouched and only
// logged; every other SubnetPort is passed to SubnetPortService.DeleteSubnetPort.
//
// It returns the error from listing the CR IDs, or the first deletion error
// (remaining SubnetPorts are then not processed), and nil otherwise.
func (r *PodReconciler) deleteSubnetPortByPodName(ctx context.Context, ns string, name string) error {
	// Candidates are looked up by namespace and name only, so the ID check in
	// the loop below guards SubnetPorts that are still referenced.
	ports := r.SubnetPortService.ListSubnetPortByPodName(ns, name)

	inUseIDs, listErr := r.SubnetPortService.ListSubnetPortIDsFromCRs(ctx)
	if listErr != nil {
		log.Error(listErr, "failed to list SubnetPort CRs")
		return listErr
	}

	for _, port := range ports {
		// NOTE(review): Id is dereferenced without a nil check; presumably
		// every listed SubnetPort carries an Id - confirm.
		portID := *port.Id
		if !inUseIDs.Has(portID) {
			if err := r.SubnetPortService.DeleteSubnetPort(port); err != nil {
				return err
			}
			continue
		}
		log.Info("skipping deletion, Pod CR still exists in K8s", "ID", portID)
	}

	// Reached whenever no deletion failed, including when ports were skipped
	// or none were found.
	log.Info("successfully deleted nsxSubnetPort for Pod", "namespace", ns, "name", name)
	return nil
}
118 changes: 55 additions & 63 deletions pkg/controllers/subnetport/subnetport_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,20 @@ import (
"os"
"reflect"
"strings"
"time"

vmv1alpha1 "github.com/vmware-tanzu/vm-operator/api/v1alpha1"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apimachineryruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand Down Expand Up @@ -57,18 +56,27 @@ type SubnetPortReconciler struct {

// +kubebuilder:rbac:groups=nsx.vmware.com,resources=subnetports,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=nsx.vmware.com,resources=subnetports/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=nsx.vmware.com,resources=subnetports/finalizers,verbs=update
func (r *SubnetPortReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
subnetPort := &v1alpha1.SubnetPort{}
log.Info("reconciling subnetport CR", "subnetport", req.NamespacedName)
startTime := time.Now()
defer func() {
log.Info("finished reconciling SubnetPort", "SubnetPort", req.NamespacedName, "duration", time.Since(startTime))
}()

metrics.CounterInc(r.SubnetPortService.NSXConfig, metrics.ControllerSyncTotal, MetricResTypeSubnetPort)

subnetPort := &v1alpha1.SubnetPort{}
if err := r.Client.Get(ctx, req.NamespacedName, subnetPort); err != nil {
log.Error(err, "unable to fetch subnetport CR", "req", req.NamespacedName)
return common.ResultNormal, client.IgnoreNotFound(err)
if apierrors.IsNotFound(err) {
if err := r.deleteSubnetPortByName(ctx, req.Namespace, req.Name); err != nil {
log.Error(err, "failed to delete NSX SubnetPort", "SubnetPort", req.NamespacedName)
return common.ResultRequeue, err
}
return common.ResultNormal, nil
}
log.Error(err, "unable to fetch SubnetPort CR", "SubnetPort", req.NamespacedName)
return common.ResultRequeue, err
}

if len(subnetPort.Spec.SubnetSet) > 0 && len(subnetPort.Spec.Subnet) > 0 {
err := errors.New("subnet and subnetset should not be configured at the same time")
log.Error(err, "failed to get subnet/subnetset of the subnetport", "subnetport", req.NamespacedName)
Expand All @@ -78,15 +86,6 @@ func (r *SubnetPortReconciler) Reconcile(ctx context.Context, req ctrl.Request)

if subnetPort.ObjectMeta.DeletionTimestamp.IsZero() {
metrics.CounterInc(r.SubnetPortService.NSXConfig, metrics.ControllerUpdateTotal, MetricResTypeSubnetPort)
if !controllerutil.ContainsFinalizer(subnetPort, servicecommon.SubnetPortFinalizerName) {
controllerutil.AddFinalizer(subnetPort, servicecommon.SubnetPortFinalizerName)
if err := r.Client.Update(ctx, subnetPort); err != nil {
log.Error(err, "add finalizer", "subnetport", req.NamespacedName)
updateFail(r, ctx, subnetPort, &err)
return common.ResultRequeue, err
}
log.Info("added finalizer on subnetport CR", "subnetport", req.NamespacedName)
}

old_status := subnetPort.Status.DeepCopy()
isParentResourceTerminating, nsxSubnetPath, err := r.CheckAndGetSubnetPathForSubnetPort(ctx, subnetPort)
Expand Down Expand Up @@ -145,27 +144,16 @@ func (r *SubnetPortReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
updateSuccess(r, ctx, subnetPort)
} else {
if controllerutil.ContainsFinalizer(subnetPort, servicecommon.SubnetPortFinalizerName) {
metrics.CounterInc(r.SubnetPortService.NSXConfig, metrics.ControllerDeleteTotal, MetricResTypeSubnetPort)
subnetPortID := r.SubnetPortService.BuildSubnetPortId(&subnetPort.ObjectMeta)
if err := r.SubnetPortService.DeleteSubnetPort(subnetPortID); err != nil {
log.Error(err, "deletion failed, would retry exponentially", "subnetport", req.NamespacedName)
deleteFail(r, ctx, subnetPort, &err)
return common.ResultRequeue, err
}
controllerutil.RemoveFinalizer(subnetPort, servicecommon.SubnetPortFinalizerName)
if err := r.Client.Update(ctx, subnetPort); err != nil {
log.Error(err, "deletion failed, would retry exponentially", "subnetport", req.NamespacedName)
deleteFail(r, ctx, subnetPort, &err)
return common.ResultRequeue, err
}
log.Info("removed finalizer", "subnetport", req.NamespacedName)
deleteSuccess(r, ctx, subnetPort)
} else {
log.Info("finalizers cannot be recognized", "subnetport", req.NamespacedName)
metrics.CounterInc(r.SubnetPortService.NSXConfig, metrics.ControllerDeleteTotal, MetricResTypeSubnetPort)
subnetPortID := r.SubnetPortService.BuildSubnetPortId(&subnetPort.ObjectMeta)
if err := r.SubnetPortService.DeleteSubnetPortById(subnetPortID); err != nil {
log.Error(err, "deletion failed, would retry exponentially", "SubnetPort", req.NamespacedName)
deleteFail(r, ctx, subnetPort, &err)
return common.ResultRequeue, err
}
deleteSuccess(r, ctx, subnetPort)
}
return ctrl.Result{}, nil
return common.ResultNormal, nil
}

func subnetPortNamespaceVMIndexFunc(obj client.Object) []string {
Expand All @@ -191,6 +179,29 @@ func addressBindingNamespaceVMIndexFunc(obj client.Object) []string {
}
}

// deleteSubnetPortByName deletes the NSX SubnetPorts that
// SubnetPortService.ListSubnetPortByName returns for the given namespace and
// SubnetPort name.
//
// A SubnetPort whose ID is present in the set returned by
// SubnetPortService.ListSubnetPortIDsFromCRs is left untouched and only
// logged; every other SubnetPort is passed to SubnetPortService.DeleteSubnetPort.
//
// It returns the error from listing the CR IDs, or the first deletion error
// (remaining SubnetPorts are then not processed), and nil otherwise.
func (r *SubnetPortReconciler) deleteSubnetPortByName(ctx context.Context, ns string, name string) error {
	// Candidates are looked up by namespace and name only, so the ID check in
	// the loop below guards SubnetPorts that still belong to an existing CR.
	ports := r.SubnetPortService.ListSubnetPortByName(ns, name)

	inUseIDs, listErr := r.SubnetPortService.ListSubnetPortIDsFromCRs(ctx)
	if listErr != nil {
		log.Error(listErr, "failed to list SubnetPort CRs")
		return listErr
	}

	for _, port := range ports {
		// NOTE(review): Id is dereferenced without a nil check; presumably
		// every listed SubnetPort carries an Id - confirm.
		portID := *port.Id
		if !inUseIDs.Has(portID) {
			if err := r.SubnetPortService.DeleteSubnetPort(port); err != nil {
				return err
			}
			continue
		}
		log.Info("skipping deletion, SubnetPort CR still exists in K8s", "ID", portID)
	}

	// Reached whenever no deletion failed, including when ports were skipped
	// or none were found.
	log.Info("successfully deleted nsxSubnetPort", "namespace", ns, "name", name)
	return nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *SubnetPortReconciler) SetupWithManager(mgr ctrl.Manager) error {
if err := mgr.GetFieldIndexer().IndexField(context.TODO(), &v1alpha1.SubnetPort{}, util.SubnetPortNamespaceVMIndexKey, subnetPortNamespaceVMIndexFunc); err != nil {
Expand All @@ -201,18 +212,6 @@ func (r *SubnetPortReconciler) SetupWithManager(mgr ctrl.Manager) error {
}
return ctrl.NewControllerManagedBy(mgr).
For(&v1alpha1.SubnetPort{}).
WithEventFilter(
predicate.Funcs{
DeleteFunc: func(e event.DeleteEvent) bool {
// Suppress Delete events to avoid filtering them out in the Reconcile function
switch e.Object.(type) {
case *v1alpha1.AddressBinding:
return true
}
return false
},
},
).
WithOptions(
controller.Options{
MaxConcurrentReconciles: common.NumReconcile(),
Expand Down Expand Up @@ -290,24 +289,17 @@ func (r *SubnetPortReconciler) CollectGarbage(ctx context.Context) {
if len(nsxSubnetPortSet) == 0 {
return
}
subnetPortList := &v1alpha1.SubnetPortList{}
err := r.Client.List(ctx, subnetPortList)

crSubnetPortIDsSet, err := r.SubnetPortService.ListSubnetPortIDsFromCRs(ctx)
if err != nil {
log.Error(err, "failed to list SubnetPort CR")
return
}

CRSubnetPortSet := sets.New[string]()
for _, subnetPort := range subnetPortList.Items {
subnetPortID := r.SubnetPortService.BuildSubnetPortId(&subnetPort.ObjectMeta)
CRSubnetPortSet.Insert(subnetPortID)
}

diffSet := nsxSubnetPortSet.Difference(CRSubnetPortSet)
diffSet := nsxSubnetPortSet.Difference(crSubnetPortIDsSet)
for elem := range diffSet {
log.V(1).Info("GC collected SubnetPort CR", "UID", elem)
metrics.CounterInc(r.SubnetPortService.NSXConfig, metrics.ControllerDeleteTotal, MetricResTypeSubnetPort)
err = r.SubnetPortService.DeleteSubnetPort(elem)
err = r.SubnetPortService.DeleteSubnetPortById(elem)
if err != nil {
metrics.CounterInc(r.SubnetPortService.NSXConfig, metrics.ControllerDeleteFailTotal, MetricResTypeSubnetPort)
} else {
Expand Down Expand Up @@ -359,7 +351,7 @@ func (r *SubnetPortReconciler) UpdateSubnetPortStatusConditions(ctx context.Cont
}
}

func (r *SubnetPortReconciler) mergeSubnetPortStatusCondition(ctx context.Context, subnetPort *v1alpha1.SubnetPort, newCondition *v1alpha1.Condition) bool {
func (r *SubnetPortReconciler) mergeSubnetPortStatusCondition(_ context.Context, subnetPort *v1alpha1.SubnetPort, newCondition *v1alpha1.Condition) bool {
matchedCondition := getExistingConditionOfType(newCondition.Type, subnetPort.Status.Conditions)

if reflect.DeepEqual(matchedCondition, newCondition) {
Expand Down Expand Up @@ -420,7 +412,7 @@ func (r *SubnetPortReconciler) CheckAndGetSubnetPathForSubnetPort(ctx context.Co
_, err = r.SubnetService.GetSubnetByPath(subnetPath)
if err != nil {
log.Info("previous NSX subnet is deleted, deleting the stale subnet port", "subnetPort.UID", subnetPort.UID, "subnetPath", subnetPath)
if err = r.SubnetPortService.DeleteSubnetPort(subnetPortID); err != nil {
if err = r.SubnetPortService.DeleteSubnetPortById(subnetPortID); err != nil {
log.Error(err, "failed to delete the stale subnetport", "subnetport.UID", subnetPort.UID)
return
}
Expand All @@ -439,7 +431,7 @@ func (r *SubnetPortReconciler) CheckAndGetSubnetPathForSubnetPort(ctx context.Co
log.Error(err, "subnet CR not found", "subnet CR", namespacedName)
return
}
if subnet != nil && !subnet.DeletionTimestamp.IsZero() {
if !subnet.DeletionTimestamp.IsZero() {
isStale = true
err := fmt.Errorf("subnet %s is being deleted, cannot operate subnetport %s", namespacedName, subnetPort.Name)
return true, "", err
Expand All @@ -464,7 +456,7 @@ func (r *SubnetPortReconciler) CheckAndGetSubnetPathForSubnetPort(ctx context.Co
log.Error(err, "subnetSet CR not found", "subnet CR", namespacedName)
return
}
if subnetSet != nil && !subnetSet.DeletionTimestamp.IsZero() {
if !subnetSet.DeletionTimestamp.IsZero() {
isStale = true
err = fmt.Errorf("subnetset %s is being deleted, cannot operate subnetport %s", namespacedName, subnetPort.Name)
return
Expand Down
Loading

0 comments on commit 9b9bb82

Please sign in to comment.