Skip to content

Commit

Permalink
k8s: Fix topic name overwrite
Browse files Browse the repository at this point in the history
The GetName function is defined in ObjectMetadata. When this function is
overwritten then wrong name is used and client Get is not working correctly.
  • Loading branch information
RafalKorepta committed Oct 3, 2023
1 parent 1d70212 commit e90ac4c
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func init() {
SchemeBuilder.Register(&Topic{}, &TopicList{})
}

func (t *Topic) GetName() string {
func (t *Topic) GetTopicName() string {
topicName := t.Name
if t.Spec.OverwriteTopicName != nil && *t.Spec.OverwriteTopicName != "" {
topicName = *t.Spec.OverwriteTopicName
Expand Down
62 changes: 31 additions & 31 deletions src/go/k8s/controllers/cluster.redpanda.com/topic_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (r *TopicReconciler) reconcile(ctx context.Context, topic *v1alpha1.Topic,

// Examine if the object is under deletion
if !topic.ObjectMeta.DeletionTimestamp.IsZero() {
l.V(DebugLevel).Info("delete topic", "topic-name", topic.GetName())
l.V(DebugLevel).Info("delete topic", "topic-name", topic.GetTopicName())
err = r.deleteTopic(ctx, topic, kafkaClient)
if err != nil {
return v1alpha1.TopicFailed(topic), ctrl.Result{}, fmt.Errorf("unable to delete topic: %w", err)
Expand All @@ -180,7 +180,7 @@ func (r *TopicReconciler) reconcile(ctx context.Context, topic *v1alpha1.Topic,
return v1alpha1.TopicFailed(topic), ctrl.Result{}, err
}
l.V(DebugLevel).Info("topic created",
"topic-name", topic.GetName(),
"topic-name", topic.GetTopicName(),
"topic-configuration", topic.Spec.AdditionalConfig,
"topic-partition", partition,
"topic-replication-factor", replicationFactor)
Expand Down Expand Up @@ -279,24 +279,24 @@ func convertUnknownTags(tags kmsg.Tags) map[string]string {
func (r *TopicReconciler) reconcilePartition(ctx context.Context, topic *v1alpha1.Topic, cl *kgo.Client, partition int) (int16, error) {
reqMetadata := kmsg.NewPtrMetadataRequest()
reqTopic := kmsg.NewMetadataRequestTopic()
reqTopic.Topic = kmsg.StringPtr(topic.GetName())
reqTopic.Topic = kmsg.StringPtr(topic.GetTopicName())
reqMetadata.Topics = append(reqMetadata.Topics, reqTopic)

respMetadata, err := reqMetadata.RequestWith(ctx, cl)
if err != nil {
return 0, r.recordErrorEvent(err, topic, v1alpha1.EventTopicConfigurationDescribeFailure, "failed topic (%s) metadata retrieval library error", topic.GetName())
return 0, r.recordErrorEvent(err, topic, v1alpha1.EventTopicConfigurationDescribeFailure, "failed topic (%s) metadata retrieval library error", topic.GetTopicName())
}

if len(respMetadata.Topics) == 0 {
return 0, r.recordErrorEvent(ErrEmptyMetadataTopic, topic, v1alpha1.EventTopicConfigurationDescribeFailure, "metadata topic (%s) request return empty response", topic.GetName())
return 0, r.recordErrorEvent(ErrEmptyMetadataTopic, topic, v1alpha1.EventTopicConfigurationDescribeFailure, "metadata topic (%s) request return empty response", topic.GetTopicName())
}

if err = kerr.ErrorForCode(respMetadata.Topics[0].ErrorCode); err != nil {
return 0, r.recordErrorEvent(err, topic, v1alpha1.EventTopicConfigurationDescribeFailure, "failed topic (%s) metadata retrieval library error", topic.GetName())
return 0, r.recordErrorEvent(err, topic, v1alpha1.EventTopicConfigurationDescribeFailure, "failed topic (%s) metadata retrieval library error", topic.GetTopicName())
}

if len(respMetadata.Topics[0].Partitions) > partition {
return 0, r.recordErrorEvent(ErrScaleDownPartitionCount, topic, v1alpha1.EventTopicConfigurationDescribeFailure, "unable to update topic (%s)", topic.GetName())
return 0, r.recordErrorEvent(ErrScaleDownPartitionCount, topic, v1alpha1.EventTopicConfigurationDescribeFailure, "unable to update topic (%s)", topic.GetTopicName())
}

if len(respMetadata.Topics[0].Partitions) == partition {
Expand All @@ -305,20 +305,20 @@ func (r *TopicReconciler) reconcilePartition(ctx context.Context, topic *v1alpha

reqPartition := kmsg.NewCreatePartitionsRequest()
rt := kmsg.NewCreatePartitionsRequestTopic()
rt.Topic = topic.GetName()
rt.Topic = topic.GetTopicName()
rt.Count = int32(partition)
reqPartition.Topics = append(reqPartition.Topics, rt)

respPartition, err := reqPartition.RequestWith(ctx, cl)
if err != nil {
return 0, r.recordErrorEvent(err, topic, v1alpha1.EventTopicConfigurationAlteringFailure, "failed change topic (%s) partition count (%d) library error", topic.GetName(), partition)
return 0, r.recordErrorEvent(err, topic, v1alpha1.EventTopicConfigurationAlteringFailure, "failed change topic (%s) partition count (%d) library error", topic.GetTopicName(), partition)
}
if err = kerr.ErrorForCode(respPartition.Topics[0].ErrorCode); err != nil {
errMsg := NoneConstantString
if respPartition.Topics[0].ErrorMessage != nil {
errMsg = *respPartition.Topics[0].ErrorMessage
}
return 0, r.recordErrorEvent(err, topic, v1alpha1.EventTopicConfigurationAlteringFailure, "failed change topic (%s) partition count (%d) library error (%s)", topic.GetName(), partition, errMsg)
return 0, r.recordErrorEvent(err, topic, v1alpha1.EventTopicConfigurationAlteringFailure, "failed change topic (%s) partition count (%d) library error (%s)", topic.GetTopicName(), partition, errMsg)
}

return int16(len(respMetadata.Topics[0].Partitions[0].Replicas)), nil
Expand Down Expand Up @@ -364,22 +364,22 @@ func (r *TopicReconciler) alterTopicConfiguration(ctx context.Context, topic *v1

reqTopic := kmsg.NewIncrementalAlterConfigsRequestResource()
reqTopic.ResourceType = kmsg.ConfigResourceTypeTopic
reqTopic.ResourceName = topic.GetName()
reqTopic.ResourceName = topic.GetTopicName()
reqTopic.Configs = configs
reqAltConfig.Resources = append(reqAltConfig.Resources, reqTopic)

l.V(TraceLevel).Info("alter topic configuration", "topic-name", topic.GetName(), "configs", configs)
l.V(TraceLevel).Info("alter topic configuration", "topic-name", topic.GetTopicName(), "configs", configs)
respAltConfig, err := reqAltConfig.RequestWith(ctx, kafkaClient)
if err != nil {
return r.recordErrorEvent(err, topic, v1alpha1.EventTopicConfigurationAlteringFailure, "alter topic configuration (%s) library error", topic.GetName())
return r.recordErrorEvent(err, topic, v1alpha1.EventTopicConfigurationAlteringFailure, "alter topic configuration (%s) library error", topic.GetTopicName())
}

if err = kerr.ErrorForCode(respAltConfig.Resources[0].ErrorCode); err != nil {
errMsg := NoneConstantString
if respAltConfig.Resources[0].ErrorMessage != nil {
errMsg = *respAltConfig.Resources[0].ErrorMessage
}
return r.recordErrorEvent(err, topic, v1alpha1.EventTopicConfigurationAlteringFailure, "alter topic configuration (%s) incremental alter config (%s)", topic.GetName(), errMsg)
return r.recordErrorEvent(err, topic, v1alpha1.EventTopicConfigurationAlteringFailure, "alter topic configuration (%s) incremental alter config (%s)", topic.GetTopicName(), errMsg)
}
return nil
}
Expand All @@ -388,23 +388,23 @@ func (r *TopicReconciler) describeTopic(ctx context.Context, topic *v1alpha1.Top
req := kmsg.NewPtrDescribeConfigsRequest()
reqResource := kmsg.NewDescribeConfigsRequestResource()
reqResource.ResourceType = kmsg.ConfigResourceTypeTopic
reqResource.ResourceName = topic.GetName()
reqResource.ResourceName = topic.GetTopicName()
req.Resources = append(req.Resources, reqResource)
resp, err := req.RequestWith(ctx, kafkaClient)
if err != nil {
return nil, r.recordErrorEvent(err, topic, v1alpha1.EventTopicConfigurationDescribeFailure, "describing topic configuration (%s) library error", topic.GetName())
return nil, r.recordErrorEvent(err, topic, v1alpha1.EventTopicConfigurationDescribeFailure, "describing topic configuration (%s) library error", topic.GetTopicName())
}

if len(resp.Resources) == 0 {
return nil, r.recordErrorEvent(ErrEmptyTopicConfigDescription, topic, v1alpha1.EventTopicConfigurationDescribeFailure, "describing topic configuration (%s) DescribeConfigsResponse error", topic.GetName())
return nil, r.recordErrorEvent(ErrEmptyTopicConfigDescription, topic, v1alpha1.EventTopicConfigurationDescribeFailure, "describing topic configuration (%s) DescribeConfigsResponse error", topic.GetTopicName())
}

if err = kerr.ErrorForCode(resp.Resources[0].ErrorCode); err != nil {
errMsg := NoneConstantString
if resp.Resources[0].ErrorMessage != nil {
errMsg = *resp.Resources[0].ErrorMessage
}
return nil, r.recordErrorEvent(err, topic, v1alpha1.EventTopicConfigurationDescribeFailure, "describing topic configuration (%s) DescribeConfigsResponse error (%s)", topic.GetName(), errMsg)
return nil, r.recordErrorEvent(err, topic, v1alpha1.EventTopicConfigurationDescribeFailure, "describing topic configuration (%s) DescribeConfigsResponse error (%s)", topic.GetTopicName(), errMsg)
}

return resp, nil
Expand All @@ -413,7 +413,7 @@ func (r *TopicReconciler) describeTopic(ctx context.Context, topic *v1alpha1.Top
func (r *TopicReconciler) createTopic(ctx context.Context, topic *v1alpha1.Topic, kafkaClient *kgo.Client, partition int32, replicationFactor int16) error {
req := kmsg.NewCreateTopicsRequest()
rt := kmsg.NewCreateTopicsRequestTopic()
rt.Topic = topic.GetName()
rt.Topic = topic.GetTopicName()
rt.NumPartitions = partition
rt.ReplicationFactor = replicationFactor
for k, v := range topic.Spec.AdditionalConfig {
Expand All @@ -425,11 +425,11 @@ func (r *TopicReconciler) createTopic(ctx context.Context, topic *v1alpha1.Topic
req.Topics = append(req.Topics, rt)
resp, err := req.RequestWith(ctx, kafkaClient)
if err != nil {
return r.recordErrorEvent(err, topic, v1alpha1.EventTopicCreationFailure, "creating topic (%s) library error", topic.GetName())
return r.recordErrorEvent(err, topic, v1alpha1.EventTopicCreationFailure, "creating topic (%s) library error", topic.GetTopicName())
}

if len(resp.Topics) == 0 {
return r.recordErrorEvent(ErrEmptyTopicConfigDescription, topic, v1alpha1.EventTopicCreationFailure, "creating topic (%s) return empty response", topic.GetName())
return r.recordErrorEvent(ErrEmptyTopicConfigDescription, topic, v1alpha1.EventTopicCreationFailure, "creating topic (%s) return empty response", topic.GetTopicName())
}

err = kerr.ErrorForCode(resp.Topics[0].ErrorCode)
Expand All @@ -438,11 +438,11 @@ func (r *TopicReconciler) createTopic(ctx context.Context, topic *v1alpha1.Topic
if resp.Topics[0].ErrorMessage != nil {
errMsg = *resp.Topics[0].ErrorMessage
}
return r.recordErrorEvent(err, topic, v1alpha1.EventTopicCreationFailure, "creating topic (%s) CreateTopicsResponse error (%s)", topic.GetName(), errMsg)
return r.recordErrorEvent(err, topic, v1alpha1.EventTopicCreationFailure, "creating topic (%s) CreateTopicsResponse error (%s)", topic.GetTopicName(), errMsg)
}

if resp.Topics[0].Topic != topic.GetName() {
return r.recordErrorEvent(ErrWrongCreateTopicResponse, topic, v1alpha1.EventTopicCreationFailure, "creating topic (%s) response does not match requested topic", topic.GetName())
if resp.Topics[0].Topic != topic.GetTopicName() {
return r.recordErrorEvent(ErrWrongCreateTopicResponse, topic, v1alpha1.EventTopicCreationFailure, "creating topic (%s) response does not match requested topic", topic.GetTopicName())
}

if errors.Is(err, kerr.TopicAlreadyExists) {
Expand All @@ -453,29 +453,29 @@ func (r *TopicReconciler) createTopic(ctx context.Context, topic *v1alpha1.Topic

func (r *TopicReconciler) deleteTopic(ctx context.Context, topic *v1alpha1.Topic, kafkaClient *kgo.Client) error {
req := kmsg.NewDeleteTopicsRequest()
req.TopicNames = []string{topic.GetName()}
req.TopicNames = []string{topic.GetTopicName()}
rt := kmsg.NewDeleteTopicsRequestTopic()
rt.Topic = kmsg.StringPtr(topic.GetName())
rt.Topic = kmsg.StringPtr(topic.GetTopicName())
req.Topics = append(req.Topics, rt)
resp, err := req.RequestWith(ctx, kafkaClient)
if err != nil {
return r.recordErrorEvent(err, topic, v1alpha1.EventTopicDeletionFailure, "deleting topic (%s) library error", topic.GetName())
return r.recordErrorEvent(err, topic, v1alpha1.EventTopicDeletionFailure, "deleting topic (%s) library error", topic.GetTopicName())
}

if len(resp.Topics) == 0 {
return r.recordErrorEvent(ErrEmptyTopicConfigDescription, topic, v1alpha1.EventTopicDeletionFailure, "deleting topic (%s) return empty response", topic.GetName())
return r.recordErrorEvent(ErrEmptyTopicConfigDescription, topic, v1alpha1.EventTopicDeletionFailure, "deleting topic (%s) return empty response", topic.GetTopicName())
}

if err = kerr.ErrorForCode(resp.Topics[0].ErrorCode); err != nil && !errors.Is(err, kerr.UnknownTopicOrPartition) {
errMsg := NoneConstantString
if resp.Topics[0].ErrorMessage != nil {
errMsg = *resp.Topics[0].ErrorMessage
}
return r.recordErrorEvent(err, topic, v1alpha1.EventTopicDeletionFailure, "deleting topic (%s) library error (%s)", topic.GetName(), errMsg)
return r.recordErrorEvent(err, topic, v1alpha1.EventTopicDeletionFailure, "deleting topic (%s) library error (%s)", topic.GetTopicName(), errMsg)
}

if resp.Topics[0].Topic == nil || *resp.Topics[0].Topic != topic.GetName() {
return r.recordErrorEvent(ErrWrongCreateTopicResponse, topic, v1alpha1.EventTopicDeletionFailure, "deleting topic (%s) response does not match requested topic", topic.GetName())
if resp.Topics[0].Topic == nil || *resp.Topics[0].Topic != topic.GetTopicName() {
return r.recordErrorEvent(ErrWrongCreateTopicResponse, topic, v1alpha1.EventTopicDeletionFailure, "deleting topic (%s) response does not match requested topic", topic.GetTopicName())
}

return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,71 @@ func TestReconcile(t *testing.T) { // nolint:funlen // These tests have clear su
assert.Equal(t, v1alpha1.SucceededReason, cond.Reason)
assert.NotEqual(t, 0, len(createTopic.Status.TopicConfiguration))
})
t.Run("overwrite_topic", func(t *testing.T) {
topicName := "overwrite-topic"
differentName := "different_name_with_underscore"

createTopic := v1alpha1.Topic{
ObjectMeta: metav1.ObjectMeta{
Name: topicName,
Namespace: testNamespace,
},
Spec: v1alpha1.TopicSpec{
OverwriteTopicName: &differentName,
Partitions: pointer.Int(3),
ReplicationFactor: pointer.Int(1),
AdditionalConfig: nil,
KafkaAPISpec: &v1alpha1.KafkaAPISpec{
Brokers: []string{seedBroker},
},
SynchronizationInterval: &metav1.Duration{Duration: time.Second * 5},
},
}

err := c.Create(ctx, &createTopic)
require.NoError(t, err)

req := ctrl.Request{
NamespacedName: types.NamespacedName{
Name: topicName,
Namespace: testNamespace,
},
}
result, err := tr.Reconcile(ctx, req)
assert.NoError(t, err)

assert.False(t, result.Requeue)
assert.Equal(t, time.Second*5, result.RequeueAfter)

var mrt kmsg.MetadataResponseTopic
{
metaReq := kmsg.NewPtrMetadataRequest()
reqTopic := kmsg.NewMetadataRequestTopic()
reqTopic.Topic = kmsg.StringPtr(differentName)
metaReq.Topics = append(metaReq.Topics, reqTopic)
resp, errMetadata := metaReq.RequestWith(context.Background(), kafkaCl)
require.NoError(t, errMetadata)

mrt = resp.Topics[0]
}

assert.Equal(t, *createTopic.Spec.ReplicationFactor, len(mrt.Partitions[0].Replicas))
assert.Equal(t, *createTopic.Spec.Partitions, len(mrt.Partitions))

err = c.Get(ctx, types.NamespacedName{
Name: topicName,
Namespace: testNamespace,
}, &createTopic)
require.NoError(t, err)

assert.Equal(t, "operator.redpanda.com/finalizer", createTopic.ObjectMeta.Finalizers[0])
assert.NotEmpty(t, createTopic.Status.Conditions)
cond := createTopic.Status.Conditions[0]
assert.Equal(t, v1alpha1.ReadyCondition, cond.Type)
assert.Equal(t, metav1.ConditionTrue, cond.Status)
assert.Equal(t, v1alpha1.SucceededReason, cond.Reason)
assert.NotEqual(t, 0, len(createTopic.Status.TopicConfiguration))
})
t.Run("create_topic_that_already_exist", func(t *testing.T) {
topicName := "create-already-existent-test-topic"

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
apiVersion: cluster.redpanda.com/v1alpha1
kind: Topic
metadata:
name: test-topic
status:
conditions:
- message: Topic reconciliation succeeded
observedGeneration: 5
reason: Succeeded
status: "True"
type: Ready
observedGeneration: 5
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
---
apiVersion: cluster.redpanda.com/v1alpha1
kind: Topic
metadata:
name: test-topic
spec:
overwriteTopicName: topic_with_underscore
interval: 4s
partitions: 1
replicationFactor: 1
additionalConfig:
segment.bytes: "16777666"
kafkaApiSpec:
brokers:
- redpanda-0.redpanda.redpanda.svc.cluster.local.:9093
tls:
caCertSecretRef:
name: redpanda-default-cert
key: ca.crt

0 comments on commit e90ac4c

Please sign in to comment.