Skip to content

Commit 45772eb

Browse files
authored
refactor: refactor kafka cluster reconcile with the latest operator-go framework (#142)
* refactor: refactor kafka cluster reconcile with the latest operator-go framework Refactored the Kafka coordination logic to align with the latest operator-go development framework. The following changes have been made: - Implemented discovery using listener for better modularity. - Replaced the previous getService sidecar with listener for internal Kafka cluster services. - Added TLS support via listener volumes for secure communication. - When mounting a listener volume in a pod, ensure at least one port is configured. This is required due to implementation specifics of the listener mechanism. * ci: fix lint error * ci: set max-parallel jobs to 2 in GitHub Actions workflow * test: comment out namespace and skipDelete in chainsaw configuration * feat: add RBAC permissions for listeners
1 parent 67eb3b5 commit 45772eb

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

66 files changed

+1937
-7403
lines changed

.github/workflows/test.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ jobs:
3434
matrix:
3535
k8s-version: ['1.26.15', '1.27.16']
3636
product-version: ['3.7.1', '3.8.0']
37-
# max-parallel: 1
37+
max-parallel: 2
3838
steps:
3939
- name: Clone the code
4040
uses: actions/checkout@v4

api/v1alpha1/kafkacluster_types.go

+34-96
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import (
2020
"github.com/zncdatadev/operator-go/pkg/status"
2121
corev1 "k8s.io/api/core/v1"
2222
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
23+
24+
commonsv1alpha1 "github.com/zncdatadev/operator-go/pkg/apis/commons/v1alpha1"
2325
)
2426

2527
const (
@@ -48,18 +50,20 @@ const (
4850
ImageTag = "3.7.1-kubedoop0.0.0-dev"
4951
ImagePullPolicy = corev1.PullIfNotPresent
5052

51-
KubedoopKafkaDataDirName = "data" // kafka log dirs
52-
KubedoopLogConfigDirName = "log-config"
53-
KubedoopConfigDirName = "config"
54-
KubedoopTmpDirName = "tmp"
55-
KubedoopLogDirName = "log"
56-
57-
KubedoopRoot = "/kubedoop"
58-
KubedoopTmpDir = KubedoopRoot + "/tmp"
59-
KubedoopDataDir = KubedoopRoot + "/data"
60-
KubedoopConfigDir = KubedoopRoot + "/config"
61-
KubedoopLogConfigDir = KubedoopRoot + "/log_config"
62-
KubedoopLogDir = KubedoopRoot + "/log"
53+
KubedoopKafkaDataDirName = "data" // kafka log dirs
54+
KubedoopLogConfigDirName = "log-config"
55+
KubedoopConfigDirName = "config"
56+
KubedoopLogDirName = "log"
57+
KubedoopListenerBroker = "listener-broker"
58+
KubedoopListenerBootstrap = "listener-bootstrap"
59+
60+
KubedoopRoot = "/kubedoop"
61+
KubedoopDataDir = KubedoopRoot + "/data"
62+
KubedoopConfigDir = KubedoopRoot + "/config"
63+
KubedoopLogConfigDir = KubedoopRoot + "/log_config"
64+
KubedoopLogDir = KubedoopRoot + "/log"
65+
KubedoopListenerBrokerDir = KubedoopRoot + "/listener-broker"
66+
KubedoopListenerBootstrapDir = KubedoopRoot + "/listener-bootstrap"
6367
)
6468

6569
// +kubebuilder:object:root=true
@@ -91,20 +95,20 @@ type KafkaClusterSpec struct {
9195
// +kubebuilder:validation:Required
9296
ClusterConfig *ClusterConfigSpec `json:"clusterConfig,omitempty"`
9397

98+
// +kubebuilder:validation:Optional
99+
ClusterOperation *commonsv1alpha1.ClusterOperationSpec `json:"clusterOperation,omitempty"`
100+
94101
// +kubebuilder:validation:Required
95102
Brokers *BrokersSpec `json:"brokers,omitempty"`
96103
}
97104

98105
type ClusterConfigSpec struct {
99-
// +kubebuilder:validation:Optional
100-
Service *ServiceSpec `json:"service,omitempty"`
101-
102106
// +kubebuilder:validation:Optional
103107
// +kubebuilder:default:="cluster.local"
104108
ClusterDomain string `json:"clusterDomain,omitempty"`
105109

106110
// +kubebuilder:validation:Optional
107-
Tls *TlsSpec `json:"tls,omitempty"`
111+
Tls *KafkaTlsSpec `json:"tls,omitempty"`
108112

109113
// +kubebuilder:validation:Optional
110114
VectorAggregatorConfigMapName string `json:"vectorAggregatorConfigMapName,omitempty"`
@@ -113,7 +117,7 @@ type ClusterConfigSpec struct {
113117
ZookeeperConfigMapName string `json:"zookeeperConfigMapName,omitempty"`
114118
}
115119

116-
type TlsSpec struct {
120+
type KafkaTlsSpec struct {
117121
// The SecretClass to use for internal broker communication. Use mutual verification between brokers (mandatory).
118122
// This setting controls: - Which cert the brokers should use to authenticate themselves against other brokers -
119123
// Which ca.crt to use when validating the other brokers Defaults to tls
@@ -154,16 +158,9 @@ type BrokersSpec struct {
154158
RoleGroups map[string]*BrokersRoleGroupSpec `json:"roleGroups,omitempty"`
155159

156160
// +kubebuilder:validation:Optional
157-
PodDisruptionBudget *PodDisruptionBudgetSpec `json:"podDisruptionBudget,omitempty"`
158-
159-
// +kubebuilder:validation:Optional
160-
CliOverrides []string `json:"cliOverrides,omitempty"`
161-
162-
// +kubebuilder:validation:Optional
163-
ConfigOverrides *ConfigOverridesSpec `json:"configOverrides,omitempty"`
161+
Roleconfig *commonsv1alpha1.RoleConfigSpec `json:"roleconfig,omitempty"`
164162

165-
// +kubebuilder:validation:Optional
166-
EnvOverrides map[string]string `json:"envOverrides,omitempty"`
163+
*commonsv1alpha1.OverridesSpec `json:",inline"`
167164
}
168165

169166
type BrokersRoleGroupSpec struct {
@@ -174,91 +171,32 @@ type BrokersRoleGroupSpec struct {
174171
// +kubebuilder:validation:Required
175172
Config *BrokersConfigSpec `json:"config,omitempty"`
176173

177-
// +kubebuilder:validation:Optional
178-
CliOverrides []string `json:"cliOverrides,omitempty"`
179-
180-
// +kubebuilder:validation:Optional
181-
ConfigOverrides *ConfigOverridesSpec `json:"configOverrides,omitempty"`
182-
183-
// +kubebuilder:validation:Optional
184-
EnvOverrides map[string]string `json:"envOverrides,omitempty"`
174+
*commonsv1alpha1.OverridesSpec `json:",inline"`
185175
}
186176

187177
type BrokersConfigSpec struct {
188-
// +kubebuilder:validation:Optional
189-
Resources *ResourcesSpec `json:"resources,omitempty"`
178+
*commonsv1alpha1.RoleGroupConfigSpec `json:",inline"`
190179

180+
// The ListenerClass used for connecting to brokers. Should use a direct connection ListenerClass to minimize cost
181+
// and minimize performance overhead (such as `cluster-internal` or `external-unstable`)
191182
// +kubebuilder:validation:Optional
192183
// +kubebuilder:default:="cluster-internal"
193-
ListenerClass string `json:"listenerClass,omitempty"`
194-
195-
// +kubebuilder:validation:Optional
196-
SecurityContext *corev1.PodSecurityContext `json:"securityContext"`
197-
198-
// +kubebuilder:validation:Optional
199-
Affinity *corev1.Affinity `json:"affinity"`
200-
201-
// +kubebuilder:validation:Optional
202-
NodeSelector map[string]string `json:"nodeSelector,omitempty"`
203-
204-
// +kubebuilder:validation:Optional
205-
Tolerations []corev1.Toleration `json:"tolerations"`
184+
BrokerListenerClass string `json:"listenerClass,omitempty"`
206185

186+
// The ListenerClass used for bootstrapping new clients. Should use a stable ListenerClass to avoid unnecessary client restarts (such as `cluster-internal` or `external-stable`).
207187
// +kubebuilder:validation:Optional
208-
PodDisruptionBudget *PodDisruptionBudgetSpec `json:"podDisruptionBudget,omitempty"`
188+
BootstrapListenerClass string `json:"bootstrapListenerClass,omitempty"`
209189

190+
// Request secret (currently only autoTls certificates) lifetime from the secret operator, e.g. `7d`, or `30d`.
191+
// Please note that this can be shortened by the `maxCertificateLifetime` setting on the SecretClass issuing the TLS certificate.
210192
// +kubebuilder:validation:Optional
211-
StorageClass string `json:"storageClass,omitempty"`
212-
213-
// +kubebuilder:validation:Optional
214-
// +kubebuilder:default="8Gi"
215-
StorageSize string `json:"storageSize,omitempty"`
216-
217-
// +kubebuilder:validation:Optional
218-
ExtraEnv map[string]string `json:"extraEnv,omitempty"`
219-
220-
// +kubebuilder:validation:Optional
221-
ExtraSecret map[string]string `json:"extraSecret,omitempty"`
222-
223-
// +kubebuilder:validation:Optional
224-
Logging *BrokersContainerLoggingSpec `json:"logging,omitempty"`
225-
}
226-
type BrokersContainerLoggingSpec struct {
227-
// +kubebuilder:validation:Optional
228-
// +kubebuilder:default=false
229-
EnableVectorAgent bool `json:"enableVectorAgent,omitempty"`
230-
// +kubebuilder:validation:Optional
231-
Broker *LoggingConfigSpec `json:"broker,omitempty"`
193+
RequestedSecretLifeTime string `json:"requestedSecretLifeTime,omitempty"`
232194
}
233195
type ConfigOverridesSpec struct {
234-
Log4j map[string]string `json:"log4j.properties,omitempty"`
196+
Server map[string]string `json:"server.properties,omitempty"`
235197
Security map[string]string `json:"security.properties,omitempty"`
236198
}
237199

238-
type PodDisruptionBudgetSpec struct {
239-
// +kubebuilder:validation:Optional
240-
MinAvailable int32 `json:"minAvailable,omitempty"`
241-
242-
// +kubebuilder:validation:Optional
243-
MaxUnavailable int32 `json:"maxUnavailable,omitempty"`
244-
}
245-
246-
type ServiceSpec struct {
247-
// +kubebuilder:validation:Optional
248-
Annotations map[string]string `json:"annotations,omitempty"`
249-
250-
// +kubebuilder:validation:Optional
251-
// +kubebuilder:validation:enum=ClusterIP;NodePort;LoadBalancer;ExternalName
252-
// +kubebuilder:default=ClusterIP
253-
Type corev1.ServiceType `json:"type,omitempty"`
254-
255-
// +kubebuilder:validation:Optional
256-
// +kubebuilder:validation:Minimum=1
257-
// +kubebuilder:validation:Maximum=65535
258-
// +kubebuilder:default=18080
259-
Port int32 `json:"port,omitempty"`
260-
}
261-
262200
func init() {
263201
SchemeBuilder.Register(&KafkaCluster{}, &KafkaClusterList{})
264202
}

api/v1alpha1/logger.go

-37
This file was deleted.

api/v1alpha1/resources.go

-48
This file was deleted.

0 commit comments

Comments
 (0)