diff --git a/pkg/webhook/request.go b/pkg/webhook/request.go index 714e52ff2..d36177335 100644 --- a/pkg/webhook/request.go +++ b/pkg/webhook/request.go @@ -22,7 +22,7 @@ import ( "net/http" "reflect" - admissionv1beta1 "k8s.io/api/admission/v1beta1" + admissionv1 "k8s.io/api/admission/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/banzaicloud/koperator/pkg/util" @@ -34,7 +34,7 @@ var ( kafkaTopic = reflect.TypeOf(v1alpha1.KafkaTopic{}).Name() ) -func (s *webhookServer) validate(ar *admissionv1beta1.AdmissionReview) *admissionv1beta1.AdmissionResponse { +func (s *webhookServer) validate(ar *admissionv1.AdmissionReview) *admissionv1.AdmissionResponse { req := ar.Request l := log.WithValues("kind", req.Kind, "namespace", req.Namespace, "name", req.Name, "uid", req.UID, @@ -50,7 +50,7 @@ func (s *webhookServer) validate(ar *admissionv1beta1.AdmissionReview) *admissio } if ok := util.ObjectManagedByClusterRegistry(topic.GetObjectMeta()); ok { l.Info("Skip validation as the resource is managed by Cluster Registry") - return &admissionv1beta1.AdmissionResponse{ + return &admissionv1.AdmissionResponse{ Allowed: true, } } @@ -85,8 +85,8 @@ func (s *webhookServer) serve(w http.ResponseWriter, r *http.Request) { return } - var admissionResponse *admissionv1beta1.AdmissionResponse - ar := admissionv1beta1.AdmissionReview{} + var admissionResponse *admissionv1.AdmissionResponse + ar := admissionv1.AdmissionReview{} if _, _, err := s.deserializer.Decode(body, nil, &ar); err != nil { log.Error(err, "Can't decode body") admissionResponse = notAllowed(err.Error(), metav1.StatusReasonBadRequest) @@ -94,7 +94,13 @@ func (s *webhookServer) serve(w http.ResponseWriter, r *http.Request) { admissionResponse = s.validate(&ar) } - admissionReview := admissionv1beta1.AdmissionReview{} + admissionReview := admissionv1.AdmissionReview{ + // APIVersion and Kind must be set for admission/v1, or the request would fail + TypeMeta: metav1.TypeMeta{ + APIVersion: admissionv1.SchemeGroupVersion.String(), + Kind: "AdmissionReview", + }, + } if admissionResponse != nil { admissionReview.Response = admissionResponse if ar.Request != nil { @@ -113,8 +119,8 @@ func (s *webhookServer) serve(w http.ResponseWriter, r *http.Request) { } } -func notAllowed(msg string, reason metav1.StatusReason) *admissionv1beta1.AdmissionResponse { - return &admissionv1beta1.AdmissionResponse{ +func notAllowed(msg string, reason metav1.StatusReason) *admissionv1.AdmissionResponse { + return &admissionv1.AdmissionResponse{ Result: &metav1.Status{ Message: msg, Reason: reason, diff --git a/pkg/webhook/request_test.go b/pkg/webhook/request_test.go index bcb8cfc78..437e43b51 100644 --- a/pkg/webhook/request_test.go +++ b/pkg/webhook/request_test.go @@ -24,7 +24,7 @@ import ( "strings" "testing" - admissionv1beta1 "k8s.io/api/admission/v1beta1" + admissionv1 "k8s.io/api/admission/v1" authv1 "k8s.io/api/authentication/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -41,9 +41,9 @@ func newRawTopic() []byte { return out } -func newAdmissionReview() *admissionv1beta1.AdmissionReview { - return &admissionv1beta1.AdmissionReview{ - Request: &admissionv1beta1.AdmissionRequest{ +func newAdmissionReview() *admissionv1.AdmissionReview { + return &admissionv1.AdmissionReview{ + Request: &admissionv1.AdmissionRequest{ Kind: metav1.GroupVersionKind{ Kind: "non-topic-kind", }, @@ -170,7 +170,7 @@ func TestServe(t *testing.T) { if err != nil { t.Error("Expected admission review response, got error") } - admissionReview := admissionv1beta1.AdmissionReview{} + admissionReview := admissionv1.AdmissionReview{} if err := json.Unmarshal(body, &admissionReview); err != nil { t.Error("Expected no error got:", err) } @@ -196,7 +196,7 @@ func TestServe(t *testing.T) { if err != nil { t.Error("Expected admission review response, got error") } - admissionReview := admissionv1beta1.AdmissionReview{} + admissionReview := admissionv1.AdmissionReview{} if err := json.Unmarshal(body, &admissionReview); err != nil { t.Error("Expected no error got:", err) } diff --git a/pkg/webhook/topic_validator.go b/pkg/webhook/topic_validator.go index ade73e634..54b22b3e8 100644 --- a/pkg/webhook/topic_validator.go +++ b/pkg/webhook/topic_validator.go @@ -20,7 +20,7 @@ import ( "github.com/banzaicloud/koperator/pkg/util" - admissionv1beta1 "k8s.io/api/admission/v1beta1" + admissionv1 "k8s.io/api/admission/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -36,7 +36,7 @@ const ( invalidReplicationFactorErrMsg = "Replication factor is larger than the number of nodes in the kafka cluster" ) -func (s *webhookServer) validateKafkaTopic(topic *banzaicloudv1alpha1.KafkaTopic) *admissionv1beta1.AdmissionResponse { +func (s *webhookServer) validateKafkaTopic(topic *banzaicloudv1alpha1.KafkaTopic) *admissionv1.AdmissionResponse { ctx := context.Background() log.Info(fmt.Sprintf("Doing pre-admission validation of kafka topic %s", topic.Spec.Name)) @@ -54,25 +54,21 @@ func (s *webhookServer) validateKafkaTopic(topic *banzaicloudv1alpha1.KafkaTopic if apierrors.IsNotFound(err) { if k8sutil.IsMarkedForDeletion(topic.ObjectMeta) { log.Info("Deleted as a result of a cluster deletion") - return &admissionv1beta1.AdmissionResponse{ + return &admissionv1.AdmissionResponse{ Allowed: true, } } log.Error(err, "Referenced kafka cluster does not exist") - return notAllowed( - fmt.Sprintf("KafkaCluster '%s' in the namespace '%s' does not exist", topic.Spec.ClusterRef.Name, topic.Spec.ClusterRef.Namespace), - metav1.StatusReasonNotFound, - ) + return notAllowed(fmt.Sprintf("KafkaCluster '%s' in the namespace '%s' does not exist", topic.Spec.ClusterRef.Name, topic.Spec.ClusterRef.Namespace), metav1.StatusReasonNotFound) } log.Error(err, "API failure while running topic validation") return notAllowed("API failure while validating topic, please try again", metav1.StatusReasonServiceUnavailable) } - if k8sutil.IsMarkedForDeletion(cluster.ObjectMeta) { // Let this through, it's a delete topic request from a parent cluster being // deleted log.Info("Cluster is going down for deletion, assuming a delete topic request") - return &admissionv1beta1.AdmissionResponse{ + return &admissionv1.AdmissionResponse{ Allowed: true, } } @@ -96,7 +92,7 @@ func (s *webhookServer) validateKafkaTopic(topic *banzaicloudv1alpha1.KafkaTopic } // everything looks a-okay - return &admissionv1beta1.AdmissionResponse{ + return &admissionv1.AdmissionResponse{ Allowed: true, } } @@ -104,7 +100,7 @@ func (s *webhookServer) validateKafkaTopic(topic *banzaicloudv1alpha1.KafkaTopic // checkKafka creates a Kafka admin client and connects to the Kafka brokers to check // whether the referred topic exists, and what are its properties func (s *webhookServer) checkKafka(ctx context.Context, topic *banzaicloudv1alpha1.KafkaTopic, - cluster *banzaicloudv1beta1.KafkaCluster) *admissionv1beta1.AdmissionResponse { + cluster *banzaicloudv1beta1.KafkaCluster) *admissionv1.AdmissionResponse { // retrieve an admin client for the cluster broker, closeClient, err := s.newKafkaFromCluster(s.client, cluster) if err != nil { @@ -128,10 +124,7 @@ func (s *webhookServer) checkKafka(ctx context.Context, topic *banzaicloudv1alph if apierrors.IsNotFound(err) { // User is trying to overwrite an existing topic - bad user log.Info("User attempted to create topic with name that already exists in the kafka cluster") - return notAllowed( - fmt.Sprintf("Topic '%s' already exists on kafka cluster '%s'", topic.Spec.Name, topic.Spec.ClusterRef.Name), - metav1.StatusReasonAlreadyExists, - ) + return notAllowed(fmt.Sprintf("Topic '%s' already exists on kafka cluster '%s'", topic.Spec.Name, topic.Spec.ClusterRef.Name), metav1.StatusReasonAlreadyExists) } log.Error(err, "API failure while running topic validation") return notAllowed("API failure while validating topic, please try again", metav1.StatusReasonServiceUnavailable) @@ -160,7 +153,7 @@ func (s *webhookServer) checkKafka(ctx context.Context, topic *banzaicloudv1alph // checkExistingKafkaTopicCRs checks whether there's any other duplicate KafkaTopic CR exists // that refers to the same KafkaCluster's same topic func (s *webhookServer) checkExistingKafkaTopicCRs(ctx context.Context, - clusterNamespace string, topic *banzaicloudv1alpha1.KafkaTopic) *admissionv1beta1.AdmissionResponse { + clusterNamespace string, topic *banzaicloudv1alpha1.KafkaTopic) *admissionv1.AdmissionResponse { // check KafkaTopic in the referred KafkaCluster's namespace kafkaTopicList := banzaicloudv1alpha1.KafkaTopicList{} err := s.client.List(ctx, &kafkaTopicList, client.MatchingFields{"spec.name": topic.Spec.Name})