From e610db138005702c9cf617f4f0d63d82fba90269 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1rio=20Bezerra?= Date: Wed, 6 Sep 2023 23:08:35 -0300 Subject: [PATCH] Implement Verify Weight (#37) Signed-off-by: Mario Bezerra --- pkg/mocks/plugin.go | 102 ++++++++++++--- pkg/plugin/plugin.go | 264 +++++++++++++++++++++++++++----------- pkg/plugin/plugin_test.go | 45 ++++--- pkg/utils/utils.go | 13 -- 4 files changed, 305 insertions(+), 119 deletions(-) diff --git a/pkg/mocks/plugin.go b/pkg/mocks/plugin.go index 3c44ae3..5447cc9 100644 --- a/pkg/mocks/plugin.go +++ b/pkg/mocks/plugin.go @@ -3,33 +3,101 @@ package mocks import ( contourv1 "github.com/projectcontour/contour/apis/projectcontour/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" ) const ( Namespace = "default" - StableServiceName = "argo-rollouts-stable-service" - CanaryServiceName = "argo-rollouts-canary-service" - HTTPProxyName = "argo-rollouts-httpproxy" + StableServiceName = "argo-rollouts-stable" + CanaryServiceName = "argo-rollouts-canary" + + HTTPProxyName = "argo-rollouts" + ValidHTTPProxyName = "argo-rollouts-valid" + OutdatedHTTPProxyName = "argo-rollouts-outdated" + InvalidHTTPProxyName = "argo-rollouts-invalid" + FalseConditionHTTPProxyName = "argo-rollouts-false-condition" + + HTTPProxyGeneration = 1 + HTTPProxyDesiredWeight = 20 ) -var HTTPProxyObj = contourv1.HTTPProxy{ - ObjectMeta: metav1.ObjectMeta{ - Name: HTTPProxyName, - Namespace: Namespace, - }, - Spec: contourv1.HTTPProxySpec{ - Routes: []contourv1.Route{ +func MakeObjects() []runtime.Object { + httpProxy := newHTTPProxy(HTTPProxyName) + + validHttpProxy := newHTTPProxy(ValidHTTPProxyName) + + invalidHttpProxy := newHTTPProxy(InvalidHTTPProxyName) + invalidHttpProxy.Status = contourv1.HTTPProxyStatus{ + Conditions: []contourv1.DetailedCondition{ + { + Condition: contourv1.Condition{ + Type: contourv1.ConditionTypeServiceError, + Status: contourv1.ConditionTrue, + ObservedGeneration: HTTPProxyGeneration, + }, + }, + }, + } + + outdatedHttpProxy := newHTTPProxy(OutdatedHTTPProxyName) + outdatedHttpProxy.Generation = HTTPProxyGeneration + 1 + + falseConditionHttpProxy := newHTTPProxy(FalseConditionHTTPProxyName) + falseConditionHttpProxy.Status = contourv1.HTTPProxyStatus{ + Conditions: []contourv1.DetailedCondition{ { - Services: []contourv1.Service{ - { - Name: StableServiceName, - Weight: 100, + Condition: contourv1.Condition{ + Type: contourv1.ValidConditionType, + Status: contourv1.ConditionFalse, + ObservedGeneration: HTTPProxyGeneration, + }, + }, + }, + } + + objs := []runtime.Object{ + httpProxy, + validHttpProxy, + invalidHttpProxy, + outdatedHttpProxy, + falseConditionHttpProxy, + } + return objs +} + +func newHTTPProxy(name string) *contourv1.HTTPProxy { + return &contourv1.HTTPProxy{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: Namespace, + Generation: HTTPProxyGeneration, + }, + Spec: contourv1.HTTPProxySpec{ + Routes: []contourv1.Route{ + { + Services: []contourv1.Service{ + { + Name: StableServiceName, + Weight: 100 - HTTPProxyDesiredWeight, + }, + { + Name: CanaryServiceName, + Weight: HTTPProxyDesiredWeight, + }, }, - { - Name: CanaryServiceName, + }, + }, + }, + Status: contourv1.HTTPProxyStatus{ + Conditions: []contourv1.DetailedCondition{ + { + Condition: contourv1.Condition{ + Type: contourv1.ValidConditionType, + Status: contourv1.ConditionTrue, + ObservedGeneration: HTTPProxyGeneration, }, }, }, }, - }, + } } diff --git a/pkg/plugin/plugin.go b/pkg/plugin/plugin.go index 7a373fe..6764113 100644 --- a/pkg/plugin/plugin.go +++ b/pkg/plugin/plugin.go @@ -3,24 +3,26 @@ package plugin import ( "context" "encoding/json" - "errors" "fmt" "log/slog" - "github.com/argoproj-labs/rollouts-plugin-trafficrouter-contour/pkg/utils" - "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1" + rolloutsPlugin "github.com/argoproj/argo-rollouts/rollout/trafficrouting/plugin/rpc" pluginTypes "github.com/argoproj/argo-rollouts/utils/plugin/types" contourv1 "github.com/projectcontour/contour/apis/projectcontour/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/dynamic" + + "github.com/argoproj-labs/rollouts-plugin-trafficrouter-contour/pkg/utils" ) // Type holds this controller type const Type = "Contour" +var _ rolloutsPlugin.TrafficRouterPlugin = (*RpcPlugin)(nil) + type RpcPlugin struct { IsTest bool dynamicClient dynamic.Interface @@ -33,93 +35,220 @@ type ContourTrafficRouting struct { HTTPProxies []string `json:"httpProxies" protobuf:"bytes,1,name=httpProxies"` } -func (r *RpcPlugin) InitPlugin() (re pluginTypes.RpcError) { - defer func() { - if e := recover(); e != nil { - re.ErrorString = e.(error).Error() - } - }() - +func (r *RpcPlugin) InitPlugin() pluginTypes.RpcError { if r.IsTest { - return + return pluginTypes.RpcError{} } - cfg := utils.Must1(utils.NewKubeConfig()) - r.dynamicClient = utils.Must1(dynamic.NewForConfig(cfg)) + cfg, err := utils.NewKubeConfig() + if err != nil { + return pluginTypes.RpcError{ErrorString: err.Error()} + } + + r.dynamicClient, err = dynamic.NewForConfig(cfg) + if err != nil { + return pluginTypes.RpcError{ErrorString: err.Error()} + } - return + return pluginTypes.RpcError{} } + func (r *RpcPlugin) UpdateHash(rollout *v1alpha1.Rollout, canaryHash, stableHash string, additionalDestinations []v1alpha1.WeightDestination) pluginTypes.RpcError { return pluginTypes.RpcError{} } -func (r *RpcPlugin) SetWeight( - rollout *v1alpha1.Rollout, - desiredWeight int32, - additionalDestinations []v1alpha1.WeightDestination) (re pluginTypes.RpcError) { - defer func() { - if e := recover(); e != nil { - re.ErrorString = e.(error).Error() - } - }() +func (r *RpcPlugin) SetWeight(rollout *v1alpha1.Rollout, desiredWeight int32, additionalDestinations []v1alpha1.WeightDestination) pluginTypes.RpcError { + if err := validateRolloutParameters(rollout); err != nil { + return pluginTypes.RpcError{ErrorString: err.Error()} + } - if rollout == nil || rollout.Spec.Strategy.Canary == nil || - rollout.Spec.Strategy.Canary.StableService == "" || - rollout.Spec.Strategy.Canary.CanaryService == "" { - utils.Must(errors.New("illegal parameter(s)")) + ctr, err := getContourTrafficRouting(rollout) + if err != nil { + return pluginTypes.RpcError{ErrorString: err.Error()} } ctx := context.Background() - ctr := ContourTrafficRouting{} - utils.Must(json.Unmarshal(rollout.Spec.Strategy.Canary.TrafficRouting.Plugins["argoproj-labs/contour"], &ctr)) - for _, proxy := range ctr.HTTPProxies { - slog.Debug("updating proxy", slog.String("proxy", proxy)) + slog.Debug("updating httpproxy", slog.String("name", proxy)) - var httpProxy contourv1.HTTPProxy - unstr := utils.Must1(r.dynamicClient.Resource(contourv1.HTTPProxyGVR).Namespace(rollout.Namespace).Get(ctx, proxy, metav1.GetOptions{})) - utils.Must(runtime.DefaultUnstructuredConverter.FromUnstructured(unstr.UnstructuredContent(), &httpProxy)) + if err := r.updateHTTPProxy(ctx, proxy, rollout, desiredWeight); err != nil { + slog.Error("failed to update httpproxy", slog.String("name", proxy), slog.Any("err", err)) + return pluginTypes.RpcError{ErrorString: err.Error()} + } - canarySvcName := rollout.Spec.Strategy.Canary.CanaryService - stableSvcName := rollout.Spec.Strategy.Canary.StableService + slog.Info("successfully updated httpproxy", slog.String("name", proxy)) + } - slog.Debug("the services name", slog.String("stable", stableSvcName), slog.String("canary", canarySvcName)) + return pluginTypes.RpcError{} +} - // TODO: filter by condition(s) - svcMap := getServiceMap(&httpProxy) - canarySvc := utils.Must1(getService(canarySvcName, svcMap)) - stableSvc := utils.Must1(getService(stableSvcName, svcMap)) +func (r *RpcPlugin) SetHeaderRoute(rollout *v1alpha1.Rollout, headerRouting *v1alpha1.SetHeaderRoute) pluginTypes.RpcError { + return pluginTypes.RpcError{} +} - slog.Debug("old weight", slog.Int64("canary", canarySvc.Weight), slog.Int64("stable", stableSvc.Weight)) +func (r *RpcPlugin) SetMirrorRoute(rollout *v1alpha1.Rollout, setMirrorRoute *v1alpha1.SetMirrorRoute) pluginTypes.RpcError { + return pluginTypes.RpcError{} +} - canarySvc.Weight = int64(desiredWeight) - stableSvc.Weight = 100 - canarySvc.Weight +func (r *RpcPlugin) VerifyWeight(rollout *v1alpha1.Rollout, desiredWeight int32, additionalDestinations []v1alpha1.WeightDestination) (pluginTypes.RpcVerified, pluginTypes.RpcError) { + if err := validateRolloutParameters(rollout); err != nil { + return pluginTypes.NotVerified, pluginTypes.RpcError{ErrorString: err.Error()} + } - slog.Debug("new weight", slog.Int64("canary", canarySvc.Weight), slog.Int64("stable", stableSvc.Weight)) + ctr, err := getContourTrafficRouting(rollout) + if err != nil { + return pluginTypes.NotVerified, pluginTypes.RpcError{ErrorString: err.Error()} + } + + ctx := context.Background() - m := utils.Must1(runtime.DefaultUnstructuredConverter.ToUnstructured(&httpProxy)) - updated, err := r.dynamicClient.Resource(contourv1.HTTPProxyGVR).Namespace(rollout.Namespace).Update(ctx, &unstructured.Unstructured{Object: m}, metav1.UpdateOptions{}) + for _, proxy := range ctr.HTTPProxies { + slog.Debug("verifying httpproxy", slog.String("name", proxy)) + + verified, err := r.verifyHTTPProxy(ctx, proxy, rollout, desiredWeight) if err != nil { - slog.Error("update the HTTPProxy is failed", slog.String("name", httpProxy.Name), slog.Any("err", err)) - utils.Must(err) + slog.Error("failed to verify httpproxy", slog.String("name", proxy), slog.Any("err", err)) + return pluginTypes.NotVerified, pluginTypes.RpcError{ErrorString: err.Error()} + } + if !verified { + return pluginTypes.NotVerified, pluginTypes.RpcError{} } - if r.IsTest { - proxy := contourv1.HTTPProxy{} - utils.Must(runtime.DefaultUnstructuredConverter.FromUnstructured(updated.UnstructuredContent(), &proxy)) - r.UpdatedMockHTTPProxy = &proxy + slog.Info("successfully verified httpproxy", slog.String("name", proxy)) + } + + return pluginTypes.Verified, pluginTypes.RpcError{} +} + +func (r *RpcPlugin) RemoveManagedRoutes(rollout *v1alpha1.Rollout) pluginTypes.RpcError { + return pluginTypes.RpcError{} +} + +func (r *RpcPlugin) Type() string { + return Type +} + +func (r *RpcPlugin) getHTTPProxy(ctx context.Context, namespace string, name string) (*contourv1.HTTPProxy, error) { + unstr, err := r.dynamicClient.Resource(contourv1.HTTPProxyGVR).Namespace(namespace).Get(ctx, name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + + var httpProxy contourv1.HTTPProxy + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstr.UnstructuredContent(), &httpProxy); err != nil { + return nil, err + } + return &httpProxy, nil +} + +func (r *RpcPlugin) updateHTTPProxy(ctx context.Context, httpProxyName string, rollout *v1alpha1.Rollout, desiredWeight int32) error { + httpProxy, err := r.getHTTPProxy(ctx, rollout.Namespace, httpProxyName) + if err != nil { + return err + } + + canarySvc, stableSvc, err := getCanaryAndStableServices(httpProxy, rollout) + if err != nil { + return err + } + + slog.Debug("old weight", slog.Int64("canary", canarySvc.Weight), slog.Int64("stable", stableSvc.Weight)) + + canarySvc.Weight = int64(desiredWeight) + stableSvc.Weight = 100 - canarySvc.Weight + + slog.Debug("new weight", slog.Int64("canary", canarySvc.Weight), slog.Int64("stable", stableSvc.Weight)) + + m, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&httpProxy) + if err != nil { + return err + } + updated, err := r.dynamicClient.Resource(contourv1.HTTPProxyGVR).Namespace(rollout.Namespace).Update(ctx, &unstructured.Unstructured{Object: m}, metav1.UpdateOptions{}) + if err != nil { + return err + } + + if r.IsTest { + var proxy contourv1.HTTPProxy + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(updated.UnstructuredContent(), &proxy); err != nil { + return err } + r.UpdatedMockHTTPProxy = &proxy + } + + return nil +} + +func (r *RpcPlugin) verifyHTTPProxy(ctx context.Context, httpProxyName string, rollout *v1alpha1.Rollout, desiredWeight int32) (bool, error) { + httpProxy, err := r.getHTTPProxy(ctx, rollout.Namespace, httpProxyName) + if err != nil { + return false, err + } + + validCondition := httpProxy.Status.GetConditionFor(contourv1.ValidConditionType) + if validCondition == nil { + slog.Debug("unable to find valid status condition", slog.String("name", httpProxyName)) + return false, nil + } + if validCondition.Status != metav1.ConditionTrue { + slog.Debug(fmt.Sprintf("condition status is not %s", metav1.ConditionTrue), slog.String("name", httpProxyName)) + return false, nil + } + if validCondition.ObservedGeneration != httpProxy.Generation { + slog.Debug("condition is out of date with respect to the current state of the instance", slog.String("name", httpProxyName)) + return false, nil + } + + canarySvc, stableSvc, err := getCanaryAndStableServices(httpProxy, rollout) + if err != nil { + return false, err + } + + canarySvcDesiredWeight := int64(desiredWeight) + stableSvcDesiredWeight := 100 - canarySvcDesiredWeight + if canarySvc.Weight != canarySvcDesiredWeight || stableSvc.Weight != stableSvcDesiredWeight { + slog.Debug(fmt.Sprintf("expected weights are canary=%d and stable=%d, but got canary=%d and stable=%d", canarySvcDesiredWeight, stableSvcDesiredWeight, canarySvc.Weight, stableSvc.Weight), slog.String("name", httpProxyName)) + return false, nil + } + + return true, nil +} + +func getCanaryAndStableServices(httpProxy *contourv1.HTTPProxy, rollout *v1alpha1.Rollout) (*contourv1.Service, *contourv1.Service, error) { + canarySvcName := rollout.Spec.Strategy.Canary.CanaryService + stableSvcName := rollout.Spec.Strategy.Canary.StableService + + slog.Debug("the services name", slog.String("stable", stableSvcName), slog.String("canary", canarySvcName)) - slog.Info("successfully updated HTTPProxy", slog.String("httpproxy", proxy)) + // TODO: filter by condition(s) + svcMap := getServiceMap(httpProxy) + + canarySvc, err := getService(canarySvcName, svcMap) + if err != nil { + return nil, nil, err + } + + stableSvc, err := getService(stableSvcName, svcMap) + if err != nil { + return nil, nil, err + } + + return canarySvc, stableSvc, nil +} + +func getContourTrafficRouting(rollout *v1alpha1.Rollout) (*ContourTrafficRouting, error) { + var ctr ContourTrafficRouting + if err := json.Unmarshal(rollout.Spec.Strategy.Canary.TrafficRouting.Plugins["argoproj-labs/contour"], &ctr); err != nil { + return nil, err } - return + return &ctr, nil } func getService(name string, svcMap map[string]*contourv1.Service) (*contourv1.Service, error) { svc, ok := svcMap[name] if !ok { - return nil, fmt.Errorf("the service: %s is not found in HTTPProxy", name) + return nil, fmt.Errorf("the service: %s is not found in httpproxy", name) } return svc, nil } @@ -135,22 +264,9 @@ func getServiceMap(httpProxy *contourv1.HTTPProxy) map[string]*contourv1.Service return svcMap } -func (r *RpcPlugin) SetHeaderRoute(rollout *v1alpha1.Rollout, headerRouting *v1alpha1.SetHeaderRoute) pluginTypes.RpcError { - return pluginTypes.RpcError{} -} - -func (r *RpcPlugin) SetMirrorRoute(rollout *v1alpha1.Rollout, setMirrorRoute *v1alpha1.SetMirrorRoute) pluginTypes.RpcError { - return pluginTypes.RpcError{} -} - -func (r *RpcPlugin) VerifyWeight(rollout *v1alpha1.Rollout, desiredWeight int32, additionalDestinations []v1alpha1.WeightDestination) (pluginTypes.RpcVerified, pluginTypes.RpcError) { - return pluginTypes.Verified, pluginTypes.RpcError{} -} - -func (r *RpcPlugin) RemoveManagedRoutes(rollout *v1alpha1.Rollout) pluginTypes.RpcError { - return pluginTypes.RpcError{} -} - -func (r *RpcPlugin) Type() string { - return Type +func validateRolloutParameters(rollout *v1alpha1.Rollout) error { + if rollout == nil || rollout.Spec.Strategy.Canary == nil || rollout.Spec.Strategy.Canary.StableService == "" || rollout.Spec.Strategy.Canary.CanaryService == "" { + return fmt.Errorf("illegal parameter(s)") + } + return nil } diff --git a/pkg/plugin/plugin_test.go b/pkg/plugin/plugin_test.go index b9ec017..27d60f9 100644 --- a/pkg/plugin/plugin_test.go +++ b/pkg/plugin/plugin_test.go @@ -8,16 +8,17 @@ import ( "testing" "time" - "github.com/argoproj-labs/rollouts-plugin-trafficrouter-contour/pkg/mocks" - "github.com/argoproj-labs/rollouts-plugin-trafficrouter-contour/pkg/utils" - "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1" rolloutsPlugin "github.com/argoproj/argo-rollouts/rollout/trafficrouting/plugin/rpc" + "github.com/argoproj/argo-rollouts/utils/plugin/types" goPlugin "github.com/hashicorp/go-plugin" contourv1 "github.com/projectcontour/contour/apis/projectcontour/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - runtime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime" fakeDynClient "k8s.io/client-go/dynamic/fake" + + "github.com/argoproj-labs/rollouts-plugin-trafficrouter-contour/pkg/mocks" + "github.com/argoproj-labs/rollouts-plugin-trafficrouter-contour/pkg/utils" ) var testHandshake = goPlugin.HandshakeConfig{ @@ -38,7 +39,7 @@ func TestRunSuccessfully(t *testing.T) { } _ = b.AddToScheme(s) - dynClient := fakeDynClient.NewSimpleDynamicClient(s, &mocks.HTTPProxyObj) + dynClient := fakeDynClient.NewSimpleDynamicClient(s, mocks.MakeObjects()...) rpcPluginImp := &RpcPlugin{ IsTest: true, dynamicClient: dynClient, @@ -102,21 +103,15 @@ func TestRunSuccessfully(t *testing.T) { } pluginInstance := raw.(*rolloutsPlugin.TrafficRouterPluginRPC) - err = pluginInstance.InitPlugin() - if err.Error() != "" { + if err := pluginInstance.InitPlugin(); err.HasError() { t.Fail() } - t.Run("SetWeight", func(t *testing.T) { - var desiredWeight int32 = 30 + t.Run("SetWeight", func(t *testing.T) { rollout := newRollout(mocks.StableServiceName, mocks.CanaryServiceName, mocks.HTTPProxyName) + desiredWeight := int32(30) - re := pluginInstance.SetWeight( - rollout, - desiredWeight, - []v1alpha1.WeightDestination{}) - - if re.HasError() { + if err := pluginInstance.SetWeight(rollout, desiredWeight, []v1alpha1.WeightDestination{}); err.HasError() { t.Fail() } @@ -130,6 +125,26 @@ func TestRunSuccessfully(t *testing.T) { } }) + t.Run("VerifyWeight", func(t *testing.T) { + verifyWeight := func(httpProxyName string, desiredWeight int32, expected types.RpcVerified) { + rollout := newRollout(mocks.StableServiceName, mocks.CanaryServiceName, httpProxyName) + + actual, err := pluginInstance.VerifyWeight(rollout, desiredWeight, []v1alpha1.WeightDestination{}) + if err.HasError() { + t.Fail() + } + if actual != expected { + t.Fail() + } + } + + verifyWeight(mocks.ValidHTTPProxyName, mocks.HTTPProxyDesiredWeight, types.Verified) + verifyWeight(mocks.ValidHTTPProxyName, mocks.HTTPProxyDesiredWeight+10, types.NotVerified) + verifyWeight(mocks.InvalidHTTPProxyName, mocks.HTTPProxyDesiredWeight, types.NotVerified) + verifyWeight(mocks.OutdatedHTTPProxyName, mocks.HTTPProxyDesiredWeight, types.NotVerified) + verifyWeight(mocks.FalseConditionHTTPProxyName, mocks.HTTPProxyDesiredWeight, types.NotVerified) + }) + // Canceling should cause an exit cancel() <-closeCh diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 01d028e..c79694d 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -23,19 +23,6 @@ func NewKubeConfig() (*rest.Config, error) { return config, nil } -func Must(err error) { - if err != nil { - panic(err) - } -} - -func Must1[T any](t T, err error) T { - if err != nil { - panic(err) - } - return t -} - func InitLogger(lvl slog.Level) { lvlVar := &slog.LevelVar{} lvlVar.Set(lvl)