Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
Signed-off-by: Wenqi Qiu <[email protected]>
  • Loading branch information
wenqiq committed Oct 11, 2024
1 parent 9bf2518 commit d743814
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 36 deletions.
78 changes: 51 additions & 27 deletions pkg/controllers/networkinfo/networkinfo_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
apimachineryruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
Expand Down Expand Up @@ -47,28 +48,27 @@ type NetworkInfoReconciler struct {
func (r *NetworkInfoReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
startTime := time.Now()
defer func() {
log.Info("Finished reconciling NetworkInfo", "NetworkInfo", req.NamespacedName, "duration", time.Since(startTime))
log.Info("Finished reconciling NetworkInfo", "NetworkInfo", req.NamespacedName, "duration(ms)", time.Since(startTime).Milliseconds())
}()
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerSyncTotal, common.MetricResTypeNetworkInfo)

networkInfoCR := &v1alpha1.NetworkInfo{}
if err := r.Client.Get(ctx, req.NamespacedName, networkInfoCR); err != nil {
// IgnoreNotFound returns nil on NotFound errors.
if client.IgnoreNotFound(err) == nil {
if err := r.deleteStaleVPCs(ctx, req.Namespace); err != nil {
if apierrors.IsNotFound(err) {
if err := r.deleteStaleVPCs(ctx, req.Namespace, ""); err != nil {
log.Error(err, "Failed to delete stale NSX VPC", "NetworkInfo", req.NamespacedName)
return common.ResultRequeue, err
}
return common.ResultNormal, nil
}
log.Error(err, "Unable to fetch NetworkInfo CR", "req", req.NamespacedName)
log.Error(err, "Unable to fetch NetworkInfo CR", "NetworkInfo", req.NamespacedName)
return common.ResultRequeue, err
}

// Check if the CR is marked for deletion
if !networkInfoCR.ObjectMeta.DeletionTimestamp.IsZero() {
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerDeleteTotal, common.MetricResTypeNetworkInfo)
if err := r.deleteStaleVPCs(ctx, networkInfoCR.GetNamespace()); err != nil {
if err := r.deleteStaleVPCs(ctx, networkInfoCR.GetNamespace(), string(networkInfoCR.UID)); err != nil {
deleteFail(r, ctx, networkInfoCR, &err, r.Client)
log.Error(err, "Failed to delete stale NSX VPC, retrying", "NetworkInfo", req.NamespacedName)
return common.ResultRequeue, err
Expand Down Expand Up @@ -156,15 +156,15 @@ func (r *NetworkInfoReconciler) Reconcile(ctx context.Context, req ctrl.Request)

vpcConnectivityProfile, err := r.Service.GetVpcConnectivityProfile(&nc, vpcConnectivityProfilePath)
if err != nil {
log.Error(err, "Failed to get VPC connectivity profile", "VPC", req.NamespacedName)
log.Error(err, "Failed to get VPC connectivity profile", "NetworkInfo", req.NamespacedName)
updateFail(r, ctx, networkInfoCR, &err, r.Client, nil)
return common.ResultRequeueAfter10sec, err
}
hasExternalIPs := true
if ncName == commonservice.SystemVPCNetworkConfigurationName {
if len(vpcConnectivityProfile.ExternalIpBlocks) == 0 {
hasExternalIPs = false
log.Error(err, "there is no ExternalIPBlock in VPC ConnectivityProfile", "VPC", req.NamespacedName)
log.Error(err, "There is no ExternalIPBlock in VPC ConnectivityProfile", "NetworkInfo", req.NamespacedName)
}
setVPCNetworkConfigurationStatusWithNoExternalIPBlock(ctx, r.Client, vpcNetworkConfiguration, hasExternalIPs)
}
Expand All @@ -189,11 +189,11 @@ func (r *NetworkInfoReconciler) Reconcile(ctx context.Context, req ctrl.Request)
vpcNetworkConfiguration := &v1alpha1.VPCNetworkConfiguration{}
err := r.Client.Get(ctx, types.NamespacedName{Name: ncName}, vpcNetworkConfiguration)
if err != nil {
log.Error(err, "failed to get VPCNetworkConfiguration", "Name", ncName)
log.Error(err, "Failed to get VPCNetworkConfiguration", "Name", ncName)
updateFail(r, ctx, networkInfoCR, &err, r.Client, nil)
return common.ResultRequeueAfter10sec, err
}
log.Info("got the AutoSnat status", "autoSnatEnabled", autoSnatEnabled, "req", req.NamespacedName)
log.Info("Got the AutoSnat status", "autoSnatEnabled", autoSnatEnabled, "NetworkInfo", req.NamespacedName)
setVPCNetworkConfigurationStatusWithSnatEnabled(ctx, r.Client, vpcNetworkConfiguration, autoSnatEnabled)
}

Expand All @@ -203,7 +203,7 @@ func (r *NetworkInfoReconciler) Reconcile(ctx context.Context, req ctrl.Request)
if lbProvider == vpc.AVILB && createdVpc.LoadBalancerVpcEndpoint != nil && createdVpc.LoadBalancerVpcEndpoint.Enabled != nil && *createdVpc.LoadBalancerVpcEndpoint.Enabled {
path, cidr, err = r.Service.GetAVISubnetInfo(*createdVpc)
if err != nil {
log.Error(err, "failed to read lb subnet path and cidr", "VPC", createdVpc.Id)
log.Error(err, "Failed to read LB Subnet path and CIDR", "VPC", createdVpc.Id)
state := &v1alpha1.VPCState{
Name: *createdVpc.DisplayName,
DefaultSNATIP: snatIP,
Expand All @@ -230,7 +230,7 @@ func (r *NetworkInfoReconciler) Reconcile(ctx context.Context, req ctrl.Request)
setVPCNetworkConfigurationStatusWithLBS(ctx, r.Client, ncName, state.Name, path, nsxLBSPath, *createdVpc.Path)
updateSuccess(r, ctx, networkInfoCR, r.Client, state, nc.Name, path)
if ncName == commonservice.SystemVPCNetworkConfigurationName && (!gatewayConnectionReady || !autoSnatEnabled || !hasExternalIPs) {
log.Info("Requeuing NetworkInfo CR because VPCNetworkConfiguration system is not ready", "gatewayConnectionReason", gatewayConnectionReason, "autoSnatEnabled", autoSnatEnabled, "hasExternalIPs", hasExternalIPs, "req", req)
log.Info("Requeue NetworkInfo CR because VPCNetworkConfiguration system is not ready", "gatewayConnectionReason", gatewayConnectionReason, "autoSnatEnabled", autoSnatEnabled, "hasExternalIPs", hasExternalIPs, "req", req)
return common.ResultRequeueAfter60sec, nil
}

Expand Down Expand Up @@ -268,6 +268,22 @@ func (r *NetworkInfoReconciler) Start(mgr ctrl.Manager) error {
return nil
}

func (r *NetworkInfoReconciler) listNamespaceCRsName(ctx context.Context) (sets.Set[string], sets.Set[string], error) {
// read all Namespaces from K8s
namespaces := &corev1.NamespaceList{}
err := r.Client.List(ctx, namespaces)
if err != nil {
return nil, nil, err
}
nsSet := sets.Set[string]{}
idSet := sets.Set[string]{}
for _, ns := range namespaces.Items {
nsSet.Insert(ns.Name)
idSet.Insert(string(ns.UID))
}
return nsSet, idSet, nil
}

// CollectGarbage logic for NSX VPC is that:
// 1. list all current existing namespace in kubernetes
// 2. list all the NSX VPC in vpcStore
Expand All @@ -286,28 +302,23 @@ func (r *NetworkInfoReconciler) CollectGarbage(ctx context.Context) {
return
}

// read all namespaces from k8s
namespaces := &corev1.NamespaceList{}
err := r.Client.List(ctx, namespaces)
nsSet, idSet, err := r.listNamespaceCRsName(ctx)
if err != nil {
log.Error(err, "Failed to list Kubernetes Namespaces")
return
}
nsSet := sets.NewString()
for _, ns := range namespaces.Items {
nsSet.Insert(ns.Name)
}

for i, nsxVPC := range nsxVPCList {
nsxVPCNamespace := getNamespaceFromNSXVPC(&nsxVPCList[i])
if nsSet.Has(nsxVPCNamespace) {
nsxVPCNamespaceName := filterTagFromNSXVPC(&nsxVPCList[i], commonservice.TagScopeNamespace)
nsxVPCNamespaceID := filterTagFromNSXVPC(&nsxVPCList[i], commonservice.TagScopeNamespaceUID)
if nsSet.Has(nsxVPCNamespaceName) && idSet.Has(nsxVPCNamespaceID) {
continue
}
log.Info("Garbage collecting NSX VPC object", "VPC", nsxVPC.Id, "Namespace", nsxVPCNamespace)
log.Info("Garbage collecting NSX VPC object", "VPC", nsxVPC.Id, "Namespace", nsxVPCNamespaceName)
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerDeleteTotal, common.MetricResTypeNetworkInfo)

if err = r.Service.DeleteVPC(*nsxVPC.Path); err != nil {
log.Error(err, "Failed to delete NSX VPC", "VPC", nsxVPC.Id, "Namespace", nsxVPCNamespace)
log.Error(err, "Failed to delete NSX VPC", "VPC", nsxVPC.Id, "Namespace", nsxVPCNamespaceName)
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerDeleteFailTotal, common.MetricResTypeNetworkInfo)
continue
}
Expand All @@ -317,29 +328,42 @@ func (r *NetworkInfoReconciler) CollectGarbage(ctx context.Context) {
}
}

func (r *NetworkInfoReconciler) deleteStaleVPCs(ctx context.Context, ns string) error {
func (r *NetworkInfoReconciler) deleteStaleVPCs(ctx context.Context, ns, id string) error {
isShared, err := r.Service.IsSharedVPCNamespaceByNS(ns)
if err != nil {
return fmt.Errorf("failed to check if Namespace is shared for NS %s: %w", ns, err)
}
if isShared {
log.Info("shared Namespace, skipping deletion of NSX VPC", "Namespace", ns)
log.Info("Shared Namespace, skipping deletion of NSX VPC", "Namespace", ns)
return nil
}
nsSet, idSet, err := r.listNamespaceCRsName(ctx)
if err != nil {
log.Error(err, "Failed to list Kubernetes Namespaces")
return fmt.Errorf("failed to list Kubernetes Namespaces while deleting VPCs: %v", err)
}
// Retrieve stale VPCs associated with the Namespace
staleVPCs := r.Service.GetVPCsByNamespace(ns)
if len(staleVPCs) == 0 {
log.Info("no VPCs found in store, skipping deletion of NSX VPC", "Namespace", ns)
log.Info("There is no VPCs found in store, skipping deletion of NSX VPC", "Namespace", ns)
return nil
}
var deleteErrs []error
for _, nsxVPC := range staleVPCs {
namespaceIDofVPC := filterTagFromNSXVPC(nsxVPC, commonservice.TagScopeNamespaceUID)
namespaceNameofVPC := filterTagFromNSXVPC(nsxVPC, commonservice.TagScopeNamespace)
// if delete with Name, the id is "", `id != namespaceIDofVPC` will always be true
// if delete with Name and id, namespaceNameofVPC and namespaceIDofVPC will in K8s, if id == namespaceIDofVPC, we should delete it, otherwise skip deleting it
if nsSet.Has(namespaceNameofVPC) && idSet.Has(namespaceIDofVPC) && id != namespaceIDofVPC {
log.Info("Namespace still exists in Kubernetes, skipping deletion of NSX VPC", "Namespace", ns)
continue
}
if nsxVPC.Path == nil {
log.Error(nil, "VPC path is nil, skipping", "VPC", nsxVPC)
continue
}
if err := r.Service.DeleteVPC(*nsxVPC.Path); err != nil {
log.Error(err, "failed to delete VPC in NSX", "VPC", nsxVPC.Path)
log.Error(err, "Failed to delete VPC in NSX", "VPC", nsxVPC.Path)
deleteErrs = append(deleteErrs, fmt.Errorf("failed to delete VPC %s: %w", *nsxVPC.Path, err))
}
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/controllers/networkinfo/networkinfo_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ func TestNetworkInfoReconciler_deleteStaleVPCs(t *testing.T) {
})
defer patches.Reset()

err := r.deleteStaleVPCs(ctx, namespace)
err := r.deleteStaleVPCs(ctx, namespace, "")
require.NoError(t, err)
})

Expand All @@ -767,7 +767,7 @@ func TestNetworkInfoReconciler_deleteStaleVPCs(t *testing.T) {
})
defer patches.Reset()

err := r.deleteStaleVPCs(ctx, namespace)
err := r.deleteStaleVPCs(ctx, namespace, "")
require.NoError(t, err)
})

Expand All @@ -784,7 +784,7 @@ func TestNetworkInfoReconciler_deleteStaleVPCs(t *testing.T) {
})
defer patches.Reset()

err := r.deleteStaleVPCs(ctx, namespace)
err := r.deleteStaleVPCs(ctx, namespace, "")
assert.Error(t, err)
assert.Contains(t, err.Error(), "delete failed")
})
Expand Down Expand Up @@ -812,7 +812,7 @@ func TestNetworkInfoReconciler_deleteStaleVPCs(t *testing.T) {
})
defer patches.Reset()

