Skip to content

Commit

Permalink
Fix Kafka topic creation bug due to recent upgrade of AdmissionReview…
Browse files Browse the repository at this point in the history
… version (#796)

* Update admission pkg name; add API version and Kind to HTTP request

* Fix lint error

* Add deleted imports by

* Specify Kind and APIVersion when initializing admissionReview

* Make lint-fix
  • Loading branch information
panyuenlau committed Apr 19, 2022
1 parent 6cdcb6a commit cf1cdc4
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 30 deletions.
22 changes: 14 additions & 8 deletions pkg/webhook/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"net/http"
"reflect"

admissionv1beta1 "k8s.io/api/admission/v1beta1"
admissionv1 "k8s.io/api/admission/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/banzaicloud/koperator/pkg/util"
Expand All @@ -34,7 +34,7 @@ var (
kafkaTopic = reflect.TypeOf(v1alpha1.KafkaTopic{}).Name()
)

func (s *webhookServer) validate(ar *admissionv1beta1.AdmissionReview) *admissionv1beta1.AdmissionResponse {
func (s *webhookServer) validate(ar *admissionv1.AdmissionReview) *admissionv1.AdmissionResponse {
req := ar.Request

l := log.WithValues("kind", req.Kind, "namespace", req.Namespace, "name", req.Name, "uid", req.UID,
Expand All @@ -50,7 +50,7 @@ func (s *webhookServer) validate(ar *admissionv1beta1.AdmissionReview) *admissio
}
if ok := util.ObjectManagedByClusterRegistry(topic.GetObjectMeta()); ok {
l.Info("Skip validation as the resource is managed by Cluster Registry")
return &admissionv1beta1.AdmissionResponse{
return &admissionv1.AdmissionResponse{
Allowed: true,
}
}
Expand Down Expand Up @@ -85,16 +85,22 @@ func (s *webhookServer) serve(w http.ResponseWriter, r *http.Request) {
return
}

var admissionResponse *admissionv1beta1.AdmissionResponse
ar := admissionv1beta1.AdmissionReview{}
var admissionResponse *admissionv1.AdmissionResponse
ar := admissionv1.AdmissionReview{}
if _, _, err := s.deserializer.Decode(body, nil, &ar); err != nil {
log.Error(err, "Can't decode body")
admissionResponse = notAllowed(err.Error(), metav1.StatusReasonBadRequest)
} else {
admissionResponse = s.validate(&ar)
}

admissionReview := admissionv1beta1.AdmissionReview{}
admissionReview := admissionv1.AdmissionReview{
// APIVersion and Kind must be set for admission/v1, or the request would fail
TypeMeta: metav1.TypeMeta{
APIVersion: admissionv1.SchemeGroupVersion.String(),
Kind: "AdmissionReview",
},
}
if admissionResponse != nil {
admissionReview.Response = admissionResponse
if ar.Request != nil {
Expand All @@ -113,8 +119,8 @@ func (s *webhookServer) serve(w http.ResponseWriter, r *http.Request) {
}
}

func notAllowed(msg string, reason metav1.StatusReason) *admissionv1beta1.AdmissionResponse {
return &admissionv1beta1.AdmissionResponse{
func notAllowed(msg string, reason metav1.StatusReason) *admissionv1.AdmissionResponse {
return &admissionv1.AdmissionResponse{
Result: &metav1.Status{
Message: msg,
Reason: reason,
Expand Down
12 changes: 6 additions & 6 deletions pkg/webhook/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"strings"
"testing"

admissionv1beta1 "k8s.io/api/admission/v1beta1"
admissionv1 "k8s.io/api/admission/v1"
authv1 "k8s.io/api/authentication/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

Expand All @@ -41,9 +41,9 @@ func newRawTopic() []byte {
return out
}

func newAdmissionReview() *admissionv1beta1.AdmissionReview {
return &admissionv1beta1.AdmissionReview{
Request: &admissionv1beta1.AdmissionRequest{
func newAdmissionReview() *admissionv1.AdmissionReview {
return &admissionv1.AdmissionReview{
Request: &admissionv1.AdmissionRequest{
Kind: metav1.GroupVersionKind{
Kind: "non-topic-kind",
},
Expand Down Expand Up @@ -170,7 +170,7 @@ func TestServe(t *testing.T) {
if err != nil {
t.Error("Expected admission review response, got error")
}
admissionReview := admissionv1beta1.AdmissionReview{}
admissionReview := admissionv1.AdmissionReview{}
if err := json.Unmarshal(body, &admissionReview); err != nil {
t.Error("Expected no error got:", err)
}
Expand All @@ -196,7 +196,7 @@ func TestServe(t *testing.T) {
if err != nil {
t.Error("Expected admission review response, got error")
}
admissionReview := admissionv1beta1.AdmissionReview{}
admissionReview := admissionv1.AdmissionReview{}
if err := json.Unmarshal(body, &admissionReview); err != nil {
t.Error("Expected no error got:", err)
}
Expand Down
25 changes: 9 additions & 16 deletions pkg/webhook/topic_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (

"github.com/banzaicloud/koperator/pkg/util"

admissionv1beta1 "k8s.io/api/admission/v1beta1"
admissionv1 "k8s.io/api/admission/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -36,7 +36,7 @@ const (
invalidReplicationFactorErrMsg = "Replication factor is larger than the number of nodes in the kafka cluster"
)

func (s *webhookServer) validateKafkaTopic(topic *banzaicloudv1alpha1.KafkaTopic) *admissionv1beta1.AdmissionResponse {
func (s *webhookServer) validateKafkaTopic(topic *banzaicloudv1alpha1.KafkaTopic) *admissionv1.AdmissionResponse {
ctx := context.Background()
log.Info(fmt.Sprintf("Doing pre-admission validation of kafka topic %s", topic.Spec.Name))

Expand All @@ -54,25 +54,21 @@ func (s *webhookServer) validateKafkaTopic(topic *banzaicloudv1alpha1.KafkaTopic
if apierrors.IsNotFound(err) {
if k8sutil.IsMarkedForDeletion(topic.ObjectMeta) {
log.Info("Deleted as a result of a cluster deletion")
return &admissionv1beta1.AdmissionResponse{
return &admissionv1.AdmissionResponse{
Allowed: true,
}
}
log.Error(err, "Referenced kafka cluster does not exist")
return notAllowed(
fmt.Sprintf("KafkaCluster '%s' in the namespace '%s' does not exist", topic.Spec.ClusterRef.Name, topic.Spec.ClusterRef.Namespace),
metav1.StatusReasonNotFound,
)
return notAllowed(fmt.Sprintf("KafkaCluster '%s' in the namespace '%s' does not exist", topic.Spec.ClusterRef.Name, topic.Spec.ClusterRef.Namespace), metav1.StatusReasonNotFound)
}
log.Error(err, "API failure while running topic validation")
return notAllowed("API failure while validating topic, please try again", metav1.StatusReasonServiceUnavailable)
}

if k8sutil.IsMarkedForDeletion(cluster.ObjectMeta) {
// Let this through, it's a delete topic request from a parent cluster being
// deleted
log.Info("Cluster is going down for deletion, assuming a delete topic request")
return &admissionv1beta1.AdmissionResponse{
return &admissionv1.AdmissionResponse{
Allowed: true,
}
}
Expand All @@ -96,15 +92,15 @@ func (s *webhookServer) validateKafkaTopic(topic *banzaicloudv1alpha1.KafkaTopic
}

// everything looks a-okay
return &admissionv1beta1.AdmissionResponse{
return &admissionv1.AdmissionResponse{
Allowed: true,
}
}

// checkKafka creates a Kafka admin client and connects to the Kafka brokers to check
// whether the referred topic exists, and what are its properties
func (s *webhookServer) checkKafka(ctx context.Context, topic *banzaicloudv1alpha1.KafkaTopic,
cluster *banzaicloudv1beta1.KafkaCluster) *admissionv1beta1.AdmissionResponse {
cluster *banzaicloudv1beta1.KafkaCluster) *admissionv1.AdmissionResponse {
// retrieve an admin client for the cluster
broker, closeClient, err := s.newKafkaFromCluster(s.client, cluster)
if err != nil {
Expand All @@ -128,10 +124,7 @@ func (s *webhookServer) checkKafka(ctx context.Context, topic *banzaicloudv1alph
if apierrors.IsNotFound(err) {
// User is trying to overwrite an existing topic - bad user
log.Info("User attempted to create topic with name that already exists in the kafka cluster")
return notAllowed(
fmt.Sprintf("Topic '%s' already exists on kafka cluster '%s'", topic.Spec.Name, topic.Spec.ClusterRef.Name),
metav1.StatusReasonAlreadyExists,
)
return notAllowed(fmt.Sprintf("Topic '%s' already exists on kafka cluster '%s'", topic.Spec.Name, topic.Spec.ClusterRef.Name), metav1.StatusReasonAlreadyExists)
}
log.Error(err, "API failure while running topic validation")
return notAllowed("API failure while validating topic, please try again", metav1.StatusReasonServiceUnavailable)
Expand Down Expand Up @@ -160,7 +153,7 @@ func (s *webhookServer) checkKafka(ctx context.Context, topic *banzaicloudv1alph
// checkExistingKafkaTopicCRs checks whether there's any other duplicate KafkaTopic CR exists
// that refers to the same KafkaCluster's same topic
func (s *webhookServer) checkExistingKafkaTopicCRs(ctx context.Context,
clusterNamespace string, topic *banzaicloudv1alpha1.KafkaTopic) *admissionv1beta1.AdmissionResponse {
clusterNamespace string, topic *banzaicloudv1alpha1.KafkaTopic) *admissionv1.AdmissionResponse {
// check KafkaTopic in the referred KafkaCluster's namespace
kafkaTopicList := banzaicloudv1alpha1.KafkaTopicList{}
err := s.client.List(ctx, &kafkaTopicList, client.MatchingFields{"spec.name": topic.Spec.Name})
Expand Down

0 comments on commit cf1cdc4

Please sign in to comment.