Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry support for ingestion layer #60

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions gslb/gslbutils/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (
PassthroughRoute = "passthrough"
ThirdPartyMemberType = "ThirdPartyMember"
HostRuleType = "HostRule"
GslbHostRuleType = "GSLBHostRule"

// Refresh cycle for AVI cache in seconds
DefaultRefreshInterval = 600
Expand All @@ -54,10 +55,11 @@ const (
GSFQDNKeyLen = 3

// Default values for Retry Operations
SlowSyncTime = 120
SlowRetryQueue = "SlowRetry"
FastRetryQueue = "FastRetry"
DefaultRetryCount = 5
SlowSyncTime = 120
SlowRetryQueue = "SlowRetry"
FastRetryQueue = "FastRetry"
IngestionRetryQueue = "IngestionRetry"
DefaultRetryCount = 5

// Identify objects created by AMKO
AmkoUser = "amko-gslb"
Expand Down
73 changes: 73 additions & 0 deletions gslb/gslbutils/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2021 VMware, Inc.
* All Rights Reserved.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package gslbutils

// Error codes
const (
ObjectErrStatus = 1
ControllerErrStatus = 2
ResponseParseStatus = 3
FederatedErrStatus = 4
)

type ControllerValidationError struct {
errCode int
msg string
}

func (vErr ControllerValidationError) Error() string {
if vErr.errCode < 5 && vErr.errCode > 0 {
return vErr.msg
}
return "unknown status code"
}

func GetIngestionErrorForObjectNotFound(errMsg string) error {
return ControllerValidationError{errCode: ObjectErrStatus, msg: errMsg}
}

func GetIngestionErrorForController(errMsg string) error {
return ControllerValidationError{errCode: ControllerErrStatus, msg: errMsg}
}

func GetIngestionErrorForParsing(errMsg string) error {
return ControllerValidationError{errCode: ResponseParseStatus, msg: errMsg}
}

func GetIngestionErrorForObjectNotFederated(errMsg string) error {
return ControllerValidationError{errCode: FederatedErrStatus, msg: errMsg}
}

// IsControllerError returns true only if there was an issue in communicating with the controller.
func IsControllerError(err error) bool {
vErr, ok := err.(ControllerValidationError)
if !ok || vErr.errCode != ControllerErrStatus {
return false
}
return true
}

// IsRetriableOnError returns true only if a retry is required
func IsRetriableOnError(err error) bool {
// For errors other than object not federated, we will retry for everything else
vErr, ok := err.(ControllerValidationError)
if !ok {
return false
}
if vErr.errCode == FederatedErrStatus {
return false
}
return true
}
22 changes: 16 additions & 6 deletions gslb/gslbutils/gslbutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,14 @@ func ExtractMultiClusterKey(key string) (string, string, string, string, string)
return operation, objType, cluster, ns, name
}

func ExtractIngestionRetryQueueKey(key string) (string, string, string, string, error) {
segments := strings.Split(key, "/")
if len(segments) != 4 {
return "", "", "", "", fmt.Errorf("unexpected segment length for key %s", key)
}
return segments[0], segments[1], segments[2], segments[3], nil
}

func GetObjectTypeFromKey(key string) (string, error) {
segments := strings.Split(key, "/")
if len(segments) < 2 {
Expand Down Expand Up @@ -436,10 +444,11 @@ var waitGroupMap map[string]*sync.WaitGroup
var wgSyncOnce sync.Once

const (
WGIngestion = "ingestion"
WGFastRetry = "fastretry"
WGSlowRetry = "slowretry"
WGGraph = "graph"
WGIngestion = "ingestion"
WGFastRetry = "fastretry"
WGSlowRetry = "slowretry"
WGGraph = "graph"
WGIngestionRetry = "ingestionretry"
)

func SetWaitGroupMap() {
Expand All @@ -449,6 +458,7 @@ func SetWaitGroupMap() {
waitGroupMap[WGFastRetry] = &sync.WaitGroup{}
waitGroupMap[WGGraph] = &sync.WaitGroup{}
waitGroupMap[WGSlowRetry] = &sync.WaitGroup{}
waitGroupMap[WGIngestionRetry] = &sync.WaitGroup{}
})
}

Expand Down Expand Up @@ -625,12 +635,12 @@ func GetUriFromAvi(uri string, aviClient *clients.AviClient, infiniteRetry bool)
}
// For 404, return
if aviError.HttpStatusCode == 404 {
return nil, fmt.Errorf("%s", *aviError.Message)
return nil, GetIngestionErrorForObjectNotFound(fmt.Sprintf("object not found for uri: %s", uri))
}
// All other errors, retry
Errf("uri: %s, aviErr: %v, will retry %d", uri, aviError, i)
if !infiniteRetry && i == 2 {
return nil, err
return nil, GetIngestionErrorForController(err.Error())
}
time.Sleep(RestSleepTime)
}
Expand Down
4 changes: 4 additions & 0 deletions gslb/ingestion/fullsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ func checkGslbHostRulesAndInitialize() error {
if err != nil {
updateGSLBHR(&gslbHr, err.Error(), GslbHostRuleRejected)
gslbutils.Errf("Error in accepting GSLB Host Rule %s : %v", gsFqdn, err)
if gslbutils.IsRetriableOnError(err) {
updateIngestionRetryAddCache(&gslbHr)
publishKeyToIngestionRetry(gslbutils.ObjectAdd, gslbutils.GslbHostRuleType, gslbHr.Namespace, gslbHr.Name)
}
continue
}
gsFqdnHostRule := gsHostRulesList.GetGSHostRulesForFQDN(gsFqdn)
Expand Down
10 changes: 6 additions & 4 deletions gslb/ingestion/gdp_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,9 @@ func GDPSanityChecks(gdp *gdpalphav2.GlobalDeploymentPolicy, fullSync bool) erro
// Health monotor validity
if len(gdp.Spec.HealthMonitorRefs) != 0 {
for _, hmRef := range gdp.Spec.HealthMonitorRefs {
if !isHealthMonitorRefValid(hmRef, true, fullSync) {
return fmt.Errorf("health monitor ref %s is invalid", hmRef)
err := isHealthMonitorRefValid(hmRef, true, fullSync)
if err != nil {
return fmt.Errorf("health monitor ref %s is invalid: %s", hmRef, err.Error())
}
}
}
Expand All @@ -339,8 +340,9 @@ func GDPSanityChecks(gdp *gdpalphav2.GlobalDeploymentPolicy, fullSync bool) erro
if gdp.Spec.SitePersistenceRef != nil && *gdp.Spec.SitePersistenceRef == "" {
return fmt.Errorf("empty string as site persistence reference not supported")
} else if gdp.Spec.SitePersistenceRef != nil {
if !isSitePersistenceProfilePresent(*gdp.Spec.SitePersistenceRef, true, fullSync) {
return fmt.Errorf("site persistence ref %s not present", *gdp.Spec.SitePersistenceRef)
err := isSitePersistenceProfilePresent(*gdp.Spec.SitePersistenceRef, true, fullSync)
if err != nil {
return fmt.Errorf("site persistence ref %s is invalid: %s", *gdp.Spec.SitePersistenceRef, err.Error())
}
}
return nil
Expand Down
8 changes: 7 additions & 1 deletion gslb/ingestion/gslb.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,8 +680,14 @@ func Initialize() {
graphQueueParams := utils.WorkerQueue{NumWorkers: gslbutils.NumRestWorkers, WorkqueueName: utils.GraphLayer}
slowRetryQParams := utils.WorkerQueue{NumWorkers: 1, WorkqueueName: gslbutils.SlowRetryQueue, SlowSyncTime: gslbutils.SlowSyncTime}
fastRetryQParams := utils.WorkerQueue{NumWorkers: 1, WorkqueueName: gslbutils.FastRetryQueue}
ingestionRetryQParams := utils.WorkerQueue{NumWorkers: 1, WorkqueueName: gslbutils.IngestionRetryQueue, SlowSyncTime: gslbutils.SlowSyncTime}

utils.SharedWorkQueue(&ingestionQueueParams, &graphQueueParams, &slowRetryQParams, &fastRetryQParams)
utils.SharedWorkQueue(&ingestionQueueParams, &graphQueueParams, &slowRetryQParams, &fastRetryQParams, &ingestionRetryQParams)

// Set workers for ingestion queue retry workers
ingestionRetryQueue := utils.SharedWorkQueue().GetQueueByName(gslbutils.IngestionRetryQueue)
ingestionRetryQueue.SyncFunc = IngestionRetryAddUpdate
ingestionRetryQueue.Run(stopCh, gslbutils.GetWaitGroupFromMap(gslbutils.WGIngestionRetry))

// Set workers for layer 3 (REST layer)
graphSharedQueue := utils.SharedWorkQueue().GetQueueByName(utils.GraphLayer)
Expand Down
74 changes: 40 additions & 34 deletions gslb/ingestion/gslb_host_rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ func isSitePersistenceRefPresentInCache(spName string) bool {
return present
}

func isSitePersistenceProfilePresent(profileName string, gdp bool, fullSync bool) bool {
func isSitePersistenceProfilePresent(profileName string, gdp bool, fullSync bool) error {
if fullSync && isSitePersistenceRefPresentInCache(profileName) {
gslbutils.Debugf("site persistence ref %s present in site persistence cache", profileName)
return true
return nil
}
// Check if the profile mentioned in gslbHostRule are present as application persistence profile on the gslb leader
aviClient := avictrl.SharedAviClients().AviClient[0]
Expand All @@ -94,14 +94,13 @@ func isSitePersistenceProfilePresent(profileName string, gdp bool, fullSync bool
result, err := gslbutils.GetUriFromAvi(uri, aviClient, gdp)
if err != nil {
gslbutils.Errf("Error getting uri %s from Avi : %s", uri, err)
return false
return err
}
if result.Count == 0 {
gslbutils.Errf("Site Persistent Profile %s does not exist", profileName)
return false
return fmt.Errorf("SitePersistence profile %s doesn't exist", profileName)
}

return true
return nil
}

func isFallbackAlgorithmValid(fa *gslbhralphav1.GeoFallback) (bool, error) {
Expand Down Expand Up @@ -177,73 +176,70 @@ func isHealthMonitorRefPresentInCache(hmName string) bool {
// If not, return false.
// For non-full sync cases, the hm ref will be fetched from the GSLB leader and verified. The HM
// cache won't be checked for such cases.
func isHealthMonitorRefValid(refName string, gdp bool, fullSync bool) bool {
func isHealthMonitorRefValid(refName string, gdp bool, fullSync bool) error {
if fullSync && isHealthMonitorRefPresentInCache(refName) {
gslbutils.Debugf("health monitor %s present in hm cache", refName)
return true
return nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible that the ref is removed from the controller but present in cache ?

}
aviClient := avictrl.SharedAviClients().AviClient[0]
uri := "api/healthmonitor?name=" + refName

result, err := gslbutils.GetUriFromAvi(uri, aviClient, gdp)
if err != nil {
gslbutils.Errf("Error in getting uri %s from Avi: %v", uri, err)
return false
return err
}
if result.Count == 0 {
gslbutils.Errf("Health Monitor %s does not exist", refName)
return false
return gslbutils.GetIngestionErrorForObjectNotFound("healthmonitor " + refName + " doesn't exist")
}
gslbutils.Logf("health monitor %s fetched from controller", refName)
elems := make([]json.RawMessage, result.Count)
err = json.Unmarshal(result.Results, &elems)
if err != nil {
gslbutils.Errf("failed to unmarshal health monitor data for ref %s: %v", refName, err)
return false
return gslbutils.GetIngestionErrorForParsing(fmt.Sprintf("failed to unmarshal health monitor data for ref %s: %v", refName, err))
}
hm := models.HealthMonitor{}
err = json.Unmarshal(elems[0], &hm)
if err != nil {
gslbutils.Errf("failed to unmarshal the first health monitor element: %v", err)
return false
return gslbutils.GetIngestionErrorForParsing(fmt.Sprintf("failed to unmarshal the first health monitor element: %v", err))
}
if hm.IsFederated != nil && *hm.IsFederated {
return true
return nil
} else {
gslbutils.Errf("health monitor ref %s is not federated, can't add", refName)
errStr := fmt.Sprintf("health monitor ref %s is not federated, can't add", refName)
return gslbutils.GetIngestionErrorForObjectNotFederated(errStr)
}
return false
}

func isThirdPartyMemberSitePresent(gslbhr *gslbhralphav1.GSLBHostRule, siteName string) bool {
func isThirdPartyMemberSitePresent(gslbhr *gslbhralphav1.GSLBHostRule, siteName string) error {
// Verify the presence of the third party member sites on the gslb leader
aviClient := avictrl.SharedAviClients().AviClient[0]
uri := "api/gslb"
result, err := gslbutils.GetUriFromAvi(uri, aviClient, false)
if err != nil {
gslbutils.Errf("Error in getting uri %s from Avi: %v", uri, err)
return false
return err
}
elems := make([]json.RawMessage, result.Count)
err = json.Unmarshal(result.Results, &elems)
if err != nil {
gslbutils.Errf("Failed to unmarshal GSLB data, err: %v", err)
return gslbutils.GetIngestionErrorForParsing(fmt.Sprintf("failed to unmarshal GSLB data, err: %v", err))
}
for _, elem := range elems {
gslb := models.Gslb{}
err = json.Unmarshal(elem, &gslb)
if err != nil {
gslbutils.Errf("Failed to unmarshal GSLB data, err: %v", err)
continue
}
tpms := gslb.ThirdPartySites
for _, tpm := range tpms {
if *tpm.Name == siteName {
return true
return nil
}
}
}
gslbutils.Errf("Third Party Member Site %s does not exist", siteName)
return false
return gslbutils.GetIngestionErrorForObjectNotFound(fmt.Sprintf("third party member site %s doesn't exist", siteName))
}

func ValidateGSLBHostRule(gslbhr *gslbhralphav1.GSLBHostRule, fullSync bool) error {
Expand All @@ -265,9 +261,11 @@ func ValidateGSLBHostRule(gslbhr *gslbhralphav1.GSLBHostRule, fullSync bool) err
sitePersistence := gslbhrSpec.SitePersistence
if sitePersistence != nil {
sitePersistenceProfileName := sitePersistence.ProfileRef
if sitePersistence.Enabled && !isSitePersistenceProfilePresent(sitePersistenceProfileName, false, fullSync) {
errmsg = "SitePersistence Profile " + sitePersistenceProfileName + " error for " + gslbhrName + " GSLBHostRule"
return fmt.Errorf(errmsg)
if sitePersistence.Enabled {
err := isSitePersistenceProfilePresent(sitePersistenceProfileName, false, fullSync)
if err != nil {
return err
}
}
}

Expand All @@ -283,17 +281,17 @@ func ValidateGSLBHostRule(gslbhr *gslbhralphav1.GSLBHostRule, fullSync bool) err
errmsg := "Invalid VIP for thirdPartyMember site " + tpmember.Site + "," + gslbhrName + " GSLBHostRule (expecting IP address)"
return fmt.Errorf(errmsg)
}
if !isThirdPartyMemberSitePresent(gslbhr, tpmember.Site) {
errmsg = "ThirdPartyMember site " + tpmember.Site + " does not exist for " + gslbhrName + " GSLBHostRule"
return fmt.Errorf(errmsg)
err := isThirdPartyMemberSitePresent(gslbhr, tpmember.Site)
if err != nil {
return err
}
}

healthMonitorRefs := gslbhrSpec.HealthMonitorRefs
for _, ref := range healthMonitorRefs {
if !isHealthMonitorRefValid(ref, false, fullSync) {
errmsg = "Health Monitor Ref " + ref + " error for " + gslbhrName + " GSLBHostRule"
return fmt.Errorf(errmsg)
err := isHealthMonitorRefValid(ref, false, fullSync)
if err != nil {
return err
}
}
return nil
Expand All @@ -312,6 +310,10 @@ func AddGSLBHostRuleObj(obj interface{}, k8swq []workqueue.RateLimitingInterface
if err != nil {
updateGSLBHR(gslbhr, err.Error(), GslbHostRuleRejected)
gslbutils.Errf("Error in accepting GSLB Host Rule %s : %s", gsFqdn, err.Error())
if gslbutils.IsRetriableOnError(err) {
updateIngestionRetryAddCache(gslbhr)
publishKeyToIngestionRetry(gslbutils.ObjectAdd, gslbutils.GslbHostRuleType, gslbhr.Namespace, gslbhr.Name)
}
return
}

Expand Down Expand Up @@ -386,9 +388,13 @@ func UpdateGSLBHostRuleObj(old, new interface{}, k8swq []workqueue.RateLimitingI
gslbutils.Errf("Error in accepting GSLB Host Rule %s : %s", newGslbhr.ObjectMeta.Name, err.Error())
// check if previous GSLB host rule was accepted, if yes, we need to add a delete key if this new
// object is rejected
if oldGslbhr.Status.Status == GslbHostRuleAccepted {
if !gslbutils.IsControllerError(err) && (oldGslbhr.Status.Status == GslbHostRuleAccepted) {
deleteObjsForGSHostRule(oldGslbhr, k8swq, numWorkers)
}
if gslbutils.IsRetriableOnError(err) {
updateIngestionRetryUpdateCache(oldGslbhr, newGslbhr)
publishKeyToIngestionRetry(gslbutils.ObjectUpdate, gslbutils.GslbHostRuleType, newGslbhr.Namespace, newGslbhr.Name)
}
return
}
gsHostRulesList := gslbutils.GetGSHostRulesList()
Expand Down
Loading