From 4f7e4b217932fa1f4c025d4df923930f82c6d400 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Wed, 24 Sep 2025 10:33:57 +0800 Subject: [PATCH 1/5] verify schema delete --- pkg/connection/reconcile_topic.go | 63 ++++++++- pkg/connection/reconcile_topic_test.go | 180 +++++++++++++++++++++++++ 2 files changed, 241 insertions(+), 2 deletions(-) create mode 100644 pkg/connection/reconcile_topic_test.go diff --git a/pkg/connection/reconcile_topic.go b/pkg/connection/reconcile_topic.go index f9f0a725..67911258 100644 --- a/pkg/connection/reconcile_topic.go +++ b/pkg/connection/reconcile_topic.go @@ -15,10 +15,13 @@ package connection import ( + "bytes" "context" + "encoding/json" "fmt" - "reflect" + "maps" "slices" + "strings" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" "github.com/go-logr/logr" @@ -254,7 +257,7 @@ func applySchema(pulsarAdmin admin.PulsarAdmin, topic *resourcev1alpha1.PulsarTo } if topic.Spec.SchemaInfo != nil { // Only upload the schema when schema doesn't exist or the schema has been updated - if admin.IsNotFound(serr) || !reflect.DeepEqual(topic.Spec.SchemaInfo, schema) { + if admin.IsNotFound(serr) || !schemasEqual(topic.Spec.SchemaInfo, schema) { info := topic.Spec.SchemaInfo param := &admin.SchemaParams{ Type: info.Type, @@ -274,6 +277,62 @@ func applySchema(pulsarAdmin admin.PulsarAdmin, topic *resourcev1alpha1.PulsarTo return nil } +func schemasEqual(desired, current *resourcev1alpha1.SchemaInfo) bool { + if desired == nil && current == nil { + return true + } + if desired == nil || current == nil { + return false + } + if desired.Type != current.Type { + return false + } + if canonicalizeSchemaString(desired.Type, desired.Schema) != canonicalizeSchemaString(current.Type, current.Schema) { + return false + } + return maps.Equal(desired.Properties, current.Properties) +} + +func canonicalizeSchemaString(schemaType, schema string) string { + trimmed := strings.TrimSpace(schema) + if trimmed == "" { + return "" + } + if looksLikeJSON(trimmed) { + if normalized, err := normalizeJSON(trimmed); err == nil { + return normalized + } + } + return trimmed +} + +func looksLikeJSON(schema string) bool { + if schema == "" { + return false + } + first := schema[0] + return first == '{' || first == '[' +} + +func normalizeJSON(schema string) (string, error) { + decoder := json.NewDecoder(strings.NewReader(schema)) + decoder.UseNumber() + var data any + if err := decoder.Decode(&data); err != nil { + return "", err + } + if decoder.More() { + return "", fmt.Errorf("unexpected trailing data in schema JSON") + } + buf := bytes.NewBuffer(nil) + encoder := json.NewEncoder(buf) + encoder.SetEscapeHTML(false) + if err := encoder.Encode(data); err != nil { + return "", err + } + return strings.TrimSpace(buf.String()), nil +} + func createTopicParams(topic *resourcev1alpha1.PulsarTopic) *admin.TopicParams { return &admin.TopicParams{ Persistent: topic.Spec.Persistent, diff --git a/pkg/connection/reconcile_topic_test.go b/pkg/connection/reconcile_topic_test.go new file mode 100644 index 00000000..290410b7 --- /dev/null +++ b/pkg/connection/reconcile_topic_test.go @@ -0,0 +1,180 @@ +package connection + +import ( + "testing" + + "github.com/go-logr/logr" + resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1" + "github.com/streamnative/pulsar-resources-operator/pkg/admin" +) + +type fakeSchemaAdmin struct { + admin.DummyPulsarAdmin + schema *resourcev1alpha1.SchemaInfo + schemaErr error + uploads []*admin.SchemaParams +} + +func (f *fakeSchemaAdmin) GetSchema(string) (*resourcev1alpha1.SchemaInfo, error) { + return f.schema, f.schemaErr +} + +func (f *fakeSchemaAdmin) UploadSchema(_ string, params *admin.SchemaParams) error { + f.uploads = append(f.uploads, params) + return nil +} + +func TestApplySchemaSkipsEquivalentSchema(t *testing.T) { + t.Parallel() + + fake := &fakeSchemaAdmin{ + schema: &resourcev1alpha1.SchemaInfo{ + Type: "AVRO", + Schema: `{ + "name": "Example", + "type": "record", + "fields": [ + { + "name": "foo", + "type": "string" + }, + { + "name": "bar", + "type": "int" + } + ] + }`, + Properties: map[string]string{}, + }, + } + + topic := &resourcev1alpha1.PulsarTopic{} + topic.Spec.Name = "persistent://public/default/example" + topic.Spec.SchemaInfo = &resourcev1alpha1.SchemaInfo{ + Type: "AVRO", + Schema: `{"name":"Example","type":"record","fields":[{"name":"foo","type":"string"},{"name":"bar","type":"int"}]}`, + Properties: nil, + } + + if err := applySchema(fake, topic, logr.Discard()); err != nil { + t.Fatalf("applySchema() error = %v", err) + } + + if len(fake.uploads) != 0 { + t.Fatalf("expected no schema upload when schemas are equivalent, got %d", len(fake.uploads)) + } +} + +func TestSchemasEqual(t *testing.T) { + t.Parallel() + + avroSpec := &resourcev1alpha1.SchemaInfo{ + Type: "AVRO", + Schema: `{"name":"Example","type":"record","fields":[{"name":"foo","type":"string"},{"name":"bar","type":"int"}]}`, + } + + avroActualFormatted := &resourcev1alpha1.SchemaInfo{ + Type: "AVRO", + Schema: `{ + "type": "record", + "name": "Example", + "fields": [ + { + "type": "string", + "name": "foo" + }, + { + "type": "int", + "name": "bar" + } + ] +}`, + } + + cases := []struct { + name string + desired *resourcev1alpha1.SchemaInfo + current *resourcev1alpha1.SchemaInfo + expected bool + }{ + { + name: "both nil", + desired: nil, + current: nil, + expected: true, + }, + { + name: "current missing", + desired: avroSpec, + current: nil, + expected: false, + }, + { + name: "type mismatch", + desired: &resourcev1alpha1.SchemaInfo{ + Type: "JSON", + Schema: `{"type":"object"}`, + }, + current: &resourcev1alpha1.SchemaInfo{ + Type: "AVRO", + Schema: `{"type":"record"}`, + }, + expected: false, + }, + { + name: "equivalent json formatting", + desired: avroSpec, + current: avroActualFormatted, + expected: true, + }, + { + name: "properties nil vs empty", + desired: &resourcev1alpha1.SchemaInfo{ + Type: "JSON", + Schema: `{"type":"object"}`, + }, + current: &resourcev1alpha1.SchemaInfo{ + Type: "JSON", + Schema: `{"type":"object"}`, + Properties: map[string]string{}, + }, + expected: true, + }, + { + name: "properties differ", + desired: &resourcev1alpha1.SchemaInfo{ + Type: "JSON", + Schema: `{"type":"object"}`, + Properties: map[string]string{"env": "prod"}, + }, + current: &resourcev1alpha1.SchemaInfo{ + Type: "JSON", + Schema: `{"type":"object"}`, + Properties: map[string]string{"env": "dev"}, + }, + expected: false, + }, + { + name: "different schema content", + desired: &resourcev1alpha1.SchemaInfo{ + Type: "AVRO", + Schema: `{"name":"Example","type":"record","fields":[{"name":"foo","type":"string"}]}`, + }, + current: &resourcev1alpha1.SchemaInfo{ + Type: "AVRO", + Schema: `{"name":"Example","type":"record","fields":[{"name":"foo","type":"int"}]}`, + }, + expected: false, + }, + } + + for i := range cases { + tc := cases[i] + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + if got := schemasEqual(tc.desired, tc.current); got != tc.expected { + t.Fatalf("schemasEqual() = %v, want %v", got, tc.expected) + } + }) + } +} From e279d7d8f6ce74691ed5d6f34d5eb73a2fb9b473 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Wed, 24 Sep 2025 16:49:36 +0800 Subject: [PATCH 2/5] Update pkg/connection/reconcile_topic_test.go Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> --- pkg/connection/reconcile_topic_test.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/pkg/connection/reconcile_topic_test.go b/pkg/connection/reconcile_topic_test.go index 290410b7..9bc0b770 100644 --- a/pkg/connection/reconcile_topic_test.go +++ b/pkg/connection/reconcile_topic_test.go @@ -1,3 +1,17 @@ +// Copyright 2025 StreamNative +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package connection import ( From 1df5d7cac34096c538cbf3d839f824496500c1b5 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Wed, 24 Sep 2025 18:08:36 +0800 Subject: [PATCH 3/5] fix --- pkg/connection/reconcile_topic.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/connection/reconcile_topic.go b/pkg/connection/reconcile_topic.go index 67911258..1c4d1e81 100644 --- a/pkg/connection/reconcile_topic.go +++ b/pkg/connection/reconcile_topic.go @@ -287,13 +287,13 @@ func schemasEqual(desired, current *resourcev1alpha1.SchemaInfo) bool { if desired.Type != current.Type { return false } - if canonicalizeSchemaString(desired.Type, desired.Schema) != canonicalizeSchemaString(current.Type, current.Schema) { + if canonicalizeSchemaString(desired.Schema) != canonicalizeSchemaString(current.Schema) { return false } return maps.Equal(desired.Properties, current.Properties) } -func canonicalizeSchemaString(schemaType, schema string) string { +func canonicalizeSchemaString(schema string) string { trimmed := strings.TrimSpace(schema) if trimmed == "" { return "" From 41f8f95ca9d5a5f77549f2cbb3b51fa23c5dfaa7 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Sun, 5 Oct 2025 20:34:39 +0800 Subject: [PATCH 4/5] revert changes --- pkg/connection/reconcile_topic.go | 63 +------------------------------ 1 file changed, 2 insertions(+), 61 deletions(-) diff --git a/pkg/connection/reconcile_topic.go b/pkg/connection/reconcile_topic.go index 1c4d1e81..f9f0a725 100644 --- a/pkg/connection/reconcile_topic.go +++ b/pkg/connection/reconcile_topic.go @@ -15,13 +15,10 @@ package connection import ( - "bytes" "context" - "encoding/json" "fmt" - "maps" + "reflect" "slices" - "strings" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" "github.com/go-logr/logr" @@ -257,7 +254,7 @@ func applySchema(pulsarAdmin admin.PulsarAdmin, topic *resourcev1alpha1.PulsarTo } if topic.Spec.SchemaInfo != nil { // Only upload the schema when schema doesn't exist or the schema has been updated - if admin.IsNotFound(serr) || !schemasEqual(topic.Spec.SchemaInfo, schema) { + if admin.IsNotFound(serr) || !reflect.DeepEqual(topic.Spec.SchemaInfo, schema) { info := topic.Spec.SchemaInfo param := &admin.SchemaParams{ Type: info.Type, @@ -277,62 +274,6 @@ func applySchema(pulsarAdmin admin.PulsarAdmin, topic *resourcev1alpha1.PulsarTo return nil } -func schemasEqual(desired, current *resourcev1alpha1.SchemaInfo) bool { - if desired == nil && current == nil { - return true - } - if desired == nil || current == nil { - return false - } - if desired.Type != current.Type { - return false - } - if canonicalizeSchemaString(desired.Schema) != canonicalizeSchemaString(current.Schema) { - return false - } - return maps.Equal(desired.Properties, current.Properties) -} - -func canonicalizeSchemaString(schema string) string { - trimmed := strings.TrimSpace(schema) - if trimmed == "" { - return "" - } - if looksLikeJSON(trimmed) { - if normalized, err := normalizeJSON(trimmed); err == nil { - return normalized - } - } - return trimmed -} - -func looksLikeJSON(schema string) bool { - if schema == "" { - return false - } - first := schema[0] - return first == '{' || first == '[' -} - -func normalizeJSON(schema string) (string, error) { - decoder := json.NewDecoder(strings.NewReader(schema)) - decoder.UseNumber() - var data any - if err := decoder.Decode(&data); err != nil { - return "", err - } - if decoder.More() { - return "", fmt.Errorf("unexpected trailing data in schema JSON") - } - buf := bytes.NewBuffer(nil) - encoder := json.NewEncoder(buf) - encoder.SetEscapeHTML(false) - if err := encoder.Encode(data); err != nil { - return "", err - } - return strings.TrimSpace(buf.String()), nil -} - func createTopicParams(topic *resourcev1alpha1.PulsarTopic) *admin.TopicParams { return &admin.TopicParams{ Persistent: topic.Spec.Persistent, From 8f750c7a7428148e78400665d012ef18d3af3718 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Sun, 5 Oct 2025 20:44:44 +0800 Subject: [PATCH 5/5] fix --- pkg/connection/reconcile_topic_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/connection/reconcile_topic_test.go b/pkg/connection/reconcile_topic_test.go index 9bc0b770..c8a271e0 100644 --- a/pkg/connection/reconcile_topic_test.go +++ b/pkg/connection/reconcile_topic_test.go @@ -15,6 +15,7 @@ package connection import ( + "reflect" "testing" "github.com/go-logr/logr" @@ -186,8 +187,8 @@ func TestSchemasEqual(t *testing.T) { tc := cases[i] t.Run(tc.name, func(t *testing.T) { t.Parallel() - if got := schemasEqual(tc.desired, tc.current); got != tc.expected { - t.Fatalf("schemasEqual() = %v, want %v", got, tc.expected) + if got := reflect.DeepEqual(tc.desired, tc.current); got != tc.expected { + t.Fatalf("reflect.DeepEqual() = %v, want %v", got, tc.expected) } }) }