err := r.deleteStaleVPCs(ctx, namespace)
err := r.deleteStaleVPCs(ctx, namespace, "")
require.NoError(t, err)
})
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/networkinfo/networkinfo_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,10 @@ func deleteVPCNetworkConfigurationStatus(ctx context.Context, client client.Clie
log.Info("Deleted stale VPCNetworkConfiguration status", "Name", ncName, "nc.Status.VPCs", nc.Status.VPCs, "staleVPCs", staleVPCNames)
}

func getNamespaceFromNSXVPC(nsxVPC *model.Vpc) string {
func filterTagFromNSXVPC(nsxVPC *model.Vpc, tagName string) string {
tags := nsxVPC.Tags
for _, tag := range tags {
if *tag.Scope == svccommon.TagScopeNamespace {
if *tag.Scope == tagName {
return *tag.Tag
}
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/nsx/services/vpc/vpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -698,13 +698,13 @@ func (s *VPCService) IsLBProviderChanged(existingVPC *model.Vpc, lbProvider LBPr
return false
}
func (s *VPCService) CreateOrUpdateVPC(obj *v1alpha1.NetworkInfo, nc *common.VPCNetworkConfigInfo, lbProvider LBProvider) (*model.Vpc, error) {
// check from VPC store if vpc already exist
// check from VPC store if VPC already exist
ns := obj.Namespace
updateVpc := false
nsObj := &v1.Namespace{}
// get name obj
// get Namespace
if err := s.Client.Get(ctx, types.NamespacedName{Name: obj.Namespace}, nsObj); err != nil {
log.Error(err, "unable to fetch namespace", "name", obj.Namespace)
log.Error(err, "unable to fetch Namespace", "Name", obj.Namespace)
return nil, err
}

Expand Down

0 comments on commit d743814

Please sign in to comment.